"""Streamlit script that plots a 7-day negative-sentiment forecast.

Recent post titles from a fixed list of subreddits are scored with a hosted
Hugging Face sentiment model, averaged per day over the last 14 days, fed to
a pre-trained regressor loaded from disk, and the 7 predicted values are
drawn as a smoothed matplotlib curve rendered through Streamlit.
"""
import base64
import datetime
import json
import os
import time

import joblib
import matplotlib.font_manager as fm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import praw
import requests
import streamlit as st
from scipy.interpolate import make_interp_spline
from sklearn.multioutput import MultiOutputRegressor
from xgboost import XGBRegressor

# --- Custom Font & Color Setup ---
font_path = "AfacadFlux-VariableFont_slnt,wght[1].ttf"  # Update with your actual font file path
custom_color = "#000000"  # Your custom color
custom_color1 = '#244B48'

# Setup custom font properties for matplotlib labels
title_font = fm.FontProperties(fname=font_path, size=22)
label_font = fm.FontProperties(fname=font_path, size=16)
tick_font = fm.FontProperties(fname=font_path, size=14)

# --- API and Reddit Setup ---
API_URL = "https://api-inference.huggingface.co/models/cardiffnlp/xlm-twitter-politics-sentiment"
API_TOKEN = os.environ["HF_API_TOKEN"]
headers = {"Authorization": f"Bearer {API_TOKEN}"}

reddit = praw.Reddit(
    client_id=os.environ["REDDIT_CLIENT_ID"],
    client_secret=os.environ["REDDIT_CLIENT_SECRET"],
    user_agent=os.environ["REDDIT_USER_AGENT"],
    check_for_async=False
)

# --- List of Subreddits and Analysis Period ---
subreddits = [
    "centrist",
    "wayofthebern",
    "libertarian",
    "conservatives",
    "progun"
]

HISTORY_DAYS = 14   # number of past days fed to the model
FORECAST_DAYS = 7   # number of values the model output is plotted as
UTC = datetime.timezone.utc


def _utc_day_bounds(day):
    """Return ``(start_ts, end_ts)`` Unix timestamps bounding *day* in UTC.

    The datetimes are built timezone-aware: calling ``.timestamp()`` on a
    naive datetime would interpret it in the machine's local timezone, which
    disagrees with the UTC dates used everywhere else in this script.
    """
    start = datetime.datetime.combine(day, datetime.time.min, tzinfo=UTC)
    end = datetime.datetime.combine(day, datetime.time.max, tzinfo=UTC)
    return int(start.timestamp()), int(end.timestamp())


end_date = datetime.datetime.now(UTC).date()  # today in UTC
start_date = end_date - datetime.timedelta(days=HISTORY_DAYS)  # 14 days ago
start_timestamp = _utc_day_bounds(start_date)[0]
end_timestamp = _utc_day_bounds(end_date)[1]

# Global variable for rate limiting
last_request_time = time.time()


def query(text_list, batch_size=5, min_request_interval=10):
    """Send *text_list* to the sentiment API in batches and collect the results.

    Args:
        text_list: Texts to classify.
        batch_size: Number of texts sent per HTTP request.
        min_request_interval: Minimum number of seconds between two requests;
            the function sleeps as needed to honor it.

    Returns:
        A list with the decoded JSON entries of every successful batch, in
        order. For a batch that fails, one ``{"error": ...}`` placeholder per
        text is appended instead, so a failure never raises.
    """
    global last_request_time
    results = []
    for i in range(0, len(text_list), batch_size):
        batch = text_list[i:i + batch_size]

        wait = min_request_interval - (time.time() - last_request_time)
        if wait > 0:
            print(f"Rate limiting: Sleeping for {wait:.2f} seconds...")
            time.sleep(wait)

        try:
            response = requests.post(API_URL, headers=headers, json={"inputs": batch}, timeout=60)
            response.raise_for_status()
            payload = response.json()
            # A non-list body (e.g. an error object) must not be passed to
            # extend(): that would append its keys and misalign the results.
            if not isinstance(payload, list):
                raise ValueError(f"unexpected response payload: {payload!r}")
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"API request failed for batch {i//batch_size + 1}: {e}")
            results.extend([{"error": "Skipped due to API failure"}] * len(batch))
        else:
            results.extend(payload)
        finally:
            # Count failed attempts too, so a failing API is not hit again
            # immediately by the next batch.
            last_request_time = time.time()
    return results


def _extract_negative_score(item):
    """Return the score of the ``"Negative"`` label in one API result entry.

    Assumes each entry is a list of ``{"label": ..., "score": ...}`` dicts
    (or a single such dict) -- TODO confirm against the live API response.
    Anything else, including the error placeholder produced by ``query``,
    yields 0.0.
    """
    candidates = item if isinstance(item, list) else [item]
    for entry in candidates:
        if isinstance(entry, dict) and entry.get("label") == "Negative" and "score" in entry:
            return float(entry["score"])
    return 0.0


def get_negative_score(text):
    """Return the negative-sentiment score for *text*, or 0.0 if unavailable.

    The score is read from the parsed response rather than sliced out of its
    string representation, so it keeps full precision and does not depend on
    key order or number formatting. This is deliberately best-effort: any
    failure is reported on stdout and mapped to 0.0.
    """
    try:
        results = query([text])
        if not results:
            return 0.0
        return _extract_negative_score(results[-1])
    except Exception as e:  # best-effort: one bad title must not abort the run
        print(f"Could not score text: {e}")
        return 0.0


