|
|
|
""" |
|
📕 TawasoaAi_OCR.py - الإصدار 18.4 (إصلاح الأخطاء وإضافة أزرار زحف متعددة) |
|
النسخة المستقرة مع إصلاحات للزحف، معالجة ملفات DOCX، والبيانات الثنائية. |
|
✅ [إصلاح] تحويل 'psycopg2.extensions.Binary' إلى bytes باستخدام bytes(). |
|
✅ [إصلاح] إضافة التحقق من صحة ملفات DOCX وفحص MIME type لتجنب الأخطاء في الملفات التالفة أو غير الصالحة. |
|
✅ [جديد] إضافة أزرار منفصلة للزحف على كل نوع من الملفات (PDF, DOC/DOCX, XLS/XLSX, TXT/MD) مع زر للكل. |
|
✅ ربط الأزرار بأوضاع crawl_mode مختلفة في دالة crawl_website. |
|
✅ دعم معالجة ملفات PDF, HTML, DOC/DOCX, XLS/XLSX, TXT, MD. |
|
✅ دعم الزحف مع تحكم في الحد الأقصى والأدنى لعدد الملفات. |
|
✅ معالجة ذكية باستخدام Gemini AI لتلخيص النصوص وتصنيفها. |
|
✅ واجهة Gradio محسنة مع زر إيقاف نهائي. |
|
""" |
|
|
|
import os |
|
import re |
|
import io |
|
import json |
|
import time |
|
import signal |
|
import uuid |
|
import traceback |
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
from urllib.parse import urljoin, urlparse, unquote |
|
from collections import deque |
|
import logging |
|
import threading |
|
import sys |
|
from typing import Optional, List, Dict, Any, Tuple |
|
from datetime import datetime, timedelta |
|
|
|
|
|
import requests |
|
from bs4 import BeautifulSoup |
|
from dotenv import load_dotenv |
|
from tqdm import tqdm |
|
import psycopg2 |
|
from psycopg2 import pool, Binary |
|
from psycopg2.extras import RealDictCursor, execute_values |
|
import gradio as gr |
|
|
|
|
|
try: |
|
from google.cloud import storage |
|
GCS_AVAILABLE = True |
|
except ImportError: |
|
GCS_AVAILABLE = False |
|
|
|
try: |
|
import fitz |
|
PDF_AVAILABLE = True |
|
except ImportError: |
|
PDF_AVAILABLE = False |
|
|
|
try: |
|
import docx |
|
DOCX_AVAILABLE = True |
|
except ImportError: |
|
DOCX_AVAILABLE = False |
|
|
|
try: |
|
import pandas as pd |
|
EXCEL_AVAILABLE = True |
|
except ImportError: |
|
EXCEL_AVAILABLE = False |
|
|
|
try: |
|
import markdown |
|
MD_AVAILABLE = True |
|
except ImportError: |
|
MD_AVAILABLE = False |
|
|
|
try: |
|
import google.generativeai as genai |
|
GEMINI_AVAILABLE = True |
|
except ImportError: |
|
GEMINI_AVAILABLE = False |
|
|
|
|
|
try: |
|
import pytesseract |
|
from PIL import Image |
|
OCR_AVAILABLE = True |
|
except ImportError: |
|
OCR_AVAILABLE = False |
|
|
|
import zipfile |
|
import mimetypes |
|
|
|
|
|
|
|
|
|
def setup_logging(): |
|
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[logging.FileHandler('tawasoa_ai_ocr.log', encoding='utf-8'), logging.StreamHandler(sys.stdout)],
    )
|
return logging.getLogger("TawasoaAiOCR") |
|
|
|
log = setup_logging() |
|
load_dotenv() |
|
|
|
DB_CONFIG = {"host": os.getenv("DB_HOST", "localhost"), "user": os.getenv("DB_USERNAME", "postgres"), "password": os.getenv("DB_PASSWORD", ""), "dbname": os.getenv("DB_DATABASE", "tawasoa"), "port": int(os.getenv("DB_PORT", 5432))} |
|
GCS_BUCKET_NAME = os.getenv("GCS_BUCKET_NAME", "tawasoa-fra-documents") |
|
MAX_WORKERS_DEFAULT = int(os.getenv("MAX_WORKERS_DEFAULT", 5)) |
|
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") |
|
REQUEST_HEADERS = {'User-Agent': 'Mozilla/5.0'} |
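# A minimal .env sketch for the variables read above (every value below is a
# placeholder, not a real credential; adjust to your environment):
#
#   DB_HOST=localhost
#   DB_PORT=5432
#   DB_USERNAME=postgres
#   DB_PASSWORD=change-me
#   DB_DATABASE=tawasoa
#   GCS_BUCKET_NAME=tawasoa-fra-documents
#   MAX_WORKERS_DEFAULT=5
#   GOOGLE_API_KEY=your-gemini-api-key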
|
|
|
connection_pool = None |
|
storage_client = None |
|
app_shutdown = threading.Event() |
|
GCS_ENABLED = PDF_ENABLED = DOCX_ENABLED = EXCEL_ENABLED = GEMINI_ENABLED = OCR_ENABLED = MD_ENABLED = False |
|
|
|
|
|
SUPPORTED_EXTENSIONS = ['.pdf', '.html', '.htm', '.doc', '.docx', '.xls', '.xlsx', '.txt', '.md'] |
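# Assumed database schema, inferred from the INSERT/SELECT statements used in
# this file (column types and constraints are guesses; align them with the real
# migrations). The ON CONFLICT (original_path) clauses below require a unique
# constraint on documents.original_path:
#
#   CREATE TABLE documents (
#       id BIGSERIAL PRIMARY KEY,
#       source TEXT, filename TEXT,
#       original_path TEXT UNIQUE,
#       storage_path TEXT, cover_image_path TEXT,
#       status TEXT, summary TEXT,
#       created_at TIMESTAMPTZ, updated_at TIMESTAMPTZ
#   );
#   CREATE TABLE document_elements (
#       id BIGSERIAL PRIMARY KEY,
#       document_id BIGINT REFERENCES documents(id) ON DELETE CASCADE,
#       element_type TEXT, content TEXT, page_number INT,
#       metadata JSONB, embedding VECTOR(768)   -- pgvector, if used
#   );
#   CREATE TABLE document_pages (
#       id BIGSERIAL PRIMARY KEY,
#       document_id BIGINT REFERENCES documents(id) ON DELETE CASCADE,
#       page_number INT, image_data BYTEA
#   );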
|
|
|
|
|
|
|
|
|
def initialize_components(): |
|
global connection_pool, storage_client, GCS_ENABLED, PDF_ENABLED, DOCX_ENABLED, EXCEL_ENABLED, GEMINI_ENABLED, OCR_ENABLED, MD_ENABLED |
|
log.info("🔧 بدء تهيئة المكونات...") |
|
if GEMINI_AVAILABLE and GOOGLE_API_KEY: |
|
try: |
|
genai.configure(api_key=GOOGLE_API_KEY) |
|
GEMINI_ENABLED = True |
|
log.info("✅ تم إعداد Google API.") |
|
except Exception as e: log.error(f"❌ فشل تهيئة Google API: {e}") |
|
if GCS_AVAILABLE: |
|
try: |
|
storage_client = storage.Client() |
|
bucket = storage_client.bucket(GCS_BUCKET_NAME) |
|
if not bucket.exists(): bucket.create(location='us-central1') |
|
GCS_ENABLED = True |
|
log.info(f"✅ تم الاتصال بدلو GCS '{GCS_BUCKET_NAME}'") |
|
except Exception as e: log.error(f"❌ فشل تهيئة GCS: {e}") |
|
try: |
|
connection_pool = psycopg2.pool.ThreadedConnectionPool(minconn=2, maxconn=MAX_WORKERS_DEFAULT + 5, **DB_CONFIG) |
|
log.info("✅ تم إنشاء مجمع الاتصالات.") |
|
except Exception as e: |
|
log.error(f"❌ فشل تهيئة قاعدة البيانات: {e}") |
|
return False |
|
|
|
PDF_ENABLED = PDF_AVAILABLE |
|
DOCX_ENABLED = DOCX_AVAILABLE |
|
EXCEL_ENABLED = EXCEL_AVAILABLE |
|
MD_ENABLED = MD_AVAILABLE |
|
OCR_ENABLED = OCR_AVAILABLE |
|
if OCR_ENABLED: |
|
log.info("✅ تم تفعيل المعالجة باستخدام OCR.") |
|
else: |
|
log.warning("⚠️ مكتبات OCR (pytesseract, Pillow) غير مثبتة، لن يتم معالجة الملفات الممسوحة ضوئياً.") |
|
return True |
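# Note: storage.Client() above authenticates via Application Default Credentials
# (a GOOGLE_APPLICATION_CREDENTIALS service-account file or the ambient GCP
# identity); no GCS key is read from .env.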
|
|
|
|
|
|
|
|
|
job_lock = threading.Lock() |
|
background_job = {"thread": None, "type": None, "status": "⚪ خامل", "log": ["مرحبًا بك!"], "stop_event": threading.Event()} |
|
def update_job_log(message: str): |
|
if not message: return |
|
with job_lock: |
|
background_job["log"].append(f"[{time.strftime('%H:%M:%S')}] {message}") |
|
if len(background_job["log"]) > 200: background_job["log"] = background_job["log"][-200:] |
|
def update_job_status(status: str): |
|
with job_lock: background_job["status"] = status |
|
def get_job_status() -> tuple[str, str, bool]: |
|
with job_lock: |
|
log_text = "\n".join(background_job["log"][-100:]) |
|
status = background_job["status"] |
|
is_running = background_job["thread"] is not None and background_job["thread"].is_alive() |
|
return log_text, status, is_running |
|
|
|
|
|
|
|
|
|
def is_valid_docx(content_bytes: bytes) -> bool: |
|
"""✅ [جديد] التحقق من صحة ملف DOCX (يجب أن يكون أرشيف ZIP صالح).""" |
|
try: |
|
with zipfile.ZipFile(io.BytesIO(content_bytes)) as zf: |
|
return 'word/document.xml' in zf.namelist() |
|
except zipfile.BadZipFile: |
|
return False |
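# Illustrative behaviour of the validator above (not executed anywhere in this
# module; "report.docx" is a hypothetical local file):
#   is_valid_docx(b"plain text, not a zip")          -> False
#   is_valid_docx(open("report.docx", "rb").read())  -> True for a well-formed DOCX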
|
|
|
def get_mime_type(filename: str, content_bytes: bytes) -> str: |
|
"""✅ [جديد] الحصول على نوع MIME للملف بناءً على الامتداد أو المحتوى.""" |
|
mime_type, _ = mimetypes.guess_type(filename) |
|
if mime_type is None: |
|
if content_bytes.startswith(b'\x50\x4B\x03\x04'): |
|
return 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' |
|
elif content_bytes.startswith(b'\xD0\xCF\x11\xE0'): |
|
return 'application/msword' |
|
elif b'<?xml' in content_bytes[:100]: |
|
return 'application/xml' |
|
return mime_type or 'application/octet-stream' |
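# Caveat for the fallback above: the PK\x03\x04 signature is shared by every
# ZIP-based format (DOCX, XLSX, plain .zip), and \xD0\xCF\x11\xE0 is the OLE2
# container used by both legacy .doc and .xls, so these guesses are heuristics
# that only apply when the extension gives no answer.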
|
|
|
def ocr_image_to_text(image_bytes: bytes) -> str: |
|
"""✅ [جديد] دالة لاستخراج النص من صورة باستخدام Tesseract OCR.""" |
|
if not OCR_ENABLED: return "" |
|
try: |
|
image = Image.open(io.BytesIO(image_bytes)) |
|
|
|
text = pytesseract.image_to_string(image, lang='ara') |
|
return text.strip() |
|
except Exception as e: |
|
log.error(f"❌ فشل في عملية الـ OCR: {e}") |
|
return "" |
|
|
|
def generate_and_upload_cover_image(content_bytes: bytes, original_filename: str) -> Optional[str]: |
|
"""✅ [مطور] إنشاء ورفع صورة غلاف باستخدام Signed URL.""" |
|
if not PDF_ENABLED or not GCS_ENABLED: return None |
|
try: |
|
doc = fitz.open(stream=io.BytesIO(content_bytes), filetype="pdf") |
|
if not doc or doc.page_count == 0: return None |
|
page = doc[0] |
|
pix = page.get_pixmap(dpi=150) |
|
image_bytes = pix.tobytes("png") |
|
|
|
image_filename = f"cover_{os.path.splitext(original_filename)[0]}.png" |
|
blob_name = f"covers/{uuid.uuid4().hex[:10]}_{image_filename}" |
|
bucket = storage_client.bucket(GCS_BUCKET_NAME) |
|
blob = bucket.blob(blob_name) |
|
blob.upload_from_string(image_bytes, content_type="image/png") |
|
|
|
signed_url = blob.generate_signed_url(expiration=timedelta(days=365*10)) |
|
|
|
log.info(f"🖼️ تم رفع صورة الغلاف وإنشاء رابط موقّع.") |
|
return signed_url |
|
except Exception as e: |
|
log.error(f"❌ فشل في إنشاء أو رفع صورة الغلاف لـ '{original_filename}': {e}") |
|
return None |
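# Caveat: V4 signed URLs are capped at 7 days, so the 10-year expiration above
# implicitly relies on V2 signing (the library default when no `version` argument
# is passed); worth verifying against the installed google-cloud-storage release.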
|
|
|
def generate_page_images_as_binary(content_bytes: bytes) -> List[Dict[str, Any]]: |
|
page_images = [] |
|
if not PDF_ENABLED: return page_images |
|
try: |
|
doc = fitz.open(stream=io.BytesIO(content_bytes), filetype="pdf") |
|
if not doc or doc.page_count == 0: return page_images |
|
for i, page in enumerate(doc): |
|
pix = page.get_pixmap(dpi=200) |
|
image_bytes = pix.tobytes("png") |
|
page_images.append({"page_number": i + 1, "image_data": Binary(image_bytes)}) |
|
log.info(f"🖼️ تم إنشاء {len(page_images)} صورة كبيانات ثنائية.") |
|
return page_images |
|
except Exception as e: |
|
log.error(f"❌ فشل في إنشاء صور الصفحات: {e}") |
|
return [] |
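# The image_data values above are wrapped in psycopg2.Binary so they can be
# inserted into a bytea column; decompose_pdf() converts them back with bytes()
# before OCR, which is the 18.4 fix mentioned in the module docstring.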
|
|
|
def decompose_pdf(content_bytes: bytes, page_images: List[Dict[str, Any]]) -> List[Dict[str, Any]]: |
|
"""✅ [مطور] تفكيك PDF بطريقتين: سريعة (fitz) وعميقة (OCR).""" |
|
elements = [] |
|
if not PDF_ENABLED: return elements |
|
|
|
|
|
try: |
|
doc = fitz.open(stream=io.BytesIO(content_bytes), filetype="pdf") |
|
for page_num, page in enumerate(doc): |
|
text = page.get_text("text", sort=True).strip() |
|
if text: |
|
elements.append({"element_type": "page_content", "content": text, "page_number": page_num + 1, "metadata": "{}"}) |
|
log.info(f"📄 [المعالجة السريعة] تم تفكيك PDF إلى {len(elements)} عنصر نصي.") |
|
except Exception as e: |
|
log.error(f"❌ فشل في تفكيك PDF بالطريقة السريعة: {e}") |
|
elements = [] |
|
|
|
|
|
if OCR_ENABLED: |
|
ocr_elements = [] |
|
log.info("🔬 بدء المعالجة العميقة (OCR) لصور الصفحات...") |
|
for page_image_data in page_images: |
|
|
|
image_bytes = bytes(page_image_data['image_data']) |
|
ocr_text = ocr_image_to_text(image_bytes) |
|
if ocr_text and len(ocr_text) > 20: |
|
ocr_elements.append({ |
|
"element_type": "ocr_content", |
|
"content": ocr_text, |
|
"page_number": page_image_data['page_number'], |
|
"metadata": "{}" |
|
}) |
|
log.info(f"🔬 [المعالجة العميقة] تم استخراج {len(ocr_elements)} عنصر نصي باستخدام OCR.") |
|
|
|
|
|
if ocr_elements: |
|
final_elements = {} |
|
for el in elements: |
|
final_elements[el['page_number']] = el |
|
for el in ocr_elements: |
|
final_elements[el['page_number']] = el |
|
return list(final_elements.values()) |
|
|
|
return elements |
|
|
|
def decompose_html(content_bytes: bytes) -> List[Dict[str, Any]]: |
|
elements = [] |
|
try: |
|
soup = BeautifulSoup(content_bytes.decode('utf-8', errors='ignore'), 'html.parser') |
|
text = soup.get_text(separator='\n', strip=True) |
|
if text: |
|
elements.append({"element_type": "html_content", "content": text, "page_number": 1, "metadata": "{}"}) |
|
log.info(f"🌐 تم تفكيك HTML إلى {len(elements)} عنصر نصي.") |
|
except Exception as e: |
|
log.error(f"❌ فشل في تفكيك HTML: {e}") |
|
return elements |
|
|
|
def decompose_docx(content_bytes: bytes) -> List[Dict[str, Any]]: |
|
elements = [] |
|
if not DOCX_ENABLED: return elements |
|
try: |
|
if not is_valid_docx(content_bytes): |
|
log.error(f"❌ فشل في تفكيك DOCX: الملف غير صالح") |
|
return elements |
|
doc = docx.Document(io.BytesIO(content_bytes)) |
|
text = '\n'.join([para.text for para in doc.paragraphs if para.text.strip()]) |
|
if text: |
|
elements.append({"element_type": "docx_content", "content": text, "page_number": 1, "metadata": "{}"}) |
|
log.info(f"📝 تم تفكيك DOCX إلى {len(elements)} عنصر نصي.") |
|
except Exception as e: |
|
log.error(f"❌ فشل في تفكيك DOCX: {e}") |
|
return elements |
|
|
|
def decompose_excel(content_bytes: bytes) -> List[Dict[str, Any]]: |
|
elements = [] |
|
if not EXCEL_ENABLED: return elements |
|
try: |
|
df = pd.read_excel(io.BytesIO(content_bytes), sheet_name=None) |
|
text = '' |
|
for sheet_name, sheet_df in df.items(): |
|
text += f"Sheet: {sheet_name}\n{sheet_df.to_string(index=False)}\n\n" |
|
if text: |
|
elements.append({"element_type": "excel_content", "content": text, "page_number": 1, "metadata": "{}"}) |
|
log.info(f"📊 تم تفكيك Excel إلى {len(elements)} عنصر نصي.") |
|
except Exception as e: |
|
log.error(f"❌ فشل في تفكيك Excel: {e}") |
|
return elements |
|
|
|
def decompose_text_or_md(content_bytes: bytes, file_ext: str) -> List[Dict[str, Any]]: |
|
elements = [] |
|
try: |
|
text = content_bytes.decode('utf-8', errors='ignore').strip() |
|
if file_ext == '.md' and MD_ENABLED: |
|
html = markdown.markdown(text) |
|
soup = BeautifulSoup(html, 'html.parser') |
|
text = soup.get_text(separator='\n', strip=True) |
|
if text: |
|
elements.append({"element_type": "text_content", "content": text, "page_number": 1, "metadata": "{}"}) |
|
log.info(f"📄 تم تفكيك {file_ext.upper()} إلى {len(elements)} عنصر نصي.") |
|
except Exception as e: |
|
log.error(f"❌ فشل في تفكيك {file_ext.upper()}: {e}") |
|
return elements |
|
|
|
def get_embedding_for_text(text: str) -> Optional[List[float]]: |
|
if not text or not GEMINI_ENABLED: return None |
|
try: |
|
return genai.embed_content(model="models/embedding-001", content=text[:8000], task_type="RETRIEVAL_DOCUMENT")['embedding'] |
|
except Exception: return None |
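# Assumption: models/embedding-001 returns 768-dimensional vectors, so a pgvector
# column storing them would be declared vector(768); re-check the model card if
# the embedding model is swapped.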
|
|
|
def advanced_ai_processing(text: str) -> Dict[str, str]: |
|
if not GEMINI_ENABLED: return {"summary": text[:500], "category": "غير مصنف"} |
|
try: |
|
model = genai.GenerativeModel('gemini-1.5-pro-latest') |
|
prompt = f"تلخيص النص التالي باختصار وحدد فئته الرئيسية (مثل: أخبار، تقرير، قانوني، إلخ):\n\n{text[:4000]}" |
|
response = model.generate_content(prompt) |
|
summary = response.text.strip() |
|
        match = re.search(r'فئة: (.*)', summary)
        return {"summary": summary, "category": match.group(1).strip() if match else "غير مصنف"}
|
except Exception as e: |
|
log.error(f"❌ فشل في المعالجة الذكية: {e}") |
|
return {"summary": text[:500], "category": "غير مصنف"} |
|
|
|
|
|
|
|
|
|
def process_single_pdf_from_gcs(blob: Any) -> str: |
|
"""✅ [مطور] المحرك الجديد: يعالج ملفات PDF باستخدام OCR إذا لزم الأمر.""" |
|
conn = None |
|
original_path = blob.public_url |
|
safe_filename = unquote(os.path.basename(blob.name)) |
|
try: |
|
content_bytes = blob.download_as_bytes() |
|
|
|
conn = connection_pool.getconn() |
|
with conn.cursor() as cursor: |
|
cursor.execute("SELECT id FROM documents WHERE original_path = %s;", (original_path,)) |
|
if cursor.fetchone(): return f"🟡 (تخطي) موجود مسبقاً: {safe_filename}" |
|
|
|
|
|
page_images = generate_page_images_as_binary(content_bytes) |
|
|
|
|
|
elements = decompose_pdf(content_bytes, page_images) |
|
|
|
if not elements and not page_images: |
|
raise ValueError("الملف فارغ أو تالف، لم يتم استخراج نصوص أو صور.") |
|
|
|
bucket = storage_client.bucket(GCS_BUCKET_NAME) |
|
document_uuid = str(uuid.uuid4()) |
|
blob_name = f"documents/{document_uuid}/{safe_filename}" |
|
pdf_blob = bucket.blob(blob_name) |
|
pdf_blob.upload_from_string(content_bytes, content_type="application/pdf") |
|
storage_path = pdf_blob.generate_signed_url(expiration=timedelta(days=365*10)) |
|
|
|
with conn.cursor() as cursor: |
|
status = 'pending' if elements else 'completed_no_text' |
|
summary = elements[0]['content'][:500] if elements else f"مستند صور فقط يحتوي على {len(page_images)} صفحة." |
|
|
|
cursor.execute( |
|
"""INSERT INTO documents (source, filename, original_path, storage_path, status, summary, created_at, updated_at) |
|
VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW()) RETURNING id;""", |
|
(original_path, safe_filename, original_path, storage_path, status, summary) |
|
) |
|
document_id = cursor.fetchone()[0] |
|
|
|
if elements: |
|
elements_to_insert = [(document_id, el['element_type'], el['content'], el['page_number'], el['metadata']) for el in elements] |
|
execute_values(cursor, "INSERT INTO document_elements (document_id, element_type, content, page_number, metadata) VALUES %s", elements_to_insert) |
|
|
|
if page_images: |
|
pages_to_insert = [(document_id, p['page_number'], p['image_data']) for p in page_images] |
|
execute_values(cursor, "INSERT INTO document_pages (document_id, page_number, image_data) VALUES %s", pages_to_insert) |
|
|
|
conn.commit() |
|
return f"✅ تمت معالجة '{safe_filename}' ({len(elements)} عنصر نصي, {len(page_images)} صورة)" |
|
|
|
except Exception as e: |
|
log.error(f"❌ خطأ فادح أثناء معالجة {safe_filename}: {e}\n{traceback.format_exc()}") |
|
if conn: conn.rollback() |
|
try: |
|
conn_fail = connection_pool.getconn() |
|
with conn_fail.cursor() as cur_fail: |
|
cur_fail.execute("INSERT INTO documents (source, filename, original_path, status, summary, created_at, updated_at) VALUES (%s, %s, %s, 'failed', %s, NOW(), NOW()) ON CONFLICT (original_path) DO NOTHING", (original_path, safe_filename, original_path, str(e))) |
|
conn_fail.commit() |
|
except Exception as log_fail_e: |
|
log.error(f"فشل حتى في تسجيل الخطأ: {log_fail_e}") |
|
finally: |
|
if 'conn_fail' in locals(): |
|
connection_pool.putconn(conn_fail) |
|
return f"❌ فشلت معالجة {safe_filename}" |
|
finally: |
|
if conn: |
|
connection_pool.putconn(conn) |
|
|
|
def gcs_archive_master_worker(): |
|
if not GCS_ENABLED: return update_job_status("🔴 GCS غير مهيأ!") |
|
update_job_status("⏳ فحص أرشيف GCS...") |
|
conn = None |
|
try: |
|
conn = connection_pool.getconn() |
|
with conn.cursor() as cur: |
|
cur.execute("SELECT original_path FROM documents") |
|
existing_paths = {row[0] for row in cur.fetchall()} |
|
|
|
blobs = list(storage_client.list_blobs(GCS_BUCKET_NAME)) |
|
new_blobs = [b for b in blobs if b.public_url not in existing_paths and b.name.lower().endswith('.pdf')] |
|
|
|
if not new_blobs: return update_job_status("✅ GCS محدث, لا توجد ملفات جديدة.") |
|
update_job_log(f"☁️ تم العثور على {len(new_blobs)} مستند PDF جديد للمعالجة.") |
|
|
|
with ThreadPoolExecutor(max_workers=MAX_WORKERS_DEFAULT) as executor: |
|
future_to_blob = {executor.submit(process_single_pdf_from_gcs, blob): blob for blob in new_blobs} |
|
for future in tqdm(as_completed(future_to_blob), total=len(new_blobs), desc="🗃️ معالجة الأرشيف"): |
|
if background_job["stop_event"].is_set(): break |
|
update_job_log(future.result()) |
|
finally: |
|
if conn: connection_pool.putconn(conn) |
|
with job_lock: background_job["thread"] = None |
|
update_job_status("✅ اكتملت معالجة الأرشيف") |
|
|
|
def crawl_site(start_url: str, page_limit: int, crawl_mode: str) -> List[str]: |
|
"""جمع الروابط من موقع ويب بناءً على وضع الزحف.""" |
|
if not start_url.startswith(('http://', 'https://')): |
|
start_url = 'https://' + start_url |
|
visited = set() |
|
queue = deque([start_url]) |
|
all_links = set() |
|
base_domain = urlparse(start_url).netloc |
|
crawl_all_content = crawl_mode == "الكل" |
|
target_extensions = { |
|
"ملفات PDF فقط": ['.pdf'], |
|
"ملفات DOC/DOCX فقط": ['.doc', '.docx'], |
|
"ملفات XLS/XLSX فقط": ['.xls', '.xlsx'], |
|
"ملفات TXT/MD فقط": ['.txt', '.md'], |
|
"الكل": ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.txt', '.md'] |
|
}[crawl_mode] |
|
pbar = tqdm(total=page_limit, desc=f"🔍 جمع ({crawl_mode})") |
|
session = requests.Session() |
|
session.headers.update(REQUEST_HEADERS) |
|
|
|
while queue and len(all_links) < page_limit: |
|
if background_job["stop_event"].is_set(): |
|
break |
|
url = queue.popleft() |
|
if url in visited: |
|
continue |
|
visited.add(url) |
|
try: |
|
resp = session.get(url, timeout=20) |
|
if resp.status_code != 200: |
|
continue |
|
soup = BeautifulSoup(resp.text, "lxml", from_encoding="utf-8") |
|
for a_tag in soup.find_all("a", href=True): |
|
href = urljoin(url, a_tag["href"]).split("#")[0].strip() |
|
if urlparse(href).netloc == base_domain and href not in visited: |
|
if any(href.lower().endswith(ext) for ext in target_extensions): |
|
if len(all_links) < page_limit: |
|
all_links.add(href) |
|
pbar.update(1) |
|
if crawl_all_content or not any(href.lower().endswith(ext) for ext in target_extensions): |
|
queue.append(href) |
|
except Exception as e: |
|
log.error(f"❌ فشل في زحف {url}: {e}") |
|
update_job_log(f"فشل في زحف {url}: {e}") |
|
pbar.close() |
|
return list(all_links) |
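# Illustrative call: collect at most 50 same-domain PDF links starting from the
# placeholder URL used in the UI, honouring the stop event checked in the loop:
#   pdf_links = crawl_site("https://example.com", 50, "ملفات PDF فقط")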
|
|
|
def process_single_document(link: str, content_type: str) -> str: |
|
"""معالجة مستند واحد (PDF, HTML, DOCX, Excel) وتخزينه.""" |
|
conn = None |
|
original_path = link |
|
safe_filename = unquote(os.path.basename(urlparse(link).path)) |
|
try: |
|
resp = requests.get(link, timeout=60, headers=REQUEST_HEADERS) |
|
resp.raise_for_status() |
|
content_bytes = resp.content |
|
|
|
conn = connection_pool.getconn() |
|
with conn.cursor() as cursor: |
|
cursor.execute("SELECT id FROM documents WHERE original_path = %s;", (original_path,)) |
|
if cursor.fetchone(): |
|
return f"🟡 (تخطي) موجود مسبقاً: {safe_filename}" |
|
|
|
|
|
        content_type = content_type.lower()
        file_ext = os.path.splitext(safe_filename)[1].lower()
        page_images = []
        if 'pdf' in content_type:
            page_images = generate_page_images_as_binary(content_bytes)
            elements = decompose_pdf(content_bytes, page_images)
        elif 'html' in content_type:
            elements = decompose_html(content_bytes)
        # Match spreadsheets before Word documents: the OOXML MIME types share the "officedocument" prefix.
        elif 'spreadsheetml' in content_type or 'excel' in content_type or 'xls' in file_ext:
            elements = decompose_excel(content_bytes)
        elif 'wordprocessingml' in content_type or 'msword' in content_type or 'doc' in file_ext:
            elements = decompose_docx(content_bytes)
        elif 'text' in content_type or 'markdown' in content_type or file_ext in ('.txt', '.md'):
            elements = decompose_text_or_md(content_bytes, file_ext)
        else:
            raise ValueError(f"نوع محتوى غير مدعوم: {content_type}")
|
|
|
cover_image_url = generate_and_upload_cover_image(content_bytes, safe_filename) if 'pdf' in content_type else None |
|
|
|
if not elements and (not page_images if 'pdf' in content_type else True): |
|
raise ValueError("الملف فارغ أو تالف، لم يتم استخراج نصوص أو صور.") |
|
|
|
if not storage_client or not GCS_ENABLED: |
|
raise ValueError("GCS غير مهيأ، لا يمكن رفع الملف.") |
|
|
|
bucket = storage_client.bucket(GCS_BUCKET_NAME) |
|
document_uuid = str(uuid.uuid4()) |
|
blob_name = f"documents/{document_uuid}/{safe_filename}" |
|
pdf_blob = bucket.blob(blob_name) |
|
pdf_blob.upload_from_string(content_bytes, content_type=content_type) |
|
pdf_blob.make_public() |
|
storage_path = pdf_blob.public_url |
|
|
|
with conn.cursor() as cursor: |
|
if elements: |
|
status = 'pending' |
|
summary = elements[0]['content'][:500] |
|
else: |
|
status = 'completed_no_text' |
|
summary = f"مستند صور فقط يحتوي على {len(page_images)} صفحة." if 'pdf' in content_type else "لا يحتوي على نص" |
|
|
|
cursor.execute( |
|
"""INSERT INTO documents (source, filename, original_path, storage_path, cover_image_path, status, summary, created_at, updated_at) |
|
VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW()) RETURNING id;""", |
|
(original_path, safe_filename, original_path, storage_path, cover_image_url, status, summary) |
|
) |
|
document_id = cursor.fetchone()[0] |
|
|
|
if elements: |
|
elements_to_insert = [(document_id, el['element_type'], el['content'], el['page_number'], el['metadata']) for el in elements] |
|
execute_values(cursor, |
|
"INSERT INTO document_elements (document_id, element_type, content, page_number, metadata) VALUES %s", |
|
elements_to_insert) |
|
|
|
if 'pdf' in content_type and page_images: |
|
pages_to_insert = [(document_id, p['page_number'], p['image_data']) for p in page_images] |
|
execute_values(cursor, |
|
"INSERT INTO document_pages (document_id, page_number, image_data) VALUES %s", |
|
pages_to_insert) |
|
|
|
conn.commit() |
|
return f"✅ تمت معالجة '{safe_filename}' ({len(elements)} عنصر نصي, {len(page_images)} صورة)" if 'pdf' in content_type else f"✅ تمت معالجة '{safe_filename}' ({len(elements)} عنصر نصي)" |
|
except Exception as e: |
|
log.error(f"❌ خطأ فادح أثناء معالجة {safe_filename}: {e}") |
|
if conn: |
|
conn.rollback() |
|
try: |
|
conn_fail = conn if conn else connection_pool.getconn() |
|
with conn_fail.cursor() as cur_fail: |
|
cur_fail.execute( |
|
"""INSERT INTO documents (source, filename, original_path, status, summary, created_at, updated_at) |
|
VALUES (%s, %s, %s, 'failed', %s, NOW(), NOW()) |
|
ON CONFLICT (original_path) DO NOTHING""", |
|
(original_path, safe_filename, original_path, str(e)) |
|
) |
|
conn_fail.commit() |
|
return f"❌ فشلت معالجة {safe_filename}" |
|
except Exception as log_fail_e: |
|
log.error(f"فشل في تسجيل الخطأ: {log_fail_e}") |
|
return f"❌ فشلت معالجة {safe_filename} وتسجيل الخطأ" |
|
finally: |
|
if 'conn_fail' in locals() and conn_fail and not conn: |
|
connection_pool.putconn(conn_fail) |
|
finally: |
|
if conn: |
|
connection_pool.putconn(conn) |
|
|
|
def crawler_master_worker(start_url: str, page_limit: int, crawl_mode: str): |
|
"""إدارة عملية الزحف ومعالجة المستندات.""" |
|
update_job_status("⏳ بدء الزحف...") |
|
update_job_log(f"🌐 بدء الزحف من: {start_url}") |
|
links = crawl_site(start_url, page_limit, crawl_mode) |
|
if not links: |
|
update_job_status("✅ لا توجد روابط جديدة.") |
|
return |
|
update_job_log(f"🔗 تم العثور على {len(links)} رابط للمعالجة.") |
|
|
|
with ThreadPoolExecutor(max_workers=MAX_WORKERS_DEFAULT) as executor: |
|
futures = [] |
|
for link in links: |
|
try: |
|
                headers = requests.head(link, timeout=20, headers=REQUEST_HEADERS, allow_redirects=True).headers
|
content_type = headers.get('Content-Type', '') |
|
futures.append(executor.submit(process_single_document, link, content_type)) |
|
except Exception as e: |
|
update_job_log(f"❌ فشل في جلب رأس {link}: {e}") |
|
for future in as_completed(futures): |
|
if background_job["stop_event"].is_set(): |
|
break |
|
update_job_log(future.result()) |
|
update_job_status("✅ اكتمل الزحف والمعالجة") |
|
|
|
def elements_embedding_worker(): |
|
conn = None |
|
try: |
|
update_job_status("⏳ بدء إنشاء المتجهات...") |
|
conn = connection_pool.getconn() |
|
with conn.cursor(cursor_factory=RealDictCursor) as cur: |
|
cur.execute("SELECT id, content FROM document_elements WHERE embedding IS NULL") |
|
elements = cur.fetchall() |
|
if not elements: return update_job_status("✅ لا توجد عناصر جديدة لإنشاء متجه لها.") |
|
update_job_log(f"🧠 تم العثور على {len(elements):,} عنصر للمعالجة.") |
|
|
|
for element in tqdm(elements, desc="🧠 إنشاء المتجهات"): |
|
if background_job["stop_event"].is_set(): break |
|
embedding = get_embedding_for_text(element['content']) |
|
if embedding: |
|
with conn.cursor() as update_cur: |
|
update_cur.execute("UPDATE document_elements SET embedding = %s WHERE id = %s", (embedding, element['id'])) |
|
conn.commit() |
|
|
|
with conn.cursor() as cur: |
|
cur.execute("UPDATE documents d SET status = 'completed' WHERE d.status = 'pending' AND NOT EXISTS (SELECT 1 FROM document_elements de WHERE de.document_id = d.id AND de.embedding IS NULL)") |
|
conn.commit() |
|
finally: |
|
if conn: connection_pool.putconn(conn) |
|
with job_lock: background_job["thread"] = None |
|
update_job_status("✅ اكتمل إنشاء المتجهات") |
|
|
|
|
|
|
|
|
|
def get_dashboard_stats(): |
|
conn = None |
|
try: |
|
conn = connection_pool.getconn() |
|
with conn.cursor(cursor_factory=RealDictCursor) as cur: |
|
cur.execute("SELECT status, COUNT(*) as count FROM documents GROUP BY status;") |
|
stats = {row['status']: row['count'] for row in cur.fetchall()} |
|
cur.execute("SELECT COUNT(*) as element_count, COUNT(embedding) as embedded_count FROM document_elements;") |
|
element_stats = cur.fetchone() or {} |
|
total = sum(stats.values()) |
|
pending = stats.get('pending', 0) |
|
completed = stats.get('completed', 0) |
|
failed = stats.get('failed', 0) |
|
completed_no_text = stats.get('completed_no_text', 0) |
|
stats_md = f"""### 📊 إحصائيات قاعدة البيانات\n| الفئة | العدد |\n|---|---|\n| **إجمالي المستندات** | **{total:,}** |\n| ⏳ قيد الإنشاء | {pending:,} |\n| ✅ مكتمل (مع نص) | {completed:,} |\n| 🖼️ مكتمل (صور فقط) | {completed_no_text:,} |\n| 🔴 فاشل | {failed:,} |\n| **إجمالي العناصر النصية** | **{element_stats.get('element_count', 0):,}** |\n| 🔗 عناصر مع متجه | {element_stats.get('embedded_count', 0):,} |""" |
|
return stats_md |
|
except Exception as e: return f"### خطأ: {e}" |
|
finally: |
|
if conn: connection_pool.putconn(conn) |
|
|
|
def clear_database_wrapper(confirmation: bool): |
|
if not confirmation: return "⚠️ تم إلغاء العملية." |
|
conn = None |
|
try: |
|
conn = connection_pool.getconn() |
|
with conn.cursor() as cur: |
|
cur.execute("TRUNCATE TABLE documents RESTART IDENTITY CASCADE;") |
|
conn.commit() |
|
return "✅ تم مسح جميع البيانات بنجاح" |
|
except Exception as e: |
|
if conn: conn.rollback() |
|
return f"❌ خطأ أثناء المسح: {e}" |
|
finally: |
|
if conn: connection_pool.putconn(conn) |
|
|
|
def clear_failed_documents_wrapper(): |
|
conn = None |
|
try: |
|
conn = connection_pool.getconn() |
|
with conn.cursor() as cur: |
|
cur.execute("DELETE FROM documents WHERE status = 'failed';") |
|
deleted_count = cur.rowcount |
|
conn.commit() |
|
msg = f"✅ تم حذف {deleted_count} مستند من المستندات الفاشلة." |
|
log.info(msg) |
|
return msg |
|
except Exception as e: |
|
if conn: conn.rollback() |
|
msg = f"❌ خطأ أثناء حذف المستندات الفاشلة: {e}" |
|
log.error(msg) |
|
return msg |
|
finally: |
|
if conn: connection_pool.putconn(conn) |
|
|
|
|
|
|
|
|
|
def start_job_wrapper(job_type: str, *args): |
|
with job_lock: |
|
if background_job["thread"] and background_job["thread"].is_alive(): return "⏳ يوجد عملية قيد التنفيذ." |
|
target_func_map = { |
|
'embedding_processor': elements_embedding_worker, |
|
'gcs_processor': gcs_archive_master_worker, |
|
'crawler_pdf': lambda url, limit: crawler_master_worker(url, limit, "ملفات PDF فقط"), |
|
'crawler_doc': lambda url, limit: crawler_master_worker(url, limit, "ملفات DOC/DOCX فقط"), |
|
'crawler_xls': lambda url, limit: crawler_master_worker(url, limit, "ملفات XLS/XLSX فقط"), |
|
'crawler_txt_md': lambda url, limit: crawler_master_worker(url, limit, "ملفات TXT/MD فقط"), |
|
'crawler_all': lambda url, limit: crawler_master_worker(url, limit, "الكل") |
|
} |
|
if job_type not in target_func_map: return "🔴 نوع مهمة غير معروف" |
|
background_job.update({ |
|
"thread": threading.Thread(target=target_func_map[job_type], args=args, daemon=True), |
|
"log": [f"🚀 بدء مهمة: {job_type}..."], |
|
"status": "🚀 يستعد", |
|
"stop_event": threading.Event(), |
|
"type": job_type |
|
}) |
|
background_job["thread"].start() |
|
return f"🚀 بدأت عملية {job_type}!" |
|
|
|
def stop_job_wrapper(): |
|
with job_lock: |
|
if not (background_job["thread"] and background_job["thread"].is_alive()): return "ℹ️ لا توجد عملية لإيقافها." |
|
background_job["stop_event"].set() |
|
update_job_status("🛑 جاري الإيقاف...") |
|
return "🛑 تم إرسال إشارة الإيقاف." |
|
|
|
def build_ui(): |
|
with gr.Blocks(theme=gr.themes.Soft(), title="TawasoaAi OCR v18.4") as demo: |
|
gr.Markdown("# 🚀 **TawasoaAi v18.4** - المعالجة فائقة الذكاء (OCR)") |
|
with gr.Row(): |
|
status_indicator = gr.Textbox(label="حالة النظام", interactive=False) |
|
stop_btn = gr.Button("🛑 إيقاف العملية", variant="stop", visible=False) |
|
with gr.Tabs(): |
|
with gr.Tab("📊 لوحة التحكم"): |
|
stats_display = gr.Markdown("...") |
|
refresh_stats_btn = gr.Button("🔄 تحديث الإحصائيات") |
|
with gr.Tab("🗃️ معالجة أرشيف GCS"): |
|
log_gcs = gr.Textbox(label="سجل معالجة الأرشيف", lines=20, interactive=False, autoscroll=True) |
|
process_gcs_btn = gr.Button("🚀 ابدأ معالجة ملفات GCS", variant="primary") |
|
with gr.Tab("🧠 إنشاء المتجهات"): |
|
log_embed = gr.Textbox(label="سجل إنشاء المتجهات", lines=20, interactive=False, autoscroll=True) |
|
process_embed_btn = gr.Button("🧠 ابدأ إنشاء المتجهات", variant="primary") |
|
with gr.Tab("🌐 الزحف على المواقع"): |
|
with gr.Row(): |
|
crawl_url = gr.Textbox(label="رابط الموقع", placeholder="أدخل رابط الموقع (مثال: https://example.com)") |
|
crawl_limit = gr.Number(label="الحد الأقصى لعدد الملفات", value=100, minimum=1, maximum=1000) |
|
with gr.Row(): |
|
crawl_pdf_btn = gr.Button("📄 زحف على ملفات PDF فقط", variant="secondary") |
|
crawl_doc_btn = gr.Button("📝 زحف على ملفات DOC/DOCX فقط", variant="secondary") |
|
crawl_xls_btn = gr.Button("📊 زحف على ملفات XLS/XLSX فقط", variant="secondary") |
|
crawl_txt_md_btn = gr.Button("📄 زحف على ملفات TXT/MD فقط", variant="secondary") |
|
crawl_all_btn = gr.Button("🌐 زحف على الكل", variant="primary") |
|
log_crawl = gr.Textbox(label="سجل الزحف", lines=20, interactive=False, autoscroll=True) |
|
with gr.Tab("🔧 صيانة"): |
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
gr.Markdown("### 🗑️ حذف **كل** البيانات") |
|
confirm_checkbox_all = gr.Checkbox(label="✅ أؤكد حذف جميع البيانات نهائياً") |
|
clear_all_btn = gr.Button("🗑️ مسح كل شيء الآن", variant="stop", interactive=False) |
|
clear_all_output = gr.Textbox(label="النتيجة", interactive=False) |
|
with gr.Column(scale=1): |
|
gr.Markdown("### 🧹 حذف المستندات **الفاشلة** فقط") |
|
gr.Markdown("استخدم هذا الخيار للتخلص من السجلات التي فشلت أثناء المعالجة والمحاولة مرة أخرى.") |
|
clear_failed_btn = gr.Button("🧹 مسح المستندات الفاشلة", variant="secondary") |
|
clear_failed_output = gr.Textbox(label="النتيجة", interactive=False) |
|
|
|
        def update_interface():
            log_text, status, is_running = get_job_status()
            return log_text, status, gr.update(visible=is_running)
|
|
|
demo.load(update_interface, outputs=[log_crawl, status_indicator, stop_btn]) |
|
refresh_stats_btn.click(get_dashboard_stats, outputs=stats_display) |
|
stop_btn.click(stop_job_wrapper, outputs=status_indicator) |
|
process_gcs_btn.click(lambda: start_job_wrapper("gcs_processor"), outputs=status_indicator) |
|
process_embed_btn.click(lambda: start_job_wrapper("embedding_processor"), outputs=status_indicator) |
|
confirm_checkbox_all.change(lambda x: gr.update(interactive=x), inputs=confirm_checkbox_all, outputs=clear_all_btn) |
|
clear_all_btn.click(clear_database_wrapper, inputs=confirm_checkbox_all, outputs=clear_all_output).then(get_dashboard_stats, outputs=stats_display) |
|
clear_failed_btn.click(clear_failed_documents_wrapper, outputs=clear_failed_output).then(get_dashboard_stats, outputs=stats_display) |
|
crawl_pdf_btn.click(lambda url, limit: start_job_wrapper("crawler_pdf", url, limit), inputs=[crawl_url, crawl_limit], outputs=status_indicator) |
|
crawl_doc_btn.click(lambda url, limit: start_job_wrapper("crawler_doc", url, limit), inputs=[crawl_url, crawl_limit], outputs=status_indicator) |
|
crawl_xls_btn.click(lambda url, limit: start_job_wrapper("crawler_xls", url, limit), inputs=[crawl_url, crawl_limit], outputs=status_indicator) |
|
crawl_txt_md_btn.click(lambda url, limit: start_job_wrapper("crawler_txt_md", url, limit), inputs=[crawl_url, crawl_limit], outputs=status_indicator) |
|
crawl_all_btn.click(lambda url, limit: start_job_wrapper("crawler_all", url, limit), inputs=[crawl_url, crawl_limit], outputs=status_indicator) |
|
|
|
return demo |
|
|
|
|
|
|
|
|
|
def graceful_shutdown(signum, frame): |
|
log.info("SIGINT/SIGTERM received. Shutting down...") |
|
stop_job_wrapper() |
|
if connection_pool: connection_pool.closeall() |
|
sys.exit(0) |
|
|
|
if __name__ == "__main__": |
|
signal.signal(signal.SIGINT, graceful_shutdown) |
|
signal.signal(signal.SIGTERM, graceful_shutdown) |
|
if not initialize_components(): |
|
sys.exit(1) |
|
|
|
ui = build_ui() |
|
ui.queue().launch(server_name="0.0.0.0", server_port=7860, share=False) |
|
|