# BosonAI_Hackathon / tools / step045_emotion.py
# Deploy snapshot for HF Space (LFS pointers, heavy tests removed)
# (snapshot commit 09eaf7c, committed via github-actions[bot])
# -*- coding: utf-8 -*-
"""
step045_emotion.py
Rate-SAFE, ultra-obvious DSP emotion shaping (no TTS prompt changes).
- Angry: lean low end, huge bite, bright, hard compression, gritty saturation, consonant snap, tense micro-jitter
- Happy: bright, sparkly, buoyant with parallel (upward) compression, mild grit
- Sad: darker/warmer, slower *feel* via pauses & HF roll-off, relaxed dynamics
- Speaking RATE is hard-limited to small changes (<= ±0.08), per request.
Public API (unchanged):
apply_emotion(wav, sr, preset="angry", strength=0.85, lang="en",
sentence_times=None, exaggerate=True) -> np.ndarray
auto_tune_emotion(wav, sr, target_preset="angry", strength=0.85, lang="en",
sentence_times=None, latency_budget_s=1.0, min_confidence=0.35,
max_iters=6, exaggerate=True)
"""
from __future__ import annotations
import time
from typing import List, Optional, Tuple
import numpy as np
import librosa
from loguru import logger
from scipy.signal import lfilter, butter
from .step046_higgs_understanding import score_emotion, EmotionScore
# ---------------------------------------------------------
# Strong targets with MINIMAL rate changes
# ---------------------------------------------------------
# Per-preset DSP targets. Units: pitch_st = semitones, rate = fractional speed
# change, shelf_db / mid_db = EQ gains in dB, comp_ratio = compression ratio,
# pause_scale = inter-sentence pause duration multiplier, drive = saturation amount.
_BASE_PRESETS = {
    "neutral": dict(pitch_st=0.0, rate= 0.00, shelf_db= 0.0, mid_db= 0.0, comp_ratio=1.2, pause_scale=1.00, drive=0.00),
    "happy": dict(pitch_st=+1.8, rate=+0.06, shelf_db=+8.0, mid_db=+3.0, comp_ratio=2.2, pause_scale=0.92, drive=0.18),
    "sad": dict(pitch_st=-1.8, rate=-0.05, shelf_db=-6.0, mid_db=-2.0, comp_ratio=1.3, pause_scale=1.40, drive=0.00),
    "angry": dict(pitch_st=+2.4, rate=+0.05, shelf_db=+11.0, mid_db=+9.0, comp_ratio=8.0, pause_scale=0.82, drive=0.55),
}
# Hard clamps (keep rate small)
# Per-preset absolute limits applied by _calibrate_params (symmetric for
# pitch_st/rate/mid_db/shelf_db; upper bounds for comp_ratio/drive).
_LIMITS = {
    "neutral": dict(pitch_st=2.5, rate=0.08, shelf_db=12.0, mid_db=10.0, comp_ratio=8.0, drive=0.50),
    "happy": dict(pitch_st=3.0, rate=0.08, shelf_db=12.0, mid_db=10.0, comp_ratio=8.0, drive=0.45),
    "sad": dict(pitch_st=3.0, rate=0.08, shelf_db=12.0, mid_db=10.0, comp_ratio=6.0, drive=0.35),
    "angry": dict(pitch_st=3.0, rate=0.08, shelf_db=13.5, mid_db=12.0, comp_ratio=12.0, drive=0.85),
}
# Global ceilings applied AFTER the per-preset clamps.
_MAX_PITCH_ST_GLOBAL = 3.0
_MAX_RATE_FRAC_GLOBAL = 0.08  # <= 8% speed change total
# Guidance targets (Higgs VA) — (valence, arousal) pairs per preset.
_VA_TARGETS = {
    "neutral": ( 0.00, 0.00),
    "happy": (+0.60, +0.60),
    "sad": (-0.60, -0.50),
    "angry": (-0.40, +0.88),
}
# ---------- DSP helpers ----------
def _db_to_lin(db: float) -> float:
return float(10 ** (db / 20.0))
def _soft_compress(y: np.ndarray, ratio: float = 1.0) -> np.ndarray:
y = np.asarray(y, dtype=np.float32)
if ratio <= 1.0: return y
rms = float(np.sqrt(np.mean(y**2) + 1e-8))
gain = 1.0 / np.maximum(1.0, (np.abs(y) / (rms + 1e-8)) ** (ratio - 1.0))
return (y * gain).astype(np.float32)
def _parallel_compress(y: np.ndarray, ratio: float = 2.0, mix: float = 0.35) -> np.ndarray:
"""Upward(ish) compression via parallel mix of a compressed copy."""
if ratio <= 1.0 or mix <= 1e-4: return y
c = _soft_compress(y, ratio=ratio)
m = float(np.clip(mix, 0.0, 0.9))
out = (1.0 - m) * y + m * c
return np.clip(out, -1.0, 1.0).astype(np.float32)
def _limiter(y: np.ndarray, thr_db: float = -1.0) -> np.ndarray:
    """Static peak limiter: rescale the whole buffer so its peak sits at thr_db.

    Signals already below the threshold are returned untouched (same object).
    """
    ceiling = _db_to_lin(thr_db)
    peak = float(np.max(np.abs(y)) + 1e-8)
    if peak <= ceiling:
        return y
    return (y / peak * ceiling).astype(np.float32)
