Skip to content

Commit b5ff344

Browse files
committed
add support for embedded sinks
1 parent 8630afd commit b5ff344

File tree

16 files changed

+591
-122
lines changed

16 files changed

+591
-122
lines changed

buildSrc/src/main/kotlin/Dependencies.kt

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ object FloodplainDeps {
3030
const val debeziumMySQL = "io.debezium:debezium-connector-mysql:$debezium_version"
3131
const val debeziumEmbedded = "io.debezium:debezium-embedded:$debezium_version"
3232
const val debeziumServerCore = "io.debezium:debezium-server-core:$debezium_version"
33+
const val debeziumTestContainers = "io.debezium:debezium-testing-testcontainers:$debezium_version"
3334
const val cdiApi = "jakarta.enterprise:jakarta.enterprise.cdi-api:2.0.2"
3435
const val microProfileConfig = "org.eclipse.microprofile.config:microprofile-config-api:1.3"
3536
const val jUnit = "junit:junit:4.13.1"

floodplain-dsl/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ dependencies {
2121
implementation(FloodplainDeps.argParser)
2222
testCompile(FloodplainDeps.jUnit)
2323
testCompile(FloodplainDeps.testContainer)
24+
implementation(FloodplainDeps.kafkaConnectRuntime)
2425
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
2526
implementation(FloodplainDeps.kotlinCoroutines)
2627
testImplementation("org.jetbrains.kotlin:kotlin-test")

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/Floodplain.kt

+62-2
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,16 @@ import io.floodplain.replication.api.ReplicationMessage
4343
import io.floodplain.streams.api.Topic
4444
import io.floodplain.streams.api.TopologyContext
4545
import io.floodplain.streams.remotejoin.TopologyConstructor
46+
import org.apache.kafka.connect.sink.SinkConnector
47+
import org.apache.kafka.connect.sink.SinkRecord
48+
import org.apache.kafka.connect.sink.SinkTask
49+
import java.lang.RuntimeException
4650
import java.time.Duration
4751
import java.util.Optional
52+
import java.util.concurrent.atomic.AtomicLong
53+
import kotlin.system.measureTimeMillis
54+
55+
private val logger = mu.KotlinLogging.logger {}
4856

4957
/**
5058
* Super (wrapper) class for all components (source, transformer or sink)
@@ -78,10 +86,28 @@ interface SourceConfig : Config {
7886

7987
interface SinkConfig : Config {
8088
fun sinkTask(): Any?
81-
fun instantiateSinkElements()
89+
fun instantiateSinkElements(): List<Map<String, String>>
8290
fun sinkElements(): Map<Topic, List<FloodplainSink>>
8391
}
8492

93+
abstract class AbstractSinkConfig: SinkConfig {
94+
// var instantiatedSinkElements: Map<Topic, List<FloodplainSink>>? = null
95+
var sinkTask: SinkTask? = null
96+
var floodplainSink: FloodplainSink? = null
97+
var connector: SinkConnector? = null
98+
override fun instantiateSinkElements(): List<Map<String, String>> {
99+
val configs = materializeConnectorConfig()
100+
if(configs.size > 1) {
101+
throw RuntimeException("Multiple configs not supported for now")
102+
}
103+
return configs.map {
104+
it.settings
105+
}.toList()
106+
107+
}
108+
}
109+
110+
85111
class MaterializedConfig(val name: String, val topics: List<Topic>, val settings: Map<String, String>)
86112

87113
interface SourceTopic {
@@ -197,7 +223,6 @@ fun PartialStream.joinAttributes(withTopic: String, nameAttribute: String, value
197223
scan(
198224
keyExtract,
199225
{
200-
_ ->
201226
empty()
202227
},
203228
{
@@ -552,3 +577,38 @@ class Block(rootTopology: Stream, override val topologyContext: TopologyContext)
552577
// throw UnsupportedOperationException("Blocks shouldn't add sources")
553578
// }
554579
}
580+
581+
582+
class AbstractFloodplainSink(private val task: SinkTask, private val config: SinkConfig) : FloodplainSink {
583+
private val offsetCounter = AtomicLong(System.currentTimeMillis())
584+
585+
override fun send(topic: Topic, elements: List<Pair<String, Map<String, Any>?>>) {
586+
val list = elements.map { (key, value) ->
587+
SinkRecord(topic.qualifiedString(), 0, null, key, null, value, offsetCounter.incrementAndGet())
588+
}.toList()
589+
val insertTime = measureTimeMillis {
590+
try {
591+
task.put(list)
592+
} catch (e: Throwable) {
593+
e.printStackTrace()
594+
}
595+
}
596+
logger.info("Inserting into mongodb size: ${list.size} duration: $insertTime")
597+
}
598+
599+
override fun config(): SinkConfig {
600+
return config
601+
}
602+
603+
override fun flush() {
604+
task.flush(emptyMap())
605+
}
606+
607+
override fun close() {
608+
task.close(emptyList())
609+
}
610+
611+
override fun taskObject(): Any? {
612+
TODO("Not yet implemented")
613+
}
614+
}

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/FloodplainConnector.kt

+9-5
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,14 @@ fun startConstructor(
9999
deleteConnector(generatedName, connectURL)
100100
} else {
101101
logger.warn("Connector: {} already present, ignoring", generatedName)
102+
// TODO return from here?
102103
}
103104
}
104105
postToHttpJava11(connectURL, jsonString)
105106
}
106107

107108
private fun existingConnectors(url: URL): List<String> {
109+
logger.info("Connecting to URL: $url")
108110
val response = httpClient.send(HttpRequest.newBuilder().uri(url.toURI()).build(), BodyHandlers.ofInputStream())
109111
val an = objectMapper.readTree(response.body()) as ArrayNode
110112
val result: MutableList<String> = ArrayList()
@@ -146,10 +148,10 @@ fun floodplainSinkFromTask(task: SinkTask, config: SinkConfig): FloodplainSink {
146148
return LocalConnectorSink(task, config)
147149
}
148150

149-
fun instantiateSinkConfig(config: SinkConfig, connector: () -> SinkConnector): Map<Topic, MutableList<FloodplainSink>> {
151+
fun instantiateSinkConfig(config: SinkConfig, connector: () -> SinkConnector): SinkConnector {
150152
val result = mutableMapOf<Topic, MutableList<FloodplainSink>>()
151153
val materializedSinks = config.materializeConnectorConfig()
152-
materializedSinks.map { materializedSink ->
154+
val instance = materializedSinks.map { materializedSink ->
153155
val connectorInstance = connector()
154156
connector().start(materializedSink.settings)
155157
val task = connectorInstance.taskClass().getDeclaredConstructor().newInstance() as SinkTask
@@ -160,11 +162,13 @@ fun instantiateSinkConfig(config: SinkConfig, connector: () -> SinkConnector): M
160162
val list = result.computeIfAbsent(topic) { mutableListOf() }
161163
list.add(localSink)
162164
}
163-
}
164-
return result
165+
connectorInstance
166+
}.first()
167+
return instance
168+
// return connectorInstance to instance
165169
}
166170

167-
private class LocalConnectorSink(private val task: SinkTask, val config: SinkConfig) : FloodplainSink {
171+
class LocalConnectorSink(private val task: SinkTask, val config: SinkConfig) : FloodplainSink {
168172
private val offsetCounter = AtomicLong(System.currentTimeMillis())
169173
override fun send(topic: Topic, elements: List<Pair<String, Map<String, Any>?>>) {
170174
logger.info("Inserting # of documents ${elements.size} for topic: $topic")

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/LocalRuntime.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import kotlinx.coroutines.launch
5050
import kotlinx.coroutines.newSingleThreadContext
5151
import kotlinx.coroutines.runBlocking
5252
import org.apache.kafka.common.serialization.Serdes
53+
import org.apache.kafka.common.utils.Utils
5354
import org.apache.kafka.connect.json.JsonDeserializer
5455
import org.apache.kafka.streams.KeyValue
5556
import org.apache.kafka.streams.StreamsConfig
@@ -132,7 +133,8 @@ fun runLocalTopology(
132133
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, storageFolder)
133134
props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST)
134135
val driver = TopologyTestDriver(topology, props)
135-
val contextInstance = LocalDriverContext(driver, context, topologyConstructor, sourceConfigs, sinkConfigs, sinks, bufferTime)
136+
137+
val contextInstance = LocalDriverContext(driver, context, topologyConstructor, Utils.propsToStringMap(props), sourceConfigs, sinkConfigs, sinks, bufferTime)
136138
val jobs = contextInstance.connectSourceAndSink()
137139
contextInstance.connectJobs.addAll(jobs)
138140
try {
@@ -158,6 +160,7 @@ class LocalDriverContext(
158160
private val driver: TopologyTestDriver,
159161
private val topologyContext: TopologyContext,
160162
private val topologyConstructor: TopologyConstructor,
163+
private val config: Map<String,String>,
161164
private val sourceConfigs: List<SourceConfig>,
162165
private val sinkConfigs: List<SinkConfig>,
163166
private val sinks: List<Pair<String, String>>,

0 commit comments

Comments
 (0)