Skip to content

Commit

Permalink
Add PPP implementation and supporting interfaces
Browse files Browse the repository at this point in the history
Change-Id: I8addeceb3fd491ecf30ee72fbbd6ea220477b9a4
  • Loading branch information
Hexxeh committed Jan 13, 2022
1 parent 56058b1 commit e2c1a0e
Show file tree
Hide file tree
Showing 7 changed files with 1,546 additions and 0 deletions.
123 changes: 123 additions & 0 deletions src/Interface.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import * as stream from 'stream';

import Interface from './Interface';
import { encode as encodeFrame } from './framing/encoder';
import PPPFrame from './ppp/PPPFrame';
import { encode } from './encodingUtil';

let intf: Interface;
let sink: BufferSink;

class BufferSink extends stream.Duplex {
public data: Buffer[] = [];

constructor() {
super({ allowHalfOpen: false });
}

// eslint-disable-next-line @typescript-eslint/no-empty-function
_read(): void {}

_write(chunk: Buffer, _: string, done: (err?: Error) => void): void {
this.data.push(chunk);
done();
}

getData(): Promise<Buffer[]> {
this.push(null);
return new Promise((resolve) =>
sink.once('close', () => resolve(this.data)),
);
}

waitForClose(): Promise<void> {
return new Promise((resolve) => sink.once('close', () => resolve()));
}
}

beforeEach(() => {
jest.useFakeTimers();
sink = new BufferSink();
intf = Interface.create(sink);
});

afterEach(() => {
jest.useRealTimers();
});

it('sends a packet', async () => {
const packet = Buffer.from('data');
intf.sendPacket(0x8889, packet);

return expect(sink.getData()).resolves.toContainEqual(
encodeFrame(PPPFrame.build(0x8889, packet)),
);
});

it('sends from a socket', () => {
const socket = intf.connect(0xf0f1);
const packet = Buffer.from('data');
socket.send(packet);
return expect(sink.getData()).resolves.toContainEqual(
encodeFrame(PPPFrame.build(0xf0f1, packet)),
);
});

it('receives a packet', async () => {
const socket = intf.connect(0xf0f1);
const packetHandler = jest.fn();
socket.on('data', packetHandler);
sink.push(encodeFrame(PPPFrame.build(0xf0f1, Buffer.from('hello world!'))));
sink.push(null);
await sink.waitForClose();
expect(packetHandler).toBeCalledWith(encode('hello world!'));
});

it('closing interface closes sockets and underlying stream', () => {
const socketA = intf.connect(0xf0f1);
const socketB = intf.connect(0xf0f3);

intf.close();

expect(socketA.closed).toEqual(true);
expect(socketB.closed).toEqual(true);
expect(intf.destroyed).toEqual(true);
});

it('ending underlying stream closes sockets and interface', async () => {
const socket = intf.connect(0xf0f1);

sink.destroy();
jest.runAllTimers();
await sink.waitForClose();

expect(socket.closed).toEqual(true);
expect(intf.destroyed).toEqual(true);
});

it('throws if opening two sockets for same protocol', () => {
intf.connect(0xf0f1);
expect(() => intf.connect(0xf0f1)).toThrowError(
'A socket is already bound to protocol 0xf0f1',
);
});

it('closing one socket allows another to be opened for the same protocol', () => {
const socketA = intf.connect(0xf0f1);
socketA.close();
const socketB = intf.connect(0xf0f1);
expect(socketA).not.toBe(socketB);
});

it('throws if sending on a closed interface', () => {
intf.close();

expect(intf.closed).toEqual(true);
expect(() => intf.sendPacket(0x8889, Buffer.from('data'))).toThrowError(
'I/O operation on closed interface',
);
});

it('ignores corrupted PPP frames', () => {
expect(() => intf.write(encode('?'))).not.toThrowError();
});
112 changes: 112 additions & 0 deletions src/Interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import * as stream from 'stream';

import InterfaceSocket from './InterfaceSocket';
import PPPFrame from './ppp/PPPFrame';

import { FrameDecoder } from './framing/decoder';
import { FrameEncoder } from './framing/encoder';
import { FrameSplitter } from './framing/splitter';

