from fastapi import FastAPI, HTTPException, Depends, Security, BackgroundTasks
from fastapi.security import APIKeyHeader
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Literal, List, Dict
import os
from functools import lru_cache
from openai import OpenAI
from uuid import uuid4
import tiktoken
import sqlite3
import json
import time
from datetime import datetime, timedelta
import asyncio
import requests
from prompts import *
from fastapi_cache import FastAPICache
from fastapi_cache.backends.inmemory import InMemoryBackend
from fastapi_cache.decorator import cache
import logging
# Configure logging to both a file and stdout
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
app = FastAPI()

API_KEY_NAME = "X-API-Key"
API_KEY = os.environ.get("CHAT_AUTH_KEY", "default_secret_key")
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
ModelID = Literal[
    "openai/gpt-4o-mini",
    "meta-llama/llama-3-70b-instruct",
    "anthropic/claude-3.5-sonnet",
    "deepseek/deepseek-coder",
    "anthropic/claude-3-haiku",
    "openai/gpt-3.5-turbo-instruct",
    "qwen/qwen-72b-chat",
    "google/gemma-2-27b-it"
]
class QueryModel(BaseModel):
    user_query: str = Field(..., description="User's coding query")
    model_id: ModelID = Field(
        default="meta-llama/llama-3-70b-instruct",
        description="ID of the model to use for response generation"
    )
    conversation_id: str = Field(default_factory=lambda: str(uuid4()), description="Unique identifier for the conversation")
    user_id: str = Field(..., description="Unique identifier for the user")

    class Config:
        schema_extra = {
            "example": {
                "user_query": "How do I implement a binary search in Python?",
                "model_id": "meta-llama/llama-3-70b-instruct",
                "conversation_id": "123e4567-e89b-12d3-a456-426614174000",
                "user_id": "user123"
            }
        }
class NewsQueryModel(BaseModel):
    query: str = Field(..., description="News topic to search for")
    model_id: ModelID = Field(
        default="openai/gpt-4o-mini",
        description="ID of the model to use for response generation"
    )

    class Config:
        schema_extra = {
            "example": {
                "query": "Latest developments in AI",
                "model_id": "openai/gpt-4o-mini"
            }
        }
def get_api_keys():
    logger.info("Loading API keys")
    return {
        "OPENROUTER_API_KEY": f"sk-or-v1-{os.environ['OPENROUTER_API_KEY']}",
        "BRAVE_API_KEY": os.environ['BRAVE_API_KEY']
    }

api_keys = get_api_keys()
or_client = OpenAI(api_key=api_keys["OPENROUTER_API_KEY"], base_url="https://openrouter.ai/api/v1")
# In-memory storage for conversations
conversations: Dict[str, List[Dict[str, str]]] = {}
last_activity: Dict[str, float] = {}

# Token encoding
encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")

def limit_tokens(input_string, token_limit=6000):
    # Truncate a string to at most token_limit tokens
    return encoding.decode(encoding.encode(input_string)[:token_limit])

def calculate_tokens(msgs):
    # Approximate total token count across a list of chat messages
    return sum(len(encoding.encode(str(m))) for m in msgs)
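# A quick sanity sketch of the token helpers above (token counts are approximate
# and depend on the tiktoken encoding; the strings here are illustrative):
def _demo_token_helpers():
    msgs = [{"role": "user", "content": "hello world"}]
    prompt_size = calculate_tokens(msgs)  # rough prompt size in tokens
    truncated = limit_tokens("lorem ipsum " * 2000, token_limit=10)  # at most 10 tokens survive
    return prompt_size, truncated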
def chat_with_llama_stream(messages, model="openai/gpt-4o-mini", max_llm_history=4, max_output_tokens=2500):
    logger.info(f"Starting chat with model: {model}")
    # Trim history until the prompt fits an ~8k context minus the output budget:
    # keep the system message plus the most recent max_llm_history turns, and if
    # that is still too large, shrink the history window until it fits or fails.
    while calculate_tokens(messages) > (8000 - max_output_tokens):
        if len(messages) > max_llm_history + 1:
            messages = [messages[0]] + messages[-max_llm_history:]
        else:
            max_llm_history -= 1
            if max_llm_history < 2:
                error_message = "Token limit exceeded. Please shorten your input or start a new conversation."
                logger.error(error_message)
                raise HTTPException(status_code=400, detail=error_message)

    try:
        response = or_client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_output_tokens,
            stream=True
        )

        full_response = ""
        for chunk in response:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                full_response += content
                yield content

        # Callers persist the assistant turn themselves; this generator leaves
        # the shared conversation history untouched.
        logger.info("Chat completed successfully")
    except Exception as e:
        logger.error(f"Error in model response: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error in model response: {str(e)}")
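# Minimal usage sketch for chat_with_llama_stream (model id illustrative; running
# this makes a real OpenRouter call):
def _demo_stream_chat():
    history = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain binary search briefly."}
    ]
    for token in chat_with_llama_stream(history, model="openai/gpt-4o-mini"):
        print(token, end="", flush=True)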
async def verify_api_key(api_key: str = Security(api_key_header)):
    if api_key != API_KEY:
        logger.warning("Invalid API key used")
        raise HTTPException(status_code=403, detail="Could not validate credentials")
    return api_key
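# A minimal client-side sketch of the auth scheme (the endpoint path and payload
# below are illustrative only; route registrations are not part of this file):
def _demo_authenticated_request(base_url: str):
    headers = {"X-API-Key": os.environ.get("CHAT_AUTH_KEY", "default_secret_key")}
    payload = {"query": "hello", "user_id": "user123"}
    # stream=True because every endpoint here returns a StreamingResponse
    return requests.post(f"{base_url}/example-endpoint", json=payload, headers=headers, stream=True)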
# SQLite setup
DB_PATH = '/app/data/conversations.db'

def init_db():
    logger.info("Initializing database")
    os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS conversations
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  user_id TEXT,
                  conversation_id TEXT,
                  message TEXT,
                  response TEXT,
                  timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
    conn.commit()
    conn.close()
    logger.info("Database initialized successfully")

init_db()
def update_db(user_id, conversation_id, message, response):
    logger.info(f"Updating database for conversation: {conversation_id}")
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''INSERT INTO conversations (user_id, conversation_id, message, response)
                 VALUES (?, ?, ?, ?)''', (user_id, conversation_id, message, response))
    conn.commit()
    conn.close()
    logger.info("Database updated successfully")
async def clear_inactive_conversations():
    while True:
        current_time = time.time()
        inactive_convos = [conv_id for conv_id, last_time in last_activity.items()
                           if current_time - last_time > 1800]  # 30 minutes
        for conv_id in inactive_convos:
            if conv_id in conversations:
                del conversations[conv_id]
            if conv_id in last_activity:
                del last_activity[conv_id]
        await asyncio.sleep(60)  # Check every minute
@app.on_event("startup")
async def startup_event():
    logger.info("Starting up the application")
    FastAPICache.init(InMemoryBackend(), prefix="fastapi-cache")
    # Background sweeper for stale in-memory conversations
    asyncio.create_task(clear_inactive_conversations())
async def coding_assistant(query: QueryModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
    """
    Coding assistant endpoint that provides programming help based on user queries.
    Available models:
    - meta-llama/llama-3-70b-instruct (default)
    - anthropic/claude-3.5-sonnet
    - deepseek/deepseek-coder
    - anthropic/claude-3-haiku
    - openai/gpt-3.5-turbo-instruct
    - qwen/qwen-72b-chat
    - google/gemma-2-27b-it
    - openai/gpt-4o-mini
    Requires API Key authentication via X-API-Key header.
    """
    logger.info(f"Received coding assistant query: {query.user_query}")
    if query.conversation_id not in conversations:
        conversations[query.conversation_id] = [
            {"role": "system", "content": "You are a helpful assistant proficient in coding tasks. Help the user in understanding and writing code."}
        ]
    conversations[query.conversation_id].append({"role": "user", "content": query.user_query})
    last_activity[query.conversation_id] = time.time()

    # Token-budget trimming happens inside chat_with_llama_stream
    limited_conversation = conversations[query.conversation_id]

    def process_response():
        full_response = ""
        for content in chat_with_llama_stream(limited_conversation, model=query.model_id):
            full_response += content
            yield content
        # Persist the assistant turn in the in-memory history and the database
        conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
        background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.user_query, full_response)
        logger.info(f"Completed coding assistant response for query: {query.user_query}")

    return StreamingResponse(process_response(), media_type="text/event-stream")
# New functions for news assistant
def internet_search(query, search_type="web", num_results=20):
    logger.info(f"Performing internet search for query: {query}, type: {search_type}")
    url = f"https://api.search.brave.com/res/v1/{'web' if search_type == 'web' else 'news'}/search"
    headers = {
        "Accept": "application/json",
        "Accept-Encoding": "gzip",
        "X-Subscription-Token": api_keys["BRAVE_API_KEY"]
    }
    params = {"q": query}

    response = requests.get(url, headers=headers, params=params)
    if response.status_code != 200:
        logger.error(f"Failed to fetch search results. Status code: {response.status_code}")
        return []

    # The web and news endpoints nest their result lists differently
    search_data = response.json()["web"]["results"] if search_type == "web" else response.json()["results"]

    processed_results = [
        {
            "title": item["title"],
            "snippet": item["extra_snippets"][0],
            "last_updated": item.get("age", ""),
            "url": item.get("url", "")
        }
        for item in search_data
        if item.get("extra_snippets")
    ][:num_results]
    logger.info(f"Retrieved {len(processed_results)} search results")
    return processed_results
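# Each processed result is a flat dict, e.g. (values illustrative):
#   {"title": "...", "snippet": "first extra snippet", "last_updated": "2 days ago",
#    "url": "https://example.com/article"}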
@lru_cache(maxsize=128)
def cached_internet_search(query: str):
    # In-process memoization so repeated identical news queries reuse prior
    # results (no TTL: entries live for the life of the process)
    logger.info(f"Performing cached internet search for query: {query}")
    return internet_search(query, search_type="news")
def analyze_data(query, data_type="news"):
    logger.info(f"Analyzing {data_type} for query: {query}")
    if data_type == "news":
        data = cached_internet_search(query)
        prompt_generator = generate_news_prompt
        system_prompt = NEWS_ASSISTANT_PROMPT
    else:
        data = internet_search(query, search_type="web")
        prompt_generator = generate_search_prompt
        system_prompt = SEARCH_ASSISTANT_PROMPT

    if not data:
        logger.error(f"Failed to fetch {data_type} data")
        return None, None  # keep the two-value shape so callers can unpack safely

    prompt = prompt_generator(query, data)
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt}
    ]
    logger.info(f"{data_type.capitalize()} analysis completed")
    return messages, data
# NOTE: redefines QueryModel; the search/news endpoints below use this version,
# while coding_assistant (defined above) already bound the original class.
class QueryModel(BaseModel):
    query: str = Field(..., description="Search query")
    model_id: ModelID = Field(
        default="openai/gpt-4o-mini",
        description="ID of the model to use for response generation"
    )

    class Config:
        schema_extra = {
            "example": {
                "query": "What are the latest advancements in quantum computing?",
                "model_id": "meta-llama/llama-3-70b-instruct"
            }
        }
def search_assistant_api(query, data_type, model="openai/gpt-4o-mini"):
    logger.info(f"Received {data_type} assistant query: {query}")
    messages, search_data = analyze_data(query, data_type)

    if not messages:
        logger.error(f"Failed to fetch {data_type} data")
        raise HTTPException(status_code=500, detail=f"Failed to fetch {data_type} data")

    def process_response():
        logger.info(f"Generating response using LLM: {messages}")
        full_response = ""
        for content in chat_with_llama_stream(messages, model=model):
            full_response += content
            yield content
        logger.info(f"Completed {data_type} assistant response for query: {query}")
        logger.info(f"LLM Response: {full_response}")
        # Trailer frame: ship the raw search results as JSON after the answer text
        yield "<json><ref>" + json.dumps(search_data) + "</ref></json>"

    return process_response
def create_streaming_response(generator):
    return StreamingResponse(generator(), media_type="text/event-stream")
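# On the wire, a news/search stream is plain answer text followed by a single
# trailer frame carrying the raw results, e.g. (content illustrative):
#   ...streamed answer text...<json><ref>[{"title": "...", "url": "..."}]</ref></json>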
async def news_assistant(query: QueryModel, api_key: str = Depends(verify_api_key)):
    """
    News assistant endpoint that provides summaries and analysis of recent news based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    response_generator = search_assistant_api(query.query, "news", model=query.model_id)
    return create_streaming_response(response_generator)

async def search_assistant(query: QueryModel, api_key: str = Depends(verify_api_key)):
    """
    Search assistant endpoint that provides summaries and analysis of web search results based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    response_generator = search_assistant_api(query.query, "web", model=query.model_id)
    return create_streaming_response(response_generator)
import yaml
from yaml.loader import SafeLoader
class FollowupQueryModel(BaseModel):
    query: str = Field(..., description="User's query for the followup agent")
    model_id: ModelID = Field(
        default="openai/gpt-4o-mini",
        description="ID of the model to use for response generation"
    )
    conversation_id: str = Field(default_factory=lambda: str(uuid4()), description="Unique identifier for the conversation")
    user_id: str = Field(..., description="Unique identifier for the user")
    tool_call: Literal["web", "news", "auto"] = Field(
        default="auto",
        description="Type of tool to call (web, news, auto)"
    )

    class Config:
        schema_extra = {
            "example": {
                "query": "How can I improve my productivity?",
                "model_id": "openai/gpt-4o-mini",
                "conversation_id": "123e4567-e89b-12d3-a456-426614174000",
                "user_id": "user123",
                "tool_call": "auto"
            }
        }
import re

def parse_followup_and_tools(input_text):
    # Strip stray brackets and runs of double quotes left over from raw LLM output
    cleaned_text = re.sub(r'\[|\]|"+', ' ', input_text)

    # Extract response content
    response_pattern = re.compile(r'<response>(.*?)</response>', re.DOTALL)
    response_parts = response_pattern.findall(cleaned_text)
    combined_response = ' '.join(response_parts)
    # Normalize spaces in the combined response
    combined_response = ' '.join(combined_response.split())

    parsed_interacts = []
    parsed_tools = []

    # Parse <interact> and <tool>/<tools> blocks
    blocks = re.finditer(r'<(interact|tools?)(.*?)>(.*?)</\1>', cleaned_text, re.DOTALL)
    for block in blocks:
        block_type, _, content = block.groups()
        content = content.strip()
        if block_type == 'interact':
            question_blocks = re.split(r'\s*-\s*text:', content)[1:]
            for qblock in question_blocks:
                parts = re.split(r'\s*options:\s*', qblock, maxsplit=1)
                if len(parts) == 2:
                    question = ' '.join(parts[0].split())  # Normalize spaces
                    options = [' '.join(opt.split()) for opt in re.split(r'\s*-\s*', parts[1]) if opt.strip()]
                    parsed_interacts.append({'question': question, 'options': options})
        elif block_type.startswith('tool'):  # Matches both 'tool' and 'tools'
            tool_match = re.search(r'text:\s*(.*?)\s*options:\s*-\s*(.*)', content, re.DOTALL)
            if tool_match:
                tool_name = ' '.join(tool_match.group(1).split())  # Normalize spaces
                option = ' '.join(tool_match.group(2).split())  # Normalize spaces
                parsed_tools.append({'name': tool_name, 'input': option})

    return combined_response, parsed_interacts, parsed_tools
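# Illustrative round-trip for the parser above. The markup format is inferred
# from the regexes; the authoritative format lives in prompts.py:
def _demo_parse_followup_output():
    raw = ("<response>Here are some options.</response>"
           "<interact>- text: Which language do you prefer? options: - Python - JavaScript</interact>")
    resp, interacts, tools = parse_followup_and_tools(raw)
    assert resp == "Here are some options."
    assert interacts == [{"question": "Which language do you prefer?",
                          "options": ["Python", "JavaScript"]}]
    assert tools == []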
async def followup_agent(query: FollowupQueryModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
    """
    Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    logger.info(f"Received followup agent query: {query.query}")
    if query.conversation_id not in conversations:
        conversations[query.conversation_id] = [
            {"role": "system", "content": FOLLOWUP_AGENT_PROMPT}
        ]
    conversations[query.conversation_id].append({"role": "user", "content": query.query})
    last_activity[query.conversation_id] = time.time()

    # Token-budget trimming happens inside chat_with_llama_stream
    limited_conversation = conversations[query.conversation_id]

    def process_response():
        full_response = ""
        for content in chat_with_llama_stream(limited_conversation, model=query.model_id):
            full_response += content
            yield content
        logger.info(f"LLM RAW response for query: {query.query}: {full_response}")
        response_content, interact, tools = parse_followup_and_tools(full_response)

        result = {
            "response": response_content,
            "clarification": interact
        }
        yield "\n\n" + json.dumps(result)

        # Add the assistant's response to the conversation history
        conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
        background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, full_response)
        logger.info(f"Completed followup agent response for query: {query.query}, send result: {result}")

    return StreamingResponse(process_response(), media_type="text/event-stream")
# NOTE: redefines followup_agent; this version emits the clarification JSON after
# a "\n<json>" marker instead of inlining the full result
async def followup_agent(query: FollowupQueryModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
    """
    Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    logger.info(f"Received followup agent query: {query.query}")
    if query.conversation_id not in conversations:
        conversations[query.conversation_id] = [
            {"role": "system", "content": FOLLOWUP_AGENT_PROMPT}
        ]
    conversations[query.conversation_id].append({"role": "user", "content": query.query})
    last_activity[query.conversation_id] = time.time()

    # Token-budget trimming happens inside chat_with_llama_stream
    limited_conversation = conversations[query.conversation_id]

    def process_response():
        full_response = ""
        for content in chat_with_llama_stream(limited_conversation, model=query.model_id):
            full_response += content
            yield content
        logger.info(f"LLM RAW response for query: {query.query}: {full_response}")
        response_content, interact, tools = parse_followup_and_tools(full_response)

        result = {
            "clarification": interact
        }
        yield "\n<json>"
        yield json.dumps(result)

        # Add the assistant's response to the conversation history
        conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
        background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, full_response)
        logger.info(f"Completed followup agent response for query: {query.query}, send result: {result}")

    return StreamingResponse(process_response(), media_type="text/event-stream")
# NOTE: redefines followup_agent; this multi-agent version (MULTI_AGENT_PROMPT_V2)
# can call the web/news tools directly or on the model's request
def followup_agent(query: FollowupQueryModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
    """
    Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    logger.info(f"Received followup agent query: {query.query}")
    if query.conversation_id not in conversations:
        conversations[query.conversation_id] = [
            {"role": "system", "content": MULTI_AGENT_PROMPT_V2}
        ]
    conversations[query.conversation_id].append({"role": "user", "content": query.query})
    last_activity[query.conversation_id] = time.time()

    # Token-budget trimming happens inside chat_with_llama_stream
    limited_conversation = conversations[query.conversation_id]

    def process_response():
        full_response = ""
        result = dict()
        # If a tool is specified explicitly, call it directly instead of the LLM
        if query.tool_call in ["web", "news"]:
            search_query = query.query
            search_response = search_assistant_api(search_query, query.tool_call, model=query.model_id)
            yield "<report>"
            for content in search_response():
                yield content
                full_response += content
            yield "</report>"
        else:
            for content in chat_with_llama_stream(limited_conversation, model=query.model_id):
                yield content
                full_response += content
            logger.info(f"LLM RAW response for query: {query.query}: {full_response}")
            response_content, interact, tools = parse_followup_and_tools(full_response)

            result = {
                "clarification": interact,
                "tools": tools
            }
            yield "<json>" + json.dumps(result) + "</json>"

            # Process a tool call if the model requested one
            if tools and len(tools) > 0:
                tool = tools[0]  # Assume only one tool is present
                if tool["name"] in ["news", "web"]:
                    search_query = tool["input"]
                    search_response = search_assistant_api(search_query, tool["name"], model=query.model_id)
                    yield "<report>"
                    for content in search_response():
                        yield content
                        full_response += content
                    yield "</report>"

        # Add the assistant's response to the conversation history
        conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
        background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, full_response)
        logger.info(f"Completed followup agent response for query: {query.query}, send result: {result}, Full response: {full_response}")

    return StreamingResponse(process_response(), media_type="text/event-stream")
# NOTE: redefines followup_agent; this version wraps the stream in
# <followup-response>/<followup-json> sections
async def followup_agent(query: FollowupQueryModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
    """
    Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    logger.info(f"Received followup agent query: {query.query}")
    if query.conversation_id not in conversations:
        conversations[query.conversation_id] = [
            {"role": "system", "content": FOLLOWUP_AGENT_PROMPT}
        ]
    conversations[query.conversation_id].append({"role": "user", "content": query.query})
    last_activity[query.conversation_id] = time.time()

    # Token-budget trimming happens inside chat_with_llama_stream
    limited_conversation = conversations[query.conversation_id]

    async def process_response():
        yield "<followup-response>\n\n"
        full_response = ""
        for content in chat_with_llama_stream(limited_conversation, model=query.model_id):
            full_response += content
            yield content
        yield "</followup-response>\n\n"
        logger.info(f"LLM RAW response for query: {query.query}: {full_response}")

        # Slight pause so clients can flush the response section before the JSON section
        await asyncio.sleep(0.01)
        response_content, interact, tools = parse_followup_and_tools(full_response)

        result = {
            "clarification": interact
        }
        yield "<followup-json>\n\n"
        yield json.dumps(result) + "\n\n"
        yield "</followup-json>\n\n"

        # Add the assistant's response to the conversation history
        conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
        background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, full_response)
        logger.info(f"Completed followup agent response for query: {query.query}, send result: {result}")

    return StreamingResponse(process_response(), media_type="text/event-stream")
async def followup_agent_v4(query: FollowupQueryModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
    """
    Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    logger.info(f"Received followup agent query: {query.query}")
    if query.conversation_id not in conversations:
        conversations[query.conversation_id] = [
            {"role": "system", "content": FOLLOWUP_AGENT_PROMPT}
        ]
    conversations[query.conversation_id].append({"role": "user", "content": query.query})
    last_activity[query.conversation_id] = time.time()

    # Token-budget trimming happens inside chat_with_llama_stream
    limited_conversation = conversations[query.conversation_id]

    async def process_response():
        yield "<followup-response>\n"
        full_response = ""
        for content in chat_with_llama_stream(limited_conversation, model=query.model_id):
            full_response += content
            yield content
        yield "</followup-response>\n"
        yield "--END_SECTION--\n"
        logger.info(f"LLM RAW response for query: {query.query}: {full_response}")
        response_content, interact, tools = parse_followup_and_tools(full_response)

        result = {
            "clarification": interact
        }
        yield "<followup-json>\n"
        yield json.dumps(result) + "\n"
        yield "</followup-json>\n"
        yield "--END_SECTION--\n"

        # Add the assistant's response to the conversation history
        conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
        background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, full_response)
        logger.info(f"Completed followup agent response for query: {query.query}, send result: {result}")

    return StreamingResponse(process_response(), media_type="text/event-stream")
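# v4 wire format, as emitted above (two sections, each closed by a sentinel):
#   <followup-response>
#   ...streamed answer text...
#   </followup-response>
#   --END_SECTION--
#   <followup-json>
#   {"clarification": [...]}
#   </followup-json>
#   --END_SECTION--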
## DigiYatra
# NOTE: redefines followup_agent for the DigiYatra assistant (different system prompt)
async def followup_agent(query: FollowupQueryModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
    """
    Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    logger.info(f"Received followup agent query: {query.query}")
    if query.conversation_id not in conversations:
        conversations[query.conversation_id] = [
            {"role": "system", "content": FOLLOWUP_DIGIYATRA_PROMPT}
        ]
    conversations[query.conversation_id].append({"role": "user", "content": query.query})
    last_activity[query.conversation_id] = time.time()

    # Token-budget trimming happens inside chat_with_llama_stream
    limited_conversation = conversations[query.conversation_id]

    def process_response():
        full_response = ""
        for content in chat_with_llama_stream(limited_conversation, model=query.model_id):
            full_response += content
            yield content
        logger.info(f"LLM RAW response for query: {query.query}: {full_response}")
        response_content, interact, tools = parse_followup_and_tools(full_response)

        result = {
            "response": response_content,
            "clarification": interact
        }
        yield "\n\n" + json.dumps(result)

        # Add the assistant's response to the conversation history
        conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
        background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, full_response)
        logger.info(f"Completed followup agent response for query: {query.query}, send result: {result}")

    return StreamingResponse(process_response(), media_type="text/event-stream")
async def digi_followup_agent_v2(query: FollowupQueryModel, background_tasks: BackgroundTasks, api_key: str = Depends(verify_api_key)):
    """
    Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    logger.info(f"Received followup agent query: {query.query}")
    if query.conversation_id not in conversations:
        conversations[query.conversation_id] = [
            {"role": "system", "content": FOLLOWUP_DIGIYATRA_PROMPT}
        ]
    conversations[query.conversation_id].append({"role": "user", "content": query.query})
    last_activity[query.conversation_id] = time.time()

    # Token-budget trimming happens inside chat_with_llama_stream
    limited_conversation = conversations[query.conversation_id]

    def process_response():
        full_response = ""
        # Stream newline-delimited JSON frames instead of raw text
        for content in chat_with_llama_stream(limited_conversation, model=query.model_id):
            full_response += content
            yield json.dumps({"type": "response", "content": content}) + "\n"
        logger.info(f"LLM RAW response for query: {query.query}: {full_response}")
        response_content, interact, tools = parse_followup_and_tools(full_response)

        result = {
            "response": response_content,
            "clarification": interact
        }
        yield json.dumps({"type": "interact", "content": result}) + "\n"

        # Add the assistant's response to the conversation history
        conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
        background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, full_response)
        logger.info(f"Completed followup agent response for query: {query.query}, send result: {result}")

    return StreamingResponse(process_response(), media_type="text/event-stream")
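# Sketch of a client consuming digi_followup_agent_v2's newline-delimited JSON
# stream (URL, payload, and key handling are illustrative):
def _demo_consume_ndjson(url: str, payload: dict, api_key: str):
    with requests.post(url, json=payload, headers={"X-API-Key": api_key}, stream=True) as r:
        for line in r.iter_lines():
            if not line:
                continue
            frame = json.loads(line)
            if frame["type"] == "response":
                print(frame["content"], end="", flush=True)
            elif frame["type"] == "interact":
                print("\n", frame["content"])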
from document_generator import router as document_generator_router
app.include_router(document_generator_router, prefix="/api/v1")
from fastapi.middleware.cors import CORSMiddleware

# CORS middleware setup
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://127.0.0.1:5501",  # no trailing slash: browsers send the bare origin
        "http://localhost:3000",
        "https://www.elevaticsai.com",
        "https://www.elevatics.cloud",
        "https://www.elevatics.online",
        "https://www.elevatics.ai",
        "https://elevaticsai.com",
        "https://elevatics.cloud",
        "https://elevatics.online",
        "https://elevatics.ai",
        "https://pvanand-specialized-agents.hf.space",
        "https://pvanand-general-chat.hf.space"
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
| if __name__ == "__main__": | |
| import uvicorn | |
| logger.info("Starting the application") | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |