import asyncio
import datetime
import json
import os
from functools import partial
from pathlib import Path
from typing import Any, Coroutine, Optional

import aiofiles
from aiobotocore.session import get_session
from mdutils.mdutils import MdUtils
from metagpt.actions import Action
from metagpt.actions.action_output import ActionOutput
from metagpt.actions.design_api import WriteDesign
from metagpt.actions.prepare_documents import PrepareDocuments
from metagpt.actions.project_management import WriteTasks
from metagpt.actions.summarize_code import SummarizeCode
from metagpt.actions.write_code import WriteCode
from metagpt.actions.write_prd import WritePRD
from metagpt.config import CONFIG
from metagpt.const import (
    COMPETITIVE_ANALYSIS_FILE_REPO,
    DATA_API_DESIGN_FILE_REPO,
    SEQ_FLOW_FILE_REPO,
    SERDESER_PATH,
)
from metagpt.roles import Architect, Engineer, ProductManager, ProjectManager, Role
from metagpt.schema import Message
from metagpt.team import Team
from metagpt.utils.common import any_to_str, read_json_file, write_json_file
from metagpt.utils.git_repository import GitRepository
from pydantic import BaseModel, Field
from zipstream import AioZipStream

# Fallback sink for streamed LLM output: prints each chunk without a trailing newline.
# Used when CONFIG has no "LLM_STREAM_LOG" entry.
_default_llm_stream_log = partial(print, end="")


class PackInfo(BaseModel):
    """Structured result of packing a project."""

    # Download URL of the uploaded archive.
    url: str


class RoleRun(Action):
    """Placeholder action announcing which role is about to run which of its actions.

    The description is built once, at construction time, from the wrapped
    role's profile and its currently pending action.
    """

    role: Role

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        who = self.role.profile
        pending = self.role.rc.todo
        # Prefer the action's own description; fall back to its string form.
        what = pending.desc or str(pending)
        self.desc = f"{who} {what}"


class PackProject(Action):
    """Action that zips the project's git workspace and publishes a download link."""

    role: Role

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.desc = "Pack the project with prd, design, code and more."

    async def run(self, key: str):
        """Upload the zipped workspace under *key* and describe it as Markdown.

        Args:
            key: Storage key (relative path) the archive is stored under.

        Returns:
            An ``ActionOutput`` whose content is a Markdown link to the archive
            and whose instruct content is a ``PackInfo`` carrying the URL.
        """
        url = await self.upload(key)
        info = PackInfo(url=url)
        mdfile = MdUtils(None)
        # Use the last URL segment (the archive file name) as the link text.
        mdfile.new_line(mdfile.new_inline_link(url, url.rsplit("/", 1)[-1]))
        return ActionOutput(mdfile.get_md_text(), info)

    async def upload(self, key: str):
        """Zip every file under ``CONFIG.git_repo.workdir`` and store the archive.

        Args:
            key: Storage key (relative path) the archive is stored under.

        Returns:
            The download URL returned by ``get_download_url``.
        """
        workspace = str(CONFIG.git_repo.workdir)
        files = []
        for root, _, filenames in os.walk(workspace):
            for filename in filenames:
                path = os.path.join(root, filename)
                # Archive member names are relative to the workspace and always
                # use "/" separators; slicing the string and stripping "/" left
                # a leading backslash on platforms whose separator is not "/".
                arcname = Path(path).relative_to(workspace).as_posix()
                files.append({"file": path, "name": arcname})
        # aiozipstream
        chunks = []
        async for chunk in AioZipStream(files, chunksize=32768).stream():
            chunks.append(chunk)
        return await get_download_url(b"".join(chunks), key)


