neural-data-anlayst / advanced_features.py
Dhruv Pawar
Initial commit: Neural Data Analyst v1.0
1273036
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any
import base64
from io import BytesIO
# Additional advanced features for Neural Data Analyst
class AdvancedFeatures:
"""Advanced features and utilities for the Neural Data Analyst"""
def __init__(self, db_manager):
self.db_manager = db_manager
def render_advanced_analytics_dashboard(self, df: pd.DataFrame):
"""Render advanced analytics dashboard"""
st.markdown("## πŸ”¬ Advanced Analytics Dashboard")
tabs = st.tabs([
"πŸ“Š Interactive Plots",
"🎯 Smart Recommendations",
"πŸ“ˆ Trend Analysis",
"πŸ” Anomaly Detection",
"πŸ“‹ Report Generator"
])
with tabs[0]:
self.render_interactive_plots(df)
with tabs[1]:
self.render_smart_recommendations(df)
with tabs[2]:
self.render_trend_analysis(df)
with tabs[3]:
self.render_anomaly_detection(df)
with tabs[4]:
self.render_report_generator(df)
def render_interactive_plots(self, df: pd.DataFrame):
"""Render interactive plotting interface"""
st.markdown("### πŸ“Š Interactive Plot Builder")
col1, col2, col3 = st.columns(3)
with col1:
plot_type = st.selectbox(
"Plot Type",
["Scatter", "Line", "Bar", "Histogram", "Box", "Violin", "Heatmap", "3D Scatter"]
)
with col2:
x_column = st.selectbox("X-axis", df.columns)
with col3:
y_column = st.selectbox("Y-axis", df.columns)
# Color and size options
col1, col2 = st.columns(2)
with col1:
color_column = st.selectbox("Color by", ["None"] + list(df.columns))
with col2:
size_column = st.selectbox("Size by", ["None"] + list(df.select_dtypes(include=[np.number]).columns))
# Generate plot based on selections
if st.button("🎨 Generate Plot"):
fig = self.create_dynamic_plot(df, plot_type, x_column, y_column, color_column, size_column)
if fig:
st.plotly_chart(fig, use_container_width=True)
# Plot gallery
with st.expander("πŸ–ΌοΈ Quick Plot Gallery"):
self.render_plot_gallery(df)
def create_dynamic_plot(self, df: pd.DataFrame, plot_type: str, x_col: str, y_col: str,
color_col: str = None, size_col: str = None):
"""Create dynamic plot based on user selections"""
try:
kwargs = {
'data_frame': df,
'x': x_col,
'title': f'{plot_type} Plot: {x_col} vs {y_col}'
}
if y_col and y_col != x_col:
kwargs['y'] = y_col
if color_col and color_col != "None":
kwargs['color'] = color_col
if size_col and size_col != "None" and plot_type in ["Scatter", "3D Scatter"]:
kwargs['size'] = size_col
if plot_type == "Scatter":
fig = px.scatter(**kwargs)
elif plot_type == "Line":
fig = px.line(**kwargs)
elif plot_type == "Bar":
fig = px.bar(**kwargs)
elif plot_type == "Histogram":
fig = px.histogram(df, x=x_col, title=f'Histogram: {x_col}')
elif plot_type == "Box":
fig = px.box(**kwargs)
elif plot_type == "Violin":
fig = px.violin(**kwargs)
elif plot_type == "3D Scatter":
z_col = st.selectbox("Z-axis", df.select_dtypes(include=[np.number]).columns)
kwargs['z'] = z_col
fig = px.scatter_3d(**kwargs)
elif plot_type == "Heatmap":
numeric_df = df.select_dtypes(include=[np.number])
corr_matrix = numeric_df.corr()
fig = px.imshow(corr_matrix, text_auto=True, title="Correlation Heatmap")
else:
return None
fig.update_layout(
plot_bgcolor='rgba(0,0,0,0)',
paper_bgcolor='rgba(0,0,0,0)',
font=dict(color='white')
)
return fig
except Exception as e:
st.error(f"Error creating plot: {str(e)}")
return None
def render_plot_gallery(self, df: pd.DataFrame):
"""Render quick plot gallery"""
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) >= 2:
col1, col2 = st.columns(2)
with col1:
# Quick correlation plot
fig = px.scatter(df, x=numeric_cols[0], y=numeric_cols[1],
title="Quick Correlation View")
fig.update_layout(height=300)
st.plotly_chart(fig, use_container_width=True)
with col2:
# Quick distribution plot
fig = px.histogram(df, x=numeric_cols[0], title="Quick Distribution")
fig.update_layout(height=300)
st.plotly_chart(fig, use_container_width=True)
def render_smart_recommendations(self, df: pd.DataFrame):
"""Render smart analysis recommendations"""
st.markdown("### 🎯 Smart Analysis Recommendations")
recommendations = self.generate_analysis_recommendations(df)
for i, rec in enumerate(recommendations):
with st.expander(f"πŸ’‘ {rec['title']}", expanded=i == 0):
st.markdown(f"**Recommendation:** {rec['description']}")
st.markdown(f"**Rationale:** {rec['rationale']}")
if st.button(f"Apply Recommendation", key=f"apply_rec_{i}"):
self.apply_recommendation(df, rec)
def generate_analysis_recommendations(self, df: pd.DataFrame) -> List[Dict[str, str]]:
"""Generate smart analysis recommendations"""
recommendations = []
numeric_cols = df.select_dtypes(include=[np.number]).columns
categorical_cols = df.select_dtypes(include=['object', 'category']).columns
# Missing data recommendation
missing_data = df.isnull().sum()
high_missing = missing_data[missing_data > len(df) * 0.1]
if len(high_missing) > 0:
recommendations.append({
'title': 'Missing Data Analysis',
'description': f'Analyze missing data patterns in {len(high_missing)} columns with >10% missing values',
'rationale': 'Understanding missing data patterns can reveal data collection issues or systematic biases',
'action': 'missing_analysis'
})
# Correlation analysis recommendation
if len(numeric_cols) > 2:
recommendations.append({
'title': 'Correlation Deep Dive',
'description': 'Perform comprehensive correlation analysis with feature selection recommendations',
'rationale': 'Identifying highly correlated features can improve model performance and interpretability',
'action': 'correlation_analysis'
})
# Outlier detection recommendation
if len(numeric_cols) > 0:
recommendations.append({
'title': 'Outlier Detection & Treatment',
'description': 'Identify and analyze outliers using multiple statistical methods',
'rationale': 'Outliers can significantly impact analysis results and model performance',
'action': 'outlier_analysis'
})
# Segmentation recommendation
if len(categorical_cols) > 0 and len(numeric_cols) > 0:
recommendations.append({
'title': 'Customer/Data Segmentation',
'description': 'Perform clustering analysis to identify natural data segments',
'rationale': 'Segmentation can reveal hidden patterns and improve targeted strategies',
'action': 'segmentation_analysis'
})
# Time series recommendation
date_cols = df.select_dtypes(include=['datetime64']).columns
if len(date_cols) > 0:
recommendations.append({
'title': 'Time Series Analysis',
'description': 'Analyze temporal patterns and trends in your data',
'rationale': 'Time-based analysis can reveal seasonality, trends, and forecasting opportunities',
'action': 'time_series_analysis'
})
return recommendations
def apply_recommendation(self, df: pd.DataFrame, recommendation: Dict[str, str]):
"""Apply a smart recommendation"""
action = recommendation.get('action')
if action == 'missing_analysis':
self.perform_missing_analysis(df)
elif action == 'correlation_analysis':
self.perform_correlation_analysis(df)
elif action == 'outlier_analysis':
self.perform_outlier_analysis(df)
elif action == 'segmentation_analysis':
self.perform_segmentation_analysis(df)
elif action == 'time_series_analysis':
self.perform_time_series_analysis(df)
def perform_missing_analysis(self, df: pd.DataFrame):
"""Perform detailed missing data analysis"""
st.markdown("#### πŸ” Missing Data Analysis Results")
missing_data = df.isnull().sum()
missing_percent = (missing_data / len(df)) * 100
missing_df = pd.DataFrame({
'Column': missing_data.index,
'Missing_Count': missing_data.values,
'Missing_Percentage': missing_percent.values
})
missing_df = missing_df[missing_df['Missing_Count'] > 0].sort_values('Missing_Percentage', ascending=False)
if len(missing_df) > 0:
fig = px.bar(missing_df, x='Column', y='Missing_Percentage',
title='Missing Data by Column (%)')
fig.update_layout(height=400)
st.plotly_chart(fig, use_container_width=True)
st.dataframe(missing_df, use_container_width=True)
else:
st.success("βœ… No missing data found in the dataset!")
def perform_correlation_analysis(self, df: pd.DataFrame):
"""Perform detailed correlation analysis"""
st.markdown("#### πŸ”— Advanced Correlation Analysis")
numeric_df = df.select_dtypes(include=[np.number])
if len(numeric_df.columns) > 1:
corr_matrix = numeric_df.corr()
# Hierarchical clustering of correlations
from scipy.cluster.hierarchy import linkage, dendrogram
from scipy.spatial.distance import squareform
distance_matrix = 1 - np.abs(corr_matrix)
condensed_distances = squareform(distance_matrix, checks=False)
linkage_matrix = linkage(condensed_distances, method='average')
fig = go.Figure()
dendro = dendrogram(linkage_matrix, labels=corr_matrix.columns, no_plot=True)
# Create dendrogram plot
for i in range(len(dendro['icoord'])):
x = dendro['icoord'][i]
y = dendro['dcoord'][i]
fig.add_trace(go.Scatter(x=x, y=y, mode='lines',
line=dict(color='gold', width=2),
showlegend=False))
fig.update_layout(
title="Feature Clustering Dendrogram",
xaxis_title="Features",
yaxis_title="Distance",
height=400
)
st.plotly_chart(fig, use_container_width=True)
def render_trend_analysis(self, df: pd.DataFrame):
"""Render trend analysis interface"""
st.markdown("### πŸ“ˆ Trend Analysis")
date_cols = df.select_dtypes(include=['datetime64']).columns
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(date_cols) == 0:
st.warning("No datetime columns found. Try converting date columns to datetime format.")
# Offer to convert columns
potential_date_cols = [col for col in df.columns if 'date' in col.lower() or 'time' in col.lower()]
if potential_date_cols:
date_col = st.selectbox("Select date column to convert:", potential_date_cols)
if st.button("Convert to DateTime"):
try:
df[date_col] = pd.to_datetime(df[date_col])
st.success(f"Converted {date_col} to datetime!")
st.experimental_rerun()
except Exception as e:
st.error(f"Conversion failed: {str(e)}")
return
col1, col2 = st.columns(2)
with col1:
date_col = st.selectbox("Date Column", date_cols)
with col2:
value_col = st.selectbox("Value Column", numeric_cols)
if st.button("πŸ” Analyze Trends"):
self.perform_trend_analysis(df, date_col, value_col)
def perform_trend_analysis(self, df: pd.DataFrame, date_col: str, value_col: str):
"""Perform trend analysis"""
st.markdown("#### πŸ“Š Trend Analysis Results")
# Time series plot
fig = px.line(df.sort_values(date_col), x=date_col, y=value_col,
title=f'{value_col} Over Time')
fig.update_layout(height=400)
st.plotly_chart(fig, use_container_width=True)
# Rolling statistics
df_sorted = df.sort_values(date_col).copy()
df_sorted['7_day_avg'] = df_sorted[value_col].rolling(window=7, min_periods=1).mean()
df_sorted['30_day_avg'] = df_sorted[value_col].rolling(window=30, min_periods=1).mean()
fig = go.Figure()
fig.add_trace(go.Scatter(x=df_sorted[date_col], y=df_sorted[value_col],
name='Original', mode='lines'))
fig.add_trace(go.Scatter(x=df_sorted[date_col], y=df_sorted['7_day_avg'],
name='7-Day Average', mode='lines'))
fig.add_trace(go.Scatter(x=df_sorted[date_col], y=df_sorted['30_day_avg'],
name='30-Day Average', mode='lines'))
fig.update_layout(title="Trend with Moving Averages", height=400)
st.plotly_chart(fig, use_container_width=True)
def render_anomaly_detection(self, df: pd.DataFrame):
"""Render anomaly detection interface"""
st.markdown("### πŸ” Anomaly Detection")
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) == 0:
st.warning("No numeric columns found for anomaly detection.")
return
col1, col2 = st.columns(2)
with col1:
target_col = st.selectbox("Target Column", numeric_cols)
with col2:
method = st.selectbox("Detection Method",
["IQR", "Z-Score", "Isolation Forest", "Local Outlier Factor"])
if st.button("🎯 Detect Anomalies"):
self.perform_anomaly_detection(df, target_col, method)
def perform_anomaly_detection(self, df: pd.DataFrame, target_col: str, method: str):
"""Perform anomaly detection"""
st.markdown("#### 🎯 Anomaly Detection Results")
data = df[target_col].dropna()
anomalies = []
if method == "IQR":
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
anomalies = df[(df[target_col] < lower_bound) | (df[target_col] > upper_bound)]
elif method == "Z-Score":
z_scores = np.abs((data - data.mean()) / data.std())
anomalies = df[z_scores > 3]
elif method == "Isolation Forest":
from sklearn.ensemble import IsolationForest
iso_forest = IsolationForest(contamination=0.1, random_state=42)
outlier_labels = iso_forest.fit_predict(data.values.reshape(-1, 1))
anomalies = df[outlier_labels == -1]
elif method == "Local Outlier Factor":
from sklearn.neighbors import LocalOutlierFactor
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.1)
outlier_labels = lof.fit_predict(data.values.reshape(-1, 1))
anomalies = df[outlier_labels == -1]
# Visualization
fig = go.Figure()
# Normal data points
normal_data = df[~df.index.isin(anomalies.index)]
fig.add_trace(go.Scatter(
x=normal_data.index,
y=normal_data[target_col],
mode='markers',
name='Normal',
marker=dict(color='blue', size=6)
))
# Anomalies
fig.add_trace(go.Scatter(
x=anomalies.index,
y=anomalies[target_col],
mode='markers',
name='Anomalies',
marker=dict(color='red', size=10, symbol='x')
))
fig.update_layout(
title=f'Anomaly Detection: {target_col} ({method})',
xaxis_title='Index',
yaxis_title=target_col,
height=500
)
st.plotly_chart(fig, use_container_width=True)
# Summary
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total Data Points", len(df))
with col2:
st.metric("Anomalies Found", len(anomalies))
with col3:
st.metric("Anomaly Rate", f"{len(anomalies)/len(df)*100:.2f}%")
if len(anomalies) > 0:
with st.expander("πŸ” Anomaly Details"):
st.dataframe(anomalies[[target_col]], use_container_width=True)
def render_report_generator(self, df: pd.DataFrame):
"""Render automated report generator"""
st.markdown("### πŸ“‹ Automated Report Generator")
report_type = st.selectbox(
"Report Type",
["Executive Summary", "Technical Analysis", "Data Quality Report", "Custom Report"]
)
col1, col2 = st.columns(2)
with col1:
include_charts = st.checkbox("Include Charts", value=True)
with col2:
include_recommendations = st.checkbox("Include Recommendations", value=True)
if st.button("πŸ“„ Generate Report"):
report_content = self.generate_report(df, report_type, include_charts, include_recommendations)
# Display report
st.markdown("#### πŸ“Š Generated Report")
st.markdown(report_content)
# Download option
self.create_download_link(report_content, f"neural_analyst_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md")
def generate_report(self, df: pd.DataFrame, report_type: str, include_charts: bool, include_recommendations: bool) -> str:
"""Generate automated report"""
report = f"""
# Neural Data Analyst Report
**Generated on:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Report Type:** {report_type}
## Dataset Overview
- **Total Rows:** {len(df):,}
- **Total Columns:** {len(df.columns)}
- **Memory Usage:** {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB
- **Missing Values:** {df.isnull().sum().sum():,} ({df.isnull().sum().sum() / df.size * 100:.1f}%)
## Column Information
"""
# Column details
for col in df.columns:
dtype = str(df[col].dtype)
null_count = df[col].isnull().sum()
unique_count = df[col].nunique()
report += f"- **{col}** ({dtype}): {null_count} missing, {unique_count} unique values\n"
# Numeric summary
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
report += "\n## Numeric Summary\n"
summary_stats = df[numeric_cols].describe()
report += summary_stats.to_markdown()
# Key insights
if include_recommendations:
report += "\n## Key Insights & Recommendations\n"
recommendations = self.generate_analysis_recommendations(df)
for i, rec in enumerate(recommendations[:5], 1):
report += f"{i}. **{rec['title']}:** {rec['description']}\n"
return report
def create_download_link(self, content: str, filename: str):
"""Create download link for report"""
b64 = base64.b64encode(content.encode()).decode()
href = f'<a href="data:text/markdown;base64,{b64}" download="{filename}">πŸ“₯ Download Report</a>'
st.markdown(href, unsafe_allow_html=True)
def render_data_comparison_tool(self):
"""Render data comparison tool for multiple datasets"""
st.markdown("## βš–οΈ Data Comparison Tool")
st.markdown("Upload multiple datasets to compare their characteristics:")
uploaded_files = st.file_uploader(
"Choose CSV files for comparison",
type=['csv'],
accept_multiple_files=True
)
if len(uploaded_files) >= 2:
datasets = {}
for file in uploaded_files:
try:
df = pd.read_csv(file)
datasets[file.name] = df
except Exception as e:
st.error(f"Error loading {file.name}: {str(e)}")
if len(datasets) >= 2:
self.perform_dataset_comparison(datasets)
def perform_dataset_comparison(self, datasets: Dict[str, pd.DataFrame]):
"""Perform comparison between multiple datasets"""
st.markdown("### πŸ“Š Dataset Comparison Results")
# Basic comparison table
comparison_data = []
for name, df in datasets.items():
comparison_data.append({
'Dataset': name,
'Rows': len(df),
'Columns': len(df.columns),
'Numeric Columns': len(df.select_dtypes(include=[np.number]).columns),
'Text Columns': len(df.select_dtypes(include=['object']).columns),
'Missing Values': df.isnull().sum().sum(),
'Memory (MB)': f"{df.memory_usage(deep=True).sum() / 1024**2:.2f}"
})
comparison_df = pd.DataFrame(comparison_data)
st.dataframe(comparison_df, use_container_width=True)
# Visual comparison
fig = make_subplots(
rows=2, cols=2,
subplot_titles=['Rows Comparison', 'Columns Comparison',
'Missing Values', 'Memory Usage'],
specs=[[{"type": "bar"}, {"type": "bar"}],
[{"type": "bar"}, {"type": "bar"}]]
)
names = list(datasets.keys())
# Rows comparison
fig.add_trace(
go.Bar(x=names, y=[len(datasets[name]) for name in names], name="Rows"),
row=1, col=1
)
# Columns comparison
fig.add_trace(
go.Bar(x=names, y=[len(datasets[name].columns) for name in names], name="Columns"),
row=1, col=2
)
# Missing values comparison
fig.add_trace(
go.Bar(x=names, y=[datasets[name].isnull().sum().sum() for name in names], name="Missing"),
row=2, col=1
)
# Memory usage comparison
fig.add_trace(
go.Bar(x=names, y=[datasets[name].memory_usage(deep=True).sum() / 1024**2 for name in names], name="Memory"),
row=2, col=2
)
fig.update_layout(height=600, showlegend=False, title_text="Dataset Comparison Dashboard")
st.plotly_chart(fig, use_container_width=True)
def render_data_profiling_tool(self, df: pd.DataFrame):
"""Render comprehensive data profiling tool"""
st.markdown("## πŸ”¬ Data Profiling Tool")
if st.button("πŸš€ Generate Complete Data Profile"):
with st.spinner("Generating comprehensive data profile..."):
profile = self.generate_data_profile(df)
self.display_data_profile(profile)
def generate_data_profile(self, df: pd.DataFrame) -> Dict[str, Any]:
"""Generate comprehensive data profile"""
profile = {
'overview': {},
'column_profiles': {},
'data_quality': {},
'relationships': {},
'recommendations': []
}
# Overview
profile['overview'] = {
'shape': df.shape,
'memory_usage': df.memory_usage(deep=True).sum(),
'dtypes': df.dtypes.value_counts().to_dict(),
'missing_cells': df.isnull().sum().sum(),
'duplicate_rows': df.duplicated().sum()
}
# Column profiles
for col in df.columns:
col_profile = {
'dtype': str(df[col].dtype),
'null_count': df[col].isnull().sum(),
'null_percentage': df[col].isnull().sum() / len(df) * 100,
'unique_count': df[col].nunique(),
'unique_percentage': df[col].nunique() / len(df) * 100
}
if df[col].dtype in ['int64', 'float64']:
col_profile.update({
'min': df[col].min(),
'max': df[col].max(),
'mean': df[col].mean(),
'std': df[col].std(),
'skewness': df[col].skew(),
'kurtosis': df[col].kurtosis()
})
else:
col_profile.update({
'most_frequent': df[col].mode().iloc[0] if len(df[col].mode()) > 0 else None,
'most_frequent_count': df[col].value_counts().iloc[0] if len(df[col].value_counts()) > 0 else 0
})
profile['column_profiles'][col] = col_profile
return profile
def display_data_profile(self, profile: Dict[str, Any]):
"""Display data profile results"""
st.markdown("### πŸ“Š Complete Data Profile")
# Overview metrics
overview = profile['overview']
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Rows", f"{overview['shape'][0]:,}")
with col2:
st.metric("Columns", overview['shape'][1])
with col3:
st.metric("Missing Cells", f"{overview['missing_cells']:,}")
with col4:
st.metric("Duplicates", f"{overview['duplicate_rows']:,}")
# Column details table
st.markdown("#### πŸ“‹ Column Details")
col_data = []
for col, details in profile['column_profiles'].items():
col_data.append({
'Column': col,
'Type': details['dtype'],
'Missing %': f"{details['null_percentage']:.1f}%",
'Unique %': f"{details['unique_percentage']:.1f}%",
'Details': f"Min: {details.get('min', 'N/A')}, Max: {details.get('max', 'N/A')}" if 'min' in details else f"Most Frequent: {details.get('most_frequent', 'N/A')}"
})
col_df = pd.DataFrame(col_data)
st.dataframe(col_df, use_container_width=True)
# Usage in main app
def integrate_advanced_features():
"""Integration function for advanced features"""
return """
# Add this to your main.py file:
from advanced_features import AdvancedFeatures
# In your NeuralDataAnalyst class:
def __init__(self):
# ... existing code ...
self.advanced_features = AdvancedFeatures(self.db_manager)
# Add this after your existing data upload section:
if st.session_state.uploaded_data is not None:
if st.button("πŸ”¬ Advanced Analytics", key="advanced_analytics"):
self.advanced_features.render_advanced_analytics_dashboard(st.session_state.uploaded_data)
if st.button("πŸ” Data Profiling", key="data_profiling"):
self.advanced_features.render_data_profiling_tool(st.session_state.uploaded_data)
# Add dataset comparison in sidebar:
with st.sidebar:
st.markdown("---")
if st.button("βš–οΈ Compare Datasets"):
self.advanced_features.render_data_comparison_tool()
"""