import streamlit as st
from ai_config_faiss import get_ai_assistant
from ttv_web_scraper import db_load_metadata_sets
import json
from datetime import datetime
import os
import base64
# Seed st.session_state with defaults. Keys that already exist are left
# untouched, so values survive across script reruns.
for state_key, default in (
    ('results', None),
    ('where', {}),
    ('num_results', 3),
    ('favorites', {}),
    ('show_filters', True),
):
    if state_key not in st.session_state:
        st.session_state[state_key] = default

# One selection list per filter category.
for filter_type in ('company', 'speaker', 'subjects'):
    selection_key = f'selected_{filter_type}'
    if selection_key not in st.session_state:
        st.session_state[selection_key] = []
@st.cache_resource
def get_assistant():
    """Return the assistant object produced by ``get_ai_assistant``.

    The function is wrapped in ``st.cache_resource``, so Streamlit caches the
    returned object instead of rebuilding it on every call.
    """
    assistant = get_ai_assistant()
    return assistant
def format_timestamp(timestamp):
    """Shorten an ``HH:MM:SS`` timestamp for display.

    Args:
        timestamp: Timestamp string in ``%H:%M:%S`` form.

    Returns:
        ``MM:SS`` when the hour component is zero, ``HH:MM:SS`` otherwise.
        Input that cannot be parsed (wrong format, or not a string at all)
        is returned unchanged.
    """
    try:
        parsed = datetime.strptime(timestamp, "%H:%M:%S")
    except (ValueError, TypeError):
        # ValueError: wrong format; TypeError: non-string input such as None.
        return timestamp
    # Keep the hour when present so positions past the first hour are not
    # misreported as being within it.
    return parsed.strftime("%H:%M:%S" if parsed.hour else "%M:%S")
def get_file_content(file_path):
    """Read a file fully in binary mode.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file's bytes, or ``None`` when the file cannot be opened or read
        (missing file, directory, permission problem, ...).
    """
    # Try the read directly instead of checking existence first: this avoids
    # the check-then-open race and also covers paths that exist but are not
    # readable files.
    try:
        with open(file_path, "rb") as file:
            return file.read()
    except OSError:
        return None
def create_markdown_download_link(markdown_content):
    """Build an HTML download link that embeds the given Markdown text.

    Args:
        markdown_content: Markdown text to offer for download.

    Returns:
        An ``<a>`` element whose ``href`` is a base64 ``data:`` URI holding
        the content, labelled "Download Favorites". It is meant to be
        rendered as raw HTML.
    """
    b64 = base64.b64encode(markdown_content.encode("utf-8")).decode("ascii")
    # The payload has to be embedded in the href; returning only the label
    # text would give the user nothing to click.
    return (
        f'<a href="data:text/markdown;base64,{b64}" '
        'download="favorites.md">Download Favorites</a>'
    )
def update_filter(filter_type, item):
    """Toggle ``item`` in the selection list of ``filter_type``.

    The item is removed if it is currently selected and appended otherwise;
    the active filter mapping is then rebuilt via ``update_where``.
    """
    selected = st.session_state[f'selected_{filter_type}']
    if item not in selected:
        selected.append(item)
    else:
        selected.remove(item)
    update_where()
def update_where():
    """Rebuild ``st.session_state.where`` from the current selections.

    Only filter categories with at least one selected item get an entry.
    """
    active_filters = {}
    for filter_type in ('company', 'speaker', 'subjects'):
        chosen = st.session_state[f'selected_{filter_type}']
        if chosen:
            active_filters[filter_type] = chosen
    st.session_state.where = active_filters
def toggle_show_filters():
    """Flip the ``show_filters`` flag held in session state."""
    currently_shown = st.session_state.show_filters
    st.session_state.show_filters = not currently_shown
def update_num_results():
    """Copy the slider widget's value into ``st.session_state.num_results``."""
    slider_value = st.session_state.num_results_slider
    st.session_state.num_results = slider_value
def submit_query():
    """Query the assistant with the active filters and store the results.

    Warns and does nothing when no filter is selected. Otherwise the
    assistant's response is parsed as JSON into ``st.session_state.results``;
    if parsing fails an error is shown and the previous results are kept.
    """
    active_filters = st.session_state.where
    if not active_filters:
        st.warning("Please select at least one filter before submitting.")
        return

    assistant = get_assistant()
    with st.spinner("Thinking..."):
        response = assistant.query(
            "",
            num_results=st.session_state.num_results,
            filters=st.session_state.where,
        )

    try:
        parsed = json.loads(response)
    except json.JSONDecodeError:
        st.error("Failed to parse the response. Please try again.")
    else:
        st.session_state.results = parsed
def update_favorite(result_id):
    """Toggle the favorite status of the result identified by ``result_id``.

    The result is looked up in the current search results first and, failing
    that, among the stored favorites. That way a favorite saved from an
    earlier search can still be removed after the results have changed.
    Unknown ids are ignored.

    Args:
        result_id: Value of the ``'id'`` field of the result to toggle.
    """
    favorites = st.session_state.favorites
    # ``results`` is None until the first successful query.
    current_results = st.session_state.results or []
    result = next((r for r in current_results if r['id'] == result_id), None)
    if result is None:
        result = favorites.get(result_id)
    if result is None:
        return

    # Membership in ``favorites`` is the source of truth; the per-result
    # flag is kept in sync with it.
    now_favorite = result_id not in favorites
    result['favorite'] = now_favorite
    if now_favorite:
        favorites[result_id] = result
    else:
        favorites.pop(result_id, None)
def clear_favorites():
    """Empty the favorites mapping in session state and confirm to the user."""
    favorites = st.session_state.favorites
    favorites.clear()
    st.success("All favorites have been cleared.")
