Skip to content

Commit

Permalink
Add explicit system for handling 'upgrade' events (graphile#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie authored Mar 9, 2023
1 parent 1b41196 commit 8b6d6e5
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 12 deletions.
12 changes: 11 additions & 1 deletion @app/lib/src/withApollo.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,24 @@ class WebSocketLink extends ApolloLink {
: ""
)
);
} else {
} else if (Array.isArray(err)) {
sink.error(
new Error(
(err as GraphQLError[])
.map(({ message }) => message)
.join(", ")
)
);
} else {
console.error(
"Error was neither a list nor an instanceof Error?",
err
);
sink.error(
new Error(`Unknown error occurred in the websocket client.`, {
cause: err,
})
);
}
},
}
Expand Down
26 changes: 25 additions & 1 deletion @app/server/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import express, { Express } from "express";
import { Server } from "http";
import { IncomingMessage, Server } from "http";
import { Middleware } from "postgraphile";
import { Duplex } from "stream";

import { cloudflareIps } from "./cloudflare";
import * as middleware from "./middleware";
Expand All @@ -16,6 +17,19 @@ export function getShutdownActions(app: Express): ShutdownAction[] {
return app.get("shutdownActions");
}

type UpgradeHandlers = Array<{
name: string;
check: (
req: IncomingMessage,
socket: Duplex,
head: Buffer
) => boolean | Promise<boolean>;
upgrade: (req: IncomingMessage, socket: Duplex, head: Buffer) => void;
}>;
export function getUpgradeHandlers(app: Express): UpgradeHandlers {
return app.get("upgradeHandlers");
}

