Spaces:
Runtime error
Runtime error
import requests | |
from haystack.nodes import BaseComponent | |
class TwitterRetriever(BaseComponent): | |
outgoing_edges = 1 | |
def __init__(self, bearer_token: str, last_k_tweets: int = 10): | |
self.headers = {"Authorization": "Bearer {}".format(bearer_token)} | |
self.last_k_tweets = last_k_tweets | |
def run(self, query: str, last_k_tweets: int = None): | |
if last_k_tweets is None: | |
last_k_tweets = self.last_k_tweets | |
url = f"https://api.twitter.com/1.1/statuses/user_timeline.json?screen_name={query}&count={last_k_tweets}" | |
try: | |
response = requests.request("GET", url, headers=self.headers) | |
twitter_stream = "" | |
for tweet in response.json(): | |
print(tweet) | |
twitter_stream += tweet["text"] + '\n' | |
except Exception as e: | |
twitter_stream = ["Please make sure you are providing a correct, public twitter account"] | |
output = { | |
"results": f"{twitter_stream}", | |
"username": query, | |
} | |
return output | |
def run_batch(self): | |
pass |