Skip to content

Commit e9dc35a

Browse files
committed
support confluent cloud
1 parent e18829f commit e9dc35a

File tree

7 files changed

+54
-30
lines changed

7 files changed

+54
-30
lines changed

buildSrc/src/main/kotlin/Dependencies.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ fun isReleaseVersion(): Boolean {
1616
}
1717

1818
object FloodplainDeps {
19-
const val floodplain_version = "1.3.2-SNAPSHOT"
19+
const val floodplain_version = "1.3.4-SNAPSHOT"
2020
const val jackson_version = "2.11.3"
2121
const val kafka_version = "2.7.0"
2222
const val slf4j_version = "1.7.30"

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/Stream.kt

+26-12
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.KafkaStreams
3333
import org.apache.kafka.streams.StreamsConfig
3434
import org.apache.kafka.streams.Topology
3535
import org.apache.kafka.streams.processor.WallclockTimestampExtractor
36+
import java.io.InputStream
3637
import java.lang.StringBuilder
3738
import java.net.URL
3839
import java.util.Properties
@@ -140,16 +141,24 @@ class Stream(override val topologyContext: TopologyContext) : FloodplainSourceCo
140141
)
141142
}
142143

144+
/**
 * Renders the stream definition and schedules it, loading the Kafka Streams
 * configuration from a properties stream (e.g. a Confluent Cloud config file).
 *
 * The supplied properties must contain `bootstrap.servers`; all entries are
 * forwarded to the underlying [renderAndSchedule] overload as extra settings.
 *
 * @param connectorURL optional Kafka Connect URL to POST connector configs to
 * @param settings an open [InputStream] over a java.util.Properties-format file;
 *                 the caller owns closing it
 * @param force passed through to connector startup (force re-creation)
 * @return the started [KafkaStreams] instance
 * @throws IllegalArgumentException if `bootstrap.servers` is missing
 */
fun renderAndSchedule(connectorURL: URL?, settings: InputStream, force: Boolean = false): KafkaStreams {
    val prop = Properties()
    prop.load(settings)
    val propMap = mutableMapOf<String, Any>()
    // Properties is Hashtable<Any, Any>; keys loaded from a properties file are Strings.
    prop.forEach { (k, v) -> propMap[k as String] = v }
    // Fail fast with a clear message instead of an opaque NPE on the unchecked cast.
    val bootstrapServers = requireNotNull(propMap[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] as? String) {
        "Settings must contain '${StreamsConfig.BOOTSTRAP_SERVERS_CONFIG}'"
    }
    return renderAndSchedule(connectorURL, bootstrapServers, force, propMap)
}
151+
143152
/**
144153
* Will create an executable definition of the
145154
* stream (@see render), then will start the topology by starting a streams
146155
* instance pointing at the kafka cluster at kafkaHosts, using the supplied clientId.
147156
* Finally, it will POST the supplied
148157
*/
149-
fun renderAndSchedule(connectorURL: URL?, kafkaHosts: String, force: Boolean = false): KafkaStreams {
158+
fun renderAndSchedule(connectorURL: URL?, kafkaHosts: String, force: Boolean = false,settings: Map<String,Any>? = null): KafkaStreams {
150159
val topologyConstructor = TopologyConstructor()
151160
val (topology, sources, sinks) = render(topologyConstructor)
152-
topologyConstructor.createTopicsAsNeeded(topologyContext, kafkaHosts)
161+
topologyConstructor.createTopicsAsNeeded(settings?: mapOf(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaHosts))
153162
sources.forEach { (name, json) ->
154163
connectorURL?.let {
155164
startConstructor(name, topologyContext, it, json, force)
@@ -161,7 +170,11 @@ class Stream(override val topologyContext: TopologyContext) : FloodplainSourceCo
161170
}
162171
}
163172
val appId = topologyContext.topicName("@applicationId")
164-
val streams = runTopology(topology, appId, kafkaHosts, "storagePath")
173+
val extra: MutableMap<String,Any> = mutableMapOf()
174+
if(settings!=null) {
175+
extra.putAll(settings)
176+
}
177+
val streams = runTopology(topology, appId, kafkaHosts, "storagePath", extra)
165178
logger.info { "Topology running!" }
166179
return streams
167180
}
@@ -187,8 +200,12 @@ class Stream(override val topologyContext: TopologyContext) : FloodplainSourceCo
187200
return Triple(topology, sources, sinks)
188201
}
189202

