Skip to content

Commit 1b9b9fc

Browse files
committed
Add keytransformer (+test), upgrade some deps
1 parent 2f3f47b commit 1b9b9fc

File tree

11 files changed

+156
-11
lines changed

11 files changed

+156
-11
lines changed

build.gradle.kts

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ buildscript {
1717
}
1818
plugins {
1919
id("eclipse")
20-
id("org.jetbrains.kotlin.jvm") version "1.4.20"
21-
id("org.jetbrains.kotlin.plugin.allopen") version "1.4.20"
20+
id("org.jetbrains.kotlin.jvm") version "1.4.31"
21+
id("org.jetbrains.kotlin.plugin.allopen") version "1.4.31"
2222
id("com.github.johnrengelman.shadow") version "5.2.0"
2323
id("org.jlleitschuh.gradle.ktlint") version "9.4.1"
2424
id("org.jetbrains.dokka") version "0.10.1"

buildSrc/build.gradle.kts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
plugins {
22
`kotlin-dsl`
3-
id("org.jetbrains.kotlin.jvm").version("1.4.20")
3+
id("org.jetbrains.kotlin.jvm").version("1.4.31")
44
id("org.jetbrains.dokka").version("1.4.10.2")
55
id("io.gitlab.arturbosch.detekt").version ("1.14.1")
66

buildSrc/src/main/kotlin/Dependencies.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ fun isReleaseVersion(): Boolean {
1616
}
1717

1818
object FloodplainDeps {
19-
const val floodplain_version = "1.4.6-SNAPSHOT"
19+
const val floodplain_version = "1.4.7"
2020
const val jackson_version = "2.12.2"
2121
const val kafka_version = "2.7.0"
2222
const val slf4j_version = "1.7.30"

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/Floodplain.kt

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import io.floodplain.reactive.source.topology.FilterTransformer
3333
import io.floodplain.reactive.source.topology.GroupTransformer
3434
import io.floodplain.reactive.source.topology.JoinRemoteTransformer
3535
import io.floodplain.reactive.source.topology.JoinWithTransformer
36+
import io.floodplain.reactive.source.topology.KeyTransformer
3637
import io.floodplain.reactive.source.topology.ScanTransformer
3738
import io.floodplain.reactive.source.topology.SetTransformer
3839
import io.floodplain.reactive.source.topology.SinkTransformer
@@ -184,6 +185,10 @@ fun PartialStream.set(transform: (String, IMessage, IMessage) -> IMessage): Tran
184185
return addTransformer(Transformer(this.rootTopology, set, topologyContext))
185186
}
186187

188+
/**
 * Adds a key-transformation stage to this stream: every message is forwarded
 * unchanged, but its key is rewritten by [transform].
 *
 * @param transform pure mapping from the incoming key to the outgoing key.
 * @return the [Transformer] wrapping the underlying [KeyTransformer] stage.
 */
fun PartialStream.keyTransform(transform: (String) -> String): Transformer =
    addTransformer(Transformer(rootTopology, KeyTransformer(transform), topologyContext))
191+
187192
// fun PartialPipe.copy()
188193
/**
189194
* Join the current source with another source, that has a different key.

floodplain-dsl/src/test/kotlin/io/floodplain/kotlindsl/TestTopology.kt

+18
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import io.floodplain.streams.debezium.JSONToReplicationMessage
2828
import io.floodplain.streams.remotejoin.StoreStateProcessor
2929
import kotlinx.coroutines.delay
3030
import org.apache.kafka.streams.state.KeyValueStore
31+
import org.junit.Assert
3132
import java.math.BigDecimal
3233
import java.time.Duration
3334
import java.util.Optional
@@ -579,6 +580,23 @@ class TestTopology {
579580
}
580581

581582
@Test
fun testKeyTransformer() {
    stream {
        from("source") {
            // Prefix every key with "mon": "key" -> "monkey", "soon" -> "monsoon".
            keyTransform { key -> "mon${key}" }
            sinkQualified("output")
        }
    }.renderAndExecute {
        // An inserted record must come out with the transformed key.
        inputQualified("source", "key", empty().set("value", "value1"))
        val (transformedKey, _) = outputQualified("output")
        Assert.assertEquals("monkey", transformedKey)
        // A delete (tombstone) must be re-keyed the same way.
        deleteQualified("source", "soon")
        val deletedKey = deletedQualified("output")
        Assert.assertEquals("monsoon", deletedKey)
    }
}
598+
599+
@Test
582600
fun testDiff() {
583601
stream {
584602
source("@source") {

floodplain-integration/src/test/kotlin/io.floodplain.integration/FilmToMongoIntegratedSink.kt

-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import kotlin.test.assertEquals
3838

3939
private val logger = mu.KotlinLogging.logger {}
4040

41-
@kotlinx.coroutines.ExperimentalCoroutinesApi
4241
class FilmToMongoIntegratedSink {
4342

4443
private val containerNetwork = Network.newNetwork()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package io.floodplain.reactive.source.topology;
20+
21+
import io.floodplain.reactive.source.topology.api.TopologyPipeComponent;
22+
import io.floodplain.streams.api.TopologyContext;
23+
import io.floodplain.streams.remotejoin.KeyProcessor;
24+
import io.floodplain.streams.remotejoin.ReplicationTopologyParser;
25+
import io.floodplain.streams.remotejoin.TopologyConstructor;
26+
import org.apache.kafka.streams.Topology;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.util.Stack;
31+
import java.util.function.Function;
32+
33+
public class KeyTransformer implements TopologyPipeComponent {
34+
35+
private boolean materialize;
36+
37+
// private final boolean fromEmpty;
38+
private final Function<String,String> keyTransformer;
39+
40+
41+
private final static Logger logger = LoggerFactory.getLogger(KeyTransformer.class);
42+
public KeyTransformer(Function<String, String> keyTransformer) {
43+
this.keyTransformer = keyTransformer;
44+
}
45+
46+
47+
@Override
48+
public void addToTopology(Stack<String> transformerNames, int currentPipeId, Topology topology,
49+
TopologyContext topologyContext, TopologyConstructor topologyConstructor) {
50+
KeyProcessor fp = new KeyProcessor(this.keyTransformer);
51+
String name = topologyContext.qualifiedName("keyTransform", transformerNames.size(), currentPipeId);
52+
logger.info("Adding processor: {} to parent: {} hash: {}", name, transformerNames, transformerNames.hashCode());
53+
if (this.materialize) {
54+
topology.addProcessor(name + "_prematerialize", () -> fp, transformerNames.peek());
55+
ReplicationTopologyParser.addMaterializeStore(topology, topologyContext, topologyConstructor, name, name + "_prematerialize");
56+
} else {
57+
topology.addProcessor(name, () -> fp, transformerNames.peek());
58+
}
59+
transformerNames.push(name);
60+
}
61+
62+
@Override
63+
public boolean materializeParent() {
64+
return false;
65+
}
66+
67+
@Override
68+
public void setMaterialize() {
69+
this.materialize = true;
70+
}
71+
72+
73+
}

floodplain-test/src/main/kotlin/io/floodplain/test/InstantiatedContainer.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class InstantiatedContainer(image: String, port: Int, env: Map<String, String> =
5252
// KafkaContainer("5.5.3").withEmbeddedZookeeper().withExposedPorts(9092,9093)
5353
class InstantiatedKafkaContainer(customizer: ((KafkaContainer) -> KafkaContainer)? = null) {
5454
// class KGenericContainer(imageName: String) : GenericContainer<KGenericContainer>(DockerImageName.parse(imageName))
55-
val container = KafkaContainer(DockerImageName("confluentinc/cp-kafka:6.0.2"))
55+
val container = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.0.2"))
5656
.apply { withExposedPorts(9092, 9093) }
5757
.apply { withStartupTimeout(Duration.ofMinutes(5)) }
5858
.apply { withStartupAttempts(50) }
@@ -61,11 +61,11 @@ class InstantiatedKafkaContainer(customizer: ((KafkaContainer) -> KafkaContainer
6161
var host: String
6262
var exposedPort: Int = -1
6363
init {
64-
container?.start()
65-
host = container?.host ?: "localhost"
66-
exposedPort = container?.getMappedPort(9093) ?: -1
64+
container.start()
65+
host = container.host ?: "localhost"
66+
exposedPort = container.getMappedPort(9093) ?: -1
6767
}
6868
fun close() {
69-
container?.close()
69+
container.close()
7070
}
7171
}

gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
#Gradle properties
22
systemProp.org.gradle.internal.publish.checksums.insecure=true
3-
kotlinVersion=1.4.20
3+
kotlinVersion=1.4.31
44
org.gradle.jvmargs=-Xmx1024m
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package io.floodplain.streams.remotejoin;
20+
21+
import io.floodplain.replication.api.ReplicationMessage;
22+
import org.apache.kafka.streams.processor.AbstractProcessor;
23+
24+
import java.util.function.Function;
25+
26+
/**
27+
*
28+
*/
29+
public class KeyProcessor extends AbstractProcessor<String, ReplicationMessage> {
30+
31+
private final Function<String, String> keyTransform;
32+
33+
public KeyProcessor(Function<String,String> keyTransform) {
34+
this.keyTransform = keyTransform;
35+
}
36+
@Override
37+
public void process(String key, ReplicationMessage value) {
38+
super.context().forward(keyTransform.apply(key), value);
39+
}
40+
41+
}

streams/src/main/java/io/floodplain/streams/remotejoin/ReplicationTopologyParser.java

+9
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,15 @@ public static void materializeStateStores(TopologyConstructor topologyConstructo
9797
}
9898
}
9999

100+
    /**
     * Placeholder for wiring a key-transforming processor into a topology.
     *
     * NOTE(review): this is currently a no-op stub — every parameter is unused and
     * the body is entirely commented out (copied from the diff-processor wiring).
     * The KeyTransformer component added in this commit wires KeyProcessor into the
     * topology directly, so this helper appears to be dead code; confirm whether it
     * should be implemented or removed.
     */
    public static void addKeyTransformProcessor(Topology current, TopologyContext topologyContext,
                                                TopologyConstructor topologyConstructor, String fromProcessor,
                                                Function<String,String> keyTransformer) {
        // TODO I think the topologyContext should be used.
        // current.addProcessor(diffProcessorNamePrefix, () -> new DiffProcessor(diffProcessorNamePrefix), fromProcessor);
        // addStateStoreMapping(topologyConstructor.processorStateStoreMapper, diffProcessorNamePrefix, diffProcessorNamePrefix);
        // logger.info("Granting access for processor: {} to store: {}", diffProcessorNamePrefix, diffProcessorNamePrefix);
        // topologyConstructor.stateStoreSupplier.put(diffProcessorNamePrefix, createMessageStoreSupplier(diffProcessorNamePrefix, true));
    }
100109

101110
// I think we need to use the topologyContext
102111
public static void addDiffProcessor(Topology current, TopologyContext topologyContext,

0 commit comments

Comments (0)