"""FastAPI service that cuts and merges audio files stored in S3 using ffmpeg."""

import json
import logging
import os
import tempfile
from typing import Optional

import boto3
import ffmpeg
from botocore.exceptions import ClientError
from fastapi import FastAPI, HTTPException, Request, UploadFile
from ffmpeg import input, output, run  # noqa: A004,F401 - kept for backward compatibility
from pydantic import BaseModel

logger = logging.getLogger(__name__)

app = FastAPI()

# AWS S3 credentials and bucket name are read from the environment; a missing
# variable raises KeyError at import time.
aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID']
aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY']
s3_bucket_name = os.environ['S3_BUCKET_NAME']
aws_env = os.environ['AWS_ENV']
aws_region = 'eu-central-1'
temp_dir = '/tmp/'

# Initialize an S3 client
s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=aws_region,
)


class CutRequest(BaseModel):
    """Payload of the ``/cut-audio`` endpoint."""

    # File name of the source audio, appended to ``{aws_env}/{news_name}/``.
    quote_filename: str
    # ``(start, end)`` pairs passed to ffmpeg as ``ss`` / ``to``.
    segments: list[tuple[float, float]]
    # Used to build the S3 key the source file is downloaded from.
    news_name: str
    # Used to build the S3 key the cut segments are uploaded to.
    news_id: int


def _decode_json_payload(raw_body: bytes):
    """Parse a request body that may be plain or string-wrapped JSON.

    The body is first parsed as regular JSON. If that yields a string (the
    payload was JSON-encoded twice), the string is parsed again. Only when the
    body is not valid JSON at all is the legacy heuristic applied: strip
    surrounding double quotes and replace ``\\"`` with ``"``.

    Raises:
        UnicodeDecodeError: if the body is not valid UTF-8.
        json.JSONDecodeError: if no strategy produces valid JSON.
    """
    text = raw_body.decode('utf-8')
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Legacy fallback for bodies wrapped in quotes without proper escaping.
        legacy_text = text.strip('"').replace('\\"', '"')
        return json.loads(legacy_text)
    if isinstance(parsed, str):
        # Double-encoded payload: the outer JSON value is itself a JSON document.
        parsed = json.loads(parsed)
    return parsed


async def json_stripper(request: Request):
    """Read the raw request body and return the decoded JSON value.

    Args:
        request: The incoming Starlette/FastAPI request.

    Returns:
        The parsed JSON value (not validated against any model).

    Raises:
        HTTPException: 400 if the body cannot be decoded or parsed.
    """
    raw_body = await request.body()
    try:
        return _decode_json_payload(raw_body)
    except json.JSONDecodeError as e:
        logger.warning("JSON Decode Error: %s", e)
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}") from e
    except Exception as e:
        logger.warning("Error: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e


def download_file(
    news_name: str,
    quote_filename: str,
    new_filename: str = "source.mp3",
    dest_dir: Optional[str] = None,
) -> str:
    """Download ``{aws_env}/{news_name}/{quote_filename}`` from S3.

    Args:
        news_name: Directory component of the S3 key.
        quote_filename: File component of the S3 key.
        new_filename: Name of the local copy.
        dest_dir: Directory for the local copy; defaults to ``temp_dir``.

    Returns:
        Path of the downloaded local file.

    Raises:
        HTTPException: 404 if S3 reports the object as missing, 500 otherwise.
    """
    target_dir = temp_dir if dest_dir is None else dest_dir
    s3_object_key = f'{aws_env}/{news_name}/{quote_filename}'
    local_file = os.path.join(target_dir, new_filename)

    try:
        # Ensure the download directory exists
        os.makedirs(target_dir, exist_ok=True)
        s3_client.download_file(s3_bucket_name, s3_object_key, local_file)
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code in ("404", "NoSuchKey"):
            raise HTTPException(
                status_code=404,
                detail=f"The file {quote_filename} does not exist in S3.",
            ) from e
        raise HTTPException(
            status_code=500,
            detail=f"Failed to download file from S3: {str(e)}",
        ) from e
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"An unexpected error occurred: {str(e)}",
        ) from e

    logger.info("File downloaded and saved as: %s", local_file)
    return local_file


def _log_ffmpeg_failure(error: Exception) -> None:
    """Log the captured ffmpeg stderr, if any, so failures can be diagnosed."""
    stderr = getattr(error, 'stderr', None)
    if stderr:
        logger.error("ffmpeg stderr:\n%s", stderr.decode('utf-8', errors='replace'))


