Skip to content

Commit

Permalink
better abstractions & reusability for ws server
Browse files Browse the repository at this point in the history
  • Loading branch information
dmonad committed Oct 6, 2023
1 parent b607e29 commit 1b80ab8
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 50 deletions.
3 changes: 3 additions & 0 deletions bin/server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { createWSServer } from '../src/comms/websocket-server.js'

export const server = await createWSServer()
100 changes: 62 additions & 38 deletions src/comms/websocket-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import * as encoding from 'lib0/encoding'
import * as decoding from 'lib0/decoding'
import * as protocol from '../protocol.js'
import * as comm from '../comm.js' // eslint-disable-line
import { openYdb } from '../index.js'
import * as ydb from '../index.js'
import * as promise from 'lib0/promise'
import * as error from 'lib0/error'
import * as ran from 'lib0/random'

const expectedBufferedAmount = 512 * 1024 // 512kb

Expand Down Expand Up @@ -88,43 +91,64 @@ class WSClient {
}
}

const ydb = await openYdb('.ydb-websocket-server', ['*'], {})

const port = 9000
/**
* @param {Object} options
* @param {number} [options.port]
* @param {string} [options.dbname]
*/
export const createWSServer = async ({ port = 9000, dbname = '.ydb-websocket-server' } = {}) => {
const db = await ydb.openYdb(dbname, ['*'], {})
const server = new WSServer(db, port)
await server.ready
return server
}

