Python Forum
RAG pipeline returns wrong page citations and occasional hallucinations
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
RAG pipeline returns wrong page citations and occasional hallucinations
#1
I built a RAG pipeline using LangGraph with the following flow:

rewrite → hybrid retrieve → cross-encoder rerank → parent expansion → grounded generation

The system enforces strict grounding (returns a fallback message if no relevant context is found) and requires inline citations like:

[file.pdf, p. 123]

Problem
Retrieval doesn't work very well; I’m facing several issues:

Wrong page citations

The model often uses the correct information but cites the wrong page.

Example: answer says [file.pdf, p. 71] but the UI shows a completely different page.

Mismatch between cited pages and rendered sources

The sources shown in the UI don’t match the pages referenced in the answer.
Occasional hallucinations / degeneration

The model sometimes starts repeating a word until the end of the response.
Current setup (simplified)
Hybrid retrieval (vector + keyword)

Cross-encoder reranking (ms-marco style)

Parent-child document structure

Context built from parent documents, but citations come from child chunks

Strict prompting: “use only context or return NOT_FOUND”

Question
What are best practices to:

Ensure correct and stable citations (no wrong page numbers)?

Avoid mismatch between generated citations and UI-rendered documents?

Reduce hallucinations and repetition loops in grounded RAG systems?
How can I improve my retrieval and ensure better-quality answers?

I’ve included my full rag_graph.py below.

Any architectural or practical suggestions are appreciated.
"""
RAG pipeline LangGraph.

Pipeline:
  rewrite → retrieve (hybrid) → rerank (cross-encoder)
         → expand_to_parents → generate (grounded)


"""

from __future__ import annotations

import logging
import re
from typing import Optional, TypedDict, Any

from langchain_core.documents import Document
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_ollama import ChatOllama
from langgraph.graph import StateGraph, END

from config import LLM_MODEL, OLLAMA_BASE_URL
from modules.vector_store import NotebookVectorStore
from modules.parent_store import ParentStore

# Module-level logger used by every pipeline node in this file.
logger = logging.getLogger(__name__)

# Fallback answer (Slovak for "I did not find this information in the uploaded
# documents."). Nodes return it whenever retrieval, reranking or generation
# produces nothing usable, and the system prompt tells the model to emit it too.
NOT_FOUND_MSG = "Túto informáciu som v nahraných dokumentoch nenašiel."

# ── Pipeline parameters ──────────────────────────────────────────────────────
RERANKER_MODEL      = "cross-encoder/mmarco-mMiniLMv2-L12-H384-v1"  # model name handed to CrossEncoder
INITIAL_K           = 40       # candidates requested from hybrid retrieval
RERANK_KEEP_K       = 10        # top reranked candidates considered further
MAX_CONTEXT_CHARS   = 9000     # character budget for the assembled context
MAX_PARENTS         = 6        # upper limit on parent documents fetched for the context
# Reranked candidates must score strictly above this value to be kept.
MIN_RERANK_SCORE    = -4

# ── Reranker singleton ───────────────────────────────────────────────────────
# Cache slot filled by get_reranker() on its first call; None until then.
_RERANKER = None


def get_reranker():
    """Return the shared cross-encoder reranker, creating it on first use.

    The instance is cached in the module-level ``_RERANKER``; later calls hand
    back the cached object. The model runs on CUDA when ``torch`` imports and
    reports an available GPU, and on the CPU otherwise.
    """
    global _RERANKER
    if _RERANKER is not None:
        return _RERANKER

    # Imported lazily so the heavy dependency is only loaded when reranking runs.
    from sentence_transformers import CrossEncoder

    device = "cpu"
    try:
        import torch
        if torch.cuda.is_available():
            device = "cuda"
    except Exception:
        # Any problem probing torch means we stay on the CPU.
        device = "cpu"

    logger.info(f"Načítavam reranker: {RERANKER_MODEL} na {device}")
    _RERANKER = CrossEncoder(RERANKER_MODEL, device=device, max_length=512)
    return _RERANKER


