from ragflow_sdk import RAGFlow
from common import HOST_ADDRESS
from time import sleep

def test_parse_document_with_txt(get_api_key_fixture):
    API_KEY = get_api_key_fixture
    rag = RAGFlow(API_KEY, HOST_ADDRESS)
    ds = rag.create_dataset(name="test_parse_document")
    name = 'ragflow_test.txt'
    with open("test_data/ragflow_test.txt", "rb") as file :
        blob = file.read()
    docs = ds.upload_documents([{"displayed_name": name, "blob": blob}])
    doc = docs[0]
    ds.async_parse_documents(document_ids=[doc.id])
    '''
    for n in range(100):
        if doc.progress == 1:
            break
        sleep(1)
    else:
        raise Exception("Run time ERROR: Document parsing did not complete in time.")
    '''

def test_parse_and_cancel_document(get_api_key_fixture):
    API_KEY = get_api_key_fixture
    rag = RAGFlow(API_KEY, HOST_ADDRESS)
    ds = rag.create_dataset(name="test_parse_and_cancel_document")
    name = 'ragflow_test.txt'
    with open("test_data/ragflow_test.txt", "rb") as file :
        blob = file.read()
    docs = ds.upload_documents([{"displayed_name": name, "blob": blob}])
    doc = docs[0]
    ds.async_parse_documents(document_ids=[doc.id])
    sleep(1)
    if 0 < doc.progress < 1:
        ds.async_cancel_parse_documents(document_ids=[doc.id])


def test_bulk_parse_documents(get_api_key_fixture):
    API_KEY = get_api_key_fixture
    rag = RAGFlow(API_KEY, HOST_ADDRESS)
    ds = rag.create_dataset(name="test_bulk_parse_and_cancel_documents")
    with open("test_data/ragflow.txt", "rb") as file:
        blob = file.read()
    documents = [
        {'displayed_name': 'test1.txt', 'blob': blob},
        {'displayed_name': 'test2.txt', 'blob': blob},
        {'displayed_name': 'test3.txt', 'blob': blob}
    ]
    docs = ds.upload_documents(documents)
    ids = [doc.id for doc in docs]
    ds.async_parse_documents(ids)
    '''
    for n in range(100):
        all_completed = all(doc.progress == 1 for doc in docs)
        if all_completed:
            break
        sleep(1)
    else:
        raise Exception("Run time ERROR: Bulk document parsing did not complete in time.")
    '''

def test_list_chunks_with_success(get_api_key_fixture):
    API_KEY = get_api_key_fixture
    rag = RAGFlow(API_KEY, HOST_ADDRESS)
    ds = rag.create_dataset(name="test_list_chunks_with_success")
    with open("test_data/ragflow_test.txt", "rb") as file:
        blob = file.read()
    '''
    # chunk_size = 1024 * 1024
    # chunks = [blob[i:i + chunk_size] for i in range(0, len(blob), chunk_size)]
    documents = [
        {'displayed_name': f'chunk_{i}.txt', 'blob': chunk} for i, chunk in enumerate(chunks)
    ]
    '''
    documents = [{"displayed_name": "test_list_chunks_with_success.txt", "blob": blob}]
    docs = ds.upload_documents(documents)
    ids = [doc.id for doc in docs]
    ds.async_parse_documents(ids)
    '''
    for n in range(100):
        all_completed = all(doc.progress == 1 for doc in docs)
        if all_completed:
            break
        sleep(1)
    else:
        raise Exception("Run time ERROR: Chunk document parsing did not complete in time.")
    '''
    doc = docs[0]
    doc.list_chunks()


def test_add_chunk_with_success(get_api_key_fixture):
    API_KEY = get_api_key_fixture
    rag = RAGFlow(API_KEY, HOST_ADDRESS)
    ds = rag.create_dataset(name="test_add_chunk_with_success")
    with open("test_data/ragflow_test.txt", "rb") as file:
        blob = file.read()
    '''
    # chunk_size = 1024 * 1024
    # chunks = [blob[i:i + chunk_size] for i in range(0, len(blob), chunk_size)]
    documents = [
        {'displayed_name': f'chunk_{i}.txt', 'blob': chunk} for i, chunk in enumerate(chunks)
    ]
    '''
    documents = [{"displayed_name": "test_add_chunk_with_success.txt", "blob": blob}]
    docs = ds.upload_documents(documents)
    doc = docs[0]
    doc.add_chunk(content="This is a chunk addition test")


def test_delete_chunk_with_success(get_api_key_fixture):
    API_KEY = get_api_key_fixture
    rag = RAGFlow(API_KEY, HOST_ADDRESS)
    ds = rag.create_dataset(name="test_delete_chunk_with_success")
    with open("test_data/ragflow_test.txt", "rb") as file:
        blob = file.read()
    '''
    # chunk_size = 1024 * 1024
    # chunks = [blob[i:i + chunk_size] for i in range(0, len(blob), chunk_size)]
    documents = [
        {'displayed_name': f'chunk_{i}.txt', 'blob': chunk} for i, chunk in enumerate(chunks)
    ]
    '''
    documents = [{"displayed_name": "test_delete_chunk_with_success.txt", "blob": blob}]
    docs = ds.upload_documents(documents)
    doc = docs[0]
    chunk = doc.add_chunk(content="This is a chunk addition test")
    sleep(5)
    doc.delete_chunks([chunk.id])


def test_update_chunk_content(get_api_key_fixture):
    API_KEY = get_api_key_fixture
    rag = RAGFlow(API_KEY, HOST_ADDRESS)
    ds = rag.create_dataset(name="test_update_chunk_content_with_success")
    with open("test_data/ragflow_test.txt", "rb") as file:
        blob = file.read()
    '''
    # chunk_size = 1024 * 1024
    # chunks = [blob[i:i + chunk_size] for i in range(0, len(blob), chunk_size)]
    documents = [
        {'displayed_name': f'chunk_{i}.txt', 'blob': chunk} for i, chunk in enumerate(chunks)
    ]
    '''
    documents = [{"displayed_name": "test_update_chunk_content_with_success.txt", "blob": blob}]
    docs = ds.upload_documents(documents)
    doc = docs[0]
    chunk = doc.add_chunk(content="This is a chunk addition test")
    # For Elasticsearch, the newly added chunk is not searchable for a short time (~2s).
    sleep(3)
    chunk.update({"content":"This is a updated content"})

def test_update_chunk_available(get_api_key_fixture):
    API_KEY = get_api_key_fixture
    rag = RAGFlow(API_KEY, HOST_ADDRESS)
    ds = rag.create_dataset(name="test_update_chunk_available_with_success")
    with open("test_data/ragflow_test.txt", "rb") as file:
        blob = file.read()
    '''
    # chunk_size = 1024 * 1024
    # chunks = [blob[i:i + chunk_size] for i in range(0, len(blob), chunk_size)]
    documents = [
        {'displayed_name': f'chunk_{i}.txt', 'blob': chunk} for i, chunk in enumerate(chunks)
    ]
    '''
    documents = [{"displayed_name": "test_update_chunk_available_with_success.txt", "blob": blob}]
    docs = ds.upload_documents(documents)
    doc = docs[0]
    chunk = doc.add_chunk(content="This is a chunk addition test")
    # For Elasticsearch, the newly added chunk is not searchable for a short time (~2s).
    sleep(3)
    chunk.update({"available":0})


def test_retrieve_chunks(get_api_key_fixture):
    API_KEY = get_api_key_fixture
    rag = RAGFlow(API_KEY, HOST_ADDRESS)
    ds = rag.create_dataset(name="retrieval")
    with open("test_data/ragflow_test.txt", "rb") as file:
        blob = file.read()
    '''
    # chunk_size = 1024 * 1024
    # chunks = [blob[i:i + chunk_size] for i in range(0, len(blob), chunk_size)]
    documents = [
        {'displayed_name': f'chunk_{i}.txt', 'blob': chunk} for i, chunk in enumerate(chunks)
    ]
    '''
    documents = [{"displayed_name": "test_retrieve_chunks.txt", "blob": blob}]
    docs = ds.upload_documents(documents)
    doc = docs[0]
    doc.add_chunk(content="This is a chunk addition test")
    rag.retrieve(dataset_ids=[ds.id], document_ids=[doc.id])
    rag.delete_datasets(ids=[ds.id])

# test different parameters for the retrieval
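# A minimal sketch of such a parameterized retrieval test. It assumes rag.retrieve
# accepts question, page, page_size, similarity_threshold, vector_similarity_weight,
# top_k, and keyword keyword arguments as documented for the RAGFlow Python SDK;
# adjust to the installed SDK version if the signature differs.
def test_retrieve_chunks_with_parameters(get_api_key_fixture):
    API_KEY = get_api_key_fixture
    rag = RAGFlow(API_KEY, HOST_ADDRESS)
    ds = rag.create_dataset(name="retrieval_with_parameters")
    with open("test_data/ragflow_test.txt", "rb") as file:
        blob = file.read()
    documents = [{"displayed_name": "test_retrieve_chunks_with_parameters.txt", "blob": blob}]
    docs = ds.upload_documents(documents)
    doc = docs[0]
    doc.add_chunk(content="This is a chunk addition test")
    # Exercise non-default retrieval parameters against the single uploaded document.
    rag.retrieve(
        question="chunk addition",
        dataset_ids=[ds.id],
        document_ids=[doc.id],
        page=1,
        page_size=10,
        similarity_threshold=0.1,
        vector_similarity_weight=0.5,
        top_k=128,
        keyword=True,
    )
    rag.delete_datasets(ids=[ds.id])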