Skip to content

Commit

Permalink
Transfer the payloads using streams (#38)
Browse files Browse the repository at this point in the history
* added basic message port streams

Signed-off-by: Matteo Collina <[email protected]>

* moar streams

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* handle port closing

Signed-off-by: Matteo Collina <[email protected]>

* post bodies as stream

Signed-off-by: Matteo Collina <[email protected]>

* working

Signed-off-by: Matteo Collina <[email protected]>

* speedup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* todos

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* 100% code cov

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

---------

Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina authored Jan 9, 2025
1 parent 60a1552 commit 0a1d01d
Show file tree
Hide file tree
Showing 19 changed files with 683 additions and 60 deletions.
12 changes: 9 additions & 3 deletions bench/bench.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

async function performRequest () {
const res = await request('http://myserver.local', {
dispatcher: agent,
})

await res.body.text()
}

console.time('request')
const responses = []
for (let i = 0; i < 100000; i++) {
responses.push(request('http://myserver.local', {
dispatcher: agent,
}))
responses.push(performRequest())
}
await Promise.all(responses)
console.timeEnd('request')
Expand Down
146 changes: 107 additions & 39 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ const inject = require('light-my-request')
const Hooks = require('./lib/hooks')
const DispatchController = require('./lib/dispatch-controller')
const WrapHandler = require('./lib/wrap-handler')
const { MessagePortWritable, MessagePortReadable } = require('./lib/message-port-streams')

const kAddress = Symbol('undici-thread-interceptor.address')

const MAX_BODY = 32 * 1024

function createThreadInterceptor (opts) {
const routes = new Map()
const portInflights = new Map()
Expand Down Expand Up @@ -72,17 +75,18 @@ function createThreadInterceptor (opts) {
const clientCtx = {}
hooks.fireOnClientRequest(newOpts, clientCtx)

if (newOpts.body?.[Symbol.asyncIterator]) {
collectBodyAndDispatch(newOpts, handler).then(() => {
port.postMessage({ type: 'request', id, opts: newOpts, threadId })
}, (err) => {
clearTimeout(handle)
hooks.fireOnClientError(newOpts, null, err)
handler.onResponseError(controller, err)
if (typeof newOpts.body?.resume === 'function' || newOpts.body?.[Symbol.asyncIterator]) {
const body = newOpts.body
delete newOpts.body
const transferable = MessagePortWritable.asTransferable({
body
})

port.postMessage({ type: 'request', id, opts: newOpts, port: transferable.port, threadId }, transferable.transferList)
} else {
port.postMessage({ type: 'request', id, opts: newOpts, threadId })
}

const inflights = portInflights.get(port)

let handle
Expand Down Expand Up @@ -119,20 +123,49 @@ function createThreadInterceptor (opts) {
)
// TODO(mcollina): I don't think this can be triggered,
// but we should consider adding a test for this in the future
/* c8 ignore next 4 */
/* c8 ignore next 6 */
if (controller.aborted) {
res.port?.close()
handler.onResponseError(controller, controller.reason)
return
}
} catch (err) {
// No need to close the transferable port here, because it cannot happen
// for requests with a body
handler.onResponseError(controller, err)
return
}

handler.onResponseData(controller, res.rawPayload)
handler.onResponseEnd(controller, [])
if (res.port) {
const body = new MessagePortReadable({
port: res.port
})

controller.on('resume', () => {
body.resume()
})

controller.on('pause', () => {
body.pause()
})

body.on('data', (chunk) => {
handler.onResponseData(controller, chunk)
})

body.on('end', () => {
handler.onResponseEnd(controller, [])
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx)
})

hooks.fireOnClientResponseEnd(newOpts, res, clientCtx)
body.on('error', (err) => {
handler.onResponseError(controller, err)
})
} else {
handler.onResponseData(controller, res.body)
handler.onResponseEnd(controller, [])
hooks.fireOnClientResponseEnd(newOpts, res, clientCtx)
}
}))

return true
Expand Down Expand Up @@ -282,7 +315,14 @@ function wire ({ server: newServer, port, ...undiciOpts }) {

function onMessage (msg) {
if (msg.type === 'request') {
const { id, opts } = msg
const { id, opts, port: bodyPort } = msg
let bodyReadable

if (bodyPort) {
bodyReadable = new MessagePortReadable({
port: bodyPort
})
}

const headers = {}

Expand All @@ -297,41 +337,70 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
url: opts.path,
headers,
query: opts.query,
body: opts.body instanceof Uint8Array ? Buffer.from(opts.body) : opts.body,
body: opts.body || bodyReadable,
payloadAsStream: true
}
interceptor.hooks.fireOnServerRequest(injectOpts, () => {
const onInject = (err, res) => {
const onInject = async (err, res) => {
if (err) {
interceptor.hooks.fireOnServerError(injectOpts, res, err)
port.postMessage({ type: 'response', id, err })
return
}

const newRes = {
headers: res.headers,
statusCode: res.statusCode,
}

if (res.headers['content-type']?.indexOf('application/json') === 0) {
// TODO(mcollina): maybe use a fast path also for HTML
// fast path because it's utf-8, use a string
newRes.rawPayload = res.payload
const length = res.headers['content-length']
const parsedLength = length === undefined ? MAX_BODY : Number(length)

let newRes
let forwardRes
let transferList

if (parsedLength < MAX_BODY) {
try {
const body = await collectBody(res.stream())

newRes = {
headers: res.headers,
statusCode: res.statusCode,
body
}

forwardRes = {
type: 'response',
id,
res: newRes,
}
} catch (err) {
forwardRes = {
type: 'response',
id,
err
}
}
} else {
// slow path, buffer
newRes.rawPayload = res.rawPayload
}

const forwardRes = {
type: 'response',
id,
res: newRes,
const transferable = MessagePortWritable.asTransferable({
body: res.stream()
})
transferList = transferable.transferList

newRes = {
headers: res.headers,
statusCode: res.statusCode,
port: transferable.port,
}

forwardRes = {
type: 'response',
id,
res: newRes,
}
}

interceptor.hooks.fireOnServerResponse(injectOpts, newRes)

// So we route the message back to the port
// that sent the request
this.postMessage(forwardRes)
this.postMessage(forwardRes, transferList)
}

if (!server) {
Expand Down Expand Up @@ -363,19 +432,18 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
return { interceptor, replaceServer }
}

async function collectBodyAndDispatch (opts) {
async function collectBody (stream) {
const data = []

for await (const chunk of opts.body) {
for await (const chunk of stream) {
data.push(chunk)
}

if (typeof data[0] === 'string') {
opts.body = data.join('')
} else if (data[0] instanceof Buffer || data[0] instanceof Uint8Array) {
opts.body = Buffer.concat(data)
/* c8 ignore next 7 */
if (data[0] instanceof Buffer || data[0] instanceof Uint8Array) {
return Buffer.concat(data)
} else {
throw new Error('Cannot transfer streams of objects')
throw new Error('Cannot transfer streams of strings or objects')
}
}

Expand Down
6 changes: 5 additions & 1 deletion lib/dispatch-controller.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict'

class DispatchController {
const { EventEmitter } = require('node:events')

class DispatchController extends EventEmitter {
#paused = false
#reason = null
#aborted = false
Expand All @@ -19,10 +21,12 @@ class DispatchController {

pause () {
this.#paused = true
this.emit('pause')
}

resume () {
this.#paused = false
this.emit('resume')
}

abort (reason) {
Expand Down
123 changes: 123 additions & 0 deletions lib/message-port-streams.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
'use strict'

const { Writable, Readable, pipeline } = require('node:stream')

class MessagePortWritable extends Writable {
#otherSideDestroyed = false

constructor ({ port }) {
super({ decodeStrings: false })
this.messagePort = port
this._callback = null

this.messagePort.on('message', (control) => {
if (control.more) {
const callback = this._callback
this._callback = null
if (callback) {
callback()
}
} else if (control.err) {
this.#otherSideDestroyed = true
this.destroy(control.err)
}
})

this.messagePort.on('close', () => {
if (!this.destroyed && !this.writableFinished) {
this.destroy(new Error('message port closed'))
}
})
}

_write (chunk, encoding, callback) {
this.messagePort.postMessage({ chunks: [chunk] })
this._callback = callback
}

_writev (chunks, callback) {
const toWrite = new Array(chunks.length)
for (let i = 0; i < chunks.length; i++) {
toWrite[i] = chunks[i].chunk
}
this.messagePort.postMessage({ chunks: toWrite })
this._callback = callback
}

_destroy (err, callback) {
if (!this.#otherSideDestroyed) {
if (err) {
this.messagePort.postMessage({ err })
} else {
this.messagePort.postMessage({ fin: true })
}
}
setImmediate(() => {
this.messagePort.close()
callback(err)
})
}

static asTransferable ({ body, worker }) {
const channel = new MessageChannel()
const stream = new MessagePortWritable({
port: channel.port1
})

// We cork the writable side so that we can fill the stream with all data ready to be read
stream.cork()
pipeline(body, stream, () => {
// nothing do do here, we consume the stream and ignore errors
})
process.nextTick(() => {
stream.uncork()
})

return { port: channel.port2, transferList: [channel.port2], stream }
}
}

module.exports.MessagePortWritable = MessagePortWritable

class MessagePortReadable extends Readable {
#otherSideDestroyed = false

constructor ({ port }) {
super({ decodeStrings: false })
this.messagePort = port
this.messagePort.on('message', (msg) => {
if (Array.isArray(msg.chunks)) {
for (const c of msg.chunks) {
this.push(c)
}
} else if (msg.fin) {
this.push(null)
} else if (msg.err) {
this.#otherSideDestroyed = true
this.destroy(msg.err)
}
})

this.messagePort.on('close', () => {
if (!this.destroyed && !this.readableEnded) {
this.destroy(new Error('message port closed'))
}
})
}

_read () {
this.messagePort.postMessage({ more: true })
}

_destroy (err, callback) {
if (err && !this.#otherSideDestroyed) {
this.messagePort.postMessage({ err })
}
setImmediate(() => {
this.messagePort.close()
callback(err)
})
}
}

module.exports.MessagePortReadable = MessagePortReadable
Loading

0 comments on commit 0a1d01d

Please sign in to comment.