Skip to content

Commit

Permalink
Added transformation to filter records based on items in fields. Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jcustenborder authored Oct 18, 2018
1 parent 189cb8a commit 0f21d20
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 29 deletions.
44 changes: 15 additions & 29 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<groupId>com.github.jcustenborder.kafka.connect</groupId>
<artifactId>kafka-connect-parent</artifactId>
<version>1.1.0-cp3</version>
<version>2.0.0</version>
</parent>
<artifactId>kafka-connect-transform-common</artifactId>
<version>0.1.0-SNAPSHOT</version>
Expand Down Expand Up @@ -82,34 +82,20 @@
<plugin>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-maven-plugin</artifactId>
<version>0.9.0</version>
<executions>
<execution>
<goals>
<goal>kafka-connect</goal>
</goals>
<configuration>
<ownerUsername>jcustenborder</ownerUsername>
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
<documentationUrl>
https://jcustenborder.github.io/kafka-connect-documentation/
</documentationUrl>
<ownerName>Jeremy Custenborder</ownerName>
<dockerNamespace>jcustenborder</dockerNamespace>
<dockerName>kafka-connect-docker</dockerName>
<pluginTypes>
<pluginType>transform</pluginType>
</pluginTypes>
<tags>
<tag>Transformation</tag>
</tags>
<title>Common Transformations</title>
<supportUrl>${pom.issueManagement.url}</supportUrl>
<supportSummary>Support provided through community involvement.
</supportSummary>
</configuration>
</execution>
</executions>
<configuration>
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
<documentationUrl>https://jcustenborder.github.io/kafka-connect-documentation/</documentationUrl>
<componentTypes>
<componentType>transform</componentType>
</componentTypes>
<tags>
<tag>Transformation</tag>
</tags>
<title>Kafka Connect Common Transformations</title>
<supportUrl>${pom.issueManagement.url}</supportUrl>
<supportSummary>Support provided through community involvement.</supportSummary>
</configuration>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
* Copyright © 2017 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.transform.common;

import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip;
import com.github.jcustenborder.kafka.connect.utils.config.Title;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.regex.Matcher;

/**
 * Transformation that drops ({@code null}s out) any record whose configured field(s) contain a
 * string matching the configured regular expression. Subclasses {@link Key} and {@link Value}
 * select whether the record key or the record value is inspected.
 *
 * @param <R> record type handled by the connect runtime
 */
public abstract class PatternFilter<R extends ConnectRecord<R>> implements Transformation<R> {
  private static final Logger log = LoggerFactory.getLogger(PatternFilter.class);

  PatternFilterConfig config;

  @Override
  public ConfigDef config() {
    return PatternFilterConfig.config();
  }

  @Override
  public void configure(Map<String, ?> settings) {
    this.config = new PatternFilterConfig(settings);
  }

  @Override
  public void close() {
    // No resources to release.
  }

  /**
   * Checks the STRING-typed fields of {@code struct} that are named in the config.
   *
   * @param record record being processed
   * @param struct struct (key or value) to inspect
   * @return {@code null} to filter the record when a configured field matches the pattern,
   *     otherwise {@code record} unchanged
   */
  R filter(R record, Struct struct) {
    for (Field field : struct.schema().fields()) {
      if (!this.config.fields.contains(field.name())) {
        continue;
      }
      // Only STRING fields can be matched against the regex; other types are skipped.
      if (Schema.Type.STRING != field.schema().type()) {
        continue;
      }
      String input = struct.getString(field.name());
      if (null != input && this.config.pattern.matcher(input).matches()) {
        log.trace("filter() - field '{}' matched pattern. Filtering record.", field.name());
        return null;
      }
    }
    return record;
  }

  /**
   * Checks the String-valued entries of a schemaless {@code map} that are named in the config.
   *
   * @param record record being processed
   * @param map map (key or value) to inspect
   * @return {@code null} to filter the record when a configured entry matches the pattern,
   *     otherwise {@code record} unchanged
   */
  R filter(R record, Map<?, ?> map) {
    // Iterate entries rather than keySet()+get() to avoid a second lookup per key.
    for (Map.Entry<?, ?> entry : map.entrySet()) {
      if (!this.config.fields.contains(entry.getKey())) {
        continue;
      }
      if (entry.getValue() instanceof String) {
        String input = (String) entry.getValue();
        if (this.config.pattern.matcher(input).matches()) {
          log.trace("filter() - field '{}' matched pattern. Filtering record.", entry.getKey());
          return null;
        }
      }
    }
    return record;
  }

