File size: 5,050 Bytes
35b22df
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""Slack reader."""
import logging
import os
import time
from typing import List, Optional

from gpt_index.readers.base import BaseReader
from gpt_index.readers.schema.base import Document


class SlackReader(BaseReader):
    """Slack reader.

    Reads conversations from channels.

    Args:
        slack_token (Optional[str]): Slack token. If not provided, we
            assume the environment variable `SLACK_BOT_TOKEN` is set.

    Raises:
        ImportError: If the `slack_sdk` package is not installed.
        ValueError: If no token is given and `SLACK_BOT_TOKEN` is unset, or
            if the Slack `api.test` call does not report success.

    """

    def __init__(self, slack_token: Optional[str] = None) -> None:
        """Initialize with parameters."""
        try:
            from slack_sdk import WebClient
        except ImportError as err:
            raise ImportError(
                "`slack_sdk` package not found, please run `pip install slack_sdk`"
            ) from err
        if slack_token is None:
            # Use .get() so a missing variable yields the descriptive
            # ValueError below instead of a bare KeyError.
            slack_token = os.environ.get("SLACK_BOT_TOKEN")
            if slack_token is None:
                raise ValueError(
                    "Must specify `slack_token` or set environment "
                    "variable `SLACK_BOT_TOKEN`."
                )
        self.client = WebClient(token=slack_token)
        res = self.client.api_test()
        if not res["ok"]:
            raise ValueError(f"Error initializing Slack API: {res['error']}")

    @staticmethod
    def _wait_if_rate_limited(error: Exception, action: str) -> bool:
        """Handle a Slack API error raised while fetching `action`.

        Args:
            error: The `SlackApiError` raised by the Slack client.
            action (str): Short description of what was being fetched,
                used in the log message.

        Returns:
            bool: True if the error was a rate limit and the required delay
                has been slept (the caller should retry the same request);
                False for any other error (the caller should stop).

        """
        response = error.response  # type: ignore[attr-defined]
        if response["error"] == "ratelimited":
            retry_after = response.headers["retry-after"]
            logging.error(
                "Rate limit error reached, sleeping for: {} seconds".format(
                    retry_after
                )
            )
            time.sleep(int(retry_after))
            return True
        logging.error("Error parsing {}: {}".format(action, error))
        return False

    def _read_message(self, channel_id: str, message_ts: str) -> str:
        """Read a message and all of its thread replies.

        Args:
            channel_id (str): Id of the channel containing the message.
            message_ts (str): Timestamp identifying the message.

        Returns:
            str: Text of the message and its replies joined by blank lines.
                If a non-rate-limit API error occurs, the text collected so
                far is returned.

        """
        # Imported lazily because `slack_sdk` is an optional dependency.
        from slack_sdk.errors import SlackApiError

        messages_text: List[str] = []
        next_cursor = None
        while True:
            try:
                # https://slack.com/api/conversations.replies
                # List all replies to a message, including the message itself.
                result = self.client.conversations_replies(
                    channel=channel_id, ts=message_ts, cursor=next_cursor
                )
            except SlackApiError as e:
                if self._wait_if_rate_limited(e, "conversation replies"):
                    continue  # retry the same page after the delay
                # Any other error would repeat on every retry; stop instead
                # of spinning forever and keep what was already read.
                break

            messages_text.extend(message["text"] for message in result["messages"])

            if not result["has_more"]:
                break
            next_cursor = result["response_metadata"]["next_cursor"]

        return "\n\n".join(messages_text)

    def _read_channel(self, channel_id: str) -> str:
        """Read every message (with replies) of a channel.

        Args:
            channel_id (str): Id of the channel to read.

        Returns:
            str: Text of all messages joined by blank lines. If a
                non-rate-limit API error occurs, the text collected so far
                is returned.

        """
        # Imported lazily because `slack_sdk` is an optional dependency.
        from slack_sdk.errors import SlackApiError

        result_messages: List[str] = []
        next_cursor = None
        while True:
            try:
                # Call the conversations.history method using the WebClient
                # conversations.history returns the first 100 messages by default
                # These results are paginated,
                # see: https://api.slack.com/methods/conversations.history#pagination
                result = self.client.conversations_history(
                    channel=channel_id, cursor=next_cursor
                )
            except SlackApiError as e:
                if self._wait_if_rate_limited(e, "conversation history"):
                    continue  # retry the same page after the delay
                # Any other error would repeat on every retry; stop instead
                # of spinning forever and keep what was already read.
                break

            conversation_history = result["messages"]
            logging.info(
                "{} messages found in {}".format(
                    len(conversation_history), channel_id
                )
            )
            for message in conversation_history:
                result_messages.append(self._read_message(channel_id, message["ts"]))

            if not result["has_more"]:
                break
            next_cursor = result["response_metadata"]["next_cursor"]

        return "\n\n".join(result_messages)

    def load_data(self, channel_ids: List[str]) -> List[Document]:
        """Load data from the given Slack channels.

        Args:
            channel_ids (List[str]): List of channel ids to read.

        Returns:
            List[Document]: One document per channel, holding the channel's
                text and the channel id under `extra_info["channel"]`.

        """
        return [
            Document(
                self._read_channel(channel_id), extra_info={"channel": channel_id}
            )
            for channel_id in channel_ids
        ]


if __name__ == "__main__":
    # The root logger defaults to WARNING, which would silently drop the
    # INFO-level output below; enable INFO so the script shows its result.
    logging.basicConfig(level=logging.INFO)
    reader = SlackReader()
    logging.info(reader.load_data(channel_ids=["C04DC2VUY3F"]))