Leaderboard / utils.py
Thun09's picture
Update space
c36f3d8
import os
import random
import json
import pandas as pd
# Evaluation dimensions of the leaderboard. Each name maps to one JSON-lines
# file: "all_dimensions/<name>.jsonl" for published results and
# "temp/<name>.jsonl" for submissions waiting to be merged.
dimensions = ['Audience', 'Keyword', 'Format', 'Language', 'Length', 'Source']
def make_clickable_model(model_name, link):
    """Return an HTML anchor that shows ``model_name`` and opens ``link`` in a new tab.

    Both values can originate from an uploaded submission file, so they are
    HTML-escaped before being interpolated. For values without HTML special
    characters the result is identical to plain interpolation.

    Args:
        model_name: Text displayed for the link.
        link: Target URL placed in the ``href`` attribute.

    Returns:
        The ``<a ...>...</a>`` markup as a string.
    """
    import html

    # quote=True also escapes " and ', which would otherwise end the attribute.
    safe_link = html.escape(str(link), quote=True)
    # Element content only needs &, < and > escaped.
    safe_name = html.escape(str(model_name), quote=False)
    return f'<a target="_blank" style="text-decoration: underline" href="{safe_link}">{safe_name}</a>'
def rerank():
    """Re-sort every dimension file by score and renumber its ranks.

    For each entry of ``dimensions`` the file
    ``all_dimensions/<dimension>.jsonl`` is read, its records are ordered by
    ``(WISE, SICR)`` from highest to lowest, and the file is rewritten with a
    1-based ``Rank`` as the first key of every record.
    """
    for name in dimensions:
        path = f"all_dimensions/{name}.jsonl"

        with open(path, "r") as source:
            records = [json.loads(raw) for raw in source]

        ranked = sorted(
            records,
            key=lambda rec: (rec["WISE"], rec["SICR"]),
            reverse=True,
        )

        with open(path, "w") as target:
            for position, rec in enumerate(ranked, start=1):
                # Start the row with Rank so it is serialized as the first
                # column; a stale "Rank" already stored in the record is
                # replaced by the freshly computed one.
                row = {"Rank": position}
                for key, value in rec.items():
                    if key != "Rank":
                        row[key] = value
                target.write(json.dumps(row) + "\n")
def generate_sample_data():
    """Append ten randomly scored placeholder models to every dimension file.

    For each entry of ``dimensions`` and each of ``Model_0`` .. ``Model_9`` one
    JSON line is appended to ``all_dimensions/<dimension>.jsonl``. Every metric
    is a uniform random value in [0, 1] rounded to two decimals.
    """
    metric_names = (
        "WISE",
        "SICR",
        "nDCG@10(Original)",
        "nDCG@10(Instructed)",
        "nDCG@10(Reversely Instructed)",
        "MRR@1(Original)",
        "MRR@1(Instructed)",
        "MRR@1(Reversely Instructed)",
    )
    sample_models = [f"Model_{index}" for index in range(10)]

    for dimension in dimensions:
        target = f"all_dimensions/{dimension}.jsonl"
        for name in sample_models:
            record = {"Model": make_clickable_model(name, "https://huggingface.co/")}
            # Metrics are drawn in the listed order, one random value each.
            for metric in metric_names:
                record[metric] = round(random.uniform(0, 1), 2)
            with open(target, "a") as out:
                out.write(json.dumps(record) + "\n")
def get_data(dimension):
    """Load the published leaderboard records of one dimension.

    Args:
        dimension: Dimension name; the file read is
            ``all_dimensions/<dimension>.jsonl``.

    Returns:
        A ``pandas.DataFrame`` with one row per JSON line, in file order.

    Raises:
        FileNotFoundError: If the dimension file does not exist.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(f"all_dimensions/{dimension}.jsonl", "r") as f:
        # Skip blank lines (e.g. a trailing empty line left by a manual edit);
        # json.loads would raise on them.
        records = [json.loads(line) for line in f if line.strip()]
    return pd.DataFrame(records)
def get_submission_data():
    """Return the queue of pending submissions as a DataFrame.

    The queue is read from ``temp/Audience.jsonl`` only. ``submit`` appends one
    line per dimension for every accepted submission, so this single file
    lists each queued model once.

    Returns:
        A ``pandas.DataFrame`` with one row per queued submission, or an empty
        DataFrame when there is no queue file.
    """
    queue_path = "temp/Audience.jsonl"
    # Test for the file that is actually read: the temp directory may be
    # missing, or may contain .jsonl files for other dimensions only.
    if not os.path.isfile(queue_path):
        return pd.DataFrame()
    with open(queue_path, "r") as f:
        # Blank lines carry no record and would make json.loads raise.
        records = [json.loads(line) for line in f if line.strip()]
    return pd.DataFrame(records)
def submit(json_file):
    """Validate a submission file and append it to the pending queue in ``temp/``.

    The file is first checked with ``check_json_file``. On success one JSON
    line per entry of ``dimensions`` is appended to ``temp/<dimension>.jsonl``.

    Args:
        json_file: Path of the uploaded JSON file.

    Returns:
        ``"Submission successful."`` when the submission was queued, otherwise
        a message describing why it was rejected.
    """
    flag, message = check_json_file(json_file)
    if not flag:
        return message

    with open(json_file, "r") as f:
        data = json.load(f)

    if "Model" not in data:
        return "JSON file does not contain 'Model' key."

    # "in_huggingface_hub" and "Model Link" are optional: use .get() so a
    # submission that omits them falls back to a plain-text model name instead
    # of raising KeyError.
    if data.get("in_huggingface_hub"):
        # NOTE(review): this links to the hub front page, not to the model's
        # own page -- confirm whether the model id was meant to be appended.
        model_name = make_clickable_model(data["Model"], "https://huggingface.co")
    elif data.get("Model Link"):
        model_name = make_clickable_model(data["Model"], data["Model Link"])
    else:
        model_name = data["Model"]

    metric_keys = (
        "WISE",
        "SICR",
        "nDCG@10(Original)",
        "nDCG@10(Instructed)",
        "nDCG@10(Reversely Instructed)",
        "MRR@1(Original)",
        "MRR@1(Instructed)",
        "MRR@1(Reversely Instructed)",
    )
    all_dimension_data = data["dimensions"]

    # Serialize every line before touching the queue, so an unexpected error
    # cannot leave the submission written for only some of the dimensions.
    pending_lines = []
    for dimension in dimensions:
        each_dimension_data = all_dimension_data[dimension]
        record = {"Model": model_name}
        for key in metric_keys:
            record[key] = each_dimension_data[key]
        pending_lines.append((dimension, json.dumps(record) + "\n"))

    # The queue directory may not exist yet; append mode creates the files.
    os.makedirs("temp", exist_ok=True)
    for dimension, line in pending_lines:
        with open(f"temp/{dimension}.jsonl", "a") as f:
            f.write(line)
    return "Submission successful."
def refresh():
    """Merge queued submissions from ``temp/`` into the leaderboard and re-rank.

    For every entry of ``dimensions`` the records in ``temp/<dimension>.jsonl``
    are merged into ``all_dimensions/<dimension>.jsonl``: a queued record
    replaces the first published record with the same ``Model`` value, and is
    appended when there is none. The processed temp file is then deleted.
    After all dimensions are merged, ``rerank`` is called.

    Does nothing when ``temp/`` is missing or holds no ``.jsonl`` files.
    """
    if not os.path.isdir("temp") or is_empty("temp"):
        return

    for dimension in dimensions:
        pending_path = f"temp/{dimension}.jsonl"
        board_path = f"all_dimensions/{dimension}.jsonl"

        # is_empty() only says that *some* .jsonl file is queued; this
        # particular dimension may have nothing pending.
        if not os.path.exists(pending_path):
            continue

        # Read the queued records for this dimension.
        with open(pending_path, "r") as f:
            data = [json.loads(line) for line in f if line.strip()]

        # Read the published records they are merged into.
        with open(board_path, "r") as f:
            all_data = [json.loads(line) for line in f if line.strip()]

        for d in data:
            for i, ad in enumerate(all_data):
                if ad["Model"] == d["Model"]:
                    # Same model already listed: overwrite it.
                    all_data[i] = d
                    break
            else:
                # No published record matched: this is a new model.
                all_data.append(d)

        with open(board_path, "w") as f:
            for d in all_data:
                f.write(json.dumps(d) + "\n")

        # The queue file has been merged; remove it.
        os.remove(pending_path)

    rerank()
def check_json_file(json_file):
    """Validate an uploaded submission file.

    Checks, in order: the file parses as a JSON object, it names a model, the
    model is not already in the submission queue, it has a ``dimensions``
    object covering every entry of ``dimensions``, and each dimension provides
    all required metrics with numeric ``WISE`` and ``SICR`` values.

    Args:
        json_file: Path of the JSON file to check.

    Returns:
        A ``(ok, message)`` tuple; ``ok`` is ``True`` only when every check
        passed, and ``message`` explains the first failed check otherwise.
    """
    import html
    import re

    required_metrics = (
        "WISE",
        "SICR",
        "nDCG@10(Original)",
        "nDCG@10(Instructed)",
        "nDCG@10(Reversely Instructed)",
        "MRR@1(Original)",
        "MRR@1(Instructed)",
        "MRR@1(Reversely Instructed)",
    )
    # These two are used as sort keys when the leaderboard is re-ranked, so a
    # non-numeric value would make the sort fail.
    ranking_metrics = ("WISE", "SICR")

    def _display_name(cell):
        """Return the model name of a queue cell, without a wrapping <a> tag."""
        text = str(cell)
        match = re.fullmatch(r"<a\b[^>]*>(.*)</a>", text, flags=re.DOTALL)
        return match.group(1) if match else text

    with open(json_file, "r") as f:
        try:
            data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return False, "JSON file is not valid JSON."

    if not isinstance(data, dict):
        return False, "JSON file must contain a JSON object."
    if not isinstance(data.get("Model"), str) or not data["Model"]:
        return False, "JSON file does not contain a valid 'Model' name."

    # Check whether the model is already waiting in the temp folder. The queue
    # stores the name either as plain text or wrapped in an HTML anchor, so
    # compare exact display names; a substring test against the stored markup
    # would also match names such as "a" or "href".
    submission_queue_df = get_submission_data()
    if "Model" in submission_queue_df.columns:
        for cell in submission_queue_df["Model"]:
            queued = _display_name(cell)
            # The stored name may or may not be HTML-escaped; accept both.
            if data["Model"] in (queued, html.unescape(queued)):
                return False, "Model already in submission queue."

    # Check that the "dimensions" key exists and covers every dimension
    # ('Audience', 'Keyword', 'Format', 'Language', 'Length', 'Source').
    if "dimensions" not in data:
        return False, "JSON file does not contain 'dimensions' key."
    all_dimension_data = data["dimensions"]
    if not isinstance(all_dimension_data, dict):
        return False, "JSON file does not contain all dimensions."
    if not all(d in all_dimension_data for d in dimensions):
        return False, "JSON file does not contain all dimensions."

    # Check that every dimension provides all required metrics.
    for d in dimensions:
        each_dimension_data = all_dimension_data[d]
        if not isinstance(each_dimension_data, dict) or not all(
            k in each_dimension_data for k in required_metrics
        ):
            return False, f"Dimension '{d}' does not contain all required keys."
        for key in ranking_metrics:
            value = each_dimension_data[key]
            # bool is a subclass of int but is not a meaningful score.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                return False, f"Dimension '{d}' has a non-numeric value for '{key}'."

    return True, "JSON file is valid."
def is_empty(dir_path):
    """Return True when ``dir_path`` contains no ``.jsonl`` files.

    Only names ending in ``.jsonl`` count; any other entries are ignored.
    A directory that does not exist is reported as empty instead of raising,
    since it cannot hold any queued files.

    Args:
        dir_path: Directory to inspect.

    Returns:
        ``True`` if no entry of the directory ends in ``.jsonl`` (or the
        directory is missing), ``False`` otherwise.
    """
    try:
        entries = os.listdir(dir_path)
    except FileNotFoundError:
        return True
    return not any(name.endswith(".jsonl") for name in entries)