Skip to content

Commit

Permalink
SAMZA-1054: Refactor Operator APIs
Browse files Browse the repository at this point in the history
Some suggestions for an Operator API refactor and misc. cleanup. It does contain some implementation changes, mostly due to deleted, extracted or merged classes. (e.g. OperatorFactory + ChainedOperators == OperatorImpls).

Since git marked several moved classes as (delete + new) instead, it's probably best to apply the diff locally and  browse the code in an IDE.

Some of the changes, in no particular order:
* Extracted XFunction interfaces into a .functions package in -api.
* -api's internal.Operators is now the -operators's spec.* package. Extracted interfaces and classes. Factory methods are now in OperatorSpecs.
* -api's MessageStreams is now -api's MessageStream interface and -operators's MessageStreamImpl.
* -api's internal.Windows classes are now in -api's .window package. Extracted interfaces and classes, but no implementation changes.
* OperatorFactory + ChainedOperators is now OperatorImpls, which is used from StreamOperatorAdaptorTask.
* Added a NoOpOperatorImpl, which acts as the root node for the OperatorImpl DAG returned by OperatorImpls.
* Removed usages of reactivestreams APIs since current code looks simpler without them. We can add them back when we need features like backpressure etc.
* Removed the InputSystemMessage interface.
* Made field names consistent (e.g Fn suffix for functions everywhere etc.).
* Some method/class visibility changes due to moved classes.
* General documentation changes, mostly to make public APIs clearer.

There are additional questions/tasks that we can address in future RBs:
* Updating Window and Trigger APIs.
* Merging samza-operator into samza-core.
* Questions about Message timestamp and Offset comparison semantics.
* Questions about OperatorSpec serialization (e.g. ID generation).
* Questions about StateStoreImpl and StoreFunctions.

Author: Prateek Maheshwari <[email protected]>

Reviewers: Yi Pan <[email protected]>, Jagadish <[email protected]>

Closes apache#25 from prateekm/master
  • Loading branch information
prateekm authored and nickpan47 committed Dec 1, 2016
1 parent a980c96 commit 0054380
Show file tree
Hide file tree
Showing 71 changed files with 2,722 additions and 2,588 deletions.
197 changes: 68 additions & 129 deletions samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,173 +16,112 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.operators;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.data.Message;
import org.apache.samza.operators.internal.Operators;
import org.apache.samza.operators.internal.Operators.Operator;
import org.apache.samza.operators.internal.WindowOutput;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowOutput;
import org.apache.samza.operators.windows.WindowState;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;


