백엔드 프레임워크에서 CQRS 및 이벤트 소싱을 사용한 확장 가능한 시스템 구축
Wenhao Wang
Dev Intern · Leapcell

소개
빠르게 발전하는 현대 소프트웨어 개발 환경에서 강력하고 확장 가능하며 유지 관리 가능한 애플리케이션을 구축하는 것이 무엇보다 중요합니다. 시스템의 복잡성과 사용자 요구 사항이 증가함에 따라 기존 CRUD(Create, Read, Update, Delete) 아키텍처는 종종 병목 현상을 일으키며 읽기-쓰기 경합, 데이터 일관성 문제 및 과거 변경 기록 감사 어려움에 시달릴 수 있습니다. 바로 여기서 명령-쿼리 책임 분리(CQRS) 및 이벤트 소싱과 같은 고급 아키텍처 패턴이 강력한 솔루션으로 등장합니다. 데이터 변경 및 쿼리 처리를 근본적으로 재정의함으로써 이러한 패턴은 성능이 뛰어날 뿐만 아니라 본질적으로 복원력이 있고 통찰력 있는 시스템 설계를 위한 경로를 제공합니다. 이 글에서는 백엔드 프레임워크 내에서 CQRS 및 이벤트 소싱의 실제 적용을 심층적으로 살펴보고, 이를 통해 정교한 엔터프라이즈급 애플리케이션 구축 접근 방식을 어떻게 변화시킬 수 있는지 보여줍니다.
핵심 개념 및 원칙
실제 구현에 들어가기 전에 CQRS 및 이벤트 소싱을 뒷받침하는 핵심 개념을 확실하게 이해하는 것이 중요합니다.
명령-쿼리 책임 분리(CQRS)
CQRS는 애플리케이션 상태를 변경하는 작업인 명령 처리에 대한 책임과 데이터를 검색하는 작업인 쿼리 처리에 대한 책임을 분리하는 아키텍처 패턴입니다.
- 명령 모델(쓰기측): 이 애플리케이션 부분은 명령을 수락하고 처리하며 비즈니스 규칙을 유효성 검사하고 상태 변경을 영속화하는 책임을 집니다. 일반적으로 쓰기에 최적화된 데이터 저장소를 사용하며 종종 트랜잭션 일관성에 중점을 둡니다.
- 쿼리 모델(읽기측): 이 부분은 데이터 읽기에 최적화되어 있습니다. 특정 쿼리 패턴에 맞춰 높은 성능과 유연성을 제공하는 비정규화된 데이터 저장소(예: 보고 데이터베이스, 키-값 저장소 또는 검색 인덱스)를 사용할 수 있습니다.
분리해야 하는 이유는 무엇인가요? 기존 CRUD 시스템은 종종 읽기와 쓰기에 단일 모델을 사용하므로 절충이 발생합니다. 하나를 최적화하면 종종 다른 하나에 해를 끼칩니다. CQRS는 각 측면의 독립적인 확장 및 최적화를 허용하여 성능, 유연성 및 유지 관리성을 개선합니다.
이벤트 소싱
이벤트 소싱은 집계의 현재 상태를 저장하는 대신 해당 집계에 대한 모든 변경을 설명하는 불변 이벤트 시퀀스를 저장하는 영속성 패턴입니다.
- 이벤트: 과거에 발생한 사실을 나타냅니다(예:
OrderPlacedEvent
,ProductQuantityAdjustedEvent
). 불변이며 append-only입니다. - 이벤트 저장소: 이러한 이벤트 시퀀스를 저장하고 검색하도록 설계된 특수한 데이터베이스입니다. 애플리케이션 상태에 대한 단일 진실 공급원 역할을 합니다.
- 집계: 데이터 변경에 대해 단일 단위로 취급할 수 있는 도메인 객체 클러스터입니다. 트랜잭션 일관성 및 이벤트 적용을 위한 경계입니다.
상태 대신 이벤트를 저장해야 하는 이유는 무엇인가요? 이벤트 소싱은 모든 변경에 대한 완전하고 감사 가능한 기록을 제공하여 시간 여행 디버깅, 이벤트 재현을 통한 특정 시점의 상태 복원, 다양한 소비자에 맞춰진 여러 읽기 모델 생성과 같은 강력한 기능을 가능하게 합니다. 또한 분산 시스템을 위한 게시-서브 메커니즘과 자연스럽게 통합됩니다.
CQRS와 이벤트 소싱 간의 관계
CQRS와 이벤트 소싱은 상호 보완적인 패턴입니다. 이벤트 소싱은 CQRS의 쓰기 측면에 자연스럽게 적합합니다. 명령은 이벤트를 생성하고, 이 이벤트는 이벤트 저장소에 저장됩니다. 그런 다음 이러한 이벤트는 하나 이상의 읽기 모델을 구축하고 업데이트하기 위해 비동기적으로 게시될 수 있으며, 이 읽기 모델은 CQRS의 읽기 측면을 구성합니다. 이러한 시너지는 강력하고 고도로 확장 가능하며 감사 가능한 시스템을 가능하게 합니다.
CQRS 및 이벤트 소싱 구현
가상 전자 상거래 제품 관리 서비스를 예로 들어 CQRS 및 이벤트 소싱 구현을 살펴보겠습니다. 개념을 설명하기 위해 일반 백엔드 프레임워크(예: Java의 Spring Boot 또는 Python의 FastAPI)를 사용하겠습니다.
프로젝트 구조 개요
├── src
│ ├── main
│ │ ├── java/python/...
│ │ │ └── com
│ │ │ └── example
│ │ │ └── productservice
│ │ │ ├── api // 명령 및 쿼리에 대한 REST 컨트롤러
│ │ │ ├── command // 명령 정의 및 처리기
│ │ │ │ ├── model // 명령에 대한 DTO
│ │ │ │ └── service // 명령 처리기 로직
│ │ │ ├── query // 쿼리 정의 및 처리기
│ │ │ │ ├── model // 쿼리 및 읽기 모델에 대한 DTO
│ │ │ │ └── service // 쿼리 처리기 로직
│ │ │ ├── domain // 집계, 이벤트 및 비즈니스 로직
│ │ │ │ ├── aggregate // ProductAggregate
│ │ │ │ ├── event // Product 이벤트(예: ProductCreatedEvent)
│ │ │ │ └── repository // 이벤트 저장소 상호 작용
│ │ │ ├── infrastructure // 이벤트 저장소 구성, 이벤트 게시자
│ │ │ └── config // 애플리케이션 구성
1. 명령 및 이벤트 정의
먼저 Product
집계에 대한 명령과 해당 명령이 생성하는 이벤트를 정의합니다.
명령 (쓰기측 입력):
// Java 예제 (명령 DTO) public class CreateProductCommand { private String productId; private String name; private double price; private int quantity; // Getter, Setter, 생성자 } public class UpdateProductPriceCommand { private String productId; private double newPrice; // Getter, Setter, 생성자 }
# Python 예제 (Pydantic을 사용한 명령 DTO) from pydantic import BaseModel class CreateProductCommand(BaseModel): product_id: str name: str price: float quantity: int class UpdateProductPriceCommand(BaseModel): product_id: str new_price: float
이벤트 (시스템 진실):
// Java 예제 (이벤트) public abstract class ProductEvent { private String productId; private long timestamp; // Getter, Setter, 생성자 } public class ProductCreatedEvent extends ProductEvent { private String name; private double price; private int quantity; // Getter, Setter, 생성자 } public class ProductPriceUpdatedEvent extends ProductEvent { private double newPrice; // Getter, Setter, 생성자 }
# Python 예제 (Pydantic을 사용한 이벤트) from datetime import datetime from typing import Optional class Event(BaseModel): product_id: str timestamp: datetime = datetime.utcnow() class ProductCreatedEvent(Event): name: str price: float quantity: int class ProductPriceUpdatedEvent(Event): new_price: float
2. 집계 및 이벤트 소싱 논리 (쓰기측)
ProductAggregate
는 명령을 적용하고 이벤트를 생성하는 책임이 있습니다. 자체 이벤트 재생을 통해 상태를 재구성합니다.
// Java 예제 (Product Aggregate) public class ProductAggregate { private String productId; private String name; private double price; private int quantity; private long version; // 낙관적 동시성 제어를 위한 버전 // 새 집계 생성을 위한 생성자 public ProductAggregate(CreateProductCommand command) { applyNewEvent(new ProductCreatedEvent(command.getProductId(), command.getName(), command.getPrice(), command.getQuantity())); } // 기록에서 다시 빌드하기 위한 생성자 public ProductAggregate(List<ProductEvent> history) { history.forEach(this::apply); } // 명령 처리기 public void updatePrice(UpdateProductPriceCommand command) { if (command.getNewPrice() <= 0) { throw new IllegalArgumentException("Price cannot be negative or zero."); } applyNewEvent(new ProductPriceUpdatedEvent(this.productId, command.getNewPrice())); } // 이벤트에 따라 상태를 변경하는 적용 메서드 private void apply(ProductEvent event) { if (event instanceof ProductCreatedEvent) { ProductCreatedEvent e = (ProductCreatedEvent) event; this.productId = e.getProductId(); this.name = e.getName(); this.price = e.getPrice(); this.quantity = e.getQuantity(); } else if (event instanceof ProductPriceUpdatedEvent) { ProductPriceUpdatedEvent e = (ProductPriceUpdatedEvent) event; this.price = e.getNewPrice(); } this.version++; } // 새 이벤트를 적용하고 저장하는 도우미 private void applyNewEvent(ProductEvent event) { apply(event); // 이 '이벤트'는 저장되고 잠재적으로 게시됩니다. // 실제 시스템에서는 수집된 이벤트가 이벤트 저장소로 전송됩니다. } // Getter }
# Python 예제 (Product Aggregate) from typing import List, Dict, Any class ProductAggregate: def __init__(self, product_id: str): self.product_id = product_id self.name: Optional[str] = None self.price: Optional[float] = None self.quantity: Optional[int] = None self.version: int = -1 self._uncommitted_events: List[Event] = [] @classmethod def create(cls, command: CreateProductCommand) -> 'ProductAggregate': aggregate = cls(command.product_id) aggregate._apply_new_event(ProductCreatedEvent( product_id=command.product_id, name=command.name, price=command.price, quantity=command.quantity )) return aggregate @classmethod def from_history(cls, product_id: str, history: List[Event]) -> 'ProductAggregate': aggregate = cls(product_id) for event in history: aggregate._apply(event) return aggregate def update_price(self, command: UpdateProductPriceCommand): if command.new_price <= 0: raise ValueError("Price cannot be negative or zero.") self._apply_new_event(ProductPriceUpdatedEvent( product_id=self.product_id, new_price=command.new_price )) def _apply(self, event: Event): if isinstance(event, ProductCreatedEvent): self.name = event.name self.price = event.price self.quantity = event.quantity elif isinstance(event, ProductPriceUpdatedEvent): self.price = event.new_price self.version += 1 def _apply_new_event(self, event: Event): self._apply(event) self._uncommitted_events.append(event) # 이벤트를 저장하기 위해 수집 def get_uncommitted_events(self) -> List[Event]: return self._uncommitted_events def clear_uncommitted_events(self): self._uncommitted_events = [] # Getter/속성 @property def current_state(self) -> Dict[str, Any]: return { "productId": self.product_id, "name": self.name, "price": self.price, "quantity": self.quantity, "version": self.version }
3. 명령 처리기 및 이벤트 저장소 상호 작용
명령 처리기는 이벤트 저장소에서 집계를 로드하고, 명령을 적용하고, 새로 생성된 이벤트를 저장하고, 게시하는 프로세스를 조정합니다.
// Java 예제 (Product Command Handler) @Service public class ProductCommandHandler { private final EventStore eventStore; private final EventPublisher eventPublisher; // 읽기 모델에 대한 이벤트 게시 public ProductCommandHandler(EventStore eventStore, EventPublisher eventPublisher) { this.eventStore = eventStore; this.eventPublisher = eventPublisher; } public void handle(CreateProductCommand command) { ProductAggregate aggregate = new ProductAggregate(command); List<ProductEvent> newEvents = // 집계에서 새 이벤트 가져오기 로직 eventStore.saveEvents(command.getProductId(), newEvents, aggregate.getVersion()); newEvents.forEach(eventPublisher::publish); } public void handle(UpdateProductPriceCommand command) { List<ProductEvent> history = eventStore.getEventsForAggregate(command.getProductId()); ProductAggregate aggregate = new ProductAggregate(history); // 낙관적 동시성 확인 (버전 관리) // 명령에서 예상한 버전이 aggregate.version과 일치하지 않으면 오류 발생 // 단순화를 위해 여기서는 최신 버전을 가정합니다. aggregate.updatePrice(command); List<ProductEvent> newEvents = // 집계에서 새 이벤트 가져오기 로직 eventStore.saveEvents(command.getProductId(), newEvents, aggregate.getVersion()); newEvents.forEach(eventPublisher::publish); } } // EventStore 및 EventPublisher는 인프라 구성 요소입니다. EventStore는 NoSQL DB // (예: Cassandra, MongoDB) 또는 특수 이벤트 저장소(예: EventStoreDB)일 수 있습니다.
# Python 예제 (Product Command Handler) from typing import Protocol, List from .domain.aggregate import ProductAggregate from .infrastructure.event_store import EventStore from .infrastructure.event_publisher import EventPublisher from .domain.event import Event class ProductCommandHandler: def __init__(self, event_store: EventStore, event_publisher: EventPublisher): self.event_store = event_store self.event_publisher = event_publisher def handle_create_product(self, command: CreateProductCommand): aggregate = ProductAggregate.create(command) self.event_store.save_events(command.product_id, aggregate.get_uncommitted_events(), aggregate.version) for event in aggregate.get_uncommitted_events(): self.event_publisher.publish(event) aggregate.clear_uncommitted_events() def handle_update_product_price(self, command: UpdateProductPriceCommand): history = self.event_store.get_events_for_aggregate(command.product_id) if not history: raise ValueError(f"Product with ID {command.product_id} not found.") aggregate = ProductAggregate.from_history(command.product_id, history) # 실제 시스템에서는 명령과 함께 예상 버전(expected_version)을 전달하고 # aggregate.version과 비교하여 낙관적 동시성을 처리합니다. # 여기서는 단순화를 위해 최신 버전을 가정합니다. aggregate.update_price(command) self.event_store.save_events(command.product_id, aggregate.get_uncommitted_events(), aggregate.version) for event in aggregate.get_uncommitted_events(): self.event_publisher.publish(event) aggregate.clear_uncommitted_events() # 예제 EventStore (단순화) class EventStore: def __init__(self): self._stores: Dict[str, List[Event]] = {} # 데모를 위한 간단한 인메모리 스토어 def save_events(self, aggregate_id: str, events: List[Event], expected_version: int): current_events = self._stores.get(aggregate_id, []) # 기본적인 낙관적 동시성 확인 (실제 환경에서는 더 강력하게) if current_events and len(current_events) != expected_version - len(events): raise ValueError("Concurrency conflict: aggregate has been modified.") current_events.extend(events) self._stores[aggregate_id] = current_events print(f"Saved {len(events)} events for {aggregate_id}. Total: {len(current_events)}") def get_events_for_aggregate(self, aggregate_id: str) -> List[Event]: return self._stores.get(aggregate_id, []) # 예제 EventPublisher (단순화) class EventPublisher: def publish(self, event: Event): print(f"Publishing event: {event.__class__.__name__} for product {event.product_id}") # 실제 시스템에서는 메시지 브로커(예: Kafka, RabbitMQ)로 푸시합니다.
4. 읽기 모델 및 쿼리 처리기 (읽기측)
명령측에서 게시된 이벤트는 비정규화된 읽기 모델을 업데이트하는 프로젝터(또는 이벤트 처리기)에서 소비됩니다. 그런 다음 이러한 읽기 모델이 쿼리됩니다.
읽기 모델 (비정규화된 뷰):
// Java 예제 (Product Overview Read Model) public class ProductSummary { private String productId; private String name; private double currentPrice; private int quantityOnHand; // 잠재적으로 'lastUpdated', 'totalOrders'와 같은 다른 계산된 필드 // Getter, Setter, 생성자 }
# Python 예제 (Pydantic을 사용한 Product Overview Read Model) class ProductSummary(BaseModel): product_id: str name: str current_price: float quantity_on_hand: int last_updated: datetime = datetime.utcnow() # 계산된 필드
이벤트 소비자 / 프로젝터:
// Java 예제 (Product Read Model Projector) @Service public class ProductReadModelProjector { private final ProductSummaryRepository productSummaryRepository; // ProductSummary를 영속화 public ProductReadModelProjector(ProductSummaryRepository productSummaryRepository) { this.productSummaryRepository = productSummaryRepository; } @EventListener // Spring의 애플리케이션 이벤트 수신 방식 public void handle(ProductCreatedEvent event) { ProductSummary summary = new ProductSummary(event.getProductId(), event.getName(), event.getPrice(), event.getQuantity()); productSummaryRepository.save(summary); } @EventListener public void handle(ProductPriceUpdatedEvent event) { ProductSummary summary = productSummaryRepository.findByProductId(event.getProductId()) .orElseThrow(() -> new RuntimeException("Product summary not found")); summary.setCurrentPrice(event.getNewPrice()); summary.setLastUpdated(event.getTimestamp()); // 마지막 업데이트 시간 업데이트 productSummaryRepository.save(summary); } }
# Python 예제 (Product Read Model Projector) from .infrastructure.read_model_db import ProductSummaryRepository from .query.model import ProductSummary class ProductReadModelProjector: def __init__(self, product_summary_repo: ProductSummaryRepository): self.product_summary_repo = product_summary_repo def handle_product_created_event(self, event: ProductCreatedEvent): summary = ProductSummary( product_id=event.product_id, name=event.name, current_price=event.price, quantity_on_hand=event.quantity, last_updated=event.timestamp ) self.product_summary_repo.save(summary) print(f"Projector: Created summary for product {summary.product_id}") def handle_product_price_updated_event(self, event: ProductPriceUpdatedEvent): summary = self.product_summary_repo.find_by_product_id(event.product_id) if not summary: raise ValueError(f"Product summary for {event.product_id} not found.") summary.current_price = event.new_price summary.last_updated = event.timestamp self.product_summary_repo.save(summary) print(f"Projector: Updated price for product {summary.product_id} to {summary.current_price}") # 예제 Read Model Repository (단순화, PostgreSQL과 같은 별도의 데이터베이스일 수 있음) class ProductSummaryRepository: def __init__(self): self._store: Dict[str, ProductSummary] = {} def save(self, summary: ProductSummary): self._store[summary.product_id] = summary def find_by_product_id(self, product_id: str) -> Optional[ProductSummary]: return self._store.get(product_id) def find_all(self) -> List[ProductSummary]: return list(self._store.values())
쿼리 처리기 및 API 엔드포인트:
// Java 예제 (Product Query Handler 및 REST 엔드포인트) @Service public class ProductQueryHandler { private final ProductSummaryRepository productSummaryRepository; public ProductQueryHandler(ProductSummaryRepository productSummaryRepository) { this.productSummaryRepository = productSummaryRepository; } public ProductSummary getProductSummary(String productId) { return productSummaryRepository.findByProductId(productId) .orElseThrow(() -> new ProductNotFoundException(productId)); } public List<ProductSummary> getAllProductSummaries() { return productSummaryRepository.findAll(); } } @RestController @RequestMapping("/api/products") public class ProductQueryController { private final ProductQueryHandler queryHandler; public ProductQueryController(ProductQueryHandler queryHandler) { this.queryHandler = queryHandler; } @GetMapping("/{productId}") public ResponseEntity<ProductSummary> getProduct(@PathVariable String productId) { return ResponseEntity.ok(queryHandler.getProductSummary(productId)); } @GetMapping public ResponseEntity<List<ProductSummary>> getAllProducts() { return ResponseEntity.ok(queryHandler.getAllProductSummaries()); } }
# Python 예제 (Product Query Handler 및 FastAPI 엔드포인트) from fastapi import APIRouter, HTTPException from typing import List product_query_router = APIRouter() class ProductQueryHandler: def __init__(self, product_summary_repo: ProductSummaryRepository): self.product_summary_repo = product_summary_repo def get_product_summary(self, product_id: str) -> ProductSummary: summary = self.product_summary_repo.find_by_product_id(product_id) if not summary: raise HTTPException(status_code=404, detail=f"Product with ID {product_id} not found.") return summary def get_all_product_summaries(self) -> List[ProductSummary]: return self.product_summary_repo.find_all() # FastAPI 엔드포인트 설정 (핸들러에 대한 앱 컨텍스트/종속성 주입 가정) @product_query_router.get("/{product_id}", response_model=ProductSummary) async def get_product( product_id: str, query_handler: ProductQueryHandler # FastAPI의 Depends를 통해 주입 ): return query_handler.get_product_summary(product_id) @product_query_router.get("/", response_model=List[ProductSummary]) async def get_all_products( query_handler: ProductQueryHandler ): return query_handler.get_all_product_summaries() # 메인 앱에서: # app = FastAPI() # app.include_router(product_query_router, prefix="/api/products")
애플리케이션 시나리오
CQRS 및 이벤트 소싱은 특히 다음과 같은 경우에 적합합니다.
- 복잡한 도메인 모델: 비즈니스 규칙이 복잡하고 상태 변경 기록이 필요한 경우.
- 고성능 읽기/쓰기 시스템: 읽기 및 쓰기 패턴이 크게 다른 시스템으로, 독립적으로 확장할 수 있습니다.
- 감사 가능성 및 규정 준수: 모든 변경 사항이 이벤트로 기록되어 완전한 감사 추적을 제공합니다.
- 과거 분석 및 비즈니스 인텔리전스: 이벤트를 재생하거나 다양한 분석 모델로 변환할 수 있습니다.
- 마이크로서비스 아키텍처: 이벤트는 서비스 간의 통신 및 최종 일관성을 촉진하는 데 자연스럽게 사용됩니다.
- 실시간 대시보드 및 프로젝션: 이벤트는 실시간 뷰와 보고서를 업데이트할 수 있습니다.
결론
CQRS와 이벤트 소싱은 신중하게 적용하면 매우 확장 가능하고 복원력이 있으며 유지 관리 가능한 백엔드 시스템으로 이어질 수 있는 강력한 아키텍처 패턴입니다. 명령 처리 및 쿼리 처리의 책임을 명확하게 분리하고 변경 사항을 불변하는 이벤트 시퀀스로 영속화함으로써 개발자는 뛰어난 성능, 포괄적인 감사 추적, 진화하는 비즈니스 요구 사항에 대한 탁월한 유연성을 제공하는 애플리케이션을 구축할 수 있습니다. 복잡성을 도입하지만 장기적인 이점은 특정 문제 도메인에서 초기 학습 곡선을 훨씬 능가하는 경우가 많아 팀이 진정으로 강력하고 통찰력 있는 소프트웨어를 구축할 수 있도록 지원합니다. 이러한 패턴은 현재 상태를 저장하는 것에서 현재 상태에 이르게 된 변경의 인과 관계 시퀀스를 이해하는 것으로 우리의 관점을 근본적으로 변화시킵니다.