Kevin Hu committed on
Commit
36b9967
·
1 Parent(s): 05c4835

refine upload & parse (#1969)

Browse files

### What problem does this PR solve?


### Type of change
- [x] Refactoring

api/apps/api_app.py CHANGED
@@ -26,7 +26,7 @@ from api.db.db_models import APIToken, API4Conversation, Task, File
26
  from api.db.services import duplicate_name
27
  from api.db.services.api_service import APITokenService, API4ConversationService
28
  from api.db.services.dialog_service import DialogService, chat
29
- from api.db.services.document_service import DocumentService
30
  from api.db.services.file2document_service import File2DocumentService
31
  from api.db.services.file_service import FileService
32
  from api.db.services.knowledgebase_service import KnowledgebaseService
@@ -470,6 +470,29 @@ def upload():
470
  return get_json_result(data=doc_result.to_json())
471
 
472
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
473
  @manager.route('/list_chunks', methods=['POST'])
474
  # @login_required
475
  def list_chunks():
@@ -560,7 +583,6 @@ def document_rm():
560
 
561
  tenant_id = objs[0].tenant_id
562
  req = request.json
563
- doc_ids = []
564
  try:
565
  doc_ids = [DocumentService.get_doc_id_by_doc_name(doc_name) for doc_name in req.get("doc_names", [])]
566
  for doc_id in req.get("doc_ids", []):
 
26
  from api.db.services import duplicate_name
27
  from api.db.services.api_service import APITokenService, API4ConversationService
28
  from api.db.services.dialog_service import DialogService, chat
29
+ from api.db.services.document_service import DocumentService, doc_upload_and_parse
30
  from api.db.services.file2document_service import File2DocumentService
31
  from api.db.services.file_service import FileService
32
  from api.db.services.knowledgebase_service import KnowledgebaseService
 
470
  return get_json_result(data=doc_result.to_json())
471
 
472
 
473
+ @manager.route('/document/upload_and_parse', methods=['POST'])
474
+ @validate_request("conversation_id")
475
+ def upload_parse():
476
+ token = request.headers.get('Authorization').split()[1]
477
+ objs = APIToken.query(token=token)
478
+ if not objs:
479
+ return get_json_result(
480
+ data=False, retmsg='Token is not valid!"', retcode=RetCode.AUTHENTICATION_ERROR)
481
+
482
+ if 'file' not in request.files:
483
+ return get_json_result(
484
+ data=False, retmsg='No file part!', retcode=RetCode.ARGUMENT_ERROR)
485
+
486
+ file_objs = request.files.getlist('file')
487
+ for file_obj in file_objs:
488
+ if file_obj.filename == '':
489
+ return get_json_result(
490
+ data=False, retmsg='No file selected!', retcode=RetCode.ARGUMENT_ERROR)
491
+
492
+ doc_ids = doc_upload_and_parse(request.form.get("conversation_id"), file_objs, objs[0].tenant_id)
493
+ return get_json_result(data=doc_ids)
494
+
495
+
496
  @manager.route('/list_chunks', methods=['POST'])
497
  # @login_required
498
  def list_chunks():
 
583
 
584
  tenant_id = objs[0].tenant_id
585
  req = request.json
 
586
  try:
587
  doc_ids = [DocumentService.get_doc_id_by_doc_name(doc_name) for doc_name in req.get("doc_names", [])]
588
  for doc_id in req.get("doc_ids", []):
api/apps/document_app.py CHANGED
@@ -45,7 +45,7 @@ from api.db.services.knowledgebase_service import KnowledgebaseService
45
  from api.utils.api_utils import server_error_response, get_data_error_result, validate_request
46
  from api.utils import get_uuid
47
  from api.db import FileType, TaskStatus, ParserType, FileSource, LLMType
48
- from api.db.services.document_service import DocumentService
49
  from api.settings import RetCode, stat_logger
50
  from api.utils.api_utils import get_json_result
51
  from rag.utils.minio_conn import MINIO
@@ -75,7 +75,7 @@ def upload():
75
  if not e:
76
  raise LookupError("Can't find this knowledgebase!")
77
 
78
- err, _ = FileService.upload_document(kb, file_objs)
79
  if err:
80
  return get_json_result(
81
  data=False, retmsg="\n".join(err), retcode=RetCode.SERVER_ERROR)
@@ -212,7 +212,7 @@ def docinfos():
212
 
213
 
214
  @manager.route('/thumbnails', methods=['GET'])
215
- @login_required
216
  def thumbnails():
217
  doc_ids = request.args.get("doc_ids").split(",")
218
  if not doc_ids:
@@ -460,7 +460,6 @@ def get_image(image_id):
460
  @login_required
461
  @validate_request("conversation_id")
462
  def upload_and_parse():
463
- from rag.app import presentation, picture, naive, audio, email
464
  if 'file' not in request.files:
465
  return get_json_result(
466
  data=False, retmsg='No file part!', retcode=RetCode.ARGUMENT_ERROR)
@@ -471,124 +470,6 @@ def upload_and_parse():
471
  return get_json_result(
472
  data=False, retmsg='No file selected!', retcode=RetCode.ARGUMENT_ERROR)
473
 
474
- e, conv = ConversationService.get_by_id(request.form.get("conversation_id"))
475
- if not e:
476
- return get_data_error_result(retmsg="Conversation not found!")
477
- e, dia = DialogService.get_by_id(conv.dialog_id)
478
- kb_id = dia.kb_ids[0]
479
- e, kb = KnowledgebaseService.get_by_id(kb_id)
480
- if not e:
481
- raise LookupError("Can't find this knowledgebase!")
482
-
483
- idxnm = search.index_name(kb.tenant_id)
484
- if not ELASTICSEARCH.indexExist(idxnm):
485
- ELASTICSEARCH.createIdx(idxnm, json.load(
486
- open(os.path.join(get_project_base_directory(), "conf", "mapping.json"), "r")))
487
-
488
- embd_mdl = LLMBundle(kb.tenant_id, LLMType.EMBEDDING, llm_name=kb.embd_id, lang=kb.language)
489
-
490
- err, files = FileService.upload_document(kb, file_objs)
491
- if err:
492
- return get_json_result(
493
- data=False, retmsg="\n".join(err), retcode=RetCode.SERVER_ERROR)
494
-
495
- def dummy(prog=None, msg=""):
496
- pass
497
-
498
- FACTORY = {
499
- ParserType.PRESENTATION.value: presentation,
500
- ParserType.PICTURE.value: picture,
501
- ParserType.AUDIO.value: audio,
502
- ParserType.EMAIL.value: email
503
- }
504
- parser_config = {"chunk_token_num": 4096, "delimiter": "\n!?;。;!?", "layout_recognize": False}
505
- exe = ThreadPoolExecutor(max_workers=12)
506
- threads = []
507
- for d, blob in files:
508
- kwargs = {
509
- "callback": dummy,
510
- "parser_config": parser_config,
511
- "from_page": 0,
512
- "to_page": 100000,
513
- "tenant_id": kb.tenant_id,
514
- "lang": kb.language
515
- }
516
- threads.append(exe.submit(FACTORY.get(d["parser_id"], naive).chunk, d["name"], blob, **kwargs))
517
 
518
- for (docinfo,_), th in zip(files, threads):
519
- docs = []
520
- doc = {
521
- "doc_id": docinfo["id"],
522
- "kb_id": [kb.id]
523
- }
524
- for ck in th.result():
525
- d = deepcopy(doc)
526
- d.update(ck)
527
- md5 = hashlib.md5()
528
- md5.update((ck["content_with_weight"] +
529
- str(d["doc_id"])).encode("utf-8"))
530
- d["_id"] = md5.hexdigest()
531
- d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19]
532
- d["create_timestamp_flt"] = datetime.datetime.now().timestamp()
533
- if not d.get("image"):
534
- docs.append(d)
535
- continue
536
-
537
- output_buffer = BytesIO()
538
- if isinstance(d["image"], bytes):
539
- output_buffer = BytesIO(d["image"])
540
- else:
541
- d["image"].save(output_buffer, format='JPEG')
542
-
543
- MINIO.put(kb.id, d["_id"], output_buffer.getvalue())
544
- d["img_id"] = "{}-{}".format(kb.id, d["_id"])
545
- del d["image"]
546
- docs.append(d)
547
-
548
- parser_ids = {d["id"]: d["parser_id"] for d, _ in files}
549
- docids = [d["id"] for d, _ in files]
550
- chunk_counts = {id: 0 for id in docids}
551
- token_counts = {id: 0 for id in docids}
552
- es_bulk_size = 64
553
-
554
- def embedding(doc_id, cnts, batch_size=16):
555
- nonlocal embd_mdl, chunk_counts, token_counts
556
- vects = []
557
- for i in range(0, len(cnts), batch_size):
558
- vts, c = embd_mdl.encode(cnts[i: i + batch_size])
559
- vects.extend(vts.tolist())
560
- chunk_counts[doc_id] += len(cnts[i:i + batch_size])
561
- token_counts[doc_id] += c
562
- return vects
563
-
564
- _, tenant = TenantService.get_by_id(kb.tenant_id)
565
- llm_bdl = LLMBundle(kb.tenant_id, LLMType.CHAT, tenant.llm_id)
566
- for doc_id in docids:
567
- cks = [c for c in docs if c["doc_id"] == doc_id]
568
-
569
- if False and parser_ids[doc_id] != ParserType.PICTURE.value:
570
- mindmap = MindMapExtractor(llm_bdl)
571
- try:
572
- mind_map = json.dumps(mindmap([c["content_with_weight"] for c in docs if c["doc_id"] == doc_id]).output, ensure_ascii=False, indent=2)
573
- if len(mind_map) < 32: raise Exception("Few content: "+mind_map)
574
- cks.append({
575
- "doc_id": doc_id,
576
- "kb_id": [kb.id],
577
- "content_with_weight": mind_map,
578
- "knowledge_graph_kwd": "mind_map"
579
- })
580
- except Exception as e:
581
- stat_logger.error("Mind map generation error:", traceback.format_exc())
582
-
583
- vects = embedding(doc_id, [c["content_with_weight"] for c in cks])
584
- assert len(cks) == len(vects)
585
- for i, d in enumerate(cks):
586
- v = vects[i]
587
- d["q_%d_vec" % len(v)] = v
588
- for b in range(0, len(cks), es_bulk_size):
589
- ELASTICSEARCH.bulk(cks[b:b + es_bulk_size], idxnm)
590
-
591
- DocumentService.increment_chunk_num(
592
- doc_id, kb.id, token_counts[doc_id], chunk_counts[doc_id], 0)
593
-
594
- return get_json_result(data=[d["id"] for d,_ in files])
 