def _saturate(y: np.ndarray, drive: float = 0.15) -> np.ndarray:
y = np.asarray(y, dtype=np.float32)
if drive <= 1e-4: return y
t = np.tanh(y * (1.0 + float(drive)))
c = y - (y**3)/3.0
out = 0.6*t + 0.4*c
return np.clip(out, -1.0, 1.0).astype(np.float32)
def _biquad_peak(sr: int, f0: float, Q: float, gain_db: float):
A = 10 ** (gain_db / 40.0)
w0 = 2 * np.pi * f0 / float(sr)
alpha = np.sin(w0) / (2.0 * Q)
cosw0 = np.cos(w0)
b0 = 1 + alpha*A
b1 = -2*cosw0
b2 = 1 - alpha*A
a0 = 1 + alpha/A
a1 = -2*cosw0
a2 = 1 - alpha/A
b = np.array([b0, b1, b2], dtype=np.float64) / a0
a = np.array([1.0, a1 / a0, a2 / a0], dtype=np.float64)
return b, a
def _peaking_eq(y: np.ndarray, sr: int, gain_db: float, f0: float, Q: float) -> np.ndarray:
if abs(gain_db) < 1e-3: return y
b, a = _biquad_peak(sr, f0=f0, Q=Q, gain_db=gain_db)
return lfilter(b, a, y).astype(np.float32)
def _shelf(y: np.ndarray, sr: int, gain_db: float, cutoff: float, high: bool) -> np.ndarray:
if abs(gain_db) < 1e-3: return y
A = 10 ** (gain_db / 40.0)
w0 = 2*np.pi*cutoff/float(sr)
alpha = np.sin(w0)/2.0
cosw0 = np.cos(w0)
if high:
b0 = A*((A+1)+(A-1)*cosw0+2*np.sqrt(A)*alpha)
b1 = -2*A*((A-1)+(A+1)*cosw0)
b2 = A*((A+1)+(A-1)*cosw0-2*np.sqrt(A)*alpha)
a0 = (A+1)-(A-1)*cosw0+2*np.sqrt(A)*alpha
a1 = 2*((A-1)-(A+1)*cosw0)
a2 = (A+1)-(A-1)*cosw0-2*np.sqrt(A)*alpha
else:
b0 = A*((A+1)-(A-1)*cosw0+2*np.sqrt(A)*alpha)
b1 = 2*A*((A-1)-(A+1)*cosw0)
b2 = A*((A+1)-(A-1)*cosw0-2*np.sqrt(A)*alpha)
a0 = (A+1)+(A-1)*cosw0+2*np.sqrt(A)*alpha
a1 = -2*((A-1)+(A+1)*cosw0)
a2 = (A+1)+(A-1)*cosw0-2*np.sqrt(A)*alpha
if abs(a0) < 1e-12: return y
b = np.array([b0,b1,b2],dtype=np.float64)/a0
a = np.array([1.0,a1/a0,a2/a0],dtype=np.float64)
return lfilter(b,a,y).astype(np.float32)
def _high_shelf(y, sr, gain_db, cutoff):
    """Convenience wrapper: high-shelf EQ via _shelf."""
    return _shelf(y, sr, gain_db, cutoff, True)
