Rustにおける非同期プログラミング: Stream Traitとそのデザイン
Ethan Miller
Product Engineer · Leapcell

Stream
トレイトは、Future
トレイトと似ています。Future
が単一のアイテムの状態変化を表すのに対し、Stream
は、標準ライブラリのIterator
トレイトと同様に、完了するまでに複数の値を生成できます。簡単に言うと、Stream
は一連のFutures
で構成されており、Stream
が完了するまで、各Future
の結果を読み取ることができます。
Streamの定義
Future
は、非同期プログラミングにおける最も基本的な概念です。Future
が1回限りの非同期値を表す場合、Stream
は一連の非同期値を表します。Future
は1ですが、Stream
は0、1、またはNです。Stream
のシグネチャは次のとおりです。
pub trait Stream { type Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; }
Stream
の概念は、同期プリミティブのIterator
に対応します。シグネチャがいかに似ているか思い出してください!
pub trait Iterator { type Item; fn next(&mut self) -> Option<Self::Item>; }
Streamは連続的なデータソースを抽象化するために使用されますが、終了することもあります(poll
がNone
を返す場合)
Stream
の一般的な例は、futures
クレートのメッセージチャネルのコンシューマーReceiver
です。メッセージがSend
側から送信されるたびに、レシーバーはSome(val)
値を取得します。Send
側が閉じられ(ドロップされ)、チャネルにメッセージがなくなった場合、None
を受信します。
use futures::channel::mpsc; use futures::{executor::block_on, SinkExt, StreamExt}; async fn send_recv() { const BUFFER_SIZE: usize = 10; let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE); println!("tx: Send 1, 2"); tx.send(1).await.unwrap(); tx.send(2).await.unwrap(); drop(tx); // `StreamExt::next`は`Iterator::next`に似ていますが、値を返す代わりに、 // `Future<Output = Option<T>>`を返すため、実際の値を取得するには`.await`が必要です assert_eq!(Some(1), rx.next().await); assert_eq!(Some(2), rx.next().await); assert_eq!(None, rx.next().await); } fn main() { block_on(send_recv()); }
IteratorとStreamの違い
Iterator
では、next()
メソッドを繰り返し呼び出して、None
を返すまで新しい値を取得できます。Iterator
はブロッキングです。next()
の各呼び出しは、結果が得られるまでCPUを占有します。対照的に、非同期のStream
はノンブロッキングであり、待機中にCPUを解放します。Stream
のpoll_next()
メソッドは、Future
のpoll()
メソッドと非常によく似ており、その機能はIterator
のnext()
メソッドに似ています。ただし、poll_next()
を直接呼び出すのは、Poll
状態を手動で処理する必要があるため、不便です。そのため、RustはStreamExt
を提供しています。これはStream
の拡張トレイトであり、Next
構造体によって実装されたFuture
を返すnext()
メソッドを提供します。これにより、stream.next().await
を使用して値を直接反復処理できます。
注:
StreamExt
は_Stream Extension_の略です。Rustでは、最小限のトレイト定義(Stream
など)を1つのファイルに保持し、追加のAPI(StreamExt
など)を別の関連ファイルに配置するのが一般的な方法です。
注:
Future
とは異なり、Stream
トレイトはまだRustのコアライブラリ(std::core
)にありません。これはfutures-util
クレートに存在し、StreamExtensions
も標準ライブラリの一部ではありません。これは、異なるライブラリが競合するインポートを提供する可能性があることを意味します。たとえば、Tokioはfutures-util
とは別に、独自のStreamExt
を提供します。可能であれば、futures-util
を使用することをお勧めします。これは、async/awaitで最も一般的に使用されるクレートです。
StreamExt
のnext()
メソッドとNext
構造体の実装:
pub trait StreamExt: Stream { fn next(&mut self) -> Next<'_, Self> where Self: Unpin { assert_future::<Option<Self::Item>, _>(Next::new(self)) } } // `next`は`Next`構造体を返します pub struct Next<'a, St: ?Sized> { stream: &'a mut St, } // StreamがUnpinの場合、NextもUnpinです impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {} impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { pub(super) fn new(stream: &'a mut St) -> Self { Self { stream } } } // NextはFutureを実装し、各poll()は基本的にpoll_next()を介してストリームからポーリングします impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> { type Output = Option<St::Item>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { self.stream.poll_next_unpin(cx) } }
Streamの作成
futures
ライブラリは、基本的なStream
を作成するためのいくつかの便利なメソッドを提供します。
empty()
:空のStream
を作成しますonce()
:単一の値を含むStream
を作成しますpending()
:値を生成せず、常にPoll::Pending
を返すStream
を作成しますrepeat()
:同じ値を繰り返し生成するStream
を作成しますrepeat_with()
:クロージャを介して遅延的に値を生成するStream
を作成しますpoll_fn()
:Poll
を返すクロージャからStream
を作成しますunfold()
:初期状態とFuture
を返すクロージャからStream
を作成します
use futures::prelude::*; #[tokio::main] async fn main() { let mut st = stream::iter(1..10) .filter(|x| future::ready(x % 2 == 0)) .map(|x| x * x); // ストリームを反復処理します while let Some(x) = st.next().await { println!("Got item: {}", x); } }
上記のコードでは、stream::iter
がStream
を生成し、それがfilter
およびmap
操作を介して渡されます。最後に、ストリームが反復処理され、結果のデータが出力されます。
async/awaitに関心がなく、ストリームの動作のみに関心がある場合は、Stream::iter
がテストに非常に役立ちます。もう1つの興味深いメソッドはrepeat_with
です。これにより、必要に応じて値を遅延的に生成するためにクロージャを渡すことができます。例:
use futures::stream::{self, StreamExt}; // 2の0乗から3乗まで: async fn stream_repeat_with() { let mut curr = 1; let mut pow2 = futures::stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp }); assert_eq!(Some(1), pow2.next().await); assert_eq!(Some(2), pow2.next().await); assert_eq!(Some(4), pow2.next().await); assert_eq!(Some(8), pow2.next().await); }
Streamの実装
独自のStream
を作成するには、次の2つのステップが必要です。
- まず、ストリームの状態を保持する
struct
を定義します - 次に、その
struct
にStream
トレイトを実装します
1から5までカウントするCounter
というストリームを作成しましょう。
#![feature(async_stream)] // まず、struct: /// 1から5までカウントするストリーム struct Counter { count: usize, } // カウンターを1から開始するため、ヘルパーとして`new()`メソッドを追加しましょう。 // これは厳密には必要ありませんが、便利です。 // `count`を0から開始することに注意してください。理由は`poll_next()`の実装で明らかになります。 impl Counter { fn new() -> Counter { Counter { count: 0 } } } // 次に、`Counter`に`Stream`を実装します: impl Stream for Counter { // カウントには`usize`を使用します type Item = usize; // `poll_next()`は唯一の必須メソッドです fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // カウンターをインクリメントします。これが0から開始した理由です。 self.count += 1; // カウントが終了したかどうかを確認します。 if self.count < 6 { Poll::Ready(Some(self.count)) } else { Poll::Ready(None) } } }
Streamトレイト
Rustには、Stream
、TryStream
、FusedStream
など、ストリームに関連するいくつかのトレイトがあります。
-
Stream
はIterator
と非常によく似ています。ただし、None
を返すと、ストリームが使い果たされ、それ以上ポーリングしないように指示されます。None
を返した後にストリームをポーリングし続けると、未定義の動作が発生し、予期しない結果を引き起こす可能性があります。 -
TryStream
は、Result<value, error>
アイテムを生成するストリームの特殊化されたトレイトです。TryStream
は、内部のResult
を照合および変換するための関数を提供します。これは、Result
アイテムを生成するストリーム用に設計されたAPIと考えることができ、エラー処理ケースの操作がより便利になります。 -
FusedStream
は通常のストリームに似ていますが、ストリームがNone
を返した後に本当に使い果たされたかどうか、または再び安全にポーリングできるかどうかをユーザーが知ることができる機能を追加します。たとえば、循環バッファによってバックアップされたストリームを作成する場合、ストリームは最初の反復でNone
を返す可能性がありますが、FusedStream
を使用すると、後で再びポーリングしてバッファに対する新しいラウンドの反復を再開しても安全です。
反復処理と並行処理
Iterator
トレイトと同様に、Stream
も反復処理をサポートしています。たとえば、map
、filter
、fold
、for_each
、skip
などのメソッド、およびそれらのエラー対応の対応物:try_map
、try_filter
、try_fold
、try_for_each
などを使用できます。
ただし、Iterator
とは異なり、for
ループを直接使用してStream
を反復処理することはできません。代わりに、while let
やloop
のような命令型スタイルのループを使用して、next
またはtry_next
を明示的に繰り返し呼び出すことができます。たとえば、次のいずれかの方法でストリームから読み取ることができます。
// 反復パターン1 while let Some(value) = s.next().await {} // 反復パターン2 loop { match s.next().await { Some(value) => {} None => break; } }
ストリーム内の値の合計を計算する例:
use futures_util::{pin_mut, Stream, stream, StreamExt}; async fn sum(stream: impl Stream<Item=usize>) -> usize { // 反復処理の前にストリームをピン留めすることを忘れないでください pin_mut!(stream); let mut sum: usize = 0; // ストリームを反復処理します while let Some(item) = stream.next().await { sum = sum + item; } sum }
一度に1つの値を処理する場合、非同期プログラミングの目的である並行処理の利点を逃す可能性があります。Stream
から複数の値を並行して処理するには、for_each_concurrent
およびtry_for_each_concurrent
を使用できます。
use std::{pin::Pin, io}; use futures_util::{Stream, TryStreamExt}; async fn jump_around(stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<(), io::Error> { // `try_for_each_concurrent`を使用します stream.try_for_each_concurrent(100, |num| async move { jump_n_times(num).await?; report_n_jumps(num).await?; Ok(()) }).await?; Ok(()) } async fn jump_n_times(num: i32) -> Result<(), io::Error> { println!("jump_n_times :{}", num + 1); Ok(()) } async fn report_n_jumps(num: i32) -> Result<(), io::Error> { println!("report_n_jumps : {}", num); Ok(()) }
まとめ
Stream
はFuture
に似ていますが、Future
が単一のアイテムの状態変化を表すのに対し、Stream
は完了するまでに複数の値を生成できるIterator
のように動作します。簡単に言うと、Stream
は一連のFutures
で構成されており、完了するまでStream
から各Future
の結果を取得できます。これにより、非同期イテレータになります。
Stream
のpoll_next
関数は、次の3つの可能な値のいずれかを返すことができます。
Poll::Pending
:次の値がまだ準備できておらず、まだ待つ必要があることを示します。Poll::Ready(Some(val))
:値が準備できており、正常に返されたことを示します。poll_next
を再度呼び出して、次の値を取得できます。Poll::Ready(None)
:ストリームが終了し、poll_next
を呼び出す必要がなくなったことを示します。
RustプロジェクトのホスティングにはLeapcellが最適です。
Leapcellは、Webホスティング、非同期タスク、Redis向けの次世代サーバーレスプラットフォームです。
多言語サポート
- Node.js、Python、Go、またはRustで開発できます。
無制限のプロジェクトを無料でデプロイ
- 使用量に応じて料金をお支払いください。リクエストや料金はかかりません。
圧倒的なコスト効率
- アイドル料金なしの従量課金制。
- 例:25ドルで平均応答時間60msで694万リクエストをサポートします。
合理化された開発者エクスペリエンス
- 簡単なセットアップのための直感的なUI。
- 完全に自動化されたCI/CDパイプラインとGitOps統合。
- 実用的な洞察のためのリアルタイムメトリックとロギング。
簡単なスケーラビリティと高性能
- 高い同時実行性を容易に処理するための自動スケーリング。
- 運用上のオーバーヘッドはゼロです。構築に集中してください。
詳細については、ドキュメントをご覧ください。
Xでフォローしてください:@LeapcellHQ