190-
private fun runTopology(topology: Topology, applicationId: String, kafkaHosts: String, storagePath: String): KafkaStreams {
191-
val props = createProperties(applicationId, kafkaHosts, storagePath)
203+
private fun runTopology(topology: Topology, applicationId: String, kafkaHosts: String, storagePath: String, extra: MutableMap<String,Any>): KafkaStreams {
204+
extra[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaHosts
205+
extra[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
206+
extra[StreamsConfig.STATE_DIR_CONFIG] = storagePath
207+
208+
val props = createProperties(extra)
192209
val stream = KafkaStreams(topology, props)
193210
logger.info("CurrentTopology:\n ${topology.describe()}")
194211
stream.setUncaughtExceptionHandler { thread: Thread, exception: Throwable? ->
@@ -202,15 +219,12 @@ class Stream(override val topologyContext: TopologyContext) : FloodplainSourceCo
202219
return stream
203220
}
204221

205-
private fun createProperties(applicationId: String, brokers: String, storagePath: String): Properties {
222+
private fun createProperties(extra: Map<String,Any>): Properties {
206223
val streamsConfiguration = Properties()
207224
// Give the Streams application a unique name. The name must be unique in the Kafka cluster
208225
// against which the application is run.
209-
logger.info("Creating application with name: {}", applicationId)
210-
logger.info("Creating application id: {}", applicationId)
211-
logger.info("Starting instance in storagePath: {}", storagePath)
212-
streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
213-
streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = brokers
226+
streamsConfiguration.putAll(extra)
227+
// logger.info("Starting instance in storagePath: {}", storagePath)
214228
streamsConfiguration[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass
215229
streamsConfiguration[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = StreamOperators.replicationSerde.javaClass
216230
streamsConfiguration[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
@@ -220,7 +234,7 @@ class Stream(override val topologyContext: TopologyContext) : FloodplainSourceCo
220234
streamsConfiguration[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 7200000
221235
streamsConfiguration[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 100
222236
streamsConfiguration[ProducerConfig.COMPRESSION_TYPE_CONFIG] = "lz4"
223-
streamsConfiguration[StreamsConfig.STATE_DIR_CONFIG] = storagePath
237+
// streamsConfiguration[StreamsConfig.STATE_DIR_CONFIG] = storagePath
224238
streamsConfiguration[StreamsConfig.NUM_STREAM_THREADS_CONFIG] = 1
225239
streamsConfiguration[StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG] = 0
226240
streamsConfiguration[StreamsConfig.RETRIES_CONFIG] = 50

floodplain-example/src/main/kotlin/io/floodplain/kotlindsl/example/CalendarDate.kt

-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.floodplain.kotlindsl.example
22

33
import io.floodplain.kotlindsl.each
4-
import io.floodplain.kotlindsl.set
54
import io.floodplain.kotlindsl.source
65
import io.floodplain.kotlindsl.stream
76
import io.floodplain.mongodb.mongoConfig

floodplain-integration/src/test/kotlin/io.floodplain.integration/TestTopicCreation.kt

+16-9
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
package io.floodplain.integration
22

3-
import io.floodplain.test.useIntegraton
3+
import io.floodplain.test.RedPandaContainer
44
import org.apache.kafka.clients.admin.AdminClient
55
import org.apache.kafka.clients.admin.CreateTopicsOptions
66
import org.apache.kafka.clients.admin.ListTopicsOptions
77
import org.apache.kafka.clients.admin.NewTopic
88
import org.apache.kafka.common.config.ConfigResource
9-
import org.apache.kafka.common.config.ConfigResource.*
109
import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
10+
import org.junit.Assert
1111
import org.junit.Ignore
1212
import org.junit.Test
13-
import org.testcontainers.containers.KafkaContainer
1413
import java.util.Collections
1514
import java.util.UUID
1615
import java.util.HashMap
@@ -24,14 +23,21 @@ class TestTopicCreation {
2423
// .withEnv(env)
2524

2625
// Infra for testing Kafka interaction
26+
private val panda = RedPandaContainer("vectorized/redpanda:latest", 9092)
27+
// val kafkaContainer = KafkaContainer("5.5.3").withEmbeddedZookeeper().withExposedPorts(9092,9093)
28+
29+
// Not functional yet
2730
@Test @Ignore
2831
fun testCreateTopic() {
29-
val kafkaContainer = KafkaContainer("5.5.3").withEmbeddedZookeeper().withExposedPorts(9092,9093)
3032

31-
kafkaContainer.start()
32-
logger.info("Bootstrap: ${kafkaContainer.bootstrapServers}")
33+
34+
// kafkaContainer.start()
35+
// logger.info("Bootstrap: ${kafkaContainer.bootstrapServers}")
3336
val config: MutableMap<String, Any> = HashMap()
34-
config["bootstrap.servers"] = kafkaContainer.bootstrapServers
37+
// val exposedPort = panda.exposedPort
38+
val host = "localhost:${panda.exposedPort}"
39+
logger.info("Exposed host: $host")
40+
config["bootstrap.servers"] = host //"localhost:51347"
3541
config["client.id"] = UUID.randomUUID().toString()
3642
val adminClient = AdminClient.create(config)
3743
adminClient.listTopics().names().get().forEach {
@@ -42,15 +48,16 @@ class TestTopicCreation {
4248
val topicDescription = adminClient.describeTopics(listOf("mytopic")).all().get()["mytopic"]
4349
// topicDescription?
4450
val cr = Collections.singleton(ConfigResource(TOPIC, "mytopic"))
45-
val configsResult = adminClient.describeConfigs(cr);
51+
val configsResult = adminClient.describeConfigs(cr)
4652
val cnf = configsResult.all().get()["mytopic"]
4753

4854
// adminClient.createTopics()
4955
val configMap: MutableMap<String, String> = HashMap()
5056
configMap["cleanup.policy"] = "compact"
5157

5258

53-
// adminClient.listTopics(ListTopicsOptions())
59+
Assert.assertEquals(1,adminClient.listTopics(ListTopicsOptions()).names().get().size)
60+
5461
}
5562

5663
// KafkaContainer kafka = new KafkaContainer(KAFKA_TEST_IMAGE)

floodplain-mongodb/src/main/kotlin/io/floodplain/mongodb/MongoSink.kt

+4
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ class MongoConfig(val name: String, private val uri: String, private val databas
6565
"document.id.strategy" to "com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy",
6666
"delete.on.null.values" to "true",
6767
"debug" to "true",
68+
"topic.creation.default.replication.factor" to "3",
69+
"topic.creation.default.partitions" to "1",
70+
"topic.creation.default.cleanup.policy" to "compact",
71+
"topic.creation.default.compression.type" to "lz4",
6872
"connection.uri" to uri,
6973
"database" to generationalDatabase,
7074
"collection" to collections,

floodplain-test/src/main/kotlin/io/floodplain/test/InstantiatedContainer.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,12 @@ val useIntegraton: Boolean by lazy {
3131
class InstantiatedContainer(image: String, port: Int, env: Map<String, String> = emptyMap()) {
3232

3333
class KGenericContainer(imageName: String) : GenericContainer<KGenericContainer>(DockerImageName.parse(imageName))
34-
var container: KGenericContainer?
34+
var container: KGenericContainer? = KGenericContainer(image)
35+
.apply { withExposedPorts(port) }
36+
.apply { withEnv(env) }
3537
var host: String
3638
var exposedPort: Int = -1
3739
init {
38-
container = KGenericContainer(image)
39-
.apply { withExposedPorts(port) }
40-
.apply { withEnv(env) }
4140
container?.start()
4241
host = container?.host ?: "localhost"
4342
exposedPort = container?.firstMappedPort ?: -1

streams/src/main/java/io/floodplain/streams/remotejoin/TopologyConstructor.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,10 @@ public void ensureTopicExists(Topic topicName, Optional<Integer> partitionCount)
6767
desiredTopics.put(topicName, partitionCount);
6868
}
6969

70-
public void createTopicsAsNeeded(TopologyContext topologyContext, String kafkaHosts) {
71-
Map<String, Object> config = new HashMap<>();
72-
config.put("bootstrap.servers", kafkaHosts);
70+
public void createTopicsAsNeeded(Map<String,Object> settings) {
71+
Map<String, Object> config = new HashMap<>(settings);
72+
// config.put("bootstrap.servers", kafkaHosts);
73+
// TODO remove these?
7374
config.put("client.id", UUID.randomUUID().toString());
7475
config.put("cleanup.policy","compact");
7576
AdminClient adminClient = AdminClient.create(config);

0 commit comments

Comments
 (0)