14. Asynchronously receiving data from Arduino


[1]:
import re
import asyncio
import time

import numpy as np
import pandas as pd

import serial
import serial.tools.list_ports

import bokeh.plotting
import bokeh.io
import bokeh.driving
bokeh.io.output_notebook()

In this lesson, you will learn how to use Python’s built-in asynchronous capabilities to constantly receive data from Arduino without blocking so that you can use the Python interpreter to do other tasks.

Setup

The setup for this lesson is the same as the previous one. If you want to skip the setup, execute the code cell below that contains the utility functions and then jump ahead to the section “Why do we need asynchrony?”

[2]:
def find_arduino(port=None):
    """Get the name of the port that is connected to Arduino."""
    if port is None:
        ports = serial.tools.list_ports.comports()
        for p in ports:
            if p.manufacturer is not None and "Arduino" in p.manufacturer:
                port = p.device
    return port


def handshake_arduino(
    arduino, sleep_time=1, print_handshake_message=False, handshake_code=0
):
    """Make sure connection is established by sending
    and receiving bytes."""
    # Close and reopen
    arduino.close()
    arduino.open()

    # Chill out while everything gets set
    time.sleep(sleep_time)

    # Set a long timeout to complete handshake
    timeout = arduino.timeout
    arduino.timeout = 2

    # Read and discard everything that may be in the input buffer
    _ = arduino.read_all()

    # Send request to Arduino
    arduino.write(bytes([handshake_code]))

    # Read in what Arduino sent
    handshake_message = arduino.read_until()

    # Send and receive request again
    arduino.write(bytes([handshake_code]))
    handshake_message = arduino.read_until()

    # Print the handshake message, if desired
    if print_handshake_message:
        print("Handshake message: " + handshake_message.decode())

    # Reset the timeout
    arduino.timeout = timeout

The schematic we will use is shown below.

Arduino data transfer schematic

The sketch is

const int voltagePin = A0;

const int HANDSHAKE = 0;
const int VOLTAGE_REQUEST = 1;
const int ON_REQUEST = 2;
const int STREAM = 3;
const int READ_DAQ_DELAY = 4;

// Initially, only send data upon request
int daqMode = ON_REQUEST;

// Default time between data acquisition is 100 ms
int daqDelay = 100;

// String to store input of DAQ delay
String daqDelayStr;


// Keep track of last data acquisition for delays
unsigned long timeOfLastDAQ = 0;


unsigned long printVoltage() {
  // Read value from analog pin
  int value = analogRead(voltagePin);

  // Get the time point
  unsigned long timeMilliseconds = millis();

  // Write the result
  if (Serial.availableForWrite()) {
    String outstr = String(String(timeMilliseconds, DEC) + "," + String(value, DEC));
    Serial.println(outstr);
  }

  // Return time of acquisition
  return timeMilliseconds;
}


void setup() {
  // Initialize serial communication
  Serial.begin(115200);
}


void loop() {
  // If we're streaming
  if (daqMode == STREAM) {
    if (millis() - timeOfLastDAQ >= daqDelay) {
      timeOfLastDAQ = printVoltage();
    }
  }

  // Check if data has been sent to Arduino and respond accordingly
  if (Serial.available() > 0) {
    // Read in request
    int inByte = Serial.read();

    // If data is requested, fetch it and write it, or handshake
    switch(inByte) {
      case VOLTAGE_REQUEST:
        timeOfLastDAQ = printVoltage();
        break;
      case ON_REQUEST:
        daqMode = ON_REQUEST;
        break;
      case STREAM:
        daqMode = STREAM;
        break;
      case READ_DAQ_DELAY:
        // Read in delay, knowing it is appended with an x
        daqDelayStr = Serial.readStringUntil('x');

        // Convert to int and store
        daqDelay = daqDelayStr.toInt();

        break;
      case HANDSHAKE:
        if (Serial.availableForWrite()) {
          Serial.println("Message received.");
        }
        break;
    }
  }
}

Why do we need asynchrony?

In the previous lesson, when we were streaming in data, we had a structure like this:

while i < n_data:
    raw = arduino.read_until()

    try:
        t, V = parse_raw(raw)
        time_ms[i] = t
        voltage[i] = V
        i += 1
    except:
        pass

Most of the time, an iteration of the while loop resulted in a pass. We were essentially telling the Python interpreter to constantly be trying to read and parse. Just like delay() in Arduino is blocking, so too is this for the Python interpreter. While the while loop is running, the interpreter cannot attend to any other tasks.

When you are building devices, there are plenty of other tasks you want the Python interpreter to be doing while it is acquiring data. At the very least, you may want it to be listening for more user input to stop acquiring data. But you may also want to perform calculations on the incoming data (such as digital filtering), control and/or receive data from other connected devices, or even just mess around in your Jupyter notebook.

In order to do these things, you want the data acquisition to happen asynchronously. You want the interpreter to occasionally read and parse data, but be free to do whatever else you want it to do when it is not reading and parsing data.

Reading data in chunks

As a first step toward asynchrony, we will write a function to read data in chunks. Instead of constantly monitoring the data coming in over the serial connection, we would rather occasionally check the serial connection to see if there are any data in the input buffer. If there are, we read in whatever is in the input buffer to clear it, go off and process that, and then wait a while before checking again. During that waiting time, you can have the interpreter do other tasks.

Warning: Don’t wait too long to read, though! You do not want to overrun the USB input buffer size on your computer. Arduino’s output buffer is 64 bytes, and computers can have default input buffer sizes as low as 64 bytes as well. (I think most computers these days have input buffer sizes around 1024 bytes, but it does vary from machine to machine.)
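To get a feel for how long you can wait, here is a rough back-of-the-envelope calculation. It assumes each message is about ten bytes, like b'1093,541\r\n' (messages get longer as the millisecond counter gains digits), and that nothing is read in the meantime. The helper name time_to_fill_ms is made up for this illustration.

```python
# One time,voltage message as sent by the sketch: digits, comma, digits, CR, LF
example_message = b"1093,541\r\n"
bytes_per_message = len(example_message)


def time_to_fill_ms(buffer_size, delay_ms, message_size=bytes_per_message):
    """Rough time in ms before an unread input buffer of
    `buffer_size` bytes can hold no more whole messages."""
    messages_that_fit = buffer_size // message_size
    return messages_that_fit * delay_ms


for size in (64, 1024):
    print(size, "byte buffer:", time_to_fill_ms(size, delay_ms=20), "ms")
```

With a 20 ms delay between acquisitions, a 64-byte buffer gives you very little slack, while a 1024-byte buffer gives you on the order of a couple of seconds.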

Chunk reading for non-corrupted data

The function below reads in all of the data that is in the input buffer and returns the data as a byte string. It asks only for the number of bytes that are already waiting (ser.in_waiting), so the read returns right away instead of waiting for more data to arrive.

[3]:
def read_all(ser, read_buffer=b"", **args):
    """Read all available bytes from the serial port
    and append to the read buffer.

    Parameters
    ----------
    ser : serial.Serial() instance
        The device we are reading from.
    read_buffer : bytes, default b''
        Previous read buffer that is appended to.

    Returns
    -------
    output : bytes
        Bytes object that contains read_buffer + read.

    Notes
    -----
    .. `**args` appears, but is never used. This is for
       compatibility with `read_all_newlines()` as a
       drop-in replacement for this function.
    """
    # Set timeout to None to make sure we read all bytes
    previous_timeout = ser.timeout
    ser.timeout = None

    in_waiting = ser.in_waiting
    read = ser.read(size=in_waiting)

    # Reset to previous timeout
    ser.timeout = previous_timeout

    return read_buffer + read

For our present application, in which we read in comma-delimited time-voltage data, the byte string returned from this function might look like this:

b'1032,541\r\n1052,542\r\n1073,554\r\n1093,5'

Note that it does not end in a carriage return and newline. Those characters might not be in the read buffer yet, and since we are not using read_until(), we will not keep reading until we get those terminating characters. So, if we are parsing the output of this function, we should keep the last incomplete part of the data (in this case, b'1093,5') around for the next read.
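The idea of carrying the incomplete tail over to the next read can be sketched without any hardware. The helper split_complete() and the second byte string below are invented for illustration. The sketch only splits on the line terminator and does none of the error checking a real parser needs.

```python
def split_complete(read):
    """Split a read into complete lines and the trailing
    incomplete piece (empty if the read ended on a newline)."""
    *complete, remainder = read.split(b"\r\n")
    return complete, remainder


first_read = b"1032,541\r\n1052,542\r\n1073,554\r\n1093,5"
second_read = b"49\r\n1113,551\r\n"  # made-up continuation of the stream

# The first read ends mid-message, so we hold on to the tail
lines, leftover = split_complete(first_read)

# Prepend the tail to the next read so the split message is made whole
more_lines, leftover_after = split_complete(leftover + second_read)

print(lines, leftover)
print(more_lines, leftover_after)
```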

Here is a parser that returns both the times and voltages as lists, as well as the remaining bytes that we will pass as the read_buffer kwarg in the read_all() function. There is some error checking. The only allowed characters are carriage returns, new lines, commas, and digits. Any message having other characters is discarded.

[4]:
def parse_read(read):
    """Parse a read with time, voltage data

    Parameters
    ----------
    read : byte string
        Byte string with comma delimited time/voltage
        measurements.

    Returns
    -------
    time_ms : list of ints
        Time points in milliseconds.
    voltage : list of floats
        Voltages in volts.
    remaining_bytes : byte string
        Remaining, unparsed bytes.
    """
    time_ms = []
    voltage = []

    # Separate independent time/voltage measurements
    pattern = re.compile(rb"\d+|,")
    raw_list = [
        b"".join(pattern.findall(raw)).decode()
        for raw in read.split(b"\r\n")
    ]

    for raw in raw_list[:-1]:
        try:
            t, V = raw.split(",")
            time_ms.append(int(t))
            voltage.append(int(V) * 5 / 1023)
        except ValueError:
            pass

    if len(raw_list) == 0:
        return time_ms, voltage, b""
    else:
        return time_ms, voltage, raw_list[-1].encode()

Chunk reading with corrupted data

We discovered that on Windows machines, reads made with pySerial can sometimes result in corrupted bytes. This makes the read-in string unusable, and in many cases un-parsable because the resulting bytes do not correspond to any characters in ASCII. I am not sure exactly why this happens, but I suspect it is due to the read of a given byte being incomplete, with the read being interrupted before the stop bit. To counteract this, we can instead read chunks that must terminate in a newline using read_until(). This blocks all other processes until the complete newline byte is read. This also ensures that all bytes preceding the newline are read in their entirety as well.

[5]:
def read_all_newlines(ser, read_buffer=b"", n_reads=4):
    """Read data in until encountering newlines.

    Parameters
    ----------
    ser : serial.Serial() instance
        The device we are reading from.
    n_reads : int
        The number of reads up to newlines
    read_buffer : bytes, default b''
        Previous read buffer that is appended to.

    Returns
    -------
    output : bytes
        Bytes object that contains read_buffer + read.

    Notes
    -----
    .. This is a drop-in replacement for read_all().
    """
    raw = read_buffer
    for _ in range(n_reads):
        raw += ser.read_until()

    return raw
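If you want to try this reader without an Arduino attached, a small stand-in for the serial connection is enough. The FakeSerial class below is hypothetical and only mimics the default behavior of pySerial’s read_until(), which stops after the first newline byte. read_all_newlines() is repeated from above so that the example is self-contained.

```python
import io


class FakeSerial:
    """Hypothetical stand-in for serial.Serial that replays canned bytes."""

    def __init__(self, payload):
        self._stream = io.BytesIO(payload)

    def read_until(self):
        # Stop after the first b"\n", as pySerial does by default
        return self._stream.readline()


def read_all_newlines(ser, read_buffer=b"", n_reads=4):
    raw = read_buffer
    for _ in range(n_reads):
        raw += ser.read_until()

    return raw


fake = FakeSerial(b"100,512\r\n120,513\r\n140,514\r\n160,515\r\n180,516\r\n")
chunk = read_all_newlines(fake, n_reads=4)
print(chunk)
```

Each chunk contains exactly n_reads complete messages and always ends in a newline, so there is never a partial message left over.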

asyncio

While this section is not formally a follow-along exercise, I highly recommend running the code in this notebook because you will note delays as the code runs that will help you understand how asyncio is working.

Python has handy built-in asynchronous capabilities in the asyncio module of the standard library. The module was added in Python 3.4, the async and await syntax followed in Python 3.5, and there have been changes and deprecations since. The version in Python 3.8 has nice high-level functionality and a stable API, so it is important that you are using at least Python 3.8.

I will give a brief overview of how it works here, but you would be well-served to read the documentation, most importantly the coroutines and tasks section.

At the center of asyncio’s high-level functionality are awaitables. An awaitable is a process that the interpreter can suspend such that it is not blocking. A very important awaitable is asyncio.sleep(), which is one we will put to use.

Aside from sleeping, the awaitables we will use are coroutines and tasks. You can think of a coroutine as a function that you can start and stop and start again. A task runs a coroutine. As usual, this is best seen by example.

We will start by making a coroutine that is a greeting in English. It says “hello” and then waits one second to say “world.” It returns a string describing what the message was. We would do this in a synchronous way (so it is a function and not a coroutine) like this:

[6]:
def english(exclaim=False):
    print("Hello, ")
    time.sleep(1)
    print("world" + ("!" if exclaim else "."))

    return "The message was a greeting to the world."

We can run this function, and it works as expected.

[7]:
message = english(exclaim=True)
Hello,
world!

The problem is that the function, like all functions in Python, blocked. While waiting for a second to see “world,” the Python interpreter was busy.

Now, let’s write an asynchronous version, that is a coroutine.

[8]:
async def english_async(exclaim=False):
    print("Hello, ")
    await asyncio.sleep(1)
    print("world" + ("!" if exclaim else "."))

    return "The message was a greeting to the world."

The async def keyword signifies that this is not a function, but a coroutine. That means the interpreter can start running the coroutine, leave it and do something else, and then run it again. It can only leave the coroutine where an awaitable is run. To run an awaitable within a coroutine, we use the await keyword. So, when we run await asyncio.sleep(1), the Python interpreter turns its attention away from the english_async() coroutine until asyncio.sleep() returns, which will happen after one second.

We cannot just run a coroutine like it is a function. Look:

[9]:
english_async(exclaim=True)
[9]:
<coroutine object english_async at 0x7fae93a45140>

We get back a coroutine. To run it, we need a running event loop. An event loop enables asynchronous computing by listening for requests to do something, and then dispatching resources to do the requested calculation. Each thread (which you can think of for our purposes as one core of your CPU) can have either zero or one event loops. If you are running a Jupyter notebook, there is an active event loop; that is how JupyterLab runs, waiting for you to execute a cell. If you are not in a Jupyter notebook, you probably do not have an event loop running, so you need to start one. We will discuss how to start and run an event loop outside of JupyterLab later in this lesson. For now, we will assume you have a running event loop, as you do in a Jupyter notebook.
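If you are unsure whether an event loop is already running, you can ask asyncio. The sketch below is meant to be run as a plain script, not in a notebook cell; the helper name loop_is_running() is made up. It relies on asyncio.get_running_loop(), which raises a RuntimeError when no loop is running in the current thread.

```python
import asyncio


def loop_is_running():
    """Return True if an event loop is running in this thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def check_inside():
    # A coroutine only ever executes inside a running event loop
    return loop_is_running()


outside = loop_is_running()             # plain script: no loop yet
inside = asyncio.run(check_inside())    # asyncio.run() starts a loop for us

print(outside, inside)
```

In a Jupyter notebook, calling loop_is_running() directly in a cell should return True, and asyncio.run() would raise an error because a loop is already running.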

To run the coroutine, you can create a task using asyncio.create_task(). Note that “calling” a coroutine like a function returns a coroutine object, so the argument you pass into asyncio.create_task() is how you would call the coroutine, including all arguments and keyword arguments. Once the task is created, the coroutine is scheduled to run on the event loop.

[10]:
task_english = asyncio.create_task(english_async(exclaim=True))
Hello,
world!

You can access the return value of the coroutine using the result() method of the task. Of course, you should first check the done() method of the task to see if it has completed.

[11]:
task_english.done()
[11]:
True

And we can safely retrieve the result.

[12]:
task_english.result()
[12]:
'The message was a greeting to the world.'
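Instead of checking done() yourself, you can also await the task, which waits for it to finish and hands back the return value. Here is a minimal sketch, written as a script with a made-up greet() coroutine. In a notebook cell you could write message = await task_english directly.

```python
import asyncio


async def greet():
    await asyncio.sleep(0.1)
    return "done greeting"


async def main():
    task = asyncio.create_task(greet())

    # The task has only been scheduled, so it cannot be done yet
    finished_early = task.done()

    # Awaiting the task suspends main() until greet() returns
    message = await task

    return finished_early, task.done(), message


finished_early, finished_late, message = asyncio.run(main())
print(finished_early, finished_late, message)
```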

Now, let’s make another coroutine that says the same greeting in Spanish. For demonstration purposes, this function will only wait a half second between the two words.

[13]:
async def spanish_async(exclaim=False):
    print(("  ¡" if exclaim else "  ") + "Hola, ")
    await asyncio.sleep(0.5)
    print("  mundo" + ("!" if exclaim else "."))

    return "El mensaje fue un saludo al mundo."

We can run this coroutine as we did for the English one.

[14]:
task_spanish = asyncio.create_task(spanish_async(exclaim=True))
  ¡Hola,
  mundo!

With asynchronous computing, we can run the two coroutines concurrently! There are several ways to do this. First, we can create tasks one after another. The first task is created, “Hello,” is printed, and then the second task is created. (This time, we won’t exclaim.)

[15]:
task_english = asyncio.create_task(english_async())
task_spanish = asyncio.create_task(spanish_async())
Hello,
  Hola,
  mundo.
world.

Because the delay is shorter for the Spanish version, the entire message gets printed before the English message is complete.

As another option, we can gather the coroutines together using asyncio.gather().

[16]:
task_english_spanish = asyncio.gather(english_async(), spanish_async())
Hello,
  Hola,
  mundo.
world.

To get the return values, we again use task_english_spanish.result().

[17]:
task_english_spanish.result()
[17]:
['The message was a greeting to the world.',
 'El mensaje fue un saludo al mundo.']

Note that the result is the return values from the two coroutines as a list.
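A quick way to convince yourself that the gathered coroutines really overlap is to time them. In the sketch below, wait_and_return() is a made-up coroutine that just sleeps; the two sleeps add up to 0.5 seconds, but the total elapsed time should be close to the longer of the two.

```python
import asyncio
import time


async def wait_and_return(label, seconds):
    await asyncio.sleep(seconds)
    return label


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        wait_and_return("english", 0.3),
        wait_and_return("spanish", 0.2),
    )
    elapsed = time.perf_counter() - start

    return results, elapsed


results, elapsed = asyncio.run(main())
print(results, round(elapsed, 2))
```

The results come back in the order the coroutines were passed to asyncio.gather(), not the order in which they finished.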

Canceling a task

Once a task is created, it may be interrupted and canceled using the cancel() method of the task. For example, we can cancel the English greeting before the second word comes out.

[18]:
# Create the task
task_english = asyncio.create_task(english_async())

# Wait a half second
await asyncio.sleep(0.5)

# Cancel the task
successfully_canceled = task_english.cancel()
Hello,

Note that the cancel() method requests a cancellation, but cancellation is not guaranteed. You should read the asyncio documentation for more information.

A canceled task will be marked as both done and canceled.

[19]:
task_english.done(), task_english.cancelled()
[19]:
(True, True)

Since it was not allowed to return, though, asking for its result raises a CancelledError.

[20]:
task_english.result()
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<ipython-input-20-f5664860679d> in <module>
----> 1 task_english.result()

CancelledError:
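Cancellation matters when a coroutine has cleanup to do, such as telling a device to stop streaming. One common pattern, sketched here with a made-up stream_forever() coroutine and a log list in place of real hardware, is to put the cleanup in a finally block, which runs even when the task is canceled.

```python
import asyncio

log = []


async def stream_forever():
    log.append("stream on")
    try:
        while True:
            await asyncio.sleep(0.01)
    finally:
        # Runs on normal exit and on cancellation alike
        log.append("stream off")


async def main():
    task = asyncio.create_task(stream_forever())

    # Let the task run for a little while, then cancel it
    await asyncio.sleep(0.05)
    task.cancel()

    # Awaiting a canceled task raises CancelledError
    try:
        await task
    except asyncio.CancelledError:
        log.append("canceled")

    return task.cancelled()


was_canceled = asyncio.run(main())
print(log, was_canceled)
```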

Running without an existing event loop

If you do not have a running event loop on your thread, which will typically be the case if you are running outside of JupyterLab, you need to start one. Fortunately, asyncio provides a convenient way to start an event loop (and automatically close it when the work is complete) with its asyncio.run() function. To use it, define a coroutine that awaits all of the tasks you want to run and then pass that coroutine as an argument to asyncio.run(). For example, to run the English and Spanish greetings concurrently, do the following.

async def main():
    gathered = asyncio.gather(english_async(), spanish_async())
    await gathered

    return gathered.result()


asyncio.run(main())

Receiving data asynchronously

Now that we understand how asynchrony works in Python, let’s receive some data! We’ll of course start by shaking hands with Arduino.

[21]:
HANDSHAKE = 0
VOLTAGE_REQUEST = 1
ON_REQUEST = 2
STREAM = 3
READ_DAQ_DELAY = 4

# Windows users may need to give COM port for find_arduino()
port = find_arduino()

# Connect and handshake
arduino = serial.Serial(port, baudrate=115200)
handshake_arduino(arduino, print_handshake_message=True)
Handshake message: Message received.

Now we can write a coroutine to acquire data. This is very much like the daq_stream() function we encountered in the last lesson, except for a few key differences.

  1. We will read the data in chunks using the functions we wrote at the beginning of this lesson.

  2. I read in the first few messages sent from Arduino after turning on the stream and discard them, just to ensure the input buffer of my computer is cleared and we’re getting good clean reads.

  3. We sleep between acquisitions. I choose to sleep about 80% of the time of the acquisitions. This ensures that I will never have too many bytes in the input buffer, but I am still not checking as often as I could be. (Note that the read_all_newlines() function will take longer to run than the read_all() function because it has to wait until Arduino sends its final newline. It is blocking while it is waiting. This should not be a major slowdown, though.)

  4. The function takes an input reader, which specifies which function we want to use to read in the serial data. By default, we use read_all_newlines() because it does not have the aforementioned issues on Windows.

  5. I set up a dictionary that gets updated with data as it is read.

[22]:
# Set up data dictionary
data = dict(time_ms=[], voltage=[])


async def daq_stream_async(
    arduino,
    data,
    n_data=100,
    delay=20,
    n_trash_reads=5,
    n_reads_per_chunk=4,
    reader=read_all_newlines,
):
    """Obtain `n_data` data points from an Arduino stream
    with a delay of `delay` milliseconds between each."""
    # Specify delay
    arduino.write(bytes([READ_DAQ_DELAY]) + (str(delay) + "x").encode())

    # Turn on the stream
    arduino.write(bytes([STREAM]))

    # Read and throw out first few reads
    i = 0
    while i < n_trash_reads:
        _ = arduino.read_until()
        i += 1

    # Receive data
    read_buffer = [b""]
    while len(data["time_ms"]) < n_data:
        # Read in chunk of data
        raw = reader(arduino, read_buffer=read_buffer[0], n_reads=n_reads_per_chunk)

        # Parse it, passing if it is gibberish
        try:
            t, V, read_buffer[0] = parse_read(raw)

            # Update data dictionary
            data["time_ms"] += t
            data["voltage"] += V
        except:
            pass

        # Sleep 80% of the time before we need to start reading chunks
        await asyncio.sleep(0.8 * n_reads_per_chunk * delay / 1000)

    # Turn off the stream
    arduino.write(bytes([ON_REQUEST]))

    return pd.DataFrame(
        {"time (ms)": data["time_ms"][:n_data], "voltage (V)": data["voltage"][:n_data]}
    )
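To make the sleep time in the last line of the loop concrete, here is the arithmetic for the default arguments. The helper name chunk_sleep_seconds() is made up for this illustration.

```python
def chunk_sleep_seconds(n_reads_per_chunk, delay_ms, fraction=0.8):
    """Time to sleep between chunk reads, in seconds."""
    return fraction * n_reads_per_chunk * delay_ms / 1000


# Arduino needs 4 * 20 ms to send one chunk of four messages
chunk_duration = 4 * 20 / 1000

# We sleep for 80% of that time and then start reading again
sleep_time = chunk_sleep_seconds(4, 20)

print(chunk_duration, sleep_time)
```

The remaining 20% of each chunk's duration is the margin that keeps the reads from falling behind the stream.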

To acquire data using this coroutine, we create a task.

[23]:
daq_task = asyncio.create_task(daq_stream_async(arduino, data, n_data=1000, delay=20))
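Because the acquisition runs as a task, other code is free to look at the shared data dictionary while it fills up. The sketch below shows the idea without hardware: simulated_daq() is a hypothetical stand-in for daq_stream_async() that appends fake data points, and monitor() is a made-up coroutine that records how many points have arrived each time it checks.

```python
import asyncio


async def simulated_daq(data, n_data=20, delay=5):
    """Hypothetical stand-in for daq_stream_async();
    appends one fake data point every `delay` ms."""
    t = 0
    while len(data["time_ms"]) < n_data:
        data["time_ms"].append(t)
        data["voltage"].append(2.5)
        t += delay
        await asyncio.sleep(delay / 1000)

    return len(data["time_ms"])


async def monitor(data, task, interval=0.02):
    """Record the number of acquired points until `task` is done."""
    counts = []
    while not task.done():
        counts.append(len(data["time_ms"]))
        await asyncio.sleep(interval)

    return counts


async def main():
    data = dict(time_ms=[], voltage=[])
    daq_task = asyncio.create_task(simulated_daq(data))
    counts = await monitor(data, daq_task)

    return counts, daq_task.result()


counts, n_acquired = asyncio.run(main())
print(counts, n_acquired)
```

In the notebook, the same thing can be done by hand: while daq_task is running, evaluating len(data["time_ms"]) in another cell shows how many points have come in so far.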

Once the task is done (with 1000 points spaced 20 ms apart, acquisition takes about 20 seconds), we can retrieve the data frame from the task’s result and make a plot.

[24]:
# Get the data from the result
df = daq_task.result()

# Convert milliseconds to seconds
df['time (sec)'] = df['time (ms)'] / 1000

# Plot!
p = bokeh.plotting.figure(
    x_axis_label='time (s)',
    y_axis_label='voltage (V)',
    frame_height=175,
    frame_width=500,
    x_range=[df['time (sec)'].min(), df['time (sec)'].max()],
)
p.line(source=df, x='time (sec)', y='voltage (V)')

bokeh.io.show(p)

[25]:
arduino.close()

Computing environment

[26]:
%load_ext watermark
%watermark -v -p numpy,pandas,serial,bokeh,jupyterlab
CPython 3.8.5
IPython 7.18.1

numpy 1.19.1
pandas 1.1.3
serial 3.4
bokeh 2.2.1
jupyterlab 2.2.6