|
- """
- Python audio synthesis framework.
- """
-
- from pymod import _ysynth_init, _ysynth_init, _ysynth_set_callback
- from math import cos, sin, pi, modf
- from itertools import izip
- import numpy as np
- from scipy.signal import lfilter
-
# Names made available by a star-import of this module.
__all__ = [
    'YSynth',
    'Sin',
    'Cos',
    'Saw',
    'RevSaw',
    'Square',
    'Pulse',
]
-
class YSynth(object):
    """
    YSynth synthesis and audio context.

    Holds the native context returned by ``_ysynth_init``, a running
    sample clock (``samples``) and the iterator of the attached audio
    graph.  ``chunk_size`` is set by default_callback() to the length of
    the buffer currently being filled.
    """

    def __init__(self, samplerate=44100, channels=2):
        """
        Setup a simple synthesizer.

        samplerate: Samples/second
        channels:   Number of output channels.
                    e.g. 1 for mono, 2 for stereo
        """
        # Assigned first so deinit()/__del__ stay safe if the native
        # initialisation below raises.
        self.context = None
        self.context = _ysynth_init(samplerate, channels)
        self.samples = 0
        self.volume = 0.75

        # XXX need to fetch 'acquired' values from ysynth context
        self.samplerate = samplerate
        self.channels = channels
        self.graph = None
        self.set_callback(self.default_callback)

    def deinit(self):
        """
        Shutdown synthesizer, afterwards this object becomes useless.

        Does nothing when there is no live context, so it can be called
        more than once and on partially constructed instances.
        """
        context = getattr(self, 'context', None)
        if context:
            # The module-level import does not bring this name into scope,
            # so it is resolved here.  Presumably pymod exports it next to
            # _ysynth_init -- TODO confirm.
            from pymod import _ysynth_shutdown
            _ysynth_shutdown(context)
        self.context = None

    def default_callback(self, channels):
        """
        Default synthesis callback.

        channels: output buffer, filled in place.  The length of its
                  first axis is used as the chunk size.

        Pulls one chunk from the graph, scales it by 32767.5, rounds it
        into ``channels`` and advances the sample clock by the chunk size.
        """

        # Outputs silence if no graph available
        if not self.graph:
            return

        # Publish the chunk size before pulling from the graph, graph
        # nodes read it from the synth.
        buf_len = channels.shape[0]
        self.chunk_size = buf_len

        next_chunk = next(self.graph)

        # A one-dimensional chunk going into a two-dimensional buffer is
        # copied into every column of that buffer.
        if len(next_chunk.shape) == 1 and len(channels.shape) == 2:
            fan_out = np.ones((1, channels.shape[1]))
            next_chunk = np.dot(np.reshape(next_chunk, (-1, 1)), fan_out)

        # NOTE(review): a full-scale sample of 1.0 rounds to 32768, which
        # does not fit a signed 16 bit value -- confirm the buffer dtype.
        np.round(next_chunk * 32767.5, 0, out=channels)

        # Advance sampleclock
        self.samples += buf_len

    def set_graph(self, graph):
        """
        Attach an audio graph.

        The graph is linked to this synth and replaced by its iterator.
        """
        graph.set_synth(self)
        self.graph = iter(graph)

    def get_graph(self):
        """
        Return the iterator of the attached graph, or None.
        """
        return self.graph

    def __del__(self):
        """
        Deinitialise synth before being removed from mem
        """
        # Parenthesised form prints the same text on Python 2 and 3.
        print("Deinitialising")
        self.deinit()

    def set_callback(self, func):
        """
        Set audio output function.

        Without any callback the synthesizer will simply output silence.

        The callback should adhere the following signature:
            def callback(channels):

        channels is the output buffer for the next block of samples and
        the callback is expected to fill it in place.  default_callback
        treats it as an array with a ``shape`` attribute; the exact type
        is decided by the native layer.
        """
        _ysynth_set_callback(self.context, func)

    def get_callback(self):
        """
        Return the callback currently registered with the native context.
        """
        # Not imported at module level either; presumably exported by
        # pymod like _ysynth_set_callback -- TODO confirm.
        from pymod import _ysynth_get_callback
        return _ysynth_get_callback(self.context)
-
class YConstant(object):
    """
    Signal source that emits one fixed value forever.
    """

    def __init__(self, const):
        # Converted once so that every emitted sample is a float.
        value = float(const)
        self.const = value

    def set_synth(self, synth):
        # A constant does not use the synth; accepted so constants can be
        # handled like any other graph input.
        return None

    def __call__(self):
        # Endless generator; self.const is read again for every sample.
        while 1:
            yield self.const

    def __iter__(self):
        stream = self()
        return iter(stream)
