Bluetooth LE in Python using pexpect and gatttool
10 Sep 2015
Once you've established control of the peripheral using gatttool's interactive mode, the next step is to establish a way of controlling it programmatically. Here, we accomplish this using pexpect, a library for controlling command line programs in Python. Make sure you have pexpect installed on the Ci20:
ci20@ci20:~$ sudo apt-get install python-pexpect
Let's start by listing what we want to be able to do from within Python:
- Connect to a device.
- Write to characteristics by their handle.
- Read from characteristics by their handle.
- Listen for notifications / indications on a handle.
We'll start by outlining a class to represent a bluetooth peripheral, with methods that will do those tasks. We'll leave the bodies blank for now:
class BLEDevice:
    """Outline of a Bluetooth LE peripheral driven through gatttool.

    Only the interface is laid out here: every method body is empty and each
    docstring records the contract the implementation is expected to meet.
    """

    DEFAULT_CONNECT_TIMEOUT = 3.0

    def __init__(self, mac_address, hci_device='hci0'):
        """Set up the notification listener and open the gatttool session.

        Args:
            mac_address (str): Address of the peripheral, written as
                "XX:XX:XX:XX:XX:XX".
            hci_device (str): Name of the hci adapter gatttool should use.

        Raises:
            pexpect.TIMEOUT: If a gatttool instance cannot be started
                (for example because gatttool is not installed).
        """

    def char_write(self, handle, value, wait_for_response=False):
        """Write `value` to the characteristic at `handle`.

        Args:
            handle (int): Handle of the characteristic to write.
            value (bytearray): Bytes to write.
            wait_for_response (bool): When true, wait for the peripheral to
                confirm that the write succeeded.

        Raises:
            NotConnectedError: If the device has not been connected.
            NoResponseError: If `wait_for_response` is true and the
                peripheral never confirms the write.
        """

    def char_read_hnd(self, handle):
        """Read the characteristic at `handle`.

        Args:
            handle (int): Handle of the characteristic to read.

        Returns:
            bytearray: The characteristic's current value.

        Raises:
            NotConnectedError: If the device has not been connected.
            NotificationTimeout: If the device is connected but the read
                fails for some other reason.
        """

    def connect(self, timeout=DEFAULT_CONNECT_TIMEOUT):
        """Establish a connection with the device.

        If connecting fails, try running an LE scan first.

        Args:
            timeout (numeric): Seconds to wait before giving up.

        Raises:
            NotConnectedError: If the connection cannot be established.
        """

    def run(self):
        """Listen for notifications."""

    def stop(self):
        """Stop the gatttool instance and the listener thread."""

    def subscribe(self, handle, callback=None, type_=0):
        """Subscribe to notifications/indications from a characteristic.

        The subscription is made by writing to the control handle, which is
        assumed to be `handle` + 1: 0100 requests notifications, 0200
        requests indications, and 0300 requests both (also written when one
        kind is requested while the other is already enabled).

        Args:
            handle (int): Handle to listen on.
            callback (f(int, bytearray)): Called with the handle and value
                whenever a notif/indication is received.
            type_ (int): 0 for notifications, 1 for indications, 2 for both.

        Raises:
            NoResponseError: If writing to the control handle fails.
            ValueError: If `type_` is not one of 0, 1 or 2.
        """

    def unsubscribe(self, handle, callback=None):
        """Unsubscribe from notif/indications on a handle.

        Writes 0000 to the control handle, which is assumed to be
        `handle` + 1. When `callback` is given it is also removed from the
        callbacks registered for the handle.

        Args:
            handle (int): Handle to stop listening on.
            callback (f(int, bytearray)): A callback previously passed to
                `subscribe(handle, callback)`.

        Raises:
            NoResponseError: If writing to the control handle fails.
        """
Note: The design of this class, along with a fair chunk of the code, is borrowed from pygatt. Credit and license information can be found in the sources for this project.
Notice that we've established a few conventions: handles are ints and
values are bytearrays. In order to deal with notif/indications, which can
arrive at any time, we'll run a background thread checking for them in
gatttool's output, and then look up the handle in a dictionary of callback
functions. Now let's fill the methods in, starting with __init__.
def __init__(self, mac_address, hci_device='hci0'):
    """Initialise the device and start the gatttool session.

    Spawns gatttool in interactive mode, waits for its prompt and starts
    the daemon thread that listens for notifications.

    Args:
        mac_address (str): The MAC address of the BLE device to connect
            to, in the format "XX:XX:XX:XX:XX:XX".
        hci_device (str): The hci device to use with gatttool.

    Raises:
        pexpect.TIMEOUT: If the gatttool prompt does not appear within
            one second of spawning it.
    """
    ##### Internal state #####
    self._address = mac_address
    self._handles = {}
    # handle -> value last written to that handle's control handle
    self._subscribed_handlers = {}
    # handle -> set of callbacks invoked for notif/indications
    self._callbacks = defaultdict(set)
    # Guards _callbacks and _subscribed_handlers (not reentrant).
    self._lock = threading.Lock()
    # Guards the gatttool session; reentrant because run() and the
    # read/write methods call _expect() while already holding it.
    self._connection_lock = threading.RLock()
    self._running = True
    self._thread = None
    self._con = None  # The gatttool instance
    self._connected = False

    ##### Set up gatttool #####
    gatttool_cmd = ' '.join(
        ['gatttool',
         '-b', self._address,
         '-i', hci_device,
         '-I']
    )
    self._con = pexpect.spawn(gatttool_cmd, ignore_sighup=False)
    self._con.expect(r'\[LE\]>', timeout=1)

    ##### Start notification listener thread #####
    # Keep the thread on the instance: it used to live in a local only,
    # which left self._thread as None for the object's whole lifetime.
    self._thread = threading.Thread(target=self.run)
    self._thread.daemon = True
    self._thread.start()
