import numpy as np import pandas as pd from scipy.signal import resample from sklearn.preprocessing import scale import soundfile as sf from gemini import query_gemini_rest import librosa import tempfile EXPECTED_LEN = 256 STEP = 128 PCG_LABELS = [ "Normal", "Aortic Stenosis", "Mitral Stenosis", "Mitral Valve Prolapse", "Pericardial Murmurs" ] LABELS_EMG = ["healthy", "myopathy", "neuropathy"] def load_uploaded_file(file, signal_type="ECG") -> np.ndarray: name = file.name.lower() if signal_type in ("ECG", "EMG"): text = file.read().decode("utf-8").strip() if "," in text: vals = [float(x) for x in text.split(",") if x.strip()] else: vals = [float(x) for x in text.splitlines() if x.strip()] return np.array(vals, dtype=np.float32) if signal_type == "VAG": if name.endswith(".csv"): df = pd.read_csv(file) features = [ "rms_amplitude", "peak_frequency", "spectral_entropy", "zero_crossing_rate", "mean_frequency", ] return df[features].iloc[0].values.astype(np.float32) elif name.endswith(".npy"): return np.load(file) elif name.endswith(".wav"): data, _ = sf.read(file) return data.astype(np.float32) raise ValueError("Unsupported VAG file format.") if signal_type == "PCG" and name.endswith((".wav", ".flac", ".mp3")): data, _ = sf.read(file) if data.ndim > 1: data = data[:, 0] return data.astype(np.float32) raise ValueError("Unsupported file format.") def preprocess_signal(x: np.ndarray) -> np.ndarray: if x.size != EXPECTED_LEN: x = resample(x, EXPECTED_LEN) return scale(x).astype(np.float32) def segment_signal(raw: np.ndarray) -> np.ndarray: raw = preprocess_signal(raw) seg = raw.reshape(EXPECTED_LEN, 1) return seg[np.newaxis, ...] PCG_INPUT_LEN = 995 def preprocess_pcg_waveform(wave: np.ndarray) -> np.ndarray: if wave.ndim > 1: wave = wave.mean(axis=1) if len(wave) < PCG_INPUT_LEN: wave = np.pad(wave, (0, PCG_INPUT_LEN - len(wave))) else: wave = wave[:PCG_INPUT_LEN] wave = (wave - np.mean(wave)) / (np.std(wave) + 1e-8) return wave.astype(np.float32) def analyze_pcg_signal(file, model, gemini_key=None): signal, _ = sf.read(file) signal = preprocess_pcg_waveform(signal) input_data = signal.reshape(1, PCG_INPUT_LEN, 1) preds = model.predict(input_data, verbose=0)[0] labels = [ "Normal", "Aortic Stenosis", "Mitral Stenosis", "Mitral Valve Prolapse", "Pericardial Murmurs", ] idx = int(np.argmax(preds)) confidence = float(preds[idx]) label = labels[idx] gem_txt = None if gemini_key: gem_txt = query_gemini_rest("PCG", label, confidence, gemini_key) return label, label, confidence, gem_txt def pcg_to_features(file_obj, target_sr=16000, n_mels=128, n_frames=112): with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp: tmp.write(file_obj.read()) tmp_path = tmp.name y, sr = librosa.load(tmp_path, sr=target_sr, mono=True) mel = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=512, hop_length=256, n_mels=n_mels) logmel = librosa.power_to_db(mel, ref=np.max) if logmel.shape[1] < n_frames: pad_width = n_frames - logmel.shape[1] pad = np.zeros((n_mels, pad_width)) logmel = np.hstack((logmel, pad)) else: logmel = logmel[:, :n_frames] feat = logmel.flatten().astype(np.float32) return feat[np.newaxis, ...] def analyze_emg_signal(file, model, gemini_key=""): raw = load_uploaded_file(file, signal_type="EMG") WINDOW = 1000 wins = [] if len(raw) < WINDOW: pad = np.pad(raw, (0, WINDOW - len(raw))) wins.append(((pad - pad.mean()) / (pad.std()+1e-6)).reshape(WINDOW, 1)) else: for i in range(0, len(raw) - WINDOW + 1, WINDOW): win = raw[i:i+WINDOW] win = (win - win.mean()) / (win.std() + 1e-6) wins.append(win.reshape(WINDOW, 1)) X = np.array(wins, dtype=np.float32) preds = model.predict(X, verbose=0) classes = np.argmax(preds, axis=1) final = int(np.bincount(classes).argmax()) conf = float(preds[:, final].mean()) human = LABELS_EMG[final] gemini_txt = None if gemini_key: gemini_txt = query_gemini_rest("EMG", human, conf, gemini_key) return human, conf, gemini_txt FEATURE_COLS = [ "rms_amplitude", "peak_frequency", "spectral_entropy", "zero_crossing_rate", "mean_frequency", ] def vag_to_features(file_obj) -> np.ndarray: df = pd.read_csv(file_obj) x = df[FEATURE_COLS].iloc[0].values.astype(np.float32) return x.reshape(1, -1) def predict_vag_from_features(file_obj, model_bundle, gemini_key=""): model = model_bundle["model"] scaler = model_bundle["scaler"] encoder = model_bundle["encoder"] x = vag_to_features(file_obj) x_s = scaler.transform(x) prob = model.predict_proba(x_s)[0] idx = int(np.argmax(prob)) conf = float(prob[idx]) label = encoder.inverse_transform([idx])[0].title() gem_note = ( query_gemini_rest("VAG", label, conf, gemini_key) if gemini_key else None ) return label, label, conf, gem_note