aiws / rag_engine.py
fikird
Update dependencies and imports for langchain-community
a4caf5b
raw
history blame
3.49 kB
import logging
from typing import Any, Dict, List, Optional

import numpy as np
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS

from search_engine import WebSearchEngine
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class RAGEngine:
    """Combine web search with a vector store for retrieval-augmented lookups.

    Text handed to the engine is split into chunks, embedded and accumulated
    in a FAISS vector store that lives on the instance. The store is created
    lazily the first time content is stored.
    """

    def __init__(self):
        """Create the web search client, embedding model and text splitter."""
        self.web_search = WebSearchEngine()
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": "cpu"},
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
        )
        # Stays None until the first call that actually stores chunks.
        self.vector_store = None

    def process_and_store_content(
        self, content: str, metadata: Optional[Dict[str, Any]] = None
    ) -> None:
        """Split *content* into chunks and add them to the vector store.

        Args:
            content: Text to index. Empty content, or content for which the
                splitter produces no chunks, is ignored.
            metadata: Optional metadata attached to every chunk. Each chunk
                receives its own shallow copy.

        Raises:
            Exception: Any error raised while splitting or indexing is logged
                and re-raised unchanged.
        """
        if not content:
            return
        try:
            texts = self.text_splitter.split_text(content)
            if not texts:
                # Nothing to embed; avoid handing an empty batch to the store.
                return

            # One dict per chunk: `[d] * n` would alias a single dict across
            # all chunks, so editing one chunk's metadata would edit them all.
            metadatas = [dict(metadata or {}) for _ in texts]

            if self.vector_store is None:
                self.vector_store = FAISS.from_texts(
                    texts, self.embeddings, metadatas=metadatas
                )
            else:
                self.vector_store.add_texts(texts, metadatas=metadatas)
        except Exception as e:
            logger.error("Error processing content: %s", e)
            raise

    async def search_and_process(
        self, query: str, max_results: int = 5, similarity_k: int = 3
    ) -> Dict:
        """Run a web search, index the results and attach the closest chunks.

        Args:
            query: Search query; also used for the similarity lookup.
            max_results: Passed through to the web search engine.
            similarity_k: Number of chunks to retrieve from the vector store.

        Returns:
            The dict returned by the web search. When a vector store exists,
            a ``'similar_chunks'`` key is added, holding the same structure
            that ``get_relevant_context`` returns.

        Raises:
            Exception: Any error from searching or indexing is logged and
                re-raised unchanged.
        """
        try:
            # NOTE(review): this call is synchronous inside a coroutine, so it
            # presumably blocks the event loop while it runs - confirm whether
            # that is acceptable for callers.
            web_results = self.web_search.search(query, max_results)

            for result in web_results['results']:
                content = result.get('content')
                if content:
                    self.process_and_store_content(
                        content,
                        metadata={
                            'url': result.get('url'),
                            'title': result.get('title'),
                        },
                    )

            # The key is only added once something has been indexed.
            if self.vector_store is not None:
                web_results['similar_chunks'] = self.get_relevant_context(
                    query, k=similarity_k
                )

            return web_results
        except Exception as e:
            logger.error("Error in search_and_process: %s", e)
            raise

    def get_relevant_context(self, query: str, k: int = 3) -> List[Dict[str, Any]]:
        """Return up to *k* stored chunks closest to *query*.

        Args:
            query: Text to compare against the stored chunks.
            k: Maximum number of chunks to return.

        Returns:
            A list of dicts with the keys ``'content'``, ``'metadata'`` and
            ``'similarity_score'``; an empty list when nothing has been
            stored yet.
        """
        if self.vector_store is None:
            return []

        hits = self.vector_store.similarity_search_with_score(query, k=k)
        # NOTE(review): the score is passed through as returned by the store;
        # with a default FAISS index this is presumably a distance (lower
        # means closer) rather than a similarity - confirm before ranking.
        return [
            {
                'content': doc.page_content,
                'metadata': doc.metadata,
                'similarity_score': score,
            }
            for doc, score in hits
        ]