|
- """
- Python audio synthesis framework.
- """
-
- from pymod import _ysynth_init, _ysynth_init, _ysynth_set_callback, \
- _ysynth_mp3_open, _ysynth_mp3_get_format, _ysynth_mp3_read, \
- _ysynth_mp3_close
-
- from math import cos, sin, pi, modf
- from itertools import izip
- import numpy as np
- from scipy.signal import lfilter
-
- __all__ = ['YSynth', 'Sin', 'Cos', 'Saw', 'RevSaw', 'Square', 'Pulse', 'WhiteNoise',
- 'UserSignal', 'MP3Stream']
-
- class YSynth(object):
- """
- YSynth synthesis and audio context.
- """
-
- def __init__(self, samplerate=44100, channels=2):
- """
- Setup a simple synthesizer.
-
- samplerate: Samples/second
- channels: Number of output channels.
- e.g. 1 for mono, 2 for stereo
- """
- self.context = _ysynth_init(samplerate, channels)
- self.samples = 0
- self.volume = 0.75
-
- # XXX need to fetch 'acquired' values from ysynth context
- self.samplerate = samplerate
- self.channels = channels
- self.graph = None
- self.set_callback(self.default_callback)
-
- def deinit(self):
- """
- Shutdown synthesizer, afterwards this object becomes useless.
- """
- if self.context:
- _ysynth_shutdown(self.context)
- self.context = None
-
- def default_callback(self, channels):
- """
- Default synthesis callback.
- """
-
- # Outputs silence if no graph available
- if self.graph is None:
- channels.fill(0)
- return
-
- try:
- # Process audio graph
- buf_len = channels.shape[0]
- self.chunk_size = buf_len
-
- next_chunk = next(self.graph)
- # Do we need to mix from mono to stereo?
- if len(next_chunk.shape) == 1:
- next_chunk = np.dot(np.reshape(next_chunk, (-1, 1)), ((1, 1),))
- #print channels.shape, next_chunk.shape
- np.round(next_chunk * 32767.5, 0, out=channels)
-
- # Advance sampleclock
- self.samples += buf_len
-
- # When this happens set the graph back to
- # None, this suppresses futher errors, and let's
- # a first exception slip through, causing a backtrace
- # in ysynth.
- except StopIteration:
- self.graph = None
-
- def set_graph(self, graph):
- if isinstance(graph, tuple):
- graph = ChannelJoiner(*graph)
- graph.set_synth(self)
- self.graph = iter(graph)
-
- def get_graph(self):
- return self.graph
-
- def __del__(self):
- """
- Deinitialise synth before being removed from mem
- """
- print "Deinitialising"
- self.deinit()
-
- def set_callback(self, func):
- """
- Set audio output function.
-
- Without any callback the synthesizer will simply output silence.
-
- The callback should adhere the following signature:
- def callback(channels):
-
- channels shall contain a list of lists and each list
- contains a large block of 0.0 floats describing the next set of samples.
- Those floats should be set to the chunk of audio.
- """
-
- _ysynth_set_callback(self.context, func)
-
- def get_callback(self):
- return _ysynth_get_callback(self.context)
-
- class YConstant(object):
- """
- Unchanging signal output.
- """
-
- def __init__(self, const):
- self.const = float(const)
-
- def __iter__(self):
- return iter(self())
-
- def __call__(self):
- while True:
- x = np.empty((self.synth.chunk_size,))
- x.fill(self.const)
- yield x
-
- def set_synth(self, synth):
- self.synth = synth
-
- # XXX: Write a function that broadcasts
- # various channeled signals to a common
- # shape for operations that require this.
- def broadcast(stream):
- """
- This generator takes an iterator yielding a variable number of arrays and
- scalars and broadcasts any arrays and scalars that require this to the
- correct amount of channels. If an array argument could not be normalized,
- this function raises a ValueError with the appropriate information.
- """
-
- channels = 0
- channels_shape = None
- require_broadcast = False
- stream = iter(stream)
-
- # First, check for a single multichannel count, if not
- # we need to raise an exception.
- args = next(stream)
- for c in args:
- if len(c.shape) == 2 and c.shape[1] != 1:
- if not channels:
- channels = c.shape[1]
- channels_shape = c.shape
- else:
- if channels != c.shape[1]:
- raise ValueError("Cannot broadcast input signals of"
- " shape %r and %r together." % (channels_shape, c.shape))
- else:
- # There is no need for broadcasting if no differently sized
- # channels occur
- if channels:
- require_broadcast = True
-
- # Okay we need to broadcast every argument to 'channels' nr. of channels.
- if require_broadcast:
- while True:
- r = []
- for c in args:
- if len(c.shape) != 2:
- c = np.reshape(c, (-1, 1))
- if c.shape[1] != channels:
- r.append(np.dot(c, ([1] * channels,)))
- else:
- r.append(c)
-
- yield tuple(r)
- args = next(stream)
-
- # Or we don't need to broadcast in which case we simply pass
- # on our results
- while True:
- yield(args)
- args = next(stream)
-
- class YAudioGraphNode(object):
- """
- Base audio graph node
-
- This base class provides YSynth's DSL behaviours such as
- adding, substracting and the sample iteration protocol.
- """
-
- def __init__(self, *inputs):
- self.inputs = []
- lens = 0
-
- # Convert input arguments to audio streaming
- # components
- for stream in inputs:
- # Attempt to join multiple channels
- if isinstance(stream, tuple):
- if len(stream) == 1:
- stream = stream[1]
- else:
- stream = ChannelJoiner(*stream)
-
- # Convert constant value to audio stream
- if not isinstance(stream, YAudioGraphNode):
- stream = YConstant(stream)
-
- # Append to input signals
- self.inputs.append(stream)
-
- self.inputs = tuple(self.inputs)
- #for stream in inputs:
- # if isinstance(stream, YAudioGraphNode):
- # pass
- # # Make sure all multi-channels share the same size
- # if len(stream) != 1:
- # if not lens:
- # lens = len
- # elif lens != len(stream):
- # # TODO: Expand error info to contain sizes
- # raise ValueError("Cannot combine different sized "
- # "multi-channel streams")
-
- #self.channels = lens
-
- def __iter__(self):
- """
- Initialise graph for synthesis, link to sampleclock.
- """
-
- # XXX This function is not graph cycle safe
-
- # FIXME: I need to unpack the channels from the input streams
- # multiplex any mono-streams if there are multi-channel streams
- # available, and then initialise every set of streams' component
- # generator function by calling self with the correct arguments.
-
- # Setup input streams
- for stream in self.inputs:
- stream.set_synth(self.synth)
-
- # Setup self
-
- # The self.samples variable is used for protecting against
- # multiple next() calls, next will only evaluate the next
- # set of input samples if the synth's sampleclock has changed
- self.samples = self.synth.samples - 1
-
- # XXX self.last_sample is currently not initialised on purpose
- # maybe this must be changed at a later time.
-
- # Connect the actual generator components
- sample_iter = iter(self(broadcast(izip(*self.inputs))))
-
- # Build the sample protection function
- def sample_func():
- """
- Make sure multiple next() calls during the same
- clock cycle yield the same sample value.
- """
- while True:
- if self.samples != self.synth.samples:
- self.last_sample = next(sample_iter)
- self.samples = self.synth.samples
-
- yield self.last_sample
-
- return sample_func()
-
- def set_synth(self, synth):
- """
- Set this component's synthesizer.
-
- This is mainly useful for reading the synth's sampleclock. However every
- active component requires a valid synthesizer.
- """
- self.synth = synth
-
- def get_synth(self, synth):
- """
- Return this component's synthesizer.
- """
- return self.synth
-
- def __add__(self, other):
- return Adder(self, other)
-
- def __radd__(self, other):
- return Adder(other, self)
-
- def __sub__(self, other):
- return Subtractor(self, other)
-
- def __rsub__(self, other):
- return Subtractor(other, self)
-
- def __mul__(self, other):
- return Multiplier(self, other)
-
- def __rmul__(self, other):
- return Multiplier(other, self)
-
- def __div__(self, other):
- return Divisor(self, other)
-
- def __rdiv__(self, other):
- return Divisor(other, self)
-
- def __getitem__(self, key):
- """
- Return a delayed version of the output signal.
- """
-
- # Return delay
- if isinstance(key, int) or isinstance(key, float) or \
- isinstance(key, YAudioGraphNode):
- return Delay(self, key)
-
- # Split off channels
- elif isinstance(key, slice):
- if x.start is None:
- start = 0
- if x.stop is None:
- # XXX Channels here
- # FIXME: Incomplete code
- stop = self.hoi
- if x.step is None:
- step = 1
- rng = xrange(start, stop, step)
-
- else:
- rng = key
-
- # XXX FIXME Incomplete channel split code
- return # Return channel ranges
-
- def __next__(self):
- """
- Process next sample.
- """
-
- def __call__(self):
- raise NotImplementedError("You need to inherit this class")
-
- def process(self, *streams):
- raise NotImplementedError("You need to inherit this class")
-
- # Basic signal arithmetic
- class Adder(YAudioGraphNode):
- def __call__(self, l):
- for a, b in l:
- yield a + b
-
- class Subtractor(YAudioGraphNode):
- def __call__(self, l):
- for a, b in l:
- yield a - b
-
- class Multiplier(YAudioGraphNode):
- def __call__(self, l):
- for a, b in l:
- yield a * b
-
- class Divisor(YAudioGraphNode):
- def __call__(self, l):
- for a, b in l:
- yield a / b
-
- # Sample delay
- # Currently uses a fixed buffer of 4096 samples
- # should support dynamic buffer size in the future
- class Delay(YAudioGraphNode):
- def __call__(self, l):
- buf = None
- samples = 0
-
- for sig, delay in l:
- if buf is None:
- if len(sig.shape) == 2:
- buf = np.zeros((4096, sig.shape[1]))
- else:
- buf = np.zeros((4096,))
-
- # Update delay buffer
- if samples + self.synth.chunk_size > buf.shape[0]:
- s_left = buf.shape[0] - samples
- buf[samples:] = sig[:s_left]
- buf[:self.synth.chunk_size - s_left] = sig[s_left:]
- else:
- buf[samples:samples + self.synth.chunk_size] = sig
-
- # Construct delayed signal from delay input
- delayed_idx = np.array((np.arange(samples, samples +
- self.synth.chunk_size) - delay) % buf.shape[0], dtype=int)
- yield buf[delayed_idx]
-
- # Finally update sample pointer
- samples = (samples + self.synth.chunk_size) % buf.shape[0]
-
- # Tracker
- # XXX: Incomplete class
- class SimpleStaticTracker(YAudioGraphNode):
- """
- Think MOD file.
- """
-
- def __init__(self, track):
- self.track = track
-
- # Base oscillator class
- class YOscillator(YAudioGraphNode):
- def __call__(self, l):
- def cycle_gen():
- """
- This function generates the oscillation cycle
- upon which all basic decoupled oscillators base their output
- signal.
- A decoupled oscillator is an oscillator
- with its own cycle generator. These oscillators respond well to
- incoming frequency changes, but due to the limitations of floating
- point are at risk of drifting out of phase, coupled oscillators
- are always in phase with each other.
- The generated cycle ranges from 0.0 to 1.0 exclusive.
- """
-
- last_cycle = [0.0]
-
- for freq, in l:
- # The last_cycle will only be a list during the initial
- # iteration, perfect for a 'once' statement, we want
- # to force the first cycle value to 0.
- if isinstance(last_cycle, list):
- if isinstance(freq, np.ndarray):
- ifreq = freq[0]
- else:
- ifreq = freq
- last_cycle = [-(ifreq / float(self.synth.samplerate))]
-
- # Compute cycle using IIR filter and np.fmod
- # XXX: I won't work with multichannel input
- cycle, last_cycle = lfilter([1], [1, -1], freq /
- float(self.synth.samplerate) *
- np.ones((self.synth.chunk_size,) + freq.shape[1:]),
- zi=last_cycle, axis=0)
- yield np.fmod(cycle, 1.0)
- last_cycle = np.fmod(last_cycle, 1.0)
-
- return self.oscillate(cycle_gen())
-
- def oscillate(self, l):
- raise NotImplementedError("Inherit this class")
-
- # Channel modifiers
- class ChannelJoiner(YAudioGraphNode):
- """
- Join multiple input channels into a single
- output stream.
- """
-
- def __call__(self, l):
- for chunks in l:
- yield np.hstack(np.reshape(chunk, (-1, 1)) for chunk in chunks)
-
- # User programmable signal
- class UserSignal(YAudioGraphNode):
- """
- Simple programmable output signal.
- """
-
- def __init__(self, out=0.0):
- YAudioGraphNode.__init__(self)
- self.cur_out = out
- self.ps_list = []
-
- def __call__(self, l):
- while True:
- x = np.empty((self.synth.chunk_size,))
- if len(self.ps_list):
- pos = 0
- cur_out = self.cur_out
-
- # FIXME: lrn2python queue
- while len(self.ps_list):
- change, delay = self.ps_list.pop(0)
- if delay + pos > self.synth.chunk_size:
- x[pos:] = cur_out
- delay -= self.synth.chunk_size - pos
- self.ps_list.insert(0, (change, delay))
- pos = self.synth.chunk_size
- break
- else:
- x[pos:pos + delay] = cur_out
- pos += delay
- cur_out = change
-
- x[pos:] = cur_out
- self.cur_out = cur_out
-
- else:
- x.fill(self.cur_out)
-
- yield x
-
- def add_signal_change(self, *changes):
- """
- Program a sequence of signal changes in the
- output signal.
- """
- self.ps_list.extend(changes)
-
- def pulse(self, delay, high=1.0, low=0.0):
- """
- Add a pulse change to the output signal.
- This is a convenience function that calls
- add_signal_change with the appropriate arguments.
- """
- self.add_signal_change((high, delay), (low, 1))
-
- def set_output(self, out):
- self.cur_out = out
-
- # Noise signals
- # XXX: I am probably not White Noise, fix me
- # up at a later time.
- class WhiteNoise(YAudioGraphNode):
- def __call__(self, l):
- while True:
- yield np.random.rand(self.synth.chunk_size)
-
- # Basic oscillators
- class Sin(YOscillator):
- """
- Sine wave oscillator
- """
- def oscillate(self, l):
- for pos in l:
- yield np.sin(2 * pi * pos)
-
- class Cos(YOscillator):
- """
- Cosine wave oscillator
- """
- def oscillate(self, l):
- for pos in l:
- yield np.cos(2 * pi * pos)
-
- class Saw(YOscillator):
- """
- Saw wave oscillator
- """
- def oscillate(self, l):
- for pos in l:
- yield pos * 2.0 - 1.0
-
- class RevSaw(YOscillator):
- """
- Reverse Saw wave oscillator
- """
- def oscillate(self, l):
- for pos in l:
- yield -(pos * 2.0 - 1.0)
-
- class Square(YOscillator):
- """
- Square wave oscillator
- """
- def oscillate(self, l):
- for pos in l:
- #yield 1.0 if pos < 0.5 else -1.0
- pos = pos - 0.5
- yield -(np.abs(pos) / pos)
-
- class Pulse(YOscillator):
- """
- Pulse oscillator
- """
- def oscillate(self, l):
- zi = [1.0]
- for pos in l:
- pos, zi = lfilter([-1, 1], [1], pos, zi=zi)
- yield np.array(pos > 0, dtype=float)
-
- # MP3 playback
- class MP3Stream(YAudioGraphNode):
- def __init__(self, filename):
- YAudioGraphNode.__init__(self)
-
- # Open MP3 file
- self.filename = filename
- self.mp3_handle = _ysynth_mp3_open(filename)
- self.samplerate, self.channels = _ysynth_mp3_get_format(self.mp3_handle)
-
- def __call__(self, l):
- """
- Return next chunk of mp3 audio.
- """
-
- while True:
- yield _ysynth_mp3_read(self.mp3_handle, self.synth.chunk_size)
-
- def __del__(self):
- """
- Close MP3 file and 'free' handle
- """
-
- _ysynth_mp3_close(self.mp3_handle)
- self.mp3_handle = None
-
- # Resamplers
- class ResampleNN(YAudioGraphNode):
- """
- Resamplers have their own sample clock which
- distinguishes them from the usual components, obviously
- necessary because they change the samplerate of the input
- signal.
- """
-
- def __init__(self, in_audio, in_rate=None, out_rate=None):
- self.synth = self
-
- if in_rate is None:
- in_rate = 44100
-
- if out_rate is None:
- out_rate = 44100
-
- YAudioGraphNode.__init__(self, in_audio, in_rate, out_rate)
-
- def __call__(self, l):
- for in_audio, in_rate, out_rate in l:
- pass
-
- def set_synth(self, synth):
- self.real_synth = synth
-
- # Flanger effect
- # FIXME: Incomplete class
- class Flanger(YAudioGraphNode):
- """
- Perform virtual tape flange by mixing the
- input signal with a delayed version.
- """
|