단일 데이터베이스 테이블을 사용한 이벤트 소싱: 단순화된 접근 방식
Ethan Miller
Product Engineer · Leapcell

소개
현대 소프트웨어 아키텍처의 발전하는 환경에서 강력하고 감사 가능하며 확장 가능한 시스템의 필요성은 매우 중요합니다. 애플리케이션 상태의 모든 변경 사항을 불변 이벤트 시퀀스로 기록하는 강력한 패턴인 이벤트 소싱이 상당한 주목을 받고 있습니다. 감사, 디버깅 및 복잡한 비즈니스 규칙을 가진 시스템 구축에 비할 데 없는 이점을 제공합니다. 전통적으로 이벤트 소싱을 구현하려면 종종 이벤트 로그 역할을 하는 Kafka와 같은 정교한 메시징 시스템이 사용됩니다. 매우 효과적이지만 Kafka를 통합하고 관리하면 상당한 운영 오버헤드와 복잡성이 초래될 수 있으며, 특히 소규모 프로젝트나 전담 DevOps 리소스가 없는 팀의 경우 더욱 그렇습니다. 이 글에서는 대안적인 단순화된 접근 방식, 즉 "이벤트 로그"로서 단일 관계형 데이터베이스 테이블만 사용하여 이벤트 소싱을 구현하는 방법에 대해 알아봅니다. 이 방법은 이벤트 소싱을 민주화하고 접근 가능하게 만드는 것을 목표로 하며, 외부 메시지 브로커의 오버헤드 없이 핵심 이점을 활용할 수 있음을 증명합니다. 이는 운영 단순성이 우선시될 때 특히 매력적인 접근 방식입니다.
핵심 개념 이해
구현으로 들어가기 전에 이벤트 소싱을 이해하는 데 중요한 몇 가지 기본 개념을 명확히 하겠습니다.
- 이벤트 소싱: 이 아키텍처 패턴은 엔터티의 현재 상태를 저장하는 대신 상태를 변경하는 모든 변경 사항을 불변 이벤트로 저장합니다. 그런 다음 현재 상태는 이러한 이벤트를 시간순으로 다시 재생하여 파생됩니다.
 - 이벤트: 과거에 발생한 일에 대한 기록입니다. 이벤트는 불변의 사실입니다. 과거 시제로 명명됩니다(예: 
OrderPlaced,InventoryAdjusted,UserRegistered). 각 이벤트는 일반적으로timestamp,event_type,aggregate_id,version과 같은 메타데이터와 특정 변경과 관련된 데이터를 나타내는payload를 포함합니다. - Aggregate: 단일 단위로 취급하여 데이터 변경을 처리할 수 있는 도메인 객체의 클러스터입니다. DDD(Domain-Driven Design)에서 일관성 경계입니다. 이벤트는 항상 특정 aggregate 인스턴스( 
aggregate_id로 식별됨)와 연결됩니다. - 상태 재구성: aggregate의 과거 이벤트를 다시 재생하여 현재 상태를 재구성하는 프로세스입니다.
 - 스냅샷: 특정 시점의 aggregate의 사전 계산된 상태입니다. 스냅샷은 특히 긴 이벤트 기록을 가진 aggregate의 경우 모든 기록 이벤트를 처음부터 다시 재생하는 것을 피하여 상태 재 구성을 최적화하는 데 사용됩니다.
 
