import gradio as gr import os import json import pandas as pd import random import shutil import time import collections from functools import wraps from filelock import FileLock from datasets import load_dataset, Audio from huggingface_hub import HfApi from multiprocessing import TimeoutError from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError dataset = load_dataset("intersteller2887/Turing-test-dataset", split="train") dataset = dataset.cast_column("audio", Audio(decode=False)) # Prevent calling 'torchcodec' from newer version of 'datasets' # Huggingface space working directory: "/home/user/app" target_audio_dir = "/home/user/app/audio" os.makedirs(target_audio_dir, exist_ok=True) COUNT_JSON_PATH = "/home/user/app/count.json" COUNT_JSON_REPO_PATH = "submissions/count.json" # Output directory (Huggingface dataset directory) # Copy recordings to the working directory local_audio_paths = [] for item in dataset: src_path = item["audio"]["path"] if src_path and os.path.exists(src_path): filename = os.path.basename(src_path) dst_path = os.path.join(target_audio_dir, filename) if not os.path.exists(dst_path): shutil.copy(src_path, dst_path) local_audio_paths.append(dst_path) all_data_audio_paths = local_audio_paths # Take first file of the datasets as sample sample1_audio_path = local_audio_paths[0] print(sample1_audio_path) # ============================================================================== # 数据定义 (Data Definition) # ============================================================================== DIMENSIONS_DATA = [ { "title": "语义和语用特征", "audio": sample1_audio_path, "sub_dims": [ "记忆一致性:人类在短上下文中记忆通常一致,不一致也会尝试自我修正(如提问);机器则可能会出现上下文记忆不一致且无法察觉或修正(如遗忘掉关键信息坚持错误回答)", "逻辑连贯性:人类逻辑自然流畅,允许合理跳跃;机器逻辑转折生硬或自相矛盾(如:突然切换话题无过渡)", "读音正确性:人类大部分情况下发音正确、自然,会结合语境使用、区分多音字;机器存在不自然的发音错误,且对多音字语境的判断能力有限", "多语言混杂:人类多语言混杂流畅,且带有语境色彩;机器多语言混杂生硬,无语言切换逻辑", "语言不精确性:人类说话时倾向存在含糊表达:如“差不多”、“应该是吧”,且会出现自我修正的行为;机器的回应通常不存在模糊表达,回答准确、肯定", "填充词使用:人类填充词(如‘嗯’‘那个’)随机且带有思考痕迹;机器填充词规律重复或完全缺失", "隐喻与语用用意:人类会使用隐喻、反语、委婉来表达多重含义;机器表达直白,仅能字面理解或生硬使用修辞,缺乏语义多样性" ], "reference_scores": [5, 5, 5, 0, 5, 5, 0] }, { "title": "非生理性副语言特征", "audio": sample1_audio_path, "sub_dims": [ "节奏:人类语速随语义起伏,偶尔卡顿或犹豫;机器节奏均匀,几乎无停顿或停顿机械", "语调:人类在表达如疑问、惊讶、强调时,音调会自然上扬或下降;机器语调单一或变化过于规律,不符合语境", "重读:人类会有意识地加强重要词语,从而突出信息焦点;机器的词语强度一致性强,或出现强调部位异常", "辅助性发声:人类会发出符合语境的非语言声音,如笑声、叹气等;机器的辅助性发声语境错误或机械化,或完全无辅助性发声" ], "reference_scores": [5, 5, 5, 5] }, { "title": "生理性副语言特征", "audio": sample1_audio_path, "sub_dims": [ "微生理杂音:人类说话存在呼吸声、口水音、气泡音等无意识发声,且自然地穿插在语流节奏当中;机器没有微生理杂音、语音过于干净,或添加不自然杂音", "发音不稳定性:人类存在一定不规则性(诸如连读、颤音、含糊发音、鼻音等);机器发音过于标准或统一,缺乏个性", "口音:人类存在自然的地区口音或语音特征;机器带口音效果生硬" ], "reference_scores": [5, 4, 4] }, { "title": "机械人格", "audio": sample1_audio_path, "sub_dims": [ "谄媚现象:人类会根据语境判断是否同意,有时提出不同意见;机器频繁同意、感谢、道歉,过度认同对方观点,缺乏真实互动感", "书面化表达:人类表达灵活,;机器回应句式工整、规范,内容过于书面化、用词泛泛" ], "reference_scores": [5, 5] }, { "title": "情感表达", "audio": sample1_audio_path, "sub_dims": [ "语义层面:人类能对悲伤、开心等语境有符合人类的情感反应;机器回应情绪淡漠,或情感词泛泛、脱离语境", "声学层面:人类音调、音量、节奏等声学特征随情绪动态变化;机器情感语调模式化,或与语境不符" ], "reference_scores": [5, 5] } ] DIMENSION_TITLES = [d["title"] for d in DIMENSIONS_DATA] MAX_SUB_DIMS = max(len(d['sub_dims']) for d in DIMENSIONS_DATA) """ # Issue: this is initialized on the starting of the space, might somehow not covered count_data = load_or_initialize_count_json(all_data_audio_paths) selected_audio_paths, updated_count_data = sample_audio_paths(all_data_audio_paths, count_data, k=5) QUESTION_SET = [ {"audio": path, "desc": f"这是音频文件 {os.path.basename(path)} 的描述"} for path in selected_audio_paths ]""" # ============================================================================== # 功能函数定义 (Function Definitions) # ============================================================================== # Function that load or initialize count.json def load_or_initialize_count_json(audio_paths): # Add filelock to /workspace/count.json lock_path = COUNT_JSON_PATH + ".lock" try: api = HfApi() repo_id = "intersteller2887/Turing-test-dataset" hf_token = os.getenv("HF_TOKEN") count_json = None try: count_json = api.get_file( path_in_repo=COUNT_JSON_REPO_PATH, repo_id=repo_id, repo_type="dataset", token=hf_token ) except Exception as e: print(f"Could not download count.json from dataset {e}") # Read of count.json will wait for 10 seconds until another thread involving releases it, and then add a lock to it with FileLock(lock_path, timeout=10): # If count.json exists: load into count_data with open(COUNT_JSON_PATH, "wb") as f: f.write(count_json) # Else initialize count_data with orderedDict if os.path.exists(COUNT_JSON_PATH): with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: count_data = json.load(f, object_pairs_hook=collections.OrderedDict) else: count_data = collections.OrderedDict() updated = False sample_audio_files = {os.path.basename(d["audio"]) for d in DIMENSIONS_DATA} # Guarantee that the sample recording won't be take into the pool # Update newly updated recordings into count.json for path in audio_paths: filename = os.path.basename(path) if filename not in count_data: if filename in sample_audio_files: count_data[filename] = 999 else: count_data[filename] = 0 updated = True if updated or not os.path.exists(COUNT_JSON_PATH): with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: json.dump(count_data, f, indent=4, ensure_ascii=False) return count_data # Shorten the time of playing previous audio when reached next question def append_cache_buster(audio_path): return f"{audio_path}?t={int(time.time() * 1000)}" """def sample_audio_paths(audio_paths, count_data, k=5, max_count=1): eligible_paths = [p for p in audio_paths if count_data.get(os.path.basename(p), 0) < max_count] if len(eligible_paths) < k: raise ValueError(f"可用音频数量不足(只剩 {len(eligible_paths)} 条 count<{max_count} 的音频),无法抽取 {k} 条") eligible_paths_copy = eligible_paths.copy() random.seed(int(time.time())) selected = random.sample(eligible_paths_copy, k) for path in selected: filename = os.path.basename(path) count_data[filename] = count_data.get(filename, 0) + 1 with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: json.dump(count_data, f, indent=4, ensure_ascii=False) return selected, count_data""" def sample_audio_paths(audio_paths, count_data, k=5, max_count=1): # k for questions per test; max_count for question limit in total eligible_paths = [p for p in audio_paths if count_data.get(os.path.basename(p), 0) < max_count] if len(eligible_paths) < k: raise ValueError(f"可用音频数量不足(只剩 {len(eligible_paths)} 条 count<{max_count} 的音频),无法抽取 {k} 条") selected = random.sample(eligible_paths, k) for path in selected: filename = os.path.basename(path) count_data[filename] = count_data.get(filename, 0) + 1 lock_path = COUNT_JSON_PATH + ".lock" with FileLock(lock_path, timeout=10): with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: json.dump(count_data, f, indent=4, ensure_ascii=False) return selected, count_data """def start_challenge(user_data_state): # global QUESTION_SET, updated_count_data # Issue: global variables in huggingface hub is shared by all threads # 每次点击“开始挑战”时重新抽题 count_data = load_or_initialize_count_json(all_data_audio_paths) selected_audio_paths, updated_count_data = sample_audio_paths(all_data_audio_paths, count_data, k=5) QUESTION_SET = [ {"audio": path, "desc": f"这是音频文件 {os.path.basename(path)} 的描述"} for path in selected_audio_paths ] # 重置 user_data 中的状态(也可以留空) user_data_state.clear() return gr.update(visible=False), gr.update(visible=True), user_data_state""" # Save question_set in each user_data_state, preventing global sharing def start_challenge(user_data_state): count_data = load_or_initialize_count_json(all_data_audio_paths) selected_audio_paths, updated_count_data = sample_audio_paths(all_data_audio_paths, count_data, k=5) question_set = [ {"audio": path, "desc": f"这是音频文件 {os.path.basename(path)} 的描述"} for path in selected_audio_paths ] user_data_state["question_set"] = question_set user_data_state["updated_count_data"] = updated_count_data return gr.update(visible=False), gr.update(visible=True), user_data_state def toggle_education_other(choice): is_other = (choice == "其他(请注明)") return gr.update(visible=is_other, interactive=is_other, value="") def check_info_complete(username, age, gender, education, education_other, ai_experience): if username.strip() and age and gender and education and ai_experience: if education == "其他(请注明)" and not education_other.strip(): return gr.update(interactive=False) return gr.update(interactive=True) return gr.update(interactive=False) def show_sample_page_and_init(username, age, gender, education, education_other, ai_experience, user_data): final_edu = education_other if education == "其他(请注明)" else education user_data.update({ "username": username.strip(), "age": age, "gender": gender, "education": final_edu, "ai_experience": ai_experience }) first_dim_title = DIMENSION_TITLES[0] initial_updates = update_sample_view(first_dim_title) return [ gr.update(visible=False), gr.update(visible=True), user_data, first_dim_title ] + initial_updates def update_sample_view(dimension_title): dim_data = next((d for d in DIMENSIONS_DATA if d["title"] == dimension_title), None) if dim_data: audio_up = gr.update(value=dim_data["audio"]) # audio_up = gr.update(value=append_cache_buster(dim_data["audio"])) interactive_view_up = gr.update(visible=True) reference_view_up = gr.update(visible=False) reference_btn_up = gr.update(value="参考") sample_slider_ups = [] ref_slider_ups = [] scores = dim_data.get("reference_scores", []) for i in range(MAX_SUB_DIMS): if i < len(dim_data['sub_dims']): label = dim_data['sub_dims'][i] score = scores[i] if i < len(scores) else 0 sample_slider_ups.append(gr.update(visible=True, label=label, value=0)) ref_slider_ups.append(gr.update(visible=True, label=label, value=score)) else: sample_slider_ups.append(gr.update(visible=False, value=0)) ref_slider_ups.append(gr.update(visible=False, value=0)) return [audio_up, interactive_view_up, reference_view_up, reference_btn_up] + sample_slider_ups + ref_slider_ups empty_updates = [gr.update()] * 4 slider_empty_updates = [gr.update()] * (MAX_SUB_DIMS * 2) return empty_updates + slider_empty_updates def update_test_dimension_view(d_idx, selections): dimension = DIMENSIONS_DATA[d_idx] progress_d = f"维度 {d_idx + 1} / {len(DIMENSIONS_DATA)}: **{dimension['title']}**" existing_scores = selections.get(dimension['title'], {}) slider_updates = [] for i in range(MAX_SUB_DIMS): if i < len(dimension['sub_dims']): sub_dim_label = dimension['sub_dims'][i] value = existing_scores.get(sub_dim_label, 0) slider_updates.append(gr.update(visible=True, label=sub_dim_label, value=value)) else: slider_updates.append(gr.update(visible=False, value=0)) prev_btn_update = gr.update(interactive=(d_idx > 0)) next_btn_update = gr.update( value="进入最终判断" if d_idx == len(DIMENSIONS_DATA) - 1 else "下一维度", interactive=True ) return [gr.update(value=progress_d), prev_btn_update, next_btn_update] + slider_updates def init_test_question(user_data, q_idx): d_idx = 0 # question = QUESTION_SET[q_idx] # progress_q = f"第 {q_idx + 1} / {len(QUESTION_SET)} 题" question = user_data["question_set"][q_idx] progress_q = f"第 {q_idx + 1} / {len(user_data['question_set'])} 题" initial_updates = update_test_dimension_view(d_idx, {}) dim_title_update, prev_btn_update, next_btn_update = initial_updates[:3] slider_updates = initial_updates[3:] return ( gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), q_idx, d_idx, {}, gr.update(value=progress_q), dim_title_update, gr.update(value=question['audio']), # gr.update(value=append_cache_buster(question['audio'])), prev_btn_update, next_btn_update, gr.update(value=None), # BUG FIX: Changed from "" to None to correctly clear the radio button gr.update(interactive=False), ) + tuple(slider_updates) def navigate_dimensions(direction, q_idx, d_idx, selections, *slider_values): current_dim_data = DIMENSIONS_DATA[d_idx] current_sub_dims = current_dim_data['sub_dims'] scores = {sub_dim: slider_values[i] for i, sub_dim in enumerate(current_sub_dims)} selections[current_dim_data['title']] = scores new_d_idx = d_idx + (1 if direction == "next" else -1) if direction == "next" and d_idx == len(DIMENSIONS_DATA) - 1: return ( gr.update(visible=False), gr.update(visible=True), q_idx, new_d_idx, selections, gr.update(), gr.update(), gr.update(), gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), ) + (gr.update(),) * MAX_SUB_DIMS else: view_updates = update_test_dimension_view(new_d_idx, selections) dim_title_update, prev_btn_update, next_btn_update = view_updates[:3] slider_updates = view_updates[3:] return ( gr.update(), gr.update(), q_idx, new_d_idx, selections, gr.update(), dim_title_update, gr.update(), gr.update(), gr.update(), prev_btn_update, next_btn_update, ) + tuple(slider_updates) # ============================================================================== # 重连函数定义 (Retry Function Definitions) # ============================================================================== # Function for handling connection error def retry_with_timeout(max_retries=3, timeout=10, backoff=1): def decorator(func): @wraps(func) def wrapper(*args, **kwargs): last_exception = None for attempt in range(max_retries): try: with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(func, *args, **kwargs) try: result = future.result(timeout=timeout) return result except FutureTimeoutError: future.cancel() raise TimeoutError(f"Operation timed out after {timeout} seconds") except Exception as e: last_exception = e print(f"Attempt {attempt + 1} failed: {str(e)}") if attempt < max_retries - 1: time.sleep(backoff * (attempt + 1)) print(f"All {max_retries} attempts failed") if last_exception: raise last_exception raise Exception("Unknown error occurred") return wrapper return decorator def save_with_retry(all_results, user_data, count_data): # 尝试上传到Hugging Face Hub try: # 使用线程安全的保存方式 with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(save_all_results_to_file, all_results, user_data, count_data) try: future.result(timeout=30) # 设置30秒超时 return True except FutureTimeoutError: future.cancel() print("上传超时") return False except Exception as e: print(f"上传到Hub失败: {e}") return False def save_locally_with_retry(data, filename, max_retries=3): for attempt in range(max_retries): try: with open(filename, 'w', encoding='utf-8') as f: json.dump(data, f, indent=4, ensure_ascii=False) return True except Exception as e: print(f"本地保存尝试 {attempt + 1} 失败: {e}") if attempt < max_retries - 1: time.sleep(1) return False def update_count_with_retry(count_data, question_set, max_retries=3): for attempt in range(max_retries): try: lock_path = COUNT_JSON_PATH + ".lock" with FileLock(lock_path, timeout=10): # Remove unfinished question(s) from count.json for question in question_set: filename = os.path.basename(question['audio']) if filename in count_data and count_data[filename] < 1: count_data[filename] = 0 # Mark unfinished data as 0 with open(COUNT_JSON_PATH, 'w', encoding='utf-8') as f: json.dump(count_data, f, indent=4, ensure_ascii=False) return True except Exception as e: print(f"Fail to update count.json {e} for {attempt + 1} time") if attempt < max_retries - 1: time.sleep(1) return False # ============================================================================== # Previous version of submit_question_and_advance """def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data): # selections["final_choice"] = final_choice cleaned_selections = {} for dim_title, sub_scores in selections.items(): # if dim_title == "final_choice": # 去掉if判断 cleaned_selections["final_choice"] = final_choice # continue cleaned_sub_scores = {} for sub_dim, score in sub_scores.items(): cleaned_sub_scores[sub_dim] = None if score == 0 else score cleaned_selections[dim_title] = cleaned_sub_scores final_question_result = { "question_id": q_idx, "audio_file": user_data["question_set"][q_idx]['audio'], "selections": cleaned_selections } all_results.append(final_question_result) q_idx += 1 # If q_idx hasn't reached the last one if q_idx < len(user_data["question_set"]): init_q_updates = init_test_question(user_data, q_idx) # Case 1: jam happens when initialize next question return init_q_updates + (all_results, gr.update(value="")) # If q_idx has reached the last one else: result_str = "### 测试全部完成!\n\n你的提交结果概览:\n" for res in all_results: # result_str += f"\n#### 题目: {res['audio_file']}\n" result_str += f"##### 最终判断: **{res['selections'].get('final_choice', '未选择')}**\n" for dim_title, dim_data in res['selections'].items(): if dim_title == 'final_choice': continue result_str += f"- **{dim_title}**:\n" for sub_dim, score in dim_data.items(): result_str += f" - *{sub_dim[:20]}...*: {score}/5\n" # save_all_results_to_file(all_results, user_data) # save_all_results_to_file(all_results, user_data, count_data=updated_count_data) save_all_results_to_file(all_results, user_data, count_data=user_data.get("updated_count_data")) return ( gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), q_idx, d_idx, {}, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str)""" def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data): try: # 准备数据 cleaned_selections = {} for dim_title, sub_scores in selections.items(): cleaned_selections["final_choice"] = final_choice cleaned_sub_scores = {} for sub_dim, score in sub_scores.items(): cleaned_sub_scores[sub_dim] = None if score == 0 else score cleaned_selections[dim_title] = cleaned_sub_scores final_question_result = { "question_id": q_idx, "audio_file": user_data["question_set"][q_idx]['audio'], "selections": cleaned_selections } all_results.append(final_question_result) q_idx += 1 if q_idx < len(user_data["question_set"]): init_q_updates = init_test_question(user_data, q_idx) return init_q_updates + (all_results, gr.update(value="")) else: # 准备完整结果数据 result_str = "### 测试全部完成!\n\n你的提交结果概览:\n" for res in all_results: result_str += f"##### 最终判断: **{res['selections'].get('final_choice', '未选择')}**\n" for dim_title, dim_data in res['selections'].items(): if dim_title == 'final_choice': continue result_str += f"- **{dim_title}**:\n" for sub_dim, score in dim_data.items(): result_str += f" - *{sub_dim[:20]}...*: {score}/5\n" # 尝试上传(带重试) try: success = save_with_retry(all_results, user_data, user_data.get("updated_count_data")) except Exception as e: print(f"上传过程中发生错误: {e}") success = False if not success: # 上传失败,保存到本地 username = user_data.get("username", "anonymous") timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') local_filename = f"submission_{username}_{timestamp}.json" # 准备数据包 user_info_clean = { k: v for k, v in user_data.items() if k not in ["question_set", "updated_count_data"] } final_data_package = { "user_info": user_info_clean, "results": all_results } # 尝试保存到本地 local_success = save_locally_with_retry(final_data_package, local_filename) if local_success: result_str += f"\n\n⚠️ 上传失败,结果已保存到本地文件: {local_filename}" else: result_str += "\n\n❌ 上传失败且无法保存到本地文件,请联系管理员" # 更新count.json(剔除未完成的题目) try: count_update_success = update_count_with_retry( user_data.get("updated_count_data", {}), user_data["question_set"] ) except Exception as e: print(f"更新count.json失败: {e}") count_update_success = False if not count_update_success: result_str += "\n\n⚠️ 无法更新题目计数,请联系管理员" return ( gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), q_idx, d_idx, {}, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str) except Exception as e: print(f"提交过程中发生错误: {e}") # 返回错误信息 error_msg = f"提交过程中发生错误: {str(e)}" return ( gr.update(), gr.update(), gr.update(), gr.update(), q_idx, d_idx, selections, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, error_msg) """def save_all_results_to_file(all_results, user_data, count_data=None): repo_id = "intersteller2887/Turing-test-dataset" username = user_data.get("username", "user") timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') submission_filename = f"submissions_{username}_{timestamp}.json" user_info_clean = { k: v for k, v in user_data.items() if k not in ["question_set", "updated_count_data"] } final_data_package = { "user_info": user_info_clean, "results": all_results } json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4) hf_token = os.getenv("HF_TOKEN") if not hf_token: print("HF_TOKEN not found. Cannot upload to the Hub.") return try: api = HfApi() # Upload submission file api.upload_file( path_or_fileobj=bytes(json_string, "utf-8"), path_in_repo=f"submissions/{submission_filename}", repo_id=repo_id, repo_type="dataset", token=hf_token, commit_message=f"Add new submission from {username}" ) print(f"上传成功: {submission_filename}") if count_data: with FileLock(COUNT_JSON_PATH + ".lock", timeout=10): with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: json.dump(count_data, f, indent=4, ensure_ascii=False) api.upload_file( path_or_fileobj=COUNT_JSON_PATH, path_in_repo=COUNT_JSON_REPO_PATH, repo_id=repo_id, repo_type="dataset", token=hf_token, commit_message=f"Update count.json after submission by {username}" ) except Exception as e: print(f"上传出错: {e}")""" def save_all_results_to_file(all_results, user_data, count_data=None): repo_id = "intersteller2887/Turing-test-dataset" username = user_data.get("username", "user") timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') submission_filename = f"submissions_{username}_{timestamp}.json" user_info_clean = { k: v for k, v in user_data.items() if k not in ["question_set", "updated_count_data"] } final_data_package = { "user_info": user_info_clean, "results": all_results } json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4) hf_token = os.getenv("HF_TOKEN") if not hf_token: raise Exception("HF_TOKEN not found. Cannot upload to the Hub.") api = HfApi() # 上传提交文件(不再使用装饰器,直接调用) api.upload_file( path_or_fileobj=bytes(json_string, "utf-8"), path_in_repo=f"submissions/{submission_filename}", repo_id=repo_id, repo_type="dataset", token=hf_token, commit_message=f"Add new submission from {username}" ) if count_data: with FileLock(COUNT_JSON_PATH + ".lock", timeout=5): with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: json.dump(count_data, f, indent=4, ensure_ascii=False) api.upload_file( path_or_fileobj=COUNT_JSON_PATH, path_in_repo=COUNT_JSON_REPO_PATH, repo_id=repo_id, repo_type="dataset", token=hf_token, commit_message=f"Update count.json after submission by {username}" ) def toggle_reference_view(current): if current == "参考": return gr.update(visible=False), gr.update(visible=True), gr.update(value="返回") else: return gr.update(visible=True), gr.update(visible=False), gr.update(value="参考") def back_to_welcome(): return ( gr.update(visible=True), # welcome_page gr.update(visible=False), # info_page gr.update(visible=False), # sample_page gr.update(visible=False), # pretest_page gr.update(visible=False), # test_page gr.update(visible=False), # final_judgment_page gr.update(visible=False), # result_page {}, # user_data_state 0, # current_question_index 0, # current_test_dimension_index {}, # current_question_selections [] # test_results ) # ============================================================================== # Gradio 界面定义 (Gradio UI Definition) # ============================================================================== with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {max-width: 960px !important}") as demo: user_data_state = gr.State({}) current_question_index = gr.State(0) current_test_dimension_index = gr.State(0) current_question_selections = gr.State({}) test_results = gr.State([]) welcome_page = gr.Column(visible=True) info_page = gr.Column(visible=False) sample_page = gr.Column(visible=False) pretest_page = gr.Column(visible=False) test_page = gr.Column(visible=False) final_judgment_page = gr.Column(visible=False) result_page = gr.Column(visible=False) pages = { "welcome": welcome_page, "info": info_page, "sample": sample_page, "pretest": pretest_page, "test": test_page, "final_judgment": final_judgment_page, "result": result_page } with welcome_page: gr.Markdown("# AI 识破者\n你将听到一系列对话,请判断哪个回应者是 AI。") start_btn = gr.Button("开始挑战", variant="primary") with info_page: gr.Markdown("## 请提供一些基本信息") username_input = gr.Textbox(label="用户名", placeholder="请输入你的昵称") age_input = gr.Radio(["18岁以下", "18-25岁", "26-35岁", "36-50岁", "50岁以上"], label="年龄") gender_input = gr.Radio(["男", "女", "其他"], label="性别") education_input = gr.Radio(["高中及以下", "本科", "硕士", "博士", "其他(请注明)"], label="学历") education_other_input = gr.Textbox(label="请填写你的学历", visible=False, interactive=False) ai_experience_input = gr.Radio(["从未使用过", "偶尔接触(如看别人用)", "使用过几次,了解基本功能", "经常使用,有一定操作经验", "非常熟悉,深入使用过多个 AI 工具"], label="对 AI 工具的熟悉程度") submit_info_btn = gr.Button("提交并开始学习样例", variant="primary", interactive=False) with sample_page: gr.Markdown("## 样例分析\n请选择一个维度进行学习和打分练习。所有维度共用同一个样例音频。") sample_dimension_selector = gr.Radio(DIMENSION_TITLES, label="选择学习维度", value=DIMENSION_TITLES[0]) with gr.Row(): with gr.Column(scale=1): sample_audio = gr.Audio(label="样例音频", value=DIMENSIONS_DATA[0]["audio"]) with gr.Column(scale=2): with gr.Column(visible=True) as interactive_view: gr.Markdown("#### 请为以下特征打分 (0-5分。0-特征无体现;1-机器;3-特征无偏向;5-人类)") sample_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True) for i in range(MAX_SUB_DIMS)] with gr.Column(visible=False) as reference_view: gr.Markdown("### 参考答案解析 (1-5分。1对应机器,5对应人类)") reference_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=False) for i in range(MAX_SUB_DIMS)] with gr.Row(): reference_btn = gr.Button("参考") go_to_pretest_btn = gr.Button("我明白了,开始测试", variant="primary") with pretest_page: gr.Markdown("## 测试说明\n" "- 对于每一道题,你都需要对全部 **5 个维度** 进行评估。\n" "- 在每个维度下,请为出现的每个特征 **从1到5打分**。\n" "- **评分解释如下:**\n" " - **0 分:特征未体现**;\n" " - **1 分:极度符合机器特征**;\n" " - **2 分:较为符合机器特征**;\n" " - **3 分:无明显人类或机器倾向**;\n" " - **4 分:较为符合人类特征**;\n" " - **5 分:极度符合人类特征**。\n" "- 完成所有维度后,请根据整体印象对回应方的身份做出做出“人类”或“机器人”的 **最终判断**。\n" "- 你可以使用“上一维度”和“下一维度”按钮在5个维度间自由切换和修改分数。") go_to_test_btn = gr.Button("开始测试", variant="primary") with test_page: gr.Markdown("## 正式测试") question_progress_text = gr.Markdown() test_dimension_title = gr.Markdown() test_audio = gr.Audio(label="测试音频") gr.Markdown("--- \n ### 请为对话中的回应者(非发起者)针对以下特征打分 (0-5分。0-特征无体现;1-机器;3-特征无偏向;5-人类)") test_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True) for i in range(MAX_SUB_DIMS)] with gr.Row(): prev_dim_btn = gr.Button("上一维度") next_dim_btn = gr.Button("下一维度", variant="primary") with final_judgment_page: gr.Markdown("## 最终判断") gr.Markdown("您已完成对所有维度的评分。请根据您的综合印象,做出最终判断。") final_human_robot_radio = gr.Radio(["👤 人类", "🤖 机器人"], label="请判断回应者类型 (必填)") submit_final_answer_btn = gr.Button("提交本题答案", variant="primary", interactive=False) with result_page: gr.Markdown("## 测试完成") result_text = gr.Markdown() back_to_welcome_btn = gr.Button("返回主界面", variant="primary") # ============================================================================== # 事件绑定 (Event Binding) & IO 列表定义 # ============================================================================== sample_init_outputs = [ info_page, sample_page, user_data_state, sample_dimension_selector, sample_audio, interactive_view, reference_view, reference_btn ] + sample_sliders + reference_sliders test_init_outputs = [ pretest_page, test_page, final_judgment_page, result_page, current_question_index, current_test_dimension_index, current_question_selections, question_progress_text, test_dimension_title, test_audio, prev_dim_btn, next_dim_btn, final_human_robot_radio, submit_final_answer_btn, ] + test_sliders nav_inputs = [current_question_index, current_test_dimension_index, current_question_selections] + test_sliders nav_outputs = [ test_page, final_judgment_page, current_question_index, current_test_dimension_index, current_question_selections, question_progress_text, test_dimension_title, test_audio, final_human_robot_radio, submit_final_answer_btn, prev_dim_btn, next_dim_btn, ] + test_sliders full_outputs_with_results = test_init_outputs + [test_results, result_text] # start_btn.click(fn=start_challenge, outputs=[welcome_page, info_page]) start_btn.click( fn=start_challenge, inputs=[user_data_state], outputs=[welcome_page, info_page, user_data_state] ) for comp in [age_input, gender_input, education_input, education_other_input, ai_experience_input]: comp.change( fn=check_info_complete, inputs=[username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input], outputs=submit_info_btn ) education_input.change(fn=toggle_education_other, inputs=education_input, outputs=education_other_input) submit_info_btn.click( fn=show_sample_page_and_init, inputs=[username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input, user_data_state], outputs=sample_init_outputs ) sample_dimension_selector.change( fn=update_sample_view, inputs=sample_dimension_selector, outputs=[sample_audio, interactive_view, reference_view, reference_btn] + sample_sliders + reference_sliders ) reference_btn.click( fn=toggle_reference_view, inputs=reference_btn, outputs=[interactive_view, reference_view, reference_btn] ) go_to_pretest_btn.click(lambda: (gr.update(visible=False), gr.update(visible=True)), outputs=[sample_page, pretest_page]) go_to_test_btn.click( fn=lambda user: init_test_question(user, 0) + ([], gr.update()), inputs=[user_data_state], outputs=full_outputs_with_results ) prev_dim_btn.click( fn=lambda q,d,s, *sliders: navigate_dimensions("prev", q,d,s, *sliders), inputs=nav_inputs, outputs=nav_outputs ) next_dim_btn.click( fn=lambda q,d,s, *sliders: navigate_dimensions("next", q,d,s, *sliders), inputs=nav_inputs, outputs=nav_outputs ) final_human_robot_radio.change( fn=lambda choice: gr.update(interactive=bool(choice)), inputs=final_human_robot_radio, outputs=submit_final_answer_btn ) submit_final_answer_btn.click( fn=submit_question_and_advance, inputs=[current_question_index, current_test_dimension_index, current_question_selections, final_human_robot_radio, test_results, user_data_state], outputs=full_outputs_with_results ) back_to_welcome_btn.click(fn=back_to_welcome, outputs=list(pages.values()) + [user_data_state, current_question_index, current_test_dimension_index, current_question_selections, test_results]) # ============================================================================== # 程序入口 (Entry Point) # ============================================================================== if __name__ == "__main__": if not os.path.exists("audio"): os.makedirs("audio") if "SPACE_ID" in os.environ: print("Running in a Hugging Face Space, checking for audio files...") # all_files = [q["audio"] for q in QUESTION_SET] + [d["audio"] for d in DIMENSIONS_DATA] all_files = [d["audio"] for d in DIMENSIONS_DATA] for audio_file in set(all_files): if not os.path.exists(audio_file): print(f"⚠️ Warning: Audio file not found: {audio_file}") demo.launch(debug=True)