Spaces:
Sleeping
Sleeping
import streamlit as st | |
import streamlit.components.v1 as components | |
import threading | |
import time | |
from queue import Queue | |
import mido # For MIDI I/O | |
from mido import Message | |
from pyo import Server, Sine, SfPlayer, Mixer, Notein, MidiAdsr | |
# ========================= | |
# 1) AUDIO ENGINE (pyo) | |
# ========================= | |
# We’ll create a pyo Server at module load. Adjust sample rate, buffers, etc. as needed. | |
# In many environments, pyo wants to open an audio stream. This might conflict with | |
# Streamlit’s runtime if it's not set up for real-time audio. | |
# We'll do a basic attempt: | |
AUDIO_SR = 44100


# Streamlit re-executes this script from the top on every rerun. Caching the
# server and the note registry as resources keeps a rerun from booting another
# pyo Server or discarding the references to oscillators that are still
# sounding.
@st.cache_resource
def _boot_audio_server():
    """Boot and start the pyo audio server, once per Streamlit process."""
    server = Server(sr=AUDIO_SR, nchnls=2, buffersize=1024, duplex=1).boot()
    server.start()
    return server


@st.cache_resource
def _shared_oscillators():
    """Return the process-wide ``{midi_note: pyo object}`` registry."""
    return {}


s = _boot_audio_server()
# We'll keep a global dictionary of active pyo objects for "notes" to allow polyphony.
active_oscillators = _shared_oscillators()
# A simple function to generate or retrieve an oscillator for a given note. | |
def note_on(note, velocity=100, synth_type='sine'):
    """
    Trigger or re-trigger a note with a pyo-based oscillator.

    Args:
        note: MIDI note number (69 is A4 = 440 Hz).
        velocity: MIDI velocity; scaled linearly so that 127 maps to an
            amplitude of 0.3.
        synth_type: Accepted for interface compatibility. Every value
            currently produces a sine oscillator.
    """
    # Equal-temperament conversion, computed directly so the lookup does not
    # depend on a frequency table being provided by the MIDI library.
    freq = 440.0 * 2.0 ** ((note - 69) / 12.0)
    amp = velocity / 127.0 * 0.3  # 0.3 is an arbitrary headroom factor

    osc = active_oscillators.get(note)
    if osc is None:
        # Keep the new oscillator in the registry: note_off() looks it up
        # there, and the reference keeps the object alive while it sounds.
        active_oscillators[note] = Sine(freq=freq, mul=amp).out()
    else:
        # The note is already sounding: update it in place instead of
        # stacking a second oscillator on the same key.
        osc.setFreq(freq)
        osc.mul = amp
def note_off(note):
    """
    Silence ``note`` and drop it from the registry of sounding notes.

    Does nothing when the note is not currently registered.
    """
    try:
        voice = active_oscillators[note]
    except KeyError:
        return
    voice.stop()  # cut the sound immediately, no release phase
    del active_oscillators[note]
# If you want a more advanced poly-synth approach, you might consider `Notein`, `MidiAdsr`, etc. | |
# ========================= | |
# 2) DRUM / LOOPS | |
# ========================= | |
# For drum pads, we can load multiple short samples. | |
# We'll store them in a dictionary to trigger by index or note number: | |
drum_samples = {
    0: "samples/kick.wav",
    1: "samples/snare.wav",
    2: "samples/hihat.wav",
    3: "samples/clap.wav",
    # ...
}

# Most recent player per pad. Holding the reference here keeps the player
# object alive after drum_trigger() returns; re-triggering a pad replaces
# (and thereby releases) that pad's previous player.
_drum_players = {}


def drum_trigger(index, velocity=100):
    """
    Trigger the one-shot drum sample assigned to pad ``index``.

    Args:
        index: Key into ``drum_samples``. Pads without a sample are ignored.
        velocity: MIDI velocity; scaled linearly so that 127 maps to an
            amplitude of 0.8.
    """
    path = drum_samples.get(index)
    if path is None:
        return
    vol = velocity / 127.0 * 0.8
    # Create a one-shot player and remember it so it is not dropped as soon
    # as this function returns.
    _drum_players[index] = SfPlayer(path, loop=False, mul=vol).out()
