RustにおけるArc、Mutex、チャンネルを使った並行処理のマスター
Wenhao Wang
Dev Intern · Leapcell

はじめに
高性能で応答性の高いアプリケーションを追求する上で、並行処理を通じて複数のCPUコアを活用することは、単なる利点ではなく、必要不可欠なものとなっています。しかし、並行処理はしばしば大きな課題を伴います:共有状態と、安全な並列実行ユニット間の通信の管理です。多くの言語における従来ののアプローチは、データ競合、デッドロック、メモリ破損といった悪名高いバグにつながる可能性があり、並行プログラミングを困難な作業にしています。Rustは、その強力な所有権と型システムにより、並行処理への新鮮で堅牢なアプローチを提供します。実行時チェックやプログラマのエラーを起こしやすい複雑なロック機構に頼るのではなく、Rustはコンパイル時に安全性を強制し、これらの一般的な落とし穴を排除することを目指しています。この記事では、Rustが並行プログラミングのために提供するコアツール—共有所有権のためのArc、ミュータブルな共有状態のためのMutex、そして安全な通信のためのチャンネル—を探り、それらの正しい使い方と、それらがどのように協力して信頼性の高い効率的な並行アプリケーションを可能にするかを実証します。
Rustのプリミティブによる安全な並行処理
Rustの並行処理に関する哲学は、「恐れを知らぬ並行処理」として要約されることがよくあります。これは単なるマーケティングスローガンではありません。その設計原則の直接的な結果です。具体的に掘り下げる前に、Rustの並行処理モデルの基礎となる基本的な概念を理解しましょう。
スレッドとは? 基本的なレベルでは、スレッドはプログラム内の実行シーケンスです。単一のプログラムは複数のスレッドを並行して実行できます。各スレッドは独自のコールスタックを持ちますが、同じプロセス内のスレッドは同じメモリ空間を共有します。この共有メモリこそが危険な場所であり、複数のスレッドが同時に同じデータへの読み書きを試みると、予期しない動作につながる可能性があります。
Arc:アトミック参照カウント
複数のスレッドが同じデータの一部を所有し、アクセスする必要がある場合、Arc(Atomic Reference Count、アトミック参照カウント)が救世主となります。これはRc(Reference Count、参照カウント)の、スレッドセーフなバージョンです。Rcと同様に、ArcはTへの複数のポインタの作成を許可し、Tデータはそれへの最後のArcポインタがスコープを外れたときにのみ解放されます。「アトミック」という言葉が重要です。これは、参照カウントがアトミック操作を使用して更新されることを意味し、マルチスレッドコンテキストでは安全であることが保証されています。アトミック操作なしでは、共有参照カウントのインクリメントまたはデクリメントは、カウントが不正確になる競合状態につながる可能性があり、メモリリークや早期解放につながる可能性があります。
複数のワーカー・スレッドが共有構成オブジェクトからデータを処理する必要があるシナリオを考えてみましょう。Arcを使用することで、各スレッドはその構成の独自の「所有権」を持つことができます。
use std::sync::Arc; use std::thread; struct Config { processing_units: usize, timeout_seconds: u64, } fn main() { let app_config = Arc::new(Config { processing_units: 4, timeout_seconds: 30, }); let mut handles = vec![]; for i in 0..app_config.processing_units { // Arcをクローンして、各スレッドに新しい「所有者」を作成します let thread_config = Arc::clone(&app_config); handles.push(thread::spawn(move || { println!("Thread {} using config: units={}, timeout={}", i, thread_config.processing_units, thread_config.timeout_seconds); // 設定を使用する作業をシミュレートします thread::sleep(std::time::Duration::from_millis(500)); })); } for handle in handles { handle.join().unwrap(); } println!("All threads finished."); }
この例では、Arc::clone(&app_config)は参照カウントをインクリメントします。スレッドが終了し、そのthread_configがスコープを外れると、参照カウントはデクリメントされます。app_config(およびConfigデータ)は、すべてのArcインスタンスがなくなったときにのみドロップされます。
Mutex:共有ミュータブル状態のための相互排他
Arcは複数のスレッドがデータを所有しアクセスできるようにしますが、そのデータを安全に変更する問題は解決しません。複数のスレッドが同時に同じ共有データへの書き込みを試みると、データ競合が発生します。ここでMutex(Mutual Exclusion、相互排他)が登場します。Mutexは、一度に1つのスレッドだけが保護されたデータにアクセスできることを保証します。スレッドがデータにアクセスしたい場合、まずMutexロックを「取得」する必要があります。ロックがすでに他のスレッドによって保持されている場合、要求しているスレッドはロックが利用可能になるまでブロックされます。スレッドがデータの使用を終えたら、ロックを「解放」し、他のスレッドがそれを取得できるようにします。
Rustでは、Mutex<T>は保護するデータTをラップします。内部のTにアクセスするには、.lock()を呼び出す必要があります。これはMutexGuardを返します。このガードはDerefを&mut Tに、Dropをロックの自動解放に実装します。このDrop実装は、スレッドがパニックを起こした場合でもロックが解放されることを保証するため、安全性と利便性にとって重要です。
ArcとMutexを組み合わせて、スレッド間でミュータブルなカウンターを共有してみましょう。
use std::sync::{Arc, Mutex}; use std::thread; fn main() { // Arcはスレッド間での共有所有権に必要です // Mutexはカウンターへのミュータブルアクセスに必要です let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter_clone = Arc::clone(&counter); handles.push(thread::spawn(move || { // ロックを取得します。他のスレッドがロックを保持している場合、この呼び出しはブロックされます。 let mut num = counter_clone.lock().unwrap(); *num += 1; // 共有データを変更します // ロックは`num`がスコープを外れると(このクロージャの終わりに)自動的に解放されます。 })); } for handle in handles { handle.join().unwrap(); } // 最後の値を確認するために、もう一度ロックを取得します println!("Final counter value: {}", *counter.lock().unwrap()); }
このコードでは、Arc<Mutex<i32>>は、複数のスレッド間でミュータブルな整数を共有する慣用的な方法です。各thread::spawnクロージャは、クローンされたArcを受け取ります。クロージャ内では、counter_clone.lock().unwrap()がロックの取得を試みます。成功すると、MutexGuard(&mut i32にデリファレンスされる)が返され、カウンターをインクリメントできるようになります。numがスコープを外れると、MutexGuardがドロップされ、ロックが自動的に解放されます。
チャンネル:メッセージパッシングによる通信
ArcとMutexは状態共有に非常に役立ちますが、状態を直接共有することを避け、代わりにメッセージを渡すことでスレッド間を通信する方が良い場合もあります。Rustの標準ライブラリは、std::sync::mpsc(Multiple Producer, Single Consumer、複数プロデューサー、単一コンシューマー)を介してチャンネルを提供します。このモジュールは、送信者(Sender<T>)と受信者(Receiver<T>)を持つ「チャンネル」を作成することを可能にします。1つ以上の送信者がチャンネルにT型のメッセージを送信でき、単一の受信者がそれらのメッセージを受信できます。
チャンネルは、計算が独立しており、結果の収集が必要な場合、またはスレッドが共有メモリを直接操作することなくアクションを調整する必要がある場合に最適です。
ワーカー・スレッドに作業を送信し、それらのワーカーが完了した結果を返すメイン・スレッドの例を見てみましょう。
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { // チャンネルを作成します: (送信者, 受信者) let (tx, rx) = mpsc::channel(); let num_workers = 3; let mut handles = vec![]; for i in 0..num_workers { let tx_clone = tx.clone(); // 各ワーカーのために送信者をクローンします handles.push(thread::spawn(move || { let task_id = i + 1; println!("Worker {} started.", task_id); // 作業をシミュレートします thread::sleep(Duration::from_millis(500 * task_id as u64)); let result = format!("Worker {} finished task.", task_id); // 結果をメイン・スレッドに送信します tx_clone.send(result).unwrap(); println!("Worker {} sent result.", task_id); })); } // メイン・スレッドがこれ以上メッセージを送信しないことを通知するために、元の送信者をドロップします。 // これは、受信者がメッセージの待機を停止すべき時期を知るために重要です。 drop(tx); // 受信者から結果を収集します for received in rx { println!("Main thread received: {}", received); } // すべてのワーカー・スレッドの完了を待ちます for handle in handles { handle.join().unwrap(); } println!("All workers and main thread finished processing messages."); }
この例では:
mpsc::channel()がチャンネルを作成します。tx(送信者)は各ワーカー・スレッドのためにクローンされます。これは「複数プロデューサー」の側面を示しています。- 各ワーカーはいくつかの作業を実行し、その後
tx_clone.send(result).unwrap()がチャンネルにメッセージを送信します。 - メイン・スレッドはその後
rx(受信者)を反復処理します。このループは、メッセージが利用可能になるまでブロックし、すべての送信者がドロップされるまで(すべてのワーカーを作成した後でdrop(tx)を明示的に行い、ワーカーのtx_cloneがスコープを外れることで暗黙的に)続行します。
適切なツールの選択
- 複数のスレッドが同じ不変データに所有し読み取り専用アクセスする必要がある場合は、**
Arc**を使用します。 - 複数のスレッドが同じデータに所有しミュータブルアクセスする必要がある場合は、**
Arc<Mutex<T>>**を使用します。Mutexは競合を導入し、過度に使用したりクリティカルセクションが長すぎたりするとパフォーマンスを低下させる可能性があることに注意してください。 - スレッドがメッセージを渡すことで通信する必要がある場合、特にそれらのアクティビティがある程度独立しており、1つのスレッドがもう1つのスレッドが消費するデータを生成する場合、チャンネルを使用します。これは、共有ミュータブル状態を回避することで、よりシンプルで堅牢な設計につながることがよくあります。
結論
Rustの並行処理へのアプローチは、その堅牢な所有権と型システムに基づいて構築されており、Arc、Mutex、チャンネルのような強力なプリミティブを提供します。これらのツールにより、開発者は他の言語でよく見られる悪名高いデータ競合やデッドロックなしに、自信を持って高度に並行なアプリケーションを構築できます。共有所有権(Arc)を使用するタイミング、ミュータブルデータ(Mutex)の相互排他を提供するタイミング、メッセージパッシング(チャンネル)を選択するタイミングを理解することで、現代のマルチコアプロセッサの力を真に活用する、効率的で安全で信頼性の高い並行システムを設計できます。Rustは、複雑な並列処理の課題を驚くほど扱いやすくすることで、恐れを知らぬ並行処理を実現する力を与えてくれます。

