"""Dynamic score calibration utilities for breed recommendation scores."""
import traceback
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
from scipy import stats
@dataclass
class CalibrationResult:
    """Outcome of one score-calibration run.

    Bug fix: the ``@dataclass`` decorator was missing, so the keyword-argument
    construction used throughout this module raised ``TypeError``.
    """
    original_scores: List[float]          # raw scores, in input order
    calibrated_scores: List[float]        # calibrated scores, same order as original_scores
    score_mapping: Dict[str, float]       # breed -> calibrated score
    calibration_method: str               # name of the calibration method that was applied
    distribution_stats: Dict[str, float]  # summary stats of the original distribution
    quality_metrics: Dict[str, float] = field(default_factory=dict)  # calibration quality indicators
@dataclass
class ScoreDistribution:
    """Summary statistics describing a score distribution.

    Bug fix: the ``@dataclass`` decorator was missing, so the keyword-argument
    construction in ``_analyze_score_distribution`` raised ``TypeError``.
    """
    mean: float
    std: float
    min_score: float
    max_score: float
    percentile_5: float
    percentile_95: float
    compression_ratio: float  # how compressed the scores are (1.0 = fully compressed)
    effective_range: float    # spread between the 5th and 95th percentiles
class ScoreCalibrator:
    """
    Dynamic score calibration system.

    Expands compressed score distributions into a wider target range while
    preserving the relative ranking of the scored breeds.
    """

    def __init__(self):
        """Initialize the method registry and quality thresholds."""
        self.calibration_methods = {
            'dynamic_range_mapping': self._dynamic_range_mapping,
            'percentile_stretching': self._percentile_stretching,
            'gaussian_normalization': self._gaussian_normalization,
            'sigmoid_transformation': self._sigmoid_transformation
        }
        self.quality_thresholds = {
            'min_effective_range': 0.3,    # minimum acceptable effective score range
            'max_compression_ratio': 0.2,  # maximum tolerated compression ratio
            'target_distribution_range': (0.45, 0.95)  # range calibrated scores map into
        }

    def calibrate_scores(self, breed_scores: List[Tuple[str, float]],
                         method: str = 'auto') -> CalibrationResult:
        """
        Calibrate breed scores.

        Args:
            breed_scores: list of (breed_name, score) tuples
            method: calibration method ('auto', 'dynamic_range_mapping',
                'percentile_stretching', 'gaussian_normalization',
                'sigmoid_transformation')

        Returns:
            CalibrationResult: calibration outcome; on any internal failure
            the original scores are returned unchanged (method 'fallback').
        """
        try:
            if not breed_scores:
                return CalibrationResult(
                    original_scores=[],
                    calibrated_scores=[],
                    score_mapping={},
                    calibration_method='none',
                    distribution_stats={}
                )

            # Split names and raw scores.
            breeds = [item[0] for item in breed_scores]
            original_scores = [item[1] for item in breed_scores]

            # Analyze the raw score distribution.
            distribution = self._analyze_score_distribution(original_scores)

            # Pick a calibration method when none was requested explicitly.
            if method == 'auto':
                method = self._select_calibration_method(distribution)

            # Apply the calibration; unknown method names fall back to range mapping.
            calibration_func = self.calibration_methods.get(method, self._dynamic_range_mapping)
            calibrated_scores = calibration_func(original_scores, distribution)

            # Guarantee that calibration never reorders the original ranking.
            calibrated_scores = self._preserve_ranking(original_scores, calibrated_scores)

            # Map each breed to its calibrated score.
            score_mapping = dict(zip(breeds, calibrated_scores))

            # Quality metrics for the calibration itself.
            quality_metrics = self._calculate_quality_metrics(
                original_scores, calibrated_scores, distribution
            )

            return CalibrationResult(
                original_scores=original_scores,
                calibrated_scores=calibrated_scores,
                score_mapping=score_mapping,
                calibration_method=method,
                distribution_stats=self._distribution_to_dict(distribution),
                quality_metrics=quality_metrics
            )
        except Exception as e:
            print(f"Error calibrating scores: {str(e)}")
            print(traceback.format_exc())
            # Degrade gracefully: hand back the original scores untouched.
            breeds = [item[0] for item in breed_scores]
            original_scores = [item[1] for item in breed_scores]
            return CalibrationResult(
                original_scores=original_scores,
                calibrated_scores=original_scores,
                score_mapping=dict(zip(breeds, original_scores)),
                calibration_method='fallback',
                distribution_stats={}
            )

    def _analyze_score_distribution(self, scores: List[float]) -> ScoreDistribution:
        """Compute summary statistics for a list of scores.

        Returns a conservative default distribution if analysis fails.
        """
        try:
            scores_array = np.array(scores)

            # Basic statistics.
            mean_score = np.mean(scores_array)
            std_score = np.std(scores_array)
            min_score = np.min(scores_array)
            max_score = np.max(scores_array)

            # Percentiles.
            percentile_5 = np.percentile(scores_array, 5)
            percentile_95 = np.percentile(scores_array, 95)

            # Compression ratio and effective range.
            full_range = max_score - min_score
            effective_range = percentile_95 - percentile_5
            # NOTE(review): this assumes scores live on a [0, 1] scale
            # (effective_range is divided by the full scale 1.0, not by
            # full_range); full_range only gates the degenerate case — confirm.
            compression_ratio = 1.0 - (effective_range / 1.0) if full_range > 0 else 0.0

            return ScoreDistribution(
                mean=mean_score,
                std=std_score,
                min_score=min_score,
                max_score=max_score,
                percentile_5=percentile_5,
                percentile_95=percentile_95,
                compression_ratio=compression_ratio,
                effective_range=effective_range
            )
        except Exception as e:
            print(f"Error analyzing score distribution: {str(e)}")
            # Fallback: a generic mildly-compressed distribution.
            return ScoreDistribution(
                mean=0.5, std=0.1, min_score=0.0, max_score=1.0,
                percentile_5=0.4, percentile_95=0.6,
                compression_ratio=0.6, effective_range=0.2
            )

    def _select_calibration_method(self, distribution: ScoreDistribution) -> str:
        """Choose a calibration method based on the distribution's shape."""
        # Heavily compressed scores need aggressive stretching.
        if distribution.compression_ratio > 0.8:
            return 'percentile_stretching'
        # Moderate compression: linear dynamic-range mapping.
        elif distribution.compression_ratio > 0.5:
            return 'dynamic_range_mapping'
        # Scores clustered around the middle: sigmoid to amplify differences.
        elif 0.4 <= distribution.mean <= 0.6 and distribution.std < 0.1:
            return 'sigmoid_transformation'
        # Everything else: gaussian normalization.
        else:
            return 'gaussian_normalization'

    def _dynamic_range_mapping(self, scores: List[float],
                               distribution: ScoreDistribution) -> List[float]:
        """Linearly map scores from their percentile band to the target range."""
        try:
            scores_array = np.array(scores)

            # Use the 5th/95th percentiles as mapping bounds.
            lower_bound = distribution.percentile_5
            upper_bound = distribution.percentile_95

            # Widen to the full min/max if the percentile band is degenerate.
            if upper_bound - lower_bound < 0.001:
                upper_bound = distribution.max_score
                lower_bound = distribution.min_score
                if upper_bound - lower_bound < 0.001:
                    return scores  # all scores identical; nothing to calibrate

            # Linear map into the target range (e.g. [0.45, 0.95]).
            target_min, target_max = self.quality_thresholds['target_distribution_range']
            normalized = (scores_array - lower_bound) / (upper_bound - lower_bound)
            normalized = np.clip(normalized, 0, 1)  # clamp outliers beyond the band
            calibrated = target_min + normalized * (target_max - target_min)
            return calibrated.tolist()
        except Exception as e:
            print(f"Error in dynamic range mapping: {str(e)}")
            return scores

    def _percentile_stretching(self, scores: List[float],
                               distribution: ScoreDistribution) -> List[float]:
        """Replace scores with (sqrt-stretched) percentile ranks in the target range."""
        try:
            scores_array = np.array(scores)

            # Percentile rank of each score in (0, 1].
            percentile_ranks = stats.rankdata(scores_array, method='average') / len(scores_array)

            # Square root amplifies differences among low-ranked items.
            stretched_ranks = np.sqrt(percentile_ranks)

            # Map into the target range.
            target_min, target_max = self.quality_thresholds['target_distribution_range']
            calibrated = target_min + stretched_ranks * (target_max - target_min)
            return calibrated.tolist()
        except Exception as e:
            print(f"Error in percentile stretching: {str(e)}")
            return self._dynamic_range_mapping(scores, distribution)

    def _gaussian_normalization(self, scores: List[float],
                                distribution: ScoreDistribution) -> List[float]:
        """Z-score normalize, then rescale into the target range."""
        try:
            scores_array = np.array(scores)

            # Z-score normalization (degenerate std -> all zeros).
            if distribution.std > 0:
                z_scores = (scores_array - distribution.mean) / distribution.std
                z_scores = np.clip(z_scores, -3, 3)  # bound extreme outliers
            else:
                z_scores = np.zeros_like(scores_array)

            # Rescale so +/-3 sigma spans the target range.
            target_min, target_max = self.quality_thresholds['target_distribution_range']
            target_mean = (target_min + target_max) / 2
            target_std = (target_max - target_min) / 6  # 3-sigma span
            calibrated = target_mean + z_scores * target_std
            calibrated = np.clip(calibrated, target_min, target_max)
            return calibrated.tolist()
        except Exception as e:
            print(f"Error in gaussian normalization: {str(e)}")
            return self._dynamic_range_mapping(scores, distribution)

    def _sigmoid_transformation(self, scores: List[float],
                                distribution: ScoreDistribution) -> List[float]:
        """Sigmoid-transform mean-centered scores to amplify mid-range differences."""
        try:
            scores_array = np.array(scores)

            # Center on the distribution mean.
            centered = scores_array - distribution.mean

            # Sigmoid (steepness controlled by sigmoid_factor).
            sigmoid_factor = 10.0
            transformed = 1 / (1 + np.exp(-sigmoid_factor * centered))

            # Map into the target range.
            target_min, target_max = self.quality_thresholds['target_distribution_range']
            calibrated = target_min + transformed * (target_max - target_min)
            return calibrated.tolist()
        except Exception as e:
            print(f"Error in sigmoid transformation: {str(e)}")
            return self._dynamic_range_mapping(scores, distribution)

    def _preserve_ranking(self, original_scores: List[float],
                          calibrated_scores: List[float]) -> List[float]:
        """Reassign calibrated values so positions keep their original ranking.

        The position holding the k-th highest original score receives the
        k-th highest calibrated score. (Perf fix: direct rank->position
        assignment replaces the former O(n^2) ``list.index`` scan per item;
        output is identical.)
        """
        try:
            # Rank 1 = highest original score; 'ordinal' makes ranks unique.
            original_ranks = stats.rankdata([-score for score in original_scores],
                                            method='ordinal')
            sorted_calibrated = sorted(calibrated_scores, reverse=True)

            preserved_scores = [0.0] * len(original_scores)
            for index, rank in enumerate(original_ranks):
                preserved_scores[index] = sorted_calibrated[int(rank) - 1]
            return preserved_scores
        except Exception as e:
            print(f"Error preserving ranking: {str(e)}")
            return calibrated_scores

    def _calculate_quality_metrics(self, original_scores: List[float],
                                   calibrated_scores: List[float],
                                   distribution: ScoreDistribution) -> Dict[str, float]:
        """Quantify how much the calibration improved the score distribution."""
        try:
            original_array = np.array(original_scores)
            calibrated_array = np.array(calibrated_scores)

            # Range improvement (how much the overall spread grew).
            original_range = np.max(original_array) - np.min(original_array)
            calibrated_range = np.max(calibrated_array) - np.min(calibrated_array)
            range_improvement = calibrated_range / max(0.001, original_range)

            # Separation improvement: mean gap between adjacent sorted scores.
            original_sorted = np.sort(original_array)
            calibrated_sorted = np.sort(calibrated_array)
            original_separation = np.mean(np.diff(original_sorted)) if len(original_sorted) > 1 else 0
            calibrated_separation = np.mean(np.diff(calibrated_sorted)) if len(calibrated_sorted) > 1 else 0
            separation_improvement = (calibrated_separation / max(0.001, original_separation)
                                      if original_separation > 0 else 1.0)

            # Rank preservation (Spearman correlation; 1.0 = order unchanged).
            if len(original_scores) > 1:
                rank_correlation, _ = stats.spearmanr(original_scores, calibrated_scores)
                rank_correlation = abs(rank_correlation) if not np.isnan(rank_correlation) else 1.0
            else:
                rank_correlation = 1.0

            # Distribution quality: larger std is better, capped at 1.0.
            calibrated_std = np.std(calibrated_array)
            distribution_quality = min(1.0, calibrated_std * 2)

            return {
                'range_improvement': range_improvement,
                'separation_improvement': separation_improvement,
                'rank_preservation': rank_correlation,
                'distribution_quality': distribution_quality,
                'effective_range_achieved': calibrated_range,
                'compression_reduction': max(0, distribution.compression_ratio -
                                             (1.0 - calibrated_range))
            }
        except Exception as e:
            print(f"Error calculating quality metrics: {str(e)}")
            return {'error': str(e)}

    def _distribution_to_dict(self, distribution: ScoreDistribution) -> Dict[str, float]:
        """Serialize a ScoreDistribution into a plain dict."""
        return {
            'mean': distribution.mean,
            'std': distribution.std,
            'min_score': distribution.min_score,
            'max_score': distribution.max_score,
            'percentile_5': distribution.percentile_5,
            'percentile_95': distribution.percentile_95,
            'compression_ratio': distribution.compression_ratio,
            'effective_range': distribution.effective_range
        }

    def apply_tie_breaking(self, breed_scores: List[Tuple[str, float]]) -> List[Tuple[str, float]]:
        """Deterministically break ties; returns pairs sorted best-first.

        Tied breeds are ordered alphabetically and nudged apart by 0.0001 each.
        On failure the input list is returned unchanged.
        """
        try:
            # Group by score, rounded to dodge float-precision noise.
            score_groups = {}
            for breed, score in breed_scores:
                rounded_score = round(score, 6)
                score_groups.setdefault(rounded_score, []).append((breed, score))

            # Emit groups from highest to lowest score.
            result = []
            for rounded_score in sorted(score_groups.keys(), reverse=True):
                group = score_groups[rounded_score]
                if len(group) == 1:
                    result.extend(group)
                else:
                    # Alphabetical order breaks the tie deterministically.
                    sorted_group = sorted(group, key=lambda x: x[0])
                    # Nudge each subsequent tied breed slightly lower.
                    for i, (breed, original_score) in enumerate(sorted_group):
                        adjusted_score = original_score - (i * 0.0001)
                        result.append((breed, adjusted_score))
            return result
        except Exception as e:
            print(f"Error in tie breaking: {str(e)}")
            return breed_scores

    def get_calibration_summary(self, result: CalibrationResult) -> Dict[str, Any]:
        """Build a human-readable summary dict for a calibration result."""
        try:
            summary = {
                'method_used': result.calibration_method,
                'breeds_processed': len(result.original_scores),
                'score_range_before': {
                    'min': min(result.original_scores) if result.original_scores else 0,
                    'max': max(result.original_scores) if result.original_scores else 0,
                    'range': (max(result.original_scores) - min(result.original_scores))
                             if result.original_scores else 0
                },
                'score_range_after': {
                    'min': min(result.calibrated_scores) if result.calibrated_scores else 0,
                    'max': max(result.calibrated_scores) if result.calibrated_scores else 0,
                    'range': (max(result.calibrated_scores) - min(result.calibrated_scores))
                             if result.calibrated_scores else 0
                },
                'distribution_stats': result.distribution_stats,
                'quality_metrics': result.quality_metrics,
                'improvement_summary': {
                    'range_expanded': result.quality_metrics.get('range_improvement', 1.0) > 1.1,
                    'separation_improved': result.quality_metrics.get('separation_improvement', 1.0) > 1.1,
                    'ranking_preserved': result.quality_metrics.get('rank_preservation', 1.0) > 0.95
                }
            }
            return summary
        except Exception as e:
            print(f"Error generating calibration summary: {str(e)}")
            return {'error': str(e)}
