Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import json | |
| import pandas as pd | |
| import random | |
| import shutil | |
| import time | |
| import collections | |
| from functools import wraps | |
| from filelock import FileLock | |
| from datasets import load_dataset, Audio | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from multiprocessing import TimeoutError | |
| from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError | |
# Load the evaluation dataset from the HuggingFace Hub.
dataset = load_dataset("intersteller2887/Turing-test-dataset-en", split="train")
# Keep raw audio paths/bytes; decoding would pull in 'torchcodec' on newer 'datasets' versions.
dataset = dataset.cast_column("audio", Audio(decode=False))

# HuggingFace Space working directory: "/home/user/app"
target_audio_dir = "/home/user/app/audio"
os.makedirs(target_audio_dir, exist_ok=True)

COUNT_JSON_PATH = "/home/user/app/count.json"
COUNT_JSON_REPO_PATH = "submissions/count.json"  # Output path inside the HuggingFace dataset repo

# Mirror every recording into the working directory so Gradio can serve it locally.
local_audio_paths = []
for record in dataset:
    source = record["audio"]["path"]
    if not (source and os.path.exists(source)):
        continue
    destination = os.path.join(target_audio_dir, os.path.basename(source))
    if not os.path.exists(destination):
        shutil.copy(source, destination)
    local_audio_paths.append(destination)

all_data_audio_paths = local_audio_paths

# The first file of the dataset serves as the walkthrough sample.
sample1_audio_path = local_audio_paths[0]
print(sample1_audio_path)

