File size: 5,277 Bytes
3f6b5c2
 
 
 
 
91ab55c
3f6b5c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f5ba9f9
3f6b5c2
a39f32d
3f6b5c2
f5ba9f9
3f6b5c2
 
f5ba9f9
9a9e104
f5ba9f9
3f6b5c2
 
 
 
 
 
 
 
 
43e0ed4
3f6b5c2
 
 
43e0ed4
3f6b5c2
 
 
 
 
 
 
 
 
 
 
43e0ed4
 
3f6b5c2
 
 
 
 
43e0ed4
 
 
3f6b5c2
 
 
 
 
 
 
43e0ed4
 
 
3f6b5c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43e0ed4
 
9a9e104
3f6b5c2
 
 
 
 
 
 
 
 
 
 
 
 
9a9e104
 
3f6b5c2
 
 
91ab55c
43e0ed4
 
 
 
91ab55c
3f6b5c2
 
91ab55c
ddf8801
 
 
9a9e104
ddf8801
 
91ab55c
 
ddf8801
91ab55c
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import traceback
from copy import deepcopy
from typing import Dict, Any

from .code_interpreters.create_code_interpreter import create_code_interpreter
from aiflows.messages import FlowMessage
from aiflows.base_flows import AtomicFlow

def truncate_output(data, max_output_chars=2000):
    needs_truncation = False

    message = f'Output truncated. Showing the last {max_output_chars} characters.\n\n'

    # Remove previous truncation message if it exists
    if data.startswith(message):
        data = data[len(message):]
        needs_truncation = True

    # If data exceeds max length, truncate it and add message
    if len(data) > max_output_chars or needs_truncation:
        data = message + data[-max_output_chars:]

    return data


class InterpreterAtomicFlow(AtomicFlow):
    """This flow is used to run the code passed from the caller.
    *Input Interface*:
    - `code`
    - `language`
    
    *Output Interface*:
    - `interpreter_output`: output of the code interpreter

    *Configuration Parameters*:
    - max_output: maximum number of characters to display in the output

    **Full credits to open-interpreter (https://github.com/KillianLucas/open-interpreter)
    for the usage of code interpreters (package `code_interpreters`) and the function truncate_output()**

    I'm extracting the code interpreter part from open-interpreter because the litellm version of open-interpreter
    is not compatible with that of the current version of aiflows(v.0.1.7).
    """
    def __init__(self,
                 **kwargs):
        super().__init__(**kwargs)
        self.max_output = self.flow_config["max_output"]
        self._code_interpreters = {}

    def set_up_flow_state(self):
        """ class-specific flow state: language and code, which describes the programming language and the code to run.
        """
        super().set_up_flow_state()
        self.flow_state["language"] = None
        self.flow_state["code"] = ""

    def _state_update_add_language_and_code(self,
                                            language: str,
                                            code: str) -> None:
        """
        updates the language and code passed from _process_input_data
        to the flow state
        :param language: the programming language
        :param code: the code to run
        """
        self.flow_state["language"] = language
        self.flow_state["code"] = code

    def _check_input(self, input_data: Dict[str, Any]):
        """ Sanity check of input data
        :param input_data: input data
        :type input_data: Dict[str, Any]
        """
         # ~~~ Sanity check of input_data ~~~
        assert "language" in input_data, "attribute 'language' not in input data."
        assert "code" in input_data, "attribute 'code' not in input data."


    def _process_input_data(self, input_data: Dict[str, Any]):
        """ Allocate interpreter if any, pass input data into flow state
        :param input_data: input data
        :type input_data: Dict[str, Any]
        """
        # code in Jupyter notebook that starts with '!' is actually shell command.
        if input_data["language"] == "python" and input_data["code"].startswith("!"):
            input_data["language"] = "shell"
            input_data["code"] = input_data["code"][1:]

        # ~~~ Allocate interpreter ~~~
        # interpreter existence is checked in create_code_interpreter()
        # TODO: consider: should we put language not supported error into output?
        language = input_data["language"]
        if language not in self._code_interpreters:
            self._code_interpreters[language] = create_code_interpreter(language)

        # ~~~ Pass input data to flow state ~~~
        self._state_update_add_language_and_code(
            language=language,
            code=input_data["code"]
        )

    def _call(self):
        """ This method runs the code interpreter and returns the output. (runs the code interpreter and returns the output.)
        """

        output = ""
        try:
            code_interpreter = self._code_interpreters[self.flow_state["language"]]
            code = self.flow_state["code"]
            for line in code_interpreter.run(code):
                if "output" in line:
                    output += "\n" + line["output"]
                    # Truncate output
                    output = truncate_output(output, self.max_output)
                    output = output.strip()
        except:
            output = traceback.format_exc()
            output = output.strip()

        return output

    def run(
            self,
            input_message: FlowMessage):
        """ Run the code interpreter and return the output.
        :param input_message: The input message of the flow.
        :type input_message: FlowMessage
        """
        input_data = input_message.data
        self._check_input(input_data)
        self._process_input_data(input_data)
        
        output = self._call()
        
        response = {
            "interpreter_output": output,
        }
                
        reply = self.package_output_message(
            input_message=input_message,
            response = response
        )
    
        self.send_message(reply)