# app.py
import os
import uuid
import base64
import re
import threading
import time
from typing import List, Dict, Tuple
import logging
import tempfile
import shutil
import json
import asyncio
from openai import AsyncOpenAI
from readability import Document
import instructor
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
import pypandoc
import fitz  # PyMuPDF
from bs4 import BeautifulSoup, Comment
try:
    from pptx import Presentation
except ImportError:
    pass
try:
    import textract
except ImportError:
    pass
logging.basicConfig(level=logging.DEBUG)
app = FastAPI()
client = instructor.apatch(AsyncOpenAI())
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
JOBS_DIR = os.path.join(tempfile.gettempdir(), 'jobs')
if not os.path.exists(JOBS_DIR):
    os.makedirs(JOBS_DIR)
FORMAT_MAP = {
    '.odt': 'odt',
    '.pdf': 'pdf',
    '.docx': 'docx',
    '.html': 'html',
    '.htm': 'html',
    '.md': 'markdown',
    '.txt': 'markdown',
    '.rtf': 'rtf',
    '.epub': 'epub',
    '.xml': 'xml',
    '.org': 'org',
    '.commonmark': 'commonmark',
    '.cm': 'commonmark',
    '.wiki': 'mediawiki',
    '.opml': 'opml'
}
ALLOWED_EXTENSIONS_FOR_ACCESSIBILITY = list(FORMAT_MAP.keys()) + ['.doc', '.ppt', '.pptx']
def get_pandoc_format(extension: str) -> str:
    return FORMAT_MAP.get(extension, 'auto')
def update_job_status(job_id: str, status: str, message: str = '', result_file: str = None):
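    """Write the job's status.json with the given state, message, optional result file name and an 'updated_at' timestamp."""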
    job_dir = os.path.join(JOBS_DIR, job_id)
    status_file = os.path.join(job_dir, 'status.json')
    status_data = {
        'status': status,
        'message': message,
        'updated_at': time.time()
    }
    if result_file:
        status_data['result_file'] = result_file
    with open(status_file, 'w') as f:
        json.dump(status_data, f)
def get_job_status(job_id: str):
    job_dir = os.path.join(JOBS_DIR, job_id)
    status_file = os.path.join(job_dir, 'status.json')
    if not os.path.exists(status_file):
        return None
    with open(status_file, 'r') as f:
        status_data = json.load(f)
    return status_data
def process_file(job_id: str, input_file_path: str, ext: str, original_filename: str):
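    """Background task: convert the uploaded file to accessible HTML, write the result into the
    job directory and keep status.json up to date. Runs the async conversion in a dedicated event loop."""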
    job_dir = os.path.join(JOBS_DIR, job_id)
    try:
        update_job_status(job_id, 'processing', 'Le fichier est en cours de traitement')
        image_counter = [1]
        images_data = {}
        base_filename = os.path.splitext(original_filename)[0]
        output_filename = os.path.join(job_dir, f"{base_filename}.html")
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            final_html = loop.run_until_complete(
                convert_to_accessible_html(
                    input_file_path, ext, base_filename, image_counter, images_data
                )
            )
        finally:
            loop.close()
        if not final_html:
            update_job_status(job_id, 'error', 'Erreur lors de la conversion.')
            return
        with open(output_filename, 'w', encoding='utf-8') as f:
            f.write(final_html)
        update_job_status(
            job_id, 'completed', 'Traitement terminé', result_file=f"{base_filename}.html"
        )
        delete_files_after_delay([input_file_path], delay=6000)
    except Exception as e:
        logging.error(f"Erreur lors du traitement du job {job_id}: {str(e)}")
        update_job_status(job_id, 'error', f"Erreur: {str(e)}")
def delete_files_after_delay(file_paths: List[str], delay: int = 6000):
    def delayed_delete():
        time.sleep(delay)
        for file_path in file_paths:
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
                    logging.debug(f"Fichier temporaire supprimé après délai : {file_path}")
            except Exception as e:
                logging.error(f"Erreur lors de la suppression du fichier {file_path} : {str(e)}")
    thread = threading.Thread(target=delayed_delete)
    thread.start()
async def convert_to_accessible_html(input_filename, ext, base_filename, image_counter, images_data):
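    """Full conversion pipeline: turn the input file into HTML (per format), insert <!--PAGE_X-->
    markers, clean the markup, have the model rewrite it for accessibility while image descriptions
    are generated, then reinsert the images and inline the CSS. Returns None on failure."""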
    try:
        if ext == '.pdf':
            # PDF -> HTML with page markers
            html_content = pdf_to_html(input_filename)
            # The PDF path already emits one <!--PAGE_X--> per page, so no extra markers are needed
        elif ext in ['.ppt', '.pptx']:
            # PPT/PPTX -> plain text -> minimal HTML
            text = convert_ppt_to_text(input_filename)
            html_content = text_to_html(text)
            # Insert <!--PAGE_X--> markers every 20 paragraphs for this format
            html_content = insert_page_comments_every_20_paragraphs(html_content)
        elif ext == '.doc':
            # DOC -> plain text (textract) -> minimal HTML
            text = convert_doc_to_text(input_filename)
            html_content = text_to_html(text)
            html_content = insert_page_comments_every_20_paragraphs(html_content)
        elif ext in ['.html', '.htm']:
            with open(input_filename, 'r', encoding='utf-8') as f:
                html_content = f.read()
            try:
                doc = Document(html_content)
                html_content = doc.summary()
            except Exception as e:
                logging.error(f"Erreur lors du nettoyage HTML avec readability-lxml : {str(e)}")
            # Insert <!--PAGE_X--> markers every 20 paragraphs
            html_content = insert_page_comments_every_20_paragraphs(html_content)
        else:
            # Formats handled by Pandoc
            input_format = get_pandoc_format(ext)
            html_content = convert_with_pandoc(input_filename, input_format)
            # Insert <!--PAGE_X--> markers every 20 paragraphs
            html_content = insert_page_comments_every_20_paragraphs(html_content)
        # Cleaning
        cleaned_html = await clean_html_content(html_content, image_counter, images_data)
        # Accessible rewrite (runs concurrently with the image descriptions below)
        html_rewrite_task = asyncio.create_task(rewrite_html_accessible(cleaned_html))
        # Image processing (generate a description for each extracted image)
        for image_key in images_data:
            base64_image = images_data[image_key]['base64_image']
            description = await get_image_description(base64_image)
            images_data[image_key]['description'] = description
        await html_rewrite_task
        rewritten_html = html_rewrite_task.result()
        final_html = reinsert_images(rewritten_html, images_data)
        # Remove unwanted scripts
        final_soup = BeautifulSoup(final_html, 'html.parser')
        scripts_to_remove = final_soup.find_all('script', src=True)
        for script in scripts_to_remove:
            src = script['src']
            if src.startswith('https://bentham-converttohtml.hf.space/'):
                script.decompose()
        final_html = str(final_soup)
        # Drop lines containing only ```html or ```
        final_html = re.sub(r'^\s*```(?:html)?\s*$', '', final_html, flags=re.MULTILINE)
        # Inline the CSS
        final_html = insert_css_into_html(final_html)
        return final_html
    except Exception as e:
        logging.error(f"Erreur lors de la conversion : {str(e)}")
        return None
def insert_page_comments_every_20_paragraphs(html_content: str) -> str:
    # Insert a <!--PAGE_X--> comment before every 20th <p> tag
    soup = BeautifulSoup(html_content, 'html.parser')
    paragraphs = soup.find_all('p')
    page_number = 1
    for i, p in enumerate(paragraphs, start=1):
        if i % 20 == 1:  # Before the first <p> of each block of 20
            # Use a bs4 Comment so the marker is serialised as <!--PAGE_X--> rather than escaped text
            comment = Comment(f"PAGE_{page_number}")
            p.insert_before(comment)
            page_number += 1
    return str(soup)
def insert_css_into_html(html_content: str) -> str:
    css_code = """
    :root {
        --font-size-min: 1rem;
        --font-size-base: 1rem;
        --font-size-large: 2.5rem;
        --line-height: 1.5;
        --font-family: Arial, Calibri, Verdana, sans-serif;
        --text-color: #1a1a1a;
        --background-color: #fdfdfd;
        --link-color: #1a1a1a;
        --heading-color-primary: Navy;
        --heading-color-secondary: DarkGreen;
        --heading-color-tertiary: DarkRed;
        --heading-color-quaternary: DarkSlateGray;
        --heading-color-cinq: DarkSlateBlue;
        --heading-color-six: DarkViolet;
    }
    * {
        font-size: 1rem;
    }
    html {
        font-family: var(--font-family);
        font-size: var(--font-size-base);
        line-height: var(--line-height);
        color: var(--text-color);
        background-color: var(--background-color);
        font-size: clamp(var(--font-size-min), 2vw, 1.5rem);
    }
    body {
        margin: 20px auto;
        max-width: 36em;
        padding: 2rem;
        hyphens: auto;
        overflow-wrap: break-word;
        text-rendering: optimizeLegibility;
        font-kerning: normal;
        text-align: left;
    }
    h1 {margin-left: 0; color: var(--heading-color-primary);}
    h2 {margin-left: 1rem; color: var(--heading-color-secondary);}
    h3 {margin-left: 2rem; color: var(--heading-color-tertiary);}
    h4 {margin-left: 3rem; color: var(--heading-color-quaternary);}
    h5 {margin-left: 4rem; color: var(--heading-color-cinq);}
    h6 {margin-left: 5rem; color: var(--heading-color-six);}
    @media (max-width: 600px) {
        html {
            font-size: clamp(var(--font-size-min), 4vw, 1.5rem);
        }
        body {
            padding: 1rem;
        }
        h1 {font-size: clamp(1.5rem, 6vw, 2.5rem);}
        h2 {font-size: clamp(1.25rem, 5vw, 2rem);}
        h3 {font-size: clamp(1.125rem, 4.5vw, 1.75rem);}
        h4, h5, h6 {font-size: clamp(1rem, 4vw, 1.5rem);}
    }
    @media print {
        body {
            background-color: transparent;
            color: black;
            font-size: 12pt;
        }
        p, h2, h3 {
            orphans: 3;
            widows: 3;
        }
        h2, h3, h4 {
            page-break-after: avoid;
        }
    }
    p {margin: 1em 0; font-size: 1rem;}
    a {color: var(--link-color); text-decoration: none;}
    a:visited {color: var(--link-color);}
    a:hover, a:focus {text-decoration: underline;}
    img {max-width: 100%; height: auto;}
    table {
        margin: 1em 0;
        border-collapse: collapse;
        width: 100%;
        overflow-x: auto;
        display: block;
        font-variant-numeric: lining-nums tabular-nums;
    }
    table caption {margin-bottom: 0.75em;}
    th, td {border: 1px solid #000; padding: 0.5em; text-align: left;}
    tbody tr:nth-child(odd) {background-color: #f2f2f2;}
    tbody tr:nth-child(even) {background-color: #ffffff;}
    blockquote {
        margin: 1em 0 1em 1.7em;
        padding-left: 1em;
        border-left: 2px solid #e6e6e6;
        color: #606060;
    }
    code {
        font-family: Menlo, Monaco, 'Lucida Console', Consolas, monospace;
        font-size: 0.85rem;
        margin: 0;
        white-space: pre-wrap;
    }
    pre {
        margin: 1em 0;
        overflow: auto;
    }
    pre code {
        padding: 0;
        overflow: visible;
        overflow-wrap: normal;
    }
    .sourceCode {
        background-color: transparent;
        overflow: visible;
    }
    hr {
        background-color: #1a1a1a;
        border: none;
        height: 1px;
        margin: 1em 0;
    }
    span.smallcaps {font-variant: small-caps;}
    span.underline {text-decoration: underline;}
    div.column {display: inline-block; vertical-align: top; width: 50%;}
    .description {
        background-color: #f0f3ff;
        padding: 1em;
        border: 1px solid black;
    }
    div.hanging-indent {
        margin-left: 1.5em;
        text-indent: -1.5em;
    }
    ul.task-list {list-style: none;}
    .display.math {
        display: block;
        text-align: center;
        margin: 0.5rem auto;
    }
    """
    final_soup = BeautifulSoup(html_content, 'html.parser')
    style_tag = final_soup.new_tag('style')
    style_tag.string = css_code
    head_tag = final_soup.head
    if head_tag:
        head_tag.clear()
        head_tag.append(style_tag)
    else:
        head_tag = final_soup.new_tag('head')
        head_tag.append(style_tag)
        final_soup.insert(0, head_tag)
    final_html = str(final_soup)
    return final_html
def encode_image_from_data_uri(data_uri: str) -> str:
    try:
        header, encoded = data_uri.split(',', 1)
        encoded = ''.join(encoded.split())
        return encoded
    except Exception as e:
        logging.error(f"Erreur lors de l'encodage de l'image : {str(e)}")
        return ""
def markdown_to_html(markdown_text: str) -> str:
    html = markdown_text
    html = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', html)
    html = re.sub(r'\*(.*?)\*', r'<i>\1</i>', html)
    html = re.sub(r'__(.*?)__', r'<strong>\1</strong>', html)
    html = re.sub(r'_(.*?)_', r'<i>\1</i>', html)
    return html
async def get_image_description(base64_image: str) -> str:
    try:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": "Décris ce que l'on peut voir sur cette image, pour qu'un lecteur malvoyant puisse comprendre ce qu'elle représente.",
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}"
                            },
                        },
                    ],
                }
            ],
        )
        description = response.choices[0].message.content.strip()
        return description
    except Exception as e:
        logging.error(f"Erreur lors de l'appel à l'API OpenAI : {str(e)}")
        return "Description indisponible."
async def rewrite_html_accessible(html_content: str) -> str:
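    """Send the cleaned HTML to the model with accessibility rewriting instructions (French prompt):
    keep content and <!--PAGE_X--> order, use real heading tags, drop repeated headers/footers.
    Returns the original HTML if the API call fails."""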
    prompt = (
        "Je vais te donner un fichier HTML, et je voudrais que tu le réécrives pour permettre l'accessibilité à toutes les formes de handicap, tout en **préservant strictement l'ordre du contenu original**.\n"
        "Commence à analyser le plan du document. Il faut d'abord identifier les titres et comprendre leur logique :\n"
        "- A priori, les titres qui sont préfixés par une écriture romaine (I, II, III), "
        "par un nombre (1, 2, 3) ou par une lettre (a, b, c, ou bien A, B, C) doivent être de même niveau. "
        "Idem pour les titres rédigés en majuscules.\n"
        "- Quand une expression très courte qui ne ressemble pas syntaxiquement à une phrase est présentée sur une seule ligne, "
        "il y a des chances qu'il s'agisse d'un titre : dans ce cas (et si c'est pertinent) traite-la comme telle.\n"
        "- Au contraire, **une phrase longue ne doit JAMAIS être traitée comme un titre**, "
        "même quand elle est précédée par un numéro ou une lettre. "
        "De même, ne traite jamais comme un titre un ensemble de plusieurs phrases. Je répète : les balises <h1>, <h2>, etc., ne sont destinées qu'à encadrer des expressions relativement courtes, et rien d'autre.\n\n"
        "Tu ne dois **rien réorganiser**, **ne rien supprimer** et **ne rien ajouter** en termes de structure ou de contenu. "
        "Ton intervention doit se faire exclusivement sur la **forme** du document : le contenu doit être **intégralement préservé dans le même ordre**, jusqu'à la fin. "
        "Laisse la balise <head> vide.\n"
        "IMPORTANT : Tu dois **respecter scrupuleusement l'ordre indiqué par les commentaires HTML de la forme <!--PAGE_X-->,** s'ils existent. On doit avoir <!--PAGE_1--> [...] <!--PAGE_2--> [...] <!--PAGE_3--> [...], et ainsi de suite, dans l'ordre exact et sans en oublier un seul. C'est très important ! Ces marqueurs te permettent de t'assurer que la page est bien retranscrite dans le bon ordre. Ne déplace, ne supprime, et ne modifie pas ces commentaires.\n"
        "Attention, ce document est peut-être issu d'un PDF ou d'un DOCX. Il faut donc être attentif :\n"
        "- Aux balises <p> qui suivent immédiatement les marqueurs <!--PAGE_X--> : il peut s'agir de headers. Pour le savoir, il faut les comparer entre eux pour savoir s'ils sont à peu près similaires.\n"
        "- Aux balises <p> qui précèdent immédiatement les marqueurs <!--PAGE_X--> : il peut s'agir de footers. De même, il faut les comparer entre eux pour savoir s'ils sont à peu près similaires.\n"
        "Dans tous les cas, il faut supprimer les balises <p> correspondant aux headers et aux footers identifiés. Attention, ces suppressions ne doivent pas affecter les autres éléments.\n"
        "S'il y a des retours à la ligne injustifiés, il faut rétablir l'intégrité des phrases, et constituer de véritables paragraphes complets. L'ensemble du code doit être inclus entre des balises <html></html>.\n"
        "Tu donneras la totalité du HTML réécrit, et rien d'autre, ni avant ni après. "
        "Ne résume jamais les informations, ne réorganise pas le contenu et ne supprime aucune section.\n\n"
        "Voici tout d'abord les règles à suivre pour avoir un document accessible :\n\n"
        "1. Limiter l'italique et les soulignements.\n"
        "2. S'il y a des tableaux, insérer un tiret dans les cellules ne contenant pas d'information, et associer une légende aux tableaux.\n"
        "3. Pour les titres, utilise absolument les balises h1, h2, h3, h4, h5 et h6. Utilise la balise h1 pour le titre qui a le plus grand niveau.\n\n"
        "On évite les balises <ul> et <li>.\n"
        "Encore une fois, fais bien attention à reproduire fidèlement l'ordre des marqueurs <!--PAGE_X-->, dans l'ordre croissant des X : c'est ta tâche principale. Recompte régulièrement les X des PAGE_X pour être sûr qu'il n'en manque aucun.\n"
        "N'oublie pas qu'on ne doit avoir AUCUN header et AUCUN footer, c'est très important.\n"
        "Voici maintenant le fichier HTML d'origine :\n"
        + html_content
    )
    try:
        logging.debug("Contenu avant l'appel à l'API OpenAI :")
        logging.debug(html_content)
        response = await client.chat.completions.create(
            model="o1-mini",
            messages=[
                {"role": "user", "content": prompt}
            ],
        )
        rewritten_html = response.choices[0].message.content.strip()
        # Un-escape comment markers in case the model returned them entity-encoded
        rewritten_html = rewritten_html.replace("&lt;!--", "<!--").replace("--&gt;", "-->")
        logging.debug("Contenu après l'appel à l'API OpenAI :")
        logging.debug(rewritten_html)
        return rewritten_html
    except Exception as e:
        logging.error(f"Erreur lors de la réécriture du HTML : {str(e)}")
        return html_content
async def clean_html_content(html_content: str, image_counter: List[int], images_data: Dict[str, Dict[str, str]]) -> str:
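    """Remove inline styles, <header>/<footer> elements, digit-only divs, empty paragraphs and
    scripts loaded from the Space's own URL; unwrap <span> tags; replace embedded base64 images
    (ignored if there are more than 20) with <!--IMG_X--> placeholders recorded in images_data."""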
    soup = BeautifulSoup(html_content, 'html.parser')
    for tag in soup.find_all():
        if 'style' in tag.attrs:
            del tag['style']
    for element in soup.find_all(['header', 'footer']):
        element.decompose()
    for div in soup.find_all('div'):
        if div.get_text(strip=True).isdigit():
            div.decompose()
    for span in soup.find_all('span'):
        span.unwrap()
    img_tags = soup.find_all('img')
    if img_tags:
        if len(img_tags) > 20:
            logging.warning(f"Number of images ({len(img_tags)}) exceeds 20. Images will be ignored.")
            for img in img_tags:
                img.decompose()
        else:
            for img in img_tags:
                src = img.get('src', '')
                X = image_counter[0]
                if src.startswith('data:image/'):
                    base64_image = encode_image_from_data_uri(src)
                    if base64_image:
                        images_data[f"IMG_{X}"] = {
                            'base64_image': base64_image
                        }
                        placeholder = f"<!--IMG_{X}-->"
                        img.replace_with(BeautifulSoup(placeholder, 'html.parser'))
                        image_counter[0] += 1
                    else:
                        img.decompose()
                else:
                    img.decompose()
    else:
        logging.debug("No <img> tags found in the HTML content.")
        for img in soup.find_all('img'):
            img.decompose()
    scripts_to_remove = soup.find_all('script', src=True)
    for script in scripts_to_remove:
        src = script['src']
        if src.startswith('https://bentham-converttohtml.hf.space/'):
            script.decompose()
    for tag in soup.find_all('p'):
        if not tag.get_text(strip=True):
            tag.decompose()
    return str(soup)
def reinsert_images(html_content: str, images_data: Dict[str, Dict[str, str]]) -> str:
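    """Replace each <!--IMG_X--> placeholder with a <div> holding the original image and a
    'description' paragraph built from its generated description."""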
    soup = BeautifulSoup(html_content, 'html.parser')
    for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
        match = re.match(r'IMG_(\d+)', comment)
        if match:
            image_number = match.group(1)
            image_key = f"IMG_{image_number}"
            if image_key in images_data:
                img_tag = soup.new_tag('img')
                img_tag['src'] = f"data:image/jpeg;base64,{images_data[image_key]['base64_image']}"
                img_tag['alt'] = images_data[image_key]['description']
                new_content = soup.new_tag('div')
                new_content.append(img_tag)
                p_tag = soup.new_tag('p', attrs={'class': 'description'})
                strong_tag = soup.new_tag('strong')
                strong_tag.string = f"Image {image_number}"
                p_tag.append(strong_tag)
                p_tag.append(" : ")
                y_markdown = images_data[image_key]['description']
                y_html = markdown_to_html(y_markdown)
                y_soup = BeautifulSoup(y_html, 'html.parser')
                p_tag.append(y_soup)
                new_content.append(p_tag)
                comment.replace_with(new_content)
            else:
                logging.error(f"Données pour {image_key} non trouvées.")
    return str(soup)
def pdf_to_html(input_filename: str) -> str:
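    """Convert a PDF to HTML with PyMuPDF, prefixing each page's markup with a <!--PAGE_X--> comment."""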
    soup = BeautifulSoup("<html><head></head><body></body></html>", 'html.parser')
    body = soup.body
    page_number = 1
    with fitz.open(input_filename) as doc:
        for page in doc:
            page_comment = f"<!--PAGE_{page_number}-->"
            body.append(BeautifulSoup(page_comment, 'html.parser'))
            page_html = page.get_text("html")
            page_fragment = BeautifulSoup(page_html, 'html.parser')
            body.append(page_fragment)
            page_number += 1
    return str(soup)
def convert_with_pandoc(input_filename: str, input_format: str) -> str:
    try:
        output = pypandoc.convert_file(
            input_filename,
            'html',
            format=input_format,
            outputfile=None,
            extra_args=['--self-contained', '--strip-comments', '--quiet']
        )
        return output
    except RuntimeError as e:
        logging.error(f"Pandoc a rencontré une erreur : {str(e)}, tentative sans --self-contained.")
        output = pypandoc.convert_file(
            input_filename,
            'html',
            format=input_format,
            outputfile=None,
            extra_args=['--strip-comments', '--quiet']
        )
        return output
def text_to_html(text: str) -> str:
    lines = text.split('\n')
    html_lines = ['<p>' + line.strip() + '</p>' for line in lines if line.strip()]
    return "<html><head></head><body>" + "\n".join(html_lines) + "</body></html>"
def convert_ppt_to_text(input_filename: str) -> str:
    if 'Presentation' not in globals():
        raise HTTPException(status_code=500, detail="La librairie python-pptx n'est pas installée.")
    prs = Presentation(input_filename)
    text_content = []
    for slide in prs.slides:
        for shape in slide.shapes:
            if hasattr(shape, "text"):
                text_content.append(shape.text)
    return "\n".join(text_content)
def convert_doc_to_text(input_filename: str) -> str:
    if 'textract' not in globals():
        raise HTTPException(status_code=500, detail="La librairie textract n'est pas installée.")
    text = textract.process(input_filename).decode('utf-8', errors='replace')
    return text
def clean_html_file(input_filepath: str, cleaned_output_filepath: str) -> bool:
    try:
        with open(input_filepath, 'r', encoding='utf-8') as f:
            html_content = f.read()
        doc = Document(html_content)
        cleaned_html = doc.summary()
        with open(cleaned_output_filepath, 'w', encoding='utf-8') as f:
            f.write(cleaned_html)
        logging.debug("Contenu HTML nettoyé avec readability-lxml.")
        return True
    except Exception as e:
        logging.error(f"Erreur lors du nettoyage du fichier HTML {input_filepath} : {str(e)}")
        return False
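# NOTE: no route decorators appear in this listing. The decorator below is an assumption
# (the real path may differ); it is added only so the upload handler is reachable as a FastAPI endpoint.
@app.post("/convert")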
async def convert_file_to_html(
    file: UploadFile = File(...),
    background_tasks: BackgroundTasks = BackgroundTasks()
):
    try:
        job_id = str(uuid.uuid4())
        job_dir = os.path.join(JOBS_DIR, job_id)
        os.makedirs(job_dir)
        ext = os.path.splitext(file.filename)[1].lower()
        if ext not in ALLOWED_EXTENSIONS_FOR_ACCESSIBILITY:
            raise HTTPException(status_code=400, detail=f"Extension de fichier non supportée : {ext}")
        input_file_path = os.path.join(job_dir, f'input{ext}')
        with open(input_file_path, "wb") as f:
            shutil.copyfileobj(file.file, f)
        status = {
            'status': 'pending',
            'message': 'Traitement démarré',
            'created_at': time.time()
        }
        status_file = os.path.join(job_dir, 'status.json')
        with open(status_file, 'w') as f:
            json.dump(status, f)
        background_tasks.add_task(process_file, job_id, input_file_path, ext, file.filename)
        return JSONResponse(content={'job_id': job_id})
    except Exception as e:
        logging.error(f"Erreur lors du démarrage du job : {str(e)}")
        return JSONResponse(status_code=500, content={"message": f"Erreur lors du démarrage du job : {str(e)}"})
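# Assumed route (decorator missing from this listing; the real path may differ).
@app.get("/status/{job_id}")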
async def check_status(job_id: str):
    status_data = get_job_status(job_id)
    if status_data is None:
        return JSONResponse(status_code=404, content={"message": "Job non trouvé"})
    return JSONResponse(content=status_data)
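# Assumed route (decorator missing from this listing; the real path may differ).
@app.get("/result/{job_id}")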
async def get_result(job_id: str):
    job_dir = os.path.join(JOBS_DIR, job_id)
    status_data = get_job_status(job_id)
    if status_data is None:
        return JSONResponse(status_code=404, content={"message": "Job non trouvé"})
    if status_data.get('status') != 'completed':
        return JSONResponse(status_code=400, content={"message": "Résultat non prêt"})
    result_file = status_data.get('result_file')
    if not result_file:
        return JSONResponse(status_code=500, content={"message": "Fichier résultat non trouvé"})
    result_file_path = os.path.join(job_dir, result_file)
    if not os.path.exists(result_file_path):
        return JSONResponse(status_code=500, content={"message": "Fichier résultat non trouvé sur le serveur"})
    return FileResponse(result_file_path, filename=os.path.basename(result_file_path), media_type='text/html')
def delete_temp_files(file_paths: list):
    for file_path in file_paths:
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
                logging.debug(f"Fichier temporaire supprimé : {file_path}")
        except Exception as e:
            logging.error(f"Erreur lors de la suppression du fichier {file_path} : {str(e)}")
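# Assumed route (decorator missing from this listing; the real path may differ).
@app.post("/convert-to-txt")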
async def convert_file_to_txt(
    file: UploadFile = File(...),
    background_tasks: BackgroundTasks = BackgroundTasks()
):
    try:
        original_filename = file.filename
        base_filename, ext = os.path.splitext(original_filename)
        ext = ext.lower()
        allowed_extensions = [
            '.odt', '.pdf', '.docx', '.html', '.htm', '.md', '.txt', '.rtf', '.epub',
            '.tex', '.xml', '.org', '.commonmark', '.cm', '.wiki', '.opml',
            '.ppt', '.pptx', '.doc'
        ]
        if ext not in allowed_extensions:
            raise HTTPException(status_code=400, detail=f"Extension de fichier non supportée : {ext}")
        with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as input_tmp_file:
            input_filename = input_tmp_file.name
        with open(input_filename, "wb") as f:
            shutil.copyfileobj(file.file, f)
        logging.debug(f"Fichier téléchargé enregistré : {input_filename}")
        if ext in ['.html', '.htm']:
            cleaned_input_filename = input_filename + '_cleaned.html'
            nettoyage_reussi = clean_html_file(input_filename, cleaned_input_filename)
            if not nettoyage_reussi:
                raise HTTPException(status_code=500, detail="Erreur lors du nettoyage du fichier HTML.")
            input_filename = cleaned_input_filename
            logging.debug(f"Fichier HTML nettoyé enregistré : {input_filename}")
        unique_id = uuid.uuid4().hex
        output_filename = os.path.join(tempfile.gettempdir(), f"{base_filename}_{unique_id}.txt")
        if ext == '.pdf':
            text = ""
            with fitz.open(input_filename) as doc:
                for page in doc:
                    text += page.get_text()
            with open(output_filename, "w", encoding="utf-8") as f:
                f.write(text)
        elif ext == '.pptx':
            if 'Presentation' not in globals():
                raise HTTPException(status_code=500, detail="La librairie python-pptx n'est pas installée.")
            prs = Presentation(input_filename)
            text_content = []
            for slide in prs.slides:
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        text_content.append(shape.text)
            text = "\n".join(text_content)
            with open(output_filename, "w", encoding="utf-8") as f:
                f.write(text)
        elif ext == '.ppt':
            if 'textract' not in globals():
                raise HTTPException(status_code=500, detail="La librairie textract n'est pas installée.")
            text = textract.process(input_filename).decode('utf-8', errors='replace')
            with open(output_filename, "w", encoding="utf-8") as f:
                f.write(text)
        elif ext == '.doc':
            if 'textract' not in globals():
                raise HTTPException(status_code=500, detail="La librairie textract n'est pas installée.")
            text = textract.process(input_filename).decode('utf-8', errors='replace')
            with open(output_filename, "w", encoding="utf-8") as f:
                f.write(text)
        else:
            output = pypandoc.convert_file(input_filename, 'plain', outputfile=output_filename)
        if not os.path.exists(output_filename):
            logging.error(f"Le fichier {output_filename} n'a pas été généré.")
            raise HTTPException(status_code=500, detail="Erreur lors de la conversion.")
        temp_files_to_delete = [input_filename, output_filename]
        if ext in ['.html', '.htm']:
            temp_files_to_delete.append(cleaned_input_filename)
        background_tasks.add_task(delete_temp_files, temp_files_to_delete)
        return FileResponse(output_filename, filename=f"{base_filename}.txt")
    except HTTPException as http_exc:
        logging.error(f"Erreur HTTP lors de la conversion : {str(http_exc.detail)}")
        return JSONResponse(status_code=http_exc.status_code, content={"message": http_exc.detail})
    except Exception as e:
        logging.error(f"Erreur interne lors de la conversion : {str(e)}")
        return JSONResponse(status_code=500, content={"message": f"Erreur interne : {str(e)}"})