Examples
Practical code examples for common use cases with SiFi Bridge Python. Examples are written against version 2.0.0-b10.
Basic Examples
Simple ECG Recording
Record ECG data for a fixed duration and save to a list.
import time
import sifi_bridge_py as sbp

# 1. Create a SifiBridge instance, which manages the underlying sifibridge
#    CLI process and exposes a Python API to talk to a SiFi device.
sb = sbp.SifiBridge()

# 2. Connect to the first available SiFi device (BioPoint or SiFi Band).
#    Pass a DeviceType or a MAC/UUID to sb.connect() to target a specific device.
sb.connect()

# 3. Configure the onboard ECG digital filters.
#    fs:          Sampling rate in Hz.
#    bandpass:    Enables the bandpass filter defined by flo / fhi.
#    flo, fhi:    Bandpass cutoffs (Hz). 0 to 30 Hz captures the standard
#                 ECG morphology (P, QRS, T) while attenuating drift and HF noise.
#    mains_notch: Removes power line interference. Use 50 in Europe/Asia,
#                 60 in North America, or None to disable.
sb.configure_ecg(state=True, fs=1000, mains_notch=50, bandpass=True, flo=0, fhi=30)

# 4. Start data acquisition. The bridge spawns a background thread that
#    fills an internal FIFO queue with packets as they arrive over BLE.
sb.start()

# 5. Buffer for accumulated samples.
ecg_buffer = []

# 6. Discard any packets that piled up between start() and now. This avoids
#    mixing startup transients into the recording.
print("Flushing buffer...")
sb.clear_data_buffer()

# FIX: capture the recording-window reference time AFTER the flush, so the
# 10 s window measures clean recording time only. Previously the reference
# was taken before the flush, silently shortening the effective recording.
start_time = time.time()

# 7. Main acquisition loop. Pull packets one at a time for 10 seconds and
#    append their ECG samples to the buffer.
try:
    print("Recording started.")
    while time.time() - start_time < 10:
        # get_ecg() blocks until the next ECG packet is available.
        # Each packet is a dict with metadata (timestamp, sample_rate, ...)
        # and a 'data' field containing the sample array.
        packet = sb.get_ecg()
        if packet:
            ecg_buffer.extend(packet['data']['ecg'])
            print(f"Collected {len(ecg_buffer)} samples", end='\r')
finally:
    # 8. Always stop streaming and disconnect, even if the loop is
    #    interrupted. The short sleep gives background threads time to
    #    flush the stop command over BLE before the process exits.
    sb.stop()
    sb.disconnect()
    time.sleep(0.5)

# 9. Final summary.
print(f"\nTotal ECG samples collected: {len(ecg_buffer)}")
Multi-Sensor Data Collection
Collect data from multiple sensors simultaneously.
import time
import sifi_bridge_py as sbp

# 1. Create a SifiBridge instance, which manages the underlying sifibridge
#    CLI process and exposes a Python API to talk to a SiFi device.
sb = sbp.SifiBridge()

# 2. This example is made to be run with a BioPoint. For a SiFi Band,
#    the only difference is that the EMG will arrive as an array.
#    ECG, EMG and PPG are all available on either device. Pass a MAC/UUID to target a specific unit, or
#    call sb.connect() with no argument to grab the first device available.
sb.connect()

# 3. Configure each sensor's onboard digital pipeline.
#    ECG: 500 Hz, 0–30 Hz bandpass to keep P/QRS/T morphology while
#         removing baseline drift and high-frequency noise.
#    EMG: 2 kHz, 20–450 Hz bandpass — the standard surface-EMG band.
#    Both use mains_notch=50 (Europe/Asia); set 60 in North America.
sb.configure_ecg(state=True, fs=500, mains_notch=50, bandpass=True, flo=0, fhi=30)
sb.configure_emg(state=True, fs=2000, mains_notch=50, bandpass=True, flo=20, fhi=450)

# PPG 100Hz, LED currents (mA, 1–50). Higher = brighter illumination, more signal
# but more power draw and risk of saturating the photodiode. The defaults
# below are a reasonable starting point for skin contact at the wrist/arm.
sb.configure_ppg(state=True, fs=100, ir=7, red=7, green=9, blue=9,
                 sens=sbp.PpgSensitivity.MEDIUM)

# 4. Start acquisition. The bridge spawns a background thread that fills
#    an internal FIFO queue with packets from every enabled sensor as they
#    arrive over BLE.
sb.start()

# 5. Per-sensor packet lists and running sample counters. We keep the raw
#    packets so timestamps and metadata are preserved; the counters track
#    total samples per sensor across all packets.
ecg_data, ecg_sample_count = [], 0
emg_data, emg_sample_count = [], 0
ppg_data, ppg_sample_count = [], 0

# 6. Let the sensors settle for ~1 second, then drop everything that
#    accumulated in the buffer during the settle period. This keeps startup
#    transients and front-end stabilization out of the recording.
print("Settling and flushing buffers...")
time.sleep(1.0)
sb.clear_data_buffer()

# 7. Main acquisition loop. Pull packets one at a time for 10 seconds and
#    route each to the right buffer based on its packet_type.
print("Recording 10 seconds of multi-sensor data...")
start_time = time.time()
try:
    while time.time() - start_time < 10:
        # get_data() blocks until the next packet of any type is available.
        packet = sb.get_data()
        if not packet:
            continue
        p_type = packet.get('packet_type')
        # 8. Route packets and tally sample counts.
        #    The samples for a given sensor live under packet['data'][<key>].
        #    Packets with any other type (e.g. status/bookkeeping) fall
        #    through all three branches and are ignored.
        if p_type == 'ecg':
            ecg_data.append(packet)
            ecg_sample_count += len(packet['data']['ecg'])
        elif p_type == 'emg':
            emg_data.append(packet)
            emg_sample_count += len(packet['data']['emg'])
        elif p_type == 'ppg':
            # PPG packets carry four optical channels under the keys
            # 'ir', 'r', 'g', 'b' (infrared, red, green, blue). All four
            # arrays have the same length, so counting one is enough.
            ppg_data.append(packet)
            ppg_sample_count += len(packet['data']['ir'])
        elapsed = time.time() - start_time
        print(f"Recording: {elapsed:4.1f}s | "
              f"ECG pkts: {len(ecg_data)} | "
              f"EMG pkts: {len(emg_data)} | "
              f"PPG pkts: {len(ppg_data)}", end='\r')
finally:
    # 9. Always stop streaming and disconnect, even if the loop is
    #    interrupted. The short sleep gives background threads time to flush
    #    the stop command over BLE before the process exits.
    sb.stop()
    sb.disconnect()
    time.sleep(0.5)

# 10. Final summary.
print("\n\nRecording complete:")
print(f"ECG: {len(ecg_data)} packets, {ecg_sample_count} total samples")
print(f"EMG: {len(emg_data)} packets, {emg_sample_count} total samples")
print(f"PPG: {len(ppg_data)} packets, {ppg_sample_count} total samples")
Threading Examples
Background Data Collection
Use threading to collect data in the background while performing other tasks.
import threading
import time
import sifi_bridge_py as sbp
class DataCollector:
    """
    Wraps SifiBridge and runs the acquisition loop in a background thread,
    so the main thread is free to do other work (UI updates, real-time
    processing, plotting, etc.) while data accumulates.

    Architecture (worth understanding before extending this class):

        sifibridge process → internal stdout-reader thread (in SifiBridge)
                           → this collector thread (drains the queue)
                           → main thread (reads self.data_buffer)
    """

    def __init__(self):
        # SifiBridge owns the bridge subprocess and its own reader threads.
        self.sb = sbp.SifiBridge()
        # Sentinel that lets the main thread cleanly stop the worker.
        self.running = False
        self.thread = None
        # Shared buffer. list.append is atomic under the CPython GIL, and
        # so is len(list), so the main thread can safely poll the size.
        # If you need to iterate or slice the buffer from the main thread
        # while collection is running, add a threading.Lock around access.
        self.data_buffer = []

    def connect_and_configure(self):
        """Connect to the device, configure ECG, and start streaming."""
        self.sb.connect()
        self.sb.configure_ecg(state=True, fs=500, mains_notch=50,
                              bandpass=True, flo=0, fhi=30)
        self.sb.start()
        # Let the front-end settle, then drop the startup transient so the
        # collector starts on clean data.
        time.sleep(1.0)
        self.sb.clear_data_buffer()

    def collect_data(self):
        """
        Background worker loop. Runs until self.running is set to False.

        We use get_data(timeout=1.0) instead of an unbounded blocking call
        so the loop wakes up at least once per second to re-check the
        running flag — this is what makes clean shutdown possible.
        """
        while self.running:
            try:
                packet = self.sb.get_data(timeout=1.0)
                # An empty dict means the timeout fired with no packet.
                # Any non-ECG packet is silently dropped here; extend this
                # filter if you enable additional sensors.
                if packet and packet.get('packet_type') == 'ecg':
                    self.data_buffer.append(packet)
            except Exception as e:
                print(f"Collector error: {e}")
                # FIX: back off briefly so a persistent failure (e.g. the
                # bridge process died) doesn't spin this loop at full speed
                # and flood stdout with error messages.
                time.sleep(0.1)

    def start_collection(self):
        """Spawn the background acquisition thread (no-op if already running)."""
        # FIX: guard against a double start, which would leak a second
        # worker thread appending to the same buffer.
        if self.running:
            return
        self.running = True
        # daemon=True ensures an abnormal main-thread exit cannot leave the
        # process hanging on a non-daemon worker at interpreter shutdown.
        self.thread = threading.Thread(target=self.collect_data, daemon=True)
        self.thread.start()

    def stop_collection(self):
        """Signal the worker to exit, then stop and disconnect the device."""
        # Flip the flag and wait for the worker to finish its current
        # iteration. The 1 s get_data() timeout bounds how long this takes.
        self.running = False
        if self.thread:
            self.thread.join()
            self.thread = None  # allow a clean restart via start_collection()
        # Tear down the BLE link last, so the worker is guaranteed to be
        # done touching the bridge before we shut it down.
        self.sb.stop()
        self.sb.disconnect()
        time.sleep(0.5)  # Let the stop command propagate over BLE.
