async-graphql サブスクリプションによるリアルタイムデータ
Emily Parker
Product Engineer · Leapcell

はじめに:静的データと動的なユーザーエクスペリエンスの架け橋
今日のペースの速いデジタル世界では、ユーザーは瞬時の更新と反応性の高いインターフェイスを期待しています。静的データの取得には効果的ですが、従来の要求-応答パラダイムは、ライブスポーツスコア、チャットメッセージ、株価の変更などのリアルタイム情報を提供する際にはしばしば不十分です。サーバーへの継続的なポーリングは非効率的でリソースを大量に消費する可能性があり、帯域幅の無駄とサーバー負荷の増加につながります。そこで、リアルタイムテクノロジーが活躍し、イベントが発生するとサーバーが接続されたクライアントに proactively データをプッシュできるようになります。GraphQL は、その強力なクエリ言語により、データ取得のためのエレガントなソリューションを提供しますが、その真のリアルタイムの可能性はサブスクリプションを通じて解き放たれます。この記事では、Rust で async-graphql
を活用して GraphQL サブスクリプションを実装し、アプリケーションを静的なデータプロバイダーから動的なリアルタイムの強力なものに変える方法について説明します。
GraphQL サブスクリプションによるリアルタイムの理解
コードに飛び込む前に、GraphQL と async-graphql
によるリアルタイムデータ更新の基盤となるいくつかの基本的な概念を明確にしましょう。
主要な概念
- GraphQL: API 用のオープンソースデータクエリおよび操作言語であり、既存のデータでクエリを満たすためのランタイムです。クライアントは、必要なものを正確に、それ以上でもそれ以下でもないものを要求できます。
- サブスクリプション: 特定のイベントが発生したときにクライアントがサーバーからリアルタイム更新を受信できるようにする GraphQL 操作のタイプです。クエリ(一度取得)およびミューテーション(変更して一度取得)とは異なり、サブスクリプションはオープン接続(通常は WebSocket ベース)を維持し、サーバーで新しいイベントが公開されるとクライアントにデータをプッシュします。
- WebSockets: 単一の TCP 接続を介してフルデュプレックス通信チャネルを提供する通信プロトコルです。この永続的な接続はサブスクリプションにとって重要であり、クライアントがデータを繰り返し要求することなく、サーバーが更新をプッシュできるようにします。
async-graphql
: Rust 用の強力で人間工学に基づいた GraphQL サーバーライブラリです。サブスクリプションを含むすべての GraphQL 操作タイプをサポートし、さまざまな非同期ランタイムおよび Web フレームワークとシームレスに統合されます。- Publish-Subscribe (Pub/Sub) パターン: 送信者(発行者)がメッセージを特定の受信者(購読者)に直接送信するのではなく、公開されたメッセージをクラスに分類するメッセージングパターンです。購読者は 1 つ以上のクラスへの関心を表明し、関心のあるメッセージのみを受信します。このパターンは、グローバルイベントがサブスクリプション更新をトリガーする方法の基盤となります。
GraphQL サブスクリプションのメカニズム
基本的に、GraphQL サブスクリプションは次のように機能します。
- クライアントがサブスクリプションを開始: クライアントは、通常 WebSocket 接続を介して、GraphQL サブスクリプションクエリをサーバーに送信します。
- サーバーがイベントをリッスン: サーバーは、サブスクリプションを受信すると、クライアントの関心を特定のイベントストリームまたはトピックに登録します。次に、これらのイベントをリッスンするメカニズムをセットアップします。
- イベントが発生して公開: サーバー側でイベントが発生すると(例:新しいチャットメッセージが送信された、データベースレコードが更新された)、サーバーはこのイベントを Pub/Sub システム(例:インメモリチャネル、Redis Pub/Sub、Kafka)に公開します。
- サーバーが購読者に通知: Pub/Sub システム、またはそれにリッスンしているコンポーネントが、公開されたイベントをピックアップします。次に、GraphQL サーバーは、クライアントの元のサブスクリプションクエリに従ってこのイベントデータを変換し、確立された WebSocket 接続を介して結果のペイロードをクライアントにプッシュします。
- クライアントが更新を受信: クライアントはリアルタイム更新を受信し、それに応じて UI または状態を更新できます。
async-graphql
によるサブスクリプションの実装
リアルタイムで新しいメッセージがサブスクライブされたクライアントにプッシュされるシンプルなチャットアプリケーションを実装する例を次に示します。
まず、Cargo.toml
で依存関係を設定する必要があります。
[dependencies] async-graphql = { version = "7.0", features = ["apollo_tracing", "tracing"] } async-graphql-warp = "7.0" # または async-graphql-actix-web, async-graphql-poem など。 tokio = { version = "1.0", features = ["full"] } tokio-stream = "0.1" futures = "0.3" warp = "0.3" # または actix-web, poem など。
次に、サブスクリプションタイプを含む GraphQL スキーマを定義します。ここでは、単純化のために Pub/Sub システムのシミュレーションとしてインメモリチャネルを使用します。運用環境では Redis などを検討してください。
use async_graphql::{ http::{GraphiQLSource, WebSocket}, Subscription, Schema, Object, futures_util::stream::{SplitSink, SplitStream}, futures_util::{Stream, SinkExt, StreamExt}, Context, EmptyMutation, }; tokio::sync::broadcast; tokio_stream::wrappers::BroadcastStream; futures::channel::mpsc; use std::{pin::Pin, collections::HashMap, sync::Arc, sync::Mutex}; use warp::{Filter, Rejection, Reply}; // シンプルな Message 構造体 #[derive(Clone, Debug)] struct Message { id: u32, content: String, author: String, } // Pub/Sub システム:ブロードキャストチャネル // 送信者を Arc<Mutex<_>> に格納して、スレッド間での共有と変更を可能にする type MessageSender = Arc<broadcast::Sender<Message>>; // 送信者を含む GraphQL コンテキスト struct AppContext { message_sender: MessageSender, // 簡単のため、メッセージはメモリに格納する messages: Arc<Mutex<Vec<Message>> >, next_id: Arc<Mutex<u32>>, } // メッセージを追加するメソッドを実装 impl AppContext { fn add_message(&self, content: String, author: String) -> Message { let mut messages = self.messages.lock().unwrap(); let mut next_id = self.next_id.lock().unwrap(); let id = *next_id; *next_id += 1; let new_message = Message { id, content, author }; messages.push(new_message.clone()); // 新しいメッセージをブロードキャストチャネルに公開する if let Err(e) = self.message_sender.send(new_message.clone()) { eprintln!("Failed to send message: {:?}", e); } new_message } } // Query タイプ:通常の既存メッセージまたは他のデータの取得用 struct Query; #[Object] impl Query { async fn hello(&self) -> String { "world".to_string() } async fn messages(&self, ctx: &Context<'_>) -> Vec<Message> { let app_ctx = ctx.data::<AppContext>().unwrap(); app_ctx.messages.lock().unwrap().clone() } } // Subscription タイプ:クライアントが購読できるものを定義する struct Subscription; #[Subscription] impl Subscription { // このサブスクリプションは新しいメッセージをストリーミングする async fn new_messages<'ctx>(&self, ctx: &'ctx Context<'_>) -> impl Stream<Item = Message> + 'ctx { let app_ctx = ctx.data::<AppContext>().unwrap(); let receiver = app_ctx.message_sender.subscribe(); BroadcastStream::new(receiver) .map(|result| result.unwrap_or_else(|e| { eprintln!("Error receiving message from broadcast channel: {:?}", e); // thực tếなアプリでは、このエラーをもっと適切に処理する方が良いかもしれません、 // 例えば、アイテムをスキップしたり、カスタムエラーメッセージを送信したりするなど。 Message { id: 0, content: "Error".to_string(), author: "System".to_string() } })) } } // Mutation タイプ:新しいメッセージを公開するため struct Mutation; #[Object] impl Mutation { async fn send_message(&self, ctx: &Context<'_>, content: String, author: String) -> Message { let app_ctx = ctx.data::<AppContext>().unwrap(); app_ctx.add_message(content, author) } } // GraphQL スキーマをビルドする関数 fn build_schema() -> Schema<Query, Mutation, Subscription> { let (tx, _rx) = broadcast::channel(1024); // 1024 メッセージのバッファ let message_sender = Arc::new(tx); let app_ctx = AppContext { message_sender: message_sender.clone(), messages: Arc::new(Mutex::new(Vec::new())), next_id: Arc::new(Mutex::new(1)), }; Schema::build(Query, Mutation, Subscription) .data(app_ctx) .finish() } #[tokio::main] async fn main() { let schema = build_schema(); // GraphQL エンドポイントを定義 let graphql_post = async_graphql_warp::graphql(schema.clone()) .and_then(|(schema, request)| async move { Ok::<_, Rejection>(async_graphql_warp::Response::from(schema.execute(request).await)) }); // GraphQL サブスクリプション エンドポイント // これは Warp 用に `async_graphql_warp::graphql_subscription` を使用 let graphql_ws = async_graphql_warp::graphql_subscription(schema); // GraphiQL IDE エンドポイントを定義 let graphiql = warp::path!("graphiql") .and(warp::get()) .map(|| { warp::http::Response::builder() .header("content-type", "text/html") .body(GraphiQLSource::build().endpoint("/").subscription_endpoint("/").finish()) .unwrap() }); let routes = graphql_post .or(graphql_ws) .or(graphiql); println!("GraphiQL IDE: http://localhost:8000/graphiql"); warp::serve(routes).run(([127, 0, 0, 1], 8000)).await; }
コード例の説明
Message
構造体: チャットメッセージを表すシンプルなMessage
構造体です。tokio::sync::broadcast
はメッセージをクローン可能にする必要があるため、Clone
が必要です。MessageSender
(Pub/Sub): インメモリ Pub/Sub メカニズムとしてtokio::sync::broadcast::channel
を使用します。sender.send(message)
を介してメッセージが送信されると、アクティブなすべての購読者はメッセージのクローンを受信します。AppContext
: GraphQL コンテキストとして機能し、MessageSender
やすべてのメッセージを格納するベクトル(クエリ用)などの共有状態を保持します。messages
とnext_id
をArc<Mutex<T>>
でラップして、スレッド間で安全な可変アクセスを可能にします。Query
タイプ: すべての履歴メッセージを取得するためのmessages
フィールドと、単純なhello
フィールドを定義します。Mutation
タイプ:send_message
ミューテーションは新しいMessage
を作成し、インメモリ ストアに追加し、重要なことにself.message_sender.send(new_message)
を呼び出して公開します。Subscription
タイプ:#[Subscription]
属性は、これが GraphQL サブスクリプション タイプであることを示します。new_messages
フィールドは、impl Stream<Item = Message>
を返す非同期関数です。これはサブスクリプションのコアです。new_messages
内で、app_ctx.message_sender.subscribe()
を介してmessage_sender
から受信機を取得します。BroadcastStream::new(receiver)
はtokio::sync::broadcast::Receiver
をfutures::Stream
に変換します。これはasync-graphql
が理解するものです。- ブロードキャストチャネルからの潜在的なエラーを処理するために
.map
を追加し、結果をアンラップします。
main
関数セットアップ:Schema
をビルドし、AppContext
を.data(app_ctx)
を使用して提供します。これにより、AppContext
はctx.data::<AppContext>()
を介してすべてのリゾルバでアクセス可能になります。- 3 つの Warp ルートをセットアップします。
/
:async_graphql_warp::graphql()
を使用して標準の GraphQL クエリとミューテーションを処理します。/
:async_graphql_warp::graphql_subscription()
を使用して、サブスクリプション用の WebSocket 経由の GraphQL を処理します。HTTP と WebSocket の両方に同じエンドポイントパスを使用することは一般的です。異なるプロトコルで通信するためです。/graphiql
: GraphiQL IDE エンドポイントを提供し、メイン GraphQL エンドポイントおよびサブスクリプション エンドポイントと対話するように構成されます。
サブスクリプションのテスト
- アプリケーションを実行します:
cargo run
- ブラウザで
http://localhost:8000/graphiql
を開きます。 - GraphiQL IDE で、2 つのタブまたはウィンドウを開きます。
タブ 1 (サブスクリプション):
subscription NewMessages { newMessages { id content author } }
このサブスクリプションを実行します。WebSocket 接続が確立され、リッスンが開始されます。
タブ 2 (ミューテーション):
mutation SendMessage { sendMessage(content: "Hello from GraphQL!", author: "Alice") { id content author } }
このミューテーションを数回実行し、コンテンツと作成者を変更します。すぐに、タブ 1 のサブスクリプション出力に新しいメッセージが表示されるはずです。
別のタブで履歴メッセージをクエリすることもできます。
query GetMessages { messages { id content author } }
アプリケーションシナリオ
GraphQL サブスクリプションは、さまざまなリアルタイムアプリケーションに最適です。
- チャットアプリケーション: インスタントメッセージング、グループチャット。
- ライブダッシュボード: リアルタイムメトリクス、システム監視。
- 共同編集: Google ドキュメントのように、変更が即座に反映されるアプリケーション。
- ゲーム: ゲームの状態、プレイヤーの位置の更新。
- 金融アプリケーション: ライブ株価ティッカー、仮想通貨価格。
- 通知: ユーザー通知の即時プッシュ。
結論:動的なユーザーエクスペリエンスの強化
Rust で async-graphql
を使用して GraphQL サブスクリプションを実装することは、アプリケーションにリアルタイムデータ機能を構築するための堅牢で効率的な方法を提供します。WebSocket プロトコルと Publish-Subscribe パターンを活用することで、async-graphql
は、イベントが発生するとサーバーがクライアントに proactively データをプッシュできるようにし、静的なデータ表現を動的な、生きているインターフェイスに変換することで、ユーザーエクスペリエンスを劇的に向上させます。Rust のパフォーマンスと async-graphql
の人間工学のこの強力な組み合わせにより、開発者は比較的簡単に、高度な応答性とインタラクティブなアプリケーションを作成できます。