forked from tulios/kafkajs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadmin.js
65 lines (55 loc) · 1.6 KB
/
admin.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
const fs = require('fs')
const ip = require('ip')
const { Kafka, logLevel } = require('../index')
const PrettyConsoleLogger = require('./prettyConsoleLogger')
const host = process.env.HOST_IP || ip.address()
const kafka = new Kafka({
logLevel: logLevel.INFO,
logCreator: PrettyConsoleLogger,
brokers: [`${host}:9094`, `${host}:9097`, `${host}:9100`],
clientId: 'test-admin-id',
ssl: {
servername: 'localhost',
rejectUnauthorized: false,
ca: [fs.readFileSync('./testHelpers/certs/cert-signed', 'utf-8')],
},
sasl: {
mechanism: 'plain',
username: 'test',
password: 'testtest',
},
})
const topic = 'topic-test1'
const admin = kafka.admin()
const run = async () => {
await admin.connect()
await admin.createTopics({
topics: [{ topic }],
waitForLeaders: true,
})
await admin.createPartitions({
topicPartitions: [{ topic: topic, count: 3 }],
})
}
run().catch(e => kafka.logger().error(`[example/admin] ${e.message}`, { stack: e.stack }))
const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
errorTypes.map(type => {
process.on(type, async e => {
try {
kafka.logger().info(`process.on ${type}`)
kafka.logger().error(e.message, { stack: e.stack })
await admin.disconnect()
process.exit(0)
} catch (_) {
process.exit(1)
}
})
})
signalTraps.map(type => {
process.once(type, async () => {
console.log('')
kafka.logger().info('[example/admin] disconnecting')
await admin.disconnect()
})
})