Rustにおける数千ものWebSocket接続管理:アクターモデル vs Mutex<HashMap>
Olivia Novak
Dev Intern · Leapcell

はじめに
リアルタイムWebアプリケーションの領域では、WebSocket接続は不可欠なものとなっています。チャットアプリケーション、共同編集ツール、ライブダッシュボード、ゲームなど、クライアントとサーバー間の永続的で低遅延な通信を維持する能力は極めて重要です。これらのアプリケーションがスケールするにつれて、おそらく数千、あるいは数万もの同時WebSocket接続に関連する状態を管理することは、重大なアーキテクチャ上の課題となります。データの一貫性、高いスループット、耐障害性を確保するには、並行処理モデルとデータ構造を慎重に検討する必要があります。この記事では、Rustエコシステムにおけるこの課題に対処するための2つの主要なアプローチ、すなわちアクターモデルと、より伝統的なMutex<HashMap>を検討し、それらの原則、実装、および大量の接続を管理する上での実際的な影響を分析します。
スケーラブルな接続管理のためのコアコンセプト
2つの主要なアプローチを掘り下げる前に、Rustで効率的に同時WebSocket接続を管理するために重要なコアコンセプトについて共通の理解を確立しましょう。
WebSocket接続の状態
各アクティブなWebSocket接続には、ユーザーID、セッション情報、さまざまなトピックへのサブスクリプション詳細、さらにはクライアントにメッセージを送信するためのチャネルのSender部分など、関連データが存在することがよくあります。この「状態」は、メッセージが到着したりイベントが発生したりするにつれて、アプリケーションのさまざまな部分からアクセス可能で変更可能である必要があります。
並行処理と並列処理
Rustの所有権と借用システムは、コンパイル時にデータ競合を防ぐための強力なツールです。しかし、複数の非同期タスク(WebSocketサーバーでは一般的)間で共有されるミュータブルな状態を扱う場合、慎重なパターンが必要です。
- 並行処理(Concurrency): 潜在的に単一コア上で、時間とともにインターリーブされる複数のタスクを処理すること。これはRustの
async/awaitで典型的です。 - 並列処理(Parallelism): 通常、複数のCPUコアで、複数のタスクを同時に実行すること。
非同期プログラミング (async/await)
Rustのasync/await構文は、ネットワーク通信のようなI/Oバウンドな操作に不可欠な、ノンブロッキングコードを記述する方法を提供します。単一のスレッドは、I/O操作中に制御を譲ることで、多くのWebSocket接続を同時に管理し、他のタスクを実行できるようにします。
メッセージパッシング
タスクが、ミュータブルなメモリを直接共有するのではなく、互いにデータを送信することによって通信する基本的な並行処理プリミティブ。これには、チャネル(例:flume、tokio::mpsc)がよく使用されます。
共有ミュータブル状態
複数のタスクが同じデータにアクセスし、それを変更する必要がある場合、それは共有ミュータブル状態になります。Rustは、主に共有所有権のためのArc(Atomic Reference Counted)と、排他的アクセスを同期するためのMutexのような同期プリミティブを使用して、これを安全に管理するためのいくつかのメカニズムを提供します。
接続管理:アクターモデル
アクターモデルは、計算の普遍的なプリミティブとして「アクター」を持つ、並行計算のための強力なパラダイムです。各アクターは、自身の状態、動作、およびメールボックスを持つ独立した計算エンティティです。アクターは、互いのメールボックスに不変なメッセージを送信することによってのみ通信します。それらは一度に1つのメッセージを処理し、複数の送信者によって内部状態が同時にアクセスされることを防ぎ、設計上データ競合を排除します。
原則
WebSocket接続の文脈では、アクターモデルアプローチは通常、以下を含みます:
- 接続アクター: 各WebSocket接続は理論的にはアクターとなり、自身の状態を管理し、メッセージを送受信できます。しかし、数千の接続の場合、接続ごとに完全なアクターを作成すると、オーバーヘッドが大きすぎる可能性があります。
- 接続マネージャーアクター: より一般的でスケーラブルなアプローチは、すべてのアクティブな接続の状態を所有および管理する単一の「接続マネージャー」アクター(または少数の「シャード」アクター)を持つことです。WebSocketクライアントがメッセージを送信すると、それはこの接続マネージャーアクターに転送されます。サーバーが特定のクライアントにメッセージを送信する必要がある場合、それは接続マネージャーアクターにメッセージを送信し、アクターはクライアントの送信者チャネルを検索してメッセージをディスパッチします。
tokio::mpsc を使用した実装例
tokio::mpsc チャネルを使用した簡単な例で示しましょう。
use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex}; use tokio_tungstenite::{accept_async, tungstenite::Message}; use futures_util::{StreamExt, SinkExt}; use std::collections::HashMap; use std::sync::Arc; // --- Connection Manager Actor へのメッセージ --- #[derive(Debug)] enum ConnectionManagerMessage { Register { id: u32, sender: mpsc::Sender<Message> }, Unregister { id: u32 }, // 例: 全体にブロードキャストまたは特定のクライアントに送信 Broadcast { msg: String }, SendToClient { id: u32, msg: String }, } // --- Connection Manager Actor --- struct ConnectionManagerActor { connections: HashMap<u32, mpsc::Sender<Message>>, next_client_id: u32, } impl ConnectionManagerActor { fn new() -> Self { ConnectionManagerActor { connections: HashMap::new(), next_client_id: 0, } } async fn run(mut self, mut receiver: mpsc::Receiver<ConnectionManagerMessage>) { while let Some(msg) = receiver.recv().await { match msg { ConnectionManagerMessage::Register { id, sender } => { self.connections.insert(id, sender); println!("Client {} registered. Total: {}", id, self.connections.len()); } ConnectionManagerMessage::Unregister { id } => { self.connections.remove(&id); println!("Client {} unregistered. Total: {}", id, self.connections.len()); } ConnectionManagerMessage::Broadcast { msg } => { for (_id, sender) in &self.connections { let _ = sender.send(Message::text(msg.clone())).await; } } ConnectionManagerMessage::SendToClient { id, msg } => { if let Some(sender) = self.connections.get(&id) { let _ = sender.send(Message::text(msg)).await; } else { eprintln!("Client {} not found for targeted message.", id); } } } } } fn generate_id(&mut self) -> u32 { let id = self.next_client_id; self.next_client_id += 1; id } } // --- WebSocket Handler Task --- async fn handle_connection( raw_stream: TcpStream, manager_sender: mpsc::Sender<ConnectionManagerMessage>, client_id: u32, ) { let ws_stream = match accept_async(raw_stream).await { Ok(ws) => ws, Err(e) => { eprintln!("Error during WebSocket handshake: {}", e); return; } }; let (mut ws_sender, mut ws_receiver) = ws_stream.split(); let (tx, mut rx) = mpsc::channel::<Message>(100); // Channel for sending messages to this specific client // Register client with the manager let _ = manager_sender.send(ConnectionManagerMessage::Register { id: client_id, sender: tx }).await; // Task to send messages from manager to client let send_to_client_task = tokio::spawn(async move { while let Some(msg) = rx.recv().await { if let Err(e) = ws_sender.send(msg).await { eprintln!("Error sending message to client {}: {}", client_id, e); break; } } }); // Task to receive messages from client and forward to manager (or process directly) while let Some(msg) = ws_receiver.next().await { match msg { Ok(Message::Text(text)) => { println!("Received from client {}: {}", client_id, text); // Example: client sends a broadcast request if text == "broadcast" { let _ = manager_sender.send(ConnectionManagerMessage::Broadcast { msg: format!("Hello from client {}", client_id) }).await; } else { // Or simply echo back let _ = manager_sender.send(ConnectionManagerMessage::SendToClient { id: client_id, msg: format!("Echo: {}", text) }).await; } } Ok(Message::Ping(_)) => { let _ = ws_sender.send(Message::Pong(vec![])).await; } Ok(Message::Close(_)) => { println!("Client {} disconnected.", client_id); break; } Err(e) => { eprintln!("Error receiving from client {}: {}", client_id, e); break; } _ => {} // Ignore other message types for simplicity } } println!("Client handler for {} shutting down.", client_id); let _ = manager_sender.send(ConnectionManagerMessage::Unregister { id: client_id }).await; send_to_client_task.abort(); // Stop the sender task } #[tokio::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:8080").await.expect("Can't listen"); println!("Listening on: 127.0.0.1:8080"); let (manager_sender, manager_receiver) = mpsc::channel::<ConnectionManagerMessage>(1000); // Channel for manager messages let mut manager_actor = ConnectionManagerActor::new(); let manager_sender_clone = manager_sender.clone(); // Clone for the main loop // Spawn the Connection Manager Actor tokio::spawn(async move { manager_actor.run(manager_receiver).await; }); loop { let (stream, _) = listener.accept().await.expect("failed to accept"); let client_id = { let mut guard = manager_actor.next_client_id; // Temporary access for ID generation, careful here // In a real actor model, ID generation would be a message to the manager // For simplicity, we'll assume manager_actor is mutable here. // A better way would be the manager sending a message BACK with the assigned ID. let id = guard; guard += 1; id }; tokio::spawn(handle_connection(stream, manager_sender_clone.clone(), client_id)); } }
(注意: mainにおけるアクターモデルの例のID生成は簡略化されています。厳密なアクターモデルでは、ID生成さえもアクターへのメッセージになるか、アクターがRegister時にIDを割り当て、クライアントハンドラに返送することになります。)
アプリケーションシナリオ
アクターモデルは、以下に特に適しています:
- 複雑な状態遷移: 接続状態がさまざまな受信メッセージに基づいて大幅に変更される場合。
- サービスディスカバリ/ルーティング: アクターは、どのクライアントがどのトピックにサブスクライブしているかを管理し、それに応じてメッセージをルーティングできます。
- 疎結合なコンポーネント: 本質的に疎結合を促進し、システムを理解しやすくテストしやすくします。
接続管理:Mutex<HashMap>
Mutex<HashMap>アプローチは、Rustで共有状態を管理するためのより直接的な方法です。これには、Mutex(非同期コンテキストでない場合はstd::sync::Mutex)で保護されたHashMap(キーは接続ID、値は送信者部分または完全な接続オブジェクト)を使用し、通常はArcにラップしてタスク間での共有所有権を確保します。
原則
タスクが共有接続状態にアクセスまたは変更する必要がある場合:
Mutexのロックを取得します。これにより、ロックが解放されるまで他のタスクがロックを取得できなくなり、排他的アクセスが保証されます。HashMapに必要な操作を実行します。- ロックを解放します。
このメカニズムは、共有データへのアクセスをシリアル化することによって、データ競合を明示的に防ぎます。
Arc<Mutex<HashMap>> を使用した実装例
use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Mutex}; use tokio_tungstenite::{accept_async, tungstenite::Message}; use futures_util::{StreamExt, SinkExt}; use std::collections::HashMap; use std::sync::Arc; // すべての接続のための共有状態 struct SharedState { connections: Mutex<HashMap<u32, mpsc::Sender<Message>>>, next_client_id: Mutex<u32>, } impl SharedState { fn new() -> Self { SharedState { connections: Mutex::new(HashMap::new()), next_client_id: Mutex::new(0), } } } // --- WebSocket Handler Task --- async fn handle_connection_mutex( raw_stream: TcpStream, state: Arc<SharedState>, ) { let ws_stream = match accept_async(raw_stream).await { Ok(ws) => ws, Err(e) => { eprintln!("Error during WebSocket handshake: {}", e); return; } }; let (mut ws_sender, mut ws_receiver) = ws_stream.split(); let (tx, mut rx) = mpsc::channel::<Message>(100); // Channel for sending messages to this specific client let client_id = { let mut next_id = state.next_client_id.lock().await; let id = *next_id; *next_id += 1; id }; println!("New client connected, ID: {}", client_id); // Register client with the shared state { let mut connections = state.connections.lock().await; connections.insert(client_id, tx); println!("Client {} registered (Mutex). Total: {}", client_id, connections.len()); } // Task to send messages from manager to client let send_to_client_task = tokio::spawn(async move { while let Some(msg) = rx.recv().await { if let Err(e) = ws_sender.send(msg).await { eprintln!("Error sending message to client {}: {}", client_id, e); break; } } }); // Task to receive messages from client and process/forward while let Some(msg) = ws_receiver.next().await { match msg { Ok(Message::Text(text)) => { println!("Received from client {} (Mutex): {}", client_id, text); // Example: Broadcast to all if text == "broadcast" { let connections = state.connections.lock().await; for (&id, sender) in connections.iter() { if id != client_id { // Don't send back to self for broadcast example let _ = sender.send(Message::text(format!("Broadcast from {}: {}", client_id, text))).await; } } } else { // Echo back to sender let connections = state.connections.lock().await; if let Some(sender) = connections.get(&client_id) { let _ = sender.send(Message::text(format!("Echo (Mutex): {}", text))).await; } } } Ok(Message::Ping(_)) => { let _ = ws_sender.send(Message::Pong(vec![])).await; } Ok(Message::Close(_)) => { println!("Client {} disconnected (Mutex).", client_id); break; } Err(e) => { eprintln!("Error receiving from client {} (Mutex): {}", client_id, e); break; } _ => {} // Ignore other message types for simplicity } } println!("Client handler for {} shutting down (Mutex).", client_id); // Unregister client from shared state { let mut connections = state.connections.lock().await; connections.remove(&client_id); println!("Client {} unregistered (Mutex). Total: {}", client_id, connections.len()); } send_to_client_task.abort(); // Stop the sender task } #[tokio::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:8081").await.expect("Can't listen"); println!("Listening on: 127.0.0.1:8081 (Mutex)"); let shared_state = Arc::new(SharedState::new()); loop { let (stream, _) = listener.accept().await.expect("failed to accept"); tokio::spawn(handle_connection_mutex(stream, Arc::clone(&shared_state))); } }
アプリケーションシナリオ
Mutex<HashMap>アプローチは、以下の場合にしばしば好まれます:
- シンプルさが鍵: 共有状態が比較的単純で、それにアクセスする操作の数がそれほど多くないアプリケーションでは、
Mutexは概念的に理解しやすく実装も容易です。 - オーバーヘッドが少ない: メッセージパッシングの指示がない直接的な
Mutexアクセスは、個々の操作でより低い遅延を提供する場合がありますが、これは競合によって相殺される可能性があります。 - 直接アクセス: アプリケーションの多くの異なる部分が、接続情報のサブセットに直接クエリまたは変更する必要がある場合。
比較と考慮事項
| 特徴 | アクターモデル (ConnectionManagerActor) | Mutex<HashMap> |
|---|---|---|
| 並行処理モデル | メッセージパッシング、アクターごとのシングルスレッド処理 | 共有メモリ、明示的なロック |
| データ安全性 | メッセージパッシング設計により本質的に安全。アクターが自身の状態を所有する。 | Mutexによる安全性(排他的アクセスを保証)。 |
| スケーラビリティ | アクターのシャーディングやマネージャーアクター間での負荷分散により高度にスケーラブル。単一アクターでのメッセージの逐次処理はボトルネックになる可能性がある。 | Mutexの取得がブロックするため、高負荷時のボトルネックになりうる。中程度の負荷には良好。 |
| 複雑さ | メッセージとチャネル定義のための初期セットアップのボイラープレートが多い。セットアップ後はビジネスロジックを理解しやすくなる。 | 初期セットアップがよりシンプル。複雑なシナリオでのデッドロック管理や適切なロック解放の保証が複雑になる可能性がある。 |
| パフォーマンス | メッセージパッシングのオーバーヘッド。実際のデータに対するロックを回避することによる良好なスループット。 | Mutexのロック/アンロックのオーバーヘッド。競合下ではレイテンシが高くなる可能性がある。 |
| テスト容易性 | 事前定義されたメッセージを送信し、応答を確認することで、アクターを分離してテストしやすい。 | 同時アクセスをシミュレートし、競合状態を確認するためのテストがより複雑になる。 |
| デバッグ | メッセージがイベントの明確な監査証跡を提供する。状態変更の追跡が容易。 | デッドロックや微妙な競合状態のデバッグは困難な場合がある。 |
| 障害分離 | アクターの障害は、通常、そのアクターに限定される。 | |
| 共有状態へのアクセスにおけるバグがシステム全体を不安定にする可能性がある。 |
どちらを選択するか
-
アクターモデルを選択する場合:
- 接続に関するアプリケーションロジックが複雑で、複数の明確な状態を含み、特定のルーティング要件がある場合。
Mutexの競合が重大なボトルネックとなる非常に高い同時実行性(毎秒数千メッセージ)を予想する場合。- 状態遷移を明示的に制御し、コンポーネントが定義済みのメッセージのみを介して通信するシステムを好む場合。
- 大規模分散システムにおける保守性およびテスト容易性が最優先事項である場合。
-
Mutex<HashMap>を選択する場合:- 共有状態が比較的単純(例:クライアント送信者の格納のみ)な場合。
- 共有マップを直接変更する同時操作の数が中程度で、競合が主要な問題になると予想されない場合。
- 共有状態への直接的で(競合がない場合)高速なアクセスが必要な場合。
- 初期実装のシンプルさが最優先事項である場合。
また、これらが相互排他的ではないことにも注目する価値があります。アクターモデルをハイレベルな接続管理と全体的なシステム調整に使用し、個々のアクター(またはアクターモデル外のシステムのコンポーネント)が、自身に限定された共有ミュータブル状態のためにMutexを使用するかもしれません。
結論
Rustで数千ものWebSocket接続を管理するには、堅牢な並行処理戦略が必要です。中央マネージャーアクターを介したアクターモデルとArc<Mutex<HashMap>>パターンは、どちらも有効なアプローチを提供します。アクターモデルは、メッセージベースの通信を強制し、状態を分離することによって、複雑なシステムのために本質的に安全でスケーラブルな設計を提供し、非常にインタラクティブでデータリッチなリアルタイムアプリケーションに最適です。逆に、Mutex<HashMap>は、それほど複雑でない共有状態のための、よりシンプルで直接的なソリューションを提供し、多くの場合、中規模の負荷に対して十分でパフォーマンスが高く、アーキテクチャレイヤーが少なくて済みます。最終的な選択は、アプリケーションの特定の要件に依存し、複雑さ、予想される負荷、および厳格な状態整合性保証の必要性のバランスをとる必要があります。どちらのパターンも、注意深く適用されれば、Rust開発者が非常にパフォーマンスが高く信頼性の高いリアルタイムサービスを構築することを可能にします。

