Skip to content

Commit

Permalink
Orchestrator Service (CorfuDB#1014)
Browse files Browse the repository at this point in the history
* Orchestrator Service

Implementation of a workflow execution service that runs in the
management server.

* Add New Node Workflow

Implementation of a workflow that adds a new node to the cluster.
  • Loading branch information
Maithem authored and no2chem committed Dec 13, 2017
1 parent ffff81f commit 287ada9
Show file tree
Hide file tree
Showing 20 changed files with 1,065 additions and 4 deletions.
16 changes: 16 additions & 0 deletions format/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,22 @@ enum CheckpointEntryType {
END = 2;
}

// The type of requests that can be made to the Orchestrator Service
enum OrchestratorRequestType {
// Query a workflow id
QUERY = 0;
// Add a new node to the cluster
ADD_NODE = 1;
}

// Orchestrator responses
enum OrchestratorResponseType {
// The status of a workflow
WORKFLOW_STATUS = 0;
// Id of a created workflow
WORKFLOW_ID = 1;
}

message DataRank {
required int64 rank = 1;
required int64 uuid_most_significant = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
import lombok.extern.slf4j.Slf4j;

import org.corfudb.format.Types.NodeMetrics;

import org.corfudb.infrastructure.management.IFailureDetectorPolicy;
import org.corfudb.infrastructure.management.PollReport;

import org.corfudb.infrastructure.management.ReconfigurationEventHandler;
import org.corfudb.infrastructure.orchestrator.Orchestrator;
import org.corfudb.protocols.wireprotocol.CorfuMsg;
import org.corfudb.protocols.wireprotocol.CorfuMsgType;
import org.corfudb.protocols.wireprotocol.CorfuPayloadMsg;
import org.corfudb.protocols.wireprotocol.FailureDetectorMsg;
import org.corfudb.protocols.wireprotocol.orchestrator.OrchestratorRequest;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.clients.LayoutClient;
import org.corfudb.runtime.clients.ManagementClient;
Expand All @@ -43,6 +43,8 @@
import org.corfudb.runtime.view.Layout;
import org.corfudb.runtime.view.QuorumFuturesFactory;

import javax.annotation.Nonnull;

/**
* Instantiates and performs failure detection and handling asynchronously.
*
Expand Down Expand Up @@ -114,6 +116,8 @@ public class ManagementServer extends AbstractServer {
@Getter
private volatile CompletableFuture<Boolean> sequencerBootstrappedFuture;

private final Orchestrator orchestrator;

/**
* Returns new ManagementServer.
*
Expand Down Expand Up @@ -176,6 +180,8 @@ public ManagementServer(ServerContext serverContext) {
} catch (RejectedExecutionException err) {
log.error("Error scheduling failure detection task, {}", err);
}

orchestrator = new Orchestrator(this::getCorfuRuntime);
}

private void bootstrapPrimarySequencerServer() {
Expand Down Expand Up @@ -263,6 +269,23 @@ private boolean checkBootstrap(CorfuMsg msg, ChannelHandlerContext ctx, IServerR
return true;
}

/**
* Forward an orchestrator request to the orchestrator service.
*
* @param msg corfu message containing ORCHESTRATOR_REQUEST
* @param ctx netty ChannelHandlerContext
* @param r server router
*/
@ServerHandler(type = CorfuMsgType.ORCHESTRATOR_REQUEST, opTimer = metricsPrefix
+ "orchestrator-request")
public synchronized void handleOrchestratorMsg(@Nonnull CorfuPayloadMsg<OrchestratorRequest> msg,
@Nonnull ChannelHandlerContext ctx,
@Nonnull IServerRouter r,
boolean isMetricsEnabled) {
log.debug("Received an orchestrator message {}", msg);
orchestrator.handle(msg, ctx, r);
}

/**
* Bootstraps the management server.
* The msg contains the layout to be bootstrapped.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.corfudb.infrastructure.orchestrator;

import lombok.extern.slf4j.Slf4j;
import org.corfudb.runtime.CorfuRuntime;

import javax.annotation.Nonnull;

/**
*
* A workflow action. All workflow actions must extend this class.
*
* Created by Maithem on 10/25/17.
*/

@Slf4j
public abstract class Action {

ActionStatus status = ActionStatus.CREATED;

/**
* Returns the name of this action.
* @return Name of action
*/
@Nonnull
public abstract String getName();

/**
* The implementation of the action
* @param runtime A runtime that the action will use to execute
* @throws Exception
*/
public abstract void impl(@Nonnull CorfuRuntime runtime) throws Exception;

/**
* Execute the action.
*/
@Nonnull
public void execute(@Nonnull CorfuRuntime runtime) {
try {
changeStatus(ActionStatus.STARTED);
impl(runtime);
changeStatus(ActionStatus.COMPLETED);
} catch (Exception e) {
log.error("execute: error executing action", e);
changeStatus(ActionStatus.ERROR);
}
}

/**
* Get the status of this action.
* @return ActionStatus
*/
public ActionStatus getStatus() {
return status;
}

/**
* Changes the status of this action
* @param newStatus the new status
*/
void changeStatus(ActionStatus newStatus) {
status = newStatus;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.corfudb.infrastructure.orchestrator;

/**
* The possible states in which an action can be in.
*
* Created by Maithem on 10/25/17.
*/

public enum ActionStatus {

/**
* The state of the action when it gets created
*/
CREATED,

/**
* The state of the action once it is invoked
*/
STARTED,

/**
* The state of the action once it completes successfully
*/
COMPLETED,

/**
* The state of the action if it fails executing
*/
ERROR
}
Loading

0 comments on commit 287ada9

Please sign in to comment.