Spaces:
Running
Running
# Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
"""TODO: Add a description here.""" | |
import concurrent.futures | |
import math | |
import statistics | |
from typing import List, Optional, Union | |
import datasets | |
import evaluate | |
import numpy as np | |
# TODO: Add BibTeX citation | |
_CITATION = """\ | |
@InProceedings{huggingface:module, | |
title = {A great new module}, | |
authors={huggingface, Inc.}, | |
year={2020} | |
} | |
""" | |
# TODO: Add description of the module here | |
_DESCRIPTION = """\ | |
This new module is designed to solve this great ML task and is crafted with a lot of care. | |
""" | |
# TODO: Add description of the arguments of the module here | |
_KWARGS_DESCRIPTION = """ | |
Calculates how good are predictions given some references, using certain scores | |
Args: | |
predictions: list of generated time series. | |
shape: (num_generation, num_timesteps, num_features) | |
references: list of reference | |
shape: (num_reference, num_timesteps, num_features) | |
Returns: | |
Examples: | |
Examples should be written in doctest format, and should illustrate how | |
to use the function. | |
>>> my_new_module = evaluate.load("bowdbeg/matching_series") | |
>>> results = my_new_module.compute(references=[[[0.0, 1.0]]], predictions=[[[0.0, 1.0]]]) | |
>>> print(results) | |
{'matchin': 1.0} | |
""" | |
class matching_series(evaluate.Metric): | |
"""TODO: Short description of my evaluation module.""" | |
def _info(self): | |
# TODO: Specifies the evaluate.EvaluationModuleInfo object | |
return evaluate.MetricInfo( | |
# This is the description that will appear on the modules page. | |
module_type="metric", | |
description=_DESCRIPTION, | |
citation=_CITATION, | |
inputs_description=_KWARGS_DESCRIPTION, | |
# This defines the format of each prediction and reference | |
features=datasets.Features( | |
{ | |
"predictions": datasets.Sequence(datasets.Sequence(datasets.Value("float"))), | |
"references": datasets.Sequence(datasets.Sequence(datasets.Value("float"))), | |
} | |
), | |
# Homepage of the module for documentation | |
homepage="https://huggingface.co/spaces/bowdbeg/matching_series", | |
# Additional links to the codebase or references | |
codebase_urls=["http://github.com/path/to/codebase/of/new_module"], | |
reference_urls=["http://path.to.reference.url/new_module"], | |
) | |
def _download_and_prepare(self, dl_manager): | |
"""Optional: download external resources useful to compute the scores""" | |
pass | |
def compute(self, *, predictions=None, references=None, **kwargs) -> Optional[dict]: | |
"""Compute the evaluation module. | |
Usage of positional arguments is not allowed to prevent mistakes. | |
Args: | |
predictions (`list/array/tensor`, *optional*): | |
Predictions. | |
references (`list/array/tensor`, *optional*): | |
References. | |
**kwargs (optional): | |
Keyword arguments that will be forwarded to the evaluation module [`~evaluate.EvaluationModule.compute`] | |
method (see details in the docstring). | |
Return: | |
`dict` or `None` | |
- Dictionary with the results if this evaluation module is run on the main process (`process_id == 0`). | |
- `None` if the evaluation module is not run on the main process (`process_id != 0`). | |
```py | |
>>> import evaluate | |
>>> accuracy = evaluate.load("accuracy") | |
>>> accuracy.compute(predictions=[0, 1, 1, 0], references=[0, 1, 0, 1]) | |
``` | |
""" | |
all_kwargs = {"predictions": predictions, "references": references, **kwargs} | |
if predictions is None and references is None: | |
missing_kwargs = {k: None for k in self._feature_names() if k not in all_kwargs} | |
all_kwargs.update(missing_kwargs) | |
else: | |
missing_inputs = [k for k in self._feature_names() if k not in all_kwargs] | |
if missing_inputs: | |
raise ValueError( | |
f"Evaluation module inputs are missing: {missing_inputs}. All required inputs are {list(self._feature_names())}" | |
) | |
inputs = {input_name: all_kwargs[input_name] for input_name in self._feature_names()} | |
compute_kwargs = {k: kwargs[k] for k in kwargs if k not in self._feature_names()} | |
return self._compute(**inputs, **compute_kwargs) | |
def _compute( | |
self, | |
predictions: Union[List, np.ndarray], | |
references: Union[List, np.ndarray], | |
batch_size: Optional[int] = None, | |
cuc_n_calculation: int = 3, | |
cuc_n_samples: Union[List[int], str] = "auto", | |
metric: str = "mse", | |
num_process: int = 1, | |
return_distance: bool = False, | |
return_matching: bool = False, | |
return_each_features: bool = False, | |
return_coverages: bool = False, | |
return_all: bool = False, | |
dtype=np.float32, | |
instance_normalization: bool = False, | |
eps: float = 1e-10, | |
): | |
""" | |
Compute the scores of the module given the predictions and references | |
Args: | |
predictions: list of generated time series. | |
shape: (num_generation, num_timesteps, num_features) | |
references: list of reference | |
shape: (num_reference, num_timesteps, num_features) | |
batch_size: batch size to use for the computation. If None, the whole dataset is processed at once. | |
cuc_n_calculation: number of Coverage Under Curve calculate times | |
cuc_n_samples: number of samples to use for Coverage Under Curve calculation. If "auto", it uses the number of samples of the predictions. | |
Returns: | |
""" | |
if return_all: | |
return_distance = True | |
return_matching = True | |
return_each_features = True | |
return_coverages = True | |
predictions = np.array(predictions).astype(dtype) | |
references = np.array(references).astype(dtype) | |
if instance_normalization: | |
predictions = (predictions - predictions.mean(axis=1, keepdims=True)) / predictions.std( | |
axis=1, keepdims=True | |
) | |
references = (references - references.mean(axis=1, keepdims=True)) / references.std(axis=1, keepdims=True) | |
assert isinstance(predictions, np.ndarray) and isinstance(references, np.ndarray) | |
if predictions.shape[1:] != references.shape[1:]: | |
raise ValueError( | |
"The number of features in the predictions and references should be the same. predictions: {}, references: {}".format( | |
predictions.shape[1:], references.shape[1:] | |
) | |
) | |
# at first, convert the inputs to numpy arrays | |
distance = self.compute_distance( | |
predictions=predictions, | |
references=references, | |
metric=metric, | |
batch_size=batch_size, | |
num_process=num_process, | |
dtype=dtype, | |
) | |
metrics = self._compute_metrics( | |
distance=distance.mean(axis=-1), | |
eps=eps, | |
cuc_n_calculation=cuc_n_calculation, | |
cuc_n_samples=cuc_n_samples, | |
) | |
metrics_feature = [ | |
self._compute_metrics(distance[:, :, f], eps, cuc_n_calculation, cuc_n_samples) | |
for f in range(predictions.shape[-1]) | |
] | |
macro_metrics = { | |
"macro_" + k: statistics.mean([m[k] for m in metrics_feature]) # type: ignore | |
for k in metrics_feature[0].keys() | |
if isinstance(metrics_feature[0][k], (int, float)) | |
} | |
out = {} | |
out.update({k: v for k, v in metrics.items() if isinstance(v, (int, float))}) | |
out.update(macro_metrics) | |
if return_distance: | |
out["distance"] = distance | |
if return_matching: | |
out.update({k: v for k, v in metrics.items() if "match" in k}) | |
if return_coverages: | |
out["coverages"] = metrics["coverages"] | |
if return_each_features: | |
out.update( | |
{ | |
k + "_features": [m[k] for m in metrics_feature] | |
for k in metrics_feature[0].keys() | |
if isinstance(metrics_feature[0][k], (int, float)) | |
} | |
) | |
if return_coverages: | |
out.update( | |
{ | |
"coverages_features": [m["coverages"] for m in metrics_feature], | |
} | |
) | |
return out | |
def compute_cuc( | |
self, | |
match: np.ndarray, | |
n_reference: int, | |
n_calculation: int, | |
n_samples: Union[List[int], str], | |
): | |
""" | |
Compute Coverage Under Curve | |
Args: | |
match: best match for each generated time series | |
n_reference: number of reference time series | |
n_calculation: number of Coverage Under Curve calculate times | |
n_samples: number of samples to use for Coverage Under Curve calculation. If "auto", it uses the number of samples of the predictions. | |
Returns: | |
""" | |
n_generaiton = len(match) | |
if n_samples == "auto": | |
exp = int(math.log2(n_generaiton)) | |
n_samples = [int(2**i) for i in range(exp)] | |
n_samples.append(n_generaiton) | |
assert isinstance(n_samples, list) and all(isinstance(n, int) for n in n_samples) | |
coverages = [] | |
for n_sample in n_samples: | |
coverage = 0 | |
for _ in range(n_calculation): | |
sample = np.random.choice(match, size=n_sample, replace=False) # type: ignore | |
coverage += len(np.unique(sample)) / n_reference | |
coverages.append(coverage / n_calculation) | |
cuc = (np.trapz(coverages, n_samples) / len(n_samples) / max(n_samples)).item() | |
return coverages, cuc | |
def _compute_distance(x, y, metric: str = "mse", axis: int = -1): | |
if metric.lower() == "mse": | |
return np.mean((x - y) ** 2, axis=axis) | |
elif metric.lower() == "mae": | |
return np.mean(np.abs(x - y), axis=axis) | |
elif metric.lower() == "rmse": | |
return np.sqrt(np.mean((x - y) ** 2, axis=axis)) | |
else: | |
raise ValueError("Unknown metric: {}".format(metric)) | |
def compute_distance( | |
self, | |
predictions: np.ndarray, | |
references: np.ndarray, | |
metric: str, | |
batch_size: Optional[int] = None, | |
num_process: int = 1, | |
dtype=np.float32, | |
): | |
# distance between predictions and references for all example combinations for each features | |
# shape: (num_generation, num_reference, num_features) | |
if batch_size is not None: | |
if num_process > 1: | |
distance = np.zeros((len(predictions), len(references), predictions.shape[-1]), dtype=dtype) | |
idxs = [ | |
(i, j) | |
for i in range(0, len(predictions) + batch_size, batch_size) | |
for j in range(0, len(references) + batch_size, batch_size) | |
] | |
args = [ | |
(predictions[i : i + batch_size, None], references[None, j : j + batch_size], metric, -2) | |
for i, j in idxs | |
] | |
with concurrent.futures.ProcessPoolExecutor(max_workers=num_process) as executor: | |
results = executor.map( | |
self._compute_distance, | |
*zip(*args), | |
) | |
for (i, j), d in zip(idxs, results): | |
distance[i : i + batch_size, j : j + batch_size] = d | |
else: | |
distance = np.zeros((len(predictions), len(references), predictions.shape[-1]), dtype=dtype) | |
# iterate over the predictions and references in batches | |
for i in range(0, len(predictions) + batch_size, batch_size): | |
for j in range(0, len(references) + batch_size, batch_size): | |
d = self._compute_distance( | |
predictions[i : i + batch_size, None], | |
references[None, j : j + batch_size], | |
metric=metric, | |
axis=-2, | |
) | |
distance[i : i + batch_size, j : j + batch_size] = d | |
else: | |
distance = self._compute_distance(predictions[:, None], references[None, :], metric=metric, axis=-2) | |
return distance | |
def _compute_metrics( | |
self, | |
distance: np.ndarray, | |
eps: float = 1e-10, | |
cuc_n_calculation: int = 3, | |
cuc_n_samples: Union[List[int], str] = "auto", | |
) -> dict[str, float | list[float]]: | |
""" | |
Compute metrics from the distance matrix | |
Args: | |
distance: distance matrix. shape: (num_generation, num_reference) | |
Returns: | |
""" | |
index_distance = distance.diagonal(axis1=0, axis2=1).mean().item() | |
# matching scores | |
# best match for each generated time series | |
# shape: (num_generation,) | |
best_match = np.argmin(distance, axis=-1) | |
precision_distance = distance[np.arange(len(best_match)), best_match].mean().item() | |
# best match for each reference time series | |
# shape: (num_reference,) | |
best_match_inv = np.argmin(distance, axis=0) | |
recall_distance = distance[best_match_inv, np.arange(len(best_match_inv))].mean().item() | |
f1_distance = 2 / (1 / (precision_distance + eps) + 1 / (recall_distance + eps)) | |
mean_distance = (precision_distance + recall_distance) / 2 | |
# matching precision, recall and f1 | |
matching_recall = np.unique(best_match).size / len(best_match_inv) | |
matching_precision = np.unique(best_match_inv).size / len(best_match) | |
matching_f1 = 2 / (1 / (matching_precision + eps) + 1 / (matching_recall + eps)) | |
# cuc | |
coverages, cuc = self.compute_cuc(best_match, len(best_match_inv), cuc_n_calculation, cuc_n_samples) | |
return { | |
"precision_distance": precision_distance, | |
"f1_distance": f1_distance, | |
"recall_distance": recall_distance, | |
"mean_distance": mean_distance, | |
"index_distance": index_distance, | |
"matching_precision": matching_precision, | |
"matching_recall": matching_recall, | |
"matching_f1": matching_f1, | |
"cuc": cuc, | |
"coverages": coverages, | |
"match": best_match, | |
"match_inv": best_match_inv, | |
} | |