class SoftwareCompany(Role):
    """Wrap a software company as a single role so it can be plugged into the agent store quickly."""

    finish: bool = False
    company: Team = Field(default_factory=Team)
    active_role: Optional[Role] = None
    git_repo: Optional[GitRepository] = None
    max_auto_summarize_code: int = 0

    def __init__(self, use_code_review=False, *args, **kwargs):
        """Create the company, hire its four roles and register the packing action.

        Args:
            use_code_review: Forwarded to the ``Engineer`` role.
            *args, **kwargs: Forwarded to the ``Role`` constructor.
        """
        super().__init__(*args, **kwargs)
        coder = Engineer(n_borg=5, use_code_review=use_code_review)
        staff = [ProductManager(), Architect(), ProjectManager(), coder]
        self.company.hire(staff)
        # The only action this role owns itself is packing the finished project.
        pack_action = PackProject(role=coder)
        self._init_actions([pack_action])

    def recv(self, message: Message) -> None:
        """Start the company's project, using the content of *message* as the idea."""
        idea = message.content
        self.company.run_project(idea)

    async def _think(self) -> bool:
        """Decide what the company does next; the company needs 4 rounds to run.

        BOSS            -> ProductManager -> Architect   -> ProjectManager -> Engineer
        BossRequirement -> WritePRD       -> WriteDesign -> WriteTasks     -> WriteCode ->

        Returns:
            False once ``self.finish`` is set (``self.rc.todo`` is cleared),
            True otherwise, with ``self.rc.todo`` describing the next step.
        """
        if self.finish:
            self.rc.todo = None
            return False

        # Re-install this company's repository into the global CONFIG before the roles run.
        if self.git_repo is not None:
            CONFIG.git_repo = self.git_repo

        environment = self.company.env
        # Only the first role that observes something new is handled in this pass.
        for role in environment.roles.values():
            if await role._observe():
                await role._think()
                if isinstance(role.rc.todo, PrepareDocuments):
                    # Run document preparation right away instead of surfacing it as a
                    # step, remember the repository now held by CONFIG, then think again.
                    self.active_role = role
                    await self.act()
                    self.git_repo = CONFIG.git_repo
                    return await self._think()

                if isinstance(role.rc.todo, SummarizeCode):
                    # Code summarisation is not surfaced as a step: think again without acting.
                    return await self._think()

                # Surface the role's pending action as this role's next todo.
                self.rc.todo = RoleRun(role=role)
                self.active_role = role
                return True

        # No role observed anything new. State 0 presumably selects the only action
        # registered in __init__ (PackProject) -- confirm against Role._set_state.
        self._set_state(0)
        return True

    async def _act(self) -> Message:
        """Run the pending step and return its result as a Markdown message.

        If the pending todo is ``PackProject``, the workspace is zipped and
        uploaded, ``self.finish`` is set, and a message linking to the archive
        is returned. Otherwise the active role's action is executed, its
        output is published to the company, and the output is re-rendered as
        Markdown when it was caused by one of the known actions (PRD, design,
        tasks, code, code summary).

        Returns:
            The (possibly reformatted) message produced by this step.
        """
        if self.git_repo is not None:
            CONFIG.git_repo = self.git_repo
            CONFIG.src_workspace = CONFIG.git_repo.workdir / CONFIG.git_repo.workdir.name
            CONFIG.max_auto_summarize_code = self.max_auto_summarize_code

        if isinstance(self.rc.todo, PackProject):
            workdir = CONFIG.git_repo.workdir
            name = workdir.name
            uid = workdir.parent.name
            now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            key = f"{uid}/metagpt-{name}-{now}.zip"
            output = await self.rc.todo.run(key)
            self.finish = True
            return Message(output.content, role=self.profile, cause_by=type(self.rc.todo))

        default_log_stream = CONFIG.get("LLM_STREAM_LOG", _default_llm_stream_log)

        start = False
        insert_code = False

        def log_stream(msg):
            # If the very first streamed chunk starts with "[", open a ```json
            # fence in front of it; the fence is closed after the action ran.
            nonlocal start, insert_code
            if not start:
                if msg.startswith("["):
                    msg = "```json\n" + msg
                    insert_code = True
                start = True
            return default_log_stream(msg)

        CONFIG.LLM_STREAM_LOG = log_stream
        try:
            output = await self.active_role._act()
        finally:
            # Put the previous sink back. Leaving the wrapper installed made
            # every call wrap the previous call's wrapper, so the chain grew
            # with each step and a stale closure survived a failing action.
            CONFIG.LLM_STREAM_LOG = default_log_stream

        self.active_role._set_state(state=-1)
        self.active_role.publish_message(output)

        if insert_code:
            default_log_stream("\n```\n")

        # Re-render the outputs of known actions as Markdown; anything else is
        # returned untouched.
        formatters = {
            any_to_str(WritePRD): self.format_prd,
            any_to_str(WriteDesign): self.format_system_design,
            any_to_str(WriteTasks): self.format_tasks,
            any_to_str(WriteCode): self.format_code,
            any_to_str(SummarizeCode): self.format_code_summary,
        }
        formatter = formatters.get(output.cause_by)
        if formatter is None:
            return output
        return await formatter(output)

    async def format_prd(self, msg: Message):
        """Render the first PRD document carried by *msg* as a Markdown message.

        The document content is parsed as JSON and emitted section by section.
        The competitive quadrant chart is embedded as an uploaded PNG when the
        rendered image exists in the repository, otherwise as Mermaid source.

        Returns:
            A new ``Message`` with the Markdown text and the original
            ``cause_by`` and ``role``.
        """
        _, prd_doc = list(msg.instruct_content.docs.items())[0]
        data = json.loads(prd_doc.content)
        md = MdUtils(None)

        def section(name):
            # Emit the level-2 header and hand the name back for use as data key.
            md.new_header(2, name, add_table_of_contents=False)
            return name

        md.new_paragraph(data[section("Original Requirements")])
        md.new_list(data[section("Product Goals")], marked_with="1")
        md.new_list(data[section("User Stories")], marked_with="1")

        competitors = data[section("Competitive Analysis")]
        if all(entry.count(":") == 1 for entry in competitors):
            # Every entry is "name: description" -> two-column table.
            cells = ["Competitor", "Description"]
            for entry in competitors:
                cells.extend(entry.split(":"))
            md.new_table(2, len(competitors) + 1, cells)
        else:
            md.new_list(competitors, marked_with="1")

        chart = section("Competitive Quadrant Chart")
        repo_root = CONFIG.git_repo.workdir
        chart_png = repo_root / Path(COMPETITIVE_ANALYSIS_FILE_REPO) / Path(prd_doc.filename).with_suffix(".png")
        if not chart_png.exists():
            md.insert_code(data[chart], "mermaid")
        else:
            # The storage key is the image path relative to two levels above the repo.
            key = str(chart_png.relative_to(repo_root.parent.parent))
            url = await upload_file_to_s3(chart_png, key)
            md.new_line(md.new_inline_image(chart, url))

        md.new_paragraph(data[section("Requirement Analysis")])

        pool = data[section("Requirement Pool")]
        cells = ["Task Description", "Priority"]
        for row in pool:
            cells.extend(row)
        md.new_table(2, len(pool) + 1, cells)

        md.new_paragraph(data[section("UI Design draft")])
        md.new_paragraph(data[section("Anything UNCLEAR")])

        return Message(md.get_md_text(), cause_by=msg.cause_by, role=msg.role)

    async def format_system_design(self, msg: Message):
        """Render the first system-design document carried by *msg* as Markdown.

        The two diagram sections are embedded as uploaded PNG images when the
        rendered image exists in the repository, otherwise as Mermaid source.

        Returns:
            A new ``Message`` with the Markdown text and the original
            ``cause_by`` and ``role``.
        """
        _, design_doc = list(msg.instruct_content.docs.items())[0]
        data = json.loads(design_doc.content)
        md = MdUtils(None)

        def section(name):
            # Emit the level-2 header and hand the name back for use as data key.
            md.new_header(2, name, add_table_of_contents=False)
            return name

        async def diagram(name, repo_dir):
            # Image if it was rendered into *repo_dir*, Mermaid source otherwise.
            section(name)
            repo_root = CONFIG.git_repo.workdir
            png = repo_root / repo_dir / Path(design_doc.filename).with_suffix(".png")
            if png.exists():
                key = str(png.relative_to(repo_root.parent.parent))
                url = await upload_file_to_s3(png, key)
                md.new_line(md.new_inline_image(name, url))
            else:
                md.insert_code(data[name], "mermaid")

        md.new_paragraph(data[section("Implementation approach")])
        md.new_list(data[section("File list")], marked_with="1")
        await diagram("Data structures and interfaces", Path(DATA_API_DESIGN_FILE_REPO))
        await diagram("Program call flow", SEQ_FLOW_FILE_REPO)
        md.new_paragraph(data[section("Anything UNCLEAR")])

        return Message(md.get_md_text(), cause_by=msg.cause_by, role=msg.role)

    async def format_tasks(self, msg: Message):
        """Render the first task document carried by *msg* as a Markdown message.

        Returns:
            A new ``Message`` with the Markdown text and the original
            ``cause_by`` and ``role``.
        """
        _, task_doc = list(msg.instruct_content.docs.items())[0]
        data = json.loads(task_doc.content)
        md = MdUtils(None)

        def section(name):
            # Emit the level-2 header and hand the name back for use as data key.
            md.new_header(2, name, add_table_of_contents=False)
            return name

        # Package lists: one package per line in a plain-text code block.
        for name in ("Required Python packages", "Required Other language third-party packages"):
            md.insert_code("\n".join(data[section(name)]), "txt")

        logic = data[section("Logic Analysis")]
        cells = ["Filename", "Class/Function Name"]
        for row in logic:
            cells.extend(row)
        md.new_table(2, len(logic) + 1, cells)

        md.new_list(data[section("Task list")])

        # The API spec keeps its header even when there is nothing to show.
        api_spec = data[section("Full API spec")]
        if api_spec:
            md.insert_code(api_spec, "json")

        for name in ("Shared Knowledge", "Anything UNCLEAR"):
            md.insert_code(data[section(name)], "python")

        return Message(md.get_md_text(), cause_by=msg.cause_by, role=msg.role)

    async def format_code(self, msg: Message):
        """Render the source files named in *msg* as Markdown code blocks.

        Each line of ``msg.content`` is taken as a file name relative to
        ``<workdir>/<workdir name>``; the file is read and emitted under its
        own header, fenced with a language derived from the file extension.

        Returns:
            A new ``Message`` with the Markdown text and the original
            ``cause_by`` and ``role``.
        """
        repo_root = CONFIG.git_repo.workdir
        source_dir = repo_root / repo_root.name
        md = MdUtils(None)

        for relpath in msg.content.splitlines():
            md.new_header(2, relpath, add_table_of_contents=False)
            async with aiofiles.open(source_dir / relpath) as src:
                text = await src.read()
            # Everything after the last "." (the whole name when there is none).
            _, _, ext = relpath.rpartition(".")
            language = ext
            if ext == "py":
                language = "python"
            md.insert_code(text, language)

        return Message(md.get_md_text(), cause_by=msg.cause_by, role=msg.role)

    async def format_code_summary(self, msg: Message) -> Message:
        """Return the code-summary message unchanged (no Markdown rendering yet)."""
        # TODO: render the code summary as Markdown; currently a pass-through.
        return msg

    async def think(self) -> Optional[Action]:
        """Run one `_think` pass and return the resulting pending todo (None once finished)."""
        await self._think()
        return self.rc.todo

    async def act(self) -> Message:
        """Public entry point that runs `_act` and returns its message."""
        return await self._act()

    def serialize(self, stg_path: Path = None):
        """Persist this role and its company below *stg_path*.

        Args:
            stg_path: Target directory; defaults to
                ``SERDESER_PATH / "software_company"`` when None.
        """
        if stg_path is None:
            stg_path = SERDESER_PATH.joinpath("software_company")

        # Everything except the company goes into one JSON file.
        own_state = self.model_dump(exclude={"company": True})
        write_json_file(stg_path.joinpath("software_company_info.json"), own_state)

        # The company serializes itself into its own sub-directory.
        self.company.serialize(stg_path.joinpath("company"))

    @classmethod
    def deserialize(cls, stg_path: Path) -> "SoftwareCompany":
        """Rebuild an instance from the directory written by ``serialize``.

        Args:
            stg_path: Directory holding ``software_company_info.json`` and the
                ``company`` sub-directory.

        Returns:
            A new instance of ``cls`` built from the stored fields and the
            deserialized company.

        Raises:
            FileNotFoundError: If ``software_company_info.json`` is missing.
        """
        info_path = stg_path.joinpath("software_company_info.json")
        if not info_path.exists():
            # Name the file that is actually checked (the message used to
            # mention `team_info.json`).
            raise FileNotFoundError(
                "recover storage meta file `software_company_info.json` not exist, "
                "not to recover and please start a new project."
            )

        info: dict = read_json_file(info_path)

        # The company was stored separately; restore it and pass it in as a field.
        info["company"] = Team.deserialize(stg_path=stg_path.joinpath("company"))

        return cls(**info)


