Skip to content

Commit

Permalink
fix: update event time filter transformer to use tag (numaproj#43)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Apr 26, 2023
1 parent 792d481 commit cfbe68e
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
* data transformation, and returns the message.
* <p>
* If the message event time is before year 2022, drop the message. If it's within year 2022, update
* the key to "within_year_2022" and update the message event time to Jan 1st 2022.
* Otherwise, (exclusively after year 2022), update the key to "after_year_2022" and update the
* the tag to "within_year_2022" and update the message event time to Jan 1st 2022.
* Otherwise, (exclusively after year 2022), update the tag to "after_year_2022" and update the
* message event time to Jan 1st 2023.
*/
public class EventTimeFilterFunction extends MapTHandler {
Expand All @@ -40,6 +40,7 @@ public MessageTList processMessage(String[] keys, Datum data) {
new MessageT(
data.getValue(),
januaryFirst2022,
null,
new String[]{"within_year_2022"}))
.build();
} else {
Expand All @@ -48,6 +49,7 @@ public MessageTList processMessage(String[] keys, Datum data) {
.addMessage(new MessageT(
data.getValue(),
januaryFirst2023,
null,
new String[]{"after_year_2022"}))
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void start() throws Exception {
Language.JAVA,
serverInfoAccessor.getSDKVersion(),
new HashMap<>());
log.info("Writing server info {} to %s", serverInfo, infoFilePath);
serverInfoAccessor.write(serverInfo, infoFilePath);

// build server
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/numaproj/numaflow/sink/SinkServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public void start() throws Exception {
Language.JAVA,
serverInfoAccessor.getSDKVersion(),
new HashMap<>());
log.info("Writing server info {} to %s", serverInfo, infoFilePath);
serverInfoAccessor.write(serverInfo, infoFilePath);

// build server
Expand Down

0 comments on commit cfbe68e

Please sign in to comment.