楽水

人々の創造が自由に表現できる舞台づくり

DX アプリケーション

イベントソーシングとは何か

投稿日:

ここでは、イベントソーシングについて次の観点で説明します。

イベントソーシングとは

イベントソーシング(Event Sourcing)とは、、「誰が・いつ・何を・なぜ したか」というイベントをデータの“唯一の情報源(source of truth)”として保存・管理することです。
なので、イベントは 「事実の記録」 として更新や削除できず不変(immutable)です。

通常のシステムでは、現在の状態(たとえば契約の最新情報)をRDBなどに「最新値」として保存します。
しかし、イベントソーシングでは、「状態がどう変化してきたか(イベント)」を時系列で保存するので、必要に応じて特定の時点の状態を再構築(再現)することができます。
つまり、通常のDBが記録するのが「結果」であれば、イベとソーシングはその「原因」を記録するのです。
会計で例えると、データストアのデータがB/S(Balance Sheet) の「現在の状態のスナップショット」で、イベントストアのイベントは、P/L(Profit and Loss Statement) の期間内の変化(収益や費用)になります。

なので、次のような要求がある場合、状態の記録だけでなく、イベントソーシングを検討します。

  • 監査とコンプライアンス
    金融業界や医療業界など、厳格な規制が存在する分野では、データの変更履歴をすべて追跡・保存する必要があります。
    監査の際に、ある取引や医療記録がどのように変更され、誰がいつそれを行ったのかを正確に再現することができます。
    例えば、ある口座の残高が特定の日時にどのように変更されたのかを確認し、その正当性を証明するために過去の状態を再現できます。
  • デバッグとエラー解析
    ある問題が発生した時点のシステム状態を再現することで、どのイベントが原因でエラーが発生したのかを突き止めることができます。
    例えば、特定の顧客の注文がなぜ誤処理されたのかを再現し、エラーの原因を特定するために、当時のイベントシーケンスを確認します。
  • ビジネスプロセスの検証・分析
    ある注文がどの時点で承認され、どのタイミングで出荷指示が出され、その後どのように顧客に届いたか、という一連のプロセスをイベントを使って再現することで、業務フローを振り返り改善点を見つけることができます。
  • ユーザートラッキングと行動解析
    ユーザーがどのような操作を行った結果、特定の商品をカートに入れ、購入を完了したのかというユーザーの操作履歴を再現し行動を解析をすることができます。
  • 予測モデルの精度向上
    予測モデルを訓練するために、特定の時間範囲のデータが必要な場合があります。
    イベントソーシングではその期間内の状態遷移を正確に再現できるためより精度の高い予測モデルを構築することができます。

イベントソーシングの実現方法

イベントソーシングを実現するときは、コマンド・クエリ責務分離(CQRS)パターン
とイベント駆動アーキテクチャ(Event-Driven Architecture: EDA:システム内のコンポーネント同士が「イベント」を使って非同期に連携・通信するアーキテクチャスタイル)を使います。
CQRSのコマンド処理にイベントソーシングを適用し原因をイベントストアに記録するとともに、クエリ処理に通常の状態管理を適用し結果をデータストアに記録します。
具体的には次のようなプロセスでイベントと状態を管理します。

  1. イベント発生(コマンド処理)
    アプリケーションサービスはコマンドを受け取り、集約(Aggregate)にドメインロジックを適用し、状態変更ではなく ドメインイベント を生成し、イベントストアに永続化します。
  2. Readモデルの更新(CQRSクエリ側)
    イベントストアへの保存後、非同期メッセージング(Queue/Topicなど)で最新状態をクエリ側に送信します。
    クエリ側は最新の状態(スナップショット)を受取り、Readモデルのデータストアに保存します。
  3. クエリ処理
    最新の状態を取得する場合は、データストアを参照します。
    高頻度なクエリ処理はデータストアにオフロードし、イベントストアの負荷を減らすことができます。
  4. 状態再構築(リプレイ)
    必要に応じて、データストアのデータが壊れた場合やリプレイ(再現)が必要な場合は、イベントストアのイベントから最新の状態を復元します。

