Spark Chou
English version of app.py
22be697
raw
history blame
49.3 kB
import gradio as gr
import os
import json
import pandas as pd
import random
import shutil
import time
import collections
from functools import wraps
from filelock import FileLock
from datasets import load_dataset, Audio
from huggingface_hub import HfApi, hf_hub_download
from multiprocessing import TimeoutError
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
# Load dataset from HuggingFace
dataset = load_dataset("intersteller2887/Turing-test-dataset-en", split="train")
dataset = dataset.cast_column("audio", Audio(decode=False)) # Prevent calling 'torchcodec' from newer version of 'datasets'
# Huggingface space working directory: "/home/user/app"
target_audio_dir = "/home/user/app/audio"
os.makedirs(target_audio_dir, exist_ok=True)
COUNT_JSON_PATH = "/home/user/app/count.json"
COUNT_JSON_REPO_PATH = "submissions/count.json" # Output directory (Huggingface dataset directory)
# Copy recordings to the working directory
local_audio_paths = []
for item in dataset:
src_path = item["audio"]["path"]
if src_path and os.path.exists(src_path):
filename = os.path.basename(src_path)
dst_path = os.path.join(target_audio_dir, filename)
if not os.path.exists(dst_path):
shutil.copy(src_path, dst_path)
local_audio_paths.append(dst_path)
all_data_audio_paths = local_audio_paths
# Take first file of the datasets as sample
sample1_audio_path = local_audio_paths[0]
print(sample1_audio_path)
# ==============================================================================
# Data Definition
# ==============================================================================
DIMENSIONS_DATA = [
{
"title": "Semantic and Pragmatic Features",
"audio": sample1_audio_path,
"sub_dims": [
"Memory Consistency: Human-like: Consistent memory in short contexts, and asks for clarification when memory deviations occur; Machine-like: Inconsistent memory across contexts and unable to detect or correct errors (e.g., forgetting key information and insisting on incorrect answers)",
"Logical Coherence: Human-like: Natural and smooth logic; Machine-like: Abrupt logical transitions or self-contradictions (e.g., suddenly changing topics without transition)",
"Pronunciation Accuracy: Human-like: Correct and natural pronunciation of words, with proper usage of polyphonic characters based on context; Machine-like: Unnatural pronunciation errors, mispronunciation of common polyphonic characters",
"Multilingual Mixing: Human-like: Multilingual mixing is often context-dependent (e.g., proper nouns, idiomatic expressions), with awkward or unnatural language switching; Machine-like: Rigid multilingual mixing without logical language switching",
"Imprecision in Language: Human-like: Uses vague expressions like 'more or less', 'probably', and may self-correct (e.g., 'no, no'); Machine-like: Rarely uses vague expressions, responses are precise and affirmative",
"Use of Fillers: Human-like: Frequently uses fillers (e.g., 'um', 'like') while thinking; Machine-like: Rare use of fillers or unnatural usage",
"Metaphor and Pragmatic Intent: Human-like: Uses metaphor, irony, and euphemism to convey layered meanings; Machine-like: Literal and direct, lacking semantic diversity, only capable of surface-level interpretation"
],
"reference_scores": [5, 5, 5, 0, 5, 5, 0]
},
{
"title": "Non-Physiological Paralinguistic Features",
"audio": sample1_audio_path,
"sub_dims": [
"Rhythm: Human-like: Speaking rate varies with semantic flow, occasional pauses or hesitations; Machine-like: Almost no pauses or mechanical pauses",
"Intonation: Human-like: Natural pitch rise or fall when expressing questions, surprise, or emphasis; Machine-like: Monotonous or overly regular pitch changes, inappropriate to the context",
"Stress: Human-like: Consciously emphasizes key words to highlight focus; Machine-like: No emphasis on words or abnormal emphasis placement",
"Auxiliary Vocalizations: Human-like: Produces context-appropriate non-verbal sounds, such as laughter or sighs; Machine-like: Contextually incorrect or mechanical auxiliary sounds, or completely absent"
],
"reference_scores": [5, 5, 5, 5]
},
{
"title": "Physiological Paralinguistic Features",
"audio": sample1_audio_path,
"sub_dims": [
"Micro-physiological Noise: Human-like: Presence of breathing sounds, saliva sounds, bubble noise, etc., naturally occurring during speech; Machine-like: Speech is overly clean or emits unnatural noises (e.g., electrical static)",
"Instability in Pronunciation: Human-like: Some irregularities in pronunciation (e.g., liaison, tremolo, slurred speech, nasal sounds); Machine-like: Pronunciation is overly clear and regular",
"Accent: Human-like: Natural regional accent or vocal traits; Machine-like: Stiff or unnatural accent"
],
"reference_scores": [5, 4, 4]
},
{
"title": "Mechanical Persona",
"audio": sample1_audio_path,
"sub_dims": [
"Sycophancy: Human-like: Judges whether to agree with requests or opinions based on context, doesn't always agree or echo; Machine-like: Frequently agrees, thanks, apologizes, excessively aligns with the other’s opinion, lacking genuine interaction",
"Written-style Expression: Human-like: Conversational, flexible, and varied expression; Machine-like: Responses are well-structured and formal, overly formal wording, frequent listing, and vague word choice"
],
"reference_scores": [5, 5]
},
{
"title": "Emotional Expression",
"audio": sample1_audio_path,
"sub_dims": [
"Semantic Level: Human-like: Displays human-like emotional responses to contexts such as sadness or joy; Machine-like: Fails to respond emotionally to the other’s feelings, or uses vague and context-inappropriate emotional language",
"Acoustic Level: Human-like: Pitch, volume, and rhythm dynamically change with emotion; Machine-like: Emotional tone is patterned or context-inappropriate"
],
"reference_scores": [5, 5]
}
]
DIMENSION_TITLES = [d["title"] for d in DIMENSIONS_DATA]
SPECIAL_KEYWORDS = ["Multilingual Mixing", "Metaphor and Pragmatic Intent", "Auxiliary Vocalizations", "Accent"]
MAX_SUB_DIMS = max(len(d['sub_dims']) for d in DIMENSIONS_DATA)
THE_SUB_DIMS = [d['sub_dims'] for d in DIMENSIONS_DATA]
# ==============================================================================
# Backend Function Definitions
# ==============================================================================
# This version did not place file reading into filelock, concurrent read could happen
"""def load_or_initialize_count_json(audio_paths):
try:
# Only try downloading if file doesn't exist yet
if not os.path.exists(COUNT_JSON_PATH):
downloaded_path = hf_hub_download(
repo_id="intersteller2887/Turing-test-dataset",
repo_type="dataset",
filename=COUNT_JSON_REPO_PATH,
token=os.getenv("HF_TOKEN")
)
# Save it as COUNT_JSON_PATH so that the lock logic remains untouched
with open(downloaded_path, "rb") as src, open(COUNT_JSON_PATH, "wb") as dst:
dst.write(src.read())
except Exception as e:
print(f"Could not download count.json from HuggingFace dataset: {e}")
# Add filelock to /workspace/count.json
lock_path = COUNT_JSON_PATH + ".lock"
# Read of count.json will wait for 10 seconds until another thread involving releases it, and then add a lock to it
with FileLock(lock_path, timeout=10):
# If count.json exists: load into count_data
# Else initialize count_data with orderedDict
if os.path.exists(COUNT_JSON_PATH):
with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
count_data = json.load(f, object_pairs_hook=collections.OrderedDict)
else:
count_data = collections.OrderedDict()
updated = False
sample_audio_files = {os.path.basename(d["audio"]) for d in DIMENSIONS_DATA}
# Guarantee that the sample recording won't be take into the pool
# Update newly updated recordings into count.json
for path in audio_paths:
filename = os.path.basename(path)
if filename not in count_data:
if filename in sample_audio_files:
count_data[filename] = 999
else:
count_data[filename] = 0
updated = True
if updated or not os.path.exists(COUNT_JSON_PATH):
with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f:
json.dump(count_data, f, indent=4, ensure_ascii=False)
return count_data"""
# Function that load or initialize count.json
# Function is called when user start a challenge, and this will load or initialize count.json to working directory
# Initialize happens when count.json does not exist in the working directory as well as HuggingFace dataset
# Load happens when count.json exists in HuggingFace dataset, and it's not loaded to the working directory yet
# After load/initialize, all newly added audio files will be added to count.json with initial value of 0
# Load/Initialize will generate count.json in the working directory for all users under this space
# This version also places file reading into filelock, and modified
def load_or_initialize_count_json(audio_paths):
# Add filelock to /workspace/count.json
lock_path = COUNT_JSON_PATH + ".lock"
with FileLock(lock_path, timeout=10):
# If count.json does not exist in the working directory, try to download it from HuggingFace dataset
if not os.path.exists(COUNT_JSON_PATH):
try:
# Save latest count.json to working directory
downloaded_path = hf_hub_download(
repo_id="intersteller2887/Turing-test-dataset-en",
repo_type="dataset",
filename=COUNT_JSON_REPO_PATH,
token=os.getenv("HF_TOKEN")
)
with open(downloaded_path, "rb") as src, open(COUNT_JSON_PATH, "wb") as dst:
dst.write(src.read())
except Exception:
pass
# If count.json exists in the working directory: load into count_data for potential update
if os.path.exists(COUNT_JSON_PATH):
with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
count_data = json.load(f, object_pairs_hook=collections.OrderedDict)
# Else initialize count_data with orderedDict
# This happens when there is no count.json (both working directory and HuggingFace dataset)
else:
count_data = collections.OrderedDict()
updated = False
sample_audio_files = {os.path.basename(d["audio"]) for d in DIMENSIONS_DATA}
# Guarantee that the sample recording won't be take into the pool
# Update newly updated recordings into count.json
for path in audio_paths:
filename = os.path.basename(path)
if filename not in count_data:
if filename in sample_audio_files:
count_data[filename] = 999
else:
count_data[filename] = 0
updated = True
# Write updated count_data to /home/user/app/count.json
if updated or not os.path.exists(COUNT_JSON_PATH):
with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f:
json.dump(count_data, f, indent=4, ensure_ascii=False)
return
# Shorten the time of playing previous audio when reached next question
def append_cache_buster(audio_path):
return f"{audio_path}?t={int(time.time() * 1000)}"
# Function that samples questions from avaliable question set
# This version utilizes a given count_data to sample audio paths
"""def sample_audio_paths(audio_paths, count_data, k=5, max_count=1): # k for questions per test; max_count for question limit in total
eligible_paths = [p for p in audio_paths if count_data.get(os.path.basename(p), 0) < max_count]
if len(eligible_paths) < k:
raise ValueError(f"可用音频数量不足(只剩 {len(eligible_paths)} 条 count<{max_count} 的音频),无法抽取 {k} 条")
# Shuffule to avoid fixed selections resulted from directory structure
selected = random.sample(eligible_paths, k)
# Once sampled a test, update these questions immediately
for path in selected:
filename = os.path.basename(path)
count_data[filename] = count_data.get(filename, 0) + 1
# Add filelock to /workspace/count.json
lock_path = COUNT_JSON_PATH + ".lock"
with FileLock(lock_path, timeout=10):
with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f:
json.dump(count_data, f, indent=4, ensure_ascii=False)
return selected, count_data"""
# This version places file reading into filelock to guarantee correct update of count.json
def sample_audio_paths(audio_paths, k=5, max_count=1):
# Add filelock to /workspace/count.json
lock_path = COUNT_JSON_PATH + ".lock"
# Load newest count.json
with FileLock(lock_path, timeout=10):
with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
count_data = json.load(f)
eligible_paths = [
p for p in audio_paths
if count_data.get(os.path.basename(p), 0) < max_count
]
if len(eligible_paths) < k:
raise ValueError(f"可用音频数量不足(只剩 {len(eligible_paths)} 条 count<{max_count} 的音频),无法抽取 {k} 条")
selected = random.sample(eligible_paths, k)
# Update count_data
for path in selected:
filename = os.path.basename(path)
count_data[filename] = count_data.get(filename, 0) + 1
# Update count.json
with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f:
json.dump(count_data, f, indent=4, ensure_ascii=False)
# return selected, count_data
# Keep count_data atomic
return selected
# ==============================================================================
# Frontend Function Definitions
# ==============================================================================
# Save question_set in each user_data_state, preventing global sharing
def start_challenge(user_data_state):
load_or_initialize_count_json(all_data_audio_paths)
# selected_audio_paths, updated_count_data = sample_audio_paths(all_data_audio_paths, k=5)
# Keep count_data atomic
selected_audio_paths = sample_audio_paths(all_data_audio_paths, k=5)
question_set = [
{"audio": path, "desc": f"这是音频文件 {os.path.basename(path)} 的描述"}
for path in selected_audio_paths
]
user_data_state["question_set"] = question_set
# count_data is not needed in the user data
# user_data_state["updated_count_data"] = updated_count_data
return gr.update(visible=False), gr.update(visible=True), user_data_state
# This function toggles the visibility of the "其他(请注明)" input field based on the selected education choice
def toggle_education_other(choice):
is_other = (choice == "其他(请注明)")
return gr.update(visible=is_other, interactive=is_other, value="")
# This function checks if the user information is complete
def check_info_complete(username, age, gender, education, education_other, ai_experience):
if username.strip() and age and gender and education and ai_experience:
if education == "其他(请注明)" and not education_other.strip():
return gr.update(interactive=False)
return gr.update(interactive=True)
return gr.update(interactive=False)
# This function updates user_data and initializes the sample page (called when user submits their info)
def show_sample_page_and_init(username, age, gender, education, education_other, ai_experience, user_data):
final_edu = education_other if education == "其他(请注明)" else education
user_data.update({
"username": username.strip(),
"age": age,
"gender": gender,
"education": final_edu,
"ai_experience": ai_experience
})
first_dim_title = DIMENSION_TITLES[0]
initial_updates = update_sample_view(first_dim_title)
return [
gr.update(visible=False), gr.update(visible=True), user_data, first_dim_title
] + initial_updates
def update_sample_view(dimension_title):
dim_data = next((d for d in DIMENSIONS_DATA if d["title"] == dimension_title), None)
if dim_data:
audio_up = gr.update(value=dim_data["audio"])
# audio_up = gr.update(value=append_cache_buster(dim_data["audio"]))
interactive_view_up = gr.update(visible=True)
reference_view_up = gr.update(visible=False)
reference_btn_up = gr.update(value="参考")
sample_slider_ups = []
ref_slider_ups = []
scores = dim_data.get("reference_scores", [])
for i in range(MAX_SUB_DIMS):
if i < len(dim_data['sub_dims']):
label = dim_data['sub_dims'][i]
score = scores[i] if i < len(scores) else 0
sample_slider_ups.append(gr.update(visible=True, label=label, value=0))
ref_slider_ups.append(gr.update(visible=True, label=label, value=score))
else:
sample_slider_ups.append(gr.update(visible=False, value=0))
ref_slider_ups.append(gr.update(visible=False, value=0))
return [audio_up, interactive_view_up, reference_view_up, reference_btn_up] + sample_slider_ups + ref_slider_ups
empty_updates = [gr.update()] * 4
slider_empty_updates = [gr.update()] * (MAX_SUB_DIMS * 2)
return empty_updates + slider_empty_updates
def update_test_dimension_view(d_idx, selections):
# dimension = DIMENSIONS_DATA[d_idx]
slider_updates = []
dim_data = DIMENSIONS_DATA[d_idx]
sub_dims = dim_data["sub_dims"]
dim_title = dim_data["title"]
existing_scores = selections.get(dim_data['title'], {})
progress_d = f"Dimension {d_idx + 1} / {len(DIMENSIONS_DATA)}: **{dim_data['title']}**"
for i in range(MAX_SUB_DIMS):
if i < len(sub_dims):
desc = sub_dims[i]
# print(f"{desc} -> default value: {existing_scores.get(desc, 0)}")
name = desc.split(":")[0].strip()
default_value = 0 if name in SPECIAL_KEYWORDS else 1
value = existing_scores.get(desc, default_value)
slider_updates.append(gr.update(
visible=True,
label=desc,
minimum=default_value,
maximum=5,
step=1,
value=value,
interactive=True,
))
# slider_updates.append(gr.update(
# visible=True,
# label=desc,
# minimum=0 if name in SPECIAL_KEYWORDS else 1,
# maximum=5,
# value = existing_scores.get(desc, 0),
# interactive=True,
# ))
else:
slider_updates.append(gr.update(visible=False))
print(f"{desc} -> default value: {existing_scores.get(desc, 0)}")
# for i in range(MAX_SUB_DIMS):
# if i < len(dimension['sub_dims']):
# sub_dim_label = dimension['sub_dims'][i]
# value = existing_scores.get(sub_dim_label, 0)
# slider_updates.append(gr.update(visible=True, label=sub_dim_label, value=value))
# else:
# slider_updates.append(gr.update(visible=False, value=0))
prev_btn_update = gr.update(interactive=(d_idx > 0))
next_btn_update = gr.update(
value="Proceed to Final Judgement" if d_idx == len(DIMENSIONS_DATA) - 1 else "Next Dimension",
interactive=True
)
return [gr.update(value=progress_d), prev_btn_update, next_btn_update] + slider_updates
def init_test_question(user_data, q_idx):
d_idx = 0
question = user_data["question_set"][q_idx]
progress_q = f"第 {q_idx + 1} / {len(user_data['question_set'])} 题"
initial_updates = update_test_dimension_view(d_idx, {})
dim_title_update, prev_btn_update, next_btn_update = initial_updates[:3]
slider_updates = initial_updates[3:]
return (
gr.update(visible=False),
gr.update(visible=True),
gr.update(visible=False),
gr.update(visible=False),
q_idx, d_idx, {},
gr.update(value=progress_q),
dim_title_update,
gr.update(value=question['audio']),
# gr.update(value=append_cache_buster(question['audio'])),
prev_btn_update,
next_btn_update,
gr.update(value=None), # BUG FIX: Changed from "" to None to correctly clear the radio button
gr.update(interactive=False),
) + tuple(slider_updates)
def navigate_dimensions(direction, q_idx, d_idx, selections, *slider_values):
current_dim_data = DIMENSIONS_DATA[d_idx]
current_sub_dims = current_dim_data['sub_dims']
scores = {sub_dim: slider_values[i] for i, sub_dim in enumerate(current_sub_dims)}
selections[current_dim_data['title']] = scores
new_d_idx = d_idx + (1 if direction == "next" else -1)
if direction == "next" and d_idx == len(DIMENSIONS_DATA) - 1:
return (
gr.update(visible=False),
gr.update(visible=True),
q_idx, new_d_idx, selections,
gr.update(),
gr.update(),
gr.update(),
gr.update(interactive=True),
gr.update(interactive=False),
gr.update(interactive=False),
gr.update(interactive=False),
) + (gr.update(),) * MAX_SUB_DIMS
else:
view_updates = update_test_dimension_view(new_d_idx, selections)
dim_title_update, prev_btn_update, next_btn_update = view_updates[:3]
slider_updates = view_updates[3:]
return (
gr.update(), gr.update(),
q_idx, new_d_idx, selections,
gr.update(),
dim_title_update,
gr.update(),
gr.update(),
gr.update(),
prev_btn_update,
next_btn_update,
) + tuple(slider_updates)
def toggle_reference_view(current):
if current == "参考":
return gr.update(visible=False), gr.update(visible=True), gr.update(value="返回")
else:
return gr.update(visible=True), gr.update(visible=False), gr.update(value="参考")
def back_to_welcome():
return (
gr.update(visible=True), # welcome_page
gr.update(visible=False), # info_page
gr.update(visible=False), # sample_page
gr.update(visible=False), # pretest_page
gr.update(visible=False), # test_page
gr.update(visible=False), # final_judgment_page
gr.update(visible=False), # result_page
{}, # user_data_state
0, # current_question_index
0, # current_test_dimension_index
{}, # current_question_selections
[] # test_results
)
# ==============================================================================
# Retry Function Definitions
# ==============================================================================
# Decorator function that allows to use ThreadPoolExecutor to retry a function with timeout
def retry_with_timeout(max_retries=3, timeout=10, backoff=1):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries):
try:
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(func, *args, **kwargs)
try:
result = future.result(timeout=timeout)
return result
except FutureTimeoutError:
future.cancel()
raise TimeoutError(f"Operation timed out after {timeout} seconds")
except Exception as e:
last_exception = e
print(f"Attempt {attempt + 1} failed: {str(e)}")
if attempt < max_retries - 1:
time.sleep(backoff * (attempt + 1))
print(f"All {max_retries} attempts failed")
if last_exception:
raise last_exception
raise Exception("Unknown error occurred")
return wrapper
return decorator
def save_with_retry(all_results, user_data):
# 尝试上传到Hugging Face Hub
try:
# 使用线程安全的保存方式
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(save_all_results_to_file, all_results, user_data)
try:
future.result(timeout=30) # 设置30秒超时
return True
except FutureTimeoutError:
future.cancel()
print("上传超时")
return False
except Exception as e:
print(f"上传到Hub失败: {e}")
return False
def save_locally_with_retry(data, filename, max_retries=3):
for attempt in range(max_retries):
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
return True
except Exception as e:
print(f"本地保存尝试 {attempt + 1} 失败: {e}")
if attempt < max_retries - 1:
time.sleep(1)
return False
def update_count_with_retry(count_data, question_set, max_retries=3):
for attempt in range(max_retries):
try:
lock_path = COUNT_JSON_PATH + ".lock"
with FileLock(lock_path, timeout=10):
# Remove unfinished question(s) from count.json
for question in question_set:
filename = os.path.basename(question['audio'])
if filename in count_data and count_data[filename] < 1:
count_data[filename] = 0 # Mark unfinished data as 0
with open(COUNT_JSON_PATH, 'w', encoding='utf-8') as f:
json.dump(count_data, f, indent=4, ensure_ascii=False)
return True
except Exception as e:
print(f"Fail to update count.json {e} for {attempt + 1} time")
if attempt < max_retries - 1:
time.sleep(1)
return False
# ==============================================================================
# Previous version of submit_question_and_advance
"""def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data):
# selections["final_choice"] = final_choice
cleaned_selections = {}
for dim_title, sub_scores in selections.items():
# if dim_title == "final_choice": # 去掉if判断
cleaned_selections["final_choice"] = final_choice
# continue
cleaned_sub_scores = {}
for sub_dim, score in sub_scores.items():
cleaned_sub_scores[sub_dim] = None if score == 0 else score
cleaned_selections[dim_title] = cleaned_sub_scores
final_question_result = {
"question_id": q_idx,
"audio_file": user_data["question_set"][q_idx]['audio'],
"selections": cleaned_selections
}
all_results.append(final_question_result)
q_idx += 1
# If q_idx hasn't reached the last one
if q_idx < len(user_data["question_set"]):
init_q_updates = init_test_question(user_data, q_idx) # Case 1: jam happens when initialize next question
return init_q_updates + (all_results, gr.update(value=""))
# If q_idx has reached the last one
else:
result_str = "### 测试全部完成!\n\n你的提交结果概览:\n"
for res in all_results:
# result_str += f"\n#### 题目: {res['audio_file']}\n"
result_str += f"##### 最终判断: **{res['selections'].get('final_choice', '未选择')}**\n"
for dim_title, dim_data in res['selections'].items():
if dim_title == 'final_choice': continue
result_str += f"- **{dim_title}**:\n"
for sub_dim, score in dim_data.items():
result_str += f" - *{sub_dim[:20]}...*: {score}/5\n"
# save_all_results_to_file(all_results, user_data)
# save_all_results_to_file(all_results, user_data, count_data=updated_count_data)
save_all_results_to_file(all_results, user_data, count_data=user_data.get("updated_count_data"))
return (
gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
q_idx, d_idx, {},
gr.update(), gr.update(), gr.update(), gr.update(), gr.update(),
gr.update(), gr.update(),
) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str)"""
# user_data now no further contain "updated_count_data", which should be read/write with filelock and be directly accessed from working directory
def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data):
try:
# 准备数据
cleaned_selections = {}
for dim_title, sub_scores in selections.items():
cleaned_selections["final_choice"] = final_choice
cleaned_sub_scores = {}
for sub_dim, score in sub_scores.items():
cleaned_sub_scores[sub_dim] = None if score == 0 else score
cleaned_selections[dim_title] = cleaned_sub_scores
final_question_result = {
"question_id": q_idx,
"audio_file": user_data["question_set"][q_idx]['audio'],
"selections": cleaned_selections
}
all_results.append(final_question_result)
q_idx += 1
if q_idx < len(user_data["question_set"]):
init_q_updates = init_test_question(user_data, q_idx)
return init_q_updates + (all_results, gr.update(value=""))
else:
# 准备完整结果数据
result_str = "### 测试全部完成!\n\n你的提交结果概览:\n"
for res in all_results:
result_str += f"##### 最终判断: **{res['selections'].get('final_choice', '未选择')}**\n"
for dim_title, dim_data in res['selections'].items():
if dim_title == 'final_choice': continue
result_str += f"- **{dim_title}**:\n"
for sub_dim, score in dim_data.items():
result_str += f" - *{sub_dim[:20]}...*: {score}/5\n"
# 尝试上传(带重试)
try:
# success = save_with_retry(all_results, user_data, user_data.get("updated_count_data"))
success = save_with_retry(all_results, user_data)
except Exception as e:
print(f"上传过程中发生错误: {e}")
success = False
if not success:
# 上传失败,保存到本地
username = user_data.get("username", "anonymous")
timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
local_filename = f"submission_{username}_{timestamp}.json"
# 准备数据包
user_info_clean = {
k: v for k, v in user_data.items() if k not in ["question_set"]
}
final_data_package = {
"user_info": user_info_clean,
"results": all_results
}
# 尝试保存到本地
local_success = save_locally_with_retry(final_data_package, local_filename)
if local_success:
result_str += f"\n\n⚠️ 上传失败,结果已保存到本地文件: {local_filename}"
else:
result_str += "\n\n❌ 上传失败且无法保存到本地文件,请联系管理员"
# 更新count.json(剔除未完成的题目)
try:
with FileLock(COUNT_JSON_PATH + ".lock", timeout=5):
with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
count_data = json.load(f, object_pairs_hook=collections.OrderedDict)
count_update_success = update_count_with_retry(count_data, user_data["question_set"])
except Exception as e:
print(f"更新count.json失败: {e}")
count_update_success = False
if not count_update_success:
result_str += "\n\n⚠️ 无法更新题目计数,请联系管理员"
return (
gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True),
q_idx, d_idx, {},
gr.update(), gr.update(), gr.update(), gr.update(), gr.update(),
gr.update(), gr.update(),
) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str)
except Exception as e:
print(f"提交过程中发生错误: {e}")
# 返回错误信息
error_msg = f"提交过程中发生错误: {str(e)}"
return (
gr.update(), gr.update(), gr.update(), gr.update(),
q_idx, d_idx, selections,
gr.update(), gr.update(), gr.update(), gr.update(), gr.update(),
gr.update(), gr.update(),
) + (gr.update(),) * MAX_SUB_DIMS + (all_results, error_msg)
"""def save_all_results_to_file(all_results, user_data, count_data=None):
repo_id = "intersteller2887/Turing-test-dataset"
username = user_data.get("username", "user")
timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
submission_filename = f"submissions_{username}_{timestamp}.json"
user_info_clean = {
k: v for k, v in user_data.items() if k not in ["question_set", "updated_count_data"]
}
final_data_package = {
"user_info": user_info_clean,
"results": all_results
}
json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4)
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
print("HF_TOKEN not found. Cannot upload to the Hub.")
return
try:
api = HfApi()
# Upload submission file
api.upload_file(
path_or_fileobj=bytes(json_string, "utf-8"),
path_in_repo=f"submissions/{submission_filename}",
repo_id=repo_id,
repo_type="dataset",
token=hf_token,
commit_message=f"Add new submission from {username}"
)
print(f"上传成功: {submission_filename}")
if count_data:
with FileLock(COUNT_JSON_PATH + ".lock", timeout=10):
with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f:
json.dump(count_data, f, indent=4, ensure_ascii=False)
api.upload_file(
path_or_fileobj=COUNT_JSON_PATH,
path_in_repo=COUNT_JSON_REPO_PATH,
repo_id=repo_id,
repo_type="dataset",
token=hf_token,
commit_message=f"Update count.json after submission by {username}"
)
except Exception as e:
print(f"上传出错: {e}")"""
def save_all_results_to_file(all_results, user_data):
repo_id = "intersteller2887/Turing-test-dataset-en"
username = user_data.get("username", "user")
timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
submission_filename = f"submissions_{username}_{timestamp}.json"
user_info_clean = {
k: v for k, v in user_data.items() if k not in ["question_set"]
}
final_data_package = {
"user_info": user_info_clean,
"results": all_results
}
json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4)
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
raise Exception("HF_TOKEN not found. Cannot upload to the Hub.")
api = HfApi()
# 上传提交文件(不再使用装饰器,直接调用)
api.upload_file(
path_or_fileobj=bytes(json_string, "utf-8"),
path_in_repo=f"submissions/{submission_filename}",
repo_id=repo_id,
repo_type="dataset",
token=hf_token,
commit_message=f"Add new submission from {username}"
)
try:
with FileLock(COUNT_JSON_PATH + ".lock", timeout=5):
with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f:
count_data_str = f.read()
api.upload_file(
path_or_fileobj=bytes(count_data_str, "utf-8"),
path_in_repo=COUNT_JSON_REPO_PATH,
repo_id=repo_id,
repo_type="dataset",
token=hf_token,
commit_message=f"Update count.json after submission by {username}"
)
except Exception as e:
print(f"上传 count.json 失败: {e}")
# ==============================================================================
# Gradio 界面定义 (Gradio UI Definition)
# ==============================================================================
with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {max-width: 960px !important}") as demo:
user_data_state = gr.State({})
current_question_index = gr.State(0)
current_test_dimension_index = gr.State(0)
current_question_selections = gr.State({})
test_results = gr.State([])
welcome_page = gr.Column(visible=True)
info_page = gr.Column(visible=False)
sample_page = gr.Column(visible=False)
pretest_page = gr.Column(visible=False)
test_page = gr.Column(visible=False)
final_judgment_page = gr.Column(visible=False)
result_page = gr.Column(visible=False)
pages = {
"welcome": welcome_page, "info": info_page, "sample": sample_page,
"pretest": pretest_page, "test": test_page, "final_judgment": final_judgment_page,
"result": result_page
}
with welcome_page:
gr.Markdown("# AI 识破者\n你将听到一系列对话,请判断哪个回应者是 AI。")
start_btn = gr.Button("开始挑战", variant="primary")
with info_page:
gr.Markdown("## 请提供一些基本信息")
username_input = gr.Textbox(label="用户名", placeholder="请输入你的昵称")
age_input = gr.Radio(["18岁以下", "18-25岁", "26-35岁", "36-50岁", "50岁以上"], label="年龄")
gender_input = gr.Radio(["男", "女", "其他"], label="性别")
education_input = gr.Radio(["高中及以下", "本科", "硕士", "博士", "其他"], label="学历")
education_other_input = gr.Textbox(label="请填写你的学历", visible=False, interactive=False)
ai_experience_input = gr.Radio(["从未使用过", "偶尔接触(如看别人用)", "使用过几次,了解基本功能", "经常使用,有一定操作经验", "非常熟悉,深入使用过多个 AI 工具"], label="对 AI 工具的熟悉程度")
submit_info_btn = gr.Button("提交并开始学习样例", variant="primary", interactive=False)
with sample_page:
gr.Markdown("## 样例分析\n请选择一个维度进行学习和打分练习。所有维度共用同一个样例音频。")
sample_dimension_selector = gr.Radio(DIMENSION_TITLES, label="选择学习维度", value=DIMENSION_TITLES[0])
with gr.Row():
with gr.Column(scale=1):
sample_audio = gr.Audio(label="样例音频", value=DIMENSIONS_DATA[0]["audio"])
with gr.Column(scale=2):
with gr.Column(visible=True) as interactive_view:
gr.Markdown("#### 请为以下特征打分 (0-5分。0-特征无体现;1-机器;3-特征无偏向;5-人类)")
sample_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True) for i in range(MAX_SUB_DIMS)]
with gr.Column(visible=False) as reference_view:
gr.Markdown("### 参考答案解析")
reference_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=False) for i in range(MAX_SUB_DIMS)]
with gr.Row():
reference_btn = gr.Button("参考")
go_to_pretest_btn = gr.Button("我明白了,开始测试", variant="primary")
with pretest_page:
gr.Markdown("## 测试说明\n"
"- 对于每一道题,你都需要对全部 **5 个维度** 进行评估。\n"
"- 在每个维度下,请为出现的每个特征 **从0到5打分**。\n"
"- **评分解释如下:**\n"
" - **0 分:特征未体现** (有些特征一定会体现,所以按1到5打分);\n"
" - **1 分:极度符合机器特征**;\n"
" - **2 分:较为符合机器特征**;\n"
" - **3 分:无明显人类或机器倾向**;\n"
" - **4 分:较为符合人类特征**;\n"
" - **5 分:极度符合人类特征**。\n"
"- 完成所有维度后,请根据整体印象对回应方的身份做出做出“人类”或“机器人”的 **最终判断**。\n"
"- 你可以使用“上一维度”和“下一维度”按钮在5个维度间自由切换和修改分数。\n"
"## 特别注意\n"
"- 我们希望您能判断每个维度上**回应者**的表现是**偏向人还是机器**,分数的大小反映回应者的语音类人的程度,而**不是**这个维度体现的程度多少\n(如读音正确也不代表是人类,读音错误也不代表是机器,您应当判断的是“听到的发音更偏向机器还是人类”)\n"
"- 即使您一开始就已经很肯定回应方的身份,同样应当**独立地**对每个维度上回应方的表现进行细致的评判。比如您很肯定回应方是机器,也需要独立地对每个维度判断,而非简单地将每个维度归为偏机器。")
go_to_test_btn = gr.Button("开始测试", variant="primary")
with test_page:
gr.Markdown("## 正式测试")
question_progress_text = gr.Markdown()
test_dimension_title = gr.Markdown()
test_audio = gr.Audio(label="测试音频")
gr.Markdown("--- \n ### 请为对话中的回应者(非发起者)针对以下特征打分 (0-5分。0-特征无体现;1-机器;3-特征无偏向;5-人类)")
test_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True, show_label = True) for i in range(MAX_SUB_DIMS)]
with gr.Row():
prev_dim_btn = gr.Button("上一维度")
next_dim_btn = gr.Button("下一维度", variant="primary")
with final_judgment_page:
gr.Markdown("## 最终判断")
gr.Markdown("您已完成对所有维度的评分。请根据您的综合印象,做出最终判断。")
final_human_robot_radio = gr.Radio(["👤 人类", "🤖 机器人"], label="请判断回应者类型 (必填)")
submit_final_answer_btn = gr.Button("提交本题答案", variant="primary", interactive=False)
with result_page:
gr.Markdown("## 测试完成")
result_text = gr.Markdown()
back_to_welcome_btn = gr.Button("返回主界面", variant="primary")
# ==============================================================================
# 事件绑定 (Event Binding) & IO 列表定义
# ==============================================================================
sample_init_outputs = [
info_page, sample_page, user_data_state, sample_dimension_selector,
sample_audio, interactive_view, reference_view, reference_btn
] + sample_sliders + reference_sliders
test_init_outputs = [
pretest_page, test_page, final_judgment_page, result_page,
current_question_index, current_test_dimension_index, current_question_selections,
question_progress_text, test_dimension_title, test_audio,
prev_dim_btn, next_dim_btn,
final_human_robot_radio, submit_final_answer_btn,
] + test_sliders
nav_inputs = [current_question_index, current_test_dimension_index, current_question_selections] + test_sliders
nav_outputs = [
test_page, final_judgment_page,
current_question_index, current_test_dimension_index, current_question_selections,
question_progress_text, test_dimension_title, test_audio,
final_human_robot_radio, submit_final_answer_btn,
prev_dim_btn, next_dim_btn,
] + test_sliders
full_outputs_with_results = test_init_outputs + [test_results, result_text]
# start_btn.click(fn=start_challenge, outputs=[welcome_page, info_page])
start_btn.click(
fn=start_challenge,
inputs=[user_data_state],
outputs=[welcome_page, info_page, user_data_state]
)
for comp in [age_input, gender_input, education_input, education_other_input, ai_experience_input]:
comp.change(
fn=check_info_complete,
inputs=[username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input],
outputs=submit_info_btn
)
education_input.change(fn=toggle_education_other, inputs=education_input, outputs=education_other_input)
submit_info_btn.click(
fn=show_sample_page_and_init,
inputs=[username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input, user_data_state],
outputs=sample_init_outputs
)
sample_dimension_selector.change(
fn=update_sample_view,
inputs=sample_dimension_selector,
outputs=[sample_audio, interactive_view, reference_view, reference_btn] + sample_sliders + reference_sliders
)
reference_btn.click(
fn=toggle_reference_view,
inputs=reference_btn,
outputs=[interactive_view, reference_view, reference_btn]
)
go_to_pretest_btn.click(lambda: (gr.update(visible=False), gr.update(visible=True)), outputs=[sample_page, pretest_page])
go_to_test_btn.click(
fn=lambda user: init_test_question(user, 0) + ([], gr.update()),
inputs=[user_data_state],
outputs=full_outputs_with_results
)
prev_dim_btn.click(
fn=lambda q,d,s, *sliders: navigate_dimensions("prev", q,d,s, *sliders),
inputs=nav_inputs, outputs=nav_outputs
)
next_dim_btn.click(
fn=lambda q,d,s, *sliders: navigate_dimensions("next", q,d,s, *sliders),
inputs=nav_inputs, outputs=nav_outputs
)
final_human_robot_radio.change(
fn=lambda choice: gr.update(interactive=bool(choice)),
inputs=final_human_robot_radio,
outputs=submit_final_answer_btn
)
submit_final_answer_btn.click(
fn=submit_question_and_advance,
inputs=[current_question_index, current_test_dimension_index, current_question_selections, final_human_robot_radio, test_results, user_data_state],
outputs=full_outputs_with_results
)
back_to_welcome_btn.click(fn=back_to_welcome, outputs=list(pages.values()) + [user_data_state, current_question_index, current_test_dimension_index, current_question_selections, test_results])
# ==============================================================================
# 程序入口 (Entry Point)
# ==============================================================================
if __name__ == "__main__":
if not os.path.exists("audio"):
os.makedirs("audio")
if "SPACE_ID" in os.environ:
print("Running in a Hugging Face Space, checking for audio files...")
# all_files = [q["audio"] for q in QUESTION_SET] + [d["audio"] for d in DIMENSIONS_DATA]
all_files = [d["audio"] for d in DIMENSIONS_DATA]
for audio_file in set(all_files):
if not os.path.exists(audio_file):
print(f"⚠️ Warning: Audio file not found: {audio_file}")
demo.launch(debug=True)