# # Copyright 2024 The InfiniFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import re from agent.component.base import ComponentBase, ComponentParamBase class TemplateParam(ComponentParamBase): """ Define the Generate component parameters. """ def __init__(self): super().__init__() self.content = "" self.parameters = [] def check(self): self.check_empty(self.content, "[Template] Content") return True class Template(ComponentBase): component_name = "Template" def get_dependent_components(self): cpnts = set([para["component_id"].split("@")[0] for para in self._param.parameters \ if para.get("component_id") \ and para["component_id"].lower().find("answer") < 0 \ and para["component_id"].lower().find("begin") < 0]) return list(cpnts) def _run(self, history, **kwargs): content = self._param.content self._param.inputs = [] for para in self._param.parameters: if not para.get("component_id"): continue component_id = para["component_id"].split("@")[0] if para["component_id"].lower().find("@") >= 0: cpn_id, key = para["component_id"].split("@") for p in self._canvas.get_component(cpn_id)["obj"]._param.query: if p["key"] == key: kwargs[para["key"]] = p.get("value", "") self._param.inputs.append( {"component_id": para["component_id"], "content": kwargs[para["key"]]}) break else: assert False, f"Can't find parameter '{key}' for {cpn_id}" continue cpn = self._canvas.get_component(component_id)["obj"] if cpn.component_name.lower() == "answer": hist = self._canvas.get_history(1) if hist: hist = hist[0]["content"] else: hist = "" kwargs[para["key"]] = hist continue _, out = cpn.output(allow_partial=False) if "content" not in out.columns: kwargs[para["key"]] = "" else: kwargs[para["key"]] = " - "+"\n - ".join([o if isinstance(o, str) else str(o) for o in out["content"]]) self._param.inputs.append({"component_id": para["component_id"], "content": kwargs[para["key"]]}) for n, v in kwargs.items(): content = re.sub(r"\{%s\}" % re.escape(n), str(v).replace("\\", " "), content) return Template.be_output(content)