accessibility / main.py
Bentham's picture
Update main.py
5896966 verified
# app.py
import os
import uuid
import base64
import re
import threading
import time
from typing import List, Dict, Tuple
import logging
import tempfile
import shutil
import json
import asyncio
from openai import AsyncOpenAI
from readability import Document
import instructor
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
import pypandoc
import fitz # PyMuPDF
from bs4 import BeautifulSoup, Comment
try:
from pptx import Presentation
except ImportError:
pass
try:
import textract
except ImportError:
pass
logging.basicConfig(level=logging.DEBUG)
app = FastAPI()
client = instructor.apatch(AsyncOpenAI())
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
JOBS_DIR = os.path.join(tempfile.gettempdir(), 'jobs')
if not os.path.exists(JOBS_DIR):
os.makedirs(JOBS_DIR)
FORMAT_MAP = {
'.odt': 'odt',
'.pdf': 'pdf',
'.docx': 'docx',
'.html': 'html',
'.htm': 'html',
'.md': 'markdown',
'.txt': 'markdown',
'.rtf': 'rtf',
'.epub': 'epub',
'.xml': 'xml',
'.org': 'org',
'.commonmark': 'commonmark',
'.cm': 'commonmark',
'.wiki': 'mediawiki',
'.opml': 'opml'
}
ALLOWED_EXTENSIONS_FOR_ACCESSIBILITY = list(FORMAT_MAP.keys()) + ['.doc', '.ppt', '.pptx']
def get_pandoc_format(extension: str) -> str:
return FORMAT_MAP.get(extension, 'auto')
def update_job_status(job_id: str, status: str, message: str = '', result_file: str = None):
job_dir = os.path.join(JOBS_DIR, job_id)
status_file = os.path.join(job_dir, 'status.json')
status_data = {
'status': status,
'message': message,
'updated_at': time.time()
}
if result_file:
status_data['result_file'] = result_file
with open(status_file, 'w') as f:
json.dump(status_data, f)
def get_job_status(job_id: str):
job_dir = os.path.join(JOBS_DIR, job_id)
status_file = os.path.join(job_dir, 'status.json')
if not os.path.exists(status_file):
return None
with open(status_file, 'r') as f:
status_data = json.load(f)
return status_data
def process_file(job_id: str, input_file_path: str, ext: str, original_filename: str):
job_dir = os.path.join(JOBS_DIR, job_id)
try:
update_job_status(job_id, 'processing', 'Le fichier est en cours de traitement')
image_counter = [1]
images_data = {}
base_filename = os.path.splitext(original_filename)[0]
output_filename = os.path.join(job_dir, f"{base_filename}.html")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
final_html = loop.run_until_complete(
convert_to_accessible_html(
input_file_path, ext, base_filename, image_counter, images_data
)
)
finally:
loop.close()
if not final_html:
update_job_status(job_id, 'error', 'Erreur lors de la conversion.')
return
with open(output_filename, 'w', encoding='utf-8') as f:
f.write(final_html)
update_job_status(
job_id, 'completed', 'Traitement terminé', result_file=f"{base_filename}.html"
)
delete_files_after_delay([input_file_path], delay=6000)
except Exception as e:
logging.error(f"Erreur lors du traitement du job {job_id}: {str(e)}")
update_job_status(job_id, 'error', f"Erreur: {str(e)}")
def delete_files_after_delay(file_paths: List[str], delay: int = 6000):
def delayed_delete():
time.sleep(delay)
for file_path in file_paths:
try:
if os.path.exists(file_path):
os.remove(file_path)
logging.debug(f"Fichier temporaire supprimé après délai : {file_path}")
except Exception as e:
logging.error(f"Erreur lors de la suppression du fichier {file_path} : {str(e)}")
thread = threading.Thread(target=delayed_delete)
thread.start()
async def convert_to_accessible_html(input_filename, ext, base_filename, image_counter, images_data):
try:
if ext == '.pdf':
# PDF -> HTML avec pages
html_content = pdf_to_html(input_filename)
# Pour le PDF, on a déjà des <!--PAGE_X--> par page, pas besoin d'en ajouter toutes les 20 lignes
elif ext in ['.ppt', '.pptx']:
# PPT/PPTX -> texte -> HTML minimal
text = convert_ppt_to_text(input_filename)
html_content = text_to_html(text)
# Ajouter les <!--PAGE_X--> toutes les 20 lignes pour ce format
html_content = insert_page_comments_every_20_paragraphs(html_content)
elif ext == '.doc':
# DOC -> texte (textract) -> HTML minimal
text = convert_doc_to_text(input_filename)
html_content = text_to_html(text)
html_content = insert_page_comments_every_20_paragraphs(html_content)
elif ext in ['.html', '.htm']:
with open(input_filename, 'r', encoding='utf-8') as f:
html_content = f.read()
try:
doc = Document(html_content)
html_content = doc.summary()
except Exception as e:
logging.error(f"Erreur lors du nettoyage HTML avec readability-lxml : {str(e)}")
# Ajouter les <!--PAGE_X--> toutes les 20 lignes
html_content = insert_page_comments_every_20_paragraphs(html_content)
else:
# Formats gérés par Pandoc
input_format = get_pandoc_format(ext)
html_content = convert_with_pandoc(input_filename, input_format)
# Ajouter les <!--PAGE_X--> toutes les 20 lignes
html_content = insert_page_comments_every_20_paragraphs(html_content)
# Nettoyage
cleaned_html = await clean_html_content(html_content, image_counter, images_data)
# Réécriture accessible
html_rewrite_task = asyncio.create_task(rewrite_html_accessible(cleaned_html))
# Traitement des images (description)
for image_key in images_data:
base64_image = images_data[image_key]['base64_image']
description = await get_image_description(base64_image)
images_data[image_key]['description'] = description
await html_rewrite_task
rewritten_html = html_rewrite_task.result()
final_html = reinsert_images(rewritten_html, images_data)
# Retirer les scripts indésirables
final_soup = BeautifulSoup(final_html, 'html.parser')
scripts_to_remove = final_soup.find_all('script', src=True)
for script in scripts_to_remove:
src = script['src']
if src.startswith('https://bentham-converttohtml.hf.space/'):
script.decompose()
final_html = str(final_soup)
# Supprimer lignes contenant ```html ou ``` seules
final_html = re.sub(r'^\s*```(?:html)?\s*$', '', final_html, flags=re.MULTILINE)
# Insérer le CSS
final_html = insert_css_into_html(final_html)
return final_html
except Exception as e:
logging.error(f"Erreur lors de la conversion : {str(e)}")
return None
def insert_page_comments_every_20_paragraphs(html_content: str) -> str:
# Insère un commentaire <!--PAGE_X--> toutes les 20 balises <p>
soup = BeautifulSoup(html_content, 'html.parser')
paragraphs = soup.find_all('p')
page_number = 1
count = 0
for i, p in enumerate(paragraphs, start=1):
if i % 20 == 1: # Avant le premier <p> d'un "bloc"
comment = soup.new_string(f"<!--PAGE_{page_number}-->")
p.insert_before(comment)
page_number += 1
return str(soup)
def insert_css_into_html(html_content: str) -> str:
css_code = """
:root {
--font-size-min: 1rem;
--font-size-base: 1rem;
--font-size-large: 2.5rem;
--line-height: 1.5;
--font-family: Arial, Calibri, Verdana, sans-serif;
--text-color: #1a1a1a;
--background-color: #fdfdfd;
--link-color: #1a1a1a;
--heading-color-primary: Navy;
--heading-color-secondary: DarkGreen;
--heading-color-tertiary: DarkRed;
--heading-color-quaternary: DarkSlateGray;
--heading-color-cinq: DarkSlateBlue;
--heading-color-six: DarkViolet;
}
* {
font-size: 1rem;
}
html {
font-family: var(--font-family);
font-size: var(--font-size-base);
line-height: var(--line-height);
color: var(--text-color);
background-color: var(--background-color);
font-size: clamp(var(--font-size-min), 2vw, 1.5rem);
}
body {
margin: 20px auto;
max-width: 36em;
padding: 2rem;
hyphens: auto;
overflow-wrap: break-word;
text-rendering: optimizeLegibility;
font-kerning: normal;
text-align: left;
}
h1 {margin-left: 0; color: var(--heading-color-primary);}
h2 {margin-left: 1rem; color: var(--heading-color-secondary);}
h3 {margin-left: 2rem; color: var(--heading-color-tertiary);}
h4 {margin-left: 3rem; color: var(--heading-color-quaternary);}
h5 {margin-left: 4rem; color: var(--heading-color-cinq);}
h6 {margin-left: 5rem; color: var(--heading-color-six);}
@media (max-width: 600px) {
html {
font-size: clamp(var(--font-size-min), 4vw, 1.5rem);
}
body {
padding: 1rem;
}
h1 {font-size: clamp(1.5rem, 6vw, 2.5rem);}
h2 {font-size: clamp(1.25rem, 5vw, 2rem);}
h3 {font-size: clamp(1.125rem, 4.5vw, 1.75rem);}
h4, h5, h6 {font-size: clamp(1rem, 4vw, 1.5rem);}
}
@media print {
body {
background-color: transparent;
color: black;
font-size: 12pt;
}
p, h2, h3 {
orphans: 3;
widows: 3;
}
h2, h3, h4 {
page-break-after: avoid;
}
}
p {margin: 1em 0; font-size: 1rem;}
a {color: var(--link-color); text-decoration: none;}
a:visited {color: var(--link-color);}
a:hover, a:focus {text-decoration: underline;}
img {max-width: 100%; height: auto;}
table {
margin: 1em 0;
border-collapse: collapse;
width: 100%;
overflow-x: auto;
display: block;
font-variant-numeric: lining-nums tabular-nums;
}
table caption {margin-bottom: 0.75em;}
th, td {border: 1px solid #000; padding: 0.5em; text-align: left;}
tbody tr:nth-child(odd) {background-color: #f2f2f2;}
tbody tr:nth-child(even) {background-color: #ffffff;}
blockquote {
margin: 1em 0 1em 1.7em;
padding-left: 1em;
border-left: 2px solid #e6e6e6;
color: #606060;
}
code {
font-family: Menlo, Monaco, 'Lucida Console', Consolas, monospace;
font-size: 0.85rem;
margin: 0;
white-space: pre-wrap;
}
pre {
margin: 1em 0;
overflow: auto;
}
pre code {
padding: 0;
overflow: visible;
overflow-wrap: normal;
}
.sourceCode {
background-color: transparent;
overflow: visible;
}
hr {
background-color: #1a1a1a;
border: none;
height: 1px;
margin: 1em 0;
}
span.smallcaps {font-variant: small-caps;}
span.underline {text-decoration: underline;}
div.column {display: inline-block; vertical-align: top; width: 50%;}
.description {
background-color: #f0f3ff;
padding: 1em;
border: 1px solid black;
}
div.hanging-indent {
margin-left: 1.5em;
text-indent: -1.5em;
}
ul.task-list {list-style: none;}
.display.math {
display: block;
text-align: center;
margin: 0.5rem auto;
}
"""
final_soup = BeautifulSoup(html_content, 'html.parser')
style_tag = final_soup.new_tag('style')
style_tag.string = css_code
head_tag = final_soup.head
if head_tag:
head_tag.clear()
head_tag.append(style_tag)
else:
head_tag = final_soup.new_tag('head')
head_tag.append(style_tag)
final_soup.insert(0, head_tag)
final_html = str(final_soup)
return final_html
def encode_image_from_data_uri(data_uri: str) -> str:
try:
header, encoded = data_uri.split(',', 1)
encoded = ''.join(encoded.split())
return encoded
except Exception as e:
logging.error(f"Erreur lors de l'encodage de l'image : {str(e)}")
return ""
def markdown_to_html(markdown_text: str) -> str:
html = markdown_text
html = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', html)
html = re.sub(r'\*(.*?)\*', r'<i>\1</i>', html)
html = re.sub(r'__(.*?)__', r'<strong>\1</strong>', html)
html = re.sub(r'_(.*?)_', r'<i>\1</i>', html)
return html
async def get_image_description(base64_image: str) -> str:
try:
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": "Décris ce que l'on peut voir sur cette image, pour qu'un lecteur malvoyant puisse comprendre ce qu'elle représente.",
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}"
},
},
],
}
],
)
description = response.choices[0].message.content.strip()
return description
except Exception as e:
logging.error(f"Erreur lors de l'appel à l'API OpenAI : {str(e)}")
return "Description indisponible."
async def rewrite_html_accessible(html_content: str) -> str:
prompt = (
"Je vais te donner un fichier HTML, et je voudrais que tu le réécrives pour permettre l'accessibilité à toutes les formes de handicap, tout en **préservant strictement l'ordre du contenu original**.\n"
"Commence à analyser le plan du document. Il faut d'abord identifier les titres et comprendre leur logique :\n"
"- A priori, les titres qui sont préfixés par une écriture romaine (I, II, III), "
"par un nombre (1, 2, 3) ou par une lettre (a, b, c, ou bien A, B, C) doivent être de même niveau."
"Idem pour les titres rédigés en majuscules.\n"
"- Quand une expression très courte qui ne ressemble pas syntaxiquement à une phrase est présentée sur une seule ligne,"
"il y a des chances qu'il s'agisse d'un titre : dans ce cas (et si c'est pertinent) traite-la comme telle.\n"
"- Au contraire, **une phrase longue ne doit JAMAIS être traitée comme un titre**,"
"même quand elle est précédée par un numéro ou une lettre."
"De même, ne traite jamais comme un titre un ensemble de plusieurs phrases. Je repète : les balises <h1>, <h2>, etc., ne sont destinées qu'à encadrer des expressions relativement courtes, et rien d'autre.\n\n"
"Tu ne dois **rien réorganiser**, **ne rien supprimer** et **ne rien ajouter** en termes de structure ou de contenu. "
"Ton intervention doit se faire exclusivement sur la **forme** du document : le contenu doit être **intégralement préservé dans le même ordre**, jusqu'à la fin. "
"Laisse la balise <head> vide.\n"
"IMPORTANT : Tu dois **respecter scrupuleusement l'ordre indiqué par les commentaires HTML de la forme <!--PAGE_X-->,** s'ils existent. On doit avoir <!--PAGE_1--> [...] <!--PAGE_2--> [...] <!--PAGE_3--> [...], et ainsi de suite, dans l'ordre exact et sans en oublier un seul. C'est très important ! Ces marqueurs te permettent de t'assurer que la page est bien retranscrite dans le bon ordre. Ne déplace, ne supprime, et ne modifie pas ces commentaires.\n"
"Attention, ce document est peut-être issu d'un PDF ou d'un DOCX. Il faut donc être attentif :\n"
"- Aux balises <p> qui suivent immédiatement les marqueurs <!--PAGE_X--> : il peut s'agir de headers. Pour le savoir, il faut les comparer entre eux pour savoir s'ils sont à peu près similaires.\n"
"- Aux balises <p> qui précèdent immédiatement les marqueurs <!--PAGE_X--> : il peut s'agir de footers. De même, il faut les comparer entre eux pour savoir s'ils sont à peu près similaires.\n"
"Dans tous les cas, il faut supprimer les balises <p> correspondant aux headers et les footers identifiés. Attention, ces suppressions ne doivent pas affecter les autres éléments.\n"
"S'il y a des retours à la ligne injustifiés, il faut rétablir l'intégrité des phrases, et constituer de véritables paragraphes complets. L'ensemble du code doit être inclus entre des balises <html></html>\n"
"Tu donneras la totalité du HTML réécrit, et rien d'autre, ni avant ni après. "
"Ne résume jamais les informations, ne réorganise pas le contenu et ne supprime aucune section.\n\n"
"Voici tout d'abord les règles à suivre pour avoir un document accessible :\n\n"
"1. Limiter l'italique et les soulignements.\n"
"2. S'il y a des tableaux, insérer un tiret dans les cellules ne contenant pas d’information, et associer une légende aux tableaux.\n"
"3. Pour les titres, utilise absolument les balises h1, h2, h3, h4, h5 et h6. Utilise la balise h1 pour le titre qui a le plus grand niveau.\n\n"
"On évite les balises <ul> et <li>\n"
"Encore une fois, fais bien attention à reproduire fidèlement l'ordre des marqueurs <!--PAGE_X-->, dans l'ordre croissant des X : c'est ta tâche principale. Recompte régulièrement les X des PAGE_X pour être sûr qu'il n'en manque aucun.\n"
"N'oublie pas qu'on ne doit avoir AUCUN header et AUCUN footer, c'est très important.\n"
"Voici maintenant le fichier HTML d'origine :\n"
+ html_content
)
try:
logging.debug("Contenu avant l'appel à l'API OpenAI :")
logging.debug(html_content)
response = await client.chat.completions.create(
model="o1-mini",
messages=[
{"role": "user", "content": prompt}
],
)
rewritten_html = response.choices[0].message.content.strip()
rewritten_html = rewritten_html.replace("&lt;!--", "<!--").replace("--&gt;", "-->")
logging.debug("Contenu après l'appel à l'API OpenAI :")
logging.debug(rewritten_html)
return rewritten_html
except Exception as e:
logging.error(f"Erreur lors de la réécriture du HTML : {str(e)}")
return html_content
async def clean_html_content(html_content: str, image_counter: List[int], images_data: Dict[str, Dict[str, str]]) -> str:
soup = BeautifulSoup(html_content, 'html.parser')
for tag in soup.find_all():
if 'style' in tag.attrs:
del tag['style']
for element in soup.find_all(['header', 'footer']):
element.decompose()
for div in soup.find_all('div'):
if div.get_text(strip=True).isdigit():
div.decompose()
for span in soup.find_all('span'):
span.unwrap()
img_tags = soup.find_all('img')
if img_tags:
if len(img_tags) > 20:
logging.warning(f"Number of images ({len(img_tags)}) exceeds 20. Images will be ignored.")
for img in img_tags:
img.decompose()
else:
for img in img_tags:
src = img.get('src', '')
X = image_counter[0]
if src.startswith('data:image/'):
base64_image = encode_image_from_data_uri(src)
if base64_image:
images_data[f"IMG_{X}"] = {
'base64_image': base64_image
}
placeholder = f"<!--IMG_{X}-->"
img.replace_with(BeautifulSoup(placeholder, 'html.parser'))
image_counter[0] += 1
else:
img.decompose()
else:
img.decompose()
else:
logging.debug("No <img> tags found in the HTML content.")
for img in soup.find_all('img'):
img.decompose()
scripts_to_remove = soup.find_all('script', src=True)
for script in scripts_to_remove:
src = script['src']
if src.startswith('https://bentham-converttohtml.hf.space/'):
script.decompose()
for tag in soup.find_all('p'):
if not tag.get_text(strip=True):
tag.decompose()
return str(soup)
def reinsert_images(html_content: str, images_data: Dict[str, Dict[str, str]]) -> str:
soup = BeautifulSoup(html_content, 'html.parser')
for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
match = re.match(r'IMG_(\d+)', comment)
if match:
image_number = match.group(1)
image_key = f"IMG_{image_number}"
if image_key in images_data:
img_tag = soup.new_tag('img')
img_tag['src'] = f"data:image/jpeg;base64,{images_data[image_key]['base64_image']}"
img_tag['alt'] = images_data[image_key]['description']
new_content = soup.new_tag('div')
new_content.append(img_tag)
p_tag = soup.new_tag('p', attrs={'class': 'description'})
strong_tag = soup.new_tag('strong')
strong_tag.string = f"Image {image_number}"
p_tag.append(strong_tag)
p_tag.append(" : ")
y_markdown = images_data[image_key]['description']
y_html = markdown_to_html(y_markdown)
y_soup = BeautifulSoup(y_html, 'html.parser')
p_tag.append(y_soup)
new_content.append(p_tag)
comment.replace_with(new_content)
else:
logging.error(f"Données pour {image_key} non trouvées.")
return str(soup)
def pdf_to_html(input_filename: str) -> str:
soup = BeautifulSoup("<html><head></head><body></body></html>", 'html.parser')
body = soup.body
page_number = 1
with fitz.open(input_filename) as doc:
for page in doc:
page_comment = f"<!--PAGE_{page_number}-->"
body.append(BeautifulSoup(page_comment, 'html.parser'))
page_html = page.get_text("html")
page_fragment = BeautifulSoup(page_html, 'html.parser')
body.append(page_fragment)
page_number += 1
return str(soup)
def convert_with_pandoc(input_filename: str, input_format: str) -> str:
try:
output = pypandoc.convert_file(
input_filename,
'html',
format=input_format,
outputfile=None,
extra_args=['--self-contained', '--strip-comments', '--quiet']
)
return output
except RuntimeError as e:
logging.error(f"Pandoc a rencontré une erreur : {str(e)}, tentative sans --self-contained.")
output = pypandoc.convert_file(
input_filename,
'html',
format=input_format,
outputfile=None,
extra_args=['--strip-comments', '--quiet']
)
return output
def text_to_html(text: str) -> str:
lines = text.split('\n')
html_lines = ['<p>' + line.strip() + '</p>' for line in lines if line.strip()]
return "<html><head></head><body>" + "\n".join(html_lines) + "</body></html>"
def convert_ppt_to_text(input_filename: str) -> str:
if 'Presentation' not in globals():
raise HTTPException(status_code=500, detail="La librairie python-pptx n'est pas installée.")
prs = Presentation(input_filename)
text_content = []
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
text_content.append(shape.text)
return "\n".join(text_content)
def convert_doc_to_text(input_filename: str) -> str:
if 'textract' not in globals():
raise HTTPException(status_code=500, detail="La librairie textract n'est pas installée.")
text = textract.process(input_filename).decode('utf-8', errors='replace')
return text
def clean_html_file(input_filepath: str, cleaned_output_filepath: str) -> bool:
try:
with open(input_filepath, 'r', encoding='utf-8') as f:
html_content = f.read()
doc = Document(html_content)
cleaned_html = doc.summary()
with open(cleaned_output_filepath, 'w', encoding='utf-8') as f:
f.write(cleaned_html)
logging.debug("Contenu HTML nettoyé avec readability-lxml.")
return True
except Exception as e:
logging.error(f"Erreur lors du nettoyage du fichier HTML {input_filepath} : {str(e)}")
return False
@app.post("/accessibility/")
async def convert_file_to_html(
file: UploadFile = File(...),
background_tasks: BackgroundTasks = BackgroundTasks()
):
try:
job_id = str(uuid.uuid4())
job_dir = os.path.join(JOBS_DIR, job_id)
os.makedirs(job_dir)
ext = os.path.splitext(file.filename)[1].lower()
if ext not in ALLOWED_EXTENSIONS_FOR_ACCESSIBILITY:
raise HTTPException(status_code=400, detail=f"Extension de fichier non supportée : {ext}")
input_file_path = os.path.join(job_dir, f'input{ext}')
with open(input_file_path, "wb") as f:
shutil.copyfileobj(file.file, f)
status = {
'status': 'pending',
'message': 'Traitement démarré',
'created_at': time.time()
}
status_file = os.path.join(job_dir, 'status.json')
with open(status_file, 'w') as f:
json.dump(status, f)
background_tasks.add_task(process_file, job_id, input_file_path, ext, file.filename)
return JSONResponse(content={'job_id': job_id})
except Exception as e:
logging.error(f"Erreur lors du démarrage du job : {str(e)}")
return JSONResponse(status_code=500, content={"message": f"Erreur lors du démarrage du job : {str(e)}"})
@app.get("/status/{job_id}")
async def check_status(job_id: str):
status_data = get_job_status(job_id)
if status_data is None:
return JSONResponse(status_code=404, content={"message": "Job non trouvé"})
return JSONResponse(content=status_data)
@app.get("/result/{job_id}")
async def get_result(job_id: str):
job_dir = os.path.join(JOBS_DIR, job_id)
status_data = get_job_status(job_id)
if status_data is None:
return JSONResponse(status_code=404, content={"message": "Job non trouvé"})
if status_data.get('status') != 'completed':
return JSONResponse(status_code=400, content={"message": "Résultat non prêt"})
result_file = status_data.get('result_file')
if not result_file:
return JSONResponse(status_code=500, content={"message": "Fichier résultat non trouvé"})
result_file_path = os.path.join(job_dir, result_file)
if not os.path.exists(result_file_path):
return JSONResponse(status_code=500, content={"message": "Fichier résultat non trouvé sur le serveur"})
return FileResponse(result_file_path, filename=os.path.basename(result_file_path), media_type='text/html')
def delete_temp_files(file_paths: list):
for file_path in file_paths:
try:
if os.path.exists(file_path):
os.remove(file_path)
logging.debug(f"Fichier temporaire supprimé : {file_path}")
except Exception as e:
logging.error(f"Erreur lors de la suppression du fichier {file_path} : {str(e)}")
@app.post("/convert_to_txt/")
async def convert_file_to_txt(
file: UploadFile = File(...),
background_tasks: BackgroundTasks = BackgroundTasks()
):
try:
original_filename = file.filename
base_filename, ext = os.path.splitext(original_filename)
ext = ext.lower()
allowed_extensions = [
'.odt', '.pdf', '.docx', '.html', '.htm', '.md', '.txt', '.rtf', '.epub',
'.tex', '.xml', '.org', '.commonmark', '.cm', '.wiki', '.opml',
'.ppt', '.pptx', '.doc'
]
if ext not in allowed_extensions:
raise HTTPException(status_code=400, detail=f"Extension de fichier non supportée : {ext}")
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as input_tmp_file:
input_filename = input_tmp_file.name
with open(input_filename, "wb") as f:
shutil.copyfileobj(file.file, f)
logging.debug(f"Fichier téléchargé enregistré : {input_filename}")
if ext in ['.html', '.htm']:
cleaned_input_filename = input_filename + '_cleaned.html'
nettoyage_reussi = clean_html_file(input_filename, cleaned_input_filename)
if not nettoyage_reussi:
raise HTTPException(status_code=500, detail="Erreur lors du nettoyage du fichier HTML.")
input_filename = cleaned_input_filename
logging.debug(f"Fichier HTML nettoyé enregistré : {input_filename}")
unique_id = uuid.uuid4().hex
output_filename = os.path.join(tempfile.gettempdir(), f"{base_filename}_{unique_id}.txt")
if ext == '.pdf':
text = ""
with fitz.open(input_filename) as doc:
for page in doc:
text += page.get_text()
with open(output_filename, "w", encoding="utf-8") as f:
f.write(text)
elif ext == '.pptx':
if 'Presentation' not in globals():
raise HTTPException(status_code=500, detail="La librairie python-pptx n'est pas installée.")
prs = Presentation(input_filename)
text_content = []
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
text_content.append(shape.text)
text = "\n".join(text_content)
with open(output_filename, "w", encoding="utf-8") as f:
f.write(text)
elif ext == '.ppt':
if 'textract' not in globals():
raise HTTPException(status_code=500, detail="La librairie textract n'est pas installée.")
text = textract.process(input_filename).decode('utf-8', errors='replace')
with open(output_filename, "w", encoding="utf-8") as f:
f.write(text)
elif ext == '.doc':
if 'textract' not in globals():
raise HTTPException(status_code=500, detail="La librairie textract n'est pas installée.")
text = textract.process(input_filename).decode('utf-8', errors='replace')
with open(output_filename, "w", encoding="utf-8") as f:
f.write(text)
else:
output = pypandoc.convert_file(input_filename, 'plain', outputfile=output_filename)
if not os.path.exists(output_filename):
logging.error(f"Le fichier {output_filename} n'a pas été généré.")
raise HTTPException(status_code=500, detail="Erreur lors de la conversion.")
temp_files_to_delete = [input_filename, output_filename]
if ext in ['.html', '.htm']:
temp_files_to_delete.append(cleaned_input_filename)
background_tasks.add_task(delete_temp_files, temp_files_to_delete)
return FileResponse(output_filename, filename=f"{base_filename}.txt")
except HTTPException as http_exc:
logging.error(f"Erreur HTTP lors de la conversion : {str(http_exc.detail)}")
return JSONResponse(status_code=http_exc.status_code, content={"message": http_exc.detail})
except Exception as e:
logging.error(f"Erreur interne lors de la conversion : {str(e)}")
return JSONResponse(status_code=500, content={"message": f"Erreur interne : {str(e)}"})