# ==============================================================================
# Data Definition
# ==============================================================================
# Scoring rubric for the sample (walkthrough) page and the test sliders.
# Each entry is one evaluation dimension:
#   - "title": dimension name shown in the UI
#   - "audio": the shared sample recording (first dataset file)
#   - "sub_dims": slider labels; each reads "<Name>: Machine-like: ...; Human-like: ..."
#   - "reference_scores": expert scores shown on the reference view, one per
#     sub_dim (0 presumably marks a sub-dimension that does not apply to this
#     sample — TODO confirm)
DIMENSIONS_DATA = [
    {
        "title": "Semantic and Pragmatic Features",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Memory Consistency: Machine-like: Inconsistent memory across contexts and unable to detect or correct errors (e.g., forgetting key information and insisting on incorrect answers); Human-like: Consistent memory in short contexts, and asks for clarification when memory deviations occur",
            "Logical Coherence: Machine-like: Abrupt logical transitions or self-contradictions (e.g., suddenly changing topics without transition); Human-like: Natural and smooth logic",
            "Pronunciation Accuracy: Machine-like: Unnatural pronunciation errors, mispronunciation of heteronyms; Human-like: Correct and natural pronunciation of words, with proper usage of heteronyms based on context",
            "Code-switching: Machine-like: Rigid multilingual mixing without logical language switching; Human-like: Multilingual mixing is often context-dependent (e.g., proper nouns, idiomatic expressions), and the switching between languages is smooth",
            "Precision in Expression: Machine-like: Rarely uses vague expressions, responses are precise and affirmative; Human-like: Uses vague expressions like 'more or less', 'probably', and self-correct (e.g., 'no, no')",
            "Use of Fillers: Machine-like: Rare use of fillers or unnatural usage; Human-like: Frequently uses fillers (e.g., 'um', 'like') while thinking",
            "Metaphor and Pragmatic Intent: Machine-like: Literal and direct, lacking semantic diversity, only capable of surface-level interpretation; Human-like: Uses metaphor, irony, and euphemism to convey layered meanings"
        ],
        "reference_scores": [5, 5, 5, 0, 5, 5, 0]
    },
    {
        "title": "Non-Physiological Paralinguistic Features",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Rhythm: Machine-like: Almost no pauses or mechanical pauses; Human-like: Speaking rate varies with semantic flow, occasional pauses or hesitations",
            "Intonation: Machine-like: Monotonous or overly regular pitch changes, inappropriate to the context; Human-like: Natural pitch rise or fall when expressing questions, surprise, or emphasis",
            "Stress: Machine-like: No emphasis on words or abnormal emphasis placement; Human-like: Consciously emphasizes key words to highlight focus",
            "Auxiliary Vocalizations: Machine-like: Contextually incorrect or mechanical auxiliary sounds; Human-like: Produces context-appropriate non-verbal sounds, such as laughter or sighs"
        ],
        "reference_scores": [5, 5, 5, 5]
    },
    {
        "title": "Physiological Paralinguistic Features",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Micro-physiological Noise: Machine-like: Speech is overly clean or emits unnatural noises (e.g., electrical static); Human-like: Presence of breathing sounds, saliva sounds, bubble noise, etc., naturally occurring during speech",
            "Instability in Pronunciation: Machine-like: Pronunciation is overly clear and regular; Human-like: Some irregularities in pronunciation (e.g., liaison, tremolo, slurred speech, nasal sounds)",
            "Accent: Machine-like: Stiff or unnatural accent; Human-like: Natural regional accent or vocal traits"
        ],
        "reference_scores": [5, 4, 4]
    },
    {
        "title": "Mechanical Persona",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Sycophant Behavior: Machine-like: Frequently agrees, thanks, apologizes, excessively aligns with the other’s opinion, lacking genuine interaction; Human-like: Judges whether to agree with requests or opinions based on context, doesn't always agree or echo",
            "Written-style Expression: Machine-like: Responses are well-structured and formal, overly formal wording, frequent listing, and vague word choice; Human-like: Conversational, flexible, and varied expression"
        ],
        "reference_scores": [5, 5]
    },
    {
        "title": "Emotional Expression",
        "audio": sample1_audio_path,
        "sub_dims": [
            "Semantic Level: Machine-like: Fails to respond emotionally to the other’s feelings, or uses vague and context-inappropriate emotional language; Human-like: Displays human-like emotional responses to contexts such as sadness or joy",
            "Acoustic Level: Machine-like: Emotional tone is patterned or context-inappropriate; Human-like: Pitch, volume, and rhythm dynamically change with emotion"
        ],
        "reference_scores": [5, 5]
    }
]
# Dimension titles in display order.
DIMENSION_TITLES = [d["title"] for d in DIMENSIONS_DATA]
# Sub-dimensions that may be inapplicable: their test sliders start at 0 instead of 1.
SPECIAL_KEYWORDS = ["Code-switching", "Metaphor and Pragmatic Intent", "Accent"]
# Widest dimension decides how many slider widgets the UI pre-allocates.
MAX_SUB_DIMS = max(len(d['sub_dims']) for d in DIMENSIONS_DATA)
# All sub-dimension label lists, in dimension order.
THE_SUB_DIMS = [d['sub_dims'] for d in DIMENSIONS_DATA]
| # ============================================================================== | |
| # Backend Function Definitions | |
| # ============================================================================== | |
# NOTE: a legacy load_or_initialize_count_json variant (which performed the
# hf_hub_download and file read outside the file lock, allowing concurrent
# reads) used to be kept here as a dead triple-quoted string; it has been
# removed. See the live implementation below.
# Load or initialize count.json in the working directory.
# Called when a user starts a challenge. Initialization happens when count.json
# exists neither in the working directory nor in the HuggingFace dataset; a
# load happens when it exists in the dataset but has not been copied locally
# yet. Newly seen audio files are registered with an initial count of 0, and
# the resulting count.json is shared by all users of this Space.
def load_or_initialize_count_json(audio_paths):
    """Ensure COUNT_JSON_PATH exists and lists every recording in *audio_paths*.

    The whole read-modify-write sequence runs under a file lock so concurrent
    users cannot interleave a stale read with a write.
    """
    lock_path = COUNT_JSON_PATH + ".lock"
    with FileLock(lock_path, timeout=10):
        # If count.json is not local yet, try to fetch the latest copy from the
        # HuggingFace dataset repo. This is best effort: a missing remote file
        # simply means we initialize from scratch, but the failure is logged
        # instead of silently swallowed.
        if not os.path.exists(COUNT_JSON_PATH):
            try:
                downloaded_path = hf_hub_download(
                    repo_id="intersteller2887/Turing-test-dataset-en",
                    repo_type="dataset",
                    filename=COUNT_JSON_REPO_PATH,
                    token=os.getenv("HF_TOKEN")
                )
                with open(downloaded_path, "rb") as src, open(COUNT_JSON_PATH, "wb") as dst:
                    dst.write(src.read())
            except Exception as e:
                print(f"Could not download count.json from HuggingFace dataset: {e}")
        # Load existing counts, or start fresh when neither the working
        # directory nor the dataset repo had a copy.
        if os.path.exists(COUNT_JSON_PATH):
            with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
                count_data = json.load(f, object_pairs_hook=collections.OrderedDict)
        else:
            count_data = collections.OrderedDict()
        updated = False
        # Sample recordings must never be taken into the question pool, so they
        # are registered with a count of 999 (far above any max_count threshold).
        sample_audio_files = {os.path.basename(d["audio"]) for d in DIMENSIONS_DATA}
        for path in audio_paths:
            filename = os.path.basename(path)
            if filename not in count_data:
                count_data[filename] = 999 if filename in sample_audio_files else 0
                updated = True
        # Persist when something changed or the file still does not exist.
        if updated or not os.path.exists(COUNT_JSON_PATH):
            with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f:
                json.dump(count_data, f, indent=4, ensure_ascii=False)
    return
# Appending a millisecond timestamp defeats browser caching, so moving to the
# next question never replays the previously buffered audio.
def append_cache_buster(audio_path):
    """Return *audio_path* with a cache-busting ``?t=<millis>`` query string."""
    millis = int(time.time() * 1000)
    return "{}?t={}".format(audio_path, millis)
