frugalvox

A tiny VoIP IVR framework by hackers, for hackers
git clone git://git.luxferre.top/frugalvox.git
Log | Files | Refs | README | LICENSE

fvx.py (13033B)


      1 #!/usr/bin/env python3
      2 
      3 # FrugalVox: experimental, straightforward, no-nonsense IVR framework on top of pyVoIP (patched) and TTS engines
      4 # Created by Luxferre in 2023, released into public domain
      5 # Deps: PyYAML, NumPy, espeak-ng/flite/libttspico, patched pyVoIP (see https://github.com/tayler6000/pyVoIP/issues/107#issuecomment-1440231926)
      6 # All configuration is in config.yaml
      7 
      8 import sys
      9 import os
     10 import signal
     11 import tempfile
     12 import yaml
     13 import wave, audioop
     14 import time
     15 from datetime import datetime # for logging
     16 import traceback # for logging
     17 import socket # for local IP detection
     18 import numpy as np # for in-band DTMF detection and generation
     19 import importlib.util # for action modules import
     20 from pyVoIP.VoIP import VoIPPhone, InvalidStateError, CallState
     21 
     22 # global parameters
     23 progname = 'FrugalVox v0.0.2'
     24 config = {} # placeholder for config object
     25 configfile = './config.yaml' # default config yaml path (relative to the workdir)
     26 if len(sys.argv) > 1:
     27     configfile = sys.argv[1]
     28 configfile = os.path.realpath(configfile)
     29 kernelroot = os.path.realpath(os.path.dirname(__file__)) # absolute path to the kernel directory
     30 configroot = os.path.dirname(configfile)
     31 sys.path.append(kernelroot) # make the kernel module findable
     32 if configroot != kernelroot:
     33     sys.path.append(configroot) # make the modules in configuration directory findable
     34 audio_buf_len = 160 # analyze this amount of raw audio data bytes
     35 emptybuf = b'\x80' * audio_buf_len
     36 DTMF_TABLE = {
     37     '1': [1209, 697],
     38     '2': [1336, 697],
     39     '3': [1477, 697],
     40     'A': [1633, 697],
     41     '4': [1209, 770],
     42     '5': [1336, 770],
     43     '6': [1477, 770],
     44     'B': [1633, 770],
     45     '7': [1209, 852],
     46     '8': [1336, 852],
     47     '9': [1477, 852],
     48     'C': [1633, 852],
     49     '*': [1209, 941],
     50     '0': [1336, 941],
     51     '#': [1477, 941],
     52     'D': [1633, 941]
     53 }
     54 ivrconfig = None # placeholder for IVR auth config
     55 calls = {} # placeholder for all realtime call instances
     56 
     57 # helper methods
     58 
     59 def logevent(msg):
     60     dts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
     61     print('[%s] %s' % (dts, msg))
     62 
     63 def load_audio(fname): # load audio data from a WAV PCM file, resampling it if necessary
     64     f = wave.open(fname, 'rb')
     65     outrate = 8000
     66     aparams = f.getparams()
     67     frames = aparams.nframes
     68     channels = aparams.nchannels
     69     inrate = aparams.framerate
     70     swidth = aparams.sampwidth
     71     data = f.readframes(frames)
     72     f.close()
     73     if channels > 1: # convert to mono
     74         data = audioop.tomono(data, swidth, 0.5, 0.5)
     75     if inrate > outrate or swidth > 1: # convert the sample rate and bit width at the same time
     76         rfactor = int(inrate / outrate) * swidth # only multiples of 8 KHz are supported
     77         out = bytearray()
     78         blen = len(data)
     79         bwidth = swidth << 3 # incoming bit width
     80         bfactor = 1 << (bwidth - 8) # factor to divide the biased sample value by to get a single byte
     81         for i in range(0, blen, swidth): # only add every `rfactor`th frame
     82             if (i % rfactor) == 0:
     83                 if swidth == 1:
     84                     bval = data[i]
     85                 else:
     86                     bval = int.from_bytes(bytes(data[i:i+swidth]), byteorder='little', signed=True)
     87                 if bfactor > 1: # perform bit reduction if necessary
     88                     bval = int(round(bval / bfactor)) + 128
     89                 if bval > 255: # handle clipping
     90                     bval = 255
     91                 out.append(bval)
     92         data = bytes(out)
     93     return data
     94 
     95 def load_yaml(fname): # load an object from a YAML file
     96     yf = open(fname, 'r')
     97     yc = yf.read()
     98     yf.close()
     99     return yaml.safe_load(yc)
    100 
    101 def tts_to_file(text, fname, conf): # render the text to a file
    102     ecmd = conf['cmd'] % (fname, text)
    103     os.system(ecmd) # render to the temporary file
    104 
    105 def tts_to_buf(text, conf): # render the text directly to a buffer
    106     fh, fname = tempfile.mkstemp('.wav', 'fvx-')
    107     os.close(fh)
    108     tts_to_file(text, fname, conf)
    109     buf = load_audio(fname)
    110     os.remove(fname)
    111     return buf
    112 
    113 def gen_dtmf(f1, f2): # directly render two sine frequencies to a buffer (0.2 s duration and 8KHz sample rate hardcoded)
    114     nbuf = np.arange(0, 0.2, 1 / 8000) # init target signal buffer and then sum the sine signals
    115     return (127 + 61.44 * (np.sin(2 * np.pi * f1 * nbuf) + np.sin(2 * np.pi * f2 * nbuf))).astype(np.ubyte).tobytes()
    116 
    117 def get_caller_addr(call): # extract caller's SIP address from the call request headers
    118     return call.request.headers['From']['address']
    119 
    120 def get_callee_addr(call): # extract destination SIP address from the call request headers
    121     return call.request.headers['To']['address']
    122 
    123 def flush_input_audio(call): # clear the call's RTP input buffer
    124     abuf = None
    125     for i in range(625): # because 625 * 160 = 100000 (pyVoIP's internal buffer size)
    126         abuf = call.read_audio(audio_buf_len, False)
    127 
    128 def playbuf(buf, call): # properly play audio buffer on the call
    129     blen = len(buf) / 8000
    130     call.write_audio(buf)
    131     time.sleep(blen)
    132 
    133 def playclips(clipset, call): # properly play clips on the call
    134     for clipname in clipset:
    135         playbuf(clips[clipname], call)
    136 
    137 def hangup(call): # call hangup wrapper
    138     global calls
    139     if call.call_id in calls:
    140         del calls[call.call_id]
    141     try:
    142         call.hangup()
    143     except InvalidStateError:
    144         pass
    145     logevent('Call with %s terminated' % get_caller_addr(call))
    146 
    147 # in-band DTMF detector
    148 
    149 def isNumberInArray(array, number):
    150     offset = 5
    151     for i in range(number - offset, number + offset):
    152         if i in array:
    153             return True
    154     return False
    155 
    156 def detect_dtmf(buf): # Detect a DTMF digit in the audio buffer using FFT
    157     data = np.frombuffer(buf, dtype=np.uint8)
    158     ftdata = np.fft.fft(data)
    159     ftlen = len(ftdata)
    160     for i in range(ftlen):
    161         ftdata[i] = int(np.absolute(ftdata[i]))
    162     lb = 20 * np.average(ftdata) # lower bound for filtering
    163     freqs = []
    164     for i in range(ftlen):
    165         if ftdata[i] > lb:
    166             freqs.append(i)
    167     for d, fpair in DTMF_TABLE.items(): # Detect and return the digit
    168         if isNumberInArray(freqs, fpair[0]) and isNumberInArray(freqs, fpair[1]):
    169             return d
    170 
    171 # IVR command handler (for authenticated and authorized action runs)
    172 
    173 def command_handler(act, modulefile, call, userid):
    174     global clips
    175     global calls
    176     global config
    177     actid = act[0]
    178     params = act[1:]
    179     logevent('Running action %s from the module %s with params (%s)' % (actid, modulefile, ', '.join(params)))
    180     (modname, ext) = os.path.splitext(os.path.basename(modulefile))
    181     spec = importlib.util.spec_from_file_location(modname, modulefile)
    182     actionmodule = importlib.util.module_from_spec(spec)
    183     sys.modules[modname] = actionmodule
    184     spec.loader.exec_module(actionmodule)
    185     actionmodule.run_action(actid, params, call, userid, config, clips, calls)
    186 
    187 # main call handler
    188 
    189 def main_call_handler(call): # call object as the argument
    190     global clips
    191     global ivrconfig
    192     global calls
    193     calls[call.call_id] = call # register the call in the list
    194     logevent('New incoming call from %s' % get_caller_addr(call))
    195     try:
    196         call.answer()
    197         authdone = True
    198         userid = '0000' # default for the unauthorized
    199         actionsallowed = '*'
    200         if ivrconfig['auth'] == True: # drop all permissions and prompt for the PIN
    201             authdone = False
    202             actionsallowed = {}
    203             playclips(ivrconfig['authpromptclips'], call)
    204         else: # prompt for the first command
    205             playclips(ivrconfig['cmdpromptclips'], call)
    206         cmdbuf = '' # command buffer
    207         cache_digit = None # in-band digit cache
    208         while call.state == CallState.ANSWERED: # main event loop
    209             audiobuf = call.read_audio(audio_buf_len, False) # nonblocking audio buffer read
    210             digit = call.get_dtmf() # get a single out-of-band DTMF digit
    211             if digit == '' and audiobuf != emptybuf: # no out-of-band digit, try in-band detection
    212                 ib_digit = detect_dtmf(audiobuf)
    213                 if ib_digit != cache_digit:
    214                     if ib_digit == None: # digit transmission ended
    215                         digit = cache_digit # save the digit
    216                         cache_digit = None  # reset the cache
    217                     else: # digit transmission started
    218                         cache_digit = ib_digit
    219             if digit == '#': # end of the command
    220                 if authdone: # we're authenticated, let's authorize the action
    221                     actionparts = cmdbuf.split('*')
    222                     actionroot = actionparts[0]
    223                     letthrough = False
    224                     if actionsallowed == '*' or (actionroot in actionsallowed):
    225                         letthrough = True
    226                     if letthrough: # authorized
    227                         if actionroot in ivrconfig['actions']: # command exists
    228                             actionmod = os.path.realpath(os.path.join(configroot, ivrconfig['actions'][actionroot])) # resolve the action module file
    229                             command_handler(actionparts, actionmod, call, userid) # pass control to the command handler along with the call instance
    230                         else: # command doesn't exist, notify the caller
    231                             playclips(ivrconfig['cmdfailclips'], call)
    232                             logevent('Attempt to execute a non-existing action %s with the user ID %s' % (cmdbuf, userid))
    233                     else: # notify the caller that the command doesn't exist and log the event
    234                         playclips(ivrconfig['cmdfailclips'], call)
    235                         logevent('Attempt to execute an unauthorized action %s with the user ID %s' % (cmdbuf, userid))
    236                     playclips(ivrconfig['cmdpromptclips'], call) # prompt for the next command
    237                     flush_input_audio(call)
    238                 else: # we expect the first command to be our user PIN
    239                     if cmdbuf in ivrconfig['users']: # PIN found, confirm auth and prompt for the command
    240                         authdone = True
    241                         userid = cmdbuf
    242                         actionsallowed = ivrconfig['users'][userid]
    243                         playclips(ivrconfig['cmdpromptclips'], call) # prompt for the next command
    244                     else: # PIN not found, alert the caller, log the failed entry and hang up
    245                         playclips(ivrconfig['authfailclips'], call)
    246                         logevent('Attempt to enter with invalid PIN %s' % cmdbuf)
    247                         hangup(call)
    248                 cmdbuf = '' # clear command buffer
    249             elif digit != '': # append the digit to the command buffer
    250                 cmdbuf += digit
    251         hangup(call)
    252     except InvalidStateError: # usually this means the call was hung up mid-action
    253         hangup(call)
    254     except SystemExit: # in case the service has been stopped or restarted
    255         hangup(call)
    256     except Exception as e:
    257         print('Unknown error: ', sys.exc_info())
    258         traceback.print_exc()
    259         hangup(call)
    260 
    261 # signal handler for graceful process termination
    262 
    263 def sighandler(signum, frame):
    264     global phone
    265     logevent('Stopping the SIP client...')
    266     phone.stop()
    267     logevent('SIP client stopped, bye!')
    268 
    269 # entry point
    270 
    271 if __name__ == '__main__':
    272     logevent('Starting %s' % progname)
    273     config = load_yaml(configfile)
    274     ivrconfig = config['ivr']
    275     logevent('Configuration loaded from %s' % configfile)
    276     clipDir = os.path.realpath(os.path.join(configroot, config['clips']['dir']))
    277     logevent('Loading static audio clips')
    278     clips = config['clips']['files']
    279     for k, fname in clips.items():
    280         clips[k] = load_audio(os.path.join(clipDir, fname))
    281     logevent('Rendering TTS phrases')
    282     for pname, phrase in config['tts']['phrases'].items():
    283         clips[pname] = tts_to_buf(phrase, config['tts'])
    284     logevent('Rendering DTMF clips')
    285     clips['dtmf'] = {}
    286     for digit, fpair in DTMF_TABLE.items():
    287         clips['dtmf'][digit] = gen_dtmf(fpair[0], fpair[1])
    288     logevent('All clips loaded to memory buffers from %s' % clipDir)
    289     logevent('Initializing SIP phone part')
    290     sip = config['sip']
    291     sipport = int(sip['port'])
    292     localname = socket.gethostname()
    293     localip = (([ip for ip in socket.gethostbyname_ex(localname)[2] if not ip.startswith('127.')] or [[(s.connect((sip['host'], sipport)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + [None])[0]
    294     if localip == None:
    295         localip = socket.gethostbyname(localname)
    296     logevent('Local IP detected: %s' % localip)
    297     phone = VoIPPhone(sip['host'], sipport, sip['username'], sip['password'], myIP=localip, rtpPortLow=int(sip['rtpPortLow']), rtpPortHigh=int(sip['rtpPortHigh']), callCallback=main_call_handler)
    298     # register the SIGINT and SIGTERM handlers to gracefully stop the phone instance
    299     signal.signal(signal.SIGINT, sighandler)
    300     signal.signal(signal.SIGTERM, sighandler)
    301     phone.start()
    302     logevent('SIP client started')