support seamless config changes (apache#3051)
dclim authored and fjy committed Jun 3, 2016
1 parent 6d38dde commit a2290a8
Showing 5 changed files with 142 additions and 34 deletions.
29 changes: 21 additions & 8 deletions docs/content/development/extensions-core/kafka-ingestion.md
@@ -16,6 +16,12 @@ This service is provided in the `kafka-indexing-service` core extension (see
currently designated as an *experimental feature* and is subject to the usual
[experimental caveats](../experimental.html).

<div class="note info">
The Kafka indexing service uses the Java consumer that was introduced in Kafka 0.9. Because this version changed the
protocol, Kafka 0.9 consumers are not compatible with older brokers. Ensure that your Kafka brokers are
version 0.9 or later before using this service.
</div>

## Submitting a Supervisor Spec

The Kafka indexing service requires that the `kafka-indexing-service` extension be loaded on both the overlord and the
@@ -151,6 +157,15 @@ POST /druid/indexer/v1/supervisor
```
Use `Content-Type: application/json` and provide a supervisor spec in the request body.

Calling this endpoint when there is already an existing supervisor for the same dataSource will cause:

- The running supervisor to signal its managed tasks to stop reading and begin publishing.
- The running supervisor to exit.
- A new supervisor to be created using the configuration provided in the request body. This supervisor will retain the
existing publishing tasks and will create new tasks starting at the offsets the publishing tasks ended on.

Seamless schema migrations can thus be achieved by simply submitting the new schema using this endpoint.

#### Shutdown Supervisor
```
POST /druid/indexer/v1/supervisor/<supervisorId>/shutdown
@@ -239,11 +254,9 @@ return after all tasks have been signalled to stop but before the tasks finish p

### Schema/Configuration Changes

Following from the previous section, schema and configuration changes are managed by first shutting down the supervisor
with a call to the `POST /druid/indexer/v1/supervisor/<supervisorId>/shutdown` endpoint, waiting for the running tasks
to complete, and then submitting the updated schema via the `POST /druid/indexer/v1/supervisor` create supervisor
endpoint. The updated supervisor will begin reading from the offsets where the previous supervisor ended and no data
will be lost. If the updated schema is posted before the previously running tasks have completed, the supervisor will
detect that the tasks are no longer compatible with the new schema and will issue a shutdown command to the tasks which
may result in the current segments not being published. If this happens, the tasks based on the updated schema will
begin reading from the same starting offsets as the previous aborted tasks and no data will be lost.
Schema and configuration changes are handled by submitting the new supervisor spec via the same
`POST /druid/indexer/v1/supervisor` endpoint used to initially create the supervisor. The overlord initiates a
graceful shutdown of the existing supervisor, which causes the tasks managed by that supervisor to stop reading
and begin publishing their segments. A new supervisor is then started; it creates a new set of tasks that
start reading from the offsets where the previous, now-publishing tasks left off, but using the updated schema.
In this way, configuration changes can be applied without requiring any pause in ingestion.
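
As a rough illustration of the documented workflow, the following sketch builds the `POST /druid/indexer/v1/supervisor` request with the JDK's built-in HTTP client. The class name `SupervisorSpecRequests`, the base URL, and the spec JSON passed in are assumptions made for the example; only the endpoint path and the `Content-Type: application/json` header come from the documentation above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Druid: builds the request that creates a supervisor
// or, after this commit, replaces the existing one for the same dataSource.
final class SupervisorSpecRequests {
  static HttpRequest buildSubmitRequest(String overlordBaseUrl, String specJson) {
    return HttpRequest.newBuilder()
        // Endpoint path as documented; the base URL is supplied by the caller.
        .uri(URI.create(overlordBaseUrl + "/druid/indexer/v1/supervisor"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(specJson))
        .build();
  }
}
```

The request could then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. Submitting an updated spec for the same dataSource through this one call is what the new documentation describes as a seamless schema change.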
@@ -559,18 +559,15 @@ private void discoverTasks()
KafkaIndexTask kafkaTask = (KafkaIndexTask) task;
String taskId = task.getId();

// determine which task group this task belongs to and do a consistency check on partitions
Integer taskGroupId = null;
for (Integer partition : kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap().keySet()) {
if (taskGroupId == null) {
taskGroupId = getTaskGroupIdForPartition(partition);
} else if (!taskGroupId.equals(getTaskGroupIdForPartition(partition))) {
log.warn("Stopping task [%s] which does not match the expected partition allocation", taskId);
stopTask(taskId, false);
taskGroupId = null; // prevents the next block of code from adding the item to taskGroups
break;
}
}
// Determine which task group this task belongs to based on one of the partitions handled by this task. If we
// later determine that this task is actively reading, we will make sure that it matches our current partition
// allocation (getTaskGroupIdForPartition(partition) should return the same value for every partition being read
// by this task) and kill it if it is not compatible. If the task is instead found to be in the publishing
// state, we will permit it to complete even if it doesn't match our current partition allocation to support
// seamless schema migration.

Iterator<Integer> it = kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap().keySet().iterator();
Integer taskGroupId = (it.hasNext() ? getTaskGroupIdForPartition(it.next()) : null);

if (taskGroupId != null) {
// check to see if we already know about this task, either in [taskGroups] or in [pendingCompletionTaskGroups]
@@ -590,17 +587,29 @@ private void discoverTasks()
// update partitionGroups with the publishing task's offsets (if they are greater than what is existing)
// so that the next tasks will start reading from where this task left off
Map<Integer, Long> publishingTaskCurrentOffsets = getCurrentOffsets(taskId, true);
Map<Integer, Long> partitionOffsets = partitionGroups.get(taskGroupId);

for (Map.Entry<Integer, Long> entry : publishingTaskCurrentOffsets.entrySet()) {
Integer partition = entry.getKey();
Long offset = entry.getValue();
Map<Integer, Long> partitionOffsets = partitionGroups.get(getTaskGroupIdForPartition(partition));
if (partitionOffsets.get(partition) == null || partitionOffsets.get(partition) < offset) {
partitionOffsets.put(partition, offset);
}
}

} else {
for (Integer partition : kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap().keySet()) {
if (!taskGroupId.equals(getTaskGroupIdForPartition(partition))) {
log.warn("Stopping task [%s] which does not match the expected partition allocation", taskId);
stopTask(taskId, false);
taskGroupId = null;
break;
}
}

if (taskGroupId == null) {
continue;
}

if (!taskGroups.containsKey(taskGroupId)) {
log.debug("Creating new task group [%d]", taskGroupId);
taskGroups.put(
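
The two `discoverTasks()` hunks above can be read as two small rules: a reading task must have all of its partitions in one task group, while a publishing task only contributes its offsets, each recorded under the group that owns that partition. The sketch below is a standalone approximation, not the supervisor's code. The class `DiscoverTasksSketch`, the modulo-based `groupFor` (standing in for `getTaskGroupIdForPartition`, whose body is not shown in this diff), and the use of `computeIfAbsent` are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class DiscoverTasksSketch {
  private final int taskCount;

  // task group id -> (partition -> next offset to read)
  final Map<Integer, Map<Integer, Long>> partitionGroups = new HashMap<>();

  DiscoverTasksSketch(int taskCount) {
    this.taskCount = taskCount;
  }

  // ASSUMPTION: partitions are assigned to groups by modulo; the real mapping is not in the diff.
  int groupFor(int partition) {
    return partition % taskCount;
  }

  // Rule for reading tasks: every partition must map to the same task group.
  boolean matchesAllocation(Set<Integer> partitions) {
    return partitions.stream().map(this::groupFor).distinct().count() <= 1;
  }

  // Rule for publishing tasks: record each offset under the group that owns the partition,
  // keeping the larger value so the next task resumes where the publishing task stopped.
  void mergePublishingOffsets(Map<Integer, Long> publishingOffsets) {
    publishingOffsets.forEach((partition, offset) ->
        partitionGroups
            .computeIfAbsent(groupFor(partition), g -> new HashMap<>())
            .merge(partition, offset, Long::max));
  }
}
```

Looking up the group per partition, rather than once per task, is what lets a publishing task whose partitions span several groups under the new allocation still hand its offsets to the right successors.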
@@ -966,6 +966,94 @@ public void testDiscoverExistingPublishingTask() throws Exception
Assert.assertEquals(Long.MAX_VALUE, (long) capturedTaskConfig.getEndPartitions().getPartitionOffsetMap().get(2));
}

@Test
public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() throws Exception
{
final TaskLocation location = new TaskLocation("testHost", 1234);

supervisor = getSupervisor(1, 1, true, "PT1H", null);
addSomeEvents(1);

Task task = createKafkaIndexTask(
"id1",
DATASOURCE,
"sequenceName-0",
new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
null
);

Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(task.getId(), null, location));

Capture<KafkaIndexTask> captured = Capture.newInstance();
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(task)).anyTimes();
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
expect(taskClient.getStatus("id1")).andReturn(KafkaIndexTask.Status.PUBLISHING);
expect(taskClient.getCurrentOffsets("id1", false)).andReturn(ImmutableMap.of(0, 10L, 2, 30L));
expect(taskClient.getCurrentOffsets("id1", true)).andReturn(ImmutableMap.of(0, 10L, 2, 30L));
expect(taskQueue.add(capture(captured))).andReturn(true);

taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();

supervisor.start();
supervisor.runInternal();
SupervisorReport report = supervisor.getStatus();
verifyAll();

Assert.assertEquals(DATASOURCE, report.getId());
Assert.assertTrue(report.getPayload() instanceof KafkaSupervisorReport.KafkaSupervisorReportPayload);

KafkaSupervisorReport.KafkaSupervisorReportPayload payload = (KafkaSupervisorReport.KafkaSupervisorReportPayload)
report.getPayload();

Assert.assertEquals(DATASOURCE, payload.getDataSource());
Assert.assertEquals(3600L, (long) payload.getDurationSeconds());
Assert.assertEquals(NUM_PARTITIONS, (int) payload.getPartitions());
Assert.assertEquals(1, (int) payload.getReplicas());
Assert.assertEquals(KAFKA_TOPIC, payload.getTopic());
Assert.assertEquals(0, payload.getActiveTasks().size());
Assert.assertEquals(1, payload.getPublishingTasks().size());

KafkaSupervisorReport.TaskReportData publishingReport = payload.getPublishingTasks().get(0);

Assert.assertEquals("id1", publishingReport.getId());
Assert.assertEquals(ImmutableMap.of(0, 0L, 2, 0L), publishingReport.getStartingOffsets());
Assert.assertEquals(ImmutableMap.of(0, 10L, 2, 30L), publishingReport.getCurrentOffsets());

KafkaIndexTask capturedTask = captured.getValue();
Assert.assertEquals(dataSchema, capturedTask.getDataSchema());
Assert.assertEquals(tuningConfig, capturedTask.getTuningConfig());

KafkaIOConfig capturedTaskConfig = capturedTask.getIOConfig();
Assert.assertEquals(kafkaHost, capturedTaskConfig.getConsumerProperties().get("bootstrap.servers"));
Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey"));
Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction());
Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead());

// check that the new task was created with starting offsets matching where the publishing task finished
Assert.assertEquals(KAFKA_TOPIC, capturedTaskConfig.getStartPartitions().getTopic());
Assert.assertEquals(10L, (long) capturedTaskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
Assert.assertEquals(0L, (long) capturedTaskConfig.getStartPartitions().getPartitionOffsetMap().get(1));
Assert.assertEquals(30L, (long) capturedTaskConfig.getStartPartitions().getPartitionOffsetMap().get(2));

Assert.assertEquals(KAFKA_TOPIC, capturedTaskConfig.getEndPartitions().getTopic());
Assert.assertEquals(Long.MAX_VALUE, (long) capturedTaskConfig.getEndPartitions().getPartitionOffsetMap().get(0));
Assert.assertEquals(Long.MAX_VALUE, (long) capturedTaskConfig.getEndPartitions().getPartitionOffsetMap().get(1));
Assert.assertEquals(Long.MAX_VALUE, (long) capturedTaskConfig.getEndPartitions().getPartitionOffsetMap().get(2));
}

@Test
public void testDiscoverExistingPublishingAndReadingTask() throws Exception
{
@@ -74,11 +74,13 @@ public boolean createAndStartSupervisor(SupervisorSpec spec)
return createAndStartSupervisorInternal(spec, true);
}

public void stopAndRemoveSupervisor(String id)
public void stopAndRemoveSupervisor(String id, boolean writeTombstone)
{
Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
if (pair != null) {
metadataSupervisorManager.insert(id, new NoopSupervisorSpec()); // where NoopSupervisorSpec is a tombstone
if (writeTombstone) {
metadataSupervisorManager.insert(id, new NoopSupervisorSpec()); // where NoopSupervisorSpec is a tombstone
}
pair.lhs.stop(true);
supervisors.remove(id);
}
@@ -125,12 +127,14 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
{
String id = spec.getId();
if (!supervisors.containsKey(id)) {
Supervisor supervisor = spec.createSupervisor();
supervisor.start(); // try starting the supervisor first so we don't persist a bad spec

if (persistSpec) {
metadataSupervisorManager.insert(id, spec);
}

supervisors.put(id, Pair.of(spec.createSupervisor(), spec));
supervisors.get(id).lhs.start();
supervisors.put(id, Pair.of(supervisor, spec));
return true;
}
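
The reordering in `createAndStartSupervisorInternal` (start first, persist second) can be illustrated in isolation. This is a minimal sketch under stated assumptions: `StartBeforePersistSketch`, its nested `Supervisor` interface, and the in-memory `persistedIds` list are hypothetical stand-ins for the manager, the real supervisor type, and `metadataSupervisorManager`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StartBeforePersistSketch {
  /** Hypothetical stand-in for a supervisor; only start() matters here. */
  interface Supervisor {
    void start();
  }

  // In-memory stand-ins for the metadata store and the map of running supervisors.
  final List<String> persistedIds = new ArrayList<>();
  final Map<String, Supervisor> running = new HashMap<>();

  boolean createAndStart(String id, Supervisor supervisor, boolean persistSpec) {
    if (running.containsKey(id)) {
      return false;
    }
    // Start first: if this throws, nothing has been persisted or registered yet.
    supervisor.start();
    if (persistSpec) {
      persistedIds.add(id);
    }
    running.put(id, supervisor);
    return true;
  }
}
```

With this ordering, a spec whose supervisor fails to start propagates the exception to the caller and leaves both the store and the running map untouched, which matches the intent stated in the diff's inline comment.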

@@ -63,13 +63,7 @@ public Response specPost(final SupervisorSpec spec)
public Response apply(SupervisorManager manager)
{
if (manager.hasSupervisor(spec.getId())) {
return Response.status(Response.Status.CONFLICT)
.entity(
ImmutableMap.of(
"error",
String.format("Supervisor already exists for [%s]", spec.getId())
)
).build();
manager.stopAndRemoveSupervisor(spec.getId(), false);
}

manager.createAndStartSupervisor(spec);
@@ -160,7 +154,7 @@ public Response apply(SupervisorManager manager)
.build();
}

manager.stopAndRemoveSupervisor(id);
manager.stopAndRemoveSupervisor(id, true);
return Response.ok(ImmutableMap.of("id", id)).build();
}
}
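
Taken together, the resource changes mean that a POST for an existing id now replaces the supervisor instead of returning a conflict, and only an explicit shutdown writes a tombstone. The sketch below models that flow with plain collections. `ReplaceOnPostSketch`, the string-valued specs, and the `metadataLog` list are illustrative assumptions. The reason for skipping the tombstone on replacement is not stated in the commit; presumably it is unnecessary because the new spec is persisted immediately afterwards.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReplaceOnPostSketch {
  static final String TOMBSTONE = "NOOP"; // stands in for NoopSupervisorSpec

  final Map<String, String> running = new HashMap<>(); // id -> spec currently running
  final List<String> metadataLog = new ArrayList<>();  // hypothetical persisted spec history

  void stopAndRemove(String id, boolean writeTombstone) {
    if (running.containsKey(id)) {
      if (writeTombstone) {
        metadataLog.add(id + ":" + TOMBSTONE);
      }
      running.remove(id);
    }
  }

  // Mirrors the new specPost flow: replace silently instead of rejecting with a conflict.
  void post(String id, String spec) {
    if (running.containsKey(id)) {
      stopAndRemove(id, false); // no tombstone: the new spec is persisted right below
    }
    metadataLog.add(id + ":" + spec);
    running.put(id, spec);
  }

  // Mirrors the shutdown endpoint: an explicit stop records a tombstone.
  void shutdown(String id) {
    stopAndRemove(id, true);
  }
}
```

In this model, posting `v1` and then `v2` for the same id leaves a history of the two specs with no tombstone between them, while a later `shutdown` appends one.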