Beta Program - 3 Months Free Business!

LangGraph & KirokuForms: HITL Examples

Explore practical examples demonstrating how to integrate KirokuForms with LangGraph for various Human-in-the-Loop (HITL) workflow patterns. These examples showcase common use cases and integration techniques.

Basic Verification Workflow

This example demonstrates a common HITL scenario: a transaction processing workflow where high-value transactions require manual human verification, while lower-value ones are auto-approved. It utilizes the create_kiroku_interrupt_handler which is called directly within a node to initiate the HITL process.

Transaction Verification with KirokuForms Interrupt
from typing import TypedDict, Optional
from langgraph.graph import StateGraph, END
from langgraph.checkpoint import MemorySaver # Simple in-memory saver for examples
from kirokuforms_langgraph import create_kiroku_interrupt_handler # Updated import

# Initialize the KirokuForms interrupt handler
api_key = "YOUR_KIROKU_API_KEY" # Replace with your actual API key
human_review_interrupt = create_kiroku_interrupt_handler(api_key=api_key)

# Define application state
class TransactionState(TypedDict):
    transaction: dict
    human_input: Optional[dict]
    verified: bool
    status: str
    notes: Optional[str]

# Define a node that processes data and may require human verification
def check_transaction_node(state: TransactionState) -> TransactionState:
    transaction = state["transaction"]
    
    # Determine if transaction needs human verification
    if transaction["amount"] > 1000:
        # This will trigger the KirokuForms interrupt
        # The interrupt handler will create the form and pause the graph
        # The state passed to the interrupt handler is the current graph state
        # Form fields can be dynamically generated or use default values from state
        return human_review_interrupt(state, {
            "title": "Verify High-Value Transaction",
            "description": f"Transaction ID: {transaction['id']} for {transaction['amount']:.2f} requires manual verification.",
            "form_fields": [ # Renamed from 'fields' for clarity with kirokuforms-langgraph
                {"type": "staticText", "name": "info", "label": "Transaction Details", 
                 "text": f"ID: {transaction['id']}, Customer: {transaction['customer']}, Amount: {transaction['amount']:.2f}"},
                {"type": "radio", "label": "Approve Transaction?", "name": "approved", "required": True,
                 "options": [{"label": "Approve", "value": "yes"}, {"label": "Reject", "value": "no"}]},
                {"type": "textarea", "label": "Verification Notes", "name": "notes", "required": False}
            ]
        })
    
    # If amount is under threshold, auto-approve
    return {**state, "verified": True, "status": "auto_approved"}

# Define a node to handle the result of human verification (or auto-approval)
def process_verification_result_node(state: TransactionState) -> TransactionState:
    human_input = state.get("human_input") # Data from KirokuForm if human review occurred
    
    if human_input: # Means human review happened
        approved = human_input.get("approved") == "yes"
        notes = human_input.get("notes", "")
        return {**state, "verified": approved, "status": "approved_by_human" if approved else "rejected_by_human", "notes": notes}
    
    # If no human_input, it was auto-approved
    return state # No change needed if already auto_approved

# Define final processing nodes
def finalize_approved_transaction_node(state: TransactionState) -> TransactionState:
    return {**state, "status": "processed_approved"}

def finalize_rejected_transaction_node(state: TransactionState) -> TransactionState:
    return {**state, "status": "processed_rejected"}

# Build the graph
workflow_builder = StateGraph(TransactionState)

workflow_builder.add_node("check_transaction", check_transaction_node)
workflow_builder.add_node("process_verification_result", process_verification_result_node)
workflow_builder.add_node("finalize_approved", finalize_approved_transaction_node)
workflow_builder.add_node("finalize_rejected", finalize_rejected_transaction_node)

# Define conditional routing
def route_after_check(state: TransactionState):
    # The interrupt handler adds 'human_input' to state after review
    if "human_input" in state or state.get("verified"): # human_input means review happened, 'verified' means auto-approved
        return "process_verification_result"
    # This path shouldn't be hit if interrupt is configured correctly,
    # as check_transaction_node will pause if human_review_interrupt is called.
    # LangGraph waits for human_review_interrupt to complete (i.e., human submits form).
    return END # Fallback, should ideally not occur

def route_after_verification_processing(state: TransactionState):
    if state.get("verified", False):
        return "finalize_approved"
    else:
        return "finalize_rejected"