def save_favorites():
    """Render a download link for a Markdown export of all favorites.

    Each favorite becomes one section with title, speaker, date, time range,
    transcript, video link and (when present) subjects. Shows a warning
    instead when there are no favorites.
    """
    favorites = st.session_state.favorites
    if not favorites:
        st.warning("No favorites selected.")
        return

    parts = ["# Favorites\n\n"]
    for fav in favorites.values():
        meta = fav['metadata']
        parts.append(f"## {meta['title']}\n\n")
        parts.append(f"**Speaker:** {meta['speaker']} ({meta['company']})\n\n")
        parts.append(f"**Date:** {meta['date']}\n\n")
        start = format_timestamp(meta['start_timestamp'])
        end = format_timestamp(meta['end_timestamp'])
        parts.append(f"**Time:** {start} - {end}\n\n")
        parts.append(f"**Transcript:** {fav['content']}\n\n")
        play_link = meta['play']
        modified_play_link = f"{play_link}&controls=1&showinfo=0&modestbranding=1"
        parts.append(f"**Video Link:** [{play_link}]({modified_play_link})\n\n")
        subjects = meta['subjects']
        if subjects:
            parts.append(f"**Subjects:** {', '.join(subjects)}\n\n")
        parts.append("---\n\n")

    markdown_content = "".join(parts)
    st.markdown(create_markdown_download_link(markdown_content), unsafe_allow_html=True)
def display_result(result, favorite_tab=False):
    """Render one result: text details, video, clip download and favorite box.

    Args:
        result: Result dict; this function reads its ``'metadata'``,
            ``'content'``, ``'id'`` and ``'favorite'`` entries.
        favorite_tab: True when rendering inside the Favorites tab. It is
            folded into the widget keys so the same result can be shown in
            both tabs without key collisions.
    """
    meta = result['metadata']
    st.markdown(f"### {meta['title']}")

    details_col, media_col = st.columns([3, 2])

    with details_col:
        st.markdown(f"**Speaker:** {meta['speaker']} ({meta['company']})")
        st.markdown(f"**Date:** {meta['date']}")
        st.markdown("**Transcript:**")
        st.markdown(result['content'])

    with media_col:
        start_time = format_timestamp(meta['start_timestamp'])
        end_time = format_timestamp(meta['end_timestamp'])
        st.markdown(f"**Time:** {start_time} - {end_time}")

        play_url = meta['play']
        if not play_url:
            st.warning("No video found")
        else:
            st.components.v1.iframe(src=play_url, width=300, height=169, scrolling=True)

        if 'download' in meta:
            download_path = meta['download']
            file_name = os.path.basename(download_path)
            file_content = get_file_content(download_path)
            if not file_content:
                st.warning(f"Clip file not found: {file_name}")
            else:
                key_prefix = "fav_dl_" if favorite_tab else "dl_"
                st.download_button(
                    label="Download Clip",
                    data=file_content,
                    file_name=file_name,
                    mime="video/mp4",
                    key=f"{key_prefix}{result['id']}",
                )

    subjects = meta['subjects']
    if subjects:
        st.markdown("**Subjects:**")
        st.markdown(
            ' '.join(f"{subject}" for subject in subjects),
            unsafe_allow_html=True,
        )

    st.checkbox(
        "Favorite",
        value=result['favorite'],
        key=f"fav_{favorite_tab}_{result['id']}",
        on_change=update_favorite,
        args=(result['id'],),
    )
    st.markdown("---")
def main():
    """Build the page: a Search tab (filters, slider, results) and a Favorites tab."""
    st.title("Telecom TV Video Expert")
    st.markdown("Trained on data from [here](https://www.telecomtv.com/content/dsp-leaders-forum-videos/)")

    # Only two of the returned sets are used here. ``companies`` is treated
    # as a mapping whose keys are company names and whose values are
    # collections of speakers (that is how it is consumed below).
    _, _, companies, _sentiments, subjects = db_load_metadata_sets()

    search_tab, favorites_tab = st.tabs(["Search", "Favorites"])

    with search_tab:
        st.header("Filter Options")
        st.checkbox("Show Filters", value=st.session_state.show_filters, on_change=toggle_show_filters)

        if st.session_state.show_filters:
            filter_options = [
                ('company', companies.keys()),
                ('speaker', set().union(*companies.values())),
                ('subjects', subjects),
            ]
            # Pair each filter category with its own column directly rather
            # than resolving column variables by name through locals().
            for column, (filter_type, items) in zip(st.columns(3), filter_options):
                with column:
                    st.subheader(filter_type.capitalize())
                    selected = st.session_state[f'selected_{filter_type}']
                    for item in sorted(items):
                        st.checkbox(
                            item,
                            key=f'{filter_type}_{item}',
                            value=item in selected,
                            on_change=update_filter,
                            args=(filter_type, item),
                        )

        st.slider(
            "Number of relevant transcript excerpts to show:",
            min_value=1,
            max_value=500,
            value=st.session_state.num_results,
            step=1,
            key='num_results_slider',
            on_change=update_num_results,
        )
        st.button("Submit", on_click=submit_query)

        if st.session_state.results:
            for result in st.session_state.results:
                # Re-derive the flag on every run so it always matches the
                # favorites mapping.
                result['favorite'] = result['id'] in st.session_state.favorites
                display_result(result)

    with favorites_tab:
        st.header("Favorites")
        save_col, clear_col = st.columns(2)
        with save_col:
            st.button("Save Favorites", on_click=save_favorites)
        with clear_col:
            st.button("Clear Favorites", on_click=clear_favorites)
        for fav in st.session_state.favorites.values():
            display_result(fav, favorite_tab=True)
# Build the UI only when this file is executed as the main module.
if __name__ == "__main__":
    main()