FixedReplyFlowModule / fixed_reply.py
nbaldwin's picture
remove is_reply
7437003
from typing import Dict, Any
from aiflows.base_flows import AtomicFlow
from aiflows.messages import FlowMessage
class FixedReplyFlow(AtomicFlow):
    r"""Flow that answers every input message with a fixed, preconfigured reply.

    The docstring is a raw string because it contains the reST escape ``\*``;
    in a normal string literal that is an invalid escape sequence.

    *Configuration Parameters*:

    - `name` (str): The name of the flow.
    - `description` (str): A description of the flow. This description is used
      to generate the help message of the flow.
    - `fixed_reply` (str): The fixed reply to reply with.
    - The other configuration parameters are inherited from the default
      configuration of AtomicFlow (see AtomicFlow)

    *Input Interface*:

    - None

    *Output Interface*:

    - `fixed_reply` (str): The fixed reply.

    :param \**kwargs: The keyword arguments passed to the AtomicFlow constructor.
        Among these is the flow_config which should also contain the
        "fixed_reply" key.
    :type \**kwargs: Dict[str, Any]
    """

    # Config keys this flow declares as required; `run` reads "fixed_reply"
    # from `self.flow_config`.
    REQUIRED_KEYS_CONFIG = ["fixed_reply"]

    # NOTE(review): the leading double underscore triggers Python name
    # mangling, so this attribute is stored on the class as
    # `_FixedReplyFlow__default_flow_config`. How (or whether) the base class
    # consumes it is not visible from this file -- confirm against AtomicFlow.
    __default_flow_config = {
        "input_interface": [],
        "output_interface": ["fixed_reply"],
    }

    def __init__(self, **kwargs: Any) -> None:
        """Forward all keyword arguments unchanged to the parent constructor.

        :param \\**kwargs: Keyword arguments for the parent class constructor.
        :type \\**kwargs: Dict[str, Any]
        """
        super().__init__(**kwargs)

    def run(self, input_message: FlowMessage) -> None:
        """Run the FixedReplyFlow: reply to the input with the configured fixed reply.

        The content of ``input_message`` is not inspected here; it is only
        handed to ``package_output_message`` when the reply is built.

        :param input_message: The input message
        :type input_message: FlowMessage
        :raises KeyError: If ``self.flow_config`` has no "fixed_reply" key.
        """
        # The response payload has a single key, matching the declared
        # output interface.
        reply = self.package_output_message(
            input_message=input_message,
            response={"fixed_reply": self.flow_config["fixed_reply"]},
        )
        # The result is delivered through send_message rather than returned.
        self.send_message(reply)