Goでイベントバスを構築する方法
Grace Collins
Solutions Engineer · Leapcell

Preface
今日のマイクロサービスと分散システムが普及している状況において、イベント駆動型アーキテクチャ(EDA)は重要な役割を果たします。このアーキテクチャ設計により、サービスは従来のような直接的なインターフェース呼び出しの代わりに、イベントを介して同期または非同期で通信できます。イベントベースのインタラクションモードは、サービス間の疎結合を促進し、システムの拡張性を向上させます。
パブリッシュ/サブスクライブパターンは、イベント駆動型アーキテクチャを実装する1つの方法です。これにより、システムのさまざまなコンポーネントまたはサービスがイベントをパブリッシュし、他のコンポーネントまたはサービスがこれらのイベントをサブスクライブし、イベントの内容に基づいて応答できます。ほとんどの開発者はこのパターンに精通しているでしょう。一般的な技術的実装には、メッセージキュー(MQ)やRedisのパブリッシュ/サブスクライブ(PUB/SUB)機能などがあります。
Goでは、その強力なチャネルと並行処理メカニズムを活用して、パブリッシュ/サブスクライブパターンを実装できます。この記事では、Goでシンプルなイベントバスを実装する方法を詳しく説明します。これは、パブリッシュ/サブスクライブパターンの具体的な実現です。
Event Bus
イベントバスは、パブリッシュ/サブスクライブパターンの具体的な実装です。パブリッシャーとサブスクライバーの間にあるミドルウェアとして、イベントの配信と配布を管理し、イベントがパブリッシャーからサブスクライバーにスムーズに伝送されるようにします。
イベントバスの主な利点は次のとおりです。
- 疎結合性: サービスは直接通信する必要はなく、代わりにイベントを介してインタラクトするため、サービス間の依存関係が軽減されます。
- 非同期処理: イベントは非同期で処理できるため、システムの応答性とパフォーマンスが向上します。
- 拡張性: 新しいサブスクライバーは、既存のパブリッシャーコードを変更せずに、イベントを簡単にサブスクライブできます。
- フォールトアイソレーション: イベント処理の失敗は、他のサービスの通常の動作に直接影響しません。
Code Implementation of the Event Bus
次に、Goでシンプルなイベントバスを実装する方法を紹介します。これには、次の主要な機能が含まれます。
- Publish: システム内のさまざまなサービスがイベントを送信できます。
- Subscribe: 関心のあるサービスが特定のタイプのイベントをサブスクライブして受信できます。
- Unsubscribe: サービスが以前にサブスクライブしたイベントを削除できます。
Event Data Structure Definition
type Event struct { Payload any }
Event
はイベントをカプセル化する構造体で、Payload
はイベントのコンテキスト情報を表し、その型はany
です。
Event Bus Definition
type ( EventChan chan Event ) type EventBus struct { mu sync.RWMutex subscribers map[string][]EventChan } func NewEventBus() *EventBus { return &EventBus{ subscribers: make(map[string][]EventChan), } }
EventChan
は型エイリアスで、Event
構造体を渡すためのチャネルとして定義されています:chan Event
。
EventBus
はイベントバスの定義です。これには2つのプロパティが含まれています。
- mu: 読み書きミューテックス(
sync.RWMutex
)。以下のsubscribers
の同時読み取り/書き込みの安全性を確保するために使用されます。 - subscribers: キーがサブスクリプショントピックを表す文字列で、値が
EventChan
のスライスであるマップ。このプロパティは、トピックごとのすべてのサブスクライバーを格納するために使用されます。各サブスクライバーは、独自のEventChan
を通じてイベントを受信します。
NewEventBus
関数は、新しいEventBus
インスタンスを作成するために使用されます。
Event Bus Method Implementation
イベントバスは、イベントのパブリッシュ(Publish
)、イベントのサブスクライブ(Subscribe
)、イベントのサブスクライブ解除(Unsubscribe
)の3つのメソッドを実装します。
Publish
func (eb *EventBus) Publish(topic string, event Event) { eb.mu.RLock() defer eb.mu.RUnlock() // Copy a new subscriber list to avoid modifying the list while publishing subscribers := append([]EventChan{}, eb.subscribers[topic]...) go func() { for _, subscriber := range subscribers { subscriber <- event } }() }
Publish
メソッドは、イベントをパブリッシュするために使用されます。このメソッドは、topic
(件名)とevent
(カプセル化されたイベントオブジェクト)の2つのパラメータを受け取ります。
Publish
の実装では、最初にmu
プロパティを介して読み取りロックを取得し、後続のsubscribers
に対する操作が同時ルーチンで安全であることを保証します。次に、トピックの現在のサブスクライバーリストのコピーが作成されます。新しいgoroutineが開始され、コピーされたサブスクライバーリストを反復処理し、各サブスクライバーにチャネルを介してイベントを送信します。これらの操作が完了すると、読み取りロックが解除されます。
サブスクライバーリストのコピーを作成する理由
回答:サブスクライバーリストをコピーすると、イベントの送信中にデータの整合性と安定性が保証されます。チャネルへのデータの送信は新しいgoroutineで行われるため、データが送信されるまでに読み取りロックがすでに解除されており、元のサブスクライバーリストはサブスクライバーの追加または削除によって変更されている可能性があります。元のサブスクライバーリストを直接使用すると、予期しないエラーが発生する可能性があります(たとえば、閉じられたチャネルにデータを送信するとパニックが発生する可能性があります)。
Subscribe
func (eb *EventBus) Subscribe(topic string) EventChan { eb.mu.Lock() defer eb.mu.Unlock() ch := make(EventChan) eb.subscribers[topic] = append(eb.subscribers[topic], ch) return ch }
Subscribe
メソッドは、特定のトピックのイベントをサブスクライブするために使用されます。topic
パラメータを受け入れます。これは、サブスクライブするトピックを指定します。このメソッドを使用すると、トピックのイベントを受信するEventChan
チャネルを取得できます。
Subscribe
の実装では、最初にmu
プロパティを介して書き込みロックを取得し、今後のsubscribers
に対する読み取り/書き込み操作が同時ルーチンで安全であることを保証します。次に、新しいEventChan
チャネルch
が作成され、関連するトピックのサブスクライバースライスに追加されます。これらの操作が完了すると、書き込みロックが解除されます。
Unsubscribe
func (eb *EventBus) Unsubscribe(topic string, ch EventChan) { eb.mu.Lock() defer eb.mu.Unlock() if subscribers, ok := eb.subscribers[topic]; ok { for i, subscriber := range subscribers { if ch == subscriber { eb.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...) close(ch) // Drain the channel for range ch { } return } } } }
Unsubscribe
メソッドは、イベントのサブスクライブを解除するために使用されます。topic
(サブスクライブされたトピック)とch
(発行されたチャネル)の2つのパラメータを受け取ります。
Unsubscribe
メソッド内では、最初にmu
プロパティを介して書き込みロックを取得し、今後のsubscribers
に対する操作の同時読み取り/書き込みの安全性を保証します。次に、トピックに対応するサブスクライバーがいるかどうかを確認します。存在する場合は、そのトピックのサブスクライバースライスをトラバースし、ch
に一致するチャネルを見つけ、サブスクライバースライスから削除して、チャネルを閉じます。次に、チャネルがドレインされます。これらの操作の後、書き込みロックが解除されます。
Usage Example
package main import ( "fmt" "time" "demo-eventbus" ) func main() { eventBus := eventbus.NewEventBus() // Subscribe to the "post" topic event subscribe := eventBus.Subscribe("post") go func() { for event := range subscribe { fmt.Println(event.Payload) } }() eventBus.Publish("post", eventbus.Event{Payload: map[string]any{ "postId": 1, "title": "Welcome to Leapcell", "author": "Leapcell", }}) // Topic with no subscribers eventBus.Publish("pay", eventbus.Event{Payload: "pay"}) time.Sleep(time.Second * 2) // Unsubscribe from the "post" topic event eventBus.Unsubscribe("post", subscribe) }
Suggestions for Extensions
この記事で実装されているイベントバスは比較的単純です。イベントバスの柔軟性、信頼性、および使いやすさを向上させる場合は、次の方法で拡張することを検討できます。
- イベントの永続化: システムクラッシュ後に未処理のイベントを回復できるように、イベントの永続的なストレージを実装します。
- ワイルドカードとパターンマッチングのサブスクリプション: 単一の特定のトピックだけでなく、ワイルドカードまたは正規表現を使用して、関連するトピックのグループをサブスクライブできるようにします。
- 負荷分散とメッセージ配信戦略: 複数のサブスクライバー間でイベントを配信して、負荷分散を実現します。
- プラグインのサポート: ログ記録、メッセージフィルタリング、変換など、プラグインを介して機能拡張を有効にします。
Conclusion
この記事では、Goでシンプルなイベントバスを実装するプロセスを徹底的に検証します。チャネルや並行処理メカニズムなどのGoの強力な機能を利用することで、パブリッシュ/サブスクライブパターンを簡単に実装できます。
この記事では、最初に疎結合性、非同期処理、拡張性、障害分離など、イベントバスの利点を紹介します。次に、イベントデータ構造とイベントバス構造を定義する方法、およびイベントをパブリッシュ、サブスクライブ、およびサブスクライブ解除するためのメソッドを実装する方法について詳しく説明します。最後に、イベントの永続化、ワイルドカードサブスクリプション、負荷分散、プラグインサポートなど、いくつかの潜在的な拡張の方向性を提案し、イベントバスの柔軟性と機能を強化します。
この記事を読むことで、Goでシンプルでありながら強力なイベントバスを実装する方法を学び、可能な要件に応じて拡張できます。
We are Leapcell, your top choice for hosting Go projects.
Leapcell is the Next-Gen Serverless Platform for Web Hosting, Async Tasks, and Redis:
Multi-Language Support
- Develop with Node.js, Python, Go, or Rust.
Deploy unlimited projects for free
- pay only for usage — no requests, no charges.
Unbeatable Cost Efficiency
- Pay-as-you-go with no idle charges.
- Example: $25 supports 6.94M requests at a 60ms average response time.
Streamlined Developer Experience
- Intuitive UI for effortless setup.
- Fully automated CI/CD pipelines and GitOps integration.
- Real-time metrics and logging for actionable insights.
Effortless Scalability and High Performance
- Auto-scaling to handle high concurrency with ease.
- Zero operational overhead — just focus on building.
Explore more in the Documentation!
Follow us on X: @LeapcellHQ