# Function that samples questions from the available question set.
# NOTE: a legacy sample_audio_paths variant (which took count_data as a
# parameter and only locked the write, not the read) used to be kept here as a
# dead triple-quoted string; it has been removed. See the live version below.
# This version keeps the read of count.json inside the file lock as well, so
# the read-modify-write of the counters is atomic across concurrent users.
def sample_audio_paths(audio_paths, k=5, max_count=1):
    """Pick *k* recordings whose usage count is still below *max_count*.

    The chosen recordings have their counters bumped and count.json is
    rewritten before the lock is released; only the selected paths are
    returned (count_data itself stays on disk to keep updates atomic).
    """
    lock_path = COUNT_JSON_PATH + ".lock"
    with FileLock(lock_path, timeout=10):
        # Read the freshest counters while holding the lock.
        with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
            count_data = json.load(f)
        eligible_paths = []
        for candidate in audio_paths:
            if count_data.get(os.path.basename(candidate), 0) < max_count:
                eligible_paths.append(candidate)
        if len(eligible_paths) < k:
            raise ValueError(f"可用音频数量不足(只剩 {len(eligible_paths)} 条 count<{max_count} 的音频),无法抽取 {k} 条")
        selected = random.sample(eligible_paths, k)
        # Bump the counter of every selected recording immediately.
        for chosen in selected:
            name = os.path.basename(chosen)
            count_data[name] = count_data.get(name, 0) + 1
        # Persist the updated counters before releasing the lock.
        with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f:
            json.dump(count_data, f, indent=4, ensure_ascii=False)
    return selected
| # ============================================================================== | |
| # Frontend Function Definitions | |
| # ============================================================================== | |
# question_set lives inside each user_data_state, so users never share a test.
def start_challenge(user_data_state):
    """Build a fresh 5-question test for this user and reveal the next page."""
    load_or_initialize_count_json(all_data_audio_paths)
    # count_data stays on disk behind the file lock instead of in user state,
    # keeping its read-modify-write atomic.
    selected_audio_paths = sample_audio_paths(all_data_audio_paths, k=5)
    question_set = []
    for path in selected_audio_paths:
        question_set.append({"audio": path, "desc": f"这是音频文件 {os.path.basename(path)} 的描述"})
    user_data_state["question_set"] = question_set
    return gr.update(visible=False), gr.update(visible=True), user_data_state
# Shows the free-text "other" field only when the matching choice is selected.
def toggle_education_other(choice):
    """Toggle visibility of the "其他(请注明)" text box based on *choice*."""
    show = choice == "其他(请注明)"
    return gr.update(visible=show, interactive=show, value="")
# Enables the submit button only once the demographic form is fully filled in.
def check_info_complete(username, age, gender, education, education_other, ai_experience):
    """Return a gr.update enabling submission iff every required field is set.

    Choosing "其他(请注明)" additionally requires the free-text field.
    """
    filled = bool(username.strip()) and bool(age) and bool(gender) and bool(education) and bool(ai_experience)
    missing_other = education == "其他(请注明)" and not education_other.strip()
    return gr.update(interactive=filled and not missing_other)
# Stores the submitted user info and prepares the sample (walkthrough) page.
def show_sample_page_and_init(username, age, gender, education, education_other, ai_experience, user_data):
    """Persist the form values into *user_data* and initialize the sample view."""
    if education == "其他(请注明)":
        final_edu = education_other
    else:
        final_edu = education
    user_data["username"] = username.strip()
    user_data["age"] = age
    user_data["gender"] = gender
    user_data["education"] = final_edu
    user_data["ai_experience"] = ai_experience
    first_dim_title = DIMENSION_TITLES[0]
    # Page visibility + state updates come first, then the sample-view widgets.
    page_updates = [gr.update(visible=False), gr.update(visible=True), user_data, first_dim_title]
    return page_updates + update_sample_view(first_dim_title)
def update_sample_view(dimension_title):
    """Refresh the sample-page widgets for the dimension named *dimension_title*.

    Returns [audio, interactive-view, reference-view, reference-button] updates
    followed by MAX_SUB_DIMS interactive sliders and MAX_SUB_DIMS reference
    sliders; slider slots beyond this dimension's sub-dims are hidden.
    """
    dim_data = None
    for candidate in DIMENSIONS_DATA:
        if candidate["title"] == dimension_title:
            dim_data = candidate
            break
    if dim_data is None:
        # Unknown title: leave every component untouched.
        return [gr.update()] * 4 + [gr.update()] * (MAX_SUB_DIMS * 2)
    sub_dims = dim_data['sub_dims']
    scores = dim_data.get("reference_scores", [])
    sample_slider_ups = []
    ref_slider_ups = []
    for i in range(MAX_SUB_DIMS):
        if i < len(sub_dims):
            label = sub_dims[i]
            score = scores[i] if i < len(scores) else 0
            sample_slider_ups.append(gr.update(visible=True, label=label, value=0))
            ref_slider_ups.append(gr.update(visible=True, label=label, value=score))
        else:
            sample_slider_ups.append(gr.update(visible=False, value=0))
            ref_slider_ups.append(gr.update(visible=False, value=0))
    head = [
        gr.update(value=dim_data["audio"]),  # audio player
        gr.update(visible=True),             # interactive view shown
        gr.update(visible=False),            # reference view hidden
        gr.update(value="Reference"),        # toggle button label reset
    ]
    return head + sample_slider_ups + ref_slider_ups
