File size: 9,892 Bytes
b115d50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
from __future__ import annotations

import time
from typing import Any, Callable, Dict, Generic, List, Optional, Set, Type, TypeVar

from pydantic import BaseModel, Field

from steamship.base.error import SteamshipError
from steamship.base.model import CamelModel, GenericCamelModel
from steamship.base.request import DeleteRequest, IdentifierRequest, Request
from steamship.utils.metadata import metadata_to_str, str_to_metadata

T = TypeVar("T")


class CreateTaskCommentRequest(Request):
    task_id: str
    external_id: str = None
    external_type: str = None
    external_group: str = None
    metadata: str = None


class ListTaskCommentRequest(Request):
    task_id: str = None
    external_id: str = None
    external_type: str = None
    external_group: str = None


class TaskComment(CamelModel):
    client: Client = Field(None, exclude=True)
    id: str = None
    user_id: str = None
    task_id: str = None
    external_id: str = None
    external_type: str = None
    external_group: str = None
    metadata: Any = None
    created_at: str = None

    def __init__(self, **kwargs):
        kwargs["metadata"] = str_to_metadata(kwargs.get("metadata"))
        super().__init__(**kwargs)

    @classmethod
    def parse_obj(cls: Type[BaseModel], obj: Any) -> BaseModel:
        # TODO (enias): This needs to be solved at the engine side
        obj = obj["taskComment"] if "taskComment" in obj else obj
        return super().parse_obj(obj)

    @staticmethod
    def create(
        client: Client,
        task_id: str = None,
        external_id: str = None,
        external_type: str = None,
        external_group: str = None,
        metadata: Any = None,
    ) -> TaskComment:
        req = CreateTaskCommentRequest(
            taskId=task_id,
            external_id=external_id,
            external_type=external_type,
            externalGroup=external_group,
            metadata=metadata_to_str(metadata),
        )
        return client.post(
            "task/comment/create",
            req,
            expect=TaskComment,
        )

    @staticmethod
    def list(
        client: Client,
        task_id: str = None,
        external_id: str = None,
        external_type: str = None,
        external_group: str = None,
    ) -> TaskCommentList:
        req = ListTaskCommentRequest(
            taskId=task_id,
            external_id=external_id,
            external_type=external_type,
            externalGroup=external_group,
        )
        return client.post(
            "task/comment/list",
            req,
            expect=TaskCommentList,
        )

    def delete(self) -> TaskComment:
        req = DeleteRequest(id=self.id)
        return self.client.post(
            "task/comment/delete",
            req,
            expect=TaskComment,
        )


class TaskCommentList(CamelModel):
    comments: List[TaskComment]


class TaskState:
    waiting = "waiting"
    running = "running"
    succeeded = "succeeded"
    failed = "failed"


class TaskType:
    internal_api = "internalApi"
    train = "train"
    infer = "infer"


class TaskRunRequest(Request):
    task_id: str


class TaskStatusRequest(Request):
    task_id: str


