15. Streaming plots with Bokeh
[1]:
import re
import asyncio
import time
import numpy as np
import pandas as pd
import serial
import serial.tools.list_ports
import bokeh.plotting
import bokeh.io
import bokeh.driving
bokeh.io.output_notebook()
notebook_url = "localhost:8888"
The setup for this lesson is the same as the previous one. We also have an ever-growing list of utility functions. If you want to skip to the section after the setup, execute the code cell below that contains the utility functions and then click here.
[2]:
def find_arduino(port=None):
    """Return the name of the serial port that is connected to Arduino.

    Parameters
    ----------
    port : str or None, default None
        If given, it is returned unchanged and no scan is performed.

    Returns
    -------
    output : str or None
        `port` if it was supplied. Otherwise the device name of a port
        whose manufacturer string contains "Arduino" (the last such
        port if several match), or None if no port matches.
    """
    if port is not None:
        return port

    arduino_devices = [
        candidate.device
        for candidate in serial.tools.list_ports.comports()
        if candidate.manufacturer is not None
        and "Arduino" in candidate.manufacturer
    ]

    # When several boards are attached, the last one listed wins
    return arduino_devices[-1] if arduino_devices else None
def handshake_arduino(
    arduino, sleep_time=1, print_handshake_message=False, handshake_code=0
):
    """Make sure connection is established by sending
    and receiving bytes.

    Parameters
    ----------
    arduino : serial.Serial() instance
        The device to shake hands with. It is closed and reopened.
    sleep_time : float, default 1
        Seconds to wait after reopening the port before communicating.
    print_handshake_message : bool, default False
        If True, print the message received from the second exchange.
    handshake_code : int, default 0
        Value of the single byte sent to request a handshake.

    Notes
    -----
    .. The port's timeout is set to 2 seconds for the duration of the
       handshake and is restored afterwards, even if the exchange
       raises.
    """
    # Close and reopen
    arduino.close()
    arduino.open()

    # Chill out while everything gets set
    time.sleep(sleep_time)

    # Set a long timeout to complete handshake
    previous_timeout = arduino.timeout
    arduino.timeout = 2

    try:
        # Read and discard everything that may be in the input buffer
        arduino.read_all()

        # Send request to Arduino; the first reply is read and discarded
        arduino.write(bytes([handshake_code]))
        arduino.read_until()

        # Send and receive request again; this reply is the one reported
        arduino.write(bytes([handshake_code]))
        handshake_message = arduino.read_until()
    finally:
        # Reset the timeout no matter how the exchange ended
        arduino.timeout = previous_timeout

    # Print the handshake message, if desired
    if print_handshake_message:
        print("Handshake message: " + handshake_message.decode())
def read_all(ser, read_buffer=b"", **args):
    """Read all available bytes from the serial port
    and append to the read buffer.

    Parameters
    ----------
    ser : serial.Serial() instance
        The device we are reading from.
    read_buffer : bytes, default b''
        Previous read buffer that is appended to.

    Returns
    -------
    output : bytes
        Bytes object that contains read_buffer + read.

    Notes
    -----
    .. `**args` appears, but is never used. This is for
       compatibility with `read_all_newlines()` as a
       drop-in replacement for this function.
    .. The port's timeout is restored to its previous value even if
       the read raises.
    """
    # Set timeout to None to make sure we read all bytes
    previous_timeout = ser.timeout
    ser.timeout = None

    try:
        read = ser.read(size=ser.in_waiting)
    finally:
        # Reset to previous timeout
        ser.timeout = previous_timeout

    return read_buffer + read
def read_all_newlines(ser, read_buffer=b"", n_reads=4):
    """Perform a fixed number of read-until-newline calls.

    Parameters
    ----------
    ser : serial.Serial() instance
        The device we are reading from.
    read_buffer : bytes, default b''
        Previous read buffer that is appended to.
    n_reads : int, default 4
        The number of reads up to newlines.

    Returns
    -------
    output : bytes
        Bytes object that contains read_buffer + read.

    Notes
    -----
    .. This is a drop-in replacement for read_all().
    """
    # One `read_until()` call per requested line, glued on in arrival order
    new_lines = (ser.read_until() for _ in range(n_reads))

    return read_buffer + b"".join(new_lines)
