Examples
Practical code examples for common use cases with SiFi Bridge Python.
Basic Examples
Simple EMG Recording
Record EMG data for a fixed duration and save to a list.
import sifi_bridge_py as sbp
import time
# 1. Initialize the SifiBridge object to manage the device connection
sb = sbp.SifiBridge()

# 2. Establish a connection to the BioPoint V1.3 hardware
# This searches for and connects to the device via the bridge
sb.connect(sbp.DeviceType.BIOPOINT_V1_3)

# 3. Configure the onboard EMG digital filters
# bandpass_freqs: Filters out noise below 20Hz and above 450Hz
# notch_freq: Removes power line interference (50Hz is common in Europe/Asia; use 60Hz for US/Canada)
sb.configure_emg(bandpass_freqs=(20, 450), notch_freq=50)

# 4. Start the data acquisition stream
# Note: The bridge starts a background thread that fills an internal
# FIFO queue immediately upon calling start().
sb.start()

# 5. Flush the internal buffer for 1 second
# We do this because packets begin accumulating the moment sb.start() is called.
# Since get_emg() only retrieves one packet at a time, we loop to "drain"
# the queue of any stale data or startup noise before our actual recording begins.
print("Flushing buffer...")
flush_start = time.time()
while time.time() - flush_start < 1.0:
    sb.get_emg()

# 6. Initialize collection variables
emg_buffer = []  # List to store accumulated EMG data points
start_time = time.time()  # Track the start time for the 10s window

# 7. Main collection loop (Collect for 10 seconds)
try:
    print("Recording started.")
    while time.time() - start_time < 10:
        # Pull the oldest available packet from the internal queue
        packet = sb.get_emg()
        # 'packet' is a dictionary containing metadata and the data array
        # We extend our buffer with the 'emg' array found inside 'data'
        if packet:
            emg_buffer.extend(packet['data']['emg'])
        # Provide a real-time status update in the console
        print(f"Collected {len(emg_buffer)} samples", end='\r')
finally:
    # 8. Cleanup: Ensure the device stops streaming and disconnects
    # This block runs even if the loop is interrupted or crashes.
    # stop() is called exactly once; the original example repeated the call.
    sb.stop()
    sb.disconnect()
    time.sleep(.5)  # Small pause to allow background threads to close cleanly

# 9. Print the final results
print(f"\nTotal EMG samples collected: {len(emg_buffer)}")
Multi-Sensor Data Collection
Collect data from multiple sensors simultaneously.
import sifi_bridge_py as sbp
import time
# 1. Create the bridge object
sb = sbp.SifiBridge()

# 2. Connect to the BioPoint V1.3 hardware
sb.connect(sbp.DeviceType.BIOPOINT_V1_3)

# 3. Configure every sensor that will be streamed
sb.configure_ecg(bandpass_freqs=(0, 30))
sb.configure_emg(bandpass_freqs=(20, 450), notch_freq=50)
sb.configure_ppg(ir=7, red=7, green=9, blue=9, sens=sbp.PpgSensitivity.MEDIUM)

# 4. Start the acquisition stream
sb.start()

# 5. Per-sensor packet lists and running sample totals
ecg_data, emg_data, ppg_data = [], [], []
ecg_sample_count = emg_sample_count = ppg_sample_count = 0

# 6. Discard everything received during the first second so the
#    recording does not begin with stale start-up packets.
print("Flushing buffers for all sensors...")
flush_start = time.time()
while True:
    if time.time() - flush_start >= 1.0:
        break
    sb.get_data()

print("Recording 10 seconds of multi-sensor data...")
start_time = time.time()

# 7. Main collection loop
try:
    while time.time() - start_time < 10:
        # Next packet from any active sensor; falsy when nothing was returned
        packet = sb.get_data()
        p_type = packet.get('packet_type') if packet else None

        # 8. Store the packet with its sensor and add its samples to the total
        if p_type == 'ecg':
            ecg_data.append(packet)
            ecg_sample_count += len(packet['data']['ecg'])
        elif p_type == 'emg':
            emg_data.append(packet)
            emg_sample_count += len(packet['data']['emg'])
        elif p_type == 'ppg':
            ppg_data.append(packet)
            # PPG samples are keyed "ir", "red", "green", "blue"; the "ir"
            # channel is counted (all channels have the same length).
            ppg_sample_count += len(packet['data']['ir'])

        # Optional: print progress (packet counts per sensor)
        elapsed = time.time() - start_time
        print(f"Recording: {elapsed:.1f}s | ECG: {len(ecg_data)} | EMG: {len(emg_data)} | PPG: {len(ppg_data)}", end='\r')
finally:
    # 9. Cleanup: stop streaming and release the hardware
    sb.stop()
    sb.disconnect()
    time.sleep(.5)  # Small pause to allow background threads to close cleanly

# 10. Final summary
print(f"\n\nRecording Complete:")
for label, packets, samples in (
    ("ECG", ecg_data, ecg_sample_count),
    ("EMG", emg_data, emg_sample_count),
    ("PPG", ppg_data, ppg_sample_count),
):
    print(f"{label}: {len(packets)} packets, {samples} total data points")
Save Directly to CSV
Let SiFi Bridge handle CSV file creation and writing.
import sifi_bridge_py as sbp
import time
# Create bridge with CSV output
sb = sbp.SifiBridge(publishers="csv://./recordings/")

# Connect and configure
sb.connect(sbp.DeviceType.BIOPOINT_V1_3)
sb.configure_sensors(ecg=True, emg=True, imu=True)

# Record for 60 seconds
sb.start()
try:
    print("Recording for 60 seconds...")
    time.sleep(60)
finally:
    # Always stop and disconnect, even if the wait is interrupted (Ctrl+C),
    # so the device is not left streaming.
    sb.stop()
    sb.disconnect()

print("Data saved to ./recordings/")
Threading Examples
Background Data Collection
Use threading to collect data in the background while performing other tasks.
import threading
import time
import sifi_bridge_py as sbp
class DataCollector:
    """Collect ECG packets from a SifiBridge on a background thread.

    Attributes:
        sb: The bridge used for every device call.
        running: Flag polled by the worker loop; clearing it ends the loop.
        thread: The worker thread, or None before start_collection().
        data_buffer: ECG packets appended by the worker, in arrival order.
    """

    def __init__(self):
        self.sb = sbp.SifiBridge()
        self.running = False
        self.thread = None
        self.data_buffer = []

    def connect_and_configure(self):
        """Connect, enable ECG at 500 Hz and start acquisition."""
        bridge = self.sb
        bridge.connect()
        bridge.configure_ecg(state=True, fs=500)
        bridge.start()

    def collect_data(self):
        """Background thread function: buffer ECG packets while running."""
        while self.running:
            try:
                packet = self.sb.get_data(timeout=1.0)
                is_ecg = bool(packet) and packet['packet_type'] == 'ecg'
                if is_ecg:
                    self.data_buffer.append(packet)
            except Exception as err:
                # Report and keep polling; one bad packet must not end the loop.
                print(f"Error: {err}")

    def start_collection(self):
        """Start background collection"""
        self.running = True
        worker = threading.Thread(target=self.collect_data)
        self.thread = worker
        worker.start()

    def stop_collection(self):
        """Stop background collection, then stop and disconnect the device."""
        self.running = False
        worker = self.thread
        if worker:
            worker.join()
        self.sb.stop()
        self.sb.disconnect()
# Usage
collector = DataCollector()
collector.connect_and_configure()
collector.start_collection()

try:
    # Do other work while collecting
    print("Collecting data in background...")
    for _ in range(10):
        time.sleep(1)
        print(f"Collected {len(collector.data_buffer)} packets")
finally:
    # Always join the worker thread and release the device, even when the
    # main loop is interrupted; otherwise the worker keeps the process alive.
    collector.stop_collection()

