import subprocess
from pathlib import Path
from typing import List, Tuple

import streamlit as st
from dotenv import load_dotenv
from haystack.components.builders import AnswerBuilder, PromptBuilder
from haystack.components.converters import TextFileToDocument
from haystack.components.generators.openai import OpenAIGenerator
from haystack.components.preprocessors import (
    DocumentCleaner,
    DocumentSplitter,
)
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.writers import DocumentWriter
from haystack.core.pipeline import Pipeline
from haystack.dataclasses import GeneratedAnswer
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy

# Load the environment variables; we need them for the OpenAI API key
load_dotenv()

# The documentation sources to fetch: (name, repository URL, glob pattern for the doc files)
DOCUMENTATIONS = [
    (
        "DocArray",
        "https://github.com/docarray/docarray",
        "./docs/**/*.md",
    ),
    (
        "Streamlit",
        "https://github.com/streamlit/docs",
        "./content/**/*.md",
    ),
    (
        "Jinja",
        "https://github.com/pallets/jinja",
        "./docs/**/*.rst",
    ),
    (
        "Pandas",
        "https://github.com/pandas-dev/pandas",
        "./doc/source/**/*.rst",
    ),
    (
        "Elasticsearch",
        "https://github.com/elastic/elasticsearch",
        "./docs/**/*.asciidoc",
    ),
    (
        "NumPy",
        "https://github.com/numpy/numpy",
        "./doc/**/*.rst",
    ),
]

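# Local folder where the documentation repositories will be cloned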
DOCS_PATH = Path(__file__).parent / "downloaded_docs"


@st.cache_data(show_spinner=False)
def fetch(documentations: List[Tuple[str, str, str]]):
    files = []
    # Create the docs path if it doesn't exist
    DOCS_PATH.mkdir(parents=True, exist_ok=True)

    for name, url, pattern in documentations:
        st.write(f"Fetching {name} repository")
        repo = DOCS_PATH / name
        # Attempt cloning only if it doesn't exist
        if not repo.exists():
            subprocess.run(["git", "clone", "--depth", "1", url, str(repo)], check=True)
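        # Find out which branch is checked out so we can build URLs back to the files on GitHub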
        res = subprocess.run(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            check=True,
            capture_output=True,
            encoding="utf-8",
            cwd=repo,
        )
        branch = res.stdout.strip()
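        # Collect every file matching the glob pattern, keeping its source URL as metadata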
        for p in repo.glob(pattern):
            data = {
                "path": p,
                "meta": {
                    "url_source": f"{url}/tree/{branch}/{p.relative_to(repo)}",
                    "suffix": p.suffix,
                },
            }
            files.append(data)

    return files


@st.cache_resource(show_spinner=False)
def document_store(index: str = "documentation"):
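    """
    Create the in-memory store that holds the processed documents.
    Cached with st.cache_resource so every caller shares the same instance.
    """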
    # We're going to store the processed documents in here
    return InMemoryDocumentStore(index=index)


@st.cache_resource(show_spinner=False)
def index_files(files):
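    """
    Build and run the indexing pipeline: convert the text files to Documents,
    clean and split them, then write them to the document store, overwriting
    duplicates. Cached so the indexing work is not repeated on every rerun.
    """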
    # Create the components of our indexing pipeline
    text_converter = TextFileToDocument()
    document_cleaner = DocumentCleaner()
    document_splitter = DocumentSplitter()
    document_writer = DocumentWriter(
        document_store=document_store(), policy=DuplicatePolicy.OVERWRITE
    )

    # Build the pipeline and connect the components in processing order
    indexing_pipeline = Pipeline()
    indexing_pipeline.add_component("converter", text_converter)
    indexing_pipeline.add_component("cleaner", document_cleaner)
    indexing_pipeline.add_component("splitter", document_splitter)
    indexing_pipeline.add_component("writer", document_writer)
    indexing_pipeline.connect("converter", "cleaner")
    indexing_pipeline.connect("cleaner", "splitter")
    indexing_pipeline.connect("splitter", "writer")

    # And now we save the documentation in our InMemoryDocumentStore
    paths = []
    meta = []
    for f in files:
        paths.append(f["path"])
        meta.append(f["meta"])
    indexing_pipeline.run(
        {
            "converter": {
                "sources": paths,
                "meta": meta,
            }
        }
    )


def search(question: str) -> GeneratedAnswer:
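    """
    Answer a question with a RAG pipeline: retrieve the most relevant document
    chunks with BM25, build a prompt from them, generate a reply with OpenAI,
    and wrap the reply and its source documents in a GeneratedAnswer.
    """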
    retriever = InMemoryBM25Retriever(document_store=document_store(), top_k=5)

    template = (
        "Using the information contained in the context, give a comprehensive answer to the question.\n"
        "If the answer cannot be deduced from the context, do not give an answer.\n"
        "Context: {{ documents|map(attribute='content')|join(';')|replace('\n', ' ') }}\n"
        "Question: {{ query }}\n"
        "Answer:"
    )
    prompt_builder = PromptBuilder(template)

    generator = OpenAIGenerator(model="gpt-4o")
    answer_builder = AnswerBuilder()

    query_pipeline = Pipeline()

    query_pipeline.add_component("docs_retriever", retriever)
    query_pipeline.add_component("prompt_builder", prompt_builder)
    query_pipeline.add_component("llm", generator)
    query_pipeline.add_component("answer_builder", answer_builder)

    query_pipeline.connect("docs_retriever.documents", "prompt_builder.documents")
    query_pipeline.connect("prompt_builder.prompt", "llm.prompt")
    query_pipeline.connect("docs_retriever.documents", "answer_builder.documents")
    query_pipeline.connect("llm.replies", "answer_builder.replies")
    res = query_pipeline.run({"query": question})
    return res["answer_builder"]["answers"][0]


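# Download and index the documentation up front; caching keeps reruns of the script fast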
with st.status(
    "Downloading documentation files...",
    expanded=st.session_state.get("expanded", True),
) as status:
    files = fetch(DOCUMENTATIONS)
    status.update(label="Indexing documentation...")
    index_files(files)
    status.update(
        label="Download and indexing complete!", state="complete", expanded=False
    )
    st.session_state["expanded"] = False


st.header("🔎 Documentation finder", divider="rainbow")

st.caption(
    f"Use this to search the documentation of {', '.join([d[0] for d in DOCUMENTATIONS])}"
)

if question := st.text_input(
    label="What do you need to know?", placeholder="What is a DataFrame?"
):
    with st.spinner("Waiting"):
        answer = search(question)

    if not st.session_state.get("run_once", False):
        st.balloons()
        st.session_state["run_once"] = True

    st.markdown(answer.data)
    with st.expander("See sources:"):
        for document in answer.documents:
            url_source = document.meta.get("url_source", "")
            st.write(url_source)
            st.text(document.content)
            st.divider()