-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka-client.js
48 lines (48 loc) · 1.85 KB
/
kafka-client.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.KafkaClient = void 0;
const js_binding_js_1 = require("./js-binding.js");
const kafka_stream_readable_1 = require("./kafka-stream-readable");
/**
 * Factory for Kafka producers, consumers and stream consumers that all share
 * a single client configuration.
 */
class KafkaClient {
  /**
   * Keeps the configuration it was given and builds the `KafkaClientConfig`
   * (from `./js-binding.js`) that every factory method delegates to.
   * @param kafkaConfiguration - Configuration passed straight to `KafkaClientConfig`
   * @throws {Error} If the configuration is invalid
   */
  constructor(kafkaConfiguration) {
    this.kafkaConfiguration = kafkaConfiguration;
    const { KafkaClientConfig } = js_binding_js_1;
    this.kafkaClientConfig = new KafkaClientConfig(kafkaConfiguration);
  }

  /**
   * Builds a producer from the shared client configuration.
   * @param {ProducerConfiguration} [producerConfiguration] - Optional producer configuration
   * @returns {KafkaProducer} A KafkaProducer instance
   */
  createProducer(producerConfiguration) {
    // A missing (or otherwise falsy) argument is replaced by an empty configuration.
    const effectiveConfiguration = producerConfiguration || {};
    return this.kafkaClientConfig.createProducer(effectiveConfiguration);
  }

  /**
   * Builds a consumer from the shared client configuration.
   * @param {ConsumerConfiguration} consumerConfiguration - Consumer configuration
   * @returns {KafkaConsumer} A KafkaConsumer instance
   * @throws {Error} If the configuration is invalid
   */
  createConsumer(consumerConfiguration) {
    const { kafkaClientConfig } = this;
    return kafkaClientConfig.createConsumer(consumerConfiguration);
  }

  /**
   * Builds a consumer and wraps it in a `KafkaStreamReadable`.
   * @param {ConsumerConfiguration} consumerConfiguration - Consumer configuration
   * @returns {KafkaStreamReadable} A KafkaStreamReadable instance
   * @throws {Error} If the configuration is invalid
   */
  createStreamConsumer(consumerConfiguration) {
    const { KafkaStreamReadable } = kafka_stream_readable_1;
    const consumer = this.kafkaClientConfig.createConsumer(consumerConfiguration);
    return new KafkaStreamReadable(consumer);
  }
}
exports.KafkaClient = KafkaClient;