|
from flask import Flask, render_template, request, jsonify, send_file |
|
import os |
|
import json |
|
import uuid |
|
import threading |
|
import time |
|
from datetime import datetime |
|
import base64 |
|
import mimetypes |
|
from google import genai |
|
from google.genai import types |
|
from reportlab.lib.pagesizes import A4 |
|
from reportlab.platypus import SimpleDocTemplate, Image |
|
from reportlab.lib.utils import ImageReader |
|
from PIL import Image as PILImage |
|
import io |
|
|
|
# Output directories, relative to the process working directory:
# generated page images and the PDFs assembled from them.
UPLOAD_FOLDER = 'generated_pages'
PDF_FOLDER = 'generated_pdfs'

# Flask application that exposes the generation endpoints below.
app = Flask(__name__)

# Create both output directories at import time so later writes cannot
# fail on a missing directory.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(PDF_FOLDER, exist_ok=True)

# In-memory registry of generation tasks, keyed by task id. Each value is
# a dict describing one task (status, timestamps, ...). Its content is
# lost when the process exits.
active_tasks = {}
|
|
|
class MangaGenerator:
    """Generate manga page images through the Gemini streaming API.

    Attributes:
        client: ``genai.Client`` created from the supplied API key.
        model: Name of the Gemini model used for every request.
    """

    def __init__(self, api_key):
        """Create the Gemini client.

        Args:
            api_key: API key handed to ``genai.Client``.
        """
        self.client = genai.Client(api_key=api_key)
        self.model = "gemini-2.5-flash-image-preview"

    def save_binary_file(self, file_name, data):
        """Write ``data`` to ``file_name`` in binary mode.

        Args:
            file_name: Destination path.
            data: Bytes to write.

        Returns:
            True when the file was written, False when any exception was
            raised (the error is printed, not propagated).
        """
        try:
            with open(file_name, "wb") as f:
                f.write(data)
            return True
        except Exception as e:
            print(f"Erreur lors de la sauvegarde: {e}")
            return False

    @staticmethod
    def _chunk_parts(chunk):
        """Return the parts of the first candidate of ``chunk``, or ``[]``.

        Missing or empty ``candidates`` / ``content`` / ``parts`` all yield
        an empty list, so callers never index into an empty sequence.
        """
        candidates = getattr(chunk, "candidates", None)
        if not candidates:
            return []
        content = candidates[0].content
        if content is None or not content.parts:
            return []
        return list(content.parts)

    @staticmethod
    def _extension_for(mime_type):
        """Map a MIME type to a file extension, defaulting to ``.png``."""
        guessed = mimetypes.guess_extension(mime_type) if mime_type else None
        return guessed or '.png'

    def generate_page(self, prompt, page_number, task_id):
        """Generate the image(s) of one manga page from ``prompt``.

        Every part of every streamed chunk is inspected. Parts carrying
        inline binary data are saved under ``UPLOAD_FOLDER``; text parts are
        printed. The first image keeps the name
        ``page_<page_number>_<task_id><ext>``; any further image of the same
        page gets a ``_<k>`` suffix so it does not overwrite the first one.

        Args:
            prompt: Text prompt sent to the model.
            page_number: 1-based page index, used in the file name.
            task_id: Task identifier, used in the file name.

        Returns:
            List of paths of the files saved for this page. An empty list
            is returned when nothing was produced or when any exception
            occurred (the error is printed).
        """
        try:
            contents = [
                types.Content(
                    role="user",
                    parts=[
                        types.Part.from_text(text=prompt),
                    ],
                ),
            ]
            generate_content_config = types.GenerateContentConfig(
                response_modalities=["IMAGE", "TEXT"],
            )

            generated_files = []
            image_count = 0
            base_name = f"{UPLOAD_FOLDER}/page_{page_number}_{task_id}"

            for chunk in self.client.models.generate_content_stream(
                model=self.model,
                contents=contents,
                config=generate_content_config,
            ):
                for part in self._chunk_parts(chunk):
                    inline_data = getattr(part, "inline_data", None)
                    if inline_data and inline_data.data:
                        image_count += 1
                        # Only images after the first need a distinguishing
                        # suffix; the first keeps the historical file name.
                        suffix = "" if image_count == 1 else f"_{image_count}"
                        file_extension = self._extension_for(
                            inline_data.mime_type
                        )
                        full_path = f"{base_name}{suffix}{file_extension}"
                        if self.save_binary_file(full_path, inline_data.data):
                            generated_files.append(full_path)
                    elif getattr(part, "text", None):
                        print(
                            f"Texte généré pour page {page_number}: {part.text}"
                        )

            return generated_files

        except Exception as e:
            print(f"Erreur lors de la génération de la page {page_number}: {e}")
            return []
