Skip to content

Commit

Permalink
SAMZA-123: Move topic partition grouping to the AM and generalize
Browse files Browse the repository at this point in the history
  • Loading branch information
jghoman committed Jul 28, 2014
1 parent 7cecf0a commit da79b6f
Show file tree
Hide file tree
Showing 65 changed files with 2,335 additions and 857 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ build
samza-test/state
docs/learn/documentation/0.7.0/api/javadocs
.DS_Store
out/
*.patch
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ project(":samza-kafka_$scalaVersion") {
compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion"
compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion"
compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion"
testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
Expand Down
10 changes: 8 additions & 2 deletions docs/learn/documentation/0.7.0/container/samza-container.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,21 @@ public interface InitableTask {
}
{% endhighlight %}

How many instances of your task class are created depends on the number of partitions in the job's input streams. If your Samza job has ten partitions, there will be ten instantiations of your task class: one for each partition. The first task instance will receive all messages for partition one, the second instance will receive all messages for partition two, and so on.
By default, how many instances of your task class are created depends on the number of partitions in the job's input streams. If your Samza job has ten partitions, there will be ten instantiations of your task class: one for each partition. The first task instance will receive all messages for partition one, the second instance will receive all messages for partition two, and so on.

<img src="/img/0.7.0/learn/documentation/container/tasks-and-partitions.svg" alt="Illustration of tasks consuming partitions" class="diagram-large">

The number of partitions in the input streams is determined by the systems from which you are consuming. For example, if your input system is Kafka, you can specify the number of partitions when you create a topic from the command line or using the num.partitions in Kafka's server properties file.

If a Samza job has more than one input stream, the number of task instances for the Samza job is the maximum number of partitions across all input streams. For example, if a Samza job is reading from PageViewEvent (12 partitions), and ServiceMetricEvent (14 partitions), then the Samza job would have 14 task instances (numbered 0 through 13). Task instances 12 and 13 only receive events from ServiceMetricEvent, because there is no corresponding PageViewEvent partition.

There is [work underway](https://issues.apache.org/jira/browse/SAMZA-71) to make the assignment of partitions to tasks more flexible in future versions of Samza.
With this default approach to assigning input streams to task instances, Samza is effectively performing a group-by operation on the input streams with their partitions as the key. Other strategies for grouping input stream partitions are possible by implementing a new [SystemStreamPartitionGrouper](../api/javadocs/org/apache/samza/container/SystemStreamPartitionGrouper.html) and factory, and configuring the job to use it via the job.systemstreampartition.grouper.factory configuration value.

Samza provides the above-discussed per-partition grouper as well as the [GroupBySystemStreamPartitionGrouper](../api/javadocs/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition), which provides a separate task class instance for every input stream partition, effectively grouping by the input stream itself. This provides maximum scalability in terms of how many containers can be used to process those input streams and is appropriate for very high volume jobs that need no grouping of the input streams.

Considering the above example of a PageViewEvent partitioned 12 ways and a ServiceMetricEvent partitioned 14 ways, the GroupBySystemStreamPartitionGrouper would create 12 + 14 = 26 task instances, which would then be distributed across the number of containers configured, as discussed below.

Note that once a job has been started using a particular SystemStreamPartitionGrouper and that job is using state or checkpointing, it is not possible to change that grouping in subsequent job starts, as the previous checkpoints and state information would likely be incorrect under the new grouping approach.

### Containers and resource allocation

Expand Down
40 changes: 16 additions & 24 deletions samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,50 @@

package org.apache.samza.checkpoint;

import org.apache.samza.system.SystemStreamPartition;

import java.util.Collections;
import java.util.Map;

import org.apache.samza.system.SystemStream;

/**
* A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
* It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
* of restarting a failed container within a running job.
*/
public class Checkpoint {
private final Map<SystemStream, String> offsets;
private final Map<SystemStreamPartition, String> offsets;

/**
* Constructs a new checkpoint based off a map of Samza stream offsets.
* @param offsets Map of Samza streams to their current offset.
*/
public Checkpoint(Map<SystemStream, String> offsets) {
public Checkpoint(Map<SystemStreamPartition, String> offsets) {
this.offsets = offsets;
}

/**
* Gets a unmodifiable view of the current Samza stream offsets.
* @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
*/
public Map<SystemStream, String> getOffsets() {
public Map<SystemStreamPartition, String> getOffsets() {
return Collections.unmodifiableMap(offsets);
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((offsets == null) ? 0 : offsets.hashCode());
return result;
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Checkpoint)) return false;

Checkpoint that = (Checkpoint) o;

if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;

return true;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Checkpoint other = (Checkpoint) obj;
if (offsets == null) {
if (other.offsets != null)
return false;
} else if (!offsets.equals(other.offsets))
return false;
return true;
public int hashCode() {
return offsets != null ? offsets.hashCode() : 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

package org.apache.samza.checkpoint;

import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;

import java.util.Map;

/**
* CheckpointManagers read and write {@link org.apache.samza.checkpoint.Checkpoint} to some
Expand All @@ -30,23 +33,38 @@ public interface CheckpointManager {

/**
* Registers this manager to write checkpoints of a specific Samza stream partition.
* @param partition Specific Samza stream partition of which to write checkpoints for.
* @param taskName Specific Samza taskName of which to write checkpoints for.
*/
public void register(Partition partition);
public void register(TaskName taskName);

/**
* Writes a checkpoint based on the current state of a Samza stream partition.
* @param partition Specific Samza stream partition of which to write a checkpoint of.
* @param taskName Specific Samza taskName of which to write a checkpoint of.
* @param checkpoint Reference to a Checkpoint object to store offset data in.
*/
public void writeCheckpoint(Partition partition, Checkpoint checkpoint);
public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint);

/**
* Returns the last recorded checkpoint for a specified Samza stream partition.
* @param partition Specific Samza stream partition for which to get the last checkpoint of.
* Returns the last recorded checkpoint for a specified taskName.
* @param taskName Specific Samza taskName for which to get the last checkpoint of.
* @return A Checkpoint object with the recorded offset data of the specified partition.
*/
public Checkpoint readLastCheckpoint(Partition partition);
public Checkpoint readLastCheckpoint(TaskName taskName);

/**
* Read the taskName to partition mapping that is being maintained by this CheckpointManager
*
* @return TaskName to task log partition mapping, or an empty map if there were no messages.
*/
public Map<TaskName, Integer> readChangeLogPartitionMapping();

/**
* Write the taskName to partition mapping that is being maintained by this CheckpointManager
*
* @param mapping Each TaskName's partition within the changelog
*/
public void writeChangeLogPartitionMapping(Map<TaskName, Integer> mapping);

public void stop();

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,32 @@

package org.apache.samza.container;

import java.util.Collection;

import org.apache.samza.Partition;
import org.apache.samza.config.Config;

import java.util.Collection;
import java.util.Collections;

/**
* A SamzaContainerContext maintains per-container information for the tasks it executes.
*/
public class SamzaContainerContext {
public final String name;
public final Config config;
public final Collection<Partition> partitions;
public final Collection<TaskName> taskNames;

/**
* An immutable context object that can passed to tasks to give them information
* about the container in which they are executing.
* @param name The name of the container (either a YARN AM or SamzaContainer).
* @param config The job configuration.
* @param partitions The set of input partitions assigned to this container.
* @param taskNames The set of taskName keys for which this container is responsible.
*/
public SamzaContainerContext(
String name,
Config config,
Collection<Partition> partitions) {
Collection<TaskName> taskNames) {
this.name = name;
this.config = config;
this.partitions = partitions;
this.taskNames = Collections.unmodifiableCollection(taskNames);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.container;

import org.apache.samza.system.SystemStreamPartition;

import java.util.Map;
import java.util.Set;

/**
* Group a set of SystemStreamPartitions into logical taskNames that share a common characteristic, defined
* by the implementation. Each taskName has a key that uniquely describes what sets may be in it, but does
* not generally enumerate the elements of those sets. For example, a SystemStreamPartitionGrouper that
* groups SystemStreamPartitions (each with 4 partitions) by their partition, would end up generating
* four TaskNames: 0, 1, 2, 3. These TaskNames describe the partitions but do not list all of the
* SystemStreamPartitions, which allows new SystemStreamPartitions to be added later without changing
* the definition of the TaskNames, assuming these new SystemStreamPartitions do not have more than
* four partitions. On the other hand, a SystemStreamPartitionGrouper that wanted each SystemStreamPartition
* to be its own, unique group would use the SystemStreamPartition's entire description to generate
* the TaskNames.
*/
public interface SystemStreamPartitionGrouper {
public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.container;

import org.apache.samza.config.Config;

/**
* Return an instance a SystemStreamPartitionGrouper per the particular implementation
*/
public interface SystemStreamPartitionGrouperFactory {
public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config);
}
63 changes: 63 additions & 0 deletions samza-api/src/main/java/org/apache/samza/container/TaskName.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.container;

/**
* A unique identifier of a set of a SystemStreamPartitions that have been grouped by
* a {@link org.apache.samza.container.SystemStreamPartitionGrouper}. The
* SystemStreamPartitionGrouper determines the TaskName for each set it creates.
*/
public class TaskName implements Comparable<TaskName> {
private final String taskName;

public String getTaskName() {
return taskName;
}

public TaskName(String taskName) {
this.taskName = taskName;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

TaskName taskName1 = (TaskName) o;

if (!taskName.equals(taskName1.taskName)) return false;

return true;
}

@Override
public int hashCode() {
return taskName.hashCode();
}

@Override
public String toString() {
return taskName;
}

@Override
public int compareTo(TaskName that) {
return taskName.compareTo(that.taskName);
}
}
13 changes: 10 additions & 3 deletions samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.samza.job;

import org.apache.samza.config.Config;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;

import java.util.Map;
Expand All @@ -30,12 +31,13 @@
* such as YARN or the LocalJobRunner.
*/
public abstract class CommandBuilder {
protected Set<SystemStreamPartition> systemStreamPartitions;
protected Map<TaskName, Set<SystemStreamPartition>> taskNameToSystemStreamPartitionsMapping;
protected Map<TaskName, Integer> taskNameToChangeLogPartitionMapping;
protected String name;
protected Config config;

public CommandBuilder setStreamPartitions(Set<SystemStreamPartition> ssp) {
this.systemStreamPartitions = ssp;
public CommandBuilder setTaskNameToSystemStreamPartitionsMapping(Map<TaskName, Set<SystemStreamPartition>> systemStreamPartitionTaskNames) {
this.taskNameToSystemStreamPartitionsMapping = systemStreamPartitionTaskNames;
return this;
}

Expand All @@ -54,6 +56,11 @@ public CommandBuilder setConfig(Config config) {
return this;
}

public CommandBuilder setTaskNameToChangeLogPartitionMapping(Map<TaskName, Integer> mapping) {
this.taskNameToChangeLogPartitionMapping = mapping;
return this;
}

public abstract String buildCommand();

public abstract Map<String, String> buildEnvironment();
Expand Down
Loading

0 comments on commit da79b6f

Please sign in to comment.