-
class YAudioGraphNode(object):
    """
    Base audio graph node

    This base class provides YSynth's DSL behaviours such as
    adding, subtracting and the sample iteration protocol.

    Subclasses implement __call__(l), a generator that receives an
    iterator of input tuples (one entry per input stream) and yields the
    node's output.
    """

    def __init__(self, *inputs):
        """
        Store the input streams of this node.

        inputs: upstream signals.  Anything that is not already a
                YAudioGraphNode is wrapped in a YConstant.
        """
        # No synthesizer until set_synth() is called.
        self.synth = None

        self.inputs = tuple(
            stream if isinstance(stream, YAudioGraphNode)
            else YConstant(stream)
            for stream in inputs)

        # TODO: verify that all multi-channel inputs share the same
        # channel count and expose it as self.channels.

    def __iter__(self):
        """
        Initialise graph for synthesis, link to sampleclock.

        Returns a generator that yields this node's output.  Asking it
        several times during the same synth clock value returns the same
        sample instead of advancing the node.

        Raises AttributeError when no synthesizer has been set.
        """

        # XXX This function is not graph cycle safe

        # FIXME: I need to unpack the channels from the input streams
        # multiplex any mono-streams if there are multi-channel streams
        # available, and then initialise every set of streams' component
        # generator function by calling self with the correct arguments.

        synth = getattr(self, 'synth', None)
        if synth is None:
            raise AttributeError(
                "%s has no synthesizer, call set_synth() first"
                % type(self).__name__)

        # Setup input streams
        for stream in self.inputs:
            stream.set_synth(synth)

        # Setup self

        # The self.samples variable is used for protecting against
        # multiple next() calls, next will only evaluate the next
        # set of input samples if the synth's sampleclock has changed.
        # Starting one behind the clock forces the first evaluation.
        self.samples = synth.samples - 1

        # XXX self.last_sample is currently not initialised on purpose
        # maybe this must be changed at a later time.

        # Connect the actual generator components
        sample_iter = iter(self(izip(*self.inputs)))

        # Build the sample protection function
        def sample_func():
            """
            Make sure multiple next() calls during the same
            clock cycle yield the same sample value.
            """
            while True:
                if self.samples != self.synth.samples:
                    try:
                        self.last_sample = next(sample_iter)
                    except StopIteration:
                        # An exhausted input ends this stream as well; a
                        # StopIteration escaping a generator is an error
                        # on newer Python versions.
                        return
                    self.samples = self.synth.samples

                yield self.last_sample

        return sample_func()

    def set_synth(self, synth):
        """
        Set this component's synthesizer.

        This is mainly useful for reading the synth's sampleclock. However every
        active component requires a valid synthesizer.
        """
        self.synth = synth

    def get_synth(self, synth=None):
        """
        Return this component's synthesizer.

        synth: ignored; kept optional so callers that still pass an
               argument keep working.
        """
        return self.synth

    def __add__(self, other):
        return Adder(self, other)

    def __radd__(self, other):
        return Adder(other, self)

    def __sub__(self, other):
        return Subtractor(self, other)

    def __rsub__(self, other):
        return Subtractor(other, self)

    def __mul__(self, other):
        return Multiplier(self, other)

    def __rmul__(self, other):
        return Multiplier(other, self)

    def __div__(self, other):
        return Divisor(self, other)

    def __rdiv__(self, other):
        return Divisor(other, self)

    # The `/` operator maps to these names when true division is active
    # (``from __future__ import division`` or Python 3).
    def __truediv__(self, other):
        return Divisor(self, other)

    def __rtruediv__(self, other):
        return Divisor(other, self)

    def __getitem__(self, delay):
        """
        Return a delayed version of the output signal.
        """
        return Delay(self, delay)

    def __next__(self):
        """
        Process next sample.

        Placeholder: does nothing and returns None.  Samples are obtained
        from the generator returned by iter(node).
        """

    def __call__(self, l=None):
        """
        Generate this node's output from the input tuples in ``l``.

        Must be overridden.  The base version accepts the stream argument
        handed over by __iter__ so that a missing override is reported as
        NotImplementedError.
        """
        raise NotImplementedError("You need to inherit this class")

    def process(self, *streams):
        """
        Not implemented in the base class.
        """
        raise NotImplementedError("You need to inherit this class")
-
- # Basic signal arithmetic
class Adder(YAudioGraphNode):
    """
    Sum of two input signals.
    """

    def __call__(self, l):
        # l yields one (left, right) pair per step.
        for pair in l:
            lhs, rhs = pair
            yield lhs + rhs
-
class Subtractor(YAudioGraphNode):
    """
    Difference of two input signals, first minus second.
    """

    def __call__(self, l):
        # l yields one (left, right) pair per step.
        for pair in l:
            lhs, rhs = pair
            yield lhs - rhs
-
class Multiplier(YAudioGraphNode):
    """
    Product of two input signals.
    """

    def __call__(self, l):
        # l yields one (left, right) pair per step.
        for pair in l:
            lhs, rhs = pair
            yield lhs * rhs
