from api.settings import RetCode from test_sdkbase import TestSdk from ragflow import RAGFlow import pytest from common import API_KEY, HOST_ADDRESS class TestFile(TestSdk): """ This class contains a suite of tests for the content management functionality within the dataset. It ensures that the following functionalities as expected: 1. upload local files 2. upload remote files 3. download a file 4. delete a file 5. enable rename 6. list files 7. start parsing 8. end parsing 9. check the status of the file 10. list the chunks 11. delete a chunk 12. insert a new chunk 13. edit the status of chunk 14. get the specific chunk 15. retrieval test """ # ----------------------------upload local files----------------------------------------------------- def test_upload_two_files(self): """ Test uploading two files with success. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_upload_two_files") dataset_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt", "test_data/test1.txt"] res = ragflow.upload_local_file(dataset_id, file_paths) assert res["code"] == RetCode.SUCCESS and res["message"] == "success" def test_upload_one_file(self): """ Test uploading one file with success. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_upload_one_file") dataset_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt"] res = ragflow.upload_local_file(dataset_id, file_paths) assert res["code"] == RetCode.SUCCESS and res["message"] == "success" def test_upload_nonexistent_files(self): """ Test uploading a file which does not exist. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_upload_nonexistent_files") dataset_id = created_res["data"]["dataset_id"] file_paths = ["test_data/imagination.txt"] res = ragflow.upload_local_file(dataset_id, file_paths) assert res["code"] == RetCode.DATA_ERROR and "does not exist" in res["message"] def test_upload_file_if_dataset_does_not_exist(self): """ Test uploading files if the dataset id does not exist. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) file_paths = ["test_data/test.txt"] res = ragflow.upload_local_file("111", file_paths) assert res["code"] == RetCode.DATA_ERROR and res["message"] == "Can't find this dataset" def test_upload_file_without_name(self): """ Test uploading files that do not have name. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_upload_file_without_name") dataset_id = created_res["data"]["dataset_id"] file_paths = ["test_data/.txt"] res = ragflow.upload_local_file(dataset_id, file_paths) assert res["code"] == RetCode.SUCCESS def test_upload_file_without_name1(self): """ Test uploading files that do not have name. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_upload_file_without_name") dataset_id = created_res["data"]["dataset_id"] file_paths = ["test_data/.txt", "test_data/empty.txt"] res = ragflow.upload_local_file(dataset_id, file_paths) assert res["code"] == RetCode.SUCCESS def test_upload_files_exceeding_the_number_limit(self): """ Test uploading files whose number exceeds the limit. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_upload_files_exceeding_the_number_limit") dataset_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt", "test_data/test1.txt"] * 256 res = ragflow.upload_local_file(dataset_id, file_paths) assert (res["message"] == "You try to upload 512 files, which exceeds the maximum number of uploading files: 256" and res["code"] == RetCode.DATA_ERROR) def test_upload_files_without_files(self): """ Test uploading files without files. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_upload_files_without_files") dataset_id = created_res["data"]["dataset_id"] file_paths = [None] res = ragflow.upload_local_file(dataset_id, file_paths) assert (res["message"] == "None is not string." and res["code"] == RetCode.ARGUMENT_ERROR) def test_upload_files_with_two_files_with_same_name(self): """ Test uploading files with the same name. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_upload_files_with_two_files_with_same_name") dataset_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt"] * 2 res = ragflow.upload_local_file(dataset_id, file_paths) assert (res["message"] == "success" and res["code"] == RetCode.SUCCESS) def test_upload_files_with_file_paths(self): """ Test uploading files with only specifying the file path's repo. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_upload_files_with_file_paths") dataset_id = created_res["data"]["dataset_id"] file_paths = ["test_data/"] res = ragflow.upload_local_file(dataset_id, file_paths) assert (res["message"] == "The file test_data/ does not exist" and res["code"] == RetCode.DATA_ERROR) def test_upload_files_with_remote_file_path(self): """ Test uploading files with remote files. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_upload_files_with_remote_file_path") dataset_id = created_res["data"]["dataset_id"] file_paths = ["https://github.com/genostack/ragflow"] res = ragflow.upload_local_file(dataset_id, file_paths) assert res["code"] == RetCode.ARGUMENT_ERROR and res["message"] == "Remote files have not unsupported." # ----------------------------delete a file----------------------------------------------------- def test_delete_one_file(self): """ Test deleting one file with success. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_delete_one_file") dataset_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt"] res = ragflow.upload_local_file(dataset_id, file_paths) # get the doc_id data = res["data"][0] doc_id = data["id"] # delete the files deleted_res = ragflow.delete_files(doc_id, dataset_id) # assert value assert deleted_res["code"] == RetCode.SUCCESS and deleted_res["data"] is True def test_delete_document_with_not_existing_document(self): """ Test deleting a document that does not exist with failure. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_delete_document_with_not_existing_document") dataset_id = created_res["data"]["dataset_id"] res = ragflow.delete_files("111", dataset_id) assert res["code"] == RetCode.DATA_ERROR and res["message"] == "Document 111 not found!" def test_delete_document_with_creating_100_documents_and_deleting_100_documents(self): """ Test deleting documents when uploading 100 docs and deleting 100 docs. """ # upload 100 docs ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_delete_one_file") dataset_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt"] * 100 res = ragflow.upload_local_file(dataset_id, file_paths) # get the doc_id data = res["data"] for d in data: doc_id = d["id"] # delete the files deleted_res = ragflow.delete_files(doc_id, dataset_id) # assert value assert deleted_res["code"] == RetCode.SUCCESS and deleted_res["data"] is True def test_delete_document_from_nonexistent_dataset(self): """ Test deleting documents from a non-existent dataset """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_delete_one_file") dataset_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt"] res = ragflow.upload_local_file(dataset_id, file_paths) # get the doc_id data = res["data"][0] doc_id = data["id"] # delete the files deleted_res = ragflow.delete_files(doc_id, "000") # assert value assert (deleted_res["code"] == RetCode.ARGUMENT_ERROR and deleted_res["message"] == f"The document {doc_id} is not in the dataset: 000, but in the dataset: {dataset_id}.") def test_delete_document_which_is_located_in_other_dataset(self): """ Test deleting a document which is located in other dataset. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) # upload a document created_res = ragflow.create_dataset("test_delete_document_which_is_located_in_other_dataset") created_res_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt"] res = ragflow.upload_local_file(created_res_id, file_paths) # other dataset other_res = ragflow.create_dataset("other_dataset") other_dataset_id = other_res["data"]["dataset_id"] # get the doc_id data = res["data"][0] doc_id = data["id"] # delete the files from the other dataset deleted_res = ragflow.delete_files(doc_id, other_dataset_id) # assert value assert (deleted_res["code"] == RetCode.ARGUMENT_ERROR and deleted_res["message"] == f"The document {doc_id} is not in the dataset: {other_dataset_id}, but in the dataset: {created_res_id}.") # ----------------------------list files----------------------------------------------------- def test_list_documents_with_success(self): """ Test listing documents with a successful outcome. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) # upload a document created_res = ragflow.create_dataset("test_list_documents_with_success") created_res_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt"] ragflow.upload_local_file(created_res_id, file_paths) # Call the list_document method response = ragflow.list_files(created_res_id) assert response["code"] == RetCode.SUCCESS and len(response["data"]["docs"]) == 1 def test_list_documents_with_checking_size(self): """ Test listing documents and verify the size and names of the documents. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) # upload 10 documents created_res = ragflow.create_dataset("test_list_documents_with_checking_size") created_res_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt"] * 10 ragflow.upload_local_file(created_res_id, file_paths) # Call the list_document method response = ragflow.list_files(created_res_id) assert response["code"] == RetCode.SUCCESS and len(response["data"]["docs"]) == 10 def test_list_documents_with_getting_empty_result(self): """ Test listing documents that should be empty. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) # upload 0 documents created_res = ragflow.create_dataset("test_list_documents_with_getting_empty_result") created_res_id = created_res["data"]["dataset_id"] # Call the list_document method response = ragflow.list_files(created_res_id) assert response["code"] == RetCode.SUCCESS and len(response["data"]["docs"]) == 0 def test_list_documents_with_creating_100_documents(self): """ Test listing 100 documents and verify the size of these documents. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) # upload 100 documents created_res = ragflow.create_dataset("test_list_documents_with_creating_100_documents") created_res_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt"] * 100 ragflow.upload_local_file(created_res_id, file_paths) # Call the list_document method response = ragflow.list_files(created_res_id) assert response["code"] == RetCode.SUCCESS and len(response["data"]["docs"]) == 100 def test_list_document_with_failure(self): """ Test listing documents with IndexError. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_list_document_with_failure") created_res_id = created_res["data"]["dataset_id"] response = ragflow.list_files(created_res_id, offset=-1, count=-1) assert "IndexError" in response["message"] and response["code"] == RetCode.EXCEPTION_ERROR def test_list_document_with_verifying_offset_and_count(self): """ Test listing documents with verifying the functionalities of offset and count. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_list_document_with_verifying_offset_and_count") created_res_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt", "test_data/empty.txt"] * 10 ragflow.upload_local_file(created_res_id, file_paths) # Call the list_document method response = ragflow.list_files(created_res_id, offset=2, count=10) assert response["code"] == RetCode.SUCCESS and len(response["data"]["docs"]) == 10 def test_list_document_with_verifying_keywords(self): """ Test listing documents with verifying the functionality of searching keywords. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_list_document_with_verifying_keywords") created_res_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt", "test_data/empty.txt"] ragflow.upload_local_file(created_res_id, file_paths) # Call the list_document method response = ragflow.list_files(created_res_id, keywords="empty") assert response["code"] == RetCode.SUCCESS and len(response["data"]["docs"]) == 1 def test_list_document_with_verifying_order_by_and_descend(self): """ Test listing documents with verifying the functionality of order_by and descend. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_list_document_with_verifying_order_by_and_descend") created_res_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt", "test_data/empty.txt"] ragflow.upload_local_file(created_res_id, file_paths) # Call the list_document method response = ragflow.list_files(created_res_id) assert response["code"] == RetCode.SUCCESS and len(response["data"]["docs"]) == 2 docs = response["data"]["docs"] # reverse i = 1 for doc in docs: assert doc["name"] in file_paths[i] i -= 1 def test_list_document_with_verifying_order_by_and_ascend(self): """ Test listing documents with verifying the functionality of order_by and ascend. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_list_document_with_verifying_order_by_and_ascend") created_res_id = created_res["data"]["dataset_id"] file_paths = ["test_data/test.txt", "test_data/test1.txt", "test_data/empty.txt"] ragflow.upload_local_file(created_res_id, file_paths) # Call the list_document method response = ragflow.list_files(created_res_id, descend=False) assert response["code"] == RetCode.SUCCESS and len(response["data"]["docs"]) == 3 docs = response["data"]["docs"] i = 0 for doc in docs: assert doc["name"] in file_paths[i] i += 1 # ----------------------------update files: enable, rename, template_type------------------------------------------- def test_update_nonexistent_document(self): """ Test updating a document which does not exist. """ ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_update_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] params = { "name": "new_name" } res = ragflow.update_file(created_res_id, "weird_doc_id", **params) assert res["code"] == RetCode.ARGUMENT_ERROR and res["message"] == f"This document weird_doc_id cannot be found!" def test_update_document_without_parameters(self): """ Test updating a document without giving parameters. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_update_document_without_parameters") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # update file params = { } update_res = ragflow.update_file(created_res_id, doc_id, **params) assert (update_res["code"] == RetCode.DATA_ERROR and update_res["message"] == "Please input at least one parameter that you want to update!") def test_update_document_in_nonexistent_dataset(self): """ Test updating a document in the nonexistent dataset. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_update_document_in_nonexistent_dataset") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # update file params = { "name": "new_name" } update_res = ragflow.update_file("fake_dataset_id", doc_id, **params) assert (update_res["code"] == RetCode.DATA_ERROR and update_res["message"] == f"This dataset fake_dataset_id cannot be found!") def test_update_document_with_different_extension_name(self): """ Test the updating of a document with an extension name that differs from its original. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_update_document_with_different_extension_name") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # update file params = { "name": "new_name.doc" } update_res = ragflow.update_file(created_res_id, doc_id, **params) assert (update_res["code"] == RetCode.ARGUMENT_ERROR and update_res["message"] == "The extension of file cannot be changed") def test_update_document_with_duplicate_name(self): """ Test the updating of a document with a duplicate name. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_update_document_with_different_extension_name") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # update file params = { "name": "test.txt" } update_res = ragflow.update_file(created_res_id, doc_id, **params) assert (update_res["code"] == RetCode.ARGUMENT_ERROR and update_res["message"] == "Duplicated document name in the same dataset.") def test_update_document_with_updating_its_name_with_success(self): """ Test the updating of a document's name with success. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_update_document_with_updating_its_name_with_success") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # update file params = { "name": "new_name.txt" } update_res = ragflow.update_file(created_res_id, doc_id, **params) assert (update_res["code"] == RetCode.SUCCESS and update_res["message"] == "Success" and update_res["data"]["name"] == "new_name.txt") def test_update_document_with_updating_its_template_type_with_success(self): """ Test the updating of a document's template type with success. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_update_document_with_updating_its_template_type_with_success") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # update file params = { "template_type": "laws" } update_res = ragflow.update_file(created_res_id, doc_id, **params) assert (update_res["code"] == RetCode.SUCCESS and update_res["message"] == "Success" and update_res["data"]["parser_id"] == "laws") def test_update_document_with_updating_its_enable_value_with_success(self): """ Test the updating of a document's enable value with success. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_update_document_with_updating_its_enable_value_with_success") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # update file params = { "enable": "0" } update_res = ragflow.update_file(created_res_id, doc_id, **params) assert (update_res["code"] == RetCode.SUCCESS and update_res["message"] == "Success" and update_res["data"]["status"] == "0") def test_update_document_with_updating_illegal_parameter(self): """ Test the updating of a document's illegal parameter. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_update_document_with_updating_illegal_parameter") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # update file params = { "illegal_parameter": "0" } update_res = ragflow.update_file(created_res_id, doc_id, **params) assert (update_res["code"] == RetCode.ARGUMENT_ERROR and update_res["message"] == "illegal_parameter is an illegal parameter.") def test_update_document_with_giving_its_name_value(self): """ Test the updating of a document's name without its name value. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_update_document_with_updating_its_name_with_success") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # update file params = { "name": "" } update_res = ragflow.update_file(created_res_id, doc_id, **params) assert (update_res["code"] == RetCode.DATA_ERROR and update_res["message"] == "There is no new name.") def test_update_document_with_giving_illegal_value_for_enable(self): """ Test the updating of a document's with giving illegal enable's value. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_update_document_with_updating_its_name_with_success") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # update file params = { "enable": "?" } update_res = ragflow.update_file(created_res_id, doc_id, **params) assert (update_res["code"] == RetCode.DATA_ERROR and update_res["message"] == "Illegal value ? for 'enable' field.") def test_update_document_with_giving_illegal_value_for_type(self): """ Test the updating of a document's with giving illegal type's value. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_update_document_with_updating_its_name_with_success") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # update file params = { "template_type": "?" } update_res = ragflow.update_file(created_res_id, doc_id, **params) assert (update_res["code"] == RetCode.DATA_ERROR and update_res["message"] == "Illegal value ? for 'template_type' field.") # ----------------------------download a file----------------------------------------------------- def test_download_nonexistent_document(self): """ Test downloading a document which does not exist. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_download_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] res = ragflow.download_file(created_res_id, "imagination") assert res["code"] == RetCode.ARGUMENT_ERROR and res["message"] == f"This document 'imagination' cannot be found!" def test_download_document_in_nonexistent_dataset(self): """ Test downloading a document whose dataset is nonexistent. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_download_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # download file res = ragflow.download_file("imagination", doc_id) assert res["code"] == RetCode.DATA_ERROR and res["message"] == f"This dataset 'imagination' cannot be found!" def test_download_document_with_success(self): """ Test the downloading of a document with success. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_download_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # download file with open("test_data/test.txt", "rb") as file: binary_data = file.read() res = ragflow.download_file(created_res_id, doc_id) assert res["code"] == RetCode.SUCCESS and res["data"] == binary_data def test_download_an_empty_document(self): """ Test the downloading of an empty document. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_download_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/empty.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # download file res = ragflow.download_file(created_res_id, doc_id) assert res["code"] == RetCode.DATA_ERROR and res["message"] == "This file is empty." # ----------------------------start parsing----------------------------------------------------- def test_start_parsing_document_with_success(self): """ Test the parsing of a document with success. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_start_parsing_document_with_success") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/lol.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # parse file res = ragflow.start_parsing_document(created_res_id, doc_id) assert res["code"] == RetCode.SUCCESS and res["message"] == "" def test_start_parsing_nonexistent_document(self): """ Test the parsing a document which does not exist. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_start_parsing_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] res = ragflow.start_parsing_document(created_res_id, "imagination") assert res["code"] == RetCode.ARGUMENT_ERROR and res["message"] == "This document 'imagination' cannot be found!" def test_start_parsing_document_in_nonexistent_dataset(self): """ Test the parsing a document whose dataset is nonexistent. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_download_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # parse res = ragflow.start_parsing_document("imagination", doc_id) assert res["code"] == RetCode.DATA_ERROR and res["message"] == "This dataset 'imagination' cannot be found!" def test_start_parsing_an_empty_document(self): """ Test the parsing of an empty document. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_download_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/empty.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] res = ragflow.start_parsing_document(created_res_id, doc_id) assert res["code"] == RetCode.SUCCESS and res["message"] == "Empty data in the document: empty.txt; " # ------------------------parsing multiple documents---------------------------- def test_start_parsing_documents_in_nonexistent_dataset(self): """ Test the parsing documents whose dataset is nonexistent. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_download_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # parse res = ragflow.start_parsing_documents("imagination") assert res["code"] == RetCode.DATA_ERROR and res["message"] == "This dataset 'imagination' cannot be found!" def test_start_parsing_multiple_documents(self): """ Test the parsing documents with a success. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset(" test_start_parsing_multiple_documents") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] ragflow.upload_local_file(created_res_id, file_paths) res = ragflow.start_parsing_documents(created_res_id) assert res["code"] == RetCode.SUCCESS and res["data"] is True and res["message"] == "" def test_start_parsing_multiple_documents_with_one_empty_file(self): """ Test the parsing documents, one of which is empty. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset(" test_start_parsing_multiple_documents") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt", "test_data/empty.txt"] ragflow.upload_local_file(created_res_id, file_paths) res = ragflow.start_parsing_documents(created_res_id) assert res["code"] == RetCode.SUCCESS and res["message"] == "Empty data in the document: empty.txt; " def test_start_parsing_multiple_specific_documents(self): """ Test the parsing documents whose document ids are specified. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset(" test_start_parsing_multiple_documents") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"] doc_ids = [] for d in data: doc_ids.append(d["id"]) res = ragflow.start_parsing_documents(created_res_id, doc_ids) assert res["code"] == RetCode.SUCCESS and res["message"] == "" def test_start_re_parsing_multiple_specific_documents(self): """ Test the re-parsing documents. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset(" test_start_parsing_multiple_documents") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"] doc_ids = [] for d in data: doc_ids.append(d["id"]) res = ragflow.start_parsing_documents(created_res_id, doc_ids) assert res["code"] == RetCode.SUCCESS and res["message"] == "" # re-parse res = ragflow.start_parsing_documents(created_res_id, doc_ids) assert res["code"] == RetCode.SUCCESS and res["message"] == "" def test_start_re_parsing_multiple_specific_documents_with_changing_parser_id(self): """ Test the re-parsing documents after changing the parser id. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset(" test_start_parsing_multiple_documents") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"] doc_ids = [] for d in data: doc_ids.append(d["id"]) res = ragflow.start_parsing_documents(created_res_id, doc_ids) assert res["code"] == RetCode.SUCCESS and res["message"] == "" # general -> laws params = { "template_type": "laws" } ragflow.update_file(created_res_id, doc_ids[0], **params) # re-parse res = ragflow.start_parsing_documents(created_res_id, doc_ids) assert res["code"] == RetCode.SUCCESS and res["message"] == "" def test_start_re_parsing_multiple_specific_documents_with_changing_illegal_parser_id(self): """ Test the re-parsing documents after changing an illegal parser id. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset(" test_start_parsing_multiple_documents") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"] doc_ids = [] for d in data: doc_ids.append(d["id"]) res = ragflow.start_parsing_documents(created_res_id, doc_ids) assert res["code"] == RetCode.SUCCESS and res["message"] == "" # general -> illegal params = { "template_type": "illegal" } res = ragflow.update_file(created_res_id, doc_ids[0], **params) assert res["code"] == RetCode.DATA_ERROR and res["message"] == "Illegal value illegal for 'template_type' field." # re-parse res = ragflow.start_parsing_documents(created_res_id, doc_ids) assert res["code"] == RetCode.SUCCESS and res["message"] == "" def test_start_parsing_multiple_specific_documents_with_changing_illegal_parser_id(self): """ Test the parsing documents after changing an illegal parser id. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset(" test_start_parsing_multiple_documents") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"] doc_ids = [] for d in data: doc_ids.append(d["id"]) # general -> illegal params = { "template_type": "illegal" } res = ragflow.update_file(created_res_id, doc_ids[0], **params) assert res["code"] == RetCode.DATA_ERROR and res["message"] == "Illegal value illegal for 'template_type' field." # re-parse res = ragflow.start_parsing_documents(created_res_id, doc_ids) assert res["code"] == RetCode.SUCCESS and res["message"] == "" def test_start_parsing_multiple_documents_in_the_dataset_whose_parser_id_is_illegal(self): """ Test the parsing documents whose dataset's parser id is illegal. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_start_parsing_multiple_documents_in_the_dataset_whose_parser_id_is_illegal") created_res_id = created_res["data"]["dataset_id"] # update the parser id params = { "chunk_method": "illegal" } res = ragflow.update_dataset("test_start_parsing_multiple_documents_in_the_dataset_whose_parser_id_is_illegal", **params) assert res["code"] == RetCode.DATA_ERROR and res["message"] == "Illegal value illegal for 'chunk_method' field." # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"] doc_ids = [] for d in data: doc_ids.append(d["id"]) # parse res = ragflow.start_parsing_documents(created_res_id, doc_ids) assert res["code"] == RetCode.SUCCESS and res["message"] == "" # ----------------------------stop parsing----------------------------------------------------- def test_stop_parsing_document_with_success(self): """ Test the stopping parsing of a document with success. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_start_parsing_document_with_success") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/lol.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # parse file res = ragflow.start_parsing_document(created_res_id, doc_id) assert res["code"] == RetCode.SUCCESS and res["message"] == "" res = ragflow.stop_parsing_document(created_res_id, doc_id) assert res["code"] == RetCode.SUCCESS and res["message"] == "" def test_stop_parsing_nonexistent_document(self): """ Test the stopping parsing a document which does not exist. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_start_parsing_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] res = ragflow.stop_parsing_document(created_res_id, "imagination.txt") assert res["code"] == RetCode.ARGUMENT_ERROR and res["message"] == "This document 'imagination.txt' cannot be found!" def test_stop_parsing_document_in_nonexistent_dataset(self): """ Test the stopping parsing a document whose dataset is nonexistent. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_download_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # parse res = ragflow.stop_parsing_document("imagination", doc_id) assert res["code"] == RetCode.DATA_ERROR and res["message"] == "This dataset 'imagination' cannot be found!" # ------------------------stop parsing multiple documents---------------------------- def test_stop_parsing_documents_in_nonexistent_dataset(self): """ Test the stopping parsing documents whose dataset is nonexistent. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_download_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # parse res = ragflow.stop_parsing_documents("imagination") assert res["code"] == RetCode.DATA_ERROR and res["message"] == "This dataset 'imagination' cannot be found!" def test_stop_parsing_multiple_documents(self): """ Test the stopping parsing documents with a success. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_start_parsing_multiple_documents") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] ragflow.upload_local_file(created_res_id, file_paths) res = ragflow.start_parsing_documents(created_res_id) assert res["code"] == RetCode.SUCCESS and res["data"] is True and res["message"] == "" res = ragflow.stop_parsing_documents(created_res_id) assert res["code"] == RetCode.SUCCESS and res["data"] is True and res["message"] == "" def test_stop_parsing_multiple_documents_with_one_empty_file(self): """ Test the stopping parsing documents, one of which is empty. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset(" test_start_parsing_multiple_documents") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt", "test_data/empty.txt"] ragflow.upload_local_file(created_res_id, file_paths) res = ragflow.start_parsing_documents(created_res_id) assert res["code"] == RetCode.SUCCESS and res["message"] == "Empty data in the document: empty.txt; " res = ragflow.stop_parsing_documents(created_res_id) assert res["code"] == RetCode.SUCCESS and res["data"] is True and res["message"] == "" def test_stop_parsing_multiple_specific_documents(self): """ Test the stopping parsing documents whose document ids are specified. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset(" test_start_parsing_multiple_documents") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt", "test_data/test1.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"] doc_ids = [] for d in data: doc_ids.append(d["id"]) res = ragflow.start_parsing_documents(created_res_id, doc_ids) assert res["code"] == RetCode.SUCCESS and res["message"] == "" res = ragflow.stop_parsing_documents(created_res_id, doc_ids) assert res["code"] == RetCode.SUCCESS and res["data"] is True and res["message"] == "" # ----------------------------show the status of the file----------------------------------------------------- def test_show_status_with_success(self): # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_show_status_with_success") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/lol.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # parse file res = ragflow.start_parsing_document(created_res_id, doc_id) assert res["code"] == RetCode.SUCCESS and res["message"] == "" # show status status_res = ragflow.show_parsing_status(created_res_id, doc_id) assert status_res["code"] == RetCode.SUCCESS and status_res["data"]["status"] == "RUNNING" def test_show_status_nonexistent_document(self): """ Test showing the status of a document which does not exist. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_show_status_nonexistent_document") created_res_id = created_res["data"]["dataset_id"] res = ragflow.show_parsing_status(created_res_id, "imagination") assert res["code"] == RetCode.DATA_ERROR and res["message"] == "This document: 'imagination' is not a valid document." def test_show_status_document_in_nonexistent_dataset(self): """ Test showing the status of a document whose dataset is nonexistent. """ # create a dataset ragflow = RAGFlow(API_KEY, HOST_ADDRESS) created_res = ragflow.create_dataset("test_show_status_document_in_nonexistent_dataset") created_res_id = created_res["data"]["dataset_id"] # upload files file_paths = ["test_data/test.txt"] uploading_res = ragflow.upload_local_file(created_res_id, file_paths) # get the doc_id data = uploading_res["data"][0] doc_id = data["id"] # parse res = ragflow.show_parsing_status("imagination", doc_id) assert res["code"] == RetCode.DATA_ERROR and res["message"] == "This dataset: 'imagination' cannot be found!" # ----------------------------list the chunks of the file----------------------------------------------------- # ----------------------------delete the chunk----------------------------------------------------- # ----------------------------edit the status of the chunk----------------------------------------------------- # ----------------------------insert a new chunk----------------------------------------------------- # ----------------------------get a specific chunk----------------------------------------------------- # ----------------------------retrieval test-----------------------------------------------------