print(f"Final count: {len(collector.data_buffer)} packets")
Producer-Consumer Pattern
Use queues for thread-safe data passing.
import threading
import queue
import sifi_bridge_py as sbp
def producer(sb, data_queue, stop_event):
    """Collect data and put in queue.

    Polls ``sb.get_data(timeout=0.5)`` until ``stop_event`` is set and
    forwards every truthy packet to ``data_queue``.

    Args:
        sb: Bridge object providing ``get_data(timeout=...)``.
        data_queue: Queue receiving the packets.
        stop_event: Event that ends the loop once set.
    """
    while not stop_event.is_set():
        try:
            packet = sb.get_data(timeout=0.5)
            if packet:
                data_queue.put(packet)
        except Exception as e:
            # Report the problem instead of hiding it behind a bare except,
            # then keep polling so one failed read does not end collection.
            print(f"Error: {e}")
def consumer(data_queue, stop_event):
    """Process data from queue.

    Drains ``data_queue`` until ``stop_event`` is set and the queue is empty,
    counting packets per known type, then prints the totals.

    Args:
        data_queue: Queue of packet dicts.
        stop_event: Event signalling that no more packets will be produced.
    """
    packet_count = dict.fromkeys(('ecg', 'emg', 'imu'), 0)

    while True:
        if stop_event.is_set() and data_queue.empty():
            break
        try:
            packet = data_queue.get(timeout=0.5)
        except queue.Empty:
            # Nothing arrived within the timeout; re-check the exit condition.
            continue

        kind = packet.get('packet_type', 'unknown')
        if kind in packet_count:
            packet_count[kind] += 1
        # Process packet here...
        data_queue.task_done()

    print(f"Processed packets: {packet_count}")
# Setup
import time  # needed for the timed run below

sb = sbp.SifiBridge()
sb.connect()
sb.configure_sensors(ecg=True, emg=True, imu=True)
sb.start()

# Create queue and threads
data_queue = queue.Queue(maxsize=100)
stop_event = threading.Event()
producer_thread = threading.Thread(target=producer, args=(sb, data_queue, stop_event))
consumer_thread = threading.Thread(target=consumer, args=(data_queue, stop_event))

# Start threads
producer_thread.start()
consumer_thread.start()

try:
    # Run for 30 seconds
    time.sleep(30)
finally:
    # Stop threads: signal first, then wait for both to finish. Running this
    # in `finally` ensures an interrupt during the sleep still shuts down
    # the workers and the device.
    stop_event.set()
    producer_thread.join()
    consumer_thread.join()

    # Cleanup
    sb.stop()
    sb.disconnect()
Memory Download Examples
Download Device Memory to CSV
Download stored data from device memory to CSV files.
import sifi_bridge_py as sbp
import time
# Setup with CSV output
sb = sbp.SifiBridge(publishers="csv://./memory_data/")

# Connect (retry once per second until the connection succeeds)
while not sb.connect(sbp.DeviceType.BIOPOINT_V1_3):
    print("Connecting...")
    time.sleep(1)

try:
    # Start memory download
    kb = sb.start_memory_download()
    print(f"Downloading {kb} kB from device memory...")

    # Wait for download to complete
    packets_received = 0
    start_time = time.time()
    while True:
        packet = sb.get_data()
        packets_received += 1
        if sb.is_memory_download_completed(packet):
            break
        # Print progress every 100 packets
        if packets_received % 100 == 0:
            # Clamp the elapsed time so a coarse clock cannot cause a
            # division by zero when packets arrive very quickly.
            elapsed = max(time.time() - start_time, 1e-6)
            rate = (packets_received * 0.227) / elapsed  # Approximate kBps
            print(f"Downloaded {packets_received} packets ({rate:.1f} kBps)")

    print("Download complete! Check ./memory_data/")
finally:
    # Release the device even if the download is interrupted or fails.
    sb.disconnect()
