from pydantic import BaseModel from fastapi import FastAPI, UploadFile, HTTPException from ffmpeg import input, output, run import ffmpeg import boto3 import os app = FastAPI() # Define your AWS S3 credentials and bucket name aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] s3_bucket_name = os.environ['S3_BUCKET_NAME'] aws_env = os.environ['AWS_ENV'] aws_region = 'eu-central-1' temp_dir = '/tmp/' # Initialize an S3 client s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=aws_region) class CutRequest(BaseModel): segments: list[tuple[float, float]] def download_file(news_name: str,quote_filename: str): # Define a function to download a single file s3_directory = f'{aws_env}/{news_name}' input_file = quote_filename try: s3_object_key = f'{s3_directory}/{input_file}' local_file = temp_dir + input_file s3_client.download_file(s3_bucket_name, s3_object_key, local_file) except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to download a file from S3: {e}") # def download_files(output_key: str): # # Temporary directory to store uploaded audio files # s3_directory = output_key # input_files = ['genBase.mp3', 'quote.mp3'] # try: # for input_file in input_files: # s3_object_key = f'{s3_directory}/{input_file}' # local_file = temp_dir + input_file # s3_client.download_file(s3_bucket_name, s3_object_key, local_file) # except Exception as e: # raise HTTPException(status_code=500, detail=f"Failed to download files from S3: {e}") @app.post("/cut-audio/{output_key}") def cut_audio(request: CutRequest, output_key: str): download_files(output_key) for i, (start, end) in enumerate(request.segments): output_file = f"/tmp/cut_quote_{i}.mp3" # output_file = f"/tmp/genBase_segment_{i}.mp3" try: ( ffmpeg .input('/tmp/genBase.mp3', ss=start, to=end) .output(output_file) .run() ) except ffmpeg.Error as e: raise HTTPException(status_code=500, detail=str(e)) try: s3_output_key = f'{output_key}/genBase_segment_{i}.mp3' s3_client.upload_file(output_file, s3_bucket_name, s3_output_key) except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to upload file to S3: {e}") return {"message": "Audio cut successfully"} @app.post("/merge_audio/{output_key}") async def merge_audio(output_key: str): s3_directory = output_key # Output file name output_file = temp_dir + 'output.mp3' # Use python-ffmpeg to merge audio clips # input_file_paths = [temp_dir + input_file for input_file in input_files] # input_args = [input(file_path) for file_path in input_file_paths] input_file = 'list.txt' try: intermediate_output = temp_dir + 'intermediate_output.mp3' ffmpeg.input(input_file, format='concat', safe=0).output(intermediate_output, c='copy').run() ffmpeg.input(intermediate_output).audio.filter('loudnorm').output(output_file, audio_bitrate='256k', ar='44100').run() except Exception as e: raise HTTPException(status_code=500, detail=f"FFmpeg Error: {e}") # Upload the merged file back to S3 s3_output_key = f'{s3_directory}/final.mp3' try: s3_client.upload_file(output_file, s3_bucket_name, s3_output_key) except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to upload file to S3: {e}") return {"message": "Audio clips successfully merged and saved in S3"} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7680)