KAFKA-2811: add standby tasks
* added a new config param "num.standby.replicas" (the default value is 0).
* added a new abstract class AbstractTask.
* added StandbyTask as a subclass of AbstractTask.
* modified StreamTask to be a subclass of AbstractTask.
* StreamThread
  * standby tasks are created by calling StreamThread.addStandbyTasks() from onPartitionsAssigned().
  * standby tasks are destroyed by calling StreamThread.removeStandbyTasks() from onPartitionsRevoked().
  * In addStandbyTasks(), change log partitions are assigned to restoreConsumer.
  * In removeStandbyTasks(), change log partitions are removed from restoreConsumer.
  * StreamThread polls change log records using restoreConsumer in the runLoop with timeout=0, as sketched below.
  * If records are returned, StreamThread calls StandbyTask.update and passes the records to each standby task.
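A minimal sketch of that polling step, assuming a hypothetical standbyTasksByPartition map and an update(partition, records) signature (StreamThread's actual run loop and StandbyTask are not shown in this excerpt):

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

class StandbyPollSketch {
    // Drain any pending changelog records and hand them to their standby tasks.
    static void pollStandbyRecords(Consumer<byte[], byte[]> restoreConsumer,
                                   Map<TopicPartition, StandbyTask> standbyTasksByPartition) {
        // timeout=0 so the changelog poll never blocks the main processing loop
        ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
        for (TopicPartition partition : records.partitions()) {
            StandbyTask task = standbyTasksByPartition.get(partition);
            if (task != null)
                task.update(partition, records.records(partition));
        }
    }
}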

Author: Yasuhiro Matsuda <[email protected]>

Reviewers: Guozhang Wang

Closes apache#526 from ymatsuda/standby_task
Yasuhiro Matsuda authored and guozhangwang committed Nov 16, 2015
1 parent 356544c commit 45e7f71
Showing 12 changed files with 951 additions and 181 deletions.
StreamingConfig.java
@@ -54,6 +54,10 @@ public class StreamingConfig extends AbstractConfig {
    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";

    /** <code>num.standby.replicas</code> */
    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
    private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";

    /** <code>buffered.records.per.partition</code> */
    public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
    private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
@@ -136,6 +140,11 @@ public class StreamingConfig extends AbstractConfig {
                        1,
                        Importance.LOW,
                        NUM_STREAM_THREADS_DOC)
                .define(NUM_STANDBY_REPLICAS_CONFIG,
                        Type.INT,
                        0,
                        Importance.LOW,
                        NUM_STANDBY_REPLICAS_DOC)
                .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                        Type.INT,
                        1000,
@@ -214,6 +223,7 @@ public StreamingConfig(Map<?, ?> props) {

    public Map<String, Object> getConsumerConfigs(StreamThread streamThread) {
        Map<String, Object> props = getConsumerConfigs();
        props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
        props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
        return props;
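As a usage sketch, enabling one standby replica per task looks like the following (a minimal example; a real configuration would also carry the other required StreamingConfig entries, which are omitted here):

import java.util.Properties;

import org.apache.kafka.streams.StreamingConfig;

public class StandbyConfigExample {
    public static void main(String[] args) {
        // One standby replica per task; other required settings omitted in this sketch.
        Properties props = new Properties();
        props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        StreamingConfig config = new StreamingConfig(props);
        System.out.println(config.getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
    }
}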
AbstractTask.java (new file)
@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;

public abstract class AbstractTask {
    protected final TaskId id;
    protected final ProcessorTopology topology;
    protected final ProcessorStateManager stateMgr;
    protected final Set<TopicPartition> partitions;
    protected ProcessorContext processorContext;

    protected AbstractTask(TaskId id,
                           Consumer<byte[], byte[]> restoreConsumer,
                           ProcessorTopology topology,
                           StreamingConfig config,
                           Set<TopicPartition> partitions) {
        this.id = id;
        this.topology = topology;
        this.partitions = partitions;

        // create the processor state manager
        try {
            File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
            // if partitions is null, this is a standby task
            this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, partitions == null);
        } catch (IOException e) {
            throw new KafkaException("Error while creating the state manager", e);
        }
    }

    protected void initializeStateStores() {
        for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
            StateStore store = stateStoreSupplier.get();
            store.init(this.processorContext);
        }
    }

    public final TaskId id() {
        return id;
    }

    public final Set<TopicPartition> partitions() {
        return this.partitions;
    }

    public final ProcessorTopology topology() {
        return topology;
    }

    public final ProcessorContext context() {
        return processorContext;
    }

    public abstract void commit();

    public void close() {
        try {
            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
        } catch (IOException e) {
            throw new KafkaException("Error while closing the state manager in processor context", e);
        }
    }

}
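StandbyTask itself is among the changed files but not shown in this excerpt. A hypothetical sketch of how a standby task can build on AbstractTask, exploiting the partitions == null convention noted in the constructor above (the class name and the update signature here are assumptions, not the commit's actual code):

import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;

public class StandbyTaskSketch extends AbstractTask {

    protected StandbyTaskSketch(TaskId id,
                                Consumer<byte[], byte[]> restoreConsumer,
                                ProcessorTopology topology,
                                StreamingConfig config) {
        // passing partitions == null marks this task as standby in ProcessorStateManager
        super(id, restoreConsumer, topology, config, null);
    }

    // apply polled changelog records to the local state store copies
    public void update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
        stateMgr.updateStandbyStates(partition, records);
    }

    @Override
    public void commit() {
        // standby tasks produce nothing; they only keep their state copies up to date
    }
}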
KafkaStreamingPartitionAssignor.java
@@ -46,11 +46,14 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Configurable {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);

    private StreamThread streamThread;
    private int numStandbyReplicas;
    private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
    private Set<TaskId> standbyTasks;

    @Override
    public void configure(Map<String, ?> configs) {
        numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);

        Object o = configs.get(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE);
        if (o == null) {
            KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -99,7 +102,6 @@ public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        //    - We try not to assign the same set of tasks to two different clients.
        //    We do the assignment in one pass; the result may not satisfy all of the above.
        // 2. within each client, tasks are assigned to consumer clients in round-robin manner.
        Map<UUID, Set<String>> consumersByClient = new HashMap<>();
        Map<UUID, ClientState<TaskId>> states = new HashMap<>();
@@ -132,7 +134,7 @@ public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        // Get partition groups from the partition grouper
        Map<TaskId, Set<TopicPartition>> partitionGroups = streamThread.partitionGrouper.partitionGroups(metadata);

        states = TaskAssignor.assign(states, partitionGroups.keySet(), numStandbyReplicas);
        Map<String, Assignment> assignment = new HashMap<>();

        for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
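The change above turns on standby placement by passing numStandbyReplicas through to TaskAssignor.assign. As a conceptual sketch only (TaskAssignor's actual algorithm is not shown in this diff), "N standby replicas" means each task gets one active owner plus up to N distinct standby owners:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class StandbyAssignmentSketch {
    // For each task, the first client in the returned list is the active owner;
    // the rest hold standby replicas. Clients are chosen round-robin and are distinct.
    public static Map<String, List<UUID>> assign(List<String> tasks, List<UUID> clients, int numStandbyReplicas) {
        Map<String, List<UUID>> owners = new HashMap<>();
        for (int i = 0; i < tasks.size(); i++) {
            // never ask for more copies than there are clients
            int copies = Math.min(1 + numStandbyReplicas, clients.size());
            List<UUID> replicas = new ArrayList<>();
            for (int r = 0; r < copies; r++)
                replicas.add(clients.get((i + r) % clients.size()));
            owners.put(tasks.get(i), replicas);
        }
        return owners;
    }
}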
ProcessorStateManager.java
@@ -35,6 +35,7 @@
import java.nio.channels.OverlappingFileLockException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProcessorStateManager {
@@ -51,13 +52,17 @@ public class ProcessorStateManager {
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final Map<TopicPartition, Long> restoredOffsets;
    private final Map<TopicPartition, Long> checkpointedOffsets;
    private final boolean isStandby;
    private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks

    public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
        this.partition = partition;
        this.baseDir = baseDir;
        this.stores = new HashMap<>();
        this.restoreConsumer = restoreConsumer;
        this.restoredOffsets = new HashMap<>();
        this.isStandby = isStandby;
        this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;

        // create the state directory for this task if missing (we won't create the parent directory)
        createStateDirectory(baseDir);
@@ -103,8 +108,6 @@ public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
        if (this.stores.containsKey(store.name()))
            throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");

        // check whether the underlying change log topic exists or not
        if (restoreConsumer.listTopics().containsKey(store.name())) {
            boolean partitionNotFound = true;
@@ -124,48 +127,91 @@

        this.stores.put(store.name(), store);

        if (isStandby) {
            if (store.persistent())
                restoreCallbacks.put(store.name(), stateRestoreCallback);
        } else {
            restoreActiveState(store, stateRestoreCallback);
        }
    }

    private void restoreActiveState(StateStore store, StateRestoreCallback stateRestoreCallback) {

        if (store == null)
            throw new IllegalArgumentException("Store must be registered before its state can be restored.");

        // ---- try to restore the state from change-log ---- //

        // subscribe to the store's partition
        if (!restoreConsumer.subscription().isEmpty()) {
            throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
        }
        TopicPartition storePartition = new TopicPartition(store.name(), partition);
        restoreConsumer.assign(Collections.singletonList(storePartition));

        try {
            // calculate the end offset of the partition
            // TODO: this is a bit hacky to first seek then position to get the end offset
            restoreConsumer.seekToEnd(storePartition);
            long endOffset = restoreConsumer.position(storePartition);

            // restore from the checkpointed offset of the change log if it is persistent and the offset exists;
            // restore the state from the beginning of the change log otherwise
            if (checkpointedOffsets.containsKey(storePartition) && store.persistent()) {
                restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
            } else {
                restoreConsumer.seekToBeginning(storePartition);
            }

            // restore its state from changelog records; while restoring the log end offset
            // should not change since it is only written by this thread.
            while (true) {
                for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
                    stateRestoreCallback.restore(record.key(), record.value());
                }

                if (restoreConsumer.position(storePartition) == endOffset) {
                    break;
                } else if (restoreConsumer.position(storePartition) > endOffset) {
                    throw new IllegalStateException("Log end offset should not change while restoring");
                }
            }

            // record the restored offset for its change log partition
            long newOffset = restoreConsumer.position(storePartition);
            restoredOffsets.put(storePartition, newOffset);
        } finally {
            // un-assign the change log partition
            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
        }
    }

    public Map<TopicPartition, Long> checkpointedOffsets() {
        Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();

        for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
            String storeName = entry.getKey();
            TopicPartition storePartition = new TopicPartition(storeName, partition);

            if (checkpointedOffsets.containsKey(storePartition)) {
                partitionsAndOffsets.put(storePartition, checkpointedOffsets.get(storePartition));
            } else {
                partitionsAndOffsets.put(storePartition, -1L);
            }
        }
        return partitionsAndOffsets;
    }

    public void updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
        // restore states from changelog records
        StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());

        for (ConsumerRecord<byte[], byte[]> record : records) {
            restoreCallback.restore(record.key(), record.value());
        }

        // record the restored offset for its change log partition
        long newOffset = restoreConsumer.position(storePartition);
        restoredOffsets.put(storePartition, newOffset);
    }

    public StateStore getStore(String name) {
@@ -224,6 +270,9 @@ public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
                checkpoint.write(checkpointOffsets);
            }

            // un-assign the change log partition
            restoreConsumer.assign(Collections.<TopicPartition>emptyList());

            // release the state directory lock
            directoryLock.release();
        }
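Putting the standby pieces together: a hypothetical caller (the commit's StreamThread.addStandbyTasks is not shown in this excerpt) could seed the restore consumer from checkpointedOffsets(), where -1L marks a partition with no checkpoint, before feeding polled records into updateStandbyStates():

import java.util.ArrayList;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class StandbySeedingSketch {
    // Assign a standby task's changelog partitions and position the consumer:
    // resume from the checkpointed offset when one exists, else from the beginning.
    static void seed(Consumer<byte[], byte[]> restoreConsumer, Map<TopicPartition, Long> checkpointed) {
        restoreConsumer.assign(new ArrayList<>(checkpointed.keySet()));
        for (Map.Entry<TopicPartition, Long> entry : checkpointed.entrySet()) {
            if (entry.getValue() >= 0)
                restoreConsumer.seek(entry.getKey(), entry.getValue());
            else
                restoreConsumer.seekToBeginning(entry.getKey()); // -1L: no checkpoint yet
        }
    }
}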
(The remaining 8 changed files in this commit are not shown.)