def fetch_daily_posts_sentiments(subreddit_name, day_start_ts, day_end_ts, max_posts=5, limit=10):
    """Score the titles of posts found in one subreddit for one time window.

    Args:
        subreddit_name: Name of the subreddit to search.
        day_start_ts: Start of the window as a Unix timestamp.
        day_end_ts: End of the window as a Unix timestamp.
        max_posts: Stop after this many titles have been scored.
        limit: Maximum number of search results requested from Reddit.

    Returns:
        A list of ``(post_date, negative_score)`` tuples, where ``post_date``
        is the UTC calendar date of the submission.
    """
    # NOTE(review): Reddit search may no longer honor `timestamp:` range
    # queries -- confirm. main() only keeps posts whose date falls inside the
    # analysis window, so out-of-window results are dropped there.
    query_str = f"timestamp:{day_start_ts}..{day_end_ts}"
    submissions = list(reddit.subreddit(subreddit_name).search(query_str, sort="new", limit=limit))

    sentiments = []
    for submission in submissions:
        post_date = datetime.datetime.fromtimestamp(submission.created_utc, tz=UTC).date()
        sentiments.append((post_date, get_negative_score(submission.title)))
        if len(sentiments) >= max_posts:
            break
    return sentiments


def _collect_sentiments():
    """Gather ``(date, score)`` pairs for every subreddit and every history day."""
    all_sentiments = []
    # Loop over each day in the past 14 days
    for day_offset in range(HISTORY_DAYS):
        current_day = start_date + datetime.timedelta(days=day_offset)
        day_start_ts, day_end_ts = _utc_day_bounds(current_day)
        print(f"Processing date: {current_day}")
        for sub in subreddits:
            print(f" Processing subreddit: {sub}")
            all_sentiments.extend(
                fetch_daily_posts_sentiments(sub, day_start_ts, day_end_ts, max_posts=5, limit=5)
            )
            time.sleep(1)
    return all_sentiments


def _average_daily_scores(all_sentiments):
    """Average scores per history day and fill empty days with the overall mean.

    Days whose average is 0 (no posts, or only zero scores) are replaced by
    the mean of the non-zero daily averages; if every day is 0 the result is
    all zeros.
    """
    days = [start_date + datetime.timedelta(days=i) for i in range(HISTORY_DAYS)]
    daily_scores = {day: [] for day in days}
    for post_date, score in all_sentiments:
        if post_date in daily_scores:
            daily_scores[post_date].append(score)

    averages = [np.mean(daily_scores[day]) if daily_scores[day] else 0.0 for day in days]
    nonzero_values = [v for v in averages if v > 0]
    mean_nonzero = np.mean(nonzero_values) if nonzero_values else 0.0
    return [mean_nonzero if v == 0 else v for v in averages]


def _to_model_input(input_array):
    """Reshape a flat sequence of daily averages into a ``(1, n)`` model input.

    Raises:
        ValueError: If the input is not one-dimensional with exactly
            ``HISTORY_DAYS`` elements.
    """
    arr = np.array(input_array)
    if arr.ndim != 1 or arr.shape[0] != HISTORY_DAYS:
        raise ValueError(
            f"Input array must be one-dimensional with exactly {HISTORY_DAYS} elements."
        )
    return arr.reshape(1, -1)


def _plot_forecast(pred):
    """Build and return the styled matplotlib figure for the predicted values."""
    # Generate forecast dates
    # NOTE(review): this is the local date, while the analysis window above
    # is computed in UTC -- confirm which one the labels should follow.
    today = datetime.date.today()
    forecast_days = [today + datetime.timedelta(days=i) for i in range(FORECAST_DAYS)]
    days_str = [day.strftime('%a %m/%d') for day in forecast_days]

    # Smooth the prediction curve using spline interpolation
    day_index = np.arange(FORECAST_DAYS)
    x_smooth = np.linspace(0, FORECAST_DAYS - 1, 100)
    pred_smooth = make_interp_spline(day_index, pred, k=3)(x_smooth)

    # Create the matplotlib figure
    fig, ax = plt.subplots(figsize=(12, 7))
    ax.fill_between(x_smooth, pred_smooth, color=custom_color1, alpha=0.4)
    ax.plot(x_smooth, pred_smooth, color=custom_color1, lw=3, label='Forecast')
    ax.scatter(day_index, pred, color=custom_color1, s=100, zorder=5)

    ax.set_title("7-Day Sentiment Forecast", fontproperties=title_font, color=custom_color, pad=20)
    ax.set_xlabel("Day", fontproperties=label_font, color=custom_color)
    ax.set_ylabel("Negative Sentiment", fontproperties=label_font, color=custom_color)
    ax.set_xticks(day_index)
    ax.set_xticklabels(days_str, fontproperties=tick_font, color=custom_color)
    plt.setp(ax.get_yticklabels(), fontproperties=tick_font, color=custom_color)

    # Turn off all grid lines and hide the axes frame
    ax.grid(False)
    for spine in ax.spines.values():
        spine.set_visible(False)

    legend = ax.legend(prop=tick_font)
    for text in legend.get_texts():
        text.set_color(custom_color)
    return fig


def main():
    """Collect sentiments, run the forecast model and render the chart."""
    print("Running 7-Day Sentiment Forecast Analysis...")
    all_sentiments = _collect_sentiments()
    print("Analysis complete!")

    # Group and average sentiment scores by day
    avg_daily_scores = _average_daily_scores(all_sentiments)

    # Load the pre-trained model
    # (an earlier model file was "multioutput_regressor_model.pkl")
    # NOTE: joblib.load unpickles the file, which can execute arbitrary code;
    # only ever load a model file from a trusted source.
    model = joblib.load("sentiment_forecast_model (1).pkl")

    x = _to_model_input(avg_daily_scores)
    pred = model.predict(x)[0]

    fig = _plot_forecast(pred)
    st.pyplot(fig)
    # Release the figure so repeated runs do not accumulate open figures.
    plt.close(fig)
    print('graphdone')


if __name__ == "__main__":
    main()