Download Memory to Python Lists
Download and store memory data in Python for immediate processing.
import sifi_bridge_py as sbp
# Setup
sb = sbp.SifiBridge()
sb.connect()

ecg_samples = []
packet_count = 0

try:
    # Enable only ECG for clarity
    sb.configure_sensors(ecg=True)

    # Start memory download
    kb = sb.start_memory_download()
    print(f"Downloading {kb} kB...")

    # Collect all ECG data until the bridge reports the download is complete
    while True:
        packet = sb.get_data()
        if sb.is_memory_download_completed(packet):
            break
        if packet['packet_type'] == 'ecg':
            ecg_samples.extend(packet['data']['ecg'])
            packet_count += 1
finally:
    # Release the device even if the download is interrupted or fails.
    sb.disconnect()

print(f"Downloaded {len(ecg_samples)} ECG samples in {packet_count} packets")
print(f"First 10 samples: {ecg_samples[:10]}")
Data Processing Examples
Real-Time Signal Processing
Apply filters and processing to incoming data.
import sifi_bridge_py as sbp
import numpy as np
from scipy import signal
class ECGProcessor:
    """Streaming 0.5-40 Hz Butterworth band-pass filter for ECG chunks.

    The filter state is carried between calls to process(), so consecutive
    chunks are filtered as one continuous signal.

    Attributes:
        fs: Sampling rate in Hz used to design the filter.
        b, a: Numerator / denominator coefficients of the filter.
        zi: Current filter state, updated by every process() call.
    """

    def __init__(self, fs=500):
        self.fs = fs
        # Design bandpass filter (0.5-40 Hz)
        self.b, self.a = signal.butter(4, [0.5, 40], 'bandpass', fs=fs)
        # State for a unit-step input; scaled by the first sample on first use.
        self.zi = signal.lfilter_zi(self.b, self.a)
        self._primed = False

    def process(self, ecg_samples):
        """Apply filtering to ECG samples.

        Args:
            ecg_samples: 1-D sequence of samples for the next chunk.

        Returns:
            numpy array with the filtered chunk (empty for empty input).
        """
        samples = np.asarray(ecg_samples, dtype=float)
        if samples.size == 0:
            # Nothing to filter; leave the filter state untouched.
            return samples

        if not self._primed:
            # lfilter_zi assumes a step of height 1. Scaling by the first
            # sample makes the filter start in steady state for the actual
            # signal level, which avoids a large start-up transient.
            self.zi = self.zi * samples[0]
            self._primed = True

        filtered, self.zi = signal.lfilter(
            self.b, self.a, samples, zi=self.zi
        )
        return filtered
# Setup
sb = sbp.SifiBridge()
sb.connect()
sb.configure_ecg(state=True, fs=500)
sb.start()

processor = ECGProcessor(fs=500)

try:
    # Process incoming data
    for _ in range(100):
        packet = sb.get_ecg()
        raw_ecg = packet['data']['ecg']
        filtered_ecg = processor.process(raw_ecg)
        # Do something with filtered data
        print(f"Raw mean: {np.mean(raw_ecg):.4f}, "
              f"Filtered mean: {np.mean(filtered_ecg):.4f}")
finally:
    # Stop streaming and disconnect even if processing fails part-way.
    sb.stop()
    sb.disconnect()
