forked from numaproj/numaflow-java
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: implement sdk for session reduce (numaproj#94)
Signed-off-by: Keran Yang <[email protected]>
- Loading branch information
Showing
38 changed files
with
4,568 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
23 changes: 23 additions & 0 deletions
23
examples/src/main/java/io/numaproj/numaflow/examples/reducesession/counter/CountFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package io.numaproj.numaflow.examples.reducesession.counter; | ||
|
||
import io.numaproj.numaflow.sessionreducer.Server; | ||
import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
/** | ||
* CountFactory extends SessionReducerFactory to support creating instances of SumFunction. | ||
* It also provides a main function to start a server for handling the session reduce stream. | ||
*/ | ||
@Slf4j | ||
public class CountFactory extends SessionReducerFactory<CountFunction> { | ||
|
||
public static void main(String[] args) throws Exception { | ||
log.info("count udf was invoked"); | ||
new Server(new CountFactory()).start(); | ||
} | ||
|
||
@Override | ||
public CountFunction createSessionReducer() { | ||
return new CountFunction(); | ||
} | ||
} |
48 changes: 48 additions & 0 deletions
48
...ples/src/main/java/io/numaproj/numaflow/examples/reducesession/counter/CountFunction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
package io.numaproj.numaflow.examples.reducesession.counter; | ||
|
||
import io.numaproj.numaflow.sessionreducer.model.Datum; | ||
import io.numaproj.numaflow.sessionreducer.model.Message; | ||
import io.numaproj.numaflow.sessionreducer.model.SessionReducer; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
/** | ||
* CountFunction is a simple session reducer which counts the number of events in a session. | ||
*/ | ||
@Slf4j | ||
public class CountFunction extends SessionReducer { | ||
|
||
private final AtomicInteger count = new AtomicInteger(0); | ||
|
||
@Override | ||
public void processMessage( | ||
String[] keys, | ||
Datum datum, | ||
io.numaproj.numaflow.sessionreducer.model.OutputStreamObserver outputStreamObserver) { | ||
this.count.incrementAndGet(); | ||
} | ||
|
||
@Override | ||
public void handleEndOfStream( | ||
String[] keys, | ||
io.numaproj.numaflow.sessionreducer.model.OutputStreamObserver outputStreamObserver) { | ||
outputStreamObserver.send(new Message(String.valueOf(this.count.get()).getBytes())); | ||
} | ||
|
||
@Override | ||
public byte[] accumulator() { | ||
return String.valueOf(this.count.get()).getBytes(); | ||
} | ||
|
||
@Override | ||
public void mergeAccumulator(byte[] accumulator) { | ||
int value = 0; | ||
try { | ||
value = Integer.parseInt(new String(accumulator)); | ||
} catch (NumberFormatException e) { | ||
log.info("error while parsing integer - {}", e.getMessage()); | ||
} | ||
this.count.addAndGet(value); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
61 changes: 61 additions & 0 deletions
61
src/main/java/io/numaproj/numaflow/sessionreducer/ActorRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package io.numaproj.numaflow.sessionreducer; | ||
|
||
import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
|
||
/** | ||
* ActorRequest is used by the supervisor actor to distribute session reduce operations to | ||
* individual session reducer actors. One actor request is sent to only one session reducer actor. | ||
*/ | ||
@Getter | ||
class ActorRequest { | ||
private final ActorRequestType type; | ||
// the window of the target session the actor request is sent to | ||
private final Sessionreduce.KeyedWindow keyedWindow; | ||
// the new keyed window the target session is to be expanded to | ||
// it is specified only when the actor request is an EXPAND | ||
private final Sessionreduce.KeyedWindow newKeyedWindow; | ||
// the payload of the request | ||
private final Sessionreduce.SessionReduceRequest.Payload payload; | ||
// the id of the merge task this request belongs to, it's equal to the unique id of the merged window, | ||
// it is specified only when the actor request is a GET_ACCUMULATOR | ||
private final String mergeTaskId; | ||
|
||
@Builder | ||
private ActorRequest( | ||
ActorRequestType type, | ||
Sessionreduce.KeyedWindow keyedWindow, | ||
Sessionreduce.KeyedWindow newKeyedWindow, | ||
Sessionreduce.SessionReduceRequest.Payload payload, | ||
String mergeTaskId | ||
) { | ||
this.type = type; | ||
this.keyedWindow = keyedWindow; | ||
this.newKeyedWindow = newKeyedWindow; | ||
this.payload = payload; | ||
this.mergeTaskId = mergeTaskId; | ||
} | ||
|
||
static class ActorRequestBuilder { | ||
ActorRequest build() { | ||
if (newKeyedWindow != null && type != ActorRequestType.EXPAND) { | ||
throw new IllegalStateException( | ||
"attribute newKeyedWindow can only be set when the request is an EXPAND."); | ||
} | ||
if (newKeyedWindow == null && type == ActorRequestType.EXPAND) { | ||
throw new IllegalStateException( | ||
"attribute newKeyedWindow must be set when the request is an EXPAND."); | ||
} | ||
if (mergeTaskId != null && type != ActorRequestType.GET_ACCUMULATOR) { | ||
throw new IllegalStateException( | ||
"attribute mergeTaskId can only be set when the request is a GET_ACCUMULATOR."); | ||
} | ||
if (mergeTaskId == null && type == ActorRequestType.GET_ACCUMULATOR) { | ||
throw new IllegalStateException( | ||
"attribute mergeTaskId must be set when the request is a GET_ACCUMULATOR."); | ||
} | ||
return new ActorRequest(type, keyedWindow, newKeyedWindow, payload, mergeTaskId); | ||
} | ||
} | ||
} |
17 changes: 17 additions & 0 deletions
17
src/main/java/io/numaproj/numaflow/sessionreducer/ActorRequestType.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package io.numaproj.numaflow.sessionreducer; | ||
|
||
/** | ||
* ActorRequestType represents the purpose of an ActorRequest. | ||
*/ | ||
public enum ActorRequestType { | ||
// open a brand-new session window | ||
OPEN, | ||
// append a message to an existing session window | ||
APPEND, | ||
// close a session window | ||
CLOSE, | ||
// expand a session window | ||
EXPAND, | ||
// ask a to-be-merged session window for it's accumulator | ||
GET_ACCUMULATOR, | ||
} |
52 changes: 52 additions & 0 deletions
52
src/main/java/io/numaproj/numaflow/sessionreducer/ActorResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package io.numaproj.numaflow.sessionreducer; | ||
|
||
import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
import lombok.Setter; | ||
|
||
/** | ||
* The actor response holds the session reduce response from a particular session window. | ||
*/ | ||
@Getter | ||
@Setter | ||
class ActorResponse { | ||
Sessionreduce.SessionReduceResponse response; | ||
// The isLast attribute indicates whether the response is globally the last one to be sent to | ||
// the output gRPC stream, if set to true, it means the response is the very last response among | ||
// all windows. When output actor receives an isLast response, it sends the response to and immediately | ||
// closes the output stream. | ||
boolean isLast; | ||
// The accumulator attribute holds the accumulator of the session. | ||
byte[] accumulator; | ||
// The mergeTaskId attribute holds the id of the merged window that this session is to be merged into. | ||
String mergeTaskId; | ||
|
||
@Builder | ||
private ActorResponse( | ||
Sessionreduce.SessionReduceResponse response, | ||
boolean isLast, | ||
byte[] accumulator, | ||
String mergeTaskId | ||
) { | ||
this.response = response; | ||
this.isLast = isLast; | ||
this.accumulator = accumulator; | ||
this.mergeTaskId = mergeTaskId; | ||
} | ||
|
||
static class ActorResponseBuilder { | ||
ActorResponse build() { | ||
if ((accumulator != null && mergeTaskId == null) || (accumulator == null | ||
&& mergeTaskId != null)) { | ||
throw new IllegalStateException( | ||
"attributes accumulator and mergeTaskId should be either both null or both non-null."); | ||
} | ||
return new ActorResponse(response, isLast, accumulator, mergeTaskId); | ||
} | ||
} | ||
|
||
boolean isEOFResponse() { | ||
return this.accumulator == null && this.mergeTaskId == null; | ||
} | ||
} |
15 changes: 15 additions & 0 deletions
15
src/main/java/io/numaproj/numaflow/sessionreducer/Constants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package io.numaproj.numaflow.sessionreducer; | ||
|
||
class Constants { | ||
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64; | ||
|
||
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/sessionreduce.sock"; | ||
|
||
public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sessionreducer-server-info"; | ||
|
||
public static final String EOF = "EOF"; | ||
|
||
public static final String SUCCESS = "SUCCESS"; | ||
|
||
public static final String DELIMITER = ":"; | ||
} |
Oops, something went wrong.