/**
* This class defines either the input or output streams to/from the operators. Users use the API methods defined here to
* directly program the stream processing stages that processes a stream and generate another one.
* Represents a stream of {@link MessageEnvelope}s.
* <p>
* A {@link MessageStream} can be transformed into another {@link MessageStream} by applying the transforms in this API.
*
* @param <M> Type of message in this stream
* @param <M> type of {@link MessageEnvelope}s in this stream
*/
@InterfaceStability.Unstable
public class MessageStream<M extends Message> {

private final Set<Operator> subscribers = new HashSet<>();

/**
* Helper method to get the corresponding list of subscribers to a specific {@link MessageStream}.
*
* NOTE: This is purely an internal API and should not be used directly by programmers.
*
* @return A unmodifiable set containing all {@link Operator}s that subscribe to this {@link MessageStream} object
*/
public Collection<Operator> getSubscribers() {
return Collections.unmodifiableSet(this.subscribers);
}

/**
* Public API methods start here
*/
public interface MessageStream<M extends MessageEnvelope> {

/**
* Defines a function API that takes three input parameters w/ types {@code A}, {@code B}, and {@code C} and w/o a return value
* Applies the provided 1:1 {@link MapFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the
* transformed {@link MessageStream}.
*
* @param <A> the type of input {@code a}
* @param <B> the type of input {@code b}
* @param <C> the type of input {@code c}
* @param mapFn the function to transform a {@link MessageEnvelope} to another {@link MessageEnvelope}
* @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
@FunctionalInterface
public interface VoidFunction3<A, B, C> {
public void apply(A a, B b, C c);
}
<TM extends MessageEnvelope> MessageStream<TM> map(MapFunction<M, TM> mapFn);

/**
* Method to apply a map function (1:1) on a {@link MessageStream}
* Applies the provided 1:n {@link FlatMapFunction} to transform a {@link MessageEnvelope} in this {@link MessageStream}
* to n {@link MessageEnvelope}s in the transformed {@link MessageStream}
*
* @param mapper the mapper function to map one input {@link Message} to one output {@link Message}
* @param <OM> the type of the output {@link Message} in the output {@link MessageStream}
* @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
* @param flatMapFn the function to transform a {@link MessageEnvelope} to zero or more {@link MessageEnvelope}s
* @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
public <OM extends Message> MessageStream<OM> map(Function<M, OM> mapper) {
Operator<OM> op = Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() { {
OM r = mapper.apply(m);
if (r != null) {
this.add(r);
}
} });
this.subscribers.add(op);
return op.getOutputStream();
}
<TM extends MessageEnvelope> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn);

/**
* Method to apply a flatMap function (1:n) on a {@link MessageStream}
* Applies the provided {@link FilterFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the
* transformed {@link MessageStream}.
* <p>
* The {@link FilterFunction} is a predicate which determines whether a {@link MessageEnvelope} in this {@link MessageStream}
* should be retained in the transformed {@link MessageStream}.
*
* @param flatMapper the flat mapper function to map one input {@link Message} to zero or more output {@link Message}s
* @param <OM> the type of the output {@link Message} in the output {@link MessageStream}
* @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
* @param filterFn the predicate to filter {@link MessageEnvelope}s from this {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
public <OM extends Message> MessageStream<OM> flatMap(Function<M, Collection<OM>> flatMapper) {
Operator<OM> op = Operators.getStreamOperator(flatMapper);
this.subscribers.add(op);
return op.getOutputStream();
}
MessageStream<M> filter(FilterFunction<M> filterFn);

/**
* Method to apply a filter function on a {@link MessageStream}
* Allows sending {@link MessageEnvelope}s in this {@link MessageStream} to an output
* {@link org.apache.samza.system.SystemStream} using the provided {@link SinkFunction}.
*
* @param filter the filter function to filter input {@link Message}s from the input {@link MessageStream}
* @return the output {@link MessageStream} after applying the filter function on the input {@link MessageStream}
* @param sinkFn the function to send {@link MessageEnvelope}s in this stream to output systems
*/
public MessageStream<M> filter(Function<M, Boolean> filter) {
Operator<M> op = Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() { {
if (filter.apply(t)) {
this.add(t);
}
} });
this.subscribers.add(op);
return op.getOutputStream();
}
void sink(SinkFunction<M> sinkFn);

/**
* Method to send an input {@link MessageStream} to an output {@link org.apache.samza.system.SystemStream}, and allows the output {@link MessageStream}
* to be consumed by downstream stream operators again.
* Groups the {@link MessageEnvelope}s in this {@link MessageStream} according to the provided {@link Window} semantics
* (e.g. tumbling, sliding or session windows) and returns the transformed {@link MessageStream} of
* {@link WindowOutput}s.
* <p>
* Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
*
* @param sink the user-defined sink function to send the input {@link Message}s to the external output systems
* @param window the {@link Window} to group and process {@link MessageEnvelope}s from this {@link MessageStream}
* @param <WK> the type of key in the {@link WindowOutput} from the {@link Window}
* @param <WV> the type of value in the {@link WindowOutput} from the {@link Window}
* @param <WS> the type of window state kept in the {@link Window}
* @param <WM> the type of {@link WindowOutput} in the transformed {@link MessageStream}
* @return the transformed {@link MessageStream}
*/
public void sink(VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
this.subscribers.add(Operators.getSinkOperator(sink));
}
<WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(
Window<M, WK, WV, WM> window);

/**
* Method to perform a window function (i.e. a group-by, aggregate function) on a {@link MessageStream}
* Joins this {@link MessageStream} with another {@link MessageStream} using the provided pairwise {@link JoinFunction}.
* <p>
* We currently only support 2-way joins.
*
* @param window the window function to group and aggregate the input {@link Message}s from the input {@link MessageStream}
* @param <WK> the type of key in the output {@link Message} from the {@link Windows.Window} function
* @param <WV> the type of output value from
* @param <WS> the type of window state kept in the {@link Windows.Window} function
* @param <WM> the type of {@link org.apache.samza.operators.internal.WindowOutput} message from the {@link Windows.Window} function
* @return the output {@link MessageStream} after applying the window function on the input {@link MessageStream}
*/
public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(Windows.Window<M, WK, WV, WM> window) {
Operator<WM> wndOp = Operators.getWindowOperator(Windows.getInternalWindowFn(window));
this.subscribers.add(wndOp);
return wndOp.getOutputStream();
}

/**
* Method to add an input {@link MessageStream} to a join function. Note that we currently only support 2-way joins.
*
* @param other the other stream to be joined w/
* @param merger the common function to merge messages from this {@link MessageStream} and {@code other}
* @param otherStream the other {@link MessageStream} to be joined with
* @param joinFn the function to join {@link MessageEnvelope}s from this and the other {@link MessageStream}
* @param <K> the type of join key
* @param <JM> the type of message in the {@link Message} from the other join stream
* @param <RM> the type of message in the {@link Message} from the join function
* @return the output {@link MessageStream} from the join function {@code joiner}
* @param <OM> the type of {@link MessageEnvelope}s in the other stream
* @param <RM> the type of {@link MessageEnvelope}s resulting from the {@code joinFn}
* @return the joined {@link MessageStream}
*/
public <K, JM extends Message<K, ?>, RM extends Message> MessageStream<RM> join(MessageStream<JM> other,
BiFunction<M, JM, RM> merger) {
MessageStream<RM> outputStream = new MessageStream<>();

BiFunction<M, JM, RM> parJoin1 = merger::apply;
BiFunction<JM, M, RM> parJoin2 = (m, t1) -> merger.apply(t1, m);

// TODO: need to add default store functions for the two partial join functions

other.subscribers.add(Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream));
this.subscribers.add(Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream));
return outputStream;
}
<K, OM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(MessageStream<OM> otherStream,
JoinFunction<M, OM, RM> joinFn);

