from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
import litellm
import pandas as pd
from pydantic import BaseModel, Field
from typing import Any, List, Dict, Optional
import re
import subprocess
import requests
import os
from lxml import etree
import zipfile
import io
import warnings
warnings.filterwarnings("ignore")
from bs4 import BeautifulSoup

app = FastAPI(title="Requirements Extractor")
app.add_middleware(CORSMiddleware, allow_credentials=True, allow_headers=["*"], allow_methods=["*"], allow_origins=["*"])

class MeetingsRequest(BaseModel):
    working_group: str

class MeetingsResponse(BaseModel):
    meetings: Dict[str, str]

class DataRequest(BaseModel):
    working_group: str
    meeting: str

class DataResponse(BaseModel):
    data: List[Dict[Any, Any]]

class DocRequirements(BaseModel):
    doc_id: str
    context: str
    requirements: List[str]

class DocInfo(BaseModel):
    document: str
    url: str

class RequirementsRequest(BaseModel):
    documents: List[DocInfo]

class RequirementsResponse(BaseModel):
    requirements: List[DocRequirements]

NSMAP = {
    'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
    'v': 'urn:schemas-microsoft-com:vml'
}

def get_docx_archive(url: str) -> zipfile.ZipFile:
    """Fetches the docx from the URL and returns it as a ZipFile object."""
    if not url.endswith("zip"):
        raise ValueError("URL must point to a ZIP file")
    resp = requests.get(url, verify=False, headers={
        "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    })
    resp.raise_for_status()
    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for file_name in zf.namelist():
            if file_name.endswith((".docx", ".doc")):
                docx_bytes = zf.read(file_name)
                return zipfile.ZipFile(io.BytesIO(docx_bytes))
    raise ValueError("No docx/doc file found in the archive")

def parse_document_xml(docx_zip: zipfile.ZipFile) -> etree._Element:
    """Parses the main document.xml."""
    xml_bytes = docx_zip.read('word/document.xml')
    parser = etree.XMLParser(remove_blank_text=True)
    return etree.fromstring(xml_bytes, parser=parser)

def clean_document_xml(root: etree._Element) -> None:
    """Cleans the XML by modifying the tree in place."""
    # Remove <w:del> elements (tracked deletions) together with their content
    for del_elem in root.xpath('//w:del', namespaces=NSMAP):
        parent = del_elem.getparent()
        if parent is not None:
            parent.remove(del_elem)
    # Unwrap <w:ins> elements (tracked insertions), keeping their children.
    # Materialize the children first: inserting them into the parent moves them
    # out of ins_elem, which would otherwise break a lazy iteration.
    for ins_elem in root.xpath('//w:ins', namespaces=NSMAP):
        parent = ins_elem.getparent()
        index = parent.index(ins_elem)
        for child in list(ins_elem):
            parent.insert(index, child)
            index += 1
        parent.remove(ins_elem)
    # Strip comment markers and references
    for tag in ['w:commentRangeStart', 'w:commentRangeEnd', 'w:commentReference']:
        for elem in root.xpath(f'//{tag}', namespaces=NSMAP):
            parent = elem.getparent()
            if parent is not None:
                parent.remove(elem)

def create_modified_docx(original_zip: zipfile.ZipFile, modified_root: etree._Element) -> bytes:
    """Creates a new docx containing the modified XML."""
    output = io.BytesIO()
    with zipfile.ZipFile(output, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
        # Copy every file except the one we are replacing
        for file in original_zip.infolist():
            if file.filename != 'word/document.xml':
                new_zip.writestr(file, original_zip.read(file.filename))
        # Add the modified document.xml
        xml_str = etree.tostring(
            modified_root,
            xml_declaration=True,
            encoding='UTF-8',
            pretty_print=True
        )
        new_zip.writestr('word/document.xml', xml_str)
    output.seek(0)
    return output.getvalue()

def docx_to_txt(doc_id: str, url: str):
    """Downloads the docx, strips tracked changes, and converts it to plain text via LibreOffice."""
    docx_zip = get_docx_archive(url)
    root = parse_document_xml(docx_zip)
    clean_document_xml(root)
    modified_bytes = create_modified_docx(docx_zip, root)
    input_path = f"/tmp/{doc_id}_cleaned.docx"
    output_path = f"/tmp/{doc_id}_cleaned.txt"
    with open(input_path, "wb") as f:
        f.write(modified_bytes)
    subprocess.run([
        "libreoffice",
        "--headless",
        "--convert-to", "txt",
        "--outdir", "/tmp",
        input_path
    ], check=True)
    with open(output_path, "r", encoding="utf-8") as f:
        txt_data = [line.strip() for line in f if line.strip()]
    os.remove(input_path)
    os.remove(output_path)
    return txt_data

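# The FastAPI route decorators appear to have been dropped during extraction; the
# paths and response models below are assumptions, added only so the handlers are
# actually registered on the app.
@app.get("/")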
def render_page():
    return FileResponse("index.html")

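# Assumed route path and response model (original decorator lost in extraction).
@app.post("/meetings", response_model=MeetingsResponse)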
def get_meetings(req: MeetingsRequest):
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    meeting_folders = []
    all_meetings = []
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = None
    for folder in wg_folders:
        if str(wg_number) in folder:
            selected_folder = folder
            break
    if selected_folder:
        # Only descend into the working-group folder when a match was found
        url += "/" + selected_folder
        resp = requests.get(url, verify=False)
        soup = BeautifulSoup(resp.text, "html.parser")
        meeting_folders = [item.get_text() for item in soup.select("tr td a") if item.get_text().startswith("TSG")]
        all_meetings = [working_group + "#" + meeting.split("_", 1)[1].replace("_", " ").replace("-", " ") for meeting in meeting_folders]
    return MeetingsResponse(meetings=dict(zip(all_meetings, meeting_folders)))

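# Assumed route path and response model (original decorator lost in extraction).
@app.post("/data", response_model=DataResponse)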
def get_change_request_dataframe(req: DataRequest):
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = None
    for folder in wg_folders:
        if str(wg_number) in folder:
            selected_folder = folder
            break
    url += "/" + selected_folder + "/" + req.meeting + "/docs"
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    files = [item.get_text() for item in soup.select("tr td a") if item.get_text().endswith(".xlsx")]

    def gen_url(tdoc: str):
        return f"{url}/{tdoc}.zip"

    df = pd.read_excel(str(url + "/" + files[0]).replace("#", "%23"))
    # Keep category B/C change requests and pCRs that have actually been uploaded
    filtered_df = df[
        (((df["Type"] == "CR") & ((df["CR category"] == "B") | (df["CR category"] == "C"))) | (df["Type"] == "pCR"))
        & ~(df["Uploaded"].isna())
    ][["TDoc", "Title", "CR category", "Source", "Type", "Agenda item", "Agenda item description", "TDoc Status"]]
    filtered_df["URL"] = filtered_df["TDoc"].apply(gen_url)
    df = filtered_df.fillna("")
    return DataResponse(data=df[["TDoc", "Title", "Type", "TDoc Status", "Agenda item description", "URL"]].to_dict(orient="records"))

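# Assumed route path and response model (original decorator lost in extraction).
@app.post("/requirements", response_model=RequirementsResponse)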
def gen_reqs(req: RequirementsRequest):
    documents = req.documents
    output = []
    for doc in documents:
        doc_id = doc.document
        url = doc.url
        full = "\n".join(docx_to_txt(doc_id, url))
        resp_ai = litellm.completion(
            model="gemini/gemini-2.0-flash",
            api_key=os.environ.get("GEMINI"),
            messages=[{"role": "user", "content": f"Here's the document whose ID is {doc_id} with requirements: {full}\n\nExtract all the requirements and give a context (a full sentence, not just a section reference) that calls for those requirements. If several distinct contexts are covered, produce as many per-context requirement lists as needed."}],
            response_format=DocRequirements
        )
        reqs = DocRequirements.model_validate_json(resp_ai.choices[0].message.content)
        output.append(reqs)
    return RequirementsResponse(requirements=output)
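
# A minimal sketch of how this app might be launched locally, assuming the module is
# saved as app.py, the GEMINI environment variable holds a valid API key, and
# LibreOffice is installed for the docx-to-txt conversion:
#   uvicorn app:app --host 0.0.0.0 --port 7860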