def update_test_dimension_view(d_idx, selections):
    """Render the sliders and navigation buttons for dimension *d_idx*.

    Previously chosen scores for this dimension are restored from *selections*
    (a dict keyed by dimension title, then by sub-dimension label). Returns
    [progress-markdown, prev-button, next-button] + MAX_SUB_DIMS slider updates.
    """
    slider_updates = []
    dim_data = DIMENSIONS_DATA[d_idx]
    sub_dims = dim_data["sub_dims"]
    existing_scores = selections.get(dim_data['title'], {})
    progress_d = f"Dimension {d_idx + 1} / {len(DIMENSIONS_DATA)}: **{dim_data['title']}**"
    for i in range(MAX_SUB_DIMS):
        if i < len(sub_dims):
            desc = sub_dims[i]
            name = desc.split(":")[0].strip()
            # Special sub-dimensions may be inapplicable, so their scale starts
            # at 0; every other sub-dimension starts at 1.
            default_value = 0 if name in SPECIAL_KEYWORDS else 1
            value = existing_scores.get(desc, default_value)
            slider_updates.append(gr.update(
                visible=True,
                label=desc,
                minimum=default_value,
                maximum=5,
                step=1,
                value=value,
                interactive=True,
            ))
        else:
            # Hide slider slots beyond this dimension's sub-dimension count.
            slider_updates.append(gr.update(visible=False))
    prev_btn_update = gr.update(interactive=(d_idx > 0))
    # The last dimension's "next" button leads into the final judgement page.
    next_btn_update = gr.update(
        value="Proceed to Final Judgement" if d_idx == len(DIMENSIONS_DATA) - 1 else "Next Dimension",
        interactive=True
    )
    return [gr.update(value=progress_d), prev_btn_update, next_btn_update] + slider_updates
def init_test_question(user_data, q_idx):
    """Reset the test page to question *q_idx*, starting at dimension 0."""
    d_idx = 0
    question = user_data["question_set"][q_idx]
    progress_q = f"Question {q_idx + 1} / {len(user_data['question_set'])}"
    dim_title_update, prev_btn_update, next_btn_update, *slider_updates = \
        update_test_dimension_view(d_idx, {})
    fixed = (
        gr.update(visible=False),
        gr.update(visible=True),
        gr.update(visible=False),
        gr.update(visible=False),
        q_idx, d_idx, {},
        gr.update(value=progress_q),
        dim_title_update,
        gr.update(value=question['audio']),
        prev_btn_update,
        next_btn_update,
        # None (not "") is what actually clears the final-choice Radio.
        gr.update(value=None),
        gr.update(interactive=False),
    )
    return fixed + tuple(slider_updates)
def navigate_dimensions(direction, q_idx, d_idx, selections, *slider_values):
    """Save the current dimension's scores, then move forward or backward.

    Moving "next" past the last dimension switches to the final-judgement view;
    otherwise the test view is re-rendered for the new dimension index.
    """
    current = DIMENSIONS_DATA[d_idx]
    # Record the slider values for the dimension being left.
    selections[current['title']] = {
        sub_dim: slider_values[i] for i, sub_dim in enumerate(current['sub_dims'])
    }
    step = 1 if direction == "next" else -1
    new_d_idx = d_idx + step
    at_last = d_idx == len(DIMENSIONS_DATA) - 1
    if direction == "next" and at_last:
        # Hand off to the final judgement page; sliders stay untouched.
        return (
            gr.update(visible=False),
            gr.update(visible=True),
            q_idx, new_d_idx, selections,
            gr.update(),
            gr.update(),
            gr.update(),
            gr.update(interactive=True),
            gr.update(interactive=False),
            gr.update(interactive=False),
            gr.update(interactive=False),
        ) + (gr.update(),) * MAX_SUB_DIMS
    dim_title_update, prev_btn_update, next_btn_update, *slider_updates = \
        update_test_dimension_view(new_d_idx, selections)
    return (
        gr.update(), gr.update(),
        q_idx, new_d_idx, selections,
        gr.update(),
        dim_title_update,
        gr.update(),
        gr.update(),
        gr.update(),
        prev_btn_update,
        next_btn_update,
    ) + tuple(slider_updates)
def toggle_reference_view(current):
    """Flip between the interactive sliders and the reference answers."""
    showing_reference = current == "Reference"
    interactive_up = gr.update(visible=not showing_reference)
    reference_up = gr.update(visible=showing_reference)
    button_up = gr.update(value="Back" if showing_reference else "Reference")
    return interactive_up, reference_up, button_up
def back_to_welcome():
    """Reset every page and all session state back to the welcome screen."""
    # Welcome page stays visible; info, sample, pretest, test, final-judgment
    # and result pages are all hidden.
    pages = [gr.update(visible=True)] + [gr.update(visible=False)] * 6
    # user_data_state, current_question_index, current_test_dimension_index,
    # current_question_selections, test_results.
    state = [{}, 0, 0, {}, []]
    return tuple(pages + state)
| # ============================================================================== | |
| # Retry Function Definitions | |
| # ============================================================================== | |
# Decorator that runs the wrapped function in a worker thread with a timeout,
# retrying on failure with a linear backoff.
def retry_with_timeout(max_retries=3, timeout=10, backoff=1):
    """Retry the wrapped function up to *max_retries* times.

    Each attempt runs in a one-worker ThreadPoolExecutor and is capped at
    *timeout* seconds; a timed-out attempt raises TimeoutError and counts as a
    failure. Sleeps ``backoff * (attempt + 1)`` seconds between attempts, and
    re-raises the last exception after the final failure.
    """
    def decorator(func):
        @wraps(func)  # BUG FIX: preserve the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    with ThreadPoolExecutor(max_workers=1) as executor:
                        future = executor.submit(func, *args, **kwargs)
                        try:
                            return future.result(timeout=timeout)
                        except FutureTimeoutError:
                            # NOTE: cancel() cannot stop an already-running
                            # call; the worker keeps running until func returns.
                            future.cancel()
                            raise TimeoutError(f"Operation timed out after {timeout} seconds")
                except Exception as e:
                    last_exception = e
                    print(f"Attempt {attempt + 1} failed: {str(e)}")
                    if attempt < max_retries - 1:
                        time.sleep(backoff * (attempt + 1))
            print(f"All {max_retries} attempts failed")
            if last_exception:
                raise last_exception
            raise Exception("Unknown error occurred")
        return wrapper
    return decorator
