
Commit 4c46b9a

Merge branch 'feat_1.10_4.2.x_kafkaSample2' into '1.10_test_4.2.x'

Feat 1.10 4.2.x kafka sample2. See merge request dt-insight-engine/flinkStreamSQL!289

2 parents b26a3f3 + be62b81

File tree

22 files changed: +855 -221 lines

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

+7 -1
@@ -36,7 +36,9 @@

 import java.io.IOException;
 import java.lang.reflect.Array;
+import java.sql.Clob;
 import java.sql.Date;
+import java.sql.NClob;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Iterator;

@@ -148,6 +150,10 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
             } else {
                 return node.asText();
             }
+        } else if (info.getTypeClass().equals(Clob.class)) {
+            return node.asText();
+        } else if (info.getTypeClass().equals(NClob.class)) {
+            return node.asText();
         } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
             return convertToDate(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {

@@ -225,7 +231,7 @@ private Row convertTopRow() {
             JsonNode node = getIgnoreCase(fieldNames[i]);
             AbstractTableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);

-            if (node == null) {
+            if (node == null || node instanceof NullNode) {
                 if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
                     throw new IllegalStateException("Failed to find field with name '"
                             + fieldNames[i] + "'.");
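
Why the NullNode check matters: Jackson maps an explicit JSON null to a NullNode instance rather than a Java null, so the old check (node == null) only caught fields that were absent from the message entirely. A minimal standalone sketch, not part of the patch (it uses vanilla Jackson, while the connector may use Flink's shaded Jackson classes):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;

public class NullNodeDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode root = mapper.readTree("{\"name\": null}");

        JsonNode name = root.get("name");      // explicit JSON null -> NullNode instance
        JsonNode missing = root.get("absent"); // absent field -> Java null

        // The old check only handled the absent-field case:
        System.out.println(name == null);                                   // false
        // The patched check treats explicit null and absent fields the same way:
        System.out.println(name == null || name instanceof NullNode);       // true
        System.out.println(missing == null || missing instanceof NullNode); // true
    }
}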

core/src/main/java/com/dtstack/flink/sql/util/ClassUtil.java

+6
@@ -22,7 +22,9 @@

 import java.lang.reflect.Array;
 import java.math.BigDecimal;
+import java.sql.Clob;
 import java.sql.Date;
+import java.sql.NClob;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.HashMap;

@@ -101,6 +103,10 @@ public static Class<?> stringConvertClass(String str) {
             case "decimal":
             case "decimalunsigned":
                 return BigDecimal.class;
+            case "clob":
+                return Clob.class;
+            case "nclob":
+                return NClob.class;
             default:
                 break;
         }
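
With the two new cases, the type names clob and nclob resolve to the JDBC LOB interfaces. A minimal sketch of the mapping (the call site is illustrative, not from the patch; it assumes ClassUtil is on the classpath):

import com.dtstack.flink.sql.util.ClassUtil;

public class LobMappingDemo {
    public static void main(String[] args) {
        // The two new switch cases map the SQL type names to java.sql interfaces.
        System.out.println(ClassUtil.stringConvertClass("clob"));  // interface java.sql.Clob
        System.out.println(ClassUtil.stringConvertClass("nclob")); // interface java.sql.NClob
    }
}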

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaConsumerFactory.java

+3 -50
@@ -20,15 +20,10 @@

 import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager;
 import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
-import com.dtstack.flink.sql.format.dtnest.DtNestRowDeserializationSchema;
-import com.dtstack.flink.sql.format.FormatType;
+import com.dtstack.flink.sql.source.kafka.deserialization.DeserializationSchemaFactory;
+import com.dtstack.flink.sql.source.kafka.deserialization.KafkaDeserializationMetricWrapper;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
-import org.apache.flink.formats.json.DTJsonRowDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.types.Row;

@@ -50,50 +45,8 @@ protected DeserializationMetricWrapper createDeserializationMetricWrapper(KafkaS
                                                                           TypeInformation<Row> typeInformation,
                                                                           Calculate calculate) {
         return new KafkaDeserializationMetricWrapper(typeInformation,
-                createDeserializationSchema(kafkaSourceTableInfo, typeInformation),
+                DeserializationSchemaFactory.createDeserializationSchema(kafkaSourceTableInfo, typeInformation),
                 calculate,
                 DirtyDataManager.newInstance(kafkaSourceTableInfo.getDirtyProperties()));
     }
-
-    private DeserializationSchema<Row> createDeserializationSchema(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation) {
-        DeserializationSchema<Row> deserializationSchema = null;
-        if (FormatType.DT_NEST.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
-            deserializationSchema = new DtNestRowDeserializationSchema(typeInformation, kafkaSourceTableInfo.getPhysicalFields(),
-                    kafkaSourceTableInfo.getFieldExtraInfoList(), kafkaSourceTableInfo.getCharsetName());
-        } else if (FormatType.JSON.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
-
-            if (StringUtils.isNotBlank(kafkaSourceTableInfo.getSchemaString())) {
-                deserializationSchema = new DTJsonRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
-            } else if (typeInformation != null && typeInformation.getArity() != 0) {
-                deserializationSchema = new DTJsonRowDeserializationSchema(typeInformation);
-            } else {
-                throw new IllegalArgumentException("sourceDataType:" + FormatType.JSON.name() + " must set schemaString(JSON Schema) or TypeInformation<Row>");
-            }
-
-        } else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
-
-            if (StringUtils.isBlank(kafkaSourceTableInfo.getFieldDelimiter())) {
-                throw new IllegalArgumentException("sourceDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
-            }
-
-            final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(typeInformation);
-            deserSchemaBuilder.setFieldDelimiter(kafkaSourceTableInfo.getFieldDelimiter().toCharArray()[0]);
-            deserializationSchema = deserSchemaBuilder.build();
-
-        } else if (FormatType.AVRO.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
-
-            if (StringUtils.isBlank(kafkaSourceTableInfo.getSchemaString())) {
-                throw new IllegalArgumentException("sourceDataType:" + FormatType.AVRO.name() + " must set schemaString");
-            }
-
-            deserializationSchema = new AvroRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
-        }
-
-        if (null == deserializationSchema) {
-            throw new UnsupportedOperationException("FormatType:" + kafkaSourceTableInfo.getSourceDataType());
-        }
-
-        return deserializationSchema;
-    }
-
 }

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/DeserializationSchemaFactory.java (new file)

+100

@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.source.kafka.deserialization;
+
+import com.dtstack.flink.sql.format.FormatType;
+import com.dtstack.flink.sql.format.dtnest.DtNestRowDeserializationSchema;
+import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
+import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
+import org.apache.flink.formats.json.DTJsonRowDeserializationSchema;
+import org.apache.flink.types.Row;
+
+/**
+ * Date: 2021/05/25 Company: www.dtstack.com
+ *
+ * @author tiezhu
+ */
+public class DeserializationSchemaFactory {
+
+    public static DeserializationSchema<Row> createDeserializationSchema(
+            KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation) {
+        DeserializationSchema<Row> deserializationSchema = null;
+        if (FormatType.DT_NEST.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
+            deserializationSchema =
+                    new DtNestRowDeserializationSchema(
+                            typeInformation,
+                            kafkaSourceTableInfo.getPhysicalFields(),
+                            kafkaSourceTableInfo.getFieldExtraInfoList(),
+                            kafkaSourceTableInfo.getCharsetName());
+        } else if (FormatType.JSON
+                .name()
+                .equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
+
+            if (StringUtils.isNotBlank(kafkaSourceTableInfo.getSchemaString())) {
+                deserializationSchema =
+                        new DTJsonRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
+            } else if (typeInformation != null && typeInformation.getArity() != 0) {
+                deserializationSchema = new DTJsonRowDeserializationSchema(typeInformation);
+            } else {
+                throw new IllegalArgumentException(
+                        "sourceDataType:"
+                                + FormatType.JSON.name()
+                                + " must set schemaString(JSON Schema) or TypeInformation<Row>");
+            }
+
+        } else if (FormatType.CSV
+                .name()
+                .equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
+
+            if (StringUtils.isBlank(kafkaSourceTableInfo.getFieldDelimiter())) {
+                throw new IllegalArgumentException(
+                        "sourceDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
+            }
+
+            final CsvRowDeserializationSchema.Builder deserSchemaBuilder =
+                    new CsvRowDeserializationSchema.Builder(typeInformation);
+            deserSchemaBuilder.setFieldDelimiter(
+                    kafkaSourceTableInfo.getFieldDelimiter().toCharArray()[0]);
+            deserializationSchema = deserSchemaBuilder.build();
+
+        } else if (FormatType.AVRO
+                .name()
+                .equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
+
+            if (StringUtils.isBlank(kafkaSourceTableInfo.getSchemaString())) {
+                throw new IllegalArgumentException(
+                        "sourceDataType:" + FormatType.AVRO.name() + " must set schemaString");
+            }
+
+            deserializationSchema =
+                    new AvroRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
+        }
+
+        if (null == deserializationSchema) {
+            throw new UnsupportedOperationException(
+                    "FormatType:" + kafkaSourceTableInfo.getSourceDataType());
+        }
+
+        return deserializationSchema;
+    }
+}
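
A usage sketch for the new factory. The setters on KafkaSourceTableInfo are assumptions made for illustration; they are not shown in this commit and are presumed to mirror the getters the factory calls:

import com.dtstack.flink.sql.source.kafka.deserialization.DeserializationSchemaFactory;
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

public class FactoryUsageSketch {
    public static void main(String[] args) {
        KafkaSourceTableInfo tableInfo = new KafkaSourceTableInfo();
        tableInfo.setSourceDataType("csv"); // assumed setter, counterpart of getSourceDataType()
        tableInfo.setFieldDelimiter(",");   // assumed setter, counterpart of getFieldDelimiter()

        TypeInformation<Row> rowType = Types.ROW(Types.STRING, Types.INT);
        DeserializationSchema<Row> schema =
                DeserializationSchemaFactory.createDeserializationSchema(tableInfo, rowType);
        // Any unrecognized sourceDataType falls through to UnsupportedOperationException.
    }
}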

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/DtKafkaDeserializationSchemaWrapper.java (moved from com/dtstack/flink/sql/source/kafka)

+24 -8
@@ -16,14 +16,16 @@
  * limitations under the License.
  */

-package com.dtstack.flink.sql.source.kafka;
+package com.dtstack.flink.sql.source.kafka.deserialization;

+import com.dtstack.flink.sql.source.kafka.sample.OffsetMap;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.kafka.clients.consumer.ConsumerRecord;

 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

@@ -34,15 +36,28 @@
  */
 public class DtKafkaDeserializationSchemaWrapper<T> extends KafkaDeserializationSchemaWrapper<T> {

-    private final Map<KafkaTopicPartition, Long> specificEndOffsets;
-
     private final List<Integer> endPartition = new ArrayList<>();

-    public DtKafkaDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema,
-                                               Map<KafkaTopicPartition, Long> specificEndOffsets) {
+    private Map<KafkaTopicPartition, Long> specificEndOffsets;

+    public DtKafkaDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
         super(deserializationSchema);
-        this.specificEndOffsets = specificEndOffsets;
+    }
+
+    public void setSpecificEndOffsets(OffsetMap offsetMap) {
+        Map<KafkaTopicPartition, Long> latest = offsetMap.getLatest();
+        Map<KafkaTopicPartition, Long> earliest = offsetMap.getEarliest();
+
+        this.specificEndOffsets = new HashMap<>(latest);
+
+        // Remove partitions that hold no data, so the job does not wait on them forever
+        latest.keySet().forEach(
+                partition -> {
+                    if (latest.get(partition).equals(earliest.get(partition))) {
+                        specificEndOffsets.remove(partition);
+                    }
+                }
+        );
     }

     @Override
@@ -53,9 +68,9 @@ public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
         }
         if (specificEndOffsets != null) {
             Long endOffset = specificEndOffsets.get(topicPartition);
-            if (endOffset != null && record.offset() >= endOffset) {
+            if (endOffset != null && record.offset() >= endOffset - 1) {
                 endPartition.add(record.partition());
-                return null;
+                return super.deserialize(record);
             }
         }

@@ -65,6 +80,7 @@ public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
     public boolean isEndOfStream(T nextElement) {
         boolean isEnd =
                 specificEndOffsets != null
+                        && !specificEndOffsets.isEmpty()
                         && endPartition.size() == specificEndOffsets.size();
         return super.isEndOfStream(nextElement) || isEnd;
     }
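
Two behaviour changes here are worth spelling out. First, setSpecificEndOffsets() drops partitions whose latest offset equals their earliest: for example, with latest = {p0: 5, p1: 3} and earliest = {p0: 0, p1: 3}, p1 holds no data and is removed, so isEndOfStream() only waits on p0. Second, the stop condition in deserialize() shifts by one record: Kafka's end offset is the offset of the next record to be produced, so the last available record sits at endOffset - 1, and it is now deserialized and emitted before its partition is marked finished, where the old code dropped it by returning null. A small standalone sketch of the boundary (class and helper names are hypothetical, not from the patch):

public class EndOffsetBoundaryDemo {

    // Mirrors the patched check: with endOffset = N, offsets 0..N-1 exist,
    // and the record at N-1 is the final one to deserialize.
    static boolean isFinalRecord(long recordOffset, Long endOffset) {
        return endOffset != null && recordOffset >= endOffset - 1;
    }

    public static void main(String[] args) {
        long endOffset = 5L; // five records exist, at offsets 0..4
        System.out.println(isFinalRecord(3L, endOffset)); // false: keep consuming
        System.out.println(isFinalRecord(4L, endOffset)); // true: emit, then mark partition done
    }
}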

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/KafkaDeserializationMetricWrapper.java (moved from com/dtstack/flink/sql/source/kafka)

+7 -6
@@ -16,10 +16,11 @@
  * limitations under the License.
  */

-package com.dtstack.flink.sql.source.kafka;
+package com.dtstack.flink.sql.source.kafka.deserialization;

 import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager;
 import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
+import com.dtstack.flink.sql.source.kafka.Calculate;
 import com.dtstack.flink.sql.util.ReflectionUtils;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;

@@ -54,13 +55,13 @@ public class KafkaDeserializationMetricWrapper extends DeserializationMetricWrap

     private static final Logger LOG = LoggerFactory.getLogger(KafkaDeserializationMetricWrapper.class);

-    private Calculate calculate;
+    private final Calculate calculate;

     public KafkaDeserializationMetricWrapper(
-            TypeInformation<Row> typeInfo
-            , DeserializationSchema<Row> deserializationSchema
-            , Calculate calculate
-            , DirtyDataManager dirtyDataManager) {
+            TypeInformation<Row> typeInfo,
+            DeserializationSchema<Row> deserializationSchema,
+            Calculate calculate,
+            DirtyDataManager dirtyDataManager) {
         super(typeInfo, deserializationSchema, dirtyDataManager);
         this.calculate = calculate;
     }