workflow_builder.set_entry_point("check_transaction")
workflow_builder.add_conditional_edges("check_transaction", route_after_check) # Interrupt handles pause
workflow_builder.add_conditional_edges("process_verification_result", route_after_verification_processing)
workflow_builder.add_edge("finalize_approved", END)
workflow_builder.add_edge("finalize_rejected", END)

# Compile the graph with checkpointing and the interrupt
# The interrupt will be active for ALL nodes if not specified with interrupt_before/after.
# For this example, check_transaction_node explicitly calls the interrupt handler.
memory = MemorySaver()
app = workflow_builder.compile(
    checkpointer=memory,
    # No interrupt_before/after needed here as node calls handler directly
)

# Example Invocation
# config = {"configurable": {"thread_id": "tx-1"}}
# result_high_value = app.invoke({"transaction": {"id": "TRX-001", "customer": "Big Spender", "amount": 1500.00}}, config)
# print(f"High-value result (after human review): {result_high_value}")

# config_low = {"configurable": {"thread_id": "tx-2"}}
# result_low_value = app.invoke({"transaction": {"id": "TRX-002", "customer": "Small Fry", "amount": 150.00}}, config_low)
# print(f"Low-value result: {result_low_value}")

Workflow Logic

  • Processes incoming transactions.
  • If a transaction's amount exceeds a threshold (e.g., $1000), it calls the KirokuForms interrupt handler.
  • The interrupt handler creates a KirokuForm for verification and pauses the LangGraph execution for that specific thread.
  • Once the human submits the form, LangGraph resumes, and a subsequent node processes the human's decision.
  • Low-value transactions bypass human review and are auto-approved.

AI Agent Oversight

In this example, KirokuForms provides a human oversight mechanism for an AI agent built with LangGraph. When the agent proposes actions deemed critical (based on LLM output or tools used), it triggers a human review step.

Human Oversight for AI Agent Actions
from typing import TypedDict, Annotated, Sequence, Optional
import operator # Required for Annotated human_input setter
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint import MemorySaver
from kirokuforms_langgraph import create_kiroku_interrupt_handler # Updated import
from langchain_core.tools import tool
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage # For explicit message construction
from langchain_openai import ChatOpenAI


# Define tools for the agent
@tool
def search_tool(query: str) -> str:
    """Searches for information based on the query."""
    # In a real scenario, this would call a search API
    return f"Search results for '{query}': Information found about {query}."

@tool
def financial_analysis_tool(data: dict) -> str:
    """Performs financial analysis on the provided data."""
    item = data.get("item", "the subject")
    # In a real scenario, this would perform calculations or call another service
    return f"Financial analysis complete for '{item}'. Recommendation: Based on the current data, proceed with caution."

tools = [search_tool, financial_analysis_tool]

# Agent state
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]
    human_input: Annotated[Optional[dict], operator.setitem] # KirokuForms interrupt handler adds 'human_input' after review

# Initialize LLM and KirokuForms interrupt handler
llm = ChatOpenAI(model="gpt-4-turbo-preview", api_key="YOUR_OPENAI_API_KEY") # Replace with your key
human_oversight_interrupt = create_kiroku_interrupt_handler(api_key="YOUR_KIROKU_API_KEY") # Replace

# Define agent nodes
def call_model_node(state: AgentState) -> dict:
    response = llm.invoke(state["messages"], tools=tools)
    return {"messages": [response]}

def call_tools_node(state: AgentState) -> dict:
    last_message = state["messages"][-1]
    if not isinstance(last_message, AIMessage) or not last_message.tool_calls:
        return {"messages": [HumanMessage(content="No tool calls found in the last AI message.")]} # Or handle as an error

    tool_results = []
    for tc in last_message.tool_calls:
        tool_name = tc["name"]
        try:
            # Find the tool in the global scope or a registered tool list
            selected_tool = globals().get(tool_name) # Or use a more robust tool registry
            if callable(selected_tool):
                tool_output = selected_tool(tc["args"])
                tool_results.append(ToolMessage(content=str(tool_output), tool_call_id=tc["id"]))
            else:
                tool_results.append(ToolMessage(content=f"Error: Tool '{tool_name}' not found or not callable.", tool_call_id=tc["id"]))
        except Exception as e:
            tool_results.append(ToolMessage(content=f"Error executing tool '{tool_name}': {e}", tool_call_id=tc["id"]))
            
    return {"messages": tool_results}