export default class Interface extends stream.Duplex {
static create(phy: stream.Duplex): Interface {
const intf = new Interface();
const splitter = new FrameSplitter();
const decoder = new FrameDecoder();
const encoder = new FrameEncoder();

stream.pipeline([phy, splitter, decoder, intf, encoder, phy], () => {
intf.down();
});

return intf;
}

constructor() {
super({ objectMode: true, allowHalfOpen: false });
}

public closed = false;
private sockets: Record<number, InterfaceSocket> = {};

// eslint-disable-next-line @typescript-eslint/no-empty-function
_read(): void {}

_write(chunk: Buffer, _: string, callback: (err?: Error) => void): void {
let frame: PPPFrame;
try {
frame = PPPFrame.parse(chunk);
} catch {
console.warn(`Received malformed PPP frame: ${chunk.toString('hex')}`);
callback();
return;
}

console.log(
`[PHY recv] [protocol:0x${frame.protocol.toString(
16,
)}] ${frame.information.toString('hex')}`,
);

const socket: InterfaceSocket | undefined = this.sockets[frame.protocol];
if (socket !== undefined) {
socket.handlePacket(frame.information);
} else {
// Protocol-reject
}

callback();
}

/*
Open a link-layer socket for sending and receiving packets
of a specific protocol number.
*/
connect(protocol: number): InterfaceSocket {
if (this.sockets[protocol] !== undefined) {
throw new Error(
`A socket is already bound to protocol 0x${protocol.toString(16)}`,
);
}

return (this.sockets[protocol] = new InterfaceSocket(this, protocol));
}

/*
Used by InterfaceSocket objets to unregister themselves when closing.
*/
unregisterSocket(protocol: number): void {
delete this.sockets[protocol];
}

sendPacket(protocol: number, packet: Buffer): void {
if (this.closed) throw new Error('I/O operation on closed interface');
console.log(
`[PHY send] [protocol:0x${protocol.toString(16)}] ${packet.toString(
'hex',
)}`,
);
const datagram = PPPFrame.build(protocol, packet);
this.push(datagram);
}

closeAllSockets(): void {
for (const socket of Object.values(this.sockets)) {
socket.close();
}
}

public close(): void {
if (this.closed) return;
this.closeAllSockets();
this.down();
}

/*
The lower layer (iostream) is down. Bring down the interface.
*/
private down(): void {
this.closed = true;
this.closeAllSockets();
this.destroy();
}
}
71 changes: 71 additions & 0 deletions src/InterfaceSocket.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import Interface from './Interface';
import InterfaceSocket from './InterfaceSocket';

jest.mock('./Interface');

let intf: Interface;
let intfSocket: InterfaceSocket;

beforeEach(() => {
intf = new Interface();
intfSocket = new InterfaceSocket(intf, 0xf2f1);
});

it('is not closed after instanciation', () => {
expect(intfSocket.closed).toEqual(false);
});

it('sends data', () => {
const data = Buffer.from('data');
intfSocket.send(data);
expect(intf.sendPacket).toBeCalledWith(0xf2f1, data);
});

it('marks as close once closed', () => {
intfSocket.close();
expect(intfSocket.closed).toEqual(true);
});

it('unregisters socket from interface upon close', () => {
intfSocket.close();
expect(intf.unregisterSocket).toBeCalledWith(0xf2f1);
});

it('calls close handler on close', () => {
const closeHandler = jest.fn();
intfSocket.once('close', closeHandler);
intfSocket.close();
expect(closeHandler).toBeCalledTimes(1);
});

it('throws when sending after closed', () => {
intfSocket.close();
expect(() => intfSocket.send(Buffer.from('data'))).toThrowError(
'I/O operation on closed socket',
);
});

it('handles a packet', () => {
const dataHandler = jest.fn();
const packet = Buffer.from('data');
intfSocket.once('data', dataHandler);
intfSocket.handlePacket(packet);
expect(dataHandler).toBeCalledWith(packet);
});

it("doesn't emit event for packet if closed", () => {
const dataHandler = jest.fn();
intfSocket.once('data', dataHandler);
intfSocket.close();
intfSocket.handlePacket(Buffer.from('data'));
expect(dataHandler).not.toBeCalled();
});

it('close() is idempotent', () => {
const closeHandler = jest.fn();
intfSocket.once('close', closeHandler);
intfSocket.close();
intfSocket.close();
expect(closeHandler).toBeCalledTimes(1);
expect(intf.unregisterSocket).toBeCalledTimes(1);
});
36 changes: 36 additions & 0 deletions src/InterfaceSocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import EventEmitter from 'events';

import Interface from './Interface';

/*
A socket for sending and receiving link-layer packets over a
PULSE interface.
Available events:
- close
- data
*/
export default class InterfaceSocket extends EventEmitter {
public closed = false;

constructor(private intf: Interface, private protocol: number) {
super();
}

public send(packet: Buffer): void {
if (this.closed) throw new Error('I/O operation on closed socket');
this.intf.sendPacket(this.protocol, packet);
}

public handlePacket(packet: Buffer): void {
if (!this.closed) this.emit('data', packet);
}

public close(): void {
if (this.closed) return;
this.closed = true;
this.emit('close');
this.intf.unregisterSocket(this.protocol);
this.removeAllListeners();
}
}
Loading

0 comments on commit e2c1a0e

Please sign in to comment.