|
|
|
|
|
|
|
from flask import Flask, render_template, request, jsonify, send_file, send_from_directory |
|
import os |
|
import json |
|
import uuid |
|
import threading |
|
import time |
|
from datetime import datetime |
|
import mimetypes |
|
from google import genai |
|
from google.genai import types |
|
import zipfile |
|
|
|
|
|
|
|
|
|
# Flask application object; the route decorators below register on it.
app = Flask(__name__)

# Output locations: one folder for the individual page images, one for the
# ZIP archives built from them. Both are created at import time if missing.
UPLOAD_FOLDER, ZIP_FOLDER = 'generated_pages', 'generated_zips'
for _output_dir in (UPLOAD_FOLDER, ZIP_FOLDER):
    os.makedirs(_output_dir, exist_ok=True)

# In-memory registry of generation tasks, keyed by task id. Each value is a
# dict holding at least 'status'; worker threads add further keys as they run.
active_tasks = {}
|
|
|
|
|
|
|
|
|
class MangaGenerator:
    """Talk to the Google Gemini API to generate manga page images."""

    def __init__(self, api_key):
        """Create the Gemini client used for every generation request.

        Args:
            api_key: Gemini API key passed to ``genai.Client``.
        """
        self.client = genai.Client(api_key=api_key)
        self.model = "gemini-2.5-flash-image-preview"

    def save_binary_file(self, file_name, data):
        """Write binary ``data`` (an image) to ``file_name``.

        Returns:
            True when the file was written, False when writing failed (the
            failure is printed, not raised).
        """
        try:
            with open(file_name, "wb") as f:
                f.write(data)
        except (OSError, TypeError) as e:
            print(f"Erreur lors de la sauvegarde du fichier {file_name}: {e}")
            return False
        return True

    @staticmethod
    def _iter_inline_images(chunk):
        """Yield ``(data, mime_type)`` for every image part of a stream chunk.

        All parts of the first candidate are inspected, so an image that
        follows a text part in the same chunk is not missed.
        """
        candidates = chunk.candidates
        if not candidates:
            return
        content = candidates[0].content
        if not content or not content.parts:
            return
        for part in content.parts:
            inline = getattr(part, "inline_data", None)
            if inline and inline.data:
                yield inline.data, inline.mime_type

    def generate_page(self, prompt, page_number, task_id):
        """Send ``prompt`` to Gemini and save every image it streams back.

        The first image of a page is saved as
        ``<UPLOAD_FOLDER>/page_<page_number>_<task_id><ext>``; any further
        image of the same page gets an extra ``_<index>`` suffix so it does
        not overwrite the previous one.

        Args:
            prompt: Text prompt describing the page.
            page_number: 1-based page index used in the file name.
            task_id: Identifier of the owning task, used in the file name.

        Returns:
            List of paths of the image files written for this page. If the
            request fails part-way, the images saved so far are still
            returned; the list is empty when nothing was saved.
        """
        generated_files = []
        try:
            contents = [types.Content(role="user", parts=[types.Part.from_text(text=prompt)])]
            generate_content_config = types.GenerateContentConfig(response_modalities=["IMAGE", "TEXT"])

            stream = self.client.models.generate_content_stream(
                model=self.model,
                contents=contents,
                config=generate_content_config,
            )
            for chunk in stream:
                for data_buffer, mime_type in self._iter_inline_images(chunk):
                    # guess_extension() cannot handle a missing mime type.
                    guessed = mimetypes.guess_extension(mime_type) if mime_type else None
                    file_extension = guessed or '.png'
                    # Only additional images get an index, so the usual
                    # one-image-per-page file name is unchanged.
                    suffix = f"_{len(generated_files)}" if generated_files else ""
                    full_path = f"{UPLOAD_FOLDER}/page_{page_number}_{task_id}{suffix}{file_extension}"
                    if self.save_binary_file(full_path, data_buffer):
                        generated_files.append(full_path)
        except Exception as e:
            # Boundary with the remote API: report the failure but keep the
            # images that were already written so they are not orphaned.
            print(f"Erreur lors de la génération de la page {page_number}: {e}")

        return generated_files
