import argparse
import itertools
from typing import Union, List

import numpy as np
from tqdm import tqdm
from pebble import ProcessPool
from concurrent.futures import TimeoutError

# from grader import *
from parser import *
from utils import load_jsonl
from python_executor import PythonExecutor
from math_verify import verify, parse


def new_math_equal_process(params):
    idx, pred, gt = params
    try:
        pred = parse('\\boxed{' + pred + '}')
        gt = parse('\\boxed{' + gt + '}')
        return verify(gt, pred)
    except Exception as e:
        print(f"Error in sample {idx}: {e}")
        return False


def estimate_pass_at_k(
    num_samples: Union[int, List[int], np.ndarray],
    num_correct: Union[List[int], np.ndarray],
    k: int,
) -> np.ndarray:
    """
    Estimates pass@k of each problem and returns them in an array.
    """

    def estimator(n: int, c: int, k: int) -> float:
        """
        Calculates 1 - comb(n - c, k) / comb(n, k).
        """
        if n - c < k:
            return 1.0
        return 1.0 - np.prod(1.0 - k / np.arange(n - c + 1, n + 1))

    if isinstance(num_samples, int):
        num_samples_it = itertools.repeat(num_samples, len(num_correct))
    else:
        assert len(num_samples) == len(num_correct)
        num_samples_it = iter(num_samples)

    return np.array([estimator(int(n), int(c), k) for n, c in zip(num_samples_it, num_correct)])


def evaluate(data_name, prompt_type, samples: list = None, file_path: str = None,
             max_num_samples=None, execute=False):
    assert samples or file_path, "samples or file_path must be provided"
    if not samples:
        samples = list(load_jsonl(file_path))

    # deduplicate by idx if present, otherwise assign sequential indices
    if 'idx' in samples[0]:
        samples = {sample['idx']: sample for sample in samples}.values()
        samples = sorted(samples, key=lambda x: x['idx'])
    else:
        samples = [dict(idx=idx, **sample) for idx, sample in enumerate(samples)]

    if max_num_samples:
        print(f"max_num_samples: {max_num_samples} / {len(samples)}")
        samples = samples[:max_num_samples]

    # parse gt
    for sample in samples:
        sample['gt_cot'], sample['gt'] = parse_ground_truth(sample, data_name)

    # one verification task per (sample, prediction) pair
    params = [(idx, pred, sample['gt'])
              for idx, sample in enumerate(samples)
              for pred in sample['pred']]

    scores = []
    timeout_cnt = 0

    with ProcessPool(max_workers=1) as pool:
        future = pool.map(new_math_equal_process, params, timeout=3)
        iterator = future.result()
        with tqdm(total=len(params), desc="Evaluate") as progress_bar:
            while True:
                try:
                    result = next(iterator)
                    scores.append(result)
                except StopIteration:
                    break
                except TimeoutError as error:
                    print(error)
                    scores.append(False)
                    timeout_cnt += 1
                except Exception as error:
                    print(error.traceback)
                    exit()
                progress_bar.update(1)

    # for debug only
    # import random
    # scores = [random.random() > 0.9 for _ in range(len(params))]

    # slice the flat score list back into per-sample score lists
    idx = 0
    score_mat = []
    for sample in samples:
        sample['score'] = scores[idx: idx + len(sample['pred'])]
        assert len(sample['score']) == len(sample['pred'])
        score_mat.append(sample['score'])
        idx += len(sample['pred'])

    max_len = max(len(s) for s in score_mat)

    for i, s in enumerate(score_mat):
        if len(s) < max_len:
            score_mat[i] = s + [s[-1]] * (max_len - len(s))  # pad with the last score

    # Convert score matrix to numpy array for easier manipulation
    score_mat_np = np.array(score_mat)

    # Calculate number of correct answers per problem
    num_correct = np.sum(score_mat_np, axis=1)

    # Calculate pass@k metrics for k = 1 and powers of 2 up to max_len
    k_values = [1]
    power = 1
    while 2**power <= max_len:
        k_values.append(2**power)
        power += 1

    pass_at_k = {}
    for k in k_values:
        pass_at_k_estimates = estimate_pass_at_k(max_len, num_correct, k)
        pass_at_k[k] = float(np.round(np.mean(pass_at_k_estimates) * 100, decimals=1))

    # Original metrics
    # Convert each row to a single boolean indicating if any True exists in the row
    row_eval = [any(row) for row in score_mat]
    # Calculate the average
    pass_acc = np.mean(row_eval)

    # overall mean score across all predictions
    overall_mean = np.array(score_mat).mean()
    mean_score = float(np.round(overall_mean * 100, decimals=1))

    result_json = {
        "num_samples": len(samples),
        "num_scores": len(scores),
        "timeout_samples": timeout_cnt,
        "empty_samples": len([s for s in samples if not s['pred'][-1]]),
        "acc": mean_score,
        "pass_acc": float(np.round(pass_acc * 100, decimals=1)),
        "pass@k": pass_at_k,
    }

    # per-type scores
    if "type" in samples[0]:
        type_scores = {}
        for sample in samples:
            if sample['type'] not in type_scores:
                type_scores[sample['type']] = []
            type_scores[sample['type']].append(sample['score'][-1])
        type_scores = {k: np.round(np.array(v).mean() * 100, decimals=1) for k, v in type_scores.items()}
        type_scores = {k: v for k, v in sorted(type_scores.items(), key=lambda item: item[0])}
        result_json['type_acc'] = type_scores

    print(result_json)
    return samples, result_json


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data_name", type=str, default="math")
    parser.add_argument("--prompt_type", type=str, default="tool-integrated")
    parser.add_argument("--file_path", type=str, default=None, required=True)
    parser.add_argument("--max_num_samples", type=int, default=None)
    parser.add_argument("--execute", action="store_true")
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    args = parse_args()
    evaluate(data_name=args.data_name, prompt_type=args.prompt_type,
             file_path=args.file_path, max_num_samples=args.max_num_samples,
             execute=args.execute)
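
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; the script name and JSONL path below are
# placeholders, substitute the actual name of this file and a predictions
# file produced by the generation step, where each record carries a list of
# candidate answers under "pred"):
#
#   python evaluate.py --data_name math --prompt_type tool-integrated \
#       --file_path /path/to/predictions.jsonl
#
# pass@k is the unbiased estimator 1 - C(n - c, k) / C(n, k) computed per
# problem and averaged. For example, with n = 4 (padded) predictions of which
# c = 1 is correct:
#   pass@1 = 1 - C(3,1)/C(4,1) = 0.25    pass@2 = 1 - C(3,2)/C(4,2) = 0.50
# ---------------------------------------------------------------------------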