aiws / rag_engine.py
fikird
Fix async issues and improve error handling
edb4444
from typing import List, Dict, Any
import numpy as np
from langchain_community.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from search_engine import WebSearchEngine
import logging
logger = logging.getLogger(__name__)
class RAGEngine:
def __init__(self):
self.web_search = WebSearchEngine()
self.embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2",
model_kwargs={"device": "cpu"}
)
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50
)
self.vector_store = None
def process_and_store_content(self, content: str, metadata: Dict[str, Any] = None) -> None:
"""Process content and store in vector store"""
try:
# Split content into chunks
texts = self.text_splitter.split_text(content)
# Create metadata for each chunk
metadatas = [metadata or {}] * len(texts)
# Initialize or update vector store
if self.vector_store is None:
self.vector_store = FAISS.from_texts(texts, self.embeddings, metadatas=metadatas)
else:
self.vector_store.add_texts(texts, metadatas=metadatas)
except Exception as e:
logger.error(f"Error processing content: {str(e)}")
raise
def search_and_process(self, query: str, max_results: int = 5, similarity_k: int = 3) -> Dict:
"""Search the web and process results with RAG"""
try:
# Get web search results
web_results = self.web_search.search(query, max_results)
if 'error' in web_results:
return {'error': web_results['error']}
if not web_results.get('results'):
return {'error': 'No results found'}
# Process and store new content
for result in web_results['results']:
if result and isinstance(result, dict) and 'content' in result:
try:
self.process_and_store_content(
result['content'],
metadata={'url': result.get('url'), 'title': result.get('title')}
)
except Exception as e:
logger.error(f"Error processing result: {str(e)}")
continue
# Perform similarity search if we have stored vectors
if self.vector_store:
try:
similar_docs = self.vector_store.similarity_search_with_score(
query,
k=similarity_k
)
# Add similarity results to web results
web_results['similar_chunks'] = [
{
'content': doc[0].page_content,
'metadata': doc[0].metadata,
'similarity_score': float(doc[1])
}
for doc in similar_docs
]
except Exception as e:
logger.error(f"Error in similarity search: {str(e)}")
return web_results
except Exception as e:
logger.error(f"Error in search_and_process: {str(e)}")
return {'error': f"Search failed: {str(e)}"}
def get_relevant_context(self, query: str, k: int = 3) -> List[Dict]:
"""Get most relevant context from vector store"""
if not self.vector_store:
return []
similar_docs = self.vector_store.similarity_search_with_score(query, k=k)
return [
{
'content': doc[0].page_content,
'metadata': doc[0].metadata,
'similarity_score': float(doc[1])
}
for doc in similar_docs
]