|
from fastapi import FastAPI, HTTPException
|
|
from pydantic import BaseModel
|
|
import httpx
|
|
import requests
|
|
import os
|
|
from dotenv import load_dotenv
|
|
from typing import List, Optional, Dict, Any
|
|
import json
|
|
|
|
|
|
# Load variables from a local .env file into the process environment.
load_dotenv()

# Credentials / endpoints read from the environment at import time.
# NOTE(review): env var name is "OPEN_ROUTER_API_KEY" while the constant is
# spelled OPENROUTER_API_KEY — confirm the .env key matches.
OPENROUTER_API_KEY = os.getenv("OPEN_ROUTER_API_KEY")
# Optional base URL of the external Personality service; when unset,
# decide_chat_api falls back to calling OpenRouter directly.
PERSONALITY_URL = os.getenv("PERSONALITY_URL")

app = FastAPI(title="ER5 v1.0.0")
|
|
|
|
class ChatMessage(BaseModel):
    """A single chat turn in OpenAI-style role/content format."""

    # Speaker role, e.g. "user" / "assistant" / "system" (not validated here).
    role: str
    # Message text.
    content: str
|
|
|
|
|
|
class ChatRequest(BaseModel):
    """Inbound payload for the chat endpoint."""

    # New turns from the client. NOTE: the mutable default [] is safe here
    # because pydantic copies field defaults per instance.
    messages: List[ChatMessage]=[]
    # Sampling temperature forwarded to the model backend.
    temperature: Optional[float] = 0.7
    # Completion length cap forwarded to the model backend.
    max_tokens: Optional[int] = 1000
    # Schemaless user preferences, rendered into a system message when relevant.
    user_preferences: Optional[Dict[str, Any]] = None
    # Prior conversation, serialized as a JSON list of message dicts
    # (the value previously returned in ChatResponse.context).
    context: Optional[str] = ""
    # Personality name; when set (and PERSONALITY_URL is configured) the
    # request is routed to the Personality API instead of OpenRouter.
    personality: Optional[str] = None
|
|
|
|
class ChatResponse(BaseModel):
    """Outbound payload for the chat endpoint."""

    # The assistant's reply text.
    response: str
    # Updated conversation context (JSON string) for the client to echo back
    # on the next request.
    context: Optional[str] = None
|
|
|
|
def build_system_message(preferences: Dict[str, Any]) -> dict:
    """Build the system-role message encoding schemaless user preferences.

    Each preference becomes a "- key: value" bullet; list/dict values are
    JSON-encoded. With no preferences, a generic assistant prompt is returned.
    """
    if not preferences:
        return {"role": "system", "content": "You are a helpful assistant."}

    bullets = []
    for key, value in preferences.items():
        rendered = json.dumps(value) if isinstance(value, (list, dict)) else value
        bullets.append(f"- {key}: {rendered}")
    preferences_str = "\n".join(bullets)

    system_content = f"""You are a helpful assistant. Please consider these user preferences in your responses:

{preferences_str}

Tailor your responses to align with these user preferences and characteristics."""

    return {"role": "system", "content": system_content}
|
|
|
|
def parse_context(context_str: str) -> List[dict]:
    """Parse a JSON-encoded conversation context into a list of message dicts.

    Returns [] for empty/None input, malformed JSON, or valid JSON that is
    not a list (previously a JSON object or scalar was returned as-is,
    violating the declared return type and corrupting the message list that
    callers extend with this result).
    """
    if not context_str:
        return []
    try:
        parsed = json.loads(context_str)
    except json.JSONDecodeError:
        return []
    # Only a JSON array is a valid context; anything else is discarded.
    return parsed if isinstance(parsed, list) else []
|
|
|
|
|
|
|
|
async def call_openrouter(messages: List[dict], temperature: float, max_tokens: int):
    """Send a chat completion request to OpenRouter and return the reply text.

    Raises:
        HTTPException: propagating OpenRouter's status code and body when the
            request does not succeed with HTTP 200.
    """
    endpoint = "https://openrouter.ai/api/v1/chat/completions"

    request_headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json"
    }

    body = {
        "model": "qwen/qwen-2.5-7b-instruct",
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens
    }

    async with httpx.AsyncClient() as http:
        result = await http.post(endpoint, json=body, headers=request_headers)

        if result.status_code != 200:
            raise HTTPException(status_code=result.status_code,
                                detail=f"OpenRouter API error: {result.text}")

        # Extract just the assistant's text from the first choice.
        return result.json()['choices'][0]['message']['content']
