Skip to main content

How to Publish a Domain Event to the Event Stream

1. Create a Domain Event and Dispatch it from the Aggregate

// domain/Event/ItemBonusAdded.ts
class ItemBonusAdded {
constructor(
public readonly itemId: number,
public readonly bonusId: number,
) {}
}

// domain/Entity/Item.ts
class Item extends AggregateRoot {
public addItemBonusId(bonusId: number) {
// Validate
if (bonusId < 0) {
throw new BonusIdMustBePositiveException();
}

// Update state
this.bonusIds.push(bonusId);

// Raise a new domain event
this.record(new ItemBonusAdded(this.id, bonusId));
}
}

2. Create a Saga to Listen for the Domain Event

// infrastructure/IntegrationEvents/Sagas/ItemDomainEvents.ts
class ItemDomainEvents {
@Saga()
public created$ = (events$: Observable<ItemCreated>): Observable<never> => {
return events$.pipe(
ofType(ItemCreated),
switchMap((domainEvent) => {
const eventMessage = this.createItemEvent(ItemEventNames.ItemCreated, domainEvent);

return forkJoin([
this.eventStream.emit(this.topic, domainEvent, eventMessage),
this.messageStream.emit(this.topic, domainEvent, eventMessage),
]);
}),
switchMap(() => EMPTY),
);
};

@Saga()
public bonusAdded$ = (events$: Observable<ItemBonusAdded>): Observable<never> => {
return events$.pipe(
ofType(ItemBonusAdded),
switchMap((domainEvent) => {
const eventMessage = this.createItemEvent(ItemEventNames.ItemUpdated, domainEvent);

return forkJoin([
this.eventStream.emit(this.topic, domainEvent, eventMessage),
this.messageStream.emit(this.topic, domainEvent, eventMessage),
]);
}),
switchMap(() => EMPTY),
);
};

public constructor(
private readonly eventStream: EventStream,
private readonly messageStream: MessageStream,
) {}

private createItemEvent(name: ItemEventNames, domainEvent: ItemCreated | ItemUpdated | ItemBonusAdded): ItemEvents {
return new ItemEventV2(name, domainEvent.id, {
id: domainEvent.id,
gameType: domainEvent.gameType,
warcraftId: domainEvent.warcraftId,
names: domainEvent.names,
// ...
});
}
}

3. Configure the Event Types

// @tsm/types/app-item
// DomainEvents/ItemEvents.ts
export type ItemEvents = ItemEventV1 | ItemEventV2;

export enum ItemEventNames {
ItemCreated = 'ItemCreated',
ItemUpdated = 'ItemUpdated',
}

/**
* Version 1
*/
export interface ItemEventV1Payload {
id: string;
gameType: GameType;
warcraftId: number;
names: Locales;
// ...
}
export class ItemEventV1 extends Event<ItemEventNames, 1, ItemEventV1Payload>(1) {}

/**
* Version 2
*/
export interface ItemEventV2Payload {
id: string;
gameType: GameType;
warcraftId: number;
names: Locales;
iconImage: string; // new Field
// ...
}
export class ItemEventV2 extends Event<ItemEventNames, 2, ItemEventV1Payload>(2) {}