import xgboost as xgb |
from io import BytesIO |
import torch |
import re |
import pdfplumber |
import logging |
from PIL import Image |
import numpy as np |
from rag.nlp import huqie |
from collections import Counter |
from copy import deepcopy |
from rag.cv.table_recognize import TableTransformer |
from rag.cv.ppdetection import PPDet |
from huggingface_hub import hf_hub_download |
logging.getLogger("pdfminer").setLevel(logging.WARNING) |
class HuParser: |
def __init__(self): |
from paddleocr import PaddleOCR |
logging.getLogger("ppocr").setLevel(logging.ERROR) |
self.ocr = PaddleOCR(use_angle_cls=False, lang="ch") |
self.layouter = PPDet() |
self.tbl_det = TableTransformer() |
self.updown_cnt_mdl = xgb.Booster() |
if torch.cuda.is_available(): |
self.updown_cnt_mdl.set_param({"device": "cuda"}) |
self.updown_cnt_mdl.load_model(hf_hub_download(repo_id="InfiniFlow/text_concat_xgb_v1.0", |
filename="updown_concat_xgb.model")) |
""" |
If you have trouble downloading HuggingFace models, -_^ this might help!! |
For Linux: |
export HF_ENDPOINT=https://hf-mirror.com |
For Windows: |
Good luck |
^_- |
""" |
def __char_width(self, c): |
return (c["x1"] - c["x0"]) // len(c["text"]) |
def __height(self, c): |
return c["bottom"] - c["top"] |
def _x_dis(self, a, b): |
return min(abs(a["x1"] - b["x0"]), abs(a["x0"] - b["x1"]), |
abs(a["x0"] + a["x1"] - b["x0"] - b["x1"]) / 2) |
def _y_dis( |
self, a, b): |
return ( |
b["top"] + b["bottom"] - a["top"] - a["bottom"]) / 2 |
def _match_proj(self, b): |
proj_patt = [ |
r"第[零一二三四五六七八九十百]+章", |
r"第[零一二三四五六七八九十百]+[条节]", |
r"[零一二三四五六七八九十百]+[、是 ]", |
r"[\((][零一二三四五六七八九十百]+[)\)]", |
r"[\((][0-9]+[)\)]", |
r"[0-9]+(、|\.[ ]|)|\.[^0-9./a-zA-Z_%><-]{4,})", |
r"[0-9]+\.[0-9.]+(、|\.[ ])", |
r"[⚫•➢①② ]", |
] |
return any([re.match(p, b["text"]) for p in proj_patt]) |
def _updown_concat_features(self, up, down): |
w = max(self.__char_width(up), self.__char_width(down)) |
h = max(self.__height(up), self.__height(down)) |
y_dis = self._y_dis(up, down) |
LEN = 6 |
tks_down = huqie.qie(down["text"][:LEN]).split(" ") |
tks_up = huqie.qie(up["text"][-LEN:]).split(" ") |
tks_all = up["text"][-LEN:].strip() \ |
+ (" " if re.match(r"[a-zA-Z0-9]+", |
up["text"][-1] + down["text"][0]) else "") \ |
+ down["text"][:LEN].strip() |
tks_all = huqie.qie(tks_all).split(" ") |
fea = [ |
up.get("R", -1) == down.get("R", -1), |
y_dis / h, |
down["page_number"] - up["page_number"], |
up["layout_type"] == down["layout_type"], |
up["layout_type"] == "text", |
down["layout_type"] == "text", |
up["layout_type"] == "table", |
down["layout_type"] == "table", |
True if re.search( |
r"([。?!;!?;+))]|[a-z]\.)$", |
up["text"]) else False, |
True if re.search(r"[,:‘“、0-9(+-]$", up["text"]) else False, |
True if re.search( |
r"(^.?[/,?;:\],。;:’”?!》】)-])", |
down["text"]) else False, |
True if re.match(r"[\((][^\(\)()]+[)\)]$", up["text"]) else False, |
True if re.search(r"[,,][^。.]+$", up["text"]) else False, |
True if re.search(r"[,,][^。.]+$", up["text"]) else False, |
True if re.search(r"[\((][^\))]+$", up["text"]) |
and re.search(r"[\))]", down["text"]) else False, |
self._match_proj(down), |
True if re.match(r"[A-Z]", down["text"]) else False, |
True if re.match(r"[A-Z]", up["text"][-1]) else False, |
True if re.match(r"[a-z0-9]", up["text"][-1]) else False, |
True if re.match(r"[0-9.%,-]+$", down["text"]) else False, |
up["text"].strip()[-2:] == down["text"].strip()[-2:] if len(up["text"].strip() |
) > 1 and len( |
down["text"].strip()) > 1 else False, |
up["x0"] > down["x1"], |
abs(self.__height(up) - self.__height(down)) / min(self.__height(up), |
self.__height(down)), |
self._x_dis(up, down) / max(w, 0.000001), |
(len(up["text"]) - len(down["text"])) / |
max(len(up["text"]), len(down["text"])), |
len(tks_all) - len(tks_up) - len(tks_down), |
len(tks_down) - len(tks_up), |
tks_down[-1] == tks_up[-1], |
max(down["in_row"], up["in_row"]), |
abs(down["in_row"] - up["in_row"]), |
len(tks_down) == 1 and huqie.tag(tks_down[0]).find("n") >= 0, |
len(tks_up) == 1 and huqie.tag(tks_up[0]).find("n") >= 0 |
] |
return fea |
@staticmethod |
def sort_Y_firstly(arr, threashold): |
arr = sorted(arr, key=lambda r: (r["top"], r["x0"])) |
for i in range(len(arr) - 1): |
for j in range(i, -1, -1): |
if abs(arr[j + 1]["top"] - arr[j]["top"]) < threashold \ |
and arr[j + 1]["x0"] < arr[j]["x0"]: |
tmp = deepcopy(arr[j]) |
arr[j] = deepcopy(arr[j + 1]) |
arr[j + 1] = deepcopy(tmp) |
return arr |
@staticmethod |
def sort_R_firstly(arr, thr=0): |
arr = HuParser.sort_Y_firstly(arr, thr) |
for i in range(len(arr) - 1): |
for j in range(i, -1, -1): |
if "R" not in arr[j] or "R" not in arr[j + 1]: |
continue |
if arr[j + 1]["R"] < arr[j]["R"] \ |
or ( |
arr[j + 1]["R"] == arr[j]["R"] |
and arr[j + 1]["x0"] < arr[j]["x0"] |
): |
tmp = arr[j] |
arr[j] = arr[j + 1] |
arr[j + 1] = tmp |
return arr |
@staticmethod |
def sort_X_firstly(arr, threashold, copy=True): |
arr = sorted(arr, key=lambda r: (r["x0"], r["top"])) |
for i in range(len(arr) - 1): |
for j in range(i, -1, -1): |
if abs(arr[j + 1]["x0"] - arr[j]["x0"]) < threashold \ |
and arr[j + 1]["top"] < arr[j]["top"]: |
tmp = deepcopy(arr[j]) if copy else arr[j] |
arr[j] = deepcopy(arr[j + 1]) if copy else arr[j + 1] |
arr[j + 1] = deepcopy(tmp) if copy else tmp |
return arr |
@staticmethod |
def sort_C_firstly(arr, thr=0): |
arr = HuParser.sort_X_firstly(arr, thr) |
for i in range(len(arr) - 1): |
for j in range(i, -1, -1): |
if "C" not in arr[j] or "C" not in arr[j + 1]: |
continue |
if arr[j + 1]["C"] < arr[j]["C"] \ |
or ( |
arr[j + 1]["C"] == arr[j]["C"] |
and arr[j + 1]["top"] < arr[j]["top"] |
): |
tmp = arr[j] |
arr[j] = arr[j + 1] |
arr[j + 1] = tmp |
return arr |
return sorted(arr, key=lambda r: (r.get("C", r["x0"]), r["top"])) |
def _has_color(self, o): |
if o.get("ncs", "") == "DeviceGray": |
if o["stroking_color"] and o["stroking_color"][0] == 1 and o["non_stroking_color"] and \ |
o["non_stroking_color"][0] == 1: |
if re.match(r"[a-zT_\[\]\(\)-]+", o.get("text", "")): |
return False |
return True |
def __overlapped_area(self, a, b, ratio=True): |
tp, btm, x0, x1 = a["top"], a["bottom"], a["x0"], a["x1"] |
if b["x0"] > x1 or b["x1"] < x0: |
return 0 |
if b["bottom"] < tp or b["top"] > btm: |
return 0 |
x0_ = max(b["x0"], x0) |
x1_ = min(b["x1"], x1) |
assert x0_ <= x1_, "Fuckedup! T:{},B:{},X0:{},X1:{} ==> {}".format( |
tp, btm, x0, x1, b) |
tp_ = max(b["top"], tp) |
btm_ = min(b["bottom"], btm) |
assert tp_ <= btm_, "Fuckedup! T:{},B:{},X0:{},X1:{} => {}".format( |
tp, btm, x0, x1, b) |
ov = (btm_ - tp_) * (x1_ - x0_) if x1 - \ |
x0 != 0 and btm - tp != 0 else 0 |
if ov > 0 and ratio: |
ov /= (x1 - x0) * (btm - tp) |
return ov |
def __find_overlapped_with_threashold(self, box, boxes, thr=0.3): |
if not boxes: |
return |
max_overlaped_i, max_overlaped, _max_overlaped = None, thr, 0 |
s, e = 0, len(boxes) |
for i in range(s, e): |
ov = self.__overlapped_area(box, boxes[i]) |
_ov = self.__overlapped_area(boxes[i], box) |
if (ov, _ov) < (max_overlaped, _max_overlaped): |
continue |
max_overlaped_i = i |
max_overlaped = ov |
_max_overlaped = _ov |
return max_overlaped_i |
def __find_overlapped(self, box, boxes_sorted_by_y, naive=False): |
if not boxes_sorted_by_y: |
return |
bxs = boxes_sorted_by_y |
s, e, ii = 0, len(bxs), 0 |
while s < e and not naive: |
ii = (e + s) // 2 |
pv = bxs[ii] |
if box["bottom"] < pv["top"]: |
e = ii |
continue |
if box["top"] > pv["bottom"]: |
s = ii + 1 |
continue |
break |
while s < ii: |
if box["top"] > bxs[s]["bottom"]: |
s += 1 |
break |
while e - 1 > ii: |
if box["bottom"] < bxs[e - 1]["top"]: |
e -= 1 |
break |
max_overlaped_i, max_overlaped = None, 0 |
for i in range(s, e): |
ov = self.__overlapped_area(bxs[i], box) |
if ov <= max_overlaped: |
continue |
max_overlaped_i = i |
max_overlaped = ov |
return max_overlaped_i |
def _is_garbage(self, b): |
patt = [r"^•+$", r"(版权归©|免责条款|地址[::])", r"\.{3,}", "^[0-9]{1,2} / ?[0-9]{1,2}$", |
r"^[0-9]{1,2} of [0-9]{1,2}$", "^http://[^ ]{12,}", |
"(资料|数据)来源[::]", "[0-9a-z._-]+@[a-z0-9-]+\\.[a-z]{2,3}", |
"\\(cid *: *[0-9]+ *\\)" |
] |
return any([re.search(p, b["text"]) for p in patt]) |
def __layouts_cleanup(self, boxes, layouts, far=2, thr=0.7): |
def notOverlapped(a, b): |
return any([a["x1"] < b["x0"], |
a["x0"] > b["x1"], |
a["bottom"] < b["top"], |
a["top"] > b["bottom"]]) |
i = 0 |
while i + 1 < len(layouts): |
j = i + 1 |
while j < min(i + far, len(layouts)) \ |
and (layouts[i].get("type", "") != layouts[j].get("type", "") |
or notOverlapped(layouts[i], layouts[j])): |
j += 1 |
if j >= min(i + far, len(layouts)): |
i += 1 |
continue |
if self.__overlapped_area(layouts[i], layouts[j]) < thr \ |
and self.__overlapped_area(layouts[j], layouts[i]) < thr: |
i += 1 |
continue |
if layouts[i].get("score") and layouts[j].get("score"): |
if layouts[i]["score"] > layouts[j]["score"]: |
layouts.pop(j) |
else: |
layouts.pop(i) |
continue |
area_i, area_i_1 = 0, 0 |
for b in boxes: |
if not notOverlapped(b, layouts[i]): |
area_i += self.__overlapped_area(b, layouts[i], False) |
if not notOverlapped(b, layouts[j]): |
area_i_1 += self.__overlapped_area(b, layouts[j], False) |
if area_i > area_i_1: |
layouts.pop(j) |
else: |
layouts.pop(i) |
return layouts |
def __table_paddle(self, images): |
tbls = self.tbl_det([img for img in images], threshold=0.5) |
res = [] |
for tbl in tbls: |
lts = [{"label": b["type"], |
"score": b["score"], |
"x0": b["bbox"][0], "x1": b["bbox"][2], |
"top": b["bbox"][1], "bottom": b["bbox"][-1] |
} for b in tbl] |
if not lts: |
continue |
left = [b["x0"] for b in lts if b["label"].find( |
"row") > 0 or b["label"].find("header") > 0] |
right = [b["x1"] for b in lts if b["label"].find( |
"row") > 0 or b["label"].find("header") > 0] |
if not left: |
continue |
left = np.median(left) if len(left) > 4 else np.min(left) |
right = np.median(right) if len(right) > 4 else np.max(right) |
for b in lts: |
if b["label"].find("row") > 0 or b["label"].find("header") > 0: |
if b["x0"] > left: |
b["x0"] = left |
if b["x1"] < right: |
b["x1"] = right |
top = [b["top"] for b in lts if b["label"] == "table column"] |
bottom = [b["bottom"] for b in lts if b["label"] == "table column"] |
if not top: |
res.append(lts) |
continue |
top = np.median(top) if len(top) > 4 else np.min(top) |
bottom = np.median(bottom) if len(bottom) > 4 else np.max(bottom) |
for b in lts: |
if b["label"] == "table column": |
if b["top"] > top: |
b["top"] = top |
if b["bottom"] < bottom: |
b["bottom"] = bottom |
res.append(lts) |
return res |
def __table_transformer_job(self, ZM): |
logging.info("Table processing...") |
imgs, pos = [], [] |
tbcnt = [0] |
MARGIN = 10 |
self.tb_cpns = [] |
assert len(self.page_layout) == len(self.page_images) |
for p, tbls in enumerate(self.page_layout): |
tbls = [f for f in tbls if f["type"] == "table"] |
tbcnt.append(len(tbls)) |
if not tbls: |
continue |
for tb in tbls: |
left, top, right, bott = tb["x0"] - MARGIN, tb["top"] - MARGIN, \ |
tb["x1"] + MARGIN, tb["bottom"] + MARGIN |
left *= ZM |
top *= ZM |
right *= ZM |
bott *= ZM |
pos.append((left, top)) |
imgs.append(self.page_images[p].crop((left, top, right, bott))) |
assert len(self.page_images) == len(tbcnt) - 1 |
if not imgs: |
return |
recos = self.__table_paddle(imgs) |
tbcnt = np.cumsum(tbcnt) |
for i in range(len(tbcnt) - 1): |
pg = [] |
for j, tb_items in enumerate( |
recos[tbcnt[i]: tbcnt[i + 1]]): |
poss = pos[tbcnt[i]: tbcnt[i + 1]] |
for it in tb_items: |
it["x0"] = (it["x0"] + poss[j][0]) |
it["x1"] = (it["x1"] + poss[j][0]) |
it["top"] = (it["top"] + poss[j][1]) |
it["bottom"] = (it["bottom"] + poss[j][1]) |
for n in ["x0", "x1", "top", "bottom"]: |
it[n] /= ZM |
it["top"] += self.page_cum_height[i] |
it["bottom"] += self.page_cum_height[i] |
it["pn"] = i |
it["layoutno"] = j |
pg.append(it) |
self.tb_cpns.extend(pg) |
def __ocr_paddle(self, pagenum, img, chars, ZM=3): |
bxs = self.ocr.ocr(np.array(img), cls=True)[0] |
if not bxs: |
self.boxes.append([]) |
return |
bxs = [(line[0], line[1][0]) for line in bxs] |
bxs = self.sort_Y_firstly( |
[{"x0": b[0][0] / ZM, "x1": b[1][0] / ZM, |
"top": b[0][1] / ZM, "text": "", "txt": t, |
"bottom": b[-1][1] / ZM, |
"page_number": pagenum} for b, t in bxs if b[0][0] <= b[1][0] and b[0][1] <= b[-1][1]], |
self.mean_height[-1] / 3 |
) |
for c in self.sort_X_firstly(chars, self.mean_width[pagenum - 1] // 4): |
ii = self.__find_overlapped(c, bxs) |
if ii is None: |
self.lefted_chars.append(c) |
continue |
ch = c["bottom"] - c["top"] |
bh = bxs[ii]["bottom"] - bxs[ii]["top"] |
if abs(ch - bh) / max(ch, bh) >= 0.7: |
self.lefted_chars.append(c) |
continue |
bxs[ii]["text"] += c["text"] |
for b in bxs: |
if not b["text"]: |
b["text"] = b["txt"] |
del b["txt"] |
if self.mean_height[-1] == 0: |
self.mean_height[-1] = np.median([b["bottom"] - b["top"] |
for b in bxs]) |
self.boxes.append(bxs) |
def __layouts_paddle(self, ZM): |
assert len(self.page_images) == len(self.boxes) |
boxes = [] |
layouts = self.layouter([np.array(img) for img in self.page_images]) |
assert len(self.page_images) == len(layouts) |
for pn, lts in enumerate(layouts): |
bxs = self.boxes[pn] |
lts = [{"type": b["type"], |
"score": float(b["score"]), |
"x0": b["bbox"][0] / ZM, "x1": b["bbox"][2] / ZM, |
"top": b["bbox"][1] / ZM, "bottom": b["bbox"][-1] / ZM, |
"page_number": pn, |
} for b in lts] |
lts = self.sort_Y_firstly(lts, self.mean_height[pn] / 2) |
lts = self.__layouts_cleanup(bxs, lts) |
self.page_layout.append(lts) |
def findLayout(ty): |
nonlocal bxs, lts |
lts_ = [lt for lt in lts if lt["type"] == ty] |
i = 0 |
while i < len(bxs): |
if bxs[i].get("layout_type"): |
i += 1 |
continue |
if self._is_garbage(bxs[i]): |
logging.debug("GARBAGE: " + bxs[i]["text"]) |
bxs.pop(i) |
continue |
ii = self.__find_overlapped_with_threashold(bxs[i], lts_, |
thr=0.4) |
if ii is None: |
bxs[i]["layout_type"] = "" |
i += 1 |
continue |
lts_[ii]["visited"] = True |
if lts_[ii]["type"] in ["footer", "header", "reference"]: |
if lts_[ii]["type"] not in self.garbages: |
self.garbages[lts_[ii]["type"]] = [] |
self.garbages[lts_[ii]["type"]].append(bxs[i]["text"]) |
logging.debug("GARBAGE: " + bxs[i]["text"]) |
bxs.pop(i) |
continue |
bxs[i]["layoutno"] = f"{ty}-{ii}" |
bxs[i]["layout_type"] = lts_[ii]["type"] |
i += 1 |
for lt in ["footer", "header", "reference", "figure caption", |
"table caption", "title", "text", "table", "figure"]: |
findLayout(lt) |
for i, lt in enumerate( |
[lt for lt in lts if lt["type"] == "figure"]): |
if lt.get("visited"): |
continue |
lt = deepcopy(lt) |
del lt["type"] |
lt["text"] = "" |
lt["layout_type"] = "figure" |
lt["layoutno"] = f"figure-{i}" |
bxs.append(lt) |
boxes.extend(bxs) |
self.boxes = boxes |
def __text_merge(self, garbage): |
bxs = self.boxes |
def end_with(b, txt): |
txt = txt.strip() |
tt = b.get("text", "").strip() |
return tt and tt.find(txt) == len(tt) - len(txt) |
def start_with(b, txts): |
tt = b.get("text", "").strip() |
return tt and any([tt.find(t.strip()) == 0 for t in txts]) |
i = 0 |
while i < len(bxs) - 1: |
b = bxs[i] |
b_ = bxs[i + 1] |
if b.get("layoutno", "0") != b_.get("layoutno", "1"): |
i += 1 |
continue |
dis_thr = 1 |
dis = b["x1"] - b_["x0"] |
if b.get("layout_type", "") != "text" or b_.get( |
"layout_type", "") != "text": |
if end_with(b, ",") or start_with(b_, "(,"): |
dis_thr = -8 |
else: |
i += 1 |
continue |
if abs(self._y_dis(b, b_)) < self.mean_height[bxs[i]["page_number"] - 1] / 5 \ |
and dis >= dis_thr and b["x1"] < b_["x1"]: |
bxs[i]["x1"] = b_["x1"] |
bxs[i]["top"] = (b["top"] + b_["top"]) / 2 |
bxs[i]["bottom"] = (b["bottom"] + b_["bottom"]) / 2 |
bxs[i]["text"] += b_["text"] |
bxs.pop(i + 1) |
continue |
i += 1 |
self.boxes = bxs |
for i in range(len(self.boxes)): |
mh = self.mean_height[self.boxes[i]["page_number"] - 1] |
self.boxes[i]["in_row"] = 0 |
j = max(0, i - 12) |
while j < min(i + 12, len(self.boxes)): |
if j == i: |
j += 1 |
continue |
ydis = self._y_dis(self.boxes[i], self.boxes[j]) / mh |
if abs(ydis) < 1: |
self.boxes[i]["in_row"] += 1 |
elif ydis > 0: |
break |
j += 1 |
def gather(kwd, fzy=10, ption=0.6): |
eles = self.sort_Y_firstly( |
[r for r in self.tb_cpns if re.match(kwd, r["label"])], fzy) |
eles = self.__layouts_cleanup(self.boxes, eles, 5, ption) |
return self.sort_Y_firstly(eles, 0) |
headers = gather(r".*header$") |
rows = gather(r".* (row|header)") |
spans = gather(r".*spanning") |
clmns = sorted([r for r in self.tb_cpns if re.match( |
r"table column$", r["label"])], key=lambda x: (x["pn"], x["layoutno"], x["x0"])) |
clmns = self.__layouts_cleanup(self.boxes, clmns, 5, 0.5) |
for b in self.boxes: |
if b.get("layout_type", "") != "table": |
continue |
ii = self.__find_overlapped_with_threashold(b, rows, thr=0.3) |
if ii is not None: |
b["R"] = ii |
b["R_top"] = rows[ii]["top"] |
b["R_bott"] = rows[ii]["bottom"] |
ii = self.__find_overlapped_with_threashold(b, headers, thr=0.3) |
if ii is not None: |
b["H_top"] = headers[ii]["top"] |
b["H_bott"] = headers[ii]["bottom"] |
b["H_left"] = headers[ii]["x0"] |
b["H_right"] = headers[ii]["x1"] |
b["H"] = ii |
ii = self.__find_overlapped_with_threashold(b, clmns, thr=0.3) |
if ii is not None: |
b["C"] = ii |
b["C_left"] = clmns[ii]["x0"] |
b["C_right"] = clmns[ii]["x1"] |
ii = self.__find_overlapped_with_threashold(b, spans, thr=0.3) |
if ii is not None: |
b["H_top"] = spans[ii]["top"] |
b["H_bott"] = spans[ii]["bottom"] |
b["H_left"] = spans[ii]["x0"] |
b["H_right"] = spans[ii]["x1"] |
b["SP"] = ii |
boxes = deepcopy(self.boxes) |
blocks = [] |
while boxes: |
chunks = [] |
def dfs(up, dp): |
if not up["text"].strip() or up["text"].strip() in garbage: |
return |
chunks.append(up) |
i = dp |
while i < min(dp + 12, len(boxes)): |
ydis = self._y_dis(up, boxes[i]) |
smpg = up["page_number"] == boxes[i]["page_number"] |
mh = self.mean_height[up["page_number"] - 1] |
mw = self.mean_width[up["page_number"] - 1] |
if smpg and ydis > mh * 4: |
break |
if not smpg and ydis > mh * 16: |
break |
down = boxes[i] |
if up.get("R", "") != down.get( |
"R", "") and up["text"][-1] != ",": |
i += 1 |
continue |
if re.match(r"[0-9]{2,3}/[0-9]{3}$", up["text"]) \ |
or re.match(r"[0-9]{2,3}/[0-9]{3}$", down["text"]): |
i += 1 |
continue |
if not down["text"].strip() \ |
or down["text"].strip() in garbage: |
i += 1 |
continue |
if up["x1"] < down["x0"] - 10 * \ |
mw or up["x0"] > down["x1"] + 10 * mw: |
i += 1 |
continue |
if i - dp < 5 and up.get("layout_type") == "text": |
if up.get("layoutno", "1") == down.get( |
"layoutno", "2"): |
dfs(down, i + 1) |
boxes.pop(i) |
return |
i += 1 |
continue |
fea = self._updown_concat_features(up, down) |
if self.updown_cnt_mdl.predict( |
xgb.DMatrix([fea]))[0] <= 0.5: |
i += 1 |
continue |
dfs(down, i + 1) |
boxes.pop(i) |
return |
dfs(boxes[0], 1) |
boxes.pop(0) |
if chunks: |
blocks.append(chunks) |
boxes = [] |
for b in blocks: |
if len(b) == 1: |
boxes.append(b[0]) |
continue |
t = b[0] |
for c in b[1:]: |
t["text"] = t["text"].strip() |
c["text"] = c["text"].strip() |
if not c["text"]: |
continue |
if t["text"] and re.match( |
r"[0-9\.a-zA-Z]+$", t["text"][-1] + c["text"][-1]): |
t["text"] += " " |
t["text"] += c["text"] |
t["x0"] = min(t["x0"], c["x0"]) |
t["x1"] = max(t["x1"], c["x1"]) |
t["page_number"] = min(t["page_number"], c["page_number"]) |
t["bottom"] = c["bottom"] |
if not t["layout_type"] \ |
and c["layout_type"]: |
t["layout_type"] = c["layout_type"] |
boxes.append(t) |
self.boxes = self.sort_Y_firstly(boxes, 0) |
def __filter_forpages(self): |
if not self.boxes: |
return |
to = min(7, len(self.page_images) // 5) |
pg_hits = [0 for _ in range(to)] |
def possible(c): |
if c.get("layout_type", "") == "reference": |
return True |
if c["bottom"] - c["top"] >= 2 * \ |
self.mean_height[c["page_number"] - 1]: |
return False |
if c["text"].find("....") >= 0 \ |
or (c["x1"] - c["x0"] > 250 and re.search(r"[0-9]+$", |
c["text"].strip())): |
return True |
return self.is_caption(c) and re.search( |
r"[0-9]+$", c["text"].strip()) |
for c in self.boxes: |
if c["page_number"] >= to: |
break |
if possible(c): |
pg_hits[c["page_number"] - 1] += 1 |
st, ed = -1, -1 |
for i in range(len(self.boxes)): |
c = self.boxes[i] |
if c["page_number"] >= to: |
break |
if pg_hits[c["page_number"] - 1] >= 3 and possible(c): |
if st < 0: |
st = i |
else: |
ed = i |
for _ in range(st, ed + 1): |
self.boxes.pop(st) |
def _blockType(self, b): |
patt = [ |
("^(20|19)[0-9]{2}[年/-][0-9]{1,2}[月/-][0-9]{1,2}日*$", "Dt"), |
(r"^(20|19)[0-9]{2}年$", "Dt"), |
(r"^(20|19)[0-9]{2}[年-][0-9]{1,2}月*$", "Dt"), |
("^[0-9]{1,2}[月-][0-9]{1,2}日*$", "Dt"), |
(r"^第*[一二三四1-4]季度$", "Dt"), |
(r"^(20|19)[0-9]{2}年*[一二三四1-4]季度$", "Dt"), |
(r"^(20|19)[0-9]{2}[ABCDE]$", "Dt"), |
("^[0-9.,+%/ -]+$", "Nu"), |
(r"^[0-9A-Z/\._~-]+$", "Ca"), |
(r"^[A-Z]*[a-z' -]+$", "En"), |
(r"^[0-9.,+-]+[0-9A-Za-z/$¥%<>()()' -]+$", "NE"), |
(r"^.{1}$", "Sg") |
] |
for p, n in patt: |
if re.search(p, b["text"].strip()): |
return n |
tks = [t for t in huqie.qie(b["text"]).split(" ") if len(t) > 1] |
if len(tks) > 3: |
if len(tks) < 12: |
return "Tx" |
else: |
return "Lx" |
if len(tks) == 1 and huqie.tag(tks[0]) == "nr": |
return "Nr" |
return "Ot" |
def __cal_spans(self, boxes, rows, cols, tbl, html=True): |
clft = [np.mean([c.get("C_left", c["x0"]) for c in cln]) |
for cln in cols] |
crgt = [np.mean([c.get("C_right", c["x1"]) for c in cln]) |
for cln in cols] |
rtop = [np.mean([c.get("R_top", c["top"]) for c in row]) |
for row in rows] |
rbtm = [np.mean([c.get("R_btm", c["bottom"]) |
for c in row]) for row in rows] |
for b in boxes: |
if "SP" not in b: |
continue |
b["colspan"] = [b["cn"]] |
b["rowspan"] = [b["rn"]] |
for j in range(0, len(clft)): |
if j == b["cn"]: |
continue |
if clft[j] + (crgt[j] - clft[j]) / 2 < b["H_left"]: |
continue |
if crgt[j] - (crgt[j] - clft[j]) / 2 > b["H_right"]: |
continue |
b["colspan"].append(j) |
for j in range(0, len(rtop)): |
if j == b["rn"]: |
continue |
if rtop[j] + (rbtm[j] - rtop[j]) / 2 < b["H_top"]: |
continue |
if rbtm[j] - (rbtm[j] - rtop[j]) / 2 > b["H_bott"]: |
continue |
b["rowspan"].append(j) |
def join(arr): |
if not arr: |
return "" |
return "".join([t["text"] for t in arr]) |
for i in range(len(tbl)): |
for j, arr in enumerate(tbl[i]): |
if not arr: |
continue |
if all(["rowspan" not in a and "colspan" not in a for a in arr]): |
continue |
rowspan, colspan = [], [] |
for a in arr: |
if isinstance(a.get("rowspan", 0), list): |
rowspan.extend(a["rowspan"]) |
if isinstance(a.get("colspan", 0), list): |
colspan.extend(a["colspan"]) |
rowspan, colspan = set(rowspan), set(colspan) |
if len(rowspan) < 2 and len(colspan) < 2: |
for a in arr: |
if "rowspan" in a: |
del a["rowspan"] |
if "colspan" in a: |
del a["colspan"] |
continue |
rowspan, colspan = sorted(rowspan), sorted(colspan) |
rowspan = list(range(rowspan[0], rowspan[-1] + 1)) |
colspan = list(range(colspan[0], colspan[-1] + 1)) |
assert i in rowspan, rowspan |
assert j in colspan, colspan |
arr = [] |
for r in rowspan: |
for c in colspan: |
arr_txt = join(arr) |
if tbl[r][c] and join(tbl[r][c]) != arr_txt: |
arr.extend(tbl[r][c]) |
tbl[r][c] = None if html else arr |
for a in arr: |
if len(rowspan) > 1: |
a["rowspan"] = len(rowspan) |
elif "rowspan" in a: |
del a["rowspan"] |
if len(colspan) > 1: |
a["colspan"] = len(colspan) |
elif "colspan" in a: |
del a["colspan"] |
tbl[rowspan[0]][colspan[0]] = arr |
return tbl |
def __construct_table(self, boxes, html=False): |
cap = "" |
i = 0 |
while i < len(boxes): |
if self.is_caption(boxes[i]): |
cap += boxes[i]["text"] |
boxes.pop(i) |
i -= 1 |
i += 1 |
if not boxes: |
return [] |
for b in boxes: |
b["btype"] = self._blockType(b) |
max_type = Counter([b["btype"] for b in boxes]).items() |
max_type = max(max_type, key=lambda x: x[1])[0] if max_type else "" |
logging.debug("MAXTYPE: " + max_type) |
rowh = [b["R_bott"] - b["R_top"] for b in boxes if "R" in b] |
rowh = np.min(rowh) if rowh else 0 |
boxes = self.sort_R_firstly(boxes, rowh / 2) |
boxes[0]["rn"] = 0 |
rows = [[boxes[0]]] |
btm = boxes[0]["bottom"] |
for b in boxes[1:]: |
b["rn"] = len(rows) - 1 |
lst_r = rows[-1] |
if lst_r[-1].get("R", "") != b.get("R", "") \ |
or (b["top"] >= btm - 3 and lst_r[-1].get("R", "-1") != b.get("R", "-2") |
): |
btm = b["bottom"] |
b["rn"] += 1 |
rows.append([b]) |
continue |
btm = (btm + b["bottom"]) / 2. |
rows[-1].append(b) |
colwm = [b["C_right"] - b["C_left"] for b in boxes if "C" in b] |
colwm = np.min(colwm) if colwm else 0 |
crosspage = len(set([b["page_number"] for b in boxes])) > 1 |
if crosspage: |
boxes = self.sort_X_firstly(boxes, colwm / 2, False) |
else: |
boxes = self.sort_C_firstly(boxes, colwm / 2) |
boxes[0]["cn"] = 0 |
cols = [[boxes[0]]] |
right = boxes[0]["x1"] |
for b in boxes[1:]: |
b["cn"] = len(cols) - 1 |
lst_c = cols[-1] |
if (int(b.get("C", "1")) - int(lst_c[-1].get("C", "1")) == 1 and b["page_number"] == lst_c[-1][ |
"page_number"]) \ |
or (b["x0"] >= right and lst_c[-1].get("C", "-1") != b.get("C", "-2")): |
right = b["x1"] |
b["cn"] += 1 |
cols.append([b]) |
continue |
right = (right + b["x1"]) / 2. |
cols[-1].append(b) |
tbl = [[[] for _ in range(len(cols))] for _ in range(len(rows))] |
for b in boxes: |
tbl[b["rn"]][b["cn"]].append(b) |
if len(rows) >= 4: |
j = 0 |
while j < len(tbl[0]): |
e, ii = 0, 0 |
for i in range(len(tbl)): |
if tbl[i][j]: |
e += 1 |
ii = i |
if e > 1: |
break |
if e > 1: |
j += 1 |
continue |
f = (j > 0 and tbl[ii][j - 1] and tbl[ii] |
[j - 1][0].get("text")) or j == 0 |
ff = (j + 1 < len(tbl[ii]) and tbl[ii][j + 1] and tbl[ii] |
[j + 1][0].get("text")) or j + 1 >= len(tbl[ii]) |
if f and ff: |
j += 1 |
continue |
bx = tbl[ii][j][0] |
logging.debug("Relocate column single: " + bx["text"]) |
left, right = 100000, 100000 |
if j > 0 and not f: |
for i in range(len(tbl)): |
if tbl[i][j - 1]: |
left = min(left, np.min( |
[bx["x0"] - a["x1"] for a in tbl[i][j - 1]])) |
if j + 1 < len(tbl[0]) and not ff: |
for i in range(len(tbl)): |
if tbl[i][j + 1]: |
right = min(right, np.min( |
[a["x0"] - bx["x1"] for a in tbl[i][j + 1]])) |
assert left < 100000 or right < 100000 |
if left < right: |
for jj in range(j, len(tbl[0])): |
for i in range(len(tbl)): |
for a in tbl[i][jj]: |
a["cn"] -= 1 |
if tbl[ii][j - 1]: |
tbl[ii][j - 1].extend(tbl[ii][j]) |
else: |
tbl[ii][j - 1] = tbl[ii][j] |
for i in range(len(tbl)): |
tbl[i].pop(j) |
else: |
for jj in range(j + 1, len(tbl[0])): |
for i in range(len(tbl)): |
for a in tbl[i][jj]: |
a["cn"] -= 1 |
if tbl[ii][j + 1]: |
tbl[ii][j + 1].extend(tbl[ii][j]) |
else: |
tbl[ii][j + 1] = tbl[ii][j] |
for i in range(len(tbl)): |
tbl[i].pop(j) |
cols.pop(j) |
assert len(cols) == len(tbl[0]), "Column NO. miss matched: %d vs %d" % ( |
len(cols), len(tbl[0])) |
if len(cols) >= 4: |
i = 0 |
while i < len(tbl): |
e, jj = 0, 0 |
for j in range(len(tbl[i])): |
if tbl[i][j]: |
e += 1 |
jj = j |
if e > 1: |
break |
if e > 1: |
i += 1 |
continue |
f = (i > 0 and tbl[i - 1][jj] and tbl[i - 1] |
[jj][0].get("text")) or i == 0 |
ff = (i + 1 < len(tbl) and tbl[i + 1][jj] and tbl[i + 1] |
[jj][0].get("text")) or i + 1 >= len(tbl) |
if f and ff: |
i += 1 |
continue |
bx = tbl[i][jj][0] |
logging.debug("Relocate row single: " + bx["text"]) |
up, down = 100000, 100000 |
if i > 0 and not f: |
for j in range(len(tbl[i - 1])): |
if tbl[i - 1][j]: |
up = min(up, np.min( |
[bx["top"] - a["bottom"] for a in tbl[i - 1][j]])) |
if i + 1 < len(tbl) and not ff: |
for j in range(len(tbl[i + 1])): |
if tbl[i + 1][j]: |
down = min(down, np.min( |
[a["top"] - bx["bottom"] for a in tbl[i + 1][j]])) |
assert up < 100000 or down < 100000 |
if up < down: |
for ii in range(i, len(tbl)): |
for j in range(len(tbl[ii])): |
for a in tbl[ii][j]: |
a["rn"] -= 1 |
if tbl[i - 1][jj]: |
tbl[i - 1][jj].extend(tbl[i][jj]) |
else: |
tbl[i - 1][jj] = tbl[i][jj] |
tbl.pop(i) |
else: |
for ii in range(i + 1, len(tbl)): |
for j in range(len(tbl[ii])): |
for a in tbl[ii][j]: |
a["rn"] -= 1 |
if tbl[i + 1][jj]: |
tbl[i + 1][jj].extend(tbl[i][jj]) |
else: |
tbl[i + 1][jj] = tbl[i][jj] |
tbl.pop(i) |
rows.pop(i) |
hdset = set([]) |
for i in range(len(tbl)): |
cnt, h = 0, 0 |
for j, arr in enumerate(tbl[i]): |
if not arr: |
continue |
cnt += 1 |
if max_type == "Nu" and arr[0]["btype"] == "Nu": |
continue |
if any([a.get("H") for a in arr]) \ |
or (max_type == "Nu" and arr[0]["btype"] != "Nu"): |
h += 1 |
if h / cnt > 0.5: |
hdset.add(i) |
if html: |
return [self.__html_table(cap, hdset, |
self.__cal_spans(boxes, rows, |
cols, tbl, True) |
)] |
return self.__desc_table(cap, hdset, |
self.__cal_spans(boxes, rows, cols, tbl, False)) |
def __html_table(self, cap, hdset, tbl): |
html = "<table>" |
if cap: |
html += f"<caption>{cap}</caption>" |
for i in range(len(tbl)): |
row = "<tr>" |
txts = [] |
for j, arr in enumerate(tbl[i]): |
if arr is None: |
continue |
if not arr: |
row += "<td></td>" if i not in hdset else "<th></th>" |
continue |
txt = "" |
if arr: |
h = min(np.min([c["bottom"] - c["top"] for c in arr]) / 2, |
self.mean_height[arr[0]["page_number"] - 1] / 2) |
txt = "".join([c["text"] |
for c in self.sort_Y_firstly(arr, h)]) |
txts.append(txt) |
sp = "" |
if arr[0].get("colspan"): |
sp = "colspan={}".format(arr[0]["colspan"]) |
if arr[0].get("rowspan"): |
sp += " rowspan={}".format(arr[0]["rowspan"]) |
if i in hdset: |
row += f"<th {sp} >" + txt + "</th>" |
else: |
row += f"<td {sp} >" + txt + "</td>" |
if i in hdset: |
if all([t in hdset for t in txts]): |
continue |
for t in txts: |
hdset.add(t) |
if row != "<tr>": |
row += "</tr>" |
else: |
row = "" |
html += "\n" + row |
html += "\n</table>" |
return html |
def __desc_table(self, cap, hdr_rowno, tbl): |
clmno = len(tbl[0]) |
rowno = len(tbl) |
headers = {} |
hdrset = set() |
lst_hdr = [] |
for r in sorted(list(hdr_rowno)): |
headers[r] = ["" for _ in range(clmno)] |
for i in range(clmno): |
if not tbl[r][i]: |
continue |
txt = "".join([a["text"].strip() for a in tbl[r][i]]) |
headers[r][i] = txt |
hdrset.add(txt) |
if all([not t for t in headers[r]]): |
del headers[r] |
hdr_rowno.remove(r) |
continue |
for j in range(clmno): |
if headers[r][j]: |
continue |
if j >= len(lst_hdr): |
break |
headers[r][j] = lst_hdr[j] |
lst_hdr = headers[r] |
for i in range(rowno): |
if i not in hdr_rowno: |
continue |
for j in range(i + 1, rowno): |
if j not in hdr_rowno: |
break |
for k in range(clmno): |
if not headers[j - 1][k]: |
continue |
if headers[j][k].find(headers[j - 1][k]) >= 0: |
continue |
if len(headers[j][k]) > len(headers[j - 1][k]): |
headers[j][k] += ("的" if headers[j][k] |
else "") + headers[j - 1][k] |
else: |
headers[j][k] = headers[j - 1][k] \ |
+ ("的" if headers[j - 1][k] else "") \ |
+ headers[j][k] |
logging.debug( |
f">>>>>>>>>>>>>>>>>{cap}:SIZE:{rowno}X{clmno} Header: {hdr_rowno}") |
row_txt = [] |
for i in range(rowno): |
if i in hdr_rowno: |
continue |
rtxt = [] |
def append(delimer): |
nonlocal rtxt, row_txt |
rtxt = delimer.join(rtxt) |
if row_txt and len(row_txt[-1]) + len(rtxt) < 64: |
row_txt[-1] += "\n" + rtxt |
else: |
row_txt.append(rtxt) |
r = 0 |
if len(headers.items()): |
_arr = [(i - r, r) for r, _ in headers.items() if r < i] |
if _arr: |
_, r = min(_arr, key=lambda x: x[0]) |
if r not in headers and clmno <= 2: |
for j in range(clmno): |
if not tbl[i][j]: |
continue |
txt = "".join([a["text"].strip() for a in tbl[i][j]]) |
if txt: |
rtxt.append(txt) |
if rtxt: |
append(":") |
continue |
for j in range(clmno): |
if not tbl[i][j]: |
continue |
txt = "".join([a["text"].strip() for a in tbl[i][j]]) |
if not txt: |
continue |
ctt = headers[r][j] if r in headers else "" |
if ctt: |
ctt += ":" |
ctt += txt |
if ctt: |
rtxt.append(ctt) |
if rtxt: |
row_txt.append("; ".join(rtxt)) |
if cap: |
row_txt = [t + f"\t——来自“{cap}”" for t in row_txt] |
return row_txt |
@staticmethod |
def is_caption(bx): |
patt = [ |
r"[图表]+[ 0-9::]{2,}" |
] |
if any([re.match(p, bx["text"].strip()) for p in patt]) \ |
or bx["layout_type"].find("caption") >= 0: |
return True |
return False |
def __extract_table_figure(self, need_image, ZM, return_html): |
tables = {} |
figures = {} |
i = 0 |
lst_lout_no = "" |
nomerge_lout_no = [] |
while i < len(self.boxes): |
if "layoutno" not in self.boxes[i]: |
i += 1 |
continue |
lout_no = str(self.boxes[i]["page_number"]) + \ |
"-" + str(self.boxes[i]["layoutno"]) |
if self.is_caption(self.boxes[i]) or self.boxes[i]["layout_type"] in ["table caption", "title", |
"figure caption", "reference"]: |
nomerge_lout_no.append(lst_lout_no) |
if self.boxes[i]["layout_type"] == "table": |
if re.match(r"(数据|资料|图表)*来源[:: ]", self.boxes[i]["text"]): |
self.boxes.pop(i) |
continue |
if lout_no not in tables: |
tables[lout_no] = [] |
tables[lout_no].append(self.boxes[i]) |
self.boxes.pop(i) |
lst_lout_no = lout_no |
continue |
if need_image and self.boxes[i]["layout_type"] == "figure": |
if re.match(r"(数据|资料|图表)*来源[:: ]", self.boxes[i]["text"]): |
self.boxes.pop(i) |
continue |
if lout_no not in figures: |
figures[lout_no] = [] |
figures[lout_no].append(self.boxes[i]) |
self.boxes.pop(i) |
lst_lout_no = lout_no |
continue |
i += 1 |
nomerge_lout_no = set(nomerge_lout_no) |
tbls = sorted([(k, bxs) for k, bxs in tables.items()], |
key=lambda x: (x[1][0]["top"], x[1][0]["x0"])) |
i = len(tbls) - 1 |
while i - 1 >= 0: |
k0, bxs0 = tbls[i - 1] |
k, bxs = tbls[i] |
i -= 1 |
if k0 in nomerge_lout_no: |
continue |
if bxs[0]["page_number"] == bxs0[0]["page_number"]: |
continue |
if bxs[0]["page_number"] - bxs0[0]["page_number"] > 1: |
continue |
mh = self.mean_height[bxs[0]["page_number"] - 1] |
if self._y_dis(bxs0[-1], bxs[0]) > mh * 23: |
continue |
tables[k0].extend(tables[k]) |
del tables[k] |
def x_overlapped(a, b): |
return not any([a["x1"] < b["x0"], a["x0"] > b["x1"]]) |
i = 0 |
while i < len(self.boxes): |
c = self.boxes[i] |
if not self.is_caption(c): |
i += 1 |
continue |
def nearest(tbls): |
nonlocal c |
mink = "" |
minv = 1000000000 |
for k, bxs in tbls.items(): |
for b in bxs[:10]: |
if b.get("layout_type", "").find("caption") >= 0: |
continue |
y_dis = self._y_dis(c, b) |
x_dis = self._x_dis( |
c, b) if not x_overlapped( |
c, b) else 0 |
dis = y_dis * y_dis + x_dis * x_dis |
if dis < minv: |
mink = k |
minv = dis |
return mink, minv |
tk, tv = nearest(tables) |
fk, fv = nearest(figures) |
if min(tv, fv) > 2000: |
i += 1 |
continue |
if tv < fv: |
tables[tk].insert(0, c) |
logging.debug( |
"TABLE:" + |
self.boxes[i]["text"] + |
"; Cap: " + |
tk) |
else: |
figures[fk].insert(0, c) |
logging.debug( |
"FIGURE:" + |
self.boxes[i]["text"] + |
"; Cap: " + |
tk) |
self.boxes.pop(i) |
res = [] |
def cropout(bxs, ltype): |
nonlocal ZM |
pn = set([b["page_number"] - 1 for b in bxs]) |
if len(pn) < 2: |
pn = list(pn)[0] |
ht = self.page_cum_height[pn] |
b = { |
"x0": np.min([b["x0"] for b in bxs]), |
"top": np.min([b["top"] for b in bxs]) - ht, |
"x1": np.max([b["x1"] for b in bxs]), |
"bottom": np.max([b["bottom"] for b in bxs]) - ht |
} |
louts = [l for l in self.page_layout[pn] if l["type"] == ltype] |
ii = self.__find_overlapped(b, louts, naive=True) |
if ii is not None: |
b = louts[ii] |
else: |
logging.warn( |
f"Missing layout match: {pn + 1},%s" % |
(bxs[0].get( |
"layoutno", ""))) |
left, top, right, bott = b["x0"], b["top"], b["x1"], b["bottom"] |
return self.page_images[pn] \ |
.crop((left * ZM, top * ZM, |
right * ZM, bott * ZM)) |
pn = {} |
for b in bxs: |
p = b["page_number"] - 1 |
if p not in pn: |
pn[p] = [] |
pn[p].append(b) |
pn = sorted(pn.items(), key=lambda x: x[0]) |
imgs = [cropout(arr, ltype) for p, arr in pn] |
pic = Image.new("RGB", |
(int(np.max([i.size[0] for i in imgs])), |
int(np.sum([m.size[1] for m in imgs]))), |
(245, 245, 245)) |
height = 0 |
for img in imgs: |
pic.paste(img, (0, int(height))) |
height += img.size[1] |
return pic |
for k, bxs in figures.items(): |
txt = "\n".join( |
[b["text"] for b in bxs |
if not re.match(r"[0-9a-z.\+%-]", b["text"].strip()) |
and len(b["text"].strip()) >= 4 |
] |
) |
if not txt: |
continue |
res.append( |
(cropout( |
bxs, |
"figure"), |
[txt] if not return_html else [f"<p>{txt}</p>"])) |
for k, bxs in tables.items(): |
if not bxs: |
continue |
res.append((cropout(bxs, "table"), |
self.__construct_table(bxs, html=return_html))) |
return res |
def proj_match(self, line): |
if len(line) <= 2: |
return |
if re.match(r"[0-9 ().,%%+/-]+$", line): |
return False |
for p, j in [ |
(r"第[零一二三四五六七八九十百]+章", 1), |
(r"第[零一二三四五六七八九十百]+[条节]", 2), |
(r"[零一二三四五六七八九十百]+[、 ]", 3), |
(r"[\((][零一二三四五六七八九十百]+[)\)]", 4), |
(r"[0-9]+(、|\.[ ]|\.[^0-9])", 5), |
(r"[0-9]+\.[0-9]+(、|[. ]|[^0-9])", 6), |
(r"[0-9]+\.[0-9]+\.[0-9]+(、|[ ]|[^0-9])", 7), |
(r"[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+(、|[ ]|[^0-9])", 8), |
(r".{,48}[::??]$", 9), |
(r"[0-9]+)", 10), |
(r"[\((][0-9]+[)\)]", 11), |
(r"[零一二三四五六七八九十百]+是", 12), |
(r"[⚫•➢✓]", 12) |
]: |
if re.match(p, line): |
return j |
return |
def __filterout_scraps(self, boxes, ZM): |
def line_tag(bx): |
pn = [bx["page_number"]] |
top = bx["top"] - self.page_cum_height[pn[0] - 1] |
bott = bx["bottom"] - self.page_cum_height[pn[0] - 1] |
while bott * ZM > self.page_images[pn[-1] - 1].size[1]: |
bott -= self.page_images[pn[-1] - 1].size[1] / ZM |
pn.append(pn[-1] + 1) |
return "@@{}\t{:.1f}\t{:.1f}\t{:.1f}\t{:.1f}##" \ |
.format("-".join([str(p) for p in pn]), |
bx["x0"], bx["x1"], top, bott) |
def width(b): |
return b["x1"] - b["x0"] |
def height(b): |
return b["bottom"] - b["top"] |
def usefull(b): |
if b.get("layout_type"): |
return True |
if width( |
b) > self.page_images[b["page_number"] - 1].size[0] / ZM / 3: |
return True |
if b["bottom"] - b["top"] > self.mean_height[b["page_number"] - 1]: |
return True |
return False |
res = [] |
while boxes: |
lines = [] |
widths = [] |
pw = self.page_images[boxes[0]["page_number"] - 1].size[0] / ZM |
mh = self.mean_height[boxes[0]["page_number"] - 1] |
mj = self.proj_match( |
boxes[0]["text"]) or boxes[0].get( |
"layout_type", |
"") == "title" |
def dfs(line, st): |
nonlocal mh, pw, lines, widths |
lines.append(line) |
widths.append(width(line)) |
width_mean = np.mean(widths) |
mmj = self.proj_match( |
line["text"]) or line.get( |
"layout_type", |
"") == "title" |
for i in range(st + 1, min(st + 20, len(boxes))): |
if (boxes[i]["page_number"] - line["page_number"]) > 0: |
break |
if not mmj and self._y_dis( |
line, boxes[i]) >= 3 * mh and height(line) < 1.5 * mh: |
break |
if not usefull(boxes[i]): |
continue |
if mmj or \ |
(self._x_dis(boxes[i], line) < pw / 10): \ |
dfs(boxes[i], i) |
boxes.pop(i) |
break |
try: |
if usefull(boxes[0]): |
dfs(boxes[0], 0) |
else: |
logging.debug("WASTE: " + boxes[0]["text"]) |
except Exception as e: |
pass |
boxes.pop(0) |
mw = np.mean(widths) |
if mj or mw / pw >= 0.35 or mw > 200: |
res.append("\n".join([c["text"] + line_tag(c) for c in lines])) |
else: |
logging.debug("REMOVED: " + |
"<<".join([c["text"] for c in lines])) |
return "\n\n".join(res) |
def __call__(self, fnm, need_image=True, zoomin=3, return_html=False): |
self.pdf = pdfplumber.open(fnm) if isinstance( |
fnm, str) else pdfplumber.open(BytesIO(fnm)) |
self.lefted_chars = [] |
self.mean_height = [] |
self.mean_width = [] |
self.boxes = [] |
self.garbages = {} |
self.page_cum_height = [0] |
self.page_layout = [] |
self.page_images = [p.to_image( |
resolution=72 * zoomin).annotated for i, p in enumerate(self.pdf.pages[:299])] |
logging.info("Images converted.") |
logging.info("Table processed.") |
for i, img in enumerate(self.page_images): |
chars = [c for c in self.pdf.pages[i].chars if self._has_color(c)] |
self.mean_height.append( |
np.median(sorted([c["height"] for c in chars])) if chars else 0 |
) |
self.mean_width.append( |
np.median(sorted([c["width"] for c in chars])) if chars else 8 |
) |
if i > 0: |
if not chars: |
self.page_cum_height.append(img.size[1] / zoomin) |
else: |
self.page_cum_height.append( |
np.max([c["bottom"] for c in chars])) |
self.__ocr_paddle(i + 1, img, chars, zoomin) |
self.__layouts_paddle(zoomin) |
self.page_cum_height = np.cumsum(self.page_cum_height) |
assert len(self.page_cum_height) == len(self.page_images) |
garbage = set() |
for k in self.garbages.keys(): |
self.garbages[k] = Counter(self.garbages[k]) |
for g, c in self.garbages[k].items(): |
if c > 1: |
garbage.add(g) |
logging.debug("GARBAGE:" + ",".join(garbage)) |
self.boxes = [b for b in self.boxes if b["text"] not in garbage] |
for i in range(len(self.boxes)): |
self.boxes[i]["top"] += \ |
self.page_cum_height[self.boxes[i]["page_number"] - 1] |
self.boxes[i]["bottom"] += \ |
self.page_cum_height[self.boxes[i]["page_number"] - 1] |
self.__table_transformer_job(zoomin) |
self.__text_merge(garbage) |
self.__filter_forpages() |
tbls = self.__extract_table_figure(need_image, zoomin, return_html) |
return self.__filterout_scraps(deepcopy(self.boxes), zoomin), tbls |
def remove_tag(self, txt): |
return re.sub(r"@@[\t0-9.-]+?##", "", txt) |
def crop(self, text, ZM=3): |
imgs = [] |
for tag in re.findall(r"@@[0-9-]+\t[0-9.\t]+##", text): |
pn, left, right, top, bottom = tag.strip( |
"#").strip("@").split("\t") |
left, right, top, bottom = float(left), float( |
right), float(top), float(bottom) |
bottom *= ZM |
pns = [int(p) - 1 for p in pn.split("-")] |
for pn in pns[1:]: |
bottom += self.page_images[pn - 1].size[1] |
imgs.append( |
self.page_images[pns[0]].crop((left * ZM, top * ZM, |
right * |
ZM, min( |
bottom, self.page_images[pns[0]].size[1]) |
)) |
) |
bottom -= self.page_images[pns[0]].size[1] |
for pn in pns[1:]: |
imgs.append( |
self.page_images[pn].crop((left * ZM, 0, |
right * ZM, |
min(bottom, |
self.page_images[pn].size[1]) |
)) |
) |
bottom -= self.page_images[pn].size[1] |
if not imgs: |
return |
GAP = 2 |
height = 0 |
for img in imgs: |
height += img.size[1] + GAP |
height = int(height) |
pic = Image.new("RGB", |
(int(np.max([i.size[0] for i in imgs])), height), |
(245, 245, 245)) |
height = 0 |
for img in imgs: |
pic.paste(img, (0, int(height))) |
height += img.size[1] + GAP |
return pic |
if __name__ == "__main__": |
pass |