import os
import json
import mimetypes
import requests
import time
from yt_dlp import YoutubeDL
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.units import inch
import gradio as gr
from langchain_community.document_loaders import PyPDFLoader
from langchain_openai import ChatOpenAI
from openai import OpenAI, DefaultHttpxClient
from langchain_chroma import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.passthrough import RunnableAssign
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.output_parsers import PydanticOutputParser
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List
from pprint import pprint
def download_youtube_video(youtube_url, download_path):
    """Download the audio track of a YouTube video and convert it to MP3.

    Args:
        youtube_url: URL of the video to fetch.
        download_path: Directory the audio file is written to.

    Returns:
        A ``(filename, title)`` tuple on success, ``(None, None)`` on any
        failure (the error is printed, not raised).
    """
    ydl_opts = {
        'cookiefile': "cookies.txt",
        'format': 'bestaudio/best',
        'outtmpl': os.path.join(download_path, '%(title)s.%(ext)s'),
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
    }
    try:
        with YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(youtube_url, download=True)
            title = info_dict.get('title', None)
            # prepare_filename() reports the pre-conversion name; the
            # post-processor writes "<same stem>.mp3".  Swap only the real
            # extension so other source formats (e.g. .opus) are covered and
            # a ".webm"/".m4a" substring inside the title is left untouched.
            source_name = ydl.prepare_filename(info_dict)
            filename = os.path.splitext(source_name)[0] + '.mp3'
            return filename, title
    except Exception as e:
        # Deliberate best-effort: callers skip videos that fail to download.
        print(f"Failed to download video from {youtube_url}: {e}")
        return None, None
def upload_file(filepath, api_key):
    """Upload a local file through a two-step (URL request, then PUT) flow.

    Args:
        filepath: Path of the file to upload.
        api_key: Bearer token sent in the ``authorization`` header.

    Returns:
        The download URL reported by the service, or ``None`` if either the
        URL request or the upload itself fails.
    """
    # NOTE(review): the endpoint is blank in this source; it must be set to
    # the service's file-URL endpoint before this function can work.
    url = ""
    headers = {
        "accept": "application/json",
        "authorization": f"Bearer {api_key}"
    }
    file_name = os.path.basename(filepath)
    # Pass the name via ``params`` so spaces, '&', '#', etc. (common in video
    # titles) are URL-encoded instead of corrupting the query string.
    get_file_urls = requests.get(url, params={"filename": file_name}, headers=headers)
    if get_file_urls.status_code != 200:
        print(f"Failed to get upload URL: {get_file_urls.status_code}")
        return None

    response_json = get_file_urls.json()
    upload_url = response_json['upload_url']
    download_url = response_json['download_url']

    file_headers = {
        "Content-Type": mimetypes.guess_type(filepath)[0],
    }
    # Context manager guarantees the handle is closed even if the PUT raises.
    with open(filepath, 'rb') as file_handle:
        data = file_handle.read()
    file_uploaded = requests.put(upload_url, data=data, headers=file_headers)

    if file_uploaded.status_code == 200:
        print(f"File successfully uploaded. Usable link is {download_url}")
        return download_url
    print(f"Failed to upload file: {file_uploaded.status_code}")
    return None
def generate_process_id(download_url, api_key):
    """Submit an uploaded audio file for transcription.

    Args:
        download_url: URL of the previously uploaded file.
        api_key: Bearer token sent in the ``authorization`` header.

    Returns:
        The ``process_id`` field of the JSON response, or ``None`` when the
        service does not answer with HTTP 200.
    """
    # NOTE(review): the endpoint is blank in this source; it must be set to
    # the transcription endpoint before this function can work.
    whisper_url = ""
    payload = {
        "file": f"{download_url}",
        "language": "en"
    }
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {api_key}"
    }
    # NOTE(review): the call on this line was truncated in the source;
    # a JSON POST to ``whisper_url`` is assumed from the surviving arguments.
    response = requests.post(whisper_url, json=payload, headers=headers)
    if response.status_code != 200:
        print(f"Failed to generate process ID: {response.status_code}")
        return None

    process_id = response.json().get("process_id")
    print(f"Process ID is: {process_id}")
    return process_id