# ---------------------------------------------------------------------------
# Usage: collect ECG in the background while the main thread reports progress.
# ---------------------------------------------------------------------------
collector = DataCollector()
collector.connect_and_configure()
collector.start_collection()

print("Collecting data in background...")
i = 0
while i < 10:
    # One status line per second. The main thread is free to do anything
    # here — render a plot, run a detector on the latest packet, update a
    # GUI, etc. We just print the running packet count for this example.
    time.sleep(1)
    print(f" t={i + 1:2d}s | {len(collector.data_buffer)} packets")
    i += 1

collector.stop_collection()
print(f"Final count: {len(collector.data_buffer)} packets")
Producer-Consumer Pattern
Use queues for thread-safe data passing.
import threading
import queue
import time
import sifi_bridge_py as sbp
# ---------------------------------------------------------------------------
# State shared by the producer, consumer, and main (dashboard) threads.
# Updating a single existing dict key is atomic under the CPython GIL,
# which is all these two dicts need. Anything richer — e.g. a growing list
# iterated from another thread — would require a threading.Lock.
# ---------------------------------------------------------------------------
counts = dict.fromkeys(('ecg', 'emg', 'imu'), 0)  # per-sensor packet tallies
drops = {'count': 0}                              # packets lost to a full queue
def producer(sb, data_queue, stop_event):
    """Acquisition thread: move packets from the bridge into the queue.

    Runs until *stop_event* is set. A packet that cannot be enqueued
    within 0.5 s (consumer backlog) is counted in the shared ``drops``
    dict rather than printed — printing here would corrupt the live
    dashboard rendered by the main thread.
    """
    while not stop_event.is_set():
        packet = sb.get_data(timeout=0.5)
        if packet:
            try:
                data_queue.put(packet, timeout=0.5)
            except queue.Full:
                # Bounded queue is full: record the drop and move on.
                drops['count'] += 1
def consumer(data_queue, stop_event):
    """Processing thread: drain the queue and tally packets by type.

    Keeps running after shutdown is signalled until the queue is empty,
    so nothing the producer enqueued is lost.
    """
    while True:
        if stop_event.is_set() and data_queue.empty():
            break
        try:
            packet = data_queue.get(timeout=0.5)
        except queue.Empty:
            continue
        kind = packet.get('packet_type', '')
        if kind in counts:
            counts[kind] += 1
        # Simulated per-packet work. In a real app this might be filtering,
        # feature extraction, writing to disk, or pushing to a GUI. The
        # sample values themselves are deliberately untouched — the demo
        # must run even when the device isn't on a body and the signal is
        # garbage. The small delay makes the queue depth visible in the
        # dashboard, which is the whole point of the demo.
        time.sleep(0.005)
        data_queue.task_done()
def render_dashboard(elapsed, qsize, qmax):
    """Render the one-line live status, overwriting in place via \\r."""
    width = 20
    # Queue-depth gauge: filled cells are proportional to occupancy.
    filled = int(width * qsize / qmax) if qmax else 0
    gauge = "█" * filled + "░" * (width - filled)
    status = (f"[{elapsed:5.1f}s] "
              f"queue [{gauge}] {qsize:3d}/{qmax} | "
              f"ECG: {counts['ecg']:5d} pkts | "
              f"EMG: {counts['emg']:5d} pkts | "
              f"IMU: {counts['imu']:5d} pkts | "
              f"drops: {drops['count']}")
    # Pad so a shorter line fully overwrites a longer previous one.
    print(status.ljust(120), end='\r', flush=True)
# ---------------------------------------------------------------------------
# Setup: connect, enable sensors, start streaming, settle, flush.
# ---------------------------------------------------------------------------
sb = sbp.SifiBridge()
sb.connect()
sb.configure_sensors(ecg=True, emg=True, imu=True)
sb.start()
time.sleep(1.0)         # let the front-ends settle
sb.clear_data_buffer()  # drop the startup transient

# ---------------------------------------------------------------------------
# Threading primitives.
#   stop_event - shared shutdown signal.
#   data_queue - bounded handoff. Bounding it gives us natural backpressure:
#                if the consumer ever falls behind, the queue fills up and
#                the producer's put() times out, surfacing as a visible
#                "drops" counter on the dashboard instead of unbounded
#                memory growth or hidden lag.
# ---------------------------------------------------------------------------
QUEUE_MAX = 100
data_queue = queue.Queue(maxsize=QUEUE_MAX)
stop_event = threading.Event()
producer_thread = threading.Thread(
    target=producer, args=(sb, data_queue, stop_event), name="producer")
consumer_thread = threading.Thread(
    target=consumer, args=(data_queue, stop_event), name="consumer")
producer_thread.start()
consumer_thread.start()

# ---------------------------------------------------------------------------
# Main thread: render the live dashboard. This is what makes the pattern's
# value obvious — the main thread has zero contact with BLE I/O, but can
# still observe exactly what's flowing because the producer and consumer
# are running independently. If the dashboard hangs (e.g. while a slow
# render or GUI redraw happens), acquisition does not stop.
# ---------------------------------------------------------------------------
print("Live producer-consumer dashboard (30 s). Counters update as packets flow:\n")
RUN_FOR = 30.0
start = time.monotonic()
while True:
    elapsed = time.monotonic() - start
    if elapsed >= RUN_FOR:
        break
    render_dashboard(elapsed, data_queue.qsize(), QUEUE_MAX)
    time.sleep(0.1)

# ---------------------------------------------------------------------------
# Shutdown: signal first, then join producer before consumer so the consumer
# can drain whatever the producer pushed last.
# NOTE(review): there is a benign race here — after stop_event is set, the
# consumer may observe a momentarily-empty queue and exit just before the
# producer's final put() lands, leaving a few packets unprocessed. Fine for
# a demo; a sentinel ("poison pill") enqueued after producer_thread.join()
# would close it properly.
# ---------------------------------------------------------------------------
stop_event.set()
producer_thread.join()
consumer_thread.join()
sb.stop()
sb.disconnect()
time.sleep(0.5)

# Newline so the final summary doesn't sit on top of the \r-updated line.
print()
print(f"Done. ECG={counts['ecg']} pkts | EMG={counts['emg']} pkts | "
      f"IMU={counts['imu']} pkts | dropped={drops['count']}")
Memory Download Examples
Download Device Memory and Plot It
Download stored data from device memory to CSV files.
import time
import sifi_bridge_py as sbp
import numpy as np
import matplotlib.pyplot as plt
# 1. Initialize SifiBridge.
#    Packets will be received over stdout and pulled into Python via get_data().
sb = sbp.SifiBridge()

# 2. Connect to the first available SiFi Labs device. For this example,
#    it needs to be a BioPoint because we are using on-board memory.
#    connect() returns False if the device couldn't be reached, so we retry
#    in a loop until it succeeds. You can also pass a MAC/UUID instead of a
#    DeviceType to target one specific unit.
print("Connecting...")
while not sb.connect():
    print(" retrying...")
    time.sleep(1)
print("Connected.\n")

# 3. Erase the onboard memory before recording.
#    The BioPoint accumulates data across recording sessions until the flash
#    is wiped. If we don't erase first, the next download will return the
#    new recording AND every old one still on the device. Erasing guarantees
#    the download maps 1-to-1 to what we are about to record.
#
#    The erase command is asynchronous: the device replies with a status
#    packet whose 'status' field is "memory_erased" once it's done.
print("Erasing onboard memory...")
sb.send_command(sbp.DeviceCommand.ERASE_ONBOARD_MEMORY)
erase_start = time.time()
ERASE_TIMEOUT_S = 30
erased = False
while time.time() - erase_start < ERASE_TIMEOUT_S:
    pkt = sb.get_data(timeout=1.0)
    # FIX: check the packet is truthy before inspecting it. get_data() can
    # return a falsy value when the 1 s timeout fires with no packet (the
    # other examples in this page guard with `if not packet: continue`);
    # calling .get() on a None result would raise AttributeError here.
    if pkt and pkt.get('status') == 'memory_erased':
        erased = True
        break
if erased:
    print(f"Memory erased in {time.time() - erase_start:.1f} s.\n")
