KevinHuSh committed on
Commit 14174de · 1 Parent(s): 7c07000

fix #258 task_executor occupy cpu too much (#288)


### What problem does this PR solve?

Issue link: #285

The task executor busy-polled for new tasks, occupying a CPU core even when the queue was empty. This change sleeps for one second whenever `collect` finds no tasks, and replaces the `signal`-based timeout around the MinIO fetch with a single-process `multiprocessing.Pool` whose result is awaited via `get(timeout=90)`. The retry loop in `HuMinio.get` drops from 10 attempts to 1, and `minimum_should_match` is commented out of the ES query.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
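
For context, a minimal sketch of the idle-poll fix; `poll_task_queue` is a hypothetical stand-in for `TaskService.get_tasks`. Without the `time.sleep(1)`, an empty queue makes the loop spin at full CPU:

```python
import random
import time

def poll_task_queue():
    # Hypothetical stand-in for TaskService.get_tasks: mostly empty when idle.
    return [] if random.random() < 0.9 else [{"id": 1}]

def collect():
    tasks = poll_task_queue()
    if not tasks:
        time.sleep(1)  # yield the CPU instead of spinning on an empty queue
        return []
    return tasks

if __name__ == "__main__":
    for _ in range(5):  # the real executor loops forever
        for task in collect():
            print("processing", task)
```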

rag/nlp/query.py CHANGED
@@ -62,7 +62,7 @@ class EsQueryer:
         return Q("bool",
                  must=Q("query_string", fields=self.flds,
                         type="best_fields", query=" ".join(q),
-                        boost=1, minimum_should_match=min_match)
+                        boost=1)#, minimum_should_match=min_match)
                  ), tks
 
     def needQieqie(tk):
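
For reference, a minimal sketch of the resulting query shape, assuming `elasticsearch_dsl` is installed; `flds`, `q`, and `min_match` are hypothetical stand-ins for the `EsQueryer` state. With `minimum_should_match` commented out, a document matching any single term qualifies, and ranking is left to `best_fields` scoring:

```python
from elasticsearch_dsl import Q

flds = ["title^10", "content"]  # hypothetical boosted search fields
q = ["deep", "learning"]        # hypothetical tokenized query terms
min_match = "60%"               # the match ratio the old query enforced

query = Q("bool",
          must=Q("query_string", fields=flds,
                 type="best_fields", query=" ".join(q),
                 boost=1))  # , minimum_should_match=min_match
print(query.to_dict())
```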
rag/svr/task_executor.py CHANGED
@@ -21,16 +21,15 @@ import hashlib
 import copy
 import re
 import sys
+import time
 import traceback
 from functools import partial
-import signal
-from contextlib import contextmanager
 from rag.settings import database_logger
 from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
-
+from multiprocessing import Pool
 import numpy as np
 from elasticsearch_dsl import Q
-
+from multiprocessing.context import TimeoutError
 from api.db.services.task_service import TaskService
 from rag.utils import ELASTICSEARCH
 from rag.utils import MINIO
@@ -92,23 +91,17 @@ def set_progress(task_id, from_page=0, to_page=-1,
 def collect(comm, mod, tm):
     tasks = TaskService.get_tasks(tm, mod, comm)
     if len(tasks) == 0:
+        time.sleep(1)
         return pd.DataFrame()
     tasks = pd.DataFrame(tasks)
     mtm = tasks["update_time"].max()
     cron_logger.info("TOTAL:{}, To:{}".format(len(tasks), mtm))
     return tasks
 
-@contextmanager
-def timeout(time):
-    # Register a function to raise a TimeoutError on the signal.
-    signal.signal(signal.SIGALRM, raise_timeout)
-    # Schedule the signal to be sent after ``time``.
-    signal.alarm(time)
-    yield
-
 
-def raise_timeout(signum, frame):
-    raise TimeoutError
+def get_minio_binary(bucket, name):
+    global MINIO
+    return MINIO.get(bucket, name)
 
 
 def build(row):
@@ -124,24 +117,34 @@ def build(row):
             row["from_page"],
             row["to_page"])
     chunker = FACTORY[row["parser_id"].lower()]
+    pool = Pool(processes=1)
     try:
         st = timer()
-        with timeout(30):
-            binary = MINIO.get(row["kb_id"], row["location"])
+        thr = pool.apply_async(get_minio_binary, args=(row["kb_id"], row["location"]))
+        binary = thr.get(timeout=90)
+        pool.terminate()
+        cron_logger.info(
+            "From minio({}) {}/{}".format(timer()-st, row["location"], row["name"]))
         cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"],
                             to_page=row["to_page"], lang=row["language"], callback=callback,
                             kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
         cron_logger.info(
             "Chunkking({}) {}/{}".format(timer()-st, row["location"], row["name"]))
+    except TimeoutError as e:
+        callback(-1, f"Internal server error: Fetch file timeout. Could you try it again.")
+        cron_logger.error(
+            "Chunkking {}/{}: Fetch file timeout.".format(row["location"], row["name"]))
+        return
     except Exception as e:
         if re.search("(No such file|not found)", str(e)):
             callback(-1, "Can not find file <%s>" % row["name"])
         else:
             callback(-1, f"Internal server error: %s" %
                      str(e).replace("'", ""))
+        pool.terminate()
         traceback.print_exc()
 
-        cron_logger.warn(
+        cron_logger.error(
            "Chunkking {}/{}: {}".format(row["location"], row["name"], str(e)))
 
        return
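
The key pattern here, shown below as a self-contained sketch (the slow `fetch` is a hypothetical stand-in for `MINIO.get`): a `signal.alarm`-based timeout only fires in the main thread and cannot reliably interrupt a blocked C-level read, whereas `Pool.apply_async` runs the fetch in a child process that `AsyncResult.get(timeout=...)` can abandon and `pool.terminate()` can kill outright:

```python
import time
from multiprocessing import Pool
from multiprocessing.context import TimeoutError

def fetch(bucket, name):
    # Hypothetical stand-in for MINIO.get: simulate a fetch that hangs.
    time.sleep(5)
    return b"bytes of " + name.encode()

if __name__ == "__main__":
    pool = Pool(processes=1)
    try:
        thr = pool.apply_async(fetch, args=("kb1", "doc.pdf"))
        binary = thr.get(timeout=2)  # raises TimeoutError after 2 seconds
        print(len(binary), "bytes fetched")
    except TimeoutError:
        print("fetch timed out; worker will be killed")
    finally:
        pool.terminate()  # kill the child even if it is still blocked
```

Note that in the diff above, the `TimeoutError` branch returns without terminating the pool; the `finally` in the sketch avoids leaking the worker process in that case.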
rag/utils/minio_conn.py CHANGED
@@ -58,7 +58,7 @@ class HuMinio(object):
 
 
     def get(self, bucket, fnm):
-        for _ in range(10):
+        for _ in range(1):
             try:
                 r = self.conn.get_object(bucket, fnm)
                 return r.read()
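
Dropping the retry count to one fits the new design: the caller now bounds the whole fetch with a 90-second timeout, and ten internal attempts could multiply the wait well past it. Below is a hypothetical sketch of the surrounding retry shape, assuming the `minio` client lives in `self.conn` and `self.__open__()` re-establishes the connection (both assumptions about the rest of `HuMinio`; endpoint and credentials are placeholders):

```python
from minio import Minio

class HuMinio(object):
    def __init__(self):
        self.conn = None
        self.__open__()

    def __open__(self):
        # Assumed connection setup with placeholder endpoint and credentials.
        self.conn = Minio("127.0.0.1:9000",
                          access_key="minioadmin",
                          secret_key="minioadmin",
                          secure=False)

    def get(self, bucket, fnm, attempts=1):
        for _ in range(attempts):  # one attempt; the caller owns the timeout
            try:
                r = self.conn.get_object(bucket, fnm)
                return r.read()
            except Exception:
                self.__open__()  # reconnect before any further attempt
        return None
```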