Spaces:
Sleeping
Sleeping
import gradio as gr | |
import os | |
import json | |
import pandas as pd | |
import random | |
import shutil | |
import time | |
import collections | |
from functools import wraps | |
from filelock import FileLock | |
from datasets import load_dataset, Audio | |
from huggingface_hub import HfApi, hf_hub_download | |
from multiprocessing import TimeoutError | |
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError | |
# Load dataset from HuggingFace (executed once, at import time)
dataset = load_dataset("intersteller2887/Turing-test-dataset-en", split="train")
dataset = dataset.cast_column("audio", Audio(decode=False)) # Prevent calling 'torchcodec' from newer version of 'datasets'
# Huggingface space working directory: "/home/user/app"
target_audio_dir = "/home/user/app/audio"
os.makedirs(target_audio_dir, exist_ok=True)
# Local working copy of the per-recording usage counters
COUNT_JSON_PATH = "/home/user/app/count.json"
COUNT_JSON_REPO_PATH = "submissions/count.json" # Output directory (Huggingface dataset directory)
# Copy recordings to the working directory.
# Entries without a path, or whose path does not exist on disk, are skipped.
# NOTE(review): indentation was lost in this copy of the file; the nesting below
# (append for every existing source file, copy only when the target is missing)
# is the reconstructed reading - confirm against the original.
local_audio_paths = []
for item in dataset:
    src_path = item["audio"]["path"]
    if src_path and os.path.exists(src_path):
        filename = os.path.basename(src_path)
        dst_path = os.path.join(target_audio_dir, filename)
        if not os.path.exists(dst_path):
            shutil.copy(src_path, dst_path)
        local_audio_paths.append(dst_path)