else:
    print("Warning: did not see erase confirmation; continuing anyway.\n")
# 4. Configure the IMU and switch the device into onboard-recording mode.
#    fs=100          100 Hz sample rate per axis.
#    accel_range=2   ±2 g full-scale — best resolution for everyday motion
#                    (gesture, hand-waving, gentle taps). Use a larger
#                    range if you expect high-impact movement.
#    gyro_range=250  ±250 °/s — typical range for human limb motion.
#
#    set_memory_mode(MemoryMode.DEVICE) tells the BioPoint to log straight
#    to its onboard flash and NOT stream over BLE during acquisition. This
#    is what makes offline recording possible: you can take the device off,
#    walk away from the host, and download the data later.
#    Alternatives:
#      MemoryMode.STREAMING - default, BLE only (no flash log).
#      MemoryMode.BOTH      - flash log AND live BLE stream.
sb.configure_imu(state=True, fs=100, accel_range=2, gyro_range=250)
sb.set_memory_mode(sbp.MemoryMode.DEVICE)

# 5. Record for 60 seconds.
#    Because the device is in MemoryMode.DEVICE, no IMU packets reach the
#    host during this window — they're being written to flash. The host
#    just needs to call start(), wait, and call stop().
RECORD_SEC = 60
print(f"Recording {RECORD_SEC} s of IMU data to onboard flash.")
print("Move the device around so the accelerometer captures something interesting.\n")
sb.start()
for remaining in range(RECORD_SEC, 0, -1):
    # Text countdown bar: fills left-to-right as the recording advances.
    bar_width = 30
    filled = int(bar_width * (RECORD_SEC - remaining) / RECORD_SEC)
    bar = "█" * filled + "░" * (bar_width - filled)
    print(f"\r [{bar}] {remaining:3d} s remaining", end='', flush=True)
    time.sleep(1)
sb.stop()
print("\nRecording complete.\n")

# Give the BLE link a moment to register the stop before starting the
# download — the download command is rejected if the device still thinks
# acquisition is active.
time.sleep(1)
# 6. Download the recording from onboard memory.
#    start_memory_download() does two things:
#      1. Queries the device for how many kilobytes are stored (returned).
#      2. Issues the download command, after which packets begin streaming
#         back over BLE in the same format you would get from live
#         acquisition (one packet per sensor batch, with packet_type set
#         accordingly).
#
#    We loop on get_data() and route IMU packets into per-axis lists. The
#    transfer terminates with a packet whose 'status' is
#    "memory_download_completed" — is_memory_download_completed() is the
#    helper that recognises it. Don't break on packet_type alone: the
#    memory stream also carries non-IMU packets if other sensors were
#    enabled at record time.
kb_to_download = sb.start_memory_download()
print(f"Downloading {kb_to_download} kB from device memory...")
ax_samples, ay_samples, az_samples = [], [], []
packets_received = 0
download_start = time.time()
while True:
    # Blocks until the next packet; falsy results are skipped defensively.
    packet = sb.get_data()
    if not packet:
        continue
    packets_received += 1
    # Check for the completion marker BEFORE routing, so the terminating
    # status packet is never mistaken for data.
    if sb.is_memory_download_completed(packet):
        break
    # IMU packets carry equal-length arrays under the channel keys
    # defined by SensorChannel.IMU: (qw, qx, qy, qz, ax, ay, az). We
    # only keep the three accelerometer axes for plotting.
    if packet.get('packet_type') == 'imu':
        ax_samples.extend(packet['data']['ax'])
        ay_samples.extend(packet['data']['ay'])
        az_samples.extend(packet['data']['az'])
    # Lightweight progress line, refreshed every 100 packets.
    if packets_received % 100 == 0:
        elapsed = time.time() - download_start
        print(f"\r {packets_received} packets received | "
              f"IMU samples so far: {len(ax_samples)} | "
              f"elapsed: {elapsed:.1f} s",
              end='', flush=True)
download_time = time.time() - download_start
print(f"\nDownload complete in {download_time:.1f} s.\n")
# 7. Always disconnect when done so the BLE link is released cleanly
#    and the device is available for the next session.
sb.disconnect()
time.sleep(0.5)

# 8. Quick sanity check on what we recovered.
#    At fs = 100 Hz, a 60 s recording should yield ~6000 samples per axis.
#    A noticeably smaller number means BLE drops or that the device wasn't
#    recording for the full window. Note that writing to memory happens in
#    chunks of a few kB, so the last fraction of a second may not be
#    captured if you stop exactly after X seconds.
n = len(ax_samples)
fs_imu = 100
if n == 0:
    # FIX: guard the empty case. Calling .min()/.max() on an empty numpy
    # array raises ValueError, so a failed download previously crashed the
    # summary instead of reporting the problem.
    print("No IMU samples were recovered from memory; skipping stats and plot.")
else:
    duration_recorded = n / fs_imu
    print(f"Recovered {n} IMU samples per axis (~{duration_recorded:.1f} s at {fs_imu} Hz)")
    print("Per-axis statistics (g):")
    for label, data in [('Ax', ax_samples), ('Ay', ay_samples), ('Az', az_samples)]:
        arr = np.asarray(data)
        print(f" {label}: min={arr.min():+.3f} max={arr.max():+.3f} "
              f"mean={arr.mean():+.3f} std={arr.std():.3f}")
    print()

    # 9. Plot the three accelerometer axes against time.
    #    IMU samples are uniform at fs_imu Hz, so the time axis is just
    #    np.arange(n) / fs_imu. If the device sat still on a desk you should
    #    see ~1 g on whichever axis points up (gravity) and ~0 g on the others.
    t = np.arange(n) / fs_imu
    fig, axes = plt.subplots(3, 1, figsize=(10, 8), sharex=True)
    for ax_plot, samples, label, color in zip(
        axes,
        [ax_samples, ay_samples, az_samples],
        ['Ax', 'Ay', 'Az'],
        ['tab:red', 'tab:green', 'tab:blue'],
    ):
        ax_plot.plot(t, samples, color=color, linewidth=0.8)
        ax_plot.set_ylabel(f"{label} (g)")
        ax_plot.grid(True, alpha=0.3)
    axes[0].set_title("Downloaded onboard accelerometer recording (60 s @ 100 Hz)")
    axes[-1].set_xlabel("Time (s)")
    fig.tight_layout()
    plt.show()
Advanced Examples
Multiple Device Management
Manage multiple devices simultaneously.
import time
import sifi_bridge_py as sbp

# 1. Initialize SifiBridge.
#    A single SifiBridge instance can manage multiple physical devices by
#    creating one named "virtual device" per physical unit and switching
#    between them with select_device(). All commands (connect, configure,
#    start, stop, disconnect) act on whichever virtual device is currently
#    selected.
sb = sbp.SifiBridge()

# 2. Create the two virtual devices.
#    The default bridge already has one device entry, so create_device()
#    adds a second slot. select=True makes the new device active; select=False
#    leaves the previously active device selected.
sb.create_device("device1", select=True)
sb.create_device("device2", select=False)

# Placeholder addresses — replace with the real MAC (Linux/Windows) or
# UUID (macOS) of each physical unit before running.
device1_addr = "XX:XX:XX:XX:XX:01"
device2_addr = "XX:XX:XX:XX:XX:02"

# 3. Bring up device 1: select it, connect, configure ECG, start streaming.
sb.select_device("device1")
sb.connect(device1_addr)
sb.configure_ecg(state=True, fs=500, mains_notch=50,
                 bandpass=True, flo=0, fhi=30)
sb.start()

# 4. Bring up device 2 the same way. Note that device 1 keeps streaming
#    in the background — the bridge multiplexes packets from every connected
#    device onto the same stdout queue.
sb.select_device("device2")
sb.connect(device2_addr)
sb.configure_emg(state=True, fs=2000, mains_notch=50,
                 bandpass=True, flo=20, fhi=450)
sb.start()

# Settle both devices, then drop everything that piled up during startup.
# clear_data_buffer() drains the bridge's single shared queue, so one
# call covers packets from every connected virtual device.
time.sleep(1.0)
sb.clear_data_buffer()

# 5. Collect packets from BOTH devices.
#    Important: get_ecg() / get_emg() filter by packet_type and would skip
#    packets from the other sensor type. With multiple devices on different
#    sensors we use get_data() and dispatch ourselves on packet['id'] (which
#    carries the virtual-device name) and packet['packet_type'].
print("Collecting 50 sensor packets from both devices...\n")
collected = 0
while collected < 50:
    packet = sb.get_data(timeout=1.0)
    if not packet:
        continue
    ptype = packet.get('packet_type')
    if ptype not in ('ecg', 'emg'):
        continue  # status / config / other bookkeeping packets
    src = packet.get('id', '?')
    # Show only the first three samples per packet to keep output readable.
    samples = packet['data'][ptype][:3]
    print(f" [{src:8s}] {ptype.upper()}: {samples}")
    collected += 1

