Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| import yaml | |
| import sqlite3 | |
| import traceback | |
| import time | |
| import zipfile | |
| import tempfile | |
| import shutil | |
| import gradio as gr | |
| import logging | |
# Configure logging: timestamped, level-tagged messages at INFO and above.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Placeholder database handle. The import code in this file calls
# db.get_connection(), so replace this with your actual database connection.
db = None
class DatabaseError(Exception):
    """Exception type for database-related failures.

    Nothing in the code shown in this file raises it; it is available for
    callers and for the database layer.
    """
def scan_obsidian_vault(vault_path):
    """Return the paths of the Markdown notes found under ``vault_path``.

    The directory tree is walked recursively and every file whose name ends
    in ``.md`` is collected. Sub-directories whose name starts with a dot
    (for example ``.obsidian`` or ``.trash``) are skipped so that plugin
    files and trashed notes are not imported as notes. Directories and files
    are visited in sorted order, which makes the result deterministic.

    Args:
        vault_path: Root directory of the vault.

    Returns:
        A list of file paths (each joined onto ``vault_path``). The list is
        empty when the path does not exist or contains no ``.md`` files.
    """
    markdown_files = []
    for root, dirs, files in os.walk(vault_path):
        # Assigning to dirs[:] prunes the walk in place: os.walk will not
        # descend into the directories removed here.
        dirs[:] = sorted(d for d in dirs if not d.startswith('.'))
        markdown_files.extend(
            os.path.join(root, name)
            for name in sorted(files)
            if name.endswith('.md')
        )
    return markdown_files
