@@ -29,17 +29,8 @@ import kotlinx.coroutines.runBlocking
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.Serdes
-import org.apache.kafka.streams.KafkaStreams
-import org.apache.kafka.streams.StreamsConfig
-import org.apache.kafka.streams.Topology
-import org.apache.kafka.streams.processor.WallclockTimestampExtractor
-import java.io.InputStream
-import java.lang.StringBuilder
-import java.net.URL
-import java.util.Properties
-import java.util.Stack
-import java.util.UUID
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy
 import org.apache.kafka.connect.runtime.Connect
 import org.apache.kafka.connect.runtime.Herder
@@ -51,10 +42,16 @@ import org.apache.kafka.connect.runtime.standalone.StandaloneConfig
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder
 import org.apache.kafka.connect.storage.FileOffsetBackingStore
 import org.apache.kafka.connect.util.ConnectUtils
+import org.apache.kafka.streams.KafkaStreams
+import org.apache.kafka.streams.StreamsConfig
+import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.processor.WallclockTimestampExtractor
+import java.io.InputStream
 import java.net.URI
-
-
-
+import java.net.URL
+import java.util.Properties
+import java.util.Stack
+import java.util.UUID
 
 private val logger = mu.KotlinLogging.logger {}
@@ -169,9 +166,7 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
     fun renderAndSchedule(connectorURL: URL?, settings: InputStream, force: Boolean = false): KafkaStreams {
         val prop = Properties()
         prop.load(settings)
-        val propMap = mutableMapOf<String, String>()
-        prop.forEach { (k, v) -> propMap.put(k as String, v as String) }
-        return renderAndSchedule(connectorURL, prop[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] as String, force, propMap)
+        return renderAndSchedule(connectorURL, prop[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] as String, force, Utils.propsToStringMap(prop))
     }
 
     fun renderAndSchedule(connectorURL: URL?, kafkaHosts: String, kafkaUsername: String, kafkaPassword: String, replicationFactor: Int, force: Boolean = false): KafkaStreams {
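Aside: Utils.propsToStringMap comes from org.apache.kafka.common.utils.Utils (the import added in the first hunk) and does exactly what the removed two lines did by hand: flatten a java.util.Properties into a Map<String, String>. A minimal Kotlin sketch of the equivalence; the property value is a placeholder:

    import java.util.Properties
    import org.apache.kafka.common.utils.Utils

    fun main() {
        val prop = Properties()
        prop["bootstrap.servers"] = "localhost:9092" // placeholder address

        // Hand-rolled conversion, as the removed lines did:
        val manual = mutableMapOf<String, String>()
        prop.forEach { (k, v) -> manual[k as String] = v as String }

        // The library helper that replaces them:
        val viaUtils: Map<String, String> = Utils.propsToStringMap(prop)

        check(manual == viaUtils) // both yield {bootstrap.servers=localhost:9092}
    }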
@@ -186,41 +181,6 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
         return renderAndSchedule(connectorURL, kafkaHosts, force, properties)
     }
 
-
-    /**
-     * Will create an executable definition of the stream (@see render), then will start the topology by starting a streams
-     * instance pointing at the kafka cluster at kafkaHosts, using the supplied clientId.
-     * Finally, it will POST the supplied
-     */
-    fun renderAndRun(kafkaHosts: String, replicationFactor: Int, extraSettings: Map<String, Any>? = null, kafkaUsername: String? = null, kafkaPassword: String? = null): KafkaStreams {
-        val (topology, materializedConnectors) = renderLocal()
-        val properties = mutableMapOf<String, Any>(
-            StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaHosts,
-            "acks" to "all",
-            StreamsConfig.REPLICATION_FACTOR_CONFIG to replicationFactor
-        )
-        if (kafkaUsername!= null && kafkaPassword!= null) {
-            properties["security.protocol"] = "SASL_SSL"
-            properties["sasl.jaas.config"] = "\"org.apache.kafka.common.security.plain.PlainLoginModule required username='$kafkaUsername' password='$kafkaPassword';\""
-            properties["sasl.mechanism"] = "PLAIN"
-        }
-
-        topologyConstructor.createTopicsAsNeeded(properties)
-
-        val appId = topologyContext.applicationId()
-        val extra: MutableMap<String, Any> = mutableMapOf()
-        if (extraSettings != null) {
-            extra.putAll(extraSettings)
-        }
-        val streams = runTopology(topology, appId, kafkaHosts, "storagePath", extra)
-        materializedConnectors.forEach {
-            logger.info("Starting connector named: ${it.name} to settings: ${it.settings}")
-        }
-        logger.info { "Topology running!" }
-        return streams
-    }
-
     /**
      * Will create an executable definition of the stream (@see render), then will start the topology by starting a streams
@@ -241,41 +201,45 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
                 startConstructor(name, topologyContext, it, json, force)
             }
         }
+        instantiateLocalSinks(settings)
+        val appId = topologyContext.topicName("@applicationId")
+        val extra: MutableMap<String, Any> = mutableMapOf()
+        extra.putAll(settings)
+        val streams = runTopology(topology, appId, kafkaHosts, "storagePath", extra)
+        logger.info { "Topology running!" }
+        runBlocking {
+            monitor?.invoke(this@Stream, streams)
+        }
+        return streams
+    }
+
+    private fun instantiateLocalSinks(settings: Map<String, String>) {
         var herder: Herder? = null
-        if (localSinkConfigurations.isNotEmpty()) {
+        if (localSinkConfigurations.isNotEmpty()) {
             herder = startLocalConnect(settings)
         }
         var count = 0
         localSinkConfigurations.flatMap {
             it.instantiateSinkElements()
         }.forEach {
-            val localSettings = mutableMapOf<String,String>()
+            val localSettings = mutableMapOf<String, String>()
             localSettings.putAll(it)
             val name = "conn-${count++}"
             localSettings["name"] = name
-            herder?.putConnectorConfig(name,localSettings,true) { err,created ->
-                if (err!= null) {
-                    logger.error("Error creating connector:",err)
+            herder?.putConnectorConfig(name, localSettings, true) { err, created ->
+                if (err != null) {
+                    logger.error("Error creating connector:", err)
                 }
-                logger.info("something happened ")
+                logger.info("Instantiated: ${created?.created()} result: ${created?.result()}")
             }
         }
-        val appId = topologyContext.topicName("@applicationId")
-        val extra: MutableMap<String, Any> = mutableMapOf()
-        extra.putAll(settings)
-        val streams = runTopology(topology, appId, kafkaHosts, "storagePath", extra)
-        logger.info { "Topology running!" }
-        runBlocking {
-            monitor?.invoke(this@Stream, streams)
-        }
-        return streams
     }
 
     private fun startLocalConnect(initialWorkerProps: Map<String, String>): Herder {
         val workerProps = mutableMapOf<String, String>()
         workerProps.putAll(initialWorkerProps)
-        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter")
-        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter")
+        workerProps["key.converter"] = "org.apache.kafka.connect.json.JsonConverter"
+        workerProps["value.converter"] = "org.apache.kafka.connect.json.JsonConverter"
         workerProps["offset.storage.file.filename"] = "offset"
         val plugins = Plugins(workerProps)
         val config = StandaloneConfig(workerProps)
@@ -290,7 +254,7 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
         val rest = RestServer(config)
         rest.initializeServer()
         val advertisedUrl: URI = rest.advertisedUrl()
-        val workerId: String = advertisedUrl.getHost().toString() + ":" + advertisedUrl.getPort()
+        val workerId: String = advertisedUrl.host.toString() + ":" + advertisedUrl.port
 
         val worker = Worker(
             workerId, time, plugins, config, FileOffsetBackingStore(),
@@ -384,7 +348,6 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
         // Give the Streams application a unique name. The name must be unique in the Kafka cluster
         // against which the application is run.
         streamsConfiguration.putAll(extra)
-        // logger.info("Starting instance in storagePath: {}", storagePath)
         streamsConfiguration.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
         streamsConfiguration.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
         streamsConfiguration.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StreamOperators.replicationSerde.javaClass)
@@ -395,10 +358,8 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
         streamsConfiguration.putIfAbsent(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 7200000)
         streamsConfiguration.putIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100)
         streamsConfiguration.putIfAbsent(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
-        // streamsConfiguration.putIfAbsent(StreamsConfig.STATE_DIR_CONFIG,storagePath)
         streamsConfiguration.putIfAbsent(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
         streamsConfiguration.putIfAbsent(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0)
-        streamsConfiguration.putIfAbsent(StreamsConfig.RETRIES_CONFIG, 50)
         streamsConfiguration.putIfAbsent(StreamsConfig.REPLICATION_FACTOR_CONFIG, CoreOperators.topicReplicationCount())
         streamsConfiguration.putIfAbsent(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor::class.java)
@@ -423,7 +384,6 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
         runBlocking {
             io.floodplain.runtime.run(this@Stream, args, { after(it) }, { _, topologyContext -> after(topologyContext) })
         }
-        // io.floodplain.runtime.runWithArguments(this@Stream, arrayOf(*args), { after(it) }, { after() })
     }
 
     override val rootTopology: Stream
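For orientation, the doc comment retained above describes the surviving entry points: renderAndSchedule renders the stream into an executable topology, starts a KafkaStreams instance against the configured cluster, and (after this commit) instantiates any local sink connectors via instantiateLocalSinks. A minimal Kotlin driver sketch; the launch function, properties path, and shutdown hook are hypothetical illustrations, not part of this commit:

    import java.io.File
    import java.net.URL

    // Hypothetical launcher: myStream stands in for a fully configured Stream,
    // and conf/streams.properties must contain at least bootstrap.servers.
    fun launch(myStream: Stream) {
        val connectorURL: URL? = null // no Connect REST endpoint in this sketch
        File("conf/streams.properties").inputStream().use { settings ->
            val streams = myStream.renderAndSchedule(connectorURL, settings, force = false)
            // Close the streams instance cleanly on JVM shutdown (illustrative):
            Runtime.getRuntime().addShutdownHook(Thread { streams.close() })
        }
    }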