Rumah >hujung hadapan web >tutorial js >Menstrim Respons HTTP menggunakan fetch

Menstrim Respons HTTP menggunakan fetch

王林
王林asal
2024-07-24 11:35:41592semak imbas

Streaming HTTP Responses using fetch

Siaran ini akan melihat cara bekerja dengan JavaScript Streams API yang membolehkan membuat panggilan HTTP ambil dan menerima respons penstriman dalam ketulan, yang membolehkan pelanggan mula membalas respons pelayan dengan lebih banyak lagi cepat dan bina UI seperti ChatGPT.

Sebagai contoh yang memotivasikan, kami akan melaksanakan fungsi untuk mengendalikan respons LLM penstriman daripada OpenAI (atau mana-mana pelayan yang menggunakan API penstriman http yang sama), tidak menggunakan kebergantungan npm—hanya pengambilan terbina dalam. Kod penuh ada di sini termasuk percubaan semula dengan penyingkiran eksponen, pembenaman, sembang bukan penstriman dan API yang lebih mudah untuk berinteraksi dengan pelengkapan sembang dan pembenaman.

Jika anda berminat untuk melihat cara untuk mengembalikan strim HTTP kepada pelanggan, lihat siaran ini.

Kod contoh penuh

Berikut ialah contoh penuh. Kami akan melihat setiap bahagian di bawah:

async function createChatCompletion(body: ChatCompletionCreateParams) {
  // Making the request
  const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
  const response = await fetch(baseUrl + "/v1/chat/completions", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Authorization": "Bearer " + process.env.LLM_API_KEY,
    },
    body: JSON.stringify(body),
  });
  // Handling errors
  if (!response.ok) {
    const error = await response.text();
    throw new Error(`Failed (${response.status}): ${error}`,
  }
  if (!body.stream) { // the non-streaming case
    return response.json();
  }
  const stream = response.body;
  if (!stream) throw new Error("No body in response");
  // Returning an async iterator
  return {
    [Symbol.asyncIterator]: async function* () {
      for await (const data of splitStream(stream)) {
        // Handling the OpenAI HTTP streaming protocol
        if (data.startsWith("data:")) {
          const json = data.substring("data:".length).trimStart();
          if (json.startsWith("[DONE]")) {
            return;
          }
          yield JSON.parse(json);
        }
      }
    },
  };
}

// Reading the stream  
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment += data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i < parts.length - 1; i += 1) {
        yield parts[i];
      }
      // Save the last part as the new last fragment
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}

Lihat kod di sini untuk versi yang mempunyai kelebihan taip yang bagus untuk varian parameter penstriman & bukan penstriman, bersama-sama dengan percubaan semula dan peningkatan lain.

Siaran selebihnya adalah tentang memahami perkara yang dilakukan oleh kod ini.

Membuat permintaan

Bahagian ini sebenarnya sangat mudah. Respons HTTP penstriman datang daripada permintaan HTTP biasa:

const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
const response = await fetch(baseUrl + "/v1/chat/completions", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "Authorization": "Bearer " + process.env.LLM_API_KEY,
  },
  body: JSON.stringify(body),
});

Pengepala HTTP dihantar seperti biasa, dan tidak perlu menetapkan apa-apa khususnya untuk mendayakan penstriman. Dan anda masih boleh memanfaatkan pengepala caching biasa untuk penstriman HTTP.

Mengendalikan ralat

Cerita tentang ralat pada bahagian klien agak malang untuk penstriman HTTP. Kelebihannya ialah untuk penstriman HTTP, pelanggan mendapat kod status serta-merta dalam respons awal dan boleh mengesan kegagalan di sana. Kelemahan protokol http ialah jika pelayan mengembalikan kejayaan tetapi kemudian memecahkan pertengahan strim, tidak ada apa-apa pada tahap protokol yang akan memberitahu klien bahawa strim telah terganggu. Kita akan lihat di bawah cara OpenAI mengekodkan sentinel "selesai" pada penghujungnya untuk menangani perkara ini.

if (!response.ok) {
  const error = await response.text();
  throw new Error(`Failed (${response.status}): ${error}`,
}

Membaca aliran

Untuk membaca respons penstriman HTTP, pelanggan boleh menggunakan sifat response.body yang merupakan ReadableStream yang membolehkan anda mengulangi bahagian apabila ia masuk dari pelayan menggunakan kaedah .getReader(). 1

const reader = request.body.getReader();
try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      const text = TextDecoder().decode(value);
      //... do something with the chunk
    }
} finally {
  reader.releaseLock();
}

Ini mengendalikan setiap bit data yang kami dapat semula, tetapi untuk protokol HTTP OpenAI kami menjangkakan data tersebut akan JSON dipisahkan oleh baris baharu, jadi sebaliknya kami akan membahagikan badan tindak balas dan "menghasilkan" setiap baris kerana ia' selesai semula. Kami menampan baris dalam proses menjadi lastFragment dan hanya mengembalikan baris penuh yang telah dipisahkan oleh dua baris baharu:

// stream here is request.body
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment += data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i < parts.length - 1; i += 1) {
        yield parts[i];
      }
      // Save the last part as the new last fragment
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}

