from docx import Document import json import datetime import tempfile from pathlib import Path from unidecode import unidecode from langchain_community.document_loaders import JSONLoader, UnstructuredWordDocumentLoader, WebBaseLoader, AsyncHtmlLoader from langchain_community.document_transformers import Html2TextTransformer from langchain_text_splitters import RecursiveCharacterTextSplitter, RecursiveJsonSplitter from langchain_community.vectorstores import FAISS from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI import google.generativeai as genai from tqdm import tqdm from pathlib import Path import shutil import requests from bs4 import BeautifulSoup import os from langchain_docling import DoclingLoader#, ExportType from langchain_docling.loader import ExportType import logging from langchain.schema import Document import re import ast # logging.getLogger("langchain").setLevel(logging.ERROR) logging.getLogger().setLevel(logging.ERROR) import numpy as np print(np.__version__) # from file_loader import get_vectorstore key = os.environ["GOOGLE_API_KEY"] # import asyncio # from urllib.parse import urljoin # from playwright.async_api import async_playwright # from langchain_community.document_loaders import AsyncHtmlLoader # from langchain_community.document_transformers import Html2TextTransformer # from tqdm.asyncio import tqdm # async def _fetch_urls(base_url): # """Extract all links from a JavaScript-rendered webpage.""" # async with async_playwright() as p: # try: # browser = await p.chromium.launch(headless=True) # page = await browser.new_page() # await page.goto(base_url) # await page.wait_for_load_state("networkidle") # urls = set() # links = await page.locator("a").all() # for link in links: # href = await link.get_attribute("href") # if href and "#" not in href: # full_url = urljoin(base_url, href) # if full_url.startswith(base_url): # urls.add(full_url) # await browser.close() # except Exception as e: # print(f"⚠️ Không thể truy cập {base_url}: {e}") # return [] # Trả về danh sách rỗng nếu gặp lỗi # return list(urls) # async def _fetch_web_content(urls): # """Fetch HTML content and convert it to text, with a progress bar.""" # docs = [] # progress_bar = tqdm(total=len(urls), desc="Scraping Pages", unit="page") # for page_url in urls: # try: # loader = AsyncHtmlLoader(page_url) # html2text = Html2TextTransformer() # html = await loader.aload() # doc = html2text.transform_documents(html) # docs.extend(doc) # except Exception as e: # print(f"Error loading {page_url}: {e}") # progress_bar.update(1) # Update progress bar # progress_bar.close() # return docs # def scrape_website(base_urls): # """ # Scrapes a list of base URLs and extracts their content. # Includes a progress bar for tracking. # """ # async def _main(): # all_urls = [] # for base_url in base_urls: # urls = await _fetch_urls(base_url) # all_urls.extend(urls) # docs = await _fetch_web_content(all_urls) # return docs # return asyncio.run(_main) # class ChunkerWrapper: # def __init__(self, splitter): # self.splitter = splitter # def chunk(self, text): # # Use the 'split_text' method of the splitter to divide the text # return self.splitter.split_text(text) # def get_web_documents(base_urls=['https://nct.neu.edu.vn/']): # """Tải nội dung từ danh sách URL với thanh tiến trình""" # docs = [] # text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=100) # chunker = ChunkerWrapper(text_splitter) # for page_url in tqdm(base_urls, desc="Đang tải trang", unit="url"): # try: # # loader = WebBaseLoader(page_url) # loader = DoclingLoader(file_path=page_url,chunker=chunker # This will break your doc into manageable pieces. # ) # html = loader.load() # doc = html # docs.extend(doc) # except Exception as e: # print(f"Lỗi khi tải {page_url}: {e}") # print(f"Tải thành công {len(docs)} trang.") # return docs # def load_text_data(file_path): # """Tải nội dung văn bản từ file DOCX (đã loại bảng).""" # # cleaned_file = Document(file_path) #remove_tables_from_docx(file_path) # text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=100) # chunker = ChunkerWrapper(text_splitter) # return DoclingLoader(file_path=file_path, chunker=chunker # This will break your doc into manageable pieces. # ).load() def get_web_documents(base_urls=['https://nct.neu.edu.vn/']): """Fetch content from a list of URLs with a progress bar.""" docs = [] for page_url in tqdm(base_urls, desc="Loading page", unit="url"): try: loader = DoclingLoader( file_path=page_url, export_type=ExportType.MARKDOWN # Enable internal chunking ) doc = loader.load() docs.extend(doc) except Exception as e: print(f"Error loading {page_url}: {e}") print(f"Successfully loaded {len(docs)} documents.") return docs def load_text_data(file_path): """Load text content from a DOCX file (tables removed).""" text_splitter = RecursiveCharacterTextSplitter(chunk_size=5000, chunk_overlap=1000) loader = DoclingLoader( file_path=file_path, export_type=ExportType.MARKDOWN, # Enable internal chunking, chunker = text_splitter ) return loader.load() def log_message(messages, filename="chat_log.txt"): """Ghi lịch sử tin nhắn vào file log""" with open(filename, "a", encoding="utf-8") as f: log_entry = { "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "conversation": messages } f.write(json.dumps(log_entry, ensure_ascii=False) + "\n") def remove_tables_from_docx(file_path): """Tạo bản sao của file DOCX nhưng loại bỏ tất cả bảng bên trong.""" doc = Document(file_path) new_doc = Document() for para in doc.paragraphs: new_doc.add_paragraph(para.text) # 📌 Lưu vào file tạm, đảm bảo đóng đúng cách with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as temp_file: temp_path = temp_file.name new_doc.save(temp_path) return temp_path # ✅ Trả về đường dẫn file mới, không làm hỏng file gốc def extract_tables_from_docx(file_path): doc = Document(file_path) tables = [] all_paragraphs = [p.text.strip() for p in doc.paragraphs if p.text.strip()] # Lấy tất cả đoạn văn bản không rỗng table_index = 0 para_index = 0 table_positions = [] # Xác định vị trí của bảng trong tài liệu for element in doc.element.body: if element.tag.endswith("tbl"): table_positions.append((table_index, para_index)) table_index += 1 elif element.tag.endswith("p"): para_index += 1 for idx, (table_idx, para_idx) in enumerate(table_positions): data = [] for row in doc.tables[table_idx].rows: data.append([cell.text.strip() for cell in row.cells]) if len(data) > 1: # Chỉ lấy bảng có dữ liệu # Lấy 5 dòng trước và sau bảng related_start = max(0, para_idx - 5) related_end = min(len(all_paragraphs), para_idx + 5) related_text = all_paragraphs[related_start:related_end] tables.append({"table": idx + 1, "content": data, "related": related_text}) return tables def convert_to_json(tables): structured_data = {} for table in tables: headers = [unidecode(h) for h in table["content"][0]] # Bỏ dấu ở headers rows = [[unidecode(cell) for cell in row] for row in table["content"][1:]] # Bỏ dấu ở dữ liệu json_table = [dict(zip(headers, row)) for row in rows if len(row) == len(headers)] related_text = [unidecode(text) for text in table["related"]] # Bỏ dấu ở văn bản liên quan structured_data[table["table"]] = { "content": json_table, "related": related_text } return json.dumps(structured_data, indent=4, ensure_ascii=False) def save_json_to_file(json_data, output_path): with open(output_path, 'w', encoding='utf-8') as f: json.dump(json.loads(json_data), f, ensure_ascii=False, indent=4) # def load_json_with_langchain(json_path): # loader = JSONLoader(file_path=json_path, jq_schema='.. | .content?', text_content=False) # data = loader.load() # # # Kiểm tra xem dữ liệu có bị lỗi không # # print("Sample Data:", data[:2]) # In thử 2 dòng đầu # return data def load_json_manually(json_path): with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) return data def load_table_data(file_path, output_json_path): tables = extract_tables_from_docx(file_path) json_output = convert_to_json(tables) save_json_to_file(json_output, output_json_path) table_data = load_json_manually(output_json_path) return table_data def get_splits(file_path, output_json_path): # table_data = load_table_data(file_path, output_json_path) text_data = load_text_data(file_path) # Chia nhỏ văn bản # json_splitter = RecursiveJsonSplitter(max_chunk_size = 1000) # text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=250) # table_splits = json_splitter.create_documents(texts=[table_data]) # text_splits = text_splitter.split_documents(text_data) # all_splits = table_splits + text_splits DoclingLoader return text_data #text_splits def get_json_splits_only(file_path): table_data = load_json_manually(file_path) def remove_accents(obj): #xoa dau tieng viet if isinstance(obj, str): return unidecode(obj) elif isinstance(obj, list): return [remove_accents(item) for item in obj] elif isinstance(obj, dict): return {remove_accents(k): remove_accents(v) for k, v in obj.items()} return obj cleaned_data = remove_accents(table_data) wrapped_data = {"data": cleaned_data} if isinstance(cleaned_data, list) else cleaned_data json_splitter = RecursiveJsonSplitter(max_chunk_size = 2000) text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=100) table_splits = json_splitter.create_documents(texts=[wrapped_data]) table_splits = text_splitter.split_documents(table_splits) return table_splits def list_docx_files(folder_path): """List all DOCX and DOC files in the given folder (including subfolders).""" return [str(file) for file in Path(folder_path).rglob("*.docx")] + \ [str(file) for file in Path(folder_path).rglob("*.pdf")] def prompt_order0(queries): text = 'Câu hỏi: ' for q in queries: text += f'{str(q)}. ' return text def prompt_order(queries): text = 'IMPORTANT: Here is the questions of user in order which the current question is the last question, use that and the context above to know the best answer for the current question:\n' i = 0 for q in queries: i += 1 text += f'Question {i}: {str(q)}\n' return text def update_documents_metadata(documents, new_metadata): updated_documents = [] for doc in documents: # Preserve the original 'source' original_source = doc.metadata.get("source") # Update metadata with new key-value pairs doc.metadata.update(new_metadata) # Ensure the 'source' remains unchanged if original_source: doc.metadata["source"] = original_source updated_documents.append(doc) return updated_documents def extract_metadata(response): if not isinstance(response, str): response = str(response) # Chuyển sang string nếu cần # Tìm tất cả các dictionary trong chuỗi đầu vào matches = re.findall(r'\{.*?\}', response, re.DOTALL) if not matches: return None # Trả về None nếu không tìm thấy dict nào smallest_dict = None min_length = float("inf") for match in matches: try: parsed_dict = ast.literal_eval(match) # Chuyển đổi string thành dictionary if isinstance(parsed_dict, dict): dict_length = len(str(parsed_dict)) # Độ dài chuỗi của dict if dict_length < min_length: smallest_dict = parsed_dict min_length = dict_length except Exception: continue # Bỏ qua nếu không phải dictionary hợp lệ return smallest_dict # def update_metadata(metadata, metadata_child): # for key, new_value in metadata_child.items(): # if key in metadata: # # Nếu giá trị hiện tại không phải list, chuyển đổi thành list # if not isinstance(metadata[key], list): # metadata[key] = [metadata[key]] # # Nếu giá trị mới cũng là list thì thêm tất cả, ngược lại thêm từng phần tử # if isinstance(new_value, list): # metadata[key].extend(new_value) # else: # metadata[key].append(new_value) # else: # # Nếu key chưa có, tạo mới với giá trị được chuyển sang dạng list (nếu cần) # metadata[key] = new_value if isinstance(new_value, list) else [new_value] # return metadata def update_metadata(metadata, metadata_child): for key, new_value in metadata_child.items(): if key in metadata: # Nếu giá trị hiện tại không phải list, chuyển đổi thành list if not isinstance(metadata[key], list): metadata[key] = [metadata[key]] # Nếu giá trị mới là list, duyệt từng phần tử if isinstance(new_value, list): for item in new_value: if item not in metadata[key]: metadata[key].append(item) else: if new_value not in metadata[key]: metadata[key].append(new_value) else: # Nếu key chưa có, tạo mới với giá trị dưới dạng list metadata[key] = new_value if isinstance(new_value, list) else [new_value] return metadata def define_metadata(input_text): condition1 = 'Chương trình' condition2 = 'Đề án' condition3 = 'Đề cương' condition4 = ['Trí tuệ nhân tạo', 'Toán kinh tế', 'Thống kê kinh tế', 'Phân tích dữ liệu trong Kinh tế', 'Kỹ thuật phần mềm', 'Khoa học máy tính', 'Khoa học dữ liệu', 'Hệ thống thông tin quản lý', 'Hệ thống thông tin', 'Định phí bảo hiểm và Quản trị rủi ro', 'Công nghệ thông tin', 'An toàn thông tin'] result = {} # Xác định loại tài liệu if condition3 in input_text: result['Tai lieu ve'] = 'Đề cương' elif condition1 in input_text: result['Tai lieu ve'] = 'Chương trình đào tạo' elif condition2 in input_text: result['Tai lieu ve'] = 'Đề án' # Nếu tài liệu là "Đề án", thêm tất cả các ngành và khoa theo mapping dưới đây. if result.get('Tai lieu ve') == 'Đề án': all_nganh = [] all_khoa = [] for cond in condition4: # Xác định khoa dựa trên tên ngành if cond in ['An toàn thông tin', 'Công nghệ thông tin', 'Khoa học máy tính', 'Kỹ thuật phần mềm']: khoa = 'Công nghệ thông tin (FIT)' cond_value = cond elif cond in ['Toán kinh tế', 'Phân tích dữ liệu trong Kinh tế', 'Định phí bảo hiểm và Quản trị rủi ro']: khoa = 'Toán Kinh tế (MFE)' if cond == 'Toán kinh tế': cond_value = 'Toán kinh tế (TOKT)' elif cond == 'Phân tích dữ liệu trong Kinh tế': cond_value = 'Phân tích dữ liệu trong Kinh tế (DSEB)' elif cond == 'Định phí bảo hiểm và Quản trị rủi ro': cond_value = 'Định phí bảo hiểm và Quản trị rủi ro (Actuary)' elif cond in ['Khoa học dữ liệu', 'Trí tuệ nhân tạo']: khoa = 'Khoa học dữ liệu và Trí tuệ nhân tạo (FDA)' cond_value = cond elif cond == 'Thống kê kinh tế': khoa = 'Thống kê' cond_value = cond elif cond in ['Hệ thống thông tin', 'Hệ thống thông tin quản lý']: khoa = 'Hệ thống thông tin quản lý (MIS)' cond_value = cond else: khoa = None cond_value = cond all_nganh.append(cond_value) if khoa is not None: all_khoa.append(khoa) result['Nganh'] = all_nganh result['Khoa'] = all_khoa else: # Nếu không phải "Đề án", duyệt từng điều kiện trong condition4 dựa trên input_text for cond in condition4: if cond in input_text: if cond in ['An toàn thông tin', 'Công nghệ thông tin', 'Khoa học máy tính', 'Kỹ thuật phần mềm']: result['Khoa'] = 'Công nghệ thông tin (FIT)' result['Nganh'] = cond elif cond in ['Toán kinh tế', 'Phân tích dữ liệu trong Kinh tế', 'Định phí bảo hiểm và Quản trị rủi ro']: result['Khoa'] = 'Toán Kinh tế (MFE)' if cond == 'Toán kinh tế': result['Nganh'] = 'Toán kinh tế (TOKT)' elif cond == 'Phân tích dữ liệu trong Kinh tế': result['Nganh'] = 'Phân tích dữ liệu trong Kinh tế (DSEB)' elif cond == 'Định phí bảo hiểm và Quản trị rủi ro': result['Nganh'] = 'Định phí bảo hiểm và Quản trị rủi ro (Actuary)' elif cond in ['Khoa học dữ liệu', 'Trí tuệ nhân tạo']: result['Khoa'] = 'Khoa học dữ liệu và Trí tuệ nhân tạo (FDA)' result['Nganh'] = cond elif cond == 'Thống kê kinh tế': result['Khoa'] = 'Thống kê' result['Nganh'] = cond elif cond in ['Hệ thống thông tin', 'Hệ thống thông tin quản lý']: result['Khoa'] = 'Hệ thống thông tin quản lý (MIS)' result['Nganh'] = cond return result # #cond1 cond2 la str, con3 la list ten cac nganh # result = {} # if condition3 in input_text: # result['Tai lieu ve'] = 'Đề cương' # elif condition1 in input_text: # result['Tai lieu ve'] = 'Chương trình đào tạo' # elif condition2 in input_text: # result['Tai lieu ve'] = 'Đề án' # for cond in condition4: # if cond in input_text: # if cond in ['An toàn thông tin', 'Công nghệ thông tin', 'Khoa học máy tính', 'Kỹ thuật phần mềm']: # result['Khoa'] = 'Công nghệ thông tin (FIT)' # elif cond in ['Toán kinh tế', 'Phân tích dữ liệu trong Kinh tế', 'Định phí bảo hiểm và Quản trị rủi ro']: # result['Khoa'] = 'Toán Kinh tế (MFE)' # if cond == 'Toán kinh tế': # cond == 'Toán kinh tế (TOKT)' # elif cond == 'Phân tích dữ liệu trong Kinh tế': # cond == 'Phân tích dữ liệu trong Kinh tế (DSEB)' # elif cond == 'Định phí bảo hiểm và Quản trị rủi ro': # cond == 'Định phí bảo hiểm và Quản trị rủi ro (Actuary)' # elif cond in ['Khoa học dữ liệu', 'Trí tuệ nhân tạo']: # result['Khoa'] = 'Khoa học dữ liệu và Trí tuệ nhân tạo (FDA)' # elif cond == 'Thống kê kinh tế': # result['Khoa'] = 'Thống kê' # elif cond in ['Hệ thống thông tin', 'Hệ thống thông tin quản lý']: # result['Khoa'] = 'Hệ thống thông tin quản lý (MIS)' # result['Nganh'] = cond # return result