단일 데이터베이스 테이블을 사용한 이벤트 소싱 구현
핵심 아이디어는 간단합니다. 모든 이벤트는 유형이나 속한 aggregate와 관계없이 단일 append-only 데이터베이스 테이블에 저장됩니다. 이 테이블은 우리의 중앙 집중식 권위 있는 이벤트 로그 역할을 합니다.
이벤트 로그 테이블을 위한 스키마 설계
모든 도메인 이벤트를 저장할 events 테이블을 고려해 보겠습니다.
CREATE TABLE events ( id SERIAL PRIMARY KEY, aggregate_id UUID NOT NULL, aggregate_type VARCHAR(255) NOT NULL, version INT NOT NULL, event_type VARCHAR(255) NOT NULL, payload JSONB NOT NULL, timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, metadata JSONB, UNIQUE (aggregate_id, version) -- Ensures event order and prevents duplicate events for an aggregate ); CREATE INDEX idx_events_aggregate_id ON events (aggregate_id); CREATE INDEX idx_events_timestamp ON events (timestamp);
id: 이벤트 자체의 고유 식별자(예: 직렬 기본 키).aggregate_id: 이 이벤트가 적용되는 aggregate 인스턴스의 ID. 필터링에 중요합니다.aggregate_type: aggregate의 유형 또는 이름(예: 'Order', 'UserAccount'). 쿼리 및 이벤트 스트림 이해에 유용합니다.version: 이 이벤트가 적용된 후의 aggregate 버전. 낙관적 동시성 제어에 중요합니다.event_type: 이벤트의 특정 유형(예: 'OrderCreated', 'ItemAddedToOrder').payload: 실제 이벤트 데이터를 저장하는 JSONB 열. JSONB는 효율적인 저장 및 쿼리 기능으로 인해 선호됩니다.timestamp: 이벤트가 발생한 시간.metadata: 추가 운영 메타데이터(예: 작업을 시작한user_id,ip_address)를 위한 선택적 JSONB 열.
핵심 작업
이벤트 소싱 시스템에는 두 가지 기본 작업이 있습니다.
- 이벤트 추가: 비즈니스 작업이 발생하면 새 이벤트가 생성되어 이벤트 로그에 추가됩니다.
 - Aggregate 상태 로드: aggregate에 대한 작업을 수행하려면 해당 이벤트를 다시 재생하여 현재 상태를 재구성해야 합니다.
 
