import json
import requests
import re
def normalize_line_name(line_name):
"""
Normalise les noms de lignes pour éviter les doublons causés par des préfixes ou variations.
Exemples :
- "RATP Métro 14" devient "Métro 14"
- "SNCF RER A" devient "RER A"
"""
line_name = line_name.lower() # Convertir en minuscule pour uniformiser
line_name = re.sub(r"\b(ratp|sncf)\b", "", line_name) # Supprimer les préfixes "RATP" ou "SNCF"
line_name = re.sub(r"\s+", " ", line_name).strip() # Supprimer les espaces multiples et les espaces de début/fin
return line_name.capitalize() # Capitaliser la première lettre pour cohérence
# Configurer l'API
headers = {
'accept': 'application/json',
'apiKey': '3WsgOWybmrTiEwa3q8ZsvovvwPkrctnX' # Remplacez par votre clé API
}
base_url_api = 'https://prim.iledefrance-mobilites.fr/marketplace/v2/navitia/line_reports/physical_modes/physical_mode%3A'
categories = {
"Métro": "Metro",
"RER": "RapidTransit",
"Bus": "Bus",
"Transilien": "LocalTrain",
"Tram": "Tramway" # Nouvelle catégorie
}
# Fonction pour récupérer les perturbations par catégorie
def fetch_disruptions(category_key, category_value):
url_api = f"{base_url_api}{category_value}/line_reports?"
try:
response_api = requests.get(url_api, headers=headers)
response_api.raise_for_status() # Vérifie les erreurs HTTP
data = response_api.json() # Convertit la réponse en JSON
print(f"Données récupérées avec succès pour {category_key}.")
return data.get("disruptions", [])
except requests.exceptions.RequestException as e:
print(f"Erreur lors de la récupération des données pour {category_key}: {e}")
return []
def extract_lines_from_message(messages):
"""
Extraire les lignes mentionnées dans les messages d'une perturbation.
"""
lines = []
for msg in messages:
text = msg.get("text", "")
# Rechercher des noms de lignes tels que "Métro 1", "RER A", "Bus 72", etc.
matches = re.findall(r"(Métro \d+|RER [A-Z]|Bus \d+|Transilien \w+|Tram \d+)", text)
lines.extend(matches)
return lines or ["Toutes les lignes"]
# Extraire les perturbations actives pour chaque catégorie
all_disruptions = {}
for category, api_value in categories.items():
disruptions = fetch_disruptions(category, api_value)
active_disruptions = [
{
"id": disruption.get("id"),
"cause": disruption.get("cause"),
"severity": disruption.get("severity", {}).get("name", "N/A"),
"effect": disruption.get("effect"),
"message": disruption.get("messages")[0].get("text") if disruption.get("messages") else "No message",
# Compléter les lignes impactées avec celles extraites des messages si nécessaire
"impacted_lines": [
obj.get("pt_object", {}).get("name", "Unknown Line")
for obj in disruption.get("impacted_objects", [])
if obj.get("pt_object", {}).get("embedded_type") == "line"
] or extract_lines_from_message(disruption.get("messages", [])),
}
for disruption in disruptions
if disruption.get("status") == "active"
and all(excluded not in (disruption.get("cause", "").lower() + " " +
" ".join([msg.get("text", "").lower() for msg in disruption.get("messages", [])]))
for excluded in ["ascenseur", "transdev"])
]
all_disruptions[category] = active_disruptions
# Générer le fichier HTML avec un récapitulatif filtré
html_output_path = "./tmp/traffic_report.html"
with open(html_output_path, "w", encoding="utf-8") as html_file:
# Collecter les lignes ayant des problèmes sans doublons et filtrer par gravité pour le récapitulatif
seen_lines = set()
summary_data = []
# Gravités à inclure uniquement pour le récapitulatif
allowed_severities = {"perturbée", "bloquante"}
for category, disruptions in all_disruptions.items():
for disruption in disruptions:
# Filtrer par gravité uniquement pour le récapitulatif
severity = disruption.get("severity", "").lower()
if severity not in allowed_severities:
continue # Ignorer dans le récapitulatif si la gravité ne correspond pas
for line in disruption["impacted_lines"]:
# Normaliser le nom de la ligne avant de vérifier les doublons
normalized_line = normalize_line_name(line)
if normalized_line not in seen_lines:
summary_data.append({
"category": category,
"line": normalized_line, # Utiliser la version normalisée
"cause": disruption["cause"],
"severity": disruption["severity"]
})
seen_lines.add(normalized_line) # Ajouter la version normalisée à l'ensemble
# Générer le fichier HTML
html_file.write(f"""
Rapport des Perturbations
Rapport des Perturbations
Récapitulatif des Lignes Impactées (Perturbée / Bloquante)
Catégorie
Ligne
Cause
Gravité
""")
# Ajouter les lignes impactées uniques au récapitulatif avec des couleurs selon la gravité
for item in summary_data:
severity_class = "severity-perturbee" if item["severity"].lower() == "perturbée" else "severity-bloquante"
html_file.write(f"""
{item["category"]}
{item["line"]}
{item["cause"]}
{item["severity"]}
""")
# Fermer la section récapitulative
html_file.write("""
""")
# Reste de la génération des détails des perturbations
for category, disruptions in all_disruptions.items():
html_file.write(f"
{category}
")
html_file.write("""
Ligne
Cause
Gravité
Message
""")
for disruption in disruptions:
for line in disruption["impacted_lines"]:
html_file.write(f"""
{line}
{disruption["cause"]}
{disruption["severity"]}
{disruption["message"]}
""")
html_file.write("""
""")
# Fermer les balises HTML
html_file.write("""
""")
print(f"HTML report generated at {html_output_path}")
def generate_traffic_html():
all_disruptions = {}
for category, api_value in categories.items():
disruptions = fetch_disruptions(category, api_value)
active_disruptions = [
{
"id": disruption.get("id"),
"cause": disruption.get("cause"),
"severity": disruption.get("severity", {}).get("name", "N/A"),
"effect": disruption.get("effect"),
"message": disruption["messages"][0].get("text") if disruption.get("messages") else "No message",
"impacted_lines": [
obj.get("pt_object", {}).get("name", "Unknown Line")
for obj in disruption.get("impacted_objects", [])
if obj.get("pt_object", {}).get("embedded_type") == "line"
] or extract_lines_from_message(disruption.get("messages", [])),
}
for disruption in disruptions
if disruption.get("status") == "active"
and all(excluded not in (disruption.get("cause", "").lower() + " " +
" ".join([msg.get("text", "").lower() for msg in disruption.get("messages", [])]))
for excluded in ["ascenseur", "transdev"])
]
all_disruptions[category] = active_disruptions
# Construction des données du récapitulatif
seen_lines = set()
summary_data = []
allowed_severities = {"perturbée", "bloquante"}
for category, disruptions in all_disruptions.items():
for disruption in disruptions:
severity = disruption.get("severity", "").lower()
if severity not in allowed_severities:
continue
for line in disruption["impacted_lines"]:
normalized_line = normalize_line_name(line)
if normalized_line not in seen_lines:
summary_data.append({
"category": category,
"line": normalized_line,
"cause": disruption["cause"],
"severity": disruption["severity"]
})
seen_lines.add(normalized_line)
# Génération du HTML avec un menu déroulant pour filtrer par catégorie
# Récupérer la liste unique des catégories présentes
categories_presentes = sorted(list(all_disruptions.keys()))
html_content = f"""
Rapport des Perturbations
Rapport des Perturbations
Récapitulatif des Lignes Impactées (Perturbée / Bloquante)
Catégorie
Ligne
Cause
Gravité
"""
# Ajouter les lignes impactées uniques au récapitulatif
for item in summary_data:
severity_class = "severity-perturbee" if item["severity"].lower() == "perturbée" else "severity-bloquante"
# Ajouter un data-attribute pour la catégorie
html_content += f"""
{item["category"]}
{item["line"]}
{item["cause"]}
{item["severity"]}
"""
html_content += """
"""
# Ajouter les détails des perturbations
for category, disruptions in all_disruptions.items():
# Ajouter un data-category au
html_content += f'
{category}
'
html_content += f"""
Ligne
Cause
Gravité
Message
"""
for disruption in disruptions:
for line in disruption["impacted_lines"]:
sev_class = "severity-perturbee" if disruption["severity"].lower() == "perturbée" else "severity-bloquante"
html_content += f"""
{line}
{disruption["cause"]}
{disruption["severity"]}
{disruption["message"]}
"""
html_content += """
"""
# Après avoir fini de générer le HTML, ajouter le script de filtrage
html_content += """
"""
return html_content