# ========================= | |
# 3) ARPEGGIATOR EXAMPLE | |
# ========================= | |
class Arpeggiator:
    """
    Background arpeggiator that cycles upward through the held notes.

    The worker thread plays one held note per eighth-note step by calling the
    module-level ``note_on`` / ``note_off`` functions. The methods of the same
    names on this class only add to or remove from the set of held notes.

    Attributes:
        bpm: Tempo in beats per minute; read again before every step, so it
            can be changed while the arpeggiator is running.
        notes_held: Set of MIDI note numbers currently held.
        running: True while the worker loop should keep going.
        thread: The worker thread, or None when stopped.
    """

    def __init__(self, bpm=120):
        self.bpm = bpm
        self.notes_held = set()
        self.running = False
        self.thread = None

    def start(self):
        """Start the worker thread; does nothing if already running."""
        if self.running:
            return
        self.running = True
        self.thread = threading.Thread(target=self.run, daemon=True)
        self.thread.start()

    def stop(self):
        """Ask the worker to finish its current step and wait for it."""
        self.running = False
        worker, self.thread = self.thread, None
        # Never join the current thread: that would raise RuntimeError.
        if worker is not None and worker is not threading.current_thread():
            worker.join()

    def _step_seconds(self):
        """Return the length of one eighth-note step at the current bpm."""
        return 60.0 / self.bpm / 2.0

    def run(self):
        """Worker loop: play held notes in ascending order until stopped."""
        idx = 0
        while self.running:
            # Take one snapshot per step. Checking the live set and then
            # reading it again could find it emptied in between.
            held = sorted(self.notes_held)
            if not held:
                time.sleep(0.01)
                continue
            # Recomputed every step so tempo changes apply immediately.
            half_step = self._step_seconds() * 0.5
            note = held[idx % len(held)]
            note_on(note, 100)  # module-level synth function, not the method
            time.sleep(half_step)
            note_off(note)  # release after half the step
            time.sleep(half_step)
            idx += 1

    def note_on(self, note):
        """Mark ``note`` as held so the arpeggiator includes it."""
        self.notes_held.add(note)

    def note_off(self, note):
        """Mark ``note`` as released; unknown notes are ignored."""
        self.notes_held.discard(note)
# ========================= | |
# 4) HTML + JS | |
# ========================= | |
def get_keyboard_html(octaves=5, start_note="C3", width=900, height=150):
    """
    Return an HTML page embedding a Qwerty-Hancock on-screen keyboard.

    With the defaults this is a 5-octave keyboard from 'C3' upward. Each key
    press/release posts a message to the parent window of the form
    ``{type: 'midiEvent', data: {type: 'noteOn'|'noteOff', note, velocity}}``.
    The MIDI number is computed in the page from the note name (C3 = 48,
    C4 = 60), so any range can be requested.

    Args:
        octaves: Number of octaves to draw (at least 1).
        start_note: Lowest key as a natural note plus one octave digit,
            e.g. 'C3'.
        width: Keyboard width in pixels.
        height: Keyboard height in pixels.

    Raises:
        ValueError: If ``start_note`` is malformed or ``octaves`` is below 1.
    """
    import re  # local import: only needed to validate start_note

    start_note = str(start_note)
    if not re.fullmatch(r"[A-G][0-9]", start_note):
        raise ValueError(
            f"start_note must be a natural note plus an octave digit "
            f"such as 'C3', got {start_note!r}"
        )
    octaves = int(octaves)
    if octaves < 1:
        raise ValueError("octaves must be at least 1")

    template = """
<!DOCTYPE html>
<html>
<head>
<style>
#keyboard {
  margin: 0 auto;
}
.qwerty-hancock-wrapper {
  width: __WIDTH__px; /* Adjust to taste */
  margin: 0 auto;
}
</style>
<script src="https://cdnjs.cloudflare.com/ajax/libs/qwerty-hancock/0.10.0/qwerty-hancock.min.js"></script>
</head>
<body>
<div class="qwerty-hancock-wrapper">
  <div id="keyboard"></div>
</div>
<script>
const keyboard = new QwertyHancock({
  id: 'keyboard',
  width: __WIDTH__,
  height: __HEIGHT__,
  octaves: __OCTAVES__,
  startNote: '__START_NOTE__',
  whiteKeyColour: 'white',
  blackKeyColour: '#444',
  activeColour: '#FF6961'
});
// Note name -> MIDI number, with C3 = 48 and C4 = 60.
const SEMITONES = {
  'C': 0, 'C#': 1, 'D': 2, 'D#': 3, 'E': 4, 'F': 5,
  'F#': 6, 'G': 7, 'G#': 8, 'A': 9, 'A#': 10, 'B': 11
};
function noteToMidi(note) {
  const match = /^([A-G]#?)(-?[0-9]+)$/.exec(note);
  if (!match) {
    return undefined;
  }
  const midi = 12 * (parseInt(match[2], 10) + 1) + SEMITONES[match[1]];
  return (midi >= 0 && midi <= 127) ? midi : undefined;
}
function sendNote(eventType, note, velocity) {
  const midiNote = noteToMidi(note);
  if (midiNote !== undefined) {
    window.parent.postMessage({
      type: 'midiEvent',
      data: {
        type: eventType,
        note: midiNote,
        velocity: velocity
      }
    }, '*');
  }
}
keyboard.keyDown = function (note, freq) {
  sendNote('noteOn', note, 100);
};
keyboard.keyUp = function (note, freq) {
  sendNote('noteOff', note, 0);
};
</script>
</body>
</html>
"""
    return (
        template
        .replace("__WIDTH__", str(int(width)))
        .replace("__HEIGHT__", str(int(height)))
        .replace("__OCTAVES__", str(octaves))
        .replace("__START_NOTE__", start_note)
    )