# 6. Cleanup. stop() and disconnect() act on the active virtual device,
#    so each device must be selected in turn before tearing it down.
sb.select_device("device1")
sb.stop()
sb.disconnect()
sb.select_device("device2")
sb.stop()
sb.disconnect()
time.sleep(0.5)  # let the final BLE stop/disconnect commands land
Live Data Plot
Stream data and visualize the signal in real-time
import collections
import threading
import time
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import sifi_bridge_py as sbp
# ---------------------------------------------------------------------------
# Live plot configuration.
#
#   IMU_FS          IMU sampling rate in Hz. Must match configure_imu(fs=...)
#                   so the time axis is correct.
#   WINDOW_SEC      How many seconds of history to keep on screen. The plot
#                   scrolls left as new samples arrive.
#   FRAME_INTERVAL  How often (in ms) matplotlib redraws. BLE data arrives
#                   far faster than this — there is no need to redraw at
#                   the BLE packet rate.
# ---------------------------------------------------------------------------
IMU_FS = 100
WINDOW_SEC = 10
FRAME_INTERVAL = 50  # ms

# ---------------------------------------------------------------------------
# State shared between the acquisition thread (receiving BLE packets) and
# matplotlib's main loop (rendering).
#
# The bounded deque discards its oldest entry automatically once full, so
# the buffer always holds at most WINDOW_SEC seconds of history with no
# manual trimming. Appends and iteration are atomic enough under the GIL
# for a tutorial-grade plot.
# ---------------------------------------------------------------------------
norm_buffer = collections.deque(maxlen=WINDOW_SEC * IMU_FS)
stop_event = threading.Event()
def acquisition_loop(sb):
    """Background worker: stream IMU packets and append per-sample |a|."""
    while not stop_event.is_set():
        pkt = sb.get_data(timeout=0.5)
        if not pkt or pkt.get('packet_type') != 'imu':
            continue
        # Every IMU channel arrives as an equal-length array under
        # data['ax' / 'ay' / 'az']; walk them in lockstep and push the
        # accelerometer magnitude for each sample into the shared buffer.
        chans = pkt['data']
        for x, y, z in zip(chans['ax'], chans['ay'], chans['az']):
            magnitude = (x * x + y * y + z * z) ** 0.5
            norm_buffer.append(magnitude)
# ---------------------------------------------------------------------------
# Connect, configure, start streaming.
# ---------------------------------------------------------------------------
sb = sbp.SifiBridge()
sb.connect()

# Restrict streaming to the IMU only so the BLE link isn't shared with
# sensors we don't plot. configure_sensors() is the on/off switch;
# configure_imu() then sets the IMU-specific parameters. accel_range=2
# (±2 g) gives the best resolution for everyday motion.
sb.configure_sensors(imu=True)
sb.configure_imu(state=True, fs=IMU_FS, accel_range=2, gyro_range=250)
sb.start()
time.sleep(1.0)         # let the front-end settle
sb.clear_data_buffer()  # drop the startup transient

# Spawn the acquisition thread. It runs until stop_event is set in the
# cleanup block below.
acq_thread = threading.Thread(target=acquisition_loop, args=(sb,), name="imu-acq")
acq_thread.start()

# ---------------------------------------------------------------------------
# Matplotlib live plot. FuncAnimation calls update() every FRAME_INTERVAL
# ms; update() snapshots the buffer and redraws the line. The window
# scrolls left as new samples arrive because xlim is anchored to the
# right edge of the buffer.
# ---------------------------------------------------------------------------
fig, ax = plt.subplots(figsize=(10, 4))
line, = ax.plot([], [], color='tab:purple', linewidth=0.9)
ax.set_xlim(0, WINDOW_SEC)
ax.set_ylim(0, 3)  # initial Y range; update() grows it if motion exceeds 3
ax.set_xlabel("Time (s, trailing window)")
ax.set_ylabel("|a|")
ax.set_title(f"Live IMU acceleration norm — last {WINDOW_SEC} s")
ax.grid(True, alpha=0.3)
def update(_frame):
    """FuncAnimation callback: redraw the trailing-window norm trace."""
    count = len(norm_buffer)
    if not count:
        return (line,)
    # Snapshot the deque so the draw sees a stable view even while the
    # acquisition thread keeps appending behind us.
    values = list(norm_buffer)
    # Anchor the newest sample at x = WINDOW_SEC so the trace scrolls left.
    times = [WINDOW_SEC + (idx - (count - 1)) / IMU_FS for idx in range(count)]
    line.set_data(times, values)
    # Grow the Y range if motion exceeds the current limits.
    peak = max(values)
    if peak > ax.get_ylim()[1]:
        ax.set_ylim(0, peak * 1.1)
    return (line,)
# Keep a reference to the animation object — without it, the animation can
# be garbage-collected and the plot freezes.
# NOTE(review): with blit=True, the ylim rescale inside update() may leave
# stale tick labels until a full redraw (e.g. a window resize) — TODO confirm
# whether that matters for this tutorial.
anim = animation.FuncAnimation(
    fig, update, interval=FRAME_INTERVAL, blit=True, cache_frame_data=False)

print("Live IMU norm plot. Close the window to stop.")
try:
    plt.show()  # blocks until the window is closed
finally:
    # Clean shutdown: tell the acquisition thread to exit, then tear
    # down the BLE link.
    stop_event.set()
    acq_thread.join(timeout=2.0)
    sb.stop()
    sb.disconnect()
    time.sleep(0.5)
    print("Disconnected.")
Generate Event Trigger and Haptic Feedback
Shows how to control the motor inside the SiFi devices to generate vibrotactile feedback. This example also demonstrates how to make the device generate a timestamped event. These events allow you to precisely label experimental milestones, such as trial starts, directly within your synchronized sensor data stream. They can also be used to synchronize events across different devices (virtual trigger).
import time
from collections import deque
import matplotlib.pyplot as plt
import sifi_bridge_py as sbp
# IMU sampling rate. Must match configure_imu(fs=...).
IMU_FS = 100
WINDOW_SEC = 10         # full session length & live-window length
PLOT_REDRAW_SEC = 0.05  # 20 Hz plot redraws

# 1. Initialize, connect, configure IMU, start streaming.
#    IMU works the same way on BioPoint and SiFi Band — same packet_type,
#    same channel keys (qw, qx, qy, qz, ax, ay, az) — so this example runs
#    unchanged on either device.
sb = sbp.SifiBridge()
sb.connect()
sb.configure_sensors(imu=True)
sb.configure_imu(state=True, fs=IMU_FS, accel_range=2, gyro_range=250)
sb.start()
time.sleep(1.0)         # let the front-end settle
sb.clear_data_buffer()  # drop the startup transient

# 2. Set up the live plot (interactive mode so the main loop can redraw).
plt.ion()
fig, ax = plt.subplots(figsize=(11, 4.5))
norm_line, = ax.plot([], [], color='tab:purple', linewidth=0.9)
ax.set_xlim(0, WINDOW_SEC)
ax.set_ylim(0, 3)  # initial Y range; grown below if motion exceeds 3
ax.set_xlabel("Time (s)")
ax.set_ylabel("|a| (unitless)")
ax.set_title("IMU acceleration norm with motor sweep and device events")
ax.grid(True, alpha=0.3)
fig.tight_layout()

# Buffer for the entire 10 s session — we plot the whole thing live and
# leave the final view up at the end. Entries are (device_timestamp, |a|).
norm_buf = deque()
event_times = []    # device-time of each received event packet
event_artists = []  # matplotlib markers for events (cleared and redrawn each frame)
# 3. Session timeline (10 s total) — four motor transitions, each paired
#    with a device event:
#      t = 2 s   start motor at intensity 5   (event 1)
#      t = 4 s   set intensity to 1           (event 2)
#      t = 6 s   set intensity to 10          (event 3)
#      t = 8 s   stop motor                   (event 4)
#    Before 2 s and after 8 s the IMU records baseline.
DURATION = 10.0
MOTOR_START = 2.0
MOTOR_LOW = 4.0
MOTOR_HIGH = 6.0
MOTOR_STOP = 8.0

# One latch per transition so each command/event pair fires exactly once.
motor_started = went_low = went_high = motor_stopped = False