__init__
has three jobs:
- Initialise all the internal variables of the class. Note that any variable prefixed by _ is private, and should not be referenced outside the class. It may not be immediately obvious what some of these are for; this will become clear later.
- Spawn a gatttool instance. This is accomplished using pexpect.spawn. Things to note:
  - Setting ignore_sighup to False "ensures" that the spawned gatttool instance will be killed when the Python process is - otherwise, you'll find (as I did) that they stay alive indefinitely, which seriously slows down the Ci20 after a while. We'll try to make sure they die gracefully by defining __enter__, __exit__ and __del__ methods later.
  - self._con.expect waits for a particular regular expression (in this case, "[LE]>") to appear in gatttool's output. Here, we're waiting for the prompt to appear - if it doesn't, a pexpect.TIMEOUT exception will be raised.
- Start the notification listener. This is quite straightforward - we
create a new thread pointed at
self.run
, turn it into a daemon and start it.
Next, we'll write connect
and run
:
def connect(self, timeout=DEFAULT_CONNECT_TIMEOUT):
    """Establish a connection with the device.

    If connection fails, try running an LE scan first.

    Args:
        timeout (numeric): Time in seconds to wait before giving up on
            trying to connect.

    Raises:
        NotConnectedError: If connection to the device fails. The
            gatttool session is stopped before the error is raised.
    """
    try:
        # Hold the session lock so the listener thread cannot read
        # gatttool's output between the command being sent and its reply
        # being matched.
        with self._connection_lock:
            self._con.sendline('connect')
            self._con.expect(r'Connection successful.*\[LE\]>', timeout)
    except pexpect.TIMEOUT:
        self.stop()
        message = ('timed out after connecting to %s after %f seconds.'
                   % (self._address, timeout))
        raise NotConnectedError(message)

    self._connected = True
    if not self._running:
        # The listener loop exits once _running is cleared (for example
        # after an unexpected disconnect), so start a fresh thread.
        # Calling run() on the old thread object would execute the loop
        # in the caller's thread and block it.
        self._running = True
        self._thread = threading.Thread(target=self.run)
        self._thread.daemon = True
        self._thread.start()
def run(self):
    """Poll gatttool's output for notifications until told to stop.

    Each pass waits briefly for a pattern that never appears, so the only
    useful work is the notif/indication handling done inside `_expect`.
    The loop ends when `_running` is cleared, the device disconnects, or
    gatttool's output reaches end-of-file.
    """
    never_matches = 'nonsense value foobar'
    while self._running:
        with self._connection_lock:
            try:
                self._expect(never_matches, timeout=0.1)
            except (NotConnectedError, pexpect.EOF):
                return
            except NotificationTimeout:
                # Nothing arrived during this pass; poll again.
                pass
        time.sleep(0.05)  # Stop thread from hogging _connection_lock
connect
is quite straightforward: We send the "connect" command to
gatttool, and wait for it to respond with "Connection successful". If it
does, and the listener thread isn't running (perhaps because we called
stop
) we start the listener thread. If it doesn't, we call stop
to stop
the listener thread and raise a NotConnectedError
.
run
is less straightforward. First off, the meat of the function is
nestled in a while
- if self._running
is set to False
, the function
will exit and the listener thread will halt. We then acquire the connection
lock, so that we can search gatttool's output without colliding with
another thread doing the same. The process of searching for
notif/indications has been separated into the private helper method
_expect, defined below; this serves to keep the logic of run clear and
to allow other methods to search for notif/indications. While we're at it,
let's also quickly define the stop method:
def stop(self):
    """Stop the gatttool instance and listener thread.

    Asks gatttool to exit, gives it up to one second to do so, then
    closes the session regardless, so the child process and its pty are
    released even if gatttool had already exited on its own.
    """
    self._running = False  # stop the listener thread
    # Take the session lock so the session is not closed while the
    # listener thread is in the middle of reading from it.
    with self._connection_lock:
        if self._con.isalive():
            self._con.sendline('exit')
            # Poll for up to one second (100 x 10 ms) for a clean exit.
            for _ in range(100):
                if not self._con.isalive():
                    break
                time.sleep(0.01)
        self._con.close()  # make sure gatttool is dead
    self._connected = False