uws.App({}).ws('/*', /** @type {uws.WebSocketBehavior<{ client: WSClient }>} */ ({
/* Options */
compression: uws.SHARED_COMPRESSOR,
maxPayloadLength: 70 * 1024 * 1024,
idleTimeout: 60,
/* Handlers */
open: (ws) => {
const client = new WSClient(ws)
ws.getUserData().client = client
client.send(encoding.encode(encoder => {
protocol.writeInfo(encoder, ydb)
}))
},
message: (ws, message) => {
const decoder = decoding.createDecoder(new Uint8Array(message.slice(0))) // copy buffer because uws will reuse the memory space
const client = ws.getUserData().client
client.queueMessage(async (encoder) => {
await protocol.readMessage(encoder, decoder, ydb, client)
return false
class WSServer {
/**
* @param {ydb.Ydb} ydb
* @param {number} port
*/
constructor (ydb, port) {
this.ready = promise.create((resolve, reject) => {
uws.App({}).ws('/*', /** @type {uws.WebSocketBehavior<{ client: WSClient }>} */ ({
/* Options */
compression: uws.SHARED_COMPRESSOR,
maxPayloadLength: 70 * 1024 * 1024,
idleTimeout: 60,
/* Handlers */
open: (ws) => {
const client = new WSClient(ws)
ws.getUserData().client = client
client.send(encoding.encode(encoder => {
protocol.writeInfo(encoder, ydb)
}))
},
message: (ws, message) => {
const decoder = decoding.createDecoder(new Uint8Array(message.slice(0))) // copy buffer because uws will reuse the memory space
const client = ws.getUserData().client
client.queueMessage(async (encoder) => {
await protocol.readMessage(encoder, decoder, ydb, client)
return false
})
},
drain: ws => {
ws.getUserData().client._drain()
},
close: ws => {
ws.getUserData().client.destroy()
}
})).any('/*', (res, _req) => {
res.end('Oh no, you found me 🫣')
}).listen(port, (token) => {
if (token) {
console.log('Listening to port ' + port)
resolve(port)
} else {
const m = 'Failed to listen to port ' + port
reject(error.create(m))
console.log(m)
}
})
})
},
drain: ws => {
ws.getUserData().client._drain()
},
close: ws => {
ws.getUserData().client.destroy()
}
})).any('/*', (res, _req) => {
res.end('Oh no, you found me 🫣')
}).listen(port, (token) => {
if (token) {
console.log('Listening to port ' + port)
} else {
console.log('Failed to listen to port ' + port)
}
})
}
14 changes: 10 additions & 4 deletions tests/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,28 @@ import * as Y from 'yjs'
import * as array from 'lib0/array'
import * as wscomm from '../src/comms/websocket.js'
import * as env from 'lib0/environment'
import * as random from 'lib0/random'

/**
* New test runs shouldn't reuse old data
*/
const randTestRunName = random.uint32().toString(32).slice(0, 8)

if (env.isNode) {
const fs = await import('fs')
try {
fs.rmSync('./.test_dbs', { recursive: true })
} catch (e) {}
try {
fs.rmSync('./.ydb-websocket-server', { recursive: true })
fs.rmSync(randTestRunName, { recursive: true })
} catch (e) {}
await import('../src/comms/websocket-server.js')
await import('../bin/server.js')
}

/**
* @param {t.TestCase} tc
*/
const getDbName = tc => `.test_dbs/${tc.moduleName}/${tc.testName}`
const getDbName = tc => `.test_dbs/${randTestRunName}/${tc.moduleName}/${tc.testName}`

export const emptyUpdate = Y.encodeStateAsUpdateV2(new Y.Doc())

Expand All @@ -38,7 +44,7 @@ class TestClients {
}

async createClient () {
const dbname = `${this.name}-${this.cliNum++}`
const dbname = `.test_dbs/${randTestRunName}-${this.name}-${this.cliNum++}`
await Ydb.deleteYdb(dbname)
const ydb = await Ydb.openYdb(dbname, ['c1', 'c2', 'c3'], {
comms: [new wscomm.WebSocketComm('ws://localhost:9000')]
Expand Down
16 changes: 8 additions & 8 deletions tests/ydb.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import * as promise from 'lib0/promise'

import * as Ydb from '../src/index.js'
import * as helpers from './helpers.js'
import * as actions from '../src/actions.js'
import * as operations from '../src/operations.js'
// import * as actions from '../src/actions.js'
// import * as operations from '../src/operations.js'

/**
* @param {string} testname
Expand Down Expand Up @@ -59,10 +59,10 @@ export const testComm = async tc => {
const ydoc3 = ydb3.getYdoc('c1', 'ydoc')
await ydoc3.whenLoaded
t.compare(ydoc3.getMap().get('k'), 'v2')
console.log(await actions.getClocks(ydb1), 'clientid: ', ydb1.clientid)
console.log(await actions.getClocks(ydb2), 'clientid: ', ydb2.clientid)
console.log(await actions.getClocks(ydb3), 'clientid: ', ydb3.clientid)
console.log('updates', ydb1.clientid, await actions.getDocOps(ydb1, 'c1', 'ydoc', operations.OpYjsUpdateType, 0))
console.log('updates 2', ydb2.clientid, await actions.getDocOps(ydb2, 'c1', 'ydoc', operations.OpYjsUpdateType, 0))
console.log('updates 3', ydb3.clientid, await actions.getDocOps(ydb3, 'c1', 'ydoc', operations.OpYjsUpdateType, 0))
// console.log(await actions.getClocks(ydb1), 'clientid: ', ydb1.clientid)
// console.log(await actions.getClocks(ydb2), 'clientid: ', ydb2.clientid)
// console.log(await actions.getClocks(ydb3), 'clientid: ', ydb3.clientid)
// console.log('updates', ydb1.clientid, await actions.getDocOps(ydb1, 'c1', 'ydoc', operations.OpYjsUpdateType, 0))
// console.log('updates 2', ydb2.clientid, await actions.getDocOps(ydb2, 'c1', 'ydoc', operations.OpYjsUpdateType, 0))
// console.log('updates 3', ydb3.clientid, await actions.getDocOps(ydb3, 'c1', 'ydoc', operations.OpYjsUpdateType, 0))
}

0 comments on commit 1b80ab8

Please sign in to comment.