Skip to content

Commit a397705

Browse files
committed
clean warnings, removed duplicated / unused code
1 parent 44c136f commit a397705

File tree

8 files changed

+18
-74
lines changed

8 files changed

+18
-74
lines changed

floodplain-debezium/src/main/kotlin/io/floodplain/debezium/postgres/DebeziumSource.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ fun createDebeziumChangeFlow(name: String, taskClass: String, hostname: String,
100100
private fun runDebeziumServer(props: Properties): Flow<ChangeRecord> {
101101
val engineKillSwitch = EngineKillSwitch()
102102
val totalTimeInSend = AtomicLong(0L)
103-
return callbackFlow<ChangeRecord> {
103+
return callbackFlow {
104104
val engine = DebeziumEngine.create(Json::class.java)
105105
.using(props)
106106
.notifying { record: ChangeEvent<String, String> ->

floodplain-debezium/src/test/java/SimpleMySQL.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,7 @@ public static void main(String[] args) throws IOException, InterruptedException
5454
// Create the engine with this configuration ...
5555
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
5656
.using(props)
57-
.notifying(record -> {
58-
System.out.println(record);
59-
}).build()
57+
.notifying(System.out::println).build()
6058
) {
6159
Executors.newSingleThreadExecutor().execute(engine);
6260

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/LocalRuntime.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,13 @@ import kotlinx.coroutines.Job
3838
import kotlinx.coroutines.channels.awaitClose
3939
import kotlinx.coroutines.channels.sendBlocking
4040
import kotlinx.coroutines.flow.Flow
41-
import kotlinx.coroutines.flow.asFlow
42-
import kotlinx.coroutines.flow.broadcastIn
41+
import kotlinx.coroutines.flow.SharingStarted
4342
import kotlinx.coroutines.flow.callbackFlow
4443
import kotlinx.coroutines.flow.catch
4544
import kotlinx.coroutines.flow.collect
4645
import kotlinx.coroutines.flow.filter
4746
import kotlinx.coroutines.flow.map
47+
import kotlinx.coroutines.flow.shareIn
4848
import kotlinx.coroutines.isActive
4949
import kotlinx.coroutines.launch
5050
import kotlinx.coroutines.newSingleThreadContext
@@ -197,6 +197,7 @@ class LocalDriverContext(
197197
private fun <T> Flow<T>.handleErrors(): Flow<T> =
198198
catch { e -> logger.error("Error in flow $e") }
199199

200+
@OptIn(kotlinx.coroutines.ObsoleteCoroutinesApi::class)
200201
override fun connectSourceAndSink(): List<Job> {
201202
val outputJob = GlobalScope.launch(newSingleThreadContext("TopologySource"), CoroutineStart.UNDISPATCHED) {
202203
val outputFlows = outputFlows(this)
@@ -266,8 +267,8 @@ class LocalDriverContext(
266267
val result = if (value == null) null else mapper.convertValue(parsed, object : TypeReference<Map<String, Any>>() {})
267268
Triple(topic, key, result)
268269
}
269-
.broadcastIn(context)
270-
.asFlow()
270+
.shareIn(context,SharingStarted.Lazily)
271+
// .asFlow()
271272
.handleErrors()
272273
return topics.map { topic ->
273274
val flow = sourceFlow.filter { (incomingTopic, _, _) ->

floodplain-dsl/src/test/kotlin/io/floodplain/test/InstantiatedContainer.kt

-47
This file was deleted.

floodplain-elasticsearch/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ dependencies {
99
implementation(project(":streams"))
1010
implementation(project(":floodplain-stream-topology"))
1111
implementation(project(":floodplain-dsl"))
12+
implementation(project(":floodplain-test"))
1213
implementation("io.confluent:kafka-connect-elasticsearch:5.5.0")
1314
implementation(FloodplainDeps.kotlinCoroutines)
1415
testImplementation("org.jetbrains.kotlin:kotlin-test")

floodplain-elasticsearch/src/test/kotlin/io/floodplain/elasticsearch/TestElasticSearch.kt

+7-18
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ import com.fasterxml.jackson.databind.ObjectMapper
2323
import io.floodplain.kotlindsl.message.empty
2424
import io.floodplain.kotlindsl.source
2525
import io.floodplain.kotlindsl.stream
26+
import io.floodplain.test.InstantiatedContainer
2627
import kotlinx.coroutines.delay
2728
import kotlinx.coroutines.withTimeout
2829
import org.junit.Test
29-
import org.testcontainers.containers.GenericContainer
3030
import org.testcontainers.shaded.com.fasterxml.jackson.databind.node.NullNode
3131
import java.net.URI
3232
import java.net.http.HttpClient
@@ -42,25 +42,12 @@ class TestElasticSearch {
4242

4343
private val objectMapper = ObjectMapper()
4444

45-
class KGenericContainer(imageName: String) : GenericContainer<KGenericContainer>(imageName)
46-
var address: String? = "localhost"
47-
var port: Int? = 0
48-
var container: GenericContainer<*>? = null
49-
50-
init {
51-
container = KGenericContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:7.7.0")
52-
.apply {
53-
withExposedPorts(9200)
54-
.withEnv("discovery.type", "single-node")
55-
}
56-
container?.start()
57-
address = container?.getHost()
58-
port = container?.getFirstMappedPort()
59-
}
45+
private val container = InstantiatedContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:7.7.0",
46+
9200, mapOf("discovery.type" to "single-node"))
6047

6148
@Test
6249
fun testElasticInsert() {
63-
val uri = "http://$address:$port"
50+
val uri = "http://${container.host}:${container.exposedPort}"
6451
stream {
6552
source("sometopic") {
6653
val config = elasticSearchConfig("elasticName", uri)
@@ -116,7 +103,9 @@ class TestElasticSearch {
116103
}
117104

118105
private fun queryUUIDHits(query: String): Int {
119-
val node = queryUUID("http://$address:$port", "q=$query")
106+
val uri = "http://${container.host}:${container.exposedPort}"
107+
108+
val node = queryUUID(uri, "q=$query")
120109
logger.info("Query uri: $node")
121110
val error = node.get("error")
122111
if (error == null || error is NullNode) {

floodplain-test/src/main/kotlin/io/floodplain/test/InstantiatedContainer.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package io.floodplain.test
2020

2121
import org.testcontainers.containers.GenericContainer
22+
import org.testcontainers.utility.DockerImageName
2223

2324
val useIntegraton: Boolean by lazy {
2425
System.getenv("NO_INTEGRATION") == null
@@ -29,7 +30,7 @@ val useIntegraton: Boolean by lazy {
2930
*/
3031
class InstantiatedContainer(image: String, port: Int, env: Map<String, String> = emptyMap()) {
3132

32-
class KGenericContainer(imageName: String) : GenericContainer<KGenericContainer>(imageName)
33+
class KGenericContainer(imageName: String) : GenericContainer<KGenericContainer>(DockerImageName.parse(imageName))
3334
var container: KGenericContainer?
3435
var host: String
3536
var exposedPort: Int = -1

immutable-impl/src/main/java/io/floodplain/immutable/impl/ImmutableMessageImpl.java

+1
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ public ImmutableMessage rename(String columnName, String newName) {
201201
return this;
202202
}
203203

204+
@SuppressWarnings("unchecked")
204205
@Override
205206
public ImmutableMessage with(String key, Object value, ValueType type) {
206207

0 commit comments

Comments
 (0)