Federated Learning Coordinator User Manual

The Federated Learning Coordinator enables healthcare organizations to collectively improve AI diagnosis models while keeping raw patient data private within each organization.

Overview

The Federated Learning Coordinator allows hospitals to:

  • Participate in collaborative AI training without sharing patient data
  • Improve diagnosis accuracy across organizations
  • Maintain HIPAA/GDPR compliance
  • Track privacy budget and differential privacy guarantees

When to Use

Use Federated Learning Coordinator when:

  • ✅ Multiple hospitals want to improve diagnosis AI together
  • ✅ You have sensitive patient data that cannot be shared
  • ✅ You want to contribute to healthcare AI research
  • ✅ You need HIPAA-compliant collaborative learning

Do not use for:

  • ❌ Real-time clinical decision support
  • ❌ Individual patient diagnosis (use Eligibility/Prescription instead)
  • ❌ Non-medical data analysis

Data Privacy Guarantees

What Stays Private

  • ✅ Raw patient medical data (never leaves your organization)
  • ✅ Patient identities
  • ✅ Diagnoses
  • ✅ Medications
  • ✅ Medical history details

What is Shared

  • Only encrypted gradients (mathematical model updates)
  • Gradients with differential privacy noise already added
  • No plaintext patient information is ever transmitted

How It Works

Simple Explanation

Step 1: Your hospital has patient data
Step 2: Compute what to learn from that data (gradients)
Step 3: Encrypt the learning
Step 4: Add noise to protect privacy
Step 5: Send encrypted learning to coordinator
Step 6: Coordinator averages all hospitals' learning
Step 7: Improved model distributed back to all
Step 8: Repeat on an agreed schedule (the integration example below runs weekly)
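The same flow as a minimal client-side sketch. This is illustrative only: compute_gradients and add_dp_noise are hypothetical local helpers standing in for your own training code, and the payload shape here is simplified; the coordinator's actual request schema is documented under API Usage below.

import json
import random
import requests
from cryptography.fernet import Fernet

def compute_gradients(local_records):
    # Placeholder for your local training code (Step 2)
    return [0.0, 0.0, 0.0]

def add_dp_noise(gradients, sigma=1.0):
    # Placeholder Gaussian noise for differential privacy (Step 4)
    return [g + random.gauss(0, sigma) for g in gradients]

def run_round(cipher, coordinator_url, round_number, local_records):
    gradients = add_dp_noise(compute_gradients(local_records))
    # Steps 3 and 5: only the encrypted update ever leaves the hospital
    encrypted = cipher.encrypt(json.dumps(gradients).encode()).decode()
    response = requests.post(
        coordinator_url,
        json={"update": encrypted, "round_number": round_number},
    )
    response.raise_for_status()
    return response.json()  # Steps 6-7: aggregation happens coordinator-side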

Privacy Mathematics

  • Differential Privacy: Bounds how much any single patient's record can influence the shared update, so individual data cannot be reliably reconstructed
  • zCDP: Zero-concentrated differential privacy provides tight, composable privacy accounting across rounds
  • Gaussian Noise: Noise calibrated to the privacy budget guarantees these bounds regardless of gradient content (worked example below)
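For intuition, a worked sketch of the standard zCDP accounting (these are general differential privacy results, not necessarily this coordinator's exact internal accounting): the Gaussian mechanism with L2 sensitivity Δ and noise scale σ satisfies ρ-zCDP with ρ = Δ² / (2σ²), and ρ-zCDP implies (ε, δ)-DP with ε = ρ + 2·sqrt(ρ·ln(1/δ)).

import math

def zcdp_rho(sensitivity: float, sigma: float) -> float:
    # Gaussian mechanism: rho = Delta^2 / (2 * sigma^2)
    return sensitivity ** 2 / (2 * sigma ** 2)

def zcdp_to_epsilon(rho: float, delta: float) -> float:
    # Standard zCDP -> (epsilon, delta)-DP conversion
    return rho + 2 * math.sqrt(rho * math.log(1 / delta))

rho = zcdp_rho(sensitivity=1.0, sigma=1.0)   # rho = 0.5
eps = zcdp_to_epsilon(rho, delta=1e-5)       # epsilon ~ 5.3
print(f"sigma=1.0 gives rho={rho:.2f}, epsilon={eps:.2f} at delta=1e-5")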

API Usage

Endpoint

POST /submit-federated-update

Request Format

{
  "patient_data_list": [
    {
      "patient_id": "PATIENT_001",
      "encrypted_data": "gAAAAABlmZ...",
      "ipfs_cid": "QmV5koooi...",
      "data_hash": "a1b2c3d4..."
    },
    {
      "patient_id": "PATIENT_002",
      "encrypted_data": "gAAAAABlnZ...",
      "ipfs_cid": "QmV5koooj...",
      "data_hash": "b2c3d4e5..."
    }
  ],
  "round_number": 5
}

Response Format

{
  "round": 5,
  "participants": 3,
  "model_hash": "abc123def456..."
}

Response Fields

Field          Type      Description
round          Integer   Training round number completed
participants   Integer   Number of hospitals that submitted data
model_hash     String    SHA256 hash of the updated model (for verification)
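A small client-side sanity check of this response shape (a defensive sketch, not something the API requires):

def validate_response(result: dict) -> None:
    # Defensive checks on the coordinator's response
    assert isinstance(result["round"], int)
    assert isinstance(result["participants"], int) and result["participants"] >= 1
    # model_hash should be a 64-character hexadecimal SHA256 digest
    assert len(result["model_hash"]) == 64
    int(result["model_hash"], 16)  # raises ValueError if not valid hex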

Step-by-Step Setup

Step 1: Generate Encryption Keys

from cryptography.fernet import Fernet

# Generate and securely store encryption key
encryption_key = Fernet.generate_key()

# IMPORTANT: Save this key securely
with open('/secure/location/encryption_key.bin', 'wb') as f:
    f.write(encryption_key)

print("Encryption key generated and saved")

Step 2: Prepare Patient Data Batch

import json
import hashlib
from cryptography.fernet import Fernet

# Load encryption key
with open('/secure/location/encryption_key.bin', 'rb') as f:
    encryption_key = f.read()

cipher = Fernet(encryption_key)

# Prepare patient batch (from your EHR)
patients = [
    {
        "patient_id": "PATIENT_001",
        "age": 45,
        "symptoms": ["fever", "cough"],
        "vitals": {"bp": "120/80", "hr": 72},
        "labs": {"wbc": 12000},
        "diagnosis": "respiratory_infection"
    },
    {
        "patient_id": "PATIENT_002",
        "age": 62,
        "symptoms": ["chest_pain", "shortness_of_breath"],
        "vitals": {"bp": "140/90", "hr": 88},
        "labs": {"troponin": 0.02},
        "diagnosis": "acute_coronary_syndrome"
    }
]

# Prepare data for submission
patient_data_list = []

for patient in patients:
    # Serialize once so the ciphertext and integrity hash cover identical bytes
    serialized = json.dumps(patient).encode()

    # Encrypt medical data
    encrypted = cipher.encrypt(serialized).decode()

    # Compute hash for integrity
    data_hash = hashlib.sha256(serialized).hexdigest()

    # Add to list
    patient_data_list.append({
        "patient_id": patient["patient_id"],
        "encrypted_data": encrypted,
        "ipfs_cid": "QmV5koooi...",  # Optional: upload to IPFS first
        "data_hash": data_hash
    })

print(f"Prepared {len(patient_data_list)} patients for training")

Step 3: Submit to Federated Learning Coordinator

import requests

api_url = "http://localhost:8000/submit-federated-update"

payload = {
    "patient_data_list": patient_data_list,
    "round_number": 5
}

response = requests.post(api_url, json=payload)
response.raise_for_status()  # Fail loudly on HTTP errors
result = response.json()

print(f"Round {result['round']} completed")
print(f"Participants: {result['participants']}")
print(f"Model hash: {result['model_hash']}")

Step 4: Store and Verify Results

# Store model hash for verification
with open('model_history.log', 'a') as f:
    f.write(f"Round {result['round']}: {result['model_hash']}\n")

# Verify consistency with other hospitals
# (all hospitals should report the same model_hash for the same round)
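One way to automate that cross-check (a sketch: it assumes you collect peer-reported hashes out of band through your consortium's own channel; peer_hashes maps hospital name to reported hash):

def verify_round_consensus(round_num: int, local_hash: str, peer_hashes: dict) -> bool:
    """Return True if every peer reported the same model hash for this round."""
    mismatched = {name: h for name, h in peer_hashes.items() if h != local_hash}
    if mismatched:
        print(f"Round {round_num}: hash mismatch from {sorted(mismatched)}")
        return False
    print(f"Round {round_num}: all {len(peer_hashes)} peers agree")
    return True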

Integration with Your Hospital System

EHR Integration

import json
import hashlib
import time
from datetime import datetime

import requests
import schedule
from cryptography.fernet import Fernet

class FederatedLearningIntegration:
    def __init__(self, api_url, encryption_key_path):
        self.api_url = api_url
        self.encryption_key_path = encryption_key_path
        self.round_number = 1

    def run_training_round(self):
        """Execute one federated learning training round."""

        print(f"[{datetime.now()}] Starting FL round {self.round_number}")

        # Step 1: Extract patients from EHR
        patients = self.extract_from_ehr(
            num_patients=100,
            conditions=["respiratory", "cardiac"]
        )

        # Step 2: Prepare encrypted data
        patient_data = self.prepare_encrypted_data(patients)

        # Step 3: Submit to coordinator
        try:
            result = self.submit_federated_update(
                patient_data_list=patient_data,
                round_number=self.round_number
            )

            print(f"Round {result['round']} succeeded")
            print(f"Model hash: {result['model_hash']}")

            self.round_number += 1

        except Exception as e:
            print(f"Round failed: {e}")
            self.retry_round()  # Implement retry logic to suit your deployment

    def retry_round(self):
        """Placeholder retry hook; see "Issue 4: Network Timeout" for a backoff example."""
        pass

    def extract_from_ehr(self, num_patients, conditions):
        """Extract patient data from your EHR system (implementation specific)."""
        pass

    def prepare_encrypted_data(self, patients):
        """Encrypt and prepare patient data."""
        # Load key
        with open(self.encryption_key_path, 'rb') as f:
            encryption_key = f.read()

        cipher = Fernet(encryption_key)

        # Encrypt each patient
        data_list = []
        for patient in patients:
            serialized = json.dumps(patient).encode()
            encrypted = cipher.encrypt(serialized).decode()

            data_list.append({
                "patient_id": patient['id'],
                "encrypted_data": encrypted,
                "ipfs_cid": "...",
                "data_hash": hashlib.sha256(serialized).hexdigest()
            })

        return data_list

    def submit_federated_update(self, patient_data_list, round_number):
        """Submit encrypted data to coordinator."""
        response = requests.post(
            self.api_url,
            json={
                "patient_data_list": patient_data_list,
                "round_number": round_number
            }
        )
        response.raise_for_status()
        return response.json()

# Schedule weekly training rounds
fl_integration = FederatedLearningIntegration(
    api_url="http://localhost:8000/submit-federated-update",
    encryption_key_path="/secure/encryption_key.bin"
)

# Run every Monday at 2 AM
schedule.every().monday.at("02:00").do(fl_integration.run_training_round)

while True:
    schedule.run_pending()
    time.sleep(60)

Privacy Budget Tracking

Understanding Privacy Budget

Privacy budget represents the total allowed privacy loss across all training rounds.

Total Privacy Budget = 1.0 (ε)

Round 1: Spend 0.1 ε → Remaining: 0.9 ε
Round 2: Spend 0.1 ε → Remaining: 0.8 ε
...
Round 10: Spend 0.1 ε → Remaining: 0.0 ε (exhausted)

Tracking Implementation

from datetime import datetime

class PrivacyBudgetTracker:
    def __init__(self, total_epsilon: float = 1.0):
        self.epsilon_total = total_epsilon
        self.epsilon_used = 0.0
        self.rounds = []

    def log_round(
        self,
        round_num: int,
        sigma: float,
        num_samples: int,
        model_hash: str
    ):
        """Log a federated learning round."""

        # Compute privacy cost
        # (simplified; the actual formula depends on the DP mechanism)
        epsilon_this_round = self._compute_epsilon(sigma, num_samples)

        self.epsilon_used += epsilon_this_round

        self.rounds.append({
            "round": round_num,
            "epsilon_spent": epsilon_this_round,
            "epsilon_total_used": self.epsilon_used,
            "epsilon_remaining": self.epsilon_total - self.epsilon_used,
            "model_hash": model_hash,
            "timestamp": datetime.utcnow().isoformat()
        })

        # Alert if budget running low
        remaining_pct = (self.epsilon_total - self.epsilon_used) / self.epsilon_total
        if remaining_pct < 0.2:
            print(f"WARNING: Privacy budget running low ({remaining_pct*100:.1f}% remaining)")

    def _compute_epsilon(self, sigma: float, num_samples: int) -> float:
        """Compute privacy cost for a round."""
        # zCDP: the Gaussian mechanism with unit sensitivity satisfies
        # rho = 1 / (2 * sigma^2); used here as the per-round cost
        # (simplified for demonstration)
        return 1.0 / (2 * sigma**2) if sigma > 0 else 0.1

    def get_report(self) -> dict:
        """Get privacy budget report."""
        return {
            "total_rounds": len(self.rounds),
            "epsilon_total": self.epsilon_total,
            "epsilon_used": self.epsilon_used,
            "epsilon_remaining": self.epsilon_total - self.epsilon_used,
            "percentage_used": (self.epsilon_used / self.epsilon_total) * 100,
            "history": self.rounds
        }

# Usage
tracker = PrivacyBudgetTracker(total_epsilon=1.0)

# After each round
tracker.log_round(
    round_num=5,
    sigma=1.0,
    num_samples=1500,
    model_hash="abc123..."
)

# Check budget
report = tracker.get_report()
print(f"Budget used: {report['percentage_used']:.1f}%")

Performance Optimization

Batch Size Considerations

Batch Size       Gradient Quality   Training Time   Privacy Cost
10 patients      Low                10 ms           High
100 patients     Medium             100 ms          Medium
1000 patients    High               1000 ms         Low

Recommendation: Use 500-2000 patients per round for optimal balance.
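A simple chunking helper in that spirit (a sketch; all_patients stands in for your EHR extract from Step 2, and round numbering should be coordinated with the other hospitals before splitting across rounds):

def batch_patients(patients, batch_size=1000):
    """Split an EHR extract into recommended-size batches."""
    return [patients[i:i + batch_size] for i in range(0, len(patients), batch_size)]

# e.g. 3,500 patients -> batches of 1000, 1000, 1000, and 500
# for offset, batch in enumerate(batch_patients(all_patients)):
#     submit_federated_update(prepare_encrypted_data(batch), round_number=base_round + offset)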

Caching Strategy

# Cache model parameters to avoid recomputation
model_cache = {}

def get_model_weights(model_hash):
    """Get model weights from cache, falling back to disk."""
    if model_hash in model_cache:
        return model_cache[model_hash]

    # Load from disk (load_model_weights is your own storage helper)
    weights = load_model_weights(model_hash)
    model_cache[model_hash] = weights

    return weights

Error Handling

Common Issues

Issue 1: Encryption Key Mismatch

try:
    result = submit_federated_update(patient_data)
except Exception as e:
    if "decryption failed" in str(e):
        print("ERROR: Encryption key mismatch")
        print("Ensure encryption_key.bin matches coordinator's key")

Issue 2: Too Few Participants

{
  "round": 5,
  "participants": 1,
  "warning": "Only 1 hospital participated. Waiting for more."
}

Solution: Ensure the other hospitals submit data for the same round, and coordinate submission timing.
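A sketch of coordinating around this, assuming your coordinator deduplicates repeat submissions for the same round (verify that with the coordinator operator before relying on it):

import time

def submit_when_quorum(patient_data, round_number, min_participants=3, poll_seconds=300):
    """Resubmit until enough hospitals have joined the round (illustrative only)."""
    while True:
        result = submit_federated_update(patient_data, round_number)
        if result.get("participants", 0) >= min_participants:
            return result
        print(f"Only {result.get('participants')} participant(s); retrying in {poll_seconds}s")
        time.sleep(poll_seconds)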

Issue 3: Data Hash Mismatch

try:
    result = submit_federated_update(patient_data)
except requests.HTTPError as e:
    if "data_hash" in str(e):
        # Recompute each hash over exactly the bytes that were encrypted
        for patient in patients:
            correct_hash = hashlib.sha256(
                json.dumps(patient).encode()
            ).hexdigest()

Issue 4: Network Timeout

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=60)
)
def submit_with_retry(patient_data):
    """Submit with automatic retry."""
    return submit_federated_update(patient_data)

Monitoring and Auditing

Key Metrics

monitoring_data = {
    "round": 5,
    "submission_time": "2024-01-15T02:30:00Z",
    "patients_submitted": 1500,
    "data_size_encrypted": 2500000,  # bytes
    "submission_latency": 350,  # ms
    "model_hash": "abc123...",
    "privacy_epsilon_used": 0.1,
    "status": "success"
}

Audit Logging

import logging

# Configure audit logging
audit_logger = logging.getLogger('federated_learning_audit')
audit_logger.setLevel(logging.INFO)  # Ensure INFO-level audit entries are recorded
handler = logging.FileHandler('federated_learning_audit.log')
handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
audit_logger.addHandler(handler)

# Log each submission
def log_submission(patient_count, model_hash, round_num):
    audit_logger.info(
        f"Round {round_num}: Submitted {patient_count} patients. "
        f"Model hash: {model_hash}"
    )

Testing & Validation

Test Case 1: Single Round Training

def test_single_round():
    # Prepare test data
    test_patients = [
        {
            "patient_id": f"TEST_{i}",
            "age": 30 + i,
            "symptoms": ["test"],
            "diagnosis": "test"
        }
        for i in range(100)
    ]

    # Encrypt and submit
    encrypted_data = prepare_encrypted_data(test_patients)
    result = submit_federated_update(encrypted_data, round_number=1)

    # Verify result
    assert result['round'] == 1
    assert result['participants'] >= 1
    assert len(result['model_hash']) == 64  # SHA256 hex digest length

    print("✓ Single round training test passed")

Test Case 2: Multi-Hospital Consistency

def test_multi_hospital_consistency():
    """Verify all hospitals receive the same model hash."""

    results = []

    # Hospital A submits
    result_a = submit_from_hospital_a(round_num=5)
    results.append(result_a['model_hash'])

    # Hospital B submits
    result_b = submit_from_hospital_b(round_num=5)
    results.append(result_b['model_hash'])

    # Hospital C submits
    result_c = submit_from_hospital_c(round_num=5)
    results.append(result_c['model_hash'])

    # All should be identical
    assert results[0] == results[1] == results[2]

    print("✓ Multi-hospital consistency test passed")

Compliance

  • HIPAA: Patient data encrypted, never transmitted in plaintext
  • GDPR: Differential privacy bounds what can be inferred about any individual from the shared updates
  • Data Minimization: Only gradients shared, not raw data
  • Right to be Forgotten: Can opt out after current round

Best Practices

  1. Secure Key Management: Use an HSM or key vault for encryption keys (a minimal runtime-loading sketch follows this list)
  2. Regular Monitoring: Check privacy budget and model quality
  3. Backup Encryption Keys: Store encrypted backups separately
  4. Coordinate Timing: Synchronize submission times with other hospitals
  5. Validate Models: Test updated models before clinical use
  6. Document Participation: Keep records for compliance audits
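For practice 1, even before a full HSM or key vault is in place, keep the key out of source control and inject it at runtime. A minimal sketch, assuming a hypothetical FL_ENCRYPTION_KEY environment variable populated by your secret manager (not something this system defines):

import os
from cryptography.fernet import Fernet

def load_cipher():
    # FL_ENCRYPTION_KEY is an assumed deployment convention, not part of this system
    key = os.environ.get("FL_ENCRYPTION_KEY")
    if key is None:
        raise RuntimeError("FL_ENCRYPTION_KEY not set; fetch it from your key vault")
    return Fernet(key.encode())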

Next Steps

  1. Architecture - Federated Learning - Deep dive into theory
  2. Integration Guide - Step-by-step setup
  3. API Reference - Full API docs
  4. Deployment - Production setup

Support