# V-MAGE-DEMO / app.py
# Fengx1n - fix: log remove (commit cfb310a)
import pickle
import subprocess
import threading
from filelock import FileLock
import gradio as gr
import multiprocessing
import time
import os
from PIL import Image
import base64
from io import BytesIO
import numpy as np
from datetime import datetime
from threading import Timer
# Assuming this is the runner for your game pipeline (replace with actual import if different)
from runner.playground_runner import PlaygroundPipelineRunner
# Mapping from each game name to the level numbers offered for it.
Game_to_Levels = {
    "RaceGame": [*range(1, 10)],
    "SuperMario": [*range(0, 10)],
    "FlappyBird": [*range(1, 8)],
    "TempestRun": [*range(0, 5)],
    "PongGame": [*range(0, 4)],
}

# Actions that are valid for each game.
valid_actions_dict = dict(
    RaceGame=["LEFT", "RIGHT", "UP", "DOWN", "FORWARD", "BACKWARD"],
    PongGame=["LEFTUP", "LEFTDOWN", "RIGHTUP", "RIGHTDOWN", "NONE"],
    FlappyBird=["UP", "DOWN", "KEEP", "NONE"],
    SuperMario=["UP", "LEFT", "RIGHT", "UP+LEFT", "UP+RIGHT", "NONE"],
    TempestRun=["JUMP", "LEFT", "RIGHT", "SLIDE", "RISE", "NONE"],
)

# Every action across all games, in the order the buttons are created.
all_actions = [
    "LEFT",
    "LEFTUP",
    "UP+LEFT",
    "LEFTDOWN",
    "RIGHT",
    "RIGHTUP",
    "UP+RIGHT",
    "RIGHTDOWN",
    "UP",
    "RISE",
    "JUMP",
    "SLIDE",
    "DOWN",
    "KEEP",
    "NONE",
    "FORWARD",
    "BACKWARD",
]

# Session hash -> PID of the game process launched for that session.
game_pids = dict()
# Session hash -> start timestamp string ('%Y-%m-%d-%H:%M:%S').
alive_game_ids = dict()
def remove_old_game_dirs():
    """Delete run directories that no longer belong to a live session.

    A sub-directory of ``./runs`` is removed when its name is not a key of
    ``alive_game_ids``, or when the start time recorded for it is old enough
    that ``(now - start).days > 1``.  Afterwards, entries of
    ``alive_game_ids`` whose directory no longer exists are dropped and a
    summary is printed.

    A missing ``./runs`` directory is treated as "nothing to delete" rather
    than an error.
    """
    import shutil  # local import: only this function needs it

    runs_root = os.path.join(".", "runs")

    def _delete_run_dir(name):
        # update_image takes the same lock before it checks for the
        # directory, so deletion and reading do not interleave.
        with FileLock(os.path.join(runs_root, "run.lock")):
            # Best-effort removal without going through a shell, so odd
            # directory names cannot be interpreted as shell syntax.
            shutil.rmtree(os.path.join(runs_root, name), ignore_errors=True)

    try:
        entries = os.listdir(runs_root)
    except FileNotFoundError:
        entries = []
    run_dirs = [d for d in entries if os.path.isdir(os.path.join(runs_root, d))]

    now = datetime.now()
    for game_id in run_dirs:
        started = alive_game_ids.get(game_id)
        if started is None:
            # No live session owns this directory.
            _delete_run_dir(game_id)
            continue
        age = now - datetime.strptime(started, '%Y-%m-%d-%H:%M:%S')
        # NOTE(review): ``.days > 1`` only fires once the session is at least
        # two full days old; confirm whether "older than 1 day" was intended.
        if age.days > 1:
            _delete_run_dir(game_id)
            alive_game_ids.pop(game_id, None)

    # Drop bookkeeping for sessions whose directory has disappeared.
    for game_id in list(alive_game_ids):
        if not os.path.exists(os.path.join(runs_root, game_id)):
            alive_game_ids.pop(game_id, None)

    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), " - Cleaned up old game directories.")
    print("Current alive game IDs:", alive_game_ids)