async def upload_file_to_s3(filepath: str, key: str):
    """Read a file and store its bytes under *key* via ``get_download_url``.

    Args:
        filepath: Path of the file to upload (callers in this module also
            pass ``pathlib.Path`` objects).
        key: Storage key (relative path) for the uploaded content.

    Returns:
        The download URL returned by ``get_download_url``.
    """
    async with aiofiles.open(filepath, "rb") as f:
        content = await f.read()
    # Upload only after the file is closed, so the handle is not held open
    # for the duration of the network transfer.
    return await get_download_url(content, key)


async def get_download_url(content: bytes, key: str) -> str:
    """Store *content* under *key* and return the URL it can be fetched from.

    When ``CONFIG["STORAGE_TYPE"]`` is ``"S3"`` the bytes are put into the
    configured bucket; otherwise they are written below ``LOCAL_ROOT``
    (default ``"storage"``) and the URL is built from ``LOCAL_BASE_URL``
    (default ``"storage"``).
    """
    if CONFIG.get("STORAGE_TYPE") != "S3":
        # Local storage: mirror the key as a path below the storage root.
        local_root = CONFIG.get("LOCAL_ROOT", "storage")
        url_prefix = CONFIG.get("LOCAL_BASE_URL", "storage")
        target = Path(local_root) / key
        target.parent.mkdir(exist_ok=True, parents=True)
        async with aiofiles.open(target, "wb") as out:
            await out.write(content)
        return f"{url_prefix}/{key}"

    session = get_session()
    client_options = {
        "aws_secret_access_key": CONFIG.get("S3_SECRET_KEY"),
        "aws_access_key_id": CONFIG.get("S3_ACCESS_KEY"),
        "endpoint_url": CONFIG.get("S3_ENDPOINT_URL"),
        "use_ssl": CONFIG.get("S3_SECURE"),
    }
    async with session.create_client("s3", **client_options) as client:
        # upload object to amazon s3
        bucket = CONFIG.get("S3_BUCKET")
        await client.put_object(Bucket=bucket, Key=key, Body=content)
        return f"{CONFIG.get('S3_ENDPOINT_URL')}/{bucket}/{key}"


async def main(idea, **kwargs):
    """Run a software company on *idea*, printing the message of every step.

    Args:
        idea: The project requirement handed to the company.
        **kwargs: Forwarded to the ``SoftwareCompany`` constructor.
    """
    company = SoftwareCompany(**kwargs)
    company.recv(Message(idea))
    # Alternate think/act until think() reports nothing left to do.
    while True:
        todo = await company.think()
        if not todo:
            break
        print(await company.act())


if __name__ == "__main__":
    # main() requires the project idea. Calling it without arguments raised
    # TypeError, so take the idea from the command line instead.
    import sys

    if len(sys.argv) < 2:
        raise SystemExit(f"usage: {sys.argv[0]} IDEA")
    asyncio.run(main(" ".join(sys.argv[1:])))