forked from PrestaShopCorp/cqrs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery-bus.ts
93 lines (80 loc) · 2.75 KB
/
query-bus.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import { Injectable, Type } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import 'reflect-metadata';
import { QUERY_HANDLER_METADATA } from './decorators/constants';
import { QueryHandlerNotFoundException } from './exceptions';
import { InvalidQueryHandlerException } from './exceptions/invalid-query-handler.exception';
import { DefaultQueryPubSub } from './helpers/default-query-pubsub';
import {
IQuery,
IQueryBus,
IQueryHandler,
IQueryPublisher,
IQueryResult,
} from './interfaces';
import { ObservableBus } from './utils/observable-bus';
/**
 * Nest `Type` of an `IQueryHandler` for the given query and result types.
 *
 * This is the element type accepted by `QueryBus.register`.
 */
export type QueryHandlerType<
  TQuery extends IQuery = IQuery,
  TResult extends IQueryResult = IQueryResult
> = Type<IQueryHandler<TQuery, TResult>>;
/**
 * Bus that routes each query to the handler bound under the query's class name.
 *
 * Handlers are stored in an in-memory map keyed by name. Every query that has
 * a handler is also passed to the configured publisher before the handler
 * runs; by default that publisher is a `DefaultQueryPubSub` built over this
 * bus's own `subject$`.
 */
@Injectable()
export class QueryBus<QueryBase extends IQuery = IQuery>
  extends ObservableBus<QueryBase>
  implements IQueryBus<QueryBase> {
  /** Registered handlers, keyed by the class name of the query they serve. */
  private readonly handlers = new Map<
    string,
    IQueryHandler<QueryBase, IQueryResult>
  >();
  private _publisher: IQueryPublisher<QueryBase>;

  constructor(private readonly moduleRef: ModuleRef) {
    super();
    // Assigned directly (not via a helper) so the field is definitely
    // initialised under `strictPropertyInitialization`.
    this._publisher = new DefaultQueryPubSub<QueryBase>(this.subject$);
  }

  /** The publisher that receives every query passed to `execute`. */
  get publisher(): IQueryPublisher<QueryBase> {
    return this._publisher;
  }

  /** Replaces the publisher used for subsequently executed queries. */
  set publisher(_publisher: IQueryPublisher<QueryBase>) {
    this._publisher = _publisher;
  }

  /**
   * Publishes the query and runs the handler bound to its class name.
   *
   * @param query Query instance; its prototype's constructor name selects the handler.
   * @returns The value the handler's `execute` resolves to, cast to `TResult`.
   * @throws QueryHandlerNotFoundException if no handler is bound under the query's name.
   */
  async execute<T extends QueryBase, TResult = any>(
    query: T,
  ): Promise<TResult> {
    const queryName = this.getQueryName(query);
    const handler = this.handlers.get(queryName);
    if (!handler) {
      throw new QueryHandlerNotFoundException(queryName);
    }
    // Go through the publisher rather than `subject$` directly, so that a
    // publisher installed via the `publisher` setter is actually used.
    this._publisher.publish(query);
    const result = await handler.execute(query);
    return result as TResult;
  }

  /**
   * Binds a handler instance under the given query name, replacing any
   * handler previously bound under that name.
   *
   * @param handler Handler instance to invoke for the named query.
   * @param name Query class name used as the lookup key.
   */
  bind<T extends QueryBase, TResult = any>(
    handler: IQueryHandler<T, TResult>,
    name: string,
  ): void {
    this.handlers.set(name, handler);
  }

  /**
   * Registers each of the given handler classes on this bus.
   *
   * @param handlers Handler classes to register; defaults to an empty list.
   */
  register(handlers: QueryHandlerType<QueryBase>[] = []): void {
    handlers.forEach((handler) => this.registerHandler(handler));
  }

  /**
   * Resolves a handler class to an instance through `ModuleRef` and binds it
   * under the name of the query class stored in the handler's metadata.
   *
   * Does nothing when `ModuleRef` yields no instance.
   *
   * @param handler Handler class to resolve and bind.
   * @throws InvalidQueryHandlerException if the handler class carries no
   *   `QUERY_HANDLER_METADATA`.
   */
  protected registerHandler(handler: QueryHandlerType<QueryBase>): void {
    const instance = this.moduleRef.get(handler, { strict: false });
    if (!instance) {
      return;
    }
    const target = this.reflectQueryName(handler);
    if (!target) {
      throw new InvalidQueryHandlerException();
    }
    this.bind(instance as IQueryHandler<QueryBase, IQueryResult>, target.name);
  }

  /** Returns the name of the constructor found on the query's prototype. */
  private getQueryName(query: QueryBase): string {
    const { constructor } = Object.getPrototypeOf(query);
    return constructor.name as string;
  }

  /**
   * Reads the value stored under `QUERY_HANDLER_METADATA` on the handler
   * class; `registerHandler` treats a falsy result as an invalid handler.
   */
  private reflectQueryName(
    handler: QueryHandlerType<QueryBase>,
  ): Type<QueryBase> | undefined {
    return Reflect.getMetadata(QUERY_HANDLER_METADATA, handler);
  }
}