rcvd-py

Python 3 port of RCVD project
git clone git://git.luxferre.top/rcvd-py.git
Log | Files | Refs | README

rcvd.py (6655B)


      1 #!/usr/bin/env python3
      2 # rcvd.py: a port of RCVD.js library to Python 3
      3 # RCVD is a small library for syncing time on BLE-enabled Casio watches
      4 # This script also offers a standalone way of running the sync
      5 # Tested on: GW-B5600BC, GMW-B5000D, OCW-T200S, GM-B2100BD, GA-B001G, DW-B5600G
      6 # Created by Luxferre in 2022, ported to Python in 2024
      7 # Released into public domain
      8 
      9 import asyncio, time, ntplib
     10 from bleak import BleakScanner, BleakClient
     11 from bleak.exc import BleakDBusError
     12 
     13 # time fetching part
     14 # must return a dictionary with the following fields:
     15 # year, month, date, hour, minute, second, day (0-6)
     16 def fetchtime(params = {}):
     17   res = {}
     18   delta = 0.0 # in milliseconds
     19   if 'delta' in params:
     20     delta = float(params['delta']) / 1000
     21   if 'server' in params and params['server'] is not None: # NTP server set
     22     ntpver = 3
     23     if 'version' in params:
     24       ntpver = params['version']
     25     c = ntplib.NTPClient()
     26     resp = c.request(params['server'], version=ntpver)
     27     unixtm = resp.tx_time
     28   else: # use current system time by default
     29     unixtm = time.time()
     30   tm = time.localtime(unixtm + delta) # account for delta
     31   res['year'] = tm.tm_year
     32   res['month'] = tm.tm_mon
     33   res['date'] = tm.tm_mday
     34   res['hour'] = tm.tm_hour
     35   res['minute'] = tm.tm_min
     36   res['second'] = tm.tm_sec
     37   res['day'] = tm.tm_wday
     38   return res
     39 
     40 # device interaction part
     41 
     42 # generate full Casio-specific UUID from the short one
     43 def fullid(s):
     44   return '26eb00' + s + '-b012-49a8-b1f8-394fb2032b0f'
     45 
     46 # populate constants
     47 CASIO_SYNC_SRV = fullid('0d') # for service discovery
     48 READER_CHAR = fullid('2c')    # for parameter reading
     49 WRITER_CHAR = fullid('2d')    # for parameter writing
     50 
     51 # the queue for GATT notification data
     52 notiqueue = asyncio.Queue()
     53 
     54 # GATT notification handler
     55 def readfiller(sender, data):
     56   global notiqueue
     57   notiqueue.put_nowait(data)
     58 
     59 # execute a read command:
     60 # first write the value without response to READER_CHAR
     61 # then read the result from the WRITER_CHAR (!) notifier
     62 async def exec_read(client, *cmdargs):
     63   global notiqueue
     64   await client.write_gatt_char(READER_CHAR, bytes(cmdargs), response=False)
     65   res = await notiqueue.get()
     66   return res
     67 
     68 # execute a write command:
     69 # write to WRITER_CHAR with an immediate response
     70 async def exec_write(client, cmd: bytearray):
     71   res = await client.write_gatt_char(WRITER_CHAR, cmd, response=True)
     72   return res
     73 
     74 # handshake sequence, returns the watch model
     75 async def handshake(client):
     76   rawmodel = await exec_read(client, 0x23) # read the raw model
     77   model = rawmodel[1:].decode('utf-8').rstrip('\x00') # human-readable model
     78   features = await exec_read(client, 0x10) # read feature setting
     79   if features[8] < 2: # we entered full setting mode, sync app info first
     80     info = await exec_read(client, 0x22) # detect if BLE has been reset
     81     # it has been reset if it consists of 11 FF bytes and then 00
     82     if info[:12] == b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00':
     83       restorecmd = b'\x22\x34\x88\xf4\xe5\xd5\xaf\xc8\x29\xe0\x6d\x02'
     84       await exec_write(client, restorecmd)
     85   return model
     86 
     87 # presync property cycling required to init the time sync process
     88 # these parameters need to be read from the watch and then written back
     89 # the set of the parameters is model-specific
     90 # this function returns the final precycle list based on the mode
     91 def precycle_prop_list(model: str):
     92   # full property list for primary targets, GW-B/GMW-B models
     93   props = [[30,0], [30,1], [30,2], [30,3], [30,4], [30,5],
     94            [31,0], [31,1], [31,2], [31,3], [31,4], [31,5]]
     95   if model.find('OCW') > 5 or model.find('GWR-B1000') > 5:
     96     props = [[30,0], [30,1]]
     97   elif (model.find('B2100') > 5 or model.find('B001') > 5 \
     98         or model.find('DW-B5600') > 5 or model.find('GST-B') > 5 \
     99         or model.find('MSG-B') > 5 or model.find('ECB') > 5):
    100     props = [[30,0], [30,1], [31,0], [31,1]]
    101   return [[29,0], [29,2], [29,4]] + props # concatenate the prop lists
    102 
    103 # sync the watch from here
    104 async def dosync(client, timeparams):
    105   global notiqueue
    106   print('Starting handshake...')
    107   await client.start_notify(WRITER_CHAR, readfiller) # set up notifications
    108   model = await handshake(client) # run the handshake and detect watch model
    109   print('Connected to %s, preparing to sync...' % model)
    110   precycle_props = precycle_prop_list(model)
    111   for prop in precycle_props: # just read and write again
    112     data = await exec_read(client, prop[0], prop[1])
    113     await exec_write(client, data)
    114   print('Fetching the time...')
    115   timeparts = fetchtime(timeparams)
    116   print('Syncing the time...')
    117   timelist = [9, # sync time command
    118     timeparts['year'] & 255, timeparts['year'] >> 8,
    119     timeparts['month'], timeparts['date'],
    120     timeparts['hour'], timeparts['minute'], timeparts['second'],
    121     timeparts['day'], 0, 0, 1]
    122   try: # some models disconnect prematurely as soon as sync OK
    123     await exec_write(client, bytes(timelist))
    124   except (EOFError, BleakDBusError):
    125     pass
    126   print('Time synced')
    127 
    128 
    129 # the process starts from here
    130 # accepts fetchtime parameters and BLE scan timeout
    131 async def syncstart(timeparams):
    132   # list of IDs to scan
    133   devices = await BleakScanner.discover(timeout=timeparams['timeout'],
    134     return_adv=True,
    135     service_uuids=['00001804-0000-1000-8000-00805f9b34fb', CASIO_SYNC_SRV])
    136   for d in devices: # enumerate found watches
    137     async with BleakClient(str(d)) as client:
    138       await dosync(client, timeparams) # init the client by BLE MAC
    139 
    140 if __name__ == '__main__': # application entry point
    141   from argparse import ArgumentParser
    142   parser = ArgumentParser(description='RCVD: an opensource time synchronizer for BLE-enabled Casio watches', epilog='(c) Luxferre 2024 --- No rights reserved <https://unlicense.org>')
    143   parser.add_argument('-t', '--timeout', type=int, default=4, help='BLE scanning timeout (in seconds, default 4)')
    144   parser.add_argument('-d', '--delta', type=int, default=500, help='Manual delta correction (in ms, must be determined individually, 500 ms by default)')
    145   parser.add_argument('-s', '--ntp-server', type=str, default=None, help='NTP server to sync from (if not specified then will sync from the local system time)')
    146   parser.add_argument('-n', '--ntp-version', type=int, default=4, help='NTP protocol version to use (default 4)')
    147   args = parser.parse_args()
    148   params = { # populate parameters from the command line
    149     'server': args.ntp_server, 'version': args.ntp_version,
    150     'delta': args.delta, 'timeout': args.timeout
    151   }
    152   if params['version'] < 1 or params['version'] > 4:
    153     params['version'] = 4
    154   print('RCVD by Luxferre\nPress sync button on the watch ASAP to connect')
    155   asyncio.run(syncstart(params)) # start the process