File size: 5,224 Bytes
a3ebd45
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6054f54
a3ebd45
 
 
 
 
 
629a826
a3ebd45
 
 
 
629a826
 
 
 
a3ebd45
 
 
 
629a826
 
 
a3ebd45
 
255441a
 
0404a52
 
a3ebd45
 
 
 
 
d25ba26
 
 
 
0404a52
 
d25ba26
 
 
 
 
 
 
a3ebd45
 
a57190d
629a826
0404a52
 
d25ba26
 
 
 
 
 
 
 
 
 
 
 
a57190d
 
 
 
629a826
a3ebd45
629a826
a3ebd45
629a826
 
 
a3ebd45
629a826
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0404a52
629a826
 
 
 
0404a52
629a826
 
 
 
0404a52
629a826
 
 
 
0404a52
629a826
a3ebd45
a57190d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#
#  Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
from abc import ABC
from agent.component.base import ComponentBase, ComponentParamBase


class SwitchParam(ComponentParamBase):
    """
    Define the Switch component parameters.

    Attributes:
        conditions: list of condition dicts. Each condition is shaped like
            {
                "logical_operator": "and | or",
                "items": [
                    {"cpn_id": "categorize:0", "operator": "contains", "value": ""},
                    {"cpn_id": "categorize:0", "operator": "contains", "value": ""},
                    ...
                ],
                "to": ""
            }
            where "to" is the component id the Switch routes to when the
            condition matches.
        end_cpn_id: component id the Switch falls back to when no condition
            matches.
        operators: operator names that Switch.process_operator handles.
    """

    def __init__(self):
        super().__init__()
        self.conditions = []
        self.end_cpn_id = "answer:0"
        self.operators = ['contains', 'not contains', 'start with', 'end with', 'empty', 'not empty', '=', '≠', '>',
                          '<', '≥', '≤']

    def check(self):
        """
        Validate the parameters.

        Delegates the emptiness check of ``conditions`` to the base class's
        ``check_empty`` and then requires every condition to name a target.

        Raises:
            ValueError: if a condition has a missing or empty "to" entry.
        """
        self.check_empty(self.conditions, "[Switch] conditions")
        for cond in self.conditions:
            # .get() so that a condition without a "to" key is reported with
            # the same validation error as an empty one, not a bare KeyError.
            if not cond.get("to"):
                raise ValueError("[Switch] 'To' can not be empty!")


class Switch(ComponentBase, ABC):
    """
    Pick the next component by evaluating the configured conditions.

    Conditions are tried in order; the first one that matches decides the
    target ("to"). When none matches, the parameter ``end_cpn_id`` is used.
    """
    component_name = "Switch"

    def get_dependent_components(self):
        """
        Collect the ids of the components referenced by the conditions.

        References are read from each item's "cpn_id". Empty references and
        references containing "begin" are skipped; for a reference of the
        form "<component id>@<key>" only the component id part is kept.

        Returns:
            list of unique component ids, in first-seen order.
        """
        component_ids = []
        for cond in self._param.conditions:
            for item in cond["items"]:
                ref = item["cpn_id"]
                if not ref or "begin" in ref:
                    continue
                component_ids.append(ref.partition("@")[0])

        # dict.fromkeys removes duplicates while keeping a deterministic order.
        return list(dict.fromkeys(component_ids))

    def _run(self, history, **kwargs):
        """
        Evaluate the conditions in order and return the routing output.

        For each item of a condition the left-hand operand is looked up from
        the referenced component:
          * "<component id>@<key>": the "value" of the entry in that
            component's ``_param.query`` whose "key" equals <key>; when no
            entry has that key the item contributes no result.
          * "<component id>": the component's output "content" column joined
            with spaces, or "" when there is no such column (the output looks
            like a pandas DataFrame -- TODO confirm).

        With any logical operator other than "and" the condition matches as
        soon as one item is true; otherwise it matches when all collected
        results are true.

        Returns:
            ``Switch.be_output(...)`` for the matching condition's "to", or
            for ``end_cpn_id`` when no condition matches.
        """
        for cond in self._param.conditions:
            res = []
            for item in cond["items"]:
                ref = item["cpn_id"]
                if not ref:
                    continue
                # partition() splits on the first "@" only, so a key that
                # itself contains "@" no longer breaks the lookup.
                cid, sep, key = ref.partition("@")
                if sep and cid:
                    for p in self._canvas.get_component(cid)["obj"]._param.query:
                        if p["key"] == key:
                            res.append(self.process_operator(p.get("value", ""), item["operator"], item.get("value", "")))
                            break
                else:
                    out = self._canvas.get_component(cid)["obj"].output()[1]
                    cpn_input = "" if "content" not in out.columns else " ".join([str(s) for s in out["content"]])
                    res.append(self.process_operator(cpn_input, item["operator"], item.get("value", "")))

                # "or" semantics: the first true item settles the condition.
                if cond["logical_operator"] != "and" and any(res):
                    return Switch.be_output(cond["to"])

            # NOTE(review): all([]) is True, so a condition whose items yield
            # no results (all skipped / key not found) matches -- confirm
            # that this is intended.
            if all(res):
                return Switch.be_output(cond["to"])

        return Switch.be_output(self._param.end_cpn_id)

    @staticmethod
    def _coerce_operands(left: str, right: str):
        """Return both operands as floats when both parse as numbers, else unchanged."""
        try:
            return float(left), float(right)
        except ValueError:
            # At least one side is not numeric: compare as strings instead.
            return left, right

    def process_operator(self, input: str, operator: str, value: str) -> bool:
        """
        Apply a single comparison operator.

        "contains", "not contains", "start with" and "end with" ignore case.
        "empty" / "not empty" look at ``input`` only. "=" and "≠" compare the
        strings exactly. ">", "<", "≥" and "≤" compare numerically when both
        operands parse as floats and fall back to string comparison otherwise.

        Args:
            input: left-hand operand.
            operator: operator name.
            value: right-hand operand.

        Returns:
            The boolean result of the comparison.

        Raises:
            ValueError: if ``input`` or ``value`` is not a string, or if the
                operator is not supported.
        """
        if not isinstance(input, str) or not isinstance(value, str):
            raise ValueError('Invalid input or value type: string')

        if operator == "contains":
            return value.lower() in input.lower()
        if operator == "not contains":
            return value.lower() not in input.lower()
        if operator == "start with":
            return input.lower().startswith(value.lower())
        if operator == "end with":
            return input.lower().endswith(value.lower())
        if operator == "empty":
            return not input
        if operator == "not empty":
            return bool(input)
        if operator == "=":
            return input == value
        if operator == "≠":
            return input != value

        if operator in (">", "<", "≥", "≤"):
            lhs, rhs = self._coerce_operands(input, value)
            if operator == ">":
                return lhs > rhs
            if operator == "<":
                return lhs < rhs
            if operator == "≥":
                return lhs >= rhs
            return lhs <= rhs

        raise ValueError('Not supported operator: ' + operator)