|
|
|
def _page_sort_key(img_path):
    """Sort key ordering ``page_<n>_...`` files by their numeric page index.

    Paths whose base name does not follow that pattern sort after all
    numbered pages, in plain string order.
    """
    fields = os.path.basename(img_path).split('_')
    if len(fields) >= 2 and fields[0] == 'page' and fields[1].isdecimal():
        return (0, int(fields[1]), img_path)
    return (1, 0, img_path)


def create_pdf(image_paths, output_path):
    """Build a PDF containing the given images, in page order.

    Images are ordered by the page number embedded in their file name
    (so page 10 comes after page 2), re-encoded to PNG in memory, and
    scaled to 90% of the largest size that fits an A4 page while keeping
    their aspect ratio. Paths that do not exist are skipped.

    Args:
        image_paths: Iterable of image file paths.
        output_path: Destination path of the PDF.

    Returns:
        True when the PDF was built, False when any exception was raised
        (the error is printed, not propagated).
    """
    try:
        # The document margins are set to zero because the images are sized
        # against the full page: with the default margins the frame would be
        # smaller than a 90%-of-page image and the layout would fail.
        doc = SimpleDocTemplate(
            output_path,
            pagesize=A4,
            leftMargin=0,
            rightMargin=0,
            topMargin=0,
            bottomMargin=0,
        )
        page_width, page_height = A4
        story = []

        for img_path in sorted(image_paths, key=_page_sort_key):
            if not os.path.exists(img_path):
                continue

            with PILImage.open(img_path) as img:
                img_width, img_height = img.size
                img_buffer = io.BytesIO()
                img.save(img_buffer, format='PNG')
            img_buffer.seek(0)

            # Largest scale that fits the page, with a 10% safety border.
            ratio = min(page_width / img_width, page_height / img_height) * 0.9

            # The flowable receives the in-memory buffer as a file-like
            # object; the buffer stays alive through the story list.
            story.append(
                Image(
                    img_buffer,
                    width=img_width * ratio,
                    height=img_height * ratio,
                )
            )

        doc.build(story)
        return True

    except Exception as e:
        print(f"Erreur lors de la création du PDF: {e}")
        return False
|
|
|
def generate_manga_task(manga_data, task_id):
    """Generate every page of a manga and assemble them into a PDF.

    Intended to run in a background thread. Progress and outcome are
    reported by updating the task's entry in ``active_tasks`` (``status``,
    ``total_pages``, ``current_page``, ``current_part``, then either
    ``pdf_path``/``completed_at`` or ``error``).

    Args:
        manga_data: Mapping of part keys of the form ``<prefix>-<number>``
            to prompts; pages are generated in ascending ``<number>`` order.
        task_id: Key of the task entry in ``active_tasks``; the entry must
            already exist.
    """
    # Keep a direct reference to the task record so progress updates keep
    # working even if the entry is removed from the registry meanwhile.
    task = active_tasks[task_id]

    try:
        api_key = os.environ.get("GEMINI_API_KEY")
        if not api_key:
            task['status'] = 'error'
            task['error'] = 'API Key Gemini non trouvée'
            return

        generator = MangaGenerator(api_key)
        task['status'] = 'generating'
        task['total_pages'] = len(manga_data)
        task['current_page'] = 0

        generated_files = []

        # Order parts by the integer that follows the first '-' in the key.
        sorted_parts = sorted(
            manga_data.items(), key=lambda x: int(x[0].split('-')[1])
        )
        total = len(sorted_parts)

        for i, (part_key, prompt) in enumerate(sorted_parts, 1):
            task['current_page'] = i
            task['current_part'] = part_key

            print(f"Génération de la page {i}/{total} - {part_key}")

            page_files = generator.generate_page(prompt, i, task_id)
            generated_files.extend(page_files)

            # Pause between consecutive requests; nothing follows the last
            # page, so no wait is needed there.
            if i < total:
                time.sleep(2)

        # An empty PDF must not be reported as a success.
        if not generated_files:
            task['status'] = 'error'
            task['error'] = 'Aucune page générée'
            return

        task['status'] = 'creating_pdf'
        pdf_path = f"{PDF_FOLDER}/manga_{task_id}.pdf"

        if create_pdf(generated_files, pdf_path):
            task['status'] = 'completed'
            task['pdf_path'] = pdf_path
            task['completed_at'] = datetime.now().isoformat()
        else:
            task['status'] = 'error'
            task['error'] = 'Erreur lors de la création du PDF'

    except Exception as e:
        task['status'] = 'error'
        task['error'] = str(e)
        print(f"Erreur dans la tâche {task_id}: {e}")
|
|
|
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page
|
|
|
def _is_valid_part_key(key):
    """Tell whether the text after the first '-' in ``key`` is an integer."""
    try:
        int(key.split('-')[1])
    except (IndexError, ValueError):
        return False
    return True


