fredbai and Kevin Hu committed
Commit 4d0b8a7 · 1 Parent(s): f9e19e4

Storage: Support S3 and Azure Blob as the object storage of RAGFlow. (#2278)

### What problem does this PR solve?

issue: https://github.com/infiniflow/ragflow/issues/2277

### Type of change

- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):

Co-authored-by: Kevin Hu <[email protected]>
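
At a glance, the change swaps every direct MINIO call for a backend-neutral STORAGE_IMPL object resolved by the new factory in rag/utils/storage_factory.py. The adapters share no explicit base class; they only agree on a duck-typed contract. A minimal sketch of that implicit interface (the Protocol formulation is an illustration inferred from the diff, not part of the commit):

from typing import Optional, Protocol


class ObjectStorage(Protocol):
    """Implicit interface shared by RAGFlowMinio, RAGFlowAzureSasBlob,
    RAGFlowAzureSpnBlob and RAGFlowS3 (method names taken from the diff)."""

    def health(self): ...                     # connectivity probe used by /system/status
    def put(self, bucket: str, fnm: str, binary: bytes): ...      # upload an object
    def get(self, bucket: str, fnm: str) -> Optional[bytes]: ...  # download; None on failure
    def rm(self, bucket: str, fnm: str) -> None: ...              # delete an object
    def obj_exist(self, bucket: str, fnm: str) -> bool: ...       # existence check
    def get_presigned_url(self, bucket: str, fnm: str, expires) -> Optional[str]: ...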

api/apps/api_app.py CHANGED
@@ -39,7 +39,7 @@ from itsdangerous import URLSafeTimedSerializer
 
 from api.utils.file_utils import filename_type, thumbnail
 from rag.nlp import keyword_extraction
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 
 from api.db.services.canvas_service import CanvasTemplateService, UserCanvasService
 from agent.canvas import Canvas
@@ -427,10 +427,10 @@ def upload():
             retmsg="This type of file has not been supported yet!")
 
     location = filename
-    while MINIO.obj_exist(kb_id, location):
+    while STORAGE_IMPL.obj_exist(kb_id, location):
         location += "_"
     blob = request.files['file'].read()
-    MINIO.put(kb_id, location, blob)
+    STORAGE_IMPL.put(kb_id, location, blob)
     doc = {
         "id": get_uuid(),
         "kb_id": kb.id,
@@ -650,7 +650,7 @@ def document_rm():
            FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
            File2DocumentService.delete_by_document_id(doc_id)
 
-            MINIO.rm(b, n)
+            STORAGE_IMPL.rm(b, n)
        except Exception as e:
            errors += str(e)
 
@@ -723,7 +723,7 @@ def completion_faq():
                 if ans["reference"]["chunks"][chunk_idx]["img_id"]:
                     try:
                         bkt, nm = ans["reference"]["chunks"][chunk_idx]["img_id"].split("-")
-                        response = MINIO.get(bkt, nm)
+                        response = STORAGE_IMPL.get(bkt, nm)
                         data_type_picture["url"] = base64.b64encode(response).decode('utf-8')
                         data.append(data_type_picture)
                         break
api/apps/dataset_api.py CHANGED
@@ -42,7 +42,7 @@ from api.utils.file_utils import filename_type, thumbnail
 from rag.app import book, laws, manual, naive, one, paper, presentation, qa, resume, table, picture, audio, email
 from rag.nlp import search
 from rag.utils.es_conn import ELASTICSEARCH
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 
 MAXIMUM_OF_UPLOADING_FILES = 256
 
@@ -352,7 +352,7 @@ def upload_documents(dataset_id):
 
         # upload to the minio
         location = filename
-        while MINIO.obj_exist(dataset_id, location):
+        while STORAGE_IMPL.obj_exist(dataset_id, location):
             location += "_"
 
         blob = file.read()
@@ -361,7 +361,7 @@
         if blob == b'':
             warnings.warn(f"[WARNING]: The content of the file {filename} is empty.")
 
-        MINIO.put(dataset_id, location, blob)
+        STORAGE_IMPL.put(dataset_id, location, blob)
 
         doc = {
             "id": get_uuid(),
@@ -441,7 +441,7 @@ def delete_document(document_id, dataset_id):  # string
            File2DocumentService.delete_by_document_id(document_id)
 
            # delete it from minio
-            MINIO.rm(dataset_id, location)
+            STORAGE_IMPL.rm(dataset_id, location)
        except Exception as e:
            errors += str(e)
    if errors:
@@ -596,7 +596,7 @@ def download_document(dataset_id, document_id):
 
    # The process of downloading
    doc_id, doc_location = File2DocumentService.get_minio_address(doc_id=document_id)  # minio address
-    file_stream = MINIO.get(doc_id, doc_location)
+    file_stream = STORAGE_IMPL.get(doc_id, doc_location)
    if not file_stream:
        return construct_json_result(message="This file is empty.", code=RetCode.DATA_ERROR)
 
@@ -737,7 +737,7 @@ def parsing_document_internal(id):
        doc_id = doc_attributes["id"]
 
        bucket, doc_name = File2DocumentService.get_minio_address(doc_id=doc_id)
-        binary = MINIO.get(bucket, doc_name)
+        binary = STORAGE_IMPL.get(bucket, doc_name)
        parser_name = doc_attributes["parser_id"]
        if binary:
            res = doc_parse(binary, doc_name, parser_name, tenant_id, doc_id)
api/apps/document_app.py CHANGED
@@ -48,7 +48,7 @@ from api.db import FileType, TaskStatus, ParserType, FileSource, LLMType
 from api.db.services.document_service import DocumentService, doc_upload_and_parse
 from api.settings import RetCode, stat_logger
 from api.utils.api_utils import get_json_result
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from api.utils.file_utils import filename_type, thumbnail, get_project_base_directory
 from api.utils.web_utils import html2pdf, is_valid_url
 
@@ -118,9 +118,9 @@ def web_crawl():
        raise RuntimeError("This type of file has not been supported yet!")
 
    location = filename
-    while MINIO.obj_exist(kb_id, location):
+    while STORAGE_IMPL.obj_exist(kb_id, location):
        location += "_"
-    MINIO.put(kb_id, location, blob)
+    STORAGE_IMPL.put(kb_id, location, blob)
    doc = {
        "id": get_uuid(),
        "kb_id": kb.id,
@@ -307,7 +307,7 @@ def rm():
            FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
            File2DocumentService.delete_by_document_id(doc_id)
 
-            MINIO.rm(b, n)
+            STORAGE_IMPL.rm(b, n)
        except Exception as e:
            errors += str(e)
 
@@ -394,7 +394,7 @@ def get(doc_id):
            return get_data_error_result(retmsg="Document not found!")
 
        b, n = File2DocumentService.get_minio_address(doc_id=doc_id)
-        response = flask.make_response(MINIO.get(b, n))
+        response = flask.make_response(STORAGE_IMPL.get(b, n))
 
        ext = re.search(r"\.([^.]+)$", doc.name)
        if ext:
@@ -458,7 +458,7 @@ def change_parser():
 def get_image(image_id):
    try:
        bkt, nm = image_id.split("-")
-        response = flask.make_response(MINIO.get(bkt, nm))
+        response = flask.make_response(STORAGE_IMPL.get(bkt, nm))
        response.headers.set('Content-Type', 'image/JPEG')
        return response
    except Exception as e:
api/apps/file_app.py CHANGED
@@ -34,7 +34,7 @@ from api.utils.api_utils import get_json_result
 from api.utils.file_utils import filename_type
 from rag.nlp import search
 from rag.utils.es_conn import ELASTICSEARCH
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 
 
 @manager.route('/upload', methods=['POST'])
@@ -98,7 +98,7 @@ def upload():
        # file type
        filetype = filename_type(file_obj_names[file_len - 1])
        location = file_obj_names[file_len - 1]
-        while MINIO.obj_exist(last_folder.id, location):
+        while STORAGE_IMPL.obj_exist(last_folder.id, location):
            location += "_"
        blob = file_obj.read()
        filename = duplicate_name(
@@ -260,7 +260,7 @@ def rm():
                e, file = FileService.get_by_id(inner_file_id)
                if not e:
                    return get_data_error_result(retmsg="File not found!")
-                MINIO.rm(file.parent_id, file.location)
+                STORAGE_IMPL.rm(file.parent_id, file.location)
            FileService.delete_folder_by_pf_id(current_user.id, file_id)
        else:
            if not FileService.delete(file):
@@ -333,7 +333,7 @@ def get(file_id):
        if not e:
            return get_data_error_result(retmsg="Document not found!")
        b, n = File2DocumentService.get_minio_address(file_id=file_id)
-        response = flask.make_response(MINIO.get(b, n))
+        response = flask.make_response(STORAGE_IMPL.get(b, n))
        ext = re.search(r"\.([^.]+)$", file.name)
        if ext:
            if file.type == FileType.VISUAL.value:
api/apps/system_app.py CHANGED
@@ -22,7 +22,7 @@ from api.utils.api_utils import get_json_result
 from api.versions import get_rag_version
 from rag.settings import SVR_QUEUE_NAME
 from rag.utils.es_conn import ELASTICSEARCH
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from timeit import default_timer as timer
 
 from rag.utils.redis_conn import REDIS_CONN
@@ -47,7 +47,7 @@ def status():
 
    st = timer()
    try:
-        MINIO.health()
+        STORAGE_IMPL.health()
        res["minio"] = {"status": "green", "elapsed": "{:.1f}".format((timer() - st)*1000.)}
    except Exception as e:
        res["minio"] = {"status": "red", "elapsed": "{:.1f}".format((timer() - st)*1000.), "error": str(e)}
api/db/services/document_service.py CHANGED
@@ -34,7 +34,7 @@ from api.utils.file_utils import get_project_base_directory
 from graphrag.mind_map_extractor import MindMapExtractor
 from rag.settings import SVR_QUEUE_NAME
 from rag.utils.es_conn import ELASTICSEARCH
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from rag.nlp import search, rag_tokenizer
 
 from api.db import FileType, TaskStatus, ParserType, LLMType
@@ -473,7 +473,7 @@ def doc_upload_and_parse(conversation_id, file_objs, user_id):
            else:
                d["image"].save(output_buffer, format='JPEG')
 
-            MINIO.put(kb.id, d["_id"], output_buffer.getvalue())
+            STORAGE_IMPL.put(kb.id, d["_id"], output_buffer.getvalue())
            d["img_id"] = "{}-{}".format(kb.id, d["_id"])
            del d["image"]
            docs.append(d)
api/db/services/file_service.py CHANGED
@@ -27,7 +27,7 @@ from api.db.services.document_service import DocumentService
 from api.db.services.file2document_service import File2DocumentService
 from api.utils import get_uuid
 from api.utils.file_utils import filename_type, thumbnail
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 
 
 class FileService(CommonService):
@@ -350,10 +350,10 @@ class FileService(CommonService):
            raise RuntimeError("This type of file has not been supported yet!")
 
        location = filename
-        while MINIO.obj_exist(kb.id, location):
+        while STORAGE_IMPL.obj_exist(kb.id, location):
            location += "_"
        blob = file.read()
-        MINIO.put(kb.id, location, blob)
+        STORAGE_IMPL.put(kb.id, location, blob)
        doc = {
            "id": get_uuid(),
            "kb_id": kb.id,
api/db/services/task_service.py CHANGED
@@ -27,7 +27,7 @@ from api.db.services.document_service import DocumentService
 from api.utils import current_timestamp, get_uuid
 from deepdoc.parser.excel_parser import RAGFlowExcelParser
 from rag.settings import SVR_QUEUE_NAME
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from rag.utils.redis_conn import REDIS_CONN
 
 
@@ -143,7 +143,7 @@ def queue_tasks(doc, bucket, name):
    tsks = []
 
    if doc["type"] == FileType.PDF.value:
-        file_bin = MINIO.get(bucket, name)
+        file_bin = STORAGE_IMPL.get(bucket, name)
        do_layout = doc["parser_config"].get("layout_recognize", True)
        pages = PdfParser.total_page_number(doc["name"], file_bin)
        page_size = doc["parser_config"].get("task_page_size", 12)
@@ -169,7 +169,7 @@ def queue_tasks(doc, bucket, name):
        tsks.append(task)
 
    elif doc["parser_id"] == "table":
-        file_bin = MINIO.get(bucket, name)
+        file_bin = STORAGE_IMPL.get(bucket, name)
        rn = RAGFlowExcelParser.row_number(
            doc["name"], file_bin)
        for i in range(0, rn, 3000):
conf/service_conf.yaml CHANGED
@@ -13,6 +13,22 @@ minio:
   user: 'rag_flow'
   password: 'infini_rag_flow'
   host: 'minio:9000'
+azure:
+  auth_type: 'sas'
+  container_url: 'container_url'
+  sas_token: 'sas_token'
+#azure:
+#  auth_type: 'spn'
+#  account_url: 'account_url'
+#  client_id: 'client_id'
+#  secret: 'secret'
+#  tenant_id: 'tenant_id'
+#  container_name: 'container_name'
+s3:
+  endpoint: 'endpoint'
+  access_key: 'access_key'
+  secret_key: 'secret_key'
+  region: 'region'
 es:
   hosts: 'http://es01:9200'
   username: 'elastic'
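
One wrinkle worth noting: the Azure adapters added below read environment variables first and fall back to this YAML block via rag.settings, whereas the S3 adapter as committed reads only the ENDPOINT, ACCESS_KEY, SECRET_KEY and REGION environment variables and ignores the s3: block added here. The fallback pattern, as it appears in the Azure code:

import os
from rag import settings

# An environment variable wins; otherwise the value comes from
# conf/service_conf.yaml, which rag.settings loads into the AZURE dict.
container_url = os.getenv('CONTAINER_URL', settings.AZURE["container_url"])
sas_token = os.getenv('SAS_TOKEN', settings.AZURE["sas_token"])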
rag/settings.py CHANGED
@@ -24,6 +24,8 @@ RAG_CONF_PATH = os.path.join(get_project_base_directory(), "conf")
 SUBPROCESS_STD_LOG_NAME = "std.log"
 
 ES = get_base_config("es", {})
+AZURE = get_base_config("azure", {})
+S3 = get_base_config("s3", {})
 MINIO = decrypt_database_config(name="minio")
 try:
     REDIS = decrypt_database_config(name="redis")
@@ -43,6 +45,8 @@ LoggerFactory.LEVEL = 30
 
 es_logger = getLogger("es")
 minio_logger = getLogger("minio")
+s3_logger = getLogger("s3")
+azure_logger = getLogger("azure")
 cron_logger = getLogger("cron_logger")
 cron_logger.setLevel(20)
 chunk_logger = getLogger("chunk_logger")
rag/svr/cache_file_svr.py CHANGED
@@ -20,7 +20,7 @@ import traceback
 from api.db.db_models import close_connection
 from api.db.services.task_service import TaskService
 from rag.settings import cron_logger
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from rag.utils.redis_conn import REDIS_CONN
 
 
@@ -42,7 +42,7 @@ def main():
        try:
            key = "{}/{}".format(kb_id, loc)
            if REDIS_CONN.exist(key):continue
-            file_bin = MINIO.get(kb_id, loc)
+            file_bin = STORAGE_IMPL.get(kb_id, loc)
            REDIS_CONN.transaction(key, file_bin, 12 * 60)
            cron_logger.info("CACHE: {}".format(loc))
        except Exception as e:
rag/svr/task_executor.py CHANGED
@@ -29,7 +29,7 @@ from functools import partial
 from api.db.services.file2document_service import File2DocumentService
 from api.settings import retrievaler
 from rag.raptor import RecursiveAbstractiveProcessing4TreeOrganizedRetrieval as Raptor
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from api.db.db_models import close_connection
 from rag.settings import database_logger, SVR_QUEUE_NAME
 from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
@@ -138,7 +138,7 @@ def collect():
 
 
 def get_minio_binary(bucket, name):
-    return MINIO.get(bucket, name)
+    return STORAGE_IMPL.get(bucket, name)
 
 
 def build(row):
@@ -214,7 +214,7 @@ def build(row):
                d["image"].save(output_buffer, format='JPEG')
 
            st = timer()
-            MINIO.put(row["kb_id"], d["_id"], output_buffer.getvalue())
+            STORAGE_IMPL.put(row["kb_id"], d["_id"], output_buffer.getvalue())
            el += timer() - st
        except Exception as e:
            cron_logger.error(str(e))
rag/utils/azure_sas_conn.py ADDED
@@ -0,0 +1,80 @@
+import os
+import time
+from io import BytesIO
+from rag import settings
+from rag.settings import azure_logger
+from rag.utils import singleton
+from azure.storage.blob import ContainerClient
+
+
+@singleton
+class RAGFlowAzureSasBlob(object):
+    def __init__(self):
+        self.conn = None
+        self.container_url = os.getenv('CONTAINER_URL', settings.AZURE["container_url"])
+        self.sas_token = os.getenv('SAS_TOKEN', settings.AZURE["sas_token"])
+        self.__open__()
+
+    def __open__(self):
+        try:
+            if self.conn:
+                self.__close__()
+        except Exception as e:
+            pass
+
+        try:
+            self.conn = ContainerClient.from_container_url(self.container_url + "?" + self.sas_token)
+        except Exception as e:
+            azure_logger.error(
+                "Fail to connect %s " % self.container_url + str(e))
+
+    def __close__(self):
+        del self.conn
+        self.conn = None
+
+    def health(self):
+        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
+        return self.conn.upload_blob(name=fnm, data=BytesIO(binary), length=len(binary))
+
+    def put(self, bucket, fnm, binary):
+        for _ in range(3):
+            try:
+                return self.conn.upload_blob(name=fnm, data=BytesIO(binary), length=len(binary))
+            except Exception as e:
+                azure_logger.error(f"Fail put {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+
+    def rm(self, bucket, fnm):
+        try:
+            self.conn.delete_blob(fnm)
+        except Exception as e:
+            azure_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e))
+
+    def get(self, bucket, fnm):
+        for _ in range(1):
+            try:
+                r = self.conn.download_blob(fnm)
+                return r.read()
+            except Exception as e:
+                azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+        return
+
+    def obj_exist(self, bucket, fnm):
+        try:
+            return self.conn.get_blob_client(fnm).exists()
+        except Exception as e:
+            azure_logger.error(f"Fail obj_exist {bucket}/{fnm}: " + str(e))
+            return False
+
+    def get_presigned_url(self, bucket, fnm, expires):
+        for _ in range(10):
+            try:
+                return self.conn.get_presigned_url("GET", bucket, fnm, expires)
+            except Exception as e:
+                azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+        return
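
For reference, ContainerClient.from_container_url expects the full container URL with the SAS token appended as a query string, which is what __open__ builds above. A standalone sketch, with a hypothetical account, container and token:

from azure.storage.blob import ContainerClient

# Placeholder values for illustration only.
container_url = "https://myaccount.blob.core.windows.net/ragflow"
sas_token = "sv=2023-11-03&ss=b&srt=co&sp=rwdl&sig=REDACTED"

client = ContainerClient.from_container_url(container_url + "?" + sas_token)
print(client.get_blob_client("some-object").exists())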
rag/utils/azure_spn_conn.py ADDED
@@ -0,0 +1,90 @@
+import os
+import time
+from rag import settings
+from rag.settings import azure_logger
+from rag.utils import singleton
+from azure.identity import ClientSecretCredential, AzureAuthorityHosts
+from azure.storage.filedatalake import FileSystemClient
+
+
+@singleton
+class RAGFlowAzureSpnBlob(object):
+    def __init__(self):
+        self.conn = None
+        self.account_url = os.getenv('ACCOUNT_URL', settings.AZURE["account_url"])
+        self.client_id = os.getenv('CLIENT_ID', settings.AZURE["client_id"])
+        self.secret = os.getenv('SECRET', settings.AZURE["secret"])
+        self.tenant_id = os.getenv('TENANT_ID', settings.AZURE["tenant_id"])
+        self.container_name = os.getenv('CONTAINER_NAME', settings.AZURE["container_name"])
+        self.__open__()
+
+    def __open__(self):
+        try:
+            if self.conn:
+                self.__close__()
+        except Exception as e:
+            pass
+
+        try:
+            credentials = ClientSecretCredential(tenant_id=self.tenant_id, client_id=self.client_id, client_secret=self.secret, authority=AzureAuthorityHosts.AZURE_CHINA)
+            self.conn = FileSystemClient(account_url=self.account_url, file_system_name=self.container_name, credential=credentials)
+        except Exception as e:
+            azure_logger.error(
+                "Fail to connect %s " % self.account_url + str(e))
+
+    def __close__(self):
+        del self.conn
+        self.conn = None
+
+    def health(self):
+        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
+        f = self.conn.create_file(fnm)
+        f.append_data(binary, offset=0, length=len(binary))
+        return f.flush_data(len(binary))
+
+    def put(self, bucket, fnm, binary):
+        for _ in range(3):
+            try:
+                f = self.conn.create_file(fnm)
+                f.append_data(binary, offset=0, length=len(binary))
+                return f.flush_data(len(binary))
+            except Exception as e:
+                azure_logger.error(f"Fail put {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+
+    def rm(self, bucket, fnm):
+        try:
+            self.conn.delete_file(fnm)
+        except Exception as e:
+            azure_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e))
+
+    def get(self, bucket, fnm):
+        for _ in range(1):
+            try:
+                client = self.conn.get_file_client(fnm)
+                r = client.download_file()
+                return r.read()
+            except Exception as e:
+                azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+        return
+
+    def obj_exist(self, bucket, fnm):
+        try:
+            client = self.conn.get_file_client(fnm)
+            return client.exists()
+        except Exception as e:
+            azure_logger.error(f"Fail obj_exist {bucket}/{fnm}: " + str(e))
+            return False
+
+    def get_presigned_url(self, bucket, fnm, expires):
+        for _ in range(10):
+            try:
+                return self.conn.get_presigned_url("GET", bucket, fnm, expires)
+            except Exception as e:
+                azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+        return
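
Note that __open__ above hard-codes AzureAuthorityHosts.AZURE_CHINA; a deployment on the public cloud would presumably drop the authority argument so the credential defaults to login.microsoftonline.com. A sketch with placeholder IDs:

from azure.identity import ClientSecretCredential
from azure.storage.filedatalake import FileSystemClient

credential = ClientSecretCredential(
    tenant_id="00000000-0000-0000-0000-000000000000",   # placeholders
    client_id="11111111-1111-1111-1111-111111111111",
    client_secret="...",
)  # default authority is the public cloud (login.microsoftonline.com)
fs = FileSystemClient(
    account_url="https://myaccount.dfs.core.windows.net",
    file_system_name="ragflow",
    credential=credential,
)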
rag/utils/s3_conn.py ADDED
@@ -0,0 +1,135 @@
+import boto3
+import os
+from botocore.exceptions import ClientError
+from botocore.client import Config
+import time
+from io import BytesIO
+from rag.settings import s3_logger
+from rag.utils import singleton
+
+@singleton
+class RAGFlowS3(object):
+    def __init__(self):
+        self.conn = None
+        self.endpoint = os.getenv('ENDPOINT', None)
+        self.access_key = os.getenv('ACCESS_KEY', None)
+        self.secret_key = os.getenv('SECRET_KEY', None)
+        self.region = os.getenv('REGION', None)
+        self.__open__()
+
+    def __open__(self):
+        try:
+            if self.conn:
+                self.__close__()
+        except Exception as e:
+            pass
+
+        try:
+
+            config = Config(
+                s3={
+                    'addressing_style': 'virtual'
+                }
+            )
+
+            self.conn = boto3.client(
+                's3',
+                endpoint_url=self.endpoint,
+                region_name=self.region,
+                aws_access_key_id=self.access_key,
+                aws_secret_access_key=self.secret_key,
+                config=config
+            )
+        except Exception as e:
+            s3_logger.error(
+                "Fail to connect %s " % self.endpoint + str(e))
+
+    def __close__(self):
+        del self.conn
+        self.conn = None
+
+    def bucket_exists(self, bucket):
+        try:
+            s3_logger.error(f"head_bucket bucketname {bucket}")
+            self.conn.head_bucket(Bucket=bucket)
+            exists = True
+        except ClientError as e:
+            s3_logger.error(f"head_bucket error {bucket}: " + str(e))
+            exists = False
+        return exists
+
+    def health(self):
+        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
+
+        if not self.bucket_exists(bucket):
+            self.conn.create_bucket(Bucket=bucket)
+            s3_logger.error(f"create bucket {bucket} ********")
+
+        r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
+        return r
+
+    def get_properties(self, bucket, key):
+        return {}
+
+    def list(self, bucket, dir, recursive=True):
+        return []
+
+    def put(self, bucket, fnm, binary):
+        s3_logger.error(f"bucket name {bucket}; filename :{fnm}:")
+        for _ in range(1):
+            try:
+                if not self.bucket_exists(bucket):
+                    self.conn.create_bucket(Bucket=bucket)
+                    s3_logger.error(f"create bucket {bucket} ********")
+                r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)

+                return r
+            except Exception as e:
+                s3_logger.error(f"Fail put {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+
+    def rm(self, bucket, fnm):
+        try:
+            self.conn.delete_object(Bucket=bucket, Key=fnm)
+        except Exception as e:
+            s3_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e))
+
+    def get(self, bucket, fnm):
+        for _ in range(1):
+            try:
+                r = self.conn.get_object(Bucket=bucket, Key=fnm)
+                object_data = r['Body'].read()
+                return object_data
+            except Exception as e:
+                s3_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+        return
+
+    def obj_exist(self, bucket, fnm):
+        try:
+            if self.conn.head_object(Bucket=bucket, Key=fnm):
+                return True
+        except ClientError as e:
+            if e.response['Error']['Code'] == '404':
+                return False
+            else:
+                raise
+
+    def get_presigned_url(self, bucket, fnm, expires):
+        for _ in range(10):
+            try:
+                r = self.conn.generate_presigned_url('get_object',
+                                                     Params={'Bucket': bucket,
+                                                             'Key': fnm},
+                                                     ExpiresIn=expires)
+
+                return r
+            except Exception as e:
+                s3_logger.error(f"fail get url {bucket}/{fnm}: " + str(e))
+                self.__open__()
+                time.sleep(1)
+        return
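
The addressing_style: 'virtual' setting above makes boto3 build virtual-hosted URLs (https://bucket.endpoint/key) rather than path-style ones (https://endpoint/bucket/key), which matters for S3-compatible stores that accept only one form. A quick offline way to see what a given config produces, using placeholder credentials and names:

import boto3
from botocore.client import Config

client = boto3.client(
    's3',
    region_name='us-east-1',
    aws_access_key_id='AKIAEXAMPLE',          # placeholder credentials
    aws_secret_access_key='secret-example',
    config=Config(s3={'addressing_style': 'virtual'}),
)
# The host of the generated URL starts with the bucket name.
print(client.generate_presigned_url(
    'get_object', Params={'Bucket': 'my-bucket', 'Key': 'doc.pdf'}, ExpiresIn=3600))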
rag/utils/storage_factory.py ADDED
@@ -0,0 +1,30 @@
+import os
+from enum import Enum
+
+from rag.utils.azure_sas_conn import RAGFlowAzureSasBlob
+from rag.utils.azure_spn_conn import RAGFlowAzureSpnBlob
+from rag.utils.minio_conn import RAGFlowMinio
+from rag.utils.s3_conn import RAGFlowS3
+
+
+class Storage(Enum):
+    MINIO = 1
+    AZURE_SPN = 2
+    AZURE_SAS = 3
+    AWS_S3 = 4
+
+
+class StorageFactory:
+    storage_mapping = {
+        Storage.MINIO: RAGFlowMinio,
+        Storage.AZURE_SPN: RAGFlowAzureSpnBlob,
+        Storage.AZURE_SAS: RAGFlowAzureSasBlob,
+        Storage.AWS_S3: RAGFlowS3,
+    }
+
+    @classmethod
+    def create(cls, storage: Storage):
+        return cls.storage_mapping[storage]()
+
+
+STORAGE_IMPL = StorageFactory.create(Storage[os.getenv('STORAGE_IMPL', 'MINIO')])
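
STORAGE_IMPL is resolved once at import time from the STORAGE_IMPL environment variable (defaulting to MINIO), so the variable must be set before anything imports the factory. A usage sketch with hypothetical bucket and object names:

import os

# Must run before the first import of rag.utils.storage_factory.
os.environ['STORAGE_IMPL'] = 'AWS_S3'   # one of: MINIO, AZURE_SPN, AZURE_SAS, AWS_S3

from rag.utils.storage_factory import STORAGE_IMPL

# Every call site in this PR goes through the same small method set,
# so switching backends needs no further code changes.
STORAGE_IMPL.put('kb-id', 'hello.txt', b'hello')
print(STORAGE_IMPL.obj_exist('kb-id', 'hello.txt'))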
requirements.txt CHANGED
@@ -1,5 +1,7 @@
-akshare==1.14.72
-anthropic==0.34.1
+azure-storage-blob==12.22.0
+azure-identity==1.17.1
+azure-storage-file-datalake==12.16.0
+anthropic==0.34.1
 arxiv==2.1.3
 Aspose.Slides==24.2.0
 BCEmbedding==0.1.3