Spaces:
Sleeping
Sleeping
from datetime import datetime | |
import importlib | |
import os | |
import pathlib | |
import queue | |
import random | |
import threading | |
import time | |
from filelock import FileLock | |
import pygame | |
from agent.game_agent import game_agent | |
from provider.OpenAIProvider import OpenAIProvider | |
from utils.game_utils import seed_everything | |
from utils.calculate_log import calculate_statistics, extract_scores | |
from utils.config import Config | |
from utils.encoding_utils import encode_data_to_base64_path | |
from utils.file_utils import assemble_project_path, get_all_files, img_to_gif, run_path_construct | |
from utils.json_utils import parse_semi_formatted_text | |
from utils.lmm_utils import assemble_prompt | |
from utils.planner_utils import _extract_keys_from_template | |
import logging | |
import pickle | |
# Module-level configuration object. It is shared state: every
# PlaygroundPipelineRunner.__init__ call loads its env/level JSON files into
# this same instance and then reads env_name / game_module / game_class back.
config = Config()
class PlaygroundPipelineRunner():
    """Run one playground game and feed it actions from an agent or a file.

    The runner loads the env/level configuration for the requested game,
    instantiates the configured game class, and hands ``input_listener`` to
    the game's ``run`` method. The listener publishes a pickled snapshot of
    the recent frames/actions to ``save_file`` and then obtains the next
    action either from the LMM-backed agent (when ``params`` was given) or
    from ``action_file``.
    """

    # Delay between polls when there is nothing to do yet. Matches the
    # interval already used while waiting for the game to consume an action.
    _POLL_INTERVAL_S = 0.02

    def __init__(self, game, level, history_steps, action_file, game_id, stop_event, params=None):
        """Load configuration and prepare per-run state.

        Args:
            game: Display name of the game; must be a key of the name
                mappings below (e.g. ``"RaceGame"``, ``"SuperMario"``).
            level: Level identifier interpolated into the level config path.
            history_steps: Accepted but currently unused; the env config
                file name hard-codes ``3steps`` (see TODO below).
            action_file: Path polled for the next action when no provider
                is configured.
            game_id: Identifier used for the run directory and save file.
            stop_event: Event-like object; the listener exits once it is set.
            params: Optional provider parameters. When given, an
                ``OpenAIProvider`` is created and the agent chooses actions.

        Raises:
            KeyError: If ``game`` is not one of the known game names.
        """
        print("PlaygroundPipelineRunner init.")
        # TODO
        self.output_dir = os.path.join(".", "runs", game_id)
        # exist_ok avoids the check-then-create race of a separate exists().
        os.makedirs(self.output_dir, exist_ok=True)

        gamename_to_envname = {
            "RaceGame": "race",
            "SuperMario": "mario",
            "FlappyBird": "flappybird",
            "TempestRun": "tempestrun",
            "PongGame": "pong",
        }
        gamename_to_levelname = {
            "RaceGame": "racegame",
            "SuperMario": "supermariogame",
            "FlappyBird": "flappybirdgame",
            "TempestRun": "tempestrungame",
            "PongGame": "ponggame",
        }
        # TODO history steps
        env_config_path = f"config/env_config/env_config_{gamename_to_envname[game]}_reasoning_3steps.json"
        level_config_path = f"config/level_config/{gamename_to_levelname[game]}/level{level}.json"
        config.load_env_config(env_config_path)
        config.load_level_config(level_config_path)

        self.env_name = config.env_name
        self.game_module = config.game_module
        self.game_class = config.game_class

        # A freshly created Event is already cleared.
        self.new_input_event = threading.Event()

        self.history_images = []
        self.current_image = None
        self.history_actions = []
        # Most recent snapshot written to save_file; filled by the listener.
        self.last_info = None

        self.game_id = game_id
        self.action_file = action_file
        self.stop_event = stop_event
        self.save_file = f"{self.output_dir}/game_{self.game_id}.pkl"
        self.step_signal_file = f"{self.output_dir}/step_signal.txt"

        if params is not None:
            print(f"Pipeline runner loaded params: {params}")
            self.provider = OpenAIProvider(params)
            self.agent = game_agent(self.provider)
        else:
            self.provider = None
            self.agent = game_agent()

    def _capture_state(self):
        """Refresh the agent from the game and build the snapshot dict."""
        game_info = self.game.get_game_info()
        self.agent.update_game_info(game_info)

        history = self.agent.history
        # Up to three entries preceding the latest one, then the latest itself.
        earlier = history[-4:-1]
        self.history_images = [entry['image'] for entry in earlier]
        self.current_image = history[-1]['image']
        self.history_actions = [entry['history_action'] for entry in earlier]

        info = {
            "history_images": self.history_images,
            "current_image": self.current_image,
            "history_actions": self.history_actions
        }
        self.last_info = info
        return info

    def _write_state(self, info):
        """Pickle ``info`` to ``save_file`` while holding its file lock."""
        with FileLock(self.save_file + ".lock"):
            # Context manager guarantees the handle is flushed and closed
            # before the lock is released.
            with open(self.save_file, "wb") as f:
                pickle.dump(info, f)

    def input_listener(self, event):
        """Poll for the next action and hand it to the game until stopped.

        The loop runs until ``event`` or ``self.stop_event`` is set, or the
        game reports ``over``. On exit ``self.game.over`` is forced to True.

        Args:
            event: Event-like object (``is_set()``) that ends the loop.
        """
        waiting_polls = 0
        needs_snapshot = True
        while not event.is_set() and not self.game.over and not self.stop_event.is_set():
            if self.game.new_action_event.is_set():
                # The previously delivered action has not been consumed yet.
                waiting_polls += 1
                if waiting_polls % 5 == 0:
                    print("Input listener waiting for event.")
                time.sleep(self._POLL_INTERVAL_S)
                continue

            if needs_snapshot:
                info = self._capture_state()
                print("In runner:", info["history_actions"])
                self._write_state(info)
                needs_snapshot = False

            if self.provider:
                if not os.path.exists(self.step_signal_file):
                    # Sleep instead of spinning while no signal file exists.
                    time.sleep(self._POLL_INTERVAL_S)
                    continue
                with open(self.step_signal_file, "r") as f:
                    step_signal = f.read()
                if step_signal == "step":
                    # Consume the signal so it triggers exactly one step.
                    with open(self.step_signal_file, "w") as f:
                        f.write("")
                    success, msg = self.agent.execute_action()
                    if not success:
                        # Attach the error to the last snapshot and republish;
                        # no new snapshot is taken before the next attempt.
                        info = self.last_info
                        info["Error"] = msg
                        self._write_state(info)
                        time.sleep(0.5)
                        continue
                    self.game.current_action = msg
                    needs_snapshot = True
                elif step_signal == "continuous":
                    # TODO
                    # NOTE(review): falls through and re-announces whatever
                    # current_action already holds - confirm once implemented.
                    pass
                else:
                    time.sleep(self._POLL_INTERVAL_S)
                    continue
            else:
                if not os.path.exists(self.action_file):
                    time.sleep(self._POLL_INTERVAL_S)
                    continue
                with open(self.action_file, "r") as f:
                    self.game.current_action = f.read()
                if self.game.current_action == "":
                    time.sleep(self._POLL_INTERVAL_S)
                    continue
                # Consume the action so it is not delivered twice.
                with open(self.action_file, "w") as f:
                    f.write("")
                needs_snapshot = True

            print("Input listener get action: ", self.game.current_action)
            self.game.new_action_event.set()

        print("Input listener exit 0.")
        print("self.game.over: ", self.game.over)
        self.game.over = True

    def run(self):
        """Import the configured game class, instantiate it, and run it."""
        print(f"{self.env_name} Playground Pipeline Running.")
        game_module = importlib.import_module(self.game_module)
        game_class = getattr(game_module, self.game_class)
        self.game = game_class(self.output_dir)
        self.game.run(self.input_listener)

    def pipeline_shutdown(self):
        """Drop the reference to the agent."""
        self.agent = None