Skip to main content

Examples

Practical code examples for common use cases with SiFi Bridge Python.

Basic Examples

Simple EMG Recording

Record EMG data for a fixed duration and save to a list.

import sifi_bridge_py as sbp
import time

# 1. Initialize the SifiBridge object to manage the device connection
sb = sbp.SifiBridge()

# 2. Establish a connection to the BioPoint V1.3 hardware
# This searches for and connects to the device via the bridge
sb.connect(sbp.DeviceType.BIOPOINT_V1_3)

# 3. Configure the onboard EMG digital filters
# bandpass_freqs: Filters out noise below 20Hz and above 450Hz
# notch_freq: Removes power line interference (50Hz is common in Europe/Asia; use 60Hz for US/Canada)
sb.configure_emg(bandpass_freqs=(20, 450), notch_freq=50)

# 4. Start the data acquisition stream
# Note: The bridge starts a background thread that fills an internal
# FIFO queue immediately upon calling start().
sb.start()

# 5. Flush the internal buffer for 1 second
# We do this because packets begin accumulating the moment sb.start() is called.
# Since get_emg() only retrieves one packet at a time, we loop to "drain"
# the queue of any stale data or startup noise before our actual recording begins.
print("Flushing buffer...")
flush_start = time.time()
while time.time() - flush_start < 1.0:
    sb.get_emg()

# 6. Initialize collection variables
emg_buffer = []  # List to store accumulated EMG data points
start_time = time.time()  # Track the start time for the 10s window

# 7. Main collection loop (Collect for 10 seconds)
try:
    print("Recording started.")
    while time.time() - start_time < 10:
        # Pull the oldest available packet from the internal queue
        packet = sb.get_emg()

        # 'packet' is a dictionary containing metadata and the data array
        # We extend our buffer with the 'emg' array found inside 'data'
        if packet:
            emg_buffer.extend(packet['data']['emg'])

        # Provide a real-time status update in the console
        print(f"Collected {len(emg_buffer)} samples", end='\r')

finally:
    # 8. Cleanup: Ensure the device stops streaming and disconnects
    # This block runs even if the loop is interrupted or crashes.
    # stop() is issued exactly once, followed by disconnect().
    sb.stop()
    sb.disconnect()
    time.sleep(.5)  # Small pause to allow background threads to close cleanly

# 9. Print the final results
print(f"\nTotal EMG samples collected: {len(emg_buffer)}")

Multi-Sensor Data Collection

Collect data from multiple sensors simultaneously.

import sifi_bridge_py as sbp
import time

# 1. Create the bridge object
sb = sbp.SifiBridge()

# 2. Connect to the BioPoint V1.3 hardware
sb.connect(sbp.DeviceType.BIOPOINT_V1_3)

# 3. Configure each sensor that should stream
# Each sensor has specific digital filters or hardware gains applied onboard
sb.configure_ecg(bandpass_freqs=(0, 30))
sb.configure_emg(bandpass_freqs=(20, 450), notch_freq=50)
sb.configure_ppg(ir=7, red=7, green=9, blue=9, sens=sbp.PpgSensitivity.MEDIUM)

# 4. Start the acquisition stream
# The library starts a background thread to handle high-speed data transfer
sb.start()

# 5. Per-sensor storage, keyed by packet type.
# sample_key maps a packet type to the entry of packet['data'] whose length is
# counted as "samples". PPG data is sent back with "ir", "red", "green" and
# "blue" keys; the "ir" channel is counted (all channels have the same length).
sample_key = {'ecg': 'ecg', 'emg': 'emg', 'ppg': 'ir'}
packets = {name: [] for name in sample_key}
sample_counts = {name: 0 for name in sample_key}

# 6. Drain the internal buffer for 1 second so the recording does not begin
# with stale data accumulated since start()
print("Flushing buffers for all sensors...")
flush_deadline = time.time() + 1.0
while time.time() < flush_deadline:
    sb.get_data()

print("Recording 10 seconds of multi-sensor data...")
start_time = time.time()

