|
import gradio as gr |
|
import os |
|
import json |
|
import pandas as pd |
|
import random |
|
import shutil |
|
import time |
|
import collections |
|
from functools import wraps |
|
from filelock import FileLock |
|
from datasets import load_dataset, Audio |
|
from huggingface_hub import HfApi, hf_hub_download |
|
from multiprocessing import TimeoutError |
|
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError |
|
|
|
|
|
# Load the English Turing-test split from the Hub. The audio column is cast
# with decode=False so rows carry the audio file's path (read below) rather
# than decoded samples.
dataset = load_dataset(
    "intersteller2887/Turing-test-dataset-en",
    split="train",
).cast_column("audio", Audio(decode=False))
|
|
|
|
|
# Local copy of the per-clip usage counter and its location inside the Hub repo.
COUNT_JSON_PATH = "/home/user/app/count.json"
COUNT_JSON_REPO_PATH = "submissions/count.json"

# Directory the dataset clips are copied into; created on import if missing.
target_audio_dir = "/home/user/app/audio"
os.makedirs(target_audio_dir, exist_ok=True)
|
|
|
|
|
# Stage every dataset clip inside target_audio_dir and remember where each
# one landed. Rows without a path, or whose file is missing on disk, are
# skipped; clips already present in the target directory are not re-copied.
local_audio_paths = []

for item in dataset:
    src_path = item["audio"]["path"]
    if not src_path or not os.path.exists(src_path):
        continue
    filename = os.path.basename(src_path)
    dst_path = os.path.join(target_audio_dir, filename)
    if not os.path.exists(dst_path):
        shutil.copy(src_path, dst_path)
    local_audio_paths.append(dst_path)

all_data_audio_paths = local_audio_paths

# The first staged clip doubles as the sample shown on the tutorial page.
# Fail with an explicit message instead of a bare IndexError when nothing
# could be staged.
if not local_audio_paths:
    raise RuntimeError(
        "No audio files could be staged from the dataset into "
        f"{target_audio_dir}; cannot pick a sample clip."
    )

sample1_audio_path = local_audio_paths[0]
print(sample1_audio_path)
|
|
|
|
|
|
|
|
|
|
|
# Rating rubric shown to participants: one entry per top-level dimension.
#   "title"            - dimension heading; also used as the key under which
#                        a participant's scores are stored.
#   "audio"            - clip played on the sample page (every dimension uses
#                        the same sample clip).
#   "sub_dims"         - slider labels. Each label starts with a name followed
#                        by ":"; the text before the first colon is what gets
#                        matched against SPECIAL_KEYWORDS.
#   "reference_scores" - reference answers for the sample page, read
#                        index-by-index alongside "sub_dims".
# The label strings are used verbatim as dictionary keys at runtime, so they
# must not be reworded casually.
DIMENSIONS_DATA = [
    {
        "title": "Semantic and Pragmatic Features",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Memory Consistency: Human-like: Consistent memory in short contexts, and asks for clarification when memory deviations occur; Machine-like: Inconsistent memory across contexts and unable to detect or correct errors (e.g., forgetting key information and insisting on incorrect answers)",
            "Logical Coherence: Human-like: Natural and smooth logic; Machine-like: Abrupt logical transitions or self-contradictions (e.g., suddenly changing topics without transition)",
            "Pronunciation Accuracy: Human-like: Correct and natural pronunciation of words, with proper usage of polyphonic characters based on context; Machine-like: Unnatural pronunciation errors, mispronunciation of common polyphonic characters",
            "Multilingual Mixing: Human-like: Multilingual mixing is often context-dependent (e.g., proper nouns, idiomatic expressions), with awkward or unnatural language switching; Machine-like: Rigid multilingual mixing without logical language switching",
            "Imprecision in Language: Human-like: Uses vague expressions like 'more or less', 'probably', and may self-correct (e.g., 'no, no'); Machine-like: Rarely uses vague expressions, responses are precise and affirmative",
            "Use of Fillers: Human-like: Frequently uses fillers (e.g., 'um', 'like') while thinking; Machine-like: Rare use of fillers or unnatural usage",
            "Metaphor and Pragmatic Intent: Human-like: Uses metaphor, irony, and euphemism to convey layered meanings; Machine-like: Literal and direct, lacking semantic diversity, only capable of surface-level interpretation"
        ],
        "reference_scores": [5, 5, 5, 0, 5, 5, 0]
    },
    {
        "title": "Non-Physiological Paralinguistic Features",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Rhythm: Human-like: Speaking rate varies with semantic flow, occasional pauses or hesitations; Machine-like: Almost no pauses or mechanical pauses",
            "Intonation: Human-like: Natural pitch rise or fall when expressing questions, surprise, or emphasis; Machine-like: Monotonous or overly regular pitch changes, inappropriate to the context",
            "Stress: Human-like: Consciously emphasizes key words to highlight focus; Machine-like: No emphasis on words or abnormal emphasis placement",
            "Auxiliary Vocalizations: Human-like: Produces context-appropriate non-verbal sounds, such as laughter or sighs; Machine-like: Contextually incorrect or mechanical auxiliary sounds, or completely absent"
        ],
        "reference_scores": [5, 5, 5, 5]
    },
    {
        "title": "Physiological Paralinguistic Features",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Micro-physiological Noise: Human-like: Presence of breathing sounds, saliva sounds, bubble noise, etc., naturally occurring during speech; Machine-like: Speech is overly clean or emits unnatural noises (e.g., electrical static)",
            "Instability in Pronunciation: Human-like: Some irregularities in pronunciation (e.g., liaison, tremolo, slurred speech, nasal sounds); Machine-like: Pronunciation is overly clear and regular",
            "Accent: Human-like: Natural regional accent or vocal traits; Machine-like: Stiff or unnatural accent"
        ],
        "reference_scores": [5, 4, 4]
    },
    {
        "title": "Mechanical Persona",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Sycophancy: Human-like: Judges whether to agree with requests or opinions based on context, doesn't always agree or echo; Machine-like: Frequently agrees, thanks, apologizes, excessively aligns with the other’s opinion, lacking genuine interaction",
            "Written-style Expression: Human-like: Conversational, flexible, and varied expression; Machine-like: Responses are well-structured and formal, overly formal wording, frequent listing, and vague word choice"
        ],
        "reference_scores": [5, 5]
    },
    {
        "title": "Emotional Expression",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Semantic Level: Human-like: Displays human-like emotional responses to contexts such as sadness or joy; Machine-like: Fails to respond emotionally to the other’s feelings, or uses vague and context-inappropriate emotional language",
            "Acoustic Level: Human-like: Pitch, volume, and rhythm dynamically change with emotion; Machine-like: Emotional tone is patterned or context-inappropriate"
        ],
        "reference_scores": [5, 5]
    }
]
|
|
|
|
|
# Lookup tables derived from DIMENSIONS_DATA.
THE_SUB_DIMS = [dim['sub_dims'] for dim in DIMENSIONS_DATA]
# Largest number of sliders any single dimension needs.
MAX_SUB_DIMS = max(len(dim['sub_dims']) for dim in DIMENSIONS_DATA)
DIMENSION_TITLES = [dim["title"] for dim in DIMENSIONS_DATA]