イベントソーシングのメカニズム

次に、イベントソーシングのメカニズムを詳しく見ていきましょう。
次の図は、UMLのシーケンス図で書いたイベントソーシングのメカニズムです。

次のような流れになります。

  1. フロントエンドアプリケーションがRestControllerに注文を登録する
  2. RestControllerはアプリケーションサービスであるコマンドハンドラーに注文登録コマンドを渡し、注文の登録を依頼する
  3. コマンドハンドラーはイベントストアから同じ注文IDの過去のイベントを取得する
  4. コマンドハンドラーは、注文(集約)に過去のイベントから同じ注文IDの集約を再構築させる(replay)
  5. コマンドハンドラーは、注文(集約)にドメインロジックを実行させて新しいイベントを生成する
  6. コマンドハンドラーは、新しいイベントをイベントストアに保存する
  7. コマンドハンドラーは、注文(集約)の状態を更新するために新しいイベントを適用する(apply)
  8. コマンドハンドラーは、イベント駆動で、CQRSのクエリ処理に最新の注文オブジェクトを発行する
  9. 最新の注文オブジェクトを受け取ったパブリッシャー(発行者)は、ActiveMQなどのQueueやTopicを介して、非同期で、Readモデルであるサブスクライバー(購読者)に注文オブジェクトを渡す。

イベントソーシングの実装例

イベントソーシングの実装例

コマンド

注文登録コマンド

public class PlaceOrderCommand {
public final String orderId;
public final String customerId;
public final String employeeId;
public final LocalDate orderDate;
public final List orderLines;

public PlaceOrderCommand(String orderId, String customerId, String employeeId,
LocalDate orderDate, List orderLines) {
this.orderId = orderId;
this.customerId = customerId;
this.employeeId = employeeId;
this.orderDate = orderDate;
this.orderLines = orderLines;
}
}

注文取消コマンド
public class CancelOrderCommand {
public final String orderId;

public CancelOrderCommand(String orderId) {
this.orderId = orderId;
}
}

イベント

ドメインイベント
public interface DomainEvent {
LocalDateTime occurredAt();
}
注文登録イベント
public record OrderPlaced(
String orderId,
String customerId,
String employeeId,
LocalDate orderDate,
List orderLines,
LocalDateTime occurredAt
) implements DomainEvent {}

注文取り消しイベント
public record OrderCancelled(
String orderId,
LocalDateTime occurredAt
) implements DomainEvent {}

永続化イベント
import jakarta.persistence.*;
import java.time.LocalDateTime;

@Entity
@Table(name = “stored_events”)
public class StoredEvent {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

private String aggregateId;

private String eventType;

@Lob
private String payload;

private LocalDateTime occurredAt;

// — コンストラクタ —
protected StoredEvent() {} // JPA用

public StoredEvent(String aggregateId, String eventType, String payload, LocalDateTime occurredAt) {
this.aggregateId = aggregateId;
this.eventType = eventType;
this.payload = payload;
this.occurredAt = occurredAt;
}

// — Getter —
public String getAggregateId() { return aggregateId; }
public String getEventType() { return eventType; }
public String getPayload() { return payload; }
public LocalDateTime getOccurredAt() { return occurredAt; }
}

イベントストア

イベントストア
public interface EventStore {
List loadEvents(String aggregateId);

void saveEvents(String aggregateId, List events);
}

永続化イベントに対するJPAリポジトリ
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List;

public interface StoredEventRepository extends JpaRepository {
List findByAggregateIdOrderByOccurredAtAsc(String aggregateId);
}

JPAのイベントストア
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;

