import json import requests import re def normalize_line_name(line_name): """ Normalise les noms de lignes pour éviter les doublons causés par des préfixes ou variations. Exemples : - "RATP Métro 14" devient "Métro 14" - "SNCF RER A" devient "RER A" """ line_name = line_name.lower() # Convertir en minuscule pour uniformiser line_name = re.sub(r"\b(ratp|sncf)\b", "", line_name) # Supprimer les préfixes "RATP" ou "SNCF" line_name = re.sub(r"\s+", " ", line_name).strip() # Supprimer les espaces multiples et les espaces de début/fin return line_name.capitalize() # Capitaliser la première lettre pour cohérence # Configurer l'API headers = { 'accept': 'application/json', 'apiKey': '3WsgOWybmrTiEwa3q8ZsvovvwPkrctnX' # Remplacez par votre clé API } base_url_api = 'https://prim.iledefrance-mobilites.fr/marketplace/v2/navitia/line_reports/physical_modes/physical_mode%3A' categories = { "Métro": "Metro", "RER": "RapidTransit", "Bus": "Bus", "Transilien": "LocalTrain", "Tram": "Tramway" # Nouvelle catégorie } # Fonction pour récupérer les perturbations par catégorie def fetch_disruptions(category_key, category_value): url_api = f"{base_url_api}{category_value}/line_reports?" try: response_api = requests.get(url_api, headers=headers) response_api.raise_for_status() # Vérifie les erreurs HTTP data = response_api.json() # Convertit la réponse en JSON print(f"Données récupérées avec succès pour {category_key}.") return data.get("disruptions", []) except requests.exceptions.RequestException as e: print(f"Erreur lors de la récupération des données pour {category_key}: {e}") return [] def extract_lines_from_message(messages): """ Extraire les lignes mentionnées dans les messages d'une perturbation. """ lines = [] for msg in messages: text = msg.get("text", "") # Rechercher des noms de lignes tels que "Métro 1", "RER A", "Bus 72", etc. matches = re.findall(r"(Métro \d+|RER [A-Z]|Bus \d+|Transilien \w+|Tram \d+)", text) lines.extend(matches) return lines or ["Toutes les lignes"] # Extraire les perturbations actives pour chaque catégorie all_disruptions = {} for category, api_value in categories.items(): disruptions = fetch_disruptions(category, api_value) active_disruptions = [ { "id": disruption.get("id"), "cause": disruption.get("cause"), "severity": disruption.get("severity", {}).get("name", "N/A"), "effect": disruption.get("effect"), "message": disruption.get("messages")[0].get("text") if disruption.get("messages") else "No message", # Compléter les lignes impactées avec celles extraites des messages si nécessaire "impacted_lines": [ obj.get("pt_object", {}).get("name", "Unknown Line") for obj in disruption.get("impacted_objects", []) if obj.get("pt_object", {}).get("embedded_type") == "line" ] or extract_lines_from_message(disruption.get("messages", [])), } for disruption in disruptions if disruption.get("status") == "active" and all(excluded not in (disruption.get("cause", "").lower() + " " + " ".join([msg.get("text", "").lower() for msg in disruption.get("messages", [])])) for excluded in ["ascenseur", "transdev"]) ] all_disruptions[category] = active_disruptions # Générer le fichier HTML avec un récapitulatif filtré html_output_path = "./tmp/traffic_report.html" with open(html_output_path, "w", encoding="utf-8") as html_file: # Collecter les lignes ayant des problèmes sans doublons et filtrer par gravité pour le récapitulatif seen_lines = set() summary_data = [] # Gravités à inclure uniquement pour le récapitulatif allowed_severities = {"perturbée", "bloquante"} for category, disruptions in all_disruptions.items(): for disruption in disruptions: # Filtrer par gravité uniquement pour le récapitulatif severity = disruption.get("severity", "").lower() if severity not in allowed_severities: continue # Ignorer dans le récapitulatif si la gravité ne correspond pas for line in disruption["impacted_lines"]: # Normaliser le nom de la ligne avant de vérifier les doublons normalized_line = normalize_line_name(line) if normalized_line not in seen_lines: summary_data.append({ "category": category, "line": normalized_line, # Utiliser la version normalisée "cause": disruption["cause"], "severity": disruption["severity"] }) seen_lines.add(normalized_line) # Ajouter la version normalisée à l'ensemble # Générer le fichier HTML html_file.write(f""" Rapport des Perturbations