This method tells the listener thread to stop running, then sends an exit
command to the gatttool instance, waits for it to die gracefully and
kills it if it's still not dead after a second. Now let's finish the helper
method for run:
def _expect(self, expected, timeout=DEFAULT_CONNECT_TIMEOUT):
    """Wait for `expected`, dispatching notif/indications seen meanwhile.

    A notification or indication can show up in gatttool's output ahead
    of the reply we are waiting for, e.g.:

        > char-write-req 0x1 0x2
        Notification handle: xxx
        Write completed successfully.
        >

    so every wait also watches for them and hands each one to
    `_handle_notification` before continuing to wait.

    Args:
        expected (str): The pattern to search for in the output.
        timeout (numeric): Seconds to wait, per attempt, before assuming
            the pattern will never be found.

    Raises:
        NotificationTimeout: If nothing matches before the timeout.
        NotConnectedError: If gatttool reports a disconnect or an invalid
            file descriptor; `_running` is cleared first.
    """
    candidates = [
        expected,
        'Notification handle = .*? \r',
        'Indication handle = .*? \r',
        '.*Invalid file descriptor.*',
        '.*Disconnected\r',
    ]
    with self._connection_lock:
        while True:
            try:
                hit = self._con.expect(candidates, timeout)
                if hit == 0:
                    return
                if hit in (1, 2):
                    self._handle_notification(self._con.after)
                    continue
                if hit in (3, 4):
                    # Only call it unexpected if nobody asked us to stop.
                    was_running = self._running
                    self._running = False
                    reason = 'unexpectedly disconnected' if was_running else ''
                    raise NotConnectedError(reason)
            except pexpect.TIMEOUT:
                raise NotificationTimeout(
                    'timed out waiting for a notification')
def _handle_notification(self, msg):
"""Handle a notification.
Propagates the handle and value to all registered callbacks.
Args:
msg (str): The notification message, which looks like these:
Notification handle = <handle> value: <value>
Indication handle = <handle> value: <value>
"""
hex_handle, _, hex_value = string.split(msg.strip(), maxsplit=5)[3:]
handle = int(hex_handle, 16)
value = bytearray.fromhex(hex_value)
with self._lock:
if handle in self._callbacks:
for callback in self._callbacks[handle]:
callback(handle, value)
You'll notice that we start _expect
by acquiring the connection lock,
which we already acquired in run
. This is fine, because the connection
lock is a reentrant lock, so we can acquire it multiple times from within
the same thread (more information here). This
time, we pass self._con.expect
a list of patterns, and it returns the
index of the matched pattern - this allows us to check for notifications
whenever we expect a response from the device, not just from the
notification thread. We include the expected
parameter in the pattern
list for this reason, even though we have to set it to a nonsense value
when we call _expect
from run
. We then check the index to see if it was
the expected pattern (in which case we exit the function), a notif/indication
(in which case we handle it) or an error message (in which case we vomit a
NotConnectedError
). We've defined a further helper,
_handle_notification
, to keep the logic of _expect
clear.
We start _handle_notification by extracting the value and handle strings
from the notification message. string.split(x, maxsplit=5) separates a
string into at most 6 entries, so
'Notification handle = <handle> value: <value>'
becomes
['Notification', 'handle', '=', '<handle>', 'value:', '<value>']
We then discard the first three items, and give names to the three
remaining. Then, we convert the handle into an int
and the value into a
bytearray
. After that, we acquire the non-reentrant lock (to prevent
other threads from tampering with the callback list while we're iterating
through it), check if there are any callbacks attached to the handle, and
if so, call them.
Now that we've defined the notification handler, let's define the subscribe and unsubscribe methods:
def subscribe(self, handle, callback=None, type_=0):
    """Subscribe to notifications/indications from a characteristic.

    This is achieved by writing to the control handle, which is assumed
    to be `handle`+1. If indications are requested and we are already
    subscribed to notifications (or vice versa), we write 0300
    (signifying we want to enable both). Otherwise, we write 0100 for
    notifications or 0200 for indications. Nothing is written if the
    requested kind is already enabled.

    Args:
        handle (int): The handle to listen for.
        callback (f(int, bytearray)): A function that will be called
            when the notif/indication is received. When called, it will
            be passed the handle and value.
        type_ (int): If 0, requests notifications. If 1, requests
            indications. If 2, requests both.

    Raises:
        NoResponseError: If writing to the control handle fails.
        NotConnectedError: If the device is not connected.
        ValueError: If `type_` is not 0, 1 or 2.
    """
    if type_ not in (0, 1, 2):
        message = ('Type must be 0 (notifications), 1 (indications), or '
                   '2 (both).')
        raise ValueError(message)

    control_handle = handle + 1
    both = bytearray([3, 0])
    if type_ == 0:
        this, other = bytearray([1, 0]), bytearray([2, 0])
    elif type_ == 1:
        this, other = bytearray([2, 0]), bytearray([1, 0])
    else:
        this, other = both, both

    # The reentrant session lock keeps the check-then-write sequence
    # atomic with respect to other users of the gatttool session.
    with self._connection_lock:
        with self._lock:
            if callback is not None:
                self._callbacks[handle].add(callback)
            previous = self._subscribed_handlers.get(handle)
        if previous in (this, both):
            return
        write = both if previous == other else this
        # self._lock must be free during the write: a notification that
        # arrives while waiting for the reply is dispatched by
        # _handle_notification, which takes the same non-reentrant lock.
        self.char_write(control_handle, write, wait_for_response=True)
        with self._lock:
            self._subscribed_handlers[handle] = write
