diff --git a/flinkx/src/main/scala/edp/wormhole/flinkx/common/FlinkxUtils.scala b/flinkx/src/main/scala/edp/wormhole/flinkx/common/FlinkxUtils.scala index 8eb373257..5b39ea4be 100644 --- a/flinkx/src/main/scala/edp/wormhole/flinkx/common/FlinkxUtils.scala +++ b/flinkx/src/main/scala/edp/wormhole/flinkx/common/FlinkxUtils.scala @@ -54,10 +54,12 @@ object FlinkxUtils { streamId: Long, errorPattern: String): String = { val ts: String = null + val errorMaxLength = 2000 val errorMsg = if(error!=null){ val first = if(error.getStackTrace!=null&&error.getStackTrace.nonEmpty) error.getStackTrace.head.toString else "" - error.toString + "\n" + first + val errorAll = error.toString + "\n" + first + errorAll.substring(0, math.min(errorMaxLength, errorAll.length)) } else null UmsProtocolUtils.feedbackFlowError(sourceNamespace, streamId, DateUtils.currentDateTime, sinkNamespace, UmsWatermark(ts), diff --git a/sparkx/src/main/scala/edp/wormhole/sparkx/common/SparkxUtils.scala b/sparkx/src/main/scala/edp/wormhole/sparkx/common/SparkxUtils.scala index 0db9d5994..e01bf850f 100644 --- a/sparkx/src/main/scala/edp/wormhole/sparkx/common/SparkxUtils.scala +++ b/sparkx/src/main/scala/edp/wormhole/sparkx/common/SparkxUtils.scala @@ -52,6 +52,7 @@ object SparkxUtils extends EdpLogging{ errorPattern:String): Unit ={ val ts: String = null + val errorMaxLength = 2000 // val tmpJsonArray = new JSONArray() // val sourceTopicSet = mutable.HashSet.empty[String] // sourceTopicSet ++= incrementTopicList @@ -63,7 +64,8 @@ object SparkxUtils extends EdpLogging{ val errorMsg = if(error!=null){ val first = if(error.getStackTrace!=null&&error.getStackTrace.nonEmpty) error.getStackTrace.head.toString else "" - error.toString + "\n" + first + val errorAll = error.toString + "\n" + first + errorAll.substring(0, math.min(errorMaxLength, errorAll.length)) } else null WormholeKafkaProducer.sendMessage(config.kafka_output.feedback_topic_name, FeedbackPriority.feedbackPriority, UmsProtocolUtils.feedbackFlowError(sourceNamespace,