|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging |
|
import os |
|
import random |
|
|
|
import xgboost as xgb |
|
from io import BytesIO |
|
import re |
|
import pdfplumber |
|
from PIL import Image |
|
import numpy as np |
|
from pypdf import PdfReader as pdf2_read |
|
|
|
from api import settings |
|
from api.utils.file_utils import get_project_base_directory |
|
from deepdoc.vision import OCR, Recognizer, LayoutRecognizer, TableStructureRecognizer |
|
from rag.nlp import rag_tokenizer |
|
from copy import deepcopy |
|
from huggingface_hub import snapshot_download |
|
|
|
class RAGFlowPdfParser: |
|
def __init__(self): |
|
self.ocr = OCR() |
|
if hasattr(self, "model_speciess"): |
|
self.layouter = LayoutRecognizer("layout." + self.model_speciess) |
|
else: |
|
self.layouter = LayoutRecognizer("layout") |
|
self.tbl_det = TableStructureRecognizer() |
|
|
|
self.updown_cnt_mdl = xgb.Booster() |
|
if not settings.LIGHTEN: |
|
try: |
|
import torch |
|
if torch.cuda.is_available(): |
|
self.updown_cnt_mdl.set_param({"device": "cuda"}) |
|
except Exception: |
|
logging.exception("RAGFlowPdfParser __init__") |
|
try: |
|
model_dir = os.path.join( |
|
get_project_base_directory(), |
|
"rag/res/deepdoc") |
|
self.updown_cnt_mdl.load_model(os.path.join( |
|
model_dir, "updown_concat_xgb.model")) |
|
except Exception: |
|
model_dir = snapshot_download( |
|
repo_id="InfiniFlow/text_concat_xgb_v1.0", |
|
local_dir=os.path.join(get_project_base_directory(), "rag/res/deepdoc"), |
|
local_dir_use_symlinks=False) |
|
self.updown_cnt_mdl.load_model(os.path.join( |
|
model_dir, "updown_concat_xgb.model")) |
|
|
|
self.page_from = 0 |
|
""" |
|
If you have trouble downloading HuggingFace models, -_^ this might help!! |
|
|
|
For Linux: |
|
export HF_ENDPOINT=https://hf-mirror.com |
|
|
|
For Windows: |
|
Good luck |
|
^_- |
|
|
|
""" |
|
|
|
def __char_width(self, c): |
|
return (c["x1"] - c["x0"]) // max(len(c["text"]), 1) |
|
|
|
def __height(self, c): |
|
return c["bottom"] - c["top"] |
|
|
|
def _x_dis(self, a, b): |
|
return min(abs(a["x1"] - b["x0"]), abs(a["x0"] - b["x1"]), |
|
abs(a["x0"] + a["x1"] - b["x0"] - b["x1"]) / 2) |
|
|
|
def _y_dis( |
|
self, a, b): |
|
return ( |
|
b["top"] + b["bottom"] - a["top"] - a["bottom"]) / 2 |
|
|
|
def _match_proj(self, b): |
|
proj_patt = [ |
|
r"第[零一二三四五六七八九十百]+章", |
|
r"第[零一二三四五六七八九十百]+[条节]", |
|
r"[零一二三四五六七八九十百]+[、是 ]", |
|
r"[\((][零一二三四五六七八九十百]+[)\)]", |
|
r"[\((][0-9]+[)\)]", |
|
r"[0-9]+(、|\.[ ]|)|\.[^0-9./a-zA-Z_%><-]{4,})", |
|
r"[0-9]+\.[0-9.]+(、|\.[ ])", |
|
r"[⚫•➢①② ]", |
|
] |
|
return any([re.match(p, b["text"]) for p in proj_patt]) |
|
|
|
def _updown_concat_features(self, up, down): |
|
w = max(self.__char_width(up), self.__char_width(down)) |
|
h = max(self.__height(up), self.__height(down)) |
|
y_dis = self._y_dis(up, down) |
|
LEN = 6 |
|
tks_down = rag_tokenizer.tokenize(down["text"][:LEN]).split() |
|
tks_up = rag_tokenizer.tokenize(up["text"][-LEN:]).split() |
|
tks_all = up["text"][-LEN:].strip() \ |
|
+ (" " if re.match(r"[a-zA-Z0-9]+", |
|
up["text"][-1] + down["text"][0]) else "") \ |
|
+ down["text"][:LEN].strip() |
|
tks_all = rag_tokenizer.tokenize(tks_all).split() |
|
fea = [ |
|
up.get("R", -1) == down.get("R", -1), |
|
y_dis / h, |
|
down["page_number"] - up["page_number"], |
|
up["layout_type"] == down["layout_type"], |
|
up["layout_type"] == "text", |
|
down["layout_type"] == "text", |
|
up["layout_type"] == "table", |
|
down["layout_type"] == "table", |
|
True if re.search( |
|
r"([。?!;!?;+))]|[a-z]\.)$", |
|
up["text"]) else False, |
|
True if re.search(r"[,:‘“、0-9(+-]$", up["text"]) else False, |
|
True if re.search( |
|
r"(^.?[/,?;:\],。;:’”?!》】)-])", |
|
down["text"]) else False, |
|
True if re.match(r"[\((][^\(\)()]+[)\)]$", up["text"]) else False, |
|
True if re.search(r"[,,][^。.]+$", up["text"]) else False, |
|
True if re.search(r"[,,][^。.]+$", up["text"]) else False, |
|
True if re.search(r"[\((][^\))]+$", up["text"]) |
|
and re.search(r"[\))]", down["text"]) else False, |
|
self._match_proj(down), |
|
True if re.match(r"[A-Z]", down["text"]) else False, |
|
True if re.match(r"[A-Z]", up["text"][-1]) else False, |
|
True if re.match(r"[a-z0-9]", up["text"][-1]) else False, |
|
True if re.match(r"[0-9.%,-]+$", down["text"]) else False, |
|
up["text"].strip()[-2:] == down["text"].strip()[-2:] if len(up["text"].strip() |
|
) > 1 and len( |
|
down["text"].strip()) > 1 else False, |
|
up["x0"] > down["x1"], |
|
abs(self.__height(up) - self.__height(down)) / min(self.__height(up), |
|
self.__height(down)), |
|
self._x_dis(up, down) / max(w, 0.000001), |
|
(len(up["text"]) - len(down["text"])) / |
|
max(len(up["text"]), len(down["text"])), |
|
len(tks_all) - len(tks_up) - len(tks_down), |
|
len(tks_down) - len(tks_up), |
|
tks_down[-1] == tks_up[-1] if tks_down and tks_up else False, |
|
max(down["in_row"], up["in_row"]), |
|
abs(down["in_row"] - up["in_row"]), |
|
len(tks_down) == 1 and rag_tokenizer.tag(tks_down[0]).find("n") >= 0, |
|
len(tks_up) == 1 and rag_tokenizer.tag(tks_up[0]).find("n") >= 0 |
|
] |
|
return fea |
|
|
|
@staticmethod |
|
def sort_X_by_page(arr, threashold): |
|
|
|
arr = sorted(arr, key=lambda r: (r["page_number"], r["x0"], r["top"])) |
|
for i in range(len(arr) - 1): |
|
for j in range(i, -1, -1): |
|
|
|
if abs(arr[j + 1]["x0"] - arr[j]["x0"]) < threashold \ |
|
and arr[j + 1]["top"] < arr[j]["top"] \ |
|
and arr[j + 1]["page_number"] == arr[j]["page_number"]: |
|
tmp = arr[j] |
|
arr[j] = arr[j + 1] |
|
arr[j + 1] = tmp |
|
return arr |
|
|
|
def _has_color(self, o): |
|
if o.get("ncs", "") == "DeviceGray": |
|
if o["stroking_color"] and o["stroking_color"][0] == 1 and o["non_stroking_color"] and \ |
|
o["non_stroking_color"][0] == 1: |
|
if re.match(r"[a-zT_\[\]\(\)-]+", o.get("text", "")): |
|
return False |
|
return True |
|
|
|
def _table_transformer_job(self, ZM): |
|
logging.debug("Table processing...") |
|
imgs, pos = [], [] |
|
tbcnt = [0] |
|
MARGIN = 10 |
|
self.tb_cpns = [] |
|
assert len(self.page_layout) == len(self.page_images) |
|
for p, tbls in enumerate(self.page_layout): |
|
tbls = [f for f in tbls if f["type"] == "table"] |
|
tbcnt.append(len(tbls)) |
|
if not tbls: |
|
continue |
|
for tb in tbls: |
|
left, top, right, bott = tb["x0"] - MARGIN, tb["top"] - MARGIN, \ |
|
tb["x1"] + MARGIN, tb["bottom"] + MARGIN |
|
left *= ZM |
|
top *= ZM |
|
right *= ZM |
|
bott *= ZM |
|
pos.append((left, top)) |
|
imgs.append(self.page_images[p].crop((left, top, right, bott))) |
|
|
|
assert len(self.page_images) == len(tbcnt) - 1 |
|
if not imgs: |
|
return |
|
recos = self.tbl_det(imgs) |
|
tbcnt = np.cumsum(tbcnt) |
|
for i in range(len(tbcnt) - 1): |
|
pg = [] |
|
for j, tb_items in enumerate( |
|
recos[tbcnt[i]: tbcnt[i + 1]]): |
|
poss = pos[tbcnt[i]: tbcnt[i + 1]] |
|
for it in tb_items: |
|
it["x0"] = (it["x0"] + poss[j][0]) |
|
it["x1"] = (it["x1"] + poss[j][0]) |
|
it["top"] = (it["top"] + poss[j][1]) |
|
it["bottom"] = (it["bottom"] + poss[j][1]) |
|
for n in ["x0", "x1", "top", "bottom"]: |
|
it[n] /= ZM |
|
it["top"] += self.page_cum_height[i] |
|
it["bottom"] += self.page_cum_height[i] |
|
it["pn"] = i |
|
it["layoutno"] = j |
|
pg.append(it) |
|
self.tb_cpns.extend(pg) |
|
|
|
def gather(kwd, fzy=10, ption=0.6): |
|
eles = Recognizer.sort_Y_firstly( |
|
[r for r in self.tb_cpns if re.match(kwd, r["label"])], fzy) |
|
eles = Recognizer.layouts_cleanup(self.boxes, eles, 5, ption) |
|
return Recognizer.sort_Y_firstly(eles, 0) |
|
|
|
|
|
headers = gather(r".*header$") |
|
rows = gather(r".* (row|header)") |
|
spans = gather(r".*spanning") |
|
clmns = sorted([r for r in self.tb_cpns if re.match( |
|
r"table column$", r["label"])], key=lambda x: (x["pn"], x["layoutno"], x["x0"])) |
|
clmns = Recognizer.layouts_cleanup(self.boxes, clmns, 5, 0.5) |
|
for b in self.boxes: |
|
if b.get("layout_type", "") != "table": |
|
continue |
|
ii = Recognizer.find_overlapped_with_threashold(b, rows, thr=0.3) |
|
if ii is not None: |
|
b["R"] = ii |
|
b["R_top"] = rows[ii]["top"] |
|
b["R_bott"] = rows[ii]["bottom"] |
|
|
|
ii = Recognizer.find_overlapped_with_threashold( |
|
b, headers, thr=0.3) |
|
if ii is not None: |
|
b["H_top"] = headers[ii]["top"] |
|
b["H_bott"] = headers[ii]["bottom"] |
|
b["H_left"] = headers[ii]["x0"] |
|
b["H_right"] = headers[ii]["x1"] |
|
b["H"] = ii |
|
|
|
ii = Recognizer.find_horizontally_tightest_fit(b, clmns) |
|
if ii is not None: |
|
b["C"] = ii |
|
b["C_left"] = clmns[ii]["x0"] |
|
b["C_right"] = clmns[ii]["x1"] |
|
|
|
ii = Recognizer.find_overlapped_with_threashold(b, spans, thr=0.3) |
|
if ii is not None: |
|
b["H_top"] = spans[ii]["top"] |
|
b["H_bott"] = spans[ii]["bottom"] |
|
b["H_left"] = spans[ii]["x0"] |
|
b["H_right"] = spans[ii]["x1"] |
|
b["SP"] = ii |
|
|
|
def __ocr(self, pagenum, img, chars, ZM=3): |
|
bxs = self.ocr.detect(np.array(img)) |
|
if not bxs: |
|
self.boxes.append([]) |
|
return |
|
bxs = [(line[0], line[1][0]) for line in bxs] |
|
bxs = Recognizer.sort_Y_firstly( |
|
[{"x0": b[0][0] / ZM, "x1": b[1][0] / ZM, |
|
"top": b[0][1] / ZM, "text": "", "txt": t, |
|
"bottom": b[-1][1] / ZM, |
|
"page_number": pagenum} for b, t in bxs if b[0][0] <= b[1][0] and b[0][1] <= b[-1][1]], |
|
self.mean_height[-1] / 3 |
|
) |
|
|
|
|
|
for c in Recognizer.sort_Y_firstly( |
|
chars, self.mean_height[pagenum - 1] // 4): |
|
ii = Recognizer.find_overlapped(c, bxs) |
|
if ii is None: |
|
self.lefted_chars.append(c) |
|
continue |
|
ch = c["bottom"] - c["top"] |
|
bh = bxs[ii]["bottom"] - bxs[ii]["top"] |
|
if abs(ch - bh) / max(ch, bh) >= 0.7 and c["text"] != ' ': |
|
self.lefted_chars.append(c) |
|
continue |
|
if c["text"] == " " and bxs[ii]["text"]: |
|
if re.match(r"[0-9a-zA-Zа-яА-Я,.?;:!%%]", bxs[ii]["text"][-1]): |
|
bxs[ii]["text"] += " " |
|
else: |
|
bxs[ii]["text"] += c["text"] |
|
|
|
for b in bxs: |
|
if not b["text"]: |
|
left, right, top, bott = b["x0"] * ZM, b["x1"] * \ |
|
ZM, b["top"] * ZM, b["bottom"] * ZM |
|
b["text"] = self.ocr.recognize(np.array(img), |
|
np.array([[left, top], [right, top], [right, bott], [left, bott]], |
|
dtype=np.float32)) |
|
del b["txt"] |
|
bxs = [b for b in bxs if b["text"]] |
|
if self.mean_height[-1] == 0: |
|
self.mean_height[-1] = np.median([b["bottom"] - b["top"] |
|
for b in bxs]) |
|
self.boxes.append(bxs) |
|
|
|
def _layouts_rec(self, ZM, drop=True): |
|
assert len(self.page_images) == len(self.boxes) |
|
self.boxes, self.page_layout = self.layouter( |
|
self.page_images, self.boxes, ZM, drop=drop) |
|
|
|
for i in range(len(self.boxes)): |
|
self.boxes[i]["top"] += \ |
|
self.page_cum_height[self.boxes[i]["page_number"] - 1] |
|
self.boxes[i]["bottom"] += \ |
|
self.page_cum_height[self.boxes[i]["page_number"] - 1] |
|
|
|
def _text_merge(self): |
|
|
|
bxs = self.boxes |
|
|
|
def end_with(b, txt): |
|
txt = txt.strip() |
|
tt = b.get("text", "").strip() |
|
return tt and tt.find(txt) == len(tt) - len(txt) |
|
|
|
def start_with(b, txts): |
|
tt = b.get("text", "").strip() |
|
return tt and any([tt.find(t.strip()) == 0 for t in txts]) |
|
|
|
|
|
i = 0 |
|
while i < len(bxs) - 1: |
|
b = bxs[i] |
|
b_ = bxs[i + 1] |
|
if b.get("layoutno", "0") != b_.get("layoutno", "1") or b.get("layout_type", "") in ["table", "figure", |
|
"equation"]: |
|
i += 1 |
|
continue |
|
if abs(self._y_dis(b, b_) |
|
) < self.mean_height[bxs[i]["page_number"] - 1] / 3: |
|
|
|
bxs[i]["x1"] = b_["x1"] |
|
bxs[i]["top"] = (b["top"] + b_["top"]) / 2 |
|
bxs[i]["bottom"] = (b["bottom"] + b_["bottom"]) / 2 |
|
bxs[i]["text"] += b_["text"] |
|
bxs.pop(i + 1) |
|
continue |
|
i += 1 |
|
continue |
|
|
|
dis_thr = 1 |
|
dis = b["x1"] - b_["x0"] |
|
if b.get("layout_type", "") != "text" or b_.get( |
|
"layout_type", "") != "text": |
|
if end_with(b, ",") or start_with(b_, "(,"): |
|
dis_thr = -8 |
|
else: |
|
i += 1 |
|
continue |
|
|
|
if abs(self._y_dis(b, b_)) < self.mean_height[bxs[i]["page_number"] - 1] / 5 \ |
|
and dis >= dis_thr and b["x1"] < b_["x1"]: |
|
|
|
bxs[i]["x1"] = b_["x1"] |
|
bxs[i]["top"] = (b["top"] + b_["top"]) / 2 |
|
bxs[i]["bottom"] = (b["bottom"] + b_["bottom"]) / 2 |
|
bxs[i]["text"] += b_["text"] |
|
bxs.pop(i + 1) |
|
continue |
|
i += 1 |
|
self.boxes = bxs |
|
|
|
def _naive_vertical_merge(self): |
|
bxs = Recognizer.sort_Y_firstly( |
|
self.boxes, np.median( |
|
self.mean_height) / 3) |
|
i = 0 |
|
while i + 1 < len(bxs): |
|
b = bxs[i] |
|
b_ = bxs[i + 1] |
|
if b["page_number"] < b_["page_number"] and re.match( |
|
r"[0-9 •一—-]+$", b["text"]): |
|
bxs.pop(i) |
|
continue |
|
if not b["text"].strip(): |
|
bxs.pop(i) |
|
continue |
|
concatting_feats = [ |
|
b["text"].strip()[-1] in ",;:'\",、‘“;:-", |
|
len(b["text"].strip()) > 1 and b["text"].strip( |
|
)[-2] in ",;:'\",‘“、;:", |
|
b_["text"].strip() and b_["text"].strip()[0] in "。;?!?”)),,、:", |
|
] |
|
|
|
feats = [ |
|
b.get("layoutno", 0) != b_.get("layoutno", 0), |
|
b["text"].strip()[-1] in "。?!?", |
|
self.is_english and b["text"].strip()[-1] in ".!?", |
|
b["page_number"] == b_["page_number"] and b_["top"] - |
|
b["bottom"] > self.mean_height[b["page_number"] - 1] * 1.5, |
|
b["page_number"] < b_["page_number"] and abs( |
|
b["x0"] - b_["x0"]) > self.mean_width[b["page_number"] - 1] * 4, |
|
] |
|
|
|
detach_feats = [b["x1"] < b_["x0"], |
|
b["x0"] > b_["x1"]] |
|
if (any(feats) and not any(concatting_feats)) or any(detach_feats): |
|
logging.debug("{} {} {} {}".format( |
|
b["text"], |
|
b_["text"], |
|
any(feats), |
|
any(concatting_feats), |
|
)) |
|
i += 1 |
|
continue |
|
|
|
b["bottom"] = b_["bottom"] |
|
b["text"] += b_["text"] |
|
b["x0"] = min(b["x0"], b_["x0"]) |
|
b["x1"] = max(b["x1"], b_["x1"]) |
|
bxs.pop(i + 1) |
|
self.boxes = bxs |
|
|
|
def _concat_downward(self, concat_between_pages=True): |
|
|
|
for i in range(len(self.boxes)): |
|
mh = self.mean_height[self.boxes[i]["page_number"] - 1] |
|
self.boxes[i]["in_row"] = 0 |
|
j = max(0, i - 12) |
|
while j < min(i + 12, len(self.boxes)): |
|
if j == i: |
|
j += 1 |
|
continue |
|
ydis = self._y_dis(self.boxes[i], self.boxes[j]) / mh |
|
if abs(ydis) < 1: |
|
self.boxes[i]["in_row"] += 1 |
|
elif ydis > 0: |
|
break |
|
j += 1 |
|
|
|
|
|
boxes = deepcopy(self.boxes) |
|
blocks = [] |
|
while boxes: |
|
chunks = [] |
|
|
|
def dfs(up, dp): |
|
chunks.append(up) |
|
i = dp |
|
while i < min(dp + 12, len(boxes)): |
|
ydis = self._y_dis(up, boxes[i]) |
|
smpg = up["page_number"] == boxes[i]["page_number"] |
|
mh = self.mean_height[up["page_number"] - 1] |
|
mw = self.mean_width[up["page_number"] - 1] |
|
if smpg and ydis > mh * 4: |
|
break |
|
if not smpg and ydis > mh * 16: |
|
break |
|
down = boxes[i] |
|
if not concat_between_pages and down["page_number"] > up["page_number"]: |
|
break |
|
|
|
if up.get("R", "") != down.get( |
|
"R", "") and up["text"][-1] != ",": |
|
i += 1 |
|
continue |
|
|
|
if re.match(r"[0-9]{2,3}/[0-9]{3}$", up["text"]) \ |
|
or re.match(r"[0-9]{2,3}/[0-9]{3}$", down["text"]) \ |
|
or not down["text"].strip(): |
|
i += 1 |
|
continue |
|
|
|
if not down["text"].strip() or not up["text"].strip(): |
|
i += 1 |
|
continue |
|
|
|
if up["x1"] < down["x0"] - 10 * \ |
|
mw or up["x0"] > down["x1"] + 10 * mw: |
|
i += 1 |
|
continue |
|
|
|
if i - dp < 5 and up.get("layout_type") == "text": |
|
if up.get("layoutno", "1") == down.get( |
|
"layoutno", "2"): |
|
dfs(down, i + 1) |
|
boxes.pop(i) |
|
return |
|
i += 1 |
|
continue |
|
|
|
fea = self._updown_concat_features(up, down) |
|
if self.updown_cnt_mdl.predict( |
|
xgb.DMatrix([fea]))[0] <= 0.5: |
|
i += 1 |
|
continue |
|
dfs(down, i + 1) |
|
boxes.pop(i) |
|
return |
|
|
|
dfs(boxes[0], 1) |
|
boxes.pop(0) |
|
if chunks: |
|
blocks.append(chunks) |
|
|
|
|
|
boxes = [] |
|
for b in blocks: |
|
if len(b) == 1: |
|
boxes.append(b[0]) |
|
continue |
|
t = b[0] |
|
for c in b[1:]: |
|
t["text"] = t["text"].strip() |
|
c["text"] = c["text"].strip() |
|
if not c["text"]: |
|
continue |
|
if t["text"] and re.match( |
|
r"[0-9\.a-zA-Z]+$", t["text"][-1] + c["text"][-1]): |
|
t["text"] += " " |
|
t["text"] += c["text"] |
|
t["x0"] = min(t["x0"], c["x0"]) |
|
t["x1"] = max(t["x1"], c["x1"]) |
|
t["page_number"] = min(t["page_number"], c["page_number"]) |
|
t["bottom"] = c["bottom"] |
|
if not t["layout_type"] \ |
|
and c["layout_type"]: |
|
t["layout_type"] = c["layout_type"] |
|
boxes.append(t) |
|
|
|
self.boxes = Recognizer.sort_Y_firstly(boxes, 0) |
|
|
|
def _filter_forpages(self): |
|
if not self.boxes: |
|
return |
|
findit = False |
|
i = 0 |
|
while i < len(self.boxes): |
|
if not re.match(r"(contents|目录|目次|table of contents|致谢|acknowledge)$", |
|
re.sub(r"( | |\u3000)+", "", self.boxes[i]["text"].lower())): |
|
i += 1 |
|
continue |
|
findit = True |
|
eng = re.match( |
|
r"[0-9a-zA-Z :'.-]{5,}", |
|
self.boxes[i]["text"].strip()) |
|
self.boxes.pop(i) |
|
if i >= len(self.boxes): |
|
break |
|
prefix = self.boxes[i]["text"].strip()[:3] if not eng else " ".join( |
|
self.boxes[i]["text"].strip().split()[:2]) |
|
while not prefix: |
|
self.boxes.pop(i) |
|
if i >= len(self.boxes): |
|
break |
|
prefix = self.boxes[i]["text"].strip()[:3] if not eng else " ".join( |
|
self.boxes[i]["text"].strip().split()[:2]) |
|
self.boxes.pop(i) |
|
if i >= len(self.boxes) or not prefix: |
|
break |
|
for j in range(i, min(i + 128, len(self.boxes))): |
|
if not re.match(prefix, self.boxes[j]["text"]): |
|
continue |
|
for k in range(i, j): |
|
self.boxes.pop(i) |
|
break |
|
if findit: |
|
return |
|
|
|
page_dirty = [0] * len(self.page_images) |
|
for b in self.boxes: |
|
if re.search(r"(··|··|··)", b["text"]): |
|
page_dirty[b["page_number"] - 1] += 1 |
|
page_dirty = set([i + 1 for i, t in enumerate(page_dirty) if t > 3]) |
|
if not page_dirty: |
|
return |
|
i = 0 |
|
while i < len(self.boxes): |
|
if self.boxes[i]["page_number"] in page_dirty: |
|
self.boxes.pop(i) |
|
continue |
|
i += 1 |
|
|
|
def _merge_with_same_bullet(self): |
|
i = 0 |
|
while i + 1 < len(self.boxes): |
|
b = self.boxes[i] |
|
b_ = self.boxes[i + 1] |
|
if not b["text"].strip(): |
|
self.boxes.pop(i) |
|
continue |
|
if not b_["text"].strip(): |
|
self.boxes.pop(i + 1) |
|
continue |
|
|
|
if b["text"].strip()[0] != b_["text"].strip()[0] \ |
|
or b["text"].strip()[0].lower() in set("qwertyuopasdfghjklzxcvbnm") \ |
|
or rag_tokenizer.is_chinese(b["text"].strip()[0]) \ |
|
or b["top"] > b_["bottom"]: |
|
i += 1 |
|
continue |
|
b_["text"] = b["text"] + "\n" + b_["text"] |
|
b_["x0"] = min(b["x0"], b_["x0"]) |
|
b_["x1"] = max(b["x1"], b_["x1"]) |
|
b_["top"] = b["top"] |
|
self.boxes.pop(i) |
|
|
|
def _extract_table_figure(self, need_image, ZM, |
|
return_html, need_position): |
|
tables = {} |
|
figures = {} |
|
|
|
i = 0 |
|
lst_lout_no = "" |
|
nomerge_lout_no = [] |
|
while i < len(self.boxes): |
|
if "layoutno" not in self.boxes[i]: |
|
i += 1 |
|
continue |
|
lout_no = str(self.boxes[i]["page_number"]) + \ |
|
"-" + str(self.boxes[i]["layoutno"]) |
|
if TableStructureRecognizer.is_caption(self.boxes[i]) or self.boxes[i]["layout_type"] in ["table caption", |
|
"title", |
|
"figure caption", |
|
"reference"]: |
|
nomerge_lout_no.append(lst_lout_no) |
|
if self.boxes[i]["layout_type"] == "table": |
|
if re.match(r"(数据|资料|图表)*来源[:: ]", self.boxes[i]["text"]): |
|
self.boxes.pop(i) |
|
continue |
|
if lout_no not in tables: |
|
tables[lout_no] = [] |
|
tables[lout_no].append(self.boxes[i]) |
|
self.boxes.pop(i) |
|
lst_lout_no = lout_no |
|
continue |
|
if need_image and self.boxes[i]["layout_type"] == "figure": |
|
if re.match(r"(数据|资料|图表)*来源[:: ]", self.boxes[i]["text"]): |
|
self.boxes.pop(i) |
|
continue |
|
if lout_no not in figures: |
|
figures[lout_no] = [] |
|
figures[lout_no].append(self.boxes[i]) |
|
self.boxes.pop(i) |
|
lst_lout_no = lout_no |
|
continue |
|
i += 1 |
|
|
|
|
|
nomerge_lout_no = set(nomerge_lout_no) |
|
tbls = sorted([(k, bxs) for k, bxs in tables.items()], |
|
key=lambda x: (x[1][0]["top"], x[1][0]["x0"])) |
|
|
|
i = len(tbls) - 1 |
|
while i - 1 >= 0: |
|
k0, bxs0 = tbls[i - 1] |
|
k, bxs = tbls[i] |
|
i -= 1 |
|
if k0 in nomerge_lout_no: |
|
continue |
|
if bxs[0]["page_number"] == bxs0[0]["page_number"]: |
|
continue |
|
if bxs[0]["page_number"] - bxs0[0]["page_number"] > 1: |
|
continue |
|
mh = self.mean_height[bxs[0]["page_number"] - 1] |
|
if self._y_dis(bxs0[-1], bxs[0]) > mh * 23: |
|
continue |
|
tables[k0].extend(tables[k]) |
|
del tables[k] |
|
|
|
def x_overlapped(a, b): |
|
return not any([a["x1"] < b["x0"], a["x0"] > b["x1"]]) |
|
|
|
|
|
i = 0 |
|
while i < len(self.boxes): |
|
c = self.boxes[i] |
|
|
|
if not TableStructureRecognizer.is_caption(c): |
|
i += 1 |
|
continue |
|
|
|
|
|
def nearest(tbls): |
|
nonlocal c |
|
mink = "" |
|
minv = 1000000000 |
|
for k, bxs in tbls.items(): |
|
for b in bxs: |
|
if b.get("layout_type", "").find("caption") >= 0: |
|
continue |
|
y_dis = self._y_dis(c, b) |
|
x_dis = self._x_dis( |
|
c, b) if not x_overlapped( |
|
c, b) else 0 |
|
dis = y_dis * y_dis + x_dis * x_dis |
|
if dis < minv: |
|
mink = k |
|
minv = dis |
|
return mink, minv |
|
|
|
tk, tv = nearest(tables) |
|
fk, fv = nearest(figures) |
|
|
|
|
|
|
|
if tv < fv and tk: |
|
tables[tk].insert(0, c) |
|
logging.debug( |
|
"TABLE:" + |
|
self.boxes[i]["text"] + |
|
"; Cap: " + |
|
tk) |
|
elif fk: |
|
figures[fk].insert(0, c) |
|
logging.debug( |
|
"FIGURE:" + |
|
self.boxes[i]["text"] + |
|
"; Cap: " + |
|
tk) |
|
self.boxes.pop(i) |
|
|
|
res = [] |
|
positions = [] |
|
|
|
def cropout(bxs, ltype, poss): |
|
nonlocal ZM |
|
pn = set([b["page_number"] - 1 for b in bxs]) |
|
if len(pn) < 2: |
|
pn = list(pn)[0] |
|
ht = self.page_cum_height[pn] |
|
b = { |
|
"x0": np.min([b["x0"] for b in bxs]), |
|
"top": np.min([b["top"] for b in bxs]) - ht, |
|
"x1": np.max([b["x1"] for b in bxs]), |
|
"bottom": np.max([b["bottom"] for b in bxs]) - ht |
|
} |
|
louts = [layout for layout in self.page_layout[pn] if layout["type"] == ltype] |
|
ii = Recognizer.find_overlapped(b, louts, naive=True) |
|
if ii is not None: |
|
b = louts[ii] |
|
else: |
|
logging.warning( |
|
f"Missing layout match: {pn + 1},%s" % |
|
(bxs[0].get( |
|
"layoutno", ""))) |
|
|
|
left, top, right, bott = b["x0"], b["top"], b["x1"], b["bottom"] |
|
if right < left: |
|
right = left + 1 |
|
poss.append((pn + self.page_from, left, right, top, bott)) |
|
return self.page_images[pn] \ |
|
.crop((left * ZM, top * ZM, |
|
right * ZM, bott * ZM)) |
|
pn = {} |
|
for b in bxs: |
|
p = b["page_number"] - 1 |
|
if p not in pn: |
|
pn[p] = [] |
|
pn[p].append(b) |
|
pn = sorted(pn.items(), key=lambda x: x[0]) |
|
imgs = [cropout(arr, ltype, poss) for p, arr in pn] |
|
pic = Image.new("RGB", |
|
(int(np.max([i.size[0] for i in imgs])), |
|
int(np.sum([m.size[1] for m in imgs]))), |
|
(245, 245, 245)) |
|
height = 0 |
|
for img in imgs: |
|
pic.paste(img, (0, int(height))) |
|
height += img.size[1] |
|
return pic |
|
|
|
|
|
for k, bxs in figures.items(): |
|
txt = "\n".join([b["text"] for b in bxs]) |
|
if not txt: |
|
continue |
|
|
|
poss = [] |
|
res.append( |
|
(cropout( |
|
bxs, |
|
"figure", poss), |
|
[txt])) |
|
positions.append(poss) |
|
|
|
for k, bxs in tables.items(): |
|
if not bxs: |
|
continue |
|
bxs = Recognizer.sort_Y_firstly(bxs, np.mean( |
|
[(b["bottom"] - b["top"]) / 2 for b in bxs])) |
|
poss = [] |
|
res.append((cropout(bxs, "table", poss), |
|
self.tbl_det.construct_table(bxs, html=return_html, is_english=self.is_english))) |
|
positions.append(poss) |
|
|
|
assert len(positions) == len(res) |
|
|
|
if need_position: |
|
return list(zip(res, positions)) |
|
return res |
|
|
|
def proj_match(self, line): |
|
if len(line) <= 2: |
|
return |
|
if re.match(r"[0-9 ().,%%+/-]+$", line): |
|
return False |
|
for p, j in [ |
|
(r"第[零一二三四五六七八九十百]+章", 1), |
|
(r"第[零一二三四五六七八九十百]+[条节]", 2), |
|
(r"[零一二三四五六七八九十百]+[、 ]", 3), |
|
(r"[\((][零一二三四五六七八九十百]+[)\)]", 4), |
|
(r"[0-9]+(、|\.[ ]|\.[^0-9])", 5), |
|
(r"[0-9]+\.[0-9]+(、|[. ]|[^0-9])", 6), |
|
(r"[0-9]+\.[0-9]+\.[0-9]+(、|[ ]|[^0-9])", 7), |
|
(r"[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+(、|[ ]|[^0-9])", 8), |
|
(r".{,48}[::??]$", 9), |
|
(r"[0-9]+)", 10), |
|
(r"[\((][0-9]+[)\)]", 11), |
|
(r"[零一二三四五六七八九十百]+是", 12), |
|
(r"[⚫•➢✓]", 12) |
|
]: |
|
if re.match(p, line): |
|
return j |
|
return |
|
|
|
def _line_tag(self, bx, ZM): |
|
pn = [bx["page_number"]] |
|
top = bx["top"] - self.page_cum_height[pn[0] - 1] |
|
bott = bx["bottom"] - self.page_cum_height[pn[0] - 1] |
|
page_images_cnt = len(self.page_images) |
|
if pn[-1] - 1 >= page_images_cnt: |
|
return "" |
|
while bott * ZM > self.page_images[pn[-1] - 1].size[1]: |
|
bott -= self.page_images[pn[-1] - 1].size[1] / ZM |
|
pn.append(pn[-1] + 1) |
|
if pn[-1] - 1 >= page_images_cnt: |
|
return "" |
|
|
|
return "@@{}\t{:.1f}\t{:.1f}\t{:.1f}\t{:.1f}##" \ |
|
.format("-".join([str(p) for p in pn]), |
|
bx["x0"], bx["x1"], top, bott) |
|
|
|
def __filterout_scraps(self, boxes, ZM): |
|
|
|
def width(b): |
|
return b["x1"] - b["x0"] |
|
|
|
def height(b): |
|
return b["bottom"] - b["top"] |
|
|
|
def usefull(b): |
|
if b.get("layout_type"): |
|
return True |
|
if width( |
|
b) > self.page_images[b["page_number"] - 1].size[0] / ZM / 3: |
|
return True |
|
if b["bottom"] - b["top"] > self.mean_height[b["page_number"] - 1]: |
|
return True |
|
return False |
|
|
|
res = [] |
|
while boxes: |
|
lines = [] |
|
widths = [] |
|
pw = self.page_images[boxes[0]["page_number"] - 1].size[0] / ZM |
|
mh = self.mean_height[boxes[0]["page_number"] - 1] |
|
mj = self.proj_match( |
|
boxes[0]["text"]) or boxes[0].get( |
|
"layout_type", |
|
"") == "title" |
|
|
|
def dfs(line, st): |
|
nonlocal mh, pw, lines, widths |
|
lines.append(line) |
|
widths.append(width(line)) |
|
mmj = self.proj_match( |
|
line["text"]) or line.get( |
|
"layout_type", |
|
"") == "title" |
|
for i in range(st + 1, min(st + 20, len(boxes))): |
|
if (boxes[i]["page_number"] - line["page_number"]) > 0: |
|
break |
|
if not mmj and self._y_dis( |
|
line, boxes[i]) >= 3 * mh and height(line) < 1.5 * mh: |
|
break |
|
|
|
if not usefull(boxes[i]): |
|
continue |
|
if mmj or \ |
|
(self._x_dis(boxes[i], line) < pw / 10): \ |
|
|
|
|
|
dfs(boxes[i], i) |
|
boxes.pop(i) |
|
break |
|
|
|
try: |
|
if usefull(boxes[0]): |
|
dfs(boxes[0], 0) |
|
else: |
|
logging.debug("WASTE: " + boxes[0]["text"]) |
|
except Exception: |
|
pass |
|
boxes.pop(0) |
|
mw = np.mean(widths) |
|
if mj or mw / pw >= 0.35 or mw > 200: |
|
res.append( |
|
"\n".join([c["text"] + self._line_tag(c, ZM) for c in lines])) |
|
else: |
|
logging.debug("REMOVED: " + |
|
"<<".join([c["text"] for c in lines])) |
|
|
|
return "\n\n".join(res) |
|
|
|
@staticmethod |
|
def total_page_number(fnm, binary=None): |
|
try: |
|
pdf = pdfplumber.open( |
|
fnm) if not binary else pdfplumber.open(BytesIO(binary)) |
|
return len(pdf.pages) |
|
except Exception: |
|
logging.exception("total_page_number") |
|
|
|
def __images__(self, fnm, zoomin=3, page_from=0, |
|
page_to=299, callback=None): |
|
self.lefted_chars = [] |
|
self.mean_height = [] |
|
self.mean_width = [] |
|
self.boxes = [] |
|
self.garbages = {} |
|
self.page_cum_height = [0] |
|
self.page_layout = [] |
|
self.page_from = page_from |
|
try: |
|
self.pdf = pdfplumber.open(fnm) if isinstance( |
|
fnm, str) else pdfplumber.open(BytesIO(fnm)) |
|
self.page_images = [p.to_image(resolution=72 * zoomin).annotated for i, p in |
|
enumerate(self.pdf.pages[page_from:page_to])] |
|
self.page_images_x2 = [p.to_image(resolution=72 * zoomin * 2).annotated for i, p in |
|
enumerate(self.pdf.pages[page_from:page_to])] |
|
try: |
|
self.page_chars = [[{**c, 'top': c['top'], 'bottom': c['bottom']} for c in page.dedupe_chars().chars if self._has_color(c)] for page in self.pdf.pages[page_from:page_to]] |
|
except Exception as e: |
|
logging.warning(f"Failed to extract characters for pages {page_from}-{page_to}: {str(e)}") |
|
self.page_chars = [[] for _ in range(page_to - page_from)] |
|
|
|
self.total_page = len(self.pdf.pages) |
|
except Exception: |
|
logging.exception("RAGFlowPdfParser __images__") |
|
|
|
self.outlines = [] |
|
try: |
|
self.pdf = pdf2_read(fnm if isinstance(fnm, str) else BytesIO(fnm)) |
|
outlines = self.pdf.outline |
|
|
|
def dfs(arr, depth): |
|
for a in arr: |
|
if isinstance(a, dict): |
|
self.outlines.append((a["/Title"], depth)) |
|
continue |
|
dfs(a, depth + 1) |
|
|
|
dfs(outlines, 0) |
|
except Exception as e: |
|
logging.warning(f"Outlines exception: {e}") |
|
if not self.outlines: |
|
logging.warning("Miss outlines") |
|
|
|
logging.debug("Images converted.") |
|
self.is_english = [re.search(r"[a-zA-Z0-9,/¸;:'\[\]\(\)!@#$%^&*\"?<>._-]{30,}", "".join( |
|
random.choices([c["text"] for c in self.page_chars[i]], k=min(100, len(self.page_chars[i]))))) for i in |
|
range(len(self.page_chars))] |
|
if sum([1 if e else 0 for e in self.is_english]) > len( |
|
self.page_images) / 2: |
|
self.is_english = True |
|
else: |
|
self.is_english = False |
|
|
|
|
|
for i, img in enumerate(self.page_images_x2): |
|
chars = self.page_chars[i] if not self.is_english else [] |
|
self.mean_height.append( |
|
np.median(sorted([c["height"] for c in chars])) if chars else 0 |
|
) |
|
self.mean_width.append( |
|
np.median(sorted([c["width"] for c in chars])) if chars else 8 |
|
) |
|
self.page_cum_height.append(img.size[1] / zoomin/2) |
|
j = 0 |
|
while j + 1 < len(chars): |
|
if chars[j]["text"] and chars[j + 1]["text"] \ |
|
and re.match(r"[0-9a-zA-Z,.:;!%]+", chars[j]["text"] + chars[j + 1]["text"]) \ |
|
and chars[j + 1]["x0"] - chars[j]["x1"] >= min(chars[j + 1]["width"], |
|
chars[j]["width"]) / 2: |
|
chars[j]["text"] += " " |
|
j += 1 |
|
|
|
self.__ocr(i + 1, img, chars, zoomin*2) |
|
if callback and i % 6 == 5: |
|
callback(prog=(i + 1) * 0.6 / len(self.page_images), msg="") |
|
|
|
|
|
if not self.is_english and not any( |
|
[c for c in self.page_chars]) and self.boxes: |
|
bxes = [b for bxs in self.boxes for b in bxs] |
|
self.is_english = re.search(r"[\na-zA-Z0-9,/¸;:'\[\]\(\)!@#$%^&*\"?<>._-]{30,}", |
|
"".join([b["text"] for b in random.choices(bxes, k=min(30, len(bxes)))])) |
|
|
|
logging.debug("Is it English:", self.is_english) |
|
|
|
self.page_cum_height = np.cumsum(self.page_cum_height) |
|
assert len(self.page_cum_height) == len(self.page_images) + 1 |
|
if len(self.boxes) == 0 and zoomin < 9: |
|
self.__images__(fnm, zoomin * 3, page_from, page_to, callback) |
|
|
|
def __call__(self, fnm, need_image=True, zoomin=3, return_html=False): |
|
self.__images__(fnm, zoomin) |
|
self._layouts_rec(zoomin) |
|
self._table_transformer_job(zoomin) |
|
self._text_merge() |
|
self._concat_downward() |
|
self._filter_forpages() |
|
tbls = self._extract_table_figure( |
|
need_image, zoomin, return_html, False) |
|
return self.__filterout_scraps(deepcopy(self.boxes), zoomin), tbls |
|
|
|
def remove_tag(self, txt): |
|
return re.sub(r"@@[\t0-9.-]+?##", "", txt) |
|
|
|
def crop(self, text, ZM=3, need_position=False): |
|
imgs = [] |
|
poss = [] |
|
for tag in re.findall(r"@@[0-9-]+\t[0-9.\t]+##", text): |
|
pn, left, right, top, bottom = tag.strip( |
|
"#").strip("@").split("\t") |
|
left, right, top, bottom = float(left), float( |
|
right), float(top), float(bottom) |
|
poss.append(([int(p) - 1 for p in pn.split("-")], |
|
left, right, top, bottom)) |
|
if not poss: |
|
if need_position: |
|
return None, None |
|
return |
|
|
|
max_width = max( |
|
np.max([right - left for (_, left, right, _, _) in poss]), 6) |
|
GAP = 6 |
|
pos = poss[0] |
|
poss.insert(0, ([pos[0][0]], pos[1], pos[2], max( |
|
0, pos[3] - 120), max(pos[3] - GAP, 0))) |
|
pos = poss[-1] |
|
poss.append(([pos[0][-1]], pos[1], pos[2], min(self.page_images[pos[0][-1]].size[1] / ZM, pos[4] + GAP), |
|
min(self.page_images[pos[0][-1]].size[1] / ZM, pos[4] + 120))) |
|
|
|
positions = [] |
|
for ii, (pns, left, right, top, bottom) in enumerate(poss): |
|
right = left + max_width |
|
bottom *= ZM |
|
for pn in pns[1:]: |
|
bottom += self.page_images[pn - 1].size[1] |
|
imgs.append( |
|
self.page_images[pns[0]].crop((left * ZM, top * ZM, |
|
right * |
|
ZM, min( |
|
bottom, self.page_images[pns[0]].size[1]) |
|
)) |
|
) |
|
if 0 < ii < len(poss) - 1: |
|
positions.append((pns[0] + self.page_from, left, right, top, min( |
|
bottom, self.page_images[pns[0]].size[1]) / ZM)) |
|
bottom -= self.page_images[pns[0]].size[1] |
|
for pn in pns[1:]: |
|
imgs.append( |
|
self.page_images[pn].crop((left * ZM, 0, |
|
right * ZM, |
|
min(bottom, |
|
self.page_images[pn].size[1]) |
|
)) |
|
) |
|
if 0 < ii < len(poss) - 1: |
|
positions.append((pn + self.page_from, left, right, 0, min( |
|
bottom, self.page_images[pn].size[1]) / ZM)) |
|
bottom -= self.page_images[pn].size[1] |
|
|
|
if not imgs: |
|
if need_position: |
|
return None, None |
|
return |
|
height = 0 |
|
for img in imgs: |
|
height += img.size[1] + GAP |
|
height = int(height) |
|
width = int(np.max([i.size[0] for i in imgs])) |
|
pic = Image.new("RGB", |
|
(width, height), |
|
(245, 245, 245)) |
|
height = 0 |
|
for ii, img in enumerate(imgs): |
|
if ii == 0 or ii + 1 == len(imgs): |
|
img = img.convert('RGBA') |
|
overlay = Image.new('RGBA', img.size, (0, 0, 0, 0)) |
|
overlay.putalpha(128) |
|
img = Image.alpha_composite(img, overlay).convert("RGB") |
|
pic.paste(img, (0, int(height))) |
|
height += img.size[1] + GAP |
|
|
|
if need_position: |
|
return pic, positions |
|
return pic |
|
|
|
def get_position(self, bx, ZM): |
|
poss = [] |
|
pn = bx["page_number"] |
|
top = bx["top"] - self.page_cum_height[pn - 1] |
|
bott = bx["bottom"] - self.page_cum_height[pn - 1] |
|
poss.append((pn, bx["x0"], bx["x1"], top, min( |
|
bott, self.page_images[pn - 1].size[1] / ZM))) |
|
while bott * ZM > self.page_images[pn - 1].size[1]: |
|
bott -= self.page_images[pn - 1].size[1] / ZM |
|
top = 0 |
|
pn += 1 |
|
poss.append((pn, bx["x0"], bx["x1"], top, min( |
|
bott, self.page_images[pn - 1].size[1] / ZM))) |
|
return poss |
|
|
|
|
|
class PlainParser(object): |
|
def __call__(self, filename, from_page=0, to_page=100000, **kwargs): |
|
self.outlines = [] |
|
lines = [] |
|
try: |
|
self.pdf = pdf2_read( |
|
filename if isinstance( |
|
filename, str) else BytesIO(filename)) |
|
for page in self.pdf.pages[from_page:to_page]: |
|
lines.extend([t for t in page.extract_text().split("\n")]) |
|
|
|
outlines = self.pdf.outline |
|
|
|
def dfs(arr, depth): |
|
for a in arr: |
|
if isinstance(a, dict): |
|
self.outlines.append((a["/Title"], depth)) |
|
continue |
|
dfs(a, depth + 1) |
|
|
|
dfs(outlines, 0) |
|
except Exception: |
|
logging.exception("Outlines exception") |
|
if not self.outlines: |
|
logging.warning("Miss outlines") |
|
|
|
return [(line, "") for line in lines], [] |
|
|
|
def crop(self, ck, need_position): |
|
raise NotImplementedError |
|
|
|
@staticmethod |
|
def remove_tag(txt): |
|
raise NotImplementedError |
|
|
|
|
|
if __name__ == "__main__": |
|
pass |
|
|