Skip to content

Commit 0f98917

Browse files
committed
upgrade kafka to latest, and debezium. Fix changed api's. Mostly topicPrefix andserver id
1 parent 8ed29c4 commit 0f98917

File tree

16 files changed

+93
-49
lines changed

16 files changed

+93
-49
lines changed

buildSrc/src/main/kotlin/Dependencies.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ object FloodplainDeps {
1616
const val kotlin = FloodplainPlugins.kotlin
1717
const val floodplain_version = "1.11.7-SNAPSHOT"
1818
const val jackson_version = "2.13.3"
19-
const val kafka_version = "3.3.1"
19+
const val kafka_version = "3.3.1!!"
2020
const val slf4j_version = "1.7.36"
2121
const val mysql_version = "8.0.23"
2222
const val mongodb_version = "4.5.0"

floodplain-debezium/src/main/kotlin/io/floodplain/debezium/postgres/DebeziumSource.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ private fun createLocalDebeziumSettings(
7575
username: String,
7676
password: String,
7777
topicPrefix: String,
78+
serverId: Long,
7879
offsetId: String? = null,
7980
settings: Map<String, String> = emptyMap()
8081
): Properties {
@@ -86,6 +87,7 @@ private fun createLocalDebeziumSettings(
8687
props.setProperty("database.hostname", hostname)
8788
props.setProperty("database.port", "$port")
8889
props.setProperty("database.server.name", name) // don't think this matters?
90+
props.setProperty("database.server.id", serverId.toString()) // don't think this matters?
8991
props.setProperty("database.dbname", database)
9092
// props.setProperty("database.include.list", database)
9193
props.setProperty("database.user", username)
@@ -123,12 +125,13 @@ fun createDebeziumChangeFlow(
123125
username: String,
124126
password: String,
125127
topicPrefix: String,
128+
serverId: Long,
126129
offsetId: String? = null,
127130
settings: Map<String, String> = emptyMap()
128131
): Flow<ChangeRecord> {
129132
val props = createLocalDebeziumSettings(
130133
name, taskClass, hostname, port, database,
131-
username, password,topicPrefix, offsetId, settings
134+
username, password,topicPrefix, serverId, offsetId, settings
132135
)
133136
props.list(System.out)
134137
return runDebeziumServer(props)

floodplain-debezium/src/test/kotlin/TestDebeziumSource.kt

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class TestDebeziumSource {
5252
"postgres",
5353
"mysecretpassword",
5454
"my_prefix",
55+
1,
5556
UUID.randomUUID().toString(),
5657
emptyMap()
5758
)

floodplain-direct/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ import io.floodplain.build.FloodplainDeps
33
dependencies {
44
implementation(FloodplainDeps.kotlinLogging)
55
implementation(FloodplainDeps.kafkaStreams)
6+
implementation(FloodplainDeps.kafkaClient)
67
}

floodplain-direct/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ private void completeAllProcessableWork() {
601601
task.process(mockWallClockTime.milliseconds());
602602
task.maybePunctuateStreamTime();
603603
commit(task.prepareCommit());
604-
task.postCommit(true);
604+
task.postCommit(false);
605605
captureOutputsAndReEnqueueInternalResults();
606606
}
607607
if (task.hasRecordsQueued()) {

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/LocalRuntime.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ class LocalDriverContext(
204204
override fun connectSourceAndSink(): List<Job> {
205205
val outputJob = GlobalScope.launch(newSingleThreadContext("TopologySource"), CoroutineStart.UNDISPATCHED) {
206206
val outputFlows = outputFlows(this)
207-
.map { (topic, flow) -> topic to flow.bufferTimeout(2000, bufferTime?.toLong() ?: 1000) }
207+
.map { (topic, flow) -> topic to flow.bufferTimeout(500, bufferTime?.toLong() ?: 10000) }
208208
.toMap()
209209
val sinks = sinksByTopic()
210210
outputFlows.forEach { (topic, flow) ->

floodplain-example/src/main/kotlin/io/floodplain/kotlindsl/example/WordpressToMongoDB.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const val DEFAULT_MYSQL_PORT = 3306
3232
fun main() {
3333
stream {
3434
val mysqlConfig =
35-
mysqlSourceConfig("mysqlsource", "mysql", DEFAULT_MYSQL_PORT, "root", "mysecretpassword", "wpdb","topicPrefix")
35+
mysqlSourceConfig("mysqlsource", "mysql", DEFAULT_MYSQL_PORT, "root", "mysecretpassword", "wpdb","mysqlsource")
3636
val mongoConfig = mongoConfig("mongosink", "mongodb://mongo", "$generation-mongodumpalt")
3737
mysqlSource("wpdb.wp_posts", mysqlConfig) {
3838
each { key, msg, _ ->

floodplain-integration/src/test/kotlin/io.floodplain.integration/FilmSimple.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,17 @@ class FilmSimple {
5858
logger.info("Not performing integration tests, doesn't seem to work in circleci")
5959
return
6060
}
61-
stream {
61+
stream("mytenant") {
6262
val postgresConfig = postgresSourceConfig(
6363
"mypostgres",
6464
postgresContainer.host,
6565
postgresContainer.exposedPort,
6666
"postgres",
6767
"mysecretpassword",
6868
"dvdrental",
69-
"public"
69+
"public",
70+
1,
71+
7072
)
7173
val mongoConfig = remoteMongoConfig(
7274
"mongosink",

floodplain-integration/src/test/kotlin/io.floodplain.integration/FilmToGoogleSheets.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ class FilmToGoogleSheets {
149149
"postgres",
150150
"mysecretpassword",
151151
"dvdrental",
152-
"public"
152+
"public",
153153
)
154154
val sheetConfig = googleSheetConfig("sheets")
155155
postgresSource("payment", postgresConfig) {

floodplain-integration/src/test/kotlin/io.floodplain.integration/FilmToMongoDB.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.AfterAll
3939
import org.junit.jupiter.api.Assertions.assertEquals
4040
import org.junit.jupiter.api.Test
4141
import org.junit.jupiter.api.TestInstance
42+
import java.util.concurrent.atomic.AtomicInteger
4243

4344
private val logger = mu.KotlinLogging.logger {}
4445

@@ -77,6 +78,8 @@ class FilmToMongoDB {
7778
"mongodb://${mongoContainer.host}:${mongoContainer.exposedPort}",
7879
"$generation-mongodump"
7980
)
81+
82+
val counter = AtomicInteger()
8083
postgresSource("film", postgresConfig) {
8184
// Clear the last_update field, it makes no sense in a denormalized situation
8285
set { _, film, _ ->
@@ -111,6 +114,7 @@ class FilmToMongoDB {
111114
// Ideas welcome
112115
set { _, film, actorlist ->
113116
film["actors"] = actorlist["list"] ?: emptyList<IMessage>()
117+
println("Count: ${counter.incrementAndGet()}")
114118
film
115119
}
116120
// pass this message to the mongo sink
@@ -120,7 +124,7 @@ class FilmToMongoDB {
120124
val database = "${topologyContext.generation}-mongodump"
121125
var hits = 0L
122126
val start = System.currentTimeMillis()
123-
withTimeout(250000) {
127+
withTimeout(4500000) {
124128
repeat(1000) {
125129
MongoClients.create("mongodb://${mongoContainer.host}:${mongoContainer.exposedPort}")
126130
.use { client ->

floodplain-integration/src/test/kotlin/io.floodplain.integration/FilmToMongoIntegratedSinkRedPanda.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ class FilmToMongoIntegratedSinkRedPanda {
127127
val database = "mongodump" // topologyContext.topicName("@mongodump")
128128
var hits = 0L
129129
val start = System.currentTimeMillis()
130-
withTimeout(200000) {
130+
withTimeout(2000000) {
131131
MongoClients.create("mongodb://${mongoContainer.host}:${mongoContainer.exposedPort}")
132132
.use { client ->
133133
repeat(1000) {

floodplain-integration/src/test/kotlin/io.floodplain.integration/MySQLTest.kt

+39-25
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import io.floodplain.mongodb.waitForMongoDbCondition
3434
import io.floodplain.test.InstantiatedContainer
3535
import io.floodplain.test.useIntegraton
3636
import kotlinx.coroutines.delay
37+
import org.junit.After
38+
import org.junit.Before
3739
import org.junit.jupiter.api.AfterAll
3840
import org.junit.jupiter.api.Assertions.*
3941
import org.junit.jupiter.api.Disabled
@@ -45,27 +47,32 @@ private val logger = mu.KotlinLogging.logger {}
4547
@Suppress("UNCHECKED_CAST")
4648
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
4749
class MySQLTest {
48-
private val mysqlContainer = InstantiatedContainer(
49-
"debezium/example-mysql:1.6",
50-
3306,
51-
mapOf(
52-
"MYSQL_ROOT_PASSWORD" to "mysecretpassword",
53-
"MYSQL_DATABASE" to "wpdb",
54-
"MYSQL_USER" to "mysqluser",
55-
"MYSQL_PASSWORD" to "mysqlpw",
56-
"MYSQL_ROOT_HOST" to "%"
50+
// private var mysqlContainer: InstantiatedContainer = createMySql()
51+
// private var mongoContainer: InstantiatedContainer = createMongodb() //= InstantiatedContainer("mongo:latest", 27017)
52+
53+
54+
fun createMySql(): InstantiatedContainer {
55+
return InstantiatedContainer(
56+
"debezium/example-mysql:1.6",
57+
3306,
58+
mapOf(
59+
"MYSQL_ROOT_PASSWORD" to "mysecretpassword",
60+
"MYSQL_DATABASE" to "wpdb",
61+
"MYSQL_USER" to "mysqluser",
62+
"MYSQL_PASSWORD" to "mysqlpw",
63+
"MYSQL_ROOT_HOST" to "%"
64+
)
5765
)
58-
)
59-
private val mongoContainer = InstantiatedContainer("mongo:latest", 27017)
66+
}
6067

61-
@AfterAll
62-
fun shutdown() {
63-
mysqlContainer.close()
64-
mongoContainer.close()
68+
fun createMongodb(): InstantiatedContainer {
69+
return InstantiatedContainer("mongo:latest", 27017)
6570
}
6671

6772
@Test
6873
fun testSimple() {
74+
val mysqlContainer = createMySql()
75+
val mongoContainer = createMongodb()
6976
if (!useIntegraton) {
7077
logger.warn("Skipping integration test")
7178
return
@@ -78,7 +85,7 @@ class MySQLTest {
7885
"root",
7986
"mysecretpassword",
8087
"inventory",
81-
"topicPrefix"
88+
"mypostgres",
8289
)
8390
val mongoConfig = remoteMongoConfig(
8491
"mongosink",
@@ -106,12 +113,16 @@ class MySQLTest {
106113
connectJobs().forEach {
107114
it.cancel()
108115
}
116+
mysqlContainer.close()
117+
mongoContainer.close()
109118
assertNotNull(hits)
110119
}
111120
}
112121

113122
@Test
114123
fun testRuntimeParamParser() {
124+
val mysqlContainer = createMySql()
125+
val mongoContainer = createMongodb()
115126
if (!useIntegraton) {
116127
logger.warn("Skipping integration test")
117128
return
@@ -124,20 +135,19 @@ class MySQLTest {
124135
"root",
125136
"mysecretpassword",
126137
"inventory",
127-
"topicPrefix"
128-
138+
"mypostgres",
129139
)
130140
val mongoConfig = remoteMongoConfig(
131141
"mongosink",
132-
"mongodb://${mongoContainer.host}:${mongoContainer.exposedPort}",
142+
"mongodb://${mongoContainer?.host}:${mongoContainer?.exposedPort}",
133143
"@mongodump"
134144
)
135145
mysqlSource("inventory.customers", mysqlConfig) {
136146
toMongo("customers", "$generation-customers", mongoConfig)
137147
}
138148
}.runWithArguments { topologyContext ->
139149
val hits = waitForMongoDbCondition(
140-
"mongodb://${mongoContainer.host}:${mongoContainer.exposedPort}",
150+
"mongodb://${mongoContainer?.host}:${mongoContainer?.exposedPort}",
141151
"${topologyContext.generation}-mongodump"
142152
) { database ->
143153
val customerCount = database.getCollection("customers").countDocuments()
@@ -151,11 +161,15 @@ class MySQLTest {
151161
}
152162
} as Long?
153163
assertNotNull(hits)
164+
mysqlContainer.close()
165+
mongoContainer.close()
154166
}
155167
}
156168

157169
@Test
158170
fun testInventory() {
171+
val mysqlContainer = createMySql()
172+
val mongoContainer = createMongodb()
159173
stream {
160174
val mysqlConfig = mysqlSourceConfig(
161175
"mypostgres",
@@ -164,12 +178,11 @@ class MySQLTest {
164178
"root",
165179
"mysecretpassword",
166180
"inventory",
167-
"topicPrefix"
168-
181+
"mypostgres",
169182
)
170183
val mongoConfig = remoteMongoConfig(
171184
"mongosink",
172-
"mongodb://${mongoContainer.host}:${mongoContainer.exposedPort}",
185+
"mongodb://${mongoContainer?.host}:${mongoContainer?.exposedPort}",
173186
"$generation-mongodump"
174187
)
175188
mysqlSource("inventory.customers", mysqlConfig) {
@@ -223,6 +236,8 @@ class MySQLTest {
223236
}
224237
} as Long?
225238
assertNotNull(hits)
239+
mysqlContainer.close()
240+
mongoContainer.close()
226241
}
227242
}
228243
// can make this a proper unit test when I have a persisted wordpress installation image
@@ -236,8 +251,7 @@ class MySQLTest {
236251
"root",
237252
"mysecretpassword",
238253
"wpdb",
239-
"topicPrefix"
240-
254+
"topicPrefix",
241255
)
242256
val mongoConfig = remoteMongoConfig("mongosink", "mongodb://localhost", "@mongodump2")
243257
mysqlSource("wpdb.wp_posts", mysqlConfig) {

floodplain-jdbc/src/main/kotlin/io/floodplain/jdbc/MySQLSource.kt

+18-8
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ private class MySQLConfig(
4747
private val username: String,
4848
private val password: String,
4949
private val database: String,
50-
private val topicPrefix: String
50+
private val topicPrefix: String,
51+
private val serverId: Long
5152
) : SourceConfig {
5253

5354
private val sourceElements: MutableList<SourceTopic> = mutableListOf()
@@ -57,7 +58,7 @@ private class MySQLConfig(
5758
}
5859

5960
override suspend fun connectSource(inputReceiver: InputReceiver) {
60-
val broadcastFlow = directSource(offsetId,topicPrefix)
61+
val broadcastFlow = directSource(serverId, offsetId,topicPrefix)
6162
broadcastFlow
6263
.onEach { logger.info("Record found: ${it.topic} ${it.key}") }
6364
.collect {
@@ -87,13 +88,16 @@ private class MySQLConfig(
8788
"database.user" to username,
8889
"database.password" to password,
8990
"database.server.name" to name,
91+
"database.server.id" to serverId.toString(),
9092
"database.whitelist" to database,
9193
"key.converter" to "org.apache.kafka.connect.json.JsonConverter",
9294
"value.converter" to "org.apache.kafka.connect.json.JsonConverter",
9395
// TODO Deal with this, is it required?
9496
"database.history.kafka.bootstrap.servers" to "kafka:9092",
95-
"database.history.kafka.topic" to "dbhistory.wordpress",
96-
"include.schema.changes" to "false"
97+
"database.history.kafka.topic" to "history",
98+
"schema.history.internal.kafka.topic" to "schema_history",
99+
"schema.history.internal.kafka.bootstrap.servers" to "kafka:9092",
100+
"include.schema.changes" to "false",
97101
)
98102
)
99103
)
@@ -103,12 +107,15 @@ private class MySQLConfig(
103107
sourceElements.add(elt)
104108
}
105109

106-
private fun directSource(offsetId: String, topicPrefix: String): Flow<ChangeRecord> {
110+
private fun directSource(serverId: Long, offsetId: String, topicPrefix: String): Flow<ChangeRecord> {
107111
val tempFile = createTempFile(offsetId)
112+
val schemaTempFile = createTempFile(offsetId,"_schema")
108113

109114
val extraSettings = mapOf(
110115
"database.history" to "io.debezium.relational.history.FileDatabaseHistory",
111-
"database.history.file.filename" to tempFile.absolutePath
116+
"database.history.file.filename" to tempFile.absolutePath,
117+
"schema.history.internal" to "io.debezium.storage.file.history.FileSchemaHistory",
118+
"schema.history.internal.file.filename" to schemaTempFile.absolutePath,
112119
)
113120
return createDebeziumChangeFlow(
114121
topologyContext.topicName(name),
@@ -119,6 +126,7 @@ private class MySQLConfig(
119126
username,
120127
password,
121128
topicPrefix,
129+
serverId,
122130
offsetId,
123131
extraSettings
124132
)
@@ -138,7 +146,8 @@ fun Stream.mysqlSourceConfig(
138146
username: String,
139147
password: String,
140148
database: String,
141-
topicPrefix: String
149+
topicPrefix: String,
150+
serverId: Long = 1
142151
): SourceConfig {
143152
val mySQLConfig = MySQLConfig(
144153
this.topologyContext,
@@ -150,7 +159,8 @@ fun Stream.mysqlSourceConfig(
150159
username,
151160
password,
152161
database,
153-
topicPrefix
162+
topicPrefix,
163+
serverId
154164
)
155165
addSourceConfiguration(mySQLConfig)
156166
return mySQLConfig

0 commit comments

Comments
 (0)