@Repository
public class JpaEventStore implements EventStore {

private final StoredEventRepository repository;
private final ObjectMapper objectMapper;

public JpaEventStore(StoredEventRepository repository, ObjectMapper objectMapper) {
this.repository = repository;
this.objectMapper = objectMapper;
}

@Override
@Transactional(readOnly = true)
public List loadEvents(String aggregateId) {
List storedEvents = repository.findByAggregateIdOrderByOccurredAtAsc(aggregateId);
List events = new ArrayList();

for (StoredEvent stored : storedEvents) {
try {
Class> eventClass = Class.forName(stored.getEventType());
DomainEvent event = (DomainEvent) objectMapper.readValue(stored.getPayload(), eventClass);
events.add(event);
} catch (Exception e) {
throw new RuntimeException(“イベントの復元に失敗しました: ” + stored.getEventType(), e);
}
}

return events;
}

@Override
@Transactional
public void saveEvents(String aggregateId, List events) {
for (DomainEvent event : events) {
try {
String payload = objectMapper.writeValueAsString(event);
String eventType = event.getClass().getName();
LocalDateTime occurredAt = event.occurredAt();

StoredEvent storedEvent = new StoredEvent(aggregateId, eventType, payload, occurredAt);
repository.save(storedEvent);

} catch (Exception e) {
throw new RuntimeException(“イベントの保存に失敗しました: ” + event, e);
}
}
}
}

集約

注文(集約)
public class Order {

private String orderId;
private String customerId;
private String employeeId;
private LocalDate orderDate;
private List orderLines = new ArrayList();
private boolean confirmed = false;

// ドメインロジック → イベント生成
public List placeOrder(String orderId, String customerId, String employeeId, LocalDate orderDate, List lines) {
if (confirmed) {
throw new IllegalStateException(“すでに注文済みです”);
}
DomainEvent event = new OrderPlaced(orderId, customerId, employeeId, orderDate, lines, LocalDateTime.now());
return List.of(event);
}

public List cancelOrder() {
if (!confirmed) {
throw new IllegalStateException(“未確定の注文は取り消せません”);
}
DomainEvent event = new OrderCancelled(orderId, LocalDateTime.now());
return List.of(event);
}

public void apply(DomainEvent event) {
if (event instanceof OrderPlaced placed) {
this.orderId = placed.orderId();
this.customerId = placed.customerId();
this.employeeId = placed.employeeId();
this.orderDate = placed.orderDate();
this.orderLines = placed.orderLines();
this.confirmed = true;
} else if (event instanceof OrderCancelled) {
this.confirmed = false;
}
}

public void applyAll(List events) {
for (DomainEvent event : events) {
apply(event);
}
}

 //全てのイベントから最新の注文を再構築する
public static Order replay(List events) {
Order order = new Order();
order.applyAll(events);
return order;
}
}

コマンドハンドラー

コマンドハンドラー(アプリケーションサービス)
public class OrderCommandHandler {

private final EventStore eventStore;

public OrderCommandHandler(EventStore eventStore) {
this.eventStore = eventStore;
}

public void handle(PlaceOrderCommand command) {
// ① 過去のイベントから同じ注文IDの集約を再構築
List events = eventStore.loadEvents(command.orderId);
Order order = Order.replay(events);← ここではまだ新しいイベントは含まれていない

// ② ドメインロジックを実行して新しいイベントを生成
List newEvents = order.placeOrder(
command.orderId,
command.customerId,
command.employeeId,
command.orderDate,
command.orderLines
);

// ③ 新しいイベントをイベントストアに保存
eventStore.saveEvents(command.orderId, newEvents);

// ④ 状態を更新するために新しいイベントを適用
order.applyAll(newEvents);← ここで新しいイベントの状態が反映される。rderId や orderLines のセット、confirmed = true など

//⑤CQRSのクエリ処理にオブジェクトを発行
eventPublisher.publish(order); // 状態反映済みのオブジェクトを通知

}

public void handle(CancelOrderCommand command) {
List events = eventStore.loadEvents(command.orderId);
Order order = Order.replay(events);

List newEvents = order.cancelOrder();

eventStore.saveEvents(command.orderId, newEvents);

order.applyAll(newEvents);

eventPublisher.publish(order); // 状態反映済みのオブジェクトを通知
}
}

