Skip to content

Commit

Permalink
Merge branch 'fully-generic-events' of https://github.com/boenrobot/cqrs
Browse files Browse the repository at this point in the history
 into boenrobot-fully-generic-events
  • Loading branch information
kamilmysliwiec committed May 13, 2020
2 parents 7719e87 + 88f0e5f commit 0693a8d
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 67 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@
"git add"
]
}
}
}
18 changes: 10 additions & 8 deletions src/aggregate-root.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { IEvent } from './interfaces/index';
import { IEvent } from './interfaces';

const INTERNAL_EVENTS = Symbol();
const IS_AUTO_COMMIT_ENABLED = Symbol();

export abstract class AggregateRoot {
export abstract class AggregateRoot<EventBase extends IEvent = IEvent> {
public [IS_AUTO_COMMIT_ENABLED] = false;
private readonly [INTERNAL_EVENTS]: IEvent[] = [];
private readonly [INTERNAL_EVENTS]: EventBase[] = [];

set autoCommit(value: boolean) {
this[IS_AUTO_COMMIT_ENABLED] = value;
Expand All @@ -15,7 +15,7 @@ export abstract class AggregateRoot {
return this[IS_AUTO_COMMIT_ENABLED];
}

publish(event: IEvent) {}
publish<T extends EventBase = EventBase>(event: T) {}

commit() {
this[INTERNAL_EVENTS].forEach(event => this.publish(event));
Expand All @@ -26,15 +26,15 @@ export abstract class AggregateRoot {
this[INTERNAL_EVENTS].length = 0;
}

getUncommittedEvents(): IEvent[] {
getUncommittedEvents(): EventBase[] {
return this[INTERNAL_EVENTS];
}

loadFromHistory(history: IEvent[]) {
loadFromHistory(history: EventBase[]) {
history.forEach(event => this.apply(event, true));
}

apply(event: IEvent, isFromHistory = false) {
apply<T extends EventBase = EventBase>(event: T, isFromHistory = false) {
if (!isFromHistory && !this.autoCommit) {
this[INTERNAL_EVENTS].push(event);
}
Expand All @@ -44,7 +44,9 @@ export abstract class AggregateRoot {
handler && handler.call(this, event);
}

private getEventHandler(event: IEvent): Function | undefined {
private getEventHandler<T extends EventBase = EventBase>(
event: T,
): Function | undefined {
const handler = `on${this.getEventName(event)}`;
return this[handler];
}
Expand Down
8 changes: 5 additions & 3 deletions src/cqrs.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ import { Module, OnModuleInit } from '@nestjs/common';
import { CommandBus } from './command-bus';
import { EventBus } from './event-bus';
import { EventPublisher } from './event-publisher';
import { IEvent } from './interfaces';
import { QueryBus } from './query-bus';
import { ExplorerService } from './services/explorer.service';

@Module({
providers: [CommandBus, QueryBus, EventBus, EventPublisher, ExplorerService],
exports: [CommandBus, QueryBus, EventBus, EventPublisher],
})
export class CqrsModule implements OnModuleInit {
export class CqrsModule<EventBase extends IEvent = IEvent>
implements OnModuleInit {
constructor(
private readonly explorerService: ExplorerService,
private readonly eventsBus: EventBus,
private readonly explorerService: ExplorerService<EventBase>,
private readonly eventsBus: EventBus<EventBase>,
private readonly commandsBus: CommandBus,
private readonly queryBus: QueryBus,
) {}
Expand Down
60 changes: 32 additions & 28 deletions src/event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,70 @@ import { filter } from 'rxjs/operators';
import { isFunction } from 'util';
import { CommandBus } from './command-bus';
import { EVENTS_HANDLER_METADATA, SAGA_METADATA } from './decorators/constants';
import { InvalidSagaException } from './exceptions/invalid-saga.exception';
import { InvalidSagaException } from './exceptions';
import { DefaultPubSub } from './helpers/default-pubsub';
import { defaultGetEventName } from './helpers/default-get-event-name';
import {
IEvent,
IEventBus,
IEventHandler,
IEventPublisher,
ISaga,
} from './interfaces';
import { ObservableBus } from './utils/observable-bus';
import { ObservableBus } from './utils';

export type EventHandlerType = Type<IEventHandler<IEvent>>;
export type EventHandlerType<EventBase extends IEvent = IEvent> = Type<
IEventHandler<EventBase>
>;

@Injectable()
export class EventBus extends ObservableBus<IEvent>
implements IEventBus, OnModuleDestroy {
private _publisher: IEventPublisher;
private readonly subscriptions: Subscription[];
export class EventBus<EventBase extends IEvent = IEvent>
extends ObservableBus<EventBase>
implements IEventBus<EventBase>, OnModuleDestroy {
protected getEventName: (event: EventBase) => string;
private _publisher: IEventPublisher<EventBase>;
protected readonly subscriptions: Subscription[];

constructor(
private readonly commandBus: CommandBus,
private readonly moduleRef: ModuleRef,
) {
super();
this.subscriptions = [];
this.getEventName = defaultGetEventName;
this.useDefaultPublisher();
}

get publisher(): IEventPublisher {
get publisher(): IEventPublisher<EventBase> {
return this._publisher;
}

set publisher(_publisher: IEventPublisher) {
set publisher(_publisher: IEventPublisher<EventBase>) {
this._publisher = _publisher;
}

onModuleDestroy() {
this.subscriptions.forEach(subscription => subscription.unsubscribe());
}

publish<T extends IEvent>(event: T) {
this._publisher.publish(event);
publish<T extends EventBase>(event: T) {
return this._publisher.publish(event);
}

publishAll(events: IEvent[]) {
(events || []).forEach(event => this._publisher.publish(event));
publishAll<T extends EventBase>(events: T[]) {
if (this._publisher.publishAll) {
return this._publisher.publishAll(events);
}
return (events || []).map(event => this._publisher.publish(event));
}

bind(handler: IEventHandler<IEvent>, name: string) {
bind(handler: IEventHandler<EventBase>, name: string) {
const stream$ = name ? this.ofEventName(name) : this.subject$;
const subscription = stream$.subscribe(event => handler.handle(event));
this.subscriptions.push(subscription);
}

registerSagas(types: Type<any>[] = []) {
registerSagas(types: Type<unknown>[] = []) {
const sagas = types
.map(target => {
const metadata = Reflect.getMetadata(SAGA_METADATA, target) || [];
Expand All @@ -74,18 +83,18 @@ export class EventBus extends ObservableBus<IEvent>
sagas.forEach(saga => this.registerSaga(saga));
}

register(handlers: EventHandlerType[] = []) {
register(handlers: EventHandlerType<EventBase>[] = []) {
handlers.forEach(handler => this.registerHandler(handler));
}

protected registerHandler(handler: EventHandlerType) {
protected registerHandler(handler: EventHandlerType<EventBase>) {
const instance = this.moduleRef.get(handler, { strict: false });
if (!instance) {
return;
}
const eventsNames = this.reflectEventsNames(handler);
eventsNames.map(event =>
this.bind(instance as IEventHandler<IEvent>, event.name),
this.bind(instance as IEventHandler<EventBase>, event.name),
);
}

Expand All @@ -95,12 +104,7 @@ export class EventBus extends ObservableBus<IEvent>
);
}

private getEventName(event): string {
const { constructor } = Object.getPrototypeOf(event);
return constructor.name as string;
}

protected registerSaga(saga: ISaga) {
protected registerSaga(saga: ISaga<EventBase>) {
if (!isFunction(saga)) {
throw new InvalidSagaException();
}
Expand All @@ -116,13 +120,13 @@ export class EventBus extends ObservableBus<IEvent>
this.subscriptions.push(subscription);
}

private reflectEventsNames(handler: EventHandlerType): FunctionConstructor[] {
private reflectEventsNames(
handler: EventHandlerType<EventBase>,
): FunctionConstructor[] {
return Reflect.getMetadata(EVENTS_HANDLER_METADATA, handler);
}

private useDefaultPublisher() {
const pubSub = new DefaultPubSub();
pubSub.bridgeEventsTo(this.subject$);
this._publisher = pubSub;
this._publisher = new DefaultPubSub<EventBase>(this.subject$);
}
}
16 changes: 9 additions & 7 deletions src/event-publisher.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
import { Injectable } from '@nestjs/common';
import { EventBus } from './event-bus';
import { AggregateRoot } from './aggregate-root';
import { IEvent } from './interfaces/index';
import { IEvent } from './interfaces';

export interface Constructor<T> {
new (...args: any[]): T;
}

@Injectable()
export class EventPublisher {
constructor(private readonly eventBus: EventBus) {}
export class EventPublisher<EventBase extends IEvent = IEvent> {
constructor(private readonly eventBus: EventBus<EventBase>) {}

mergeClassContext<T extends Constructor<AggregateRoot>>(metatype: T): T {
mergeClassContext<T extends Constructor<AggregateRoot<EventBase>>>(
metatype: T,
): T {
const eventBus = this.eventBus;
return class extends metatype {
publish(event: IEvent) {
publish(event: EventBase) {
eventBus.publish(event);
}
};
}

mergeObjectContext<T extends AggregateRoot>(object: T): T {
mergeObjectContext<T extends AggregateRoot<EventBase>>(object: T): T {
const eventBus = this.eventBus;
object.publish = (event: IEvent) => {
object.publish = (event: EventBase) => {
eventBus.publish(event);
};
return object;
Expand Down
8 changes: 8 additions & 0 deletions src/helpers/default-get-event-name.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { IEvent } from '../interfaces';

export const defaultGetEventName = <EventBase extends IEvent = IEvent>(
event: EventBase,
): string => {
const { constructor } = Object.getPrototypeOf(event);
return constructor.name as string;
};
12 changes: 5 additions & 7 deletions src/helpers/default-pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import { Subject } from 'rxjs';
import { IEvent, IEventPublisher, IMessageSource } from '../interfaces';

export class DefaultPubSub implements IEventPublisher, IMessageSource {
private subject$: Subject<any>;
export class DefaultPubSub<EventBase extends IEvent>
implements IEventPublisher<EventBase>, IMessageSource<EventBase> {
constructor(private subject$: Subject<EventBase>) {}

publish<T extends IEvent>(event: T) {
if (!this.subject$) {
throw new Error('Invalid underlying subject (call bridgeEventsTo())');
}
publish<T extends EventBase>(event: T) {
this.subject$.next(event);
}

bridgeEventsTo<T extends IEvent>(subject: Subject<T>) {
bridgeEventsTo<T extends EventBase>(subject: Subject<T>) {
this.subject$ = subject;
}
}
2 changes: 1 addition & 1 deletion src/interfaces/commands/command.interface.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export interface ICommand {}
export interface ICommand {}
6 changes: 3 additions & 3 deletions src/interfaces/events/event-bus.interface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { IEvent } from './event.interface';

export interface IEventBus {
publish<T extends IEvent>(event: T);
publishAll(events: IEvent[]);
export interface IEventBus<EventBase extends IEvent = IEvent> {
publish<T extends EventBase>(event: T);
publishAll(events: EventBase[]);
}
5 changes: 3 additions & 2 deletions src/interfaces/events/event-publisher.interface.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { IEvent } from './event.interface';

export interface IEventPublisher {
publish<T extends IEvent>(event: T);
export interface IEventPublisher<EventBase extends IEvent = IEvent> {
publish<T extends EventBase = EventBase>(event: T);
publishAll?<T extends EventBase = EventBase>(events: T[]);
}
2 changes: 1 addition & 1 deletion src/interfaces/events/event.interface.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export interface IEvent {}
export interface IEvent {}
4 changes: 2 additions & 2 deletions src/interfaces/events/message-source.interface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Subject } from 'rxjs';
import { IEvent } from './event.interface';

export interface IMessageSource {
bridgeEventsTo<T extends IEvent>(subject: Subject<T>);
export interface IMessageSource<EventBase extends IEvent = IEvent> {
bridgeEventsTo<T extends EventBase>(subject: Subject<T>);
}
4 changes: 3 additions & 1 deletion src/interfaces/saga.type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ import { Observable } from 'rxjs';
import { ICommand } from './commands/command.interface';
import { IEvent } from './events/event.interface';

export type ISaga = (events$: Observable<IEvent>) => Observable<ICommand>;
export type ISaga<EventBase extends IEvent = IEvent> = (
events$: Observable<EventBase>,
) => Observable<ICommand>;
11 changes: 8 additions & 3 deletions src/services/explorer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ import {
QUERY_HANDLER_METADATA,
SAGA_METADATA,
} from '../decorators/constants';
import { ICommandHandler, IEventHandler, IQueryHandler } from '../interfaces';
import {
ICommandHandler,
IEvent,
IEventHandler,
IQueryHandler,
} from '../interfaces';
import { CqrsOptions } from '../interfaces/cqrs-options.interface';

@Injectable()
export class ExplorerService {
export class ExplorerService<EventBase extends IEvent = IEvent> {
constructor(private readonly modulesContainer: ModulesContainer) {}

explore(): CqrsOptions {
Expand All @@ -23,7 +28,7 @@ export class ExplorerService {
const queries = this.flatMap<IQueryHandler>(modules, instance =>
this.filterProvider(instance, QUERY_HANDLER_METADATA),
);
const events = this.flatMap<IEventHandler>(modules, instance =>
const events = this.flatMap<IEventHandler<EventBase>>(modules, instance =>
this.filterProvider(instance, EVENTS_HANDLER_METADATA),
);
const sagas = this.flatMap(modules, instance =>
Expand Down

0 comments on commit 0693a8d

Please sign in to comment.