-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathreceiver.ts
130 lines (106 loc) · 3.79 KB
/
receiver.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import { type Socket, createSocket } from 'dgram';
import { EventEmitter } from 'events';
import { AssertionError } from 'assert';
import { Packet } from './packet';
import { multicastGroup } from './util';
/** @deprecated - use {@link Receiver.Props} instead */
export type ReceiverProps = Receiver.Props;
export declare namespace Receiver {
export interface Props {
/** List of universes to listen to. Must be within `1-63999 */
universes?: number[];
/** The multicast port to use. All professional consoles broadcast to the default port. */
port?: number;
/** local ip address of network inteface to use */
iface?: string;
/** Allow multiple programs on your computer to listen to the same sACN universe. */
reuseAddr?: boolean;
}
export interface EventMap {
packet: Packet;
PacketCorruption: AssertionError;
PacketOutOfOrder: Error;
error: Error;
}
}
export declare interface Receiver {
on<K extends keyof Receiver.EventMap>(
type: K,
listener: (event: Receiver.EventMap[K]) => void,
): this;
}
export class Receiver extends EventEmitter {
private socket: Socket;
private lastSequence: Record<string, number>;
private readonly port: Receiver.Props['port'];
public universes: NonNullable<Receiver.Props['universes']>;
private readonly iface: Receiver.Props['iface'];
constructor({
universes = [1],
port = 5568,
iface = undefined,
reuseAddr = false,
}: Receiver.Props) {
super();
this.universes = universes;
this.port = port;
this.iface = iface;
this.socket = createSocket({ type: 'udp4', reuseAddr });
this.lastSequence = {};
this.socket.on('message', (msg, rinfo) => {
try {
const packet = new Packet(msg, rinfo.address);
// somehow we received a packet for a universe we're not listening to
// silently drop this packet
if (!this.universes.includes(packet.universe)) return;
// we keep track of the last sequence per sender and per universe (see #37)
const key = packet.cid.toString('utf8') + packet.universe;
const outOfOrder =
this.lastSequence[key] &&
Math.abs(this.lastSequence[key]! - packet.sequence) > 20;
const oldSequence = this.lastSequence[key];
this.lastSequence[key] = packet.sequence === 255 ? -1 : packet.sequence;
if (outOfOrder) {
throw new Error(
`Packet significantly out of order in universe ${packet.universe} from ${packet.sourceName} (${oldSequence} -> ${packet.sequence})`,
);
}
this.emit('packet', packet);
} catch (err) {
const event =
err instanceof AssertionError
? 'PacketCorruption'
: 'PacketOutOfOrder';
this.emit(event, err);
}
});
this.socket.on('error', (ex) => this.emit('error', ex));
this.socket.bind(this.port, () => {
for (const uni of this.universes) {
try {
this.socket.addMembership(multicastGroup(uni), this.iface);
} catch (err) {
this.emit('error', err); // emit errors from socket.addMembership
}
}
});
}
public addUniverse(universe: number): this {
// already listening to this one; do nothing
if (this.universes.includes(universe)) return this;
this.socket.addMembership(multicastGroup(universe), this.iface);
this.universes.push(universe);
return this;
}
public removeUniverse(universe: number): this {
// not listening to this one; do nothing
if (!this.universes.includes(universe)) return this;
this.socket.dropMembership(multicastGroup(universe), this.iface);
this.universes = this.universes.filter((n) => n !== universe);
return this;
}
public close(callback?: () => void): this {
this.socket.close(callback);
return this;
}
}