File size: 4,927 Bytes
fcdf91d 9a9e104 fcdf91d 9a9e104 fcdf91d 9a9e104 fcdf91d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
import subprocess
import threading
import queue
import time
import traceback
from .base_code_interpreter import BaseCodeInterpreter
class SubprocessCodeInterpreter(BaseCodeInterpreter):
    """Run code by writing it to the stdin of a long-lived subprocess.

    The subprocess is launched from ``start_cmd``.  Two daemon threads read
    its stdout and stderr line by line and push message dicts onto
    ``output_queue``; ``run`` yields those dicts to the caller.

    Subclasses are expected to set ``start_cmd`` and override the hook
    methods (``preprocess_code``, ``line_postprocessor``,
    ``detect_active_line``, ``detect_end_of_execution``); the defaults here
    are no-ops.
    """

    def __init__(self):
        # Command line used to launch the subprocess (split on whitespace).
        self.start_cmd = ""
        # The running subprocess.Popen object, or None before the first start.
        self.process = None
        # When True, code sent and lines received are printed to stdout.
        self.debug_mode = False
        # Message dicts produced by the reader threads, consumed by run().
        self.output_queue = queue.Queue()
        # Set by a reader thread once the current execution has finished.
        self.done = threading.Event()

    def detect_active_line(self, line):
        """Return the active line indicated by an output line, if any.

        The default implementation never detects one and returns None.
        """
        return None

    def detect_end_of_execution(self, line):
        """Return a truthy value if ``line`` marks the end of execution.

        The default implementation never detects one and returns None.
        """
        return None

    def line_postprocessor(self, line):
        """Transform an output line before it is classified.

        Returning None discards the line entirely.  The default
        implementation returns the line unchanged.
        """
        return line

    def preprocess_code(self, code):
        """
        This needs to insert an end_of_execution marker of some kind,
        which can be detected by detect_end_of_execution.

        Optionally, add active line markers for detect_active_line.

        The default implementation returns the code unchanged.
        """
        return code

    def terminate(self):
        """Terminate the subprocess; do nothing if none was ever started."""
        if self.process is not None:
            self.process.terminate()

    def start_process(self):
        """(Re)start the subprocess and its stdout/stderr reader threads.

        Any existing process is terminated first.
        """
        if self.process:
            self.terminate()

        self.process = subprocess.Popen(
            self.start_cmd.split(),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,  # `universal_newlines=True` is an alias of this flag
            bufsize=0,
        )

        # One reader per stream; daemon threads so they never block exit.
        for stream, is_error_stream in (
            (self.process.stdout, False),
            (self.process.stderr, True),
        ):
            threading.Thread(
                target=self.handle_stream_output,
                args=(stream, is_error_stream),
                daemon=True,
            ).start()

    def _drain_queue(self):
        """Yield every message currently in the output queue without blocking."""
        while True:
            try:
                yield self.output_queue.get_nowait()
            except queue.Empty:
                return

    def run(self, code):
        """Execute ``code`` in the subprocess and stream back its output.

        This is a generator.  It yields dicts of the form
        ``{"output": str}`` or ``{"active_line": ...}`` as they are produced
        by the reader threads.  Setup and write failures are reported as
        ``{"output": <traceback text>}`` messages instead of being raised.
        """
        max_retries = 3

        # Setup
        try:
            code = self.preprocess_code(code)
            if not self.process:
                self.start_process()
        except Exception:
            # Exception (not a bare except) so Ctrl-C / SystemExit propagate.
            yield {"output": traceback.format_exc()}
            return

        retry_count = 0
        while retry_count <= max_retries:
            if self.debug_mode:
                print(f"Running code:\n{code}\n---")

            self.done.clear()

            try:
                self.process.stdin.write(code + "\n")
                self.process.stdin.flush()
                break
            except Exception:
                if retry_count != 0:
                    # For UX, the first failure is hidden: it feels better not
                    # to see errors, and most of the time it doesn't matter.
                    # We should still figure out why it happens frequently
                    # with: applescript
                    yield {"output": traceback.format_exc()}
                    yield {"output": f"Retrying... ({retry_count}/{max_retries})"}
                    yield {"output": "Restarting process."}

                self.start_process()

                retry_count += 1
                if retry_count > max_retries:
                    yield {"output": "Maximum retries reached. Could not execute code."}
                    return

        while True:
            if not self.output_queue.empty():
                yield self.output_queue.get()
            else:
                time.sleep(0.1)

            try:
                output = self.output_queue.get(timeout=0.3)  # Waits for 0.3 seconds
            except queue.Empty:
                # Stop when the end marker was seen, or when the subprocess
                # has exited (the readers hit EOF and will never set `done`,
                # so waiting on it alone would loop forever).
                if self.done.is_set() or self.process.poll() is not None:
                    # Give late lines three short grace periods, emptying the
                    # queue completely each time so nothing leaks into the
                    # next run() call.
                    for _ in range(3):
                        yield from self._drain_queue()
                        time.sleep(0.2)
                    break
            else:
                yield output

    def handle_stream_output(self, stream, is_error_stream):
        """Read ``stream`` line by line until EOF and enqueue messages.

        Runs in a reader thread.  Each line is passed through
        ``line_postprocessor`` and then classified as an active-line marker,
        an end-of-execution marker, a KeyboardInterrupt notice (stderr
        only), or plain output.
        """
        for line in iter(stream.readline, ''):
            if self.debug_mode:
                print(f"Received output line:\n{line}\n---")

            line = self.line_postprocessor(line)

            if line is None:
                continue  # `line = None` is the postprocessor's signal to discard completely

            # Call the hook once and reuse the result.
            active_line = self.detect_active_line(line)
            if active_line:
                self.output_queue.put({"active_line": active_line})
            elif self.detect_end_of_execution(line):
                self.output_queue.put({"active_line": None})
                # Brief pause before signalling completion so the consumer
                # can pick up what was queued just before.
                time.sleep(0.1)
                self.done.set()
            elif is_error_stream and "KeyboardInterrupt" in line:
                self.output_queue.put({"output": "KeyboardInterrupt"})
                time.sleep(0.1)
                self.done.set()
            else:
                self.output_queue.put({"output": line})
|