Goでのトークンバケットアルゴリズムの実装:トークンバケットアルゴリズムによるレート制限実装
Takashi Yamamoto
Infrastructure Engineer · Leapcell

Goでのトークンバケットアルゴリズムの実装
レート制限の概要
コンピュータサイエンスとネットワークエンジニアリングの分野では、レート制限はシステムの安定性を維持し、不正利用を防ぎ、公平なリソース割り当てを保証するための重要なメカニズムとして位置付けられています。その中核となるレート制限は、特定の時間枠内で処理できるリクエストまたは操作の数を制御します。これは、APIゲートウェイ、分散システム、およびリソース消費を規制する必要があるアプリケーションなどのシナリオで不可欠になります。
レート制限を実装するために設計されたさまざまなアルゴリズムの中で、トークンバケットアルゴリズムは、最も人気があり用途の広い選択肢の1つとして登場しました。その魅力は、定常状態のトラフィックと時折発生するトラフィックバーストの両方を処理できる能力にあり、幅広い現実世界のアプリケーションに適しています。この包括的なガイドでは、トークンバケットアルゴリズムの内部動作を掘り下げ、Goプログラミング言語での実装を探求し、その実用的なアプリケーションと考慮事項を検討します。
トークンバケットアルゴリズムの理解
コアコンセプト
トークンバケットアルゴリズムは、トークンを保持するバケットという単純かつエレガントなメタファーに基づいて動作します。各トークンは、操作を実行するか、リクエストを処理する権利を表します。アルゴリズムは、次の主要なメカニズムを通じて機能します。
- トークン生成: 一定の間隔で、新しいトークンがバケットに追加されます。トークンが生成されるレートは、システムが処理できる長期的な平均リクエストレートを決定します。
- トークン消費: リクエストを処理するには、トークンをバケットから削除する必要があります。トークンが利用可能な場合、リクエストは許可されます。そうでない場合、リクエストは遅延または拒否されます。
- バケット容量: バケットには最大容量があり、一度に保持できるトークンの数を制限します。この容量は、システムがトラフィックバーストを処理する能力を決定します。
ビジュアル表現
+------------------------+
| |
| [トークン] [トークン] |
| [トークン] [トークン] | <- 容量 = 5 のバケット
| [トークン] | 現在 5 つのトークンを保持
| |
+------------------------+
^ |
| v
+------------------------+
| |
| トークンジェネレーター | <- 1 秒あたり 2 つのトークンを追加
| |
+------------------------+
^
|
+------------------------+
| |
| 受信リクエスト |
| |
+------------------------+
この図では、バケットは最大5つのトークンを保持できます。トークンジェネレーターは1秒あたり2つのトークンをバケットに追加しますが、バケットが5つのトークンの容量を超えることはありません。受信リクエストが到着すると、各リクエストはバケットから1つのトークンを取得します。トークンが利用できない場合、リクエストは後で処理するためにキューに入れられるか、完全に拒否されます。
動的動作
トークンバケットアルゴリズムの真の力は、その動的動作にあります。
- リクエスト量が少ない期間中、トークンはバケットに蓄積され、最大容量まで蓄積されます。これにより、突然のトラフィックスパイクを処理するために使用できる「予備」が構築されます。
- トラフィックバースト中、システムはバケットにトークンがある限り、平均トークン生成レートよりも高いレートでリクエストを処理できます。
- バケットが空になると、システムはトークン生成レートでのみリクエストを処理でき、トラフィックを長期的な平均に効果的に絞ります。
この動作により、トークンバケットアルゴリズムは、Webサーバー、APIエンドポイント、ネットワークルーターなど、変動するトラフィックパターンが発生するアプリケーションに特に適しています。
Goでのトークンバケットの実装
Goの並行処理プリミティブ、特にゴルーチンとチャネルは、トークンバケットアルゴリズムを実装するための優れた言語になります。ステップごとの実装を見ていきましょう。
基本的なトークンバケット構造
まず、トークンバケットの構造を定義しましょう。
package tokenbucket import ( "sync" "time" ) // TokenBucketは、トークンバケットレートリミッターを表します type TokenBucket struct { capacity int // バケットが保持できるトークンの最大数 rate int // 1秒あたりに追加するトークンの数 tokens int // バケット内の現在のトークン数 lastRefill time.Time // 最後のトークン補充のタイムスタンプ mutex sync.Mutex // 同時アクセスを保護するためのMutex }
この構造には以下が含まれます:
capacity
: バケットが保持できるトークンの最大数rate
: 1秒あたりに追加するトークンの数tokens
: バケット内の現在のトークン数lastRefill
: トークンが最後にバケットに追加されたときのタイムスタンプmutex
: スレッドの安全性を確保するための同期プリミティブ
新しいトークンバケットの作成
次に、新しいトークンバケットを作成する関数を実装しましょう。
// NewTokenBucketは、指定された容量とレートで新しいトークンバケットを作成します func NewTokenBucket(capacity, rate int) *TokenBucket { return &TokenBucket{ capacity: capacity, rate: rate, tokens: capacity, // フルバケットから開始 lastRefill: time.Now(), } }
この関数は、指定された容量とトークン生成レートで新しいトークンバケットを初期化します。フルバケットから開始することにより、バケットの容量までの即時トラフィックバーストを許可します。
トークンの補充
トークンバケットアルゴリズムの中心は、トークン補充メカニズムです。最後の補充からの経過時間に基づいて、バケットに追加するトークンの数を計算する必要があります。
// refillは、最後の補充からの経過時間に基づいて、トークンをバケットに追加します func (tb *TokenBucket) refill() { now := time.Now() elapsed := now.Sub(tb.lastRefill) // 追加するトークンの数を計算します tokensToAdd := int(elapsed.Seconds() * float64(tb.rate)) if tokensToAdd > 0 { tb.lastRefill = now // トークンを追加しますが、バケットの容量を超えないようにします tb.tokens = min(tb.tokens+tokensToAdd, tb.capacity) } } // minは、2つの整数の最小値を見つけるためのヘルパー関数です func min(a, b int) int { if a < b { return a } return b }
トークンの取得
次に、クライアントがバケットからトークンを取得できるようにするメソッドを実装しましょう。
// Takeは、バケットから指定された数のトークンを取得しようとします // 成功した場合はtrue、それ以外の場合はfalseを返します func (tb *TokenBucket) Take(tokens int) bool { tb.mutex.Lock() defer tb.mutex.Unlock() // まず、経過時間に基づいてトークンでバケットを補充します tb.refill() // 十分なトークンがあるかどうかを確認します if tb.tokens >= tokens { tb.tokens -= tokens return true } // 十分なトークンがありません return false }
Take
メソッドは、次の手順を実行します。
- 複数のゴルーチンがバケットに同時にアクセスしようとする可能性があるため、スレッドの安全性を確保するためにロックを取得します。
- 最後の操作以降に生成されたトークンでバケットを補充します。
- リクエストを満たすのに十分なトークンがあるかどうかを確認します。
- 十分なトークンがある場合は、バケットからトークンを削除して
true
を返します。それ以外の場合は、false
を返します。
ブロッキング取得
場合によっては、トークンが利用可能になるまでブロックし、すぐにfalseを返さないようにする必要があります。トークンが利用可能になるまで指定された期間待機するTakeWithTimeout
メソッドを実装しましょう。
// TakeWithTimeoutは、指定されたタイムアウトまで待機して、バケットからトークンを取得しようとします // 成功した場合はtrue、タイムアウトに達した場合はfalseを返します func (tb *TokenBucket) TakeWithTimeout(tokens int, timeout time.Duration) bool { // 待機を停止できる最も早い時間を計算します deadline := time.Now().Add(timeout) for { tb.mutex.Lock() // バケットを補充します tb.refill() // 十分なトークンがあるかどうかを確認します if tb.tokens >= tokens { tb.tokens -= tokens tb.mutex.Unlock() return true } // より多くのトークンを待つ必要がある時間を計算します tokensNeeded := tokens - tb.tokens timeNeeded := time.Duration(tokensNeeded) * time.Second / time.Duration(tb.rate) // デッドライン前にトークンを取得できる場合は、待機して再試行します if time.Now().Add(timeNeeded).Before(deadline) { // 他の操作を許可するために待機する前にロックを解除します tb.mutex.Unlock() // 必要な時間だけ待機しますが、残りのタイムアウトを超えることはありません waitTime := minDuration(timeNeeded, deadline.Sub(time.Now())) time.Sleep(waitTime) } else { // 必要なトークンを取得するのに十分な時間がありません tb.mutex.Unlock() return false } } } // minDurationは、2つの期間の最小値を見つけるためのヘルパー関数です func minDuration(a, b time.Duration) time.Duration { if a < b { return a } return b }
このメソッドは、必要なトークンが取得されるか、タイムアウトに達するまで、利用可能なトークンを繰り返し確認するためにループを使用します。必要なトークンを生成するために必要な最小時間を計算し、その期間(またはタイムアウトまで、いずれか早い方)待機してから再度確認します。
高度な機能と最適化
バースト制御
基本的な実装ではトークンの蓄積を許可することでバーストを処理しますが、より高度なバースト制御を追加したい場合があります。たとえば、1つのリクエストで取得できるトークンの最大数を制限できます。
// TakeWithBurstLimitはTakeに似ていますが、1つのリクエストで取得できるトークンの最大数を制限します func (tb *TokenBucket) TakeWithBurstLimit(tokens, maxBurst int) bool { // 最大バーストサイズを超えないようにします if tokens > maxBurst { tokens = maxBurst } return tb.Take(tokens) }
この変更により、1つのリクエストが利用可能なすべてのトークンを消費するのを防ぎ、他のリクエストのためにいくつかのトークンが残るようにします。
モニタリングとメトリクス
本番システムでは、トークンバケットの状態を監視すると便利な場合があります。
// Metricsは、トークンバケットの現在の状態を返します func (tb *TokenBucket) Metrics() (capacity, rate, currentTokens int) { tb.mutex.Lock() defer tb.mutex.Unlock() // 正確な現在のトークン数を取得するために補充します tb.refill() return tb.capacity, tb.rate, tb.tokens }
このメソッドは、バケットの容量、トークン生成レート、および現在のトークン数に関する洞察を提供し、パフォーマンスの監視と調整に非常に役立ちます。
動的レート調整
一部のシナリオでは、システムの状態に基づいてトークン生成レートを動的に調整したい場合があります。
// SetRateは、トークン生成レートを更新します func (tb *TokenBucket) SetRate(newRate int) { tb.mutex.Lock() defer tb.mutex.Unlock() // 最初に補充して、古いレートでの時間を考慮します tb.refill() tb.rate = newRate }
これにより、システムは、オフピーク時にレートを上げたり、負荷が高い時にレートを下げたりするなど、変化する状態に適応できます。
実用的なアプリケーション
APIレート制限
トークンバケットアルゴリズムの最も一般的なアプリケーションの1つは、APIレート制限です。
// 例: トークンバケットを使用してAPIエンドポイントのレートを制限する func handleAPIRequest(w http.ResponseWriter, r *http.Request, bucket *TokenBucket) { // このリクエストのトークンを取得しようとします if !bucket.Take(1) { http.Error(w, "Too many requests", http.StatusTooManyRequests) return } // リクエストを処理します... w.WriteHeader(http.StatusOK) w.Write([]byte("リクエストは正常に処理されました")) }
この例では、各APIリクエストはバケットから1つのトークンを消費します。トークンが利用できない場合、サーバーは429 Too Many Requests応答を返します。
トラフィックシェイピング
トークンバケットアルゴリズムは、ネットワークアプリケーションのトラフィックシェイピングにも使用できます。
// 例: トークンバケットをトラフィックシェイピングに使用する func sendPackets(packets [][]byte, bucket *TokenBucket) { for _, packet := range packets { // 各パケットは、そのサイズに基づいてトークンを消費します packetSize := len(packet) // このパケットに十分なトークンがあるまで待ちます if !bucket.TakeWithTimeout(packetSize, 5*time.Second) { fmt.Println("トークンを待機中にタイムアウトしました") return } // パケットを送信します... fmt.Printf("サイズ%dのパケットを送信しています\n", packetSize) } }
ここでは、必要なトークンの数は、送信されるパケットのサイズに比例するため、ネットワーク帯域幅をより細かく制御できます。
リソース割り当て
トークンバケットを使用して、システム内のリソース割り当てを管理することもできます。
// 例: トークンバケットを使用して、さまざまなタイプの操作に優先順位を付ける type ResourceManager struct { criticalOperationsBucket *TokenBucket normalOperationsBucket *TokenBucket lowPriorityBucket *TokenBucket } func NewResourceManager() *ResourceManager { return &ResourceManager{ // クリティカルな操作は、より多くのトークンとより大きなバースト容量を取得します criticalOperationsBucket: NewTokenBucket(100, 50), // 通常の操作は、適度なトークン割り当てを取得します normalOperationsBucket: NewTokenBucket(50, 20), // 優先度の低い操作は、より少ないトークンを取得します lowPriorityBucket: NewTokenBucket(20, 5), } } func (rm *ResourceManager) performCriticalOperation() bool { return rm.criticalOperationsBucket.Take(1) } func (rm *ResourceManager) performNormalOperation() bool { return rm.normalOperationsBucket.Take(1) } func (rm *ResourceManager) performLowPriorityOperation() bool { return rm.lowPriorityBucket.Take(1) }
この例では、異なるタイプの操作には、容量とレートが異なるトークンバケットがあり、クリティカルな操作がシステムリソースへの優先アクセスを確実に受けられるようにします。
パフォーマンスに関する考慮事項
並行性
Goでトークンバケットアルゴリズムを実装する場合は、同時アクセスのパフォーマンスへの影響を考慮することが重要です。mutexを使用すると、スレッドの安全性が確保されますが、同時実行性の高いシステムではボトルネックになる可能性があります。
これを軽減する方法の1つは、よりきめ細かいロック戦略を使用するか、トークンバケットを複数のインスタンスにシャーディングすることです。
// 例: 並行性を向上させるためのトークンバケットのシャーディング type ShardedTokenBucket struct { buckets []*TokenBucket numShards int } func NewShardedTokenBucket(capacity, rate, numShards int) *ShardedTokenBucket { buckets := make([]*TokenBucket, numShards) for i := 0; i < numShards; i++ { // 各シャードは、合計容量とレートの均等なシェアを取得します buckets[i] = NewTokenBucket(capacity/numShards, rate/numShards) } return &ShardedTokenBucket{ buckets: buckets, numShards: numShards, } } func (stb *ShardedTokenBucket) Take(tokens int) bool { // ハッシュ(リクエストIDなど)に基づいてシャードを選択します shard := 0 // 実際には、適切なハッシュ関数を使用します return stb.buckets[shard].Take(tokens) }
トークンバケットを複数のシャードに分割することにより、mutexの競合を減らし、同時実行環境でのパフォーマンスを向上させることができます。
精度対効率
この記事で提示されている実装では、各操作でトークン数を再計算します。これにより、高精度が得られますが、非常にスループットの高いシステムではパフォーマンスに影響を与える可能性があります。
代替アプローチは、バックグラウンドゴルーチンを使用して、定期的にトークンをバケットに追加することです。
// 例: バックグラウンドゴルーチンを使用してトークンを追加する func startRefillGoroutine(bucket *TokenBucket, interval time.Duration) { go func() { ticker := time.NewTicker(interval) defer ticker.Stop() for range ticker.C { bucket.mutex.Lock() // 間隔とレートに基づいてトークンを追加します tokensToAdd := int(interval.Seconds() * float64(bucket.rate)) bucket.tokens = min(bucket.tokens+tokensToAdd, bucket.capacity) bucket.mutex.Unlock() } }() }
このアプローチはより効率的ですが、トークンは継続的ではなくバッチで追加されるため、いくつかの非精度が生じます。
結論
トークンバケットアルゴリズムは、幅広いアプリケーションでレート制限とトラフィックシェーピングを実装するための柔軟で効率的な方法を提供します。定常トラフィックと突然のバーストの両方を処理できるため、トラフィックパターンが予測できない現実世界のシステムで特に価値があります。
この記事では、トークンバケットアルゴリズムのコアコンセプトを探求し、Goで実装し、さまざまな拡張機能と最適化を検討しました。また、APIレート制限からネットワークトラフィックシェーピングまで、実用的なアプリケーションについても見てきました。
独自のアプリケーションでトークンバケットを実装する場合は、次のことを考慮することを忘れないでください。
- ユースケースに適した容量とトークン生成レート
- トークンが利用できない場合にリクエストを処理する方法(拒否、キュー、または遅延)
- 同時実行環境でのスレッドの安全性とパフォーマンス
- システムの状態に基づいてモニタリングと動的調整
Leapcell:最高のサーバーレスWebホスティング
最後に、Goサービスをデプロイするための優れたプラットフォームをお勧めします:Leapcell
🚀 お気に入りの言語で構築
JavaScript、Python、Go、またはRustで簡単に開発できます。
🌍 無制限のプロジェクトを無料でデプロイ
使用した分だけ支払います—リクエストも料金もありません。
⚡ 従量課金制、隠れたコストなし
アイドル料金はなく、シームレスなスケーラビリティだけです。
🔹 Twitterでフォローしてください:@LeapcellHQ