def query_job_status(job_id, api_key, poll_interval=5):
    """Poll a transcription job until it finishes and return its text.

    Args:
        job_id: Identifier of the job to poll.
        api_key: Bearer token sent in the ``authorization`` header.
        poll_interval: Seconds to wait between status checks.

    Returns:
        The transcript text when the job reports ``COMPLETED``; an empty
        string when it reports ``FAILED`` or a status request does not
        return HTTP 200.
    """
    transcript = ""
    # NOTE(review): the base URL is blank in this source; only the job id
    # survives, so a status endpoint prefix must be restored here.
    url = f"{job_id}"
    headers = {
        "accept": "application/json",
        "authorization": f"Bearer {api_key}"
    }
    while True:
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            print(f"Failed to get status: {response.status_code}")
            return transcript

        body = response.json()  # parse once, reuse for status and result
        status = body.get("status")
        if status in ["COMPLETED", "FAILED"]:
            print(f"Job status: {status}")
            if status == "COMPLETED":
                # Tolerate a missing "result" object instead of raising.
                transcript = (body.get("result") or {}).get("text", "")
            return transcript

        print(f"Job status: {status}, checking again in {poll_interval} seconds...")
        # Without this pause the loop hammers the API as fast as it can.
        time.sleep(poll_interval)
def create_pdf(transcripts, file_path):
    """Write all transcripts into a single PDF document.

    Args:
        transcripts: Iterable of ``(title, transcript)`` string pairs.
        file_path: Destination path of the PDF file.
    """
    # Local import keeps this block self-contained (stdlib only).
    from xml.sax.saxutils import escape

    doc = SimpleDocTemplate(file_path, pagesize=letter)
    styles = getSampleStyleSheet()
    story = []
    for i, (title, transcript) in enumerate(transcripts, start=1):
        # Paragraph text is parsed as XML-like markup, so '&', '<' and '>'
        # in free text must be escaped before our own <br/> tags are added.
        safe_title = escape(title)
        safe_body = escape(transcript).replace('\n', '<br/>')
        story.append(Paragraph(f'YouTube Video {i} Title: {safe_title}', styles['Title']))
        story.append(Spacer(1, 12))
        story.append(Paragraph(f'YouTube Video {i} Transcript:', styles['Heading2']))
        story.append(Spacer(1, 12))
        story.append(Paragraph(safe_body, styles['BodyText']))
        story.append(Spacer(1, 24))
    # The flowables are only rendered to disk when build() is called.
    doc.build(story)
# Schema of the knowledge base that is filled in incrementally while a
# document is summarised chunk by chunk.  It is handed to
# PydanticOutputParser in RExtract, so the field descriptions below are part
# of the format instructions sent to the model -- edit them with care.
# (Kept as '#' comments on purpose: a class docstring could leak into the
# generated schema and therefore into the prompt.)
class DocumentSummaryBase(BaseModel):
    # Summary text carried forward from one chunk to the next.
    running_summary: str = Field("", description="Running description of the document. Do not override; only update!")
    # Key points extracted so far.
    main_ideas: List[str] = Field([], description="Most important information from the document (max 3)")
    # Questions the summary cannot answer yet.
    loose_ends: List[str] = Field([], description="Open questions that would be good to incorporate into summary, but that are yet unknown (max 3)")
def transcribe_and_save(youtube_urls):
    """Download, upload and transcribe each video, then save one PDF.

    Videos for which any step fails are skipped. The PDF is written to
    ``transcripts.pdf`` in the current working directory.

    Args:
        youtube_urls: Iterable of YouTube video URLs.
    """
    # NOTE(review): ``api_key`` is read from module scope but is not defined
    # in the visible part of this file -- confirm where it is set.
    download_path = os.getcwd()
    pdf_output_path = os.path.join(download_path, "transcripts.pdf")
    transcripts = []

    for youtube_url in youtube_urls:
        filepath, title = download_youtube_video(youtube_url, download_path)
        if not (filepath and title):
            continue
        download_url = upload_file(filepath, api_key)
        if not download_url:
            continue
        process_id = generate_process_id(download_url, api_key)
        if not process_id:
            continue
        transcript = query_job_status(process_id, api_key)
        transcripts.append((title, transcript))

    # Save all transcripts into a PDF file (same location as before, now
    # expressed through the previously unused absolute path).
    create_pdf(transcripts, pdf_output_path)
def RExtract(pydantic_class, llm, prompt):
    """Runnable Extraction module.

    Returns a knowledge dictionary populated by slot-filling extraction.

    Args:
        pydantic_class: Model class describing the slots to fill.
        llm: Runnable that produces the raw model output as a string.
        prompt: Prompt template; receives ``format_instructions`` in
            addition to the incoming state.

    Returns:
        A chain that adds format instructions, runs ``prompt`` and ``llm``,
        cleans up the raw output and parses it into ``pydantic_class``.
    """
    parser = PydanticOutputParser(pydantic_object=pydantic_class)
    instruct_merge = RunnableAssign({'format_instructions': lambda x: parser.get_format_instructions()})

    def preparse(string):
        # Add a brace when the output contains none at all.
        if '{' not in string:
            string = '{' + string
        if '}' not in string:
            string = string + '}'
        # Undo markdown-style escaping and flatten newlines so the JSON
        # parser accepts the text.  Backslashes are written explicitly:
        # "\]" and "\[" are invalid escape sequences in a normal string.
        string = (string
            .replace("\\_", "_")
            .replace("\n", " ")
            .replace("\\]", "]")
            .replace("\\[", "[")
        )
        # print(string)  ## Good for diagnostics
        return string

    return instruct_merge | prompt | llm | preparse | parser
def RSummarizer(knowledge, llm, prompt, verbose=False):
    """Create a chain that summarizes a list of documents incrementally.

    Args:
        knowledge: Initial knowledge-base object; its class defines the
            schema extracted at every step.
        llm: Runnable that produces the raw model output as a string.
        prompt: Prompt template passed on to ``RExtract``.
        verbose: When true, print progress after each document.

    Returns:
        A ``RunnableLambda`` that maps a list of documents to the list of
        intermediate knowledge bases, one per document, as plain dicts.
    """
    def summarize_docs(docs):
        parse_chain = RunnableAssign({"info_base": RExtract(knowledge.__class__, llm, prompt)})
        state = {"info_base": knowledge}

        all_summaries = []  # List to store all intermediate summaries
        for i, doc in enumerate(docs):
            state['input'] = doc.page_content
            state = parse_chain.invoke(state)

            # Store the current info_base in the list.  Previously nothing
            # was appended, so the chain always returned [].
            # Assumes the parsed info_base is a pydantic model exposing
            # .dict(); a dict is stored because the consumer in this file
            # indexes summaries by key.
            all_summaries.append(state['info_base'].dict())
            if verbose:
                print(f"Considered {i+1} documents")

        return all_summaries

    return RunnableLambda(summarize_docs)
def find_first_non_empty_summary(summaries):
    """Return the latest summary that has any content, or ``None``.

    The list is scanned from the end; a summary counts as non-empty when its
    ``loose_ends``, ``main_ideas`` or ``running_summary`` entry is truthy.
    """
    populated = (
        entry
        for entry in reversed(summaries)
        if entry['loose_ends'] or entry['main_ideas'] or entry['running_summary']
    )
    return next(populated, None)
def create_running_summary(url):
    """Load a web page, summarise it chunk by chunk, return the last summary.

    The page at ``url`` is split into overlapping chunks, each chunk updates
    a running knowledge base via the module-level ``llm_1`` model, and the
    most recent non-empty knowledge base (or ``None``) is returned.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1200,
        chunk_overlap=100,
        separators=["\n\n", "\n", ".", ";", ",", " ", ""],
    )
    page_docs = WebBaseLoader(url).load()
    chunks = splitter.split_documents(page_docs)

    summary_prompt = ChatPromptTemplate.from_template("""You are generating a running summary of the document. Make it readable by a technical user.
After this, the old knowledge base will be replaced by the new one. Make sure a reader can still understand everything.
Keep it short, but as dense and useful as possible! The information should flow from chunk to (loose ends or main ideas) to running_summary.
Strictly output a json and nothing else do not output any strings or explanations just the json is enough.
The updated knowledge base keep all of the information from running_summary here: {info_base}.
{format_instructions}. Follow the format precisely, including quotations and commas\n\n
{info_base}\nWithout losing any of the info, update the knowledge base with the following: {input}""")

    # The model output is reduced to a plain string before extraction.
    string_model = llm_1 | StrOutputParser()
    chain = RSummarizer(DocumentSummaryBase(), string_model, summary_prompt, verbose=True)
    intermediate = chain.invoke(chunks)
    return find_first_non_empty_summary(intermediate)
def setup_vectorstore():
    """Index ``transcripts.pdf`` in the Chroma store and return a retriever.

    The PDF in the current working directory is split into chunks and added
    to the persistent ``collection-1`` collection under ``./vectorstore``.

    Returns:
        A retriever over the vector store.
    """
    embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
    vector_store = Chroma(
        collection_name="collection-1",
        embedding_function=embeddings,
        persist_directory="./vectorstore",
    )
    loader = PyPDFLoader(os.path.join(os.getcwd(), "transcripts.pdf"))
    documents = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=250, chunk_overlap=0, separators=["\n\n"])
    text = text_splitter.split_documents(documents)
    if text:
        # The chunks were previously computed and then discarded, so the
        # retriever only saw whatever already sat in the persisted store.
        # Position-based ids are meant to let repeated calls overwrite
        # chunks rather than duplicate them -- NOTE(review): confirm the
        # installed Chroma wrapper upserts on existing ids.
        chunk_ids = [f"transcripts-{index}" for index in range(len(text))]
        vector_store.add_documents(documents=text, ids=chunk_ids)
    retriever = vector_store.as_retriever()
    return retriever
def generate(content, examples):
    """Ask the module-level ``llm_2`` model for a video script.

    ``content`` is the material to explain and ``examples`` are earlier
    scripts whose style should be imitated; the model's reply is returned
    as a string.
    """
    script_prompt = ChatPromptTemplate.from_template("""Your are provided with a few sample youtube video scripts below.
