Commit

abstracted streaming logic in helper function
Nutlope committed Jan 23, 2023
1 parent 733d66b commit e696fd4
Showing 2 changed files with 76 additions and 57 deletions.
60 changes: 3 additions & 57 deletions pages/api/generate.ts
@@ -1,9 +1,5 @@
import type { NextRequest } from "next/server";
import {
createParser,
ParsedEvent,
ReconnectInterval,
} from "eventsource-parser";
import { OpenAIStream, OpenAIStreamPayload } from "../../utils/OpenAIStream";

if (!process.env.OPENAI_API_KEY) {
throw new Error("Missing env var from OpenAI");
@@ -22,10 +18,7 @@ const handler = async (req: NextRequest): Promise<Response> => {
return new Response("No prompt in the request", { status: 400 });
}

const encoder = new TextEncoder();
const decoder = new TextDecoder();

const payload = {
const payload: OpenAIStreamPayload = {
model: "text-davinci-003",
prompt,
temperature: 0.7,
@@ -37,54 +30,7 @@
n: 1,
};

const res = await fetch("https://api.openai.com/v1/completions", {
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${process.env.OPENAI_API_KEY ?? ""}`,
},
method: "POST",
body: JSON.stringify(payload),
});

let counter = 0;
const stream = new ReadableStream({
async start(controller) {
// callback
function onParse(event: ParsedEvent | ReconnectInterval) {
if (event.type === "event") {
const data = event.data;
// https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
if (data === "[DONE]") {
controller.close();
return;
}
try {
const json = JSON.parse(data);
const text = json.choices[0].text;
if (counter < 2 && (text.match(/\n/) || []).length) {
// this is a prefix character (i.e., "\n\n"), do nothing
return;
}
const queue = encoder.encode(text);
controller.enqueue(queue);
counter++;
} catch (e) {
// maybe parse error
controller.error(e);
}
}
}

// stream response (SSE) from OpenAI may be fragmented into multiple chunks
// this ensures we properly read chunks and invoke an event for each SSE event stream
const parser = createParser(onParse);
// https://web.dev/streams/#asynchronous-iteration
for await (const chunk of res.body as any) {
parser.feed(decoder.decode(chunk));
}
},
});

const stream = await OpenAIStream(payload);
return new Response(stream);
};

73 changes: 73 additions & 0 deletions utils/OpenAIStream.ts
@@ -0,0 +1,73 @@
import {
createParser,
ParsedEvent,
ReconnectInterval,
} from "eventsource-parser";

export interface OpenAIStreamPayload {
model: string;
prompt: string;
temperature: number;
top_p: number;
frequency_penalty: number;
presence_penalty: number;
max_tokens: number;
stream: boolean;
n: number;
}

export async function OpenAIStream(payload: OpenAIStreamPayload) {
const encoder = new TextEncoder();
const decoder = new TextDecoder();

let counter = 0;

const res = await fetch("https://api.openai.com/v1/completions", {
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${process.env.OPENAI_API_KEY ?? ""}`,
},
method: "POST",
body: JSON.stringify(payload),
});

const stream = new ReadableStream({
async start(controller) {
// callback
function onParse(event: ParsedEvent | ReconnectInterval) {
if (event.type === "event") {
const data = event.data;
// https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
if (data === "[DONE]") {
controller.close();
return;
}
try {
const json = JSON.parse(data);
const text = json.choices[0].text;
if (counter < 2 && (text.match(/\n/) || []).length) {
// this is a prefix character (i.e., "\n\n"), do nothing
return;
}
const queue = encoder.encode(text);
controller.enqueue(queue);
counter++;
} catch (e) {
// maybe parse error
controller.error(e);
}
}
}

// stream response (SSE) from OpenAI may be fragmented into multiple chunks
// this ensures we properly read chunks and invoke an event for each SSE event stream
const parser = createParser(onParse);
// https://web.dev/streams/#asynchronous-iteration
for await (const chunk of res.body as any) {
parser.feed(decoder.decode(chunk));
}
},
});

return stream;
}
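
A minimal usage sketch of the new helper, mirroring how the updated pages/api/generate.ts calls it above. The import path, the prompt string, and the numeric payload values other than temperature are placeholders, and the reader loop at the end is a hypothetical consumer, not part of this commit.

// Sketch: calling OpenAIStream from an edge API route and reading the result.
import { OpenAIStream, OpenAIStreamPayload } from "../../utils/OpenAIStream";

const payload: OpenAIStreamPayload = {
  model: "text-davinci-003",
  prompt: "Write a short Twitter bio for a software engineer.", // placeholder prompt
  temperature: 0.7,
  top_p: 1, // placeholder values for the remaining fields
  frequency_penalty: 0,
  presence_penalty: 0,
  max_tokens: 200,
  stream: true,
  n: 1,
};

// The helper performs the fetch to the completions endpoint, parses the SSE
// events, and returns a ReadableStream of encoded text chunks. An edge route
// can hand that stream directly to a Response, as generate.ts does:
//   const stream = await OpenAIStream(payload);
//   return new Response(stream);

// Hypothetical consumer: reading the stream incrementally with a reader.
const stream = await OpenAIStream(payload);
const reader = stream.getReader();
const decoder = new TextDecoder();
let done = false;
while (!done) {
  const { value, done: doneReading } = await reader.read();
  done = doneReading;
  if (value) console.log(decoder.decode(value));
}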