# Matches the only tokens a measurement line may contain: digit runs and
# commas. Anything else in a line is treated as noise and dropped.
_MEASUREMENT_TOKENS = re.compile(rb"\d+|,")


def parse_read(read):
    """Parse a read with time, voltage data.

    Parameters
    ----------
    read : byte string
        Byte string with comma delimited time/voltage
        measurements, one measurement per CRLF-terminated line.

    Returns
    -------
    time_ms : list of ints
        Time points in milliseconds.
    voltage : list of floats
        Voltages in volts.
    remaining_bytes : byte string
        Remaining, unparsed bytes: everything after the last CRLF,
        returned exactly as received so it can be prepended to the
        next read.

    Notes
    -----
    .. Lines that do not reduce to exactly ``<int>,<int>`` are skipped.
       A measurement is stored only after both fields have been
       converted, so `time_ms` and `voltage` always have equal length.
    .. The remainder is not filtered. A read that ends between the
       carriage return and the line feed therefore keeps its trailing
       carriage return and is completed correctly by the next read.
    """
    time_ms = []
    voltage = []

    # `split` always yields at least one item; the last one is whatever
    # follows the final terminator (possibly empty) and is not yet complete.
    *complete_lines, remaining_bytes = read.split(b"\r\n")

    for line in complete_lines:
        cleaned = b"".join(_MEASUREMENT_TOKENS.findall(line)).decode()

        # Convert both fields before storing either one
        try:
            t, V = cleaned.split(",")
            t, V = int(t), int(V)
        except ValueError:
            # Wrong number of fields or an empty field: discard the line
            continue

        time_ms.append(t)
        # 10-bit ADC reading (0-1023) scaled to 0-5 V
        voltage.append(V * 5 / 1023)

    return time_ms, voltage, remaining_bytes
async def daq_stream_async(
    arduino,
    data,
    n_data=100,
    delay=20,
    n_trash_reads=5,
    n_reads_per_chunk=4,
    reader=read_all_newlines,
):
    """Obtain `n_data` data points from an Arduino stream
    with a delay of `delay` milliseconds between each.

    Parameters
    ----------
    arduino : serial.Serial() instance
        The device we are reading from.
    data : dict
        Dictionary with list entries under the keys "time_ms" and
        "voltage". New measurements are appended to these lists in
        place as they arrive.
    n_data : int, default 100
        Acquisition stops once `data["time_ms"]` holds at least this
        many entries.
    delay : int, default 20
        Delay between acquisitions, in milliseconds, sent to Arduino.
    n_trash_reads : int, default 5
        Number of initial lines that are read and discarded.
    n_reads_per_chunk : int, default 4
        Passed to `reader` as `n_reads`; also sets the sleep between
        chunks.
    reader : function, default read_all_newlines
        Called as `reader(arduino, read_buffer=..., n_reads=...)` and
        must return the bytes to be parsed.

    Returns
    -------
    output : pandas DataFrame
        Columns "time (ms)" and "voltage (V)" holding the first
        `n_data` entries of the data dictionary.

    Notes
    -----
    .. The stream is switched off again even if acquisition is
       cancelled or fails part way through.
    """
    # Specify delay
    arduino.write(bytes([READ_DAQ_DELAY]) + (str(delay) + "x").encode())

    # Turn on the stream
    arduino.write(bytes([STREAM]))

    try:
        # Read and throw out first few reads
        for _ in range(n_trash_reads):
            arduino.read_until()

        # Receive data
        read_buffer = b""
        while len(data["time_ms"]) < n_data:
            # Read in chunk of data
            raw = reader(
                arduino, read_buffer=read_buffer, n_reads=n_reads_per_chunk
            )

            # Parse it. A chunk that cannot be parsed is deliberately
            # ignored; the buffer is left as it was and we try again.
            try:
                t, V, read_buffer = parse_read(raw)
            except Exception:
                pass
            else:
                # Update data dictionary
                data["time_ms"] += t
                data["voltage"] += V

            # Sleep 80% of the time before we need to start reading chunks
            await asyncio.sleep(0.8 * n_reads_per_chunk * delay / 1000)
    finally:
        # Turn off the stream, also on cancellation or error
        arduino.write(bytes([ON_REQUEST]))

    return pd.DataFrame(
        {
            "time (ms)": data["time_ms"][:n_data],
            "voltage (V)": data["voltage"][:n_data],
        }
    )
