"""ER5 chat service.

Routes chat requests either to OpenRouter (qwen-2.5-7b-instruct) or to an
external Personality API, optionally injecting schemaless user preferences
as a system prompt when an LLM classifier decides they are relevant.
"""

import json
import logging
import os
from typing import Any, Dict, List, Optional

import httpx
import requests  # NOTE(review): no longer used directly; kept in case other code imports it transitively
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

load_dotenv()

OPENROUTER_API_KEY = os.getenv("OPEN_ROUTER_API_KEY")
PERSONALITY_URL = os.getenv("PERSONALITY_URL")

# Shared OpenRouter endpoint/model so the chat call and the preference
# classifier cannot drift apart.
OPENROUTER_CHAT_URL = "https://openrouter.ai/api/v1/chat/completions"
OPENROUTER_MODEL = "qwen/qwen-2.5-7b-instruct"
CONTEXT_WINDOW = 10  # max messages kept in the rolling conversation context

logger = logging.getLogger(__name__)

app = FastAPI(title="ER5 v1.0.0")


class ChatMessage(BaseModel):
    # One turn of conversation: role is "user"/"assistant"/"system".
    role: str
    content: str


class ChatRequest(BaseModel):
    messages: List[ChatMessage] = []
    temperature: Optional[float] = 0.7
    max_tokens: Optional[int] = 1000
    # Free-form (schemaless) user preference map rendered into a system prompt.
    user_preferences: Optional[Dict[str, Any]] = None
    # JSON-encoded list of prior message dicts, as returned in ChatResponse.context.
    context: Optional[str] = ""
    personality: Optional[str] = None


class ChatResponse(BaseModel):
    response: str
    context: Optional[str] = None


def build_system_message(preferences: Dict[str, Any]) -> dict:
    """Create a system message based on schemaless user preferences.

    Falls back to a generic assistant prompt when *preferences* is empty.
    List/dict values are JSON-encoded so the prompt stays single-line per key.
    """
    if not preferences:
        return {"role": "system", "content": "You are a helpful assistant."}
    preferences_str = "\n".join(
        f"- {key}: {value if not isinstance(value, (list, dict)) else json.dumps(value)}"
        for key, value in preferences.items()
    )
    system_content = f"""You are a helpful assistant. 
Please consider these user preferences in your responses: {preferences_str} Tailor your responses to align with these user preferences and characteristics."""
    return {"role": "system", "content": system_content}


def parse_context(context_str: str) -> List[dict]:
    """Parse a JSON-encoded context string into a list of message dicts.

    Best-effort: returns [] for empty, malformed, or non-list input so a
    corrupt context can never crash the endpoint.
    """
    if not context_str:
        return []
    try:
        parsed = json.loads(context_str)
    except json.JSONDecodeError:
        return []
    # Guard against valid JSON that is not a message list (e.g. a bare string).
    return parsed if isinstance(parsed, list) else []


async def _post_openrouter(payload: dict) -> str:
    """POST *payload* to the OpenRouter chat-completions API.

    Returns the assistant message content; raises HTTPException with the
    upstream status code on any non-200 response.
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(OPENROUTER_CHAT_URL, json=payload, headers=headers)
    if response.status_code != 200:
        raise HTTPException(
            status_code=response.status_code,
            detail=f"OpenRouter API error: {response.text}",
        )
    return response.json()["choices"][0]["message"]["content"]


async def call_openrouter(messages: List[dict], temperature: float, max_tokens: int) -> str:
    """Run a chat completion against OpenRouter and return the reply text."""
    payload = {
        "model": OPENROUTER_MODEL,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    return await _post_openrouter(payload)


async def should_include_preferences_llm(query: str) -> bool:
    """Use the LLM to decide if user preferences should be included for *query*.

    Zero-temperature true/false classification; any upstream error propagates
    as an HTTPException from _post_openrouter.
    """
    prompt = (
        "You are an assistant that classifies user queries. "
        "Given the query below, decide if user-specific context or preferences are relevant. "
        "Respond with 'true' if any kind of personal context or preferences should be included in the response, "
        "and 'false' otherwise.\n\n"
        f"Query: {query}\n\n"
        "Is this query related to personal context such as user preferences, lifestyle, habits, or any other factors "
        "that could affect the response?"
    )
    payload = {
        "model": OPENROUTER_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.0,
        "max_tokens": 5,
    }
    llm_response = (await _post_openrouter(payload)).strip().lower()
    # startswith tolerates trailing punctuation like "true." from the model.
    return llm_response.startswith("true")


async def call_personality_api(messages: List[dict], personality: str) -> str:
    """Call the external Personality API to handle chat completions.

    Sends the flattened conversation plus the requested *personality*.
    (Previously the personality was overwritten with a hardcoded 'humanish'
    and a missing 'response' field silently returned None — both fixed.)
    """
    url = PERSONALITY_URL + "/chat"
    message_str = "\n".join(msg["content"] for msg in messages) if messages else ""
    payload = {
        "message": message_str,
        "personality": personality,
    }
    # Async client: a blocking requests.post here would stall the event loop.
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=payload)
    if response.status_code != 200:
        raise HTTPException(
            status_code=response.status_code,
            detail=f"Personality API error: {response.text}",
        )
    response_json = response.json()
    if "response" not in response_json:
        raise HTTPException(
            status_code=502,
            detail="Personality API returned no 'response' field",
        )
    return response_json["response"]


async def decide_chat_api(
    messages: List[dict], temperature: float, max_tokens: int, personality: Optional[str]
) -> str:
    """Route to the Personality API when configured and requested, else OpenRouter."""
    if PERSONALITY_URL and PERSONALITY_URL.strip() and personality:
        logger.debug("routing request to personality API")
        return await call_personality_api(messages, personality)
    return await call_openrouter(messages, temperature, max_tokens)


@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest) -> ChatResponse:
    """Chat endpoint: build the message list, call the chosen backend, and
    return the reply plus an updated JSON-encoded rolling context.
    """
    # Explicit 400 instead of an IndexError-turned-500 on the default [].
    if not request.messages:
        raise HTTPException(status_code=400, detail="messages must not be empty")
    try:
        include_preferences = await should_include_preferences_llm(request.messages[-1].content)
        messages: List[dict] = []
        # Track whether a system message was actually prepended so we only
        # strip one from the stored context when it really exists.
        has_system = bool(include_preferences and request.user_preferences)
        if has_system:
            messages.append(build_system_message(request.user_preferences))
        messages.extend(parse_context(request.context))
        messages.extend({"role": msg.role, "content": msg.content} for msg in request.messages)
        messages = messages[-CONTEXT_WINDOW:]
        logger.debug("outgoing messages: %s", messages)
        assistant_message = await decide_chat_api(
            messages=messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            personality=request.personality,
        )
        # The system message is rebuilt per request, so it is excluded from
        # the persisted context.
        context_messages = messages[1:] if has_system else messages
        context_messages.append({"role": "assistant", "content": assistant_message})
        updated_context = json.dumps(context_messages[-CONTEXT_WINDOW:])
        return ChatResponse(response=assistant_message, context=updated_context)
    except HTTPException:
        # Preserve upstream status codes (401/429/502...) instead of masking as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=8000)