Advanced Topics
Advanced features, optimization techniques, and best practices for SiFi Bridge Python.
Threading and Concurrency
Why Use Threading?
SiFi Bridge Python's get_data() method blocks until data arrives. For responsive applications, use threading to:
- Collect data in the background
- Update UI while recording
- Perform real-time processing
- Handle multiple devices simultaneously
Thread-Safe Operations
Most SifiBridge operations are thread-safe when accessing the same instance, but be careful when modifying configuration from multiple threads:
# ✓ Safe: Reading from same instance
def reader_thread(sb):
    """Fetch packets from ``sb`` and pass each one to ``process``."""
    while running:
        process(sb.get_data())


# ✗ Unsafe: Modifying configuration from multiple threads
def config_thread_1(sb):
    """Anti-pattern: changes ECG configuration from a worker thread."""
    sb.configure_ecg(state=True)  # Don't do this!


def config_thread_2(sb):
    """Anti-pattern: changes EMG configuration from a second worker thread."""
    sb.configure_emg(state=True)  # Simultaneously from another thread
Best Practice: Configure the device from the main thread, then use worker threads only for data collection.
Recommended Threading Patterns
Pattern 1: Single Worker Thread
Simple background data collection:
import threading
import sifi_bridge_py as sbp
class DataCollector:
    """Collect packets from a SiFi Bridge device on a background thread.

    Packets returned by ``get_data`` are appended to ``self.data`` while
    holding ``self.lock``; other threads should read them through
    :meth:`get_data_copy`.
    """

    def __init__(self):
        self.sb = sbp.SifiBridge()
        self.running = False
        self.data = []
        self.lock = threading.Lock()
        # Set by start(). None means "no worker running", which lets stop()
        # be called before start() or more than once without raising.
        self.thread = None

    def worker(self):
        """Thread target: poll for packets until ``running`` is cleared."""
        while self.running:
            # The timeout bounds how long the loop takes to notice stop().
            packet = self.sb.get_data(timeout=1.0)
            if packet:
                with self.lock:
                    self.data.append(packet)

    def start(self):
        """Connect, enable ECG, start acquisition and launch the worker.

        Calling ``start()`` while a worker is already running is a no-op.

        Raises:
            RuntimeError: If ``connect()`` reports failure.
        """
        if self.thread is not None:
            return
        if not self.sb.connect():
            raise RuntimeError("Failed to connect")
        self.sb.configure_ecg(state=True)
        self.sb.start()
        self.running = True
        self.thread = threading.Thread(target=self.worker, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the worker, then stop acquisition and disconnect.

        Does nothing if the collector was never started.
        """
        self.running = False
        worker_thread = self.thread
        if worker_thread is None:
            return
        # Wait for the worker to leave get_data() before touching the device.
        worker_thread.join()
        self.thread = None
        self.sb.stop()
        self.sb.disconnect()

    def get_data_copy(self):
        """Return a shallow copy of the packets collected so far."""
        with self.lock:
            return self.data.copy()
Pattern 2: Producer-Consumer with Queue
For processing-heavy applications:
import threading
import queue
import sifi_bridge_py as sbp
def producer(sb, q, stop_event):
    """Collect packets from ``sb`` and enqueue them until ``stop_event`` is set.

    A packet that cannot be enqueued within 0.5 s is dropped (and reported)
    so that a slow consumer does not stall acquisition.

    Args:
        sb: Bridge object providing ``get_data(timeout=...)``.
        q: ``queue.Queue`` that receives the packets.
        stop_event: ``threading.Event`` signalling the loop to exit.
    """
    while not stop_event.is_set():
        try:
            packet = sb.get_data(timeout=0.5)
            if packet:
                q.put(packet, timeout=0.5)
        except queue.Full:
            print("Queue full, dropping packet")
        except Exception as e:
            print(f"Producer error: {e}")
            # Back off so a read that keeps failing does not turn into a
            # busy loop; wait() returns immediately once stop is requested.
            stop_event.wait(0.5)
def consumer(q, stop_event):
    """Process packets from ``q`` until stopped and the queue has drained.

    Args:
        q: ``queue.Queue`` filled by the producer.
        stop_event: ``threading.Event``; once set, the loop exits as soon as
            the queue is empty.
    """
    while not stop_event.is_set() or not q.empty():
        try:
            packet = q.get(timeout=0.5)
        except queue.Empty:
            continue
        try:
            # Heavy processing here
            process_packet(packet)
        except Exception as e:
            # One bad packet should not kill the consumer thread.
            print(f"Consumer error: {e}")
        finally:
            # Always balance the get() so q.join() cannot hang.
            q.task_done()
# Setup
sb = sbp.SifiBridge()
if not sb.connect():
    raise RuntimeError("Failed to connect")
sb.configure_ecg(state=True)
sb.start()

# Create queue and events
data_queue = queue.Queue(maxsize=1000)
stop_event = threading.Event()

# Start threads
threads = [
    threading.Thread(target=producer, args=(sb, data_queue, stop_event)),
    threading.Thread(target=consumer, args=(data_queue, stop_event)),
]
for t in threads:
    t.start()

# Run for a while...
import time
try:
    time.sleep(60)
finally:
    # Cleanup runs even if the sleep is interrupted (e.g. Ctrl+C), so the
    # non-daemon threads are released and the device stops streaming.
    stop_event.set()
    for t in threads:
        t.join()
    sb.stop()
    sb.disconnect()
Pattern 3: Thread Pool for Multiple Devices
Manage multiple devices efficiently:
from concurrent.futures import ThreadPoolExecutor
import sifi_bridge_py as sbp
def manage_device(device_name, device_addr, duration):
    """Worker function for one device.

    Args:
        device_name: Name passed to ``create_device``.
        device_addr: Address passed to ``connect``.
        duration: How long to collect ECG packets, in seconds.

    Returns:
        The list of packets returned by ``get_ecg``, or ``None`` if the
        connection failed.
    """
    import time

    sb = sbp.SifiBridge()
    sb.create_device(device_name)
    if not sb.connect(device_addr):
        return None

    data = []
    try:
        sb.configure_ecg(state=True)
        sb.start()
        # Collect data. A monotonic clock keeps the duration correct even if
        # the wall clock is adjusted while recording.
        deadline = time.monotonic() + duration
        while time.monotonic() < deadline:
            data.append(sb.get_ecg())
    finally:
        # Release the device even if configuration or a read raised.
        try:
            sb.stop()
        finally:
            sb.disconnect()
    return data
# Collect from 3 devices simultaneously
devices = [
    ("device1", "XX:XX:XX:XX:XX:01"),
    ("device2", "XX:XX:XX:XX:XX:02"),
    ("device3", "XX:XX:XX:XX:XX:03"),
]

with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit one 30-second job per device, then wait for all of them.
    futures = []
    for name, addr in devices:
        futures.append(executor.submit(manage_device, name, addr, 30))
    results = [f.result() for f in futures]

# Report only the devices that returned data.
for i, data in enumerate(results):
    if not data:
        continue
    print(f"Device {i+1}: {len(data)} packets")
Performance Optimization
Reduce Latency
1. Use timeout parameter:
# Don't block indefinitely
packet = sb.get_data(timeout=1.0)
# Check if data received
if packet:
process(packet)
2. Enable low-latency mode (if supported):
sb.set_low_latency_mode(True)
3. Optimize sensor configuration:
# Lower sampling rates = faster packets
sb.configure_ecg(fs=250) # Instead of 500 Hz
# Disable unused sensors
sb.configure_sensors(ecg=True) # Only what you need
Reduce Memory Usage
1. Process data incrementally:
# Bad: Accumulate everything in memory
all_data = []
for _ in range(10000):
packet = sb.get_ecg()
all_data.append(packet) # Memory grows!
# Good: Process and discard
for _ in range(10000):
packet = sb.get_ecg()
processed = process_packet(packet)
save_to_disk(processed) # Don't keep in memory
2. Use generators for large datasets:
def data_generator(sb, num_packets=None):
    """Yield ECG packets from ``sb`` one at a time.

    Args:
        sb: Bridge object providing ``get_ecg()``.
        num_packets: Number of packets to yield. ``None`` streams without
            limit; zero or a negative value yields nothing.

    Yields:
        Whatever ``sb.get_ecg()`` returns, once per iteration.
    """
    remaining = num_packets
    while remaining is None or remaining > 0:
        yield sb.get_ecg()
        if remaining is not None:
            remaining -= 1
# Process without loading all into memory
for packet in data_generator(sb, 10000):
process_packet(packet)
3. Limit queue sizes:
# Bounded queue prevents memory explosion
data_queue = queue.Queue(maxsize=100)
Maximize Throughput
1. Use multiple outputs efficiently:
# Let sifibridge handle file I/O
sb = sbp.SifiBridge(publishers="csv://./data/")
# Just read and process, don't worry about saving
while True:
packet = sb.get_data()
display_realtime(packet) # CSV handled automatically
2. Batch processing:
# Collect batches before processing
batch_size = 100
batch = []
while True:
packet = sb.get_ecg()
batch.append(packet)
if len(batch) >= batch_size:
process_batch(batch) # Process 100 at once
batch.clear()
Error Handling and Robustness
Connection Resilience
Handle connection drops gracefully:
import sifi_bridge_py as sbp
import time
class RobustConnection:
    """Wrap a SifiBridge with retrying connects and reconnect-on-failure reads.

    Attributes are plain: ``sb`` is the current bridge (or ``None``),
    ``connected`` is the last known connection state.
    """

    def __init__(self, max_retries=5, retry_delay=2):
        self.sb = None
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.connected = False

    def connect(self):
        """Connect with automatic retries.

        Returns:
            True as soon as one attempt succeeds, False once ``max_retries``
            attempts have failed.
        """
        for attempt in range(self.max_retries):
            try:
                self.sb = sbp.SifiBridge()
                if self.sb.connect():
                    self.connected = True
                    return True
                print(f"Connection attempt {attempt + 1} failed")
            except ConnectionError as e:
                print(f"Bluetooth error: {e}")
            except Exception as e:
                print(f"Unexpected error: {e}")
            time.sleep(self.retry_delay)
        self.connected = False
        return False

    def ensure_connected(self):
        """Reconnect if connection lost.

        Returns:
            True if the bridge reports it is connected or a reconnect
            succeeded, otherwise False.
        """
        if not self.connected:
            return self.connect()
        try:
            # Test connection
            info = self.sb.show()
            alive = bool(info.get('connected', False))
        except Exception:
            # A failing status query is treated the same as a lost link.
            alive = False
        if alive:
            return True
        # Record the drop and try to recover instead of only reporting it.
        self.connected = False
        return self.connect()

    def get_data_robust(self, timeout=5.0):
        """Get data with automatic reconnection.

        Returns:
            The packet from ``get_data``, or ``None`` if the device could not
            be reached or the read raised.
        """
        if not self.ensure_connected():
            return None
        try:
            return self.sb.get_data(timeout=timeout)
        except Exception as e:
            print(f"Error getting data: {e}")
            # Force a reconnect attempt on the next call.
            self.connected = False
            return None
Graceful Shutdown
Always cleanup properly:
import sifi_bridge_py as sbp
import signal
import sys
class Application:
    """Acquisition loop that shuts the device down cleanly on exit or signal."""

    def __init__(self):
        self.sb = sbp.SifiBridge()
        self.running = False
        # Handle Ctrl+C
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Signal callback: clean up, then exit the process."""
        print("\nShutting down gracefully...")
        self.shutdown()
        sys.exit(0)

    def shutdown(self):
        """Proper cleanup.

        Safe to call more than once: the signal handler's ``sys.exit`` also
        triggers the ``finally`` in :meth:`run`, so this is reached twice.
        """
        if not (self.sb and self.running):
            return
        # Clear the flag first so the second call above is a no-op.
        self.running = False
        try:
            self.sb.stop()
            self.sb.disconnect()
            print("Disconnected successfully")
        except Exception as e:
            print(f"Error during shutdown: {e}")

    def run(self):
        """Connect, start ECG acquisition and read packets until stopped."""
        try:
            self.sb.connect()
            self.sb.configure_ecg(state=True)
            self.sb.start()
            self.running = True
            while self.running:
                packet = self.sb.get_data()
                # Process...
        except Exception as e:
            print(f"Error: {e}")
        finally:
            self.shutdown()
# Usage
app = Application()
app.run()
Data Validation
Validate packets before processing:
def validate_packet(packet):
    """Validate data packet structure.

    Args:
        packet: The packet to check; ``None`` or an empty packet is rejected.

    Returns:
        True if all required keys are present and, for ``'ecg'`` packets,
        ``packet['data']['ecg']`` exists and is non-empty. An unexpected
        ``status`` only prints a warning; it does not invalidate the packet.
    """
    required_keys = ['id', 'packet_type', 'timestamp', 'status', 'data']

    # A read that timed out may hand back nothing at all.
    if not packet:
        return False

    # Check structure
    if not all(key in packet for key in required_keys):
        return False

    # Check status
    if packet['status'] not in ['ok', 'lost_data']:
        print(f"Warning: packet status = {packet['status']}")

    # Check data completeness
    if packet['packet_type'] == 'ecg':
        payload = packet['data']
        # Guard against a missing/None payload before indexing into it.
        if not isinstance(payload, dict) or 'ecg' not in payload:
            return False
        if len(payload['ecg']) == 0:
            return False

    return True
# Use in data collection
while True:
packet = sb.get_data()
if validate_packet(packet):
process(packet)
else:
print("Invalid packet received")
Multiple Devices
Managing Multiple Physical Devices
Use virtual device managers:
import sifi_bridge_py as sbp
# Create bridge
sb = sbp.SifiBridge()

# Setup device 1
sb.create_device("participant1")
sb.select_device("participant1")
sb.connect("XX:XX:XX:XX:XX:01")
sb.configure_ecg(state=True, fs=500)

# Setup device 2
sb.create_device("participant2")
sb.select_device("participant2")
sb.connect("XX:XX:XX:XX:XX:02")
sb.configure_emg(state=True, fs=2000)

try:
    # Start both
    sb.select_device("participant1")
    sb.start()
    sb.select_device("participant2")
    sb.start()

    # Collect from both
    for _ in range(100):
        # Get from device 1
        sb.select_device("participant1")
        p1_packet = sb.get_ecg()

        # Get from device 2
        sb.select_device("participant2")
        p2_packet = sb.get_emg()

        print(f"P1: {len(p1_packet['data']['ecg'])} samples, "
              f"P2: {len(p2_packet['data']['emg'])} samples")
finally:
    # Cleanup runs even if acquisition raised, so neither device is left
    # streaming.
    for device in ["participant1", "participant2"]:
        sb.select_device(device)
        sb.stop()
        sb.disconnect()
Synchronization Between Devices
Use timestamps for synchronization:
import numpy as np
def synchronize_data(packets1, packets2):
    """Align data from two devices by timestamp.

    Args:
        packets1: Packets from the first device, each with a ``'timestamp'``.
        packets2: Packets from the second device, each with a ``'timestamp'``.

    Returns:
        A tuple ``(sync1, sync2)`` holding the packets of each input whose
        timestamp lies in the time range covered by both inputs, in their
        original order. Both lists are empty if either input is empty or
        the ranges do not overlap.
    """
    # No overlap is possible without data on both sides.
    if len(packets1) == 0 or len(packets2) == 0:
        return [], []

    # Extract timestamps
    t1 = [p['timestamp'] for p in packets1]
    t2 = [p['timestamp'] for p in packets2]

    # Find common time range. min/max instead of first/last so packets that
    # arrived out of order still produce the correct window.
    t_start = max(min(t1), min(t2))
    t_end = min(max(t1), max(t2))

    # Filter to common range
    sync1 = [p for p in packets1 if t_start <= p['timestamp'] <= t_end]
    sync2 = [p for p in packets2 if t_start <= p['timestamp'] <= t_end]
    return sync1, sync2
Integration with Analysis Tools
NumPy Integration
import sifi_bridge_py as sbp
import numpy as np
sb = sbp.SifiBridge()
sb.connect()
sb.configure_ecg(state=True, fs=500)
sb.start()

# Collect into a flat list of samples
ecg_list = []
try:
    for _ in range(100):
        packet = sb.get_ecg()
        ecg_list.extend(packet['data']['ecg'])
finally:
    # Release the device even if a read fails; analysis below is offline.
    sb.stop()
    sb.disconnect()

# Convert to numpy array
ecg_array = np.array(ecg_list)

# Analyze
print(f"Mean: {np.mean(ecg_array):.4f}")
print(f"Std: {np.std(ecg_array):.4f}")
print(f"Min: {np.min(ecg_array):.4f}, Max: {np.max(ecg_array):.4f}")
Pandas Integration
import sifi_bridge_py as sbp
import pandas as pd
sb = sbp.SifiBridge()
sb.connect()
sb.configure_sensors(ecg=True, imu=True)
sb.start()

# Collect data: one record per ECG sample, tagged with its packet timestamp
data_records = []
try:
    for _ in range(100):
        packet = sb.get_data()
        if packet['packet_type'] == 'ecg':
            for sample in packet['data']['ecg']:
                data_records.append({
                    'timestamp': packet['timestamp'],
                    'type': 'ecg',
                    'value': sample
                })
finally:
    # Release the device even if a read or a malformed packet raised.
    sb.stop()
    sb.disconnect()

# Create DataFrame
df = pd.DataFrame(data_records)
print(df.head())
print(df.describe())

# Save to CSV
df.to_csv('ecg_data.csv', index=False)
Matplotlib Real-Time Plotting
import sifi_bridge_py as sbp
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque
# Setup
sb = sbp.SifiBridge()
sb.connect()
sb.configure_ecg(state=True, fs=500)
sb.start()

# Data buffer: the deque discards the oldest samples once it is full
buffer_size = 2000
ecg_buffer = deque(maxlen=buffer_size)

# Setup plot
fig, ax = plt.subplots()
line, = ax.plot([], [])
ax.set_ylim(-0.5, 0.5)
ax.set_xlim(0, buffer_size)
ax.set_xlabel('Samples')
ax.set_ylabel('ECG (mV)')


def update(frame):
    """Animation update function: append the next packet and redraw."""
    packet = sb.get_ecg()
    ecg_buffer.extend(packet['data']['ecg'])
    line.set_data(range(len(ecg_buffer)), list(ecg_buffer))
    return line,


try:
    # Animate (keep a reference to the animation while the window is open)
    ani = animation.FuncAnimation(fig, update, interval=50, blit=True)
    plt.show()
finally:
    # Cleanup runs even if the window loop is interrupted.
    sb.stop()
    sb.disconnect()
Best Practices
Do's
✓ Use threading for responsive applications
# Good: Non-blocking data collection
thread = threading.Thread(target=collect_data)
✓ Always use try-finally for cleanup
try:
sb.start()
collect_data()
finally:
sb.stop()
sb.disconnect()
✓ Validate data before processing
if packet and packet['status'] == 'ok':
process(packet)
✓ Use timeouts to prevent blocking
packet = sb.get_data(timeout=5.0)
✓ Configure once, collect many times
# Good: Configure once
sb.configure_ecg(state=True, fs=500)
for _ in range(1000):
packet = sb.get_ecg()
Don'ts
✗ Don't ignore connection failures
# Bad
sb.connect() # Might fail silently
# Good
if not sb.connect():
raise RuntimeError("Failed to connect")
✗ Don't reconfigure during acquisition
# Bad
sb.start()
sb.configure_ecg(fs=1000) # Don't do this!
✗ Don't accumulate unlimited data
# Bad: Will run out of memory
all_data = []
while True:
all_data.append(sb.get_data()) # No limit!
✗ Don't forget to stop before disconnect
# Bad
sb.disconnect() # Device still streaming!
# Good
sb.stop()
sb.disconnect()
Troubleshooting
High Latency
Problem: Data arrives slowly or with delays
Solutions:
- Enable low-latency mode:
sb.set_low_latency_mode(True)
- Reduce sampling rates
- Check Bluetooth signal strength
- Increase BLE power:
sb.set_ble_power(sbp.BleTxPower.HIGH)
Lost Data Packets
Problem: packet['status'] == 'lost_data'
Solutions:
- Increase BLE transmission power
- Reduce distance between device and computer
- Disable other Bluetooth devices
- Use lower sampling rates
- Process data faster (use threading)
Memory Issues
Problem: Program runs out of memory
Solutions:
- Process data incrementally, don't accumulate
- Use bounded queues:
queue.Queue(maxsize=100)
- Write to disk instead of storing in RAM
- Use generators instead of lists
Connection Drops
Problem: Device disconnects randomly
Solutions:
- Check battery level
- Increase BLE power
- Reduce interference (WiFi, other BLE devices)
- Implement auto-reconnect logic
- Check device firmware version