File size: 2,590 Bytes
b115d50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from abc import abstractmethod
from typing import List, Optional

from steamship import Block, File, SteamshipError, Tag, Task, TaskState
from steamship.invocable import InvocableResponse
from steamship.invocable.plugin_service import PluginRequest
from steamship.plugin.blockifier.blockifier import Blockifier
from steamship.plugin.inputs.raw_data_plugin_input import RawDataPluginInput
from steamship.plugin.outputs.block_and_tag_plugin_output import BlockAndTagPluginOutput

TRANSCRIPT_ID = "transcript_id"


class Transcriber(Blockifier):
    @abstractmethod
    def start_transcription(self, audio_file: PluginRequest[RawDataPluginInput]) -> str:
        """Start a transcription job and return an id to identify the transcription."""
        raise NotImplementedError()

    @abstractmethod
    def get_transcript(self, transcript_id: str) -> (Optional[str], Optional[List[Tag]]):
        """Method to retrieve the transcript and optional Tags. If the transcription is not ready, return None"""
        raise NotImplementedError()

    def _get_transcript(self, transcript_id: str) -> InvocableResponse:
        """Retrieve the transcript using the transcript_id."""
        transcript, tags = self.get_transcript(transcript_id)
        if transcript is None and tags is None:
            return InvocableResponse(
                status=Task(
                    state=TaskState.running,
                    remote_status_message="Transcription is ongoing.",
                    remote_status_input={"transcript_id": transcript_id},
                )
            )
        else:
            return InvocableResponse(
                data=BlockAndTagPluginOutput(
                    file=File(
                        blocks=[
                            Block(
                                text=transcript,
                                tags=tags,
                            )
                        ]
                    )
                )
            )

    def run(
        self, request: PluginRequest[RawDataPluginInput]
    ) -> InvocableResponse[BlockAndTagPluginOutput]:
        if request.is_status_check:
            if TRANSCRIPT_ID not in request.status.remote_status_input:
                raise SteamshipError(message="Status check requests need to provide a valid job id")
            transcript_id = request.status.remote_status_input[TRANSCRIPT_ID]
            return self._get_transcript(transcript_id)

        else:
            transcript_id = self.start_transcription(audio_file=request.data.data)
            return self._get_transcript(transcript_id)