ホームページ >ウェブフロントエンド >jsチュートリアル >Langgraph Human In The Loop (ソケット付き)

Langgraph Human In The Loop (ソケット付き)

DDD
DDDオリジナル
2024-11-30 04:25:12579ブラウズ

Langgraph Human In The Loop with socket

langgraph の interruption 機能を通じて Agent の遂行の途中に human が介入できることが分かった。

しかし、例を見れば、全部の人間の相互作用はひとつになってしまいます。実際にユーザーに確認を受けるにはどうすればよいですか?大きく3つの方法があるようだ。

Langgraph API サーバーの使用

langgraph cli で langgraph API サーバーを docker で実行した後、 langgraph SDK でグラフを実行し、ステートを変更し、再作成することができる。

langgraphが提供するものを提供する方法で使用する必要があります。何らかの設定が多くなり、私のコードと融合するのが難しいかもしれません。

サーバーでグラフを管理する

上記のLanggraph APIサーバーで必要な部分だけを私のカスタムサーバーに実装する方法だ。たとえば、グラフを実行すると、グラフを実行したクライアントとグラフのチェックポイントを保存し、ユーザーの確認後に再度グラフを呼び出して、ユーザーの応答に合わせて状態を変更して再作成する必要があります。

織らなければならないのは隠されているかもしれません。

ソケット接続

Agent実行時にソケットを接続し、ソケットを介してユーザーとインタラクションするのだ。既存の例コードでソケット接続とソケット通信でユーザー確認を受けるステップだけを追加すれば動作する。

の代わりに、文字を入力するように打たれるストリーミングを実装するのは難しいかもしれません。

ソケット接続で実装

一度、できるだけ複雑さを増やさない方向で実装をしてみたくてソケット接続で実装してみた。

サーバーはNestJsを使用し、クライアントはNextJsを使用します。

サーバー

一度Websocket接続用のGatewayを作成します。 agent/start時にコネクションを作成し、すぐにagentを施行するようにした。

核心は簡単だ。 Socketが接続されたらすぐにagentを生成して実行し、実行してinterruptされた場合、Clientに confirmation request messageを送信して待つ。 confirmationがresolveされたら、graphを進めます。


上記のコードで使用したエージェントは、langgraphドキュメントの下のステップ1 2 3を順番に使用するエージェントです。

@WebSocketGateway({
  namespace: "/",
  transport: ["websocket", "polling"],
  path: "/agent/start",
  cors: {
    origin: "*",
    methods: ["GET", "POST"],
    credentials: true,
  },
})
export class AgentGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;
  protected readonly logger = new Logger(this.constructor.name);

  constructor(
    private readonly agentFactory: AgentFactory
  ) {}

  private pendingConfirmations = new Map<string, (response: boolean) => void>();

  // Handle new connections
  handleConnection(client: Socket) {
    console.log(`Client connected: ${client.id}`);

    // Option 1: Get actionData from query parameters
    const actionData: { agent: AgentName } = client.handshake.query.actionData
      ? JSON.parse(client.handshake.query.actionData as string)
      : null;

    if (actionData) {
      this.startAgentProcess(client, actionData);
    } else {
      // If no actionData is provided, you can wait for an event
      client.emit("error", "No action data provided");
      client.disconnect();
    }
  }

  // Handle disconnections
  handleDisconnect(client: Socket) {
    console.log(`Client disconnected: ${client.id}`);
    this.pendingConfirmations.delete(client.id);
  }

  // Send confirmation request
  async sendConfirmationRequest(clientId: string, data: any): Promise<boolean> {
    return new Promise((resolve) => {
      this.pendingConfirmations.set(clientId, resolve);
      this.server.to(clientId).emit("confirmation_request", data);

      // Optional timeout for response
      setTimeout(() => {
        if (this.pendingConfirmations.has(clientId)) {
          this.pendingConfirmations.delete(clientId);
          resolve(false); // Default to 'false' if timeout occurs
        }
      }, 3000000); // 3000 seconds timeout
    });
  }

  // Handle client's confirmation response
  @SubscribeMessage("confirmation_response")
  handleClientResponse(
    @MessageBody() data: { confirmed: boolean },
    @ConnectedSocket() client: Socket
  ) {
    const resolve = this.pendingConfirmations.get(client.id);
    if (resolve) {
      resolve(data.confirmed);
      this.pendingConfirmations.delete(client.id);
    }
  }

  // Start the agent process
  private async startAgentProcess(
    client: Socket,
    actionData: { agent: AgentName }
  ) {
    const graph = await this.agentFactory.create({
      agentName: actionData.agent,
    });

    const initialInput = { input: "hello world" };

    // Thread
    const graphStateConfig = {
      configurable: { thread_id: "1" },
      streamMode: "values" as const,
    };

    // Run the graph until the first interruption
    for await (const event of await graph.stream(
      initialInput,
      graphStateConfig
    )) {
      this.logAndEmit(client, `--- ${event.input} ---`);
    }

    // Will log when the graph is interrupted, after step 2.
    this.logAndEmit(client, "---GRAPH INTERRUPTED---");

    const userConfirmed = await this.sendConfirmationRequest(client.id, {
      message: "Do you want to proceed with this action?",
      actionData,
    });

    if (userConfirmed) {
      // If approved, continue the graph execution. We must pass `null` as
      // the input here, or the graph will
      for await (const event of await graph.stream(null, graphStateConfig)) {
        this.logAndEmit(client, `--- ${event.input} ---`);
      }
      this.logAndEmit(client, "---ACTION EXECUTED---");
    } else {
      this.logAndEmit(client, "---ACTION CANCELLED---");
    }

    // Optionally disconnect the client
    client.disconnect();
  }

  private logAndEmit(client: Socket, message: string) {
    console.log(message);
    client.emit("message", { message });
  }
}

クライアント


クライアントではフックを作ってagent startとその状態を管理する。

  const GraphState = Annotation.Root({
    input: Annotation<string>,
  });

  const step1 = (state: typeof GraphState.State) => {
    console.log("---Step 1---");
    return state;
  };

  const step2 = (state: typeof GraphState.State) => {
    console.log("---Step 2---");
    return state;
  };

  const step3 = (state: typeof GraphState.State) => {
    console.log("---Step 3---");
    return state;
  };

  const builder = new StateGraph(GraphState)
    .addNode("step1", step1)
    .addNode("step2", step2)
    .addNode("step3", step3)
    .addEdge(START, "step1")
    .addEdge("step1", "step2")
    .addEdge("step2", "step3")
    .addEdge("step3", END);

  // Set up memory
  const graphStateMemory = new MemorySaver();

  const graph = builder.compile({
    checkpointer: graphStateMemory,
    interruptBefore: ["step3"],
  });
  return graph;

コネクションを結び、 confirmation request が来ると confirmationRequest 状態を更新する。 UI componentでconfirmationRequestの状態を見て、ユーザーにウィンドウを表示させればよい。

以上がLanggraph Human In The Loop (ソケット付き)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。