def _low_shelf(y, sr, gain_db, cutoff):
    """Convenience wrapper: low-shelf EQ via _shelf."""
    return _shelf(y, sr, gain_db, cutoff, False)
def _hp(y: np.ndarray, sr: int, cutoff: float, order: int = 2) -> np.ndarray:
if cutoff <= 0.0: return y
b, a = butter(order, cutoff / (0.5 * sr), btype='high', output='ba')
return lfilter(b, a, y).astype(np.float32)
def _lp(y: np.ndarray, sr: int, cutoff: float, order: int = 2) -> np.ndarray:
if cutoff <= 0.0: return y
b, a = butter(order, cutoff / (0.5 * sr), btype='low', output='ba')
return lfilter(b, a, y).astype(np.float32)
def _de_ess(y: np.ndarray, sr: int, center: float = 7200.0, Q: float = 3.0, depth_db: float = -7.0) -> np.ndarray:
    """Tame sibilance with a narrow peaking cut centered at `center` Hz."""
    cut_gain = depth_db
    return _peaking_eq(y, sr, gain_db=cut_gain, f0=center, Q=Q)
def _transient_snap(y: np.ndarray, amount: float = 0.32) -> np.ndarray:
if amount <= 1e-4: return y
yy = np.abs(y) - librosa.effects.preemphasis(np.abs(y), coef=0.85)
yy = np.clip(yy, 0.0, 1.0).astype(np.float32)
mix = float(np.clip(amount, 0.0, 0.6))
return np.clip((1.0 - mix) * y + mix * yy * np.sign(y), -1.0, 1.0).astype(np.float32)
def _micro_jitter(y: np.ndarray, sr: int, pitch_cents: float = 12.0, rate_ppm: float = 900.0) -> np.ndarray:
    """Add subtle LFO-driven pitch and rate instability (a tense, agitated feel).

    Clips shorter than ~1/3 second are returned unchanged. Both stages are
    best-effort: any exception falls back to the unmodified input.
    """
    if len(y) < sr//3: return y
    t = np.linspace(0, len(y)/sr, num=len(y), dtype=np.float32, endpoint=False)
    p_lfo = 2*np.pi*0.9*t  # 0.9 Hz pitch-wobble LFO phase
    r_lfo = 2*np.pi*0.7*t  # 0.7 Hz rate-wobble LFO phase
    n_steps = (pitch_cents / 100.0) * np.sin(p_lfo)
    try:
        # NOTE(review): librosa.effects.pitch_shift expects a scalar n_steps;
        # passing this per-sample array likely raises, so this branch probably
        # always falls through to the unshifted signal — confirm and fix.
        yp = librosa.effects.pitch_shift(y, sr=sr, n_steps=n_steps.astype(np.float32))
    except Exception:
        yp = y
    rate = 1.0 + (rate_ppm / 1_000_000.0) * np.sin(r_lfo)
    try:
        # Integrate the instantaneous rate to get warped sample positions, then
        # resample with linear interpolation (output length stays the same).
        idx = np.cumsum(rate).astype(np.float32)
        idx = (idx / idx[-1]) * (len(yp)-1)
        yj = np.interp(idx, np.arange(len(yp), dtype=np.float32), yp).astype(np.float32)
    except Exception:
        yj = yp
    return yj