|
|
|
|
async def chat_endpoint(request: ChatRequest):
    """Handle one chat turn: assemble context, route to a backend, return the
    reply plus the updated serialized context.

    Fixes applied:
      * empty `request.messages` now yields a 400 instead of an IndexError
        masked as a 500;
      * a system message is only stripped from the saved context when one was
        actually added (previously `include_preferences` alone triggered the
        slice, silently dropping a real message when `user_preferences` was
        unset);
      * the 10-message window no longer drops the system message;
      * HTTPExceptions from downstream calls keep their status code instead of
        being re-wrapped as 500;
      * the debug `print` was removed.
    """
    try:
        if not request.messages:
            raise HTTPException(status_code=400, detail="messages must not be empty")

        # Ask the classifier whether personal preferences are relevant to the
        # latest user message.
        include_preferences = await should_include_preferences_llm(request.messages[-1].content)

        messages = []
        has_system = False
        if include_preferences and request.user_preferences:
            messages.append(build_system_message(request.user_preferences))
            has_system = True

        # Prior turns from the client-echoed context, then the new turns.
        messages.extend(parse_context(request.context))
        messages.extend({"role": msg.role, "content": msg.content}
                        for msg in request.messages)

        # Bound the window to 10 messages without dropping the system message.
        if has_system:
            messages = [messages[0]] + messages[1:][-9:]
        else:
            messages = messages[-10:]

        assistant_message = await decide_chat_api(
            messages=messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            personality=request.personality
        )

        # Persist everything except the system message (it is rebuilt from
        # preferences on each request).
        context_messages = messages[1:] if has_system else list(messages)
        context_messages.append({"role": "assistant", "content": assistant_message})
        updated_context = json.dumps(context_messages[-10:])

        return ChatResponse(response=assistant_message, context=updated_context)

    except HTTPException:
        # Preserve deliberate HTTP errors (status + detail) from downstream.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
async def should_include_preferences_llm(query: str) -> bool:
    """Ask the LLM whether *query* warrants injecting user preferences.

    Sends a zero-temperature classification prompt to OpenRouter and treats an
    exact (case-insensitive) 'true' reply as yes.

    Raises:
        HTTPException: propagating OpenRouter's status code on failure.
    """
    endpoint = "https://openrouter.ai/api/v1/chat/completions"
    request_headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json"
    }

    classification_prompt = (
        "You are an assistant that classifies user queries. "
        "Given the query below, decide if user-specific context or preferences are relevant. "
        "Respond with 'true' if any kind of personal context or preferences should be included in the response, "
        "and 'false' otherwise.\n\n"
        f"Query: {query}\n\n"
        "Is this query related to personal context such as user preferences, lifestyle, habits, or any other factors "
        "that could affect the response?"
    )

    body = {
        "model": "qwen/qwen-2.5-7b-instruct",
        "messages": [{"role": "user", "content": classification_prompt}],
        # Deterministic, tiny completion: we only need a one-word verdict.
        "temperature": 0.0,
        "max_tokens": 5
    }

    async with httpx.AsyncClient() as http:
        result = await http.post(endpoint, json=body, headers=request_headers)

        if result.status_code != 200:
            raise HTTPException(status_code=result.status_code,
                                detail=f"OpenRouter API error: {result.text}")

        verdict = result.json()['choices'][0]['message']['content'].strip().lower()
        return verdict == "true"
|
|
|
|
async def call_personality_api(messages: List[dict], personality: str):
    """Forward the conversation to the external Personality API.

    The service takes one flat message string, so all message contents are
    joined with newlines.

    Fixes applied:
      * a duplicate `payload` dict previously overwrote the first one and
        hard-coded personality='humanish', ignoring the caller's choice;
      * the blocking `requests.post` call inside this async function stalled
        the event loop — replaced with the already-imported httpx client;
      * a missing "response" key previously fell through to an implicit
        `return None`, which would later fail ChatResponse validation; it now
        raises an explicit 502.

    Raises:
        HTTPException: on a non-200 reply or an unexpected payload shape.
    """
    url = PERSONALITY_URL + "/chat"
    message_str = "\n".join(msg['content'] for msg in messages) if messages else ""

    payload = {
        "message": message_str,
        "personality": personality
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=payload)

    if response.status_code != 200:
        raise HTTPException(status_code=response.status_code,
                            detail=f"Personality API error: {response.text}")

    response_json = response.json()
    if "response" in response_json:
        return response_json["response"]

    raise HTTPException(status_code=502,
                        detail="Personality API returned an unexpected payload")
|
|
|
|
|
|
async def decide_chat_api(messages: List[dict], temperature: float, max_tokens: int, personality: str):
    """Route the chat request to the appropriate backend.

    Uses the Personality API when both a PERSONALITY_URL is configured and the
    caller supplied a personality; otherwise falls back to OpenRouter.

    Fixes applied: removed the stray debug `print("yes")` and normalized
    signature spacing.

    Returns:
        The assistant's reply text from whichever backend handled the request.
    """
    if PERSONALITY_URL and PERSONALITY_URL.strip() and personality:
        # Personality backend ignores temperature/max_tokens by design.
        return await call_personality_api(messages, personality)
    return await call_openrouter(messages, temperature, max_tokens)
|
|
|
|
|
|
|
|
|
|
|