Maison >interface Web >js tutoriel >Langgraph Human In The Loop avec prise

Langgraph Human In The Loop avec prise

DDD
DDDoriginal
2024-11-30 04:25:12583parcourir

Langgraph Human In The Loop with socket

Grâce à la fonction d'interruption de Langgraph, j'ai appris que les humains peuvent intervenir au milieu de l'exécution d'un agent.

Mais si vous regardez les exemples, ils ignorent tous simplement l’interaction humaine. Que dois-je faire pour obtenir la confirmation de l'utilisateur ? Je pense qu'il y a trois manières principales.

Utilisation du serveur API Langgraph

Vous pouvez exécuter le serveur API langgraph avec docker à l'aide de langgraph cli, puis exécuter le graphique, modifier l'état et le redémarrer avec le SDK langgraph.

Les articles fournis par Langgraph doivent être utilisés de la manière prévue. Il y a beaucoup de paramètres, et il semble qu'il puisse être difficile de l'intégrer à mon code

.

Gestion des graphes sur le serveur

Il s'agit d'une méthode permettant d'implémenter uniquement les parties nécessaires du serveur API Langgraph ci-dessus sur mon serveur personnalisé. Par exemple, lors de l'exécution d'un graphique, le client qui a exécuté le graphique et le point de contrôle du graphique doivent être enregistrés, et après la confirmation de l'utilisateur, le graphique doit être chargé à nouveau et l'état doit être modifié en fonction de la réponse de l'utilisateur pour être réexécuté

.

Il y a peut-être beaucoup de choses à penser.

connexion de prise

Lors de l'exécution de l'agent, il connecte un socket et interagit avec l'utilisateur via le socket. Cela fonctionne en ajoutant simplement l'étape de réception de la confirmation de l'utilisateur via la connexion socket et la communication socket dans l'exemple de code existant

.

Au lieu de cela, il peut être difficile de mettre en œuvre un streaming qui est tapé comme une frappe.

Implémenté avec une connexion socket

Tout d'abord, je voulais l'implémenter d'une manière qui n'augmente pas autant que possible la complexité, je l'ai donc implémenté avec une connexion socket

.

Le serveur utilise NestJs et le client utilise NextJs.

serveur

Tout d'abord, créez une passerelle pour la connexion Websocket. Une connexion a été créée au moment de l'agent/démarrage et l'agent a été exécuté immédiatement.

@WebSocketGateway({
  namespace: "/",
  transport: ["websocket", "polling"],
  path: "/agent/start",
  cors: {
    origin: "*",
    methods: ["GET", "POST"],
    credentials: true,
  },
})
export class AgentGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;
  protected readonly logger = new Logger(this.constructor.name);

  constructor(
    private readonly agentFactory: AgentFactory
  ) {}

  private pendingConfirmations = new Map<string, (response: boolean) => void>();

  // Handle new connections
  handleConnection(client: Socket) {
    console.log(`Client connected: ${client.id}`);

    // Option 1: Get actionData from query parameters
    const actionData: { agent: AgentName } = client.handshake.query.actionData
      ? JSON.parse(client.handshake.query.actionData as string)
      : null;

    if (actionData) {
      this.startAgentProcess(client, actionData);
    } else {
      // If no actionData is provided, you can wait for an event
      client.emit("error", "No action data provided");
      client.disconnect();
    }
  }

  // Handle disconnections
  handleDisconnect(client: Socket) {
    console.log(`Client disconnected: ${client.id}`);
    this.pendingConfirmations.delete(client.id);
  }

  // Send confirmation request
  async sendConfirmationRequest(clientId: string, data: any): Promise<boolean> {
    return new Promise((resolve) => {
      this.pendingConfirmations.set(clientId, resolve);
      this.server.to(clientId).emit("confirmation_request", data);

      // Optional timeout for response
      setTimeout(() => {
        if (this.pendingConfirmations.has(clientId)) {
          this.pendingConfirmations.delete(clientId);
          resolve(false); // Default to 'false' if timeout occurs
        }
      }, 3000000); // 3000 seconds timeout
    });
  }

  // Handle client's confirmation response
  @SubscribeMessage("confirmation_response")
  handleClientResponse(
    @MessageBody() data: { confirmed: boolean },
    @ConnectedSocket() client: Socket
  ) {
    const resolve = this.pendingConfirmations.get(client.id);
    if (resolve) {
      resolve(data.confirmed);
      this.pendingConfirmations.delete(client.id);
    }
  }

  // Start the agent process
  private async startAgentProcess(
    client: Socket,
    actionData: { agent: AgentName }
  ) {
    const graph = await this.agentFactory.create({
      agentName: actionData.agent,
    });

    const initialInput = { input: "hello world" };

    // Thread
    const graphStateConfig = {
      configurable: { thread_id: "1" },
      streamMode: "values" as const,
    };

    // Run the graph until the first interruption
    for await (const event of await graph.stream(
      initialInput,
      graphStateConfig
    )) {
      this.logAndEmit(client, `--- ${event.input} ---`);
    }

    // Will log when the graph is interrupted, after step 2.
    this.logAndEmit(client, "---GRAPH INTERRUPTED---");

    const userConfirmed = await this.sendConfirmationRequest(client.id, {
      message: "Do you want to proceed with this action?",
      actionData,
    });

    if (userConfirmed) {
      // If approved, continue the graph execution. We must pass `null` as
      // the input here, or the graph will
      for await (const event of await graph.stream(null, graphStateConfig)) {
        this.logAndEmit(client, `--- ${event.input} ---`);
      }
      this.logAndEmit(client, "---ACTION EXECUTED---");
    } else {
      this.logAndEmit(client, "---ACTION CANCELLED---");
    }

    // Optionally disconnect the client
    client.disconnect();
  }

  private logAndEmit(client: Socket, message: string) {
    console.log(message);
    client.emit("message", { message });
  }
}