def _stretch_pauses(y: np.ndarray, sr: int, sentence_times: Optional[List[Tuple[float, float]]], scale: float) -> np.ndarray:
    """Time-stretch only the gaps BETWEEN sentences by `scale` (>1 = longer pauses).

    Speech segments are copied verbatim; each inter-sentence region is stretched
    with librosa (effective rate clamped to [0.70, 1.30]). Any failure to
    reassemble the audio returns the original signal unchanged.
    """
    y = np.asarray(y, dtype=np.float32)
    if not sentence_times or abs(scale-1.0) < 1e-3: return y
    n = len(y)
    # Sort sentences by start time; negative times are clipped to 0.
    sent = sorted([(max(0.0,s), max(0.0,e)) for (s,e) in sentence_times], key=lambda x:x[0])
    out: List[np.ndarray] = []
    # Audio before the first sentence is kept untouched.
    lead_end = max(0, min(n, int(sent[0][0]*sr)))
    if lead_end > 0: out.append(y[:lead_end])
    for i,(s,e) in enumerate(sent):
        s_i = max(0, min(n, int(s*sr))); e_i = max(0, min(n, int(e*sr)))
        if e_i > s_i: out.append(y[s_i:e_i])
        # The pause runs from this sentence's end to the next start (or EOF).
        nxt = sent[i+1][0] if i+1 < len(sent) else (n/ sr)
        p1, p2 = e_i, max(0, min(n, int(nxt*sr)))
        if p2 > p1:
            pause_seg = y[p1:p2]
            if len(pause_seg) > 16 and abs(scale-1.0) > 1e-3:
                try:
                    # time_stretch's rate is the inverse of the duration scale.
                    new_rate = float(np.clip(1.0/float(scale), 0.70, 1.30))
                    pause_seg = librosa.effects.time_stretch(pause_seg, rate=new_rate)
                except Exception as ex:
                    logger.warning(f"[Emotion] pause time_stretch failed: {ex}")
            out.append(pause_seg)
    try:
        return np.concatenate(out).astype(np.float32, copy=False)
    except Exception:
        return y
# ---------- Parameter calibration ----------
def _calibrate_params(preset: str, params: dict, lang: str) -> dict:
    """Clamp requested emotion parameters to per-preset and global safety limits.

    Applies, in order: a Mandarin pitch guard, the per-preset limits from
    _LIMITS, then the global pitch/rate ceilings. Any difference between the
    requested and clamped values is logged at debug level.

    Args:
        preset: preset key ("neutral"/"happy"/"sad"/"angry"); unknown presets
            fall back to the "neutral" limits.
        params: raw parameter dict (pitch_st, rate, shelf_db, mid_db,
            comp_ratio, pause_scale, optional drive).
        lang: language code; "zh*" narrows pitch to ±0.9 st to protect lexical
            tones. None is tolerated and treated as non-Mandarin.

    Returns:
        A new dict with clamped values; `params` itself is not mutated.
    """
    lim = _LIMITS.get(preset, _LIMITS["neutral"])
    out = params.copy()
    # Mandarin pitch safety — large shifts distort tone identity.
    # Guard against lang=None (original crashed on .lower()).
    if (lang or "").lower().startswith("zh"):
        out["pitch_st"] = float(np.clip(out["pitch_st"], -0.9, 0.9))

    def cap(v, lo, hi):
        return float(np.clip(v, lo, hi))

    req = dict(**out)  # snapshot of requested values, for clamp logging
    out["pitch_st"] = cap(out["pitch_st"], -lim["pitch_st"], lim["pitch_st"])
    out["rate"] = cap(out["rate"], -lim["rate"], lim["rate"])
    out["mid_db"] = cap(out["mid_db"], -lim["mid_db"], lim["mid_db"])
    out["shelf_db"] = cap(out["shelf_db"], -lim["shelf_db"], lim["shelf_db"])
    out["comp_ratio"] = max(1.0, min(lim["comp_ratio"], float(out["comp_ratio"])))
    out["drive"] = max(0.0, min(lim["drive"], float(out.get("drive", 0.0))))
    # Global caps keep pitch/rate subtle regardless of the preset limits.
    out["pitch_st"] = cap(out["pitch_st"], -_MAX_PITCH_ST_GLOBAL, _MAX_PITCH_ST_GLOBAL)
    out["rate"] = cap(out["rate"], -_MAX_RATE_FRAC_GLOBAL, _MAX_RATE_FRAC_GLOBAL)

    # clamp logs
    def log_clamp(name):
        # "drive" is optional in params, so req may lack it (the original
        # raised KeyError here in that case); fall back to the final value.
        requested = req.get(name, out[name])
        if abs(requested - out[name]) > 1e-6:
            logger.debug(f"[Emotion] clamp {name}: {requested:+.2f} -> {out[name]:+.2f}")

    for k in ("pitch_st","rate","mid_db","shelf_db","comp_ratio","drive"):
        log_clamp(k)
    return out
