15. Streaming plots with Bokeh


[1]:
import re
import asyncio
import time

import numpy as np
import pandas as pd

import serial
import serial.tools.list_ports

import bokeh.plotting
import bokeh.models
import bokeh.io
import bokeh.driving
bokeh.io.output_notebook()

notebook_url = "localhost:8888"

The setup for this lesson is the same as the previous one. We also have an ever-growing list of utility functions. If you want to skip the setup, execute the code cell below that contains the utility functions and then move ahead to the section on Bokeh apps.

[2]:
def find_arduino(port=None):
    """Get the name of the port that is connected to Arduino."""
    if port is None:
        ports = serial.tools.list_ports.comports()
        for p in ports:
            if p.manufacturer is not None and "Arduino" in p.manufacturer:
                port = p.device
    return port


def handshake_arduino(
    arduino, sleep_time=1, print_handshake_message=False, handshake_code=0
):
    """Make sure connection is established by sending
    and receiving bytes."""
    # Close and reopen
    arduino.close()
    arduino.open()

    # Chill out while everything gets set
    time.sleep(sleep_time)

    # Set a long timeout to complete handshake
    timeout = arduino.timeout
    arduino.timeout = 2

    # Read and discard everything that may be in the input buffer
    _ = arduino.read_all()

    # Send request to Arduino
    arduino.write(bytes([handshake_code]))

    # Read in what Arduino sent
    handshake_message = arduino.read_until()

    # Send and receive request again
    arduino.write(bytes([handshake_code]))
    handshake_message = arduino.read_until()

    # Print the handshake message, if desired
    if print_handshake_message:
        print("Handshake message: " + handshake_message.decode())

    # Reset the timeout
    arduino.timeout = timeout


def read_all(ser, read_buffer=b"", **args):
    """Read all available bytes from the serial port
    and append to the read buffer.

    Parameters
    ----------
    ser : serial.Serial() instance
        The device we are reading from.
    read_buffer : bytes, default b''
        Previous read buffer that is appended to.

    Returns
    -------
    output : bytes
        Bytes object that contains read_buffer + read.

    Notes
    -----
    .. `**args` appears, but is never used. This is for
       compatibility with `read_all_newlines()` as a
       drop-in replacement for this function.
    """
    # Set timeout to None to make sure we read all bytes
    previous_timeout = ser.timeout
    ser.timeout = None

    in_waiting = ser.in_waiting
    read = ser.read(size=in_waiting)

    # Reset to previous timeout
    ser.timeout = previous_timeout

    return read_buffer + read


def read_all_newlines(ser, read_buffer=b"", n_reads=4):
    """Read data in until encountering newlines.

    Parameters
    ----------
    ser : serial.Serial() instance
        The device we are reading from.
    n_reads : int
        The number of reads up to newlines
    read_buffer : bytes, default b''
        Previous read buffer that is appended to.

    Returns
    -------
    output : bytes
        Bytes object that contains read_buffer + read.

    Notes
    -----
    .. This is a drop-in replacement for read_all().
    """
    raw = read_buffer
    for _ in range(n_reads):
        raw += ser.read_until()

    return raw


def parse_read(read):
    """Parse a read with time, voltage data.

    Parameters
    ----------
    read : byte string
        Byte string with comma delimited time/voltage
        measurements.

    Returns
    -------
    time_ms : list of ints
        Time points in milliseconds.
    voltage : list of floats
        Voltages in volts.
    remaining_bytes : byte string
        Remaining, unparsed bytes.
    """
    time_ms = []
    voltage = []

    # Separate independent time/voltage measurements
    pattern = re.compile(rb"\d+|,")
    raw_list = [
        b"".join(pattern.findall(raw)).decode()
        for raw in read.split(b"\r\n")
    ]

    for raw in raw_list[:-1]:
        try:
            t, V = raw.split(",")
            time_ms.append(int(t))
            voltage.append(int(V) * 5 / 1023)
        except ValueError:
            pass

    if len(raw_list) == 0:
        return time_ms, voltage, b""
    else:
        return time_ms, voltage, raw_list[-1].encode()


async def daq_stream_async(
    arduino,
    data,
    n_data=100,
    delay=20,
    n_trash_reads=5,
    n_reads_per_chunk=4,
    reader=read_all_newlines,
):
    """Obtain `n_data` data points from an Arduino stream
    with a delay of `delay` milliseconds between each."""
    # Specify delay
    arduino.write(bytes([READ_DAQ_DELAY]) + (str(delay) + "x").encode())

    # Turn on the stream
    arduino.write(bytes([STREAM]))

    # Read and throw out first few reads
    i = 0
    while i < n_trash_reads:
        _ = arduino.read_until()
        i += 1

    # Receive data
    read_buffer = [b""]
    while len(data["time_ms"]) < n_data:
        # Read in chunk of data
        raw = reader(arduino, read_buffer=read_buffer[0], n_reads=n_reads_per_chunk)

        # Parse it, passing if it is gibberish
        try:
            t, V, read_buffer[0] = parse_read(raw)

            # Update data dictionary
            data["time_ms"] += t
            data["voltage"] += V
        except:
            pass

        # Sleep 80% of the time before we need to start reading chunks
        await asyncio.sleep(0.8 * n_reads_per_chunk * delay / 1000)

    # Turn off the stream
    arduino.write(bytes([ON_REQUEST]))

    return pd.DataFrame(
        {"time (ms)": data["time_ms"][:n_data], "voltage (V)": data["voltage"][:n_data]}
    )
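
To make the bookkeeping in parse_read() concrete, here is a minimal, self-contained sketch of the same idea: split a chunk of bytes on the line terminator, convert the complete lines, and hold back the trailing partial line for the next read. The helper name split_complete() and the sample bytes are made up for illustration; the conversion factor 5 / 1023 is the one used in parse_read().

```python
def split_complete(read):
    """Split raw bytes into complete lines and a trailing remainder."""
    # Everything before the final b"\r\n" is a complete line; whatever
    # follows it (possibly b"") is an unfinished measurement.
    *complete, remainder = read.split(b"\r\n")
    return complete, remainder


# Hypothetical chunk: two complete measurements and the start of a third
raw = b"1032,541\r\n1052,540\r\n10"
complete, remainder = split_complete(raw)

# Convert each complete line to (time in ms, voltage in V)
measurements = []
for line in complete:
    t, v = line.decode().split(",")
    measurements.append((int(t), int(v) * 5 / 1023))

print(complete)   # [b'1032,541', b'1052,540']
print(remainder)  # b'10'
```

The remainder plays the role of the read buffer that daq_stream_async() passes back to the reader, so a measurement that was cut off mid-line is completed on the next read rather than lost.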

The schematic we will use is shown below.

Arduino data transfer schematic

The sketch is

const int voltagePin = A0;

const int HANDSHAKE = 0;
const int VOLTAGE_REQUEST = 1;
const int ON_REQUEST = 2;
const int STREAM = 3;
const int READ_DAQ_DELAY = 4;

// Initially, only send data upon request
int daqMode = ON_REQUEST;

// Default time between data acquisition is 100 ms
int daqDelay = 100;

// String to store input of DAQ delay
String daqDelayStr;


// Keep track of last data acquisition for delays
unsigned long timeOfLastDAQ = 0;


unsigned long printVoltage() {
  // Read value from analog pin
  int value = analogRead(voltagePin);

  // Get the time point
  unsigned long timeMilliseconds = millis();

  // Write the result
  if (Serial.availableForWrite()) {
    String outstr = String(String(timeMilliseconds, DEC) + "," + String(value, DEC));
    Serial.println(outstr);
  }

  // Return time of acquisition
  return timeMilliseconds;
}


void setup() {
  // Initialize serial communication
  Serial.begin(115200);
}


void loop() {
  // If we're streaming
  if (daqMode == STREAM) {
    if (millis() - timeOfLastDAQ >= daqDelay) {
      timeOfLastDAQ = printVoltage();
    }
  }

  // Check if data has been sent to Arduino and respond accordingly
  if (Serial.available() > 0) {
    // Read in request
    int inByte = Serial.read();

    // If data is requested, fetch it and write it, or handshake
    switch(inByte) {
      case VOLTAGE_REQUEST:
        timeOfLastDAQ = printVoltage();
        break;
      case ON_REQUEST:
        daqMode = ON_REQUEST;
        break;
      case STREAM:
        daqMode = STREAM;
        break;
      case READ_DAQ_DELAY:
        // Read in delay, knowing it is appended with an x
        daqDelayStr = Serial.readStringUntil('x');

        // Convert to int and store
        daqDelay = daqDelayStr.toInt();

        break;
      case HANDSHAKE:
        if (Serial.availableForWrite()) {
          Serial.println("Message received.");
        }
        break;
    }
  }
}
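
On the Python side, the READ_DAQ_DELAY case in the sketch above expects a command byte followed by the delay written as ASCII digits and a terminating x. A minimal sketch of building that message follows. The helper name delay_command() is hypothetical, but the byte layout is the one daq_stream_async() writes.

```python
READ_DAQ_DELAY = 4  # same command code as in the sketch


def delay_command(delay_ms):
    """Build the message that sets the DAQ delay on the Arduino."""
    # One command byte, then the delay as text, then the 'x' terminator
    # that Serial.readStringUntil('x') looks for.
    return bytes([READ_DAQ_DELAY]) + (str(delay_ms) + "x").encode()


print(delay_command(20))  # b'\x0420x'
```

The first byte, \x04, is the command code. The remaining bytes, 20x, are what the sketch reads with readStringUntil() and converts with toInt().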

Bokeh apps

Though this section is not a follow-along exercise, you should run it on your machine so you can see the dynamics.

Thus far, we have used Bokeh to make zoom-able, pan-able, save-able JavaScript-based plots of data. We also used it to make widgets that we could use to control Arduino. But Bokeh allows much more interactivity. We can update plots based on results of calculation by the Python interpreter. In our case, we want to update a plot of voltages coming off of the Arduino board in real time.

As an example of how a Bokeh app can update a plot, I build one below that dynamically plots a random walk. The walker is shown as a dot, and the trail behind it is drawn as a line. To build a Bokeh app, we need to write a function that controls the app. The function for the random walker is shown below, with an explanation following immediately.

[3]:
def random_walk(doc):
    """Bokeh app for a dynamic random walk of 1000 steps."""
    rg = np.random.default_rng(3252)

    p = bokeh.plotting.figure(
        frame_width=200,
        frame_height=200,
        x_range=[-20, 20],
        y_range=[-20, 20],
    )

    # Use ColumnDataSources for data for populating glyphs
    source_line = bokeh.models.ColumnDataSource({"x": [0], "y": [0]})
    source_dot = bokeh.models.ColumnDataSource({"x": [0], "y": [0]})
    line = p.line(source=source_line, x="x", y="y")
    dot = p.circle(source=source_dot, x="x", y="y", color="tomato", size=7)

    @bokeh.driving.linear()
    def update(step):
        if step > 1000:
            doc.remove_periodic_callback(pc)
        else:
            theta = rg.uniform(0, 2 * np.pi)
            new_position = {
                "x": [source_dot.data["x"][0] + np.cos(theta)],
                "y": [source_dot.data["y"][0] + np.sin(theta)],
            }
            source_line.stream(new_position)
            source_dot.data = new_position

    doc.add_root(p)

    # Add a periodic callback to be run every 20 milliseconds
    pc = doc.add_periodic_callback(update, 20)

The argument of the function (traditionally called doc) is used to add plots (or other Bokeh models) to the app and to add the callbacks. We first set up the figure. Next, we set up the data sources for the dot and line using Bokeh’s ColumnDataSource. This data type may be dynamically updated in a Bokeh app, which is exactly what we want.

After the data sources are set up, we define an update function (we call it update() here, but it could have any name). This function is called each time a callback is triggered. At the end of the function defining the app, we add a periodic callback that calls the update function every 20 milliseconds. (Note that unlike time.sleep() and asyncio.sleep(), which take seconds, the time units for Bokeh’s periodic callbacks are milliseconds.) In this case, we decorate the callback function with @bokeh.driving.linear(). This results in the argument of update(), step, being advanced by one every time the function is called. This way we can keep track of how many steps were taken.

In the update function, if we have exceeded the number of desired steps, we cancel the periodic callback. Otherwise, we compute the next step of the random walk by drawing a random angle for the step, and we update the position of the walker by adding the step to it. Finally, we update the data sources for the dot and line. For the line, we use the stream() method. This results in Bokeh only appending new data to the data source instead of pushing through the whole data set for the plot each time. As the size of the data set being plotted grows, this gives much better performance. For the dot, since it is only plotted as a single position, we replace the source data with the new dot position.
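
The difference between appending to the line and replacing the dot can be seen without Bokeh at all. The sketch below mimics the update step with plain dictionaries standing in for the two data sources. The names trail and dot are made up for illustration, and Python's built-in random module replaces the NumPy generator, so the walk itself will differ from the one in the app.

```python
import math
import random

rng = random.Random(3252)

# Plain dictionaries standing in for source_line and source_dot
trail = {"x": [0.0], "y": [0.0]}
dot = {"x": [0.0], "y": [0.0]}


def update():
    """Take one unit-length step in a random direction."""
    theta = rng.uniform(0, 2 * math.pi)
    new_position = {
        "x": [dot["x"][0] + math.cos(theta)],
        "y": [dot["y"][0] + math.sin(theta)],
    }

    # Analogous to source_line.stream(new_position): append only
    for key, values in new_position.items():
        trail[key].extend(values)

    # Analogous to source_dot.data = new_position: replace everything
    dot["x"], dot["y"] = new_position["x"], new_position["y"]


for _ in range(5):
    update()

print(len(trail["x"]), len(dot["x"]))  # 6 1
```

After five steps the trail holds the starting point plus five new points, while the dot still holds exactly one position, the most recent one.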

To run our app, we use bokeh.io.show(). We should also include the URL of the notebook (specified above in the input cell; you can see the URL by looking at the top of your browser).

Note: Bokeh apps, relying on a Python engine to run, do not render in the static HTML version (i.e., on the course website) of this lesson. So, if you are reading this on the website, you will not see the plot below.

[4]:
bokeh.io.show(random_walk, notebook_url=notebook_url)

Follow-along exercise 11: Plotting streaming data

We will expand on the work we did in the last lesson to acquire data asynchronously and push the data to a Bokeh plot for updating. To start with, as usual, we need to shake hands with Arduino.

[5]:
HANDSHAKE = 0
VOLTAGE_REQUEST = 1
ON_REQUEST = 2
STREAM = 3
READ_DAQ_DELAY = 4

port = find_arduino()
arduino = serial.Serial(port, baudrate=115200)
handshake_arduino(arduino, print_handshake_message=True)
Handshake message: Message received.

Our strategy for building our app is this:

  • Set up a dictionary containing lists of data

  • Asynchronously collect data from Arduino that updates the data dictionary

  • Set up a periodic callback so Bokeh updates the plot from the data dictionary

To do this, we need to keep track of which data are included on the plot and which are new. Therefore, the data dictionary also contains a variable to remember how long the time point and voltage lists were the last time the plot was rendered.

[6]:
# Set up data dictionary
data = dict(prev_array_length=0, time_ms=[], voltage=[])
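
The role of prev_array_length can be sketched in a few lines of plain Python. The helper pop_new_points() and the dictionary demo are hypothetical names used only here. The helper returns whatever was appended since the previous call and then moves the marker forward, which is the same bookkeeping the plotting app needs to do on each update.

```python
def pop_new_points(data):
    """Return points added since the last call and advance the marker."""
    start = data["prev_array_length"]
    new = {
        "t": [t / 1000 for t in data["time_ms"][start:]],  # ms -> s
        "V": data["voltage"][start:],
    }
    data["prev_array_length"] = len(data["time_ms"])
    return new


demo = dict(prev_array_length=0, time_ms=[], voltage=[])

# Pretend two measurements arrived before the first render
demo["time_ms"] += [0, 20]
demo["voltage"] += [1.0, 1.1]
first = pop_new_points(demo)

# One more measurement arrives before the next render
demo["time_ms"] += [40]
demo["voltage"] += [1.2]
second = pop_new_points(demo)

print(first)   # {'t': [0.0, 0.02], 'V': [1.0, 1.1]}
print(second)  # {'t': [0.04], 'V': [1.2]}
```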

Next, we build the plotting app. Because the app must have a call signature app(doc), I like to write a function that returns an app. This allows me to have a more convenient API for specifying properties of the app. This app is essentially like the random walk app, except that we pull data out of the data dictionary as needed. We also have a rollover parameter, which specifies the maximum number of data points to be displayed on the plot. Only the most recent data points are displayed. For time series data, like we’re plotting here, this results in a “scroll” across the plot, kind of like a stock ticker.

I have also included a keyword argument for the delay between plot updates. If the delay is too short, your computer will struggle trying to render the Bokeh plot at a high rate. In my experience, plots that are updated every 100 ms or less look like essentially continuous updates to the eye, so I use a plot delay of 90 ms.

[7]:
def potentiometer_app(data, n_data=100, rollover=400, plot_update_delay=90):
    """Return a function defining a Bokeh app for streaming
    data up to `n_data` data points. A maximum of `rollover`
    data points are shown at a time.
    """
    def _app(doc):
        # Instantiate figure
        p = bokeh.plotting.figure(
            frame_width=500,
            frame_height=175,
            x_axis_label="time (s)",
            y_axis_label="voltage (V)",
            y_range=[-0.2, 5.2],
        )

        # No padding on x_range makes data flush with end of plot
        p.x_range.range_padding = 0

        # Start with an empty column data source with time and voltage
        source = bokeh.models.ColumnDataSource({"t": [], "V": []})

        # Put a line glyph
        r = p.line(source=source, x="t", y="V")

        @bokeh.driving.linear()
        def update(step):
            # Shut off periodic callback after n_data plot updates (this
            # assumes data arrive at least as fast as the plot updates)
            if step > n_data:
                doc.remove_periodic_callback(pc)
            else:
                # Update plot by streaming in data
                source.stream(
                    {
                        "t": np.array(data['time_ms'][data['prev_array_length']:]) / 1000,
                        "V": data['voltage'][data['prev_array_length']:],
                    },
                    rollover,
                )
                data['prev_array_length'] = len(data['time_ms'])

        doc.add_root(p)
        pc = doc.add_periodic_callback(update, plot_update_delay)


    return _app
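
To get a feel for the numbers, the sketch below mimics the effect of rollover with a deque from the standard library and works out how the acquisition delay and the plot update delay relate. This is only an analogy for the bookkeeping, not how Bokeh implements it, and the values are the defaults used in this lesson.

```python
from collections import deque

rollover = 400          # maximum number of points kept on the plot
daq_delay = 20          # ms between measurements
plot_update_delay = 90  # ms between plot updates

# A deque with maxlen discards from the front when full, which is the
# same bookkeeping that rollover asks of the data source.
window = deque(maxlen=rollover)
for i in range(1000):
    window.append(i * daq_delay)  # time stamps in ms

print(len(window), window[0], window[-1])  # 400 12000 19980
print(plot_update_delay / daq_delay)       # 4.5 new points per update
print(rollover * daq_delay / 1000)         # 8.0 seconds of data visible
```

With these settings, each plot update streams in four or five new points, and the visible window spans the most recent eight seconds of the trace.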

Now, to put the app to use! We need to show the app, and then create a task to acquire the data. The plot is then updated live! (Note that this is not viewable in the static HTML version of this lesson.)

[8]:
n_data = 1000

bokeh.io.show(potentiometer_app(data, n_data=n_data), notebook_url=notebook_url)
daq_task = asyncio.create_task(daq_stream_async(arduino, data, n_data=n_data, delay=20))
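
The pattern in the cell above, scheduling a task that fills a shared dictionary in the background, can be tried without an Arduino. In the sketch below, fake_daq() is a made-up stand-in for daq_stream_async() that appends constant voltages. The call to asyncio.run() is there so the example works as a plain script; in a notebook, where an event loop is already running, the body of main() can be executed directly.

```python
import asyncio


async def fake_daq(data, n_data=10, delay=0.001):
    """Stand-in for daq_stream_async(): appends made-up data points."""
    while len(data["time_ms"]) < n_data:
        data["time_ms"].append(20 * len(data["time_ms"]))
        data["voltage"].append(2.5)
        await asyncio.sleep(delay)
    return len(data["time_ms"])


async def main():
    data = dict(prev_array_length=0, time_ms=[], voltage=[])

    # Scheduling the task returns immediately; nothing has run yet
    task = asyncio.create_task(fake_daq(data))
    done_at_start = task.done()

    # Asking for the result before the task finishes raises an exception
    try:
        task.result()
        raised = False
    except asyncio.InvalidStateError:
        raised = True

    # Awaiting the task lets it run to completion
    n = await task
    return done_at_start, raised, n, task.result()


done_at_start, raised, n, result = asyncio.run(main())
print(done_at_start, raised, n, result)  # False True 10 10
```

The same caveat applies to the real task: its result is available only after acquisition has finished.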

Although only the last 400 data points are visible on the live-updated plot, we still have all the data available and can retrieve them from the task once it has finished.

[9]:
# Retrieve data from the task
df = daq_task.result()

# Convert time to seconds
df["time (s)"] = df["time (ms)"] / 1000

# Plot the full time-voltage trace
p = bokeh.plotting.figure(
    frame_width=500,
    frame_height=175,
    x_axis_label="time (s)",
    y_axis_label="voltage (V)",
    y_range=[-0.2, 5.2],
)

p.x_range.range_padding = 0

p.line(source=df, x="time (s)", y="voltage (V)")

bokeh.io.show(p)

[10]:
arduino.close()

Do-it-yourself exercise 6: Etch-A-Sketch

An Etch-A-Sketch is a classic toy wherein a child (or adult!) turns knobs to draw lines. To get an idea of how it works, check out this video.

In this exercise, make an Etch-A-Sketch by using two potentiometers as the “knobs”. The voltages measured from analog inputs are then sent to Python and interpreted as positions in an x-y plane. You should have a Bokeh plot that gets updated as the knobs are turned.


Computing environment

[11]:
%load_ext watermark
%watermark -v -p numpy,pandas,serial,bokeh,jupyterlab
CPython 3.8.5
IPython 7.18.1

numpy 1.19.1
pandas 1.1.3
serial 3.4
bokeh 2.2.1
jupyterlab 2.2.6