首頁 >web前端 >js教程 >附插座的 Langgraph Human In The Loop

附插座的 Langgraph Human In The Loop

DDD
DDD原創
2024-11-30 04:25:12579瀏覽

Langgraph Human In The Loop with socket

透過langgraph的中斷功能,我了解到人類可以介入Agent執行的中途。

但是如果你看一下這些例子,你會發現它們都忽略了人際互動。我該怎麼做才能真正得到用戶的確認?我認為主要有以下三種方式。

使用 Langgraph API 伺服器

您可以使用 langgraph cli 透過 docker 執行 langgraph API 伺服器,然後執行圖表,變更狀態,然後使用 langgraph SDK 重新啟動它。

langgraph提供的項目必須按照提供的方式使用。有很多設置,似乎很難將其與我的程式碼整合

伺服器上的圖形管理

這是一種在我的自訂伺服器上僅實作上述 Langgraph API 伺服器的必要部分的方法。例如,運行圖時,必須儲存執行圖的客戶端和圖檢查點,並在使用者確認後,必須再次載入圖表並根據使用者的回應更改狀態以再次執行

可能有很多事情要思考。

套接字連接

執行Agent時,會連接一個socket,並透過socket與使用者互動。只需在現有範例程式碼中新增透過套接字連接和套接字通訊接收使用者確認的步驟即可實現。

相反,像打字一樣實現串流可能會很困難。

透過socket連接實現

首先,我想以一種盡可能不增加複雜性的方式來實現它,所以我用套接字連接來實現它。

伺服器端使用NestJs,客戶端使用NextJs。

伺服器

首先,建立一個用於Websocket連線的網關。在agent/start時創建了連接,並立即執行agent。


關鍵很簡單。當套接字連接時,立即建立並執行代理,中斷時,向客戶端發送確認請求訊息並等待。一旦確認得到解決,圖表就會繼續。
@WebSocketGateway({
  namespace: "/",
  transport: ["websocket", "polling"],
  path: "/agent/start",
  cors: {
    origin: "*",
    methods: ["GET", "POST"],
    credentials: true,
  },
})
export class AgentGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;
  protected readonly logger = new Logger(this.constructor.name);

  constructor(
    private readonly agentFactory: AgentFactory
  ) {}

  private pendingConfirmations = new Map<string, (response: boolean) => void>();

  // Handle new connections
  handleConnection(client: Socket) {
    console.log(`Client connected: ${client.id}`);

    // Option 1: Get actionData from query parameters
    const actionData: { agent: AgentName } = client.handshake.query.actionData
      ? JSON.parse(client.handshake.query.actionData as string)
      : null;

    if (actionData) {
      this.startAgentProcess(client, actionData);
    } else {
      // If no actionData is provided, you can wait for an event
      client.emit("error", "No action data provided");
      client.disconnect();
    }
  }

  // Handle disconnections
  handleDisconnect(client: Socket) {
    console.log(`Client disconnected: ${client.id}`);
    this.pendingConfirmations.delete(client.id);
  }

  // Send confirmation request
  async sendConfirmationRequest(clientId: string, data: any): Promise<boolean> {
    return new Promise((resolve) => {
      this.pendingConfirmations.set(clientId, resolve);
      this.server.to(clientId).emit("confirmation_request", data);

      // Optional timeout for response
      setTimeout(() => {
        if (this.pendingConfirmations.has(clientId)) {
          this.pendingConfirmations.delete(clientId);
          resolve(false); // Default to 'false' if timeout occurs
        }
      }, 3000000); // 3000 seconds timeout
    });
  }

  // Handle client's confirmation response
  @SubscribeMessage("confirmation_response")
  handleClientResponse(
    @MessageBody() data: { confirmed: boolean },
    @ConnectedSocket() client: Socket
  ) {
    const resolve = this.pendingConfirmations.get(client.id);
    if (resolve) {
      resolve(data.confirmed);
      this.pendingConfirmations.delete(client.id);
    }
  }

  // Start the agent process
  private async startAgentProcess(
    client: Socket,
    actionData: { agent: AgentName }
  ) {
    const graph = await this.agentFactory.create({
      agentName: actionData.agent,
    });

    const initialInput = { input: "hello world" };

    // Thread
    const graphStateConfig = {
      configurable: { thread_id: "1" },
      streamMode: "values" as const,
    };

    // Run the graph until the first interruption
    for await (const event of await graph.stream(
      initialInput,
      graphStateConfig
    )) {
      this.logAndEmit(client, `--- ${event.input} ---`);
    }

    // Will log when the graph is interrupted, after step 2.
    this.logAndEmit(client, "---GRAPH INTERRUPTED---");

    const userConfirmed = await this.sendConfirmationRequest(client.id, {
      message: "Do you want to proceed with this action?",
      actionData,
    });

    if (userConfirmed) {
      // If approved, continue the graph execution. We must pass `null` as
      // the input here, or the graph will
      for await (const event of await graph.stream(null, graphStateConfig)) {
        this.logAndEmit(client, `--- ${event.input} ---`);
      }
      this.logAndEmit(client, "---ACTION EXECUTED---");
    } else {
      this.logAndEmit(client, "---ACTION CANCELLED---");
    }

    // Optionally disconnect the client
    client.disconnect();
  }

  private logAndEmit(client: Socket, message: string) {
    console.log(message);
    client.emit("message", { message });
  }
}