Jika fungsi* dan sintaks hasil ini tidak anda kenali, cuma layan fungsi* sebagai fungsi yang boleh mengembalikan berbilang perkara dalam satu gelung dan hasil sebagai cara mengembalikan sesuatu berbilang kali daripada fungsi.

Anda kemudiannya boleh mengulangi fungsi splitStream ini seperti:

for await (const data of splitStream(response.body)) {
  // data here is a full line of text. For OpenAI, it might look like
  // "data: {...some json object...}" or "data: [DONE]" at the end
}

Jika sintaks "untuk menunggu" ini tidak membantu anda, ia menggunakan apa yang dipanggil "peulang async" - seperti lelaran biasa yang anda akan gunakan dengan gelung for, tetapi setiap kali ia mendapat nilai seterusnya, ia ditunggu-tunggu.

Untuk contoh kami, apabila kami mendapat beberapa teks daripada OpenAI dan kami menunggu lebih banyak lagi, gelung for akan menunggu sehingga splitStream menghasilkan nilai lain, yang akan berlaku apabila menunggu reader.read() mengembalikan nilai yang selesai satu atau lebih baris teks.

Seterusnya kita akan melihat cara lain untuk mengembalikan iterator async yang bukan fungsi seperti splitStream, jadi pemanggil boleh menggunakan gelung "untuk menunggu" untuk mengulangi data ini.

Mengembalikan lelaran async

Sekarang kami mempunyai iterator async yang mengembalikan baris teks penuh, kami hanya boleh mengembalikan splitStream(response.body), tetapi kami mahu memintas setiap baris dan mengubahnya, sambil masih membenarkan pemanggil fungsi kami untuk melelaran .

Pendekatannya serupa dengan sintaks fungsi async* di atas. Di sini kami akan mengembalikan iterator async secara langsung, bukannya fungsi async yang mengembalikannya apabila ia dipanggil. Perbezaannya ialah jenis AsyncIterator dan bukannya AsyncGenerator yang perlu dipanggil terlebih dahulu. AsyncIterator boleh ditakrifkan dengan mempunyai fungsi bernama tertentu: Symbol.asyncIterator.2

      return {
        [Symbol.asyncIterator]: async function* () {
          for await (const data of splitStream(stream)) {
            //handle the data
            yield data;
          }
        },
      };

Ini berguna apabila anda ingin mengembalikan sesuatu yang berbeza daripada data yang datang daripada splitStream. Setiap kali baris baharu masuk daripada permintaan HTTP penstriman, splitStream akan menghasilkannya, fungsi ini akan menerimanya dalam data dan boleh melakukan sesuatu sebelum menyerahkannya kepada pemanggilnya.

Seterusnya kita akan melihat cara mentafsir data ini secara khusus dalam kes API penyiapan sembang penstriman OpenAI.

Handling the OpenAI HTTP streaming protocol

The OpenAI response protocol is a series of lines that start with data: or event:, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE] if the stream is done, otherwise it’s just JSON.

for await (const data of splitStream(stream)) {
  if (data.startsWith("data:")) {
    const json = data.substring("data:".length).trimStart();
    if (json.startsWith("[DONE]")) {
      return;
    }
    yield JSON.parse(json);
  } else {
    console.debug("Unexpected data:", data);
  }
}

Bringing it all together

Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on sdks or libraries. This allows you to hide latency, as your UI can immediately start updating, without consuming more bandwidth with multiple requests. You can use the above function like you would with the official openai npm package:

  const response = await createChatCompletion({
    model: "llama3",
    messages: [...your messages...],
    stream: true,
  });
  for await (const chunk of response) {
    if (chunk.choices[0].delta?.content) {
      console.log(chunk.choices[0].delta.content);
    }
  }

See the code here that also lets you make some utility functions to make this even easier by pre-configuring the model and extracting the .choices[0].delta.content:

const response = await chatStream(messages);
for await (const content of response) {
  console.log(content);
}

Before you copy the code, try to implement it yourself as an exercise in async functions.

More resources

  • For information about returning HTTP streaming data from your own server endpoint, check out this post on AI Chat with HTTP Streaming that both streams data from OpenAI (or similar) to your server and simultaneously streams it down to a client, while doing custom logic as it goes (such as saving chunks to a database).
  • The MDN docs, as always, are great. Beyond the links above, here’s a guide on the readable streams API that shows how to connect a readable stream to an tag to stream in an image request. Note: this guide uses response.body as an async iterator, but currently that is not widely implemented and not in the TypeScript types.
    1. Note: you can only have one reader of the stream at a time, so you generally don’t call .getReader() multiple times - you probabaly want .tee() in that case, and if you want to use .getReader() multiple times for some reason, make sure to have the first .releaseLock() first. ↩

    2. Or alternatively you can If you aren’t familiar with Symbol, it’s used in a way to have keys in an object that aren’t strings or numbers. That way they don’t conflict if you added a key named asyncIterator. You could access the function with myIterator[Symbol.asyncIterator](). ↩

    Atas ialah kandungan terperinci Menstrim Respons HTTP menggunakan fetch. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn