Spaces:
Running
on
A10G
Running
on
A10G
import io | |
from flask import Flask, Response, send_from_directory, jsonify, request, abort | |
import os | |
from flask_cors import CORS | |
from multiprocessing import Queue | |
import base64 | |
from typing import Any, List, Dict, Tuple | |
from multiprocessing import Queue | |
import logging | |
import sys | |
from server.AudioTranscriber import AudioTranscriber | |
from server.ActionProcessor import ActionProcessor | |
from server.StandaloneApplication import StandaloneApplication | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
handlers=[logging.StreamHandler(sys.stdout)], | |
) | |
# Get a logger for your app | |
logger = logging.getLogger(__name__) | |
# Use a directory in the user's home folder for static files | |
STATIC_DIR = "/app/server/static" if os.getenv("DEBUG") != "true" else "./server" | |
audio_queue: "Queue[io.BytesIO]" = Queue() | |
text_queue: "Queue[str]" = Queue() | |
action_queue: "Queue[str]" = Queue() | |
app = Flask(__name__, static_folder=STATIC_DIR) | |
_ = CORS( | |
app, | |
origins=["*"], | |
methods=["GET", "POST", "OPTIONS"], | |
allow_headers=["Content-Type", "Authorization"], | |
) | |
def add_header(response: Response): | |
# Add permissive CORS headers | |
response.headers["Access-Control-Allow-Origin"] = "*" | |
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS" | |
response.headers["Access-Control-Allow-Headers"] = "*" # Allow all headers | |
# Cross-origin isolation headers | |
response.headers["Cross-Origin-Embedder-Policy"] = "require-corp" | |
response.headers["Cross-Origin-Opener-Policy"] = "same-origin" | |
response.headers["Cross-Origin-Resource-Policy"] = "cross-origin" | |
return response | |
def serve_index(): | |
# Handle logs=container query parameter | |
if request.args.get("logs") == "container": | |
files = ( | |
os.listdir(app.static_folder) if os.path.exists(app.static_folder) else [] | |
) | |
return jsonify( | |
{ | |
"static_folder": app.static_folder, | |
"exists": os.path.exists(app.static_folder), | |
"files": files, | |
"pwd": os.getcwd(), | |
"user": os.getenv("USER"), | |
} | |
) | |
try: | |
response = send_from_directory(app.static_folder, "index.html") | |
response.headers["Cross-Origin-Opener-Policy"] = "same-origin" | |
response.headers["Cross-Origin-Embedder-Policy"] = "require-corp" | |
return response | |
except FileNotFoundError: | |
abort( | |
404, | |
description=f"Static folder or index.html not found. Static folder: {app.static_folder}", | |
) | |
def get_data(): | |
return jsonify({"status": "success"}) | |
def process_data(): | |
try: | |
# Check content type | |
content_type = request.headers.get("Content-Type", "") | |
# Handle different content types | |
if "application/json" in content_type: | |
data = request.get_json() | |
audio_base64 = data.get("audio_chunk") | |
elif "multipart/form-data" in content_type: | |
audio_base64 = request.form.get("audio_chunk") | |
else: | |
# Try to get raw data | |
audio_base64 = request.get_data().decode("utf-8") | |
# Validate the incoming data | |
if not audio_base64: | |
return ( | |
jsonify({"error": "Missing audio_chunk in request", "status": "error"}), | |
400, | |
) | |
# Decode the base64 audio chunk | |
try: | |
audio_chunk = base64.b64decode(audio_base64) | |
except Exception as e: | |
return ( | |
jsonify( | |
{ | |
"error": f"Failed to decode audio chunk: {str(e)}", | |
"status": "error", | |
} | |
), | |
400, | |
) | |
# Put the audio chunk in the queue for processing | |
audio_queue.put(io.BytesIO(audio_chunk)) | |
return jsonify( | |
{ | |
"status": "success", | |
} | |
) | |
except Exception as e: | |
return ( | |
jsonify( | |
{"error": f"Failed to process request: {str(e)}", "status": "error"} | |
), | |
500, | |
) | |
def get_actions() -> Tuple[Response, int]: | |
"""Retrieve and clear all pending actions from the queue""" | |
actions: List[Dict[str, Any]] = [] | |
# Drain the queue into our actions list | |
while not action_queue.empty(): | |
try: | |
actions.append(action_queue.get_nowait()) | |
except Exception: | |
break | |
return jsonify({"actions": actions, "status": "success"}), 200 | |
def serve_static(path: str): | |
try: | |
return send_from_directory(app.static_folder, path) | |
except FileNotFoundError: | |
abort(404, description=f"File {path} not found in static folder") | |
if __name__ == "__main__": | |
if os.path.exists(app.static_folder): | |
logger.info(f"Static folder contents: {os.listdir(app.static_folder)}") | |
os.makedirs(app.static_folder, exist_ok=True) | |
# Start the audio transcriber thread | |
transcriber = AudioTranscriber(audio_queue, text_queue) | |
transcriber.start() | |
# Start the action processor thread | |
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY") | |
if not MISTRAL_API_KEY: | |
raise ValueError("MISTRAL_API_KEY is not set") | |
action_processor = ActionProcessor(text_queue, action_queue, MISTRAL_API_KEY) | |
action_processor.start() | |
options: Any = { | |
"bind": "0.0.0.0:7860", | |
"workers": 3, | |
"worker_class": "sync", | |
"timeout": 120, | |
"forwarded_allow_ips": "*", | |
"accesslog": None, # Disable access logging | |
"errorlog": "-", # Keep error logging to stderr | |
"capture_output": True, | |
"enable_stdio_inheritance": True, | |
} | |
StandaloneApplication(app, options).run() | |