-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
108 lines (108 loc) · 5.42 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
Object.defineProperty(exports, "__esModule", { value: true });
require('dotenv').config();
const client_1 = require("@prisma/client");
const kafkajs_1 = require("kafkajs");
const parser_1 = require("./parser");
const email_1 = require("./email");
const solana_1 = require("./solana");
const prismaClient = new client_1.PrismaClient();
const TOPIC_NAME = "zap-events";
const kafka = new kafkajs_1.Kafka({
clientId: 'outbox-processor-2',
brokers: ['localhost:9092']
});
function main() {
return __awaiter(this, void 0, void 0, function* () {
const consumer = kafka.consumer({ groupId: 'main-worker' });
yield consumer.connect();
const producer = kafka.producer();
yield producer.connect();
yield consumer.subscribe({ topic: TOPIC_NAME, fromBeginning: true });
yield consumer.run({
autoCommit: false,
eachMessage: (_a) => __awaiter(this, [_a], void 0, function* ({ topic, partition, message }) {
var _b, _c, _d, _e, _f, _g, _h, _j;
console.log({
partition,
offset: message.offset,
value: (_b = message.value) === null || _b === void 0 ? void 0 : _b.toString(),
});
if (!((_c = message.value) === null || _c === void 0 ? void 0 : _c.toString())) {
return;
}
const parsedValue = JSON.parse((_d = message.value) === null || _d === void 0 ? void 0 : _d.toString());
const zapRunId = parsedValue.zapRunId;
const stage = parsedValue.stage;
const zapRunDetails = yield prismaClient.zapRun.findFirst({
where: {
id: zapRunId
},
include: {
zap: {
include: {
actions: {
include: {
type: true
}
}
}
},
}
});
const currentAction = zapRunDetails === null || zapRunDetails === void 0 ? void 0 : zapRunDetails.zap.actions.find(x => x.sortingOrder === stage);
if (!currentAction) {
console.log("Current action not found?");
return;
}
const zapRunMetadata = zapRunDetails === null || zapRunDetails === void 0 ? void 0 : zapRunDetails.metadata;
if (currentAction.type.id === "email") {
const body = (0, parser_1.parse)((_e = currentAction.metadata) === null || _e === void 0 ? void 0 : _e.body, zapRunMetadata);
const to = (0, parser_1.parse)((_f = currentAction.metadata) === null || _f === void 0 ? void 0 : _f.email, zapRunMetadata);
console.log(`Sending out email to ${to} body is ${body}`);
(0, email_1.sendEmail)(to, body);
}
if (currentAction.type.id === "send-sol") {
const amount = (0, parser_1.parse)((_g = currentAction.metadata) === null || _g === void 0 ? void 0 : _g.amount, zapRunMetadata);
const address = (0, parser_1.parse)((_h = currentAction.metadata) === null || _h === void 0 ? void 0 : _h.address, zapRunMetadata);
console.log(`Sending out SOL of ${amount} to address ${address}`);
(0, solana_1.sendSol)(address, amount);
}
//
yield new Promise(r => setTimeout(r, 500));
const lastStage = (((_j = zapRunDetails === null || zapRunDetails === void 0 ? void 0 : zapRunDetails.zap.actions) === null || _j === void 0 ? void 0 : _j.length) || 1) - 1; // 1
console.log(lastStage);
console.log(stage);
if (lastStage !== stage) {
console.log("pushing back to the queue");
yield producer.send({
topic: TOPIC_NAME,
messages: [{
value: JSON.stringify({
stage: stage + 1,
zapRunId
})
}]
});
}
console.log("processing done");
//
yield consumer.commitOffsets([{
topic: TOPIC_NAME,
partition: partition,
offset: (parseInt(message.offset) + 1).toString() // 5
}]);
}),
});
});
}
main();