KevinHuSh committed on
Commit
cac848f
·
1 Parent(s): 54ec234

fix bug about fetching file from minio (#574)

Browse files

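### What problem does this PR solve?

Source files were always fetched from MinIO using the document's `kb_id` as the bucket and the document's `location` as the object name. This change resolves the bucket and object name through the new `File2DocumentService.get_minio_address`, which returns the linked file record's `parent_id` and `location` when a file-to-document link exists and falls back to the document's `kb_id` and `location` otherwise. `task_broker.dispatch` and `task_executor.build` now use this lookup, `TaskService.get_ongoing_doc_name` joins the file tables to prefer the file's `parent_id`, and the `get` handler in `file_app.py` reads `parent_id`, `location` and `name` from the file record.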

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

api/apps/file_app.py CHANGED
@@ -328,12 +328,12 @@ def rename():
328
  # @login_required
329
  def get(file_id):
330
  try:
331
- e, doc = FileService.get_by_id(file_id)
332
  if not e:
333
  return get_data_error_result(retmsg="Document not found!")
334
 
335
- response = flask.make_response(MINIO.get(doc.parent_id, doc.location))
336
- ext = re.search(r"\.([^.]+)$", doc.name)
337
  if ext:
338
  if doc.type == FileType.VISUAL.value:
339
  response.headers.set('Content-Type', 'image/%s' % ext.group(1))
 
328
  # @login_required
329
  def get(file_id):
330
  try:
331
+ e, file = FileService.get_by_id(file_id)
332
  if not e:
333
  return get_data_error_result(retmsg="Document not found!")
334
 
335
+ response = flask.make_response(MINIO.get(file.parent_id, file.location))
336
+ ext = re.search(r"\.([^.]+)$", file.name)
337
  if ext:
338
  if doc.type == FileType.VISUAL.value:
339
  response.headers.set('Content-Type', 'image/%s' % ext.group(1))
api/db/services/file2document_service.py CHANGED
@@ -18,6 +18,8 @@ from datetime import datetime
18
  from api.db.db_models import DB
19
  from api.db.db_models import File, Document, File2Document
20
  from api.db.services.common_service import CommonService
 
 
21
  from api.utils import current_timestamp, datetime_format
22
 
23
 
@@ -64,3 +66,18 @@ class File2DocumentService(CommonService):
64
  num = cls.model.update(obj).where(cls.model.id == file_id).execute()
65
  e, obj = cls.get_by_id(cls.model.id)
66
  return obj
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
  from api.db.db_models import DB
19
  from api.db.db_models import File, Document, File2Document
20
  from api.db.services.common_service import CommonService
21
+ from api.db.services.document_service import DocumentService
22
+ from api.db.services.file_service import FileService
23
  from api.utils import current_timestamp, datetime_format
24
 
25
 
 
66
  num = cls.model.update(obj).where(cls.model.id == file_id).execute()
67
  e, obj = cls.get_by_id(cls.model.id)
68
  return obj
69
+
70
+ @classmethod
71
+ @DB.connection_context()
72
+ def get_minio_address(cls, doc_id=None, file_id=None):
73
+ if doc_id:
74
+ ids = File2DocumentService.get_by_document_id(doc_id)
75
+ else:
76
+ ids = File2DocumentService.get_by_file_id(file_id)
77
+ if ids:
78
+ e, file = FileService.get_by_id(ids[0].file_id)
79
+ return file.parent_id, file.location
80
+ else:
81
+ assert doc_id, "please specify doc_id"
82
+ e, doc = DocumentService.get_by_id(doc_id)
83
+ return doc.kb_id, doc.location
api/db/services/file_service.py CHANGED
@@ -21,7 +21,6 @@ from api.db.db_models import DB, File2Document, Knowledgebase
21
  from api.db.db_models import File, Document
22
  from api.db.services.common_service import CommonService
23
  from api.utils import get_uuid
24
- from rag.utils import MINIO
25
 
26
 
27
  class FileService(CommonService):
@@ -241,3 +240,4 @@ class FileService(CommonService):
241
 
242
  dfs(folder_id)
243
  return size
 
 
21
  from api.db.db_models import File, Document
22
  from api.db.services.common_service import CommonService
23
  from api.utils import get_uuid
 
24
 
25
 
26
  class FileService(CommonService):
 
240
 
241
  dfs(folder_id)
242
  return size
243
+
api/db/services/task_service.py CHANGED
@@ -15,8 +15,8 @@
15
  #
16
  import random
17
 
18
- from peewee import Expression
19
- from api.db.db_models import DB
20
  from api.db import StatusEnum, FileType, TaskStatus
21
  from api.db.db_models import Task, Document, Knowledgebase, Tenant
22
  from api.db.services.common_service import CommonService
@@ -75,8 +75,10 @@ class TaskService(CommonService):
75
  @DB.connection_context()
76
  def get_ongoing_doc_name(cls):
77
  with DB.lock("get_task", -1):
78
- docs = cls.model.select(*[Document.kb_id, Document.location]) \
79
  .join(Document, on=(cls.model.doc_id == Document.id)) \
 
 
80
  .where(
81
  Document.status == StatusEnum.VALID.value,
82
  Document.run == TaskStatus.RUNNING.value,
@@ -88,7 +90,7 @@ class TaskService(CommonService):
88
  docs = list(docs.dicts())
89
  if not docs: return []
90
 
91
- return list(set([(d["kb_id"], d["location"]) for d in docs]))
92
 
93
  @classmethod
94
  @DB.connection_context()
 
15
  #
16
  import random
17
 
18
+ from peewee import Expression, JOIN
19
+ from api.db.db_models import DB, File2Document, File
20
  from api.db import StatusEnum, FileType, TaskStatus
21
  from api.db.db_models import Task, Document, Knowledgebase, Tenant
22
  from api.db.services.common_service import CommonService
 
75
  @DB.connection_context()
76
  def get_ongoing_doc_name(cls):
77
  with DB.lock("get_task", -1):
78
+ docs = cls.model.select(*[Document.id, Document.kb_id, Document.location, File.parent_id]) \
79
  .join(Document, on=(cls.model.doc_id == Document.id)) \
80
+ .join(File2Document, on=(File2Document.document_id == Document.id), join_type=JOIN.LEFT_OUTER) \
81
+ .join(File, on=(File2Document.file_id == File.id)) \
82
  .where(
83
  Document.status == StatusEnum.VALID.value,
84
  Document.run == TaskStatus.RUNNING.value,
 
90
  docs = list(docs.dicts())
91
  if not docs: return []
92
 
93
+ return list(set([(d["parent_id"] if d["parent_id"] else d["kb_id"], d["location"]) for d in docs]))
94
 
95
  @classmethod
96
  @DB.connection_context()
rag/svr/task_broker.py CHANGED
@@ -20,6 +20,8 @@ import random
20
  from datetime import datetime
21
  from api.db.db_models import Task
22
  from api.db.db_utils import bulk_insert_into_db
 
 
23
  from api.db.services.task_service import TaskService
24
  from deepdoc.parser import PdfParser
25
  from deepdoc.parser.excel_parser import HuExcelParser
@@ -87,10 +89,11 @@ def dispatch():
87
 
88
  tsks = []
89
  try:
90
- file_bin = MINIO.get(r["kb_id"], r["location"])
 
91
  if REDIS_CONN.is_alive():
92
  try:
93
- REDIS_CONN.set("{}/{}".format(r["kb_id"], r["location"]), file_bin, 12*60)
94
  except Exception as e:
95
  cron_logger.warning("Put into redis[EXCEPTION]:" + str(e))
96
 
 
20
  from datetime import datetime
21
  from api.db.db_models import Task
22
  from api.db.db_utils import bulk_insert_into_db
23
+ from api.db.services.file2document_service import File2DocumentService
24
+ from api.db.services.file_service import FileService
25
  from api.db.services.task_service import TaskService
26
  from deepdoc.parser import PdfParser
27
  from deepdoc.parser.excel_parser import HuExcelParser
 
89
 
90
  tsks = []
91
  try:
92
+ bucket, name = File2DocumentService.get_minio_address(doc_id=r["id"])
93
+ file_bin = MINIO.get(bucket, name)
94
  if REDIS_CONN.is_alive():
95
  try:
96
+ REDIS_CONN.set("{}/{}".format(bucket, name), file_bin, 12*60)
97
  except Exception as e:
98
  cron_logger.warning("Put into redis[EXCEPTION]:" + str(e))
99
 
rag/svr/task_executor.py CHANGED
@@ -24,6 +24,8 @@ import sys
24
  import time
25
  import traceback
26
  from functools import partial
 
 
27
  from rag.utils import MINIO
28
  from api.db.db_models import close_connection
29
  from rag.settings import database_logger
@@ -135,7 +137,8 @@ def build(row):
135
  pool = Pool(processes=1)
136
  try:
137
  st = timer()
138
- thr = pool.apply_async(get_minio_binary, args=(row["kb_id"], row["location"]))
 
139
  binary = thr.get(timeout=90)
140
  pool.terminate()
141
  cron_logger.info(
 
24
  import time
25
  import traceback
26
  from functools import partial
27
+
28
+ from api.db.services.file2document_service import File2DocumentService
29
  from rag.utils import MINIO
30
  from api.db.db_models import close_connection
31
  from rag.settings import database_logger
 
137
  pool = Pool(processes=1)
138
  try:
139
  st = timer()
140
+ bucket, name = File2DocumentService.get_minio_address(doc_id=row["doc_id"])
141
+ thr = pool.apply_async(get_minio_binary, args=(bucket, name))
142
  binary = thr.get(timeout=90)
143
  pool.terminate()
144
  cron_logger.info(