Skip to content

Commit

Permalink
SAMZA-2303: Exclude side inputs when handling end-of-stream and water…
Browse files Browse the repository at this point in the history
…marks for high-level (apache#1506)

Symptom: End-of-stream and watermarks are not properly propagated through Samza when side inputs are used.
This prevents many tests from using the TestRunner framework, since the TestRunner framework relies on having tasks shut themselves down based on end-of-stream messages. Being able to use TestRunner is helpful because it significantly decreases test times.

Cause: OperatorImplGraph builds EndOfStreamStates and WatermarkStates objects with all of the input SSPs from the job model. That includes side-input SSPs. However, high-level operator tasks aren't given messages from side-input SSPs, so high-level operators should not need to include handling for end-of-stream and watermarks. The result of this issue is that end-of-stream and watermark handling tries to include side-inputs but never updates those states, which can result in not exiting properly (end-of-stream) and not correctly calculating watermarks.

Changes:
1. Pass set of SSPs excluding side-inputs to high-level operators so that they don't read directly from the task model which does have side-inputs. High-level operators will then handle end-of-stream and watermark propagation without considering side-input SSPs.
2. Change InMemoryManager to only use IncomingMessageEnvelope.END_OF_STREAM_OFFSET when the taskName in the EndOfStreamMessage is null. This prevents the issue with SAMZA-2300 which causes end-of-stream messages to not get properly get aggregated and then broadcast to all partitions (see SAMZA-2300 for more details). Some existing tests would fail without this change.
3. Add unique app.id in TestRunner for each test. This helps prevents clashes between different tests. For example, ControlMessageSender has a static cache keyed by stream id of intermediate streams, and multiple tests could end up using the same key in that cache. By using a unique app id, the intermediate streams are unique, so multiple tests won't use the same key in the cache.

API changes (impacts testing framework only):
1. The default app.id used for tests executed by TestRunner is set to the "in-memory scope", which is a string that is randomly generated for each test. Before this change, the app.id was not set.
2. InMemoryManager only uses IncomingMessageEnvelope.END_OF_STREAM_OFFSET when the EndOfStreamMessage has a null taskName. Before this change, InMemoryManager used IncomingMessageEnvelope.END_OF_STREAM_OFFSET for all EndOfStreamMessages.

Upgrade/usage instructions:
1. If tests are written using TestRunner, and those tests rely on app.id being unset, then those will need to be updated to use/read the new app.id. It isn't expected to be a common use case that tests rely on app.id.
2. If the in-memory system is being used (which includes tests written using TestRunner), and it is expected that the in-memory system sets END_OF_STREAM_OFFSET for messages when the taskName is non-null, then that usage will need to be removed. The taskName is intended for use by intermediate streams, so it shouldn't be used outside of Samza internals anyways.
  • Loading branch information
cameronlee314 authored Jul 1, 2021
1 parent 337628e commit 4c11067
Show file tree
Hide file tree
Showing 12 changed files with 393 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,24 @@

import org.apache.samza.job.model.JobModel;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStreamPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class InternalTaskContext {

/**
* This class is used for passing objects around for the implementation of the high-level API.
* 1) Container for objects that need to be passed between different components in the implementation of the high-level
* API.
* 2) The implementation of the high-level API is built on top of the low-level API. The low-level API only exposes
* {@link TaskContext}, but the implementation of the high-level API needs some other internal Samza components (e.g.
* {@link StreamMetadataCache}. We internally make these components available through {@link TaskContextImpl} so that we
* can do a cast to access the components. This class hides some of the messiness of casting. It's still not ideal to
* need to do any casting, even in this class.
*/
public class InternalTaskContext {
private final Context context;
private final Map<String, Object> objectRegistry = new HashMap<>();

Expand All @@ -46,11 +58,22 @@ public Context getContext() {
return context;
}

/**
* TODO: The public {@link JobContext} exposes {@link JobModel} now, so can this internal method be replaced by the
* public API?
*/
public JobModel getJobModel() {
return ((TaskContextImpl) this.context.getTaskContext()).getJobModel();
}

public StreamMetadataCache getStreamMetadataCache() {
return ((TaskContextImpl) this.context.getTaskContext()).getStreamMetadataCache();
}

/**
* See {@link TaskContextImpl#getSspsExcludingSideInputs()}.
*/
public Set<SystemStreamPartition> getSspsExcludingSideInputs() {
return ((TaskContextImpl) this.context.getTaskContext()).getSspsExcludingSideInputs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,29 @@
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.TableManager;

import java.util.Set;
import java.util.function.Function;


/**
* This class provides the implementation for the public {@link TaskContext} interface.
* It also allows us to pass certain internal Samza components around so that the implementation of the high-level API
* can use them (see InternalTaskContext for some more details).
*/
public class TaskContextImpl implements TaskContext {
private final TaskModel taskModel;
private final MetricsRegistry taskMetricsRegistry;
private final Function<String, KeyValueStore> keyValueStoreProvider;
private final TableManager tableManager;
private final CallbackScheduler callbackScheduler;
private final OffsetManager offsetManager;

// The instance variables below are not used for implementing any public API methods. They are here so that we can
// pass some internal components over to the implementation of the high-level API. See InternalTaskContext.

private final JobModel jobModel;
private final StreamMetadataCache streamMetadataCache;
private final Set<SystemStreamPartition> sspsExcludingSideInputs;

public TaskContextImpl(TaskModel taskModel,
MetricsRegistry taskMetricsRegistry,
Expand All @@ -49,7 +60,8 @@ public TaskContextImpl(TaskModel taskModel,
CallbackScheduler callbackScheduler,
OffsetManager offsetManager,
JobModel jobModel,
StreamMetadataCache streamMetadataCache) {
StreamMetadataCache streamMetadataCache,
Set<SystemStreamPartition> sspsExcludingSideInputs) {
this.taskModel = taskModel;
this.taskMetricsRegistry = taskMetricsRegistry;
this.keyValueStoreProvider = keyValueStoreProvider;
Expand All @@ -58,6 +70,7 @@ public TaskContextImpl(TaskModel taskModel,
this.offsetManager = offsetManager;
this.jobModel = jobModel;
this.streamMetadataCache = streamMetadataCache;
this.sspsExcludingSideInputs = sspsExcludingSideInputs;
}

@Override
Expand Down Expand Up @@ -101,4 +114,14 @@ public JobModel getJobModel() {
public StreamMetadataCache getStreamMetadataCache() {
return this.streamMetadataCache;
}
}

/**
* Returns the {@link SystemStreamPartition}s excluding the side-input SSPs. For the high-level API, watermarks and
* end-of-stream messages are propagated based on their input SSPs. However, the Samza framework does not give side
* input messages to the high-level operator tasks. Therefore, the operators need to know the input SSPs excluding the
* side input SSPs. See SAMZA-2303 for more details.
*/
public Set<SystemStreamPartition> getSspsExcludingSideInputs() {
return this.sspsExcludingSideInputs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,13 @@ public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clo
LOG.info("{} has {} producer tasks.", stream, count);
});

// set states for end-of-stream
// set states for end-of-stream; don't include side inputs (see SAMZA-2303)
internalTaskContext.registerObject(EndOfStreamStates.class.getName(),
new EndOfStreamStates(
internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
producerTaskCounts));
// set states for watermark
new EndOfStreamStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts));
// set states for watermark; don't include side inputs (see SAMZA-2303)
internalTaskContext.registerObject(WatermarkStates.class.getName(),
new WatermarkStates(
internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
producerTaskCounts,
context.getContainerContext().getContainerMetricsRegistry()));
new WatermarkStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts,
context.getContainerContext().getContainerMetricsRegistry()));

specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> {
SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void put(SystemStreamPartition ssp, Object key, Object message) {
List<IncomingMessageEnvelope> messages = bufferedMessages.get(ssp);
String offset = String.valueOf(messages.size());

if (message instanceof EndOfStreamMessage) {
if (shouldUseEndOfStreamOffset(message)) {
offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET;
}

Expand Down Expand Up @@ -224,4 +224,22 @@ private List<IncomingMessageEnvelope> poll(SystemStreamPartition ssp, String off

return ImmutableList.copyOf(messageEnvelopesForSSP.subList(startingOffset, messageEnvelopesForSSP.size()));
}

/**
* We don't always want to use {@link IncomingMessageEnvelope#END_OF_STREAM_OFFSET} for all
* {@link EndOfStreamMessage}s. Certain control message flows (e.g. end-of-stream) have an aggregation partition,
* which needs to listen for messages from all other partitions. These aggregation messages are marked by the task
* name being non-null. If we use {@link IncomingMessageEnvelope#END_OF_STREAM_OFFSET} for the aggregation messages,
* then the aggregation partition would stop listening once it got the message from one of the tasks, but that means
* it would miss the aggregation messages from all other tasks. See SAMZA-2300 for more details.
* One other note: If there is a serializer set for the stream, then by the time the message gets to this check, it
* will be a byte array, so this check will not return true, even if the deserialized message was an
* {@link EndOfStreamMessage}. So far this isn't a problem, because we only really need this to return true for
* input streams (not intermediate streams), and in-memory input stream data doesn't get serialized. For intermediate
* streams, we don't need END_OF_STREAM_OFFSET to be used since the high-level operators take care of end-of-stream
* messages based on MessageType.
*/
private static boolean shouldUseEndOfStreamOffset(Object message) {
return (message instanceof EndOfStreamMessage) && ((EndOfStreamMessage) message).getTaskName() == null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class TaskInstance(
}
})
private val taskContext = new TaskContextImpl(taskModel, metrics.registry, kvStoreSupplier, tableManager,
new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache)
new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache, systemStreamPartitions)
// need separate field for this instead of using it through Context, since Context throws an exception if it is null
private val applicationTaskContextOption = applicationTaskContextFactoryOption
.map(_.create(externalContextOption.orNull, jobContext, containerContext, taskContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void setup() {
MockitoAnnotations.initMocks(this);
taskContext =
new TaskContextImpl(taskModel, taskMetricsRegistry, keyValueStoreProvider, tableManager, callbackScheduler,
offsetManager, null, null);
offsetManager, null, null, null);
when(this.taskModel.getTaskName()).thenReturn(TASK_NAME);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.samza.container.TaskName;
import org.apache.samza.context.Context;
import org.apache.samza.context.MockContext;
import org.apache.samza.context.TaskContextImpl;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
import org.apache.samza.job.model.TaskModel;
Expand Down Expand Up @@ -93,11 +94,13 @@ public void setup() {
Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
Serde storeValSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());

SystemStreamPartition ssp = new SystemStreamPartition("kafka", "integers", new Partition(0));
TaskModel taskModel = mock(TaskModel.class);
when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet
.of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet.of(ssp));
when(taskModel.getTaskName()).thenReturn(new TaskName("task 1"));
when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
when(((TaskContextImpl) this.context.getTaskContext()).getSspsExcludingSideInputs()).thenReturn(
ImmutableSet.of(ssp));
when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap());
when(this.context.getTaskContext().getStore("jobName-jobId-window-w1"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testConsumerRespectsOffset() {
}

@Test
public void testEndOfStreamMessage() {
public void testEndOfStreamMessageWithTask() {
EndOfStreamMessage eos = new EndOfStreamMessage("test-task");

produceMessages(eos);
Expand All @@ -139,6 +139,24 @@ public void testEndOfStreamMessage() {
List<IncomingMessageEnvelope> results = consumeRawMessages(sspsToPoll);

assertEquals(1, results.size());
assertEquals("test-task", ((EndOfStreamMessage) results.get(0).getMessage()).getTaskName());
assertFalse(results.get(0).isEndOfStream());
}

@Test
public void testEndOfStreamMessageWithoutTask() {
EndOfStreamMessage eos = new EndOfStreamMessage();

produceMessages(eos);

Set<SystemStreamPartition> sspsToPoll = IntStream.range(0, PARTITION_COUNT)
.mapToObj(partition -> new SystemStreamPartition(SYSTEM_STREAM, new Partition(partition)))
.collect(Collectors.toSet());

List<IncomingMessageEnvelope> results = consumeRawMessages(sspsToPoll);

assertEquals(1, results.size());
assertNull(((EndOfStreamMessage) results.get(0).getMessage()).getTaskName());
assertTrue(results.get(0).isEndOfStream());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ private TestRunner() {
this.configs = new HashMap<>();
this.inMemoryScope = RandomStringUtils.random(10, true, true);
configs.put(ApplicationConfig.APP_NAME, APP_NAME);
/*
* Use a unique app id to help make sure a test execution is isolated from others.
* A concrete example of where this helps is to avoid an issue with ControlMessageSender. It has a static cache
* keyed by stream id to save partition counts for intermediate streams. This means that different tests can
* collide in this cache if they use the same intermediate stream names. Having a unique app id makes the
* intermediate streams unique across tests.
*/
configs.put(ApplicationConfig.APP_ID, this.inMemoryScope);
configs.put(JobConfig.PROCESSOR_ID, "1");
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(JobConfig.STARTPOINT_METADATA_STORE_FACTORY, InMemoryMetadataStoreFactory.class.getCanonicalName());
Expand Down
Loading

0 comments on commit 4c11067

Please sign in to comment.