-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathpool.ts
125 lines (114 loc) · 3.77 KB
/
pool.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import { Either, left, right } from "fp-ts/lib/Either";
import { compose, identity } from "fp-ts/lib/function";
import { tryCatch as ioTryCatch } from "fp-ts/lib/IOEither";
import { fromNullable } from "fp-ts/lib/Option";
import { ask, fromTaskEither, ReaderTaskEither } from "fp-ts/lib/ReaderTaskEither";
import {
fromEither,
fromIOEither,
TaskEither,
taskEither,
tryCatch as taskEitherTryCatch,
} from "fp-ts/lib/TaskEither";
import * as pg from "pg";
import { wrapPoolClient } from "./connection";
import {
isPoolCreationError,
isTransactionRollbackError,
makePoolCheckoutError,
makePoolCreationError,
makePoolShutdownError,
makeUnhandledConnectionError,
makeUnhandledPoolError,
PgPoolCreationError,
PgTypeParserSetupError,
PgUnhandledConnectionError,
} from "./errors";
import { setupParsers } from "./parser";
import {
ConnectedEnvironment,
Connection,
ConnectionError,
ConnectionPool,
ConnectionPoolConfig,
ConnectionSymbol,
} from "./types";
/**
 * Builds a `ConnectionPool` from the supplied configuration.
 *
 * The steps, in order:
 *  1. Construct a `pg.Pool` and subscribe to its `"error"` event; each emitted
 *     value is passed through `makeUnhandledPoolError` and then to `onError`.
 *  2. When `parsers` is present in the config, run `setupParsers` for the pool.
 *  3. Wrap the resulting pool with `wrapConnectionPool`.
 *
 * @param poolConfig Pool settings; also handed as-is to the `pg.Pool` constructor.
 * @returns A `TaskEither` whose left side is a pool-creation or a
 *          type-parser-setup error, and whose right side is the wrapped pool.
 */
export const makeConnectionPool = (
  poolConfig: ConnectionPoolConfig,
): TaskEither<PgPoolCreationError | PgTypeParserSetupError, ConnectionPool> => {
  const { onError, parsers } = poolConfig;

  // Pool construction is deferred into an IO so that anything thrown while
  // constructing the pool or attaching the listener is captured as a Left.
  const createPool = () => {
    const created = new pg.Pool(poolConfig);
    created.on(
      "error",
      compose(
        onError,
        makeUnhandledPoolError,
      ),
    );
    return created;
  };

  const poolIo = ioTryCatch(createPool, makePoolCreationError).mapLeft(cause => {
    // Keep an error that is already a pool-creation error; wrap anything else.
    if (isPoolCreationError(cause)) {
      return cause;
    }
    return makePoolCreationError(cause);
  });

  const rawPool = fromIOEither(poolIo).mapLeft<PgPoolCreationError | PgTypeParserSetupError>(
    identity,
  );

  // NOTE(review): if parser setup fails, the pool created above is not ended
  // in this function — confirm whether a caller is expected to clean it up.
  const configuredPool = rawPool.chain(pool => {
    const installParsers = setupParsers(pool);
    const poolAsIs = taskEither.of<PgTypeParserSetupError, pg.Pool>(pool);
    return fromNullable(parsers)
      .map(installParsers)
      .getOrElse(poolAsIs);
  });

  return configuredPool.map(wrapConnectionPool);
};
/**
 * Lazily requests a client from the pool.
 *
 * @param pool The `pg.Pool` to call `connect()` on.
 * @returns A `TaskEither` that yields the value resolved by `pool.connect()`,
 *          or the result of `makePoolCheckoutError` if that call fails.
 */
const checkoutConnection = (pool: pg.Pool) => {
  const acquireClient = () => pool.connect();
  return taskEitherTryCatch(acquireClient, makePoolCheckoutError);
};
/**
 * Runs `program` against a connection and releases that connection once the
 * program has settled, whatever the outcome.
 *
 * The program receives a copy of `environment` with the connection stored
 * under `ConnectionSymbol`. A rejection of the program's promise is converted
 * with `makeUnhandledConnectionError`; a `Left` produced by the program itself
 * is passed through unchanged.
 *
 * @param environment Caller-supplied environment merged into the program's input.
 * @param program The reader program to execute.
 * @returns A function from a `Connection` to a `TaskEither` holding the
 *          program's result or its (possibly unhandled-connection) error.
 */
