fvx.py (13033B)
#!/usr/bin/env python3

# FrugalVox: experimental, straightforward, no-nonsense IVR framework on top of pyVoIP (patched) and TTS engines
# Created by Luxferre in 2023, released into public domain
# Deps: PyYAML, NumPy, espeak-ng/flite/libttspico, patched pyVoIP (see https://github.com/tayler6000/pyVoIP/issues/107#issuecomment-1440231926)
# All configuration is in config.yaml

import sys
import os
import signal
import tempfile
import yaml
import wave
import audioop
import time
from datetime import datetime  # for logging
import traceback  # for logging
import socket  # for local IP detection
import numpy as np  # for in-band DTMF detection and generation
import importlib.util  # for action modules import
from pyVoIP.VoIP import VoIPPhone, InvalidStateError, CallState

# global parameters
progname = 'FrugalVox v0.0.2'
config = {}  # placeholder for config object
configfile = './config.yaml'  # default config yaml path (relative to the workdir)
if len(sys.argv) > 1:
    configfile = sys.argv[1]
configfile = os.path.realpath(configfile)
kernelroot = os.path.realpath(os.path.dirname(__file__))  # absolute path to the kernel directory
configroot = os.path.dirname(configfile)
sys.path.append(kernelroot)  # make the kernel module findable
if configroot != kernelroot:
    sys.path.append(configroot)  # make the modules in configuration directory findable
audio_buf_len = 160  # analyze this amount of raw audio data bytes
emptybuf = b'\x80' * audio_buf_len
DTMF_TABLE = {
    '1': [1209, 697],
    '2': [1336, 697],
    '3': [1477, 697],
    'A': [1633, 697],
    '4': [1209, 770],
    '5': [1336, 770],
    '6': [1477, 770],
    'B': [1633, 770],
    '7': [1209, 852],
    '8': [1336, 852],
    '9': [1477, 852],
    'C': [1633, 852],
    '*': [1209, 941],
    '0': [1336, 941],
    '#': [1477, 941],
    'D': [1633, 941]
}
ivrconfig = None  # placeholder for IVR auth config
calls = {}  # placeholder for all realtime call instances
clips = {}  # placeholder for the audio clip buffers (filled in by the entry point)


# helper methods

def logevent(msg):
    """Print `msg` to stdout prefixed with the current local timestamp."""
    dts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('[%s] %s' % (dts, msg))