def unsubscribe(self, handle, callback=None):
    """Unsubscribe from notif/indications on a handle.

    Writes 0000 to the control handle, which is assumed to be `handle`+1,
    unless 0000 is already recorded as the last value written. If
    `callback` is supplied, removes `callback` from the set of callbacks
    for this handle; a callback that was never registered is ignored so
    that the unsubscribe itself still goes ahead.

    Args:
        handle (int): The handle to unsubscribe from.
        callback (f(int, bytearray)): The callback to remove,
            previously passed as the `callback` parameter of
            self.subscribe(handle, callback).

    Raises:
        NoResponseError: If writing to the control handle fails.
        NotConnectedError: If the device is not connected.
    """
    control_handle = handle + 1
    value = bytearray([0, 0])

    # The reentrant session lock keeps the check-then-write sequence
    # atomic with respect to other users of the gatttool session.
    with self._connection_lock:
        with self._lock:
            if callback is not None:
                registered = self._callbacks.get(handle)
                if registered is not None:
                    registered.discard(callback)
            already_off = self._subscribed_handlers.get(handle) == value
        if already_off:
            return
        # self._lock must be free during the write: notifications are
        # usually still arriving at this point and each one is dispatched
        # by _handle_notification, which takes the same non-reentrant
        # lock.
        self.char_write(control_handle, value, wait_for_response=True)
        with self._lock:
            self._subscribed_handlers[handle] = value
Both of these methods work in a very similar way. We'll start with
unsubscribe
, because it's simpler. unsubscribe
begins by defining the
control handle and the value to write if we want no notif/indications
(0000). It then acquires the non-reentrant lock (self._lock), so that no other threads can
modify the lists of handlers and callbacks. If callback
was specified, we
remove it from the list of callbacks; we then check if we've already
unsubscribed, and if we haven't, write to the control handle. We've set
wait_for_response
to True
so that an exception will be raised if the
write isn't successful. If the write is successful (no exceptions were
thrown), we write the new value to self._subscribed_handlers
, a
handle→value dictionary.
subscribe
works in much the same way, but has some additional logic to
choose which value to write to the control handle. The values we might want
to write are:
- 0100: subscribe to notifications only
- 0200: subscribe to indications only
- 0300: subscribe to both
If type_ is 0 (notifications), we set this to 0100 and other to 0200; if
type_ is 1 (indications), we set them the other way around; if type_ is 2
(both), this and other are both 0300. We acquire the lock, and add a
callback to the list if one has been specified. Then we check if we've already
subscribed to this kind of value update, and if we haven't, we write the
appropriate value (if we've already subscribed to the other kind of value
update, we subscribe to both; otherwise, we subscribe to just this kind).
If the value write was successful, we update self._subscribed_handlers
to
reflect this.
Finally, we define the characteristic write and read methods:
def char_write(self, handle, value, wait_for_response=False):
    """Write a value to a given characteristic handle.

    Sends gatttool's `char-write-req` command when a confirmation is
    wanted and `char-write-cmd` otherwise.

    Args:
        handle (int): The handle to write to.
        value (bytearray): The value to write.
        wait_for_response (bool): If true, waits for a response from
            the peripheral to check that the value was written
            successfully.

    Raises:
        NotConnectedError: If no connection to the device has been
            established.
        NoResponseError: If `wait_for_response` is True and no write
            confirmation was received from the peripheral.
    """
    if not self._connected:
        raise NotConnectedError('device is not connected')

    suffix = 'req' if wait_for_response else 'cmd'
    value_string = ''.join('%02x' % byte for byte in value)
    command = 'char-write-%s %04x %s' % (suffix, handle, value_string)

    # Send the command and wait for its reply under the session lock.
    # Otherwise the listener thread can read the output in between and
    # throw the confirmation away while matching a notification.
    with self._connection_lock:
        self._con.sendline(command)
        if wait_for_response:
            try:
                self._expect('Characteristic value was written successfully')
            except NotificationTimeout:
                raise NoResponseError('no response received')
def char_read_hnd(self, handle):
    """Read the value of the characteristic at `handle`.

    Issues gatttool's `char-read-hnd` command and parses the hex bytes
    that follow "descriptor:" in its reply.

    Args:
        handle (int): The handle of the characteristic to read.

    Returns:
        bytearray: The value of the characteristic.

    Raises:
        NotConnectedError: If no connection to the device has been
            established.
        NotificationTimeout: If the device is connected, but no reply to
            the read is seen before the timeout.
    """
    if not self._connected:
        raise NotConnectedError('device is not connected')

    command = 'char-read-hnd %04x' % handle
    with self._connection_lock:
        self._con.sendline(command)
        self._expect(r'descriptor: .*?\r')
        words = self._con.after.split()
    # words[0] is the "descriptor:" label; the rest are hex byte values.
    return bytearray(int(word, 16) for word in words[1:])
