Spaces:
Running
Running
import pickle | |
import subprocess | |
import threading | |
from filelock import FileLock | |
import gradio as gr | |
import multiprocessing | |
import time | |
import os | |
from PIL import Image | |
import base64 | |
from io import BytesIO | |
import numpy as np | |
from datetime import datetime | |
from threading import Timer | |
# Assuming this is the runner for your game pipeline (replace with actual import if different) | |
from runner.playground_runner import PlaygroundPipelineRunner | |
# 游戏与关卡的映射字典 | |
Game_to_Levels = { | |
"RaceGame": list(range(1, 10)), | |
"SuperMario": list(range(0, 10)), | |
"FlappyBird": list(range(1, 8)), | |
"TempestRun": list(range(0, 5)), | |
"PongGame": list(range(0, 4)) | |
} | |
# 每个游戏的有效动作字典 | |
valid_actions_dict = { | |
"RaceGame": ["LEFT", "RIGHT", "UP", "DOWN", "FORWARD", "BACKWARD"], | |
"PongGame": ["LEFTUP", "LEFTDOWN", "RIGHTUP", "RIGHTDOWN", "NONE"], | |
"FlappyBird": ["UP", "DOWN", "KEEP", "NONE"], | |
"SuperMario": ["UP", "LEFT", "RIGHT", "UP+LEFT", "UP+RIGHT", "NONE"], | |
"TempestRun": ["JUMP", "LEFT", "RIGHT", "SLIDE", "RISE", "NONE"] | |
} | |
all_actions = [ | |
"LEFT", "LEFTUP", "UP+LEFT", "LEFTDOWN", "RIGHT", "RIGHTUP", "UP+RIGHT", "RIGHTDOWN", | |
"UP", "RISE", "JUMP", "SLIDE", "DOWN", "KEEP", "NONE", "FORWARD", "BACKWARD" | |
] | |
game_pids = {} | |
alive_game_ids = {} | |
def remove_old_game_dirs(): | |
output_dirs = [d for d in os.listdir("./runs") if os.path.isdir(os.path.join("./runs", d))] | |
for game_id in output_dirs: | |
if game_id not in alive_game_ids: | |
run_lock = FileLock(os.path.join(".", "runs", "run.lock")) | |
with run_lock: | |
os.system(f"rm -rf {os.path.join('.', 'runs', game_id)}") | |
elif (datetime.now() - datetime.strptime(alive_game_ids[game_id], '%Y-%m-%d-%H:%M:%S')).days > 1: | |
# If the game directory is older than 1 day, remove it | |
run_lock = FileLock(os.path.join(".", "runs", "run.lock")) | |
with run_lock: | |
os.system(f"rm -rf {os.path.join('.', 'runs', game_id)}") | |
alive_game_ids.pop(game_id) | |
for game_id in list(alive_game_ids): | |
if not os.path.exists(os.path.join(".", "runs", game_id)): | |
alive_game_ids.pop(game_id) | |
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), " - Cleaned up old game directories.") | |
print("Current alive game IDs:", alive_game_ids) | |
def start_game(game, level, state, req: gr.Request): | |
""" | |
Start the game process and yield screenshots for real-time display in Gradio. | |
Args: | |
game (str): The selected game name. | |
level (str): The selected level for the game. | |
Yields: | |
PIL.Image: Game screenshots for display. | |
""" | |
print(f"Starting {game} at {level}") | |
# game_id = time.strftime('%Y-%m-%d-%H:%M:%S',time.localtime(time.time())) | |
game_id = req.session_hash | |
alive_game_ids[game_id] = datetime.now().strftime('%Y-%m-%d-%H:%M:%S') | |
# print("start game: game_id: ", game_id) | |
if game_pids.get(game_id): | |
# kill the previous game process if it exists | |
try: | |
os.kill(game_pids[game_id], 9) | |
print(f"Killed previous game process with PID: {game_pids[game_id]}") | |
except Exception as e: | |
print(f"Error killing previous game process: {e}") | |
output_dir = os.path.join(".", "runs", game_id) | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
action_file = os.path.join(output_dir, f"action_{game_id}.txt") | |
with open(action_file, "w") as f: | |
f.write("") | |
state["action_file"] = action_file | |
command = f"nohup python3 -u run_game.py --game {game} --level {level} --action_file {action_file} --game_id {game_id} > run.log 2>&1 & echo $!" | |
with os.popen(command) as f: | |
# 读取命令的输出,这应该就是 PID | |
pid_str = f.read().strip() | |
try: | |
pid = int(pid_str) | |
print(f"Game started with PID: {pid}") | |
game_pids[game_id] = pid | |
except ValueError: | |
print(f"Failed to parse PID from command output: {pid_str}") | |
def write_action(action, state): | |
action_file = state.get("action_file") | |
if not action_file: | |
print("Action file not found in state. Cannot write action.") | |
return | |
if not os.path.exists(action_file): | |
print(f"Action file {action_file} does not exist. Skipping write.") | |
return | |
print(f"Writing action: {action} to {action_file}") | |
with open(action_file, "w") as f: | |
f.write(action) | |
def cleanup(req: gr.Request): | |
game_id = req.session_hash | |
if game_id in alive_game_ids: | |
alive_game_ids.pop(game_id) | |
pid = game_pids.get(game_id) | |
if pid: | |
try: | |
os.kill(pid, 9) | |
print(f"Killed game process with PID: {pid}") | |
except Exception as e: | |
print(f"Error killing game process: {e}") | |
finally: | |
remove_old_game_dirs() | |
def update_image(req: gr.Request=None): | |
if req is None: | |
return None | |
game_id = req.session_hash | |
output_dir = os.path.join(".", "runs", game_id) | |
run_lock = FileLock(os.path.join(".", "runs", "run.lock")) | |
with run_lock: | |
if not os.path.exists(output_dir): | |
game_over_image = os.path.join("gameover.jpg") | |
img_array = np.array(Image.open(game_over_image).convert('RGB')) | |
return img_array | |
pkl = os.path.join(output_dir, f"game_{game_id}.pkl") | |
lock = FileLock(pkl + ".lock") | |
while(True): | |
try: | |
with lock: | |
info = pickle.load(open(pkl, "rb")) | |
break | |
except Exception as e: | |
print(f"Error reading pickle file: {e}") | |
time.sleep(0.1) | |
current_image = info["current_image"] | |
image_data = base64.b64decode(current_image) | |
image = Image.open(BytesIO(image_data)).convert('RGB') | |
image = image.resize((int(image.width * 800 / image.height), 800)) | |
img_array = np.array(image) | |
return img_array | |
with gr.Blocks(title="Game Control Interface") as demo: | |
state = gr.State(value={}) | |
with gr.Row(): | |
with gr.Column(scale=2): | |
game_dropdown = gr.Dropdown( | |
choices=["RaceGame", "SuperMario", "FlappyBird", "TempestRun", "PongGame"], | |
label="Select Game", | |
value="RaceGame" | |
) | |
level_dropdown = gr.Dropdown( | |
choices=Game_to_Levels["RaceGame"], | |
label="Select Level" | |
) | |
start_button = gr.Button("Start") | |
with gr.Column(scale=8): | |
screenshot_display = gr.Image(update_image, label="Game Screen", height=800, interactive=False, every=0.5) | |
game_dropdown.change( | |
fn=lambda game: gr.update(choices=Game_to_Levels.get(game, [])), | |
inputs=game_dropdown, | |
outputs=level_dropdown | |
) | |
start_button.click( | |
fn=start_game, | |
inputs=[game_dropdown, level_dropdown, state], | |
outputs=None | |
) | |
with gr.Row(): | |
action_buttons = {action: gr.Button(action, visible=True if action in valid_actions_dict.get("RaceGame", []) else False) for action in all_actions} | |
def update_action_buttons(game): | |
valid_actions = valid_actions_dict.get(game, []) | |
return {btn: gr.update(visible=(action in valid_actions)) for action, btn in action_buttons.items()} | |
game_dropdown.change( | |
fn=update_action_buttons, | |
inputs=game_dropdown, | |
outputs=list(action_buttons.values()) | |
) | |
for action, btn in action_buttons.items(): | |
btn.click( | |
fn=write_action, | |
inputs=[btn, state], | |
outputs=None, | |
) | |
demo.unload(cleanup) | |
demo.launch() | |