import base64
import os.path
import traceback
import uuid
from pathlib import Path
from typing import Optional

import aioboto3
import aiofiles

from metagpt.config2 import S3Config
from metagpt.const import BASE64_FORMAT
from metagpt.logs import logger


class S3:
    """A class for interacting with Amazon S3 storage."""

    def __init__(self, config: S3Config):
        self.session = aioboto3.Session()
        self.config = config
        # Credentials and endpoint passed to every aioboto3 client this class creates.
        self.auth_config = {
            "service_name": "s3",
            "aws_access_key_id": config.access_key,
            "aws_secret_access_key": config.secret_key,
            "endpoint_url": config.endpoint,
        }
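
    # A minimal configuration sketch (all values below are placeholders, not real
    # credentials or endpoints):
    #   config = S3Config(
    #       access_key="AKIA...",
    #       secret_key="...",
    #       endpoint="https://s3.us-east-1.amazonaws.com",
    #       bucket="my-bucket",
    #   )
    #   s3 = S3(config)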

    async def upload_file(
        self,
        bucket: str,
        local_path: str,
        object_name: str,
    ) -> None:
        """Upload a local file to the specified path in an S3 bucket.

        Args:
            bucket: The name of the S3 bucket.
            local_path: The local file path, including the file name.
            object_name: The full S3 key under which the file is stored, including the file name.

        Raises:
            Exception: If an error occurs during the upload process.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                # Read the whole file into memory, then upload it with a single PUT.
                async with aiofiles.open(local_path, mode="rb") as reader:
                    body = await reader.read()
                await client.put_object(Body=body, Bucket=bucket, Key=object_name)
                logger.info(f"Successfully uploaded the file to path {object_name} in bucket {bucket} of s3.")
        except Exception as e:
            logger.error(f"Failed to upload the file to path {object_name} in bucket {bucket} of s3: {e}")
            raise
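
    # Usage sketch (bucket name and paths are illustrative):
    #   await s3.upload_file(bucket="my-bucket", local_path="/tmp/report.txt", object_name="reports/report.txt")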

    async def get_object_url(
        self,
        bucket: str,
        object_name: str,
    ) -> str:
        """Get the URL of a file stored in the specified S3 bucket, for download or preview.

        Args:
            bucket: The name of the S3 bucket.
            object_name: The full S3 key of the stored file, including the file name.

        Returns:
            The URL of the downloadable or previewable file.

        Raises:
            Exception: If an error occurs while retrieving the URL.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                file = await client.get_object(Bucket=bucket, Key=object_name)
                # Return the URL of the HTTP response that fetched the object.
                return str(file["Body"].url)
        except Exception as e:
            logger.error(f"Failed to get the url for a downloadable or preview file: {e}")
            raise
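
    # Usage sketch (illustrative names):
    #   url = await s3.get_object_url(bucket="my-bucket", object_name="reports/report.txt")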

    async def get_object(
        self,
        bucket: str,
        object_name: str,
    ) -> bytes:
        """Get the binary content of a file stored in the specified S3 bucket.

        Args:
            bucket: The name of the S3 bucket.
            object_name: The full S3 key of the stored file, including the file name.

        Returns:
            The binary data of the requested file.

        Raises:
            Exception: If an error occurs while retrieving the file data.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                s3_object = await client.get_object(Bucket=bucket, Key=object_name)
                return await s3_object["Body"].read()
        except Exception as e:
            logger.error(f"Failed to get the binary data of the file: {e}")
            raise
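
    # Usage sketch (illustrative names):
    #   content = await s3.get_object(bucket="my-bucket", object_name="reports/report.txt")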

    async def download_file(
        self, bucket: str, object_name: str, local_path: str, chunk_size: int = 128 * 1024
    ) -> None:
        """Download an S3 object to a local file.

        Args:
            bucket: The name of the S3 bucket.
            object_name: The full S3 key of the stored file, including the file name.
            local_path: The local file path where the S3 object will be written.
            chunk_size: The number of bytes to read and write at a time. Defaults to 128 KiB.

        Raises:
            Exception: If an error occurs during the download process.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                s3_object = await client.get_object(Bucket=bucket, Key=object_name)
                stream = s3_object["Body"]
                # Stream the object to disk in fixed-size chunks to bound memory use.
                async with aiofiles.open(local_path, mode="wb") as writer:
                    while chunk := await stream.read(chunk_size):
                        await writer.write(chunk)
        except Exception as e:
            logger.error(f"Failed to download the file from S3: {e}")
            raise
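
    # Usage sketch (illustrative names):
    #   await s3.download_file(bucket="my-bucket", object_name="reports/report.txt", local_path="/tmp/report.txt")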

    async def cache(self, data: str, file_ext: str, format: str = "") -> Optional[str]:
        """Cache data to remote S3 and return its URL, or None if caching fails.

        Args:
            data: The content to cache, either plain text or a base64-encoded string.
            file_ext: The file extension appended to the generated object name, e.g. ".txt".
            format: Set to BASE64_FORMAT if `data` is base64-encoded; otherwise the data
                is stored as UTF-8 text.

        Returns:
            The URL of the cached object, or None on failure.
        """
        object_name = uuid.uuid4().hex + file_ext
        path = Path(__file__).parent
        pathname = path / object_name
        try:
            # Write the payload to a temporary local file next to this module.
            async with aiofiles.open(str(pathname), mode="wb") as file:
                raw = base64.b64decode(data) if format == BASE64_FORMAT else data.encode(encoding="utf-8")
                await file.write(raw)

            # Store the object under "<bucket>/<object_name>" (or "system/<object_name>"
            # when no bucket is configured), then delete the temporary file.
            bucket = self.config.bucket
            object_pathname = self.config.bucket or "system"
            object_pathname += f"/{object_name}"
            object_pathname = os.path.normpath(object_pathname)
            await self.upload_file(bucket=bucket, local_path=str(pathname), object_name=object_pathname)
            pathname.unlink(missing_ok=True)

            return await self.get_object_url(bucket=bucket, object_name=object_pathname)
        except Exception as e:
            logger.exception(f"{e}, stack: {traceback.format_exc()}")
            pathname.unlink(missing_ok=True)
            return None
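

# A minimal end-to-end sketch, assuming a reachable S3-compatible endpoint and valid
# credentials; every value below is a placeholder. It caches a small text payload via
# cache(), which uploads the data and returns a URL for it.
if __name__ == "__main__":
    import asyncio

    async def main() -> None:
        config = S3Config(
            access_key="AKIA...",  # placeholder credential
            secret_key="...",  # placeholder credential
            endpoint="https://s3.us-east-1.amazonaws.com",  # placeholder endpoint
            bucket="my-bucket",  # placeholder bucket
        )
        s3 = S3(config)
        url = await s3.cache(data="hello, s3", file_ext=".txt", format="")
        print(f"cached object url: {url}")

    asyncio.run(main())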