"""Streamlit front end for the "Telecom TV Video Expert" transcript search.

The page has two tabs: a search tab that filters transcript excerpts by
company, speaker and subject, and a favorites tab that lists the excerpts the
user has marked and lets them be exported as a Markdown download.
"""

import base64
import json
import os
from datetime import datetime

import streamlit as st

from ai_config_faiss import get_ai_assistant
from ttv_web_scraper import db_load_metadata_sets

# Metadata fields the user can filter on; each one has a matching
# ``selected_<type>`` list in the session state.
FILTER_TYPES = ('company', 'speaker', 'subjects')

# Initialize session state
if 'results' not in st.session_state:
    st.session_state.results = None
if 'where' not in st.session_state:
    st.session_state.where = {}
if 'num_results' not in st.session_state:
    st.session_state.num_results = 3
if 'favorites' not in st.session_state:
    st.session_state.favorites = {}
if 'show_filters' not in st.session_state:
    st.session_state.show_filters = True

# Initialize filter selections
for filter_type in FILTER_TYPES:
    if f'selected_{filter_type}' not in st.session_state:
        st.session_state[f'selected_{filter_type}'] = []


@st.cache_resource
def get_assistant():
    """Return the AI assistant, cached by Streamlit across reruns."""
    return get_ai_assistant()


def format_timestamp(timestamp):
    """Format an ``HH:MM:SS`` timestamp for display.

    Returns ``MM:SS`` when the hour is zero and ``HH:MM:SS`` otherwise, so
    that positions past the first hour are not shown ambiguously. Values that
    cannot be parsed (wrong format, or not a string at all) are returned
    unchanged.
    """
    try:
        parsed = datetime.strptime(timestamp, "%H:%M:%S")
    except (ValueError, TypeError):
        return timestamp
    return parsed.strftime("%H:%M:%S" if parsed.hour else "%M:%S")


def get_file_content(file_path):
    """Return the bytes of ``file_path``, or ``None`` if it is not a file.

    An empty/``None`` path and a path that points at a directory are both
    treated as "not found" instead of raising.
    """
    if not file_path or not os.path.isfile(file_path):
        return None
    with open(file_path, "rb") as file:
        return file.read()


def create_markdown_download_link(markdown_content):
    """Return an HTML anchor that downloads ``markdown_content`` as a file.

    The content is embedded in a base64 ``data:`` URI so no server-side file
    is needed. The result must be rendered with ``unsafe_allow_html=True``.
    """
    b64 = base64.b64encode(markdown_content.encode("utf-8")).decode("ascii")
    return (
        f'<a href="data:text/markdown;charset=utf-8;base64,{b64}" '
        f'download="favorites.md">Download Favorites</a>'
    )


def update_filter(filter_type, item):
    """Toggle ``item`` in the selection for ``filter_type`` and rebuild the query filter."""
    selected = st.session_state[f'selected_{filter_type}']
    if item in selected:
        selected.remove(item)
    else:
        selected.append(item)
    update_where()


def update_where():
    """Rebuild ``st.session_state.where`` from the non-empty filter selections."""
    st.session_state.where = {
        filter_type: st.session_state[f'selected_{filter_type}']
        for filter_type in FILTER_TYPES
        if st.session_state[f'selected_{filter_type}']
    }


def toggle_show_filters():
    """Flip the flag that controls whether the filter checkboxes are shown."""
    st.session_state.show_filters = not st.session_state.show_filters


def update_num_results():
    """Copy the slider widget's value into ``st.session_state.num_results``."""
    st.session_state.num_results = st.session_state.num_results_slider


def submit_query():
    """Query the assistant with the selected filters and store the parsed results.

    Shows a warning and does nothing when no filter is selected. The
    assistant's response is parsed as JSON; on a parse failure an error is
    shown and the previous results are left in place.
    """
    if not st.session_state.where:
        st.warning("Please select at least one filter before submitting.")
        return

    assistant = get_assistant()
    with st.spinner("Thinking..."):
        response = assistant.query(
            "",
            num_results=st.session_state.num_results,
            filters=st.session_state.where,
        )

    try:
        st.session_state.results = json.loads(response)
    except json.JSONDecodeError:
        st.error("Failed to parse the response. Please try again.")


def update_favorite(result_id):
    """Toggle the favorite state of the result identified by ``result_id``.

    The toggle is driven by membership in ``st.session_state.favorites`` so
    that an entry can still be removed from the Favorites tab after a new
    search has replaced the result list it originally came from.
    """
    favorites = st.session_state.favorites
    # ``results`` is None until the first successful query.
    results = st.session_state.results or []
    current = next((r for r in results if r['id'] == result_id), None)

    if result_id in favorites:
        removed = favorites.pop(result_id)
        removed['favorite'] = False
        if current is not None:
            current['favorite'] = False
    elif current is not None:
        current['favorite'] = True
        favorites[result_id] = current


def clear_favorites():
    """Remove every favorite and confirm it to the user."""
    st.session_state.favorites.clear()
    st.success("All favorites have been cleared.")


