"""Gradio demo for OpenCUA GUI grounding.

The app sends a screenshot plus an instruction to an OpenAI-compatible chat
endpoint, parses the pyautogui action returned by the model, and draws a
circle on the screenshot at the predicted coordinates.
"""

import ast
import base64
import os
import re
import time
import traceback
from io import BytesIO
from typing import List, Optional

import backoff
import gradio as gr
import httpx
from loguru import logger
from PIL import Image, ImageDraw

from img_utils import smart_resize


# Keyword-argument names the model tends to emit incorrectly, per pyautogui
# function, and the keyword (if any) they should be rewritten to.  When
# ``keyword_arg`` is None the value is emitted as a positional argument.
_FUNCTION_CORRECTIONS = {
    "write": {
        "incorrect_args": ["text", "content"],
        "correct_args": [],
        "keyword_arg": "message",
    },
    "press": {
        "incorrect_args": ["key", "button"],
        "correct_args": [],
        "keyword_arg": None,
    },
    "hotkey": {
        "incorrect_args": ["key1", "key2", "keys"],
        "correct_args": [],
        "keyword_arg": None,
    },
}

# Positional parameter order of the pyautogui functions whose coordinates are
# projected by ``project_coordinate_to_absolute_scale``.
_PYAUTOGUI_PARAMETERS = {
    'click': ['x', 'y', 'clicks', 'interval', 'button', 'duration', 'pause'],
    'moveTo': ['x', 'y', 'duration', 'tween', 'pause'],
    'moveRel': ['xOffset', 'yOffset', 'duration', 'tween', 'pause'],
    'dragTo': ['x', 'y', 'duration', 'button', 'mouseDownUp', 'pause'],
    'dragRel': ['xOffset', 'yOffset', 'duration', 'button', 'mouseDownUp', 'pause'],
    'doubleClick': ['x', 'y', 'interval', 'button', 'duration', 'pause'],
}

_COORDINATE_TYPES = ("relative", "relative1000", "absolute", "qwen25")

# pyautogui functions from which ``parse_coordinates`` reads an (x, y) pair.
_COORDINATE_FUNCTIONS = [
    'click', 'moveTo', 'dragTo', 'doubleClick',
    'middleClick', 'rightClick', 'tripleClick',
]

# Number of in-function attempts made by ``call_llm`` and the pause between them.
_LLM_MAX_ATTEMPTS = 30
_LLM_RETRY_SLEEP_SECONDS = 5


def image_to_base64(image):
    """Return *image* (a PIL image) encoded as a base64 PNG string."""
    buffered = BytesIO()
    image.save(buffered, format="PNG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")


def create_grounding_messages(image: Image.Image, instruction: str):
    """Create chat messages for GUI grounding task.

    Args:
        image: Screenshot as a PIL image; it is embedded as a base64 PNG data URL.
        instruction: Natural-language task sent as the user text.

    Returns:
        A two-element message list (system prompt, then user image + text).
    """
    image_b64 = image_to_base64(image)
    system_prompt = (
        "You are a GUI agent. You are given a task and a screenshot of the screen. "
        "You need to perform a series of pyautogui actions to complete the task."
    )
    return [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_b64}"}},
                {"type": "text", "text": instruction},
            ],
        },
    ]


def draw_circle_at_point(image, x, y, outline_color="#FF3366", line_width=5, radius=15):
    """Draw a circle outline centred on (x, y); mutates and returns *image*."""
    draw = ImageDraw.Draw(image)
    draw.ellipse(
        [x - radius, y - radius, x + radius, y + radius],
        outline=outline_color,
        width=line_width,
    )
    return image


