2편: https://saysimple.tistory.com/199
서론
저번 시간에는 테스트 코드를 리뷰했습니다. 이번 시간에는 Handler와 Aggregate를 리뷰해 어떻게 Command, Query를 실행하고 Event를 발행하는지 알아보겠습니다.
프로젝트 구조
- commandmodel
- OrderAggregate
- OrderLine
- querymodel
- InMemoryOrdersEventHandler
- MongoConfiguration
- OrderQueryService
- OrderResponse
- OrdersEventhandler
- OrderStatusResponse
Aggregate
Aggregate란 집합하다, 모이다란 뜻으로 Command 수행을 위해 CRUD해야 하는 도메인의 집합입니다.
OrderAggregate
주문 Aggregate는 주문, 상품의 도메인을 관리합니다. 주문을 생성, 수정, 삭제하며 주문에 상품을 추가 및 삭제, 상품의 갯수를 증감 시킵니다.
@Aggregate(snapshotTriggerDefinition = "orderAggregateSnapshotTriggerDefinition")
public class OrderAggregate {
@AggregateIdentifier
private String orderId;
private boolean orderConfirmed;
@AggregateMember
private Map<String, OrderLine> orderLines;
@CommandHandler
public OrderAggregate(CreateOrderCommand command) {
apply(new OrderCreatedEvent(command.getOrderId()));
}
주문 아이디, 주문 확인 유무, 주문 라인과 생성자로 이루어져 있습니다. snapshotTriggerDefinition은 Aggregate 스냅샷에 대한 설정 값을 등록합니다. 자세한 내용은 아래 링크를 참고해주세요.
https://www.baeldung.com/axon-snapshotting-aggregates
OrderApplicationConfiguration
주문 서비스의 설정을 불러옵니다. 위 코드에선 스냅샷을 트리거하는 최대 threshold를 지정합니다.
@Configuration
public class OrderApplicationConfiguration {
@Bean
public SnapshotTriggerDefinition orderAggregateSnapshotTriggerDefinition(Snapshotter snapshotter, @Value("${axon.aggregate.order.snapshot-threshold:250}") int threshold) {
return new EventCountSnapshotTriggerDefinition(snapshotter, threshold);
}
}
EventCountSnapshotTriggerDefinition
이벤트의 스냅샷 트리거 규칙을 정의하는 클래스입니다.
private static class EventCountSnapshotTrigger extends AbstractSnapshotTrigger {
private final int threshold;
private int counter = 0;
public EventCountSnapshotTrigger(Snapshotter snapshotter, Class<?> aggregateType, int threshold) {
super(snapshotter, aggregateType);
this.threshold = threshold;
}
public boolean exceedsThreshold() {
return ++this.counter >= this.threshold;
}
public void reset() {
this.counter = 0;
}
}
아래 EventCountSnapshoutTrigger 클래스를 보면 this.counter가 this.threshold보다 커졌을 때 threshold를 exceeds 하는 것을 볼 수 있습니다. 이어서 주문 Aggregate의 CommandHandler 메소드입니다.
@CommandHandler
public void handle(AddProductCommand command) {
if (orderConfirmed) {
throw new OrderAlreadyConfirmedException(orderId);
}
String productId = command.getProductId();
if (orderLines.containsKey(productId)) {
throw new DuplicateOrderLineException(productId);
}
apply(new ProductAddedEvent(orderId, productId));
}
@CommandHandler
public void handle(ConfirmOrderCommand command) {
if (orderConfirmed) {
return;
}
apply(new OrderConfirmedEvent(orderId));
}
@CommandHandler
public void handle(ShipOrderCommand command) {
if (!orderConfirmed) {
throw new UnconfirmedOrderException();
}
apply(new OrderShippedEvent(orderId));
}
AddProductCommand, ConfirmOrderCommand, ShipOrderCommand 커맨드에 대한 핸들러 메소드 입니다. 각 메소드에서 커맨드를 받고, 이벤트를 apply 하는 모습을 볼 수 있습니다.
다음은 EventSourcingHandler 메소드입니다.
@EventSourcingHandler
public void on(OrderCreatedEvent event) {
this.orderId = event.getOrderId();
this.orderConfirmed = false;
this.orderLines = new HashMap<>();
}
@EventSourcingHandler
public void on(OrderConfirmedEvent event) {
this.orderConfirmed = true;
}
@EventSourcingHandler
public void on(ProductAddedEvent event) {
String productId = event.getProductId();
this.orderLines.put(productId, new OrderLine(productId));
}
@EventSourcingHandler
public void on(ProductRemovedEvent event) {
this.orderLines.remove(event.getProductId());
}
protected OrderAggregate() {
// Required by Axon to build a default Aggregate prior to Event Sourcing
}
OrderCreatedEvent, OrderConfirmedEvent, ProductAddedEvent가 트리거 될 때 메모리에 주문 추가, 주문 상태 변경, 주문 라인에서 주문 삭제를 실행하는 것을 볼 수 있습니다. 이와 같은 패턴을 UOW(Unit Of Work) 패턴이라 합니다. 메모리에서 Aggregate 안의 데이터 변경 사항을 저장 및 관리해 데이터의 일관성을 보장하는 패턴입니다.
OrderLine
주문의 상품 데이터를 나타냅니다.
public class OrderLine {
@EntityId
private final String productId;
private Integer count;
private boolean orderConfirmed;
public OrderLine(String productId) {
this.productId = productId;
this.count = 1;
}
상품 아이디, 상품 갯수, 주문 확인 유무와 생성자로 이루어져 있습니다.
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.ANNOTATION_TYPE, ElementType.METHOD, ElementType.FIELD})
public @interface EntityId {
String routingKey() default "";
}
EntityId는 Aggregate 안의 멤버를 routing할 키를 설정합니다.
다음은 CommandHandler입니다.
@CommandHandler
public void handle(IncrementProductCountCommand command) {
if (orderConfirmed) {
throw new OrderAlreadyConfirmedException(command.getOrderId());
}
apply(new ProductCountIncrementedEvent(command.getOrderId(), productId));
}
@CommandHandler
public void handle(DecrementProductCountCommand command) {
if (orderConfirmed) {
throw new OrderAlreadyConfirmedException(command.getOrderId());
}
if (count <= 1) {
apply(new ProductRemovedEvent(command.getOrderId(), productId));
} else {
apply(new ProductCountDecrementedEvent(command.getOrderId(), productId));
}
}
IncrementProductCountCommand, DecrementProductCountCommand를 통해 상품 갯수를 증감하고 갯수가 1개 이하일 때 요청을 받으면 상품을 제거합니다
EventSourcingHandler입니다.
@EventSourcingHandler
public void on(ProductCountIncrementedEvent event) {
this.count++;
}
@EventSourcingHandler
public void on(ProductCountDecrementedEvent event) {
this.count--;
}
@EventSourcingHandler
public void on(OrderConfirmedEvent event) {
this.orderConfirmed = true;
}
ProductCountIncrementedEvent, ProductCountDecrementedEvent를 통해 상품 갯수를 증감시키며 상품 확인 상태를 변경합니다.
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OrderLine orderLine = (OrderLine) o;
return Objects.equals(productId, orderLine.productId) && Objects.equals(count, orderLine.count);
}
@Override
public int hashCode() {
return Objects.hash(productId, count);
}
equals, hashCode로 각 객체를 식별합니다. productId, orderLine.count를 이용합니다.
EventHandler
이벤트 핸들러는 Query를 수행하고 관련된 Event를 발행합니다.
ImMemoryOrdersEventHandler
메모리에서 주문 쿼리를 수행하고 관련 이벤트를 발행합니다. @ProcessingGroup은 이벤트 핸들러 그룹의 이름을 지정합니다. 지정하지 않으면 기본값으로 패키지의 풀네임이 들어갑니다. @Profile은 해당 프로필에서만 Bean으로 등록하며 !, &, | 와 같은 연산자를 사용할 수 있습니다. 위에선 프로필이 mongo가 아닐 때만 등록되도록 명시되어 있습니다.
@Service
@ProcessingGroup("orders")
@Profile("!mongo")
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
private final Map<String, Order> orders = new HashMap<>();
private final QueryUpdateEmitter emitter;
public InMemoryOrdersEventHandler(QueryUpdateEmitter emitter) {
this.emitter = emitter;
}
EventHandler입니다.
@EventHandler
public void on(OrderCreatedEvent event) {
String orderId = event.getOrderId();
orders.put(orderId, new Order(orderId));
}
@EventHandler
public void on(ProductAddedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.addProduct(event.getProductId());
emitUpdate(order);
return order;
});
}
@EventHandler
public void on(ProductCountIncrementedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.incrementProductInstance(event.getProductId());
emitUpdate(order);
return order;
});
}
@EventHandler
public void on(ProductCountDecrementedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.decrementProductInstance(event.getProductId());
emitUpdate(order);
return order;
});
}
@EventHandler
public void on(ProductRemovedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.removeProduct(event.getProductId());
emitUpdate(order);
return order;
});
}
@EventHandler
public void on(OrderConfirmedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.setOrderConfirmed();
emitUpdate(order);
return order;
});
}
@EventHandler
public void on(OrderShippedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.setOrderShipped();
emitUpdate(order);
return order;
});
}
computeIfPresent는 주어진 값이 주어진 람다 함수를 실행합니다. 각 이벤트에 따라서 주문 생성, 상품 증감, 상품 제거, 주문 상태 변경을 하는 모습을 볼 수 있습니다.
private void emitUpdate(Order order) {
emitter.emit(OrderUpdatesQuery.class, q -> order.getOrderId()
.equals(q.getOrderId()), order);
}
emitUpdate는 emitter에 Query의 업데이트를 전달하는 역할을 합니다. 2번 게시글의 AbstractOrdersEventHandler에서 QueryUpdateEmitter를 모킹해 사용합니다.
QueryHandler입니다.
@QueryHandler
public List<Order> handle(FindAllOrderedProductsQuery query) {
return new ArrayList<>(orders.values());
}
@QueryHandler
public Publisher<Order> handleStreaming(FindAllOrderedProductsQuery query) {
return Mono.fromCallable(orders::values)
.flatMapMany(Flux::fromIterable);
}
@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
return orders.values()
.stream()
.filter(o -> o.getOrderStatus() == OrderStatus.SHIPPED)
.map(o -> Optional.ofNullable(o.getProducts()
.get(query.getProductId()))
.orElse(0))
.reduce(0, Integer::sum);
}
@QueryHandler
public Order handle(OrderUpdatesQuery query) {
return orders.get(query.getOrderId());
}
모든 주문을 동기적으로 혹은 Streaming으로 가져오는 모습을 볼 수 있습니다
@Override
public void reset(List<Order> orderList) {
orders.clear();
orderList.forEach(o -> orders.put(o.getOrderId(), o));
}
reset메소드는 테스트 코드에서 주문 데이터를 초기화하는 역할을 합니다.
메모리 안에서 주문에 대한 Query를 받고 적절한 값을 리턴하는 모습을 볼 수 있습니다. 메모리 안에서만 작동하기 때문에 테스트 코드를 위한 코드로 보입니다.
MongoConfiguration
MongoDB의 설정 값을 지정합니다.
@Configuration
@Profile("mongo")
public class MongoConfiguration {
@Bean
public TokenStore getTokenStore(MongoClient client, Serializer serializer) {
return MongoTokenStore.builder()
.mongoTemplate(DefaultMongoTemplate.builder()
.mongoDatabase(client)
.build())
.serializer(serializer)
.build();
}
}
MongoTokenStore는 Axon에서 제공하는 mongo extension입니다. Axon docs에 따르면 이벤트 프로세서에서 어떤 이벤트가 처리 되었는지, 어떤 세그먼트가 요청 되었는지 추적하는데 사용된다 합니다. 자세한 내용은 아래 docs를 참고해주세요.
https://docs.axoniq.io/reference-guide/extensions/mongo
MongoOrdersEventHandler
몽고 DB의 이벤트 핸들러입니다.
@Service
@ProcessingGroup("orders")
@Profile("mongo")
public class MongoOrdersEventHandler implements OrdersEventHandler {
static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup()
.lookupClass());
private static final String ORDER_COLLECTION_NAME = "orders";
private static final String AXON_FRAMEWORK_DATABASE_NAME = "axonframework";
private static final String ORDER_ID_PROPERTY_NAME = "orderId";
private static final String PRODUCTS_PROPERTY_NAME = "products";
private static final String ORDER_STATUS_PROPERTY_NAME = "orderStatus";
private final MongoCollection<Document> orders;
private final QueryUpdateEmitter emitter;
public MongoOrdersEventHandler(MongoClient client, QueryUpdateEmitter emitter) {
orders = client.getDatabase(AXON_FRAMEWORK_DATABASE_NAME)
.getCollection(ORDER_COLLECTION_NAME);
orders.createIndex(Indexes.ascending(ORDER_ID_PROPERTY_NAME), new IndexOptions().unique(true));
this.emitter = emitter;
}
로거를 생성하고 각 프로퍼티와 데이터베이스 이름을 정의합니다. 그리고 MongoDB에 삽입할 주문 목록과 마찬가지로 emitter를 선언합니다.
생성자에선 MongoClient를 통해서 주문 목록을 가져오고 오름차순으로 인덱스를 생성해줍니다. 그리고 emiiter를 정의합니다.
EventHandler입니다.
@EventHandler
public void on(OrderCreatedEvent event) {
orders.insertOne(orderToDocument(new Order(event.getOrderId())));
}
@EventHandler
public void on(ProductAddedEvent event) {
update(event.getOrderId(), o -> o.addProduct(event.getProductId()));
}
@EventHandler
public void on(ProductCountIncrementedEvent event) {
update(event.getOrderId(), o -> o.incrementProductInstance(event.getProductId()));
}
@EventHandler
public void on(ProductCountDecrementedEvent event) {
update(event.getOrderId(), o -> o.decrementProductInstance(event.getProductId()));
}
@EventHandler
public void on(ProductRemovedEvent event) {
update(event.getOrderId(), o -> o.removeProduct(event.getProductId()));
}
@EventHandler
public void on(OrderConfirmedEvent event) {
update(event.getOrderId(), Order::setOrderConfirmed);
}
@EventHandler
public void on(OrderShippedEvent event) {
update(event.getOrderId(), Order::setOrderShipped);
}
각 이벤트 핸들러에서 MongoDB에 주문 인덱스에 Document 삽입, 주문에 상품 추가, 주문 상태 변경 등의 수행하고 MongoDB에 반영합니다. 또한 emitter로 주문의 변경을 알립니다.
비즈니스 로직입니다.
private Optional<Order> getOrder(String orderId) {
return Optional.ofNullable(orders.find(eq(ORDER_ID_PROPERTY_NAME, orderId))
.first())
.map(this::documentToOrder);
}
private Order emitUpdate(Order order) {
emitter.emit(OrderUpdatesQuery.class, q -> order.getOrderId()
.equals(q.getOrderId()), order);
return order;
}
private Order updateOrder(Order order, Consumer<Order> updateFunction) {
updateFunction.accept(order);
return order;
}
private UpdateResult persistUpdate(Order order) {
return orders.replaceOne(eq(ORDER_ID_PROPERTY_NAME, order.getOrderId()), orderToDocument(order));
}
private void update(String orderId, Consumer<Order> updateFunction) {
UpdateResult result = getOrder(orderId).map(o -> updateOrder(o, updateFunction))
.map(this::emitUpdate)
.map(this::persistUpdate)
.orElse(null);
logger.info("Result of updating order with orderId '{}': {}", orderId, result);
}
Document의 Collections에서 주문을 가져와 주문 객체로 변환하는 것과 emitter, 주문, 실제 저장 등의 업데이트 로직을 볼 수 있습니다.
private Document orderToDocument(Order order) {
return new Document(ORDER_ID_PROPERTY_NAME, order.getOrderId()).append(PRODUCTS_PROPERTY_NAME, order.getProducts())
.append(ORDER_STATUS_PROPERTY_NAME, order.getOrderStatus()
.toString());
}
private Order documentToOrder(@NonNull Document document) {
Order order = new Order(document.getString(ORDER_ID_PROPERTY_NAME));
Document products = document.get(PRODUCTS_PROPERTY_NAME, Document.class);
products.forEach((k, v) -> order.getProducts()
.put(k, (Integer) v));
String status = document.getString(ORDER_STATUS_PROPERTY_NAME);
if (OrderStatus.CONFIRMED.toString()
.equals(status)) {
order.setOrderConfirmed();
} else if (OrderStatus.SHIPPED.toString()
.equals(status)) {
order.setOrderShipped();
}
return order;
}
주문과 Document를 각각 변환하는 함수입니다. 복잡해 보이지만 단순히 각 데이터 타입에 맞게 변환해주는 로직입니다.
private Bson shippedProductFilter(String productId) {
return and(eq(ORDER_STATUS_PROPERTY_NAME, OrderStatus.SHIPPED.toString()), exists(String.format(PRODUCTS_PROPERTY_NAME + ".%s", productId)));
}
배송 상태인 주문에 있는 모든 상품을 가져옵니다.
OrderEventHandler입니다. 주문 이벤트를 관리하는 인터페이스가 선언되어 있습니다.
public interface OrdersEventHandler {
void on(OrderCreatedEvent event);
void on(ProductAddedEvent event);
void on(ProductCountIncrementedEvent event);
void on(ProductCountDecrementedEvent event);
void on(ProductRemovedEvent event);
void on(OrderConfirmedEvent event);
void on(OrderShippedEvent event);
List<Order> handle(FindAllOrderedProductsQuery query);
Publisher<Order> handleStreaming(FindAllOrderedProductsQuery query);
Integer handle(TotalProductsShippedQuery query);
Order handle(OrderUpdatesQuery query);
void reset(List<Order> orderList);
}
QueryService
OrderQueryService
주문 쿼리 서비스입니다. 주문에 대한 쿼리를 수행해 주문 DTO를 반환하고 주문 DTO를 OrderResponse vo로 변환합니다.
@Service
public class OrderQueryService {
private final QueryGateway queryGateway;
public OrderQueryService(QueryGateway queryGateway) {
this.queryGateway = queryGateway;
}
쿼리 게이트웨이를 선언하고 생성자에서 정의 해줍니다.
Query 비즈니스 로직입니다.
public CompletableFuture<List<OrderResponse>> findAllOrders() {
return queryGateway.query(new FindAllOrderedProductsQuery(), ResponseTypes.multipleInstancesOf(Order.class))
.thenApply(r -> r.stream()
.map(OrderResponse::new)
.collect(Collectors.toList()));
}
public Flux<OrderResponse> allOrdersStreaming() {
Publisher<Order> publisher = queryGateway.streamingQuery(new FindAllOrderedProductsQuery(), Order.class);
return Flux.from(publisher)
.map(OrderResponse::new);
}
public Integer totalShipped(String productId) {
return queryGateway.scatterGather(new TotalProductsShippedQuery(productId), ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS)
.reduce(0, Integer::sum);
}
public Flux<OrderResponse> orderUpdates(String orderId) {
return subscriptionQuery(new OrderUpdatesQuery(orderId), ResponseTypes.instanceOf(Order.class)).map(OrderResponse::new);
}
private <Q, R> Flux<R> subscriptionQuery(Q query, ResponseType<R> resultType) {
return Flux.using(
() -> queryGateway.subscriptionQuery(query, resultType, resultType),
ret -> Flux.from(ret.initialResult()),
SubscriptionQueryResult::close
);
}
모든 주문을 비동기 혹은 Streaming으로 반환하거나 모든 배송 상태 상품의 갯수, 주문 쿼리 구독 함수를 볼 수 있습니다. 주문 쿼리 구독 함수에선 Flux를 반환하여 Server Send Event 형태로 통신합니다.
OrderResponse
주문에 대한 반환을 표현하는 vo입니다.
@Getter
public class OrderResponse {
private final String orderId;
private final Map<String, Integer> products;
private final OrderStatusResponse orderStatus;
OrderResponse(Order order) {
this.orderId = order.getOrderId();
this.products = order.getProducts();
this.orderStatus = toResponse(order.getOrderStatus());
}
}
사용자에게 반환할 멤버와 생성자로 이루어져 있습니다. 마찬가지로 vo에는 사용자에게 보여줘도 되는 멤버만 포함되어야 합니다.
주문 상태 enum입니다.
public enum OrderStatusResponse {
CREATED, CONFIRMED, SHIPPED, UNKNOWN;
static OrderStatusResponse toResponse(OrderStatus status) {
for (OrderStatusResponse response : values()) {
if (response.toString()
.equals(status.toString())) {
return response;
}
}
return UNKNOWN;
}
}
위 OrderResponse의 orderStatus 멤버의 타입으로 사용됩니다.
결론
이로써 해당 예제를 모두 둘러보았습니다. 이번 리뷰를 통해서 Command는 Aggregate에서 CommandGateway를 통해 Command를 수행하고 관련 Event를 발행, Query는 QueryService에서 QueryGateway를 통해 Query를 수행하고 EventHandler에서 적절한 vo로 변환해 반환하며 QueryService에서 엔드포인트(API)로 데이터를 반환한다는 것을 알았습니다.
여기서 주의해야 할 점은 OrderAggregate에서 CommandHandler를 정의하지만 실제 Event에 의한 데이터 CRUD는 EventHandler에서 일어난다는 점입니다.
다음 글에선 제가 좋아하는 방식으로 프로젝트를 리팩토링하고, 유저 도메인을 추가 해보겠습니다.
'Backend > SpringBoot' 카테고리의 다른 글
[SpringBoot] Axon을 사용해 CQRS와 이벤트 소싱이 적용된 Order 서비스 만들기 - 4 (0) | 2024.04.24 |
---|---|
[SpringBoot] Axon을 사용해 CQRS와 이벤트 소싱이 적용된 Order 서비스 만들기 - 2 (0) | 2024.04.21 |
[SpringBoot] Axon을 사용해 CQRS와 이벤트 소싱이 적용된 Order 서비스 만들기 - 1 (1) | 2024.04.20 |
[SpringBoot] ColumnDefault와 jpa 생명주기 (0) | 2024.04.18 |
[SpringBoot] ModelMapper Matching Strategy 정리 (0) | 2024.04.08 |