This post looks at using the JavaScript Streams API, which lets a fetch HTTP call receive a streaming response in chunks. This allows the client to start reacting to the server's response as it arrives, which is how you can quickly build a UI like ChatGPT's.
As a motivating example, we'll implement a function to handle streaming LLM responses from OpenAI (or any server that uses the same HTTP streaming API), using no npm dependencies, just the built-in fetch. The full code is here, including retries with exponential backoff, embeddings, non-streaming chat, and simpler APIs for interacting with chat completions and embeddings.
If you're interested in how to return an HTTP stream to a client, check out this post.
Here's the full example. We'll walk through each part below:
```typescript
async function createChatCompletion(body: ChatCompletionCreateParams) {
  // Making the request
  const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
  const response = await fetch(baseUrl + "/v1/chat/completions", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Authorization": "Bearer " + process.env.LLM_API_KEY,
    },
    body: JSON.stringify(body),
  });

  // Handling errors
  if (!response.ok) {
    const error = await response.text();
    throw new Error(`Failed (${response.status}): ${error}`);
  }
  if (!body.stream) {
    // the non-streaming case
    return response.json();
  }
  const stream = response.body;
  if (!stream) throw new Error("No body in response");

  // Returning an async iterator
  return {
    [Symbol.asyncIterator]: async function* () {
      for await (const data of splitStream(stream)) {
        // Handling the OpenAI HTTP streaming protocol
        if (data.startsWith("data:")) {
          const json = data.substring("data:".length).trimStart();
          if (json.startsWith("[DONE]")) {
            return;
          }
          yield JSON.parse(json);
        }
      }
    },
  };
}

// Reading the stream
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment += data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i < parts.length - 1; i += 1) {
        yield parts[i];
      }
      // Save the last part as the new last fragment
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}
```
See the code here for a version with nice typed overloads for the streaming and non-streaming parameter variants, plus retries and other improvements.
The rest of the post is about understanding what this code does.
This part is actually easy. A streaming HTTP response comes from a normal HTTP request:
```typescript
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
const response = await fetch(baseUrl + "/v1/chat/completions", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "Authorization": "Bearer " + process.env.LLM_API_KEY,
  },
  body: JSON.stringify(body),
});
```
The HTTP headers are sent as usual, and nothing in particular needs to be set to enable streaming. You can still take advantage of regular caching headers for HTTP streaming.
The story around client-side errors is a bit unfortunate for HTTP streaming. The upside is that with HTTP streaming, the client gets status codes immediately in the initial response and can detect failure there. The downside of the HTTP protocol is that if the server returns success but then breaks mid-stream, there's nothing at the protocol level telling the client the stream was interrupted. We'll see below how OpenAI works around this by encoding an "all done" sentinel at the end.
```typescript
if (!response.ok) {
  const error = await response.text();
  throw new Error(`Failed (${response.status}): ${error}`);
}
```
To read an HTTP streaming response, the client can use the response.body property, a ReadableStream that lets you iterate over the chunks as they arrive from the server using the .getReader() method.1
```typescript
const reader = response.body.getReader();
try {
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    const text = new TextDecoder().decode(value);
    // ... do something with the chunk
  }
} finally {
  reader.releaseLock();
}
```
This handles every bit of data we get back, but for the OpenAI HTTP protocol we expect the data to be newline-separated JSON, so instead we'll split up the response body and yield each line as it completes. We buffer the in-progress line in lastFragment and only return complete lines that have been separated by two newlines:
```typescript
// stream here is response.body
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment += data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i < parts.length - 1; i += 1) {
        yield parts[i];
      }
      // Save the last part as the new last fragment
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}
```
If you're not familiar with the function* and yield syntax, just think of function* as a function that can return multiple things in a loop, and yield as the way to return those things multiple times from the function.
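As a minimal sketch (not part of the original example), here's a generator that yields a few values; the caller receives each one in turn as it iterates:

```typescript
// Minimal sketch: a generator function "returns" multiple values,
// one per `yield`, to whoever iterates over it.
function* countTo(limit: number) {
  for (let i = 1; i <= limit; i += 1) {
    yield i;
  }
}

const collected = [...countTo(3)];
console.log(collected); // logs [ 1, 2, 3 ]
```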
You can then loop over this splitStream function like:
```typescript
for await (const data of splitStream(response.body)) {
  // data here is a full line of text. For OpenAI, it might look like
  // "data: {...some json object...}" or "data: [DONE]" at the end
}
```
If this for await syntax is throwing you off, it's using what's called an "async iterator": just like a regular iterator you'd use with a for loop, except every time it gets the next value, it's awaited.
For our example, when we've gotten some text from OpenAI and are waiting for more, the for loop will wait until splitStream yields another value, which happens when await reader.read() returns a value that completes one or more lines of text.
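For a self-contained illustration of this waiting behavior (a hypothetical sketch, not from the original code), here's an async generator whose values arrive over time, consumed with for await:

```typescript
// Hypothetical sketch: an async generator that produces values with a
// small delay; `for await` waits for each one before the loop body runs.
async function* delayedWords() {
  for (const word of ["hello", "streaming", "world"]) {
    await new Promise((resolve) => setTimeout(resolve, 10));
    yield word;
  }
}

async function main() {
  for await (const word of delayedWords()) {
    console.log(word); // logs "hello", then "streaming", then "world"
  }
}

main();
```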
Next we'll look at another way to return an async iterator that isn't a function like splitStream, so a caller can still use a "for await" loop to iterate over the data.
Now that we have an async iterator that returns full lines of text, we could just return splitStream(response.body), but we want to intercept each line and transform it, while still letting the caller of our function iterate.
The approach is similar to the async function* syntax above. Here we'll return an async iterator directly, instead of an async function that returns one when it's called. The difference is the type is AsyncIterator instead of an AsyncGenerator that needs to be called first. An AsyncIterator can be defined by having a certain specifically named function: Symbol.asyncIterator.2
```typescript
return {
  [Symbol.asyncIterator]: async function* () {
    for await (const data of splitStream(stream)) {
      // handle the data
      yield data;
    }
  },
};
```
This is useful when you want to return something different from splitStream's data. Every time a new line comes in from the streaming HTTP request, splitStream yields it, this function receives it in data, and it can do something before yielding it to its caller.
Next we'll look at how to interpret this data specifically in the case of OpenAI's streaming chat completion API.
The OpenAI response protocol is a series of lines that start with data: or event:, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE] if the stream is done, otherwise it’s just JSON.
```typescript
for await (const data of splitStream(stream)) {
  if (data.startsWith("data:")) {
    const json = data.substring("data:".length).trimStart();
    if (json.startsWith("[DONE]")) {
      return;
    }
    yield JSON.parse(json);
  } else {
    console.debug("Unexpected data:", data);
  }
}
```
Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on SDKs or libraries. This lets you hide latency, since your UI can start updating immediately, without consuming more bandwidth with multiple requests. You can use the above function like you would with the official openai npm package:
```typescript
const response = await createChatCompletion({
  model: "llama3",
  messages: [...your messages...],
  stream: true,
});

for await (const chunk of response) {
  if (chunk.choices[0].delta?.content) {
    console.log(chunk.choices[0].delta.content);
  }
}
```
See the code here, which also includes some utility functions to make this even easier by pre-configuring the model and extracting the .choices[0].delta.content:
```typescript
const response = await chatStream(messages);
for await (const content of response) {
  console.log(content);
}
```
Before you copy the code, try to implement it yourself as an exercise in async functions.
Note: you can only have one reader of the stream at a time, so you generally don't call .getReader() multiple times; you probably want .tee() in that case. And if you want to use .getReader() multiple times for some reason, make sure the first reader calls .releaseLock() first. ↩
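To illustrate that footnote, here's a sketch of .tee() (an assumption on my part that ReadableStream is available globally, as it is in modern browsers and Node 18+); it splits one stream into two branches that can each have their own reader:

```typescript
// Sketch: tee() returns two ReadableStreams that each replay the same
// chunks, so two readers can consume the data independently.
const source = new ReadableStream<string>({
  start(controller) {
    controller.enqueue("chunk");
    controller.close();
  },
});

const [branchA, branchB] = source.tee();

async function readAll(stream: ReadableStream<string>) {
  const reader = stream.getReader();
  const chunks: string[] = [];
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      chunks.push(value);
    }
  } finally {
    reader.releaseLock();
  }
  return chunks;
}
```

Both readAll(branchA) and readAll(branchB) would then yield the same "chunk", even though the original source could only have been read once.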
If you aren't familiar with Symbol, it's a way to have keys in an object that aren't strings or numbers. That way they don't conflict if you added a key named asyncIterator. You could access the function with myIterator[Symbol.asyncIterator](). ↩