# Criteria for when to request human oversight
def needs_human_oversight(state: AgentState) -> bool:
    last_message = state["messages"][-1]
    if isinstance(last_message, AIMessage) and last_message.content and isinstance(last_message.content, str):
        critical_phrases = ["critical decision", "large investment", "significant financial impact", "major commitment"]
        if any(phrase in last_message.content.lower() for phrase in critical_phrases):
            return True
    # Example: if a specific tool (like financial_analysis_tool) was just called and produced output
    if isinstance(last_message, ToolMessage) and last_message.name == "financial_analysis_tool":
        return True
    return False

# Node to trigger human oversight
def human_oversight_node(state: AgentState) -> dict:
    # Try to get the last AI message content or a summary
    agent_plan_summary = "No specific plan articulated by AI yet."
    if state["messages"]:
        # Iterate backwards to find the last AI message that is not a tool call response
        for msg in reversed(state["messages"]):
            if isinstance(msg, AIMessage) and msg.content:
                agent_plan_summary = msg.content
                break
            elif isinstance(msg, HumanMessage) and msg.content: # If last is human, show that too
                agent_plan_summary = f"Last user input: {msg.content}"
                break


    # This call will pause the graph and create a KirokuForm
    return human_oversight_interrupt(state, {
        "title": "AI Agent Action Requires Approval",
        "description": "The AI agent proposes an action or has reached a point that requires your review and approval before proceeding.",
        "form_fields": [
            {"type": "staticText", "name":"plan_display", "label": "Agent's Current Context / Proposed Action:", 
             "text": agent_plan_summary},
            {"type": "radio", "label": "Your Decision:", "name": "human_decision", "required": True,
             "options": [
                 {"label": "Approve Plan", "value": "approve"},
                 {"label": "Request Modification", "value": "modify"},
                 {"label": "Reject Plan", "value": "reject"}
             ]},
            {"type": "textarea", "label": "Reason / Modification Instructions (if any):", "name": "human_feedback", "required": False}
        ]
    })

# Node to process human feedback
def process_human_feedback_node(state: AgentState) -> dict:
    if state.get("human_input"):
        decision = state["human_input"].get("human_decision")
        feedback = state["human_input"].get("human_feedback", "")
        
        if decision == "approve":
            return {"messages": [HumanMessage(content=f"Human approved the plan. Original feedback: {feedback if feedback else 'None'}. Proceed.")]}
        elif decision == "modify":
            return {"messages": [HumanMessage(content=f"Human requested modification: '{feedback}'. Please revise the plan accordingly.")]}
        else: # Reject
            return {"messages": [HumanMessage(content=f"Human rejected the plan. Reason: '{feedback if feedback else 'None'}'. Halting operation.")]}
    # This case should ideally not be reached if human_input is guaranteed by the interrupt
    return {"messages": [HumanMessage(content="Error: Human feedback processing called without human_input in state.")]}


# Graph definition
agent_workflow_builder = StateGraph(AgentState) # Changed variable name for clarity
agent_workflow_builder.add_node("agent", call_model_node)
agent_workflow_builder.add_node("tools", call_tools_node)
agent_workflow_builder.add_node("human_oversight_trigger_node", human_oversight_node) # Renamed for clarity
agent_workflow_builder.add_node("process_human_feedback_node", process_human_feedback_node) # Renamed for clarity

# Conditional routing
def route_after_agent_logic(state: AgentState) -> str: # Renamed for clarity
    last_message = state["messages"][-1]
    if isinstance(last_message, AIMessage) and last_message.tool_calls:
        return "tools"
    # Check for oversight AFTER LLM response (or tool use if that's a trigger)
    if needs_human_oversight(state): 
        return "human_oversight_trigger_node"
    return END # If no tools and no oversight needed

def route_after_human_feedback_logic(state: AgentState) -> str: # Renamed for clarity
    # Based on human feedback message, decide next step
    last_message_content = state["messages"][-1].content.lower() if state["messages"] and state["messages"][-1].content else ""
    if "rejected the plan" in last_message_content or "approved the plan" in last_message_content:
        return END
    # If "requested modification" or other cases that require more agent work
    return "agent" 

