|
|
|
|
|
import subprocess |
|
import threading |
|
import queue |
|
import time |
|
import traceback |
|
from .base_code_interpreter import BaseCodeInterpreter |
|
|
|
class SubprocessCodeInterpreter(BaseCodeInterpreter): |
|
def __init__(self): |
|
self.start_cmd = "" |
|
self.process = None |
|
self.debug_mode = False |
|
self.output_queue = queue.Queue() |
|
self.done = threading.Event() |
|
|
|
def detect_active_line(self, line): |
|
return None |
|
|
|
def detect_end_of_execution(self, line): |
|
return None |
|
|
|
def line_postprocessor(self, line): |
|
return line |
|
|
|
def preprocess_code(self, code): |
|
""" |
|
This needs to insert an end_of_execution marker of some kind, |
|
which can be detected by detect_end_of_execution. |
|
|
|
Optionally, add active line markers for detect_active_line. |
|
""" |
|
return code |
|
|
|
def terminate(self): |
|
self.process.terminate() |
|
|
|
def start_process(self): |
|
if self.process: |
|
self.terminate() |
|
|
|
self.process = subprocess.Popen(self.start_cmd.split(), |
|
stdin=subprocess.PIPE, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
text=True, |
|
bufsize=0, |
|
universal_newlines=True) |
|
threading.Thread(target=self.handle_stream_output, |
|
args=(self.process.stdout, False), |
|
daemon=True).start() |
|
threading.Thread(target=self.handle_stream_output, |
|
args=(self.process.stderr, True), |
|
daemon=True).start() |
|
|
|
def run(self, code): |
|
retry_count = 0 |
|
max_retries = 3 |
|
|
|
|
|
try: |
|
code = self.preprocess_code(code) |
|
if not self.process: |
|
self.start_process() |
|
except: |
|
yield {"output": traceback.format_exc()} |
|
return |
|
|
|
|
|
while retry_count <= max_retries: |
|
if self.debug_mode: |
|
print(f"Running code:\n{code}\n---") |
|
|
|
self.done.clear() |
|
|
|
try: |
|
self.process.stdin.write(code + "\n") |
|
self.process.stdin.flush() |
|
break |
|
except: |
|
if retry_count != 0: |
|
|
|
|
|
|
|
yield {"output": traceback.format_exc()} |
|
yield {"output": f"Retrying... ({retry_count}/{max_retries})"} |
|
yield {"output": "Restarting process."} |
|
|
|
self.start_process() |
|
|
|
retry_count += 1 |
|
if retry_count > max_retries: |
|
yield {"output": "Maximum retries reached. Could not execute code."} |
|
return |
|
|
|
while True: |
|
if not self.output_queue.empty(): |
|
yield self.output_queue.get() |
|
else: |
|
time.sleep(0.1) |
|
try: |
|
output = self.output_queue.get(timeout=0.3) |
|
yield output |
|
except queue.Empty: |
|
if self.done.is_set(): |
|
|
|
|
|
for _ in range(3): |
|
if not self.output_queue.empty(): |
|
yield self.output_queue.get() |
|
time.sleep(0.2) |
|
break |
|
|
|
def handle_stream_output(self, stream, is_error_stream): |
|
for line in iter(stream.readline, ''): |
|
if self.debug_mode: |
|
print(f"Received output line:\n{line}\n---") |
|
|
|
line = self.line_postprocessor(line) |
|
|
|
if line is None: |
|
continue |
|
|
|
if self.detect_active_line(line): |
|
active_line = self.detect_active_line(line) |
|
self.output_queue.put({"active_line": active_line}) |
|
elif self.detect_end_of_execution(line): |
|
self.output_queue.put({"active_line": None}) |
|
time.sleep(0.1) |
|
self.done.set() |
|
elif is_error_stream and "KeyboardInterrupt" in line: |
|
self.output_queue.put({"output": "KeyboardInterrupt"}) |
|
time.sleep(0.1) |
|
self.done.set() |
|
else: |
|
self.output_queue.put({"output": line}) |
|
|
|
|