Call Applications¶
switchio supports writing and composing call control apps in pure Python. An app is simply a namespace which defines a set of event processing (async) functions [1].
Apps are somewhat analogous to extensions in FreeSWITCH’s XML dialplan interface and can similarly be activated using any event header or channel variable value of your choosing.
Each app can be implemented as a standalone Python namespace which can hold state and be mutated at runtime. This allows for all sorts of dynamic call processing logic. Apps can also be shared across a FreeSWITCH process cluster, allowing for centralized call processing on top of a scalable multi-process service system. Processing functions are implemented either as regular or async (i.e. coroutine) function callbacks and are selected for invocation based on the received event type.
Applications are loaded either directly using the low level Client or, in the case of a switchio cluster Service, using an AppManager.
API¶
Apps are usually implemented as plain old Python classes which contain methods decorated using the switchio.marks module.
The currently supported marks are:
@coroutine("EVENT_NAME") # session oriented async function
@callback("EVENT_NAME") # session oriented callback function
@handler("EVENT_NAME") # low level event oriented callback function
Where EVENT_NAME is any of the strings supported by the ESL event type list.
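To make the idea of "marking" concrete, here is a minimal, self-contained sketch of how decorators of this kind can tag methods with an event type so that a loader can discover them later. It is not switchio's implementation: the `mark()` and `collect_marked()` helpers, the `_event_type` and `_kind` attribute names, and the `Greeter` class are all hypothetical stand-ins used only for illustration.

```python
def mark(event_type, kind):
    """Return a decorator which tags a function with an event type and kind."""
    def decorator(func):
        # hypothetical attribute names, not taken from switchio.marks
        func._event_type = event_type
        func._kind = kind
        return func
    return decorator


def callback(event_type):
    """Stand-in for a session oriented callback mark."""
    return mark(event_type, 'callback')


def handler(event_type):
    """Stand-in for a low level event handler mark."""
    return mark(event_type, 'handler')


def collect_marked(app):
    """Map each event type to the (kind, bound method) pairs found on ``app``."""
    table = {}
    for name in dir(app):
        attr = getattr(app, name)
        event_type = getattr(attr, '_event_type', None)
        if callable(attr) and event_type is not None:
            table.setdefault(event_type, []).append((attr._kind, attr))
    return table


class Greeter(object):
    @callback('CHANNEL_PARK')
    def on_park(self, sess):
        return 'parked {}'.format(sess)

    @callback('CHANNEL_ANSWER')
    def on_answer(self, sess):
        return 'answered {}'.format(sess)


table = collect_marked(Greeter())
```

The point of the pattern is that the app class stays a plain namespace: the decorators only attach metadata, and whatever loads the app decides when each method runs.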
Additionally, app types can support a prepost() callable which serves as a setup/teardown fixture mechanism for the app to do pre/post app loading execution. It can be either a function or a single-yield generator.
Note
For examples using prepost() see the extensive set of built-in apps under switchio.apps.
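The function versus single-yield generator distinction can be sketched as follows. This is only an illustration of the fixture pattern under the assumption that code before the `yield` runs at load time and code after it runs at unload time; the `run_prepost()` driver and both example classes are hypothetical and do not show how switchio itself invokes prepost().

```python
import inspect


def run_prepost(prepost, *args, **kwargs):
    """Run setup now and return a callable which runs teardown (if any)."""
    result = prepost(*args, **kwargs)
    if inspect.isgenerator(result):
        next(result)  # run setup: everything up to the single ``yield``

        def teardown():
            try:
                next(result)  # run teardown: everything after the ``yield``
            except StopIteration:
                pass
            else:
                raise RuntimeError('prepost generator must yield exactly once')
        return teardown
    return lambda: None  # plain function: setup only, nothing to tear down


log = []


class FuncApp(object):
    def prepost(self, greeting='hi'):
        log.append('func setup ' + greeting)


class GenApp(object):
    def prepost(self):
        log.append('gen setup')
        yield
        log.append('gen teardown')


run_prepost(FuncApp().prepost, greeting='hello')()
unload = run_prepost(GenApp().prepost)
unload()
```

A generator keeps setup and teardown next to each other and lets them share local variables, which is why it is a convenient alternative to a plain setup function.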
Event Callbacks and Coroutines¶
Session oriented event processors are methods which typically receive a type from switchio.models as their first (and only) argument. This type is most often a Session.
Note
Technically the method will receive whatever is returned as the 2nd value from the preceding event handler looked up in the event processing loop, but this is an implementation detail and may change in the future.
Here is a simple callback which counts the number of answered sessions in a global:
import switchio

num_calls = 0

@switchio.callback('CHANNEL_ANSWER')
def counter(session):
    global num_calls
    num_calls += 1
Note
This is meant to be a simple example and is not intended for practical use. switchio.handlers.EventListener.count_calls() exists for this very purpose.
Event Handlers¶
An event handler is any callable marked by handler() which is expected to handle an ESL event packet and process it within the EventListener event loop.
Its function signature should expect a single argument: the received event packaged in a dict.
Example handlers can be found in the EventListener, such as the default CHANNEL_ANSWER handler:
@handler('CHANNEL_ANSWER')
def _handle_answer(self, e):
    '''Handle answer events

    Returns
    -------
    sess : session instance corresponding to uuid
    '''
    uuid = e.get('Unique-ID')
    sess = self.sessions.get(uuid, None)
    if sess:
        self.log.debug("answered {} session '{}'"
                       .format(e.get('Call-Direction'), uuid))
        sess.answered = True
        self.total_answered_sessions += 1
        sess.update(e)
        return True, sess
    else:
        self.log.warning('Skipping answer of {}'.format(uuid))
        return False, None
As you can see, knowledge of the underlying ESL event list is usually required for handler implementations.
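The relationship between handlers and session oriented callbacks can be sketched with a small stand-alone dispatch loop. This is not the EventListener's actual code: `dispatch()`, the plain-dict sessions and the lookup tables are hypothetical, and the rule that a falsy first return value skips the callbacks is an assumption made for the sake of the example. What it does mirror from the text above is that the handler receives the raw event dict and that its 2nd return value is what the callbacks receive.

```python
def handle_answer(sessions, event):
    """Stand-in handler: look up the session for this event's uuid."""
    uuid = event.get('Unique-ID')
    sess = sessions.get(uuid)
    if sess is None:
        return False, None
    sess['answered'] = True
    return True, sess


def dispatch(sessions, handlers, callbacks, event):
    """Run the handler for an event, then any callbacks for its type.

    Returns the number of callbacks which were invoked.
    """
    name = event['Event-Name']
    consumed, model = handlers[name](sessions, event)
    if not consumed:
        return 0  # assumed behaviour: a rejected event runs no callbacks
    registered = callbacks.get(name, ())
    for cb in registered:
        cb(model)  # callbacks receive the handler's 2nd return value
    return len(registered)


answered = []
sessions = {'abc-123': {'uuid': 'abc-123', 'answered': False}}
handlers = {'CHANNEL_ANSWER': handle_answer}
callbacks = {'CHANNEL_ANSWER': [lambda sess: answered.append(sess['uuid'])]}

ran = dispatch(sessions, handlers, callbacks,
               {'Event-Name': 'CHANNEL_ANSWER', 'Unique-ID': 'abc-123'})
skipped = dispatch(sessions, handlers, callbacks,
                   {'Event-Name': 'CHANNEL_ANSWER', 'Unique-ID': 'unknown'})
```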
Examples¶
TonePlay¶
As a first example here is the TonePlay app which is provided as a built-in for switchio:
@app
class TonePlay(object):
    """Play a 'milli-watt' tone on the outbound leg and echo it back
    on the inbound
    """
    @event_callback('CHANNEL_PARK')
    def on_park(self, sess):
        if sess.is_inbound():
            sess.answer()

    @event_callback("CHANNEL_ANSWER")
    def on_answer(self, sess):
        # inbound leg simply echos back the tone
        if sess.is_inbound():
            sess.echo()

        # play infinite tones on calling leg
        if sess.is_outbound():
            sess.playback('tone_stream://%(251,0,1004)',
                          params={'loops': '-1'})
Clients who load this app will originate calls wherein a simple tone is played infinitely and echoed back to the caller until each call is hung up.
Proxier¶
An example of the proxy dialplan can be implemented quite trivially:
import switchio

class Proxier(object):
    @switchio.coroutine('CHANNEL_PARK')
    async def on_park(self, sess):
        if sess.is_inbound():
            sess.bridge(dest_url="${sip_req_user}@${sip_req_host}:${sip_req_port}")
            await sess.recv("CHANNEL_ANSWER")
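The `dest_url` above is a template of channel variables which FreeSWITCH expands itself when the bridge is executed. As a rough illustration of the resulting destination string, the same `${name}` syntax happens to be understood by Python's `string.Template`, so the expansion can be imitated locally. The variable values below are made up for the example and the snippet is not part of switchio.

```python
from string import Template

# same template string which the Proxier passes to ``sess.bridge()``
dest_url = Template("${sip_req_user}@${sip_req_host}:${sip_req_port}")

# hypothetical values as they might appear on an inbound SIP request
channel_vars = {
    'sip_req_user': '1000',
    'sip_req_host': '10.0.0.5',
    'sip_req_port': '5060',
}

expanded = dest_url.substitute(channel_vars)
```

In other words the inbound leg is bridged to the same user, host and port that the original request was addressed to, which is what makes the app behave as a proxy.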
CDR¶
The measurement application used by the Originator to gather stress testing performance metrics from call detail records:
class CDR(object):
    """Collect call detail record info including call oriented event time
    stamps and active sessions data which can be used for per call metrics
    computations.
    """
    fields = [
        ('switchio_app', 'S50'),
        ('hangup_cause', 'S50'),
        ('caller_create', 'float64'),
        ('caller_answer', 'float64'),
        ('caller_req_originate', 'float64'),
        ('caller_originate', 'float64'),
        ('caller_hangup', 'float64'),
        ('job_launch', 'float64'),
        ('callee_create', 'float64'),
        ('callee_answer', 'float64'),
        ('callee_hangup', 'float64'),
        ('failed_calls', 'uint32'),
        ('active_sessions', 'uint32'),
        ('erlangs', 'uint32'),
    ]

    operators = {
        'call_metrics': call_metrics,
        # 'call_types': call_types,
        # 'hcm': hcm,
    }

    def __init__(self):
        self.log = utils.get_logger(__name__)
        self._call_counter = itertools.count(0)

    def new_storer(self):
        return DataStorer(self.__class__.__name__, dtype=self.fields)

    def prepost(self, listener, storer=None, pool=None, orig=None):
        self.listener = listener
        self.orig = orig
        # create our own storer if we're not loaded as a `Measurer`
        self._ds = storer if storer else self.new_storer()
        self.pool = weakref.proxy(pool) if pool else self.listener

    @property
    def storer(self):
        return self._ds

    @event_callback('CHANNEL_CREATE')
    def on_create(self, sess):
        """Store total (cluster) session count at channel create time
        """
        call_vars = sess.call.vars
        # call number tracking
        if not call_vars.get('call_index', None):
            call_vars['call_index'] = next(self._call_counter)
        # capture the current erlangs / call count
        call_vars['session_count'] = self.pool.count_sessions()
        call_vars['erlangs'] = self.pool.count_calls()

    @event_callback('CHANNEL_ORIGINATE')
    def on_originate(self, sess):
        # store local time stamp for originate
        sess.times['originate'] = sess.time
        sess.times['req_originate'] = time.time()

    @event_callback('CHANNEL_ANSWER')
    def on_answer(self, sess):
        sess.times['answer'] = sess.time

    @event_callback('CHANNEL_DESTROY')
    def log_stats(self, sess, job):
        """Append measurement data only once per call
        """
        sess.times['hangup'] = sess.time
        call = sess.call

        if call.sessions:  # still session(s) remaining to be hungup
            call.caller = call.first
            call.callee = call.last
            if job:
                call.job = job
            return  # stop now since more sessions are expected to hangup

        # all other sessions have been hungup so store all measurements
        caller = getattr(call, 'caller', None)
        if not caller:
            # most likely only one leg was established and the call failed
            # (i.e. call.caller was never assigned above)
            caller = sess

        callertimes = caller.times
        callee = getattr(call, 'callee', None)
        calleetimes = callee.times if callee else None

        pool = self.pool
        job = getattr(call, 'job', None)
        # NOTE: the entries here correspond to the listed `CDR.fields`
        rollover = self._ds.append_row((
            caller.appname,
            caller['Hangup-Cause'],
            callertimes['create'],  # invite time index
            callertimes['answer'],
            callertimes['req_originate'],  # local time stamp
            callertimes['originate'],
            callertimes['hangup'],
            # 2nd leg may not be successfully established
            job.launch_time if job else None,
            calleetimes['create'] if callee else None,
            calleetimes['answer'] if callee else None,
            calleetimes['hangup'] if callee else None,
            pool.count_failed(),
            call.vars['session_count'],
            call.vars['erlangs'],
        ))
        if rollover:
            self.log.debug('wrote data to disk')
It simply inserts the call record data on hangup once for each call.
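The "once for each call" behaviour comes from the early return in log_stats() while other sessions of the call are still up. A stripped-down sketch of that pattern is shown below. The `Call` class, `on_destroy()` and the `rows` list are hypothetical stand-ins, and the sketch assumes that a destroyed session is removed from its call before the check is made, which the excerpt above does not show.

```python
class Call(object):
    """Minimal stand-in for a call which tracks its live sessions (legs)."""
    def __init__(self, uuid, sessions):
        self.uuid = uuid
        self.sessions = list(sessions)  # legs which are still up


rows = []  # stand-in for the data storer


def on_destroy(call, sess):
    """Record one row per call, only when its last session is destroyed."""
    call.sessions.remove(sess)  # assumed bookkeeping, done elsewhere in switchio
    if call.sessions:
        return False  # more sessions are expected to hang up, store nothing yet
    rows.append((call.uuid, sess))
    return True


call = Call('call-1', ['caller-leg', 'callee-leg'])
first = on_destroy(call, 'caller-leg')
second = on_destroy(call, 'callee-leg')
```

Deferring the write until the final leg is gone means every timestamp for both legs is available when the row is assembled.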
PlayRec¶
This more involved application demonstrates FreeSWITCH’s ability to play and record RTP streams locally, which can be used in tandem with MOS scoring to check audio quality:
@app
class PlayRec(object):
    '''Play a recording to the callee and record it onto the local file system

    This app can be used in tandem with MOS scoring to verify audio quality.
    The filename provided must exist in the FreeSWITCH sounds directory such
    that ${FS_CONFIG_ROOT}/${sound_prefix}/<category>/<filename> points to a
    valid wave file.
    '''
    timer = utils.Timer()

    def prepost(
        self,
        client,
        filename='ivr-founder_of_freesource.wav',
        category='ivr',
        clip_length=4.25,  # measured empirically for the clip above
        sample_rate=8000,
        iterations=1,  # number of times the speech clip will be played
        callback=None,
        rec_period=5.0,  # in seconds (i.e. 1 recording per period)
        rec_stereo=False,
    ):
        self.filename = filename
        self.category = category
        self.framerate = sample_rate
        self.clip_length = clip_length
        if callback:
            assert inspect.isfunction(callback), 'callback must be a function'
            assert len(inspect.getargspec(callback)[0]) == 1
        self.callback = callback
        self.rec_period = rec_period
        self.stereo = rec_stereo
        self.log = utils.get_logger(self.__class__.__name__)
        self.silence = 'silence_stream://0'  # infinite silence stream
        self.iterations = iterations
        self.tail = 1.0

        # slave specific
        soundsdir = client.cmd('global_getvar sounds_dir')
        self.soundsprefix = client.cmd('global_getvar sound_prefix')
        # older FS versions don't return the deep path
        if soundsdir == self.soundsprefix:
            self.soundsprefix = '/'.join((self.soundsprefix, 'en/us/callie'))

        self.recsdir = client.cmd('global_getvar recordings_dir')
        self.audiofile = '{}/{}/{}/{}'.format(
            self.soundsprefix, self.category, self.framerate, self.filename)
        self.call2recs = OrderedDict()
        self.host = client.host
        # self.stats = OrderedDict()

    def __setduration__(self, value):
        """Called when an originator changes its `duration` attribute
        """
        if value == float('inf'):
            self.iterations, self.tail = value, 1.0
        else:
            self.iterations, self.tail = divmod(value, self.clip_length)
            if self.tail < 1.0:
                self.tail = 1.0

    @event_callback("CHANNEL_PARK")
    def on_park(self, sess):
        if sess.is_inbound():
            sess.answer()

    @event_callback("CHANNEL_ANSWER")
    def on_answer(self, sess):
        call = sess.call
        if sess.is_inbound():
            # rec the callee stream
            elapsed = self.timer.elapsed()
            if elapsed >= self.rec_period:
                filename = '{}/callee_{}.wav'.format(self.recsdir, sess.uuid)
                sess.start_record(filename, stereo=self.stereo)
                self.call2recs.setdefault(call.uuid, {})['callee'] = filename
                call.vars['record'] = True
                # mark all rec calls to NOT be hung up automatically
                # (see the `Originator`'s bj callback)
                call.vars['noautohangup'] = True
                self.timer.reset()

            # set call length
            call.vars['iterations'] = self.iterations
            call.vars['tail'] = self.tail

        if sess.is_outbound():
            if call.vars.get('record'):  # call is already recording
                # rec the caller stream
                filename = '{}/caller_{}.wav'.format(self.recsdir, sess.uuid)
                sess.start_record(filename, stereo=self.stereo)
                self.call2recs.setdefault(call.uuid, {})['caller'] = filename
            else:
                self.trigger_playback(sess)

            # always enable a jitter buffer
            # sess.broadcast('jitterbuffer::60')

    @event_callback("PLAYBACK_START")
    def on_play(self, sess):
        fp = sess['Playback-File-Path']
        self.log.debug("Playing file '{}' for session '{}'"
                       .format(fp, sess.uuid))

        self.log.debug("fp is '{}'".format(fp))
        if fp == self.audiofile:
            sess.vars['clip'] = 'signal'
        elif fp == self.silence:
            # if playing silence tell the peer to start playing a signal
            sess.vars['clip'] = 'silence'
            peer = sess.call.get_peer(sess)
            if peer:  # may have already been hungup
                peer.breakmedia()
                peer.playback(self.audiofile)

    @event_callback("PLAYBACK_STOP")
    def on_stop(self, sess):
        '''On stop either trigger a new playing of the signal if more
        iterations are required or hangup the call.
        If the current call is being recorded schedule the recordings to stop
        and expect downstream callbacks to schedule call teardown.
        '''
        self.log.debug("Finished playing '{}' for session '{}'".format(
            sess['Playback-File-Path'], sess.uuid))
        if sess.vars['clip'] == 'signal':
            vars = sess.call.vars
            vars['playback_count'] += 1

            if vars['playback_count'] < vars['iterations']:
                sess.playback(self.silence)
            else:
                # no more clips are expected to play
                if vars.get('record'):  # stop recording both ends
                    tail = vars['tail']
                    sess.stop_record(delay=tail)
                    peer = sess.call.get_peer(sess)
                    if peer:  # may have already been hungup
                        # infinite silence must be manually killed
                        peer.breakmedia()
                        peer.stop_record(delay=tail)
                else:
                    # hangup calls not being recorded immediately
                    self.log.debug("sending hangup for session '{}'"
                                   .format(sess.uuid))
                    if not sess.hungup:
                        sess.sched_hangup(0.5)  # delay hangup slightly

    def trigger_playback(self, sess):
        '''Trigger clip playback on the given session by doing the following:
        - Start playing a silence stream on the peer session
        - This will in turn trigger a speech playback on this session in the
          "PLAYBACK_START" callback
        '''
        peer = sess.call.get_peer(sess)
        peer.playback(self.silence)  # play infinite silence
        peer.vars['clip'] = 'silence'
        # start counting number of clips played
        sess.call.vars['playback_count'] = 0

    @event_callback("RECORD_START")
    def on_rec(self, sess):
        self.log.debug("Recording file '{}' for session '{}'".format(
            sess['Record-File-Path'], sess.uuid)
        )
        # mark this session as "currently recording"
        sess.vars['recorded'] = False
        # sess.setvar('timer_name', 'soft')

        # start signal playback on the caller
        if sess.is_outbound():
            self.trigger_playback(sess)

    @event_callback("RECORD_STOP")
    def on_recstop(self, sess):
        self.log.debug("Finished recording file '{}' for session '{}'".format(
            sess['Record-File-Path'], sess.uuid))
        # mark as recorded so user can block with `EventListener.waitfor`
        sess.vars['recorded'] = True
        if sess.hungup:
            self.log.warning(
                "sess '{}' was already hungup prior to recording completion?"
                .format(sess.uuid))

        # if sess.call.vars.get('record'):
        #     self.stats[sess.uuid] = sess.con.api(
        #         'json {{"command": "mediaStats", "data": {{"uuid": "{0}"}}}}'
        #         .format(sess.uuid)
        #     ).getBody()

        # if the far end has finished recording then hangup the call
        if sess.call.get_peer(sess).vars.get('recorded', True):
            self.log.debug("sending hangup for session '{}'".format(sess.uuid))
            if not sess.hungup:
                sess.sched_hangup(0.5)  # delay hangup slightly
            recs = self.call2recs[sess.call.uuid]

            # invoke callback for each recording
            if self.callback:
                self.callback(
                    RecInfo(self.host, recs['caller'], recs['callee'])
                )
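The arithmetic in `__setduration__` is easy to miss: a requested call duration is split into a whole number of clip plays plus a trailing remainder, and the remainder is never allowed to drop below one second. A stand-alone sketch of the same calculation follows, where `split_duration()` is a hypothetical helper name and 4.25 is the default `clip_length` from the listing above.

```python
def split_duration(duration, clip_length):
    """Split a call duration into (iterations, tail) as PlayRec does.

    ``iterations`` is how many whole clips fit into the duration and
    ``tail`` is the leftover time in seconds, floored at 1.0.
    """
    if duration == float('inf'):
        return duration, 1.0  # play forever, keep the default tail
    iterations, tail = divmod(duration, clip_length)
    return iterations, max(tail, 1.0)


ten_seconds = split_duration(10, 4.25)    # two clips and 1.5 s left over
nine_seconds = split_duration(9, 4.25)    # 0.5 s left over is raised to 1.0
forever = split_duration(float('inf'), 4.25)
```

Note that `divmod` on floats returns floats, so `iterations` is `2.0` rather than `2`; the comparison against `playback_count` in on_stop() works the same either way.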
For further examples check out the apps sub-package which also includes the very notorious switchio.apps.call_gen.Originator.
[1] Although this may change in the future with the introduction of native asyncio coroutines in Python 3.5.