Rustでの恐れのない並行性:眠りを失わずにスレッドをテイミング
James Reed
Infrastructure Engineer · Leapcell

Concurrent programs とは、複数のタスクを実行する(またはそうであるかのように見える)プログラムであり、2つ以上のタスクが重複する時間範囲内で交互に実行されることを意味します。これらのタスクは、最小の処理単位であるスレッドによって実行されます。舞台裏では、これは真のマルチタスク(並列処理)ではなく、人間には知覚できない速度でスレッド間のコンテキストを高速に切り替えるものです。多くの最新アプリケーションはこの錯覚に依存しています。例えば、サーバーは他のリクエストを待機しながら、あるリクエストを処理することができます。スレッドがデータを共有する場合、多くの問題が発生する可能性があり、最も一般的なのは 競合状態 と デッドロック です。
Rustの 所有権システム と 型安全性システム は、メモリ安全性と並行性の問題を解決するための強力なツールです。所有権と型チェックを通じて、ほとんどのエラーは実行時ではなくコンパイル時に検出されます。これにより、開発者はデプロイ後ではなく、開発中にコードを修正することができます。コードがコンパイルされると、他の言語でよく見られる追跡困難なバグなしに、マルチスレッド環境で安全に動作することを信頼できます。これがRustが fearless concurrency(恐れのない並行性) と呼ぶものです。
マルチスレッドモデル
マルチスレッドプログラミングのリスク
ほとんどの最新のオペレーティングシステムでは、実行中のプログラムコードは、オペレーティングシステムが管理する プロセス 内で実行されます。プログラム内には、スレッド と呼ばれる、独立して実行される複数のコンポーネントも存在します。
プログラムの計算を複数のスレッドに分割すると、プログラムが複数のタスクを同時に処理できるため、パフォーマンスが向上します。ただし、複雑さも増します。スレッドは同時に実行されるため、異なるスレッドのコードが実行される順序は保証されません。これにより、次のような問題が発生する可能性があります。
- 競合状態:複数のスレッドが矛盾した順序でデータやリソースにアクセスする状態
- デッドロック:2つのスレッドが互いに相手が保持しているリソースの解放を待機し、それ以上の進行を妨げる状態
- 特定の状況下でのみ発生し、再現または一貫して修正することが困難なバグ
プログラミング言語は、スレッドを実装するさまざまな方法を持っています。多くのオペレーティングシステムは、新しいスレッドを作成するためのAPIを提供しています。言語がOS APIを使用してスレッドを作成する場合、これは多くの場合 1:1モデル と呼ばれ、1つのOSスレッドが1つの言語レベルのスレッドに対応します。
Rustの標準ライブラリは、1:1スレッドモデル のみを提供します。
spawn
を使用した新しいスレッドの作成
use std::thread; use std::time::Duration; fn main() { let thread = thread::spawn(|| { for i in 1..10 { println!("this is thread {}", i); thread::sleep(Duration::from_millis(1)); } }); for k in 1..5 { println!("this is main {}", k); thread::sleep(Duration::from_millis(1)); } }
出力:
this is main 1
this is thread 1
this is main 2
this is thread 2
this is main 3
this is thread 3
this is main 4
this is thread 4
this is thread 5
メインスレッドが5回のループ反復を終えて終了すると、新しく作成されたスレッドは、10回の反復用に設計されているにもかかわらず、5回の反復しか実行できずに終了することがわかります。メインスレッドが終了すると、新しいスレッドも、完了したかどうかに関係なく終了します。
新しいスレッドがメインスレッドの続行前に終了するようにしたい場合は、JoinHandle
を使用できます。
use std::thread; use std::time::Duration; fn main() { let handler = thread::spawn(|| { for i in 1..10 { println!("this is thread {}", i); thread::sleep(Duration::from_millis(1)); } }); for k in 1..5 { println!("this is main {}", k); thread::sleep(Duration::from_millis(1)); } handler.join().unwrap(); // 新しいスレッドが終了するまでメインスレッドをブロックします }
出力:
this is main 1
this is thread 1
this is main 2
this is thread 2
this is main 3
this is thread 3
this is main 4
this is thread 4
this is thread 5
this is thread 6
this is thread 7
this is thread 8
this is thread 9
thread::spawn
の戻り値の型はJoinHandle
です。JoinHandle
は所有されている値であり、そのjoin
メソッドを呼び出すと、スレッドが終了するまで待機します。
ハンドルでjoin
を呼び出すと、ハンドルで表されるスレッドが終了するまで、現在のスレッドがブロックされます。スレッドをブロックするとは、それ以上の作業を行ったり、終了したりするのを防ぐことを意味します。
スレッドと move
クロージャ
move
クロージャを使用すると、メインスレッドからクロージャに変数の所有権を転送できます。
use std::thread; fn main() { let v = vec![2, 4, 5]; // `move`は`v`の所有権をクロージャに転送します let thread = thread::spawn(move || { println!("v is {:?}", v); }); }
出力:
v is [2, 4, 5]
Rustは、変数v
の所有権を新しいスレッドに移動します。これにより、新しいスレッド内で変数を安全に使用できるようになり、メインスレッドはv
を使用(たとえば、ドロップ)できなくなります。
move
キーワードを省略すると、コンパイラはエラーを発生させます。
$ cargo run error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function --> src/main.rs:6:32 | 6 | let handle = thread::spawn(|| { | ^^ may outlive borrowed value `v` 7 | println!("Here's a vector: {:?}", v); | - `v` is borrowed here
Rustの所有権ルールは、メモリの安全性を確保するのに役立ちました!
メッセージパッシング
Rustでは、メッセージパッシングの並行処理のための主要なツールの1つはチャネルです。チャネルは標準ライブラリによって提供される概念です。チャネルは、水のチャネル(川または小川)のように考えることができます。ゴム製のアヒルやボートのようなものをチャネルに入れると、下流の受信機に流れ込みます。
チャネルには、送信機と受信機の2つの部分があります。送信機または受信機のいずれかがドロップされると、チャネルは閉じられたと見なされます。
チャネルは、標準ライブラリのstd::sync::mpsc
を介して実装されます。これは、multiple producer, single consumer(複数プロデューサー、単一コンシューマー) の略です。
注:リーダーとライターの数に基づいて、チャネルは次のように分類できます。
- SPSC – Single Producer, Single Consumer(アトミックのみを使用できます)
- SPMC – Single Producer, Multiple Consumers(コンシューマー側でのロックが必要)
- MPSC – Multiple Producers, Single Consumer(プロデューサー側でのロックが必要)
- MPMC – Multiple Producers, Multiple Consumers
スレッド間でのメッセージの受け渡し
use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); // `tx`をクロージャに移動して、新しいスレッドがそれを所有するようにします thread::spawn(move || { tx.send("hello").unwrap(); }); // `recv()`は、値が受信されるまでメインスレッドをブロックします let msg = rx.recv().unwrap(); println!("message is {}", msg); }
出力:
message is hello
チャネルの受信側には、recv
とtry_recv
という2つの便利なメソッドがあります。
ここでは、_receive_の略であるrecv
を使用しました。これは、値が受信されるまでメインスレッドをブロックします。値が送信されると、recv
はResult<T, E>
でそれを返します。送信機が閉じている場合、それ以上値が到着しないことを示すエラーが返されます。
try_recv
はブロックしません。代わりに、Result<T, E>
ですぐに戻ります。データが利用可能な場合はOk
、そうでない場合はErr
です。
新しいスレッドの実行が完了していない場合、try_recv
を使用すると、ランタイムエラーが発生する可能性があります。
use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { tx.send("hello").unwrap(); }); // `try_recv`はすぐに戻るため、メッセージを時間内に受信できない場合があります let msg = rx.try_recv().unwrap(); println!("message is {}", msg); }
エラー:
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Empty', ...
複数の値を送信し、受信側の待機を監視する
use std::thread; use std::sync::mpsc; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); // `rx`の`for`ループは、イテレーターとして受信値が来るのを暗黙的に待ちます for received in rx { println!("Got: {}", received); } }
サンプル出力(行間に1秒のポーズがあります):
Got: hi
Got: from
Got: the
Got: thread
送信機を複製して複数のプロデューサーを作成する
use std::thread; use std::sync::mpsc; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); // 送信機`tx`を複製して、2番目のプロデューサーを作成します let tx1 = tx.clone(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx1.send(val).unwrap(); // 複製された送信機を使用します thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); // 元の送信機を使用します thread::sleep(Duration::from_secs(1)); } }); // txとtx1の両方が同じ受信機rxに値を送信します for received in rx { println!("Got: {}", received); } }
サンプル出力(スケジューリングによりシステムによって異なります):
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
共有状態
共有状態またはデータとは、複数のスレッドが同じメモリ位置に同時にアクセスすることを意味します。 Rustは、**mutex(相互排他ロック)**を使用して、共有メモリの並行処理プリミティブを実装します。
Mutexを使用すると、一度に1つのスレッドのみがデータにアクセスできます
Mutexは**相互排他(mutual exclusion)**の略で、特定のデータには一度に1つのスレッドのみがアクセスできることを意味します。 mutex内のデータにアクセスするには、スレッドは最初にロックを取得する必要があります。ロックは、現在排他的アクセス権を持っている人を追跡するデータ構造です。
標準ライブラリのstd::sync::Mutex
を使用します。
use std::sync::Mutex; fn main() { let m = Mutex::new(5); { let mut num = m.lock().unwrap(); *num = 10; // Mutex内の値を変更します println!("num is {}", num); } println!("m is {:?}", m); }
出力:
num is 10
m is Mutex { data: 10 }
lock
メソッドを使用して、mutex内のデータへのアクセスを取得します。この呼び出しは、ロックが取得されるまで現在のスレッドをブロックします。
Mutex
はスマートポインタです。より具体的には、lock
はMutexGuard
を返します。これは、基になるデータを指すDeref
を実装するスマートポインタです。また、Drop
も実装しているため、MutexGuard
がスコープ外になると、ロックは自動的に解放されます。
スレッド間でのMutexの共有
複数のスレッド間でデータを共有する場合、複数の所有者が必要な場合は、Arc
スマートポインタを使用してMutex
をラップします。 Arc
はスレッドセーフです。 Rc
はスレッドセーフではなく、マルチスレッドコンテキストで安全に使用することはできません。
Arc
を使用してMutex
をラップし、スレッド間で共有所有権を許可する例を次に示します。
use std::sync::{Mutex, Arc}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter = Arc::clone(&counter); // スレッドに移動する前にArcを複製します let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
出力:
Result: 10
要約すると:
Rc <T>
+RefCell <T>
は、通常、シングルスレッドの内部可変性に使用されますArc <T>
+Mutex <T>
は、マルチスレッドの内部可変性に使用されます
注:Mutex
は依然としてデッドロックを引き起こす可能性があります。これは、2つの操作がそれぞれ2つのリソースをロックする必要があり、2つのスレッドがそれぞれ1つのロックを保持し、もう一方を待機している場合(循環待機が発生する)に発生する可能性があります。
Send
とSync
に基づくスレッド安全性
Rustでは、並行処理関連のツールは、言語自体ではなく、標準ライブラリの一部です。ただし、言語に組み込まれている2つの並行処理概念があります。それは、std::marker
のSend
とSync
トレイトです。
Send
とSync
の目的
Send
とSync
は、Rustでの安全な並行処理の中核です。厳密に言うと、これらはマーカートレイト(メソッドを定義しないトレイト)であり、並行処理の動作を型にマークするために使用されます。
Send
を実装する型は、その所有権をスレッド間で安全に転送できます。- **
Sync
**を実装する型は、参照を介してスレッド間で共有できます。
これから、&T
がSend
の場合、T
はSync
であると推測できます。
Send
とSync
を実装する型
Rustでは、ほぼすべての型がデフォルトでSend
とSync
を実装しています。これは、複合型(構造体など)の場合、すべてのメンバーがSend
またはSync
である場合、複合型はそれらのトレイトを自動的に継承することを意味します。
ただし、1つのメンバーでもSend
またはSync
でない場合、型全体がそうではありません。
要約すると:
Send
を実装する型は、スレッド間で所有権を安全に転送できます。Sync
を実装する型は、スレッド間で安全に共有できます(参照による)。- Rustの大部分の型は、
Send
とSync
の両方です。
これらのトレイトを実装しない一般的な型は次のとおりです。
- 生ポインタ
- Cell、RefCell
- Rc
独自の型に対してSend
とSync
を 手動で実装することは可能ですが:
unsafe
コードを使用する必要があります- スレッドの安全性を 手動で確保する必要があります
- これは めったに必要ありません。細心の注意を払って実行する必要があります
注:
Cell
とRefCell
は、それらのコア実装(UnsafeCell
)がSync
ではないため、Sync
ではありません。Rc
は、その内部参照カウンタがスレッドセーフではないため、Send
でもSync
でもありません。- 生ポインタは、安全性の保証を提供しないため、どちらのトレイトも実装しません。
まとめ
Rustは、async/awaitとマルチスレッドの両方の並行処理モデルを提供します。マルチスレッドモデルを効果的に使用するには、次のものを含むRustのスレッドの基礎を理解する必要があります。
- スレッドの作成
- スレッドの同期
- スレッドの安全性
Rustは以下をサポートしています。
- メッセージパッシングの並行処理。ここでは、
channel
を使用してスレッド間でデータを送信します - 共有状態の並行処理。ここでは、
Mutex
とArc
を使用して、スレッド間でデータを共有し、安全に変更します
型システムとボローチェッカーにより、これらのパターンにはデータ競合やダングリング参照がないことが保証されます。
コードが コンパイルされると、他の言語で見られるようなとらえどころのない、デバッグが困難なバグなしに、マルチスレッド環境で正しく 実行されることを確信できます。
Send
およびSync
トレイトは、スレッド間でのデータの安全な転送または共有のための保証を提供します。
要約すると:
- スレッドモデル:マルチスレッドプログラムは、競合状態、デッドロック、および再現が困難なバグを処理する必要があります。
- メッセージパッシング:チャネルを使用してスレッド間でデータを送信します。
- 共有状態:
Mutex
+Arc
により、複数のスレッドが同じデータにアクセスして変更できます。 - スレッド安全性:
Send
およびSync
トレイトは、マルチスレッドコンテキストでのデータ転送と共有の安全性を保証します。
Rustプロジェクトをホストするのに最適なLeapcellはこちらです。
Leapcellは、Webホスティング、非同期タスク、Redis向けの次世代サーバーレスプラットフォームです。
マルチ言語サポート
- Node.js、Python、Go、またはRustで開発します。
無制限のプロジェクトを無料でデプロイ
- 使用量に対してのみ支払い—リクエストなし、料金なし。
比類のないコスト効率
- アイドル料金なしの従量課金制。
- 例:25ドルで平均応答時間60ミリ秒で694万リクエストをサポートします。
合理化された開発者エクスペリエンス
- 簡単なセットアップのための直感的なUI。
- 完全に自動化されたCI / CDパイプラインとGitOps統合。
- 実用的な洞察のためのリアルタイムメトリックとロギング。
簡単なスケーラビリティと高性能
- 高い同時実行性を簡単に処理するための自動スケーリング。
- 運用のオーバーヘッドはゼロ—構築に集中するだけです。
詳細については、ドキュメントをご覧ください。
Xでフォローしてください:@LeapcellHQ