import pandas as pd
import numpy as np
import yfinance as yf
import ast
from scipy import stats
from datetime import datetime, timedelta
import pytz
import pandas_market_calendars as mcal
import alphalens as al
import matplotlib.pyplot as plt


def sentiment_to_numerical(sentiment):
    # Map categorical FinBERT labels to a numeric score
    mapping = {'Negative': -1, 'Positive': 1, 'Neutral': 0}
    return sentiment.map(mapping)


def process_sentiment_data(sentiment_data='finbert_sentiment.csv', sector_ticker='sector_ticker.csv', prices='prices.csv'):
    columns_to_load = ['Ticker', 'pubDate', 'finbert_output']
    df = pd.read_csv(sentiment_data, usecols=columns_to_load)
    df.rename(columns={'finbert_output': 'Sentiment'}, inplace=True)

    # Adjusting the dates of news articles
    nyse = mcal.get_calendar('NYSE')
    trading_start_hour = 9
    trading_start_minute = 30
    trading_end_hour = 16
    trading_end_minute = 0

    def adjust_date(pub_date):
        if pd.isnull(pub_date) or not isinstance(pub_date, pd.Timestamp):
            return pub_date
        # Market open (9:30) on the publication date, in the same timezone as the timestamp
        trading_start_time = pd.Timestamp(f'{pub_date.date()} {trading_start_hour}:{trading_start_minute}', tz=pub_date.tz)
        if pub_date >= trading_start_time:
            # Published after the open: shift to the next trading day's open
            next_trading_day = nyse.schedule(start_date=pub_date.date() + pd.DateOffset(days=1), end_date=pub_date.date() + pd.DateOffset(days=10)).iloc[0]['market_open']
            return next_trading_day
        else:
            valid_days = nyse.valid_days(start_date=pub_date.date(), end_date=pub_date.date())
            if not valid_days.empty and pub_date.date() == valid_days[0].date():
                # Published before the open on a trading day: keep the original timestamp
                return pub_date
            else:
                # Published on a non-trading day: shift to the next trading day's open
                next_trading_day = nyse.schedule(start_date=pub_date.date() + pd.DateOffset(days=1), end_date=pub_date.date() + pd.DateOffset(days=10)).iloc[0]['market_open']
                return next_trading_day

    # Parse publication timestamps first so adjust_date receives Timestamps rather than raw strings
    df['pubDate'] = pd.to_datetime(df['pubDate'], utc=True, format='ISO8601')
    df['pubDate'] = df['pubDate'].apply(adjust_date)

    # Converting FinBERT probabilities to a single sentiment score
    def convert_sentiment_to_score(sentiment):
        # Parse the dict-like FinBERT output string into label -> probability
        predicted_sentiment_probabilities = {}
        components = sentiment.split(', ')
        for component in components:
            key_value = component.split(':')
            if len(key_value) == 2:
                key, value = key_value
                key = key.strip(" '{}").capitalize()
                try:
                    value = float(value.strip(" '{}"))  # strip braces/quotes so the final entry also parses
                except ValueError:
                    continue
                predicted_sentiment_probabilities[key] = value
        positive = predicted_sentiment_probabilities.get('Positive', 0)
        negative = predicted_sentiment_probabilities.get('Negative', 0)
        neutral = predicted_sentiment_probabilities.get('Neutral', 0)
        # Net sentiment, damped by the neutral probability
        sentiment_score = (positive - negative) / (1 + neutral)
        return sentiment_score

    df['Sentiment_Score_2'] = df['Sentiment'].apply(convert_sentiment_to_score)

    # Reduce adjusted timestamps to calendar dates
    df['pubDate'] = pd.to_datetime(df['pubDate'], utc=True).dt.date

    # Replacing invalid tickers
    replacements = {
        'ATVI': 'ATVIX',
        'ABC': 'ABG',
        'FBHS': 'FBIN',
        'FISV': 'FI',
        'FRC': 'FRCB',
        'NLOK': 'SYM.MU',
        'PKI': 'PKN.SG',
        'RE': 'EG',
        'SIVB': 'SIVBQ',
    }
    df['Ticker'] = df['Ticker'].replace(replacements)
    df = df[df['Ticker'] != 'SBNY']

    # Aggregate article-level scores to one sentiment score per ticker per day
    aggregated_data = df.groupby(['Ticker', 'pubDate'])['Sentiment_Score_2'].mean().reset_index()
    aggregated_data['pubDate'] = pd.to_datetime(aggregated_data['pubDate']).dt.tz_localize('UTC')
    aggregated_data.set_index(['pubDate', 'Ticker'], inplace=True)
    prices = pd.read_csv(prices, index_col=0, parse_dates=True)

    # Equal-weighted benchmark of forward returns over 1, 5, 10 and 20 days
    equal_weighted_benchmark = prices.pct_change(periods=1).shift(periods=-1).mean(axis=1)
    equal_weighted_benchmark_df = equal_weighted_benchmark.reset_index()
    equal_weighted_benchmark_df.columns = ['date', 'equal_weighted_benchmark']
    # Forward multi-day returns, scaled to a per-day figure
    returns_5d = prices.pct_change(periods=5).shift(periods=-5) / 5
    returns_10d = prices.pct_change(periods=10).shift(periods=-10) / 10
    returns_20d = prices.pct_change(periods=20).shift(periods=-20) / 20
    mean_5d = returns_5d.mean(axis=1).reset_index()
    mean_10d = returns_10d.mean(axis=1).reset_index()
    mean_20d = returns_20d.mean(axis=1).reset_index()
    mean_5d.columns = ['date', '5d_mean_return']
    mean_10d.columns = ['date', '10d_mean_return']
    mean_20d.columns = ['date', '20d_mean_return']
    equal_weighted_benchmark_df = equal_weighted_benchmark_df.merge(mean_5d, on='date', how='left')
    equal_weighted_benchmark_df = equal_weighted_benchmark_df.merge(mean_10d, on='date', how='left')
    equal_weighted_benchmark_df = equal_weighted_benchmark_df.merge(mean_20d, on='date', how='left')
    # Restrict the benchmark to the date range covered by the sentiment data
    cut_date_min = aggregated_data.index.get_level_values('pubDate').min()
    cut_date_max = aggregated_data.index.get_level_values('pubDate').max()
    equal_weighted_benchmark_df = equal_weighted_benchmark_df[equal_weighted_benchmark_df.date >= cut_date_min]
    equal_weighted_benchmark_df = equal_weighted_benchmark_df[equal_weighted_benchmark_df.date <= cut_date_max]

    # Expand to a full (ticker, date) panel and attach sector labels
    tickers = aggregated_data.index.get_level_values('Ticker').unique()
    start_date = aggregated_data.index.get_level_values('pubDate').min() - pd.Timedelta(days=30)
    end_date = aggregated_data.index.get_level_values('pubDate').max() + pd.Timedelta(days=30)
    all_dates = prices.loc[cut_date_min:cut_date_max].index
    all_tickers_dates = pd.MultiIndex.from_product([tickers, all_dates], names=['Ticker', 'Date'])
    all_tickers_dates_df = pd.DataFrame(index=all_tickers_dates).reset_index()
    aggregated_data_reset = aggregated_data.reset_index()
    merged_data = pd.merge(all_tickers_dates_df, aggregated_data_reset, how='left', left_on=['Ticker', 'Date'], right_on=['Ticker', 'pubDate'])
    # Sector labels are read from scraping.xlsx; the sector_ticker argument is currently unused
    sector_data = pd.read_excel('scraping.xlsx', usecols=['Ticker', 'Sector'])
    merged_data = pd.merge(merged_data, sector_data, how='left', left_on='Ticker', right_on='Ticker')
| # | |
| decay_factor = 0.7 | |
| for ticker in tickers: | |
| ticker_data = merged_data[merged_data['Ticker'] == ticker].copy() | |
| original_nans = ticker_data['Sentiment_Score_2'].isna() | |
| ticker_data['Sentiment_Score_2'] = ticker_data['Sentiment_Score_2'].ffill() | |
| for i in range(1, len(ticker_data)): | |
| if original_nans.iloc[i]: | |
| ticker_data.iloc[i, ticker_data.columns.get_loc('Sentiment_Score_2')] = ticker_data.iloc[i - 1, ticker_data.columns.get_loc('Sentiment_Score_2')] * decay_factor | |
| merged_data.loc[merged_data['Ticker'] == ticker, 'Sentiment_Score_2'] = ticker_data['Sentiment_Score_2'] | |
| merged_data['Sentiment_Score_2'].fillna(0, inplace=True) | |
| merged_data.drop(columns=['pubDate'], inplace=True) | |
| merged_data.set_index(['Date', 'Ticker'], inplace=True) | |
| return merged_data, prices, equal_weighted_benchmark_df | |


# Alphalens
def alphalens_analysis(merged_data, prices):
    factor_data = al.utils.get_clean_factor_and_forward_returns(
        factor=merged_data['Sentiment_Score_2'],
        prices=prices,
        binning_by_group=False,
        bins=None,
        quantiles=5,
        periods=(1, 5, 10, 20),
        groupby=merged_data['Sector'],
    )
    al.tears.create_returns_tear_sheet(factor_data, long_short=True, group_neutral=False)
    return factor_data


def alphalens_analysis_by_sector(factor_data):
    mean_return_by_qt, std_err_by_qt = al.performance.mean_return_by_quantile(factor_data, by_group=True)
    al.plotting.plot_quantile_returns_bar(mean_return_by_qt, by_group=True)


def calculate_information_ratio(factor_data, equal_weighted_benchmark_df):
    # Merge the factor data with the benchmark data ('date' becomes a regular column)
    factor_data = factor_data.reset_index().merge(equal_weighted_benchmark_df, on='date', how='left')
    # Calculate excess returns for various holding periods
    factor_data['excess_return_1D'] = factor_data['1D'] - factor_data['equal_weighted_benchmark']
    factor_data['excess_return_5D'] = factor_data['5D'] - factor_data['5d_mean_return']
    factor_data['excess_return_10D'] = factor_data['10D'] - factor_data['10d_mean_return']
    factor_data['excess_return_20D'] = factor_data['20D'] - factor_data['20d_mean_return']
    # Initialize a DataFrame to store IR results
    results = pd.DataFrame(index=range(1, 6), columns=['IR 1D', 'IR 5D', 'IR 10D', 'IR 20D'])
    # Calculate the annualized IR for each quantile and holding period
    for quantile in range(1, 6):
        for period in [1, 5, 10, 20]:
            column_name = f'excess_return_{period}D'
            tmp = factor_data[factor_data.factor_quantile == quantile][['date', column_name]].groupby('date').mean()
            ir = np.mean(tmp) / np.std(tmp) * np.sqrt(252)
            results.at[quantile, f'IR {period}D'] = ir.values[0]
    from IPython.display import display
    display(results.style.format("{:.3f}"))
    return results
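

# The block below is not part of the original pipeline; it is a minimal usage
# sketch showing how the functions above could be wired together, assuming
# 'finbert_sentiment.csv', 'prices.csv' and 'scraping.xlsx' are present in the
# working directory with the columns referenced above.
if __name__ == '__main__':
    merged_data, prices_df, benchmark_df = process_sentiment_data(
        sentiment_data='finbert_sentiment.csv',
        prices='prices.csv',
    )
    factor_data = alphalens_analysis(merged_data, prices_df)
    alphalens_analysis_by_sector(factor_data)
    calculate_information_ratio(factor_data, benchmark_df)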