#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import time
from io import BytesIO

import boto3
from botocore.exceptions import ClientError

from rag import settings
from rag.utils import singleton


@singleton
class RAGFlowS3(object):
    def __init__(self):
        self.conn = None
        self.s3_config = settings.S3
        self.access_key = self.s3_config.get('access_key', None)
        self.secret_key = self.s3_config.get('secret_key', None)
        self.region = self.s3_config.get('region', None)
        self.__open__()

    def __open__(self):
        # Drop any existing client before (re)connecting.
        try:
            if self.conn:
                self.__close__()
        except Exception:
            pass

        try:
            self.conn = boto3.client(
                's3',
                region_name=self.region,
                aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_key
            )
        except Exception:
            logging.exception(f"Failed to connect to S3 in region {self.region}")

    def __close__(self):
        self.conn = None

    def bucket_exists(self, bucket):
        try:
            logging.debug(f"head_bucket bucket name {bucket}")
            self.conn.head_bucket(Bucket=bucket)
            exists = True
        except ClientError:
            logging.exception(f"head_bucket error {bucket}")
            exists = False
        return exists

    def health(self):
        # Round-trip a tiny object to verify connectivity and write access.
        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
        if not self.bucket_exists(bucket):
            self.conn.create_bucket(Bucket=bucket)
            logging.debug(f"create bucket {bucket} ********")

        r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
        return r

    def get_properties(self, bucket, key):
        # Stub: object metadata lookup is not implemented for this backend.
        return {}

    def list(self, bucket, dir, recursive=True):
        # Stub: listing is not implemented for this backend.
        return []

    def put(self, bucket, fnm, binary):
        logging.debug(f"bucket name {bucket}; filename :{fnm}:")
        for _ in range(3):  # retry a few times, reconnecting between attempts
            try:
                if not self.bucket_exists(bucket):
                    self.conn.create_bucket(Bucket=bucket)
                    logging.info(f"create bucket {bucket} ********")
                r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
                return r
            except Exception:
                logging.exception(f"Failed to put {bucket}/{fnm}")
                self.__open__()
                time.sleep(1)

    def rm(self, bucket, fnm):
        try:
            self.conn.delete_object(Bucket=bucket, Key=fnm)
        except Exception:
            logging.exception(f"Failed to remove {bucket}/{fnm}")

    def get(self, bucket, fnm):
        for _ in range(3):  # retry a few times, reconnecting between attempts
            try:
                r = self.conn.get_object(Bucket=bucket, Key=fnm)
                object_data = r['Body'].read()
                return object_data
            except Exception:
                logging.exception(f"Failed to get {bucket}/{fnm}")
                self.__open__()
                time.sleep(1)
        return None

    def obj_exist(self, bucket, fnm):
        try:
            self.conn.head_object(Bucket=bucket, Key=fnm)
            return True
        except ClientError as e:
            # head_object raises a 404 ClientError for missing keys.
            if e.response['Error']['Code'] == '404':
                return False
            raise

    def get_presigned_url(self, bucket, fnm, expires):
        for _ in range(10):  # retry, reconnecting between attempts
            try:
                r = self.conn.generate_presigned_url(
                    'get_object',
                    Params={'Bucket': bucket, 'Key': fnm},
                    ExpiresIn=expires
                )
                return r
            except Exception:
                logging.exception(f"Failed to get presigned URL for {bucket}/{fnm}")
                self.__open__()
                time.sleep(1)
        return None
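

# --- Usage sketch (not part of the original module) ---
# A minimal, hedged example of how this connector might be exercised end to
# end, assuming settings.S3 supplies valid 'access_key', 'secret_key', and
# 'region' values for a reachable S3 endpoint. The bucket and key names
# below are hypothetical placeholders, not names from the original source.
if __name__ == "__main__":
    s3 = RAGFlowS3()  # @singleton: repeated calls return the same instance

    bucket, key = "my-example-bucket", "example.txt"  # hypothetical names
    s3.put(bucket, key, b"hello from RAGFlowS3")      # creates bucket if absent

    if s3.obj_exist(bucket, key):
        print(s3.get(bucket, key))                            # b'hello from RAGFlowS3'
        print(s3.get_presigned_url(bucket, key, expires=3600))  # 1-hour download URL

    s3.rm(bucket, key)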