def parse_obsidian_note(file_path):
    """Read one Markdown note and split it into its parts.

    A leading ``---`` ... ``---`` block is parsed as YAML frontmatter and
    removed from the content. Tags (``#word``) and wiki links (``[[...]]``)
    are then extracted from the remaining content with regular expressions.

    Args:
        file_path: Path of the note; it is read as UTF-8.

    Returns:
        A dict with the keys ``title`` (file name without its trailing
        ``.md``), ``content`` (text after the frontmatter), ``frontmatter``
        (always a dict), ``tags``, ``links`` and ``file_path``.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
        yaml.YAMLError: If the frontmatter block is not valid YAML.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    frontmatter = {}
    frontmatter_match = re.match(r'^---\s*\n(.*?)\n---\s*\n', content, re.DOTALL)
    if frontmatter_match:
        parsed = yaml.safe_load(frontmatter_match.group(1))
        # safe_load returns None for a blank block and may return a scalar or
        # a list; callers use frontmatter.get(...), so keep only real mappings.
        if isinstance(parsed, dict):
            frontmatter = parsed
        content = content[frontmatter_match.end():]

    tags = re.findall(r'#(\w+)', content)
    links = re.findall(r'\[\[(.*?)\]\]', content)

    # Strip only the trailing extension. str.replace would delete every
    # ".md" occurrence inside the name (e.g. "a.md.notes.md" -> "a.notes").
    title = os.path.basename(file_path)
    if title.endswith('.md'):
        title = title[:-len('.md')]

    return {
        'title': title,
        'content': content,
        'frontmatter': frontmatter,
        'tags': tags,
        'links': links,
        'file_path': file_path,
    }
def import_obsidian_note_to_db(note_data):
    """Insert or update one parsed Obsidian note in the database.

    An existing ``Media`` row of type ``'obsidian_note'`` is looked up by
    title. If found, its content, author and ingestion date are updated and
    its ``MediaKeywords`` links are deleted; otherwise a new row is inserted
    with the note's file path stored in ``url``. Afterwards every tag is
    linked through ``Keywords``/``MediaKeywords``, the frontmatter is stored
    as YAML in a new ``MediaModifications`` row, and the ``media_fts`` entry
    for the note is replaced.

    Args:
        note_data: Mapping with the keys ``title``, ``content``,
            ``frontmatter``, ``tags`` and ``file_path`` (the shape returned
            by ``parse_obsidian_note``).

    Returns:
        ``(True, None)`` on success. When the import raises, the error is
        logged and ``(False, error_message)`` is returned instead.
    """
    # Bound before the try block: both handlers below read it, and a failure
    # that happens before the SELECT (e.g. no usable db connection) would
    # otherwise raise UnboundLocalError from inside the handler.
    existing_note = None
    try:
        # A note without a frontmatter mapping has no author; fall back to
        # an empty dict so .get() and yaml.dump() below always work.
        frontmatter = note_data['frontmatter']
        if not isinstance(frontmatter, dict):
            frontmatter = {}
        author = frontmatter.get('author', 'Unknown')

        with db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT id FROM Media WHERE title = ? AND type = 'obsidian_note'", (note_data['title'],))
            existing_note = cursor.fetchone()
            if existing_note:
                media_id = existing_note[0]
                cursor.execute("""
                    UPDATE Media
                    SET content = ?, author = ?, ingestion_date = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (note_data['content'], author, media_id))
                # Drop the old tag links; they are rebuilt from the note below.
                cursor.execute("DELETE FROM MediaKeywords WHERE media_id = ?", (media_id,))
            else:
                cursor.execute("""
                    INSERT INTO Media (title, content, type, author, ingestion_date, url)
                    VALUES (?, ?, 'obsidian_note', ?, CURRENT_TIMESTAMP, ?)
                """, (note_data['title'], note_data['content'], author,
                      note_data['file_path']))
                media_id = cursor.lastrowid

            for tag in note_data['tags']:
                cursor.execute("INSERT OR IGNORE INTO Keywords (keyword) VALUES (?)", (tag,))
                cursor.execute("SELECT id FROM Keywords WHERE keyword = ?", (tag,))
                keyword_id = cursor.fetchone()[0]
                cursor.execute("INSERT OR IGNORE INTO MediaKeywords (media_id, keyword_id) VALUES (?, ?)",
                               (media_id, keyword_id))

            frontmatter_str = yaml.dump(frontmatter)
            cursor.execute("""
                INSERT INTO MediaModifications (media_id, prompt, summary, modification_date)
                VALUES (?, 'Obsidian Frontmatter', ?, CURRENT_TIMESTAMP)
            """, (media_id, frontmatter_str))

            # Update full-text search index
            cursor.execute('INSERT OR REPLACE INTO media_fts (rowid, title, content) VALUES (?, ?, ?)',
                           (media_id, note_data['title'], note_data['content']))

        action = "Updated" if existing_note else "Imported"
        logger.info(f"{action} Obsidian note: {note_data['title']}")
        return True, None
    except sqlite3.Error as e:
        error_msg = f"Database error {'updating' if existing_note else 'importing'} note {note_data['title']}: {str(e)}"
        logger.error(error_msg)
        return False, error_msg
    except Exception as e:
        error_msg = f"Unexpected error {'updating' if existing_note else 'importing'} note {note_data['title']}: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return False, error_msg
def import_obsidian_vault(vault_path, progress=gr.Progress()):
    """Import every Markdown note found under ``vault_path``.

    Each file returned by ``scan_obsidian_vault`` is parsed with
    ``parse_obsidian_note`` and stored with ``import_obsidian_note_to_db``.
    A failure on one file is logged and recorded, and the loop continues
    with the next file.

    Args:
        vault_path: Directory containing the vault.
        progress: Gradio progress object; it is called after every file with
            the completed fraction and a status message. The
            ``gr.Progress()`` default is the form Gradio documents for
            progress parameters.

    Returns:
        A tuple ``(imported_files, total_files, errors)`` where ``errors`` is
        a list of error message strings. ``(0, 0, [message])`` is returned
        when ``vault_path`` is not a directory or scanning fails.
    """
    try:
        # os.walk silently yields nothing for a missing path, which would be
        # reported as a successful import of 0 files. Report it explicitly.
        if not os.path.isdir(vault_path):
            error_msg = f"Vault path is not a directory: {vault_path}"
            logger.error(error_msg)
            return 0, 0, [error_msg]

        markdown_files = scan_obsidian_vault(vault_path)
        total_files = len(markdown_files)
        imported_files = 0
        errors = []
        for i, file_path in enumerate(markdown_files):
            try:
                note_data = parse_obsidian_note(file_path)
                success, error_msg = import_obsidian_note_to_db(note_data)
                if success:
                    imported_files += 1
                else:
                    errors.append(error_msg)
            except Exception as e:
                # Parsing problems (unreadable file, bad YAML, ...) must not
                # abort the whole vault import.
                error_msg = f"Error processing {file_path}: {str(e)}"
                logger.error(error_msg)
                errors.append(error_msg)
            progress((i + 1) / total_files, f"Imported {imported_files} of {total_files} files")
            time.sleep(0.1)  # Small delay to prevent UI freezing
        return imported_files, total_files, errors
    except Exception as e:
        error_msg = f"Error scanning vault: {str(e)}\n{traceback.format_exc()}"
        logger.error(error_msg)
        return 0, 0, [error_msg]
