Skip to content

Commit 2186426

Browse files
committed
remove unused code, remove usage of deprecated API's
1 parent 33f36b1 commit 2186426

File tree

12 files changed

+6
-307
lines changed

12 files changed

+6
-307
lines changed

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/Floodplain.kt

-3
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,9 @@ import io.floodplain.streams.api.Topic
4444
import io.floodplain.streams.api.TopologyContext
4545
import io.floodplain.streams.remotejoin.TopologyConstructor
4646
import org.apache.kafka.connect.sink.SinkConnector
47-
import org.apache.kafka.connect.sink.SinkRecord
4847
import org.apache.kafka.connect.sink.SinkTask
4948
import java.time.Duration
5049
import java.util.Optional
51-
import java.util.concurrent.atomic.AtomicLong
52-
import kotlin.system.measureTimeMillis
5350

5451
private val logger = mu.KotlinLogging.logger {}
5552

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/LocalRuntime.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ class LocalDriverContext(
175175

176176
val connectJobs = mutableListOf<Job>()
177177
private val inputTopics = mutableMapOf<String, TestInputTopic<String, ReplicationMessage>>()
178-
// private val rawInputTopics = mutableMapOf<String, TestInputTopic<String, ByteArray>>()
179178

180179
private val outputTopics = mutableMapOf<String, TestOutputTopic<String, ReplicationMessage>>()
181180

@@ -277,8 +276,8 @@ class LocalDriverContext(
277276
val topics = topics()
278277
val deserializer = JsonDeserializer() // TODO protobuf issue, topic is not in json connect
279278
val mapper = ObjectMapper()
280-
mapper.findAndRegisterModules()
281-
val fallback = FallbackReplicationMessageParser()
279+
mapper.findAndRegisterModules() // use shared mapping object
280+
val fallback = FallbackReplicationMessageParser() // use shared parser
282281
val sourceFlow = outputFlowSingle()
283282
.map { (topic, key, value) ->
284283
val parsed = if (value == null) null else fallback.parseBytes(Optional.ofNullable(topic.qualifiedString()), value).valueMap(

floodplain-dsl/src/main/kotlin/io/floodplain/kotlindsl/Stream.kt

+2-25
Original file line numberDiff line numberDiff line change
@@ -304,40 +304,17 @@ class Stream(override val topologyContext: TopologyContext, val topologyConstruc
304304
connectorClientConfigOverridePolicy
305305
)
306306

307-
val statusBackingStore: StatusBackingStore = KafkaStatusBackingStore(time, worker.getInternalValueConverter())
307+
val statusBackingStore: StatusBackingStore = KafkaStatusBackingStore(time, worker.internalValueConverter)
308308
statusBackingStore.configure(config)
309309

310-
val configBackingStore = KafkaConfigBackingStore(worker.getInternalValueConverter(), config, worker.configTransformer())
310+
val configBackingStore = KafkaConfigBackingStore(worker.internalValueConverter, config, worker.configTransformer())
311311
val herder = DistributedHerder(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, advertisedUrl.toString(), connectorClientConfigOverridePolicy)
312312

313-
// val herder: Herder = StandaloneHerder(worker, kafkaClusterId, connectorClientConfigOverridePolicy)
314313
val connect = Connect(herder, rest)
315314

316315
connect.start()
317316
logger.info("Connect started!!")
318317
return herder
319-
// try {
320-
// // for (connectorPropsFile in Arrays.copyOfRange(args, 1, args.length)) {
321-
// // val connectorProps: Map<String, String> = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile))
322-
// // val cb: FutureCallback<Herder.Created<ConnectorInfo>> =
323-
// // FutureCallback(object : Callback<Herder.Created<ConnectorInfo?>?>() {
324-
// // fun onCompletion(error: Throwable?, info: Herder.Created<ConnectorInfo>) {
325-
// // if (error != null) log.error(
326-
// // "Failed to create job for {}",
327-
// // connectorPropsFile
328-
// // ) else log.info("Created connector {}", info.result().name())
329-
// // }
330-
// // })
331-
// // herder.putConnectorConfig(
332-
// // connectorProps[ConnectorConfig.NAME_CONFIG],
333-
// // connectorProps, false, cb
334-
// // )
335-
// // cb.get()
336-
// // }
337-
// } catch (t: Throwable) {
338-
// logger.error("Stopping after connector error", t)
339-
// connect.stop()
340-
// }
341318
}
342319

343320
/**

floodplain-elasticsearch/src/main/kotlin/io/floodplain/elasticsearch/ElasticSearchSink.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ fun PartialStream.elasticSearchSink(sinkName: String, topicName: String, config:
6868
"tasks.max" to "1",
6969
"type.name" to "_doc",
7070
"value.converter" to "org.apache.kafka.connect.json.JsonConverter",
71-
"key.converter" to "org.apache.kafka.connect.json.JsonConverter", // maps not supported by elasticsearch
71+
"key.converter" to "org.apache.kafka.connect.storage.StringConverter", // maps not supported by elasticsearch
7272
"topics" to topic.qualifiedString(),
7373
"schema.ignore" to "true",
7474
"behavior.on.null.values" to "delete",

floodplain-example/src/main/kotlin/io/floodplain/kotlindsl/example/Dispensation.kt

-71
This file was deleted.

floodplain-stream-topology/src/main/java/io/floodplain/reactive/source/topology/FunctionProcessor.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.floodplain.immutable.factory.ImmutableFactory;
2323
import io.floodplain.replication.api.ReplicationMessage;
2424
import io.floodplain.replication.factory.ReplicationFactory;
25-
import org.apache.kafka.streams.processor.AbstractProcessor;
2625
import org.apache.kafka.streams.processor.api.Processor;
2726
import org.apache.kafka.streams.processor.api.ProcessorContext;
2827
import org.apache.kafka.streams.processor.api.Record;
@@ -50,7 +49,7 @@ public void process(Record<String, ReplicationMessage> record) {
5049
}
5150
if(record.value().operation()!= ReplicationMessage.Operation.DELETE) {
5251
ImmutableMessage applied = function.apply(record.key(), record.value().message(), record.value().paramMessage().orElse(ImmutableFactory.empty()));
53-
context.forward(new Record<String, ReplicationMessage>(record.key(), ReplicationFactory.standardMessage(applied), record.timestamp())); //.withParamMessage(value.paramMessage()); //.orElse(ImmutableFactory.empty())).withOperation(operation));
52+
context.forward(new Record<>(record.key(), ReplicationFactory.standardMessage(applied), record.timestamp())); //.withParamMessage(value.paramMessage()); //.orElse(ImmutableFactory.empty())).withOperation(operation));
5453

5554
} else {
5655
// Skip processing, just forward message if it is a delete

floodplain-test/src/main/kotlin/io/floodplain/test/InstantiatedContainer.kt

-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ class InstantiatedKafkaContainer(customizer: ((KafkaContainer) -> KafkaContainer
5757
.apply { withStartupTimeout(Duration.ofMinutes(5)) }
5858
.apply { withStartupAttempts(50) }
5959
.apply { customizer?.invoke(this) }
60-
// .apply { withEnv(env) }
6160
var host: String
6261
var exposedPort: Int = -1
6362
init {

immutable-api/src/main/java/io/floodplain/immutable/api/ImmutableMessage.java

-2
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@ enum ValueType {
7171

7272
ImmutableMessage merge(ImmutableMessage other, Optional<List<String>> only);
7373

74-
String toFlatString(ImmutableMessageParser parser);
75-
7674
Optional<List<ImmutableMessage>> subMessages(String field);
7775

7876
Optional<ImmutableMessage> subMessage(String field);

immutable-impl/src/main/java/io/floodplain/immutable/impl/ImmutableMessageImpl.java

-29
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,10 @@
1919
package io.floodplain.immutable.impl;
2020

2121
import io.floodplain.immutable.api.ImmutableMessage;
22-
import io.floodplain.immutable.api.ImmutableMessageParser;
2322
import io.floodplain.immutable.factory.ImmutableFactory;
2423
import org.slf4j.Logger;
2524
import org.slf4j.LoggerFactory;
2625

27-
import java.time.LocalDate;
28-
import java.time.LocalDateTime;
29-
import java.time.LocalTime;
30-
import java.time.ZoneOffset;
3126
import java.util.*;
3227
import java.util.Map.Entry;
3328
import java.util.function.Function;
@@ -109,19 +104,6 @@ public Map<String, Object> valueMap(boolean ignoreNull, Set<String> ignore, List
109104
return Collections.unmodifiableMap(result);
110105
}
111106

112-
// private Object postProcessValue(Object value) {
113-
// if(value instanceof LocalDate) {
114-
// return ((LocalDate)value).toEpochDay();
115-
// }
116-
// if(value instanceof LocalDateTime) {
117-
// return ((LocalDateTime)value).toInstant(ZoneOffset.UTC).toEpochMilli();
118-
// }
119-
// if (value instanceof LocalTime) {
120-
// return ((LocalTime)value).toSecondOfDay();
121-
// }
122-
// return value;
123-
// }
124-
125107
@Override
126108
public Set<String> columnNames() {
127109
return this.values.keySet();
@@ -250,17 +232,6 @@ public ImmutableMessage with(String key, Object value, ValueType type) {
250232
}
251233
}
252234

253-
254-
@Override
255-
public String toFlatString(ImmutableMessageParser parser) {
256-
if (parser == null) {
257-
logger.info("Can not flatten parser, no parser present");
258-
return "";
259-
} else {
260-
return parser.describe(this);
261-
}
262-
}
263-
264235
@Override
265236
public ImmutableMessage withoutSubMessages(String field) {
266237
Map<String, List<ImmutableMessage>> res = new HashMap<>(this.subMessagesMap);

kafka-converter/src/test/java/io/floodplain/kafka/converter/TestConverter.java

-2
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ public class TestConverter {
3636

3737
@Before
3838
public void setup() {
39-
// ReplicationFactory.setInstance(new FallbackReplicationMessageParser());
40-
4139
}
4240

4341
@Test

0 commit comments

Comments
 (0)