-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathtransaction.ts
102 lines (95 loc) · 3.26 KB
/
transaction.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import { Either } from "fp-ts/lib/Either";
import { constant, identity } from "fp-ts/lib/function";
import { ask, fromTaskEither, ReaderTaskEither } from "fp-ts/lib/ReaderTaskEither";
import { fromEither, TaskEither, tryCatch } from "fp-ts/lib/TaskEither";
import { mixed } from "io-ts";
import {
isDriverQueryError,
isTransactionRollbackError,
makeUnhandledConnectionError,
PgTransactionRollbackError,
} from "./errors";
import {
ConnectedEnvironment,
Connection,
connectionLens,
TransactionError,
TransactionOptions,
} from "./types";
import { eitherToPromise } from "./utils/eitherToPromise";
import { SQL } from "./utils/sql";
/**
 * Baseline transaction options.
 *
 * `withTransaction` uses this object as-is when called without options, and
 * spreads caller-supplied partial options over it otherwise.
 */
export const defaultTxOptions: TransactionOptions = {
// Forwarded as the second argument of every `connection.query` call made for the transaction.
context: undefined,
// When true, `DEFERRABLE` is added to the BEGIN statement.
deferrable: false,
// Interpolated after `ISOLATION LEVEL` in the BEGIN statement.
isolation: "READ COMMITTED",
// When true, `READ ONLY` is added to the BEGIN statement.
readOnly: false,
};
/**
 * Builds the `BEGIN TRANSACTION` statement for the given options.
 *
 * Each transaction mode is handed to the `SQL` tag as a thunk returning a
 * string; `READ ONLY` and `DEFERRABLE` collapse to an empty string when the
 * corresponding flag is off.
 *
 * NOTE(review): presumably the `SQL` tag inlines thunk results as raw SQL
 * rather than binding them as parameters — confirm against ./utils/sql.
 *
 * @param txOptions Options selecting isolation level, access mode and deferrability.
 * @returns The query object produced by the `SQL` tag.
 */
const beginTransactionQuery = (txOptions: TransactionOptions) => {
  const { deferrable, isolation, readOnly } = txOptions;

  const isolationLevel = () => isolation;
  const accessMode = () => (readOnly ? "READ ONLY" : "");
  const deferrableMode = () => (deferrable ? "DEFERRABLE" : "");

  // The template text is kept verbatim (including its line breaks).
  return SQL`
BEGIN TRANSACTION
ISOLATION LEVEL ${isolationLevel}
${accessMode}
${deferrableMode}
`;
};
/**
 * Creates a handler that rolls the current transaction back because of `err`.
 *
 * The returned `TaskEither` never succeeds:
 * - if the `ROLLBACK` query goes through, it fails with the original `err`;
 * - if the `ROLLBACK` query itself fails, it fails with a
 *   `PgTransactionRollbackError` carrying the rollback failure, the original
 *   `err` and the query `context`.
 *
 * @param connection Connection on which the transaction was opened.
 * @param context Opaque value forwarded to `connection.query`.
 * @returns A function from the triggering error to the rollback task.
 */
const rollbackTransaction = (connection: Connection, context: mixed) => <L>(err: L) =>
  tryCatch(
    () => {
      // NOTE(review): `eitherToPromise` presumably rejects on a Left result — confirm in ./utils/eitherToPromise.
      const rollbackAttempt = connection
        .query(SQL`ROLLBACK;`, context)
        .run()
        .then(eitherToPromise);

      return rollbackAttempt.then(
        // Rollback succeeded: surface the error that caused it.
        () => Promise.reject(err),
        // Rollback failed: report both failures together.
        rollbackErr => Promise.reject(new PgTransactionRollbackError(rollbackErr, err, context)),
      );
    },
    reason => {
      if (isTransactionRollbackError(reason)) {
        return reason;
      }
      // Anything else is the original error passed in by the caller.
      return reason as L;
    },
  );
/**
 * Creates a handler that commits the current transaction and then yields the
 * value it was given.
 *
 * A failure that `isDriverQueryError` recognises is passed through unchanged;
 * any other failure is wrapped with `makeUnhandledConnectionError`.
 *
 * @param connection Connection on which the transaction was opened.
 * @param context Opaque value forwarded to `connection.query`.
 * @returns A function from the program's result to the commit task.
 */
