Spaces:
Running
Running
| import numpy as np | |
| from scipy import interpolate | |
| def Singleton(cls): | |
| _instance = {} | |
| def _singleton(*args, **kargs): | |
| if cls not in _instance: | |
| _instance[cls] = cls(*args, **kargs) | |
| return _instance[cls] | |
| return _singleton | |
| class RealtimeAudioDistribution(): | |
| def __init__(self) -> None: | |
| self.data = {} | |
| self.max_len = 1024*1024 | |
| self.rate = 48000 # 只读,每秒采样数量 | |
| def clean_up(self): | |
| self.data = {} | |
| def feed(self, uuid, audio): | |
| self.rate, audio_ = audio | |
| # print('feed', len(audio_), audio_[-25:]) | |
| if uuid not in self.data: | |
| self.data[uuid] = audio_ | |
| else: | |
| new_arr = np.concatenate((self.data[uuid], audio_)) | |
| if len(new_arr) > self.max_len: new_arr = new_arr[-self.max_len:] | |
| self.data[uuid] = new_arr | |
| def read(self, uuid): | |
| if uuid in self.data: | |
| res = self.data.pop(uuid) | |
| # print('\r read-', len(res), '-', max(res), end='', flush=True) | |
| else: | |
| res = None | |
| return res | |
| def change_sample_rate(audio, old_sr, new_sr): | |
| duration = audio.shape[0] / old_sr | |
| time_old = np.linspace(0, duration, audio.shape[0]) | |
| time_new = np.linspace(0, duration, int(audio.shape[0] * new_sr / old_sr)) | |
| interpolator = interpolate.interp1d(time_old, audio.T) | |
| new_audio = interpolator(time_new).T | |
| return new_audio.astype(np.int16) |