last_plot_redraw = -1.0  # negative sentinel forces an immediate first redraw
t0 = time.time()         # wall-clock reference for the session timeline
print(f"Recording {DURATION:.0f} s of IMU with motor steps + events ...")
while True:
    # Elapsed host time drives the motor schedule; device timestamps drive
    # the plot's X axis.
    t = time.time() - t0
    if t > DURATION:
        break

    # 3a. Step through the four motor states. Each transition fires a
    #     device event, which comes back through the data stream as a packet
    #     with packet_type='event' and a device-side timestamp.
    if not motor_started and t >= MOTOR_START:
        sb.set_motor_intensity(5)
        sb.send_command(sbp.DeviceCommand.START_MOTOR)
        sb.send_event()  # event 1: motor on @ 5
        motor_started = True
    if motor_started and not went_low and t >= MOTOR_LOW:
        sb.set_motor_intensity(1)
        sb.send_event()  # event 2: intensity → 1
        went_low = True
    if went_low and not went_high and t >= MOTOR_HIGH:
        sb.set_motor_intensity(10)
        sb.send_event()  # event 3: intensity → 10
        went_high = True
    if went_high and not motor_stopped and t >= MOTOR_STOP:
        sb.send_command(sbp.DeviceCommand.STOP_MOTOR)
        sb.send_event()  # event 4: motor off
        motor_stopped = True

    # 3b. Pull the next queued packet, if any (at most one per loop pass —
    #     the loop iterates fast enough to keep up). IMU packets and event
    #     packets share the same stream; we route on packet_type.
    packet = sb.get_data(timeout=0.0)
    if packet:
        ptype = packet.get('packet_type')
        if ptype == 'imu':
            # IMU packets carry per-sample timestamps under 'timestamps'
            # and equal-length arrays for every channel under data. We
            # compute the magnitude of the accelerometer vector at each
            # sample. The result is unitless on purpose — a live sanity
            # check of motion, not a calibrated measurement.
            ts = packet.get('timestamps', [])
            ax_s = packet['data'].get('ax', [])
            ay_s = packet['data'].get('ay', [])
            az_s = packet['data'].get('az', [])
            for s_t, x, y, z in zip(ts, ax_s, ay_s, az_s):
                norm_buf.append((s_t, (x * x + y * y + z * z) ** 0.5))
        elif ptype == 'event':
            # Example event packet shape:
            #   {'packet_type': 'event',
            #    'timestamps': [3.105],
            #    'data': {'event': [2.0]}, ...}
            # 'timestamps[0]' is the device-time at which the event fired.
            for evt_t in packet.get('timestamps', []):
                event_times.append(evt_t)
                print(f" event received at device-time "
                      f"{packet['timestamps']} (marker "
                      f"{packet['data'].get('event')})")

    # 3c. Redraw the plot at PLOT_REDRAW_SEC granularity. The X axis is
    #     anchored at the IMU stream's t=0 (first sample's timestamp), so
    #     the trace simply grows from left to right over the full session.
    if norm_buf and (t - last_plot_redraw >= PLOT_REDRAW_SEC):
        last_plot_redraw = t
        snapshot = list(norm_buf)
        ts_abs = [a for a, _ in snapshot]
        vals = [v for _, v in snapshot]
        x0 = ts_abs[0]
        x = [a - x0 for a in ts_abs]
        norm_line.set_data(x, vals)
        # Grow the Y range if motion exceeds the current limits.
        smax = max(vals)
        if smax > ax.get_ylim()[1]:
            ax.set_ylim(0, smax * 1.1)
        # Event markers are cheap to rebuild, so clear and redraw them all.
        for art in event_artists:
            art.remove()
        event_artists.clear()
        y_top = ax.get_ylim()[1]
        for i, evt_t in enumerate(event_times):
            # Convert device-time to plot coordinates via the same anchor.
            x_pos = evt_t - x0
            if 0 <= x_pos <= WINDOW_SEC:
                event_artists.append(
                    ax.axvline(x_pos, color='tab:red',
                               linewidth=1.4, alpha=0.85))
                event_artists.append(
                    ax.text(x_pos, y_top * 0.92, f" event {i + 1}",
                            rotation=90, fontsize=9,
                            va='top', color='tab:red'))
        fig.canvas.draw_idle()

    # 3d. Yield to the GUI so the window stays responsive.
    plt.pause(0.001)

# 4. Cleanup. Stop the motor defensively in case the loop exited early.
sb.send_command(sbp.DeviceCommand.STOP_MOTOR)
sb.stop()
sb.disconnect()
time.sleep(0.5)
print(f"\nDone. {len(norm_buf)} IMU samples collected, "
      f"{len(event_times)} events received.")

# Keep the full final view up so the whole 10 s session is visible.
plt.ioff()
plt.show()
Lab Streaming Layer (LSL) Integration
Stream data via LSL for use with other applications.
import time

import sifi_bridge_py as sbp

# Enabling use_lsl makes the underlying sifibridge process publish one
# Lab Streaming Layer outlet per active sensor, in addition to its usual
# stdout stream, so any LSL-aware consumer (LabRecorder, OpenViBE,
# custom pylsl scripts, ...) can subscribe without extra Python code.
sb = sbp.SifiBridge(use_lsl=True)

# Grab the first SiFi device the bridge can see.
sb.connect()

# The sensors enabled here determine exactly which LSL outlets appear:
# one outlet per enabled sensor family.
sb.configure_sensors(ecg=True, emg=True, imu=True)

# After start() the bridge streams to LSL entirely on its own; our only
# remaining job is to keep this process alive.
sb.start()

# Brief settle so the first samples on the LSL side aren't startup
# transients.
time.sleep(1.0)

print("Streaming to LSL.")
print("Open an LSL viewer (e.g. LabRecorder, the BSL/LSL viewer, or a")
print("pylsl script) to see one outlet per enabled sensor.")
print("For the exact outlet names exposed by this build of sifibridge,")
print("run `sifibridge` in a terminal and type `help lsl` at its prompt.")
print("Press Ctrl+C to stop.\n")

# Hold the process open with a running clock. The finally block
# guarantees the device is stopped and released on Ctrl+C OR on any
# unexpected error — otherwise it could be left acquiring with no host.
t0 = time.monotonic()
try:
    while True:
        print(f"\rStreaming to LSL... t = {time.monotonic() - t0:6.1f} s",
              end='', flush=True)
        time.sleep(1)
except KeyboardInterrupt:
    print("\nCtrl+C received, stopping...")
finally:
    sb.stop()
    sb.disconnect()
    time.sleep(0.5)  # let the BLE stop/disconnect commands land
    print("Disconnected.")
