FRA / app.py
TawasoaAi's picture
Update app.py
5b152a9 verified
# -*- coding: utf-8 -*-
"""
📕 TawasoaAi_OCR.py - الإصدار 18.4 (إصلاح الأخطاء وإضافة أزرار زحف متعددة)
النسخة المستقرة مع إصلاحات للزحف، معالجة ملفات DOCX، والبيانات الثنائية.
✅ [إصلاح] تحويل 'psycopg2.extensions.Binary' إلى bytes باستخدام bytes().
✅ [إصلاح] إضافة التحقق من صحة ملفات DOCX وفحص MIME type لتجنب الأخطاء في الملفات التالفة أو غير الصالحة.
✅ [جديد] إضافة أزرار منفصلة للزحف على كل نوع من الملفات (PDF, DOC/DOCX, XLS/XLSX, TXT/MD) مع زر للكل.
✅ ربط الأزرار بأوضاع crawl_mode مختلفة في دالة crawl_website.
✅ دعم معالجة ملفات PDF, HTML, DOC/DOCX, XLS/XLSX, TXT, MD.
✅ دعم الزحف مع تحكم في الحد الأقصى والأدنى لعدد الملفات.
✅ معالجة ذكية باستخدام Gemini AI لتلخيص النصوص وتصنيفها.
✅ واجهة Gradio محسنة مع زر إيقاف نهائي.
"""
import os
import re
import io
import json
import time
import signal
import uuid
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin, urlparse, unquote
from collections import deque
import logging
import threading
import sys
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime, timedelta
# --- تحميل المكتبات الأساسية ---
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from tqdm import tqdm
import psycopg2
from psycopg2 import pool, Binary
from psycopg2.extras import RealDictCursor, execute_values
import gradio as gr
# --- مكتبات متخصصة (اختيارية) ---
try:
from google.cloud import storage
GCS_AVAILABLE = True
except ImportError:
GCS_AVAILABLE = False
try:
import fitz # PyMuPDF
PDF_AVAILABLE = True
except ImportError:
PDF_AVAILABLE = False
try:
import docx
DOCX_AVAILABLE = True
except ImportError:
DOCX_AVAILABLE = False
try:
import pandas as pd
EXCEL_AVAILABLE = True
except ImportError:
EXCEL_AVAILABLE = False
try:
import markdown
MD_AVAILABLE = True
except ImportError:
MD_AVAILABLE = False
try:
import google.generativeai as genai
GEMINI_AVAILABLE = True
except ImportError:
GEMINI_AVAILABLE = False
# ✅ مكتبات خاصة بالـ OCR
try:
import pytesseract
from PIL import Image
OCR_AVAILABLE = True
except ImportError:
OCR_AVAILABLE = False
import zipfile
import mimetypes
# ============================================================
# 1. الإعدادات والتهيئة
# ============================================================
def setup_logging():
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", handlers=[logging.FileHandler('tawasoa_ai_ocr.log', encoding='utf-8'), logging.StreamHandler(sys.stdout)])
return logging.getLogger("TawasoaAiOCR")
log = setup_logging()
load_dotenv()
DB_CONFIG = {"host": os.getenv("DB_HOST", "localhost"), "user": os.getenv("DB_USERNAME", "postgres"), "password": os.getenv("DB_PASSWORD", ""), "dbname": os.getenv("DB_DATABASE", "tawasoa"), "port": int(os.getenv("DB_PORT", 5432))}
GCS_BUCKET_NAME = os.getenv("GCS_BUCKET_NAME", "tawasoa-fra-documents")
MAX_WORKERS_DEFAULT = int(os.getenv("MAX_WORKERS_DEFAULT", 5))
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
REQUEST_HEADERS = {'User-Agent': 'Mozilla/5.0'}
connection_pool = None
storage_client = None
app_shutdown = threading.Event()
GCS_ENABLED = PDF_ENABLED = DOCX_ENABLED = EXCEL_ENABLED = GEMINI_ENABLED = OCR_ENABLED = MD_ENABLED = False
# ✅ قائمة بامتدادات الملفات المدعومة
SUPPORTED_EXTENSIONS = ['.pdf', '.html', '.htm', '.doc', '.docx', '.xls', '.xlsx', '.txt', '.md']
# ============================================================
# 2. تهيئة المكونات
# ============================================================
def initialize_components():
global connection_pool, storage_client, GCS_ENABLED, PDF_ENABLED, DOCX_ENABLED, EXCEL_ENABLED, GEMINI_ENABLED, OCR_ENABLED, MD_ENABLED
log.info("🔧 بدء تهيئة المكونات...")
if GEMINI_AVAILABLE and GOOGLE_API_KEY:
try:
genai.configure(api_key=GOOGLE_API_KEY)
GEMINI_ENABLED = True
log.info("✅ تم إعداد Google API.")
except Exception as e: log.error(f"❌ فشل تهيئة Google API: {e}")
if GCS_AVAILABLE:
try:
storage_client = storage.Client()
bucket = storage_client.bucket(GCS_BUCKET_NAME)
if not bucket.exists(): bucket.create(location='us-central1')
GCS_ENABLED = True
log.info(f"✅ تم الاتصال بدلو GCS '{GCS_BUCKET_NAME}'")
except Exception as e: log.error(f"❌ فشل تهيئة GCS: {e}")
try:
connection_pool = psycopg2.pool.ThreadedConnectionPool(minconn=2, maxconn=MAX_WORKERS_DEFAULT + 5, **DB_CONFIG)
log.info("✅ تم إنشاء مجمع الاتصالات.")
except Exception as e:
log.error(f"❌ فشل تهيئة قاعدة البيانات: {e}")
return False
PDF_ENABLED = PDF_AVAILABLE
DOCX_ENABLED = DOCX_AVAILABLE
EXCEL_ENABLED = EXCEL_AVAILABLE
MD_ENABLED = MD_AVAILABLE
OCR_ENABLED = OCR_AVAILABLE
if OCR_ENABLED:
log.info("✅ تم تفعيل المعالجة باستخدام OCR.")
else:
log.warning("⚠️ مكتبات OCR (pytesseract, Pillow) غير مثبتة، لن يتم معالجة الملفات الممسوحة ضوئياً.")
return True
# ============================================================
# 3. إدارة الحالة العامة
# ============================================================
job_lock = threading.Lock()
background_job = {"thread": None, "type": None, "status": "⚪ خامل", "log": ["مرحبًا بك!"], "stop_event": threading.Event()}
def update_job_log(message: str):
if not message: return
with job_lock:
background_job["log"].append(f"[{time.strftime('%H:%M:%S')}] {message}")
if len(background_job["log"]) > 200: background_job["log"] = background_job["log"][-200:]
def update_job_status(status: str):
with job_lock: background_job["status"] = status
def get_job_status() -> tuple[str, str, bool]:
with job_lock:
log_text = "\n".join(background_job["log"][-100:])
status = background_job["status"]
is_running = background_job["thread"] is not None and background_job["thread"].is_alive()
return log_text, status, is_running
# ============================================================
# 4. دوال التفكيك والمساعدة
# ============================================================
def is_valid_docx(content_bytes: bytes) -> bool:
"""✅ [جديد] التحقق من صحة ملف DOCX (يجب أن يكون أرشيف ZIP صالح)."""
try:
with zipfile.ZipFile(io.BytesIO(content_bytes)) as zf:
return 'word/document.xml' in zf.namelist()
except zipfile.BadZipFile:
return False
def get_mime_type(filename: str, content_bytes: bytes) -> str:
"""✅ [جديد] الحصول على نوع MIME للملف بناءً على الامتداد أو المحتوى."""
mime_type, _ = mimetypes.guess_type(filename)
if mime_type is None:
if content_bytes.startswith(b'\x50\x4B\x03\x04'): # توقيع ZIP
return 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
elif content_bytes.startswith(b'\xD0\xCF\x11\xE0'): # توقيع DOC القديم
return 'application/msword'
elif b'<?xml' in content_bytes[:100]:
return 'application/xml'
return mime_type or 'application/octet-stream'
def ocr_image_to_text(image_bytes: bytes) -> str:
"""✅ [جديد] دالة لاستخراج النص من صورة باستخدام Tesseract OCR."""
if not OCR_ENABLED: return ""
try:
image = Image.open(io.BytesIO(image_bytes))
# استخدام اللغة العربية في OCR
text = pytesseract.image_to_string(image, lang='ara')
return text.strip()
except Exception as e:
log.error(f"❌ فشل في عملية الـ OCR: {e}")
return ""
def generate_and_upload_cover_image(content_bytes: bytes, original_filename: str) -> Optional[str]:
"""✅ [مطور] إنشاء ورفع صورة غلاف باستخدام Signed URL."""
if not PDF_ENABLED or not GCS_ENABLED: return None
try:
doc = fitz.open(stream=io.BytesIO(content_bytes), filetype="pdf")
if not doc or doc.page_count == 0: return None
page = doc[0]
pix = page.get_pixmap(dpi=150)
image_bytes = pix.tobytes("png")
image_filename = f"cover_{os.path.splitext(original_filename)[0]}.png"
blob_name = f"covers/{uuid.uuid4().hex[:10]}_{image_filename}"
bucket = storage_client.bucket(GCS_BUCKET_NAME)
blob = bucket.blob(blob_name)
blob.upload_from_string(image_bytes, content_type="image/png")
signed_url = blob.generate_signed_url(expiration=timedelta(days=365*10))
log.info(f"🖼️ تم رفع صورة الغلاف وإنشاء رابط موقّع.")
return signed_url
except Exception as e:
log.error(f"❌ فشل في إنشاء أو رفع صورة الغلاف لـ '{original_filename}': {e}")
return None
def generate_page_images_as_binary(content_bytes: bytes) -> List[Dict[str, Any]]:
page_images = []
if not PDF_ENABLED: return page_images
try:
doc = fitz.open(stream=io.BytesIO(content_bytes), filetype="pdf")
if not doc or doc.page_count == 0: return page_images
for i, page in enumerate(doc):
pix = page.get_pixmap(dpi=200) # زيادة الدقة لتحسين نتائج OCR
image_bytes = pix.tobytes("png")
page_images.append({"page_number": i + 1, "image_data": Binary(image_bytes)})
log.info(f"🖼️ تم إنشاء {len(page_images)} صورة كبيانات ثنائية.")
return page_images
except Exception as e:
log.error(f"❌ فشل في إنشاء صور الصفحات: {e}")
return []
def decompose_pdf(content_bytes: bytes, page_images: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""✅ [مطور] تفكيك PDF بطريقتين: سريعة (fitz) وعميقة (OCR)."""
elements = []
if not PDF_ENABLED: return elements
# المحاولة الأولى: استخراج النصوص المدمجة (سريع)
try:
doc = fitz.open(stream=io.BytesIO(content_bytes), filetype="pdf")
for page_num, page in enumerate(doc):
text = page.get_text("text", sort=True).strip()
if text:
elements.append({"element_type": "page_content", "content": text, "page_number": page_num + 1, "metadata": "{}"})
log.info(f"📄 [المعالجة السريعة] تم تفكيك PDF إلى {len(elements)} عنصر نصي.")
except Exception as e:
log.error(f"❌ فشل في تفكيك PDF بالطريقة السريعة: {e}")
elements = []
# المحاولة الثانية: استخراج النصوص من الصور (معالجة عميقة باستخدام OCR)
if OCR_ENABLED:
ocr_elements = []
log.info("🔬 بدء المعالجة العميقة (OCR) لصور الصفحات...")
for page_image_data in page_images:
# ✅ [إصلاح] تحويل Binary إلى bytes
image_bytes = bytes(page_image_data['image_data'])
ocr_text = ocr_image_to_text(image_bytes)
if ocr_text and len(ocr_text) > 20: # التأكد من وجود نص حقيقي
ocr_elements.append({
"element_type": "ocr_content",
"content": ocr_text,
"page_number": page_image_data['page_number'],
"metadata": "{}"
})
log.info(f"🔬 [المعالجة العميقة] تم استخراج {len(ocr_elements)} عنصر نصي باستخدام OCR.")
# دمج النتائج: إعطاء الأولوية للنصوص المستخرجة من OCR لأنها قد تكون أكمل
if ocr_elements:
final_elements = {}
for el in elements:
final_elements[el['page_number']] = el # حفظ النتائج السريعة أولاً
for el in ocr_elements:
final_elements[el['page_number']] = el # الكتابة فوقها بنتائج OCR إذا وجدت
return list(final_elements.values())
return elements
def decompose_html(content_bytes: bytes) -> List[Dict[str, Any]]:
elements = []
try:
soup = BeautifulSoup(content_bytes.decode('utf-8', errors='ignore'), 'html.parser')
text = soup.get_text(separator='\n', strip=True)
if text:
elements.append({"element_type": "html_content", "content": text, "page_number": 1, "metadata": "{}"})
log.info(f"🌐 تم تفكيك HTML إلى {len(elements)} عنصر نصي.")
except Exception as e:
log.error(f"❌ فشل في تفكيك HTML: {e}")
return elements
def decompose_docx(content_bytes: bytes) -> List[Dict[str, Any]]:
elements = []
if not DOCX_ENABLED: return elements
try:
if not is_valid_docx(content_bytes):
log.error(f"❌ فشل في تفكيك DOCX: الملف غير صالح")
return elements
doc = docx.Document(io.BytesIO(content_bytes))
text = '\n'.join([para.text for para in doc.paragraphs if para.text.strip()])
if text:
elements.append({"element_type": "docx_content", "content": text, "page_number": 1, "metadata": "{}"})
log.info(f"📝 تم تفكيك DOCX إلى {len(elements)} عنصر نصي.")
except Exception as e:
log.error(f"❌ فشل في تفكيك DOCX: {e}")
return elements
def decompose_excel(content_bytes: bytes) -> List[Dict[str, Any]]:
elements = []
if not EXCEL_ENABLED: return elements
try:
df = pd.read_excel(io.BytesIO(content_bytes), sheet_name=None)
text = ''
for sheet_name, sheet_df in df.items():
text += f"Sheet: {sheet_name}\n{sheet_df.to_string(index=False)}\n\n"
if text:
elements.append({"element_type": "excel_content", "content": text, "page_number": 1, "metadata": "{}"})
log.info(f"📊 تم تفكيك Excel إلى {len(elements)} عنصر نصي.")
except Exception as e:
log.error(f"❌ فشل في تفكيك Excel: {e}")
return elements
def decompose_text_or_md(content_bytes: bytes, file_ext: str) -> List[Dict[str, Any]]:
elements = []
try:
text = content_bytes.decode('utf-8', errors='ignore').strip()
if file_ext == '.md' and MD_ENABLED:
html = markdown.markdown(text)
soup = BeautifulSoup(html, 'html.parser')
text = soup.get_text(separator='\n', strip=True)
if text:
elements.append({"element_type": "text_content", "content": text, "page_number": 1, "metadata": "{}"})
log.info(f"📄 تم تفكيك {file_ext.upper()} إلى {len(elements)} عنصر نصي.")
except Exception as e:
log.error(f"❌ فشل في تفكيك {file_ext.upper()}: {e}")
return elements
def get_embedding_for_text(text: str) -> Optional[List[float]]:
if not text or not GEMINI_ENABLED: return None
try:
return genai.embed_content(model="models/embedding-001", content=text[:8000], task_type="RETRIEVAL_DOCUMENT")['embedding']
except Exception: return None
def advanced_ai_processing(text: str) -> Dict[str, str]:
if not GEMINI_ENABLED: return {"summary": text[:500], "category": "غير مصنف"}
try:
model = genai.GenerativeModel('gemini-1.5-pro-latest')
prompt = f"تلخيص النص التالي باختصار وحدد فئته الرئيسية (مثل: أخبار، تقرير، قانوني، إلخ):\n\n{text[:4000]}"
response = model.generate_content(prompt)
summary = response.text.strip()
category = re.search(r'فئة: (.*)', summary) or "غير مصنف"
return {"summary": summary, "category": category.group(1) if category else "غير مصنف"}
except Exception as e:
log.error(f"❌ فشل في المعالجة الذكية: {e}")
return {"summary": text[:500], "category": "غير مصنف"}
# ============================================================
# 5. الوحدات الوظيفية (Workers)
# ============================================================
def process_single_pdf_from_gcs(blob: Any) -> str:
"""✅ [مطور] المحرك الجديد: يعالج ملفات PDF باستخدام OCR إذا لزم الأمر."""
conn = None
original_path = blob.public_url
safe_filename = unquote(os.path.basename(blob.name))
try:
content_bytes = blob.download_as_bytes()
conn = connection_pool.getconn()
with conn.cursor() as cursor:
cursor.execute("SELECT id FROM documents WHERE original_path = %s;", (original_path,))
if cursor.fetchone(): return f"🟡 (تخطي) موجود مسبقاً: {safe_filename}"
# خطوة 1: إنشاء الصور أولاً
page_images = generate_page_images_as_binary(content_bytes)
# خطوة 2: تفكيك النص باستخدام الصور التي تم إنشاؤها
elements = decompose_pdf(content_bytes, page_images)
if not elements and not page_images:
raise ValueError("الملف فارغ أو تالف، لم يتم استخراج نصوص أو صور.")
bucket = storage_client.bucket(GCS_BUCKET_NAME)
document_uuid = str(uuid.uuid4())
blob_name = f"documents/{document_uuid}/{safe_filename}"
pdf_blob = bucket.blob(blob_name)
pdf_blob.upload_from_string(content_bytes, content_type="application/pdf")
storage_path = pdf_blob.generate_signed_url(expiration=timedelta(days=365*10))
with conn.cursor() as cursor:
status = 'pending' if elements else 'completed_no_text'
summary = elements[0]['content'][:500] if elements else f"مستند صور فقط يحتوي على {len(page_images)} صفحة."
cursor.execute(
"""INSERT INTO documents (source, filename, original_path, storage_path, status, summary, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW()) RETURNING id;""",
(original_path, safe_filename, original_path, storage_path, status, summary)
)
document_id = cursor.fetchone()[0]
if elements:
elements_to_insert = [(document_id, el['element_type'], el['content'], el['page_number'], el['metadata']) for el in elements]
execute_values(cursor, "INSERT INTO document_elements (document_id, element_type, content, page_number, metadata) VALUES %s", elements_to_insert)
if page_images:
pages_to_insert = [(document_id, p['page_number'], p['image_data']) for p in page_images]
execute_values(cursor, "INSERT INTO document_pages (document_id, page_number, image_data) VALUES %s", pages_to_insert)
conn.commit()
return f"✅ تمت معالجة '{safe_filename}' ({len(elements)} عنصر نصي, {len(page_images)} صورة)"
except Exception as e:
log.error(f"❌ خطأ فادح أثناء معالجة {safe_filename}: {e}\n{traceback.format_exc()}")
if conn: conn.rollback()
try:
conn_fail = connection_pool.getconn()
with conn_fail.cursor() as cur_fail:
cur_fail.execute("INSERT INTO documents (source, filename, original_path, status, summary, created_at, updated_at) VALUES (%s, %s, %s, 'failed', %s, NOW(), NOW()) ON CONFLICT (original_path) DO NOTHING", (original_path, safe_filename, original_path, str(e)))
conn_fail.commit()
except Exception as log_fail_e:
log.error(f"فشل حتى في تسجيل الخطأ: {log_fail_e}")
finally:
if 'conn_fail' in locals():
connection_pool.putconn(conn_fail)
return f"❌ فشلت معالجة {safe_filename}"
finally:
if conn:
connection_pool.putconn(conn)
def gcs_archive_master_worker():
if not GCS_ENABLED: return update_job_status("🔴 GCS غير مهيأ!")
update_job_status("⏳ فحص أرشيف GCS...")
conn = None
try:
conn = connection_pool.getconn()
with conn.cursor() as cur:
cur.execute("SELECT original_path FROM documents")
existing_paths = {row[0] for row in cur.fetchall()}
blobs = list(storage_client.list_blobs(GCS_BUCKET_NAME))
new_blobs = [b for b in blobs if b.public_url not in existing_paths and b.name.lower().endswith('.pdf')]
if not new_blobs: return update_job_status("✅ GCS محدث, لا توجد ملفات جديدة.")
update_job_log(f"☁️ تم العثور على {len(new_blobs)} مستند PDF جديد للمعالجة.")
with ThreadPoolExecutor(max_workers=MAX_WORKERS_DEFAULT) as executor:
future_to_blob = {executor.submit(process_single_pdf_from_gcs, blob): blob for blob in new_blobs}
for future in tqdm(as_completed(future_to_blob), total=len(new_blobs), desc="🗃️ معالجة الأرشيف"):
if background_job["stop_event"].is_set(): break
update_job_log(future.result())
finally:
if conn: connection_pool.putconn(conn)
with job_lock: background_job["thread"] = None
update_job_status("✅ اكتملت معالجة الأرشيف")
def crawl_site(start_url: str, page_limit: int, crawl_mode: str) -> List[str]:
"""جمع الروابط من موقع ويب بناءً على وضع الزحف."""
if not start_url.startswith(('http://', 'https://')):
start_url = 'https://' + start_url
visited = set()
queue = deque([start_url])
all_links = set()
base_domain = urlparse(start_url).netloc
crawl_all_content = crawl_mode == "الكل"
target_extensions = {
"ملفات PDF فقط": ['.pdf'],
"ملفات DOC/DOCX فقط": ['.doc', '.docx'],
"ملفات XLS/XLSX فقط": ['.xls', '.xlsx'],
"ملفات TXT/MD فقط": ['.txt', '.md'],
"الكل": ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.txt', '.md']
}[crawl_mode]
pbar = tqdm(total=page_limit, desc=f"🔍 جمع ({crawl_mode})")
session = requests.Session()
session.headers.update(REQUEST_HEADERS)
while queue and len(all_links) < page_limit:
if background_job["stop_event"].is_set():
break
url = queue.popleft()
if url in visited:
continue
visited.add(url)
try:
resp = session.get(url, timeout=20)
if resp.status_code != 200:
continue
soup = BeautifulSoup(resp.text, "lxml", from_encoding="utf-8")
for a_tag in soup.find_all("a", href=True):
href = urljoin(url, a_tag["href"]).split("#")[0].strip()
if urlparse(href).netloc == base_domain and href not in visited:
if any(href.lower().endswith(ext) for ext in target_extensions):
if len(all_links) < page_limit:
all_links.add(href)
pbar.update(1)
if crawl_all_content or not any(href.lower().endswith(ext) for ext in target_extensions):
queue.append(href)
except Exception as e:
log.error(f"❌ فشل في زحف {url}: {e}")
update_job_log(f"فشل في زحف {url}: {e}")
pbar.close()
return list(all_links)
def process_single_document(link: str, content_type: str) -> str:
"""معالجة مستند واحد (PDF, HTML, DOCX, Excel) وتخزينه."""
conn = None
original_path = link
safe_filename = unquote(os.path.basename(urlparse(link).path))
try:
resp = requests.get(link, timeout=60, headers=REQUEST_HEADERS)
resp.raise_for_status()
content_bytes = resp.content
conn = connection_pool.getconn()
with conn.cursor() as cursor:
cursor.execute("SELECT id FROM documents WHERE original_path = %s;", (original_path,))
if cursor.fetchone():
return f"🟡 (تخطي) موجود مسبقاً: {safe_filename}"
# اختيار دالة التفكيك بناءً على نوع المحتوى
content_type = content_type.lower()
if 'pdf' in content_type:
page_images = generate_page_images_as_binary(content_bytes)
elements = decompose_pdf(content_bytes, page_images)
elif 'html' in content_type:
elements = decompose_html(content_bytes)
elif 'docx' in content_type or 'doc' in content_type:
elements = decompose_docx(content_bytes)
elif 'xlsx' in content_type or 'xls' in content_type:
elements = decompose_excel(content_bytes)
elif 'text' in content_type or any(content_type.endswith(ext) for ext in ['.txt', '.md']):
file_ext = os.path.splitext(safe_filename)[1].lower()
elements = decompose_text_or_md(content_bytes, file_ext)
else:
raise ValueError(f"نوع محتوى غير مدعوم: {content_type}")
cover_image_url = generate_and_upload_cover_image(content_bytes, safe_filename) if 'pdf' in content_type else None
if not elements and (not page_images if 'pdf' in content_type else True):
raise ValueError("الملف فارغ أو تالف، لم يتم استخراج نصوص أو صور.")
if not storage_client or not GCS_ENABLED:
raise ValueError("GCS غير مهيأ، لا يمكن رفع الملف.")
bucket = storage_client.bucket(GCS_BUCKET_NAME)
document_uuid = str(uuid.uuid4())
blob_name = f"documents/{document_uuid}/{safe_filename}"
pdf_blob = bucket.blob(blob_name)
pdf_blob.upload_from_string(content_bytes, content_type=content_type)
pdf_blob.make_public()
storage_path = pdf_blob.public_url
with conn.cursor() as cursor:
if elements:
status = 'pending'
summary = elements[0]['content'][:500]
else:
status = 'completed_no_text'
summary = f"مستند صور فقط يحتوي على {len(page_images)} صفحة." if 'pdf' in content_type else "لا يحتوي على نص"
cursor.execute(
"""INSERT INTO documents (source, filename, original_path, storage_path, cover_image_path, status, summary, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW()) RETURNING id;""",
(original_path, safe_filename, original_path, storage_path, cover_image_url, status, summary)
)
document_id = cursor.fetchone()[0]
if elements:
elements_to_insert = [(document_id, el['element_type'], el['content'], el['page_number'], el['metadata']) for el in elements]
execute_values(cursor,
"INSERT INTO document_elements (document_id, element_type, content, page_number, metadata) VALUES %s",
elements_to_insert)
if 'pdf' in content_type and page_images:
pages_to_insert = [(document_id, p['page_number'], p['image_data']) for p in page_images]
execute_values(cursor,
"INSERT INTO document_pages (document_id, page_number, image_data) VALUES %s",
pages_to_insert)
conn.commit()
return f"✅ تمت معالجة '{safe_filename}' ({len(elements)} عنصر نصي, {len(page_images)} صورة)" if 'pdf' in content_type else f"✅ تمت معالجة '{safe_filename}' ({len(elements)} عنصر نصي)"
except Exception as e:
log.error(f"❌ خطأ فادح أثناء معالجة {safe_filename}: {e}")
if conn:
conn.rollback()
try:
conn_fail = conn if conn else connection_pool.getconn()
with conn_fail.cursor() as cur_fail:
cur_fail.execute(
"""INSERT INTO documents (source, filename, original_path, status, summary, created_at, updated_at)
VALUES (%s, %s, %s, 'failed', %s, NOW(), NOW())
ON CONFLICT (original_path) DO NOTHING""",
(original_path, safe_filename, original_path, str(e))
)
conn_fail.commit()
return f"❌ فشلت معالجة {safe_filename}"
except Exception as log_fail_e:
log.error(f"فشل في تسجيل الخطأ: {log_fail_e}")
return f"❌ فشلت معالجة {safe_filename} وتسجيل الخطأ"
finally:
if 'conn_fail' in locals() and conn_fail and not conn:
connection_pool.putconn(conn_fail)
finally:
if conn:
connection_pool.putconn(conn)
def crawler_master_worker(start_url: str, page_limit: int, crawl_mode: str):
"""إدارة عملية الزحف ومعالجة المستندات."""
update_job_status("⏳ بدء الزحف...")
update_job_log(f"🌐 بدء الزحف من: {start_url}")
links = crawl_site(start_url, page_limit, crawl_mode)
if not links:
update_job_status("✅ لا توجد روابط جديدة.")
return
update_job_log(f"🔗 تم العثور على {len(links)} رابط للمعالجة.")
with ThreadPoolExecutor(max_workers=MAX_WORKERS_DEFAULT) as executor:
futures = []
for link in links:
try:
headers = requests.head(link, timeout=20, headers=REQUEST_HEADERS).headers
content_type = headers.get('Content-Type', '')
futures.append(executor.submit(process_single_document, link, content_type))
except Exception as e:
update_job_log(f"❌ فشل في جلب رأس {link}: {e}")
for future in as_completed(futures):
if background_job["stop_event"].is_set():
break
update_job_log(future.result())
update_job_status("✅ اكتمل الزحف والمعالجة")
def elements_embedding_worker():
conn = None
try:
update_job_status("⏳ بدء إنشاء المتجهات...")
conn = connection_pool.getconn()
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT id, content FROM document_elements WHERE embedding IS NULL")
elements = cur.fetchall()
if not elements: return update_job_status("✅ لا توجد عناصر جديدة لإنشاء متجه لها.")
update_job_log(f"🧠 تم العثور على {len(elements):,} عنصر للمعالجة.")
for element in tqdm(elements, desc="🧠 إنشاء المتجهات"):
if background_job["stop_event"].is_set(): break
embedding = get_embedding_for_text(element['content'])
if embedding:
with conn.cursor() as update_cur:
update_cur.execute("UPDATE document_elements SET embedding = %s WHERE id = %s", (embedding, element['id']))
conn.commit()
with conn.cursor() as cur:
cur.execute("UPDATE documents d SET status = 'completed' WHERE d.status = 'pending' AND NOT EXISTS (SELECT 1 FROM document_elements de WHERE de.document_id = d.id AND de.embedding IS NULL)")
conn.commit()
finally:
if conn: connection_pool.putconn(conn)
with job_lock: background_job["thread"] = None
update_job_status("✅ اكتمل إنشاء المتجهات")
# ============================================================
# 6. وظائف الواجهة
# ============================================================
def get_dashboard_stats():
conn = None
try:
conn = connection_pool.getconn()
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("SELECT status, COUNT(*) as count FROM documents GROUP BY status;")
stats = {row['status']: row['count'] for row in cur.fetchall()}
cur.execute("SELECT COUNT(*) as element_count, COUNT(embedding) as embedded_count FROM document_elements;")
element_stats = cur.fetchone() or {}
total = sum(stats.values())
pending = stats.get('pending', 0)
completed = stats.get('completed', 0)
failed = stats.get('failed', 0)
completed_no_text = stats.get('completed_no_text', 0)
stats_md = f"""### 📊 إحصائيات قاعدة البيانات\n| الفئة | العدد |\n|---|---|\n| **إجمالي المستندات** | **{total:,}** |\n| ⏳ قيد الإنشاء | {pending:,} |\n| ✅ مكتمل (مع نص) | {completed:,} |\n| 🖼️ مكتمل (صور فقط) | {completed_no_text:,} |\n| 🔴 فاشل | {failed:,} |\n| **إجمالي العناصر النصية** | **{element_stats.get('element_count', 0):,}** |\n| 🔗 عناصر مع متجه | {element_stats.get('embedded_count', 0):,} |"""
return stats_md
except Exception as e: return f"### خطأ: {e}"
finally:
if conn: connection_pool.putconn(conn)
def clear_database_wrapper(confirmation: bool):
if not confirmation: return "⚠️ تم إلغاء العملية."
conn = None
try:
conn = connection_pool.getconn()
with conn.cursor() as cur:
cur.execute("TRUNCATE TABLE documents RESTART IDENTITY CASCADE;")
conn.commit()
return "✅ تم مسح جميع البيانات بنجاح"
except Exception as e:
if conn: conn.rollback()
return f"❌ خطأ أثناء المسح: {e}"
finally:
if conn: connection_pool.putconn(conn)
def clear_failed_documents_wrapper():
conn = None
try:
conn = connection_pool.getconn()
with conn.cursor() as cur:
cur.execute("DELETE FROM documents WHERE status = 'failed';")
deleted_count = cur.rowcount
conn.commit()
msg = f"✅ تم حذف {deleted_count} مستند من المستندات الفاشلة."
log.info(msg)
return msg
except Exception as e:
if conn: conn.rollback()
msg = f"❌ خطأ أثناء حذف المستندات الفاشلة: {e}"
log.error(msg)
return msg
finally:
if conn: connection_pool.putconn(conn)
# ============================================================
# 7. بناء الواجهة الرسومية
# ============================================================
def start_job_wrapper(job_type: str, *args):
with job_lock:
if background_job["thread"] and background_job["thread"].is_alive(): return "⏳ يوجد عملية قيد التنفيذ."
target_func_map = {
'embedding_processor': elements_embedding_worker,
'gcs_processor': gcs_archive_master_worker,
'crawler_pdf': lambda url, limit: crawler_master_worker(url, limit, "ملفات PDF فقط"),
'crawler_doc': lambda url, limit: crawler_master_worker(url, limit, "ملفات DOC/DOCX فقط"),
'crawler_xls': lambda url, limit: crawler_master_worker(url, limit, "ملفات XLS/XLSX فقط"),
'crawler_txt_md': lambda url, limit: crawler_master_worker(url, limit, "ملفات TXT/MD فقط"),
'crawler_all': lambda url, limit: crawler_master_worker(url, limit, "الكل")
}
if job_type not in target_func_map: return "🔴 نوع مهمة غير معروف"
background_job.update({
"thread": threading.Thread(target=target_func_map[job_type], args=args, daemon=True),
"log": [f"🚀 بدء مهمة: {job_type}..."],
"status": "🚀 يستعد",
"stop_event": threading.Event(),
"type": job_type
})
background_job["thread"].start()
return f"🚀 بدأت عملية {job_type}!"
def stop_job_wrapper():
with job_lock:
if not (background_job["thread"] and background_job["thread"].is_alive()): return "ℹ️ لا توجد عملية لإيقافها."
background_job["stop_event"].set()
update_job_status("🛑 جاري الإيقاف...")
return "🛑 تم إرسال إشارة الإيقاف."
def build_ui():
with gr.Blocks(theme=gr.themes.Soft(), title="TawasoaAi OCR v18.4") as demo:
gr.Markdown("# 🚀 **TawasoaAi v18.4** - المعالجة فائقة الذكاء (OCR)")
with gr.Row():
status_indicator = gr.Textbox(label="حالة النظام", interactive=False)
stop_btn = gr.Button("🛑 إيقاف العملية", variant="stop", visible=False)
with gr.Tabs():
with gr.Tab("📊 لوحة التحكم"):
stats_display = gr.Markdown("...")
refresh_stats_btn = gr.Button("🔄 تحديث الإحصائيات")
with gr.Tab("🗃️ معالجة أرشيف GCS"):
log_gcs = gr.Textbox(label="سجل معالجة الأرشيف", lines=20, interactive=False, autoscroll=True)
process_gcs_btn = gr.Button("🚀 ابدأ معالجة ملفات GCS", variant="primary")
with gr.Tab("🧠 إنشاء المتجهات"):
log_embed = gr.Textbox(label="سجل إنشاء المتجهات", lines=20, interactive=False, autoscroll=True)
process_embed_btn = gr.Button("🧠 ابدأ إنشاء المتجهات", variant="primary")
with gr.Tab("🌐 الزحف على المواقع"):
with gr.Row():
crawl_url = gr.Textbox(label="رابط الموقع", placeholder="أدخل رابط الموقع (مثال: https://example.com)")
crawl_limit = gr.Number(label="الحد الأقصى لعدد الملفات", value=100, minimum=1, maximum=1000)
with gr.Row():
crawl_pdf_btn = gr.Button("📄 زحف على ملفات PDF فقط", variant="secondary")
crawl_doc_btn = gr.Button("📝 زحف على ملفات DOC/DOCX فقط", variant="secondary")
crawl_xls_btn = gr.Button("📊 زحف على ملفات XLS/XLSX فقط", variant="secondary")
crawl_txt_md_btn = gr.Button("📄 زحف على ملفات TXT/MD فقط", variant="secondary")
crawl_all_btn = gr.Button("🌐 زحف على الكل", variant="primary")
log_crawl = gr.Textbox(label="سجل الزحف", lines=20, interactive=False, autoscroll=True)
with gr.Tab("🔧 صيانة"):
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 🗑️ حذف **كل** البيانات")
confirm_checkbox_all = gr.Checkbox(label="✅ أؤكد حذف جميع البيانات نهائياً")
clear_all_btn = gr.Button("🗑️ مسح كل شيء الآن", variant="stop", interactive=False)
clear_all_output = gr.Textbox(label="النتيجة", interactive=False)
with gr.Column(scale=1):
gr.Markdown("### 🧹 حذف المستندات **الفاشلة** فقط")
gr.Markdown("استخدم هذا الخيار للتخلص من السجلات التي فشلت أثناء المعالجة والمحاولة مرة أخرى.")
clear_failed_btn = gr.Button("🧹 مسح المستندات الفاشلة", variant="secondary")
clear_failed_output = gr.Textbox(label="النتيجة", interactive=False)
def update_interface():
log, status, is_running = get_job_status()
log_text = "\n".join(background_job["log"][-100:])
status = background_job["status"]
is_running = background_job["thread"] is not None and background_job["thread"].is_alive()
return log_text, status, is_running
demo.load(update_interface, outputs=[log_crawl, status_indicator, stop_btn])
refresh_stats_btn.click(get_dashboard_stats, outputs=stats_display)
stop_btn.click(stop_job_wrapper, outputs=status_indicator)
process_gcs_btn.click(lambda: start_job_wrapper("gcs_processor"), outputs=status_indicator)
process_embed_btn.click(lambda: start_job_wrapper("embedding_processor"), outputs=status_indicator)
confirm_checkbox_all.change(lambda x: gr.update(interactive=x), inputs=confirm_checkbox_all, outputs=clear_all_btn)
clear_all_btn.click(clear_database_wrapper, inputs=confirm_checkbox_all, outputs=clear_all_output).then(get_dashboard_stats, outputs=stats_display)
clear_failed_btn.click(clear_failed_documents_wrapper, outputs=clear_failed_output).then(get_dashboard_stats, outputs=stats_display)
crawl_pdf_btn.click(lambda url, limit: start_job_wrapper("crawler_pdf", url, limit), inputs=[crawl_url, crawl_limit], outputs=status_indicator)
crawl_doc_btn.click(lambda url, limit: start_job_wrapper("crawler_doc", url, limit), inputs=[crawl_url, crawl_limit], outputs=status_indicator)
crawl_xls_btn.click(lambda url, limit: start_job_wrapper("crawler_xls", url, limit), inputs=[crawl_url, crawl_limit], outputs=status_indicator)
crawl_txt_md_btn.click(lambda url, limit: start_job_wrapper("crawler_txt_md", url, limit), inputs=[crawl_url, crawl_limit], outputs=status_indicator)
crawl_all_btn.click(lambda url, limit: start_job_wrapper("crawler_all", url, limit), inputs=[crawl_url, crawl_limit], outputs=status_indicator)
return demo
# ============================================================
# 8. نقطة انطلاق البرنامج
# ============================================================
def graceful_shutdown(signum, frame):
log.info("SIGINT/SIGTERM received. Shutting down...")
stop_job_wrapper()
if connection_pool: connection_pool.closeall()
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, graceful_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)
if not initialize_components():
sys.exit(1)
ui = build_ui()
ui.queue().launch(server_name="0.0.0.0", server_port=7860, share=False)