Rustにおける非同期処理:Futures、Executor、とタスクスケジューリング
Daniel Hayes
Full-Stack Engineer · Leapcell

Futureの定義
Future
は、Rustの非同期プログラミングの核心です。以下はFuture
トレイトの定義です。
#[must_use = "futures do nothing unless you `.await` or poll them"] #[lang = "future_trait"] pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } #[must_use = "this `Poll` may be a `Pending` variant, which should be handled"] pub enum Poll<T> { Ready(T), Pending, }
Future
は関連型Output
と、Poll<Self::Output>
を返すpoll()
メソッドを持ちます。Poll
はReady
とPending
の2つのバリアントを持つenumです。poll()
メソッドを呼び出すことで、Future
はタスクが完了して切り替わるまで、完了に向けてさらに進行できます。
現在の
poll
呼び出しで、Future
が完了している場合、Poll::Ready(result)
を返します。これはFuture
の値が返されることを意味します。Future
がまだ完了していない場合、Poll::Pending()
を返します。この時点で、Future
は中断され、何らかのイベント(wake関数経由)によって起動されるのを待ちます。
Executor(実行スケジューラ)
エグゼキュータはFuture
のスケジューラです。オペレーティングシステムはスレッドのスケジューリングを担当しますが、ユーザースペースのコルーチン(Future
など)はスケジュールしません。したがって、並行処理にコルーチンを使用するプログラムは、スケジューリングを処理するためのエグゼキュータが必要です。
RustのFuture
は遅延評価です。つまり、ポーリングされたときにのみ実行されます。それらを実行する方法の1つは、.await
を使用して別のasync関数内で別のasync関数を呼び出すことですが、それはasync関数自体の問題を解決するだけです。最も外側のasync関数は、エグゼキュータによって駆動される必要があります。
Executor Runtime
RustはFuture
のようなコルーチンを提供しますが、言語レベルでエグゼキュータは提供しません。コルーチンを使用しない場合は、ランタイムを導入する必要はありません。必要な場合は、エコシステムから選択できるさまざまなエグゼキュータが提供されます。
Rustの一般的なエグゼキュータを4つ紹介します。
futures
: このライブラリには、シンプルなビルトインエグゼキュータが付属しています。tokio
: エグゼキュータを提供します。#[tokio::main]
を使用すると、Tokioのエグゼキュータが暗黙的に含まれます。async-std
: Tokioと同様のエグゼキュータを提供します。smol
:async-executor
を提供し、主にblock_on
を公開します。
Wake Notification Mechanism
エグゼキュータは、Future
のグループ(通常は最も外側のasync関数)を管理し、完了するまで継続的にポーリングしてそれらを進めます。最初に、エグゼキュータはFuture
を1回ポーリングします。その後は、積極的にポーリングすることはありません。poll
メソッドがPoll::Pending
を返した場合、Future
は、wake()
関数を介して何らかのイベントがトリガーされてウェイクアップされるまで中断されます。次に、Future
はエグゼキュータに積極的に通知し、エグゼキュータにポーリングを再開してタスクの実行を継続するように促します。このwakeしてからpollするサイクルは、Future
が完了するまで繰り返されます。
Waker
はwake()
メソッドを提供します。このメソッドは、関連するタスクを再開する準備ができていることをエグゼキュータに通知し、エグゼキュータが対応するFuture
を再度ポーリングできるようにします。
Context
はWaker
のラッパーです。poll
メソッドで使用されるContext
構造を見てみましょう。
pub struct Context<'a> { waker: &'a Waker, _marker: PhantomData<fn(&'a ()) -> &'a ()>, }
Waker
の定義と実装は非常に抽象的です。内部的には、仮想関数テーブル(vtable)を使用して、さまざまなwaker
の動作を可能にします。
pub struct RawWakerVTable { clone: unsafe fn(*const ()) -> RawWaker, wake: unsafe fn(*const ()), wake_by_ref: unsafe fn(*const ()), drop: unsafe fn(*const ()), }
Rust自体はasyncランタイムを提供しません。標準ライブラリに基本的なインターフェースを定義するだけで、ランタイムの動作はサードパーティのランタイムによって実装されます。したがって、標準ライブラリでは、インターフェースの定義といくつかの高レベルの実装のみが表示されます。たとえば、Waker
のwake()
メソッドは、vtableの対応する関数への呼び出しを委任するだけです。
impl Waker { /// Wake up the task associated with this `Waker`. #[inline] pub fn wake(self) { // The actual wakeup call is delegated through a virtual function call // to the implementation which is defined by the executor. let wake = self.waker.vtable.wake; let data = self.waker.data; // Don't call `drop` -- the waker will be consumed by `wake`. crate::mem::forget(self); // SAFETY: This is safe because `Waker::from_raw` is the only way // to initialize `wake` and `data` requiring the user to acknowledge // that the contract of `RawWaker` is upheld. unsafe { (wake)(data) }; } ... }
vtableの実際の実装は標準ライブラリにはありません。これは、futures
クレートにあるものなど、サードパーティのasyncランタイムによって提供されます。
タイマーの構築
タイマーの例を使って、Future
のスケジューリングメカニズムを理解しましょう。目標は次のとおりです。タイマーを作成するときに、特定の期間スリープする新しいスレッドが生成され、時間枠が経過すると、タイマーFuture
に信号を送ります。
注:これには、Waker
を構築する便利な方法を提供するfutures
クレートのArcWake
トレイトが必要です。Cargo.toml
を編集して、次の依存関係を追加します。
[dependencies] futures = "0.3"
タイマーFuture
の完全なコード:
// future_timer.rs use futures; use std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, }; pub struct TimerFuture { shared_state: Arc<Mutex<SharedState>>, } /// Futureとスリーピングスレッド間の共有状態 struct SharedState { /// タイマー(スリープ)が完了したかどうかを示します completed: bool, /// スリープが終了すると、スレッドはこの`waker`を使用して`TimerFuture`にタスクをウェイクアップするように通知できます waker: Option<Waker>, } impl Future for TimerFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // 共有状態をチェックして、タイマーが完了したかどうかを判断します let mut shared_state = self.shared_state.lock().unwrap(); if shared_state.completed { println!("future ready. execute poll to return."); Poll::Ready(()) } else { println!("future not ready, tell the future task how to wakeup to executor"); // 新しいスレッドがスリープが完了したらタスクをウェイクアップできるように、`waker`を設定します。 // `Future`を再度ポーリングできるようにします。 // ここでの`clone`はすべての`poll`で発生しますが、理想的には一度だけ発生する必要があります。 // 各回クローンする理由は、`TimerFuture`がエグゼキュータ内のタスク間を移動する可能性があるためです。 // 単一の`waker`インスタンスが変更され、間違ったタスクを指す可能性があるため、 // エグゼキュータが間違ったタスクを実行する可能性があります。 shared_state.waker = Some(cx.waker().clone()); Poll::Pending } } } impl TimerFuture { /// 指定された期間後に完了する新しい`TimerFuture`を作成します pub fn new(duration: Duration) -> Self { let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, })); // 新しいスレッドを生成します let thread_shared_state = shared_state.clone(); thread::spawn(move || { // タイマーをシミュレートするために、指定された期間スリープします thread::sleep(duration); let mut shared_state = thread_shared_state.lock().unwrap(); // タイマーが完了したこと、および対応する`Future`を再度ポーリングできることをエグゼキュータに通知します shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { println!("detect future is ready, wakeup the future task to executor."); waker.wake() } }); TimerFuture { shared_state } } } fn main() { // まだ独自のエグゼキュータを実装していないため、`futures`クレートのエグゼキュータを使用します futures::executor::block_on(TimerFuture::new(Duration::new(10, 0))); }
実行結果:
future not ready, tell the future task how to wakeup to executor detect future is ready, wakeup the future task to executor. future ready. execute poll to return.
上記のように、最初に10秒のタイマーは完了しておらず、Pending
状態です。この時点で、準備ができたときにタスクが自身をウェイクアップする方法をタスクに伝える必要があります。10秒後、タイマーが完了し、以前に設定されたWaker
を使用してFuture
タスクをウェイクアップして実行します。
エグゼキュータの構築
前のコードでは、独自スケジューラを実装していませんでした。futures
クレートによって提供されるエグゼキュータを使用しました。次に、カスタムエグゼキュータを自分で構築して、内部の仕組みを理解しましょう。ただし、実際のRust asyncプログラミングでは、通常はtokio
ライブラリを使用します。ここでは、asyncの仕組みをよりよく理解するために、学習目的で最初から構築しています。
キーコード:
// future_executor.rs use { futures::{ future::{BoxFuture, FutureExt}, task::{waker_ref, ArcWake}, }, std::{ future::Future, sync::mpsc::{sync_channel, Receiver, SyncSender}, sync::{Arc, Mutex}, task::Context, time::Duration, }, }; mod future_timer; // 以前に実装されたタイマーモジュールをインポートします use future_timer::TimerFuture; /// タスクエグゼキュータ:チャネルからタスクを受信し、実行する役割 struct Executor { ready_queue: Receiver<Arc<Task>>, } /// `Spawner`は、新しい`Future`を作成し、タスクチャネルに送信する役割があります #[derive(Clone)] struct Spawner { task_sender: SyncSender<Arc<Task>>, } /// 自身をスケジュールできる(自身をタスクチャネルに送信することにより)、実行されるのを待つ`Future` struct Task { /// 将来のある時点で完了する進行中の`Future` /// /// 技術的には、ここではすべてを単一のスレッドで実行しているため、`Mutex`は不要です。 /// しかし、Rustは`Future`がスレッド間で共有されていないことを認識するほど賢くありません。 /// そのため、コンパイラの安全なスレッド要件を満たすために`Mutex`を使用します。 /// /// プロダクショングレードのエグゼキュータは、オーバーヘッドが発生するため、ここでは`Mutex`を使用しません。 /// 代わりに、`UnsafeCell`を使用します。 future: Mutex<Option<BoxFuture<'static, ()>>>, /// このタスクがタスクキューに自分自身を再送信し、エグゼキュータが`poll`するのを待つことができます task_sender: SyncSender<Arc<Task>>, } fn new_executor_and_spawner() -> (Executor, Spawner) { // タスクチャネル(キューの長さ)のバッファリングされたタスクの最大数 // この実装は簡略化されています。実際のエグゼキュータはこれを異なる方法で処理します const MAX_QUEUED_TASKS: usize = 10_000; let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); (Executor { ready_queue }, Spawner { task_sender }) } impl Spawner { fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); println!("first dispatch the future task to executor."); self.task_sender.send(task).expect("too many tasks queued."); } } /// タスクをウェイクアップして実行するようにスケジュールする方法を定義するために、`ArcWake`を実装します impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { // ウェイクは、タスクを再びタスクチャネルに送信することによって実装されます。 // したがって、エグゼキュータは再び`poll`します。 let cloned = arc_self.clone(); arc_self .task_sender .send(cloned) .expect("too many tasks queued"); } } impl Executor { /// タスクを受信して実行することにより、実際に`Future`タスクを実行します fn run(&self) { let mut count = 0; while let Ok(task) = self.ready_queue.recv() { count += 1; println!("received task. {}", count); // Futureを取得します。完了していない場合(まだSome)、`poll`して完了を試みます let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { // タスク自体に基づいて`LocalWaker`を作成します let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); // `BoxFuture<T>`は`Pin<Box<dyn Future<Output = T> + Send + 'static>>`のエイリアスです // `as_mut`はそれを`Pin<&mut dyn Future + Send + 'static>`に変換します if future.as_mut().poll(context).is_pending() { println!("executor run the future task, but is not ready, create a future again."); // Futureはまだ完了していないため、元に戻して次のポーリングを待ちます *future_slot = Some(future); } else { println!("executor run the future task, is ready. the future task is done."); } } } } } fn main() { let (executor, spawner) = new_executor_and_spawner(); // TimerFutureをタスクでラップし、実行のためにスケジューラにディスパッチします spawner.spawn(async { println!("TimerFuture await"); // タイマーFutureを作成し、完了するのを待ちます TimerFuture::new(Duration::new(10, 0)).await; println!("TimerFuture Done"); }); // スポナーをドロップして、エグゼキュータがこれ以上のタスクが送信されないことを認識するようにします drop(spawner); // タスクキューが空になるまでエグゼキュータを実行します // タスクが実行されると、「howdy!」と出力され、2秒間一時停止してから、「done!」と出力されます executor.run(); }
実行結果:
first dispatch the future task to executor. received task. 1 TimerFuture await future not ready, tell the future task how to wakeup to executor executor run the future task, but is not ready, create a future again. detect future is ready, wakeup the future task to executor. received task. 2 future ready. execute poll to return. TimerFuture Done executor run the future task, is ready. the future task is done.
最初のスケジューリング試行では、タスクはまだ準備ができておらず、Pending
を返します。次に、準備ができたときにどのようにウェイクアップする必要があるかをタスクに通知します。その後、イベントの準備が完了すると、タスクは指示どおりにウェイクアップされ、実行のためにスケジュールされます。
非同期処理の流れ
Reactorパターンは、高性能なイベント駆動型システムを構築するために使用される古典的な設計パターンです。エグゼキュータとリアクターはこのパターンのコンポーネントです。Reactorパターンは、主に3つの部分で構成されています。
- タスク:実行される作業の単位。タスクは一時停止してエグゼキュータに制御を譲り、後で再スケジュールされるのを待つことができます。
- エグゼキュータ:実行準備完了のタスク(準備完了キュー)とブロックされているタスク(待機キュー)を維持するスケジューラ。
- リアクター:イベントキューを維持します。イベントが発生すると、特定タスクを起動して実行するようにエグゼキュータに通知します。
エグゼキュータはタスクの実行をスケジュールします。タスクが続行できないが、まだ完了していない場合、中断され、適切なウェイクアップ条件が登録されます。後で、リアクターがウェイクアップ条件を満たすイベントを受信すると、中断されたタスクがウェイクアップします。エグゼキュータは、このタスクのポーリングを再開できます。このサイクルは、タスクが完了するまで繰り返されます。
Future
を介したRustの非同期処理は、Reactorパターンの典型的な実装です。
tokio
を例にとると:async/await
は構文レベルのサポートを提供し、Future
は非同期タスクを表すデータ構造です。.await
が呼び出されると、エグゼキュータはタスクをスケジュールして実行します。
Tokioのスケジューラは複数のスレッドで実行されます。各スレッドは、独自の_準備完了キュー_からタスクを実行します。スレッドのキューが空の場合、他のスレッドのキューからタスクを盗むことができます(ワークスティーリングと呼ばれる戦略)。 タスクがそれ以上進行できなくなり、
Poll::Pending
を返すと、スケジューラはそれを中断し、Waker
を使用して適切なウェイクアップ条件を設定します。リアクターは、OSのasync I/Oメカニズム(epoll
、kqueue
、IOCP
など)を使用してI/Oイベントを監視します。関連イベントがトリガーされると、リアクターはWaker::wake()
を呼び出し、中断されたFuture
をウェイクアップします。Future
は_準備完了キュー_に戻され、実行を待機します。
まとめ
Future
は、Rustの非同期プログラミングモデルのコアとなる抽象化であり、将来のある時点で完了する操作を表しています。RustのFuture
は遅延評価です。つまり、それらを駆動するにはエグゼキュータが必要です。この実行はポーリングを介して実装されます。
- 現在のポーリングサイクルで、
Future
が完了している場合、Poll::Ready(result)
を返します。 Future
がまだ完了していない場合、Poll::Pending()
を返します。この時点で、Future
は中断され、Waker
を介してウェイクアップする外部イベントを待機します。
Waker
は、再開する必要があるタスクをエグゼキュータに通知するためのwake()
メソッドを提供します。wake()
が呼び出されると、エグゼキュータはWaker
に関連付けられたタスクが進行する準備ができていることを認識し、Future
を再度ポーリングします。このwake→poll→中断サイクルは、Future
が最終的に完了するまで続きます。
各非同期タスクは、通常、3つの段階を経ます。
- ポーリングフェーズ:エグゼキュータは
Future
でポーリングを開始します。さらに進行できないポイント(Poll::Pending
)に達すると、タスクは中断され、待機フェーズに入ります。 - 待機フェーズ:イベントソース(通常はリアクターと呼ばれます)は、イベントを待機するために
Waker
を登録します。イベントが発生すると、Waker
をトリガーして関連するFuture
をウェイクアップし、ウェイクアップフェーズに移行します。 - ウェイクアップフェーズ:イベントが発生すると、対応する
Future
はWaker
によってウェイクアップされます。エグゼキュータは、Future
を再度ポーリングするようにスケジュールします。タスクは、完了するか、別のPending
ポイントに到達するまで進行します。このサイクルは、タスクが完全に完了するまで繰り返されます。
Rustプロジェクトをホストするための最適な選択肢であるLeapcellはこちらです。
Leapcellは、Webホスティング、非同期タスク、およびRedisのための次世代サーバーレスプラットフォームです:
多言語サポート
- Node.js、Python、Go、または Rust で開発できます。
無制限のプロジェクトを無料でデプロイ
- 使用量に対してのみ支払い — リクエストも課金もありません。
比類のないコスト効率
- アイドル課金なしの従量課金制。
- 例:25ドルで、平均応答時間60msで694万リクエストをサポートします。
合理化された開発者エクスペリエンス
- 簡単なセットアップのための直感的なUI。
- 完全に自動化されたCI/CDパイプラインとGitOps統合。
- 実用的な洞察を得るためのリアルタイムのメトリックとロギング。
容易な拡張性と高いパフォーマンス
- 高い同時実行性を容易に処理するための自動スケーリング。
- 運用のオーバーヘッドはゼロ — 構築に集中するだけです。
ドキュメントで詳細をご覧ください!
X でフォローしてください: @LeapcellHQ