const commitTransaction = (connection: Connection, context: mixed) => <A>(a: A) => {
  const yieldProgramResult = constant(a);

  return tryCatch(
    () => {
      const commitAttempt = connection.query(SQL`COMMIT;`, context).run();
      // NOTE(review): `eitherToPromise` presumably rejects on a Left result — confirm in ./utils/eitherToPromise.
      return commitAttempt.then(eitherToPromise).then(yieldProgramResult);
    },
    reason => {
      if (isDriverQueryError(reason)) {
        return reason;
      }
      return makeUnhandledConnectionError(reason);
    },
  );
};
/**
 * Runs `program` inside a transaction on `connection`.
 *
 * Sequence:
 * 1. Issue the BEGIN statement built from `opts`; a failure here short-circuits.
 * 2. Run `program`.
 *    - `Right(a)`: COMMIT, then yield `a`.
 *    - `Left(l)`: ROLLBACK, then fail with `l` (or with a
 *      `PgTransactionRollbackError` if the rollback itself fails).
 *    - `program` rejects or throws: ROLLBACK as well, so the transaction is
 *      not left open on the connection, then fail with
 *      `makeUnhandledConnectionError(reason)` (or with a
 *      `PgTransactionRollbackError` if the rollback itself fails).
 *
 * @param connection Connection used for BEGIN / COMMIT / ROLLBACK.
 * @param opts Transaction options; `opts.context` is forwarded to every query.
 * @param program Lazily started computation to run between BEGIN and COMMIT.
 * @returns A task yielding the program's result, or a `TransactionError`.
 */
const executeTransaction = <L, A>(
  connection: Connection,
  opts: TransactionOptions,
  program: () => Promise<Either<L, A>>,
): TaskEither<TransactionError<L>, A> => {
  const rollback = rollbackTransaction(connection, opts.context);
  const commit = commitTransaction(connection, opts.context);

  return connection
    .query(beginTransactionQuery(opts), opts.context)
    // Only widens the error type; the value is untouched.
    .mapLeft<TransactionError<L>>(identity)
    .chain(() =>
      tryCatch(
        () =>
          // Starting `program` inside a promise executor turns a synchronous
          // throw into a rejection, so it takes the same rollback path below.
          new Promise<Either<L, A>>(resolve => resolve(program())).then<
            Either<TransactionError<L>, A>,
            Either<TransactionError<L>, A>
          >(
            programE => programE.fold<TaskEither<TransactionError<L>, A>>(rollback, commit).run(),
            // The program blew up after BEGIN succeeded: roll back before
            // reporting, otherwise the transaction stays open.
            reason =>
              rollback(reason)
                .mapLeft<TransactionError<L>>(e =>
                  isTransactionRollbackError(e) ? e : makeUnhandledConnectionError(e),
                )
                .run(),
          ),
        makeUnhandledConnectionError,
      ),
    )
    // Flattens TaskEither<_, Either<_, A>> into TaskEither<_, A>.
    .chain(fromEither);
};
/**
 * Wraps a `ReaderTaskEither` program so that it runs inside a database
 * transaction on the environment's connection.
 *
 * Two call shapes are supported:
 * - `withTransaction(options, program)` — `options` are merged over
 *   `defaultTxOptions`;
 * - `withTransaction(program)` — `defaultTxOptions` are used unchanged.
 *
 * @returns A program with the same environment and result whose error channel
 *          is widened to `TransactionError<L>`.
 */
export function withTransaction<E, L, A>(
  x: Partial<TransactionOptions>,
  y: ReaderTaskEither<E & ConnectedEnvironment, L, A>,
): ReaderTaskEither<E & ConnectedEnvironment, TransactionError<L>, A>;
export function withTransaction<E, L, A>(
  x: ReaderTaskEither<E & ConnectedEnvironment, L, A>,
): ReaderTaskEither<E & ConnectedEnvironment, TransactionError<L>, A>;
export function withTransaction<E, L, A>(
  x: any,
  y?: any,
): ReaderTaskEither<E & ConnectedEnvironment, TransactionError<L>, A> {
  let opts: TransactionOptions;
  let program: ReaderTaskEither<E & ConnectedEnvironment, L, A>;

  if (y) {
    // Two-argument form: `x` carries the (partial) options, `y` the program.
    opts = { ...defaultTxOptions, ...x };
    program = y;
  } else {
    // One-argument form: `x` is the program itself.
    opts = defaultTxOptions;
    program = x;
  }

  const environment = ask<E & ConnectedEnvironment, TransactionError<L>>();
  const transaction = environment.map(env =>
    executeTransaction(connectionLens.get(env), opts, () => program.run(env)),
  );

  // Lift the inner TaskEither into the reader and flatten.
  return transaction.chain(fromTaskEither);
}