# Alias (same list object) used by the sampling functions below
all_data_audio_paths = local_audio_paths
# Take first file of the datasets as sample.
# Raises IndexError at import time when no recording was copied.
sample1_audio_path = local_audio_paths[0]
print(sample1_audio_path)
# ==============================================================================
# Data Definition
# ==============================================================================
# One entry per evaluation dimension:
#   "title"            - display name of the dimension
#   "audio"            - path of the sample recording shown for this dimension
#   "sub_dims"         - slider labels, each "<name>: Machine-like: ...; Human-like: ..."
#   "reference_scores" - reference score for each sub-dimension, in the same order
DIMENSIONS_DATA = [
    {
        "title": "Semantic and Pragmatic Features",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Memory Consistency: Machine-like: Inconsistent memory across contexts and unable to detect or correct errors (e.g., forgetting key information and insisting on incorrect answers); Human-like: Consistent memory in short contexts, and asks for clarification when memory deviations occur",
            "Logical Coherence: Machine-like: Abrupt logical transitions or self-contradictions (e.g., suddenly changing topics without transition); Human-like: Natural and smooth logic",
            "Pronunciation Accuracy: Machine-like: Unnatural pronunciation errors, mispronunciation of heteronyms; Human-like: Correct and natural pronunciation of words, with proper usage of heteronyms based on context",
            "Code-switching: Machine-like: Rigid multilingual mixing without logical language switching; Human-like: Multilingual mixing is often context-dependent (e.g., proper nouns, idiomatic expressions), and the switching between languages is smooth",
            "Precision in Expression: Machine-like: Rarely uses vague expressions, responses are precise and affirmative; Human-like: Uses vague expressions like 'more or less', 'probably', and self-correct (e.g., 'no, no')",
            "Use of Fillers: Machine-like: Rare use of fillers or unnatural usage; Human-like: Frequently uses fillers (e.g., 'um', 'like') while thinking",
            "Metaphor and Pragmatic Intent: Machine-like: Literal and direct, lacking semantic diversity, only capable of surface-level interpretation; Human-like: Uses metaphor, irony, and euphemism to convey layered meanings"
        ],
        "reference_scores": [5, 5, 5, 0, 5, 5, 0]
    },
    {
        "title": "Non-Physiological Paralinguistic Features",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Rhythm: Machine-like: Almost no pauses or mechanical pauses; Human-like: Speaking rate varies with semantic flow, occasional pauses or hesitations",
            "Intonation: Machine-like: Monotonous or overly regular pitch changes, inappropriate to the context; Human-like: Natural pitch rise or fall when expressing questions, surprise, or emphasis",
            "Stress: Machine-like: No emphasis on words or abnormal emphasis placement; Human-like: Consciously emphasizes key words to highlight focus",
            "Auxiliary Vocalizations: Machine-like: Contextually incorrect or mechanical auxiliary sounds; Human-like: Produces context-appropriate non-verbal sounds, such as laughter or sighs"
        ],
        "reference_scores": [5, 5, 5, 5]
    },
    {
        "title": "Physiological Paralinguistic Features",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Micro-physiological Noise: Machine-like: Speech is overly clean or emits unnatural noises (e.g., electrical static); Human-like: Presence of breathing sounds, saliva sounds, bubble noise, etc., naturally occurring during speech",
            "Instability in Pronunciation: Machine-like: Pronunciation is overly clear and regular; Human-like: Some irregularities in pronunciation (e.g., liaison, tremolo, slurred speech, nasal sounds)",
            "Accent: Machine-like: Stiff or unnatural accent; Human-like: Natural regional accent or vocal traits"
        ],
        "reference_scores": [5, 4, 4]
    },
    {
        "title": "Mechanical Persona",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Sycophant Behavior: Machine-like: Frequently agrees, thanks, apologizes, excessively aligns with the other’s opinion, lacking genuine interaction; Human-like: Judges whether to agree with requests or opinions based on context, doesn't always agree or echo",
            "Written-style Expression: Machine-like: Responses are well-structured and formal, overly formal wording, frequent listing, and vague word choice; Human-like: Conversational, flexible, and varied expression"
        ],
        "reference_scores": [5, 5]
    },
    {
        "title": "Emotional Expression",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Semantic Level: Machine-like: Fails to respond emotionally to the other’s feelings, or uses vague and context-inappropriate emotional language; Human-like: Displays human-like emotional responses to contexts such as sadness or joy",
            "Acoustic Level: Machine-like: Emotional tone is patterned or context-inappropriate; Human-like: Pitch, volume, and rhythm dynamically change with emotion"
        ],
        "reference_scores": [5, 5]
    }
]
# Dimension titles in display order
DIMENSION_TITLES = [d["title"] for d in DIMENSIONS_DATA]
# Sub-dimension names whose test slider gets minimum/default 0 instead of 1
# (see update_test_dimension_view)
SPECIAL_KEYWORDS = ["Code-switching", "Metaphor and Pragmatic Intent", "Accent"]
# Largest number of sub-dimensions in any dimension; the view functions emit
# this many slider updates per slider group
MAX_SUB_DIMS = max(len(d['sub_dims']) for d in DIMENSIONS_DATA)
# Sub-dimension label lists, one list per dimension
THE_SUB_DIMS = [d['sub_dims'] for d in DIMENSIONS_DATA]
# ============================================================================== | |
# Backend Function Definitions | |
# ============================================================================== | |
# Legacy version (disabled; kept below as a string literal). It did not place the file reading inside the file lock, so concurrent reads could happen.
"""def load_or_initialize_count_json(audio_paths): | |
try: | |
# Only try downloading if file doesn't exist yet | |
if not os.path.exists(COUNT_JSON_PATH): | |
downloaded_path = hf_hub_download( | |
repo_id="intersteller2887/Turing-test-dataset", | |
repo_type="dataset", | |
filename=COUNT_JSON_REPO_PATH, | |
token=os.getenv("HF_TOKEN") | |
) | |
# Save it as COUNT_JSON_PATH so that the lock logic remains untouched | |
with open(downloaded_path, "rb") as src, open(COUNT_JSON_PATH, "wb") as dst: | |
dst.write(src.read()) | |
except Exception as e: | |
print(f"Could not download count.json from HuggingFace dataset: {e}") | |
# Add filelock to /workspace/count.json | |
lock_path = COUNT_JSON_PATH + ".lock" | |
# Read of count.json will wait for 10 seconds until another thread involving releases it, and then add a lock to it | |
with FileLock(lock_path, timeout=10): | |
# If count.json exists: load into count_data | |
# Else initialize count_data with orderedDict | |
if os.path.exists(COUNT_JSON_PATH): | |
with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: | |
count_data = json.load(f, object_pairs_hook=collections.OrderedDict) | |
else: | |
count_data = collections.OrderedDict() | |
updated = False | |
sample_audio_files = {os.path.basename(d["audio"]) for d in DIMENSIONS_DATA} | |
# Guarantee that the sample recording won't be take into the pool | |
# Update newly updated recordings into count.json | |
for path in audio_paths: | |
filename = os.path.basename(path) | |
if filename not in count_data: | |
if filename in sample_audio_files: | |
count_data[filename] = 999 | |
else: | |
count_data[filename] = 0 | |
updated = True | |
if updated or not os.path.exists(COUNT_JSON_PATH): | |
with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: | |
json.dump(count_data, f, indent=4, ensure_ascii=False) | |
return count_data""" | |
# Load or initialize count.json in the working directory.
# Called when a user starts a challenge. All file access happens under the
# file lock so that concurrent sessions do not interleave reads and writes.
def load_or_initialize_count_json(audio_paths):
    """Make sure a local count.json exists and has an entry for every recording.

    Steps, all performed while holding ``COUNT_JSON_PATH + ".lock"``:

    1. If the local file is missing, try to download ``COUNT_JSON_REPO_PATH``
       from the HuggingFace dataset repo and copy it to ``COUNT_JSON_PATH``.
       The download is best effort: a failure is logged and ignored.
    2. Load the local file if it exists, otherwise start from an empty mapping.
    3. Add every file name from ``audio_paths`` that is not in the mapping yet:
       sample recordings referenced by ``DIMENSIONS_DATA`` get 999, all other
       recordings get 0.
    4. Write the mapping back when something was added or the file is missing.

    Args:
        audio_paths: iterable of audio file paths; only the base name is used
            as the key in count.json.

    Returns:
        None. The result is the count.json file on disk.
    """
    lock_path = COUNT_JSON_PATH + ".lock"
    with FileLock(lock_path, timeout=10):
        # If count.json does not exist in the working directory, try to
        # download the latest one from the HuggingFace dataset.
        if not os.path.exists(COUNT_JSON_PATH):
            try:
                downloaded_path = hf_hub_download(
                    repo_id="intersteller2887/Turing-test-dataset-en",
                    repo_type="dataset",
                    filename=COUNT_JSON_REPO_PATH,
                    token=os.getenv("HF_TOKEN")
                )
                with open(downloaded_path, "rb") as src, open(COUNT_JSON_PATH, "wb") as dst:
                    dst.write(src.read())
            except Exception as e:
                # Still best effort (a fresh count.json is created below), but
                # log the reason instead of hiding it, e.g. a bad token.
                print(f"Could not download count.json from HuggingFace dataset: {e}")
        # Load the existing counters, or start empty when there is no
        # count.json in the working directory or in the dataset repo.
        if os.path.exists(COUNT_JSON_PATH):
            with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
                count_data = json.load(f, object_pairs_hook=collections.OrderedDict)
        else:
            count_data = collections.OrderedDict()
        updated = False
        sample_audio_files = {os.path.basename(d["audio"]) for d in DIMENSIONS_DATA}
        # Register newly added recordings. Sample recordings start at 999, which
        # keeps them above the max_count threshold used when sampling questions.
        for path in audio_paths:
            filename = os.path.basename(path)
            if filename not in count_data:
                count_data[filename] = 999 if filename in sample_audio_files else 0
                updated = True
        # Persist only when something changed or the file does not exist yet.
        if updated or not os.path.exists(COUNT_JSON_PATH):
            with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f:
                json.dump(count_data, f, indent=4, ensure_ascii=False)
    return
# Appends a millisecond timestamp query parameter to an audio path
def append_cache_buster(audio_path):
    """Return ``audio_path`` followed by ``?t=<current time in milliseconds>``."""
    millis = int(time.time() * 1000)
    return "{}?t={}".format(audio_path, millis)
# Function that samples questions from the available question set
# Legacy version (disabled; kept below as a string literal): it used a caller-supplied count_data to sample audio paths
"""def sample_audio_paths(audio_paths, count_data, k=5, max_count=1): # k for questions per test; max_count for question limit in total | |
eligible_paths = [p for p in audio_paths if count_data.get(os.path.basename(p), 0) < max_count] | |
if len(eligible_paths) < k: | |
raise ValueError(f"可用音频数量不足(只剩 {len(eligible_paths)} 条 count<{max_count} 的音频),无法抽取 {k} 条") | |
# Shuffule to avoid fixed selections resulted from directory structure | |
selected = random.sample(eligible_paths, k) | |
# Once sampled a test, update these questions immediately | |
for path in selected: | |
filename = os.path.basename(path) | |
count_data[filename] = count_data.get(filename, 0) + 1 | |
# Add filelock to /workspace/count.json | |
lock_path = COUNT_JSON_PATH + ".lock" | |
with FileLock(lock_path, timeout=10): | |
with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: | |
json.dump(count_data, f, indent=4, ensure_ascii=False) | |
return selected, count_data""" | |
# Sampling reads and writes count.json under one file lock so that the
# counters are updated consistently.
def sample_audio_paths(audio_paths, k=5, max_count=1):
    """Pick ``k`` random recordings whose usage count is below ``max_count``.

    While holding the count.json file lock, the counters are loaded, the
    eligible recordings are sampled, each selected recording's counter is
    incremented by one, and the counters are written back.

    Args:
        audio_paths: candidate audio file paths (keyed by base name in count.json).
        k: number of recordings to select.
        max_count: a recording is eligible while its counter is below this value.

    Returns:
        List of the ``k`` selected paths.

    Raises:
        ValueError: fewer than ``k`` recordings are eligible.
    """
    with FileLock(COUNT_JSON_PATH + ".lock", timeout=10):
        # Always work on the newest counters from disk
        with open(COUNT_JSON_PATH, "r", encoding="utf-8") as fh:
            counts = json.load(fh)
        candidates = []
        for candidate in audio_paths:
            if counts.get(os.path.basename(candidate), 0) < max_count:
                candidates.append(candidate)
        if len(candidates) < k:
            raise ValueError(f"可用音频数量不足(只剩 {len(candidates)} 条 count<{max_count} 的音频),无法抽取 {k} 条")
        picked = random.sample(candidates, k)
        # Reserve the sampled recordings immediately
        for chosen in picked:
            key = os.path.basename(chosen)
            counts[key] = counts.get(key, 0) + 1
        with open(COUNT_JSON_PATH, "w", encoding="utf-8") as fh:
            json.dump(counts, fh, indent=4, ensure_ascii=False)
    # Only the selection is returned; the counters live in count.json
    return picked
# ============================================================================== | |
# Frontend Function Definitions | |
# ============================================================================== | |
# The question set is stored in the per-user state, not in a shared global
def start_challenge(user_data_state):
    """Prepare count.json, sample five recordings and store them as the user's question set.

    Args:
        user_data_state: per-user state dict; receives the key "question_set".

    Returns:
        Tuple of (update with visible=False, update with visible=True,
        user_data_state).
    """
    load_or_initialize_count_json(all_data_audio_paths)
    chosen_paths = sample_audio_paths(all_data_audio_paths, k=5)
    questions = []
    for audio_path in chosen_paths:
        questions.append({
            "audio": audio_path,
            "desc": f"这是音频文件 {os.path.basename(audio_path)} 的描述",
        })
    # The counters themselves are not kept in the user data
    user_data_state["question_set"] = questions
    hide_update = gr.update(visible=False)
    show_update = gr.update(visible=True)
    return hide_update, show_update, user_data_state
# Shows and enables the free-text education field only for the "other" choice
def toggle_education_other(choice):
    """Return an update that shows, enables and clears the "other" education input.

    The field is visible and interactive only when ``choice`` equals the
    "其他(请注明)" option; its value is reset to an empty string either way.
    """
    show_field = choice == "其他(请注明)"
    field_state = {"visible": show_field, "interactive": show_field, "value": ""}
    return gr.update(**field_state)
# Enables the submit control only when the user information is complete
def check_info_complete(username, age, gender, education, education_other, ai_experience):
    """Return an update whose ``interactive`` flag tells whether the form is complete.

    The form is complete when the stripped username, age, gender, education
    and ai_experience are all truthy and, if the "其他(请注明)" education option
    is chosen, the stripped ``education_other`` text is non-empty.
    """
    ready = bool(username.strip() and age and gender and education and ai_experience)
    # The free-text field is only inspected when the "other" option is selected
    if ready and education == "其他(请注明)":
        ready = bool(education_other.strip())
    return gr.update(interactive=ready)
# Stores the submitted user information and prepares the sample page
def show_sample_page_and_init(username, age, gender, education, education_other, ai_experience, user_data):
    """Record the user's details in ``user_data`` and build the sample-page updates.

    When the "其他(请注明)" education option is selected, the free-text value is
    stored as the education instead.

    Returns:
        List of [update with visible=False, update with visible=True,
        user_data, first dimension title] followed by the updates from
        ``update_sample_view`` for the first dimension.
    """
    if education == "其他(请注明)":
        resolved_education = education_other
    else:
        resolved_education = education
    user_data.update(
        username=username.strip(),
        age=age,
        gender=gender,
        education=resolved_education,
        ai_experience=ai_experience,
    )
    opening_title = DIMENSION_TITLES[0]
    sample_updates = update_sample_view(opening_title)
    page_switch = [gr.update(visible=False), gr.update(visible=True), user_data, opening_title]
    return page_switch + sample_updates
def update_sample_view(dimension_title):
    """Build the sample-page updates for the dimension named ``dimension_title``.

    Returns a list of ``4 + 2 * MAX_SUB_DIMS`` updates: audio, interactive
    view (shown), reference view (hidden), reference button (label
    "Reference"), then the practice sliders, then the reference sliders.
    Sliders beyond the dimension's sub-dimensions are hidden. Practice sliders
    start at 0; reference sliders show the dimension's reference score (0 when
    no score is listed). An unknown title yields only no-op updates.
    """
    matched = None
    for entry in DIMENSIONS_DATA:
        if entry["title"] == dimension_title:
            matched = entry
            break
    if not matched:
        # Unknown title: leave every output untouched
        return [gr.update()] * 4 + [gr.update()] * (MAX_SUB_DIMS * 2)

    labels = matched['sub_dims']
    reference_scores = matched.get("reference_scores", [])
    practice_sliders = []
    reference_sliders = []
    for slot in range(MAX_SUB_DIMS):
        if slot >= len(labels):
            # Unused slider slot for this dimension
            practice_sliders.append(gr.update(visible=False, value=0))
            reference_sliders.append(gr.update(visible=False, value=0))
            continue
        label = labels[slot]
        reference_value = reference_scores[slot] if slot < len(reference_scores) else 0
        practice_sliders.append(gr.update(visible=True, label=label, value=0))
        reference_sliders.append(gr.update(visible=True, label=label, value=reference_value))

    header = [
        gr.update(value=matched["audio"]),
        gr.update(visible=True),
        gr.update(visible=False),
        gr.update(value="Reference"),
    ]
    return header + practice_sliders + reference_sliders
def update_test_dimension_view(d_idx, selections):
    """Build the test-page updates for dimension ``d_idx``.

    Args:
        d_idx: index into ``DIMENSIONS_DATA``.
        selections: mapping of dimension title -> {sub-dimension label: score}
            holding scores already given for the current question.

    Returns:
        List of [progress text update, previous-button update, next-button
        update] followed by ``MAX_SUB_DIMS`` slider updates. Each used slider
        ranges from its minimum to 5 in steps of 1; the minimum (also the
        default value) is 0 for names in ``SPECIAL_KEYWORDS`` and 1 otherwise.
        Unused slider slots are hidden.
    """
    dimension = DIMENSIONS_DATA[d_idx]
    labels = dimension["sub_dims"]
    title = dimension["title"]
    saved_scores = selections.get(title, {})
    progress_text = f"Dimension {d_idx + 1} / {len(DIMENSIONS_DATA)}: **{title}**"

    def slider_for(slot):
        # Slots beyond this dimension's sub-dimensions are hidden
        if slot >= len(labels):
            return gr.update(visible=False)
        desc = labels[slot]
        # The sub-dimension name is the text before the first colon
        floor = 0 if desc.split(":")[0].strip() in SPECIAL_KEYWORDS else 1
        return gr.update(
            visible=True,
            label=desc,
            minimum=floor,
            maximum=5,
            step=1,
            value=saved_scores.get(desc, floor),
            interactive=True,
        )

    sliders = [slider_for(slot) for slot in range(MAX_SUB_DIMS)]
    on_last_dimension = d_idx == len(DIMENSIONS_DATA) - 1
    prev_update = gr.update(interactive=(d_idx > 0))
    next_update = gr.update(
        value="Proceed to Final Judgement" if on_last_dimension else "Next Dimension",
        interactive=True
    )
    return [gr.update(value=progress_text), prev_update, next_update] + sliders
def init_test_question(user_data, q_idx):
    """Build the updates that show question ``q_idx`` at its first dimension.

    Returns a tuple of 14 values followed by ``MAX_SUB_DIMS`` slider updates:
    four visibility updates (False, True, False, False), the question index,
    dimension index 0, an empty selections dict, the question progress text,
    the dimension progress update, the audio update, the previous/next button
    updates, an update clearing a value to None, and an update setting
    ``interactive=False``. Which components receive them is wired outside
    this function.
    """
    first_dim = 0
    questions = user_data["question_set"]
    current = questions[q_idx]
    progress_text = f"Question {q_idx + 1} / {len(questions)}"
    dim_title_update, prev_btn_update, next_btn_update, *slider_updates = (
        update_test_dimension_view(first_dim, {})
    )
    fixed_part = (
        gr.update(visible=False),
        gr.update(visible=True),
        gr.update(visible=False),
        gr.update(visible=False),
        q_idx, first_dim, {},
        gr.update(value=progress_text),
        dim_title_update,
        gr.update(value=current['audio']),
        prev_btn_update,
        next_btn_update,
        gr.update(value=None),  # None (not "") so that the radio selection is cleared
        gr.update(interactive=False),
    )
    return fixed_part + tuple(slider_updates)
def navigate_dimensions(direction, q_idx, d_idx, selections, *slider_values):
    """Store the current dimension's slider scores and move to the next/previous one.

    The scores are saved into ``selections`` under the current dimension's
    title (``selections`` is modified in place). ``direction == "next"`` moves
    forward; any other value moves backward. Moving forward from the last
    dimension switches the page visibility instead of showing another
    dimension.

    Returns:
        Tuple of 12 values followed by ``MAX_SUB_DIMS`` slider updates. Both
        branches keep the same positions for the question index, the new
        dimension index and ``selections``.
    """
    dimension = DIMENSIONS_DATA[d_idx]
    selections[dimension['title']] = {
        label: slider_values[pos] for pos, label in enumerate(dimension['sub_dims'])
    }
    moving_forward = direction == "next"
    target_idx = d_idx + 1 if moving_forward else d_idx - 1

    if not (moving_forward and d_idx == len(DIMENSIONS_DATA) - 1):
        # Ordinary step: refresh progress text, buttons and sliders
        progress_update, prev_update, next_update, *slider_updates = (
            update_test_dimension_view(target_idx, selections)
        )
        return (
            gr.update(), gr.update(),
            q_idx, target_idx, selections,
            gr.update(),
            progress_update,
            gr.update(),
            gr.update(),
            gr.update(),
            prev_update,
            next_update,
        ) + tuple(slider_updates)

    # Leaving the last dimension: swap page visibility, sliders untouched
    return (
        gr.update(visible=False),
        gr.update(visible=True),
        q_idx, target_idx, selections,
        gr.update(),
        gr.update(),
        gr.update(),
        gr.update(interactive=True),
        gr.update(interactive=False),
        gr.update(interactive=False),
        gr.update(interactive=False),
    ) + (gr.update(),) * MAX_SUB_DIMS
def toggle_reference_view(current):
    """Switch between two views based on the button label ``current``.

    With label "Reference" the first view is hidden, the second is shown and
    the label becomes "Back"; with any other label the opposite happens and
    the label becomes "Reference".
    """
    show_reference = current == "Reference"
    next_label = "Back" if show_reference else "Reference"
    return (
        gr.update(visible=not show_reference),
        gr.update(visible=show_reference),
        gr.update(value=next_label),
    )
def back_to_welcome():
    """Return the updates that show the welcome page and reset all state.

    Order: welcome page (visible), then info, sample, pretest, test, final
    judgment and result pages (all hidden), then fresh values for
    user_data_state ({}), current_question_index (0),
    current_test_dimension_index (0), current_question_selections ({}) and
    test_results ([]).
    """
    hidden_pages = tuple(gr.update(visible=False) for _ in range(6))
    fresh_state = ({}, 0, 0, {}, [])
    return (gr.update(visible=True),) + hidden_pages + fresh_state
# ============================================================================== | |
# Retry Function Definitions | |
# ============================================================================== | |
# Decorator factory: run a function in a worker thread with a per-attempt
# timeout and retry it on failure
def retry_with_timeout(max_retries=3, timeout=10, backoff=1):
    """Return a decorator that retries the wrapped call with a timeout.

    Each attempt runs the function in a single-worker thread pool and waits at
    most ``timeout`` seconds for the result. A failed or timed-out attempt is
    logged and, unless it was the last one, followed by a sleep of
    ``backoff * attempt_number`` seconds.

    A timed-out call cannot be interrupted: it keeps running in its worker
    thread while the wrapper moves on, so the wrapped function should tolerate
    overlapping with a later attempt.

    Args:
        max_retries: maximum number of attempts.
        timeout: seconds to wait for each attempt.
        backoff: base delay in seconds between attempts.

    Raises:
        The exception of the last failed attempt (``TimeoutError`` when it
        timed out), or ``Exception`` when no attempt was made.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                # No ``with`` block here: leaving it would wait for the worker
                # to finish, which would make the timeout ineffective.
                executor = ThreadPoolExecutor(max_workers=1)
                try:
                    future = executor.submit(func, *args, **kwargs)
                    try:
                        return future.result(timeout=timeout)
                    except FutureTimeoutError:
                        future.cancel()
                        raise TimeoutError(f"Operation timed out after {timeout} seconds")
                except Exception as e:
                    last_exception = e
                    print(f"Attempt {attempt + 1} failed: {str(e)}")
                    if attempt < max_retries - 1:
                        time.sleep(backoff * (attempt + 1))
                finally:
                    # Release the pool without blocking on a still-running task
                    executor.shutdown(wait=False)
            print(f"All {max_retries} attempts failed")
            if last_exception:
                raise last_exception
            raise Exception("Unknown error occurred")
        return wrapper
    return decorator
def save_with_retry(all_results, user_data, timeout=30):
    """Upload the results through ``save_all_results_to_file`` with a time limit.

    The upload runs in a worker thread. Despite the name, only a single
    attempt is made.

    Args:
        all_results: list of per-question results.
        user_data: per-user state dict.
        timeout: seconds to wait for the upload (default 30).

    Returns:
        True when the upload finished within ``timeout`` seconds, False when it
        timed out or raised. After a timeout the upload is not interrupted and
        may still complete in the background.
    """
    # No ``with`` block: leaving it would wait for the upload to finish and
    # make the timeout ineffective.
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(save_all_results_to_file, all_results, user_data)
        try:
            future.result(timeout=timeout)
            return True
        except FutureTimeoutError:
            future.cancel()
            print("上传超时")
            return False
    except Exception as e:
        print(f"上传到Hub失败: {e}")
        return False
    finally:
        # Release the pool without blocking on a still-running upload
        executor.shutdown(wait=False)
def save_locally_with_retry(data, filename, max_retries=3):
    """Write ``data`` as indented UTF-8 JSON to ``filename``, retrying on failure.

    Up to ``max_retries`` attempts are made, with a one-second pause after
    every failed attempt except the last.

    Returns:
        True as soon as one attempt succeeds, False when all attempts fail
        (or when ``max_retries`` is 0).
    """
    for attempt_no in range(1, max_retries + 1):
        try:
            with open(filename, 'w', encoding='utf-8') as out:
                json.dump(data, out, indent=4, ensure_ascii=False)
        except Exception as e:
            print(f"本地保存尝试 {attempt_no} 失败: {e}")
            if attempt_no < max_retries:
                time.sleep(1)
        else:
            return True
    return False
def update_count_with_retry(count_data, question_set, max_retries=3):
    """Rewrite count.json under the file lock, retrying on failure.

    The counters are re-read from ``COUNT_JSON_PATH`` while the lock is held,
    so that increments written by other sessions since the caller took its
    snapshot are not overwritten. ``count_data`` is only used as a fallback
    when the file cannot be read or parsed.

    Args:
        count_data: caller's snapshot of the counters (fallback only).
        question_set: list of question dicts with an "audio" path.
        max_retries: maximum number of attempts, one second apart.

    Returns:
        True when the file was written, False when every attempt failed.
    """
    lock_path = COUNT_JSON_PATH + ".lock"
    for attempt in range(max_retries):
        try:
            with FileLock(lock_path, timeout=10):
                # Prefer the newest counters on disk over the caller's snapshot
                try:
                    with open(COUNT_JSON_PATH, 'r', encoding='utf-8') as f:
                        latest = json.load(f, object_pairs_hook=collections.OrderedDict)
                except (OSError, ValueError):
                    latest = count_data
                # Mark unfinished question(s) as 0.
                # NOTE(review): this only changes counters that are already
                # below 1 - confirm the intended rollback rule.
                for question in question_set:
                    filename = os.path.basename(question['audio'])
                    if filename in latest and latest[filename] < 1:
                        latest[filename] = 0
                with open(COUNT_JSON_PATH, 'w', encoding='utf-8') as f:
                    json.dump(latest, f, indent=4, ensure_ascii=False)
            return True
        except Exception as e:
            print(f"Fail to update count.json {e} for {attempt + 1} time")
            if attempt < max_retries - 1:
                time.sleep(1)
    return False
# ============================================================================== | |
# Previous version of submit_question_and_advance | |
"""def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data): | |
# selections["final_choice"] = final_choice | |
cleaned_selections = {} | |
for dim_title, sub_scores in selections.items(): | |
# if dim_title == "final_choice": # 去掉if判断 | |
cleaned_selections["final_choice"] = final_choice | |
# continue | |
cleaned_sub_scores = {} | |
for sub_dim, score in sub_scores.items(): | |
cleaned_sub_scores[sub_dim] = None if score == 0 else score | |
cleaned_selections[dim_title] = cleaned_sub_scores | |
final_question_result = { | |
"question_id": q_idx, | |
"audio_file": user_data["question_set"][q_idx]['audio'], | |
"selections": cleaned_selections | |
} | |
all_results.append(final_question_result) | |
q_idx += 1 | |
# If q_idx hasn't reached the last one | |
if q_idx < len(user_data["question_set"]): | |
init_q_updates = init_test_question(user_data, q_idx) # Case 1: jam happens when initialize next question | |
return init_q_updates + (all_results, gr.update(value="")) | |
# If q_idx has reached the last one | |
else: | |
result_str = "### 测试全部完成!\n\n你的提交结果概览:\n" | |
for res in all_results: | |
# result_str += f"\n#### 题目: {res['audio_file']}\n" | |
result_str += f"##### 最终判断: **{res['selections'].get('final_choice', '未选择')}**\n" | |
for dim_title, dim_data in res['selections'].items(): | |
if dim_title == 'final_choice': continue | |
result_str += f"- **{dim_title}**:\n" | |
for sub_dim, score in dim_data.items(): | |
result_str += f" - *{sub_dim[:20]}...*: {score}/5\n" | |
# save_all_results_to_file(all_results, user_data) | |
# save_all_results_to_file(all_results, user_data, count_data=updated_count_data) | |
save_all_results_to_file(all_results, user_data, count_data=user_data.get("updated_count_data")) | |
return ( | |
gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), | |
q_idx, d_idx, {}, | |
gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), | |
gr.update(), gr.update(), | |
) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str)""" | |
# user_data no longer contains "updated_count_data"; the counters are read and
# written under the file lock, directly from the working directory.
def _clean_selections(selections, final_choice):
    """Return the question's scores with 0 mapped to None, plus the final choice."""
    # Set the final choice up front so it is kept even without dimension scores
    cleaned = {"final_choice": final_choice}
    for dim_title, sub_scores in selections.items():
        cleaned[dim_title] = {
            sub_dim: (None if score == 0 else score)
            for sub_dim, score in sub_scores.items()
        }
    return cleaned


def _format_results_overview(all_results):
    """Render the markdown overview shown after the last question."""
    parts = ["### Test Completed!\n\nOverview of your submission:\n"]
    for res in all_results:
        picks = res['selections']
        # 'empty' means no final choice was recorded
        parts.append(f"##### Final Judgement: **{picks.get('final_choice', 'empty')}**\n")
        for dim_title, dim_data in picks.items():
            if dim_title == 'final_choice':
                continue
            parts.append(f"- **{dim_title}**:\n")
            parts.extend(
                f" - *{sub_dim[:20]}...*: {score}/5\n" for sub_dim, score in dim_data.items()
            )
    return "".join(parts)


def _safe_filename_part(text):
    """Replace characters other than letters, digits, '-' and '_' with '_'."""
    cleaned = "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in str(text))
    return cleaned or "anonymous"


def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data):
    """Record the finished question and move to the next one or to the result page.

    The question's scores (0 stored as None) and ``final_choice`` are appended
    to ``all_results``. If more questions remain, the updates for the next
    question are returned. After the last question the results are uploaded;
    when the upload fails they are saved to a local JSON file instead. Then
    count.json is rewritten through ``update_count_with_retry``.

    Returns:
        Tuple of 14 values followed by ``MAX_SUB_DIMS`` slider updates, then
        ``all_results`` and the result text/update. Any exception is caught and
        reported through the last element while the other outputs stay
        untouched.
    """
    try:
        # Collect this question's answers
        all_results.append({
            "question_id": q_idx,
            "audio_file": user_data["question_set"][q_idx]['audio'],
            "selections": _clean_selections(selections, final_choice)
        })
        q_idx += 1
        if q_idx < len(user_data["question_set"]):
            init_q_updates = init_test_question(user_data, q_idx)
            return init_q_updates + (all_results, gr.update(value=""))

        # Last question answered: build the overview
        result_str = _format_results_overview(all_results)
        # Try to upload to the Hub
        try:
            success = save_with_retry(all_results, user_data)
        except Exception as e:
            print(f"上传过程中发生错误: {e}")
            success = False
        if not success:
            # Upload failed: fall back to a local file. The username comes from
            # a form field, so it is sanitised before use in a file name.
            username = _safe_filename_part(user_data.get("username", "anonymous"))
            timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
            local_filename = f"submission_{username}_{timestamp}.json"
            user_info_clean = {
                k: v for k, v in user_data.items() if k not in ["question_set"]
            }
            final_data_package = {
                "user_info": user_info_clean,
                "results": all_results
            }
            local_success = save_locally_with_retry(final_data_package, local_filename)
            if local_success:
                result_str += f"\n\n⚠️ 上传失败,结果已保存到本地文件: {local_filename}"
            else:
                result_str += "\n\n❌ 上传失败且无法保存到本地文件,请联系管理员"
        # Update count.json (unfinished questions).
        # NOTE(review): indentation was lost in this copy of the file; this step
        # is reconstructed as running whether or not the upload succeeded.
        try:
            with FileLock(COUNT_JSON_PATH + ".lock", timeout=5):
                with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
                    count_data = json.load(f, object_pairs_hook=collections.OrderedDict)
            count_update_success = update_count_with_retry(count_data, user_data["question_set"])
        except Exception as e:
            print(f"更新count.json失败: {e}")
            count_update_success = False
        if not count_update_success:
            result_str += "\n\n⚠️ 无法更新题目计数,请联系管理员"
        return (
            gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
            q_idx, d_idx, {},
            gr.update(), gr.update(), gr.update(), gr.update(), gr.update(),
            gr.update(), gr.update(),
        ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str)
    except Exception as e:
        print(f"提交过程中发生错误: {e}")
        # Report the error and leave the other outputs untouched
        error_msg = f"提交过程中发生错误: {str(e)}"
        return (
            gr.update(), gr.update(), gr.update(), gr.update(),
            q_idx, d_idx, selections,
            gr.update(), gr.update(), gr.update(), gr.update(), gr.update(),
            gr.update(), gr.update(),
        ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, error_msg)
"""def save_all_results_to_file(all_results, user_data, count_data=None): | |
repo_id = "intersteller2887/Turing-test-dataset" | |
username = user_data.get("username", "user") | |
timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') | |
submission_filename = f"submissions_{username}_{timestamp}.json" | |
user_info_clean = { | |
k: v for k, v in user_data.items() if k not in ["question_set", "updated_count_data"] | |
} | |
final_data_package = { | |
"user_info": user_info_clean, | |
"results": all_results | |
} | |
json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4) | |
hf_token = os.getenv("HF_TOKEN") | |
if not hf_token: | |
print("HF_TOKEN not found. Cannot upload to the Hub.") | |
return | |
try: | |
api = HfApi() | |
# Upload submission file | |
api.upload_file( | |
path_or_fileobj=bytes(json_string, "utf-8"), | |
path_in_repo=f"submissions/{submission_filename}", | |
repo_id=repo_id, | |
repo_type="dataset", | |
token=hf_token, | |
commit_message=f"Add new submission from {username}" | |
) | |
print(f"上传成功: {submission_filename}") | |
if count_data: | |
with FileLock(COUNT_JSON_PATH + ".lock", timeout=10): | |
with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: | |
json.dump(count_data, f, indent=4, ensure_ascii=False) | |
api.upload_file( | |
path_or_fileobj=COUNT_JSON_PATH, | |
path_in_repo=COUNT_JSON_REPO_PATH, | |
repo_id=repo_id, | |
repo_type="dataset", | |
token=hf_token, | |
commit_message=f"Update count.json after submission by {username}" | |
) | |
except Exception as e: | |
print(f"上传出错: {e}")""" | |
def _safe_filename_component(name, fallback="user"):
    """Reduce *name* to text that is safe to embed in a repository file name.

    Every run of characters other than word characters, ``.`` and ``-`` is
    replaced by a single underscore, so path separators and whitespace cannot
    end up in the upload path. Leading/trailing dots are stripped. If nothing
    usable remains (``None``, empty string, only dots), *fallback* is returned.
    """
    import re  # local import: `re` is not among the module-level imports

    cleaned = re.sub(r"[^\w.-]+", "_", str(name or "")).strip(".")
    return cleaned or fallback


