""" Python audio synthesis framework. """ from pymod import _ysynth_init, _ysynth_init, _ysynth_set_callback, \ _ysynth_mp3_open, _ysynth_mp3_get_format, _ysynth_mp3_read, \ _ysynth_mp3_close from math import cos, sin, pi, modf from itertools import izip import numpy as np from scipy.signal import lfilter __all__ = ['YSynth', 'Sin', 'Cos', 'Saw', 'RevSaw', 'Square', 'Pulse', 'WhiteNoise', 'UserSignal', 'MP3Stream'] class YSynth(object): """ YSynth synthesis and audio context. """ def __init__(self, samplerate=44100, channels=2): """ Setup a simple synthesizer. samplerate: Samples/second channels: Number of output channels. e.g. 1 for mono, 2 for stereo """ self.context = _ysynth_init(samplerate, channels) self.samples = 0 self.volume = 0.75 # XXX need to fetch 'acquired' values from ysynth context self.samplerate = samplerate self.channels = channels self.graph = None self.set_callback(self.default_callback) def deinit(self): """ Shutdown synthesizer, afterwards this object becomes useless. """ if self.context: _ysynth_shutdown(self.context) self.context = None def default_callback(self, channels): """ Default synthesis callback. """ # Outputs silence if no graph available if self.graph is None: channels.fill(0) return try: # Process audio graph buf_len = channels.shape[0] self.chunk_size = buf_len next_chunk = next(self.graph) # Do we need to mix from mono to stereo? if len(next_chunk.shape) == 1: next_chunk = np.dot(np.reshape(next_chunk, (-1, 1)), ((1, 1),)) #print channels.shape, next_chunk.shape np.round(next_chunk * 32767.5, 0, out=channels) # Advance sampleclock self.samples += buf_len # When this happens set the graph back to # None, this suppresses futher errors, and let's # a first exception slip through, causing a backtrace # in ysynth. except StopIteration: self.graph = None def set_graph(self, graph): if isinstance(graph, tuple): graph = ChannelJoiner(*graph) graph.set_synth(self) self.graph = iter(graph) def get_graph(self): return self.graph def __del__(self): """ Deinitialise synth before being removed from mem """ print "Deinitialising" self.deinit() def set_callback(self, func): """ Set audio output function. Without any callback the synthesizer will simply output silence. The callback should adhere the following signature: def callback(channels): channels shall contain a list of lists and each list contains a large block of 0.0 floats describing the next set of samples. Those floats should be set to the chunk of audio. """ _ysynth_set_callback(self.context, func) def get_callback(self): return _ysynth_get_callback(self.context) class YConstant(object): """ Unchanging signal output. """ def __init__(self, const): self.const = float(const) def __iter__(self): return iter(self()) def __call__(self): while True: x = np.empty((self.synth.chunk_size,)) x.fill(self.const) yield x def set_synth(self, synth): self.synth = synth # XXX: Write a function that broadcasts # various channeled signals to a common # shape for operations that require this. def broadcast(stream): """ This generator takes an iterator yielding a variable number of arrays and scalars and broadcasts any arrays and scalars that require this to the correct amount of channels. If an array argument could not be normalized, this function raises a ValueError with the appropriate information. """ channels = 0 channels_shape = None require_broadcast = False stream = iter(stream) # First, check for a single multichannel count, if not # we need to raise an exception. args = next(stream) for c in args: if len(c.shape) == 2 and c.shape[1] != 1: if not channels: channels = c.shape[1] channels_shape = c.shape else: if channels != c.shape[1]: raise ValueError("Cannot broadcast input signals of" " shape %r and %r together." % (channels_shape, c.shape)) else: # There is no need for broadcasting if no differently sized # channels occur if channels: require_broadcast = True # Okay we need to broadcast every argument to 'channels' nr. of channels. if require_broadcast: while True: r = [] for c in args: if len(c.shape) != 2: c = np.reshape(c, (-1, 1)) if c.shape[1] != channels: r.append(np.dot(c, ([1] * channels,))) else: r.append(c) yield tuple(r) args = next(stream) # Or we don't need to broadcast in which case we simply pass # on our results while True: yield(args) args = next(stream) class YAudioGraphNode(object): """ Base audio graph node This base class provides YSynth's DSL behaviours such as adding, substracting and the sample iteration protocol. """ def __init__(self, *inputs): self.inputs = [] lens = 0 # Convert input arguments to audio streaming # components for stream in inputs: # Attempt to join multiple channels if isinstance(stream, tuple): if len(stream) == 1: stream = stream[1] else: stream = ChannelJoiner(*stream) # Convert constant value to audio stream if not isinstance(stream, YAudioGraphNode): stream = YConstant(stream) # Append to input signals self.inputs.append(stream) self.inputs = tuple(self.inputs) #for stream in inputs: # if isinstance(stream, YAudioGraphNode): # pass # # Make sure all multi-channels share the same size # if len(stream) != 1: # if not lens: # lens = len # elif lens != len(stream): # # TODO: Expand error info to contain sizes # raise ValueError("Cannot combine different sized " # "multi-channel streams") #self.channels = lens def __iter__(self): """ Initialise graph for synthesis, link to sampleclock. """ # XXX This function is not graph cycle safe # FIXME: I need to unpack the channels from the input streams # multiplex any mono-streams if there are multi-channel streams # available, and then initialise every set of streams' component # generator function by calling self with the correct arguments. # Setup input streams for stream in self.inputs: stream.set_synth(self.synth) # Setup self # The self.samples variable is used for protecting against # multiple next() calls, next will only evaluate the next # set of input samples if the synth's sampleclock has changed self.samples = self.synth.samples - 1 # XXX self.last_sample is currently not initialised on purpose # maybe this must be changed at a later time. # Connect the actual generator components sample_iter = iter(self(broadcast(izip(*self.inputs)))) # Build the sample protection function def sample_func(): """ Make sure multiple next() calls during the same clock cycle yield the same sample value. """ while True: if self.samples != self.synth.samples: self.last_sample = next(sample_iter) self.samples = self.synth.samples yield self.last_sample return sample_func() def set_synth(self, synth): """ Set this component's synthesizer. This is mainly useful for reading the synth's sampleclock. However every active component requires a valid synthesizer. """ self.synth = synth def get_synth(self, synth): """ Return this component's synthesizer. """ return self.synth def __add__(self, other): return Adder(self, other) def __radd__(self, other): return Adder(other, self) def __sub__(self, other): return Subtractor(self, other) def __rsub__(self, other): return Subtractor(other, self) def __mul__(self, other): return Multiplier(self, other) def __rmul__(self, other): return Multiplier(other, self) def __div__(self, other): return Divisor(self, other) def __rdiv__(self, other): return Divisor(other, self) def __getitem__(self, key): """ Return a delayed version of the output signal. """ # Return delay if isinstance(key, int) or isinstance(key, float) or \ isinstance(key, YAudioGraphNode): return Delay(self, key) # Split off channels elif isinstance(key, slice): if x.start is None: start = 0 if x.stop is None: # XXX Channels here # FIXME: Incomplete code stop = self.hoi if x.step is None: step = 1 rng = xrange(start, stop, step) else: rng = key # XXX FIXME Incomplete channel split code return # Return channel ranges def __next__(self): """ Process next sample. """ def __call__(self): raise NotImplementedError("You need to inherit this class") def process(self, *streams): raise NotImplementedError("You need to inherit this class") # Basic signal arithmetic class Adder(YAudioGraphNode): def __call__(self, l): for a, b in l: yield a + b class Subtractor(YAudioGraphNode): def __call__(self, l): for a, b in l: yield a - b class Multiplier(YAudioGraphNode): def __call__(self, l): for a, b in l: yield a * b class Divisor(YAudioGraphNode): def __call__(self, l): for a, b in l: yield a / b # Sample delay # Currently uses a fixed buffer of 4096 samples # should support dynamic buffer size in the future class Delay(YAudioGraphNode): def __call__(self, l): buf = None samples = 0 for sig, delay in l: if buf is None: if len(sig.shape) == 2: buf = np.zeros((4096, sig.shape[1])) else: buf = np.zeros((4096,)) # Update delay buffer if samples + self.synth.chunk_size > buf.shape[0]: s_left = buf.shape[0] - samples buf[samples:] = sig[:s_left] buf[:self.synth.chunk_size - s_left] = sig[s_left:] else: buf[samples:samples + self.synth.chunk_size] = sig # Construct delayed signal from delay input delayed_idx = np.array((np.arange(samples, samples + self.synth.chunk_size) - delay) % buf.shape[0], dtype=int) yield buf[delayed_idx] # Finally update sample pointer samples = (samples + self.synth.chunk_size) % buf.shape[0] # Tracker # XXX: Incomplete class class SimpleStaticTracker(YAudioGraphNode): """ Think MOD file. """ def __init__(self, track): self.track = track # Base oscillator class class YOscillator(YAudioGraphNode): def __call__(self, l): def cycle_gen(): """ This function generates the oscillation cycle upon which all basic decoupled oscillators base their output signal. A decoupled oscillator is an oscillator with its own cycle generator. These oscillators respond well to incoming frequency changes, but due to the limitations of floating point are at risk of drifting out of phase, coupled oscillators are always in phase with each other. The generated cycle ranges from 0.0 to 1.0 exclusive. """ last_cycle = [0.0] for freq, in l: # The last_cycle will only be a list during the initial # iteration, perfect for a 'once' statement, we want # to force the first cycle value to 0. if isinstance(last_cycle, list): if isinstance(freq, np.ndarray): ifreq = freq[0] else: ifreq = freq last_cycle = [-(ifreq / float(self.synth.samplerate))] # Compute cycle using IIR filter and np.fmod # XXX: I won't work with multichannel input cycle, last_cycle = lfilter([1], [1, -1], freq / float(self.synth.samplerate) * np.ones((self.synth.chunk_size,) + freq.shape[1:]), zi=last_cycle, axis=0) yield np.fmod(cycle, 1.0) last_cycle = np.fmod(last_cycle, 1.0) return self.oscillate(cycle_gen()) def oscillate(self, l): raise NotImplementedError("Inherit this class") # Channel modifiers class ChannelJoiner(YAudioGraphNode): """ Join multiple input channels into a single output stream. """ def __call__(self, l): for chunks in l: yield np.hstack(np.reshape(chunk, (-1, 1)) for chunk in chunks) # User programmable signal class UserSignal(YAudioGraphNode): """ Simple programmable output signal. """ def __init__(self, out=0.0): YAudioGraphNode.__init__(self) self.cur_out = out self.ps_list = [] def __call__(self, l): while True: x = np.empty((self.synth.chunk_size,)) if len(self.ps_list): pos = 0 cur_out = self.cur_out # FIXME: lrn2python queue while len(self.ps_list): change, delay = self.ps_list.pop(0) if delay + pos > self.synth.chunk_size: x[pos:] = cur_out delay -= self.synth.chunk_size - pos self.ps_list.insert(0, (change, delay)) pos = self.synth.chunk_size break else: x[pos:pos + delay] = cur_out pos += delay cur_out = change x[pos:] = cur_out self.cur_out = cur_out else: x.fill(self.cur_out) yield x def add_signal_change(self, *changes): """ Program a sequence of signal changes in the output signal. """ self.ps_list.extend(changes) def pulse(self, delay, high=1.0, low=0.0): """ Add a pulse change to the output signal. This is a convenience function that calls add_signal_change with the appropriate arguments. """ self.add_signal_change((high, delay), (low, 1)) def set_output(self, out): self.cur_out = out # Noise signals # XXX: I am probably not White Noise, fix me # up at a later time. class WhiteNoise(YAudioGraphNode): def __call__(self, l): while True: yield np.random.rand(self.synth.chunk_size) # Basic oscillators class Sin(YOscillator): """ Sine wave oscillator """ def oscillate(self, l): for pos in l: yield np.sin(2 * pi * pos) class Cos(YOscillator): """ Cosine wave oscillator """ def oscillate(self, l): for pos in l: yield np.cos(2 * pi * pos) class Saw(YOscillator): """ Saw wave oscillator """ def oscillate(self, l): for pos in l: yield pos * 2.0 - 1.0 class RevSaw(YOscillator): """ Reverse Saw wave oscillator """ def oscillate(self, l): for pos in l: yield -(pos * 2.0 - 1.0) class Square(YOscillator): """ Square wave oscillator """ def oscillate(self, l): for pos in l: #yield 1.0 if pos < 0.5 else -1.0 pos = pos - 0.5 yield -(np.abs(pos) / pos) class Pulse(YOscillator): """ Pulse oscillator """ def oscillate(self, l): zi = [1.0] for pos in l: pos, zi = lfilter([-1, 1], [1], pos, zi=zi) yield np.array(pos > 0, dtype=float) # MP3 playback class MP3Stream(YAudioGraphNode): def __init__(self, filename): YAudioGraphNode.__init__(self) # Open MP3 file self.filename = filename self.mp3_handle = _ysynth_mp3_open(filename) self.samplerate, self.channels = _ysynth_mp3_get_format(self.mp3_handle) def __call__(self, l): """ Return next chunk of mp3 audio. """ while True: yield _ysynth_mp3_read(self.mp3_handle, self.synth.chunk_size) def __del__(self): """ Close MP3 file and 'free' handle """ _ysynth_mp3_close(self.mp3_handle) self.mp3_handle = None # Resamplers class ResampleNN(YAudioGraphNode): """ Resamplers have their own sample clock which distinguishes them from the usual components, obviously necessary because they change the samplerate of the input signal. """ def __init__(self, in_audio, in_rate=None, out_rate=None): self.synth = self if in_rate is None: in_rate = 44100 if out_rate is None: out_rate = 44100 YAudioGraphNode.__init__(self, in_audio, in_rate, out_rate) def __call__(self, l): for in_audio, in_rate, out_rate in l: pass def set_synth(self, synth): self.real_synth = synth # Flanger effect # FIXME: Incomplete class class Flanger(YAudioGraphNode): """ Perform virtual tape flange by mixing the input signal with a delayed version. """