File size: 11,817 Bytes
39c930a 5ddb3df 39c930a c92f194 39c930a c92f194 39c930a c92f194 39c930a c92f194 39c930a c92f194 39c930a c92f194 39c930a c92f194 39c930a c92f194 39c930a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 |
"""
Python interpreter for executing code snippets and capturing their output.
Supports:
- captures stdout and stderr
- captures exceptions and stack traces
- limits execution time
"""
import logging
import os
import queue
import signal
import sys
import time
import traceback
from dataclasses import dataclass
from multiprocessing import Process, Queue
from pathlib import Path
import humanize
from dataclasses_json import DataClassJsonMixin
# Module-level logger; all diagnostics from this module go to the "aide" logger.
logger: logging.Logger = logging.getLogger(name="aide")
@dataclass
class ExecutionResult(DataClassJsonMixin):
    """
    Result of executing a code snippet in the interpreter.
    Contains the output, execution time, and exception information.
    """

    # stdout/stderr chunks captured from the snippet, followed by a final
    # summary line (execution time, or the timeout message)
    term_out: list[str]
    # wall-clock execution time in seconds
    exec_time: float
    # class name of the exception raised by the snippet, or None on success
    # ("TimeoutError" when the time limit was hit)
    exc_type: str | None
    # stringified exception attributes ("args", and "name"/"msg"/"obj" when present)
    exc_info: dict | None = None
    # traceback frames as (filename, lineno, function name, source line) tuples
    exc_stack: list[tuple] | None = None
def exception_summary(e, working_dir, exec_file_name, format_tb_ipython):
    """Summarize an exception and its stack trace.

    The traceback is formatted either in standard Python REPL style or in
    IPython style.

    Args:
        e: The exception to summarize.
        working_dir: The agent's working directory (``Path`` or ``str``). The
            absolute path of the executed file inside it is replaced by the
            bare file name in the formatted traceback.
        exec_file_name: Name of the executed code file.
        format_tb_ipython: If True, format the traceback with IPython's
            ``VerboseTB``; otherwise use the standard ``traceback`` module.

    Returns:
        A tuple ``(tb_str, exc_class_name, exc_info, exc_stack)``:
        ``tb_str`` is the formatted traceback text, ``exc_class_name`` the
        exception's class name, ``exc_info`` a dict with the stringified
        ``args`` and any ``name``/``msg``/``obj`` attributes, and
        ``exc_stack`` a list of ``(filename, lineno, name, line)`` tuples.
    """
    # Take the traceback from the exception itself (not sys.exc_info()) so the
    # result is correct even when called outside the `except` block for `e`.
    exc_triple = (type(e), e, e.__traceback__)

    if format_tb_ipython:
        import IPython.core.ultratb

        # tb_offset=1 skips the outermost frame (the caller's exec() site)
        tb = IPython.core.ultratb.VerboseTB(tb_offset=1, color_scheme="NoColor")
        tb_str = str(tb.text(*exc_triple))
    else:
        tb_lines = traceback.format_exception(*exc_triple)
        # drop entries that point into the aide package or the import machinery
        tb_str = "".join(
            line
            for line in tb_lines
            if "aide/" not in line and "importlib" not in line
        )

    # replace the whole path to the file with just its name
    # (removes the agent workspace dir from the output)
    tb_str = tb_str.replace(str(Path(working_dir) / exec_file_name), exec_file_name)

    exc_info = {}
    if hasattr(e, "args"):
        exc_info["args"] = [str(i) for i in e.args]
    for att in ("name", "msg", "obj"):
        if hasattr(e, att):
            exc_info[att] = str(getattr(e, att))

    tb = traceback.extract_tb(e.__traceback__)
    exc_stack = [(t.filename, t.lineno, t.name, t.line) for t in tb]

    return tb_str, e.__class__.__name__, exc_info, exc_stack