# 7. Main collection loop
try:
    while time.time() - start_time < 10:
        # Get the oldest packet available for any of the active sensors
        packet = sb.get_data()

        if packet:
            p_type = packet.get('packet_type')

            # 8. Route the packet to its sensor bucket and add its sample count;
            # packet types other than ecg/emg/ppg are ignored
            if p_type in sample_key:
                packets[p_type].append(packet)
                sample_counts[p_type] += len(packet['data'][sample_key[p_type]])

        # Optional: Print progress (packet counts per sensor)
        elapsed = time.time() - start_time
        print(
            f"Recording: {elapsed:.1f}s | ECG: {len(packets['ecg'])} | "
            f"EMG: {len(packets['emg'])} | PPG: {len(packets['ppg'])}",
            end='\r',
        )

finally:
    # 9. Cleanup: Stop streaming and release hardware resources
    sb.stop()
    sb.disconnect()
    time.sleep(.5)  # Small pause to allow background threads to close cleanly

# 10. Final Summary
print("\n\nRecording Complete:")
print(f"ECG: {len(packets['ecg'])} packets, {sample_counts['ecg']} total data points")
print(f"EMG: {len(packets['emg'])} packets, {sample_counts['emg']} total data points")
print(f"PPG: {len(packets['ppg'])} packets, {sample_counts['ppg']} total data points")

Save Directly to CSV

Let SiFi Bridge handle CSV file creation and writing.

import sifi_bridge_py as sbp
import time

# Where the bridge's CSV publisher writes, and how long to record
OUTPUT_DIR = "./recordings/"
RECORD_SECONDS = 60

# Create the bridge with a CSV publisher pointed at OUTPUT_DIR
sb = sbp.SifiBridge(publishers=f"csv://{OUTPUT_DIR}")

# Connect, then enable the sensors to record
sb.connect(sbp.DeviceType.BIOPOINT_V1_3)
sb.configure_sensors(ecg=True, emg=True, imu=True)

# Stream for the full recording window; the script itself only waits
sb.start()
print(f"Recording for {RECORD_SECONDS} seconds...")
time.sleep(RECORD_SECONDS)
sb.stop()

sb.disconnect()
print(f"Data saved to {OUTPUT_DIR}")

Threading Examples

Background Data Collection

Use threading to collect data in the background while performing other tasks.

import threading
import time
import sifi_bridge_py as sbp

class DataCollector:
    """Collect ECG packets from a SifiBridge on a background thread.

    Attributes:
        sb: The SifiBridge instance used for all device calls.
        running: Flag polled by the worker loop; clearing it ends the loop.
        thread: The worker thread, or None before start_collection().
        data_buffer: List of ECG packets appended by the worker.
    """

    def __init__(self):
        self.data_buffer = []
        self.thread = None
        self.running = False
        self.sb = sbp.SifiBridge()

    def connect_and_configure(self):
        """Connect, enable ECG at fs=500 and start streaming."""
        bridge = self.sb
        bridge.connect()
        bridge.configure_ecg(state=True, fs=500)
        bridge.start()

    def collect_data(self):
        """Worker loop: store ECG packets until `running` is cleared.

        Any exception raised while fetching or inspecting a packet is
        printed and the loop carries on.
        """
        while self.running:
            try:
                packet = self.sb.get_data(timeout=1.0)
                is_ecg = bool(packet) and packet['packet_type'] == 'ecg'
                if is_ecg:
                    self.data_buffer.append(packet)
            except Exception as e:
                print(f"Error: {e}")

    def start_collection(self):
        """Set the running flag and launch the worker thread."""
        self.running = True
        worker = threading.Thread(target=self.collect_data)
        self.thread = worker
        worker.start()

    def stop_collection(self):
        """Clear the running flag, wait for the worker, then stop and disconnect."""
        self.running = False
        worker = self.thread
        if worker:
            worker.join()
        self.sb.stop()
        self.sb.disconnect()

# Usage
collector = DataCollector()
collector.connect_and_configure()
collector.start_collection()

