import subprocess import threading import queue import time import traceback from .base_code_interpreter import BaseCodeInterpreter class SubprocessCodeInterpreter(BaseCodeInterpreter): def __init__(self): self.start_cmd = "" self.process = None self.debug_mode = False self.output_queue = queue.Queue() self.done = threading.Event() def detect_active_line(self, line): return None def detect_end_of_execution(self, line): return None def line_postprocessor(self, line): return line def preprocess_code(self, code): """ This needs to insert an end_of_execution marker of some kind, which can be detected by detect_end_of_execution. Optionally, add active line markers for detect_active_line. """ return code def terminate(self): self.process.terminate() def start_process(self): if self.process: self.terminate() self.process = subprocess.Popen(self.start_cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=0, universal_newlines=True) threading.Thread(target=self.handle_stream_output, args=(self.process.stdout, False), daemon=True).start() threading.Thread(target=self.handle_stream_output, args=(self.process.stderr, True), daemon=True).start() def run(self, code): retry_count = 0 max_retries = 3 # Setup try: code = self.preprocess_code(code) if not self.process: self.start_process() except: yield {"output": traceback.format_exc()} return while retry_count <= max_retries: if self.debug_mode: print(f"Running code:\n{code}\n---") self.done.clear() try: self.process.stdin.write(code + "\n") self.process.stdin.flush() break except: if retry_count != 0: # For UX, I like to hide this if it happens once. Obviously feels better to not see errors # Most of the time it doesn't matter, but we should figure out why it happens frequently with: # applescript yield {"output": traceback.format_exc()} yield {"output": f"Retrying... ({retry_count}/{max_retries})"} yield {"output": "Restarting process."} self.start_process() retry_count += 1 if retry_count > max_retries: yield {"output": "Maximum retries reached. Could not execute code."} return while True: if not self.output_queue.empty(): yield self.output_queue.get() else: time.sleep(0.1) try: output = self.output_queue.get(timeout=0.3) # Waits for 0.3 seconds yield output except queue.Empty: if self.done.is_set(): # Try to yank 3 more times from it... maybe there's something in there... # (I don't know if this actually helps. Maybe we just need to yank 1 more time) for _ in range(3): if not self.output_queue.empty(): yield self.output_queue.get() time.sleep(0.2) break def handle_stream_output(self, stream, is_error_stream): for line in iter(stream.readline, ''): if self.debug_mode: print(f"Received output line:\n{line}\n---") line = self.line_postprocessor(line) if line is None: continue # `line = None` is the postprocessor's signal to discard completely if self.detect_active_line(line): active_line = self.detect_active_line(line) self.output_queue.put({"active_line": active_line}) elif self.detect_end_of_execution(line): self.output_queue.put({"active_line": None}) time.sleep(0.1) self.done.set() elif is_error_stream and "KeyboardInterrupt" in line: self.output_queue.put({"output": "KeyboardInterrupt"}) time.sleep(0.1) self.done.set() else: self.output_queue.put({"output": line})