  /**
   * Dispatches on the shape of the record's key or value. Structs and maps are inspected;
   * anything else (including schemaless non-map data) passes through untouched.
   *
   * @param record record being processed
   * @param key {@code true} to inspect the record key, {@code false} for the value
   * @return the record, or {@code null} when it was filtered
   */
  R filter(R record, final boolean key) {
    final SchemaAndValue input = key ?
        new SchemaAndValue(record.keySchema(), record.key()) :
        new SchemaAndValue(record.valueSchema(), record.value());
    final R result;
    if (input.schema() != null) {
      if (Schema.Type.STRUCT == input.schema().type()) {
        result = filter(record, (Struct) input.value());
      } else if (Schema.Type.MAP == input.schema().type()) {
        result = filter(record, (Map) input.value());
      } else {
        result = record;
      }
    } else if (input.value() instanceof Map) {
      result = filter(record, (Map) input.value());
    } else {
      result = record;
    }

    return result;
  }

  @Title("PatternFilter(Key)")
  @Description("This transformation is used to filter records based on a regular expression.")
  @DocumentationTip("This transformation is used to filter records based on fields in the Key of the record.")
  public static class Key<R extends ConnectRecord<R>> extends PatternFilter<R> {
    @Override
    public R apply(R r) {
      return filter(r, true);
    }
  }

  @Title("PatternFilter(Value)")
  @Description("This transformation is used to filter records based on a regular expression.")
  @DocumentationTip("This transformation is used to filter records based on fields in the Value of the record.")
  public static class Value<R extends ConnectRecord<R>> extends PatternFilter<R> {
    @Override
    public R apply(R r) {
      return filter(r, false);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright © 2017 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.transform.common;

import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils;
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/**
 * Configuration for {@link PatternFilter}: a regular expression and the set of field names whose
 * String values are tested against it.
 */
public class PatternFilterConfig extends AbstractConfig {
  public static final String PATTERN_CONFIG = "pattern";
  public static final String PATTERN_DOC = "The regex to test the message with. ";

  public static final String FIELD_CONFIG = "fields";
  public static final String FIELD_DOC = "The fields to transform.";

  // Compiled pattern records are tested against.
  public final Pattern pattern;
  // Names of the fields to inspect; stored as a set for O(1) membership checks.
  public final Set<String> fields;

  public PatternFilterConfig(Map<String, ?> settings) {
    super(config(), settings);
    this.pattern = ConfigUtils.pattern(this, PATTERN_CONFIG);
    this.fields = new HashSet<>(getList(FIELD_CONFIG));
  }

  /**
   * Builds the ConfigDef describing the two settings this transformation accepts.
   *
   * @return the ConfigDef for this transformation
   */
  public static ConfigDef config() {
    ConfigDef definition = new ConfigDef();
    definition = definition.define(
        ConfigKeyBuilder.of(PATTERN_CONFIG, ConfigDef.Type.STRING)
            .documentation(PATTERN_DOC)
            .importance(ConfigDef.Importance.HIGH)
            .validator(Validators.pattern())
            .build()
    );
    definition = definition.define(
        ConfigKeyBuilder.of(FIELD_CONFIG, ConfigDef.Type.LIST)
            .documentation(FIELD_DOC)
            .defaultValue(Collections.emptyList())
            .importance(ConfigDef.Importance.HIGH)
            .build()
    );
    return definition;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.github.jcustenborder.kafka.connect.transform.common;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

/**
 * Tests for {@link PatternFilter.Value}: records whose "input" field equals "filter" must be
 * dropped, all others must pass through.
 */
public class PatternFilterTest {
  public PatternFilter.Value transform;

  @BeforeEach
  public void before() {
    this.transform = new PatternFilter.Value();
    this.transform.configure(
        ImmutableMap.of(
            PatternFilterConfig.FIELD_CONFIG, "input",
            PatternFilterConfig.PATTERN_CONFIG, "^filter$"
        )
    );
  }

  // Shared builder for test records; only the value schema and value vary between cases.
  SinkRecord record(Schema valueSchema, Object value) {
    return new SinkRecord("asdf", 1, null, null, valueSchema, value, 1234L);
  }

  // Schemaless (map) value record.
  SinkRecord map(String value) {
    return record(null, ImmutableMap.of("input", value));
  }

  // Schema'd (struct) value record with a single STRING field named "input".
  SinkRecord struct(String value) {
    Schema schema = SchemaBuilder.struct()
        .field("input", Schema.STRING_SCHEMA)
        .build();
    Struct struct = new Struct(schema).put("input", value);
    return record(schema, struct);
  }

  @Test
  public void filtered() {
    assertNull(this.transform.apply(struct("filter")));
    assertNull(this.transform.apply(map("filter")));
  }

  @Test
  public void notFiltered() {
    assertNotNull(this.transform.apply(struct("ok")));
    assertNotNull(this.transform.apply(map("ok")));
  }

}

0 comments on commit 0f21d20

Please sign in to comment.