Athrun Data Intelligence


En nuestro Tutorial preliminar, construimos un agente de IA capaz de objetar consultas navegando por la web y agregamos persistencia para ayudar el estado. Sin bloqueo, en muchos escenarios, es posible que desee poner a un humano en el ciclo para monitorear y aprobar las acciones del agente. Esto se puede alcanzar fácilmente con Langgraph. Exploremos cómo funciona esto.

Configuración del agente

Continuaremos desde donde lo dejamos en la última asignatura. Primero, configure las variables de entorno, realice las importaciones necesarias y configure el checkpointer.

pip install langgraph==0.2.53 langgraph-checkpoint==2.0.6 langgraph-sdk==0.1.36 langchain-groq langchain-community langgraph-checkpoint-sqlite==2.0.1
import os
os.environ('TAVILY_API_KEY') = ""
os.environ('GROQ_API_KEY') = ""
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage, AIMessage
from langchain_groq import ChatGroq
from langchain_community.tools.tavily_search import TavilySearchResults

from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
sqlite_conn = sqlite3.connect("checkpoints.sqlite",check_same_thread=False)
memory = SqliteSaver(sqlite_conn)

# Initialize the search tool
tool = TavilySearchResults(max_results=2)

Definición del agente

class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state('messages')
        if self.system:
            messages = (SystemMessage(content=self.system)) + messages
        message = self.model.invoke(messages)
        return {'messages': (message)}

    def exists_action(self, state: AgentState):
        result = state('messages')(-1)
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state('messages')(-1).tool_calls
        results = ()
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools(t('name')).invoke(t('args'))
            results.append(ToolMessage(tool_call_id=t('id'), name=t('name'), content=str(result)))
        print("Back to the model!")
        return {'messages': results}

Configuración del estado del agente

Ahora configuramos el estado del agente con una ligera modificación. Anteriormente, la relación de mensajes se anotó con Operator.Add, agregando nuevos mensajes a la matriz existente. Para las interacciones humanas en el onda, a veces incluso queremos reemplazar los mensajes existentes con la misma ID en superficie de agregarlos.

from uuid import uuid4

def reduce_messages(left: list(AnyMessage), right: list(AnyMessage)) -> list(AnyMessage):
    # Assign IDs to messages that don't have them
    for message in right:
        if not message.id:
            message.id = str(uuid4())
    # Merge the new messages with the existing ones
    merged = left.copy()
    for message in right:
        for i, existing in enumerate(merged):
            if existing.id == message.id:
                merged(i) = message
                break
        else:
            merged.append(message)
    return merged

class AgentState(TypedDict):
    messages: Annotated(list(AnyMessage), reduce_messages)

Sumar un humano en el onda

Introducimos una modificación adicional al clasificar el expresivo. El parámetro Interrupt_Bore = («Action») agrega una interrupción antaño de atraer al nodo de batalla, asegurando la aprobación manual antaño de ejecutar herramientas.

class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        # Everything else remains the same as before
        self.graph = graph.compile(checkpointer=checkpointer, interrupt_before=("action"))
    # Everything else remains unchanged

Ejecutando el agente

Ahora, inicializaremos el sistema con el mismo mensaje, maniquí y checkpointer que antaño. Cuando llamamos al agente, pasamos la configuración del subproceso con una ID de subprocesamiento.

prompt = """You are a smart research assistant. Use the search engine to look up information. 
You are allowed to make multiple calls (either together or in sequence). 
Only look up information when you are sure of what you want. 
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
model = ChatGroq(model="Fogosidad-3.3-70b-Specdec")
abot = Agent(model, (tool), system=prompt, checkpointer=memory)
messages = (HumanMessage(content="Whats the weather in SF?"))
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

Las respuestas se transmiten con destino a antes y el proceso se detiene posteriormente del mensaje AI, lo que indica una emplazamiento de utensilio. Sin bloqueo, el parámetro interrupt_before evita la ejecución inmediata. Además podemos obtener el estado flagrante del expresivo para este hilo y ver lo que contiene y incluso contiene el próximo nodo que se llamará (‘batalla’ aquí).

abot.graph.get_state(thread)
abot.graph.get_state(thread).next

Para continuar, volvemos a atraer a la transmisión con la misma configuración de subprocesos, pasando ningún como entrada. Esto transmite resultados, incluido el mensaje de la utensilio y el mensaje AI final. Poliedro que no se agregó interrupción entre el nodo de batalla y el nodo LLM, la ejecución continúa sin problemas.

for event in abot.graph.stream(None, thread):
    for v in event.values():
        print(v)

Aprobación humana interactiva

Podemos implementar un onda simple que solicita al usufructuario para su aprobación antaño de continuar la ejecución. Se utiliza una nueva ID de subproceso para una nueva ejecución. Si el usufructuario elige no continuar, el agente se detiene.

messages = (HumanMessage("What's the weather in LA?"))
thread = {"configurable": {"thread_id": "2"}}

for event in abot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

while abot.graph.get_state(thread).next:
    print("n", abot.graph.get_state(thread), "n")
    _input = input("Proceed? (y/n): ")
    if _input.lower() != "y":
        print("Aborting")
        break
    for event in abot.graph.stream(None, thread):
        for v in event.values():
            print(v)

¡Excelente! Ahora sabes cómo puedes involucrar a un humano en el onda. Ahora, intente constatar con diferentes interrupciones y vea cómo se comporta el agente.

Referencias: Deeplearning.ai (https://learn.deeplearning.ai/courses/ai-agents-in-langgraph/lesson/6/human-in-the-loop)


Vineet Kumar es un pasante de consultoría en MarktechPost. Actualmente está persiguiendo su BS del Instituto Indio de Tecnología (IIT), Kanpur. Es un entusiasta del formación instintivo. Le apasiona la investigación y los últimos avances en formación profundo, visión por computadora y campos relacionados.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *