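# NOTE: "Spaces: Sleeping" appears to be a page-status banner captured together
# with this file; it is not part of the program.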
import html
import json
import os
import random

import pandas as pd
# Names of the evaluation dimensions. Each name is also the stem of the
# per-dimension JSONL files read and written below
# (all_dimensions/<name>.jsonl and temp/<name>.jsonl).
dimensions = [
    "Audience",
    "Keyword",
    "Format",
    "Language",
    "Length",
    "Source",
]
def make_clickable_model(model_name, link):
    """Return an HTML anchor that displays ``model_name`` and opens ``link`` in a new tab.

    Both values are HTML-escaped before being interpolated, because callers
    pass text taken from submitted JSON files. Names and URLs that contain no
    HTML-special characters produce exactly the same markup as before.

    Args:
        model_name: Text shown as the link label.
        link: Target URL placed in the ``href`` attribute. The URL scheme is
            not validated here.

    Returns:
        The ``<a ...>...</a>`` markup as a string.
    """
    # quote=False: quotes are harmless in element text, so leave them readable.
    safe_name = html.escape(str(model_name), quote=False)
    # quote=True: a raw '"' would terminate the href attribute early.
    safe_link = html.escape(str(link), quote=True)
    return f'<a target="_blank" style="text-decoration: underline" href="{safe_link}">{safe_name}</a>'
def rerank():
    """Re-sort every dimension file and renumber its ``Rank`` field.

    For each name in ``dimensions`` the file ``all_dimensions/<name>.jsonl`` is
    read, its records are ordered by ``WISE`` and then ``SICR`` (highest
    first), and the file is rewritten with a 1-based ``Rank`` as the first key
    of every record.
    """
    for name in dimensions:
        path = f"all_dimensions/{name}.jsonl"
        with open(path, "r") as src:
            records = [json.loads(raw) for raw in src]
        ranked = sorted(
            records,
            key=lambda rec: (rec["WISE"], rec["SICR"]),
            reverse=True,
        )
        with open(path, "w") as dst:
            for position, rec in enumerate(ranked, start=1):
                # Build a fresh dict so "Rank" is serialized first and any
                # rank the record carried before is replaced.
                row = {"Rank": position}
                row.update((k, v) for k, v in rec.items() if k != "Rank")
                dst.write(json.dumps(row) + "\n")
def generate_sample_data():
    """Append ten randomly scored placeholder models to every dimension file.

    For each name in ``dimensions``, records for ``Model_0`` .. ``Model_9`` are
    appended to ``all_dimensions/<name>.jsonl``. Every metric is a random
    value in [0, 1] rounded to two decimals. Existing file content is kept
    (append mode), and no ``Rank`` field is written.
    """
    metric_names = (
        "WISE",
        "SICR",
        "nDCG@10(Original)",
        "nDCG@10(Instructed)",
        "nDCG@10(Reversely Instructed)",
        "MRR@1(Original)",
        "MRR@1(Instructed)",
        "MRR@1(Reversely Instructed)",
    )
    model_names = [f"Model_{i}" for i in range(10)]
    for dimension in dimensions:
        # Open once per dimension instead of once per record.
        with open(f"all_dimensions/{dimension}.jsonl", "a") as f:
            for model_name in model_names:
                record = {
                    "Model": make_clickable_model(model_name, "https://huggingface.co/"),
                }
                for metric in metric_names:
                    record[metric] = round(random.uniform(0, 1), 2)
                f.write(json.dumps(record) + "\n")
def get_data(dimension):
    """Load ``all_dimensions/<dimension>.jsonl`` into a DataFrame.

    Args:
        dimension: File stem of the dimension to load.

    Returns:
        A ``pandas.DataFrame`` with one row per JSON line of the file.
    """
    path = f"all_dimensions/{dimension}.jsonl"
    with open(path, "r") as handle:
        records = list(map(json.loads, handle))
    return pd.DataFrame(records)
def get_submission_data():
    """Return the queued submissions as a DataFrame.

    The queue is read from ``temp/Audience.jsonl`` only; ``submit`` appends
    one record per dimension for every accepted model, so this single file
    lists every queued model.

    Returns:
        A ``pandas.DataFrame`` with one row per queued record, or an empty
        DataFrame when the queue file (or the ``temp`` directory) does not
        exist.
    """
    try:
        with open("temp/Audience.jsonl", "r") as f:
            records = [json.loads(line) for line in f]
    except FileNotFoundError:
        # Checking this exact file, rather than "any .jsonl in temp", avoids a
        # crash when only other dimension files are left in the queue.
        return pd.DataFrame()
    return pd.DataFrame(records)
def submit(json_file):
    """Validate a submission file and queue its scores for every dimension.

    Args:
        json_file: Path of the submitted JSON file.

    Returns:
        ``"Submission successful."`` after one record per dimension has been
        appended to ``temp/<dimension>.jsonl``; otherwise the rejection
        message produced by ``check_json_file``.
    """
    metric_names = (
        "WISE",
        "SICR",
        "nDCG@10(Original)",
        "nDCG@10(Instructed)",
        "nDCG@10(Reversely Instructed)",
        "MRR@1(Original)",
        "MRR@1(Instructed)",
        "MRR@1(Reversely Instructed)",
    )

    is_valid, message = check_json_file(json_file)
    if not is_valid:
        return message

    with open(json_file, "r") as f:
        data = json.load(f)

    # Neither "in_huggingface_hub" nor "Model Link" is verified by
    # check_json_file, so read them with .get() instead of raising KeyError
    # when a submission omits them.
    if data.get("in_huggingface_hub"):
        # NOTE(review): the link is the hub root, not a model-specific page.
        # Left as-is because refresh() matches entries by this exact string;
        # confirm whether a per-model URL was intended.
        model_name = make_clickable_model(data["Model"], "https://huggingface.co")
    elif data.get("Model Link"):
        model_name = make_clickable_model(data["Model"], data["Model Link"])
    else:
        model_name = data["Model"]

    # Build every record before touching the queue so that a lookup failure
    # cannot leave only some dimensions queued.
    all_dimension_data = data["dimensions"]
    records = {}
    for dimension in dimensions:
        scores = all_dimension_data[dimension]
        record = {"Model": model_name}
        record.update((metric, scores[metric]) for metric in metric_names)
        records[dimension] = record

    os.makedirs("temp", exist_ok=True)
    for dimension, record in records.items():
        # Append mode creates the file when it does not exist yet.
        with open(f"temp/{dimension}.jsonl", "a") as f:
            f.write(json.dumps(record) + "\n")
    return "Submission successful."
def refresh():
    """Merge queued submissions into the dimension files, then re-rank.

    For every dimension the records in ``temp/<dimension>.jsonl`` are merged
    into ``all_dimensions/<dimension>.jsonl``: a queued record replaces the
    first existing record with the same ``Model`` value, otherwise it is
    appended. The queue file is deleted once it has been merged. ``rerank`` is
    called after all dimensions have been processed. Nothing happens when
    ``temp`` contains no ``.jsonl`` file.
    """
    if is_empty("temp"):
        return
    for dimension in dimensions:
        queue_path = f"temp/{dimension}.jsonl"
        board_path = f"all_dimensions/{dimension}.jsonl"

        # Read the queued records for this dimension.
        try:
            with open(queue_path, "r") as f:
                queued = [json.loads(line) for line in f]
        except FileNotFoundError:
            # is_empty() only proves that *some* queue file exists; skip the
            # dimensions that have nothing queued instead of crashing.
            continue

        # Read the current records; a missing file means no records yet.
        try:
            with open(board_path, "r") as f:
                all_data = [json.loads(line) for line in f]
        except FileNotFoundError:
            all_data = []

        # Overwrite the entry of a model that is already present, else append.
        for entry in queued:
            match = next(
                (i for i, existing in enumerate(all_data)
                 if existing["Model"] == entry["Model"]),
                None,
            )
            if match is None:
                all_data.append(entry)
            else:
                all_data[match] = entry

        with open(board_path, "w") as f:
            for entry in all_data:
                f.write(json.dumps(entry) + "\n")
        # The queue file has been merged; remove it.
        os.remove(queue_path)
    rerank()
def check_json_file(json_file):
    """Check that a submission file is well-formed and not already queued.

    Args:
        json_file: Path of the submitted JSON file.

    Returns:
        A ``(ok, message)`` tuple. ``ok`` is True only when the file parses as
        a JSON object, names a model that is not in the submission queue, and
        provides every required metric for every dimension.
    """
    required_metrics = (
        "WISE",
        "SICR",
        "nDCG@10(Original)",
        "nDCG@10(Instructed)",
        "nDCG@10(Reversely Instructed)",
        "MRR@1(Original)",
        "MRR@1(Instructed)",
        "MRR@1(Reversely Instructed)",
    )

    with open(json_file, "r") as f:
        try:
            data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return False, "JSON file is not valid JSON."
    if not isinstance(data, dict):
        return False, "JSON file must contain a JSON object."

    # Without this check a missing "Model" key slipped through whenever the
    # queue was empty and only failed later, inside submit().
    model = data.get("Model")
    if not isinstance(model, str) or not model:
        return False, "JSON file does not contain a valid 'Model' key."

    # Reject a model that is already waiting in the queue. Queued names are
    # stored either verbatim or wrapped in an anchor, so compare against the
    # whole cell or against the anchor's label. A plain substring test would
    # also match the anchor markup itself and any longer model name.
    submission_queue_df = get_submission_data()
    if "Model" in submission_queue_df.columns:
        labels = {model, html.escape(model, quote=False)}
        for cell in submission_queue_df["Model"]:
            cell = str(cell)
            if any(cell == label or cell.endswith('">' + label + "</a>") for label in labels):
                return False, "Model already in submission queue."

    # The "dimensions" key must exist and cover every expected dimension.
    if "dimensions" not in data:
        return False, "JSON file does not contain 'dimensions' key."
    all_dimension_data = data["dimensions"]
    if not isinstance(all_dimension_data, dict) or not all(
        d in all_dimension_data for d in dimensions
    ):
        return False, "JSON file does not contain all dimensions."

    # Every dimension must provide all required metrics.
    for d in dimensions:
        each_dimension_data = all_dimension_data[d]
        if not isinstance(each_dimension_data, dict) or not all(
            k in each_dimension_data for k in required_metrics
        ):
            return False, f"Dimension '{d}' does not contain all required keys."
    return True, "JSON file is valid."
def is_empty(dir_path):
    """Return True when ``dir_path`` contains no ``.jsonl`` file.

    Only the names directly inside ``dir_path`` are inspected. A directory
    that does not exist is reported as empty rather than raising.

    Args:
        dir_path: Directory to inspect.

    Returns:
        True if no entry name ends with ``.jsonl``, False otherwise.
    """
    try:
        entries = os.listdir(dir_path)
    except FileNotFoundError:
        return True
    return not any(name.endswith(".jsonl") for name in entries)