def load_audio(fname):
    """Load a WAV PCM file and return it as 8-bit unsigned mono bytes.

    Multi-channel input is mixed down to mono. Input faster than 8 kHz is
    decimated by keeping every k-th frame (only integer multiples of 8 kHz
    are supported), and samples wider than one byte are reduced to 8 bits
    and re-biased around 128.
    """
    outrate = 8000
    with wave.open(fname, 'rb') as f:  # closes the file even if reading fails
        aparams = f.getparams()
        data = f.readframes(aparams.nframes)
    channels = aparams.nchannels
    inrate = aparams.framerate
    swidth = aparams.sampwidth
    if channels > 1:  # convert to mono
        data = audioop.tomono(data, swidth, 0.5, 0.5)
    if inrate > outrate or swidth > 1:  # convert the sample rate and bit width at the same time
        # Byte distance between the frames we keep. Clamped to one frame so
        # that input slower than 8 kHz does not yield a zero step.
        step = max(1, inrate // outrate) * swidth
        bfactor = 1 << ((swidth << 3) - 8)  # divisor that brings a sample down to 8 bits
        out = bytearray()
        for i in range(0, len(data) - swidth + 1, step):
            if swidth == 1:
                bval = data[i]  # 8-bit WAV samples are already unsigned
            else:
                bval = int.from_bytes(data[i:i + swidth], byteorder='little', signed=True)
                bval = int(round(bval / bfactor)) + 128  # bit reduction + unsigned bias
                bval = min(bval, 255)  # handle clipping
            out.append(bval)
        data = bytes(out)
    return data


def load_yaml(fname):
    """Load and return an object from a YAML file (safe loader)."""
    with open(fname, 'r') as yf:
        return yaml.safe_load(yf.read())


def tts_to_file(text, fname, conf):
    """Render `text` to the audio file `fname` using the `conf['cmd']` template.

    The template receives `(fname, text)` through %-formatting and is run
    by the system shell.
    """
    # NOTE(review): the command line is built by plain string formatting and
    # executed through the shell, so `text` must only come from a trusted
    # source (here: the phrases in the configuration file).
    ecmd = conf['cmd'] % (fname, text)
    os.system(ecmd)  # render to the temporary file


def tts_to_buf(text, conf):
    """Render `text` with the TTS engine and return the audio as a buffer."""
    fh, fname = tempfile.mkstemp('.wav', 'fvx-')
    os.close(fh)
    try:
        tts_to_file(text, fname, conf)
        return load_audio(fname)
    finally:
        os.remove(fname)  # never leave the temporary file behind


def gen_dtmf(f1, f2):
    """Render two summed sine frequencies to an 8-bit unsigned buffer.

    The duration (0.2 s) and the sample rate (8 kHz) are hardcoded.
    """
    nbuf = np.arange(0, 0.2, 1 / 8000)  # sample instants of the target signal
    return (127 + 61.44 * (np.sin(2 * np.pi * f1 * nbuf) + np.sin(2 * np.pi * f2 * nbuf))).astype(np.ubyte).tobytes()


def get_caller_addr(call):
    """Extract the caller's SIP address from the call request headers."""
    return call.request.headers['From']['address']


def get_callee_addr(call):
    """Extract the destination SIP address from the call request headers."""
    return call.request.headers['To']['address']


def flush_input_audio(call):
    """Clear the call's RTP input buffer by reading and discarding it."""
    for _ in range(625):  # because 625 * 160 = 100000 (pyVoIP's internal buffer size)
        call.read_audio(audio_buf_len, False)


def playbuf(buf, call):
    """Play an audio buffer on the call and wait for its duration."""
    blen = len(buf) / 8000  # seconds: one byte per sample at 8 kHz
    call.write_audio(buf)
    time.sleep(blen)


def playclips(clipset, call):
    """Play the named clips from the global `clips` table, in order."""
    for clipname in clipset:
        playbuf(clips[clipname], call)


def hangup(call):
    """Unregister the call, hang it up (if still possible) and log it."""
    if call.call_id in calls:
        del calls[call.call_id]
    try:
        call.hangup()
    except InvalidStateError:  # the call has already been terminated
        pass
    logevent('Call with %s terminated' % get_caller_addr(call))


# in-band DTMF detector

def isNumberInArray(array, number):
    """Tell whether `array` holds any integer in [number - 5, number + 5)."""
    offset = 5
    return any(i in array for i in range(number - offset, number + offset))


def detect_dtmf(buf, rate=8000):
    """Detect a DTMF digit in an 8-bit unsigned audio buffer using FFT.

    `rate` is the sample rate of `buf` in Hz. Returns the detected digit
    as a string, or None when no DTMF frequency pair stands out.

    FFT bin `i` of an N-sample buffer corresponds to `i * rate / N` Hz, so
    the bins are converted to Hz before being matched against DTMF_TABLE
    (for a one-second buffer the bin index and the frequency coincide).

    NOTE(review): the main loop feeds this function 160-byte frames. At
    8 kHz that gives 50 Hz wide bins, which the +/-5 Hz matching window of
    isNumberInArray cannot hit for the high-group tones, and the
    `20 * average` threshold is dominated by the DC term of the biased
    signal. In-band detection therefore needs a longer analysis window (or
    a Goertzel-style detector) to be effective - confirm against real RTP
    audio before tuning.
    """
    data = np.frombuffer(buf, dtype=np.uint8)
    nsamples = len(data)
    if nsamples == 0:  # nothing to analyze (np.fft.fft rejects empty input)
        return None
    magnitudes = np.floor(np.absolute(np.fft.fft(data)))
    lb = 20 * np.average(magnitudes)  # lower bound for filtering
    peak_bins = np.flatnonzero(magnitudes > lb)
    freqs = set(np.rint(peak_bins * rate / nsamples).astype(int).tolist())
    for d, fpair in DTMF_TABLE.items():  # detect and return the digit
        if isNumberInArray(freqs, fpair[0]) and isNumberInArray(freqs, fpair[1]):
            return d
    return None


# IVR command handler (for authenticated and authorized action runs)

def command_handler(act, modulefile, call, userid):
    """Load the action module from `modulefile` and run the action.

    `act` is the command split into parts: the action ID followed by its
    parameters. The module is executed anew on every invocation and must
    expose `run_action(actid, params, call, userid, config, clips, calls)`.
    """
    actid = act[0]
    params = act[1:]
    logevent('Running action %s from the module %s with params (%s)' % (actid, modulefile, ', '.join(params)))
    (modname, ext) = os.path.splitext(os.path.basename(modulefile))
    spec = importlib.util.spec_from_file_location(modname, modulefile)
    actionmodule = importlib.util.module_from_spec(spec)
    sys.modules[modname] = actionmodule
    spec.loader.exec_module(actionmodule)
    actionmodule.run_action(actid, params, call, userid, config, clips, calls)


# main call handler

def main_call_handler(call):
    """Serve one incoming call: authenticate the caller and run commands.

    Digits are collected into a command buffer until `#` is received. When
    authentication is enabled, the first command must be a known PIN; every
    later command has the form `action*param*param...#`.
    """
    calls[call.call_id] = call  # register the call in the list
    logevent('New incoming call from %s' % get_caller_addr(call))
    try:
        call.answer()
        authdone = True
        userid = '0000'  # default for the unauthorized
        actionsallowed = '*'
        if ivrconfig['auth'] == True:  # drop all permissions and prompt for the PIN
            authdone = False
            actionsallowed = {}
            playclips(ivrconfig['authpromptclips'], call)
        else:  # prompt for the first command
            playclips(ivrconfig['cmdpromptclips'], call)
        cmdbuf = ''  # command buffer
        cache_digit = None  # in-band digit cache
        while call.state == CallState.ANSWERED:  # main event loop
            audiobuf = call.read_audio(audio_buf_len, False)  # nonblocking audio buffer read
            digit = call.get_dtmf()  # get a single out-of-band DTMF digit
            if digit == '' and audiobuf != emptybuf:  # no out-of-band digit, try in-band detection
                ib_digit = detect_dtmf(audiobuf)
                if ib_digit != cache_digit:
                    if ib_digit is None:  # digit transmission ended
                        digit = cache_digit  # save the digit
                        cache_digit = None  # reset the cache
                    else:  # digit transmission started
                        cache_digit = ib_digit
            if digit == '#':  # end of the command
                if authdone:  # we're authenticated, let's authorize the action
                    actionparts = cmdbuf.split('*')
                    actionroot = actionparts[0]
                    letthrough = actionsallowed == '*' or (actionroot in actionsallowed)
                    if letthrough:  # authorized
                        if actionroot in ivrconfig['actions']:  # command exists
                            # resolve the action module file
                            actionmod = os.path.realpath(os.path.join(configroot, ivrconfig['actions'][actionroot]))
                            # pass control to the command handler along with the call instance
                            command_handler(actionparts, actionmod, call, userid)
                        else:  # command doesn't exist, notify the caller
                            playclips(ivrconfig['cmdfailclips'], call)
                            logevent('Attempt to execute a non-existing action %s with the user ID %s' % (cmdbuf, userid))
                    else:  # notify the caller that the command doesn't exist and log the event
                        playclips(ivrconfig['cmdfailclips'], call)
                        logevent('Attempt to execute an unauthorized action %s with the user ID %s' % (cmdbuf, userid))
                    playclips(ivrconfig['cmdpromptclips'], call)  # prompt for the next command
                    flush_input_audio(call)
                else:  # we expect the first command to be our user PIN
                    if cmdbuf in ivrconfig['users']:  # PIN found, confirm auth and prompt for the command
                        authdone = True
                        userid = cmdbuf
                        actionsallowed = ivrconfig['users'][userid]
                        playclips(ivrconfig['cmdpromptclips'], call)  # prompt for the next command
                    else:  # PIN not found, alert the caller, log the failed entry and hang up
                        playclips(ivrconfig['authfailclips'], call)
                        logevent('Attempt to enter with invalid PIN %s' % cmdbuf)
                        break  # the hangup below terminates the call exactly once
                cmdbuf = ''  # clear command buffer
            elif digit != '':  # append the digit to the command buffer
                cmdbuf += digit
        hangup(call)
    except (InvalidStateError, SystemExit):
        # InvalidStateError usually means the call was hung up mid-action;
        # SystemExit means the service has been stopped or restarted
        hangup(call)
    except Exception:
        print('Unknown error: ', sys.exc_info())
        traceback.print_exc()
        hangup(call)


# signal handler for graceful process termination

def sighandler(signum, frame):
    """Stop the SIP client on SIGINT/SIGTERM."""
    logevent('Stopping the SIP client...')
    phone.stop()
    logevent('SIP client stopped, bye!')


# local IP detection

def detect_local_ip(host, port):
    """Return the local IP address to advertise to the SIP server.

    Prefers the first non-loopback address the local hostname resolves to.
    Otherwise takes the source address of a UDP socket connected to
    (host, port), and as the last resort whatever the hostname resolves to.
    """
    localname = socket.gethostname()
    try:
        addrs = socket.gethostbyname_ex(localname)[2]
    except OSError:  # the local hostname is not resolvable
        addrs = []
    for ip in addrs:
        if not ip.startswith('127.'):
            return ip
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect((host, port))  # only selects a route/source address
            return s.getsockname()[0]
    except OSError:
        return socket.gethostbyname(localname)


# entry point

if __name__ == '__main__':
    logevent('Starting %s' % progname)
    config = load_yaml(configfile)
    ivrconfig = config['ivr']
    logevent('Configuration loaded from %s' % configfile)
    clipDir = os.path.realpath(os.path.join(configroot, config['clips']['dir']))
    logevent('Loading static audio clips')
    clips = config['clips']['files']
    for k, fname in clips.items():  # replace every file name with its audio data
        clips[k] = load_audio(os.path.join(clipDir, fname))
    logevent('Rendering TTS phrases')
    for pname, phrase in config['tts']['phrases'].items():
        clips[pname] = tts_to_buf(phrase, config['tts'])
    logevent('Rendering DTMF clips')
    clips['dtmf'] = {}
    for digit, fpair in DTMF_TABLE.items():
        clips['dtmf'][digit] = gen_dtmf(fpair[0], fpair[1])
    logevent('All clips loaded to memory buffers from %s' % clipDir)
    logevent('Initializing SIP phone part')
    sip = config['sip']
    sipport = int(sip['port'])
    localip = detect_local_ip(sip['host'], sipport)
    logevent('Local IP detected: %s' % localip)
    phone = VoIPPhone(sip['host'], sipport, sip['username'], sip['password'], myIP=localip,
                      rtpPortLow=int(sip['rtpPortLow']), rtpPortHigh=int(sip['rtpPortHigh']),
                      callCallback=main_call_handler)
    # register the SIGINT and SIGTERM handlers to gracefully stop the phone instance
    signal.signal(signal.SIGINT, sighandler)
    signal.signal(signal.SIGTERM, sighandler)
    phone.start()
    logevent('SIP client started')