Rusts Polling mit benutzerdefinierten Futures verstehen
Takashi Yamamoto
Infrastructure Engineer · Leapcell

Einleitung
Asynchrone Programmierung hat sich zu einem unverzichtbaren Paradigma für die Erstellung performanter und reaktionsschneller Anwendungen entwickelt, insbesondere in Bereichen wie Netzwerke, I/O-gebundene Aufgaben und Systeme mit hoher Parallelität. Rust bietet mit seinem starken Typsystem und seinem Ownership-Modell einen leistungsstarken und sicheren Ansatz für die asynchrone Programmierung, der auf seinem Future-Trait basiert. Obwohl Sie Futures oft über die async/await-Syntax nutzen, ist ein tiefes Verständnis dafür, wie diese Abstraktionen im Hintergrund funktionieren, entscheidend für das Debugging, die Optimierung und sogar das Design benutzerdefinierter asynchroner Komponenten. Dieser Deep Dive in das Schreiben eines benutzerdefinierten Future wird den Polling-Mechanismus entmystifizieren, den grundlegenden Tanz zwischen Ihren asynchronen Aufgaben und dem Executor enthüllen und Sie letztendlich befähigen, die asynchronen Fähigkeiten von Rust mit größerem Vertrauen und Präzision zu nutzen.
Das Herz der asynchronen Ausführung: Polling
Bevor wir unseren benutzerdefinierten Future konstruieren, wollen wir ein klares Verständnis der beteiligten Kernkonzepte aufbauen:
- Future Trait: In Rust ist ein
Futureein Trait, der eine asynchrone Berechnung darstellt, die möglicherweise noch nicht abgeschlossen ist. Er verfügt über eine einzige Methode,poll, die ein Executor wiederholt aufruft, um den Fortschritt des Future zu überprüfen. - Executor: Ein Executor ist dafür verantwortlich,
Futures entgegenzunehmen und sie durch wiederholtes Aufrufen ihrerpoll-Methode zum Abschluss zu treiben. Er verwaltet den Lebenszyklus von Futures, plant Aufgaben und kümmert sich darum, Aufgaben zu wecken, wenn sie bereit sind, Fortschritte zu machen. Beliebte Exekutoren sindtokioundasync-std. - Polling: Dies ist die Handlung des Executors, der die
poll-Methode für einen nicht abgeschlossenenFutureaufruft. Wennpollaufgerufen wird, versucht der Future, Fortschritte zu machen. Poll-Enum: Diepoll-Methode gibt einePoll-Enum zurück, die zwei Varianten hat:Poll::Ready(T): Zeigt an, dass der Future erfolgreich abgeschlossen wurde undTdas Ergebnis der Berechnung ist.Poll::Pending: Zeigt an, dass der Future noch nicht abgeschlossen ist. WennPendingzurückgegeben wird, muss der Future sicherstellen, dass er so arrangiert wird, dass er geweckt wird (überWaker), wenn er bereit ist, weitere Fortschritte zu machen.
Waker: EinWakerist ein Low-Level-Mechanismus, der vom Executor bereitgestellt wird, um es einemFuturezu ermöglichen, zu signalisieren, dass es bereit ist, erneut gepollt zu werden. Wenn ein FuturePoll::Pendingzurückgibt, erfasst und klont es denWakeraus demContext. Später, wenn ein Ereignis ausgelöst wird, das den Future möglicherweise entblockiert (z. B. Daten kommen auf einem Socket an, ein Timer läuft ab), ruft der Futurewaker.wake_by_ref()auf, um den Executor zu benachrichtigen, ihn erneut zu pollen.Context: DerContext, der an diepoll-Methode übergeben wird, enthält einenWakerund andere Informationen, die für den Future nützlich sind, um mit dem Executor zu interagieren.
Erstellen eines benutzerdefinierten Future: Ein einfacher Verzug
Erstellen wir einen benutzerdefinierten Future, der einen nicht blockierenden Verzug einführt. Dies ermöglicht es uns, den Polling-Mechanismus direkt zu beobachten.
Wir definieren eine Delay-Struktur, die eine deadline (wann er abgeschlossen sein sollte) und einen optionalen Waker speichert, um die Aufgabe zu wecken.
use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll, Waker}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use std::thread; // Repräsentiert den Zustand unseres Delay-Futures struct Delay { deadline: Instant, // Wir müssen den Waker speichern, um den Future zu wecken, wenn die Frist abläuft. // Arc<Mutex<Option<Waker>>> ermöglicht es uns, den Waker sicher zwischen Threads zu teilen und zu ändern. waker_storage: Arc<Mutex<Option<Waker>>>, // Ein Flag, um sicherzustellen, dass wir den Timer-Thread nur einmal starten. timer_thread_spawned: bool, } impl Delay { fn new(duration: Duration) -> Self { Delay { deadline: Instant::now() + duration, waker_storage: Arc::new(Mutex::new(None)), timer_thread_spawned: false, } } } // Implementieren Sie das Future-Trait für unsere Delay-Struktur impl Future for Delay { // Der Ausgabetyp unseres Futures ist Einheit, da er nur einen Verzug darstellt. type Output = (); // Das Herz des Futures: die poll-Methode fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Wenn die Frist bereits abgelaufen ist, ist der Future bereit. if Instant::now() >= self.deadline { println!("Delay future: Deadline reached. Returning Ready."); return Poll::Ready(()) } // --- Speichern des Wakers und Einrichten des Timers (nur einmal) --- // Wenn der Timer-Thread noch nicht gestartet wurde, richten Sie ihn ein. if !self.timer_thread_spawned { println!("Delay future: First poll. Storing waker and spawning timer thread."); // Speichern Sie den aktuellen Waker aus dem Kontext. // Dieser Waker wird vom Timer-Thread verwendet, um diese Aufgabe zu wecken. let mut waker_guard = self.waker_storage.lock().unwrap(); *waker_guard = Some(cx.waker().clone()); drop(waker_guard); // Sperre frühzeitig freigeben // Klonen Sie den Arc, um ihn an den neuen Thread zu übergeben. let waker_storage_clone = self.waker_storage.clone(); let duration_until_deadline = self.deadline.duration_since(Instant::now()); // Starten Sie einen neuen Thread, der bis zur Frist schläft // und dann die ursprüngliche Aufgabe weckt. thread::spawn(move || { thread::sleep(duration_until_deadline); println!("Delay timer thread: Deadline passed. Waking up the task."); // Rufen Sie den Waker ab und wecken Sie die Aufgabe if let Some(waker) = waker_storage_clone.lock().unwrap().take() { waker.wake(); } }); // Markieren Sie, dass der Timer-Thread gestartet wurde, um ein erneutes Starten zu vermeiden self.timer_thread_spawned = true; } else { // Dieser Teil behandelt nachfolgende Abfragen, wenn der Timer-Thread bereits ausgeführt wird. // Es ist wichtig, den Waker zu aktualisieren, wenn der Executor beschließt, die Aufgabe zu verschieben // oder neu zu planen. Wenn der Waker nicht aktualisiert wird, kann der vorherige Waker // veraltet werden, was dazu führt, dass die Aufgabe niemals geweckt wird. let mut waker_guard = self.waker_storage.lock().unwrap(); if waker_guard.as_ref().map_or(true, |w| !w.will_wake(cx.waker())) { println!("Delay future: Waker changed. Updating."); *waker_guard = Some(cx.waker().clone()); } } // Wenn die Frist noch nicht abgelaufen ist, ist der Future ausstehend. // Er wird erneut abgefragt, wenn `waker.wake()` vom Timer-Thread aufgerufen wird. println!("Delay future: Deadline not yet reached. Returning Pending."); Poll::Pending } } #[tokio::main] async fn main() { println!("Main: Starting program."); let delay_future = Delay::new(Duration::from_secs(2)); // Erstellen Sie einen 2-Sekunden-Verzug println!("Main: Awaiting delay future..."); delay_future.await; // Warten Sie auf unseren benutzerdefinierten Future println!("Main: Delay completed. Program finished."); }
Erläuterung des Delay-Future:
-
struct Delay:deadline: EinInstant, das angibt, wann der Verzug enden soll.waker_storage: EinArc<Mutex<Option<Waker>>>ist unerlässlich. DerWakermuss zwischen demFuture(dasself.waker_storagebesitzt) und dem separatenthread::spawn(daswakeaufruft) geteilt werden.Arcermöglicht gemeinsamen Besitz, undMutexbietet eine sichere interne Mutabilität, um denWakerzu speichern und abzurufen.Optionwird verwendet, da derWakermöglicherweise nicht beim allererstenpollverfügbar ist, bevor er gespeichert wird.timer_thread_spawned: Ein einfaches boolesches Flag, um sicherzustellen, dass wir unseren "Timer"-Thread nur einmal einrichten.
-
impl Future for Delay:type Output = ();: Unser Verzögerungs-Future wird einfach abgeschlossen und liefert keinen aussagekräftigen Wert.poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>: Dies ist der Kern.if Instant::now() >= self.deadline: Bei jedem Poll prüfen wir, ob die Frist abgelaufen ist. Wenn ja, sind wirReadyund gebenPoll::Ready(())zurück.if !self.timer_thread_spawned: Dieser bedingte Block stellt sicher, dass wir den eigentlichen Timer (denthread::spawn-Teil) nur einmal einrichten.let mut waker_guard = self.waker_storage.lock().unwrap(); *waker_guard = Some(cx.waker().clone());: Wir erhalten eine Sperre für unserenwaker_storage, klonen denWakeraus dem aktuellenContextund speichern ihn. DieserWakerverweist zurück auf diese spezifische Aufgabe, die gerade abgefragt wird.thread::spawn(...): Wir starten einen normalen Rust-Thread. Dieser Thread wird für die verbleibende Dauersleep()en. Dies ist ein blockierendessleepaus der Perspektive dieses Hilfs-Threads, blockiert aber nicht den Executor-Thread, da er sich in einem separaten Betriebssystem-Thread befindet.- Innerhalb des gestarteten Threads, nach dem Schlafen, ruft er den gespeicherten
Wakerab und ruftwaker.wake()auf. Dieserwake()-Aufruf teilt der async-Laufzeit (Tokio in unseremmain) mit, dass die dieserWakerzugeordnete Aufgabe nun bereit ist, erneut gepollt zu werden. self.timer_thread_spawned = true;: Wir setzen das Flag auf true, um zu verhindern, dass mehrere Timer-Threads für dasselbeDelay-Future gestartet werden.
else { ... }: Wenn der Timer-Thread bereits gestartet wurde (d. h. dies ist ein nachfolgender Poll eines bereits ausstehenden Futures), müssen wir immer noch prüfen, ob sich derWakerimContextgeändert hat (!w.will_wake(cx.waker())). Wenn ja, aktualisieren wir unseren gespeichertenWaker. Dies ist wichtig, da Exekutoren manchmal Aufgaben verschieben oder neu planen können, was einen neuenWakererfordert, um die Aufgabe korrekt zu benachrichtigen.Poll::Pending: Wenn die Frist noch nicht abgelaufen ist und der Timer eingerichtet ist, wartet der Future immer noch. Wir gebenPoll::Pendingzurück. Der Executor wird das Polling dieses Futures stoppen, biswaker.wake()aufgerufen wird.
Wie es mit tokio::main und await funktioniert:
Delay::new(Duration::from_secs(2)): EineDelay-Instanz wird erstellt.delay_future.await: Hier geschieht die Magie.- Tokios Executor erhält
delay_future. - Erster Poll: Der Executor ruft
delay_future.poll(cx)auf.- Die Frist wird nicht eingehalten.
timer_thread_spawnedistfalse.- Der
Wakerauscxwird geklont und indelay_future.waker_storagegespeichert. - Ein neuer
thread::spawnwird erstellt. Dieser Thread beginnt, für 2 Sekunden zu schlafen. timer_thread_spawnedwird auftruegesetzt.pollgibtPoll::Pendingzurück.
- Aktion des Executors nach
Poll::Pending: Der Executor weiß nun, dassdelay_futurenicht bereit ist. Er legt diese Aufgabe beiseite und beginnt, andere bereite Aufgaben (falls vorhanden) abzufragen oder wartet aufwaker.wake()-Aufrufe. Wichtig ist, dass der Tokio-Laufzeit-Thread durch denthread::sleepunseresthread::spawnNICHT blockiert wird. - Nach 2 Sekunden:
thread::spawnschließt seinenthread::sleepab.- Er ruft den gespeicherten
Wakerab und ruftwaker.wake()auf.
- Er ruft den gespeicherten
- Aktion des Executors nach
waker.wake(): Der Executor empfängt das Wecksignal für die mitdelay_futureverbundene Aufgabe. Er plantdelay_futureerneut abzufragen. - Zweiter (oder späterer) Poll: Der Executor ruft
delay_future.poll(cx)erneut auf.- Nun ist
Instant::now() >= self.deadlinewahr. pollgibtPoll::Ready(())zurück.
- Nun ist
- Abschluss: Der Ausdruck
delay_future.awaitwird schließlich abgeschlossen, und die Funktionmainwird fortgesetzt.
- Tokios Executor erhält
Schlussfolgerung
Durch die Implementierung eines benutzerdefinierten Delay-Future haben wir ein praktisches Verständnis des asynchronen Polling-Mechanismus von Rust gewonnen. Wir haben gesehen, wie Future::poll wiederholt von einem Executor aufgerufen wird, wie Poll::Pending einen unvollständigen Zustand signalisiert und vor allem, wie der Waker als Brücke fungiert, die es einer wartenden Aufgabe ermöglicht, dem Executor zu signalisieren, die Abfrage fortzusetzen, wenn Fortschritte erzielt werden können. Dieser explizite Tanz zwischen dem Future und dem Executor über den Waker ist das Fundament der effizienten, nicht blockierenden asynchronen Programmierung von Rust und ermöglicht leistungsstarke und skalierbare Anwendungen ohne den Overhead blockierender Threads. Die Beherrschung benutzerdefinierter Future-Implementierungen ist eine fortgeschrittene Fähigkeit, die tiefere Einblicke in das leistungsstarke asynchrone Ökosystem von Rust ermöglicht.