Spaces:
Build error
```python
import os
import subprocess
import asyncio
import time
import threading
import gc
import psutil
import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langgraph.graph import END, StateGraph
from typing_extensions import TypedDict
# Make sure the required libraries are installed:
# pip install ollama langchain langchain_community langgraph

# Install Ollama
def install_ollama():
    try:
        print("Installing Ollama...")
        subprocess.run("curl -fsSL https://ollama.com/install.sh | sh", shell=True, check=True)
        print("Ollama installed successfully.")
    except subprocess.CalledProcessError as e:
        print(f"Error installing Ollama: {e}")
        raise

install_ollama()

# Async wrapper around the installer (unused helper)
async def async_download_ollamahkvh():
    await asyncio.to_thread(install_ollama)
# Start the Ollama service in a separate thread
def ollama_service_thread():
    print("Starting the Ollama service")
    subprocess.run("ollama serve", shell=True)

# Download the Ollama model (synchronous; wrapped with asyncio.to_thread below)
def download_ollama_model(model_name='hf.co/MaziyarPanahi/Llama-3.2-3B-Instruct-uncensored-GGUF:IQ1_S'):
    try:
        print(f"Downloading model: {model_name}")
        subprocess.run(["ollama", "pull", model_name], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error downloading the model: {e}")
        raise

# Async helper that runs the download without blocking the event loop
async def async_download_ollama_model():
    await asyncio.to_thread(download_ollama_model)

# Start Ollama in a background (daemon) thread
OLLAMA_SERVICE_THREAD = threading.Thread(target=ollama_service_thread, daemon=True)
OLLAMA_SERVICE_THREAD.start()

# Wait for Ollama to be ready
print("Waiting for Ollama to start...")
time.sleep(10)

# Download the Hugging Face model if it is not already available
asyncio.run(async_download_ollama_model())
# Create the FastAPI instance
app = FastAPI()

# Data model for queries received by the API
class QueryRequest(BaseModel):
    query: str

# Ollama language model (no 'temperature' set)
local_llm = 'hf.co/MaziyarPanahi/Llama-3.2-3B-Instruct-uncensored-GGUF:IQ1_S'
llama3 = ChatOllama(model=local_llm)

# Web search tool backed by DuckDuckGo
wrapper = DuckDuckGoSearchAPIWrapper(max_results=1)
web_search_tool = DuckDuckGoSearchRun(api_wrapper=wrapper)
# Prompts for generation, routing, and query rewriting
generate_prompt = PromptTemplate(
    template="""
<|begin_of_text|>
<|start_header_id|>system<|end_header_id|>
You are an AI assistant for Research Question Tasks, that synthesizes web search results.
Strictly use the following pieces of web search context to answer the question. If you don't know the answer, just say that you don't know.
Keep the answer concise, but provide all of the details you can in the form of a research report.
Only make direct references to material if provided in the context.
<|eot_id|>
<|start_header_id|>user<|end_header_id|>
Question: {question}
Web Search Context: {context}
Answer:
<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "context"],
)
generate_chain = generate_prompt | llama3 | StrOutputParser()

router_prompt = PromptTemplate(
    template="""
<|begin_of_text|>
<|start_header_id|>system<|end_header_id|>
You are an expert at routing a user question to either the generation stage or web search.
Use the web search for questions that require more context for a better answer, or recent events.
Otherwise, you can skip and go straight to the generation phase to respond.
You do not need to be stringent with the keywords in the question related to these topics.
Give a binary choice 'web_search' or 'generate' based on the question.
Return the JSON with a single key 'choice' with no preamble or explanation.
Question to route: {question}
<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question"],
)
question_router = router_prompt | llama3 | JsonOutputParser()

query_prompt = PromptTemplate(
    template="""
<|begin_of_text|>
<|start_header_id|>system<|end_header_id|>
You are an expert at crafting web search queries for research questions.
More often than not, a user will ask a basic question that they wish to learn more about, however it might not be in the best format.
Reword their query to be the most effective web search string possible.
Return the JSON with a single key 'query' with no preamble or explanation.
Question to transform: {question}
<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question"],
)
query_chain = query_prompt | llama3 | JsonOutputParser()
# Graph state
class GraphState(TypedDict):
    question: str
    generation: str
    search_query: str
    context: str

# Processing nodes
async def generate(state):
    print("Step: Generating Final Response")
    question = state["question"]
    context = state["context"]
    generation = await asyncio.to_thread(generate_chain.invoke, {"context": context, "question": question})
    return {"generation": generation}

async def transform_query(state):
    print("Step: Optimizing Query for Web Search")
    question = state['question']
    gen_query = await asyncio.to_thread(query_chain.invoke, {"question": question})
    search_query = gen_query.get("query", "")  # Make sure we read the expected key
    return {"search_query": search_query}
async def web_search(state):
    search_query = state['search_query']
    print(f'Step: Searching the Web for: "{search_query}"')
    try:
        search_result = await asyncio.to_thread(web_search_tool.invoke, search_query)
        if isinstance(search_result, str):  # The tool normally returns a string of results
            print(f"Web search returned a string: {search_result}")
            return {"context": search_result}
        elif isinstance(search_result, dict):  # Use a dict result directly
            return {"context": search_result}
        else:
            raise ValueError("Web search returned an unexpected type")
    except Exception as e:
        print(f"Web search failed: {e}")
        return {"context": ""}  # On failure, continue with an empty context instead of breaking the graph

async def route_question(state):
    print("Step: Routing Query")
    question = state['question']
    output = await asyncio.to_thread(question_router.invoke, {"question": question})
    if output.get('choice') == "web_search":
        print("Step: Routing Query to Web Search")
        return "websearch"
    elif output.get('choice') == 'generate':
        print("Step: Routing Query to Generation")
        return "generate"
    # Fall back to generation if the router returns an unexpected choice
    print("Step: Unexpected routing choice, defaulting to Generation")
    return "generate"
# Build the state graph
workflow = StateGraph(GraphState)
workflow.add_node("websearch", web_search)
workflow.add_node("transform_query", transform_query)
workflow.add_node("generate", generate)
workflow.set_conditional_entry_point(
    route_question,
    {
        "websearch": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "websearch")
workflow.add_edge("websearch", "generate")
workflow.add_edge("generate", END)

# Compile the agent
local_agent = workflow.compile()

# Run the agent (use the async API, since the graph nodes are async)
async def run_agent_parallel(query):
    output = await local_agent.ainvoke({"question": query})
    return output['generation']

# FastAPI endpoint; registered at "/query" (route path assumed)
@app.post("/query")
async def query_endpoint(request: QueryRequest):
    query = request.query
    return {"response": await run_agent_parallel(query)}
# Resource management
def release_resources():
    try:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()
    except Exception as e:
        print(f"Failed to release resources: {e}")

def resource_manager():
    # Rough thresholds; tune them for the hardware the Space runs on
    MAX_RAM_PERCENT = 90
    MAX_CPU_PERCENT = 90
    MAX_GPU_PERCENT = 90
    MAX_RAM_MB = psutil.virtual_memory().total * MAX_RAM_PERCENT / 100 / (1024 * 1024)
    while True:
        try:
            virtual_mem = psutil.virtual_memory()
            current_ram_percent = virtual_mem.percent
            current_ram_mb = virtual_mem.used / (1024 * 1024)  # Convert bytes to MB
            if current_ram_percent > MAX_RAM_PERCENT or current_ram_mb > MAX_RAM_MB:
                release_resources()
            current_cpu_percent = psutil.cpu_percent()
            if current_cpu_percent > MAX_CPU_PERCENT:
                # Lower the process priority when CPU usage is high
                psutil.Process(os.getpid()).nice(10)
            if torch.cuda.is_available():
                gpu = torch.cuda.current_device()
                free_mem, total_mem = torch.cuda.mem_get_info(gpu)
                gpu_mem_percent = (1 - free_mem / total_mem) * 100
                if gpu_mem_percent > MAX_GPU_PERCENT:
                    release_resources()
        except Exception as e:
            print(f"Error in resource manager: {e}")
        time.sleep(5)  # Avoid a busy loop

# Run the resource manager in a daemon thread so it does not block the server
threading.Thread(target=resource_manager, daemon=True).start()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
```
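
Once the Space builds and starts, the API can be exercised with a small client like the sketch below. It assumes the endpoint is registered at `/query` (as in the decorator above) and that the app is reachable on port 7860; adjust the URL to match your Space.

```python
import requests

# Hypothetical client check; the /query path and port 7860 mirror the server sketch above.
resp = requests.post(
    "http://localhost:7860/query",
    json={"query": "What is LangGraph and what is it used for?"},
    timeout=120,  # generation on a small quantized model can be slow
)
resp.raise_for_status()
print(resp.json()["response"])
```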