Spaces:
Runtime error
Runtime error
"""Slack reader.""" | |
import logging | |
import os | |
import time | |
from typing import List, Optional | |
from gpt_index.readers.base import BaseReader | |
from gpt_index.readers.schema.base import Document | |
class SlackReader(BaseReader):
    """Slack reader.

    Reads conversations from channels.

    Args:
        slack_token (Optional[str]): Slack token. If not provided, we
            assume the environment variable `SLACK_BOT_TOKEN` is set.

    """

    def __init__(self, slack_token: Optional[str] = None) -> None:
        """Initialize with parameters.

        Builds a `slack_sdk.WebClient` from the token and checks it with
        an `api.test` call.

        Raises:
            ImportError: If the `slack_sdk` package is not installed.
            ValueError: If no token was passed and `SLACK_BOT_TOKEN` is not
                set, or if the `api.test` response is not ok.

        """
        try:
            from slack_sdk import WebClient
        except ImportError:
            raise ImportError(
                "`slack_sdk` package not found, please run `pip install slack_sdk`"
            )
        if slack_token is None:
            # Use .get() rather than indexing: a missing variable must reach
            # the explicit ValueError below instead of raising a bare KeyError.
            slack_token = os.environ.get("SLACK_BOT_TOKEN")
        if slack_token is None:
            raise ValueError(
                "Must specify `slack_token` or set environment "
                "variable `SLACK_BOT_TOKEN`."
            )
        self.client = WebClient(token=slack_token)
        res = self.client.api_test()
        if not res["ok"]:
            raise ValueError(f"Error initializing Slack API: {res['error']}")

    def _read_message(self, channel_id: str, message_ts: str) -> str:
        """Read a message and all of its replies.

        Args:
            channel_id (str): Id of the channel containing the message.
            message_ts (str): Timestamp (`ts`) identifying the message.

        Returns:
            str: The text of the message and its replies, joined by blank
                lines. If a non-rate-limit API error occurs, whatever was
                collected up to that point is returned.

        """
        from slack_sdk.errors import SlackApiError

        messages_text = []
        next_cursor = None
        while True:
            try:
                # https://slack.com/api/conversations.replies
                # List all replies to a message, including the message itself.
                result = self.client.conversations_replies(
                    channel=channel_id, ts=message_ts, cursor=next_cursor
                )
                for message in result["messages"]:
                    messages_text.append(message["text"])

                if not result["has_more"]:
                    break
                next_cursor = result["response_metadata"]["next_cursor"]
            except SlackApiError as e:
                if e.response["error"] == "ratelimited":
                    retry_after = e.response.headers["retry-after"]
                    logging.error(
                        "Rate limit error reached, sleeping for: {} seconds".format(
                            retry_after
                        )
                    )
                    # Wait as instructed, then retry the same page.
                    time.sleep(int(retry_after))
                else:
                    logging.error("Error parsing conversation replies: {}".format(e))
                    # Retrying the identical request would fail the same way,
                    # so stop instead of looping forever.
                    break

        return "\n\n".join(messages_text)

    def _read_channel(self, channel_id: str) -> str:
        """Read a channel.

        Pages through the channel history and expands every message into
        its full thread via `_read_message`.

        Args:
            channel_id (str): Id of the channel to read.

        Returns:
            str: The text of all threads, joined by blank lines. If a
                non-rate-limit API error occurs, whatever was collected up
                to that point is returned.

        """
        from slack_sdk.errors import SlackApiError

        result_messages = []
        next_cursor = None
        while True:
            try:
                # Call the conversations.history method using the WebClient
                # conversations.history returns the first 100 messages by default
                # These results are paginated,
                # see: https://api.slack.com/methods/conversations.history$pagination
                result = self.client.conversations_history(
                    channel=channel_id, cursor=next_cursor
                )
                conversation_history = result["messages"]
                # Log the channel id (the original formatted the builtin `id`).
                logging.info(
                    "{} messages found in {}".format(
                        len(conversation_history), channel_id
                    )
                )
                for message in conversation_history:
                    result_messages.append(
                        self._read_message(channel_id, message["ts"])
                    )

                if not result["has_more"]:
                    break
                next_cursor = result["response_metadata"]["next_cursor"]
            except SlackApiError as e:
                if e.response["error"] == "ratelimited":
                    retry_after = e.response.headers["retry-after"]
                    logging.error(
                        "Rate limit error reached, sleeping for: {} seconds".format(
                            retry_after
                        )
                    )
                    # Wait as instructed, then retry the same page.
                    time.sleep(int(retry_after))
                else:
                    logging.error("Error parsing conversation history: {}".format(e))
                    # Retrying the identical request would fail the same way,
                    # so stop instead of looping forever.
                    break

        return "\n\n".join(result_messages)

    def load_data(self, channel_ids: List[str]) -> List[Document]:
        """Load data from the given Slack channels.

        Args:
            channel_ids (List[str]): List of channel ids to read.

        Returns:
            List[Document]: List of documents, one per channel id, each
                carrying the channel id in `extra_info["channel"]`.

        """
        results = []
        for channel_id in channel_ids:
            channel_content = self._read_channel(channel_id)
            results.append(
                Document(channel_content, extra_info={"channel": channel_id})
            )
        return results
if __name__ == "__main__":
    # Ad-hoc manual run: read one hard-coded channel and log the documents.
    # The token is taken from the SLACK_BOT_TOKEN environment variable.
    slack_reader = SlackReader()
    documents = slack_reader.load_data(channel_ids=["C04DC2VUY3F"])
    logging.info(documents)