Both of these are quite straightforward - we pick a gatttool command
(char-write-cmd, char-write-req or char-read-hnd), examine what it outputs
upon success, and then build our method around it. Note that we combine
char-write-cmd
and char-write-req
into the same method, and choose
which to send to gatttool based on the wait_for_response
parameter.
char-write-cmd
gives no output indicating success or failure, so we can
use it in situations where we don't care if the write was successful or
not. If we want confirmation of a successful write, we use
char-write-req
, and then _expect
a message saying the write was
successful. When char-read-hnd is successful, it gives us a message of the
form
Characteristic value/descriptor: XX XX XX XX XX ...
We extract the value from this message by _expect
ing a pattern that
matches from descriptor:
to the end of the line, taking the matched text
(self._con.after
), splitting it into words, discarding the first word
(descriptor:
) and converting each of the remaining words into a byte.
And we're done! Now we have a class that we can use to connect to and
communicate with bluetooth peripherals. Below is the full file, which
includes this class, definitions for the exceptions and a couple of extra
functions, one of which runs an LE scan and returns a list of devices and
the other of which resets the bluetooth controller. The methods for
BTLEDevice
are in alphabetical order, with private methods at the bottom.
__enter__
and __exit__
methods have been defined so that we can wrap
the object in a with
and ensure that the child process is killed.
#!/usr/bin/env python
"""
bluetooth.py
============
Python wrapper for gatttool, to allow programmatic control of Bluetooth LE
devices from within Python.
Much of the gatttool interface is borrowed from the gatttool backend of
https://github.com/stratosinc/pygatt, which is distributed under an Apache
2.0 license: http://www.apache.org/licenses/LICENSE-2.0
Credit to the following:
Jeff Rowberg @jrowberg https://github.com/jrowberg/bglib
Greg Albrecht @ampledata https://github.com/ampledata/pygatt
Christopher Peplin @peplin https://github.com/stratosinc/pygatt
Morten Kjaergaard @mkjaergaard https://github.com/mkjaergaard/pygatt
Michael Saunby @msaunby https://github.com/msaunby/ble-sensor-pi
Steven Sloboda sloboste@umich.edu https://github.com/sloboste
"""
# Standard library
from collections import defaultdict
import threading
import string
import time
import re
# Third party
import pexpect
__author__ = 'Blaine Rogers <blaine.rogers@imgtec.com>'
__credits__ = ['Jeff Rowberg', 'Greg Albrecht', 'Christopher Peplin',
'Morten Kjaergaard', 'Michael Saunby', 'Steven Sloboda']
class BluetoothLEError(Exception):
    """Parent exception class for the Bluetooth interface."""

    def __repr__(self):
        # Format the exception itself rather than reading self.message,
        # an attribute that exceptions no longer have on Python 3.
        return '<%s, %s>' % (self.__class__.__name__, self)


class NotConnectedError(BluetoothLEError):
    """Raised when the device is not connected or the connection is lost."""


class NotificationTimeout(BluetoothLEError):
    """Raised when an expected pattern is not seen before the timeout."""


class NoResponseError(BluetoothLEError):
    """Raised when a write that asked for confirmation gets no response."""