try:
    # Do other work while collecting
    print("Collecting data in background...")
    for _ in range(10):
        time.sleep(1)
        print(f"Collected {len(collector.data_buffer)} packets")
finally:
    # Always stop the worker: it is a regular (non-daemon) thread, so leaving
    # it running after an interrupt would keep the process alive.
    collector.stop_collection()

print(f"Final count: {len(collector.data_buffer)} packets")

Producer-Consumer Pattern

Use queues for thread-safe data passing.

import threading
import queue
import sifi_bridge_py as sbp

def producer(sb, data_queue, stop_event):
    """Collect data and put in queue.

    Polls ``sb.get_data(timeout=0.5)`` until ``stop_event`` is set and
    forwards every truthy packet to ``data_queue``.

    Args:
        sb: Object exposing ``get_data(timeout=...)``.
        data_queue: Queue that receives the packets (``put`` blocks while full).
        stop_event: ``threading.Event`` that ends the loop once set.

    Errors raised by ``get_data`` are reported and the loop continues.
    ``KeyboardInterrupt`` and ``SystemExit`` are not caught.
    """
    while not stop_event.is_set():
        try:
            packet = sb.get_data(timeout=0.5)
        except Exception as e:
            # Best-effort collection: report the failure instead of hiding it.
            print(f"Error: {e}")
            continue

        if packet:
            data_queue.put(packet)

def consumer(data_queue, stop_event):
    """Process data from queue.

    Runs until ``stop_event`` is set AND the queue has been drained, counting
    packets whose ``packet_type`` is 'ecg', 'emg' or 'imu'. Other types are
    taken off the queue but not counted.

    Args:
        data_queue: Queue of packet dictionaries.
        stop_event: ``threading.Event`` signalling that no more work is expected.

    Returns:
        dict: Per-type packet counts (also printed before returning).
    """
    packet_count = {'ecg': 0, 'emg': 0, 'imu': 0}

    while not stop_event.is_set() or not data_queue.empty():
        try:
            packet = data_queue.get(timeout=0.5)
        except queue.Empty:
            # Nothing arrived within the timeout; re-check the stop condition.
            continue

        try:
            packet_type = packet.get('packet_type', 'unknown')

            if packet_type in packet_count:
                packet_count[packet_type] += 1

            # Process packet here...
        finally:
            # Mark the item done even if processing raises, so a later
            # data_queue.join() cannot wait forever on this packet.
            data_queue.task_done()

    print(f"Processed packets: {packet_count}")
    return packet_count

import time

# Setup
sb = sbp.SifiBridge()
sb.connect()
sb.configure_sensors(ecg=True, emg=True, imu=True)
sb.start()

# Create queue and threads
data_queue = queue.Queue(maxsize=100)
stop_event = threading.Event()

producer_thread = threading.Thread(target=producer, args=(sb, data_queue, stop_event))
consumer_thread = threading.Thread(target=consumer, args=(data_queue, stop_event))

# Start threads
producer_thread.start()
consumer_thread.start()

try:
    # Run for 30 seconds
    time.sleep(30)
finally:
    # Stop threads. This runs on Ctrl+C as well: both workers are regular
    # (non-daemon) threads and only exit once stop_event is set.
    stop_event.set()
    producer_thread.join()
    consumer_thread.join()

    # Cleanup
    sb.stop()
    sb.disconnect()

Memory Download Examples

Download Device Memory to CSV

Download stored data from device memory to CSV files.

import sifi_bridge_py as sbp
import time

# Bridge with a CSV publisher pointed at ./memory_data/
sb = sbp.SifiBridge(publishers="csv://./memory_data/")

# Retry once per second until connect() reports success
while True:
    if sb.connect(sbp.DeviceType.BIOPOINT_V1_3):
        break
    print("Connecting...")
    time.sleep(1)

# Start memory download
kb = sb.start_memory_download()
print(f"Downloading {kb} kB from device memory...")

# Pull packets until the bridge reports that the download has completed.
# packets_received is the 1-based index of the packet just fetched.
start_time = time.time()
packets_received = 1

