Bluetooth LE in Python using pexpect and gatttool

Once you've established control of the peripheral using gatttool's interactive mode, the next step is to control it programmatically. Here, we accomplish this using pexpect, a Python library for controlling command-line programs. Make sure you have pexpect installed on the Ci20:

ci20@ci20:~$ sudo apt-get install python-pexpect

Let's start by listing what we want to be able to do from within Python:

  1. Connect to a device.
  2. Write to characteristics by their handle.
  3. Read from characteristics by their handle.
  4. Listen for notifications / indications on a handle.

We'll start by outlining a class to represent a bluetooth peripheral, with methods that will do those tasks. We'll leave the bodies blank for now:

class BTLEDevice(object):
    """Represents a bluetooth peripheral"""
    DEFAULT_CONNECT_TIMEOUT = 3.0

    def __init__(self, mac_address, hci_device='hci0'):
        """Initialises the device.

        Sets up threading for the notification listener and starts the
        gatttool session.

        Args:
            mac_address (str): The mac address of the BLE device to connect
                to in the format "XX:XX:XX:XX:XX:XX"
            hci_device (str): The hci device to use with gatttool

        Raises:
            pexpect.TIMEOUT: If, for some reason, pexpect fails to spawn a
                gatttool instance (e.g. you don't have gatttool installed).
        """
        pass

    def char_write(self, handle, value, wait_for_response=False):
        """Writes a value to a given characteristic handle.

        Args:
            handle (int): The handle to write to
            value (bytearray): The value to write
            wait_for_response (bool): If true, waits for a response from
                the peripheral to check that the value was written successfully.

        Raises:
            NotConnectedError: If no connection to the device has been
                established.
            NoResponseError: If `wait_for_response` is True and no response
                was received from the peripheral.
        """
        pass

    def char_read_hnd(self, handle):
        """Reads a characteristic by handle.

        Args:
            handle (int): The handle of the characteristic to read.

        Returns:
            bytearray: The value of the characteristic.

        Raises:
            NotConnectedError: If no connection to the device has been 
                established.
            NotificationTimeout: If the device is connected, but reading
                fails for another reason.
        """
        pass

    def connect(self, timeout=DEFAULT_CONNECT_TIMEOUT):
        """Establishes a connection with the device.

        If connection fails, try running an LE scan first. 

        Args:
            timeout (numeric): Time in seconds to wait before giving up on
                trying to connect.

        Raises:
            NotConnectedError: If connection to the device fails.
        """
        pass

    def run(self):
        """Listens for notifications."""
        pass

    def stop(self):
        """Stop the gatttool instance and listener thread."""
        pass

    def subscribe(self, handle, callback=None, type_=0):
        """Subscribes to notifications/indications from a characteristic.

        This is achieved by writing to the control handle, which is assumed
        to be `handle`+1. If indications are requested and we are already
        subscribed to notifications (or vice versa), we write 0300 
        (signifying we want to enable both). Otherwise, we write 0100 for
        notifications or 0200 for indications.

        Args:
            handle (int): The handle to listen for.
            callback (f(int, bytearray)): A function that will be called
                when the notif/indication is received. When called, it will be
                passed the handle and value.
            type_ (int): If 0, requests notifications. If 1, requests 
                indications. If 2, requests both. Any other value will
                result in a ValueError being raised. 

        Raises:
            NoResponseError: If writing to the control handle fails.
            ValueError: If `type_` is not in {0, 1, 2}.
        """
        pass

    def unsubscribe(self, handle, callback=None):
        """Unsubscribes from notif/indications on a handle.

        Writes 0000 to the control handle, which is assumed to be `handle`+1.
        If `callback` is supplied, removes `callback` from the list of
        callbacks for this handle.

        Args:
            handle (int): The handle to unsubscribe from.
            callback (f(int, bytearray)): The callback to remove,
                previously passed as the `callback` parameter of
                self.subscribe(handle, callback).

        Raises:
            NoResponseError: If writing to the control handle fails.
        """
        pass

Note: The design of this class, along with a fair chunk of the code, is borrowed from pygatt. Credit and license information can be found in the sources for this project.

Notice that we've established a few conventions: handles are ints, values are bytearrays. In order to deal with notif/indications, which can arrive at any time, we'll run a background thread checking for them in gatttool's output, and then look up the handle in a dictionary of callback functions. Now let's fill the methods in, starting with __init__.

    def __init__(self, mac_address, hci_device='hci0'):
        """Initialises the device.

        Sets up threading for the notification listener and starts the
        gatttool session.

        Args:
            mac_address (str): The mac address of the BLE device to connect
                to in the format "XX:XX:XX:XX:XX:XX"
            hci_device (str): The hci device to use with gatttool

        Raises:
            pexpect.TIMEOUT: If, for some reason, pexpect fails to spawn a 
                gatttool instance (e.g. you don't have gatttool installed).
        """
        ##### Internal state #####
        self._address = mac_address
        self._handles = {}               # Used for tracking which handles
        self._subscribed_handlers = {}   # have subscribed callbacks
        self._callbacks = defaultdict(set)
        self._lock = threading.Lock()
        self._connection_lock = threading.RLock()
        self._running = True
        self._thread = None
        self._con = None                 # The gatttool instance
        self._connected = False

        ##### Set up gatttool #####
        gatttool_cmd = ' '.join(
            ['gatttool',
             '-b', self._address,
             '-i', hci_device,
             '-I']
        )

        self._con = pexpect.spawn(gatttool_cmd, ignore_sighup=False)
        self._con.expect(r'\[LE\]>', timeout=1)

        ##### Start notification listener thread #####
        # Keep a reference on self so other methods can reach the thread
        self._thread = threading.Thread(target=self.run)
        self._thread.daemon = True
        self._thread.start()

__init__ has three jobs:

  1. Initialise all the internal variables of the class. Note that any variable prefixed by _ is private, and should not be referenced outside the class. It may not be immediately obvious what some of these are for; this will become clear later.
  2. Spawn a gatttool instance. This is accomplished using pexpect.spawn. Things to note:
    • setting ignore_sighup to False should ensure that the spawned gatttool instance is killed when the Python process is. Otherwise, you'll find (as I did) that they stay alive indefinitely, which seriously slows down the Ci20 after a while. We'll try to make sure they die gracefully by defining __enter__, __exit__ and __del__ methods later.
    • self._con.expect waits for a particular regular expression (in this case, r'\[LE\]>', which matches the literal prompt text "[LE]>") to appear in gatttool's output. Here, we're waiting for the prompt to appear - if it doesn't, a pexpect.TIMEOUT exception will be raised.
  3. Start the notification listener. This is quite straightforward - we create a new thread pointed at self.run, turn it into a daemon and start it.
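To see why the prompt pattern escapes its brackets, here is a small standard-library sketch that applies the same regular expressions to a made-up gatttool transcript. The transcript text and the helper names are assumptions for illustration (real output also contains terminal control codes), and re.DOTALL is passed on the assumption that pexpect compiles string patterns with that flag.

```python
import re

# Hypothetical, simplified transcript; not captured from a real session.
SAMPLE_OUTPUT = (
    "[LE]> connect\r\n"
    "Attempting to connect to 00:11:22:33:44:55\r\n"
    "Connection successful\r\n"
    "[LE]> "
)

# Escaped brackets match the literal prompt text "[LE]>".
PROMPT = re.compile(r'\[LE\]>')
# DOTALL lets ".*" run across the line breaks before the next prompt.
CONNECTED = re.compile(r'Connection successful.*\[LE\]>', re.DOTALL)
# Unescaped, "[LE]" is a character class: it matches "L>" or "E>".
UNESCAPED = re.compile('[LE]>')


def saw_prompt(text):
    """Return True if the gatttool prompt appears in `text`."""
    return PROMPT.search(text) is not None


def saw_connection(text):
    """Return True if `text` reports a connection followed by a prompt."""
    return CONNECTED.search(text) is not None
```

The unescaped pattern would happily match a stray "E>" anywhere in the output, which is why the escaped form is the safer choice.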

Next, we'll write connect and run:

    def connect(self, timeout=DEFAULT_CONNECT_TIMEOUT):
        """Establishes a connection with the device.

        If connection fails, try running an LE scan first. 

        Args:
            timeout (numeric): Time in seconds to wait before giving up on
                trying to connect.

        Raises:
            NotConnectedError: If connection to the device fails.
        """
        try:
            self._con.sendline('connect')
            self._con.expect(r'Connection successful.*\[LE\]>', timeout)
            self._connected = True
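            if not self._running:
                # Restart the listener in a fresh background thread
                self._running = True
                self._thread = threading.Thread(target=self.run)
                self._thread.daemon = True
                self._thread.start()
        except pexpect.TIMEOUT:
            self.stop()
            message = ('timed out connecting to %s after %f seconds.'
                       % (self._address, timeout))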
            raise NotConnectedError(message)

    def run(self):
        """Listens for notifications."""
        while self._running:
            with self._connection_lock:
                try:
                    self._expect('nonsense value foobar', timeout=0.1)
                except NotificationTimeout:
                    pass
                except (NotConnectedError, pexpect.EOF):
                    break

            time.sleep(0.05)  # Stop thread from hogging _connection_lock

connect is quite straightforward: We send the "connect" command to gatttool, and wait for it to respond with "Connection successful". If it does, and the listener thread isn't running (perhaps because we called stop) we start the listener thread. If it doesn't, we call stop to stop the listener thread and raise a NotConnectedError.

run is less straightforward. First off, the meat of the function is nestled in a while loop: if self._running is set to False, the function will exit and the listener thread will halt. We then acquire the connection lock, so that we can search gatttool's output without colliding with another thread doing the same. The process of searching for notif/indications has been separated into the private helper method _expect, defined below; this serves to keep the logic of run clear and to allow other methods to search for notif/indications. While we're at it, let's also quickly define the stop method:

    def stop(self):
        """Stops the gatttool instance and listener thread."""
        self._running = False  # stop the listener thread
        if self._con.isalive():
            self._con.sendline('exit')

            # wait up to one second for gatttool to stop
            for i in range(100):
                if not self._con.isalive():
                    break
                time.sleep(0.01)

            self._con.close()  # make sure gatttool is dead
            self._connected = False

This method tells the listener thread to stop running, then sends an exit command to the gatttool instance, waits for it to die gracefully and kills it if it's still not dead after a second. Now let's finish the helper method for run:

    def _expect(self, expected, timeout=DEFAULT_CONNECT_TIMEOUT):
        """Searches for notif/indications while expecting a pattern.

        We may (and often do) get an indication/notification before a write
        completes, and so it can be lost if we "expect()"ed something that
        came after it in the output, e.g.:
            > char-write-req 0x1 0x2
            Notification handle: xxx
            Write completed successfully.
            >
        Anytime we expect() something we have to expect() notif/indications
        first for a short time.

        Args:
            expected (str): The pattern to search for in the output.
            timeout (numeric): The time in seconds to wait before assuming the
                pattern will never be found.

        Raises:
            NotificationTimeout: If the pattern is not found before the
                timeout is reached.
        """
        with self._connection_lock:
            patterns = [
                expected,
                'Notification handle = .*? \r',
                'Indication   handle = .*? \r',
                '.*Invalid file descriptor.*',
                '.*Disconnected\r'
            ]
            while True:
                try:
                    matched_pattern_index = self._con.expect(patterns, timeout)
                    if matched_pattern_index == 0:
                        break
                    elif matched_pattern_index in {1, 2}:
                        self._handle_notification(self._con.after)
                    elif matched_pattern_index in {3, 4}:
                        message = ''
                        if self._running:
                            message = 'unexpectedly disconnected'
                            self._running = False
                        raise NotConnectedError(message)
                except pexpect.TIMEOUT:
                    message = 'timed out waiting for a notification'
                    raise NotificationTimeout(message)

    def _handle_notification(self, msg):
        """Handle a notification.

        Propagates the handle and value to all registered callbacks.

        Args:
            msg (str): The notification message, which looks like these:

                    Notification handle = <handle> value: <value> 
                    Indication   handle = <handle> value: <value>
        """
        hex_handle, _, hex_value = string.split(msg.strip(), maxsplit=5)[3:]
        handle = int(hex_handle, 16)
        value = bytearray.fromhex(hex_value)

        with self._lock:
            if handle in self._callbacks:
                for callback in self._callbacks[handle]:
                    callback(handle, value)

You'll notice that we start _expect by acquiring the connection lock, which we already acquired in run. This is fine, because the connection lock is a reentrant lock (threading.RLock), so the same thread can acquire it multiple times. This time, we pass self._con.expect a list of patterns, and it returns the index of the matched pattern. This allows us to check for notifications whenever we expect a response from the device, not just from the notification thread. We include the expected parameter in the pattern list for this reason, even though we have to set it to a nonsense value when we call _expect from run. We then check the index to see if it was the expected pattern (in which case we exit the function), a notif/indication (in which case we handle it and keep waiting) or an error message (in which case we raise a NotConnectedError). If nothing matches before the timeout, we raise a NotificationTimeout. We've defined a further helper, _handle_notification, to keep the logic of _expect clear.
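The difference between the two kinds of lock is easy to check with the standard library alone. This is a minimal sketch; can_reacquire is a hypothetical helper written for this illustration and is not part of the class.

```python
import threading


def can_reacquire(lock):
    """Return True if the current thread can take `lock` a second time."""
    with lock:
        # Non-blocking attempt, so a plain Lock reports failure
        # instead of deadlocking the calling thread.
        got_it = lock.acquire(False)
        if got_it:
            lock.release()
        return got_it
```

Calling can_reacquire(threading.RLock()) succeeds, while can_reacquire(threading.Lock()) does not, which is why run and _expect can both take the connection lock in the same thread.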

We start _handle_notification by extracting the handle and value strings from the notification message. string.split(x, maxsplit=5) separates a string into at most 6 entries, so

'Notification handle = <handle> value: <value>'

becomes

['Notification', 'handle', '=', '<handle>', 'value:', '<value>']

We then discard the first three items, and give names to the three remaining. Then, we convert the handle into an int and the value into a bytearray. After that, we acquire the non-reentrant lock (to prevent other threads from tampering with the callback list while we're iterating through it), check if there are any callbacks attached to the handle, and if so, call them.
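The parsing step can be tried on its own, without a device. The sketch below uses the equivalent str.split method rather than the string module function, and parse_notification is a hypothetical name chosen for this example. The sample messages in the usage note are made up.

```python
def parse_notification(msg):
    """Split a gatttool notification line into (handle, value).

    `msg` is assumed to look like
    'Notification handle = 0x000e value: 01 02 03 '.
    """
    # At most 5 splits, so the whole value ends up in the last field.
    fields = msg.strip().split(None, 5)
    hex_handle, _, hex_value = fields[3:]
    return int(hex_handle, 16), bytearray.fromhex(hex_value)
```

For example, parse_notification('Notification handle = 0x000e value: 01 02 03 \r') gives handle 14 and a three-byte value. The extra spaces in 'Indication   handle' are harmless, because splitting on whitespace treats a run of spaces as one separator.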

Now that we've defined the notification handler, let's define the subscribe and unsubscribe methods:

    def subscribe(self, handle, callback=None, type_=0):
        """Subscribes to notifications/indications from a characteristic.

        This is achieved by writing to the control handle, which is assumed
        to be `handle`+1. If indications are requested and we are already
        subscribed to notifications (or vice versa), we write 0300 
        (signifying we want to enable both). Otherwise, we write 0100 for
        notifications or 0200 for indications.

        Args:
            handle (int): The handle to listen for.
            callback (f(int, bytearray)): A function that will be called
                when the notif/indication is received. When called, it will be
                passed the handle and value.
            type_ (int): If 0, requests notifications. If 1, requests 
                indications. If 2, requests both. Any other value will
                result in a ValueError being raised. 

        Raises:
            NoResponseError: If writing to the control handle fails.
            ValueError: If `type_` is not in {0, 1, 2}.
        """
        if type_ not in {0, 1, 2}:
            message = ('Type must be 0 (notifications), 1 (indications), or '
                       '2 (both).')
            raise ValueError(message)

        control_handle = handle + 1
        this, other = \
                (bytearray([1,0]), bytearray([2,0])) if type_ == 0 else \
                (bytearray([2,0]), bytearray([1,0])) if type_ == 1 else \
                (bytearray([3,0]), bytearray([3,0]))
        both = bytearray([3,0])

        with self._lock:
            if callback is not None:
                self._callbacks[handle].add(callback)

            previous = self._subscribed_handlers.get(handle, None)
            if previous not in (this, both):
                write = both if previous == other else this
                self.char_write(control_handle, write, wait_for_response=True)
                self._subscribed_handlers[handle] = write

    def unsubscribe(self, handle, callback=None):
        """Unsubscribes from notif/indications on a handle.

        Writes 0000 to the control handle, which is assumed to be `handle`+1.
        If `callback` is supplied, removes `callback` from the list of
        callbacks for this handle.

        Args:
            handle (int): The handle to unsubscribe from.
            callback (f(int, bytearray)): The callback to remove,
                previously passed as the `callback` parameter of
                self.subscribe(handle, callback).

        Raises:
            NoResponseError: If writing to the control handle fails.
        """
        control_handle = handle + 1
        value = bytearray([0,0])
        with self._lock:
            if callback is not None:
                self._callbacks[handle].remove(callback)

            if self._subscribed_handlers.get(handle, None) != value:
                self.char_write(control_handle, value, wait_for_response=True)
                self._subscribed_handlers[handle] = value

Both of these methods work in a very similar way. We'll start with unsubscribe, because it's simpler. unsubscribe begins by defining the control handle and the value to write if we want no notif/indications (0000). It then acquires self._lock (the non-reentrant lock), so that no other thread can modify the callbacks and subscribed handles while we work. If callback was specified, we remove it from the set of callbacks; we then check if we've already unsubscribed, and if we haven't, write to the control handle. We've set wait_for_response to True so that an exception will be raised if the write isn't confirmed. If the write is successful (no exceptions were thrown), we write the new value to self._subscribed_handlers, a handle→value dictionary.
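The callback bookkeeping can be sketched in isolation. CallbackRegistry below is a hypothetical helper, not part of the class above, and it makes two deliberate changes that are assumptions of this sketch rather than the article's design: it uses set.discard so that removing an unknown callback is not an error, and it calls the callbacks after releasing the lock.

```python
from collections import defaultdict
import threading


class CallbackRegistry(object):
    """Hypothetical helper mapping handles to sets of callbacks."""

    def __init__(self):
        self._callbacks = defaultdict(set)
        self._lock = threading.Lock()

    def add(self, handle, callback):
        with self._lock:
            # A set means registering the same callback twice is harmless.
            self._callbacks[handle].add(callback)

    def remove(self, handle, callback):
        with self._lock:
            # discard() ignores callbacks that were never registered,
            # whereas set.remove() would raise KeyError.
            self._callbacks[handle].discard(callback)

    def dispatch(self, handle, value):
        """Call every callback for `handle`; return how many were called."""
        with self._lock:
            # Copy under the lock, then call outside it.
            callbacks = list(self._callbacks.get(handle, ()))
        for callback in callbacks:
            callback(handle, value)
        return len(callbacks)
```

Calling the callbacks outside the lock means a callback may itself add or remove callbacks without blocking on a lock its own thread already holds.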

subscribe works in much the same way, but has some additional logic to choose which value to write to the control handle. The values we might want to write are:

  • 0100 to enable notifications
  • 0200 to enable indications
  • 0300 to enable both

If type_ is 0 (notifications), we set this to 0100 and other to 0200; if type_ is 1 (indications), we set them the other way around; if type_ is 2, both are set to 0300. We acquire the lock, and add a callback to the set if one has been specified. Then we check if we've already subscribed to this kind of value update, and if we haven't, we write the appropriate value (if we've already subscribed to the other kind of value update, we subscribe to both; otherwise, we subscribe to just this kind). If the value write was successful, we update self._subscribed_handlers to reflect this.
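The value-selection logic is easier to follow when pulled out into a pure function. This is a sketch under the same conventions as subscribe; choose_control_value and the three constants are hypothetical names introduced for this example.

```python
NOTIFY = bytearray([1, 0])    # written as 0100
INDICATE = bytearray([2, 0])  # written as 0200
BOTH = bytearray([3, 0])      # written as 0300


def choose_control_value(type_, previous=None):
    """Return the value to write to the control handle.

    `previous` is the value last written for this handle, if any.
    Returns None when no write is needed.
    """
    if type_ not in (0, 1, 2):
        raise ValueError('type_ must be 0, 1 or 2')
    this, other = ((NOTIFY, INDICATE) if type_ == 0 else
                   (INDICATE, NOTIFY) if type_ == 1 else
                   (BOTH, BOTH))
    if previous in (this, BOTH):
        return None  # already subscribed to this kind of update
    # Already subscribed to the other kind: enable both.
    return BOTH if previous == other else this
```

So a first request for notifications yields 0100, a later request for indications on the same handle yields 0300, and repeating either request yields no write at all.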

Finally, we define the characteristic write and read methods:

    def char_write(self, handle, value, wait_for_response=False):
        """Writes a value to a given characteristic handle.

        Args:
            handle (int): The handle to write to
            value (bytearray): The value to write
            wait_for_response (bool): If true, waits for a response from
                the peripheral to check that the value was written successfully.

        Raises:
            NotConnectedError: If no connection to the device has been
                established.
            NoResponseError: If `wait_for_response` is True and no write
                confirmation was received from the peripheral.
        """
        if not self._connected:
            message = 'device is not connected'
            raise NotConnectedError(message)

        suffix = 'req' if wait_for_response else 'cmd'
        value_string = ''.join('%02x' % byte for byte in value)
        command = 'char-write-%s %04x %s' % (suffix, handle, value_string)

        self._con.sendline(command)

        if wait_for_response:
            try:
                self._expect('Characteristic value was written successfully')
            except NotificationTimeout:
                message = 'no response received'
                raise NoResponseError(message)

    def char_read_hnd(self, handle):
        """Reads a characteristic by handle.

        Args:
            handle (int): The handle of the characteristic to read.

        Returns:
            bytearray: The value of the characteristic.

        Raises:
            NotConnectedError: If no connection to the device has been 
                established.
            NotificationTimeout: If the device is connected, but reading
                fails for another reason.
        """
        if not self._connected:
            message = 'device is not connected'
            raise NotConnectedError(message)

        with self._connection_lock:
            self._con.sendline('char-read-hnd %04x' % handle)
            self._expect(r'descriptor: .*?\r')
            rval = self._con.after.split()[1:]
            return bytearray([int(x, 16) for x in rval])

Both of these are quite straightforward - we pick a gatttool command (char-write-cmd, char-write-req or char-read-hnd), examine what it outputs upon success, and then build our method around it. Note that we combine char-write-cmd and char-write-req into the same method, and choose which to send to gatttool based on the wait_for_response parameter. char-write-cmd gives no output indicating success or failure, so we can use it in situations where we don't care if the write was successful or not. If we want confirmation of a successful write, we use char-write-req, and then _expect a message saying the write was successful. When char-read-hnd is successful, it gives us a message of the form

Characteristic value/descriptor: XX XX XX XX XX ...

We extract the value from this message by _expecting a pattern that matches from descriptor: to the end of the line, taking the matched text (self._con.after), splitting it into words, discarding the first word (descriptor:) and converting each of the remaining words into a byte.
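Both the command formatting and the response parsing are plain string handling, so they can be checked without a device. The two functions below are hypothetical stand-alone versions of the logic inside char_write and char_read_hnd, written for this illustration.

```python
def build_write_command(handle, value, wait_for_response=False):
    """Format the gatttool command that writes `value` to `handle`."""
    suffix = 'req' if wait_for_response else 'cmd'
    # Iterating over a bytearray yields ints, one per byte.
    value_string = ''.join('%02x' % byte for byte in bytearray(value))
    return 'char-write-%s %04x %s' % (suffix, handle, value_string)


def parse_read_response(matched):
    """Convert matched text like 'descriptor: 01 ff 10' to a bytearray."""
    words = matched.split()[1:]  # drop the leading 'descriptor:'
    return bytearray(int(word, 16) for word in words)
```

For instance, build_write_command(0x0f, bytearray([1, 0]), True) produces 'char-write-req 000f 0100', which is the kind of line subscribe ends up sending to enable notifications.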

And we're done! Now we have a class that we can use to connect to and communicate with bluetooth peripherals. Below is the full file, which includes this class, definitions for the exceptions and a couple of extra functions, one of which runs an LE scan and returns a list of devices and the other of which resets the bluetooth controller. The methods for BTLEDevice are in alphabetical order, with private methods at the bottom. __enter__ and __exit__ methods have been defined so that we can wrap the object in a with and ensure that the child process is killed.

#!/usr/bin/env python

"""
bluetooth.py
============

Python wrapper for gatttool, to allow programmatic control of Bluetooth LE
devices from within Python.

Much of the gatttool interface is borrowed from the gatttool backend of
https://github.com/stratosinc/pygatt, which is distributed under an Apache
2.0 license: http://www.apache.org/licenses/LICENSE-2.0

Credit to the following:
Jeff Rowberg @jrowberg https://github.com/jrowberg/bglib
Greg Albrecht @ampledata https://github.com/ampledata/pygatt
Christopher Peplin @peplin https://github.com/stratosinc/pygatt
Morten Kjaergaard @mkjaergaard https://github.com/mkjaergaard/pygatt
Michael Saunby @msaunby https://github.com/msaunby/ble-sensor-pi
Steven Sloboda sloboste@umich.edu https://github.com/sloboste
"""

# Standard library
from collections import defaultdict
import threading
import string
import time
import re

# Third party
import pexpect

__author__ = 'Blaine Rogers <blaine.rogers@imgtec.com>'
__credits__ = ['Jeff Rowberg', 'Greg Albrecht', 'Christopher Peplin',
'Morten Kjaergaard', 'Michael Saunby', 'Steven Sloboda']

class BluetoothLEError(Exception):
    """Parent exception class for Bluetooth interface"""
    def __repr__(self):
        return '<%s, %s>' % (self.__class__.__name__, str(self))

class NotConnectedError(BluetoothLEError):
    pass

class NotificationTimeout(BluetoothLEError):
    pass

class NoResponseError(BluetoothLEError):
    pass


class BTLEDevice(object):
    """Wrapper for gatttool session with bluetooth peripheral"""
    DEFAULT_CONNECT_TIMEOUT=3.0

    def __init__(self, mac_address, hci_device='hci0'):
        """Initialises the device.

        Sets up threading for the notification listener and starts the
        gatttool session.

        Args:
            mac_address (str): The mac address of the BLE device to connect
                to in the format "XX:XX:XX:XX:XX:XX"
            hci_device (str): The hci device to use with gatttool

        Raises:
            pexpect.TIMEOUT: If, for some reason, pexpect fails to spawn a 
                gatttool instance (e.g. you don't have gatttool installed).
        """
        ##### Internal state #####
        self._address = mac_address
        self._handles = {}               # Used for tracking which handles
        self._subscribed_handlers = {}   # have subscribed callbacks
        self._callbacks = defaultdict(set)
        self._lock = threading.Lock()
        self._connection_lock = threading.RLock()
        self._running = True
        self._thread = None
        self._con = None                 # The gatttool instance
        self._connected = False

        ##### Set up gatttool #####
        gatttool_cmd = ' '.join(
            ['gatttool',
             '-b', self._address,
             '-i', hci_device,
             '-I']
        )

        self._con = pexpect.spawn(gatttool_cmd, ignore_sighup=False)
        self._con.expect(r'\[LE\]>', timeout=1)

        ##### Start notification listener thread #####
        # Keep a reference on self so other methods can reach the thread
        self._thread = threading.Thread(target=self.run)
        self._thread.daemon = True
        self._thread.start()

    def char_read_hnd(self, handle):
        """Reads a characteristic by handle.

        Args:
            handle (int): The handle of the characteristic to read.

        Returns:
            bytearray: The value of the characteristic.

        Raises:
            NotConnectedError: If no connection to the device has been 
                established.
            NotificationTimeout: If the device is connected, but reading
                fails for another reason.
        """
        if not self._connected:
            message = 'device is not connected'
            raise NotConnectedError(message)

        with self._connection_lock:
            self._con.sendline('char-read-hnd %04x' % handle)
            self._expect(r'descriptor: .*?\r')
            rval = self._con.after.split()[1:]
            return bytearray([int(x, 16) for x in rval])

    def char_write(self, handle, value, wait_for_response=False):
        """Writes a value to a given characteristic handle.

        Args:
            handle (int): The handle to write to
            value (bytearray): The value to write
            wait_for_response (bool): If true, waits for a response from
                the peripheral to check that the value was written successfully.

        Raises:
            NotConnectedError: If no connection to the device has been
                established.
            NoResponseError: If `wait_for_response` is True and no write
                confirmation was received from the peripheral.
        """
        if not self._connected:
            message = 'device is not connected'
            raise NotConnectedError(message)

        suffix = 'req' if wait_for_response else 'cmd'
        value_string = ''.join('%02x' % byte for byte in value)
        command = 'char-write-%s %04x %s' % (suffix, handle, value_string)

        self._con.sendline(command)

        if wait_for_response:
            try:
                self._expect('Characteristic value was written successfully')
            except NotificationTimeout:
                message = 'no response received'
                raise NoResponseError(message)

    def connect(self, timeout=DEFAULT_CONNECT_TIMEOUT):
        """Establishes a connection with the device.

        If connection fails, try running an LE scan first. 

        Args:
            timeout (numeric): Time in seconds to wait before giving up on
                trying to connect.

        Raises:
            NotConnectedError: If connection to the device fails.
        """
        try:
            self._con.sendline('connect')
            self._con.expect(r'Connection successful.*\[LE\]>', timeout)
            self._connected = True
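            if not self._running:
                # Restart the listener in a fresh background thread
                self._running = True
                self._thread = threading.Thread(target=self.run)
                self._thread.daemon = True
                self._thread.start()
        except pexpect.TIMEOUT:
            self.stop()
            message = ('timed out connecting to %s after %f seconds.'
                       % (self._address, timeout))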
            raise NotConnectedError(message)

    def run(self):
        """Listens for notifications."""
        while self._running:
            with self._connection_lock:
                try:
                    self._expect('nonsense value foobar', timeout=0.1)
                except NotificationTimeout:
                    pass
                except (NotConnectedError, pexpect.EOF):
                    break
            time.sleep(0.05)  # Stop thread from hogging _connection_lock

    def stop(self):
        """Stops the gatttool instance and listener thread."""
        self._running = False  # stop the listener thread
        if self._con.isalive():
            self._con.sendline('exit')

            # wait up to one second for gatttool to stop
            for i in range(100):
                if not self._con.isalive():
                    break
                time.sleep(0.01)

            self._con.close()  # make sure gatttool is dead
            self._connected = False

    def subscribe(self, handle, callback=None, type_=0):
        """Subscribes to notifications/indications from a characteristic.

        This is achieved by writing to the control handle, which is assumed
        to be `handle`+1. If indications are requested and we are already
        subscribed to notifications (or vice versa), we write 0300 
        (signifying we want to enable both). Otherwise, we write 0100 for
        notifications or 0200 for indications.

        Args:
            handle (int): The handle to listen for.
            callback (f(int, bytearray)): A function that will be called
                when the notif/indication is received. When called, it will be
                passed the handle and value.
            type_ (int): If 0, requests notifications. If 1, requests 
                indications. If 2, requests both. Any other value will
                result in a ValueError being raised. 

        Raises:
            NoResponseError: If writing to the control handle fails.
            ValueError: If `type_` is not in {0, 1, 2}.
        """
        if type_ not in {0, 1, 2}:
            message = ('Type must be 0 (notifications), 1 (indications), or '
                       '2 (both).')
            raise ValueError(message)

        control_handle = handle + 1
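        notifications = bytearray([1, 0])
        indications = bytearray([2, 0])
        both = bytearray([3, 0])
        # `this` enables what was requested; `other` is the value that means
        # only the opposite kind is currently enabled.
        this, other = {
            0: (notifications, indications),
            1: (indications, notifications),
            2: (both, both),
        }[type_]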

        with self._lock:
            if callback is not None:
                self._callbacks[handle].add(callback)

            previous = self._subscribed_handlers.get(handle, None)
            if previous not in [this, both]:
                write = both if previous == other else this
                self.char_write(control_handle, write, wait_for_response=True)
                self._subscribed_handlers[handle] = write

    def unsubscribe(self, handle, callback=None):
        """Unsubscribes from notif/indications on a handle.

        Writes 0000 to the control handle, which is assumed to be `handle`+1.
        If `callback` is supplied, removes `callback` from the list of
        callbacks for this handle.

        Args:
            handle (int): The handle to unsubscribe from.
            callback (f(int, bytearray)): The callback to remove,
                previously passed as the `callback` parameter of
                self.subscribe(handle, callback).

        Raises:
            NotificationTimeout: If writing to the control handle fails.
        """
        control_handle = handle + 1
        value = bytearray([0,0])
        with self._lock:
            if callback is not None:
                # discard() does not raise if the callback was never added
                self._callbacks[handle].discard(callback)

            if self._subscribed_handlers.get(handle, None) != value:
                self.char_write(control_handle, value, wait_for_response=True)
                self._subscribed_handlers[handle] = value

    def _expect(self, expected, timeout=DEFAULT_CONNECT_TIMEOUT):
        """Searches for notif/indications while expecting a pattern.

        We may (and often do) get an indication/notification before a write
        completes, and so it can be lost if we "expect()"ed something that
        came after it in the output, e.g.:
            > char-write-req 0x1 0x2
            Notification handle: xxx
            Write completed successfully.
            >
        So whenever we expect() a pattern, we also expect() notifications and
        indications, and handle any that arrive before the pattern is matched.

        Args:
            expected (str): The pattern to search for in the output.
            timeout (numeric): The time in seconds to wait before assuming the
                pattern will never be found.

        Raises:
            NotificationTimeout: If the pattern is not found before the
                timeout is reached.
            NotConnectedError: If gatttool reports that the device has
                disconnected.
        """
        with self._connection_lock:
            patterns = [
                expected,
                'Notification handle = .*? \r',
                'Indication   handle = .*? \r',
                '.*Invalid file descriptor.*',
                '.*Disconnected\r'
            ]
            while True:
                try:
                    matched_pattern_index = self._con.expect(patterns, timeout)
                    if matched_pattern_index == 0:
                        break
                    elif matched_pattern_index in {1, 2}:
                        self._handle_notification(self._con.after)
                    elif matched_pattern_index in {3, 4}:
                        message = ''
                        if self._running:
                            message = 'unexpectedly disconnected'
                            self._running = False
                        raise NotConnectedError(message)
                except pexpect.TIMEOUT:
                    message = 'timed out waiting for a notification'
                    raise NotificationTimeout(message)

    def _handle_notification(self, msg):
        """Handle a notification from the device.

        Propagates the handle and value to all registered callbacks.

        Args:
            msg (str): The notification message, which looks like these:

                    Notification handle = <handle> value: <value> 
                    Indication   handle = <handle> value: <value>
        """
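        # Split on whitespace into at most 6 fields and keep the last three:
        # <handle>, 'value:' and the space-separated hex bytes.
        hex_handle, _, hex_value = msg.strip().split(None, 5)[3:]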
        handle = int(hex_handle, 16)
        value = bytearray.fromhex(hex_value)

        with self._lock:
            if handle in self._callbacks:
                for callback in self._callbacks[handle]:
                    callback(handle, value)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self._con.isalive():
            self.stop()


def le_scan(sudo_password=None, timeout=5):
    """Performs a BTLE scan.

    If you don't want to use sudo, you must allow normal users to perform
    LE scanning:
        setcap 'cap_net_raw,cap_net_admin+eip' `which hcitool`

    Args:
        sudo_password (str): The password for superuser privileges. Do not
            hard code this! Fetch it from the user, then destroy it asap.
            If `None`, the scan is run without sudo, so the current user
            must already be allowed to scan.
        timeout (numeric): Time (in seconds) to wait for the scan to complete

    Returns:
        [{str:str}]: A list of dictionaries, each of which represents a
        device. The dictionaries have two keys, 'address' and 'name',
        whose values are the MAC address and name of the device
        respectively.

        [{'address': device_1_address, 'name': device_1_name},
         {'address': device_2_address, 'name': device_2_name},
         ...
         {'address': device_n_address, 'name': device_n_name}]

    Raises:
        BluetoothLEError: If hcitool exits before the timeout.
    """
    command = 'hcitool lescan'
    if sudo_password: command = 'sudo %s' % command

    scan = pexpect.spawn('bash', ['-c', command], ignore_sighup=False)
    if sudo_password: 
        scan.sendline(sudo_password)
        scan.readline()  # exclude sudo message from scan.before

    try:
        # Not actually expecting anything, just using the convenient timeout
        scan.expect('nonsense value foobar', timeout=timeout)
    except pexpect.EOF:
        message = 'unexpected error while scanning: \n' + scan.before
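        # Compare case-insensitively: the system message is usually spelled
        # "Input/output error".
        if 'input/output error' in scan.before.lower():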
            message += '\n - Try resetting the bluetooth controller.'
        elif 'Operation not permitted' in scan.before:
            message += '\n - Try running using sudo.'
        raise BluetoothLEError(message)
    except pexpect.TIMEOUT:
        devices = {}
        for line in scan.before.split('\r\n'):
            match = re.match(r'(([0-9A-Fa-f]{2}:?){6}) (\(?\w+\)?)', line)
            if match is not None:
                address = match.group(1)
                name = match.group(3)
                if name == '(unknown)': name = None
                if address in devices:
                    if devices[address]['name'] is None and name is not None:
                        devices[address]['name'] = name
                else:
                    devices[address] = {'address': address, 'name': name}
        # Convert from dict_values([{str:str}]) to [{str:str}]
        return list(devices.values())
    finally:
        # try our best to kill sudo
        scan.sendcontrol('c')
        scan.sendcontrol('x')
        scan.sendcontrol('d')
        scan.close()

    return []  # failsafe


def reset_bluetooth_controller(sudo_password=None, hci_device='hci0',
                               timeout=3.0):
    """Reinitialises the bluetooth controller interface.

    This is accomplished by bringing down and up the interface using
    hciconfig. This requires superuser privileges.

    Args:
        sudo_password (str): The password for superuser privileges. Do not
            hard code this! Fetch it from the user, then destroy it asap.
            If `None`, the command is run without sudo, so the current user
            must already have the required privileges.
        hci_device (str): The interface to reinitialise.
        timeout (numeric): Time to wait before abandoning hope.

    Raises:
        BluetoothLEError: If hciconfig fails or the timeout is reached.
    """
    command = 'hciconfig %s reset' % hci_device
    if sudo_password: command = 'sudo %s' % command

    p = pexpect.spawn('bash', ['-c', command], ignore_sighup=False)
    if sudo_password: p.sendline(sudo_password)

    try:
        p.expect('nonsense value foobar', timeout=timeout)
    except pexpect.EOF:
        p.close()
        if p.exitstatus != 0:
            message = 'hciconfig failed: \n' + p.before
            if 'Operation not permitted' in p.before:
                message += '\n - Try running using sudo.'
            raise BluetoothLEError(message)
    except pexpect.TIMEOUT:
        # try our best to kill sudo
        p.sendcontrol('c')
        p.sendcontrol('x')
        p.sendcontrol('d')
        message = 'hciconfig did not complete before timeout'
        raise BluetoothLEError(message)
    finally:
        p.close()
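
The parsing step inside le_scan can be tried without any Bluetooth hardware by running the same loop over a plain string. The following is only a minimal sketch under some assumptions: `parse_lescan_output` and `LESCAN_LINE` are hypothetical names that are not part of the module above, the sample text is made up to resemble what `hcitool lescan` prints, and the pattern captures the whole rest of the line as the name (rather than `\w+`) so that names containing spaces are kept.

```python
import re

# Hypothetical helper, not part of the module above: it repeats the parsing
# loop from le_scan() on a plain string so it can be tried without hardware.
# Assumption: each device line is "<MAC address> <name>".
LESCAN_LINE = re.compile(r'((?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}) (.+)')


def parse_lescan_output(output):
    """Returns a list of {'address': ..., 'name': ...} dictionaries."""
    devices = {}
    for line in output.split('\r\n'):
        match = LESCAN_LINE.match(line)
        if match is None:
            continue  # skip anything that is not a device line
        address = match.group(1)
        name = match.group(2).strip()
        if name == '(unknown)':
            name = None
        if address not in devices:
            devices[address] = {'address': address, 'name': name}
        elif devices[address]['name'] is None and name is not None:
            # A later line may carry the name that an earlier one lacked
            devices[address]['name'] = name
    return list(devices.values())


# Made-up sample text; the addresses and the device name are invented.
sample = ('LE Scan ...\r\n'
          'AA:BB:CC:DD:EE:01 (unknown)\r\n'
          'AA:BB:CC:DD:EE:01 Heart Monitor\r\n'
          'AA:BB:CC:DD:EE:02 (unknown)\r\n')
found = parse_lescan_output(sample)
```

As in le_scan, a device that is first reported as `(unknown)` picks up its name if a later line supplies one, and each address appears only once in the result.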