File size: 4,379 Bytes
e13d87a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from typing import Dict, List, Optional
from pathlib import Path
import os

from llama_index import VectorStoreIndex, StorageContext
from llama_index.vector_stores import ChromaVectorStore
from llama_index.embeddings import HuggingFaceEmbedding
import chromadb

from indexes.csv_index_builder import EnhancedCSVReader

class CSVIndexManager:
    """Manages creation and retrieval of vector indexes for CSV files.

    Each CSV gets its own Chroma collection (keyed by the file's stem) and a
    LlamaIndex ``VectorStoreIndex``. A registry maps ``file_id`` to the index,
    the first document's metadata, and a cached embedding of a natural-language
    description of the CSV, used to route queries to relevant files.
    """

    def __init__(self, embedding_model_name: str = "all-MiniLM-L6-v2"):
        """Set up the CSV reader, embedding model, and Chroma client.

        Args:
            embedding_model_name: HuggingFace sentence-embedding model name.
        """
        self.csv_reader = EnhancedCSVReader()
        self.embed_model = HuggingFaceEmbedding(model_name=embedding_model_name)
        self.chroma_client = chromadb.Client()
        # file_id -> {"index": VectorStoreIndex, "metadata": dict,
        #             "description_embedding": list[float] | None}
        self.indexes: Dict[str, dict] = {}

    def create_index(self, file_path: str) -> "VectorStoreIndex":
        """Create (or rebuild) the vector index for a single CSV file.

        Args:
            file_path: Path to the CSV file; its stem is used as the file_id.

        Returns:
            The newly built ``VectorStoreIndex`` (also stored in the registry).
        """
        file_id = Path(file_path).stem

        # Load documents with metadata attached by the enhanced reader.
        documents = self.csv_reader.load_data(file_path)

        # get_or_create_collection is idempotent: create_collection raises if
        # the collection already exists, which broke re-indexing the same CSV.
        chroma_collection = self.chroma_client.get_or_create_collection(file_id)
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)

        index = VectorStoreIndex.from_documents(
            documents,
            storage_context=storage_context,
            embed_model=self.embed_model,
        )

        metadata = documents[0].metadata if documents else {}
        self.indexes[file_id] = {
            "index": index,
            "metadata": metadata,
            # Cache the description embedding now so find_relevant_csvs does
            # not re-embed every CSV's metadata on every query.
            "description_embedding": (
                self.embed_model.get_text_embedding(self._describe_csv(metadata))
                if metadata
                else None
            ),
        }
        return index

    def index_directory(self, directory_path: str) -> "Dict[str, VectorStoreIndex]":
        """Index every ``*.csv`` file directly inside ``directory_path``.

        Args:
            directory_path: Directory to scan (non-recursive).

        Returns:
            Mapping of file_id (filename stem) to its created index.
        """
        indexed_files: Dict[str, "VectorStoreIndex"] = {}
        # sorted() makes indexing order deterministic across platforms.
        for entry in sorted(os.listdir(directory_path)):
            if not entry.lower().endswith(".csv"):
                continue
            file_path = os.path.join(directory_path, entry)
            indexed_files[Path(entry).stem] = self.create_index(file_path)
        return indexed_files

    def find_relevant_csvs(self, query: str, top_k: int = 3) -> List[str]:
        """Return the file_ids of up to ``top_k`` CSVs most relevant to a query.

        Relevance is cosine similarity between the query embedding and each
        CSV's cached metadata-description embedding.

        Args:
            query: Natural-language query.
            top_k: Maximum number of file_ids to return.

        Returns:
            file_ids sorted by descending similarity (empty if nothing indexed).
        """
        if not self.indexes:
            return []

        query_embedding = self.embed_model.get_text_embedding(query)

        similarities: Dict[str, float] = {}
        for file_id, info in self.indexes.items():
            embedding = info.get("description_embedding")
            if embedding is None:
                metadata = info.get("metadata") or {}
                if not metadata:
                    # No metadata to describe; cannot score this CSV.
                    continue
                # Backfill the cache for entries created before this instance
                # (or registered without a precomputed embedding).
                embedding = self.embed_model.get_text_embedding(
                    self._describe_csv(metadata)
                )
                info["description_embedding"] = embedding
            similarities[file_id] = self._cosine_similarity(query_embedding, embedding)

        ranked = sorted(similarities.items(), key=lambda kv: kv[1], reverse=True)
        return [file_id for file_id, _ in ranked[:top_k]]

    def _describe_csv(self, metadata: Dict) -> str:
        """Build a natural-language description of a CSV from its metadata.

        Tolerates missing keys so a sparsely populated metadata dict does not
        raise KeyError (the previous inline version assumed all keys existed).
        """
        parts = [
            f"CSV file {metadata.get('filename', 'unknown')} "
            f"with columns: {', '.join(metadata.get('columns', []))}. ",
            f"Contains {metadata.get('row_count', 0)} rows. ",
            "Sample data: ",
        ]
        for col, samples in metadata.get("samples", {}).items():
            if samples:
                # Show at most two sample values per column to keep it short.
                parts.append(f"{col}: {', '.join(str(s) for s in samples[:2])}; ")
        return "".join(parts)

    def _cosine_similarity(self, vec1, vec2) -> float:
        """Cosine similarity of two equal-length vectors; 0.0 for zero vectors."""
        dot = sum(a * b for a, b in zip(vec1, vec2))
        # Compute the norm product once; guard against division by zero.
        norm_product = (sum(a * a for a in vec1) ** 0.5) * (
            sum(b * b for b in vec2) ** 0.5
        )
        return dot / norm_product if norm_product else 0.0