class BTLEDevice(object):
    """Wrapper for a gatttool session with a bluetooth peripheral.

    Handles are ints and values are bytearrays throughout. A daemon thread
    polls gatttool's output for notifications/indications and passes them
    to the callbacks registered through `subscribe`. The object can be used
    in a `with` statement, which stops the session on exit.
    """

    DEFAULT_CONNECT_TIMEOUT = 3.0

    def __init__(self, mac_address, hci_device='hci0'):
        """Initialise the device and start the gatttool session.

        Spawns gatttool in interactive mode, waits for its prompt and
        starts the daemon thread that listens for notifications.

        Args:
            mac_address (str): The MAC address of the BLE device to connect
                to, in the format "XX:XX:XX:XX:XX:XX".
            hci_device (str): The hci device to use with gatttool.

        Raises:
            pexpect.TIMEOUT: If the gatttool prompt does not appear within
                one second of spawning it.
        """
        ##### Internal state #####
        self._address = mac_address
        self._handles = {}
        # handle -> value last written to that handle's control handle
        self._subscribed_handlers = {}
        # handle -> set of callbacks invoked for notif/indications
        self._callbacks = defaultdict(set)
        # Guards _callbacks and _subscribed_handlers (not reentrant).
        self._lock = threading.Lock()
        # Guards the gatttool session; reentrant because run() and the
        # read/write methods call _expect() while already holding it.
        self._connection_lock = threading.RLock()
        self._running = True
        self._thread = None
        self._con = None  # The gatttool instance
        self._connected = False

        ##### Set up gatttool #####
        gatttool_cmd = ' '.join(
            ['gatttool',
             '-b', self._address,
             '-i', hci_device,
             '-I']
        )
        self._con = pexpect.spawn(gatttool_cmd, ignore_sighup=False)
        self._con.expect(r'\[LE\]>', timeout=1)

        ##### Start notification listener thread #####
        # Keep the thread on the instance: it used to live in a local
        # only, which left self._thread as None.
        self._thread = threading.Thread(target=self.run)
        self._thread.daemon = True
        self._thread.start()

    def char_read_hnd(self, handle):
        """Read a characteristic by handle.

        Args:
            handle (int): The handle of the characteristic to read.

        Returns:
            bytearray: The value of the characteristic.

        Raises:
            NotConnectedError: If no connection to the device has been
                established.
            NotificationTimeout: If the device is connected, but no reply
                to the read is seen before the timeout.
        """
        if not self._connected:
            raise NotConnectedError('device is not connected')
        with self._connection_lock:
            self._con.sendline('char-read-hnd %04x' % handle)
            self._expect(r'descriptor: .*?\r')
            # Drop the leading "descriptor:" label; the rest are hex bytes.
            rval = self._con.after.split()[1:]
            return bytearray([int(x, 16) for x in rval])

    def char_write(self, handle, value, wait_for_response=False):
        """Write a value to a given characteristic handle.

        Sends gatttool's `char-write-req` command when a confirmation is
        wanted and `char-write-cmd` otherwise.

        Args:
            handle (int): The handle to write to.
            value (bytearray): The value to write.
            wait_for_response (bool): If true, waits for a response from
                the peripheral to check that the value was written
                successfully.

        Raises:
            NotConnectedError: If no connection to the device has been
                established.
            NoResponseError: If `wait_for_response` is True and no write
                confirmation was received from the peripheral.
        """
        if not self._connected:
            raise NotConnectedError('device is not connected')

        suffix = 'req' if wait_for_response else 'cmd'
        value_string = ''.join('%02x' % byte for byte in value)
        command = 'char-write-%s %04x %s' % (suffix, handle, value_string)

        # Send the command and wait for its reply under the session lock.
        # Otherwise the listener thread can read the output in between
        # and throw the confirmation away while matching a notification.
        with self._connection_lock:
            self._con.sendline(command)
            if wait_for_response:
                try:
                    self._expect(
                        'Characteristic value was written successfully')
                except NotificationTimeout:
                    raise NoResponseError('no response received')

    def connect(self, timeout=DEFAULT_CONNECT_TIMEOUT):
        """Establish a connection with the device.

        If connection fails, try running an LE scan first.

        Args:
            timeout (numeric): Time in seconds to wait before giving up on
                trying to connect.

        Raises:
            NotConnectedError: If connection to the device fails. The
                gatttool session is stopped before the error is raised.
        """
        try:
            # Hold the session lock so the listener thread cannot read
            # gatttool's output between the command and its reply.
            with self._connection_lock:
                self._con.sendline('connect')
                self._con.expect(r'Connection successful.*\[LE\]>', timeout)
        except pexpect.TIMEOUT:
            self.stop()
            message = ('timed out after connecting to %s after %f seconds.'
                       % (self._address, timeout))
            raise NotConnectedError(message)

        self._connected = True
        if not self._running:
            # The listener loop exits once _running is cleared (for
            # example after an unexpected disconnect), so start a fresh
            # thread. Calling run() on the old thread object would
            # execute the loop in the caller's thread and block it.
            self._running = True
            self._thread = threading.Thread(target=self.run)
            self._thread.daemon = True
            self._thread.start()

    def run(self):
        """Listen for notifications until stopped or disconnected."""
        while self._running:
            with self._connection_lock:
                try:
                    # The pattern never matches; the call is made for the
                    # notification handling inside _expect.
                    self._expect('nonsense value foobar', timeout=0.1)
                except NotificationTimeout:
                    pass
                except (NotConnectedError, pexpect.EOF):
                    break
            time.sleep(0.05)  # Stop thread from hogging _connection_lock

    def stop(self):
        """Stop the gatttool instance and listener thread.

        Asks gatttool to exit, gives it up to one second to do so, then
        closes the session regardless, so the child process and its pty
        are released even if gatttool had already exited on its own.
        """
        self._running = False  # stop the listener thread
        # Take the session lock so the session is not closed while the
        # listener thread is in the middle of reading from it.
        with self._connection_lock:
            if self._con.isalive():
                self._con.sendline('exit')
                # Poll for up to one second (100 x 10 ms) for a clean exit.
                for _ in range(100):
                    if not self._con.isalive():
                        break
                    time.sleep(0.01)
            self._con.close()  # make sure gatttool is dead
        self._connected = False

    def subscribe(self, handle, callback=None, type_=0):
        """Subscribe to notifications/indications from a characteristic.

        This is achieved by writing to the control handle, which is assumed
        to be `handle`+1. If indications are requested and we are already
        subscribed to notifications (or vice versa), we write 0300
        (signifying we want to enable both). Otherwise, we write 0100 for
        notifications or 0200 for indications. Nothing is written if the
        requested kind is already enabled.

        Args:
            handle (int): The handle to listen for.
            callback (f(int, bytearray)): A function that will be called
                when the notif/indication is received. When called, it
                will be passed the handle and value.
            type_ (int): If 0, requests notifications. If 1, requests
                indications. If 2, requests both.

        Raises:
            NoResponseError: If writing to the control handle fails.
            NotConnectedError: If the device is not connected.
            ValueError: If `type_` is not 0, 1 or 2.
        """
        if type_ not in (0, 1, 2):
            message = ('Type must be 0 (notifications), 1 (indications), or '
                       '2 (both).')
            raise ValueError(message)

        control_handle = handle + 1
        both = bytearray([3, 0])
        if type_ == 0:
            this, other = bytearray([1, 0]), bytearray([2, 0])
        elif type_ == 1:
            this, other = bytearray([2, 0]), bytearray([1, 0])
        else:
            this, other = both, both

        # The reentrant session lock keeps the check-then-write sequence
        # atomic with respect to other users of the gatttool session.
        with self._connection_lock:
            with self._lock:
                if callback is not None:
                    self._callbacks[handle].add(callback)
                previous = self._subscribed_handlers.get(handle)
            if previous in (this, both):
                return
            write = both if previous == other else this
            # self._lock must be free during the write: a notification
            # arriving while waiting for the reply is dispatched by
            # _handle_notification, which takes the same non-reentrant
            # lock.
            self.char_write(control_handle, write, wait_for_response=True)
            with self._lock:
                self._subscribed_handlers[handle] = write

    def unsubscribe(self, handle, callback=None):
        """Unsubscribe from notif/indications on a handle.

        Writes 0000 to the control handle, which is assumed to be
        `handle`+1, unless 0000 is already recorded as the last value
        written. If `callback` is supplied, removes `callback` from the
        set of callbacks for this handle; a callback that was never
        registered is ignored so that the unsubscribe still goes ahead.

        Args:
            handle (int): The handle to unsubscribe from.
            callback (f(int, bytearray)): The callback to remove,
                previously passed as the `callback` parameter of
                self.subscribe(handle, callback).

        Raises:
            NoResponseError: If writing to the control handle fails.
            NotConnectedError: If the device is not connected.
        """
        control_handle = handle + 1
        value = bytearray([0, 0])

        with self._connection_lock:
            with self._lock:
                if callback is not None:
                    registered = self._callbacks.get(handle)
                    if registered is not None:
                        registered.discard(callback)
                already_off = self._subscribed_handlers.get(handle) == value
            if already_off:
                return
            # self._lock is released here for the same reason as in
            # subscribe(): notifications handled during the write need it.
            self.char_write(control_handle, value, wait_for_response=True)
            with self._lock:
                self._subscribed_handlers[handle] = value

    def _expect(self, expected, timeout=DEFAULT_CONNECT_TIMEOUT):
        """Search for notif/indications while expecting a pattern.

        We may (and often do) get an indication/notification before a
        write completes, and so it can be lost if we "expect()"ed
        something that came after it in the output, e.g.:

            > char-write-req 0x1 0x2
            Notification handle: xxx
            Write completed successfully.
            >

        So every wait also watches for notif/indications and handles each
        one before continuing to wait.

        Args:
            expected (str): The pattern to search for in the output.
            timeout (numeric): The time in seconds to wait, per attempt,
                before assuming the pattern will never be found.

        Raises:
            NotificationTimeout: If the pattern is not found before the
                timeout is reached.
            NotConnectedError: If gatttool reports a disconnect or an
                invalid file descriptor; `_running` is cleared first.
        """
        with self._connection_lock:
            patterns = [
                expected,
                'Notification handle = .*? \r',
                'Indication handle = .*? \r',
                '.*Invalid file descriptor.*',
                '.*Disconnected\r'
            ]
            while True:
                try:
                    matched_pattern_index = self._con.expect(patterns,
                                                             timeout)
                    if matched_pattern_index == 0:
                        break
                    elif matched_pattern_index in (1, 2):
                        self._handle_notification(self._con.after)
                    elif matched_pattern_index in (3, 4):
                        message = ''
                        if self._running:
                            message = 'unexpectedly disconnected'
                        self._running = False
                        raise NotConnectedError(message)
                except pexpect.TIMEOUT:
                    message = 'timed out waiting for a notification'
                    raise NotificationTimeout(message)

    def _handle_notification(self, msg):
        """Handle a notification or indication from the device.

        Parses the handle and value out of `msg` and passes them to every
        callback registered for that handle.

        Args:
            msg (str or bytes): The notification message, which looks like:
                Notification handle = <handle> value: <value>
                Indication handle = <handle> value: <value>
                Bytes are accepted as well as text, since a pexpect
                session opened without an encoding returns bytes on
                Python 3.
        """
        if not isinstance(msg, str):
            msg = msg.decode('ascii')

        # str.split replaces string.split, which Python 3 no longer has.
        # Fields: 'Notification', 'handle', '=', <handle>, 'value:', <value>
        fields = msg.strip().split(None, 5)
        handle = int(fields[3], 16)
        # A message with no value bytes has only five fields.
        hex_value = fields[5] if len(fields) > 5 else ''
        value = bytearray.fromhex(hex_value)

        # Copy the callbacks under the lock but call them after releasing
        # it: the lock is not reentrant, so a callback that calls
        # subscribe() or unsubscribe() would otherwise block forever.
        with self._lock:
            callbacks = list(self._callbacks.get(handle, ()))
        for callback in callbacks:
            callback(handle, value)

    def __enter__(self):
        """Return the device itself for use as a context manager."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Stop the session when leaving a `with` block.

        The three exception arguments are required by the context manager
        protocol; without them leaving the block raises TypeError. Nothing
        is returned, so an exception from the block still propagates.
        """
        self.stop()
