File size: 6,058 Bytes
fe5c39d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import base64
import os.path
import posixpath
import traceback
import uuid
from pathlib import Path
from typing import Optional

import aioboto3
import aiofiles

from metagpt.config2 import S3Config
from metagpt.const import BASE64_FORMAT
from metagpt.logs import logger


class S3:
    """A class for interacting with Amazon S3 storage."""

    def __init__(self, config: S3Config):
        # A single session is shared by all operations; short-lived clients are
        # created from it per call via `session.client(**self.auth_config)`.
        self.session = aioboto3.Session()
        self.config = config
        self.auth_config = {
            "service_name": "s3",
            "aws_access_key_id": config.access_key,
            "aws_secret_access_key": config.secret_key,
            "endpoint_url": config.endpoint,
        }

    async def upload_file(
        self,
        bucket: str,
        local_path: str,
        object_name: str,
    ) -> None:
        """Upload a file from the local path to the specified path of the storage bucket specified in s3.

        Args:
            bucket: The name of the S3 storage bucket.
            local_path: The local file path, including the file name.
            object_name: The complete path of the uploaded file to be stored in S3, including the file name.

        Raises:
            Exception: If an error occurs during the upload process, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                # NOTE: the whole file is read into memory before the put; fine
                # for small artifacts, but large files would need multipart upload.
                async with aiofiles.open(local_path, mode="rb") as reader:
                    body = await reader.read()
                    await client.put_object(Body=body, Bucket=bucket, Key=object_name)
                    logger.info(f"Successfully uploaded the file to path {object_name} in bucket {bucket} of s3.")
        except Exception as e:
            logger.error(f"Failed to upload the file to path {object_name} in bucket {bucket} of s3: {e}")
            # Bare `raise` re-raises the active exception with its original traceback
            # (fix: was `raise e`, which rebuilds the traceback from here).
            raise

    async def get_object_url(
        self,
        bucket: str,
        object_name: str,
    ) -> str:
        """Get the URL for a downloadable or preview file stored in the specified S3 bucket.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The complete path of the file stored in S3, including the file name.

        Returns:
            The URL for the downloadable or preview file.

        Raises:
            Exception: If an error occurs while retrieving the URL, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                # NOTE(review): this surfaces the streaming body's request URL, not a
                # presigned URL — it is only fetchable when the endpoint allows
                # unauthenticated reads. Confirm against the configured endpoint.
                file = await client.get_object(Bucket=bucket, Key=object_name)
                return str(file["Body"].url)
        except Exception as e:
            logger.error(f"Failed to get the url for a downloadable or preview file: {e}")
            raise

    async def get_object(
        self,
        bucket: str,
        object_name: str,
    ) -> bytes:
        """Get the binary data of a file stored in the specified S3 bucket.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The complete path of the file stored in S3, including the file name.

        Returns:
            The binary data of the requested file.

        Raises:
            Exception: If an error occurs while retrieving the file data, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                s3_object = await client.get_object(Bucket=bucket, Key=object_name)
                return await s3_object["Body"].read()
        except Exception as e:
            logger.error(f"Failed to get the binary data of the file: {e}")
            raise

    async def download_file(
        self, bucket: str, object_name: str, local_path: str, chunk_size: Optional[int] = 128 * 1024
    ) -> None:
        """Download an S3 object to a local file.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The complete path of the file stored in S3, including the file name.
            local_path: The local file path where the S3 object will be downloaded.
            chunk_size: The size of data chunks to read and write at a time. Default is 128 KB.

        Raises:
            Exception: If an error occurs during the download process, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                s3_object = await client.get_object(Bucket=bucket, Key=object_name)
                stream = s3_object["Body"]
                # Stream in fixed-size chunks so large objects never sit fully in memory.
                async with aiofiles.open(local_path, mode="wb") as writer:
                    while True:
                        file_data = await stream.read(chunk_size)
                        if not file_data:
                            break
                        await writer.write(file_data)
        except Exception as e:
            logger.error(f"Failed to download the file from S3: {e}")
            raise

    async def cache(self, data: str, file_ext: str, format: str = "") -> Optional[str]:
        """Stage data in a temporary local file, upload it to S3, and return its URL.

        Args:
            data: The content to store; base64-encoded text when `format` is
                BASE64_FORMAT, otherwise plain text written as UTF-8.
            file_ext: The file extension (including the leading dot) appended to
                a random object name, e.g. ".png".
            format: BASE64_FORMAT to base64-decode `data` before writing; any
                other value writes `data` as UTF-8 text.

        Returns:
            The URL of the uploaded object, or None if any step fails.
            (Fix: annotation was `-> str` although the error path returns None.)
        """
        object_name = uuid.uuid4().hex + file_ext  # random name avoids collisions
        # Stage the payload next to this module; removed again after the upload.
        pathname = Path(__file__).parent / object_name
        try:
            payload = base64.b64decode(data) if format == BASE64_FORMAT else data.encode(encoding="utf-8")
            async with aiofiles.open(str(pathname), mode="wb") as file:
                await file.write(payload)

            bucket = self.config.bucket
            object_pathname = self.config.bucket or "system"
            object_pathname += f"/{object_name}"
            # S3 keys always use forward slashes; posixpath keeps them intact
            # (fix: os.path.normpath would emit backslashes on Windows and corrupt the key).
            object_pathname = posixpath.normpath(object_pathname)
            await self.upload_file(bucket=bucket, local_path=str(pathname), object_name=object_pathname)
            pathname.unlink(missing_ok=True)

            return await self.get_object_url(bucket=bucket, object_name=object_pathname)
        except Exception as e:
            logger.exception(f"{e}, stack:{traceback.format_exc()}")
            pathname.unlink(missing_ok=True)  # best-effort cleanup of the staged file
            return None