sivakorn-su
delete overlab
b0c0237
# ============================
# utils.py — Utility Functions
# ============================
import os
import shutil
from fastapi import UploadFile
from moviepy.editor import VideoFileClip
from pydub import AudioSegment, effects
import pandas as pd
import numpy as np
from collections import Counter
import time
from config import UPLOAD_FOLDER
from models import pipelines, models, together
import subprocess
import librosa
# from pythainlp.spell import correct_sent
def save_uploaded_file(file: UploadFile) -> str:
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
filepath = os.path.join(UPLOAD_FOLDER, file.filename)
with open(filepath, "wb") as f:
shutil.copyfileobj(file.file, f)
return filepath
def correct_text_with_tokenizer(text: str) -> str:
tokens = word_tokenize(text, engine="newmm")
corrected_tokens = [correct(word) for word in tokens]
return ''.join(corrected_tokens)
def extract_and_normalize_audio(file_path: str) -> str:
ext = os.path.splitext(file_path)[1].lower()
audio_path = os.path.join(UPLOAD_FOLDER, "extracted_audio.wav")
if ext == ".mp4":
clip = VideoFileClip(file_path)
clip.audio.write_audiofile(audio_path)
elif ext in [".mp3", ".wav"]:
audio_path = file_path
else:
raise ValueError("รองรับเฉพาะไฟล์ mp4, mp3, wav เท่านั้น")
audio = AudioSegment.from_file(audio_path)
normalized_audio = effects.normalize(audio)
cleaned_path = os.path.join(UPLOAD_FOLDER, "cleaned.wav")
normalized_audio.export(cleaned_path, format="wav")
return cleaned_path
def split_segments(audio_path: str, df: pd.DataFrame, stretch_factor: float = 1.25) -> str:
segment_folder = os.path.join(UPLOAD_FOLDER, "segments")
# ล้างของเก่าแล้วสร้างใหม่
if os.path.exists(segment_folder):
shutil.rmtree(segment_folder)
os.makedirs(segment_folder, exist_ok=True)
audio = AudioSegment.from_file(audio_path)
for i, row in df.iterrows():
start_ms = int(row['start'] * 1000)
end_ms = int(row['end'] * 1000)
segment = audio[start_ms:end_ms]
# Export temp segment
temp_path = os.path.join(segment_folder, f"temp_{i:03d}.wav")
segment.export(temp_path, format="wav")
# Final output path (after stretch)
output_path = os.path.join(segment_folder, f"segment_{i:03d}_{row['speaker']}.wav")
# Stretch ด้วย ffmpeg (พูดช้าลงแต่ไม่เพี้ยน)
subprocess.run([
"ffmpeg", "-y", "-i", temp_path,
"-filter:a", f"atempo={1/stretch_factor:.3f}",
output_path
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# ลบ temp
os.remove(temp_path)
return segment_folder
def transcribe_segments(segment_folder: str) -> pd.DataFrame:
files = sorted([f for f in os.listdir(segment_folder) if f.endswith(".wav")])
model = models[0]
results = []
for filename in files:
segment_path = os.path.join(segment_folder, filename)
try:
segments, _ = model.transcribe(
segment_path,
language="th",
beam_size=5,
vad_filter=True,
word_timestamps=True
)
# ดึงคำทั้งหมดจากทุก segment
words = [word for seg in segments if hasattr(seg, "words") for word in seg.words]
if words:
full_text = ''.join([w.word for w in words])
probs = [w.probability for w in words if w.probability is not None]
avg_prob = round(np.mean(probs), 4) if probs else 0.0
results.append({
"filename": filename,
"text": full_text,
"avg_probability": avg_prob,
})
else:
results.append({
"filename": filename,
"text": "",
"avg_probability": 0.0,
})
except Exception as e:
print(f"❌ Error with {filename}: {e}")
results.append({
"filename": filename,
"text": "",
"avg_probability": 0.0,
"error": str(e)
})
return pd.DataFrame(results)
def clean_summary(text):
import re
if not text or len(str(text).strip()) == 0:
return "ไม่มีข้อมูลสำคัญที่จะสรุป"
text = str(text)
patterns_to_remove = [
r'สรุป:\s*', r'สรุปการประชุม:\s*', r'บทสรุป:\s*', r'ข้อสรุป:\s*',
r'\*\*Key Messages:\*\*|\*\*หัวข้อหลัก:\*\*', r'\*\*Action Items:\*\*|\*\*ประเด็นสำคัญ:\*\*',
r'\*\*Summary:\*\*|\*\*สรุป:\*\*',
r'^[-•]\s*Key Messages?:?\s*', r'^[-•]\s*Action Items?:?\s*', r'^[-•]\s*หัวข้อหลัก:?',
r'^[-•]\s*ประเด็นสำคัญ:?', r'^[-•]\s*ข้อมูลน่าสนใจ:?', r'^[-•]\s*บทสรุป:?',
r'\r\n|\r|\n', r'\t+',
r'หมายเหตุ:.*?(?=\n|\r|$)', r'เนื่องจาก.*?(?=\n|\r|$)', r'ไม่มีข้อความ.*?(?=\n|\r|$)',
r'ไม่มีประเด็น.*?(?=\n|\r|$)', r'ไม่มี Action Items.*?(?=\n|\r|$)', r'ไม่มีรายการ.*?(?=\n|\r|$)',
r'ต้องการข้อมูลเพิ่มเติม.*?(?=\n|\r|$)', r'ต้องขอความชัดเจนเพิ่มเติม.*?(?=\n|\r|$)',
r'\(ตัดประโยคที่ไม่เกี่ยวข้องหรือซ้ำซ้อนออก.*?\)', r'\(.*?เพื่อเน้นความชัดเจน.*?\)',
r'ตามที่ได้กล่าวไว้.*?(?=\n|\r|$)', r'จากข้อความที่ให้มา.*?(?=\n|\r|$)',
r'Based on the provided text.*?(?=\n|\r|$)', r'According to the text.*?(?=\n|\r|$)',
r'\s+'
]
cleaned_text = text
for pattern in patterns_to_remove:
if pattern == r'\s+':
cleaned_text = re.sub(pattern, ' ', cleaned_text)
else:
cleaned_text = re.sub(pattern, '', cleaned_text, flags=re.IGNORECASE | re.MULTILINE | re.DOTALL)
cleaned_text = re.sub(r'\*\*(.*?)\*\*', r'\1', cleaned_text)
cleaned_text = re.sub(r'\*(.*?)\*', r'\1', cleaned_text)
cleaned_text = re.sub(r'_{2,}(.*?)_{2,}', r'\1', cleaned_text)
cleaned_text = re.sub(r'[.]{3,}', '...', cleaned_text)
cleaned_text = re.sub(r'[!]{2,}', '!', cleaned_text)
cleaned_text = re.sub(r'[?]{2,}', '?', cleaned_text)
cleaned_text = re.sub(r'^[-•*]\s*', '', cleaned_text, flags=re.MULTILINE)
cleaned_text = re.sub(r'^\d+\.\s*', '', cleaned_text, flags=re.MULTILINE)
useless_phrases = [
'ไม่มี', 'ไม่สามารถสรุปได้', 'ข้อความต้นฉบับไม่มีความหมาย', 'ไม่มีข้อมูลเพียงพอ',
'ไม่มีประเด็นสำคัญ', 'ไม่มี Action Items', 'ต้องขอความชัดเจนเพิ่มเติม',
'ไม่มีข้อมูลที่สำคัญ', 'ไม่สามารถระบุได้', 'ข้อมูลไม่ชัดเจน', 'ไม่มีเนื้อหาที่เกี่ยวข้อง',
'N/A', 'n/a', 'Not applicable', 'No content', 'No summary available'
]
cleaned_text = cleaned_text.strip()
if (len(cleaned_text) < 15 or
any(phrase.lower() in cleaned_text.lower() for phrase in useless_phrases) or
cleaned_text.lower() in [phrase.lower() for phrase in useless_phrases]):
return "ไม่มีข้อมูลสำคัญที่จะสรุปมากพอ"
cleaned_text = re.sub(r'\s+([.!?])', r'\1', cleaned_text)
cleaned_text = re.sub(r'([.!?])\s*([A-Za-zก-๙])', r'\1 \2', cleaned_text)
return cleaned_text
def summarize_texts(texts, api_key, model="deepseek-ai/DeepSeek-V3", delay=0):
import time
def _is_quota_error(err_msg: str) -> bool:
msg = str(err_msg).lower()
keys = [
"insufficient_quota", "insufficient quota", "insufficient credits",
"out of credits", "credit exhausted", "quota", "429",
"rate limit", "too many requests", "token exhausted"
]
return any(k in msg for k in keys)
summaries = []
texts = [t if t is not None else "" for t in texts]
for idx, text in enumerate(texts):
prompt = f"""
สรุปข้อความประชุมนี้เป็นภาษาไทยสั้น ๆ เน้นประเด็นสำคัญ (key messages) และ Action Items โดยตัดรายละเอียดที่ไม่สำคัญออก:
ข้อความ:
{text}
สรุป:
- Key Messages:
- Action Items:
""".strip()
try:
response = together.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "คุณเป็นผู้เชี่ยวชาญในการสรุปเนื้อหา ตอบเป็นภาษาไทยเสมอ เน้นหัวข้อหลักและข้อมูลสำคัญ"},
{"role": "user", "content": prompt}
],
max_tokens=1024,
temperature=0.7,
)
summary = (response.choices[0].message.content or "").strip()
summary = clean_summary(summary) # สมมติว่าฟังก์ชันนี้มีอยู่แล้วในโค้ดคุณ
summaries.append(summary)
except Exception as e:
print(f"Error at index {idx}: {e}")
if _is_quota_error(e):
summaries.append(" - ")
else:
summaries.append("ไม่สามารถสรุปได้")
if idx < len(texts) - 1:
time.sleep(delay)
return summaries
def add_corrected_text_column(df):
# แก้ประโยคแต่ละบรรทัด แล้วเพิ่มคอลัมน์ใหม่
# df["nlp_correct_text"] = df["text"].apply(lambda text: correct_sent(text) if isinstance(text, str) else "")
return df
def add_llm_spell_corrected_text_column(df, model="google/gemma-3-27b-it", delay=1.5):
import time
def _is_quota_error(err_msg: str) -> bool:
msg = err_msg.lower()
# ครอบให้กว้าง: โควต้า/เครดิตหมด, โดน rate limit, token exhausted ฯลฯ
keys = [
"insufficient_quota", "insufficient quota", "insufficient credits",
"out of credits", "credit exhausted", "quota", "429",
"rate limit", "too many requests", "token exhausted"
]
return any(k in msg for k in keys)
texts = df["text"].fillna("").astype(str).tolist()
corrected = []
for idx, text in enumerate(texts):
prompt = f"""
กรุณาแก้ไขข้อความต่อไปนี้ให้ถูกต้องตามหลักภาษาไทย:
- แก้ไขคำสะกดผิด คำพิมพ์ผิด หรือคำที่ไม่ถูกต้องและการผันวรรณยุกต์ผิด
- ห้ามเปลี่ยนความหมาย
- ห้ามตอบเกิน
- **ตอบกลับเฉพาะข้อความที่แก้แล้ว**
{text}
""".strip()
try:
response = together.chat.completions.create(
model=model,
messages=[
{
"role": "system",
"content": """คุณคือนักภาษาศาสตร์ผู้เชี่ยวชาญด้านการตรวจสอบคำสะกดผิด คำพิมพ์ผิด และการผันวรรณยุกต์ผิดของภาษาไทย
หน้าที่ของคุณคือแก้ไขคำผิดในข้อความที่ได้รับให้ถูกต้องตามมาตรฐานภาษาไทย โดยไม่เปลี่ยนความหมายเดิม
หน้าที่ของคุณ:
- แก้ไขข้อความภาษาไทยให้ถูกต้องตามหลักภาษาไทยมาตรฐาน
- ตรวจสอบคำสะกดผิด คำพิมพ์ผิด และการผันวรรณยุกต์ผิด
- แก้คำเพี้ยน คำที่มาจากเสียงพูด เช่น ภาษาวัยรุ่นหรือคำพูดที่ออกเสียงคล้ายกัน ให้เป็นคำที่ถูกต้อง
- รักษาความหมายเดิมของข้อความให้มากที่สุด
- ห้ามแปลความใหม่ ห้ามตีความเกิน ห้ามปรับสำนวน
- ห้ามอธิบาย หรือใส่คำพูดใด ๆ เพิ่มเติมก่อนหรือหลังข้อความ
- **ให้ตอบกลับเฉพาะข้อความที่แก้ไขแล้วเท่านั้น**
ตัวอย่าง:
ผู้ใช้: ผมไช้คอมพิวเตอรทุกวัน
คุณ: ผมใช้คอมพิวเตอร์ทุกวัน
ผู้ใช้: ปวดหัวจะตายุ่ละ
คุณ: ปวดหัวจะตายอยู่ละ
ผู้ใช้: ไอ้เส้นหลั่งกุ้ง
คุณ: ไอ้เส้นหลังกุ้ง
ผู้ใช้: เซโยโมมันน่ากลัว
คุณ: เชื้อโรคมันน่ากลัว
จงตอบกลับเฉพาะข้อความที่แก้ไขแล้วตามตัวอย่างข้างต้นเท่านั้น
"""
},
{"role": "user", "content": prompt}
],
max_tokens=256,
temperature=0.3,
)
corrected_text = (response.choices[0].message.content or "").strip()
corrected.append(corrected_text)
except Exception as e:
err = str(e)
print(f"❌ Error at index {idx}: {err}")
if _is_quota_error(err):
corrected.append(" - ")
else:
corrected.append("")
if idx < len(texts) - 1:
time.sleep(delay)
df["llm_corrected_text"] = corrected
return df
# def _merge_intervals(intervals, gap=0.0):
# if not intervals:
# return []
# intervals = sorted(intervals, key=lambda x: x[0])
# merged = [list(intervals[0])]
# for s, e in intervals[1:]:
# if s <= merged[-1][1] + gap:
# merged[-1][1] = max(merged[-1][1], e)
# else:
# merged.append([s, e])
# return [(float(a), float(b)) for a, b in merged]
# def _interval_intersection(a, b):
# s = max(a[0], b[0]); e = min(a[1], b[1])
# return (s, e) if e > s else None
# def detect_overlap_timeline(audio_path: str):
# """
# คืนรายการช่วงเวลาที่มีการพูดซ้อน [(start, end), ...]
# ถ้าโหลดโมเดลไม่ได้ → คืน []
# """
# if overlap_pipeline is None:
# return []
# try:
# ov = overlap_pipeline(audio_path) # pyannote Annotation
# intervals = [(float(seg.start), float(seg.end)) for seg in ov.get_timeline()]
# return _merge_intervals(intervals)
# except Exception as e:
# print(f"⚠️ Overlap detection failed: {e}")
# return []
def _confidence_metrics(audio_seg, sr):
try:
rms = librosa.feature.rms(y=audio_seg)[0]
snr_est = float(np.mean(rms) / (np.std(rms) + 1e-9))
zcr = float(np.mean(librosa.feature.zero_crossing_rate(audio_seg)[0]))
dur = len(audio_seg) / sr
# normalize แบบง่าย
snr_score = min(snr_est / 10.0, 1.0)
zcr_score = 1.0 if 0.05 <= zcr <= 0.15 else 0.5
dur_score = min(dur / 5.0, 1.0)
conf = 0.5 * snr_score + 0.2 * zcr_score + 0.3 * dur_score
return max(0.0, min(1.0, conf))
except Exception:
return 0.5
def add_confidence_to_segments(audio_path: str, sr: int, segments: list):
"""
เติม key 'confidence' ให้แต่ละ segment (in-place)
segment: {'start','end','speaker','duration', ...}
"""
audio, _sr = librosa.load(audio_path, sr=sr)
for seg in segments:
s = int(seg["start"] * sr); e = int(seg["end"] * sr)
piece = audio[s:e] if 0 <= s < e <= len(audio) else np.array([])
conf = _confidence_metrics(piece, sr) if piece.size > 0 else 0.5
seg["confidence"] = float(conf)
return segments
def tag_segments_use_or_remove(segments: list, min_segment_duration=3.0, min_speaker_total=5.0):
# รวมเวลาแต่ละ speaker
tot = {}
for seg in segments:
sp = seg["speaker"]
tot.setdefault(sp, 0.0)
tot[sp] += float(seg["duration"])
valid_speakers = {sp for sp, t in tot.items() if t >= float(min_speaker_total)}
kept, removed = [], []
for seg in segments:
reasons = []
if seg["speaker"] not in valid_speakers:
reasons.append(f"speaker_total_duration<{min_speaker_total}s")
if float(seg["duration"]) < float(min_segment_duration):
reasons.append(f"segment_duration<{min_segment_duration}s")
if reasons:
seg2 = dict(seg)
seg2["tag"] = "remove"
seg2["remove_reason"] = ";".join(reasons)
removed.append(seg2)
else:
seg2 = dict(seg)
seg2["tag"] = "use"
seg2["remove_reason"] = ""
kept.append(seg2)
return kept, removed, sorted(list(valid_speakers))
# def enrich_with_overlap(segments: list, overlap_timeline: list):
# """
# เติม: has_overlap, overlap_intervals, overlap_ratio
# """
# for seg in segments:
# s, e = float(seg["start"]), float(seg["end"])
# overlaps = []
# total = 0.0
# for (os, oe) in overlap_timeline:
# inter = _interval_intersection((s, e), (os, oe))
# if inter:
# overlaps.append([round(inter[0], 3), round(inter[1], 3)])
# total += (inter[1] - inter[0])
# dur = max(1e-9, e - s)
# seg["has_overlap"] = bool(overlaps)
# seg["overlap_intervals"] = overlaps
# seg["overlap_ratio"] = float(total / dur)
# return segments
def diarize_audio(audio_path: str) -> pd.DataFrame:
sr = 16000
min_segment_duration = 3.0
min_speaker_total = 5.0
compute_confidence = True
# 1) Diarization
diar = pipelines[0](audio_path)
segments = []
for turn, _, speaker in diar.itertracks(yield_label=True):
segments.append({
"speaker": str(speaker),
"start": float(turn.start),
"end": float(turn.end),
"duration": float(turn.end - turn.start),
})
# 2) Confidence
if compute_confidence:
add_confidence_to_segments(audio_path, sr, segments)
else:
for s in segments:
s["confidence"] = 0.5
# 3) Tagging
kept, removed, _ = tag_segments_use_or_remove(
segments,
min_segment_duration=min_segment_duration,
min_speaker_total=min_speaker_total
)
# # 4) Overlap
# ov_tl = detect_overlap_timeline(audio_path)
# kept = enrich_with_overlap(kept, ov_tl)
# removed = enrich_with_overlap(removed, ov_tl)
# 5) Combine
all_rows = kept + removed
all_rows.sort(key=lambda r: r["start"])
df = pd.DataFrame(all_rows, columns=[
"speaker","start","end","duration","confidence",
"tag","remove_reason"
])
return df