上面程式碼中使用的代理程式是順序使用langgraph文件中下面的步驟1 2 3的代理程式。


客戶
  const GraphState = Annotation.Root({
    input: Annotation<string>,
  });

  const step1 = (state: typeof GraphState.State) => {
    console.log("---Step 1---");
    return state;
  };

  const step2 = (state: typeof GraphState.State) => {
    console.log("---Step 2---");
    return state;
  };

  const step3 = (state: typeof GraphState.State) => {
    console.log("---Step 3---");
    return state;
  };

  const builder = new StateGraph(GraphState)
    .addNode("step1", step1)
    .addNode("step2", step2)
    .addNode("step3", step3)
    .addEdge(START, "step1")
    .addEdge("step1", "step2")
    .addEdge("step2", "step3")
    .addEdge("step3", END);

  // Set up memory
  const graphStateMemory = new MemorySaver();

  const graph = builder.compile({
    checkpointer: graphStateMemory,
    interruptBefore: ["step3"],
  });
  return graph;

客戶端建立一個鉤子來管理代理程式啟動及其狀態。


建立連線並在確認請求到達時更新確認請求狀態。只需查看UI元件中的confirmationRequest狀態並向使用者彈出一個視窗即可。
import { useRef, useState } from "react";
import io, { Socket } from "socket.io-client";

export const useAgentSocket = () => {
  const socketRef = useRef<Socket | null>(null);
  const [confirmationRequest, setConfirmationRequest] = useState<any>(null);
  const [messages, setMessages] = useState<string[]>([]);

  const connectAndRun = (actionData: any) => {
    return new Promise((resolve, reject) => {
      socketRef.current = io("http://localhost:8000", {
        path: "/agent/start",
        transports: ["websocket", "polling"],
        query: {
          actionData: JSON.stringify(actionData),
        },
      });

      socketRef.current.on("connect", () => {
        console.log("Connected:", socketRef.current?.id);
        resolve(void 0);
      });

      socketRef.current.on("connect_error", (error) => {
        console.error("Connection error:", error);
        reject(error);
      });

      // Listen for confirmation requests
      socketRef.current.on("confirmation_request", (data) => {
        setConfirmationRequest(data);
      });

      // Listen for messages
      socketRef.current.on("message", (data) => {
        console.log("Received message:", data);
        setMessages((prevMessages) => [...prevMessages, data.message]);
      });

      socketRef.current.on("disconnect", () => {
        console.log("Disconnected from server");
      });
    });
  };

  const sendConfirmationResponse = (confirmed: boolean) => {
    if (socketRef.current) {
      socketRef.current.emit("confirmation_response", { confirmed });
      setConfirmationRequest(null);
    }
  };

  const disconnectSocket = () => {
    if (socketRef.current) {
      socketRef.current.disconnect();
    }
  };

  const clearMessages = () => {
    setMessages([]);
  };

  return {
    confirmationRequest,
    sendConfirmationResponse,
    connectAndRun,
    disconnectSocket,
    messages,
    clearMessages,
  };
};

以上是附插座的 Langgraph Human In The Loop的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn