File size: 5,277 Bytes
3f6b5c2 91ab55c 3f6b5c2 f5ba9f9 3f6b5c2 a39f32d 3f6b5c2 f5ba9f9 3f6b5c2 f5ba9f9 9a9e104 f5ba9f9 3f6b5c2 43e0ed4 3f6b5c2 43e0ed4 3f6b5c2 43e0ed4 3f6b5c2 43e0ed4 3f6b5c2 43e0ed4 3f6b5c2 43e0ed4 9a9e104 3f6b5c2 9a9e104 3f6b5c2 91ab55c 43e0ed4 91ab55c 3f6b5c2 91ab55c ddf8801 9a9e104 ddf8801 91ab55c ddf8801 91ab55c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
import traceback
from copy import deepcopy
from typing import Dict, Any
from .code_interpreters.create_code_interpreter import create_code_interpreter
from aiflows.messages import FlowMessage
from aiflows.base_flows import AtomicFlow
def truncate_output(data, max_output_chars=2000):
needs_truncation = False
message = f'Output truncated. Showing the last {max_output_chars} characters.\n\n'
# Remove previous truncation message if it exists
if data.startswith(message):
data = data[len(message):]
needs_truncation = True
# If data exceeds max length, truncate it and add message
if len(data) > max_output_chars or needs_truncation:
data = message + data[-max_output_chars:]
return data
class InterpreterAtomicFlow(AtomicFlow):
"""This flow is used to run the code passed from the caller.
*Input Interface*:
- `code`
- `language`
*Output Interface*:
- `interpreter_output`: output of the code interpreter
*Configuration Parameters*:
- max_output: maximum number of characters to display in the output
**Full credits to open-interpreter (https://github.com/KillianLucas/open-interpreter)
for the usage of code interpreters (package `code_interpreters`) and the function truncate_output()**
I'm extracting the code interpreter part from open-interpreter because the litellm version of open-interpreter
is not compatible with that of the current version of aiflows(v.0.1.7).
"""
def __init__(self,
**kwargs):
super().__init__(**kwargs)
self.max_output = self.flow_config["max_output"]
self._code_interpreters = {}
def set_up_flow_state(self):
""" class-specific flow state: language and code, which describes the programming language and the code to run.
"""
super().set_up_flow_state()
self.flow_state["language"] = None
self.flow_state["code"] = ""
def _state_update_add_language_and_code(self,
language: str,
code: str) -> None:
"""
updates the language and code passed from _process_input_data
to the flow state
:param language: the programming language
:param code: the code to run
"""
self.flow_state["language"] = language
self.flow_state["code"] = code
def _check_input(self, input_data: Dict[str, Any]):
""" Sanity check of input data
:param input_data: input data
:type input_data: Dict[str, Any]
"""
# ~~~ Sanity check of input_data ~~~
assert "language" in input_data, "attribute 'language' not in input data."
assert "code" in input_data, "attribute 'code' not in input data."
def _process_input_data(self, input_data: Dict[str, Any]):
""" Allocate interpreter if any, pass input data into flow state
:param input_data: input data
:type input_data: Dict[str, Any]
"""
# code in Jupyter notebook that starts with '!' is actually shell command.
if input_data["language"] == "python" and input_data["code"].startswith("!"):
input_data["language"] = "shell"
input_data["code"] = input_data["code"][1:]
# ~~~ Allocate interpreter ~~~
# interpreter existence is checked in create_code_interpreter()
# TODO: consider: should we put language not supported error into output?
language = input_data["language"]
if language not in self._code_interpreters:
self._code_interpreters[language] = create_code_interpreter(language)
# ~~~ Pass input data to flow state ~~~
self._state_update_add_language_and_code(
language=language,
code=input_data["code"]
)
def _call(self):
""" This method runs the code interpreter and returns the output. (runs the code interpreter and returns the output.)
"""
output = ""
try:
code_interpreter = self._code_interpreters[self.flow_state["language"]]
code = self.flow_state["code"]
for line in code_interpreter.run(code):
if "output" in line:
output += "\n" + line["output"]
# Truncate output
output = truncate_output(output, self.max_output)
output = output.strip()
except:
output = traceback.format_exc()
output = output.strip()
return output
def run(
self,
input_message: FlowMessage):
""" Run the code interpreter and return the output.
:param input_message: The input message of the flow.
:type input_message: FlowMessage
"""
input_data = input_message.data
self._check_input(input_data)
self._process_input_data(input_data)
output = self._call()
response = {
"interpreter_output": output,
}
reply = self.package_output_message(
input_message=input_message,
response = response
)
self.send_message(reply)
|