Commit 4959adf

progress on refactoring, bolt + minor issues left

Nathan Marz committed Feb 8, 2012
1 parent 559195b commit 4959adf
Showing 15 changed files with 238 additions and 265 deletions.
2 changes: 1 addition & 1 deletion src/clj/backtype/storm/bootstrap.clj
@@ -4,7 +4,7 @@
'(do
(import (quote [backtype.storm Constants]))
(import (quote [backtype.storm.testing FeederSpout TestPlannerBolt TestPlannerSpout
AckFailDelegate AckTracker DelegateOutputCollector]))
AckFailDelegate AckTracker]))
(import (quote [backtype.storm.utils Utils LocalState Time TimeCacheMap
TimeCacheMap$ExpiredCallback BufferFileInputStream
RegisteredGlobalState ThriftTopologyUtils]))
4 changes: 2 additions & 2 deletions src/clj/backtype/storm/daemon/acker.clj
@@ -38,8 +38,8 @@
curr (.get pending id)
curr (condp = (.getSourceStreamId tuple)
ACKER-INIT-STREAM-ID (-> curr
(update-ack id)
(assoc :spout-task (.getValue tuple 1)))
(update-ack (.getValue tuple 1))
(assoc :spout-task (.getValue tuple 2)))
ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
(.put pending id curr)
3 changes: 1 addition & 2 deletions src/clj/backtype/storm/daemon/common.clj
@@ -166,8 +166,7 @@
(dofor [[_ spout] (.get_spouts ret)
:let [common (.get_common spout)]]
(do
(.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "spout-task"]))
(.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"]))
(.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"]))
(.put_to_inputs common
(GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
(thrift/mk-direct-grouping))
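Note: the init stream now carries a separate "init-val" alongside the root "id" (see the ["id" "init-val" "spout-task"] fields above), and the acker folds that value in via update-ack instead of reusing the id. A rough Java sketch of the per-root-id bookkeeping this implies, assuming Storm's usual XOR combination of ack-vals; the class and method names below are hypothetical, not part of this commit:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the acker's bookkeeping: each root id maps to a running
// XOR of its init-val and every ack-val seen so far; zero means the tree is done.
public class AckerSketch {
    private final Map<Long, Long> pending = new HashMap<Long, Long>();

    // ACKER-INIT-STREAM-ID: ["id" "init-val" "spout-task"]
    public void handleInit(long rootId, long initVal) {
        xorInto(rootId, initVal);
    }

    // ACKER-ACK-STREAM-ID: ["id" "ack-val"]
    public void handleAck(long rootId, long ackVal) {
        long curr = xorInto(rootId, ackVal);
        if (curr == 0) {
            pending.remove(rootId);
            // fully acked: the real acker would notify the :spout-task stored for rootId
        }
    }

    private long xorInto(long rootId, long val) {
        Long curr = pending.get(rootId);
        long next = (curr == null ? 0L : curr.longValue()) ^ val;
        pending.put(rootId, next);
        return next;
    }
}
```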
357 changes: 176 additions & 181 deletions src/clj/backtype/storm/daemon/task.clj

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions src/clj/backtype/storm/daemon/worker.clj
@@ -53,10 +53,11 @@
(-> (reverse-map task->component) (select-keys components) vals)))
))

(defn mk-transfer-fn [transfer-queue]
(fn [task ^Tuple tuple]
(.put ^LinkedBlockingQueue transfer-queue [task tuple])
))
(defn mk-transfer-fn [storm-conf context transfer-queue]
(let [^KryoTupleSerializer serializer (KryoTupleSerializer. storm-conf context)]
(fn [task ^Tuple tuple]
(.put ^LinkedBlockingQueue transfer-queue [task (.serialize serializer tuple)])
)))

;; TODO: should worker even take the storm-id as input? this should be
;; deducable from cluster state (by searching through assignments)
@@ -112,7 +113,7 @@

transfer-queue (LinkedBlockingQueue.) ; possibly bound the size of it

transfer-fn (mk-transfer-fn transfer-queue)
transfer-fn (mk-transfer-fn storm-conf (mk-topology-context nil) transfer-queue)
refresh-connections (fn this
([]
(this (fn [& ignored] (.add event-manager this))))
@@ -176,22 +177,21 @@
(when @active (storm-conf TASK-REFRESH-POLL-SECS))
))
(async-loop
(fn [^ArrayList drainer ^KryoTupleSerializer serializer]
(fn [^ArrayList drainer]
(let [felem (.take transfer-queue)]
(.add drainer felem)
(.drainTo transfer-queue drainer))
(read-locked endpoint-socket-lock
(let [node+port->socket @node+port->socket
task->node+port @task->node+port]
(doseq [[task ^Tuple tuple] drainer]
(let [socket (node+port->socket (task->node+port task))
ser-tuple (.serialize serializer tuple)]
(doseq [[task ser-tuple] drainer]
(let [socket (node+port->socket (task->node+port task))]
(msg/send socket task ser-tuple)
))
))
(.clear drainer)
0 )
:args-fn (fn [] [(ArrayList.) (KryoTupleSerializer. storm-conf (mk-topology-context nil))]))
:args-fn (fn [] [(ArrayList.)]))
heartbeat-thread]
virtual-port-shutdown (when (local-mode-zmq? conf)
(log-message "Launching virtual port for " supervisor-id ":" port)
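Note: the transfer path now serializes tuples on the emitting task's thread (inside mk-transfer-fn) and the drainer thread only ships already-serialized bytes. A minimal Java analogue of that producer-serializes / drainer-sends split; the Connection and TupleSerializer interfaces here are stand-ins for the ZeroMQ socket and KryoTupleSerializer, not real Storm APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the worker transfer pattern after this change: serialization happens
// before the pair enters the queue, so the single drainer thread does no Kryo work.
public class TransferSketch {
    static class Outgoing {            // [task, serialized-tuple] pair
        final int task;
        final byte[] payload;
        Outgoing(int task, byte[] payload) { this.task = task; this.payload = payload; }
    }

    interface Connection { void send(int task, byte[] payload); }  // hypothetical socket
    interface TupleSerializer { byte[] serialize(Object tuple); }  // stand-in for KryoTupleSerializer

    private final LinkedBlockingQueue<Outgoing> transferQueue = new LinkedBlockingQueue<Outgoing>();

    // Called on each task thread: serialize first, then enqueue.
    public void transfer(TupleSerializer serializer, int task, Object tuple) throws InterruptedException {
        transferQueue.put(new Outgoing(task, serializer.serialize(tuple)));
    }

    // Drainer loop body: take one, drain the rest, send raw bytes.
    public void drainOnce(Connection connection) throws InterruptedException {
        List<Outgoing> drainer = new ArrayList<Outgoing>();
        drainer.add(transferQueue.take());
        transferQueue.drainTo(drainer);
        for (Outgoing o : drainer) {
            connection.send(o.task, o.payload);
        }
    }
}
```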
4 changes: 0 additions & 4 deletions src/clj/backtype/storm/tuple.clj
@@ -4,9 +4,5 @@

(bootstrap)

(defn tuple-hash-code [^Tuple tuple]
(.hashCode (.getValues tuple))
)

(defn list-hash-code [^List alist]
(.hashCode alist))
4 changes: 2 additions & 2 deletions src/jvm/backtype/storm/coordination/CoordinatedBolt.java
@@ -64,7 +64,7 @@ public String toString() {
}
}

public class CoordinatedOutputCollector extends OutputCollector {
public class CoordinatedOutputCollector implements IOutputCollector {
IOutputCollector _delegate;

public CoordinatedOutputCollector(IOutputCollector delegate) {
@@ -185,7 +185,7 @@ public void prepare(Map config, TopologyContext context, OutputCollector collect
}
_tracked = new TimeCacheMap<Object, TrackingInfo>(Utils.getInt(config.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)), callback);
_collector = collector;
_delegate.prepare(config, context, new CoordinatedOutputCollector(collector));
_delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector)));
for(String component: Utils.get(context.getThisTargets(),
Constants.COORDINATED_STREAM_ID,
new HashMap<String, Grouping>())
10 changes: 6 additions & 4 deletions src/jvm/backtype/storm/grouping/CustomStreamGrouping.java
@@ -1,6 +1,6 @@
package backtype.storm.grouping;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import java.io.Serializable;
import java.util.List;

@@ -9,17 +9,19 @@ public interface CustomStreamGrouping extends Serializable {
/**
* Tells the stream grouping at runtime the number of tasks in the target bolt.
* This information should be used in taskIndicies to determine the target tasks.
*
* It also tells the grouping the metadata on the stream this grouping will be used on.
*/
void prepare(int numTasks);
void prepare(Fields outFields, int numTasks);

/**
* This function implements a custom stream grouping. It takes in as input
* the number of tasks in the target bolt in prepare and returns the
* indices of the tasks to send the tuple to. Each index must be in the range
* [0, numTargetTasks-1]
*
* @param tuple the tuple to group on
* @param tuple the values to group on
* @param numTargetTasks the number of tasks in the target bolt
*/
List<Integer> taskIndices(Tuple tuple);
List<Integer> taskIndices(List<Object> values);
}
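Note: custom groupings now receive the stream's declared Fields at prepare time and group on the raw value list rather than a Tuple. A hypothetical grouping written against the new signature (not part of this commit) that hashes each value list onto a single task:

```java
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.List;

// Sketch of a grouping against the new prepare(Fields, int) / taskIndices(List<Object>)
// contract: pick one target task by hashing the emitted values.
public class HashModGrouping implements CustomStreamGrouping {
    private int _numTasks;

    @Override
    public void prepare(Fields outFields, int numTasks) {
        // outFields describes the stream this grouping is bound to; this sketch
        // only needs the task count.
        _numTasks = numTasks;
    }

    @Override
    public List<Integer> taskIndices(List<Object> values) {
        List<Integer> ret = new ArrayList<Integer>();
        ret.add(Math.abs(values.hashCode() % _numTasks));
        return ret;
    }
}
```

NGrouping further down is the in-tree test grouping updated to this same signature.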
12 changes: 0 additions & 12 deletions src/jvm/backtype/storm/task/IInternalOutputCollector.java

This file was deleted.

34 changes: 33 additions & 1 deletion src/jvm/backtype/storm/task/OutputCollector.java
@@ -11,7 +11,14 @@
* This is the core API for emitting tuples. For a simpler API, and a more restricted
* form of stream processing, see IBasicBolt and BasicOutputCollector.
*/
public abstract class OutputCollector implements IOutputCollector {
public class OutputCollector implements IOutputCollector {
private IOutputCollector _delegate;


public OutputCollector(IOutputCollector delegate) {
_delegate = delegate;
}

/**
* Emits a new tuple to a specific stream with a single anchor.
*
@@ -163,4 +170,29 @@ public void emitDirect(int taskId, Tuple anchor, List<Object> tuple) {
public void emitDirect(int taskId, List<Object> tuple) {
emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
}

@Override
public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
return _delegate.emit(streamId, anchors, tuple);
}

@Override
public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
_delegate.emitDirect(taskId, streamId, anchors, tuple);
}

@Override
public void ack(Tuple input) {
_delegate.ack(input);
}

@Override
public void fail(Tuple input) {
_delegate.fail(input);
}

@Override
public void reportError(Throwable error) {
_delegate.reportError(error);
}
}
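Note: OutputCollector is now a concrete wrapper around an IOutputCollector delegate rather than an abstract base class, which is what lets CoordinatedBolt above (and the transactional test below) hand a decorated collector to a bolt as new OutputCollector(decorated). A hypothetical decorator in the same style, using only the IOutputCollector methods visible in this diff:

```java
import backtype.storm.task.IOutputCollector;
import backtype.storm.tuple.Tuple;
import java.util.Collection;
import java.util.List;

// Hypothetical decorator: counts emitted tuples, then forwards everything to the
// wrapped IOutputCollector.
public class CountingCollector implements IOutputCollector {
    private final IOutputCollector _delegate;
    private long _emitted = 0;

    public CountingCollector(IOutputCollector delegate) {
        _delegate = delegate;
    }

    public long getEmitted() {
        return _emitted;
    }

    @Override
    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
        _emitted++;
        return _delegate.emit(streamId, anchors, tuple);
    }

    @Override
    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
        _emitted++;
        _delegate.emitDirect(taskId, streamId, anchors, tuple);
    }

    @Override
    public void ack(Tuple input) {
        _delegate.ack(input);
    }

    @Override
    public void fail(Tuple input) {
        _delegate.fail(input);
    }

    @Override
    public void reportError(Throwable error) {
        _delegate.reportError(error);
    }
}
```

A bolt's prepare would then be given new OutputCollector(new CountingCollector(rawCollector)), mirroring how CoordinatedBolt wraps its CoordinatedOutputCollector.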
1 change: 1 addition & 0 deletions src/jvm/backtype/storm/task/OutputCollectorImpl.java
@@ -27,6 +27,7 @@ public class OutputCollectorImpl extends OutputCollector {
private Map<Tuple, List<Long>> _pendingAcks = new ConcurrentHashMap<Tuple, List<Long>>();

public OutputCollectorImpl(TopologyContext context, IInternalOutputCollector collector) {
super(null); // TODO: remove
_context = context;
_collector = collector;
}
40 changes: 0 additions & 40 deletions src/jvm/backtype/storm/testing/DelegateOutputCollector.java

This file was deleted.

6 changes: 3 additions & 3 deletions src/jvm/backtype/storm/testing/NGrouping.java
@@ -1,7 +1,7 @@
package backtype.storm.testing;

import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.List;

@@ -13,11 +13,11 @@ public NGrouping(int n) {
}

@Override
public void prepare(int numTasks) {
public void prepare(Fields outFields, int numTasks) {
}

@Override
public List<Integer> taskIndices(Tuple tuple) {
public List<Integer> taskIndices(List<Object> values) {
List<Integer> ret = new ArrayList<Integer>();
for(int i=0; i<_n; i++) {
ret.add(i);
4 changes: 2 additions & 2 deletions src/jvm/backtype/storm/tuple/MessageId.java
@@ -25,9 +25,9 @@ public static MessageId makeId(Map<Long, Long> anchorsToIds) {
return new MessageId(anchorsToIds);
}

public static MessageId makeRootId(long id) {
public static MessageId makeRootId(long id, long val) {
Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
anchorsToIds.put(id, id);
anchorsToIds.put(id, val);
return new MessageId(anchorsToIds);
}

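Note: makeRootId now takes the root anchor id and its initial ack value as separate arguments instead of reusing the id for both. A small hypothetical usage sketch, assuming the Storm classes are on the classpath:

```java
import backtype.storm.tuple.MessageId;
import java.util.Random;

// Sketch: the root id and the value XORed by the acker no longer have to be equal.
public class RootIdExample {
    public static void main(String[] args) {
        Random random = new Random();
        long rootId = random.nextLong();   // identifies the tuple tree
        long initVal = random.nextLong();  // value the acker will XOR against ack-vals
        MessageId id = MessageId.makeRootId(rootId, initVal); // anchors map: {rootId -> initVal}
        System.out.println(id);
    }
}
```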
2 changes: 1 addition & 1 deletion test/clj/backtype/storm/transactional_test.clj
@@ -155,7 +155,7 @@
(fn [ov]
(concat ov [newvalue])
)))]
(DelegateOutputCollector.
(OutputCollector.
(reify IOutputCollector
(emit [this stream-id anchors values]
(swap! capturer adder stream-id values)
