from typing import Literal

import pandas as pd

from varco_arena_core.prompts import load_prompt
from varco_arena_core.tracking_utils import pricing


def calculate(
    dataset_df: pd.DataFrame = None,
    model_name: str = None,
    matching_method: Literal["tournament", "league"] = None,
    evalprompt: str = None,
):
    # rows from the same tournament share a "tournament_idx" hash, assigned in
    # data_utils.py:load_all_data()
    df = dataset_df

    # estimate the total number of required tokens
""" | |
total_toks = num_matches * avg_num_toks | |
= num_matches * (prompt + output) | |
= num_matches * ((inst + src + gen_a + gen_b) + output) | |
if "tournament": | |
num_matches = (n_participants - 1) * n_testset | |
elif "league": | |
num_matches = combination(n_participants, 2) * n_testset | |
""" | |
    # n_testset, n_models
    n_testset = len(df.tournament_idx.unique())
    n_models = len(df.model_id.unique())

    # num_matches (per test prompt)
    n_participants = n_models  # every model fields one output per test prompt
    if matching_method == "tournament":
        num_matches = n_participants - 1
    elif matching_method == "league":
        num_matches = n_participants * (n_participants - 1) // 2
    else:
        raise ValueError(
            f"{matching_method=} is undefined! Should be in [tournament, league]"
        )
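    # Illustrative numbers (not from the codebase): with 8 participants, a
    # single-elimination tournament needs 8 - 1 = 7 matches per test prompt,
    # while a full league (round robin) needs C(8, 2) = 8 * 7 // 2 = 28.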
    # load the prompt objects used below
    eval_task_2_prm = dict()
    tasks = df.task.unique().tolist()
    for task in tasks:
        eval_task_2_prm[f"{evalprompt}_{task}"] = load_prompt(evalprompt, task=task)

    # num_generated_tokens / model
    generateds = df.apply(
        lambda row: eval_task_2_prm[
            f"{evalprompt}_{row.task}"
        ].get_expected_max_tokens_w_room(
            model_name, room=1.01
        ),  # here, the prompt object builds its tokenizer from `model_name`
        axis=1,
    )
    # assert len(generateds) == n_testset * n_models, f"{len(generateds)=}, {n_testset=}, {n_models=}"
    gen_tokens = generateds.sum() / n_models
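    # generateds holds one judge-output estimate per (test prompt, model) row;
    # summing and dividing by n_models leaves the expected completion tokens
    # for a single pass over the test set.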
    # num_queried_tokens / model
    """
    We don't know which models' outputs will advance to the finalist matches,
    so we approximate: build every match prompt from a single model's output
    on both sides, then average token counts across models below.
    """
    df["approximate_match_prompts"] = df.apply(
        lambda row: eval_task_2_prm[f"{evalprompt}_{row.task}"].complete_prompt(
            inst=row.instruction,
            src=row.source,
            out_a=row.generated,
            out_b=row.generated,
            task=row.task,
        ),
        axis=1,
    )  # assumes every LLM's response appears equally often in matches (not realistic)
    query_tokens = (
        df.apply(
            lambda row: eval_task_2_prm[
                f"{evalprompt}_{row.task}"
            ].get_num_tokens_from_messages(row.approximate_match_prompts),
            axis=1,
        ).sum()
        / n_models
    )
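    # Same averaging as gen_tokens: query_tokens is the expected judge-prompt
    # token count for a single pass over the test set, averaged across models.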
    # to totals: query_tokens / gen_tokens already sum over the test set for a
    # single pass, so scale by matches per test prompt. Note (n_models - 1)
    # equals num_matches only for "tournament"; num_matches also covers "league".
    total_num_matches = n_testset * num_matches
    total_num_prompt_tokens = query_tokens * num_matches
    total_num_completion_tokens = gen_tokens * num_matches
    total_cost = cost_in_usd(
        model_name, total_num_prompt_tokens, total_num_completion_tokens
    )

    return (
        total_num_matches,
        int(total_num_prompt_tokens),
        int(total_num_completion_tokens),
        total_cost,
    )
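

# Usage sketch (illustrative, not from the codebase): `calculate` expects one
# row per (test prompt, model) with columns tournament_idx, model_id, task,
# instruction, source, generated. The judge model name and eval prompt below
# are hypothetical placeholders.
#
#   n_matches, prompt_toks, completion_toks, usd = calculate(
#       dataset_df=df,
#       model_name="gpt-4o-mini",
#       matching_method="tournament",
#       evalprompt="llmbar",
#   )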


def num_tokens_from_messages(messages, tokenizer):
    # OpenAI-cookbook-style estimate of prompt tokens for a chat message list
    if tokenizer is None:
        return 0
    tokens_per_message = 3  # per-message framing overhead
    tokens_per_name = 1  # extra token when a "name" field is present
    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        for key, value in message.items():
            num_tokens += len(tokenizer.encode(value))
            if key == "name":
                num_tokens += tokens_per_name
    num_tokens += 3  # every reply is primed with <|start|>assistant<|message|>
    return num_tokens
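

# Example (a sketch; assumes a tiktoken-style tokenizer with .encode()):
#   import tiktoken
#   tok = tiktoken.get_encoding("cl100k_base")
#   msgs = [{"role": "user", "content": "hello"}]
#   # 3 (message framing) + tokens("user") + tokens("hello") + 3 (reply primer)
#   num_tokens_from_messages(msgs, tok)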


def num_tokens_from_string(string, tokenizer):
    if tokenizer is None:
        return 0
    return len(tokenizer.encode(string))


def cost_in_usd(model_name, num_prompt_tokens, num_completion_tokens):
    # models missing from the pricing dictionary are treated as free
    if model_name not in pricing:
        return 0.0
    # pricing[...] holds USD per 1M tokens; charge input and output separately
    cost = (num_prompt_tokens / 1_000_000) * pricing[model_name]["input"] + (
        num_completion_tokens / 1_000_000
    ) * pricing[model_name]["output"]
    return cost
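

# Worked example of the arithmetic above (hypothetical rates, not real pricing):
# at $2.50 per 1M input tokens and $10.00 per 1M output tokens,
# 1_000_000 prompt tokens + 200_000 completion tokens cost
# (1_000_000 / 1_000_000) * 2.50 + (200_000 / 1_000_000) * 10.00 = $4.50.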