La clé est simple. Lorsqu'un socket est connecté, un agent est immédiatement créé et exécuté, et lorsqu'il est interrompu, un message de demande de confirmation est envoyé au client et attend. Une fois la confirmation résolue, le graphique continue.

L'agent utilisé dans le code ci-dessus est un agent qui utilise séquentiellement les étapes 1 2 3 ci-dessous dans le document Langgraph.

  const GraphState = Annotation.Root({
    input: Annotation<string>,
  });

  const step1 = (state: typeof GraphState.State) => {
    console.log("---Step 1---");
    return state;
  };

  const step2 = (state: typeof GraphState.State) => {
    console.log("---Step 2---");
    return state;
  };

  const step3 = (state: typeof GraphState.State) => {
    console.log("---Step 3---");
    return state;
  };

  const builder = new StateGraph(GraphState)
    .addNode("step1", step1)
    .addNode("step2", step2)
    .addNode("step3", step3)
    .addEdge(START, "step1")
    .addEdge("step1", "step2")
    .addEdge("step2", "step3")
    .addEdge("step3", END);

  // Set up memory
  const graphStateMemory = new MemorySaver();

  const graph = builder.compile({
    checkpointer: graphStateMemory,
    interruptBefore: ["step3"],
  });
  return graph;

client

Le client crée un hook pour gérer le démarrage de l'agent et son statut.

import { useRef, useState } from "react";
import io, { Socket } from "socket.io-client";

export const useAgentSocket = () => {
  const socketRef = useRef<Socket | null>(null);
  const [confirmationRequest, setConfirmationRequest] = useState<any>(null);
  const [messages, setMessages] = useState<string[]>([]);

  const connectAndRun = (actionData: any) => {
    return new Promise((resolve, reject) => {
      socketRef.current = io("http://localhost:8000", {
        path: "/agent/start",
        transports: ["websocket", "polling"],
        query: {
          actionData: JSON.stringify(actionData),
        },
      });

      socketRef.current.on("connect", () => {
        console.log("Connected:", socketRef.current?.id);
        resolve(void 0);
      });

      socketRef.current.on("connect_error", (error) => {
        console.error("Connection error:", error);
        reject(error);
      });

      // Listen for confirmation requests
      socketRef.current.on("confirmation_request", (data) => {
        setConfirmationRequest(data);
      });

      // Listen for messages
      socketRef.current.on("message", (data) => {
        console.log("Received message:", data);
        setMessages((prevMessages) => [...prevMessages, data.message]);
      });

      socketRef.current.on("disconnect", () => {
        console.log("Disconnected from server");
      });
    });
  };

  const sendConfirmationResponse = (confirmed: boolean) => {
    if (socketRef.current) {
      socketRef.current.emit("confirmation_response", { confirmed });
      setConfirmationRequest(null);
    }
  };

  const disconnectSocket = () => {
    if (socketRef.current) {
      socketRef.current.disconnect();
    }
  };

  const clearMessages = () => {
    setMessages([]);
  };

  return {
    confirmationRequest,
    sendConfirmationResponse,
    connectAndRun,
    disconnectSocket,
    messages,
    clearMessages,
  };
};

Établit une connexion et met à jour le statut de confirmationRequest lorsqu'une demande de confirmation arrive. Regardez simplement l'état de confirmationRequest dans le composant de l'interface utilisateur et ouvrez une fenêtre à l'utilisateur.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn