#!/usr/bin/env python3 """ 모델별 특정 문제 응답 비교 도구 (전체 응답 출력 버전) 사용법: python compare_models.py --benchmark aime24 --problem_idx 0 --models qwen25_7b_base azr_coder_7b azr_base_7b 주요 변경사항: - 문제 텍스트와 추론 과정을 전체 출력 (길이 제한 제거) - 더 상세한 분석을 위한 완전한 응답 확인 가능 """ import json import argparse import os import glob import sys from typing import List, Dict, Any import re def extract_answer_from_response(response: str) -> str: """응답에서 최종 답안 추출""" # \boxed{...} 패턴 찾기 boxed_pattern = r'\\boxed\{([^}]+)\}' matches = re.findall(boxed_pattern, response) if matches: return matches[-1] # 마지막 boxed 답안 사용 # 숫자 패턴 찾기 (fallback) number_pattern = r'\b\d+(?:\.\d+)?\b' numbers = re.findall(number_pattern, response) if numbers: return numbers[-1] return "답안을 찾을 수 없음" def load_problem_result(model_dir: str, benchmark: str, problem_idx: int) -> Dict[str, Any]: """특정 모델의 특정 문제 결과 로드""" # 결과 파일 찾기 result_files = glob.glob(f"{model_dir}/{benchmark}/*.jsonl") if not result_files: return {"error": f"벤치마크 '{benchmark}' 결과 파일을 찾을 수 없음"} result_file = result_files[0] # 첫 번째 파일 사용 try: with open(result_file, 'r', encoding='utf-8') as f: lines = f.readlines() if problem_idx >= len(lines): return {"error": f"문제 인덱스 {problem_idx}가 범위를 벗어남 (총 {len(lines)}개 문제)"} problem_data = json.loads(lines[problem_idx]) return problem_data except Exception as e: return {"error": f"파일 읽기 오류: {str(e)}"} def format_response_text(text: str, max_length: int = None) -> str: """응답 텍스트 포맷팅""" if max_length is not None and len(text) > max_length: return text[:max_length] + "... (truncated)" return text def compare_models_on_problem( benchmark: str, problem_idx: int, model_dirs: List[str], results_base_dir: str = "/home/ubuntu/RLVR/Absolute-Zero-Reasoner/evaluation/math_eval/EVAL/results" ): """특정 문제에 대한 모델별 비교""" print("=" * 100) print(f"📊 벤치마크: {benchmark.upper()}") print(f"🔢 문제 번호: {problem_idx}") print("=" * 100) # 각 모델의 결과 로드 model_results = {} problem_text = None correct_answer = None for model_name in model_dirs: model_path = os.path.join(results_base_dir, model_name) result = load_problem_result(model_path, benchmark, problem_idx) model_results[model_name] = result # 문제 텍스트와 정답은 첫 번째 성공한 모델에서 가져오기 if problem_text is None and "error" not in result: problem_text = result.get("question", "문제를 찾을 수 없음") correct_answer = result.get("gt", result.get("answer", "정답을 찾을 수 없음")) # 문제 정보 출력 print(f"📝 문제:") print("-" * 80) print(format_response_text(problem_text)) print() print(f"✅ 정답: {correct_answer}") print() # 모델 출력 순서 정의 (선호하는 순서) preferred_order = [ "qwen25_7b_base", "azr_base_7b", "qwen25_7b_coder", "azr_coder_7b" ] # 입력된 모델들을 선호 순서대로 정렬 ordered_models = [] # 먼저 선호 순서에 있는 모델들을 순서대로 추가 for preferred_model in preferred_order: if preferred_model in model_dirs: ordered_models.append(preferred_model) # 선호 순서에 없는 모델들을 나머지에 추가 for model in model_dirs: if model not in ordered_models: ordered_models.append(model) # 모델별 결과 비교 print("🤖 모델별 응답 비교:") print("=" * 100) for model_name in ordered_models: result = model_results[model_name] print(f"\n🔸 {model_name.upper()}") print("-" * 60) if "error" in result: print(f"❌ 오류: {result['error']}") continue # 모델 예측과 정답 여부 model_pred = result.get("pred", [""])[0] if result.get("pred") else "" is_correct = result.get("score", [False])[0] if result.get("score") else False # 전체 응답에서 답안 추출 full_response = result.get("code", [""])[0] if result.get("code") else "" extracted_answer = extract_answer_from_response(full_response) # 결과 출력 status = "✅ 정답" if is_correct else "❌ 오답" print(f"결과: {status}") print(f"모델 답안: {model_pred}") print(f"추출된 답안: {extracted_answer}") print() # 추론 과정 (전체) if full_response: print("🧠 추론 과정 (전체):") print(format_response_text(full_response)) else: print("추론 과정을 찾을 수 없음") print("-" * 60) # 요약 통계 print(f"\n📈 요약:") print("-" * 40) correct_models = [] wrong_models = [] error_models = [] for model_name in ordered_models: result = model_results[model_name] if "error" in result: error_models.append(model_name) elif result.get("score", [False])[0]: correct_models.append(model_name) else: wrong_models.append(model_name) if correct_models: print(f"✅ 정답 모델: {', '.join(correct_models)}") if wrong_models: print(f"❌ 오답 모델: {', '.join(wrong_models)}") if error_models: print(f"⚠️ 오류 모델: {', '.join(error_models)}") print(f"정답률: {len(correct_models)}/{len(model_dirs) - len(error_models)} ({len(correct_models)/(len(model_dirs) - len(error_models))*100:.1f}%)") def list_available_benchmarks(results_base_dir: str, model_name: str) -> List[str]: """사용 가능한 벤치마크 목록 반환""" model_path = os.path.join(results_base_dir, model_name) if not os.path.exists(model_path): return [] benchmarks = [] for item in os.listdir(model_path): item_path = os.path.join(model_path, item) if os.path.isdir(item_path): benchmarks.append(item) return sorted(benchmarks) def get_problem_count(results_base_dir: str, model_name: str, benchmark: str) -> int: """특정 벤치마크의 문제 수 반환""" result_files = glob.glob(f"{results_base_dir}/{model_name}/{benchmark}/*.jsonl") if not result_files: return 0 try: with open(result_files[0], 'r', encoding='utf-8') as f: return len(f.readlines()) except: return 0 def main(): parser = argparse.ArgumentParser(description="모델별 특정 문제 응답 비교") parser.add_argument("--benchmark", "-b", type=str, required=True, help="벤치마크 이름 (예: aime24, aime25, amc23, math500)") parser.add_argument("--problem_idx", "-p", type=int, required=True, help="문제 인덱스 (0부터 시작)") parser.add_argument("--models", "-m", nargs="+", required=True, help="비교할 모델 디렉토리 이름들") parser.add_argument("--results_dir", "-r", type=str, default="/home/ubuntu/RLVR/Absolute-Zero-Reasoner/evaluation/math_eval/EVAL/results", help="결과 디렉토리 경로") parser.add_argument("--list", "-l", action="store_true", help="사용 가능한 벤치마크와 문제 수 나열") parser.add_argument("--full", "-f", action="store_true", help="전체 응답 출력 (기본값: True, 이제 항상 전체 출력)") parser.add_argument("--output", "-o", type=str, help="결과를 파일로 저장 (예: --output result.txt)") args = parser.parse_args() if args.list: print("📋 사용 가능한 정보:") print("-" * 50) # 첫 번째 모델 기준으로 벤치마크 나열 if args.models: benchmarks = list_available_benchmarks(args.results_dir, args.models[0]) for benchmark in benchmarks: problem_count = get_problem_count(args.results_dir, args.models[0], benchmark) print(f"• {benchmark}: {problem_count}개 문제 (인덱스 0-{problem_count-1})") return # 출력 파일 설정 output_file = None if args.output: try: output_file = open(args.output, 'w', encoding='utf-8') # 표준 출력을 파일로 리디렉션 original_stdout = sys.stdout sys.stdout = output_file print(f"결과를 {args.output} 파일에 저장합니다...") # 파일에 저장될 내용에서는 이 메시지를 제거하기 위해 다시 원래 stdout으로 출력 sys.stdout = original_stdout print(f"📁 결과를 {args.output} 파일에 저장합니다...") sys.stdout = output_file except Exception as e: print(f"❌ 파일 생성 오류: {e}") return try: # 모델 비교 실행 compare_models_on_problem( benchmark=args.benchmark, problem_idx=args.problem_idx, model_dirs=args.models, results_base_dir=args.results_dir ) finally: # 파일 출력인 경우 원래 stdout 복원 및 파일 닫기 if output_file: sys.stdout = original_stdout output_file.close() print(f"✅ 결과가 {args.output} 파일에 저장되었습니다.") print(f"📖 파일 내용 확인: cat {args.output}") print(f"📄 파일 열기: less {args.output}") print(f"🔍 특정 모델 검색: grep -A 10 'QWEN25_7B_BASE' {args.output}") if __name__ == "__main__": main()