Skip to content

Commit

Permalink
fix spark kafka source offset field is null
Browse files Browse the repository at this point in the history
  • Loading branch information
harbby committed Sep 15, 2020
1 parent 8a27b0c commit 550a42b
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,13 @@ public void commitOffsets(RDD<?> kafkaRdd)
continue;
case "_offset":
values[i] = record.offset();
continue;
case "_timestamp":
values[i] = record.timestamp();
continue;
case "_timestampType":
values[i] = record.timestampType().id;
continue;
default:
values[i] = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ private static Dataset<Row> createSource(SparkSession spark, KafkaSourceConfig c
continue;
case "_offset":
values[i] = record.<Long>getAs("offset");
continue;
default:
values[i] = null;
}
Expand Down

0 comments on commit 550a42b

Please sign in to comment.