def save_all_results_to_file(all_results, user_data):
    """Upload one participant's results and the local count file to the Hub.

    Args:
        all_results: JSON-serialisable results collected during the test.
        user_data: Dict describing the participant. Its ``"username"`` entry
            names the submission file; the ``"question_set"`` entry is left
            out of the uploaded payload.

    Raises:
        RuntimeError: If the ``HF_TOKEN`` environment variable is not set.
        Exception: Whatever ``HfApi.upload_file`` raises while uploading the
            submission file is propagated to the caller.

    A failure while reading or uploading ``count.json`` is only printed: that
    step is best-effort and must not invalidate an already-uploaded submission.
    """
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        raise RuntimeError("HF_TOKEN not found. Cannot upload to the Hub.")

    repo_id = "intersteller2887/Turing-test-dataset-en"

    # The username is free text typed by the participant; sanitise it before it
    # becomes part of a repository path. The raw value is still stored inside
    # the uploaded JSON under "user_info".
    username = _safe_filename_component(user_data.get("username"))
    timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
    submission_filename = f"submissions_{username}_{timestamp}.json"

    user_info_clean = {
        key: value for key, value in user_data.items() if key != "question_set"
    }
    final_data_package = {
        "user_info": user_info_clean,
        "results": all_results,
    }
    json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4)

    api = HfApi()
    # Upload the submission file (called directly; no longer wrapped in a decorator).
    api.upload_file(
        path_or_fileobj=json_string.encode("utf-8"),
        path_in_repo=f"submissions/{submission_filename}",
        repo_id=repo_id,
        repo_type="dataset",
        token=hf_token,
        commit_message=f"Add new submission from {username}",
    )

    try:
        # Hold the file lock only while reading, so other sessions waiting on
        # count.json are not blocked for the duration of the network upload.
        with FileLock(COUNT_JSON_PATH + ".lock", timeout=5):
            with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
                count_data_str = f.read()
        api.upload_file(
            path_or_fileobj=count_data_str.encode("utf-8"),
            path_in_repo=COUNT_JSON_REPO_PATH,
            repo_id=repo_id,
            repo_type="dataset",
            token=hf_token,
            commit_message=f"Update count.json after submission by {username}",
        )
    except Exception as e:
        # Deliberately broad: the count upload is best-effort (see docstring).
        print(f"上传 count.json 失败: {e}")
