Runtime error
Runtime error
import s3fs | |
import pandas as pd | |
import numpy as np | |
from numpy import arange | |
from colour import Color | |
import plotly.graph_objects as go | |
from nltk import tokenize | |
from IPython.display import Markdown | |
from PIL import ImageColor | |
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer | |
import nltk | |'punkt') | |
import email | |
import codecs | |
import pickle | |
import string | |
from scipy import spatial | |
import re | |
import pytorch_lightning as pl | |
from bs4 import BeautifulSoup | |
import ipywidgets as widgets | |
from ipywidgets import FileUpload | |
from urlextract import URLExtract | |
from transformers import BertTokenizerFast as BertTokenizer, BertModel, BertConfig | |
import torch.nn as nn | |
import torch | |
from ipywidgets import interact, Dropdown | |
import boto3 | |
# from sagemaker import get_execution_role | |
from scipy import spatial | |
from ipyfilechooser import FileChooser | |
import streamlit as st | |
import utils | |
# Tone labels. convert_text_to_tone uses them as the column names of the
# aggregated tone frame, and they are the first entries of 'rf_labels'.
# NOTE(review): 'label_opstimistic' is misspelled but kept verbatim --
# presumably it matches the column names the models were trained with; verify
# before renaming.
_TONE_LABELS = [
    'label_analytical',
    'label_casual',
    'label_confident',
    'label_friendly',
    'label_joyful',
    'label_opstimistic',
    'label_respectful',
    'label_urgent',
]

# Category names that become one-hot feature columns ("industry_<name>" and
# "campaign_type_<name>") in 'rf_labels'.
_INDUSTRIES = [
    'Academic and Education',
    'Energy',
    'Entertainment',
    'Finance and Banking',
    'Healthcare',
    'Hospitality',
    'Real Estate',
    'Retail',
    'Software and Technology',
]
_CAMPAIGN_TYPES = [
    'Abandoned_Cart',
    'Engagement',
    'Newsletter',
    'Product_Announcement',
    'Promotional',
    'Review_Request',
    'Survey',
    'Transactional',
    'Usage_and_Consumption',
    'Webinar',
]

# Shared settings. In this section of the file only 'BERT_MODEL_NAME',
# 'LABEL_COLUMNS' and 'rf_labels' are read; the remaining keys are not
# referenced here.
PARAMS = {
    'BATCH_SIZE': 8,
    'MAX_TOKEN_COUNT': 100,
    'BERT_MODEL_NAME': 'google/bert_uncased_L-2_H-128_A-2',
    'N_EPOCHS': 10,
    'n_classes': 8,
    'LABEL_COLUMNS': list(_TONE_LABELS),
    'TEXTCOL': 'text',
    # Column order of the feature frame built in prediction().
    'rf_labels': (
        _TONE_LABELS
        + [f'industry_{name}' for name in _INDUSTRIES]
        + [f'campaign_type_{name}' for name in _CAMPAIGN_TYPES]
    ),
}
# Confidence-interval table, fetched from
# s3://emailcampaignmodeldata/ModelSADataSets/CI_RATES.csv
# (an earlier version read a local copy: pd.read_csv('CI_RATES.csv')).
# prediction() filters it on the 'model' column and reads the '2_5' and
# '97_5' columns as offsets added to the predicted value.
CI_rates = utils.get_files_from_aws('emailcampaignmodeldata','ModelSADataSets/CI_RATES.csv')
### create file uploading widget
def email_upload():
    """Show an ``.eml`` file-upload widget and return it.

    Prints a short prompt, renders an ``ipywidgets.FileUpload`` restricted to
    ``.eml`` files and hands the widget back so the caller can read its
    ``value`` once the user has uploaded something.

    Returns:
        The ``FileUpload`` widget. It accepts several files
        (``multiple=True``), but note that ``parse_email`` only reads the
        first uploaded file.
    """
    # `display` is injected as a builtin only inside IPython/Jupyter sessions
    # and is not imported at the top of this file; import it explicitly so the
    # call below cannot raise NameError.
    from IPython.display import display

    print("Please upload your email (In EML Format)")
    upload = FileUpload(accept='.eml', multiple=True)
    display(upload)
    return upload