def draw_bounding_boxes(image, bounding_boxes, outline_color="#FF3366", line_width=3):
    """Draw a rounded rectangle for each ``(xmin, ymin, xmax, ymax)`` box.

    Mutates and returns *image*.
    """
    draw = ImageDraw.Draw(image)
    radius = 10  # Corner radius
    diameter = 2 * radius
    for xmin, ymin, xmax, ymax in bounding_boxes:
        # Four corner arcs: top-left, top-right, bottom-right, bottom-left.
        draw.arc((xmin, ymin, xmin + diameter, ymin + diameter), 180, 270,
                 fill=outline_color, width=line_width)
        draw.arc((xmax - diameter, ymin, xmax, ymin + diameter), 270, 0,
                 fill=outline_color, width=line_width)
        draw.arc((xmax - diameter, ymax - diameter, xmax, ymax), 0, 90,
                 fill=outline_color, width=line_width)
        draw.arc((xmin, ymax - diameter, xmin + diameter, ymax), 90, 180,
                 fill=outline_color, width=line_width)
        # Four straight edges joining the arcs: top, right, bottom, left.
        draw.line((xmin + radius, ymin, xmax - radius, ymin), fill=outline_color, width=line_width)
        draw.line((xmax, ymin + radius, xmax, ymax - radius), fill=outline_color, width=line_width)
        draw.line((xmin + radius, ymax, xmax - radius, ymax), fill=outline_color, width=line_width)
        draw.line((xmin, ymin + radius, xmin, ymax - radius), fill=outline_color, width=line_width)
    return image


def extract_coordinates(code):
    """Extract coordinates from pyautogui command string.

    Reads ``x=``/``y=`` from the first line; when the code has exactly two
    lines, the second line's ``x=``/``y=`` become the end point.

    Returns:
        ``[x1, y1, x2, y2]`` as floats, or None when nothing could be extracted.
    """
    if not ("x=" in code and "y=" in code):
        return None

    coords = {"x1": None, "y1": None, "x2": None, "y2": None}
    commands = code.split("\n")

    x_match = re.search(r"x=([\d.]+)", commands[0])
    y_match = re.search(r"y=([\d.]+)", commands[0])
    if x_match and y_match:
        coords["x1"] = float(x_match.group(1))
        coords["y1"] = float(y_match.group(1))
        # A single command starts and ends at the same point.
        coords["x2"] = coords["x1"]
        coords["y2"] = coords["y1"]

    if len(commands) == 2:
        x_match = re.search(r"x=([\d.]+)", commands[1])
        y_match = re.search(r"y=([\d.]+)", commands[1])
        if x_match and y_match:
            coords["x2"] = float(x_match.group(1))
            coords["y2"] = float(y_match.group(1))

    if None in coords.values():
        print("Failed to extract coordinates")
        return None

    return [coords["x1"], coords["y1"], coords["x2"], coords["y2"]]


def split_args(args_str: str) -> List[str]:
    """Split a call's argument string on commas that are outside string literals."""
    args = []
    current_arg = ""
    within_string = False
    string_char = ""
    prev_char = ""
    for char in args_str:
        if char in ['"', "'"]:
            if not within_string:
                within_string = True
                string_char = char
            elif prev_char != "\\" and char == string_char:
                # Only an unescaped quote of the opening kind closes the literal.
                within_string = False
        if char == "," and not within_string:
            args.append(current_arg)
            current_arg = ""
        else:
            current_arg += char
        prev_char = char
    if current_arg:
        args.append(current_arg)
    return args


def correct_pyautogui_arguments(code: str) -> str:
    """Rewrite wrongly named keyword arguments in pyautogui calls.

    Each line of *code* that is a ``pyautogui.<func>(...)`` call listed in
    ``_FUNCTION_CORRECTIONS`` has its incorrect keyword names replaced by the
    right keyword, or turned into positional arguments.  Other lines are kept
    (stripped of surrounding whitespace).
    """
    corrected_lines = []
    for line in code.strip().split("\n"):
        line = line.strip()
        match = re.match(r"(pyautogui\.(\w+))\((.*)\)", line)
        if not match or match.group(2) not in _FUNCTION_CORRECTIONS:
            corrected_lines.append(line)
            continue

        full_func_call, func_name, args_str = match.groups()
        func_info = _FUNCTION_CORRECTIONS[func_name]
        corrected_args = []
        for arg in split_args(args_str):
            arg = arg.strip()
            kwarg_match = re.match(r"(\w+)\s*=\s*(.*)", arg)
            if not kwarg_match:
                corrected_args.append(arg)
                continue
            arg_name, arg_value = kwarg_match.groups()
            if arg_name not in func_info["incorrect_args"]:
                corrected_args.append(f"{arg_name}={arg_value}")
            elif func_info["keyword_arg"]:
                corrected_args.append(f"{func_info['keyword_arg']}={arg_value}")
            else:
                corrected_args.append(arg_value)

        corrected_lines.append(f"{full_func_call}({', '.join(corrected_args)})")

    return "\n".join(corrected_lines)