# ---------- Main effect ----------
def apply_emotion(
    wav: np.ndarray,
    sr: int,
    preset: str = "neutral",
    strength: float = 0.85,
    lang: str = "en",
    sentence_times: Optional[List[Tuple[float, float]]] = None,
    exaggerate: bool = True,
) -> np.ndarray:
    """
    Ultra-obvious pure-DSP shaping with SMALL rate adjustments.

    Pipeline: prosody (pitch shift, mild time stretch, pause scaling) followed
    by a per-preset timbre/dynamics chain, then a -1 dB limiter and hard clip.

    Args:
        wav: mono float waveform (assumed roughly in [-1, 1] — TODO confirm).
        sr: sample rate in Hz.
        preset: "neutral" | "happy" | "sad" | "angry"; unknown values fall
            back to "neutral" with a warning.
        strength: overall effect amount; <= 0 returns the input (as float32).
        lang: language code, forwarded for Mandarin pitch clamping.
        sentence_times: optional (start_s, end_s) pairs used to scale
            inter-sentence pauses.
        exaggerate: scales the preset targets up before clamping.

    Returns:
        Processed float32 waveform, clipped to [-1, 1].
    """
    p = (preset or "neutral").lower()
    if p not in _BASE_PRESETS:
        logger.warning(f"[Emotion] Unknown preset '{preset}', defaulting to neutral.")
        p = "neutral"
    if strength <= 0:
        return np.asarray(wav, dtype=np.float32)
    ex = 1.0
    if exaggerate:
        ex = 1.45 if p == "angry" else 1.25 if p == "happy" else 1.25 if p == "sad" else 1.05
    # NOTE(review): this scales EVERY numeric preset value by strength*ex,
    # including comp_ratio and pause_scale — verify that is intended.
    base = {k: (v * strength * ex if isinstance(v,(int,float)) else v) for k,v in _BASE_PRESETS[p].items()}
    params = _calibrate_params(p, base, lang)
    logger.info(
        f"[Emotion] {p}{' (EXAG)' if exaggerate else ''} | "
        f"pitch={params['pitch_st']:+.2f}st rate={params['rate']:+.2f} "
        f"shelf={params['shelf_db']:+.1f}dB mid={params['mid_db']:+.1f}dB "
        f"comp={params['comp_ratio']:.2f} pause={params['pause_scale']:.2f} drive={params['drive']:.2f}"
    )
    y = np.asarray(wav, dtype=np.float32)
    # Prosody (keep rate subtle) — each librosa step is best-effort.
    if abs(params["pitch_st"]) > 1e-3:
        try: y = librosa.effects.pitch_shift(y, sr=sr, n_steps=float(params["pitch_st"])).astype(np.float32)
        except Exception as e: logger.warning(f"[Emotion] pitch_shift failed: {e}")
    if abs(params["rate"]) > 1e-3:
        try: y = librosa.effects.time_stretch(y, rate=float(1.0 + params["rate"])).astype(np.float32)
        except Exception as e: logger.warning(f"[Emotion] time_stretch failed: {e}")
    if sentence_times:
        y = _stretch_pauses(y, sr, sentence_times, float(params["pause_scale"]))
    # Timbre/dynamics chains (neutral gets no chain, only the final limiter).
    if p == "angry":
        # Thin warmth, add dual bite + bright tilt, control hiss, crush, grit, snap, tension
        y = _hp(y, sr, cutoff=200.0, order=2)
        y = _low_shelf(y, sr, gain_db=-3.0, cutoff=360.0)
        y = _peaking_eq(y, sr, gain_db=float(params["mid_db"]), f0=2850.0, Q=0.9)
        y = _peaking_eq(y, sr, gain_db=float(params["mid_db"]*0.65), f0=4300.0, Q=1.0)
        y = _high_shelf(y, sr, gain_db=float(params["shelf_db"]), cutoff=3800.0)
        y = _de_ess(y, sr, center=7200.0, Q=3.0, depth_db=-6.5)
        # compression → saturation → transient snap → micro-jitter
        y = _soft_compress(y, ratio=float(max(params["comp_ratio"], 7.0)))
        y = _saturate(y, drive=float(max(params["drive"], 0.55)))
        y = _transient_snap(y, amount=0.34)
        y = _micro_jitter(y, sr, pitch_cents=12.0, rate_ppm=800.0)
    elif p == "happy":
        # Buoyant brightness + presence + upward compression + mild grit
        y = _low_shelf(y, sr, gain_db=+2.0, cutoff=180.0)
        y = _peaking_eq(y, sr, gain_db=float(max(params["mid_db"], 2.5)), f0=2400.0, Q=1.1)
        y = _high_shelf(y, sr, gain_db=float(max(params["shelf_db"], 7.0)), cutoff=4200.0)
        y = _parallel_compress(y, ratio=float(max(params["comp_ratio"], 2.2)), mix=0.38)
        y = _saturate(y, drive=float(max(params["drive"], 0.16)))
    elif p == "sad":
        # Warmth + HF roll-off + relaxed dynamics (longer pauses already applied)
        y = _lp(y, sr, cutoff=7000.0, order=2)
        y = _high_shelf(y, sr, gain_db=float(min(params["shelf_db"], -6.0)), cutoff=3600.0)
        y = _peaking_eq(y, sr, gain_db=float(min(params["mid_db"], -1.5)), f0=1800.0, Q=1.1)
        y = _soft_compress(y, ratio=float(min(params["comp_ratio"], 1.6)))
    # Final safety
    y = _limiter(y, thr_db=-1.0)
    return np.clip(y, -1.0, 1.0).astype(np.float32)
