# ER5_EUI_Fetching_Demo / existing_solution.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
import os
from dotenv import load_dotenv
from typing import List, Optional, Dict, Any
import json

load_dotenv()
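
# Configuration: OPENROUTER_API_KEY is read from the OPEN_ROUTER_API_KEY env
# var (note the naming mismatch); PERSONALITY_URL optionally points at an
# external personality service and drives the routing in decide_chat_api below.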
OPENROUTER_API_KEY = os.getenv("OPEN_ROUTER_API_KEY")
PERSONALITY_URL = os.getenv("PERSONALITY_URL")
app = FastAPI(title="ER5 v1.0.0")
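

# Pydantic schemas. ChatRequest round-trips the rolling conversation as an
# opaque JSON string in `context`, so the client carries history between calls.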
class ChatMessage(BaseModel):
    role: str
    content: str


class ChatRequest(BaseModel):
    messages: List[ChatMessage] = []
    temperature: Optional[float] = 0.7
    max_tokens: Optional[int] = 1000
    user_preferences: Optional[Dict[str, Any]] = None
    context: Optional[str] = ""
    personality: Optional[str] = None


class ChatResponse(BaseModel):
    response: str
    context: Optional[str] = None
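

# Example request body (illustrative values only):
# {
#   "messages": [{"role": "user", "content": "What should I cook tonight?"}],
#   "user_preferences": {"diet": "vegan"},
#   "context": "[]"
# }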


def build_system_message(preferences: Dict[str, Any]) -> dict:
    """Create a system message based on schemaless user preferences"""
    if not preferences:
        return {"role": "system", "content": "You are a helpful assistant."}
    preferences_str = "\n".join(
        f"- {key}: {value if not isinstance(value, (list, dict)) else json.dumps(value)}"
        for key, value in preferences.items()
    )
    system_content = f"""You are a helpful assistant. Please consider these user preferences in your responses:
{preferences_str}
Tailor your responses to align with these user preferences and characteristics."""
    return {"role": "system", "content": system_content}


def parse_context(context_str: str) -> List[dict]:
    """Parse context string into messages list"""
    if not context_str:
        return []
    try:
        return json.loads(context_str)
    except json.JSONDecodeError:
        # Malformed context is treated as an empty history rather than an error.
        return []
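
# The context travels as a JSON-encoded list of chat messages, e.g.
# '[{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}]'.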


async def call_openrouter(messages: List[dict], temperature: float, max_tokens: int):
    """Send a chat completion request to OpenRouter and return the reply text."""
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "qwen/qwen-2.5-7b-instruct",
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens
    }
    # LLM completions routinely exceed httpx's 5-second default timeout.
    async with httpx.AsyncClient(timeout=60.0) as client:
        response = await client.post(url, json=payload, headers=headers)
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code,
                                detail=f"OpenRouter API error: {response.text}")
        return response.json()['choices'][0]['message']['content']
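

# /chat flow: classify whether preferences matter for the latest user message,
# rebuild the conversation from the round-tripped context plus the new turns,
# route the call via decide_chat_api, and return both the reply and the
# updated context for the client to store.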
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    try:
        if not request.messages:
            raise HTTPException(status_code=400, detail="messages must not be empty")
        include_preferences = await should_include_preferences_llm(request.messages[-1].content)
        history = parse_context(request.context)
        history.extend({"role": msg.role, "content": msg.content}
                       for msg in request.messages)
        history = history[-10:]  # keep a rolling window of the last 10 messages
        messages = list(history)
        if include_preferences and request.user_preferences:
            # Prepend the preference-derived system message for this call only;
            # it is not persisted into the returned context.
            messages.insert(0, build_system_message(request.user_preferences))
        assistant_message = await decide_chat_api(
            messages=messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            personality=request.personality
        )
        history.append({"role": "assistant", "content": assistant_message})
        updated_context = json.dumps(history[-10:])
        return ChatResponse(response=assistant_message, context=updated_context)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


async def should_include_preferences_llm(query: str) -> bool:
    """
    Use the LLM to decide if user preferences should be included based on the query.
    """
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json"
    }
    prompt = (
        "You are an assistant that classifies user queries. "
        "Given the query below, decide if user-specific context or preferences are relevant. "
        "Respond with 'true' if any kind of personal context or preferences should be included in the response, "
        "and 'false' otherwise.\n\n"
        f"Query: {query}\n\n"
        "Is this query related to personal context such as user preferences, lifestyle, habits, or any other factors "
        "that could affect the response?"
    )
    payload = {
        "model": "qwen/qwen-2.5-7b-instruct",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.0,
        "max_tokens": 5  # a single-word true/false verdict is all we need
    }
    async with httpx.AsyncClient(timeout=60.0) as client:
        response = await client.post(url, json=payload, headers=headers)
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code,
                                detail=f"OpenRouter API error: {response.text}")
        llm_response = response.json()['choices'][0]['message']['content'].strip().lower()
    # startswith() tolerates trailing punctuation such as "true.".
    return llm_response.startswith("true")


async def call_personality_api(messages: List[dict], personality: str):
    """
    Call the external Personality API to handle chat completions.
    """
    url = PERSONALITY_URL + "/chat"
    # The personality service expects one flattened string, not a structured history.
    message_str = "\n".join([msg['content'] for msg in messages]) if messages else ""
    payload = {
        "message": message_str,
        # Fall back to the previously hardcoded 'humanish' default if no
        # personality was supplied.
        "personality": personality or "humanish"
    }
    async with httpx.AsyncClient(timeout=60.0) as client:
        response = await client.post(url, json=payload)
    if response.status_code != 200:
        raise HTTPException(status_code=response.status_code,
                            detail=f"Personality API error: {response.text}")
    response_json = response.json()
    if "response" not in response_json:
        raise HTTPException(status_code=502,
                            detail="Personality API returned no 'response' field")
    return response_json["response"]


async def decide_chat_api(messages: List[dict], temperature: float, max_tokens: int, personality: str):
    """
    Route the request: use the external Personality API when PERSONALITY_URL is
    configured and a personality was requested, otherwise fall back to OpenRouter.
    """
    if PERSONALITY_URL and PERSONALITY_URL.strip() and personality:
        return await call_personality_api(messages, personality)
    return await call_openrouter(messages, temperature, max_tokens)


# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=8000)
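
# A minimal sketch of exercising the endpoint locally (illustrative values):
#
#   curl -X POST http://localhost:8000/chat \
#     -H "Content-Type: application/json" \
#     -d '{"messages": [{"role": "user", "content": "Suggest a workout"}],
#          "user_preferences": {"fitness_level": "beginner"}}'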