import time
import streamlit as st
# from transformers import pipeline
import os
import torch
import datetime
import numpy as np
import soundfile
import urllib.request
from wavmark.utils import file_reader
from audioseal import AudioSeal
import torchaudio
from pydub import AudioSegment
import io
# pipeline = pipeline(task="image-classification", model="julien-c/hotdog-not-hotdog")
# st.title("Hot Dog? Or Not?")
# file_name = st.file_uploader("Upload a hot dog candidate image")
# if file_name is not None:
#     col1, col2 = st.columns(2)
#     image = Image.open(file_name)
#     col1.image(image, use_column_width=True)
#     predictions = pipeline(image)
#     col2.header("Probabilities")
#     for p in predictions:
#         col2.subheader(f"{p['label']}: {round(p['score'] * 100, 1)}%")
# def read_as_single_channel_16k(audio_file, def_sr=16000, verbose=True, aim_second=None):
#     assert os.path.exists(audio_file)
#     st.markdown(os.path.exists(audio_file))
#     file_extension = os.path.splitext(audio_file)[1].lower()
#     st.markdown(file_extension)
#     if file_extension == ".mp3":
#         data, origin_sr = librosa.load(audio_file, sr=None)
#     elif file_extension in [".wav", ".flac"]:
#         data, origin_sr = soundfile.read(audio_file)
#     else:
#         raise Exception("unsupported file: " + file_extension)
#
#     # channel check
#     if len(data.shape) == 2:
#         left_channel = data[:, 0]
#         if verbose:
#             print("Warning! The input audio has multiple channels; this tool only uses the first channel!")
#         data = left_channel
#
#     # sample rate check
#     if origin_sr != def_sr:
#         data = resampy.resample(data, origin_sr, def_sr)
#         if verbose:
#             print("Warning! The original sample rate is not 16 kHz; the watermarked audio will be re-sampled to 16 kHz")
#
#     sr = def_sr
#     audio_length_second = 1.0 * len(data) / sr
#     # if verbose:
#     #     print("input length: %d second" % audio_length_second)
#
#     if aim_second is not None:
#         signal = data
#         assert len(signal) > 0
#         current_second = len(signal) / sr
#         if current_second < aim_second:
#             repeat_count = int(aim_second / current_second) + 1
#             signal = np.repeat(signal, repeat_count)
#             data = signal[0:sr * aim_second]
#     return data, sr, audio_length_second


# def my_read_file(audio_path, max_second):
#     signal, sr, audio_length_second = read_as_single_channel_16k(audio_path, default_sr)
#     if audio_length_second > max_second:
#         signal = signal[0:default_sr * max_second]
#         audio_length_second = max_second
#     return signal, sr, audio_length_second
def create_default_value():
    if "def_value" not in st.session_state:
        def_val_npy = np.random.choice([0, 1], size=32 - len_start_bit)
        def_val_str = "".join([str(i) for i in def_val_npy])
        st.session_state.def_value = def_val_str


def download_sample_audio():
    url = "https://keithito.com/LJ-Speech-Dataset/LJ037-0171.wav"
    with open("test.wav", "wb") as f:
        resp = urllib.request.urlopen(url)
        f.write(resp.read())
    wav, sample_rate = torchaudio.load("test.wav")
    return wav, sample_rate
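

# A small helper sketching the MP3-to-WAV conversion hinted at by the commented-out
# pydub code in main() below. This is an illustrative sketch, not part of the original
# flow: it assumes ffmpeg is available for pydub and writes the converted file next to
# the input so torchaudio.load can read it even without an MP3 backend.
def convert_mp3_to_wav(mp3_path):
    wav_path = os.path.splitext(mp3_path)[0] + ".wav"
    # Decode the MP3 with pydub (backed by ffmpeg) and re-export it as WAV
    AudioSegment.from_mp3(mp3_path).export(wav_path, format="wav")
    return wav_path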
# Main web app
def main():
    create_default_value()

    # st.title("MDS07")
    # st.write("https://github.com/wavmark/wavmark")
    markdown_text = """
# MDS07
[AudioSeal](https://github.com/jcha0155/AudioSealEnhanced) is the next-generation watermarking tool driven by AI.
You can upload an audio file and embed a custom 16-bit watermark, or decode the watermark from a watermarked file.
This page is for demonstration use and only processes **the first minute** of the audio.
If you have longer files to process, we recommend using [our Python toolkit](https://github.com/jcha0155/AudioSealEnhanced).
"""
    # Render the Markdown text with st.markdown
    st.markdown(markdown_text)

    audio_file = st.file_uploader("Upload Audio", type=["wav", "mp3"], accept_multiple_files=False)
    if audio_file:
        # Save the uploaded file locally (earlier approach, kept for reference):
        # tmp_input_audio_file = os.path.join("/tmp/", audio_file.name)
        # st.markdown(tmp_input_audio_file)
        # with open(tmp_input_audio_file, "wb") as f:
        #     f.write(audio_file.getbuffer())
        # st.audio(tmp_input_audio_file, format="mp3/wav")

        # Save the file to local storage; the upload is written to test.wav regardless
        # of its original extension. tmp_input_audio_file is only used by the
        # commented-out pydub path below.
        tmp_input_audio_file = os.path.join("/tmp/", audio_file.name)
        with open("test.wav", "wb") as f:
            f.write(audio_file.getbuffer())

        # # Convert MP3 to WAV using pydub
        # mp3_audio = AudioSegment.from_mp3(tmp_input_audio_file)
        # wav_output_file = tmp_input_audio_file.replace(".mp3", ".wav")
        # mp3_audio.export(wav_output_file, format="wav")

        # Load the WAV file using torchaudio
        wav, sample_rate = torchaudio.load("test.wav")
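
        # Sketch (assumption, not in the original flow): AudioSeal's 16-bit models
        # operate on mono audio, so averaging a stereo upload down to a single channel
        # here keeps the tensor shapes predictable further down.
        if wav.shape[0] > 1:
            wav = wav.mean(dim=0, keepdim=True)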
| st.markdown("Before unsquuezewav") | |
| st.markdown(wav) | |
| #Unsqueeze for line 176 | |
| wav= wav.unsqueeze(0) | |
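
        # Sketch (assumption, not in the original flow): the watermarking call below
        # passes default_sr (16 kHz), so resampling uploads with a different rate keeps
        # the audio and the advertised sample rate consistent.
        if sample_rate != default_sr:
            wav = torchaudio.functional.resample(wav, orig_freq=sample_rate, new_freq=default_sr)
            sample_rate = default_sr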
        # # 2nd way
        # # Convert the tensor to a byte-like object in WAV format
        # with io.BytesIO() as buffer:
        #     # Save the audio to the buffer using torchaudio
        #     torchaudio.save(buffer, wav, default_sr, format="wav")
        #     # Get the byte data from the buffer
        #     wav = buffer.getvalue()
        # # Play the audio file (WAV format)
        # st.audio(wav, format="audio/wav")
        # wav, sample_rate = torchaudio.load(audio_file, format="mp3/wav")

        st.markdown("SR")
        st.markdown(sample_rate)
        st.markdown("after unsqueeze wav")
        st.markdown(wav)

        # Display the file on the page
        # st.audio(tmp_input_audio_file, format="audio/wav")
        action = st.selectbox("Select Action", ["Add Watermark", "Decode Watermark"])

        if action == "Add Watermark":
            watermark_text = st.text_input("The watermark (0, 1 list of length-16):", value=st.session_state.def_value)
            add_watermark_button = st.button("Add Watermark", key="add_watermark_btn")
            if add_watermark_button:  # executed when the button is clicked
                if audio_file and watermark_text:
                    with st.spinner("Adding Watermark..."):
                        # wav = my_read_file(wav, max_second_encode)
                        # 1st attempt
                        watermark = model.get_watermark(wav, default_sr)
                        watermarked_audio = wav + watermark
                        print(watermarked_audio.size())
                        size = watermarked_audio.size()
                        st.markdown(size)
                        print(watermarked_audio.squeeze())
                        # Drop the batch dimension so torchaudio.save gets a (channels, samples) tensor
                        squeeze = watermarked_audio.squeeze(0)
                        shape = squeeze.size()
                        st.markdown(shape)
                        st.markdown(squeeze)
                        # torchaudio.save returns None, so save to disk and play the file by path
                        torchaudio.save("output.wav", squeeze.detach(), default_sr)
                        st.audio("output.wav", format="audio/wav")
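
                        # Optional sketch (not in the original code): let the user keep
                        # the watermarked file via Streamlit's download_button.
                        with open("output.wav", "rb") as f:
                            st.download_button("Download watermarked audio", f.read(), file_name="output.wav")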
                        # 2nd attempt
                        # watermarked_audio = model(wav, sample_rate=default_sr, alpha=1)
                        # print(watermarked_audio.size())
                        # size = watermarked_audio.size()
                        # st.markdown(size)
                        # print(watermarked_audio.squeeze())
                        # squeeze = watermarked_audio.squeeze(1)
                        # shape = squeeze.size()
                        # st.markdown(shape)
                        # st.markdown(squeeze)
                        # # watermarked_audio, encode_time_cost = add_watermark(tmp_input_audio_file, watermark_text)
                        # st.write("Watermarked Audio:")
                        # st.markdown(watermarked_audio)
                        # print("watermarked_audio:", watermarked_audio)
                        # watermarked_audio = torchaudio.save("output.wav", squeeze, default_sr)
                        # st.audio(watermarked_audio, format="audio/wav")
                        # st.write("Time Cost: %d seconds" % encode_time_cost)
                        # # st.button("Add Watermark", disabled=False)
        # elif action == "Decode Watermark":
        #     if st.button("Decode"):
        #         with st.spinner("Decoding..."):
        #             decode_watermark(tmp_input_audio_file)
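
        # A minimal working sketch of the decode path (the decode_watermark() helper
        # referenced in the commented-out block above is not defined in this file).
        # It assumes the companion AudioSeal detector checkpoint and the
        # detect_watermark() API shown in the AudioSeal README.
        elif action == "Decode Watermark":
            if st.button("Decode"):
                with st.spinner("Decoding..."):
                    detector = AudioSeal.load_detector("audioseal_detector_16bits")
                    # result: probability that the audio is watermarked; message: the 16 decoded bits
                    result, message = detector.detect_watermark(wav, default_sr)
                    st.write("Watermark probability:", result)
                    st.write("Decoded message bits:", message)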

if __name__ == "__main__":
    default_sr = 16000
    max_second_encode = 60
    max_second_decode = 30
    len_start_bit = 16
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    # model = wavmark.load_model().to(device)
    model = AudioSeal.load_generator("audioseal_wm_16bits")
    main()

    # audio_path = "/Users/my/Library/Mobile Documents/com~apple~CloudDocs/CODE/PycharmProjects/4_语音水印/419_huggingface水印/WavMark/example.wav"
    # decoded_watermark, decode_cost = decode_watermark(audio_path)
    # print(decoded_watermark)