def transform_agnet_action_to_code_block(action):
    """Wrap *action* in a fenced block: ``code`` for computer/browser commands, else ``python``."""
    special_keywords = ["computer.terminate", "computer.wait", "browser.select_option", "browser.clear"]
    if any(keyword in action for keyword in special_keywords):
        return f"```code\n{action}\n```"
    return f"```python\n{action}\n```"


def _coordinate_projection(x, y, screen_width, screen_height, coordinate_type):
    """Project model-space (x, y) onto a screen of the given size.

    Raises:
        ValueError: For an unknown *coordinate_type*, or a zero-sized screen
            with ``relative1000``.
    """
    if coordinate_type == "relative":
        return int(round(x * screen_width)), int(round(y * screen_height))
    if coordinate_type == "absolute":
        return x, y
    if coordinate_type == "qwen25":
        if 0 <= x <= 1 and 0 <= y <= 1:
            # If already normalized, treat like "relative"
            return int(round(x * screen_width)), int(round(y * screen_height))
        # Otherwise x/y are measured against the smart_resize'd dimensions.
        height, width = smart_resize(
            height=screen_height,
            width=screen_width,
            factor=28,
            min_pixels=3136,
            max_pixels=12845056,
        )
        return int(x / width * screen_width), int(y / height * screen_height)
    if coordinate_type == "relative1000":
        if screen_width == 0 or screen_height == 0:
            raise ValueError("Screen width and height must be greater than zero for relative1000 coordinates.")
        x_abs = int(round(x * screen_width / 1000))
        y_abs = int(round(y * screen_height / 1000))
        return x_abs, y_abs
    raise ValueError(f"Unsupported coordinate type: {coordinate_type}")


def _project_argument_pair(args, x_key, y_key, screen_width, screen_height, coordinate_type):
    """Project ``args[x_key]``/``args[y_key]`` in place; return True when updated."""
    if x_key not in args or y_key not in args:
        return False
    try:
        x_rel = float(args[x_key])
        y_rel = float(args[y_key])
        x_abs, y_abs = _coordinate_projection(x_rel, y_rel, screen_width, screen_height, coordinate_type)
    except (TypeError, ValueError) as exc:
        # Best effort: a non-numeric argument leaves this call untouched.
        logger.warning(f"Skipping projection of ({x_key}, {y_key}): {exc}")
        return False
    logger.warning(f"Projecting coordinates: ({x_rel}, {y_rel}) to ({x_abs}, {y_abs}) using {coordinate_type} projection.")
    args[x_key] = x_abs
    args[y_key] = y_abs
    return True


