Skip to content

Commit

Permalink
add batching
Browse files Browse the repository at this point in the history
  • Loading branch information
P.J.Shand committed Oct 8, 2019
1 parent cf6f2cb commit 00cad5a
Show file tree
Hide file tree
Showing 21 changed files with 684 additions and 199 deletions.
66 changes: 65 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,68 @@ WARNING: Not production ready

The aim of this library is to offer a simple way to setup a p2p communication layer.

This library is currently in the early stages of development, therefore I wouldn't recommend using this for anything in production.
This library is currently in the early stages of development, therefore I wouldn't recommend using this for anything in production.

### Goal

The goal of this library is to create a standardize Peer to Peer API that can be layered on top of multiple types of communication protocols.

### Installing a communication layer
```
P2P.install(ICommunication);
```


### [Notifier](https://github.com/peteshand/notifier) Binding API

As the name suggest notifier binding allows you to bind a notifier to a string Id and then have it synchronise over the p2p network when it's value is changed.

Listen for changes with Id 'test' and assign value to `notifier`

```
P2P.bind(notifier, 'test', P2P.IN);
```

Listen for value changes on `notifier` and when it updates broascast it's value over the network with Id 'test'.

```
P2P.bind(notifier, 'test', P2P.OUT);
```

Both listen and broadcast

```
P2P.bind(notifier, 'test', P2P.IO);
```

.....

```
notifier1.add((value:Int) -> {
trace("value = " + value);
}
var notifier1:Notifier<Int> = new Notifier<Int>(0);
P2P.addSubscriber(notifier1, 'test');
```

```
var notifier1:Notifier<Int> = new Notifier<Int>(0);
P2P.addBroadcast(notifier1, 'test');
notifier1.value = 2;
```

### Basic listener and sender API
```
P2P.listen("messageId", (payload:{foo:String}) -> {
trace("payload.foo = " + payload.foo);
});
```

```
var payload = {
foo:"bar"
};
P2P.send("messageId", payload);
```

