SOAPAssistV00 / gpt_index /readers /github_readers /github_repository_reader.py
AbeerTrial's picture
Duplicate from AbeerTrial/SOAPAssist
35b22df
"""
Github repository reader.
Retrieves the contents of a Github repository and returns a list of documents.
The documents are either the contents of the files in the repository or
the text extracted from the files using the parser.
"""
import asyncio
import base64
import binascii
import logging
import os
import pathlib
import tempfile
from typing import Any, Callable, List, Optional, Tuple
from gpt_index.readers.base import BaseReader
from gpt_index.readers.file.base import DEFAULT_FILE_EXTRACTOR
from gpt_index.readers.github_readers.github_api_client import (
GitBranchResponseModel,
GitCommitResponseModel,
GithubClient,
GitTreeResponseModel,
)
from gpt_index.readers.github_readers.utils import (
BufferedGitBlobDataIterator,
get_file_extension,
print_if_verbose,
)
from gpt_index.readers.schema.base import Document
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GithubRepositoryReader(BaseReader):
"""
Github repository reader.
Retrieves the contents of a Github repository and returns a list of documents.
The documents are either the contents of the files in the repository or the text
extracted from the files using the parser.
Examples:
>>> reader = GithubRepositoryReader("owner", "repo")
>>> branch_documents = reader.load_data(branch="branch")
>>> commit_documents = reader.load_data(commit_sha="commit_sha")
"""
def __init__(
self,
owner: str,
repo: str,
use_parser: bool = True,
verbose: bool = False,
github_token: Optional[str] = None,
concurrent_requests: int = 5,
ignore_file_extensions: Optional[List[str]] = None,
ignore_directories: Optional[List[str]] = None,
):
"""
Initialize params.
Args:
- owner (str): Owner of the repository.
- repo (str): Name of the repository.
- use_parser (bool): Whether to use the parser to extract
the text from the files.
- verbose (bool): Whether to print verbose messages.
- github_token (str): Github token. If not provided,
it will be read from the GITHUB_TOKEN environment variable.
- concurrent_requests (int): Number of concurrent requests to
make to the Github API.
- ignore_file_extensions (List[str]): List of file extensions to ignore.
i.e. ['.png', '.jpg']
- ignore_directories (List[str]): List of directories to ignore.
i.e. ['node_modules', 'dist']
Raises:
- `ValueError`: If the github_token is not provided and
the GITHUB_TOKEN environment variable is not set.
"""
super().__init__()
if github_token is None:
github_token = os.getenv("GITHUB_TOKEN")
if github_token is None:
raise ValueError(
"Please provide a Github token. "
"You can do so by passing it as an argument or"
+ "by setting the GITHUB_TOKEN environment variable."
)
self._owner = owner
self._repo = repo
self._use_parser = use_parser
self._verbose = verbose
self._concurrent_requests = concurrent_requests
self._ignore_file_extensions = ignore_file_extensions
self._ignore_directories = ignore_directories
# Set up the event loop
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
# If there is no running loop, create a new one
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._client = GithubClient(github_token)
def _load_data_from_commit(self, commit_sha: str) -> List[Document]:
"""
Load data from a commit.
Loads github repository data from a specific commit sha.
:param `commit`: commit sha
:return: list of documents
"""
commit_response: GitCommitResponseModel = self._loop.run_until_complete(
self._client.get_commit(self._owner, self._repo, commit_sha)
)
tree_sha = commit_response.commit.tree.sha
blobs_and_paths = self._loop.run_until_complete(self._recurse_tree(tree_sha))
print_if_verbose(self._verbose, f"got {len(blobs_and_paths)} blobs")
return self._loop.run_until_complete(
self._generate_documents(blobs_and_paths=blobs_and_paths)
)
def _load_data_from_branch(self, branch: str) -> List[Document]:
"""
Load data from a branch.
Loads github repository data from a specific branch.
:param `branch`: branch name
:return: list of documents
"""
branch_data: GitBranchResponseModel = self._loop.run_until_complete(
self._client.get_branch(self._owner, self._repo, branch)
)
tree_sha = branch_data.commit.commit.tree.sha
blobs_and_paths = self._loop.run_until_complete(self._recurse_tree(tree_sha))
print_if_verbose(self._verbose, f"got {len(blobs_and_paths)} blobs")
return self._loop.run_until_complete(
self._generate_documents(blobs_and_paths=blobs_and_paths)
)
def load_data(
self,
commit_sha: Optional[str] = None,
branch: Optional[str] = None,
) -> List[Document]:
"""
Load data from a commit or a branch.
Loads github repository data from a specific commit sha or a branch.
:param `commit`: commit sha
:param `branch`: branch name
:return: list of documents
"""
if commit_sha is not None and branch is not None:
raise ValueError("You can only specify one of commit or branch.")
if commit_sha is None and branch is None:
raise ValueError("You must specify one of commit or branch.")
if commit_sha is not None:
return self._load_data_from_commit(commit_sha)
if branch is not None:
return self._load_data_from_branch(branch)
raise ValueError("You must specify one of commit or branch.")
async def _recurse_tree(
self, tree_sha: str, current_path: str = "", current_depth: int = 0
) -> Any:
"""
Recursively get all blob tree objects in a tree.
And construct their full path relative to the root of the repository.
(see GitTreeResponseModel.GitTreeObject in
github_api_client.py for more information)
:param `tree_sha`: sha of the tree to recurse
:param `current_path`: current path of the tree
:param `current_depth`: current depth of the tree
:return: list of tuples of
(tree object, file's full path realtive to the root of the repo)
"""
blobs_and_full_paths: List[Tuple[GitTreeResponseModel.GitTreeObject, str]] = []
print_if_verbose(
self._verbose, "\t" * current_depth + f"current path: {current_path}"
)
tree_data: GitTreeResponseModel = await self._client.get_tree(
self._owner, self._repo, tree_sha
)
print_if_verbose(
self._verbose, "\t" * current_depth + f"processing tree {tree_sha}"
)
for tree_obj in tree_data.tree:
file_path = os.path.join(current_path, tree_obj.path)
if tree_obj.type == "tree":
print_if_verbose(
self._verbose,
"\t" * current_depth + f"recursing into {tree_obj.path}",
)
if self._ignore_directories is not None:
if file_path in self._ignore_directories:
print_if_verbose(
self._verbose,
"\t" * current_depth
+ f"ignoring tree {tree_obj.path} due to directory",
)
continue
blobs_and_full_paths.extend(
await self._recurse_tree(tree_obj.sha, file_path, current_depth + 1)
)
elif tree_obj.type == "blob":
print_if_verbose(
self._verbose, "\t" * current_depth + f"found blob {tree_obj.path}"
)
if self._ignore_file_extensions is not None:
if get_file_extension(file_path) in self._ignore_file_extensions:
print_if_verbose(
self._verbose,
"\t" * current_depth
+ f"ignoring blob {tree_obj.path} due to file extension",
)
continue
blobs_and_full_paths.append((tree_obj, file_path))
return blobs_and_full_paths
async def _generate_documents(
self, blobs_and_paths: List[Tuple[GitTreeResponseModel.GitTreeObject, str]]
) -> List[Document]:
"""
Generate documents from a list of blobs and their full paths.
:param `blobs_and_paths`: list of tuples of
(tree object, file's full path in the repo realtive to the root of the repo)
:return: list of documents
"""
buffered_iterator = BufferedGitBlobDataIterator(
blobs_and_paths=blobs_and_paths,
github_client=self._client,
owner=self._owner,
repo=self._repo,
loop=self._loop,
buffer_size=self._concurrent_requests, # TODO: make this configurable
verbose=self._verbose,
)
documents = []
async for blob_data, full_path in buffered_iterator:
print_if_verbose(self._verbose, f"generating document for {full_path}")
assert (
blob_data.encoding == "base64"
), f"blob encoding {blob_data.encoding} not supported"
decoded_bytes = None
try:
decoded_bytes = base64.b64decode(blob_data.content)
del blob_data.content
except binascii.Error:
print_if_verbose(
self._verbose, f"could not decode {full_path} as base64"
)
continue
if self._use_parser:
document = self._parse_supported_file(
file_path=full_path,
file_content=decoded_bytes,
tree_sha=blob_data.sha,
tree_path=full_path,
)
if document is not None:
documents.append(document)
else:
continue
try:
if decoded_bytes is None:
raise ValueError("decoded_bytes is None")
decoded_text = decoded_bytes.decode("utf-8")
except UnicodeDecodeError:
print_if_verbose(
self._verbose, f"could not decode {full_path} as utf-8"
)
continue
print_if_verbose(
self._verbose,
f"got {len(decoded_text)} characters"
+ f"- adding to documents - {full_path}",
)
document = Document(
text=decoded_text,
doc_id=blob_data.sha,
extra_info={
"file_path": full_path,
"file_name": full_path.split("/")[-1],
},
)
documents.append(document)
return documents
def _parse_supported_file(
self, file_path: str, file_content: bytes, tree_sha: str, tree_path: str
) -> Optional[Document]:
"""
Parse a file if it is supported by a parser.
:param `file_path`: path of the file in the repo
:param `file_content`: content of the file
:return: Document if the file is supported by a parser, None otherwise
"""
file_extension = get_file_extension(file_path)
parser = DEFAULT_FILE_EXTRACTOR.get(file_extension)
if parser is not None:
parser.init_parser()
print_if_verbose(
self._verbose,
f"parsing {file_path}"
+ f"as {file_extension} with "
+ f"{parser.__class__.__name__}",
)
with tempfile.TemporaryDirectory() as tmpdirname:
with tempfile.NamedTemporaryFile(
dir=tmpdirname,
suffix=f".{file_extension}",
mode="w+b",
delete=False,
) as tmpfile:
print_if_verbose(
self._verbose,
"created a temporary file"
+ f"{tmpfile.name} for parsing {file_path}",
)
tmpfile.write(file_content)
tmpfile.flush()
tmpfile.close()
try:
parsed_file = parser.parse_file(pathlib.Path(tmpfile.name))
parsed_file = "\n\n".join(parsed_file)
except Exception as e:
print_if_verbose(
self._verbose, f"error while parsing {file_path}"
)
logger.error(
"Error while parsing "
+ f"{file_path} with "
+ f"{parser.__class__.__name__}:\n{e}"
)
parsed_file = None
finally:
os.remove(tmpfile.name)
if parsed_file is None:
return None
return Document(
text=parsed_file,
doc_id=tree_sha,
extra_info={
"file_path": file_path,
"file_name": tree_path,
},
)
return None
if __name__ == "__main__":
import time
def timeit(func: Callable) -> Callable:
"""Time a function."""
def wrapper(*args: Any, **kwargs: Any) -> None:
"""Callcuate time taken to run a function."""
start = time.time()
func(*args, **kwargs)
end = time.time()
print(f"Time taken: {end - start} seconds for {func.__name__}")
return wrapper
reader1 = GithubRepositoryReader(
github_token=os.environ["GITHUB_TOKEN"],
owner="jerryjliu",
repo="gpt_index",
use_parser=False,
verbose=True,
ignore_directories=["examples"],
)
@timeit
def load_data_from_commit() -> None:
"""Load data from a commit."""
documents = reader1.load_data(
commit_sha="22e198b3b166b5facd2843d6a62ac0db07894a13"
)
for document in documents:
print(document.extra_info)
@timeit
def load_data_from_branch() -> None:
"""Load data from a branch."""
documents = reader1.load_data(branch="main")
for document in documents:
print(document.extra_info)
input("Press enter to load github repository from branch name...")
load_data_from_branch()
input("Press enter to load github repository from commit sha...")
load_data_from_commit()