# One line of `hcitool lescan` output: a MAC address followed by the name.
_LESCAN_LINE = re.compile(r'((?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2})\s+(.*)')


def _lescan_text(output):
    """Return pexpect output as text, decoding it if it arrived as bytes."""
    if isinstance(output, str):
        return output
    return output.decode('utf-8', 'replace')


def _parse_lescan_output(output):
    """Turn the text printed by `hcitool lescan` into device dictionaries."""
    devices = {}
    for line in output.splitlines():
        match = _LESCAN_LINE.match(line.strip())
        if match is None:
            continue
        address = match.group(1)
        name = match.group(2).strip()
        if not name or name == '(unknown)':
            name = None
        known = devices.get(address)
        if known is None:
            devices[address] = {'address': address, 'name': name}
        elif known['name'] is None and name is not None:
            # A later line supplied the name an earlier one lacked.
            known['name'] = name
    return list(devices.values())


def le_scan(sudo_password=None, timeout=5):
    """Performs a BTLE scan.

    Runs `hcitool lescan` for `timeout` seconds and parses what it printed.

    If you don't want to use sudo, you must allow normal users to perform
    LE scanning:
        setcap 'cap_net_raw,cap_net_admin+eip' `which hcitool`

    Args:
        sudo_password (str): The password for super user privileges. Do not
            hard code this! Fetch it from the user, then destroy it asap.
            If `None` (or empty), the scan is run without sudo.
        timeout (numeric): Time (in seconds) to let the scan run.

    Returns:
        [{str:str}]: A list of dictionaries, each of which represents a
            device. The dictionaries have two keys, 'address' and 'name',
            whose values are the MAC address and name of the device
            respectively; 'name' is None when hcitool reported the device
            as "(unknown)".
                [{'address': device_1_address, 'name': device_1_name},
                 {'address': device_2_address, 'name': device_2_name},
                 ...
                 {'address': device_n_address, 'name': device_n_name}]

    Raises:
        BluetoothLEError: If hcitool exits before the timeout.
    """
    command = 'hcitool lescan'
    if sudo_password:
        command = 'sudo %s' % command
    scan = pexpect.spawn('bash', ['-c', command], ignore_sighup=False)
    if sudo_password:
        scan.sendline(sudo_password)
        scan.readline()  # exclude sudo message from scan.before
    try:
        # Not actually expecting anything, just using the convenient timeout
        scan.expect('nonsense value foobar', timeout=timeout)
    except pexpect.EOF:
        # hcitool is meant to keep scanning until killed, so exiting on
        # its own means something went wrong.
        output = _lescan_text(scan.before)
        message = 'unexpected error while scanning: \n' + output
        if 'Input/Output error' in output:
            message += '\n - Try resetting the bluetooth controller.'
        elif 'Operation not permitted' in output:
            message += '\n - Try running using sudo.'
        raise BluetoothLEError(message)
    except pexpect.TIMEOUT:
        # The normal path: the scan ran for the whole timeout.
        return _parse_lescan_output(_lescan_text(scan.before))
    finally:
        # try our best to kill sudo
        scan.sendcontrol('c')
        scan.sendcontrol('x')
        scan.sendcontrol('d')
        scan.close()
    return []  # failsafe
