Spaces:
Runtime error
Runtime error
| # copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| from rapidfuzz.distance import Levenshtein | |
| from difflib import SequenceMatcher | |
| import numpy as np | |
| import string | |
| class RecMetric(object): | |
| def __init__(self, | |
| main_indicator='acc', | |
| is_filter=False, | |
| ignore_space=True, | |
| **kwargs): | |
| self.main_indicator = main_indicator | |
| self.is_filter = is_filter | |
| self.ignore_space = ignore_space | |
| self.eps = 1e-5 | |
| self.reset() | |
| def _normalize_text(self, text): | |
| text = ''.join( | |
| filter(lambda x: x in (string.digits + string.ascii_letters), text)) | |
| return text.lower() | |
| def __call__(self, pred_label, *args, **kwargs): | |
| preds, labels = pred_label | |
| correct_num = 0 | |
| all_num = 0 | |
| norm_edit_dis = 0.0 | |
| for (pred, pred_conf), (target, _) in zip(preds, labels): | |
| if self.ignore_space: | |
| pred = pred.replace(" ", "") | |
| target = target.replace(" ", "") | |
| if self.is_filter: | |
| pred = self._normalize_text(pred) | |
| target = self._normalize_text(target) | |
| norm_edit_dis += Levenshtein.normalized_distance(pred, target) | |
| if pred == target: | |
| correct_num += 1 | |
| all_num += 1 | |
| self.correct_num += correct_num | |
| self.all_num += all_num | |
| self.norm_edit_dis += norm_edit_dis | |
| return { | |
| 'acc': correct_num / (all_num + self.eps), | |
| 'norm_edit_dis': 1 - norm_edit_dis / (all_num + self.eps) | |
| } | |
| def get_metric(self): | |
| """ | |
| return metrics { | |
| 'acc': 0, | |
| 'norm_edit_dis': 0, | |
| } | |
| """ | |
| acc = 1.0 * self.correct_num / (self.all_num + self.eps) | |
| norm_edit_dis = 1 - self.norm_edit_dis / (self.all_num + self.eps) | |
| self.reset() | |
| return {'acc': acc, 'norm_edit_dis': norm_edit_dis} | |
| def reset(self): | |
| self.correct_num = 0 | |
| self.all_num = 0 | |
| self.norm_edit_dis = 0 | |
| class CNTMetric(object): | |
| def __init__(self, main_indicator='acc', **kwargs): | |
| self.main_indicator = main_indicator | |
| self.eps = 1e-5 | |
| self.reset() | |
| def __call__(self, pred_label, *args, **kwargs): | |
| preds, labels = pred_label | |
| correct_num = 0 | |
| all_num = 0 | |
| for pred, target in zip(preds, labels): | |
| if pred == target: | |
| correct_num += 1 | |
| all_num += 1 | |
| self.correct_num += correct_num | |
| self.all_num += all_num | |
| return {'acc': correct_num / (all_num + self.eps), } | |
| def get_metric(self): | |
| """ | |
| return metrics { | |
| 'acc': 0, | |
| } | |
| """ | |
| acc = 1.0 * self.correct_num / (self.all_num + self.eps) | |
| self.reset() | |
| return {'acc': acc} | |
| def reset(self): | |
| self.correct_num = 0 | |
| self.all_num = 0 | |
| class CANMetric(object): | |
| def __init__(self, main_indicator='exp_rate', **kwargs): | |
| self.main_indicator = main_indicator | |
| self.word_right = [] | |
| self.exp_right = [] | |
| self.word_total_length = 0 | |
| self.exp_total_num = 0 | |
| self.word_rate = 0 | |
| self.exp_rate = 0 | |
| self.reset() | |
| self.epoch_reset() | |
| def __call__(self, preds, batch, **kwargs): | |
| for k, v in kwargs.items(): | |
| epoch_reset = v | |
| if epoch_reset: | |
| self.epoch_reset() | |
| word_probs = preds | |
| word_label, word_label_mask = batch | |
| line_right = 0 | |
| if word_probs is not None: | |
| word_pred = word_probs.argmax(2) | |
| word_pred = word_pred.cpu().detach().numpy() | |
| word_scores = [ | |
| SequenceMatcher( | |
| None, | |
| s1[:int(np.sum(s3))], | |
| s2[:int(np.sum(s3))], | |
| autojunk=False).ratio() * ( | |
| len(s1[:int(np.sum(s3))]) + len(s2[:int(np.sum(s3))])) / | |
| len(s1[:int(np.sum(s3))]) / 2 | |
| for s1, s2, s3 in zip(word_label, word_pred, word_label_mask) | |
| ] | |
| batch_size = len(word_scores) | |
| for i in range(batch_size): | |
| if word_scores[i] == 1: | |
| line_right += 1 | |
| self.word_rate = np.mean(word_scores) #float | |
| self.exp_rate = line_right / batch_size #float | |
| exp_length, word_length = word_label.shape[:2] | |
| self.word_right.append(self.word_rate * word_length) | |
| self.exp_right.append(self.exp_rate * exp_length) | |
| self.word_total_length = self.word_total_length + word_length | |
| self.exp_total_num = self.exp_total_num + exp_length | |
| def get_metric(self): | |
| """ | |
| return { | |
| 'word_rate': 0, | |
| "exp_rate": 0, | |
| } | |
| """ | |
| cur_word_rate = sum(self.word_right) / self.word_total_length | |
| cur_exp_rate = sum(self.exp_right) / self.exp_total_num | |
| self.reset() | |
| return {'word_rate': cur_word_rate, "exp_rate": cur_exp_rate} | |
| def reset(self): | |
| self.word_rate = 0 | |
| self.exp_rate = 0 | |
| def epoch_reset(self): | |
| self.word_right = [] | |
| self.exp_right = [] | |
| self.word_total_length = 0 | |
| self.exp_total_num = 0 | |