Upload fusion.py with huggingface_hub
Browse files
fusion.py
CHANGED
|
@@ -1,19 +1,15 @@
|
|
| 1 |
import copy
|
| 2 |
from abc import abstractmethod
|
| 3 |
-
from dataclasses import asdict
|
| 4 |
from typing import Generator, List, Optional
|
| 5 |
|
| 6 |
-
from .card import ICLCard, TaskCard
|
| 7 |
-
from .common import CommonRecipe
|
| 8 |
from .dataclass import NonPositionalField
|
| 9 |
from .operator import SourceOperator, StreamSource
|
| 10 |
-
from .random_utils import
|
| 11 |
from .stream import MultiStream, Stream
|
| 12 |
|
| 13 |
|
| 14 |
class BaseFusion(SourceOperator):
|
| 15 |
-
"""
|
| 16 |
-
BaseFusion operator that combines multiple streams into one.
|
| 17 |
|
| 18 |
Args:
|
| 19 |
include_splits: List of splits to include. If None, all splits are included.
|
|
@@ -45,8 +41,7 @@ class BaseFusion(SourceOperator):
|
|
| 45 |
|
| 46 |
|
| 47 |
class FixedFusion(BaseFusion):
|
| 48 |
-
"""
|
| 49 |
-
FixedFusion operator that combines multiple streams into one based on a fixed number of examples per task.
|
| 50 |
|
| 51 |
Args:
|
| 52 |
orgins: List of StreamSource objects.
|
|
@@ -70,8 +65,7 @@ class FixedFusion(BaseFusion):
|
|
| 70 |
|
| 71 |
|
| 72 |
class WeightedFusion(BaseFusion):
|
| 73 |
-
"""
|
| 74 |
-
Fusion operator that combines multiple streams based
|
| 75 |
|
| 76 |
Args:
|
| 77 |
orgins: List of StreamSource objects.
|
|
@@ -87,14 +81,18 @@ class WeightedFusion(BaseFusion):
|
|
| 87 |
super().verify()
|
| 88 |
assert self.origins is not None, "origins must be specified"
|
| 89 |
assert self.weights is not None, "weights must be specified"
|
| 90 |
-
assert len(self.origins) == len(
|
|
|
|
|
|
|
| 91 |
|
| 92 |
def fusion_generator(self, split) -> Generator:
|
| 93 |
weights = copy.deepcopy(self.weights)
|
| 94 |
iterators = [iter(origin()[split]) for origin in self.origins]
|
| 95 |
total_examples = 0
|
| 96 |
-
while (
|
| 97 |
-
|
|
|
|
|
|
|
| 98 |
try:
|
| 99 |
yield next(iterator)
|
| 100 |
total_examples += 1
|
|
|
|
| 1 |
import copy
|
| 2 |
from abc import abstractmethod
|
|
|
|
| 3 |
from typing import Generator, List, Optional
|
| 4 |
|
|
|
|
|
|
|
| 5 |
from .dataclass import NonPositionalField
|
| 6 |
from .operator import SourceOperator, StreamSource
|
| 7 |
+
from .random_utils import get_random
|
| 8 |
from .stream import MultiStream, Stream
|
| 9 |
|
| 10 |
|
| 11 |
class BaseFusion(SourceOperator):
|
| 12 |
+
"""BaseFusion operator that combines multiple streams into one.
|
|
|
|
| 13 |
|
| 14 |
Args:
|
| 15 |
include_splits: List of splits to include. If None, all splits are included.
|
|
|
|
| 41 |
|
| 42 |
|
| 43 |
class FixedFusion(BaseFusion):
|
| 44 |
+
"""FixedFusion operator that combines multiple streams into one based on a fixed number of examples per task.
|
|
|
|
| 45 |
|
| 46 |
Args:
|
| 47 |
orgins: List of StreamSource objects.
|
|
|
|
| 65 |
|
| 66 |
|
| 67 |
class WeightedFusion(BaseFusion):
|
| 68 |
+
"""Fusion operator that combines multiple streams based.
|
|
|
|
| 69 |
|
| 70 |
Args:
|
| 71 |
orgins: List of StreamSource objects.
|
|
|
|
| 81 |
super().verify()
|
| 82 |
assert self.origins is not None, "origins must be specified"
|
| 83 |
assert self.weights is not None, "weights must be specified"
|
| 84 |
+
assert len(self.origins) == len(
|
| 85 |
+
self.weights
|
| 86 |
+
), "origins and weights must have the same length"
|
| 87 |
|
| 88 |
def fusion_generator(self, split) -> Generator:
|
| 89 |
weights = copy.deepcopy(self.weights)
|
| 90 |
iterators = [iter(origin()[split]) for origin in self.origins]
|
| 91 |
total_examples = 0
|
| 92 |
+
while (
|
| 93 |
+
self.max_total_examples is None or total_examples <= self.max_total_examples
|
| 94 |
+
) and len(iterators) > 0:
|
| 95 |
+
iterator = get_random().choices(population=iterators, weights=weights)[0]
|
| 96 |
try:
|
| 97 |
yield next(iterator)
|
| 98 |
total_examples += 1
|