import gradio as gr import os import json import pandas as pd import random import shutil import time import collections from functools import wraps from filelock import FileLock from datasets import load_dataset, Audio from huggingface_hub import HfApi, hf_hub_download from multiprocessing import TimeoutError from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError # Load dataset from HuggingFace dataset = load_dataset("intersteller2887/Turing-test-dataset-en", split="train") dataset = dataset.cast_column("audio", Audio(decode=False)) # Prevent calling 'torchcodec' from newer version of 'datasets' # Huggingface space working directory: "/home/user/app" target_audio_dir = "/home/user/app/audio" os.makedirs(target_audio_dir, exist_ok=True) COUNT_JSON_PATH = "/home/user/app/count.json" COUNT_JSON_REPO_PATH = "submissions/count.json" # Output directory (Huggingface dataset directory) # Copy recordings to the working directory local_audio_paths = [] for item in dataset: src_path = item["audio"]["path"] if src_path and os.path.exists(src_path): filename = os.path.basename(src_path) dst_path = os.path.join(target_audio_dir, filename) if not os.path.exists(dst_path): shutil.copy(src_path, dst_path) local_audio_paths.append(dst_path) all_data_audio_paths = local_audio_paths # Take first file of the datasets as sample sample1_audio_path = local_audio_paths[0] print(sample1_audio_path) # ============================================================================== # Data Definition # ============================================================================== DIMENSIONS_DATA = [ { "title": "Semantic and Pragmatic Features", "audio": sample1_audio_path, "sub_dims": [ "Memory Consistency: Machine-like: Inconsistent memory across contexts and unable to detect or correct errors (e.g., forgetting key information and insisting on incorrect answers); Human-like: Consistent memory in short contexts, and asks for clarification when memory deviations occur", "Logical Coherence: Machine-like: Abrupt logical transitions or self-contradictions (e.g., suddenly changing topics without transition); Human-like: Natural and smooth logic", "Pronunciation Accuracy: Machine-like: Unnatural pronunciation errors, mispronunciation of heteronyms; Human-like: Correct and natural pronunciation of words, with proper usage of heteronyms based on context", "Code-switching: Machine-like: Rigid multilingual mixing without logical language switching; Human-like: Multilingual mixing is often context-dependent (e.g., proper nouns, idiomatic expressions), and the switching between languages is smooth", "Precision in Expression: Machine-like: Rarely uses vague expressions, responses are precise and affirmative; Human-like: Uses vague expressions like 'more or less', 'probably', and self-correct (e.g., 'no, no')", "Use of Fillers: Machine-like: Rare use of fillers or unnatural usage; Human-like: Frequently uses fillers (e.g., 'um', 'like') while thinking", "Metaphor and Pragmatic Intent: Machine-like: Literal and direct, lacking semantic diversity, only capable of surface-level interpretation; Human-like: Uses metaphor, irony, and euphemism to convey layered meanings" ], "reference_scores": [5, 5, 5, 0, 5, 5, 0] }, { "title": "Non-Physiological Paralinguistic Features", "audio": sample1_audio_path, "sub_dims": [ "Rhythm: Machine-like: Almost no pauses or mechanical pauses; Human-like: Speaking rate varies with semantic flow, occasional pauses or hesitations", "Intonation: Machine-like: Monotonous or overly regular pitch changes, inappropriate to the context; Human-like: Natural pitch rise or fall when expressing questions, surprise, or emphasis", "Stress: Machine-like: No emphasis on words or abnormal emphasis placement; Human-like: Consciously emphasizes key words to highlight focus", "Auxiliary Vocalizations: Machine-like: Contextually incorrect or mechanical auxiliary sounds; Human-like: Produces context-appropriate non-verbal sounds, such as laughter or sighs" ], "reference_scores": [5, 5, 5, 5] }, { "title": "Physiological Paralinguistic Features", "audio": sample1_audio_path, "sub_dims": [ "Micro-physiological Noise: Machine-like: Speech is overly clean or emits unnatural noises (e.g., electrical static); Human-like: Presence of breathing sounds, saliva sounds, bubble noise, etc., naturally occurring during speech", "Instability in Pronunciation: Machine-like: Pronunciation is overly clear and regular; Human-like: Some irregularities in pronunciation (e.g., liaison, tremolo, slurred speech, nasal sounds)", "Accent: Machine-like: Stiff or unnatural accent; Human-like: Natural regional accent or vocal traits" ], "reference_scores": [5, 4, 4] }, { "title": "Mechanical Persona", "audio": sample1_audio_path, "sub_dims": [ "Sycophant Behavior: Machine-like: Frequently agrees, thanks, apologizes, excessively aligns with the other’s opinion, lacking genuine interaction; Human-like: Judges whether to agree with requests or opinions based on context, doesn't always agree or echo", "Written-style Expression: Machine-like: Responses are well-structured and formal, overly formal wording, frequent listing, and vague word choice; Human-like: Conversational, flexible, and varied expression" ], "reference_scores": [5, 5] }, { "title": "Emotional Expression", "audio": sample1_audio_path, "sub_dims": [ "Semantic Level: Machine-like: Fails to respond emotionally to the other’s feelings, or uses vague and context-inappropriate emotional language; Human-like: Displays human-like emotional responses to contexts such as sadness or joy", "Acoustic Level: Machine-like: Emotional tone is patterned or context-inappropriate; Human-like: Pitch, volume, and rhythm dynamically change with emotion" ], "reference_scores": [5, 5] } ] DIMENSION_TITLES = [d["title"] for d in DIMENSIONS_DATA] SPECIAL_KEYWORDS = ["Code-switching", "Metaphor and Pragmatic Intent", "Accent"] MAX_SUB_DIMS = max(len(d['sub_dims']) for d in DIMENSIONS_DATA) THE_SUB_DIMS = [d['sub_dims'] for d in DIMENSIONS_DATA] # ============================================================================== # Backend Function Definitions # ============================================================================== # This version did not place file reading into filelock, concurrent read could happen """def load_or_initialize_count_json(audio_paths): try: # Only try downloading if file doesn't exist yet if not os.path.exists(COUNT_JSON_PATH): downloaded_path = hf_hub_download( repo_id="intersteller2887/Turing-test-dataset", repo_type="dataset", filename=COUNT_JSON_REPO_PATH, token=os.getenv("HF_TOKEN") ) # Save it as COUNT_JSON_PATH so that the lock logic remains untouched with open(downloaded_path, "rb") as src, open(COUNT_JSON_PATH, "wb") as dst: dst.write(src.read()) except Exception as e: print(f"Could not download count.json from HuggingFace dataset: {e}") # Add filelock to /workspace/count.json lock_path = COUNT_JSON_PATH + ".lock" # Read of count.json will wait for 10 seconds until another thread involving releases it, and then add a lock to it with FileLock(lock_path, timeout=10): # If count.json exists: load into count_data # Else initialize count_data with orderedDict if os.path.exists(COUNT_JSON_PATH): with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: count_data = json.load(f, object_pairs_hook=collections.OrderedDict) else: count_data = collections.OrderedDict() updated = False sample_audio_files = {os.path.basename(d["audio"]) for d in DIMENSIONS_DATA} # Guarantee that the sample recording won't be take into the pool # Update newly updated recordings into count.json for path in audio_paths: filename = os.path.basename(path) if filename not in count_data: if filename in sample_audio_files: count_data[filename] = 999 else: count_data[filename] = 0 updated = True if updated or not os.path.exists(COUNT_JSON_PATH): with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: json.dump(count_data, f, indent=4, ensure_ascii=False) return count_data""" # Function that load or initialize count.json # Function is called when user start a challenge, and this will load or initialize count.json to working directory # Initialize happens when count.json does not exist in the working directory as well as HuggingFace dataset # Load happens when count.json exists in HuggingFace dataset, and it's not loaded to the working directory yet # After load/initialize, all newly added audio files will be added to count.json with initial value of 0 # Load/Initialize will generate count.json in the working directory for all users under this space # This version also places file reading into filelock, and modified def load_or_initialize_count_json(audio_paths): # Add filelock to /workspace/count.json lock_path = COUNT_JSON_PATH + ".lock" with FileLock(lock_path, timeout=10): # If count.json does not exist in the working directory, try to download it from HuggingFace dataset if not os.path.exists(COUNT_JSON_PATH): try: # Save latest count.json to working directory downloaded_path = hf_hub_download( repo_id="intersteller2887/Turing-test-dataset-en", repo_type="dataset", filename=COUNT_JSON_REPO_PATH, token=os.getenv("HF_TOKEN") ) with open(downloaded_path, "rb") as src, open(COUNT_JSON_PATH, "wb") as dst: dst.write(src.read()) except Exception: pass # If count.json exists in the working directory: load into count_data for potential update if os.path.exists(COUNT_JSON_PATH): with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: count_data = json.load(f, object_pairs_hook=collections.OrderedDict) # Else initialize count_data with orderedDict # This happens when there is no count.json (both working directory and HuggingFace dataset) else: count_data = collections.OrderedDict() updated = False sample_audio_files = {os.path.basename(d["audio"]) for d in DIMENSIONS_DATA} # Guarantee that the sample recording won't be take into the pool # Update newly updated recordings into count.json for path in audio_paths: filename = os.path.basename(path) if filename not in count_data: if filename in sample_audio_files: count_data[filename] = 999 else: count_data[filename] = 0 updated = True # Write updated count_data to /home/user/app/count.json if updated or not os.path.exists(COUNT_JSON_PATH): with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: json.dump(count_data, f, indent=4, ensure_ascii=False) return # Shorten the time of playing previous audio when reached next question def append_cache_buster(audio_path): return f"{audio_path}?t={int(time.time() * 1000)}" # Function that samples questions from avaliable question set # This version utilizes a given count_data to sample audio paths """def sample_audio_paths(audio_paths, count_data, k=5, max_count=1): # k for questions per test; max_count for question limit in total eligible_paths = [p for p in audio_paths if count_data.get(os.path.basename(p), 0) < max_count] if len(eligible_paths) < k: raise ValueError(f"可用音频数量不足(只剩 {len(eligible_paths)} 条 count<{max_count} 的音频),无法抽取 {k} 条") # Shuffule to avoid fixed selections resulted from directory structure selected = random.sample(eligible_paths, k) # Once sampled a test, update these questions immediately for path in selected: filename = os.path.basename(path) count_data[filename] = count_data.get(filename, 0) + 1 # Add filelock to /workspace/count.json lock_path = COUNT_JSON_PATH + ".lock" with FileLock(lock_path, timeout=10): with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: json.dump(count_data, f, indent=4, ensure_ascii=False) return selected, count_data""" # This version places file reading into filelock to guarantee correct update of count.json def sample_audio_paths(audio_paths, k=5, max_count=1): # Add filelock to /workspace/count.json lock_path = COUNT_JSON_PATH + ".lock" # Load newest count.json with FileLock(lock_path, timeout=10): with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: count_data = json.load(f) eligible_paths = [ p for p in audio_paths if count_data.get(os.path.basename(p), 0) < max_count ] if len(eligible_paths) < k: raise ValueError(f"可用音频数量不足(只剩 {len(eligible_paths)} 条 count<{max_count} 的音频),无法抽取 {k} 条") selected = random.sample(eligible_paths, k) # Update count_data for path in selected: filename = os.path.basename(path) count_data[filename] = count_data.get(filename, 0) + 1 # Update count.json with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: json.dump(count_data, f, indent=4, ensure_ascii=False) # return selected, count_data # Keep count_data atomic return selected # ============================================================================== # Frontend Function Definitions # ============================================================================== # Save question_set in each user_data_state, preventing global sharing def start_challenge(user_data_state): load_or_initialize_count_json(all_data_audio_paths) # selected_audio_paths, updated_count_data = sample_audio_paths(all_data_audio_paths, k=5) # Keep count_data atomic selected_audio_paths = sample_audio_paths(all_data_audio_paths, k=5) question_set = [ {"audio": path, "desc": f"这是音频文件 {os.path.basename(path)} 的描述"} for path in selected_audio_paths ] user_data_state["question_set"] = question_set # count_data is not needed in the user data # user_data_state["updated_count_data"] = updated_count_data return gr.update(visible=False), gr.update(visible=True), user_data_state # This function toggles the visibility of the "其他(请注明)" input field based on the selected education choice def toggle_education_other(choice): is_other = (choice == "其他(请注明)") return gr.update(visible=is_other, interactive=is_other, value="") # This function checks if the user information is complete def check_info_complete(username, age, gender, education, education_other, ai_experience): if username.strip() and age and gender and education and ai_experience: if education == "其他(请注明)" and not education_other.strip(): return gr.update(interactive=False) return gr.update(interactive=True) return gr.update(interactive=False) # This function updates user_data and initializes the sample page (called when user submits their info) def show_sample_page_and_init(username, age, gender, education, education_other, ai_experience, user_data): final_edu = education_other if education == "其他(请注明)" else education user_data.update({ "username": username.strip(), "age": age, "gender": gender, "education": final_edu, "ai_experience": ai_experience }) first_dim_title = DIMENSION_TITLES[0] initial_updates = update_sample_view(first_dim_title) return [ gr.update(visible=False), gr.update(visible=True), user_data, first_dim_title ] + initial_updates def update_sample_view(dimension_title): dim_data = next((d for d in DIMENSIONS_DATA if d["title"] == dimension_title), None) if dim_data: audio_up = gr.update(value=dim_data["audio"]) # audio_up = gr.update(value=append_cache_buster(dim_data["audio"])) interactive_view_up = gr.update(visible=True) reference_view_up = gr.update(visible=False) reference_btn_up = gr.update(value="Reference") sample_slider_ups = [] ref_slider_ups = [] scores = dim_data.get("reference_scores", []) for i in range(MAX_SUB_DIMS): if i < len(dim_data['sub_dims']): label = dim_data['sub_dims'][i] score = scores[i] if i < len(scores) else 0 sample_slider_ups.append(gr.update(visible=True, label=label, value=0)) ref_slider_ups.append(gr.update(visible=True, label=label, value=score)) else: sample_slider_ups.append(gr.update(visible=False, value=0)) ref_slider_ups.append(gr.update(visible=False, value=0)) return [audio_up, interactive_view_up, reference_view_up, reference_btn_up] + sample_slider_ups + ref_slider_ups empty_updates = [gr.update()] * 4 slider_empty_updates = [gr.update()] * (MAX_SUB_DIMS * 2) return empty_updates + slider_empty_updates def update_test_dimension_view(d_idx, selections): # dimension = DIMENSIONS_DATA[d_idx] slider_updates = [] dim_data = DIMENSIONS_DATA[d_idx] sub_dims = dim_data["sub_dims"] dim_title = dim_data["title"] existing_scores = selections.get(dim_data['title'], {}) progress_d = f"Dimension {d_idx + 1} / {len(DIMENSIONS_DATA)}: **{dim_data['title']}**" for i in range(MAX_SUB_DIMS): if i < len(sub_dims): desc = sub_dims[i] # print(f"{desc} -> default value: {existing_scores.get(desc, 0)}") name = desc.split(":")[0].strip() default_value = 0 if name in SPECIAL_KEYWORDS else 1 value = existing_scores.get(desc, default_value) slider_updates.append(gr.update( visible=True, label=desc, minimum=default_value, maximum=5, step=1, value=value, interactive=True, )) # slider_updates.append(gr.update( # visible=True, # label=desc, # minimum=0 if name in SPECIAL_KEYWORDS else 1, # maximum=5, # value = existing_scores.get(desc, 0), # interactive=True, # )) else: slider_updates.append(gr.update(visible=False)) # print(f"{desc} -> default value: {existing_scores.get(desc, 0)}") # for i in range(MAX_SUB_DIMS): # if i < len(dimension['sub_dims']): # sub_dim_label = dimension['sub_dims'][i] # value = existing_scores.get(sub_dim_label, 0) # slider_updates.append(gr.update(visible=True, label=sub_dim_label, value=value)) # else: # slider_updates.append(gr.update(visible=False, value=0)) prev_btn_update = gr.update(interactive=(d_idx > 0)) next_btn_update = gr.update( value="Proceed to Final Judgement" if d_idx == len(DIMENSIONS_DATA) - 1 else "Next Dimension", interactive=True ) return [gr.update(value=progress_d), prev_btn_update, next_btn_update] + slider_updates def init_test_question(user_data, q_idx): d_idx = 0 question = user_data["question_set"][q_idx] progress_q = f"Question {q_idx + 1} / {len(user_data['question_set'])}" initial_updates = update_test_dimension_view(d_idx, {}) dim_title_update, prev_btn_update, next_btn_update = initial_updates[:3] slider_updates = initial_updates[3:] return ( gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), q_idx, d_idx, {}, gr.update(value=progress_q), dim_title_update, gr.update(value=question['audio']), # gr.update(value=append_cache_buster(question['audio'])), prev_btn_update, next_btn_update, gr.update(value=None), # BUG FIX: Changed from "" to None to correctly clear the radio button gr.update(interactive=False), ) + tuple(slider_updates) def navigate_dimensions(direction, q_idx, d_idx, selections, *slider_values): current_dim_data = DIMENSIONS_DATA[d_idx] current_sub_dims = current_dim_data['sub_dims'] scores = {sub_dim: slider_values[i] for i, sub_dim in enumerate(current_sub_dims)} selections[current_dim_data['title']] = scores new_d_idx = d_idx + (1 if direction == "next" else -1) if direction == "next" and d_idx == len(DIMENSIONS_DATA) - 1: return ( gr.update(visible=False), gr.update(visible=True), q_idx, new_d_idx, selections, gr.update(), gr.update(), gr.update(), gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), ) + (gr.update(),) * MAX_SUB_DIMS else: view_updates = update_test_dimension_view(new_d_idx, selections) dim_title_update, prev_btn_update, next_btn_update = view_updates[:3] slider_updates = view_updates[3:] return ( gr.update(), gr.update(), q_idx, new_d_idx, selections, gr.update(), dim_title_update, gr.update(), gr.update(), gr.update(), prev_btn_update, next_btn_update, ) + tuple(slider_updates) def toggle_reference_view(current): if current == "Reference": return gr.update(visible=False), gr.update(visible=True), gr.update(value="Back") else: return gr.update(visible=True), gr.update(visible=False), gr.update(value="Reference") def back_to_welcome(): return ( gr.update(visible=True), # welcome_page gr.update(visible=False), # info_page gr.update(visible=False), # sample_page gr.update(visible=False), # pretest_page gr.update(visible=False), # test_page gr.update(visible=False), # final_judgment_page gr.update(visible=False), # result_page {}, # user_data_state 0, # current_question_index 0, # current_test_dimension_index {}, # current_question_selections [] # test_results ) # ============================================================================== # Retry Function Definitions # ============================================================================== # Decorator function that allows to use ThreadPoolExecutor to retry a function with timeout def retry_with_timeout(max_retries=3, timeout=10, backoff=1): def decorator(func): @wraps(func) def wrapper(*args, **kwargs): last_exception = None for attempt in range(max_retries): try: with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(func, *args, **kwargs) try: result = future.result(timeout=timeout) return result except FutureTimeoutError: future.cancel() raise TimeoutError(f"Operation timed out after {timeout} seconds") except Exception as e: last_exception = e print(f"Attempt {attempt + 1} failed: {str(e)}") if attempt < max_retries - 1: time.sleep(backoff * (attempt + 1)) print(f"All {max_retries} attempts failed") if last_exception: raise last_exception raise Exception("Unknown error occurred") return wrapper return decorator def save_with_retry(all_results, user_data): # 尝试上传到Hugging Face Hub try: # 使用线程安全的保存方式 with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(save_all_results_to_file, all_results, user_data) try: future.result(timeout=30) # 设置30秒超时 return True except FutureTimeoutError: future.cancel() print("上传超时") return False except Exception as e: print(f"上传到Hub失败: {e}") return False def save_locally_with_retry(data, filename, max_retries=3): for attempt in range(max_retries): try: with open(filename, 'w', encoding='utf-8') as f: json.dump(data, f, indent=4, ensure_ascii=False) return True except Exception as e: print(f"本地保存尝试 {attempt + 1} 失败: {e}") if attempt < max_retries - 1: time.sleep(1) return False def update_count_with_retry(count_data, question_set, max_retries=3): for attempt in range(max_retries): try: lock_path = COUNT_JSON_PATH + ".lock" with FileLock(lock_path, timeout=10): # Remove unfinished question(s) from count.json for question in question_set: filename = os.path.basename(question['audio']) if filename in count_data and count_data[filename] < 1: count_data[filename] = 0 # Mark unfinished data as 0 with open(COUNT_JSON_PATH, 'w', encoding='utf-8') as f: json.dump(count_data, f, indent=4, ensure_ascii=False) return True except Exception as e: print(f"Fail to update count.json {e} for {attempt + 1} time") if attempt < max_retries - 1: time.sleep(1) return False # ============================================================================== # Previous version of submit_question_and_advance """def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data): # selections["final_choice"] = final_choice cleaned_selections = {} for dim_title, sub_scores in selections.items(): # if dim_title == "final_choice": # 去掉if判断 cleaned_selections["final_choice"] = final_choice # continue cleaned_sub_scores = {} for sub_dim, score in sub_scores.items(): cleaned_sub_scores[sub_dim] = None if score == 0 else score cleaned_selections[dim_title] = cleaned_sub_scores final_question_result = { "question_id": q_idx, "audio_file": user_data["question_set"][q_idx]['audio'], "selections": cleaned_selections } all_results.append(final_question_result) q_idx += 1 # If q_idx hasn't reached the last one if q_idx < len(user_data["question_set"]): init_q_updates = init_test_question(user_data, q_idx) # Case 1: jam happens when initialize next question return init_q_updates + (all_results, gr.update(value="")) # If q_idx has reached the last one else: result_str = "### 测试全部完成!\n\n你的提交结果概览:\n" for res in all_results: # result_str += f"\n#### 题目: {res['audio_file']}\n" result_str += f"##### 最终判断: **{res['selections'].get('final_choice', '未选择')}**\n" for dim_title, dim_data in res['selections'].items(): if dim_title == 'final_choice': continue result_str += f"- **{dim_title}**:\n" for sub_dim, score in dim_data.items(): result_str += f" - *{sub_dim[:20]}...*: {score}/5\n" # save_all_results_to_file(all_results, user_data) # save_all_results_to_file(all_results, user_data, count_data=updated_count_data) save_all_results_to_file(all_results, user_data, count_data=user_data.get("updated_count_data")) return ( gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), q_idx, d_idx, {}, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str)""" # user_data now no further contain "updated_count_data", which should be read/write with filelock and be directly accessed from working directory def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data): try: # 准备数据 cleaned_selections = {} for dim_title, sub_scores in selections.items(): cleaned_selections["final_choice"] = final_choice cleaned_sub_scores = {} for sub_dim, score in sub_scores.items(): cleaned_sub_scores[sub_dim] = None if score == 0 else score cleaned_selections[dim_title] = cleaned_sub_scores final_question_result = { "question_id": q_idx, "audio_file": user_data["question_set"][q_idx]['audio'], "selections": cleaned_selections } all_results.append(final_question_result) q_idx += 1 if q_idx < len(user_data["question_set"]): init_q_updates = init_test_question(user_data, q_idx) return init_q_updates + (all_results, gr.update(value="")) else: # 准备完整结果数据 result_str = "### Test Completed!\n\nOverview of your submission:\n" for res in all_results: result_str += f"##### Final Judgement: **{res['selections'].get('final_choice', 'empty')}**\n" # empty == no choice for dim_title, dim_data in res['selections'].items(): if dim_title == 'final_choice': continue result_str += f"- **{dim_title}**:\n" for sub_dim, score in dim_data.items(): result_str += f" - *{sub_dim[:20]}...*: {score}/5\n" # 尝试上传(带重试) try: # success = save_with_retry(all_results, user_data, user_data.get("updated_count_data")) success = save_with_retry(all_results, user_data) except Exception as e: print(f"上传过程中发生错误: {e}") success = False if not success: # 上传失败,保存到本地 username = user_data.get("username", "anonymous") timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') local_filename = f"submission_{username}_{timestamp}.json" # 准备数据包 user_info_clean = { k: v for k, v in user_data.items() if k not in ["question_set"] } final_data_package = { "user_info": user_info_clean, "results": all_results } # 尝试保存到本地 local_success = save_locally_with_retry(final_data_package, local_filename) if local_success: result_str += f"\n\n⚠️ 上传失败,结果已保存到本地文件: {local_filename}" else: result_str += "\n\n❌ 上传失败且无法保存到本地文件,请联系管理员" # 更新count.json(剔除未完成的题目) try: with FileLock(COUNT_JSON_PATH + ".lock", timeout=5): with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: count_data = json.load(f, object_pairs_hook=collections.OrderedDict) count_update_success = update_count_with_retry(count_data, user_data["question_set"]) except Exception as e: print(f"更新count.json失败: {e}") count_update_success = False if not count_update_success: result_str += "\n\n⚠️ 无法更新题目计数,请联系管理员" return ( gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), q_idx, d_idx, {}, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str) except Exception as e: print(f"提交过程中发生错误: {e}") # 返回错误信息 error_msg = f"提交过程中发生错误: {str(e)}" return ( gr.update(), gr.update(), gr.update(), gr.update(), q_idx, d_idx, selections, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, error_msg) """def save_all_results_to_file(all_results, user_data, count_data=None): repo_id = "intersteller2887/Turing-test-dataset" username = user_data.get("username", "user") timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') submission_filename = f"submissions_{username}_{timestamp}.json" user_info_clean = { k: v for k, v in user_data.items() if k not in ["question_set", "updated_count_data"] } final_data_package = { "user_info": user_info_clean, "results": all_results } json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4) hf_token = os.getenv("HF_TOKEN") if not hf_token: print("HF_TOKEN not found. Cannot upload to the Hub.") return try: api = HfApi() # Upload submission file api.upload_file( path_or_fileobj=bytes(json_string, "utf-8"), path_in_repo=f"submissions/{submission_filename}", repo_id=repo_id, repo_type="dataset", token=hf_token, commit_message=f"Add new submission from {username}" ) print(f"上传成功: {submission_filename}") if count_data: with FileLock(COUNT_JSON_PATH + ".lock", timeout=10): with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: json.dump(count_data, f, indent=4, ensure_ascii=False) api.upload_file( path_or_fileobj=COUNT_JSON_PATH, path_in_repo=COUNT_JSON_REPO_PATH, repo_id=repo_id, repo_type="dataset", token=hf_token, commit_message=f"Update count.json after submission by {username}" ) except Exception as e: print(f"上传出错: {e}")""" def save_all_results_to_file(all_results, user_data): repo_id = "intersteller2887/Turing-test-dataset-en" username = user_data.get("username", "user") timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') submission_filename = f"submissions_{username}_{timestamp}.json" user_info_clean = { k: v for k, v in user_data.items() if k not in ["question_set"] } final_data_package = { "user_info": user_info_clean, "results": all_results } json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4) hf_token = os.getenv("HF_TOKEN") if not hf_token: raise Exception("HF_TOKEN not found. Cannot upload to the Hub.") api = HfApi() # 上传提交文件(不再使用装饰器,直接调用) api.upload_file( path_or_fileobj=bytes(json_string, "utf-8"), path_in_repo=f"submissions/{submission_filename}", repo_id=repo_id, repo_type="dataset", token=hf_token, commit_message=f"Add new submission from {username}" ) try: with FileLock(COUNT_JSON_PATH + ".lock", timeout=5): with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: count_data_str = f.read() api.upload_file( path_or_fileobj=bytes(count_data_str, "utf-8"), path_in_repo=COUNT_JSON_REPO_PATH, repo_id=repo_id, repo_type="dataset", token=hf_token, commit_message=f"Update count.json after submission by {username}" ) except Exception as e: print(f"上传 count.json 失败: {e}") # ============================================================================== # Gradio 界面定义 (Gradio UI Definition) # ============================================================================== with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {max-width: 960px !important}") as demo: user_data_state = gr.State({}) current_question_index = gr.State(0) current_test_dimension_index = gr.State(0) current_question_selections = gr.State({}) test_results = gr.State([]) welcome_page = gr.Column(visible=True) info_page = gr.Column(visible=False) sample_page = gr.Column(visible=False) pretest_page = gr.Column(visible=False) test_page = gr.Column(visible=False) final_judgment_page = gr.Column(visible=False) result_page = gr.Column(visible=False) pages = { "welcome": welcome_page, "info": info_page, "sample": sample_page, "pretest": pretest_page, "test": test_page, "final_judgment": final_judgment_page, "result": result_page } with welcome_page: gr.Markdown("# Can you spot the hidden AI?\nListen to the following conversations. Try to tell which respondent is an AI.") start_btn = gr.Button("Start", variant="primary") with info_page: gr.Markdown("## Basic Information") username_input = gr.Textbox(label="Username", placeholder="Please enter your nickname") age_input = gr.Radio(["Under 18", "18-25", "26-35", "36-50", "Over 50"], label="Age") gender_input = gr.Radio(["Male", "Female", "Other"], label="Gender") education_input = gr.Radio(["High school or below", "Bachelor", "Master", "PhD", "Other (please specify)"], label="Education Level") education_other_input = gr.Textbox(label="Please enter your education", visible=False, interactive=False) ai_experience_input = gr.Radio([ "Never used", "Occasionally exposed (e.g., watching others use)", "Used a few times, understand basic functions", "Use frequently, have some experience", "Very familiar, have in-depth experience with multiple AI tools" ], label="Familiarity with AI Tools") submit_info_btn = gr.Button("Submit and Start Learning Sample", variant="primary", interactive=False) with sample_page: gr.Markdown("## Sample Analysis\nPlease select a dimension to study and practice scoring. All dimensions share the same sample audio.") sample_dimension_selector = gr.Radio(DIMENSION_TITLES, label="Select Learning Dimension", value=DIMENSION_TITLES[0]) with gr.Row(): with gr.Column(scale=1): sample_audio = gr.Audio(label="Sample Audio", value=DIMENSIONS_DATA[0]["audio"]) # sample_audio = gr.Audio(label="Sample Audio", value=sample1_audio_path) with gr.Column(scale=2): with gr.Column(visible=True) as interactive_view: gr.Markdown("#### Please rate the following features (0-5 points. 0 - Feature not present; 1 - Machine; 3 - Neutral; 5 - Human)") sample_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True) for i in range(MAX_SUB_DIMS)] with gr.Column(visible=False) as reference_view: gr.Markdown("### Reference Answer Explanation (1-5 points. 1 = Machine-like, 5 = Human-like)") reference_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=False) for i in range(MAX_SUB_DIMS)] with gr.Row(): reference_btn = gr.Button("Reference") go_to_pretest_btn = gr.Button("Got it, start the test", variant="primary") with pretest_page: gr.Markdown("""## Test Instructions - Every dialogue includes 2 speakers and lasts around 1 minute. - **Initiator:** The one who talks the first in the dialogue. - **Respondent:** The other one. - For each question, you'll evaluate the **respondent** (not the initiator) across **5 dimensions**. - Under each dimension, score **every listed feature** from **0 to 5**: ### 🔢 Scoring Guide: - **0** – The feature is **not present** *(some features are always present, so use 1–5 in those cases)* - **1** – Strongly machine-like - **2** – Somewhat machine-like - **3** – Neutral (no clear human or machine lean) - **4** – Somewhat human-like - **5** – Strongly human-like - After rating all dimensions, make a final judgment: is the **respondent** a human or an AI? - You can freely switch between dimensions using the **Previous** and **Next** buttons. --- ### ⚠️ Important Notes: - Stick to your username all the time. - Remember to **pause the audio** before you proceed to the final judgement. Otherwise it will keep playing and you cannot stop it. - Once you start the test, try not to refresh the page or quit it. You need to grade 5 recordings every test. - Focus on whether the **respondent's speech** sounds more **human-like or machine-like** for each feature. > For example: correct pronunciation doesn't always mean "human", and mispronunciation doesn't mean "AI". Think in terms of human-likeness. - Even if you're confident early on about the respondent's identity, still evaluate **each dimension independently**. Avoid just labeling all dimensions as "machine-like" or "human-like" without listening carefully. """) go_to_test_btn = gr.Button("Start the Test", variant="primary") with test_page: gr.Markdown("## Formal Test") question_progress_text = gr.Markdown() test_dimension_title = gr.Markdown() test_audio = gr.Audio(label="Test Audio") gr.Markdown("--- \n ### Please rate the respondent (not the initiator) in the conversation based on the following features (0-5 points. 0 - Feature not present; 1 - Machine; 3 - Neutral; 5 - Human)") test_sliders = [gr.Slider(minimum=1, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True) for i in range(MAX_SUB_DIMS)] with gr.Row(): prev_dim_btn = gr.Button("Previous Dimension") next_dim_btn = gr.Button("Next Dimension", variant="primary") with final_judgment_page: gr.Markdown("## Final Judgment") gr.Markdown("You have completed scoring for all dimensions. Please make a final judgment based on your overall impression.") final_human_robot_radio = gr.Radio(["👤 Human", "🤖 AI"], label="Please determine the respondent type (required)") submit_final_answer_btn = gr.Button("Submit Answer for This Question", variant="primary", interactive=False) with result_page: gr.Markdown("## Test Completed") result_text = gr.Markdown() back_to_welcome_btn = gr.Button("Back to Main Page", variant="primary") # ============================================================================== # 事件绑定 (Event Binding) & IO 列表定义 # ============================================================================== sample_init_outputs = [ info_page, sample_page, user_data_state, sample_dimension_selector, sample_audio, interactive_view, reference_view, reference_btn ] + sample_sliders + reference_sliders test_init_outputs = [ pretest_page, test_page, final_judgment_page, result_page, current_question_index, current_test_dimension_index, current_question_selections, question_progress_text, test_dimension_title, test_audio, prev_dim_btn, next_dim_btn, final_human_robot_radio, submit_final_answer_btn, ] + test_sliders nav_inputs = [current_question_index, current_test_dimension_index, current_question_selections] + test_sliders nav_outputs = [ test_page, final_judgment_page, current_question_index, current_test_dimension_index, current_question_selections, question_progress_text, test_dimension_title, test_audio, final_human_robot_radio, submit_final_answer_btn, prev_dim_btn, next_dim_btn, ] + test_sliders full_outputs_with_results = test_init_outputs + [test_results, result_text] # start_btn.click(fn=start_challenge, outputs=[welcome_page, info_page]) start_btn.click( fn=start_challenge, inputs=[user_data_state], outputs=[welcome_page, info_page, user_data_state] ) for comp in [age_input, gender_input, education_input, education_other_input, ai_experience_input]: comp.change( fn=check_info_complete, inputs=[username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input], outputs=submit_info_btn ) education_input.change(fn=toggle_education_other, inputs=education_input, outputs=education_other_input) submit_info_btn.click( fn=show_sample_page_and_init, inputs=[username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input, user_data_state], outputs=sample_init_outputs ) sample_dimension_selector.change( fn=update_sample_view, inputs=sample_dimension_selector, outputs=[sample_audio, interactive_view, reference_view, reference_btn] + sample_sliders + reference_sliders ) reference_btn.click( fn=toggle_reference_view, inputs=reference_btn, outputs=[interactive_view, reference_view, reference_btn] ) go_to_pretest_btn.click(lambda: (gr.update(visible=False), gr.update(visible=True)), outputs=[sample_page, pretest_page]) go_to_test_btn.click( fn=lambda user: init_test_question(user, 0) + ([], gr.update()), inputs=[user_data_state], outputs=full_outputs_with_results ) prev_dim_btn.click( fn=lambda q,d,s, *sliders: navigate_dimensions("prev", q,d,s, *sliders), inputs=nav_inputs, outputs=nav_outputs ) next_dim_btn.click( fn=lambda q,d,s, *sliders: navigate_dimensions("next", q,d,s, *sliders), inputs=nav_inputs, outputs=nav_outputs ) final_human_robot_radio.change( fn=lambda choice: gr.update(interactive=bool(choice)), inputs=final_human_robot_radio, outputs=submit_final_answer_btn ) submit_final_answer_btn.click( fn=submit_question_and_advance, inputs=[current_question_index, current_test_dimension_index, current_question_selections, final_human_robot_radio, test_results, user_data_state], outputs=full_outputs_with_results ) back_to_welcome_btn.click(fn=back_to_welcome, outputs=list(pages.values()) + [user_data_state, current_question_index, current_test_dimension_index, current_question_selections, test_results]) # ============================================================================== # 程序入口 (Entry Point) # ============================================================================== if __name__ == "__main__": if not os.path.exists("audio"): os.makedirs("audio") if "SPACE_ID" in os.environ: print("Running in a Hugging Face Space, checking for audio files...") # all_files = [q["audio"] for q in QUESTION_SET] + [d["audio"] for d in DIMENSIONS_DATA] all_files = [d["audio"] for d in DIMENSIONS_DATA] for audio_file in set(all_files): if not os.path.exists(audio_file): print(f"⚠️ Warning: Audio file not found: {audio_file}") demo.launch(debug=True)