class RedirectQueue:
    """Minimal writable text-stream stand-in that forwards writes to a queue.

    The interpreter's child process installs it as ``sys.stdout`` and
    ``sys.stderr`` so that everything the executed code prints ends up on
    the queue read by the parent.
    """

    def __init__(self, queue, timeout=5):
        # NOTE: the `queue` parameter shadows the `queue` module only inside
        # __init__; `write` below still resolves `queue.Full` to the module.
        self.queue = queue
        self.timeout = timeout

    def write(self, msg):
        """Put ``msg`` on the queue, waiting at most ``self.timeout`` seconds.

        If the queue is still full after the timeout, the message is dropped
        and a warning is logged (best effort; never raises ``queue.Full``).

        Returns:
            ``len(msg)``, as the text-stream ``write`` protocol expects. A
            dropped message is still reported as consumed, so callers that
            loop until everything is written cannot spin forever.
        """
        try:
            self.queue.put(msg, timeout=self.timeout)
        except queue.Full:
            logger.warning("Queue write timed out")
        return len(msg)

    def flush(self):
        # Nothing is buffered locally; every write goes straight to the queue.
        pass

    def isatty(self):
        # Not a terminal. Providing this keeps code that probes
        # `sys.stdout.isatty()` from failing with AttributeError.
        return False
