Skip to content

Commit

Permalink
feat: container-type level version compatibility check (#140)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Sep 26, 2024
1 parent f66f67c commit babc8e1
Show file tree
Hide file tree
Showing 15 changed files with 91 additions and 22 deletions.
2 changes: 2 additions & 0 deletions src/main/java/io/numaproj/numaflow/batchmapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerUtils;
Expand Down Expand Up @@ -52,6 +53,7 @@ public void start() throws Exception {
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath(),
ContainerType.MAPPER,
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));

if (this.server == null) {
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/ContainerType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonValue;

public enum ContainerType {
SOURCER("sourcer"),
SOURCE_TRANSFORMER("sourcetransformer"),
SINKER("sinker"),
MAPPER("mapper"),
REDUCER("reducer"),
REDUCE_STREAMER("reducestreamer"),
SESSION_REDUCER("sessionreducer"),
SIDEINPUT("sideinput"),
FBSINKER("fb-sinker");

private final String name;

ContainerType(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return name;
}
}
14 changes: 13 additions & 1 deletion src/main/java/io/numaproj/numaflow/info/ServerInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import java.util.Map;

import static java.util.Map.entry;

/**
* Server Information to be used by client to determine:
* - protocol: what is right protocol to use (UDS or TCP)
Expand All @@ -24,7 +26,17 @@ public class ServerInfo {
// Specify the minimum Numaflow version required by the current SDK version
// To update this value, please follow the instructions for MINIMUM_NUMAFLOW_VERSION in
// https://github.com/numaproj/numaflow-rs/blob/main/src/shared.rs
public static final String MINIMUM_NUMAFLOW_VERSION = "1.3.1-z";
public static final Map<ContainerType, String> MINIMUM_NUMAFLOW_VERSION = Map.ofEntries(
entry(ContainerType.SOURCER, "1.3.1-z"),
entry(ContainerType.SOURCE_TRANSFORMER, "1.3.1-z"),
entry(ContainerType.SINKER, "1.3.1-z"),
entry(ContainerType.MAPPER, "1.3.1-z"),
entry(ContainerType.REDUCER, "1.3.1-z"),
entry(ContainerType.REDUCE_STREAMER, "1.3.1-z"),
entry(ContainerType.SESSION_REDUCER, "1.3.1-z"),
entry(ContainerType.SIDEINPUT, "1.3.1-z"),
entry(ContainerType.FBSINKER, "1.3.1-z")
);
@JsonProperty("protocol")
private Protocol protocol;
@JsonProperty("language")
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/numaproj/numaflow/mapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.ServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerUtils;
Expand Down Expand Up @@ -55,11 +56,12 @@ public void start() throws Exception {
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath(),
ContainerType.MAPPER,
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));
}

if (this.server == null) {
ServerBuilder<?> serverBuilder = null;
ServerBuilder<?> serverBuilder;
// create server builder for domain socket server
serverBuilder = GrpcServerUtils.createServerBuilder(
grpcConfig.getSocketPath(),
Expand All @@ -79,7 +81,7 @@ public void start() throws Exception {
log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
"localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapstreamer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerUtils;
Expand Down Expand Up @@ -52,6 +53,7 @@ public void start() throws Exception {
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath(),
ContainerType.MAPPER,
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));

if (this.server == null) {
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/numaproj/numaflow/reducer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerUtils;
Expand Down Expand Up @@ -51,7 +52,8 @@ public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath());
grpcConfig.getInfoFilePath(),
ContainerType.REDUCER);
}

if (this.server == null) {
Expand All @@ -74,7 +76,7 @@ public void start() throws Exception {
log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
"localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/numaproj/numaflow/reducestreamer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer;
Expand Down Expand Up @@ -54,7 +55,8 @@ public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath());
grpcConfig.getInfoFilePath(),
ContainerType.REDUCE_STREAMER);
}

if (this.server == null) {
Expand All @@ -77,7 +79,7 @@ public void start() throws Exception {
log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
"localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/numaproj/numaflow/sessionreducer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.sessionreducer.model.SessionReducer;
Expand Down Expand Up @@ -54,7 +55,8 @@ public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath());
grpcConfig.getInfoFilePath(),
ContainerType.SESSION_REDUCER);
}

if (this.server == null) {
Expand All @@ -77,7 +79,7 @@ public void start() throws Exception {
log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
"localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
16 changes: 13 additions & 3 deletions src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerDomainSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.Language;
import io.numaproj.numaflow.info.Protocol;
import io.numaproj.numaflow.info.ServerInfo;
Expand All @@ -30,6 +31,8 @@
import java.util.HashMap;
import java.util.Map;

import static io.numaproj.numaflow.info.ServerInfo.MINIMUM_NUMAFLOW_VERSION;

/**
* GrpcServerUtils is the utility class for netty server channel.
*/
Expand Down Expand Up @@ -80,14 +83,21 @@ public static EventLoopGroup createEventLoopGroup(int threads, String name) {
public static void writeServerInfo(
ServerInfoAccessor serverInfoAccessor,
String socketPath,
String infoFilePath) throws Exception {
writeServerInfo(serverInfoAccessor, socketPath, infoFilePath, new HashMap<>());
String infoFilePath,
ContainerType containerType) throws Exception {
writeServerInfo(
serverInfoAccessor,
socketPath,
infoFilePath,
containerType,
new HashMap<>());
}

public static void writeServerInfo(
ServerInfoAccessor serverInfoAccessor,
String socketPath,
String infoFilePath,
ContainerType containerType,
Map<String, String> metaData) throws Exception {
// cleanup socket path if it exists (unit test builder doesn't use one)
if (socketPath != null) {
Expand All @@ -111,7 +121,7 @@ public static void writeServerInfo(
ServerInfo serverInfo = new ServerInfo(
Protocol.UDS_PROTOCOL,
Language.JAVA,
ServerInfo.MINIMUM_NUMAFLOW_VERSION,
MINIMUM_NUMAFLOW_VERSION.get(containerType),
serverInfoAccessor.getSDKVersion(),
metaData);
log.info("Writing server info {} to {}", serverInfo, infoFilePath);
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/numaproj/numaflow/sideinput/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerUtils;
Expand Down Expand Up @@ -51,7 +52,8 @@ public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath());
grpcConfig.getInfoFilePath(),
ContainerType.SIDEINPUT);
}

if (this.server == null) {
Expand All @@ -74,7 +76,7 @@ public void start() throws Exception {
log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
"localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/numaproj/numaflow/sinker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.ServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerUtils;
Expand Down Expand Up @@ -50,7 +51,8 @@ public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath());
grpcConfig.getInfoFilePath(),
ContainerType.SINKER);
}

if (this.server == null) {
Expand All @@ -73,7 +75,7 @@ public void start() throws Exception {
log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
"localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/numaproj/numaflow/sourcer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerUtils;
Expand Down Expand Up @@ -51,7 +52,8 @@ public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath());
grpcConfig.getInfoFilePath(),
ContainerType.SOURCER);
}

