# ============================ # utils.py — Utility Functions # ============================ import os import shutil from fastapi import UploadFile from moviepy.editor import VideoFileClip from pydub import AudioSegment, effects import pandas as pd import numpy as np from collections import Counter import time from config import UPLOAD_FOLDER from models import pipelines, models, together import subprocess import librosa # from pythainlp.spell import correct_sent def save_uploaded_file(file: UploadFile) -> str: os.makedirs(UPLOAD_FOLDER, exist_ok=True) filepath = os.path.join(UPLOAD_FOLDER, file.filename) with open(filepath, "wb") as f: shutil.copyfileobj(file.file, f) return filepath def correct_text_with_tokenizer(text: str) -> str: tokens = word_tokenize(text, engine="newmm") corrected_tokens = [correct(word) for word in tokens] return ''.join(corrected_tokens) def extract_and_normalize_audio(file_path: str) -> str: ext = os.path.splitext(file_path)[1].lower() audio_path = os.path.join(UPLOAD_FOLDER, "extracted_audio.wav") if ext == ".mp4": clip = VideoFileClip(file_path) clip.audio.write_audiofile(audio_path) elif ext in [".mp3", ".wav"]: audio_path = file_path else: raise ValueError("รองรับเฉพาะไฟล์ mp4, mp3, wav เท่านั้น") audio = AudioSegment.from_file(audio_path) normalized_audio = effects.normalize(audio) cleaned_path = os.path.join(UPLOAD_FOLDER, "cleaned.wav") normalized_audio.export(cleaned_path, format="wav") return cleaned_path def split_segments(audio_path: str, df: pd.DataFrame, stretch_factor: float = 1.25) -> str: segment_folder = os.path.join(UPLOAD_FOLDER, "segments") # ล้างของเก่าแล้วสร้างใหม่ if os.path.exists(segment_folder): shutil.rmtree(segment_folder) os.makedirs(segment_folder, exist_ok=True) audio = AudioSegment.from_file(audio_path) for i, row in df.iterrows(): start_ms = int(row['start'] * 1000) end_ms = int(row['end'] * 1000) segment = audio[start_ms:end_ms] # Export temp segment temp_path = os.path.join(segment_folder, f"temp_{i:03d}.wav") segment.export(temp_path, format="wav") # Final output path (after stretch) output_path = os.path.join(segment_folder, f"segment_{i:03d}_{row['speaker']}.wav") # Stretch ด้วย ffmpeg (พูดช้าลงแต่ไม่เพี้ยน) subprocess.run([ "ffmpeg", "-y", "-i", temp_path, "-filter:a", f"atempo={1/stretch_factor:.3f}", output_path ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # ลบ temp os.remove(temp_path) return segment_folder def transcribe_segments(segment_folder: str) -> pd.DataFrame: files = sorted([f for f in os.listdir(segment_folder) if f.endswith(".wav")]) model = models[0] results = [] for filename in files: segment_path = os.path.join(segment_folder, filename) try: segments, _ = model.transcribe( segment_path, language="th", beam_size=5, vad_filter=True, word_timestamps=True ) # ดึงคำทั้งหมดจากทุก segment words = [word for seg in segments if hasattr(seg, "words") for word in seg.words] if words: full_text = ''.join([w.word for w in words]) probs = [w.probability for w in words if w.probability is not None] avg_prob = round(np.mean(probs), 4) if probs else 0.0 results.append({ "filename": filename, "text": full_text, "avg_probability": avg_prob, }) else: results.append({ "filename": filename, "text": "", "avg_probability": 0.0, }) except Exception as e: print(f"❌ Error with {filename}: {e}") results.append({ "filename": filename, "text": "", "avg_probability": 0.0, "error": str(e) }) return pd.DataFrame(results) def clean_summary(text): import re if not text or len(str(text).strip()) == 0: return "ไม่มีข้อมูลสำคัญที่จะสรุป" text = str(text) patterns_to_remove = [ r'สรุป:\s*', r'สรุปการประชุม:\s*', r'บทสรุป:\s*', r'ข้อสรุป:\s*', r'\*\*Key Messages:\*\*|\*\*หัวข้อหลัก:\*\*', r'\*\*Action Items:\*\*|\*\*ประเด็นสำคัญ:\*\*', r'\*\*Summary:\*\*|\*\*สรุป:\*\*', r'^[-•]\s*Key Messages?:?\s*', r'^[-•]\s*Action Items?:?\s*', r'^[-•]\s*หัวข้อหลัก:?', r'^[-•]\s*ประเด็นสำคัญ:?', r'^[-•]\s*ข้อมูลน่าสนใจ:?', r'^[-•]\s*บทสรุป:?', r'\r\n|\r|\n', r'\t+', r'หมายเหตุ:.*?(?=\n|\r|$)', r'เนื่องจาก.*?(?=\n|\r|$)', r'ไม่มีข้อความ.*?(?=\n|\r|$)', r'ไม่มีประเด็น.*?(?=\n|\r|$)', r'ไม่มี Action Items.*?(?=\n|\r|$)', r'ไม่มีรายการ.*?(?=\n|\r|$)', r'ต้องการข้อมูลเพิ่มเติม.*?(?=\n|\r|$)', r'ต้องขอความชัดเจนเพิ่มเติม.*?(?=\n|\r|$)', r'\(ตัดประโยคที่ไม่เกี่ยวข้องหรือซ้ำซ้อนออก.*?\)', r'\(.*?เพื่อเน้นความชัดเจน.*?\)', r'ตามที่ได้กล่าวไว้.*?(?=\n|\r|$)', r'จากข้อความที่ให้มา.*?(?=\n|\r|$)', r'Based on the provided text.*?(?=\n|\r|$)', r'According to the text.*?(?=\n|\r|$)', r'\s+' ] cleaned_text = text for pattern in patterns_to_remove: if pattern == r'\s+': cleaned_text = re.sub(pattern, ' ', cleaned_text) else: cleaned_text = re.sub(pattern, '', cleaned_text, flags=re.IGNORECASE | re.MULTILINE | re.DOTALL) cleaned_text = re.sub(r'\*\*(.*?)\*\*', r'\1', cleaned_text) cleaned_text = re.sub(r'\*(.*?)\*', r'\1', cleaned_text) cleaned_text = re.sub(r'_{2,}(.*?)_{2,}', r'\1', cleaned_text) cleaned_text = re.sub(r'[.]{3,}', '...', cleaned_text) cleaned_text = re.sub(r'[!]{2,}', '!', cleaned_text) cleaned_text = re.sub(r'[?]{2,}', '?', cleaned_text) cleaned_text = re.sub(r'^[-•*]\s*', '', cleaned_text, flags=re.MULTILINE) cleaned_text = re.sub(r'^\d+\.\s*', '', cleaned_text, flags=re.MULTILINE) useless_phrases = [ 'ไม่มี', 'ไม่สามารถสรุปได้', 'ข้อความต้นฉบับไม่มีความหมาย', 'ไม่มีข้อมูลเพียงพอ', 'ไม่มีประเด็นสำคัญ', 'ไม่มี Action Items', 'ต้องขอความชัดเจนเพิ่มเติม', 'ไม่มีข้อมูลที่สำคัญ', 'ไม่สามารถระบุได้', 'ข้อมูลไม่ชัดเจน', 'ไม่มีเนื้อหาที่เกี่ยวข้อง', 'N/A', 'n/a', 'Not applicable', 'No content', 'No summary available' ] cleaned_text = cleaned_text.strip() if (len(cleaned_text) < 15 or any(phrase.lower() in cleaned_text.lower() for phrase in useless_phrases) or cleaned_text.lower() in [phrase.lower() for phrase in useless_phrases]): return "ไม่มีข้อมูลสำคัญที่จะสรุปมากพอ" cleaned_text = re.sub(r'\s+([.!?])', r'\1', cleaned_text) cleaned_text = re.sub(r'([.!?])\s*([A-Za-zก-๙])', r'\1 \2', cleaned_text) return cleaned_text def summarize_texts(texts, api_key, model="deepseek-ai/DeepSeek-V3", delay=0): import time def _is_quota_error(err_msg: str) -> bool: msg = str(err_msg).lower() keys = [ "insufficient_quota", "insufficient quota", "insufficient credits", "out of credits", "credit exhausted", "quota", "429", "rate limit", "too many requests", "token exhausted" ] return any(k in msg for k in keys) summaries = [] texts = [t if t is not None else "" for t in texts] for idx, text in enumerate(texts): prompt = f""" สรุปข้อความประชุมนี้เป็นภาษาไทยสั้น ๆ เน้นประเด็นสำคัญ (key messages) และ Action Items โดยตัดรายละเอียดที่ไม่สำคัญออก: ข้อความ: {text} สรุป: - Key Messages: - Action Items: """.strip() try: response = together.chat.completions.create( model=model, messages=[ {"role": "system", "content": "คุณเป็นผู้เชี่ยวชาญในการสรุปเนื้อหา ตอบเป็นภาษาไทยเสมอ เน้นหัวข้อหลักและข้อมูลสำคัญ"}, {"role": "user", "content": prompt} ], max_tokens=1024, temperature=0.7, ) summary = (response.choices[0].message.content or "").strip() summary = clean_summary(summary) # สมมติว่าฟังก์ชันนี้มีอยู่แล้วในโค้ดคุณ summaries.append(summary) except Exception as e: print(f"Error at index {idx}: {e}") if _is_quota_error(e): summaries.append(" - ") else: summaries.append("ไม่สามารถสรุปได้") if idx < len(texts) - 1: time.sleep(delay) return summaries def add_corrected_text_column(df): # แก้ประโยคแต่ละบรรทัด แล้วเพิ่มคอลัมน์ใหม่ # df["nlp_correct_text"] = df["text"].apply(lambda text: correct_sent(text) if isinstance(text, str) else "") return df def add_llm_spell_corrected_text_column(df, model="google/gemma-3-27b-it", delay=1.5): import time def _is_quota_error(err_msg: str) -> bool: msg = err_msg.lower() # ครอบให้กว้าง: โควต้า/เครดิตหมด, โดน rate limit, token exhausted ฯลฯ keys = [ "insufficient_quota", "insufficient quota", "insufficient credits", "out of credits", "credit exhausted", "quota", "429", "rate limit", "too many requests", "token exhausted" ] return any(k in msg for k in keys) texts = df["text"].fillna("").astype(str).tolist() corrected = [] for idx, text in enumerate(texts): prompt = f""" กรุณาแก้ไขข้อความต่อไปนี้ให้ถูกต้องตามหลักภาษาไทย: - แก้ไขคำสะกดผิด คำพิมพ์ผิด หรือคำที่ไม่ถูกต้องและการผันวรรณยุกต์ผิด - ห้ามเปลี่ยนความหมาย - ห้ามตอบเกิน - **ตอบกลับเฉพาะข้อความที่แก้แล้ว** {text} """.strip() try: response = together.chat.completions.create( model=model, messages=[ { "role": "system", "content": """คุณคือนักภาษาศาสตร์ผู้เชี่ยวชาญด้านการตรวจสอบคำสะกดผิด คำพิมพ์ผิด และการผันวรรณยุกต์ผิดของภาษาไทย หน้าที่ของคุณคือแก้ไขคำผิดในข้อความที่ได้รับให้ถูกต้องตามมาตรฐานภาษาไทย โดยไม่เปลี่ยนความหมายเดิม หน้าที่ของคุณ: - แก้ไขข้อความภาษาไทยให้ถูกต้องตามหลักภาษาไทยมาตรฐาน - ตรวจสอบคำสะกดผิด คำพิมพ์ผิด และการผันวรรณยุกต์ผิด - แก้คำเพี้ยน คำที่มาจากเสียงพูด เช่น ภาษาวัยรุ่นหรือคำพูดที่ออกเสียงคล้ายกัน ให้เป็นคำที่ถูกต้อง - รักษาความหมายเดิมของข้อความให้มากที่สุด - ห้ามแปลความใหม่ ห้ามตีความเกิน ห้ามปรับสำนวน - ห้ามอธิบาย หรือใส่คำพูดใด ๆ เพิ่มเติมก่อนหรือหลังข้อความ - **ให้ตอบกลับเฉพาะข้อความที่แก้ไขแล้วเท่านั้น** ตัวอย่าง: ผู้ใช้: ผมไช้คอมพิวเตอรทุกวัน คุณ: ผมใช้คอมพิวเตอร์ทุกวัน ผู้ใช้: ปวดหัวจะตายุ่ละ คุณ: ปวดหัวจะตายอยู่ละ ผู้ใช้: ไอ้เส้นหลั่งกุ้ง คุณ: ไอ้เส้นหลังกุ้ง ผู้ใช้: เซโยโมมันน่ากลัว คุณ: เชื้อโรคมันน่ากลัว จงตอบกลับเฉพาะข้อความที่แก้ไขแล้วตามตัวอย่างข้างต้นเท่านั้น """ }, {"role": "user", "content": prompt} ], max_tokens=256, temperature=0.3, ) corrected_text = (response.choices[0].message.content or "").strip() corrected.append(corrected_text) except Exception as e: err = str(e) print(f"❌ Error at index {idx}: {err}") if _is_quota_error(err): corrected.append(" - ") else: corrected.append("") if idx < len(texts) - 1: time.sleep(delay) df["llm_corrected_text"] = corrected return df # def _merge_intervals(intervals, gap=0.0): # if not intervals: # return [] # intervals = sorted(intervals, key=lambda x: x[0]) # merged = [list(intervals[0])] # for s, e in intervals[1:]: # if s <= merged[-1][1] + gap: # merged[-1][1] = max(merged[-1][1], e) # else: # merged.append([s, e]) # return [(float(a), float(b)) for a, b in merged] # def _interval_intersection(a, b): # s = max(a[0], b[0]); e = min(a[1], b[1]) # return (s, e) if e > s else None # def detect_overlap_timeline(audio_path: str): # """ # คืนรายการช่วงเวลาที่มีการพูดซ้อน [(start, end), ...] # ถ้าโหลดโมเดลไม่ได้ → คืน [] # """ # if overlap_pipeline is None: # return [] # try: # ov = overlap_pipeline(audio_path) # pyannote Annotation # intervals = [(float(seg.start), float(seg.end)) for seg in ov.get_timeline()] # return _merge_intervals(intervals) # except Exception as e: # print(f"⚠️ Overlap detection failed: {e}") # return [] def _confidence_metrics(audio_seg, sr): try: rms = librosa.feature.rms(y=audio_seg)[0] snr_est = float(np.mean(rms) / (np.std(rms) + 1e-9)) zcr = float(np.mean(librosa.feature.zero_crossing_rate(audio_seg)[0])) dur = len(audio_seg) / sr # normalize แบบง่าย snr_score = min(snr_est / 10.0, 1.0) zcr_score = 1.0 if 0.05 <= zcr <= 0.15 else 0.5 dur_score = min(dur / 5.0, 1.0) conf = 0.5 * snr_score + 0.2 * zcr_score + 0.3 * dur_score return max(0.0, min(1.0, conf)) except Exception: return 0.5 def add_confidence_to_segments(audio_path: str, sr: int, segments: list): """ เติม key 'confidence' ให้แต่ละ segment (in-place) segment: {'start','end','speaker','duration', ...} """ audio, _sr = librosa.load(audio_path, sr=sr) for seg in segments: s = int(seg["start"] * sr); e = int(seg["end"] * sr) piece = audio[s:e] if 0 <= s < e <= len(audio) else np.array([]) conf = _confidence_metrics(piece, sr) if piece.size > 0 else 0.5 seg["confidence"] = float(conf) return segments def tag_segments_use_or_remove(segments: list, min_segment_duration=3.0, min_speaker_total=5.0): # รวมเวลาแต่ละ speaker tot = {} for seg in segments: sp = seg["speaker"] tot.setdefault(sp, 0.0) tot[sp] += float(seg["duration"]) valid_speakers = {sp for sp, t in tot.items() if t >= float(min_speaker_total)} kept, removed = [], [] for seg in segments: reasons = [] if seg["speaker"] not in valid_speakers: reasons.append(f"speaker_total_duration<{min_speaker_total}s") if float(seg["duration"]) < float(min_segment_duration): reasons.append(f"segment_duration<{min_segment_duration}s") if reasons: seg2 = dict(seg) seg2["tag"] = "remove" seg2["remove_reason"] = ";".join(reasons) removed.append(seg2) else: seg2 = dict(seg) seg2["tag"] = "use" seg2["remove_reason"] = "" kept.append(seg2) return kept, removed, sorted(list(valid_speakers)) # def enrich_with_overlap(segments: list, overlap_timeline: list): # """ # เติม: has_overlap, overlap_intervals, overlap_ratio # """ # for seg in segments: # s, e = float(seg["start"]), float(seg["end"]) # overlaps = [] # total = 0.0 # for (os, oe) in overlap_timeline: # inter = _interval_intersection((s, e), (os, oe)) # if inter: # overlaps.append([round(inter[0], 3), round(inter[1], 3)]) # total += (inter[1] - inter[0]) # dur = max(1e-9, e - s) # seg["has_overlap"] = bool(overlaps) # seg["overlap_intervals"] = overlaps # seg["overlap_ratio"] = float(total / dur) # return segments def diarize_audio(audio_path: str) -> pd.DataFrame: sr = 16000 min_segment_duration = 3.0 min_speaker_total = 5.0 compute_confidence = True # 1) Diarization diar = pipelines[0](audio_path) segments = [] for turn, _, speaker in diar.itertracks(yield_label=True): segments.append({ "speaker": str(speaker), "start": float(turn.start), "end": float(turn.end), "duration": float(turn.end - turn.start), }) # 2) Confidence if compute_confidence: add_confidence_to_segments(audio_path, sr, segments) else: for s in segments: s["confidence"] = 0.5 # 3) Tagging kept, removed, _ = tag_segments_use_or_remove( segments, min_segment_duration=min_segment_duration, min_speaker_total=min_speaker_total ) # # 4) Overlap # ov_tl = detect_overlap_timeline(audio_path) # kept = enrich_with_overlap(kept, ov_tl) # removed = enrich_with_overlap(removed, ov_tl) # 5) Combine all_rows = kept + removed all_rows.sort(key=lambda r: r["start"]) df = pd.DataFrame(all_rows, columns=[ "speaker","start","end","duration","confidence", "tag","remove_reason" ]) return df