Integrating With FlowCloud
14 Sep 2015
All of this so far has been fine and dandy, but you're hardly going to want to fiddle with the Ci20 every time you want to operate a Bluetooth lightbulb. Wouldn't we much rather control it from our personal computer, or perhaps a smartphone? To that end, we will now work on a way of integrating our Bluetooth device with FlowCloud.
To make things as generic as possible, we'll use FlowCloud's asynchronous messaging API to send commands in XML format to the Ci20, which will interpret them and pass them on to the device. We'll also provide a way for the sender to subscribe to notifications/indications on a handle, and then forward those notifications on as replies to the subscription message. Install FlowCloud like so (the sudo password for the Ci20 is 'ci20'):
pushd /etc/apt/sources.list.d/
sudo wget http://deb.flowworld.com/flowsdk.list
popd
sudo apt-get update
sudo apt-get install libflowcore-python libflowmessaging-python
Note: the Flow Python API currently only works with Python 2.7.
Basics
Let's start by writing a very basic Flow script:
#!/usr/bin/env python

# third party
from libflowcore import *
from libflowmessaging import *

FLOW_SERVER_URL = 'http://ws-uat.flowworld.com'
FLOW_SERVER_KEY = 'Ph3bY5kkU4P6vmtT'
FLOW_SERVER_SECRET = 'Sd1SVBfYtGfQvUCR'

# step 1: Initialise Flow libraries, connect to server
def initialise_flow():
    """Initialises Flow libraries and then connects to the Flow server.

    Initialises LibFlowCore and LibFlowMessaging, then connects to the Flow
    server specified in the config. If any of these steps fails, ensures
    that LibFlowCore and LibFlowMessaging are shut down properly.

    Returns:
        bool: Whether or not initialisation was successful.
    """
    result = False
    # Get server details
    url = FLOW_SERVER_URL
    key = FLOW_SERVER_KEY
    secret = FLOW_SERVER_SECRET
    if FlowCore_Initialise():
        FlowCore_RegisterTypes()
        if FlowMessaging_Initialise():
            FlowMessaging_RegisterTypes()
            if FlowClient_ConnectToServer(url, key, secret, 1):
                result = True
            else:
                FlowMessaging_Shutdown()
                FlowCore_Shutdown()
                print('Failed to connect to FlowCloud, error code %s'
                      % FlowThread_GetLastError())
        else:
            FlowCore_Shutdown()
            print('LibFlowMessaging initialisation failed, error code %s'
                  % FlowThread_GetLastError())
    else:
        print('LibFlowCore initialisation failed, error code %s'
              % FlowThread_GetLastError())
    return result

# last step: Shut down Flow
def shutdown_flow():
    """Shuts down the Flow libraries."""
    FlowMessaging_Shutdown()
    FlowCore_Shutdown()

def main():
    result = False
    if initialise_flow():
        memory_manager = FlowMemoryManager_New()
        try:
            # meat goes here
            result = True
        finally:
            # always release the memory manager and shut down the libraries
            FlowClearMemory(memory_manager)
            shutdown_flow()
    return result

if __name__ == '__main__':
    main()
This script does the bare minimum necessary for us to start working with Flow: initialise the libraries, connect to the Flow server, create a memory manager (although why this is necessary in Python is quite beyond me), then free the memory manager and shut down the Flow libraries. How boring! A few things to note:
- We're currently using print to display error information - we should probably use a library like logging.
- We've configured our server using module-level variables - it would be much nicer if we could read them from a configuration file.
- You'd think the following code:
    memory_manager = FlowMemoryManager_New()
    FlowMemoryManager_Free(memory_manager)
  ...would create and then free a memory manager. Hah! What this actually does is raise a TypeError, because FlowMemoryManager_Free expects a pointer to a FlowMemoryManager, not a FlowMemoryManager. Pointers, as far as I'm aware, are not a thing in Python. Instead, we use the FlowClearMemory function, mentioned in the Python API documentation here and nowhere else. I hate the Python API documentation.
- After we initialise Flow and set up the memory manager, the meat of the main function is ensconced in a try/finally block, to ensure that the memory manager is freed and the libraries are shut down, even in the event of an exception.
- The call to main() is wrapped in an if __name__ == '__main__': condition. For those unfamiliar with Python, this means that main will be called if we run the script directly, but not if we import it from another script.
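To see the try/finally guarantee in isolation, here is a minimal sketch that runs without the Flow libraries. The fake_clear_memory and fake_shutdown functions are hypothetical stand-ins for FlowClearMemory and shutdown_flow; they only record that they were called.

```python
cleanup_log = []


def fake_clear_memory():
    # stand-in for FlowClearMemory(memory_manager)
    cleanup_log.append('memory cleared')


def fake_shutdown():
    # stand-in for shutdown_flow()
    cleanup_log.append('libraries shut down')


def main(work):
    """Run `work`, cleaning up afterwards whether or not it raised."""
    try:
        work()
        return True
    finally:
        fake_clear_memory()
        fake_shutdown()


def failing_work():
    raise RuntimeError('something went wrong mid-script')


if __name__ == '__main__':
    # Only runs when executed directly, not when imported.
    try:
        main(failing_work)
    except RuntimeError:
        print('cleanup ran anyway: %s' % cleanup_log)
```

The exception still propagates to the caller; the finally block just makes sure both cleanup steps run first.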
Logging / config set up
Let's improve things using logging and ConfigParser. Note: docstrings that have not changed are omitted - pay attention to this if copy/pasting the code below.
#!/usr/bin/env python

# standard library
import logging
import ConfigParser
from collections import OrderedDict # Used as types for configs
import os # Used for os.linesep in default config

# third party
from libflowcore import *
from libflowmessaging import *

CONFIG_FILE = 'config.cnf' # location of configuration file
DEFAULT_CONFIG = OrderedDict(
    [ ('FlowServer', OrderedDict(
        [ ('url', 'http://ws-uat.flowworld.com')
        , ('key', 'Ph3bY5kkU4P6vmtT')
        , ('secret', 'Sd1SVBfYtGfQvUCR')
        ]
      ))
    , ('Logging', OrderedDict(
        [ ('level', 'info')
        , ('format', ('%(asctime)s %(name)s.%(funcName)s:%(lineno)d'
                      + os.linesep + '%(levelname)s: %(message)s'))
        ]
      ))
    ]
)

_logger = logging.getLogger(__name__)

def get_config(config_file):
    """Gets the configuration.

    If `config_file` does not exist, we get the default configuration and
    write a new configuration file at that path.

    Args:
        config_file (str|bytes): the path to the configuration file

    Returns:
        collections.OrderedDict: A nested dictionary {str:{str:str}} matching
            the configuration in `config_file`. For example, the value of
            option 'url' in section 'FlowServer' can be accessed using
            returned['FlowServer']['url'].
    """
    _logger.debug('Fetching config from %s' % config_file)
    config = OrderedDict()
    parser = ConfigParser.RawConfigParser()
    try:
        # get config from file
        with open(config_file, 'r') as config_file_object:
            # read() expects file names; readfp() takes an open file object
            parser.readfp(config_file_object)
    except IOError as e:
        if e.errno == 2: # File not found (ENOENT)
            # write default config
            write_config(DEFAULT_CONFIG, config_file)
            # load default config
            _logger.debug('File not found, loaded default config.')
            return DEFAULT_CONFIG
        else: # not a clue
            raise # re-raise with the original traceback
    # copy config into dictionary
    for section in parser.sections():
        config[section] = OrderedDict(parser.items(section))
    _logger.debug('Config fetch successful.')
    return config

def write_config(config, config_file):
    """Writes the configuration to file.

    Args:
        config (collections.OrderedDict): the configuration to save, in the
            format {str:{str:str}} ({section:{option:value}}).
        config_file (str|bytes): the path to the config file.
    """
    _logger.debug('Writing config to %s' % config_file)
    parser = ConfigParser.RawConfigParser()
    # load config into parser
    for section in config.keys():
        parser.add_section(section)
        for option, value in config[section].items():
            parser.set(section, option, value)
    # write config
    with open(config_file, 'w') as config_file_object:
        parser.write(config_file_object)
    _logger.debug('Config write successful.')

# step 1: Initialise Flow libraries, connect to server
def initialise_flow():
    """[...]"""
    result = False
    # fetch config
    config = get_config(CONFIG_FILE)
    url = config['FlowServer']['url']
    key = config['FlowServer']['key']
    secret = config['FlowServer']['secret']
    if FlowCore_Initialise():
        _logger.debug('FlowCore initialised successfully.')
        FlowCore_RegisterTypes()
        if FlowMessaging_Initialise():
            _logger.debug('FlowMessaging initialised successfully.')
            FlowMessaging_RegisterTypes()
            if FlowClient_ConnectToServer(url, key, secret, 1):
                _logger.debug('Connected to server at %s' % url)
                result = True
            else:
                FlowMessaging_Shutdown()
                FlowCore_Shutdown()
                _logger.error('Failed to connect to server at %s, error %s'
                              % (url, FlowThread_GetLastError()))
        else:
            FlowCore_Shutdown()
            _logger.error('LibFlowMessaging initialisation failed, error %s'
                          % FlowThread_GetLastError())
    else:
        _logger.error('LibFlowCore initialisation failed, error %s'
                      % FlowThread_GetLastError())
    return result

# last step: Shut down Flow
def shutdown_flow():
    """[...]"""
    _logger.debug('Shutting down the Flow libraries.')
    FlowMessaging_Shutdown()
    FlowCore_Shutdown()

def main():
    result = False
    _logger.info('Initialising Flow libraries...')
    if initialise_flow():
        _logger.info('Initialisation successful.')
        memory_manager = FlowMemoryManager_New()
        try:
            # meat goes here
            result = True
        finally:
            FlowClearMemory(memory_manager)
            _logger.info('Shutting down Flow...')
            shutdown_flow()
    return result

if __name__ == '__main__':
    # set up _logger
    def _logger_setup(config):
        section = config['Logging']
        log_level_dict = { 'debug': logging.DEBUG
                         , 'info': logging.INFO
                         , 'warning': logging.WARNING
                         , 'error': logging.ERROR
                         , 'critical': logging.CRITICAL
                         }
        log_level = log_level_dict[section['level'].lower()]
        log_format = section['format']
        root = logging.getLogger()
        root.setLevel(log_level)
        root.handlers = [] # drop handlers left over from a previous setup
        formatter = logging.Formatter(log_format)
        # optional file handler
        if section.get('logfile', None):
            fh = logging.FileHandler(section['logfile'])
            fh_log_level = section.get('logfile_level', None)
            fh_log_level = log_level_dict[fh_log_level.lower()] \
                if fh_log_level else log_level
            fh.setLevel(fh_log_level)
            fh_formatter = section.get('logfile_format', None)
            fh_formatter = logging.Formatter(fh_formatter) \
                if fh_formatter else formatter
            fh.setFormatter(fh_formatter)
            root.addHandler(fh)
        # stream handler, on by default
        if section.get('stream', '1').lower() in {'1', 'yes', 'true', 'on'}:
            sh = logging.StreamHandler()
            sh_log_level = section.get('stream_level', None)
            sh_log_level = log_level_dict[sh_log_level.lower()] \
                if sh_log_level else log_level
            sh.setLevel(sh_log_level)
            sh_formatter = section.get('stream_format', None)
            sh_formatter = logging.Formatter(sh_formatter) \
                if sh_formatter else formatter
            sh.setFormatter(sh_formatter)
            root.addHandler(sh)
    _logger_setup(DEFAULT_CONFIG)
    _logger_setup(get_config(CONFIG_FILE))
    main()
That may look like a lot of code for little benefit, but it will make our lives a lot easier later on. For instance, even though we're still defining the default configuration in a module-level variable, we can override it by editing the config file, which will be created as config.cnf in the current working directory when we first run the script. Note that we set up the logger twice, once with the default configuration and once with the loaded configuration - this is because we've used the logger in our get_config function. When it is created (if we do not create it ourselves), our config file will look like this:
[FlowServer]
url = http://ws-uat.flowworld.com ; or some other valid url
key = Ph3bY5kkU4P6vmtT ; or some other valid key
secret = Sd1SVBfYtGfQvUCR ; or some other valid secret
[Logging]
; level may be one of [debug|info|warning|error|critical]
level = info ; messages below this level log to
; neither stream nor file
format = %(asctime)s %(name)s.%(funcName)s:%(lineno)d
    %(levelname)s: %(message)s ; see Python logging docs
; optional
stream = bool ; whether or not to log to stream
stream_level = info ; minimum level for stream
stream_format = [some valid format] ; format for stream messages
logfile = path ; file to log output to
logfile_level = error ; minimum level for file logs
logfile_format = [some valid format] ; format for file messages
We'll add to this file as we find more things we might want to configure; this will do us for now.
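As a quick way to check how such a file is read back, here is a standalone sketch of the load-with-fallback idea. It is not the script's get_config: load_config and the sample values are made up for illustration, and the import shim is only there so the snippet runs on Python 3 as well as 2.7.

```python
import os
import tempfile
from collections import OrderedDict

try:
    import configparser                  # Python 3 name
except ImportError:
    import ConfigParser as configparser  # Python 2.7 name


def load_config(path, defaults):
    """Return {section: {option: value}}, or `defaults` if `path` is missing.

    RawConfigParser.read() takes a path and returns the list of files it
    managed to read, so an empty list means nothing was loaded.
    """
    parser = configparser.RawConfigParser()
    if not parser.read(path):
        return defaults
    config = OrderedDict()
    for section in parser.sections():
        config[section] = OrderedDict(parser.items(section))
    return config


# Write a small sample file, read it back, then try again once it is gone.
SAMPLE = ('[FlowServer]\n'
          'url = http://example.invalid\n'
          '\n'
          '[Logging]\n'
          'level = debug\n')
fd, sample_path = tempfile.mkstemp(suffix='.cnf')
with os.fdopen(fd, 'w') as sample_file:
    sample_file.write(SAMPLE)

loaded = load_config(sample_path, defaults={})
os.remove(sample_path)
fallback = load_config(sample_path, defaults={'Logging': {'level': 'info'}})
```

Every value comes back as a string, which is why the script converts things like the log level itself.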
Logging in to Flow as a device
Up until now, we've been skirting around a much larger problem: our script doesn't do anything useful or interesting! Let's fix that by logging in as a device:
#!/usr/bin/env python

# standard library
import logging
import ConfigParser
from collections import OrderedDict # Used as type for configs
import os # Used for os.linesep in default config

# third party
from libflowcore import *
from libflowmessaging import *

...

# step 1: Initialise Flow libraries, connect to server.
def initialise_flow():
    """[...]"""
    ...

# step 2: Login as device
def register_device():
    """Registers (logs in as) a Flow device, based on user input.

    Returns:
        bool: whether or not registration was successful.
    """
    result = False
    device_type = raw_input('Please input device type: ')
    address = raw_input('Please input address: ')
    serial_number = raw_input('Please input serial number: ')
    software_version = raw_input('Please input software version: ')
    name = raw_input('Please input name: ')
    registration_token = raw_input('Please input registration token: ')
    _logger.debug('Attempting to login as device.')
    if FlowClient_LoginAsDevice(
            device_type, address, serial_number, None,
            software_version, name, registration_token
        ):
        result = True
        _logger.debug('Device login successful.')
    else:
        _logger.error('Device login failed, error code: %d'
                      % FlowThread_GetLastError())
    return result

...

def main():
    result = False
    _logger.info('Initialising Flow libraries...')
    if initialise_flow():
        _logger.info('Initialisation successful.')
        memory_manager = FlowMemoryManager_New()
        try:
            if register_device():
                _logger.info('Logged in as device.')
                # more meat please
                result = True
        finally:
            FlowClearMemory(memory_manager)
            _logger.info('Shutting down Flow...')
            shutdown_flow()
    return result

if __name__ == '__main__':
    ...
Now when we run the script, we'll be prompted for various bits of device information, and if they're all valid, we'll be told the login was successful. Progress! You'll need to set your device type here (you'll probably want ci20), and get a corresponding registration token. Set the address parameter to the MAC address of the peripheral you're using, sans colons, with letters in upper case. The other parameters can be set arbitrarily, as long as you're consistent.
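If you'd rather not reformat the MAC address by hand, a small helper can do it. This is a sketch: normalise_address is a made-up name, and accepting dashes as well as colons is my own assumption rather than anything FlowCloud requires.

```python
import re


def normalise_address(mac):
    """Strip separators from a MAC address and upper-case the hex digits."""
    stripped = mac.replace(':', '').replace('-', '').upper()
    # a MAC address is six bytes, i.e. twelve hex digits
    if not re.match(r'^[0-9A-F]{12}\Z', stripped):
        raise ValueError('not a MAC address: %r' % mac)
    return stripped


example = normalise_address('00:1a:7d:da:71:13')
```

The result can be pasted straight into the address prompt.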
Wouldn't it be easier not to have to specify these by hand every time we run the script? I'm glad you agree. Let's build them into the config.
...
# step 2: Login as device.
def register_device():
    """
    [...]
    """
    result = False
    write_needed = False
    config = get_config(CONFIG_FILE)
    # ensure there's a Device section in the config
    if not config.get('Device', None):
        config['Device'] = OrderedDict()
    parameters = ['device_type', 'address', 'serial_number',
                  'software_version', 'name', 'registration_token']
    # prompt for any parameter missing from the config
    for p in parameters:
        if not config['Device'].get(p, None):
            write_needed = True
            config['Device'][p] = raw_input('Please input %s: '
                                            % p.replace('_', ' '))
    args = [config['Device'][p] for p in parameters]
    args.insert(3, None) # the fourth argument is always None
    _logger.debug('Attempting to login as device.')
    if FlowClient_LoginAsDevice(*args):
        result = True
        _logger.debug('Device login successful.')
        if write_needed and \
                raw_input('Save device details? [Y/n] ').lower() in {'', 'y', 'yes'}:
            write_config(config, CONFIG_FILE)
            _logger.debug('Wrote new device parameters to config file.')
    else:
        _logger.error('Device login failed, error code: %d'
                      % FlowThread_GetLastError())
    return result
...
Now the first time we run the script, if the device login is successful, it will add the device parameters to the config file. Note that we're now passing FlowClient_LoginAsDevice its arguments by building a list (args) and then unpacking it in the function call - this is just for convenience, as it means we don't have to assign variable names to each parameter. The rest of this code should be self-explanatory; remember that dict.get(arg1, arg2) attempts to use arg1 as a key to the dictionary, and returns arg2 if it fails.
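Here is the list-building and unpacking trick on its own, as a sketch that runs without Flow. fake_login is a hypothetical stand-in for FlowClient_LoginAsDevice that just reports what it received, and fourth_argument is a placeholder name for the slot we always fill with None.

```python
from collections import OrderedDict

PARAMETERS = ['device_type', 'address', 'serial_number',
              'software_version', 'name', 'registration_token']


def build_login_args(device_section):
    """Build the positional argument list, using None for missing options."""
    args = [device_section.get(p, None) for p in PARAMETERS]
    args.insert(3, None)  # slot in the fourth argument
    return args


def fake_login(device_type, address, serial_number, fourth_argument,
               software_version, name, registration_token):
    # stand-in for the real login call: echo back a few arguments
    return {'device_type': device_type,
            'serial_number': serial_number,
            'fourth_argument': fourth_argument,
            'name': name}


section = OrderedDict([('device_type', 'ci20'),
                       ('address', '001A7DDA7113'),
                       ('name', 'my-ci20')])
args = build_login_args(section)
received = fake_login(*args)  # *args spreads the list over the parameters
```

Because serial_number is missing from the section, dict.get hands back None for it instead of raising a KeyError.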
One other thing - we're logging in as a device, yet our function is called register_device. Why is this? In FlowCloud, the two are one and the same. FlowCloud registration is idempotent. Regardless of the number of times a device (re)registers, it will always return the same ID. Thus, we don't need separate functions for registering and logging in.
Listening for messages
Now that we've logged in as a device, we have a wealth of options open to us! We want to be able to receive and respond to asynchronous messages, so let's add a function to start the message listener:
...
# step 2: Login as device.
def register_device():
    """[...]"""
    ...

# step 3: Start the message listener.
def start_message_listener(callback):
    """Starts the message listener in a new thread.

    Args:
        callback (f(FlowMessagingMessage)): A callback function, which will
            be passed each received message.
    """
    _logger.debug('Starting message listener with callback %s'
                  % callback.__name__)
    Flow_SetMessageReceivedListenerForDevice(callback)

def print_message(message):
    """Prints the contents of a message to stdout.

    Args:
        message (FlowMessagingMessage): The message to print.
    """
    print(FlowMessagingMessage_GetContent(message))

...

def main():
    result = False
    _logger.info('Initialising Flow libraries...')
    if initialise_flow():
        _logger.info('Initialisation successful.')
        memory_manager = FlowMemoryManager_New()
        try:
            if register_device():
                _logger.info('Logged in as device.')
                _logger.info('Starting message listener...')
                start_message_listener(print_message)
                # block until the user presses enter
                raw_input('Press enter to stop waiting:' + os.linesep)
                result = True
        finally:
            FlowClearMemory(memory_manager)
            _logger.info('Shutting down Flow...')
            shutdown_flow()
    return result

if __name__ == '__main__':
...
That was easy! Now when we run the script, it listens for any messages sent to the registered device. Unfortunately, nothing is sending any messages to the device, so we can't check that it's working yet. One way we can fix that is by setting up device presence publishing information, so that we can send messages to the device using the MakeItFlow app. We'll have to pretend the Ci20 is a WiFire board, but I'm sure no one will mind. Note: if you don't have a smart phone, or other device capable of running the MakeItFlow app, feel free to skip this next bit.
Create a new file called presence.xml in the same directory as the Python script, and fill it with the following:
<?xml version="1.0" encoding="UTF-8"?>
<presence xmlns="urn:ietf:params:xml:ns:pidf"
xmlns:wifire_starterapp="com.imgtec.flow"
entity="sip:%(address)s@sip-uat.flowworld.com">
<tuple id="standard">
<status>
<basic>open</basic>
</status>
</tuple>
<tuple id="presence-info">
<wifire_starterapp:pres-info>
<datetime>%(datetime)s</datetime>
</wifire_starterapp:pres-info>
</tuple>
<tuple id="network-status">
<wifire_starterapp:net-status>
<wifire_starterapp:ssid>IMG-Demo</wifire_starterapp:ssid>
<wifire_starterapp:state>connected</wifire_starterapp:state>
<wifire_starterapp:rssi_dbm>-100</wifire_starterapp:rssi_dbm>
</wifire_starterapp:net-status>
</tuple>
<tuple id="board-health">
<wifire_starterapp:mrf24w-status>
<wifire_starterapp:status>ok</wifire_starterapp:status>
</wifire_starterapp:mrf24w-status>
<wifire_starterapp:wifire-status>
<wifire_starterapp:status>ok</wifire_starterapp:status>
<wifire_starterapp:uptime>0d:84h:23m:10s</wifire_starterapp:uptime>
</wifire_starterapp:wifire-status>
</tuple>
</presence>
Now define a new function in the Python script like so:
#!/usr/bin/env python

# standard library
import logging
import ConfigParser
from collections import OrderedDict # Used as type for configs
import os # Used for os.linesep
import datetime # Used in device presence info
import time # Used to publish periodically
import threading # Used to publish in the background

# third party
from libflowcore import *
from libflowmessaging import *

CONFIG_FILE = 'config.cnf' # location of configuration file
DEFAULT_INTERVAL = '14.5'
DEFAULT_CHECK_PERIOD = '0.01'
DEFAULT_CONFIG = OrderedDict(
    [ ('FlowServer', OrderedDict(
        [ ('url', 'http://ws-uat.flowworld.com')
        , ('key', 'Ph3bY5kkU4P6vmtT')
        , ('secret', 'Sd1SVBfYtGfQvUCR')
        ]
      ))
    , ('Logging', OrderedDict(
        [ ('level', 'info')
        , ('format', ('%(asctime)s %(name)s.%(funcName)s:%(lineno)d'
                      + os.linesep + '%(levelname)s: %(message)s'))
        ]
      ))
    ]
)
...
# step 2: Login as device.
def register_device():
    """[...]"""
    ...

# step 2.5: Publish device presence information for MakeItFlow app
def publish_device_presence(memory_manager):
    """Publishes the device presence so the MakeItFlow app will send messages
    to our listener.

    Args:
        memory_manager (FlowMemoryManager): The memory manager used to fetch
            the logged-in device.

    Returns:
        bool: Whether the publishing was successful
    """
    _logger.debug('Attempting to publish device presence...')
    result = False
    this_device = FlowClient_GetLoggedInDevice(memory_manager)
    address = FlowDevice_GetDeviceID(this_device)
    # the trailing 'Z' marks the timestamp as UTC, so use utcnow()
    datetime_ = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    with open('presence.xml', 'r') as presence_file:
        presence_info = presence_file.read()
    # substitute address/datetime into presence.xml
    presence_info %= {'address': address, 'datetime': datetime_}
    if FlowMessaging_Publish(
            FLOW_MESSAGING_TOPIC_DEVICE_PRESENCE, None, presence_info,
            len(presence_info), 300
        ):
        _logger.debug('Published device presence.')
        result = True
    else:
        _logger.error('Could not publish device presence, error %d'
                      % FlowThread_GetLastError())
    return result
...
def main():
    result = False
    _logger.info('Initialising Flow libraries...')
    if initialise_flow():
        _logger.info('Initialisation successful.')
        memory_manager = FlowMemoryManager_New()
        publish = True # set to False to stop the presence thread
        presence_thread = None
        try:
            if register_device():
                _logger.info('Logged in as device.')
                # Periodically publish device presence
                _logger.info('Starting device presence publisher thread...')
                section = get_config(CONFIG_FILE).get('Presence', OrderedDict())
                interval = float(section.get('interval', DEFAULT_INTERVAL))
                check_period = float(section.get('check_period',
                                                 DEFAULT_CHECK_PERIOD))
                def presence_loop():
                    i = interval + 1 # publish straight away on the first pass
                    while publish:
                        if i > interval:
                            publish_device_presence(memory_manager)
                            i = 0
                        time.sleep(check_period)
                        i += check_period
                presence_thread = threading.Thread(target=presence_loop)
                presence_thread.start()
                # Listen for messages
                _logger.info('Starting message listener...')
                start_message_listener(print_message)
                raw_input('Press enter to stop waiting:' + os.linesep)
                result = True
        finally:
            # stop the presence thread before freeing the memory manager
            publish = False
            if presence_thread is not None:
                presence_thread.join()
            FlowClearMemory(memory_manager)
            _logger.info('Shutting down Flow...')
            shutdown_flow()
    return result

if __name__ == '__main__':
...
Note: in main, we check for another (optional) section in the config file, Presence, where interval and check_period can be set - interval is the number of seconds between device presence updates, and check_period is the number of seconds between each check of the publish variable to see if the presence thread should stop. We must make sure that the presence thread stops before we free the memory manager and shut down Flow, or we get a segmentation fault.
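The stop-before-cleanup requirement can also be met with a threading.Event rather than a polled variable. The sketch below is an alternative, not what the script above does: start_periodic and the fake publish function are made up, and the task just records that it ran.

```python
import threading


def start_periodic(task, interval, stop_event):
    """Call `task` every `interval` seconds until `stop_event` is set."""
    def loop():
        while not stop_event.is_set():
            task()
            # wait() returns early if the event is set, so stopping is prompt
            stop_event.wait(interval)
    thread = threading.Thread(target=loop)
    thread.start()
    return thread


published = []
first_publish = threading.Event()


def fake_publish():
    # stand-in for publish_device_presence(memory_manager)
    published.append('presence')
    first_publish.set()


stop = threading.Event()
worker = start_periodic(fake_publish, 0.01, stop)
first_publish.wait(5)  # the main thread would listen for messages here
stop.set()             # ask the thread to stop...
worker.join()          # ...and wait for it before cleaning up
```

Only after join() returns is it safe to free anything the task was using.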
Now download the MakeItFlow app (iOS, Android) onto your smart phone, fire it up and log in. Run the script - your Ci20 (whatever you named it) should appear in the Connected Devices list of the MakeItFlow app with a green cloud next to it. Select it and press 'Interact with selected', then send a "CLEAR LED" message. Your python script should receive the message and print something like the following:
<?xml version="1.0" encoding="UTF-8"?><command><sent type="datetime">2015-09-15
T12:14:15Z</sent><to>sip:11c53549-6d4e-43a1-ae50-aee022c30326@sip-uat.flowworld
.com</to><from>sip:blaine.rogers.imgtec.com@sip-uat.flowworld.com</from><client
id type="integer">2c7bf87a-3e7a-4aba-bdb2-5d4272d82c64</clientid type="integer"
><requestid type="integer">1</requestid type="integer"><details>CLEAR LED</deta
ils></command>
For testing purposes, it may also be handy to send messages from the command line. For this, we can borrow ex-async-message-send-user2device.py from the FlowCloud messaging howtos. When it asks for a Device ID, look it up on this page. If I use ex-async-message-send-user2device.py to send a message, it appears in our script's output as
hello hi
If you're planning on using ex-async-message-send-user2device.py, replace the SendAsyncMessageToDevice function with the following:
def SendAsyncMessageToDevice():
    import readline # gives raw_input line-editing support
    result = False
    memory_manager = FlowMemoryManager_New()
    if memory_manager:
        try:
            loggedInUser = FlowClient_GetLoggedInUser(memory_manager)
            my_devices = FlowUser_RetrieveOwnedDevices(loggedInUser, 20)
            if my_devices is not None and FlowDevices_GetCount(my_devices) != 0:
                # list the user's devices and let them pick one by index
                print("-----------------------\n"
                      "Please select a device:")
                for i in range(FlowDevices_GetCount(my_devices)):
                    device = FlowDevices_GetItem(my_devices, i)
                    print(" %d: %s" % (i, FlowDevice_GetDeviceName(device)))
                index = int(raw_input("\nYour choice: "))
                target = FlowDevices_GetItem(my_devices, index)
                print("-----------------------")
                if target:
                    deviceID = FlowDevice_GetDeviceID(target)
                    sendAgain = True
                    while sendAgain:
                        message = raw_input("\n\rPlease type a message: ")
                        Flow_SendMessageToDeviceAsync(
                            deviceID, "text/plain", message, len(message),
                            100, MessageSendAsyncCallBack
                        )
                        result = True
                        if raw_input("\n\rSend another message Y/n?").lower() \
                                in {'n', 'no'}:
                            sendAgain = False
                else:
                    print('\n\rCouldn\'t fetch device with that index!')
            else:
                print('\n\rNo devices found! :(')
        finally:
            FlowClearMemory(memory_manager)
    return result
This will automatically fetch a list of valid devices from the server and let you select one by its index, meaning you don't have to copy-paste your device ID in each time you run it. It will also let you type in a message, rather than always sending "hi hello".
Acting on messages
We can now send messages to the Ci20! As yet, it doesn't do much when it receives a message; it just prints the contents to stdout. Let's do what we came here for, and use the messages we receive as instructions for communicating with a BTLE peripheral, using the library we built in the previous post. Before we start writing code, let's list the things we want to be able to do, then define an XML schema to do it.
- Write data to a handle.
- Read data from a handle.
- Subscribe to notifications/indications on a handle.
<!--Sender to Device-->
<?xml version="1.0" encoding="UTF-8"?>
<bluetooth-le>
<!--one or more of any of the following-->
<write handle=hex value=bytes/>
<read handle=hex/>
<subscribe handle=hex type=["notification"|"indication"|"both"]/>
</bluetooth-le>
<!--Device reply on notification-->
<?xml version="1.0" encoding="UTF-8"?>
<bluetooth-le>
<!--one or more of any of the following-->
<read-success handle=hex value=bytes/>
<notification handle=hex value=bytes/>
</bluetooth-le>
Where hex and bytes are strings of the form "[0-9A-Fa-f]{1,4}" and "([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}" respectively; that is, hex is a string of up to four hex digits, and bytes is a string of pairs of hex digits.
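To make the two patterns concrete, here is a standalone sketch that parses a sender-to-device message and validates each attribute. parse_handle, parse_value and parse_commands are illustrative names, not part of the script; the sketch anchors the patterns with \Z so that a trailing newline is rejected too, and it omits the XML declaration for brevity.

```python
import re
import xml.etree.ElementTree as ET

HEX_RE = re.compile(r'^[0-9A-Fa-f]{1,4}\Z')
BYTES_RE = re.compile(r'^([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}\Z')
SUBSCRIBE_TYPES = ('notification', 'indication', 'both')


def parse_handle(text):
    """Return the handle as an int, or None if `text` is not valid hex."""
    return int(text, 16) if HEX_RE.match(text) else None


def parse_value(text):
    """Return the value as a bytearray, or None if `text` is not valid."""
    if not BYTES_RE.match(text):
        return None
    return bytearray.fromhex(text.replace(' ', ''))


def parse_commands(xml_text):
    """Return a list of (operation, handle, argument) tuples."""
    root = ET.fromstring(xml_text)
    if root.tag != 'bluetooth-le':
        raise ValueError('unexpected root element %r' % root.tag)
    commands = []
    for child in root:
        handle = parse_handle(child.get('handle', ''))
        if handle is None:  # note: handle 0 is valid, so test against None
            raise ValueError('bad handle in <%s>' % child.tag)
        if child.tag == 'write':
            argument = parse_value(child.get('value', ''))
            if argument is None:
                raise ValueError('bad value in <write>')
        elif child.tag == 'read':
            argument = None
        elif child.tag == 'subscribe':
            argument = child.get('type')
            if argument not in SUBSCRIBE_TYPES:
                raise ValueError('bad type in <subscribe>')
        else:
            raise ValueError('unsupported operation %r' % child.tag)
        commands.append((child.tag, handle, argument))
    return commands


MESSAGE = ('<bluetooth-le>'
           '<write handle="0043" value="12 ab 2A"/>'
           '<read handle="fFaE"/>'
           '<subscribe handle="0" type="both"/>'
           '</bluetooth-le>')
commands = parse_commands(MESSAGE)
```

Each tuple in commands can then be dispatched to the matching Bluetooth call.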
type | valid | invalid |
---|---|---|
hex | "0" | "" |
hex | "0043" | "-y" |
hex | "fFaE" | "01234" |
bytes | "00" | "" |
bytes | "12 ab 2A" | "000 00 00 0" |
bytes | "F12e D34c B5" | "12 23 34 " |
Now we're ready to improve our messaging callback:
#!/usr/bin/env python

# standard library
import logging
import ConfigParser
from collections import OrderedDict # type for configs
import os # for os.linesep
import datetime # for device presence info
import time # publish periodically
import threading # publish in the background
import xml.etree.ElementTree as ET # parse xml data from messages
import re # check handle/value formats
from functools import partial # construct callbacks
import getpass # get sudo password without echoing

# third party
from libflowcore import *
from libflowmessaging import *

# package
from . import bluetooth

CONFIG_FILE = 'config.cnf' # location of configuration file
DEFAULT_INTERVAL = '14.5'
DEFAULT_CHECK_PERIOD = '0.01'
DEFAULT_CONNECT_TIMEOUT = '3.0'
DEFAULT_CONFIG = OrderedDict(
    [ ('FlowServer', OrderedDict(
        [ ('url', 'http://ws-uat.flowworld.com')
        , ('key', 'Ph3bY5kkU4P6vmtT')
        , ('secret', 'Sd1SVBfYtGfQvUCR')
        ]
      ))
    , ('Logging', OrderedDict(
        [ ('level', 'info')
        , ('format', ('%(asctime)s %(name)s.%(funcName)s:%(lineno)d'
                      + os.linesep + '%(levelname)s: %(message)s'))
        ]
      ))
    ]
)

_logger = logging.getLogger(__name__)

class _SenderInfo(object):
    """Wrapper for sender information.

    Attributes:
        id (FlowID): The FlowID of the sender.
        func (f(FlowID, str, str, int, int, f(bool))): The function to use
            when replying to this sender.
    """
    def __init__(self, type_, id_):
        """Initialises the object.

        Args:
            type_ (str): 'device' or 'user' only - the type of the sender.
            id_ (FlowId|str): The FlowID of the sender.

        Raises:
            ValueError: If `type_` is neither 'user' nor 'device'.
        """
        self.id = id_
        if type_ == 'device':
            self.func = Flow_SendMessageToDeviceAsync
        elif type_ == 'user':
            self.func = Flow_SendMessageToUserAsync
        else:
            raise ValueError("type_ must be 'user' or 'device'")

    @classmethod
    def from_message(cls, message):
        """Retrieves sender information from a FlowMessagingMessage.

        Args:
            message (FlowMessagingMessage): The message to inspect.

        Raises:
            ValueError: In the highly unlikely event that `message` has
                neither a user nor a device ID.
        """
        user_id = FlowMessagingMessage_GetSenderUserID(message)
        device_id = FlowMessagingMessage_GetSenderDeviceID(message)
        if user_id:
            return cls('user', user_id)
        elif device_id:
            return cls('device', device_id)
        else:
            raise ValueError('message sent by neither user nor device??')

    def send_reply(self, reply_text, callback=None):
        """Sends a text message to self.id

        Args:
            reply_text (str): The text to send.
            callback (f(bool)|None): The callback for the send function. If
                None, self.message_callback will be used.
        """
        if callback is None:
            callback = self.message_callback
        self.func(
            self.id, 'text/plain', reply_text,
            len(reply_text), 100, callback
        )

    @staticmethod
    def message_callback(result):
        """Logs whether the message was sent successfully.

        Args:
            result (bool): Whether the message was sent successfully.
        """
        if result:
            _logger.info('Message sent successfully.')
        else:
            _logger.error('Message not sent, error %d'
                          % FlowThread_GetLastError())

...
#step 3: Start the message listener
def start_message_listener(callback):
"""[...]"""
...
# step 4: interpret received message
def interpret_btle_message(device, message):
"""Reads an xml-formatted message and executes bluetooth commands therein.
The supported xml format is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<bluetooth-le>
<!--one or more of any of the following-->
<write handle=hex value=bytes/>
<read handle=hex/>
<subscribe handle=hex type=["notification"|"indication"|"both"]/>
</bluetooth-le>
Where `hex` matches /"[0-9A-Fa-f]{1,4}"/
and `bytes` matches /"([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}"/
Args:
message (FlowMessagingMessage): The message to read.
device (bluetooth.BTLEDevice): The device to send decoded commands to.
"""
def try_handle_convert(handle_str):
"""Converts a string of hex digits into an int.
Args:
handle_str (str): The string to try to convert.
Returns:
int or None: None if conversion failed or the handle didn't match
the right pattern, else the converted int.
"""
if not re.match(r'^[0-9A-Fa-f]{1,4}$', handle_str):
return None
return int(handle_str, 16)
def try_value_convert(value_str):
"""Converts a string of pairs of hex digits into a bytearray.
Args:
value_str (str): The value to try to convert.
Returns:
bytearray or None: None if the conversion failed or the value
didn't match the right pattern, else the converted bytearray.
"""
if not re.match(r'^([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}$', value_str):
return None
return bytearray.fromhex(value_str)
content = FlowMessagingMessage_GetContent(message)
_logger.debug('Received message: %s' % content)
try:
root = ET.fromstring(content)
except ET.ParseError as e:
_logger.error('could not parse message %s: %s'
% (content, str(e)))
return
if root.tag == 'bluetooth-le':
for child in root:
if child.tag == 'write':
handle = try_handle_convert(child.get('handle', ''))
value = try_value_convert(child.get('value', ''))
if handle and value:
device.char_write(handle, value, False)
value_str = ''.join(['%02x' % b for b in value])
_logger.info('Wrote %s to handle %04x'
% (value_str, handle))
else:
_logger.error('Could not parse handle/value for write')
elif child.tag == 'read':
handle = try_handle_convert(child.get('handle', ''))
if handle:
try:
value = device.char_read_hnd(handle)
send_read_success(message, handle, value)
except bluetooth.NotificationTimeout:
_logger.error('Could not read from handle %04x'
% handle)
else:
_logger.error('Could not parse handle for read')
elif child.tag == 'subscribe':
handle = try_handle_convert(child.get('handle', ''))
type_ = child.get('type', None)
type__list = ['notification', 'indication', 'both']
if handle and type_ in type__list:
try:
info = _SenderInfo.from_message(message)
callback = partial(_send_notification, info)
device.subscribe(
handle, callback=callback,
type_=type__list.index(type_)
)
# Still inside the try: only reached if subscribe() did not raise
type__desc = 'notifications and indications' \
if type_ == 'both' else type_ + 's'
_logger.info('subscribed to %s on %04x'
% (type__desc, handle))
except bluetooth.NoResponseError:
_logger.error('Could not subscribe to handle %04x'
% handle)
else:
_logger.error('Could not parse handle/type for subscribe')
else:
_logger.error('Unsupported operation %s' % child.tag)
def send_read_success(message, handle, value):
"""Sends a read-success message to the sender of `message`.
The message will be in the following format:
<?xml version="1.0" encoding="UTF-8"?>
<bluetooth-le>
<read-success handle=hex value=bytes/>
</bluetooth-le>
Where `hex` matches /"[0-9A-Fa-f]{1,4}"/
and `bytes` matches /"([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}"/
Args:
message (FlowMessagingMessage): The message to reply to.
handle (int): The handle to send.
value (bytearray): The value to send.
"""
value_str = ''.join(['%02x' % b for b in value])
_logger.debug('Sending read success message, handle %04x value %s'
% (handle, value_str))
reply_text = ('<?xml version="1.0" encoding="UTF-8"?>'
'<bluetooth-le>'
'<read-success handle="%04x" value="%s"/>'
'</bluetooth-le>') % (handle, value_str)
reply_to_message(message, reply_text)
def _send_notification(sender_info, handle, value):
"""Sends a notification message to the subscriber at `sender_info`.
The message will be in the following format:
<?xml version="1.0" encoding="UTF-8"?>
<bluetooth-le>
<notification handle=hex value=bytes/>
</bluetooth-le>
Where `hex` matches /"[0-9A-Fa-f]{1,4}"/
and `bytes` matches /"([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}"/
Args:
sender_info (_SenderInfo): Details of the subscriber to send
the message to.
handle (int): The handle to send.
value (bytearray): The value to send.
"""
value_str = ''.join(['%02x' % b for b in value])
_logger.debug('Sending notification message, handle %04x value %s'
% (handle, value_str))
reply_text = ('<?xml version="1.0" encoding="UTF-8"?>'
'<bluetooth-le>'
'<notification handle="%04x" value="%s"/>'
'</bluetooth-le>') % (handle, value_str)
sender_info.send_reply(reply_text)
def was_message_successful(result):
"""Logs whether a message was sent successfully.
Args:
result (bool): True iff the reply was successfully sent.
"""
if result:
_logger.info('Message sent successfully.')
else:
_logger.error('Message not sent, error %d' % FlowThread_GetLastError())
# step 5: reply to message if necessary
def reply_to_message(message, reply_text, callback=was_message_successful):
"""Sends a message containing `reply_text` to the sender of `message`.
Args:
message (FlowMessagingMessage): The message to reply to.
reply_text (str): Sent as the text of the reply.
callback (f(bool)): Passed the result of Flow_ReplyToMessageAsync,
i.e. a boolean that is True iff the reply was sent successfully.
"""
_logger.info('Sending message %s' % reply_text)
Flow_ReplyToMessageAsync(
message, 'text/plain', reply_text, len(reply_text), 100, callback
)
...
def main():
result = False
_logger.info('Initialising Flow libraries...')
if initialise_flow():
_logger.info('Initialisation successful.')
memory_manager = FlowMemoryManager_New()
try:
if register_device():
_logger.info('Logged in as device.')
# Fetch config
config = get_config(CONFIG_FILE)
# get info for presence publishing loop
# check_period must be defined before exceptions are raised
section = config.get('Presence', OrderedDict())
interval = float(section.get('interval', DEFAULT_INTERVAL))
check_period = float(section.get('check_period',
DEFAULT_CHECK_PERIOD))
publish = True
def presence_loop():
i = interval + 1
while publish:
if i > interval:
publish_device_presence(memory_manager)
i = 0
time.sleep(check_period)
i += check_period
presence_thread = threading.Thread(target=presence_loop)
## Set up BTLEDevice
sudo_pass = getpass.getpass('sudo password: ') \
if not raw_input('Are you root? [y/N]: ').lower() \
in {'y', 'yes'} else None
# Reset the bluetooth controller so the scan doesn't fail
_logger.info('Resetting the bluetooth controller...')
bluetooth.reset_bluetooth_controller(sudo_pass)
# Run a scan so gatttool doesn't throw a hissy fit
_logger.info('Running an LE scan...')
bluetooth.le_scan(sudo_pass, timeout=1)
del sudo_pass
write_needed = False
if not config.get('BluetoothDevice', None):
config['BluetoothDevice'] = OrderedDict()
write_needed = True
section = config['BluetoothDevice']
if not section.get('address', None):
section['address'] = \
raw_input('Please input BTLE MAC address: ')
write_needed = True
with bluetooth.BTLEDevice(section['address']) as device:
device.connect(float(
section.get('connect_timeout', DEFAULT_CONNECT_TIMEOUT)
))
# connect was successful, write good config
if write_needed and raw_input('Save config? [Y/n]')\
.lower() in {'', 'y', 'yes'}:
write_config(config, CONFIG_FILE)
# Start presence thread
_logger.info('Starting device presence thread...')
presence_thread.start()
# Listen for messages
_logger.info('Starting message listener...')
callback = partial(interpret_btle_message, device)
callback.__name__ = \
'partial(interpret_btle_message, device)'
start_message_listener(callback)
raw_input('Press enter to stop waiting:' + os.linesep)
finally:
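# Assumed fix: clear the flag so presence_loop exits; the sleep below
# gives the thread one check_period to notice
publish = False
result = True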
time.sleep(check_period)
FlowClearMemory(memory_manager)
_logger.info('Shutting down Flow...')
shutdown_flow()
return result
if __name__ == '__main__':
...
That is a lot of code. Let's go through what it all does and why it's necessary.
interpret_btle_message:
This is the function that is called whenever our device receives a message. The two nested functions, try_handle_convert and try_value_convert, are short and boring - they just check whether a handle or value matches our specification of what a handle or value should look like and, if so, convert it into an int or a bytearray respectively. They exist solely to reduce repetition later on.
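As a quick check of what these two helpers accept, the snippet below repeats them outside interpret_btle_message and runs a few inputs through them. The sample handles and values are made up for illustration and don't come from a real device.

```python
import re


def try_handle_convert(handle_str):
    """Return the handle as an int, or None if it is not 1-4 hex digits."""
    if not re.match(r'^[0-9A-Fa-f]{1,4}$', handle_str):
        return None
    return int(handle_str, 16)


def try_value_convert(value_str):
    """Return the value as a bytearray, or None if it is not hex pairs."""
    if not re.match(r'^([0-9A-Fa-f]{2} ?)*[0-9A-Fa-f]{2}$', value_str):
        return None
    return bytearray.fromhex(value_str)


# Illustrative inputs only; these are not handles from a real device.
print(try_handle_convert('002b'))                        # 43
print(try_handle_convert('12345'))                       # None: five digits
print(try_value_convert('01 00') == bytearray([1, 0]))   # True
print(try_value_convert('1'))                            # None: odd length
```

One thing to be aware of: a handle of 0000 converts to 0, which the if handle checks in interpret_btle_message treat the same as a failed conversion. That is harmless here, since 0x0000 is not a valid attribute handle.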
We start by extracting the content from the message, then trying to parse it as xml. If we can't parse it, we log an error and return early. Otherwise, we check if the root node is tagged with bluetooth-le, as per our specification, and if it is, we attempt to execute each of its children as bluetooth commands.
The process for executing a command is similar for each command - we check the tag of child to find which command we should attempt, then extract and check the appropriate attributes. If we find the attributes and they're valid, we execute the command using one of the methods of the device, catching any exceptions that might be thrown. If we're successful, we log a message saying so (for read, this is handled in send_read_success).
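To make the parse-then-dispatch flow concrete, here is a cut-down sketch that runs without Flow or a bluetooth adapter. FakeDevice and run_commands are hypothetical names invented for this example; the real function also validates handles and values, handles subscribe, and logs errors, all of which are left out here.

```python
import xml.etree.ElementTree as ET


class FakeDevice(object):
    """Hypothetical stand-in that records calls instead of using bluetooth."""

    def __init__(self):
        self.calls = []

    def char_write(self, handle, value, wait_for_response):
        self.calls.append(('write', handle, list(value)))

    def char_read_hnd(self, handle):
        self.calls.append(('read', handle))
        return bytearray([0x2a])


def run_commands(device, content):
    """Parse `content` and run each child of <bluetooth-le> as a command."""
    try:
        root = ET.fromstring(content)
    except ET.ParseError:
        return False  # the real function logs an error here
    if root.tag != 'bluetooth-le':
        return False
    for child in root:
        # No validation in this sketch; the real code uses try_handle_convert.
        handle = int(child.get('handle', '0'), 16)
        if child.tag == 'write':
            value = bytearray.fromhex(child.get('value', ''))
            device.char_write(handle, value, False)
        elif child.tag == 'read':
            device.char_read_hnd(handle)
    return True


message = ('<bluetooth-le>'
           '<write handle="002b" value="01 00"/>'
           '<read handle="002e"/>'
           '</bluetooth-le>')
device = FakeDevice()
print(run_commands(device, message))
print(device.calls)
```

Each child is executed in document order, so a single message can carry several commands.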
send_read_success:
This function is called on a successful read - it could be moved into the body of interpret_btle_message, but (a) that function is long enough already, (b) we'd rather not specify xml five levels of indentation deep, which would make it difficult to track down if it needed changing and (c) it's possible we might want to send a read-success from other places, e.g. if we improved the specification to allow reading by UUID. Besides, it sits nicely next to _send_notification, a function with a very similar purpose which can't be moved into interpret_btle_message because it's only called as the callback to device.subscribe.
The actual code of this function is self-explanatory.
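Since read-success and notification replies differ only in the tag name, the string building can be sketched as a single helper. build_reply is a hypothetical name for this example, not a function from the script; the format follows the one given in the docstrings above, and the handle and value are made up.

```python
import xml.etree.ElementTree as ET


def build_reply(tag, handle, value):
    """Build a <bluetooth-le> reply with one child named `tag`."""
    value_str = ''.join('%02x' % b for b in value)
    return ('<?xml version="1.0" encoding="UTF-8"?>'
            '<bluetooth-le>'
            '<%s handle="%04x" value="%s"/>'
            '</bluetooth-le>') % (tag, handle, value_str)


reply = build_reply('read-success', 0x2e, bytearray([0x01, 0xff]))
print(reply)

# Round-trip through the parser to confirm the reply is well-formed.
root = ET.fromstring(reply.encode('utf-8'))
child = root.find('read-success')
print('%s %s' % (child.get('handle'), child.get('value')))
```

Parsing the reply back is a cheap way to catch a malformed template, such as a root tag that is accidentally self-closing.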
_send_notification:
This function is used as the callback to device.subscribe (or rather, a partial application of it is). Originally, this function was identical to send_read_success, with read-success changed to notification in the xml. It took a FlowMessagingMessage as its first parameter, which was passed to it using partial in interpret_btle_message, and passed that message on to reply_to_message. However, while writing the code I discovered that attempting to reply to the stored message caused a segmentation fault. Investigating further, I discovered that the stored message, when accessed in _send_notification, had no content, no sender information and no other attributes. My hypothesis is this: interpret_btle_message is called from somewhere in the Flow library, and is passed a message. Mid-function, the message is handed to the BTLEDevice as part of a partial function, where a reference to it is saved. Then control returns to interpret_btle_message, which exits and returns control to Flow. Flow then deletes the message as part of its cleanup. Because the Flow library is actually written in C, not Python, this doesn't just decrement the object's reference count and let it live on elsewhere; it actually frees the object, so every reference to it stops working. Frustrating!
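The hypothesis can be illustrated with a pure-Python simulation. This is only a model of the suspected behaviour: FakeMessage and its free method are invented for the example, and a real freed C object would crash rather than politely return None.

```python
from functools import partial


class FakeMessage(object):
    """Hypothetical stand-in for a message owned by a C library."""

    def __init__(self, sender, content):
        self.sender = sender
        self.content = content

    def free(self):
        # Simulates the library reclaiming the message after the handler.
        self.sender = None
        self.content = None


def notify_with_message(message, value):
    return (message.sender, value)


def notify_with_sender(sender, value):
    return (sender, value)


def handler(message, callbacks):
    # Keeps a reference to the whole message: breaks once it is freed.
    callbacks.append(partial(notify_with_message, message))
    # Copies out the one field needed while the message is still valid.
    callbacks.append(partial(notify_with_sender, message.sender))


callbacks = []
message = FakeMessage('user-1', '<bluetooth-le/>')
handler(message, callbacks)
message.free()  # what the library is assumed to do after the handler returns

print(callbacks[0]('2a'))   # (None, '2a'): the stored message is now empty
print(callbacks[1]('2a'))   # ('user-1', '2a'): the copied sender survives
```

The second callback is the pattern to aim for: copy what you need out of the message before the handler returns.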
We solve this by extracting the address of the sender while we still have the message, and passing that to _send_notification, rather than the message as a whole. We do this with the help of...
_SenderInfo:
This is a wrapper class for information about the sender of a message. We only need this because Flow makes a distinction between device and user IDs, which we don't want to differentiate between - we just want to reply to whatever sent a message. This class lets us bundle the ID of the sender with the appropriate function for replying to them. Note the second constructor, from_message - we use this in the elif child.tag == 'subscribe' section of interpret_btle_message for convenience.
This class is private (leading underscore) at the moment, because it's just acting as a helper for _send_notification. _send_notification is private because it uses a private class in its parameters.
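The idea of bundling an ID with its reply function can be sketched without Flow installed. SenderInfoSketch and fake_send are hypothetical names for this example; the argument order passed to the reply function mirrors the Flow_ReplyToMessageAsync call in reply_to_message, with 'text/plain' and 100 copied from there. The from_message constructor is omitted because it depends on Flow calls.

```python
class SenderInfoSketch(object):
    """Bundles a sender's ID with the function used to reply to it."""

    def __init__(self, id_, func):
        self.id = id_
        self.func = func

    def send_reply(self, text, callback=None):
        # Same argument order as the reply_to_message call in the listing.
        return self.func(self.id, 'text/plain', text, len(text), 100,
                         callback)


sent = []


def fake_send(flow_id, content_type, text, length, number, callback):
    """Hypothetical stand-in for a Flow send function; records the call."""
    sent.append((flow_id, content_type, text, length))
    return True


info = SenderInfoSketch('user-1', fake_send)
info.send_reply('<bluetooth-le/>')
print(sent)
```

Because the object holds only plain Python values and a function, it stays usable after the original message has gone away.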
reply_to_message:
This function is fairly self-explanatory, what with it being two lines long. It exists to separate the Flow part of replying to a message from the BTLE part (constructing the xml and so on).
main:
Deep in the heart of main, we've added code to set up a BTLE device, between fetching the config for the presence loop and the actual starting of the presence thread. We begin by asking the user if they need a sudo password, and if so fetching it from them using getpass. Note: we use getpass instead of raw_input because getpass won't echo the password to the terminal.
We then reset the bluetooth adapter and run an LE scan. This shouldn't be necessary, but if we don't do it gatttool will fall over and die. Then, we fetch some more settings from the config: this is the same procedure we used in device registration, but now the section is 'BluetoothDevice' and the parameter we'll fetch and then optionally save is the MAC address of the bluetooth device. We then connect to the device, using a with block so we don't forget to close the connection, and if we successfully connect, ask to save the config. We then start the presence thread and the message listener, but this time we pass the message listener a partial application of interpret_btle_message, in order that we can pass commands to the device without the help of, say, global variables.
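If partial application is unfamiliar, the minimal sketch below shows the trick in isolation. The interpret function and the 'bulb' string are placeholders for this example; in the script, the bound argument is the BTLEDevice. Setting __name__ by hand is presumably there so that anything logging the callback's name has something to print.

```python
from functools import partial


def interpret(device, message):
    """Two-argument handler; the listener only supplies `message`."""
    return '%s handled %s' % (device, message)


# Bind the first argument now; the result takes just the message.
callback = partial(interpret, 'bulb')
# partial objects have no __name__ by default, so give it one.
callback.__name__ = 'partial(interpret, bulb)'

print(callback('<read/>'))      # bulb handled <read/>
print(callback.__name__)
```

The listener can now call callback(message) without knowing that a device is involved at all.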
Further improvements
Now, boot up the MakeItFlow app and send a message! If the script errors because it can't parse the xml, this is expected behaviour. Set the logging level to 'debug' in the config file and run it again, then look closely at the xml being sent by the MakeItFlow app.
...</from><clientid type="integer">2c7bf87a-3e7a-4aba-bdb2-5d4272d82c64</clientid type="integer" ><requestid type="integer">1</requestid type="integer"><details>...
Look at that! Attributes in the end-tags! For shame, MakeItFlow developers. If we want to receive messages from the MakeItFlow app, we're going to have to find some way around this.
# step 4: interpret received message
def interpret_btle_message(device, message):
"""[...]"""
...
def check_for_MakeItFlow(content):
"""Checks for a message from the MakeItFlow app.
Args:
content (str): The content of the message to check.
Returns:
Element or None: The parsed child of the details element of the
message, or None if it could not be found or parsed.
"""
# check for MakeItFlow
regexp = r'^<\?xml.*\?><command>.*<details>(.*)</details></command>$'
match = re.search(regexp, content)
if match: # probably MakeItFlow
_logger.debug('Found MakeItFlow command.')
try:
return ET.fromstring(match.group(1).lower())
except ET.ParseError as e2:
_logger.error('could not parse message %s: %s'
% (match.group(1).lower(), str(e2)))
return None
content = FlowMessagingMessage_GetContent(message)
_logger.debug('Received message: %s' % content)
try:
root = ET.fromstring(content)
except ET.ParseError as e:
# check for MakeItFlow
root = check_for_MakeItFlow(content)
if root is None:
_logger.error('could not parse message %s: %s'
% (content, str(e)))
return
# in case the MakeItFlow developers ever repair their xml
if root.tag == 'command':
root1 = check_for_MakeItFlow(content)
# An Element with no children is falsy, so compare against None
if root1 is not None:
root = root1
if root.tag == 'bluetooth-le':
...
There! Now we can send from the MakeItFlow app, and it should be received just fine. Note: A quirk of MakeItFlow is that it sends commands in all caps, which is why we parse match.group(1).lower() instead of match.group(1). Of course, typing xml on a mobile device is very awkward, so you might want to implement another way to control the bluetooth device. For example, if the device is a smart bulb, sending "ON" from the MakeItFlow app might switch the bulb on, and sending "OFF" might switch it off. The implementation of such a scheme is fairly easy, but is beyond the scope of this tutorial.
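To see the workaround in action without the app, the sketch below feeds a made-up message through the same regular expression. The message is shortened and its details content is an assumption about what the app sends; only the malformed end tag is taken from the log excerpt above.

```python
import re
import xml.etree.ElementTree as ET

# Shortened, made-up message: the end tag repeats the attribute, which is
# not well-formed xml.
content = ('<?xml version="1.0"?><command>'
           '<requestid type="integer">1</requestid type="integer">'
           '<details><BLUETOOTH-LE><READ HANDLE="002E"/></BLUETOOTH-LE>'
           '</details></command>')

try:
    ET.fromstring(content)
    parsed_whole = True
except ET.ParseError:
    parsed_whole = False
print(parsed_whole)   # False: the parser rejects the whole message

# Pull out just the contents of <details> and parse that instead.
regexp = r'^<\?xml.*\?><command>.*<details>(.*)</details></command>$'
match = re.search(regexp, content)
root = ET.fromstring(match.group(1).lower())
print(root.tag)
print('%s %s' % (root[0].tag, root[0].get('handle')))
```

Note that lower() also lowercases attribute values. That is fine for hex handles and values, but it would matter if the specification ever grew case-sensitive attributes.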
One final improvement we can make is to do away with the reply_to_message function and just use _SenderInfo.send_reply. When we do that, we should probably remove the leading _ that indicates that SenderInfo is private:
...
class SenderInfo(object): # note the lack of leading _
"""Wrapper for sender information.
Attributes:
id (FlowID): The FlowID of the sender.
func (f(FlowID, str, str, int, int, f(bool))): The function to use
when replying to this sender.
"""
...
...
# step 4: interpret received message
def interpret_btle_message(device, message):
"""[...]"""
...
if root.tag == 'bluetooth-le':
for child in root:
if child.tag == 'write':
...
elif child.tag == 'read':
handle = try_handle_convert(child.get('handle', ''))
if handle:
try:
value = device.char_read_hnd(handle)
### pass sender info, not message ###
info = SenderInfo.from_message(message)
send_read_success(info, handle, value)
except bluetooth.NotificationTimeout:
_logger.error('Could not read from handle %04x'
% handle)
else:
_logger.error('Could not parse handle for read')
elif child.tag == 'subscribe':
...
if handle and type_ in type__list:
try:
### remove leading _s ###
info = SenderInfo.from_message(message)
callback = partial(send_notification, info)
device.subscribe(
handle, callback=callback,
type_=type__list.index(type_)
)
...
except bluetooth.NoResponseError:
...
def send_read_success(sender_info, handle, value):
"""Sends a read-success message to `sender_info`.
...
Args:
sender_info (SenderInfo): The info of the user/device to send to.
handle (int): The handle to send.
value (bytearray): The value to send.
"""
...
# reply_to_message(message, reply_text)
sender_info.send_reply(reply_text)
def send_notification(sender_info, handle, value):
# Note how we've removed the leading _ from SenderInfo in the docstring!
"""Sends a notification message to the subscriber at `sender_info`.
...
Args:
sender_info (SenderInfo): ...
...
"""
...
#Don't need these any more, delete them
#def was_message_successful(result):
#def reply_to_message(message, reply_text, callback=was_message_successful):
...
And with that, we're done! You can find a full code listing for this post in the GitHub repository for this blog.
Appendix A: Configuration file specification
[Flow Server] ; compulsory
url = http://ws-uat.flowworld.com ; or some other valid url
key = Ph3bY5kkU4P6vmtT ; or some other valid key
secret = Sd1SVBfYtGfQvUCR ; or some other valid secret
[Logging] ; compulsory
; level may be one of [debug|info|warning|error|critical]
level = info ; messages below this level log to
; neither stream nor file
format = %(asctime)s %(name)s.%(funcName)s:%(lineno)d %(levelname)s: %(message)s ; see Python logging docs
; optional
stream = [bool] ; whether or not to log to stream
stream_level = info ; minimum level for stream
stream_format = [some valid format] ; format for stream messages
logfile = path ; file to log output to
logfile_level = error ; minimum level for file logs
logfile_format = [some valid format] ; format for file messages
[Device] ; if not present, this section is created using user input
device_type = ci20 ; can be any valid device type
address = 1234ABCD5678 ; 12 (upper case) hex digits
serial_number = 1 ; can be anything
software_version = 0.9.1 ; can be anything
name = bluetooth ; something recognisable
registration_token = GXWABNMG6G ; get from Flow Developer website
[BluetoothDevice] ; if not present, partially created with user input
address = 12:34:AB:CD:56:78 ; 6 pairs of hex digits, sep. by :
; optional
connect_timeout = 3.0 ; time before giving up on connecting
[Presence] ; entirely optional
interval = 14.5 ; interval in seconds between
; presence updates,
; default for wifire boards is 15
check_period = 0.01 ; responsiveness of publish loop
; lower is more accurate to interval
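For readers who want to experiment with the file format on its own, here is a rough sketch that reads a fragment of it with the standard library. It is not the get_config function used by the script: it is written for Python 3's configparser module, the sample values are copied from the specification above, and DEFAULT_CHECK_PERIOD is an assumed value for illustration.

```python
from configparser import RawConfigParser

SAMPLE = """
[BluetoothDevice]
address = 12:34:AB:CD:56:78 ; 6 pairs of hex digits
connect_timeout = 3.0

[Presence]
interval = 14.5
"""

# Assumed default for illustration; the script's real default may differ.
DEFAULT_CHECK_PERIOD = 0.01

# Treat ' ; ...' at the end of a line as a comment, as in the appendix.
parser = RawConfigParser(inline_comment_prefixes=(';',))
parser.read_string(SAMPLE)

address = parser.get('BluetoothDevice', 'address')
timeout = parser.getfloat('BluetoothDevice', 'connect_timeout')
interval = parser.getfloat('Presence', 'interval')
# check_period is missing from the sample, so the fallback is used.
check_period = parser.getfloat('Presence', 'check_period',
                               fallback=DEFAULT_CHECK_PERIOD)

print(address)
print('%s %s %s' % (timeout, interval, check_period))
```

RawConfigParser is used so that the % signs in a logging format string would not be treated as interpolation.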