def project_coordinate_to_absolute_scale(pyautogui_code_relative_coordinates, screen_width, screen_height, coordinate_type="qwen25"):
    """
    Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size.

    Every ``pyautogui.<func>(...)`` call is parsed; ``x``/``y`` and
    ``xOffset``/``yOffset`` arguments are projected and the call is rebuilt.
    If any call cannot be parsed, or has a non-literal argument, the input is
    returned unchanged.

    Raises:
        ValueError: If *coordinate_type* is not a supported type.
    """
    if coordinate_type not in _COORDINATE_TYPES:
        raise ValueError(f"Invalid coordinate type: {coordinate_type}. Expected one of ['relative', 'relative1000', 'absolute', 'qwen25'].")

    pattern = r'(pyautogui\.\w+\([^\)]*\))'
    new_code = pyautogui_code_relative_coordinates

    for full_call in re.findall(pattern, pyautogui_code_relative_coordinates):
        func_match = re.match(r'(pyautogui\.\w+)\((.*)\)', full_call, re.DOTALL)
        if not func_match:
            continue
        func_name, args_str = func_match.groups()

        try:
            parsed = ast.parse(f"func({args_str})").body[0].value
        except SyntaxError:
            return pyautogui_code_relative_coordinates
        parsed_args = parsed.args
        parsed_keywords = parsed.keywords

        param_names = _PYAUTOGUI_PARAMETERS.get(func_name.split('.')[-1], [])

        # Positional and keyword arguments are evaluated under one guard so a
        # non-literal value (e.g. a variable name) is handled the same way in
        # either position: the code is returned untouched.
        args = {}
        try:
            for param_name, arg in zip(param_names, parsed_args):
                args[param_name] = ast.literal_eval(arg)
            for kw in parsed_keywords:
                args[kw.arg] = ast.literal_eval(kw.value)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
            logger.error(f"Error parsing arguments: {e}")
            return pyautogui_code_relative_coordinates

        updated_xy = _project_argument_pair(
            args, 'x', 'y', screen_width, screen_height, coordinate_type)
        updated_offset = _project_argument_pair(
            args, 'xOffset', 'yOffset', screen_width, screen_height, coordinate_type)
        if not (updated_xy or updated_offset):
            continue

        # Emit leading parameters positionally until the first missing one ...
        reconstructed_args = []
        for param_name in param_names:
            if param_name not in args:
                break
            # repr() quotes and escapes strings; for numbers it matches str().
            reconstructed_args.append(repr(args[param_name]))

        # ... then the remaining keyword arguments by name.
        used_params = set(param_names[:len(reconstructed_args)])
        for kw in parsed_keywords:
            if kw.arg not in used_params:
                reconstructed_args.append(f"{kw.arg}={args[kw.arg]!r}")

        new_full_call = f"{func_name}({', '.join(reconstructed_args)})"
        new_code = new_code.replace(full_call, new_full_call)

    return new_code


