# LLMCalc / src/process_data.py
# carbonnnnn — "minor fixes" (commit 834ab51)
# raw · history · blame · 7.72 kB
import pandas as pd
import json
import os
import pycountry
import re
from src.collect_data import fetch_version_metadata, fetch_registry_data
import assets.text_content as tc
PRICING_PATH = os.path.join('assets', 'pricing.json')
def convert_parameters(param):
    """Convert a parameter-count value to a float in billions.

    Accepts values like '7B' (billions) or '1.8T' (trillions, scaled to
    billions, i.e. * 1000); suffix matching is case-insensitive. Returns
    None for missing/empty values or strings that cannot be parsed.
    """
    if pd.isna(param) or param == '':
        return None
    param = str(param).strip().upper()
    # Trillions are scaled to billions; 'B' values are already in billions.
    scale = 1000.0 if 'T' in param else 1.0
    try:
        return float(param.replace('T', '').replace('B', '')) * scale
    except ValueError:
        # Unparsable parameter strings are treated as missing.
        return None
def clean_price(price):
    """Normalize a price value to a float (USD per unit).

    Strips a leading '$' and thousands separators from strings; numeric
    inputs pass through unchanged. Returns None for missing/empty or
    unparsable values.
    """
    if pd.isna(price) or price == '':
        return None
    try:
        # str() guards against numeric entries in pricing.json, which would
        # otherwise crash on .replace().
        return float(str(price).replace('$', '').replace(',', ''))
    except ValueError:
        return None
def map_languages(languages):
    """Map language codes/names to canonical English language names.

    Accepts a comma-separated string, a list, or any iterable of language
    identifiers and returns a comma-joined string of resolved names.
    Returns None for missing (NaN) input; identifiers that cannot be
    resolved are passed through unchanged.
    """
    # A float NaN is pandas' marker for "no language info".
    if isinstance(languages, float) and pd.isna(languages):
        return None

    def get_language_name(lang):
        # Clean and standardize the language identifier.
        lang = str(lang).strip().lower()
        try:
            # First try as a two-letter code (en, fr, ...).
            language = pycountry.languages.get(alpha_2=lang)
            if not language:
                # Fall back to lookup by English name (English, French, ...).
                language = pycountry.languages.get(name=lang.capitalize())
            return language.name if language else lang
        except (AttributeError, LookupError):
            # Unknown identifier: pass it through unchanged.
            return lang

    # Normalize the input into a list of identifiers.
    if isinstance(languages, list):
        lang_list = languages
    elif isinstance(languages, str):
        lang_list = [l.strip() for l in languages.split(',')]
    else:
        try:
            lang_list = list(languages)
        except TypeError:
            # Non-iterable scalar: stringify it as-is.
            return str(languages)

    # Map every language and join into one display string.
    return ', '.join(get_language_name(lang) for lang in lang_list)
def get_multimodality_field(model_data, field):
    """Safely read a multimodality flag from a registry entry.

    Looks up model_data['model_config']['multimodality'][field] and
    returns False when any level is missing or is not a mapping.
    """
    try:
        return model_data.get('model_config', {}).get('multimodality', {}).get(field, False)
    except AttributeError:
        # An intermediate value is not a mapping (e.g. None or a scalar).
        return False
def clean_model_name(model_name: str) -> str:
    """Clean a model name by removing a temperature suffix pattern.

    Strips everything from a '-t<temp>--' marker onward, e.g.
    'gpt-4-t0.0--rest' -> 'gpt-4'. Accepts any numeric temperature
    (e.g. -t0.0--, -t0.75--, -t1.0--), not just single-digit decimals.
    Names without the marker are returned unchanged.
    """
    pattern = r'-t\d+(?:\.\d+)?--'
    return re.split(pattern, model_name)[0]
