| # Utils.py | |
| ######################################### | |
| # General Utilities Library | |
| # This library is used to hold random utilities used by various other libraries. | |
| # | |
| #### | |
| #################### | |
| # Function List | |
| # | |
| # 1. extract_text_from_segments(segments: List[Dict]) -> str | |
| # 2. download_file(url, dest_path, expected_checksum=None, max_retries=3, delay=5) | |
| # 3. verify_checksum(file_path, expected_checksum) | |
| # 4. create_download_directory(title) | |
| # 5. sanitize_filename(filename) | |
| # 6. normalize_title(title) | |
| # 7. | |
| # | |
| # | |
| # | |
| #################### | |
| # Import necessary libraries | |
| import configparser | |
| import hashlib | |
| import json | |
| import logging | |
| import os | |
| import re | |
| import time | |
| from datetime import timedelta | |
| from urllib.parse import urlparse, parse_qs, urlencode, urlunparse | |
| import requests | |
| import unicodedata | |
| from tqdm import tqdm | |
| ####################################################################################################################### | |
| # Function Definitions | |
| # | |
def extract_text_from_segments(segments):
    """Pull the transcription text out of an arbitrarily nested segment structure.

    Recursively searches dicts and lists for a 'Text' key; results found in a
    list are joined with single spaces.  Returns an error string when no text
    can be located.
    """
    logging.debug(f"Segments received: {segments}")
    logging.debug(f"Type of segments: {type(segments)}")

    def _walk(node):
        # Depth-first search: first 'Text' value wins in a dict; for a list,
        # join whatever each element yields, skipping empty results.
        if isinstance(node, dict):
            for k, v in node.items():
                if k == 'Text':
                    return v
                if isinstance(v, (dict, list)):
                    found = _walk(v)
                    if found:
                        return found
        elif isinstance(node, list):
            pieces = [_walk(item) for item in node]
            return ' '.join(p for p in pieces if p)
        return None

    text = _walk(segments)
    if text:
        return text.strip()
    logging.error(f"Unable to extract text from segments: {segments}")
    return "Error: Unable to extract transcription"
def import_data(file):
    """Import data from *file* into the application.

    NOTE(review): unimplemented stub — callers currently get None back.
    """
    # Implement this function to import data from a file
    pass
| # | |
| # | |
| ####################### | |
| # Temp file cleanup | |
| # | |
| # Global list to keep track of downloaded files | |
# Module-level registry of files fetched during this run; cleanup_downloads()
# removes them when the server shuts down.
downloaded_files = []


def cleanup_downloads():
    """Function to clean up downloaded files when the server exits."""
    for path in downloaded_files:
        try:
            if os.path.exists(path):
                os.remove(path)
                print(f"Cleaned up file: {path}")
        except Exception as e:
            # Best-effort cleanup: report and keep going so one bad path
            # doesn't leave the rest of the files behind.
            print(f"Error cleaning up file {path}: {e}")
