# Preprocess generated model results into per-model JSONL evaluation files.
import argparse
import json
import os
import random
import re
from collections import defaultdict
def list_directories(path): | |
# μ§μ λ κ²½λ‘μ μλ νλͺ©λ€μ 리μ€νΈλ‘ λ°μμ΄ | |
items = os.listdir(path) | |
# νλͺ©λ€ μ€μμ λλ ν 리(ν΄λ)λ§μ νν°λ§ | |
directories = [item for item in items if os.path.isdir(os.path.join(path, item))] | |
return directories | |
def parse_by_regex(string): | |
varco_template_w_src = r"μλλ μμ μ μ€λͺ νλ λͺ λ Ήμ΄μ μΆκ°μ λ§₯λ½μ μ 곡νλ μ λ ₯μ΄ μ§μ μ΄λ£¨λ μμ μ λλ€.\nμ£Όμ΄μ§ μ λ ₯μ λν΄ λͺ λ Ήμ΄λ₯Ό μ μ ν μννλ μλ΅μ μμ±νμΈμ.\n\n### μ λ ₯:\n(?P<source>.*?)\n\n### λͺ λ Ήμ΄:\n(?P<instruction>.*?)\n\n### μλ΅:\n" | |
varco_template_wo_src = r"μλλ μμ μ μ€λͺ νλ λͺ λ Ήμ΄μ λλ€.\nλͺ λ Ήμ΄μ λ°λ₯Έ μμ²μ μ μ ν μλ£νλ μλ΅μ μμ±νμΈμ.\n\n### λͺ λ Ήμ΄:\n(?P<instruction>.*?)\n\n### μλ΅:\n" | |
if re.compile(varco_template_w_src, flags=re.MULTILINE | re.DOTALL).match(string): | |
match = re.compile(varco_template_w_src, flags=re.MULTILINE | re.DOTALL).match( | |
string | |
) | |
source = match.group("source") | |
instruction = match.group("instruction") | |
elif re.compile(varco_template_wo_src, flags=re.MULTILINE | re.DOTALL).match( | |
string | |
): | |
match = re.compile(varco_template_wo_src, flags=re.MULTILINE | re.DOTALL).match( | |
string | |
) | |
source = "" | |
instruction = match.group("instruction") | |
else: | |
source = None | |
instruction = None | |
return source, instruction | |
# path μ μλ result.json νμΌ μ½μ΄μ μ μ²λ¦¬λ instanceλ€λ‘ λ§λ λ€. | |
def result_file_process(model, task, path): | |
with open(path, encoding="utf8") as f: | |
instances = json.loads(f.read()) | |
processed_instances = [] | |
for instance in instances: | |
raw = instance.get("input", False) | |
if raw: | |
source = instance["source"] | |
instruction = instance["instruction"] | |
else: | |
raw = instance.get("source", False) | |
source, instruction = parse_by_regex(instance.get("source", False)) | |
if source is None or instruction is None: | |
print(f"PARSING ERROR IN MODEL {model} TASK {task} PATH {path} SRC {raw}") | |
else: | |
processed_instances.append( | |
{ | |
"model_id": model, | |
"task": task, | |
"instruction": instruction.strip(), | |
"source": source.strip(), | |
"generated": instance["generated_result"], | |
} | |
) | |
return processed_instances | |
# model results λλ ν 리μμ κ²°κ³Όκ° λ³ν μμ | |
def transform_results_folder(input_path, output_path, model_name_pattern, num_instance): | |
regex_pattern = re.compile(model_name_pattern) | |
tasks = list_directories(input_path) | |
models = list_directories(os.path.join(input_path, tasks[0])) | |
models = [model for model in models if regex_pattern.match(model)] | |
model_results = {} | |
print(f"TASKS: {tasks}") | |
print(f"MODELS: {models}") | |
for task in tasks: | |
models = [ | |
model | |
for model in list_directories(os.path.join(input_path, task)) | |
if regex_pattern.match(model) | |
] | |
for model in models: | |
result_path = os.path.join(input_path, task, model, "result.json") | |
model_name = model | |
if task in model: | |
model_name = model.split(f"-{task}-")[0] | |
instances = result_file_process(model_name, task, result_path) | |
if model_name in model_results.keys(): | |
model_results[model_name] += instances | |
else: | |
model_results[model_name] = instances | |
print(f"{task} results processing is over..") | |
for k, v in model_results.items(): | |
print(f"# of instances in {k} is {len(v)}") | |
dataset_by_task = defaultdict(lambda: defaultdict(list)) | |
for data in ( | |
all_datasets := [obj for obj_list in model_results.values() for obj in obj_list] | |
): | |
dataset_by_task[data["task"]][ | |
f"{data['instruction']}\n\n{data['source']}" | |
].append(data) | |
new_results = {model: [] for model in {data["model_id"] for data in all_datasets}} | |
num_model = len(list(new_results.keys())) | |
for task in dataset_by_task.keys(): | |
candidates = [] | |
for data in dataset_by_task[task].values(): | |
if len(data) != num_model: | |
continue | |
candidates.append(data) | |
random.shuffle(candidates) | |
selected = candidates[:num_instance] | |
for data_list in selected: | |
for data in data_list: | |
new_results[data["model_id"]].append(data) | |
for model in new_results.keys(): | |
path = os.path.join(output_path, f"{model}.jsonl") | |
os.makedirs(os.path.dirname(path), exist_ok=True) | |
with open(path, "w", encoding="utf8") as f_out: | |
for instance in new_results[model]: | |
json.dump(instance, f_out, ensure_ascii=False) | |
f_out.write("\n") | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
"-i", "--input_path", type=str, help="path of generated result directory" | |
) | |
parser.add_argument( | |
"-o", "--output_path", type=str, help="path of processed result directory" | |
) | |
parser.add_argument( | |
"-m", | |
"--model_name_pattern", | |
type=str, | |
help="model name's pattern for regex", | |
default="", | |
) | |
parser.add_argument( | |
"-n", "--num_instance", type=int, help="number of instance to choice" | |
) | |
args = parser.parse_args() | |
transform_results_folder( | |
args.input_path, args.output_path, args.model_name_pattern, args.num_instance | |
) | |