-
class Divisor(YAudioGraphNode):
    """
    Quotient of two input signals, first divided by second.
    """

    def __call__(self, l):
        # l yields one (numerator, denominator) pair per step.
        for pair in l:
            numerator, denominator = pair
            yield numerator / denominator
-
- # Sample delay
class Delay(YAudioGraphNode):
    """
    Delayed copy of a signal.

    Takes two inputs, the signal and the delay.  Every incoming value is
    stored in a ring buffer and the value stored ``delay`` steps earlier
    is emitted; slots that were never written hold 0.0.  The delay is
    truncated to an integer and wraps around modulo BUFFER_SIZE.
    """

    # Number of past values kept in the ring buffer.
    BUFFER_SIZE = 4096

    def __call__(self, l):
        size = self.BUFFER_SIZE
        buf = [0.0] * size
        write_pos = 0
        for sig, delay in l:
            buf[write_pos] = sig
            # Constant delays arrive as floats because YConstant converts
            # its value with float(); a float cannot index a list.
            yield buf[(write_pos - int(delay)) % size]
            write_pos = (write_pos + 1) % size
-
- # Base oscillator class
class YOscillator(YAudioGraphNode):
    """
    Base class of the decoupled oscillators.

    Takes one input, the frequency, turns it into a running phase and
    hands that phase to oscillate(), which subclasses implement to shape
    the waveform.
    """

    def __call__(self, l):
        def phase_chunks():
            """
            Yield the oscillation cycle all basic decoupled oscillators
            derive their output from.

            A decoupled oscillator owns its cycle generator.  It follows
            incoming frequency changes well, but floating point
            limitations can make it drift out of phase; coupled
            oscillators are always in phase with each other.
            The generated cycle ranges from 0.0 to 1.0 exclusive.
            """
            # Integrator state carried from chunk to chunk; None marks
            # the very first chunk.
            state = None

            for (freq,) in l:
                rate = float(self.synth.samplerate)

                if state is None:
                    # Start one increment below zero so that the first
                    # cycle value comes out as 0.
                    first = freq[0] if isinstance(freq, np.ndarray) else freq
                    state = [-(first / rate)]

                # Running sum of the per-sample phase increment, computed
                # with an IIR filter and wrapped with np.fmod.
                increments = freq / rate * np.ones(self.synth.chunk_size)
                phase, state = lfilter([1], [1, -1], increments, zi=state)
                yield np.fmod(phase, 1.0)
                state = np.fmod(state, 1.0)

        return self.oscillate(phase_chunks())

    def oscillate(self, l):
        # Subclasses turn the phase stream l into a waveform.
        raise NotImplementedError("Inherit this class")
-
- # Basic oscillators
class Sin(YOscillator):
    """
    Oscillator producing a sine wave.
    """

    def oscillate(self, l):
        # l yields the phase measured in cycles.
        full_turn = 2 * pi
        for phase in l:
            yield np.sin(full_turn * phase)
-
class Cos(YOscillator):
    """
    Oscillator producing a cosine wave.
    """

    def oscillate(self, l):
        # l yields the phase measured in cycles.
        full_turn = 2 * pi
        for phase in l:
            yield np.cos(full_turn * phase)
-
class Saw(YOscillator):
    """
    Oscillator producing a rising saw wave.
    """

    def oscillate(self, l):
        # Stretch the phase so that 0.0 maps to -1.0 and 1.0 to +1.0.
        for phase in l:
            doubled = phase * 2.0
            yield doubled - 1.0
-
class RevSaw(YOscillator):
    """
    Oscillator producing a falling saw wave.
    """

    def oscillate(self, l):
        # Same ramp as the rising saw, with the sign flipped.
        for phase in l:
            ramp = phase * 2.0 - 1.0
            yield -ramp
-
class Square(YOscillator):
    """
    Square wave oscillator

    Emits 1.0 while the phase is below 0.5 and -1.0 otherwise.
    """

    def oscillate(self, l):
        for pos in l:
            # Element-wise form of `1.0 if pos < 0.5 else -1.0`.  A
            # comparison is used instead of dividing by (pos - 0.5),
            # which yields NaN when the phase is exactly 0.5.
            yield np.where(pos < 0.5, 1.0, -1.0)
-
class Pulse(YOscillator):
    """
    Oscillator producing a pulse signal.

    Emits 1.0 for samples whose phase is lower than the previous one
    and 0.0 for all others.
    """

    def oscillate(self, l):
        # Filter state carried across chunks; the initial 1.0 stands in
        # for the sample before the first one.
        state = [1.0]
        for phase in l:
            # Previous phase minus current phase, per sample.
            drop, state = lfilter([-1, 1], [1], phase, zi=state)
            yield (drop > 0).astype(float)
|