Network Streaming
Stream data over TCP for remote monitoring.
import json
import socket
import threading
import time
import sifi_bridge_py as sbp
import numpy as np
import matplotlib.pyplot as plt
# ---------------------------------------------------------------------------
# Network configuration.
#
# `--tcp-out <host>:<port>` makes the bridge act as a TCP CLIENT that
# connects out to the address you give it and pushes sensor data there.
# Our Python script must therefore run a TCP SERVER first, and the bridge
# will connect to it. We bind to localhost to keep the publisher
# loopback-only (no firewall prompt). Switch to "0.0.0.0" if you want a
# subscriber on another machine on the same network to be able to connect.
# ---------------------------------------------------------------------------
HOST = "127.0.0.1"  # loopback-only; switch to "0.0.0.0" to allow remote subscribers
PORT = 9001  # unprivileged port; must match the publisher URL passed to SifiBridge
RUN_FOR_SEC = 30  # total acquisition time in seconds
# ---------------------------------------------------------------------------
# Shared state between the TCP server thread and the main thread. The
# server thread parses incoming packets and updates the counters and the
# per-channel sample buffers; the main thread reads them to render the
# live dashboard, then plots them after acquisition is done.
#
# PPG packets carry four optical channels under the keys 'ir', 'r', 'g',
# 'b' (infrared, red, green, blue). All four arrays in a given packet
# have the same length, so they share a common time axis.
# ---------------------------------------------------------------------------
# Set by the main thread to tell the server thread to shut down.
stop_event = threading.Event()
# Dashboard counters written by the server thread, read by the main thread.
stats = {'packets': 0, 'ppg_samples': 0, 'last_packet_type': None}
# Raw samples per optical channel, appended to by the server thread only.
ppg_buffer = {'ir': [], 'r': [], 'g': [], 'b': []}
# Guards ppg_buffer so the four channel lists stay mutually consistent.
ppg_buffer_lock = threading.Lock()
def tcp_server():
    """Accept the bridge as a TCP client and parse its JSON stream.

    Runs in a background thread. Waits for the bridge to dial in, then
    decodes its back-to-back JSON packet stream, updating the module-level
    `stats` counters and appending PPG samples to `ppg_buffer` (under
    `ppg_buffer_lock`) until `stop_event` is set or the bridge closes the
    connection.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client = None
    try:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((HOST, PORT))
        server.listen(1)
        # Short accept timeout so we can poll stop_event while waiting.
        server.settimeout(0.5)
        print(f"Listening for the bridge on tcp://{HOST}:{PORT} ...")
        while not stop_event.is_set():
            try:
                client, addr = server.accept()
                print(f"Bridge connected from {addr}\n")
                break
            except socket.timeout:
                continue
    finally:
        # Fix: close the listening socket on EVERY exit path (including
        # an unexpected exception during bind/accept), not just the happy
        # path, so the port is always released.
        server.close()
    if client is None:
        return
    # Framing rule for the TCP feed: the bridge writes JSON objects
    # back-to-back with NO delimiter — no newline, no length prefix — so
    # we can't split on '\n'. json.JSONDecoder.raw_decode reads the FIRST
    # complete JSON object out of a string and returns the index where it
    # ended; we slice past it and retry on the remainder. A partial object
    # raises JSONDecodeError, meaning we must wait for more bytes. Any
    # custom subscriber needs to implement this same rule.
    decoder = json.JSONDecoder()
    client.settimeout(0.5)
    buf = ""
    try:
        while not stop_event.is_set():
            try:
                chunk = client.recv(4096)
            except socket.timeout:
                continue
            if not chunk:
                break  # bridge closed the connection
            buf += chunk.decode('utf-8', errors='replace')
            buf = buf.lstrip()
            while buf:
                try:
                    pkt, end = decoder.raw_decode(buf)
                except json.JSONDecodeError:
                    # Partial object — wait for more bytes.
                    break
                buf = buf[end:].lstrip()
                stats['packets'] += 1
                stats['last_packet_type'] = pkt.get('packet_type')
                if pkt.get('packet_type') == 'ppg':
                    data = pkt.get('data', {})
                    # The lock keeps the four channel lists (and the
                    # sample counter derived from them) consistent
                    # against the reader in the main thread.
                    with ppg_buffer_lock:
                        for ch in ('ir', 'r', 'g', 'b'):
                            ppg_buffer[ch].extend(data.get(ch, []))
                        # All four channels carry the same number of
                        # samples per packet — count via 'ir'.
                        stats['ppg_samples'] += len(data.get('ir', []))
    finally:
        client.close()
# ---------------------------------------------------------------------------
# The bridge is a TCP *client*: at startup it dials out to the address in
# --tcp-out and exits with a connection-refused error if nothing is
# listening. So the server thread must be up before SifiBridge is built.
# ---------------------------------------------------------------------------
server_thread = threading.Thread(target=tcp_server, name="tcp-server")
server_thread.start()
time.sleep(0.5)  # give the OS a moment to actually bind the port

# 1. The "tcp://<host>:<port>" publisher string is parsed into the
#    bridge's --tcp-out flag, which pushes every sensor packet over the
#    socket as JSON.
sb = sbp.SifiBridge(publishers=f"tcp://{HOST}:{PORT}")

# 2. Connect to a SiFi device.
sb.connect()

# 3. PPG only. configure_sensors() is the on/off switch — anything not
#    listed as True is disabled, which keeps non-'ppg' packet_types off
#    the TCP feed. configure_ppg() then sets the sampling rate, the
#    per-LED currents (mA) and the photodiode sensitivity; higher values
#    mean more signal but more saturation risk. Enable first, configure
#    second — order matters.
PPG_FS = 100
sb.configure_sensors(ppg=True)
sb.configure_ppg(state=True, fs=PPG_FS, ir=7, red=7, green=9, blue=9,
                 sens=sbp.PpgSensitivity.MEDIUM)

# 4. Start streaming, then let the analog front-end and LEDs stabilize so
#    the first packets on the feed aren't startup transients.
sb.start()
time.sleep(1.0)

print(f"Streaming PPG over TCP for {RUN_FOR_SEC} s. The in-script server")
print("is reading the feed for live confirmation.")
print("If the device is on skin, expect the PPG channels to show a")
print("clear pulsatile waveform after a few seconds.\n")

# 5. Live dashboard. The server thread fills `stats`; here we only read
#    and print. Counters ticking upward are the end-to-end proof:
#    device -> BLE -> bridge -> TCP socket -> parser -> Python.
t_begin = time.monotonic()
try:
    while True:
        t_run = time.monotonic() - t_begin
        if t_run >= RUN_FOR_SEC:
            break
        pkt_rate = stats['packets'] / t_run if t_run > 0 else 0.0
        status = (f"\rt = {t_run:6.1f} s | "
                  f"TCP packets: {stats['packets']:6d} "
                  f"({pkt_rate:5.1f}/s) | "
                  f"PPG samples: {stats['ppg_samples']:7d} | ")
        print(status, end='', flush=True)
        time.sleep(0.2)
    print(f"\n{RUN_FOR_SEC} s elapsed, stopping...")
except KeyboardInterrupt:
    print("\nCtrl+C received, stopping...")
finally:
    # 6. Teardown order: signal the server thread first, then stop
    #    acquisition and release the device.
    stop_event.set()
    sb.stop()
    sb.disconnect()
    server_thread.join(timeout=2.0)
    time.sleep(0.5)  # let the BLE stop/disconnect commands land
    print("Disconnected.")
    print(f"Final totals: {stats['packets']} packets, "
          f"{stats['ppg_samples']} PPG samples received over TCP.\n")

# 7. Offline plot of the four collected PPG channels, one subplot per
#    wavelength, each drawn in its matching colour.
with ppg_buffer_lock:
    snapshot = {ch: list(samples) for ch, samples in ppg_buffer.items()}

n = len(snapshot['ir'])
if n == 0:
    print("No PPG samples were received — nothing to plot.")
else:
    time_axis = np.arange(n) / PPG_FS
    wavelengths = (
        ('ir', 'Infrared', 'tab:gray'),
        ('r', 'Red', 'tab:red'),
        ('g', 'Green', 'tab:green'),
        ('b', 'Blue', 'tab:blue'),
    )
    fig, axes = plt.subplots(4, 1, figsize=(10, 9), sharex=True)
    for panel, (key, label, color) in zip(axes, wavelengths):
        panel.plot(time_axis, snapshot[key], color=color, linewidth=0.8)
        panel.set_ylabel(f"{label}\n(raw)")
        panel.grid(True, alpha=0.3)
    axes[0].set_title(
        f"PPG received over TCP ({n} samples per channel @ {PPG_FS} Hz)")
    axes[-1].set_xlabel("Time (s)")
    fig.tight_layout()
    plt.show()
Complete Application Example
Full-Featured IMU and EMG Recorder
A complete application with error handling, configuration, and data export.
import json
import logging
import time
from pathlib import Path
import sifi_bridge_py as sbp
class BiosignalRecorder:
    """Record IMU and EMG data from a SiFi BioPoint.

    Two sensors at very different sampling rates run in parallel:

    - IMU at 100 Hz: motion (quaternion + 3-axis accelerometer)
    - EMG at 2000 Hz: surface electromyography

    They arrive in separate packets, each with their own packet_type and
    per-sample timestamps array, so we route them by packet_type and
    store them independently. Saved files keep both streams with their
    own timestamps; downstream code can resample as needed.

    NOTE — using a SiFi Band instead of a BioPoint:
        The SiFi Band exposes an 8-channel EMG armband. Its packets come
        in with packet_type == 'emg_armband' and carry eight channels
        under the keys 'emg0' .. 'emg7' instead of the single 'emg' key
        used by BioPoint. To support the Band, replace the
        EMG_PACKET_TYPE / EMG_CHANNELS constants below with:

            EMG_PACKET_TYPE = 'emg_armband'
            EMG_CHANNELS = ('emg0', 'emg1', 'emg2', 'emg3',
                            'emg4', 'emg5', 'emg6', 'emg7')

        and adapt the dispatch and save logic accordingly.
    """

    # IMU packets carry seven channels under these keys (see
    # sbp.SensorChannel.IMU): a unit quaternion (qw, qx, qy, qz) plus the
    # 3-axis accelerometer (ax, ay, az). All seven arrays in a given
    # packet share the same length and the same timestamps array.
    IMU_CHANNELS = ('qw', 'qx', 'qy', 'qz', 'ax', 'ay', 'az')
    # BioPoint EMG: single channel.
    EMG_PACKET_TYPE = 'emg'
    EMG_CHANNELS = ('emg',)

    def __init__(self, output_dir="./recordings"):
        """Create the recorder and make sure `output_dir` exists.

        Parameters
        ----------
        output_dir : str or Path
            Directory where save() writes its JSON files; created
            (including parents) if missing.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True, parents=True)
        self.sb: sbp.SifiBridge | None = None
        self.recording = False
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s [%(levelname)s] %(message)s",
            datefmt="%H:%M:%S",
        )
        self.logger = logging.getLogger("BiosignalRecorder")

    # ------------------------------------------------------------------
    # Connection
    # ------------------------------------------------------------------
    def connect(self, max_retries=5):
        """Connect to the first available SiFi device.

        Calling sb.connect() with no argument tells the bridge to grab
        whichever SiFi device it sees first — useful when only one device
        is in range. Pass a DeviceType (e.g. BIOPOINT_V1_3) or a MAC/UUID
        string to target a specific unit.

        Returns
        -------
        bool
            True on success, False once all attempts are exhausted.
        """
        self.sb = sbp.SifiBridge()
        for attempt in range(1, max_retries + 1):
            try:
                if self.sb.connect():
                    self.logger.info("Connected on attempt %d", attempt)
                    return True
                self.logger.warning(
                    "Connection attempt %d/%d failed", attempt, max_retries)
            except Exception as e:
                self.logger.error("Connection error on attempt %d: %s",
                                  attempt, e)
            # Fix: only back off between attempts. The original slept 2 s
            # after the FINAL failed attempt too, delaying the failure
            # return for no benefit.
            if attempt < max_retries:
                time.sleep(2)
        return False

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------
    def configure(self, imu_fs=100, accel_range=2, gyro_range=250,
                  emg_fs=2000, emg_mains_notch=50):
        """Enable only the IMU and EMG sensors and set their parameters.

        configure_sensors() is the on/off switch for each sensor family.
        Anything not listed as True gets disabled, which keeps the BLE
        link from being saturated by sensors we don't care about.

        Parameters
        ----------
        imu_fs : int
            IMU sampling rate in Hz.
        accel_range : int
            Accelerometer full-scale range in g; ±2 g maximises
            resolution for everyday motion.
        gyro_range : int
            Gyroscope full-scale range in °/s; ±250 covers normal limb
            motion.
        emg_fs : int
            EMG sampling rate in Hz; 2000 Hz suits the standard
            20–450 Hz surface-EMG band.
        emg_mains_notch : int
            Power-line notch frequency: 50 for Europe/Asia, 60 for North
            America.

        Raises
        ------
        RuntimeError
            If called before connect().
        """
        if self.sb is None:
            raise RuntimeError("Connect before configuring")
        self.sb.configure_sensors(imu=True, emg=True)
        self.sb.configure_imu(
            state=True,
            fs=imu_fs,
            accel_range=accel_range,
            gyro_range=gyro_range,
        )
        self.sb.configure_emg(
            state=True,
            fs=emg_fs,
            mains_notch=emg_mains_notch,
            bandpass=True,
            flo=20,
            fhi=450,
        )
        self.logger.info(
            "Configured IMU at %d Hz (accel ±%d g, gyro ±%d °/s) and "
            "EMG at %d Hz (20–450 Hz bandpass, %d Hz mains notch).",
            imu_fs, accel_range, gyro_range, emg_fs, emg_mains_notch)

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------
    def record(self, duration_sec=30):
        """Stream IMU + EMG for `duration_sec` and return both buffers.

        Packets from the two sensors are interleaved on the BLE link, so
        we read them with sb.get_data() and dispatch on packet_type.
        Using sb.get_imu() / sb.get_emg() in alternation would also work
        but blocks waiting for one specific type, which can delay the
        other when sample rates differ as much as 100 Hz vs 2000 Hz.

        Returns
        -------
        tuple[dict, dict]
            (imu, emg): per-channel sample lists plus a shared
            'timestamps' list for each stream.

        Raises
        ------
        RuntimeError
            If called before connect().
        """
        if self.sb is None:
            raise RuntimeError("Not connected")
        # Per-channel buffers plus a shared per-sensor timestamps list.
        imu = {ch: [] for ch in self.IMU_CHANNELS}
        imu['timestamps'] = []
        emg = {ch: [] for ch in self.EMG_CHANNELS}
        emg['timestamps'] = []
        self.sb.start()
        self.recording = True
        # Let the stream settle for a second, then drop any packets that
        # piled up between start() and now so the buffers begin on clean
        # data.
        time.sleep(1.0)
        self.sb.clear_data_buffer()
        self.logger.info("Recording for %d s ...", duration_sec)
        last_log = 0
        start = time.time()
        try:
            while time.time() - start < duration_sec:
                packet = self.sb.get_data(timeout=1.0)
                if not packet:
                    continue
                ptype = packet.get('packet_type')
                data = packet.get('data', {})
                timestamps = packet.get('timestamps', [])
                if ptype == 'imu':
                    # Each IMU packet carries equal-length arrays for
                    # every channel; extend the buffers in lockstep.
                    for ch in self.IMU_CHANNELS:
                        imu[ch].extend(data.get(ch, []))
                    imu['timestamps'].extend(timestamps)
                elif ptype == self.EMG_PACKET_TYPE:
                    for ch in self.EMG_CHANNELS:
                        emg[ch].extend(data.get(ch, []))
                    emg['timestamps'].extend(timestamps)
                # Progress update once per second.
                elapsed = time.time() - start
                if int(elapsed) > last_log:
                    last_log = int(elapsed)
                    self.logger.info(
                        " t=%2d s | IMU samples: %d | EMG samples: %d",
                        last_log,
                        len(imu['timestamps']),
                        len(emg['timestamps']))
        except KeyboardInterrupt:
            self.logger.warning("Recording interrupted by user")
        finally:
            self.sb.stop()
            self.recording = False
        self.logger.info(
            "Recording complete: %d IMU samples, %d EMG samples",
            len(imu['timestamps']), len(emg['timestamps']))
        return imu, emg

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    def save(self, imu, emg, filename=None):
        """Save both streams to a single JSON file under output_dir.

        Each stream keeps its own timestamps, since IMU and EMG run at
        very different rates (100 Hz vs 2000 Hz) and aren't sampled in
        lockstep. Anyone re-loading the file can resample / interpolate
        as needed.

        Parameters
        ----------
        imu, emg : dict
            Buffers as returned by record().
        filename : str, optional
            File name inside output_dir; defaults to a timestamped name.

        Returns
        -------
        Path
            Full path of the written file.
        """
        if filename is None:
            filename = f"biosignal_{int(time.time())}.json"
        filepath = self.output_dir / filename
        payload = {
            'imu': {
                'channels': list(self.IMU_CHANNELS),
                'data': {ch: imu[ch] for ch in self.IMU_CHANNELS},
                'timestamps': imu['timestamps'],
                'sample_count': len(imu['timestamps']),
            },
            'emg': {
                'packet_type': self.EMG_PACKET_TYPE,
                'channels': list(self.EMG_CHANNELS),
                'data': {ch: emg[ch] for ch in self.EMG_CHANNELS},
                'timestamps': emg['timestamps'],
                'sample_count': len(emg['timestamps']),
            },
        }
        with open(filepath, 'w') as f:
            json.dump(payload, f)
        self.logger.info("Saved to %s", filepath)
        return filepath

    # ------------------------------------------------------------------
    # Cleanup
    # ------------------------------------------------------------------
    def disconnect(self):
        """Stop streaming (if needed) and release the BLE link."""
        if self.sb is None:
            return
        if self.recording:
            self.sb.stop()
        self.sb.disconnect()
        time.sleep(0.5)  # let the BLE stop/disconnect commands land
        self.logger.info("Disconnected")