agent_workflow_builder.set_entry_point("agent")
agent_workflow_builder.add_conditional_edges("agent", route_after_agent_logic)
agent_workflow_builder.add_edge("tools", "agent") # Tools always return to agent for next step

# human_oversight_trigger_node calls interrupt; graph pauses. On resume, human_input is in state.
# The interrupt handler itself manages resumption to the *next* node in sequence if not conditional.
# Here, we want to explicitly route to process_human_feedback_node.
# The interrupt handler should add human_input, then this edge is followed.
agent_workflow_builder.add_edge("human_oversight_trigger_node", "process_human_feedback_node")
agent_workflow_builder.add_conditional_edges("process_human_feedback_node", route_after_human_feedback_logic)


# Compile the agent
memory_agent = MemorySaver()
graph_agent = agent_workflow_builder.compile(checkpointer=memory_agent)

# Example Invocation:
# config_agent = {"configurable": {"thread_id": "agent-thread-1"}}
# initial_input_agent = {"messages": [HumanMessage(content="Analyze ACME Corp's financials for a large investment.")]}
# for event in graph_agent.stream(initial_input_agent, config_agent, stream_mode="values"):
#     print("--- Agent Stream Event ---")
#     for key, value in event.items():
#         if key == "messages":
#             print(f"Messages:")
#             for msg in value:
#                 print(f"  - Role: {msg.role if hasattr(msg, 'role') else type(msg)}, Content: {msg.content if hasattr(msg, 'content') else 'N/A'}")
#                 if hasattr(msg, 'tool_calls') and msg.tool_calls:
#                     print(f"    Tool Calls: {msg.tool_calls}")
#         else:
#             print(f"{key}: {value}")
#     print("--- End Agent Stream Event ---")
#     if "human_input" in event and event["human_input"]: # This check might be too early, human_input appears after resume
#         print(f"Human Input was processed: {event['human_input']}")

# To see the form URL (if HITL is triggered):
# response = graph_agent.invoke(initial_input_agent, config_agent)
# if response.get("hitl_form_url"): # Assuming interrupt handler adds this
#    print(f"KirokuForm URL for human review: {response.get('hitl_form_url')}")
# else:
#    print(f"Final Agent State: {response}")

Agent Oversight Features

  • An AI agent processes user requests using an LLM and tools.
  • If the agent's proposed plan meets certain criteria (e.g., involves critical financial decisions), a human review is initiated via KirokuForms.
  • A human reviewer can approve the plan, reject it, or suggest modifications through the KirokuForm.
  • The agent's workflow adapts based on the human's feedback, either proceeding, revising its plan, or halting.

Asynchronous API with Webhooks

This advanced example showcases building an asynchronous API (using FastAPI) where LangGraph workflows involving HITL tasks are managed without blocking the main API request. KirokuForms notifies the API via a webhook upon task completion.

Asynchronous Document Review with Webhooks
from typing import TypedDict, Optional
from langgraph.graph import StateGraph, END
from kirokuforms_langgraph import create_kiroku_interrupt_handler # Updated import
import os
from fastapi import FastAPI, Request, BackgroundTasks
from pydantic import BaseModel
import uvicorn
import asyncio
import uuid # For generating unique thread_ids

# --- LangGraph Setup ---
# In production, use a persistent checkpointer (e.g., RedisSaver, SqliteSaver from langgraph.checkpoint.redis import RedisSaver)
# and a robust way to map KirokuForms task_id to LangGraph thread_id.
# For this example, we use a simple in-memory dictionary.
thread_task_map = {} 

# State for the workflow
class DocumentState(TypedDict):
    document_id: str
    document_content: str
    thread_id: str # Store thread_id in state for easier reference if needed
    hitl_task_id: Optional[str] # KirokuForms task ID
    hitl_form_url: Optional[str] # URL for the KirokuForm
    human_input: Optional[dict] # Data from KirokuForm submission
    status: str
    notes: Optional[str] # Notes from human review

# Initialize KirokuForms interrupt handler with webhook details
# Ensure your server is reachable at this webhook_url by KirokuForms service
WEBHOOK_BASE_URL = os.environ.get("WEBHOOK_BASE_URL", "http://localhost:8000") # Use ngrok for local dev
human_review_async_interrupt = create_kiroku_interrupt_handler(
    api_key="YOUR_KIROKU_API_KEY", # Replace
    webhook_url=f"{WEBHOOK_BASE_URL}/webhook/kirokuforms_hitl_callback",
    webhook_secret="YOUR_SECURE_WEBHOOK_SECRET" # Replace
)

