VARCO_Arena / varco_arena / calc_cost.py
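"""Pre-run cost estimation for VARCO Arena.

Given an evaluation dataset, a judge model name, and a matching method
("tournament" or "league"), estimate the number of judge matches, the prompt and
completion token counts, and the resulting API cost in USD.
"""
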
from typing import Literal, Optional

import pandas as pd

from varco_arena_core.prompts import load_prompt
from varco_arena_core.tracking_utils import pricing


def calculate(
    dataset_df: Optional[pd.DataFrame] = None,
    model_name: Optional[str] = None,
    matching_method: Optional[Literal["tournament", "league"]] = None,
    evalprompt: Optional[str] = None,
):
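    """Estimate judge usage before running the arena.

    Returns a tuple:
    (total_num_matches, total_num_prompt_tokens, total_num_completion_tokens, total_cost_in_usd)
    """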
    # rows belonging to the same tournament share a "tournament_idx" value, assigned in data_utils.py:load_all_data()
    df = dataset_df
    # estimate the total number of required tokens
"""
total_toks = num_matches * avg_num_toks
= num_matches * (prompt + output)
= num_matches * ((inst + src + gen_a + gen_b) + output)
if "tournament":
num_matches = (n_participants - 1) * n_testset
elif "league":
num_matches = combination(n_participants, 2) * n_testset
"""
# n_testset, n_models
n_testset = len(df.tournament_idx.unique())
n_models = len(df.model_id.unique())
    # num_matches (per test prompt)
    n_participants = n_models
    if matching_method == "tournament":
        num_matches = n_participants - 1
    elif matching_method == "league":
        num_matches = n_participants * (n_participants - 1) // 2
    else:
        raise ValueError(
            f"{matching_method=} is undefined! Should be in [tournament, league]"
        )
    # load the prompt objects (one per task) that are used below
eval_task_2_prm = dict()
tasks = df.task.unique().tolist()
for task in tasks:
eval_task_2_prm[f"{evalprompt}_{task}"] = load_prompt(evalprompt, task=task)
# num_generated_tokens / model
generateds = df.apply(
lambda row: eval_task_2_prm[
f"{evalprompt}_{row.task}"
].get_expected_max_tokens_w_room(
model_name, room=1.01
        ),  # the prompt object picks its tokenizer based on `model_name`
axis=1,
)
# assert len(generateds) == n_testset * n_models, f"{len(generateds)=}, {n_testset=}, {n_models=}"
gen_tokens = generateds.sum() / n_models
# num_queried_tokens / model
"""
    we don't know which model's outputs will proceed to the later (finalist) matches,
    so we approximate: use the average number of tokens each model generates per prompt.
"""
df["approximate_match_prompts"] = df.apply(
lambda row: eval_task_2_prm[f"{evalprompt}_{row.task}"].complete_prompt(
inst=row.instruction,
src=row.source,
out_a=row.generated,
out_b=row.generated,
task=row.task,
),
axis=1,
    )  # assumes every LLM's response appears equally often (not strictly realistic)
query_tokens = (
df.apply(
lambda row: eval_task_2_prm[
f"{evalprompt}_{row.task}"
].get_num_tokens_from_messages(row.approximate_match_prompts),
axis=1,
).sum()
/ n_models
)
    # scale to totals: query_tokens / gen_tokens are per-match averages summed over the
    # test set, and each test prompt is played `num_matches` times
    total_num_matches = n_testset * num_matches
    total_num_prompt_tokens = query_tokens * num_matches
    total_num_completion_tokens = gen_tokens * num_matches
total_cost = cost_in_usd(
model_name, total_num_prompt_tokens, total_num_completion_tokens
)
return (
total_num_matches,
int(total_num_prompt_tokens),
int(total_num_completion_tokens),
total_cost,
)


def num_tokens_from_messages(messages, tokenizer):
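    """Approximate the token count of a chat-format message list: 3 overhead tokens
    per message, 1 extra per "name" field, plus 3 for the assistant reply priming.
    Returns 0 when no tokenizer is available."""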
if tokenizer is None:
return 0
tokens_per_message = 3
tokens_per_name = 1
num_tokens = 0
for message in messages:
num_tokens += tokens_per_message
for key, value in message.items():
num_tokens += len(tokenizer.encode(value))
if key == "name":
num_tokens += tokens_per_name
num_tokens += 3 # every reply is primed with <|start|>assistant<|message|>
return num_tokens


def num_tokens_from_string(string, tokenizer):
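    """Count tokens in a plain string; returns 0 when no tokenizer is available."""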
if tokenizer is None:
return 0
return len(tokenizer.encode(string))


def cost_in_usd(model_name, num_prompt_tokens, num_completion_tokens):
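    """Price prompt/completion tokens in USD using per-million-token rates from `pricing`."""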
# Check if the provided model is in the pricing dictionary
if model_name not in pricing:
return 0.0
# Calculate the cost in USD for input and output tokens separately
cost_in_usd = (num_prompt_tokens / 1_000_000) * pricing[model_name]["input"] + (
num_completion_tokens / 1_000_000
) * pricing[model_name]["output"]
return cost_in_usd
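

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module): it assumes a dataset
    # with the columns consumed by `calculate` above (tournament_idx, model_id, task,
    # instruction, source, generated). The model name, task name, and prompt preset
    # below are hypothetical placeholders; valid values depend on varco_arena_core.
    example_df = pd.DataFrame(
        {
            "tournament_idx": [0, 0, 1, 1],
            "model_id": ["model_a", "model_b", "model_a", "model_b"],
            "task": ["some_task"] * 4,
            "instruction": ["Summarize the text."] * 4,
            "source": ["A short input passage."] * 4,
            "generated": ["Summary A.", "Summary B.", "Summary A.", "Summary B."],
        }
    )
    n_matches, n_prompt_toks, n_completion_toks, usd = calculate(
        dataset_df=example_df,
        model_name="gpt-4o-mini",
        matching_method="tournament",
        evalprompt="llmbar",  # assumed prompt preset; see varco_arena_core.prompts.load_prompt
    )
    print(f"{n_matches=} {n_prompt_toks=} {n_completion_toks=} cost=${usd:.4f}")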