Skip to content

Commit 6fb66fc

Browse files
committed
upgrade red panda, but no transactions yet
1 parent 609687e commit 6fb66fc

File tree

2 files changed

+147
-1
lines changed

2 files changed

+147
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package io.floodplain.integration
20+
21+
import com.mongodb.client.MongoClients
22+
import io.floodplain.kotlindsl.each
23+
import io.floodplain.kotlindsl.postgresSource
24+
import io.floodplain.kotlindsl.postgresSourceConfig
25+
import io.floodplain.kotlindsl.stream
26+
import io.floodplain.mongodb.mongoConfig
27+
import io.floodplain.mongodb.toMongo
28+
import io.floodplain.test.InstantiatedContainer
29+
import io.floodplain.test.InstantiatedRedPandaContainer
30+
import kotlinx.coroutines.delay
31+
import kotlinx.coroutines.withTimeout
32+
import org.junit.After
33+
import org.junit.Before
34+
import org.junit.Ignore
35+
import org.junit.Test
36+
import org.testcontainers.containers.Network
37+
import java.net.URL
38+
import java.util.concurrent.TimeoutException
39+
import kotlin.test.assertEquals
40+
41+
private val logger = mu.KotlinLogging.logger {}
42+
43+
class FilmToMongoIntegratedSinkRedPanda {
44+
45+
private val containerNetwork = Network.newNetwork()
46+
private val kafkaContainer = InstantiatedRedPandaContainer() // KafkaContainer("5.5.3").withEmbeddedZookeeper().withExposedPorts(9092)
47+
private val postgresContainer = InstantiatedContainer("floodplain/floodplain-postgres-demo:1.0.0", 5432, mapOf()) {
48+
it.withNetwork(
49+
containerNetwork
50+
).withNetworkAliases("postgres")
51+
}
52+
private val mongoContainer = InstantiatedContainer("mongo:latest", 27017, mapOf()) {
53+
it.withNetwork(
54+
containerNetwork
55+
).withNetworkAliases("mongo")
56+
}
57+
private var debeziumContainer: InstantiatedContainer? = null
58+
59+
@Before @Ignore
60+
fun setup() {
61+
val bootstrap = "${kafkaContainer.host}:${kafkaContainer.exposedPort}"
62+
logger.info("kafka.getBootstrapServers(): bootstrap: $bootstrap")
63+
64+
debeziumContainer = InstantiatedContainer(
65+
"debezium/connect:1.5",
66+
8083,
67+
mapOf(
68+
"BOOTSTRAP_SERVERS" to "kafka:9092",
69+
"CONFIG_STORAGE_TOPIC" to "CONNECTOR_STORAGE",
70+
"OFFSET_STORAGE_TOPIC" to "OFFSET_STORAGE"
71+
)
72+
) {
73+
it.withNetwork(containerNetwork)
74+
.withNetworkAliases("debezium")
75+
}
76+
debeziumContainer?.container?.start()
77+
logger.info("Setup done")
78+
Thread.sleep(20000)
79+
}
80+
81+
@After
82+
fun shutdown() {
83+
postgresContainer.close()
84+
mongoContainer.close()
85+
kafkaContainer.close()
86+
debeziumContainer?.close()
87+
}
88+
89+
/**
90+
* Test the simplest imaginable pipe: One source and one sink.
91+
*/
92+
@Test @Ignore
93+
fun testPostgresRunLocal() {
94+
stream {
95+
val postgresConfig = postgresSourceConfig(
96+
"mypostgres",
97+
"postgres",
98+
5432,
99+
"postgres",
100+
"mysecretpassword",
101+
"dvdrental",
102+
"public"
103+
)
104+
val mongoConfig = mongoConfig(
105+
"mongosink",
106+
"mongodb://localhost:${mongoContainer.exposedPort}",
107+
"mongodump"
108+
)
109+
postgresSource("film", postgresConfig) {
110+
each { _, m, _ ->
111+
logger.info("Film: $m")
112+
}
113+
toMongo("filmwithactors", "somtopic", mongoConfig)
114+
}
115+
}.renderAndSchedule(
116+
URL("http://${debeziumContainer?.host}:${debeziumContainer?.exposedPort}/connectors"),
117+
"${kafkaContainer.host}:${kafkaContainer.exposedPort}",
118+
true,
119+
mapOf()
120+
) { kafkaStreams ->
121+
val database = "mongodump" // topologyContext.topicName("@mongodump")
122+
var hits = 0L
123+
val start = System.currentTimeMillis()
124+
withTimeout(200000) {
125+
MongoClients.create("mongodb://${mongoContainer.host}:${mongoContainer.exposedPort}")
126+
.use { client ->
127+
repeat(1000) {
128+
val collection = client.getDatabase(database).getCollection("filmwithactors")
129+
hits = collection.countDocuments()
130+
logger.info("Count of Documents: $hits in database: $database")
131+
if (hits == 1000L) {
132+
return@withTimeout
133+
}
134+
delay(1000)
135+
}
136+
}
137+
throw TimeoutException("Test timed out")
138+
}
139+
140+
val diff = System.currentTimeMillis() - start
141+
logger.info("Elapsed: $diff millis")
142+
assertEquals(1000L, hits)
143+
kafkaStreams.close()
144+
}
145+
}
146+
}

floodplain-test/src/main/kotlin/io/floodplain/test/InstantiatedContainer.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ class InstantiatedRedPandaContainer(customizer: ((RedpandaContainer) -> Redpanda
105105

106106
private const val STARTER_SCRIPT = "/testcontainers_start.sh"
107107

108-
class RedpandaContainer : GenericContainer<RedpandaContainer?>("vectorized/redpanda:v21.4.13") {
108+
class RedpandaContainer : GenericContainer<RedpandaContainer?>("vectorized/redpanda:v21.5.1") {
109109
override fun containerIsStarting(containerInfo: InspectContainerResponse?) {
110110
super.containerIsStarting(containerInfo)
111111
var command = "#!/bin/bash\n"

0 commit comments

Comments
 (0)