# Define workflow nodes
def start_document_processing_node(state: DocumentState) -> DocumentState:
    # Simulate initial processing
    print(f"Thread {state['thread_id']}: Starting processing for document {state['document_id']}")
    return {**state, "status": "processing_started"}

def request_human_review_node(state: DocumentState) -> DocumentState:
    print(f"Thread {state['thread_id']}: Requesting human review for document {state['document_id']}")
    # This call creates the KirokuForm task.
    # Because the handler is configured with a webhook_url, LangGraph will pause this thread's execution.
    # The KirokuForms task_id and form_url are typically added to the state by the interrupt handler.
    updated_state_with_task_info = human_review_async_interrupt(state, { # The interrupt returns the modified state
        "title": "Async Document Review Required",
        "description": f"Please review document: {state['document_id']}. Content preview: {state['document_content'][:100]}...",
        "form_fields": [
            {"type": "staticText", "name": "doc_id_display", "label": "Document ID:", "text": state['document_id']},
            {"type": "textarea", "name": "doc_content_full", "label": "Full Document Content (read-only for review):", 
             "defaultValue": state['document_content'], "readonly": True, "rows": 10},
            {"type": "radio", "label": "Approve Document?", "name": "approved", "required": True,
             "options": [{"label": "Approve", "value": "yes"}, {"label": "Reject", "value": "no"}]},
            {"type": "textarea", "label": "Review Comments (if any)", "name": "comments", "required": False}
        ]
    })
    # Map LangGraph thread_id to KirokuForms task_id for webhook lookup
    kiroku_task_id = updated_state_with_task_info.get('hitl_task_id')
    if kiroku_task_id and state.get('thread_id'):
         thread_task_map[kiroku_task_id] = state['thread_id']
         print(f"Thread {state['thread_id']}: Kiroku Task {kiroku_task_id} created. Waiting for webhook.")
    else:
        print(f"Thread {state['thread_id']}: Warning - Kiroku Task ID or Thread ID missing after interrupt call.")

    return updated_state_with_task_info


def process_human_feedback_node(state: DocumentState) -> DocumentState:
    thread_id = state['thread_id']
    print(f"Thread {thread_id}: Processing human feedback for document {state['document_id']}")
    if state.get("human_input"):
        approved = state["human_input"].get("approved") == "yes"
        comments = state["human_input"].get("comments", "")
        final_status = "approved_by_human" if approved else "rejected_by_human"
        print(f"Thread {thread_id}: Document review result - Approved: {approved}, Comments: {comments}")
        return {**state, "status": final_status, "notes": comments}
    
    print(f"Thread {thread_id}: Error - No human_input found in state during feedback processing.")
    return {**state, "status": "error_no_human_input"}

# Build workflow
async_workflow_builder = StateGraph(DocumentState)
async_workflow_builder.add_node("start_processing", start_document_processing_node)
async_workflow_builder.add_node("request_review", request_human_review_node) # This node calls the interrupt
async_workflow_builder.add_node("process_feedback", process_human_feedback_node)

async_workflow_builder.set_entry_point("start_processing")
async_workflow_builder.add_edge("start_processing", "request_review")
# Graph pauses at request_review node (due to interrupt call with webhook).
# When webhook is received, it will resume graph execution which should proceed to process_feedback.
async_workflow_builder.add_edge("request_review", "process_feedback") 
async_workflow_builder.add_edge("process_feedback", END)

# Compile workflow
# For a real async system with webhooks, you MUST use a persistent checkpointer.
# Example: from langgraph.checkpoint.sqlite import SqliteSaver
# memory = SqliteSaver.from_conn_string(":memory:") # Or a file path
# graph_async = async_workflow_builder.compile(checkpointer=memory)
# For this simplified example, we will manage resumption manually without an explicit checkpointer in compile(),
# relying on the FastAPI handler to re-invoke with state. This is NOT robust for production.
# A proper checkpointer allows graph.get_state and graph.update_state.
# The KirokuForms interrupt handler is designed to work best with a checkpointer.
graph_async = async_workflow_builder.compile()