Calculate Heart Rate from PPG
Simple heart rate estimation from PPG data.
import sifi_bridge_py as sbp
import numpy as np
from scipy import signal
def estimate_heart_rate(ppg_data, fs=100):
    """Estimate heart rate from PPG signal.

    The signal is band-pass filtered to 0.5-4 Hz, peaks at least half a
    second apart are located, and the mean inter-beat interval is converted
    to beats per minute.

    Args:
        ppg_data: 1-D sequence of PPG samples (e.g. the IR channel).
        fs: Sampling rate of ``ppg_data`` in Hz.

    Returns:
        Heart rate in BPM, or None when the signal is too short to filter
        or fewer than two peaks are found.
    """
    # Use IR channel
    ir_signal = np.asarray(ppg_data, dtype=float)

    # Bandpass filter (0.5-4 Hz for heart rate)
    b, a = signal.butter(3, [0.5, 4], 'bandpass', fs=fs)

    # filtfilt pads the signal by 3 * max(len(a), len(b)) samples and raises
    # ValueError on shorter input; report "no estimate" instead.
    min_samples = 3 * max(len(a), len(b)) + 1
    if ir_signal.size < min_samples:
        return None

    filtered = signal.filtfilt(b, a, ir_signal)

    # Find peaks
    peaks, _ = signal.find_peaks(filtered, distance=fs*0.5)
    if len(peaks) < 2:
        return None

    # Calculate heart rate
    ibi = np.diff(peaks) / fs  # Inter-beat intervals in seconds
    hr = 60 / np.mean(ibi)
    return hr
# Setup
sb = sbp.SifiBridge()
sb.connect()
sb.configure_ppg(state=True, fs=100, ir=25, sens=sbp.PpgSensitivity.HIGH)
sb.start()

# Collect 10 seconds of data
ppg_buffer = []
packet_count = 0
target_packets = 100  # ~10 seconds at 100 Hz
try:
    while packet_count < target_packets:
        packet = sb.get_ppg()
        ppg_buffer.extend(packet['data']['ir'])
        packet_count += 1
finally:
    # Stop streaming and disconnect even if collection fails part-way.
    sb.stop()
    sb.disconnect()

# Estimate heart rate
hr = estimate_heart_rate(ppg_buffer, fs=100)
if hr is not None:
    print(f"Estimated heart rate: {hr:.1f} BPM")
else:
    # estimate_heart_rate returns None when it cannot produce an estimate
    print("Could not estimate heart rate from the collected data")
Advanced Examples
Multiple Device Management
Manage multiple devices simultaneously.
import sifi_bridge_py as sbp
# Create bridge
sb = sbp.SifiBridge()

# Addresses of the two physical devices (placeholders)
device1_addr = "XX:XX:XX:XX:XX:01"
device2_addr = "XX:XX:XX:XX:XX:02"

# Create one virtual device per physical device; device1 starts selected
sb.create_device("device1", select=True)
sb.create_device("device2", select=False)

# Connect device 1 (ECG)
sb.select_device("device1")
sb.connect(device1_addr)
sb.configure_ecg(state=True)
sb.start()

# Connect device 2 (EMG)
sb.select_device("device2")
sb.connect(device2_addr)
sb.configure_emg(state=True)
sb.start()

# Collect data from both
# NOTE(review): device2 (EMG only) is the selected device at this point and
# only get_ecg() is called; confirm that packets from device1 are still
# delivered here and whether EMG packets should also be read.
for _ in range(50):
    packet = sb.get_ecg()
    print(f"{packet['id']} ECG: {packet['data']['ecg'][:3]}")

# Cleanup: stop and disconnect each device in turn
for device_name in ("device1", "device2"):
    sb.select_device(device_name)
    sb.stop()
    sb.disconnect()
Lab Streaming Layer (LSL) Integration
Stream data via LSL for use with other applications.
import sifi_bridge_py as sbp
import time
# Create bridge with LSL enabled
sb = sbp.SifiBridge(use_lsl=True)

# Connect and configure
sb.connect()
sb.configure_sensors(ecg=True, emg=True, imu=True)

# Start acquisition (data automatically streams to LSL)
sb.start()
print("Streaming to LSL. Use LSL viewer to see outlets.")
print("Outlets: ECG, EMG, IMU, etc.")
print("Press Ctrl+C to stop...")

try:
    # Keep running
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("\nStopping...")
finally:
    # Runs on Ctrl+C and on any other exit path, so the device is always
    # stopped and disconnected.
    sb.stop()
    sb.disconnect()
Network Streaming
Stream data over TCP for remote monitoring.
import sifi_bridge_py as sbp
import time
# Create bridge with TCP output
sb = sbp.SifiBridge(publishers="tcp://0.0.0.0:5000")