your task is to create a similar script for the following content provided to you below.
Follow the style followd in the examples and create a similar script for the content givent to you.
Create me a script for a youtube video explaining the following content: {content}.
Here are a few example scripts of my previous videos that you have to adapt: {examples}.""")
    pipeline = script_prompt | llm_2 | StrOutputParser()
    prompt_inputs = {"content": content, "examples": examples}
    return pipeline.invoke(prompt_inputs)
def docs2str(docs, title="Document"):
    """Render documents as one string, one line per document.

    Each line is the document's ``page_content`` (or ``str(doc)`` when the
    attribute is absent), prefixed with ``[Quote from <name>] `` where the
    name is the ``Title`` metadata entry or, failing that, ``title``. The
    prefix is omitted when the name is falsy.
    """
    rendered = []
    for item in docs:
        source_name = getattr(item, 'metadata', {}).get('Title', title)
        prefix = f"[Quote from {source_name}] " if source_name else ""
        body = getattr(item, 'page_content', str(item))
        rendered.append(prefix + body + "\n")
    return "".join(rendered)
# Chat models used by the pipeline: llm_1 drives the running summary,
# llm_2 writes the final script.
# NOTE(review): the closing parentheses were missing in this source and
# further constructor arguments (model name, base URL, ...) may have been
# lost with them -- confirm against the deployed configuration.
# SECURITY: verify=False disables TLS certificate verification for every
# request these clients make.  Left unchanged here because removing it may
# break a proxy/self-signed setup, but it should be replaced by a trusted
# CA bundle.
llm_1 = ChatOpenAI(
    http_client=DefaultHttpxClient(verify=False)
)
llm_2 = ChatOpenAI(
    http_client=DefaultHttpxClient(verify=False)
)
def process_links(style_links, context_link):
    """Generate a video script for ``context_link`` in the user's style.

    Args:
        style_links: Comma-separated string of links to example videos.
        context_link: URL of the page whose content the script explains.

    Returns:
        The generated script, or an explanatory message when the page could
        not be summarised.
    """
    # Drop blank entries produced by trailing commas or an empty textbox.
    style_links = [link.strip() for link in style_links.split(",") if link.strip()]
    # NOTE(review): the parsed links are not used further in this function;
    # the retriever below relies on an already existing transcripts.pdf.
    # Confirm whether transcribe_and_save(style_links) was meant to run here.

    retriever = setup_vectorstore()
    summary = create_running_summary(context_link)
    if not summary:
        # No chunk yielded a non-empty knowledge base; indexing None below
        # would raise TypeError, so report the problem to the UI instead.
        return "Could not summarize the context link."
    summary = summary['running_summary']
    print("Summarized the url successfully:", summary)
    examples = retriever.invoke(summary)
    return generate(summary, examples)
# Define the Gradio interface: two inputs (style links, context link), one
# output box, and a button that runs process_links on click.
with gr.Blocks() as demo:
    gr.Markdown("## Link Processor")
    style_links = gr.Textbox(lines=5, placeholder="Enter style links separated by commas", label="Style Links")
    context_link = gr.Textbox(lines=1, placeholder="Enter context link", label="Context Link")
    output = gr.Textbox(lines=2, label="Output")
    process_button = gr.Button("Process")
    # NOTE(review): this line was truncated in the source; binding the click
    # event to process_links is assumed from the surviving inputs/outputs
    # arguments.
    process_button.click(process_links, inputs=[style_links, context_link], outputs=output)