Upload loaders.py with huggingface_hub
Browse files- loaders.py +33 -14
loaders.py
CHANGED
|
@@ -60,7 +60,7 @@ class Loader(SourceOperator):
|
|
| 60 |
# loader may ignore this. In any case, the recipe will limit the number of instances in the returned
|
| 61 |
# stream, after load is complete.
|
| 62 |
loader_limit: int = None
|
| 63 |
-
|
| 64 |
|
| 65 |
|
| 66 |
class LoadHF(Loader):
|
|
@@ -71,21 +71,27 @@ class LoadHF(Loader):
|
|
| 71 |
data_files: Optional[
|
| 72 |
Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]
|
| 73 |
] = None
|
| 74 |
-
streaming: bool =
|
| 75 |
|
| 76 |
def process(self):
|
| 77 |
try:
|
| 78 |
with tempfile.TemporaryDirectory() as dir_to_be_deleted:
|
| 79 |
-
|
| 80 |
-
|
| 81 |
-
|
| 82 |
-
|
| 83 |
-
|
| 84 |
-
|
| 85 |
-
|
| 86 |
-
|
| 87 |
-
|
| 88 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 89 |
if self.split is not None:
|
| 90 |
dataset = {self.split: dataset}
|
| 91 |
except (
|
|
@@ -122,15 +128,23 @@ class LoadCSV(Loader):
|
|
| 122 |
files: Dict[str, str]
|
| 123 |
chunksize: int = 1000
|
| 124 |
|
| 125 |
-
def
|
| 126 |
for chunk in pd.read_csv(file, chunksize=self.chunksize):
|
| 127 |
for _index, row in chunk.iterrows():
|
| 128 |
yield row.to_dict()
|
| 129 |
|
| 130 |
def process(self):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 131 |
return MultiStream(
|
| 132 |
{
|
| 133 |
-
name:
|
| 134 |
for name, file in self.files.items()
|
| 135 |
}
|
| 136 |
)
|
|
@@ -155,6 +169,9 @@ class LoadFromKaggle(Loader):
|
|
| 155 |
"Please obtain kaggle credentials https://christianjmills.com/posts/kaggle-obtain-api-key-tutorial/ and save them to local ./kaggle.json file"
|
| 156 |
)
|
| 157 |
|
|
|
|
|
|
|
|
|
|
| 158 |
def prepare(self):
|
| 159 |
super().prepare()
|
| 160 |
from opendatasets import download
|
|
@@ -246,6 +263,8 @@ class LoadFromIBMCloud(Loader):
|
|
| 246 |
assert (
|
| 247 |
self.aws_secret_access_key is not None
|
| 248 |
), f"Please set {self.aws_secret_access_key_env} environmental variable"
|
|
|
|
|
|
|
| 249 |
|
| 250 |
def process(self):
|
| 251 |
cos = ibm_boto3.resource(
|
|
|
|
| 60 |
# loader may ignore this. In any case, the recipe will limit the number of instances in the returned
|
| 61 |
# stream, after load is complete.
|
| 62 |
loader_limit: int = None
|
| 63 |
+
streaming: bool = False
|
| 64 |
|
| 65 |
|
| 66 |
class LoadHF(Loader):
|
|
|
|
| 71 |
data_files: Optional[
|
| 72 |
Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]
|
| 73 |
] = None
|
| 74 |
+
streaming: bool = False
|
| 75 |
|
| 76 |
def process(self):
|
| 77 |
try:
|
| 78 |
with tempfile.TemporaryDirectory() as dir_to_be_deleted:
|
| 79 |
+
try:
|
| 80 |
+
dataset = hf_load_dataset(
|
| 81 |
+
self.path,
|
| 82 |
+
name=self.name,
|
| 83 |
+
data_dir=self.data_dir,
|
| 84 |
+
data_files=self.data_files,
|
| 85 |
+
streaming=self.streaming,
|
| 86 |
+
cache_dir=None if self.streaming else dir_to_be_deleted,
|
| 87 |
+
split=self.split,
|
| 88 |
+
trust_remote_code=settings.allow_unverified_code,
|
| 89 |
+
)
|
| 90 |
+
except ValueError as e:
|
| 91 |
+
if "trust_remote_code" in str(e):
|
| 92 |
+
raise ValueError(
|
| 93 |
+
f"{self.__class__.__name__} cannot run remote code from huggingface without setting unitxt.settings.allow_unverified_code=True or by setting environment vairable: UNITXT_ALLOW_UNVERIFIED_CODE."
|
| 94 |
+
) from e
|
| 95 |
if self.split is not None:
|
| 96 |
dataset = {self.split: dataset}
|
| 97 |
except (
|
|
|
|
| 128 |
files: Dict[str, str]
|
| 129 |
chunksize: int = 1000
|
| 130 |
|
| 131 |
+
def stream_csv(self, file):
|
| 132 |
for chunk in pd.read_csv(file, chunksize=self.chunksize):
|
| 133 |
for _index, row in chunk.iterrows():
|
| 134 |
yield row.to_dict()
|
| 135 |
|
| 136 |
def process(self):
|
| 137 |
+
if self.streaming:
|
| 138 |
+
return MultiStream(
|
| 139 |
+
{
|
| 140 |
+
name: Stream(generator=self.stream_csv, gen_kwargs={"file": file})
|
| 141 |
+
for name, file in self.files.items()
|
| 142 |
+
}
|
| 143 |
+
)
|
| 144 |
+
|
| 145 |
return MultiStream(
|
| 146 |
{
|
| 147 |
+
name: pd.read_csv(file).to_dict("records")
|
| 148 |
for name, file in self.files.items()
|
| 149 |
}
|
| 150 |
)
|
|
|
|
| 169 |
"Please obtain kaggle credentials https://christianjmills.com/posts/kaggle-obtain-api-key-tutorial/ and save them to local ./kaggle.json file"
|
| 170 |
)
|
| 171 |
|
| 172 |
+
if self.streaming:
|
| 173 |
+
raise NotImplementedError("LoadFromKaggle cannot load with streaming.")
|
| 174 |
+
|
| 175 |
def prepare(self):
|
| 176 |
super().prepare()
|
| 177 |
from opendatasets import download
|
|
|
|
| 263 |
assert (
|
| 264 |
self.aws_secret_access_key is not None
|
| 265 |
), f"Please set {self.aws_secret_access_key_env} environmental variable"
|
| 266 |
+
if self.streaming:
|
| 267 |
+
raise NotImplementedError("LoadFromKaggle cannot load with streaming.")
|
| 268 |
|
| 269 |
def process(self):
|
| 270 |
cos = ibm_boto3.resource(
|