# --- FastAPI Application ---
app_fastapi = FastAPI()

class ProcessRequest(BaseModel):
    document_id: str
    content: str
    # thread_id: Optional[str] = None # Client can provide or we generate

class KirokuWebhookPayload(BaseModel):
    eventType: str
    taskId: str
    formId: str
    submissionId: str
    timestamp: str
    data: dict # Contains formData which has the human's input

async def run_graph_async_task(thread_id: str, initial_state: DocumentState):
    """Helper to run graph in background after initial invoke for webhook setup"""
    config = {"configurable": {"thread_id": thread_id}}
    # This first invoke runs until the interrupt (request_human_review_node)
    # The interrupt handler should populate hitl_task_id and hitl_form_url.
    # The graph execution pauses here.
    current_graph_state = graph_async.invoke(initial_state, config)
    
    kiroku_task_id = current_graph_state.get('hitl_task_id')
    form_url = current_graph_state.get('hitl_form_url')

    if kiroku_task_id:
        thread_task_map[kiroku_task_id] = thread_id
        print(f"POST /process_document (Thread: {thread_id}): Kiroku Task {kiroku_task_id} created. Form URL: {form_url}. Workflow paused awaiting webhook.")
    else:
        print(f"POST /process_document (Thread: {thread_id}): Error - Kiroku Task ID not found after interrupt call. State: {current_graph_state}")


@app_fastapi.post("/process_document")
async def start_doc_processing_endpoint(payload: ProcessRequest, background_tasks: BackgroundTasks):
    thread_id = str(uuid.uuid4()) # Generate a unique thread_id for this workflow instance
    
    initial_state = DocumentState(
        document_id=payload.document_id,
        document_content=payload.content,
        thread_id=thread_id,
        status="pending_start",
        hitl_task_id=None,
        hitl_form_url=None,
        human_input=None,
        notes=None
    )
    
    # Run the initial part of the graph in the background up to the HITL pause point
    # This ensures the API call returns quickly.
    background_tasks.add_task(run_graph_async_task, thread_id, initial_state)
    
    # The client will need to get the form_url through other means if not returned immediately,
    # or the system needs to notify the user. For this example, the form_url is logged by run_graph_async_task.
    return {
        "message": "Document processing initiated. Awaiting human review if required.",
        "thread_id": thread_id,
        "status_note": "Workflow will pause for human review; form URL will be logged by the server. Check server logs."
    }


@app_fastapi.post("/webhook/kirokuforms_hitl_callback")
async def kiroku_webhook_receiver_endpoint(payload: KirokuWebhookPayload, request: Request, background_tasks: BackgroundTasks):
    # IMPORTANT: Add robust webhook signature verification in production!
    # header_signature = request.headers.get("X-Kiroku-Signature")
    # raw_body = await request.body()
    # if not human_review_async_interrupt.verify_webhook_signature(raw_body, header_signature, "YOUR_SECURE_WEBHOOK_SECRET"):
    #     print("Webhook Error: Invalid signature")
    #     return {"status": "error", "detail": "Invalid signature"}, 401

    print(f"Webhook received - Kiroku Task ID: {payload.taskId}, Event: {payload.eventType}")

    if payload.eventType == "hitl.task.completed":
        kiroku_task_id = payload.taskId
        thread_id = thread_task_map.get(kiroku_task_id)

        if not thread_id:
            print(f"Webhook Error: No LangGraph thread_id found mapped for Kiroku Task ID: {kiroku_task_id}")
            return {"status": "error", "detail": "Thread ID not found for task."}

        human_form_data = payload.data.get("formData", {})
        print(f"Webhook for Thread {thread_id}: Human input received: {human_form_data}")
        
        # Prepare the input to resume the graph. This contains the human's submitted data.
        # The KirokuForms interrupt handler is designed to expect this in the 'human_input' field of the state.
        resume_input = {"human_input": human_form_data}
        
        config = {"configurable": {"thread_id": thread_id}}
        
        # Resume the graph by invoking it with the new input (human_form_data).
        # The graph's checkpointer (if configured) would load the state for thread_id,
        # merge this input, and continue execution from where it paused.
        # Since we're not using a persistent checkpointer directly in graph.compile for this simplified demo,
        # the 'invoke' here effectively continues the flow, and the state is managed in-memory by LangGraph for the thread.
        
        # To make this cleaner with a real checkpointer, you might do:
        # current_state = graph_async.get_state(config)
        # updated_values_for_state = {**current_state.values, "human_input": human_form_data}
        # graph_async.update_state(config, updated_values_for_state)
        # final_result = graph_async.invoke(None, config) # Invoke with None as input to continue with updated state

        # For simplicity with the current setup (no explicit checkpointer in compile):
        # The KirokuForms interrupt handler, when it resumes after a webhook,
        # typically handles updating the state internally before continuing the graph.
        # The following invoke passes the human_input which the already paused graph thread will pick up.
        
        final_result = graph_async.invoke(resume_input, config) # Pass human_input to be merged
        
        print(f"Webhook for Thread {thread_id}: Workflow resumed and completed. Final state: {final_result}")
        
        if kiroku_task_id in thread_task_map:
            del thread_task_map[kiroku_task_id] # Clean up map
        
        return {"status": "success_webhook_processed", "thread_id": thread_id, "final_status_from_graph": final_result.get("status")}
    
    return {"status": "webhook_event_ignored", "event_type": payload.eventType}

