Spaces:
Sleeping
Sleeping
| ## Digiyatra | |
| from fastapi import FastAPI, Depends, BackgroundTasks, HTTPException, APIRouter, Query, Header | |
| from pydantic import BaseModel | |
| from typing import List, Dict, Optional, Union, Annotated, Any, get_args | |
| from openai import AsyncOpenAI | |
| from observability import LLMObservabilityManager, log_execution, logger | |
| from aiclient import DatabaseManager, AIClient | |
| from limit_tokens import trim_messages_openai | |
| from prompts import FOLLOWUP_DIGIYATRA_PROMPT | |
| from utils import parse_followup_and_tools | |
| from sse_starlette.sse import EventSourceResponse | |
| ## | |
| from fastapi import FastAPI, HTTPException, Depends, Security, BackgroundTasks | |
| from fastapi.security import APIKeyHeader | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel, Field | |
| from typing import Literal, List, Dict | |
| import os | |
| from functools import lru_cache | |
| from openai import OpenAI | |
| from uuid import uuid4 | |
| import tiktoken | |
| import sqlite3 | |
| import time | |
| from datetime import datetime, timedelta | |
| import pandas as pd | |
| import requests | |
| import json | |
| import os | |
| from pydantic import BaseModel, Field | |
| import yaml | |
| import json | |
| from yaml.loader import SafeLoader | |
# Application object; the metadata below feeds the generated OpenAPI document.
# NOTE(review): `tags` is not one of FastAPI's documented constructor
# parameters (`openapi_tags` is) - confirm it has the intended effect.
# NOTE(review): the e-mail literal looks like an obfuscation placeholder left
# by a page capture - confirm the real contact address.
app = FastAPI(
    title="Digiyatra Chatbot",
    description="Digiyatra Chatbot",
    version="1.0.0",
    tags=["chat"],
    contact=dict(
        name="Digiyatra",
        url="https://digiyatra.com",
        email="[email protected]",
    ),
)

# Sub-routers are imported after `app` exists and mounted in this order.
from observability_router import router as observability_router
from rag_routerv2 import router as rag_router, query_table, QueryTableResponse, get_db_connection

app.include_router(observability_router)
app.include_router(rag_router)
# SQLite setup
DB_PATH = '/data/conversations.db'


