Eps2 / app.py
Docfile's picture
Update app.py
481cfe6 verified
raw
history blame
9.94 kB
from flask import Flask, render_template, request, jsonify, send_file
import os
import json
import uuid
import threading
import time
from datetime import datetime
import base64
import mimetypes
from google import genai
from google.genai import types
from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Image
from reportlab.lib.utils import ImageReader
from PIL import Image as PILImage
import io
app = Flask(__name__)
# Configuration
UPLOAD_FOLDER = 'generated_pages'
PDF_FOLDER = 'generated_pdfs'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(PDF_FOLDER, exist_ok=True)
# Stockage des tâches en cours
active_tasks = {}
class MangaGenerator:
def __init__(self, api_key):
self.client = genai.Client(api_key=api_key)
self.model = "gemini-2.5-flash-image-preview"
def save_binary_file(self, file_name, data):
"""Sauvegarde un fichier binaire"""
try:
with open(file_name, "wb") as f:
f.write(data)
return True
except Exception as e:
print(f"Erreur lors de la sauvegarde: {e}")
return False
def generate_page(self, prompt, page_number, task_id):
"""Génère une page de manga avec le prompt donné"""
try:
contents = [
types.Content(
role="user",
parts=[
types.Part.from_text(text=prompt),
],
),
]
generate_content_config = types.GenerateContentConfig(
response_modalities=["IMAGE", "TEXT"],
)
generated_files = []
for chunk in self.client.models.generate_content_stream(
model=self.model,
contents=contents,
config=generate_content_config,
):
if (
chunk.candidates is None
or chunk.candidates[0].content is None
or chunk.candidates[0].content.parts is None
):
continue
if (chunk.candidates[0].content.parts[0].inline_data and
chunk.candidates[0].content.parts[0].inline_data.data):
file_name = f"{UPLOAD_FOLDER}/page_{page_number}_{task_id}"
inline_data = chunk.candidates[0].content.parts[0].inline_data
data_buffer = inline_data.data
file_extension = mimetypes.guess_extension(inline_data.mime_type) or '.png'
full_path = f"{file_name}{file_extension}"
if self.save_binary_file(full_path, data_buffer):
generated_files.append(full_path)
else:
if hasattr(chunk, 'text') and chunk.text:
print(f"Texte généré pour page {page_number}: {chunk.text}")
return generated_files
except Exception as e:
print(f"Erreur lors de la génération de la page {page_number}: {e}")
return []
def create_pdf(image_paths, output_path):
"""Crée un PDF à partir des images générées"""
try:
doc = SimpleDocTemplate(output_path, pagesize=A4)
story = []
for img_path in sorted(image_paths):
if os.path.exists(img_path):
# Redimensionner l'image pour s'adapter à la page A4
with PILImage.open(img_path) as img:
img_buffer = io.BytesIO()
img.save(img_buffer, format='PNG')
img_buffer.seek(0)
# Calculer les dimensions pour s'adapter à A4
page_width, page_height = A4
img_width, img_height = img.size
# Maintenir le ratio d'aspect
ratio = min(page_width/img_width, page_height/img_height) * 0.9
new_width = img_width * ratio
new_height = img_height * ratio
story.append(Image(ImageReader(img_buffer),
width=new_width, height=new_height))
doc.build(story)
return True
except Exception as e:
print(f"Erreur lors de la création du PDF: {e}")
return False
def generate_manga_task(manga_data, task_id):
"""Tâche de génération de manga qui s'exécute en arrière-plan"""
try:
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
active_tasks[task_id]['status'] = 'error'
active_tasks[task_id]['error'] = 'API Key Gemini non trouvée'
return
generator = MangaGenerator(api_key)
active_tasks[task_id]['status'] = 'generating'
active_tasks[task_id]['total_pages'] = len(manga_data)
active_tasks[task_id]['current_page'] = 0
generated_files = []
# Trier les parties par ordre numérique
sorted_parts = sorted(manga_data.items(), key=lambda x: int(x[0].split('-')[1]))
for i, (part_key, prompt) in enumerate(sorted_parts, 1):
active_tasks[task_id]['current_page'] = i
active_tasks[task_id]['current_part'] = part_key
print(f"Génération de la page {i}/{len(sorted_parts)} - {part_key}")
page_files = generator.generate_page(prompt, i, task_id)
generated_files.extend(page_files)
# Attendre un peu entre les générations pour éviter les limites de taux
time.sleep(2)
# Créer le PDF
active_tasks[task_id]['status'] = 'creating_pdf'
pdf_path = f"{PDF_FOLDER}/manga_{task_id}.pdf"
if create_pdf(generated_files, pdf_path):
active_tasks[task_id]['status'] = 'completed'
active_tasks[task_id]['pdf_path'] = pdf_path
active_tasks[task_id]['completed_at'] = datetime.now().isoformat()
else:
active_tasks[task_id]['status'] = 'error'
active_tasks[task_id]['error'] = 'Erreur lors de la création du PDF'
except Exception as e:
active_tasks[task_id]['status'] = 'error'
active_tasks[task_id]['error'] = str(e)
print(f"Erreur dans la tâche {task_id}: {e}")
@app.route('/')
def index():
return render_template('index.html')
@app.route('/generate', methods=['POST'])
def generate():
try:
data = request.get_json()
if not data:
return jsonify({'error': 'Aucune donnée JSON fournie'}), 400
# Valider que les données contiennent des parties
manga_parts = {k: v for k, v in data.items() if k.startswith('partie-')}
if not manga_parts:
return jsonify({'error': 'Aucune partie trouvée dans les données'}), 400
# Créer une nouvelle tâche
task_id = str(uuid.uuid4())
active_tasks[task_id] = {
'status': 'queued',
'created_at': datetime.now().isoformat(),
'manga_data': manga_parts
}
# Lancer la génération en arrière-plan
thread = threading.Thread(target=generate_manga_task, args=(manga_parts, task_id))
thread.daemon = True
thread.start()
return jsonify({
'task_id': task_id,
'status': 'queued',
'message': 'Génération démarrée'
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/status/<task_id>')
def get_status(task_id):
if task_id not in active_tasks:
return jsonify({'error': 'Tâche non trouvée'}), 404
task = active_tasks[task_id].copy()
# Ne pas renvoyer les données du manga dans le status pour économiser la bande passante
if 'manga_data' in task:
del task['manga_data']
return jsonify(task)
@app.route('/download/<task_id>')
def download_pdf(task_id):
if task_id not in active_tasks:
return jsonify({'error': 'Tâche non trouvée'}), 404
task = active_tasks[task_id]
if task['status'] != 'completed' or 'pdf_path' not in task:
return jsonify({'error': 'PDF non disponible'}), 400
pdf_path = task['pdf_path']
if not os.path.exists(pdf_path):
return jsonify({'error': 'Fichier PDF non trouvé'}), 404
return send_file(pdf_path, as_attachment=True,
download_name=f'manga_{task_id}.pdf')
@app.route('/tasks')
def list_tasks():
# Nettoyer les tâches anciennes (plus de 24h)
current_time = datetime.now()
to_remove = []
for task_id, task in active_tasks.items():
created_at = datetime.fromisoformat(task['created_at'])
if (current_time - created_at).total_seconds() > 86400: # 24 heures
to_remove.append(task_id)
for task_id in to_remove:
del active_tasks[task_id]
# Retourner la liste des tâches sans les données manga
tasks_summary = {}
for task_id, task in active_tasks.items():
task_copy = task.copy()
if 'manga_data' in task_copy:
del task_copy['manga_data']
tasks_summary[task_id] = task_copy
return jsonify(tasks_summary)
if __name__ == '__main__':
# Vérifier que l'API key est configurée
if not os.environ.get("GEMINI_API_KEY"):
print("⚠️ ATTENTION: La variable d'environnement GEMINI_API_KEY n'est pas définie!")
print(" Définissez-la avec: export GEMINI_API_KEY=votre_clé_api")
app.run(debug=True, host='0.0.0.0', port=5000)