Rapport des Perturbations

Récapitulatif des Lignes Impactées (Perturbée / Bloquante)

""") # Ajouter les lignes impactées uniques au récapitulatif avec des couleurs selon la gravité for item in summary_data: severity_class = "severity-perturbee" if item["severity"].lower() == "perturbée" else "severity-bloquante" html_file.write(f""" """) # Fermer la section récapitulative html_file.write("""
Catégorie Ligne Cause Gravité
{item["category"]} {item["line"]} {item["cause"]} {item["severity"]}
""") # Reste de la génération des détails des perturbations for category, disruptions in all_disruptions.items(): html_file.write(f"

{category}

") html_file.write(""" """) for disruption in disruptions: for line in disruption["impacted_lines"]: html_file.write(f""" """) html_file.write("""
Ligne Cause Gravité Message
{line} {disruption["cause"]} {disruption["severity"]} {disruption["message"]}
""") # Fermer les balises HTML html_file.write(""" """) print(f"HTML report generated at {html_output_path}") def generate_traffic_html(): all_disruptions = {} for category, api_value in categories.items(): disruptions = fetch_disruptions(category, api_value) active_disruptions = [ { "id": disruption.get("id"), "cause": disruption.get("cause"), "severity": disruption.get("severity", {}).get("name", "N/A"), "effect": disruption.get("effect"), "message": disruption["messages"][0].get("text") if disruption.get("messages") else "No message", "impacted_lines": [ obj.get("pt_object", {}).get("name", "Unknown Line") for obj in disruption.get("impacted_objects", []) if obj.get("pt_object", {}).get("embedded_type") == "line" ] or extract_lines_from_message(disruption.get("messages", [])), } for disruption in disruptions if disruption.get("status") == "active" and all(excluded not in (disruption.get("cause", "").lower() + " " + " ".join([msg.get("text", "").lower() for msg in disruption.get("messages", [])])) for excluded in ["ascenseur", "transdev"]) ] all_disruptions[category] = active_disruptions # Construction des données du récapitulatif seen_lines = set() summary_data = [] allowed_severities = {"perturbée", "bloquante"} for category, disruptions in all_disruptions.items(): for disruption in disruptions: severity = disruption.get("severity", "").lower() if severity not in allowed_severities: continue for line in disruption["impacted_lines"]: normalized_line = normalize_line_name(line) if normalized_line not in seen_lines: summary_data.append({ "category": category, "line": normalized_line, "cause": disruption["cause"], "severity": disruption["severity"] }) seen_lines.add(normalized_line) # Génération du HTML avec un menu déroulant pour filtrer par catégorie # Récupérer la liste unique des catégories présentes categories_presentes = sorted(list(all_disruptions.keys())) html_content = f""" Rapport des Perturbations

Rapport des Perturbations

Récapitulatif des Lignes Impactées (Perturbée / Bloquante)

""" # Ajouter les lignes impactées uniques au récapitulatif for item in summary_data: severity_class = "severity-perturbee" if item["severity"].lower() == "perturbée" else "severity-bloquante" # Ajouter un data-attribute pour la catégorie html_content += f""" """ html_content += """
Catégorie Ligne Cause Gravité
{item["category"]} {item["line"]} {item["cause"]} {item["severity"]}
""" # Ajouter les détails des perturbations for category, disruptions in all_disruptions.items(): # Ajouter un data-category au

html_content += f'

{category}

' html_content += f""" """ for disruption in disruptions: for line in disruption["impacted_lines"]: sev_class = "severity-perturbee" if disruption["severity"].lower() == "perturbée" else "severity-bloquante" html_content += f""" """ html_content += """
Ligne Cause Gravité Message
{line} {disruption["cause"]} {disruption["severity"]} {disruption["message"]}
""" # Après avoir fini de générer le HTML, ajouter le script de filtrage html_content += """ """ return html_content