def parse_email(uploaded_file):
    """Extract the text of every ``<body>`` element from an uploaded email.

    Args:
        uploaded_file: object whose ``value`` maps file names to dicts with a
            ``'content'`` entry holding the raw bytes (the widget returned by
            ``email_upload`` -- only the first file is read).

    Returns:
        str: the text of all ``<body>`` tags found in the payload of every
        message part, concatenated without a separator.
    """
    first_name = list(uploaded_file.value.keys())[0]
    raw_message = codecs.decode(uploaded_file.value[first_name]['content'], encoding="utf-8")
    message = email.message_from_string(raw_message)

    body_texts = []
    for part in message.walk():
        if not part.get_content_type():
            continue
        # NOTE(review): no parser is passed to BeautifulSoup, so the one it
        # picks depends on what is installed; left unchanged because the
        # parser may affect which <body> tags are found.
        markup = BeautifulSoup(str(part.get_payload()))
        body_texts.extend(tag.text for tag in markup.find_all('body'))
    return "".join(body_texts)
def text_clean(x,punct=True):
    """Normalise raw text before it is scored.

    The steps run in a fixed order: lowercase, drop non-ASCII characters,
    blank out a series of regex matches, replace ``=`` with a space,
    optionally blank out punctuation, blank out single letters/digits that
    are surrounded by whitespace, and finally delete tabs, newlines and a few
    leftover mis-encoded sequences.

    Args:
        x: text to clean.
        punct: when equal to ``True`` every ``string.punctuation`` character
            is replaced by a space; otherwise punctuation is kept.

    Returns:
        str: the cleaned text.
    """
    # Applied in this exact order; each match is replaced by a single space.
    space_out_patterns = (
        r'https*\S+',        # links
        r'http*\S+',
        r'\'\w+',            # apostrophe followed by word characters
        r'\w*\d+\w*',        # tokens containing digits
        r'\s{2,}',           # runs of whitespace
        r'\s[^\w\s]\s',      # a lone symbol between whitespace
        # NOTE(review): only '@' plus ONE following character is matched here,
        # unlike the '#\S+' rule below -- confirm whether '@\S+' was intended.
        r'@\S',
        r'#\S+',
    )

    cleaned = x.lower().encode('ascii', 'ignore').decode()
    for pattern in space_out_patterns:
        cleaned = re.sub(pattern, ' ', cleaned)
    cleaned = cleaned.replace('=', ' ')

    if punct == True:  # kept as an equality test to preserve behaviour
        cleaned = re.sub('[%s]' % re.escape(string.punctuation), ' ', cleaned)

    # remove single letters and numbers surrounded by space
    cleaned = re.sub(r'\s[a-z]\s|\s[0-9]\s', ' ', cleaned)

    for residue in (' Â\x8a', '\t', '\n', 'Ã\x83', 'Â\x92', 'Â\x93', 'Â\x8a', 'Â\x95'):
        cleaned = cleaned.replace(residue, '')
    return cleaned
class ToneTagger(pl.LightningModule):
    """BERT encoder followed by a linear layer and a sigmoid.

    The encoder is loaded from ``PARAMS['BERT_MODEL_NAME']``; the linear head
    maps the encoder's pooled output to ``n_classes`` values, each squashed
    independently by a sigmoid.
    """

    def __init__(self, n_classes: int, n_training_steps=None, n_warmup_steps=None):
        """Build the encoder and the classification head.

        Args:
            n_classes: number of outputs of the linear head.
            n_training_steps: stored on the instance; not used in this class.
            n_warmup_steps: stored on the instance; not used in this class.
        """
        super().__init__()
        self.bert = BertModel.from_pretrained(
            PARAMS['BERT_MODEL_NAME'],
            return_dict=True,
        )
        self.classifier = nn.Linear(self.bert.config.hidden_size, n_classes)
        self.n_training_steps = n_training_steps
        self.n_warmup_steps = n_warmup_steps
        self.criterion = nn.BCELoss()

    def forward(self, input_ids, attention_mask):
        """Return the sigmoid of the linear head applied to the pooled BERT output."""
        encoded = self.bert(input_ids, attention_mask)
        logits = self.classifier(encoded.pooler_output)
        return torch.sigmoid(logits)