# Connect and configure
sb.connect()
sb.configure_ecg(state=True, fs=500)

# Start streaming
sb.start()
print("Streaming ECG data to TCP port 5000")
print("Connect with: nc localhost 5000")
print("Press Ctrl+C to stop...")

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("\nStopping...")
finally:
    # Runs on Ctrl+C and on any other exit path, so the device is always
    # stopped and disconnected.
    sb.stop()
    sb.disconnect()
Auto-Reconnect on Disconnect
Automatically reconnect if connection is lost.
import sifi_bridge_py as sbp
import time
def connect_with_retry(sb, max_retries=10, retry_delay=2):
    """Try to connect with retries.

    Args:
        sb: Bridge object whose ``connect()`` returns a truthy value on success.
        max_retries: Maximum number of connection attempts.
        retry_delay: Seconds to wait between attempts.

    Returns:
        True as soon as an attempt succeeds, False once every attempt failed.
    """
    for attempt in range(max_retries):
        try:
            if sb.connect():
                print("Connected!")
                return True
            print(f"Retry {attempt + 1}/{max_retries}")
        except Exception as e:
            print(f"Error: {e}")
        # Wait only when another attempt follows; sleeping after the final
        # failure would just delay reporting it.
        if attempt + 1 < max_retries:
            time.sleep(retry_delay)
    return False
# Setup
sb = sbp.SifiBridge()
if not connect_with_retry(sb):
    print("Failed to connect")
    # exit() is an interactive-shell helper from the site module; raise
    # SystemExit directly so the script also works where it is unavailable.
    raise SystemExit(1)

sb.configure_ecg(state=True)
sb.start()

# Monitor connection and reconnect if needed
packet_timeout = 5.0  # Consider disconnected if no packet in 5s
try:
    while True:
        packet = sb.get_data(timeout=packet_timeout)
        if not packet:
            print("Connection lost! Reconnecting...")
            sb.disconnect()
            if connect_with_retry(sb):
                # Reapply the configuration after reconnecting
                sb.configure_ecg(state=True)
                sb.start()
            else:
                print("Failed to reconnect")
                break
        else:
            print(f"Received {packet['packet_type']}")
except KeyboardInterrupt:
    print("\nStopping...")
finally:
    # Runs on Ctrl+C, on a failed reconnect and on unexpected errors.
    sb.stop()
    sb.disconnect()
