Spaces:
Sleeping
Sleeping
import numpy as np | |
import pandas as pd | |
from scipy.signal import resample | |
from sklearn.preprocessing import scale | |
import soundfile as sf | |
from gemini import query_gemini_rest | |
import librosa | |
import tempfile | |
EXPECTED_LEN = 256 | |
STEP = 128 | |
PCG_LABELS = [ | |
"Normal", | |
"Aortic Stenosis", | |
"Mitral Stenosis", | |
"Mitral Valve Prolapse", | |
"Pericardial Murmurs" | |
] | |
LABELS_EMG = ["healthy", "myopathy", "neuropathy"] | |
def load_uploaded_file(file, signal_type="ECG") -> np.ndarray: | |
name = file.name.lower() | |
if signal_type in ("ECG", "EMG"): | |
text = file.read().decode("utf-8").strip() | |
if "," in text: | |
vals = [float(x) for x in text.split(",") if x.strip()] | |
else: | |
vals = [float(x) for x in text.splitlines() if x.strip()] | |
return np.array(vals, dtype=np.float32) | |
if signal_type == "VAG": | |
if name.endswith(".csv"): | |
df = pd.read_csv(file) | |
features = [ | |
"rms_amplitude", | |
"peak_frequency", | |
"spectral_entropy", | |
"zero_crossing_rate", | |
"mean_frequency", | |
] | |
return df[features].iloc[0].values.astype(np.float32) | |
elif name.endswith(".npy"): | |
return np.load(file) | |
elif name.endswith(".wav"): | |
data, _ = sf.read(file) | |
return data.astype(np.float32) | |
raise ValueError("Unsupported VAG file format.") | |
if signal_type == "PCG" and name.endswith((".wav", ".flac", ".mp3")): | |
data, _ = sf.read(file) | |
if data.ndim > 1: | |
data = data[:, 0] | |
return data.astype(np.float32) | |
raise ValueError("Unsupported file format.") | |
def preprocess_signal(x: np.ndarray) -> np.ndarray: | |
if x.size != EXPECTED_LEN: | |
x = resample(x, EXPECTED_LEN) | |
return scale(x).astype(np.float32) | |
def segment_signal(raw: np.ndarray) -> np.ndarray: | |
raw = preprocess_signal(raw) | |
seg = raw.reshape(EXPECTED_LEN, 1) | |
return seg[np.newaxis, ...] | |
PCG_INPUT_LEN = 995 | |
def preprocess_pcg_waveform(wave: np.ndarray) -> np.ndarray: | |
if wave.ndim > 1: | |
wave = wave.mean(axis=1) | |
if len(wave) < PCG_INPUT_LEN: | |
wave = np.pad(wave, (0, PCG_INPUT_LEN - len(wave))) | |
else: | |
wave = wave[:PCG_INPUT_LEN] | |
wave = (wave - np.mean(wave)) / (np.std(wave) + 1e-8) | |
return wave.astype(np.float32) | |
def analyze_pcg_signal(file, model, gemini_key=None): | |
signal, _ = sf.read(file) | |
signal = preprocess_pcg_waveform(signal) | |
input_data = signal.reshape(1, PCG_INPUT_LEN, 1) | |
preds = model.predict(input_data, verbose=0)[0] | |
labels = [ | |
"Normal", | |
"Aortic Stenosis", | |
"Mitral Stenosis", | |
"Mitral Valve Prolapse", | |
"Pericardial Murmurs", | |
] | |
idx = int(np.argmax(preds)) | |
confidence = float(preds[idx]) | |
label = labels[idx] | |
gem_txt = None | |
if gemini_key: | |
gem_txt = query_gemini_rest("PCG", label, confidence, gemini_key) | |
return label, label, confidence, gem_txt | |
def pcg_to_features(file_obj, target_sr=16000, n_mels=128, n_frames=112): | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp: | |
tmp.write(file_obj.read()) | |
tmp_path = tmp.name | |
y, sr = librosa.load(tmp_path, sr=target_sr, mono=True) | |
mel = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=512, hop_length=256, n_mels=n_mels) | |
logmel = librosa.power_to_db(mel, ref=np.max) | |
if logmel.shape[1] < n_frames: | |
pad_width = n_frames - logmel.shape[1] | |
pad = np.zeros((n_mels, pad_width)) | |
logmel = np.hstack((logmel, pad)) | |
else: | |
logmel = logmel[:, :n_frames] | |
feat = logmel.flatten().astype(np.float32) | |
return feat[np.newaxis, ...] | |
def analyze_emg_signal(file, model, gemini_key=""): | |
raw = load_uploaded_file(file, signal_type="EMG") | |
WINDOW = 1000 | |
wins = [] | |
if len(raw) < WINDOW: | |
pad = np.pad(raw, (0, WINDOW - len(raw))) | |
wins.append(((pad - pad.mean()) / (pad.std()+1e-6)).reshape(WINDOW, 1)) | |
else: | |
for i in range(0, len(raw) - WINDOW + 1, WINDOW): | |
win = raw[i:i+WINDOW] | |
win = (win - win.mean()) / (win.std() + 1e-6) | |
wins.append(win.reshape(WINDOW, 1)) | |
X = np.array(wins, dtype=np.float32) | |
preds = model.predict(X, verbose=0) | |
classes = np.argmax(preds, axis=1) | |
final = int(np.bincount(classes).argmax()) | |
conf = float(preds[:, final].mean()) | |
human = LABELS_EMG[final] | |
gemini_txt = None | |
if gemini_key: | |
gemini_txt = query_gemini_rest("EMG", human, conf, gemini_key) | |
return human, conf, gemini_txt | |
FEATURE_COLS = [ | |
"rms_amplitude", | |
"peak_frequency", | |
"spectral_entropy", | |
"zero_crossing_rate", | |
"mean_frequency", | |
] | |
def vag_to_features(file_obj) -> np.ndarray: | |
df = pd.read_csv(file_obj) | |
x = df[FEATURE_COLS].iloc[0].values.astype(np.float32) | |
return x.reshape(1, -1) | |
def predict_vag_from_features(file_obj, model_bundle, gemini_key=""): | |
model = model_bundle["model"] | |
scaler = model_bundle["scaler"] | |
encoder = model_bundle["encoder"] | |
x = vag_to_features(file_obj) | |
x_s = scaler.transform(x) | |
prob = model.predict_proba(x_s)[0] | |
idx = int(np.argmax(prob)) | |
conf = float(prob[idx]) | |
label = encoder.inverse_transform([idx])[0].title() | |
gem_note = ( | |
query_gemini_rest("VAG", label, conf, gemini_key) | |
if gemini_key else None | |
) | |
return label, label, conf, gem_note |