Skip to content

Commit

Permalink
SAMZA-1089: Runner should support kill and status commands
Browse files Browse the repository at this point in the history
Author: Jacob Maes <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>,Xinyu Liu <[email protected]>

Closes apache#106 from jmakes/samza-1089-2
  • Loading branch information
Jacob Maes committed Apr 3, 2017
1 parent 101ca43 commit 311f9d1
Show file tree
Hide file tree
Showing 18 changed files with 641 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@

import java.lang.reflect.Constructor;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.ConfigException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.system.StreamSpec;


Expand Down Expand Up @@ -79,12 +80,27 @@ public ApplicationRunner(Config config) {
}

/**
* Method to be invoked to deploy and run the actual Samza jobs to execute {@link StreamApplication}
* Deploy and run the Samza jobs to execute {@link StreamApplication}
*
* @param streamApp the user-defined {@link StreamApplication} object
*/
public abstract void run(StreamApplication streamApp);

/**
* Kill the Samza jobs represented by {@link StreamApplication}
*
* @param streamApp the user-defined {@link StreamApplication} object
*/
public abstract void kill(StreamApplication streamApp);

/**
* Get the collective status of the Samza jobs represented by {@link StreamApplication}.
* Returns {@link ApplicationStatus#Running} if any of the jobs are running.
*
* @param streamApp the user-defined {@link StreamApplication} object
*/
public abstract ApplicationStatus status(StreamApplication streamApp);