|
|
|
|
|
|
|
|
|
def create_zip(image_paths, output_path):
    """Create a ZIP archive from a list of image paths.

    Files are stored flat (base name only) in natural page order, so
    ``page_2`` comes before ``page_10``. Paths that do not exist are skipped
    and a path listed several times is archived once.

    Args:
        image_paths: Iterable of image file paths.
        output_path: Destination path of the archive.

    Returns:
        True when the archive was written, False on failure (the failure is
        printed, not raised).
    """
    import re  # local import: keeps this function independent of the file header

    digit_runs = re.compile(r'(\d+)')

    def natural_key(path):
        # Splitting on a capturing group alternates text / digits, so the
        # str and int items always line up position by position.
        pieces = digit_runs.split(os.path.basename(path))
        return [int(p) if p.isdigit() else p for p in pieces], path

    # dict.fromkeys removes duplicates while staying deterministic.
    unique_paths = sorted(dict.fromkeys(image_paths), key=natural_key)

    try:
        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for img_path in unique_paths:
                if os.path.exists(img_path):
                    zipf.write(img_path, os.path.basename(img_path))
    except (OSError, ValueError, zipfile.BadZipFile, zipfile.LargeZipFile) as e:
        print(f"Erreur lors de la création de l'archive ZIP: {e}")
        return False
    return True
|
|
|
|
|
|
|
|
|
def _part_sort_key(part_key):
    """Sort key for 'partie-N' names: numeric index first, malformed keys last."""
    try:
        return (0, int(part_key.split('-')[1]))
    except (IndexError, ValueError):
        # A key such as 'partie-' or 'partie-abc' must not abort the task.
        return (1, 0)


def generate_manga_task(manga_data, task_id):
    """Background (thread) job that generates every page of a manga.

    Progress and the final result are written into ``active_tasks[task_id]``:
    ``status`` goes through 'generating' and 'creating_zip' and ends as
    'completed' (with ``zip_path`` and ``completed_at``) or 'error' (with
    ``error``).

    Args:
        manga_data: Mapping of 'partie-N' keys to prompts; pages are
            generated in ascending N.
        task_id: Key of the task entry in ``active_tasks``.
    """
    task = active_tasks[task_id]
    try:
        api_key = os.environ.get("GEMINI_API_KEY")
        if not api_key:
            task.update({'status': 'error', 'error': 'API Key Gemini non trouvée'})
            return

        generator = MangaGenerator(api_key)

        task.update({
            'status': 'generating',
            'total_pages': len(manga_data),
            'current_page': 0,
            'generated_files': [],
        })

        # sorted() is stable, so parts with equal keys keep submission order.
        sorted_parts = sorted(manga_data.items(), key=lambda item: _part_sort_key(item[0]))
        total = len(sorted_parts)

        for i, (part_key, prompt) in enumerate(sorted_parts, 1):
            task.update({'current_page': i, 'current_part': part_key})
            print(f"Génération de la page {i}/{total} pour la tâche {task_id}")

            page_files = generator.generate_page(prompt, i, task_id)
            task['generated_files'].extend(page_files)

            # Pause between API calls only; nothing follows the last page.
            if i < total:
                time.sleep(1)

        # Do not report success with an empty archive when every page failed.
        if not task['generated_files']:
            task.update({'status': 'error', 'error': "Aucune image n'a pu être générée"})
            return

        task['status'] = 'creating_zip'
        zip_path = f"{ZIP_FOLDER}/manga_{task_id}.zip"

        if create_zip(task['generated_files'], zip_path):
            task.update({
                'status': 'completed',
                'zip_path': zip_path,
                'completed_at': datetime.now().isoformat(),
            })
        else:
            task.update({
                'status': 'error',
                'error': "Erreur lors de la création de l'archive ZIP",
            })

    except Exception as e:
        # Thread boundary: record the failure on the task so the client's
        # status polling can see it instead of the thread dying silently.
        task.update({'status': 'error', 'error': str(e)})
        print(f"Erreur majeure dans la tâche {task_id}: {e}")
|
|
|
|
|
|
|
|
|
@app.route('/')
def index():
    """Render the application's main page from the 'index.html' template."""
    template_name = 'index.html'
    return render_template(template_name)
