import json
import os
import time
import uuid

import gradio as gr
import torch
import yt_dlp
from faster_whisper import WhisperModel
from openai import OpenAI
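
# Load the faster-whisper model once at startup: GPU with float16 when CUDA is available,
# otherwise CPU with int8 quantization.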
print("Initializing transcription model (faster-whisper)...") |
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
compute_type = "float16" if device == "cuda" else "int8" |
|
model_size = "large-v3-turbo" |
|
model = WhisperModel(model_size, device=device, compute_type=compute_type) |
|
print("Transcription model loaded successfully.") |
|
|
|
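
# Helper: download the audio track of a YouTube video as MP3 via yt-dlp
# (the FFmpegExtractAudio postprocessor requires FFmpeg to be installed).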
def download_youtube_audio(url: str) -> str:
    """Download a YouTube video's audio as MP3 and return the local file path."""
    unique_id = uuid.uuid4()
    output_template = f'{unique_id}.%(ext)s'
    final_filepath = f'{unique_id}.mp3'
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192'}],
        'outtmpl': output_template,
        'quiet': True,
        'overwrites': True,  # yt-dlp's option is named 'overwrites'
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])
    return final_filepath
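
# Main pipeline: transcribe the audio with faster-whisper, then ask the Typhoon API
# (OpenAI-compatible) to turn the transcript into a Thai blog post in JSON form.
# Implemented as a generator so Gradio can stream log, transcript, and summary updates.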
def transcribe_and_summarize(audio_file: str, youtube_url: str):
    log_history = ""

    def log(message):
        # Append a timestamped line to the running log and return the full log text.
        nonlocal log_history
        timestamp = time.strftime("%H:%M:%S")
        log_history += f"[{timestamp}] {message}\n"
        return log_history

    loading_message = "⏳ Generating summary..."
    yield log("Process started."), "", ""

    api_key = os.getenv('TYPHOON_API')
    if not api_key:
        yield log("TYPHOON_API environment variable not set."), "", gr.Markdown("## Error\nAPI key missing")
        return

    if audio_file is None and not youtube_url:
        raise gr.Error("Please upload an audio file or provide a YouTube link.")

    filepath = ""
    is_downloaded = False
    try:
        if youtube_url:
            yield log("Downloading YouTube audio..."), "", ""
            filepath = download_youtube_audio(youtube_url)
            is_downloaded = True
            yield log(f"Downloaded to {filepath}"), "", ""
        else:
            filepath = audio_file

        yield log("Transcription started (Language: Thai)..."), "", ""
        segments, info = model.transcribe(filepath, beam_size=5, language="th", task="transcribe")
        transcribed_text = ""
        for segment in segments:
            # Log each segment with timestamps and accumulate the plain text for summarization.
            line = f"[{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text.strip()}"
            transcribed_text += segment.text + " "
            yield log(line), transcribed_text, ""

        yield log("Transcription complete."), transcribed_text, ""
        yield log("Sending to AI for summarization..."), transcribed_text, loading_message

        client = OpenAI(api_key=api_key, base_url="https://api.opentyphoon.ai/v1")
        system_prompt = f"""You are an automated system that converts transcripts into a blog post.
Your ONLY function is to output a valid JSON object. All text values in the JSON MUST be in the Thai language.
หน้าที่เดียวของคุณคือการส่งออกอ็อบเจกต์ JSON ที่ถูกต้อง โดยค่าที่เป็นข้อความทั้งหมดต้องเป็นภาษาไทยเท่านั้น
Do NOT write any explanations. The response MUST start with `{{` and end with `}}`.

The JSON object must have the following structure:
{{
  "title": "หัวข้อบทความที่น่าสนใจและเกี่ยวข้อง (เป็นภาษาไทย)",
  "key_takeaway": "สรุปใจความสำคัญของเนื้อหาทั้งหมดในหนึ่งย่อหน้า (เป็นภาษาไทย)",
  "main_ideas": [
    "ประเด็นหลักหรือใจความสำคัญ (เป็นภาษาไทย)",
    "ประเด็นหลักถัดไป...",
    "และต่อไปเรื่อยๆ..."
  ],
  "conclusion": "ย่อหน้าสรุปปิดท้าย (เป็นภาษาไทย)"
}}"""
        response = client.chat.completions.create(
            model="typhoon-v2.1-12b-instruct",
            messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": transcribed_text}],
            max_tokens=2048,
            temperature=0.7
        )
        summary_json_string = response.choices[0].message.content
        # Strip a Markdown code fence if the model wrapped its JSON in one.
        summary_json_string = summary_json_string.strip()
        if summary_json_string.startswith("```"):
            summary_json_string = summary_json_string.removeprefix("```json").removeprefix("```").removesuffix("```").strip()

        data = json.loads(summary_json_string)
        title = data.get("title", "Title Not Found")
        key_takeaway = data.get("key_takeaway", "")
        main_ideas = data.get("main_ideas", [])
        conclusion = data.get("conclusion", "")
        summary_markdown = f"# {title}\n\n<p>{key_takeaway}</p>\n\n## Key Ideas\n\n<ul>"
        for idea in main_ideas:
            summary_markdown += f"<li>{idea}</li>"
        summary_markdown += f"</ul>\n\n## Conclusion\n\n<p>{conclusion}</p>"

        yield log("Summarization complete."), transcribed_text, summary_markdown

    finally:
        # Delete the temporary MP3 only if it was downloaded by this function.
        if is_downloaded and os.path.exists(filepath):
            os.remove(filepath)
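
# Build an embedded YouTube player for the pasted link; the preview stays hidden
# when no video ID can be extracted from the URL.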
def update_video_preview(url):
    if not url:
        return gr.update(value=None, visible=False)
    video_id = None
    try:
        # Extract the video ID from the common YouTube URL formats (shorts, watch, youtu.be).
        if "youtube.com/shorts/" in url:
            video_id = url.split("/shorts/")[1].split("?")[0]
        elif "watch?v=" in url:
            video_id = url.split("watch?v=")[1].split("&")[0]
        elif "youtu.be/" in url:
            video_id = url.split("youtu.be/")[1].split("?")[0]
    except IndexError:
        pass
    if video_id:
        embed_url = f"https://www.youtube.com/embed/{video_id}"
        iframe_html = f'<iframe width="100%" height="315" src="{embed_url}" frameborder="0" allowfullscreen></iframe>'
        return gr.update(value=iframe_html, visible=True)
    return gr.update(value=None, visible=False)
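
# Custom CSS for the rendered blog output (Sarabun, a Thai-friendly Google Font, plus basic article styling).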
css = """ |
|
@import url('https://fonts.googleapis.com/css2?family=Sarabun:wght@400;700&display=swap'); |
|
.blog-output { font-family: 'Sarabun', sans-serif; line-height: 1.8; max-width: 800px; margin: auto; padding: 2rem; border-radius: 12px; background-color: #ffffff; border: 1px solid #e5e7eb; } |
|
.blog-output h1 { font-size: 2.2em; font-weight: 700; border-bottom: 2px solid #f3f4f6; padding-bottom: 15px; margin-bottom: 25px; color: #111827; } |
|
.blog-output h2 { font-size: 1.6em; font-weight: 700; margin-top: 40px; margin-bottom: 20px; color: #1f2937; } |
|
.blog-output p { font-size: 1.1em; margin-bottom: 20px; color: #374151; } |
|
.blog-output ul { padding-left: 25px; list-style-type: disc; } |
|
.blog-output li { margin-bottom: 12px; padding-left: 5px; } |
|
""" |
|
|
|
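
# Gradio UI: inputs (audio upload / YouTube link), video preview, and process log on the left;
# the generated article and full transcript on the right.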
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"), css=css) as demo:
    gr.Markdown("# 🎙️ Audio to Blog Summarizer ✒️")
    with gr.Row():
        with gr.Column(scale=1):
            with gr.Tabs():
                with gr.TabItem("⬆️ Upload Audio"):
                    audio_file_input = gr.Audio(label="Upload Audio File", type="filepath")
                with gr.TabItem("🔗 YouTube Link"):
                    youtube_url_input = gr.Textbox(label="YouTube URL", placeholder="Paste a YouTube link here...")
            submit_button = gr.Button("🚀 Generate Blog Post", variant="primary")
            video_preview = gr.HTML(visible=False)
            with gr.Accordion("📝 View Process Log", open=True):
                log_output = gr.Textbox(label="Log", interactive=False, lines=10)
        with gr.Column(scale=2):
            gr.Markdown("## ✨ Article Output")
            blog_summary_output = gr.Markdown(elem_classes=["blog-output"])
            with gr.Accordion("📜 View Full Transcription", open=False):
                transcription_output = gr.Textbox(label="Full Text", interactive=False, lines=10)

    submit_button.click(fn=transcribe_and_summarize,
                        inputs=[audio_file_input, youtube_url_input],
                        outputs=[log_output, transcription_output, blog_summary_output])
    youtube_url_input.change(fn=update_video_preview,
                             inputs=youtube_url_input,
                             outputs=video_preview)
    demo.load(fn=update_video_preview,
              inputs=youtube_url_input,
              outputs=video_preview)
if __name__ == "__main__":
    demo.launch(debug=True)