# -*- coding: utf-8 -*- """ 📕 TawasoaAi_OCR.py - الإصدار 23.4 (إصلاح حالة المعالجة وتحسين عرض الحالة) النسخة المخصصة لبيئة Hugging Face: تقوم بالزحف وحفظ الملفات في GCS وإدخال السجلات الأساسية في DB. **تخطي جميع عمليات استخلاص النصوص و OCR للملفات، واستخلاص بسيط لصفحات السجلات.** ✅ [إصلاح 412] تعديل gcs_upload_file لاستخدام صلاحيات حساب الخدمة. ✅ [حالة DB] توحيد حالة الملفات والسجلات إلى 'awaiting_processing'. ✅ [تحسين] تعزيز دالة الزحف للتعامل مع أخطاء Requests. ✅ [إصلاح التوقف] إضافة مهلة عامة ومعالجة استثناءات الخيوط. ✅ [تحقق من الروابط] إضافة تحقق من صحة الروابط. ✅ [إصلاح Gradio] تعديل get_status لإرجاع قيمتين. ✅ [إصلاح SSL] زيادة مهلة الاتصال ودعم SSL صريح. ✅ [جديد: إصلاح الحالة] ضمان تسجيل الحالة كـ 'awaiting_processing' عند الرفع الناجح إلى GCS. """ # ============================================================================== # 0. IMPORTS AND GLOBAL SETUP # ============================================================================== import os import re import io import json import time import signal import uuid import traceback import logging import threading import sys import zipfile import mimetypes from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.parse import urljoin, urlparse, unquote from collections import deque from datetime import datetime, timedelta from typing import Optional, List, Dict, Any, Tuple, Set from enum import Enum import requests from bs4 import BeautifulSoup from tqdm import tqdm import gradio as gr try: from dotenv import load_dotenv DOTENV_AVAILABLE = True except ImportError: DOTENV_AVAILABLE = False try: import google.generativeai as genai from google.cloud import storage from google.oauth2 import service_account GCS_GEMINI_AVAILABLE = True except ImportError: GCS_GEMINI_AVAILABLE = False try: import psycopg2 from psycopg2 import pool, Binary from psycopg2.extras import RealDictCursor, execute_values POSTGRES_AVAILABLE = True except ImportError: POSTGRES_AVAILABLE = False PDF_AVAILABLE = False DOCX_AVAILABLE = False EXCEL_AVAILABLE = False OCR_AVAILABLE = False MD_AVAILABLE = False # ============================================================================== # 1. CONSTANTS # ============================================================================== class JobStatus(Enum): IDLE = "⚪ خامل" SCANNING = "⏳ فحص الأرشيف..." CRAWLING = "🔍 يتم الزحف..." PROCESSING = "⚙️ تتم المعالجة (جلب الخام)..." DONE = "✅ اكتمل" GCS_UP_TO_DATE = "✅ GCS محدث" ERROR = "🔴 خطأ" class CrawlMode(Enum): ALL = "الكل" PDF = "ملفات PDF فقط" DOC = "ملفات DOC/DOCX فقط" XLS = "ملفات XLS/XLSX فقط" TXT = "ملفات TXT/MD فقط" RECORDS = "سجلات HTML فقط" SUPPORTED_FILE_EXTENSIONS = {'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.txt', '.md'} RECORD_PAGE_INDICATORS = {'/records/', '/company_records/', '/registration/'} REQUEST_HEADERS = {'User-Agent': 'Mozilla/5.0'} # قاموس ترجمة الحالات لعرضها في واجهة Gradio STATUS_TRANSLATIONS = { "awaiting_processing": "جارى المعالجة", "failed": "فشلت المعالجة", "processed": "تمت المعالجة", "N/A": "غير متاح" } # ============================================================================== # 2. CONFIGURATION AND CONTEXT # ============================================================================== def setup_logging() -> logging.Logger: """إعداد وتكوين نظام تسجيل الأحداث.""" log_format = "%(asctime)s [%(levelname)s] %(name)s: %(message)s" date_format = "%Y-%m-%d %H:%M:%S" logger = logging.getLogger("TawasoaAiOCR") if not logger.handlers: logger.setLevel(logging.INFO) file_handler = logging.FileHandler('tawasoa_ai_ocr.log', encoding='utf-8') file_handler.setFormatter(logging.Formatter(log_format, date_format)) logger.addHandler(file_handler) stream_handler = logging.StreamHandler(sys.stdout) stream_handler.setFormatter(logging.Formatter(log_format, date_format)) logger.addHandler(stream_handler) return logger class Config: """كلاس مركزي لتجميع إعدادات المشروع.""" def __init__(self, logger: logging.Logger): if DOTENV_AVAILABLE: load_dotenv() self.log = logger self.DB_CONFIG: Dict[str, Any] = { "host": os.getenv("DB_HOST", "localhost"), "user": os.getenv("DB_USERNAME", "postgres"), "password": os.getenv("DB_PASSWORD"), "dbname": os.getenv("DB_DATABASE", "tawasoa"), "port": self._get_env_as_int("DB_PORT", 5432), "sslmode": os.getenv("DB_SSLMODE", "prefer"), "connect_timeout": 30 } self.GCS_BUCKET_NAME: Optional[str] = os.getenv("GCS_BUCKET_NAME") self.GOOGLE_API_KEY: Optional[str] = os.getenv("GOOGLE_API_KEY") self.MAX_WORKERS: int = self._get_env_as_int("MAX_WORKERS_DEFAULT", 5) def _get_env_as_int(self, key: str, default: int) -> int: try: return int(os.getenv(key, str(default))) except (ValueError, TypeError): self.log.warning(f"قيمة متغير البيئة '{key}' غير صالحة. استخدام الافتراضي: {default}") return default class AppContext: """يحتوي على حالة التطبيق الحية والمكونات المُهيأة.""" def __init__(self): self.connection_pool = None self.storage_client = None self.app_shutdown = threading.Event() self.is_db_ready: bool = False self.is_gcs_ready: bool = False self.is_gemini_ready: bool = False # ============================================================================== # 3. STATE MANAGEMENT # ============================================================================== class JobManager: """فئة لإدارة حالة المهمة التي تعمل في الخلفية.""" def __init__(self): self.lock = threading.Lock() self.job = { "thread": None, "type": None, "status": JobStatus.IDLE.value, "log": ["مرحبًا بك!"], "stop_event": threading.Event() } def update_log(self, message: str): if not message: return with self.lock: self.job["log"].append(f"[{time.strftime('%H:%M:%S')}] {message}") if len(self.job["log"]) > 200: self.job["log"] = self.job["log"][-200:] def update_status(self, status: JobStatus): with self.lock: self.job["status"] = status.value def get_status(self) -> tuple[str, str]: with self.lock: log_text = "\n".join(self.job["log"][-100:]) status = self.job["status"] return log_text, status def start_job(self, target_func, job_type: str, *args): with self.lock: if self.job["thread"] and self.job["thread"].is_alive(): self.update_log("⚠️ مهمة أخرى قيد التشغيل بالفعل.") return self.job["stop_event"].clear() self.job["type"] = job_type self.job["thread"] = threading.Thread(target=target_func, args=args) self.job["thread"].start() def stop_job(self): with self.lock: if self.job["thread"] and self.job["thread"].is_alive(): self.update_log("🛑 جاري إرسال إشارة الإيقاف...") self.job["stop_event"].set() return "تم إرسال طلب الإيقاف." return "لا توجد مهمة قيد التشغيل." def reset_thread(self): with self.lock: self.job["thread"] = None # ============================================================================== # 4. DATABASE OPERATIONS # ============================================================================== def check_pool_health(pool, config: Config, log: logging.Logger) -> bool: """فحص صحة الـ connection pool وإعادة تهيئته إذا لزم الأمر.""" try: test_conn = pool.getconn() with test_conn.cursor() as cursor: cursor.execute("SELECT 1") pool.putconn(test_conn) log.info("✅ فحص صحة الـ connection pool: ناجح.") return True except Exception as e: log.error(f"❌ فشل فحص الـ connection pool: {e}. محاولة إعادة التهيئة...") try: pool.closeall() new_pool = psycopg2.pool.ThreadedConnectionPool( minconn=2, maxconn=config.MAX_WORKERS + 3, **config.DB_CONFIG ) pool = new_pool log.info("✅ تم إعادة تهيئة الـ connection pool بنجاح.") return True except Exception as e: log.error(f"❌ فشل إعادة تهيئة الـ connection pool: {e}.") return False def db_execute_query(pool, query: str, params=None, fetch_result=False, max_retries=7): """تنفيذ استعلام قاعدة البيانات مع إعادة المحاولة.""" retry_delays = [5, 10, 15, 20, 25, 30, 35] for attempt in range(max_retries): conn = None try: conn = pool.getconn() with conn.cursor() as cursor: cursor.execute(query, params) if fetch_result: result = cursor.fetchall() else: result = True conn.commit() log.info(f"✅ نجاح استعلام قاعدة البيانات في المحاولة {attempt + 1}.") return result except (psycopg2.OperationalError, psycopg2.InterfaceError) as e: log.warning(f"⚠️ خطأ تشغيلي في قاعدة البيانات (محاولة {attempt + 1}/{max_retries}): {e}.") if conn: try: conn.rollback() except: pass pool.putconn(conn, close=True) conn = None if attempt < max_retries - 1: delay = retry_delays[attempt] log.info(f"إعادة المحاولة بعد {delay} ثوانٍ.") time.sleep(delay) continue else: log.error(f"❌ فشلت جميع محاولات الاتصال بعد {max_retries} محاولات.") raise except Exception as e: if conn: try: conn.rollback() except: pass raise finally: if conn: try: pool.putconn(conn) except psycopg2.pool.PoolError as pe: log.error(f"❌ خطأ في إعادة الاتصال إلى التجمع: {pe}. إغلاق الاتصال.") pool.putconn(conn, close=True) return False def db_check_document_exists(pool, original_path: str) -> bool: """التحقق من وجود مستند.""" results = db_execute_query( pool, "SELECT id FROM documents WHERE original_path = %s;", (original_path,), fetch_result=True ) return bool(results) if results is not False else False def db_insert_document(pool, doc_data: Dict[str, Any]) -> int: """إدراج سجل مستند جديد.""" conn = pool.getconn() try: with conn.cursor() as cursor: cursor.execute( """INSERT INTO documents (source, filename, original_path, storage_path, cover_image_path, status, summary, full_text_content, created_at, updated_at) VALUES (%(source)s, %(filename)s, %(original_path)s, %(storage_path)s, %(cover_image_path)s, %(status)s, %(summary)s, %(full_text_content)s, NOW(), NOW()) RETURNING id;""", doc_data ) doc_id = cursor.fetchone()[0] conn.commit() log.info(f"✅ تم إدراج المستند '{doc_data['filename']}' بحالة '{doc_data['status']}'.") return doc_id except Exception as e: conn.rollback() log.error(f"❌ فشل إدراج المستند '{doc_data['filename']}': {e}") raise finally: pool.putconn(conn) def db_log_failed_document(pool, original_path: str, filename: str, error: str): """تسجيل المستندات الفاشلة.""" conn = pool.getconn() try: with conn.cursor() as cursor: cursor.execute( "INSERT INTO documents (source, filename, original_path, status, summary, created_at, updated_at) " "VALUES (%s, %s, %s, 'failed', %s, NOW(), NOW()) ON CONFLICT (original_path) DO NOTHING", (original_path, filename, original_path, f"فشل الرفع: {error}") ) conn.commit() log.info(f"📝 تم تسجيل المستند الفاشل '{filename}' بسبب: {error}") except Exception as e: log.error(f"❌ فشل تسجيل المستند الفاشل '{filename}': {e}") finally: pool.putconn(conn) def db_search_records(pool, search_term: str) -> List[Dict[str, Any]]: """البحث في المستندات.""" conn = pool.getconn() try: with conn.cursor(cursor_factory=RealDictCursor) as cursor: search_pattern = f"%{search_term}%" cursor.execute( """ SELECT id, filename, original_path, status, summary, created_at, full_text_content FROM documents WHERE filename ILIKE %s OR full_text_content ILIKE %s ORDER BY created_at DESC LIMIT 50; """, (search_pattern, search_pattern) ) return cursor.fetchall() finally: pool.putconn(conn) # ============================================================================== # 5. FILE PARSING AND OCR # ============================================================================== def decompose_html_record(content_bytes: bytes) -> str: """استخلاص النص المرئي من HTML.""" try: soup = BeautifulSoup(content_bytes, 'html.parser') for script_or_style in soup(['script', 'style', 'header', 'footer', 'nav']): script_or_style.decompose() return soup.get_text(separator=' ', strip=True) except Exception as e: log.error(f"❌ فشل في تحليل HTML: {e}") return content_bytes.decode('utf-8', errors='ignore') def is_valid_docx(content_bytes: bytes) -> bool: return True def ocr_image_to_text(image_bytes: bytes) -> str: return "" def generate_page_images_as_binary(content_bytes: bytes) -> List[Dict[str, Any]]: return [] def decompose_pdf(content_bytes: bytes, page_images: List[Dict[str, Any]]) -> List[Dict[str, Any]]: return [] def decompose_docx(content_bytes: bytes) -> List[Dict[str, Any]]: return [] def decompose_excel(content_bytes: bytes) -> List[Dict[str, Any]]: return [] def decompose_text_or_md(content_bytes: bytes, file_ext: str) -> List[Dict[str, Any]]: return [] # ============================================================================== # 6. CLOUD AND AI SERVICES # ============================================================================== def gcs_upload_file(client, bucket_name: str, content_bytes: bytes, blob_name: str, content_type: str) -> str: """رفع ملف إلى GCS.""" bucket = client.bucket(bucket_name) blob = bucket.blob(blob_name) blob.upload_from_string(content_bytes, content_type=content_type) log.info(f"✅ تم رفع الملف '{blob_name}' إلى GCS.") return f"gs://{bucket_name}/{blob_name}" def gcs_generate_cover_image(client, bucket_name: str, content_bytes: bytes, original_filename: str) -> Optional[str]: """تعطيل إنشاء الغلاف.""" return None def ai_get_embedding(text: str) -> Optional[List[float]]: """تعطيل التضمين.""" return None def ai_summarize_and_classify(text: str) -> Dict[str, str]: """تعطيل التلخيص والتصنيف.""" return {"summary": "لم تتم معالجة النص على الخادم.", "category": "غير مصنف"} # ============================================================================== # 7. CORE LOGIC / WORKERS # ============================================================================== def is_valid_url(url: str) -> bool: """التحقق من صحة الرابط.""" try: result = urlparse(url) return all([result.scheme, result.netloc]) except Exception: return False def process_single_document(link: str, content_type: str, session: requests.Session) -> str: """معالجة مستند واحد.""" safe_filename = unquote(os.path.basename(urlparse(link).path)) try: if db_check_document_exists(app_context.connection_pool, link): return f"🟡 (تخطي) ملف موجود مسبقاً: {safe_filename}" resp = session.get(link, timeout=20, headers=REQUEST_HEADERS) resp.raise_for_status() content_bytes = resp.content full_text_content = "" document_uuid = str(uuid.uuid4()) blob_name = f"documents/{document_uuid}/{safe_filename}" storage_path = gcs_upload_file(app_context.storage_client, app_config.GCS_BUCKET_NAME, content_bytes, blob_name, content_type) cover_image_url = gcs_generate_cover_image(app_context.storage_client, app_config.GCS_BUCKET_NAME, content_bytes, safe_filename) summary = "تم حفظ الملف الخام في GCS. (جارى انتظار المعالجة النصية محليًا)." status = 'awaiting_processing' doc_data = { "source": link, "filename": safe_filename, "original_path": link, "storage_path": storage_path, "cover_image_path": cover_image_url, "status": status, "summary": summary, "full_text_content": full_text_content, } doc_id = db_insert_document(app_context.connection_pool, doc_data) return f"✅ تم حفظ الملف الخام '{safe_filename}' في GCS و DB (ID: {doc_id}). (الحالة: جارى المعالجة)" except requests.exceptions.RequestException as e: error_msg = f"فشل طلب HTTP: {str(e)}" log.error(f"❌ خطأ فادح أثناء معالجة ملف {safe_filename}: {error_msg}\n{traceback.format_exc()}") db_log_failed_document(app_context.connection_pool, link, safe_filename, error_msg) return f"❌ فشلت معالجة ملف {safe_filename} - {error_msg}" except Exception as e: error_msg = f"خطأ غير متوقع: {str(e)}" log.error(f"❌ خطأ فادح أثناء معالجة ملف {safe_filename}: {error_msg}\n{traceback.format_exc()}") db_log_failed_document(app_context.connection_pool, link, safe_filename, error_msg) return f"❌ فشلت معالجة ملف {safe_filename} - {error_msg}" def process_single_record_page(link: str, session: requests.Session) -> str: """معالجة صفحة سجل واحدة (HTML).""" safe_filename = unquote(os.path.basename(urlparse(link).path)) try: if db_check_document_exists(app_context.connection_pool, link): return f"🟡 (تخطي) سجل موجود مسبقاً: {safe_filename}" resp = session.get(link, timeout=20, headers=REQUEST_HEADERS) resp.raise_for_status() content_bytes = resp.content full_text_content = decompose_html_record(content_bytes) summary = "سجل تم استخلاص نصه جزئياً (HTML). (جارى انتظار المعالجة المتبقية محليًا)." status = 'awaiting_processing' doc_data = { "source": link, "filename": safe_filename, "original_path": link, "storage_path": None, "cover_image_path": None, "status": status, "summary": summary, "full_text_content": full_text_content, } doc_id = db_insert_document(app_context.connection_pool, doc_data) return f"✅ تم حفظ سجل الصفحة '{safe_filename}' في DB (ID: {doc_id}). (الحالة: جارى المعالجة)" except requests.exceptions.RequestException as e: error_msg = f"فشل طلب HTTP: {str(e)}" log.error(f"❌ خطأ فادح أثناء معالجة سجل {safe_filename}: {error_msg}\n{traceback.format_exc()}") db_log_failed_document(app_context.connection_pool, link, safe_filename, error_msg) return f"❌ فشلت معالجة سجل {safe_filename} - {error_msg}" except Exception as e: error_msg = f"خطأ غير متوقع: {str(e)}" log.error(f"❌ خطأ فادح أثناء معالجة سجل {safe_filename}: {error_msg}\n{traceback.format_exc()}") db_log_failed_document(app_context.connection_pool, link, safe_filename, error_msg) return f"❌ فشلت معالجة سجل {safe_filename} - {error_msg}" def crawl_website(url: str, limit: int, mode: CrawlMode): """المحرك الرئيسي لعملية الزحف.""" job_manager.update_status(JobStatus.CRAWLING) session = requests.Session() try: if not is_valid_url(url): job_manager.update_log(f"❌ رابط البداية غير صالح: {url}") job_manager.update_status(JobStatus.ERROR) return if not url.startswith(('http://', 'https://')): url = 'https://' + url base_domain = urlparse(url).netloc visited = set() queue = deque([url]) found_links = set() target_file_extensions = { CrawlMode.PDF: ['.pdf'], CrawlMode.DOC: ['.doc', '.docx'], CrawlMode.XLS: ['.xls', '.xlsx'], CrawlMode.TXT: ['.txt', '.md'], CrawlMode.ALL: SUPPORTED_FILE_EXTENSIONS }.get(mode, set()) is_record_mode = mode == CrawlMode.RECORDS pbar = tqdm(total=limit, desc=f"🔍 جمع ({mode.value})") while queue and len(found_links) < limit: if job_manager.job["stop_event"].is_set(): job_manager.update_log("🛑 تم استلام إشارة الإيقاف أثناء الزحف.") break current_url = queue.popleft() if current_url in visited or not is_valid_url(current_url): continue visited.add(current_url) try: resp = session.get(current_url, timeout=20, headers=REQUEST_HEADERS) resp.raise_for_status() soup = BeautifulSoup(resp.content, "lxml") for a_tag in soup.find_all("a", href=True): href = urljoin(current_url, a_tag["href"]).split("#")[0].strip() if not is_valid_url(href) or urlparse(href).netloc != base_domain: continue is_target_file = any(href.lower().endswith(ext) for ext in target_file_extensions) is_record_page = any(indicator in href for indicator in RECORD_PAGE_INDICATORS) if (is_target_file or (is_record_mode and is_record_page)) and href not in found_links: found_links.add(href) pbar.update(1) job_manager.update_log(f"🔗 + {unquote(os.path.basename(href))}") elif href not in visited: queue.append(href) except requests.exceptions.Timeout: log.warning(f"⚠️ تجاوزت مهلة الزحف {current_url}.") job_manager.update_log(f"⚠️ تجاوزت مهلة الرابط: {current_url}") continue except requests.exceptions.RequestException as e: log.error(f"❌ فشل في زحف {current_url}: {e}") job_manager.update_log(f"❌ فشل الرابط: {current_url} ({type(e).__name__})") continue except Exception as e: log.error(f"❌ خطأ غير متوقع أثناء زحف {current_url}: {e}") job_manager.update_log(f"❌ خطأ غير متوقع: {current_url}") continue pbar.close() job_manager.update_log(f"🔎 تم العثور على {len(found_links)} رابط. بدء المعالجة...") job_manager.update_status(JobStatus.PROCESSING) with ThreadPoolExecutor(max_workers=app_config.MAX_WORKERS) as executor: future_to_link = {} for link in found_links: if not is_valid_url(link): job_manager.update_log(f"❌ تخطي رابط غير صالح: {link}") continue is_file = any(link.lower().endswith(ext) for ext in SUPPORTED_FILE_EXTENSIONS) if is_file: content_type = mimetypes.guess_type(link)[0] or 'application/octet-stream' future_to_link[executor.submit(process_single_document, link, content_type, session)] = link else: future_to_link[executor.submit(process_single_record_page, link, session)] = link timeout = 600 start_time = time.time() for future in tqdm(as_completed(future_to_link, timeout=timeout), total=len(future_to_link), desc="⚙️ معالجة الأهداف"): if job_manager.job["stop_event"].is_set(): job_manager.update_log("🛑 توقف المعالجة بناءً على طلب الإيقاف.") break if time.time() - start_time > timeout: job_manager.update_log("🛑 توقف المعالجة بسبب انتهاء المهلة الزمنية.") break try: if len(future_to_link) % 10 == 0: if not check_pool_health(app_context.connection_pool, app_config, log): job_manager.update_log("❌ فشل فحص الـ connection pool. إيقاف المعالجة.") break job_manager.update_log(future.result()) except Exception as e: link = future_to_link[future] job_manager.update_log(f"❌ فشل معالجة {link}: {str(e)}") log.error(f"❌ خطأ في معالجة {link}: {str(e)}\n{traceback.format_exc()}") job_manager.update_log(f"✅ اكتملت المعالجة. تم حفظ {len(future_to_link)} هدف في DB/GCS.") job_manager.update_status(JobStatus.DONE) except Exception as e: log.error(f"❌ حدث خطأ فادح في عملية الزحف: {e}") job_manager.update_status(JobStatus.ERROR) finally: session.close() job_manager.reset_thread() # ============================================================================== # 8. APPLICATION INITIALIZATION # ============================================================================== def initialize_app(config: Config, log: logging.Logger) -> Optional[AppContext]: """تهيئة جميع المكونات.""" log.info("🔧 بدء تهيئة مكونات التطبيق...") context = AppContext() if POSTGRES_AVAILABLE: try: context.connection_pool = psycopg2.pool.ThreadedConnectionPool( minconn=2, maxconn=config.MAX_WORKERS + 3, **config.DB_CONFIG, options="-c tcp_keepalives_idle=300 -c tcp_keepalives_interval=10 -c tcp_keepalives_count=5" ) if not check_pool_health(context.connection_pool, config, log): log.error("❌ فشل فحص الـ connection pool الأولي. لا يمكن المتابعة.") return None context.is_db_ready = True log.info("✅ تم إنشاء مجمع اتصالات قاعدة البيانات بنجاح.") except Exception as e: log.error(f"❌ فشل فادح في تهيئة قاعدة البيانات: {e}.") return None else: log.error("❌ مكتبة 'psycopg2' غير مثبتة.") return None if GCS_GEMINI_AVAILABLE: try: if config.GCS_BUCKET_NAME: gcs_credentials_json = os.getenv("GCS_CREDENTIALS") if not gcs_credentials_json: log.warning("⚠️ لم يتم العثور على GCS_CREDENTIALS.") context.is_gcs_ready = False else: credentials_info = json.loads(gcs_credentials_json) credentials = service_account.Credentials.from_service_account_info(credentials_info) context.storage_client = storage.Client(credentials=credentials) context.is_gcs_ready = True log.info("✅ تم الاتصال بـ Google Cloud Storage.") except Exception as e: log.error(f"❌ فشل تهيئة Google Cloud Storage: {e}") if config.GOOGLE_API_KEY: try: genai.configure(api_key=config.GOOGLE_API_KEY) context.is_gemini_ready = True log.info("✅ تم إعداد Google Gemini API.") except Exception as e: log.error(f"❌ فشل تهيئة Google Gemini API: {e}") log.info("👍 اكتملت تهيئة المكونات.") return context # ============================================================================== # 9. GRADIO UI DEFINITION # ============================================================================== def create_gradio_interface(): """إنشاء واجهة المستخدم باستخدام Gradio.""" with gr.Blocks(theme=gr.themes.Soft(), title="TawasoaAI OCR & Crawler") as demo: gr.Markdown("# 🚀 لوحة تحكم الزحف والمعالجة - TawasoaAI") with gr.Tabs(): with gr.TabItem("🕷️ الزحف والتجميع"): with gr.Row(): with gr.Column(scale=2): gr.Markdown("### 1. إعدادات الزحف") url_input = gr.Textbox(label="🔗 رابط الموقع للبدء", placeholder="example.com") limit_input = gr.Slider(minimum=1, maximum=5000, value=100, step=10, label="🔢 الحد الأقصى للأهداف") with gr.Row(): crawl_pdf_btn = gr.Button("📄 زحف PDF فقط", variant="secondary") crawl_records_btn = gr.Button("📝 زحف السجلات/الصفحات (HTML)", variant="primary") with gr.Row(): crawl_doc_btn = gr.Button("📑 زحف DOC/DOCX فقط", variant="secondary") crawl_xls_btn = gr.Button("📊 زحف XLS/XLSX فقط", variant="secondary") crawl_all_btn = gr.Button("🌍 زحف كل الملفات المدعومة", variant="secondary") stop_btn = gr.Button("🛑 إيقاف المهمة الحالية", variant="stop") with gr.Column(scale=3): gr.Markdown("### 2. مراقبة الحالة") status_output = gr.Textbox(label="الحالة الحالية", value=JobStatus.IDLE.value, interactive=False) log_output = gr.Textbox(label="📝 سجل الأحداث", lines=15, max_lines=15, interactive=False, autoscroll=True) def start_crawl_job(url, limit, mode): job_manager.start_job(crawl_website, "crawl", url, limit, mode) return "بدء مهمة الزحف...", JobStatus.CRAWLING.value crawl_records_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.RECORDS), inputs=[url_input, limit_input], outputs=[log_output, status_output]) crawl_pdf_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.PDF), inputs=[url_input, limit_input], outputs=[log_output, status_output]) crawl_doc_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.DOC), inputs=[url_input, limit_input], outputs=[log_output, status_output]) crawl_xls_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.XLS), inputs=[url_input, limit_input], outputs=[log_output, status_output]) crawl_all_btn.click(fn=lambda u, l: start_crawl_job(u, l, CrawlMode.ALL), inputs=[url_input, limit_input], outputs=[log_output, status_output]) stop_btn.click(fn=job_manager.stop_job, inputs=[], outputs=[log_output]) demo.load(fn=job_manager.get_status, inputs=[], outputs=[log_output, status_output]) with gr.TabItem("📄 البحث عن السجلات"): gr.Markdown("### 🔍 البحث عن الشركات والرخص") gr.Markdown("*(نتائج البحث ستشمل النصوص المستخلصة من صفحات السجلات (HTML) والبيانات الأساسية للملفات.)*") search_input = gr.Textbox(label="اسم الشركة / رقم الترخيص / كلمات مفتاحية", placeholder="أدخل اسم شركة أو رقم رخصة...") search_btn = gr.Button("ابحث", variant="primary") search_results_output = gr.Dataframe( headers=["اسم الملف", "الحالة", "الخلاصة", "تاريخ الإضافة", "نص المحتوى (مقتطع)"], datatype=["str", "str", "str", "str", "str"], row_count=5, col_count=5, label="نتائج البحث" ) def run_search(term: str) -> List[List[str]]: if not app_context.is_db_ready: return [["❌ فشل الاتصال بقاعدة البيانات", "", "", "", ""]] results = db_search_records(app_context.connection_pool, term) formatted_results = [] for res in results: full_text_preview = res.get('full_text_content', 'N/A') if full_text_preview: full_text_preview = full_text_preview[:150] + "..." if len(full_text_preview) > 150 else full_text_preview display_status = STATUS_TRANSLATIONS.get(res.get('status', 'N/A'), "غير متاح") formatted_results.append([ res.get('filename', 'N/A'), display_status, res.get('summary', 'N/A'), res.get('created_at', datetime.now()).strftime("%Y-%m-%d %H:%M:%S"), full_text_preview ]) return formatted_results search_btn.click(fn=run_search, inputs=[search_input], outputs=[search_results_output]) return demo # ============================================================================== # 10. MAIN APPLICATION ENTRY POINT # ============================================================================== if __name__ == "__main__": log = setup_logging() app_config = Config(logger=log) job_manager = JobManager() app_context = initialize_app(config=app_config, log=log) if app_context and app_context.is_db_ready: log.info("🚀 التطبيق جاهز للبدء.") gradio_app = create_gradio_interface() gradio_app.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False) else: log.critical("🔥 فشلت تهيئة التطبيق. سيتم إيقاف التشغيل.") sys.exit(1)