Python의 개념적인 Order aggregate를 사용하여 이를 설명하겠습니다.
1. 이벤트 및 Aggregate 정의
import uuid import datetime import json from typing import List, Dict, Any, Type # --- 이벤트 정의 --- class Event: def __init__(self, aggregate_id: uuid.UUID, version: int, timestamp: datetime.datetime, payload: Dict[str, Any]): self.aggregate_id = aggregate_id self.version = version self.timestamp = timestamp self.payload = payload @property def event_type(self) -> str: return self.__class__.__name__ def to_dict(self) -> Dict[str, Any]: return { "aggregate_id": str(self.aggregate_id), "version": self.version, "event_type": self.event_type, "timestamp": self.timestamp.isoformat(), "payload": self.payload } class OrderCreated(Event): def __init__(self, aggregate_id: uuid.UUID, version: int, customer_id: uuid.UUID, order_items: List[Dict[str, Any]], timestamp: datetime.datetime = None): super().__init__(aggregate_id, version, timestamp or datetime.datetime.now(datetime.timezone.utc), { "customer_id": str(customer_id), "order_items": order_items }) class ItemAddedToOrder(Event): def __init__(self, aggregate_id: uuid.UUID, version: int, item_id: str, quantity: int, price: float, timestamp: datetime.datetime = None): super().__init__(aggregate_id, version, timestamp or datetime.datetime.now(datetime.timezone.utc), { "item_id": item_id, "quantity": quantity, "price": price }) class OrderShipped(Event): def __init__(self, aggregate_id: uuid.UUID, version: int, shipping_date: datetime.datetime, timestamp: datetime.datetime = None): super().__init__(aggregate_id, version, timestamp or datetime.datetime.now(datetime.timezone.utc), { "shipping_date": shipping_date.isoformat() }) # --- Aggregate 정의 --- class OrderAggregate: def __init__(self, order_id: uuid.UUID = None, current_version: int = 0): self.order_id = order_id or uuid.uuid4() self.customer_id: uuid.UUID = None self.items: List[Dict[str, Any]] = [] self.status: str = "PENDING" self.current_version = current_version def apply(self, event: Event): if isinstance(event, OrderCreated): self._apply_order_created(event) elif isinstance(event, ItemAddedToOrder): self._apply_item_added_to_order(event) elif isinstance(event, OrderShipped): self._apply_order_shipped(event) self.current_version = event.version # Apply event to self to update internal state def _apply_order_created(self, event: OrderCreated): self.customer_id = uuid.UUID(event.payload["customer_id"]) self.items = event.payload["order_items"] self.status = "CREATED" def _apply_item_added_to_order(self, event: ItemAddedToOrder): self.items.append({ "item_id": event.payload["item_id"], "quantity": event.payload["quantity"], "price": event.payload["price"] }) def _apply_order_shipped(self, event: OrderShipped): self.status = "SHIPPED" # --- 이벤트 생성 명령 --- def create_order(self, customer_id: uuid.UUID, order_items: List[Dict[str, Any]]) -> List[Event]: # Business logic goes here (e.g., validate items) new_version = self.current_version + 1 event = OrderCreated(self.order_id, new_version, customer_id, order_items) self.apply(event) # Apply to self for internal state update return [event] def add_item(self, item_id: str, quantity: int, price: float) -> List[Event]: if self.status != "CREATED": raise ValueError("Cannot add items to an order that is not in CREATED status.") if self._item_exists(item_id): raise ValueError("Item already exists in order. Use update item quantity instead.") new_version = self.current_version + 1 event = ItemAddedToOrder(self.order_id, new_version, item_id, quantity, price) self.apply(event) return [event] def ship_order(self) -> List[Event]: if self.status != "CREATED": # Assuming order needs to be created to be shipped raise ValueError("Cannot ship an order that is not in CREATED status.") new_version = self.current_version + 1 event = OrderShipped(self.order_id, new_version, datetime.datetime.now(datetime.timezone.utc)) self.apply(event) return [event] def _item_exists(self, item_id: str) -> bool: return any(item['item_id'] == item_id for item in self.items) def __repr__(self): return f"Order(id={self.order_id}, status={self.status}, version={self.current_version}, items={self.items})"
2. 이벤트 저장소 (영속성 계층)
이제 데이터베이스와 상호 작용하는 간단한 EventStore 클래스를 만들어 보겠습니다. PostgreSQL의 경우 psycopg2를 사용합니다.
import psycopg2 from psycopg2 import extras class EventStore: def __init__(self, db_config: Dict[str, str]): self.db_config = db_config self.event_type_map: Dict[str, Type[Event]] = { "OrderCreated": OrderCreated, "ItemAddedToOrder": ItemAddedToOrder, "OrderShipped": OrderShipped, # ... other event types } def _get_connection(self): return psycopg2.connect(**self.db_config) def save_events(self, aggregate_id: uuid.UUID, aggregate_type: str, expected_version: int, events: List[Event]): conn = None try: conn = self._get_connection() cur = conn.cursor() # Optimistic Concurrency Check: # Check if current version matches expected_version + events_to_be_saved - 1 # Or, more simply, count events for aggregate_id. This is crucial. # If `expected_version` is 0, it means it's a new aggregate. cur.execute( "SELECT version FROM events WHERE aggregate_id = %s ORDER BY version DESC LIMIT 1;", (str(aggregate_id),) ) latest_version_record = cur.fetchone() current_db_version = latest_version_record[0] if latest_version_record else 0 if current_db_version != expected_version: raise ConcurrencyException( f"Concurrency conflict for aggregate {aggregate_id}: " f"Expected version {expected_version}, but found {current_db_version}." ) for event in events: cur.execute( """ INSERT INTO events (aggregate_id, aggregate_type, version, event_type, payload, timestamp) VALUES (%s, %s, %s, %s, %s, %s); """, ( str(event.aggregate_id), aggregate_type, event.version, event.event_type, json.dumps(event.payload), event.timestamp ) ) conn.commit() except psycopg2.IntegrityError as e: conn.rollback() if "duplicate key value violates unique constraint" in str(e): raise ConcurrencyException( "Attempted to save an event with a duplicate aggregate_id and version. Concurrent modification likely." ) from e raise # Re-raise other integrity errors except Exception as e: if conn: conn.rollback() raise finally: if conn: cur.close() conn.close() def load_events(self, aggregate_id: uuid.UUID) -> List[Event]: conn = None events: List[Event] = [] try: conn = self._get_connection() cur = conn.cursor(cursor_factory=extras.DictCursor) # Use DictCursor for easier access by column name cur.execute( "SELECT aggregate_id, version, event_type, payload, timestamp FROM events WHERE aggregate_id = %s ORDER BY version ASC;", (str(aggregate_id),) ) for record in cur.fetchall(): event_type_name = record['event_type'] if event_type_name not in self.event_type_map: raise ValueError(f"Unknown event type: {event_type_name}") EventClass = self.event_type_map[event_type_name] # Reconstruct event object from database record # Note: Payload might be stored as string, need specific handling if custom deserialization needed event = EventClass( aggregate_id=uuid.UUID(record['aggregate_id']), version=record['version'], timestamp=record['timestamp'], **record['payload'] # Unpack payload dictionary directly to event constructor if applicable ) events.append(event) return events except Exception as e: raise finally: if conn: cur.close() conn.close() def get_aggregate_by_id(self, aggregate_id: uuid.UUID, aggregate_type: Type[OrderAggregate]) -> OrderAggregate: events = self.load_events(aggregate_id) if not events: return None # Aggregate not found aggregate = aggregate_type(aggregate_id=aggregate_id, current_version=0) # Initialize with ID for event in events: aggregate.apply(event) return aggregate class ConcurrencyException(Exception): """Custom exception for concurrency conflicts.""" pass
이벤트 재구성 참고: Event 기본 클래스는 payload를 직접 예상하고, OrderCreated 등 생성자는 속성을 직접 받습니다(예: customer_id). 실제 시스템의 경우 데이터베이스 페이로드 사전과 잘 정의된 이벤트 객체를 매핑하기 위한 보다 정교한 이벤트 직렬화/역직렬화 메커니즘이 있을 수 있습니다. 위의 예는 명확성을 위해 이를 단순화했습니다.
작업을 실행해 보겠습니다.
# 데이터베이스 구성 (실제 세부 정보로 바꾸세요) DB_CONFIG = { "host": "localhost", "database": "event_sourcing_db", "user": "your_user", "password": "your_password" } event_store = EventStore(DB_CONFIG) customer_id = uuid.uuid4() order_id = uuid.uuid4() try: # 1. 새 주문 생성 print(f"Creating a new order with ID: {order_id}") new_order = OrderAggregate(order_id=order_id, current_version=0) created_events = new_order.create_order( customer_id=customer_id, order_items=[{"item_id": "product-A", "quantity": 2, "price": 10.0}] ) event_store.save_events(new_order.order_id, "Order", 0, created_events) # 새 aggregate의 예상 버전은 0입니다. print(f"Order created. State: {new_order}") # 2. 주문을 로드하고 항목을 추가합니다. print(f"\nLoading order {order_id} to add an item...") loaded_order = event_store.get_aggregate_by_id(order_id, OrderAggregate) if loaded_order: print(f"Loaded order state: {loaded_order}") added_item_events = loaded_order.add_item("product-B", 1, 25.0) event_store.save_events(loaded_order.order_id, "Order", loaded_order.current_version - len(added_item_events), added_item_events) print(f"Item added. Current state: {loaded_order}") # 3. 주문을 다시 로드하고 배송합니다. print(f"\nLoading order {order_id} to ship it...") final_order = event_store.get_aggregate_by_id(order_id, OrderAggregate) if final_order: print(f"Loaded order state: {final_order}") shipped_events = final_order.ship_order() event_store.save_events(final_order.order_id, "Order", final_order.current_version - len(shipped_events), shipped_events) print(f"Order shipped. Final state: {final_order}") # 4. 동시성 충돌 시연 (선택적 실행) # 이 시뮬레이션은 약간 다른 시작 지점에서 두 개의 이벤트를 저장하려고 시도합니다. # 이를 실제로 보려면 별도의 프로세스/스레드에서 실행해야 합니다. # order_for_conflict_1 = event_store.get_aggregate_by_id(order_id, OrderAggregate) # order_for_conflict_2 = event_store.get_aggregate_by_id(order_id, OrderAggregate) # # 프로세스 1 (항목 추가 시도) # conflict_events_1 = order_for_conflict_1.add_item("product-C", 3, 5.0) # event_store.save_events(order_for_conflict_1.order_id, "Order", order_for_conflict_1.current_version - len(conflict_events_1), conflict_events_1) # print(f"\nConflict attempt 1 saved.") # # 프로세스 2 (이전 상태에서 다른 항목 추가 시도, 프로세스 1 저장 이전) # try: # conflict_events_2 = order_for_conflict_2.add_item("product-D", 1, 15.0) # event_store.save_events(order_for_conflict_2.order_id, "Order", order_for_conflict_2.current_version - len(conflict_events_2), conflict_events_2) # print(f"This should not be printed if concurrency works.") # except ConcurrencyException as e: # print(f"\nSuccessfully caught concurrency conflict: {e}") except ConcurrencyException as e: print(f"Application error due to concurrency: {e}") except Exception as e: print(f"An unexpected error occurred: {e}")
주요 고려 사항 및 모범 사례
- 동시성 제어: 
events테이블의UNIQUE (aggregate_id, version)제약 조건과 저장 전에expected_version을 확인하는 것이 낙관적 동시성 제어의 초석입니다. 두 명령이 동일한 aggregate를 동시에 수정하려고 하면 하나만 성공하고 다른 하나는 버전 불일치 또는 고유 제약 조건 위반으로 인해 실패하여 다시 시도하게 됩니다. - 상태 재구성 성능: 수천 개의 이벤트가 있는 aggregate의 경우 매번 모든 이벤트를 다시 재생하는 것은 느릴 수 있습니다.
- 스냅샷: aggregate의 현재 상태 스냅샷을 별도의 
snapshots테이블에 주기적으로 저장합니다. aggregate를 로드할 때 최신 스냅샷을 로드한 다음 해당 스냅샷 버전 이후의 이벤트만 적용합니다. - 비정규화된 읽기 모델 (프로젝션): 이것은 CQRS(Command Query Responsibility Segregation)의 중요한 부분입니다. 
events테이블에서 이벤트 스트림을 비동기적으로 처리하여 별도의 최적화된 읽기 모델(예: 요약 테이블, 검색 인덱스)을 구축합니다. 이는 종종events테이블에서 새 항목에 대한 폴링, 처리 및 읽기 모델 업데이트를 수행하는 별도의 "프로젝터" 서비스에 의해 수행됩니다. 
 - 스냅샷: aggregate의 현재 상태 스냅샷을 별도의 
 - 이벤트 불변성: 이벤트가 기록되면 절대 변경하거나 삭제해서는 안 됩니다. 이것은 이벤트 소싱의 기본입니다.
 - 이벤트 스키마 진화: 애플리케이션이 발전함에 따라 이벤트 스키마가 변경될 수 있습니다. 이벤트 버전 관리를 계획하고 상태 재구성 중에 이전 이벤트 형식을 처리하는 메커니즘을 제공합니다.
 - 쿼리: 
events테이블 자체는 aggregate의 현재 상태에 대한 복잡한 쿼리(예: "100달러 이상의 모든 주문을 제공")에는 일반적으로 좋지 않습니다. 이것이 읽기 모델이 중요한 이유입니다. 감사 또는 디버깅의 경우aggregate_id,event_type또는timestamp별로events테이블을 쿼리하면 매우 효과적입니다. - 원자 작업: 동일한 트랜잭션 내에서 새 이벤트 저장과 읽기 모델 업데이트(동기식으로 수행되는 경우)를 보장하거나, 언제나 일관성을 위해 외부 시스템으로 이벤트를 푸시하는 경우 "outbox 패턴"을 사용합니다. 이 "단일 테이블" 접근 방식의 경우 
save_events메서드가 이벤트 영속성에 대한 원자성을 처리합니다. 
애플리케이션 시나리오
이 단순화된 이벤트 소싱 접근 방식은 특히 다음과 같은 경우에 적합합니다.
- 감사 및 규정 준수: 모든 변경 사항이 기록되어 완전하고 불변적인 감사 추적을 제공합니다.
 - 디버깅 및 문제 해결: 이벤트를 다시 재생하여 시스템이 특정 잘못된 상태에 도달한 정확한 방법을 이해합니다.
 - 복잡한 도메인 논리: 비즈니스 규칙이 복잡하고 기록에 따라 달라질 때(예: "사용자는 지난 Z일 동안 Y를 하지 않은 경우에만 X를 할 수 있습니다.").
 - 시간 쿼리: "어제 오후 2시에 주문 상태가 어땠습니까?"와 같은 질문을 합니다.
 - 소규모에서 중규모 애플리케이션: Kafka 또는 기타 메시지 브로커의 전체 운영 오버헤드가 과도하지만 이벤트 소싱의 이점이 필요한 경우.
 - 프로토타이핑 및 학습: 인프라 관련 높은 학습 곡선 없이 이벤트 소싱을 시작할 수 있는 좋은 방법입니다.
 - 읽기 모델 재구성: 읽기 모델이 손상되었거나 재설계가 필요한 경우, 
events테이블에서 모든 기록 이벤트를 다시 재생하여 항상 다시 빌드할 수 있습니다. 
결론
이벤트 로그로 단일 데이터베이스 테이블 하나만 사용하여 이벤트 소싱을 구현하는 것은 실용적이고 강력한 접근 방식입니다. 외부 메시지 브로커의 복잡성을 제거하여 개발자가 패턴의 핵심 이점, 즉 모든 변경 사항에 대한 불변의 감사 가능한 기록, 강력한 동시성 제어, 애플리케이션 상태를 다시 빌드하는 능력에 집중할 수 있습니다. 매우 활발한 aggregate 및 복잡한 쿼리 성능에 대한 신중한 고려가 필요하지만, 스냅샷 및 전용 읽기 모델을 통해 이러한 문제를 효과적으로 해결할 수 있으며, 이는 놀랍도록 간단한 인프라로도 복원력 있고 기록이 풍부한 시스템을 달성할 수 있음을 보여줍니다. 이 패턴은 단순성과 강력함을 매력적으로 혼합하여 더 넓은 범위의 프로젝트에서 이벤트 소싱을 접근 가능하게 만듭니다.