| # | |
| # | |
| ####################################################################################################################### | |
| ####################################################################################################################### | |
| # Config loading | |
| # | |
def load_comprehensive_config():
    """Read config.txt from the project root (one level above this script).

    Returns a populated ConfigParser; raises FileNotFoundError when the file
    cannot be read.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.dirname(script_dir)
    config_path = os.path.join(project_root, 'config.txt')

    parser = configparser.ConfigParser()
    # ConfigParser.read returns the list of files it managed to parse; an
    # empty list means the config file was missing or unreadable.
    if not parser.read(config_path):
        raise FileNotFoundError(f"Config file not found at {config_path}")
    return parser
def load_and_log_configs():
    """Load config.txt and return all settings as a nested dict.

    Returns a dict with keys 'api_keys', 'models', 'local_api_ip',
    'output_path', 'processing_choice' and 'prompt_path', or None when the
    config cannot be loaded.  API keys are logged redacted (first/last five
    characters only) so full secrets never reach the log.

    Fixes vs. the previous version:
    - a missing API key no longer aborts the whole load (the old code sliced
      None in the log f-string, raising TypeError into the catch-all);
    - the aphrodite API key is now actually returned, not just loaded;
    - prompt_path is included in the return value (resolves the old FIXME).
    """
    try:
        config = load_comprehensive_config()
        if config is None:
            logging.error("Config is None, cannot proceed")
            return None

        def _api_key(option, label):
            # Read one key from the [API] section and log a redacted preview.
            key = config.get('API', option, fallback=None)
            preview = f"{key[:5]}...{key[-5:]}" if key else None
            logging.debug(f"Loaded {label} API Key: {preview}")
            return key

        anthropic_api_key = _api_key('anthropic_api_key', 'Anthropic')
        cohere_api_key = _api_key('cohere_api_key', 'Cohere')
        groq_api_key = _api_key('groq_api_key', 'Groq')
        openai_api_key = _api_key('openai_api_key', 'OpenAI')
        huggingface_api_key = _api_key('huggingface_api_key', 'HuggingFace')
        openrouter_api_key = _api_key('openrouter_api_key', 'OpenRouter')
        deepseek_api_key = _api_key('deepseek_api_key', 'DeepSeek')
        mistral_api_key = _api_key('mistral_api_key', 'Mistral')

        # Hosted model names, with the same fallbacks as before.
        anthropic_model = config.get('API', 'anthropic_model', fallback='claude-3-sonnet-20240229')
        cohere_model = config.get('API', 'cohere_model', fallback='command-r-plus')
        groq_model = config.get('API', 'groq_model', fallback='llama3-70b-8192')
        openai_model = config.get('API', 'openai_model', fallback='gpt-4-turbo')
        huggingface_model = config.get('API', 'huggingface_model', fallback='CohereForAI/c4ai-command-r-plus')
        openrouter_model = config.get('API', 'openrouter_model', fallback='microsoft/wizardlm-2-8x22b')
        deepseek_model = config.get('API', 'deepseek_model', fallback='deepseek-chat')
        mistral_model = config.get('API', 'mistral_model', fallback='mistral-large-latest')
        logging.debug(f"Loaded Anthropic Model: {anthropic_model}")
        logging.debug(f"Loaded Cohere Model: {cohere_model}")
        logging.debug(f"Loaded Groq Model: {groq_model}")
        logging.debug(f"Loaded OpenAI Model: {openai_model}")
        logging.debug(f"Loaded HuggingFace Model: {huggingface_model}")
        logging.debug(f"Loaded OpenRouter Model: {openrouter_model}")
        logging.debug(f"Loaded Deepseek Model: {deepseek_model}")
        logging.debug(f"Loaded Mistral Model: {mistral_model}")

        # Local / self-hosted API endpoints and credentials.
        kobold_api_ip = config.get('Local-API', 'kobold_api_IP', fallback='http://127.0.0.1:5000/api/v1/generate')
        kobold_api_key = config.get('Local-API', 'kobold_api_key', fallback='')
        llama_api_IP = config.get('Local-API', 'llama_api_IP', fallback='http://127.0.0.1:8080/v1/chat/completions')
        llama_api_key = config.get('Local-API', 'llama_api_key', fallback='')
        ooba_api_IP = config.get('Local-API', 'ooba_api_IP', fallback='http://127.0.0.1:5000/v1/chat/completions')
        ooba_api_key = config.get('Local-API', 'ooba_api_key', fallback='')
        tabby_api_IP = config.get('Local-API', 'tabby_api_IP', fallback='http://127.0.0.1:5000/api/v1/generate')
        tabby_api_key = config.get('Local-API', 'tabby_api_key', fallback=None)
        tabby_model = config.get('models', 'tabby_model', fallback=None)
        # NOTE(review): port 500 looks like a typo for 8000/5000, but it is
        # kept as-is because deployments may already rely on it — confirm.
        vllm_api_url = config.get('Local-API', 'vllm_api_IP', fallback='http://127.0.0.1:500/api/v1/chat/completions')
        vllm_api_key = config.get('Local-API', 'vllm_api_key', fallback=None)
        vllm_model = config.get('Local-API', 'vllm_model', fallback=None)
        ollama_api_url = config.get('Local-API', 'ollama_api_IP', fallback='http://127.0.0.1:11434/api/generate')
        ollama_api_key = config.get('Local-API', 'ollama_api_key', fallback=None)
        ollama_model = config.get('Local-API', 'ollama_model', fallback=None)
        aphrodite_api_url = config.get('Local-API', 'aphrodite_api_IP', fallback='http://127.0.0.1:8080/v1/chat/completions')
        aphrodite_api_key = config.get('Local-API', 'aphrodite_api_key', fallback='')
        logging.debug(f"Loaded Kobold API IP: {kobold_api_ip}")
        logging.debug(f"Loaded Llama API IP: {llama_api_IP}")
        logging.debug(f"Loaded Ooba API IP: {ooba_api_IP}")
        logging.debug(f"Loaded Tabby API IP: {tabby_api_IP}")
        logging.debug(f"Loaded VLLM API URL: {vllm_api_url}")

        # Retrieve output paths from the configuration file
        output_path = config.get('Paths', 'output_path', fallback='results')
        logging.debug(f"Output path set to: {output_path}")

        # Retrieve processing choice from the configuration file
        processing_choice = config.get('Processing', 'processing_choice', fallback='cpu')
        logging.debug(f"Processing choice set to: {processing_choice}")

        # Prompt database path
        prompt_path = config.get('Prompts', 'prompt_path', fallback='prompts.db')

        return {
            'api_keys': {
                'anthropic': anthropic_api_key,
                'cohere': cohere_api_key,
                'groq': groq_api_key,
                'openai': openai_api_key,
                'huggingface': huggingface_api_key,
                'openrouter': openrouter_api_key,
                'deepseek': deepseek_api_key,
                'mistral': mistral_api_key,
                'kobold': kobold_api_key,
                'llama': llama_api_key,
                'ooba': ooba_api_key,
                'tabby': tabby_api_key,
                'vllm': vllm_api_key,
                'ollama': ollama_api_key,
                'aphrodite': aphrodite_api_key  # was loaded but never returned
            },
            'models': {
                'anthropic': anthropic_model,
                'cohere': cohere_model,
                'groq': groq_model,
                'openai': openai_model,
                'huggingface': huggingface_model,
                'openrouter': openrouter_model,
                'deepseek': deepseek_model,
                'mistral': mistral_model,
                'vllm': vllm_model,
                'tabby': tabby_model,
                'ollama': ollama_model
            },
            'local_api_ip': {
                'kobold': kobold_api_ip,
                'llama': llama_api_IP,
                'ooba': ooba_api_IP,
                'tabby': tabby_api_IP,
                'vllm': vllm_api_url,
                'ollama': ollama_api_url,
                'aphrodite': aphrodite_api_url
            },
            'output_path': output_path,
            'processing_choice': processing_choice,
            'prompt_path': prompt_path  # previously missing (FIXME)
        }
    except Exception as e:
        logging.error(f"Error loading config: {str(e)}")
        return None
| # | |
| # End of Config loading | |
| ####################################################################################################################### | |
| ####################################################################################################################### | |
| # | |
| # Prompt Handling Functions | |
| # | |
| # End of Prompt Handling Functions | |
| ### ############################################################################################################# | |
| ####################################################################################################################### | |
| # | |
| # Misc-Functions | |
| # Log file | |
| # logging.basicConfig(filename='debug-runtime.log', encoding='utf-8', level=logging.DEBUG) | |
def format_metadata_as_text(metadata):
    """Render a video-metadata dict as human-readable text, one field per line.

    None values are skipped; lists are comma-joined; an 8-character
    upload_date becomes YYYY-MM-DD; view/like counts get thousands
    separators; duration (seconds) becomes HH:MM:SS.
    """
    if not metadata:
        return "No metadata available"

    parts = ["Video Metadata:"]
    for key, value in metadata.items():
        if value is None:
            # Omit fields the extractor did not provide.
            continue
        if isinstance(value, list):
            rendered = ", ".join(str(item) for item in value)
        elif key == 'upload_date' and len(str(value)) == 8:
            # YYYYMMDD -> YYYY-MM-DD
            rendered = f"{value[:4]}-{value[4:6]}-{value[6:]}"
        elif key in ['view_count', 'like_count']:
            # Thousands separators for readability.
            rendered = f"{value:,}"
        elif key == 'duration':
            # Seconds -> HH:MM:SS
            hours, rem = divmod(value, 3600)
            minutes, seconds = divmod(rem, 60)
            rendered = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
        else:
            rendered = str(value)
        parts.append(f"{key.capitalize()}: {rendered}")
    return "\n".join(parts)
| # # Example usage: | |
| # example_metadata = { | |
| # 'title': 'Sample Video Title', | |
| # 'uploader': 'Channel Name', | |
| # 'upload_date': '20230615', | |
| # 'view_count': 1000000, | |
| # 'like_count': 50000, | |
| # 'duration': 3725, # 1 hour, 2 minutes, 5 seconds | |
| # 'tags': ['tag1', 'tag2', 'tag3'], | |
| # 'description': 'This is a sample video description.' | |
| # } | |
| # | |
| # print(format_metadata_as_text(example_metadata)) | |
def convert_to_seconds(time_str):
    """Convert a duration to integer seconds.

    Accepts "HH:MM:SS", "MM:SS", "SS", a bare int/float number of seconds,
    or a falsy value (returns 0).  Raises ValueError for anything else.
    """
    if not time_str:
        return 0
    # Generalization/bug fix: the comment always promised "if it's already a
    # number, assume seconds", but calling .isdigit() on an int raised
    # AttributeError.  Numeric arguments are now handled directly.
    if isinstance(time_str, (int, float)):
        return int(time_str)
    # A plain digit string is already a count of seconds.
    if time_str.isdigit():
        return int(time_str)
    # Parse time string in format HH:MM:SS, MM:SS, or SS.
    parts = time_str.split(':')
    if len(parts) == 3:
        h, m, s = (int(p) for p in parts)
        return int(timedelta(hours=h, minutes=m, seconds=s).total_seconds())
    if len(parts) == 2:
        m, s = (int(p) for p in parts)
        return int(timedelta(minutes=m, seconds=s).total_seconds())
    if len(parts) == 1:
        return int(parts[0])
    raise ValueError(f"Invalid time format: {time_str}")
| # | |
| # End of Misc-Functions | |
| ####################################################################################################################### | |
| ####################################################################################################################### | |
| # | |
| # File-saving Function Definitions | |
def save_to_file(video_urls, filename):
    """Write one video URL per line to *filename* (overwrites any existing file)."""
    with open(filename, 'w') as file:
        file.write('\n'.join(video_urls))
    # Bug fix: the confirmation message previously printed a literal
    # placeholder instead of the destination path.
    print(f"Video URLs saved to {filename}")
def save_segments_to_json(segments, file_name="transcription_segments.json"):
    """
    Save transcription segments to a JSON file.

    Parameters:
    segments (list): List of transcription segments
    file_name (str): Name of the JSON file to save (default: "transcription_segments.json")

    Returns:
    str: Path to the saved JSON file
    """
    # All transcription artifacts live under a local "Results" directory.
    os.makedirs("Results", exist_ok=True)
    destination = os.path.join("Results", file_name)
    with open(destination, 'w', encoding='utf-8') as handle:
        # ensure_ascii=False keeps non-Latin transcript text readable on disk.
        json.dump(segments, handle, ensure_ascii=False, indent=4)
    return destination
def download_file(url, dest_path, expected_checksum=None, max_retries=3, delay=5):
    """Download *url* to *dest_path* with resume, retry and checksum support.

    A partial download is kept at dest_path + '.tmp' and resumed via an HTTP
    Range request.  Retries up to *max_retries* times, sleeping *delay*
    seconds between attempts.  When *expected_checksum* (SHA-256 hex digest)
    is given, the file is verified before being moved into place.

    Returns dest_path on success; re-raises the last error after the final
    failed attempt.
    """
    temp_path = dest_path + '.tmp'
    for attempt in range(max_retries):
        try:
            # Ask the server to resume if a partial file is already present.
            resume_header = {}
            if os.path.exists(temp_path):
                resume_header = {'Range': f'bytes={os.path.getsize(temp_path)}-'}

            response = requests.get(url, stream=True, headers=resume_header)
            response.raise_for_status()

            # Bug fix: a server that honors the Range request answers with
            # HTTP 206 (Partial Content).  The old code looked for a 'Range'
            # key in the *response* headers — 'Range' is a request header, so
            # resumed downloads silently restarted in 'wb' mode while the
            # progress bar still began at the partial size.
            resuming = response.status_code == 206
            mode = 'ab' if resuming else 'wb'
            initial_pos = os.path.getsize(temp_path) if resuming else 0

            # On a 206 reply, content-length covers only the remaining bytes,
            # so add the already-downloaded prefix for an accurate bar total.
            total_size = int(response.headers.get('content-length', 0)) + initial_pos

            with open(temp_path, mode) as temp_file, tqdm(
                total=total_size, unit='B', unit_scale=True, desc=dest_path, initial=initial_pos, ascii=True
            ) as pbar:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # filter out keep-alive new chunks
                        temp_file.write(chunk)
                        pbar.update(len(chunk))

            # Verify the checksum if provided
            if expected_checksum:
                if not verify_checksum(temp_path, expected_checksum):
                    os.remove(temp_path)
                    raise ValueError("Downloaded file's checksum does not match the expected checksum")

            # Move the file to the final destination
            os.rename(temp_path, dest_path)
            print("Download complete and verified!")
            return dest_path
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                print("Max retries reached. Download failed.")
                raise
def create_download_directory(title):
    """Create (if needed) and return Results/<normalized title> for a download session."""
    base_dir = "Results"
    # Strip characters that are illegal in Windows filenames and ASCII-fold.
    safe_title = normalize_title(title)
    logging.debug(f"{title} successfully normalized")
    session_path = os.path.join(base_dir, safe_title)
    if os.path.exists(session_path):
        logging.debug(f"Directory already exists for downloaded video: {session_path}")
    else:
        os.makedirs(session_path, exist_ok=True)
        logging.debug(f"Created directory for downloaded video: {session_path}")
    return session_path
def safe_read_file(file_path):
    """Read a text file, trying several encodings in order.

    Returns the decoded contents on success, or an explanatory error string —
    this function never raises.
    """
    # Bug fix: 'latin-1' (and its alias 'iso-8859-1') maps every possible
    # byte, so it can never raise UnicodeDecodeError.  In the old order it
    # sat before 'cp1252', making that encoding — and the final failure
    # message — unreachable.  cp1252 is now tried first and latin-1 is kept
    # last as the guaranteed catch-all; the duplicate alias is dropped.
    encodings = ['utf-8', 'utf-16', 'ascii', 'cp1252', 'latin-1']
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as file:
                return file.read()
        except UnicodeDecodeError:
            continue
        except FileNotFoundError:
            return f"File not found: {file_path}"
        except Exception as e:
            return f"An error occurred: {e}"
    return f"Unable to decode the file {file_path} with any of the attempted encodings: {encodings}"
| # | |
| # End of Files-saving Function Definitions | |
| ####################################################################################################################### | |
| ####################################################################################################################### | |
| # | |
| # UUID-Functions | |
def generate_unique_filename(base_path, base_filename):
    """Generate a unique filename by appending a counter if necessary."""
    stem, ext = os.path.splitext(base_filename)
    candidate = base_filename
    n = 1
    # Bump the counter until the name is free under base_path.
    while os.path.exists(os.path.join(base_path, candidate)):
        candidate = f"{stem}_{n}{ext}"
        n += 1
    return candidate
def generate_unique_identifier(file_path):
    """Return an identifier of the form "local:<timestamp>:<md5-8>:<basename>".

    The MD5 digest only fingerprints the file content for identification —
    it is not used as a security measure.
    """
    filename = os.path.basename(file_path)
    timestamp = int(time.time())
    # Hash the file content so identical files collide on the same id part.
    hasher = hashlib.md5()
    with open(file_path, 'rb') as f:
        hasher.update(f.read())
    content_hash = hasher.hexdigest()[:8]  # first 8 hex chars are enough here
    # Bug fix: `filename` was computed but the returned string ended with a
    # literal placeholder instead of the file's basename.
    return f"local:{timestamp}:{content_hash}:{filename}"
| # | |
| # End of UUID-Functions | |
| ####################################################################################################################### | |
| ####################################################################################################################### | |
| # | |
| # Backup code | |
| # | |
| # End of backup code | |
| ####################################################################################################################### | |
| ####################################################################################################################### | |
| # | |
| # Sanitization/Verification Functions | |
| # Helper function to validate URL format | |
def is_valid_url(url: str) -> bool:
    """Return True when *url* looks like a well-formed http/https/ftp(s) URL."""
    pattern = re.compile(
        r'^(?:http|ftp)s?://'  # scheme: http(s) or ftp(s)
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
        r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    return pattern.match(url) is not None
def verify_checksum(file_path, expected_checksum):
    """Return True when the SHA-256 hex digest of *file_path* equals *expected_checksum*."""
    digest = hashlib.sha256()
    # Hash in 4 KiB chunks so large files never need to fit in memory.
    with open(file_path, 'rb') as f:
        while True:
            chunk = f.read(4096)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest() == expected_checksum
def normalize_title(title):
    """ASCII-fold *title* and strip or replace characters illegal in Windows filenames."""
    # NFKD-decompose, then drop anything that doesn't survive an ASCII round-trip.
    ascii_title = unicodedata.normalize('NFKD', title).encode('ascii', 'ignore').decode('ascii')
    # Path separators and ':' become '_'; the other reserved characters vanish.
    replacements = {'/': '_', '\\': '_', ':': '_',
                    '"': '', '*': '', '?': '', '<': '', '>': '', '|': ''}
    return ascii_title.translate(str.maketrans(replacements))
def clean_youtube_url(url):
    """Strip the playlist ('list') query parameter from a YouTube URL, leaving the rest intact."""
    parsed = urlparse(url)
    params = parse_qs(parsed.query)
    # Drop the playlist context if present; everything else is preserved.
    params.pop('list', None)
    rebuilt_query = urlencode(params, doseq=True)
    return urlunparse(parsed._replace(query=rebuilt_query))
def sanitize_filename(filename):
    """Drop Windows-reserved characters and collapse whitespace runs to single spaces."""
    without_reserved = re.sub(r'[<>:"/\\|?*]', '', filename)
    return re.sub(r'\s+', ' ', without_reserved).strip()
def format_transcription(content):
    """Format raw transcription text for HTML display.

    Turns escaped "\\n" sequences into real newlines, normalizes spacing
    after sentence-ending periods, and joins lines with <br> tags.
    """
    # Replace literal backslash-n sequences with actual line breaks.
    content = content.replace('\\n', '\n')
    formatted_lines = []
    for line in content.split('\n'):
        # Ensure a space after every period, then collapse the double space
        # this creates when one was already there.  (Bug fix: the second
        # replace was a no-op; it now collapses '.  ' to '. '.)
        line = line.replace('.', '. ').replace('.  ', '. ')
        # Split into sentences on terminal punctuation followed by spaces.
        sentences = re.split('(?<=[.!?]) +', line)
        # Trim whitespace from each sentence; drop empties.
        cleaned = [sentence.strip() for sentence in sentences if sentence.strip()]
        formatted_lines.append(' '.join(cleaned))
    # Join the per-line results with HTML line breaks.
    return '<br>'.join(formatted_lines)
def format_file_path(file_path, fallback_path=None):
    """Return *file_path* when it exists on disk, else an existing *fallback_path*, else None."""
    if file_path and os.path.exists(file_path):
        logging.debug(f"File exists: {file_path}")
        return file_path
    if fallback_path and os.path.exists(fallback_path):
        logging.debug(f"File does not exist: {file_path}. Returning fallback path: {fallback_path}")
        return fallback_path
    logging.debug(f"File does not exist: {file_path}. No fallback path available.")
    return None
| # | |
| # End of Sanitization/Verification Functions | |
| ####################################################################################################################### | |
| ####################################################################################################################### | |
| # | |
| # DB Config Loading | |
def get_db_config():
    """Read the [Database] section of ./config.txt and return its settings.

    Raises KeyError when the section or the required 'type' option is absent
    (matching the original behavior).
    """
    parser = configparser.ConfigParser()
    parser.read('config.txt')
    db = parser['Database']
    return {
        'type': db['type'],  # required — no fallback by design
        'sqlite_path': db.get('sqlite_path', 'media_summary.db'),
        'elasticsearch_host': db.get('elasticsearch_host', 'localhost'),
        'elasticsearch_port': parser.getint('Database', 'elasticsearch_port', fallback=9200),
    }
| # | |
| # End of DB Config Loading | |
| ####################################################################################################################### | |