# V-MAGE-DEMO/runner/playground_runner.py -- uploaded by Fengx1n, commit e53fda1 ("Initial DEMO")
from datetime import datetime
import importlib
import os
import pathlib
import queue
import random
import threading
import time
from filelock import FileLock
import pygame
from agent.game_agent import game_agent
from provider.OpenAIProvider import OpenAIProvider
from utils.game_utils import seed_everything
from utils.calculate_log import calculate_statistics, extract_scores
from utils.config import Config
from utils.encoding_utils import encode_data_to_base64_path
from utils.file_utils import assemble_project_path, get_all_files, img_to_gif, run_path_construct
from utils.json_utils import parse_semi_formatted_text
from utils.lmm_utils import assemble_prompt
from utils.planner_utils import _extract_keys_from_template
import logging
import pickle
# Module-level configuration object. PlaygroundPipelineRunner.__init__ loads the
# environment and level config files into it and then reads env_name,
# game_module and game_class from it.
config = Config()
class PlaygroundPipelineRunner():
    """Run one playground game and feed it actions from an agent or a file.

    The runner loads the environment/level configuration for the requested
    game, instantiates the game class named by that configuration, and hands
    ``input_listener`` to the game's ``run`` method. The listener publishes a
    pickled snapshot of recent images/actions to ``save_file`` and obtains the
    next action either from the agent (when a provider was configured via
    ``params``) or from the text file ``action_file``.
    """

    # Game name -> token used in the env config file name.
    _ENV_NAMES = {
        "RaceGame": "race",
        "SuperMario": "mario",
        "FlappyBird": "flappybird",
        "TempestRun": "tempestrun",
        "PongGame": "pong",
    }
    # Game name -> directory holding that game's level config files.
    _LEVEL_DIRS = {
        "RaceGame": "racegame",
        "SuperMario": "supermariogame",
        "FlappyBird": "flappybirdgame",
        "TempestRun": "tempestrungame",
        "PongGame": "ponggame",
    }
    # Seconds to sleep between polls while there is nothing to do.
    _POLL_INTERVAL = 0.02
    # Seconds to wait after the agent failed to produce an action.
    _ERROR_BACKOFF = 0.5

    def __init__(self, game, level, history_steps, action_file, game_id, stop_event, params=None):
        """Load the game configuration and set up the agent.

        Args:
            game: One of the keys of ``_ENV_NAMES`` (e.g. ``"SuperMario"``).
            level: Level identifier interpolated into the level config path.
            history_steps: Currently unused; the env config is hard-coded to
                the ``3steps`` variant (see TODO below).
            action_file: Path of the text file polled for actions when no
                provider is configured.
            game_id: Identifier used for the run directory and snapshot name.
            stop_event: Event-like object; the listener exits once it is set.
            params: Optional provider parameters. When given, an
                ``OpenAIProvider`` is created and passed to the agent.

        Raises:
            KeyError: If ``game`` is not a known game name.
        """
        print("PlaygroundPipelineRunner init.")
        # TODO
        self.output_dir = os.path.join(".", "runs", game_id)
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(self.output_dir, exist_ok=True)

        # TODO history steps
        env_config_path = f"config/env_config/env_config_{self._ENV_NAMES[game]}_reasoning_3steps.json"
        level_config_path = f"config/level_config/{self._LEVEL_DIRS[game]}/level{level}.json"
        config.load_env_config(env_config_path)
        config.load_level_config(level_config_path)

        self.env_name = config.env_name
        self.game_module = config.game_module
        self.game_class = config.game_class

        # A freshly created Event is already cleared.
        self.new_input_event = threading.Event()

        self.history_images = []
        self.current_image = None
        self.history_actions = []
        # Last snapshot written to save_file; reused when reporting agent errors.
        self.last_info = {}

        self.game_id = game_id
        self.action_file = action_file
        self.stop_event = stop_event

        self.save_file = f"{self.output_dir}/game_{self.game_id}.pkl"
        self.step_signal_file = f"{self.output_dir}/step_signal.txt"

        if params is not None:
            print(f"Pipeline runner loaded params: {params}")
            self.provider = OpenAIProvider(params)
            self.agent = game_agent(self.provider)
        else:
            self.provider = None
            self.agent = game_agent()

    @staticmethod
    def _read_text_if_exists(path):
        """Return the text content of ``path``, or ``None`` if it does not exist."""
        try:
            with open(path, "r") as f:
                return f.read()
        except FileNotFoundError:
            return None

    def _save_info(self, info):
        """Pickle ``info`` to ``save_file`` while holding the companion file lock."""
        lock = FileLock(self.save_file + ".lock")
        with lock:
            # Close the handle before releasing the lock so the data is
            # flushed by the time another process may acquire it.
            with open(self.save_file, "wb") as f:
                pickle.dump(info, f)

    def _publish_state(self):
        """Refresh the agent with the game info and write a new snapshot."""
        game_info = self.game.get_game_info()
        self.agent.update_game_info(game_info)

        # Up to three entries preceding the newest one, plus the newest image.
        previous = self.agent.history[-4:-1]
        self.history_images = [entry['image'] for entry in previous]
        self.current_image = self.agent.history[-1]['image']
        self.history_actions = [entry['history_action'] for entry in previous]

        info = {
            "history_images": self.history_images,
            "current_image": self.current_image,
            "history_actions": self.history_actions
        }
        self.last_info = info
        print("In runner:", info["history_actions"])
        self._save_info(info)

    def input_listener(self, event):
        """Poll for the next action and hand it to the game until told to stop.

        The loop ends when ``event`` or ``stop_event`` is set or the game
        reports ``over``; on exit ``self.game.over`` is forced to ``True``.

        While the game's ``new_action_event`` is clear the listener (re)publishes
        the state snapshot when needed and then looks for an action:

        * with a provider, it reads ``step_signal_file``; ``"step"`` clears the
          file and asks the agent for an action, ``"continuous"`` is not
          implemented yet and falls through to dispatch the current action;
        * without a provider, it reads ``action_file`` and clears it once a
          non-empty action was found.

        Args:
            event: Event-like object whose ``is_set()`` terminates the loop.
        """
        count = 0
        needs_snapshot = True
        while not event.is_set() and not self.game.over and not self.stop_event.is_set():
            if self.game.new_action_event.is_set():
                # The game has not consumed the previous action yet.
                count += 1
                if count % 5 == 0:
                    print("Input listener waiting for event.")
                time.sleep(self._POLL_INTERVAL)
                continue

            if needs_snapshot:
                self._publish_state()
                needs_snapshot = False

            if self.provider:
                step_signal = self._read_text_if_exists(self.step_signal_file)
                if step_signal == "step":
                    # Consume the signal before acting on it.
                    with open(self.step_signal_file, "w") as f:
                        f.write("")
                    success, msg = self.agent.execute_action()
                    if success:
                        self.game.current_action = msg
                        needs_snapshot = True
                    else:
                        # Surface the failure through the shared snapshot file.
                        info = self.last_info
                        info["Error"] = msg
                        self._save_info(info)
                        time.sleep(self._ERROR_BACKOFF)
                        continue
                elif step_signal == "continuous":
                    # TODO
                    pass
                else:
                    # No signal file or no recognised signal: back off briefly
                    # instead of spinning on the file system.
                    time.sleep(self._POLL_INTERVAL)
                    continue
            else:
                action = self._read_text_if_exists(self.action_file)
                if action is None:
                    time.sleep(self._POLL_INTERVAL)
                    continue
                # The game's current action mirrors the file content, even
                # when the file is empty.
                self.game.current_action = action
                if action == "":
                    time.sleep(self._POLL_INTERVAL)
                    continue
                with open(self.action_file, "w") as f:
                    f.write("")
                needs_snapshot = True

            print("Input listener get action: ", self.game.current_action)
            self.game.new_action_event.set()

        print("Input listener exit 0.")
        print("self.game.over: ", self.game.over)
        self.game.over = True

    def run(self):
        """Import the configured game class, instantiate it and run it.

        The game is constructed with ``output_dir`` and its ``run`` method
        receives ``input_listener`` as its only argument.
        """
        print(f"{self.env_name} Playground Pipeline Running.")
        game_module = importlib.import_module(self.game_module)
        game_class = getattr(game_module, self.game_class)
        self.game = game_class(self.output_dir)
        self.game.run(self.input_listener)

    def pipeline_shutdown(self):
        """Drop the reference to the agent."""
        self.agent = None