Skip to content

Commit

Permalink
forward address to new services (#60)
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina authored Jan 30, 2025
1 parent 7436b72 commit eae19ec
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 3 deletions.
16 changes: 13 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ function createThreadInterceptor (opts) {
forwarded.get(port).add(port1)
otherPort.postMessage({ type: 'route', url, port: port2, threadId: port.threadId }, [port2])
port.postMessage({ type: 'route', url: key, port: port1, threadId: otherPort.threadId }, [port1])
// If we have a real address for the other port, we need to forward it
if (otherPort[kAddress]) {
port.postMessage({ type: 'address', url: key, address: otherPort[kAddress], threadId: otherPort.threadId })
}
}
}
}
Expand All @@ -217,9 +221,16 @@ function createThreadInterceptor (opts) {
function onClose () {
const roundRobin = routes.get(url)
roundRobin.remove(port)
for (const f of forwarded.get(port)) {
f.close()

if (forwarded.has(port)) {
for (const f of forwarded.get(port)) {
f.close()
}
// delete all the array of forwarded ports,
// to avoid a memory leak
forwarded.delete(port)
}

for (const cb of portInflights.get(port).values()) {
cb(new Error('Worker exited'))
}
Expand Down Expand Up @@ -358,7 +369,6 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
if (parsedLength < MAX_BODY) {
try {
const body = await collectBody(res.stream())

newRes = {
headers: res.headers,
statusCode: res.statusCode,
Expand Down
68 changes: 68 additions & 0 deletions test/service-restarted.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,71 @@ test('service restart with network / 2', async (t) => {
await res.body.dump()
}
})

test('service restart with network / 3', async (t) => {
const worker1 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), {
workerData: { name: 'worker1' },
})
t.after(() => worker1.terminate())
const worker2 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), {
workerData: { name: 'worker2' },
})
t.after(() => worker2.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker1)
interceptor.route('myserver2', worker2)

const agent = new Agent().compose(interceptor)

await sleep(1000)

{
const res = await request('http://myserver2.local/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}

await rejects(request('http://myserver2.local/crash', {
dispatcher: agent,
}))

const worker2bis = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), {
workerData: { name: 'worker2bis' },
})
t.after(() => worker2bis.terminate())

interceptor.route('myserver2', worker2bis)

const composer = new Worker(join(__dirname, 'fixtures', 'composer.js'), {
workerData: { name: 'composer' },
})
t.after(() => composer.terminate())

interceptor.route('composer', composer)

await sleep(2000)

{
const res = await request('http://composer.local/s1/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}

{
const res = await request('http://composer.local/s2/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}
})

0 comments on commit eae19ec

Please sign in to comment.