Spaces:
Runtime error
Runtime error
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
import asyncio | |
import urllib.parse | |
from datetime import datetime | |
import uuid | |
from enum import Enum | |
from fastapi import FastAPI, Request, HTTPException | |
from fastapi.responses import StreamingResponse, RedirectResponse | |
from fastapi.staticfiles import StaticFiles | |
import fire | |
from pydantic import BaseModel, Field | |
import uvicorn | |
from typing import Any, Optional | |
from metagpt import Message | |
from metagpt.actions.action import Action | |
from metagpt.actions.action_output import ActionOutput | |
from metagpt.config import CONFIG | |
from metagpt.roles.software_company import RoleRun, SoftwareCompany | |
class QueryAnswerType(Enum): | |
Query = "Q" | |
Answer = "A" | |
class SentenceType(Enum): | |
TEXT = "text" | |
HIHT = "hint" | |
ACTION = "action" | |
class MessageStatus(Enum): | |
COMPLETE = "complete" | |
class SentenceValue(BaseModel): | |
answer: str | |
class Sentence(BaseModel): | |
type: str | |
id: Optional[str] = None | |
value: SentenceValue | |
is_finished: Optional[bool] = None | |
class Sentences(BaseModel): | |
id: Optional[str] = None | |
action: Optional[str] = None | |
role: Optional[str] = None | |
skill: Optional[str] = None | |
description: Optional[str] = None | |
timestamp: str = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f%z") | |
status: str | |
contents: list[dict] | |
class NewMsg(BaseModel): | |
"""Chat with MetaGPT""" | |
query: str = Field(description="Problem description") | |
config: dict[str, Any] = Field(description="Configuration information") | |
class ErrorInfo(BaseModel): | |
error: str = None | |
traceback: str = None | |
class ThinkActStep(BaseModel): | |
id: str | |
status: str | |
title: str | |
timestamp: str | |
description: str | |
content: Sentence = None | |
class ThinkActPrompt(BaseModel): | |
message_id: int = None | |
timestamp: str = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f%z") | |
step: ThinkActStep = None | |
skill: Optional[str] = None | |
role: Optional[str] = None | |
def update_think(self, tc_id, action: Action): | |
self.step = ThinkActStep( | |
id=str(tc_id), | |
status="running", | |
title=action.desc, | |
timestamp=datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f%z"), | |
description=action.desc, | |
) | |
def update_act(self, message: ActionOutput): | |
self.step.status = "finish" | |
self.step.content = Sentence( | |
type="text", | |
id=ThinkActPrompt.guid32(), | |
value=SentenceValue(answer=message.content), | |
is_finished=True, | |
) | |
def guid32(): | |
return str(uuid.uuid4()).replace("-", "")[0:32] | |
def prompt(self): | |
v = self.json(exclude_unset=True) | |
return urllib.parse.quote(v) | |
class MessageJsonModel(BaseModel): | |
steps: list[Sentences] | |
qa_type: str | |
created_at: datetime = datetime.now() | |
query_time: datetime = datetime.now() | |
answer_time: datetime = datetime.now() | |
score: Optional[int] = None | |
feedback: Optional[str] = None | |
def add_think_act(self, think_act_prompt: ThinkActPrompt): | |
s = Sentences( | |
action=think_act_prompt.step.title, | |
skill=think_act_prompt.skill, | |
description=think_act_prompt.step.description, | |
timestamp=think_act_prompt.timestamp, | |
status=think_act_prompt.step.status, | |
contents=[think_act_prompt.step.content.dict()], | |
) | |
self.steps.append(s) | |
def prompt(self): | |
v = self.json(exclude_unset=True) | |
return urllib.parse.quote(v) | |
async def create_message(req_model: NewMsg, request: Request): | |
""" | |
Session message stream | |
""" | |
config = {k.upper(): v for k, v in req_model.config.items()} | |
CONFIG.set_context(config) | |
role = SoftwareCompany() | |
role.recv(message=Message(content=req_model.query)) | |
answer = MessageJsonModel( | |
steps=[ | |
Sentences( | |
contents=[ | |
Sentence(type=SentenceType.TEXT.value, value=SentenceValue(answer=req_model.query), is_finished=True) | |
], | |
status=MessageStatus.COMPLETE.value, | |
) | |
], | |
qa_type=QueryAnswerType.Answer.value, | |
) | |
tc_id = 0 | |
while True: | |
tc_id += 1 | |
if request and await request.is_disconnected(): | |
return | |
think_result: RoleRun = await role.think() | |
if not think_result: # End of conversion | |
break | |
think_act_prompt = ThinkActPrompt(role=think_result.role.profile) | |
think_act_prompt.update_think(tc_id, think_result) | |
yield think_act_prompt.prompt + "\n\n" | |
act_result = await role.act() | |
think_act_prompt.update_act(act_result) | |
yield think_act_prompt.prompt + "\n\n" | |
answer.add_think_act(think_act_prompt) | |
yield answer.prompt + "\n\n" # Notify the front-end that the message is complete. | |
class ChatHandler: | |
async def create_message(req_model: NewMsg, request: Request): | |
"""Message stream, using SSE.""" | |
event = create_message(req_model, request) | |
headers = {"Cache-Control": "no-cache", "Connection": "keep-alive"} | |
return StreamingResponse(event, headers=headers, media_type="text/event-stream") | |
app = FastAPI() | |
app.mount( | |
"/static", | |
StaticFiles(directory="./metagpt/static/", check_dir=True), | |
name="static", | |
) | |
app.add_api_route( | |
"/api/messages", | |
endpoint=ChatHandler.create_message, | |
methods=["post"], | |
summary="Session message sending (streaming response)", | |
) | |
async def catch_all(request: Request): | |
if request.url.path == "/": | |
return RedirectResponse(url="/static/index.html") | |
if request.url.path.startswith("/api"): | |
raise HTTPException(status_code=404) | |
new_path = f"/static{request.url.path}" | |
return RedirectResponse(url=new_path) | |
def main(): | |
uvicorn.run(app="__main__:app", host="0.0.0.0", port=7860) | |
if __name__ == "__main__": | |
fire.Fire(main) | |