Skip to content

Commit

Permalink
Port the rest of rxflow to TypeScript.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Mar 10, 2014
1 parent cd871ab commit 22bcdb1
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 140 deletions.
61 changes: 37 additions & 24 deletions rxflow/src/Buffer.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions rxflow/src/Buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import DataflowElement = require('./DataflowElement');
import InputPort = require('./InputPort');
import OutputPort = require('./OutputPort');

class Buffer<T> extends DataflowElement {

private buffer: Array<T> = [];

input = new InputPort<T>(x => this.buffer.push(x));
output = new OutputPort<T>();


invalidate() {
this.buffer = [];
}

isEmpty(): boolean {
return this.buffer.length === 0;
}

flush() {
var oldBuffer = this.buffer;
this.buffer = [];
oldBuffer.forEach(x => this.output.onNext(x));
return oldBuffer.length;
}
}

export = Buffer;
112 changes: 66 additions & 46 deletions rxflow/src/HashJoin.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions rxflow/src/HashJoin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import DataflowElement = require('./DataflowElement');
import InputPort = require('./InputPort');
import OutputPort = require('./OutputPort');


class HashJoin extends DataflowElement {

leftInput: InputPort<any>;
rightInput: InputPort<any>;
output = new OutputPort();

private buildInput: InputPort<any>;
private probeInput: InputPort<any>;

private hashTable = {};

private buildKeyFunc;
private probeKeyFunc;
private resultOrderingFunction;

constructor(leftKeyFunc, rightKeyFunc, buildInput) {
super();
if (buildInput === 'left') {
this.leftInput = new InputPort(x => this.handleBuildInput(x));
this.rightInput = new InputPort(x => this.handleProbeInput(x));
this.buildInput = this.leftInput;
this.probeInput = this.rightInput;
this.buildKeyFunc = leftKeyFunc;
this.probeKeyFunc = rightKeyFunc;
this.resultOrderingFunction = function (b, p) { return [b, p]; };
} else if (buildInput === 'right') {
this.rightInput = new InputPort(x => this.handleBuildInput(x));
this.leftInput = new InputPort(x => this.handleProbeInput(x));
this.buildInput = this.rightInput;
this.probeInput = this.leftInput;
this.buildKeyFunc = rightKeyFunc;
this.probeKeyFunc = leftKeyFunc;
this.resultOrderingFunction = function (b, p) { return [p, b]; };
} else {
throw new Error('buildInput should be \'left\' or \'right\', not \'' + buildInput + '\'');
}
}


private handleProbeInput(p) {
var key = this.probeKeyFunc(p);
if (key in this.hashTable) {
var matches = this.hashTable[key];
matches.forEach(b => this.output.onNext(this.resultOrderingFunction(b, p)));
}
}

private handleBuildInput(b) {
var key = this.buildKeyFunc(b);
if (!(key in this.hashTable)) {
this.hashTable[key] = [b];
} else {
this.hashTable[key].push(b);
}
}

}

export = HashJoin;
2 changes: 1 addition & 1 deletion rxflow/src/ObservableScanner.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
var Buffer = require('./buffer');
var Buffer = require('./Buffer');


function ObservableScanner(observable) {
Expand Down
83 changes: 42 additions & 41 deletions rxflow/src/Table.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 22bcdb1

Please sign in to comment.