Spaces:
Runtime error
Runtime error
from pydantic import BaseModel | |
from fastapi import FastAPI, UploadFile, HTTPException | |
from ffmpeg import input, output, run | |
from botocore.exceptions import ClientError | |
import ffmpeg | |
import boto3 | |
import os | |
app = FastAPI() | |
# Define your AWS S3 credentials and bucket name | |
aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] | |
aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] | |
s3_bucket_name = os.environ['S3_BUCKET_NAME'] | |
aws_env = os.environ['AWS_ENV'] | |
aws_region = 'eu-central-1' | |
temp_dir = '/tmp/' | |
# Initialize an S3 client | |
s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=aws_region) | |
class CutRequest(BaseModel): | |
quote_filename: str | |
segments: list[tuple[float, float]] | |
news_name: str | |
news_id: int | |
async def json_stripper(request: CutRequest): | |
raw_body = await request.body() | |
try: | |
# Decode the raw body | |
body_str = raw_body.decode('utf-8') | |
# Remove any surrounding quotes if present | |
body_str = body_str.strip('"') | |
# Unescape the JSON string | |
body_str = body_str.replace('\\"', '"') | |
# Parse the JSON | |
body = json.loads(body_str) | |
return body | |
except json.JSONDecodeError as e: | |
print(f"JSON Decode Error: {e}") | |
raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}") | |
except Exception as e: | |
print(f"Error: {e}") | |
raise HTTPException(status_code=400, detail=str(e)) | |
def download_file(news_name: str, quote_filename: str, new_filename: str = "source.mp3"): | |
s3_directory = f'{aws_env}/{news_name}' | |
s3_object_key = f'{s3_directory}/{quote_filename}' | |
local_file = os.path.join(temp_dir, new_filename) | |
try: | |
# Ensure the download directory exists | |
os.makedirs(temp_dir, exist_ok=True) | |
s3_client.download_file(s3_bucket_name, s3_object_key, local_file) | |
print(f"File downloaded and saved as: {local_file}") | |
# Return the local_file path | |
return local_file | |
except ClientError as e: | |
if e.response['Error']['Code'] == "404": | |
raise HTTPException(status_code=404, detail=f"The file {quote_filename} does not exist in S3.") | |
else: | |
raise HTTPException(status_code=500, detail=f"Failed to download file from S3: {str(e)}") | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") | |
async def cut_audio(request: CutRequest): | |
# Use json_stripper to parse the request body | |
body = await json_stripper(request) | |
# Validate the body against CutRequest model | |
try: | |
cut_request = CutRequest(**body) | |
except ValueError as e: | |
raise HTTPException(status_code=400, detail=str(e)) | |
# Now you can use cut_request in your function | |
download_file(cut_request.news_name, cut_request.quote_filename) | |
for i, (start, end) in enumerate(cut_request.segments): | |
output_file = f"/tmp/cut_quote_{i}.mp3" | |
try: | |
( | |
ffmpeg | |
.input('/tmp/source.mp3', ss=start, to=end) | |
.output(output_file) | |
.run() | |
) | |
except ffmpeg.Error as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
try: | |
s3_output_key = f'{aws_env}/{cut_request.news_id}/genBase_segment_{i}.mp3' | |
s3_client.upload_file(output_file, s3_bucket_name, s3_output_key) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Failed to upload file to S3: {e}") | |
return {"message": "Audio cut successfully"} | |
async def merge_audio(output_key: str): | |
s3_directory = output_key | |
# Output file name | |
output_file = temp_dir + 'output.mp3' | |
# Use python-ffmpeg to merge audio clips | |
# input_file_paths = [temp_dir + input_file for input_file in input_files] | |
# input_args = [input(file_path) for file_path in input_file_paths] | |
input_file = 'list.txt' | |
try: | |
intermediate_output = temp_dir + 'intermediate_output.mp3' | |
ffmpeg.input(input_file, format='concat', safe=0).output(intermediate_output, c='copy').run() | |
ffmpeg.input(intermediate_output).audio.filter('loudnorm').output(output_file, audio_bitrate='256k', ar='44100').run() | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"FFmpeg Error: {e}") | |
# Upload the merged file back to S3 | |
s3_output_key = f'{s3_directory}/final.mp3' | |
try: | |
s3_client.upload_file(output_file, s3_bucket_name, s3_output_key) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Failed to upload file to S3: {e}") | |
return {"message": "Audio clips successfully merged and saved in S3"} | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=7680) | |