From 654a4296f8586aab87fcd8d887e76c5940163153 Mon Sep 17 00:00:00 2001 From: Graeme Yeates Date: Wed, 29 Oct 2014 14:56:06 -0400 Subject: [PATCH] Make Topic streams pipeable --- src/node/TopicStream.js | 25 ++++++++-- test/examples/topic-listener.example.js | 62 ++++++++++++++++++------- 2 files changed, 68 insertions(+), 19 deletions(-) diff --git a/src/node/TopicStream.js b/src/node/TopicStream.js index be8780e56..759f108ed 100644 --- a/src/node/TopicStream.js +++ b/src/node/TopicStream.js @@ -1,12 +1,31 @@ var Topic = require('../core/Topic'); -var ReadableStream = require('stream').Readable; +var DuplexStream = require('stream').Duplex; -Topic.prototype.toStream = function() { - var stream = new ReadableStream({ +/** + * Publish a connected ROS topic to a duplex + * stream. This stream can be piped to, which will + * publish to the topic + */ +Topic.prototype.toStream = function(transform) { + var topic = this; + var hasTransform = typeof transform === 'function'; + + var stream = new DuplexStream({ objectMode: true }); stream._read = function() {}; + // Publish to the topic if someone pipes to stream + stream._write = function(chunk, encoding, callback) { + if (hasTransform) { + chunk = transform(chunk); + } + if (chunk) { + topic.publish(chunk); + } + callback(); + }; + this.subscribe(function(message) { stream.push(message); }); diff --git a/test/examples/topic-listener.example.js b/test/examples/topic-listener.example.js index ae4204590..2da859f7d 100644 --- a/test/examples/topic-listener.example.js +++ b/test/examples/topic-listener.example.js @@ -1,22 +1,22 @@ var expect = require('chai').expect; var ROSLIB = require('../..'); -describe('Topics Example', function() { - this.timeout(1000); +var ros = new ROSLIB.Ros({ + url: 'ws://localhost:9090' +}); - var ros = new ROSLIB.Ros({ - url: 'ws://localhost:9090' - }); +var example = ros.Topic({ + name: '/test_topic', + messageType: 'std_msgs/String' +}); - var example = ros.Topic({ - name: '/test_topic', - messageType: 'std_msgs/String' - }); +function format(msg) { + return {data: msg}; +} +var messages = ['1', '2', '3', '4'].map(format); - function format(msg) { - return {data: msg}; - } - var messages = ['1', '2', '3', '4'].map(format); +describe('Topics Example', function() { + this.timeout(1000); function createAndStreamTopic(topicName) { var topic = ros.Topic({ @@ -53,8 +53,38 @@ describe('Topics Example', function() { topic.on('unsubscribe', done); }); +}); + +if (ROSLIB.Topic.prototype.toStream) { + var TransformStream = require('stream').Transform; + describe('Topic Stream Example', function() { + this.timeout(1000); + + function createAndStreamTopic(topicName) { + var stream = new TransformStream({objectMode: true}); + var topic = ros.Topic({ + name: topicName, + messageType: 'std_msgs/String' + }); + + var idx = 0; + function emit() { + setTimeout(function() { + stream.push(messages[idx++]); + if (idx < messages.length) { + emit(); + } else { + stream.end(); + topic.unadvertise(); + } + }, 50); + } + emit(); + + stream.pipe(topic.toStream()); + return topic; + } - if (ROSLIB.Topic.prototype.toStream) { it('Topic.toStream()', function(done) { var stream = createAndStreamTopic('/echo/test-stream').toStream(); var expected = messages.slice(); @@ -65,5 +95,5 @@ describe('Topics Example', function() { }); stream.on('end', done); }); - } -}); \ No newline at end of file + }); +} \ No newline at end of file