def save_with_retry(all_results, user_data):
    """Upload the results to the Hugging Face Hub with a 30-second cap.

    Returns True on success, False on timeout or any upload error.
    """
    try:
        # Run the upload in a worker thread so its duration can be bounded.
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(save_all_results_to_file, all_results, user_data)
            try:
                future.result(timeout=30)  # 30-second upload timeout
            except FutureTimeoutError:
                future.cancel()
                print("上传超时")
                return False
            return True
    except Exception as e:
        print(f"上传到Hub失败: {e}")
        return False
def save_locally_with_retry(data, filename, max_retries=3):
    """Write *data* to *filename* as JSON, retrying up to *max_retries* times.

    Returns True once a write succeeds, False if every attempt fails. Sleeps
    one second between attempts.
    """
    attempt = 0
    while attempt < max_retries:
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=4, ensure_ascii=False)
            return True
        except Exception as e:
            print(f"本地保存尝试 {attempt + 1} 失败: {e}")
            if attempt < max_retries - 1:
                time.sleep(1)
        attempt += 1
    return False
# Rewrite count.json so recordings from an unfinished question set are released
# back into the pool. Returns True on success, False after max_retries failures.
def update_count_with_retry(count_data, question_set, max_retries=3):
    for attempt in range(max_retries):
        try:
            lock_path = COUNT_JSON_PATH + ".lock"
            # NOTE(review): if the caller still holds this lock via a separate
            # FileLock object, this acquire may block until the 10 s timeout —
            # confirm the caller releases its lock before calling in here.
            with FileLock(lock_path, timeout=10):
                # Remove unfinished question(s) from count.json
                for question in question_set:
                    filename = os.path.basename(question['audio'])
                    # NOTE(review): sampled questions are stored with count 1
                    # (see sample_audio_paths), so this `< 1` check only
                    # re-zeroes entries already at 0 — verify the intended
                    # condition actually releases sampled-but-unfinished items.
                    if filename in count_data and count_data[filename] < 1:
                        count_data[filename] = 0  # Mark unfinished data as 0
                with open(COUNT_JSON_PATH, 'w', encoding='utf-8') as f:
                    json.dump(count_data, f, indent=4, ensure_ascii=False)
            return True
        except Exception as e:
            print(f"Fail to update count.json {e} for {attempt + 1} time")
            if attempt < max_retries - 1:
                time.sleep(1)
    return False
| # ============================================================================== | |
# NOTE: a legacy submit_question_and_advance variant (which read count_data out
# of user_data instead of from the lock-protected count.json on disk, and had
# no upload-failure fallback) used to be kept here as a dead triple-quoted
# string; it has been removed. See the live implementation below.
# user_data no longer carries "updated_count_data"; counters are read/written
# directly from the working directory under a file lock.
def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data):
    """Record the answers for question *q_idx* and advance the UI.

    When questions remain, resets the test page for the next one. After the
    last question it builds a summary, uploads all results (falling back to a
    local JSON file on failure), refreshes count.json, and shows the result
    page. Any unexpected error keeps the current page and surfaces the message.
    """
    try:
        # Normalize the collected scores: slider value 0 means "not rated" and
        # becomes None. BUG FIX: final_choice is recorded unconditionally
        # (previously it was set inside the loop and lost when *selections*
        # was empty).
        cleaned_selections = {"final_choice": final_choice}
        for dim_title, sub_scores in selections.items():
            cleaned_selections[dim_title] = {
                sub_dim: (None if score == 0 else score)
                for sub_dim, score in sub_scores.items()
            }
        all_results.append({
            "question_id": q_idx,
            "audio_file": user_data["question_set"][q_idx]['audio'],
            "selections": cleaned_selections
        })
        q_idx += 1
        if q_idx < len(user_data["question_set"]):
            # More questions remain: re-initialize the test page for the next.
            init_q_updates = init_test_question(user_data, q_idx)
            return init_q_updates + (all_results, gr.update(value=""))
        else:
            # Last question answered: build the summary shown on the result page.
            result_str = "### Test Completed!\n\nOverview of your submission:\n"
            for res in all_results:
                result_str += f"##### Final Judgement: **{res['selections'].get('final_choice', 'empty')}**\n"  # empty == no choice
                for dim_title, dim_data in res['selections'].items():
                    if dim_title == 'final_choice':
                        continue
                    result_str += f"- **{dim_title}**:\n"
                    for sub_dim, score in dim_data.items():
                        result_str += f"  - *{sub_dim[:20]}...*: {score}/5\n"
            # Attempt the upload (with retry/timeout protection).
            try:
                success = save_with_retry(all_results, user_data)
            except Exception as e:
                print(f"上传过程中发生错误: {e}")
                success = False
            if not success:
                # Upload failed: fall back to a local JSON file.
                username = user_data.get("username", "anonymous")
                timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
                local_filename = f"submission_{username}_{timestamp}.json"
                user_info_clean = {
                    k: v for k, v in user_data.items() if k not in ["question_set"]
                }
                final_data_package = {
                    "user_info": user_info_clean,
                    "results": all_results
                }
                local_success = save_locally_with_retry(final_data_package, local_filename)
                if local_success:
                    result_str += f"\n\n⚠️ 上传失败,结果已保存到本地文件: {local_filename}"
                else:
                    result_str += "\n\n❌ 上传失败且无法保存到本地文件,请联系管理员"
            # Refresh count.json so unfinished questions can rejoin the pool.
            # The read happens under the lock, but update_count_with_retry
            # acquires its own FileLock on the same path, so it is called only
            # AFTER this lock is released to avoid self-blocking.
            try:
                with FileLock(COUNT_JSON_PATH + ".lock", timeout=5):
                    with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
                        count_data = json.load(f, object_pairs_hook=collections.OrderedDict)
                count_update_success = update_count_with_retry(count_data, user_data["question_set"])
            except Exception as e:
                print(f"更新count.json失败: {e}")
                count_update_success = False
            if not count_update_success:
                result_str += "\n\n⚠️ 无法更新题目计数,请联系管理员"
            return (
                gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
                q_idx, d_idx, {},
                gr.update(), gr.update(), gr.update(), gr.update(), gr.update(),
                gr.update(), gr.update(),
            ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str)
    except Exception as e:
        print(f"提交过程中发生错误: {e}")
        error_msg = f"提交过程中发生错误: {str(e)}"
        # Keep the current page and state; surface the error text instead.
        return (
            gr.update(), gr.update(), gr.update(), gr.update(),
            q_idx, d_idx, selections,
            gr.update(), gr.update(), gr.update(), gr.update(), gr.update(),
            gr.update(), gr.update(),
        ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, error_msg)
