|
|
|
|
|
|
|
import os |
|
import shutil |
|
from fastapi import UploadFile |
|
from moviepy.editor import VideoFileClip |
|
from pydub import AudioSegment, effects |
|
import pandas as pd |
|
import numpy as np |
|
from collections import Counter |
|
import time |
|
from config import UPLOAD_FOLDER |
|
from models import pipelines, models, together |
|
import subprocess |
|
import librosa |
|
|
|
|
|
def save_uploaded_file(file: UploadFile) -> str: |
|
os.makedirs(UPLOAD_FOLDER, exist_ok=True) |
|
filepath = os.path.join(UPLOAD_FOLDER, file.filename) |
|
with open(filepath, "wb") as f: |
|
shutil.copyfileobj(file.file, f) |
|
return filepath |
|
|
|
def correct_text_with_tokenizer(text: str) -> str: |
|
tokens = word_tokenize(text, engine="newmm") |
|
corrected_tokens = [correct(word) for word in tokens] |
|
return ''.join(corrected_tokens) |
|
|
|
def extract_and_normalize_audio(file_path: str) -> str: |
|
ext = os.path.splitext(file_path)[1].lower() |
|
audio_path = os.path.join(UPLOAD_FOLDER, "extracted_audio.wav") |
|
if ext == ".mp4": |
|
clip = VideoFileClip(file_path) |
|
clip.audio.write_audiofile(audio_path) |
|
elif ext in [".mp3", ".wav"]: |
|
audio_path = file_path |
|
else: |
|
raise ValueError("รองรับเฉพาะไฟล์ mp4, mp3, wav เท่านั้น") |
|
audio = AudioSegment.from_file(audio_path) |
|
normalized_audio = effects.normalize(audio) |
|
cleaned_path = os.path.join(UPLOAD_FOLDER, "cleaned.wav") |
|
normalized_audio.export(cleaned_path, format="wav") |
|
return cleaned_path |
|
|
|
def split_segments(audio_path: str, df: pd.DataFrame, stretch_factor: float = 1.25) -> str: |
|
segment_folder = os.path.join(UPLOAD_FOLDER, "segments") |
|
|
|
|
|
if os.path.exists(segment_folder): |
|
shutil.rmtree(segment_folder) |
|
os.makedirs(segment_folder, exist_ok=True) |
|
|
|
audio = AudioSegment.from_file(audio_path) |
|
|
|
for i, row in df.iterrows(): |
|
start_ms = int(row['start'] * 1000) |
|
end_ms = int(row['end'] * 1000) |
|
segment = audio[start_ms:end_ms] |
|
|
|
|
|
temp_path = os.path.join(segment_folder, f"temp_{i:03d}.wav") |
|
segment.export(temp_path, format="wav") |
|
|
|
|
|
output_path = os.path.join(segment_folder, f"segment_{i:03d}_{row['speaker']}.wav") |
|
|
|
|
|
subprocess.run([ |
|
"ffmpeg", "-y", "-i", temp_path, |
|
"-filter:a", f"atempo={1/stretch_factor:.3f}", |
|
output_path |
|
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
|
|
|
os.remove(temp_path) |
|
|
|
return segment_folder |
|
|
|
def transcribe_segments(segment_folder: str) -> pd.DataFrame: |
|
files = sorted([f for f in os.listdir(segment_folder) if f.endswith(".wav")]) |
|
model = models[0] |
|
|
|
results = [] |
|
|
|
for filename in files: |
|
segment_path = os.path.join(segment_folder, filename) |
|
|
|
try: |
|
segments, _ = model.transcribe( |
|
segment_path, |
|
language="th", |
|
beam_size=5, |
|
vad_filter=True, |
|
word_timestamps=True |
|
) |
|
|
|
|
|
words = [word for seg in segments if hasattr(seg, "words") for word in seg.words] |
|
|
|
if words: |
|
full_text = ''.join([w.word for w in words]) |
|
probs = [w.probability for w in words if w.probability is not None] |
|
avg_prob = round(np.mean(probs), 4) if probs else 0.0 |
|
|
|
results.append({ |
|
"filename": filename, |
|
"text": full_text, |
|
"avg_probability": avg_prob, |
|
}) |
|
else: |
|
results.append({ |
|
"filename": filename, |
|
"text": "", |
|
"avg_probability": 0.0, |
|
}) |
|
|
|
except Exception as e: |
|
print(f"❌ Error with {filename}: {e}") |
|
results.append({ |
|
"filename": filename, |
|
"text": "", |
|
"avg_probability": 0.0, |
|
"error": str(e) |
|
}) |
|
|
|
return pd.DataFrame(results) |
|
|
|
def clean_summary(text): |
|
import re |
|
if not text or len(str(text).strip()) == 0: |
|
return "ไม่มีข้อมูลสำคัญที่จะสรุป" |
|
text = str(text) |
|
patterns_to_remove = [ |
|
r'สรุป:\s*', r'สรุปการประชุม:\s*', r'บทสรุป:\s*', r'ข้อสรุป:\s*', |
|
r'\*\*Key Messages:\*\*|\*\*หัวข้อหลัก:\*\*', r'\*\*Action Items:\*\*|\*\*ประเด็นสำคัญ:\*\*', |
|
r'\*\*Summary:\*\*|\*\*สรุป:\*\*', |
|
r'^[-•]\s*Key Messages?:?\s*', r'^[-•]\s*Action Items?:?\s*', r'^[-•]\s*หัวข้อหลัก:?', |
|
r'^[-•]\s*ประเด็นสำคัญ:?', r'^[-•]\s*ข้อมูลน่าสนใจ:?', r'^[-•]\s*บทสรุป:?', |
|
r'\r\n|\r|\n', r'\t+', |
|
r'หมายเหตุ:.*?(?=\n|\r|$)', r'เนื่องจาก.*?(?=\n|\r|$)', r'ไม่มีข้อความ.*?(?=\n|\r|$)', |
|
r'ไม่มีประเด็น.*?(?=\n|\r|$)', r'ไม่มี Action Items.*?(?=\n|\r|$)', r'ไม่มีรายการ.*?(?=\n|\r|$)', |
|
r'ต้องการข้อมูลเพิ่มเติม.*?(?=\n|\r|$)', r'ต้องขอความชัดเจนเพิ่มเติม.*?(?=\n|\r|$)', |
|
r'\(ตัดประโยคที่ไม่เกี่ยวข้องหรือซ้ำซ้อนออก.*?\)', r'\(.*?เพื่อเน้นความชัดเจน.*?\)', |
|
r'ตามที่ได้กล่าวไว้.*?(?=\n|\r|$)', r'จากข้อความที่ให้มา.*?(?=\n|\r|$)', |
|
r'Based on the provided text.*?(?=\n|\r|$)', r'According to the text.*?(?=\n|\r|$)', |
|
r'\s+' |
|
] |
|
cleaned_text = text |
|
for pattern in patterns_to_remove: |
|
if pattern == r'\s+': |
|
cleaned_text = re.sub(pattern, ' ', cleaned_text) |
|
else: |
|
cleaned_text = re.sub(pattern, '', cleaned_text, flags=re.IGNORECASE | re.MULTILINE | re.DOTALL) |
|
cleaned_text = re.sub(r'\*\*(.*?)\*\*', r'\1', cleaned_text) |
|
cleaned_text = re.sub(r'\*(.*?)\*', r'\1', cleaned_text) |
|
cleaned_text = re.sub(r'_{2,}(.*?)_{2,}', r'\1', cleaned_text) |
|
cleaned_text = re.sub(r'[.]{3,}', '...', cleaned_text) |
|
cleaned_text = re.sub(r'[!]{2,}', '!', cleaned_text) |
|
cleaned_text = re.sub(r'[?]{2,}', '?', cleaned_text) |
|
cleaned_text = re.sub(r'^[-•*]\s*', '', cleaned_text, flags=re.MULTILINE) |
|
cleaned_text = re.sub(r'^\d+\.\s*', '', cleaned_text, flags=re.MULTILINE) |
|
useless_phrases = [ |
|
'ไม่มี', 'ไม่สามารถสรุปได้', 'ข้อความต้นฉบับไม่มีความหมาย', 'ไม่มีข้อมูลเพียงพอ', |
|
'ไม่มีประเด็นสำคัญ', 'ไม่มี Action Items', 'ต้องขอความชัดเจนเพิ่มเติม', |
|
'ไม่มีข้อมูลที่สำคัญ', 'ไม่สามารถระบุได้', 'ข้อมูลไม่ชัดเจน', 'ไม่มีเนื้อหาที่เกี่ยวข้อง', |
|
'N/A', 'n/a', 'Not applicable', 'No content', 'No summary available' |
|
] |
|
cleaned_text = cleaned_text.strip() |
|
if (len(cleaned_text) < 15 or |
|
any(phrase.lower() in cleaned_text.lower() for phrase in useless_phrases) or |
|
cleaned_text.lower() in [phrase.lower() for phrase in useless_phrases]): |
|
return "ไม่มีข้อมูลสำคัญที่จะสรุปมากพอ" |
|
cleaned_text = re.sub(r'\s+([.!?])', r'\1', cleaned_text) |
|
cleaned_text = re.sub(r'([.!?])\s*([A-Za-zก-๙])', r'\1 \2', cleaned_text) |
|
return cleaned_text |
|
|
|
def summarize_texts(texts, api_key, model="deepseek-ai/DeepSeek-V3", delay=0): |
|
import time |
|
|
|
def _is_quota_error(err_msg: str) -> bool: |
|
msg = str(err_msg).lower() |
|
keys = [ |
|
"insufficient_quota", "insufficient quota", "insufficient credits", |
|
"out of credits", "credit exhausted", "quota", "429", |
|
"rate limit", "too many requests", "token exhausted" |
|
] |
|
return any(k in msg for k in keys) |
|
|
|
summaries = [] |
|
texts = [t if t is not None else "" for t in texts] |
|
|
|
for idx, text in enumerate(texts): |
|
prompt = f""" |
|
สรุปข้อความประชุมนี้เป็นภาษาไทยสั้น ๆ เน้นประเด็นสำคัญ (key messages) และ Action Items โดยตัดรายละเอียดที่ไม่สำคัญออก: |
|
ข้อความ: |
|
{text} |
|
สรุป: |
|
- Key Messages: |
|
- Action Items: |
|
""".strip() |
|
|
|
try: |
|
response = together.chat.completions.create( |
|
model=model, |
|
messages=[ |
|
{"role": "system", "content": "คุณเป็นผู้เชี่ยวชาญในการสรุปเนื้อหา ตอบเป็นภาษาไทยเสมอ เน้นหัวข้อหลักและข้อมูลสำคัญ"}, |
|
{"role": "user", "content": prompt} |
|
], |
|
max_tokens=1024, |
|
temperature=0.7, |
|
) |
|
summary = (response.choices[0].message.content or "").strip() |
|
summary = clean_summary(summary) |
|
summaries.append(summary) |
|
|
|
except Exception as e: |
|
print(f"Error at index {idx}: {e}") |
|
if _is_quota_error(e): |
|
summaries.append(" - ") |
|
else: |
|
summaries.append("ไม่สามารถสรุปได้") |
|
|
|
if idx < len(texts) - 1: |
|
time.sleep(delay) |
|
|
|
return summaries |
|
|
|
def add_corrected_text_column(df): |
|
|
|
|
|
return df |
|
|
|
def add_llm_spell_corrected_text_column(df, model="google/gemma-3-27b-it", delay=1.5): |
|
import time |
|
|
|
def _is_quota_error(err_msg: str) -> bool: |
|
msg = err_msg.lower() |
|
|
|
keys = [ |
|
"insufficient_quota", "insufficient quota", "insufficient credits", |
|
"out of credits", "credit exhausted", "quota", "429", |
|
"rate limit", "too many requests", "token exhausted" |
|
] |
|
return any(k in msg for k in keys) |
|
|
|
texts = df["text"].fillna("").astype(str).tolist() |
|
corrected = [] |
|
|
|
for idx, text in enumerate(texts): |
|
prompt = f""" |
|
กรุณาแก้ไขข้อความต่อไปนี้ให้ถูกต้องตามหลักภาษาไทย: |
|
|
|
- แก้ไขคำสะกดผิด คำพิมพ์ผิด หรือคำที่ไม่ถูกต้องและการผันวรรณยุกต์ผิด |
|
- ห้ามเปลี่ยนความหมาย |
|
- ห้ามตอบเกิน |
|
- **ตอบกลับเฉพาะข้อความที่แก้แล้ว** |
|
{text} |
|
""".strip() |
|
|
|
try: |
|
response = together.chat.completions.create( |
|
model=model, |
|
messages=[ |
|
{ |
|
"role": "system", |
|
"content": """คุณคือนักภาษาศาสตร์ผู้เชี่ยวชาญด้านการตรวจสอบคำสะกดผิด คำพิมพ์ผิด และการผันวรรณยุกต์ผิดของภาษาไทย |
|
หน้าที่ของคุณคือแก้ไขคำผิดในข้อความที่ได้รับให้ถูกต้องตามมาตรฐานภาษาไทย โดยไม่เปลี่ยนความหมายเดิม |
|
|
|
หน้าที่ของคุณ: |
|
- แก้ไขข้อความภาษาไทยให้ถูกต้องตามหลักภาษาไทยมาตรฐาน |
|
- ตรวจสอบคำสะกดผิด คำพิมพ์ผิด และการผันวรรณยุกต์ผิด |
|
- แก้คำเพี้ยน คำที่มาจากเสียงพูด เช่น ภาษาวัยรุ่นหรือคำพูดที่ออกเสียงคล้ายกัน ให้เป็นคำที่ถูกต้อง |
|
- รักษาความหมายเดิมของข้อความให้มากที่สุด |
|
- ห้ามแปลความใหม่ ห้ามตีความเกิน ห้ามปรับสำนวน |
|
- ห้ามอธิบาย หรือใส่คำพูดใด ๆ เพิ่มเติมก่อนหรือหลังข้อความ |
|
- **ให้ตอบกลับเฉพาะข้อความที่แก้ไขแล้วเท่านั้น** |
|
|
|
ตัวอย่าง: |
|
|
|
ผู้ใช้: ผมไช้คอมพิวเตอรทุกวัน |
|
คุณ: ผมใช้คอมพิวเตอร์ทุกวัน |
|
|
|
ผู้ใช้: ปวดหัวจะตายุ่ละ |
|
คุณ: ปวดหัวจะตายอยู่ละ |
|
|
|
ผู้ใช้: ไอ้เส้นหลั่งกุ้ง |
|
คุณ: ไอ้เส้นหลังกุ้ง |
|
|
|
ผู้ใช้: เซโยโมมันน่ากลัว |
|
คุณ: เชื้อโรคมันน่ากลัว |
|
|
|
จงตอบกลับเฉพาะข้อความที่แก้ไขแล้วตามตัวอย่างข้างต้นเท่านั้น |
|
""" |
|
}, |
|
{"role": "user", "content": prompt} |
|
], |
|
max_tokens=256, |
|
temperature=0.3, |
|
) |
|
|
|
corrected_text = (response.choices[0].message.content or "").strip() |
|
corrected.append(corrected_text) |
|
|
|
except Exception as e: |
|
err = str(e) |
|
print(f"❌ Error at index {idx}: {err}") |
|
if _is_quota_error(err): |
|
corrected.append(" - ") |
|
else: |
|
corrected.append("") |
|
|
|
if idx < len(texts) - 1: |
|
time.sleep(delay) |
|
|
|
df["llm_corrected_text"] = corrected |
|
return df |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _confidence_metrics(audio_seg, sr): |
|
try: |
|
rms = librosa.feature.rms(y=audio_seg)[0] |
|
snr_est = float(np.mean(rms) / (np.std(rms) + 1e-9)) |
|
zcr = float(np.mean(librosa.feature.zero_crossing_rate(audio_seg)[0])) |
|
dur = len(audio_seg) / sr |
|
|
|
snr_score = min(snr_est / 10.0, 1.0) |
|
zcr_score = 1.0 if 0.05 <= zcr <= 0.15 else 0.5 |
|
dur_score = min(dur / 5.0, 1.0) |
|
conf = 0.5 * snr_score + 0.2 * zcr_score + 0.3 * dur_score |
|
return max(0.0, min(1.0, conf)) |
|
except Exception: |
|
return 0.5 |
|
|
|
def add_confidence_to_segments(audio_path: str, sr: int, segments: list): |
|
""" |
|
เติม key 'confidence' ให้แต่ละ segment (in-place) |
|
segment: {'start','end','speaker','duration', ...} |
|
""" |
|
audio, _sr = librosa.load(audio_path, sr=sr) |
|
for seg in segments: |
|
s = int(seg["start"] * sr); e = int(seg["end"] * sr) |
|
piece = audio[s:e] if 0 <= s < e <= len(audio) else np.array([]) |
|
conf = _confidence_metrics(piece, sr) if piece.size > 0 else 0.5 |
|
seg["confidence"] = float(conf) |
|
return segments |
|
|
|
def tag_segments_use_or_remove(segments: list, min_segment_duration=3.0, min_speaker_total=5.0): |
|
|
|
tot = {} |
|
for seg in segments: |
|
sp = seg["speaker"] |
|
tot.setdefault(sp, 0.0) |
|
tot[sp] += float(seg["duration"]) |
|
|
|
valid_speakers = {sp for sp, t in tot.items() if t >= float(min_speaker_total)} |
|
|
|
kept, removed = [], [] |
|
for seg in segments: |
|
reasons = [] |
|
if seg["speaker"] not in valid_speakers: |
|
reasons.append(f"speaker_total_duration<{min_speaker_total}s") |
|
if float(seg["duration"]) < float(min_segment_duration): |
|
reasons.append(f"segment_duration<{min_segment_duration}s") |
|
|
|
if reasons: |
|
seg2 = dict(seg) |
|
seg2["tag"] = "remove" |
|
seg2["remove_reason"] = ";".join(reasons) |
|
removed.append(seg2) |
|
else: |
|
seg2 = dict(seg) |
|
seg2["tag"] = "use" |
|
seg2["remove_reason"] = "" |
|
kept.append(seg2) |
|
|
|
return kept, removed, sorted(list(valid_speakers)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def diarize_audio(audio_path: str) -> pd.DataFrame: |
|
sr = 16000 |
|
min_segment_duration = 3.0 |
|
min_speaker_total = 5.0 |
|
compute_confidence = True |
|
|
|
|
|
diar = pipelines[0](audio_path) |
|
segments = [] |
|
for turn, _, speaker in diar.itertracks(yield_label=True): |
|
segments.append({ |
|
"speaker": str(speaker), |
|
"start": float(turn.start), |
|
"end": float(turn.end), |
|
"duration": float(turn.end - turn.start), |
|
}) |
|
|
|
|
|
if compute_confidence: |
|
add_confidence_to_segments(audio_path, sr, segments) |
|
else: |
|
for s in segments: |
|
s["confidence"] = 0.5 |
|
|
|
|
|
kept, removed, _ = tag_segments_use_or_remove( |
|
segments, |
|
min_segment_duration=min_segment_duration, |
|
min_speaker_total=min_speaker_total |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
all_rows = kept + removed |
|
all_rows.sort(key=lambda r: r["start"]) |
|
|
|
df = pd.DataFrame(all_rows, columns=[ |
|
"speaker","start","end","duration","confidence", |
|
"tag","remove_reason" |
|
]) |
|
return df |