# ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- from flask import Flask, render_template, request, jsonify, send_file, send_from_directory import os import json import uuid import threading import time from datetime import datetime import mimetypes from google import genai from google.genai import types import zipfile # Bibliothèque pour créer des archives ZIP # ----------------------------------------------------------------------------- # Initialisation de l'application Flask et Configuration # ----------------------------------------------------------------------------- app = Flask(__name__) # Configuration des dossiers de sortie UPLOAD_FOLDER = 'generated_pages' ZIP_FOLDER = 'generated_zips' os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(ZIP_FOLDER, exist_ok=True) # Dictionnaire pour stocker l'état des tâches en cours active_tasks = {} # ----------------------------------------------------------------------------- # Classe pour interagir avec l'API Gemini # ----------------------------------------------------------------------------- class MangaGenerator: """Gère la communication avec l'API Google Gemini pour générer des images.""" def __init__(self, api_key): self.client = genai.Client(api_key=api_key) self.model = "gemini-2.5-flash-image-preview" def save_binary_file(self, file_name, data): """Sauvegarde les données binaires (image) dans un fichier.""" try: with open(file_name, "wb") as f: f.write(data) return True except Exception as e: print(f"Erreur lors de la sauvegarde du fichier {file_name}: {e}") return False def generate_page(self, prompt, page_number, task_id): """Envoie un prompt à Gemini et sauvegarde l'image générée.""" try: contents = [types.Content(role="user", parts=[types.Part.from_text(text=prompt)])] generate_content_config = types.GenerateContentConfig(response_modalities=["IMAGE", "TEXT"]) generated_files = [] for chunk in self.client.models.generate_content_stream( model=self.model, contents=contents, config=generate_content_config, ): if (chunk.candidates and chunk.candidates[0].content and chunk.candidates[0].content.parts): part = chunk.candidates[0].content.parts[0] if part.inline_data and part.inline_data.data: file_name_base = f"{UPLOAD_FOLDER}/page_{page_number}_{task_id}" data_buffer = part.inline_data.data file_extension = mimetypes.guess_extension(part.inline_data.mime_type) or '.png' full_path = f"{file_name_base}{file_extension}" if self.save_binary_file(full_path, data_buffer): generated_files.append(full_path) return generated_files except Exception as e: print(f"Erreur lors de la génération de la page {page_number}: {e}") return [] # ----------------------------------------------------------------------------- # Fonctions Utilitaires # ----------------------------------------------------------------------------- def create_zip(image_paths, output_path): """Crée une archive ZIP à partir d'une liste de chemins d'images.""" try: with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for img_path in sorted(image_paths): if os.path.exists(img_path): # On ajoute l'image à l'archive avec son nom de base (sans le chemin du dossier) zipf.write(img_path, os.path.basename(img_path)) return True except Exception as e: print(f"Erreur lors de la création de l'archive ZIP: {e}") return False # ----------------------------------------------------------------------------- # Tâche de fond pour la génération # ----------------------------------------------------------------------------- def generate_manga_task(manga_data, task_id): """ Tâche exécutée en arrière-plan (thread) pour générer toutes les pages du manga. """ try: api_key = os.environ.get("GEMINI_API_KEY") if not api_key: active_tasks[task_id].update({'status': 'error', 'error': 'API Key Gemini non trouvée'}) return generator = MangaGenerator(api_key) # Initialisation de la tâche active_tasks[task_id].update({ 'status': 'generating', 'total_pages': len(manga_data), 'current_page': 0, 'generated_files': [] # Important pour le suivi progressif }) # Trier les parties pour les générer dans l'ordre (partie-1, partie-2, etc.) sorted_parts = sorted(manga_data.items(), key=lambda x: int(x[0].split('-')[1])) # Boucle de génération pour chaque page for i, (part_key, prompt) in enumerate(sorted_parts, 1): active_tasks[task_id].update({'current_page': i, 'current_part': part_key}) print(f"Génération de la page {i}/{len(sorted_parts)} pour la tâche {task_id}") page_files = generator.generate_page(prompt, i, task_id) active_tasks[task_id]['generated_files'].extend(page_files) time.sleep(1) # Petite pause pour éviter de surcharger l'API # Création de l'archive ZIP active_tasks[task_id]['status'] = 'creating_zip' zip_path = f"{ZIP_FOLDER}/manga_{task_id}.zip" if create_zip(active_tasks[task_id]['generated_files'], zip_path): active_tasks[task_id].update({ 'status': 'completed', 'zip_path': zip_path, 'completed_at': datetime.now().isoformat() }) else: active_tasks[task_id].update({ 'status': 'error', 'error': "Erreur lors de la création de l'archive ZIP" }) except Exception as e: active_tasks[task_id].update({'status': 'error', 'error': str(e)}) print(f"Erreur majeure dans la tâche {task_id}: {e}") # ----------------------------------------------------------------------------- # Routes de l'API Flask # ----------------------------------------------------------------------------- @app.route('/') def index(): """Sert la page principale de l'application.""" return render_template('index.html') @app.route('/generated_pages/') def serve_generated_image(filename): """ Sert les fichiers images individuels pour l'affichage progressif sur la page web. """ return send_from_directory(UPLOAD_FOLDER, filename) @app.route('/generate', methods=['POST']) def generate(): """ Démarre une nouvelle tâche de génération de manga. """ try: data = request.get_json() if not data: return jsonify({'error': 'Aucune donnée JSON fournie'}), 400 manga_parts = {k: v for k, v in data.items() if k.startswith('partie-')} if not manga_parts: return jsonify({'error': 'Aucune "partie-X" trouvée dans les données'}), 400 task_id = str(uuid.uuid4()) active_tasks[task_id] = { 'status': 'queued', 'created_at': datetime.now().isoformat() } # Lancer la génération dans un thread pour ne pas bloquer la requête thread = threading.Thread(target=generate_manga_task, args=(manga_parts, task_id)) thread.daemon = True thread.start() return jsonify({'task_id': task_id, 'status': 'queued'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/status/') def get_status(task_id): """ Retourne le statut actuel d'une tâche de génération. """ if task_id not in active_tasks: return jsonify({'error': 'Tâche non trouvée'}), 404 task = active_tasks[task_id].copy() # Transformer les chemins de fichiers locaux en URLs accessibles par le navigateur if 'generated_files' in task: task['image_urls'] = [f"/generated_pages/{os.path.basename(p)}" for p in task['generated_files']] return jsonify(task) @app.route('/download/') def download_zip(task_id): """ Permet de télécharger l'archive ZIP finale. """ if task_id not in active_tasks: return jsonify({'error': 'Tâche non trouvée'}), 404 task = active_tasks[task_id] if task.get('status') != 'completed' or 'zip_path' not in task: return jsonify({'error': 'Archive ZIP non disponible ou non finalisée'}), 400 zip_path = task['zip_path'] if not os.path.exists(zip_path): return jsonify({'error': 'Fichier ZIP non trouvé sur le serveur'}), 404 return send_file(zip_path, as_attachment=True, download_name=f'manga_{task_id}.zip') # ----------------------------------------------------------------------------- # Démarrage de l'application # ----------------------------------------------------------------------------- if __name__ == '__main__': # Vérification de la présence de la clé API au démarrage if not os.environ.get("GEMINI_API_KEY"): print("\n⚠️ ATTENTION: La variable d'environnement GEMINI_API_KEY n'est pas définie!") print(" Sous Linux/macOS, utilisez: export GEMINI_API_KEY='votre_clé_api'") print(" Sous Windows (cmd), utilisez: set GEMINI_API_KEY=votre_clé_api\n") app.run(debug=True, host='0.0.0.0', port=5000)