# NOTE: A previous variant of save_all_results_to_file (which additionally took
# a count_data argument and re-uploaded it after the submission) used to live
# here as a commented-out triple-quoted string. It was dead code superseded by
# the implementation below; recover it from version-control history if needed.
def save_all_results_to_file(all_results, user_data):
    """Upload a user's submission JSON (and the current count.json) to the Hub.

    Args:
        all_results: List of per-question result dicts collected during the test.
        user_data: Dict of user info. "username" is embedded in the submission
            filename; the "question_set" key is stripped from the uploaded payload.

    Raises:
        RuntimeError: If the HF_TOKEN environment variable is not set.
        Exception: Any failure from the submission upload propagates to the
            caller (which falls back to a local save). A failed count.json
            upload is only logged — it is best-effort.
    """
    repo_id = "intersteller2887/Turing-test-dataset-en"
    username = user_data.get("username", "user")
    timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
    submission_filename = f"submissions_{username}_{timestamp}.json"
    # Drop the assigned question set before uploading the user info.
    user_info_clean = {
        k: v for k, v in user_data.items() if k not in ["question_set"]
    }
    final_data_package = {
        "user_info": user_info_clean,
        "results": all_results
    }
    json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4)
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        raise RuntimeError("HF_TOKEN not found. Cannot upload to the Hub.")
    api = HfApi()
    # Upload the submission directly (no retry decorator); errors propagate so
    # the caller can trigger its local-save fallback.
    api.upload_file(
        path_or_fileobj=json_string.encode("utf-8"),
        path_in_repo=f"submissions/{submission_filename}",
        repo_id=repo_id,
        repo_type="dataset",
        token=hf_token,
        commit_message=f"Add new submission from {username}"
    )
    # Best-effort: mirror the local count.json to the repo. The file lock
    # guards against reading a half-written file from a concurrent session.
    try:
        with FileLock(COUNT_JSON_PATH + ".lock", timeout=5):
            with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
                count_data_str = f.read()
            api.upload_file(
                path_or_fileobj=count_data_str.encode("utf-8"),
                path_in_repo=COUNT_JSON_REPO_PATH,
                repo_id=repo_id,
                repo_type="dataset",
                token=hf_token,
                commit_message=f"Update count.json after submission by {username}"
            )
    except Exception as e:
        print(f"上传 count.json 失败: {e}")