class Task(GenericCamelModel, Generic[T]):
    """Encapsulates a unit of asynchronously performed work."""

    # Note: The Field object prevents this from being serialized into JSON (and causing a crash)
    client: Client = Field(None, exclude=True)  # Steamship client

    task_id: str = None  # The id of this task
    user_id: str = None  # The user who requested this task
    workspace_id: str = None  # The workspace in which this task is executing

    # Note: The Field object prevents this from being serialized into JSON (and causing a crash)
    expect: Type = Field(
        None, exclude=True
    )  # Type of the expected output once the output is complete.

    input: str = None  # The input provided to the task
    output: T = None  # The output of the task
    state: str = None  # A value in class TaskState

    status_message: str = None  # User-facing message concerning task status
    status_suggestion: str = None  # User-facing suggestion concerning error remediation
    status_code: str = None  # User-facing error code for support assistance
    status_created_on: str = None  # When the status fields were last set

    task_type: str = None  # A value in class TaskType; for internal routing
    task_executor: str = None  #
    task_created_on: str = None  # When the task object was created
    task_last_modified_on: str = None  # When the task object was last modified

    # Long Running Plugin Support
    # The `remote_status_*` fields govern how Steamship Plugins can communicate long-running work back to the engine.
    # If instead of sending data, the plugin sends a status with these fields set, the engine will begin polling for
    # updates, echoing the contents of these fields back to the plugin to communicate, e.g., the jobId of the work
    # being checked. When the work is complete, simply respond with the Response `data` field set as per usual.
    remote_status_input: Optional[
        Dict
    ] = None  # For re-hydrating state in order to check remote status.
    remote_status_output: Optional[
        Dict
    ] = None  # For reporting structured JSON state for error diagnostics.
    remote_status_message: str = None  # User facing message string to report on remote status.

    assigned_worker: str = None  # The worker assigned to complete this task
    started_at: str = None  # When the work on this task began

    max_retries: int = None  # The maximum number of retries allowed for this task
    retries: int = None  # The number of retries already used.

    def as_error(self) -> SteamshipError:
        return SteamshipError(
            message=self.status_message, suggestion=self.status_suggestion, code=self.status_code
        )

    @classmethod
    def parse_obj(cls: Type[BaseModel], obj: Any) -> Task:
        obj = obj["task"] if "task" in obj else obj
        return super().parse_obj(obj)

    @staticmethod
    def get(
        client,
        _id: str = None,
        handle: str = None,
    ) -> Task:
        return client.post(
            "task/get",
            IdentifierRequest(id=_id, handle=handle),
            expect=Task,
        )

    def update(self, other: Optional[Task] = None):
        """Incorporates a `Task` into this object."""
        other = other or Task()
        for k, v in other.__dict__.items():
            self.__dict__[k] = v

    def add_comment(
        self,
        external_id: str = None,
        external_type: str = None,
        external_group: str = None,
        metadata: Any = None,
    ) -> TaskComment:
        return TaskComment.create(
            client=self.client,
            task_id=self.task_id,
            external_id=external_id,
            external_type=external_type,
            external_group=external_group,
            metadata=metadata,
        )

    def post_update(self, fields: Set[str] = None) -> Task:
        """Updates this task in the Steamship Engine."""
        if not isinstance(fields, set):
            raise RuntimeError(f'Unexpected type of "fields": {type(fields)}. Expected type set.')
        body = self.dict(by_alias=True, include={*fields, "task_id"})
        return self.client.post("task/update", body, expect=Task)

    def wait(
        self,
        max_timeout_s: float = 180,
        retry_delay_s: float = 1,
        on_each_refresh: "Optional[Callable[[int, float, Task], None]]" = None,
    ):
        """Polls and blocks until the task has succeeded or failed (or timeout reached).

        Parameters
        ----------
        max_timeout_s : int
            Max timeout in seconds. Default: 180s. After this timeout, an exception will be thrown.
        retry_delay_s : float
            Delay between status checks. Default: 1s.
        on_each_refresh : Optional[Callable[[int, float, Task], None]]
            Optional call back you can get after each refresh is made, including success state refreshes.
            The signature represents: (refresh #, total elapsed time, task)

            WARNING: Do not pass a long-running function to this variable. It will block the update polling.
        """
        t0 = time.perf_counter()
        refresh_count = 0
        while time.perf_counter() - t0 < max_timeout_s and self.state not in (
            TaskState.succeeded,
            TaskState.failed,
        ):
            time.sleep(retry_delay_s)
            self.refresh()
            refresh_count += 1

            # Possibly make a callback so the caller knows we've tried again
            if on_each_refresh:
                on_each_refresh(refresh_count, time.perf_counter() - t0, self)

        # If the task did not complete within the timeout, throw an error
        if self.state not in (TaskState.succeeded, TaskState.failed):
            raise SteamshipError(
                message=f"Task {self.task_id} did not complete within requested timeout of {max_timeout_s}s. The task is still running on the server. You can retrieve its status via Task.get() or try waiting again with wait()."
            )

    def refresh(self):
        if self.task_id is None:
            raise SteamshipError(message="Unable to refresh task because `task_id` is None")

        req = TaskStatusRequest(taskId=self.task_id)
        # TODO (enias): A status call can return both data and task
        # In this case both task and data will include the output (one is string serialized, the other is parsed)
        # Ideally task status only returns the status, not the full output object
        resp = self.client.post("task/status", payload=req, expect=self.expect)
        self.update(resp)


from .client import Client  # noqa: E402

Task.update_forward_refs()
TaskComment.update_forward_refs()