# ── Deiktiká pre query rewriting ─────────────────────────────────────────────
_DEICTIC_PATTERNS = [
    r"\ba (čo|aký|aká|ako|kedy|prečo|potom|ďalej|ten|tá|to|teda)\b",
    r"\b(ten|tá|to|tie|toto|túto|tomto|týmto) ",
    r"\b(vysvetli|rozveď|podrobnejšie|viac|ešte)\b",
    r"\b(predchádzajúc|predošl|prvý|druhý|tretí|ďalší|ďalšia)\b",
]
_DEICTIC_RE = re.compile("|".join(_DEICTIC_PATTERNS), re.IGNORECASE)


def _needs_rewrite(question: str) -> bool:
    q = question.strip()
    if len(q.split()) < 4:
        return True
    return bool(_DEICTIC_RE.search(q))


# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                           RAGState                                       ║
# ╚══════════════════════════════════════════════════════════════════════════╝
class RAGState(TypedDict, total=False):
    """State dictionary passed between the LangGraph nodes of the pipeline.

    ``total=False`` makes every key optional, so each node returns only the
    keys it sets.
    """
    # Raw question and prior messages, supplied by RAGGraph.query().
    question: str
    # Message dicts; the rewrite node reads their "role" and "content" keys.
    chat_history: list[dict]
    # Question after optional rewriting; set by the rewrite node.
    standalone_question: str
    # (document, score) pairs returned by the hybrid search.
    retrieved: list[tuple[Document, float]]
    # (document, score) pairs kept after reranking and thresholding.
    reranked: list[tuple[Document, float]]
    # Documents placed into the generation context, and their formatted text.
    context_docs: list[Document]
    context_text: str
    # Final answer text (or the not-found / error message).
    answer: str
    # Documents handed back to the caller as sources for the answer.
    source_docs: list[Document]
    # Diagnostic information written by the retrieve and rerank nodes.
    retrieval_debug: dict


# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                           RAGGraph                                       ║
# ╚══════════════════════════════════════════════════════════════════════════╝
class RAGGraph:
    """Hlavná RAG trieda — LangGraph pipeline s parent/child retrievalom."""

    def __init__(self, vector_store: NotebookVectorStore, parent_store: ParentStore):
        """Store the backends, create both chat models and compile the graph.

        Args:
            vector_store: Store queried by the retrieve node (``has_documents``
                and ``hybrid_search`` are called on it).
            parent_store: Store consulted by ``_fetch_parent`` when child
                chunks are expanded to parent documents.
        """
        self.vs = vector_store
        self.ps = parent_store

        # Both chat models use the same Ollama endpoint and model name; only
        # the sampling and size options differ.
        endpoint = {"model": LLM_MODEL, "base_url": OLLAMA_BASE_URL}

        # Answer generator: low temperature for factual output.
        self.llm = ChatOllama(
            **endpoint, temperature=0.1, num_predict=1024, num_ctx=8192,
        )
        # Query rewriter: deterministic, short outputs, smaller context window.
        self.rewriter_llm = ChatOllama(
            **endpoint, temperature=0.0, num_predict=150, num_ctx=2048,
        )

        self.graph = self._build_graph()

    # ─── Build graph ─────────────────────────────────────────────────────────
    def _build_graph(self):
        """Assemble and compile the LangGraph state machine.

        Flow: rewrite → retrieve → rerank → expand → generate. The retrieve and
        rerank nodes end the run early when their result list (``retrieved`` /
        ``reranked``) is missing or empty.

        Returns:
            The compiled graph produced by ``StateGraph.compile()``.
        """
        def route_on(key: str):
            # Router factory: "ok" when state[key] is truthy, "empty" otherwise.
            def router(state):
                return "ok" if state.get(key) else "empty"
            return router

        builder = StateGraph(RAGState)
        node_handlers = (
            ("rewrite", self._rewrite_node),
            ("retrieve", self._retrieve_node),
            ("rerank", self._rerank_node),
            ("expand", self._expand_node),
            ("generate", self._generate_node),
        )
        for node_name, handler in node_handlers:
            builder.add_node(node_name, handler)

        builder.set_entry_point("rewrite")
        builder.add_edge("rewrite", "retrieve")
        builder.add_conditional_edges(
            "retrieve", route_on("retrieved"), {"empty": END, "ok": "rerank"},
        )
        builder.add_conditional_edges(
            "rerank", route_on("reranked"), {"empty": END, "ok": "expand"},
        )
        builder.add_edge("expand", "generate")
        builder.add_edge("generate", END)

        return builder.compile()

    # ─── Node: rewrite ───────────────────────────────────────────────────────
    def _rewrite_node(self, state: RAGState) -> dict:
        question = state["question"]
        history  = state.get("chat_history") or []

        # no history  → skip
        if not history or not _needs_rewrite(question):
            return {"standalone_question": question}

        # Posledné 4 správy ako kontext
        recent = history[-4:]
        convo = "\n".join(
            f"{'Študent' if m.get('role') == 'user' else 'Asistent'}: {m.get('content','')}"
            for m in recent
        )

        prompt = (
            "Daná je konverzácia a posledná otázka študenta. Ak otázka odkazuje na "
            "predchádzajúci kontext (napr. 'a čo to druhé?', 'vysvetli to'), prepíš ju "
            "ako samostatnú, úplnú otázku v slovenčine. Ak je už samostatná, vráť ju nezmenenú.\n"
            "VRÁŤ IBA prepísanú otázku. Žiadne úvody, žiadne vysvetlenia, žiadne úvodzovky.\n\n"
            f"KONVERZÁCIA:\n{convo}\n\n"
            f"POSLEDNÁ OTÁZKA: {question}\n\n"
            "SAMOSTATNÁ OTÁZKA:"
        )

        try:
            resp = self.rewriter_llm.invoke([HumanMessage(content=prompt)])
            rewritten = resp.content.strip().strip('"').strip("'").strip()
            # Odstráň prípadný prefix typu "Samostatná otázka: ..."
            rewritten = re.sub(r"^(samostatn[aá]?\s*ot[áa]zka[:\-]?\s*)", "", rewritten, flags=re.I)
            if 5 < len(rewritten) < 400:
                logger.info(f"Rewrite: {question!r} → {rewritten!r}")
                return {"standalone_question": rewritten}
        except Exception as e:
            logger.warning(f"Rewrite zlyhal: {e}")

        return {"standalone_question": question}

    # ─── Node: hybrid retrieve ───────────────────────────────────────────────
    def _retrieve_node(self, state: RAGState) -> dict:
        """Fetch up to ``INITIAL_K`` candidates via the store's hybrid search.

        Uses the standalone question when present, otherwise the raw question.

        Returns:
            ``{"retrieved": results}`` on success. When the store reports no
            documents or the search returns nothing, a not-found payload with an
            empty ``retrieved`` list, the fallback answer and a debug note.
        """
        query = state.get("standalone_question") or state["question"]

        def not_found(note: str) -> dict:
            # Payload shared by both "nothing to retrieve" outcomes.
            return {
                "retrieved": [],
                "answer": NOT_FOUND_MSG,
                "source_docs": [],
                "retrieval_debug": {"query": query, "note": note},
            }

        if not self.vs.has_documents():
            logger.info("Retrieve: vector store je prázdny.")
            return not_found("prázdny index")

        candidates = self.vs.hybrid_search(query, k=INITIAL_K)
        logger.info(f"Retrieve: {len(candidates)} kandidátov pre {query!r}")

        if candidates:
            return {"retrieved": candidates}
        return not_found("hybrid search 0 výsledkov")

    # ─── Node: rerank ────────────────────────────────────────────────────────
    def _rerank_node(self, state: RAGState) -> dict:
        """Re-score retrieved candidates with the cross-encoder and keep the best.

        Candidates are sorted by score (descending), cut to ``RERANK_KEEP_K`` and
        filtered to those scoring strictly above ``MIN_RERANK_SCORE``.

        If the reranker cannot be loaded or its prediction fails, the scores
        that came with the retrieved candidates are used instead, so the
        pipeline degrades rather than aborts.

        Returns:
            On success ``{"reranked": [(doc, score), ...], "retrieval_debug": {...}}``.
            When nothing is retrieved or nothing passes the threshold, a
            not-found payload with an empty ``reranked`` list.
        """
        query   = state.get("standalone_question") or state["question"]
        results = state.get("retrieved", [])

        if not results:
            return {"reranked": [], "answer": NOT_FOUND_MSG, "source_docs": []}

        docs = [doc for doc, _ in results]

        try:
            # Loading the model sits inside the try as well: a missing package
            # or failed model download must fall back to the retrieval scores
            # just like a failed prediction does.
            reranker = get_reranker()
            pairs = [(query, d.page_content) for d in docs]
            raw_scores = reranker.predict(pairs, show_progress_bar=False, batch_size=16)
            scores = [float(s) for s in raw_scores]
        except Exception as e:
            logger.error(f"Reranker zlyhal: {e}")
            # Fallback: scores supplied by the hybrid search.
            # NOTE(review): MIN_RERANK_SCORE below is still applied to these
            # scores; confirm it suits the hybrid score scale.
            scores = [float(s) for _, s in results]

        scored = sorted(zip(docs, scores), key=lambda pair: pair[1], reverse=True)

        # Drop weak candidates.
        kept = [(d, s) for d, s in scored[:RERANK_KEEP_K] if s > MIN_RERANK_SCORE]
        top_raw = [round(s, 3) for _, s in scored[:5]]

        logger.info(f"Rerank: kept={len(kept)} / {len(scored)}; top_raw={top_raw}")

        if not kept:
            return {
                "reranked": [],
                "answer": NOT_FOUND_MSG,
                "source_docs": [],
                "retrieval_debug": {
                    "query": query,
                    "note": f"žiadny kandidát nad prahom {MIN_RERANK_SCORE}",
                    "top_raw_scores": top_raw,
                },
            }

        return {
            "reranked": kept,
            "retrieval_debug": {
                "query": query,
                "initial_retrieved": len(results),
                "after_rerank": len(kept),
                "top_scores": [round(s, 3) for _, s in kept],
            },
        }

    # ─── Node: parent expansion ──────────────────────────────────────────────
    def _expand_node(self, state: RAGState) -> dict:
        reranked = state.get("reranked", [])
        if not reranked:
            return {"context_docs": [], "context_text": "", "source_docs": []}

        # 1) Pokús sa rozšíriť na parentov (ak ParentStore ponúka `get`)
        parent_order: list[str] = []
        seen: set[str] = set()
        for doc, _ in reranked:
            pid = doc.metadata.get("parent_id")
            if pid and pid not in seen:
                seen.add(pid)
                parent_order.append(pid)

        parents: list[Document] = []
        for pid in parent_order[:MAX_PARENTS]:
            p = self._fetch_parent(pid)
            if p is not None:
                parents.append(p)

        # 2) Ak parents nie sú dostupné, použi rerankované child chunky
        context_docs = parents if parents else [d for d, _ in reranked[:RERANK_KEEP_K]]

        # 3) Rozpočet znakov
        limited: list[Document] = []
        total = 0
        for d in context_docs:
            L = len(d.page_content)
            if limited and total + L > MAX_CONTEXT_CHARS:
                break
            limited.append(d)
            total += L

        # 4) source_docs pre UI = child chunky (majú presné čísla strán + images)
        source_docs = [d for d, _ in reranked[:RERANK_KEEP_K]]
        context_text = self._format_context(limited)

        logger.info(f"Kontext: {len(limited)} docs, ~{total} znakov, "
                    f"{'parenti' if parents else 'childovia'}")

        return {
            "context_docs": limited,
            "context_text": context_text,
            "source_docs":  source_docs,
        }

    def _fetch_parent(self, parent_id: str) -> Optional[Document]:
        """Robustne skúsi rôzne rozhrania ParentStore."""
        if not parent_id or self.ps is None:
            return None
        # Skúsi `get`, `fetch`, `mget`, `__getitem__`
        for method_name in ("get", "fetch"):
            fn = getattr(self.ps, method_name, None)
            if callable(fn):
                try:
                    r = fn(parent_id)
                    if isinstance(r, Document):
                        return r
                    if isinstance(r, list) and r and isinstance(r[0], Document):
                        return r[0]
                except Exception:
                    continue
        # mget (langchain storage interface)
        mget = getattr(self.ps, "mget", None)
        if callable(mget):
            try:
                rs = mget([parent_id])
                if rs and rs[0] is not None:
                    r = rs[0]
                    return r if isinstance(r, Document) else None
            except Exception:
                pass
        return None

    # ─── Node: generate ──────────────────────────────────────────────────────
    def _generate_node(self, state: RAGState) -> dict:
        """Generate the grounded answer from the prepared context.

        Returns the fallback message (with no sources) when the context is
        blank or the model's reply looks like a refusal, an error message when
        the model call fails, and otherwise the answer together with the source
        documents selected by ``_filter_cited_sources``.
        """
        context_docs = state.get("context_docs", [])
        context_text = state.get("context_text", "")
        original_question = state["question"]
        standalone = state.get("standalone_question") or original_question

        if not context_text.strip():
            return {"answer": NOT_FOUND_MSG, "source_docs": []}

        # Distinct, non-empty source names of the documents in the context.
        source_names = set()
        for doc in context_docs:
            name = doc.metadata.get("source", "")
            if name:
                source_names.add(name)

        system_text = self._system_prompt(sorted(source_names))
        user_text = self._user_prompt(standalone, context_text)

        try:
            reply = self.llm.invoke([
                SystemMessage(content=system_text),
                HumanMessage(content=user_text),
            ])
            answer = reply.content.strip()
        except Exception as e:
            logger.error(f"LLM zlyhal: {e}")
            return {"answer": f"⚠️ Chyba pri generovaní: {e}", "source_docs": []}

        if self._looks_like_refusal(answer):
            logger.info("Model sám priznal neznalosť → NOT_FOUND_MSG")
            return {"answer": NOT_FOUND_MSG, "source_docs": []}

        candidates = state.get("source_docs", [])
        return {
            "answer": answer,
            "source_docs": self._filter_cited_sources(answer, candidates),
        }

    # ─── Prompty ─────────────────────────────────────────────────────────────
    @staticmethod
    def _system_prompt(available_sources: list[str]) -> str:
        """Compose the (Slovak) system prompt with the grounding rules.

        Args:
            available_sources: File names the model is allowed to cite; when
                non-empty they are listed in the prompt as bullet points.

        Returns:
            Introduction, optional source list, then the numbered rules.
        """
        intro = (
            "Si študijný asistent pre vysokoškolských študentov. Odpovedáš VÝHRADNE "
            "na základe zdrojov poskytnutých v sekcii KONTEXT. Si vecný, presný a pedagogický.\n\n"
        )

        # Explicit list of citable files; omitted entirely when none are known.
        src_block = ""
        if available_sources:
            bullets = "\n".join(f"  • {name}" for name in available_sources)
            src_block = (
                f"DOSTUPNÉ ZDROJE (existujú IBA tieto súbory — žiadne iné):\n{bullets}\n\n"
            )

        rules = (
            "━━━━━━━━━━━━━━ PRAVIDLÁ (DODRŽIAVAJ PRÍSNE) ━━━━━━━━━━━━━━\n",
            "1. Používaj IBA informácie z KONTEXTU. NIKDY nedopĺňaj vlastné znalosti.\n",
            f"2. Ak odpoveď v KONTEXTE NIE JE, vráť PRESNE: \"{NOT_FOUND_MSG}\"\n",
            "3. CITÁCIE — KRITICKY DÔLEŽITÉ:\n",
            "   • Cituj PRESNE v hranatých zátvorkách s NÁZVOM SÚBORU a číslom strany:\n",
            "     [názov_súboru.pdf, s. 282]\n",
            "   • Názov súboru musí byť PRESNE ten zo zoznamu DOSTUPNÝCH ZDROJOV.\n",
            "   • NIKDY nepoužívaj čísla zdrojov ako [1, s. X], [2, s. X], [3, s. X].\n",
            "   • NIKDY nevymýšľaj súbory, ktoré nie sú v zozname vyššie.\n",
            "   • Každé faktografické tvrdenie má mať citáciu priamo za vetou.\n",
            "4. MATEMATIKU PÍŠ V LATEXu:\n",
            "   • inline: $x^2 + y^2 = r^2$\n",
            "   • samostatne: $$\\sigma^2 = \\frac{1}{n-1}\\sum_{i=1}^{n}(x_i - \\bar{x})^2$$\n",
            "   • NIKDY nepíš prázdne $$ $$ alebo samostatné ť/kódy — ak vzorec nemáš, vynechaj ho.\n",
            "5. Odpovedaj v SLOVENČINE. Odborné EN termíny v zátvorke: replikácia (replication).\n",
            "6. Ak sú zdroje protichodné, uveď oba pohľady s citáciami.\n",
            "7. Žiadne frázy 'všeobecne', 'typicky', 'zvyčajne', pokiaľ to nie je v KONTEXTE.",
        )

        return intro + src_block + "".join(rules)

    def _user_prompt(self, question: str, context: str) -> str:
        return (
            "KONTEXT — JEDINÝ zdroj, z ktorého smieš čerpať (každý úryvok má svoj názov súboru a stranu):\n"
            "═══════════════════════════════════════════════\n"
            f"{context}\n"
            "═══════════════════════════════════════════════\n\n"
            f"OTÁZKA ŠTUDENTA: {question}\n\n"
            "Odpoveď v slovenčine s citáciami presne podľa vzoru [súbor.pdf, s. X] "
            "a LaTeX vzorcami. Cituj iba reálne názvy súborov z KONTEXTU:"
        )

    @staticmethod
    def _format_context(docs: list[Document]) -> str:
        """
        Format:  [name of file, page. X].
        .
        """
        blocks = []
        for d in docs:
            src  = d.metadata.get("source", "neznámy_zdroj")
            page = d.metadata.get("page", "?")
            blocks.append(
                f"━━━ [{src}, s. {page}] ━━━\n"
                f"{d.page_content.strip()}"
            )
        return "\n\n".join(blocks)

    # ─── Post-processing helpers ─────────────────────────────────────────────
    @staticmethod
    def _looks_like_refusal(answer: str) -> bool:
        """Detect a short free-form "I don't know" reply from the model.

        Returns ``False`` when the answer already contains ``NOT_FOUND_MSG`` or
        is 300 characters or longer; otherwise ``True`` if any known refusal
        phrase occurs in the lower-cased answer.
        """
        # An answer that already carries the canonical message, or a long one,
        # is never reclassified.
        if NOT_FOUND_MSG in answer or len(answer) >= 300:
            return False

        refusal_phrases = (
            "nie je uvedené v dokumentoch",
            "v dokumentoch som nenašiel",
            "v zdrojoch nie je",
            "v kontexte sa nenachádza",
            "nemám k dispozícii informácie",
            "v poskytnutých zdrojoch nie",
            "nenašiel som informáciu",
        )
        lowered = answer.lower()
        return any(phrase in lowered for phrase in refusal_phrases)

    @staticmethod
    def _filter_cited_sources(answer: str, source_docs: list[Document]) -> list[Document]:
        """
        
        """
        if not source_docs:
            return []

        # [súbor.pdf, s. 3]  |  [súbor, strana 3]  |  [súbor.pdf, p. 3]
        pat = re.compile(
            r"\[([^\[\]\n]+?)[,;]\s*(?:s\.?|str\.?|strana|strane|page|p\.?)\s*(\d+)\s*\]",
            re.IGNORECASE,
        )
        cited: set[tuple[str, int]] = set()
        for m in pat.finditer(answer):
            src  = m.group(1).strip().lower()
            page = int(m.group(2))
            cited.add((src, page))

        if not cited:
            
            return source_docs

        kept: list[Document] = []
        seen: set[tuple[str, int]] = set()
        for d in source_docs:
            d_src  = (d.metadata.get("source") or "").lower()
            d_page = int(d.metadata.get("page") or 0)
            key = (d_src, d_page)
            if key in seen:
                continue
            # Fuzzy match: 
            hit = False
            for c_src, c_page in cited:
                if c_page != d_page:
                    continue
                if c_src == d_src or c_src in d_src or d_src in c_src:
                    hit = True
                    break
            if hit:
                seen.add(key)
                kept.append(d)

        return kept if kept else source_docs

    # ─── Public API ──────────────────────────────────────────────────────────
    def query(
        self,
        question: str,
        chat_history: Optional[list[dict]] = None,
    ) -> tuple[str, list[Document], dict]:
        """Run the RAG pipeline for one question.

        Args:
            question: The user's question.
            chat_history: Earlier messages as dicts (``role`` / ``content``);
                ``None`` is treated as an empty history.

        Returns:
            ``(answer, source_docs, retrieval_debug)``:
            - answer: Slovak answer with ``[citations]`` and LaTeX, the
              not-found message, or an error message if the graph raised.
            - source_docs: documents to show as sources; always empty for a
              not-found or error answer.
            - retrieval_debug: diagnostic dict from the retrieval stages
              (empty if none was produced).
        """
        init_state: RAGState = {
            "question":     question,
            "chat_history": chat_history or [],
        }
        try:
            final = self.graph.invoke(init_state)
        except Exception as e:
            logger.error(f"RAG graph pipeline zlyhal: {e}", exc_info=True)
            return f"⚠️ Chyba RAG pipeline: {e}", [], {}

        answer  = (final.get("answer") or NOT_FOUND_MSG).strip()
        sources = final.get("source_docs", []) or []
        debug   = final.get("retrieval_debug", {}) or {}

        # The system prompt shows the fallback message inside quotes, and models
        # tend to echo it that way (or without the final period). Compare on a
        # normalized form so such echoes are still recognized as "not found"
        # and never shipped together with sources.
        wrappers = "\"'`„“”‚‘’«» \t\r\n"

        def core(text: str) -> str:
            return text.strip(wrappers).rstrip(".").strip(wrappers)

        if core(answer) == core(NOT_FOUND_MSG):
            return NOT_FOUND_MSG, [], debug

        return answer, sources, debug
        
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  Help with local RAG pipeline – poor retrieval quality, wrong page numbers IchNar 3 85 Apr-17-2026, 11:20 PM
Last Post: Pedroski55
  Regression with pipeline and GridSearch patite 0 2,207 Jul-31-2020, 01:40 PM
Last Post: patite
  Python+Dash+ Can't get menu page to feed to page georgelza 0 3,148 Apr-15-2018, 02:09 PM
Last Post: georgelza

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020