@app.post("/cut-audio")
async def cut_audio(request: Request):
    """Cut segments out of an S3-hosted audio file and upload each segment.

    The request body is parsed by ``json_stripper`` and validated against
    ``CutRequest``. Segment ``i`` is uploaded to
    ``{aws_env}/{news_id}/genBase_segment_{i}.mp3``.

    Raises:
        HTTPException: 400 for an invalid body or segment bounds, 404 if the
            source file is missing, 500 on ffmpeg or S3 failures.
    """
    # NOTE(review): the ffmpeg and boto3 calls below are synchronous and run
    # inside this coroutine; consider offloading them if throughput matters.
    body = await json_stripper(request)
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")

    # Validate the body against CutRequest model
    try:
        cut_request = CutRequest(**body)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Reject bad bounds before doing any S3 or ffmpeg work. The chained
    # comparison also rejects NaN values.
    for i, (start, end) in enumerate(cut_request.segments):
        if not (0 <= start < end):
            raise HTTPException(
                status_code=400,
                detail=f"Invalid segment {i}: start={start}, end={end} (expected 0 <= start < end).",
            )

    os.makedirs(temp_dir, exist_ok=True)
    # A per-request working directory keeps concurrent requests from
    # overwriting each other's files and is removed on exit.
    with tempfile.TemporaryDirectory(dir=temp_dir) as work_dir:
        source_path = download_file(
            cut_request.news_name,
            cut_request.quote_filename,
            dest_dir=work_dir,
        )

        for i, (start, end) in enumerate(cut_request.segments):
            output_file = os.path.join(work_dir, f"cut_quote_{i}.mp3")
            try:
                (
                    ffmpeg
                    .input(source_path, ss=start, to=end)
                    .output(output_file)
                    .run(overwrite_output=True, capture_stderr=True)
                )
            except ffmpeg.Error as e:
                _log_ffmpeg_failure(e)
                raise HTTPException(status_code=500, detail=str(e)) from e

            try:
                s3_output_key = f'{aws_env}/{cut_request.news_id}/genBase_segment_{i}.mp3'
                s3_client.upload_file(output_file, s3_bucket_name, s3_output_key)
            except Exception as e:
                raise HTTPException(
                    status_code=500,
                    detail=f"Failed to upload file to S3: {e}",
                ) from e

    return {"message": "Audio cut successfully"}


@app.post("/merge_audio/{output_key}")
async def merge_audio(output_key: str):
    """Concatenate the clips listed in ``list.txt``, normalize, and upload.

    The clips are concatenated with ffmpeg's concat demuxer, passed through
    the ``loudnorm`` filter, and the result is uploaded to
    ``{output_key}/final.mp3``.

    Raises:
        HTTPException: 500 if the list file is missing, ffmpeg fails, or the
            upload fails.
    """
    # NOTE(review): 'list.txt' is resolved against the process working
    # directory and nothing in this file creates it - confirm who provides it.
    input_file = 'list.txt'
    if not os.path.isfile(input_file):
        raise HTTPException(
            status_code=500,
            detail=f"FFmpeg Error: concat list file '{input_file}' not found",
        )

    os.makedirs(temp_dir, exist_ok=True)
    # Per-request working directory: avoids collisions between concurrent
    # merges and removes intermediate files afterwards.
    with tempfile.TemporaryDirectory(dir=temp_dir) as work_dir:
        intermediate_output = os.path.join(work_dir, 'intermediate_output.mp3')
        output_file = os.path.join(work_dir, 'output.mp3')

        try:
            (
                ffmpeg
                .input(input_file, format='concat', safe=0)
                .output(intermediate_output, c='copy')
                .run(overwrite_output=True, capture_stderr=True)
            )
            (
                ffmpeg
                .input(intermediate_output)
                .audio
                .filter('loudnorm')
                .output(output_file, audio_bitrate='256k', ar='44100')
                .run(overwrite_output=True, capture_stderr=True)
            )
        except Exception as e:
            _log_ffmpeg_failure(e)
            raise HTTPException(status_code=500, detail=f"FFmpeg Error: {e}") from e

        # Upload the merged file back to S3
        s3_output_key = f'{output_key}/final.mp3'
        try:
            s3_client.upload_file(output_file, s3_bucket_name, s3_output_key)
        except Exception as e:
            raise HTTPException(
                status_code=500,
                detail=f"Failed to upload file to S3: {e}",
            ) from e

    return {"message": "Audio clips successfully merged and saved in S3"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7680)