if (this.server == null) {
Expand All @@ -74,7 +76,7 @@ public void start() throws Exception {
log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
"localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerBuilder;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerUtils;
Expand Down Expand Up @@ -51,7 +52,8 @@ public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath());
grpcConfig.getInfoFilePath(),
ContainerType.SOURCE_TRANSFORMER);
}

if (this.server == null) {
Expand All @@ -74,7 +76,7 @@ public void start() throws Exception {
log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());
"localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void given_writeServerInfo_when_read_then_returnExactSame() {
ServerInfo testServerInfo = new ServerInfo(
Protocol.TCP_PROTOCOL,
Language.JAVA,
ServerInfo.MINIMUM_NUMAFLOW_VERSION,
"1.3.1-z",
"0.4.3",
new HashMap<>() {{
put("key1", "value1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.grpc.ServerBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -29,7 +30,7 @@ public void testCreateEventLoopGroup() {
public void testWriteServerInfo() throws Exception {
ServerInfoAccessor mockAccessor = Mockito.mock(ServerInfoAccessor.class);
Mockito.when(mockAccessor.getSDKVersion()).thenReturn("1.0.0");
GrpcServerUtils.writeServerInfo(mockAccessor, null, "infoFilePath");
GrpcServerUtils.writeServerInfo(mockAccessor, null, "infoFilePath", ContainerType.MAPPER);
Mockito
.verify(mockAccessor, Mockito.times(1))
.write(Mockito.any(), Mockito.eq("infoFilePath"));
Expand Down

0 comments on commit babc8e1

Please sign in to comment.