while not sb.is_memory_download_completed(sb.get_data()):
    # Print progress every 100 packets
    if packets_received % 100 == 0:
        elapsed = time.time() - start_time
        rate = (packets_received * 0.227) / elapsed  # Approximate kBps
        print(f"Downloaded {packets_received} packets ({rate:.1f} kBps)")

    packets_received += 1

print("Download complete! Check ./memory_data/")
sb.disconnect()

Download Memory to Python Lists

Download and store memory data in Python for immediate processing.

import sifi_bridge_py as sbp

# Setup
sb = sbp.SifiBridge()
sb.connect()

# Enable only ECG for clarity
sb.configure_sensors(ecg=True)

# Start memory download
kb = sb.start_memory_download()
print(f"Downloading {kb} kB...")

# Accumulate every ECG sample until the download is reported complete
packet_count = 0
ecg_samples = []

while True:
    packet = sb.get_data()

    if sb.is_memory_download_completed(packet):
        break

    # Skip anything that is not an ECG packet
    if packet['packet_type'] != 'ecg':
        continue

    ecg_samples.extend(packet['data']['ecg'])
    packet_count += 1

sb.disconnect()

print(f"Downloaded {len(ecg_samples)} ECG samples in {packet_count} packets")
print(f"First 10 samples: {ecg_samples[:10]}")

Data Processing Examples

Real-Time Signal Processing

Apply filters and processing to incoming data.

import sifi_bridge_py as sbp
import numpy as np
from scipy import signal

class ECGProcessor:
    """Streaming 0.5-40 Hz Butterworth band-pass filter for ECG chunks.

    The filter state is carried between calls to process(), so consecutive
    chunks are filtered as one continuous signal.

    Attributes:
        fs: Sampling rate in Hz used to design the filter.
        b, a: Numerator / denominator coefficients of the 4th-order
            Butterworth band-pass.
        zi: Current filter state passed to scipy.signal.lfilter.
    """

    def __init__(self, fs=500):
        self.fs = fs
        # Design bandpass filter (0.5-40 Hz)
        self.b, self.a = signal.butter(4, [0.5, 40], 'bandpass', fs=fs)
        # lfilter_zi gives the steady state for a *unit* step input; it is
        # rescaled to the first real sample on the first process() call.
        self.zi = signal.lfilter_zi(self.b, self.a)
        self._primed = False

    def process(self, ecg_samples):
        """Apply filtering to ECG samples.

        Args:
            ecg_samples: 1-D sequence of samples for the next chunk.

        Returns:
            numpy.ndarray: Filtered samples, same length as the input.
        """
        samples = np.asarray(ecg_samples, dtype=float)
        if samples.size == 0:
            # Nothing to filter; leave the filter state untouched.
            return samples

        if not self._primed:
            # Scale the unit-step state to the signal's starting level so the
            # filter does not see an artificial jump from 1.0 to samples[0].
            self.zi = self.zi * samples[0]
            self._primed = True

        filtered, self.zi = signal.lfilter(
            self.b, self.a, samples, zi=self.zi
        )
        return filtered

# Sampling rate shared by the sensor configuration and the filter design
SAMPLE_RATE_HZ = 500

# Setup
sb = sbp.SifiBridge()
sb.connect()
sb.configure_ecg(state=True, fs=SAMPLE_RATE_HZ)
sb.start()

processor = ECGProcessor(fs=SAMPLE_RATE_HZ)

# Filter 100 incoming ECG packets, one packet per iteration
for _ in range(100):
    raw_ecg = sb.get_ecg()['data']['ecg']
    filtered_ecg = processor.process(raw_ecg)

    # Do something with filtered data
    raw_mean = np.mean(raw_ecg)
    filtered_mean = np.mean(filtered_ecg)
    print(f"Raw mean: {raw_mean:.4f}, Filtered mean: {filtered_mean:.4f}")

sb.stop()
sb.disconnect()

Calculate Heart Rate from PPG

Simple heart rate estimation from PPG data.

import sifi_bridge_py as sbp
import numpy as np
from scipy import signal

def estimate_heart_rate(ppg_data, fs=100):
    """Estimate heart rate from PPG signal.

    Args:
        ppg_data: 1-D sequence of PPG samples (the example passes the IR channel).
        fs: Sampling rate of ``ppg_data`` in Hz.

    Returns:
        float | None: Mean heart rate in beats per minute, or None when the
        input is too short to filter or fewer than two peaks are found.
    """
    # Use IR channel
    ir_signal = np.asarray(ppg_data, dtype=float)

    # Bandpass filter (0.5-4 Hz for heart rate)
    b, a = signal.butter(3, [0.5, 4], 'bandpass', fs=fs)

    # filtfilt pads each end with 3 * max(len(a), len(b)) samples by default
    # and raises ValueError unless the input is longer than that padding.
    pad_len = 3 * max(len(a), len(b))
    if ir_signal.size <= pad_len:
        return None

    filtered = signal.filtfilt(b, a, ir_signal)

    # Find peaks at least 0.5 s apart (so rates above 120 BPM cannot be resolved)
    peaks, _ = signal.find_peaks(filtered, distance=fs*0.5)

    if len(peaks) < 2:
        return None

    # Calculate heart rate
    ibi = np.diff(peaks) / fs  # Inter-beat intervals in seconds
    hr = 60 / np.mean(ibi)

    return float(hr)

# Setup
sb = sbp.SifiBridge()
sb.connect()
sb.configure_ppg(state=True, fs=100, ir=25, sens=sbp.PpgSensitivity.HIGH)
sb.start()

# Collect a fixed number of PPG packets, keeping only the IR channel
TARGET_PACKETS = 100  # ~10 seconds at 100 Hz
ppg_buffer = []

for _ in range(TARGET_PACKETS):
    ir_chunk = sb.get_ppg()['data']['ir']
    ppg_buffer.extend(ir_chunk)

sb.stop()
sb.disconnect()

# Estimate heart rate; nothing is printed when no estimate is available
hr = estimate_heart_rate(ppg_buffer, fs=100)
if hr:
    print(f"Estimated heart rate: {hr:.1f} BPM")

Advanced Examples

Multiple Device Management

Manage multiple devices simultaneously.

import sifi_bridge_py as sbp

# Create bridge
sb = sbp.SifiBridge()

# Addresses of the two physical devices (placeholders)
device1_addr = "XX:XX:XX:XX:XX:01"
device2_addr = "XX:XX:XX:XX:XX:02"

# Register one virtual device per physical device; only the first is selected
sb.create_device("device1", select=True)
sb.create_device("device2", select=False)

# Device 1 streams ECG
sb.select_device("device1")
sb.connect(device1_addr)
sb.configure_ecg(state=True)
sb.start()

# Device 2 streams EMG
sb.select_device("device2")
sb.connect(device2_addr)
sb.configure_emg(state=True)
sb.start()

# Read 50 ECG packets and show the first three samples of each
for _ in range(50):
    packet = sb.get_ecg()
    head = packet['data']['ecg'][:3]
    print(f"{packet['id']} ECG: {head}")

# Cleanup: stop and disconnect each device in turn
for device_name in ("device1", "device2"):
    sb.select_device(device_name)
    sb.stop()
    sb.disconnect()

Lab Streaming Layer (LSL) Integration

Stream data via LSL for use with other applications.

import sifi_bridge_py as sbp
import time

# Create bridge with LSL enabled
sb = sbp.SifiBridge(use_lsl=True)

# Connect and enable the sensors to stream
sb.connect()
sb.configure_sensors(ecg=True, emg=True, imu=True)

# Start acquisition (data automatically streams to LSL)
sb.start()

for line in (
    "Streaming to LSL. Use LSL viewer to see outlets.",
    "Outlets: ECG, EMG, IMU, etc.",
    "Press Ctrl+C to stop...",
):
    print(line)

# Idle until the user interrupts with Ctrl+C
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("\nStopping...")

sb.stop()
sb.disconnect()

Network Streaming

Stream data over TCP for remote monitoring.

