from datetime import datetime import importlib import os import pathlib import queue import random import threading import time from filelock import FileLock import pygame from agent.game_agent import game_agent from provider.OpenAIProvider import OpenAIProvider from utils.game_utils import seed_everything from utils.calculate_log import calculate_statistics, extract_scores from utils.config import Config from utils.encoding_utils import encode_data_to_base64_path from utils.file_utils import assemble_project_path, get_all_files, img_to_gif, run_path_construct from utils.json_utils import parse_semi_formatted_text from utils.lmm_utils import assemble_prompt from utils.planner_utils import _extract_keys_from_template import logging import pickle config = Config() class PlaygroundPipelineRunner(): def __init__(self, game, level, history_steps, action_file, game_id, stop_event, sample_rate=3, histort_steps=3, params=None): print("PlaygroundPipelineRunner init.") # TODO self.output_dir = os.path.join(".", "runs", game_id) if not os.path.exists(self.output_dir): os.makedirs(self.output_dir) games = ["RaceGame", "SuperMario", "FlappyBird", "TempestRun", "PongGame"] gamename_to_envname = { "RaceGame": "race", "SuperMario": "supermario", "FlappyBird": "flappybird", "TempestRun": "tempestrun", "PongGame": "pong" } gamename_to_levelname = { "RaceGame": "racegame", "SuperMario": "supermariogame", "FlappyBird": "flappybirdgame", "TempestRun": "tempestrungame", "PongGame": "ponggame" } self.histort_steps = histort_steps gameEnvConfig = f"config/env_config/{histort_steps}steps/env_config_{gamename_to_envname[game]}_reasoning_{histort_steps}steps.json" levelConfig = f"config/level_config/{gamename_to_levelname[game]}/level{level}.json" config.load_env_config(gameEnvConfig) config.load_level_config(levelConfig) self.env_name = config.env_name self.game_module = config.game_module self.game_class = config.game_class self.new_input_event = threading.Event() self.new_input_event.clear() self.history_images = [] self.current_image = None self.history_actions = [] self.game_id = game_id self.action_file = action_file self.stop_event = stop_event self.save_file = f"{self.output_dir}/game_{self.game_id}.pkl" self.step_signal_file = f"{self.output_dir}/step_signal.txt" if params is not None: print(f"Pipeline runner loaded params: {params}") self.provider = OpenAIProvider(params) self.agent = game_agent(self.provider) else: self.provider = None self.agent = game_agent() config.extra_config["overwrite_sample_frames"] = sample_rate def input_listener(self, event): count = 0 flag = True while not event.is_set() and not self.game.over and not self.stop_event.is_set(): if not self.game.new_action_event.is_set(): if flag: game_info = self.game.get_game_info() self.agent.update_game_info(game_info) self.history_images = [ x['image'] for x in self.agent.history[-4:-1] ] self.current_image = self.agent.history[-1]['image'] self.history_actions = [ x['history_action'] for x in self.agent.history[-4:-1] ] info = { "history_images": self.history_images, "current_image": self.current_image, "history_actions": self.history_actions } self.last_info = info print("In runner:", info["history_actions"]) lock = FileLock(self.save_file + ".lock") with lock: pickle.dump(info, open(self.save_file, "wb")) flag = False if not os.path.exists(self.action_file): continue with open(self.action_file, "r") as f: self.game.current_action = f.read() if self.game.current_action == "": continue if self.game.current_action == "model": if not self.provider: print("No provider found, skipping model action.") continue success, response_action = self.agent.execute_action() self.game.current_action = response_action with open(self.action_file, "w") as f: f.write("") flag = True print("Input listener get action: ", self.game.current_action) self.game.new_action_event.set() else: count += 1 if count % 5 == 0: print("Input listener waiting for event.") time.sleep(0.02) print("Input listener exit 0.") print("self.game.over: ", self.game.over) self.game.over = True def run(self): print(f"{self.env_name} Playground Pipeline Running.") game_module = importlib.import_module(self.game_module) game_class = getattr(game_module, self.game_class) self.game = game_class(self.output_dir) self.game.run(self.input_listener) def pipeline_shutdown(self): self.agent = None