Federated Learning Coordinator User Manual
The Federated Learning Coordinator enables healthcare organizations to collaboratively improve AI diagnosis models while keeping raw patient data within each organization.
Overview
The Federated Learning Coordinator allows hospitals to:
- Participate in collaborative AI training without sharing patient data
- Improve diagnosis accuracy across organizations
- Maintain HIPAA/GDPR compliance
- Track privacy budget and differential privacy guarantees
When to Use
Use Federated Learning Coordinator when:
- ✅ Multiple hospitals want to improve diagnosis AI together
- ✅ You have sensitive patient data that cannot be shared
- ✅ You want to contribute to healthcare AI research
- ✅ You need HIPAA-compliant collaborative learning
Do not use for:
- ❌ Real-time clinical decision support
- ❌ Individual patient diagnosis (use Eligibility/Prescription instead)
- ❌ Non-medical data analysis
Data Privacy Guarantees
What Stays Private
- ✅ Raw patient medical data (never leaves your organization)
- ✅ Patient identities
- ✅ Diagnoses
- ✅ Medications
- ✅ Medical history details
What is Shared
- Only encrypted gradients (mathematical model updates) leave your organization
- Gradients have differential privacy noise added before they are shared
- No plaintext patient information is ever transmitted
How It Works
Simple Explanation
Step 1: Your hospital has patient data
Step 2: Compute what to learn from that data (gradients)
Step 3: Encrypt the learning
Step 4: Add noise to protect privacy
Step 5: Send encrypted learning to coordinator
Step 6: Coordinator averages all hospitals' learning
Step 7: Improved model distributed back to all
Step 8: Repeat monthly
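A minimal sketch of the client-side flow above, assuming the gradient is computed locally, clipped (a standard prerequisite for the Gaussian mechanism, though not listed as a separate step), perturbed with Gaussian noise, and only then encrypted for transport. The helper name, clipping bound, and noise scale are illustrative, not part of the coordinator API:

import json
import numpy as np
from cryptography.fernet import Fernet

def prepare_update(gradient, cipher, clip_norm=1.0, sigma=1.0):
    """Clip a local gradient, add Gaussian noise, and encrypt it for transport."""
    # Bound any single patient's influence on the update (clipping)
    clipped = gradient * min(1.0, clip_norm / (np.linalg.norm(gradient) + 1e-12))
    # Add Gaussian noise calibrated to the clipping bound (differential privacy)
    noisy = clipped + np.random.normal(0.0, sigma * clip_norm, size=clipped.shape)
    # Encrypt before anything leaves the hospital network
    return cipher.encrypt(json.dumps(noisy.tolist()).encode()).decode()

# Example with a toy 4-dimensional gradient
cipher = Fernet(Fernet.generate_key())
encrypted_update = prepare_update(np.array([0.2, -0.1, 0.05, 0.3]), cipher)

The actual submission uses the request format documented under API Usage below.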
Privacy Mathematics
- Differential Privacy: bounds how much any single patient's record can influence the shared updates, so individual data cannot be reliably reconstructed
- zCDP: zero-concentrated differential privacy provides tight, composable privacy-loss accounting across rounds
- Gaussian Noise: noise calibrated to the gradient sensitivity enforces the privacy guarantee regardless of gradient content
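For reference, the standard zCDP bounds behind these guarantees (a summary of the usual Gaussian-mechanism accounting, not a statement about this coordinator's exact accountant):
- A Gaussian mechanism with L2 sensitivity Δ and noise scale σ satisfies ρ-zCDP with ρ = Δ² / (2σ²)
- Privacy loss composes additively across rounds: ρ_total = ρ₁ + ρ₂ + … + ρ_T
- ρ-zCDP converts to (ε, δ)-differential privacy with ε ≤ ρ + 2·√(ρ·ln(1/δ)) for any δ > 0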
API Usage
Endpoint
POST /submit-federated-update
Request Format
{
  "patient_data_list": [
    {
      "patient_id": "PATIENT_001",
      "encrypted_data": "gAAAAABlmZ...",
      "ipfs_cid": "QmV5koooi...",
      "data_hash": "a1b2c3d4..."
    },
    {
      "patient_id": "PATIENT_002",
      "encrypted_data": "gAAAAABlnZ...",
      "ipfs_cid": "QmV5koooj...",
      "data_hash": "b2c3d4e5..."
    }
  ],
  "round_number": 5
}
Response Format
{
  "round": 5,
  "participants": 3,
  "model_hash": "abc123def456..."
}
Response Fields
| Field | Type | Description |
|---|---|---|
| round | Integer | Training round number completed |
| participants | Integer | Number of hospitals that submitted data |
| model_hash | String | SHA256 hash of the updated model (for verification) |
Step-by-Step Setup
Step 1: Generate Encryption Keys
from cryptography.fernet import Fernet
# Generate and securely store encryption key
encryption_key = Fernet.generate_key()
# IMPORTANT: Save this key securely
with open('/secure/location/encryption_key.bin', 'wb') as f:
    f.write(encryption_key)
print("Encryption key generated and saved")
Step 2: Prepare Patient Data Batch
import json
import hashlib
from cryptography.fernet import Fernet
# Load encryption key
with open('/secure/location/encryption_key.bin', 'rb') as f:
    encryption_key = f.read()
cipher = Fernet(encryption_key)
# Prepare patient batch (from your EHR)
patients = [
    {
        "patient_id": "PATIENT_001",
        "age": 45,
        "symptoms": ["fever", "cough"],
        "vitals": {"bp": "120/80", "hr": 72},
        "labs": {"wbc": 12000},
        "diagnosis": "respiratory_infection"
    },
    {
        "patient_id": "PATIENT_002",
        "age": 62,
        "symptoms": ["chest_pain", "shortness_of_breath"],
        "vitals": {"bp": "140/90", "hr": 88},
        "labs": {"troponin": 0.02},
        "diagnosis": "acute_coronary_syndrome"
    }
]
# Prepare data for submission
patient_data_list = []
for patient in patients:
    # Encrypt medical data
    encrypted = cipher.encrypt(
        json.dumps(patient).encode()
    ).decode()
    # Compute hash for integrity
    data_hash = hashlib.sha256(
        json.dumps(patient).encode()
    ).hexdigest()
    # Add to list
    patient_data_list.append({
        "patient_id": patient["patient_id"],
        "encrypted_data": encrypted,
        "ipfs_cid": "QmV5koooi...",  # Optional: upload to IPFS first
        "data_hash": data_hash
    })
print(f"Prepared {len(patient_data_list)} patients for training")
Step 3: Submit to Federated Learning Coordinator
import requests
api_url = "http://localhost:8000/submit-federated-update"
payload = {
"patient_data_list": patient_data_list,
"round_number": 5
}
response = requests.post(api_url, json=payload)
response.raise_for_status()  # Surface HTTP errors instead of parsing a failed response
result = response.json()
print(f"Round {result['round']} completed")
print(f"Participants: {result['participants']}")
print(f"Model hash: {result['model_hash']}")
Step 4: Store and Verify Results
# Store model hash for verification
with open('model_history.log', 'a') as f:
    f.write(f"Round {result['round']}: {result['model_hash']}\n")
# Verify consistency with other hospitals
# (All hospitals should report same model_hash for same round)
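A minimal verification sketch for the comment above, assuming peer hashes are exchanged out of band (the peer_hashes dictionary is a placeholder, not a coordinator feature):

def verify_round_consistency(local_hash, peer_hashes):
    """Check that every participating hospital reports the same model hash."""
    mismatched = {name: h for name, h in peer_hashes.items() if h != local_hash}
    if mismatched:
        print(f"WARNING: model hash mismatch from {sorted(mismatched)}")
        return False
    print("All participants agree on the model hash")
    return True

# Example usage with hashes reported by two peer hospitals
verify_round_consistency(
    local_hash=result['model_hash'],
    peer_hashes={"hospital_b": "abc123def456...", "hospital_c": "abc123def456..."}
)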
Integration with Your Hospital System
EHR Integration
import schedule
import time
from datetime import datetime

import json
import hashlib
import requests
from cryptography.fernet import Fernet

class FederatedLearningIntegration:
    def __init__(self, api_url, encryption_key_path):
        self.api_url = api_url
        self.encryption_key_path = encryption_key_path
        self.round_number = 1

    def run_training_round(self):
        """Execute one federated learning training round."""
        print(f"[{datetime.now()}] Starting FL round {self.round_number}")
        # Step 1: Extract patients from EHR
        patients = self.extract_from_ehr(
            num_patients=100,
            conditions=["respiratory", "cardiac"]
        )
        # Step 2: Prepare encrypted data
        patient_data = self.prepare_encrypted_data(patients)
        # Step 3: Submit to coordinator
        try:
            result = self.submit_federated_update(
                patient_data_list=patient_data,
                round_number=self.round_number
            )
            print(f"Round {result['round']} succeeded")
            print(f"Model hash: {result['model_hash']}")
            self.round_number += 1
        except Exception as e:
            print(f"Round failed: {e}")
            # Implement retry logic
            self.retry_round()

    def retry_round(self):
        """Retry hook; implement backoff/retry appropriate for your environment."""
        pass

    def extract_from_ehr(self, num_patients, conditions):
        """Extract patient data from EHR system."""
        # Implementation specific to your EHR
        pass

    def prepare_encrypted_data(self, patients):
        """Encrypt and prepare patient data."""
        # Load key
        with open(self.encryption_key_path, 'rb') as f:
            encryption_key = f.read()
        cipher = Fernet(encryption_key)
        # Encrypt each patient
        data_list = []
        for patient in patients:
            encrypted = cipher.encrypt(
                json.dumps(patient).encode()
            ).decode()
            data_list.append({
                "patient_id": patient['id'],
                "encrypted_data": encrypted,
                "ipfs_cid": "...",
                "data_hash": hashlib.sha256(
                    json.dumps(patient).encode()
                ).hexdigest()
            })
        return data_list

    def submit_federated_update(self, patient_data_list, round_number):
        """Submit encrypted data to coordinator."""
        response = requests.post(
            self.api_url,
            json={
                "patient_data_list": patient_data_list,
                "round_number": round_number
            }
        )
        return response.json()

# Schedule weekly training rounds
fl_integration = FederatedLearningIntegration(
    api_url="http://localhost:8000/submit-federated-update",
    encryption_key_path="/secure/encryption_key.bin"
)
# Run every Monday at 2 AM
schedule.every().monday.at("02:00").do(fl_integration.run_training_round)
while True:
    schedule.run_pending()
    time.sleep(60)
Privacy Budget Tracking
Understanding Privacy Budget
Privacy budget represents the total allowed privacy loss across all training rounds.
Total Privacy Budget = 1.0 (ε)
Round 1: Spend 0.1 ε → Remaining: 0.9 ε
Round 2: Spend 0.1 ε → Remaining: 0.8 ε
...
Round 10: Spend 0.1 ε → Remaining: 0.0 ε (exhausted)
Tracking Implementation
from datetime import datetime

class PrivacyBudgetTracker:
    def __init__(self, total_epsilon: float = 1.0):
        self.epsilon_total = total_epsilon
        self.epsilon_used = 0.0
        self.rounds = []

    def log_round(
        self,
        round_num: int,
        sigma: float,
        num_samples: int,
        model_hash: str
    ):
        """Log a federated learning round."""
        # Compute privacy cost
        # (simplified; actual formula depends on DP mechanism)
        epsilon_this_round = self._compute_epsilon(sigma, num_samples)
        self.epsilon_used += epsilon_this_round
        self.rounds.append({
            "round": round_num,
            "epsilon_spent": epsilon_this_round,
            "epsilon_total_used": self.epsilon_used,
            "epsilon_remaining": self.epsilon_total - self.epsilon_used,
            "model_hash": model_hash,
            "timestamp": datetime.utcnow().isoformat()
        })
        # Alert if budget running low
        remaining_pct = (self.epsilon_total - self.epsilon_used) / self.epsilon_total
        if remaining_pct < 0.2:
            print(f"WARNING: Privacy budget running low ({remaining_pct*100:.1f}% remaining)")

    def _compute_epsilon(self, sigma: float, num_samples: int) -> float:
        """Compute privacy cost for a round."""
        # zCDP cost of a sensitivity-1 Gaussian mechanism: ρ = 1 / (2 * σ²)
        # (used here as a simplified stand-in for ε; see Privacy Mathematics)
        return 1.0 / (2 * sigma**2) if sigma > 0 else 0.1

    def get_report(self) -> dict:
        """Get privacy budget report."""
        return {
            "total_rounds": len(self.rounds),
            "epsilon_total": self.epsilon_total,
            "epsilon_used": self.epsilon_used,
            "epsilon_remaining": self.epsilon_total - self.epsilon_used,
            "percentage_used": (self.epsilon_used / self.epsilon_total) * 100,
            "history": self.rounds
        }

# Usage
tracker = PrivacyBudgetTracker(total_epsilon=1.0)
# After each round
tracker.log_round(
    round_num=5,
    sigma=1.0,
    num_samples=1500,
    model_hash="abc123..."
)
# Check budget
report = tracker.get_report()
print(f"Budget used: {report['percentage_used']:.1f}%")
Performance Optimization
Batch Size Considerations
| Batch Size | Gradient Quality | Training Time | Privacy Cost |
|---|---|---|---|
| 10 patients | Low | 10ms | High |
| 100 patients | Medium | 100ms | Medium |
| 1000 patients | High | 1000ms | Low |
Recommendation: Use 500-2000 patients per round for optimal balance.
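A small batching sketch for this recommendation, assuming patients is the list produced by your EHR extract in Step 2 (the helper name is illustrative):

def batch_patients(patients, batch_size=1000):
    """Split an EHR extract into fixed-size batches, one batch per training round."""
    return [patients[i:i + batch_size] for i in range(0, len(patients), batch_size)]

batches = batch_patients(patients, batch_size=1000)
print(f"{len(batches)} batches prepared")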
Caching Strategy
# Cache model parameters to avoid recomputation
model_cache = {}
def get_model_weights(model_hash):
    """Get model weights from cache or recompute."""
    if model_hash in model_cache:
        return model_cache[model_hash]
    # Load from disk (load_model_weights is your own model-loading routine)
    weights = load_model_weights(model_hash)
    model_cache[model_hash] = weights
    return weights
Error Handling
Common Issues
Issue 1: Encryption Key Mismatch
try:
    result = submit_federated_update(patient_data)
except Exception as e:
    if "decryption failed" in str(e):
        print("ERROR: Encryption key mismatch")
        print("Ensure encryption_key.bin matches coordinator's key")
Issue 2: Too Few Participants
{
  "round": 5,
  "participants": 1,
  "warning": "Only 1 hospital participated. Waiting for more."
}
Solution: Ensure the other hospitals submit data for the same round and coordinate submission timing.
Issue 3: Data Hash Mismatch
try:
    result = submit_federated_update(patient_data)
except Exception as e:
    if "data_hash" in str(e):
        # Recompute the hash over the exact JSON that was encrypted
        correct_hash = hashlib.sha256(
            json.dumps(patient).encode()
        ).hexdigest()
Issue 4: Network Timeout
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=60)
)
def submit_with_retry(patient_data):
    """Submit with automatic retry."""
    return submit_federated_update(patient_data)
Monitoring and Auditing
Key Metrics
monitoring_data = {
    "round": 5,
    "submission_time": "2024-01-15T02:30:00Z",
    "patients_submitted": 1500,
    "data_size_encrypted": 2500000,  # bytes
    "submission_latency": 350,  # ms
    "model_hash": "abc123...",
    "privacy_epsilon_used": 0.1,
    "status": "success"
}
Audit Logging
import logging

# Configure audit logging
audit_logger = logging.getLogger('federated_learning_audit')
audit_logger.setLevel(logging.INFO)  # Ensure INFO-level audit entries are recorded
handler = logging.FileHandler('federated_learning_audit.log')
handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
audit_logger.addHandler(handler)

# Log each submission
def log_submission(patient_count, model_hash, round_num):
    audit_logger.info(
        f"Round {round_num}: Submitted {patient_count} patients. "
        f"Model hash: {model_hash}"
    )
Testing & Validation
Test Case 1: Single Round Training
def test_single_round():
    # Prepare test data
    test_patients = [
        {
            "patient_id": f"TEST_{i}",
            "age": 30 + i,
            "symptoms": ["test"],
            "diagnosis": "test"
        }
        for i in range(100)
    ]
    # Encrypt and submit
    encrypted_data = prepare_encrypted_data(test_patients)
    result = submit_federated_update(encrypted_data, round_number=1)
    # Verify result
    assert result['round'] == 1
    assert result['participants'] >= 1
    assert len(result['model_hash']) == 64  # SHA256 hash length
    print("✓ Single round training test passed")
Test Case 2: Multi-Hospital Consistency
def test_multi_hospital_consistency():
    """Verify all hospitals receive same model hash."""
    results = []
    # Hospital A submits
    result_a = submit_from_hospital_a(round_num=5)
    results.append(result_a['model_hash'])
    # Hospital B submits
    result_b = submit_from_hospital_b(round_num=5)
    results.append(result_b['model_hash'])
    # Hospital C submits
    result_c = submit_from_hospital_c(round_num=5)
    results.append(result_c['model_hash'])
    # All should be identical
    assert results[0] == results[1] == results[2]
    print("✓ Multi-hospital consistency test passed")
Compliance
- HIPAA: Patient data is encrypted and never transmitted in plaintext
- GDPR: Differential privacy limits what can be inferred about any individual from shared updates
- Data Minimization: Only gradients are shared, not raw data
- Right to be Forgotten: Organizations can opt out after the current round
Best Practices
- Secure Key Management: Use HSM or key vault for encryption keys
- Regular Monitoring: Check privacy budget and model quality
- Backup Encryption Keys: Store encrypted backups separately
- Coordinate Timing: Synchronize submission times with other hospitals
- Validate Models: Test updated models before clinical use (see the sketch after this list)
- Document Participation: Keep records for compliance audits
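A minimal validation sketch for the "Validate Models" practice, assuming get_model_weights from the caching example above and a local held-out labelled set; predict() stands in for your own inference routine and the accuracy threshold is illustrative:

def validate_model(model_hash, validation_set, min_accuracy=0.90):
    """Score an updated model on a local held-out set before clinical use."""
    weights = get_model_weights(model_hash)  # caching helper from Performance Optimization
    correct = 0
    for record, expected_diagnosis in validation_set:
        # predict() is your own inference routine built on the shared weights
        if predict(weights, record) == expected_diagnosis:
            correct += 1
    accuracy = correct / len(validation_set)
    print(f"Model {model_hash[:8]}... accuracy: {accuracy:.2%}")
    return accuracy >= min_accuracy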
Next Steps
- Architecture - Federated Learning - Deep dive into theory
- Integration Guide - Step-by-step setup
- API Reference - Full API docs
- Deployment - Production setup
Support
- Documentation: This guide
- GitHub: https://github.com/orgs/Oneliac
- Telegram: https://t.me/oneliac_bot
- Website: https://www.oneliac.xyz