Spaces:
Runtime error
Runtime error
File size: 3,901 Bytes
bea535a 1b6025d a7e2ea5 1b6025d 829b713 1b6025d 829b713 754a99d be918f3 bea535a 1b6025d be918f3 1b6025d bea535a 754a99d 1b6025d 754a99d 1b6025d 754a99d 1b6025d bea535a 754a99d bea535a 6bf70b2 bea535a 1b6025d a7e2ea5 1b6025d e6db5f3 1b6025d 2c12c7a 1b6025d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
from pydantic import BaseModel
from fastapi import FastAPI, UploadFile, HTTPException
from ffmpeg import input, output, run
import ffmpeg
import boto3
import os
# ASGI application object; the route decorators in this module register on it.
app = FastAPI()

# --- AWS / S3 settings -------------------------------------------------------
# All four environment variables are mandatory: indexing os.environ raises
# KeyError at import time if any of them is missing.
aws_access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]
s3_bucket_name = os.environ["S3_BUCKET_NAME"]
aws_env = os.environ["AWS_ENV"]
aws_region = "eu-central-1"

# Local scratch directory for downloaded and generated audio files.
temp_dir = "/tmp/"

# Shared S3 client used by every handler in this module.
s3_client = boto3.client(
    "s3",
    region_name=aws_region,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)
class CutRequest(BaseModel):
    """Request body for the cut endpoint: the time ranges to extract."""

    # Each entry is a (start, end) pair; cut_audio passes them to ffmpeg as
    # the `ss` and `to` input options.
    segments: list[tuple[float, float]]
def download_file(news_name: str, quote_filename: str):
    """Download one object from S3 into the local temp directory.

    The object is read from ``{aws_env}/{news_name}/{quote_filename}`` in the
    configured bucket and written to ``temp_dir + quote_filename``.

    Raises:
        HTTPException: status 500 if building the paths or the S3 download
            fails for any reason.
    """
    key_prefix = f'{aws_env}/{news_name}'
    try:
        object_key = f'{key_prefix}/{quote_filename}'
        destination = temp_dir + quote_filename
        s3_client.download_file(s3_bucket_name, object_key, destination)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to download a file from S3: {e}")
# def download_files(output_key: str):
# # Temporary directory to store uploaded audio files
# s3_directory = output_key
# input_files = ['genBase.mp3', 'quote.mp3']
# try:
# for input_file in input_files:
# s3_object_key = f'{s3_directory}/{input_file}'
# local_file = temp_dir + input_file
# s3_client.download_file(s3_bucket_name, s3_object_key, local_file)
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Failed to download files from S3: {e}")
@app.post("/cut-audio/{output_key}")
def cut_audio(request: CutRequest, output_key: str):
    """Cut ``genBase.mp3`` into the requested segments and upload each one.

    The source clip is downloaded from ``{output_key}/genBase.mp3`` in the
    configured bucket. For every ``(start, end)`` pair in
    ``request.segments`` ffmpeg writes ``/tmp/cut_quote_{i}.mp3``, which is
    then uploaded as ``{output_key}/genBase_segment_{i}.mp3``.

    Args:
        request: Body carrying the list of (start, end) ranges.
        output_key: S3 key prefix used for both the source and the results.

    Returns:
        A dict with a success message once every segment is uploaded.

    Raises:
        HTTPException: status 500 if the download, an ffmpeg run, or an
            upload fails.
    """
    source_name = 'genBase.mp3'
    source_path = temp_dir + source_name

    # Fetch the source clip here: no `download_files` helper is defined in
    # this module, so calling one would raise NameError on every request.
    # NOTE(review): the source is assumed to live under the same prefix the
    # segments are uploaded to (no aws_env component) -- confirm.
    try:
        s3_client.download_file(s3_bucket_name, f'{output_key}/{source_name}', source_path)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to download a file from S3: {e}") from e

    for i, (start, end) in enumerate(request.segments):
        output_file = f"/tmp/cut_quote_{i}.mp3"
        try:
            (
                ffmpeg
                .input(source_path, ss=start, to=end)
                .output(output_file)
                # Pass -y: the output path is reused across requests, and
                # without it ffmpeg refuses to replace an existing file.
                .run(overwrite_output=True)
            )
        except ffmpeg.Error as e:
            raise HTTPException(status_code=500, detail=str(e)) from e

        try:
            s3_output_key = f'{output_key}/genBase_segment_{i}.mp3'
            s3_client.upload_file(output_file, s3_bucket_name, s3_output_key)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to upload file to S3: {e}") from e

    return {"message": "Audio cut successfully"}
@app.post("/merge_audio/{output_key}")
async def merge_audio(output_key: str):
    """Concatenate the clips listed in ``list.txt``, normalise, and upload.

    Runs ffmpeg twice: first the concat demuxer with stream copy into an
    intermediate file, then a ``loudnorm`` pass re-encoding at 256k / 44100 Hz
    into ``output.mp3``. The result is uploaded as ``{output_key}/final.mp3``.

    Args:
        output_key: S3 key prefix under which ``final.mp3`` is stored.

    Returns:
        A dict with a success message.

    Raises:
        HTTPException: status 500 if either ffmpeg run or the upload fails.
    """
    # NOTE(review): every call below is blocking; inside an `async def`
    # handler it stalls the event loop for the duration -- consider a plain
    # `def` handler or a threadpool.
    output_file = temp_dir + 'output.mp3'
    intermediate_output = temp_dir + 'intermediate_output.mp3'

    # NOTE(review): relative path, resolved against the process working
    # directory; nothing in this module creates it -- confirm who does.
    input_file = 'list.txt'

    try:
        # overwrite_output=True passes -y. Both paths are fixed, so without
        # it every call after the first fails on the already-existing file.
        (
            ffmpeg
            .input(input_file, format='concat', safe=0)
            .output(intermediate_output, c='copy')
            .run(overwrite_output=True)
        )
        (
            ffmpeg
            .input(intermediate_output)
            .audio
            .filter('loudnorm')
            .output(output_file, audio_bitrate='256k', ar='44100')
            .run(overwrite_output=True)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"FFmpeg Error: {e}") from e

    # Upload the merged file back to S3
    s3_output_key = f'{output_key}/final.mp3'
    try:
        s3_client.upload_file(output_file, s3_bucket_name, s3_output_key)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to upload file to S3: {e}") from e

    return {"message": "Audio clips successfully merged and saved in S3"}
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces.
    import uvicorn

    uvicorn.run(app, port=7680, host="0.0.0.0")
|