Spaces:
Sleeping
Sleeping
import yaml | |
import tweepy | |
import pandas as pd | |
import pickle | |
from typing import List | |
######################### TWITTER FIELDS ######################### | |
# Available tweet fields to choose from (as of October 13 2022): | |
# attachments, | |
# author_id, | |
# context_annotations, | |
# conversation_id, | |
# created_at, | |
# edit_controls, | |
# entities, | |
# geo, | |
# id, | |
# in_reply_to_user_id, | |
# lang, | |
# public_metrics, [Important for retweet and like count] | |
# possibly_sensitive, | |
# referenced_tweets, | |
# reply_settings, | |
# source, | |
# text, | |
# withheld | |
# NOTE: You can only get non-public metrics for Tweets that belong to the account you’re authenticated as. You can use the account token and secret with OAuth 1.0A - this will not work with an app-only bearer token. | |
TWEET_FIELDS = ['attachments', 'author_id', 'context_annotations', 'created_at', 'entities', 'geo', 'id', 'lang', 'possibly_sensitive', 'public_metrics', 'referenced_tweets', 'text'] | |
# In case of user fields, public_metrics include followers count, following count, tweet count and listed count | |
USER_FIELDS = ['created_at', 'description', 'id', 'location', 'name', 'pinned_tweet_id', 'protected', 'public_metrics', 'url', 'username', 'verified'] | |
# In case of media fields, public_metrics include view count | |
# duration_ms available only if type is video | |
MEDIA_FIELDS = ['media_key', 'type', 'duration_ms', 'public_metrics'] | |
EXPANSIONS = ['author_id', 'attachments.media_keys'] | |
class TwitterClientWrapper: | |
def __init__(self, bearer_token, wait_on_rate_limit=False) -> None: | |
# Get the bearer token and authenticate with Twitter Client | |
# Authenticate to Twitter | |
self.client = tweepy.Client(bearer_token, wait_on_rate_limit=wait_on_rate_limit) | |
def retrieve_tweets_by_ids(self, ids): | |
return self.client.get_tweets( | |
ids, tweet_fields=TWEET_FIELDS, expansions=EXPANSIONS, user_fields=USER_FIELDS, media_fields=MEDIA_FIELDS) | |
def retrieve_tweet(self, id): | |
return self.client.get_tweet( | |
id, tweet_fields=TWEET_FIELDS, expansions=EXPANSIONS, user_fields=USER_FIELDS, media_fields=MEDIA_FIELDS) | |
######################### HELPER FUNCTIONS ######################### | |
def format_users_df(user_data: List[tweepy.user.User]): | |
'''Format the user using his data. If we specify it and the fields, Twitter API can include user data | |
inside the "includes" field when retrieving tweets. This returns a Tweepy User object, out of which | |
we can retrieve the user fields we specified when querying, by accessing the "data" field. | |
''' | |
users_data_df = pd.json_normalize(user_data) | |
users_data_df.columns = users_data_df.columns.str.removeprefix("public_metrics.") | |
return users_data_df | |
# TODO: Flatten everything and remove prefixes | |
def format_tweets_df(tweets_data): | |
# Remove prefix after normalization of json value columns | |
tweets_data_df = pd.json_normalize(tweets_data) | |
tweets_data_df.columns = tweets_data_df.columns.str.removeprefix("public_metrics.").str.removeprefix("entities.") | |
tweets_data_df.rename(columns={"attachments.media_keys": "has_media"}, inplace=True) | |
tweets_data_df['has_media'] = ~tweets_data_df['has_media'].isna() | |
# Get the hashtags of a tweet if any | |
tweets_data_df['hashtags'] = tweets_data_df['hashtags'].map(lambda hashtags: [hashtag['tag'] for hashtag in hashtags], na_action='ignore') | |
''' | |
tweets_data_df.rename(columns={"hashtags": "has_hashtags"}, inplace=True) | |
tweets_data_df['has_hashtags'] = ~tweets_data_df['has_hashtags'].isna() | |
''' | |
return tweets_data_df | |
def format_context_annotations(tweet): | |
'''Retrieve the context annotations of a tweet if any, and keep only the ids of the topic domains and entities. | |
Returns: the tweet formatted, dict of domains retrieved, dict of entities retrieved | |
''' | |
tweet_copy = tweet.copy() | |
context_annotations = tweet_copy.get('context_annotations', []) | |
# Create a dict of all topic domains in this tweet | |
tweet_topic_domains = dict([(topic['domain']['id'], topic['domain']) for topic in context_annotations]) | |
# Create a dict of all topic entities in this tweet | |
tweet_topic_entities = dict([(topic['entity']['id'], topic['entity']) for topic in context_annotations]) | |
# Columns contain only the ids of the above topic domains and entities | |
tweet_copy['topic_domains'] = list(tweet_topic_domains.keys()) if len(tweet_topic_domains.keys()) > 0 else pd.NA | |
tweet_copy['topic_entities'] = list(tweet_topic_entities.keys()) if len(tweet_topic_entities.keys()) > 0 else pd.NA | |
# Remove the context annotations column to save space | |
tweet_copy.pop('context_annotations', None) | |
return tweet_copy, tweet_topic_domains, tweet_topic_entities | |
def load_topic_domains(path): | |
try: | |
with open(path, 'rb') as handle: | |
topic_domains = pickle.load(handle) | |
except FileNotFoundError: | |
topic_domains = {} | |
return topic_domains | |
def load_topic_entities(path): | |
try: | |
with open(path, 'rb') as handle: | |
topic_entities = pickle.load(handle) | |
except FileNotFoundError: | |
topic_entities = {} | |
return topic_entities | |