The schematic we will use is shown below.
The sketch is
// Analog input pin that is sampled
const int voltagePin = A0;

// Single-byte command codes; loop() reads one from serial and acts on it
const int HANDSHAKE = 0;
const int VOLTAGE_REQUEST = 1;
const int ON_REQUEST = 2;
const int STREAM = 3;
const int READ_DAQ_DELAY = 4;

// Initially, only send data upon request
int daqMode = ON_REQUEST;

// Default time between data acquisition is 100 ms
int daqDelay = 100;

// String to store input of DAQ delay
String daqDelayStr;

// Keep track of last data acquisition for delays
unsigned long timeOfLastDAQ = 0;
unsigned long printVoltage() {
// Read value from analog pin
int value = analogRead(voltagePin);
// Get the time point
unsigned long timeMilliseconds = millis();
// Write the result
if (Serial.availableForWrite()) {
String outstr = String(String(timeMilliseconds, DEC) + "," + String(value, DEC));
Serial.println(outstr);
}
// Return time of acquisition
return timeMilliseconds;
}
// One-time initialization: open the serial connection.
void setup() {
  // Initialize serial communication at 115200 baud
  Serial.begin(115200);
}
// Main loop: send a sample when streaming and one is due, then handle at
// most one incoming command byte.
void loop() {
  // While streaming, sample once daqDelay ms have passed since the last one
  if (daqMode == STREAM && millis() - timeOfLastDAQ >= daqDelay) {
    timeOfLastDAQ = printVoltage();
  }

  // Nothing else to do unless a command byte is waiting
  if (Serial.available() <= 0) {
    return;
  }

  // Read in request
  const int command = Serial.read();

  if (command == VOLTAGE_REQUEST) {
    // Single on-demand sample
    timeOfLastDAQ = printVoltage();
  } else if (command == ON_REQUEST) {
    daqMode = ON_REQUEST;
  } else if (command == STREAM) {
    daqMode = STREAM;
  } else if (command == READ_DAQ_DELAY) {
    // The delay follows the command byte as digits terminated by an 'x'
    daqDelayStr = Serial.readStringUntil('x');
    daqDelay = daqDelayStr.toInt();
  } else if (command == HANDSHAKE) {
    if (Serial.availableForWrite()) {
      Serial.println("Message received.");
    }
  }
}
Bokeh apps
Though this section is not a follow-along exercise, you should run it on your machine so you can see the dynamics.
Thus far, we have used Bokeh to make zoom-able, pan-able, save-able JavaScript-based plots of data. We also used it to make widgets that we could use to control Arduino. But Bokeh allows much more interactivity. We can update plots based on results of calculation by the Python interpreter. In our case, we want to update a plot of voltages coming off of the Arduino board in real time.
As an example of how a Bokeh app can be used to update a plot, I build one below to dynamically plot a random walk. The walk will proceed with a dot doing the walk and the trail behind it represented as a line. To build a Bokeh app, we need to write a function that controls the app. The function for the random walker is shown below with an explanation following immediately.
[3]:
def random_walk(doc):
    """Bokeh app for a dynamic random walk of 1000 steps.

    Parameters
    ----------
    doc : Bokeh document
        The document the plot and the periodic callback are added to.

    Notes
    -----
    .. A dot marks the walker and a line marks its trail. One
       unit-length step in a random direction is taken every 20
       milliseconds; after 1000 steps the periodic callback is removed.
    """
    n_steps = 1000

    rg = np.random.default_rng(3252)

    p = bokeh.plotting.figure(
        frame_width=200,
        frame_height=200,
        x_range=[-20, 20],
        y_range=[-20, 20],
    )

    # Use ColumnDataSources for data for populating glyphs
    source_line = bokeh.models.ColumnDataSource({"x": [0], "y": [0]})
    source_dot = bokeh.models.ColumnDataSource({"x": [0], "y": [0]})
    p.line(source=source_line, x="x", y="y")
    p.circle(source=source_dot, x="x", y="y", color="tomato", size=7)

    @bokeh.driving.linear()
    def update(step):
        # `step` starts at 0, so steps 0 through n_steps - 1 give exactly
        # n_steps moves
        if step >= n_steps:
            doc.remove_periodic_callback(pc)
            return

        theta = rg.uniform(0, 2 * np.pi)
        new_position = {
            "x": [source_dot.data["x"][0] + np.cos(theta)],
            "y": [source_dot.data["y"][0] + np.sin(theta)],
        }

        # Append to the trail; replace the dot's single position
        source_line.stream(new_position)
        source_dot.data = new_position

    doc.add_root(p)

    # Add a periodic callback to be run every 20 milliseconds
    pc = doc.add_periodic_callback(update, 20)