45
  from api.utils.api_utils import server_error_response, get_data_error_result, validate_request
46
  from api.utils import get_uuid
47
  from api.db import FileType, TaskStatus, ParserType, FileSource, LLMType
48
+ from api.db.services.document_service import DocumentService, doc_upload_and_parse
49
  from api.settings import RetCode, stat_logger
50
  from api.utils.api_utils import get_json_result
51
  from rag.utils.minio_conn import MINIO
 
75
  if not e:
76
  raise LookupError("Can't find this knowledgebase!")
77
 
78
+ err, _ = FileService.upload_document(kb, file_objs, current_user.id)
79
  if err:
80
  return get_json_result(
81
  data=False, retmsg="\n".join(err), retcode=RetCode.SERVER_ERROR)
 
212
 
213
 
214
  @manager.route('/thumbnails', methods=['GET'])
215
+ #@login_required
216
  def thumbnails():
217
  doc_ids = request.args.get("doc_ids").split(",")
218
  if not doc_ids:
 
460
  @login_required
461
  @validate_request("conversation_id")
462
  def upload_and_parse():
 
463
  if 'file' not in request.files:
464
  return get_json_result(
465
  data=False, retmsg='No file part!', retcode=RetCode.ARGUMENT_ERROR)
 
470
  return get_json_result(
471
  data=False, retmsg='No file selected!', retcode=RetCode.ARGUMENT_ERROR)