# Build the tone model with 8 outputs and populate it with the saved weights.
model = ToneTagger(8)
# map_location="cpu": bert_tones() feeds the model CPU tensors and never moves
# anything to another device, so load the weights onto the CPU. Without it a
# checkpoint saved from a GPU cannot be loaded on a machine without one.
# NOTE: torch.load unpickles the file -- only load checkpoints you trust.
model.load_state_dict(torch.load("models/SAMODEL", map_location="cpu"))
model.eval()  # switch to inference mode
def bert_tones(text_sentences,model):
    """Clean each sentence and score it with the tone model.

    Args:
        text_sentences: iterable of sentence strings.
        model: callable invoked as ``model(input_ids, attention_mask)`` that
            returns a tensor; its first row is kept for each sentence.

    Returns:
        tuple: ``(sentences, predictions)`` -- the sentences cleaned with
        punctuation kept (``text_clean(sentence, False)``) and, in the same
        order, one numpy array of model outputs per sentence.
    """
    tokenizer = BertTokenizer.from_pretrained('google/bert_uncased_L-2_H-128_A-2')
    display_sentences = []
    tone_scores = []
    for sentence in text_sentences:
        # The returned text keeps punctuation; the model input does not.
        display_sentences.append(text_clean(sentence, False))
        encoded = tokenizer.encode_plus(
            text_clean(sentence),
            add_special_tokens=True,
            max_length=100,
            return_token_type_ids=False,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        with torch.no_grad():
            scores = model(encoded['input_ids'], encoded['attention_mask'])
            scores = scores.cpu().numpy()
            tone_scores.append(np.array(scores[0]))
    return display_sentences, tone_scores
def convert_text_to_tone(text,model=model,params=PARAMS):
    """Split text into sentences and compute per-sentence and average tones.

    Args:
        text: raw text, punctuation included (needed for sentence splitting).
        model: tone model forwarded to ``bert_tones``.
        params: mapping providing ``'LABEL_COLUMNS'``, the column names of the
            averaged tone frame.

    Returns:
        tuple: ``(final, tones)`` where ``final`` is a one-row DataFrame with
        columns ``'text'`` (cleaned sentences), ``'sentiment'`` (VADER
        polarity scores of the whole cleaned text) and ``'sentencetone'``
        (per-sentence model outputs), and ``tones`` is a DataFrame holding the
        mean of the per-sentence outputs under ``params['LABEL_COLUMNS']``.
    """
    # VADER sentiment of the whole cleaned text (not currently in use).
    sentiment = SentimentIntensityAnalyzer().polarity_scores(text_clean(text))

    # Sentence boundaries come from NLTK, applied to the uncleaned text.
    sentences = tokenize.sent_tokenize(text)
    cleaned_sentences, sentence_tones = bert_tones(sentences, model)

    final = pd.DataFrame(
        [[cleaned_sentences, sentiment, sentence_tones]],
        columns=['text', 'sentiment', 'sentencetone'],
    )
    # Average the per-sentence vectors element-wise (axis=0 goes to np.mean).
    mean_tones = final['sentencetone'].apply(np.mean, axis=0)
    tones = pd.DataFrame(mean_tones.tolist(), columns=params['LABEL_COLUMNS'])
    return final, tones
### This will be abstracted away to a more dynamic model
def _load_pickled_model(path):
    """Unpickle and return the object stored at *path*, closing the file afterwards.

    NOTE: pickle can execute arbitrary code while loading -- only use this on
    model files from a trusted source.
    """
    with open(path, 'rb') as handle:
        return pickle.load(handle)


# One pickled model per target metric. The path and model names are kept as
# module-level globals exactly as before.
brf='Rate_Models/bounce_rate_model.sav'
BRM = _load_pickled_model(brf)
orf='Rate_Models/open_rate_model.sav'
ORM = _load_pickled_model(orf)
urf='Rate_Models/unsubscribe_rate_model.sav'
URM = _load_pickled_model(urf)
crf='Rate_Models/click_trough_rate_model.sav'
CRM = _load_pickled_model(crf)
CV='Rate_Models/Conversion_rate.sav'
ConM = _load_pickled_model(CV)
CTOR='Rate_Models/Click-To-Open_Rates.sav'
CTORM = _load_pickled_model(CTOR)
RV='Rate_Models/Revenue_per_email.sav'
RVM = _load_pickled_model(RV)

# Target name -> model; prediction() looks models up here by target.
model_dict = {
    'Open_Rate': ORM,
    'Click_Through_Rate': CRM,
    'Unsubscribe_Rate': URM,
    'Bounce_Rate': BRM,
    'Click_To_Open_Rate': CTORM,
    'Conversion_Rate': ConM,
    'Revenue_Per_Email': RVM,
}
def plot_CI(pred,lower,upper,scale_factor=0.5,streamlit=False):
    """Plot a prediction and its confidence interval on a single axis.

    Args:
        pred: the predicted value of the target variable; drawn as a marker
            and shown in the title.
        lower: lower bound of the confidence interval (left vertical line).
        upper: upper bound of the confidence interval (right vertical line).
        scale_factor: the x axis runs from 0 to ``upper + upper * scale_factor``,
            so larger values leave more room to the right of the interval.
        streamlit: when true the figure is rendered with ``st.plotly_chart``;
            otherwise it is shown with ``fig.show()``.
    """
    title = f'The Predicted Value is {pred}'
    fig = go.Figure()
    fig.update_xaxes(showgrid=False)
    # Hide the y axis apart from a thick zero line that acts as the number line.
    fig.update_yaxes(showgrid=False,
                     zeroline=True, zerolinecolor='black', zerolinewidth=3,
                     showticklabels=False)
    fig.update_layout(height=200, plot_bgcolor='white')
    # A single marker at (pred, 0); y has one entry to match x.
    fig.add_trace(go.Scatter(
        x=[pred], y=[0], mode='markers', marker_size=10, line=dict(color="red")
    ))
    fig.update_layout(xaxis_range=[0, upper + upper * scale_factor])
    fig.update_layout(showlegend=False)
    fig.add_vline(x=lower, annotation_text=f"{lower}", annotation_position="top")
    fig.add_vline(x=upper, annotation_text=f"{upper}", annotation_position="top")
    fig.add_vrect(lower, upper, fillcolor='red', opacity=0.25,
                  annotation_text='95% CI', annotation_position="outside top")
    fig.update_layout(title_text=title, title_x=0.5)
    if streamlit:
        st.plotly_chart(fig)
    else:
        # The non-Streamlit branch had no body; show the figure directly.
        fig.show()
def find_max_cat(df,target,industry,campaign):
    """Find the best ``target`` value within one industry/campaign combination.

    Args:
        df: DataFrame containing the ``target`` column and the indicator
            columns named by ``industry`` and ``campaign``.
        target: name of the column to maximise.
        industry: name of the industry indicator column (rows with value 1).
        campaign: name of the campaign indicator column (rows with value 1).

    Returns:
        tuple: ``(best, rec)`` where ``best`` is the maximum ``target`` value
        among the matching rows rounded to 3 decimals and ``rec`` is the slice
        of positions 3 to 10 of the row where that maximum occurs. When no row
        matches, ``(0, 0)`` is returned.
    """
    matches = df[(df[campaign] == 1) & (df[industry] == 1)]
    if len(matches) == 0:
        return 0, 0

    target_values = matches[target]
    # Positions 3..10 of the winning row -- presumably the eight tone columns;
    # verify against the column layout produced by load_data.
    best_row_slice = df.loc[target_values.idxmax()][3:11]
    return round(target_values.max(), 3), best_row_slice
def recommend(tones,recommend_changes,change,target,streamlit=False):
    """Plot the current tones together with the recommended tone changes.

    Args:
        tones: DataFrame whose columns are the tone names and whose first row
            holds the current tone values.
        recommend_changes: recommended change per tone. It must support
            ``sort_values(key=abs)`` and ``.index`` (e.g. a pandas Series);
            its index, ordered by absolute change, sets the bar order.
        change: expected change of the target metric; shown in the title
            rounded to 3 decimals.
        target: name of the target metric, shown in the title.
        streamlit: when true the figure is rendered with ``st.plotly_chart``;
            otherwise it is shown with ``fig.show()``.
    """
    bar_outline = dict(color='rgba(58, 71, 80, 1.0)', width=3)
    fig = go.Figure()
    fig.add_trace(go.Bar(
        y=tones.columns,
        x=tones.values[0],
        name='Current Tones',
        orientation='h',
        width=.5,
        marker=dict(color='#00e6b1', line=bar_outline),
    ))
    fig.add_trace(go.Bar(
        y=tones.columns,
        x=recommend_changes,
        name='Recommend changes',
        orientation='h',
        text=np.round(recommend_changes, 3),
        width=0.3,
        marker=dict(color='#e60f00', line=bar_outline),
    ))
    fig.update_traces(textfont_size=18, textposition="outside", cliponaxis=False)
    fig.update_layout(height=700, plot_bgcolor='white')
    # Order the tones by the absolute size of the recommended change.
    fig.update_layout(
        barmode='stack',
        yaxis={
            'categoryorder': 'array',
            'categoryarray': recommend_changes.sort_values(key=abs, ascending=True).index,
        },
    )
    fig.update_layout(title_text=f'The following Changes will yield a {round(change,3)} increase in {target}')
    if streamlit:
        st.plotly_chart(fig)
    else:
        # The non-Streamlit branch had no body; show the figure directly.
        fig.show()
def prediction(tones,campaign_val,industry_val,target):
    """Predict a target metric from tones plus campaign/industry indicators.

    Args:
        tones: tone values used to build the feature frame; columns are
            aligned to ``PARAMS['rf_labels']`` and missing ones filled with 0.
        campaign_val: name of the campaign column set to 1 in row 0.
        industry_val: name of the industry column set to 1 in row 0.
        target: key into ``model_dict`` and value matched against the
            ``'model'`` column of ``CI_rates``.

    Returns:
        tuple: ``(pred, lower, higher, model)`` -- the prediction, the
        prediction plus the ``'2_5'`` offset, the prediction plus the
        ``'97_5'`` offset (all rounded to 3 decimals) and the model used.
    """
    features = pd.DataFrame(tones, columns=PARAMS['rf_labels']).fillna(0)
    features.loc[0, campaign_val] = 1
    features.loc[0, industry_val] = 1

    rate_model = model_dict[target]
    predicted = rate_model.predict(features)[0]

    interval = CI_rates[CI_rates['model'] == target]
    lower_bound = predicted + interval['2_5'].values[0]
    upper_bound = predicted + interval['97_5'].values[0]
    return round(predicted, 3), round(lower_bound, 3), round(upper_bound, 3), rate_model
def load_data(buckets,key):
    """Fetch the training table from AWS and prepare it for modelling.

    Args:
        buckets: bucket argument passed to ``utils.get_files_from_aws``.
        key: object key passed to ``utils.get_files_from_aws``.

    Returns:
        DataFrame: the fetched table with duplicate rows dropped,
        ``'industry'`` and ``'campaign_type'`` one-hot encoded, the
        ``'Unnamed: 0'`` and ``'body'`` columns removed and three rate columns
        renamed to underscore-style names.
    """
    renamed_targets = {
        'Click-To-Open Rates': 'Click_To_Open_Rate',
        'Conversion Rate': 'Conversion_Rate',
        'Revenue Per email': 'Revenue_Per_Email',
    }
    raw = utils.get_files_from_aws(buckets, key)
    # NOTE(review): duplicates are dropped while 'Unnamed: 0' is still present;
    # if that column is a unique row id this removes nothing -- confirm.
    deduplicated = raw.drop_duplicates()
    encoded = pd.get_dummies(deduplicated, columns=['industry', 'campaign_type'])
    return encoded.drop(columns=['Unnamed: 0', 'body']).rename(columns=renamed_targets)
def plot_table(sorted_setence_tuple,streamlit=True):
    """Plot the sentence table (the bottom-most table of the report).

    Args:
        sorted_setence_tuple: list of ``(sentence, score, color)`` tuples,
            where ``score`` is the distance from the recommended tone and
            ``color`` is an RGB tuple used as the row's fill colour. An empty
            list produces an empty table.
        streamlit: when true the figure is rendered with ``st.plotly_chart``;
            otherwise it is shown with ``fig.show()``.
    """
    # Unzip once into per-column tuples; guard the empty case, where zip(*[])
    # yields nothing to index into.
    columns = list(zip(*sorted_setence_tuple))
    if columns:
        sentences, scores, colors = columns[0], columns[1], columns[2]
    else:
        sentences = scores = colors = ()

    # (r, g, b) tuple -> 'rgb(r, g, b)' colour string.
    rbg_list = [f'rgb{color}' for color in colors]

    fig = go.Figure(data=[go.Table(
        header=dict(values=['<b>Sentences</b>', '<b>Difference from Recommended Tone</b>'],
                    line_color='darkslategray',
                    fill_color='#010405',
                    align='center',
                    font=dict(family="Metropolis", color='white', size=16)),
        cells=dict(values=[sentences,   # 1st column
                           scores],     # 2nd column
                   line_color='darkslategray',
                   fill_color=[rbg_list],
                   align=['left', 'center'],
                   font=dict(family="Arial", size=12)))
    ])
    if streamlit:
        st.plotly_chart(fig)
    else:
        # The non-Streamlit branch had no body; show the figure directly.
        fig.show()
def corrections(best,df,streamlit=False):
    """Rank the email's sentences by how far their tone is from the best tone.

    For every sentence the cosine distance between its tone vector and
    ``best`` is computed, scaled to a 0-100 score and mapped onto a 10-step
    green-to-red palette. The sentences are then plotted with ``plot_table``,
    the largest distance first.

    Args:
        best: tone values of the best email for the current categories.
        df: DataFrame of the uploaded email whose row 0 holds the list of
            sentences in ``'text'`` and the matching list of tone vectors in
            ``'sentencetone'`` (the ``final`` frame of ``convert_text_to_tone``).
        streamlit: forwarded to ``plot_table``.
    """
    # loxz green primary to Loxz light red
    colors = ['#48f0c9', '#6ef5d6', '#94f7e1', '#bbfaec', '#e6fff9',
              '#ffe7e6', '#ffc3bf', '#ffa099', '#ff7c73', '#ff584d']
    last_color = len(colors) - 1

    sentence_order = []
    for text, cur in zip(df['text'][0], df['sentencetone'][0]):
        distance = spatial.distance.cosine(best, cur)  # cosine distance
        score = round(distance * 100)  # distance expressed on a 0-100 scale
        # round(distance * 10) reaches 10 for distances of 0.95 and above,
        # which is past the end of the 10-colour palette -- clamp it.
        color_index = min(max(round(distance * 10), 0), last_color)
        rbg = ImageColor.getcolor(f'{colors[color_index]}', "RGB")
        sentence_order.append((text, score, rbg))

    sorted_sentences = sorted(sentence_order, key=lambda item: item[1], reverse=True)
    plot_table(sorted_sentences, streamlit)
def read_file(fc):
    """Read an email file from disk and extract the text of its ``<body>`` tags.

    Args:
        fc: object whose ``selected`` attribute is the path of the file to
            read (e.g. an ``ipyfilechooser.FileChooser``).

    Returns:
        str: the text of all ``<body>`` tags found in the payload of every
        message part, concatenated without a separator.
    """
    # The assignment had lost its right-hand side; read the whole file.
    with open(fc.selected) as file:  # Use file to refer to the file object
        data = file.read()

    check = []
    b = email.message_from_string(data)
    for part in b.walk():
        if part.get_content_type():
            body = str(part.get_payload())
            soup = BeautifulSoup(body)
            paragraphs = soup.find_all('body')
            for paragraph in paragraphs:
                check.append(paragraph.text)
    return "".join(check)