Skip to content

Commit

Permalink
Use TransformStream and return SSE stream
Browse files Browse the repository at this point in the history
  • Loading branch information
smaeda-ks committed Mar 30, 2023
1 parent e799871 commit 17ae6ce
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 28 deletions.
14 changes: 13 additions & 1 deletion pages/api/generate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,19 @@ const handler = async (req: Request): Promise<Response> => {
};

const stream = await OpenAIStream(payload);
return new Response(stream);
// return stream response (SSE)
return new Response(
stream, {
headers: new Headers({
// since we don't use browser's EventSource interface, specifying content-type is optional.
// the eventsource-parser library can handle the stream response as SSE, as long as the data format complies with SSE:
// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#sending_events_from_the_server

// 'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
})
}
);
};

export default handler;
22 changes: 20 additions & 2 deletions pages/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import Footer from "../components/Footer";
import Github from "../components/GitHub";
import Header from "../components/Header";
import LoadingDots from "../components/LoadingDots";
import {
createParser,
ParsedEvent,
ReconnectInterval,
} from "eventsource-parser";

const Home: NextPage = () => {
const [loading, setLoading] = useState(false);
Expand Down Expand Up @@ -56,15 +61,28 @@ const Home: NextPage = () => {
return;
}

const onParse = (event: ParsedEvent | ReconnectInterval) => {
if (event.type === "event") {
const data = event.data;
try {
const text = JSON.parse(data).text ?? ""
setGeneratedBios((prev) => prev + text);
} catch (e) {
console.error(e);
}
}
}

// https://web.dev/streams/#the-getreader-and-read-methods
const reader = data.getReader();
const decoder = new TextDecoder();
const parser = createParser(onParse);
let done = false;

while (!done) {
const { value, done: doneReading } = await reader.read();
done = doneReading;
const chunkValue = decoder.decode(value);
setGeneratedBios((prev) => prev + chunkValue);
parser.feed(chunkValue);
}
scrollToBios();
setLoading(false);
Expand Down
59 changes: 34 additions & 25 deletions utils/OpenAIStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ export async function OpenAIStream(payload: OpenAIStreamPayload) {
const encoder = new TextEncoder();
const decoder = new TextDecoder();

let counter = 0;

const res = await fetch("https://api.openai.com/v1/chat/completions", {
headers: {
"Content-Type": "application/json",
Expand All @@ -38,34 +36,15 @@ export async function OpenAIStream(payload: OpenAIStreamPayload) {
body: JSON.stringify(payload),
});

const stream = new ReadableStream({
const readableStream = new ReadableStream({
async start(controller) {
// callback
function onParse(event: ParsedEvent | ReconnectInterval) {
const onParse = (event: ParsedEvent | ReconnectInterval) => {
if (event.type === "event") {
const data = event.data;
// https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
if (data === "[DONE]") {
controller.close();
return;
}
try {
const json = JSON.parse(data);
const text = json.choices[0].delta?.content || "";
if (counter < 2 && (text.match(/\n/) || []).length) {
// this is a prefix character (i.e., "\n\n"), do nothing
return;
}
const queue = encoder.encode(text);
controller.enqueue(queue);
counter++;
} catch (e) {
// maybe parse error
controller.error(e);
}
controller.enqueue(encoder.encode(data));
}
}

// stream response (SSE) from OpenAI may be fragmented into multiple chunks
// this ensures we properly read chunks and invoke an event for each SSE event stream
const parser = createParser(onParse);
Expand All @@ -76,5 +55,35 @@ export async function OpenAIStream(payload: OpenAIStreamPayload) {
},
});

return stream;
let counter = 0;
const transformStream = new TransformStream({
async transform(chunk, controller) {
const data = decoder.decode(chunk);
// https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
if (data === "[DONE]") {
controller.terminate();
return;
}
try {
const json = JSON.parse(data);
const text = json.choices[0].delta?.content || "";
if (counter < 2 && (text.match(/\n/) || []).length) {
// this is a prefix character (i.e., "\n\n"), do nothing
return;
}
// stream transformed JSON resposne as SSE
const payload = {text: text};
// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
controller.enqueue(
encoder.encode(`data: ${JSON.stringify(payload)}\n\n`)
);
counter++;
} catch (e) {
// maybe parse error
controller.error(e);
}
},
});

return readableStream.pipeThrough(transformStream);
}

0 comments on commit 17ae6ce

Please sign in to comment.