Skip to content

Commit

Permalink
fix flink lookup bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Liuwenli11 authored and dlt1111 committed Jun 26, 2019
1 parent fb15579 commit 8f1e842
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ object LookupHelper extends java.io.Serializable {
val sourceTableFields: Array[String] = if (swiftsSql.sourceTableFields.isDefined) swiftsSql.sourceTableFields.get else null
val lookupTableFields = if (swiftsSql.lookupTableFields.isDefined) swiftsSql.lookupTableFields.get else null
val sql = swiftsSql.sql
val joinFieldsValueArray: Array[Any] = joinFieldsInRow(row, lookupTableFields, sourceTableFields, preSchemaMap)
val joinFieldsValueArray: Array[Any] = joinFieldsInRow(row, lookupTableFields, sourceTableFields, preSchemaMap, true)
UmsDataSystem.dataSystem(dataSystem) match {
case UmsDataSystem.CASSANDRA => getCassandraSql(joinFieldsValueArray, sql, lookupTableFields)
case _ => getRmdbSql(joinFieldsValueArray, sql, lookupTableFields)
Expand All @@ -144,11 +144,12 @@ object LookupHelper extends java.io.Serializable {
def joinFieldsInRow(row: Row,
lookupTableFields: Array[String],
sourceTableFields: Array[String],
preSchemaMap: Map[String, (TypeInformation[_], Int)]): Array[Any] = {
preSchemaMap: Map[String, (TypeInformation[_], Int)],
exeSql: Boolean): Array[Any] = {
val fieldContent = sourceTableFields.map(fieldName => {
var value = FlinkSchemaUtils.object2TrueValue(preSchemaMap(fieldName.trim)._1, row.getField(preSchemaMap(fieldName.trim)._2))
value =if (value != null) value else "N/A"
if (preSchemaMap(fieldName)._1 == Types.STRING || preSchemaMap(fieldName)._1 == Types.SQL_TIMESTAMP || preSchemaMap(fieldName)._1 == Types.SQL_DATE)
if (exeSql && (preSchemaMap(fieldName)._1 == Types.STRING || preSchemaMap(fieldName)._1 == Types.SQL_TIMESTAMP || preSchemaMap(fieldName)._1 == Types.SQL_DATE))
"'" + value + "'"
else value
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class LookupProcessElement(swiftsSql: SwiftsSql,
val joinFields = UmsDataSystem.dataSystem(lookupNamespace.split("\\.")(0).toLowerCase()) match {
case UmsDataSystem.HBASE => LookupHbaseHelper.joinFieldsInRow(value, lookupTableFields, sourceTableFields, preSchemaMap).mkString("_")
case UmsDataSystem.REDIS => LookupRedisHelper.joinFieldsInRow(value, swiftsSql, preSchemaMap)
case _ => LookupHelper.joinFieldsInRow(value, lookupTableFields, sourceTableFields, preSchemaMap).mkString("_")
case _ => LookupHelper.joinFieldsInRow(value, lookupTableFields, sourceTableFields, preSchemaMap, false).mkString("_")
}

if (lookupDataMap == null || !lookupDataMap.contains(joinFields)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ object LookupKuduHelper extends java.io.Serializable {
//value
val sourceTableFields: Array[String] = if (swiftsSql.sourceTableFields.isDefined) swiftsSql.sourceTableFields.get else null
val lookupTableFields = if (swiftsSql.lookupTableFields.isDefined) swiftsSql.lookupTableFields.get else null
val joinFieldsValueArray: Array[Any] = LookupHelper.joinFieldsInRow(row, lookupTableFields, sourceTableFields, preSchemaMap)
val joinFieldsValueArray: Array[Any] = LookupHelper.joinFieldsInRow(row, lookupTableFields, sourceTableFields, preSchemaMap, false)
val joinFieldsValueString: Array[String] = joinFieldsValueArray.map(value => value.toString)
val tableSchemaInKudu = KuduConnection.getAllFieldsKuduTypeMap(table)
val queryResult: (String, Map[String, (Any, String)]) = KuduConnection.doQueryByKey(lookupTableFields, joinFieldsValueString.toList, tableSchemaInKudu, client, table, selectFields)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,12 @@ object BatchflowDirective extends Directive {
val kerberos = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "kerberos").toString.toBoolean
val tmpPriorityIdStr = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "priority_id")
val priorityId = if (tmpPriorityIdStr == null) directiveId else tmpPriorityIdStr.toString.toLong
val sourceIncrementTopicList = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "source_increment_topic").toString.split(",").toList
//val sourceIncrementTopicList = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "source_increment_topic").toString.split(",").toList
val sourceIncrementTopic = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "source_increment_topic")

val sourceIncrementTopicList =
if(null != sourceIncrementTopic) sourceIncrementTopic.toString.split(",").toList
else null

val flowDirectiveConfig = FlowDirectiveConfig(sourceNamespace, fullSinkNamespace, streamId, flowId, directiveId, swiftsStr, sinksStr, consumptionDataStr, dataType, dataParseStr, kerberos, priorityId, sourceIncrementTopicList)

Expand Down

0 comments on commit 8f1e842

Please sign in to comment.