|
from typing import Dict, Any |
|
|
|
from aiflows.base_flows import AtomicFlow |
|
from aiflows.messages import FlowMessage |
|
|
|
class FixedReplyFlow(AtomicFlow): |
|
""" This class implements a FixedReplyFlow. It's used to reply with a fixed reply. |
|
|
|
*Configuration Parameters*: |
|
|
|
- `name` (str): The name of the flow. |
|
|
|
- `description` (str): A description of the flow. This description is used to generate the help message of the flow. |
|
|
|
- `fixed_reply` (str): The fixed reply to reply with. |
|
|
|
- The other configuration parameters are inherited from the default configuration of AtomicFlow (see AtomicFlow) |
|
|
|
*Input Interface*: |
|
|
|
- None |
|
|
|
Output Interface: |
|
|
|
- `fixed_reply` (str): The fixed reply. |
|
|
|
:param \**kwargs: The keyword arguments passed to the AtomicFlow constructor. Among these is the flow_config which should also contain the "fixed_reply" key. |
|
:type \**kwargs: Dict[str, Any] |
|
""" |
|
REQUIRED_KEYS_CONFIG = ["fixed_reply"] |
|
|
|
__default_flow_config = { |
|
"input_interface": [], |
|
"output_interface": ["fixed_reply"], |
|
} |
|
|
|
def __init__(self, **kwargs): |
|
super().__init__(**kwargs) |
|
|
|
def run(self, |
|
input_message: FlowMessage): |
|
""" Runs the FixedReplyFlow. It's used to reply with a fixed reply. |
|
|
|
:param input_message: The input message |
|
:type input_message: FlowMessage |
|
""" |
|
reply = self.package_output_message( |
|
input_message=input_message, |
|
response={"fixed_reply": self.flow_config["fixed_reply"]} |
|
) |
|
self.send_message(reply) |
|
|
|
|