RestControllerの実装例

RestController
//RestControllerからコマンドハンドラーを呼ぶ
//コマンドハンドラーは、アプリケーションサービス。
@RestController
@RequestMapping(“/orders”)
public class OrderController {

private final OrderCommandHandler commandHandler;

public OrderController(OrderCommandHandler commandHandler) {
this.commandHandler = commandHandler;
}

@PostMapping
public ResponseEntity placeOrder(@RequestBody PlaceOrderRequest request) {
PlaceOrderCommand command = new PlaceOrderCommand(
request.orderId(),
request.customerId(),
request.employeeId(),
request.orderDate(),
request.orderLines()
);

commandHandler.handle(command);
return ResponseEntity.ok().build();
}

@PostMapping(“/{orderId}/cancel”)
public ResponseEntity cancelOrder(@PathVariable String orderId) {
CancelOrderCommand command = new CancelOrderCommand(orderId);
commandHandler.handle(command);
return ResponseEntity.ok().build();
}
}

CQRSの実装例

Publisher

public interface EventPublisher {
void publish(Order order);
}

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class ActiveMQEventPublisher implements EventPublisher {

private final JmsTemplate jmsTemplate;

public ActiveMQEventPublisher(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}

@Override
public void publish(Order order) {
// イベント用DTOへ変換(できればシリアライズしやすい形式に)
OrderEventDto dto = OrderEventDto.from(order);
jmsTemplate.convertAndSend(“order-events-queue”, dto);
}
}

DTO

public class OrderEventDto {
public String orderId;
public String customerId;
public String employeeId;
public LocalDate orderDate;
public List orderLines;

public static OrderEventDto from(Order order) {
OrderEventDto dto = new OrderEventDto();
dto.orderId = order.getOrderId();
dto.customerId = order.getCustomerId();
dto.employeeId = order.getEmployeeId();
dto.orderDate = order.getOrderDate();
dto.orderLines = order.getOrderLines();
return dto;
}
}

Subscriber

@Component
public class OrderEventListener {

@JmsListener(destination = “order-events-queue”)
public void handle(OrderEventDto dto) {
// リードモデルに反映
System.out.println(“注文受信:” + dto.orderId);
}
}

イベントソーシングとドメイン駆動設計

ヴォーン・ヴァーノンの実践ドメイン駆動設計という書籍では、ドメイン駆動設計(DDD)の構成要素である集約単位にイベントストアを設ける設計が示されています。これは集約のライフサイクル境界と一貫性境界を一致させるため、整合性を保ちやすくなります。
また、Sam Newmanのマイクロサービスアーキテクチャという書籍では、DDDの集約境界とマイクロサービス境界を一致させることで、サービス間依存を最小化し、一貫性確保が容易になると説明しています。
集約は、ドメインオブジェクトのライフサイクルを管理する単位であるので、この単位にコマンド処理のマイクロサービスを設け、イベントソーシングによって集約のイベントを管理することで、過去の証跡が追跡可能なシステムを構築することができます。
次の図のように各マイクロサービスがイベント駆動で自律的にトランザクションを管理するSagaにイベントソーシングを組み込むことも可能です。これによって、各マイクロサービスが自律的にローカルトランザクションを管理しつつ、イベントソーシングによる追跡性と補償処理の容易性を両立できます。

イベントソーシングの適用例

最後に、DBの現在の状態に至る過程(イベント)の履歴を持たせる必要がある業務の例を紹介します。

監査証跡(Audit Trail)が必要な業務

監査証跡を確保することで、不正防止・法令遵守(コンプライアンス)・データ改ざん防止を実現することができます。
特徴

  • 過去にどのような変更が行われたのかを正確に再現できる必要がある。
  • 誰が・いつ・何を・どのように変更したのかを記録する必要がある。
  • 法規制・コンプライアンスの要件を満たす必要がある。

