diff --git a/bin/server.js b/bin/server.js new file mode 100644 index 0000000..e6762f4 --- /dev/null +++ b/bin/server.js @@ -0,0 +1,3 @@ +import { createWSServer } from '../src/comms/websocket-server.js' + +export const server = await createWSServer() diff --git a/src/comms/websocket-server.js b/src/comms/websocket-server.js index 5b1d0c3..8d97bf0 100644 --- a/src/comms/websocket-server.js +++ b/src/comms/websocket-server.js @@ -21,7 +21,10 @@ import * as encoding from 'lib0/encoding' import * as decoding from 'lib0/decoding' import * as protocol from '../protocol.js' import * as comm from '../comm.js' // eslint-disable-line -import { openYdb } from '../index.js' +import * as ydb from '../index.js' +import * as promise from 'lib0/promise' +import * as error from 'lib0/error' +import * as ran from 'lib0/random' const expectedBufferedAmount = 512 * 1024 // 512kb @@ -88,43 +91,64 @@ class WSClient { } } -const ydb = await openYdb('.ydb-websocket-server', ['*'], {}) - -const port = 9000 +/** + * @param {Object} options + * @param {number} [options.port] + * @param {string} [options.dbname] + */ +export const createWSServer = async ({ port = 9000, dbname = '.ydb-websocket-server' } = {}) => { + const db = await ydb.openYdb(dbname, ['*'], {}) + const server = new WSServer(db, port) + await server.ready + return server +} -uws.App({}).ws('/*', /** @type {uws.WebSocketBehavior<{ client: WSClient }>} */ ({ - /* Options */ - compression: uws.SHARED_COMPRESSOR, - maxPayloadLength: 70 * 1024 * 1024, - idleTimeout: 60, - /* Handlers */ - open: (ws) => { - const client = new WSClient(ws) - ws.getUserData().client = client - client.send(encoding.encode(encoder => { - protocol.writeInfo(encoder, ydb) - })) - }, - message: (ws, message) => { - const decoder = decoding.createDecoder(new Uint8Array(message.slice(0))) // copy buffer because uws will reuse the memory space - const client = ws.getUserData().client - client.queueMessage(async (encoder) => { - await protocol.readMessage(encoder, decoder, ydb, client) - return false +class WSServer { + /** + * @param {ydb.Ydb} ydb + * @param {number} port + */ + constructor (ydb, port) { + this.ready = promise.create((resolve, reject) => { + uws.App({}).ws('/*', /** @type {uws.WebSocketBehavior<{ client: WSClient }>} */ ({ + /* Options */ + compression: uws.SHARED_COMPRESSOR, + maxPayloadLength: 70 * 1024 * 1024, + idleTimeout: 60, + /* Handlers */ + open: (ws) => { + const client = new WSClient(ws) + ws.getUserData().client = client + client.send(encoding.encode(encoder => { + protocol.writeInfo(encoder, ydb) + })) + }, + message: (ws, message) => { + const decoder = decoding.createDecoder(new Uint8Array(message.slice(0))) // copy buffer because uws will reuse the memory space + const client = ws.getUserData().client + client.queueMessage(async (encoder) => { + await protocol.readMessage(encoder, decoder, ydb, client) + return false + }) + }, + drain: ws => { + ws.getUserData().client._drain() + }, + close: ws => { + ws.getUserData().client.destroy() + } + })).any('/*', (res, _req) => { + res.end('Oh no, you found me 🫣') + }).listen(port, (token) => { + if (token) { + console.log('Listening to port ' + port) + resolve(port) + } else { + const m = 'Failed to listen to port ' + port + reject(error.create(m)) + console.log(m) + } + }) }) - }, - drain: ws => { - ws.getUserData().client._drain() - }, - close: ws => { - ws.getUserData().client.destroy() } -})).any('/*', (res, _req) => { - res.end('Oh no, you found me 🫣') -}).listen(port, (token) => { - if (token) { - console.log('Listening to port ' + port) - } else { - console.log('Failed to listen to port ' + port) - } -}) +} diff --git a/tests/helpers.js b/tests/helpers.js index c515b54..9a48fcb 100644 --- a/tests/helpers.js +++ b/tests/helpers.js @@ -5,6 +5,12 @@ import * as Y from 'yjs' import * as array from 'lib0/array' import * as wscomm from '../src/comms/websocket.js' import * as env from 'lib0/environment' +import * as random from 'lib0/random' + +/** + * New test runs shouldn't reuse old data + */ +const randTestRunName = random.uint32().toString(32).slice(0, 8) if (env.isNode) { const fs = await import('fs') @@ -12,15 +18,15 @@ if (env.isNode) { fs.rmSync('./.test_dbs', { recursive: true }) } catch (e) {} try { - fs.rmSync('./.ydb-websocket-server', { recursive: true }) + fs.rmSync(randTestRunName, { recursive: true }) } catch (e) {} - await import('../src/comms/websocket-server.js') + await import('../bin/server.js') } /** * @param {t.TestCase} tc */ -const getDbName = tc => `.test_dbs/${tc.moduleName}/${tc.testName}` +const getDbName = tc => `.test_dbs/${randTestRunName}/${tc.moduleName}/${tc.testName}` export const emptyUpdate = Y.encodeStateAsUpdateV2(new Y.Doc()) @@ -38,7 +44,7 @@ class TestClients { } async createClient () { - const dbname = `${this.name}-${this.cliNum++}` + const dbname = `.test_dbs/${randTestRunName}-${this.name}-${this.cliNum++}` await Ydb.deleteYdb(dbname) const ydb = await Ydb.openYdb(dbname, ['c1', 'c2', 'c3'], { comms: [new wscomm.WebSocketComm('ws://localhost:9000')] diff --git a/tests/ydb.tests.js b/tests/ydb.tests.js index 5aa2a95..7a779cd 100644 --- a/tests/ydb.tests.js +++ b/tests/ydb.tests.js @@ -3,8 +3,8 @@ import * as promise from 'lib0/promise' import * as Ydb from '../src/index.js' import * as helpers from './helpers.js' -import * as actions from '../src/actions.js' -import * as operations from '../src/operations.js' +// import * as actions from '../src/actions.js' +// import * as operations from '../src/operations.js' /** * @param {string} testname @@ -59,10 +59,10 @@ export const testComm = async tc => { const ydoc3 = ydb3.getYdoc('c1', 'ydoc') await ydoc3.whenLoaded t.compare(ydoc3.getMap().get('k'), 'v2') - console.log(await actions.getClocks(ydb1), 'clientid: ', ydb1.clientid) - console.log(await actions.getClocks(ydb2), 'clientid: ', ydb2.clientid) - console.log(await actions.getClocks(ydb3), 'clientid: ', ydb3.clientid) - console.log('updates', ydb1.clientid, await actions.getDocOps(ydb1, 'c1', 'ydoc', operations.OpYjsUpdateType, 0)) - console.log('updates 2', ydb2.clientid, await actions.getDocOps(ydb2, 'c1', 'ydoc', operations.OpYjsUpdateType, 0)) - console.log('updates 3', ydb3.clientid, await actions.getDocOps(ydb3, 'c1', 'ydoc', operations.OpYjsUpdateType, 0)) + // console.log(await actions.getClocks(ydb1), 'clientid: ', ydb1.clientid) + // console.log(await actions.getClocks(ydb2), 'clientid: ', ydb2.clientid) + // console.log(await actions.getClocks(ydb3), 'clientid: ', ydb3.clientid) + // console.log('updates', ydb1.clientid, await actions.getDocOps(ydb1, 'c1', 'ydoc', operations.OpYjsUpdateType, 0)) + // console.log('updates 2', ydb2.clientid, await actions.getDocOps(ydb2, 'c1', 'ydoc', operations.OpYjsUpdateType, 0)) + // console.log('updates 3', ydb3.clientid, await actions.getDocOps(ydb3, 'c1', 'ydoc', operations.OpYjsUpdateType, 0)) }