File size: 5,679 Bytes
af325d0
b6093b0
3fc2a44
b6093b0
d0c0836
b6093b0
2b0ce36
3fc2a44
23e5406
d0c0836
9876d02
3fc2a44
2b0ce36
78c135b
3fc2a44
9876d02
3fc2a44
af325d0
3fc2a44
 
 
 
 
 
 
 
 
3b25c9f
ab25f71
 
2a6a104
 
 
 
 
 
 
ab25f71
d0c0836
ab25f71
 
 
 
 
 
 
2a6a104
 
 
 
 
 
 
 
 
 
ab25f71
d0c0836
ab25f71
 
 
 
 
 
b6093b0
 
1aceee0
 
 
 
 
44d365d
b6093b0
 
23e5406
 
b6093b0
 
 
 
ab25f71
 
d63a8c1
 
 
8cdcb92
b6093b0
3fc2a44
d0c0836
3fc2a44
8cdcb92
b6093b0
2b0ce36
b6093b0
 
2b0ce36
 
 
 
b6093b0
 
8cdcb92
b6093b0
8cdcb92
2b0ce36
8cdcb92
 
 
 
 
 
 
2b0ce36
d0c0836
c5829e8
78c135b
6c68a13
2b0ce36
9876d02
c5829e8
6c68a13
 
8cdcb92
b6093b0
 
3fc2a44
 
d0c0836
 
 
 
9876d02
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from fastapi import FastAPI, UploadFile, HTTPException, Form, Depends
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from typing import Optional
from pydantic import BaseModel, field_validator
from utils.process_video import process_video
from utils.zip_response import zip_response
from utils.api_configs import api_configs
from utils.read_html import read_html
from utils.archiver import archiver
from utils.logger import setup_logger
import shutil, os, logging, uvicorn, secrets

app = FastAPI()
security = HTTPBasic()
api_configs_file = os.path.abspath("api_config.yml")

async def get_current_user(credentials: HTTPBasicCredentials = Depends(security)):
    correct_username = secrets.compare_digest(credentials.username, api_configs(api_configs_file)["secrets"]["username"])
    correct_password = secrets.compare_digest(credentials.password, api_configs(api_configs_file)["secrets"]["password"])
    if not (correct_username and correct_password):
        raise HTTPException(
            status_code=401,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username

class MP4Video(BaseModel):
    video_file: UploadFile
    
    @property
    def filename(self):
        return self.video_file.filename
    @property
    def file(self):
        return self.video_file.file

    @field_validator('video_file')
    def validate_video_file(cls, v):
        if not v.filename.endswith('.mp4'):
            raise HTTPException(status_code=500, detail='Invalid video file type. Please upload an MP4 file.')
        return v

class SRTFile(BaseModel):
    srt_file: Optional[UploadFile] = None
    
    @property
    def filename(self):
        return self.srt_file.filename
    @property
    def file(self):
        return self.srt_file.file
    @property
    def size(self):
        return self.srt_file.size

    @field_validator('srt_file')
    def validate_srt_file(cls, v):
        if v.size > 0 and not v.filename.endswith('.srt'):
            raise HTTPException(status_code=422, detail='Invalid subtitle file type. Please upload an SRT file.')
        return v


@app.get("/")
async def root():
    html_content = f"""
    {read_html(os.path.join(os.getcwd(),"static/landing_page.html"))}
    """
    return HTMLResponse(content=html_content)


@app.get("/submit_video/")
async def get_form():
    html_content = f"""
    {read_html(os.path.join(os.getcwd(),"static/submit_video.html"))}
    """
    return HTMLResponse(content=html_content)

@app.post("/process_video/")
async def process_video_api(video_file: MP4Video = Depends(),
                            srt_file: SRTFile = Depends(),
                            task: Optional[str] = Form("transcribe"),
                            max_words_per_line: Optional[int] = Form(6),
                            fontsize: Optional[int] = Form(42),
                            font: Optional[str] = Form("FuturaPTHeavy"),
                            bg_color: Optional[str] = Form("#070a13b3"),
                            text_color: Optional[str] = Form("white"),
                            caption_mode: Optional[str] = Form("desktop"),
                            username: str = Depends(get_current_user)
                            ):
    try:
        logging.info("Creating temporary directories")
        temp_dir = os.path.join(os.getcwd(),"temp")
        os.makedirs(temp_dir, exist_ok=True)
        temp_vid_dir = os.path.join(temp_dir,video_file.filename.split('.')[0])
        os.makedirs(temp_vid_dir, exist_ok=True)
        temp_input_path = os.path.join(temp_vid_dir, video_file.filename)
        logging.info("Copying video UploadFile to the temp_input_path")
        with open(temp_input_path, 'wb') as buffer:
            try:
                shutil.copyfileobj(video_file.file, buffer)
            finally:
                video_file.file.close()
        logging.info("Copying SRT UploadFile to the temp_input_path")
        if srt_file.size > 0:
            SRT_PATH = os.path.abspath(f"{temp_input_path.split('.')[0]}.srt")
            with open(SRT_PATH, 'wb') as buffer:
                try:
                    shutil.copyfileobj(srt_file.file, buffer)
                finally:
                    srt_file.file.close()
            logging.info("Processing the video...")
            output_path, _ = process_video(temp_input_path, SRT_PATH, task, max_words_per_line, fontsize, font, bg_color, text_color, caption_mode)
            logging.info("Zipping response...")
            zip_path = zip_response(os.path.join(temp_vid_dir,"archive.zip"), [output_path, SRT_PATH])
            return FileResponse(zip_path, media_type='application/zip', filename=f"result_{video_file.filename.split('.')[0]}.zip")
        logging.info("Processing the video...")
        output_path, srt_path = process_video(temp_input_path, None, task, max_words_per_line, fontsize, font, bg_color, text_color, caption_mode, api_configs_file)
        logging.info("Zipping response...")
        zip_path = zip_response(os.path.join(temp_vid_dir,"archive.zip"), [output_path, srt_path])
        return  FileResponse(zip_path, media_type='application/zip', filename=f"result_{video_file.filename.split('.')[0]}.zip")
                
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    
if __name__ == "__main__":
    try:
        archiver()
    except FileNotFoundError:
        pass
    app_logger = setup_logger('dbLogger', 'db.log', level=logging.DEBUG)
    uvicorn.run(app, host="0.0.0.0", port=8000, log_config=app_logger)