
Agentic RAG for Dummies

Build a production-ready Agentic RAG system with LangGraph, conversation memory, and human-in-the-loop query clarification

Overview · How It Works · LLM Providers · Implementation · Installation & Usage · Troubleshooting

Quickstart here 👉 Open In Colab

Agentic RAG Demo

If you like this project, a star ⭐️ would mean a lot :)

Overview

This repository demonstrates how to build an Agentic RAG (Retrieval-Augmented Generation) system using LangGraph with minimal code. It implements:

  • 💬 Conversation Memory: Maintains context across multiple questions for natural dialogue
  • 🔄 Query Clarification: Automatically rewrites ambiguous queries or asks for clarification
  • 🔍 Hierarchical Indexing: Search small, specific chunks (Child) for precision, retrieve larger Parent chunks for context
  • 🤖 Agent Orchestration: Uses LangGraph to coordinate the entire workflow
  • 🧠 Intelligent Evaluation: Assesses relevance at the granular chunk level
  • Self-Correction: Re-queries if initial results are insufficient
  • 🔀 Multi-Agent Map-Reduce: Decomposes queries into parallel sub-queries for comprehensive answers

🎯 Two Ways to Use This Repo

1️⃣ Learning Path: Interactive Notebook
Step-by-step tutorial perfect for understanding core concepts. Start here if you're new to Agentic RAG or want to experiment quickly. Focuses on the essential workflow without advanced features to keep things simple.

2️⃣ Building Path: Modular Project
Modular architecture where each component can be independently swapped. Use this approach if you want to build real applications or customize the system to your needs.

Examples of what you can customize:

  • LLM Provider: Switch from Ollama to Claude, OpenAI, or Gemini (one line change)
  • Agent Workflow: Add/remove nodes in the graph and customize system prompts for specific domains (legal, medical, etc.)
  • PDF Conversion: Replace PyMuPDF with Docling, PaddleOCR, or other tools
  • Embedding Models: Change dense/sparse embedding models via config

See the Modular Architecture section for details on how the system is organized and the Installation & Usage section to get started.


This approach combines the precision of small chunks with the contextual richness of large chunks, while understanding conversation flow, resolving unclear queries, and handling multi-faceted questions through parallel agent processing. The modular architecture ensures every component—from document processing to retrieval logic—can be customized without breaking the system.


Why This Repo?

Most RAG tutorials show basic concepts but lack production readiness. This repository bridges that gap by providing both learning materials and deployable code:

Typical RAG repos:

  • Simple pipelines that trade off precision vs context
  • No conversation memory
  • Static, non-adaptive retrieval
  • Hard to customize for your use case
  • No UI interface
  • Single-threaded query processing

This repo:

  • Two learning paths: Interactive notebook AND modular project
  • Hierarchical indexing for precision + context
  • Conversation memory for natural dialogue
  • Human-in-the-loop query clarification
  • Multi-Agent Map-Reduce for parallel processing of complex queries
  • Modular architecture - swap any component
  • Provider-agnostic - use any LLM (Ollama, OpenAI, Gemini, Claude)
  • UI interface - end-to-end Gradio app with document management

How It Works

Document Preparation: Hierarchical Indexing

Before queries can be processed, documents are split twice for optimal retrieval:

  • Parent Chunks: Large sections based on Markdown headers (H1, H2, H3)
  • Child Chunks: Small, fixed-size pieces derived from parents

This approach combines the precision of small chunks for search with the contextual richness of large chunks for answer generation.
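As a quick preview, here is a minimal sketch of the two-level split (the full indexing pipeline, with parent merging and size limits, is shown in Step 4 of the Implementation section; markdown_text is a placeholder for your converted document):

from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter

# Parent chunks: split on Markdown headers so whole sections stay together
parent_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[("#", "H1"), ("##", "H2"), ("###", "H3")],
    strip_headers=False,
)
# Child chunks: small fixed-size pieces derived from each parent
child_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)

parent_chunks = parent_splitter.split_text(markdown_text)  # markdown_text: your converted document (placeholder)
child_chunks = [c for p in parent_chunks for c in child_splitter.split_documents([p])]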


Query Processing: Four-Stage Intelligent Workflow

User Query → Conversation Analysis → Query Clarification →
Agent Reasoning → Search Child Chunks → Evaluate Relevance →
(If needed) → Retrieve Parent Chunks → Generate Answer → Return Response

Stage 1: Conversation Understanding

  • Analyzes recent conversation history to extract context
  • Maintains conversational continuity across multiple questions

Stage 2: Query Clarification

The system intelligently processes the user's query (a minimal code sketch follows the list):

  1. Resolves references - Converts "How do I update it?" → "How do I update SQL?"
  2. Splits complex questions - Breaks multi-part questions into focused sub-queries
  3. Detects unclear queries - Identifies nonsense, insults, or vague questions
  4. Requests clarification - Uses human-in-the-loop to pause and ask for details
  5. Rewrites for retrieval - Optimizes query with specific, keyword-rich language
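A minimal sketch of this stage, assuming the llm and the QueryAnalysis model that are defined later in Steps 6-8:

# Sketch only: llm and QueryAnalysis come from the Implementation section (Steps 6-8)
analysis = llm.with_structured_output(QueryAnalysis).invoke(
    "Conversation Context:\nThe user has been asking about SQL.\n"
    "User Query:\nHow do I update it?"
)
if analysis.is_clear:
    sub_queries = analysis.questions      # e.g. ["How do I update SQL?"]
else:
    # pause and ask for details (human-in-the-loop, see Step 8)
    clarification = analysis.clarification_needed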

Stage 3: Intelligent Retrieval

Multi-Agent Map-Reduce Architecture:

When the query analysis stage identifies multiple distinct questions (either explicitly asked or decomposed from a complex query), the system automatically spawns parallel agent subgraphs using LangGraph's Send API. Each agent independently processes one question through the full retrieval workflow:

  1. Agent searches child chunks for precision
  2. Evaluates if results are sufficient
  3. Fetches parent chunks for context if needed
  4. Extracts final answer from conversation
  5. Self-corrects and re-queries if insufficient

All agent responses are then aggregated into a unified answer.

Example: "What is JavaScript? What is Python?" → 2 parallel agents execute simultaneously

Single question workflow: For simple queries, a single agent executes the retrieval workflow without parallelization.
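A minimal sketch of the fan-out, assuming the state already holds the rewritten sub-questions (the full routing function is route_after_rewrite in Step 8):

from langgraph.types import Send

def fan_out(state):
    # One "process_question" subgraph run per sub-question, executed in parallel
    return [
        Send("process_question", {"question": q, "question_index": i, "messages": []})
        for i, q in enumerate(state["rewrittenQuestions"])
    ]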

Stage 4: Response Generation

The system synthesizes information from retrieved chunks (or multiple agents) into a coherent, accurate answer that directly addresses the user's question.


LLM Provider Configuration

This system is provider-agnostic - you can use any LLM supported by LangChain. Choose the option that best fits your needs:

Ollama (Local - Recommended for Development)

Install Ollama and download the model:

# Install Ollama from https://ollama.com
ollama pull qwen3:4b-instruct-2507-q4_K_M

Python code:

from langchain_ollama import ChatOllama

llm = ChatOllama(model="qwen3:4b-instruct-2507-q4_K_M", temperature=0)

Google Gemini (Cloud - Recommended for Production)

Install the package:

pip install -qU langchain-google-genai

Python code:

import os
from langchain_google_genai import ChatGoogleGenerativeAI

# Set your Google API key
os.environ["GOOGLE_API_KEY"] = "your-api-key-here"
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0)

OpenAI / Anthropic Claude


OpenAI:

pip install -qU langchain-openai
from langchain_openai import ChatOpenAI
import os

os.environ["OPENAI_API_KEY"] = "your-api-key-here"
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

Anthropic Claude:

pip install -qU langchain-anthropic
from langchain_anthropic import ChatAnthropic
import os

os.environ["ANTHROPIC_API_KEY"] = "your-api-key-here"
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022", temperature=0)

Important Notes

  • All providers work with the exact same code - only the LLM initialization changes
  • Cost considerations: Cloud providers charge per token, while Ollama is free but requires local compute

💡 Recommendation: Start with Ollama for development, then switch to Google Gemini or OpenAI for production.
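As an illustration, a small (hypothetical) factory function keeps everything else provider-agnostic; swapping providers then really is a one-line change at the call site:

def get_llm(provider: str = "ollama"):
    """Illustrative helper: only the branch you use needs its package installed."""
    if provider == "ollama":
        from langchain_ollama import ChatOllama
        return ChatOllama(model="qwen3:4b-instruct-2507-q4_K_M", temperature=0)
    if provider == "gemini":
        from langchain_google_genai import ChatGoogleGenerativeAI
        return ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp", temperature=0)
    if provider == "openai":
        from langchain_openai import ChatOpenAI
        return ChatOpenAI(model="gpt-4o-mini", temperature=0)
    if provider == "anthropic":
        from langchain_anthropic import ChatAnthropic
        return ChatAnthropic(model="claude-3-5-sonnet-20241022", temperature=0)
    raise ValueError(f"Unknown provider: {provider}")

llm = get_llm("ollama")  # switch to "gemini", "openai", or "anthropic" for production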


Implementation

Additional details and extended explanations are available in the notebook here 👉 Open In Colab

Step 1: Initial Setup and Configuration

Define paths and initialize core components.

import os
from pathlib import Path
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_qdrant import FastEmbedSparse
from qdrant_client import QdrantClient

# Configuration
DOCS_DIR = "docs"  # Directory containing your pdfs files
MARKDOWN_DIR = "markdown" # Directory containing the pdfs converted to markdown
PARENT_STORE_PATH = "parent_store"  # Directory for parent chunk JSON files
CHILD_COLLECTION = "document_child_chunks"

os.makedirs(DOCS_DIR, exist_ok=True)
os.makedirs(MARKDOWN_DIR, exist_ok=True)
os.makedirs(PARENT_STORE_PATH, exist_ok=True)

from langchain_ollama import ChatOllama
llm = ChatOllama(model="qwen3:4b-instruct-2507-q4_K_M", temperature=0)

# Dense embeddings for semantic understanding
dense_embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")

# Sparse embeddings for keyword matching
sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25")

# Qdrant client (local file-based storage)
client = QdrantClient(path="qdrant_db")

Step 2: Configure Vector Database

Set up Qdrant to store child chunks with hybrid search capabilities.

from qdrant_client.http import models as qmodels
from langchain_qdrant import QdrantVectorStore, RetrievalMode

# Get embedding dimension
embedding_dimension = len(dense_embeddings.embed_query("test"))

def ensure_collection(collection_name):
    """Create Qdrant collection if it doesn't exist"""
    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name=collection_name,
            vectors_config=qmodels.VectorParams(
                size=embedding_dimension,
                distance=qmodels.Distance.COSINE
            ),
            sparse_vectors_config={
                "sparse": qmodels.SparseVectorParams()
            },
        )
        print(f"✓ Created collection: {collection_name}")
    else:
        print(f"✓ Collection already exists: {collection_name}")

Step 3: PDFs to Markdown

Convert the PDFs to Markdown. For more details on other conversion techniques, see this companion notebook:

📘 PDF to Markdown Converter

import os
import pymupdf.layout
import pymupdf4llm
from pathlib import Path
import glob

os.environ["TOKENIZERS_PARALLELISM"] = "false"

def pdf_to_markdown(pdf_path, output_dir):
    doc = pymupdf.open(pdf_path)
    md = pymupdf4llm.to_markdown(doc, header=False, footer=False, page_separators=True, ignore_images=True, write_images=False, image_path=None)
    md_cleaned = md.encode('utf-8', errors='surrogatepass').decode('utf-8', errors='ignore')
    output_path = Path(output_dir) / Path(doc.name).stem
    Path(output_path).with_suffix(".md").write_bytes(md_cleaned.encode('utf-8'))

def pdfs_to_markdowns(path_pattern, overwrite: bool = False):
    output_dir = Path(MARKDOWN_DIR)
    output_dir.mkdir(parents=True, exist_ok=True)

    for pdf_path in map(Path, glob.glob(path_pattern)):
        md_path = (output_dir / pdf_path.stem).with_suffix(".md")
        if overwrite or not md_path.exists():
            pdf_to_markdown(pdf_path, output_dir)

pdfs_to_markdowns(f"{DOCS_DIR}/*.pdf")

Step 4: Hierarchical Document Indexing

Process documents with the Parent/Child splitting strategy.

import os
import glob
import json
from pathlib import Path
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter

if client.collection_exists(CHILD_COLLECTION):
    print(f"Removing existing Qdrant collection: {CHILD_COLLECTION}")
    client.delete_collection(CHILD_COLLECTION)

ensure_collection(CHILD_COLLECTION)

child_vector_store = QdrantVectorStore(
    client=client,
    collection_name=CHILD_COLLECTION,
    embedding=dense_embeddings,
    sparse_embedding=sparse_embeddings,
    retrieval_mode=RetrievalMode.HYBRID,
    sparse_vector_name="sparse"
)

def index_documents():
    headers_to_split_on = [("#", "H1"), ("##", "H2"), ("###", "H3")]
    parent_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on, strip_headers=False)
    child_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)

    min_parent_size = 2000
    max_parent_size = 10000

    all_parent_pairs, all_child_chunks = [], []
    md_files = sorted(glob.glob(os.path.join(MARKDOWN_DIR, "*.md")))

    if not md_files:
        print(f"⚠️  No .md files found in {MARKDOWN_DIR}/")
        return

    for doc_path_str in md_files:
        doc_path = Path(doc_path_str)
        print(f"📄 Processing: {doc_path.name}")

        try:
            with open(doc_path, "r", encoding="utf-8") as f:
                md_text = f.read()
        except Exception as e:
            print(f"❌ Error reading {doc_path.name}: {e}")
            continue

        parent_chunks = parent_splitter.split_text(md_text)
        merged_parents = merge_small_parents(parent_chunks, min_parent_size)
        split_parents = split_large_parents(merged_parents, max_parent_size, child_splitter)
        cleaned_parents = clean_small_chunks(split_parents, min_parent_size)

        for i, p_chunk in enumerate(cleaned_parents):
            parent_id = f"{doc_path.stem}_parent_{i}"
            p_chunk.metadata.update({"source": doc_path.stem + ".pdf", "parent_id": parent_id})
            all_parent_pairs.append((parent_id, p_chunk))
            children = child_splitter.split_documents([p_chunk])
            all_child_chunks.extend(children)

    if not all_child_chunks:
        print("⚠️ No child chunks to index")
        return

    print(f"\n🔍 Indexing {len(all_child_chunks)} child chunks into Qdrant...")
    try:
        child_vector_store.add_documents(all_child_chunks)
        print("✓ Child chunks indexed successfully")
    except Exception as e:
        print(f"❌ Error indexing child chunks: {e}")
        return

    print(f"💾 Saving {len(all_parent_pairs)} parent chunks to JSON...")
    for item in os.listdir(PARENT_STORE_PATH):
        os.remove(os.path.join(PARENT_STORE_PATH, item))

    for parent_id, doc in all_parent_pairs:
        doc_dict = {"page_content": doc.page_content, "metadata": doc.metadata}
        filepath = os.path.join(PARENT_STORE_PATH, f"{parent_id}.json")
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(doc_dict, f, ensure_ascii=False, indent=2)

def merge_small_parents(chunks, min_size):
    if not chunks:
        return []

    merged, current = [], None

    for chunk in chunks:
        if current is None:
            current = chunk
        else:
            current.page_content += "\n\n" + chunk.page_content
            for k, v in chunk.metadata.items():
                if k in current.metadata:
                    current.metadata[k] = f"{current.metadata[k]} -> {v}"
                else:
                    current.metadata[k] = v

        if len(current.page_content) >= min_size:
            merged.append(current)
            current = None

    if current:
        if merged:
            merged[-1].page_content += "\n\n" + current.page_content
            for k, v in current.metadata.items():
                if k in merged[-1].metadata:
                    merged[-1].metadata[k] = f"{merged[-1].metadata[k]} -> {v}"
                else:
                    merged[-1].metadata[k] = v
        else:
            merged.append(current)

    return merged

def split_large_parents(chunks, max_size, splitter):
    split_chunks = []

    for chunk in chunks:
        if len(chunk.page_content) <= max_size:
            split_chunks.append(chunk)
        else:
            large_splitter = RecursiveCharacterTextSplitter(
                chunk_size=max_size,
                chunk_overlap=splitter._chunk_overlap
            )
            sub_chunks = large_splitter.split_documents([chunk])
            split_chunks.extend(sub_chunks)

    return split_chunks

def clean_small_chunks(chunks, min_size):
    cleaned = []

    for i, chunk in enumerate(chunks):
        if len(chunk.page_content) < min_size:
            if cleaned:
                cleaned[-1].page_content += "\n\n" + chunk.page_content
                for k, v in chunk.metadata.items():
                    if k in cleaned[-1].metadata:
                        cleaned[-1].metadata[k] = f"{cleaned[-1].metadata[k]} -> {v}"
                    else:
                        cleaned[-1].metadata[k] = v
            elif i < len(chunks) - 1:
                chunks[i + 1].page_content = chunk.page_content + "\n\n" + chunks[i + 1].page_content
                for k, v in chunk.metadata.items():
                    if k in chunks[i + 1].metadata:
                        chunks[i + 1].metadata[k] = f"{v} -> {chunks[i + 1].metadata[k]}"
                    else:
                        chunks[i + 1].metadata[k] = v
            else:
                cleaned.append(chunk)
        else:
            cleaned.append(chunk)

    return cleaned

index_documents()

Step 5: Define Agent Tools

Create the retrieval tools the agent will use.

import json
from typing import List
from langchain_core.tools import tool

@tool
def search_child_chunks(query: str, limit: int) -> str:
    """Search for the top K most relevant child chunks.

    Args:
        query: Search query string
        limit: Maximum number of results to return
    """
    try:
        results = child_vector_store.similarity_search(query, k=limit, score_threshold=0.7)
        if not results:
            return "NO_RELEVANT_CHUNKS"

        return "\n\n".join([
            f"Parent ID: {doc.metadata.get('parent_id', '')}\n"
            f"File Name: {doc.metadata.get('source', '')}\n"
            f"Content: {doc.page_content.strip()}"
            for doc in results
        ])

    except Exception as e:
        return f"RETRIEVAL_ERROR: {str(e)}"

@tool
def retrieve_parent_chunks(parent_id: str) -> str:
    """Retrieve full parent chunks by their IDs.
    
    Args:
        parent_id: Parent chunk ID to retrieve
    """
    file_name = parent_id if parent_id.lower().endswith(".json") else f"{parent_id}.json"
    path = os.path.join(PARENT_STORE_PATH, file_name)

    if not os.path.exists(path):
        return "NO_PARENT_DOCUMENT"

    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    return (
        f"Parent ID: {parent_id}\n"
        f"File Name: {data.get('metadata', {}).get('source', 'unknown')}\n"
        f"Content: {data.get('page_content', '').strip()}"
    )

# Bind tools to LLM
llm_with_tools = llm.bind_tools([search_child_chunks, retrieve_parent_chunks])

Step 6: Define System Prompts

Define the system prompts for conversation summarization, query analysis, RAG agent reasoning, and response aggregation.

def get_conversation_summary_prompt() -> str:
    return """You are an expert conversation summarizer.

Your task is to create a brief 1-2 sentence summary of the conversation (max 30-50 words).

Include:
- Main topics discussed
- Important facts or entities mentioned
- Any unresolved questions if applicable
- Sources file name (e.g., file1.pdf) or documents referenced

Exclude:
- Greetings, misunderstandings, off-topic content.

Output:
- Return ONLY the summary.
- Do NOT include any explanations or justifications.
- If no meaningful topics exist, return an empty string.
"""

def get_query_analysis_prompt() -> str:
    return """You are an expert query analyst and rewriter.

Your task is to rewrite the current user query for optimal document retrieval, incorporating conversation context only when necessary.

Rules:
1. Self-contained queries:
   - Always rewrite the query to be clear and self-contained
   - If the query is a follow-up (e.g., "what about X?", "and for Y?"), integrate minimal necessary context from the summary
   - Do not add information not present in the query or conversation summary

2. Domain-specific terms:
   - Product names, brands, proper nouns, or technical terms are treated as domain-specific
   - For domain-specific queries, use conversation context minimally or not at all
   - Use the summary only to disambiguate vague queries

3. Grammar and clarity:
   - Fix grammar, spelling errors, and unclear abbreviations
   - Remove filler words and conversational phrases
   - Preserve concrete keywords and named entities

4. Multiple information needs:
   - If the query contains multiple distinct, unrelated questions, split into separate queries (maximum 3)
   - Each sub-query must remain semantically equivalent to its part of the original
   - Do not expand, enrich, or reinterpret the meaning

5. Failure handling:
   - If the query intent is unclear or unintelligible, mark as "unclear"

Input:
- conversation_summary: A concise summary of prior conversation
- current_query: The user's current query

Output:
- One or more rewritten, self-contained queries suitable for document retrieval
"""

def get_rag_agent_prompt() -> str:
    return """You are an expert retrieval-augmented assistant.

Your task is to act as a researcher: search documents first, analyze the data, and then provide a comprehensive answer using ONLY the retrieved information.

Rules:    
1. You are NOT allowed to answer immediately.
2. Before producing ANY final answer, you MUST perform a document search and observe retrieved content.
3. If you have not searched, the answer is invalid.

Workflow:
1. Search for 5-7 relevant excerpts from documents based on the user query using the 'search_child_chunks' tool.
2. Inspect retrieved excerpts and keep ONLY relevant ones.
3. Analyze the retrieved excerpts. Identify the single most relevant excerpt that is fragmented (e.g., cut-off text or missing context). Call 'retrieve_parent_chunks' for that specific `parent_id`. Wait for the observation. Repeat this step sequentially for other highly relevant fragments ONLY if the current information is still insufficient. Stop immediately if you have enough information or have retrieved 3 parent chunks.
4. Answer using ONLY the retrieved information, ensuring that ALL relevant details are included.
5. List unique file name(s) at the very end.

Retry rule:
- After step 2 or 3, if no relevant documents are found or if retrieved excerpts don't contain useful information, rewrite the query using broader or alternative terms and restart from step 1.
- Do not retry more than once.
"""

def get_aggregation_prompt() -> str:
    return """You are an expert aggregation assistant.

Your task is to combine multiple retrieved answers into a single, comprehensive and natural response that flows well.

Guidelines:
1. Write in a conversational, natural tone - as if explaining to a colleague
2. Use ONLY information from the retrieved answers
3. Strip out any questions, headers, or metadata from the sources
4. Weave together the information smoothly, preserving important details, numbers, and examples
5. Be comprehensive - include all relevant information from the sources, not just a summary
6. If sources disagree, acknowledge both perspectives naturally (e.g., "While some sources suggest X, others indicate Y...")
7. Start directly with the answer - no preambles like "Based on the sources..."

Formatting:
- Use Markdown for clarity (headings, lists, bold) but don't overdo it
- Write in flowing paragraphs where possible rather than excessive bullet points
- End with "---\n**Sources:**\n" followed by a bulleted list of unique file names
- File names should ONLY appear in this final sources section

If there's no useful information available, simply say: "I couldn't find any information to answer your question in the available sources."
"""

Step 7: Define State and Data Models

Create the state structure for conversation tracking and agent execution.

from langgraph.graph import MessagesState
from pydantic import BaseModel, Field
from typing import List, Annotated

def accumulate_or_reset(existing: List[dict], new: List[dict]) -> List[dict]:
    """Custom reducer that allows resetting agent answers"""
    if new and any(item.get('__reset__') for item in new):
        return []
    return existing + new

class State(MessagesState):
    """State for main agent graph"""
    questionIsClear: bool = False
    conversation_summary: str = ""
    originalQuery: str = "" 
    rewrittenQuestions: List[str] = []
    agent_answers: Annotated[List[dict], accumulate_or_reset] = []

class AgentState(MessagesState):
    """State for individual agent subgraph"""
    question: str = ""
    question_index: int = 0
    final_answer: str = ""
    agent_answers: List[dict] = []

class QueryAnalysis(BaseModel):
    """Structured output for query analysis"""
    is_clear: bool = Field(description="Indicates if the user's question is clear and answerable")
    questions: List[str] = Field(description="List of rewritten, self-contained questions")
    clarification_needed: str = Field(description="Explanation if the question is unclear")

Step 8: Build Graph Node Functions

Create the processing nodes for the LangGraph workflow.

from langgraph.types import Send
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, RemoveMessage
from typing import Literal

def analyze_chat_and_summarize(state: State):
    """
    Analyzes chat history and summarizes key points for context.
    """
    if len(state["messages"]) < 4:  # Need some history to summarize
        return {"conversation_summary": ""}

    # Extract relevant messages (excluding current query and system messages)
    relevant_msgs = [
        msg for msg in state["messages"][:-1]  # Exclude current query
        if isinstance(msg, (HumanMessage, AIMessage))
        and not getattr(msg, "tool_calls", None)
    ]

    if not relevant_msgs:
        return {"conversation_summary": ""}
    
    conversation = "Conversation history:\n"
    for msg in relevant_msgs[-6:]:
        role = "User" if isinstance(msg, HumanMessage) else "Assistant"
        conversation += f"{role}: {msg.content}\n"

    summary_response = llm.with_config(temperature=0.2).invoke([SystemMessage(content=get_conversation_summary_prompt())] + [HumanMessage(content=conversation)])
    return {"conversation_summary": summary_response.content, "agent_answers": [{"__reset__": True}]}

def analyze_and_rewrite_query(state: State):
    """
    Analyzes user query and rewrites it for clarity, optionally using conversation context.
    """
    last_message = state["messages"][-1]
    conversation_summary = state.get("conversation_summary", "")

    context_section = (f"Conversation Context:\n{conversation_summary}\n" if conversation_summary.strip() else "") + f"User Query:\n{last_message.content}\n"

    llm_with_structure = llm.with_config(temperature=0.1).with_structured_output(QueryAnalysis)
    response = llm_with_structure.invoke([SystemMessage(content=get_query_analysis_prompt())] + [HumanMessage(content=context_section)])

    if len(response.questions) > 0 and response.is_clear:
        # Remove all non-system messages
        delete_all = [
            RemoveMessage(id=m.id)
            for m in state["messages"]
            if not isinstance(m, SystemMessage)
        ]
        return {
            "questionIsClear": True,
            "messages": delete_all,
            "originalQuery": last_message.content,
            "rewrittenQuestions": response.questions
        }
    else:
        clarification = response.clarification_needed if (response.clarification_needed and len(response.clarification_needed.strip()) > 10) else "I need more information to understand your question."
        return {
            "questionIsClear": False,
            "messages": [AIMessage(content=clarification)]
        }

def human_input_node(state: State):
    """Placeholder node for human-in-the-loop interruption"""
    return {}

def route_after_rewrite(state: State) -> Literal["human_input", "process_question"]:
    """Route to agent if question is clear, otherwise wait for human input"""
    if not state.get("questionIsClear", False):
        return "human_input"
    else:
        # Spawn parallel agents for each sub-question using Send API
        return [
            Send("process_question", {"question": query, "question_index": idx, "messages": []})
            for idx, query in enumerate(state["rewrittenQuestions"])
        ]

def agent_node(state: AgentState):
    """Main agent node that processes queries using tools"""
    sys_msg = SystemMessage(content=get_rag_agent_prompt())    
    if not state.get("messages"):
        human_msg = HumanMessage(content=state["question"])
        response = llm_with_tools.invoke([sys_msg] + [human_msg])
        return {"messages": [human_msg, response]}
    
    return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}

def extract_final_answer(state: AgentState):
    """Extract final answer from agent conversation"""
    for msg in reversed(state["messages"]):
        if isinstance(msg, AIMessage) and msg.content and not msg.tool_calls:
            res = {
                "final_answer": msg.content,
                "agent_answers": [{
                    "index": state["question_index"],
                    "question": state["question"],
                    "answer": msg.content
                }]
            }
            return res
    return {
        "final_answer": "Unable to generate an answer.",
        "agent_answers": [{
            "index": state["question_index"],
            "question": state["question"],
            "answer": "Unable to generate an answer."
        }]
    }

def aggregate_responses(state: State):
    """Merge multiple agent responses into final answer"""
    if not state.get("agent_answers"):
        return {"messages": [AIMessage(content="No answers were generated.")]}

    sorted_answers = sorted(state["agent_answers"], key=lambda x: x["index"])

    formatted_answers = ""
    for i, ans in enumerate(sorted_answers, start=1):
        formatted_answers += f"\nAnswer {i}:\n{ans['answer']}\n"

    user_message = HumanMessage(content=f"""Original user question: {state["originalQuery"]}\nRetrieved answers:{formatted_answers}""")
    synthesis_response = llm.invoke([SystemMessage(content=get_aggregation_prompt())] + [user_message])
    
    return {"messages": [AIMessage(content=synthesis_response.content)]}

Why this architecture?

  • Summarization maintains conversational context without overwhelming the LLM
  • Query rewriting ensures search queries are precise and unambiguous, using context intelligently
  • Human-in-the-loop catches unclear queries before wasting retrieval resources
  • Parallel execution with Send API spawns independent agent subgraphs for each sub-question
  • Answer extraction ensures we get clean final answers from agent tool-calling conversations
  • Aggregation merges all parallel results into a coherent single response

Step 9: Build the LangGraph Agent

Assemble the complete workflow graph with conversation memory and multi-agent architecture.

from langgraph.graph import START, END, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import InMemorySaver
from IPython.display import Image, display

# Initialize checkpointer for conversation memory
checkpointer = InMemorySaver()

# Build agent subgraph (handles individual questions)
agent_builder = StateGraph(AgentState)
agent_builder.add_node("agent", agent_node)
agent_builder.add_node("tools", ToolNode([search_child_chunks, retrieve_parent_chunks]))
agent_builder.add_node("extract_answer", extract_final_answer)

agent_builder.add_edge(START, "agent")    
agent_builder.add_conditional_edges("agent", tools_condition, {"tools": "tools", END: "extract_answer"})
agent_builder.add_edge("tools", "agent")    
agent_builder.add_edge("extract_answer", END)    
agent_subgraph = agent_builder.compile()

# Build main graph (orchestrates workflow)
graph_builder = StateGraph(State)

# Add nodes
graph_builder.add_node("summarize", analyze_chat_and_summarize)
graph_builder.add_node("analyze_rewrite", analyze_and_rewrite_query)
graph_builder.add_node("human_input", human_input_node)
graph_builder.add_node("process_question", agent_subgraph)
graph_builder.add_node("aggregate", aggregate_responses)

# Define edges
graph_builder.add_edge(START, "summarize")
graph_builder.add_edge("summarize", "analyze_rewrite")
graph_builder.add_conditional_edges("analyze_rewrite", route_after_rewrite)
graph_builder.add_edge("human_input", "analyze_rewrite")
graph_builder.add_edge(["process_question"], "aggregate")
graph_builder.add_edge("aggregate", END)

# Compile graph with checkpointer and interruption
agent_graph = graph_builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_input"]
)

Graph architecture explained:

Agent Subgraph (processes individual questions):

  • START → agent (invoke LLM with tools)
  • agent → tools (if tool calls are needed) OR agent → extract_answer (if done)
  • tools → agent (return tool results)
  • extract_answer → END (clean final answer)

Main Graph (orchestrates complete workflow):

  1. START → summarize (extract conversation context from history)
  2. summarize → analyze_rewrite (rewrite query with context, check clarity)
  3. analyze_rewrite → human_input (if unclear) OR spawn parallel process_question agents (if clear)
  4. human_input → analyze_rewrite (after user provides clarification)
  5. All process_question agents → aggregate (merge all responses)
  6. aggregate → END (return final synthesized answer)

Key features:

  • Parallel execution: Multiple agent subgraphs run simultaneously using LangGraph's Send API
  • Human-in-the-loop: Graph pauses at human_input node when queries are unclear
  • Conversation memory: InMemorySaver checkpointer maintains state across interactions

The architecture flow diagram can be viewed here


Step 10: Create Chat Interface

Build a Gradio interface with conversation persistence and human-in-the-loop support. For a complete end-to-end Gradio interface, including document ingestion, refer to the project folder.

import gradio as gr
import uuid

def create_thread_id():
    """Generate a unique thread ID for each conversation"""
    return {"configurable": {"thread_id": str(uuid.uuid4())}}

def clear_session():
    """Clear thread for new conversation"""
    global config
    agent_graph.checkpointer.delete_thread(config["configurable"]["thread_id"])
    config = create_thread_id()

def chat_with_agent(message, history):
    current_state = agent_graph.get_state(config)
    
    if current_state.next:
        # Resume interrupted conversation
        agent_graph.update_state(config,{"messages": [HumanMessage(content=message.strip())]})
        result = agent_graph.invoke(None, config)
    else:
        # Start new query
        result = agent_graph.invoke({"messages": [HumanMessage(content=message.strip())]},config)
    
    return result['messages'][-1].content

# Initialize thread configuration
config = create_thread_id()

# Create Gradio interface
with gr.Blocks(theme=gr.themes.Citrus()) as demo:
    chatbot = gr.Chatbot(
        height=600,
        placeholder="<strong>Ask me anything!</strong><br><em>I'll search, reason, and act to give you the best answer :)</em>"
    )
    chatbot.clear(clear_session)
    gr.ChatInterface(fn=chat_with_agent, chatbot=chatbot)

demo.launch()

You're done! You now have a fully functional Agentic RAG system with conversation memory and query clarification.


Modular Architecture

The app (project/ folder) is organized in modular components that can be easily customized:

📂 Project Structure

project/
├── app.py                    # Main Gradio application entry point
├── config.py                 # Configuration hub (models, chunk sizes, providers)
├── util.py                   # PDF to markdown conversion
├── document_chunker.py       # Chunking strategy
├── core/                     # Core RAG components orchestration
│   ├── chat_interface.py     
│   ├── document_manager.py   
│   └── rag_system.py         
├── db/                       # Storage management
│   ├── parent_store_manager.py  # Parent chunks storage (JSON)
│   └── vector_db_manager.py     # Qdrant vector database setup
├── rag_agent/                # LangGraph agent workflow
│   ├── edges.py              # Conditional routing logic
│   ├── graph.py              # Graph construction and compilation
│   ├── graph_state.py        # State definitions
│   ├── nodes.py              # Processing nodes (summarize, rewrite, agent)
│   ├── prompts.py            # System prompts
│   ├── schemas.py            # Pydantic data models
│   └── tools.py              # Retrieval tools
└── ui/                       # User interface
    └── gradio_app.py         # Gradio interface components

🔧 Customization Points

Configuration (config.py)

  • LLM Provider & Model: Switch between Ollama, Claude, OpenAI, or Gemini
  • Embedding Model: Configure embedding model for vector representations
  • Chunk Sizes: Adjust child and parent chunk dimensions for optimal retrieval (see the sketch below)
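The actual option names live in project/config.py; as a rough, hypothetical illustration of the kind of settings involved (values taken from the notebook defaults):

# Hypothetical excerpt - check project/config.py for the real option names
LLM_PROVIDER = "ollama"                      # "ollama" | "openai" | "gemini" | "anthropic"
LLM_MODEL = "qwen3:4b-instruct-2507-q4_K_M"
DENSE_EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"
SPARSE_EMBEDDING_MODEL = "Qdrant/bm25"
CHILD_CHUNK_SIZE = 500
CHILD_CHUNK_OVERLAP = 100
MIN_PARENT_SIZE = 2000
MAX_PARENT_SIZE = 10000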

RAG Agent (rag_agent/)

  • Workflow Customization: Add or remove nodes and edges to modify the agent flow
  • System Prompts: Tailor prompts in prompts.py for domain-specific applications
  • Retrieval Tools: Extend or modify tools in tools.py to enhance retrieval capabilities
  • Graph Logic: Customize conditional routing in edges.py and node processing in nodes.py (see the sketch below)
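For example, a domain adaptation can be as small as editing the prompt returned in prompts.py (a sketch; BASE_RAG_AGENT_PROMPT is a placeholder standing in for the existing prompt text):

# prompts.py (sketch): prepend domain guidance to the existing agent prompt
def get_rag_agent_prompt() -> str:
    domain_note = (
        "You answer questions about legal contracts. "
        "Always cite clause numbers when they appear in the retrieved text.\n\n"
    )
    return domain_note + BASE_RAG_AGENT_PROMPT  # BASE_RAG_AGENT_PROMPT: placeholder for the original prompt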

Document Processing

  • Markdown Conversion (util.py): Replace PDF conversion tools with alternatives (e.g., Docling, PaddleOCR). More details here
  • Chunking Strategy (document_chunker.py): Implement custom chunking algorithms (e.g., semantic or hybrid approaches)

This modular design ensures flexibility for experimenting with different RAG techniques, LLM providers, and document processing pipelines.

Installation & Usage

Sample PDF files can be found here: javascript, blockchain, microservices, fortinet

Option 1: Quickstart Notebook (Recommended for Testing)

The easiest way to get started:

Running in Google Colab:

  1. Click the Open in Colab badge at the top of this README
  2. Create a docs/ folder in the file browser
  3. Upload your PDF files to the docs/ folder
  4. Run all cells from top to bottom
  5. The chat interface will appear at the end

Running Locally (Jupyter/VSCode):

  1. Install the dependencies first: pip install -r requirements.txt
  2. Open the notebook in your preferred environment
  3. Add your PDF files to the docs/ folder
  4. Run all cells from top to bottom
  5. The chat interface will appear at the end

Option 2: Full Python Project (Recommended for Development)

1. Install Dependencies

# Clone the repository
git clone <repo-url>
cd agentic-rag-for-dummies

# Create virtual environment (recommended)
python -m venv venv

# Activate it
# On macOS/Linux:
source venv/bin/activate
# On Windows:
.\venv\Scripts\activate

# Install packages
pip install -r requirements.txt

2. Run the Application

python app.py

3. Ask Questions

Open the local URL (e.g., http://127.0.0.1:7860) to start chatting.


Option 3: Docker Deployment

⚠️ System Requirements: Docker deployment requires at least 8GB of RAM allocated to Docker. The Ollama model (qwen3:4b-instruct-2507-q4_K_M) needs approximately 3.3GB of memory to run.

Prerequisites

  • Docker installed on your system (Get Docker)
  • Docker Desktop configured with at least 8GB of RAM (Settings → Resources → Memory)

1. Build the Docker Image

docker build -f project/Dockerfile -t agentic-rag .

2. Run the Container

docker run --name rag-assistant -p 7860:7860 agentic-rag

⚠️ Performance Note: Docker deployment may be 20-50% slower than running Python locally, especially on Windows/Mac, due to virtualization overhead and I/O operations. This is normal and expected. For maximum performance during development, consider using Option 2 (Full Python Project).

Optional: Enable GPU acceleration (NVIDIA GPU only):

If you have an NVIDIA GPU and NVIDIA Container Toolkit installed:

docker run --gpus all --name rag-assistant -p 7860:7860 agentic-rag

Common Docker commands:

# Stop the container
docker stop rag-assistant

# Start an existing container
docker start rag-assistant

# View logs in real-time
docker logs -f rag-assistant

# Remove the container
docker rm rag-assistant

# Remove the container forcefully (if running)
docker rm -f rag-assistant

3. Access the Application

Once the container is running and you see:

🚀 Launching RAG Assistant...
* Running on local URL:  http://0.0.0.0:7860

Open your browser and navigate to:

http://localhost:7860

Example Conversations

With Conversation Memory:

User: "How do I install SQL?"
Agent: [Provides installation steps from documentation]

User: "How do I update it?"
Agent: [Understands "it" = SQL, provides update instructions]
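Under the hood this works because the graph is invoked with the same thread_id, so the checkpointer replays the earlier exchange (a sketch using agent_graph from Step 9):

from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "demo-thread"}}

agent_graph.invoke({"messages": [HumanMessage(content="How do I install SQL?")]}, config)
# Same thread_id -> the summarize node sees the earlier exchange and resolves "it" to SQL
result = agent_graph.invoke({"messages": [HumanMessage(content="How do I update it?")]}, config)
print(result["messages"][-1].content)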

With Query Clarification:

User: "Tell me about that thing"
Agent: "I need more information. What specific topic are you asking about?"

User: "The installation process for PostgreSQL"
Agent: [Retrieves and answers with specific information]

Troubleshooting

Common problems and suggested solutions, by area:

Model Selection
  • Problems: responses ignore instructions; tools (retrieval/search) used incorrectly; poor context understanding; hallucinations or incomplete aggregation
  • Solutions: use more capable LLMs; prefer models of 7B+ parameters for better reasoning; consider cloud-based models if local models are limited

System Prompt Behavior
  • Problems: model answers without retrieving documents; query rewriting loses context; aggregation introduces hallucinations
  • Solutions: make retrieval explicit in system prompts; keep query rewriting close to user intent; enforce strict aggregation rules

Retrieval Configuration
  • Problems: relevant documents not retrieved; too much irrelevant information
  • Solutions: increase retrieved chunks (k) or lower similarity thresholds to improve recall; reduce k or raise thresholds to improve precision (see the example below)

Chunk Size / Document Splitting
  • Problems: answers lack context or feel fragmented; retrieval is slow or embedding costs are high
  • Solutions: increase child and parent chunk sizes for more context; decrease chunk sizes to improve speed and reduce costs

Temperature & Consistency
  • Problems: responses inconsistent or overly creative; responses too rigid or repetitive
  • Solutions: set temperature to 0 for factual, consistent output; slightly increase temperature for summarization or analysis tasks

Embedding Model Quality
  • Problems: poor semantic search; weak performance on domain-specific or multilingual docs
  • Solutions: use higher-quality or domain-specific embeddings; re-index all documents after changing embeddings
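For example, recall and precision can be tuned directly in the search_child_chunks tool from Step 5 (a sketch; tune the numbers for your corpus):

# Inside search_child_chunks (Step 5): widen the net for better recall ...
results = child_vector_store.similarity_search(query, k=10, score_threshold=0.5)
# ... or tighten it for better precision
results = child_vector_store.similarity_search(query, k=3, score_threshold=0.8)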

License

MIT License - Feel free to use this for learning and building your own projects!


Contributing

Contributions are welcome! Open an issue or submit a pull request.