"""
CompI Data Processing Utilities

This module provides utilities for Phase 2.B: Data/Logic Input Integration

- CSV data analysis and processing
- Mathematical formula evaluation
- Data-to-text conversion (poetic descriptions)
- Data visualization generation
- Statistical analysis and pattern detection
"""

import io
import logging
import math
import random
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple

import numpy as np
import pandas as pd
import matplotlib

matplotlib.use('Agg')  # non-interactive backend; must be set before importing pyplot

import matplotlib.pyplot as plt
from PIL import Image

logger = logging.getLogger(__name__)

@dataclass
class DataFeatures:
    """Container for extracted data features and statistics"""

    # Basic structure
    shape: Tuple[int, int]
    columns: List[str]
    numeric_columns: List[str]
    data_types: Dict[str, str]

    # Per-column statistics (numeric columns only)
    means: Dict[str, float]
    medians: Dict[str, float]
    stds: Dict[str, float]
    mins: Dict[str, float]
    maxs: Dict[str, float]
    ranges: Dict[str, float]

    # Detected patterns
    trends: Dict[str, str]
    correlations: Dict[str, float]
    seasonality: Dict[str, bool]

    # Derived scores (each in the range 0-1)
    complexity_score: float
    variability_score: float
    pattern_strength: float

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization"""
        return {
            'shape': self.shape,
            'columns': self.columns,
            'numeric_columns': self.numeric_columns,
            'data_types': self.data_types,
            'means': self.means,
            'medians': self.medians,
            'stds': self.stds,
            'mins': self.mins,
            'maxs': self.maxs,
            'ranges': self.ranges,
            'trends': self.trends,
            'correlations': self.correlations,
            'seasonality': self.seasonality,
            'complexity_score': self.complexity_score,
            'variability_score': self.variability_score,
            'pattern_strength': self.pattern_strength
        }

class DataProcessor:
    """Core data processing and analysis functionality"""

    def __init__(self):
        """Initialize the data processor"""
        # Restricted namespace for formula evaluation; '__builtins__' is
        # emptied so eval() cannot reach arbitrary built-in functions.
        self.safe_functions = {
            # Safe Python built-ins
            'abs': abs, 'round': round, 'min': min, 'max': max,
            'sum': sum, 'len': len, 'pow': pow,

            # NumPy functions and constants
            'np': np, 'numpy': np,
            'sin': np.sin, 'cos': np.cos, 'tan': np.tan,
            'exp': np.exp, 'log': np.log, 'sqrt': np.sqrt,
            'pi': np.pi, 'e': np.e,

            # Standard math module
            'math': math,

            # Block access to builtins
            '__builtins__': {}
        }

    def analyze_csv_data(self, df: pd.DataFrame) -> DataFeatures:
        """
        Comprehensive analysis of CSV data

        Args:
            df: Input DataFrame

        Returns:
            DataFeatures object with extracted insights
        """
        logger.info(f"Analyzing CSV data with shape {df.shape}")

        # Basic structure
        shape = df.shape
        columns = df.columns.tolist()
        numeric_df = df.select_dtypes(include=[np.number])
        numeric_columns = numeric_df.columns.tolist()
        data_types = {col: str(df[col].dtype) for col in columns}

        # Per-column statistics
        means = {col: float(numeric_df[col].mean()) for col in numeric_columns}
        medians = {col: float(numeric_df[col].median()) for col in numeric_columns}
        stds = {col: float(numeric_df[col].std()) for col in numeric_columns}
        mins = {col: float(numeric_df[col].min()) for col in numeric_columns}
        maxs = {col: float(numeric_df[col].max()) for col in numeric_columns}
        ranges = {col: maxs[col] - mins[col] for col in numeric_columns}

        # Pattern detection
        trends = self._analyze_trends(numeric_df)
        correlations = self._find_strongest_correlations(numeric_df)
        seasonality = self._detect_seasonality(numeric_df)

        # Derived scores
        complexity_score = self._calculate_complexity_score(numeric_df)
        variability_score = self._calculate_variability_score(stds, ranges)
        pattern_strength = self._calculate_pattern_strength(trends, correlations)

        return DataFeatures(
            shape=shape,
            columns=columns,
            numeric_columns=numeric_columns,
            data_types=data_types,
            means=means,
            medians=medians,
            stds=stds,
            mins=mins,
            maxs=maxs,
            ranges=ranges,
            trends=trends,
            correlations=correlations,
            seasonality=seasonality,
            complexity_score=complexity_score,
            variability_score=variability_score,
            pattern_strength=pattern_strength
        )

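    # Illustrative usage (a sketch; the file name and columns are hypothetical):
    #     df = pd.read_csv("sales.csv")
    #     features = DataProcessor().analyze_csv_data(df)
    #     features.trends            # e.g. {'revenue': 'increasing', 'returns': 'stable'}
    #     features.complexity_score  # a float in [0, 1]
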
    def evaluate_formula(self, formula: str, num_points: int = 100) -> Tuple[np.ndarray, Dict[str, Any]]:
        """
        Safely evaluate a mathematical formula

        Args:
            formula: Mathematical expression (Python/NumPy syntax)
            num_points: Number of points to generate

        Returns:
            Tuple of (result_array, metadata)

        Note:
            Evaluation uses eval() with a restricted namespace. This blocks
            casual misuse but is not a hardened sandbox, so avoid passing
            fully untrusted input.
        """
        logger.info(f"Evaluating formula: {formula}")

        try:
            # Evaluate in a copy of the safe namespace so injected variables
            # do not leak into subsequent calls
            namespace = dict(self.safe_functions)

            # Provide a default domain when the formula references 'x' as a
            # standalone variable (word-boundary match avoids false hits on
            # names like 'exp' or 'max')
            if re.search(r'\bx\b', formula):
                namespace['x'] = np.linspace(0, 10, num_points)

            result = eval(formula, namespace)

            # Normalize the result to a NumPy array
            if not isinstance(result, np.ndarray):
                if isinstance(result, (list, tuple)):
                    result = np.array(result)
                else:
                    # Scalar result: broadcast to a constant array
                    result = np.full(num_points, result)

            metadata = {
                'length': len(result),
                'min': float(np.min(result)),
                'max': float(np.max(result)),
                'mean': float(np.mean(result)),
                'std': float(np.std(result)),
                'range': float(np.max(result) - np.min(result)),
                'formula': formula,
                'has_pattern': self._detect_mathematical_pattern(result)
            }

            return result, metadata

        except Exception as e:
            logger.error(f"Formula evaluation failed: {e}")
            raise ValueError(f"Invalid formula: {e}") from e

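    # Illustrative usage (a sketch): when a formula references 'x' as a bare
    # variable, it is evaluated over np.linspace(0, 10, num_points):
    #     values, meta = DataProcessor().evaluate_formula("sin(x) * exp(-x / 5)")
    #     meta['has_pattern']  # typically True: the sampled curve is smooth,
    #                          # so its lag-1 autocorrelation is strong
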
    def _analyze_trends(self, df: pd.DataFrame) -> Dict[str, str]:
        """Analyze trends in numeric columns"""
        trends = {}
        for col in df.columns:
            values = df[col].dropna()
            if len(values) < 3:
                trends[col] = 'insufficient_data'
                continue

            # Fit a line to classify the overall direction
            x = np.arange(len(values))
            slope = np.polyfit(x, values, 1)[0]
            std_val = values.std()

            if abs(slope) < std_val * 0.1:
                trends[col] = 'stable'
            elif std_val > values.mean() * 0.5:
                trends[col] = 'volatile'
            elif slope > 0:
                trends[col] = 'increasing'
            else:
                trends[col] = 'decreasing'

        return trends

    def _find_strongest_correlations(self, df: pd.DataFrame) -> Dict[str, float]:
        """Find the strongest pairwise correlations between columns"""
        if len(df.columns) < 2:
            return {}

        corr_matrix = df.corr()
        correlations = {}

        # Collect each unique column pair (upper triangle only)
        for i, col1 in enumerate(df.columns):
            for j, col2 in enumerate(df.columns):
                if i < j:
                    corr_val = corr_matrix.loc[col1, col2]
                    if not np.isnan(corr_val):
                        correlations[f"{col1}_vs_{col2}"] = float(corr_val)

        # Keep the three strongest by absolute value
        sorted_corr = sorted(correlations.items(), key=lambda x: abs(x[1]), reverse=True)
        return dict(sorted_corr[:3])

    def _detect_seasonality(self, df: pd.DataFrame) -> Dict[str, bool]:
        """Simple seasonality heuristic based on lag-1 autocorrelation"""
        seasonality = {}
        for col in df.columns:
            values = df[col].dropna()
            if len(values) < 12:
                seasonality[col] = False
                continue

            try:
                # Lag-1 autocorrelation as a cheap proxy for periodic structure
                autocorr = np.corrcoef(values[:-1], values[1:])[0, 1]
                seasonality[col] = not np.isnan(autocorr) and abs(autocorr) > 0.3
            except Exception:
                seasonality[col] = False

        return seasonality

    def _calculate_complexity_score(self, df: pd.DataFrame) -> float:
        """Calculate data complexity score (0-1)"""
        if df.empty:
            return 0.0

        # More columns means more complexity (capped at 10 columns)
        num_cols = len(df.columns)
        col_score = min(num_cols / 10, 1.0)

        # Missing data also contributes to complexity
        missing_ratio = df.isnull().sum().sum() / (df.shape[0] * df.shape[1])
        missing_score = min(missing_ratio * 2, 1.0)

        return (col_score + missing_score) / 2

    def _calculate_variability_score(self, stds: Dict[str, float], ranges: Dict[str, float]) -> float:
        """Calculate data variability score (0-1)"""
        if not stds:
            return 0.0

        # Normalize each column's standard deviation by its range
        normalized_vars = []
        for col in stds:
            if ranges[col] > 0:
                normalized_vars.append(stds[col] / ranges[col])

        if not normalized_vars:
            return 0.0

        return min(np.mean(normalized_vars) * 2, 1.0)

    def _calculate_pattern_strength(self, trends: Dict[str, str], correlations: Dict[str, float]) -> float:
        """Calculate pattern strength score (0-1)"""
        # Fraction of columns with a clear directional trend
        trend_patterns = sum(1 for trend in trends.values() if trend in ['increasing', 'decreasing'])
        trend_score = min(trend_patterns / max(len(trends), 1), 1.0)

        # Strongest absolute correlation found
        if correlations:
            corr_score = max(abs(corr) for corr in correlations.values())
        else:
            corr_score = 0.0

        return (trend_score + corr_score) / 2

    def _detect_mathematical_pattern(self, data: np.ndarray) -> bool:
        """Detect whether mathematical data has a recognizable pattern"""
        if len(data) < 10:
            return False

        try:
            # Strong lag-1 autocorrelation suggests a smooth, structured curve
            autocorr = np.corrcoef(data[:-1], data[1:])[0, 1]
            return not np.isnan(autocorr) and abs(autocorr) > 0.5
        except Exception:
            return False


class DataToTextConverter:
    """Convert data patterns into poetic/narrative text descriptions"""

    def __init__(self):
        """Initialize the converter with descriptive vocabularies"""
        self.trend_descriptions = {
            'increasing': ['ascending', 'rising', 'climbing', 'growing', 'soaring'],
            'decreasing': ['descending', 'falling', 'declining', 'diminishing', 'fading'],
            'stable': ['steady', 'constant', 'balanced', 'harmonious', 'peaceful'],
            'volatile': ['chaotic', 'turbulent', 'dynamic', 'energetic', 'wild']
        }

        self.pattern_adjectives = {
            'high_complexity': ['intricate', 'complex', 'sophisticated', 'elaborate'],
            'low_complexity': ['simple', 'pure', 'minimal', 'clean'],
            'high_variability': ['diverse', 'varied', 'rich', 'multifaceted'],
            'low_variability': ['consistent', 'uniform', 'regular', 'predictable'],
            'strong_patterns': ['rhythmic', 'structured', 'organized', 'patterned'],
            'weak_patterns': ['random', 'scattered', 'free-flowing', 'organic']
        }

        self.artistic_metaphors = [
            'like brushstrokes on a canvas',
            'resembling musical notes in harmony',
            'flowing like water through landscapes',
            'dancing with mathematical precision',
            'weaving patterns of light and shadow',
            'creating symphonies of numbers',
            'painting stories with data points',
            'sculpting meaning from statistics'
        ]

    def generate_poetic_description(self, features: DataFeatures) -> str:
        """
        Generate a poetic description from data features

        Args:
            features: DataFeatures object

        Returns:
            Poetic text description
        """
        descriptions = []

        # Opening line from the data's overall shape
        descriptions.append(
            f"A tapestry woven from {features.shape[0]} data points "
            f"across {features.shape[1]} dimensions"
        )

        # Trends
        trend_desc = self._describe_trends(features.trends)
        if trend_desc:
            descriptions.append(trend_desc)

        # Variability
        var_desc = self._describe_variability(features.variability_score)
        if var_desc:
            descriptions.append(var_desc)

        # Patterns and correlations
        pattern_desc = self._describe_patterns(features.pattern_strength, features.correlations)
        if pattern_desc:
            descriptions.append(pattern_desc)

        # Closing metaphor
        metaphor = random.choice(self.artistic_metaphors)
        descriptions.append(f"The data flows {metaphor}")

        return '. '.join(descriptions) + '.'

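    # Illustrative usage (a sketch; exact wording varies because a random
    # metaphor is chosen on each call):
    #     text = DataToTextConverter().generate_poetic_description(features)
    #     # e.g. "A tapestry woven from 100 data points across 2 dimensions. ..."
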
    def generate_formula_description(self, formula: str, metadata: Dict[str, Any]) -> str:
        """
        Generate a poetic description for a mathematical formula

        Args:
            formula: Original formula
            metadata: Formula evaluation metadata

        Returns:
            Poetic text description
        """
        descriptions = []

        descriptions.append(f"Mathematical harmony emerges from the expression: {formula}")

        # Describe the magnitude of variation
        range_val = metadata['range']
        if range_val > 10:
            descriptions.append("The function soars across vast numerical landscapes")
        elif range_val > 1:
            descriptions.append("Values dance within moderate bounds")
        else:
            descriptions.append("Numbers whisper in gentle, subtle variations")

        # Describe pattern presence
        if metadata['has_pattern']:
            descriptions.append("Revealing intricate patterns that speak to the soul")
        else:
            descriptions.append("Creating unique, unrepeatable mathematical poetry")

        # Closing metaphor
        metaphor = random.choice(self.artistic_metaphors)
        descriptions.append(f"Each calculation {metaphor}")

        return '. '.join(descriptions) + '.'

    def _describe_trends(self, trends: Dict[str, str]) -> str:
        """Describe the dominant trend in the data"""
        if not trends:
            return ""

        # Count occurrences of each trend label
        trend_counts = {}
        for trend in trends.values():
            trend_counts[trend] = trend_counts.get(trend, 0) + 1

        dominant_trend = max(trend_counts, key=trend_counts.get)

        if dominant_trend in self.trend_descriptions:
            adj = random.choice(self.trend_descriptions[dominant_trend])
            return f"The data reveals {adj} patterns throughout its structure"

        return ""

    def _describe_variability(self, variability_score: float) -> str:
        """Describe data variability"""
        if variability_score > 0.7:
            adj = random.choice(self.pattern_adjectives['high_variability'])
            return f"With {adj} expressions of numerical diversity"
        elif variability_score < 0.3:
            adj = random.choice(self.pattern_adjectives['low_variability'])
            return f"Maintaining {adj} elegance in its values"
        else:
            return "Balancing consistency with creative variation"

    def _describe_patterns(self, pattern_strength: float, correlations: Dict[str, float]) -> str:
        """Describe pattern strength and correlations"""
        if pattern_strength > 0.6:
            adj = random.choice(self.pattern_adjectives['strong_patterns'])
            return f"Displaying {adj} relationships between its elements"
        elif pattern_strength < 0.3:
            adj = random.choice(self.pattern_adjectives['weak_patterns'])
            return f"Embracing {adj} freedom in its numerical expression"
        else:
            return "Weaving subtle connections throughout its numerical fabric"


class DataVisualizer:
    """Create visualizations from data for artistic conditioning"""

    def __init__(self, style: str = 'artistic'):
        """
        Initialize visualizer

        Args:
            style: Visualization style ('artistic', 'scientific', 'minimal')
        """
        self.style = style
        self.color_palettes = {
            'artistic': ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7'],
            'scientific': ['#2E86AB', '#A23B72', '#F18F01', '#C73E1D', '#592E83'],
            'minimal': ['#2C3E50', '#34495E', '#7F8C8D', '#95A5A6', '#BDC3C7']
        }

    def create_data_visualization(self, df: pd.DataFrame, features: DataFeatures) -> Image.Image:
        """
        Create artistic visualization from a DataFrame

        Args:
            df: Input DataFrame
            features: DataFeatures object

        Returns:
            PIL Image of the visualization
        """
        plt.style.use('default')
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        fig.suptitle('Data Pattern Visualization', fontsize=16, fontweight='bold')

        numeric_df = df.select_dtypes(include=[np.number])
        colors = self.color_palettes[self.style]

        # Top left: line plot of up to three numeric columns
        ax1 = axes[0, 0]
        for i, col in enumerate(numeric_df.columns[:3]):
            ax1.plot(numeric_df[col], color=colors[i % len(colors)],
                     linewidth=2, alpha=0.8, label=col)
        ax1.set_title('Data Trends', fontweight='bold')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # Top right: histogram of the first numeric column
        ax2 = axes[0, 1]
        if len(numeric_df.columns) > 0:
            col = numeric_df.columns[0]
            ax2.hist(numeric_df[col].dropna(), bins=20, color=colors[0],
                     alpha=0.7, edgecolor='black')
            ax2.set_title(f'Distribution: {col}', fontweight='bold')
            ax2.grid(True, alpha=0.3)

        # Bottom left: correlation heatmap (needs at least two columns)
        ax3 = axes[1, 0]
        if len(numeric_df.columns) > 1:
            corr_matrix = numeric_df.corr()
            im = ax3.imshow(corr_matrix, cmap='RdBu_r', aspect='auto', vmin=-1, vmax=1)
            ax3.set_xticks(range(len(corr_matrix.columns)))
            ax3.set_yticks(range(len(corr_matrix.columns)))
            ax3.set_xticklabels(corr_matrix.columns, rotation=45)
            ax3.set_yticklabels(corr_matrix.columns)
            ax3.set_title('Correlations', fontweight='bold')
            plt.colorbar(im, ax=ax3, shrink=0.8)
        else:
            ax3.text(0.5, 0.5, 'Single Column\nNo Correlations',
                     ha='center', va='center', transform=ax3.transAxes)
            ax3.set_title('Correlations', fontweight='bold')

        # Bottom right: bar chart of mean values (up to five columns)
        ax4 = axes[1, 1]
        if len(numeric_df.columns) > 0:
            stats_data = [features.means[col] for col in numeric_df.columns[:5]]
            ax4.bar(range(len(stats_data)), stats_data, color=colors[:len(stats_data)])
            ax4.set_title('Mean Values', fontweight='bold')
            ax4.set_xticks(range(len(stats_data)))
            ax4.set_xticklabels([col[:8] for col in numeric_df.columns[:5]], rotation=45)
            ax4.grid(True, alpha=0.3)

        plt.tight_layout()

        # Render the figure into a PIL image
        buf = io.BytesIO()
        plt.savefig(buf, format='png', dpi=150, bbox_inches='tight')
        plt.close()
        buf.seek(0)

        return Image.open(buf)

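    # Illustrative usage (a sketch): the returned PIL image can be saved or
    # passed on as a conditioning image; the output path is hypothetical:
    #     img = DataVisualizer(style='minimal').create_data_visualization(df, features)
    #     img.save("data_patterns.png")
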
    def create_formula_visualization(self, data: np.ndarray, formula: str, metadata: Dict[str, Any]) -> Image.Image:
        """
        Create artistic visualization from a formula result

        Args:
            data: Formula result array
            formula: Original formula
            metadata: Formula metadata

        Returns:
            PIL Image of the visualization
        """
        try:
            logger.info(f"Creating visualization for formula: {formula}")
            logger.info(f"Data shape: {data.shape}, Data range: [{np.min(data):.3f}, {np.max(data):.3f}]")

            plt.style.use('default')
            fig, axes = plt.subplots(2, 2, figsize=(12, 10))
            fig.suptitle(f'Mathematical Pattern: {formula}', fontsize=14, fontweight='bold')

            colors = self.color_palettes[self.style]
            x = np.arange(len(data))

            # Top left: the function values themselves
            ax1 = axes[0, 0]
            ax1.plot(x, data, color=colors[0], linewidth=3, alpha=0.8)
            ax1.fill_between(x, data, alpha=0.3, color=colors[0])
            ax1.set_title('Function Values', fontweight='bold')
            ax1.grid(True, alpha=0.3)

            # Top right: numerical derivative
            ax2 = axes[0, 1]
            if len(data) > 1:
                derivative = np.gradient(data)
                ax2.plot(x, derivative, color=colors[1], linewidth=2)
            ax2.set_title('Rate of Change', fontweight='bold')
            ax2.grid(True, alpha=0.3)

            # Bottom left: histogram of values
            ax3 = axes[1, 0]
            ax3.hist(data, bins=30, color=colors[2], alpha=0.7, edgecolor='black')
            ax3.set_title('Value Distribution', fontweight='bold')
            ax3.grid(True, alpha=0.3)

            # Bottom right: lag-1 phase-space plot, colored by time index
            ax4 = axes[1, 1]
            if len(data) > 1:
                ax4.scatter(data[:-1], data[1:], c=x[:-1], cmap='viridis', alpha=0.6)
                ax4.set_xlabel('f(t)')
                ax4.set_ylabel('f(t+1)')
            ax4.set_title('Phase Space', fontweight='bold')
            ax4.grid(True, alpha=0.3)

            plt.tight_layout()

            # Render the figure into a PIL image
            buf = io.BytesIO()
            plt.savefig(buf, format='png', dpi=150, bbox_inches='tight')
            plt.close()
            buf.seek(0)

            image = Image.open(buf)
            logger.info(f"Successfully created visualization image: {image.size}")
            return image

        except Exception as e:
            logger.error(f"Error creating formula visualization: {e}")
            plt.close('all')

            # Fall back to a simple error card so callers still get an image
            fig, ax = plt.subplots(figsize=(8, 6))
            ax.text(0.5, 0.5, f'Visualization Error:\n{str(e)}',
                    ha='center', va='center', fontsize=12,
                    bbox=dict(boxstyle="round,pad=0.3", facecolor="lightcoral"))
            ax.set_xlim(0, 1)
            ax.set_ylim(0, 1)
            ax.axis('off')

            buf = io.BytesIO()
            plt.savefig(buf, format='png', dpi=150, bbox_inches='tight')
            plt.close()
            buf.seek(0)

            return Image.open(buf)
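

# Minimal end-to-end sketch (an addition, not part of the original module):
# builds a small synthetic DataFrame, runs analysis -> poetic text -> image,
# and saves the result. Column names and the output path are illustrative.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    rng = np.random.default_rng(42)
    demo_df = pd.DataFrame({
        'temperature': 20 + 5 * np.sin(np.linspace(0, 4 * np.pi, 100)) + rng.normal(0, 0.5, 100),
        'humidity': 60 + rng.normal(0, 5, 100),
    })

    processor = DataProcessor()
    features = processor.analyze_csv_data(demo_df)
    print(DataToTextConverter().generate_poetic_description(features))

    image = DataVisualizer().create_data_visualization(demo_df, features)
    image.save('demo_visualization.png')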