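# NOTE: this file originally began with the lines "Spaces:", "Runtime error",
# "Runtime error" -- residue from a scraped web page, not Python source. They
# are preserved here as a comment so the module parses.
"""Slack reader: load Slack channel conversations as documents."""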
import logging
import os
import sys
import time
from datetime import datetime
from ssl import SSLContext
from typing import List, Optional

from gpt_index.readers.base import BaseReader
from gpt_index.readers.schema.base import Document
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class SlackReader(BaseReader):
    """Slack reader.

    Reads conversations from channels. If an earliest_date is provided, an
    optional latest_date can also be provided. If no latest_date is provided,
    we assume the latest date is the current timestamp.

    Args:
        slack_token (Optional[str]): Slack token. If not provided, we
            assume the environment variable `SLACK_BOT_TOKEN` is set.
        ssl (Optional[SSLContext]): Custom SSL context. If not provided, it is
            assumed there is already an SSL context available.
        earliest_date (Optional[datetime]): Earliest date from which
            to read conversations. If not provided, we read all messages.
        latest_date (Optional[datetime]): Latest date from which to
            read conversations. If not provided, defaults to current timestamp
            in combination with earliest_date.
    """

    def __init__(
        self,
        slack_token: Optional[str] = None,
        ssl: Optional[SSLContext] = None,
        earliest_date: Optional[datetime] = None,
        latest_date: Optional[datetime] = None,
    ) -> None:
        """Initialize with parameters.

        Raises:
            ValueError: If no token is given and `SLACK_BOT_TOKEN` is unset or
                empty, if `latest_date` is given without `earliest_date`, or if
                the initial `api_test` call reports a failure.
        """
        from slack_sdk import WebClient

        if slack_token is None:
            # .get() so a missing variable reaches the ValueError below instead
            # of escaping as a bare KeyError.
            slack_token = os.environ.get("SLACK_BOT_TOKEN")
        if not slack_token:
            raise ValueError(
                "Must specify `slack_token` or set environment "
                "variable `SLACK_BOT_TOKEN`."
            )
        if latest_date is not None and earliest_date is None:
            raise ValueError(
                "Must specify `earliest_date` if `latest_date` is specified."
            )

        if ssl is None:
            self.client = WebClient(token=slack_token)
        else:
            self.client = WebClient(token=slack_token, ssl=ssl)

        self.earliest_date_timestamp: Optional[float] = (
            earliest_date.timestamp() if earliest_date is not None else None
        )
        # Without an explicit latest_date, the upper bound is "now" as of
        # construction time.
        self.latest_date_timestamp: float = (
            latest_date.timestamp()
            if latest_date is not None
            else datetime.now().timestamp()
        )

        res = self.client.api_test()
        if not res["ok"]:
            raise ValueError(f"Error initializing Slack API: {res['error']}")

    @staticmethod
    def _handle_api_error(error, context: str) -> bool:
        """Log a `SlackApiError` and report whether the call should be retried.

        For a rate-limit error, sleep for the server's retry-after interval
        (default 1 second if the header is absent) and return True. For any
        other error, log it and return False. The caller then stops paginating
        instead of retrying a persistent failure forever.

        Args:
            error: The `slack_sdk.errors.SlackApiError` that was raised.
            context: Short description of the failed call, used in the log.

        Returns:
            bool: True if the request should be retried.
        """
        if error.response["error"] == "ratelimited":
            retry_after = int(error.response.headers.get("retry-after", 1))
            logger.error(
                "Rate limit error reached, sleeping for: %s seconds", retry_after
            )
            time.sleep(retry_after)
            return True
        logger.error("Error parsing %s: %s", context, error)
        return False

    def _read_message(self, channel_id: str, message_ts: str) -> str:
        """Read a thread: the message at `message_ts` plus all of its replies.

        Returns:
            str: The message texts joined by blank lines. This is partial if a
            non-rate-limit API error interrupts pagination.
        """
        from slack_sdk.errors import SlackApiError

        messages_text: List[str] = []
        next_cursor = None
        while True:
            # https://api.slack.com/methods/conversations.replies
            # Lists all replies to a message, including the message itself.
            replies_kwargs = {
                "channel": channel_id,
                "ts": message_ts,
                "cursor": next_cursor,
            }
            # Date bounds are only applied when a date range was configured.
            if self.earliest_date_timestamp is not None:
                replies_kwargs["latest"] = str(self.latest_date_timestamp)
                replies_kwargs["oldest"] = str(self.earliest_date_timestamp)
            try:
                result = self.client.conversations_replies(
                    **replies_kwargs  # type: ignore
                )
            except SlackApiError as e:
                if self._handle_api_error(e, "conversation replies"):
                    continue
                break
            messages_text.extend(
                message.get("text", "") for message in result["messages"]
            )
            if not result["has_more"]:
                break
            next_cursor = result["response_metadata"]["next_cursor"]
        return "\n\n".join(messages_text)

    def _read_channel(self, channel_id: str, reverse_chronological: bool) -> str:
        """Read every thread of a channel within the configured date range.

        Args:
            channel_id (str): Channel to read.
            reverse_chronological (bool): If True, keep threads in the order
                `conversations_history` returns them. Otherwise reverse
                that order.

        Returns:
            str: Thread texts joined by blank lines.
        """
        from slack_sdk.errors import SlackApiError

        result_messages: List[str] = []
        next_cursor = None
        while True:
            # conversations.history returns the first 100 messages by default;
            # the rest are paginated, see
            # https://api.slack.com/methods/conversations.history#pagination
            history_kwargs = {
                "channel": channel_id,
                "cursor": next_cursor,
                "latest": str(self.latest_date_timestamp),
            }
            if self.earliest_date_timestamp is not None:
                history_kwargs["oldest"] = str(self.earliest_date_timestamp)
            try:
                result = self.client.conversations_history(
                    **history_kwargs  # type: ignore
                )
            except SlackApiError as e:
                if self._handle_api_error(e, "conversation history"):
                    continue
                break
            conversation_history = result["messages"]
            logger.info(
                "%s messages found in %s", len(conversation_history), channel_id
            )
            result_messages.extend(
                self._read_message(channel_id, message["ts"])
                for message in conversation_history
            )
            if not result["has_more"]:
                break
            next_cursor = result["response_metadata"]["next_cursor"]

        if not reverse_chronological:
            # Slack returns history newest-first; flip to oldest-first.
            result_messages.reverse()
        return "\n\n".join(result_messages)

    def load_data(
        self, channel_ids: List[str], reverse_chronological: bool = True
    ) -> List[Document]:
        """Load the given Slack channels, one Document per channel.

        Args:
            channel_ids (List[str]): List of channel ids to read.
            reverse_chronological (bool): If True (default), threads appear
                newest first within each document; otherwise oldest first.

        Returns:
            List[Document]: One document per channel id, in input order. The
            channel id is stored under `extra_info["channel"]`.
        """
        return [
            Document(
                self._read_channel(
                    channel_id, reverse_chronological=reverse_chronological
                ),
                extra_info={"channel": channel_id},
            )
            for channel_id in channel_ids
        ]
if __name__ == "__main__":
    # Without configuration the root logger only emits WARNING and above, so
    # the logger.info() call below would print nothing.
    logging.basicConfig(level=logging.INFO)
    # Channel ids may be passed as command-line arguments; otherwise fall back
    # to the original example channel.
    reader = SlackReader()
    logger.info(reader.load_data(channel_ids=sys.argv[1:] or ["C04DC2VUY3F"]))