472
 
473
+ doc_ids = doc_upload_and_parse(request.form.get("conversation_id"), file_objs, current_user.id)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
474
 
475
+ return get_json_result(data=doc_ids)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
api/db/services/document_service.py CHANGED
@@ -13,20 +13,29 @@
13
  # See the License for the specific language governing permissions and
14
  # limitations under the License.
15
  #
 
 
 
16
  import random
 
 
17
  from datetime import datetime
 
 
18
  from elasticsearch_dsl import Q
19
  from peewee import fn
20
 
21
  from api.db.db_utils import bulk_insert_into_db
22
  from api.settings import stat_logger
23
  from api.utils import current_timestamp, get_format_time, get_uuid
 
 
24
  from rag.settings import SVR_QUEUE_NAME
25
  from rag.utils.es_conn import ELASTICSEARCH
26
  from rag.utils.minio_conn import MINIO
27
  from rag.nlp import search
28
 
29
- from api.db import FileType, TaskStatus, ParserType
30
  from api.db.db_models import DB, Knowledgebase, Tenant, Task
31
  from api.db.db_models import Document
32
  from api.db.services.common_service import CommonService
@@ -380,3 +389,136 @@ def queue_raptor_tasks(doc):
380
  bulk_insert_into_db(Task, [task], True)
381
  task["type"] = "raptor"
382
  assert REDIS_CONN.queue_product(SVR_QUEUE_NAME, message=task), "Can't access Redis. Please check the Redis' status."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  # See the License for the specific language governing permissions and
14
  # limitations under the License.
15
  #
16
+ import hashlib
17
+ import json
18
+ import os
19
  import random