const executeProgramWithConnection = <E extends {}, L, A>(
  environment: E,
  program: ReaderTaskEither<E & ConnectedEnvironment, L, A>,
) => (connection: Connection): TaskEither<PgUnhandledConnectionError | L, A> => {
  type Failure = PgUnhandledConnectionError | L;

  const runProgram = () =>
    program.run(Object.assign({}, environment, { [ConnectionSymbol]: connection }));

  const outcome = taskEitherTryCatch(runProgram, makeUnhandledConnectionError)
    .mapLeft<Failure>(identity)
    .chain(fromEither);

  const releaseAfterFailure = (err: Failure): Either<Failure, A> => {
    // If a rollback error reaches this point, we should assume the connection
    // is poisoned and ask the pool implementation to dispose of it.
    const disposalReason = isTransactionRollbackError(err) ? err : undefined;
    connection.release(disposalReason);
    return left(err);
  };

  const releaseAfterSuccess = (value: A): Either<Failure, A> => {
    connection.release();
    return right(value);
  };

  return new TaskEither(
    outcome.fold<Either<Failure, A>>(releaseAfterFailure, releaseAfterSuccess),
  );
};
/**
 * Creates a runner that checks a client out of `pool`, wraps it with
 * `wrapPoolClient`, and executes `program` with that connection and an
 * otherwise empty environment.
 *
 * @param pool The pool connections are checked out from.
 * @returns A function taking a program and returning a `TaskEither` of its
 *          result, with the error side widened to `ConnectionError<L>`.
 */
const withConnectionFromPool = (pool: pg.Pool) => <L, A>(
  program: ReaderTaskEither<ConnectedEnvironment, L, A>,
) => {
  const checkedOut = checkoutConnection(pool);
  const wrapped = checkedOut.map(wrapPoolClient);
  // Widen the error channel so the program's own error type fits alongside
  // the checkout error.
  const widened = wrapped.mapLeft<ConnectionError<L>>(identity);
  return widened.chain(executeProgramWithConnection({}, program));
};
/**
 * Environment-aware variant of the pool runner: reads the ambient environment
 * `E`, checks a client out of `pool`, wraps it with `wrapPoolClient`, and
 * executes `program` with that environment plus the connection.
 *
 * @param pool The pool connections are checked out from.
 * @returns A function taking a program and returning a `ReaderTaskEither`
 *          over `E` with the error side widened to `ConnectionError<L>`.
 */
const withConnectionEFromPool = (pool: pg.Pool) => <E extends {}, L, A>(
  program: ReaderTaskEither<E & ConnectedEnvironment, L, A>,
) => {
  const runInEnvironment = (environment: E) => {
    const checkedOut = checkoutConnection(pool);
    const wrapped = checkedOut.map(wrapPoolClient);
    const widened = wrapped.mapLeft<ConnectionError<L>>(identity);
    return widened.chain(executeProgramWithConnection(environment, program));
  };

  // `map` yields a reader of a TaskEither; chaining through `fromTaskEither`
  // flattens it back into a single ReaderTaskEither.
  return ask<E, ConnectionError<L>>()
    .map(runInEnvironment)
    .chain(fromTaskEither);
};
/**
 * Adapts a raw `pg.Pool` to the `ConnectionPool` interface.
 *
 * @param pool The pool to wrap.
 * @returns An object exposing `end`, `withConnection` and `withConnectionE`
 *          bound to `pool`.
 */
export const wrapConnectionPool = (pool: pg.Pool): ConnectionPool => {
  // Resolves immediately when the pool's `ending` flag is already set, so
  // `pool.end()` is not invoked again; otherwise delegates to `pool.end()`.
  // The `any` cast is presumably there because the typings in use do not
  // declare `ending` — TODO confirm.
  const shutDown = (): Promise<void> => {
    if ((pool as any).ending) {
      return Promise.resolve<void>(undefined);
    }
    return pool.end();
  };

  return {
    end: () => taskEitherTryCatch(shutDown, makePoolShutdownError),
    withConnection: withConnectionFromPool(pool),
    withConnectionE: withConnectionEFromPool(pool),
  };
};