Call Applications

switchio supports writing and composing call control apps in pure Python. An app is simply a namespace which defines a set of event processing (async) functions [1].

Apps are somewhat analogous to extensions in FreeSWITCH’s XML dialplan interface and can similarly be activated using any event header or channel variable value of your choosing.

Apps can each be implemented as a standalone Python namespace which can hold state and be mutated at runtime. This allows for all sorts of dynamic call processing logic. Apps can also be shared across a FreeSWITCH process cluster, allowing for centralized call processing on top of a scalable multi-process service system. Processing functions are implemented either as regular or async (i.e. coroutine) function callbacks and are selected for invocation depending on the received event type.

Applications are loaded either directly using the low level Client or, in the case of a switchio cluster Service, using an AppManager.

API

Apps are usually implemented as plain old Python classes which contain methods decorated using the switchio.marks module.

The currently supported marks are:

@coroutine("EVENT_NAME")  # session oriented async function
@callback("EVENT_NAME")   # session oriented callback function
@handler("EVENT_NAME")    # low level event oriented callback function

Where EVENT_NAME is any of the strings supported by the ESL event type list.
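
To illustrate the general idea of decorator-based marking, here is a minimal, self-contained sketch. It is a toy and not switchio's actual implementation: the `mark()` and `collect_marked()` helpers and the `_event_type` / `_kind` attribute names are assumptions made up for this example.

```python
# Toy illustration of decorator-based "marking"; NOT switchio's actual code.
def mark(event_type, kind):
    """Return a decorator that tags a function with an event type and kind."""
    def decorator(func):
        func._event_type = event_type  # hypothetical attribute names
        func._kind = kind
        return func
    return decorator


def collect_marked(app):
    """Map each event type to the bound methods of `app` marked for it."""
    registry = {}
    for name in dir(app):
        attr = getattr(app, name)
        event_type = getattr(attr, "_event_type", None)
        if callable(attr) and event_type is not None:
            registry.setdefault(event_type, []).append(attr)
    return registry


class CountingApp:
    def __init__(self):
        self.answered = 0

    @mark("CHANNEL_ANSWER", kind="callback")
    def on_answer(self, sess):
        self.answered += 1


app = CountingApp()
registry = collect_marked(app)
for func in registry["CHANNEL_ANSWER"]:
    func({"uuid": "abc"})  # a plain dict stands in for a Session here
```

The point of the pattern is that the app class stays a plain Python namespace; whatever loads the app can discover its processing functions by inspecting the tags.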

Additionally, app types can support a prepost() callable which serves as a setup/teardown fixture mechanism for the app to do pre/post app loading execution. It can be either a function or a single-yield generator.
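
A single-yield generator fixture can be driven roughly as follows. This is a sketch under assumptions, not the loader switchio uses: `run_prepost()` and `ToyApp` are hypothetical names, and treating a plain function as "setup only" is an assumption of this example.

```python
import inspect


def run_prepost(prepost, *args, **kwargs):
    """Run setup now and return a zero-argument teardown callable."""
    result = prepost(*args, **kwargs)
    if inspect.isgenerator(result):
        next(result)  # run the code before the `yield` (setup)

        def teardown():
            # resume after the `yield`; a single-yield generator then finishes
            try:
                next(result)
            except StopIteration:
                pass
            else:
                raise RuntimeError("prepost generator yielded more than once")
        return teardown
    # plain function: setup already ran, nothing to tear down (assumption)
    return lambda: None


class ToyApp:
    def __init__(self):
        self.log = []

    def prepost(self, greeting="hello"):
        self.log.append("setup:" + greeting)
        yield
        self.log.append("teardown")


app = ToyApp()
teardown = run_prepost(app.prepost, greeting="hi")  # setup runs here
teardown()  # code after the `yield` runs here
```

Everything before the `yield` acts as setup at load time and everything after it acts as teardown at unload time.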

Note

For examples using prepost() see the extensive set of built-in apps under switchio.apps.

Event Callbacks and Coroutines

Session oriented event processors are methods which typically receive a type from switchio.models as their first (and only) argument. This type is most often a Session.

Note

Technically the method will receive whatever is returned as the 2nd value from the preceding event handler looked up in the event processing loop, but this is an implementation detail and may change in the future.

Here is a simple callback which counts the number of answered sessions in a global:

import switchio

num_calls = 0

@switchio.callback('CHANNEL_ANSWER')
def counter(session):
    global num_calls
    num_calls += 1

Note

This is meant to be a simple example and not actually implemented for practical use. switchio.handlers.EventListener.count_calls() exists for this very purpose.

Event Handlers

An event handler is any callable marked by handler() which is expected to handle an ESL event packet and process it within the EventListener event loop. Its function signature should expect a single argument: the received event packaged in a dict.

Example handlers can be found in the EventListener, such as the default CHANNEL_ANSWER handler:

    @handler('CHANNEL_ANSWER')
    def _handle_answer(self, e):
        '''Handle answer events

        Returns
        -------
        sess : session instance corresponding to uuid
        '''
        uuid = e.get('Unique-ID')
        sess = self.sessions.get(uuid, None)
        if sess:
            self.log.debug("answered {} session '{}'"
                           .format(e.get('Call-Direction'), uuid))
            sess.answered = True
            self.total_answered_sessions += 1
            sess.update(e)
            return True, sess
        else:
            self.log.warning('Skipping answer of {}'.format(uuid))
            return False, None

As you can see, knowledge of the underlying ESL event list is usually required for handler implementations.
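
The relationship between a handler's return value and the session oriented callbacks can be sketched with a toy dispatch function. This is an illustration only and not the EventListener's real loop: `dispatch()`, `handle_answer()`, and the plain dicts standing in for sessions are assumptions made for this example.

```python
def handle_answer(sessions, event):
    """Toy handler: look up the session and report whether it was consumed."""
    sess = sessions.get(event.get("Unique-ID"))
    if sess is None:
        return False, None
    sess["answered"] = True
    return True, sess


def dispatch(event, handlers, callbacks, sessions):
    """Run the handler for the event, then any session oriented callbacks."""
    handler = handlers.get(event["Event-Name"])
    if handler is None:
        return False
    consumed, model = handler(sessions, event)
    if consumed:
        for callback in callbacks.get(event["Event-Name"], []):
            callback(model)  # callbacks see the handler's 2nd return value
    return consumed


answered = []
sessions = {"uuid-1": {"answered": False}}
handlers = {"CHANNEL_ANSWER": handle_answer}
callbacks = {"CHANNEL_ANSWER": [lambda sess: answered.append(sess)]}

known = dispatch({"Event-Name": "CHANNEL_ANSWER", "Unique-ID": "uuid-1"},
                 handlers, callbacks, sessions)
unknown = dispatch({"Event-Name": "CHANNEL_ANSWER", "Unique-ID": "uuid-2"},
                   handlers, callbacks, sessions)
```

In this sketch the callbacks only run when the handler reports the event as consumed, so an event for an unknown session never reaches them.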

Examples

TonePlay

As a first example, here is the TonePlay app which is provided as a built-in for switchio:

@app
class TonePlay(object):
    """Play a 'milli-watt' tone on the outbound leg and echo it back
    on the inbound
    """
    @event_callback('CHANNEL_PARK')
    def on_park(self, sess):
        if sess.is_inbound():
            sess.answer()

    @event_callback("CHANNEL_ANSWER")
    def on_answer(self, sess):
        # inbound leg simply echos back the tone
        if sess.is_inbound():
            sess.echo()

        # play infinite tones on calling leg
        if sess.is_outbound():
            sess.playback('tone_stream://%(251,0,1004)',
                          params={'loops': '-1'})

Clients who load this app will originate calls wherein a simple tone is played infinitely and echoed back to the caller until each call is hung up.

Proxier

An example of the proxy dialplan can be implemented quite trivially:

import switchio

class Proxier(object):
    @switchio.coroutine('CHANNEL_PARK')
    async def on_park(self, sess):
        if sess.is_inbound():
            sess.bridge(dest_url="${sip_req_user}@${sip_req_host}:${sip_req_port}")
            await sess.recv("CHANNEL_ANSWER")
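
One way an awaitable such as `sess.recv()` could be built is with one future per awaited event. The sketch below assumes that design purely for illustration; `ToySession` and its `deliver()` method are hypothetical and are not switchio's Session API.

```python
import asyncio


class ToySession:
    """Hypothetical stand-in for a session; not switchio's Session."""
    def __init__(self):
        self._waiters = {}

    def recv(self, event_name):
        """Return a future that resolves when `event_name` is delivered."""
        fut = asyncio.get_running_loop().create_future()
        self._waiters.setdefault(event_name, []).append(fut)
        return fut

    def deliver(self, event_name, event):
        """Wake every coroutine currently waiting on `event_name`."""
        for fut in self._waiters.pop(event_name, []):
            if not fut.done():
                fut.set_result(event)


async def on_park(sess, log):
    log.append("bridging")
    event = await sess.recv("CHANNEL_ANSWER")  # suspend until answered
    log.append("answered:" + event["Unique-ID"])


async def main():
    sess, log = ToySession(), []
    task = asyncio.ensure_future(on_park(sess, log))
    await asyncio.sleep(0)  # let on_park run up to its `await`
    sess.deliver("CHANNEL_ANSWER", {"Unique-ID": "uuid-1"})
    await task
    return log


log = asyncio.run(main())
```

The coroutine reads top to bottom like sequential call logic, while the event loop stays free to process other sessions during the wait.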

CDR

The measurement application used by the Originator to gather stress testing performance metrics from call detail records:

class CDR(object):
    """Collect call detail record info including call oriented event time
    stamps and active sessions data which can be used for per call metrics
    computations.
    """
    fields = [
        ('switchio_app', 'S50'),
        ('hangup_cause', 'S50'),
        ('caller_create', 'float64'),
        ('caller_answer',  'float64'),
        ('caller_req_originate', 'float64'),
        ('caller_originate', 'float64'),
        ('caller_hangup', 'float64'),
        ('job_launch', 'float64'),
        ('callee_create', 'float64'),
        ('callee_answer', 'float64'),
        ('callee_hangup', 'float64'),
        ('failed_calls', 'uint32'),
        ('active_sessions', 'uint32'),
        ('erlangs', 'uint32'),
    ]

    operators = {
        'call_metrics': call_metrics,
        # 'call_types': call_types,
        # 'hcm': hcm,
    }

    def __init__(self):
        self.log = utils.get_logger(__name__)
        self._call_counter = itertools.count(0)

    def new_storer(self):
        return DataStorer(self.__class__.__name__, dtype=self.fields)

    def prepost(self, listener, storer=None, pool=None, orig=None):
        self.listener = listener
        self.orig = orig
        # create our own storer if we're not loaded as a `Measurer`
        self._ds = storer if storer else self.new_storer()
        self.pool = weakref.proxy(pool) if pool else self.listener

    @property
    def storer(self):
        return self._ds

    @event_callback('CHANNEL_CREATE')
    def on_create(self, sess):
        """Store total (cluster) session count at channel create time
        """
        call_vars = sess.call.vars
        # call number tracking
        if not call_vars.get('call_index', None):
            call_vars['call_index'] = next(self._call_counter)
        # capture the current erlangs / call count
        call_vars['session_count'] = self.pool.count_sessions()
        call_vars['erlangs'] = self.pool.count_calls()

    @event_callback('CHANNEL_ORIGINATE')
    def on_originate(self, sess):
        # store local time stamp for originate
        sess.times['originate'] = sess.time
        sess.times['req_originate'] = time.time()

    @event_callback('CHANNEL_ANSWER')
    def on_answer(self, sess):
        sess.times['answer'] = sess.time

    @event_callback('CHANNEL_DESTROY')
    def log_stats(self, sess, job):
        """Append measurement data only once per call
        """
        sess.times['hangup'] = sess.time
        call = sess.call

        if call.sessions:  # still session(s) remaining to be hungup
            call.caller = call.first
            call.callee = call.last
            if job:
                call.job = job
            return  # stop now since more sessions are expected to hangup

        # all other sessions have been hungup so store all measurements
        caller = getattr(call, 'caller', None)
        if not caller:
            # most likely only one leg was established and the call failed
            # (i.e. call.caller was never assigned above)
            caller = sess

        callertimes = caller.times
        callee = getattr(call, 'callee', None)
        calleetimes = callee.times if callee else None

        pool = self.pool
        job = getattr(call, 'job', None)
        # NOTE: the entries here correspond to the listed `CDR.fields`
        rollover = self._ds.append_row((
            caller.appname,
            caller['Hangup-Cause'],
            callertimes['create'],  # invite time index
            callertimes['answer'],
            callertimes['req_originate'],  # local time stamp
            callertimes['originate'],
            callertimes['hangup'],
            # 2nd leg may not be successfully established
            job.launch_time if job else None,
            calleetimes['create'] if callee else None,
            calleetimes['answer'] if callee else None,
            calleetimes['hangup'] if callee else None,
            pool.count_failed(),
            call.vars['session_count'],
            call.vars['erlangs'],
        ))
        if rollover:
            self.log.debug('wrote data to disk')

It simply inserts the call record data on hangup once for each call.
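
As a rough idea of what per call metrics can be computed from such time stamps, consider the sketch below. It is not the `call_metrics` operator referenced above: the `per_call_metrics()` function, the metric names, and the sample values are all assumptions for illustration, using only field names from `CDR.fields`.

```python
def per_call_metrics(row):
    """Derive a few latencies (in seconds) from one CDR-like record."""
    metrics = {
        # time from channel creation to answer on the caller leg
        "answer_latency": row["caller_answer"] - row["caller_create"],
        # time the caller leg stayed up after being answered
        "call_duration": row["caller_hangup"] - row["caller_answer"],
    }
    # the callee leg may never have been established
    if row.get("callee_create") is not None:
        metrics["invite_latency"] = row["callee_create"] - row["caller_create"]
    return metrics


# made-up sample record for demonstration only
row = {
    "caller_create": 100.0,
    "caller_answer": 100.5,
    "caller_hangup": 130.5,
    "callee_create": 100.25,
}
metrics = per_call_metrics(row)
```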

PlayRec

This more involved application demonstrates FreeSWITCH’s ability to play and record RTP streams locally which can be used in tandem with MOS to do audio quality checking:

@app
class PlayRec(object):
    '''Play a recording to the callee and record it onto the local file system

    This app can be used in tandem with MOS scoring to verify audio quality.
    The filename provided must exist in the FreeSWITCH sounds directory such
    that ${FS_CONFIG_ROOT}/${sound_prefix}/<category>/<filename> points to a
    valid wave file.
    '''
    timer = utils.Timer()

    def prepost(
        self,
        client,
        filename='ivr-founder_of_freesource.wav',
        category='ivr',
        clip_length=4.25,  # measured empirically for the clip above
        sample_rate=8000,
        iterations=1,  # number of times the speech clip will be played
        callback=None,
        rec_period=5.0,  # in seconds (i.e. 1 recording per period)
        rec_stereo=False,
    ):
        self.filename = filename
        self.category = category
        self.framerate = sample_rate
        self.clip_length = clip_length
        if callback:
            assert inspect.isfunction(callback), 'callback must be a function'
            assert len(inspect.getfullargspec(callback)[0]) == 1
        self.callback = callback
        self.rec_period = rec_period
        self.stereo = rec_stereo
        self.log = utils.get_logger(self.__class__.__name__)
        self.silence = 'silence_stream://0'  # infinite silence stream
        self.iterations = iterations
        self.tail = 1.0

        # slave specific
        soundsdir = client.cmd('global_getvar sounds_dir')
        self.soundsprefix = client.cmd('global_getvar sound_prefix')
        # older FS versions don't return the deep path
        if soundsdir == self.soundsprefix:
            self.soundsprefix = '/'.join((self.soundsprefix, 'en/us/callie'))

        self.recsdir = client.cmd('global_getvar recordings_dir')
        self.audiofile = '{}/{}/{}/{}'.format(
            self.soundsprefix, self.category, self.framerate, self.filename)
        self.call2recs = OrderedDict()
        self.host = client.host

        # self.stats = OrderedDict()

    def __setduration__(self, value):
        """Called when an originator changes its `duration` attribute
        """
        if value == float('inf'):
            self.iterations, self.tail = value, 1.0
        else:
            self.iterations, self.tail = divmod(value, self.clip_length)
        if self.tail < 1.0:
            self.tail = 1.0

    @event_callback("CHANNEL_PARK")
    def on_park(self, sess):
        if sess.is_inbound():
            sess.answer()

    @event_callback("CHANNEL_ANSWER")
    def on_answer(self, sess):
        call = sess.call
        if sess.is_inbound():
            # rec the callee stream
            elapsed = self.timer.elapsed()
            if elapsed >= self.rec_period:
                filename = '{}/callee_{}.wav'.format(self.recsdir, sess.uuid)
                sess.start_record(filename, stereo=self.stereo)
                self.call2recs.setdefault(call.uuid, {})['callee'] = filename
                call.vars['record'] = True
                # mark all rec calls to NOT be hung up automatically
                # (see the `Originator`'s bj callback)
                call.vars['noautohangup'] = True
                self.timer.reset()

            # set call length
            call.vars['iterations'] = self.iterations
            call.vars['tail'] = self.tail

        if sess.is_outbound():
            if call.vars.get('record'):  # call is already recording
                # rec the caller stream
                filename = '{}/caller_{}.wav'.format(self.recsdir, sess.uuid)
                sess.start_record(filename, stereo=self.stereo)
                self.call2recs.setdefault(call.uuid, {})['caller'] = filename
            else:
                self.trigger_playback(sess)

        # always enable a jitter buffer
        # sess.broadcast('jitterbuffer::60')

    @event_callback("PLAYBACK_START")
    def on_play(self, sess):
        fp = sess['Playback-File-Path']
        self.log.debug("Playing file '{}' for session '{}'"
                       .format(fp, sess.uuid))

        self.log.debug("fp is '{}'".format(fp))
        if fp == self.audiofile:
            sess.vars['clip'] = 'signal'
        elif fp == self.silence:
            # if playing silence tell the peer to start playing a signal
            sess.vars['clip'] = 'silence'
            peer = sess.call.get_peer(sess)
            if peer:  # may have already been hungup
                peer.breakmedia()
                peer.playback(self.audiofile)

    @event_callback("PLAYBACK_STOP")
    def on_stop(self, sess):
        '''On stop either trigger a new playing of the signal if more
        iterations are required or hangup the call.
        If the current call is being recorded schedule the recordings to stop
        and expect downstream callbacks to schedule call teardown.
        '''
        self.log.debug("Finished playing '{}' for session '{}'".format(
                      sess['Playback-File-Path'], sess.uuid))
        if sess.vars['clip'] == 'signal':
            vars = sess.call.vars
            vars['playback_count'] += 1

            if vars['playback_count'] < vars['iterations']:
                sess.playback(self.silence)
            else:
                # no more clips are expected to play
                if vars.get('record'):  # stop recording both ends
                    tail = vars['tail']
                    sess.stop_record(delay=tail)
                    peer = sess.call.get_peer(sess)
                    if peer:  # may have already been hungup
                        # infinite silence must be manually killed
                        peer.breakmedia()
                        peer.stop_record(delay=tail)
                else:
                    # hangup calls not being recorded immediately
                    self.log.debug("sending hangup for session '{}'"
                                   .format(sess.uuid))
                    if not sess.hungup:
                        sess.sched_hangup(0.5)  # delay hangup slightly

    def trigger_playback(self, sess):
        '''Trigger clip playback on the given session by doing the following:
        - Start playing a silence stream on the peer session
        - This will in turn trigger a speech playback on this session in the
        "PLAYBACK_START" callback
        '''
        peer = sess.call.get_peer(sess)
        peer.playback(self.silence)  # play infinite silence
        peer.vars['clip'] = 'silence'
        # start counting number of clips played
        sess.call.vars['playback_count'] = 0

    @event_callback("RECORD_START")
    def on_rec(self, sess):
        self.log.debug("Recording file '{}' for session '{}'".format(
            sess['Record-File-Path'], sess.uuid)
        )
        # mark this session as "currently recording"
        sess.vars['recorded'] = False
        # sess.setvar('timer_name', 'soft')

        # start signal playback on the caller
        if sess.is_outbound():
            self.trigger_playback(sess)

    @event_callback("RECORD_STOP")
    def on_recstop(self, sess):
        self.log.debug("Finished recording file '{}' for session '{}'".format(
            sess['Record-File-Path'], sess.uuid))
        # mark as recorded so user can block with `EventListener.waitfor`
        sess.vars['recorded'] = True
        if sess.hungup:
            self.log.warning(
                "sess '{}' was already hungup prior to recording completion?"
                .format(sess.uuid))

        # if sess.call.vars.get('record'):
        #     self.stats[sess.uuid] = sess.con.api(
        #         'json {{"command": "mediaStats", "data": {{"uuid": "{0}"}}}}'
        #         .format(sess.uuid)
        #     ).getBody()

        # if the far end has finished recording then hangup the call
        if sess.call.get_peer(sess).vars.get('recorded', True):
            self.log.debug("sending hangup for session '{}'".format(sess.uuid))
            if not sess.hungup:
                sess.sched_hangup(0.5)  # delay hangup slightly
            recs = self.call2recs[sess.call.uuid]

            # invoke callback for each recording
            if self.callback:
                self.callback(
                    RecInfo(self.host, recs['caller'], recs['callee'])
                )
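
The duration arithmetic in `__setduration__` above can be checked in isolation. The standalone function below mirrors that logic; the name `split_duration()` and its `min_tail` parameter are hypothetical and exist only for this example.

```python
def split_duration(duration, clip_length, min_tail=1.0):
    """Split a call duration into whole clip iterations plus a tail."""
    if duration == float("inf"):
        return duration, min_tail  # play clips forever
    iterations, tail = divmod(duration, clip_length)
    # never use a tail shorter than `min_tail` seconds
    return iterations, max(tail, min_tail)


two_clips = split_duration(10.0, 4.25)   # two full clips, 1.5s left over
exact_fit = split_duration(8.5, 4.25)    # no remainder, tail is clamped
forever = split_duration(float("inf"), 4.25)
```

With the default 4.25 second clip, a 10 second duration gives two full iterations and a 1.5 second tail, while an exact multiple such as 8.5 seconds has its zero tail raised to the 1.0 second minimum.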

For further examples, check out the apps sub-package, which also includes the very notorious switchio.apps.call_gen.Originator.

[1] Although this may change in the future with the introduction of native asyncio coroutines in Python 3.5.