20
+ from concurrent.futures import ThreadPoolExecutor
21
+ from copy import deepcopy
22
  from datetime import datetime
23
+ from io import BytesIO
24
+
25
  from elasticsearch_dsl import Q
26
  from peewee import fn
27
 
28
  from api.db.db_utils import bulk_insert_into_db
29
  from api.settings import stat_logger
30
  from api.utils import current_timestamp, get_format_time, get_uuid
31
+ from api.utils.file_utils import get_project_base_directory
32
+ from graphrag.mind_map_extractor import MindMapExtractor
33
  from rag.settings import SVR_QUEUE_NAME
34
  from rag.utils.es_conn import ELASTICSEARCH
35
  from rag.utils.minio_conn import MINIO
36
  from rag.nlp import search
37
 
38
+ from api.db import FileType, TaskStatus, ParserType, LLMType
39
  from api.db.db_models import DB, Knowledgebase, Tenant, Task
40
  from api.db.db_models import Document
41
  from api.db.services.common_service import CommonService
 
389
  bulk_insert_into_db(Task, [task], True)
390
  task["type"] = "raptor"
391
  assert REDIS_CONN.queue_product(SVR_QUEUE_NAME, message=task), "Can't access Redis. Please check the Redis' status."
392
+
393
+
394
+ def doc_upload_and_parse(conversation_id, file_objs, user_id):
395
+ from rag.app import presentation, picture, naive, audio, email
396
+ from api.db.services.dialog_service import ConversationService, DialogService
397
+ from api.db.services.file_service import FileService
398
+ from api.db.services.llm_service import LLMBundle
399
+ from api.db.services.user_service import TenantService
400
+ from api.db.services.api_service import API4ConversationService
401
+
402
+ e, conv = ConversationService.get_by_id(conversation_id)
403
+ if not e:
404
+ e, conv = API4ConversationService.get_by_id(conversation_id)
405
+ assert e, "Conversation not found!"
406
+
407
+ e, dia = DialogService.get_by_id(conv.dialog_id)
408
+ kb_id = dia.kb_ids[0]
409
+ e, kb = KnowledgebaseService.get_by_id(kb_id)
410
+ if not e:
411
+ raise LookupError("Can't find this knowledgebase!")
412
+
413
+ idxnm = search.index_name(kb.tenant_id)
414
+ if not ELASTICSEARCH.indexExist(idxnm):
415
+ ELASTICSEARCH.createIdx(idxnm, json.load(
416
+ open(os.path.join(get_project_base_directory(), "conf", "mapping.json"), "r")))
417
+
418
+ embd_mdl = LLMBundle(kb.tenant_id, LLMType.EMBEDDING, llm_name=kb.embd_id, lang=kb.language)
419
+
420
+ err, files = FileService.upload_document(kb, file_objs, user_id)
421
+ assert not err, "\n".join(err)
422
+
423
+ def dummy(prog=None, msg=""):
424
+ pass
425
+
426
+ FACTORY = {
427
+ ParserType.PRESENTATION.value: presentation,
428
+ ParserType.PICTURE.value: picture,
429
+ ParserType.AUDIO.value: audio,
430
+ ParserType.EMAIL.value: email
431
+ }
432
+ parser_config = {"chunk_token_num": 4096, "delimiter": "\n!?;。;!?", "layout_recognize": False}
433
+ exe = ThreadPoolExecutor(max_workers=12)
434
+ threads = []
435
+ for d, blob in files:
436
+ kwargs = {
437
+ "callback": dummy,
438
+ "parser_config": parser_config,
439
+ "from_page": 0,
440
+ "to_page": 100000,
441
+ "tenant_id": kb.tenant_id,
442
+ "lang": kb.language
443
+ }
444
+ threads.append(exe.submit(FACTORY.get(d["parser_id"], naive).chunk, d["name"], blob, **kwargs))
445
+
446
+ for (docinfo, _), th in zip(files, threads):
447
+ docs = []
448
+ doc = {
449
+ "doc_id": docinfo["id"],
450
+ "kb_id": [kb.id]
451
+ }
452
+ for ck in th.result():
453
+ d = deepcopy(doc)
454
+ d.update(ck)
455
+ md5 = hashlib.md5()
456
+ md5.update((ck["content_with_weight"] +
457
+ str(d["doc_id"])).encode("utf-8"))
458
+ d["_id"] = md5.hexdigest()
459
+ d["create_time"] = str(datetime.now()).replace("T", " ")[:19]
460
+ d["create_timestamp_flt"] = datetime.now().timestamp()
461
+ if not d.get("image"):
462
+ docs.append(d)
463
+ continue
464
+
465
+ output_buffer = BytesIO()
466
+ if isinstance(d["image"], bytes):
467
+ output_buffer = BytesIO(d["image"])
468
+ else:
469
+ d["image"].save(output_buffer, format='JPEG')
470
+
471
+ MINIO.put(kb.id, d["_id"], output_buffer.getvalue())
472
+ d["img_id"] = "{}-{}".format(kb.id, d["_id"])
473
+ del d["image"]
474
+ docs.append(d)
475
+
476
+ parser_ids = {d["id"]: d["parser_id"] for d, _ in files}
477
+ docids = [d["id"] for d, _ in files]
478
+ chunk_counts = {id: 0 for id in docids}
479
+ token_counts = {id: 0 for id in docids}
480
+ es_bulk_size = 64
481
+
482
+ def embedding(doc_id, cnts, batch_size=16):
483
+ nonlocal embd_mdl, chunk_counts, token_counts
484
+ vects = []
485
+ for i in range(0, len(cnts), batch_size):
486
+ vts, c = embd_mdl.encode(cnts[i: i + batch_size])
487
+ vects.extend(vts.tolist())
488
+ chunk_counts[doc_id] += len(cnts[i:i + batch_size])
489
+ token_counts[doc_id] += c
490
+ return vects
491
+
492
+ _, tenant = TenantService.get_by_id(kb.tenant_id)
493
+ llm_bdl = LLMBundle(kb.tenant_id, LLMType.CHAT, tenant.llm_id)
494
+ for doc_id in docids:
495
+ cks = [c for c in docs if c["doc_id"] == doc_id]
496
+
497
+ if parser_ids[doc_id] != ParserType.PICTURE.value:
498
+ mindmap = MindMapExtractor(llm_bdl)
499
+ try:
500
+ mind_map = json.dumps(mindmap([c["content_with_weight"] for c in docs if c["doc_id"] == doc_id]).output,
501
+ ensure_ascii=False, indent=2)
502
+ if len(mind_map) < 32: raise Exception("Few content: " + mind_map)
503
+ cks.append({
504
+ "id": get_uuid(),
505
+ "doc_id": doc_id,
506
+ "kb_id": [kb.id],
507
+ "content_with_weight": mind_map,
508
+ "knowledge_graph_kwd": "mind_map"
509
+ })
510
+ except Exception as e:
511
+ stat_logger.error("Mind map generation error:", traceback.format_exc())
512
+
513
+ vects = embedding(doc_id, [c["content_with_weight"] for c in cks])
514
+ assert len(cks) == len(vects)
515
+ for i, d in enumerate(cks):
516
+ v = vects[i]
517
+ d["q_%d_vec" % len(v)] = v
518
+ for b in range(0, len(cks), es_bulk_size):
519
+ ELASTICSEARCH.bulk(cks[b:b + es_bulk_size], idxnm)
520
+
521
+ DocumentService.increment_chunk_num(
522
+ doc_id, kb.id, token_counts[doc_id], chunk_counts[doc_id], 0)
523
+
524
+ return [d["id"] for d,_ in files]
api/db/services/file_service.py CHANGED
@@ -327,11 +327,11 @@ class FileService(CommonService):
327
 
