# AB_ESG_Pipeline / financial_analysis.py
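"""Builds a daily news-sentiment factor from FinBERT output and evaluates it with Alphalens.

The pipeline (a) maps each article to the first trading session it can affect,
(b) collapses the FinBERT class probabilities into one score per ticker and day,
(c) fills days without news using a geometric decay of the last score, and
(d) runs Alphalens tear sheets plus an information-ratio comparison against an
equal-weighted benchmark.
"""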
import pandas as pd
import numpy as np
import yfinance as yf
import ast
from scipy import stats
from datetime import datetime, timedelta
import pytz
import pandas_market_calendars as mcal
import alphalens as al
import matplotlib.pyplot as plt
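
# Standalone helper: maps categorical FinBERT labels ('Negative'/'Neutral'/'Positive') to -1/0/+1.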
def sentiment_to_numerical(sentiment):
    mapping = {'Negative': -1, 'Positive': 1, 'Neutral': 0}
    return sentiment.map(mapping)

def process_sentiment_data(sentiment_data='finbert_sentiment.csv', sector_ticker='sector_ticker.csv', prices='prices.csv'):
    columns_to_load = ['Ticker', 'pubDate', 'finbert_output']
    df = pd.read_csv(sentiment_data, usecols=columns_to_load)
    df.rename(columns={'Publication Date': 'pubDate', 'finbert_output': 'Sentiment'}, inplace=True)

    # Adjusting the dates of news articles
    nyse = mcal.get_calendar('NYSE')
    trading_start_hour = 9
    trading_start_minute = 30
    trading_end_hour = 16
    trading_end_minute = 0
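    # An article published at or after the 09:30 open is attributed to the next trading
    # session's open; pre-open articles keep their timestamp if that day is a trading day,
    # otherwise they also roll forward to the next session's open.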
    def adjust_date(pub_date):
        # pub_date arrives as a tz-aware Timestamp (parsed with utc=True below); anything else passes through
        if pd.isnull(pub_date) or not isinstance(pub_date, pd.Timestamp):
            return pub_date
        # Compare in exchange-local time so the 09:30 cut-off matches the NYSE open
        local = pub_date.tz_convert('America/New_York')
        trading_start_time = pd.Timestamp(f'{local.date()} {trading_start_hour:02d}:{trading_start_minute:02d}',
                                          tz='America/New_York')
        if local >= trading_start_time:
            next_trading_day = nyse.schedule(start_date=local.date() + pd.DateOffset(days=1),
                                             end_date=local.date() + pd.DateOffset(days=10)).iloc[0]['market_open']
            return next_trading_day
        else:
            valid_days = nyse.valid_days(start_date=local.date(), end_date=local.date())
            if not valid_days.empty and local.date() == valid_days[0].date():
                return pub_date
            else:
                next_trading_day = nyse.schedule(start_date=local.date() + pd.DateOffset(days=1),
                                                 end_date=local.date() + pd.DateOffset(days=10)).iloc[0]['market_open']
                return next_trading_day
    # Parse the raw timestamps first so adjust_date receives tz-aware Timestamps rather than strings
    df['pubDate'] = pd.to_datetime(df['pubDate'], utc=True, format='ISO8601')
    df['pubDate'] = df['pubDate'].apply(adjust_date)
    # Converting the FinBERT class probabilities into one value
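    # The score (P(positive) - P(negative)) / (1 + P(neutral)) stays in [-1, 1]
    # and is pulled toward 0 when the neutral probability dominates.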
    def convert_sentiment_to_score(sentiment):
        predicted_sentiment_probabilities = {}
        components = sentiment.split(', ')
        for component in components:
            key_value = component.split(':')
            if len(key_value) == 2:
                key, value = key_value
                key = key.strip(" '{}").capitalize()
                try:
                    value = float(value.strip())
                except ValueError:
                    continue
                predicted_sentiment_probabilities[key] = value
        positive = predicted_sentiment_probabilities.get('Positive', 0)
        negative = predicted_sentiment_probabilities.get('Negative', 0)
        neutral = predicted_sentiment_probabilities.get('Neutral', 0)
        sentiment_score = (positive - negative) / (1 + neutral)
        return sentiment_score
    df['Sentiment_Score_2'] = df['Sentiment'].apply(convert_sentiment_to_score)

    # Reduce the adjusted timestamps to calendar dates
    df['pubDate'] = pd.to_datetime(df['pubDate'], utc=True)
    df['pubDate'] = df['pubDate'].dt.date
    print(df['pubDate'].dtypes)
    # Replacing invalid tickers
    replacements = {
        'ATVI': 'ATVIX',
        'ABC': 'ABG',
        'FBHS': 'FBIN',
        'FISV': 'FI',
        'FRC': 'FRCB',
        'NLOK': 'SYM.MU',
        'PKI': 'PKN.SG',
        'RE': 'EG',
        'SIVB': 'SIVBQ',
    }
    df['Ticker'] = df['Ticker'].replace(replacements)
    df = df[df['Ticker'] != 'SBNY']
    # Average article-level scores into a single daily sentiment value per ticker
    aggregated_data = df.groupby(['Ticker', 'pubDate'])['Sentiment_Score_2'].mean().reset_index()
    aggregated_data['pubDate'] = pd.to_datetime(aggregated_data['pubDate']).dt.tz_localize('UTC')
    aggregated_data.set_index(['pubDate', 'Ticker'], inplace=True)
    # Price panel: rows are trading days, columns are tickers (wide format expected by Alphalens)
    prices = pd.read_csv(prices, index_col=0, parse_dates=True)
    # Equal-weighted benchmark: average forward returns across all tickers
    equal_weighted_benchmark = prices.pct_change(periods=1).shift(periods=-1).mean(axis=1)
    equal_weighted_benchmark_df = equal_weighted_benchmark.reset_index()
    equal_weighted_benchmark_df.columns = ['date', 'equal_weighted_benchmark']
    # Per-day average forward returns over 5-, 10- and 20-day horizons
    returns_5d = prices.pct_change(periods=5).shift(periods=-5) / 5
    returns_10d = prices.pct_change(periods=10).shift(periods=-10) / 10
    returns_20d = prices.pct_change(periods=20).shift(periods=-20) / 20
    mean_5d = returns_5d.mean(axis=1).reset_index()
    mean_10d = returns_10d.mean(axis=1).reset_index()
    mean_20d = returns_20d.mean(axis=1).reset_index()
    mean_5d.columns = ['date', '5d_mean_return']
    mean_10d.columns = ['date', '10d_mean_return']
    mean_20d.columns = ['date', '20d_mean_return']
    equal_weighted_benchmark_df = equal_weighted_benchmark_df.merge(mean_5d, on='date', how='left')
    equal_weighted_benchmark_df = equal_weighted_benchmark_df.merge(mean_10d, on='date', how='left')
    equal_weighted_benchmark_df = equal_weighted_benchmark_df.merge(mean_20d, on='date', how='left')
    # Keep only the dates covered by the sentiment sample
    cut_date_min = aggregated_data.index.get_level_values('pubDate').min()
    cut_date_max = aggregated_data.index.get_level_values('pubDate').max()
    equal_weighted_benchmark_df = equal_weighted_benchmark_df[equal_weighted_benchmark_df.date >= cut_date_min]
    equal_weighted_benchmark_df = equal_weighted_benchmark_df[equal_weighted_benchmark_df.date <= cut_date_max]
    # Build a full (ticker, trading day) grid and attach the daily sentiment scores
    tickers = aggregated_data.index.get_level_values('Ticker').unique()
    start_date = aggregated_data.index.get_level_values('pubDate').min() - pd.Timedelta(days=30)
    end_date = aggregated_data.index.get_level_values('pubDate').max() + pd.Timedelta(days=30)
    all_dates = prices.loc[cut_date_min:cut_date_max].index
    all_tickers_dates = pd.MultiIndex.from_product([tickers, all_dates], names=['Ticker', 'Date'])
    all_tickers_dates_df = pd.DataFrame(index=all_tickers_dates).reset_index()
    aggregated_data_reset = aggregated_data.reset_index()
    merged_data = pd.merge(all_tickers_dates_df, aggregated_data_reset, how='left',
                           left_on=['Ticker', 'Date'], right_on=['Ticker', 'pubDate'])
    # Sector classification for each ticker
    sector_data = pd.read_excel('scraping.xlsx', usecols=['Ticker', 'Sector'])
    merged_data = pd.merge(merged_data, sector_data, how='left', on='Ticker')
    # Carry the last observed score forward, decaying it geometrically on days without news
    decay_factor = 0.7
    for ticker in tickers:
        ticker_data = merged_data[merged_data['Ticker'] == ticker].copy()
        original_nans = ticker_data['Sentiment_Score_2'].isna()
        ticker_data['Sentiment_Score_2'] = ticker_data['Sentiment_Score_2'].ffill()
        for i in range(1, len(ticker_data)):
            if original_nans.iloc[i]:
                score_col = ticker_data.columns.get_loc('Sentiment_Score_2')
                ticker_data.iloc[i, score_col] = ticker_data.iloc[i - 1, score_col] * decay_factor
        merged_data.loc[merged_data['Ticker'] == ticker, 'Sentiment_Score_2'] = ticker_data['Sentiment_Score_2']
    # Days before a ticker's first news item get a neutral score of 0
    merged_data['Sentiment_Score_2'] = merged_data['Sentiment_Score_2'].fillna(0)
    merged_data.drop(columns=['pubDate'], inplace=True)
    merged_data.set_index(['Date', 'Ticker'], inplace=True)
    return merged_data, prices, equal_weighted_benchmark_df


# Alphalens
def alphalens_analysis(merged_data, prices):
    factor_data = al.utils.get_clean_factor_and_forward_returns(
        factor=merged_data['Sentiment_Score_2'],
        prices=prices,
        binning_by_group=False,
        bins=None,
        quantiles=5,
        periods=(1, 5, 10, 20),
        groupby=merged_data['Sector'],
    )
    al.tears.create_returns_tear_sheet(factor_data, long_short=True, group_neutral=False)
    return factor_data


def alphalens_analysis_by_sector(factor_data):
    mean_return_by_qt, std_err_by_qt = al.performance.mean_return_by_quantile(factor_data, by_group=True)
    al.plotting.plot_quantile_returns_bar(mean_return_by_qt, by_group=True)


def calculate_information_ratio(factor_data, equal_weighted_benchmark_df):
    # Merge the factor data with the benchmark data
    factor_data = factor_data.merge(equal_weighted_benchmark_df, on='date', how='left')
    # Calculate excess returns for the various holding periods
    factor_data['excess_return_1D'] = factor_data['1D'] - factor_data['equal_weighted_benchmark']
    factor_data['excess_return_5D'] = factor_data['5D'] - factor_data['5d_mean_return']
    factor_data['excess_return_10D'] = factor_data['10D'] - factor_data['10d_mean_return']
    factor_data['excess_return_20D'] = factor_data['20D'] - factor_data['20d_mean_return']
    # Initialize a DataFrame to store IR results
    results = pd.DataFrame(index=range(1, 6), columns=['IR 1D', 'IR 5D', 'IR 10D', 'IR 20D'])
    # Calculate the annualized IR for each quantile and holding period
    for quantile in range(1, 6):
        for period in [1, 5, 10, 20]:
            column_name = f'excess_return_{period}D'
            tmp = factor_data[factor_data.factor_quantile == quantile][['date', column_name]].groupby('date').mean()
            ir = np.mean(tmp) / np.std(tmp) * np.sqrt(252)
            results.at[quantile, f'IR {period}D'] = ir.values[0]
    from IPython.display import display
    display(results.style.format("{:.3f}"))
    return results
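

# Example driver: a minimal sketch that assumes the input files referenced above
# ('finbert_sentiment.csv', 'prices.csv', 'scraping.xlsx') are available in the
# working directory.
if __name__ == '__main__':
    merged_data, price_data, benchmark_df = process_sentiment_data()
    factor_data = alphalens_analysis(merged_data, price_data)
    alphalens_analysis_by_sector(factor_data)
    calculate_information_ratio(factor_data, benchmark_df)
    plt.show()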