def get_drum_pads_html(num_pads=16, columns=4):
    """
    Return an HTML page with a grid of clickable drum pads.

    With the defaults this is a 4x4 (16) grid. On mouse-down each pad posts
    ``{type: 'drumTrigger', data: {padIndex, velocity: 100}}`` to the parent
    window, where ``padIndex`` runs from 0 to ``num_pads - 1``.

    Args:
        num_pads: Number of pads to render (0 or more).
        columns: Number of pads per grid row (at least 1).

    Raises:
        ValueError: If ``num_pads`` is negative or ``columns`` is below 1.
    """
    num_pads = int(num_pads)
    columns = int(columns)
    if num_pads < 0:
        raise ValueError("num_pads must not be negative")
    if columns < 1:
        raise ValueError("columns must be at least 1")

    # Labels are 1-based and right-aligned to two characters,
    # giving "Pad 1" ... "Pad 9", "Pad10" ...
    pads = "\n".join(
        f'<div class="drum-pad" data-pad="{index}">Pad{index + 1:>2}</div>'
        for index in range(num_pads)
    )

    template = """
<!DOCTYPE html>
<html>
<head>
<style>
.drum-grid {
  display: grid;
  grid-template-columns: repeat(__COLUMNS__, 80px);
  grid-gap: 10px;
  width: max-content;
  margin: 0 auto;
}
.drum-pad {
  width: 80px;
  height: 80px;
  background-color: #666;
  display: flex;
  align-items: center;
  justify-content: center;
  color: #fff;
  font-weight: bold;
  font-size: 1.2em;
  cursor: pointer;
  user-select: none;
  border-radius: 8px;
}
.drum-pad:active {
  background-color: #999;
}
</style>
</head>
<body>
<div class="drum-grid">
__PADS__
</div>
<script>
document.querySelectorAll('.drum-pad').forEach(pad => {
  pad.addEventListener('mousedown', () => {
    let padIndex = parseInt(pad.getAttribute('data-pad'), 10);
    window.parent.postMessage({
      type: 'drumTrigger',
      data: {
        padIndex: padIndex,
        velocity: 100
      }
    }, '*');
  });
});
</script>
</body>
</html>
"""
    return (
        template
        .replace("__COLUMNS__", str(columns))
        .replace("__PADS__", pads)
    )