The argument of the function (traditionally called `doc`) is used to add plots (or other Bokeh objects) to the app and to add the callbacks. We first set up the figure. Next, we set up the data sources for the dot and line using Bokeh’s `ColumnDataSource`. This data type may be dynamically updated in a Bokeh app, which is exactly what we want. After the data sources are set up, we set up an update function (we call it `update()` here, but it could have any name). This function is called each time a callback is triggered. At the end of the function defining the app, we add a periodic callback that calls the update function every 20 milliseconds. (Note that unlike `time.sleep()` and `asyncio.sleep()`, the time units for Bokeh’s periodic callbacks are milliseconds.) In this case, we decorate the callback function with `@bokeh.driving.linear()`. This results in the argument of `update()`, `step`, being advanced by one every time the function is called. This way we can keep track of how many steps were taken. In the update function, if we have exceeded the number of desired steps, we cancel the periodic callbacks. Otherwise, we compute the next step of the random walk by computing a random angle for the step. We update the position of the walker by adding the step to it. Finally, we update the data sources for the dot and line. For the line, we use the `stream()` method. This results in Bokeh only appending new data to the data source instead of pushing through the whole data set for the plot each time. As the size of the data set being plotted grows, this gives much better performance. For the dot, since it is only plotted as a single position, we update the source data to be the dot position.
To run our app, we use `bokeh.io.show()`. We should also include the URL of the notebook (specified above in the input cell; you can see the URL by looking at the top of your browser).
Note: Bokeh apps, relying on a Python engine to run, do not render in the static HTML version (i.e., on the course website) of this lesson. So, if you are reading this on the website, you will not see the plot below.
[4]:
bokeh.io.show(random_walk, notebook_url=notebook_url)
Follow-along exercise 11: Plotting streaming data
We will expand on the work we did in the last lesson to acquire data asynchronously and push the data to a Bokeh plot for updating. To start with, as usual, we need to shake hands with Arduino.
[5]:
# Single-byte command codes; the values match the constants of the same
# names in the Arduino sketch
HANDSHAKE = 0
VOLTAGE_REQUEST = 1
ON_REQUEST = 2
STREAM = 3
READ_DAQ_DELAY = 4

port = find_arduino()

# Fail with a clear message instead of an obscure error from the serial layer
if port is None:
    raise RuntimeError(
        "No Arduino found. Check the connection or pass the port name "
        "explicitly to find_arduino()."
    )