328
  @classmethod
329
  @DB.connection_context()
330
- def upload_document(self, kb, file_objs):
331
- root_folder = self.get_root_folder(current_user.id)
332
  pf_id = root_folder["id"]
333
- self.init_knowledgebase_docs(pf_id, current_user.id)
334
- kb_root_folder = self.get_kb_folder(current_user.id)
335
  kb_folder = self.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"])
336
 
337
  err, files = [], []
@@ -359,7 +359,7 @@ class FileService(CommonService):
359
  "kb_id": kb.id,
360
  "parser_id": kb.parser_id,
361
  "parser_config": kb.parser_config,
362
- "created_by": current_user.id,
363
  "type": filetype,
364
  "name": filename,
365
  "location": location,
 
327
 
328
  @classmethod
329
  @DB.connection_context()
330
+ def upload_document(self, kb, file_objs, user_id):
331
+ root_folder = self.get_root_folder(user_id)
332
  pf_id = root_folder["id"]
333
+ self.init_knowledgebase_docs(pf_id, user_id)
334
+ kb_root_folder = self.get_kb_folder(user_id)
335
  kb_folder = self.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"])
336
 
337
  err, files = [], []
 
359
  "kb_id": kb.id,
360
  "parser_id": kb.parser_id,
361
  "parser_config": kb.parser_config,
362
+ "created_by": user_id,
363
  "type": filetype,
364
  "name": filename,
365
  "location": location,