# ========================= | |
# 5) STREAMLIT APP | |
# ========================= | |
def main():
    """
    Render the Streamlit synth page and process queued note/drum events.

    On each run this:
      * creates the per-session arpeggiator, event queue and MIDI port slots,
      * shows the BPM / arpeggiator / MIDI port controls and (re)opens the
        selected ports,
      * embeds the on-screen keyboard and drum pads,
      * drains the event queue, routing events to the synth, the arpeggiator
        and the optional MIDI output.
    """
    st.title("Python Synth with 5-Octave Keyboard + Drum Pads (pyo-based)")

    # Create all per-session state before any port is opened, so the MIDI
    # callback below always has a queue to write to.
    if 'arpeggiator' not in st.session_state:
        st.session_state.arpeggiator = Arpeggiator(bpm=120)
    if 'incoming_events' not in st.session_state:
        st.session_state.incoming_events = Queue()
    if 'midi_in' not in st.session_state:
        st.session_state.midi_in = None
    if 'midi_out' not in st.session_state:
        st.session_state.midi_out = None

    arpeggiator = st.session_state.arpeggiator
    incoming_events = st.session_state.incoming_events

    # BPM slider
    arpeggiator.bpm = st.slider("Arpeggiator BPM", 60, 240, 120)
    use_arp = st.checkbox("Enable Arpeggiator", value=False)
    if use_arp:
        arpeggiator.start()
    else:
        arpeggiator.stop()

    # MIDI I/O
    st.subheader("MIDI Ports")
    in_ports = mido.get_input_names()
    out_ports = mido.get_output_names()
    input_choice = st.selectbox("MIDI Input", ["None"] + in_ports)
    output_choice = st.selectbox("MIDI Output", ["None"] + out_ports)

    # Callback for incoming hardware MIDI
    def midi_in_callback(msg):
        # The port invokes this callback itself, so it writes to the captured
        # queue object rather than looking up st.session_state.
        # NOTE(review): presumably called from the MIDI backend's own thread,
        # where session state may not be reachable - confirm.
        if msg.type not in ('note_on', 'note_off'):
            return
        # By MIDI convention a note_on with velocity 0 means note-off.
        is_note_on = msg.type == 'note_on' and msg.velocity > 0
        incoming_events.put({
            'type': 'noteOn' if is_note_on else 'noteOff',
            'note': msg.note,
            'velocity': msg.velocity,
        })

    def open_input(port):
        if port == "None":
            return None
        return mido.open_input(port, callback=midi_in_callback)

    def open_output(port):
        if port == "None":
            return None
        return mido.open_output(port)

    def sync_port(current, choice, opener):
        """Return the port matching ``choice``, reopening only on change."""
        if current is not None and current.name != choice:
            current.close()
            return opener(choice)
        if current is None and choice != "None":
            return opener(choice)
        return current

    # Re-open if changed
    st.session_state.midi_in = sync_port(
        st.session_state.midi_in, input_choice, open_input)
    st.session_state.midi_out = sync_port(
        st.session_state.midi_out, output_choice, open_output)

    st.write("Press keys on hardware (if connected) or use the on-screen UI below:")

    # On-screen 5-octave keyboard
    st.subheader("5-Octave Keyboard")
    components.html(get_keyboard_html(), height=220)

    # Drum pads
    st.subheader("Drum Pads (16)")
    components.html(get_drum_pads_html(), height=220)

    # Hidden script to route postMessage -> Streamlit
    # NOTE(review): nothing in this file moves these browser messages into
    # `incoming_events`; only hardware MIDI input feeds the queue. A custom
    # Streamlit component is presumably needed for the on-screen UI.
    components.html("""
<script>
window.addEventListener('message', function(e) {
  if (e.data.type === 'midiEvent') {
    // forward to parent's postMessage
    window.parent.postMessage({
      type: 'streamlit:message',
      data: {
        type: 'midi_event',
        event: e.data.data
      }
    }, '*');
  } else if (e.data.type === 'drumTrigger') {
    window.parent.postMessage({
      type: 'streamlit:message',
      data: {
        type: 'drum_event',
        event: e.data.data
      }
    }, '*');
  }
});
</script>
""", height=0)

    # A small debug output
    debug_area = st.empty()

    # We define a function to dispatch events to pyo and optionally to MIDI out
    def dispatch_event(event):
        # .get(): an event without a 'type' is ignored instead of raising.
        etype = event.get('type')
        if etype in ('noteOn', 'noteOff'):
            note = event['note']
            vel = event.get('velocity', 100)
            is_on = etype == 'noteOn'
            if use_arp:
                # The arpeggiator only tracks held notes; its worker thread
                # does the actual triggering.
                if is_on:
                    arpeggiator.note_on(note)
                else:
                    arpeggiator.note_off(note)
            elif is_on:
                note_on(note, vel)
            else:
                note_off(note)
            # Also echo to output port
            midi_out = st.session_state.midi_out
            if midi_out:
                if is_on:
                    midi_out.send(Message('note_on', note=note, velocity=vel))
                else:
                    midi_out.send(Message('note_off', note=note, velocity=0))
            debug_area.write(f"MIDI Note Event -> {event}")
        elif etype == 'drum':
            # for drum, we have event['padIndex']
            idx = event['padIndex']
            vel = event.get('velocity', 100)
            drum_trigger(idx, vel)
            debug_area.write(f"Drum Trigger -> Pad {idx}")

    # We'll do a short poll in the Streamlit loop
    # (In actual usage, you'd use a Streamlit custom component to pass these more elegantly.)
    def poll_events():
        while not incoming_events.empty():
            dispatch_event(incoming_events.get_nowait())

    poll_events()
    st.write("Try pressing the on-screen keys/pads or your hardware keyboard/pads. Enjoy!")

    # Cleanup
    def cleanup():
        arpeggiator.stop()
        for note in list(active_oscillators):
            note_off(note)
        if st.session_state.midi_in:
            st.session_state.midi_in.close()
        if st.session_state.midi_out:
            st.session_state.midi_out.close()
        s.stop()

    # `st.on_session_end` is not part of Streamlit's documented API, so only
    # register the hook when the running version actually provides it;
    # calling it unconditionally would raise AttributeError on every run.
    register_cleanup = getattr(st, "on_session_end", None)
    if callable(register_cleanup):
        register_cleanup(cleanup)


if __name__ == "__main__":
    main()