Complete Application Example
Full-Featured ECG Recorder
A complete application with error handling, configuration, and data export.
import sifi_bridge_py as sbp
import time
import json
from pathlib import Path
import logging
class ECGRecorder:
    """Record ECG from a SiFi device and export the result as JSON.

    Attributes:
        output_dir: Directory (created on construction) that save() writes to.
        sb: The SifiBridge instance, or None until connect() is called.
        recording: True while record() is acquiring data.
        logger: Logger used for all status messages.
    """

    def __init__(self, output_dir="./recordings"):
        self.output_dir = Path(output_dir)
        # parents=True so nested output paths (e.g. "data/ecg/run1") work.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.sb = None
        self.recording = False
        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def connect(self, device_type=sbp.DeviceType.BIOPOINT_V1_3, max_retries=5):
        """Connect to device with retries.

        Args:
            device_type: Device selector passed to ``SifiBridge.connect``.
            max_retries: Maximum number of connection attempts.

        Returns:
            True once an attempt succeeds, False if every attempt failed.
        """
        self.sb = sbp.SifiBridge()
        for attempt in range(max_retries):
            try:
                if self.sb.connect(device_type):
                    self.logger.info("Connected successfully")
                    return True
                self.logger.warning(f"Connection attempt {attempt + 1} failed")
            except Exception as e:
                self.logger.error(f"Connection error: {e}")
            # Pause only between attempts, not after the final failure.
            if attempt + 1 < max_retries:
                time.sleep(2)
        return False

    def configure(self, fs=500, apply_filters=True):
        """Configure ECG sensor.

        Args:
            fs: Sampling rate in Hz.
            apply_filters: Enables the DC notch, the 60 Hz mains notch and
                the 0.5-40 Hz band-pass when True.

        Raises:
            RuntimeError: If connect() has not been called yet.
        """
        if not self.sb:
            raise RuntimeError("Not connected")
        self.sb.configure_ecg(
            state=True,
            fs=fs,
            dc_notch=apply_filters,
            mains_notch=60 if apply_filters else None,
            bandpass=apply_filters,
            flo=0.5,
            fhi=40,
        )
        self.logger.info(f"Configured ECG: fs={fs} Hz, filters={apply_filters}")

    def record(self, duration_sec):
        """Record ECG for specified duration.

        Args:
            duration_sec: Recording length in seconds.

        Returns:
            Tuple ``(ecg_data, timestamps)``: the concatenated samples and
            one timestamp per received packet. A Ctrl+C ends the recording
            early and returns what was collected so far.

        Raises:
            RuntimeError: If connect() has not been called yet.
        """
        if not self.sb:
            raise RuntimeError("Not connected")

        # Start acquisition
        self.sb.start()
        self.recording = True
        self.logger.info(f"Recording for {duration_sec} seconds...")

        # Collect data
        ecg_data = []
        timestamps = []
        start_time = time.time()
        last_logged_sec = None  # last whole second a progress line was logged
        try:
            while time.time() - start_time < duration_sec:
                packet = self.sb.get_ecg()
                if not packet:
                    # No packet was returned; try again.
                    continue
                ecg_data.extend(packet['data']['ecg'])
                timestamps.append(packet['timestamp'])

                # Progress update: once per 5-second mark, not once per
                # packet received during that second.
                elapsed = time.time() - start_time
                whole_sec = int(elapsed)
                if whole_sec % 5 == 0 and whole_sec != last_logged_sec:
                    last_logged_sec = whole_sec
                    self.logger.info(f"Recording: {elapsed:.0f}s / {duration_sec}s")
        except KeyboardInterrupt:
            self.logger.warning("Recording interrupted by user")
        finally:
            self.sb.stop()
            self.recording = False

        self.logger.info(f"Recording complete: {len(ecg_data)} samples")
        return ecg_data, timestamps

    def save(self, ecg_data, timestamps, filename=None):
        """Save data to file.

        Args:
            ecg_data: Sequence of ECG samples.
            timestamps: Sequence of packet timestamps.
            filename: Target file name inside ``output_dir``; defaults to
                ``ecg_<unix time>.json``.

        Returns:
            Path of the written JSON file.
        """
        if filename is None:
            filename = f"ecg_{int(time.time())}.json"
        filepath = self.output_dir / filename

        data = {
            'ecg_data': ecg_data,
            'timestamps': timestamps,
            'sample_count': len(ecg_data),
            # Span between first and last packet timestamp; 0 when empty
            'duration_sec': timestamps[-1] - timestamps[0] if timestamps else 0,
        }
        with open(filepath, 'w') as f:
            json.dump(data, f)

        self.logger.info(f"Saved to {filepath}")
        return filepath

    def disconnect(self):
        """Disconnect from device, stopping an active recording first."""
        if self.sb:
            if self.recording:
                self.sb.stop()
            self.sb.disconnect()
            self.logger.info("Disconnected")
# Usage
if __name__ == "__main__":
    recorder = ECGRecorder(output_dir="./my_recordings")
    try:
        # Connect
        if not recorder.connect():
            print("Failed to connect")
            # exit() is an interactive-shell helper from the site module;
            # raising SystemExit works everywhere and still runs `finally`.
            raise SystemExit(1)

        # Configure
        recorder.configure(fs=500, apply_filters=True)

        # Record
        ecg_data, timestamps = recorder.record(duration_sec=30)

        # Save
        recorder.save(ecg_data, timestamps)
    finally:
        # Release the device on success, failure and early exit
        recorder.disconnect()