import sifi_bridge_py as sbp
import time

# Port used by the TCP publisher
TCP_PORT = 5000

# Create bridge with TCP output
sb = sbp.SifiBridge(publishers=f"tcp://0.0.0.0:{TCP_PORT}")

# Connect and configure
sb.connect()
sb.configure_ecg(state=True, fs=500)

# Start streaming
sb.start()

print(f"Streaming ECG data to TCP port {TCP_PORT}")
print(f"Connect with: nc localhost {TCP_PORT}")
print("Press Ctrl+C to stop...")

# Idle until the user interrupts with Ctrl+C
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("\nStopping...")

sb.stop()
sb.disconnect()

Auto-Reconnect on Disconnect

Automatically reconnect if connection is lost.

import sifi_bridge_py as sbp
import time

def connect_with_retry(sb, max_retries=10, retry_delay=2):
    """Try to connect with retries.

    Args:
        sb: Object exposing ``connect()``; a truthy return means success.
        max_retries: Maximum number of connection attempts.
        retry_delay: Seconds to wait between two attempts.

    Returns:
        bool: True as soon as an attempt succeeds, False once every attempt
        has failed (or when ``max_retries`` is 0).
    """
    for attempt in range(1, max_retries + 1):
        try:
            if sb.connect():
                print("Connected!")
                return True
            print(f"Retry {attempt}/{max_retries}")
        except Exception as e:
            print(f"Error: {e}")

        # Only wait when another attempt follows; sleeping after the final
        # failure would just delay returning False to the caller.
        if attempt < max_retries:
            time.sleep(retry_delay)

    return False

# Setup
sb = sbp.SifiBridge()

connected = connect_with_retry(sb)
if not connected:
    print("Failed to connect")
    exit(1)

sb.configure_ecg(state=True)
sb.start()

# Monitor connection and reconnect if needed
packet_timeout = 5.0  # Consider disconnected if no packet in 5s

try:
    while True:
        packet = sb.get_data(timeout=packet_timeout)

        # Normal case: a packet arrived within the timeout
        if packet:
            print(f"Received {packet['packet_type']}")
            continue

        # No packet: treat the link as lost and try to bring it back
        print("Connection lost! Reconnecting...")
        sb.disconnect()

        if not connect_with_retry(sb):
            print("Failed to reconnect")
            break

        sb.configure_ecg(state=True)
        sb.start()

except KeyboardInterrupt:
    print("\nStopping...")

sb.stop()
sb.disconnect()

Complete Application Example

A complete application with error handling, configuration, and data export.

import sifi_bridge_py as sbp
import time
import json
from pathlib import Path
import logging

