How to Publish a Domain Event to the Event Stream
1. Create a Domain Event and Dispatch it from the Aggregate
// domain/Event/ItemBonusAdded.ts
/**
 * Domain event holding the id of an item and the id of a bonus.
 *
 * `Item.addItemBonusId` creates one right after appending the bonus id to the item.
 */
class ItemBonusAdded {
  public readonly itemId: number;
  public readonly bonusId: number;

  /**
   * @param itemId - value exposed as `itemId`
   * @param bonusId - value exposed as `bonusId`
   */
  constructor(itemId: number, bonusId: number) {
    this.itemId = itemId;
    this.bonusId = bonusId;
  }
}
// domain/Entity/Item.ts
/**
 * Item aggregate. Shown here with the single operation that adds a bonus id.
 */
class Item extends AggregateRoot {
  /**
   * Appends a bonus id to this item and records an `ItemBonusAdded` domain event.
   *
   * @param bonusId - bonus id to add; must be a non-negative integer
   * @throws BonusIdMustBePositiveException when `bonusId` is negative or is not
   *   an integer (NaN, ±Infinity, fractional values)
   */
  public addItemBonusId(bonusId: number): void {
    // Validate. A bare `bonusId < 0` lets NaN through, because every comparison
    // with NaN is false; Number.isInteger also rejects ±Infinity and fractions.
    // NOTE(review): 0 is still accepted although the exception name says
    // "positive" — confirm whether the guard should be `<= 0`.
    if (!Number.isInteger(bonusId) || bonusId < 0) {
      throw new BonusIdMustBePositiveException();
    }

    // Update state
    this.bonusIds.push(bonusId);

    // Raise a new domain event
    this.record(new ItemBonusAdded(this.id, bonusId));
  }
}
2. Create a Saga to Listen for the Domain Event
// infrastructure/IntegrationEvents/Sagas/ItemDomainEvents.ts
/**
 * Sagas that turn item domain events into integration events and emit them on
 * both the event stream and the message stream.
 */
class ItemDomainEvents {
  /**
   * For every `ItemCreated` domain event, emits an `ItemEventNames.ItemCreated`
   * message on both streams. The returned observable never emits a value.
   */
  @Saga()
  public created$ = (events$: Observable<ItemCreated>): Observable<never> => {
    return events$.pipe(
      ofType(ItemCreated),
      // NOTE(review): switchMap unsubscribes from the previous inner observable
      // when a new event arrives, so an emit that has not completed yet can be
      // cancelled. If every event must be published, mergeMap/concatMap would
      // be the operator to use — confirm the intended semantics.
      switchMap((domainEvent) => this.publish(ItemEventNames.ItemCreated, domainEvent)),
      // Discard the emit results; the saga produces no follow-up values.
      switchMap(() => EMPTY),
    );
  };

  /**
   * For every `ItemBonusAdded` domain event, emits an
   * `ItemEventNames.ItemUpdated` message on both streams. The returned
   * observable never emits a value.
   */
  @Saga()
  public bonusAdded$ = (events$: Observable<ItemBonusAdded>): Observable<never> => {
    return events$.pipe(
      ofType(ItemBonusAdded),
      // NOTE(review): same switchMap cancellation caveat as in `created$`.
      switchMap((domainEvent) => this.publish(ItemEventNames.ItemUpdated, domainEvent)),
      // Discard the emit results; the saga produces no follow-up values.
      switchMap(() => EMPTY),
    );
  };

  public constructor(
    private readonly eventStream: EventStream,
    private readonly messageStream: MessageStream,
  ) {}

  /**
   * Builds the integration event for `domainEvent` and emits it on the event
   * stream and the message stream, joined with `forkJoin`.
   *
   * @param name - integration event name to publish under
   * @param domainEvent - domain event that triggered the publication
   * @returns the `forkJoin` of both emit calls
   */
  private publish<T extends ItemCreated | ItemUpdated | ItemBonusAdded>(
    name: ItemEventNames,
    domainEvent: T,
  ): Observable<unknown> {
    const eventMessage = this.createItemEvent(name, domainEvent);

    return forkJoin([
      this.eventStream.emit(this.topic, domainEvent, eventMessage),
      this.messageStream.emit(this.topic, domainEvent, eventMessage),
    ]);
  }

  /**
   * Maps a domain event onto an `ItemEventV2` carrying the given name.
   *
   * NOTE(review): this reads `id`, `gameType`, `warcraftId` and `names` from
   * the domain event, while `ItemBonusAdded` as declared in this guide only
   * exposes `itemId` and `bonusId` — confirm the real event shapes.
   *
   * @param name - integration event name
   * @param domainEvent - source of the payload fields
   * @returns the integration event to publish
   */
  private createItemEvent(name: ItemEventNames, domainEvent: ItemCreated | ItemUpdated | ItemBonusAdded): ItemEvents {
    return new ItemEventV2(name, domainEvent.id, {
      id: domainEvent.id,
      gameType: domainEvent.gameType,
      warcraftId: domainEvent.warcraftId,
      names: domainEvent.names,
      // ...
    });
  }
}
3. Configure the Event Types
// @tsm/types/app-item
// DomainEvents/ItemEvents.ts
/** Union of the versioned item event classes (`ItemEventV1`, `ItemEventV2`). */
export type ItemEvents = ItemEventV1 | ItemEventV2;
/**
 * Names an item event can be published under.
 * Each member's runtime value is the member name as a string.
 */
export enum ItemEventNames {
ItemCreated = 'ItemCreated',
ItemUpdated = 'ItemUpdated',
}
/**
 * Version 1
 *
 * Payload type that `ItemEventV1` is parameterised with.
 */
export interface ItemEventV1Payload {
id: string;
gameType: GameType;
warcraftId: number;
names: Locales;
// ...
}
/** Version 1 item event: named by `ItemEventNames`, version literal `1`, payload `ItemEventV1Payload`. */
export class ItemEventV1 extends Event<ItemEventNames, 1, ItemEventV1Payload>(1) {}
/**
 * Version 2
 *
 * Declares the same visible fields as `ItemEventV1Payload`, plus `iconImage`.
 */
export interface ItemEventV2Payload {
id: string;
gameType: GameType;
warcraftId: number;
names: Locales;
iconImage: string; // new field in version 2
// ...
}
/**
 * Version 2 item event: named by `ItemEventNames`, version literal `2`,
 * payload `ItemEventV2Payload` (the payload type that declares `iconImage`).
 */
export class ItemEventV2 extends Event<ItemEventNames, 2, ItemEventV2Payload>(2) {}