# PawMatchAI / score_calibrator.py
import numpy as np
from typing import List, Dict, Tuple, Any
from dataclasses import dataclass, field
import traceback
from scipy import stats
@dataclass
class CalibrationResult:
"""校準結果結構"""
original_scores: List[float]
calibrated_scores: List[float]
score_mapping: Dict[str, float] # breed -> calibrated_score
calibration_method: str
distribution_stats: Dict[str, float]
quality_metrics: Dict[str, float] = field(default_factory=dict)
@dataclass
class ScoreDistribution:
"""分數分布統計"""
mean: float
std: float
min_score: float
max_score: float
percentile_5: float
percentile_95: float
    compression_ratio: float  # fraction of the unit score scale left unused
    effective_range: float  # width of the 5th-95th percentile band
class ScoreCalibrator:
"""
動態分數校準系統
解決分數壓縮問題並保持相對排名
"""
def __init__(self):
"""初始化校準器"""
self.calibration_methods = {
'dynamic_range_mapping': self._dynamic_range_mapping,
'percentile_stretching': self._percentile_stretching,
'gaussian_normalization': self._gaussian_normalization,
'sigmoid_transformation': self._sigmoid_transformation
}
        self.quality_thresholds = {
            'min_effective_range': 0.3,  # minimum usable score range
            'max_compression_ratio': 0.2,  # maximum acceptable compression ratio
            'target_distribution_range': (0.45, 0.95)  # target range for calibrated scores
        }
def calibrate_scores(self, breed_scores: List[Tuple[str, float]],
method: str = 'auto') -> CalibrationResult:
"""
校準品種分數
Args:
breed_scores: (breed_name, score) 元組列表
method: 校準方法 ('auto', 'dynamic_range_mapping', 'percentile_stretching', etc.)
Returns:
CalibrationResult: 校準結果
"""
try:
if not breed_scores:
return CalibrationResult(
original_scores=[],
calibrated_scores=[],
score_mapping={},
calibration_method='none',
distribution_stats={}
)
            # Extract breed names and raw scores
breeds = [item[0] for item in breed_scores]
original_scores = [item[1] for item in breed_scores]
            # Analyze the raw score distribution
distribution = self._analyze_score_distribution(original_scores)
            # Choose a calibration method
if method == 'auto':
method = self._select_calibration_method(distribution)
            # Apply the calibration
calibration_func = self.calibration_methods.get(method, self._dynamic_range_mapping)
calibrated_scores = calibration_func(original_scores, distribution)
            # Keep the calibrated ranking consistent with the original
calibrated_scores = self._preserve_ranking(original_scores, calibrated_scores)
            # Build the breed -> calibrated score mapping
score_mapping = dict(zip(breeds, calibrated_scores))
            # Compute quality metrics
quality_metrics = self._calculate_quality_metrics(
original_scores, calibrated_scores, distribution
)
return CalibrationResult(
original_scores=original_scores,
calibrated_scores=calibrated_scores,
score_mapping=score_mapping,
calibration_method=method,
distribution_stats=self._distribution_to_dict(distribution),
quality_metrics=quality_metrics
)
except Exception as e:
print(f"Error calibrating scores: {str(e)}")
print(traceback.format_exc())
            # Fall back to returning the original scores unchanged
breeds = [item[0] for item in breed_scores]
original_scores = [item[1] for item in breed_scores]
return CalibrationResult(
original_scores=original_scores,
calibrated_scores=original_scores,
score_mapping=dict(zip(breeds, original_scores)),
calibration_method='fallback',
distribution_stats={}
)
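    # Example (illustrative input, not real model output):
    #   calibrate_scores([("Labrador", 0.52), ("Poodle", 0.51), ("Beagle", 0.50)])
    # stretches the tightly clustered scores into the 0.45-0.95 target band
    # while keeping the Labrador > Poodle > Beagle ordering.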
def _analyze_score_distribution(self, scores: List[float]) -> ScoreDistribution:
"""分析分數分布"""
try:
scores_array = np.array(scores)
            # Basic statistics
mean_score = np.mean(scores_array)
std_score = np.std(scores_array)
min_score = np.min(scores_array)
max_score = np.max(scores_array)
            # Percentiles
percentile_5 = np.percentile(scores_array, 5)
percentile_95 = np.percentile(scores_array, 95)
            # Compression ratio and effective range. Scores are assumed to lie
            # in [0, 1]; compression is the share of that unit scale not covered
            # by the 5th-95th percentile band.
            full_range = max_score - min_score
            effective_range = percentile_95 - percentile_5
            compression_ratio = 1.0 - effective_range if full_range > 0 else 0.0
return ScoreDistribution(
mean=mean_score,
std=std_score,
min_score=min_score,
max_score=max_score,
percentile_5=percentile_5,
percentile_95=percentile_95,
compression_ratio=compression_ratio,
effective_range=effective_range
)
except Exception as e:
print(f"Error analyzing score distribution: {str(e)}")
            # Return a default fallback distribution (compression_ratio kept
            # consistent with 1.0 - effective_range)
            return ScoreDistribution(
                mean=0.5, std=0.1, min_score=0.0, max_score=1.0,
                percentile_5=0.4, percentile_95=0.6,
                compression_ratio=0.8, effective_range=0.2
)
def _select_calibration_method(self, distribution: ScoreDistribution) -> str:
"""根據分布特性選擇校準方法"""
        # Heavily compressed scores need aggressive stretching
if distribution.compression_ratio > 0.8:
return 'percentile_stretching'
        # Moderate compression: use dynamic range mapping
elif distribution.compression_ratio > 0.5:
return 'dynamic_range_mapping'
        # Scores clustered around the middle: use a sigmoid transformation
elif 0.4 <= distribution.mean <= 0.6 and distribution.std < 0.1:
return 'sigmoid_transformation'
        # Otherwise: use Gaussian normalization
else:
return 'gaussian_normalization'
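    # Example: a distribution with compression_ratio 0.85 selects
    # 'percentile_stretching'; one with compression_ratio 0.3, mean 0.5 and
    # std 0.05 selects 'sigmoid_transformation'.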
def _dynamic_range_mapping(self, scores: List[float],
distribution: ScoreDistribution) -> List[float]:
"""動態範圍映射校準"""
try:
scores_array = np.array(scores)
            # Use the 5th and 95th percentiles as bounds
lower_bound = distribution.percentile_5
upper_bound = distribution.percentile_95
            # Guard against division by zero
if upper_bound - lower_bound < 0.001:
upper_bound = distribution.max_score
lower_bound = distribution.min_score
if upper_bound - lower_bound < 0.001:
                    return scores  # all scores identical; nothing to calibrate
            # Map onto the target range (default [0.45, 0.95])
target_min, target_max = self.quality_thresholds['target_distribution_range']
            # Linear mapping
normalized = (scores_array - lower_bound) / (upper_bound - lower_bound)
            normalized = np.clip(normalized, 0, 1)  # clamp to [0, 1]
calibrated = target_min + normalized * (target_max - target_min)
return calibrated.tolist()
except Exception as e:
print(f"Error in dynamic range mapping: {str(e)}")
return scores
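    # Worked example: with percentile_5 = 0.50 and percentile_95 = 0.60, a raw
    # score of 0.55 normalizes to 0.5 and maps to 0.45 + 0.5 * 0.5 = 0.70.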
def _percentile_stretching(self, scores: List[float],
distribution: ScoreDistribution) -> List[float]:
"""百分位數拉伸校準"""
try:
scores_array = np.array(scores)
            # Compute percentile ranks
percentile_ranks = stats.rankdata(scores_array, method='average') / len(scores_array)
            # Square-root transform to amplify differences
stretched_ranks = np.sqrt(percentile_ranks)
            # Map onto the target range
target_min, target_max = self.quality_thresholds['target_distribution_range']
calibrated = target_min + stretched_ranks * (target_max - target_min)
return calibrated.tolist()
except Exception as e:
print(f"Error in percentile stretching: {str(e)}")
return self._dynamic_range_mapping(scores, distribution)
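    # Worked example: among four distinct scores the second-best has percentile
    # rank 3/4 = 0.75; sqrt(0.75) ~= 0.866, which maps to 0.45 + 0.866 * 0.5 ~= 0.88.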
def _gaussian_normalization(self, scores: List[float],
distribution: ScoreDistribution) -> List[float]:
"""高斯正規化校準"""
try:
scores_array = np.array(scores)
            # Z-score normalization
if distribution.std > 0:
z_scores = (scores_array - distribution.mean) / distribution.std
                # Clamp z-scores to a reasonable range
z_scores = np.clip(z_scores, -3, 3)
else:
z_scores = np.zeros_like(scores_array)
            # Rescale onto the target range
target_min, target_max = self.quality_thresholds['target_distribution_range']
target_mean = (target_min + target_max) / 2
            target_std = (target_max - target_min) / 6  # 3-sigma range
calibrated = target_mean + z_scores * target_std
calibrated = np.clip(calibrated, target_min, target_max)
return calibrated.tolist()
except Exception as e:
print(f"Error in gaussian normalization: {str(e)}")
return self._dynamic_range_mapping(scores, distribution)
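    # Worked example: with the default target range the midpoint is 0.70 and the
    # target std is 0.5 / 6 ~= 0.083, so a score one standard deviation above the
    # mean (z = 1) maps to roughly 0.78.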
def _sigmoid_transformation(self, scores: List[float],
distribution: ScoreDistribution) -> List[float]:
"""Sigmoid 轉換校準"""
try:
scores_array = np.array(scores)
            # Center the scores on the mean
centered = scores_array - distribution.mean
            # Sigmoid transform (amplifies differences among mid-range scores)
            sigmoid_factor = 10.0  # controls the steepness of the sigmoid
transformed = 1 / (1 + np.exp(-sigmoid_factor * centered))
            # Map onto the target range
target_min, target_max = self.quality_thresholds['target_distribution_range']
calibrated = target_min + transformed * (target_max - target_min)
return calibrated.tolist()
except Exception as e:
print(f"Error in sigmoid transformation: {str(e)}")
return self._dynamic_range_mapping(scores, distribution)
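    # Worked example: with mean 0.5 and sigmoid_factor 10, a raw score of 0.55
    # centers to 0.05; sigmoid(0.5) ~= 0.622, mapping to 0.45 + 0.622 * 0.5 ~= 0.76.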
def _preserve_ranking(self, original_scores: List[float],
calibrated_scores: List[float]) -> List[float]:
"""確保校準後的分數保持原始排名"""
try:
            # Rank the original scores (rank 1 = highest score)
original_ranks = stats.rankdata([-score for score in original_scores], method='ordinal')
            # Pair each calibrated score with its original rank
calibrated_with_ranks = list(zip(calibrated_scores, original_ranks))
            # Sort the pairs by original rank
calibrated_with_ranks.sort(key=lambda x: x[1])
            # Reassign the calibrated values so the best original rank receives the highest value
sorted_calibrated = sorted(calibrated_scores, reverse=True)
            # Rebuild the score list in the original order
preserved_scores = [0.0] * len(original_scores)
for i, (_, original_rank) in enumerate(calibrated_with_ranks):
                # Locate this rank's position in the original list
original_index = original_ranks.tolist().index(original_rank)
preserved_scores[original_index] = sorted_calibrated[i]
return preserved_scores
except Exception as e:
print(f"Error preserving ranking: {str(e)}")
return calibrated_scores
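    # Example: originals [0.52, 0.55, 0.51] rank as [2, 1, 3]; if calibration
    # produced [0.60, 0.80, 0.70], the values are reassigned as [0.70, 0.80, 0.60]
    # so the ordering matches the original ranking again.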
def _calculate_quality_metrics(self, original_scores: List[float],
calibrated_scores: List[float],
distribution: ScoreDistribution) -> Dict[str, float]:
"""計算校準品質指標"""
try:
original_array = np.array(original_scores)
calibrated_array = np.array(calibrated_scores)
            # Range improvement
original_range = np.max(original_array) - np.min(original_array)
calibrated_range = np.max(calibrated_array) - np.min(calibrated_array)
range_improvement = calibrated_range / max(0.001, original_range)
            # Separation improvement (mean gap between adjacent sorted scores)
original_sorted = np.sort(original_array)
calibrated_sorted = np.sort(calibrated_array)
original_separation = np.mean(np.diff(original_sorted)) if len(original_sorted) > 1 else 0
calibrated_separation = np.mean(np.diff(calibrated_sorted)) if len(calibrated_sorted) > 1 else 0
separation_improvement = (calibrated_separation / max(0.001, original_separation)
if original_separation > 0 else 1.0)
            # Rank preservation (Spearman correlation)
if len(original_scores) > 1:
rank_correlation, _ = stats.spearmanr(original_scores, calibrated_scores)
rank_correlation = abs(rank_correlation) if not np.isnan(rank_correlation) else 1.0
else:
rank_correlation = 1.0
            # Distribution quality
            calibrated_std = np.std(calibrated_array)
            distribution_quality = min(1.0, calibrated_std * 2)  # a larger std means better spread, within reason
return {
'range_improvement': range_improvement,
'separation_improvement': separation_improvement,
'rank_preservation': rank_correlation,
'distribution_quality': distribution_quality,
'effective_range_achieved': calibrated_range,
'compression_reduction': max(0, distribution.compression_ratio -
(1.0 - calibrated_range))
}
except Exception as e:
print(f"Error calculating quality metrics: {str(e)}")
return {'error': str(e)}
def _distribution_to_dict(self, distribution: ScoreDistribution) -> Dict[str, float]:
"""將分布統計轉換為字典"""
return {
'mean': distribution.mean,
'std': distribution.std,
'min_score': distribution.min_score,
'max_score': distribution.max_score,
'percentile_5': distribution.percentile_5,
'percentile_95': distribution.percentile_95,
'compression_ratio': distribution.compression_ratio,
'effective_range': distribution.effective_range
}
def apply_tie_breaking(self, breed_scores: List[Tuple[str, float]]) -> List[Tuple[str, float]]:
"""應用確定性的打破平手機制"""
try:
            # Group breeds by rounded score
score_groups = {}
for breed, score in breed_scores:
                rounded_score = round(score, 6)  # avoid floating-point precision issues
if rounded_score not in score_groups:
score_groups[rounded_score] = []
score_groups[rounded_score].append((breed, score))
            # Process each score group from highest to lowest
result = []
for rounded_score in sorted(score_groups.keys(), reverse=True):
group = score_groups[rounded_score]
if len(group) == 1:
result.extend(group)
else:
                    # Break ties alphabetically by breed name
sorted_group = sorted(group, key=lambda x: x[0])
                    # Give tied breeds tiny, deterministic score offsets
for i, (breed, original_score) in enumerate(sorted_group):
adjusted_score = original_score - (i * 0.0001)
result.append((breed, adjusted_score))
return result
except Exception as e:
print(f"Error in tie breaking: {str(e)}")
return breed_scores
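    # Example: [("Beagle", 0.8), ("Akita", 0.8)] becomes
    # [("Akita", 0.8), ("Beagle", 0.7999)]: alphabetical order breaks the tie and
    # each subsequent tied breed is nudged down by 0.0001.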
def get_calibration_summary(self, result: CalibrationResult) -> Dict[str, Any]:
"""獲取校準摘要資訊"""
try:
summary = {
'method_used': result.calibration_method,
'breeds_processed': len(result.original_scores),
'score_range_before': {
'min': min(result.original_scores) if result.original_scores else 0,
'max': max(result.original_scores) if result.original_scores else 0,
'range': (max(result.original_scores) - min(result.original_scores))
if result.original_scores else 0
},
'score_range_after': {
'min': min(result.calibrated_scores) if result.calibrated_scores else 0,
'max': max(result.calibrated_scores) if result.calibrated_scores else 0,
'range': (max(result.calibrated_scores) - min(result.calibrated_scores))
if result.calibrated_scores else 0
},
'distribution_stats': result.distribution_stats,
'quality_metrics': result.quality_metrics,
'improvement_summary': {
'range_expanded': result.quality_metrics.get('range_improvement', 1.0) > 1.1,
'separation_improved': result.quality_metrics.get('separation_improvement', 1.0) > 1.1,
'ranking_preserved': result.quality_metrics.get('rank_preservation', 1.0) > 0.95
}
}
return summary
except Exception as e:
print(f"Error generating calibration summary: {str(e)}")
return {'error': str(e)}
def calibrate_breed_scores(breed_scores: List[Tuple[str, float]],
method: str = 'auto') -> CalibrationResult:
"""
便利函數:校準品種分數
Args:
breed_scores: (breed_name, score) 元組列表
method: 校準方法
Returns:
CalibrationResult: 校準結果
"""
calibrator = ScoreCalibrator()
return calibrator.calibrate_scores(breed_scores, method)
def get_calibrated_rankings(breed_scores: List[Tuple[str, float]],
method: str = 'auto') -> List[Tuple[str, float, int]]:
"""
便利函數:獲取校準後的排名
Args:
breed_scores: (breed_name, score) 元組列表
method: 校準方法
Returns:
List[Tuple[str, float, int]]: (breed_name, calibrated_score, rank) 列表
"""
calibrator = ScoreCalibrator()
result = calibrator.calibrate_scores(breed_scores, method)
    # Apply tie breaking to the calibrated scores
    calibrated_with_breed = list(result.score_mapping.items())
calibrated_with_tie_breaking = calibrator.apply_tie_breaking(calibrated_with_breed)
    # Attach ranks
ranked_results = []
for rank, (breed, score) in enumerate(calibrated_with_tie_breaking, 1):
ranked_results.append((breed, score, rank))
return ranked_results
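
# Minimal usage sketch. The breed names and scores below are illustrative
# placeholders, not real model output.
if __name__ == "__main__":
    sample_scores = [
        ("Labrador Retriever", 0.530),
        ("Golden Retriever", 0.525),
        ("Border Collie", 0.521),
        ("Poodle", 0.518),
        ("Beagle", 0.518),  # deliberate tie with Poodle
    ]
    result = calibrate_breed_scores(sample_scores)
    print(f"Method used: {result.calibration_method}")
    for breed, score, rank in get_calibrated_rankings(sample_scores):
        print(f"{rank}. {breed}: {score:.4f}")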