韩宇
init
1b7e88c
from abc import ABC, abstractmethod
from typing import List, Optional
from omagent_core.engine.http.models import (
ScrollableSearchResultWorkflowSummary, SkipTaskRequest, WorkflowRun,
WorkflowStatus)
from omagent_core.engine.http.models.correlation_ids_search_request import \
CorrelationIdsSearchRequest
from omagent_core.engine.http.models.rerun_workflow_request import \
RerunWorkflowRequest
from omagent_core.engine.http.models.start_workflow_request import \
StartWorkflowRequest
from omagent_core.engine.http.models.workflow import Workflow
from omagent_core.engine.http.models.workflow_state_update import \
WorkflowStateUpdate
from omagent_core.engine.http.models.workflow_test_request import \
WorkflowTestRequest
class WorkflowClient(ABC):
@abstractmethod
def start_workflow(self, start_workflow_request: StartWorkflowRequest) -> str:
pass
@abstractmethod
def get_workflow(
self, workflow_id: str, include_tasks: Optional[bool] = True
) -> Workflow:
pass
@abstractmethod
def get_workflow_status(
self,
workflow_id: str,
include_output: bool = None,
include_variables: bool = None,
) -> WorkflowStatus:
pass
@abstractmethod
def delete_workflow(
self, workflow_id: str, archive_workflow: Optional[bool] = True
):
pass
@abstractmethod
def terminate_workflow(
self,
workflow_id: str,
reason: Optional[str] = None,
trigger_failure_workflow: bool = False,
):
pass
@abstractmethod
def execute_workflow(
self,
start_workflow_request: StartWorkflowRequest,
request_id: str = None,
wait_until_task_ref: Optional[str] = None,
wait_for_seconds: int = 30,
) -> WorkflowRun:
pass
@abstractmethod
def pause_workflow(self, workflow_id: str):
pass
@abstractmethod
def resume_workflow(self, workflow_id: str):
pass
@abstractmethod
def restart_workflow(
self, workflow_id: str, use_latest_def: Optional[bool] = False
):
pass
@abstractmethod
def retry_workflow(
self, workflow_id: str, resume_subworkflow_tasks: Optional[bool] = False
):
pass
@abstractmethod
def rerun_workflow(
self, workflow_id: str, rerun_workflow_request: RerunWorkflowRequest
):
pass
@abstractmethod
def skip_task_from_workflow(
self, workflow_id: str, task_reference_name: str, request: SkipTaskRequest
):
pass
@abstractmethod
def test_workflow(self, test_request: WorkflowTestRequest) -> Workflow:
pass
@abstractmethod
def search(
self, start: int = 0, size: int = 100, free_text: str = "*", query: str = None
) -> ScrollableSearchResultWorkflowSummary:
pass
@abstractmethod
def get_by_correlation_ids_in_batch(
self,
batch_request: CorrelationIdsSearchRequest,
include_completed: bool = False,
include_tasks: bool = False,
) -> dict[str, List[Workflow]]:
pass
@abstractmethod
def get_by_correlation_ids(
self,
workflow_name: str,
correlation_ids: List[str],
include_completed: bool = False,
include_tasks: bool = False,
) -> dict[str, List[Workflow]]:
pass
@abstractmethod
def remove_workflow(self, workflow_id: str):
pass
@abstractmethod
def update_variables(
self, workflow_id: str, variables: dict[str, object] = {}
) -> None:
pass
@abstractmethod
def update_state(
self,
workflow_id: str,
update_requesst: WorkflowStateUpdate,
wait_until_task_ref_names: List[str] = None,
wait_for_seconds: int = None,
) -> WorkflowRun:
pass