Redis遅延キューを単純化する
Grace Collins
Solutions Engineer · Leapcell

遅延キューは、基本的に実行を遅らせるメッセージキューです。どのようなビジネスシナリオで役立つのでしょうか?
実用的なシナリオ
- 注文の支払いが失敗した場合、定期的にユーザーにリマインダーを送信します。
- ユーザーの同時実行の場合、ユーザーへのメール送信を2分遅延させることができます。
Redisを使用して基本的なメッセージキューを実装する
ご存知のように、KafkaやRabbitMQなどのプロフェッショナルなメッセージキューミドルウェアでは、コンシューマーはメッセージを消費する前に一連の複雑な手順を経る必要があります。
たとえば、RabbitMQでは、メッセージを送信する前にExchangeを作成し、次にQueueを作成し、いくつかのルーティングルールでQueueとExchangeをバインドし、メッセージを送信するときにルーティングキーを指定し、メッセージヘッダー情報を制御する必要があります。 ほとんどの場合、メッセージキューにコンシューマーが1つしかない場合でも、上記の手順をすべて実行する必要があります。
Redisを使用すると、コンシューマーグループが1つしかないメッセージキューの場合、物事ははるかに簡単になります。Redisは専門的なメッセージキューではなく、高度な機能はありません。ack保証はありません。したがって、メッセージに厳密な信頼性要件がある場合、Redisは適切ではない可能性があります。
非同期メッセージキューの基本的な実装
Redisのlist
データ構造は、通常、非同期メッセージキューに使用されます。rpush
またはlpush
を使用してアイテムをエンキューし、lpop
またはrpop
を使用してデキューできます。
> rpush queue Leapcell_1 Leapcell_2 Leapcell_3 (integer) 3 > lpop queue "Leapcell_1" > llen queue (integer) 2
問題1:キューが空の場合はどうなるか?
クライアントはpop操作を使用してメッセージをフェッチし、処理します。処理後、次のメッセージをフェッチして処理を続行します。このサイクルが繰り返されます。これがキューコンシューマーのライフサイクルです。
ただし、キューが空の場合、クライアントはpop操作のデッドループに入ります。データなしで何度も何度も連続してpopします。これは無駄で非効率的なポーリングです。クライアントのCPU使用率が急上昇するだけでなく、Redis QPSも増加します。多数のクライアントがこのようにポーリングしている場合、Redisで多数のスロークエリが発生する可能性があります。
通常、これはsleep操作で解決します。スレッドを1秒間スリープさせます。これにより、クライアント側のCPU使用率が低下し、Redis QPSも低下します。
問題2:キューの遅延
スリープは問題の解決に役立ちますが、コンシューマーが1つしかない場合、その遅延は1秒です。複数のコンシューマーがいる場合、スリープ時間がずれているため、遅延は多少短縮されます。
この遅延を大幅に短縮する方法はありますか?
はい。blpop
/ brpop
を使用します。
b
プレフィックスはブロッキング、つまりブロッキング読み取りを表します。
キューにデータがない場合、ブロッキング読み取りにより、スレッドはすぐにスリープ状態になり、データが到着するとすぐにウェイクアップします。これにより、メッセージの遅延がほぼゼロになります。lpop
/ rpop
をblpop
/ brpop
に置き換えることで、上記の問題を完全に解決できます。
問題3:アイドル接続が自動的に切断される
対処する必要があるもう1つの問題があります。アイドル接続です。
スレッドが長時間ブロックされたままになっている場合、Redisクライアント接続はアイドル状態になります。ほとんどのサーバーは、リソースの使用量を減らすために、アイドル接続を積極的に閉じます。これが発生すると、blpop
/ brpop
は例外をスローします。
したがって、クライアント側のコンシューマーロジックを記述するときは、必ず例外をキャッチし、再試行ロジックを実装してください。
分散ロックの競合の処理
リクエストの処理中に、クライアントが分散ロックを取得できなかった場合はどうなりますか?
通常、ロックの取得の失敗を処理するには、次の3つの戦略があります。
- 例外を直接スローし、後で再試行するようにユーザーに通知します。
- しばらくスリープしてから再試行します。
- リクエストを遅延キューに移動し、後で再試行します。
特定の種類の例外を直接スローする
このアプローチは、ユーザーが開始したリクエストに適しています。ユーザーがエラーダイアログを表示すると、通常、メッセージを読んで「再試行」をクリックします。これにより、自然に遅延が発生します。ユーザーエクスペリエンスを向上させるために、フロントエンドコードはユーザーに依存する代わりに、この再試行遅延を引き継ぐことができます。基本的に、この戦略は現在のリクエストを破棄し、再開するかどうかをユーザーに委ねます。
スリープ
sleep
を使用すると、現在のメッセージ処理スレッドがブロックされ、キュー内の後続のメッセージの処理が遅延します。競合が頻繁に発生する場合、またはキューに多数のメッセージがある場合、sleep
は適切ではない可能性があります。ロックの取得の失敗がデッドロックされたキーによって発生した場合、スレッドは完全にスタックし、それ以上のメッセージ処理を防ぎます。
遅延キュー
この戦略は、非同期メッセージ処理により適しています。競合するリクエストを別のキューに投入して後で処理し、即時の競合を回避します。
遅延キューの実装
Redisのzset
データ構造を使用して、タイムスタンプをスコアとして割り当て、要素をソートできます。zadd score1 value1 ...
コマンドを使用して、メッセージをメモリ内に継続的に生成します。次に、zrangebyscore
を使用して、処理の準備ができているすべてのタスクをクエリします。これらのタスクをループ処理して、1つずつ実行できます。また、zrangebyscore key min max withscores limit 0 1
を使用して、消費する最も早いタスクのみをクエリすることもできます。
private Jedis jedis; public void redisDelayQueueTest() { String key = "delay_queue"; // 実際のアプリケーションでは、ビジネスIDとランダムに生成された一意のIDを値として使用することをお勧めします。 // 一意のIDはメッセージの一意性を保証し、ビジネスIDは値に過剰なデータを保持することを回避します。 String orderId1 = UUID.randomUUID().toString(); jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId1); String orderId2 = UUID.randomUUID().toString(); jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId2); new Thread() { @Override public void run() { while (true) { Set<String> resultList; // 最初のアイテムのみを取得します(非破壊的な読み取り) resultList = jedis.zrangebyscore(key, System.currentTimeMillis(), 0, 1); if (resultList.size() == 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); break; } } else { // フェッチされたデータを削除します if (jedis.zrem(key, resultList.iterator().next()) > 0) { String orderId = resultList.iterator().next(); log.info("orderId = {}", orderId); this.handleMsg(orderId); } } } } }.start(); } public void handleMsg(T msg) { System.out.println(msg); }
上記の実装は、マルチスレッドシナリオでも問題なく動作します。2つのスレッドT1とT2、およびより多くのスレッドがあるとします。ロジックは次のように進み、1つのスレッドのみがメッセージを処理することを保証します。
- T1、T2、およびその他のスレッドは
zrangebyscore
を呼び出し、メッセージAを取得します。 - T1はメッセージAの削除を開始します。これはアトミック操作であるため、T2およびその他のスレッドはT1が
zrem
を完了するまで待機してから続行します。 - T1はメッセージAを正常に削除し、処理します。
- T2などはメッセージAを削除しようとしますが、すでに削除されているため失敗します。処理をあきらめます。
また、handleMsg
に例外処理を追加して、1つの障害のあるタスクが処理ループ全体をクラッシュさせないようにしてください。
さらなる最適化
上記のアルゴリズムでは、同じタスクが複数のプロセスによってフェッチされる可能性があり、zrem
を使用して削除に成功するのは1つだけです。他のプロセスは無駄にタスクをフェッチします。これは無駄です。これを改善するには、Luaスクリプトを使用してロジックを最適化し、zrangebyscore
とzrem
をサーバー側の単一のアトミック操作に結合します。このようにすると、同じタスクを競合する複数のプロセスが不要なフェッチを行うことはありません。
Luaスクリプトを使用してさらに最適化する
Luaスクリプトは、期限切れのメッセージを確認し、削除し、削除が成功した場合はメッセージを返します。それ以外の場合は、空の文字列を返します。
String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit', 0, 1)\n" + "if #resultArray > 0 then\n" + " if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then\n" + " return resultArray[1]\n" + " else\n" + " return ''\n" + " end\n" + "else\n" + " return ''\n" + "end"; jedis.eval(luaScript, ScriptOutputType.VALUE, new String[]{key}, String.valueOf(System.currentTimeMillis()));
Redisベースの遅延キューの利点
Redisは、遅延キューの実装に使用する場合、次の利点を提供します。
- Redis
zset
は、高性能なスコアベースのソートを提供します。 - Redisはインメモリで動作するため、非常に高速です。
- Redisはクラスタリングをサポートしています。メッセージが多い場合、クラスタはメッセージ処理速度と可用性を向上させることができます。
- Redisは永続化をサポートしています。障害が発生した場合、AOFまたはRDBを使用してデータを回復し、信頼性を確保できます。
Redisベースの遅延キューの欠点
ただし、Redisベースの遅延キューには、いくつかの制限もあります。
- メッセージの永続性と信頼性は依然として懸念事項です。Redisは永続化をサポートしていますが、専用のMQほど信頼性が高くありません。
- 再試行メカニズムがない – メッセージの処理中に例外が発生した場合、Redisは組み込みの再試行メカニズムを提供しません。再試行回数の管理を含め、これを自分で実装する必要があります。
- ACKメカニズムがない – たとえば、クライアントがメッセージを取得して削除したが、処理中にクラッシュした場合、メッセージは失われます。対照的に、メッセージキュー(MQ)は、メッセージを削除する前に、正常な処理を確認するための確認応答が必要です。
メッセージの信頼性が重要な場合は、専用のMQを使用することをお勧めします。
Redissonを使用した遅延キューの実装
Redissonベースの分散遅延キュー(RDelayedQueue
)は、RQueue
インターフェイス上に構築されており、キューへのアイテムの追加を遅延させる機能を提供します。これは、幾何学的に増加または減少するメッセージ配信戦略を実装するために使用できます。
RQueue<String> destinationQueue = ... RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue); // 10秒後にメッセージをキューに送信します delayedQueue.offer("msg1", 10, TimeUnit.SECONDS); // 1分後にメッセージをキューに送信します delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
オブジェクトが不要になった場合は、積極的に破棄する必要があります。関連付けられたRedissonオブジェクトがシャットダウンされている場合にのみ、手動で破棄しないことが許容されます。
RDelayedQueue<String> delayedQueue = ... delayedQueue.destroy();
便利ではありませんか?
Leapcellは、バックエンドプロジェクトをホストするための最適な選択肢です。
Leapcellは、Webホスティング、非同期タスク、およびRedis向けの次世代サーバーレスプラットフォームです。
多言語サポート
- Node.js、Python、Go、またはRustで開発します。
無制限のプロジェクトを無料でデプロイ
- 使用量に対してのみ支払い、リクエストや料金はかかりません。
比類のないコスト効率
- アイドル料金なしの従量課金制。
- 例:25ドルで平均応答時間60ミリ秒で694万リクエストをサポートします。
合理化された開発者エクスペリエンス
- 簡単なセットアップのための直感的なUI。
- 完全に自動化されたCI / CDパイプラインとGitOps統合。
- 実用的な洞察のためのリアルタイムメトリックとロギング。
簡単なスケーラビリティと高パフォーマンス
- 高い同時実行性を簡単に処理するための自動スケーリング。
- 運用上のオーバーヘッドはゼロです。構築に集中するだけです。
ドキュメントで詳細をご覧ください。
Xでフォローしてください:@LeapcellHQ