def start_game(game, level, state, req: gr.Request):
    """
    Launch ``run_game.py`` in the background for the calling session.

    Any process previously recorded for the session is killed first.  The
    session's run directory and an empty action file are created, the action
    file path is stored in ``state["action_file"]``, and the PID of the new
    process is recorded in ``game_pids``.

    Args:
        game: The selected game name.
        level: The selected level for the game.
        state (dict): Per-session state; its "action_file" key is set here.
        req (gr.Request): Request whose ``session_hash`` is used as game id.

    Returns:
        None.
    """
    import shlex  # local import: only this function needs it

    print(f"Starting {game} at {level}")
    game_id = req.session_hash
    alive_game_ids[game_id] = datetime.now().strftime('%Y-%m-%d-%H:%M:%S')

    previous_pid = game_pids.get(game_id)
    if previous_pid:
        # Kill the previous game process if it exists (9 == SIGKILL).
        try:
            os.kill(previous_pid, 9)
            print(f"Killed previous game process with PID: {previous_pid}")
        except OSError as e:
            print(f"Error killing previous game process: {e}")

    output_dir = os.path.join(".", "runs", game_id)
    os.makedirs(output_dir, exist_ok=True)

    # Create/truncate the action file that write_action later fills in.
    action_file = os.path.join(output_dir, f"action_{game_id}.txt")
    with open(action_file, "w") as f:
        f.write("")
    state["action_file"] = action_file

    # Every interpolated value arrives with the request, so quote each one
    # before it becomes part of a shell command line.
    command = (
        "nohup python3 -u run_game.py"
        f" --game {shlex.quote(str(game))}"
        f" --level {shlex.quote(str(level))}"
        f" --action_file {shlex.quote(action_file)}"
        f" --game_id {shlex.quote(str(game_id))}"
        " > run.log 2>&1 & echo $!"
    )
    with os.popen(command) as f:
        # Read the command's output, which should be the PID (from `echo $!`).
        pid_str = f.read().strip()

    try:
        pid = int(pid_str)
        print(f"Game started with PID: {pid}")
        game_pids[game_id] = pid
    except ValueError:
        print(f"Failed to parse PID from command output: {pid_str}")
def write_action(action, state):
    """Overwrite the session's action file with ``action``.

    The path is read from ``state["action_file"]``.  Nothing is written when
    that entry is missing/empty or when the file does not exist on disk; a
    message is printed in each case.

    Args:
        action (str): Text written as the file's entire content.
        state (dict): Per-session state holding the "action_file" path.

    Returns:
        None.
    """
    action_file = state.get("action_file")
    if action_file:
        if os.path.exists(action_file):
            print(f"Writing action: {action} to {action_file}")
            with open(action_file, "w") as handle:
                handle.write(action)
        else:
            print(f"Action file {action_file} does not exist. Skipping write.")
    else:
        print("Action file not found in state. Cannot write action.")
def cleanup(req: gr.Request):
    """Tear down the session identified by ``req.session_hash``.

    The session is removed from ``alive_game_ids`` and ``game_pids``.  If a
    PID was recorded for it, that process is killed and, whether or not the
    kill succeeds, stale run directories are swept via
    ``remove_old_game_dirs``.

    Args:
        req (gr.Request): Request of the session being unloaded.

    Returns:
        None.
    """
    game_id = req.session_hash
    alive_game_ids.pop(game_id, None)

    # Forget the PID too: leaving it behind grows the dict without bound and
    # start_game would later try to kill whatever process reuses that PID.
    pid = game_pids.pop(game_id, None)
    if not pid:
        return

    try:
        os.kill(pid, 9)  # 9 == SIGKILL
        print(f"Killed game process with PID: {pid}")
    except OSError as e:
        print(f"Error killing game process: {e}")
    finally:
        remove_old_game_dirs()
