Spaces:
Sleeping
Sleeping
Commit
·
2f6587c
1
Parent(s):
4143e48
app.py
CHANGED
|
@@ -7,7 +7,6 @@ import tempfile
|
|
| 7 |
import os
|
| 8 |
import warnings
|
| 9 |
from pydub import AudioSegment
|
| 10 |
-
from datetime import datetime
|
| 11 |
import time
|
| 12 |
|
| 13 |
warnings.filterwarnings("ignore")
|
|
@@ -15,28 +14,19 @@ warnings.filterwarnings("ignore")
|
|
| 15 |
app = FastAPI()
|
| 16 |
|
| 17 |
def convert_mp3_to_wav(mp3_path):
|
| 18 |
-
# Convert MP3 to WAV
|
| 19 |
sound = AudioSegment.from_mp3(mp3_path)
|
| 20 |
wav_path = mp3_path.replace(".mp3", ".wav")
|
| 21 |
sound.export(wav_path, format="wav")
|
| 22 |
return wav_path
|
| 23 |
|
| 24 |
def extract_audio_features(audio_file_path):
|
| 25 |
-
# Load the audio file using soundfile
|
| 26 |
waveform, sample_rate = sf.read(audio_file_path)
|
| 27 |
-
|
| 28 |
-
# Ensure waveform is a 1D array (mono audio)
|
| 29 |
if waveform.ndim > 1:
|
| 30 |
waveform = waveform.mean(axis=1)
|
| 31 |
-
|
| 32 |
-
# Calculate basic features (pitch estimation requires a more complex algorithm, but we'll simplify)
|
| 33 |
energy = np.mean(waveform ** 2)
|
| 34 |
-
mfccs = np.mean(np.abs(np.fft.fft(waveform)[:13]), axis=0)
|
| 35 |
-
|
| 36 |
-
|
| 37 |
-
speech_rate = 4.0 # Arbitrary placeholder value for speech rate
|
| 38 |
-
f0 = np.mean(np.abs(np.diff(waveform))) * sample_rate / (2 * np.pi) # Rough pitch estimate
|
| 39 |
-
|
| 40 |
return f0, energy, speech_rate, mfccs, waveform, sample_rate
|
| 41 |
|
| 42 |
def analyze_voice_stress(audio_file_path):
|
|
@@ -54,7 +44,7 @@ def analyze_voice_stress(audio_file_path):
|
|
| 54 |
z_energy = (mean_energy - norm_mean_energy) / norm_std_energy
|
| 55 |
z_speech_rate = (speech_rate - norm_speech_rate) / norm_std_speech_rate
|
| 56 |
stress_score = (0.4 * z_f0) + (0.4 * z_speech_rate) + (0.2 * z_energy)
|
| 57 |
-
stress_level = round(float(1 / (1 + np.exp(-stress_score)) * 100), 2)
|
| 58 |
categories = ["Very Low Stress", "Low Stress", "Moderate Stress", "High Stress", "Very High Stress"]
|
| 59 |
category_idx = min(int(stress_level / 20), 4)
|
| 60 |
stress_category = categories[category_idx]
|
|
@@ -72,9 +62,9 @@ def analyze_text_stress(text: str):
|
|
| 72 |
class StressResponse(BaseModel):
|
| 73 |
stress_level: float
|
| 74 |
category: str
|
| 75 |
-
gender: str = None
|
| 76 |
-
|
| 77 |
-
|
| 78 |
size: str
|
| 79 |
|
| 80 |
@app.post("/analyze-stress/", response_model=StressResponse)
|
|
@@ -88,7 +78,6 @@ async def analyze_stress(
|
|
| 88 |
|
| 89 |
start_time = time.time()
|
| 90 |
|
| 91 |
-
# Handle audio file analysis
|
| 92 |
if file or file_path:
|
| 93 |
if file:
|
| 94 |
if not (file.filename.endswith(".wav") or file.filename.endswith(".mp3")):
|
|
@@ -105,38 +94,35 @@ async def analyze_stress(
|
|
| 105 |
temp_audio_path = file_path
|
| 106 |
file_size = os.path.getsize(file_path)
|
| 107 |
|
| 108 |
-
# Convert MP3 to WAV if needed
|
| 109 |
if temp_audio_path.endswith(".mp3"):
|
| 110 |
temp_audio_path = convert_mp3_to_wav(temp_audio_path)
|
| 111 |
|
| 112 |
try:
|
| 113 |
result = analyze_voice_stress(temp_audio_path)
|
| 114 |
-
processing_time_ms = int((time.time() - start_time) * 1000)
|
| 115 |
result.update({
|
| 116 |
-
"
|
| 117 |
-
"
|
| 118 |
-
"size": f"{round(file_size / 1024, 2)} KB"
|
| 119 |
})
|
| 120 |
return JSONResponse(content=result, status_code=200)
|
| 121 |
except Exception as e:
|
| 122 |
raise HTTPException(status_code=500, detail=str(e))
|
| 123 |
finally:
|
| 124 |
-
# Clean up temporary files
|
| 125 |
if file:
|
| 126 |
os.remove(temp_audio_path)
|
| 127 |
|
| 128 |
-
# Handle text analysis
|
| 129 |
elif text:
|
| 130 |
result = analyze_text_stress(text)
|
| 131 |
-
processing_time_ms = int((time.time() - start_time) * 1000)
|
| 132 |
result.update({
|
| 133 |
-
"
|
| 134 |
-
"
|
| 135 |
-
"size": "N/A"
|
| 136 |
})
|
| 137 |
return JSONResponse(content=result, status_code=200)
|
| 138 |
|
| 139 |
if __name__ == "__main__":
|
| 140 |
import uvicorn
|
| 141 |
-
port = int(os.getenv("PORT", 7860))
|
| 142 |
uvicorn.run("app:app", host="0.0.0.0", port=port, reload=True)
|
|
|
|
| 7 |
import os
|
| 8 |
import warnings
|
| 9 |
from pydub import AudioSegment
|
|
|
|
| 10 |
import time
|
| 11 |
|
| 12 |
warnings.filterwarnings("ignore")
|
|
|
|
| 14 |
app = FastAPI()
|
| 15 |
|
| 16 |
def convert_mp3_to_wav(mp3_path):
|
|
|
|
| 17 |
sound = AudioSegment.from_mp3(mp3_path)
|
| 18 |
wav_path = mp3_path.replace(".mp3", ".wav")
|
| 19 |
sound.export(wav_path, format="wav")
|
| 20 |
return wav_path
|
| 21 |
|
| 22 |
def extract_audio_features(audio_file_path):
|
|
|
|
| 23 |
waveform, sample_rate = sf.read(audio_file_path)
|
|
|
|
|
|
|
| 24 |
if waveform.ndim > 1:
|
| 25 |
waveform = waveform.mean(axis=1)
|
|
|
|
|
|
|
| 26 |
energy = np.mean(waveform ** 2)
|
| 27 |
+
mfccs = np.mean(np.abs(np.fft.fft(waveform)[:13]), axis=0)
|
| 28 |
+
speech_rate = 4.0
|
| 29 |
+
f0 = np.mean(np.abs(np.diff(waveform))) * sample_rate / (2 * np.pi)
|
|
|
|
|
|
|
|
|
|
| 30 |
return f0, energy, speech_rate, mfccs, waveform, sample_rate
|
| 31 |
|
| 32 |
def analyze_voice_stress(audio_file_path):
|
|
|
|
| 44 |
z_energy = (mean_energy - norm_mean_energy) / norm_std_energy
|
| 45 |
z_speech_rate = (speech_rate - norm_speech_rate) / norm_std_speech_rate
|
| 46 |
stress_score = (0.4 * z_f0) + (0.4 * z_speech_rate) + (0.2 * z_energy)
|
| 47 |
+
stress_level = round(float(1 / (1 + np.exp(-stress_score)) * 100), 2)
|
| 48 |
categories = ["Very Low Stress", "Low Stress", "Moderate Stress", "High Stress", "Very High Stress"]
|
| 49 |
category_idx = min(int(stress_level / 20), 4)
|
| 50 |
stress_category = categories[category_idx]
|
|
|
|
| 62 |
class StressResponse(BaseModel):
|
| 63 |
stress_level: float
|
| 64 |
category: str
|
| 65 |
+
gender: str = None
|
| 66 |
+
status: str
|
| 67 |
+
time: str
|
| 68 |
size: str
|
| 69 |
|
| 70 |
@app.post("/analyze-stress/", response_model=StressResponse)
|
|
|
|
| 78 |
|
| 79 |
start_time = time.time()
|
| 80 |
|
|
|
|
| 81 |
if file or file_path:
|
| 82 |
if file:
|
| 83 |
if not (file.filename.endswith(".wav") or file.filename.endswith(".mp3")):
|
|
|
|
| 94 |
temp_audio_path = file_path
|
| 95 |
file_size = os.path.getsize(file_path)
|
| 96 |
|
|
|
|
| 97 |
if temp_audio_path.endswith(".mp3"):
|
| 98 |
temp_audio_path = convert_mp3_to_wav(temp_audio_path)
|
| 99 |
|
| 100 |
try:
|
| 101 |
result = analyze_voice_stress(temp_audio_path)
|
| 102 |
+
processing_time_ms = int((time.time() - start_time) * 1000)
|
| 103 |
result.update({
|
| 104 |
+
"status": "200 (OK)",
|
| 105 |
+
"time": f"{processing_time_ms} ms",
|
| 106 |
+
"size": f"{round(file_size / 1024, 2)} KB"
|
| 107 |
})
|
| 108 |
return JSONResponse(content=result, status_code=200)
|
| 109 |
except Exception as e:
|
| 110 |
raise HTTPException(status_code=500, detail=str(e))
|
| 111 |
finally:
|
|
|
|
| 112 |
if file:
|
| 113 |
os.remove(temp_audio_path)
|
| 114 |
|
|
|
|
| 115 |
elif text:
|
| 116 |
result = analyze_text_stress(text)
|
| 117 |
+
processing_time_ms = int((time.time() - start_time) * 1000)
|
| 118 |
result.update({
|
| 119 |
+
"status": "200 (OK)",
|
| 120 |
+
"time": f"{processing_time_ms} ms",
|
| 121 |
+
"size": "N/A"
|
| 122 |
})
|
| 123 |
return JSONResponse(content=result, status_code=200)
|
| 124 |
|
| 125 |
if __name__ == "__main__":
|
| 126 |
import uvicorn
|
| 127 |
+
port = int(os.getenv("PORT", 7860))
|
| 128 |
uvicorn.run("app:app", host="0.0.0.0", port=port, reload=True)
|