"""Gradio front end that launches game runs and streams their screenshots.

Each browser session (identified by ``req.session_hash``) owns one directory
under ``./runs`` and at most one ``run_game.py`` child process.  The UI writes
the chosen action into a per-session action file and polls a pickle written
into the session directory to display the current frame.
"""
import base64
import multiprocessing
import os
import pickle
import shutil
import signal
import subprocess
import threading
import time
from datetime import datetime, timedelta
from io import BytesIO
from threading import Timer

import gradio as gr
import numpy as np
from filelock import FileLock
from PIL import Image

# Assuming this is the runner for your game pipeline (replace with actual import if different)
from runner.playground_runner import PlaygroundPipelineRunner

# Mapping from each game to the levels offered in the UI.
Game_to_Levels = {
    "RaceGame": list(range(1, 10)),
    "SuperMario": list(range(0, 10)),
    "FlappyBird": list(range(1, 8)),
    "TempestRun": list(range(0, 5)),
    "PongGame": list(range(0, 4)),
}

# Valid actions for each game.
valid_actions_dict = {
    "RaceGame": ["LEFT", "RIGHT", "UP", "DOWN", "FORWARD", "BACKWARD"],
    "PongGame": ["LEFTUP", "LEFTDOWN", "RIGHTUP", "RIGHTDOWN", "NONE"],
    "FlappyBird": ["UP", "DOWN", "KEEP", "NONE"],
    "SuperMario": ["UP", "LEFT", "RIGHT", "UP+LEFT", "UP+RIGHT", "NONE"],
    "TempestRun": ["JUMP", "LEFT", "RIGHT", "SLIDE", "RISE", "NONE"],
}

# Union of all actions; one button is created per entry and shown/hidden per game.
all_actions = [
    "LEFT", "LEFTUP", "UP+LEFT", "LEFTDOWN",
    "RIGHT", "RIGHTUP", "UP+RIGHT", "RIGHTDOWN",
    "UP", "RISE", "JUMP", "SLIDE", "DOWN",
    "KEEP", "NONE", "FORWARD", "BACKWARD",
]

DEFAULT_GAME = "RaceGame"
RUNS_DIR = os.path.join(".", "runs")
RUN_LOCK_PATH = os.path.join(RUNS_DIR, "run.lock")
TIMESTAMP_FORMAT = '%Y-%m-%d-%H:%M:%S'
MAX_GAME_AGE = timedelta(days=1)
# Upper bound on how long update_image waits for a readable frame.
PICKLE_READ_ATTEMPTS = 10
PICKLE_RETRY_DELAY = 0.1

# session hash -> PID of the running game process
game_pids = {}
# session hash -> start timestamp string (TIMESTAMP_FORMAT)
alive_game_ids = {}
# session hash -> Popen handle, kept so killed children can be reaped
_game_processes = {}


def _run_lock():
    """Return the lock guarding ./runs, creating the directory if needed."""
    os.makedirs(RUNS_DIR, exist_ok=True)
    return FileLock(RUN_LOCK_PATH)


def _remove_game_dir(game_id):
    """Delete one session directory under the run lock (missing dirs are fine)."""
    with _run_lock():
        shutil.rmtree(os.path.join(RUNS_DIR, game_id), ignore_errors=True)


def _kill_game_process(game_id, label="game"):
    """Kill and forget the game process registered for ``game_id``, if any.

    The PID is removed from ``game_pids`` so that a later call can never signal
    an unrelated process that happens to reuse the same PID.
    """
    pid = game_pids.pop(game_id, None)
    process = _game_processes.pop(game_id, None)
    if not pid:
        return
    try:
        os.kill(pid, signal.SIGKILL)
        print(f"Killed {label} process with PID: {pid}")
    except OSError as e:
        print(f"Error killing {label} process: {e}")
    if process is not None:
        # Reap the child so it does not linger as a zombie.
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            pass


def remove_old_game_dirs():
    """Remove run directories that are orphaned or older than MAX_GAME_AGE.

    A directory is orphaned when its name is not a key of ``alive_game_ids``.
    Expired sessions additionally have their game process killed.  Entries of
    ``alive_game_ids`` whose directory no longer exists are dropped.
    """
    os.makedirs(RUNS_DIR, exist_ok=True)
    now = datetime.now()
    run_dirs = [
        name for name in os.listdir(RUNS_DIR)
        if os.path.isdir(os.path.join(RUNS_DIR, name))
    ]
    for game_id in run_dirs:
        started = alive_game_ids.get(game_id)
        if started is None:
            _remove_game_dir(game_id)
        elif now - datetime.strptime(started, TIMESTAMP_FORMAT) > MAX_GAME_AGE:
            # Compare the full timedelta: ``.days > 1`` would only trigger
            # after two whole days.
            _kill_game_process(game_id, "expired game")
            _remove_game_dir(game_id)
            alive_game_ids.pop(game_id, None)

    for game_id in list(alive_game_ids):
        if not os.path.exists(os.path.join(RUNS_DIR, game_id)):
            alive_game_ids.pop(game_id, None)

    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), " - Cleaned up old game directories.")
    print("Current alive game IDs:", alive_game_ids)


