SOAPAssistV00 / gpt_index /readers /discord_reader.py
AbeerTrial's picture
Duplicate from AbeerTrial/SOAPAssist
35b22df
"""Discord reader.
Note: this file is named discord_reader.py to avoid conflicts with the
discord.py module.
"""
import asyncio
import logging
import os
from typing import List, Optional
from gpt_index.readers.base import BaseReader
from gpt_index.readers.schema.base import Document
async def read_channel(
    discord_token: str, channel_id: int, limit: Optional[int], oldest_first: bool
) -> str:
    """Async read channel.

    Note: This is our hack to create a synchronous interface to the
    async discord.py API. We use the `asyncio` module to run
    this function with `asyncio.get_event_loop().run_until_complete`.

    Args:
        discord_token (str): Token used to start the Discord client.
        channel_id (int): Id of the channel to read. Only text channels are
            supported.
        limit (Optional[int]): Passed as `limit` to each `history()` call,
            i.e. once for the channel and once for every thread read.
        oldest_first (bool): Passed as `oldest_first` to each `history()` call.

    Returns:
        str: The `content` of every collected message, joined by blank lines.
            Errors raised while reading (including an unsupported channel
            type) are logged and swallowed, so the result may be empty or
            partial in that case.

    """
    import discord  # noqa: F401

    messages: List[discord.Message] = []

    class CustomClient(discord.Client):
        async def on_ready(self) -> None:
            try:
                logging.info(f"{self.user} has connected to Discord!")
                # Use `self` rather than the enclosing `client` variable so the
                # handler does not depend on a name bound after the class body.
                channel = self.get_channel(channel_id)
                # only work for text channels for now
                if not isinstance(channel, discord.TextChannel):
                    raise ValueError(
                        f"Channel {channel_id} is not a text channel. "
                        "Only text channels are supported for now."
                    )
                # thread_dict maps thread_id to thread
                thread_dict = {thread.id: thread for thread in channel.threads}
                async for msg in channel.history(
                    limit=limit, oldest_first=oldest_first
                ):
                    messages.append(msg)
                    # A message whose id matches a thread id is followed by
                    # that thread's own messages.
                    thread = thread_dict.get(msg.id)
                    if thread is not None:
                        async for thread_msg in thread.history(
                            limit=limit, oldest_first=oldest_first
                        ):
                            messages.append(thread_msg)
            except Exception as e:
                # Best effort: report the problem and fall through to close().
                logging.error("Encountered error: " + str(e))
            finally:
                # Closing the client is what lets `client.start()` return.
                await self.close()

    intents = discord.Intents.default()
    intents.message_content = True
    client = CustomClient(intents=intents)
    try:
        await client.start(discord_token)
    finally:
        # If start() fails before on_ready runs (e.g. a bad token), nothing
        # else closes the client; release its resources here.
        if not client.is_closed():
            await client.close()

    return "\n\n".join(m.content for m in messages)
class DiscordReader(BaseReader):
    """Discord reader.

    Reads conversations from channels.

    Args:
        discord_token (Optional[str]): Discord token. If not provided, we
            assume the environment variable `DISCORD_TOKEN` is set.

    """

    def __init__(self, discord_token: Optional[str] = None) -> None:
        """Initialize with parameters.

        Raises:
            ImportError: If the `discord.py` package is not installed.
            ValueError: If no token is passed and `DISCORD_TOKEN` is not set.

        """
        try:
            import discord  # noqa: F401
        except ImportError:
            raise ImportError(
                "`discord.py` package not found, please run `pip install discord.py`"
            )
        if discord_token is None:
            # Use .get(): indexing os.environ raises KeyError for a missing
            # variable, which made the ValueError below unreachable.
            discord_token = os.environ.get("DISCORD_TOKEN")
            if discord_token is None:
                raise ValueError(
                    "Must specify `discord_token` or set environment "
                    "variable `DISCORD_TOKEN`."
                )

        self.discord_token = discord_token

    def _read_channel(
        self, channel_id: int, limit: Optional[int] = None, oldest_first: bool = True
    ) -> str:
        """Read channel.

        Runs the async `read_channel` coroutine to completion on the current
        event loop and returns its result.
        """
        result = asyncio.get_event_loop().run_until_complete(
            read_channel(
                self.discord_token, channel_id, limit=limit, oldest_first=oldest_first
            )
        )
        return result

    def load_data(
        self,
        channel_ids: List[int],
        limit: Optional[int] = None,
        oldest_first: bool = True,
    ) -> List[Document]:
        """Load data from the given Discord channels.

        Args:
            channel_ids (List[int]): List of channel ids to read.
            limit (Optional[int]): Maximum number of messages to read.
            oldest_first (bool): Whether to read oldest messages first.
                Defaults to `True`.

        Returns:
            List[Document]: List of documents, one per channel id, each
                carrying the channel id under the `channel` key of
                `extra_info`.

        Raises:
            ValueError: If a channel id is not an integer.

        """
        results: List[Document] = []
        for channel_id in channel_ids:
            if not isinstance(channel_id, int):
                raise ValueError(
                    f"Channel id {channel_id} must be an integer, "
                    f"not {type(channel_id)}."
                )
            channel_content = self._read_channel(
                channel_id, limit=limit, oldest_first=oldest_first
            )
            results.append(
                Document(channel_content, extra_info={"channel": channel_id})
            )
        return results
if __name__ == "__main__":
    # Manual smoke run: read a few messages from a hard-coded channel.
    # The root logger defaults to WARNING, which would drop the info
    # messages below; enable INFO so the run actually shows its output.
    logging.basicConfig(level=logging.INFO)
    reader = DiscordReader()
    logging.info("initialized reader")
    output = reader.load_data(channel_ids=[1057178784895348746], limit=10)
    logging.info(output)