def save_favorites():
    """Render a download link for a Markdown export of all favorites.

    Shows a warning instead when there are no favorites.
    """
    if not st.session_state.favorites:
        st.warning("No favorites selected.")
        return

    sections = ["# Favorites\n\n"]
    for fav in st.session_state.favorites.values():
        meta = fav['metadata']
        sections.append(f"## {meta['title']}\n\n")
        sections.append(f"**Speaker:** {meta['speaker']} ({meta['company']})\n\n")
        sections.append(f"**Date:** {meta['date']}\n\n")
        sections.append(
            f"**Time:** {format_timestamp(meta['start_timestamp'])} - "
            f"{format_timestamp(meta['end_timestamp'])}\n\n"
        )
        sections.append(f"**Transcript:** {fav['content']}\n\n")
        play_link = meta['play']
        # Player options are appended as extra query parameters.
        # NOTE(review): assumes play_link already contains a query string - confirm.
        modified_play_link = f"{play_link}&controls=1&showinfo=0&modestbranding=1"
        sections.append(f"**Video Link:** [{play_link}]({modified_play_link})\n\n")
        if meta['subjects']:
            sections.append(f"**Subjects:** {', '.join(meta['subjects'])}\n\n")
        sections.append("---\n\n")

    markdown_content = "".join(sections)
    st.markdown(create_markdown_download_link(markdown_content), unsafe_allow_html=True)


def display_result(result, favorite_tab=False):
    """Render one transcript excerpt with its video, download and favorite controls.

    Args:
        result: Result dict with ``id``, ``content``, ``favorite`` and a
            ``metadata`` dict (title, speaker, company, date, timestamps,
            play URL, subjects and optionally a ``download`` path).
        favorite_tab: True when rendering inside the Favorites tab; only used
            to keep widget keys unique between the two tabs.
    """
    meta = result['metadata']
    st.markdown(f"### {meta['title']}")

    col1, col2 = st.columns([3, 2])
    with col1:
        st.markdown(f"**Speaker:** {meta['speaker']} ({meta['company']})")
        st.markdown(f"**Date:** {meta['date']}")
        st.markdown("**Transcript:**")
        st.markdown(result['content'])
    with col2:
        start_time = format_timestamp(meta['start_timestamp'])
        end_time = format_timestamp(meta['end_timestamp'])
        st.markdown(f"**Time:** {start_time} - {end_time}")

        play_url = meta['play']
        if play_url:
            st.components.v1.iframe(src=play_url, width=300, height=169, scrolling=True)
        else:
            st.warning("No video found")

        if 'download' in meta:
            download_path = meta['download']
            file_name = os.path.basename(download_path)
            file_content = get_file_content(download_path)
            if file_content:
                # The same result can appear in both tabs, so the key is
                # prefixed per tab to avoid duplicate widget keys.
                prefix = "fav_dl_" if favorite_tab else "dl_"
                st.download_button(
                    label="Download Clip",
                    data=file_content,
                    file_name=file_name,
                    mime="video/mp4",
                    key=f"{prefix}{result['id']}",
                )
            else:
                st.warning(f"Clip file not found: {file_name}")

    if meta['subjects']:
        st.markdown("**Subjects:**")
        subject_tags = ' '.join(f"{subject}" for subject in meta['subjects'])
        st.markdown(subject_tags, unsafe_allow_html=True)

    favorite_key = f"fav_{favorite_tab}_{result['id']}"
    st.checkbox(
        "Favorite",
        value=result['favorite'],
        key=favorite_key,
        on_change=update_favorite,
        args=(result['id'],),
    )
    st.markdown("---")


def main():
    """Build the page: title, the Search tab and the Favorites tab."""
    st.title("Telecom TV Video Expert")
    st.markdown("Trained on data from [here](https://www.telecomtv.com/content/dsp-leaders-forum-videos/)")

    # Only the company mapping and the subjects are used by this page.
    _, _, companies, _, subjects = db_load_metadata_sets()

    tab1, tab2 = st.tabs(["Search", "Favorites"])

    with tab1:
        st.header("Filter Options")
        st.checkbox("Show Filters", value=st.session_state.show_filters, on_change=toggle_show_filters)

        if st.session_state.show_filters:
            filter_items = {
                'company': companies.keys(),
                # Speakers are the union of the values of the company mapping.
                'speaker': set().union(*companies.values()),
                'subjects': subjects,
            }
            # One column per filter type, in FILTER_TYPES order.
            columns = st.columns(len(FILTER_TYPES))
            for column, filter_type in zip(columns, FILTER_TYPES):
                with column:
                    st.subheader(filter_type.capitalize())
                    for item in sorted(filter_items[filter_type]):
                        st.checkbox(
                            item,
                            key=f'{filter_type}_{item}',
                            value=item in st.session_state[f'selected_{filter_type}'],
                            on_change=update_filter,
                            args=(filter_type, item),
                        )

        st.slider(
            "Number of relevant transcript excerpts to show:",
            min_value=1,
            max_value=500,
            value=st.session_state.num_results,
            step=1,
            key='num_results_slider',
            on_change=update_num_results,
        )
        st.button("Submit", on_click=submit_query)

        if st.session_state.results:
            for result in st.session_state.results:
                # Re-derive the flag on every run so the checkbox reflects the
                # current favorites, including changes made in the other tab.
                result['favorite'] = result['id'] in st.session_state.favorites
                display_result(result)

    with tab2:
        st.header("Favorites")
        col1, col2 = st.columns(2)
        with col1:
            st.button("Save Favorites", on_click=save_favorites)
        with col2:
            st.button("Clear Favorites", on_click=clear_favorites)

        for fav in st.session_state.favorites.values():
            display_result(fav, favorite_tab=True)


if __name__ == "__main__":
    main()