class ECGRecorder:
    """Record ECG from a SiFi Bridge device and export it as JSON.

    Typical flow: connect() -> configure() -> record() -> save() -> disconnect().

    Attributes:
        output_dir: Directory (pathlib.Path) that receives saved recordings.
        sb: The SifiBridge instance, or None while not connected.
        recording: True while record() is acquiring data.
        logger: Logger used for progress and error messages.
    """

    # Seconds between two progress messages while recording
    PROGRESS_INTERVAL_SEC = 5

    def __init__(self, output_dir="./recordings"):
        self.output_dir = Path(output_dir)
        # parents=True so a nested output path does not fail on a missing parent
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.sb = None
        self.recording = False

        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def connect(self, device_type=sbp.DeviceType.BIOPOINT_V1_3, max_retries=5):
        """Connect to device with retries.

        Args:
            device_type: Device type passed to SifiBridge.connect().
            max_retries: Maximum number of connection attempts.

        Returns:
            bool: True once an attempt succeeds, False if all attempts fail.
        """
        self.sb = sbp.SifiBridge()

        for attempt in range(max_retries):
            try:
                if self.sb.connect(device_type):
                    self.logger.info("Connected successfully")
                    return True
                self.logger.warning(f"Connection attempt {attempt + 1} failed")
                time.sleep(2)
            except Exception as e:
                self.logger.error(f"Connection error: {e}")
                time.sleep(2)

        return False

    def configure(self, fs=500, apply_filters=True):
        """Configure ECG sensor.

        Args:
            fs: Sampling rate in Hz passed to configure_ecg().
            apply_filters: Enables the DC notch, the 60 Hz mains notch and the
                band-pass when True; disables all three when False.

        Raises:
            RuntimeError: If connect() has not been called.
        """
        if not self.sb:
            # Same error as record() rather than an AttributeError on None
            raise RuntimeError("Not connected")

        self.sb.configure_ecg(
            state=True,
            fs=fs,
            dc_notch=apply_filters,
            mains_notch=60 if apply_filters else None,
            bandpass=apply_filters,
            flo=0.5,
            fhi=40
        )
        self.logger.info(f"Configured ECG: fs={fs} Hz, filters={apply_filters}")

    def record(self, duration_sec):
        """Record ECG for specified duration.

        Args:
            duration_sec: Recording length in seconds.

        Returns:
            tuple[list, list]: (ecg samples, one timestamp per packet). A
            Ctrl+C during recording returns the data collected so far.

        Raises:
            RuntimeError: If connect() has not been called.
        """
        if not self.sb:
            raise RuntimeError("Not connected")

        # Start acquisition
        self.sb.start()
        self.recording = True
        self.logger.info(f"Recording for {duration_sec} seconds...")

        # Collect data
        ecg_data = []
        timestamps = []
        start_time = time.time()
        last_progress_mark = 0  # Number of progress intervals already reported

        try:
            while time.time() - start_time < duration_sec:
                packet = self.sb.get_ecg()
                ecg_data.extend(packet['data']['ecg'])
                timestamps.append(packet['timestamp'])

                # Progress update: log once per interval, not once per packet
                elapsed = time.time() - start_time
                progress_mark = int(elapsed) // self.PROGRESS_INTERVAL_SEC
                if progress_mark > last_progress_mark:
                    last_progress_mark = progress_mark
                    self.logger.info(f"Recording: {elapsed:.0f}s / {duration_sec}s")

        except KeyboardInterrupt:
            self.logger.warning("Recording interrupted by user")

        finally:
            self.sb.stop()
            self.recording = False

        self.logger.info(f"Recording complete: {len(ecg_data)} samples")
        return ecg_data, timestamps

    def save(self, ecg_data, timestamps, filename=None):
        """Save data to file.

        Args:
            ecg_data: List of ECG samples.
            timestamps: List of per-packet timestamps.
            filename: File name inside output_dir; defaults to
                ``ecg_<unix time>.json``.

        Returns:
            pathlib.Path: Path of the written JSON file.
        """
        if filename is None:
            filename = f"ecg_{int(time.time())}.json"

        filepath = self.output_dir / filename

        data = {
            'ecg_data': ecg_data,
            'timestamps': timestamps,
            'sample_count': len(ecg_data),
            # Span between first and last packet timestamp; 0 when nothing was recorded
            'duration_sec': timestamps[-1] - timestamps[0] if timestamps else 0
        }

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f)

        self.logger.info(f"Saved to {filepath}")
        return filepath

    def disconnect(self):
        """Disconnect from device; does nothing when not connected."""
        if self.sb:
            if self.recording:
                self.sb.stop()
                self.recording = False
            self.sb.disconnect()
            # Drop the bridge so later calls fail with "Not connected" and a
            # repeated disconnect() is harmless
            self.sb = None
            self.logger.info("Disconnected")

# Usage
def main():
    """Connect, configure, record 30 s of ECG and save it; always disconnect."""
    recorder = ECGRecorder(output_dir="./my_recordings")

    try:
        # Connect; abort with exit status 1 if every attempt fails
        connected = recorder.connect()
        if not connected:
            print("Failed to connect")
            exit(1)

        # Configure
        recorder.configure(fs=500, apply_filters=True)

        # Record, then save what was captured
        ecg_data, timestamps = recorder.record(duration_sec=30)
        recorder.save(ecg_data, timestamps)

    finally:
        recorder.disconnect()


if __name__ == "__main__":
    main()