KevinHuSh committed
Commit b9d91e7 · Parent: 10b4a61

add redis to accelerate access of minio (#482)


### What problem does this PR solve?

Adds a Redis cache in front of MinIO to speed up access to stored document binaries: the task broker warms the cache when it dispatches tasks, and task executors read from Redis before falling back to MinIO.
### Type of change

- [x] New Feature (non-breaking change which adds functionality)

api/apps/__init__.py CHANGED

```diff
@@ -54,7 +54,7 @@ app.errorhandler(Exception)(server_error_response)
 #app.config["LOGIN_DISABLED"] = True
 app.config["SESSION_PERMANENT"] = False
 app.config["SESSION_TYPE"] = "filesystem"
-app.config['MAX_CONTENT_LENGTH'] = os.environ.get("MAX_CONTENT_LENGTH", 128 * 1024 * 1024)
+app.config['MAX_CONTENT_LENGTH'] = int(os.environ.get("MAX_CONTENT_LENGTH", 128 * 1024 * 1024))
 
 Session(app)
 login_manager = LoginManager()
```
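The `int()` cast matters because `os.environ.get` only returns the default unchanged; when the variable is actually set, it comes back as a string, and Flask expects `MAX_CONTENT_LENGTH` to be an integer byte count. A minimal sketch of the failure mode:

```python
import os

# When the env var is set, os.environ.get returns a str, not an int.
os.environ["MAX_CONTENT_LENGTH"] = "134217728"
raw = os.environ.get("MAX_CONTENT_LENGTH", 128 * 1024 * 1024)
print(type(raw))       # <class 'str'> -- Flask would compare request sizes against a str
print(type(int(raw)))  # <class 'int'> -- the cast normalizes both branches
```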
api/utils/file_utils.py CHANGED

```diff
@@ -147,7 +147,7 @@ def filename_type(filename):
         return FileType.PDF.value
 
     if re.match(
-            r".*\.(docx|doc|ppt|pptx|yml|xml|htm|json|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md)$", filename):
+            r".*\.(docx|ppt|pptx|yml|xml|htm|json|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md)$", filename):
         return FileType.DOC.value
 
     if re.match(
```
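Note that the new pattern also drops `doc` from the extension group, so legacy `.doc` filenames no longer match the `FileType.DOC` branch here (presumably they are handled elsewhere). A quick check of the new regex:

```python
import re

# The extension group from the updated filename_type(); 'doc' is no longer listed.
DOC_EXT_RE = r".*\.(docx|ppt|pptx|yml|xml|htm|json|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md)$"

print(bool(re.match(DOC_EXT_RE, "report.docx")))  # True
print(bool(re.match(DOC_EXT_RE, "report.doc")))   # False -- legacy .doc no longer routed here
```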
conf/mapping.json CHANGED

```diff
@@ -1,7 +1,7 @@
 {
   "settings": {
     "index": {
-      "number_of_shards": 4,
+      "number_of_shards": 2,
       "number_of_replicas": 0,
       "refresh_interval" : "1000ms"
     },
```
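Shard count is fixed at index creation, so halving `number_of_shards` only affects indices created after this change; existing indices keep four shards. A hedged way to verify on a fresh deployment (index name hypothetical, assuming the official `elasticsearch` Python client):

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://es01:9200")  # host taken from service_conf.yaml
settings = es.indices.get_settings(index="ragflow_example")  # hypothetical index name
print(settings["ragflow_example"]["settings"]["index"]["number_of_shards"])  # '2' for new indices
```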
conf/service_conf.yaml CHANGED

```diff
@@ -13,11 +13,16 @@ minio:
   user: 'rag_flow'
   password: 'infini_rag_flow'
   host: 'minio:9000'
+redis:
+  db: 1
+  password: 'infini_rag_flow'
+  host: 'redis:6379'
 es:
   hosts: 'http://es01:9200'
 user_default_llm:
   factory: 'Tongyi-Qianwen'
   api_key: 'sk-xxxxxxxxxxxxx'
+  base_url: ''
 oauth:
   github:
     client_id: xxxxxxxxxxxxxxxxxxxxxxxxx
```
docker/service_conf.yaml CHANGED

```diff
@@ -13,6 +13,10 @@ minio:
   user: 'rag_flow'
   password: 'infini_rag_flow'
   host: 'minio:9000'
+redis:
+  db: 1
+  password: 'infini_rag_flow'
+  host: 'redis:6379'
 es:
   hosts: 'http://es01:9200'
 user_default_llm:
```
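Both config files gain the same `redis` block; `redis:6379` resolves to the compose service name inside the docker network. A quick connectivity check (a sketch, assuming the `redis` Python package and the compose network):

```python
import redis

# Mirrors the new config block: host:port string, db 1, shared password.
host, port = "redis:6379".split(":")
r = redis.Redis(host=host, port=int(port), db=1, password="infini_rag_flow")
print(r.ping())  # True if the redis service is reachable
```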
rag/app/naive.py CHANGED

```diff
@@ -66,6 +66,8 @@ class Docx(DocxParser):
 class Pdf(PdfParser):
     def __call__(self, filename, binary=None, from_page=0,
                  to_page=100000, zoomin=3, callback=None):
+        from timeit import default_timer as timer
+        start = timer()
         callback(msg="OCR is running...")
         self.__images__(
             filename if not binary else binary,
@@ -75,8 +77,8 @@ class Pdf(PdfParser):
             callback
         )
         callback(msg="OCR finished")
+        cron_logger.info("OCR: {}".format(timer() - start))
 
-        from timeit import default_timer as timer
         start = timer()
         self._layouts_rec(zoomin)
         callback(0.63, "Layout analysis finished.")
@@ -90,7 +92,7 @@ class Pdf(PdfParser):
         self._concat_downward()
         #self._filter_forpages()
 
-        cron_logger.info("paddle layouts:".format(
+        cron_logger.info("paddle layouts: {}".format(
             (timer() - start) / (self.total_page + 0.1)))
         return [(b["text"], self._line_tag(b, zoomin))
                 for b in self.boxes], tbls
```
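This hunk also fixes a latent logging bug: the old `"paddle layouts:".format(...)` discarded its argument because the format string had no placeholder. The timing itself repeats a start/elapsed pattern; if it keeps spreading, a small context manager (a sketch, not part of this PR) could hoist it out of the parser body:

```python
from contextlib import contextmanager
from timeit import default_timer as timer

@contextmanager
def timed(logger, label):
    # Logs elapsed seconds for the wrapped block -- the same pattern the
    # diff inlines around OCR and layout analysis.
    start = timer()
    try:
        yield
    finally:
        logger.info("{}: {:.2f}s".format(label, timer() - start))

# Usage: with timed(cron_logger, "OCR"): self.__images__(...)
```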
rag/settings.py CHANGED

```diff
@@ -25,6 +25,11 @@ SUBPROCESS_STD_LOG_NAME = "std.log"
 
 ES = get_base_config("es", {})
 MINIO = decrypt_database_config(name="minio")
+try:
+    REDIS = decrypt_database_config(name="redis")
+except Exception as e:
+    REDIS = {}
+    pass
 DOC_MAXIMUM_SIZE = 128 * 1024 * 1024
 
 # Logger
@@ -39,5 +44,6 @@ LoggerFactory.LEVEL = 30
 es_logger = getLogger("es")
 minio_logger = getLogger("minio")
 cron_logger = getLogger("cron_logger")
+cron_logger.setLevel(20)
 chunk_logger = getLogger("chunk_logger")
 database_logger = getLogger("database")
```
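The magic number 20 is `logging.INFO`; since `LoggerFactory.LEVEL` is 30 (`WARNING`), the new `cron_logger.info(...)` timing lines would otherwise be swallowed. For reference:

```python
import logging

# setLevel(20) is equivalent to setLevel(logging.INFO).
assert logging.INFO == 20
assert logging.getLevelName(20) == "INFO"
```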
rag/svr/task_broker.py CHANGED

```diff
@@ -32,6 +32,7 @@ from api.db.services.document_service import DocumentService
 from api.settings import database_logger
 from api.utils import get_format_time, get_uuid
 from api.utils.file_utils import get_project_base_directory
+from rag.utils.redis_conn import REDIS_CONN
 
 
 def collect(tm):
@@ -84,10 +85,16 @@ def dispatch():
 
         tsks = []
         try:
+            file_bin = MINIO.get(r["kb_id"], r["location"])
+            if REDIS_CONN.is_alive():
+                try:
+                    REDIS_CONN.set("{}/{}".format(r["kb_id"], r["location"]), file_bin, 12*60)
+                except Exception as e:
+                    cron_logger.warning("Put into redis[EXCEPTION]:" + str(e))
+
             if r["type"] == FileType.PDF.value:
                 do_layout = r["parser_config"].get("layout_recognize", True)
-                pages = PdfParser.total_page_number(
-                    r["name"], MINIO.get(r["kb_id"], r["location"]))
+                pages = PdfParser.total_page_number(r["name"], file_bin)
                 page_size = r["parser_config"].get("task_page_size", 12)
                 if r["parser_id"] == "paper":
                     page_size = r["parser_config"].get("task_page_size", 22)
@@ -110,8 +117,7 @@ def dispatch():
 
             elif r["parser_id"] == "table":
                 rn = HuExcelParser.row_number(
-                    r["name"], MINIO.get(
-                        r["kb_id"], r["location"]))
+                    r["name"], file_bin)
                 for i in range(0, rn, 3000):
                     task = new_task()
                     task["from_page"] = i
```
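The dispatcher now fetches each binary from MinIO once and warms the Redis cache with a 720-second TTL (`12*60`), keyed as `kb_id/location` so the executor can find it. Distilled into a helper (a sketch of the diff's logic, not code from the PR; error handling simplified):

```python
def warm_cache(redis_conn, minio, bucket, name, ttl=12 * 60):
    """Fetch a binary from MinIO once and push it into Redis so the
    task executor can skip the MinIO round trip."""
    file_bin = minio.get(bucket, name)
    if redis_conn.is_alive():
        try:
            redis_conn.set("{}/{}".format(bucket, name), file_bin, ttl)
        except Exception as e:
            print("Put into redis[EXCEPTION]:", e)  # the diff logs via cron_logger.warning
    return file_bin
```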
rag/svr/task_executor.py CHANGED

```diff
@@ -19,13 +19,12 @@ import logging
 import os
 import hashlib
 import copy
-import random
 import re
 import sys
 import time
 import traceback
 from functools import partial
-
+from rag.utils import MINIO
 from api.db.db_models import close_connection
 from rag.settings import database_logger
 from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
@@ -35,7 +34,7 @@ from elasticsearch_dsl import Q
 from multiprocessing.context import TimeoutError
 from api.db.services.task_service import TaskService
 from rag.utils import ELASTICSEARCH
-from rag.utils import MINIO
+from timeit import default_timer as timer
 from rag.utils import rmSpace, findMaxTm
 
 from rag.nlp import search
@@ -48,6 +47,7 @@ from api.db import LLMType, ParserType
 from api.db.services.document_service import DocumentService
 from api.db.services.llm_service import LLMBundle
 from api.utils.file_utils import get_project_base_directory
+from rag.utils.redis_conn import REDIS_CONN
 
 BATCH_SIZE = 64
 
@@ -105,11 +105,16 @@ def collect(comm, mod, tm):
 
 def get_minio_binary(bucket, name):
     global MINIO
+    if REDIS_CONN.is_alive():
+        try:
+            r = REDIS_CONN.get("{}/{}".format(bucket, name))
+            if r: return r
+        except Exception as e:
+            cron_logger.warning("Get redis[EXCEPTION]:" + str(e))
     return MINIO.get(bucket, name)
 
 
 def build(row):
-    from timeit import default_timer as timer
     if row["size"] > DOC_MAXIMUM_SIZE:
         set_progress(row["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
                      (int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
@@ -265,6 +270,7 @@ def main(comm, mod):
         callback(
             msg="Finished slicing files(%d). Start to embedding the content." %
             len(cks))
+        st = timer()
         try:
             tk_count = embedding(cks, embd_mdl, r["parser_config"], callback)
         except Exception as e:
@@ -272,9 +278,10 @@ def main(comm, mod):
             cron_logger.error(str(e))
             tk_count = 0
 
-        callback(msg="Finished embedding! Start to build index!")
+        callback(msg="Finished embedding({})! Start to build index!".format(timer()-st))
         init_kb(r)
         chunk_count = len(set([c["_id"] for c in cks]))
+        st = timer()
         es_r = ELASTICSEARCH.bulk(cks, search.index_name(r["tenant_id"]))
         if es_r:
             callback(-1, "Index failure!")
@@ -290,8 +297,8 @@ def main(comm, mod):
             DocumentService.increment_chunk_num(
                 r["doc_id"], r["kb_id"], tk_count, chunk_count, 0)
             cron_logger.info(
-                "Chunk doc({}), token({}), chunks({})".format(
-                    r["id"], tk_count, len(cks)))
+                "Chunk doc({}), token({}), chunks({}), elapsed:{}".format(
+                    r["id"], tk_count, len(cks), timer()-st))
 
         tmf.write(str(r["update_time"]) + "\n")
         tmf.close()
```
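On the executor side, `get_minio_binary` becomes a read-through cache: a PDF is split into many page-range tasks, and previously every task re-downloaded the whole file from MinIO; now the broker's cached copy serves all of them. The companion sketch to `warm_cache` above (names hypothetical, mirroring the diff):

```python
def get_binary(redis_conn, minio, bucket, name):
    """Try the Redis copy first; fall back to MinIO on a miss or error."""
    if redis_conn.is_alive():
        try:
            cached = redis_conn.get("{}/{}".format(bucket, name))
            if cached:
                return cached  # bytes, same type the MinIO client returns
        except Exception as e:
            print("Get redis[EXCEPTION]:", e)
    return minio.get(bucket, name)
```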
rag/utils/redis_conn.py ADDED

```diff
@@ -0,0 +1,55 @@
+import json
+
+import redis
+import logging
+from rag import settings
+from rag.utils import singleton
+
+@singleton
+class RedisDB:
+    def __init__(self):
+        self.REDIS = None
+        self.config = settings.REDIS
+        self.__open__()
+
+    def __open__(self):
+        try:
+            self.REDIS = redis.Redis(host=self.config.get("host", "redis").split(":")[0],
+                                     port=int(self.config.get("host", ":6379").split(":")[1]),
+                                     db=int(self.config.get("db", 1)),
+                                     password=self.config.get("password"))
+        except Exception as e:
+            logging.warning("Redis can't be connected.")
+        return self.REDIS
+
+    def is_alive(self):
+        return self.REDIS is not None
+
+    def get(self, k):
+        if not self.REDIS: return
+        try:
+            return self.REDIS.get(k)
+        except Exception as e:
+            logging.warning("[EXCEPTION]get" + str(k) + "||" + str(e))
+            self.__open__()
+
+    def set_obj(self, k, obj, exp=3600):
+        try:
+            self.REDIS.set(k, json.dumps(obj, ensure_ascii=False), exp)
+            return True
+        except Exception as e:
+            logging.warning("[EXCEPTION]set_obj" + str(k) + "||" + str(e))
+            self.__open__()
+        return False
+
+    def set(self, k, v, exp=3600):
+        try:
+            self.REDIS.set(k, v, exp)
+            return True
+        except Exception as e:
+            logging.warning("[EXCEPTION]set" + str(k) + "||" + str(e))
+            self.__open__()
+        return False
+
+
+REDIS_CONN = RedisDB()
```
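Usage mirrors the broker and executor (key and TTL values hypothetical):

```python
from rag.utils.redis_conn import REDIS_CONN

if REDIS_CONN.is_alive():
    REDIS_CONN.set("kb_123/doc.pdf", b"...", 12 * 60)  # 720s TTL, same as the broker
    print(REDIS_CONN.get("kb_123/doc.pdf"))
```

One caveat worth noting: `redis.Redis(...)` constructs a lazy client and does not touch the network, so `is_alive()` reports True whenever construction succeeded, not when the server is reachable. Actual connection failures surface on the first `get`/`set`, which is why both methods catch exceptions and call `__open__()` to rebuild the client.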