|
|
|
@app.route('/generated_pages/<filename>')
def serve_generated_image(filename):
    """Serve one generated image so the web page can show pages as they arrive.

    Args:
        filename: Base name of a file inside ``UPLOAD_FOLDER``.
    """
    # Images are written relative to the current working directory, while
    # send_from_directory() resolves a relative directory against the Flask
    # application root. An absolute path makes both agree.
    return send_from_directory(os.path.abspath(UPLOAD_FOLDER), filename)
|
|
|
@app.route('/generate', methods=['POST'])
def generate():
    """Start a new manga generation task.

    Expects a JSON object whose 'partie-N' keys hold the page prompts.

    Returns:
        JSON ``{'task_id', 'status': 'queued'}`` on success; a 400 error when
        the body is missing, is not a JSON object, or has no 'partie-' key;
        a 500 error when the task could not be started.
    """
    # silent=True yields None for a missing or malformed body, so a client
    # mistake is answered with 400 below rather than surfacing as a 500.
    data = request.get_json(silent=True)
    if not data:
        return jsonify({'error': 'Aucune donnée JSON fournie'}), 400

    # Valid JSON that is not an object (e.g. a list) has no parts to read.
    manga_parts = {}
    if isinstance(data, dict):
        manga_parts = {k: v for k, v in data.items() if k.startswith('partie-')}
    if not manga_parts:
        return jsonify({'error': 'Aucune "partie-X" trouvée dans les données'}), 400

    task_id = str(uuid.uuid4())
    active_tasks[task_id] = {
        'status': 'queued',
        'created_at': datetime.now().isoformat(),
    }

    try:
        thread = threading.Thread(
            target=generate_manga_task,
            args=(manga_parts, task_id),
            daemon=True,
        )
        thread.start()
    except Exception as e:
        # Do not leave a task stuck in 'queued' when its thread never started.
        active_tasks.pop(task_id, None)
        return jsonify({'error': str(e)}), 500

    return jsonify({'task_id': task_id, 'status': 'queued'})
|
|
|
@app.route('/status/<task_id>')
def get_status(task_id):
    """Return the current state of a generation task as JSON.

    Responds with 404 when the task id is unknown. When the task has a
    'generated_files' entry, an 'image_urls' list pointing at the
    /generated_pages/ route is added to the response.
    """
    if task_id in active_tasks:
        # Shallow copy: 'image_urls' is added to the response only, never
        # to the stored task.
        snapshot = dict(active_tasks[task_id])

        if 'generated_files' in snapshot:
            urls = []
            for file_path in snapshot['generated_files']:
                urls.append(f"/generated_pages/{os.path.basename(file_path)}")
            snapshot['image_urls'] = urls

        return jsonify(snapshot)

    return jsonify({'error': 'Tâche non trouvée'}), 404
|
|
|
@app.route('/download/<task_id>')
def download_zip(task_id):
    """Send the final ZIP archive of a completed task as a download.

    Returns:
        The archive as an attachment; 404 when the task or the file is
        missing; 400 when the task is not completed yet.
    """
    task = active_tasks.get(task_id)
    if task is None:
        return jsonify({'error': 'Tâche non trouvée'}), 404

    zip_path = task.get('zip_path')
    if task.get('status') != 'completed' or not zip_path:
        return jsonify({'error': 'Archive ZIP non disponible ou non finalisée'}), 400

    # The archive is created relative to the current working directory, but
    # Flask's send_file() resolves relative paths against the application
    # root. Check and send the same absolute path.
    absolute_zip_path = os.path.abspath(zip_path)
    if not os.path.isfile(absolute_zip_path):
        return jsonify({'error': 'Fichier ZIP non trouvé sur le serveur'}), 404

    return send_file(absolute_zip_path, as_attachment=True, download_name=f'manga_{task_id}.zip')
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Warn early: without the key every generation task ends in error.
    if not os.environ.get("GEMINI_API_KEY"):
        print("\n⚠️ ATTENTION: La variable d'environnement GEMINI_API_KEY n'est pas définie!")
        print(" Sous Linux/macOS, utilisez: export GEMINI_API_KEY='votre_clé_api'")
        print(" Sous Windows (cmd), utilisez: set GEMINI_API_KEY=votre_clé_api\n")

    # The server listens on every interface, and the Werkzeug debugger allows
    # arbitrary code execution, so debug mode is opt-in via FLASK_DEBUG
    # instead of always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)