export function getWebsocketMiddlewares(
app: Express
): Middleware<express.Request, express.Response>[] {
Expand Down Expand Up @@ -83,6 +97,16 @@ export async function makeApp({
*/
app.set("shutdownActions", shutdownActions);

/*
* Since multiple things in our server might want to handle websockets and
* this is handled in node via the 'upgrade' event which should have one handler
* only, we need a centralised location to listen for upgrade events that then
* decides which handler to dispatch the event to. This array stores these
* handlers.
*/
const upgradeHandlers: UpgradeHandlers = [];
app.set("upgradeHandlers", upgradeHandlers);

/*
* When we're using websockets, we may want them to have access to
* sessions/etc for authentication.
Expand Down
44 changes: 42 additions & 2 deletions @app/server/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#!/usr/bin/env node
/* eslint-disable no-console */
import { createServer } from "http";
import { createServer, IncomingMessage } from "http";
import { Duplex } from "stream";

import { getShutdownActions, makeApp } from "./app";
import { getShutdownActions, getUpgradeHandlers, makeApp } from "./app";

// @ts-ignore
const packageJson = require("../../../package.json");

const isDev = process.env.NODE_ENV === "development";

async function main() {
const { default: chalk } = await import("chalk");

Expand All @@ -19,6 +22,41 @@ async function main() {
// Add our application to our HTTP server
httpServer.addListener("request", app);

const upgradeHandlers = getUpgradeHandlers(app);
async function handleUpgrade(
req: IncomingMessage,
socket: Duplex,
head: Buffer
) {
if (isDev && httpServer.listeners("upgrade").length > 1) {
console.error(httpServer.listeners("upgrade").map((f) => f.toString()));
throw new Error(`ERROR: more than one upgrade listener!`);
}
try {
for (const upgradeHandler of upgradeHandlers) {
if (await upgradeHandler.check(req, socket, head)) {
upgradeHandler.upgrade(req, socket, head);
return;
}
}
// No handler matched:
socket.destroy();
} catch (e) {
console.error(
`Error occurred whilst trying to handle 'upgrade' event:`,
e
);
socket.destroy();
}
}

if (upgradeHandlers.length > 0) {
if (isDev && httpServer.listeners("upgrade").length > 0) {
throw new Error(`ERROR: we already have an upgrade listener!`);
}
httpServer.addListener("upgrade", handleUpgrade);
}

// And finally, we open the listen port
const PORT = parseInt(process.env.PORT || "", 10) || 3000;
httpServer.listen(PORT, () => {
Expand Down Expand Up @@ -52,6 +90,8 @@ async function main() {
// Nodemon SIGUSR2 handling
const shutdownActions = getShutdownActions(app);
shutdownActions.push(() => {
httpServer.removeListener("request", app);
httpServer.removeListener("upgrade", handleUpgrade);
httpServer.close();
});
}
Expand Down
34 changes: 30 additions & 4 deletions @app/server/src/middleware/installPostGraphile.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
import { Express, Request, Response } from "express";
import { createServer } from "http";
import { enhanceHttpServerWithSubscriptions, postgraphile } from "postgraphile";

import { getHttpServer, getWebsocketMiddlewares } from "../app";
import {
getHttpServer,
getUpgradeHandlers,
getWebsocketMiddlewares,
} from "../app";
import { getPostGraphileOptions } from "../graphile.config";
import { getAuthPgPool, getRootPgPool } from "./installDatabasePools";

export default function installPostGraphile(app: Express) {
export default async function installPostGraphile(app: Express) {
const websocketMiddlewares = getWebsocketMiddlewares(app);
const authPgPool = getAuthPgPool(app);
const rootPgPool = getRootPgPool(app);
const httpServer = getHttpServer(app);
// Forbid PostGraphile from adding websocket listeners to httpServer
(httpServer as any)["__postgraphileSubscriptionsEnabled"] = true;
const middleware = postgraphile<Request, Response>(
authPgPool,
"app_public",
Expand All @@ -23,7 +30,26 @@ export default function installPostGraphile(app: Express) {

app.use(middleware);

if (httpServer) {
enhanceHttpServerWithSubscriptions(httpServer, middleware);
// Extract the upgrade handler from PostGraphile so we can mix it with
// other upgrade handlers.
const fakeHttpServer = createServer();
await enhanceHttpServerWithSubscriptions(fakeHttpServer, middleware);
const postgraphileUpgradeHandler = fakeHttpServer.listeners(
"upgrade"
)[0] as any;
// Prevent PostGraphile registering its websocket handler

// Now handle websockets
if (postgraphileUpgradeHandler) {
const upgradeHandlers = getUpgradeHandlers(app);
upgradeHandlers.push({
name: "PostGraphile",
check(req) {
return (
(req.url === "/graphql" || req.url?.startsWith("/graphql?")) ?? false
);
},
upgrade: postgraphileUpgradeHandler,
});
}
}
33 changes: 29 additions & 4 deletions @app/server/src/middleware/installSSR.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
// TODO: fix to 'import next' when next fixes the bug
import { Express } from "express";
import { createServer } from "http";
import next from "next";
import { parse } from "url";

import { getUpgradeHandlers } from "../app";

if (!process.env.NODE_ENV) {
throw new Error("No NODE_ENV envvar! Try `export NODE_ENV=development`");
}

const isDev = process.env.NODE_ENV === "development";

export default async function installSSR(app: Express) {
// @ts-ignore Next had a bad typing file, they claim `export default` but should have `export =`
// Ref: https://unpkg.com/[email protected]/dist/server/next.js
const fakeHttpServer = createServer();
const nextApp = next({
dev: isDev,
dir: `${__dirname}/../../../client/src`,
quiet: !isDev,
// Don't specify 'conf' key

// Trick Next.js into adding its upgrade handler here, so we can extract
// it. Calling `getUpgradeHandler()` is insufficient because that doesn't
// handle the assets.
httpServer: fakeHttpServer,
});
const handlerPromise = (async () => {
await nextApp.prepare();
return nextApp.getRequestHandler();
})();
// Foo
handlerPromise.catch((e) => {
console.error("Error occurred starting Next.js; aborting process");
console.error(e);
Expand All @@ -42,4 +47,24 @@ export default async function installSSR(app: Express) {
},
});
});

// Now handle websockets
if (!(nextApp as any).getServer) {
console.warn(
`Our Next.js workaround for getting the upgrade handler without giving Next.js dominion over all websockets might no longer work - nextApp.getServer (private API) is no more.`
);
} else {
await (nextApp as any).getServer();
}
const nextJsUpgradeHandler = fakeHttpServer.listeners("upgrade")[0] as any;
if (nextJsUpgradeHandler) {
const upgradeHandlers = getUpgradeHandlers(app);
upgradeHandlers.push({
name: "Next.js",
check(req) {
return req.url?.includes("/_next/") ?? false;
},
upgrade: nextJsUpgradeHandler,
});
}
}

0 comments on commit 8b6d6e5

Please sign in to comment.