InterpreterFlowModule / code_interpreters /subprocess_code_interpreter.py
nbaldwin's picture
remove code runs
9a9e104
import subprocess
import threading
import queue
import time
import traceback
from .base_code_interpreter import BaseCodeInterpreter
class SubprocessCodeInterpreter(BaseCodeInterpreter):
def __init__(self):
self.start_cmd = ""
self.process = None
self.debug_mode = False
self.output_queue = queue.Queue()
self.done = threading.Event()
def detect_active_line(self, line):
return None
def detect_end_of_execution(self, line):
return None
def line_postprocessor(self, line):
return line
def preprocess_code(self, code):
"""
This needs to insert an end_of_execution marker of some kind,
which can be detected by detect_end_of_execution.
Optionally, add active line markers for detect_active_line.
"""
return code
def terminate(self):
self.process.terminate()
def start_process(self):
if self.process:
self.terminate()
self.process = subprocess.Popen(self.start_cmd.split(),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0,
universal_newlines=True)
threading.Thread(target=self.handle_stream_output,
args=(self.process.stdout, False),
daemon=True).start()
threading.Thread(target=self.handle_stream_output,
args=(self.process.stderr, True),
daemon=True).start()
def run(self, code):
retry_count = 0
max_retries = 3
# Setup
try:
code = self.preprocess_code(code)
if not self.process:
self.start_process()
except:
yield {"output": traceback.format_exc()}
return
while retry_count <= max_retries:
if self.debug_mode:
print(f"Running code:\n{code}\n---")
self.done.clear()
try:
self.process.stdin.write(code + "\n")
self.process.stdin.flush()
break
except:
if retry_count != 0:
# For UX, I like to hide this if it happens once. Obviously feels better to not see errors
# Most of the time it doesn't matter, but we should figure out why it happens frequently with:
# applescript
yield {"output": traceback.format_exc()}
yield {"output": f"Retrying... ({retry_count}/{max_retries})"}
yield {"output": "Restarting process."}
self.start_process()
retry_count += 1
if retry_count > max_retries:
yield {"output": "Maximum retries reached. Could not execute code."}
return
while True:
if not self.output_queue.empty():
yield self.output_queue.get()
else:
time.sleep(0.1)
try:
output = self.output_queue.get(timeout=0.3) # Waits for 0.3 seconds
yield output
except queue.Empty:
if self.done.is_set():
# Try to yank 3 more times from it... maybe there's something in there...
# (I don't know if this actually helps. Maybe we just need to yank 1 more time)
for _ in range(3):
if not self.output_queue.empty():
yield self.output_queue.get()
time.sleep(0.2)
break
def handle_stream_output(self, stream, is_error_stream):
for line in iter(stream.readline, ''):
if self.debug_mode:
print(f"Received output line:\n{line}\n---")
line = self.line_postprocessor(line)
if line is None:
continue # `line = None` is the postprocessor's signal to discard completely
if self.detect_active_line(line):
active_line = self.detect_active_line(line)
self.output_queue.put({"active_line": active_line})
elif self.detect_end_of_execution(line):
self.output_queue.put({"active_line": None})
time.sleep(0.1)
self.done.set()
elif is_error_stream and "KeyboardInterrupt" in line:
self.output_queue.put({"output": "KeyboardInterrupt"})
time.sleep(0.1)
self.done.set()
else:
self.output_queue.put({"output": line})