def start_game(game, level, state, req: gr.Request):
    """Start a detached ``run_game.py`` process for the current session.

    Any game already running for this session is killed first.  The path of
    the freshly created (empty) action file is stored in
    ``state["action_file"]`` and the child's PID in ``game_pids``.

    Args:
        game: The selected game name.
        level: The selected level for the game.
        state: Per-session dict (``gr.State``) mutated in place.
        req: Gradio request; ``session_hash`` is used as the game id.

    Returns:
        None.
    """
    print(f"Starting {game} at {level}")
    game_id = req.session_hash
    alive_game_ids[game_id] = datetime.now().strftime(TIMESTAMP_FORMAT)

    _kill_game_process(game_id, "previous game")

    output_dir = os.path.join(RUNS_DIR, game_id)
    os.makedirs(output_dir, exist_ok=True)
    action_file = os.path.join(output_dir, f"action_{game_id}.txt")
    with open(action_file, "w") as f:
        f.write("")
    state["action_file"] = action_file

    # Pass an argv list instead of a shell string so UI-supplied values
    # cannot be interpreted by a shell.
    command = [
        "python3", "-u", "run_game.py",
        "--game", str(game),
        "--level", str(level),
        "--action_file", action_file,
        "--game_id", game_id,
    ]
    try:
        with open("run.log", "w") as log_file:
            # start_new_session detaches the child from this process's
            # session, standing in for the former ``nohup ... &``.
            process = subprocess.Popen(
                command,
                stdin=subprocess.DEVNULL,
                stdout=log_file,
                stderr=subprocess.STDOUT,
                start_new_session=True,
            )
    except OSError as e:
        print(f"Failed to start game process: {e}")
        return

    print(f"Game started with PID: {process.pid}")
    game_pids[game_id] = process.pid
    _game_processes[game_id] = process


def write_action(action, state):
    """Overwrite the session's action file with ``action``.

    Does nothing (apart from logging) when no action file is recorded in
    ``state`` or when the recorded file no longer exists.
    """
    action_file = state.get("action_file")
    if not action_file:
        print("Action file not found in state. Cannot write action.")
        return
    if not os.path.exists(action_file):
        print(f"Action file {action_file} does not exist. Skipping write.")
        return
    print(f"Writing action: {action} to {action_file}")
    with open(action_file, "w") as f:
        f.write(action)


def cleanup(req: gr.Request):
    """Tear down a session: kill its game and purge stale run directories."""
    game_id = req.session_hash
    alive_game_ids.pop(game_id, None)
    _kill_game_process(game_id, "game")
    # Always sweep, even when this session never started a game.
    remove_old_game_dirs()


def _load_game_info(pkl):
    """Read the game-state pickle, retrying briefly; return None on failure."""
    lock = FileLock(pkl + ".lock")
    for _ in range(PICKLE_READ_ATTEMPTS):
        try:
            # NOTE: pickle is only acceptable here because the file is expected
            # to be produced by our own game process, never by a client.
            with lock, open(pkl, "rb") as f:
                return pickle.load(f)
        except Exception as e:  # best effort: the file may be missing or mid-write
            print(f"Error reading pickle file: {e}")
            time.sleep(PICKLE_RETRY_DELAY)
    return None


def update_image(req: gr.Request = None):
    """Return the current frame for the session as an RGB numpy array.

    Returns ``None`` when there is no request or when no frame could be read
    within the retry budget, and the ``gameover.jpg`` image when the session
    has no run directory.  Frames are resized to a height of 800 pixels with
    the aspect ratio preserved.
    """
    if req is None:
        return None
    game_id = req.session_hash
    output_dir = os.path.join(RUNS_DIR, game_id)
    pkl = os.path.join(output_dir, f"game_{game_id}.pkl")

    # Hold the run lock so the directory cannot be removed mid-read; the
    # bounded retry keeps the hold time short.
    with _run_lock():
        if not os.path.exists(output_dir):
            with Image.open("gameover.jpg") as game_over:
                return np.array(game_over.convert('RGB'))
        info = _load_game_info(pkl)

    if info is None:
        return None

    image_data = base64.b64decode(info["current_image"])
    image = Image.open(BytesIO(image_data)).convert('RGB')
    image = image.resize((int(image.width * 800 / image.height), 800))
    return np.array(image)


def update_level_choices(game):
    """Return a dropdown update listing the levels of ``game``."""
    return gr.update(choices=Game_to_Levels.get(game, []))


with gr.Blocks(title="Game Control Interface") as demo:
    state = gr.State(value={})

    with gr.Row():
        with gr.Column(scale=2):
            game_dropdown = gr.Dropdown(
                choices=list(Game_to_Levels),
                label="Select Game",
                value=DEFAULT_GAME,
            )
            level_dropdown = gr.Dropdown(
                choices=Game_to_Levels[DEFAULT_GAME],
                label="Select Level",
            )
            start_button = gr.Button("Start")
        with gr.Column(scale=8):
            # The callable is re-evaluated every 0.5 s to refresh the frame.
            screenshot_display = gr.Image(
                update_image,
                label="Game Screen",
                height=800,
                interactive=False,
                every=0.5,
            )

    game_dropdown.change(
        fn=update_level_choices,
        inputs=game_dropdown,
        outputs=level_dropdown,
    )

    start_button.click(
        fn=start_game,
        inputs=[game_dropdown, level_dropdown, state],
        outputs=None,
    )

    with gr.Row():
        # One button per known action; only the default game's are visible at first.
        action_buttons = {
            action: gr.Button(
                action,
                visible=action in valid_actions_dict.get(DEFAULT_GAME, []),
            )
            for action in all_actions
        }

    def update_action_buttons(game):
        """Show only the buttons whose action is valid for ``game``."""
        valid_actions = valid_actions_dict.get(game, [])
        return {
            btn: gr.update(visible=(action in valid_actions))
            for action, btn in action_buttons.items()
        }

    game_dropdown.change(
        fn=update_action_buttons,
        inputs=game_dropdown,
        outputs=list(action_buttons.values()),
    )

    for action, btn in action_buttons.items():
        # Passing the button as an input hands its label to write_action.
        btn.click(
            fn=write_action,
            inputs=[btn, state],
            outputs=None,
        )

    demo.unload(cleanup)

demo.launch()