# ---------------------------------------------------------------------------
# Usage
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    rec = BiosignalRecorder(output_dir="./my_recordings")
    try:
        if not rec.connect():
            print("Failed to connect to a SiFi device.")
            raise SystemExit(1)
        rec.configure(imu_fs=100, accel_range=2, gyro_range=250,
                      emg_fs=2000, emg_mains_notch=50)
        imu, emg = rec.record(duration_sec=30)
        rec.save(imu, emg)
        # Console sanity summary so the run is self-explanatory without
        # re-loading the saved file.
        if imu['timestamps']:
            accel_x = imu['ax']
            print(f"\nIMU (ax) range: {min(accel_x):+.3f} g "
                  f"to {max(accel_x):+.3f} g")
        if emg['timestamps']:
            ch = BiosignalRecorder.EMG_CHANNELS[0]
            peak = max(abs(s) for s in emg[ch])
            print(f"EMG peak amplitude on '{ch}': {peak:.4f}")
    finally:
        # Always release the device, even if connect/record raised.
        rec.disconnect()
Computing Heart Rate from PPG signal
Example on how to compute a rolling heart rate from the PPG (using the green channel) in real-time. This example makes use of NeuroKit2 (https://neurokit2.readthedocs.io) for peak detection.
"""
Live heart rate from PPG.
This example streams the BioPoint's green PPG channel, runs a sliding
8-second analysis window through NeuroKit2 (cleaning + peak detection),
and plots the cleaned waveform together with a running heart-rate
estimate. The HR shown is the mean rate across the current window, with
a shaded band representing one standard deviation around that mean —
a quick visual cue for how steady the rate is over the window.
Requirements (in addition to sifi_bridge_py):
pip install numpy matplotlib neurokit2
NeuroKit2 documentation: https://neurokit2.readthedocs.io
"""
import time
from collections import deque
import matplotlib.pyplot as plt
import numpy as np
import neurokit2 as nk
import sifi_bridge_py as sbp
# PPG sampling rate. Must match configure_ppg(fs=...) — both the buffer
# size and the X axis are derived from this constant, so changing it
# here keeps the rest of the script consistent.
PPG_FS: int = 100
# Length of the analysis window. 8 seconds contains 6–15 heartbeats at
# normal rates, which is enough for a stable HR estimate while still
# reacting to changes within a few seconds.
WINDOW_SEC: int = 8
DURATION_SEC: int = 60  # how long the demo runs in total
HR_UPDATE_SEC: float = 1.0  # interval between HR computations
PLOT_REDRAW_SEC: float = 0.1  # 10 Hz plot redraw
# Minimum half-range of the HR axis around the current heart rate. The
# axis widens automatically when the ±1 SD band is larger than this so
# nothing ever gets clipped, but stays at ±5 bpm at rest for a tight,
# readable view.
HR_AXIS_PADDING_BPM: float = 5.0
# Which PPG wavelength to analyse. The BioPoint streams four LEDs
# (infrared, red, green, blue) under the keys 'ir', 'r', 'g', 'b'. Green
# is usually the cleanest at the wrist; switch to 'ir' or 'r' if the
# device is over thicker tissue (finger tip, earlobe).
PPG_CHANNEL: str = 'g'
# 1. Connect, configure PPG, start streaming.
sb = sbp.SifiBridge()
sb.connect()
# configure_sensors() is the on/off switch — anything not listed as True
# is disabled, so only PPG packets reach our code. configure_ppg() then
# sets the PPG-specific parameters: sampling rate, per-LED current in
# milliamps (1–50), and photodiode sensitivity. Higher current and
# sensitivity give a stronger signal but raise the risk of saturating
# under bright ambient light or on lighter skin.
sb.configure_sensors(ppg=True)
# avg=4: presumably an on-sensor averaging factor (4 reads per output
# sample) to reduce noise — confirm against the configure_ppg docs for
# this sifi_bridge_py version.
sb.configure_ppg(state=True, fs=PPG_FS, ir=15, red=15, green=15, blue=15,
                 sens=sbp.PpgSensitivity.MEDIUM, avg=4)
sb.start()
# Let the front-end settle, then drop everything queued so far so the
# analysis window starts on clean data.
time.sleep(1.0)
sb.clear_data_buffer()
# 2. Set up the live plot. Top axes show the cleaned PPG waveform with
# detected peaks; bottom axes show the running HR estimate together
# with a ±1 SD band.
plt.ion()
fig, (ax_signal, ax_hr) = plt.subplots(
    2, 1, figsize=(11, 6), gridspec_kw={'height_ratios': [3, 1]})
# Line artist whose data is replaced in place on every redraw.
cleaned_line, = ax_signal.plot(
    [], [], color='tab:blue', linewidth=0.9, label='cleaned PPG')
# Scatter artist whose offsets are replaced with the detected peak points.
peak_scatter = ax_signal.scatter(
    [], [], color='tab:red', s=40, zorder=3, label='detected peaks')
ax_signal.set_xlim(0, WINDOW_SEC)
ax_signal.set_xlabel(f"Time (s, trailing {WINDOW_SEC} s window)")
ax_signal.set_ylabel(f"PPG ({PPG_CHANNEL}, cleaned)")
ax_signal.set_title("Live PPG with detected peaks")
ax_signal.grid(True, alpha=0.3)
ax_signal.legend(loc='upper right')
# HR history with a ±1 SD shaded band. fill_between can't be updated in
# place, so we tear down and re-create the band on each redraw.
hr_history_t = []   # time (s since start) of each HR estimate
hr_history_v = []   # mean HR (bpm) per estimate
hr_history_sd = []  # standard deviation (bpm) per estimate
hr_line, = ax_hr.plot([], [], color='tab:purple', linewidth=1.6,
                      label='mean HR (window)')
hr_band = None  # current fill_between artist; replaced on each redraw
ax_hr.set_xlim(0, DURATION_SEC)
ax_hr.set_ylim(60 - HR_AXIS_PADDING_BPM, 60 + HR_AXIS_PADDING_BPM)
ax_hr.set_xlabel("Time since start (s)")
ax_hr.set_ylabel("HR (bpm)")
ax_hr.grid(True, alpha=0.3)
ax_hr.legend(loc='upper right')
fig.tight_layout()
# 3. Bounded buffer holding the most recent WINDOW_SEC seconds of raw
# PPG samples. New samples come in from the right; once the buffer is
# full, older samples drop off the left automatically.
ppg_buf = deque(maxlen=PPG_FS * WINDOW_SEC)
last_hr_update = -1.0    # loop-time of the last HR recomputation
last_plot_redraw = -1.0  # loop-time of the last plot refresh
current_hr = None     # most recent mean HR (bpm); None until first estimate
current_hr_sd = None  # matching standard deviation (bpm)
t0 = time.time()
print(f"Streaming PPG @ {PPG_FS} Hz on channel '{PPG_CHANNEL}'.")
print(f"Heart rate is the mean over the last {WINDOW_SEC} s, "
      f"updated every {HR_UPDATE_SEC:.0f} s.\n")
print("Make sure the sensor is in good contact with skin; expect a few")
print("seconds before HR settles into a stable value.\n")
try:
    while True:
        # Stop early if the user closed the figure window.
        if not plt.fignum_exists(fig.number):
            break
        t = time.time() - t0
        if t > DURATION_SEC:
            sb.stop()
            sb.disconnect()
            break

        # 4. Drain one queued packet, if any. PPG packets carry the four
        # optical channels 'ir', 'r', 'g', 'b' as equal-length arrays.
        pkt = sb.get_data(timeout=0.0)
        if pkt and pkt.get('packet_type') == 'ppg':
            ppg_buf.extend(pkt['data'].get(PPG_CHANNEL, []))

        # 5. Recompute HR once per HR_UPDATE_SEC, but only after a full
        # window has accumulated so even the first estimate is meaningful.
        window_full = len(ppg_buf) >= PPG_FS * WINDOW_SEC
        if window_full and t - last_hr_update >= HR_UPDATE_SEC:
            last_hr_update = t
            window = np.asarray(ppg_buf, dtype=float)
            try:
                # Full NeuroKit2 pipeline on just the current window:
                # cleaning, systolic-peak detection, per-beat HR in bpm.
                # Feeding only the window keeps the cost bounded no
                # matter how long the demo runs.
                processed, _info = nk.ppg_process(window, sampling_rate=PPG_FS)
                rate = processed["PPG_Rate"].to_numpy()
                # Mean = robust central estimate; SD = steadiness of the
                # rate across the window. Both skip the NaNs that appear
                # before the first detected peak.
                finite = rate[np.isfinite(rate)]
                if finite.size > 0:
                    current_hr = float(np.mean(finite))
                    current_hr_sd = float(np.std(finite))
                    hr_history_t.append(t)
                    hr_history_v.append(current_hr)
                    hr_history_sd.append(current_hr_sd)
                    print(f" t={t:5.1f} s | "
                          f"HR = {current_hr:5.1f} ± "
                          f"{current_hr_sd:4.1f} bpm")
            except Exception as e:
                # Very short or very noisy windows can make NeuroKit2
                # raise; skip this tick and try again on the next one.
                print(f" t={t:5.1f} s | HR computation skipped ({e})")

        # 6. Refresh the plot at PLOT_REDRAW_SEC granularity, showing the
        # cleaned (bandpass-filtered) waveform so the peak markers sit on
        # the same trace NeuroKit2 used to find them. Cleaning + peak
        # detection alone are much cheaper than the full ppg_process
        # pipeline, so running them at the redraw rate is fine.
        if ppg_buf and (t - last_plot_redraw >= PLOT_REDRAW_SEC):
            last_plot_redraw = t
            window = np.asarray(ppg_buf, dtype=float)
            x = np.arange(window.size) / PPG_FS
            try:
                cleaned = nk.ppg_clean(window, sampling_rate=PPG_FS)
                found = nk.ppg_findpeaks(cleaned, sampling_rate=PPG_FS)
                peak_samples = np.asarray(found["PPG_Peaks"], dtype=int)
            except Exception:
                # Fall back to the raw window with no markers.
                cleaned = window
                peak_samples = np.array([], dtype=int)
            cleaned_line.set_data(x, cleaned)
            # Hug the Y axis to the cleaned trace so a small pulsatile
            # waveform isn't lost on a wide axis.
            lo, hi = cleaned.min(), cleaned.max()
            pad = (hi - lo) * 0.1 + 1e-9
            ax_signal.set_ylim(lo - pad, hi + pad)
            if peak_samples.size > 0:
                peak_scatter.set_offsets(np.column_stack(
                    [peak_samples / PPG_FS, cleaned[peak_samples]]))
            else:
                peak_scatter.set_offsets(np.empty((0, 2)))
            if current_hr is not None:
                ax_signal.set_title(
                    f"Live PPG with detected peaks — HR = "
                    f"{current_hr:.1f} ± {current_hr_sd:.1f} bpm")
            # Update the HR trace and its ±1 SD band. The band is torn
            # down and re-created each time since fill collections can't
            # be updated in place.
            if hr_history_t:
                times = np.asarray(hr_history_t)
                means = np.asarray(hr_history_v)
                sds = np.asarray(hr_history_sd)
                hr_line.set_data(times, means)
                if hr_band is not None:
                    hr_band.remove()
                hr_band = ax_hr.fill_between(
                    times, means - sds, means + sds,
                    color='tab:purple', alpha=0.2, label='±1 SD')
                # Centre the HR axis on the latest estimate, widening it
                # whenever the SD band would otherwise be clipped.
                half_range = HR_AXIS_PADDING_BPM
                if current_hr_sd is not None:
                    half_range = max(half_range, current_hr_sd + 1.0)
                ax_hr.set_ylim(current_hr - half_range,
                               current_hr + half_range)
            fig.canvas.draw_idle()

        # Yield to the GUI so the window stays responsive. plt.pause
        # raises internally once the window is gone; treat that as a
        # clean exit.
        try:
            plt.pause(0.001)
        except Exception:
            break
finally:
    # 7. Always release the device, even if something raised mid-loop or
    # the user closed the window early. Cleanup runs BEFORE the final
    # blocking plt.show() so the device is freed even when the user lets
    # the window sit; the short sleep lets the BLE commands land.
    sb.stop()
    sb.disconnect()
    time.sleep(0.5)

if hr_history_v:
    print(f"\nDone. {len(ppg_buf)} PPG samples in the buffer, "
          f"{len(hr_history_v)} HR estimates.")
    print(f"Mean HR over the session: {np.mean(hr_history_v):.1f} bpm "
          f"(min {min(hr_history_v):.1f}, "
          f"max {max(hr_history_v):.1f}).")
else:
    print("\nDone. No HR estimates produced — check sensor contact, "
          "ambient light, and that the device was on skin.")

# Block on the final figure only if the window is still open.
plt.ioff()
if plt.fignum_exists(fig.number):
    plt.show()