# ---------- Auto-tune with VA feedback (no rate escalation) ----------
def _angry_ok(v: float, a: float) -> bool: return (a >= 0.88) and (v <= -0.35)
def _happy_ok(v: float, a: float) -> bool: return (a >= 0.62) and (v >= +0.35)
def _sad_ok(v: float, a: float) -> bool: return (v <= -0.50) and (a <= 0.25)
def auto_tune_emotion(
    wav: np.ndarray, sr: int, target_preset: str = "happy", strength: float = 0.85,
    lang: str = "en", sentence_times: Optional[List[Tuple[float, float]]] = None,
    latency_budget_s: float = 1.0, min_confidence: float = 0.35, max_iters: int = 6,
    exaggerate: bool = True
):
    """
    Escalates *non-rate* parameters until VA thresholds are met (rate stays clamped).

    Iteratively re-applies apply_emotion (with mildly growing strength) plus
    extra EQ/compression/saturation post-tweaks, scoring each candidate with
    score_emotion, until the preset's valence/arousal targets are met at
    sufficient confidence, `max_iters` is reached, or `latency_budget_s`
    (wall-clock) expires.

    Returns:
        (best_wav, meta) where meta carries the final valence/arousal/label/
        confidence, the resolved preset, strength, iteration count, and the
        exaggerate flag.
    """
    t0 = time.time()
    p = (target_preset or "neutral").lower()
    if p not in _BASE_PRESETS:
        logger.warning(f"[EmotionAuto] Unknown preset '{target_preset}', defaulting to neutral.")
        p = "neutral"
    def _ok(v, a):
        # Preset-specific valence/arousal acceptance; neutral always passes.
        return _angry_ok(v,a) if p=="angry" else _happy_ok(v,a) if p=="happy" else _sad_ok(v,a) if p=="sad" else True
    best_y = wav
    best_sc = score_emotion(best_y, sr)
    # strong first pass
    cur_y = apply_emotion(best_y, sr, preset=p, strength=strength, lang=lang,
                          sentence_times=sentence_times, exaggerate=exaggerate)
    cur_sc = score_emotion(cur_y, sr)
    # Keep the processed version if it scores at least as confidently, or
    # already lands inside the target VA region.
    if cur_sc.confidence >= best_sc.confidence or _ok(cur_sc.valence, cur_sc.arousal):
        best_y, best_sc = cur_y, cur_sc
    it = 1
    # Cumulative escalation amounts (dB / drive / ratio), grown each pass.
    bite_boost = 0.0
    shelf_boost = 0.0
    drive_boost = 0.0
    comp_boost = 0.0
    shelf_cut_boost = 0.0 # for sad high cut
    while it < max_iters and (time.time() - t0) < latency_budget_s:
        it += 1
        v, a = best_sc.valence, best_sc.arousal
        if _ok(v,a) and best_sc.confidence >= min_confidence:
            break
        # Escalate WITHOUT touching rate
        if p == "angry":
            if a < 0.88: # more arousal → brighter + tighter
                shelf_boost += 1.5; comp_boost += 0.8
            if v > -0.35: # more negative valence → harsher bite + drive + low warmth cut
                bite_boost += 1.8; drive_boost += 0.10
        elif p == "happy":
            if a < 0.62: shelf_boost += 1.2
            if v < 0.35: bite_boost += 1.0; drive_boost += 0.05
        elif p == "sad":
            if a > 0.25: shelf_cut_boost += 1.5 # darker feel
            if v > -0.50: bite_boost -= 0.6 # soften presence
        # Re-run apply_emotion with slightly higher strength (still rate-clamped)
        local_strength = min(1.0, strength * (1.03 ** it))
        # NOTE(review): this re-processes best_y, which may itself already be
        # a processed candidate, so effects can stack across iterations —
        # confirm that cumulative processing is intended.
        y_try = apply_emotion(best_y, sr, preset=p, strength=local_strength, lang=lang,
                              sentence_times=sentence_times, exaggerate=True)
        # Macro post-tweaks (no rate)
        if p == "angry":
            if bite_boost > 0:
                y_try = _peaking_eq(y_try, sr, gain_db=+min(4.0, bite_boost), f0=2950.0, Q=0.95)
                y_try = _peaking_eq(y_try, sr, gain_db=+min(3.0, bite_boost*0.7), f0=4300.0, Q=1.0)
            if shelf_boost > 0:
                y_try = _high_shelf(y_try, sr, gain_db=+min(4.0, shelf_boost), cutoff=4000.0)
            if drive_boost > 0:
                y_try = _saturate(y_try, drive=min(0.25, drive_boost))
            if comp_boost > 0:
                y_try = _soft_compress(y_try, ratio=1.0 + min(3.0, comp_boost))
            y_try = _limiter(y_try, thr_db=-1.0)
        elif p == "happy":
            if bite_boost > 0:
                y_try = _peaking_eq(y_try, sr, gain_db=+min(3.0, bite_boost), f0=2400.0, Q=1.0)
            if shelf_boost > 0:
                y_try = _high_shelf(y_try, sr, gain_db=+min(3.0, shelf_boost), cutoff=4200.0)
            if drive_boost > 0:
                y_try = _saturate(y_try, drive=min(0.12, drive_boost))
            y_try = _limiter(y_try, thr_db=-1.0)
        elif p == "sad":
            if shelf_cut_boost > 0:
                y_try = _high_shelf(y_try, sr, gain_db=-min(4.0, shelf_cut_boost), cutoff=3600.0)
                y_try = _lp(y_try, sr, cutoff=6800.0, order=2)
            if bite_boost < 0:
                y_try = _peaking_eq(y_try, sr, gain_db=max(-2.0, bite_boost), f0=2000.0, Q=1.1)
            y_try = _limiter(y_try, thr_db=-1.0)
        sc_try = score_emotion(y_try, sr)
        # Accept a candidate on higher confidence, or if it newly satisfies
        # the VA target while the current best does not.
        better = (sc_try.confidence > best_sc.confidence) or (_ok(sc_try.valence, sc_try.arousal) and not _ok(best_sc.valence, best_sc.arousal))
        if better:
            best_y, best_sc = y_try, sc_try
    meta = {
        "final": dict(valence=best_sc.valence, arousal=best_sc.arousal,
                      label=best_sc.label, confidence=best_sc.confidence),
        "preset": p, "strength": strength, "iters": it,
        "exaggerate": exaggerate,
    }
    return best_y, meta