業務例

  • 金融・銀行業務
    口座の取引履歴(入出金、送金、手数料の発生など)
    投資商品の売買履歴
    ローンの支払いや残高の変動
  • 医療・ヘルスケア業務
    患者の診療記録の変更履歴(診断内容、投薬変更など)
    医療機器の使用履歴(いつ・どの患者に使用されたか)
  • 政府・法務・公共機関
    許認可の申請と審査プロセスの履歴
    裁判記録や判決の履歴
    税務申告・修正履歴
  • ERP(企業資源計画)
    発注、支払い、在庫の変更履歴
    予算管理や会計データの変更履歴

長期間の履歴を活用した分析が必要な業務

過去のデータを分析することで、将来のトレンド予測・改善施策の立案・異常検知が可能になります。
特徴

  • 過去のデータの変化を分析する必要がある(単なる最新の状態では不十分)。
  • 業務の改善や予測モデルに、過去の変更データが役立つ。

業務例

  • マーケティング・顧客行動分析
    ECサイトのカート履歴や購入履歴の変化
    ユーザーのアクセス履歴、ページ滞在時間、クリック履歴
  • IoT・センサー管理
    温度や湿度の変動履歴(工場や物流の品質管理)
    スマートメーターの電力使用履歴
  • SCM(サプライチェーン・マネジメント)
    商品の物流・在庫の変動履歴
    需要予測のための販売データの変化

状態が頻繁に変化し、過去の状態が業務の意思決定に影響を与える業務

契約変更や給与計算では、「ある時点の状態」が業務の判断基準となるため、変更履歴が必須です。
特徴

  • データの状態が頻繁に変わるため、過去の状態を再現する必要がある。
  • 過去の状態に基づいて意思決定を行う必要がある。

業務例

  • 契約管理(長期間有効な契約の変更履歴)
    保険契約の内容変更(補償範囲の変更、契約者の変更)
    通信プランの契約変更(料金プランの変更、オプション追加)
  • 人事・給与管理
    従業員の給与履歴(昇給・降給・手当の追加)
    人事評価の履歴(過去の評価データ)
  • 価格変動の履歴を追う業務
    株式・仮想通貨の価格変動履歴
    不動産の価格変動履歴

イベントの順序や影響関係が重要な業務

イベントの順序が不明確だと、データの整合性や一貫性が保てなくなるため必要です。
特徴

  • あるイベントの発生順序が重要であり、それによって処理の結果が変わる。
  • イベントの順序を正しく保持しないと、一貫性が失われる。

業務例

  • 注文処理・在庫管理
    どのタイミングで注文が確定し、出荷されたのか
    返品が発生した場合、いつの注文に紐づくのか
  • 製造業・プロジェクト管理
    生産プロセスの履歴(いつ・どの工程で作業が行われたか)
    プロジェクトの変更履歴(仕様変更、納期変更)
  • トランザクション管理(分散システム)
    分散データベースの整合性管理(例えば、分散トランザクションが失敗した場合のロールバック処理)

過去の状態を再現する必要がある業務

過去の状態を特定できないと、不正行為の特定、バグの修正、障害対応が困難になるため必要です。
特徴

  • 過去の状態を特定の時点で再現できる必要がある。
  • 障害発生時に、過去の状態を復元して問題を調査できる必要がある。

業務例

  • 金融取引のリカバリー
    特定の時点の株価や取引状態を復元し、不正取引や障害の影響を分析
  • ソフトウェア開発・バージョン管理
    変更履歴を元に、過去のコードの状態を復元できるようにする(GitやCI/CDのデプロイ履歴)
  • ゲームのセーブデータ管理
    プレイヤーの過去の状態を復元し、ゲーム進行の問題を解決

-DX, アプリケーション

執筆者: