Spaces:
Runtime error
Runtime error
File size: 5,471 Bytes
35b22df |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
"""
Github readers utils.
This module contains utility functions for the Github readers.
"""
import asyncio
import os
import time
from abc import ABC, abstractmethod
from typing import List, Tuple
from gpt_index.readers.github_readers.github_api_client import (
GitBlobResponseModel,
GithubClient,
GitTreeResponseModel,
)
def print_if_verbose(verbose: bool, message: str) -> None:
"""Log message if verbose is True."""
if verbose:
print(message)
def get_file_extension(filename: str) -> str:
"""Get file extension."""
return f".{os.path.splitext(filename)[1][1:].lower()}"
class BufferedAsyncIterator(ABC):
"""
Base class for buffered async iterators.
This class is to be used as a base class for async iterators
that need to buffer the results of an async operation.
The async operation is defined in the _fill_buffer method.
The _fill_buffer method is called when the buffer is empty.
"""
def __init__(self, buffer_size: int):
"""
Initialize params.
Args:
- `buffer_size (int)`: Size of the buffer.
It is also the number of items that will
be retrieved from the async operation at once.
see _fill_buffer. Defaults to 2. Setting it to 1
will result in the same behavior as a synchronous iterator.
"""
self._buffer_size = buffer_size
self._buffer: List[Tuple[GitBlobResponseModel, str]] = []
self._index = 0
@abstractmethod
async def _fill_buffer(self) -> None:
raise NotImplementedError
def __aiter__(self) -> "BufferedAsyncIterator":
"""Return the iterator object."""
return self
async def __anext__(self) -> Tuple[GitBlobResponseModel, str]:
"""
Get next item.
Returns:
- `item (Tuple[GitBlobResponseModel, str])`: Next item.
Raises:
- `StopAsyncIteration`: If there are no more items.
"""
if not self._buffer:
await self._fill_buffer()
if not self._buffer:
raise StopAsyncIteration
item = self._buffer.pop(0)
self._index += 1
return item
class BufferedGitBlobDataIterator(BufferedAsyncIterator):
"""
Buffered async iterator for Git blobs.
This class is an async iterator that buffers the results of the get_blob operation.
It is used to retrieve the contents of the files in a Github repository.
getBlob endpoint supports up to 100 megabytes of content for blobs.
This concrete implementation of BufferedAsyncIterator allows you to lazily retrieve
the contents of the files in a Github repository.
Otherwise you would have to retrieve all the contents of
the files in the repository at once, which would
be problematic if the repository is large.
"""
def __init__(
self,
blobs_and_paths: List[Tuple[GitTreeResponseModel.GitTreeObject, str]],
github_client: GithubClient,
owner: str,
repo: str,
loop: asyncio.AbstractEventLoop,
buffer_size: int,
verbose: bool = False,
):
"""
Initialize params.
Args:
- blobs_and_paths (List[Tuple[GitTreeResponseModel.GitTreeObject, str]]):
List of tuples containing the blob and the path of the file.
- github_client (GithubClient): Github client.
- owner (str): Owner of the repository.
- repo (str): Name of the repository.
- loop (asyncio.AbstractEventLoop): Event loop.
- buffer_size (int): Size of the buffer.
"""
super().__init__(buffer_size)
self._blobs_and_paths = blobs_and_paths
self._github_client = github_client
self._owner = owner
self._repo = repo
self._verbose = verbose
if loop is None:
loop = asyncio.get_event_loop()
if loop is None:
raise ValueError("No event loop found")
async def _fill_buffer(self) -> None:
"""
Fill the buffer with the results of the get_blob operation.
The get_blob operation is called for each blob in the blobs_and_paths list.
The blobs are retrieved in batches of size buffer_size.
"""
del self._buffer[:]
self._buffer = []
start = self._index
end = min(start + self._buffer_size, len(self._blobs_and_paths))
if start >= end:
return
if self._verbose:
start_t = time.time()
results: List[GitBlobResponseModel] = await asyncio.gather(
*[
self._github_client.get_blob(self._owner, self._repo, blob.sha)
for blob, _ in self._blobs_and_paths[
start:end
] # TODO: use batch_size instead of buffer_size for concurrent requests
]
)
if self._verbose:
end_t = time.time()
blob_names_and_sizes = [
(blob.path, blob.size) for blob, _ in self._blobs_and_paths[start:end]
]
print(
"Time to get blobs ("
+ f"{blob_names_and_sizes}"
+ f"): {end_t - start_t:.2f} seconds"
)
self._buffer = [
(result, path)
for result, (_, path) in zip(results, self._blobs_and_paths[start:end])
]
|