Skip to content

Commit

Permalink
Create public api
Browse files Browse the repository at this point in the history
  • Loading branch information
interpretor committed May 22, 2017
1 parent 5310c8d commit ba70c63
Show file tree
Hide file tree
Showing 18 changed files with 322 additions and 74 deletions.
44 changes: 42 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,44 @@
# zre
# zyre.js

[![Build Status](https://travis-ci.org/interpretor/zyre.js.svg?branch=master)](https://travis-ci.org/interpretor/zyre.js)

Implementation of ZRE (http://rfc.zeromq.org/spec:36/ZRE) in node.js
Node.js port of [Zyre](https://github.com/zeromq/zyre) - an open-source framework for proximity-based peer-to-peer applications

## Notice

This project is currently in development and not considered stable yet.

## Installation

```bash
npm install zyre.js
```

## Examples

```js
const zyre = require('zyre.js');

const z1 = zyre.new();
const z2 = zyre.new();

z1.on('message', (name, message, group) => {
console.log(`#${group} <${name}> ${message}`);
z1.shout('CHAT', 'Hey!');
});

z2.on('message', (name, message, group) => {
console.log(`#${group} <${name}> ${message}`);
});

z1.start().then(() => {
z1.join('CHAT');
z2.start().then(() => {
z2.join('CHAT');
});
});

setInterval(() => {
z2.shout('CHAT', 'Hello World!');
}, 1000);
```
17 changes: 15 additions & 2 deletions lib/zbeacon.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
/*
* Copyright (c) 2017 Sebastian Rager
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

const dgram = require('dgram');
const debug = require('debug')('zyre:zbeacon');

Expand Down Expand Up @@ -172,8 +180,13 @@ module.exports = class ZBeacon {
* Starts listening and broadcasting
*/
start() {
this.startListening();
this.startBroadcasting();
return new Promise((resolve) => {
this.startListening().then(() => {
this.startBroadcasting().then(() => {
resolve();
});
});
});
}

/**
Expand Down
8 changes: 8 additions & 0 deletions lib/zhelper.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
/*
* Copyright (c) 2017 Sebastian Rager
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

const os = require('os');
const tcpPortUsed = require('tcp-port-used');

Expand Down
24 changes: 16 additions & 8 deletions lib/zre_msg.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
/*
* Copyright (c) 2017 Sebastian Rager
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

const zeromq = require('zeromq');

const ZRE_VERSION = 2;
Expand Down Expand Up @@ -115,14 +123,6 @@ class ZreMsg {
if (headers) this._headers = headers;
}

setSequence(sequence) {
this._sequence = sequence;
}

setGroup(group) {
this._group = group;
}

getCmd() {
return this._cmd;
}
Expand Down Expand Up @@ -159,6 +159,14 @@ class ZreMsg {
return this._headers;
}

setSequence(sequence) {
this._sequence = sequence;
}

setGroup(group) {
this._group = group;
}

send(socket) {
return new Promise((resolve, reject) => {
if (socket instanceof zeromq.Socket) {
Expand Down
78 changes: 57 additions & 21 deletions lib/zyre.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/*
* Copyright (c) 2017 Sebastian Rager
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

const EventEmitter = require('events');
const uuid = require('uuid');
const ZHelper = require('./zhelper');
const ZyrePeers = require('./zyre_peers');
Expand All @@ -6,9 +15,11 @@ const ZBeacon = require('./zbeacon');

const MAILBOX = 49152;

class Zyre {
module.exports = class Zyre extends EventEmitter {

constructor({ name, iface }) {
super();

if (iface) {
this._ifaceData = ZHelper.getIfData(iface);
} else {
Expand All @@ -31,32 +42,57 @@ class Zyre {
}

this._zyrePeers = new ZyrePeers(this._identity);
}

ZHelper.getFreePort(this._ifaceData.address, MAILBOX).then((port) => {
this._zyreNode = new ZyreNode({
identity: this._identity,
name: this._name,
address: this._ifaceData.address,
mailbox: port,
zyrePeers: this._zyrePeers,
});
start(callback) {
return new Promise((resolve) => {
ZHelper.getFreePort(this._ifaceData.address, MAILBOX).then((port) => {
this._zyreNode = new ZyreNode({
identity: this._identity,
name: this._name,
address: this._ifaceData.address,
mailbox: port,
zyrePeers: this._zyrePeers,
});

this._zBeacon = new ZBeacon({
identity: this._identity,
address: this._ifaceData.broadcast,
mailbox: port,
zyrePeers: this._zyrePeers,
});
this._zyreNode.on('message', (name, message, group) => {
this.emit('message', name, message, group);
});

this._zBeacon = new ZBeacon({
identity: this._identity,
address: this._ifaceData.broadcast,
mailbox: port,
zyrePeers: this._zyrePeers,
});

this._zyreNode.startListening().then(() => {
this._zBeacon.start();
this._zyreNode.startListening().then(() => {
this._zBeacon.start().then(() => {
if (callback) callback();
resolve();
});
});
});
});
}

getName() {
return this._name;
whisper(identity, message) {
this._zyreNode.whisper(identity, message);
}
}

exports.new = ({ name, iface } = {}) => new Zyre({ name, iface });
shout(group, message) {
this._zyreNode.shout(group, message);
}

join(group) {
this._zyreNode.join(group);
}

leave(group) {
this._zyreNode.leave(group);
}

static new({ name, iface } = {}) {
return new Zyre({ name, iface });
}
};
8 changes: 8 additions & 0 deletions lib/zyre_group.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
/*
* Copyright (c) 2017 Sebastian Rager
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

const ZreMsg = require('./zre_msg');

module.exports = class ZyreGroup {
Expand Down
8 changes: 8 additions & 0 deletions lib/zyre_groups.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
/*
* Copyright (c) 2017 Sebastian Rager
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

const ZyreGroup = require('./zyre_group');

module.exports = class ZyreGroups {
Expand Down
79 changes: 63 additions & 16 deletions lib/zyre_node.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,50 @@
/*
* Copyright (c) 2017 Sebastian Rager
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

const EventEmitter = require('events');
const debug = require('debug')('zyre:zyre_node');
const zeromq = require('zeromq');
const ZreMsg = require('./zre_msg');
const ZyreGroups = require('./zyre_groups');

module.exports = class ZyreNode {
module.exports = class ZyreNode extends EventEmitter {

constructor({ identity, name, address, mailbox, headers = {}, zyrePeers }) {
super();

this._identity = identity;
this._name = name;
this._endpoint = `tcp://${address}:${mailbox}`;
this._headers = headers;
this._zyrePeers = zyrePeers;
this._status = 1;
this._groups = [];
this._status = 0;
this._zyreGroups = new ZyreGroups();
}

startListening() {
this._socket = zeromq.socket('router');

// New peer handling
this._zyrePeers.on('new', (zyrePeer) => {
zyrePeer.connect();
zyrePeer.send(new ZreMsg(ZreMsg.HELLO, {
endpoint: this._endpoint,
groups: ['CHAT'],
groups: this._groups,
status: this._status,
name: this._name,
headers: this._headers,
}));

this.emit('new', zyrePeer.getName());
});

this._socket = zeromq.socket('router');

// Message handling
this._socket.on('message', (id, msg, frame) => {
const zreMsg = ZreMsg.read(msg, frame);
if (!zreMsg) {
Expand All @@ -54,30 +70,26 @@ module.exports = class ZyreNode {
zreMsg.getGroups().forEach((e) => {
this._zyreGroups.push(e, this._zyrePeers.getPeer(identity));
});
// If received other than HELLO message from unknown peer
// If received other than HELLO message from unknown peer, do nothing
} else if (!this._zyrePeers.getPeer(identity)) {
debug(`${identity}: unknown peer wants to send (${zreMsg.getCmd()})`);
return;
// If received other messages from known peers
// If received message from known peer
} else {
this._zyrePeers.push({
identity,
sequence: zreMsg.getSequence(),
status: zreMsg.getStatus(),
});

const zyrePeer = this._zyrePeers.getPeer(identity);

if (zreMsg.getCmd() === ZreMsg.JOIN) {
this._zyreGroups.push(zreMsg.getGroup(), this._zyrePeers.getPeer(identity));
} else if (zreMsg.getCmd === ZreMsg.LEAVE) {
this._zyreGroups.push(zreMsg.getGroup(), zyrePeer);
} else if (zreMsg.getCmd() === ZreMsg.LEAVE) {
this._zyreGroups.pop(zreMsg.getGroup(), identity);
}
}

// Mocking
if (zreMsg.getCmd() === ZreMsg.SHOUT) {
this._zyreGroups.getGroup('CHAT').send(new ZreMsg(ZreMsg.SHOUT, {
content: 'Hello World!',
}));
this.emit('message', zyrePeer.getName(), zreMsg.getContent(), zreMsg.getGroup());
}
});

Expand All @@ -88,4 +100,39 @@ module.exports = class ZyreNode {
});
});
}

whisper(identity, message) {
if (this._zyrePeers.getPeer(identity)) {
this._zyrePeers.getPeer(identity).send(new ZreMsg(ZreMsg.WHISPER, {
content: message,
}));
}
}

shout(group, message) {
if (this._zyreGroups.getGroup(group)) {
this._zyreGroups.getGroup(group).send(new ZreMsg(ZreMsg.SHOUT, {
group,
content: message,
}));
}
}

join(group) {
this._groups.push(group);
this._status += 1;
this._zyrePeers.send(new ZreMsg(ZreMsg.JOIN, {
group,
status: this._status,
}));
}

leave(group) {
this._groups.pop(group);
this._status += 1;
this._zyrePeers.send(new ZreMsg(ZreMsg.LEAVE, {
group,
status: this._status,
}));
}
};
Loading

0 comments on commit ba70c63

Please sign in to comment.