"""Annotate images with a square marker at the click location recorded in a JSON annotation file."""

import json
import logging
import os
import threading
from concurrent.futures import ThreadPoolExecutor
from logging.handlers import RotatingFileHandler

import cv2
from tqdm import tqdm


def setup_logger(log_file):
    """Create a logger that writes to a size-rotated log file (10 MB per file, 5 backups)."""
    logger = logging.getLogger("ImageProcessingLogger")
    logger.setLevel(logging.DEBUG)

    handler = RotatingFileHandler(log_file, maxBytes=10 * 1024 * 1024, backupCount=5)
    handler.setLevel(logging.DEBUG)

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    return logger


logger = setup_logger("app.log")
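
# Helper globals: a lock-guarded counter so worker threads can report progress
# every 100 images without racing on a shared integer.
_count_lock = threading.Lock()
_processed_count = 0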


def read_json(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    return data


def write_json(file_path, data):
    with open(file_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)


def get_coordinate(data, image_path, logger, out_root=None):
    """Parse the normalized click location, draw a square marker on the image, and save it."""
    global _processed_count
    try:
        image_name = os.path.basename(image_path)
        out_path = os.path.join(out_root, image_name)

        # 'click_loc' is assumed to hold the normalized y and x values in its last
        # two comma-separated fields, with the y field labeled (e.g. "y: 0.42").
        y_str, x_str = data['click_loc'].split(',')[2:]
        y = float(y_str.split(':')[1])
        x = float(x_str.strip())

        image = cv2.imread(image_path)
        if image is None:
            logger.error(f"Failed to read image: {image_path}")
            return
        height, width, _ = image.shape

        # Convert normalized coordinates to pixel coordinates.
        x = int(x * width)
        y = int(y * height)

        # Draw a green square marker of half-width `length` centered on the click point.
        color = (0, 255, 0)
        thickness = 5
        length = 35
        cv2.rectangle(image, (x - length, y - length), (x + length, y + length), color, thickness)

        cv2.imwrite(out_path, image)

        # Report progress every 100 images; the lock keeps the shared counter
        # consistent across worker threads.
        with _count_lock:
            _processed_count += 1
            if _processed_count % 100 == 0:
                logger.info(f"Processed {_processed_count} images successfully.")
    except Exception as e:
        logger.exception(f"Error processing data: {data}. Exception: {e}")


def process_data(data, root_path, logger, out_root):
    image_path = os.path.join(root_path, data['image'])
    get_coordinate(data, image_path, logger, out_root)


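# Pipeline: read the annotation JSON, keep only '#DUAL_POINT#' click actions,
# and draw a marker on each referenced image using a pool of worker threads.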
if __name__ == "__main__":
    json_file = r'/code/Auto-GUI/dataset/mind/general_blip_train_llava_coco.json'
    root_path = r'/code/Auto-GUI/dataset'
    out_root = r'/code/Auto-GUI/dataset/coco_corrdinate'

    data = read_json(json_file)
    data1 = [line for line in data if line['action_type'] == '#DUAL_POINT#'][1:]
    logger.debug(f"Total records to process: {len(data1)}")

    os.makedirs(out_root, exist_ok=True)
    with ThreadPoolExecutor(max_workers=8) as executor:
        list(tqdm(executor.map(lambda d: process_data(d, root_path, logger, out_root), data1), total=len(data1)))

    logger.info("All processing complete.")