import os import pandas as pd from datetime import date import gradio as gr import networkx as nx import ast import plotly.graph_objects as go import plotly.express as px # Load the CSV file df = pd.read_csv("https://jeodpp.jrc.ec.europa.eu/ftp/jrc-opendata/ETOHA/storylines/emdat2.csv", sep=',', header=0, dtype=str, encoding='utf-8') def try_parse_date(y, m, d): try: if not y or not m or not d: return None return date(int(float(y)), int(float(m)), int(float(d))) except (ValueError, TypeError): return None def plot_cgraph(grp): if not grp: return None source, relations, target = list(zip(*grp)) kg_df = pd.DataFrame({'source': source, 'target': target, 'edge': relations}) G = nx.from_pandas_edgelist(kg_df, "source", "target", edge_attr='edge', create_using=nx.MultiDiGraph()) pos = nx.spring_layout(G, k=1.5, iterations=100) # Separate edges based on their color edge_colors_dict = {"causes": "red", "prevents": "green"} traces = [] for color in edge_colors_dict.values(): edge_x = [] edge_y = [] for u, v, key in G.edges(keys=True): current_color = edge_colors_dict.get(G[u][v][key]['edge'], 'black') if current_color == color: x0, y0 = pos[u] x1, y1 = pos[v] edge_x.extend([x0, x1, None]) edge_y.extend([y0, y1, None]) trace = go.Scatter(x=edge_x, y=edge_y, line=dict(width=2, color=color), hoverinfo='none', mode='lines') traces.append(trace) node_x = [] node_y = [] node_text = [] for node in G.nodes(): x, y = pos[node] node_x.append(x) node_y.append(y) node_text.append(node) node_trace = go.Scatter( x=node_x, y=node_y, mode='markers+text', text=node_text, marker=dict(size=10, color='skyblue', line_width=2), textposition="top center", hoverinfo='text' ) traces.append(node_trace) fig = go.Figure(data=traces, layout=go.Layout(showlegend=False, hovermode='closest', margin=dict(b=20, l=5, r=5, t=40))) return fig def display_info(selected_row_str, country, year, month, day): additional_fields = [ "Country", "ISO", "Subregion", "Region", "Location", "Origin", "Disaster Group", "Disaster Subgroup", "Disaster Type", "Disaster Subtype", "External IDs", "Event Name", "Associated Types", "OFDA/BHA Response", "Appeal", "Declaration", "AID Contribution ('000 US$)", "Magnitude", "Magnitude Scale", "Latitude", "Longitude", "River Basin", "Total Deaths", "No. Injured", "No. Affected", "No. Homeless", "Total Affected", "Reconstruction Costs ('000 US$)", "Reconstruction Costs, Adjusted ('000 US$)", "Insured Damage ('000 US$)", "Insured Damage, Adjusted ('000 US$)", "Total Damage ('000 US$)", "Total Damage, Adjusted ('000 US$)", "CPI", "Admin Units", ] if selected_row_str is None or selected_row_str == '': return ('', '', '', '', '', '', '', None, '', '') + tuple([''] * len(additional_fields)) filtered_df = df if country: filtered_df = filtered_df[filtered_df['Country'] == country] selected_date = try_parse_date(year, month, day) if selected_date: filtered_df = filtered_df[filtered_df.apply( lambda row: ( (try_parse_date(row['Start Year'], "01" if row['Start Month'] == "" else row['Start Month'], "01" if row['Start Day'] == "" else row['Start Day']) is not None) and (try_parse_date(row['End Year'], "01" if row['End Month'] == "" else row['End Month'], "01" if row['End Day'] == "" else row['End Day']) is not None) and (try_parse_date(row['Start Year'], "01" if row['Start Month'] == "" else row['Start Month'], "01" if row['Start Day'] == "" else row['Start Day']) <= selected_date <= try_parse_date(row['End Year'], "01" if row['End Month'] == "" else row['End Month'], "01" if row['End Day'] == "" else row['End Day'])) ), axis=1)] else: if year: sstart = None eend = None if month: try: sstart = try_parse_date(year, month, "01") eend = try_parse_date(year, int(float(month)) + 1, "01") except Exception as err: sstart = None eend = None if sstart and eend: filtered_df = filtered_df[filtered_df.apply( lambda row: ( (try_parse_date(row['Start Year'], "01" if row['Start Month'] == "" else row['Start Month'], "01" if row['Start Day'] == "" else row['Start Day']) is not None) and (sstart <= try_parse_date(row['Start Year'], "01" if row['Start Month'] == "" else row['Start Month'], "01" if row['Start Day'] == "" else row['Start Day']) < eend) ), axis=1)] else: try: sstart = try_parse_date(year, "01", "01") eend = try_parse_date(year, "12", "31") except Exception as err: sstart = None eend = None if sstart and eend: filtered_df = filtered_df[filtered_df.apply( lambda row: ( (try_parse_date(row['Start Year'], "01" if row['Start Month'] == "" else row['Start Month'], "01" if row['Start Day'] == "" else row['Start Day']) is not None) and (sstart <= try_parse_date(row['Start Year'], "01" if row['Start Month'] == "" else row['Start Month'], "01" if row['Start Day'] == "" else row['Start Day']) <= eend) ), axis=1)] row_data = filtered_df[filtered_df['DisNo.'] == selected_row_str].squeeze() if not row_data.empty: key_information = row_data.get('key information', '') severity = row_data.get('severity', '') key_drivers = row_data.get('key drivers', '') impacts_exposure_vulnerability = row_data.get('main impacts, exposure, and vulnerability', '') likelihood_multi_hazard = row_data.get('likelihood of multi-hazard risks', '') best_practices = row_data.get('best practices for managing this risk', '') recommendations = row_data.get('recommendations and supportive measures for recovery', '') causal_graph_caption = row_data.get('causal graph', '') grp = ast.literal_eval(causal_graph_caption) if causal_graph_caption else [] causal_graph_plot = plot_cgraph(grp) start_date = try_parse_date(row_data['Start Year'], row_data['Start Month'], row_data['Start Day']) start_date_str = start_date.strftime('%Y-%m-%d') if start_date else 'N/A' end_date = try_parse_date(row_data['End Year'], row_data['End Month'], row_data['End Day']) end_date_str = end_date.strftime('%Y-%m-%d') if end_date else 'N/A' additional_data = [row_data.get(field, '') for field in additional_fields] return ( key_information, severity, key_drivers, impacts_exposure_vulnerability, likelihood_multi_hazard, best_practices, recommendations, causal_graph_plot, start_date_str, end_date_str ) + tuple(additional_data) else: return ('', '', '', '', '', '', '', None, '', '') + tuple([''] * len(additional_fields)) def update_row_dropdown(country, year, month, day): filtered_df = df if country: filtered_df = filtered_df[filtered_df['Country'] == country] selected_date = try_parse_date(year, month, day) if selected_date: filtered_df = filtered_df[filtered_df.apply( lambda row: ( (try_parse_date(row['Start Year'], "01" if row['Start Month'] == "" else row['Start Month'], "01" if row['Start Day'] == "" else row['Start Day']) is not None) and (try_parse_date(row['End Year'], "01" if row['End Month'] == "" else row['End Month'], "01" if row['End Day'] == "" else row['End Day']) is not None) and (try_parse_date(row['Start Year'], "01" if row['Start Month'] == "" else row['Start Month'], "01" if row['Start Day'] == "" else row['Start Day']) <= selected_date <= try_parse_date(row['End Year'], "01" if row['End Month'] == "" else row['End Month'], "01" if row['End Day'] == "" else row['End Day'])) ), axis=1)] else: if year: sstart = None eend = None if month: try: sstart = try_parse_date(year, month, "01") eend = try_parse_date(year, int(float(month)) + 1, "01") except Exception as err: sstart = None eend = None if sstart and eend: filtered_df = filtered_df[filtered_df.apply( lambda row: ( (try_parse_date(row['Start Year'], "01" if row['Start Month'] == "" else row['Start Month'], "01" if row['Start Day'] == "" else row['Start Day']) is not None) and (sstart <= try_parse_date(row['Start Year'], "01" if row['Start Month'] == "" else row['Start Month'], "01" if row['Start Day'] == "" else row['Start Day']) < eend) ), axis=1)] else: try: sstart = try_parse_date(year, "01", "01") eend = try_parse_date(year, "12", "31") except Exception as err: sstart = None eend = None if sstart and eend: filtered_df = filtered_df[filtered_df.apply( lambda row: ( (try_parse_date(row['Start Year'], "01" if row['Start Month'] == "" else row['Start Month'], "01" if row['Start Day'] == "" else row['Start Day']) is not None) and (sstart <= try_parse_date(row['Start Year'], "01" if row['Start Month'] == "" else row['Start Month'], "01" if row['Start Day'] == "" else row['Start Day']) <= eend) ), axis=1)] choices = filtered_df['DisNo.'].tolist() if not filtered_df.empty else [] return gr.update(choices=choices, value=choices[0] if choices else None) def build_interface(): with gr.Blocks() as interface: gr.Markdown("## From Data to Narratives: AI-Enhanced Disaster and Health Threats Storylines") gr.Markdown( "This Gradio app complements Health Threats and Disaster event data...
" "Select an event data below..." ) if not df.empty: start_years = df["Start Year"].dropna().unique() end_years = df["End Year"].dropna().unique() years = set(start_years.astype(int).tolist() + end_years.astype(int).tolist()) year_choices = sorted(years) else: year_choices = [] country_dropdown = gr.Dropdown(choices=[''] + df['Country'].unique().tolist(), label="Select Country") year_dropdown = gr.Dropdown(choices=[""] + [str(year) for year in year_choices], label="Select Year") month_dropdown = gr.Dropdown(choices=[""] + [f"{i:02d}" for i in range(1, 13)], label="Select Month") day_dropdown = gr.Dropdown(choices=[""] + [f"{i:02d}" for i in range(1, 32)], label="Select Day") row_dropdown = gr.Dropdown(choices=[], label="Select Disaster Event #", interactive=True) additional_fields = [ "Country", "ISO", "Subregion", "Region", "Location", "Origin", "Disaster Group", "Disaster Subgroup", "Disaster Type", "Disaster Subtype", "External IDs", "Event Name", "Associated Types", "OFDA/BHA Response", "Appeal", "Declaration", "AID Contribution ('000 US$)", "Magnitude", "Magnitude Scale", "Latitude", "Longitude", "River Basin", "Total Deaths", "No. Injured", "No. Affected", "No. Homeless", "Total Affected", "Reconstruction Costs ('000 US$)", "Reconstruction Costs, Adjusted ('000 US$)", "Insured Damage ('000 US$)", "Insured Damage, Adjusted ('000 US$)", "Total Damage ('000 US$)", "Total Damage, Adjusted ('000 US$)", "CPI", "Admin Units", ] with gr.Row(): with gr.Column(): country_dropdown year_dropdown month_dropdown day_dropdown row_dropdown outputs = [ gr.Textbox(label="Key Information", interactive=False), gr.Textbox(label="Severity", interactive=False), gr.Textbox(label="Key Drivers", interactive=False), gr.Textbox(label="Main Impacts, Exposure, and Vulnerability", interactive=False), gr.Textbox(label="Likelihood of Multi-Hazard Risks", interactive=False), gr.Textbox(label="Best Practices for Managing This Risk", interactive=False), gr.Textbox(label="Recommendations and Supportive Measures for Recovery", interactive=False), gr.Plot(label="Causal Graph") ] with gr.Column(): outputs.extend([ gr.Textbox(label="Start Date", interactive=False), gr.Textbox(label="End Date", interactive=False) ]) for field in additional_fields: outputs.append(gr.Textbox(label=field, interactive=False)) country_dropdown.change( fn=update_row_dropdown, inputs=[country_dropdown, year_dropdown, month_dropdown, day_dropdown], outputs=row_dropdown ) year_dropdown.change( fn=update_row_dropdown, inputs=[country_dropdown, year_dropdown, month_dropdown, day_dropdown], outputs=row_dropdown ) month_dropdown.change( fn=update_row_dropdown, inputs=[country_dropdown, year_dropdown, month_dropdown, day_dropdown], outputs=row_dropdown ) day_dropdown.change( fn=update_row_dropdown, inputs=[country_dropdown, year_dropdown, month_dropdown, day_dropdown], outputs=row_dropdown ) row_dropdown.change( fn=display_info, inputs=[row_dropdown, country_dropdown, year_dropdown, month_dropdown, day_dropdown], outputs=outputs ) return interface app = build_interface() app.launch()