# To run this FastAPI app (save as, e.g., main.py):
# uvicorn main:app_fastapi --reload --port 8000
#
# And use ngrok for a public URL if testing KirokuForms webhooks from the cloud:
# ngrok http 8000
# Update WEBHOOK_BASE_URL with your ngrok https URL.

Asynchronous Workflow Highlights

  • An API endpoint starts a LangGraph workflow for document processing.
  • When human review is needed, the KirokuForms interrupt handler (configured with a webhook_url) creates the HITL task and the graph pauses for that thread, but the API call returns immediately.
  • KirokuForms sends a notification to a dedicated webhook endpoint in the API when the human completes the review.
  • The webhook handler retrieves the human's input and resumes the corresponding LangGraph workflow instance using its thread_id.
  • Important: A persistent checkpointer (e.g., Redis, SQL) is essential for robust asynchronous webhook-driven workflows to manage graph states across requests. This example simplifies state mapping for clarity.

Common Integration Patterns

Synchronous vs. Asynchronous HITL

Execution Modes

KirokuForms interrupt handlers adapt their behavior:
  • Synchronous (Default): If no webhook_url is provided to create_kiroku_interrupt_handler, the handler will effectively pause the LangGraph execution thread and can be configured to poll KirokuForms for task completion (or you manage resumption manually). The graph waits.
  • Asynchronous (via Webhooks): If a webhook_url is provided, the interrupt handler creates the KirokuForm task and allows the LangGraph invocation to return quickly. The graph execution truly pauses and relies on the external webhook notification to be resumed later. This is ideal for non-blocking APIs. The wait_for_completion parameter in the interrupt call options can further influence behavior if direct polling is desired over webhooks in some specific synchronous scenarios.

Form Definition Strategies

Defining Forms for HITL Tasks

When calling the KirokuForms interrupt handler (or KirokuFormsHITL client methods), you can define the review form in several ways:
  • Dynamic Field Definition: Pass a form_fields array directly in the call options, as shown in the examples. This gives maximum flexibility to tailor forms based on the current state or context.
  • Using KirokuForms Templates: For standardized review processes, you can create form templates within the KirokuForms dashboard. Then, instead of form_fields, pass a template_id in the options. The interrupt handler will use this template. (Note: The kirokuforms-langgraph library would need to support passing template_id; check its documentation for current capabilities).

Error Handling & Timeouts

Try It Yourself

Prerequisites to Run Examples

  1. A KirokuForms account and an API Key.
  2. The kirokuforms-langgraph Python package: pip install kirokuforms-langgraph.
  3. LangGraph and any other required libraries (e.g., langchain-openai, fastapi, uvicorn, pydantic) installed: pip install langgraph langchain-openai fastapi uvicorn pydantic.
  4. For the AI Agent example, an OpenAI API key.
  5. For the Asynchronous API example, a tool like ngrok to expose your local webhook endpoint to the internet during development.

You can adapt these examples by replacing placeholder API keys and modifying the workflow logic to fit your specific use case. For more detailed setup instructions and the full codebase, check out the KirokuForms LangGraph Python SDK repository on GitHub.

Further Exploration

Continue exploring KirokuForms and LangGraph capabilities: