#!/usr/bin/env python3
# Standard library
import os
import pickle

# Third-party
import boto3
import pandas as pd
import yaml
from tqdm import tqdm
from tweepy import Paginator, TooManyRequests

# Local helpers
from helper.twitter_client_wrapper import (
    TWEET_FIELDS,
    format_tweets_df, format_context_annotations,
    load_topic_domains, load_topic_entities, TwitterClientWrapper
)

USER_IDS_PATH = "users_ids.csv"
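
# Illustrative sketch, not used by the pipeline below: run() names each output
# file after the first user id of the batch and the user id at which the batch
# stopped. The helper name `checkpoint_filename` is hypothetical; it only
# mirrors the string that run() builds inline.
def checkpoint_filename(first_user_id, last_user_id):
    """Return the relative Parquet path for a batch spanning two user ids."""
    return f"tweets/{first_user_id}-to-{last_user_id}.parquet.gzip"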

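def run(twitter_client, directory, users_ids, tweets_per_user=20000, push_to_remote=True):
    """Download tweets for each user and checkpoint the progress to disk.

    Fetches up to `tweets_per_user` tweets (retweets excluded) per user in
    `users_ids`, writes them to a gzip-compressed Parquet file, and saves the
    unprocessed user ids so that a later run can resume where this one stopped.
    `directory` must be empty or end with a path separator, because paths are
    built by string concatenation.
    """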
    topic_domains = load_topic_domains(f'{directory}topic_domains.pickle')
    topic_entities = load_topic_entities(f'{directory}topic_entities.pickle')

    # List where we accumulate the tweets retrieved so far
    viral_users_tweets = []
    # Number of users processed so far
    users_processed = 0
    # Use positional access so this works even if the index does not start at 0
    filename = f"tweets/{users_ids.id.iloc[0]}-to-"

    try:
        for user_id in tqdm(users_ids.id):
            paginator = Paginator(
                twitter_client.get_users_tweets,
                id=user_id,
                tweet_fields=TWEET_FIELDS,
                exclude="retweets",
            )
            for tweet in paginator.flatten(limit=tweets_per_user):
                processed_tweet, tweet_topic_domains, tweet_topic_entities = format_context_annotations(tweet.data)
                viral_users_tweets.append(processed_tweet)
                topic_domains.update(tweet_topic_domains)
                topic_entities.update(tweet_topic_entities)
            users_processed += 1
    except TooManyRequests:
        # Reached API limit
        print("Hit Rate Limit")
    finally:
        # Dump everything collected so far to Parquet and record the user at which we stopped.
        if len(viral_users_tweets) > 0:
            # Append end user id for this iteration to end of filename
            filename += f"{user_id}.parquet.gzip"
            filepath = directory + filename
            os.makedirs(os.path.dirname(filepath), exist_ok=True)
            format_tweets_df(viral_users_tweets).to_parquet(filepath, compression="gzip", index=False)

            # Save the topics encountered so far as pickle file
            with open(f'{directory}topic_domains.pickle', 'wb') as handle:
                pickle.dump(topic_domains, handle, protocol=pickle.HIGHEST_PROTOCOL)

            with open(f'{directory}topic_entities.pickle', 'wb') as handle:
                pickle.dump(topic_entities, handle, protocol=pickle.HIGHEST_PROTOCOL)

            # Update the users ids to remove the ones already processed
            users_ids[users_processed:].to_csv(f"{directory}{USER_IDS_PATH}", index=False)

            if push_to_remote:
                s3 = boto3.resource("s3")
                bucket_name = "semester-project-twitter-storage"
                # Upload to S3
                s3.Bucket(bucket_name).upload_file(filepath, filename)
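        else:
            # Nothing was retrieved: either the rate limit was hit immediately
            # or the users processed in this run had no tweets.
            print("No tweets were collected in this run")
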

def main():
    # TODO: Change depending on whether you're executing this script locally or on a remote server (possibly with s3 access)
    LOCAL = False
    TWEETS_PER_USER = 4000
    
    if LOCAL:
        DIRECTORY = ""
        with open("api_key.yaml", 'rt') as file:
            secret = yaml.safe_load(file)
        BEARER_TOKEN = secret['Bearer Token']
        PUSH_TO_REMOTE = False
    else:
        DIRECTORY = "/home/ubuntu/tweet/"
        BEARER_TOKEN = os.environ["BearerToken"]
        PUSH_TO_REMOTE = True
    
    # Authenticate to Twitter
    client_wrapper = TwitterClientWrapper(BEARER_TOKEN, wait_on_rate_limit=False)
    client = client_wrapper.client

    users_ids = pd.read_csv(f"{DIRECTORY}{USER_IDS_PATH}", dtype={"id": str})

    if len(users_ids) != 0:
        run(client, DIRECTORY, users_ids=users_ids, tweets_per_user=TWEETS_PER_USER, push_to_remote=PUSH_TO_REMOTE)

if __name__ == "__main__":
    main()