def reset_bluetooth_controller(sudo_password=None, hci_device='hci0',
                               timeout=3.0):
    """Reinitialises the bluetooth controller interface.

    This is accomplished by running `hciconfig <hci_device> reset`, which
    normally requires superuser privileges.

    Args:
        sudo_password (str): The password for super user privileges. Do not
            hard code this! Fetch it from the user, then destroy it asap.
            If `None` (or empty), the command is run without sudo.
        hci_device (str): The interface to reinitialise.
        timeout (numeric): Time to wait before abandoning hope.

    Raises:
        BluetoothLEError: If hciconfig fails or the timeout is reached.
    """
    command = 'hciconfig %s reset' % hci_device
    if sudo_password:
        command = 'sudo %s' % command
    p = pexpect.spawn('bash', ['-c', command], ignore_sighup=False)
    if sudo_password:
        p.sendline(sudo_password)
    try:
        # The pattern never matches; we are waiting for hciconfig to exit,
        # which surfaces as pexpect.EOF.
        p.expect('nonsense value foobar', timeout=timeout)
    except pexpect.EOF:
        p.close()  # populates p.exitstatus
        if p.exitstatus != 0:
            output = p.before
            # pexpect hands back bytes on Python 3 unless an encoding was
            # given; decode so the message can be built either way.
            if not isinstance(output, str):
                output = output.decode('utf-8', 'replace')
            message = 'hciconfig failed: \n' + output
            if 'Operation not permitted' in output:
                message += '\n - Try running using sudo.'
            raise BluetoothLEError(message)
    except pexpect.TIMEOUT:
        # try our best to kill sudo
        p.sendcontrol('c')
        p.sendcontrol('x')
        p.sendcontrol('d')
        message = 'hciconfig did not complete before timeout'
        raise BluetoothLEError(message)
    finally:
        p.close()