def parse_response_to_cot_and_action(input_string, screen_width, screen_height, coordinate_type="qwen25") -> dict:
    """Parse response including Observation, Thought, Action and code block

    Returns:
        A dict that always has ``code`` (projected pyautogui code, or one of
        ``"WAIT"``, ``"FAIL"``, ``"DONE"``) and may have ``observation``,
        ``thought``, ``action`` and ``original_code``.
    """
    sections = {}
    flags = re.DOTALL | re.MULTILINE

    obs_match = re.search(r'^##\s*Observation\s*:?[\n\r]+(.*?)(?=^##\s*Thought:|^##\s*Action:|^##|\Z)', input_string, flags)
    if obs_match:
        sections['observation'] = obs_match.group(1).strip()

    thought_match = re.search(r'^##\s*Thought\s*:?[\n\r]+(.*?)(?=^##\s*Action:|^##|\Z)', input_string, flags)
    if thought_match:
        sections['thought'] = thought_match.group(1).strip()

    action_match = re.search(r'^##\s*Action\s*:?[\n\r]+(.*?)(?=^##|\Z)', input_string, flags)
    if action_match:
        sections['action'] = action_match.group(1).strip()

    lowered = input_string.lower()
    if "computer.wait" in lowered:
        code_blocks = re.findall(r'```(?:code|python)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE)
        if code_blocks:
            code = code_blocks[-1].strip()
            sections['original_code'] = transform_agnet_action_to_code_block(code)
            sections["code"] = "WAIT"
            return sections
    elif "computer.terminate" in lowered:
        # Look for code blocks that might contain terminate command
        code_blocks = re.findall(r'```(?:code|python)?\s*(.*?)\s*```', input_string, re.DOTALL | re.IGNORECASE)
        if code_blocks:
            last_code = code_blocks[-1].strip().lower()
            if "fail" in last_code:
                sections['code'] = "FAIL"
                return sections
            if "success" in last_code:
                sections['code'] = "DONE"
                return sections

    # Regular action: the last ```python block is the code to run.
    code_blocks = re.findall(r'```(?:python)\s*(.*?)\s*```', input_string, re.DOTALL)
    if code_blocks:
        code = code_blocks[-1].strip()
        sections['original_code'] = transform_agnet_action_to_code_block(code)
        corrected_code = correct_pyautogui_arguments(code)
        sections['code'] = project_coordinate_to_absolute_scale(
            corrected_code,
            screen_width=screen_width,
            screen_height=screen_height,
            coordinate_type=coordinate_type,
        )

    if 'code' not in sections:
        logger.error("Missing required action or code section")
        sections['code'] = "FAIL"

    return sections


def rescale_bounding_boxes(bounding_boxes, original_width, original_height, scaled_width=1000, scaled_height=1000):
    """Scale ``(xmin, ymin, xmax, ymax)`` boxes from the scaled grid to the original size."""
    x_scale = original_width / scaled_width
    y_scale = original_height / scaled_height
    return [
        [xmin * x_scale, ymin * y_scale, xmax * x_scale, ymax * y_scale]
        for xmin, ymin, xmax, ymax in bounding_boxes
    ]


def parse_coordinates(code):
    """
    Parse coordinates from pyautogui code.
    Supports: click, moveTo, dragTo, doubleClick, middleClick, rightClick, tripleClick
    Returns: [x, y] or None if no coordinates found
    """
    if not code or code in ["WAIT", "FAIL", "DONE"]:
        return None

    # The first supported call in the code decides the result.
    pattern = r'pyautogui\.(?:' + '|'.join(_COORDINATE_FUNCTIONS) + r')\s*\(([^)]*)\)'
    func_match = re.search(pattern, code)
    if not func_match:
        return None
    args_str = func_match.group(1)

    # Method 1: Look for x=value, y=value patterns
    x_match = re.search(r'x\s*=\s*([\d.]+)', args_str)
    y_match = re.search(r'y\s*=\s*([\d.]+)', args_str)
    if x_match and y_match:
        try:
            return [float(x_match.group(1)), float(y_match.group(1))]
        except ValueError:
            # e.g. "1.2.3" matched the pattern; fall through to method 2.
            pass

    # Method 2: Look for positional arguments (first two numbers),
    # after removing any keyword arguments.
    args_without_kwargs = re.sub(r'\w+\s*=\s*[^,]+', '', args_str)
    numbers = re.findall(r'([\d.]+)', args_without_kwargs)
    if len(numbers) >= 2:
        try:
            return [float(numbers[0]), float(numbers[1])]
        except ValueError:
            pass

    return None


@backoff.on_exception(
    backoff.constant,
    # here you should add more model exceptions as you want,
    # but you are forbidden to add "Exception", that is, a common type of exception
    # because we want to catch this kind of Exception in the outside to ensure
    # each example won't exceed the time limit
    (httpx.HTTPError,),
    interval=30,
    max_tries=10,
)
def call_llm(payload):
    """Call the LLM API

    Reads ``OPENCUA_URL`` and ``OPENCUA_API_KEY`` from the environment and
    posts *payload* until a completion with ``finish_reason == "stop"`` is
    received.

    Returns:
        The message content, or None when every attempt failed.

    Raises:
        KeyError: If a required environment variable is missing.
        httpx.HTTPError: If transport errors persist after the backoff retries.
    """
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ['OPENCUA_API_KEY']}",
    }

    for _ in range(_LLM_MAX_ATTEMPTS):
        # NOTE(review): verify=False disables TLS certificate verification;
        # confirm this is intended for the target endpoint.
        response = httpx.post(
            os.environ['OPENCUA_URL'],
            headers=headers,
            json=payload,
            timeout=500,
            verify=False,
        )

        if response.status_code != 200:
            logger.error("Failed to call LLM: " + response.text)
            logger.error("Retrying...")
            time.sleep(_LLM_RETRY_SLEEP_SECONDS)
            continue

        try:
            choice = response.json()["choices"][0]
            if choice.get("finish_reason") == "stop":
                # for most of the time, length will not exceed max_tokens
                return choice["message"]["content"]
        except (ValueError, KeyError, IndexError, TypeError, AttributeError) as exc:
            # A malformed body is treated like any other failed attempt.
            logger.error(f"Malformed LLM response: {exc}")

        logger.error("LLM did not finish properly, retrying...")
        time.sleep(_LLM_RETRY_SLEEP_SECONDS)

    logger.error(f"LLM call failed after {_LLM_MAX_ATTEMPTS} attempts")
    return None