4 changes: 2 additions & 2 deletions haxelib.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"license": "MIT",
"tags": ["p2p", "peer to peer"],
"description": "library for p2p communication",
"version": "0.0.1",
"releasenote": "initial commit, lib currently very limited",
"version": "0.1.2",
"releasenote": "Add batching",
"contributors": [ "p.j.shand" ],
"classPath": "src/",
"dependencies": {
Expand Down
10 changes: 0 additions & 10 deletions src/ICommunication.hx

This file was deleted.

84 changes: 0 additions & 84 deletions src/P2P.hx

This file was deleted.

100 changes: 100 additions & 0 deletions src/comms/Comms.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package comms;

import delay.Delay;
import comms.broadcaster.*;
import comms.subscriber.*;
import comms.connection.IConnection;
import notifier.Notifier;
import notifier.MapNotifier3;
import comms.notifier.*;
import haxe.Json;
import time.EnterFrame;

class Comms {
static var instanceId:Null<Float>;
static public var broadcasters = new Map<String, IBroadcaster>();
static public var subscribers = new Map<String, ISubscriber>();
static var connections:Array<IConnection> = [];
static var listeningToConnect:Bool = false;

static var messages:Array<CommsMessage> = [];

// @:isVar public static var selfListen(default, set):Bool = false;

public function new() {}

public static function install(connection:IConnection) {
if (instanceId == null) {
instanceId = Math.floor(Math.random() * 100000000000);
}
Comms.connections.push(connection);

for (subscriber in subscribers) {
subscriber.addConnection(connection);
}

Delay.killDelay(sendConnectMessage);
Delay.byFrames(30, sendConnectMessage);
}

static function sendConnectMessage() {
if (!listeningToConnect) {
listeningToConnect = true;
on("connect", (value:Dynamic = null) -> {
for (broadcaster in broadcasters) {
broadcaster.setCurrentValue();
}
});
}
send("connect");
EnterFrame.add(tick);
}

public static function addBroadcast<T>(id:String, ?map:MapNotifier3<String, T>, ?notifier:Notifier<T>):Void {
if (notifier != null)
broadcasters.set(id, new NotifierBroadcaster<T>(notifier, id));
if (map != null)
broadcasters.set(id, new MapBroadcaster<T>(map, id));
}

public static function addSubscriber<T>(id:String, ?map:MapNotifier3<String, T>, ?notifier:Notifier<T>):Void {
if (notifier != null)
subscribers.set(id, new NotifierSubscriber<T>(notifier, id));
if (map != null)
subscribers.set(id, new MapSubscriber<T>(map, id));
}

public static function removeBroadcast(notifier:Notifier<Dynamic>, id:String):Void {}

public static function removeSubscriber(notifier:Notifier<Dynamic>, id:String):Void {}

public static function send(id:String, payload:Dynamic = ""):Void {
var message:CommsMessage = {
id: id,
payload: Json.stringify({value: payload})
}
messages.push(message);
}

public static function on(id:String, callback:(payload:Dynamic) -> Void):Void {
for (connection in connections) {
connection.on(id, callback);
}
}

public static function relay<T>(id:String):Void {
Comms.on(id, (payload) -> {
Comms.send(id, payload);
});
}

static function tick() {
for (connection in connections) {
connection.send({
senderId: Comms.instanceId,
messages: messages
});
}
messages = [];
}
}
14 changes: 14 additions & 0 deletions src/comms/CommsMessage.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package comms;

typedef CommsMessage = {
// senderId:Float,
id:String,
payload:String,
// ?remoteHost:String,
// ?remotePort:Int
}

typedef CommsBatch = {
senderId:Float,
messages:Array<CommsMessage>
}
5 changes: 5 additions & 0 deletions src/comms/CommsPayload.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package comms;

typedef CommsPayload = {
value:Dynamic
}
30 changes: 30 additions & 0 deletions src/comms/MulticastAddr.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package comms;

abstract MulticastAddr(String) to String {
public static var DEFAULT_ADDRESS:String = "233.255.255.255";

// VALID RANGE
// 224.0.0.0 to 239.255.255.255
// 3758096384 - 4026531839

public function new(value:String) {
this = validate(value);
}

@:from
static public function fromString(s:String) {
return new MulticastAddr(s);
}

static inline function validate(s:String) {
if (s == null)
return DEFAULT_ADDRESS;
var regex = ~/2(?:2[4-9]|3\d)(?:\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d?|0)){3}$/i;
if (regex.match(s)) {
return s;
} else {
trace(s + " is NOT valid, using " + DEFAULT_ADDRESS + " instead");
return DEFAULT_ADDRESS;
}
}
}
6 changes: 6 additions & 0 deletions src/comms/broadcaster/IBroadcaster.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package comms.broadcaster;

interface IBroadcaster
{
function setCurrentValue():Void;
}
43 changes: 43 additions & 0 deletions src/comms/broadcaster/MapBroadcaster.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package comms.broadcaster;

import notifier.MapNotifier3;

@:access(comms.Comms)
class MapBroadcaster<T> implements IBroadcaster {
var map:MapNotifier3<String, T>;
var id:String;

public function new(map:MapNotifier3<String, T>, id:String) {
this.id = id;
this.map = map;

map.onAdd.add(onAdd);
map.onChange.add(onChange);
map.onRemove.add(onRemove);
}

function onAdd(key:String, value:T) {
send(id + ",add", key, value);
}

function onChange(key:String, value:T) {
send(id + ",add", key, value);
}

function onRemove(key:String) {
send(id + ",remove", key, null);
}

function send(commsKey:String, key:String, value:T) {
Comms.send(commsKey, {key: key, value: value});
// for (connection in Comms.connections) {
// connection.send(commsKey, {key: key, value: value});
// }
}

public function setCurrentValue():Void {
for (item in map.keyValueIterator()) {
send(id + ",add", item.key, item.value);
}
}
}
Loading

0 comments on commit 00cad5a

Please sign in to comment.