/**
* Constructs a {@link StreamSpec} from the configuration for the specified streamId.
*
Expand All @@ -104,5 +120,5 @@ public ApplicationRunner(Config config) {
* @param streamId The logical identifier for the stream in Samza.
* @return The {@link StreamSpec} instance.
*/
public abstract StreamSpec getStream(String streamId);
public abstract StreamSpec getStreamSpec(String streamId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.util.Util;


/**
* a java version of the system config
Expand Down Expand Up @@ -60,4 +66,37 @@ public List<String> getSystemNames() {
}
return systemNames;
}

/**
* Get {@link SystemAdmin} instances for all the systems defined in this config.
*
* @return map of system name to {@link SystemAdmin}
*/
public Map<String, SystemAdmin> getSystemAdmins() {
return getSystemFactories().entrySet()
.stream()
.collect(Collectors.toMap(systemNameToFactoryEntry -> systemNameToFactoryEntry.getKey(),
systemNameToFactoryEntry -> systemNameToFactoryEntry.getValue()
.getAdmin(systemNameToFactoryEntry.getKey(), this)));
}

/**
* Get {@link SystemFactory} instances for all the systems defined in this config.
*
* @return a map from system name to {@link SystemFactory}
*/
public Map<String, SystemFactory> getSystemFactories() {
Map<String, SystemFactory> systemFactories = getSystemNames().stream().collect(Collectors.toMap(
systemName -> systemName,
systemName -> {
String systemFactoryClassName = getSystemFactory(systemName);
if (systemFactoryClassName == null) {
throw new SamzaException(
String.format("A stream uses system %s, which is missing from the configuration.", systemName));
}
return Util.getObj(systemFactoryClassName);
}));

return systemFactories;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,16 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,23 +50,20 @@ public class ExecutionPlanner {
private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);

private final Config config;
private final StreamManager streamManager;

public ExecutionPlanner(Config config) {
public ExecutionPlanner(Config config, StreamManager streamManager) {
this.config = config;
this.streamManager = streamManager;
}

public ProcessorGraph plan(StreamGraph streamGraph) throws Exception {
Map<String, SystemAdmin> sysAdmins = getSystemAdmins(config);

// create physical processors based on stream graph
ProcessorGraph processorGraph = createProcessorGraph(streamGraph);

if (!processorGraph.getIntermediateStreams().isEmpty()) {
// figure out the partitions for internal streams
calculatePartitions(streamGraph, processorGraph, sysAdmins);

// create the streams
createStreams(processorGraph, sysAdmins);
calculatePartitions(streamGraph, processorGraph);
}

return processorGraph;
Expand Down Expand Up @@ -110,9 +101,9 @@ public ProcessorGraph plan(StreamGraph streamGraph) throws Exception {
/**
* Figure out the number of partitions of all streams
*/
/* package private */ void calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
/* package private */ void calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) {
// fetch the external streams partition info
updateExistingPartitions(processorGraph, sysAdmins);
updateExistingPartitions(processorGraph, streamManager);

// calculate the partitions for the input streams of join operators
calculateJoinInputPartitions(streamGraph, processorGraph);
Expand All @@ -127,9 +118,9 @@ public ProcessorGraph plan(StreamGraph streamGraph) throws Exception {
/**
* Fetch the partitions of source/sink streams and update the StreamEdges.
* @param processorGraph ProcessorGraph
* @param sysAdmins mapping from system name to the {@link SystemAdmin}
* @param streamManager the {@StreamManager} to interface with the streams.
*/
/* package private */ static void updateExistingPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
/* package private */ static void updateExistingPartitions(ProcessorGraph processorGraph, StreamManager streamManager) {
Set<StreamEdge> existingStreams = new HashSet<>();
existingStreams.addAll(processorGraph.getSources());
existingStreams.addAll(processorGraph.getSinks());
Expand All @@ -146,14 +137,12 @@ public ProcessorGraph plan(StreamGraph streamGraph) throws Exception {
Map<String, StreamEdge> streamToStreamEdge = new HashMap<>();
// create the stream name to StreamEdge mapping for this system
streamEdges.forEach(streamEdge -> streamToStreamEdge.put(streamEdge.getSystemStream().getStream(), streamEdge));
SystemAdmin systemAdmin = sysAdmins.get(systemName);
// retrieve the metadata for the streams in this system
Map<String, SystemStreamMetadata> streamToMetadata = systemAdmin.getSystemStreamMetadata(streamToStreamEdge.keySet());
// retrieve the partition counts for the streams in this system
Map<String, Integer> streamToPartitionCount = streamManager.getStreamPartitionCounts(systemName, streamToStreamEdge.keySet());
// set the partitions of a stream to its StreamEdge
streamToMetadata.forEach((stream, data) -> {
int partitions = data.getSystemStreamPartitionMetadata().size();
streamToStreamEdge.get(stream).setPartitionCount(partitions);
log.debug("Partition count is {} for stream {}", partitions, stream);
streamToPartitionCount.forEach((stream, partitionCount) -> {
streamToStreamEdge.get(stream).setPartitionCount(partitionCount);
log.debug("Partition count is {} for stream {}", partitionCount, stream);
});
}
}
Expand Down Expand Up @@ -283,55 +272,8 @@ private static void validatePartitions(ProcessorGraph processorGraph) {
}
}

private static void createStreams(ProcessorGraph graph, Map<String, SystemAdmin> sysAdmins) {
Multimap<String, StreamSpec> streamsToCreate = HashMultimap.create();
graph.getIntermediateStreams().forEach(edge -> {
StreamSpec streamSpec = createStreamSpec(edge);
streamsToCreate.put(edge.getSystemStream().getSystem(), streamSpec);
});

for (Map.Entry<String, Collection<StreamSpec>> entry : streamsToCreate.asMap().entrySet()) {
String systemName = entry.getKey();
SystemAdmin systemAdmin = sysAdmins.get(systemName);

for (StreamSpec stream : entry.getValue()) {
log.info("Creating stream {} with partitions {} on system {}",
new Object[]{stream.getPhysicalName(), stream.getPartitionCount(), systemName});
systemAdmin.createStream(stream);
}
}
}

/* package private */ static int maxPartition(Collection<StreamEdge> edges) {
return edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).orElse(StreamEdge.PARTITIONS_UNKNOWN);
}

private static StreamSpec createStreamSpec(StreamEdge edge) {
StreamSpec orgSpec = edge.getStreamSpec();
return orgSpec.copyWithPartitionCount(edge.getPartitionCount());
}

private static Map<String, SystemAdmin> getSystemAdmins(Config config) {
return getSystemFactories(config).entrySet()
.stream()
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().getAdmin(entry.getKey(), config)));
}

private static Map<String, SystemFactory> getSystemFactories(Config config) {
Map<String, SystemFactory> systemFactories =
getSystemNames(config).stream().collect(Collectors.toMap(systemName -> systemName, systemName -> {
String systemFactoryClassName = new JavaSystemConfig(config).getSystemFactory(systemName);
if (systemFactoryClassName == null) {
throw new SamzaException(
String.format("A stream uses system %s, which is missing from the configuration.", systemName));
}
return Util.getObj(systemFactoryClassName);
}));

return systemFactories;
}

private static Collection<String> getSystemNames(Config config) {
return new JavaSystemConfig(config).getSystemNames();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ void addTargetNode(ProcessorNode targetNode) {
targetNodes.add(targetNode);
}

StreamSpec getStreamSpec() {
return streamSpec;
public StreamSpec getStreamSpec() {
if (partitions == PARTITIONS_UNKNOWN) {
return streamSpec;
} else {
return streamSpec.copyWithPartitionCount(partitions);
}
}

SystemStream getSystemStream() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.execution;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class StreamManager {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamManager.class);

private final Map<String, SystemAdmin> sysAdmins;

public StreamManager(Map<String, SystemAdmin> sysAdmins) {
this.sysAdmins = sysAdmins;
}

public void createStreams(List<StreamSpec> streams) {
Multimap<String, StreamSpec> streamsGroupedBySystem = HashMultimap.create();
streams.forEach(streamSpec ->
streamsGroupedBySystem.put(streamSpec.getSystemName(), streamSpec));

for (Map.Entry<String, Collection<StreamSpec>> entry : streamsGroupedBySystem.asMap().entrySet()) {
String systemName = entry.getKey();
SystemAdmin systemAdmin = sysAdmins.get(systemName);

for (StreamSpec stream : entry.getValue()) {
LOGGER.info("Creating stream {} with partitions {} on system {}",
new Object[]{stream.getPhysicalName(), stream.getPartitionCount(), systemName});
systemAdmin.createStream(stream);
}
}
}

Map<String, Integer> getStreamPartitionCounts(String systemName, Set<String> streamNames) {
Map<String, Integer> streamToPartitionCount = new HashMap<>();

SystemAdmin systemAdmin = sysAdmins.get(systemName);
// retrieve the metadata for the streams in this system
Map<String, SystemStreamMetadata> streamToMetadata = systemAdmin.getSystemStreamMetadata(streamNames);
// set the partitions of a stream to its StreamEdge
streamToMetadata.forEach((stream, data) ->
streamToPartitionCount.put(stream, data.getSystemStreamPartitionMetadata().size()));

return streamToPartitionCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ <PK, M> MessageStreamImpl<M> generateIntStreamFromOpId(int opId, Function<M, PK>
config.get(JobConfig.JOB_NAME()),
config.get(JobConfig.JOB_ID(), "1"),
opNameWithId);
StreamSpec streamSpec = runner.getStream(streamId);
StreamSpec streamSpec = runner.getStreamSpec(streamId);

this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ public AbstractApplicationRunner(Config config) {
}

@Override
public StreamSpec getStream(String streamId) {
public StreamSpec getStreamSpec(String streamId) {
StreamConfig streamConfig = new StreamConfig(config);
String physicalName = streamConfig.getPhysicalName(streamId);
return getStream(streamId, physicalName);
return getStreamSpec(streamId, physicalName);
}

/**
Expand All @@ -58,11 +58,11 @@ public StreamSpec getStream(String streamId) {
* @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
* @return The {@link StreamSpec} instance.
*/
/*package private*/ StreamSpec getStream(String streamId, String physicalName) {
/*package private*/ StreamSpec getStreamSpec(String streamId, String physicalName) {
StreamConfig streamConfig = new StreamConfig(config);
String system = streamConfig.getSystem(streamId);

return getStream(streamId, physicalName, system);
return getStreamSpec(streamId, physicalName, system);
}

/**
Expand All @@ -76,7 +76,7 @@ public StreamSpec getStream(String streamId) {
* @param system The name of the System on which this stream will be used.
* @return The {@link StreamSpec} instance.
*/
/*package private*/ StreamSpec getStream(String streamId, String physicalName, String system) {
/*package private*/ StreamSpec getStreamSpec(String streamId, String physicalName, String system) {
StreamConfig streamConfig = new StreamConfig(config);
Map<String, String> properties = streamConfig.getStreamProperties(streamId);

Expand Down
Loading

0 comments on commit 311f9d1

Please sign in to comment.