/**
* Method to merge all {@code others} streams w/ this {@link MessageStream}. The merging streams must have the same type {@code M}
* Merge all {@code otherStreams} with this {@link MessageStream}.
* <p>
* The merging streams must have the same {@link MessageEnvelope} type {@code M}.
*
* @param others other streams to be merged w/ this one
* @return the merged output stream
* @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
* @return the merged {@link MessageStream}
*/
public MessageStream<M> merge(Collection<MessageStream<M>> others) {
MessageStream<M> outputStream = new MessageStream<>();

others.add(this);
others.forEach(other -> other.subscribers.add(Operators.getMergeOperator(outputStream)));
return outputStream;
}

MessageStream<M> merge(Collection<MessageStream<M>> otherStreams);

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;

import java.util.Map;


/**
* A {@link StreamOperatorTask} is the basic interface to implement for processing {@link MessageStream}s.
* Implementations can describe the transformation steps for each {@link MessageStream} in the
* {@link #transform} method using {@link MessageStream} APIs.
* <p>
* Implementations may be augmented by implementing {@link org.apache.samza.task.InitableTask},
* {@link org.apache.samza.task.WindowableTask} and {@link org.apache.samza.task.ClosableTask} interfaces,
* but should not implement {@link org.apache.samza.task.StreamTask} or {@link org.apache.samza.task.AsyncStreamTask}
* interfaces.
*/
@InterfaceStability.Unstable
public interface StreamOperatorTask {

/**
* Describe the transformation steps for each {@link MessageStream}s for this task using the
* {@link MessageStream} APIs. Each {@link MessageStream} corresponds to one {@link SystemStreamPartition}
* in the input system.
*
* @param messageStreams the {@link MessageStream}s that receive {@link IncomingSystemMessageEnvelope}s
* from their corresponding {@link org.apache.samza.system.SystemStreamPartition}
*/
void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams);

}
Loading

0 comments on commit 0054380

Please sign in to comment.