Skip to content

Commit bc3bcea

Browse files
committed
fix formatting
1 parent dd06e61 commit bc3bcea

File tree

17 files changed

+98
-113
lines changed

17 files changed

+98
-113
lines changed

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/Floodplain.kt

+9-17
Original file line numberDiff line numberDiff line change
@@ -89,28 +89,25 @@ interface SinkConfig : Config {
8989
fun sinkElements(): Map<Topic, List<FloodplainSink>>
9090
}
9191

92-
abstract class AbstractSinkConfig: SinkConfig {
92+
abstract class AbstractSinkConfig : SinkConfig {
9393
// var instantiatedSinkElements: Map<Topic, List<FloodplainSink>>? = null
9494
var sinkTask: SinkTask? = null
9595
var floodplainSink: FloodplainSink? = null
9696
var connector: SinkConnector? = null
9797
override fun instantiateSinkElements(): List<Map<String, String>> {
9898
val configs = materializeConnectorConfig()
99-
if(configs.size > 1) {
99+
if (configs.size > 1) {
100100
throw RuntimeException("Multiple configs not supported for now")
101101
}
102102
return configs.map {
103103
it.settings
104104
}.toList()
105-
106105
}
107106
override fun sinkElements(): Map<Topic, List<FloodplainSink>> {
108107
return instantiateSinkConfig(this)
109108
}
110-
111109
}
112110

113-
114111
class MaterializedConfig(val name: String, val topics: List<Topic>, val settings: Map<String, String>)
115112

116113
interface SourceTopic {
@@ -223,7 +220,6 @@ fun PartialStream.joinMulti(key: (IMessage) -> String?, secondaryKey: (IMessage)
223220
addTransformer(Transformer(this.rootTopology, jrt, topologyContext))
224221
}
225222

226-
227223
@Deprecated("Don't use unqualified topics")
228224
fun PartialStream.joinAttributes(withTopic: String, nameAttribute: String, valueAttribute: String, vararg keys: String) {
229225
return joinAttributes(withTopic, nameAttribute, valueAttribute) { msg ->
@@ -242,7 +238,7 @@ fun PartialStream.joinAttributes(withTopic: String, nameAttribute: String, value
242238
},
243239
{
244240
set {
245-
_, msg, acc ->
241+
_, msg, acc ->
246242
val name = msg[nameAttribute] as String?
247243
val value = msg[valueAttribute]
248244
if (name == null) {
@@ -254,7 +250,7 @@ fun PartialStream.joinAttributes(withTopic: String, nameAttribute: String, value
254250
},
255251
{
256252
set {
257-
_, msg, acc ->
253+
_, msg, acc ->
258254
acc.clear(msg[nameAttribute] as String); acc
259255
}
260256
}
@@ -365,7 +361,7 @@ private fun createSource(
365361
return source
366362
}
367363

368-
@Deprecated("Don't use unqualified topics",ReplaceWith("qualifiedTopic"))
364+
@Deprecated("Don't use unqualified topics", ReplaceWith("qualifiedTopic"))
369365
fun FloodplainOperator.topic(name: String): Topic {
370366
return Topic.from(name, topologyContext)
371367
}
@@ -380,10 +376,10 @@ fun FloodplainOperator.qualifiedTopic(name: String): Topic {
380376
* alltogether. When that is the case, we can simply use the qualified string instead of the Topic object
381377
*/
382378
fun FloodplainOperator.generationalTopic(name: String): Topic {
383-
if(name.startsWith("@")) {
379+
if (name.startsWith("@")) {
384380
throw RuntimeException("Can't create generationalTopic that starts with '@', remove the '@' from $name")
385381
}
386-
return Topic.from("@$name",topologyContext)
382+
return Topic.from("@$name", topologyContext)
387383
}
388384

389385
/**
@@ -478,7 +474,6 @@ fun PartialStream.externalSinkQualified(topic: String): Transformer {
478474
return addTransformer(Transformer(this.rootTopology, sink, topologyContext))
479475
}
480476

481-
482477
/**
483478
* Creates a simple sink that takes a lambda that will choose a **qualified** topic */
484479
fun PartialStream.dynamicSink(name: String, extractor: (String, IMessage) -> String): Transformer {
@@ -647,10 +642,7 @@ interface FloodplainOperator {
647642
/**
648643
* Concrete version of a partial pipe
649644
*/
650-
class Block(rootTopology: Stream, override val topologyContext: TopologyContext) : PartialStream(rootTopology) {
651-
652-
}
653-
645+
class Block(rootTopology: Stream, override val topologyContext: TopologyContext) : PartialStream(rootTopology)
654646

655647
class AbstractFloodplainSink(private val task: SinkTask, private val config: SinkConfig) : FloodplainSink {
656648
private val offsetCounter = AtomicLong(System.currentTimeMillis())
@@ -684,4 +676,4 @@ class AbstractFloodplainSink(private val task: SinkTask, private val config: Sin
684676
override fun taskObject(): Any? {
685677
TODO("Not yet implemented")
686678
}
687-
}
679+
}

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/FloodplainConnector.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ fun instantiateSinkConfig(config: SinkConfig): MutableMap<Topic, MutableList<Flo
152152
val result = mutableMapOf<Topic, MutableList<FloodplainSink>>()
153153
val materializedSinks = config.materializeConnectorConfig()
154154
materializedSinks.forEach { materializedSink ->
155-
val connectorInstance = Class.forName(materializedSink.settings["connector.class"]).getDeclaredConstructor().newInstance() as SinkConnector
155+
val connectorInstance = Class.forName(materializedSink.settings["connector.class"]).getDeclaredConstructor().newInstance() as SinkConnector
156156
connectorInstance.start(materializedSink.settings)
157157
val task = connectorInstance.taskClass().getDeclaredConstructor().newInstance() as SinkTask
158158
task.start(materializedSink.settings)

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/LocalRuntime.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ import java.util.Properties
7070

7171
private val logger = mu.KotlinLogging.logger {}
7272

73-
interface InputReceiver: FloodplainOperator {
73+
interface InputReceiver : FloodplainOperator {
7474
fun input(topic: Topic, key: String, msg: IMessage)
7575
fun input(topic: Topic, key: ByteArray, msg: ByteArray)
7676
fun inputQualified(topic: String, key: String, msg: IMessage)
@@ -164,7 +164,7 @@ class LocalDriverContext(
164164
override val rootTopology: Stream,
165165
override val topologyContext: TopologyContext,
166166
private val topologyConstructor: TopologyConstructor,
167-
private val config: Map<String,String>,
167+
private val config: Map<String, String>,
168168
private val sourceConfigs: List<SourceConfig>,
169169
private val sinkConfigs: List<SinkConfig>,
170170
private val sinks: List<Pair<String, String>>,
@@ -262,7 +262,7 @@ class LocalDriverContext(
262262
// it.instantiateSinkElements()
263263
// it.sinkElements().entries
264264
}.forEach { entry ->
265-
entry.entries.forEach { (key,value)->
265+
entry.entries.forEach { (key, value) ->
266266
val list = result.computeIfAbsent(key) { mutableListOf() }
267267
list.addAll(value)
268268
}

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/Stream.kt

+27-22
Original file line numberDiff line numberDiff line change
@@ -191,20 +191,20 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
191191
* instance pointing at the kafka cluster at kafkaHosts, using the supplied clientId.
192192
* Finally, it will POST the supplied
193193
*/
194-
fun renderAndSchedule(connectorURL: URL?, kafkaHosts: String, force: Boolean = false, initialSettings: Map<String, String>? = null, monitor: (suspend Stream.(KafkaStreams)->Unit)? = null): KafkaStreams {
194+
fun renderAndSchedule(connectorURL: URL?, kafkaHosts: String, force: Boolean = false, initialSettings: Map<String, String>? = null, monitor: (suspend Stream.(KafkaStreams) -> Unit)? = null): KafkaStreams {
195195
val (topology, sources, sinks) = render()
196-
val settings = initialSettings?.toMutableMap()?: mutableMapOf()
197-
val replicationFactor = initialSettings?.get(StreamsConfig.REPLICATION_FACTOR_CONFIG)?:"1"
196+
val settings = initialSettings?.toMutableMap() ?: mutableMapOf()
197+
val replicationFactor = initialSettings?.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) ?: "1"
198198
settings["bootstrap.servers"] = kafkaHosts
199-
settings.putIfAbsent("offset.storage.topic","OFFSET_STORAGE")
200-
settings.putIfAbsent("config.storage.topic","CONFIG_STORAGE")
201-
settings.putIfAbsent("status.storage.topic","STATUS_STORAGE")
202-
settings.putIfAbsent("offset.storage.replication.factor",replicationFactor)
203-
settings.putIfAbsent("offset.storage.partitions","2")
204-
settings.putIfAbsent("status.storage.replication.factor",replicationFactor)
205-
settings.putIfAbsent("status.storage.partitions","2")
206-
settings.putIfAbsent("config.storage.replication.factor",replicationFactor)
207-
settings.putIfAbsent("group.id","${topologyContext.applicationId()}-connectors")
199+
settings.putIfAbsent("offset.storage.topic", "OFFSET_STORAGE")
200+
settings.putIfAbsent("config.storage.topic", "CONFIG_STORAGE")
201+
settings.putIfAbsent("status.storage.topic", "STATUS_STORAGE")
202+
settings.putIfAbsent("offset.storage.replication.factor", replicationFactor)
203+
settings.putIfAbsent("offset.storage.partitions", "2")
204+
settings.putIfAbsent("status.storage.replication.factor", replicationFactor)
205+
settings.putIfAbsent("status.storage.partitions", "2")
206+
settings.putIfAbsent("config.storage.replication.factor", replicationFactor)
207+
settings.putIfAbsent("group.id", "${topologyContext.applicationId()}-connectors")
208208
topologyConstructor.createTopicsAsNeeded(settings.toMap())
209209
sources.forEach { (name, json) ->
210210
connectorURL?.let {
@@ -219,11 +219,11 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
219219
instantiateLocalSinks(settings)
220220
val appId = topologyContext.topicName("@applicationId")
221221
val extra: MutableMap<String, Any> = mutableMapOf()
222-
extra.putAll(settings)
222+
extra.putAll(settings)
223223
val streams = runTopology(topology, appId, kafkaHosts, "storagePath", extra)
224224
logger.info { "Topology running!" }
225225
runBlocking {
226-
monitor?.invoke(this@Stream,streams)
226+
monitor?.invoke(this@Stream, streams)
227227
}
228228
return streams
229229
}
@@ -250,14 +250,14 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
250250
}
251251
}
252252

253-
private fun extendWorkerProperties(workerProps: MutableMap<String,String>) {
253+
private fun extendWorkerProperties(workerProps: MutableMap<String, String>) {
254254
workerProps["key.converter"] = "org.apache.kafka.connect.json.JsonConverter"
255255
workerProps["value.converter"] = "org.apache.kafka.connect.json.JsonConverter"
256256
// workerProps["offset.storage.file.filename"] = "offset"
257257
// TODO: Using port 8084 now, that might clash. Random? Don't know.
258258
workerProps[WorkerConfig.LISTENERS_CONFIG] = "http://127.0.0.1:8084"
259259
val keys = workerProps.keys.toSet()
260-
keys.filter { it.startsWith("security") || it.startsWith("sasl")|| it.startsWith("ssl") || it.startsWith("bootstrap")}
260+
keys.filter { it.startsWith("security") || it.startsWith("sasl") || it.startsWith("ssl") || it.startsWith("bootstrap") }
261261
.forEach { key ->
262262
workerProps["consumer.$key"] = workerProps[key]!!
263263
workerProps["producer.$key"] = workerProps[key]!!
@@ -272,15 +272,16 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
272272
}
273273

274274
private fun startLocalConnect(initialWorkerProps: Map<String, String>): Herder {
275-
val workerProps = mutableMapOf<String,String>()
275+
val workerProps = mutableMapOf<String, String>()
276276
workerProps.putAll(initialWorkerProps)
277277
extendWorkerProperties(workerProps)
278278
val plugins = Plugins(workerProps)
279279
val config = DistributedConfig(workerProps)
280280
val time: Time = Time.SYSTEM
281281
val connectorClientConfigOverridePolicy: ConnectorClientConfigOverridePolicy = plugins.newPlugin(
282282
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
283-
config, ConnectorClientConfigOverridePolicy::class.java
283+
config,
284+
ConnectorClientConfigOverridePolicy::class.java
284285
)
285286
val kafkaClusterId: String = ConnectUtils.lookupKafkaClusterId(config)
286287
logger.debug("Kafka cluster ID: $kafkaClusterId")
@@ -293,15 +294,19 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
293294

294295
val workerId: String = advertisedUrl.host.toString() + ":" + advertisedUrl.port
295296
val worker = Worker(
296-
workerId, time, plugins, config, offsetBackingStore,
297+
workerId,
298+
time,
299+
plugins,
300+
config,
301+
offsetBackingStore,
297302
connectorClientConfigOverridePolicy
298303
)
299304

300305
val statusBackingStore: StatusBackingStore = KafkaStatusBackingStore(time, worker.getInternalValueConverter())
301306
statusBackingStore.configure(config)
302307

303-
val configBackingStore = KafkaConfigBackingStore(worker.getInternalValueConverter(),config,worker.configTransformer())
304-
val herder = DistributedHerder(config,time,worker,kafkaClusterId,statusBackingStore,configBackingStore,advertisedUrl.toString(),connectorClientConfigOverridePolicy)
308+
val configBackingStore = KafkaConfigBackingStore(worker.getInternalValueConverter(), config, worker.configTransformer())
309+
val herder = DistributedHerder(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, advertisedUrl.toString(), connectorClientConfigOverridePolicy)
305310

306311
// val herder: Herder = StandaloneHerder(worker, kafkaClusterId, connectorClientConfigOverridePolicy)
307312
val connect = Connect(herder, rest)
@@ -333,7 +338,7 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
333338
// }
334339
}
335340

336-
private fun renderLocal(): Pair<Topology,List<MaterializedConfig>> {
341+
private fun renderLocal(): Pair<Topology, List<MaterializedConfig>> {
337342
val topology = renderTopology()
338343
val sources = sourceConfigurations().flatMap { element ->
339344
element.materializeConnectorConfig()

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/message/IMessage.kt

-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ interface IMessage {
6262
fun merge(msg: IMessage): IMessage
6363
fun messageList(path: String): List<IMessage>?
6464
fun messageElement(path: String, index: Int): IMessage?
65-
6665
}
6766

6867
private data class IMessageImpl(private val content: MutableMap<String, Any>) : IMessage {

floodplain-dsl/src/test/kotlin/io/floodplain/kotlindsl/IMessageTest.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -236,10 +236,10 @@ class IMessageTest {
236236
fun testQueryMessageListElement() {
237237
val msg = createComplexMessage()
238238
val id = msg.messageList("sublist")?.get(0)?.string("astring")
239-
val id2 = msg.messageElement("sublist",0)?.string("astring")
240-
val nonExistingIndex = msg.messageElement("sublist",15)?.string("astring")
241-
val nonExistingList = msg.messageElement("sublist2",0)?.string("astring")
242-
assertEquals(id,id2)
239+
val id2 = msg.messageElement("sublist", 0)?.string("astring")
240+
val nonExistingIndex = msg.messageElement("sublist", 15)?.string("astring")
241+
val nonExistingList = msg.messageElement("sublist2", 0)?.string("astring")
242+
assertEquals(id, id2)
243243
assertNull(nonExistingIndex)
244244
assertNull(nonExistingList)
245245
}

floodplain-dsl/src/test/kotlin/io/floodplain/kotlindsl/TestTopology.kt

+15-16
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class TestTopology {
5959
}
6060

6161
private fun testQualifiedWithTenantAndDeployment(tenant: String?, deployment: String?, generation: String?) {
62-
stream(tenant,deployment,generation?:"defaultGeneration") {
62+
stream(tenant, deployment, generation ?: "defaultGeneration") {
6363
from("sometopic") {
6464
sinkQualified("outputTopic")
6565
}
@@ -72,7 +72,7 @@ class TestTopology {
7272
}
7373

7474
private fun testNonQualifiedWithTenantAndDeployment(tenant: String?, deployment: String?, generation: String?) {
75-
stream(tenant,deployment, generation?: "defaultGeneration") {
75+
stream(tenant, deployment, generation ?: "defaultGeneration") {
7676
source("@sometopic") {
7777
sink("@outputTopic")
7878
}
@@ -89,15 +89,15 @@ class TestTopology {
8989

9090
// Test both qualified and non-qualified topic resolution.
9191
// I intend to drop non-qualified in the future
92-
testNonQualifiedWithTenantAndDeployment(null,null,null)
93-
testNonQualifiedWithTenantAndDeployment("ten",null,null)
94-
testNonQualifiedWithTenantAndDeployment("ten","dep",null)
95-
testNonQualifiedWithTenantAndDeployment("ten","dep","somegeneration")
96-
97-
testQualifiedWithTenantAndDeployment(null,null,null)
98-
testQualifiedWithTenantAndDeployment("ten",null,null)
99-
testQualifiedWithTenantAndDeployment("ten","dep",null)
100-
testQualifiedWithTenantAndDeployment("ten","dep","somegeneration")
92+
testNonQualifiedWithTenantAndDeployment(null, null, null)
93+
testNonQualifiedWithTenantAndDeployment("ten", null, null)
94+
testNonQualifiedWithTenantAndDeployment("ten", "dep", null)
95+
testNonQualifiedWithTenantAndDeployment("ten", "dep", "somegeneration")
96+
97+
testQualifiedWithTenantAndDeployment(null, null, null)
98+
testQualifiedWithTenantAndDeployment("ten", null, null)
99+
testQualifiedWithTenantAndDeployment("ten", "dep", null)
100+
testQualifiedWithTenantAndDeployment("ten", "dep", "somegeneration")
101101
}
102102
@Test
103103
fun testDelete() {
@@ -556,10 +556,9 @@ class TestTopology {
556556
fun testReplicationParserForDateTime() {
557557
val data = javaClass.classLoader.getResource("decimalwithscale.json")?.readBytes()
558558
?: throw IllegalArgumentException("Missing json file for decimalwithscale.json")
559-
val replicationMessage = JSONToReplicationMessage.processDebeziumBody(data, Optional.empty());
559+
val replicationMessage = JSONToReplicationMessage.processDebeziumBody(data, Optional.empty())
560560
val date = replicationMessage.value("payment_date")
561561
assertNotNull(date)
562-
563562
}
564563

565564
@Test
@@ -883,9 +882,9 @@ class TestTopology {
883882
sinkQualified("output")
884883
}
885884
}.renderAndExecute {
886-
input( qualifiedTopic("external"), originalKey.toByteArray(), body.toByteArray())
887-
val (key,value) = output(qualifiedTopic("output"))
888-
assertEquals("965",key)
885+
input(qualifiedTopic("external"), originalKey.toByteArray(), body.toByteArray())
886+
val (key, value) = output(qualifiedTopic("output"))
887+
assertEquals("965", key)
889888
logger.info("Result: $value")
890889
}
891890
}

floodplain-example/src/main/kotlin/io/floodplain/kotlindsl/example/Class.kt

-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package io.floodplain.kotlindsl.example
2020

2121
import io.floodplain.kotlindsl.buffer
22-
import io.floodplain.kotlindsl.from
2322
import io.floodplain.kotlindsl.joinAttributes
2423
import io.floodplain.kotlindsl.joinRemote
2524
import io.floodplain.kotlindsl.set

floodplain-fhir/src/main/kotlin/FhirSource.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ fun Stream.fhirGeneric(topic: Topic, init: Source.() -> Unit = {}) {
5151
}
5252

5353
fun <T : IBaseResource> PartialStream.fhirSource(topic: String, transform: (T) -> IMessage, init: Source.() -> Unit = {}): Source {
54-
return fhirSource<T>(Topic.fromQualified(topic,topologyContext),transform,init)
54+
return fhirSource<T>(Topic.fromQualified(topic, topologyContext), transform, init)
5555
}
5656

5757
fun <T : IBaseResource> PartialStream.fhirSource(topic: Topic, transform: (T) -> IMessage, init: Source.() -> Unit = {}): Source {

floodplain-googlesheets/src/main/kotlin/io/floodplain/sink/sheet/GoogleSheetSink.kt

-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ class GoogleSheetConfiguration(override val topologyContext: TopologyContext, ov
8888
return instantiatedSinkElements ?: emptyMap()
8989
}
9090

91-
9291
override fun sinkTask(): Any? {
9392
return googleTask
9493
}

floodplain-googlesheets/src/test/kotlin/GoogleSheetTest.kt

-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
import io.floodplain.kotlindsl.each
2020
import io.floodplain.kotlindsl.from
2121
import io.floodplain.kotlindsl.message.empty
22-
import io.floodplain.kotlindsl.qualifiedTopic
2322
import io.floodplain.kotlindsl.set
24-
import io.floodplain.kotlindsl.source
2523
import io.floodplain.kotlindsl.stream
2624
import io.floodplain.sink.sheet.GoogleSheetConfiguration
2725
import io.floodplain.sink.sheet.googleSheetConfig

floodplain-integration/src/test/kotlin/io.floodplain.integration/Film.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class FilmSimple {
7575
)
7676
postgresSource("film", postgresConfig) {
7777
// Clear the last_update field, it makes no sense in a denormalized situation
78-
each { _,msg,_ ->
78+
each { _, msg, _ ->
7979
logger.info("Last update: ${msg["last_update"]?.javaClass}")
8080
}
8181
toMongo("filmwithactors", "$generation-filmwithcat", mongoConfig)

0 commit comments

Comments
 (0)