# ============================
# utils.py — Utility Functions
# ============================
import os
import shutil
from fastapi import UploadFile
from moviepy.editor import VideoFileClip
from pydub import AudioSegment, effects
import pandas as pd
import numpy as np
from collections import Counter
import time
from config import UPLOAD_FOLDER
from models import pipelines, models, together
import subprocess
import librosa
# from pythainlp.spell import correct_sent


def save_uploaded_file(file: UploadFile) -> str:
    os.makedirs(UPLOAD_FOLDER, exist_ok=True)
    filepath = os.path.join(UPLOAD_FOLDER, file.filename)
    with open(filepath, "wb") as f:
        shutil.copyfileobj(file.file, f)
    return filepath


def correct_text_with_tokenizer(text: str) -> str:
    # Imported lazily so the module still loads when pythainlp is not installed.
    from pythainlp.tokenize import word_tokenize
    from pythainlp.spell import correct
    tokens = word_tokenize(text, engine="newmm")
    corrected_tokens = [correct(word) for word in tokens]
    return ''.join(corrected_tokens)
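

# Illustrative note (assumption: pythainlp is available in the environment): correct() fixes
# one token at a time, so the rejoined string can differ from the correct_sent() helper
# referenced in the commented-out import at the top of this module.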


def extract_and_normalize_audio(file_path: str) -> str:
    ext = os.path.splitext(file_path)[1].lower()
    audio_path = os.path.join(UPLOAD_FOLDER, "extracted_audio.wav")
    if ext == ".mp4":
        clip = VideoFileClip(file_path)
        clip.audio.write_audiofile(audio_path)
    elif ext in [".mp3", ".wav"]:
        audio_path = file_path
    else:
        raise ValueError("รองรับเฉพาะไฟล์ mp4, mp3, wav เท่านั้น")
    audio = AudioSegment.from_file(audio_path)
    normalized_audio = effects.normalize(audio)
    cleaned_path = os.path.join(UPLOAD_FOLDER, "cleaned.wav")
    normalized_audio.export(cleaned_path, format="wav")
    return cleaned_path
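

# Note (library behavior as I understand it, not app-specific logic): pydub's
# effects.normalize() performs peak normalization — it scales the audio so the loudest sample
# sits just below full scale (0.1 dB of headroom by default). It does not equalize perceived
# loudness across recordings.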


def split_segments(audio_path: str, df: pd.DataFrame, stretch_factor: float = 1.25) -> str:
    segment_folder = os.path.join(UPLOAD_FOLDER, "segments")
    # Clear out any previous segments, then recreate the folder
    if os.path.exists(segment_folder):
        shutil.rmtree(segment_folder)
    os.makedirs(segment_folder, exist_ok=True)
    audio = AudioSegment.from_file(audio_path)
    for i, row in df.iterrows():
        start_ms = int(row['start'] * 1000)
        end_ms = int(row['end'] * 1000)
        segment = audio[start_ms:end_ms]
        # Export a temporary segment
        temp_path = os.path.join(segment_folder, f"temp_{i:03d}.wav")
        segment.export(temp_path, format="wav")
        # Final output path (after stretching)
        output_path = os.path.join(segment_folder, f"segment_{i:03d}_{row['speaker']}.wav")
        # Stretch with ffmpeg (slower speech without pitch distortion)
        subprocess.run([
            "ffmpeg", "-y", "-i", temp_path,
            "-filter:a", f"atempo={1/stretch_factor:.3f}",
            output_path
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        # Remove the temporary file
        os.remove(temp_path)
    return segment_folder
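

# Note on the stretch factor (a sketch of the math, not extra behavior): ffmpeg's atempo
# filter changes playback speed without shifting pitch, so with the default
# stretch_factor=1.25 the command above passes atempo = 1/1.25 ≈ 0.800 and each exported
# segment plays back roughly 25% slower/longer than the original turn.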


def transcribe_segments(segment_folder: str) -> pd.DataFrame:
    files = sorted([f for f in os.listdir(segment_folder) if f.endswith(".wav")])
    model = models[0]
    results = []
    for filename in files:
        segment_path = os.path.join(segment_folder, filename)
        try:
            segments, _ = model.transcribe(
                segment_path,
                language="th",
                beam_size=5,
                vad_filter=True,
                word_timestamps=True
            )
            # Collect every word from all returned segments
            words = [word for seg in segments if hasattr(seg, "words") for word in seg.words]
            if words:
                full_text = ''.join([w.word for w in words])
                probs = [w.probability for w in words if w.probability is not None]
                avg_prob = round(np.mean(probs), 4) if probs else 0.0
                results.append({
                    "filename": filename,
                    "text": full_text,
                    "avg_probability": avg_prob,
                })
            else:
                results.append({
                    "filename": filename,
                    "text": "",
                    "avg_probability": 0.0,
                })
        except Exception as e:
            print(f"❌ Error with {filename}: {e}")
            results.append({
                "filename": filename,
                "text": "",
                "avg_probability": 0.0,
                "error": str(e)
            })
    return pd.DataFrame(results)
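

# Illustrative sketch (assumption, not called by the app): split_segments() names files
# "segment_{i:03d}_{speaker}.wav" with i taken from the diarization DataFrame's index, so if
# that index is 0..n-1 the transcripts can be joined back onto the diarization rows by
# parsing the index out of the filename.
def _example_attach_transcripts(diar_df: pd.DataFrame, trans_df: pd.DataFrame) -> pd.DataFrame:
    trans_df = trans_df.copy()
    trans_df["row"] = trans_df["filename"].str.extract(r"segment_(\d{3})_")[0].astype(int)
    return diar_df.reset_index(drop=True).join(
        trans_df.set_index("row")[["text", "avg_probability"]]
    )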


def clean_summary(text):
    import re
    if not text or len(str(text).strip()) == 0:
        return "ไม่มีข้อมูลสำคัญที่จะสรุป"
    text = str(text)
    patterns_to_remove = [
        r'สรุป:\s*', r'สรุปการประชุม:\s*', r'บทสรุป:\s*', r'ข้อสรุป:\s*',
        r'\*\*Key Messages:\*\*|\*\*หัวข้อหลัก:\*\*', r'\*\*Action Items:\*\*|\*\*ประเด็นสำคัญ:\*\*',
        r'\*\*Summary:\*\*|\*\*สรุป:\*\*',
        r'^[-•]\s*Key Messages?:?\s*', r'^[-•]\s*Action Items?:?\s*', r'^[-•]\s*หัวข้อหลัก:?',
        r'^[-•]\s*ประเด็นสำคัญ:?', r'^[-•]\s*ข้อมูลน่าสนใจ:?', r'^[-•]\s*บทสรุป:?',
        r'\r\n|\r|\n', r'\t+',
        r'หมายเหตุ:.*?(?=\n|\r|$)', r'เนื่องจาก.*?(?=\n|\r|$)', r'ไม่มีข้อความ.*?(?=\n|\r|$)',
        r'ไม่มีประเด็น.*?(?=\n|\r|$)', r'ไม่มี Action Items.*?(?=\n|\r|$)', r'ไม่มีรายการ.*?(?=\n|\r|$)',
        r'ต้องการข้อมูลเพิ่มเติม.*?(?=\n|\r|$)', r'ต้องขอความชัดเจนเพิ่มเติม.*?(?=\n|\r|$)',
        r'\(ตัดประโยคที่ไม่เกี่ยวข้องหรือซ้ำซ้อนออก.*?\)', r'\(.*?เพื่อเน้นความชัดเจน.*?\)',
        r'ตามที่ได้กล่าวไว้.*?(?=\n|\r|$)', r'จากข้อความที่ให้มา.*?(?=\n|\r|$)',
        r'Based on the provided text.*?(?=\n|\r|$)', r'According to the text.*?(?=\n|\r|$)',
        r'\s+'
    ]
    cleaned_text = text
    for pattern in patterns_to_remove:
        if pattern == r'\s+':
            cleaned_text = re.sub(pattern, ' ', cleaned_text)
        else:
            cleaned_text = re.sub(pattern, '', cleaned_text, flags=re.IGNORECASE | re.MULTILINE | re.DOTALL)
    cleaned_text = re.sub(r'\*\*(.*?)\*\*', r'\1', cleaned_text)
    cleaned_text = re.sub(r'\*(.*?)\*', r'\1', cleaned_text)
    cleaned_text = re.sub(r'_{2,}(.*?)_{2,}', r'\1', cleaned_text)
    cleaned_text = re.sub(r'[.]{3,}', '...', cleaned_text)
    cleaned_text = re.sub(r'[!]{2,}', '!', cleaned_text)
    cleaned_text = re.sub(r'[?]{2,}', '?', cleaned_text)
    cleaned_text = re.sub(r'^[-•*]\s*', '', cleaned_text, flags=re.MULTILINE)
    cleaned_text = re.sub(r'^\d+\.\s*', '', cleaned_text, flags=re.MULTILINE)
    useless_phrases = [
        'ไม่มี', 'ไม่สามารถสรุปได้', 'ข้อความต้นฉบับไม่มีความหมาย', 'ไม่มีข้อมูลเพียงพอ',
        'ไม่มีประเด็นสำคัญ', 'ไม่มี Action Items', 'ต้องขอความชัดเจนเพิ่มเติม',
        'ไม่มีข้อมูลที่สำคัญ', 'ไม่สามารถระบุได้', 'ข้อมูลไม่ชัดเจน', 'ไม่มีเนื้อหาที่เกี่ยวข้อง',
        'N/A', 'n/a', 'Not applicable', 'No content', 'No summary available'
    ]
    cleaned_text = cleaned_text.strip()
    if (len(cleaned_text) < 15 or
            any(phrase.lower() in cleaned_text.lower() for phrase in useless_phrases) or
            cleaned_text.lower() in [phrase.lower() for phrase in useless_phrases]):
        return "ไม่มีข้อมูลสำคัญที่จะสรุปมากพอ"
    cleaned_text = re.sub(r'\s+([.!?])', r'\1', cleaned_text)
    cleaned_text = re.sub(r'([.!?])\s*([A-Za-zก-๙])', r'\1 \2', cleaned_text)
    return cleaned_text


def summarize_texts(texts, api_key, model="deepseek-ai/DeepSeek-V3", delay=0):
    # Note: api_key is currently unused; requests go through the shared Together client
    # imported from models.py.
    import time

    def _is_quota_error(err_msg: str) -> bool:
        msg = str(err_msg).lower()
        keys = [
            "insufficient_quota", "insufficient quota", "insufficient credits",
            "out of credits", "credit exhausted", "quota", "429",
            "rate limit", "too many requests", "token exhausted"
        ]
        return any(k in msg for k in keys)

    summaries = []
    texts = [t if t is not None else "" for t in texts]
    for idx, text in enumerate(texts):
        prompt = f"""
สรุปข้อความประชุมนี้เป็นภาษาไทยสั้น ๆ เน้นประเด็นสำคัญ (key messages) และ Action Items โดยตัดรายละเอียดที่ไม่สำคัญออก:
ข้อความ:
{text}
สรุป:
- Key Messages:
- Action Items:
""".strip()
        try:
            response = together.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": "คุณเป็นผู้เชี่ยวชาญในการสรุปเนื้อหา ตอบเป็นภาษาไทยเสมอ เน้นหัวข้อหลักและข้อมูลสำคัญ"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=1024,
                temperature=0.7,
            )
            summary = (response.choices[0].message.content or "").strip()
            summary = clean_summary(summary)  # clean_summary() is defined above in this module
            summaries.append(summary)
        except Exception as e:
            print(f"Error at index {idx}: {e}")
            if _is_quota_error(e):
                summaries.append(" - ")
            else:
                summaries.append("ไม่สามารถสรุปได้")
        if idx < len(texts) - 1:
            time.sleep(delay)
    return summaries
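

# Illustrative usage sketch (assumption: the Together client in models.py is already
# configured with valid credentials; the api_key argument is not used by summarize_texts).
def _example_summarize(trans_df: pd.DataFrame) -> list:
    # Summarize each transcribed segment, pausing 1 second between API calls.
    return summarize_texts(trans_df["text"].fillna("").tolist(), api_key=None, delay=1.0)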


def add_corrected_text_column(df):
    # Correct each row's text, then add a new column (currently disabled)
    # df["nlp_correct_text"] = df["text"].apply(lambda text: correct_sent(text) if isinstance(text, str) else "")
    return df


def add_llm_spell_corrected_text_column(df, model="google/gemma-3-27b-it", delay=1.5):
    import time

    def _is_quota_error(err_msg: str) -> bool:
        msg = err_msg.lower()
        # Cast a wide net: quota/credits exhausted, rate limits, token exhausted, etc.
        keys = [
            "insufficient_quota", "insufficient quota", "insufficient credits",
            "out of credits", "credit exhausted", "quota", "429",
            "rate limit", "too many requests", "token exhausted"
        ]
        return any(k in msg for k in keys)

    texts = df["text"].fillna("").astype(str).tolist()
    corrected = []
    for idx, text in enumerate(texts):
        prompt = f"""
กรุณาแก้ไขข้อความต่อไปนี้ให้ถูกต้องตามหลักภาษาไทย:
- แก้ไขคำสะกดผิด คำพิมพ์ผิด หรือคำที่ไม่ถูกต้องและการผันวรรณยุกต์ผิด
- ห้ามเปลี่ยนความหมาย
- ห้ามตอบเกิน
- **ตอบกลับเฉพาะข้อความที่แก้แล้ว**
{text}
""".strip()
        try:
            response = together.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": """คุณคือนักภาษาศาสตร์ผู้เชี่ยวชาญด้านการตรวจสอบคำสะกดผิด คำพิมพ์ผิด และการผันวรรณยุกต์ผิดของภาษาไทย
หน้าที่ของคุณคือแก้ไขคำผิดในข้อความที่ได้รับให้ถูกต้องตามมาตรฐานภาษาไทย โดยไม่เปลี่ยนความหมายเดิม
หน้าที่ของคุณ:
- แก้ไขข้อความภาษาไทยให้ถูกต้องตามหลักภาษาไทยมาตรฐาน
- ตรวจสอบคำสะกดผิด คำพิมพ์ผิด และการผันวรรณยุกต์ผิด
- แก้คำเพี้ยน คำที่มาจากเสียงพูด เช่น ภาษาวัยรุ่นหรือคำพูดที่ออกเสียงคล้ายกัน ให้เป็นคำที่ถูกต้อง
- รักษาความหมายเดิมของข้อความให้มากที่สุด
- ห้ามแปลความใหม่ ห้ามตีความเกิน ห้ามปรับสำนวน
- ห้ามอธิบาย หรือใส่คำพูดใด ๆ เพิ่มเติมก่อนหรือหลังข้อความ
- **ให้ตอบกลับเฉพาะข้อความที่แก้ไขแล้วเท่านั้น**
ตัวอย่าง:
ผู้ใช้: ผมไช้คอมพิวเตอรทุกวัน
คุณ: ผมใช้คอมพิวเตอร์ทุกวัน
ผู้ใช้: ปวดหัวจะตายุ่ละ
คุณ: ปวดหัวจะตายอยู่ละ
ผู้ใช้: ไอ้เส้นหลั่งกุ้ง
คุณ: ไอ้เส้นหลังกุ้ง
ผู้ใช้: เซโยโมมันน่ากลัว
คุณ: เชื้อโรคมันน่ากลัว
จงตอบกลับเฉพาะข้อความที่แก้ไขแล้วตามตัวอย่างข้างต้นเท่านั้น
"""
                    },
                    {"role": "user", "content": prompt}
                ],
                max_tokens=256,
                temperature=0.3,
            )
            corrected_text = (response.choices[0].message.content or "").strip()
            corrected.append(corrected_text)
        except Exception as e:
            err = str(e)
            print(f"❌ Error at index {idx}: {err}")
            if _is_quota_error(err):
                corrected.append(" - ")
            else:
                corrected.append("")
        if idx < len(texts) - 1:
            time.sleep(delay)
    df["llm_corrected_text"] = corrected
    return df


# def _merge_intervals(intervals, gap=0.0):
#     if not intervals:
#         return []
#     intervals = sorted(intervals, key=lambda x: x[0])
#     merged = [list(intervals[0])]
#     for s, e in intervals[1:]:
#         if s <= merged[-1][1] + gap:
#             merged[-1][1] = max(merged[-1][1], e)
#         else:
#             merged.append([s, e])
#     return [(float(a), float(b)) for a, b in merged]


# def _interval_intersection(a, b):
#     s = max(a[0], b[0]); e = min(a[1], b[1])
#     return (s, e) if e > s else None


# def detect_overlap_timeline(audio_path: str):
#     """
#     Return the list of overlapping-speech intervals [(start, end), ...].
#     If the model cannot be loaded, return [].
#     """
#     if overlap_pipeline is None:
#         return []
#     try:
#         ov = overlap_pipeline(audio_path)  # pyannote Annotation
#         intervals = [(float(seg.start), float(seg.end)) for seg in ov.get_timeline()]
#         return _merge_intervals(intervals)
#     except Exception as e:
#         print(f"⚠️ Overlap detection failed: {e}")
#         return []


def _confidence_metrics(audio_seg, sr):
    try:
        rms = librosa.feature.rms(y=audio_seg)[0]
        snr_est = float(np.mean(rms) / (np.std(rms) + 1e-9))
        zcr = float(np.mean(librosa.feature.zero_crossing_rate(audio_seg)[0]))
        dur = len(audio_seg) / sr
        # Simple normalization of each metric to [0, 1]
        snr_score = min(snr_est / 10.0, 1.0)
        zcr_score = 1.0 if 0.05 <= zcr <= 0.15 else 0.5
        dur_score = min(dur / 5.0, 1.0)
        conf = 0.5 * snr_score + 0.2 * zcr_score + 0.3 * dur_score
        return max(0.0, min(1.0, conf))
    except Exception:
        return 0.5


def add_confidence_to_segments(audio_path: str, sr: int, segments: list):
    """
    Add a 'confidence' key to each segment (in-place).
    segment: {'start', 'end', 'speaker', 'duration', ...}
    """
    audio, _sr = librosa.load(audio_path, sr=sr)
    for seg in segments:
        s = int(seg["start"] * sr); e = int(seg["end"] * sr)
        piece = audio[s:e] if 0 <= s < e <= len(audio) else np.array([])
        conf = _confidence_metrics(piece, sr) if piece.size > 0 else 0.5
        seg["confidence"] = float(conf)
    return segments


def tag_segments_use_or_remove(segments: list, min_segment_duration=3.0, min_speaker_total=5.0):
    # Total up speaking time per speaker
    tot = {}
    for seg in segments:
        sp = seg["speaker"]
        tot.setdefault(sp, 0.0)
        tot[sp] += float(seg["duration"])
    valid_speakers = {sp for sp, t in tot.items() if t >= float(min_speaker_total)}
    kept, removed = [], []
    for seg in segments:
        reasons = []
        if seg["speaker"] not in valid_speakers:
            reasons.append(f"speaker_total_duration<{min_speaker_total}s")
        if float(seg["duration"]) < float(min_segment_duration):
            reasons.append(f"segment_duration<{min_segment_duration}s")
        if reasons:
            seg2 = dict(seg)
            seg2["tag"] = "remove"
            seg2["remove_reason"] = ";".join(reasons)
            removed.append(seg2)
        else:
            seg2 = dict(seg)
            seg2["tag"] = "use"
            seg2["remove_reason"] = ""
            kept.append(seg2)
    return kept, removed, sorted(list(valid_speakers))
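

# Illustrative sketch (not called by the app): with the defaults above, a 2 s turn is tagged
# "remove" (segment_duration<3.0s), and a speaker whose turns total under 5 s is dropped.
def _example_tagging():
    segs = [
        {"speaker": "SPEAKER_00", "start": 0.0, "end": 6.0, "duration": 6.0},
        {"speaker": "SPEAKER_01", "start": 6.0, "end": 8.0, "duration": 2.0},
    ]
    kept, removed, speakers = tag_segments_use_or_remove(segs)
    # kept     -> the 6 s SPEAKER_00 turn
    # removed  -> the 2 s SPEAKER_01 turn (too short, and SPEAKER_01 totals only 2 s)
    # speakers -> ["SPEAKER_00"]
    return kept, removed, speakers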


# def enrich_with_overlap(segments: list, overlap_timeline: list):
#     """
#     Adds: has_overlap, overlap_intervals, overlap_ratio
#     """
#     for seg in segments:
#         s, e = float(seg["start"]), float(seg["end"])
#         overlaps = []
#         total = 0.0
#         for (os, oe) in overlap_timeline:
#             inter = _interval_intersection((s, e), (os, oe))
#             if inter:
#                 overlaps.append([round(inter[0], 3), round(inter[1], 3)])
#                 total += (inter[1] - inter[0])
#         dur = max(1e-9, e - s)
#         seg["has_overlap"] = bool(overlaps)
#         seg["overlap_intervals"] = overlaps
#         seg["overlap_ratio"] = float(total / dur)
#     return segments


def diarize_audio(audio_path: str) -> pd.DataFrame:
    sr = 16000
    min_segment_duration = 3.0
    min_speaker_total = 5.0
    compute_confidence = True
    # 1) Diarization
    diar = pipelines[0](audio_path)
    segments = []
    for turn, _, speaker in diar.itertracks(yield_label=True):
        segments.append({
            "speaker": str(speaker),
            "start": float(turn.start),
            "end": float(turn.end),
            "duration": float(turn.end - turn.start),
        })
    # 2) Confidence
    if compute_confidence:
        add_confidence_to_segments(audio_path, sr, segments)
    else:
        for s in segments:
            s["confidence"] = 0.5
    # 3) Tagging
    kept, removed, _ = tag_segments_use_or_remove(
        segments,
        min_segment_duration=min_segment_duration,
        min_speaker_total=min_speaker_total
    )
    # # 4) Overlap
    # ov_tl = detect_overlap_timeline(audio_path)
    # kept = enrich_with_overlap(kept, ov_tl)
    # removed = enrich_with_overlap(removed, ov_tl)
    # 5) Combine
    all_rows = kept + removed
    all_rows.sort(key=lambda r: r["start"])
    df = pd.DataFrame(all_rows, columns=[
        "speaker", "start", "end", "duration", "confidence",
        "tag", "remove_reason"
    ])
    return df
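

# Illustrative end-to-end sketch (an assumption about how these helpers are chained; the
# actual FastAPI routes may wire them differently): upload → normalized wav → diarization →
# per-segment wavs → transcription. Only rows tagged "use" are split and transcribed here.
def _example_pipeline(file: UploadFile) -> pd.DataFrame:
    raw_path = save_uploaded_file(file)
    cleaned_wav = extract_and_normalize_audio(raw_path)
    diar_df = diarize_audio(cleaned_wav)
    usable = diar_df[diar_df["tag"] == "use"].reset_index(drop=True)
    segment_folder = split_segments(cleaned_wav, usable)
    return transcribe_segments(segment_folder)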