Integrating With FlowCloud

All of this so far has been fine and dandy, but you're hardly going to want to be fiddling with the Ci20 every time you want to operate a Bluetooth lightbulb. Wouldn't you much rather control it from your personal computer, or perhaps a smartphone? To that end, we will now work on a way of integrating our Bluetooth device with FlowCloud.

To make things as generic as possible, we'll use FlowCloud's asynchronous messaging API to send commands in XML format to the Ci20, which will interpret them and pass them on to the device. We'll also provide a way for the sender to subscribe to notifications/indications on a handle, and then forward those notifications as replies to the subscription message. Install FlowCloud like so (the sudo password for the Ci20 is 'ci20'):

pushd /etc/apt/sources.list.d/
sudo wget http://deb.flowworld.com/flowsdk.list
popd
sudo apt-get update
sudo apt-get install libflowcore-python libflowmessaging-python

Note: the Flow Python API currently only works with Python 2.7.

Basics

Let's start by writing a very basic Flow script:

#!/usr/bin/env python

# third party
from libflowcore import *
from libflowmessaging import *

FLOW_SERVER_URL = 'http://ws-uat.flowworld.com' 
FLOW_SERVER_KEY = 'Ph3bY5kkU4P6vmtT'
FLOW_SERVER_SECRET = 'Sd1SVBfYtGfQvUCR'

# step 1: Initialise Flow libraries, connect to server
def initialise_flow():
    """Initialises Flow libraries and then connects to the Flow server.

    Initialises LibFlowCore and LibFlowMessaging, then connects to the Flow
    server specified in the config. If any of these steps fails, ensures 
    that LibFlowCore and LibFlowMessaging are shutdown properly.

    Returns:
        bool: Whether or not initialisation was successful.
    """
    result = False

    # Get server details
    url = FLOW_SERVER_URL
    key = FLOW_SERVER_KEY
    secret = FLOW_SERVER_SECRET

    if FlowCore_Initialise():
        FlowCore_RegisterTypes()

        if FlowMessaging_Initialise():
            FlowMessaging_RegisterTypes()

            if FlowClient_ConnectToServer(url, key, secret, 1):
                result = True
            else:
                FlowMessaging_Shutdown()
                FlowCore_Shutdown()
                print('Failed to connect to FlowCloud, error code %s'
                      % FlowThread_GetLastError())
        else:
            FlowCore_Shutdown()
            print('LibFlowMessaging initialisation failed, error code %s'
                  % FlowThread_GetLastError())
    else:
        print('LibFlowCore initialisation failed, error code %s'
              % FlowThread_GetLastError())

    return result

# last step: Shut down Flow
def shutdown_flow():
    """Shuts down the Flow libraries."""
    FlowMessaging_Shutdown()
    FlowCore_Shutdown()

def main():
    result = False
    if initialise_flow():
        memory_manager = FlowMemoryManager_New()

        try:
            # meat goes here
            result = True
        finally:
            FlowClearMemory(memory_manager)
            shutdown_flow()

    return result

if __name__ == '__main__':
    main()

This script does the bare minimum necessary for us to start working with Flow: initialise the libraries, connect to the Flow server, create a memory manager (although why this is necessary in Python is quite beyond me), then free the memory manager and shut down the Flow libraries. How boring!

Logging / config set up

Let's improve things using logging and configparser. Note: docstrings that have not changed are omitted - pay attention to this if copy/pasting the code below.

#!/usr/bin/env python

# standard library
import logging
import ConfigParser
from collections import OrderedDict  # Used as types for configs
import os                            # Used for os.linesep in default config

# third party
from libflowcore import *
from libflowmessaging import *


CONFIG_FILE = 'config.cnf'  # location of configuration file
DEFAULT_CONFIG = OrderedDict(
    [ ('FlowServer', OrderedDict( 
        [ ('url', 'http://ws-uat.flowworld.com')
        , ('key', 'Ph3bY5kkU4P6vmtT')
        , ('secret', 'Sd1SVBfYtGfQvUCR')
        ]
       ))
    , ('Logging', OrderedDict(
        [ ('level', 'info')
        , ('format', ('%(asctime)s %(name)s.%(funcName)s:%(lineno)d'
                      + os.linesep + '%(levelname)s: %(message)s'))
        ]
       ))
    ]
)

_logger = logging.getLogger(__name__)


def get_config(config_file):
    """Gets the configuration.

    If `config_file` does not exist, we get the default configuration and
    write a new configuration file at that path.

    Args:
        config_file (str|bytes): the path to the configuration file

    Returns:
        collections.OrderedDict: A nested dictionary {str:{str:str}} matching
            the configuration in `config_file`. For example, the value of
            option 'url' in section 'FlowServer' can be accessed using
            returned['FlowServer']['url'].
    """
    _logger.debug('Fetching config from %s' % config_file)
    config = OrderedDict()
    parser = ConfigParser.RawConfigParser()

    try:
        # get config from file
        with open(config_file, 'r') as config_file_object:
            # readfp() parses an open file; read() expects file names
            parser.readfp(config_file_object)

    except IOError as e:
        if e.errno == 2:  # File not found
            # write default config
            write_config(DEFAULT_CONFIG, config_file)
            # load default config
            _logger.debug('File not found, loaded default config.')
            return DEFAULT_CONFIG
        else:  # not a clue
            raise e

    # copy config into dictionary
    for section in parser.sections():
        config[section] = OrderedDict(parser.items(section))

    _logger.debug('Config fetch successful.')
    return config


def write_config(config, config_file):
    """Writes the configuration to file.

    Args:
        config (collections.OrderedDict): the configuration to save, in the
            format {str:{str:str}} ({section:{option:value}}).
        config_file (str|bytes): the path to the config file.
    """
    _logger.debug('Writing config to %s' % config_file)
    parser = ConfigParser.RawConfigParser()

    # load config into parser
    for section in config.keys():
        parser.add_section(section)
        for option, value in config[section].items():
            parser.set(section, option, value)

    # write config
    with open(config_file, 'w') as config_file_object:
        parser.write(config_file_object)

    _logger.debug('Config write successful.')


# step 1: Initialise Flow libraries, connect to server
def initialise_flow():
    """[...]"""
    result = False

    # fetch config
    config = get_config(CONFIG_FILE)
    url = config['FlowServer']['url']
    key = config['FlowServer']['key']
    secret = config['FlowServer']['secret']

    if FlowCore_Initialise():
        _logger.debug('FlowCore initialised successfully.')
        FlowCore_RegisterTypes()

        if FlowMessaging_Initialise():
            _logger.debug('FlowMessaging initialised successfully.')
            FlowMessaging_RegisterTypes()

            if FlowClient_ConnectToServer(url, key, secret, 1):
                _logger.debug('Connected to server at %s' % url)
                result = True
            else:
                FlowMessaging_Shutdown()
                FlowCore_Shutdown()
                _logger.error('Failed to connect to server at %s, error %s'
                             % (url, FlowThread_GetLastError()))
        else:
            FlowCore_Shutdown()
            _logger.error('LibFlowMessaging initialisation failed, error %s'
                         % FlowThread_GetLastError())
    else:
        _logger.error('LibFlowCore initialisation failed, error %s'
                     % FlowThread_GetLastError())

    return result

# last step: Shut down Flow
def shutdown_flow():
    """[...]"""
    _logger.debug('Shutting down the Flow libraries.')
    FlowMessaging_Shutdown()
    FlowCore_Shutdown()

def main():
    result = False
    _logger.info('Initialising Flow libraries...')
    if initialise_flow():
        _logger.info('Initialisation successful.')
        memory_manager = FlowMemoryManager_New()

        try:
            # meat goes here
            result = True
        finally:
            FlowClearMemory(memory_manager)
            _logger.info('Shutting down Flow...')
            shutdown_flow()

    return result

if __name__ == '__main__':
    # set up _logger
    def _logger_setup(config):
        section = config['Logging']
        log_level_dict = { 'debug': logging.DEBUG
                         , 'info': logging.INFO
                         , 'warning': logging.WARNING
                         , 'error': logging.ERROR
                         , 'critical': logging.CRITICAL
                         }
        log_level = log_level_dict[section['level'].lower()]
        log_format = section['format']
        root = logging.getLogger()
        root.setLevel(log_level)
        root.handlers = []
        formatter = logging.Formatter(log_format)
        if section.get('logfile', None):
            fh = logging.FileHandler(section['logfile'])
            fh_log_level = section.get('logfile_level', None)
            fh_log_level = log_level_dict[fh_log_level.lower()] \
                    if fh_log_level else log_level
            fh.setLevel(fh_log_level)
            fh_formatter = section.get('logfile_format', None)
            fh_formatter = logging.Formatter(fh_formatter) \
                    if fh_formatter else formatter
            fh.setFormatter(fh_formatter)
            root.addHandler(fh)
        if section.get('stream', '1').lower() in {'1', 'yes', 'true', 'on'}:
            sh = logging.StreamHandler()
            sh_log_level = section.get('stream_level', None)
            sh_log_level = log_level_dict[sh_log_level.lower()] \
                    if sh_log_level else log_level
            sh.setLevel(sh_log_level)
            sh_formatter = section.get('stream_format', None)
            sh_formatter = logging.Formatter(sh_formatter) \
                    if sh_formatter else formatter
            sh.setFormatter(sh_formatter)
            root.addHandler(sh)

    _logger_setup(DEFAULT_CONFIG)
    _logger_setup(get_config(CONFIG_FILE))

    main()

That may look like a lot of code for little benefit, but it will make our lives a lot easier later on. For instance, even though we're still defining the default configuration in a module-level variable, we can override it by editing the config file, which will be created as config.cnf in the working directory when we first run the script. Note that we set up the logger twice, once with the default configuration and once with the loaded configuration - this is because we've used the logger in our get_config function. When it is created (if we do not create it ourselves), our config file will look like this (the comments and the optional keys are annotations showing what can be set; the script itself only writes the FlowServer and Logging sections):

[FlowServer]
url = http://ws-uat.flowworld.com      ; or some other valid url
key = Ph3bY5kkU4P6vmtT                 ; or some other valid key
secret = Sd1SVBfYtGfQvUCR              ; or some other valid secret

[Logging]
; level may be one of [debug|info|warning|error|critical]
level = info                           ; messages below this level log to
                                       ; neither stream nor file
format = %(asctime)s %(name)s.%(funcName)s:%(lineno)d
    %(levelname)s: %(message)s         ; see Python logging docs

; optional 
stream = bool                          ; whether or not to log to stream
stream_level = info                    ; minimum level for stream
stream_format = [some valid format]    ; format for stream messages
logfile = path                         ; file to log output to
logfile_level = error                  ; minimum level for file logs
logfile_format = [some valid format]   ; format for file messages

We'll add to this file as we find more things we might want to configure; this will do us for now.
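
To make the "file overrides defaults" idea concrete, here is a minimal, self-contained sketch. It is not the script's own get_config (which uses the file wholesale once it exists); instead a hypothetical load_with_defaults helper overlays whatever the file sets on top of in-code defaults. The sample values are made up, and the import shim is only there so the snippet also runs on Python 3, where the module is called configparser.

```python
from collections import OrderedDict
import os
import tempfile

try:
    import ConfigParser as configparser  # Python 2.7, as used in this post
except ImportError:
    import configparser                  # Python 3 name for the same module

# Hypothetical defaults, standing in for DEFAULT_CONFIG
DEFAULTS = OrderedDict([
    ('Logging', OrderedDict([('level', 'info')])),
])


def load_with_defaults(path, defaults):
    """Return a copy of `defaults` overlaid with the options set in `path`."""
    merged = OrderedDict((s, OrderedDict(o)) for s, o in defaults.items())
    parser = configparser.RawConfigParser()
    parser.read(path)  # read() takes a path and silently skips missing files
    for section in parser.sections():
        merged.setdefault(section, OrderedDict()).update(parser.items(section))
    return merged


# Write a small config file that overrides one option and adds another
handle, path = tempfile.mkstemp(suffix='.cnf')
with os.fdopen(handle, 'w') as f:
    f.write('[Logging]\nlevel = debug\nlogfile = flow.log\n')

config = load_with_defaults(path, DEFAULTS)
os.remove(path)
print(config['Logging']['level'])    # value taken from the file
print(config['Logging']['logfile'])  # option that only exists in the file
```

Merging like this means a hand-edited file only has to mention the options it wants to change; whether that is preferable to the all-or-nothing behaviour of get_config is a matter of taste.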

Logging in to Flow as a device

Up until now, we've been skirting around a much larger problem: our script doesn't do anything useful or interesting! Let's fix that by logging in as a device:

#!/usr/bin/env python

# standard library
import logging
import ConfigParser
from collections import OrderedDict  # Used as type for configs
import os                            # Used for os.linesep in default config

# third party
from libflowcore import *
from libflowmessaging import *

...

# step 1: Initialise Flow libraries, connect to server.
def initialise_flow():
    """[...]"""
    ...

# step 2: Login as device
def register_device():
    """Registers (logs in as) a Flow device, based on user input.

    Returns:
        bool: whether or not registration was successful.
    """
    result = False
    device_type = raw_input('Please input device type: ')
    address = raw_input('Please input address: ')
    serial_number = raw_input('Please input serial number: ')
    software_version = raw_input('Please input software version: ')
    name = raw_input('Please input name: ')
    registration_token = raw_input('Please input registration token: ')

    _logger.debug('Attempting to login as device.')
    if FlowClient_LoginAsDevice(
        device_type, address, serial_number, None,
        software_version, name, registration_token
      ):
        result = True
        _logger.debug('Device login successful.')
    else:
        _logger.error('Device login failed, error code: %d'
                     % FlowThread_GetLastError())

    return result

...

def main():
    result = False
    _logger.info('Initialising Flow libraries...')
    if initialise_flow():
        _logger.info('Initialisation successful.')
        memory_manager = FlowMemoryManager_New()

        try:
            if register_device():
                _logger.info('Logged in as device.') 
                # more meat please
                result = True
        finally:
            FlowClearMemory(memory_manager)
            _logger.info('Shutting down Flow...')
            shutdown_flow()

    return result

if __name__ == '__main__':
    ...

Now when we run the script, we'll be prompted for various bits of device information, and if they're all valid, we'll be told the login was successful. Progress! You'll need to set your device type here (you'll probably want ci20), and get a corresponding registration token. Set the address parameter to the MAC address of the peripheral you're using, sans colons, with letters in upper case. The other parameters can be set arbitrarily, as long as you're consistent.
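
If you'd rather not reformat the MAC address by hand, a couple of string methods will do it. This is just a sketch: normalise_mac is a hypothetical helper, and the address shown is a made-up example rather than a real device.

```python
def normalise_mac(mac):
    """Strip the colons from a MAC address and upper-case the hex digits."""
    return mac.replace(':', '').upper()


# Made-up example address
print(normalise_mac('00:1a:7d:da:71:13'))  # prints 001A7DDA7113
```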

Wouldn't it be easier not to have to specify these by hand every time we run the script? I'm glad you agree. Let's build them into the config.

...

# step 2: Login as device.
def register_device():
    """
    [...]
    """
    result = False
    write_needed = False
    config = get_config(CONFIG_FILE)
    # ensure there's a Device section in the config
    if not config.get('Device', None):
        config['Device'] = OrderedDict()

    parameters = ['device_type', 'address', 'serial_number',
                  'software_version', 'name', 'registration_token']

    for p in parameters:
        if not config['Device'].get(p, None): 
            write_needed = True
            config['Device'][p] = raw_input('Please input %s: '
                                            % p.replace('_', ' '))

    args = [config['Device'][p] for p in parameters]
    args.insert(3, None)

    _logger.debug('Attempting to login as device.')
    if FlowClient_LoginAsDevice(*args):
        result = True
        _logger.debug('Device login successful.')
        if write_needed and \
          raw_input('Save device details? [Y/n] ').lower() in {'', 'y', 'yes'}:
            write_config(config, CONFIG_FILE)
            _logger.debug('Wrote new device parameters to config file.')
    else:
        _logger.error('Device login failed, error code: %d'
                     % FlowThread_GetLastError())

    return result

...

Now the first time we run the script, if the device login is successful, it will add the device parameters to the config file. Note that we're now passing FlowClient_LoginAsDevice its arguments by building a list (args) and then unpacking it in the function call - this is just for convenience, as it means we don't have to assign variable names to each parameter. The rest of this code should be self-explanatory; remember that dict.get(arg1, arg2) attempts to use arg1 as a key to the dictionary, and returns arg2 if it fails.
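
If the list-building trick is unfamiliar, here is a small sketch of it in isolation. fake_login is a hypothetical stand-in for FlowClient_LoginAsDevice that simply echoes its arguments, and all of the device values are made up.

```python
from collections import OrderedDict


def fake_login(device_type, address, serial_number, placeholder,
               software_version, name, registration_token):
    """Hypothetical stand-in for the real login call; echoes its arguments."""
    return (device_type, address, serial_number, placeholder,
            software_version, name, registration_token)


parameters = ['device_type', 'address', 'serial_number',
              'software_version', 'name', 'registration_token']

# Made-up values; 'name' is deliberately missing
device = OrderedDict([
    ('device_type', 'ci20'), ('address', '001A7DDA7113'),
    ('serial_number', '1'), ('software_version', '0.1'),
    ('registration_token', 'TOKEN'),
])

# dict.get returns the fallback instead of raising KeyError
missing = [p for p in parameters if not device.get(p, None)]
print(missing)              # prints ['name']

device['name'] = 'my-ci20'  # the script would ask for this with raw_input
args = [device[p] for p in parameters]
args.insert(3, None)        # the fourth positional argument is passed as None
print(fake_login(*args))    # *args spreads the list over the seven parameters
```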

One other thing - we're logging in as a device, yet our function is called register_device. Why is this? In FlowCloud, the two are one and the same: registration is idempotent, so however many times a device (re)registers, it always gets the same ID back. Thus, we don't need separate functions for registering and logging in.

Listening for messages

Now that we've logged in as a device, we have a wealth of options open to us! We want to be able to receive and respond to asynchronous messages, so let's add a function to start the message listener:

...


# step 2: Login as device.
def register_device():
    """[...]"""
    ...

# step 3: Start the message listener.
def start_message_listener(callback):
    """Starts the message listener in a new thread.

    Args:
        callback (f(FlowMessagingMessage)): A callback function, which will
            be passed each received message.
    """
    _logger.debug('Starting message listener with callback %s'
                 % callback.__name__)
    Flow_SetMessageReceivedListenerForDevice(callback)

def print_message(message):
    """Prints the contents of a message to stdout.

    Args:
        message (FlowMessagingMessage): The message to print.
    """
    print(FlowMessagingMessage_GetContent(message))

...

def main():
    result = False
    _logger.info('Initialising Flow libraries...')
    if initialise_flow():
        _logger.info('Initialisation successful.')
        memory_manager = FlowMemoryManager_New()

        try:
            if register_device():
                _logger.info('Logged in as device.') 

                _logger.info('Starting message listener...')
                start_message_listener(print_message)
                raw_input('Press enter to stop waiting:' + os.linesep)
                result = True
        finally:
            FlowClearMemory(memory_manager)
            _logger.info('Shutting down Flow...')
            shutdown_flow()

    return result

if __name__ == '__main__':
    ...

That was easy! Now when we run the script, it listens for any messages sent to the registered device. Unfortunately, nothing is sending any messages to the device, so we can't check that it's working yet. One way we can fix that is by setting up device presence publishing information, so that we can send messages to the device using the MakeItFlow app. We'll have to pretend the Ci20 is a WiFire board, but I'm sure no one will mind. Note: if you don't have a smart phone, or other device capable of running the MakeItFlow app, feel free to skip this next bit.

Create a new file called presence.xml in the same directory as the Python script, and fill it with the following:

<?xml version="1.0" encoding="UTF-8"?>
<presence xmlns="urn:ietf:params:xml:ns:pidf"
    xmlns:wifire_starterapp="com.imgtec.flow"
    entity="sip:%(address)s@sip-uat.flowworld.com">

    <tuple id="standard">
        <status>
            <basic>open</basic>
        </status>
    </tuple>

    <tuple id="presence-info">
        <wifire_starterapp:pres-info>
            <datetime>%(datetime)s</datetime>
        </wifire_starterapp:pres-info>
    </tuple>

    <tuple id="network-status">
        <wifire_starterapp:net-status>
            <wifire_starterapp:ssid>IMG-Demo</wifire_starterapp:ssid>
            <wifire_starterapp:state>connected</wifire_starterapp:state>
            <wifire_starterapp:rssi_dbm>-100</wifire_starterapp:rssi_dbm>
        </wifire_starterapp:net-status>
    </tuple>

    <tuple id="board-health">
        <wifire_starterapp:mrf24w-status>
            <wifire_starterapp:status>ok</wifire_starterapp:status>
        </wifire_starterapp:mrf24w-status>

        <wifire_starterapp:wifire-status>
            <wifire_starterapp:status>ok</wifire_starterapp:status>
            <wifire_starterapp:uptime>0d:84h:23m:10s</wifire_starterapp:uptime>
        </wifire_starterapp:wifire-status>
    </tuple>

</presence>
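
The %(address)s and %(datetime)s markers in this file are ordinary Python %-format fields, to be filled in from a dictionary. As a rough sketch of how that substitution works (TEMPLATE is a cut-down, hypothetical version of the file, and fill_presence and the device ID are made up for illustration):

```python
import datetime

# A cut-down, hypothetical template using the same two placeholders
TEMPLATE = ('<presence entity="sip:%(address)s@sip-uat.flowworld.com">'
            '<datetime>%(datetime)s</datetime></presence>')


def fill_presence(template, address, when):
    """Substitute the address and a formatted timestamp into the template."""
    stamp = when.strftime('%Y-%m-%dT%H:%M:%SZ')
    return template % {'address': address, 'datetime': stamp}


example = fill_presence(TEMPLATE, 'my-device-id',
                        datetime.datetime(2015, 9, 15, 12, 14, 15))
print(example)
```

One thing to watch with this approach: any literal percent sign in the template has to be written as %% or the substitution will fail.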

Now define a new function in the Python script like so:

#!/usr/bin/env python

# standard library
import logging
import ConfigParser
from collections import OrderedDict  # Used as type for configs
import os                            # Used for os.linesep
import datetime                      # Used in device presence info
import time                          # Used to publish periodically
import threading                     # Used to publish in the background

# third party
from libflowcore import *
from libflowmessaging import *


CONFIG_FILE = 'config.cnf'  # location of configuration file
DEFAULT_INTERVAL = '14.5'
DEFAULT_CHECK_PERIOD = '0.01'
DEFAULT_CONFIG = OrderedDict(
    [ ('FlowServer', OrderedDict( 
        [ ('url', 'http://ws-uat.flowworld.com')
        , ('key', 'Ph3bY5kkU4P6vmtT')
        , ('secret', 'Sd1SVBfYtGfQvUCR')
        ]
       ))
    , ('Logging', OrderedDict(
        [ ('level', 'info')
        , ('format', ('%(asctime)s %(name)s.%(funcName)s:%(lineno)d'
                      + os.linesep + '%(levelname)s: %(message)s'))
        ]
       ))
    ]
)

...

# step 2: Login as device. 
def register_device():
    """[...]"""
    ...

# step 2.5: Publish device presence information for MakeItFlow app
def publish_device_presence(memory_manager):
    """Publishes the device presence so the MakeItFlow app will send messages
    to our listener.

    Args:
        memory_manager (FlowMemoryManager): The memory manager used to
            fetch the logged-in device.

    Returns:
        bool: Whether the publishing was successful
    """
    _logger.debug('Attempting to publish device presence...')
    result = False
    this_device = FlowClient_GetLoggedInDevice(memory_manager)
    address = FlowDevice_GetDeviceID(this_device)
    # the trailing 'Z' marks the timestamp as UTC, so use utcnow()
    datetime_ = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    with open('presence.xml', 'r') as presence_file:
        presence_info = presence_file.read()

    # substitute address/datetime into presence.xml
    presence_info %= {'address': address, 'datetime': datetime_}

    if FlowMessaging_Publish(
        FLOW_MESSAGING_TOPIC_DEVICE_PRESENCE, None, presence_info,
        len(presence_info), 300
      ):
        _logger.debug('Published device presence.')
        result = True
    else:
        _logger.error('Could not publish device presence, error %d'
                      % FlowThread_GetLastError())

    return result

...

def main():
    result = False
    _logger.info('Initialising Flow libraries...')
    if initialise_flow():
        _logger.info('Initialisation successful.')
        memory_manager = FlowMemoryManager_New()
        publish = True          # polled by presence_loop; set False to stop it
        presence_thread = None  # only created once we have logged in

        try:
            if register_device():
                _logger.info('Logged in as device.')

                # Periodically publish device presence
                _logger.info('Starting device presence publisher thread...')
                section = get_config(CONFIG_FILE).get('Presence',
                                                      OrderedDict())
                interval = float(section.get('interval', DEFAULT_INTERVAL))
                check_period = float(section.get('check_period',
                                                 DEFAULT_CHECK_PERIOD))

                def presence_loop():
                    i = interval + 1  # forces a publish on the first pass
                    while publish:
                        if i > interval:
                            publish_device_presence(memory_manager)
                            i = 0
                        time.sleep(check_period)
                        i += check_period

                presence_thread = threading.Thread(target=presence_loop)
                presence_thread.start()

                # Listen for messages
                _logger.info('Starting message listener...')
                start_message_listener(print_message)
                raw_input('Press enter to stop waiting:' + os.linesep)
                result = True

        finally:
            # stop the presence thread before freeing anything it uses
            publish = False
            if presence_thread is not None:
                presence_thread.join()
            FlowClearMemory(memory_manager)
            _logger.info('Shutting down Flow...')
            shutdown_flow()

    return result

if __name__ == '__main__':
    ...

Note: In main, we check for another (optional) section in the config file, where interval and check_period can be set - interval is the interval between device presence updates, and check_period is the time between each check of the publish variable to see if the presence thread should stop. We must make sure that the presence thread stops before we free the memory manager and shut down Flow, or we get a segmentation fault.
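
Polling a flag works, but the standard library's threading.Event is a tidier way to get the same stop-before-teardown guarantee. The sketch below is not part of the script: run_periodically is a hypothetical helper, and a list append stands in for the call that publishes device presence.

```python
import threading
import time


def run_periodically(action, interval, stop_event):
    """Call `action` every `interval` seconds until `stop_event` is set."""
    while not stop_event.is_set():
        action()
        # wait() returns as soon as the event is set, so stopping is prompt
        stop_event.wait(interval)


calls = []  # records each time the stand-in action runs
stop = threading.Event()
worker = threading.Thread(
    target=run_periodically,
    args=(lambda: calls.append(time.time()), 0.01, stop))
worker.start()

time.sleep(0.1)  # stand-in for waiting on the user to press enter
stop.set()       # ask the loop to finish
worker.join()    # block until it really has finished

# Only now would it be safe to free the memory manager and shut Flow down
print('publishes: %d, still running: %s' % (len(calls), worker.is_alive()))
```

Because join() only returns once the thread has exited, there is no need to guess how long to sleep before tearing things down.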

Now download the MakeItFlow app (iOS, Android) onto your smartphone, fire it up and log in. Run the script - your Ci20 (whatever you named it) should appear in the Connected Devices list of the MakeItFlow app with a green cloud next to it. Select it and press 'Interact with selected', then send a "CLEAR LED" message. Your Python script should receive the message and print something like the following:

<?xml version="1.0" encoding="UTF-8"?><command><sent type="datetime">2015-09-15
T12:14:15Z</sent><to>sip:11c53549-6d4e-43a1-ae50-aee022c30326@sip-uat.flowworld
.com</to><from>sip:blaine.rogers.imgtec.com@sip-uat.flowworld.com</from><client
id type="integer">2c7bf87a-3e7a-4aba-bdb2-5d4272d82c64</clientid type="integer"
><requestid type="integer">1</requestid type="integer"><details>CLEAR LED</deta
ils></command>

For testing purposes, it may also be handy to send messages from the command line. For this, we can borrow ex-async-message-send-user2device.py from the FlowCloud messaging howtos. When it asks for a Device ID, look it up on this page. If I use ex-async-message-send-user2device.py to send a message, it appears in our script's output as

hello hi

If you're planning on using ex-async-message-send-user2device.py, replace the SendAsyncMessageToDevice function with the following:

def SendAsyncMessageToDevice():
    import readline
    result = False
    memory_manager = FlowMemoryManager_New()
    if memory_manager:
        try: 
            loggedInUser = FlowClient_GetLoggedInUser(memory_manager)
            my_devices = FlowUser_RetrieveOwnedDevices(loggedInUser, 20)
            if my_devices is not None and FlowDevices_GetCount(my_devices) != 0:
                print("-----------------------\n"
                      "Please select a device:")
                for i in range(FlowDevices_GetCount(my_devices)):
                    device = FlowDevices_GetItem(my_devices, i)
                    print(" %d: %s" % (i, FlowDevice_GetDeviceName(device)))
                index = int(raw_input("\nYour choice: "))
                target = FlowDevices_GetItem(my_devices, index)
                print("-----------------------")
                if target:
                    deviceID = FlowDevice_GetDeviceID(target)
                    sendAgain = True
                    while sendAgain:
                        message = raw_input("\n\rPlease type a message: ")
                        Flow_SendMessageToDeviceAsync(
                            deviceID, "text/plain", message, len(message), 
                            100, MessageSendAsyncCallBack
                            )
                        result = True
                        if raw_input("\n\rSend another message Y/n?").lower() \
                          in {'n', 'no'}:
                            sendAgain = False
                else:
                    print('\n\rCouldn\'t fetch device with that index!')
            else:
                print('\n\rNo devices found! :(')
        finally:
            FlowClearMemory(memory_manager)

    return result

This will automatically fetch a list of valid devices from the server and let you select one by its index, meaning you don't have to copy-paste your device ID in each time you run it. It will also let you type in a message, rather than always sending "hi hello".

Acting on messages

We can now send messages to the Ci20! As yet, it doesn't do much when it receives a message; it just prints the contents to stdout. Let's do what we came here for, and use the messages we receive as instructions for communicating with a BTLE peripheral, using the library we built in the previous post. Before we start writing code, let's list the things we want to be able to do, then define an XML schema for them.

  1. Write data to a handle.
  2. Read data from a handle.
  3. Subscribe to notifications/indications on a handle.

<!--Sender to Device-->
<?xml version="1.0" encoding="UTF-8"?>
<bluetooth-le>
    <!--one or more of any of the following-->
    <write handle=hex value=bytes/>
    <read handle=hex/>
    <subscribe handle=hex type=["notification"|"indication"|"both"]/>
</bluetooth-le>

<!--Device reply on notification-->
<?xml version="1.0" encoding="UTF-8"?>
<bluetooth-le>
    <!--one or more of any of the following-->
    <read-success handle=hex value=bytes/>
    <notification handle=hex value=bytes/>
</bluetooth-le>

Where hex and bytes are strings of the form "[0-9A-Fa-f]{1,4}" and "([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}" respectively; that is, hex is a string of up to four hex digits, and bytes is a string of pairs of hex digits.

type   valid            invalid
hex    "0"              ""
       "0043"           "-y"
       "fFaE"           "01234"
bytes  "00"             ""
       "12 ab 2A"       "000 00 00 0"
       "F12e D34c B5"   "12 23 34 "

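As a quick sanity check on the two patterns, here is a sketch that validates handles and values and pulls the commands out of a sender-to-device message. It is only an illustration under assumptions: is_hex, is_bytes and parse_commands are hypothetical helpers (not the callback developed in this post), attribute values are quoted as real XML requires, and the sample message is made up.

```python
import re
import xml.etree.ElementTree as ET

# \Z anchors the match at the very end of the string
HEX_RE = re.compile(r'[0-9A-Fa-f]{1,4}\Z')
BYTES_RE = re.compile(r'([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}\Z')


def is_hex(text):
    """True if `text` is one to four hex digits."""
    return HEX_RE.match(text) is not None


def is_bytes(text):
    """True if `text` is pairs of hex digits, optionally space-separated."""
    return BYTES_RE.match(text) is not None


def parse_commands(xml_text):
    """Return (tag, attributes) pairs for each command in the message."""
    root = ET.fromstring(xml_text)
    if root.tag != 'bluetooth-le':
        raise ValueError('unexpected root element %r' % root.tag)
    commands = []
    for element in root:
        attrs = dict(element.attrib)
        if not is_hex(attrs.get('handle', '')):
            raise ValueError('bad handle in <%s>' % element.tag)
        if element.tag == 'write' and not is_bytes(attrs.get('value', '')):
            raise ValueError('bad value in <write>')
        commands.append((element.tag, attrs))
    return commands


# Made-up message following the sender-to-device format
message = ('<bluetooth-le>'
           '<write handle="002b" value="01 00"/>'
           '<read handle="0043"/>'
           '</bluetooth-le>')

for tag, attrs in parse_commands(message):
    print('%s %s' % (tag, sorted(attrs.items())))
```

Rejecting a malformed handle or value before anything is sent to the peripheral keeps bad input from travelling any further than the parser.
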
Now we're ready to improve our messaging callback:

#!/usr/bin/env python

# standard library
import logging
import ConfigParser
from collections import OrderedDict  # type for configs
import os                            # for os.linesep
import datetime                      # for device presence info
import time                          # publish periodically
import threading                     # publish in the background
import xml.etree.ElementTree as ET   # parse xml data from messages
import re                            # check handle/value formats
from functools import partial        # construct callbacks
import getpass                       # get sudo password without echoing

# third party
from libflowcore import *
from libflowmessaging import *

# package
from . import bluetooth


CONFIG_FILE = 'config.cnf'  # location of configuration file
DEFAULT_INTERVAL = '14.5'
DEFAULT_CHECK_PERIOD = '0.01'
DEFAULT_CONNECT_TIMEOUT = '3.0'
DEFAULT_CONFIG = OrderedDict(
    [ ('FlowServer', OrderedDict( 
        [ ('url', 'http://ws-uat.flowworld.com')
        , ('key', 'Ph3bY5kkU4P6vmtT')
        , ('secret', 'Sd1SVBfYtGfQvUCR')
        ]
       ))
    , ('Logging', OrderedDict(
        [ ('level', 'info')
        , ('format', ('%(asctime)s %(name)s.%(funcName)s:%(lineno)d'
                      + os.linesep + '%(levelname)s: %(message)s'))
        ]
       ))
    ]
)


_logger = logging.getLogger(__name__)


class _SenderInfo(object):
    """Wrapper for sender information.

    Attributes:
        id (FlowID): The FlowID of the sender.
        func (f(FlowID, str, str, int, int, f(bool))): The function to use
            when replying to this sender.
    """
    def __init__(self, type_, id_):
        """Initialises the object.

        Args:
            type_ (str): 'device' or 'user' only - the type of the sender.
            id_ (FlowID|str): The FlowID of the sender.

        Raises:
            ValueError: If `type_` is neither 'user' nor 'device'.
        """
        self.id = id_
        if type_ == 'device':
            self.func = Flow_SendMessageToDeviceAsync
        elif type_ == 'user':
            self.func = Flow_SendMessageToUserAsync
        else:
            raise ValueError("type_ must be 'user' or 'device'")

    @classmethod
    def from_message(cls, message):
        """Retrieves sender information from a FlowMessagingMessage.

        Args:
            message (FlowMessagingMessage): The message to inspect.

        Raises:
            ValueError: In the highly unlikely event that `message` has
            neither a user nor a device ID.
        """
        user_id = FlowMessagingMessage_GetSenderUserID(message)
        device_id = FlowMessagingMessage_GetSenderDeviceID(message)
        if user_id: return cls('user', user_id)
        elif device_id: return cls('device', device_id)
        else: raise ValueError('message sent by neither user nor device??')

    def send_reply(self, reply_text, callback=None):
        """Sends a text message to self.id

        Args:
            reply_text (str): The text to send.
            callback (f(bool)|None): The callback for the send function. If
                None, self.message_callback will be used.
        """
        if callback is None: callback = self.message_callback
        self.func(
            self.id, 'text/plain', reply_text,
            len(reply_text), 100, callback
            )

    @staticmethod
    def message_callback(result):
        """Logs whether the message was sent successfully.

        Args:
            result (bool): Whether the message was sent successfully.
        """
        if result:
            _logger.info('Message sent successfully.')
        else:
            _logger.error('Message not sent, error %d'
                          % FlowThread_GetLastError())

...

#step 3: Start the message listener
def start_message_listener(callback):
    """[...]"""
    ...


# step 4: interpret received message
def interpret_btle_message(device, message):
    """Reads an xml-formatted message and executes bluetooth commands therein.

    The supported xml format is as follows:

        <?xml version="1.0" encoding="UTF-8"?>
        <bluetooth-le>
            <!--one or more of any of the following-->
            <write handle=hex value=bytes/>
            <read handle=hex/>
            <subscribe handle=hex type=["notification"|"indication"|"both"]/>
        </bluetooth-le>

    Where `hex` matches /"[0-9A-Fa-f]{1,4}"/ 
      and `bytes` matches /"([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}"/

    Args:
        message (FlowMessagingMessage): The message to read.
        device (bluetooth.BTLEDevice): The device to send decoded commands to.
    """
    def try_handle_convert(handle_str):
        """Converts a string of hex digits into an int.

        Args:
            handle_str (str): The string to try to convert.

        Returns:
            int or None: None if conversion failed or the handle didn't match
            the right pattern, else the converted int.
        """
        if not re.match(r'^[0-9A-Fa-f]{1,4}$', handle_str):
            return None
        return int(handle_str, 16)

    def try_value_convert(value_str):
        """Converts a string of pairs of hex digits into a bytearray.

        Args:
            value_str (str): The value to try to convert.

        Returns:
            bytearray or None: None if the conversion failed or the value
            didn't match the right pattern, else the converted bytearray.
        """
        if not re.match(r'^([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}$', value_str):
            return None
        return bytearray.fromhex(value_str)

    content = FlowMessagingMessage_GetContent(message)
    _logger.debug('Received message: %s' % content)

    try:
        root = ET.fromstring(content)
    except ET.ParseError as e:
        _logger.error('could not parse message %s: %s'
                     % (content, str(e)))
        return

    if root.tag == 'bluetooth-le':
        for child in root:
            if child.tag == 'write':
                handle = try_handle_convert(child.get('handle', ''))
                value = try_value_convert(child.get('value', ''))
                if handle and value:
                    device.char_write(handle, value, False)
                    value_str = ''.join(['%02x' % b for b in value])
                    _logger.info('Wrote %s to handle %04x'
                                % (value_str, handle)) 
                else:
                    _logger.error('Could not parse handle/value for write')
            elif child.tag == 'read':
                handle = try_handle_convert(child.get('handle', ''))
                if handle:
                    try:
                        value = device.char_read_hnd(handle)
                        send_read_success(message, handle, value)
                    except bluetooth.NotificationTimeout:
                        _logger.error('Could not read from handle %04x'
                                     % handle)
                else:
                    _logger.error('Could not parse handle for read')
            elif child.tag == 'subscribe':
                handle = try_handle_convert(child.get('handle', ''))
                type_ = child.get('type', None)
                type__list = ['notification', 'indication', 'both']
                if handle and type_ in type__list:
                    try:
                        info = _SenderInfo.from_message(message)
                        callback = partial(_send_notification, info)
                        device.subscribe(
                            handle, callback=callback,
                            type_=type__list.index(type_)
                            )
                        type__desc = 'notifications and indications' \
                                     if type_ == 'both' else type_ + 's'
                        _logger.info('subscribed to %s on %04x'
                                    % (type__desc, handle))
                    except bluetooth.NoResponseError:
                        _logger.error('Could not subscribe to handle %04x'
                                     % handle)
                else:
                    _logger.error('Could not parse handle/type for subscribe')
            else:
                _logger.error('Unsupported operation %s' % child.tag)


def send_read_success(message, handle, value):
    """Sends a read-success message to the sender of `message`.

    The message will be in the following format:

        <?xml version="1.0" encoding="UTF-8"?>
        <bluetooth-le>
            <read-success handle=hex value=bytes/>
        </bluetooth-le> 

    Where `hex` matches /"[0-9A-Fa-f]{1,4}"/ 
      and `bytes` matches /"([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}"/

    Args:
        message (FlowMessagingMessage): The message to reply to.
        handle (int): The handle to send.
        value (bytearray): The value to send.
    """
    value_str = ''.join(['%02x' % b for b in value])
    _logger.debug('Sending read success message, handle %04x value %s'
                 % (handle, value_str))
    reply_text = ('<?xml version="1.0" encoding="UTF-8"?>'
                  '<bluetooth-le>'
                      '<read-success handle="%04x" value="%s"/>'
                  '</bluetooth-le>') % (handle, value_str)
    reply_to_message(message, reply_text)


def _send_notification(sender_info, handle, value):
    """Sends a notification message to the subscriber at `sender_info`.

    The message will be in the following format:

        <?xml version="1.0" encoding="UTF-8"?>
        <bluetooth-le>
            <notification handle=hex value=bytes/>
        </bluetooth-le> 

    Where `hex` matches /"[0-9A-Fa-f]{1,4}"/ 
      and `bytes` matches /"([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}"/

    Args:
        sender_info (_SenderInfo): Details of the subscriber to send
            the message to.
        handle (int): The handle to send.
        value (bytearray): The value to send.
    """
    value_str = ''.join(['%02x' % b for b in value])
    _logger.debug('Sending notification message, handle %04x value %s'
                 % (handle, value_str))
    reply_text = ('<?xml version="1.0" encoding="UTF-8"?>'
                  '<bluetooth-le>'
                      '<notification handle="%04x" value="%s"/>'
                  '</bluetooth-le>') % (handle, value_str)
    sender_info.send_reply(reply_text)


def was_message_successful(result):
    """Logs whether a message was sent successfully.

    Args:
        result (bool): True iff the reply was successfully sent.
    """
    if result:
        _logger.info('Message sent successfully.')
    else:
        _logger.error('Message not sent, error %d' % FlowThread_GetLastError())


# step 5: reply to message if necessary
def reply_to_message(message, reply_text, callback=was_message_successful):
    """Sends a message containing `reply_text` to the sender of `message`.

    Args:
        message (FlowMessagingMessage): The message to reply to.
        reply_text (str): Sent as the text of the reply.
        callback (f(bool)): Passed the result of Flow_ReplyToMessageAsync,
            i.e. a boolean that is True iff the reply was sent successfully. 
    """
    _logger.info('Sending message %s' % reply_text)
    Flow_ReplyToMessageAsync(
        message, 'text/plain', reply_text, len(reply_text), 100, callback
        )

...

def main():
    result = False
    _logger.info('Initialising Flow libraries...')
    if initialise_flow():
        _logger.info('Initialisation successful.')
        memory_manager = FlowMemoryManager_New()

        try:
            if register_device():
                _logger.info('Logged in as device.') 

                # Fetch config
                config = get_config(CONFIG_FILE)

                # get info for presence publishing loop
                # check_period must be defined before exceptions are raised
                section = config.get('Presence',OrderedDict())
                interval = float(section.get('interval', DEFAULT_INTERVAL))
                check_period = float(section.get('check_period', 
                                                 DEFAULT_CHECK_PERIOD))
                publish = True
                def presence_loop():
                    i = interval + 1
                    while publish:
                        if i > interval:
                            publish_device_presence(memory_manager)
                            i = 0
                        time.sleep(check_period)
                        i += check_period

                presence_thread = threading.Thread(target=presence_loop)

                ## Set up BTLEDevice
                sudo_pass = getpass.getpass('sudo password: ') \
                            if not raw_input('Are you root? [y/N]: ').lower() \
                            in {'y', 'yes'} else None
                # Reset the bluetooth controller so the scan doesn't fail
                _logger.info('Resetting the bluetooth controller...')
                bluetooth.reset_bluetooth_controller(sudo_pass)
                # Run a scan so gatttool doesn't throw a hissy fit
                _logger.info('Running an LE scan...')
                bluetooth.le_scan(sudo_pass, timeout=1)
                del sudo_pass

                write_needed = False
                if not config.get('BluetoothDevice', None):
                    config['BluetoothDevice'] = OrderedDict()
                    write_needed = True
                section = config['BluetoothDevice']
                if not section.get('address', None):
                    section['address'] = \
                        raw_input('Please input BTLE MAC address: ')
                    write_needed = True

                with bluetooth.BTLEDevice(section['address']) as device:
                    device.connect(float(
                        section.get('connect_timeout', DEFAULT_CONNECT_TIMEOUT)
                        ))
                    # connect was successful, write good config
                    if write_needed and raw_input('Save config? [Y/n]')\
                      .lower() in {'', 'y', 'yes'}:
                        write_config(config, CONFIG_FILE)

                    # Start presence thread
                    _logger.info('Starting device presence thread...')
                    presence_thread.start()
                    # Listen for messages
                    _logger.info('Starting message listener...')
                    callback = partial(interpret_btle_message, device)
                    callback.__name__ = \
                        'partial(interpret_btle_message, device)'
                    start_message_listener(callback)
                    raw_input('Press enter to stop waiting:' + os.linesep)

        finally:
            result = True
            publish = False  # signal presence_loop to stop
            time.sleep(check_period)
            FlowClearMemory(memory_manager)
            _logger.info('Shutting down Flow...')
            shutdown_flow()

    return result

if __name__ == '__main__':
    ...

That is a lot of code. Let's go through what it all does and why it's necessary.

interpret_btle_message:

This is the function that is called whenever our device receives a message.

The two nested functions, try_handle_convert and try_value_convert, are short and boring - they just check whether a handle or value matches our specification of what a handle or value should look like, and if so, convert it into an int or a bytearray respectively. They exist solely to reduce repetition later on.

We start by extracting the content from the message, then trying to parse it as xml. If we can't parse it, we log an error and return early. Otherwise, we check if the root node is tagged with bluetooth-le, as per our specification, and if it is, we attempt to execute each of its children as bluetooth commands.

The process for executing a command is similar for each command - we check the tag of child to find which command we should attempt, then extract and check the appropriate attributes. If we find the attributes and they're valid, we execute a command using one of the methods of the device, checking for any exceptions that might be thrown. If we're successful, we log a message to that effect (for read, this is handled in send_read_success).
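To make the message format concrete, here is a minimal, standalone sketch of the validation step only. It needs neither Flow nor a bluetooth device. The name parse_commands and the tuple layout are made up for illustration and are not part of the script above; the regular expressions are the ones from the docstring.

```python
import re
import xml.etree.ElementTree as ET

HANDLE_RE = re.compile(r'^[0-9A-Fa-f]{1,4}$')
VALUE_RE = re.compile(r'^([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}$')


def parse_commands(content):
    """Returns (tag, handle, extra) tuples for each valid command.

    Hypothetical helper: invalid commands are silently skipped here,
    whereas interpret_btle_message logs an error for them.
    """
    root = ET.fromstring(content)
    if root.tag != 'bluetooth-le':
        return []
    commands = []
    for child in root:
        handle_str = child.get('handle', '')
        if not HANDLE_RE.match(handle_str):
            continue
        handle = int(handle_str, 16)
        if child.tag == 'write':
            value_str = child.get('value', '')
            if VALUE_RE.match(value_str):
                value = bytearray.fromhex(value_str)
                commands.append(('write', handle, value))
        elif child.tag == 'read':
            commands.append(('read', handle, None))
        elif child.tag == 'subscribe':
            type_ = child.get('type')
            if type_ in ('notification', 'indication', 'both'):
                commands.append(('subscribe', handle, type_))
    return commands


example = ('<bluetooth-le>'
           '<write handle="002b" value="01 ff"/>'
           '<read handle="2B"/>'
           '<subscribe handle="000e" type="both"/>'
           '<write handle="xyz" value="01"/>'  # bad handle, skipped
           '</bluetooth-le>')
for command in parse_commands(example):
    print(command)
```

The last write is dropped because "xyz" is not a valid handle, so only three commands come back.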

send_read_success:

This function is called on a successful read - it could be moved into the body of interpret_btle_message, but (a) that function is long enough already, (b) we'd rather not specify xml five levels of indentation deep, which would make it difficult to track down if it needed changing and (c) it's possible we might want to send a read_success from other places, e.g. if we improved the specification to allow reading by UUID. Besides, it sits nicely next to _send_notification, a function with a very similar purpose which can't be moved into interpret_btle_message because it's only called as the callback to device.subscribe.

The actual code of this function is self-explanatory.
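If you want to convince yourself that the reply is well-formed xml, a quick check is to build the string and feed it straight back through ElementTree. The sketch below does that without touching Flow. format_read_success is a hypothetical helper that only mirrors the string formatting in send_read_success.

```python
import xml.etree.ElementTree as ET


def format_read_success(handle, value):
    """Builds read-success xml from an int handle and a bytearray value."""
    value_str = ''.join('%02x' % b for b in bytearray(value))
    return ('<?xml version="1.0" encoding="UTF-8"?>'
            '<bluetooth-le>'
            '<read-success handle="%04x" value="%s"/>'
            '</bluetooth-le>') % (handle, value_str)


reply = format_read_success(0x2b, bytearray([0x01, 0xff]))
# Parse the reply back; a malformed root element would raise ParseError
root = ET.fromstring(reply.encode('utf-8'))
child = root[0]
print('%s %s %s' % (child.tag, child.get('handle'), child.get('value')))
```

Note that the root element has to be an opening tag with a matching close. A self-closing root followed by children would fail to parse on the receiving end.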

_send_notification:

This function is used as the callback to device.subscribe (or rather, a partial application of it is). Originally, this function was identical to send_read_success, with read-success changed to notification in the xml. It took a FlowMessagingMessage as its first parameter, which was passed to it using partial in interpret_btle_message, and passed that message on to reply_to_message.

However, while writing the code I discovered that attempting to reply to the stored message caused a segmentation fault. Investigating further, I discovered that the stored message, when accessed in _send_notification, had no content, no sender information and no other attributes.

My hypothesis is this: interpret_btle_message is called from somewhere in the Flow library, and is passed a message. Mid-function, the message is handed to the BTLEDevice as part of a partial function, where a reference to it is saved. Then control returns to interpret_btle_message, which exits and returns control to Flow. Flow then deletes the message as part of its cleanup. Because the Flow library is actually written in C, not Python, this does not just decrement the reference count and let the object live on elsewhere; it actually deletes the object, so all references to it stop working. Frustrating!

We solve this by extracting the address of the sender while we still have the message, and passing that to _send_notification, rather than the message as a whole. We do this with the help of...

_SenderInfo:

This is a wrapper class for information about the sender of a message. We only need this because Flow makes a distinction between device and user IDs, which we don't want to differentiate between - we just want to reply to whatever sent a message. This class lets us bundle the ID of the sender with the appropriate function for replying to them. Note the second constructor from_message - we use this in the elif child.tag == 'subscribe' section of interpret_btle_message for convenience.

This class is private (leading underscore) at the moment, because it's just acting as a helper for _send_notification. _send_notification is private because it uses a private class in its parameters.
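The pattern is easier to see in isolation. The following sketch uses a made-up FakeMessage class to imitate an object whose owner invalidates it once the handler returns. None of these names come from Flow, and the real failure was a segfault rather than a tidy exception. The point is only that the stored callback holds a copy of the sender, not the message itself.

```python
from functools import partial


class FakeMessage(object):
    """Hypothetical stand-in for a message owned by a C library."""
    def __init__(self, sender):
        self._sender = sender
        self.valid = True

    def get_sender(self):
        if not self.valid:
            raise RuntimeError('message already freed by the library')
        return self._sender


def notify(sender, handle, value):
    """Callback that needs only the sender, not the whole message."""
    return 'to %s: handle %04x value %s' % (sender, handle, value)


def on_message(message, subscriptions):
    # Copy out what we need while the message is still alive...
    sender = message.get_sender()
    # ...and bind the copy, not the message, into the stored callback
    subscriptions.append(partial(notify, sender))


subscriptions = []
message = FakeMessage('user-42')
on_message(message, subscriptions)
message.valid = False  # the owner "frees" the message after the handler

# The callback still works because it never touches the message
print(subscriptions[0](0x2b, '01ff'))
```

Had on_message stored partial(notify, message) and called message.get_sender() later, the call would fail once the message was invalidated.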

reply_to_message:

This function is fairly self-explanatory, what with it being two lines long. It exists to separate the Flow bit of replying to a message from the BTLE bit (constructing xml etc) of replying to a message.

main:

Deep in the heart of main, we've added code to set up a BTLE device, between fetching the config for the presence loop and the actual starting of the presence thread. We begin by asking the user if they need a sudo password, and if so fetching it from them using getpass. Note: we use getpass instead of raw_input because getpass won't echo to stdout. We then reset the bluetooth adapter and run an LE scan. This shouldn't be necessary, but if we don't do it gatttool will fall over and die.

Then, we fetch some more settings from the config: this is the same procedure we used in device registration, but now the section is 'BluetoothDevice' and the parameter we'll fetch and then optionally save is the MAC address of the bluetooth device. We then connect to the device, using a with block so we don't forget to close the connection, and if we successfully connect, ask to save the config.

Finally, we start the presence thread and the message listener, but this time we pass the message listener a partial application of interpret_btle_message, so that we can pass commands to the device without the help of, say, global variables.
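The "fetch from config, otherwise ask and remember that we need to save" step can be pulled out into a small helper if you find yourself repeating it. This is only a sketch: ensure_setting is a hypothetical name, and the ask argument stands in for the raw_input prompt so the example runs without user input.

```python
from collections import OrderedDict


def ensure_setting(config, section_name, key, ask):
    """Fills config[section_name][key] by calling ask() if it is missing.

    Returns True if the config changed and so may need saving.
    """
    changed = False
    if not config.get(section_name):
        config[section_name] = OrderedDict()
        changed = True
    section = config[section_name]
    if not section.get(key):
        section[key] = ask()
        changed = True
    return changed


config = OrderedDict()
# In the real script, ask would wrap a raw_input prompt
write_needed = ensure_setting(config, 'BluetoothDevice', 'address',
                              lambda: '12:34:AB:CD:56:78')
print('%s %s' % (write_needed, config['BluetoothDevice']['address']))
```

Calling it a second time with the same arguments returns False, because the address is already present and nothing needs writing.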

Further improvements

Now, boot up the MakeItFlow app and send a message! If the script errors because it can't parse the xml, this is expected behaviour. Set the logging level to 'debug' in the config file and run it again, then look closely at the xml being sent by the MakeItFlow app.

...</from><clientid type="integer">2c7bf87a-3e7a-4aba-bdb2-5d4272d82c64</clien
tid type="integer" ><requestid type="integer">1</requestid type="integer"><deta
ils>...

Look at that! Attributes in the end-tags! For shame, MakeItFlow developers. If we want to receive messages from the MakeItFlow app, we're going to have to find some way around this.

# step 4: interpret received message
def interpret_btle_message(device, message):
    """[...]"""
    ...
    def check_for_MakeItFlow(content):
        """Checks for a message from the MakeItFlow app.

        Args:
            content: The content of the message to check.

        Returns:
            ET or None: The children of the details element of the message,
            if they could be parsed.
        """
        # check for MakeItFlow
        regexp = r'^<\?xml.*\?><command>.*<details>(.*)</details></command>$'
        match = re.search(regexp, content)
        if match:  # probably MakeItFlow
            _logger.debug('Found MakeItFlow command.')
            try: 
                return ET.fromstring(match.group(1).lower())
            except ET.ParseError as e2:
                _logger.error('could not parse message %s: %s'
                         % (match.group(1).lower(), str(e2)))
                return None

    content = FlowMessagingMessage_GetContent(message)
    _logger.debug('Received message: %s' % content)

    try:
        root = ET.fromstring(content)
    except ET.ParseError as e:
        # check for MakeItFlow
        root = check_for_MakeItFlow(content)
        if root is None:
            _logger.error('could not parse message %s: %s'
                     % (content, str(e)))
            return

    # in case the MakeItFlow developers ever repair their xml
    if root.tag == 'command':
        root1 = check_for_MakeItFlow(content)
        if root1 is not None:
            root = root1

    if root.tag == 'bluetooth-le':
        ...

There! Now we can send from the MakeItFlow app, and it should be received just fine. Note: A quirk of MakeItFlow is that it sends commands in all caps, which is why we parse match.group(1).lower() instead of match.group(1). Of course, typing xml on a mobile device is very awkward, so you might want to implement another way to control the bluetooth device. For example, if the device is a smart bulb, sending "ON" from the MakeItFlow app might switch the bulb on, and sending "OFF" might switch it off. The implementation of such a scheme is fairly easy, but is beyond the scope of this tutorial.
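You can try the workaround without a phone to hand. The sketch below feeds a hand-written message through the same regular expression. The content string is an assumption that only imitates the malformed xml from the log above; a real MakeItFlow message carries more fields.

```python
import re
import xml.etree.ElementTree as ET

# Hypothetical message imitating the malformed xml in the log above
content = ('<?xml version="1.0" encoding="UTF-8"?><command>'
           '<requestid type="integer">1</requestid type="integer">'
           '<details><BLUETOOTH-LE><READ HANDLE="002B"/></BLUETOOTH-LE>'
           '</details></command>')

try:
    ET.fromstring(content)
    parsed_directly = True
except ET.ParseError:
    # attributes in an end-tag are not valid xml
    parsed_directly = False

# Same pattern as check_for_MakeItFlow: grab whatever is inside <details>
regexp = r'^<\?xml.*\?><command>.*<details>(.*)</details></command>$'
match = re.search(regexp, content)
root = ET.fromstring(match.group(1).lower())
print('%s %s %s' % (parsed_directly, root.tag, root[0].get('handle')))
```

The direct parse fails, but the contents of the details element parse fine once lower-cased, which is all interpret_btle_message needs.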

One final improvement we can make is to do away with the reply_to_message function and just use _SenderInfo.send_reply. When we do that, we should probably remove the leading _ that indicates that SenderInfo is private:

...

class SenderInfo(object):  # note the lack of leading _
    """Wrapper for sender information.

    Attributes:
        id (FlowID): The FlowID of the sender.
        func (f(FlowID, str, str, int, int, f(bool))): The function to use
            when replying to this sender.
    """
    ...

...

# step 4: interpret received message
def interpret_btle_message(device, message):
    """[...]"""
    ...
    if root.tag == 'bluetooth-le':
        for child in root:
            if child.tag == 'write':
                ...
            elif child.tag == 'read':
                handle = try_handle_convert(child.get('handle', ''))
                if handle:
                    try:
                        value = device.char_read_hnd(handle)
                        ### pass sender info, not message ###
                        info = SenderInfo.from_message(message)
                        send_read_success(info, handle, value)
                    except bluetooth.NotificationTimeout:
                        _logger.error('Could not read from handle %04x'
                                     % handle)
                else:
                    _logger.error('Could not parse handle for read')
            elif child.tag == 'subscribe':
                ...
                if handle and type_ in type__list:
                    try:
                        ### remove leading _s ###
                        info = SenderInfo.from_message(message)
                        callback = partial(send_notification, info)
                        device.subscribe(
                            handle, callback=callback,
                            type_=type__list.index(type_)
                            )
                        ...
                    except bluetooth.NoResponseError:
                        ...

def send_read_success(sender_info, handle, value):
    """Sends a read-success message to `sender_info`.

    ...

    Args:
        sender_info (SenderInfo): The info of the user/device to send to.
        handle (int): The handle to send.
        value (bytearray): The value to send.
    """
    ...
    # reply_to_message(message, reply_text)
    sender_info.send_reply(reply_text)

def send_notification(sender_info, handle, value):
    # Note how we've removed the leading _ from SenderInfo in the docstring!
    """Sends a notification message to the subscriber at `sender_info`. 

    ...

    Args:
        sender_info (SenderInfo): ...
        ...
    """
    ...

#Don't need these any more, delete them
#def was_message_successful(result): 
#def reply_to_message(message, reply_text, callback=was_message_successful):

...

And with that, we're done! You can find a full code listing for this post in the GitHub repository for this blog.

Appendix A: Configuration file specification

[FlowServer] ; compulsory
url = http://ws-uat.flowworld.com      ; or some other valid url
key = Ph3bY5kkU4P6vmtT                 ; or some other valid key
secret = Sd1SVBfYtGfQvUCR              ; or some other valid secret

[Logging] ; compulsory
; level may be one of [debug|info|warning|error|critical]
level = info                           ; messages below this level log to
                                       ; neither stream nor file
format = %(asctime)s %(name)s.%(funcName)s:%(lineno)d
    %(levelname)s: %(message)s         ; see Python logging docs

; optional 
stream = [bool]                        ; whether or not to log to stream
stream_level = info                    ; minimum level for stream
stream_format = [some valid format]    ; format for stream messages
logfile = path                         ; file to log output to
logfile_level = error                  ; minimum level for file logs
logfile_format = [some valid format]   ; format for file messages

[Device] ; if not present, this section is created using user input
device_type = ci20                     ; can be any valid device type
address = 1234ABCD5678                 ; 12 (upper case) hex digits
serial_number = 1                      ; can be anything
software_version = 0.9.1               ; can be anything
name = bluetooth                       ; something recognisable
registration_token = GXWABNMG6G        ; get from Flow Developer website

[BluetoothDevice] ; if not present, partially created with user input
address = 12:34:AB:CD:56:78            ; 6 pairs of hex digits, sep. by :
; optional
connect_timeout = 3.0                  ; time before giving up on connecting

[Presence] ; entirely optional
interval = 14.5                        ; interval in seconds between 
                                       ; presence updates,
                                       ; default for wifire boards is 15
check_period = 0.01                    ; responsiveness of publish loop
                                       ; lower is more accurate to interval