Skip to content

Commit

Permalink
Fix Orchestrator Query (CorfuDB#1084)
Browse files Browse the repository at this point in the history
Properly add/remove workflow ids when tracking active workflows.
  • Loading branch information
Maithem authored and no2chem committed Dec 14, 2017
1 parent 4acfbe4 commit c061e06
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 57 deletions.
2 changes: 1 addition & 1 deletion format/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ enum OrchestratorResponseType {
// The status of a workflow
WORKFLOW_STATUS = 0;
// Id of a created workflow
WORKFLOW_ID = 1;
WORKFLOW_CREATE = 1;
}

message DataRank {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.corfudb.protocols.wireprotocol.CorfuMsgType;
import org.corfudb.protocols.wireprotocol.CorfuPayloadMsg;
import org.corfudb.protocols.wireprotocol.orchestrator.AddNodeRequest;
import org.corfudb.protocols.wireprotocol.orchestrator.AddNodeResponse;
import org.corfudb.protocols.wireprotocol.orchestrator.CreateWorkflowResponse;
import org.corfudb.protocols.wireprotocol.orchestrator.OrchestratorRequest;
import org.corfudb.protocols.wireprotocol.orchestrator.OrchestratorResponse;
import org.corfudb.protocols.wireprotocol.orchestrator.QueryRequest;
Expand All @@ -19,7 +19,6 @@

import javax.annotation.Nonnull;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
Expand All @@ -34,15 +33,14 @@
* cluster. Initiated through RPC, the orchestrator will create a workflow instance and attempt
* to execute all its actions.
*
* <p>
* Created by Maithem on 10/25/17.
*/

@Slf4j
public class Orchestrator {

final Callable<CorfuRuntime> getRuntime;
final Map<String, UUID> activeWorkflows = new ConcurrentHashMap();
final Map<UUID, String> activeWorkflows = new ConcurrentHashMap();

public Orchestrator(@Nonnull Callable<CorfuRuntime> runtime) {
this.getRuntime = runtime;
Expand All @@ -65,41 +63,46 @@ void handleQuery(CorfuPayloadMsg<OrchestratorRequest> msg, ChannelHandlerContext
QueryRequest req = (QueryRequest) msg.getPayload().getRequest();

Response resp;
if (activeWorkflows.values().contains(req.getId())) {
if (activeWorkflows.containsKey(req.getId())) {
resp = new QueryResponse(true);
log.trace("handleQuery: returning active for id {}", req.getId());
} else {
resp = new QueryResponse(false);
log.trace("handleQuery: returning not active for id {}", req.getId());
}

r.sendResponse(ctx, msg, CorfuMsgType.ORCHESTRATOR_RESPONSE
.payloadMsg(new OrchestratorResponse(resp )));
.payloadMsg(new OrchestratorResponse(resp)));
}

void addNode(CorfuPayloadMsg<OrchestratorRequest> msg, ChannelHandlerContext ctx, IServerRouter r) {
CompletableFuture.runAsync(() -> {
AddNodeRequest req = (AddNodeRequest) msg.getPayload().getRequest();
if (activeWorkflows.containsKey(req.getEndpoint())) {
// An add node workflow is already executing for this endpoint, return
// existing workflow id.
OrchestratorResponse resp = new OrchestratorResponse(
new AddNodeResponse(activeWorkflows.get(req.getEndpoint())));
r.sendResponse(ctx, msg, CorfuMsgType.ORCHESTRATOR_RESPONSE
.payloadMsg(resp));
return;
} else {
// Create a new workflow for this endpoint and return a new workflow id
Workflow workflow = getWorkflow(msg.getPayload());
activeWorkflows.put(req.getEndpoint(), workflow.getId());
OrchestratorResponse resp = new OrchestratorResponse(new AddNodeResponse(workflow.getId()));
r.sendResponse(ctx, msg, CorfuMsgType.ORCHESTRATOR_RESPONSE
.payloadMsg(resp));
AddNodeRequest req = (AddNodeRequest) msg.getPayload().getRequest();
if (findUUID(req.getEndpoint()) != null) {
// An add node workflow is already executing for this endpoint, return
// existing workflow id.
OrchestratorResponse resp = new OrchestratorResponse(
new CreateWorkflowResponse(findUUID(req.getEndpoint())));
r.sendResponse(ctx, msg, CorfuMsgType.ORCHESTRATOR_RESPONSE
.payloadMsg(resp));
log.trace("addNode: ignoring req for {}", findUUID(req.getEndpoint()));
return;
} else {
// Create a new workflow for this endpoint and return a new workflow id
Workflow workflow = getWorkflow(msg.getPayload());
activeWorkflows.put(workflow.getId(), req.getEndpoint());
log.trace("addNode: putting id {}", workflow.getId());
OrchestratorResponse resp = new OrchestratorResponse(new CreateWorkflowResponse(workflow.getId()));
r.sendResponse(ctx, msg, CorfuMsgType.ORCHESTRATOR_RESPONSE
.payloadMsg(resp));
CompletableFuture.runAsync(() -> {
run(workflow);
}
});
});
}
}

/**
* Create a workflow instance from an orchestrator request
*
* @param req Orchestrator request
* @return Workflow instance
*/
Expand All @@ -116,13 +119,14 @@ private Workflow getWorkflow(@Nonnull OrchestratorRequest req) {
/**
* Run a particular workflow, which entails executing all its defined
* actions
*
* @param workflow instance to run
*/
void run(@Nonnull Workflow workflow) {
CorfuRuntime rt = null;

try {
Layout currLayout = getRuntime.call().layout.get();
Layout currLayout = getRuntime.call().layout.get();
String servers = String.join(",", currLayout.getLayoutServers());
rt = new CorfuRuntime(servers)
.setCacheDisabled(true)
Expand Down Expand Up @@ -154,11 +158,20 @@ void run(@Nonnull Workflow workflow) {
log.error("run: Encountered an error while running workflow {}", workflow.getId(), e);
return;
} finally {
activeWorkflows.remove(workflow.getId());
log.debug("run: removed {} from {}",workflow.getId(), activeWorkflows);
if (rt != null) {
rt.shutdown();
}
}
}

activeWorkflows.remove(workflow.getId());
/**
 * Look up the workflow id that is currently tracked for an endpoint.
 *
 * Walks the entries of {@code activeWorkflows} (workflow id to endpoint) and
 * stops at the first entry whose endpoint equals the one requested.
 *
 * @param endpoint endpoint to search the active workflows for
 * @return id of the first matching workflow, or {@code null} if none matches
 */
UUID findUUID(String endpoint) {
    UUID workflowId = null;
    for (Map.Entry<UUID, String> active : activeWorkflows.entrySet()) {
        String activeEndpoint = active.getValue();
        if (activeEndpoint.equals(endpoint)) {
            // First match wins; no need to look at the remaining entries.
            workflowId = active.getKey();
            break;
        }
    }
    return workflowId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,29 @@
import java.nio.ByteBuffer;
import java.util.UUID;

import static org.corfudb.format.Types.OrchestratorResponseType.WORKFLOW_ID;
import static org.corfudb.format.Types.OrchestratorResponseType.WORKFLOW_CREATE;

/**
* AddNodeResponse returns the UUID of the add node workflow that was requested.
* CreateWorkflowResponse returns the UUID of a created workflow.
* @author Maithem
*/
public class AddNodeResponse implements Response {
public class CreateWorkflowResponse implements Response {

@Getter
public UUID workflowId;

public AddNodeResponse(UUID workflowId) {
public CreateWorkflowResponse(UUID workflowId) {
this.workflowId = workflowId;
}

public AddNodeResponse(byte[] buf) {
public CreateWorkflowResponse(byte[] buf) {
ByteBuffer bytes = ByteBuffer.wrap(buf);
this.workflowId = new UUID(bytes.getLong(), bytes.getLong());
}

@Override
public OrchestratorResponseType getType() {
return WORKFLOW_ID;
return WORKFLOW_CREATE;
}

@Override
Expand All @@ -38,4 +38,4 @@ public byte[] getSerialized() {
buf.putLong(workflowId.getLeastSignificantBits());
return buf.array();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ static Response mapResponse(OrchestratorResponseType type, byte[] payload) {
switch (type) {
case WORKFLOW_STATUS:
return new QueryResponse(payload);
case WORKFLOW_ID:
return new AddNodeResponse(payload);
case WORKFLOW_CREATE:
return new CreateWorkflowResponse(payload);
default:
throw new IllegalStateException("mapResponse: Unknown Type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.corfudb.protocols.wireprotocol.CorfuPayloadMsg;
import org.corfudb.protocols.wireprotocol.FailureDetectorMsg;
import org.corfudb.protocols.wireprotocol.orchestrator.AddNodeRequest;
import org.corfudb.protocols.wireprotocol.orchestrator.AddNodeResponse;
import org.corfudb.protocols.wireprotocol.orchestrator.CreateWorkflowResponse;
import org.corfudb.protocols.wireprotocol.orchestrator.OrchestratorRequest;
import org.corfudb.protocols.wireprotocol.orchestrator.OrchestratorResponse;
import org.corfudb.protocols.wireprotocol.orchestrator.QueryRequest;
Expand Down Expand Up @@ -115,12 +115,12 @@ public CompletableFuture<byte[]> sendHeartbeatRequest() {
return router.sendMessageAndGetCompletable(CorfuMsgType.HEARTBEAT_REQUEST.msg());
}

public AddNodeResponse addNodeRequest(String endpoint) throws Exception {
public CreateWorkflowResponse addNodeRequest(String endpoint) throws Exception {
OrchestratorRequest req = new OrchestratorRequest(new AddNodeRequest(endpoint));
CompletableFuture<OrchestratorResponse> resp = router.sendMessageAndGetCompletable(CorfuMsgType
.ORCHESTRATOR_REQUEST
.payloadMsg(req));
return (AddNodeResponse) resp.get().getResponse();
return (CreateWorkflowResponse) resp.get().getResponse();
}

public QueryResponse queryRequest(UUID workflowId) throws Exception {
Expand Down
17 changes: 10 additions & 7 deletions test/src/test/java/org/corfudb/integration/AddNodeIT.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package org.corfudb.integration;

import lombok.extern.slf4j.Slf4j;
import org.corfudb.protocols.wireprotocol.orchestrator.AddNodeResponse;
import org.corfudb.protocols.wireprotocol.orchestrator.CreateWorkflowResponse;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.MultiCheckpointWriter;
import org.corfudb.runtime.clients.ManagementClient;
import org.corfudb.runtime.collections.CorfuTable;
import org.corfudb.runtime.collections.SMRMap;
import org.junit.Test;

import java.util.UUID;
import java.util.concurrent.ExecutionException;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -31,9 +29,9 @@ public class AddNodeIT extends AbstractIT {

final String host = "localhost";

final int maxTries = 5;
final int maxTries = 10;

final int sleepTime = 1000;
final int sleepTime = 5_000;

String getConnectionString(int port) {
return host + ":" + port;
Expand Down Expand Up @@ -75,7 +73,7 @@ public void AddNodeTest() throws Exception {
ManagementClient mgmt = n1Rt.getRouter(getConnectionString(n1Port))
.getClient(ManagementClient.class);

AddNodeResponse resp = mgmt.addNodeRequest(getConnectionString(n2Port));
CreateWorkflowResponse resp = mgmt.addNodeRequest(getConnectionString(n2Port));

assertThat(resp.getWorkflowId()).isNotNull();

Expand All @@ -85,6 +83,9 @@ public void AddNodeTest() throws Exception {
final int clusterSizeN2 = 2;
assertThat(n1Rt.getLayoutView().getLayout().getAllServers().size()).isEqualTo(clusterSizeN2);

// Verify that the workflow ID for node 2 is no longer active
assertThat(mgmt.queryRequest(resp.getWorkflowId()).isActive()).isFalse();

MultiCheckpointWriter mcw = new MultiCheckpointWriter();
mcw.addMap(table);

Expand All @@ -104,7 +105,7 @@ public void AddNodeTest() throws Exception {
.setPort(n3Port)
.runServer();

AddNodeResponse resp2 = mgmt.addNodeRequest(getConnectionString(n3Port));
CreateWorkflowResponse resp2 = mgmt.addNodeRequest(getConnectionString(n3Port));
assertThat(resp2.getWorkflowId()).isNotNull();

waitForWorkflow(resp2.getWorkflowId(), n1Rt, n1Port);
Expand All @@ -113,6 +114,8 @@ public void AddNodeTest() throws Exception {
n1Rt.invalidateLayout();
final int clusterSizeN3 = 3;
assertThat(n1Rt.getLayoutView().getLayout().getAllServers().size()).isEqualTo(clusterSizeN3);
// Verify that the workflow ID for node 3 is no longer active
assertThat(mgmt.queryRequest(resp2.getWorkflowId()).isActive()).isFalse();
for (int x = 0; x < numEntries; x++) {
String v = (String) table.get(String.valueOf(x));
assertThat(v).isEqualTo(String.valueOf(x));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@
import com.google.common.collect.ImmutableSet;
import org.corfudb.format.Types.NodeMetrics;
import org.corfudb.infrastructure.*;
import org.corfudb.protocols.wireprotocol.orchestrator.AddNodeResponse;
import org.corfudb.protocols.wireprotocol.orchestrator.OrchestratorResponse;
import org.corfudb.protocols.wireprotocol.orchestrator.QueryResponse;
import org.junit.After;
import org.junit.Test;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -80,13 +77,6 @@ public void handleBootstrap()
.isInstanceOf(ExecutionException.class);
}

@Test
public void addNodeWorkflowRPCTest() throws Exception {
// Verify that a workflow id is generated for an add node request.
AddNodeResponse resp = client.addNodeRequest("localhost:9000");
assertThat(resp.getWorkflowId()).isNotNull();
}

@Test
public void queryWorkflowRPCTest() throws Exception {
// verify that non-active workflows return false when queried.
Expand Down

0 comments on commit c061e06

Please sign in to comment.