|
""" |
|
Advanced Time Series Forecasters for TIPM |
|
======================================== |
|
|
|
LSTM-based forecasting models for GDP impact, trade flows, employment, |
|
and price impacts with enterprise-grade quality and performance. |
|
""" |
|
|
|
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd

try:
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from torch.utils.data import DataLoader, TensorDataset

    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
    torch = None
    nn = None
    optim = None
    DataLoader = None
    TensorDataset = None

try:
    from sklearn.preprocessing import MinMaxScaler, StandardScaler
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
    MinMaxScaler = None
    StandardScaler = None
    mean_squared_error = None
    mean_absolute_error = None
    r2_score = None

from .base import (
    BaseMLModel,
    ModelStatus,
    ModelType,
    PredictionResult,
    TrainingResult,
)
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class LSTMNetwork(nn.Module if nn is not None else object):
    """Stacked LSTM followed by self-attention and a small MLP head.

    ``forward`` takes ``x`` of shape ``(batch, seq_len, input_size)`` (the LSTM
    is built with ``batch_first=True``). It returns ``(batch, output_size)``,
    read from the attention output at the last time step.

    When PyTorch is not installed the class still defines (with ``object`` as
    base) so the module imports, but it cannot be instantiated.
    """

    def __init__(
        self,
        input_size: int,
        hidden_size: int,
        num_layers: int,
        output_size: int,
        dropout: float = 0.2,
        num_heads: int = 4,
    ):
        """
        Args:
            input_size: Features per time step.
            hidden_size: LSTM hidden width; must be divisible by ``num_heads``.
            num_layers: Number of stacked LSTM layers.
            output_size: Width of the output vector (e.g. forecast horizon).
            dropout: Dropout used between LSTM layers, in attention and in the head.
            num_heads: Attention heads (default 4, the previous hard-coded value).
        """
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # nn.LSTM applies dropout only between stacked layers, so it is
        # disabled for a single-layer network.
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=False,
        )

        # batch_first=True so attention runs over the time axis of the
        # (batch, seq, hidden) LSTM output instead of across the batch.
        self.attention = nn.MultiheadAttention(
            embed_dim=hidden_size,
            num_heads=num_heads,
            dropout=dropout,
            batch_first=True,
        )

        self.fc1 = nn.Linear(hidden_size, hidden_size // 2)
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(hidden_size // 2, output_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a batch of windows ``(batch, seq, input_size)`` to ``(batch, output_size)``."""
        lstm_out, _ = self.lstm(x)

        attn_out, _ = self.attention(lstm_out, lstm_out, lstm_out)

        # Only the representation at the final time step feeds the head.
        out = self.fc1(attn_out[:, -1, :])
        out = self.relu(out)
        out = self.dropout(out)
        out = self.fc2(out)

        return out
|
|
|
|
|
class BaseTimeSeriesForecaster(BaseMLModel):
    """Base class for LSTM-based time series forecasters.

    Inputs are cut into sliding windows of ``sequence_length`` rows. The target
    series is cut into the ``forecast_horizon`` values that follow each window.
    An ``LSTMNetwork`` is trained to map one to the other. Subclasses mostly
    adjust the window sizes and ``hyperparameters``.
    """

    # Number of calendar columns appended by ``_add_time_features``.
    _TIME_FEATURE_COUNT = 4

    def __init__(
        self,
        model_id: str,
        name: str,
        description: str,
        sequence_length: int = 12,
        forecast_horizon: int = 6,
    ):
        super().__init__(
            model_id=model_id,
            name=name,
            description=description,
            model_type=ModelType.TIME_SERIES,
        )

        # Rows per input window and number of future steps predicted per window.
        self.sequence_length = sequence_length
        self.forecast_horizon = forecast_horizon

        # Reserved for scaling; nothing in this class fits or applies them.
        self.scaler = None
        self.feature_scaler = None

        # Set by ``fit``: the network, the device it lives on, and the
        # per-timestep feature width of the prepared training windows.
        self._model = None
        self._device = None
        self._input_size = None

        self.hyperparameters = {
            "hidden_size": 128,
            "num_layers": 3,
            "learning_rate": 0.001,
            "batch_size": 32,
            "epochs": 100,
            "dropout": 0.2,
            "patience": 10,
        }

        logger.info(f"Initialized {name}: {model_id}")

    def _create_model(self):
        """Build an ``LSTMNetwork`` from the current hyperparameters.

        Raises:
            RuntimeError: if PyTorch could not be imported.
        """
        if not TORCH_AVAILABLE:
            raise RuntimeError("PyTorch is required for LSTM models")

        return LSTMNetwork(
            input_size=self._get_input_size(),
            hidden_size=self.hyperparameters["hidden_size"],
            num_layers=self.hyperparameters["num_layers"],
            output_size=self.forecast_horizon,
            dropout=self.hyperparameters["dropout"],
        )

    def _get_input_size(self) -> int:
        """Return the per-timestep feature width the network must accept.

        Prefers the width of the windows prepared in ``fit``. Without it, falls
        back to ``_feature_count`` (default 1) plus the calendar columns.
        """
        input_size = getattr(self, "_input_size", None)
        if input_size:
            return int(input_size)
        base_features = getattr(self, "_feature_count", 1)
        return base_features + self._TIME_FEATURE_COUNT

    def _prepare_features(self, X: Union[pd.DataFrame, np.ndarray]) -> np.ndarray:
        """Convert ``X`` to float32 windows ``(n_windows, sequence_length, n_features)``.

        A DataFrame with a DatetimeIndex first gets calendar columns. 1-D input
        is treated as one feature column. 2-D input is windowed. 3-D input is
        assumed to be pre-windowed and is passed through.
        """
        if isinstance(X, pd.DataFrame):
            if isinstance(X.index, pd.DatetimeIndex):
                X = self._add_time_features(X)
            X = X.to_numpy()
        else:
            X = np.asarray(X)

        if X.ndim == 1:
            X = X.reshape(-1, 1)
        if X.ndim == 2:
            X = self._create_sequences(X)

        return X.astype(np.float32)

    def _prepare_targets(self, y: Union[pd.Series, np.ndarray]) -> np.ndarray:
        """Convert ``y`` to float32 target windows of ``forecast_horizon`` values."""
        if isinstance(y, (pd.Series, pd.DataFrame)):
            y = y.to_numpy()
        else:
            y = np.asarray(y)

        return self._create_target_sequences(y).astype(np.float32)

    def _add_time_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with year/month/quarter/day_of_week columns from its index."""
        df = df.copy()

        df["year"] = df.index.year
        df["month"] = df.index.month
        df["quarter"] = df.index.quarter
        df["day_of_week"] = df.index.dayofweek

        return df

    def _create_sequences(self, data: np.ndarray) -> np.ndarray:
        """Stack every contiguous ``sequence_length``-row window of ``data``.

        Window ``i`` covers rows ``[i, i + sequence_length)``. Too-short input
        yields an empty array that still has the window's trailing shape.
        """
        n_windows = len(data) - self.sequence_length + 1
        if n_windows <= 0:
            return np.empty(
                (0, self.sequence_length) + data.shape[1:], dtype=data.dtype
            )
        return np.stack(
            [data[i : i + self.sequence_length] for i in range(n_windows)]
        )

    def _create_target_sequences(self, data: np.ndarray) -> np.ndarray:
        """Collect the ``forecast_horizon`` values following each input window.

        Target ``i`` covers rows ``[i + sequence_length, i + sequence_length +
        forecast_horizon)``, i.e. the values right after input window ``i``.
        """
        start = self.sequence_length
        n_targets = len(data) - start - self.forecast_horizon + 1
        if n_targets <= 0:
            return np.empty(
                (0, self.forecast_horizon) + data.shape[1:], dtype=data.dtype
            )
        return np.stack(
            [
                data[start + i : start + i + self.forecast_horizon]
                for i in range(n_targets)
            ]
        )

    def fit(
        self,
        X: Union[pd.DataFrame, np.ndarray],
        y: Union[pd.Series, np.ndarray],
        **kwargs,
    ) -> "BaseTimeSeriesForecaster":
        """Train the forecaster on features ``X`` and target series ``y``.

        ``X`` and ``y`` are expected to cover the same time steps, row for row.
        Trailing input windows without a complete target are dropped.

        Returns:
            self, for chaining.

        Raises:
            ValueError: if there are too few rows to form one (window, target) pair.
            RuntimeError: if PyTorch is unavailable.
        """
        training_start = datetime.now()

        try:
            logger.info(f"Starting training for {self.model_id}")
            self.metadata.status = ModelStatus.TRAINING

            X_prepared = self._prepare_features(X)
            y_prepared = self._prepare_targets(y)

            # There are forecast_horizon more input windows than targets.
            # TensorDataset needs equal lengths, and window i pairs with target i.
            n_samples = min(len(X_prepared), len(y_prepared))
            if n_samples == 0:
                raise ValueError(
                    f"At least {self.sequence_length + self.forecast_horizon} "
                    f"observations are required to train {self.model_id}"
                )
            X_prepared = X_prepared[:n_samples]
            y_prepared = y_prepared[:n_samples]

            self.metadata.feature_count = X_prepared.shape[2]
            self.metadata.sample_count = X_prepared.shape[0]
            # Size the network from the data it will actually see (calendar
            # columns included) rather than a guessed column count.
            self._input_size = X_prepared.shape[2]

            self._model = self._create_model()
            train_info = self._train_lstm_model(X_prepared, y_prepared, **kwargs)

            self._is_trained = True
            self.metadata.status = ModelStatus.TRAINED
            self.metadata.last_trained = datetime.now()

            training_score = self._calculate_training_score(X_prepared, y_prepared)
            self.metadata.training_score = training_score

            training_result = TrainingResult(
                model_id=self.model_id,
                training_start=training_start,
                training_end=datetime.now(),
                training_score=training_score,
                validation_score=0.0,
            )
            # Attach this run's loss curve to this run's record. The loop runs
            # before the record exists, so it cannot do this itself.
            if train_info is not None:
                training_result.loss_history, training_result.epochs = train_info
                training_result.learning_rate = self.hyperparameters["learning_rate"]
            self.training_history.append(training_result)

            logger.info(f"Training completed for {self.model_id}")
            return self

        except Exception as e:
            self.metadata.status = ModelStatus.FAILED
            logger.error(f"Training failed for {self.model_id}: {e}")
            raise

    def _train_lstm_model(self, X: np.ndarray, y: np.ndarray, **kwargs):
        """Fit ``self._model`` with Adam on MSE, LR-on-plateau and early stopping.

        Both the LR scheduler and early stopping watch the mean training-batch
        loss of each epoch; there is no validation split. Training stops after
        ``hyperparameters["patience"]`` epochs without improvement.

        Returns:
            ``(loss_history, epochs_run)``: per-epoch mean loss and the number
            of epochs actually executed.
        """
        self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self._model.to(self._device)

        X_tensor = torch.FloatTensor(X).to(self._device)
        y_tensor = torch.FloatTensor(y).to(self._device)
        dataloader = DataLoader(
            TensorDataset(X_tensor, y_tensor),
            batch_size=self.hyperparameters["batch_size"],
            shuffle=True,
        )

        criterion = nn.MSELoss()
        optimizer = optim.Adam(
            self._model.parameters(), lr=self.hyperparameters["learning_rate"]
        )
        # Halve the learning rate after 5 epochs without loss improvement.
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode="min", patience=5, factor=0.5
        )

        best_loss = float("inf")
        patience_counter = 0
        loss_history = []
        epochs_run = 0

        for epoch in range(self.hyperparameters["epochs"]):
            epochs_run = epoch + 1
            self._model.train()
            total_loss = 0.0

            for batch_X, batch_y in dataloader:
                optimizer.zero_grad()
                loss = criterion(self._model(batch_X), batch_y)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            avg_loss = total_loss / len(dataloader)
            loss_history.append(avg_loss)

            scheduler.step(avg_loss)

            if avg_loss < best_loss:
                best_loss = avg_loss
                patience_counter = 0
            else:
                patience_counter += 1

            if patience_counter >= self.hyperparameters["patience"]:
                logger.info(f"Early stopping at epoch {epoch}")
                break

            if epoch % 10 == 0:
                logger.info(f"Epoch {epoch}, Loss: {avg_loss:.6f}")

        return loss_history, epochs_run

    def _calculate_training_score(self, X: np.ndarray, y: np.ndarray) -> float:
        """Return the R² of the model's predictions on ``X`` against ``y``.

        All forecast steps are pooled (both arrays are flattened). The score is
        computed with NumPy so training does not require scikit-learn. A
        constant target gives 1.0 for an exact fit and 0.0 otherwise, as
        ``sklearn.metrics.r2_score`` does by default.
        """
        self._model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X).to(self._device)
            y_pred = self._model(X_tensor).cpu().numpy()

        y_true = np.asarray(y, dtype=np.float64).ravel()
        y_hat = np.asarray(y_pred, dtype=np.float64).ravel()
        ss_res = float(np.sum((y_true - y_hat) ** 2))
        ss_tot = float(np.sum((y_true - y_true.mean()) ** 2))
        if ss_tot == 0.0:
            return 1.0 if ss_res == 0.0 else 0.0
        return 1.0 - ss_res / ss_tot

    def predict(self, X: Union[pd.DataFrame, np.ndarray]) -> PredictionResult:
        """Forecast ``forecast_horizon`` steps for every input window in ``X``.

        ``X`` is prepared as in ``fit``, so it needs at least
        ``sequence_length`` rows. The result's ``predictions`` has one row per
        window.

        Raises:
            RuntimeError: if the model has not been trained.
        """
        if not self._is_trained:
            raise RuntimeError(
                f"Model {self.model_id} must be trained before forecasting"
            )

        prediction_start = datetime.now()

        try:
            X_prepared = self._prepare_features(X)

            self._model.eval()
            with torch.no_grad():
                X_tensor = torch.FloatTensor(X_prepared).to(self._device)
                predictions = self._model(X_tensor).cpu().numpy()

            processing_time = (datetime.now() - prediction_start).total_seconds()

            return PredictionResult(
                prediction_id=f"{self.model_id}_{prediction_start.strftime('%Y%m%d_%H%M%S')}",
                model_id=self.model_id,
                timestamp=prediction_start,
                predictions=predictions,
                input_features=X.to_dict() if hasattr(X, "to_dict") else None,
                feature_names=getattr(self, "_feature_names", None),
                prediction_type=self.model_type.value,
                processing_time=processing_time,
            )

        except Exception as e:
            logger.error(f"Forecasting failed for {self.model_id}: {e}")
            raise

    def forecast_future(
        self, steps: int, last_sequence: np.ndarray, target_index: int = 0
    ) -> np.ndarray:
        """Roll the model forward autoregressively for ``steps`` iterations.

        Args:
            steps: Number of iterations.
            last_sequence: The most recent window, ``(sequence_length, n_features)``
                or ``(sequence_length,)`` for a single feature.
            target_index: Feature column that holds the forecast variable. The
                first predicted value is written there each step; other columns
                carry forward their last observed value. NOTE(review): confirm
                the target's column for multi-feature models.

        Returns:
            Array ``(steps, forecast_horizon)`` with the full horizon predicted
            at each iteration.

        Raises:
            RuntimeError: if the model has not been trained.
        """
        if not self._is_trained:
            raise RuntimeError("Model must be trained before forecasting")

        # np.array copies, so the caller's array is never modified.
        window = np.array(last_sequence, dtype=np.float32)
        if window.ndim == 1:
            window = window[:, np.newaxis]

        self._model.eval()
        forecasts = []

        with torch.no_grad():
            for _ in range(steps):
                X_input = torch.from_numpy(window).unsqueeze(0).to(self._device)
                pred = self._model(X_input).cpu().numpy()[0]
                forecasts.append(pred)

                # Build the next time step from the previous last row with the
                # target replaced by the one-step-ahead prediction.
                next_row = window[-1].copy()
                next_row[target_index] = pred[0]
                window = np.roll(window, -1, axis=0)
                window[-1] = next_row

        return np.array(forecasts)
|
|
|
|
|
class GDPImpactForecaster(BaseTimeSeriesForecaster):
    """LSTM forecaster for GDP impact predictions.

    Uses a 24-step input window and a 12-step horizon, with a larger and deeper
    network and a lower learning rate than the base defaults.
    """

    # (label, z-multiplier) pairs for the heuristic confidence bands.
    _CI_LEVELS = (("68%", 1.0), ("95%", 1.96), ("99%", 2.58))
    # Band half-width per unit of z, as a fraction of |prediction|.
    _CI_RELATIVE_WIDTH = 0.05

    def __init__(self, model_id: str = "gdp_impact_forecaster"):
        super().__init__(
            model_id=model_id,
            name="GDP Impact Forecaster",
            description="LSTM-based forecaster for predicting GDP impacts from tariff policies",
            sequence_length=24,
            forecast_horizon=12,
        )

        self.hyperparameters.update(
            {
                "hidden_size": 256,
                "num_layers": 4,
                "learning_rate": 0.0005,
                "epochs": 150,
            }
        )

        logger.info(f"Initialized GDPImpactForecaster: {model_id}")

    def get_gdp_forecast(self, X: Union[pd.DataFrame, np.ndarray]) -> Dict[str, Any]:
        """Run ``predict`` on ``X`` and bundle the forecast with derived analyses.

        Returns a dict with the raw predictions (as nested lists), the forecast
        horizon, heuristic confidence bands, trend statistics and rule-based
        policy notes.
        """
        predictions = self.predict(X).predictions

        return {
            "predictions": predictions.tolist(),
            "forecast_horizon": self.forecast_horizon,
            "confidence_intervals": self._calculate_confidence_intervals(predictions),
            "economic_indicators": self._analyze_gdp_indicators(predictions),
            "policy_implications": self._generate_policy_implications(predictions),
        }

    def _calculate_confidence_intervals(
        self, predictions: np.ndarray
    ) -> Dict[str, List[float]]:
        """Return heuristic (not statistically estimated) bands around ``predictions``.

        The half-width is 5% of |prediction| times a normal z-multiplier
        (1.0 / 1.96 / 2.58). Using the magnitude keeps ``lower <= upper`` when
        forecasts are negative. Keys are ``"68%_lower"``, ``"68%_upper"``, ...,
        ``"99%_upper"``; values keep the shape of ``predictions``.
        """
        predictions = np.asarray(predictions)
        unit_width = self._CI_RELATIVE_WIDTH * np.abs(predictions)

        intervals: Dict[str, List[float]] = {}
        for label, z in self._CI_LEVELS:
            intervals[f"{label}_lower"] = (predictions - z * unit_width).tolist()
            intervals[f"{label}_upper"] = (predictions + z * unit_width).tolist()

        return intervals

    def _analyze_gdp_indicators(self, predictions: np.ndarray) -> Dict[str, Any]:
        """Summarize step-over-step growth along the forecast (last) axis.

        Works for a single forecast (1-D) or one forecast per row (2-D, as
        returned by ``predict``). ``turning_points`` holds prediction indices
        where the growth rate has a local extremum. It is a flat list for 1-D
        input and one list per forecast row otherwise.
        """
        predictions = np.asarray(predictions)

        # Percent change between consecutive forecast steps.
        growth_rates = np.diff(predictions, axis=-1) / predictions[..., :-1] * 100

        trend = "Growing" if np.mean(growth_rates) > 0 else "Declining"
        volatility = np.std(growth_rates)

        prev = growth_rates[..., :-2]
        mid = growth_rates[..., 1:-1]
        nxt = growth_rates[..., 2:]
        is_turning = ((prev < mid) & (mid > nxt)) | ((prev > mid) & (mid < nxt))
        # mid[j] is growth_rates[j + 1], the change into predictions[j + 2].
        if is_turning.ndim <= 1:
            turning_points = (np.flatnonzero(is_turning) + 2).tolist()
        else:
            turning_points = [
                (np.flatnonzero(row) + 2).tolist()
                for row in is_turning.reshape(-1, is_turning.shape[-1])
            ]

        return {
            "trend": trend,
            "average_growth_rate": float(np.mean(growth_rates)),
            "volatility": float(volatility),
            "turning_points": turning_points,
            "growth_acceleration": float(np.mean(np.diff(growth_rates))),
        }

    def _generate_policy_implications(self, predictions: np.ndarray) -> List[str]:
        """Derive rule-based policy notes from step-to-step forecast changes.

        Returns an empty list when fewer than two forecast steps exist,
        because no change can be measured.
        """
        steps = np.diff(np.asarray(predictions), axis=-1)
        if steps.size == 0:
            return []

        implications = []

        if np.all(steps > 0):
            implications.append(
                "Sustained GDP growth expected - maintain current policies"
            )
        elif np.any(steps < 0):
            implications.append("GDP contraction possible - consider stimulus measures")

        if np.any(np.diff(steps, axis=-1) < 0):
            implications.append(
                "Growth deceleration detected - monitor economic indicators"
            )

        # Compare dispersion with the size of the average change. Using the
        # signed mean would flag every steady decline as volatile.
        if np.std(steps) > np.abs(np.mean(steps)) * 0.5:
            implications.append(
                "High GDP volatility expected - implement stabilization measures"
            )

        return implications
|
|
|
|
|
class TradeFlowForecaster(BaseTimeSeriesForecaster):
    """LSTM forecaster for trade-flow predictions.

    Looks back 18 steps and predicts 6 steps ahead with a 192-unit, 3-layer
    network trained for up to 120 epochs.
    """

    def __init__(self, model_id: str = "trade_flow_forecaster"):
        super().__init__(
            model_id=model_id,
            name="Trade Flow Forecaster",
            description="LSTM-based forecaster for predicting trade flow impacts from tariffs",
            sequence_length=18,
            forecast_horizon=6,
        )
        self.hyperparameters.update(
            hidden_size=192,
            num_layers=3,
            learning_rate=0.001,
            epochs=120,
        )
        logger.info(f"Initialized TradeFlowForecaster: {model_id}")

    def get_trade_forecast(self, X: Union[pd.DataFrame, np.ndarray]) -> Dict[str, Any]:
        """Predict on ``X`` and attach balance, sector and policy analyses."""
        preds = self.predict(X).predictions
        return {
            "predictions": preds.tolist(),
            "forecast_horizon": self.forecast_horizon,
            "trade_balance_analysis": self._analyze_trade_balance(preds),
            "sector_impacts": self._analyze_sector_impacts(preds),
            "policy_recommendations": self._generate_trade_policy_recommendations(preds),
        }

    def _analyze_trade_balance(self, predictions: np.ndarray) -> Dict[str, Any]:
        """Treat column 0 as imports and column 1 as exports; summarize exports - imports.

        Returns an error dict when ``predictions`` has fewer than two columns.
        """
        if predictions.shape[1] < 2:
            return {"error": "Insufficient data for trade balance analysis"}

        balance = predictions[:, 1] - predictions[:, 0]
        strictly_rising = np.all(np.diff(balance) > 0)
        return {
            "trade_balance": balance.tolist(),
            "balance_trend": "Improving" if strictly_rising else "Deteriorating",
            "average_balance": float(balance.mean()),
            "balance_volatility": float(balance.std()),
        }

    def _analyze_sector_impacts(self, predictions: np.ndarray) -> Dict[str, Any]:
        """Return fixed, qualitative sector notes (``predictions`` is not used)."""
        return dict(
            manufacturing="Moderate impact expected",
            agriculture="High sensitivity to trade policy changes",
            services="Limited direct impact",
            technology="Varies by export dependency",
        )

    def _generate_trade_policy_recommendations(
        self, predictions: np.ndarray
    ) -> List[str]:
        """Recommend policies from the row-total trend and any negative column-1 minus column-0 balance."""
        totals = predictions.sum(axis=1)
        volume_rising = np.all(np.diff(totals) > 0)
        recs = [
            "Trade volume growing - maintain open trade policies"
            if volume_rising
            else "Trade volume declining - review trade restrictions"
        ]

        has_deficit = predictions.shape[1] >= 2 and np.any(
            predictions[:, 1] - predictions[:, 0] < 0
        )
        if has_deficit:
            recs.append("Trade deficit detected - consider export promotion")

        return recs
|
|
|
|
|
class EmploymentForecaster(BaseTimeSeriesForecaster):
    """LSTM forecaster for employment-impact predictions.

    Looks back over a 36-step window and forecasts 12 steps ahead. Its network
    settings match the base-class defaults (128 hidden units, 3 layers,
    learning rate 0.001, 100 epochs).
    """

    def __init__(self, model_id: str = "employment_forecaster"):
        super().__init__(
            model_id=model_id,
            name="Employment Impact Forecaster",
            description="LSTM-based forecaster for predicting employment impacts from tariff policies",
            sequence_length=36,
            forecast_horizon=12,
        )
        self.hyperparameters.update(
            hidden_size=128,
            num_layers=3,
            learning_rate=0.001,
            epochs=100,
        )
        logger.info(f"Initialized EmploymentForecaster: {model_id}")
|
|
|
|
|
class PriceImpactForecaster(BaseTimeSeriesForecaster):
    """LSTM forecaster for price-impact predictions.

    Uses a 24-step look-back window, a 6-step horizon and a 160-unit, 3-layer
    network trained for up to 120 epochs.
    """

    def __init__(self, model_id: str = "price_impact_forecaster"):
        super().__init__(
            model_id=model_id,
            name="Price Impact Forecaster",
            description="LSTM-based forecaster for predicting price impacts from tariff policies",
            sequence_length=24,
            forecast_horizon=6,
        )
        self.hyperparameters.update(
            hidden_size=160,
            num_layers=3,
            learning_rate=0.001,
            epochs=120,
        )
        logger.info(f"Initialized PriceImpactForecaster: {model_id}")
|
|