# BosonAI_Hackathon/tools/step030_translation.py
# -*- coding: utf-8 -*-
"""
Step030 — Translation pipeline (robust + language-aware + enforcement) [streamlined]
Goal: Proper translation + storage to JSON quickly, without breaking existing usage.
Key tweaks:
- Early skip for non-speech tokens (e.g., "[LAUGHTER]") to avoid wasted calls.
- Normalized de-dup (spacing/case) so repeated lines translate once.
- Optional FAST mode to prefer MT path automatically (env toggle; default off).
- Parallel MT path preserved; safer caching; tighter sleeps/backoff.
- Stricter "absolute translation" enforcement (rejects same-language paraphrases) with smart relaxations.
- Progressive validation (strict → relaxed) + faster MT fallback.
- Atomic writes for JSON outputs.
- NEW: Strip <t>...</t> wrappers from all final outputs (no <t> in translation.json).
Public APIs preserved:
summarize(...)
translate(...)
translate_all_transcript_under_folder(...)
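Usage (a minimal sketch; the folder path below is a hypothetical example):
    from tools.step030_translation import translate_all_transcript_under_folder
    msg, summary, translation = translate_all_transcript_under_folder(
        'videos', method='LLM', target_language='简体中文')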
"""
from __future__ import annotations
import json
import os
import re
import time
import string
from typing import List, Dict, Tuple, Any, Optional
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor, as_completed
from dotenv import load_dotenv
from loguru import logger
# Backends (keep your existing modules/paths)
from tools.step032_translation_llm import llm_response
from tools.step033_translation_translator import translator_response
load_dotenv()
# ============================================================
# Tunables (perf+behavior knobs; defaults conservative)
# ============================================================
ENABLE_BACKTRANSLATE_VERIFY = os.getenv("TRANSLATION_BACKTRANSLATE_VERIFY", "0") == "1"
ENABLE_DEDUP_SAME_LINES = os.getenv("TRANSLATION_DEDUP", "1") == "1"
MT_MAX_WORKERS = max(1, int(os.getenv("TRANSLATION_MT_MAX_WORKERS", "4"))) # only used on MT path
RETRY_SLEEP_S = float(os.getenv("TRANSLATION_RETRY_SLEEP", "0.2"))
SMALL_SLEEP_S = float(os.getenv("TRANSLATION_SMALL_SLEEP", "0.03"))
LLM_MAX_RETRIES = max(1, int(os.getenv("TRANSLATION_LLM_MAX_RETRIES", "3"))) # default 3
LLM_HISTORY_WINDOW = max(12, int(os.getenv("TRANSLATION_LLM_HISTORY_WINDOW", "14")))
SUMMARY_TEXT_LIMIT = max(800, int(os.getenv("TRANSLATION_SUMMARY_TEXT_LIMIT", "1600")))
FAST_TRANSLATION_MODE = os.getenv("TRANSLATION_FAST_MODE", "0") == "1" # prefer MT path automatically
# Non-speech pattern (skip heavy translation path)
_NON_SPEECH = re.compile(
r'^\s*\[(?:music|applause|laughter|silent|silence|noise|beat|pause|inaudible|coughs|cough|breath|breathing)[^\]]*\]\s*$',
re.I
)
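# e.g. "[Laughter]" or "[music fades]" match and are skipped; a mixed line like
# "[music] Hello there" does NOT match and still gets translated.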
# ============================================================
# Precompiled regexes
# ============================================================
_RE_FW_PARENS = re.compile(r'\([^)]*\)')
_RE_NUM_COMMA = re.compile(r'(?<=\d),(?=\d)')
_RE_JSON_FENCE = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL | re.IGNORECASE)
_RE_PREFIXES = [
re.compile(pat, re.IGNORECASE | re.DOTALL)
for pat in [
r'^\s*translated\s*text\s*:\s*(.+)$',
r'^\s*translation\s*:\s*(.+)$',
r'^\s*译文\s*[::]\s*(.+)$',
r'^\s*翻译\s*[::]\s*(.+)$',
r'^\s*resultado\s*[::]\s*(.+)$',
r'^\s*traducci[oó]n\s*[::]\s*(.+)$',
]
]
_RE_CJK = re.compile(r'[\u3400-\u4dbf\u4e00-\u9fff\uf900-\ufaff]')
_RE_HIRA = re.compile(r'[\u3040-\u309f]')
_RE_KATA = re.compile(r'[\u30a0-\u30ff]')
_RE_HANG = re.compile(r'[\uac00-\ud7af]')
_RE_LATN = re.compile(r'[A-Za-z]')
_PUNC_TABLE = str.maketrans('', '', string.punctuation + ",。!?;:、“”‘’—…《》·")
# Strip <t>...</t> safely (NEW)
_RE_T_WRAPPER = re.compile(r'^\s*<t\s*>(.*?)</t\s*>\s*$', re.IGNORECASE | re.DOTALL)
_RE_T_TAGS = re.compile(r'</?t\s*>', re.IGNORECASE)
def _strip_t_tags(s: str) -> str:
if not s:
return s
m = _RE_T_WRAPPER.match(s)
if m:
return m.group(1).strip()
# If it's not a perfect single wrapper, remove any loose <t> / </t> occurrences
return _RE_T_TAGS.sub('', s).strip()
# For CN sentence splitting
_RE_CN_SPLIT_1 = re.compile(r'([。!?\?])([^,。!?\?”’》])')
_RE_CN_SPLIT_2 = re.compile(r'(\.{6})([^,。!?\?”’》])') # ......
_RE_CN_SPLIT_3 = re.compile(r'(…{2})([^,。!?\?”’》])') # ……
_RE_CN_SPLIT_4 = re.compile(r'([。!?\?][”’])([^ ,。!?\?”’》])')
_RE_LAT_SPLIT = re.compile(r'(?<=[.!?])\s+')
# ============================================================
# Utilities & small helpers
# ============================================================
def get_necessary_info(info: dict) -> dict:
return {
'title': info.get('title', ''),
'uploader': info.get('uploader', ''),
'description': info.get('description', ''),
'upload_date': info.get('upload_date', ''),
'tags': info.get('tags', []),
}
def ensure_transcript_length(transcript: str, max_length: int = 4000) -> str:
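    # Keep the head of the first half and the tail of the second half, so both the
    # opening and the closing context survive truncation.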
if len(transcript) <= max_length:
return transcript
mid = len(transcript) // 2
half = max_length // 2
return transcript[:mid][:half] + transcript[mid:][-half:]
def _is_chinese_target(lang: str) -> bool:
lang = (lang or "").lower()
return any(k in lang for k in ["zh", "简体", "繁体", "中文", "chinese"])
def translation_postprocess(result: str, target_language: str = "简体中文") -> str:
result = (result or "").strip()
result = _strip_t_tags(result) # ensure <t> never survives
result = _RE_FW_PARENS.sub('', result)
result = _RE_NUM_COMMA.sub('', result)
result = result.replace('²', '^2')
if _is_chinese_target(target_language):
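        # zh-specific cleanups: normalize punctuation that downstream TTS reads poorly
        # and pin common ML-term mistranslations (e.g. keep "Transformer" as the model name).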
result = (result
.replace('...', ',')
.replace('————', ':')
.replace('——', ':')
.replace('°', '度')
.replace('变压器', 'Transformer')
.replace('AI', '人工智能'))
return result
def _extract_first_json_object(text: str) -> dict:
if not text:
raise ValueError("Empty text")
m = _RE_JSON_FENCE.search(text)
if m:
return json.loads(m.group(1).strip())
# Brace-balance scan
start = text.find("{")
while start != -1:
depth = 0
for i in range(start, len(text)):
ch = text[i]
if ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
candidate = text[start:i+1]
try:
return json.loads(candidate)
except Exception:
break
start = text.find("{", start + 1)
raise ValueError("No valid JSON object found in text")
def _pluck_translation_payload(raw: str) -> str:
if not raw:
return ""
t = raw.strip()
if t.startswith("```") and t.endswith("```"):
t = t[3:-3].strip()
try:
obj = _extract_first_json_object(t)
for key in ("translation", "译文", "resultado", "traducción", "traduccion"):
val = obj.get(key)
if isinstance(val, str) and val.strip():
return _strip_t_tags(val.strip()) # strip <t> here
except Exception:
pass
for rex in _RE_PREFIXES:
m = rex.match(t)
if m:
t = m.group(1).strip()
break
m = (re.search(r'“([^”]+)”', t) or
re.search(r'"([^"]+)"', t) or
re.search(r'‘([^’]+)’', t) or
re.search(r"'([^']+)'", t))
if m and len(m.group(1).strip()) >= 1:
return _strip_t_tags(m.group(1).strip()) # strip <t> here
wrappers = ['“', '”', '"', '‘', '’', "'", '《', '》', '「', '」', '『', '』']
while len(t) >= 2 and t[0] in wrappers and t[-1] in wrappers:
t = t[1:-1].strip()
return _strip_t_tags(t.strip()) # strip <t> here
# ============================================================
# Language normalization & detection
# ============================================================
def _norm_lang_label(label: str) -> str:
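    # Collapse free-form labels ("Simplified Chinese", "简体中文", "zh-CN", ...) to coarse
    # two-letter codes so downstream comparisons are cheap and spelling-insensitive.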
if not label:
return "unknown"
s = label.strip().lower()
mapping = {
"chinese": "zh", "simplified chinese": "zh", "zh": "zh", "zh-cn": "zh", "zh_cn": "zh",
"简体中文": "zh", "中文": "zh",
"english": "en", "en": "en", "en-us": "en", "en_gb": "en",
"japanese": "ja", "ja": "ja", "日本語": "ja",
"korean": "ko", "ko": "ko", "韩国语": "ko", "한국어": "ko",
"spanish": "es", "es": "es", "español": "es",
"french": "fr", "fr": "fr", "français": "fr",
}
return mapping.get(s, "unknown")
def _heuristic_lang(text: str) -> str:
t = text or ""
cjk = len(_RE_CJK.findall(t))
hira = len(_RE_HIRA.findall(t))
kata = len(_RE_KATA.findall(t))
hang = len(_RE_HANG.findall(t))
latin = len(_RE_LATN.findall(t))
if (hira + kata) > 0:
return "ja"
if hang > 0:
return "ko"
if cjk > 0 and (hira + kata + hang) == 0:
return "zh"
if latin > 0 and (cjk + hira + kata + hang) == 0:
return "en"
return "unknown"
try:
import cld3 # type: ignore
def _detect_lang(text: str) -> str:
# heuristic first (cheap), CLD3 if needed
h = _heuristic_lang(text)
if h != "unknown":
return h
res = cld3.get_language(text or "")
if res and res.language:
code = res.language.lower()
if code.startswith("zh"): return "zh"
if code.startswith("en"): return "en"
if code.startswith("ja"): return "ja"
if code.startswith("ko"): return "ko"
if code.startswith("es"): return "es"
if code.startswith("fr"): return "fr"
if code.startswith("pl"): return "pl"
return h
except Exception:
def _detect_lang(text: str) -> str:
return _heuristic_lang(text)
# ============================================================
# Similarity / overlap guards
# ============================================================
def _token_set(s: str) -> set:
s = (s or "").lower().translate(_PUNC_TABLE)
return set(s.split())
def _too_similar_to_source(src: str, tgt: str, threshold: float = 0.92) -> bool:
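    # Jaccard overlap of token sets: |src ∩ tgt| / |src ∪ tgt|. A score near 1.0 means the
    # "translation" is mostly the source text echoed back (an untranslated paraphrase).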
ts, tt = _token_set(src), _token_set(tgt)
if not ts or not tt:
return False
overlap = len(ts & tt) / max(1, len(ts | tt))
return overlap >= threshold
# ============================================================
# Tiny / numeric inputs helpers
# ============================================================
_MICRO_MAX = 3
_RE_NUMERICISH = re.compile(r'^[\d\W_]+$') # digits/punct/underscore only (no letters)
def _is_micro_utterance(s: str) -> bool:
return len((s or "").strip()) <= _MICRO_MAX
def _is_numericish(s: str) -> bool:
return bool(_RE_NUMERICISH.fullmatch((s or "").strip()))
# ============================================================
# Back-translation verification (optional)
# ============================================================
def _verify_by_backtranslation(src_text: str, tgt_text: str, target_language: str) -> bool:
# Skip noisy verification for tiny/numeric content
if _is_micro_utterance(src_text) or _is_numericish(src_text):
return True
try:
src_code = _detect_lang(src_text)
src_label = {
"zh": "Chinese", "en": "English", "ja": "Japanese", "ko": "Korean",
"es": "Spanish", "fr": "French"
}.get(src_code, "English")
bt = translator_response(tgt_text, to_language=src_label, translator_server='google')
ts, tb = _token_set(src_text), _token_set(bt)
if not ts or not tb:
return True
jacc = len(ts & tb) / max(1, len(ts | tb))
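        # Loose gate: a faithful round trip rarely drops below ~25% token overlap with the source.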
return jacc >= 0.25
except Exception:
return True
# ============================================================
# Validation — enforces absolute translation (with progressive strictness)
# ============================================================
def valid_translation(
text: str,
translation: str,
target_language: str = "简体中文",
*,
strict: bool = True
) -> Tuple[bool, str]:
t = _pluck_translation_payload(translation)
if not t:
return False, 'Only translate the following sentence and give me the result.'
# Postprocess early (also strips <t> if any)
t = translation_postprocess(t, target_language)
src_len = len(text or "")
out_len = len(t)
# Allow a bit more expansion; looser when strict=False
limit = max(24, int(src_len * (3.0 if strict else 3.6)))
if src_len > 10 and out_len > limit:
return False, 'The translation is too long. Only translate the sentence and give me the result.'
    if src_len <= 10 and out_len > (40 if strict else 50):
return False, 'Only translate the sentence and give me the result.'
target_code = _norm_lang_label(target_language)
trans_code = _detect_lang(t)
src_code = _detect_lang(text)
# Micro-utterance fast path: only enforce language
if _is_micro_utterance(text):
if target_code != "unknown" and trans_code != "unknown" and trans_code != target_code:
return False, f'Output must be in {target_language}. Only output the translation (no explanations).'
return True, t
# Must be in target language
if target_code != "unknown" and trans_code != "unknown" and trans_code != target_code:
return False, f'Output must be in {target_language}. Only output the translation (no explanations).'
# Hard reject same-language paraphrase (threshold slightly stricter)
if trans_code != "unknown" and src_code != "unknown" and trans_code == src_code:
if _too_similar_to_source(text, t, threshold=0.92):
return False, f'The output is not a translation. Translate into {target_language} and output only the translated text.'
# Script coverage guards (RELAXED)
if target_code == "zh":
cjk = len(_RE_CJK.findall(t))
min_ratio = 0.30 if strict else 0.25
if out_len > 0 and (cjk / out_len) < min_ratio:
return False, 'Output must be in Chinese. Only output the translation.'
if target_code == "ja":
kana = len(_RE_HIRA.findall(t)) + len(_RE_KATA.findall(t))
min_ratio = 0.12 if strict else 0.10
if out_len > 0 and (kana / out_len) < min_ratio and len(_RE_CJK.findall(t)) < 2:
return False, 'Output must be in Japanese. Only output the translation.'
if target_code == "ko":
hang = len(_RE_HANG.findall(t))
min_ratio = 0.25 if strict else 0.20
if out_len > 0 and (hang / out_len) < min_ratio:
return False, 'Output must be in Korean. Only output the translation.'
# Some visible text required
if not re.search(r'\w', t, flags=re.UNICODE) and not _RE_CJK.search(t):
return False, 'Only output the translation text.'
return True, t
# ============================================================
# Sentence splitting & timing
# ============================================================
def split_text_into_sentences(para: str, target_language: str = "简体中文") -> List[str]:
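    # Illustration: zh target "你好。今天怎么样?" -> ["你好。", "今天怎么样?"];
    # Latin target "Hi. How are you?" -> ["Hi.", "How are you?"]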
para = (para or "").strip()
if not para:
return []
if _is_chinese_target(target_language):
para = _RE_CN_SPLIT_1.sub(r"\1\n\2", para)
para = _RE_CN_SPLIT_2.sub(r"\1\n\2", para)
para = _RE_CN_SPLIT_3.sub(r"\1\n\2", para)
para = _RE_CN_SPLIT_4.sub(r'\1\n\2', para)
return [s.strip() for s in para.rstrip().split("\n") if s.strip()]
return [p.strip() for p in _RE_LAT_SPLIT.split(para) if p.strip()]
def split_sentences(translation_items: List[Dict], target_language: str = "简体中文", use_char_based_end: bool = True) -> List[Dict]:
output = []
for item in translation_items:
start = float(item['start'])
end = float(item['end'])
text = item['text']
speaker = item['speaker']
translation_text = (item.get('translation') or "").strip()
if not translation_text:
output.append({
"start": round(start, 3),
"end": round(end, 3),
"text": text,
"speaker": speaker,
"translation": translation_text
})
continue
sentences = split_text_into_sentences(translation_text, target_language) or [translation_text]
if use_char_based_end:
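            # Apportion the original [start, end] window across sentences in proportion to
            # character count; the last sentence snaps to the original end to absorb rounding drift.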
total_chars = max(1, sum(len(s) for s in sentences))
duration = end - start
acc = start
for i, s in enumerate(sentences):
if i < len(sentences) - 1:
seg = duration * (len(s) / total_chars)
seg_end = acc + seg
else:
seg_end = end
output.append({
"start": round(acc, 3),
"end": round(seg_end, 3),
"text": text,
"speaker": speaker,
"translation": s
})
acc = seg_end
else:
for s in sentences:
output.append({
"start": round(start, 3),
"end": round(end, 3),
"text": text,
"speaker": speaker,
"translation": s
})
return output
# ============================================================
# Summarization + summary translate (kept; fast limit)
# ============================================================
def summarize(info: dict, transcript: List[dict], target_language: str = '简体中文', method: str = 'LLM') -> dict:
transcript_text = ' '.join(line.get('text', '') for line in transcript)
transcript_text = ensure_transcript_length(transcript_text, max_length=SUMMARY_TEXT_LIMIT)
info_message = f'Title: "{info["title"]}" Author: "{info["uploader"]}". '
if method in ['Google Translate', 'Bing Translate']:
full_description = f'{info_message}\n{transcript_text}\n{info_message}\n'
translation = translator_response(full_description, target_language)
return {
'title': translator_response(info['title'], target_language),
'author': info['uploader'],
'summary': translation,
'language': target_language,
'tags': info.get('tags', [])
}
schema_hint = (
'Return ONLY JSON with the keys "title" and "summary". '
'Example: {"title": "t", "summary": "s"}'
)
messages = [
{'role': 'system',
'content': f'You are an expert in the field of this video. {schema_hint}'},
{'role': 'user',
'content': f'The following is the full content of the video:\n'
f'{info_message}\n{transcript_text}\n{info_message}\n'
f'Please summarize the video in JSON only.'},
]
summary_obj = None
for attempt in range(6):
try:
response = llm_response(messages) if method == 'LLM' else None
logger.debug(f"[summarize] raw response (attempt {attempt+1}): {str(response)[:300]}...")
summary_obj = _extract_first_json_object(response)
t = (summary_obj.get('title') or '').strip()
s = (summary_obj.get('summary') or '').strip()
if not t or not s or 'title' in t.lower():
raise ValueError("Invalid summary fields")
break
        except Exception as e:
            summary_obj = None  # discard partially parsed objects so the fallback below triggers
            logger.debug(f"[summarize] parse error: {e}")
            time.sleep(RETRY_SLEEP_S)
if summary_obj is None:
# graceful fallback: a minimal summary using info
summary_obj = {"title": info.get("title", "Untitled"), "summary": info.get("description", "")}
    # json.dumps handles quotes, backslashes and newlines; a bare .replace('"', '\\"')
    # breaks the embedded JSON on multi-line summaries.
    safe_title = json.dumps(summary_obj["title"], ensure_ascii=False)
    safe_summary = json.dumps(summary_obj["summary"], ensure_ascii=False)
    safe_tags = json.dumps(info.get("tags", []), ensure_ascii=False)
trans_messages = [
{'role': 'system',
'content': (
f'You are a native speaker of {target_language}. '
f'Return ONLY JSON: {{"title": "...", "summary": "...", "tags": ["..."]}}'
)},
        {'role': 'user',
         'content': (
             f'Please translate the following into {target_language} and return JSON only:\n'
             f'{{"title": {safe_title}, "summary": {safe_summary}, "tags": {safe_tags}}}'
         )}
]
trans = None
for attempt in range(5):
try:
resp = llm_response(trans_messages)
resp = resp.strip()
logger.debug(f"[summarize-translate] raw response (attempt {attempt+1}): {resp[:300]}...")
trans = _extract_first_json_object(resp)
if not trans.get('title') or not trans.get('summary'):
raise ValueError("Missing fields")
break
        except Exception as e:
            trans = None  # mirror the reset above: never reuse a half-valid object
            logger.debug(f"[summarize-translate] parse error: {e}")
            time.sleep(RETRY_SLEEP_S)
if trans is None:
trans = {
'title': summary_obj['title'],
'summary': summary_obj['summary'],
'tags': info.get('tags', [])
}
title = (trans.get('title', '')).strip().strip('“”"‘’\'《》')
return {
'title': title,
'author': info.get('uploader', ''),
'summary': (trans.get('summary', '')).strip(),
'tags': trans.get('tags', info.get('tags', [])),
'language': target_language
}
# ============================================================
# Line-by-line translation (LLM path kept; MT path fast/parallel)
# ============================================================
@lru_cache(maxsize=4096)
def _mt_cached(text: str, target_language: str, server: str) -> str:
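    # Memoized per (text, target_language, server): repeated lines hit the cache instead of the network.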
return translator_response(text, to_language=target_language, translator_server=server)
def _norm_key(s: str) -> str:
return re.sub(r'\s+', ' ', (s or '').strip().lower())
def _translate_llm_path(summary: dict, transcript: List[dict], target_language: str) -> List[str]:
info = f'This is a video called "{summary["title"]}". {summary["summary"]}.'
full_translation: List[str] = []
fixed_message = [
{
'role': 'system',
'content': (
f'You are a professional translator.\n'
f'Context (terminology only): {info}\n'
f'RULES (must obey exactly):\n'
f'1) Translate the quoted sentence into {target_language}.\n'
f'2) Output ONLY inside tags: <t>...translation...</t>\n'
f'3) No other text, no quotes, no markdown, no explanations.\n'
f'4) Do NOT paraphrase in the original language; output MUST be in {target_language}.\n'
f'5) Preserve numbers and technical terms faithfully.\n'
)
},
{'role': 'user', 'content': 'Translate: "Original Text"'},
{'role': 'assistant', 'content': '<t>Example translation</t>'}
]
history: List[Dict[str, Any]] = []
dedup_cache: Dict[str, str] = {}
for line_idx, line in enumerate(transcript):
text = line.get('text', '')
if not text or _NON_SPEECH.match(text):
full_translation.append('')
continue
key = _norm_key(text)
if ENABLE_DEDUP_SAME_LINES and key in dedup_cache:
full_translation.append(dedup_cache[key])
history = history[-LLM_HISTORY_WINDOW:]
history += [
{'role': 'user', 'content': f'Translate: "{text}"'},
{'role': 'assistant', 'content': dedup_cache[key]},
]
time.sleep(SMALL_SLEEP_S)
continue
retry_hint = ''
success = False
last_err = None
for attempt in range(LLM_MAX_RETRIES):
strict = (attempt == 0) # first attempt strict, later attempts relaxed
messages = fixed_message + history[-LLM_HISTORY_WINDOW:] + [
{'role': 'user',
'content': f'{retry_hint}Translate the following and output ONLY <t>...</t>:\n"{text}"'}
]
try:
resp = llm_response(messages)
ok, t_clean = valid_translation(text, resp, target_language, strict=strict)
do_bt = ENABLE_BACKTRANSLATE_VERIFY and not (_is_micro_utterance(text) or _is_numericish(text))
if ok and do_bt:
if not _verify_by_backtranslation(text, t_clean, target_language):
ok = False
retry_hint = "Ensure the output is a faithful translation into the target language. "
raise ValueError("Back-translation verification failed")
if not ok:
retry_hint = "Only output the translation. No quotes. No markdown. "
raise ValueError("Invalid translation output")
full_translation.append(t_clean)
if ENABLE_DEDUP_SAME_LINES:
dedup_cache[key] = t_clean
success = True
break
except Exception as e:
last_err = e
logger.debug(f"[translate-LLM] retryable issue at idx={line_idx}: {e}")
time.sleep(RETRY_SLEEP_S)
if not success:
try:
mt_fallback = _mt_cached(text, target_language, 'google')
ok, t_clean = valid_translation(text, mt_fallback, target_language, strict=False)
if ok and ENABLE_BACKTRANSLATE_VERIFY and not (_is_micro_utterance(text) or _is_numericish(text)):
if not _verify_by_backtranslation(text, t_clean, target_language):
ok = False
full_translation.append(t_clean if ok else text)
if ok and ENABLE_DEDUP_SAME_LINES:
dedup_cache[key] = t_clean
logger.warning(f"[translate-line] fell back to MT for a line due to: {last_err}")
except Exception as ee:
logger.warning(f"[translate-line] MT fallback failed: {ee}")
full_translation.append(text)
history = history[-LLM_HISTORY_WINDOW:]
history += [
{'role': 'user', 'content': f'Translate: "{text}"'},
{'role': 'assistant', 'content': full_translation[-1]},
]
time.sleep(SMALL_SLEEP_S)
return full_translation
def _translate_mt_path(transcript: List[dict], target_language: str, server: str) -> List[str]:
texts = [(i, line.get('text', '')) for i, line in enumerate(transcript)]
results = [''] * len(texts)
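    # Results are written into a pre-sized list by index, so output order matches the
    # transcript even when the parallel branch completes out of order.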
if MT_MAX_WORKERS <= 1:
for i, t in texts:
if not t or _NON_SPEECH.match(t):
results[i] = ''
continue
mt = _mt_cached(t, target_language, server)
ok, t_clean = valid_translation(t, mt, target_language) # strict default
if ok and ENABLE_BACKTRANSLATE_VERIFY and not _is_micro_utterance(t) and not _is_numericish(t):
if not _verify_by_backtranslation(t, t_clean, target_language):
ok = False
results[i] = t_clean if ok else t
time.sleep(SMALL_SLEEP_S)
return results
with ThreadPoolExecutor(max_workers=MT_MAX_WORKERS) as ex:
futs = {}
for i, t in texts:
if not t or _NON_SPEECH.match(t):
results[i] = ''
continue
futs[ex.submit(_mt_cached, t, target_language, server)] = (i, t)
for fut in as_completed(futs):
i, src = futs[fut]
try:
mt = fut.result()
ok, t_clean = valid_translation(src, mt, target_language) # strict default
if ok and ENABLE_BACKTRANSLATE_VERIFY and not _is_micro_utterance(src) and not _is_numericish(src):
if not _verify_by_backtranslation(src, t_clean, target_language):
ok = False
results[i] = t_clean if ok else src
except Exception as e:
logger.debug(f"[translate-mt] worker error: {e}")
results[i] = src
return results
def _translate(summary: dict, transcript: List[dict], target_language: str = '简体中文', method: str = 'LLM') -> List[str]:
    # FAST mode: coerce any unrecognized method string to the Google MT path; explicit
    # 'LLM' / 'Google Translate' / 'Bing Translate' choices are always honored.
if FAST_TRANSLATION_MODE and method not in ['Google Translate', 'Bing Translate', 'LLM']:
method = 'Google Translate'
if method in ['Google Translate', 'Bing Translate']:
server = 'google' if method == 'Google Translate' else 'bing'
return _translate_mt_path(transcript, target_language, server)
return _translate_llm_path(summary, transcript, target_language)
# ============================================================
# Public entry points
# ============================================================
def _atomic_write_json(path: str, obj: Any):
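    # Write to a sibling .tmp file, then swap it in with os.replace() (atomic on POSIX,
    # replace-in-place on Windows) so readers never observe a half-written JSON.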
tmp = f"{path}.tmp"
with open(tmp, 'w', encoding='utf-8') as f:
json.dump(obj, f, indent=2, ensure_ascii=False)
os.replace(tmp, path)
def translate(method: str, folder: str, target_language: str = '简体中文'):
"""
Translate a single video folder w/ transcript.json.
Writes/updates summary.json and translation.json (time-aligned).
"""
translation_path = os.path.join(folder, 'translation.json')
if os.path.exists(translation_path):
logger.info(f'Translation already exists in {folder}')
return True
info_path = os.path.join(folder, 'download.info.json')
if os.path.exists(info_path):
with open(info_path, 'r', encoding='utf-8') as f:
info_raw = json.load(f)
info = get_necessary_info(info_raw)
else:
info = {
'title': os.path.basename(folder),
'uploader': 'Unknown',
'description': 'Unknown',
'upload_date': 'Unknown',
'tags': []
}
transcript_path = os.path.join(folder, 'transcript.json')
with open(transcript_path, 'r', encoding='utf-8') as f:
transcript = json.load(f)
summary_path = os.path.join(folder, 'summary.json')
if os.path.exists(summary_path):
with open(summary_path, 'r', encoding='utf-8') as f:
summary = json.load(f)
else:
summary = summarize(info, transcript, target_language, method)
_atomic_write_json(summary_path, summary)
translations = _translate(summary, transcript, target_language, method)
# Attach and split
for i, line in enumerate(transcript):
line['translation'] = translations[i]
transcript_split = split_sentences(transcript, target_language=target_language, use_char_based_end=True)
_atomic_write_json(translation_path, transcript_split)
return summary, transcript_split
def translate_all_transcript_under_folder(folder: str, method: str, target_language: str):
"""
Walk directory; translate each subfolder that has transcript.json but not translation.json.
Returns (message, last_summary_json, last_translation_json)
"""
summary_json, translate_json = None, None
for root, dirs, files in os.walk(folder):
if 'transcript.json' in files and 'translation.json' not in files:
summary_json, translate_json = translate(method, root, target_language)
elif 'translation.json' in files:
sum_p = os.path.join(root, 'summary.json')
trn_p = os.path.join(root, 'translation.json')
if os.path.exists(sum_p):
with open(sum_p, 'r', encoding='utf-8') as f:
summary_json = json.load(f)
if os.path.exists(trn_p):
with open(trn_p, 'r', encoding='utf-8') as f:
translate_json = json.load(f)
return f'Translated all videos under {folder}', summary_json, translate_json
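
if __name__ == '__main__':
    # Minimal manual smoke test (a sketch; the default folder and the layout
    # <folder>/<video>/transcript.json are assumptions, not part of the public API).
    import sys
    _folder = sys.argv[1] if len(sys.argv) > 1 else 'videos'
    _msg, _, _ = translate_all_transcript_under_folder(_folder, method='LLM', target_language='简体中文')
    logger.info(_msg)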