arduino = serial.Serial(port, baudrate=115200)
handshake_arduino(arduino, print_handshake_message=True)
Handshake message: Message received.
Our strategy for building our app is this:
Set up a dictionary containing lists of data
Asynchronously collect data from Arduino that updates the data dictionary
Set up a periodic callback so Bokeh updates the plot from the data dictionary
To do this, we need to keep track of which data are included on the plot and which are new. Therefore, the data dictionary also contains a variable to remember how long the time point and voltage lists were the last time the plot was rendered.
[6]:
# Data dictionary: the acquired time points and voltages, plus how long
# those lists were the last time the plot was updated
data = {"prev_array_length": 0, "time_ms": [], "voltage": []}
Next, we build the plotting app. Because the app must have a call signature `app(doc)`, I like to write a function that returns an app. This allows me to have a more convenient API for specifying properties of the app. This app is essentially like the random walk app, except that we pull data out of the data dictionary as needed. We also have a rollover parameter, which specifies the maximum number of data points to be displayed on the plot. Only the most recent data points are displayed.
For time series data, like we’re plotting here, this results in a “scroll” across the plot, kind of like a stock ticker.
I have also included a keyword argument for the delay between plot updates. If the delay is too short, your computer will struggle trying to render the Bokeh plot at a high rate. In my experience, plots that are updated every 100 ms or less look like essentially continuous updates to the eye, so I use a plot delay of 90 ms.
[7]:
def potentiometer_app(data, n_data=100, rollover=400, plot_update_delay=90):
    """Return a function defining a Bokeh app for streaming
    data up to `n_data` data points. A maximum of `rollover`
    data points are shown at a time.

    Parameters
    ----------
    data : dict
        Dictionary with lists under "time_ms" and "voltage" that some
        other code appends to, and an integer under
        "prev_array_length" recording how many points have already
        been plotted. "prev_array_length" is updated by the app.
    n_data : int, default 100
        Total number of data points to plot. Once this many have been
        plotted, the periodic callback is removed.
    rollover : int, default 400
        Maximum number of most recent data points kept on the plot.
    plot_update_delay : int, default 90
        Time between plot updates, in milliseconds.

    Returns
    -------
    output : function
        A function with call signature `app(doc)`.

    Notes
    -----
    .. The app stops updating when `n_data` points have been plotted,
       not after a fixed number of callbacks, so it neither stops
       early nor keeps running when the plot update rate differs from
       the data acquisition rate.
    """

    def _app(doc):
        # Instantiate figure
        p = bokeh.plotting.figure(
            frame_width=500,
            frame_height=175,
            x_axis_label="time (s)",
            y_axis_label="voltage (V)",
            y_range=[-0.2, 5.2],
        )

        # No padding on x_range makes data flush with end of plot
        p.x_range.range_padding = 0

        # Start with an empty column data source with time and voltage
        source = bokeh.models.ColumnDataSource({"t": [], "V": []})

        # Put a line glyph
        p.line(source=source, x="t", y="V")

        def update():
            start = data["prev_array_length"]

            # Never plot more than n_data points, and never read past the
            # shorter of the two lists so "t" and "V" stay the same length
            stop = min(len(data["time_ms"]), len(data["voltage"]), n_data)

            # Stream only when there is something new to show
            if stop > start:
                source.stream(
                    {
                        "t": np.array(data["time_ms"][start:stop]) / 1000,
                        "V": data["voltage"][start:stop],
                    },
                    rollover,
                )
                data["prev_array_length"] = stop

            # Shut off periodic callback if we have plotted all of the data
            if stop >= n_data:
                doc.remove_periodic_callback(pc)

        doc.add_root(p)
        pc = doc.add_periodic_callback(update, plot_update_delay)

    return _app
Now, to put the app to use! We need to show the app, and then create a task to acquire the data. The plot is then updated live! (Note that this is not viewable in the static HTML version of this lesson.)
[8]:
n_data = 1000

# Show the live plot
bokeh.io.show(
    potentiometer_app(data, n_data=n_data),
    notebook_url=notebook_url,
)

# Create the acquisition task that fills the data dictionary
daq_task = asyncio.create_task(
    daq_stream_async(arduino, data, n_data=n_data, delay=20)
)
Although only the last 400 data points are visible on the live-updated plot, we still have all the data available and can retrieve them from the task.
[9]:
# The task's result is the data frame returned by the acquisition coroutine
df = daq_task.result()

# Add a column with time in seconds
df["time (s)"] = df["time (ms)"] / 1000

# Static plot of the full time-voltage trace
p = bokeh.plotting.figure(
    y_range=[-0.2, 5.2],
    x_axis_label="time (s)",
    y_axis_label="voltage (V)",
    frame_height=175,
    frame_width=500,
)
p.x_range.range_padding = 0
p.line(x="time (s)", y="voltage (V)", source=df)

bokeh.io.show(p)
[10]:
arduino.close()
Do-it-yourself exercise 6: Etch-A-Sketch
An Etch-A-Sketch is a classic toy wherein a child (or adult!) turns knobs to draw lines. To get an idea of how it works, check out this video.
In this exercise, make an Etch-A-Sketch by using two potentiometers as the “knobs”. The voltages measured from analog inputs are then sent to Python and interpreted as positions in an x-y plane. You should have a Bokeh plot that gets updated as the knobs are turned.
Computing environment
[11]:
%load_ext watermark
%watermark -v -p numpy,pandas,serial,bokeh,jupyterlab
CPython 3.8.5
IPython 7.18.1
numpy 1.19.1
pandas 1.1.3
serial 3.4
bokeh 2.2.1
jupyterlab 2.2.6