zhichyu committed
Commit 4fd5400 · Parent: c5b9023

Fix logs. Use dict.pop instead of del. Close #3473 (#3484)

### What problem does this PR solve?

Fix log messages (several handlers logged the wrong operation name) and replace `del d[key]` with `d.pop(key, default)`, which tolerates missing keys instead of raising `KeyError`.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
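
For reference, a minimal sketch of the pattern this PR applies (illustrative snippet, not project code):

```python
chunk = {"content": "text", "vector": [0.1, 0.2]}

# `del` raises KeyError when the key is absent, so it needs a guard:
if "vector" in chunk:
    del chunk["vector"]

# `pop` with a default is one call and never raises:
chunk.pop("vector", None)

# `pop` *without* a default still raises KeyError, which preserves
# required-field behavior (as in set_tenant_info's req.pop("tenant_id")):
req = {"tenant_id": "t1", "name": "x"}
tid = req.pop("tenant_id")  # KeyError if the field is missing
```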

api/apps/api_app.py CHANGED
@@ -839,8 +839,7 @@ def retrieval():
                                               similarity_threshold, vector_similarity_weight, top,
                                               doc_ids, rerank_mdl=rerank_mdl)
         for c in ranks["chunks"]:
-            if "vector" in c:
-                del c["vector"]
+            c.pop("vector", None)
         return get_json_result(data=ranks)
     except Exception as e:
         if str(e).find("not_found") > 0:
api/apps/chunk_app.py CHANGED
@@ -287,8 +287,7 @@ def retrieval_test():
                                               similarity_threshold, vector_similarity_weight, top,
                                               doc_ids, rerank_mdl=rerank_mdl, highlight=req.get("highlight"))
         for c in ranks["chunks"]:
-            if "vector" in c:
-                del c["vector"]
+            c.pop("vector", None)
 
         return get_json_result(data=ranks)
     except Exception as e:
api/apps/sdk/dify_retrieval.py CHANGED
@@ -58,8 +58,7 @@ def retrieval(tenant_id):
         )
         records = []
         for c in ranks["chunks"]:
-            if "vector" in c:
-                del c["vector"]
+            c.pop("vector", None)
             records.append({
                 "content": c["content_ltks"],
                 "score": c["similarity"],
api/apps/sdk/doc.py CHANGED
@@ -37,7 +37,6 @@ from api.db.services.document_service import DocumentService
 from api.db.services.file2document_service import File2DocumentService
 from api.db.services.file_service import FileService
 from api.db.services.knowledgebase_service import KnowledgebaseService
-from api import settings
 from api.utils.api_utils import construct_json_result, get_parser_config
 from rag.nlp import search
 from rag.utils import rmSpace
@@ -1342,8 +1341,7 @@ def retrieval_test(tenant_id):
             highlight=highlight,
         )
         for c in ranks["chunks"]:
-            if "vector" in c:
-                del c["vector"]
+            c.pop("vector", None)
 
         ##rename keys
         renamed_chunks = []
api/apps/user_app.py CHANGED
@@ -696,8 +696,7 @@ def set_tenant_info():
     """
     req = request.json
     try:
-        tid = req["tenant_id"]
-        del req["tenant_id"]
+        tid = req.pop("tenant_id")
         TenantService.update_by_id(tid, req)
         return get_json_result(data=True)
     except Exception as e:
api/db/services/document_service.py CHANGED
@@ -500,7 +500,7 @@ def doc_upload_and_parse(conversation_id, file_objs, user_id):
 
             STORAGE_IMPL.put(kb.id, d["id"], output_buffer.getvalue())
             d["img_id"] = "{}-{}".format(kb.id, d["id"])
-            del d["image"]
+            d.pop("image", None)
             docs.append(d)
 
     parser_ids = {d["id"]: d["parser_id"] for d, _ in files}
api/utils/log_utils.py CHANGED
@@ -49,5 +49,6 @@ def initRootLogger(logfile_basename: str, log_level: int = logging.INFO, log_for
     handler2.setFormatter(formatter)
     logger.addHandler(handler2)
 
+    logging.captureWarnings(True)
     msg = f"{logfile_basename} log path: {log_path}"
     logger.info(msg)
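
`logging.captureWarnings(True)` is standard-library behavior: it reroutes `warnings.warn()` output to the `py.warnings` logger, so the handlers installed above also capture library warnings. A minimal sketch of the effect, independent of this codebase:

```python
import logging
import warnings

logging.basicConfig(level=logging.WARNING)  # stand-in for initRootLogger's handlers
logging.captureWarnings(True)

# Emitted as a WARNING record on the "py.warnings" logger
# rather than written directly to stderr by the warnings machinery.
warnings.warn("deprecated parameter")
```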
graphrag/graph_extractor.py CHANGED
@@ -9,7 +9,7 @@ import logging
 import numbers
 import re
 import traceback
-from typing import Any, Callable
+from typing import Any, Callable, Mapping
 from dataclasses import dataclass
 import tiktoken
 from graphrag.graph_prompt import GRAPH_EXTRACTION_PROMPT, CONTINUE_PROMPT, LOOP_PROMPT
rag/benchmark.py CHANGED
@@ -59,8 +59,7 @@ class Benchmark:
                 del qrels[query]
                 continue
             for c in ranks["chunks"]:
-                if "vector" in c:
-                    del c["vector"]
+                c.pop("vector", None)
                 run[query][c["chunk_id"]] = c["similarity"]
         return run
 
rag/nlp/search.py CHANGED
@@ -106,8 +106,7 @@ class Dealer:
         # If result is empty, try again with lower min_match
         if total == 0:
             matchText, _ = self.qryr.question(qst, min_match=0.1)
-            if "doc_ids" in filters:
-                del filters["doc_ids"]
+            filters.pop("doc_ids", None)
             matchDense.extra_options["similarity"] = 0.17
             res = self.dataStore.search(src, highlightFields, filters, [matchText, matchDense, fusionExpr], orderBy, offset, limit, idx_names, kb_ids)
             total=self.dataStore.getTotal(res)
rag/utils/es_conn.py CHANGED
@@ -5,7 +5,7 @@ import time
 import os
 
 import copy
-from elasticsearch import Elasticsearch
+from elasticsearch import Elasticsearch, NotFoundError
 from elasticsearch_dsl import UpdateByQuery, Q, Search, Index
 from elastic_transport import ConnectionTimeout
 from rag import settings
@@ -82,7 +82,9 @@ class ESConnection(DocStoreConnection):
 
     def deleteIdx(self, indexName: str, knowledgebaseId: str):
         try:
-            return self.es.indices.delete(indexName, allow_no_indices=True)
+            self.es.indices.delete(index=indexName, allow_no_indices=True)
+        except NotFoundError:
+            pass
         except Exception:
             logging.exception("ES delete index error %s" % (indexName))
 
@@ -146,6 +148,7 @@ class ESConnection(DocStoreConnection):
                         similarity=similarity,
                     )
 
+        condition["kb_id"] = knowledgebaseIds
         if condition:
             if not bqry:
                 bqry = Q("bool", must=[])
@@ -226,8 +229,7 @@ class ESConnection(DocStoreConnection):
             assert "_id" not in d
             assert "id" in d
             d_copy = copy.deepcopy(d)
-            meta_id = d_copy["id"]
-            del d_copy["id"]
+            meta_id = d_copy.pop("id", "")
             operations.append(
                 {"index": {"_index": indexName, "_id": meta_id}})
             operations.append(d_copy)
@@ -254,7 +256,7 @@ class ESConnection(DocStoreConnection):
 
     def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool:
         doc = copy.deepcopy(newValue)
-        del doc['id']
+        doc.pop("id", None)
         if "id" in condition and isinstance(condition["id"], str):
             # update specific single document
             chunkId = condition["id"]
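
The `deleteIdx` change makes index deletion idempotent: `NotFoundError` (the 404 exception from the Python Elasticsearch client) is swallowed, while other failures still reach the generic handler. A reduced sketch of the pattern, assuming a reachable `Elasticsearch` client:

```python
import logging

from elasticsearch import Elasticsearch, NotFoundError

es = Elasticsearch("http://localhost:9200")  # assumed endpoint

def delete_index_quietly(index_name: str) -> None:
    try:
        # Keyword form: recent client versions require index= as a keyword.
        es.indices.delete(index=index_name, allow_no_indices=True)
    except NotFoundError:
        pass  # index already gone; treat delete as idempotent
    except Exception:
        logging.exception("ES delete index error %s" % index_name)
```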
rag/utils/minio_conn.py CHANGED
@@ -78,10 +78,13 @@ class RAGFlowMinio(object):
 
     def obj_exist(self, bucket, fnm):
         try:
-            if self.conn.stat_object(bucket, fnm):return True
+            if not self.conn.bucket_exists(bucket):
+                return False
+            if self.conn.stat_object(bucket, fnm):
+                return True
             return False
         except Exception:
-            logging.exception(f"Fail put {bucket}/{fnm}:")
+            logging.exception(f"RAGFlowMinio.obj_exist {bucket}/{fnm} got exception:")
             return False
 
 
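`stat_object` raises when the bucket itself does not exist, so the added `bucket_exists` guard lets `obj_exist` report a plain `False` for an absent bucket instead of logging an exception (and the message no longer misleadingly says "Fail put"). A reduced sketch, assuming a configured `Minio` client with hypothetical connection details:

```python
from minio import Minio
from minio.error import S3Error

# Hypothetical endpoint and credentials, for illustration only.
client = Minio("localhost:9000", access_key="minioadmin",
               secret_key="minioadmin", secure=False)

def obj_exist(bucket: str, name: str) -> bool:
    try:
        if not client.bucket_exists(bucket):
            return False  # absent bucket is a normal "no", not an error
        return bool(client.stat_object(bucket, name))
    except S3Error:
        return False  # a missing object surfaces as NoSuchKey
```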
rag/utils/redis_conn.py CHANGED
@@ -12,7 +12,7 @@ class Payload:
         self.__queue_name = queue_name
         self.__group_name = group_name
         self.__msg_id = msg_id
-        self.__message = json.loads(message['message'])
+        self.__message = json.loads(message["message"])
 
     def ack(self):
         try:
@@ -35,19 +35,20 @@ class RedisDB:
 
     def __open__(self):
         try:
-            self.REDIS = redis.StrictRedis(host=self.config["host"].split(":")[0],
-                                           port=int(self.config.get("host", ":6379").split(":")[1]),
-                                           db=int(self.config.get("db", 1)),
-                                           password=self.config.get("password"),
-                                           decode_responses=True)
+            self.REDIS = redis.StrictRedis(
+                host=self.config["host"].split(":")[0],
+                port=int(self.config.get("host", ":6379").split(":")[1]),
+                db=int(self.config.get("db", 1)),
+                password=self.config.get("password"),
+                decode_responses=True,
+            )
         except Exception:
             logging.warning("Redis can't be connected.")
         return self.REDIS
 
     def health(self):
-
         self.REDIS.ping()
-        a, b = 'xx', 'yy'
+        a, b = "xx", "yy"
         self.REDIS.set(a, b, 3)
 
         if self.REDIS.get(a) == b:
@@ -57,19 +58,21 @@
         return self.REDIS is not None
 
     def exist(self, k):
-        if not self.REDIS: return
+        if not self.REDIS:
+            return
         try:
             return self.REDIS.exists(k)
         except Exception as e:
-            logging.warning("[EXCEPTION]exist" + str(k) + "||" + str(e))
+            logging.warning("RedisDB.exist " + str(k) + " got exception: " + str(e))
             self.__open__()
 
     def get(self, k):
-        if not self.REDIS: return
+        if not self.REDIS:
+            return
         try:
             return self.REDIS.get(k)
         except Exception as e:
-            logging.warning("[EXCEPTION]get" + str(k) + "||" + str(e))
+            logging.warning("RedisDB.get " + str(k) + " got exception: " + str(e))
             self.__open__()
 
     def set_obj(self, k, obj, exp=3600):
@@ -77,7 +80,7 @@
             self.REDIS.set(k, json.dumps(obj, ensure_ascii=False), exp)
             return True
         except Exception as e:
-            logging.warning("[EXCEPTION]set_obj" + str(k) + "||" + str(e))
+            logging.warning("RedisDB.set_obj " + str(k) + " got exception: " + str(e))
             self.__open__()
         return False
 
@@ -86,7 +89,7 @@
             self.REDIS.set(k, v, exp)
             return True
         except Exception as e:
-            logging.warning("[EXCEPTION]set" + str(k) + "||" + str(e))
+            logging.warning("RedisDB.set " + str(k) + " got exception: " + str(e))
             self.__open__()
         return False
 
@@ -95,7 +98,7 @@
             self.REDIS.sadd(key, member)
             return True
         except Exception as e:
-            logging.warning("[EXCEPTION]sadd" + str(key) + "||" + str(e))
+            logging.warning("RedisDB.sadd " + str(key) + " got exception: " + str(e))
             self.__open__()
         return False
 
@@ -104,7 +107,7 @@
             self.REDIS.srem(key, member)
             return True
         except Exception as e:
-            logging.warning("[EXCEPTION]srem" + str(key) + "||" + str(e))
+            logging.warning("RedisDB.srem " + str(key) + " got exception: " + str(e))
             self.__open__()
         return False
 
@@ -113,7 +116,9 @@
             res = self.REDIS.smembers(key)
             return res
         except Exception as e:
-            logging.warning("[EXCEPTION]smembers" + str(key) + "||" + str(e))
+            logging.warning(
+                "RedisDB.smembers " + str(key) + " got exception: " + str(e)
+            )
             self.__open__()
         return None
 
@@ -122,7 +127,7 @@
             self.REDIS.zadd(key, {member: score})
             return True
         except Exception as e:
-            logging.warning("[EXCEPTION]zadd" + str(key) + "||" + str(e))
+            logging.warning("RedisDB.zadd " + str(key) + " got exception: " + str(e))
             self.__open__()
         return False
 
@@ -131,7 +136,7 @@
             res = self.REDIS.zcount(key, min, max)
             return res
         except Exception as e:
-            logging.warning("[EXCEPTION]spopmin" + str(key) + "||" + str(e))
+            logging.warning("RedisDB.zcount " + str(key) + " got exception: " + str(e))
             self.__open__()
         return 0
 
@@ -140,7 +145,7 @@
             res = self.REDIS.zpopmin(key, count)
             return res
         except Exception as e:
-            logging.warning("[EXCEPTION]spopmin" + str(key) + "||" + str(e))
+            logging.warning("RedisDB.zpopmin " + str(key) + " got exception: " + str(e))
             self.__open__()
         return None
 
@@ -149,7 +154,9 @@
             res = self.REDIS.zrangebyscore(key, min, max)
             return res
         except Exception as e:
-            logging.warning("[EXCEPTION]srangebyscore" + str(key) + "||" + str(e))
+            logging.warning(
+                "RedisDB.zrangebyscore " + str(key) + " got exception: " + str(e)
+            )
             self.__open__()
         return None
 
@@ -160,7 +167,9 @@
             pipeline.execute()
             return True
         except Exception as e:
-            logging.warning("[EXCEPTION]set" + str(key) + "||" + str(e))
+            logging.warning(
+                "RedisDB.transaction " + str(key) + " got exception: " + str(e)
+            )
             self.__open__()
         return False
 
@@ -170,23 +179,22 @@
             payload = {"message": json.dumps(message)}
             pipeline = self.REDIS.pipeline()
             pipeline.xadd(queue, payload)
-            #pipeline.expire(queue, exp)
+            # pipeline.expire(queue, exp)
             pipeline.execute()
             return True
-        except Exception:
-            logging.exception("producer" + str(queue) + " got exception")
+        except Exception as e:
+            logging.exception(
+                "RedisDB.queue_product " + str(queue) + " got exception: " + str(e)
+            )
             return False
 
-    def queue_consumer(self, queue_name, group_name, consumer_name, msg_id=b">") -> Payload:
+    def queue_consumer(
+        self, queue_name, group_name, consumer_name, msg_id=b">"
+    ) -> Payload:
         try:
             group_info = self.REDIS.xinfo_groups(queue_name)
             if not any(e["name"] == group_name for e in group_info):
-                self.REDIS.xgroup_create(
-                    queue_name,
-                    group_name,
-                    id="0",
-                    mkstream=True
-                )
+                self.REDIS.xgroup_create(queue_name, group_name, id="0", mkstream=True)
             args = {
                 "groupname": group_name,
                 "consumername": consumer_name,
@@ -202,10 +210,15 @@
             res = Payload(self.REDIS, queue_name, group_name, msg_id, payload)
             return res
         except Exception as e:
-            if 'key' in str(e):
+            if "key" in str(e):
                 pass
             else:
-                logging.exception("consumer: " + str(queue_name) + " got exception")
+                logging.exception(
+                    "RedisDB.queue_consumer "
+                    + str(queue_name)
+                    + " got exception: "
+                    + str(e)
+                )
         return None
 
     def get_unacked_for(self, consumer_name, queue_name, group_name):
@@ -213,36 +226,39 @@
             group_info = self.REDIS.xinfo_groups(queue_name)
             if not any(e["name"] == group_name for e in group_info):
                 return
-            pendings = self.REDIS.xpending_range(queue_name, group_name, min=0, max=10000000000000, count=1, consumername=consumer_name)
-            if not pendings: return
+            pendings = self.REDIS.xpending_range(
+                queue_name,
+                group_name,
+                min=0,
+                max=10000000000000,
+                count=1,
+                consumername=consumer_name,
+            )
+            if not pendings:
+                return
             msg_id = pendings[0]["message_id"]
             msg = self.REDIS.xrange(queue_name, min=msg_id, count=1)
             _, payload = msg[0]
             return Payload(self.REDIS, queue_name, group_name, msg_id, payload)
         except Exception as e:
-            if 'key' in str(e):
+            if "key" in str(e):
                 return
-            logging.exception("xpending_range: " + consumer_name + " got exception")
+            logging.exception(
+                "RedisDB.get_unacked_for " + consumer_name + " got exception: " + str(e)
+            )
             self.__open__()
 
-    def queue_info(self, queue, group_name) -> dict:
-        for _ in range(3):
-            try:
-                groups = self.REDIS.xinfo_groups(queue)
-                for group in groups:
-                    if group["name"] == group_name:
-                        return group
-            except Exception:
-                logging.exception("queue_length" + str(queue) + " got exception")
+    def queue_info(self, queue, group_name) -> dict | None:
+        try:
+            groups = self.REDIS.xinfo_groups(queue)
+            for group in groups:
+                if group["name"] == group_name:
+                    return group
+        except Exception as e:
+            logging.warning(
+                "RedisDB.queue_info " + str(queue) + " got exception: " + str(e)
+            )
        return None
 
-    def queue_head(self, queue) -> int:
-        for _ in range(3):
-            try:
-                ent = self.REDIS.xrange(queue, count=1)
-                return ent[0]
-            except Exception:
-                logging.exception("queue_head" + str(queue) + " got exception")
-            return 0
 
 REDIS_CONN = RedisDB()
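
The consumer path in `queue_consumer` follows standard Redis Streams semantics: the group is created lazily with `mkstream=True`, and reading with the special `>` ID delivers only messages never handed to any consumer in the group. A minimal sketch of that flow with `redis-py`, using hypothetical stream and group names:

```python
import redis

r = redis.StrictRedis(host="localhost", port=6379, decode_responses=True)  # assumed
queue, group, consumer = "rag_tasks", "workers", "worker-1"  # hypothetical names

# Create the group (and, via mkstream, the stream) if it does not exist yet.
groups = r.xinfo_groups(queue) if r.exists(queue) else []
if not any(g["name"] == group for g in groups):
    r.xgroup_create(queue, group, id="0", mkstream=True)

# ">" requests messages never delivered to any consumer in this group.
for _stream, entries in r.xreadgroup(group, consumer, {queue: ">"}, count=1, block=1000) or []:
    for msg_id, payload in entries:
        # ... process payload, then acknowledge so it leaves the pending list
        r.xack(queue, group, msg_id)
```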