def merge_data():
    """Build the merged leaderboard DataFrame.

    Combines latency and clemscore results (text + multimodal runs),
    registry metadata, and pricing info into a single DataFrame sorted by
    clemscore (descending).

    Returns:
        pd.DataFrame: one row per model with averaged score/latency,
        registry metadata, pricing, a markdown license link, and
        multimodality flags.
    """
    mm_latency_df, mm_result_df, text_latency_df, text_result_df = fetch_version_metadata()
    registry_data = fetch_registry_data()

    with open(PRICING_PATH, 'r') as f:
        pricing_data = json.load(f)

    # Normalize result columns so both result frames share 'model'/'clemscore'.
    mm_result_df.rename(columns={tc.DEFAULT_MODEL_NAME: 'model', tc.DEFAULT_CLEMSCORE: 'clemscore'}, inplace=True)
    text_result_df.rename(columns={tc.DEFAULT_MODEL_NAME: 'model', tc.DEFAULT_CLEMSCORE: 'clemscore'}, inplace=True)

    # Strip temperature suffixes (e.g. '-t0.0--') so runs of the same model
    # group together.
    mm_result_df['model'] = mm_result_df['model'].apply(clean_model_name)
    text_result_df['model'] = text_result_df['model'].apply(clean_model_name)

    # Average latency and clemscore across the text and multimodal runs.
    avg_latency_df = pd.concat([mm_latency_df, text_latency_df], axis=0).groupby('model')['latency'].mean().reset_index()
    avg_clemscore_df = pd.concat([mm_result_df, text_result_df], axis=0).groupby('model')['clemscore'].mean().reset_index()

    # Outer merge keeps models that have only latency or only score data.
    lat_clem_df = pd.merge(avg_latency_df, avg_clemscore_df, on='model', how='outer')

    # Registry metadata (license, languages, multimodality, ...).
    registry_df = pd.DataFrame(registry_data)

    # Flatten nested license info into plain columns.
    registry_df['license_name'] = registry_df['license'].apply(lambda x: x['name'])
    registry_df['license_url'] = registry_df['license'].apply(lambda x: x['url'])

    # Flatten nested multimodality flags into top-level boolean columns.
    # (f=mm_field binds the loop variable at definition time.)
    for mm_field in ('single_image', 'multiple_images', 'audio', 'video'):
        registry_df[mm_field] = registry_df.apply(
            lambda x, f=mm_field: get_multimodality_field(x, f), axis=1
        )

    # Keep only the columns the leaderboard needs.
    registry_df = registry_df[[
        'model_name', 'parameters', 'release_date', 'open_weight',
        'languages', 'context_size', 'license_name', 'license_url',
        'single_image', 'multiple_images', 'audio', 'video'
    ]]

    # Inner merge: keep only models present in both the results and registry.
    merged_df = pd.merge(
        lat_clem_df,
        registry_df,
        left_on='model',
        right_on='model_name',
        how='inner'
    )

    # Rename to the display column names defined in assets.text_content.
    merged_df = merged_df.rename(columns={
        'model': tc.MODEL_NAME,
        'latency': tc.LATENCY,
        'clemscore': tc.CLEMSCORE,
        'parameters': tc.PARAMS,
        'release_date': tc.RELEASE_DATE,
        'open_weight': tc.OPEN_WEIGHT,
        'languages': tc.LANGS,
        'context_size': tc.CONTEXT,
        'license_name': tc.LICENSE_NAME,
        'license_url': tc.LICENSE_URL,
        'single_image': tc.SINGLE_IMG,
        'multiple_images': tc.MULT_IMG,
        'audio': tc.AUDIO,
        'video': tc.VIDEO
    })

    # Pricing: normalize '$'-prefixed strings to floats, then left-join so
    # models without pricing entries are kept.
    pricing_df = pd.DataFrame(pricing_data)
    pricing_df['input'] = pricing_df['input'].apply(clean_price)
    pricing_df['output'] = pricing_df['output'].apply(clean_price)

    merged_df = pd.merge(
        merged_df,
        pricing_df,
        left_on=tc.MODEL_NAME,  # use the constant, not the literal 'Model Name'
        right_on='model_id',
        how='left'
    )

    # Drop the duplicate join key and rename the price columns.
    merged_df = merged_df.drop('model_id', axis=1)
    merged_df = merged_df.rename(columns={
        'input': tc.INPUT,
        'output': tc.OUTPUT
    })

    # Models without pricing info are priced at 0.0.
    merged_df[tc.INPUT] = merged_df[tc.INPUT].fillna(0.0)
    merged_df[tc.OUTPUT] = merged_df[tc.OUTPUT].fillna(0.0)

    # Parameter counts only apply to open-weight models; commercial models
    # get None (presumably because their true sizes are undisclosed).
    merged_df[tc.PARAMS] = merged_df.apply(
        lambda row: None if not row[tc.OPEN_WEIGHT] else convert_parameters(row[tc.PARAMS]),
        axis=1
    )

    # Render the license as a markdown link for the UI.
    merged_df[tc.LICENSE] = merged_df.apply(
        lambda row: f'[{row[tc.LICENSE_NAME]}]({row[tc.LICENSE_URL]})', axis=1
    )

    # Keep a raw copy of the release date under the TEMP_DATE column.
    merged_df[tc.TEMP_DATE] = merged_df[tc.RELEASE_DATE]
    merged_df[tc.LANGS] = merged_df[tc.LANGS].apply(map_languages)

    # Leaderboard order: best clemscore first.
    merged_df = merged_df.sort_values(by=tc.CLEMSCORE, ascending=False)

    # 'model_name' duplicates the model column after the registry merge.
    merged_df.drop(columns=['model_name'], inplace=True)

    # Context sizes like '128k' keep their numeric part (units of k tokens);
    # anything non-numeric becomes 0.
    merged_df[tc.CONTEXT] = merged_df[tc.CONTEXT].astype(str).str.replace('k', '', regex=False)
    merged_df[tc.CONTEXT] = pd.to_numeric(merged_df[tc.CONTEXT], errors='coerce').fillna(0).astype(int)

    # Commercial models get a dummy parameter count equal to the largest
    # open-weight model, so size-dependent views still have a value —
    # NOTE(review): presumably for plotting; confirm against the UI code.
    max_params_value = merged_df.loc[merged_df[tc.OPEN_WEIGHT], tc.PARAMS].max()
    merged_df[tc.DUMMY_PARAMS] = merged_df.apply(
        lambda row: max_params_value if not row[tc.OPEN_WEIGHT] else row[tc.PARAMS],
        axis=1
    )

    return merged_df
if __name__ == '__main__':
    leaderboard_df = merge_data()
    # Persist the merged dataset for downstream consumption.
    csv_path = os.path.join('assets', 'merged_data.csv')
    leaderboard_df.to_csv(csv_path, index=False)