import logging
from typing import Any, Dict, List, Optional

import numpy as np
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings

from search_engine import WebSearchEngine

logger = logging.getLogger(__name__)

class RAGEngine:
    """Retrieval-augmented generation engine.

    Fetches web search results, splits their content into overlapping
    chunks, embeds the chunks into a FAISS vector store, and serves the
    most similar stored chunks for a query.
    """

    def __init__(self):
        # Web search backend used to gather fresh documents.
        self.web_search = WebSearchEngine()
        # Small CPU-friendly sentence-transformer model for embeddings.
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": "cpu"}
        )
        # Overlap keeps sentence context intact across chunk boundaries.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50
        )
        # Created lazily on the first process_and_store_content() call.
        self.vector_store = None

    def process_and_store_content(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        """Split *content* into chunks and add them to the vector store.

        Args:
            content: Raw document text to index.
            metadata: Optional metadata (e.g. url/title) attached to every
                chunk produced from this content.

        Raises:
            Exception: Any failure from splitting/embedding is logged and
                re-raised to the caller.
        """
        try:
            texts = self.text_splitter.split_text(content)
            if not texts:
                # Empty or whitespace-only content yields no chunks.
                return

            # One *independent* dict per chunk. The previous
            # `[metadata or {}] * len(texts)` aliased a single shared dict,
            # so mutating one chunk's metadata mutated all of them.
            metadatas = [dict(metadata) if metadata else {} for _ in texts]

            # Initialize the store on first use, extend it afterwards.
            if self.vector_store is None:
                self.vector_store = FAISS.from_texts(texts, self.embeddings, metadatas=metadatas)
            else:
                self.vector_store.add_texts(texts, metadatas=metadatas)

        except Exception as e:
            # Lazy %-formatting: the message is only built if the record is emitted.
            logger.error("Error processing content: %s", e)
            raise

    async def search_and_process(self, query: str, max_results: int = 5, similarity_k: int = 3) -> Dict[str, Any]:
        """Search the web, index the results, and attach similar chunks.

        Args:
            query: Search query string.
            max_results: Maximum number of web results to request.
            similarity_k: Number of similar chunks to attach.

        Returns:
            The web search payload, augmented with a 'similar_chunks' key
            when the vector store holds any content.

        Raises:
            Exception: Any failure is logged and re-raised to the caller.
        """
        try:
            web_results = self.web_search.search(query, max_results)

            # Tolerate payloads without a 'results' key instead of raising KeyError.
            for result in web_results.get('results', []):
                content = result.get('content')
                if content:
                    self.process_and_store_content(
                        content,
                        metadata={'url': result.get('url'), 'title': result.get('title')}
                    )

            if self.vector_store:
                similar_docs = self.vector_store.similarity_search_with_score(
                    query,
                    k=similarity_k
                )
                web_results['similar_chunks'] = self._format_scored_docs(similar_docs)

            return web_results

        except Exception as e:
            logger.error("Error in search_and_process: %s", e)
            raise

    def get_relevant_context(self, query: str, k: int = 3) -> List[Dict]:
        """Return up to *k* stored chunks most similar to *query*.

        Returns an empty list when nothing has been indexed yet.
        """
        if not self.vector_store:
            return []

        similar_docs = self.vector_store.similarity_search_with_score(query, k=k)
        return self._format_scored_docs(similar_docs)

    @staticmethod
    def _format_scored_docs(scored_docs) -> List[Dict]:
        """Convert (document, score) pairs into plain result dicts."""
        return [
            {
                'content': doc.page_content,
                'metadata': doc.metadata,
                'similarity_score': score
            }
            for doc, score in scored_docs
        ]