Skip to content

Commit 609687e

Browse files
committed
Improve static code analysis, fix formatting / complexity issues
1 parent 8d54d2b commit 609687e

File tree

18 files changed

+126
-58
lines changed

18 files changed

+126
-58
lines changed

.editorconfig

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
[*.{kt,kts}]
2-
max_line_length=130
2+
max_line_length=120

build.gradle.kts

+42-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import io.floodplain.build.FloodplainDeps
22
import io.floodplain.build.isReleaseVersion
33
import io.gitlab.arturbosch.detekt.Detekt
4-
import org.gradle.kotlin.dsl.detekt as detekt
54

65
buildscript {
76
repositories {
@@ -31,6 +30,7 @@ plugins {
3130
signing
3231
`maven-publish`
3332
`java-library`
33+
jacoco
3434
}
3535

3636
dependencies {
@@ -67,6 +67,7 @@ subprojects {
6767
apply(plugin = "org.jetbrains.dokka")
6868
apply(plugin = "com.github.hierynomus.license-base")
6969
apply(plugin = "checkstyle")
70+
apply(plugin = "jacoco")
7071
if (!isKotlinModule(this)) {
7172
apply(plugin = "com.github.spotbugs")
7273
}
@@ -249,7 +250,46 @@ val detektAll by tasks.registering(Detekt::class) {
249250
exclude("**/build/**")
250251
// baseline.set(project.rootDir.resolve("reports/baseline.xml"))
251252
reports {
252-
xml.enabled = false
253+
xml.enabled = true
253254
html.enabled = true
254255
}
255256
}
257+
258+
jacoco {
259+
toolVersion = "0.8.6"
260+
}
261+
262+
tasks.test {
263+
finalizedBy(tasks.jacocoTestReport) // report is always generated after tests run
264+
}
265+
266+
tasks.jacocoTestReport {
267+
dependsOn(tasks.test) // tests are required to run before generating the report
268+
}
269+
270+
// tasks.jacocoTestReport {
271+
// reports {
272+
// xml.isEnabled = true
273+
// csv.isEnabled = false
274+
// html.isEnabled = true
275+
// html.destination = layout.buildDirectory.dir("jacocoHtml").get().asFile
276+
// }
277+
// }
278+
val codeCoverageReport by tasks.creating(JacocoReport::class) {
279+
executionData(fileTree(project.rootDir.absolutePath).include("**/build/jacoco/*.exec"))
280+
281+
subprojects.onEach {
282+
sourceSets(it.sourceSets["main"])
283+
}
284+
285+
reports {
286+
// sourceDirectories = files(sourceSets["main"].allSource.srcDirs)
287+
// classDirectories = files(sourceSets["main"].output)
288+
xml.isEnabled = true
289+
xml.destination = File("$buildDir/reports/jacoco/report.xml")
290+
html.isEnabled = true
291+
csv.isEnabled = false
292+
}
293+
294+
dependsOn("test")
295+
}

buildSrc/build.gradle.kts

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ repositories {
1010
mavenLocal()
1111
mavenCentral()
1212
gradlePluginPortal()
13-
}
13+
}

detektConfig.yml

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ exceptions:
55
# excludes: ['**/Chunk.kt']
66
formatting:
77
MaximumLineLength:
8-
maxLineLength: 140
8+
maxLineLength: 120
99

1010
complexity:
1111
TooManyFunctions:
@@ -14,6 +14,10 @@ complexity:
1414
excludes: ['**/test/**']
1515
LargeClass:
1616
excludes: ['**/test/**']
17+
LongParameterList:
18+
functionThreshold: 7
19+
constructorThreshold: 10
20+
ignoreDefaultParameters: true
1721

1822
style:
1923
ReturnCount:

floodplain-debezium/src/main/kotlin/io/floodplain/debezium/postgres/DebeziumSource.kt

+9-4
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ internal class EngineKillSwitch(var engine: DebeziumEngine<ChangeEvent<String, S
5656

5757
@OptIn(ExperimentalPathApi::class)
5858
private fun createOffsetFilePath(offsetId: String?): Path {
59-
val tempFile = createTempFile(offsetId ?: UUID.randomUUID().toString().substring(0, 7))
59+
val tempFile = createTempFile(offsetId ?: UUID.randomUUID().toString().substring(0..7))
6060
if (offsetId == null) {
6161
tempFile.toFile().deleteOnExit()
6262
}
@@ -96,13 +96,15 @@ private fun createLocalDebeziumSettings(
9696

9797
/**
9898
* @return A hot flow of ChangeRecord. Perhaps one day there might be a colder one.
99-
* @param name: The prefix of the outgoing 'topic', basically the destination field of the changerecord is <topicprefix>.<schema>.<table>
99+
* @param name: The prefix of the outgoing 'topic', basically the destination
100+
* field of the changerecord is <topicprefix>.<schema>.<table>
100101
* @param hostname: The host of the postgres database
101102
* @param port: The port of the postgres database
102103
* @param username: The username of the postgres database
103104
* @param password: The password of the postgres database
104105
* @param offsetId: By default, we will save the offsets in a file path
105-
* @param settings An optional string-string map, that represents any extra parameters you want to pass to Debezium
106+
* @param settings An optional string-string map, that represents any extra
107+
* parameters you want to pass to Debezium
106108
* Defaults to empty map.
107109
*
108110
*/
@@ -117,7 +119,10 @@ fun createDebeziumChangeFlow(
117119
offsetId: String? = null,
118120
settings: Map<String, String> = emptyMap()
119121
): Flow<ChangeRecord> {
120-
val props = createLocalDebeziumSettings(name, taskClass, hostname, port, database, username, password, offsetId, settings)
122+
val props = createLocalDebeziumSettings(
123+
name, taskClass, hostname, port, database,
124+
username, password, offsetId, settings
125+
)
121126
props.list(System.out)
122127
return runDebeziumServer(props)
123128
}

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/Floodplain.kt

+21-7
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ abstract class AbstractSinkConfig : SinkConfig {
103103
it.settings
104104
}.toList()
105105
}
106+
106107
override fun sinkElements(): Map<Topic, List<FloodplainSink>> {
107108
return instantiateSinkConfig(this)
108109
}
@@ -168,8 +169,8 @@ fun PartialStream.buffer(duration: Duration, maxSize: Int = 10000, inMemory: Boo
168169
/**
169170
* Modifies the incoming message before passing it on.
170171
* You can use it as a simple stateless transformation: Add constant values, remove fields or transform fields.
171-
* A 'set' is also required for more complicated combinations. Typically after a join you use a 'set' to merge the messages
172-
* from both sources.
172+
* A 'set' is also required for more complicated combinations. Typically after a join you use a 'set' to merge
173+
* the messages from both sources.
173174
* The first message is the original message from the join, the second message the result from the 'secondary' source.
174175
* The second message will be lost after this operation, so you need to append anything you want to keep from the inner
175176
* source to the outer source here.
@@ -256,7 +257,12 @@ fun PartialStream.joinMulti(
256257
addTransformer(Transformer(this.rootTopology, jrt, topologyContext))
257258
}
258259

259-
fun PartialStream.joinAttributes(withTopic: String, nameAttribute: String, valueAttribute: String, vararg keys: String) {
260+
fun PartialStream.joinAttributes(
261+
withTopic: String,
262+
nameAttribute: String,
263+
valueAttribute: String,
264+
vararg keys: String
265+
) {
260266
return joinAttributes(withTopic, nameAttribute, valueAttribute) { msg ->
261267
keys.joinToString(ReplicationMessage.KEYSEPARATOR) { msg[it].toString() }
262268
}
@@ -295,6 +301,7 @@ fun PartialStream.joinAttributes(
295301
}
296302
}
297303
}
304+
298305
/**
299306
* Group a source, using the key from the lambda. The messages will be unchanged,
300307
* only the key will have the supplied key pre-pended.
@@ -306,8 +313,8 @@ fun PartialStream.group(key: (IMessage) -> String) {
306313
}
307314

308315
/**
309-
* Create sub source (without qualifying with tenant / deployment / gen)
310-
*/
316+
* Create sub source (without qualifying with tenant / deployment / gen)
317+
*/
311318
fun PartialStream.from(topic: String, init: Source.() -> Unit = {}): Source {
312319
return createSource(Topic.fromQualified(topic, rootTopology.topologyContext), rootTopology, init)
313320
}
@@ -373,7 +380,8 @@ private fun existingDebeziumSource(topicSource: String, rootTopology: Stream, in
373380
}
374381

375382
/**
376-
* Creates a simple sink that will contain the result of the current transformation. Will not qualify with tenant / deployment
383+
* Creates a simple sink that will contain the result of the current transformation.
384+
* Will not qualify with tenant / deployment
377385
*/
378386
fun PartialStream.to(topic: String): Transformer {
379387
val sink = SinkTransformer(
@@ -472,6 +480,7 @@ fun PartialStream.scan(
472480
fun PartialStream.createBlock(): Block {
473481
return Block(rootTopology, topologyContext)
474482
}
483+
475484
/**
476485
* Scan is effectively a 'reduce' operator (The 'scan' name is used in Rx, which means a reduce operator that emits a
477486
* 'running aggregate' every time it consumes a message). A real reduce makes no sense in infinite streams
@@ -524,7 +533,12 @@ fun PartialStream.fork(vararg destinations: Block.() -> Unit): Transformer {
524533
* with no further meaning within the framework, you can choose what meaning you want to attach. You can increment a
525534
* number, use a sort of time stamp, or even a git commit.
526535
*/
527-
fun stream(tenant: String? = null, deployment: String? = null, generation: String = "any", init: Stream.() -> Unit): Stream {
536+
fun stream(
537+
tenant: String? = null,
538+
deployment: String? = null,
539+
generation: String = "any",
540+
init: Stream.() -> Unit
541+
): Stream {
528542
val topologyContext = TopologyContext.context(
529543
Optional.ofNullable(tenant),
530544
Optional.ofNullable(deployment),

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/FloodplainConnector.kt

+6-4
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,17 @@ fun instantiateSinkConfig(config: SinkConfig): MutableMap<Topic, MutableList<Flo
165165
val materializedSinks = config.materializeConnectorConfig()
166166
materializedSinks.forEach { materializedSink ->
167167
val connectorInstance =
168-
Class.forName(materializedSink.settings["connector.class"]).getDeclaredConstructor().newInstance() as SinkConnector
168+
Class.forName(materializedSink.settings["connector.class"])
169+
.getDeclaredConstructor().newInstance() as SinkConnector
169170
connectorInstance.start(materializedSink.settings)
170171
val task = connectorInstance.taskClass().getDeclaredConstructor().newInstance() as SinkTask
171172
task.start(materializedSink.settings)
172173
val keyConverter =
173-
Class.forName(materializedSink.settings["key.converter"]).getDeclaredConstructor().newInstance() as Converter
174+
Class.forName(materializedSink.settings["key.converter"])
175+
.getDeclaredConstructor().newInstance() as Converter
174176
val valueConverter =
175-
Class.forName(materializedSink.settings["value.converter"]).getDeclaredConstructor().newInstance() as Converter
177+
Class.forName(materializedSink.settings["value.converter"])
178+
.getDeclaredConstructor().newInstance() as Converter
176179
keyConverter.configure(settingsWithPrefix(materializedSink.settings, "key.converter."), true)
177180
valueConverter.configure(settingsWithPrefix(materializedSink.settings, "value.converter."), false)
178181
val localSink = floodplainSinkFromTask(task, config, keyConverter, valueConverter)
@@ -204,7 +207,6 @@ class LocalConnectorSink(
204207
override fun send(topic: Topic, elements: List<Pair<ByteArray?, ByteArray?>>) {
205208
logger.info("Inserting # of documents ${elements.size} for topic: $topic")
206209
val list = elements.map { (key, value) ->
207-
// logger.info("Key: ${String(key?: byteArrayOf())} \nValue: ${String(value?: byteArrayOf())}")
208210
SinkRecord(
209211
topic.qualifiedString(),
210212
0,

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/GenericSource.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ import io.floodplain.reactive.source.topology.CustomTopicSource
2323
import io.floodplain.replication.factory.ReplicationFactory
2424
import org.apache.kafka.common.serialization.Serdes
2525

26-
fun PartialStream.genericSource(topic: String, parseMessage: (ByteArray) -> IMessage?, init: Source.() -> Unit = {}): Source {
26+
fun PartialStream.genericSource(
27+
topic: String,
28+
parseMessage: (ByteArray) -> IMessage?,
29+
init: Source.() -> Unit = {}
30+
): Source {
2731
return genericSource(
2832
topic,
2933
{ Serdes.String().deserializer().deserialize(topic, it) },

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/PostgresSource.kt

+11-4
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ class PostgresConfig(
5757
}
5858

5959
private fun acceptRecord(inputReceiver: InputReceiver, record: ChangeRecord) {
60-
val availableSourceTopics = sourceElements.map { sourceElement -> sourceElement.topic().qualifiedString() }.toSet()
60+
val availableSourceTopics =
61+
sourceElements.map { sourceElement -> sourceElement.topic().qualifiedString() }.toSet()
6162
if (availableSourceTopics.contains(record.topic)) {
6263
if (record.value != null) {
6364
inputReceiver.input(record.topic, record.key.toByteArray(), record.value!!)
@@ -111,7 +112,7 @@ class PostgresConfig(
111112
fun Stream.postgresSourceConfig(
112113
name: String,
113114
hostname: String,
114-
port: Int,
115+
port: Int = 5432,
115116
username: String,
116117
password: String,
117118
database: String,
@@ -133,7 +134,12 @@ fun Stream.postgresSourceConfig(
133134
return postgresConfig
134135
}
135136

136-
fun PartialStream.postgresSource(table: String, config: PostgresConfig, schema: String? = null, init: Source.() -> Unit): Source {
137+
fun PartialStream.postgresSource(
138+
table: String,
139+
config: PostgresConfig,
140+
schema: String? = null,
141+
init: Source.() -> Unit
142+
): Source {
137143
val effectiveSchema = schema ?: config.defaultSchema
138144
?: throw IllegalArgumentException("No schema defined, and also no default schema")
139145
val topic = Topic.from("${config.name}.$effectiveSchema.$table", topologyContext)
@@ -165,7 +171,8 @@ fun Stream.postgresSource(name: String, table: String, schema: String, init: Sou
165171
addSource(databaseSource)
166172
}
167173

168-
class DebeziumSourceElement(private val prefix: Topic, private val schema: String? = null, val table: String) : SourceTopic {
174+
class DebeziumSourceElement(private val prefix: Topic, private val schema: String? = null, val table: String) :
175+
SourceTopic {
169176
override fun topic(): Topic {
170177
return prefix
171178
}

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/sink/LogSink.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ fun PartialStream.logSink(sinkName: String, topicName: String, config: LogSinkCo
7474
"connector.class" to "io.floodplain.sink.LogSinkConnector",
7575
"tasks.max" to "1",
7676
"value.converter" to "org.apache.kafka.connect.json.JsonConverter",
77-
"key.converter" to "org.apache.kafka.connect.storage.StringConverter", // "org.apache.kafka.connect.json.JsonConverter",
77+
"key.converter" to "org.apache.kafka.connect.storage.StringConverter",
7878
"topics" to topicName,
7979
"schema.ignore" to "true",
8080
"type.name" to "_doc"

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/transformer/BufferTransformer.kt

-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ class BufferTransformer(private val duration: Duration, private val maxSize: Int
4141
val top = transformerNames.peek()
4242
val name = topologyContext.qualifiedName("buffer", transformerNames.size, currentPipeId)
4343
if (materialize) {
44-
// val prematerialize = topologyContext.qualifiedName("buffer-prematerialize", transformerNames.size, currentPipeId)
4544
addPersistentCache(
4645
topology,
4746
topologyContext,

floodplain-dsl/src/test/kotlin/io/floodplain/kotlindsl/IMessageTest.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ class IMessageTest {
7373
return convertThereAndBack(afterJSON, protoParser)
7474
}
7575

76-
// Utility function to check if a message remains unchanged after serialization and deserialization for a specific parser
76+
// Utility function to check if a message remains unchanged after serialization
77+
// and deserialization for a specific parser
7778
private fun convertThereAndBack(input: IMessage, currentParser: ReplicationMessageParser): IMessage {
7879
val immutable = input.toImmutable()
7980
val repl = ReplicationFactory.standardMessage(immutable)

floodplain-example/src/main/kotlin/io/floodplain/kotlindsl/example/FloodplainReduce.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,17 @@ import io.floodplain.sink.sheet.googleSheetsSink
3030
import kotlinx.coroutines.delay
3131
import java.math.BigDecimal
3232

33+
const val DEFAULT_POSTGRES_PORT = 5432
34+
const val DEFAULT_TIMEOUT = 200000L
35+
3336
fun main() {
3437
val spreadsheetId = "1MTAn1d13M8ptb2MkBHOSNK1gbJOOW1sFQoSfqa1JbXU"
3538

36-
val instance = stream("genxx") {
39+
stream("genxx") {
3740
val postgresConfig = postgresSourceConfig(
3841
"mypostgres",
3942
"postgres",
40-
5432,
43+
DEFAULT_POSTGRES_PORT,
4144
"postgres",
4245
"mysecretpassword",
4346
"dvdrental",
@@ -80,6 +83,6 @@ fun main() {
8083
)
8184
}
8285
}.renderAndExecute {
83-
delay(1000000)
86+
delay(DEFAULT_TIMEOUT)
8487
}
8588
}

floodplain-example/src/main/kotlin/io/floodplain/kotlindsl/example/WordpressToMongoDB.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@ import io.floodplain.mongodb.toMongo
2727
import java.net.URL
2828

2929
private val logger = mu.KotlinLogging.logger {}
30+
31+
const val DEFAULT_MYSQL_PORT = 3306
3032
fun main() {
3133
stream {
32-
val mysqlConfig = mysqlSourceConfig("mysqlsource", "mysql", 3306, "root", "mysecretpassword", "wpdb")
34+
val mysqlConfig =
35+
mysqlSourceConfig("mysqlsource", "mysql", DEFAULT_MYSQL_PORT, "root", "mysecretpassword", "wpdb")
3336
val mongoConfig = mongoConfig("mongosink", "mongodb://mongo", "$generation-mongodumpalt")
3437
mysqlSource("wpdb.wp_posts", mysqlConfig) {
3538
each { key, msg, _ ->

floodplain-integration/src/test/kotlin/io.floodplain.integration/TestTopicCreation.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ class TestTopicCreation {
5959
logger.info("${adminClient.describeTopics(listOf("mytopic")).all().get()["mytopic"]}")
6060
val cr = Collections.singleton(ConfigResource(TOPIC, "mytopic"))
6161
val configsResult = adminClient.describeConfigs(cr)
62-
// val cnf = configsResult.all().get().
63-
// logger.info("Config: ${cnf.toString()}")
62+
val cnf = configsResult.all().get()
63+
logger.info("Config: $cnf")
6464
val configMap: MutableMap<String, String> = HashMap()
6565
configMap["cleanup.policy"] = "compact"
6666

0 commit comments

Comments
 (0)