import os
import json
import mimetypes
import requests
import time
from yt_dlp import YoutubeDL
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.units import inch
import gradio as gr
from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from openai import OpenAI, DefaultHttpxClient
from langchain_chroma import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.passthrough import RunnableAssign
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.output_parsers import PydanticOutputParser
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List
from pprint import pprint
def download_youtube_video(youtube_url, download_path):
    try:
        ydl_opts = {
            'cookiefile': "cookies.txt",
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(download_path, '%(title)s.%(ext)s'),
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
        }
        with YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(youtube_url, download=True)
            title = info_dict.get('title', None)
            filename = ydl.prepare_filename(info_dict).replace('.webm', '.mp3').replace('.m4a', '.mp3')
            return filename, title
    except Exception as e:
        print(f"Failed to download video from {youtube_url}: {e}")
        return None, None
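# Hypothetical usage sketch (the URL and path below are placeholders, not from the original script):
#   audio_path, video_title = download_youtube_video("https://www.youtube.com/watch?v=<id>", os.getcwd())
# yt-dlp writes "<title>.<ext>" into download_path and the FFmpeg postprocessor re-encodes it to mp3,
# which is why the returned filename swaps the .webm/.m4a extension for .mp3.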
def upload_file(filepath, api_key):
    url = "https://api.monsterapi.ai/v1/upload"
    headers = {
        "accept": "application/json",
        "authorization": f"Bearer {api_key}"
    }
    file_name = os.path.basename(filepath)
    get_file_urls = requests.get(f"{url}?filename={file_name}", headers=headers)
    if get_file_urls.status_code != 200:
        print(f"Failed to get upload URL: {get_file_urls.status_code}")
        return None
    response_json = get_file_urls.json()
    upload_url = response_json['upload_url']
    download_url = response_json['download_url']
    with open(filepath, 'rb') as f:
        data = f.read()
    file_headers = {
        "Content-Type": mimetypes.guess_type(filepath)[0],
    }
    file_uploaded = requests.put(upload_url, data=data, headers=file_headers)
    if file_uploaded.status_code == 200:
        print(f"File successfully uploaded. Usable link is {download_url}")
        return download_url
    else:
        print(f"Failed to upload file: {file_uploaded.status_code}")
        return None
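# The upload is a two-step flow: GET /v1/upload returns a presigned 'upload_url' (the PUT target)
# plus a public 'download_url' that later requests (e.g. the Whisper job) can reference.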
def generate_process_id(download_url, api_key):
    whisper_url = "https://api.monsterapi.ai/v1/generate/whisper"
    payload = {
        "file": f"{download_url}",
        "language": "en"
    }
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {api_key}"
    }
    response = requests.post(whisper_url, json=payload, headers=headers)
    if response.status_code != 200:
        print(f"Failed to generate process ID: {response.status_code}")
        return None
    else:
        process_id = response.json().get("process_id")
        print(f"Process ID is: {process_id}")
        return process_id
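# Posting the file's download_url to /v1/generate/whisper queues an asynchronous transcription job;
# only the returned process_id is needed to poll for the result in query_job_status below.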
def query_job_status(job_id, api_key):
    transcript = ""
    url = f"https://api.monsterapi.ai/v1/status/{job_id}"
    headers = {
        "accept": "application/json",
        "authorization": f"Bearer {api_key}"
    }
    while True:
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            print(f"Failed to get status: {response.status_code}")
            return transcript
        status = response.json().get("status")
        if status in ["COMPLETED", "FAILED"]:
            print(f"Job status: {status}")
            if status == "COMPLETED":
                transcript = response.json().get("result")["text"]
            return transcript
        print(f"Job status: {status}, checking again in 5 seconds...")
        time.sleep(5)
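# The status endpoint is polled every 5 seconds until MonsterAPI reports COMPLETED or FAILED; on
# failure (or a non-200 response) an empty transcript string is returned rather than raising.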
def create_pdf(transcripts, file_path):
    doc = SimpleDocTemplate(file_path, pagesize=letter)
    styles = getSampleStyleSheet()
    story = []
    for i, (title, transcript) in enumerate(transcripts, start=1):
        story.append(Paragraph(f'YouTube Video {i} Title: {title}', styles['Title']))
        story.append(Spacer(1, 12))
        story.append(Paragraph(f'YouTube Video {i} Transcript:', styles['Heading2']))
        story.append(Spacer(1, 12))
        story.append(Paragraph(transcript.replace('\n', '<br/>'), styles['BodyText']))
        story.append(Spacer(1, 24))
    doc.build(story)
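# Hypothetical usage sketch (the (title, transcript) pair is a placeholder):
#   create_pdf([("Video A", "transcript text...")], "transcripts.pdf")
# ReportLab platypus flowables (Paragraph/Spacer) are collected into 'story' and laid out by doc.build().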
os.environ["OPENAI_API_KEY"] = "sk-proj-3XiMKGvrD8ev35tnGZ76T3BlbkFJmUSzs9Xpq8RBVF7tMyMh" | |
class DocumentSummaryBase(BaseModel):
    running_summary: str = Field("", description="Running description of the document. Do not override; only update!")
    main_ideas: List[str] = Field([], description="Most important information from the document (max 3)")
    loose_ends: List[str] = Field([], description="Open questions that would be good to incorporate into summary, but that are yet unknown (max 3)")
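# DocumentSummaryBase is the slot-filling knowledge state: RExtract (defined later in this file) asks
# the LLM to emit JSON matching this schema, and each document chunk updates running_summary,
# main_ideas, and loose_ends incrementally.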
def transcribe_and_save(youtube_urls):
    download_path = os.getcwd()
    # MonsterAPI key redacted; read it from the environment instead (the variable name is an assumption).
    api_key = os.getenv("MONSTER_API_KEY", "YOUR_MONSTERAPI_KEY")
    pdf_output_path = os.path.join(os.getcwd(), "transcripts.pdf")
    transcripts = []
    for youtube_url in youtube_urls:
        filepath, title = download_youtube_video(youtube_url, download_path)
        if filepath and title:
            download_url = upload_file(filepath, api_key)
            if download_url:
                process_id = generate_process_id(download_url, api_key)
                if process_id:
                    transcript = query_job_status(process_id, api_key)
                    transcripts.append((title, transcript))
    # Save all transcripts into a single PDF file
    create_pdf(transcripts, pdf_output_path)
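# Per URL the flow is: download audio, upload it to MonsterAPI, start a Whisper job, poll for the
# transcript, then collect (title, transcript) pairs and write them all to transcripts.pdf in the CWD.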
def RExtract(pydantic_class, llm, prompt):
    '''
    Runnable Extraction module
    Returns a knowledge dictionary populated by slot-filling extraction
    '''
    parser = PydanticOutputParser(pydantic_object=pydantic_class)
    instruct_merge = RunnableAssign({'format_instructions': lambda x: parser.get_format_instructions()})
    def preparse(string):
        # Patch up common LLM formatting slips so the Pydantic parser can read the JSON.
        if '{' not in string: string = '{' + string
        if '}' not in string: string = string + '}'
        string = (string
            .replace("\\_", "_")
            .replace("\n", " ")
            .replace("\\]", "]")
            .replace("\\[", "[")
        )
        # print(string)  ## Good for diagnostics
        return string
    return instruct_merge | prompt | llm | preparse | parser
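# Hypothetical usage sketch (the prompt and input text are placeholders; llm_1 is defined later):
#   extractor = RExtract(DocumentSummaryBase, llm_1 | StrOutputParser(), summary_prompt)
#   new_state = extractor.invoke({"info_base": DocumentSummaryBase(), "input": "chunk text..."})
# The chain injects the parser's format instructions, runs the prompt and LLM, lightly repairs the
# raw JSON string in preparse, then parses it back into the Pydantic class.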
def RSummarizer(knowledge, llm, prompt, verbose=False):
    '''
    Exercise: Create a chain that summarizes
    '''
    def summarize_docs(docs):
        parse_chain = RunnableAssign({"info_base": RExtract(knowledge.__class__, llm, prompt)})
        state = {"info_base": knowledge}
        all_summaries = []  # List to store all intermediate summaries
        for i, doc in enumerate(docs):
            state['input'] = doc.page_content
            state = parse_chain.invoke(state)
            # Store the current info_base in the list
            all_summaries.append(state['info_base'].dict())
            if verbose:
                print(f"Considered {i+1} documents")
                pprint(state['info_base'].dict())
        return all_summaries
    return RunnableLambda(summarize_docs)
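# Each iteration feeds the previous knowledge base plus the next chunk back through RExtract, so the
# summary is refined incrementally; all_summaries keeps every intermediate state for inspection.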
def find_first_non_empty_summary(summaries):
    # Iterates in reverse, so the most recent (most complete) non-empty summary is returned.
    for summary in reversed(summaries):
        if summary['loose_ends'] or summary['main_ideas'] or summary['running_summary']:
            return summary
    return None
def create_running_summary(url):
    loader = WebBaseLoader(url)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1200, chunk_overlap=100, separators=["\n\n", "\n", ".", ";", ",", " ", ""])
    documents = loader.load()
    docs_split = text_splitter.split_documents(documents)
    summary_prompt = ChatPromptTemplate.from_template("""You are generating a running summary of the document. Make it readable by a technical user.
After this, the old knowledge base will be replaced by the new one. Make sure a reader can still understand everything.
Keep it short, but as dense and useful as possible! The information should flow from chunk to (loose ends or main ideas) to running_summary.
Strictly output a JSON object and nothing else; do not output any extra strings or explanations, just the JSON.
The updated knowledge base must keep all of the information from running_summary here: {info_base}.
{format_instructions}. Follow the format precisely, including quotations and commas.\n\n
{info_base}\nWithout losing any of the info, update the knowledge base with the following: {input}""")
    instruct_model = llm_1 | StrOutputParser()
    summarizer = RSummarizer(DocumentSummaryBase(), instruct_model, summary_prompt, verbose=True)
    summaries = summarizer.invoke(docs_split)
    summary = find_first_non_empty_summary(summaries)
    return summary
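# The page is chunked (1200 characters, 100 overlap) and folded into DocumentSummaryBase chunk by
# chunk; the last non-empty state is returned as the running summary of the linked page.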
def setup_vectorstore():
    embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
    vector_store = Chroma(collection_name="collection-1", embedding_function=embeddings, persist_directory="./vectorstore")
    loader = PyPDFLoader(os.getcwd() + "/transcripts.pdf")
    documents = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=250, chunk_overlap=0, separators=["\n\n"])
    text = text_splitter.split_documents(documents)
    retriever = vector_store.as_retriever()
    retriever.add_documents(text)
    return retriever
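# The transcripts PDF produced earlier is split into small chunks, embedded with OpenAI embeddings,
# and stored in a persistent Chroma collection; the returned retriever is later queried with the
# context summary to pull stylistically similar transcript excerpts.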
def generate(content, examples):
    chat_template = ChatPromptTemplate.from_template("""You are provided with a few sample YouTube video scripts below.
Your task is to create a similar script for the content provided to you below.
Follow the style used in the examples and create a similar script for the content given to you.
Create a script for a YouTube video explaining the following content: {content}.
Here are a few example scripts of my previous videos that you have to adapt: {examples}.""")
    gen_chain = chat_template | llm_2 | StrOutputParser()
    return gen_chain.invoke({"content": content, "examples": examples})
def docs2str(docs, title="Document"):
    out_str = ""
    for doc in docs:
        doc_name = getattr(doc, 'metadata', {}).get('Title', title)
        if doc_name:
            out_str += f"[Quote from {doc_name}] "
        out_str += getattr(doc, 'page_content', str(doc)) + "\n"
    return out_str
# Both chat models are served through MonsterAPI's OpenAI-compatible endpoint. The hard-coded key has
# been redacted and is read from the environment instead (the variable name is an assumption).
llm_1 = ChatOpenAI(
    model="google/gemma-2-9b-it",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
    api_key=os.getenv("MONSTER_API_KEY", "YOUR_MONSTERAPI_KEY"),
    base_url="https://llm.monsterapi.ai/v1/",
    http_client=DefaultHttpxClient(verify=False)
)
llm_2 = ChatOpenAI(
    model="meta-llama/Meta-Llama-3.1-8B-Instruct",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
    api_key=os.getenv("MONSTER_API_KEY", "YOUR_MONSTERAPI_KEY"),
    base_url="https://llm.monsterapi.ai/v1/",
    http_client=DefaultHttpxClient(verify=False)
)
def process_links(style_links, context_link):
    # Transcribe the style reference videos and index their transcripts in the vector store.
    style_links = style_links.split(",")
    style_links = [link.strip() for link in style_links]
    transcribe_and_save(style_links)
    retriever = setup_vectorstore()
    # Summarize the context link and retrieve stylistically similar transcript chunks.
    summary = create_running_summary(context_link)
    summary = summary['running_summary']
    print("Summarized the url successfully:", summary)
    examples = retriever.invoke(summary)
    return generate(summary, examples)
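# Overall pipeline: the style videos are transcribed and indexed, the context page is summarized, and
# the summary serves both as the retrieval query (to fetch example transcript chunks) and as the
# content brief for the final script generation with llm_2.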
# Define the Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("## Link Processor")
    style_links = gr.Textbox(lines=5, placeholder="Enter style links separated by commas", label="Style Links")
    context_link = gr.Textbox(lines=1, placeholder="Enter context link", label="Context Link")
    output = gr.Textbox(lines=2, label="Output")
    process_button = gr.Button("Process")
    process_button.click(process_links, inputs=[style_links, context_link], outputs=output)

demo.launch()