Python を使用した CQRS による高スケーラブルなビジネス システムの構築
Ethan Miller
Product Engineer · Leapcell

はじめに
複雑なビジネスアプリケーションの世界では、データを効果的に管理することが最も重要です。システムが拡大し、機能が増えるにつれて、データ処理メカニズムへの要求は激化します。従来のモノリシックアーキテクチャは、しばしば追いつくのに苦労し、パフォーマンスのボトルネック、複雑さの増加、およびアジリティの維持における課題につながります。最新の e コマースプラットフォームが、何百万ものユーザーが製品カタログを閲覧できるようにしながら、毎分数千件の注文を処理する必要があるシナリオを想像してみてください。単一のデータベースは、しばしばボトルネックになります。ここで、コマンドクエリ機能分離(CQRS)のようなパターンが強力な代替手段を提供します。CQRS は、データとの対話を根本的に見直し、複雑なビジネスシステムのパフォーマンス、スケーラビリティ、および保守性を大幅に向上させることができる専門的なアプローチを提供します。Python を使用して CQRS を活用し、真に回復力があり、高性能なアプリケーションを構築する方法を掘り下げてみましょう。
CQRS の基礎の理解
実践に入る前に、CQRS の基盤となる、その成功した実装に不可欠なコアコンセプトを明確にしましょう。
コマンドクエリ機能分離(CQRS): CQRS は、データ変更(コマンド)の処理責任と、データ取得(クエリ)の処理責任を分離するパターンです。両方の操作を実行する単一のモデルまたはインターフェースを持つのではなく、明確なモデルがあります。
- コマンド: システムの状態変更の意図を表すオブジェクト。これらは「何が起こるべきか」に焦点を当てた命令的なものです。
- クエリ: データの取得の意図を表すオブジェクト。これらは「どのデータが必要か」に焦点を当てた宣言的なものです。
イベントソーシング(CQRS と組み合わせて使用されることが多い): 厳密には必須ではありませんが、イベントソーシングは CQRS と組み合わせてよく使用されるパターンです。集約の現在の状態のみを保存するのではなく、イベントソーシングは、変更されていないイベントとして、追加専用ログにすべての状態変更を永続化します。次に、これらのイベントを再生することで現在の状態が再構築されます。これにより、完全な監査証跡が提供され、タイムトラベルやデバッグなどの強力な機能が可能になります。
ドメイン駆動設計(DDD): CQRS およびイベントソーシングは、DDD の原則の強力な理解からしばしば恩恵を受けます。DDD は、ドメインエキスパートからの入力を受けて、ドメインに合わせてソフトウェアをモデリングすることを強調します。集約、エンティティ、値オブジェクトなどの概念は、コマンドとイベントの明確な境界を定義するのに役立ちます。
CQRS が解決する問題
典型的な作成、読み取り、更新、削除(CRUD)アプリケーションでは、読み取り操作と書き込み操作はしばしば同じデータモデルとインフラストラクチャを共有します。これは単純なアプリケーションではうまく機能しますが、複雑なシステムでは、この統合されたアプローチはいくつかの課題につながる可能性があります。
- パフォーマンスのボトルネック: 読み取りモデルは、多くの場合、速度と非正規化のために最適化され、書き込みモデルは一貫性と正規化を優先します。単一のモデルを使用すると妥協が生じます。
- スケーラビリティの問題: 読み取りは書き込みを大幅に上回ることがよくあります。高負荷の読み取りと高負荷の書き込みの両方を効率的に処理するために、結合された読み取り/書き込みデータベースをスケーリングすることは困難で費用がかかる場合があります。
- 複雑さ: 読み取り(複雑なレポート、検索など)と書き込み(トランザクション整合性など)に対する異なるビジネス要件により、単一のモデルが過度に複雑になり、管理が困難になる可能性があります。
- セキュリティ上の懸念: 読み取り操作に適用される詳細なセキュリティは、書き込み操作とは大きく異なる場合があります。
CQRS の仕組みと Python での実装
CQRS の中心的な原則は単純です。コマンドとクエリには別々のパスがあります。
-
コマンドパス(書き込み側):
- コマンドはクライアント(例:ユーザーインターフェース、別のサービス)によって発行されます。
- コマンドハンドラーがコマンドを受信します。
- コマンドハンドラーは、関連する集約(書き込みモデルデータベース、イベントソーシングを使用している場合はイベントストア)をロードします。
- 集約は、ビジネスロジックを実行し、コマンドを検証し、ドメインイベントを発行します。
- これらのイベントはイベントストアに永続化されます。
- オプションで、イベントはイベントバスに公開されます。
-
クエリパス(読み取り側):
- クエリはクライアントによって発行されます。
- クエリハンドラーがクエリを受信します。
- クエリハンドラーは、読み取りモデルデータベースから直接データを取得します。このデータベースは、通常、非正規化され、高速な読み取りのために特別に最適化されています(例:ドキュメントデータベース、検索インデックス、または高度に最適化されたリレーショナルデータベースビュー)。
- データがクライアントに返されます。
実際のマジックは、読み取りモデルがどのように最新の状態に保たれるかです。イベントがイベントバスに公開されると(コマンドパスから)、**読み取りモデルプロジェクター(またはイベントハンドラー)**がこれらのイベントを消費し、非正規化された読み取りモデルデータベースを更新します。この非同期更新により、書き込み側は読み取り側から分離されたままになり、独立したスケーリングと最適化が可能になります。
Order
システムの簡単な Python の例で示します。
import abc import uuid from datetime import datetime from typing import Dict, List, Optional, Type, TypeVar # --- 1. ドメインイベント(何が起こったかの不変の事実) --- class DomainEvent: def __init__(self, aggregate_id: str, timestamp: datetime = None): self.aggregate_id = aggregate_id self.timestamp = timestamp or datetime.utcnow() class OrderCreated(DomainEvent): def __init__(self, order_id: str, customer_id: str, items: Dict[str, int], total_amount: float): super().__init__(order_id) self.customer_id = customer_id self.items = items self.total_amount = total_amount class OrderItemQuantityAdjusted(DomainEvent): def __init__(self, order_id: str, item_name: str, new_quantity: int): super().__init__(order_id) self.item_name = item_name self.new_quantity = new_quantity class OrderShipped(DomainEvent): def __init__(self, order_id: str): super().__init__(order_id) # --- 2. 集約(イベントを介したビジネスロジックと状態変更) --- class OrderAggregate: def __init__(self, order_id: str): self.id = order_id self.customer_id: Optional[str] = None self.items: Dict[str, int] = {} self.total_amount: float = 0.0 self.status: str = "PENDING" self._uncommitted_events: List[DomainEvent] = [] def apply(self, event: DomainEvent): """イベントを集約の状態に適用します。""" if isinstance(event, OrderCreated): self.customer_id = event.customer_id self.items = event.items self.total_amount = event.total_amount self.status = "CREATED" elif isinstance(event, OrderItemQuantityAdjusted): if event.item_name in self.items: self.items[event.item_name] = event.new_quantity # 実際のシステムでは total_amount を再計算 else: raise ValueError(f"Item {event.item_name} not in order.") elif isinstance(event, OrderShipped): self.status = "SHIPPED" # 必要に応じて、他のイベントのイベントハンドラーを追加 @classmethod def create(cls, customer_id: str, items: Dict[str, int], total_amount: float): order_id = str(uuid.uuid4()) aggregate = cls(order_id) event = OrderCreated(order_id, customer_id, items, total_amount) aggregate.apply(event) aggregate._uncommitted_events.append(event) return aggregate def adjust_item_quantity(self, item_name: str, new_quantity: int): if self.status != "CREATED": raise ValueError("Cannot adjust items for an order that is not in 'CREATED' status.") event = OrderItemQuantityAdjusted(self.id, item_name, new_quantity) self.apply(event) self._uncommitted_events.append(event) def ship_order(self): if self.status != "CREATED": raise ValueError("Only 'CREATED' orders can be shipped.") event = OrderShipped(self.id) self.apply(event) self._uncommitted_events.append(event) def get_uncommitted_events(self) -> List[DomainEvent]: return self._uncommitted_events def clear_uncommitted_events(self): self._uncommitted_events = [] # --- 3. コマンド(状態変更のリクエスト) --- class Command: pass class CreateOrderCommand(Command): def __init__(self, customer_id: str, items: Dict[str, int], total_amount: float): self.customer_id = customer_id self.items = items self.total_amount = total_amount class AdjustOrderItemQuantityCommand(Command): def __init__(self, order_id: str, item_name: str, new_quantity: int): self.order_id = order_id self.item_name = item_name self.new_quantity = new_quantity class ShipOrderCommand(Command): def __init__(self, order_id: str): self.order_id = order_id # --- 4. コマンドハンドラー(コマンドを処理し、イベントを生成) --- class CommandHandler(abc.ABC): @abc.abstractmethod def handle(self, command: Command) -> None: pass # デモンストレーション用の架空のイベントストアとイベントバス class EventStore: def __init__(self): self.events: Dict[str, List[DomainEvent]] = {} def save_events(self, aggregate_id: str, new_events: List[DomainEvent]): if aggregate_id not in self.events: self.events[aggregate_id] = [] self.events[aggregate_id].extend(new_events) print(f"Saved {len(new_events)} events for aggregate {aggregate_id}. Total: {len(self.events[aggregate_id])}") def get_events_for_aggregate(self, aggregate_id: str) -> List[DomainEvent]: return self.events.get(aggregate_id, []) class EventBus: def __init__(self): self._handlers: Dict[Type[DomainEvent], List[callable]] = {} def subscribe(self, event_type: Type[DomainEvent], handler: callable): if event_type not in self._handlers: self._handlers[event_type] = [] self._handlers[event_type].append(handler) def publish(self, event: DomainEvent): for handler in self._handlers.get(type(event), []): handler(event) print(f"Published event: {type(event).__name__} for aggregate {event.aggregate_id}") class CreateOrderCommandHandler(CommandHandler): def __init__(self, event_store: EventStore, event_bus: EventBus): self.event_store = event_store self.event_bus = event_bus def handle(self, command: CreateOrderCommand): order = OrderAggregate.create(command.customer_id, command.items, command.total_amount) self.event_store.save_events(order.id, order.get_uncommitted_events()) for event in order.get_uncommitted_events(): self.event_bus.publish(event) order.clear_uncommitted_events() return order.id class AdjustOrderItemQuantityCommandHandler(CommandHandler): def __init__(self, event_store: EventStore, event_bus: EventBus): self.event_store = event_store self.event_bus = event_bus def handle(self, command: AdjustOrderItemQuantityCommand): events = self.event_store.get_events_for_aggregate(command.order_id) order = OrderAggregate(command.order_id) for event in events: order.apply(event) # 状態を再構築 order.adjust_item_quantity(command.item_name, command.new_quantity) self.event_store.save_events(order.id, order.get_uncommitted_events()) for event in order.get_uncommitted_events(): self.event_bus.publish(event) order.clear_uncommitted_events() class ShipOrderCommandHandler(CommandHandler): def __init__(self, event_store: EventStore, event_bus: EventBus): self.event_store = event_store self.event_bus = event_bus def handle(self, command: ShipOrderCommand): events = self.event_store.get_events_for_aggregate(command.order_id) order = OrderAggregate(command.order_id) for event in events: order.apply(event) # 状態を再構築 order.ship_order() self.event_store.save_events(order.id, order.get_uncommitted_events()) for event in order.get_uncommitted_events(): self.event_bus.publish(event) order.clear_uncommitted_events() # --- 5. 読み取りモデル(非正規化され、クエリ用に最適化) --- class OrderReadModel: def __init__(self, order_id: str, customer_id: str, items: Dict[str, int], total_amount: float, status: str, created_at: datetime): self.id = order_id self.customer_id = customer_id self.items = items self.total_amount = total_amount self.status = status self.created_at = created_at # 架空の読み取りモデルデータベース class OrderReadModelDatabase: def __init__(self): self.orders: Dict[str, OrderReadModel] = {} def save(self, read_model: OrderReadModel): self.orders[read_model.id] = read_model print(f"Read model updated for order {read_model.id} (Status: {read_model.status})") def get_by_id(self, order_id: str) -> Optional[OrderReadModel]: return self.orders.get(order_id) def get_all_pending_orders(self) -> List[OrderReadModel]: return [order for order in self.orders.values() if order.status == "CREATED"] # --- 6. 読み取りモデルプロジェクター(イベントから読み取りモデルを更新) --- class OrderReadModelProjector: def __init__(self, read_model_db: OrderReadModelDatabase): self.read_model_db = read_model_db def handle_order_created(self, event: OrderCreated): read_model = OrderReadModel( order_id=event.aggregate_id, customer_id=event.customer_id, items=event.items, total_amount=event.total_amount, status="CREATED", # 初期ステータス created_at=event.timestamp ) self.read_model_db.save(read_model) def handle_order_item_quantity_adjusted(self, event: OrderItemQuantityAdjusted): existing_read_model = self.read_model_db.get_by_id(event.aggregate_id) if existing_read_model: existing_read_model.items[event.item_name] = event.new_quantity # 簡単のため、実際のシステムでは total_amount を再計算 self.read_model_db.save(existing_read_model) def handle_order_shipped(self, event: OrderShipped): existing_read_model = self.read_model_db.get_by_id(event.aggregate_id) if existing_read_model: existing_read_model.status = "SHIPPED" self.read_model_db.save(existing_read_model) # --- 7. クエリ(読み取りモデルからデータを取得) --- class GetOrderByIdQuery: def __init__(self, order_id: str): self.order_id = order_id class GetPendingOrdersQuery: pass class GetOrderByIdQueryHandler: def __init__(self, read_model_db: OrderReadModelDatabase): self.read_model_db = read_model_db def handle(self, query: GetOrderByIdQuery) -> Optional[OrderReadModel]: return self.read_model_db.get_by_id(query.order_id) class GetPendingOrdersQueryHandler: def __init__(self, read_model_db: OrderReadModelDatabase): self.read_model_db = read_model_db def handle(self, query: GetPendingOrdersQuery) -> List[OrderReadModel]: return self.read_model_db.get_all_pending_orders() # --- オーケストレーション --- class Application: def __init__(self): self.event_store = EventStore() self.event_bus = EventBus() self.read_model_db = OrderReadModelDatabase() # コマンドハンドラーを登録 self.create_order_cmd_handler = CreateOrderCommandHandler(self.event_store, self.event_bus) self.adjust_item_cmd_handler = AdjustOrderItemQuantityCommandHandler(self.event_store, self.event_bus) self.ship_order_cmd_handler = ShipOrderCommandHandler(self.event_store, self.event_bus) # クエリハンドラーを登録 self.get_order_by_id_query_handler = GetOrderByIdQueryHandler(self.read_model_db) self.get_pending_orders_query_handler = GetPendingOrdersQueryHandler(self.read_model_db) # 読み取りモデルを再構築するためにイベントハンドラー(プロジェクター)を登録 self.order_projector = OrderReadModelProjector(self.read_model_db) self.event_bus.subscribe(OrderCreated, self.order_projector.handle_order_created) self.event_bus.subscribe(OrderItemQuantityAdjusted, self.order_projector.handle_order_item_quantity_adjusted) self.event_bus.subscribe(OrderShipped, self.order_projector.handle_order_shipped) def execute_command(self, command: Command): if isinstance(command, CreateOrderCommand): return self.create_order_cmd_handler.handle(command) elif isinstance(command, AdjustOrderItemQuantityCommand): self.adjust_item_cmd_handler.handle(command) elif isinstance(command, ShipOrderCommand): self.ship_order_cmd_handler.handle(command) else: raise ValueError(f"Unknown command: {type(command)}") def execute_query(self, query): if isinstance(query, GetOrderByIdQuery): return self.get_order_by_id_query_handler.handle(query) elif isinstance(query, GetPendingOrdersQuery): return self.get_pending_orders_query_handler.handle(query) else: raise ValueError(f"Unknown query: {type(query)}") # --- クライアントの使用 --- if __name__ == "__main__": app = Application() # --- コマンドサイド --- print("--- Executing Commands ---") create_order_cmd = CreateOrderCommand( customer_id="cust-123", items={"Laptop": 1, "Mouse": 1}, total_amount=1200.00 ) order_id = app.execute_command(create_order_cmd) print(f"New Order Created with ID: {order_id}") adjust_item_cmd = AdjustOrderItemQuantityCommand( order_id=order_id, item_name="Mouse", new_quantity=2 ) app.execute_command(adjust_item_cmd) ship_order_cmd = ShipOrderCommand(order_id=order_id) app.execute_command(ship_order_cmd) # --- クエリサイド --- print("\n--- Executing Queries ---") # 特定の注文をクエリ query_order_by_id = GetOrderByIdQuery(order_id=order_id) order_details = app.execute_query(query_order_by_id) if order_details: print(f"Query Result for Order {order_details.id}:") print(f" Customer: {order_details.customer_id}") print(f" Items: {order_details.items}") print(f" Total: {order_details.total_amount}") print(f" Status: {order_details.status}") else: print(f"Order {order_id} not found in read model.") # 保留中の注文をデモンストレーションするために別の注文を作成 new_order_id = app.execute_command(CreateOrderCommand("cust-456", {"Book": 3}, 75.00)) print(f"Another Order Created with ID: {new_order_id}") # 保留中の注文をクエリ pending_orders = app.execute_query(GetPendingOrdersQuery()) print("\nPending Orders:") for order in pending_orders: print(f" Order ID: {order.id}, Customer: {order.customer_id}, Status: {order.status}")
コード例の説明:
DomainEvent
: システムで発生したことの不変の事実を表します(例:OrderCreated
、OrderItemQuantityAdjusted
)。OrderAggregate
: 書き込みモデルの核となります。ビジネスルールの強制とDomainEvent
の生成を担当します。状態を直接変更するのではなく、イベントをapply
して状態を再構築します。_uncommitted_events
のリストを保持します。Command
: 意図を表現する単純なデータ構造です(例:CreateOrderCommand
、ShipOrderCommand
)。CommandHandler
: コマンドを受け取り、集約をロードし(EventStore
の過去のイベントから状態を再構築)、集約に対して必要なアクションを実行し、次に新しく生成されたイベントをEventStore
に永続化してEventBus
に公開します。EventStore
: イベントの単純なインメモリストアです。実際のアプリケーションでは、これは永続データベース(PostgreSQL、DynamoDB、または専用のイベントストアなど)になります。EventBus
: 登録されたハンドラーにイベントを公開する単純なディスパッチャーです。本番システムでは、これは Kafka、RabbitMQ、または AWS SNS/SQS になる可能性があります。OrderReadModel
: 表示およびクエリ用に最適化された、注文データの非正規化されたフラットな投影です。OrderReadModelDatabase
: 読み取りモデルの単純なインメモリデータベースです。通常、これは NoSQL データベース(MongoDB、Elasticsearch)、高度に非正規化されたリレーショナルテーブル、または検索インデックスになります。OrderReadModelProjector
:EventBus
からDomainEvent
を消費し、OrderReadModelDatabase
を更新します。ここで非正規化と読み取りモデルの最適化が行われます。Query
: 情報の要求を表す単純なデータ構造です(例:GetOrderByIdQuery
)。QueryHandler
: 複雑なビジネスロジックなしに、直接OrderReadModelDatabase
からデータを取得します。
このセットアップは、関心を明確に分離します。OrderAggregate
は書き込み中のビジネスルールの保証と整合性にのみ責任を持ち、ReadModelProjector
はクエリ用に最適化されたビューを独立して構築します。
アプリケーションシナリオ
CQRS はすべての問題の万能薬ではありませんが、特定の複雑なシナリオで輝きます。
- 高性能な読み書きシステム: 読み書きの負荷が大幅に異なるシステムで、それぞれを独立してスケーリングすることが不可欠です(例:ソーシャルメディアフィード、e コマースプラットフォーム、IoT データ処理)。
- 複雑なドメインロジック: 書き込みモデルが洗練されたビジネスルール強制と整合性チェックを必要とする場合(イベントソーシングと組み合わされることが多い)。
- データレポートと分析: トランザクションパフォーマンスに影響を与えることなく、複雑なレポートクエリ用に最適化された読み取りモデルの構築。
- 外部システムとの統合: さまざまなコンシューマーに合わせた読み取りモデルの提供、またはサードパーティの分析ツールとの統合。
- イベント駆動型アーキテクチャ: サービスがイベントを介して通信するマイクロサービスおよびイベント駆動型パターンとの自然な適合。
- 監査とデバッグ: CQRS と一緒に使用されるイベントソーシングは、監査、デバッグ、および過去の状態の再現に不可欠な、すべての変更の完全で時間的な記録を提供します。
結論
Python でコマンドクエリ機能分離(CQRS)を実践することは、複雑でスケーラブルで保守可能なビジネスシステムを構築するための強力なアーキテクチャパターンを提供します。データを変更するモデルとクエリするモデルを明示的に分離することで、独立した最適化、パフォーマンスの向上、および回復力の強化の機会を解き放ちます。複雑さのレベルを導入しますが、特定の要求の厳しいシナリオにおけるメリット(スケーラビリティの向上からドメインモデリングの明確化まで)により、CQRS は現代の開発者のツールキットにおいて貴重なツールとなります。CQRS を採用して、パフォーマンスが高いだけでなく、絶えず変化する要件に合わせて進化できるビジネスアプリケーションを構築してください。