def init_db():
    """Create the SQLite database at ``DB_PATH`` and its ``conversations`` table.

    The parent directory of ``DB_PATH`` is created when missing. The table is
    created with ``CREATE TABLE IF NOT EXISTS``, so calling this repeatedly is
    harmless.

    Raises:
        OSError: if the parent directory cannot be created.
        sqlite3.Error: if the database cannot be opened or the DDL fails.
    """
    logger.info("Initializing database")

    # os.path.dirname() returns '' for a bare filename and os.makedirs('')
    # raises, so only create the directory when there is one to create.
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS conversations
                        (id INTEGER PRIMARY KEY AUTOINCREMENT,
                         user_id TEXT,
                         conversation_id TEXT,
                         message TEXT,
                         response TEXT,
                         timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
        conn.commit()
    finally:
        # Always release the handle, even when the DDL or commit fails.
        conn.close()

    logger.info("Database initialized successfully")
# In-memory storage for conversations.
# Maps conversation_id -> ordered chat messages, each a
# {"role": ..., "content": ...} dict.
# NOTE(review): nothing visible in this file evicts old entries - confirm
# whether cleanup happens elsewhere.
conversations: Dict[str, List[Dict[str, str]]] = {}

# Maps conversation_id -> time.time() of the most recent user message.
last_activity: Dict[str, float] = {}

from aiclient import AIClient

# Shared LLM client used by the streaming endpoint.
client = AIClient()
def update_db(user_id, conversation_id, message, response):
    """Persist one user message / assistant response pair to SQLite.

    Args:
        user_id: identifier of the user who sent the message.
        conversation_id: identifier of the conversation the exchange belongs to.
        message: the user's message text.
        response: the assistant's response text.

    Raises:
        sqlite3.Error: if the database cannot be opened or the insert fails.
    """
    logger.info(f"Updating database for conversation: {conversation_id}")
    conn = sqlite3.connect(DB_PATH)
    try:
        # Parameterized insert: values are bound, never interpolated into SQL.
        conn.execute(
            '''INSERT INTO conversations (user_id, conversation_id, message, response)
               VALUES (?, ?, ?, ?)''',
            (user_id, conversation_id, message, response),
        )
        conn.commit()
    finally:
        # Close the connection even if the insert or commit raised.
        conn.close()
    logger.info("Database updated successfully")
# Closed set of model identifiers accepted by the API.
# Declaration order is significant: get_models() returns the members in
# exactly this order.
ModelID = Literal[
    "openai/gpt-4o-mini",
    "openai/chatgpt-4o-latest",
    "deepseek/deepseek-chat",
    "anthropic/claude-3.5-sonnet",
    "anthropic/claude-3-haiku",
    "meta-llama/llama-3.3-70b-instruct",
    "google/gemini-2.0-flash-exp:free",
    "google/gemini-flash-1.5",
    "meta-llama/llama-3-70b-instruct",
    "meta-llama/llama-3-8b-instruct",
]
class FollowupQueryModel(BaseModel):
    # Request payload for the followup agent.
    # (Documented with comments rather than a class docstring so the
    # generated schema description is not altered.)

    # Free-text question from the user.
    query: str = Field(
        ...,
        description="User's query for the followup agent",
    )
    # Must be one of the ModelID literals.
    model_id: ModelID = Field(
        default="openai/gpt-4o-mini",
        description="ID of the model to use for response generation",
    )
    # A fresh UUID4 string is generated when the caller omits this field.
    conversation_id: str = Field(
        default_factory=lambda: str(uuid4()),
        description="Unique identifier for the conversation",
    )
    user_id: str = Field(
        ...,
        description="Unique identifier for the user",
    )
    table_id: Optional[str] = Field(
        None,
        description="Optional knowledge base table identifier for the query",
    )

    class Config:
        # Example payload attached to the model's schema.
        schema_extra = {
            "example": {
                "query": "How can I improve my productivity?",
                "model_id": "openai/gpt-4o-mini",
                "conversation_id": "123e4567-e89b-12d3-a456-426614174000",
                "user_id": "user123",
                "table_id":"digiyatra"
            }
        }
async def digiyatra_query_table(query: FollowupQueryModel, db: Annotated[Any, Depends(get_db_connection)], limit: Optional[int] = 5):
    """Query the digiyatra table.

    Looks up ``query.query`` in the knowledge-base table named by
    ``query.table_id`` (falling back to ``"digiyatra"`` when it is ``None``)
    and returns the text of the first hit.

    Args:
        query: the incoming request; it is read but not modified.
        db: database dependency; not used by this function's body.
        limit: maximum number of rows requested from ``query_table``.

    Returns:
        The ``text`` field of the first result row, or an empty string when
        the lookup returned no rows.
    """
    # Resolve the default locally instead of writing it back onto the
    # caller's request object.
    table_id = query.table_id if query.table_id is not None else "digiyatra"

    response = await query_table(
        table_id=table_id,
        query=query.query,
        user_id=query.user_id,
        limit=limit
    )

    rows = response.results['data']
    if not rows:
        # An empty result set is a normal outcome, not a server error.
        logger.warning(f"No knowledge base results for table: {table_id}")
        return ""

    # NOTE(review): up to `limit` rows are fetched but only the first is
    # used - confirm whether the remaining rows should contribute context.
    return rows[0]['text']
async def followup_agent(query: FollowupQueryModel, background_tasks: BackgroundTasks):
    """
    Followup agent endpoint that provides helpful responses or generates clarifying questions based on user queries.
    Requires API Key authentication via X-API-Key header.
    """
    # NOTE(review): neither a route decorator nor an API-key dependency is
    # visible on this function in this view - confirm where they are applied.
    #
    # Flow: seed the conversation with the system prompt on first use, fetch
    # FAQ context, append the context-augmented user turn, then stream the
    # model output as server-sent events:
    #   {'type': 'token', ...}     one per streamed chunk
    #   {'type': 'metadata', ...}  parsed final response, sent once at the end
    #   {'type': 'error', ...}     sent instead if generation fails
    try:
        logger.info(f"Received followup agent query: {query.query}")

        if query.conversation_id not in conversations:
            conversations[query.conversation_id] = [
                {"role": "system", "content": FOLLOWUP_DIGIYATRA_PROMPT}
            ]

        # NOTE(review): the dependency function is called directly here and
        # its result is not used by digiyatra_query_table - confirm intent.
        digiyatra_response = await digiyatra_query_table(query, db=get_db_connection(), limit=5)
        user_query_with_context = f"{query.query} \n\n FAQ Context for ANSWERING: {digiyatra_response}"

        # Keep a handle on this turn so it can be rolled back if generation fails.
        user_message = {"role": "user", "content": user_query_with_context}
        conversations[query.conversation_id].append(user_message)
        last_activity[query.conversation_id] = time.time()

        # NOTE(review): the original comment here read "Limit tokens in the
        # conversation history", but no trimming is applied - the full
        # history is sent as-is. Confirm whether trimming was intended.
        limited_conversation = conversations[query.conversation_id]

        async def process_response():
            try:
                chunks = []
                async for content in client.generate_response(limited_conversation, model=query.model_id, conversation_id=query.conversation_id, user=query.user_id):
                    chunks.append(content)
                    yield f"{json.dumps({'type': 'token', 'content': content})}"
                full_response = "".join(chunks)

                logger.info(f"LLM RAW response for query: {query.query}: {full_response}")
                response_content, interact, _tools = parse_followup_and_tools(full_response)

                result = {
                    "response": response_content,
                    "clarification": interact
                }
                yield f"{json.dumps({'type': 'metadata', 'response_full': result})}"

                # Add the assistant's response to the conversation history
                conversations[query.conversation_id].append({"role": "assistant", "content": full_response})
                # Persist the raw user query (without FAQ context) and the full reply.
                background_tasks.add_task(update_db, query.user_id, query.conversation_id, query.query, full_response)
                logger.info(f"Completed followup agent response for query: {query.query}, send result: {result}")
            except Exception as e:
                logger.error(f"Error during response processing: {str(e)}")
                # Drop the unanswered user turn so the history does not end up
                # with two consecutive user messages on the next request.
                history = conversations.get(query.conversation_id)
                if history and history[-1] is user_message:
                    history.pop()
                yield f"{json.dumps({'type': 'error', 'message': 'An error occurred while processing the response.'})}"

        return EventSourceResponse(process_response(), media_type="text/event-stream")
    except HTTPException:
        # Preserve status codes raised deliberately further down the stack
        # instead of masking them as a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error in followup_agent: {str(e)}")
        raise HTTPException(status_code=500, detail="An error occurred while processing the followup agent request.") from e
class ModelsResponse(BaseModel):
    # Response body returned by get_models().
    # Model identifier strings, in ModelID declaration order.
    models: List[str]
def get_models():
    """
    Endpoint to return a list of available model IDs dynamically extracted from the ModelID Literal.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view - confirm where it is registered.
    try:
        # get_args() yields the Literal's members in declaration order.
        return ModelsResponse(models=[*get_args(ModelID)])
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred while fetching the models: {str(e)}")
from fastapi.middleware.cors import CORSMiddleware

# Fully permissive CORS policy: any origin, method and header.
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# FastAPI's CORS documentation says origins must be listed explicitly when
# credentials are allowed - confirm this combination is intended.
_cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
# Register the hook with the application: without this, nothing visible in
# this file calls startup(), so init_db() never runs and update_db() would
# insert into a table that was never created.
@app.on_event("startup")
def startup():
    """Log application start and make sure the SQLite schema exists."""
    logger.info("Starting up the application")
    init_db()
# Register the hook with the application; without this, nothing visible in
# this file ever calls shutdown().
@app.on_event("shutdown")
def shutdown():
    """Log application shutdown."""
    logger.info("Shutting down the application")
| # import uvicorn | |
| # if __name__ == "__main__": | |
| # uvicorn.run( | |
| # "app:app", | |
| # host="0.0.0.0", | |
| # port=8000, | |
| # workers=4, | |
| # reload=False, | |
| # access_log=False | |
| # ) |