LangGraph
LangGraph 技術概要 — ステートフルなAIオーケストレーションフレームワークの全容
目次
- はじめに
- コアコンセプト: StateGraph、ノード、エッジ、条件付きエッジ
- 状態管理とチェックポインティング
- 組み込みパターン: ReActエージェント、ツール呼び出し
- Human-in-the-Loop ワークフロー
- サブグラフとマルチエージェントアーキテクチャ
- ストリーミングとリアルタイム処理
- 永続化とメモリ
- LangGraph Platformによるデプロイメント
- 実践的なコードサンプル
- 他のオーケストレーションフレームワークとの比較
- ベストプラクティスとパフォーマンス最適化
1. はじめに
1.1 LangGraphとは何か
LangGraph は LangChain チームが開発した、LLM (Large Language Model) を活用したステートフルかつマルチアクターなアプリケーションを構築するための低レベルオーケストレーションフレームワークである。Google の Pregel や Apache Beam からインスピレーションを得て設計されており、NetworkX に似たパブリックインターフェースを持つ。
従来のLLMアプリケーション開発では、単純なプロンプトチェーンやDAG (有向非巡回グラフ) ベースのワークフローが主流であった。しかし、実世界のエージェントシステムでは、状態の永続化、条件分岐、ループ処理、人間の介入、障害からの復旧といった複雑な要件が不可欠である。LangGraph はこれらの要件に対し、グラフベースのオーケストレーションという統一的なアプローチで解決策を提供する。
pip install -U langgraph
Python 3.10以上が必要であり、MITライセンスで公開されている。2026年4月時点での最新バージョンは 1.1.6 であり、GitHubでは 28,700以上のスター、4,900以上のフォークを獲得している。
1.2 LangChainとの関係
LangGraph はLangChainエコシステムの一部であるが、LangChain本体とは独立して使用可能である。LangChainが提供するモデルアダプタやツール統合などのコンポーネントは便利であるが、LangGraphの中核機能はLangChainへの依存なしに利用できる。
この設計思想は重要である。LangGraph はプロンプトエンジニアリングやアーキテクチャの抽象化を提供するのではなく、エージェントワークフローの実行基盤としての役割に専念している。これにより、開発者はアプリケーションロジックに集中しながら、耐久性のある実行環境を享受できる。
1.3 なぜLangGraphが必要なのか
従来のLLMアプリケーション開発における課題を整理すると、LangGraphの必要性が明確になる。
課題1: ステートレスな実行 多くのLLMフレームワークはリクエスト単位のステートレス実行を前提としている。しかし、実世界のエージェントは複数のステップにまたがるコンテキストを維持する必要がある。
課題2: 障害耐性の欠如 長時間実行されるエージェントワークフローでは、ネットワーク障害、API制限、予期せぬエラーが頻発する。中断地点から正確に再開する能力が不可欠である。
課題3: 人間の監督の困難さ 完全自律型のAIエージェントはリスクが高く、適切なタイミングで人間が介入し、状態を確認・修正できる仕組みが求められる。
課題4: 複雑なワークフローの表現力不足 線形的なチェーンやシンプルなDAGでは、条件分岐、ループ、並行実行、サブワークフローといった複雑なパターンを表現しきれない。
LangGraph はこれらすべての課題に対して、グラフベースのプログラミングモデルという統一的な解決策を提供する。
1.4 主要な特徴
LangGraph が提供する中核的な能力は以下の4つに集約される。
-
耐久性のある実行 (Durable Execution): エージェントは障害を通じて永続化され、中断地点から正確に再開できる。長時間実行されるワークフローに対応し、自動的に復旧する。
-
Human-in-the-Loop: 開発者は任意の時点でエージェントの状態を検査し、修正できる。承認ワークフロー、エラー修正、戦略変更などに対応する。
-
包括的なメモリ: 進行中の推論のための短期ワーキングメモリと、セッション間で持続する長期永続メモリの両方をサポートする。
-
本番デプロイメント対応: スケーラブルで長時間実行可能なステートフルワークフローのためのインフラストラクチャを提供する。
1.5 採用企業
LangGraph は Klarna、Uber、Replit、Elastic、J.P.Morgan など、グローバル規模の企業に採用されている。これらの企業での利用実績は、フレームワークの成熟度と本番環境での信頼性を示している。
2. コアコンセプト
LangGraph のアーキテクチャを理解するためには、いくつかの基本概念を把握する必要がある。本章では、StateGraph、ノード、エッジ、条件付きエッジの各概念を詳細に解説する。
2.1 グラフの基本構造
LangGraph では、アプリケーションのワークフローを有向グラフとしてモデル化する。グラフは以下の3つの要素で構成される:
- ノード (Nodes): 実際の処理を行う関数
- エッジ (Edges): ノード間の遷移を定義する接続
- 状態 (State): グラフ全体で共有されるデータ構造
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator
# 状態の定義
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
current_step: str
iteration_count: int
# グラフの作成
graph = StateGraph(AgentState)
2.2 StateGraph
StateGraph は LangGraph の中心的なクラスであり、状態付きグラフを定義するための基盤を提供する。StateGraph のコンストラクタには状態のスキーマを渡す。これにより、グラフ内のすべてのノードが共有する状態の型が決定される。
from langgraph.graph import StateGraph
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
class ChatState(TypedDict):
"""チャットボットの状態定義"""
messages: Annotated[list, add_messages]
user_name: str
language: str
# StateGraphの初期化
workflow = StateGraph(ChatState)
状態の定義には Python の TypedDict を使用する。各フィールドには Annotated 型を使って リデューサー (reducer) を指定できる。リデューサーは、ノードからの出力を既存の状態にどのようにマージするかを定義する関数である。
2.2.1 リデューサーの概念
リデューサーは、複数のノードが同じ状態フィールドを更新する際の競合を解決するメカニズムである。
from typing import Annotated
import operator
class CounterState(TypedDict):
# operator.add: 新しい値を既存のリストに追加する
messages: Annotated[list, operator.add]
# リデューサーなし: 最後の値で上書きする
current_status: str
# カスタムリデューサー: 独自のマージロジック
score: Annotated[float, lambda old, new: max(old, new)]
add_messages はLangGraphが提供する特別なリデューサーで、メッセージリストのインテリジェントなマージを行う。同じIDのメッセージは更新され、新しいメッセージは追加される。
from langgraph.graph.message import add_messages
class ConversationState(TypedDict):
messages: Annotated[list, add_messages]
2.3 ノード (Nodes)
ノードは、グラフ内で実際の計算処理を行う Python 関数である。各ノードは現在の状態を入力として受け取り、状態の更新を出力として返す。
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o")
# ノード関数の定義
def chatbot_node(state: ChatState) -> dict:
"""チャットボットノード: ユーザーメッセージに応答する"""
response = model.invoke(state["messages"])
return {"messages": [response]}
def analyzer_node(state: ChatState) -> dict:
"""分析ノード: 会話の感情分析を行う"""
last_message = state["messages"][-1]
analysis = model.invoke([
{"role": "system", "content": "感情分析を行ってください。"},
{"role": "user", "content": last_message.content}
])
return {"messages": [analysis]}
# グラフにノードを追加
workflow.add_node("chatbot", chatbot_node)
workflow.add_node("analyzer", analyzer_node)
ノード関数の設計原則:
- 状態辞書の一部を受け取り、更新部分のみを返す
- 副作用 (API呼び出し、データベース操作など) を含められる
- 同期・非同期の両方をサポートする
# 非同期ノードの例
async def async_chatbot_node(state: ChatState) -> dict:
"""非同期チャットボットノード"""
response = await model.ainvoke(state["messages"])
return {"messages": [response]}
2.4 エッジ (Edges)
エッジは、ノード間の遷移を定義する。LangGraph は3種類のエッジをサポートする。
2.4.1 通常のエッジ
あるノードの実行完了後、常に特定のノードに遷移する。
# 通常のエッジ: startからchatbotへ
workflow.add_edge(START, "chatbot")
# chatbotからanalyzerへ
workflow.add_edge("chatbot", "analyzer")
# analyzerから終了へ
workflow.add_edge("analyzer", END)
2.4.2 条件付きエッジ (Conditional Edges)
条件付きエッジは、現在の状態に基づいて次のノードを動的に決定する。これにより、グラフ内での分岐ロジックを実現する。
def should_continue(state: ChatState) -> str:
"""次のノードを決定するルーティング関数"""
last_message = state["messages"][-1]
# ツール呼び出しがある場合
if last_message.tool_calls:
return "tools"
# 終了条件
return END
# 条件付きエッジの追加
workflow.add_conditional_edges(
"chatbot", # 起点ノード
should_continue, # ルーティング関数
{
"tools": "tool_node", # "tools" -> tool_nodeノードへ
END: END # END -> 終了
}
)
条件付きエッジのルーティング関数は、現在の状態を受け取り、次に遷移するノードの名前を文字列として返す。マッピング辞書を省略すると、返された文字列がそのままノード名として使用される。
2.4.3 エントリーポイントとフィニッシュポイント
START と END は特別な定数で、グラフの開始点と終了点を表す。
from langgraph.graph import START, END
# エントリーポイントの設定
workflow.add_edge(START, "first_node")
# フィニッシュポイントの設定
workflow.add_edge("last_node", END)
2.5 グラフのコンパイルと実行
定義したグラフは compile() メソッドでコンパイルし、実行可能なオブジェクトに変換する。
# グラフのコンパイル
app = workflow.compile()
# 同期実行
result = app.invoke({
"messages": [{"role": "user", "content": "こんにちは!"}],
"user_name": "田中",
"language": "ja"
})
# 非同期実行
result = await app.ainvoke({
"messages": [{"role": "user", "content": "Hello!"}],
"user_name": "Tanaka",
"language": "en"
})
2.6 グラフの可視化
LangGraph はグラフ構造を可視化する機能を提供する。これはデバッグやドキュメント作成に有用である。
# Mermaid記法での可視化
print(app.get_graph().draw_mermaid())
# PNG画像として保存 (graphvizが必要)
from IPython.display import Image
Image(app.get_graph().draw_mermaid_png())
2.7 完全な基本グラフの例
以下に、上述のコンセプトを統合した完全な例を示す。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated, Literal
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
# 状態定義
class State(TypedDict):
messages: Annotated[list, add_messages]
step_count: int
# モデル初期化
model = ChatOpenAI(model="gpt-4o")
# ノード定義
def greeting_node(state: State) -> dict:
return {
"messages": [{"role": "assistant", "content": "いらっしゃいませ!"}],
"step_count": 1
}
def process_node(state: State) -> dict:
response = model.invoke(state["messages"])
return {
"messages": [response],
"step_count": state["step_count"] + 1
}
def farewell_node(state: State) -> dict:
return {
"messages": [{"role": "assistant", "content": "ありがとうございました!"}],
"step_count": state["step_count"] + 1
}
# ルーティング関数
def route_decision(state: State) -> Literal["process", "farewell"]:
if state["step_count"] < 3:
return "process"
return "farewell"
# グラフ構築
workflow = StateGraph(State)
workflow.add_node("greeting", greeting_node)
workflow.add_node("process", process_node)
workflow.add_node("farewell", farewell_node)
workflow.add_edge(START, "greeting")
workflow.add_conditional_edges("greeting", route_decision)
workflow.add_conditional_edges("process", route_decision)
workflow.add_edge("farewell", END)
# コンパイルと実行
app = workflow.compile()
result = app.invoke({
"messages": [{"role": "user", "content": "商品を探しています"}],
"step_count": 0
})
3. 状態管理とチェックポインティング
3.1 状態管理の基本概念
LangGraph における状態管理は、フレームワークの最も重要な機能の一つである。すべてのノード実行はステップ単位でチェックポイントされ、障害からの復旧や状態の巻き戻しが可能になる。
状態はグラフのすべてのノードで共有されるデータ構造であり、各ノードは状態を読み取り、更新を返す。この更新は、定義されたリデューサーに基づいて既存の状態にマージされる。
3.2 チェックポインターの仕組み
チェックポインター (Checkpointer) は、グラフ実行の各ステップにおける状態のスナップショットを保存する仕組みである。これにより以下が実現される:
- 障害からの自動復旧
- 状態の巻き戻し (タイムトラベル)
- Human-in-the-loop の中断・再開
- マルチターン会話の状態保持
from langgraph.checkpoint.memory import MemorySaver
# メモリベースのチェックポインター
memory = MemorySaver()
# チェックポインター付きでコンパイル
app = workflow.compile(checkpointer=memory)
# スレッドIDを指定して実行
config = {"configurable": {"thread_id": "user-123"}}
# 最初のターン
result1 = app.invoke(
{"messages": [{"role": "user", "content": "Pythonについて教えて"}]},
config=config
)
# 同じスレッドで2番目のターン (前の状態が保持される)
result2 = app.invoke(
{"messages": [{"role": "user", "content": "もっと詳しく"}]},
config=config
)
3.3 チェックポインターの種類
LangGraph はいくつかの組み込みチェックポインターを提供する:
メモリベース (開発用)
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
SQLiteベース (軽量永続化)
from langgraph.checkpoint.sqlite import SqliteSaver
# ファイルベースの永続化
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
# 非同期版
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
async_checkpointer = AsyncSqliteSaver.from_conn_string("checkpoints.db")
PostgreSQLベース (本番用)
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:password@localhost:5432/langgraph"
)
3.4 スレッドとチェックポイント
LangGraph では「スレッド」という概念で個別の実行コンテキストを管理する。各スレッドは独立したチェックポイント履歴を持ち、マルチユーザー環境での並行処理をサポートする。
# スレッド1: ユーザーAの会話
config_a = {"configurable": {"thread_id": "user-A-conversation"}}
result_a = app.invoke(
{"messages": [{"role": "user", "content": "天気を教えて"}]},
config=config_a
)
# スレッド2: ユーザーBの会話 (独立した状態)
config_b = {"configurable": {"thread_id": "user-B-conversation"}}
result_b = app.invoke(
{"messages": [{"role": "user", "content": "ニュースを教えて"}]},
config=config_b
)
3.5 状態のタイムトラベル
チェックポイントを利用することで、過去の状態に戻って再実行することが可能である。これは「タイムトラベル」と呼ばれ、デバッグやエラー修正に非常に有用である。
# チェックポイント履歴の取得
checkpoints = list(app.get_state_history(config))
# 特定のチェックポイントの状態を取得
for cp in checkpoints:
print(f"Checkpoint ID: {cp.config['configurable']['checkpoint_id']}")
print(f"Step: {cp.metadata.get('step', 'unknown')}")
print(f"State: {cp.values}")
print("---")
# 過去の状態から再実行
old_config = checkpoints[2].config # 3つ前の状態
result = app.invoke(
{"messages": [{"role": "user", "content": "別の方法で試して"}]},
config=old_config
)
3.6 状態スキーマの設計パターン
効果的な状態スキーマの設計は、LangGraphアプリケーションの品質に大きく影響する。
from typing import TypedDict, Annotated, Optional
from langgraph.graph.message import add_messages
import operator
class AdvancedState(TypedDict):
# メッセージ履歴 (add_messagesリデューサーで管理)
messages: Annotated[list, add_messages]
# ワーキングメモリ (最後の値で上書き)
current_task: str
current_plan: Optional[list[str]]
# 累積データ (addリデューサーで追加)
collected_data: Annotated[list[dict], operator.add]
# カウンター
retry_count: int
# フラグ
needs_human_review: bool
is_complete: bool
入力・出力スキーマの分離
LangGraph では、グラフの入力スキーマと出力スキーマを分離することで、インターフェースを明確にできる。
class InputState(TypedDict):
user_query: str
context: Optional[dict]
class OutputState(TypedDict):
response: str
sources: list[str]
confidence: float
class InternalState(InputState, OutputState):
messages: Annotated[list, add_messages]
intermediate_results: list[dict]
step: str
# 入力・出力スキーマを指定してグラフを作成
workflow = StateGraph(
InternalState,
input=InputState,
output=OutputState
)
4. 組み込みパターン: ReActエージェント、ツール呼び出し
4.1 ReActパターンの概要
ReAct (Reasoning + Acting) は、LLMが「推論」と「行動」を交互に繰り返すことで、複雑なタスクを段階的に解決するパターンである。LangGraph はこのパターンをファーストクラスでサポートしており、最も一般的なエージェントアーキテクチャとして広く利用されている。
ReActパターンの基本的な流れは以下の通りである:
- LLMがユーザーの入力を分析し、次のアクションを決定する
- ツールの呼び出しが必要な場合、適切なツールを選択して実行する
- ツールの実行結果をLLMにフィードバックする
- LLMが結果を評価し、追加のアクションが必要か判断する
- 十分な情報が得られたら、最終的な回答を生成する
4.2 ツールの定義
LangGraph でツールを定義する方法は複数ある。LangChain の @tool デコレータを使用する方法が最も一般的である。
from langchain_core.tools import tool
from typing import Optional
@tool
def search_web(query: str) -> str:
"""Webで情報を検索する。
Args:
query: 検索クエリ文字列
"""
# 実際の検索API呼び出し
results = perform_web_search(query)
return f"検索結果: {results}"
@tool
def get_weather(city: str, date: Optional[str] = None) -> str:
"""指定した都市の天気情報を取得する。
Args:
city: 都市名
date: 日付 (YYYY-MM-DD形式、省略時は今日)
"""
weather = fetch_weather_api(city, date)
return f"{city}の天気: {weather}"
@tool
def calculate(expression: str) -> str:
"""数学的な計算を実行する。
Args:
expression: 計算式 (例: "2 + 3 * 4")
"""
try:
result = eval(expression) # 本番環境ではsafer alternativeを使用
return f"計算結果: {result}"
except Exception as e:
return f"計算エラー: {str(e)}"
tools = [search_web, get_weather, calculate]
4.3 ツールノードの構築
LangGraph の ToolNode は、LLMのツール呼び出しを自動的に処理するプリビルトのノードである。
from langgraph.prebuilt import ToolNode
# ツールノードの作成
tool_node = ToolNode(tools)
4.4 ReActエージェントの完全な実装
以下に、ReActパターンを使用したエージェントの完全な実装を示す。
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from typing import TypedDict, Annotated, Literal
# 状態定義
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
# モデルにツールをバインド
model = ChatOpenAI(model="gpt-4o")
model_with_tools = model.bind_tools(tools)
# エージェントノード
def agent_node(state: AgentState) -> dict:
"""LLMを呼び出してツール使用を判断するノード"""
response = model_with_tools.invoke(state["messages"])
return {"messages": [response]}
# ルーティング関数
def should_continue(state: AgentState) -> Literal["tools", "__end__"]:
"""ツール呼び出しが必要かどうかを判断する"""
last_message = state["messages"][-1]
if last_message.tool_calls:
return "tools"
return END
# ツールノード
tool_node = ToolNode(tools)
# グラフ構築
workflow = StateGraph(AgentState)
# ノード追加
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
# エッジ追加
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
"agent",
should_continue,
{"tools": "tools", END: END}
)
workflow.add_edge("tools", "agent") # ツール実行後、エージェントに戻る
# コンパイル
app = workflow.compile()
# 実行
result = app.invoke({
"messages": [{"role": "user", "content": "東京の明日の天気を教えて"}]
})
for msg in result["messages"]:
print(f"{msg.type}: {msg.content}")
4.5 create_react_agentヘルパー
LangGraph は create_react_agent というヘルパー関数を提供しており、ReActエージェントを数行で作成できる。
from langgraph.prebuilt import create_react_agent
# 最もシンプルなReActエージェント
agent = create_react_agent(
model=ChatOpenAI(model="gpt-4o"),
tools=tools
)
# システムプロンプト付き
agent = create_react_agent(
model=ChatOpenAI(model="gpt-4o"),
tools=tools,
prompt="あなたは親切なアシスタントです。日本語で回答してください。"
)
# チェックポインター付き
from langgraph.checkpoint.memory import MemorySaver
agent = create_react_agent(
model=ChatOpenAI(model="gpt-4o"),
tools=tools,
checkpointer=MemorySaver()
)
# 実行
result = agent.invoke({
"messages": [{"role": "user", "content": "3 + 5 の計算をして"}]
})
4.6 構造化出力を伴うツール呼び出し
ツールの出力を構造化データとして処理するパターンも重要である。
from pydantic import BaseModel, Field
from langchain_core.tools import tool
class WeatherResult(BaseModel):
"""天気情報の構造化データ"""
city: str = Field(description="都市名")
temperature: float = Field(description="気温(摂氏)")
condition: str = Field(description="天気状態")
humidity: int = Field(description="湿度(%)")
@tool
def get_structured_weather(city: str) -> WeatherResult:
"""構造化された天気情報を取得する"""
# API呼び出しの模擬
return WeatherResult(
city=city,
temperature=22.5,
condition="晴れ",
humidity=65
)
4.7 並行ツール実行
複数のツール呼び出しが同時に発生した場合、ToolNode はそれらを並行して実行する。
# LLMが複数のツールを同時に呼び出すシナリオ
# 例: "東京と大阪の天気を比較して"
# -> get_weather("東京") と get_weather("大阪") が並行実行される
# カスタム並行実行ノード
import asyncio
async def parallel_tool_node(state: AgentState) -> dict:
"""複数のツール呼び出しを並行実行するノード"""
last_message = state["messages"][-1]
tool_calls = last_message.tool_calls
# 非同期で並行実行
tasks = []
for tc in tool_calls:
tool_fn = tool_map[tc["name"]]
tasks.append(tool_fn.ainvoke(tc["args"]))
results = await asyncio.gather(*tasks)
# 結果をメッセージとして返す
tool_messages = []
for tc, result in zip(tool_calls, results):
tool_messages.append({
"role": "tool",
"content": str(result),
"tool_call_id": tc["id"]
})
return {"messages": tool_messages}
4.8 エラーハンドリングとリトライ
ツール実行時のエラーハンドリングは、堅牢なエージェントにとって不可欠である。
from langgraph.prebuilt import ToolNode
from langchain_core.messages import ToolMessage
class RobustToolNode(ToolNode):
"""エラーハンドリング付きのツールノード"""
def _run_one(self, call, config):
try:
result = super()._run_one(call, config)
return result
except Exception as e:
return ToolMessage(
content=f"ツール実行エラー: {str(e)}。別の方法を試してください。",
tool_call_id=call["id"]
)
# リトライロジック付きエージェント
class RetryState(TypedDict):
messages: Annotated[list, add_messages]
retry_count: int
def agent_with_retry(state: RetryState) -> dict:
"""リトライ機能付きエージェントノード"""
max_retries = 3
if state["retry_count"] >= max_retries:
return {
"messages": [{"role": "assistant",
"content": "申し訳ありません。処理を完了できませんでした。"}],
"retry_count": state["retry_count"]
}
response = model_with_tools.invoke(state["messages"])
return {
"messages": [response],
"retry_count": state["retry_count"] + 1
}
5. Human-in-the-Loop ワークフロー
5.1 Human-in-the-Loopの必要性
AI エージェントが完全に自律的に動作することは、多くのビジネスシナリオにおいてリスクが高い。高額な取引の承認、機密データへのアクセス、取り消し不能な操作の実行など、人間の判断が必要な場面は数多く存在する。LangGraph のHuman-in-the-Loop (HITL) 機能は、エージェントのワークフローに自然な形で人間の介入ポイントを組み込むことを可能にする。
5.2 interrupt を使用した中断
LangGraph の interrupt 関数は、グラフの実行を任意の時点で一時停止し、人間の入力を待つことを可能にする。
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
class HumanReviewState(TypedDict):
messages: Annotated[list, add_messages]
pending_action: dict
human_approved: bool
def propose_action(state: HumanReviewState) -> dict:
"""アクションを提案するノード"""
# LLMがアクションを提案
proposed = {
"action": "send_email",
"to": "client@example.com",
"subject": "契約書の送付",
"body": "添付の契約書をご確認ください。"
}
return {"pending_action": proposed}
def human_review(state: HumanReviewState) -> dict:
"""人間のレビューを求めるノード"""
action = state["pending_action"]
# ここで実行が中断され、人間の入力を待つ
decision = interrupt({
"question": "以下のアクションを承認しますか?",
"proposed_action": action,
"options": ["approve", "reject", "modify"]
})
if decision == "approve":
return {"human_approved": True}
else:
return {"human_approved": False}
def execute_action(state: HumanReviewState) -> dict:
"""承認されたアクションを実行するノード"""
if state["human_approved"]:
# アクションを実行
action = state["pending_action"]
result = execute_email(action)
return {"messages": [{"role": "assistant",
"content": f"アクション完了: {result}"}]}
else:
return {"messages": [{"role": "assistant",
"content": "アクションはキャンセルされました。"}]}
# グラフ構築
workflow = StateGraph(HumanReviewState)
workflow.add_node("propose", propose_action)
workflow.add_node("review", human_review)
workflow.add_node("execute", execute_action)
workflow.add_edge(START, "propose")
workflow.add_edge("propose", "review")
workflow.add_edge("review", "execute")
workflow.add_edge("execute", END)
# チェックポインター付きでコンパイル (HITL に必須)
app = workflow.compile(checkpointer=MemorySaver())
# 実行 (review ノードで中断する)
config = {"configurable": {"thread_id": "review-1"}}
result = app.invoke(
{"messages": [{"role": "user", "content": "クライアントにメールを送って"}],
"pending_action": {},
"human_approved": False},
config=config
)
# 人間が承認して再開
result = app.invoke(
Command(resume="approve"),
config=config
)
5.3 ブレークポイントによる中断
ノード実行前後にブレークポイントを設定して、自動的に中断することも可能である。
# ブレークポイント付きコンパイル
app = workflow.compile(
checkpointer=MemorySaver(),
interrupt_before=["execute"], # executeノードの前で中断
interrupt_after=["propose"] # proposeノードの後で中断
)
# 実行 (指定されたポイントで自動的に中断)
config = {"configurable": {"thread_id": "bp-1"}}
result = app.invoke(initial_state, config=config)
# 現在の状態を確認
current_state = app.get_state(config)
print(f"次のノード: {current_state.next}")
print(f"現在の状態: {current_state.values}")
# 状態を確認後、実行を再開
result = app.invoke(None, config=config)
5.4 状態の動的編集
Human-in-the-Loopでは、中断中に状態を直接編集することも可能である。
# 状態の取得
current_state = app.get_state(config)
# 状態の更新 (人間が修正)
app.update_state(
config,
{
"pending_action": {
"action": "send_email",
"to": "modified@example.com", # メールアドレスを修正
"subject": "修正済み: 契約書の送付",
"body": "修正された内容です。"
}
}
)
# 修正された状態で実行を再開
result = app.invoke(None, config=config)
5.5 承認ワークフローのパターン
以下に、実用的な承認ワークフローの設計パターンを示す。
from enum import Enum
class ApprovalLevel(Enum):
AUTO = "auto" # 自動承認
SINGLE = "single" # 単一承認者
MULTI = "multi" # 複数承認者
ESCALATION = "escalation" # エスカレーション
class ApprovalState(TypedDict):
messages: Annotated[list, add_messages]
action: dict
risk_level: str
approvals: Annotated[list[dict], operator.add]
approval_count: int
required_approvals: int
def assess_risk(state: ApprovalState) -> dict:
"""アクションのリスクレベルを評価する"""
action = state["action"]
if action.get("amount", 0) > 100000:
return {"risk_level": "high", "required_approvals": 3}
elif action.get("amount", 0) > 10000:
return {"risk_level": "medium", "required_approvals": 2}
else:
return {"risk_level": "low", "required_approvals": 1}
def request_approval(state: ApprovalState) -> dict:
"""承認を要求する"""
remaining = state["required_approvals"] - state["approval_count"]
decision = interrupt({
"message": f"承認が必要です (残り{remaining}名)",
"action": state["action"],
"risk_level": state["risk_level"],
"current_approvals": state["approvals"]
})
return {
"approvals": [decision],
"approval_count": state["approval_count"] + 1
}
def check_approval_status(state: ApprovalState) -> str:
"""承認状況を確認する"""
if state["approval_count"] >= state["required_approvals"]:
if all(a["decision"] == "approve" for a in state["approvals"]):
return "execute"
return "reject"
return "request_approval"
6. サブグラフとマルチエージェントアーキテクチャ
6.1 サブグラフの概念
LangGraph では、グラフの中に別のグラフをネストする「サブグラフ」をサポートしている。これにより、大規模なワークフローをモジュール化し、再利用可能なコンポーネントとして管理できる。
# サブグラフ: リサーチャーエージェント
class ResearchState(TypedDict):
query: str
results: Annotated[list[str], operator.add]
summary: str
def search_node(state: ResearchState) -> dict:
results = perform_search(state["query"])
return {"results": results}
def summarize_node(state: ResearchState) -> dict:
summary = summarize_results(state["results"])
return {"summary": summary}
research_workflow = StateGraph(ResearchState)
research_workflow.add_node("search", search_node)
research_workflow.add_node("summarize", summarize_node)
research_workflow.add_edge(START, "search")
research_workflow.add_edge("search", "summarize")
research_workflow.add_edge("summarize", END)
research_graph = research_workflow.compile()
# メイングラフにサブグラフを組み込む
class MainState(TypedDict):
messages: Annotated[list, add_messages]
research_results: str
final_report: str
def call_researcher(state: MainState) -> dict:
"""リサーチャーサブグラフを呼び出す"""
last_message = state["messages"][-1].content
result = research_graph.invoke({"query": last_message, "results": [], "summary": ""})
return {"research_results": result["summary"]}
main_workflow = StateGraph(MainState)
main_workflow.add_node("researcher", call_researcher)
main_workflow.add_node("report_writer", write_report_node)
main_workflow.add_edge(START, "researcher")
main_workflow.add_edge("researcher", "report_writer")
main_workflow.add_edge("report_writer", END)
6.2 マルチエージェントパターン
LangGraph が提供するマルチエージェントアーキテクチャのパターンは主に3種類ある。
6.2.1 スーパーバイザーパターン
スーパーバイザーエージェントが、タスクを適切なワーカーエージェントに振り分ける。
from langchain_openai import ChatOpenAI
class SupervisorState(TypedDict):
messages: Annotated[list, add_messages]
next_agent: str
task_complete: bool
# スーパーバイザーノード
def supervisor_node(state: SupervisorState) -> dict:
"""タスクを適切なエージェントに振り分ける"""
model = ChatOpenAI(model="gpt-4o")
response = model.invoke([
{"role": "system", "content": """あなたはタスク管理者です。
利用可能なエージェント:
- researcher: 情報検索と分析
- coder: コード生成と修正
- writer: 文書作成とレビュー
タスクに最適なエージェントを選択してください。
完了した場合は 'FINISH' と回答してください。"""},
*state["messages"]
])
next_agent = response.content.strip()
return {
"next_agent": next_agent,
"task_complete": next_agent == "FINISH"
}
# ワーカーエージェント
def researcher_node(state: SupervisorState) -> dict:
model = ChatOpenAI(model="gpt-4o")
response = model.invoke([
{"role": "system", "content": "あなたはリサーチ専門エージェントです。"},
*state["messages"]
])
return {"messages": [response]}
def coder_node(state: SupervisorState) -> dict:
model = ChatOpenAI(model="gpt-4o")
response = model.invoke([
{"role": "system", "content": "あなたはコーディング専門エージェントです。"},
*state["messages"]
])
return {"messages": [response]}
def writer_node(state: SupervisorState) -> dict:
model = ChatOpenAI(model="gpt-4o")
response = model.invoke([
{"role": "system", "content": "あなたは文書作成専門エージェントです。"},
*state["messages"]
])
return {"messages": [response]}
# ルーティング
def route_to_agent(state: SupervisorState) -> str:
if state["task_complete"]:
return END
return state["next_agent"]
# グラフ構築
workflow = StateGraph(SupervisorState)
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("researcher", researcher_node)
workflow.add_node("coder", coder_node)
workflow.add_node("writer", writer_node)
workflow.add_edge(START, "supervisor")
workflow.add_conditional_edges("supervisor", route_to_agent, {
"researcher": "researcher",
"coder": "coder",
"writer": "writer",
END: END
})
# 各ワーカーからスーパーバイザーに戻る
workflow.add_edge("researcher", "supervisor")
workflow.add_edge("coder", "supervisor")
workflow.add_edge("writer", "supervisor")
app = workflow.compile()
6.2.2 スウォーム (Swarm) パターン
各エージェントが自律的に次のエージェントを決定し、処理をハンドオフする分散型パターン。
from langgraph.types import Command
class SwarmState(TypedDict):
messages: Annotated[list, add_messages]
current_agent: str
def agent_a(state: SwarmState) -> Command:
"""エージェントA: 初期分析を行い、次のエージェントに引き継ぐ"""
model = ChatOpenAI(model="gpt-4o")
response = model.invoke([
{"role": "system", "content": "分析を行い、技術的な質問はagent_bに、"
"ビジネスの質問はagent_cに引き継いでください。"},
*state["messages"]
])
# 動的にハンドオフ先を決定
if "技術" in response.content or "コード" in response.content:
goto = "agent_b"
elif "ビジネス" in response.content or "戦略" in response.content:
goto = "agent_c"
else:
goto = END
return Command(
update={"messages": [response], "current_agent": goto},
goto=goto
)
6.2.3 階層型マルチエージェント
複数のスーパーバイザーが階層構造を形成し、大規模なタスクを分割統治するパターン。
# レベル1: プロジェクトマネージャー
# レベル2: チームリーダー (各専門分野)
# レベル3: 個別ワーカーエージェント
# チームサブグラフ
def create_team_subgraph(team_name: str, workers: list[str]):
"""チームサブグラフを生成するファクトリ関数"""
class TeamState(TypedDict):
messages: Annotated[list, add_messages]
task: str
results: Annotated[list[str], operator.add]
def team_leader(state: TeamState) -> dict:
# チームリーダーがタスクを分解し、ワーカーに割り当て
model = ChatOpenAI(model="gpt-4o")
response = model.invoke([
{"role": "system",
"content": f"あなたは{team_name}チームのリーダーです。"
f"利用可能なワーカー: {workers}"},
*state["messages"]
])
return {"messages": [response]}
workflow = StateGraph(TeamState)
workflow.add_node("leader", team_leader)
# ワーカーノードの追加 (省略)
workflow.add_edge(START, "leader")
workflow.add_edge("leader", END)
return workflow.compile()
7. ストリーミングとリアルタイム処理
7.1 ストリーミングの概要
LangGraph は、エージェントの実行結果をリアルタイムでストリーミングする複数のモードを提供する。これにより、長時間実行されるエージェントワークフローにおいて、ユーザーに即座にフィードバックを提供できる。
7.2 ストリーミングモード
LangGraph は主に3つのストリーミングモードをサポートする。
7.2.1 values モード
グラフの各ステップ終了後に、完全な状態を出力する。
# values モード: 各ステップの完全な状態をストリーム
for state_snapshot in app.stream(
{"messages": [{"role": "user", "content": "分析してください"}]},
stream_mode="values"
):
print(f"Current state: {state_snapshot}")
if "messages" in state_snapshot:
latest_msg = state_snapshot["messages"][-1]
print(f"Latest message: {latest_msg.content}")
7.2.2 updates モード
各ノードが返した差分 (更新) のみを出力する。デフォルトのモードである。
# updates モード: 各ノードの更新のみをストリーム
for update in app.stream(
{"messages": [{"role": "user", "content": "調査して"}]},
stream_mode="updates"
):
for node_name, node_output in update.items():
print(f"Node '{node_name}' produced: {node_output}")
7.2.3 messages モード
LLM のトークン単位でのストリーミングを提供する。チャットアプリケーションでの利用に最適である。
# messages モード: トークン単位のストリーミング
for message_chunk, metadata in app.stream(
{"messages": [{"role": "user", "content": "詳しく説明して"}]},
stream_mode="messages"
):
if isinstance(message_chunk, AIMessageChunk):
print(message_chunk.content, end="", flush=True)
7.3 非同期ストリーミング
非同期環境でのストリーミングも完全にサポートされている。
async for event in app.astream(
{"messages": [{"role": "user", "content": "質問です"}]},
stream_mode="values"
):
print(event)
# 非同期イベントストリーミング
async for event in app.astream_events(
{"messages": [{"role": "user", "content": "質問です"}]},
version="v2"
):
kind = event["event"]
if kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
print(content, end="", flush=True)
elif kind == "on_tool_start":
print(f"\nツール開始: {event['name']}")
elif kind == "on_tool_end":
print(f"\nツール完了: {event['data'].get('output', '')}")
7.4 複数ストリーミングモードの組み合わせ
複数のストリーミングモードを同時に使用することも可能である。
# 複数モードの同時使用
for event in app.stream(
{"messages": [{"role": "user", "content": "分析してください"}]},
stream_mode=["values", "updates", "messages"]
):
mode = event[0] # ストリーミングモード
data = event[1] # データ
if mode == "messages":
chunk, metadata = data
print(f"[Token] {chunk.content}", end="")
elif mode == "updates":
print(f"\n[Update] {data}")
elif mode == "values":
print(f"\n[State] Step completed")
7.5 カスタムストリーミングイベント
ノード内からカスタムイベントを発行することも可能である。
from langchain_core.callbacks import dispatch_custom_event
def processing_node(state: AgentState) -> dict:
"""処理中にカスタムイベントを発行するノード"""
# 進捗イベントの発行
dispatch_custom_event(
"progress",
{"step": "data_loading", "progress": 0.25}
)
data = load_data()
dispatch_custom_event(
"progress",
{"step": "analysis", "progress": 0.50}
)
analysis = analyze(data)
dispatch_custom_event(
"progress",
{"step": "complete", "progress": 1.0}
)
return {"messages": [{"role": "assistant", "content": analysis}]}
7.6 Webアプリケーションとの統合
FastAPI や他のWebフレームワークとのストリーミング統合例:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
app_api = FastAPI()
@app_api.post("/chat/stream")
async def chat_stream(request: ChatRequest):
"""SSE (Server-Sent Events) でストリーミング応答を返す"""
async def event_generator():
async for event in agent.astream(
{"messages": [{"role": "user", "content": request.message}]},
stream_mode="messages",
config={"configurable": {"thread_id": request.thread_id}}
):
chunk, metadata = event
if chunk.content:
yield f"data: {json.dumps({'content': chunk.content})}\n\n"
yield f"data: {json.dumps({'done': True})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)
8. 永続化とメモリ
8.1 メモリアーキテクチャの概要
LangGraph のメモリシステムは、2つのレイヤーで構成される:
-
短期メモリ (Working Memory): 現在の会話セッション内で保持されるコンテキスト。チェックポインターを通じてスレッド単位で管理される。
-
長期メモリ (Persistent Memory): セッション間で持続する情報。ユーザーの嗜好、過去のインタラクション履歴、学習されたパターンなどを保持する。
┌─────────────────────────────────────────┐
│ LangGraph Memory │
├─────────────────┬───────────────────────┤
│ 短期メモリ │ 長期メモリ │
│ (Thread-scoped) │ (Cross-thread) │
├─────────────────┼───────────────────────┤
│ • 会話履歴 │ • ユーザープロファイル │
│ • 現在のタスク状態 │ • 嗜好設定 │
│ • 中間結果 │ • 過去の対話パターン │
│ • ツール実行結果 │ • 学習されたファクト │
└─────────────────┴───────────────────────┘
8.2 Store を使った長期メモリ
LangGraph の Store は、スレッドを超えて持続するデータを管理するためのキーバリューストアである。
from langgraph.store.memory import InMemoryStore
from langgraph.graph import StateGraph
# メモリストアの作成
store = InMemoryStore()
# グラフのコンパイル時にストアを指定
app = workflow.compile(
checkpointer=MemorySaver(),
store=store
)
# ノード内でストアにアクセス
def personalized_node(state: AgentState, *, store) -> dict:
"""ユーザーの嗜好をストアから取得して応答を生成する"""
user_id = state.get("user_id", "default")
# ユーザーの嗜好を取得
namespace = ("user_preferences", user_id)
preferences = store.search(namespace)
if preferences:
pref_data = preferences[0].value
system_prompt = f"ユーザーの嗜好: {pref_data}"
else:
system_prompt = "新しいユーザーです。"
# LLMで応答生成
response = model.invoke([
{"role": "system", "content": system_prompt},
*state["messages"]
])
# 新しい情報をストアに保存
store.put(
namespace,
"profile",
{"last_topic": state["messages"][-1].content,
"interaction_count": len(preferences) + 1}
)
return {"messages": [response]}
8.3 メモリの管理戦略
会話履歴のトリミング
長い会話でトークン制限に達しないよう、メッセージ履歴を適切にトリミングする。
from langchain_core.messages import trim_messages
def trim_conversation(state: AgentState) -> dict:
"""会話履歴をトリミングするノード"""
trimmed = trim_messages(
state["messages"],
max_tokens=4000,
strategy="last", # 最新のメッセージを保持
token_counter=model, # モデルのトークンカウンタを使用
include_system=True, # システムメッセージは保持
allow_partial=False # 部分的なメッセージを許可しない
)
return {"messages": trimmed}
サマリーベースのメモリ
長い会話を要約して保持する戦略:
class SummaryState(TypedDict):
messages: Annotated[list, add_messages]
summary: str
def summarize_conversation(state: SummaryState) -> dict:
"""会話を要約してメモリを圧縮する"""
if len(state["messages"]) > 10:
summary_prompt = f"""現在の要約: {state['summary']}
以下の新しいメッセージを含めて要約を更新してください:
{state['messages'][-5:]}"""
response = model.invoke([
{"role": "system", "content": "会話の要約を生成してください。"},
{"role": "user", "content": summary_prompt}
])
# 古いメッセージを削除し、要約を更新
return {
"summary": response.content,
"messages": state["messages"][-2:] # 最新2件のみ保持
}
return {}
8.4 永続化バックエンドの選択
| バックエンド | 用途 | 永続性 | スケーラビリティ |
|---|---|---|---|
MemorySaver | 開発・テスト | なし (インメモリ) | 単一プロセス |
SqliteSaver | 小規模本番 | ファイルベース | 単一サーバー |
PostgresSaver | 本番環境 | 完全永続化 | 水平スケール可能 |
| カスタムバックエンド | 特殊要件 | 実装依存 | 実装依存 |
# PostgreSQL本番設定
from langgraph.checkpoint.postgres import PostgresSaver
# 接続プール付き
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:password@db-host:5432/langgraph",
pipeline=True # パイプラインモードで高速化
)
# 非同期版
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
async_checkpointer = AsyncPostgresSaver.from_conn_string(
"postgresql://user:password@db-host:5432/langgraph"
)
9. LangGraph Platformによるデプロイメント
9.1 LangGraph Platform の概要
LangGraph Platform は、LangGraph アプリケーションを本番環境にデプロイするためのインフラストラクチャである。LangGraph Server、LangGraph CLI、LangGraph SDK、LangGraph Studio の4つのコンポーネントで構成される。
9.2 LangGraph Server
LangGraph Server は、LangGraph エージェントを REST API として公開するサーバーである。
# langgraph.json - サーバー設定ファイル
{
"dependencies": ["./requirements.txt"],
"graphs": {
"agent": "./agent.py:graph"
},
"env": {
"OPENAI_API_KEY": ""
}
}
# agent.py - エージェント定義
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
class State(TypedDict):
messages: Annotated[list, add_messages]
model = ChatOpenAI(model="gpt-4o")
def chatbot(state: State) -> dict:
response = model.invoke(state["messages"])
return {"messages": [response]}
workflow = StateGraph(State)
workflow.add_node("chatbot", chatbot)
workflow.add_edge(START, "chatbot")
workflow.add_edge("chatbot", END)
graph = workflow.compile()
デプロイメントコマンド
# LangGraph CLIのインストール
pip install langgraph-cli
# ローカル開発サーバーの起動
langgraph dev
# Dockerイメージのビルド
langgraph build -t my-agent:latest
# Docker Composeでの起動
docker compose up
9.3 LangGraph SDK
デプロイされたエージェントにプログラムからアクセスするための SDK。
from langgraph_sdk import get_client
# クライアントの初期化
client = get_client(url="http://localhost:8123")
# アシスタント (デプロイされたグラフ) の一覧
assistants = await client.assistants.search()
# スレッドの作成
thread = await client.threads.create()
# 実行
result = await client.runs.create(
thread["thread_id"],
assistant_id="agent",
input={"messages": [{"role": "user", "content": "こんにちは"}]}
)
# ストリーミング実行
async for event in client.runs.stream(
thread["thread_id"],
assistant_id="agent",
input={"messages": [{"role": "user", "content": "説明して"}]},
stream_mode="messages"
):
print(event)
9.4 LangGraph Studio
LangGraph Studio は、LangGraph アプリケーションの視覚的なデバッグとプロトタイピングツールである。グラフの構造をリアルタイムで可視化し、各ノードの状態を検査できる。
主な機能:
- グラフ構造のリアルタイム可視化
- ノード実行状態のステップバイステップ検査
- 状態の直接編集とリプレイ
- ブレークポイントの設定と管理
- ストリーミング出力のリアルタイム表示
9.5 LangSmith との統合
LangSmith はLangGraphアプリケーションの可観測性を提供する。
import os
# LangSmith トレーシングの有効化
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-langgraph-project"
# トレースが自動的にLangSmithに送信される
result = app.invoke({"messages": [{"role": "user", "content": "テスト"}]})
9.6 本番デプロイメントのベストプラクティス
# docker-compose.yml
version: '3.8'
services:
langgraph-server:
image: my-agent:latest
ports:
- "8123:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}
- DATABASE_URL=postgresql://user:pass@postgres:5432/langgraph
depends_on:
- postgres
- redis
deploy:
replicas: 3
resources:
limits:
memory: 2G
cpus: '1.0'
postgres:
image: postgres:16
environment:
POSTGRES_DB: langgraph
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
volumes:
- pgdata:/var/lib/postgresql/data
redis:
image: redis:7-alpine
volumes:
pgdata:
10. 実践的なコードサンプル
10.1 カスタマーサポートエージェント
以下は、実用的なカスタマーサポートエージェントの完全な実装例である。
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from typing import TypedDict, Annotated, Literal, Optional
import operator
# ===== 状態定義 =====
class CustomerState(TypedDict):
messages: Annotated[list, add_messages]
customer_id: str
customer_info: Optional[dict]
ticket_id: Optional[str]
escalation_needed: bool
sentiment: str
# ===== ツール定義 =====
@tool
def lookup_customer(customer_id: str) -> dict:
"""顧客情報を検索する"""
# データベース検索の模擬
customers = {
"C001": {"name": "田中太郎", "plan": "Premium", "since": "2023-01"},
"C002": {"name": "佐藤花子", "plan": "Basic", "since": "2024-06"},
}
return customers.get(customer_id, {"error": "顧客が見つかりません"})
@tool
def create_ticket(subject: str, priority: str, description: str) -> str:
"""サポートチケットを作成する"""
ticket_id = f"TKT-{hash(subject) % 10000:04d}"
return f"チケット {ticket_id} を作成しました (優先度: {priority})"
@tool
def check_order_status(order_id: str) -> str:
"""注文状況を確認する"""
return f"注文 {order_id}: 発送済み (配送予定: 明日)"
@tool
def process_refund(order_id: str, amount: float, reason: str) -> str:
"""返金処理を実行する (要承認)"""
return f"返金処理 (注文: {order_id}, 金額: ¥{amount:,.0f}) を申請しました"
tools = [lookup_customer, create_ticket, check_order_status, process_refund]
model = ChatOpenAI(model="gpt-4o").bind_tools(tools)
# ===== ノード定義 =====
def classify_intent(state: CustomerState) -> dict:
"""問い合わせの意図を分類する"""
classifier = ChatOpenAI(model="gpt-4o-mini")
response = classifier.invoke([
{"role": "system", "content": """問い合わせの感情を分析してください。
回答: positive, neutral, negative, angry のいずれか"""},
state["messages"][-1]
])
return {"sentiment": response.content.strip()}
def agent_respond(state: CustomerState) -> dict:
"""エージェントが応答する"""
system_msg = f"""あなたはカスタマーサポートエージェントです。
顧客ID: {state.get('customer_id', '不明')}
顧客情報: {state.get('customer_info', '未取得')}
感情: {state.get('sentiment', '不明')}
丁寧で的確な対応を心がけてください。"""
response = model.invoke([
{"role": "system", "content": system_msg},
*state["messages"]
])
return {"messages": [response]}
def check_escalation(state: CustomerState) -> dict:
"""エスカレーションが必要か判断する"""
if state["sentiment"] in ["angry", "negative"]:
return {"escalation_needed": True}
return {"escalation_needed": False}
def human_escalation(state: CustomerState) -> dict:
"""人間のオペレーターにエスカレーションする"""
decision = interrupt({
"type": "escalation",
"customer_id": state["customer_id"],
"sentiment": state["sentiment"],
"conversation": [m.content for m in state["messages"][-3:]],
"question": "このケースを引き継ぎますか?"
})
if decision.get("action") == "takeover":
return {"messages": [{"role": "assistant",
"content": "担当オペレーターに転送いたします。少々お待ちください。"}]}
else:
return {"messages": [{"role": "assistant",
"content": decision.get("response", "引き続き対応いたします。")}]}
# ===== ルーティング =====
def route_after_agent(state: CustomerState) -> Literal["tools", "check_escalation", "__end__"]:
last_msg = state["messages"][-1]
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
return "tools"
return "check_escalation"
def route_escalation(state: CustomerState) -> Literal["escalation", "__end__"]:
if state.get("escalation_needed"):
return "escalation"
return END
# ===== グラフ構築 =====
workflow = StateGraph(CustomerState)
workflow.add_node("classify", classify_intent)
workflow.add_node("agent", agent_respond)
workflow.add_node("tools", ToolNode(tools))
workflow.add_node("check_escalation", check_escalation)
workflow.add_node("escalation", human_escalation)
workflow.add_edge(START, "classify")
workflow.add_edge("classify", "agent")
workflow.add_conditional_edges("agent", route_after_agent)
workflow.add_edge("tools", "agent")
workflow.add_conditional_edges("check_escalation", route_escalation)
workflow.add_edge("escalation", END)
# コンパイル
support_agent = workflow.compile(checkpointer=MemorySaver())
10.2 RAG (Retrieval-Augmented Generation) エージェント
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from typing import TypedDict, Annotated, Literal
from langgraph.graph.message import add_messages
class RAGState(TypedDict):
messages: Annotated[list, add_messages]
query: str
documents: list[dict]
answer: str
needs_more_info: bool
# ベクトルストア (事前構築済みと仮定)
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.load_local("./knowledge_base", embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
model = ChatOpenAI(model="gpt-4o")
def extract_query(state: RAGState) -> dict:
"""ユーザーメッセージから検索クエリを抽出する"""
last_msg = state["messages"][-1].content
response = model.invoke([
{"role": "system", "content": "ユーザーの質問から、文書検索に最適なクエリを生成してください。クエリのみを返してください。"},
{"role": "user", "content": last_msg}
])
return {"query": response.content}
def retrieve_documents(state: RAGState) -> dict:
"""関連文書を検索する"""
docs = retriever.invoke(state["query"])
return {"documents": [{"content": d.page_content, "source": d.metadata.get("source", "")} for d in docs]}
def grade_documents(state: RAGState) -> dict:
"""検索された文書の関連性を評価する"""
grader = ChatOpenAI(model="gpt-4o-mini")
relevant_docs = []
for doc in state["documents"]:
response = grader.invoke([
{"role": "system", "content": "この文書がクエリに関連するかを yes/no で答えてください。"},
{"role": "user", "content": f"クエリ: {state['query']}\n文書: {doc['content']}"}
])
if "yes" in response.content.lower():
relevant_docs.append(doc)
return {
"documents": relevant_docs,
"needs_more_info": len(relevant_docs) < 2
}
def generate_answer(state: RAGState) -> dict:
"""検索結果に基づいて回答を生成する"""
context = "\n\n".join([d["content"] for d in state["documents"]])
response = model.invoke([
{"role": "system", "content": f"""以下のコンテキストに基づいて回答してください。
コンテキストに情報がない場合は「情報が見つかりませんでした」と答えてください。
コンテキスト:
{context}"""},
*state["messages"]
])
return {
"messages": [response],
"answer": response.content
}
def route_after_grading(state: RAGState) -> Literal["rewrite_query", "generate"]:
if state["needs_more_info"] and state.get("retry_count", 0) < 2:
return "rewrite_query"
return "generate"
def rewrite_query(state: RAGState) -> dict:
"""クエリを書き換えて再検索する"""
response = model.invoke([
{"role": "system", "content": "検索結果が不十分でした。より良い検索クエリに書き換えてください。"},
{"role": "user", "content": f"元のクエリ: {state['query']}"}
])
return {"query": response.content}
# グラフ構築
rag_workflow = StateGraph(RAGState)
rag_workflow.add_node("extract_query", extract_query)
rag_workflow.add_node("retrieve", retrieve_documents)
rag_workflow.add_node("grade", grade_documents)
rag_workflow.add_node("generate", generate_answer)
rag_workflow.add_node("rewrite_query", rewrite_query)
rag_workflow.add_edge(START, "extract_query")
rag_workflow.add_edge("extract_query", "retrieve")
rag_workflow.add_edge("retrieve", "grade")
rag_workflow.add_conditional_edges("grade", route_after_grading)
rag_workflow.add_edge("rewrite_query", "retrieve")
rag_workflow.add_edge("generate", END)
rag_agent = rag_workflow.compile()
10.3 コード生成・実行エージェント
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Literal
from langgraph.graph.message import add_messages
import subprocess
import tempfile
import os
class CodeAgentState(TypedDict):
messages: Annotated[list, add_messages]
code: str
execution_result: str
error: str
iteration: int
model = ChatOpenAI(model="gpt-4o")
def generate_code(state: CodeAgentState) -> dict:
"""ユーザーの要求に基づいてコードを生成する"""
error_context = ""
if state.get("error"):
error_context = f"\n\n前回のエラー:\n{state['error']}\n修正してください。"
response = model.invoke([
{"role": "system", "content": f"""Pythonコードを生成してください。
コードのみを返してください (```python ... ``` 形式)。{error_context}"""},
*state["messages"]
])
# コードブロックの抽出
content = response.content
if "```python" in content:
code = content.split("```python")[1].split("```")[0].strip()
elif "```" in content:
code = content.split("```")[1].split("```")[0].strip()
else:
code = content
return {"code": code, "iteration": state.get("iteration", 0) + 1}
def execute_code(state: CodeAgentState) -> dict:
"""生成されたコードを安全に実行する"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(state["code"])
temp_path = f.name
try:
result = subprocess.run(
["python", temp_path],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
return {
"execution_result": result.stdout,
"error": ""
}
else:
return {
"execution_result": "",
"error": result.stderr
}
except subprocess.TimeoutExpired:
return {"error": "実行タイムアウト (30秒)"}
finally:
os.unlink(temp_path)
def present_result(state: CodeAgentState) -> dict:
"""結果をユーザーに提示する"""
if state["error"]:
content = f"コードの実行中にエラーが発生しました:\n```\n{state['error']}\n```"
else:
content = f"実行結果:\n```\n{state['execution_result']}\n```\n\n生成したコード:\n```python\n{state['code']}\n```"
return {"messages": [{"role": "assistant", "content": content}]}
def should_retry(state: CodeAgentState) -> Literal["generate", "present"]:
if state["error"] and state["iteration"] < 3:
return "generate"
return "present"
# グラフ構築
code_workflow = StateGraph(CodeAgentState)
code_workflow.add_node("generate", generate_code)
code_workflow.add_node("execute", execute_code)
code_workflow.add_node("present", present_result)
code_workflow.add_edge(START, "generate")
code_workflow.add_edge("generate", "execute")
code_workflow.add_conditional_edges("execute", should_retry)
code_workflow.add_edge("present", END)
code_agent = code_workflow.compile()
11. 他のオーケストレーションフレームワークとの比較
11.1 比較対象フレームワーク
AIエージェントオーケストレーション分野には、LangGraph以外にもいくつかの主要なフレームワークが存在する。それぞれの特徴を理解することで、LangGraphの位置付けと強みが明確になる。
11.2 比較表
| 特徴 | LangGraph | AutoGen | CrewAI | Semantic Kernel | Haystack |
|---|---|---|---|---|---|
| 開発元 | LangChain | Microsoft | CrewAI Inc. | Microsoft | deepset |
| 主要言語 | Python, JS | Python, .NET | Python | Python, C#, Java | Python |
| グラフベース | あり (中核) | なし | なし | 部分的 | あり (パイプライン) |
| ステートフル | ファーストクラス | 限定的 | 限定的 | あり | 限定的 |
| チェックポイント | 組み込み | なし | なし | なし | なし |
| Human-in-the-Loop | ネイティブ | 基本的 | 基本的 | 基本的 | なし |
| ストリーミング | 多モード | 基本的 | なし | あり | 基本的 |
| マルチエージェント | 柔軟 | 中核機能 | 中核機能 | 限定的 | なし |
| 本番デプロイ | Platform提供 | なし | なし | Azure統合 | なし |
| ライセンス | MIT | MIT (v0.4+) | MIT | MIT | Apache 2.0 |
11.3 詳細比較
LangGraph vs AutoGen
AutoGen はMicrosoft が開発したマルチエージェント会話フレームワークである。エージェント間の会話パターンに焦点を当てており、グループチャット形式のマルチエージェント協調が得意である。
LangGraphの優位点:
- グラフベースの明示的なワークフロー制御
- チェックポイントによる耐障害性
- 本番デプロイメントのサポート
- より細粒度の状態管理
AutoGenの優位点:
- マルチエージェント会話の設定が簡単
- コード実行環境の統合が充実
- .NETサポート
# LangGraph: 明示的なグラフ定義
workflow = StateGraph(State)
workflow.add_node("agent1", agent1_fn)
workflow.add_node("agent2", agent2_fn)
workflow.add_conditional_edges("agent1", router)
# AutoGen: 会話パターンベース
# agent1 = AssistantAgent("agent1", ...)
# agent2 = AssistantAgent("agent2", ...)
# group_chat = GroupChat(agents=[agent1, agent2], ...)
LangGraph vs CrewAI
CrewAI は、役割ベースのマルチエージェントシステムを簡単に構築するためのフレームワークである。「クルー」というメタファーで、各エージェントに役割、目標、バックストーリーを与える。
LangGraphの優位点:
- 低レベルの柔軟性
- 複雑なワークフローの表現力
- チェックポイントとHITL
- 本番環境への対応
CrewAIの優位点:
- 高レベルの抽象化で迅速なプロトタイピング
- 直感的なAPI
- 学習曲線が緩やか
LangGraph vs 従来のDAGフレームワーク (Airflow, Prefect)
従来のワークフローエンジンとの違いも理解が重要である。
従来のDAG: A → B → C → D (一方向、非循環)
LangGraph: A → B → C → B → C → D (循環可能、動的分岐)
↑ ↓
└── 条件判定 ←┘
LangGraphは循環グラフをサポートするため、エージェントが「考え直す」「ツールを再実行する」といった反復的なパターンを自然に表現できる。これはAirflow等のDAGベースのフレームワークでは困難である。
11.4 選択基準
以下の基準でフレームワークを選択することを推奨する:
- 高い制御性と本番対応が必要 → LangGraph
- 迅速なマルチエージェントプロトタイプ → CrewAI
- エージェント間会話の研究 → AutoGen
- Microsoft/Azureエコシステム → Semantic Kernel
- RAGパイプライン特化 → Haystack
12. ベストプラクティスとパフォーマンス最適化
12.1 設計原則
12.1.1 状態スキーマの設計
状態スキーマは、LangGraphアプリケーションの基盤である。以下の原則に従うことが推奨される。
# 良い設計: 明確な型、適切なリデューサー、ドキュメント
class WellDesignedState(TypedDict):
"""エージェントの状態定義
Attributes:
messages: 会話メッセージ履歴
current_task: 現在処理中のタスク
collected_data: 収集されたデータ
error_count: エラー発生回数
"""
messages: Annotated[list, add_messages]
current_task: Optional[str]
collected_data: Annotated[list[dict], operator.add]
error_count: int
# 悪い設計: 曖昧な型、リデューサーの欠如
class PoorlyDesignedState(TypedDict):
data: dict # 何のデータ?
stuff: list # 何のリスト?
flag: bool # 何のフラグ?
12.1.2 ノードの粒度
ノードは「一つのことをうまくやる」原則に従い、適切な粒度で分割する。
# 良い設計: 責務が明確
def fetch_data(state): ...
def validate_data(state): ...
def transform_data(state): ...
def store_results(state): ...
# 悪い設計: 一つのノードに多くの責務
def do_everything(state):
data = fetch()
if valid(data):
transformed = transform(data)
store(transformed)
...
12.1.3 エラーハンドリング
import logging
from functools import wraps
logger = logging.getLogger(__name__)
def with_error_handling(max_retries: int = 3):
"""エラーハンドリングデコレータ"""
def decorator(func):
@wraps(func)
def wrapper(state, *args, **kwargs):
for attempt in range(max_retries):
try:
return func(state, *args, **kwargs)
except Exception as e:
logger.error(f"ノード {func.__name__} でエラー "
f"(試行 {attempt + 1}/{max_retries}): {e}")
if attempt == max_retries - 1:
return {
"messages": [{"role": "assistant",
"content": f"エラーが発生しました: {str(e)}"}],
"error_count": state.get("error_count", 0) + 1
}
return wrapper
return decorator
@with_error_handling(max_retries=3)
def api_call_node(state: AgentState) -> dict:
"""外部APIを呼び出すノード"""
result = call_external_api(state["query"])
return {"messages": [{"role": "assistant", "content": result}]}
12.2 パフォーマンス最適化
12.2.1 並行ノード実行
独立したノードは並行して実行できる。
# Send APIを使用した並行実行
from langgraph.types import Send
def fan_out(state: AgentState) -> list[Send]:
"""複数のワーカーに並行してタスクを送る"""
tasks = state["tasks"]
return [Send("worker", {"task": task}) for task in tasks]
workflow.add_conditional_edges("planner", fan_out)
12.2.2 メッセージ履歴の管理
# トークン使用量を制御するメッセージ管理
def manage_messages(state: AgentState) -> dict:
messages = state["messages"]
# 直近のN件のみ保持
if len(messages) > 20:
# システムメッセージは常に保持
system_msgs = [m for m in messages if m.type == "system"]
recent_msgs = messages[-10:]
return {"messages": system_msgs + recent_msgs}
return {}
12.2.3 キャッシュ戦略
from functools import lru_cache
import hashlib
# 結果のキャッシュ
_cache = {}
def cached_tool_call(state: AgentState) -> dict:
"""ツール呼び出し結果をキャッシュする"""
query = state["messages"][-1].content
cache_key = hashlib.md5(query.encode()).hexdigest()
if cache_key in _cache:
return {"messages": [{"role": "assistant", "content": _cache[cache_key]}]}
result = expensive_api_call(query)
_cache[cache_key] = result
return {"messages": [{"role": "assistant", "content": result}]}
12.2.4 適切なモデルの選択
# タスクの複雑さに応じてモデルを使い分ける
model_fast = ChatOpenAI(model="gpt-4o-mini") # 軽量タスク向け
model_smart = ChatOpenAI(model="gpt-4o") # 高品質タスク向け
def route_to_model(state):
"""タスクの複雑さに応じてモデルを選択"""
complexity = assess_complexity(state["messages"][-1].content)
if complexity == "simple":
response = model_fast.invoke(state["messages"])
else:
response = model_smart.invoke(state["messages"])
return {"messages": [response]}
12.3 テスト戦略
import pytest
from langgraph.graph import StateGraph
# ユニットテスト: 個別ノードのテスト
def test_greeting_node():
state = {
"messages": [{"role": "user", "content": "こんにちは"}],
"step_count": 0
}
result = greeting_node(state)
assert "messages" in result
assert result["step_count"] == 1
# 統合テスト: グラフ全体のテスト
def test_full_workflow():
app = workflow.compile()
result = app.invoke({
"messages": [{"role": "user", "content": "テスト入力"}],
"step_count": 0
})
assert len(result["messages"]) > 1
assert result["step_count"] > 0
# チェックポイントのテスト
def test_checkpointing():
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "test-1"}}
# 最初の実行
result1 = app.invoke(
{"messages": [{"role": "user", "content": "最初"}]},
config=config
)
# 同じスレッドで2回目
result2 = app.invoke(
{"messages": [{"role": "user", "content": "次"}]},
config=config
)
# 状態が保持されていることを確認
assert len(result2["messages"]) > len(result1["messages"])
# 条件付きエッジのテスト
def test_routing():
state_with_tools = {"messages": [mock_message_with_tool_calls]}
assert should_continue(state_with_tools) == "tools"
state_without_tools = {"messages": [mock_message_no_tools]}
assert should_continue(state_without_tools) == END
12.4 可観測性とモニタリング
import logging
import time
from contextlib import contextmanager
logger = logging.getLogger("langgraph.monitoring")
@contextmanager
def monitor_node(node_name: str):
"""ノード実行の監視"""
start_time = time.time()
logger.info(f"ノード '{node_name}' 開始")
try:
yield
elapsed = time.time() - start_time
logger.info(f"ノード '{node_name}' 完了 ({elapsed:.2f}秒)")
except Exception as e:
elapsed = time.time() - start_time
logger.error(f"ノード '{node_name}' エラー ({elapsed:.2f}秒): {e}")
raise
def monitored_node(state: AgentState) -> dict:
with monitor_node("processing"):
result = process_data(state)
return result
12.5 セキュリティ考慮事項
# 入力の検証とサニタイゼーション
from pydantic import BaseModel, validator
class SafeInput(BaseModel):
message: str
user_id: str
@validator('message')
def validate_message(cls, v):
if len(v) > 10000:
raise ValueError("メッセージが長すぎます")
# インジェクション対策
dangerous_patterns = ["__import__", "exec(", "eval(", "os.system"]
for pattern in dangerous_patterns:
if pattern in v:
raise ValueError("不正な入力が検出されました")
return v
# ツール実行の権限管理
def authorized_tool_node(state: AgentState) -> dict:
"""権限を確認してからツールを実行する"""
user_role = state.get("user_role", "viewer")
tool_call = state["messages"][-1].tool_calls[0]
# 権限マッピング
tool_permissions = {
"read_data": ["viewer", "editor", "admin"],
"modify_data": ["editor", "admin"],
"delete_data": ["admin"],
"process_refund": ["admin"]
}
required_roles = tool_permissions.get(tool_call["name"], ["admin"])
if user_role not in required_roles:
return {"messages": [{"role": "tool",
"content": "権限が不足しています。",
"tool_call_id": tool_call["id"]}]}
# 権限がある場合は実行
return tool_node.invoke(state)
まとめ
LangGraph は、LLMベースのステートフルなエージェントアプリケーションを構築するための包括的かつ柔軟なフレームワークである。本記事で解説した主要な概念を振り返ると:
-
グラフベースのアーキテクチャ: StateGraph、ノード、エッジ、条件付きエッジを組み合わせることで、複雑なワークフローを明示的に定義できる。
-
堅牢な状態管理: チェックポインティングにより、障害からの自動復旧、タイムトラベル、マルチターン会話が実現される。
-
柔軟なエージェントパターン: ReActパターン、ツール呼び出し、マルチエージェントアーキテクチャなど、多様なパターンをサポートする。
-
Human-in-the-Loop: interrupt機能とブレークポイントにより、安全なエージェント運用が可能である。
-
本番対応: LangGraph Platformにより、開発からデプロイメント、モニタリングまでの一貫したワークフローが提供される。
LangGraph のエコシステムは急速に発展しており、Klarna、Uber、J.P.Morgan などの大規模企業での採用は、その成熟度と信頼性を証明している。AIエージェントの開発においてLangGraphが提供する構造化されたアプローチは、複雑なワークフローの管理を大幅に簡素化し、本番環境での安定した運用を実現する。
本記事は2026年4月時点のLangGraph v1.1.6に基づいて執筆されています。最新情報については 公式ドキュメント を参照してください。