>  기사  >  웹 프론트엔드  >  가져오기를 사용하여 HTTP 응답 스트리밍

가져오기를 사용하여 HTTP 응답 스트리밍

王林
王林원래의
2024-07-24 11:35:41528검색

Streaming HTTP Responses using fetch

이 게시물에서는 HTTP 가져오기 호출을 수행하고 스트리밍 응답을 청크 단위로 수신할 수 있는 JavaScript Streams API 작업을 살펴보겠습니다. 이를 통해 클라이언트는 서버 응답에 더 많이 응답할 수 있습니다. 신속하게 ChatGPT와 같은 UI를 구축하세요.

동기 부여의 예로, npm 종속성을 사용하지 않고 내장된 가져오기만 사용하여 OpenAI(또는 동일한 http 스트리밍 API를 사용하는 모든 서버)의 스트리밍 LLM 응답을 처리하는 기능을 구현하겠습니다. 지수 백오프를 사용한 재시도, 임베딩, 비스트리밍 채팅, 채팅 완료 및 임베딩과 상호작용하기 위한 더 간단한 API를 포함한 전체 코드가 여기에 있습니다.

HTTP 스트림을 클라이언트에 반환하는 방법에 관심이 있다면 이 게시물을 확인하세요.

전체 예제 코드

전체 예시는 다음과 같습니다. 아래에서 각 부분을 살펴보겠습니다.

async function createChatCompletion(body: ChatCompletionCreateParams) {
  // Making the request
  const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
  const response = await fetch(baseUrl + "/v1/chat/completions", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Authorization": "Bearer " + process.env.LLM_API_KEY,
    },
    body: JSON.stringify(body),
  });
  // Handling errors
  if (!response.ok) {
    const error = await response.text();
    throw new Error(`Failed (${response.status}): ${error}`,
  }
  if (!body.stream) { // the non-streaming case
    return response.json();
  }
  const stream = response.body;
  if (!stream) throw new Error("No body in response");
  // Returning an async iterator
  return {
    [Symbol.asyncIterator]: async function* () {
      for await (const data of splitStream(stream)) {
        // Handling the OpenAI HTTP streaming protocol
        if (data.startsWith("data:")) {
          const json = data.substring("data:".length).trimStart();
          if (json.startsWith("[DONE]")) {
            return;
          }
          yield JSON.parse(json);
        }
      }
    },
  };
}

// Reading the stream  
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment += data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i < parts.length - 1; i += 1) {
        yield parts[i];
      }
      // Save the last part as the new last fragment
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}

재시도 및 기타 개선 사항과 함께 스트리밍 및 비스트리밍 매개변수 변형에 대한 적절한 형식의 오버로드가 포함된 버전은 여기에서 코드를 참조하세요.

이 게시물의 나머지 부분은 이 코드의 기능을 이해하는 것입니다.

요청하기

이 부분은 사실 매우 쉽습니다. 스트리밍 HTTP 응답은 일반 HTTP 요청에서 나옵니다.

const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
const response = await fetch(baseUrl + "/v1/chat/completions", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "Authorization": "Bearer " + process.env.LLM_API_KEY,
  },
  body: JSON.stringify(body),
});

HTTP 헤더는 평소대로 전송되며 스트리밍을 활성화하기 위해 특별히 아무것도 설정할 필요가 없습니다. 또한 HTTP 스트리밍에 일반 캐싱 헤더를 계속 활용할 수 있습니다.

오류 처리

클라이언트 측의 오류에 관한 이야기는 HTTP 스트리밍의 경우 조금 안타깝습니다. 장점은 HTTP 스트리밍의 경우 클라이언트가 초기 응답에서 즉시 상태 코드를 받고 거기에서 오류를 감지할 수 있다는 것입니다. http 프로토콜의 단점은 서버가 성공을 반환했지만 스트림 중간에 중단된 경우 클라이언트에게 스트림이 중단되었음을 알려주는 프로토콜 수준의 어떤 것도 없다는 것입니다. OpenAI가 이 문제를 해결하기 위해 마지막에 "모두 완료" 센티넬을 인코딩하는 방법을 아래에서 살펴보겠습니다.

if (!response.ok) {
  const error = await response.text();
  throw new Error(`Failed (${response.status}): ${error}`,
}

스트림 읽기

HTTP 스트리밍 응답을 읽기 위해 클라이언트는 .getReader() 메소드를 사용하여 서버에서 들어오는 청크를 반복할 수 있는 ReadableStream인 response.body 속성을 사용할 수 있습니다. 1

const reader = request.body.getReader();
try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      const text = TextDecoder().decode(value);
      //... do something with the chunk
    }
} finally {
  reader.releaseLock();
}

이것은 우리가 반환하는 모든 데이터 비트를 처리하지만 OpenAI HTTP 프로토콜의 경우 데이터가 줄 바꿈으로 구분된 JSON일 것으로 예상하므로 대신 응답 본문을 분할하고 각 줄을 '생성'합니다. 다시 완료되었습니다. 진행 중인 줄을 lastFragment에 버퍼링하고 두 줄 바꿈으로 구분된 전체 줄만 반환합니다.

