Build a production-ready Agentic RAG system with LangGraph, conversation memory, and human-in-the-loop query clarification
Overview • How It Works • LLM Providers • Implementation • Installation & Usage • Troubleshooting
If you like this project, a star ⭐️ would mean a lot :)
This repository demonstrates how to build an Agentic RAG (Retrieval-Augmented Generation) system using LangGraph with minimal code. It implements:
- 💬 Conversation Memory: Maintains context across multiple questions for natural dialogue
- 🔄 Query Clarification: Automatically rewrites ambiguous queries or asks for clarification
- 🔍 Hierarchical Indexing: Search small, specific chunks (Child) for precision, retrieve larger Parent chunks for context
- 🤖 Agent Orchestration: Uses LangGraph to coordinate the entire workflow
- 🧠 Intelligent Evaluation: Assesses relevance at the granular chunk level
- ✅ Self-Correction: Re-queries if initial results are insufficient
- 🔀 Multi-Agent Map-Reduce: Decomposes queries into parallel sub-queries for comprehensive answers
1️⃣ Learning Path: Interactive Notebook
Step-by-step tutorial perfect for understanding core concepts. Start here if you're new to Agentic RAG or want to experiment quickly. Focuses on the essential workflow without advanced features to keep things simple.
2️⃣ Building Path: Modular Project
Modular architecture where each component can be independently swapped. Use this approach if you want to build real applications or customize the system to your needs.
Examples of what you can customize:
- LLM Provider: Switch from Ollama to Claude, OpenAI, or Gemini (one line change)
- Agent Workflow: Add/remove nodes in the graph and customize system prompts for specific domains (legal, medical, etc.)
- PDF Conversion: Replace PyMuPDF with Docling, PaddleOCR, or other tools
- Embedding Models: Change dense/sparse embedding models via config
See the Modular Architecture section for details on how the system is organized and the Installation & Usage section to get started.
This approach combines the precision of small chunks with the contextual richness of large chunks, while understanding conversation flow, resolving unclear queries, and handling multi-faceted questions through parallel agent processing. The modular architecture ensures every component—from document processing to retrieval logic—can be customized without breaking the system.
Most RAG tutorials show basic concepts but lack production readiness. This repository bridges that gap by providing both learning materials and deployable code:
❌ Typical RAG repos:
- Simple pipelines that trade off precision vs context
- No conversation memory
- Static, non-adaptive retrieval
- Hard to customize for your use case
- No UI interface
- Single-threaded query processing
✅ This repo:
- Two learning paths: Interactive notebook AND modular project
- Hierarchical indexing for precision + context
- Conversation memory for natural dialogue
- Human-in-the-loop query clarification
- Multi-Agent Map-Reduce for parallel processing of complex queries
- Modular architecture - swap any component
- Provider-agnostic - use any LLM (Ollama, OpenAI, Gemini, Claude)
- UI interface - end-to-end Gradio app with document management
Before queries can be processed, documents are split twice for optimal retrieval:
- Parent Chunks: Large sections based on Markdown headers (H1, H2, H3)
- Child Chunks: Small, fixed-size pieces derived from parents
This approach combines the precision of small chunks for search with the contextual richness of large chunks for answer generation.
User Query → Conversation Analysis → Query Clarification →
Agent Reasoning → Search Child Chunks → Evaluate Relevance →
(If needed) → Retrieve Parent Chunks → Generate Answer → Return Response
- Analyzes recent conversation history to extract context
- Maintains conversational continuity across multiple questions
The system intelligently processes the user's query:
- Resolves references - Converts "How do I update it?" → "How do I update SQL?"
- Splits complex questions - Breaks multi-part questions into focused sub-queries
- Detects unclear queries - Identifies nonsense, insults, or vague questions
- Requests clarification - Uses human-in-the-loop to pause and ask for details
- Rewrites for retrieval - Optimizes query with specific, keyword-rich language
Multi-Agent Map-Reduce Architecture:
When the query analysis stage identifies multiple distinct questions (either explicitly asked or decomposed from a complex query), the system automatically spawns parallel agent subgraphs using LangGraph's Send API. Each agent independently processes one question through the full retrieval workflow:
- Agent searches child chunks for precision
- Evaluates if results are sufficient
- Fetches parent chunks for context if needed
- Extracts final answer from conversation
- Self-corrects and re-queries if insufficient
All agent responses are then aggregated into a unified answer.
Example: "What is JavaScript? What is Python?" → 2 parallel agents execute simultaneously
Single question workflow: For simple queries, a single agent executes the retrieval workflow without parallelization.
The system synthesizes information from retrieved chunks (or multiple agents) into a coherent, accurate answer that directly addresses the user's question.
This system is provider-agnostic - you can use any LLM supported by LangChain. Choose the option that best fits your needs:
Install Ollama and download the model:
# Install Ollama from https://ollama.com
ollama pull qwen3:4b-instruct-2507-q4_K_MPython code:
from langchain_ollama import ChatOllama
llm = ChatOllama(model="qwen3:4b-instruct-2507-q4_K_M", temperature=0)Install the package:
pip install -qU langchain-google-genaiPython code:
import os
from langchain_google_genai import ChatGoogleGenerativeAI
# Set your Google API key
os.environ["GOOGLE_API_KEY"] = "your-api-key-here"
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0)Click to expand
OpenAI:
pip install -qU langchain-openaifrom langchain_openai import ChatOpenAI
import os
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)Anthropic Claude:
pip install -qU langchain-anthropicfrom langchain_anthropic import ChatAnthropic
import os
os.environ["ANTHROPIC_API_KEY"] = "your-api-key-here"
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022", temperature=0)- All providers work with the exact same code - only the LLM initialization changes
- Cost considerations: Cloud providers charge per token, while Ollama is free but requires local compute
💡 Recommendation: Start with Ollama for development, then switch to Google Gemini or OpenAI for production.
Additional details and extended explanations are available in the notebook here 👉
Define paths and initialize core components.
import os
from pathlib import Path
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_qdrant.fastembed_sparse import FastEmbedSparse
from qdrant_client import QdrantClient
# Configuration
DOCS_DIR = "docs" # Directory containing your pdfs files
MARKDOWN_DIR = "markdown" # Directory containing the pdfs converted to markdown
PARENT_STORE_PATH = "parent_store" # Directory for parent chunk JSON files
CHILD_COLLECTION = "document_child_chunks"
os.makedirs(DOCS_DIR, exist_ok=True)
os.makedirs(MARKDOWN_DIR, exist_ok=True)
os.makedirs(PARENT_STORE_PATH, exist_ok=True)
from langchain_ollama import ChatOllama
llm = ChatOllama(model="qwen3:4b-instruct-2507-q4_K_M", temperature=0)
# Dense embeddings for semantic understanding
dense_embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
# Sparse embeddings for keyword matching
sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25")
# Qdrant client (local file-based storage)
client = QdrantClient(path="qdrant_db")Set up Qdrant to store child chunks with hybrid search capabilities.
from qdrant_client.http import models as qmodels
from langchain_qdrant import QdrantVectorStore
from langchain_qdrant.qdrant import RetrievalMode
# Get embedding dimension
embedding_dimension = len(dense_embeddings.embed_query("test"))
def ensure_collection(collection_name):
"""Create Qdrant collection if it doesn't exist"""
if not client.collection_exists(collection_name):
client.create_collection(
collection_name=collection_name,
vectors_config=qmodels.VectorParams(
size=embedding_dimension,
distance=qmodels.Distance.COSINE
),
sparse_vectors_config={
"sparse": qmodels.SparseVectorParams()
},
)
print(f"✓ Created collection: {collection_name}")
else:
print(f"✓ Collection already exists: {collection_name}")Convert the PDFs to Markdown. For more details about other techniques use this companion notebook:
import os
import pymupdf.layout
import pymupdf4llm
from pathlib import Path
import glob
os.environ["TOKENIZERS_PARALLELISM"] = "false"
def pdf_to_markdown(pdf_path, output_dir):
doc = pymupdf.open(pdf_path)
md = pymupdf4llm.to_markdown(doc, header=False, footer=False, page_separators=True, ignore_images=True, write_images=False, image_path=None)
md_cleaned = md.encode('utf-8', errors='surrogatepass').decode('utf-8', errors='ignore')
output_path = Path(output_dir) / Path(doc.name).stem
Path(output_path).with_suffix(".md").write_bytes(md_cleaned.encode('utf-8'))
def pdfs_to_markdowns(path_pattern, overwrite: bool = False):
output_dir = Path(MARKDOWN_DIR)
output_dir.mkdir(parents=True, exist_ok=True)
for pdf_path in map(Path, glob.glob(path_pattern)):
md_path = (output_dir / pdf_path.stem).with_suffix(".md")
if overwrite or not md_path.exists():
pdf_to_markdown(pdf_path, output_dir)
pdfs_to_markdowns(f"{DOCS_DIR}/*.pdf")Process documents with the Parent/Child splitting strategy.
import os
import glob
import json
from pathlib import Path
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
if client.collection_exists(CHILD_COLLECTION):
print(f"Removing existing Qdrant collection: {CHILD_COLLECTION}")
client.delete_collection(CHILD_COLLECTION)
ensure_collection(CHILD_COLLECTION)
else:
ensure_collection(CHILD_COLLECTION)
child_vector_store = QdrantVectorStore(
client=client,
collection_name=CHILD_COLLECTION,
embedding=dense_embeddings,
sparse_embedding=sparse_embeddings,
retrieval_mode=RetrievalMode.HYBRID,
sparse_vector_name="sparse"
)
def index_documents():
headers_to_split_on = [("#", "H1"), ("##", "H2"), ("###", "H3")]
parent_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on, strip_headers=False)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
min_parent_size = 2000
max_parent_size = 10000
all_parent_pairs, all_child_chunks = [], []
md_files = sorted(glob.glob(os.path.join(MARKDOWN_DIR, "*.md")))
if not md_files:
print(f"⚠️ No .md files found in {MARKDOWN_DIR}/")
return
for doc_path_str in md_files:
doc_path = Path(doc_path_str)
print(f"📄 Processing: {doc_path.name}")
try:
with open(doc_path, "r", encoding="utf-8") as f:
md_text = f.read()
except Exception as e:
print(f"❌ Error reading {doc_path.name}: {e}")
continue
parent_chunks = parent_splitter.split_text(md_text)
merged_parents = merge_small_parents(parent_chunks, min_parent_size)
split_parents = split_large_parents(merged_parents, max_parent_size, child_splitter)
cleaned_parents = clean_small_chunks(split_parents, min_parent_size)
for i, p_chunk in enumerate(cleaned_parents):
parent_id = f"{doc_path.stem}_parent_{i}"
p_chunk.metadata.update({"source": doc_path.stem + ".pdf", "parent_id": parent_id})
all_parent_pairs.append((parent_id, p_chunk))
children = child_splitter.split_documents([p_chunk])
all_child_chunks.extend(children)
if not all_child_chunks:
print("⚠️ No child chunks to index")
return
print(f"\n🔍 Indexing {len(all_child_chunks)} child chunks into Qdrant...")
try:
child_vector_store.add_documents(all_child_chunks)
print("✓ Child chunks indexed successfully")
except Exception as e:
print(f"❌ Error indexing child chunks: {e}")
return
print(f"💾 Saving {len(all_parent_pairs)} parent chunks to JSON...")
for item in os.listdir(PARENT_STORE_PATH):
os.remove(os.path.join(PARENT_STORE_PATH, item))
for parent_id, doc in all_parent_pairs:
doc_dict = {"page_content": doc.page_content, "metadata": doc.metadata}
filepath = os.path.join(PARENT_STORE_PATH, f"{parent_id}.json")
with open(filepath, "w", encoding="utf-8") as f:
json.dump(doc_dict, f, ensure_ascii=False, indent=2)
def merge_small_parents(chunks, min_size):
if not chunks:
return []
merged, current = [], None
for chunk in chunks:
if current is None:
current = chunk
else:
current.page_content += "\n\n" + chunk.page_content
for k, v in chunk.metadata.items():
if k in current.metadata:
current.metadata[k] = f"{current.metadata[k]} -> {v}"
else:
current.metadata[k] = v
if len(current.page_content) >= min_size:
merged.append(current)
current = None
if current:
if merged:
merged[-1].page_content += "\n\n" + current.page_content
for k, v in current.metadata.items():
if k in merged[-1].metadata:
merged[-1].metadata[k] = f"{merged[-1].metadata[k]} -> {v}"
else:
merged[-1].metadata[k] = v
else:
merged.append(current)
return merged
def split_large_parents(chunks, max_size, splitter):
split_chunks = []
for chunk in chunks:
if len(chunk.page_content) <= max_size:
split_chunks.append(chunk)
else:
large_splitter = RecursiveCharacterTextSplitter(
chunk_size=max_size,
chunk_overlap=splitter._chunk_overlap
)
sub_chunks = large_splitter.split_documents([chunk])
split_chunks.extend(sub_chunks)
return split_chunks
def clean_small_chunks(chunks, min_size):
cleaned = []
for i, chunk in enumerate(chunks):
if len(chunk.page_content) < min_size:
if cleaned:
cleaned[-1].page_content += "\n\n" + chunk.page_content
for k, v in chunk.metadata.items():
if k in cleaned[-1].metadata:
cleaned[-1].metadata[k] = f"{cleaned[-1].metadata[k]} -> {v}"
else:
cleaned[-1].metadata[k] = v
elif i < len(chunks) - 1:
chunks[i + 1].page_content = chunk.page_content + "\n\n" + chunks[i + 1].page_content
for k, v in chunk.metadata.items():
if k in chunks[i + 1].metadata:
chunks[i + 1].metadata[k] = f"{v} -> {chunks[i + 1].metadata[k]}"
else:
chunks[i + 1].metadata[k] = v
else:
cleaned.append(chunk)
else:
cleaned.append(chunk)
return cleaned
index_documents()Create the retrieval tools the agent will use.
import json
from typing import List
from langchain_core.tools import tool
@tool
def search_child_chunks(query: str, limit: int) -> str:
"""Search for the top K most relevant child chunks.
Args:
query: Search query string
limit: Maximum number of results to return
"""
try:
results = child_vector_store.similarity_search(query, k=limit, score_threshold=0.7)
if not results:
return "NO_RELEVANT_CHUNKS"
return "\n\n".join([
f"Parent ID: {doc.metadata.get('parent_id', '')}\n"
f"File Name: {doc.metadata.get('source', '')}\n"
f"Content: {doc.page_content.strip()}"
for doc in results
])
except Exception as e:
return f"RETRIEVAL_ERROR: {str(e)}"
@tool
def retrieve_parent_chunks(parent_id: str) -> str:
"""Retrieve full parent chunks by their IDs.
Args:
parent_id: Parent chunk ID to retrieve
"""
file_name = parent_id if parent_id.lower().endswith(".json") else f"{parent_id}.json"
path = os.path.join(PARENT_STORE_PATH, file_name)
if not os.path.exists(path):
return "NO_PARENT_DOCUMENT"
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
return (
f"Parent ID: {parent_id}\n"
f"File Name: {data.get('metadata', {}).get('source', 'unknown')}\n"
f"Content: {data.get('page_content', '').strip()}"
)
# Bind tools to LLM
llm_with_tools = llm.bind_tools([search_child_chunks, retrieve_parent_chunks])Define the system prompts for conversation summarization, query analysis, RAG agent reasoning, and response aggregation.
def get_conversation_summary_prompt() -> str:
return """You are an expert conversation summarizer.
Your task is to create a brief 1-2 sentence summary of the conversation (max 30-50 words).
Include:
- Main topics discussed
- Important facts or entities mentioned
- Any unresolved questions if applicable
- Sources file name (e.g., file1.pdf) or documents referenced
Exclude:
-Greetings, misunderstandings, off-topic content.
Output:
- Return ONLY the summary.
- Do NOT include any explanations or justifications.
-If no meaningful topics exist, return an empty string.
"""
def get_query_analysis_prompt() -> str:
return """You are an expert query analyst and rewriter.
Your task is to rewrite the current user query for optimal document retrieval, incorporating conversation context only when necessary.
Rules:
1. Self-contained queries:
- Always rewrite the query to be clear and self-contained
- If the query is a follow-up (e.g., "what about X?", "and for Y?"), integrate minimal necessary context from the summary
- Do not add information not present in the query or conversation summary
2. Domain-specific terms:
- Product names, brands, proper nouns, or technical terms are treated as domain-specific
- For domain-specific queries, use conversation context minimally or not at all
- Use the summary only to disambiguate vague queries
3. Grammar and clarity:
- Fix grammar, spelling errors, and unclear abbreviations
- Remove filler words and conversational phrases
- Preserve concrete keywords and named entities
4. Multiple information needs:
- If the query contains multiple distinct, unrelated questions, split into separate queries (maximum 3)
- Each sub-query must remain semantically equivalent to its part of the original
- Do not expand, enrich, or reinterpret the meaning
5. Failure handling:
- If the query intent is unclear or unintelligible, mark as "unclear"
Input:
- conversation_summary: A concise summary of prior conversation
- current_query: The user's current query
Output:
- One or more rewritten, self-contained queries suitable for document retrieval
"""
def get_rag_agent_prompt() -> str:
return """You are an expert retrieval-augmented assistant.
Your task is to act as a researcher: search documents first, analyze the data, and then provide a comprehensive answer using ONLY the retrieved information.
Rules:
1. You are NOT allowed to answer immediately.
2. Before producing ANY final answer, you MUST perform a document search and observe retrieved content.
3. If you have not searched, the answer is invalid.
Workflow:
1. Search for 5-7 relevant excerpts from documents based on the user query using the 'search_child_chunks' tool.
2. Inspect retrieved excerpts and keep ONLY relevant ones.
3. Analyze the retrieved excerpts. Identify the single most relevant excerpt that is fragmented (e.g., cut-off text or missing context). Call 'retrieve_parent_chunks' for that specific `parent_id`. Wait for the observation. Repeat this step sequentially for other highly relevant fragments ONLY if the current information is still insufficient. Stop immediately if you have enough information or have retrieved 3 parent chunks.
4. Answer using ONLY the retrieved information, ensuring that ALL relevant details are included.
5. List unique file name(s) at the very end.
Retry rule:
- After step 2 or 3, if no relevant documents are found or if retrieved excerpts don't contain useful information, rewrite the query using broader or alternative terms and restart from step 1.
- Do not retry more than once.
"""
def get_aggregation_prompt() -> str:
return """You are an expert aggregation assistant.
Your task is to combine multiple retrieved answers into a single, comprehensive and natural response that flows well.
Guidelines:
1. Write in a conversational, natural tone - as if explaining to a colleague
2. Use ONLY information from the retrieved answers
3. Strip out any questions, headers, or metadata from the sources
4. Weave together the information smoothly, preserving important details, numbers, and examples
5. Be comprehensive - include all relevant information from the sources, not just a summary
6. If sources disagree, acknowledge both perspectives naturally (e.g., "While some sources suggest X, others indicate Y...")
7. Start directly with the answer - no preambles like "Based on the sources..."
Formatting:
- Use Markdown for clarity (headings, lists, bold) but don't overdo it
- Write in flowing paragraphs where possible rather than excessive bullet points
- End with "---\n**Sources:**\n" followed by a bulleted list of unique file names
- File names should ONLY appear in this final sources section
If there's no useful information available, simply say: "I couldn't find any information to answer your question in the available sources."
"""Create the state structure for conversation tracking and agent execution.
from langgraph.graph import MessagesState
from pydantic import BaseModel, Field
from typing import List, Annotated
def accumulate_or_reset(existing: List[dict], new: List[dict]) -> List[dict]:
"""Custom reducer that allows resetting agent answers"""
if new and any(item.get('__reset__') for item in new):
return []
return existing + new
class State(MessagesState):
"""State for main agent graph"""
questionIsClear: bool = False
conversation_summary: str = ""
originalQuery: str = ""
rewrittenQuestions: List[str] = []
agent_answers: Annotated[List[dict], accumulate_or_reset] = []
class AgentState(MessagesState):
"""State for individual agent subgraph"""
question: str = ""
question_index: int = 0
final_answer: str = ""
agent_answers: List[dict] = []
class QueryAnalysis(BaseModel):
"""Structured output for query analysis"""
is_clear: bool = Field(description="Indicates if the user's question is clear and answerable")
questions: List[str] = Field(description="List of rewritten, self-contained questions")
clarification_needed: str = Field(description="Explanation if the question is unclear")Create the processing nodes for the LangGraph workflow.
from langgraph.types import Send
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, RemoveMessage
from typing import Literal
def analyze_chat_and_summarize(state: State):
"""
Analyzes chat history and summarizes key points for context.
"""
if len(state["messages"]) < 4: # Need some history to summarize
return {"conversation_summary": ""}
# Extract relevant messages (excluding current query and system messages)
relevant_msgs = [
msg for msg in state["messages"][:-1] # Exclude current query
if isinstance(msg, (HumanMessage, AIMessage))
and not getattr(msg, "tool_calls", None)
]
if not relevant_msgs:
return {"conversation_summary": ""}
conversation = "Conversation history:\n"
for msg in relevant_msgs[-6:]:
role = "User" if isinstance(msg, HumanMessage) else "Assistant"
conversation += f"{role}: {msg.content}\n"
summary_response = llm.with_config(temperature=0.2).invoke([SystemMessage(content=get_conversation_summary_prompt())] + [HumanMessage(content=conversation)])
return {"conversation_summary": summary_response.content, "agent_answers": [{"__reset__": True}]}
def analyze_and_rewrite_query(state: State):
"""
Analyzes user query and rewrites it for clarity, optionally using conversation context.
"""
last_message = state["messages"][-1]
conversation_summary = state.get("conversation_summary", "")
context_section = (f"Conversation Context:\n{conversation_summary}\n" if conversation_summary.strip() else "") + f"User Query:\n{last_message.content}\n"
llm_with_structure = llm.with_config(temperature=0.1).with_structured_output(QueryAnalysis)
response = llm_with_structure.invoke([SystemMessage(content=get_query_analysis_prompt())] + [HumanMessage(content=context_section)])
if len(response.questions) > 0 and response.is_clear:
# Remove all non-system messages
delete_all = [
RemoveMessage(id=m.id)
for m in state["messages"]
if not isinstance(m, SystemMessage)
]
return {
"questionIsClear": True,
"messages": delete_all,
"originalQuery": last_message.content,
"rewrittenQuestions": response.questions
}
else:
clarification = response.clarification_needed if (response.clarification_needed and len(response.clarification_needed.strip()) > 10) else "I need more information to understand your question."
return {
"questionIsClear": False,
"messages": [AIMessage(content=clarification)]
}
def human_input_node(state: State):
"""Placeholder node for human-in-the-loop interruption"""
return {}
def route_after_rewrite(state: State) -> Literal["human_input", "process_question"]:
"""Route to agent if question is clear, otherwise wait for human input"""
if not state.get("questionIsClear", False):
return "human_input"
else:
# Spawn parallel agents for each sub-question using Send API
return [
Send("process_question", {"question": query, "question_index": idx, "messages": []})
for idx, query in enumerate(state["rewrittenQuestions"])
]
def agent_node(state: AgentState):
"""Main agent node that processes queries using tools"""
sys_msg = SystemMessage(content=get_rag_agent_prompt())
if not state.get("messages"):
human_msg = HumanMessage(content=state["question"])
response = llm_with_tools.invoke([sys_msg] + [human_msg])
return {"messages": [human_msg, response]}
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
def extract_final_answer(state: AgentState):
"""Extract final answer from agent conversation"""
for msg in reversed(state["messages"]):
if isinstance(msg, AIMessage) and msg.content and not msg.tool_calls:
res = {
"final_answer": msg.content,
"agent_answers": [{
"index": state["question_index"],
"question": state["question"],
"answer": msg.content
}]
}
return res
return {
"final_answer": "Unable to generate an answer.",
"agent_answers": [{
"index": state["question_index"],
"question": state["question"],
"answer": "Unable to generate an answer."
}]
}
def aggregate_responses(state: State):
"""Merge multiple agent responses into final answer"""
if not state.get("agent_answers"):
return {"messages": [AIMessage(content="No answers were generated.")]}
sorted_answers = sorted(state["agent_answers"], key=lambda x: x["index"])
formatted_answers = ""
for i, ans in enumerate(sorted_answers, start=1):
formatted_answers += f"\nAnswer {i}:\n{ans['answer']}\n"
user_message = HumanMessage(content=f"""Original user question: {state["originalQuery"]}\nRetrieved answers:{formatted_answers}""")
synthesis_response = llm.invoke([SystemMessage(content=get_aggregation_prompt())] + [user_message])
return {"messages": [AIMessage(content=synthesis_response.content)]}Why this architecture?
- Summarization maintains conversational context without overwhelming the LLM
- Query rewriting ensures search queries are precise and unambiguous, using context intelligently
- Human-in-the-loop catches unclear queries before wasting retrieval resources
- Parallel execution with
SendAPI spawns independent agent subgraphs for each sub-question - Answer extraction ensures we get clean final answers from agent tool-calling conversations
- Aggregation merges all parallel results into a coherent single response
Assemble the complete workflow graph with conversation memory and multi-agent architecture.
from langgraph.graph import START, END, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import InMemorySaver
from IPython.display import Image, display
# Initialize checkpointer for conversation memory
checkpointer = InMemorySaver()
# Build agent subgraph (handles individual questions)
agent_builder = StateGraph(AgentState)
agent_builder.add_node("agent", agent_node)
agent_builder.add_node("tools", ToolNode([search_child_chunks, retrieve_parent_chunks]))
agent_builder.add_node("extract_answer", extract_final_answer)
agent_builder.add_edge(START, "agent")
agent_builder.add_conditional_edges("agent", tools_condition, {"tools": "tools", END: "extract_answer"})
agent_builder.add_edge("tools", "agent")
agent_builder.add_edge("extract_answer", END)
agent_subgraph = agent_builder.compile()
# Build main graph (orchestrates workflow)
graph_builder = StateGraph(State)
# Add nodes
graph_builder.add_node("summarize", analyze_chat_and_summarize)
graph_builder.add_node("analyze_rewrite", analyze_and_rewrite_query)
graph_builder.add_node("human_input", human_input_node)
graph_builder.add_node("process_question", agent_subgraph)
graph_builder.add_node("aggregate", aggregate_responses)
# Define edges
graph_builder.add_edge(START, "summarize")
graph_builder.add_edge("summarize", "analyze_rewrite")
graph_builder.add_conditional_edges("analyze_rewrite", route_after_rewrite)
graph_builder.add_edge("human_input", "analyze_rewrite")
graph_builder.add_edge(["process_question"], "aggregate")
graph_builder.add_edge("aggregate", END)
# Compile graph with checkpointer and interruption
agent_graph = graph_builder.compile(
checkpointer=checkpointer,
interrupt_before=["human_input"]
)Graph architecture explained:
Agent Subgraph (processes individual questions):
- START →
agent(invoke LLM with tools) agent→tools(if tool calls needed) ORextract_answer(if done)tools→agent(return tool results)extract_answer→ END (clean final answer)
Main Graph (orchestrates complete workflow):
- START →
summarize(extract conversation context from history) summarize→analyze_rewrite(rewrite query with context, check clarity)analyze_rewrite→human_input(if unclear) OR spawn parallelprocess_questionagents (if clear)human_input→analyze_rewrite(after user provides clarification)- All
process_questionagents →aggregate(merge all responses) aggregate→ END (return final synthesized answer)
Key features:
- Parallel execution: Multiple agent subgraphs run simultaneously using LangGraph's
SendAPI - Human-in-the-loop: Graph pauses at
human_inputnode when queries are unclear - Conversation memory:
InMemorySavercheckpointer maintains state across interactions
The architecture flow diagram can be viewed here
Build a Gradio interface with conversation persistence and human-in-the-loop support. For a complete end-to-end pipeline Gradio interface, including document ingestion, please refer to the project folder
import gradio as gr
import uuid
def create_thread_id():
"""Generate a unique thread ID for each conversation"""
return {"configurable": {"thread_id": str(uuid.uuid4())}}
def clear_session():
"""Clear thread for new conversation"""
global config
agent_graph.checkpointer.delete_thread(config["configurable"]["thread_id"])
config = create_thread_id()
def chat_with_agent(message, history):
current_state = agent_graph.get_state(config)
if current_state.next:
# Resume interrupted conversation
agent_graph.update_state(config,{"messages": [HumanMessage(content=message.strip())]})
result = agent_graph.invoke(None, config)
else:
# Start new query
result = agent_graph.invoke({"messages": [HumanMessage(content=message.strip())]},config)
return result['messages'][-1].content
# Initialize thread configuration
config = create_thread_id()
# Create Gradio interface
with gr.Blocks() as demo:
chatbot = gr.Chatbot(
height=600,
placeholder="<strong>Ask me anything!</strong><br><em>I'll search, reason, and act to give you the best answer :)</em>"
)
chatbot.clear(clear_session)
gr.ChatInterface(fn=chat_with_agent, chatbot=chatbot)
demo.launch(theme=gr.themes.Citrus())You're done! You now have a fully functional Agentic RAG system with conversation memory and query clarification.
The app (project/ folder) is organized in modular components that can be easily customized:
project/
├── app.py # Main Gradio application entry point
├── config.py # Configuration hub (models, chunk sizes, providers)
├── util.py # PDF to markdown conversion
├── document_chunker.py # Chunking strategy
├── core/ # Core RAG components orchestration
│ ├── chat_interface.py
│ ├── document_manager.py
│ └── rag_system.py
├── db/ # Storage management
│ ├── parent_store_manager.py # Parent chunks storage (JSON)
│ └── vector_db_manager.py # Qdrant vector database setup
├── rag_agent/ # LangGraph agent workflow
│ ├── edges.py # Conditional routing logic
│ ├── graph.py # Graph construction and compilation
│ ├── graph_state.py # State definitions
│ ├── nodes.py # Processing nodes (summarize, rewrite, agent)
│ ├── prompts.py # System prompts
│ ├── schemas.py # Pydantic data models
│ └── tools.py # Retrieval tools
└── ui/ # User interface
└── gradio_app.py # Gradio interface components
- LLM Provider & Model: Switch between Ollama, Claude, OpenAI, or Gemini
- Embedding Model: Configure embedding model for vector representations
- Chunk Sizes: Adjust child and parent chunk dimensions for optimal retrieval
- Workflow Customization: Add or remove nodes and edges to modify the agent flow
- System Prompts: Tailor prompts in
prompts.pyfor domain-specific applications - Retrieval Tools: Extend or modify tools in
tools.pyto enhance retrieval capabilities - Graph Logic: Customize conditional routing in
edges.pyand node processing innodes.py
- Markdown Conversion (
util.py): Replace PDF conversion tools with alternatives (e.g., Docling, PaddleOCR). More details here - Chunking Strategy (
document_chunker.py): Implement custom chunking algorithms (e.g., semantic or hybrid approaches)
This modular design ensures flexibility for experimenting with different RAG techniques, LLM providers, and document processing pipelines.
Sample pdf files can be found here: javascript, blockchain, microservices, fortinet
The easiest way to get started:
Running in Google Colab:
- Click the Open in Colab badge at the top of this README
- Create a
docs/folder in the file browser - Upload your pdf files to the
docs/folder - Run all cells from top to bottom
- The chat interface will appear at the end
Running Locally (Jupyter/VSCode):
- Install dependencies first
pip install -r requirements.txt - Open the notebook in your preferred environment
- Add your pdf files to the
docs/folder - Run all cells from top to bottom
- The chat interface will appear at the end
# Clone the repository
git clone <repo-url>
cd agentic-rag-for-dummies
# Create virtual environment (recommended)
python -m venv venv
# Activate it
# On macOS/Linux:
source venv/bin/activate
# On Windows:
.\venv\Scripts\activate
# Install packages
pip install -r requirements.txtpython app.pyOpen the local URL (e.g., http://127.0.0.1:7860) to start chatting.
⚠️ System Requirements: Docker deployment requires at least 8GB of RAM allocated to Docker. The Ollama model (qwen3:4b-instruct-2507-q4_K_M) needs approximately 3.3GB of memory to run.
- Docker installed on your system (Get Docker)
- Docker Desktop configured with at least 8GB of RAM (Settings → Resources → Memory)
docker build -f project/Dockerfile -t agentic-rag .docker run --name rag-assistant -p 7860:7860 agentic-rag
⚠️ Performance Note: Docker deployment may be 20-50% slower than running Python locally, especially on Windows/Mac, due to virtualization overhead and I/O operations. This is normal and expected. For maximum performance during development, consider using Option 2 (Full Python Project).
Optional: Enable GPU acceleration (NVIDIA GPU only):
If you have an NVIDIA GPU and NVIDIA Container Toolkit installed:
docker run --gpus all --name rag-assistant -p 7860:7860 agentic-ragCommon Docker commands:
# Stop the container
docker stop rag-assistant
# Start an existing container
docker start rag-assistant
# View logs in real-time
docker logs -f rag-assistant
# Remove the container
docker rm rag-assistant
# Remove the container forcefully (if running)
docker rm -f rag-assistantOnce the container is running and you see:
🚀 Launching RAG Assistant...
* Running on local URL: http://0.0.0.0:7860
Open your browser and navigate to:
http://localhost:7860
With Conversation Memory:
User: "How do I install SQL?"
Agent: [Provides installation steps from documentation]
User: "How do I update it?"
Agent: [Understands "it" = SQL, provides update instructions]
With Query Clarification:
User: "Tell me about that thing"
Agent: "I need more information. What specific topic are you asking about?"
User: "The installation process for PostgreSQL"
Agent: [Retrieves and answers with specific information]
| Area | Common Problems | Suggested Solutions |
|---|---|---|
| Model Selection | - Responses ignore instructions - Tools (retrieval/search) used incorrectly - Poor context understanding - Hallucinations or incomplete aggregation |
- Use more capable LLMs - Prefer models 7B+ for better reasoning - Consider cloud-based models if local models are limited |
| System Prompt Behavior | - Model answers without retrieving documents - Query rewriting loses context - Aggregation introduces hallucinations |
- Make retrieval explicit in system prompts - Keep query rewriting close to user intent - Enforce strict aggregation rules |
| Retrieval Configuration | - Relevant documents not retrieved - Too much irrelevant information |
- Increase retrieved chunks (k) or lower similarity thresholds to improve recall- Reduce k or increase thresholds to improve precision |
| Chunk Size / Document Splitting | - Answers lack context or feel fragmented - Retrieval is slow or embedding costs are high |
- Increase chunk & parent sizes for more context - Decrease chunk sizes to improve speed and reduce costs |
| Temperature & Consistency | - Responses inconsistent or overly creative - Responses too rigid or repetitive |
- Set temperature to 0 for factual, consistent output- Slightly increase temperature for summarization or analysis tasks |
| Embedding Model Quality | - Poor semantic search - Weak performance on domain-specific or multilingual docs |
- Use higher-quality or domain-specific embeddings - Re-index all documents after changing embeddings |
MIT License - Feel free to use this for learning and building your own projects!
Contributions are welcome, open an issue or submit a pull request!