def process_obsidian_zip(zip_file):
    """Extract a zipped vault to a temporary directory and import it.

    Args:
        zip_file: Path (or file object) of the zip archive, as accepted by
            ``zipfile.ZipFile``.

    Returns:
        The ``(imported_files, total_files, errors)`` tuple produced by
        ``import_obsidian_vault``, or ``(0, 0, [message])`` when the archive
        is not a valid zip file or another error occurs.
    """
    # TemporaryDirectory removes the directory when the with-block exits, on
    # every path (return or exception), so no manual shutil.rmtree is needed.
    with tempfile.TemporaryDirectory() as temp_dir:
        try:
            # NOTE(review): the archive is extracted without any size or
            # member-count limit; consider validating uploads first.
            with zipfile.ZipFile(zip_file, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)
            imported_files, total_files, errors = import_obsidian_vault(temp_dir)
            return imported_files, total_files, errors
        except zipfile.BadZipFile:
            error_msg = "The uploaded file is not a valid zip file."
            logger.error(error_msg)
            return 0, 0, [error_msg]
        except Exception as e:
            error_msg = f"Error processing zip file: {str(e)}\n{traceback.format_exc()}"
            logger.error(error_msg)
            return 0, 0, [error_msg]
# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Content Export and Import Interface")
    # ... (your existing tabs and components)
    with gr.Tab("Import Obsidian Vault"):
        gr.Markdown("## Import Obsidian Vault")
        with gr.Row():
            vault_path_input = gr.Textbox(label="Obsidian Vault Path (Local)")
            vault_zip_input = gr.File(label="Upload Obsidian Vault (Zip)")
        import_vault_button = gr.Button("Import Obsidian Vault")
        import_status = gr.Textbox(label="Import Status", interactive=False)

        def import_vault(vault_path, vault_zip, progress=gr.Progress()):
            """Handle a click on the import button.

            An uploaded zip takes precedence over the local path. Returns the
            status text shown in the "Import Status" box.

            The ``progress`` parameter is declared on the event handler so
            that Gradio supplies a tracked progress object; it is forwarded
            to ``import_obsidian_vault`` for the local-path import.
            """
            # Treat a whitespace-only path the same as an empty one.
            vault_path = (vault_path or "").strip()
            if vault_zip:
                # The upload may arrive as an object exposing the path in
                # ``.name`` or as a plain path string; accept both.
                zip_path = getattr(vault_zip, "name", vault_zip)
                imported, total, errors = process_obsidian_zip(zip_path)
            elif vault_path:
                imported, total, errors = import_obsidian_vault(vault_path, progress)
            else:
                return "Please provide either a local vault path or upload a zip file."

            status = f"Imported {imported} out of {total} files.\n"
            if errors:
                status += f"Encountered {len(errors)} errors:\n" + "\n".join(errors)
            return status

        import_vault_button.click(
            fn=import_vault,
            inputs=[vault_path_input, vault_zip_input],
            outputs=[import_status],
            show_progress=True
        )
    # ... (rest of your existing code)
demo.launch()
| # This comprehensive solution includes: | |
| # | |
| # Enhanced error handling throughout the import process. | |
| # Progress updates for large vaults using Gradio's progress bar. | |
| # The ability to update existing notes if they're reimported. | |
| # Support for importing Obsidian vaults from both local directories and uploaded zip files. |