class Interpreter:
def __init__(
self,
working_dir: Path | str,
timeout: int = 3600,
format_tb_ipython: bool = False,
agent_file_name: str = "runfile.py",
):
"""
Simulates a standalone Python REPL with an execution time limit.
Args:
working_dir (Path | str): working directory of the agent
timeout (int, optional): Timeout for each code execution step. Defaults to 3600.
format_tb_ipython (bool, optional): Whether to use IPython or default python REPL formatting for exceptions. Defaults to False.
agent_file_name (str, optional): The name for the agent's code file. Defaults to "runfile.py".
"""
# this really needs to be a path, otherwise causes issues that don't raise exc
self.working_dir = Path(working_dir).resolve()
assert (
self.working_dir.exists()
), f"Working directory {self.working_dir} does not exist"
self.timeout = timeout
self.format_tb_ipython = format_tb_ipython
self.agent_file_name = agent_file_name
self.process: Process = None # type: ignore
def child_proc_setup(self, result_outq: Queue) -> None:
# disable all warnings (before importing anything)
import shutup
shutup.mute_warnings()
os.chdir(str(self.working_dir))
# this seems to only benecessary because we're exec'ing code from a string,
# a .py file should be able to import modules from the cwd anyway
sys.path.append(str(self.working_dir))
# capture stdout and stderr
# trunk-ignore(mypy/assignment)
sys.stdout = sys.stderr = RedirectQueue(result_outq)
def _run_session(
self, code_inq: Queue, result_outq: Queue, event_outq: Queue
) -> None:
self.child_proc_setup(result_outq)
global_scope: dict = {}
while True:
code = code_inq.get()
os.chdir(str(self.working_dir))
with open(self.agent_file_name, "w") as f:
f.write(code)
event_outq.put(("state:ready",))
try:
exec(compile(code, self.agent_file_name, "exec"), global_scope)
except BaseException as e:
tb_str, e_cls_name, exc_info, exc_stack = exception_summary(
e,
self.working_dir,
self.agent_file_name,
self.format_tb_ipython,
)
result_outq.put(tb_str)
if e_cls_name == "KeyboardInterrupt":
e_cls_name = "TimeoutError"
event_outq.put(("state:finished", e_cls_name, exc_info, exc_stack))
else:
event_outq.put(("state:finished", None, None, None))
# remove the file after execution (otherwise it might be included in the data preview)
os.remove(self.agent_file_name)
# put EOF marker to indicate that we're done
result_outq.put("<|EOF|>")
def create_process(self) -> None:
# we use three queues to communicate with the child process:
# - code_inq: send code to child to execute
# - result_outq: receive stdout/stderr from child
# - event_outq: receive events from child (e.g. state:ready, state:finished)
# trunk-ignore(mypy/var-annotated)
self.code_inq, self.result_outq, self.event_outq = Queue(), Queue(), Queue()
self.process = Process(
target=self._run_session,
args=(self.code_inq, self.result_outq, self.event_outq),
)
self.process.start()
def cleanup_session(self):
if self.process is None:
return
try:
# Reduce grace period from 2 seconds to 0.5
self.process.terminate()
self.process.join(timeout=0.5)
if self.process.exitcode is None:
logger.warning("Process failed to terminate, killing immediately")
self.process.kill()
self.process.join(timeout=0.5)
if self.process.exitcode is None:
logger.error("Process refuses to die, using SIGKILL")
os.kill(self.process.pid, signal.SIGKILL)
except Exception as e:
logger.error(f"Error during process cleanup: {e}")
finally:
if self.process is not None:
self.process.close()
self.process = None
def run(self, code: str, reset_session=True) -> ExecutionResult:
"""
Execute the provided Python command in a separate process and return its output.
Parameters:
code (str): Python code to execute.
reset_session (bool, optional): Whether to reset the interpreter session before executing the code. Defaults to True.
Returns:
ExecutionResult: Object containing the output and metadata of the code execution.
"""
logger.debug(f"REPL is executing code (reset_session={reset_session})")
if reset_session:
if self.process is not None:
# terminate and clean up previous process
self.cleanup_session()
self.create_process()
else:
# reset_session needs to be True on first exec
assert self.process is not None
assert self.process.is_alive()
self.code_inq.put(code)
# wait for child to actually start execution (we don't want interrupt child setup)
try:
state = self.event_outq.get(timeout=10)
except queue.Empty:
msg = "REPL child process failed to start execution"
logger.critical(msg)
while not self.result_outq.empty():
logger.error(f"REPL output queue dump: {self.result_outq.get()}")
raise RuntimeError(msg) from None
assert state[0] == "state:ready", state
start_time = time.time()
# this flag indicates that the child ahs exceeded the time limit and an interrupt was sent
# if the child process dies without this flag being set, it's an unexpected termination
child_in_overtime = False
while True:
try:
# check if the child is done
state = self.event_outq.get(timeout=1) # wait for state:finished
assert state[0] == "state:finished", state
exec_time = time.time() - start_time
break
except queue.Empty:
# we haven't heard back from the child -> check if it's still alive (assuming overtime interrupt wasn't sent yet)
if not child_in_overtime and not self.process.is_alive():
msg = "REPL child process died unexpectedly"
logger.critical(msg)
while not self.result_outq.empty():
logger.error(
f"REPL output queue dump: {self.result_outq.get()}"
)
raise RuntimeError(msg) from None
# child is alive and still executing -> check if we should sigint..
if self.timeout is None:
continue
running_time = time.time() - start_time
if running_time > self.timeout:
logger.warning(f"Execution exceeded timeout of {self.timeout}s")
os.kill(self.process.pid, signal.SIGINT)
child_in_overtime = True
# terminate if we're overtime by more than 5 seconds
if running_time > self.timeout + 5:
logger.warning("Child failed to terminate, killing it..")
self.cleanup_session()
state = (None, "TimeoutError", {}, [])
exec_time = self.timeout
break
output: list[str] = []
# read all stdout/stderr from child up to the EOF marker
# waiting until the queue is empty is not enough since
# the feeder thread in child might still be adding to the queue
start_collect = time.time()
while not self.result_outq.empty() or not output or output[-1] != "<|EOF|>":
try:
# Add 5-second timeout for output collection
if time.time() - start_collect > 5:
logger.warning("Output collection timed out")
break
output.append(self.result_outq.get(timeout=1))
except queue.Empty:
continue
output.pop() # remove the EOF marker
e_cls_name, exc_info, exc_stack = state[1:]
if e_cls_name == "TimeoutError":
output.append(
f"TimeoutError: Execution exceeded the time limit of {humanize.naturaldelta(self.timeout)}"
)
else:
output.append(
f"Execution time: {humanize.naturaldelta(exec_time)} seconds (time limit is {humanize.naturaldelta(self.timeout)})."
)
return ExecutionResult(output, exec_time, e_cls_name, exc_info, exc_stack)
|