import traceback from copy import deepcopy from typing import Dict, Any from .code_interpreters.create_code_interpreter import create_code_interpreter from aiflows.messages import FlowMessage from aiflows.base_flows import AtomicFlow def truncate_output(data, max_output_chars=2000): needs_truncation = False message = f'Output truncated. Showing the last {max_output_chars} characters.\n\n' # Remove previous truncation message if it exists if data.startswith(message): data = data[len(message):] needs_truncation = True # If data exceeds max length, truncate it and add message if len(data) > max_output_chars or needs_truncation: data = message + data[-max_output_chars:] return data class InterpreterAtomicFlow(AtomicFlow): """This flow is used to run the code passed from the caller. *Input Interface*: - `code` - `language` *Output Interface*: - `interpreter_output`: output of the code interpreter *Configuration Parameters*: - max_output: maximum number of characters to display in the output **Full credits to open-interpreter (https://github.com/KillianLucas/open-interpreter) for the usage of code interpreters (package `code_interpreters`) and the function truncate_output()** I'm extracting the code interpreter part from open-interpreter because the litellm version of open-interpreter is not compatible with that of the current version of aiflows(v.0.1.7). """ def __init__(self, **kwargs): super().__init__(**kwargs) self.max_output = self.flow_config["max_output"] self._code_interpreters = {} def set_up_flow_state(self): """ class-specific flow state: language and code, which describes the programming language and the code to run. """ super().set_up_flow_state() self.flow_state["language"] = None self.flow_state["code"] = "" def _state_update_add_language_and_code(self, language: str, code: str) -> None: """ updates the language and code passed from _process_input_data to the flow state :param language: the programming language :param code: the code to run """ self.flow_state["language"] = language self.flow_state["code"] = code def _check_input(self, input_data: Dict[str, Any]): """ Sanity check of input data :param input_data: input data :type input_data: Dict[str, Any] """ # ~~~ Sanity check of input_data ~~~ assert "language" in input_data, "attribute 'language' not in input data." assert "code" in input_data, "attribute 'code' not in input data." def _process_input_data(self, input_data: Dict[str, Any]): """ Allocate interpreter if any, pass input data into flow state :param input_data: input data :type input_data: Dict[str, Any] """ # code in Jupyter notebook that starts with '!' is actually shell command. if input_data["language"] == "python" and input_data["code"].startswith("!"): input_data["language"] = "shell" input_data["code"] = input_data["code"][1:] # ~~~ Allocate interpreter ~~~ # interpreter existence is checked in create_code_interpreter() # TODO: consider: should we put language not supported error into output? language = input_data["language"] if language not in self._code_interpreters: self._code_interpreters[language] = create_code_interpreter(language) # ~~~ Pass input data to flow state ~~~ self._state_update_add_language_and_code( language=language, code=input_data["code"] ) def _call(self): """ This method runs the code interpreter and returns the output. (runs the code interpreter and returns the output.) """ output = "" try: code_interpreter = self._code_interpreters[self.flow_state["language"]] code = self.flow_state["code"] for line in code_interpreter.run(code): if "output" in line: output += "\n" + line["output"] # Truncate output output = truncate_output(output, self.max_output) output = output.strip() except: output = traceback.format_exc() output = output.strip() return output def run( self, input_message: FlowMessage): """ Run the code interpreter and return the output. :param input_message: The input message of the flow. :type input_message: FlowMessage """ input_data = input_message.data self._check_input(input_data) self._process_input_data(input_data) output = self._call() response = { "interpreter_output": output, } reply = self.package_output_message( input_message=input_message, response = response ) self.send_message(reply)