def calibrate_breed_scores(breed_scores: List[Tuple[str, float]],
                           method: str = 'auto') -> CalibrationResult:
    """
    Convenience wrapper: calibrate a list of breed scores.

    Args:
        breed_scores: list of (breed_name, score) tuples
        method: calibration method name, or 'auto' to select one automatically

    Returns:
        CalibrationResult: the calibration outcome
    """
    return ScoreCalibrator().calibrate_scores(breed_scores, method)
def get_calibrated_rankings(breed_scores: List[Tuple[str, float]],
                            method: str = 'auto') -> List[Tuple[str, float, int]]:
    """
    Convenience wrapper: calibrate scores and return a ranked list.

    Args:
        breed_scores: list of (breed_name, score) tuples
        method: calibration method name, or 'auto' to select one automatically

    Returns:
        List[Tuple[str, float, int]]: (breed_name, calibrated_score, rank)
        tuples, best score first (rank 1).
    """
    calibrator = ScoreCalibrator()
    result = calibrator.calibrate_scores(breed_scores, method)

    # Resolve ties deterministically before assigning ranks.
    calibrated_pairs = list(result.score_mapping.items())
    tie_broken = calibrator.apply_tie_breaking(calibrated_pairs)

    # Attach 1-based ranks in best-first order.
    return [(breed, score, rank)
            for rank, (breed, score) in enumerate(tie_broken, 1)]