Skip to content

Commit

Permalink
BaconJS
Browse files Browse the repository at this point in the history
  • Loading branch information
gwing33 committed Apr 17, 2015
1 parent feb82b1 commit cf0879c
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 1 deletion.
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,9 @@ tests/fsharpqa/Source/CodeGen/EmittedIL/QueryExpressionStepping/Utils.dll
tests/fsharpqa/Source/CodeGen/EmittedIL/ComputationExpressions/ComputationExprLibrary.dll
packages
FSharp.Core.Nuget/*.nupkg
data
data


*.log
node_modules/
.idea/
155 changes: 155 additions & 0 deletions StreamingBaconJS/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/**
* Created by msdn_000 on 4/13/2015.
*/

var fileinput = require('fileinput');
var Bacon = require('baconjs');

function getIdFromObject(key) {
var id = "";
if(typeof key == 'object') {
for(var i in key) {
id += key[i].toString();
}
} else {
id = key.toString();
}

return id;
}

Bacon.Observable.prototype.groupBy = function(keyF, limitF, comparerF) {
if(typeof limitF === 'undefined') {
limitF = itemIdentity;
}

if(typeof comparerF == 'undefined') {
comparerF = function() {
return true;
};
}

var streams = {},
src = this;

return src.filter(function(x) {
var key = keyF(x),
id = getIdFromObject(key);
return !streams[id];
}).map(function (x) {
var key = keyF(x),
id = getIdFromObject(key);

similar = src.filter(function(y) {
var idY = getIdFromObject(keyF(y));
return idY == id && comparerF(x, y);
});

data = Bacon.once(x).concat(similar);

limited = limitF(data).withHandler(function(event) {
if(event.isEnd()) {
return delete streams[id];
}

return this.push(event);
});

streams[id] = limited;
return streams[id];
});
};

var fixNewLine = new RegExp("(\r)?\n");
function toOutputType(item) {
return {
"groupName": item.name,
"bib": item.item.bib,
"time": item.item.time,
"age": item.item.age
};
}
function itemIdentity(e) {
return e;
}
function byCheckpoint(item) {
return item.checkpoint;
}
function byCheckpointGender(item) {
return { "checkpoint": item.checkpoint, "gender": item.gender };
}
function byCheckpointGenderAge(item) {
return { "checkpoint": item.checkpoint, "gender": item.gender, "age": item.age };
}
function compareByCheckpointGender(groupA, groupB) {
return groupA.checkpoint === groupB.checkpoint && groupA.gender === groupB.gender;
}
function compareByCheckpointGenderAge(groupA, groupB) {
return compareByCheckpointGender(groupA, groupB) && groupA.age === groupB.age;
}
function endsWith(str, suffix) {
return str.indexOf(suffix, str.length - suffix.length) !== -1;
}
function getInput() {
if (process.argv.length > 2) {
return Bacon.fromEvent(fileinput.input(), 'line').map(function (line) {
return line.toString('utf8');
}).map(function (line) {
return line.replace(fixNewLine, "");
});
} else {
return Bacon.fromBinder(function(sink) {
process.stdin.on('data', function(buff) {
sink(buff.toString());
});
}).flatMap(function (line) {
return Bacon.fromArray(line.toString().split(/(\r)?\n/));
}).slidingWindow(2, 1).flatMap(function (l) {
return l.reduce(function (acc, x) {
if (endsWith(acc, "}") === true) {
return acc;
} else {
return acc + x;
}
}, "");
}).filter(function (item) {
return item[0] === "{";
});
}
}
function groupReads(reads) {
var overall = reads.groupBy(byCheckpoint).flatMap(function (group) {
return group.map(function (item) {

return { "name": "Overall Checkpoint " + item.checkpoint, "item": item };
});
});

var gender = reads.groupBy(byCheckpointGender, itemIdentity, compareByCheckpointGender).flatMap(function (group) {
return group.map(function (item) {
return { "name": item.gender + " Checkpoint " + item.checkpoint, "item": item };
});
});

var genderAge = reads.groupBy(byCheckpointGenderAge, itemIdentity, compareByCheckpointGenderAge).flatMap(function (group) {
return group.map(function (item) {
return {
"name": item.gender + " " + item.age + " Checkpoint " + item.checkpoint,
"item": item
};
});
});

return overall.merge(gender).merge(genderAge);
}
function stream(lines) {
var reads = lines.map(JSON.parse);
return groupReads(reads).map(toOutputType);
}
function main() {
var lines = getInput();
var subscription = stream(lines).subscribe(function (x) {
console.log(x);
});
}
main();
15 changes: 15 additions & 0 deletions StreamingBaconJS/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name": "StreamingBaconJS",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"baconjs": "^0.7.53",
"fileinput": "^0.2.0"
}
}

0 comments on commit cf0879c

Please sign in to comment.