def run_inference(image, text_input):
    """Run the model on *image* with *text_input* and annotate the prediction.

    Returns:
        A ``(model_output, coordinates_text, image)`` triple for the three
        Gradio output components.
    """
    if image is None:
        return "Please upload an image", "", None

    if not text_input:
        text_input = "Describe this image in detail"

    # NOTE(review): the resized dimensions are passed below as the screen
    # size, while the circle is drawn on the un-resized image; confirm the
    # two coordinate spaces are meant to coincide.
    resized_height, resized_width = smart_resize(image.height, image.width, max_pixels=12845056)

    messages = create_grounding_messages(image, instruction=text_input)
    output_text = call_llm({
        "model": "opencua",
        "messages": messages,
        "max_tokens": 2000,
        "top_p": 0.9,
        "temperature": 0,
    })

    print(output_text)

    if output_text is None:
        # call_llm exhausted its attempts; report it instead of parsing None.
        return "", "Error: the model did not return a response", image

    try:
        sections = parse_response_to_cot_and_action(
            output_text, resized_width, resized_height, coordinate_type="qwen25")

        # Parse coordinates from the code
        coordinates = parse_coordinates(sections.get('code', ''))
        if coordinates is None:
            # No coordinates found, return original image
            return output_text, "No coordinates found", image

        x, y = coordinates

        # Draw a red circle at the parsed coordinates (on a copy of the input).
        annotated_image = draw_circle_at_point(image.copy(), x, y)
        return output_text, f"x: {x}, y: {y}", annotated_image

    except Exception as e:
        # Get the full traceback information
        tb_str = traceback.format_exc()
        logger.error(f"Error in run_inference: {e}\nTraceback:\n{tb_str}")
        return output_text, f"Error: {str(e)}\n{tb_str}", image


# Load example images
example_images = [
    # "assets/images/example_0.png",
    "assets/images/example_1.jpg",
    "assets/images/example_2.png"
]

example_prompts = [
    # "Select the C9 cell",
    "Close the file explorer",
    "Click on the word 'underserved'"
]

examples = [[Image.open(img), prompt] for img, prompt in zip(example_images, example_prompts)]

css = """
  #output {
    height: 500px;
    overflow: auto;
    border: 1px solid #ccc;
  }
"""

with gr.Blocks(css=css) as demo:
    gr.Markdown(
    """
    # OpenCUA GUI Grounding Demo
    Upload a screenshot and provide a description of an element.
    In the demo, we use the OpenCUA-32B model for demostration.
    """)
    with gr.Row():
        with gr.Column():
            input_img = gr.Image(label="Input Image", type="pil")
            text_input = gr.Textbox(label="Instruction")
            submit_btn = gr.Button(value="Submit")
        with gr.Column():
            model_output_text = gr.Textbox(label="Model Output", lines=5)
            model_output_box = gr.Textbox(label="Coordinates", lines=2)
            annotated_image = gr.Image(label="Annotated Image")

    submit_btn.click(run_inference, [input_img, text_input], [model_output_text, model_output_box, annotated_image])

    # Add examples
    gr.Examples(
        examples=examples,
        inputs=[input_img, text_input],
        outputs=[model_output_text, model_output_box, annotated_image],
        fn=run_inference,
        cache_examples=True,
    )

demo.launch(debug=True)