Skip to content

Commit

Permalink
KAFKA-7790: Fix Bugs in Trogdor Task Expiration (apache#6103)
Browse files Browse the repository at this point in the history
The Trogdor Coordinator now overwrites a task's startMs with the time at which it received the task if the specified startMs is in the past.

The Trogdor Agent now correctly expires a task once its expiry time (startMs + durationMs) has passed. Previously, it ignored startMs and expired the task durationMs milliseconds after the task was started locally.

Reviewed-by: Colin P. McCabe <[email protected]>
  • Loading branch information
stanislavkozlovski authored and cmccabe committed Jan 11, 2019
1 parent 3746bf2 commit 625e0d8
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ WorkerState state() {
void transitionToRunning() {
state = State.RUNNING;
timeoutFuture = scheduler.schedule(stateChangeExecutor,
new StopWorker(workerId, false), spec.durationMs());
new StopWorker(workerId, false),
Math.max(0, spec.endMs() - time.milliseconds()));
}

void transitionToStopping() {
Expand Down Expand Up @@ -316,6 +317,12 @@ public void createWorker(long workerId, String taskId, TaskSpec spec) throws Thr
"a worker with that id.", nodeName, workerId);
return;
}
if (worker.spec.endMs() <= time.milliseconds()) {
log.info("{}: Will not run worker {} as it has expired.", nodeName, worker);
stateChangeExecutor.submit(new HandleWorkerHalting(worker,
"worker expired", true));
return;
}
KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
haltFuture.thenApply((KafkaFuture.BaseFunction<String, Void>) errorString -> {
if (errorString == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerReceiving;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerStarting;
import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.rest.WorkerStopping;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -202,46 +204,58 @@ public void run() {
if (log.isTraceEnabled()) {
log.trace("{}: got heartbeat status {}", node.name(), agentStatus);
}
// Identify workers which we think should be running, but which do not appear
// in the agent's response. We need to send startWorker requests for these.
for (Map.Entry<Long, ManagedWorker> entry : workers.entrySet()) {
Long workerId = entry.getKey();
if (!agentStatus.workers().containsKey(workerId)) {
ManagedWorker worker = entry.getValue();
if (worker.shouldRun) {
worker.tryCreate();
}
handleMissingWorkers(agentStatus);
handlePresentWorkers(agentStatus);
} catch (Throwable e) {
log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e);
}
}

/**
* Identify workers which we think should be running but which do not appear in the agent's response.
* We need to send startWorker requests for those.
*/
private void handleMissingWorkers(AgentStatusResponse agentStatus) {
for (Map.Entry<Long, ManagedWorker> entry : workers.entrySet()) {
Long workerId = entry.getKey();
if (!agentStatus.workers().containsKey(workerId)) {
ManagedWorker worker = entry.getValue();
if (worker.shouldRun) {
worker.tryCreate();
}
}
for (Map.Entry<Long, WorkerState> entry : agentStatus.workers().entrySet()) {
long workerId = entry.getKey();
WorkerState state = entry.getValue();
ManagedWorker worker = workers.get(workerId);
if (worker == null) {
// Identify tasks which are running, but which we don't know about.
// Add these to the NodeManager as tasks that should not be running.
log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId);
workers.put(workerId, new ManagedWorker(workerId, state.taskId(),
state.spec(), false, state));
} else {
// Handle workers which need to be stopped.
if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
if (!worker.shouldRun) {
worker.tryStop();
}
}
// Notify the TaskManager if the worker state has changed.
if (worker.state.equals(state)) {
log.debug("{}: worker state is still {}", node.name(), worker.state);
} else {
log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state);
worker.state = state;
taskManager.updateWorkerState(node.name(), worker.workerId, state);
}
}

private void handlePresentWorkers(AgentStatusResponse agentStatus) {
for (Map.Entry<Long, WorkerState> entry : agentStatus.workers().entrySet()) {
long workerId = entry.getKey();
WorkerState state = entry.getValue();
ManagedWorker worker = workers.get(workerId);
if (worker == null) {
// Identify tasks which are running, but which we don't know about.
// Add these to the NodeManager as tasks that should not be running.
log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId);
workers.put(workerId, new ManagedWorker(workerId, state.taskId(),
state.spec(), false, state));
} else {
// Handle workers which need to be stopped.
if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
if (!worker.shouldRun) {
worker.tryStop();
}
}
// Notify the TaskManager if the worker state has changed.
if (worker.state.equals(state)) {
log.debug("{}: worker state is still {}", node.name(), worker.state);
} else {
log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state);
if (state instanceof WorkerDone || state instanceof WorkerStopping)
worker.shouldRun = false;
worker.state = state;
taskManager.updateWorkerState(node.name(), worker.workerId, state);
}
}
} catch (Throwable e) {
log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.kafka.trogdor.coordinator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InvalidRequestException;
Expand Down Expand Up @@ -150,7 +152,13 @@ class ManagedTask {
final private String id;

/**
* The task specification.
* The original task specification as submitted when the task was created.
*/
final private TaskSpec originalSpec;

/**
* The effective task specification.
* The start time will be adjusted to reflect the time when the task was submitted.
*/
final private TaskSpec spec;

Expand Down Expand Up @@ -195,8 +203,10 @@ class ManagedTask {
*/
private String error = "";

ManagedTask(String id, TaskSpec spec, TaskController controller, TaskStateType state) {
ManagedTask(String id, TaskSpec originalSpec, TaskSpec spec,
TaskController controller, TaskStateType state) {
this.id = id;
this.originalSpec = originalSpec;
this.spec = spec;
this.controller = controller;
this.state = state;
Expand Down Expand Up @@ -297,7 +307,7 @@ public void createTask(final String id, TaskSpec spec)
throws Throwable {
try {
executor.submit(new CreateTask(id, spec)).get();
} catch (ExecutionException e) {
} catch (ExecutionException | JsonProcessingException e) {
log.info("createTask(id={}, spec={}) error", id, spec, e);
throw e.getCause();
}
Expand All @@ -308,11 +318,15 @@ public void createTask(final String id, TaskSpec spec)
*/
class CreateTask implements Callable<Void> {
private final String id;
private final TaskSpec originalSpec;
private final TaskSpec spec;

CreateTask(String id, TaskSpec spec) {
CreateTask(String id, TaskSpec spec) throws JsonProcessingException {
this.id = id;
this.spec = spec;
this.originalSpec = spec;
ObjectNode node = JsonUtil.JSON_SERDE.valueToTree(originalSpec);
node.set("startMs", new LongNode(Math.max(time.milliseconds(), originalSpec.startMs())));
this.spec = JsonUtil.JSON_SERDE.treeToValue(node, TaskSpec.class);
}

@Override
Expand All @@ -322,11 +336,11 @@ public Void call() throws Exception {
}
ManagedTask task = tasks.get(id);
if (task != null) {
if (!task.spec.equals(spec)) {
if (!task.originalSpec.equals(originalSpec)) {
throw new RequestConflictException("Task ID " + id + " already " +
"exists, and has a different spec " + task.spec);
"exists, and has a different spec " + task.originalSpec);
}
log.info("Task {} already exists with spec {}", id, spec);
log.info("Task {} already exists with spec {}", id, originalSpec);
return null;
}
TaskController controller = null;
Expand All @@ -339,13 +353,13 @@ public Void call() throws Exception {
if (failure != null) {
log.info("Failed to create a new task {} with spec {}: {}",
id, spec, failure);
task = new ManagedTask(id, spec, null, TaskStateType.DONE);
task = new ManagedTask(id, originalSpec, spec, null, TaskStateType.DONE);
task.doneMs = time.milliseconds();
task.maybeSetError(failure);
tasks.put(id, task);
return null;
}
task = new ManagedTask(id, spec, controller, TaskStateType.PENDING);
task = new ManagedTask(id, originalSpec, spec, controller, TaskStateType.PENDING);
tasks.put(id, task);
long delayMs = task.startDelayMs(time.milliseconds());
task.startFuture = scheduler.schedule(executor, new RunTask(task), delayMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public final long startMs() {
return startMs;
}

/**
 * Returns the deadline of this task: the absolute time, in milliseconds,
 * at which the task is due to end, computed as the start time plus the duration.
 */
public final long endMs() {
    final long deadlineMs = startMs + durationMs;
    return deadlineMs;
}

/**
* Get the duration of this task in ms.
*/
Expand Down
71 changes: 57 additions & 14 deletions tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;
Expand Down Expand Up @@ -109,6 +110,43 @@ public void testAgentGetStatus() throws Exception {
agent.waitForShutdown();
}

/**
 * Verifies that when the agent is asked to create a worker whose spec has
 * already expired, the worker ends up reported as done with the error
 * "worker expired" rather than running.
 */
@Test
public void testCreateExpiredWorkerIsNotScheduled() throws Exception {
long initialTimeMs = 100;
long tickMs = 15;
// Single-element array so the anonymous MockTime subclass below can flip the flag.
final boolean[] toSleep = {true};
MockTime time = new MockTime(tickMs, initialTimeMs, 0) {
/**
 * Modify sleep() to call super.sleep() every second call
 * in order to avoid the endless loop in the tick() calls to the MockScheduler listener
 */
@Override
public void sleep(long ms) {
// The flag is flipped before it is tested and starts out true, so the first
// call is dropped and every second call after that is forwarded to super.sleep().
toSleep[0] = !toSleep[0];
if (toSleep[0])
super.sleep(ms);
}
};
MockScheduler scheduler = new MockScheduler(time);
Agent agent = createAgent(scheduler);
AgentClient client = new AgentClient.Builder().
maxTries(10).target("localhost", agent.port()).build();
// Before any worker is created, the agent is expected to report an empty worker map.
AgentStatusResponse status = client.status();
assertEquals(Collections.emptyMap(), status.workers());
new ExpectedTasks().waitFor(client);

// NOTE(review): presumably NoOpTaskSpec(startMs, durationMs), giving a deadline of 20,
// which is below initialTimeMs (100), so the spec is already expired — confirm argument order.
final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 10);
client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
// NOTE(review): the expected timestamps assume the mock clock advances by tickMs each
// time it is read (one read before the start time is recorded, two more before the
// done time is recorded) — confirm against MockTime and the agent's worker code.
long actualStartTimeMs = initialTimeMs + tickMs;
long doneMs = actualStartTimeMs + 2 * tickMs;
// The worker and its task are both expected to be done, carrying the "worker expired" error.
new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerDone("foo", fooSpec, actualStartTimeMs,
doneMs, null, "worker expired")).
taskState(new TaskDone(fooSpec, actualStartTimeMs, doneMs, "worker expired", false, null)).
build()).
waitFor(client);
}

@Test
public void testAgentCreateWorkers() throws Exception {
MockTime time = new MockTime(0, 0, 0);
Expand Down Expand Up @@ -171,53 +209,58 @@ public void testAgentCreateWorkers() throws Exception {

@Test
public void testAgentFinishesTasks() throws Exception {
MockTime time = new MockTime(0, 0, 0);
long startTimeMs = 2000;
MockTime time = new MockTime(0, startTimeMs, 0);
MockScheduler scheduler = new MockScheduler(time);
Agent agent = createAgent(scheduler);
AgentClient client = new AgentClient.Builder().
maxTries(10).target("localhost", agent.port()).build();
new ExpectedTasks().waitFor(client);

final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 2);
final NoOpTaskSpec fooSpec = new NoOpTaskSpec(startTimeMs, 2);
long fooSpecStartTimeMs = startTimeMs;
client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
workerState(new WorkerRunning("foo", fooSpec, startTimeMs, new TextNode("active"))).
build()).
waitFor(client);

time.sleep(1);

final NoOpTaskSpec barSpec = new NoOpTaskSpec(2000, 900000);
client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
long barSpecWorkerId = 1;
long barSpecStartTimeMs = startTimeMs + 1;
final NoOpTaskSpec barSpec = new NoOpTaskSpec(startTimeMs, 900000);
client.createWorker(new CreateWorkerRequest(barSpecWorkerId, "bar", barSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
workerState(new WorkerRunning("foo", fooSpec, fooSpecStartTimeMs, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
workerState(new WorkerRunning("bar", barSpec, 1, new TextNode("active"))).
workerState(new WorkerRunning("bar", barSpec, barSpecStartTimeMs, new TextNode("active"))).
build()).
waitFor(client);

time.sleep(1);

// foo task expired
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerDone("foo", fooSpec, 0, 2, new TextNode("done"), "")).
workerState(new WorkerDone("foo", fooSpec, fooSpecStartTimeMs, fooSpecStartTimeMs + 2, new TextNode("done"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
workerState(new WorkerRunning("bar", barSpec, 1, new TextNode("active"))).
workerState(new WorkerRunning("bar", barSpec, barSpecStartTimeMs, new TextNode("active"))).
build()).
waitFor(client);

time.sleep(5);
client.stopWorker(new StopWorkerRequest(1));
client.stopWorker(new StopWorkerRequest(barSpecWorkerId));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerDone("foo", fooSpec, 0, 2, new TextNode("done"), "")).
workerState(new WorkerDone("foo", fooSpec, fooSpecStartTimeMs, fooSpecStartTimeMs + 2, new TextNode("done"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
workerState(new WorkerDone("bar", barSpec, 1, 7, new TextNode("done"), "")).
workerState(new WorkerDone("bar", barSpec, barSpecStartTimeMs, startTimeMs + 7, new TextNode("done"), "")).
build()).
waitFor(client);

Expand Down Expand Up @@ -348,7 +391,7 @@ public void testDestroyWorkers() throws Exception {
maxTries(10).target("localhost", agent.port()).build();
new ExpectedTasks().waitFor(client);

final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 5);
final NoOpTaskSpec fooSpec = new NoOpTaskSpec(0, 5);
client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
Expand All @@ -363,7 +406,7 @@ public void testDestroyWorkers() throws Exception {
new ExpectedTasks().waitFor(client);
time.sleep(1);

final NoOpTaskSpec fooSpec2 = new NoOpTaskSpec(100, 1);
final NoOpTaskSpec fooSpec2 = new NoOpTaskSpec(2, 1);
client.createWorker(new CreateWorkerRequest(1, "foo", fooSpec2));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
Expand Down
Loading

0 comments on commit 625e0d8

Please sign in to comment.