| # ============================================================================== | |
| # Gradio 界面定义 (Gradio UI Definition) | |
| # ============================================================================== | |
| with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {max-width: 960px !important}") as demo: | |
| user_data_state = gr.State({}) | |
| current_question_index = gr.State(0) | |
| current_test_dimension_index = gr.State(0) | |
| current_question_selections = gr.State({}) | |
| test_results = gr.State([]) | |
| welcome_page = gr.Column(visible=True) | |
| info_page = gr.Column(visible=False) | |
| sample_page = gr.Column(visible=False) | |
| pretest_page = gr.Column(visible=False) | |
| test_page = gr.Column(visible=False) | |
| final_judgment_page = gr.Column(visible=False) | |
| result_page = gr.Column(visible=False) | |
| pages = { | |
| "welcome": welcome_page, "info": info_page, "sample": sample_page, | |
| "pretest": pretest_page, "test": test_page, "final_judgment": final_judgment_page, | |
| "result": result_page | |
| } | |
| with welcome_page: | |
| gr.Markdown("# Can you spot the hidden AI?\nListen to the following conversations. Try to tell which respondent is an AI.") | |
| start_btn = gr.Button("Start", variant="primary") | |
| with info_page: | |
| gr.Markdown("## Basic Information") | |
| username_input = gr.Textbox(label="Username", placeholder="Please enter your nickname") | |
| age_input = gr.Radio(["Under 18", "18-25", "26-35", "36-50", "Over 50"], label="Age") | |
| gender_input = gr.Radio(["Male", "Female", "Other"], label="Gender") | |
| education_input = gr.Radio(["High school or below", "Bachelor", "Master", "PhD", "Other (please specify)"], label="Education Level") | |
| education_other_input = gr.Textbox(label="Please enter your education", visible=False, interactive=False) | |
| ai_experience_input = gr.Radio([ | |
| "Never used", | |
| "Occasionally exposed (e.g., watching others use)", | |
| "Used a few times, understand basic functions", | |
| "Use frequently, have some experience", | |
| "Very familiar, have in-depth experience with multiple AI tools" | |
| ], label="Familiarity with AI Tools") | |
| submit_info_btn = gr.Button("Submit and Start Learning Sample", variant="primary", interactive=False) | |
| with sample_page: | |
| gr.Markdown("## Sample Analysis\nPlease select a dimension to study and practice scoring. All dimensions share the same sample audio.") | |
| sample_dimension_selector = gr.Radio(DIMENSION_TITLES, label="Select Learning Dimension", value=DIMENSION_TITLES[0]) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| sample_audio = gr.Audio(label="Sample Audio", value=DIMENSIONS_DATA[0]["audio"]) | |
| # sample_audio = gr.Audio(label="Sample Audio", value=sample1_audio_path) | |
| with gr.Column(scale=2): | |
| with gr.Column(visible=True) as interactive_view: | |
| gr.Markdown("#### Please rate the following features (0-5 points. 0 - Feature not present; 1 - Machine; 3 - Neutral; 5 - Human)") | |
| sample_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True) for i in range(MAX_SUB_DIMS)] | |
| with gr.Column(visible=False) as reference_view: | |
| gr.Markdown("### Reference Answer Explanation (1-5 points. 1 = Machine-like, 5 = Human-like)") | |
| reference_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=False) for i in range(MAX_SUB_DIMS)] | |
| with gr.Row(): | |
| reference_btn = gr.Button("Reference") | |
| go_to_pretest_btn = gr.Button("Got it, start the test", variant="primary") | |
| with pretest_page: | |
| gr.Markdown("""## Test Instructions | |
| - Every dialogue includes 2 speakers and lasts around 1 minute. | |
| - **Initiator:** The one who talks the first in the dialogue. | |
| - **Respondent:** The other one. | |
| - For each question, you'll evaluate the **respondent** (not the initiator) across **5 dimensions**. | |
| - Under each dimension, score **every listed feature** from **0 to 5**: | |
| ### 🔢 Scoring Guide: | |
| - **0** – The feature is **not present** *(some features are always present, so use 1–5 in those cases)* | |
| - **1** – Strongly machine-like | |
| - **2** – Somewhat machine-like | |
| - **3** – Neutral (no clear human or machine lean) | |
| - **4** – Somewhat human-like | |
| - **5** – Strongly human-like | |
| - After rating all dimensions, make a final judgment: is the **respondent** a human or an AI? | |
| - You can freely switch between dimensions using the **Previous** and **Next** buttons. | |
| --- | |
| ### ⚠️ Important Notes: | |
| - Stick to your username all the time. | |
| - Remember to **pause the audio** before you proceed to the final judgement. Otherwise it will keep playing and you cannot stop it. | |
| - Once you start the test, try not to refresh the page or quit it. You need to grade 5 recordings every test. | |
| - Focus on whether the **respondent's speech** sounds more **human-like or machine-like** for each feature. | |
| > For example: correct pronunciation doesn't always mean "human", and mispronunciation doesn't mean "AI". Think in terms of human-likeness. | |
| - Even if you're confident early on about the respondent's identity, still evaluate **each dimension independently**. | |
| Avoid just labeling all dimensions as "machine-like" or "human-like" without listening carefully. | |
| """) | |
| go_to_test_btn = gr.Button("Start the Test", variant="primary") | |
| with test_page: | |
| gr.Markdown("## Formal Test") | |
| question_progress_text = gr.Markdown() | |
| test_dimension_title = gr.Markdown() | |
| test_audio = gr.Audio(label="Test Audio") | |
| gr.Markdown("--- \n ### Please rate the respondent (not the initiator) in the conversation based on the following features (0-5 points. 0 - Feature not present; 1 - Machine; 3 - Neutral; 5 - Human)") | |
| test_sliders = [gr.Slider(minimum=1, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True) for i in range(MAX_SUB_DIMS)] | |
| with gr.Row(): | |
| prev_dim_btn = gr.Button("Previous Dimension") | |
| next_dim_btn = gr.Button("Next Dimension", variant="primary") | |
| with final_judgment_page: | |
| gr.Markdown("## Final Judgment") | |
| gr.Markdown("You have completed scoring for all dimensions. Please make a final judgment based on your overall impression.") | |
| final_human_robot_radio = gr.Radio(["👤 Human", "🤖 AI"], label="Please determine the respondent type (required)") | |
| submit_final_answer_btn = gr.Button("Submit Answer for This Question", variant="primary", interactive=False) | |
| with result_page: | |
| gr.Markdown("## Test Completed") | |
| result_text = gr.Markdown() | |
| back_to_welcome_btn = gr.Button("Back to Main Page", variant="primary") | |
| # ============================================================================== | |
| # 事件绑定 (Event Binding) & IO 列表定义 | |
| # ============================================================================== | |
| sample_init_outputs = [ | |
| info_page, sample_page, user_data_state, sample_dimension_selector, | |
| sample_audio, interactive_view, reference_view, reference_btn | |
| ] + sample_sliders + reference_sliders | |
| test_init_outputs = [ | |
| pretest_page, test_page, final_judgment_page, result_page, | |
| current_question_index, current_test_dimension_index, current_question_selections, | |
| question_progress_text, test_dimension_title, test_audio, | |
| prev_dim_btn, next_dim_btn, | |
| final_human_robot_radio, submit_final_answer_btn, | |
| ] + test_sliders | |
| nav_inputs = [current_question_index, current_test_dimension_index, current_question_selections] + test_sliders | |
| nav_outputs = [ | |
| test_page, final_judgment_page, | |
| current_question_index, current_test_dimension_index, current_question_selections, | |
| question_progress_text, test_dimension_title, test_audio, | |
| final_human_robot_radio, submit_final_answer_btn, | |
| prev_dim_btn, next_dim_btn, | |
| ] + test_sliders | |
| full_outputs_with_results = test_init_outputs + [test_results, result_text] | |
| # start_btn.click(fn=start_challenge, outputs=[welcome_page, info_page]) | |
| start_btn.click( | |
| fn=start_challenge, | |
| inputs=[user_data_state], | |
| outputs=[welcome_page, info_page, user_data_state] | |
| ) | |
| for comp in [age_input, gender_input, education_input, education_other_input, ai_experience_input]: | |
| comp.change( | |
| fn=check_info_complete, | |
| inputs=[username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input], | |
| outputs=submit_info_btn | |
| ) | |
| education_input.change(fn=toggle_education_other, inputs=education_input, outputs=education_other_input) | |
| submit_info_btn.click( | |
| fn=show_sample_page_and_init, | |
| inputs=[username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input, user_data_state], | |
| outputs=sample_init_outputs | |
| ) | |
| sample_dimension_selector.change( | |
| fn=update_sample_view, | |
| inputs=sample_dimension_selector, | |
| outputs=[sample_audio, interactive_view, reference_view, reference_btn] + sample_sliders + reference_sliders | |
| ) | |
| reference_btn.click( | |
| fn=toggle_reference_view, | |
| inputs=reference_btn, | |
| outputs=[interactive_view, reference_view, reference_btn] | |
| ) | |
| go_to_pretest_btn.click(lambda: (gr.update(visible=False), gr.update(visible=True)), outputs=[sample_page, pretest_page]) | |
| go_to_test_btn.click( | |
| fn=lambda user: init_test_question(user, 0) + ([], gr.update()), | |
| inputs=[user_data_state], | |
| outputs=full_outputs_with_results | |
| ) | |
| prev_dim_btn.click( | |
| fn=lambda q,d,s, *sliders: navigate_dimensions("prev", q,d,s, *sliders), | |
| inputs=nav_inputs, outputs=nav_outputs | |
| ) | |
| next_dim_btn.click( | |
| fn=lambda q,d,s, *sliders: navigate_dimensions("next", q,d,s, *sliders), | |
| inputs=nav_inputs, outputs=nav_outputs | |
| ) | |
| final_human_robot_radio.change( | |
| fn=lambda choice: gr.update(interactive=bool(choice)), | |
| inputs=final_human_robot_radio, | |
| outputs=submit_final_answer_btn | |
| ) | |
| submit_final_answer_btn.click( | |
| fn=submit_question_and_advance, | |
| inputs=[current_question_index, current_test_dimension_index, current_question_selections, final_human_robot_radio, test_results, user_data_state], | |
| outputs=full_outputs_with_results | |
| ) | |
| back_to_welcome_btn.click(fn=back_to_welcome, outputs=list(pages.values()) + [user_data_state, current_question_index, current_test_dimension_index, current_question_selections, test_results]) | |
| # ============================================================================== | |
| # 程序入口 (Entry Point) | |
| # ============================================================================== | |
| if __name__ == "__main__": | |
| if not os.path.exists("audio"): | |
| os.makedirs("audio") | |
| if "SPACE_ID" in os.environ: | |
| print("Running in a Hugging Face Space, checking for audio files...") | |
| # all_files = [q["audio"] for q in QUESTION_SET] + [d["audio"] for d in DIMENSIONS_DATA] | |
| all_files = [d["audio"] for d in DIMENSIONS_DATA] | |
| for audio_file in set(all_files): | |
| if not os.path.exists(audio_file): | |
| print(f"⚠️ Warning: Audio file not found: {audio_file}") | |
| demo.launch(debug=True) |