# ============================================================================== | |
# Gradio 界面定义 (Gradio UI Definition) | |
# ============================================================================== | |
# Builds the whole Gradio UI (one gr.Column per page) and wires every event
# handler. The handler functions referenced here are defined earlier in the file.
with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {max-width: 960px !important}") as demo:
    # State holders passed in and out of the event handlers below.
    user_data_state = gr.State({})
    current_question_index = gr.State(0)
    current_test_dimension_index = gr.State(0)
    current_question_selections = gr.State({})
    test_results = gr.State([])

    # One column per page; only the welcome page starts visible.
    welcome_page = gr.Column(visible=True)
    info_page = gr.Column(visible=False)
    sample_page = gr.Column(visible=False)
    pretest_page = gr.Column(visible=False)
    test_page = gr.Column(visible=False)
    final_judgment_page = gr.Column(visible=False)
    result_page = gr.Column(visible=False)
    pages = {
        "welcome": welcome_page, "info": info_page, "sample": sample_page,
        "pretest": pretest_page, "test": test_page, "final_judgment": final_judgment_page,
        "result": result_page
    }

    with welcome_page:
        gr.Markdown("# Can you spot the hidden AI?\nListen to the following conversations. Try to tell which respondent is an AI.")
        start_btn = gr.Button("Start", variant="primary")

    with info_page:
        gr.Markdown("## Basic Information")
        username_input = gr.Textbox(label="Username", placeholder="Please enter your nickname")
        age_input = gr.Radio(["Under 18", "18-25", "26-35", "36-50", "Over 50"], label="Age")
        gender_input = gr.Radio(["Male", "Female", "Other"], label="Gender")
        education_input = gr.Radio(["High school or below", "Bachelor", "Master", "PhD", "Other (please specify)"], label="Education Level")
        education_other_input = gr.Textbox(label="Please enter your education", visible=False, interactive=False)
        ai_experience_input = gr.Radio([
            "Never used",
            "Occasionally exposed (e.g., watching others use)",
            "Used a few times, understand basic functions",
            "Use frequently, have some experience",
            "Very familiar, have in-depth experience with multiple AI tools"
        ], label="Familiarity with AI Tools")
        # Starts disabled; check_info_complete (bound below) updates this button.
        submit_info_btn = gr.Button("Submit and Start Learning Sample", variant="primary", interactive=False)

    with sample_page:
        gr.Markdown("## Sample Analysis\nPlease select a dimension to study and practice scoring. All dimensions share the same sample audio.")
        sample_dimension_selector = gr.Radio(DIMENSION_TITLES, label="Select Learning Dimension", value=DIMENSION_TITLES[0])
        with gr.Row():
            with gr.Column(scale=1):
                sample_audio = gr.Audio(label="Sample Audio", value=DIMENSIONS_DATA[0]["audio"])
            with gr.Column(scale=2):
                # MAX_SUB_DIMS sliders are created up front, all hidden; the
                # handlers receive them as outputs and update them from there.
                with gr.Column(visible=True) as interactive_view:
                    gr.Markdown("#### Please rate the following features (0-5 points. 0 - Feature not present; 1 - Machine; 3 - Neutral; 5 - Human)")
                    sample_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True) for i in range(MAX_SUB_DIMS)]
                with gr.Column(visible=False) as reference_view:
                    gr.Markdown("### Reference Answer Explanation (1-5 points. 1 = Machine-like, 5 = Human-like)")
                    reference_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=False) for i in range(MAX_SUB_DIMS)]
        with gr.Row():
            reference_btn = gr.Button("Reference")
            go_to_pretest_btn = gr.Button("Got it, start the test", variant="primary")

    with pretest_page:
        gr.Markdown("""## Test Instructions
- Every dialogue includes 2 speakers and lasts around 1 minute.
- **Initiator:** The one who talks the first in the dialogue.
- **Respondent:** The other one.
- For each question, you'll evaluate the **respondent** (not the initiator) across **5 dimensions**.
- Under each dimension, score **every listed feature** from **0 to 5**:
### 🔢 Scoring Guide:
- **0** – The feature is **not present** *(some features are always present, so use 1–5 in those cases)*
- **1** – Strongly machine-like
- **2** – Somewhat machine-like
- **3** – Neutral (no clear human or machine lean)
- **4** – Somewhat human-like
- **5** – Strongly human-like
- After rating all dimensions, make a final judgment: is the **respondent** a human or an AI?
- You can freely switch between dimensions using the **Previous** and **Next** buttons.
---
### ⚠️ Important Notes:
- Stick to your username all the time.
- Remember to **pause the audio** before you proceed to the final judgement. Otherwise it will keep playing and you cannot stop it.
- Once you start the test, try not to refresh the page or quit it. You need to grade 5 recordings every test.
- Focus on whether the **respondent's speech** sounds more **human-like or machine-like** for each feature.
> For example: correct pronunciation doesn't always mean "human", and mispronunciation doesn't mean "AI". Think in terms of human-likeness.
- Even if you're confident early on about the respondent's identity, still evaluate **each dimension independently**.
Avoid just labeling all dimensions as "machine-like" or "human-like" without listening carefully.
""")
        go_to_test_btn = gr.Button("Start the Test", variant="primary")

    with test_page:
        gr.Markdown("## Formal Test")
        question_progress_text = gr.Markdown()
        test_dimension_title = gr.Markdown()
        test_audio = gr.Audio(label="Test Audio")
        gr.Markdown("--- \n ### Please rate the respondent (not the initiator) in the conversation based on the following features (0-5 points. 0 - Feature not present; 1 - Machine; 3 - Neutral; 5 - Human)")
        # minimum is 0 so the "0 - Feature not present" score announced in the
        # heading above (and offered by the sample sliders) can be selected.
        test_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True) for i in range(MAX_SUB_DIMS)]
        with gr.Row():
            prev_dim_btn = gr.Button("Previous Dimension")
            next_dim_btn = gr.Button("Next Dimension", variant="primary")

    with final_judgment_page:
        gr.Markdown("## Final Judgment")
        gr.Markdown("You have completed scoring for all dimensions. Please make a final judgment based on your overall impression.")
        final_human_robot_radio = gr.Radio(["👤 Human", "🤖 AI"], label="Please determine the respondent type (required)")
        # Enabled by the radio's change handler once a choice has been made.
        submit_final_answer_btn = gr.Button("Submit Answer for This Question", variant="primary", interactive=False)

    with result_page:
        gr.Markdown("## Test Completed")
        result_text = gr.Markdown()
        back_to_welcome_btn = gr.Button("Back to Main Page", variant="primary")

    # ==============================================================================
    # Event binding & input/output list definitions
    # ==============================================================================
    # The order of each list must match the order of the values returned by the
    # handler it is wired to.
    sample_init_outputs = [
        info_page, sample_page, user_data_state, sample_dimension_selector,
        sample_audio, interactive_view, reference_view, reference_btn
    ] + sample_sliders + reference_sliders

    test_init_outputs = [
        pretest_page, test_page, final_judgment_page, result_page,
        current_question_index, current_test_dimension_index, current_question_selections,
        question_progress_text, test_dimension_title, test_audio,
        prev_dim_btn, next_dim_btn,
        final_human_robot_radio, submit_final_answer_btn,
    ] + test_sliders

    nav_inputs = [current_question_index, current_test_dimension_index, current_question_selections] + test_sliders
    nav_outputs = [
        test_page, final_judgment_page,
        current_question_index, current_test_dimension_index, current_question_selections,
        question_progress_text, test_dimension_title, test_audio,
        final_human_robot_radio, submit_final_answer_btn,
        prev_dim_btn, next_dim_btn,
    ] + test_sliders

    full_outputs_with_results = test_init_outputs + [test_results, result_text]

    start_btn.click(
        fn=start_challenge,
        inputs=[user_data_state],
        outputs=[welcome_page, info_page, user_data_state]
    )

    # username_input is part of the trigger list: check_info_complete receives
    # it as an input, so a username typed after the other fields were filled
    # must also re-evaluate the submit button.
    for comp in [username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input]:
        comp.change(
            fn=check_info_complete,
            inputs=[username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input],
            outputs=submit_info_btn
        )

    education_input.change(fn=toggle_education_other, inputs=education_input, outputs=education_other_input)

    submit_info_btn.click(
        fn=show_sample_page_and_init,
        inputs=[username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input, user_data_state],
        outputs=sample_init_outputs
    )

    sample_dimension_selector.change(
        fn=update_sample_view,
        inputs=sample_dimension_selector,
        outputs=[sample_audio, interactive_view, reference_view, reference_btn] + sample_sliders + reference_sliders
    )

    reference_btn.click(
        fn=toggle_reference_view,
        inputs=reference_btn,
        outputs=[interactive_view, reference_view, reference_btn]
    )

    # Sample page -> pretest instructions: a pure visibility swap.
    go_to_pretest_btn.click(lambda: (gr.update(visible=False), gr.update(visible=True)), outputs=[sample_page, pretest_page])

    # Start with question 0; the two appended values fill the trailing
    # test_results / result_text outputs (empty results, result text untouched).
    go_to_test_btn.click(
        fn=lambda user: init_test_question(user, 0) + ([], gr.update()),
        inputs=[user_data_state],
        outputs=full_outputs_with_results
    )

    prev_dim_btn.click(
        fn=lambda q,d,s, *sliders: navigate_dimensions("prev", q,d,s, *sliders),
        inputs=nav_inputs, outputs=nav_outputs
    )

    next_dim_btn.click(
        fn=lambda q,d,s, *sliders: navigate_dimensions("next", q,d,s, *sliders),
        inputs=nav_inputs, outputs=nav_outputs
    )

    # The final submit button is interactive only while a judgment is selected.
    final_human_robot_radio.change(
        fn=lambda choice: gr.update(interactive=bool(choice)),
        inputs=final_human_robot_radio,
        outputs=submit_final_answer_btn
    )

    submit_final_answer_btn.click(
        fn=submit_question_and_advance,
        inputs=[current_question_index, current_test_dimension_index, current_question_selections, final_human_robot_radio, test_results, user_data_state],
        outputs=full_outputs_with_results
    )

    back_to_welcome_btn.click(fn=back_to_welcome, outputs=list(pages.values()) + [user_data_state, current_question_index, current_test_dimension_index, current_question_selections, test_results])
# ============================================================================== | |
# 程序入口 (Entry Point) | |
# ============================================================================== | |
if __name__ == "__main__":
    # Create the relative "audio" directory if nothing exists at that path yet.
    audio_dir = "audio"
    if os.path.exists(audio_dir) is False:
        os.makedirs(audio_dir)

    # Inside a Hugging Face Space (SPACE_ID is set), warn about every audio
    # file referenced by DIMENSIONS_DATA that is missing on disk.
    running_in_space = "SPACE_ID" in os.environ
    if running_in_space:
        print("Running in a Hugging Face Space, checking for audio files...")
        referenced_audio = set([entry["audio"] for entry in DIMENSIONS_DATA])
        for audio_path in referenced_audio:
            if os.path.exists(audio_path):
                continue
            print(f"⚠️ Warning: Audio file not found: {audio_path}")

    demo.launch(debug=True)