@app.route('/generate', methods=['POST'])
def generate():
    """Start a background manga generation from a JSON request body.

    The body must be a JSON object. Entries whose key starts with
    ``partie-`` are taken as pages; the rest of the key must start with an
    integer (it defines the page order) and the value must be a string
    prompt. A task is registered in ``active_tasks`` and a daemon thread
    running ``generate_manga_task`` is started.

    Returns:
        JSON with ``task_id``, ``status`` and ``message`` on success;
        a JSON ``error`` with status 400 for a missing, unparsable or
        invalid body; a JSON ``error`` with status 500 on unexpected errors.
    """
    try:
        # silent=True returns None instead of raising, so a bad body is
        # answered with a 400 rather than falling into the 500 handler.
        data = request.get_json(silent=True)

        if not data:
            return jsonify({'error': 'Aucune donnée JSON fournie'}), 400

        if not isinstance(data, dict):
            return jsonify({'error': 'Le JSON doit être un objet'}), 400

        manga_parts = {k: v for k, v in data.items() if k.startswith('partie-')}

        if not manga_parts:
            return jsonify({'error': 'Aucune partie trouvée dans les données'}), 400

        # Reject keys the background task could not sort, and prompts that
        # are not text, before any work is started.
        invalid_keys = [
            key for key, prompt in manga_parts.items()
            if not _is_valid_part_key(key) or not isinstance(prompt, str)
        ]
        if invalid_keys:
            return jsonify({
                'error': 'Parties invalides',
                'invalid_parts': invalid_keys
            }), 400

        task_id = str(uuid.uuid4())
        active_tasks[task_id] = {
            'status': 'queued',
            'created_at': datetime.now().isoformat(),
            'manga_data': manga_parts
        }

        thread = threading.Thread(
            target=generate_manga_task,
            args=(manga_parts, task_id),
            daemon=True,
        )
        thread.start()

        return jsonify({
            'task_id': task_id,
            'status': 'queued',
            'message': 'Génération démarrée'
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
@app.route('/status/<task_id>')
def get_status(task_id):
    """Return the state of one task as JSON, without its ``manga_data``.

    Responds with a JSON ``error`` and status 404 when ``task_id`` is not
    registered in ``active_tasks``.
    """
    if task_id not in active_tasks:
        return jsonify({'error': 'Tâche non trouvée'}), 404

    # Build a shallow copy that leaves out the submitted prompts.
    public_view = {
        field: value
        for field, value in active_tasks[task_id].items()
        if field != 'manga_data'
    }
    return jsonify(public_view)
|
|
|
@app.route('/download/<task_id>')
def download_pdf(task_id):
    """Send the PDF of a completed task as an attachment.

    Returns:
        The PDF file named ``manga_<task_id>.pdf``; a JSON ``error`` with
        status 404 when the task or the file does not exist, or status 400
        when the task is not completed or has no ``pdf_path``.
    """
    task = active_tasks.get(task_id)
    if task is None:
        return jsonify({'error': 'Tâche non trouvée'}), 404

    if task['status'] != 'completed' or 'pdf_path' not in task:
        return jsonify({'error': 'PDF non disponible'}), 400

    # The stored path is relative to the working directory. Resolve it once
    # so the existence check and send_file refer to the same file:
    # send_file resolves relative paths against the application root.
    pdf_path = os.path.abspath(task['pdf_path'])
    if not os.path.exists(pdf_path):
        return jsonify({'error': 'Fichier PDF non trouvé'}), 404

    return send_file(
        pdf_path,
        as_attachment=True,
        download_name=f'manga_{task_id}.pdf',
    )
|
|
|
@app.route('/tasks')
def list_tasks():
    """Drop tasks older than 24 hours and return the remaining ones as JSON.

    A task's age is measured from its ``created_at`` ISO timestamp. The
    returned mapping of task id to task state leaves out ``manga_data``.
    """
    current_time = datetime.now()

    # Work on a snapshot: other request threads may register new tasks
    # while this loop runs, and a dict must not change size during
    # iteration.
    snapshot = list(active_tasks.items())

    tasks_summary = {}
    for task_id, task in snapshot:
        created_at = datetime.fromisoformat(task['created_at'])
        if (current_time - created_at).total_seconds() > 86400:
            # pop() tolerates the entry having been removed concurrently.
            active_tasks.pop(task_id, None)
            continue

        tasks_summary[task_id] = {
            field: value
            for field, value in task.items()
            if field != 'manga_data'
        }

    return jsonify(tasks_summary)
|
|
|
if __name__ == '__main__':
    # Warn early: without the key every generation task ends in error.
    if not os.environ.get("GEMINI_API_KEY"):
        print("⚠️ ATTENTION: La variable d'environnement GEMINI_API_KEY n'est pas définie!")
        print(" Définissez-la avec: export GEMINI_API_KEY=votre_clé_api")

    # The server listens on all interfaces, so the interactive debugger
    # (which allows arbitrary code execution) must not be on by default.
    # Set FLASK_DEBUG=1 to enable it for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in (
        "1", "true", "yes", "on"
    )

    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)