# Sub-dimension names whose test slider starts (and bottoms out) at 0 rather than 1.
SPECIAL_KEYWORDS = ["Multilingual Mixing", "Metaphor and Pragmatic Intent", "Auxiliary Vocalizations", "Accent"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""def load_or_initialize_count_json(audio_paths): |
|
try: |
|
# Only try downloading if file doesn't exist yet |
|
if not os.path.exists(COUNT_JSON_PATH): |
|
downloaded_path = hf_hub_download( |
|
repo_id="intersteller2887/Turing-test-dataset", |
|
repo_type="dataset", |
|
filename=COUNT_JSON_REPO_PATH, |
|
token=os.getenv("HF_TOKEN") |
|
) |
|
# Save it as COUNT_JSON_PATH so that the lock logic remains untouched |
|
with open(downloaded_path, "rb") as src, open(COUNT_JSON_PATH, "wb") as dst: |
|
dst.write(src.read()) |
|
except Exception as e: |
|
print(f"Could not download count.json from HuggingFace dataset: {e}") |
|
|
|
# Add filelock to /workspace/count.json |
|
lock_path = COUNT_JSON_PATH + ".lock" |
|
|
|
# Read of count.json will wait for 10 seconds until another thread involving releases it, and then add a lock to it |
|
with FileLock(lock_path, timeout=10): |
|
# If count.json exists: load into count_data |
|
# Else initialize count_data with orderedDict |
|
|
|
if os.path.exists(COUNT_JSON_PATH): |
|
with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: |
|
count_data = json.load(f, object_pairs_hook=collections.OrderedDict) |
|
else: |
|
count_data = collections.OrderedDict() |
|
|
|
updated = False |
|
sample_audio_files = {os.path.basename(d["audio"]) for d in DIMENSIONS_DATA} |
|
|
|
# Guarantee that the sample recording won't be take into the pool |
|
# Update newly updated recordings into count.json |
|
for path in audio_paths: |
|
filename = os.path.basename(path) |
|
if filename not in count_data: |
|
if filename in sample_audio_files: |
|
count_data[filename] = 999 |
|
else: |
|
count_data[filename] = 0 |
|
updated = True |
|
|
|
if updated or not os.path.exists(COUNT_JSON_PATH): |
|
with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: |
|
json.dump(count_data, f, indent=4, ensure_ascii=False) |
|
|
|
return count_data""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_or_initialize_count_json(audio_paths):
    """Make sure the local count.json exists and covers every clip.

    All work happens while holding the count.json file lock (10 s acquire
    timeout):

    1. If there is no local count.json, try to fetch the copy stored in the
       Hub dataset repo. A failed download is logged and otherwise ignored,
       so the function falls back to starting from an empty counter.
    2. Load the existing counts (or start empty) and add an entry for every
       path in ``audio_paths`` that is not tracked yet: 999 for clips used as
       sample audio in DIMENSIONS_DATA, 0 for everything else.
    3. Rewrite the file when entries were added or the file is still missing.

    Args:
        audio_paths: iterable of audio file paths; only the basename of each
            path is used as the counter key.

    Returns:
        None. The result is the updated file on disk.
    """
    lock_path = COUNT_JSON_PATH + ".lock"
    with FileLock(lock_path, timeout=10):
        if not os.path.exists(COUNT_JSON_PATH):
            try:
                downloaded_path = hf_hub_download(
                    repo_id="intersteller2887/Turing-test-dataset-en",
                    repo_type="dataset",
                    filename=COUNT_JSON_REPO_PATH,
                    token=os.getenv("HF_TOKEN")
                )
                with open(downloaded_path, "rb") as src, open(COUNT_JSON_PATH, "wb") as dst:
                    dst.write(src.read())
            except Exception as e:
                # Best effort: a missing remote file or network error must not
                # block the app, but it should leave a trace in the logs.
                print(f"Could not download count.json from HuggingFace dataset: {e}")

        if os.path.exists(COUNT_JSON_PATH):
            with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
                count_data = json.load(f, object_pairs_hook=collections.OrderedDict)
        else:
            count_data = collections.OrderedDict()

        updated = False
        sample_audio_files = {os.path.basename(d["audio"]) for d in DIMENSIONS_DATA}

        for path in audio_paths:
            filename = os.path.basename(path)
            if filename not in count_data:
                # 999 is far above the sampling threshold used by
                # sample_audio_paths (count < max_count), which keeps the
                # tutorial clip out of the question pool.
                count_data[filename] = 999 if filename in sample_audio_files else 0
                updated = True

        if updated or not os.path.exists(COUNT_JSON_PATH):
            with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f:
                json.dump(count_data, f, indent=4, ensure_ascii=False)

    return None
|
|
|
|
|
def append_cache_buster(audio_path):
    """Return ``audio_path`` with a ``?t=<current time in ms>`` query suffix."""
    millis = int(time.time() * 1000)
    return "{}?t={}".format(audio_path, millis)
|
|
|
|
|
|
|
|
|
"""def sample_audio_paths(audio_paths, count_data, k=5, max_count=1): # k for questions per test; max_count for question limit in total |
|
eligible_paths = [p for p in audio_paths if count_data.get(os.path.basename(p), 0) < max_count] |
|
|
|
if len(eligible_paths) < k: |
|
raise ValueError(f"可用音频数量不足(只剩 {len(eligible_paths)} 条 count<{max_count} 的音频),无法抽取 {k} 条") |
|
|
|
# Shuffule to avoid fixed selections resulted from directory structure |
|
selected = random.sample(eligible_paths, k) |
|
|
|
# Once sampled a test, update these questions immediately |
|
for path in selected: |
|
filename = os.path.basename(path) |
|
count_data[filename] = count_data.get(filename, 0) + 1 |
|
|
|
# Add filelock to /workspace/count.json |
|
lock_path = COUNT_JSON_PATH + ".lock" |
|
with FileLock(lock_path, timeout=10): |
|
with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: |
|
json.dump(count_data, f, indent=4, ensure_ascii=False) |
|
|
|
return selected, count_data""" |
|
|
|
|
|
def sample_audio_paths(audio_paths, k=5, max_count=1):
    """Pick ``k`` random clips that are still under the usage limit.

    The read-select-increment-write cycle on count.json runs under the
    count.json file lock (10 s acquire timeout).

    Args:
        audio_paths: candidate audio file paths.
        k: number of clips to draw.
        max_count: a clip is eligible while its stored count is below this.

    Returns:
        The list of selected paths; each one has had its count incremented
        in count.json.

    Raises:
        ValueError: fewer than ``k`` clips are still eligible.
    """
    with FileLock(COUNT_JSON_PATH + ".lock", timeout=10):
        with open(COUNT_JSON_PATH, "r", encoding="utf-8") as fh:
            counts = json.load(fh)

        candidates = []
        for candidate in audio_paths:
            if counts.get(os.path.basename(candidate), 0) < max_count:
                candidates.append(candidate)

        remaining = len(candidates)
        if remaining < k:
            raise ValueError(f"可用音频数量不足(只剩 {remaining} 条 count<{max_count} 的音频),无法抽取 {k} 条")

        picked = random.sample(candidates, k)

        # Reserve the clips immediately so other sessions cannot draw them.
        for chosen in picked:
            key = os.path.basename(chosen)
            counts[key] = counts.get(key, 0) + 1

        with open(COUNT_JSON_PATH, "w", encoding="utf-8") as fh:
            json.dump(counts, fh, indent=4, ensure_ascii=False)

    return picked
|
|
|
|
|
|
|
|
|
|
|
|
|
def start_challenge(user_data_state):
    """Draw this session's questions and switch to the next page.

    Refreshes count.json, samples five clips, and stores them (each with a
    placeholder description) under ``user_data_state["question_set"]``.

    Returns:
        (update hiding the first page component, update showing the second,
        the mutated ``user_data_state``).
    """
    load_or_initialize_count_json(all_data_audio_paths)
    chosen_clips = sample_audio_paths(all_data_audio_paths, k=5)

    questions = []
    for clip in chosen_clips:
        questions.append({"audio": clip, "desc": f"这是音频文件 {os.path.basename(clip)} 的描述"})
    user_data_state["question_set"] = questions

    hide_current = gr.update(visible=False)
    show_next = gr.update(visible=True)
    return hide_current, show_next, user_data_state
|
|
|
|
|
def toggle_education_other(choice):
    """Show and enable the free-text education box only for the "other" option.

    The box's value is reset to an empty string on every call.
    """
    wants_free_text = choice == "其他(请注明)"
    return gr.update(visible=wants_free_text, interactive=wants_free_text, value="")
|
|
|
|
|
def check_info_complete(username, age, gender, education, education_other, ai_experience):
    """Return an update that enables a component only when the form is complete.

    Every field must be truthy (username after stripping whitespace). When
    the "other" education option is chosen, the free-text field must also be
    non-blank.
    """
    ready = all((username.strip(), age, gender, education, ai_experience))
    if ready and education == "其他(请注明)":
        ready = bool(education_other.strip())
    return gr.update(interactive=ready)
|
|
|
|
|
def show_sample_page_and_init(username, age, gender, education, education_other, ai_experience, user_data):
    """Record the participant's details and open the sample page.

    ``user_data`` is updated in place. When the "other" education option was
    chosen, the free-text value is stored as the education.

    Returns:
        A list: [hide update, show update, user_data, first dimension title]
        followed by the updates from ``update_sample_view`` for that title.
    """
    if education == "其他(请注明)":
        final_edu = education_other
    else:
        final_edu = education

    user_data["username"] = username.strip()
    user_data["age"] = age
    user_data["gender"] = gender
    user_data["education"] = final_edu
    user_data["ai_experience"] = ai_experience

    opening_title = DIMENSION_TITLES[0]
    page_switch = [gr.update(visible=False), gr.update(visible=True), user_data, opening_title]
    return page_switch + update_sample_view(opening_title)
|
|
|
def update_sample_view(dimension_title):
    """Build the sample-page updates for the dimension named ``dimension_title``.

    Returns:
        A list of ``4 + 2 * MAX_SUB_DIMS`` updates: audio, interactive view
        (shown), reference view (hidden), reference button label, then the
        practice sliders (reset to 0) and the reference sliders (set to the
        dimension's reference scores). Slider slots beyond the dimension's
        sub-dimensions are hidden. An unknown title yields no-op updates.
    """
    dim_data = None
    for candidate in DIMENSIONS_DATA:
        if candidate["title"] == dimension_title:
            dim_data = candidate
            break

    if not dim_data:
        return [gr.update()] * 4 + [gr.update()] * (MAX_SUB_DIMS * 2)

    labels = dim_data['sub_dims']
    scores = dim_data.get("reference_scores", [])
    sample_slider_ups, ref_slider_ups = [], []

    for slot in range(MAX_SUB_DIMS):
        if slot >= len(labels):
            sample_slider_ups.append(gr.update(visible=False, value=0))
            ref_slider_ups.append(gr.update(visible=False, value=0))
            continue
        label = labels[slot]
        # A missing reference score falls back to 0.
        reference = scores[slot] if slot < len(scores) else 0
        sample_slider_ups.append(gr.update(visible=True, label=label, value=0))
        ref_slider_ups.append(gr.update(visible=True, label=label, value=reference))

    header = [
        gr.update(value=dim_data["audio"]),
        gr.update(visible=True),
        gr.update(visible=False),
        gr.update(value="参考"),
    ]
    return header + sample_slider_ups + ref_slider_ups
|
|
|
def update_test_dimension_view(d_idx, selections):
    """Build the test-page updates for dimension number ``d_idx``.

    Args:
        d_idx: index into DIMENSIONS_DATA.
        selections: mapping of dimension title -> {sub-dimension label: score}
            holding the scores already entered for the current question.

    Returns:
        A list: [progress text update, previous-button update,
        next-button update] followed by MAX_SUB_DIMS slider updates. Slider
        slots the dimension does not use are hidden.
    """
    dim_data = DIMENSIONS_DATA[d_idx]
    sub_dims = dim_data["sub_dims"]
    existing_scores = selections.get(dim_data['title'], {})
    progress_d = f"Dimension {d_idx + 1} / {len(DIMENSIONS_DATA)}: **{dim_data['title']}**"

    slider_updates = []
    for i in range(MAX_SUB_DIMS):
        if i >= len(sub_dims):
            slider_updates.append(gr.update(visible=False))
            continue

        desc = sub_dims[i]
        # The sub-dimension name is the text before the first colon; names in
        # SPECIAL_KEYWORDS may be scored 0, all others start at 1.
        name = desc.split(":")[0].strip()
        default_value = 0 if name in SPECIAL_KEYWORDS else 1
        value = existing_scores.get(desc, default_value)

        # Log while `desc` refers to the slider being configured and report
        # the value actually applied, not a stale label with a fixed 0.
        print(f"{desc} -> default value: {value}")

        slider_updates.append(gr.update(
            visible=True,
            label=desc,
            minimum=default_value,
            maximum=5,
            step=1,
            value=value,
            interactive=True,
        ))

    prev_btn_update = gr.update(interactive=(d_idx > 0))
    next_btn_update = gr.update(
        value="Proceed to Final Judgement" if d_idx == len(DIMENSIONS_DATA) - 1 else "Next Dimension",
        interactive=True
    )

    return [gr.update(value=progress_d), prev_btn_update, next_btn_update] + slider_updates
|
|
|
def init_test_question(user_data, q_idx):
    """Return the tuple of updates that opens question ``q_idx`` at dimension 0.

    The tuple holds four page-visibility updates, the question index, the
    dimension index (0), a fresh empty selections dict, the question progress
    text, the dimension progress text, the question's audio, the
    previous/next button updates, two further component resets
    (value=None and interactive=False), and finally the MAX_SUB_DIMS
    slider updates.
    """
    first_dim = 0
    current = user_data["question_set"][q_idx]
    total = len(user_data['question_set'])
    progress_q = f"第 {q_idx + 1} / {total} 题"

    dim_title_update, prev_btn_update, next_btn_update, *sliders = update_test_dimension_view(first_dim, {})

    fixed_part = (
        gr.update(visible=False),
        gr.update(visible=True),
        gr.update(visible=False),
        gr.update(visible=False),
        q_idx, first_dim, {},
        gr.update(value=progress_q),
        dim_title_update,
        gr.update(value=current['audio']),
        prev_btn_update,
        next_btn_update,
        gr.update(value=None),
        gr.update(interactive=False),
    )
    return fixed_part + tuple(sliders)
|
|
|
def navigate_dimensions(direction, q_idx, d_idx, selections, *slider_values):
    """Store the current dimension's slider scores and move to another one.

    The scores for dimension ``d_idx`` are written into ``selections`` (in
    place) under the dimension title. ``direction == "next"`` steps forward;
    any other value steps back. Stepping forward from the last dimension
    switches to the final-judgement view instead of rendering sliders.

    Returns:
        A tuple of 12 values/updates followed by MAX_SUB_DIMS slider updates.
    """
    dim = DIMENSIONS_DATA[d_idx]
    recorded = {}
    for pos, sub_dim in enumerate(dim['sub_dims']):
        recorded[sub_dim] = slider_values[pos]
    selections[dim['title']] = recorded

    going_forward = direction == "next"
    new_d_idx = d_idx + 1 if going_forward else d_idx - 1

    if going_forward and d_idx == len(DIMENSIONS_DATA) - 1:
        # Past the last dimension: hand over to the final-judgement view and
        # leave the sliders untouched.
        untouched_sliders = (gr.update(),) * MAX_SUB_DIMS
        return (
            gr.update(visible=False),
            gr.update(visible=True),
            q_idx, new_d_idx, selections,
            gr.update(),
            gr.update(),
            gr.update(),
            gr.update(interactive=True),
            gr.update(interactive=False),
            gr.update(interactive=False),
            gr.update(interactive=False),
        ) + untouched_sliders

    dim_title_update, prev_btn_update, next_btn_update, *sliders = update_test_dimension_view(new_d_idx, selections)
    return (
        gr.update(), gr.update(),
        q_idx, new_d_idx, selections,
        gr.update(),
        dim_title_update,
        gr.update(),
        gr.update(),
        gr.update(),
        prev_btn_update,
        next_btn_update,
    ) + tuple(sliders)
|
|
|
def toggle_reference_view(current):
    """Flip between the interactive and reference views.

    ``current`` is the button's present label: "参考" means the reference
    view should now be shown (and the label becomes "返回"); any other label
    switches back to the interactive view.

    Returns:
        (interactive-view update, reference-view update, button update).
    """
    to_reference = current == "参考"
    new_label = "返回" if to_reference else "参考"
    return (
        gr.update(visible=not to_reference),
        gr.update(visible=to_reference),
        gr.update(value=new_label),
    )
|
|
|
def back_to_welcome():
    """Return the updates that reset the app to its starting state.

    The first update makes a component visible, the next six hide theirs,
    and the remaining values reset state: empty dict, 0, 0, empty dict,
    empty list.
    """
    hidden = tuple(gr.update(visible=False) for _ in range(6))
    return (gr.update(visible=True), *hidden, {}, 0, 0, {}, [])
|
|
|
|
|
|
|
|
|
|
|
|
|
def retry_with_timeout(max_retries=3, timeout=10, backoff=1):
    """Decorator factory: run a function with a per-attempt timeout and retries.

    Each attempt runs the wrapped function in a single worker thread and
    waits at most ``timeout`` seconds for it. A timeout or any exception
    counts as a failed attempt; between attempts the wrapper sleeps
    ``backoff * attempt_number`` seconds.

    Args:
        max_retries: maximum number of attempts.
        timeout: seconds to wait for each attempt.
        backoff: base delay in seconds, multiplied by the attempt number.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result, or raises the last exception once all attempts have failed
        (a ``TimeoutError`` when the last attempt timed out).

    Note:
        A thread cannot be killed, so an attempt that timed out keeps running
        in the background; the wrapper simply stops waiting for it.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                # Do not use `with ThreadPoolExecutor(...)`: leaving the block
                # calls shutdown(wait=True), which would block until the
                # timed-out call finishes and defeat the timeout.
                executor = ThreadPoolExecutor(max_workers=1)
                try:
                    future = executor.submit(func, *args, **kwargs)
                    try:
                        return future.result(timeout=timeout)
                    except FutureTimeoutError:
                        future.cancel()
                        raise TimeoutError(f"Operation timed out after {timeout} seconds")
                except Exception as e:
                    last_exception = e
                    print(f"Attempt {attempt + 1} failed: {str(e)}")
                    if attempt < max_retries - 1:
                        time.sleep(backoff * (attempt + 1))
                finally:
                    executor.shutdown(wait=False)

            print(f"All {max_retries} attempts failed")
            if last_exception:
                raise last_exception
            raise Exception("Unknown error occurred")
        return wrapper
    return decorator
|
|
|
def save_with_retry(all_results, user_data):
    """Upload the results via ``save_all_results_to_file`` with a 30 s limit.

    The upload runs in a single worker thread. Despite the name, only one
    attempt is made.

    Returns:
        True when the upload finished within 30 seconds; False when it timed
        out or raised (the reason is printed).

    Note:
        A thread cannot be killed, so an upload that timed out may still
        complete in the background after this function has returned False.
    """
    # Do not use `with ThreadPoolExecutor(...)`: leaving the block calls
    # shutdown(wait=True), which would block until the upload finishes and
    # make the 30 s timeout ineffective.
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(save_all_results_to_file, all_results, user_data)
        try:
            future.result(timeout=30)
            return True
        except FutureTimeoutError:
            future.cancel()
            print("上传超时")
            return False
    except Exception as e:
        print(f"上传到Hub失败: {e}")
        return False
    finally:
        executor.shutdown(wait=False)
|
|
|
def save_locally_with_retry(data, filename, max_retries=3):
    """Write ``data`` as indented UTF-8 JSON to ``filename``, retrying on error.

    Args:
        data: JSON-serialisable object.
        filename: destination path.
        max_retries: maximum number of write attempts; one second is slept
            between failed attempts.

    Returns:
        True once a write succeeds, False when every attempt failed.
    """
    attempt = 0
    while attempt < max_retries:
        try:
            with open(filename, 'w', encoding='utf-8') as out:
                json.dump(data, out, indent=4, ensure_ascii=False)
        except Exception as err:
            print(f"本地保存尝试 {attempt + 1} 失败: {err}")
            # No point in sleeping after the final attempt.
            if attempt + 1 < max_retries:
                time.sleep(1)
        else:
            return True
        attempt += 1
    return False
|
|
|
def update_count_with_retry(count_data, question_set, max_retries=3):
    """Persist ``count_data`` to count.json under the file lock, with retries.

    For every question whose audio basename is tracked in ``count_data``, a
    count below 1 is set to 0 before the whole mapping is written out.

    Args:
        count_data: mapping of audio basename -> count; mutated in place.
        question_set: iterable of dicts with an ``'audio'`` path.
        max_retries: maximum number of attempts; one second is slept between
            failed attempts.

    Returns:
        True once the file has been written, False when every attempt failed.
    """
    lock_file = COUNT_JSON_PATH + ".lock"
    for attempt in range(max_retries):
        try:
            with FileLock(lock_file, timeout=10):
                for question in question_set:
                    key = os.path.basename(question['audio'])
                    if key not in count_data:
                        continue
                    # NOTE(review): this only changes negative counts (a 0
                    # stays 0) - confirm whether an increment was intended.
                    if count_data[key] < 1:
                        count_data[key] = 0

                with open(COUNT_JSON_PATH, 'w', encoding='utf-8') as out:
                    json.dump(count_data, out, indent=4, ensure_ascii=False)
            return True
        except Exception as e:
            print(f"Fail to update count.json {e} for {attempt + 1} time")
            if attempt + 1 < max_retries:
                time.sleep(1)
    return False
|
|
|
|
|
|
|
|
|
"""def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data): |
|
# selections["final_choice"] = final_choice |
|
|
|
cleaned_selections = {} |
|
for dim_title, sub_scores in selections.items(): |
|
# if dim_title == "final_choice": # 去掉if判断 |
|
cleaned_selections["final_choice"] = final_choice |
|
# continue |
|
cleaned_sub_scores = {} |
|
for sub_dim, score in sub_scores.items(): |
|
cleaned_sub_scores[sub_dim] = None if score == 0 else score |
|
cleaned_selections[dim_title] = cleaned_sub_scores |
|
|
|
final_question_result = { |
|
"question_id": q_idx, |
|
"audio_file": user_data["question_set"][q_idx]['audio'], |
|
"selections": cleaned_selections |
|
} |
|
|
|
all_results.append(final_question_result) |
|
|
|
q_idx += 1 |
|
|
|
# If q_idx hasn't reached the last one |
|
if q_idx < len(user_data["question_set"]): |
|
init_q_updates = init_test_question(user_data, q_idx) # Case 1: jam happens when initialize next question |
|
return init_q_updates + (all_results, gr.update(value="")) |
|
# If q_idx has reached the last one |
|
else: |
|
result_str = "### 测试全部完成!\n\n你的提交结果概览:\n" |
|
for res in all_results: |
|
# result_str += f"\n#### 题目: {res['audio_file']}\n" |
|
result_str += f"##### 最终判断: **{res['selections'].get('final_choice', '未选择')}**\n" |
|
for dim_title, dim_data in res['selections'].items(): |
|
if dim_title == 'final_choice': continue |
|
result_str += f"- **{dim_title}**:\n" |
|
for sub_dim, score in dim_data.items(): |
|
result_str += f" - *{sub_dim[:20]}...*: {score}/5\n" |
|
|
|
# save_all_results_to_file(all_results, user_data) |
|
# save_all_results_to_file(all_results, user_data, count_data=updated_count_data) |
|
save_all_results_to_file(all_results, user_data, count_data=user_data.get("updated_count_data")) |
|
|
|
return ( |
|
gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), |
|
q_idx, d_idx, {}, |
|
gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), |
|
gr.update(), gr.update(), |
|
) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str)""" |
|
|
|
|
|
def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data):
    """Record the answer to question ``q_idx`` and move on.

    The per-dimension scores in ``selections`` are copied with every 0
    replaced by None, stored together with ``final_choice``, and appended to
    ``all_results`` (in place). If more questions remain, the next one is
    initialised. Otherwise a Markdown summary is built, the results are
    uploaded (falling back to a local JSON file when the upload fails), and
    count.json is refreshed.

    Args:
        q_idx: index of the question being submitted.
        d_idx: current dimension index (passed through unchanged).
        selections: mapping of dimension title -> {sub-dimension label: score}.
        final_choice: the participant's overall judgement for this question.
        all_results: list of results collected so far.
        user_data: session dict; must contain ``"question_set"``.

    Returns:
        A tuple of 14 values/updates, MAX_SUB_DIMS slider updates, then
        ``all_results`` and the text for the result display. Any exception is
        caught and reported through that final text element, leaving the
        page state untouched.
    """
    try:
        # Seed with final_choice so it is recorded even when `selections` is
        # empty; as the first key it keeps its original position in the dict.
        cleaned_selections = {"final_choice": final_choice}
        for dim_title, sub_scores in selections.items():
            cleaned_selections[dim_title] = {
                sub_dim: (None if score == 0 else score)
                for sub_dim, score in sub_scores.items()
            }

        final_question_result = {
            "question_id": q_idx,
            "audio_file": user_data["question_set"][q_idx]['audio'],
            "selections": cleaned_selections
        }

        all_results.append(final_question_result)
        q_idx += 1

        if q_idx < len(user_data["question_set"]):
            init_q_updates = init_test_question(user_data, q_idx)
            return init_q_updates + (all_results, gr.update(value=""))

        # Last question answered: build the summary shown to the participant.
        result_str = "### 测试全部完成!\n\n你的提交结果概览:\n"
        for res in all_results:
            result_str += f"##### 最终判断: **{res['selections'].get('final_choice', '未选择')}**\n"
            for dim_title, dim_data in res['selections'].items():
                if dim_title == 'final_choice':
                    continue
                result_str += f"- **{dim_title}**:\n"
                for sub_dim, score in dim_data.items():
                    result_str += f" - *{sub_dim[:20]}...*: {score}/5\n"

        try:
            success = save_with_retry(all_results, user_data)
        except Exception as e:
            print(f"上传过程中发生错误: {e}")
            success = False

        if not success:
            # Upload failed: keep the submission in a local JSON file.
            username = user_data.get("username", "anonymous")
            timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
            local_filename = f"submission_{username}_{timestamp}.json"

            user_info_clean = {
                k: v for k, v in user_data.items() if k not in ["question_set"]
            }
            final_data_package = {
                "user_info": user_info_clean,
                "results": all_results
            }

            local_success = save_locally_with_retry(final_data_package, local_filename)

            if local_success:
                result_str += f"\n\n⚠️ 上传失败,结果已保存到本地文件: {local_filename}"
            else:
                result_str += "\n\n❌ 上传失败且无法保存到本地文件,请联系管理员"

        try:
            with FileLock(COUNT_JSON_PATH + ".lock", timeout=5):
                with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
                    count_data = json.load(f, object_pairs_hook=collections.OrderedDict)
            # update_count_with_retry takes the same lock path itself, so it
            # must run only after the read lock above has been released.
            count_update_success = update_count_with_retry(count_data, user_data["question_set"])
        except Exception as e:
            print(f"更新count.json失败: {e}")
            count_update_success = False

        if not count_update_success:
            result_str += "\n\n⚠️ 无法更新题目计数,请联系管理员"

        return (
            gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
            q_idx, d_idx, {},
            gr.update(), gr.update(), gr.update(), gr.update(), gr.update(),
            gr.update(), gr.update(),
        ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str)

    except Exception as e:
        print(f"提交过程中发生错误: {e}")

        error_msg = f"提交过程中发生错误: {str(e)}"
        return (
            gr.update(), gr.update(), gr.update(), gr.update(),
            q_idx, d_idx, selections,
            gr.update(), gr.update(), gr.update(), gr.update(), gr.update(),
            gr.update(), gr.update(),
        ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, error_msg)
|
|
|
"""def save_all_results_to_file(all_results, user_data, count_data=None): |
|
repo_id = "intersteller2887/Turing-test-dataset" |
|
username = user_data.get("username", "user") |
|
timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') |
|
submission_filename = f"submissions_{username}_{timestamp}.json" |
|
|
|
user_info_clean = { |
|
k: v for k, v in user_data.items() if k not in ["question_set", "updated_count_data"] |
|
} |
|
|
|
final_data_package = { |
|
"user_info": user_info_clean, |
|
"results": all_results |
|
} |
|
json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4) |
|
hf_token = os.getenv("HF_TOKEN") |
|
|
|
if not hf_token: |
|
print("HF_TOKEN not found. Cannot upload to the Hub.") |
|
return |
|
|
|
try: |
|
api = HfApi() |
|
|
|
# Upload submission file |
|
api.upload_file( |
|
path_or_fileobj=bytes(json_string, "utf-8"), |
|
path_in_repo=f"submissions/{submission_filename}", |
|
repo_id=repo_id, |
|
repo_type="dataset", |
|
token=hf_token, |
|
commit_message=f"Add new submission from {username}" |
|
) |
|
print(f"上传成功: {submission_filename}") |
|
|
|
if count_data: |
|
with FileLock(COUNT_JSON_PATH + ".lock", timeout=10): |
|
with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: |
|
json.dump(count_data, f, indent=4, ensure_ascii=False) |
|
|
|
api.upload_file( |
|
path_or_fileobj=COUNT_JSON_PATH, |
|
path_in_repo=COUNT_JSON_REPO_PATH, |
|
repo_id=repo_id, |
|
repo_type="dataset", |
|
token=hf_token, |
|
commit_message=f"Update count.json after submission by {username}" |
|
) |
|
|
|
except Exception as e: |
|
print(f"上传出错: {e}")""" |
|
|
|
def save_all_results_to_file(all_results, user_data):
    """Upload one finished submission to the Hub, then best-effort sync count.json.

    The submission (user info plus results) is serialized to JSON and uploaded
    to the ``intersteller2887/Turing-test-dataset-en`` dataset repo under
    ``submissions/``. Afterwards the local count.json is read under its file
    lock and uploaded to ``COUNT_JSON_REPO_PATH``; a failure in that second
    step is only printed, it never fails the submission.

    Args:
        all_results: JSON-serializable results collected during the test.
        user_data: Dict describing the participant. ``"username"`` (default
            ``"user"``) is used in the file name and commit message;
            ``"question_set"`` is excluded from the uploaded user info.

    Raises:
        RuntimeError: If the ``HF_TOKEN`` environment variable is not set.
        Exception: Anything raised while serializing or uploading the
            submission itself propagates to the caller.
    """
    # Fail fast: without a token nothing below can succeed.
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        raise RuntimeError("HF_TOKEN not found. Cannot upload to the Hub.")

    repo_id = "intersteller2887/Turing-test-dataset-en"

    # The username ends up inside a repo path. It is presumably the free-text
    # nickname typed into the UI (verify against the caller), so path
    # separators and non-printable characters are neutralized to keep the
    # file inside "submissions/". Ordinary names are left unchanged.
    raw_username = str(user_data.get("username", "user"))
    username = "".join(
        "_" if ch in "/\\" or not ch.isprintable() else ch
        for ch in raw_username
    ) or "user"

    timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
    submission_filename = f"submissions_{username}_{timestamp}.json"

    user_info_clean = {k: v for k, v in user_data.items() if k != "question_set"}
    final_data_package = {
        "user_info": user_info_clean,
        "results": all_results,
    }
    json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4)

    api = HfApi()

    # Primary upload: errors here are intentionally not caught.
    api.upload_file(
        path_or_fileobj=json_string.encode("utf-8"),
        path_in_repo=f"submissions/{submission_filename}",
        repo_id=repo_id,
        repo_type="dataset",
        token=hf_token,
        commit_message=f"Add new submission from {username}",
    )

    # Secondary upload: count.json sync is best-effort by design.
    try:
        # Hold the lock only while reading so other writers are not blocked
        # for the duration of the network call.
        with FileLock(COUNT_JSON_PATH + ".lock", timeout=5):
            with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
                count_data_str = f.read()

        api.upload_file(
            path_or_fileobj=count_data_str.encode("utf-8"),
            path_in_repo=COUNT_JSON_REPO_PATH,
            repo_id=repo_id,
            repo_type="dataset",
            token=hf_token,
            commit_message=f"Update count.json after submission by {username}",
        )
    except Exception as e:
        print(f"上传 count.json 失败: {e}")
|
|
|
|
|
|
|
|
|
# Gradio UI: page layout, per-session state, and event wiring for the test flow.
with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {max-width: 960px !important}") as demo:
    # ---- Per-session state ----
    user_data_state = gr.State({})
    current_question_index = gr.State(0)
    current_test_dimension_index = gr.State(0)
    current_question_selections = gr.State({})
    test_results = gr.State([])

    # ---- Page containers (only the welcome page starts visible) ----
    welcome_page = gr.Column(visible=True)
    info_page = gr.Column(visible=False)
    sample_page = gr.Column(visible=False)
    pretest_page = gr.Column(visible=False)
    test_page = gr.Column(visible=False)
    final_judgment_page = gr.Column(visible=False)
    result_page = gr.Column(visible=False)
    pages = {
        "welcome": welcome_page, "info": info_page, "sample": sample_page,
        "pretest": pretest_page, "test": test_page, "final_judgment": final_judgment_page,
        "result": result_page,
    }

    with welcome_page:
        gr.Markdown("# AI 识破者\n你将听到一系列对话,请判断哪个回应者是 AI。")
        start_btn = gr.Button("开始挑战", variant="primary")

    with info_page:
        gr.Markdown("## 请提供一些基本信息")
        username_input = gr.Textbox(label="用户名", placeholder="请输入你的昵称")
        age_input = gr.Radio(["18岁以下", "18-25岁", "26-35岁", "36-50岁", "50岁以上"], label="年龄")
        gender_input = gr.Radio(["男", "女", "其他"], label="性别")
        education_input = gr.Radio(["高中及以下", "本科", "硕士", "博士", "其他"], label="学历")
        education_other_input = gr.Textbox(label="请填写你的学历", visible=False, interactive=False)
        ai_experience_input = gr.Radio(["从未使用过", "偶尔接触(如看别人用)", "使用过几次,了解基本功能", "经常使用,有一定操作经验", "非常熟悉,深入使用过多个 AI 工具"], label="对 AI 工具的熟悉程度")
        submit_info_btn = gr.Button("提交并开始学习样例", variant="primary", interactive=False)

    with sample_page:
        gr.Markdown("## 样例分析\n请选择一个维度进行学习和打分练习。所有维度共用同一个样例音频。")
        sample_dimension_selector = gr.Radio(DIMENSION_TITLES, label="选择学习维度", value=DIMENSION_TITLES[0])
        with gr.Row():
            with gr.Column(scale=1):
                sample_audio = gr.Audio(label="样例音频", value=DIMENSIONS_DATA[0]["audio"])
            with gr.Column(scale=2):
                # Two stacked views; the event handlers switch between them.
                with gr.Column(visible=True) as interactive_view:
                    gr.Markdown("#### 请为以下特征打分 (0-5分。0-特征无体现;1-机器;3-特征无偏向;5-人类)")
                    sample_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True) for i in range(MAX_SUB_DIMS)]
                with gr.Column(visible=False) as reference_view:
                    gr.Markdown("### 参考答案解析")
                    reference_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=False) for i in range(MAX_SUB_DIMS)]
        with gr.Row():
            reference_btn = gr.Button("参考")
            go_to_pretest_btn = gr.Button("我明白了,开始测试", variant="primary")

    with pretest_page:
        gr.Markdown("## 测试说明\n"
                    "- 对于每一道题,你都需要对全部 **5 个维度** 进行评估。\n"
                    "- 在每个维度下,请为出现的每个特征 **从0到5打分**。\n"
                    "- **评分解释如下:**\n"
                    " - **0 分:特征未体现** (有些特征一定会体现,所以按1到5打分);\n"
                    " - **1 分:极度符合机器特征**;\n"
                    " - **2 分:较为符合机器特征**;\n"
                    " - **3 分:无明显人类或机器倾向**;\n"
                    " - **4 分:较为符合人类特征**;\n"
                    " - **5 分:极度符合人类特征**。\n"
                    "- 完成所有维度后,请根据整体印象对回应方的身份做出做出“人类”或“机器人”的 **最终判断**。\n"
                    "- 你可以使用“上一维度”和“下一维度”按钮在5个维度间自由切换和修改分数。\n"
                    "## 特别注意\n"
                    "- 我们希望您能判断每个维度上**回应者**的表现是**偏向人还是机器**,分数的大小反映回应者的语音类人的程度,而**不是**这个维度体现的程度多少\n(如读音正确也不代表是人类,读音错误也不代表是机器,您应当判断的是“听到的发音更偏向机器还是人类”)\n"
                    "- 即使您一开始就已经很肯定回应方的身份,同样应当**独立地**对每个维度上回应方的表现进行细致的评判。比如您很肯定回应方是机器,也需要独立地对每个维度判断,而非简单地将每个维度归为偏机器。")
        go_to_test_btn = gr.Button("开始测试", variant="primary")

    with test_page:
        gr.Markdown("## 正式测试")
        question_progress_text = gr.Markdown()
        test_dimension_title = gr.Markdown()
        test_audio = gr.Audio(label="测试音频")
        gr.Markdown("--- \n ### 请为对话中的回应者(非发起者)针对以下特征打分 (0-5分。0-特征无体现;1-机器;3-特征无偏向;5-人类)")

        test_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True, show_label=True) for i in range(MAX_SUB_DIMS)]

        with gr.Row():
            prev_dim_btn = gr.Button("上一维度")
            next_dim_btn = gr.Button("下一维度", variant="primary")

    with final_judgment_page:
        gr.Markdown("## 最终判断")
        gr.Markdown("您已完成对所有维度的评分。请根据您的综合印象,做出最终判断。")
        final_human_robot_radio = gr.Radio(["👤 人类", "🤖 机器人"], label="请判断回应者类型 (必填)")
        submit_final_answer_btn = gr.Button("提交本题答案", variant="primary", interactive=False)

    with result_page:
        gr.Markdown("## 测试完成")
        result_text = gr.Markdown()
        back_to_welcome_btn = gr.Button("返回主界面", variant="primary")

    # ---- Output lists shared by several handlers ----
    # The order of each list must match the tuple order returned by the
    # handler it is wired to.
    sample_init_outputs = [
        info_page, sample_page, user_data_state, sample_dimension_selector,
        sample_audio, interactive_view, reference_view, reference_btn
    ] + sample_sliders + reference_sliders

    test_init_outputs = [
        pretest_page, test_page, final_judgment_page, result_page,
        current_question_index, current_test_dimension_index, current_question_selections,
        question_progress_text, test_dimension_title, test_audio,
        prev_dim_btn, next_dim_btn,
        final_human_robot_radio, submit_final_answer_btn,
    ] + test_sliders

    nav_inputs = [current_question_index, current_test_dimension_index, current_question_selections] + test_sliders
    nav_outputs = [
        test_page, final_judgment_page,
        current_question_index, current_test_dimension_index, current_question_selections,
        question_progress_text, test_dimension_title, test_audio,
        final_human_robot_radio, submit_final_answer_btn,
        prev_dim_btn, next_dim_btn,
    ] + test_sliders

    full_outputs_with_results = test_init_outputs + [test_results, result_text]

    # ---- Event wiring ----
    start_btn.click(
        fn=start_challenge,
        inputs=[user_data_state],
        outputs=[welcome_page, info_page, user_data_state]
    )

    # Every field that check_info_complete reads must also trigger it.
    # username_input is included so that typing the nickname last still
    # re-evaluates the submit button instead of leaving it stale.
    info_inputs = [username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input]
    for comp in info_inputs:
        comp.change(
            fn=check_info_complete,
            inputs=info_inputs,
            outputs=submit_info_btn
        )

    education_input.change(fn=toggle_education_other, inputs=education_input, outputs=education_other_input)

    submit_info_btn.click(
        fn=show_sample_page_and_init,
        inputs=info_inputs + [user_data_state],
        outputs=sample_init_outputs
    )

    sample_dimension_selector.change(
        fn=update_sample_view,
        inputs=sample_dimension_selector,
        outputs=[sample_audio, interactive_view, reference_view, reference_btn] + sample_sliders + reference_sliders
    )

    reference_btn.click(
        fn=toggle_reference_view,
        inputs=reference_btn,
        outputs=[interactive_view, reference_view, reference_btn]
    )

    go_to_pretest_btn.click(lambda: (gr.update(visible=False), gr.update(visible=True)), outputs=[sample_page, pretest_page])

    # Start at question 0, reset the collected results, leave result_text as is.
    go_to_test_btn.click(
        fn=lambda user: init_test_question(user, 0) + ([], gr.update()),
        inputs=[user_data_state],
        outputs=full_outputs_with_results
    )

    prev_dim_btn.click(
        fn=lambda q, d, s, *sliders: navigate_dimensions("prev", q, d, s, *sliders),
        inputs=nav_inputs, outputs=nav_outputs
    )

    next_dim_btn.click(
        fn=lambda q, d, s, *sliders: navigate_dimensions("next", q, d, s, *sliders),
        inputs=nav_inputs, outputs=nav_outputs
    )

    # The final answer can only be submitted once a choice is selected.
    final_human_robot_radio.change(
        fn=lambda choice: gr.update(interactive=bool(choice)),
        inputs=final_human_robot_radio,
        outputs=submit_final_answer_btn
    )

    submit_final_answer_btn.click(
        fn=submit_question_and_advance,
        inputs=[current_question_index, current_test_dimension_index, current_question_selections, final_human_robot_radio, test_results, user_data_state],
        outputs=full_outputs_with_results
    )

    back_to_welcome_btn.click(fn=back_to_welcome, outputs=list(pages.values()) + [user_data_state, current_question_index, current_test_dimension_index, current_question_selections, test_results])
|
|
|
|
|
|
|
|
|
# Script entry point: prepare the local audio folder, warn about missing
# sample audio, then start the Gradio server.
if __name__ == "__main__":
    # exist_ok avoids the check-then-create race and matches how
    # target_audio_dir is created at the top of the file.
    os.makedirs("audio", exist_ok=True)

    if "SPACE_ID" in os.environ:
        print("Running in a Hugging Face Space, checking for audio files...")

    # NOTE(review): the original indentation was lost; this check is assumed
    # to run regardless of SPACE_ID. Confirm against the upstream file.
    # A set is used so each distinct path is checked only once.
    for audio_file in {d["audio"] for d in DIMENSIONS_DATA}:
        if not os.path.exists(audio_file):
            print(f"⚠️ Warning: Audio file not found: {audio_file}")

    demo.launch(debug=True)