// stream here is request.body
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment += data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i < parts.length - 1; i += 1) {
        yield parts[i];
      }
      // Save the last part as the new last fragment
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}

이 함수*와 Yield 구문이 익숙하지 않다면 function*을 루프에서 여러 항목을 반환할 수 있는 함수로 취급하고, Yield를 함수에서 여러 번 반환하는 방법으로 취급하세요.

그런 다음 이 SplitStream 함수를 다음과 같이 반복할 수 있습니다.

for await (const data of splitStream(response.body)) {
  // data here is a full line of text. For OpenAI, it might look like
  // "data: {...some json object...}" or "data: [DONE]" at the end
}

이 "for wait" 구문이 문제가 된다면 이는 "비동기 반복자"를 사용하는 것입니다. 이는 for 루프와 함께 사용하는 일반 반복자와 같지만 다음 값을 얻을 때마다 대기됩니다.

예를 들어 OpenAI에서 일부 텍스트를 얻었고 더 많은 것을 기다리고 있는 경우 for 루프는 SplitStream이 다른 값을 생성할 때까지 기다립니다. 이는 wait reader.read()가 완료되는 값을 반환할 때 발생합니다. 한 줄 이상의 텍스트

다음으로 호출자가 "for Wait" 루프를 사용하여 이 데이터를 반복할 수 있도록 분할Stream과 같은 함수가 아닌 비동기 반복자를 반환하는 또 다른 방법을 살펴보겠습니다.

비동기 반복자 반환

이제 전체 텍스트 줄을 반환하는 비동기 반복자가 있으므로, 그냥 SplitStream(response.body)을 반환할 수도 있지만, 함수 호출자가 계속 반복하도록 하면서 각 줄을 가로채서 변환하고 싶습니다. .

접근 방식은 위의 async function* 구문과 유사합니다. 여기서는 호출 시 반환하는 비동기 함수 대신 비동기 반복자를 직접 반환합니다. 차이점은 먼저 호출해야 하는 AsyncGenerator 대신 AsyncIterator 유형이라는 것입니다. AsyncIterator는 특정 이름의 함수(Symbol.asyncIterator.2

)를 사용하여 정의할 수 있습니다.
      return {
        [Symbol.asyncIterator]: async function* () {
          for await (const data of splitStream(stream)) {
            //handle the data
            yield data;
          }
        },
      };

splitStream에서 오는 데이터와 다른 것을 반환하려는 경우에 유용합니다. 스트리밍 HTTP 요청에서 새 라인이 들어올 때마다, SplitStream은 이를 생성하고, 이 함수는 이를 데이터로 수신하고 호출자에게 전달하기 전에 작업을 수행할 수 있습니다.

다음으로 OpenAI의 스트리밍 채팅 완료 API의 경우 이 데이터를 구체적으로 해석하는 방법을 살펴보겠습니다.

Handling the OpenAI HTTP streaming protocol

The OpenAI response protocol is a series of lines that start with data: or event:, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE] if the stream is done, otherwise it’s just JSON.

for await (const data of splitStream(stream)) {
  if (data.startsWith("data:")) {
    const json = data.substring("data:".length).trimStart();
    if (json.startsWith("[DONE]")) {
      return;
    }
    yield JSON.parse(json);
  } else {
    console.debug("Unexpected data:", data);
  }
}

Bringing it all together

Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on sdks or libraries. This allows you to hide latency, as your UI can immediately start updating, without consuming more bandwidth with multiple requests. You can use the above function like you would with the official openai npm package:

  const response = await createChatCompletion({
    model: "llama3",
    messages: [...your messages...],
    stream: true,
  });
  for await (const chunk of response) {
    if (chunk.choices[0].delta?.content) {
      console.log(chunk.choices[0].delta.content);
    }
  }

See the code here that also lets you make some utility functions to make this even easier by pre-configuring the model and extracting the .choices[0].delta.content:

const response = await chatStream(messages);
for await (const content of response) {
  console.log(content);
}

Before you copy the code, try to implement it yourself as an exercise in async functions.

More resources

  • For information about returning HTTP streaming data from your own server endpoint, check out this post on AI Chat with HTTP Streaming that both streams data from OpenAI (or similar) to your server and simultaneously streams it down to a client, while doing custom logic as it goes (such as saving chunks to a database).
  • The MDN docs, as always, are great. Beyond the links above, here’s a guide on the readable streams API that shows how to connect a readable stream to an tag to stream in an image request. Note: this guide uses response.body as an async iterator, but currently that is not widely implemented and not in the TypeScript types.
    1. Note: you can only have one reader of the stream at a time, so you generally don’t call .getReader() multiple times - you probabaly want .tee() in that case, and if you want to use .getReader() multiple times for some reason, make sure to have the first .releaseLock() first. ↩

    2. Or alternatively you can If you aren’t familiar with Symbol, it’s used in a way to have keys in an object that aren’t strings or numbers. That way they don’t conflict if you added a key named asyncIterator. You could access the function with myIterator[Symbol.asyncIterator](). ↩

    위 내용은 가져오기를 사용하여 HTTP 응답 스트리밍의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.