Spaces:
Runtime error
Runtime error
from pydantic import BaseModel | |
from fastapi import FastAPI, UploadFile, HTTPException | |
from ffmpeg import input, output, run | |
import ffmpeg | |
import boto3 | |
import os | |
app = FastAPI() | |
# Define your AWS S3 credentials and bucket name | |
aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] | |
aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] | |
s3_bucket_name = os.environ['S3_BUCKET_NAME'] | |
aws_env = os.environ['AWS_ENV'] | |
aws_region = 'eu-central-1' | |
temp_dir = '/tmp/' | |
# Initialize an S3 client | |
s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=aws_region) | |
class CutRequest(BaseModel): | |
segments: list[tuple[float, float]] | |
def download_file(news_name: str,quote_filename: str): | |
# Define a function to download a single file | |
s3_directory = f'{aws_env}/{news_name}' | |
input_file = quote_filename | |
try: | |
s3_object_key = f'{s3_directory}/{input_file}' | |
local_file = temp_dir + input_file | |
s3_client.download_file(s3_bucket_name, s3_object_key, local_file) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Failed to download a file from S3: {e}") | |
# def download_files(output_key: str): | |
# # Temporary directory to store uploaded audio files | |
# s3_directory = output_key | |
# input_files = ['genBase.mp3', 'quote.mp3'] | |
# try: | |
# for input_file in input_files: | |
# s3_object_key = f'{s3_directory}/{input_file}' | |
# local_file = temp_dir + input_file | |
# s3_client.download_file(s3_bucket_name, s3_object_key, local_file) | |
# except Exception as e: | |
# raise HTTPException(status_code=500, detail=f"Failed to download files from S3: {e}") | |
def cut_audio(request: CutRequest, output_key: str): | |
download_files(output_key) | |
for i, (start, end) in enumerate(request.segments): | |
output_file = f"/tmp/cut_quote_{i}.mp3" | |
# output_file = f"/tmp/genBase_segment_{i}.mp3" | |
try: | |
( | |
ffmpeg | |
.input('/tmp/genBase.mp3', ss=start, to=end) | |
.output(output_file) | |
.run() | |
) | |
except ffmpeg.Error as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
try: | |
s3_output_key = f'{output_key}/genBase_segment_{i}.mp3' | |
s3_client.upload_file(output_file, s3_bucket_name, s3_output_key) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Failed to upload file to S3: {e}") | |
return {"message": "Audio cut successfully"} | |
async def merge_audio(output_key: str): | |
s3_directory = output_key | |
# Output file name | |
output_file = temp_dir + 'output.mp3' | |
# Use python-ffmpeg to merge audio clips | |
# input_file_paths = [temp_dir + input_file for input_file in input_files] | |
# input_args = [input(file_path) for file_path in input_file_paths] | |
input_file = 'list.txt' | |
try: | |
intermediate_output = temp_dir + 'intermediate_output.mp3' | |
ffmpeg.input(input_file, format='concat', safe=0).output(intermediate_output, c='copy').run() | |
ffmpeg.input(intermediate_output).audio.filter('loudnorm').output(output_file, audio_bitrate='256k', ar='44100').run() | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"FFmpeg Error: {e}") | |
# Upload the merged file back to S3 | |
s3_output_key = f'{s3_directory}/final.mp3' | |
try: | |
s3_client.upload_file(output_file, s3_bucket_name, s3_output_key) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Failed to upload file to S3: {e}") | |
return {"message": "Audio clips successfully merged and saved in S3"} | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=7680) | |