import logging import gradio as gr import pandas as pd import torch import numpy as np from GoogleNews import GoogleNews from transformers import pipeline import yfinance as yf import requests from fuzzywuzzy import process import statistics import matplotlib.pyplot as plt from datetime import datetime, timedelta import warnings warnings.filterwarnings("ignore", category=UserWarning, module="fuzzywuzzy") # Set up logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) SENTIMENT_ANALYSIS_MODEL = ( "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis" ) DEVICE = "cuda" if torch.cuda.is_available() else "cpu" logging.info(f"Using device: {DEVICE}") logging.info("Initializing sentiment analysis model...") sentiment_analyzer = pipeline( "sentiment-analysis", model=SENTIMENT_ANALYSIS_MODEL, device=DEVICE ) logging.info("Model initialized successfully") # Technical Analysis Parameters TA_CONFIG = { 'rsi_window': 14, 'macd_fast': 12, 'macd_slow': 26, 'macd_signal': 9, 'bollinger_window': 20, 'sma_windows': [20, 50, 200], 'ema_windows': [12, 26], 'volatility_window': 30 } EXCHANGE_SUFFIXES = { "NSE": ".NS", "BSE": ".BO", "NYSE": "", "NASDAQ": "", } def calculate_technical_indicators(history): """Calculate various technical indicators from historical price data""" ta_results = {} # RSI delta = history['Close'].diff() gain = delta.where(delta > 0, 0) loss = -delta.where(delta < 0, 0) avg_gain = gain.rolling(TA_CONFIG['rsi_window']).mean() avg_loss = loss.rolling(TA_CONFIG['rsi_window']).mean() rs = avg_gain / avg_loss ta_results['rsi'] = 100 - (100 / (1 + rs)).iloc[-1] # MACD ema_fast = history['Close'].ewm(span=TA_CONFIG['macd_fast'], adjust=False).mean() ema_slow = history['Close'].ewm(span=TA_CONFIG['macd_slow'], adjust=False).mean() macd = ema_fast - ema_slow signal = macd.ewm(span=TA_CONFIG['macd_signal'], adjust=False).mean() ta_results['macd'] = macd.iloc[-1] ta_results['macd_signal'] = signal.iloc[-1] # Bollinger Bands sma = history['Close'].rolling(TA_CONFIG['bollinger_window']).mean() std = history['Close'].rolling(TA_CONFIG['bollinger_window']).std() ta_results['bollinger_upper'] = (sma + 2 * std).iloc[-1] ta_results['bollinger_lower'] = (sma - 2 * std).iloc[-1] # Moving Averages for window in TA_CONFIG['sma_windows']: ta_results[f'sma_{window}'] = history['Close'].rolling(window).mean().iloc[-1] for window in TA_CONFIG['ema_windows']: ta_results[f'ema_{window}'] = history['Close'].ewm(span=window, adjust=False).mean().iloc[-1] # Volatility returns = history['Close'].pct_change().dropna() ta_results['volatility_30d'] = returns.rolling(TA_CONFIG['volatility_window']).std().iloc[-1] * np.sqrt(252) return ta_results def generate_price_chart(history): """Generate interactive price chart with technical indicators""" fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8), sharex=True) # Price and Moving Averages history['Close'].plot(ax=ax1, label='Price') for window in TA_CONFIG['sma_windows']: history['Close'].rolling(window).mean().plot(ax=ax1, label=f'SMA {window}') ax1.set_title('Price and Moving Averages') ax1.legend() # RSI delta = history['Close'].diff() gain = delta.where(delta > 0, 0) loss = -delta.where(delta < 0, 0) avg_gain = gain.rolling(TA_CONFIG['rsi_window']).mean() avg_loss = loss.rolling(TA_CONFIG['rsi_window']).mean() rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) rsi.plot(ax=ax2, label='RSI') ax2.axhline(70, color='red', linestyle='--') ax2.axhline(30, color='green', linestyle='--') ax2.set_title('Relative Strength Index (RSI)') ax2.legend() plt.tight_layout() return fig def resolve_ticker_symbol(query: str) -> str: """ Convert company names/partial symbols to valid Yahoo Finance tickers. Example: "Kalyan Jewellers" → "KALYANKJIL.NS" """ url = "https://query2.finance.yahoo.com/v1/finance/search" headers = {"User-Agent": "Mozilla/5.0"} # Avoid blocking params = {"q": query, "quotesCount": 5, "country": "India"} response = requests.get(url, headers=headers, params=params) data = response.json() if not data.get("quotes"): raise ValueError(f"No ticker found for: {query}") # Extract quotes data quotes = data["quotes"] tickers = [quote["symbol"] for quote in quotes] names = [quote.get("longname") or quote.get("shortname", "") for quote in quotes] # Fuzzy match the query with company names best_match, score = process.extractOne(query, names) if not best_match: raise ValueError(f"No matching ticker found for: {query}") index = names.index(best_match) best_quote = quotes[index] resolved_ticker = best_quote["symbol"] exchange_code = best_quote.get("exchange", "").upper() # Map exchange codes to suffixes exchange_suffix_map = { "NSI": ".NS", # NSE "BOM": ".BO", # BSE "BSE": ".BO", "NSE": ".NS", } suffix = exchange_suffix_map.get(exchange_code, ".NS") # Default to NSE # Append suffix only if not already present if not resolved_ticker.endswith(suffix): resolved_ticker += suffix return resolved_ticker def fetch_articles(query): try: logging.info(f"Fetching articles for query: '{query}'") googlenews = GoogleNews(lang="en") googlenews.search(query) articles = googlenews.result() logging.info(f"Fetched {len(articles)} articles") return articles except Exception as e: logging.error( f"Error while searching articles for query: '{query}'. Error: {e}" ) raise gr.Error( f"Unable to search articles for query: '{query}'. Try again later...", duration=5, ) def analyze_article_sentiment(article): logging.info(f"Analyzing sentiment for article: {article['title']}") sentiment = sentiment_analyzer(article["desc"])[0] article["sentiment"] = sentiment return article def fetch_yfinance_data(ticker): """Enhanced Yahoo Finance data fetching with technical analysis""" try: logging.info(f"Fetching Yahoo Finance data for: {ticker}") stock = yf.Ticker(ticker) history = stock.history(period="1y", interval="1d") if history.empty: logging.error(f"No data found for {ticker}") return {"error": f"No data found for {ticker}"} # Calculate technical indicators ta_data = calculate_technical_indicators(history) # Current price data current_price = history['Close'].iloc[-1] prev_close = history['Close'].iloc[-2] if len(history) > 1 else 0 price_change = current_price - prev_close percent_change = (price_change / prev_close) * 100 if prev_close != 0 else 0 # Generate price chart chart = generate_price_chart(history[-120:]) # Last 120 days return { 'current_price': current_price, 'price_change': price_change, 'percent_change': percent_change, 'chart': chart, 'technical_indicators': ta_data, 'fundamentals': stock.info } except Exception as e: logging.error(f"Error fetching Yahoo Finance data for {ticker}: {str(e)}") return {"error": f"Failed to fetch data for {ticker}: {str(e)}"} def time_weighted_sentiment(articles): """Apply time-based weighting to sentiment scores""" now = datetime.now() weighted_scores = [] for article in articles: try: article_date = datetime.strptime(article['date'], '%Y-%m-%d %H:%M:%S') days_old = (now - article_date).days weight = max(0, 1 - (days_old / 7)) # Linear decay over 7 days except: weight = 0.5 # Default weight if date parsing fails sentiment = article['sentiment']['label'] score = 1 if sentiment == 'positive' else -1 if sentiment == 'negative' else 0 weighted_scores.append(score * weight) return weighted_scores def _format_number(num): """Helper to format large numbers with suffixes""" if isinstance(num, (int, float)): for unit in ['','K','M','B','T']: if abs(num) < 1000: return f"{num:,.2f}{unit}" num /= 1000 return f"{num:,.2f}P" return num def convert_to_dataframe(analyzed_articles): df = pd.DataFrame(analyzed_articles) def sentiment_badge(sentiment): colors = { "negative": "#ef4444", "neutral": "#64748b", "positive": "#22c55e", } color = colors.get(sentiment, "grey") return ( f'