axel-darmouni's picture
update: docx use
2dd2794
import os
import pandas as pd
import json
import glob
from smolagents import tool
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import numpy as np
@tool
def load_previous_dataset() -> pd.DataFrame:
"""
Load the dataset that was used in the previous analysis.
Returns:
The pandas DataFrame that was used in the previous report generation
"""
try:
# Look for saved dataset in generated_data folder
dataset_files = glob.glob('generated_data/*dataset*.csv') + glob.glob('generated_data/*data*.csv')
if not dataset_files:
# Try to find any CSV file in generated_data
csv_files = glob.glob('generated_data/*.csv')
if csv_files:
dataset_files = csv_files
if not dataset_files:
raise Exception("No dataset found in generated_data folder")
# Use the most recent dataset file
latest_file = max(dataset_files, key=os.path.getctime)
df = pd.read_csv(latest_file)
print(f"✅ Loaded dataset from {latest_file} with {len(df)} rows and {len(df.columns)} columns")
return df
except Exception as e:
raise Exception(f"Error loading previous dataset: {str(e)}")
@tool
def get_dataset_summary(df: pd.DataFrame) -> str:
"""
Get a comprehensive summary of the dataset structure and content.
Args:
df: The pandas DataFrame to analyze
Returns:
A formatted string with dataset summary information
"""
try:
summary_lines = []
summary_lines.append("=== DATASET SUMMARY ===")
summary_lines.append(f"Shape: {df.shape[0]} rows × {df.shape[1]} columns")
summary_lines.append("")
summary_lines.append("Column Information:")
for col in df.columns:
dtype = str(df[col].dtype)
non_null = df[col].count()
null_count = df[col].isnull().sum()
unique_count = df[col].nunique()
summary_lines.append(f" • {col}: {dtype}, {non_null} non-null, {null_count} null, {unique_count} unique")
# Show sample values for categorical columns
if df[col].dtype == 'object' and unique_count <= 10:
sample_values = df[col].value_counts().head(5).index.tolist()
summary_lines.append(f" Sample values: {sample_values}")
summary_lines.append("")
summary_lines.append("First 3 rows:")
summary_lines.append(df.head(3).to_string())
return "\n".join(summary_lines)
except Exception as e:
return f"Error analyzing dataset: {str(e)}"
@tool
def create_followup_visualization(df: pd.DataFrame, chart_type: str, x_column: str, y_column: str = None, title: str = "Follow-up Analysis", filename: str = "followup_chart.png") -> str:
"""
Create a visualization for follow-up analysis.
Args:
df: The pandas DataFrame to visualize
chart_type: Type of chart ('bar', 'line', 'scatter', 'histogram', 'box', 'pie')
x_column: Column name for x-axis
y_column: Column name for y-axis (optional for some chart types)
title: Title for the chart
filename: Name of the file to save (should end with .png)
Returns:
Path to the saved visualization file
"""
try:
plt.figure(figsize=(12, 8))
if chart_type == 'bar':
if y_column:
df_grouped = df.groupby(x_column)[y_column].sum().sort_values(ascending=False)
plt.bar(range(len(df_grouped)), df_grouped.values)
plt.xticks(range(len(df_grouped)), df_grouped.index, rotation=45)
plt.ylabel(y_column)
else:
value_counts = df[x_column].value_counts().head(10)
plt.bar(range(len(value_counts)), value_counts.values)
plt.xticks(range(len(value_counts)), value_counts.index, rotation=45)
plt.ylabel('Count')
elif chart_type == 'line':
if y_column:
df_sorted = df.sort_values(x_column)
plt.plot(df_sorted[x_column], df_sorted[y_column])
plt.ylabel(y_column)
else:
value_counts = df[x_column].value_counts().sort_index()
plt.plot(value_counts.index, value_counts.values)
plt.ylabel('Count')
elif chart_type == 'scatter':
if y_column:
plt.scatter(df[x_column], df[y_column], alpha=0.6)
plt.ylabel(y_column)
else:
raise ValueError("Scatter plot requires both x_column and y_column")
elif chart_type == 'histogram':
plt.hist(df[x_column], bins=30, alpha=0.7)
plt.ylabel('Frequency')
elif chart_type == 'box':
if y_column:
df.boxplot(column=y_column, by=x_column)
else:
plt.boxplot(df[x_column])
plt.ylabel(x_column)
elif chart_type == 'pie':
value_counts = df[x_column].value_counts().head(10)
plt.pie(value_counts.values, labels=value_counts.index, autopct='%1.1f%%')
else:
raise ValueError(f"Unsupported chart type: {chart_type}")
plt.xlabel(x_column)
plt.title(title)
plt.tight_layout()
# Save to generated_data folder
if not filename.endswith('.png'):
filename += '.png'
filepath = os.path.join('generated_data', filename)
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
return f"Visualization saved to: {filepath}"
except Exception as e:
plt.close() # Ensure plot is closed even on error
return f"Error creating visualization: {str(e)}"
@tool
def get_previous_report_content() -> str:
"""
Get the content of the previously generated report.
Returns:
The text content of the previous report for context
"""
try:
# Look for DOCX files in generated_data
report_files = glob.glob('generated_data/*.docx')
if not report_files:
return "No previous report found in generated_data folder"
# Use the most recent report file
latest_report = max(report_files, key=os.path.getctime)
# Try to extract basic text from DOCX file
docx_content = ""
try:
from docx import Document
doc = Document(latest_report)
paragraphs = []
for para in doc.paragraphs:
if para.text.strip():
paragraphs.append(para.text.strip())
docx_content = "\n".join(paragraphs[:10]) # First 10 paragraphs for context
except Exception as e:
docx_content = f"Could not extract text from DOCX: {str(e)}"
file_size = os.path.getsize(latest_report)
# Also look for any text files that might contain analysis
text_files = glob.glob('generated_data/*.txt')
text_content = ""
if text_files:
latest_text = max(text_files, key=os.path.getctime)
with open(latest_text, 'r', encoding='utf-8') as f:
text_content = f.read()
summary = f"""=== PREVIOUS REPORT CONTEXT ===
Report file: {latest_report}
File size: {file_size} bytes
Created: {os.path.getctime(latest_report)}
DOCX Report Content (first 10 paragraphs):
{docx_content}
Additional analysis content:
{text_content if text_content else 'No additional text content found'}
The report was generated from the dataset in the previous analysis.
You can use load_previous_dataset() to access the same data.
"""
return summary
except Exception as e:
return f"Error accessing previous report: {str(e)}"
@tool
def analyze_column_correlation(df: pd.DataFrame, column1: str, column2: str) -> str:
"""
Analyze correlation between two columns in the dataset.
Args:
df: The pandas DataFrame
column1: First column name
column2: Second column name
Returns:
Correlation analysis results
"""
try:
# Check if columns exist
if column1 not in df.columns or column2 not in df.columns:
return f"Error: One or both columns not found. Available columns: {list(df.columns)}"
# Convert to numeric if possible
try:
col1_numeric = pd.to_numeric(df[column1], errors='coerce')
col2_numeric = pd.to_numeric(df[column2], errors='coerce')
except:
return f"Error: Cannot convert columns to numeric for correlation analysis"
# Calculate correlation
correlation = col1_numeric.corr(col2_numeric)
# Create scatter plot
plt.figure(figsize=(10, 6))
plt.scatter(col1_numeric, col2_numeric, alpha=0.6)
plt.xlabel(column1)
plt.ylabel(column2)
plt.title(f'Correlation between {column1} and {column2}\nCorrelation coefficient: {correlation:.3f}')
# Add trend line
if not col1_numeric.isna().all() and not col2_numeric.isna().all():
z = np.polyfit(col1_numeric.dropna(), col2_numeric.dropna(), 1)
p = np.poly1d(z)
plt.plot(col1_numeric, p(col1_numeric), "r--", alpha=0.8)
plt.tight_layout()
# Save plot
filename = f'correlation_{column1}_{column2}.png'
filepath = os.path.join('generated_data', filename)
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
# Interpret correlation
if abs(correlation) > 0.7:
strength = "strong"
elif abs(correlation) > 0.4:
strength = "moderate"
elif abs(correlation) > 0.2:
strength = "weak"
else:
strength = "very weak"
direction = "positive" if correlation > 0 else "negative"
result = f"""=== CORRELATION ANALYSIS ===
Columns: {column1} vs {column2}
Correlation coefficient: {correlation:.3f}
Strength: {strength} {direction} correlation
Interpretation:
- The correlation is {strength} and {direction}
- Values closer to 1 or -1 indicate stronger linear relationships
- Values closer to 0 indicate weaker linear relationships
Visualization saved to: {filepath}
"""
return result
except Exception as e:
return f"Error in correlation analysis: {str(e)}"
@tool
def create_statistical_summary(df: pd.DataFrame, column_name: str) -> str:
"""
Create a comprehensive statistical summary with visualization for a specific column.
Args:
df: The pandas DataFrame
column_name: Name of the column to analyze
Returns:
Statistical summary and saves a visualization
"""
try:
if column_name not in df.columns:
return f"Error: Column '{column_name}' not found. Available columns: {list(df.columns)}"
column_data = df[column_name]
# Generate statistical summary
summary_lines = [f"=== STATISTICAL SUMMARY: {column_name} ==="]
if pd.api.types.is_numeric_dtype(column_data):
# Numeric column analysis
stats = column_data.describe()
summary_lines.extend([
f"Count: {stats['count']:.0f}",
f"Mean: {stats['mean']:.2f}",
f"Median: {stats['50%']:.2f}",
f"Standard Deviation: {stats['std']:.2f}",
f"Min: {stats['min']:.2f}",
f"Max: {stats['max']:.2f}",
f"25th Percentile: {stats['25%']:.2f}",
f"75th Percentile: {stats['75%']:.2f}",
])
# Create histogram and box plot
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# Histogram
ax1.hist(column_data.dropna(), bins=30, alpha=0.7, color='skyblue', edgecolor='black')
ax1.set_title(f'Distribution of {column_name}')
ax1.set_xlabel(column_name)
ax1.set_ylabel('Frequency')
ax1.grid(True, alpha=0.3)
# Box plot
ax2.boxplot(column_data.dropna())
ax2.set_title(f'Box Plot of {column_name}')
ax2.set_ylabel(column_name)
ax2.grid(True, alpha=0.3)
else:
# Categorical column analysis
value_counts = column_data.value_counts()
summary_lines.extend([
f"Total unique values: {column_data.nunique()}",
f"Most frequent value: {value_counts.index[0]} ({value_counts.iloc[0]} times)",
f"Least frequent value: {value_counts.index[-1]} ({value_counts.iloc[-1]} times)",
"",
"Top 10 values:"
])
for value, count in value_counts.head(10).items():
percentage = (count / len(column_data)) * 100
summary_lines.append(f" {value}: {count} ({percentage:.1f}%)")
# Create bar chart and pie chart
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# Bar chart
top_values = value_counts.head(10)
ax1.bar(range(len(top_values)), top_values.values, color='lightcoral')
ax1.set_title(f'Top 10 Values in {column_name}')
ax1.set_xlabel('Categories')
ax1.set_ylabel('Count')
ax1.set_xticks(range(len(top_values)))
ax1.set_xticklabels(top_values.index, rotation=45, ha='right')
ax1.grid(True, alpha=0.3)
# Pie chart (top 8 values + others)
top_8 = value_counts.head(8)
others_count = value_counts.iloc[8:].sum() if len(value_counts) > 8 else 0
if others_count > 0:
pie_data = list(top_8.values) + [others_count]
pie_labels = list(top_8.index) + ['Others']
else:
pie_data = top_8.values
pie_labels = top_8.index
ax2.pie(pie_data, labels=pie_labels, autopct='%1.1f%%', startangle=90)
ax2.set_title(f'Distribution of {column_name}')
plt.tight_layout()
# Save the plot
filename = f'statistical_summary_{column_name}.png'
filepath = os.path.join('generated_data', filename)
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
summary_lines.append(f"\nVisualization saved to: {filepath}")
return "\n".join(summary_lines)
except Exception as e:
return f"Error in statistical analysis: {str(e)}"
@tool
def filter_and_visualize_data(df: pd.DataFrame, filter_column: str, filter_value: str, analysis_column: str, chart_type: str = "bar") -> str:
"""
Filter the dataset and create a visualization of the filtered data.
Args:
df: The pandas DataFrame
filter_column: Column to filter by
filter_value: Value to filter for (can be partial match for string columns)
analysis_column: Column to analyze in the filtered data
chart_type: Type of chart to create ('bar', 'line', 'histogram', 'pie')
Returns:
Analysis results and saves a visualization
"""
try:
if filter_column not in df.columns:
return f"Error: Filter column '{filter_column}' not found. Available columns: {list(df.columns)}"
if analysis_column not in df.columns:
return f"Error: Analysis column '{analysis_column}' not found. Available columns: {list(df.columns)}"
# Filter the data
if df[filter_column].dtype == 'object':
# String filtering - partial match
filtered_df = df[df[filter_column].str.contains(filter_value, case=False, na=False)]
else:
# Numeric filtering - exact match
try:
filter_value_numeric = float(filter_value)
filtered_df = df[df[filter_column] == filter_value_numeric]
except ValueError:
return f"Error: Cannot convert '{filter_value}' to numeric for filtering"
if filtered_df.empty:
return f"No data found matching filter: {filter_column} = '{filter_value}'"
result_lines = [
f"=== FILTERED DATA ANALYSIS ===",
f"Filter: {filter_column} contains/equals '{filter_value}'",
f"Filtered dataset size: {len(filtered_df)} rows (from {len(df)} total)",
f"Analysis column: {analysis_column}",
""
]
# Analyze the filtered data
analysis_data = filtered_df[analysis_column]
plt.figure(figsize=(12, 8))
if chart_type == "bar":
if pd.api.types.is_numeric_dtype(analysis_data):
# For numeric data, create bins
analysis_data.hist(bins=20, alpha=0.7, color='lightblue', edgecolor='black')
plt.ylabel('Frequency')
else:
# For categorical data, show value counts
value_counts = analysis_data.value_counts().head(15)
plt.bar(range(len(value_counts)), value_counts.values, color='lightcoral')
plt.xticks(range(len(value_counts)), value_counts.index, rotation=45, ha='right')
plt.ylabel('Count')
# Add statistics to result
result_lines.extend([
f"Top value: {value_counts.index[0]} ({value_counts.iloc[0]} occurrences)",
f"Total unique values: {analysis_data.nunique()}"
])
elif chart_type == "line":
if pd.api.types.is_numeric_dtype(analysis_data):
sorted_data = analysis_data.sort_values()
plt.plot(range(len(sorted_data)), sorted_data.values, marker='o', alpha=0.7)
plt.ylabel(analysis_column)
plt.xlabel('Sorted Index')
else:
return "Line chart requires numeric data for analysis column"
elif chart_type == "histogram":
if pd.api.types.is_numeric_dtype(analysis_data):
plt.hist(analysis_data.dropna(), bins=30, alpha=0.7, color='green', edgecolor='black')
plt.ylabel('Frequency')
# Add statistics
mean_val = analysis_data.mean()
median_val = analysis_data.median()
result_lines.extend([
f"Mean: {mean_val:.2f}",
f"Median: {median_val:.2f}",
f"Standard Deviation: {analysis_data.std():.2f}"
])
else:
return "Histogram requires numeric data for analysis column"
elif chart_type == "pie":
value_counts = analysis_data.value_counts().head(10)
plt.pie(value_counts.values, labels=value_counts.index, autopct='%1.1f%%', startangle=90)
plt.title(f'{chart_type.title()} Chart: {analysis_column}\nFiltered by {filter_column} = "{filter_value}"')
plt.xlabel(analysis_column)
plt.tight_layout()
# Save the plot
filename = f'filtered_{filter_column}_{filter_value}_{analysis_column}_{chart_type}.png'
# Clean filename
filename = "".join(c for c in filename if c.isalnum() or c in ('_', '-', '.')).rstrip()
filepath = os.path.join('generated_data', filename)
plt.savefig(filepath, dpi=300, bbox_inches='tight')
plt.close()
result_lines.append(f"\nVisualization saved to: {filepath}")
return "\n".join(result_lines)
except Exception as e:
return f"Error in filtered analysis: {str(e)}"