diff --git "a/app.py" "b/app.py"
new file mode 100644--- /dev/null
+++ "b/app.py"
@@ -0,0 +1,1738 @@
+
+import pandas as pd
+import numpy as np
+import ccxt
+import time
+import os
+import csv # Import csv module for logging
+import traceback # Import traceback for detailed error logging
+from datetime import datetime, timedelta
+import warnings
+import plotly.graph_objects as go
+import plotly.colors as pcolors
+import gradio as gr
+
+# Import necessary TA indicators (Existing + New)
+from ta.trend import MACD, ADXIndicator, IchimokuIndicator, VortexIndicator
+from ta.momentum import RSIIndicator, StochasticOscillator, AwesomeOscillatorIndicator, WilliamsRIndicator
+from ta.volume import MFIIndicator, OnBalanceVolumeIndicator, ChaikinMoneyFlowIndicator, VolumeWeightedAveragePrice
+from ta.volatility import AverageTrueRange, BollingerBands
+
+# Suppress specific warnings
+warnings.filterwarnings('ignore', category=RuntimeWarning)
+warnings.filterwarnings('ignore', category=FutureWarning)
+warnings.filterwarnings('ignore', category=UserWarning) # Ignore some TA lib warnings if needed
+
+# --- Configuration ---
+DEFAULT_EXCHANGE_ID = 'mexc' # Changed default as requested context seemed to imply binance API issues
+DEFAULT_TOP_N_COINS = 30 # Reduced default due to increased backtest history
+DEFAULT_TIMEFRAMES = ['1m', '5m', '15m', '30m', '1h', '4h'] # Example Timeframes
+DEFAULT_MIN_CONFIRMATION = 0.75 # Used for Zone finding (Average Score)
+# Increased limits for longer backtesting
+LIMIT_PER_TIMEFRAME = 1050 # Needs to be >= BACKTEST_HISTORY_CANDLES + indicator lookbacks (~50)
+BACKTEST_HISTORY_CANDLES = 1000 # Increased backtest candle count
+# --- Trade Parameters ---
+ATR_SL_MULTIPLIER = 1.5
+ATR_TP1_MULTIPLIER = 1.0
+ATR_TP2_MULTIPLIER = 2.0
+LEVERAGES = [20, 50] # For display/estimation only
+SIMULATED_FEE_PERCENT = 0.06 # Approximate futures fee per side (entry/exit = *2)
+BACKTEST_RESULTS_FILE = 'backtest_summary_enhanced.csv'
+SIGNAL_LOG_FILE = 'realtime_signal_log.csv' # <<< New: CSV file for logging signals
+
+TIMEFRAME_ORDER_MAP = {
+ '1m': 1, '3m': 2, '5m': 3, '15m': 4, '30m': 5, '1h': 6, '2h': 7,
+ '4h': 8, '6h': 9, '8h': 10, '12h': 11, '1d': 12, '3d': 13, '1w': 14, '1M': 15
+}
+# TIMEFRAME_WEIGHTS not actively used in current logic, kept for potential future use.
+
+# --- CSV Signal Logging Function ---
+def log_signal_to_csv(signal_info):
+ """Appends signal information to the CSV log file."""
+ file_exists = os.path.isfile(SIGNAL_LOG_FILE)
+ fieldnames = [
+ 'LogTimestamp', 'SignalCandleTime', 'Symbol', 'Timeframe', 'Direction',
+ 'Entry', 'SL', 'TP1', 'TP2', 'Status', #'RSI', 'MACD_Diff', 'ADX' # Optional: Add key indicator values
+ ]
+ try:
+ with open(SIGNAL_LOG_FILE, 'a', newline='', encoding='utf-8') as csvfile:
+ writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+ if not file_exists:
+ writer.writeheader() # Write header only if file is new
+ writer.writerow(signal_info)
+ except IOError as e:
+ print(f"Error: Could not write to CSV log file {SIGNAL_LOG_FILE}: {e}")
+ except Exception as e:
+ print(f"Error logging signal to CSV: {e}\n{traceback.format_exc()}")
+
+# --- Crypto Analysis Class ---
+class CryptoTrendIndicator:
+ def __init__(self, exchange_id, top_coins, selected_timeframes):
+ self.exchange_id = exchange_id
+ self.top_coins = top_coins
+ self.requested_timeframes = selected_timeframes
+ self.exchange = None
+ self.valid_timeframes = []
+ # Stores detailed results {symbol: {tf: {signals: {}, values: {}, trade_params: {}, direction: int}}}
+ self.analysis_results = {}
+ self.backtest_results = [] # List of dicts for backtest summary
+ self.heatmap_df = pd.DataFrame()
+ self.active_signals_df = pd.DataFrame() # DataFrame for active signals table
+ self.heatmap_details = {} # Stores dict for hover {Coin: {tf: {Ind: Val,...}}}
+
+ self._initialize_exchange()
+
+ def _initialize_exchange(self):
+ """Initialize the ccxt exchange instance."""
+ try:
+ # Force spot market for analysis consistency
+ self.exchange = getattr(ccxt, self.exchange_id)({
+ 'enableRateLimit': True,
+ 'options': {'defaultType': 'spot'} # Use SPOT for fetching data
+ })
+ # Increase timeout for potentially longer data fetches
+ self.exchange.timeout = 30000 # 30 seconds
+ self.exchange.load_markets(reload=True)
+ print(f"Exchange {self.exchange_id} initialized (using SPOT markets for data).")
+ self._validate_timeframes()
+ except AttributeError:
+ raise ValueError(f"Error: Exchange '{self.exchange_id}' not found or supported by ccxt.")
+ except ccxt.AuthenticationError as e:
+ raise ValueError(f"Authentication Error for {self.exchange_id}: {e}")
+ except ccxt.ExchangeError as e:
+ raise ValueError(f"Exchange Error initializing {self.exchange_id}: {e}")
+ except Exception as e:
+ raise ValueError(f"Unexpected error initializing exchange: {e}\n{traceback.format_exc()}")
+
+ def _validate_timeframes(self):
+ """Filter selected timeframes against those supported by the exchange."""
+ if not self.exchange or not self.exchange.timeframes:
+ print(f"Warning: Could not get timeframes from {self.exchange_id}. Cannot validate.")
+ # Attempt to use requested timeframes, hoping they are valid
+ self.valid_timeframes = sorted(
+ self.requested_timeframes,
+ key=lambda tf: TIMEFRAME_ORDER_MAP.get(tf, 99)
+ )
+ print(f"Proceeding with requested timeframes (validation skipped): {self.valid_timeframes}")
+ return
+
+ supported_tfs = self.exchange.timeframes
+ self.valid_timeframes = sorted(
+ [tf for tf in self.requested_timeframes if tf in supported_tfs],
+ key=lambda tf: TIMEFRAME_ORDER_MAP.get(tf, 99)
+ )
+ print(f"Supported timeframes for analysis: {self.valid_timeframes}")
+ if len(self.valid_timeframes) != len(self.requested_timeframes):
+ skipped = set(self.requested_timeframes) - set(self.valid_timeframes)
+ print(f"Warning: Skipped unsupported timeframes for {self.exchange_id}: {', '.join(skipped)}")
+ if not self.valid_timeframes:
+ print(f"Warning: No valid timeframes selected or supported by {self.exchange_id}.")
+
+
+ def fetch_top_coins(self):
+ """Fetch the top coins by USDT volume from the exchange (spot only)"""
+ if not self.exchange: return [], "Exchange not initialized"
+ # --- Logic unchanged ---
+ try:
+ tickers = self.exchange.fetch_tickers()
+ usdt_pairs = {}
+ # Stricter filtering for spot, non-leveraged, common pairs
+ for symbol, data in tickers.items():
+ try:
+ market = self.exchange.market(symbol)
+ if (symbol.endswith('/USDT') and
+ data is not None and
+ market is not None and market.get('spot', False) and # Explicitly check for spot
+ market.get('active', True) and
+ not market.get('leveraged', False) and # Exclude leveraged
+ data.get('quoteVolume') is not None and data['quoteVolume'] > 10000 and # Example: Filter low volume
+ data.get('symbol') is not None and
+ # Additional filters for common leveraged/problematic tokens
+ 'UP/' not in symbol and 'DOWN/' not in symbol and
+ 'BULL/' not in symbol and 'BEAR/' not in symbol and
+ '3L/' not in symbol and '3S/' not in symbol and
+ '5L/' not in symbol and '5S/' not in symbol
+ ):
+ usdt_pairs[symbol] = data
+ except ccxt.BadSymbol:
+ continue # Skip symbols that ccxt can't parse market data for
+ except Exception as e_inner:
+ # print(f"Minor error processing ticker {symbol}: {e_inner}") # Log minor errors
+ continue
+
+
+ if not usdt_pairs:
+ return [], f"No suitable USDT spot pairs found on {self.exchange_id} (check filters/volume)."
+
+ sorted_pairs = sorted(
+ usdt_pairs.items(),
+ key=lambda x: x[1]['quoteVolume'],
+ reverse=True
+ )
+
+ # Fetch slightly more initially to account for potential loading errors
+ fetch_limit = min(len(sorted_pairs), self.top_coins + 10) # Fetch slightly more
+ top_symbols_initial = [pair[0] for pair in sorted_pairs[:fetch_limit]]
+
+ # Filter again to ensure markets are loadable and active
+ final_symbols = []
+ count = 0
+ print(f"Validating {len(top_symbols_initial)} potential symbols...")
+ for s in top_symbols_initial:
+ if count >= self.top_coins:
+ break
+ try:
+ mkt = self.exchange.market(s) # Check if market data is valid & active
+ if mkt and mkt.get('active', True):
+ final_symbols.append(s)
+ count += 1
+ except ccxt.BadSymbol:
+ pass
+ except Exception as e:
+ print(f"Market {s} skipped during validation due to error: {e}")
+
+ msg = f"Fetched and validated top {len(final_symbols)} USDT spot pairs by volume from {self.exchange_id}."
+ print(msg)
+ if not final_symbols:
+ msg += " (Warning: Result list is empty)"
+ return final_symbols, msg
+
+ except (ccxt.NetworkError, ccxt.ExchangeNotAvailable, ccxt.RequestTimeout) as e:
+ msg = f"Network/Timeout Error fetching tickers from {self.exchange_id}: {e}."
+ print(msg)
+ return [], msg
+ except ccxt.ExchangeError as e:
+ msg = f"Exchange Error fetching tickers from {self.exchange_id}: {e}"
+ print(msg)
+ return [], msg
+ except Exception as e:
+ msg = f"An unexpected error occurred fetching top coins: {e}\n{traceback.format_exc()}"
+ print(msg)
+ return [], msg
+
+ def fetch_ohlcv_data(self, symbol, timeframe, limit=LIMIT_PER_TIMEFRAME):
+ """Fetches OHLCV data with retry mechanism."""
+ if not self.exchange: return [], "Exchange not initialized"
+ max_retries = 3
+ retry_delay = 5 # seconds
+ for attempt in range(max_retries):
+ try:
+ # print(f"Fetching {limit} candles for {symbol} {timeframe} (Attempt {attempt+1})...")
+ # *** Fetch as many as limit allows ***
+ # CCXT handles the max limit per request internally,
+ # this 'limit' param tells it the total number desired.
+ # If limit > exchange max, ccxt might fetch multiple times if supported,
+ # or just return the max allowed per call. We rely on ccxt's behavior here.
+ ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
+
+ if not ohlcv:
+ # If empty list is returned, treat as insufficient
+ return [], f"No data returned for {symbol} [{timeframe}]"
+ elif len(ohlcv) < 100: # Need substantial data for long backtest & indicators
+ return [], f"Insufficient data ({len(ohlcv)}) for {symbol} [{timeframe}] (Need >100)"
+ elif len(ohlcv) < BACKTEST_HISTORY_CANDLES + 50:
+ print(f"Warning: Fetched {len(ohlcv)} candles for {symbol} [{timeframe}], less than ideal ({BACKTEST_HISTORY_CANDLES + 50}) for full backtest + lookback.")
+ # Proceed anyway, backtest might be shorter than intended
+
+ # Successfully fetched enough data
+ print(f"Fetched {len(ohlcv)} candles for {symbol} [{timeframe}]")
+ return ohlcv, None
+
+ except (ccxt.NetworkError, ccxt.ExchangeNotAvailable, ccxt.RequestTimeout) as e:
+ print(f"Network error fetching {symbol} [{timeframe}] (Attempt {attempt+1}): {e}. Retrying in {retry_delay}s...")
+ if attempt == max_retries - 1:
+ return [], f"Network error {symbol} [{timeframe}] after {max_retries} attempts: {e}"
+ time.sleep(retry_delay + attempt * 2) # Incremental backoff
+ except ccxt.RateLimitExceeded as e:
+ print(f"Rate limit hit fetching {symbol} [{timeframe}]. Waiting longer...")
+ time.sleep(self.exchange.rateLimit / 1000 * 5 if self.exchange.rateLimit else 60) # Wait longer for rate limits
+ if attempt == max_retries - 1:
+ return [], f"Rate limit exceeded for {symbol} [{timeframe}] after retries: {e}"
+ # Continue to next attempt after waiting
+ except ccxt.BadSymbol:
+ return [], f"Invalid symbol {symbol}"
+ except ccxt.ExchangeError as e:
+ print(f"Exchange error fetching {symbol} [{timeframe}]: {e}")
+ # Handle specific errors if needed, e.g., timeframe not available for symbol
+ if 'timeframe not available' in str(e).lower():
+ return [], f"Timeframe {timeframe} not supported for {symbol} on {self.exchange_id}"
+ return [], f"Exchange error {symbol} [{timeframe}]: {e}"
+ except Exception as e:
+ print(f"Unexpected error fetching OHLCV {symbol} [{timeframe}]: {e}\n{traceback.format_exc()}")
+ return [], f"Unexpected error fetching OHLCV {symbol} [{timeframe}]"
+ return [], f"Failed to fetch data for {symbol} [{timeframe}] after {max_retries} attempts."
+
+
+ def calculate_indicators(self, ohlcv_data, timeframe):
+ """Calculates existing and new technical indicators."""
+ # Increased required length for Ichimoku and other lookbacks
+ required_length = 100
+ if not isinstance(ohlcv_data, list) or len(ohlcv_data) < required_length:
+ print(f"Indicator calc skip: Input data invalid or too short for {timeframe} (needs {required_length}, got {len(ohlcv_data) if ohlcv_data else 0})")
+ return None
+
+ try:
+ df = pd.DataFrame(ohlcv_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
+ df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
+ df.set_index('timestamp', inplace=True)
+
+ # Convert to numeric, coerce errors, drop NaNs needed for core calcs
+ for col in ['open', 'high', 'low', 'close', 'volume']:
+ df[col] = pd.to_numeric(df[col], errors='coerce')
+ df.dropna(subset=['close', 'volume', 'high', 'low'], inplace=True) # Drop rows where essential data is missing
+
+ if df.empty or len(df) < required_length:
+ print(f"Data too short after cleaning for {timeframe} (needs {required_length}, got {len(df)})")
+ return None
+
+ # --- Calculate Existing Indicators ---
+ df['rsi'] = RSIIndicator(close=df['close'], window=14).rsi().fillna(50)
+ stoch_obj = StochasticOscillator(high=df['high'], low=df['low'], close=df['close'], window=14, smooth_window=3)
+ df['stoch_k'] = stoch_obj.stoch().fillna(50)
+ df['stoch_d'] = stoch_obj.stoch_signal().fillna(50)
+ ao_obj = AwesomeOscillatorIndicator(high=df['high'], low=df['low'], fillna=True)
+ df['ao'] = ao_obj.awesome_oscillator().fillna(0)
+ macd_obj = MACD(close=df['close'], window_slow=26, window_fast=12, window_sign=9, fillna=True)
+ df['macd'] = macd_obj.macd().fillna(0)
+ df['macd_signal'] = macd_obj.macd_signal().fillna(0)
+ df['macd_diff'] = macd_obj.macd_diff().fillna(0)
+ adx_obj = ADXIndicator(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True)
+ df['adx'] = adx_obj.adx().fillna(20)
+ df['adx_pos'] = adx_obj.adx_pos().fillna(0)
+ df['adx_neg'] = adx_obj.adx_neg().fillna(0)
+ df['ema_20'] = df['close'].ewm(span=20, adjust=False).mean()
+ df['ema_50'] = df['close'].ewm(span=50, adjust=False).mean()
+ df['ema_100'] = df['close'].ewm(span=100, adjust=False).mean()
+ df['mfi'] = MFIIndicator(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=14, fillna=True).money_flow_index().fillna(50)
+ df['obv'] = OnBalanceVolumeIndicator(close=df['close'], volume=df['volume'], fillna=True).on_balance_volume().fillna(method='ffill')
+ df['cmf'] = ChaikinMoneyFlowIndicator(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=20, fillna=True).chaikin_money_flow().fillna(0)
+ df['volume_sma'] = df['volume'].rolling(window=20, min_periods=10).mean()
+ # Handle potential division by zero or NaN in volume_sma
+ df['volume_ratio'] = (df['volume'] / df['volume_sma'].replace(0, np.nan)).replace([np.inf, -np.inf], 1.0).fillna(1.0)
+ df['atr'] = AverageTrueRange(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True).average_true_range().fillna(method='ffill').fillna(0)
+
+ # --- Calculate NEW Indicators ---
+ bb_obj = BollingerBands(close=df['close'], window=20, window_dev=2, fillna=True)
+ df['bb_hband'] = bb_obj.bollinger_hband()
+ df['bb_lband'] = bb_obj.bollinger_lband()
+ df['bb_mavg'] = bb_obj.bollinger_mavg()
+ df['bb_width'] = bb_obj.bollinger_wband()
+ ichi_obj = IchimokuIndicator(high=df['high'], low=df['low'], window1=9, window2=26, window3=52, fillna=True)
+ df['ichi_a'] = ichi_obj.ichimoku_a()
+ df['ichi_b'] = ichi_obj.ichimoku_b()
+ df['ichi_base'] = ichi_obj.ichimoku_base_line()
+ df['ichi_conv'] = ichi_obj.ichimoku_conversion_line()
+ df['will_r'] = WilliamsRIndicator(high=df['high'], low=df['low'], close=df['close'], lbp=14, fillna=True).williams_r().fillna(-50)
+ df['vwap'] = VolumeWeightedAveragePrice(high=df['high'], low=df['low'], close=df['close'], volume=df['volume'], window=14, fillna=True).volume_weighted_average_price().fillna(method='ffill')
+ vortex_obj = VortexIndicator(high=df['high'], low=df['low'], close=df['close'], window=14, fillna=True)
+ df['vortex_pos'] = vortex_obj.vortex_indicator_pos()
+ df['vortex_neg'] = vortex_obj.vortex_indicator_neg()
+ # --- End Indicators ---
+
+ # Fill any remaining NaNs using forward fill first, then backward fill
+ # This helps ensure indicators near the start/end are usable for signals/backtest
+ df.ffill(inplace=True)
+ df.bfill(inplace=True)
+
+ # Ensure sufficient length remains after potential NaNs at the start
+ min_usable_length = 50 # Minimum needed for signal generation logic using prev row
+ if len(df) < min_usable_length:
+ print(f"Data too short after indicator calculation & filling for {timeframe} (needs >{min_usable_length}, got {len(df)})")
+ return None
+
+ return df.copy() # Use copy to avoid SettingWithCopyWarning later
+ except Exception as e:
+ print(f"Error calculating indicators for {timeframe}: {e}\n{traceback.format_exc()}")
+ return None
+
+ def generate_signals_and_values(self, df):
+ """
+ Generates signals based on latest indicator values, including new ones.
+ Implements the "K-map" concept via multi-factor confirmation scoring.
+ Returns:
+ - final_signals (dict): Dictionary of individual indicator signals {indicator_name: signal (-1, 0, 1)}.
+ - values (dict): Dictionary of raw indicator values {indicator_name: value}.
+ - signal_direction (int): Overall signal direction (-1, 0, 1) based on composite score.
+ """
+ if df is None or not isinstance(df, pd.DataFrame) or len(df.index) < 2: # Check length using index after potential cleaning
+ # print("Signal Gen Skip: DataFrame invalid or too short.")
+ return None, None, None
+
+ try:
+ # Ensure index is datetime for proper iloc selection
+ if not isinstance(df.index, pd.DatetimeIndex):
+ df.index = pd.to_datetime(df.index)
+ df = df.sort_index() # Ensure sorting after conversion
+
+ # Use .iloc for positional access (robust to non-sequential indices)
+ latest = df.iloc[-1]
+ prev = df.iloc[-2]
+
+ signals = {} # Stores intermediate signals (can be 0.5, -0.5)
+ final_signals = {} # Stores final signals (-1, 0, 1)
+ values = {} # Store raw values
+
+ # --- Store latest values (including new ones) ---
+ values['price'] = latest['close']
+ values['volume'] = latest['volume']
+ values['timestamp'] = latest.name.strftime('%Y-%m-%d %H:%M:%S') # Use index name
+ values['atr'] = latest['atr']
+ # Momentum
+ values['rsi'] = latest['rsi']
+ values['stoch_k'] = latest['stoch_k']; values['stoch_d'] = latest['stoch_d']
+ values['ao'] = latest['ao']
+ values['will_r'] = latest['will_r'] # New
+ # Trend
+ values['macd'] = latest['macd']; values['macd_signal'] = latest['macd_signal']; values['macd_diff'] = latest['macd_diff']
+ values['adx'] = latest['adx']; values['adx_pos'] = latest['adx_pos']; values['adx_neg'] = latest['adx_neg']
+ values['ema_20'] = latest['ema_20']; values['ema_50'] = latest['ema_50']; values['ema_100'] = latest['ema_100']
+ values['ichi_a'] = latest['ichi_a']; values['ichi_b'] = latest['ichi_b']; values['ichi_base'] = latest['ichi_base']; values['ichi_conv'] = latest['ichi_conv'] # New
+ values['vortex_pos'] = latest['vortex_pos']; values['vortex_neg'] = latest['vortex_neg'] # New
+ # Volume
+ values['mfi'] = latest['mfi']
+ values['obv'] = latest['obv']
+ values['cmf'] = latest['cmf']
+ values['volume_ratio'] = latest['volume_ratio']
+ values['vwap'] = latest['vwap'] # New (volume-related)
+ # Volatility
+ values['bb_hband'] = latest['bb_hband']; values['bb_lband'] = latest['bb_lband']; values['bb_mavg'] = latest['bb_mavg']; values['bb_width'] = latest['bb_width'] # New
+ # ---
+
+ # --- Generate Signals (Using intermediate 'signals' dict) ---
+ # RSI (Momentum)
+ signals['rsi'] = 1 if latest['rsi'] < 30 else (-1 if latest['rsi'] > 70 else 0)
+ # Stochastic (Momentum) - Crossover signal
+ signals['stoch'] = 1 if latest['stoch_k'] < 25 and prev['stoch_k'] <= prev['stoch_d'] and latest['stoch_k'] > latest['stoch_d'] else (-1 if latest['stoch_k'] > 75 and prev['stoch_k'] >= prev['stoch_d'] and latest['stoch_k'] < latest['stoch_d'] else 0)
+ # Awesome Oscillator (Momentum) - Zero cross and twin peaks (simplified)
+ if latest['ao'] > 0 and prev['ao'] <= 0: signals['ao'] = 1
+ elif latest['ao'] < 0 and prev['ao'] >= 0: signals['ao'] = -1
+ elif latest['ao'] > 0 and prev['ao'] > 0 and latest['ao'] > prev['ao']: signals['ao'] = 0.5 # Bullish momentum
+ elif latest['ao'] < 0 and prev['ao'] < 0 and latest['ao'] < prev['ao']: signals['ao'] = -0.5 # Bearish momentum
+ else: signals['ao'] = 0
+ # Williams %R (Momentum) - Exiting Overbought/Oversold
+ signals['will_r'] = 1 if latest['will_r'] > -20 and prev['will_r'] <= -20 else (-1 if latest['will_r'] < -80 and prev['will_r'] >= -80 else 0)
+
+ # MACD (Trend/Momentum) - Line cross Signal
+ if (latest['macd'] > latest['macd_signal'] and prev['macd'] <= prev['macd_signal']): signals['macd'] = 1
+ elif (latest['macd'] < latest['macd_signal'] and prev['macd'] >= prev['macd_signal']): signals['macd'] = -1
+ else: signals['macd'] = 0
+ # ADX (Trend Strength) - Directional movement
+ signals['adx'] = 1 if latest['adx'] > 25 and latest['adx_pos'] > latest['adx_neg'] else (-1 if latest['adx'] > 25 and latest['adx_neg'] > latest['adx_pos'] else 0)
+ # EMA Trend (Trend)
+ if latest['close'] > latest['ema_20'] and latest['ema_20'] > latest['ema_50'] and latest['ema_50'] > latest['ema_100']: signals['ema_trend'] = 1
+ elif latest['close'] < latest['ema_20'] and latest['ema_20'] < latest['ema_50'] and latest['ema_50'] < latest['ema_100']: signals['ema_trend'] = -1
+ elif latest['close'] > latest['ema_50']: signals['ema_trend'] = 0.5 # Price above mid-term EMA
+ elif latest['close'] < latest['ema_50']: signals['ema_trend'] = -0.5 # Price below mid-term EMA
+ else: signals['ema_trend'] = 0
+ # Ichimoku (Trend) - TK cross, Cloud position, Price vs Kijun
+ ichi_signal = 0
+ tenkan_cross_kijun_up = latest['ichi_conv'] > latest['ichi_base'] and prev['ichi_conv'] <= prev['ichi_base']
+ tenkan_cross_kijun_down = latest['ichi_conv'] < latest['ichi_base'] and prev['ichi_conv'] >= prev['ichi_base']
+ above_cloud = latest['close'] > latest['ichi_a'] and latest['close'] > latest['ichi_b']
+ below_cloud = latest['close'] < latest['ichi_a'] and latest['close'] < latest['ichi_b']
+ price_above_kijun = latest['close'] > latest['ichi_base']
+ price_below_kijun = latest['close'] < latest['ichi_base']
+
+ if tenkan_cross_kijun_up and above_cloud and price_above_kijun: ichi_signal = 1 # Strong Bullish
+ elif tenkan_cross_kijun_down and below_cloud and price_below_kijun: ichi_signal = -1 # Strong Bearish
+ elif above_cloud and price_above_kijun and latest['ichi_conv'] > latest['ichi_base']: ichi_signal = 0.5 # Bullish Bias
+ elif below_cloud and price_below_kijun and latest['ichi_conv'] < latest['ichi_base']: ichi_signal = -0.5 # Bearish Bias
+ signals['ichimoku'] = ichi_signal
+ # Vortex (Trend) - Crossover
+ if latest['vortex_pos'] > latest['vortex_neg'] and prev['vortex_pos'] <= prev['vortex_neg']: signals['vortex'] = 1
+ elif latest['vortex_neg'] > latest['vortex_pos'] and prev['vortex_neg'] <= prev['vortex_pos']: signals['vortex'] = -1
+ else: signals['vortex'] = 0
+
+ # MFI (Volume/Momentum)
+ signals['mfi'] = 1 if latest['mfi'] < 20 else (-1 if latest['mfi'] > 80 else 0)
+ # CMF (Volume/Flow)
+ signals['cmf'] = 1 if latest['cmf'] > 0.05 else (-1 if latest['cmf'] < -0.05 else 0)
+ # OBV (Volume/Trend) - Simple trend vs moving average
+ if len(df) > 5:
+ try:
+ obv_sma5 = df['obv'].rolling(window=5).mean().iloc[-1]
+ # Check for NaN sma due to insufficient data at the start
+ if pd.notna(obv_sma5):
+ signals['obv_trend'] = 1 if latest['obv'] > obv_sma5 else (-1 if latest['obv'] < obv_sma5 else 0)
+ else: signals['obv_trend'] = 0
+ except IndexError: # Catch potential index error if rolling mean fails near start
+ signals['obv_trend'] = 0
+ else: signals['obv_trend'] = 0
+ # Volume Spike (Volume) - Relative to recent average
+ if latest['volume_ratio'] > 1.8: # Threshold for "high volume"
+ # Signal direction based on candle close vs open during spike
+ signals['vol_spike'] = 0.5 if latest['close'] > latest['open'] else (-0.5 if latest['close'] < latest['open'] else 0)
+ else: signals['vol_spike'] = 0
+ # VWAP (Volume/Price Level) - Price cross VWAP
+ signals['vwap_cross'] = 1 if latest['close'] > latest['vwap'] and prev['close'] <= prev['vwap'] else (-1 if latest['close'] < latest['vwap'] and prev['close'] >= prev['vwap'] else 0)
+
+ # Bollinger Bands (Volatility/Mean Reversion/Breakout) - Breakout example
+ if latest['close'] > latest['bb_hband'] and prev['close'] <= prev['bb_hband']: signals['bbands'] = 1
+ elif latest['close'] < latest['bb_lband'] and prev['close'] >= prev['bb_lband']: signals['bbands'] = -1
+ else: signals['bbands'] = 0
+ # --- End Signal Logic ---
+
+ # --- Final Cleanup & Composite (K-Map/Scoring Implementation) ---
+ # Ensure all indicators used in signals have a default entry
+ all_signal_keys = list(signals.keys()) # Get keys from the intermediate signals
+ for k in all_signal_keys:
+ values.setdefault(k, np.nan) # Ensure value exists even if calculation failed (use NaN)
+
+ # Convert intermediate 0.5/-0.5 signals to 1/-1 for final score, store in final_signals
+ for key, value in signals.items():
+ if value >= 0.5: final_signals[key] = 1
+ elif value <= -0.5: final_signals[key] = -1
+ else: final_signals[key] = 0
+
+ # Calculate composite score based on sum of FINAL (-1, 0, 1) signals
+ non_neutral_signals = [s for s in final_signals.values() if s != 0]
+ composite_score = sum(non_neutral_signals)
+
+ # Determine overall signal direction based on score magnitude
+ # Refined Threshold: Need ~30% net agreement among indicators, minimum 3 net signals
+ num_indicators_signaling = len(final_signals) # Count how many indicators produced a signal
+ signal_strength_threshold = max(3, int(num_indicators_signaling * 0.30))
+
+ signal_direction = 0
+ if num_indicators_signaling > 0: # Avoid division by zero if no signals generated
+ # Check if composite score meets the threshold
+ if composite_score >= signal_strength_threshold:
+ signal_direction = 1
+ elif composite_score <= -signal_strength_threshold:
+ signal_direction = -1
+
+ # Ensure all keys from values (excluding non-indicator ones) are in final_signals with 0 if not set
+ value_keys_for_signals = [k for k in values.keys() if k not in ['price', 'volume', 'timestamp', 'atr']]
+ for k in value_keys_for_signals:
+ final_signals.setdefault(k, 0) # Default to neutral if no signal logic applied
+
+ return final_signals, values, signal_direction
+
+ except KeyError as e:
+ print(f"KeyError during signal generation (likely missing indicator column: {e}) in DF columns: {df.columns if df is not None else 'None'}. Check calculation step.")
+ return None, None, None
+ except IndexError as e:
+ print(f"IndexError during signal generation (likely insufficient data rows for prev/latest): {e}. DF length: {len(df) if df is not None else 0}")
+ return None, None, None
+ except Exception as e:
+ print(f"Error generating signals/values: {e}\n{traceback.format_exc()}")
+ return None, None, None
+
+
+ def calculate_trade_params(self, values, signal_direction):
+ """Calculate Entry, SL, TP1, TP2 based on ATR"""
+ params = {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}}
+ try:
+ # Validate necessary inputs more rigorously
+ if signal_direction == 0 or \
+ not values or \
+ pd.isna(values.get('atr')) or values.get('atr', 0) <= 0 or \
+ pd.isna(values.get('price')) or values.get('price', 0) <= 0:
+ return params # No signal or invalid data
+
+ entry_price = values['price']
+ atr_val = values['atr']
+ # Added check here as well
+ if atr_val <= 0 or entry_price <= 0 : return params
+
+ params['entry'] = entry_price
+
+ if signal_direction == 1: # Long
+ params['sl'] = entry_price - ATR_SL_MULTIPLIER * atr_val
+ params['tp1'] = entry_price + ATR_TP1_MULTIPLIER * atr_val
+ params['tp2'] = entry_price + ATR_TP2_MULTIPLIER * atr_val
+ elif signal_direction == -1: # Short
+ params['sl'] = entry_price + ATR_SL_MULTIPLIER * atr_val
+ params['tp1'] = entry_price - ATR_TP1_MULTIPLIER * atr_val
+ params['tp2'] = entry_price - ATR_TP2_MULTIPLIER * atr_val
+
+ # Ensure SL/TP are valid numbers and positive
+ # Also check if SL crossed entry (e.g., due to very small ATR) - invalidate if so.
+ if pd.isna(params['sl']) or pd.isna(params['tp1']) or pd.isna(params['tp2']) or \
+ params['sl'] <= 0 or params['tp1'] <= 0 or params['tp2'] <= 0 or \
+ (signal_direction == 1 and params['sl'] >= params['entry']) or \
+ (signal_direction == -1 and params['sl'] <= params['entry']):
+ # print(f"Warning: Invalid SL/TP (NaN, <=0, or SL crossed entry) for entry {entry_price:.5f}, ATR {atr_val:.5f}. Signal: {signal_direction}")
+ # Reset params if invalid
+ return {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}}
+
+ # Calculate potential leveraged profit for $1 (simplified)
+ fee = SIMULATED_FEE_PERCENT / 100.0
+
+ for lev in LEVERAGES:
+ # Ratios based on entry price
+ profit_ratio_tp1 = abs(params['tp1'] - entry_price) / entry_price
+ loss_ratio_sl = abs(params['sl'] - entry_price) / entry_price
+
+ # Calculate gross P/L ratio with leverage
+ leveraged_profit_tp1 = profit_ratio_tp1 * lev
+ leveraged_loss_sl = loss_ratio_sl * lev
+
+ # Calculate fee impact (applied to leveraged position size)
+ fee_impact = 2 * fee * lev # Entry fee + Exit fee on leveraged amount
+
+ # Net profit/loss per $1 invested (considering $1 as margin)
+ dollar_profit_tp1 = leveraged_profit_tp1 - fee_impact
+ dollar_loss_sl = -leveraged_loss_sl - fee_impact # Loss is negative
+
+ params['lev_profit'][f'{lev}x'] = {'tp1_profit_$': round(dollar_profit_tp1, 3), 'sl_loss_$': round(dollar_loss_sl, 3)}
+
+ return params
+
+ except Exception as e:
+ print(f"Error calculating trade params for signal {signal_direction}, values {values}: {e}\n{traceback.format_exc()}")
+ return {'entry': None, 'sl': None, 'tp1': None, 'tp2': None, 'lev_profit': {}}
+
+
+ def _run_simple_backtest(self, symbol, timeframe, df):
+ """VERY Basic backtest simulation on the provided DataFrame (Uses longer history)."""
+ default_result = {'symbol': symbol, 'timeframe': timeframe, 'trades': 0, 'win_rate': 0, 'pnl_sum': 0, 'pnl_%_sum': 0}
+ # Need enough history + buffer for indicator lookbacks
+ min_backtest_len = BACKTEST_HISTORY_CANDLES + 50 # Need ~50 for lookback before backtest starts
+
+ # Check if DataFrame is valid and has enough rows
+ if df is None or not isinstance(df, pd.DataFrame) or len(df) < min_backtest_len:
+ # print(f"BT Skip {symbol} {timeframe}: Not enough data ({len(df) if df is not None else 0} < {min_backtest_len})")
+ return default_result
+
+ try:
+ # Ensure index is datetime and sorted for correct slicing
+ if not isinstance(df.index, pd.DatetimeIndex):
+ df.index = pd.to_datetime(df.index)
+ df = df.sort_index()
+
+ # Select the slice for backtesting simulation
+ # Use iloc for robustness to non-sequential indices after cleaning
+ backtest_start_iloc = len(df) - BACKTEST_HISTORY_CANDLES
+ if backtest_start_iloc < 0: backtest_start_iloc = 0 # Should not happen with check above, but safety
+
+ # Iterate through the candles *within the backtest period*
+ # Signal is generated using data *up to* candle i-1 close
+ # Trade entry occurs at candle i open
+ # Exit checks happen based on candle i high/low
+ trades = []
+ in_position = False
+ entry_price = 0
+ position_direction = 0 # 1 for long, -1 for short
+ stop_loss = 0
+ take_profit = 0 # Using TP1 for this simple backtest
+ entry_timestamp = None # For debugging/tracking
+
+ # Loop from the start of the backtest period + 1 (need previous candle for signal)
+ # Ensure we have enough lookback *before* the first signal candle (iloc-based)
+ # Ensure lookback of at least 50 candles for indicators
+ first_signal_candle_idx = max(50, backtest_start_iloc) # Start generating signals from here
+
+ for i in range(first_signal_candle_idx, len(df)):
+ current_row = df.iloc[i]
+ signal_candle_iloc = i - 1 # Signal based on close of previous candle (iloc)
+
+ # Slice original df up to and including the signal candle
+ # Use iloc slicing for performance and robustness
+ # Ensure the slice is valid
+ if signal_candle_iloc < 1: continue # Need at least 2 rows for signal calc
+ df_for_signal_calc = df.iloc[:signal_candle_iloc + 1]
+
+ # Check if df_for_signal_calc is valid before generating signal
+ if df_for_signal_calc is None or len(df_for_signal_calc) < 2:
+ continue # Skip if not enough data for signal calc
+
+ # Generate signal based on data ending at the previous candle's close
+ sim_signals, sim_values, sim_direction = self.generate_signals_and_values(df_for_signal_calc)
+
+ # --- Entry Logic ---
+ # Enter only if not already in position AND a clear signal occurs AND we have values
+ if not in_position and sim_direction != 0 and sim_values:
+ # Use ATR from the *signal candle* for SL/TP calc
+ atr_at_entry = sim_values.get('atr')
+ # Enter at current candle's open price
+ entry_price_candidate = current_row['open']
+ entry_timestamp = current_row.name # Timestamp of entry candle
+
+ # Validate entry conditions before taking position
+ if pd.notna(entry_price_candidate) and entry_price_candidate > 0 and \
+ pd.notna(atr_at_entry) and atr_at_entry > 0:
+
+ # Calculate potential SL/TP *before* deciding to enter fully
+ if sim_direction == 1: # Long
+ potential_sl = entry_price_candidate - ATR_SL_MULTIPLIER * atr_at_entry
+ potential_tp = entry_price_candidate + ATR_TP1_MULTIPLIER * atr_at_entry
+ else: # Short
+ potential_sl = entry_price_candidate + ATR_SL_MULTIPLIER * atr_at_entry
+ potential_tp = entry_price_candidate - ATR_TP1_MULTIPLIER * atr_at_entry
+
+ # Basic sanity check for SL/TP (positive values and SL not crossing entry)
+ if not (pd.isna(potential_sl) or pd.isna(potential_tp) or potential_sl <= 0 or potential_tp <= 0 or \
+ (sim_direction == 1 and potential_sl >= entry_price_candidate) or \
+ (sim_direction == -1 and potential_sl <= entry_price_candidate)):
+ # All checks passed, commit to position
+ in_position = True
+ position_direction = sim_direction
+ entry_price = entry_price_candidate
+ stop_loss = potential_sl
+ take_profit = potential_tp
+ # print(f"BT Enter {symbol} {timeframe} @ {entry_timestamp}: Dir={position_direction} Entry={entry_price:.4f} SL={stop_loss:.4f} TP={take_profit:.4f} ATR={atr_at_entry:.4f}")
+
+ # else: print(f"BT Skip Entry {symbol} {timeframe}: Invalid entry conditions (Price:{entry_price_candidate}, ATR:{atr_at_entry})")
+
+
+ # --- Exit Logic ---
+ # Check exits only if currently in a position
+ elif in_position:
+ exit_price = None
+ pnl = 0
+ exit_reason = "N/A"
+ current_high = current_row['high']
+ current_low = current_row['low']
+ exit_timestamp = current_row.name # Timestamp of exit check candle
+
+ # Validate high/low data
+ if pd.isna(current_high) or pd.isna(current_low):
+ print(f"BT Warning {symbol} {timeframe}: NaN High/Low at {exit_timestamp}, cannot check exit.")
+ continue # Skip exit check for this candle
+
+ # Check SL/TP hit based on current candle's high/low
+ # Important: Check SL first in case both are hit within the same candle
+ if position_direction == 1: # Long
+ if current_low <= stop_loss:
+ exit_price = stop_loss
+ exit_reason = "SL Hit"
+ elif current_high >= take_profit:
+ exit_price = take_profit
+ exit_reason = "TP1 Hit"
+ elif position_direction == -1: # Short
+ if current_high >= stop_loss:
+ exit_price = stop_loss
+ exit_reason = "SL Hit"
+ elif current_low <= take_profit:
+ exit_price = take_profit
+ exit_reason = "TP1 Hit"
+
+ # If an exit condition was met
+ if exit_price is not None:
+ # Calculate PnL based on entry and exit
+ # For shorts, PnL = Entry - Exit; for longs, PnL = Exit - Entry
+ if position_direction == 1:
+ pnl = exit_price - entry_price
+ else: # Short
+ pnl = entry_price - exit_price
+
+ # Calculate and subtract simulated fees
+ # Fee is % of trade value at entry and exit
+ entry_fee = (SIMULATED_FEE_PERCENT / 100.0) * entry_price
+ exit_fee = (SIMULATED_FEE_PERCENT / 100.0) * exit_price
+ pnl -= (entry_fee + exit_fee)
+
+ trades.append({'entry': entry_price, 'exit': exit_price, 'pnl': pnl, 'direction': position_direction})
+ # print(f"BT Exit {symbol} {timeframe} @ {exit_timestamp}: Reason={exit_reason} ExitPx={exit_price:.4f} PnL={pnl:.4f} (Entry @ {entry_price:.4f} on {entry_timestamp})")
+ in_position = False # Reset position state after closing trade
+ entry_timestamp = None # Reset entry timestamp
+
+ # --- Summarize Results ---
+ num_trades = len(trades)
+ if num_trades > 0:
+ wins = sum(1 for t in trades if t['pnl'] > 0)
+ win_rate = (wins / num_trades * 100)
+ total_pnl = sum(t['pnl'] for t in trades)
+ # Calculate PnL % sum based on entry price (approximation of capital growth)
+ pnl_percentage_sum = sum((t['pnl'] / t['entry']) * 100 for t in trades if t['entry'] > 0)
+ else:
+ win_rate = 0
+ total_pnl = 0
+ pnl_percentage_sum = 0
+
+ # print(f"BT Summary {symbol} {timeframe}: Trades={num_trades}, WinRate={win_rate:.2f}%, PnL Sum={total_pnl:.5f}, PnL % Sum={pnl_percentage_sum:.2f}%")
+
+ return {
+ 'symbol': symbol, 'timeframe': timeframe, 'trades': num_trades,
+ 'win_rate': round(win_rate, 2), 'pnl_sum': round(total_pnl, 5),
+ 'pnl_%_sum': round(pnl_percentage_sum, 2)
+ }
+ except Exception as e:
+ print(f"Error during backtest simulation for {symbol} {timeframe}: {e}\n{traceback.format_exc()}")
+ # Print details about the DataFrame state at error might help debugging
+ # print(f"DF info at error for {symbol} {timeframe}:")
+ # try: df.info()
+ # except: print("Could not get DF info")
+ return default_result
+
+
+ def analyze_symbol(self, symbol, progress):
+ """Analyzes a single symbol across all valid timeframes."""
+ timeframe_details = {}
+ heatmap_composites = {}
+ symbol_backtest_results = []
+ log_msgs = []
+ symbol_active_signals = {} # Store active signals {tf: {details}} for this symbol
+ symbol_hover_details = {} # Store hover details {tf: {ind: val}} for this symbol
+
+ if not self.valid_timeframes:
+ return timeframe_details, heatmap_composites, symbol_backtest_results, [f"No valid timeframes for {symbol}."], {}, {}
+
+ # Add small delay before starting analysis for a symbol to help with rate limits
+ time.sleep(0.1)
+
+ for i, timeframe in enumerate(self.valid_timeframes):
+ progress(i / len(self.valid_timeframes), desc=f"Fetching {symbol} [{timeframe}]")
+
+ # Fetch data
+ ohlcv, err_msg = self.fetch_ohlcv_data(symbol, timeframe)
+ if err_msg:
+ log_msgs.append(f"Data fetch skip: {symbol} [{timeframe}] {err_msg}")
+ heatmap_composites[timeframe] = 0
+ # Run backtest with None to get a default failure entry
+ symbol_backtest_results.append(self._run_simple_backtest(symbol, timeframe, None))
+ symbol_hover_details[timeframe] = {} # Placeholder
+ time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, 0.3)) # Longer sleep on error
+ continue
+
+ # Calculate indicators
+ progress((i + 0.3) / len(self.valid_timeframes), desc=f"Calculating Ind. {symbol} [{timeframe}]")
+ df = self.calculate_indicators(ohlcv, timeframe)
+ if df is None or df.empty:
+ log_msgs.append(f"Indicator calc skip: {symbol} [{timeframe}] (DataFrame invalid or empty)")
+ heatmap_composites[timeframe] = 0
+ # Run backtest with None
+ symbol_backtest_results.append(self._run_simple_backtest(symbol, timeframe, None))
+ symbol_hover_details[timeframe] = {} # Placeholder
+ time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, 0.2))
+ continue
+
+ # Generate signals/values based on the *latest* data
+ progress((i + 0.6) / len(self.valid_timeframes), desc=f"Generating Sig. {symbol} [{timeframe}]")
+ signals, values, signal_direction = self.generate_signals_and_values(df)
+
+ if signals is not None and values is not None:
+ # Calculate trade params based on the latest signal
+ trade_params = self.calculate_trade_params(values, signal_direction)
+ # Store all details for this timeframe
+ timeframe_details[timeframe] = {
+ 'signals': signals, # Individual indicator signals (-1, 0, 1)
+ 'values': values, # Raw indicator values
+ 'trade_params': trade_params, # Entry, SL, TP etc.
+ 'direction': signal_direction # Overall signal direction (-1, 0, 1)
+ }
+ # Calculate composite for heatmap (normalized sum of final signals)
+ num_potential_signals = len(signals)
+ composite = sum(signals.values()) / num_potential_signals if num_potential_signals > 0 else 0
+ heatmap_composites[timeframe] = composite
+
+ # Store key values for hover text
+ symbol_hover_details[timeframe] = {
+ 'Price': values.get('price', np.nan), # Add price to hover
+ 'RSI': values.get('rsi', np.nan),
+ 'MACD': values.get('macd', np.nan),
+ 'StochK': values.get('stoch_k', np.nan),
+ 'ADX': values.get('adx', np.nan),
+ 'WillR': values.get('will_r', np.nan),
+ }
+
+ # Store active signal IF direction is non-zero AND trade params are valid
+ if signal_direction != 0 and trade_params.get('entry') is not None:
+ active_sig_details = {
+ 'direction': signal_direction,
+ 'entry': trade_params['entry'],
+ 'sl': trade_params['sl'],
+ 'tp1': trade_params['tp1'],
+ 'tp2': trade_params['tp2']
+ }
+ symbol_active_signals[timeframe] = active_sig_details
+
+ # --- Log Signal to CSV ---
+ log_data = {
+ 'LogTimestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # Log time
+ 'SignalCandleTime': values.get('timestamp', 'N/A'), # Time of candle signal was based on
+ 'Symbol': symbol,
+ 'Timeframe': timeframe,
+ 'Direction': 'LONG' if signal_direction == 1 else 'SHORT',
+ 'Entry': f"{trade_params['entry']:.8f}", # Use more precision for logging
+ 'SL': f"{trade_params['sl']:.8f}",
+ 'TP1': f"{trade_params['tp1']:.8f}",
+ 'TP2': f"{trade_params['tp2']:.8f}",
+ 'Status': 'Triggered' # Initial status
+ # Optional: Add key indicator values
+ #'RSI': f"{values.get('rsi', np.nan):.2f}",
+ #'MACD_Diff': f"{values.get('macd_diff', np.nan):.6f}",
+ #'ADX': f"{values.get('adx', np.nan):.2f}"
+ }
+ log_signal_to_csv(log_data)
+ # --- End CSV Logging ---
+
+ else:
+ log_msgs.append(f"Signal gen skip: {symbol} [{timeframe}]")
+ heatmap_composites[timeframe] = 0
+ symbol_hover_details[timeframe] = {} # Placeholder
+
+ # Run backtest using the full historical dataframe calculated earlier
+ progress((i + 0.9) / len(self.valid_timeframes), desc=f"Backtesting {symbol} [{timeframe}]")
+ # IMPORTANT: Pass a COPY of df to backtest to avoid modification issues
+ bt_result = self._run_simple_backtest(symbol, timeframe, df.copy())
+ symbol_backtest_results.append(bt_result)
+
+ # Sleep based on exchange rate limit (minimum 0.2s)
+ time.sleep(max(self.exchange.rateLimit / 1000 if self.exchange.rateLimit else 1, 0.2))
+
+ return timeframe_details, heatmap_composites, symbol_backtest_results, log_msgs, symbol_active_signals, symbol_hover_details
+
+
+ def run_full_analysis(self, progress=gr.Progress()):
+ """Runs the complete analysis for top coins across selected timeframes."""
+ self.analysis_results = {}
+ self.backtest_results = [] # Reset backtest results
+ self.active_signals_df = pd.DataFrame() # Reset active signals
+ self.heatmap_details = {} # Reset hover details
+ all_log_msgs = []
+ all_active_signals_list = [] # Temp list to build the DataFrame
+
+ # Start Fetching Coins
+ progress(0, desc="Fetching top coins...")
+ symbols, msg = self.fetch_top_coins()
+ all_log_msgs.append(msg)
+ if not symbols:
+ self.heatmap_df = pd.DataFrame()
+ all_log_msgs.append("Error: No symbols found to analyze. Stopping.")
+ # Return empty structures
+ return {}, pd.DataFrame(), [], pd.DataFrame(), {}, all_log_msgs
+
+ # Start Analyzing Symbols
+ heatmap_data_list = [] # For building heatmap DataFrame [{Coin: X, tf1: score1, ...}, ...]
+ total_symbols = len(symbols)
+ progress(0.05, desc=f"Starting analysis for {total_symbols} coins...")
+
+ for idx, symbol in enumerate(symbols):
+ # Calculate progress for this symbol
+ symbol_progress_start = 0.05 + (idx / total_symbols) * 0.90 # Leave space at end
+ symbol_progress_range = (1 / total_symbols) * 0.90
+ symbol_progress_desc = f"Analyzing {symbol} ({idx+1}/{total_symbols})"
+ progress(symbol_progress_start, desc=symbol_progress_desc)
+
+ # Create a lambda for inner progress updates relative to this symbol's range
+ symbol_progress_tracker = lambda p, desc=symbol_progress_desc: progress(
+ symbol_progress_start + (p * symbol_progress_range), desc=desc
+ )
+
+ try:
+ # Analyze symbol (includes signal gen, trade param calc, backtest sim)
+ # Returns: details per tf, heatmap scores per tf, backtest results list, logs, active signals dict, hover details dict
+ tf_details, tf_heatmap_scores, symbol_bt_results, log_msgs, symbol_active_tf_signals, symbol_tf_hover_details = \
+ self.analyze_symbol(symbol, symbol_progress_tracker)
+
+ all_log_msgs.extend(log_msgs)
+ self.backtest_results.extend(symbol_bt_results) # Aggregate backtest results
+
+ # Only process if we got some valid details back for the symbol
+ if tf_details: # Check if the dict is not empty
+ self.analysis_results[symbol] = tf_details # Store full details
+
+ coin_name = symbol.split('/')[0]
+ heatmap_row = {'Coin': coin_name}
+ symbol_hover_data = {} # To store hover details for this coin
+
+ # Populate heatmap row scores and collect hover details for the coin
+ for tf in self.valid_timeframes:
+ heatmap_row[tf] = tf_heatmap_scores.get(tf, 0) # Get score, default 0
+ symbol_hover_data[tf] = symbol_tf_hover_details.get(tf, {}) # Get hover dict, default empty
+
+ heatmap_data_list.append(heatmap_row)
+ self.heatmap_details[coin_name] = symbol_hover_data # Store hover details keyed by coin name
+
+ # Add any active signals found for this symbol to the main list
+ for tf, active_sig in symbol_active_tf_signals.items():
+ # Use more precision for the active signals table as well
+ all_active_signals_list.append({
+ 'Symbol': symbol,
+ 'Timeframe': tf,
+ 'Direction': 'LONG' if active_sig['direction'] == 1 else 'SHORT',
+ 'Entry': f"{active_sig['entry']:.8f}", # Format for display
+ 'SL': f"{active_sig['sl']:.8f}",
+ 'TP1': f"{active_sig['tp1']:.8f}",
+ 'TP2': f"{active_sig['tp2']:.8f}",
+ })
+
+ except ccxt.RateLimitExceeded as e:
+ wait_time = 60 # Longer wait time
+ all_log_msgs.append(f"Rate limit exceeded analyzing {symbol}. Sleeping for {wait_time}s... {e}")
+ print(all_log_msgs[-1])
+ progress(symbol_progress_start + symbol_progress_range * 0.9, desc=f"Rate Limit Hit! Waiting {wait_time}s...") # Update progress during wait
+ time.sleep(wait_time)
+ # Optionally, you might want to retry the symbol analysis here (more complex)
+ except Exception as e:
+ error_msg = f"Critical error processing {symbol}: {e}\n{traceback.format_exc()}"
+ all_log_msgs.append(error_msg)
+ print(error_msg)
+ # Add placeholder row to heatmap and default backtest results if analysis failed critically
+ coin_name = symbol.split('/')[0]
+ row = {'Coin': coin_name}
+ for tf in self.valid_timeframes: row[tf] = 0
+ heatmap_data_list.append(row)
+ self.heatmap_details[coin_name] = {tf: {} for tf in self.valid_timeframes} # Empty hover details
+ for tf in self.valid_timeframes:
+ # Add default failed backtest result for this timeframe
+ self.backtest_results.append({'symbol': symbol, 'timeframe': tf, 'trades': 0, 'win_rate': 0, 'pnl_sum': 0, 'pnl_%_sum': 0})
+
+ progress(0.95, desc="Finalizing results...") # Progress before final processing
+
+ # --- Final Processing ---
+ if not heatmap_data_list:
+ self.heatmap_df = pd.DataFrame()
+ all_log_msgs.append("Warning: No heatmap data generated (no symbols processed successfully?).")
+ else:
+ self.heatmap_df = pd.DataFrame(heatmap_data_list).set_index('Coin')
+ # Ensure columns are ordered correctly according to valid_timeframes
+ if self.valid_timeframes:
+ # Filter out columns not in valid_timeframes (if any slipped through)
+ cols_to_keep = [tf for tf in self.valid_timeframes if tf in self.heatmap_df.columns]
+ self.heatmap_df = self.heatmap_df[cols_to_keep]
+ # Reindex to ensure all valid_timeframes are present, filling missing with 0
+ # Make sure valid_timeframes is used for columns
+ self.heatmap_df = self.heatmap_df.reindex(columns=self.valid_timeframes, fill_value=0)
+ self.heatmap_df.index.name = 'Coin'
+
+
+ # Create Active Signals DataFrame from the collected list
+ if all_active_signals_list:
+ self.active_signals_df = pd.DataFrame(all_active_signals_list)
+ # Sort for better presentation
+ self.active_signals_df['tf_order'] = self.active_signals_df['Timeframe'].map(TIMEFRAME_ORDER_MAP)
+ self.active_signals_df = self.active_signals_df.sort_values(by=['Symbol', 'tf_order']).drop('tf_order', axis=1)
+ # Reorder columns for consistency
+ self.active_signals_df = self.active_signals_df[['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']]
+ else:
+ # Create empty DF with correct columns if no signals found
+ self.active_signals_df = pd.DataFrame(columns=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2'])
+
+
+ # Save backtest results to CSV
+ if self.backtest_results:
+ # Filter out any potential None results before creating DataFrame
+ valid_bt_results = [res for res in self.backtest_results if isinstance(res, dict)]
+ if valid_bt_results:
+ bt_df = pd.DataFrame(valid_bt_results)
+ try:
+ bt_df.to_csv(BACKTEST_RESULTS_FILE, index=False)
+ all_log_msgs.append(f"Backtest summary saved to {BACKTEST_RESULTS_FILE}")
+ except Exception as e:
+ all_log_msgs.append(f"Error saving backtest results: {e}")
+ print(f"Error saving backtest results: {e}\n{traceback.format_exc()}")
+ else:
+ all_log_msgs.append("No valid backtest results generated to save.")
+ else:
+ all_log_msgs.append("No backtest results generated.")
+
+ progress(1, desc="Analysis complete.")
+ all_log_msgs.append("Analysis Complete.")
+ if os.path.exists(SIGNAL_LOG_FILE):
+ all_log_msgs.append(f"Signals logged to {SIGNAL_LOG_FILE}")
+
+ # Return all generated data
+ return self.analysis_results, self.heatmap_df, self.backtest_results, self.active_signals_df, self.heatmap_details, all_log_msgs
+
+
+# --- Gradio Helper Functions ---
+
+def create_plotly_heatmap(df, heatmap_details):
+ """Creates heatmap with enhanced hover text including key indicator values."""
+ if df is None or df.empty:
+ print("Heatmap DF is empty, returning empty figure.")
+ return go.Figure(layout=go.Layout(title="No data available for heatmap", height=300))
+
+ colorscale = pcolors.diverging.RdYlGn
+ # Ensure data is numeric for heatmap values, coerce errors
+ try:
+ # Ensure index and columns are strings for processing if needed
+ df.index = df.index.astype(str)
+ df.columns = df.columns.astype(str)
+ heatmap_values = df.apply(pd.to_numeric, errors='coerce').fillna(0).values
+ rows = list(df.index)
+ cols = list(df.columns)
+ except Exception as e:
+ print(f"Error preparing heatmap data: {e}")
+ return go.Figure(layout=go.Layout(title=f"Error preparing heatmap: {e}", height=300))
+
+
+ # Create hover text matrix using heatmap_details
+ hover_texts = []
+ for r_idx, coin in enumerate(rows):
+ row_texts = []
+ coin_details_by_tf = heatmap_details.get(coin, {}) # Get details for the coin (keyed by BASE coin name now)
+ for c_idx, tf in enumerate(cols):
+ try:
+ score = heatmap_values[r_idx, c_idx]
+ # Lookup details for this specific coin/tf from the pre-computed dict
+ details = coin_details_by_tf.get(tf, {}) # Get details for this TF
+
+ # Format values from details, handle N/A or missing data gracefully
+ price_val = details.get('Price', 'N/A')
+ rsi_val = details.get('RSI', 'N/A')
+ macd_val = details.get('MACD', 'N/A')
+ stochk_val = details.get('StochK', 'N/A')
+ adx_val = details.get('ADX', 'N/A')
+ willr_val = details.get('WillR', 'N/A')
+
+ # Build hover text string
+ text = f"Coin: {coin}
"
+ text += f"Timeframe: {tf}
"
+ # Use more precision for price display in hover
+ text += f"Price: {float(price_val):.8f}
" if isinstance(price_val, (int, float)) and not pd.isna(price_val) else f"Price: {price_val}
"
+ text += f"Score: {score:.3f}
" # Show score from heatmap itself
+ text += "----------
"
+ # Format indicator values nicely
+ try: text += f"RSI: {float(rsi_val):.1f}
" if isinstance(rsi_val, (int, float)) and not pd.isna(rsi_val) else f"RSI: {rsi_val}
"
+ except: text += f"RSI: {rsi_val}
" # Fallback if conversion fails
+ try: text += f"MACD: {float(macd_val):.6f}
" if isinstance(macd_val, (int, float)) and not pd.isna(macd_val) else f"MACD: {macd_val}
"
+ except: text += f"MACD: {macd_val}
"
+ try: text += f"Stoch K: {float(stochk_val):.1f}
" if isinstance(stochk_val, (int, float)) and not pd.isna(stochk_val) else f"Stoch K: {stochk_val}
"
+ except: text += f"Stoch K: {stochk_val}
"
+ try: text += f"ADX: {float(adx_val):.1f}
" if isinstance(adx_val, (int, float)) and not pd.isna(adx_val) else f"ADX: {adx_val}
"
+ except: text += f"ADX: {adx_val}
"
+ try: text += f"Will %R: {float(willr_val):.1f}
" if isinstance(willr_val, (int, float)) and not pd.isna(willr_val) else f"Will %R: {willr_val}
"
+ except: text += f"Will %R: {willr_val}
"
+
+ text += "" # Hide default Plotly hover labels
+
+ row_texts.append(text)
+ except Exception as hover_e:
+ print(f"Error generating hover text for {coin}/{tf}: {hover_e}")
+ row_texts.append(f"Error displaying hover for {coin}/{tf}") # Add error placeholder
+ hover_texts.append(row_texts)
+
+ try:
+ fig = go.Figure(data=go.Heatmap(
+ z=heatmap_values,
+ x=cols,
+ y=rows,
+ colorscale=colorscale,
+ zmid=0, zmin=-0.6, zmax=0.6, # Adjusted range based on normalized score
+ hoverongaps=False,
+ hoverinfo='text', # Use the custom text matrix for hover
+ text=hover_texts, # Assign the text matrix
+ texttemplate=None # Disable texttemplate when using hoverinfo='text'
+ ))
+ fig.update_layout(
+ title='Cryptocurrency Signal Strength Heatmap (Hover for Key Values, Click Cell for Full Details)',
+ xaxis_title='Timeframe',
+ yaxis_title='Coin',
+ yaxis={'tickmode': 'linear', 'tickfont': {'size': 9}, 'automargin': True}, # Automargin for y-axis labels
+ xaxis={'tickmode': 'linear'},
+ height=max(450, len(rows) * 18 + 100), # Dynamic height
+ margin=dict(l=70, r=50, t=60, b=50)
+ )
+ # print("Plotly heatmap figure created successfully.") # Debug print
+ return fig
+ except Exception as e:
+ print(f"Error creating Plotly heatmap figure: {e}\n{traceback.format_exc()}")
+ # Return an empty figure with error message
+ return go.Figure(layout=go.Layout(title=f"Error creating heatmap: {e}", height=400))
+
+
+def format_heatmap_click_details(evt: gr.SelectData, current_state):
+ """Formats detailed analysis results for the specific cell clicked on the heatmap."""
+ # Check if event data is valid (using SelectData attributes)
+ if evt is None or evt.index is None or not isinstance(evt.index, (list, tuple)) or len(evt.index) != 2:
+ # print("Debug: format_heatmap_click_details called with invalid event data:", evt)
+ return "Click on a heatmap cell after running analysis to see details here. (Ensure you clicked a colored cell)"
+
+ # Check if state and necessary data are present
+ if not isinstance(current_state, dict) or 'analysis_results' not in current_state or 'heatmap_df' not in current_state:
+ # print("Debug: State invalid or missing analysis_results or heatmap_df")
+ return "Analysis data not found in state. Please run the analysis first."
+
+ try:
+ row_index, col_index = evt.index
+ heatmap_df = current_state.get('heatmap_df')
+ analysis_data = current_state.get('analysis_results') # This holds the full nested dict {symbol: {tf: details}}
+
+ if heatmap_df is None or heatmap_df.empty or analysis_data is None:
+ # print("Debug: Heatmap or analysis data is None or empty.")
+ return "Heatmap or analysis data is not available. Please run analysis."
+
+ # Validate indices against the DataFrame dimensions
+ if not (0 <= row_index < len(heatmap_df.index) and 0 <= col_index < len(heatmap_df.columns)):
+ # print(f"Debug: Indices out of bounds. Row: {row_index} (max: {len(heatmap_df.index)-1}), Col: {col_index} (max: {len(heatmap_df.columns)-1})")
+ return "Error: Clicked cell index is out of bounds."
+
+ coin_name = heatmap_df.index[row_index] # This is the base coin (e.g., 'BTC')
+ timeframe = heatmap_df.columns[col_index]
+ # print(f"Debug: Clicked on Coin: {coin_name}, Timeframe: {timeframe}")
+
+ # Find the full symbol (e.g., BTC/USDT) in the analysis_data keys
+ full_symbol = None
+ for symbol_key in analysis_data.keys():
+ # Match based on the start of the symbol key (more robust)
+ if symbol_key.startswith(str(coin_name) + '/'): # Ensure coin_name is string
+ full_symbol = symbol_key
+ break
+
+ if not full_symbol or full_symbol not in analysis_data:
+ # print(f"Debug: Full symbol not found or no data for {full_symbol} (Base: {coin_name})")
+ return f"Details not found for {coin_name} (symbol mismatch or no analysis data?)."
+
+ # Get the specific details for the symbol and timeframe
+ details = analysis_data.get(full_symbol, {}).get(timeframe)
+ if details is None:
+ # print(f"Debug: No details found for {full_symbol} on timeframe {timeframe}")
+ return f"No analysis data available for {coin_name} on {timeframe}."
+
+ # --- Format Markdown String ---
+ values = details.get('values', {}) # Raw indicator values
+ signals = details.get('signals', {}) # Final signals (-1, 0, 1) per indicator
+ trade_params = details.get('trade_params', {}) # Entry, SL, TP etc.
+ direction = details.get('direction', 0) # Overall signal direction
+
+ if not values or not signals:
+ # print(f"Debug: Missing 'values' or 'signals' dict in details for {full_symbol} / {timeframe}")
+ return f"Incomplete data for {coin_name} [{timeframe}]. Cannot display details."
+
+ markdown_str = f"### Details for {coin_name} ({full_symbol}) [{timeframe}]\n\n"
+ markdown_str += f"- **Timestamp:** {values.get('timestamp', 'N/A')}\n"
+ price = values.get('price', np.nan)
+ markdown_str += f"- **Price:** {price:.8f}\n" if isinstance(price, (int, float)) and not pd.isna(price) else f"- **Price:** {price}\n"
+ volume = values.get('volume', np.nan)
+ markdown_str += f"- **Volume:** {volume:,.0f}\n" if isinstance(volume, (int, float)) and not pd.isna(volume) else f"- **Volume:** {volume}\n"
+ atr = values.get('atr', np.nan)
+ markdown_str += f"- **ATR:** {atr:.8f}\n\n" if isinstance(atr, (int, float)) and not pd.isna(atr) else f"- **ATR:** {atr}\n\n"
+
+ # Display the FINAL calculated direction based on the composite score threshold
+ markdown_str += f"**Overall Signal Direction:** {'BULLISH (+1)' if direction > 0 else ('BEARISH (-1)' if direction < 0 else 'NEUTRAL (0)')}\n\n"
+
+ markdown_str += "**Indicator Values & Individual Signals:**\n"
+ markdown_str += "| Indicator | Value | Signal |\n" # Adjusted padding
+ markdown_str += "|----------------|-----------------|--------|\n"
+
+ # Get keys from signals dict, which should represent all indicators evaluated
+ indicator_keys = sorted(signals.keys())
+
+ for name in indicator_keys:
+ val = values.get(name, 'N/A') # Get raw value
+ sig_val = signals.get(name, 0) # Get the final -1, 0, 1 signal
+ signal_char = '🟩 (+1)' if sig_val > 0 else ('🟥 (-1)' if sig_val < 0 else '⬜ (0)')
+
+ # Formatting value carefully
+ if isinstance(val, (int, float)) and not pd.isna(val):
+ if abs(val) > 100000: val_str = f"{val:,.0f}" # Large integer
+ elif abs(val) > 100: val_str = f"{val:,.2f}" # Moderate number
+ elif abs(val) < 0.000001 and abs(val) > 0: val_str = f"{val:.4e}" # Very small number (adjust precision)
+ elif abs(val) < 1: val_str = f"{val:.8f}" # Small decimal (more precision)
+ else: val_str = f"{val:.6f}" # Default decimal (more precision)
+ elif pd.isna(val): val_str = "NaN"
+ else: val_str = str(val) # Non-numeric
+
+ # Pad indicator name and value for alignment in Markdown table
+ markdown_str += f"| {name:<14} | {val_str:<15} | {signal_char:<7} |\n"
+
+ # --- Add Trade Parameters Section ---
+ if trade_params and trade_params.get('entry') is not None:
+ markdown_str += f"\n**Potential Trade Setup (Based on this signal & ATR):**\n"
+ markdown_str += f"- **Direction:** {'LONG' if direction > 0 else 'SHORT'}\n"
+ # More precision for trade params display
+ markdown_str += f"- **Entry:** {trade_params['entry']:.8f}\n"
+ markdown_str += f"- **Stop Loss:** {trade_params['sl']:.8f}\n"
+ markdown_str += f"- **Take Profit 1:** {trade_params['tp1']:.8f}\n"
+ markdown_str += f"- **Take Profit 2:** {trade_params['tp2']:.8f}\n\n"
+ markdown_str += f"**Est. P/L per $1 Margin (TP1/SL Hit, incl. ~{SIMULATED_FEE_PERCENT*2:.2f}% fees):**\n"
+ for lev, pnl_data in trade_params.get('lev_profit', {}).items():
+ tp1_pnl = pnl_data.get('tp1_profit_$','N/A')
+ sl_loss = pnl_data.get('sl_loss_$','N/A')
+ # Format P/L values
+ tp1_pnl_str = f"{tp1_pnl:.3f}" if isinstance(tp1_pnl, (int, float)) else str(tp1_pnl)
+ sl_loss_str = f"{sl_loss:.3f}" if isinstance(sl_loss, (int, float)) else str(sl_loss)
+ markdown_str += f" - **{lev}:** Profit ${tp1_pnl_str} / Loss ${sl_loss_str}\n"
+ else:
+ markdown_str += f"\n**Trade Setup:** No active trade signal ({'Neutral' if direction == 0 else 'Params Invalid'}) generated for this timeframe at this time.\n"
+
+ # print(f"Debug: Successfully formatted details for {full_symbol} / {timeframe}")
+ return markdown_str
+
+ except IndexError:
+ # print("Debug: IndexError during heatmap click processing.")
+ return "Error processing click: Index out of range. Please ensure the heatmap is up to date."
+ except KeyError as e:
+ # print(f"Debug: KeyError processing click: Missing key '{e}'.")
+ return f"Error processing click: Missing expected data key '{e}'. Analysis data might be incomplete."
+ except Exception as e:
+ print(f"Unexpected error formatting heatmap click details: {e}\n{traceback.format_exc()}")
+ return f"An unexpected error occurred displaying details for the clicked cell: {e}"
+
+def format_coin_details(symbol, analysis_data, valid_timeframes):
+ """Formats full details for a selected coin across all analyzed timeframes."""
+ if not analysis_data or symbol not in analysis_data:
+ return f"No analysis data available for {symbol}. Please run analysis first."
+
+ coin_data_per_tf = analysis_data[symbol]
+ markdown_str = f"## Full Details for {symbol}\n\n"
+
+ # Iterate through the timeframes the analysis was run for
+ # Use the provided valid_timeframes list for consistent ordering
+ for timeframe in valid_timeframes:
+ markdown_str += f"---\n### Timeframe: {timeframe}\n"
+ details = coin_data_per_tf.get(timeframe)
+
+ # Check if details exist for this timeframe
+ if details is None:
+ markdown_str += f"*No analysis data generated for this timeframe.*\n"
+ continue # Skip to the next timeframe
+
+ # Extract components, check if they exist
+ values = details.get('values')
+ signals = details.get('signals')
+ trade_params = details.get('trade_params')
+ direction = details.get('direction') # Use the stored direction
+
+ if values is None or signals is None or trade_params is None or direction is None:
+ markdown_str += f"*Incomplete analysis data for this timeframe.*\n"
+ continue
+
+ # --- Format Section for this Timeframe (similar to heatmap click) ---
+ markdown_str += f"- Timestamp: {values.get('timestamp', 'N/A')}\n"
+ price = values.get('price', np.nan)
+ markdown_str += f"- Price: {price:.8f}\n" if isinstance(price, (int, float)) and not pd.isna(price) else f"- Price: {price}\n"
+ volume = values.get('volume', np.nan)
+ markdown_str += f"- Volume: {volume:,.0f}\n" if isinstance(volume, (int, float)) and not pd.isna(volume) else f"- Volume: {volume}\n"
+ atr = values.get('atr', np.nan)
+ markdown_str += f"- ATR: {atr:.8f}\n\n" if isinstance(atr, (int, float)) and not pd.isna(atr) else f"- ATR: {atr}\n\n"
+
+ markdown_str += f"**Overall Signal Direction:** {'BULLISH (+1)' if direction > 0 else ('BEARISH (-1)' if direction < 0 else 'NEUTRAL (0)')}\n\n"
+
+ markdown_str += "**Indicator Values & Individual Signals:**\n"
+ markdown_str += "| Indicator | Value | Signal |\n"
+ markdown_str += "|----------------|-----------------|--------|\n"
+ indicator_keys = sorted(signals.keys())
+ for name in indicator_keys:
+ val = values.get(name, 'N/A')
+ sig_val = signals.get(name, 0)
+ signal_char = '🟩 (+1)' if sig_val > 0 else ('🟥 (-1)' if sig_val < 0 else '⬜ (0)')
+ # Value formatting (reuse from heatmap click with more precision)
+ if isinstance(val, (int, float)) and not pd.isna(val):
+ if abs(val) > 100000: val_str = f"{val:,.0f}"
+ elif abs(val) > 100: val_str = f"{val:,.2f}"
+ elif abs(val) < 0.000001 and abs(val) > 0: val_str = f"{val:.4e}" # Very small number (adjust precision)
+ elif abs(val) < 1: val_str = f"{val:.8f}" # Small decimal (more precision)
+ else: val_str = f"{val:.6f}" # Default decimal (more precision)
+ elif pd.isna(val): val_str = "NaN"
+ else: val_str = str(val)
+ markdown_str += f"| {name:<14} | {val_str:<15} | {signal_char:<7} |\n"
+
+ # Trade Params Section
+ if trade_params and trade_params.get('entry') is not None:
+ markdown_str += f"\n**Potential Trade Setup:**\n"
+ markdown_str += f"- Direction: {'LONG' if direction > 0 else 'SHORT'}\n"
+ # More precision
+ markdown_str += f"- Entry: {trade_params['entry']:.8f}\n"
+ markdown_str += f"- SL: {trade_params['sl']:.8f}\n"
+ markdown_str += f"- TP1: {trade_params['tp1']:.8f}\n"
+ markdown_str += f"- TP2: {trade_params['tp2']:.8f}\n\n"
+ markdown_str += f"**Est. P/L per $1 Margin (TP1/SL Hit, incl. ~{SIMULATED_FEE_PERCENT*2:.2f}% fees):**\n"
+ for lev, pnl_data in trade_params.get('lev_profit', {}).items():
+ tp1_pnl = pnl_data.get('tp1_profit_$','N/A')
+ sl_loss = pnl_data.get('sl_loss_$','N/A')
+ tp1_pnl_str = f"{tp1_pnl:.3f}" if isinstance(tp1_pnl, (int, float)) else str(tp1_pnl)
+ sl_loss_str = f"{sl_loss:.3f}" if isinstance(sl_loss, (int, float)) else str(sl_loss)
+ markdown_str += f" - **{lev}:** Profit ${tp1_pnl_str} / Loss ${sl_loss_str}\n"
+ else:
+ markdown_str += f"\n**Trade Setup:** No active trade signal ({'Neutral' if direction == 0 else 'Params Invalid'}) generated for this timeframe.\n"
+
+ markdown_str += "\n---\n" # End separator for the coin
+ return markdown_str
+
+
+def find_zones(heatmap_df, min_confirmation_threshold):
+ """Finds potential long/short zones based on average heatmap score."""
+ if heatmap_df is None or heatmap_df.empty:
+ print("Zone finding skipped: Heatmap DF is empty.")
+ return [], []
+
+ try:
+ # Calculate average score across valid timeframes for each coin
+ # Ensure we only average over numeric columns
+ numeric_df = heatmap_df.apply(pd.to_numeric, errors='coerce').dropna(axis=1, how='all') # Drop cols that are all NaN
+ if numeric_df.empty:
+ print("Zone finding skipped: No numeric timeframe columns found after coercion.")
+ return [],[]
+
+ # Calculate mean, skipping NaNs if any occurred during coercion
+ avg_scores = numeric_df.mean(axis=1, skipna=True)
+
+ # Define thresholds based on the average score range (-1 to 1 typically)
+ # Threshold requires a minimum average positive/negative score.
+ # Example: if min_confirmation_threshold is 0.75, zones need avg > 0.3 or avg < -0.3
+ zone_threshold_abs = max(0.1, min(1.0, min_confirmation_threshold * 0.4)) # Adjusted multiplier
+ long_zone_threshold = zone_threshold_abs
+ short_zone_threshold = -zone_threshold_abs
+
+ # print(f"Debug Zones: Long Threshold={long_zone_threshold:.3f}, Short Threshold={short_zone_threshold:.3f}")
+
+ # Filter based on thresholds
+ long_zone_coins = avg_scores[avg_scores >= long_zone_threshold].sort_values(ascending=False)
+ short_zone_coins = avg_scores[avg_scores <= short_zone_threshold].sort_values(ascending=True)
+
+ # Prepare lists of tuples (Coin, Score)
+ long_zone_list = list(zip(long_zone_coins.index.astype(str), long_zone_coins.round(3)))
+ short_zone_list = list(zip(short_zone_coins.index.astype(str), short_zone_coins.round(3)))
+
+ print(f"Zones Found: {len(long_zone_list)} long, {len(short_zone_list)} short candidates (Threshold +/- {zone_threshold_abs:.3f}).")
+ return long_zone_list, short_zone_list
+ except Exception as e:
+ print(f"Error finding zones: {e}\n{traceback.format_exc()}")
+ return [],[] # Return empty lists on error
+
+def format_backtest_summary(backtest_results):
+ """Formats the list of backtest result dictionaries into a DataFrame for display."""
+ if not backtest_results:
+ print("No backtest results to format.")
+ # Return empty DF with correct columns if no results
+ return pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry'])
+
+ try:
+ # Filter out potential non-dict entries just in case
+ valid_results = [r for r in backtest_results if isinstance(r, dict)]
+ if not valid_results:
+ print("No valid dictionary entries found in backtest results.")
+ return pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry'])
+
+ df = pd.DataFrame(valid_results)
+
+ # Ensure required columns exist, fill with defaults if missing
+ required_cols = ['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_sum', 'pnl_%_sum']
+ for col in required_cols:
+ if col not in df.columns:
+ print(f"Warning: Backtest result missing column '{col}', filling default.")
+ if col in ['trades', 'win_rate', 'pnl_sum', 'pnl_%_sum']:
+ df[col] = 0 # Default numeric to 0
+ else:
+ df[col] = 'N/A' # Default string to N/A
+
+
+ # Sort by symbol then timeframe order
+ df['tf_order'] = df['timeframe'].map(TIMEFRAME_ORDER_MAP).fillna(99) # Handle potential unknown timeframes
+ df = df.sort_values(by=['symbol', 'tf_order']).drop('tf_order', axis=1)
+
+ # Rename columns for better display clarity
+ df = df.rename(columns={'pnl_sum': 'pnl_abs_sum', 'pnl_%_sum': 'pnl_%_sum_on_entry'})
+
+ # Select and reorder columns for final display
+ display_cols = ['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']
+ # Ensure all display columns exist before selecting
+ final_cols = [col for col in display_cols if col in df.columns]
+ df = df[final_cols]
+
+ return df
+ except Exception as e:
+ print(f"Error formatting backtest summary: {e}\n{traceback.format_exc()}")
+ # Return empty DF on error
+ return pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry'])
+
+
+# --- Gradio App Definition ---
+def create_gradio_app():
+ with gr.Blocks(theme=gr.themes.Soft(primary_hue=gr.themes.colors.blue), title="Crypto Signal & Backtest V3") as app:
+ gr.Markdown("# Crypto Multi-Indicator, Multi-Timeframe Signal & Backtest V3")
+ gr.Markdown(f"*Warning: Analysis uses up to **{LIMIT_PER_TIMEFRAME}** candles per timeframe and backtests on the last **{BACKTEST_HISTORY_CANDLES}**. This can be **very slow** and **API-intensive**. Rate limits may occur. Use fewer coins/timeframes for faster results. Signals are logged to **`{SIGNAL_LOG_FILE}`**.*")
+
+ # Global state to store results between interactions
+ shared_state = gr.State({
+ 'analysis_results': {}, # {symbol: {tf: {signals, values, trade_params, direction}}}
+ 'heatmap_df': pd.DataFrame(), # DataFrame for heatmap display values
+ 'heatmap_details': {}, # {Coin: {tf: {Ind: Val,...}}} for hover
+ 'backtest_results': [], # List of backtest result dicts
+ 'active_signals_df': pd.DataFrame(), # DataFrame of currently active signals
+ 'valid_timeframes': [], # List of timeframes used in the last run
+ 'analyzer': None # Instance of the analyzer class
+ })
+
+ with gr.Row():
+ with gr.Column(scale=1):
+ gr.Markdown("## Configuration")
+ # Get available exchanges dynamically, handle potential errors
+ try:
+ available_exchanges = ccxt.exchanges
+ except Exception as e:
+ print(f"Warning: Could not fetch ccxt exchanges list: {e}")
+ available_exchanges = [DEFAULT_EXCHANGE_ID] # Fallback
+
+ exchange_input = gr.Dropdown(label="Exchange", choices=available_exchanges, value=DEFAULT_EXCHANGE_ID, interactive=True)
+ top_n_input = gr.Slider(label="Number of Top Coins by Volume", minimum=5, maximum=100, step=5, value=DEFAULT_TOP_N_COINS, interactive=True) # Reduced Max
+ all_timeframes = list(TIMEFRAME_ORDER_MAP.keys())
+ timeframe_input = gr.CheckboxGroup(label="Select Timeframes (Fewer = Faster)", choices=all_timeframes, value=DEFAULT_TIMEFRAMES, interactive=True)
+
+ run_button = gr.Button("Run Analysis & Backtest", variant="primary")
+ status_log = gr.Textbox(label="Status Log", lines=15, interactive=False, placeholder="Analysis logs will appear here...", max_lines=30) # Increased lines
+
+ with gr.Column(scale=3):
+ gr.Markdown("## Results")
+ with gr.Tabs():
+ with gr.TabItem("Heatmap"):
+ gr.Markdown("Signal strength heatmap based on composite indicator score. Hover over cells for key values, click a cell for full details below.")
+ # Use gr.Plot which supports Plotly and the 'select' event for clicks
+ # Explicitly set label for Plot component
+ heatmap_plot = gr.Plot(label="Signal Heatmap", show_label=False) # show_label=False to hide the default label if desired
+ heatmap_detail_output = gr.Markdown(label="Clicked Cell Full Details", value="*Click on a heatmap cell after analysis to see full indicator details and trade setup.*") # Placeholder text
+
+ with gr.TabItem("Active Trade Setups"):
+ gr.Markdown("### Potential Trade Setups (Current Snapshot)")
+ gr.Markdown(f"*Shows pairs and timeframes with a non-neutral signal direction and valid ATR-based parameters from the **latest** analyzed candle. Signals logged to `{SIGNAL_LOG_FILE}`. This is NOT financial advice. DYOR!*")
+ active_signals_table = gr.DataFrame(
+ label="Active Signals",
+ headers=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2'],
+ datatype=['str'] * 7, # Treat all as strings for display consistency with precision
+ interactive=False,
+ row_count=(10, "dynamic"), # Show ~10 rows, allow scroll
+ col_count=(7, "fixed"),
+ wrap=True
+ )
+
+ with gr.TabItem("Zones"):
+ gr.Markdown("### Potential Long/Short Zones")
+ zone_threshold_display = max(0.1, min(1.0, DEFAULT_MIN_CONFIRMATION * 0.4)) # Calculate display threshold
+ gr.Markdown(f"*Coins with the highest/lowest average signal score across the selected timeframes (based on an average score threshold of ~**+/- {zone_threshold_display:.2f}**). Indicates potential broader trend alignment.*")
+ with gr.Row():
+ long_zone_output = gr.DataFrame(label="Potential Long Zone (Highest Avg Scores)", headers=["Coin", "Avg Score"], col_count=(2, "fixed"), row_count=10)
+ short_zone_output = gr.DataFrame(label="Potential Short Zone (Lowest Avg Scores)", headers=["Coin", "Avg Score"], col_count=(2, "fixed"), row_count=10)
+
+ with gr.TabItem("Full Coin Details"):
+ gr.Markdown("Select a coin analyzed in the heatmap to view its detailed indicator values, signals, and potential trade setup across all selected timeframes.")
+ coin_selector = gr.Dropdown(label="Select Coin to View All Timeframe Details", choices=[], interactive=False) # Initially disabled
+ coin_detail_output = gr.Markdown(label="Detailed Indicator Values & Trade Setup per Timeframe", value="*Select a coin from the dropdown after analysis runs.*")
+
+ with gr.TabItem("Backtest Summary"):
+ gr.Markdown(f"### Simplified Backtest Results (ATR TP1/SL Strategy)")
+ gr.Markdown(f"*Note: Simulated on last **{BACKTEST_HISTORY_CANDLES}** candles per timeframe. Assumes entry on signal candle's open, exits on TP1/SL hit within the **next** candle's high/low. Includes estimated ~{SIMULATED_FEE_PERCENT*2:.2f}% round-trip fee. **This is a highly simplified simulation for indicative purposes only and NOT investment advice.** Results also saved to `{BACKTEST_RESULTS_FILE}`.*")
+ backtest_summary_df = gr.DataFrame(
+ label="Backtest Metrics per Symbol/Timeframe",
+ interactive=False,
+ wrap=True,
+ row_count=(15, "dynamic"), # Show more rows
+ col_count=(6, "fixed")
+ )
+
+
+ # --- Event Handler: Run Button ---
+ def analysis_process_wrapper(exchange, top_n, timeframes, current_state, progress=gr.Progress(track_tqdm=True)):
+ """Wrapper to run analysis and update UI components."""
+ start_time = time.time()
+ log = ["Initializing analysis..."]
+ # Clear previous results visually and reset state components
+ current_state = { # Reset state explicitly
+ 'analysis_results': {}, 'heatmap_df': pd.DataFrame(), 'heatmap_details': {},
+ 'backtest_results': [], 'active_signals_df': pd.DataFrame(),
+ 'valid_timeframes': [], 'analyzer': None
+ }
+ initial_updates = {
+ status_log: "\n".join(log),
+ heatmap_plot: None, # Clear plot
+ heatmap_detail_output: "Running analysis...",
+ active_signals_table: pd.DataFrame(columns=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']), # Clear table
+ long_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]),
+ short_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]),
+ coin_selector: gr.Dropdown(choices=[], value=None, label="Select Coin...", interactive=False), # Disable dropdown
+ coin_detail_output: "Running analysis...",
+ backtest_summary_df: pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']), # Clear backtest table
+ shared_state: current_state # Update the state with cleared data
+ }
+ yield initial_updates # Update UI immediately
+
+ try:
+ # Validate inputs
+ if not exchange:
+ log.append("Error: No exchange selected.")
+ yield {status_log: "\n".join(log), shared_state: current_state}
+ return
+ if not timeframes:
+ log.append("Error: No timeframes selected.")
+ yield {status_log: "\n".join(log), shared_state: current_state}
+ return
+
+ analyzer = CryptoTrendIndicator(exchange, int(top_n), timeframes)
+ current_state['analyzer'] = analyzer # Store analyzer instance
+
+ if not analyzer.valid_timeframes:
+ log.append(f"Error: No valid timeframes found or supported for exchange '{exchange}'. Check selection or exchange capabilities.")
+ yield {status_log: "\n".join(log), shared_state: current_state}
+ return # Stop processing
+
+ log.append(f"Analyzer initialized for {exchange} | {top_n} coins | Timeframes: {analyzer.valid_timeframes}")
+ log.append(f"Fetching up to {LIMIT_PER_TIMEFRAME} candles | Backtesting last {BACKTEST_HISTORY_CANDLES} candles.")
+ log.append("Starting data fetch and analysis (this may take several minutes)...")
+ yield {status_log: "\n".join(log)} # Update log
+
+ # Run the full analysis, get all results
+ analysis_results, heatmap_df, backtest_results, active_signals_df, heatmap_details, log_msgs = analyzer.run_full_analysis(progress=progress) # Pass progress tracker
+ log.extend(log_msgs) # Add logs from the analysis process
+
+ # Update state with the new results
+ current_state['analysis_results'] = analysis_results
+ current_state['heatmap_df'] = heatmap_df if isinstance(heatmap_df, pd.DataFrame) else pd.DataFrame() # Ensure DF
+ current_state['heatmap_details'] = heatmap_details if isinstance(heatmap_details, dict) else {}
+ current_state['backtest_results'] = backtest_results if isinstance(backtest_results, list) else []
+ current_state['active_signals_df'] = active_signals_df if isinstance(active_signals_df, pd.DataFrame) else pd.DataFrame()
+ current_state['valid_timeframes'] = analyzer.valid_timeframes
+
+ # --- Prepare final UI updates ---
+ # Heatmap & Coin Selector
+ if not current_state['heatmap_df'].empty:
+ fig = create_plotly_heatmap(current_state['heatmap_df'], current_state['heatmap_details'])
+ coin_list = sorted(current_state['heatmap_df'].index.astype(str).tolist())
+ coin_selector_update = gr.Dropdown(choices=coin_list, value=None, label="Select Coin...", interactive=True) # Enable dropdown
+ heatmap_detail_msg = "Click on a heatmap cell for specific details."
+ else:
+ log.append("Warning: Heatmap data is empty after analysis.")
+ fig = go.Figure(layout=go.Layout(title="No heatmap data generated", height=300)) # Empty figure
+ coin_list = []
+ coin_selector_update = gr.Dropdown(choices=[], value=None, label="No Coins Analyzed", interactive=False) # Keep disabled
+ heatmap_detail_msg = "No heatmap data generated. Check logs."
+
+ # Zones
+ long_coins, short_coins = find_zones(current_state['heatmap_df'], DEFAULT_MIN_CONFIRMATION)
+ # Convert list of tuples directly for Gradio DataFrame
+ long_df_data = long_coins if long_coins else [(" ", " ")] # Placeholder if empty
+ short_df_data = short_coins if short_coins else [(" ", " ")]
+
+ # Backtest Summary
+ bt_summary_display_df = format_backtest_summary(current_state['backtest_results'])
+ if bt_summary_display_df.empty:
+ bt_summary_display_df = pd.DataFrame([{'symbol': 'No results', 'timeframe': '', 'trades': 0, 'win_rate': 0, 'pnl_abs_sum': 0, 'pnl_%_sum_on_entry': 0}])
+
+
+ # Active Signals
+ active_signals_display_df = current_state['active_signals_df']
+ if active_signals_display_df.empty:
+ active_signals_display_df = pd.DataFrame([{'Symbol': 'No active signals', 'Timeframe': '', 'Direction': '', 'Entry': '', 'SL': '', 'TP1': '', 'TP2': ''}])
+
+
+ end_time = time.time()
+ log.append(f"Analysis & Backtest finished in {end_time - start_time:.2f} seconds.")
+
+ # Prepare the final dictionary of updates for the UI
+ final_updates = {
+ status_log: "\n".join(log),
+ heatmap_plot: fig,
+ heatmap_detail_output: heatmap_detail_msg,
+ active_signals_table: active_signals_display_df, # Show the active signals table
+ long_zone_output: gr.DataFrame(value=long_df_data, headers=["Coin", "Avg Score"]), # Update with new value/headers
+ short_zone_output: gr.DataFrame(value=short_df_data, headers=["Coin", "Avg Score"]), # Update with new value/headers
+ coin_selector: coin_selector_update, # Update dropdown with choices
+ coin_detail_output: "Select a coin from the dropdown above." if coin_list else "No analysis results.",
+ backtest_summary_df: bt_summary_display_df, # Show backtest summary
+ shared_state: current_state # IMPORTANT: Update the state with all results
+ }
+ yield final_updates
+
+ except ValueError as ve:
+ # Catch initialization or config errors
+ log.append(f"--- CONFIGURATION ERROR ---")
+ error_details = f"{str(ve)}\n{traceback.format_exc()}"
+ log.append(error_details)
+ print(error_details)
+ current_state = { # Ensure state is reset
+ 'analysis_results': {}, 'heatmap_df': pd.DataFrame(), 'heatmap_details': {},
+ 'backtest_results': [], 'active_signals_df': pd.DataFrame(),
+ 'valid_timeframes': [], 'analyzer': None
+ }
+ yield { status_log: "\n".join(log), shared_state: current_state } # Update log and state
+
+ except Exception as e:
+ log.append(f"--- FATAL ERROR DURING ANALYSIS ---")
+ error_details = f"{str(e)}\n{traceback.format_exc()}"
+ log.append(error_details)
+ print(error_details)
+ # Reset state components on fatal error during run
+ current_state = { # Ensure state is reset
+ 'analysis_results': {}, 'heatmap_df': pd.DataFrame(), 'heatmap_details': {},
+ 'backtest_results': [], 'active_signals_df': pd.DataFrame(),
+ 'valid_timeframes': [], 'analyzer': None
+ }
+ error_updates = {
+ status_log: "\n".join(log),
+ # Keep other outputs cleared or show error message
+ heatmap_plot: None,
+ heatmap_detail_output: f"Analysis failed. Check logs.\nError: {e}",
+ active_signals_table: pd.DataFrame(columns=['Symbol', 'Timeframe', 'Direction', 'Entry', 'SL', 'TP1', 'TP2']),
+ long_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]),
+ short_zone_output: pd.DataFrame(columns=["Coin", "Avg Score"]),
+ coin_selector: gr.Dropdown(choices=[], value=None, label="Error", interactive=False),
+ coin_detail_output: f"Analysis failed. Check logs.\nError: {e}",
+ backtest_summary_df: pd.DataFrame(columns=['symbol', 'timeframe', 'trades', 'win_rate', 'pnl_abs_sum', 'pnl_%_sum_on_entry']),
+ shared_state: current_state # Update state with cleared data
+ }
+ yield error_updates
+
+ run_button.click(
+ fn=analysis_process_wrapper,
+ inputs=[exchange_input, top_n_input, timeframe_input, shared_state],
+ outputs=[ # List all components that can be updated
+ status_log, heatmap_plot, heatmap_detail_output, active_signals_table,
+ long_zone_output, short_zone_output, coin_selector, coin_detail_output,
+ backtest_summary_df, shared_state
+ ]
+ )
+
+ # --- Event Handler: Coin Dropdown Change ---
+ def display_full_details_handler(selected_coin, current_state):
+ """Handles dropdown change to show full details for a selected coin."""
+ if not selected_coin:
+ return "Select a coin from the dropdown."
+ # Check state validity
+ if not isinstance(current_state, dict) or not current_state.get('analysis_results') or not current_state.get('valid_timeframes'):
+ print("Debug Coin Select: State invalid or missing data.")
+ return "Run analysis first or analysis data is missing/incomplete in state."
+
+ analysis_results = current_state['analysis_results']
+ valid_tfs = current_state.get('valid_timeframes', [])
+
+ # Find the full symbol (e.g., BTC/USDT) based on the selected base coin name
+ full_symbol = None
+ for symbol_key in analysis_results.keys():
+ # Ensure comparison is string-to-string
+ if str(symbol_key).startswith(str(selected_coin) + '/'):
+ full_symbol = symbol_key
+ break # Found the first match
+
+ if not full_symbol:
+ print(f"Debug Coin Select: Full symbol not found for base {selected_coin}")
+ return f"Details not found for {selected_coin} in the current analysis results."
+
+ if not valid_tfs:
+ print(f"Debug Coin Select: Valid timeframes list missing for {selected_coin}")
+ return f"Valid timeframes list is missing from state for {selected_coin}."
+
+ # Call the formatting function
+ return format_coin_details(full_symbol, analysis_results, valid_tfs)
+
+ coin_selector.change(
+ fn=display_full_details_handler,
+ inputs=[coin_selector, shared_state],
+ outputs=[coin_detail_output]
+ )
+
+ # --- Event Handler: Heatmap Click ---
+ # Use .select event for gr.Plot with Plotly figures
+ heatmap_plot.change( # Use .select for Plotly click events
+ fn=format_heatmap_click_details,
+ inputs=[shared_state], # Pass the whole state
+ outputs=[heatmap_detail_output] # Update the Markdown component below heatmap
+ )
+
+ return app
+
+# --- Main Execution ---
+if __name__ == "__main__":
+ print("\n--- Crypto Analysis App V3 ---")
+ # Check if results/log files exist and inform user
+ for fpath in [BACKTEST_RESULTS_FILE, SIGNAL_LOG_FILE]:
+ if os.path.exists(fpath):
+ print(f"INFO: Existing file found: '{fpath}'. It may be appended to or overwritten on the next run.")
+ else:
+ print(f"INFO: Results/Logs will be saved to '{fpath}' after analysis.")
+
+ print("\nStarting Crypto Analysis Gradio App...")
+ print("------------------------------------------------------")
+ print(f"CONFIG: Backtest Candles={BACKTEST_HISTORY_CANDLES}, Fetch Limit={LIMIT_PER_TIMEFRAME}")
+ print(f"CONFIG: Default Exchange={DEFAULT_EXCHANGE_ID}, Top Coins={DEFAULT_TOP_N_COINS}, Timeframes={DEFAULT_TIMEFRAMES}")
+ print("WARNING: Initial analysis might be slow due to extensive data fetching and calculations.")
+ print("Ensure you have required libraries: pandas, numpy, ccxt, ta, plotly, gradio")
+ print("------------------------------------------------------")
+
+ gradio_app = create_gradio_app()
+ # Launch the app (debug=False for production/sharing, debug=True for development errors)
+ # share=True can be used to create a temporary public link (use with caution)
+ # Increase max_threads if analysis is CPU-bound and you have cores, but be mindful of API rate limits
+ gradio_app.queue().launch(debug=False, max_threads=4) # Enable queue for better handling of long processes
\ No newline at end of file