import logging
import os
import re
import json
import time
import copy
import infinity
from infinity.common import ConflictType, InfinityException, SortType
from infinity.index import IndexInfo, IndexType
from infinity.connection_pool import ConnectionPool
from infinity.errors import ErrorCode
from rag import settings
from rag.utils import singleton
import polars as pl
from polars.series.series import Series
from api.utils.file_utils import get_project_base_directory

from rag.utils.doc_store_conn import (
    DocStoreConnection,
    MatchExpr,
    MatchTextExpr,
    MatchDenseExpr,
    FusionExpr,
    OrderByExpr,
)

logger = logging.getLogger('ragflow.infinity_conn')

def equivalent_condition_to_str(condition: dict) -> str:
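    """
    Convert a condition dict into an Infinity filter expression string.
    Illustrative example: {"doc_id": "abc", "available_int": 1} becomes
    "doc_id='abc' AND available_int=1"; an empty condition yields the
    always-true filter "1=1". "kb_id" is skipped because the knowledge
    base id is already encoded in the table name.
    """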
    assert "_id" not in condition
    cond = list()
    for k, v in condition.items():
        if not isinstance(k, str) or k in ["kb_id"] or not v:
            continue
        if isinstance(v, list):
            inCond = list()
            for item in v:
                if isinstance(item, str):
                    inCond.append(f"'{item}'")
                else:
                    inCond.append(str(item))
            if inCond:
                strInCond = ", ".join(inCond)
                strInCond = f"{k} IN ({strInCond})"
                cond.append(strInCond)
        elif isinstance(v, str):
            cond.append(f"{k}='{v}'")
        else:
            cond.append(f"{k}={str(v)}")
    return " AND ".join(cond) if cond else "1=1"


def concat_dataframes(df_list: list[pl.DataFrame], selectFields: list[str]) -> pl.DataFrame:
    """
    Concatenate multiple dataframes into one.
    """
    df_list = [df for df in df_list if not df.is_empty()]
    if df_list:
        return pl.concat(df_list)
    schema = dict()
    for field_name in selectFields:
        if field_name == 'score()':  # Workaround: Infinity names the score column "SCORE"
            schema['SCORE'] = str
        else:
            schema[field_name] = str
    return pl.DataFrame(schema=schema)

@singleton
class InfinityConnection(DocStoreConnection):
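    """
    DocStoreConnection implementation backed by the Infinity vector database.
    Each (index, knowledge base) pair maps to one table named
    "{indexName}_{knowledgebaseId}".

    A minimal usage sketch (assumes settings.INFINITY points at a healthy
    Infinity server; the field values are illustrative):

        conn = InfinityConnection()
        conn.createIdx("ragflow", "kb0", vectorSize=1024)
        conn.insert([{"id": "chunk0", "q_1024_vec": [0.1] * 1024}], "ragflow", "kb0")
        chunk = conn.get("chunk0", "ragflow", ["kb0"])
    """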
    def __init__(self):
        self.dbName = settings.INFINITY.get("db_name", "default_db")
        infinity_uri = settings.INFINITY["uri"]
        if ":" in infinity_uri:
            host, port = infinity_uri.split(":")
            infinity_uri = infinity.common.NetworkAddress(host, int(port))
        self.connPool = None
        logger.info(f"Use Infinity {infinity_uri} as the doc engine.")
        for _ in range(24):
            try:
                connPool = ConnectionPool(infinity_uri)
                inf_conn = connPool.get_conn()
                res = inf_conn.show_current_node()
                if res.error_code == ErrorCode.OK and res.server_status == "started":
                    self._migrate_db(inf_conn)
                    self.connPool = connPool
                    connPool.release_conn(inf_conn)
                    break
                connPool.release_conn(inf_conn)
                logger.warning(f"Infinity status: {res.server_status}. Waiting for Infinity {infinity_uri} to become healthy.")
                time.sleep(5)
            except Exception as e:
                logger.warning(f"{e}. Waiting for Infinity {infinity_uri} to become healthy.")
                time.sleep(5)
        if self.connPool is None:
            msg = f"Infinity {infinity_uri} didn't become healthy in 120s."
            logger.error(msg)
            raise Exception(msg)
        logger.info(f"Infinity {infinity_uri} is healthy.")

    def _migrate_db(self, inf_conn):
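        """
        Ensure every table created by this connector carries all columns
        declared in conf/infinity_mapping.json, adding missing columns (plus
        full-text indexes for analyzed varchar columns) in place.
        """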
        inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
        fp_mapping = os.path.join(
            get_project_base_directory(), "conf", "infinity_mapping.json"
        )
        if not os.path.exists(fp_mapping):
            raise Exception(f"Mapping file not found at {fp_mapping}")
        with open(fp_mapping) as f:
            schema = json.load(f)
        table_names = inf_db.list_tables().table_names
        for table_name in table_names:
            inf_table = inf_db.get_table(table_name)
            index_names = inf_table.list_indexes().index_names
            if "q_vec_idx" not in index_names:
                # Skip tables that weren't created by this connector
                continue
            column_names = inf_table.show_columns()["name"]
            column_names = set(column_names)
            for field_name, field_info in schema.items():
                if field_name in column_names:
                    continue
                res = inf_table.add_columns({field_name: field_info})
                assert res.error_code == ErrorCode.OK
                logger.info(
                    f"INFINITY added following column to table {table_name}: {field_name} {field_info}"
                )
                if field_info["type"] != "varchar" or "analyzer" not in field_info:
                    continue
                inf_table.create_index(
                    f"text_idx_{field_name}",
                    IndexInfo(
                        field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}
                    ),
                    ConflictType.Ignore,
                )

    """
    Database operations
    """

    def dbType(self) -> str:
        return "infinity"

    def health(self) -> dict:
        """
        Return the health status of the database.
        """
        inf_conn = self.connPool.get_conn()
        res = inf_conn.show_current_node()
        self.connPool.release_conn(inf_conn)
        res2 = {
            "type": "infinity",
            "status": "green" if res.error_code == 0 and res.server_status == "started" else "red",
            "error": res.error_msg,
        }
        return res2

    """
    Table operations
    """

    def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
        table_name = f"{indexName}_{knowledgebaseId}"
        inf_conn = self.connPool.get_conn()
        inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)

        fp_mapping = os.path.join(
            get_project_base_directory(), "conf", "infinity_mapping.json"
        )
        if not os.path.exists(fp_mapping):
            raise Exception(f"Mapping file not found at {fp_mapping}")
        with open(fp_mapping) as f:
            schema = json.load(f)
        vector_name = f"q_{vectorSize}_vec"
        schema[vector_name] = {"type": f"vector,{vectorSize},float"}
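        # The vector column name encodes its dimension (e.g. "q_1024_vec");
        # insert() relies on this convention to infer the vector size when it
        # must create a missing table.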
        inf_table = inf_db.create_table(
            table_name,
            schema,
            ConflictType.Ignore,
        )
        inf_table.create_index(
            "q_vec_idx",
            IndexInfo(
                vector_name,
                IndexType.Hnsw,
                {
                    "M": "16",
                    "ef_construction": "50",
                    "metric": "cosine",
                    "encode": "lvq",
                },
            ),
            ConflictType.Ignore,
        )
        for field_name, field_info in schema.items():
            if field_info["type"] != "varchar" or "analyzer" not in field_info:
                continue
            inf_table.create_index(
                f"text_idx_{field_name}",
                IndexInfo(
                    field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}
                ),
                ConflictType.Ignore,
            )
        self.connPool.release_conn(inf_conn)
        logger.info(
            f"INFINITY created table {table_name}, vector size {vectorSize}"
        )

    def deleteIdx(self, indexName: str, knowledgebaseId: str):
        table_name = f"{indexName}_{knowledgebaseId}"
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        db_instance.drop_table(table_name, ConflictType.Ignore)
        self.connPool.release_conn(inf_conn)
        logger.info(f"INFINITY dropped table {table_name}")

    def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
        table_name = f"{indexName}_{knowledgebaseId}"
        try:
            inf_conn = self.connPool.get_conn()
            db_instance = inf_conn.get_database(self.dbName)
            _ = db_instance.get_table(table_name)
            self.connPool.release_conn(inf_conn)
            return True
        except Exception as e:
            logger.warning(f"INFINITY indexExist {str(e)}")
        return False

    """
    CRUD operations
    """

    def search(
            self,
            selectFields: list[str],
            highlightFields: list[str],
            condition: dict,
            matchExprs: list[MatchExpr],
            orderBy: OrderByExpr,
            offset: int,
            limit: int,
            indexNames: str | list[str],
            knowledgebaseIds: list[str],
    ) -> tuple[pl.DataFrame, int]:
        """
        TODO: Infinity doesn't provide highlight
        """
        if isinstance(indexNames, str):
            indexNames = indexNames.split(",")
        assert isinstance(indexNames, list) and len(indexNames) > 0
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        df_list = list()
        table_list = list()
        for essential_field in ["id"]:
            if essential_field not in selectFields:
                selectFields.append(essential_field)
        if matchExprs:
            for essential_field in ["score()", "pagerank_fea"]:
                selectFields.append(essential_field)

        # Prepare expressions common to all tables
        filter_cond = None
        filter_fulltext = ""
        if condition:
            filter_cond = equivalent_condition_to_str(condition)
        for matchExpr in matchExprs:
            if isinstance(matchExpr, MatchTextExpr):
                if filter_cond and "filter" not in matchExpr.extra_options:
                    matchExpr.extra_options.update({"filter": filter_cond})
                fields = ",".join(matchExpr.fields)
                filter_fulltext = f"filter_fulltext('{fields}', '{matchExpr.matching_text}')"
                if filter_cond:
                    filter_fulltext = f"({filter_cond}) AND {filter_fulltext}"
                minimum_should_match = matchExpr.extra_options.get("minimum_should_match", 0.0)
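                # A float ratio such as 0.3 is converted to the percentage
                # string "30%" expected by Infinity's fulltext search options.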
                if isinstance(minimum_should_match, float):
                    str_minimum_should_match = str(int(minimum_should_match * 100)) + "%"
                    matchExpr.extra_options["minimum_should_match"] = str_minimum_should_match
                for k, v in matchExpr.extra_options.items():
                    if not isinstance(v, str):
                        matchExpr.extra_options[k] = str(v)
                logger.debug(f"INFINITY search MatchTextExpr: {json.dumps(matchExpr.__dict__)}")
            elif isinstance(matchExpr, MatchDenseExpr):
                if filter_fulltext and "filter" not in matchExpr.extra_options:
                    matchExpr.extra_options.update({"filter": filter_fulltext})
                for k, v in matchExpr.extra_options.items():
                    if not isinstance(v, str):
                        matchExpr.extra_options[k] = str(v)
                logger.debug(f"INFINITY search MatchDenseExpr: {json.dumps(matchExpr.__dict__)}")
            elif isinstance(matchExpr, FusionExpr):
                logger.debug(f"INFINITY search FusionExpr: {json.dumps(matchExpr.__dict__)}")

        order_by_expr_list = list()
        if orderBy.fields:
            for order_field in orderBy.fields:
                if order_field[1] == 0:
                    order_by_expr_list.append((order_field[0], SortType.Asc))
                else:
                    order_by_expr_list.append((order_field[0], SortType.Desc))

        total_hits_count = 0
        # Scatter search tables and gather the results
        for indexName in indexNames:
            for knowledgebaseId in knowledgebaseIds:
                table_name = f"{indexName}_{knowledgebaseId}"
                try:
                    table_instance = db_instance.get_table(table_name)
                except Exception:
                    continue
                table_list.append(table_name)
                builder = table_instance.output(selectFields)
                if len(matchExprs) > 0:
                    for matchExpr in matchExprs:
                        if isinstance(matchExpr, MatchTextExpr):
                            fields = ",".join(matchExpr.fields)
                            builder = builder.match_text(
                                fields,
                                matchExpr.matching_text,
                                matchExpr.topn,
                                matchExpr.extra_options,
                            )
                        elif isinstance(matchExpr, MatchDenseExpr):
                            builder = builder.match_dense(
                                matchExpr.vector_column_name,
                                matchExpr.embedding_data,
                                matchExpr.embedding_data_type,
                                matchExpr.distance_type,
                                matchExpr.topn,
                                matchExpr.extra_options,
                            )
                        elif isinstance(matchExpr, FusionExpr):
                            builder = builder.fusion(
                                matchExpr.method, matchExpr.topn, matchExpr.fusion_params
                            )
                else:
                    if filter_cond:
                        builder.filter(filter_cond)
                if orderBy.fields:
                    builder.sort(order_by_expr_list)
                builder.offset(offset).limit(limit)
                kb_res, extra_result = builder.option({"total_hits_count": True}).to_pl()
                if extra_result:
                    total_hits_count += int(extra_result["total_hits_count"])
                logger.debug(f"INFINITY search table: {str(table_name)}, result: {str(kb_res)}")
                df_list.append(kb_res)
        self.connPool.release_conn(inf_conn)
        res = concat_dataframes(df_list, selectFields)
        if matchExprs:
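            # Re-rank the merged per-table results globally: fused match score
            # plus the per-document pagerank boost column.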
            res = res.sort(pl.col("SCORE") + pl.col("pagerank_fea"), descending=True, maintain_order=True)
        res = res.limit(limit)
        logger.debug(f"INFINITY search final result: {str(res)}")
        return res, total_hits_count

    def get(
            self, chunkId: str, indexName: str, knowledgebaseIds: list[str]
    ) -> dict | None:
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        df_list = list()
        assert isinstance(knowledgebaseIds, list)
        table_list = list()
        for knowledgebaseId in knowledgebaseIds:
            table_name = f"{indexName}_{knowledgebaseId}"
            table_list.append(table_name)
            table_instance = db_instance.get_table(table_name)
            kb_res, _ = table_instance.output(["*"]).filter(f"id = '{chunkId}'").to_pl()
            logger.debug(f"INFINITY get table: {str(table_list)}, result: {str(kb_res)}")
            df_list.append(kb_res)
        self.connPool.release_conn(inf_conn)
        res = concat_dataframes(df_list, ["id"])
        res_fields = self.getFields(res, res.columns)
        return res_fields.get(chunkId, None)

    def insert(
            self, documents: list[dict], indexName: str, knowledgebaseId: str
    ) -> list[str]:
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        table_name = f"{indexName}_{knowledgebaseId}"
        try:
            table_instance = db_instance.get_table(table_name)
        except InfinityException as e:
            # src/common/status.cppm, kTableNotExist = 3022
            if e.error_code != ErrorCode.TABLE_NOT_EXIST:
                raise
            vector_size = 0
            patt = re.compile(r"q_(?P<vector_size>\d+)_vec")
            for k in documents[0].keys():
                m = patt.match(k)
                if m:
                    vector_size = int(m.group("vector_size"))
                    break
            if vector_size == 0:
                raise ValueError("Cannot infer vector size from documents")
            self.createIdx(indexName, knowledgebaseId, vector_size)
            table_instance = db_instance.get_table(table_name)

        docs = copy.deepcopy(documents)
        for d in docs:
            assert "_id" not in d
            assert "id" in d
            for k, v in d.items():
                if k in ["important_kwd", "question_kwd", "entities_kwd"]:
                    assert isinstance(v, list)
                    d[k] = "###".join(v)
                elif k == 'kb_id':
                    if isinstance(d[k], list):
                        d[k] = d[k][0] # since d[k] is a list, but we need a str
                elif k == "position_int":
                    assert isinstance(v, list)
                    arr = [num for row in v for num in row]
                    d[k] = "_".join(f"{num:08x}" for num in arr)
                elif k in ["page_num_int", "top_int"]:
                    assert isinstance(v, list)
                    d[k] = "_".join(f"{num:08x}" for num in v)
        ids = ["'{}'".format(d["id"]) for d in docs]
        str_ids = ", ".join(ids)
        str_filter = f"id IN ({str_ids})"
        table_instance.delete(str_filter)
        # for doc in documents:
        #     logger.info(f"insert position_int: {doc['position_int']}")
        # logger.info(f"InfinityConnection.insert {json.dumps(documents)}")
        table_instance.insert(docs)
        self.connPool.release_conn(inf_conn)
        logger.debug(f"INFINITY inserted into {table_name} {str_ids}.")
        return []

    def update(
            self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str
    ) -> bool:
        # if 'position_int' in newValue:
        #     logger.info(f"update position_int: {newValue['position_int']}")
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        table_name = f"{indexName}_{knowledgebaseId}"
        table_instance = db_instance.get_table(table_name)
        if "exist" in condition:
            del condition["exist"]
        filter = equivalent_condition_to_str(condition)
        for k, v in list(newValue.items()):
            if k.endswith("_kwd") and isinstance(v, list):
                newValue[k] = " ".join(v)
            elif k == 'kb_id':
                if isinstance(newValue[k], list):
                    newValue[k] = newValue[k][0] # since newValue[k] is a list, but we need a str
            elif k == "position_int":
                assert isinstance(v, list)
                arr = [num for row in v for num in row]
                newValue[k] = "_".join(f"{num:08x}" for num in arr)
            elif k in ["page_num_int", "top_int"]:
                assert isinstance(v, list)
                newValue[k] = "_".join(f"{num:08x}" for num in v)
            elif k == "remove" and v in ["pagerank_fea"]:
                del newValue[k]
                newValue[v] = 0
        logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {newValue}.")
        table_instance.update(filter, newValue)
        self.connPool.release_conn(inf_conn)
        return True

    def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int:
        inf_conn = self.connPool.get_conn()
        db_instance = inf_conn.get_database(self.dbName)
        table_name = f"{indexName}_{knowledgebaseId}"
        filter = equivalent_condition_to_str(condition)
        try:
            table_instance = db_instance.get_table(table_name)
        except Exception:
            logger.warning(
                f"Skipped deleting `{filter}` from table {table_name} since the table doesn't exist."
            )
            return 0
        logger.debug(f"INFINITY delete table {table_name}, filter {filter}.")
        res = table_instance.delete(filter)
        self.connPool.release_conn(inf_conn)
        return res.deleted_rows

    """
    Helper functions for search result
    """

    def getTotal(self, res: tuple[pl.DataFrame, int] | pl.DataFrame) -> int:
        if isinstance(res, tuple):
            return res[1]
        return len(res)

    def getChunkIds(self, res: tuple[pl.DataFrame, int] | pl.DataFrame) -> list[str]:
        if isinstance(res, tuple):
            res = res[0]
        return list(res["id"])

    def getFields(self, res: tuple[pl.DataFrame, int] | pl.DataFrame, fields: list[str]) -> dict[str, dict]:
        if isinstance(res, tuple):
            res = res[0]
        res_fields = {}
        if not fields:
            return {}
        num_rows = len(res)
        column_id = res["id"]
        for i in range(num_rows):
            id = column_id[i]
            m = {"id": id}
            for fieldnm in fields:
                if fieldnm not in res:
                    m[fieldnm] = None
                    continue
                v = res[fieldnm][i]
                if isinstance(v, Series):
                    v = list(v)
                elif fieldnm in ["important_kwd", "question_kwd", "entities_kwd"]:
                    assert isinstance(v, str)
                    v = [kwd for kwd in v.split("###") if kwd]
                elif fieldnm == "position_int":
                    assert isinstance(v, str)
                    if v:
                        arr = [int(hex_val, 16) for hex_val in v.split('_')]
                        v = [arr[i:i + 5] for i in range(0, len(arr), 5)]
                    else:
                        v = []
                elif fieldnm in ["page_num_int", "top_int"]:
                    assert isinstance(v, str)
                    if v:
                        v = [int(hex_val, 16) for hex_val in v.split('_')]
                    else:
                        v = []
                else:
                    if not isinstance(v, str):
                        v = str(v)
                    # if fieldnm.endswith("_tks"):
                    #     v = rmSpace(v)
                m[fieldnm] = v
            res_fields[id] = m
        return res_fields

    def getHighlight(self, res: tuple[pl.DataFrame, int] | pl.DataFrame, keywords: list[str], fieldnm: str):
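        """
        Wrap keyword occurrences in `fieldnm` with <em> tags and keep only the
        sentences containing at least one match, joined by "...".
        """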
        if isinstance(res, tuple):
            res = res[0]
        ans = {}
        num_rows = len(res)
        column_id = res["id"]
        for i in range(num_rows):
            id = column_id[i]
            txt = res[fieldnm][i]
            txt = re.sub(r"[\r\n]", " ", txt, flags=re.IGNORECASE | re.MULTILINE)
            txts = []
            for t in re.split(r"[.?!;\n]", txt):
                for w in keywords:
                    t = re.sub(
                        r"(^|[ .?/'\"\(\)!,:;-])(%s)([ .?/'\"\(\)!,:;-])"
                        % re.escape(w),
                        r"\1<em>\2</em>\3",
                        t,
                        flags=re.IGNORECASE | re.MULTILINE,
                    )
                if not re.search(
                        r"<em>[^<>]+</em>", t, flags=re.IGNORECASE | re.MULTILINE
                ):
                    continue
                txts.append(t)
            ans[id] = "...".join(txts)
        return ans

    def getAggregation(self, res: tuple[pl.DataFrame, int] | pl.DataFrame, fieldnm: str):
        """
        TODO: Infinity doesn't provide aggregation
        """
        return list()

    """
    SQL
    """

    def sql(self, sql: str, fetch_size: int, format: str):
        raise NotImplementedError("Not implemented")