|
import logging |
|
import gradio as gr |
|
import pandas as pd |
|
import torch |
|
import numpy as np |
|
from GoogleNews import GoogleNews |
|
from transformers import pipeline |
|
import yfinance as yf |
|
import requests |
|
from fuzzywuzzy import process |
|
import statistics |
|
import matplotlib.pyplot as plt |
|
from datetime import datetime, timedelta |
|
import warnings |
|
warnings.filterwarnings("ignore", category=UserWarning, module="fuzzywuzzy") |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" |
|
) |
|
|
|
SENTIMENT_ANALYSIS_MODEL = ( |
|
"mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis" |
|
) |
|
|
|
DEVICE = "cuda" if torch.cuda.is_available() else "cpu" |
|
logging.info(f"Using device: {DEVICE}") |
|
|
|
logging.info("Initializing sentiment analysis model...") |
|
sentiment_analyzer = pipeline( |
|
"sentiment-analysis", model=SENTIMENT_ANALYSIS_MODEL, device=DEVICE |
|
) |
|
logging.info("Model initialized successfully") |
|
|
|
|
|
TA_CONFIG = { |
|
'rsi_window': 14, |
|
'macd_fast': 12, |
|
'macd_slow': 26, |
|
'macd_signal': 9, |
|
'bollinger_window': 20, |
|
'sma_windows': [20, 50, 200], |
|
'ema_windows': [12, 26], |
|
'volatility_window': 30 |
|
} |
|
|
|
EXCHANGE_SUFFIXES = { |
|
"NSE": ".NS", |
|
"BSE": ".BO", |
|
"NYSE": "", |
|
"NASDAQ": "", |
|
} |
|
|
|
def calculate_technical_indicators(history): |
|
"""Calculate various technical indicators from historical price data""" |
|
ta_results = {} |
|
|
|
|
|
delta = history['Close'].diff() |
|
gain = delta.where(delta > 0, 0) |
|
loss = -delta.where(delta < 0, 0) |
|
|
|
avg_gain = gain.rolling(TA_CONFIG['rsi_window']).mean() |
|
avg_loss = loss.rolling(TA_CONFIG['rsi_window']).mean() |
|
rs = avg_gain / avg_loss |
|
ta_results['rsi'] = 100 - (100 / (1 + rs)).iloc[-1] |
|
|
|
|
|
ema_fast = history['Close'].ewm(span=TA_CONFIG['macd_fast'], adjust=False).mean() |
|
ema_slow = history['Close'].ewm(span=TA_CONFIG['macd_slow'], adjust=False).mean() |
|
macd = ema_fast - ema_slow |
|
signal = macd.ewm(span=TA_CONFIG['macd_signal'], adjust=False).mean() |
|
ta_results['macd'] = macd.iloc[-1] |
|
ta_results['macd_signal'] = signal.iloc[-1] |
|
|
|
|
|
sma = history['Close'].rolling(TA_CONFIG['bollinger_window']).mean() |
|
std = history['Close'].rolling(TA_CONFIG['bollinger_window']).std() |
|
ta_results['bollinger_upper'] = (sma + 2 * std).iloc[-1] |
|
ta_results['bollinger_lower'] = (sma - 2 * std).iloc[-1] |
|
|
|
|
|
for window in TA_CONFIG['sma_windows']: |
|
ta_results[f'sma_{window}'] = history['Close'].rolling(window).mean().iloc[-1] |
|
for window in TA_CONFIG['ema_windows']: |
|
ta_results[f'ema_{window}'] = history['Close'].ewm(span=window, adjust=False).mean().iloc[-1] |
|
|
|
|
|
returns = history['Close'].pct_change().dropna() |
|
ta_results['volatility_30d'] = returns.rolling(TA_CONFIG['volatility_window']).std().iloc[-1] * np.sqrt(252) |
|
|
|
return ta_results |
|
|
|
def generate_price_chart(history): |
|
"""Generate interactive price chart with technical indicators""" |
|
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8), sharex=True) |
|
|
|
|
|
history['Close'].plot(ax=ax1, label='Price') |
|
for window in TA_CONFIG['sma_windows']: |
|
history['Close'].rolling(window).mean().plot(ax=ax1, label=f'SMA {window}') |
|
ax1.set_title('Price and Moving Averages') |
|
ax1.legend() |
|
|
|
|
|
delta = history['Close'].diff() |
|
gain = delta.where(delta > 0, 0) |
|
loss = -delta.where(delta < 0, 0) |
|
avg_gain = gain.rolling(TA_CONFIG['rsi_window']).mean() |
|
avg_loss = loss.rolling(TA_CONFIG['rsi_window']).mean() |
|
rs = avg_gain / avg_loss |
|
rsi = 100 - (100 / (1 + rs)) |
|
|
|
rsi.plot(ax=ax2, label='RSI') |
|
ax2.axhline(70, color='red', linestyle='--') |
|
ax2.axhline(30, color='green', linestyle='--') |
|
ax2.set_title('Relative Strength Index (RSI)') |
|
ax2.legend() |
|
|
|
plt.tight_layout() |
|
return fig |
|
|
|
def resolve_ticker_symbol(query: str) -> str: |
|
""" |
|
Convert company names/partial symbols to valid Yahoo Finance tickers. |
|
Example: "Kalyan Jewellers" → "KALYANKJIL.NS" |
|
""" |
|
url = "https://query2.finance.yahoo.com/v1/finance/search" |
|
headers = {"User-Agent": "Mozilla/5.0"} |
|
params = {"q": query, "quotesCount": 5, "country": "India"} |
|
|
|
response = requests.get(url, headers=headers, params=params) |
|
data = response.json() |
|
|
|
if not data.get("quotes"): |
|
raise ValueError(f"No ticker found for: {query}") |
|
|
|
|
|
quotes = data["quotes"] |
|
tickers = [quote["symbol"] for quote in quotes] |
|
names = [quote.get("longname") or quote.get("shortname", "") for quote in quotes] |
|
|
|
|
|
best_match, score = process.extractOne(query, names) |
|
if not best_match: |
|
raise ValueError(f"No matching ticker found for: {query}") |
|
|
|
index = names.index(best_match) |
|
best_quote = quotes[index] |
|
resolved_ticker = best_quote["symbol"] |
|
exchange_code = best_quote.get("exchange", "").upper() |
|
|
|
|
|
exchange_suffix_map = { |
|
"NSI": ".NS", |
|
"BOM": ".BO", |
|
"BSE": ".BO", |
|
"NSE": ".NS", |
|
} |
|
suffix = exchange_suffix_map.get(exchange_code, ".NS") |
|
|
|
|
|
if not resolved_ticker.endswith(suffix): |
|
resolved_ticker += suffix |
|
|
|
return resolved_ticker |
|
|
|
def fetch_articles(query): |
|
try: |
|
logging.info(f"Fetching articles for query: '{query}'") |
|
googlenews = GoogleNews(lang="en") |
|
googlenews.search(query) |
|
articles = googlenews.result() |
|
logging.info(f"Fetched {len(articles)} articles") |
|
return articles |
|
except Exception as e: |
|
logging.error( |
|
f"Error while searching articles for query: '{query}'. Error: {e}" |
|
) |
|
raise gr.Error( |
|
f"Unable to search articles for query: '{query}'. Try again later...", |
|
duration=5, |
|
) |
|
|
|
def analyze_article_sentiment(article): |
|
logging.info(f"Analyzing sentiment for article: {article['title']}") |
|
sentiment = sentiment_analyzer(article["desc"])[0] |
|
article["sentiment"] = sentiment |
|
return article |
|
|
|
def fetch_yfinance_data(ticker): |
|
"""Enhanced Yahoo Finance data fetching with technical analysis""" |
|
try: |
|
logging.info(f"Fetching Yahoo Finance data for: {ticker}") |
|
stock = yf.Ticker(ticker) |
|
history = stock.history(period="1y", interval="1d") |
|
|
|
if history.empty: |
|
logging.error(f"No data found for {ticker}") |
|
return {"error": f"No data found for {ticker}"} |
|
|
|
|
|
ta_data = calculate_technical_indicators(history) |
|
|
|
|
|
current_price = history['Close'].iloc[-1] |
|
prev_close = history['Close'].iloc[-2] if len(history) > 1 else 0 |
|
price_change = current_price - prev_close |
|
percent_change = (price_change / prev_close) * 100 if prev_close != 0 else 0 |
|
|
|
|
|
chart = generate_price_chart(history[-120:]) |
|
|
|
return { |
|
'current_price': current_price, |
|
'price_change': price_change, |
|
'percent_change': percent_change, |
|
'chart': chart, |
|
'technical_indicators': ta_data, |
|
'fundamentals': stock.info |
|
} |
|
|
|
except Exception as e: |
|
logging.error(f"Error fetching Yahoo Finance data for {ticker}: {str(e)}") |
|
return {"error": f"Failed to fetch data for {ticker}: {str(e)}"} |
|
|
|
def time_weighted_sentiment(articles): |
|
"""Apply time-based weighting to sentiment scores""" |
|
now = datetime.now() |
|
weighted_scores = [] |
|
|
|
for article in articles: |
|
try: |
|
article_date = datetime.strptime(article['date'], '%Y-%m-%d %H:%M:%S') |
|
days_old = (now - article_date).days |
|
weight = max(0, 1 - (days_old / 7)) |
|
except: |
|
weight = 0.5 |
|
|
|
sentiment = article['sentiment']['label'] |
|
score = 1 if sentiment == 'positive' else -1 if sentiment == 'negative' else 0 |
|
weighted_scores.append(score * weight) |
|
|
|
return weighted_scores |
|
|
|
def _format_number(num): |
|
"""Helper to format large numbers with suffixes""" |
|
if isinstance(num, (int, float)): |
|
for unit in ['','K','M','B','T']: |
|
if abs(num) < 1000: |
|
return f"{num:,.2f}{unit}" |
|
num /= 1000 |
|
return f"{num:,.2f}P" |
|
return num |
|
|
|
def convert_to_dataframe(analyzed_articles): |
|
df = pd.DataFrame(analyzed_articles) |
|
|
|
def sentiment_badge(sentiment): |
|
colors = { |
|
"negative": "#ef4444", |
|
"neutral": "#64748b", |
|
"positive": "#22c55e", |
|
} |
|
color = colors.get(sentiment, "grey") |
|
return ( |
|
f'<div style="display: inline-flex; align-items: center; gap: 0.5rem;">' |
|
f'<div style="width: 0.75rem; height: 0.75rem; background-color: {color}; border-radius: 50%;"></div>' |
|
f'<span style="text-transform: capitalize; font-weight: 500; color: {color}">{sentiment}</span>' |
|
f'</div>' |
|
) |
|
|
|
df["Sentiment"] = df["sentiment"].apply(lambda x: sentiment_badge(x["label"].lower())) |
|
df["Title"] = df.apply( |
|
lambda row: f'<a href="{row["link"]}" target="_blank" style="text-decoration: none; color: #2563eb;">{row["title"]}</a>', |
|
axis=1, |
|
) |
|
df["Description"] = df["desc"].apply(lambda x: f'<div style="font-size: 0.9rem; color: #4b5563;">{x}</div>') |
|
df["Date"] = df["date"].apply(lambda x: f'<div style="font-size: 0.8rem; color: #6b7280;">{x}</div>') |
|
|
|
|
|
html_table = df[["Sentiment", "Title", "Description", "Date"]].to_html( |
|
escape=False, |
|
index=False, |
|
border=0, |
|
classes="gradio-table", |
|
justify="start" |
|
) |
|
|
|
|
|
styled_html = f""" |
|
<style> |
|
.gradio-table {{ |
|
width: 100%; |
|
border-collapse: collapse; |
|
margin-bottom: 1rem; |
|
}} |
|
.gradio-table th {{ |
|
text-align: left; |
|
padding: 0.75rem; |
|
background-color: #f3f4f6; |
|
border-bottom: 2px solid #d1d5db; |
|
color: #1f2937; |
|
font-weight: 600; |
|
}} |
|
.gradio-table td {{ |
|
padding: 0.75rem; |
|
border-bottom: 1px solid #e5e7eb; |
|
background-color: #ffffff; |
|
}} |
|
.gradio-table tr:hover td {{ |
|
background-color: #f9fafb; |
|
}} |
|
.gradio-table tr:nth-child(even) td {{ |
|
background-color: #f9fafb; |
|
}} |
|
</style> |
|
{html_table} |
|
""" |
|
return styled_html |
|
|
|
def generate_stock_recommendation(articles, finance_data): |
|
"""Enhanced recommendation system with technical analysis""" |
|
|
|
sentiment_scores = time_weighted_sentiment(articles) |
|
positive_score = sum(s for s in sentiment_scores if s > 0) |
|
negative_score = abs(sum(s for s in sentiment_scores if s < 0)) |
|
total_score = positive_score - negative_score |
|
|
|
|
|
ta = finance_data.get('technical_indicators', {}) |
|
rec = { |
|
'recommendation': 'HOLD', |
|
'confidence': 'Medium', |
|
'reasons': [], |
|
'risk_factors': [] |
|
} |
|
|
|
|
|
if total_score > 3: |
|
rec['recommendation'] = 'BUY' |
|
rec['reasons'].append("Strong positive sentiment trend") |
|
elif total_score < -3: |
|
rec['recommendation'] = 'SELL' |
|
rec['reasons'].append("Significant negative sentiment") |
|
|
|
|
|
if ta.get('rsi', 50) > 70: |
|
rec['risk_factors'].append("RSI indicates overbought condition") |
|
elif ta.get('rsi', 50) < 30: |
|
rec['reasons'].append("RSI suggests oversold opportunity") |
|
|
|
if ta.get('macd', 0) > ta.get('macd_signal', 0): |
|
rec['reasons'].append("Bullish MACD crossover") |
|
else: |
|
rec['risk_factors'].append("Bearish MACD trend") |
|
|
|
|
|
if ta.get('volatility_30d', 0) > 0.4: |
|
rec['risk_factors'].append("High volatility detected") |
|
|
|
|
|
if len(rec['reasons']) > len(rec['risk_factors']): |
|
rec['confidence'] = 'High' |
|
elif len(rec['risk_factors']) > 2: |
|
rec['recommendation'] = 'SELL' if rec['recommendation'] == 'HOLD' else rec['recommendation'] |
|
rec['confidence'] = 'Low' |
|
|
|
|
|
output = f"Recommendation: {rec['recommendation']} ({rec['confidence']} Confidence)\n\n" |
|
output += "Supporting Factors:\n" + "\n".join(f"- {r}" for r in rec['reasons']) + "\n\n" |
|
output += "Risk Factors:\n" + "\n".join(f"- {r}" for r in rec['risk_factors']) + "\n\n" |
|
output += f"Sentiment Score: {total_score:.2f}\n" |
|
output += f"30-Day Volatility: {ta.get('volatility_30d', 0):.2%}" |
|
|
|
return output |
|
|
|
def analyze_asset_sentiment(asset_input): |
|
logging.info(f"Starting sentiment analysis for asset: {asset_input}") |
|
|
|
try: |
|
|
|
ticker = resolve_ticker_symbol(asset_input) |
|
logging.info(f"Resolved '{asset_input}' to ticker: {ticker}") |
|
|
|
|
|
articles = fetch_articles(asset_input) |
|
analyzed_articles = [analyze_article_sentiment(article) for article in articles] |
|
|
|
|
|
finance_data = fetch_yfinance_data(ticker) |
|
|
|
|
|
price_chart = finance_data.get('chart') |
|
if 'chart' in finance_data: |
|
del finance_data['chart'] |
|
|
|
|
|
recommendation = generate_stock_recommendation(analyzed_articles, finance_data) |
|
|
|
return ( |
|
convert_to_dataframe(analyzed_articles), |
|
finance_data, |
|
recommendation, |
|
price_chart |
|
) |
|
|
|
except Exception as e: |
|
logging.error(f"Error in analysis: {str(e)}") |
|
return ( |
|
pd.DataFrame(), |
|
{"error": str(e)}, |
|
"Analysis failed", |
|
None |
|
) |
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Default()) as iface: |
|
gr.Markdown("# Advanced Trading Analytics Suite") |
|
|
|
with gr.Row(): |
|
input_asset = gr.Textbox( |
|
label="Asset Name/Ticker", |
|
placeholder="Enter stock name or symbol...", |
|
max_lines=1 |
|
) |
|
analyze_btn = gr.Button("Analyze", variant="primary") |
|
|
|
with gr.Tabs(): |
|
with gr.TabItem("Sentiment Analysis"): |
|
gr.Markdown("## News Sentiment Analysis") |
|
articles_output = gr.HTML(label="Analyzed News Articles") |
|
|
|
with gr.TabItem("Technical Analysis"): |
|
price_chart = gr.Plot(label="Price Analysis") |
|
ta_json = gr.JSON(label="Technical Indicators") |
|
|
|
with gr.TabItem("Recommendation"): |
|
recommendation_output = gr.Textbox( |
|
lines=8, |
|
label="Analysis Summary", |
|
interactive=False |
|
) |
|
|
|
analyze_btn.click( |
|
analyze_asset_sentiment, |
|
inputs=[input_asset], |
|
outputs=[articles_output, ta_json, recommendation_output, price_chart] |
|
) |
|
|
|
logging.info("Launching enhanced Gradio interface") |
|
iface.queue().launch() |