def update_image(req: gr.Request=None):
    """Return the latest game frame for the calling session.

    The frame is read from ``./runs/<session>/game_<session>.pkl``, whose
    "current_image" entry is a base64-encoded image.  It is decoded, converted
    to RGB and resized to a height of 800 pixels, keeping the aspect ratio.

    Args:
        req (gr.Request): Request whose ``session_hash`` identifies the run.

    Returns:
        numpy.ndarray | None: The frame as an RGB array; the contents of
        ``gameover.jpg`` when the session has no run directory; ``None`` when
        ``req`` is None or no frame could be read within the retry budget.
    """
    if req is None:
        return None

    # Bounded retries: the pickle may never appear (e.g. the game failed to
    # start), and retrying forever would stall this poll and everything
    # waiting on the shared run lock.
    max_attempts = 5
    retry_delay = 0.1  # seconds

    game_id = req.session_hash
    output_dir = os.path.join(".", "runs", game_id)
    run_lock = FileLock(os.path.join(".", "runs", "run.lock"))
    pkl = os.path.join(output_dir, f"game_{game_id}.pkl")

    for attempt in range(max_attempts):
        if attempt:
            # Sleep outside the run lock so other sessions are not blocked.
            time.sleep(retry_delay)
        with run_lock:
            if not os.path.exists(output_dir):
                with Image.open("gameover.jpg") as game_over:
                    return np.array(game_over.convert('RGB'))
            try:
                with FileLock(pkl + ".lock"), open(pkl, "rb") as fh:
                    # NOTE: pickle.load runs arbitrary code from the file;
                    # the pickle must only ever be written by trusted code.
                    info = pickle.load(fh)
                break
            except Exception as e:
                print(f"Error reading pickle file: {e}")
    else:
        # No readable frame yet; the next poll will try again.
        return None

    current_image = info["current_image"]
    image_data = base64.b64decode(current_image)
    image = Image.open(BytesIO(image_data)).convert('RGB')
    image = image.resize((int(image.width * 800 / image.height), 800))
    img_array = np.array(image)
    return img_array
# Gradio UI: game/level selection on the left, live screenshot on the right,
# and one button per action underneath.
# NOTE(review): the source had lost its indentation; the layout nesting below
# is reconstructed and should be confirmed against the running app.
with gr.Blocks(title="Game Control Interface") as demo:
    # Per-session dict; start_game stores the action-file path in it.
    state = gr.State(value={})

    with gr.Row():
        with gr.Column(scale=2):
            game_dropdown = gr.Dropdown(
                # Derived from the level table so the two cannot drift apart.
                choices=list(Game_to_Levels),
                label="Select Game",
                value="RaceGame"
            )
            level_dropdown = gr.Dropdown(
                choices=Game_to_Levels["RaceGame"],
                label="Select Level",
                # Preselect a level so "Start" never runs with no level.
                value=Game_to_Levels["RaceGame"][0]
            )
            start_button = gr.Button("Start")
        with gr.Column(scale=8):
            # update_image is re-run every 0.5 s to refresh the frame.
            screenshot_display = gr.Image(update_image, label="Game Screen", height=800, interactive=False, every=0.5)

    def update_level_choices(game):
        """Show the levels of ``game`` and select the first one.

        Resetting the value prevents a level chosen for the previous game
        from being submitted for a game that does not have it.
        """
        levels = Game_to_Levels.get(game, [])
        return gr.update(choices=levels, value=levels[0] if levels else None)

    game_dropdown.change(
        fn=update_level_choices,
        inputs=game_dropdown,
        outputs=level_dropdown
    )
    start_button.click(
        fn=start_game,
        inputs=[game_dropdown, level_dropdown, state],
        outputs=None
    )

    with gr.Row():
        # One button per known action; only the default game's are visible.
        action_buttons = {
            action: gr.Button(action, visible=action in valid_actions_dict.get("RaceGame", []))
            for action in all_actions
        }

    def update_action_buttons(game):
        """Show only the action buttons that are valid for ``game``."""
        valid_actions = valid_actions_dict.get(game, [])
        return {btn: gr.update(visible=(action in valid_actions)) for action, btn in action_buttons.items()}

    game_dropdown.change(
        fn=update_action_buttons,
        inputs=game_dropdown,
        outputs=list(action_buttons.values())
    )

    for action, btn in action_buttons.items():
        # The button is its own input; presumably its label is what
        # write_action receives as ``action`` - verify against Gradio docs.
        btn.click(
            fn=write_action,
            inputs=[btn, state],
            outputs=None,
        )

    # Kill the session's game process when the browser tab goes away.
    demo.unload(cleanup)

demo.launch()