Spaces:
Runtime error
Runtime error
""" | |
Github readers utils. | |
This module contains utility functions for the Github readers. | |
""" | |
import asyncio | |
import os | |
import time | |
from abc import ABC, abstractmethod | |
from typing import List, Tuple | |
from gpt_index.readers.github_readers.github_api_client import ( | |
GitBlobResponseModel, | |
GithubClient, | |
GitTreeResponseModel, | |
) | |
def print_if_verbose(verbose: bool, message: str) -> None: | |
"""Log message if verbose is True.""" | |
if verbose: | |
print(message) | |
def get_file_extension(filename: str) -> str: | |
"""Get file extension.""" | |
return f".{os.path.splitext(filename)[1][1:].lower()}" | |
class BufferedAsyncIterator(ABC): | |
""" | |
Base class for buffered async iterators. | |
This class is to be used as a base class for async iterators | |
that need to buffer the results of an async operation. | |
The async operation is defined in the _fill_buffer method. | |
The _fill_buffer method is called when the buffer is empty. | |
""" | |
def __init__(self, buffer_size: int): | |
""" | |
Initialize params. | |
Args: | |
- `buffer_size (int)`: Size of the buffer. | |
It is also the number of items that will | |
be retrieved from the async operation at once. | |
see _fill_buffer. Defaults to 2. Setting it to 1 | |
will result in the same behavior as a synchronous iterator. | |
""" | |
self._buffer_size = buffer_size | |
self._buffer: List[Tuple[GitBlobResponseModel, str]] = [] | |
self._index = 0 | |
async def _fill_buffer(self) -> None: | |
raise NotImplementedError | |
def __aiter__(self) -> "BufferedAsyncIterator": | |
"""Return the iterator object.""" | |
return self | |
async def __anext__(self) -> Tuple[GitBlobResponseModel, str]: | |
""" | |
Get next item. | |
Returns: | |
- `item (Tuple[GitBlobResponseModel, str])`: Next item. | |
Raises: | |
- `StopAsyncIteration`: If there are no more items. | |
""" | |
if not self._buffer: | |
await self._fill_buffer() | |
if not self._buffer: | |
raise StopAsyncIteration | |
item = self._buffer.pop(0) | |
self._index += 1 | |
return item | |
class BufferedGitBlobDataIterator(BufferedAsyncIterator): | |
""" | |
Buffered async iterator for Git blobs. | |
This class is an async iterator that buffers the results of the get_blob operation. | |
It is used to retrieve the contents of the files in a Github repository. | |
getBlob endpoint supports up to 100 megabytes of content for blobs. | |
This concrete implementation of BufferedAsyncIterator allows you to lazily retrieve | |
the contents of the files in a Github repository. | |
Otherwise you would have to retrieve all the contents of | |
the files in the repository at once, which would | |
be problematic if the repository is large. | |
""" | |
def __init__( | |
self, | |
blobs_and_paths: List[Tuple[GitTreeResponseModel.GitTreeObject, str]], | |
github_client: GithubClient, | |
owner: str, | |
repo: str, | |
loop: asyncio.AbstractEventLoop, | |
buffer_size: int, | |
verbose: bool = False, | |
): | |
""" | |
Initialize params. | |
Args: | |
- blobs_and_paths (List[Tuple[GitTreeResponseModel.GitTreeObject, str]]): | |
List of tuples containing the blob and the path of the file. | |
- github_client (GithubClient): Github client. | |
- owner (str): Owner of the repository. | |
- repo (str): Name of the repository. | |
- loop (asyncio.AbstractEventLoop): Event loop. | |
- buffer_size (int): Size of the buffer. | |
""" | |
super().__init__(buffer_size) | |
self._blobs_and_paths = blobs_and_paths | |
self._github_client = github_client | |
self._owner = owner | |
self._repo = repo | |
self._verbose = verbose | |
if loop is None: | |
loop = asyncio.get_event_loop() | |
if loop is None: | |
raise ValueError("No event loop found") | |
async def _fill_buffer(self) -> None: | |
""" | |
Fill the buffer with the results of the get_blob operation. | |
The get_blob operation is called for each blob in the blobs_and_paths list. | |
The blobs are retrieved in batches of size buffer_size. | |
""" | |
del self._buffer[:] | |
self._buffer = [] | |
start = self._index | |
end = min(start + self._buffer_size, len(self._blobs_and_paths)) | |
if start >= end: | |
return | |
if self._verbose: | |
start_t = time.time() | |
results: List[GitBlobResponseModel] = await asyncio.gather( | |
*[ | |
self._github_client.get_blob(self._owner, self._repo, blob.sha) | |
for blob, _ in self._blobs_and_paths[ | |
start:end | |
] # TODO: use batch_size instead of buffer_size for concurrent requests | |
] | |
) | |
if self._verbose: | |
end_t = time.time() | |
blob_names_and_sizes = [ | |
(blob.path, blob.size) for blob, _ in self._blobs_and_paths[start:end] | |
] | |
print( | |
"Time to get blobs (" | |
+ f"{blob_names_and_sizes}" | |
+ f"): {end_t - start_t:.2f} seconds" | |
) | |
self._buffer = [ | |
(result, path) | |
for result, (_, path) in zip(results, self._blobs_and_paths[start:end]) | |
] | |