Security Best Practices
Guidelines for secure deployment and usage of Oneliac.
Data Security
Encryption at Rest
import json
import os

from cryptography.fernet import Fernet

# Location of the key file; keep it outside the application/data directories.
KEY_PATH = '/secure/path/encryption_key.bin'

# Generate strong encryption key
encryption_key = Fernet.generate_key()

# Store key securely: create the file with owner-only permissions (0o600)
# instead of relying on the process umask, which a plain open() would do.
# NOTE: the mode is applied only when the file is created; an existing file
# keeps whatever permissions it already has.
key_fd = os.open(KEY_PATH, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(key_fd, 'wb') as f:
    f.write(encryption_key)

# Encrypt sensitive data before storage.
# `patient_data` is expected to be a JSON-serializable object supplied by the
# surrounding application code.
cipher = Fernet(encryption_key)
plaintext = json.dumps(patient_data).encode()
ciphertext = cipher.encrypt(plaintext)

# Store ciphertext only; the plaintext never touches disk.
with open('patient_data.enc', 'wb') as f:
    f.write(ciphertext)
Encryption in Transit
# ALWAYS use HTTPS/TLS for API calls
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Configure session with retries and HTTPS.
# NOTE(review): urllib3's Retry applies `status_forcelist` only to its
# `allowed_methods`, which by default do not include POST, so the POST below
# is not retried on 5xx responses. Add allowed_methods=[..., "POST"] only if
# the endpoint is safe to repeat.
session = requests.Session()
session.mount('https://', HTTPAdapter(max_retries=Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[500, 502, 503, 504],
)))

# Make HTTPS request
response = session.post(
    'https://api.oneliac.io/verify-eligibility',
    json=payload,
    verify=True,  # Verify SSL certificate (the default; never disable it)
    # Without a timeout, requests waits indefinitely on a stalled server.
    timeout=(5, 30),  # (connect, read) seconds
)
Key Management
Store Keys Securely
❌ Never do this:
# WRONG: Hardcoding keys
# Both secrets below are string literals in the source file, so anyone with
# read access to the code (or its version history) can read them.
encryption_key = "gAAAAABlmZ..."
api_key = "sk_live_abc123"
✅ Do this instead:
import os
from dotenv import load_dotenv
# Pull configuration from the process environment (populated from .env)
load_dotenv()
encryption_key = os.environ.get('ENCRYPTION_KEY')
api_key = os.environ.get('API_KEY')

# Alternative: fetch the key from AWS Secrets Manager instead
import boto3
secrets_client = boto3.client('secretsmanager')
secret = secrets_client.get_secret_value(
    SecretId='oneliac-key',
)
encryption_key = secret['SecretString']
Use Secrets Management
Option 1: AWS Secrets Manager
import boto3
def get_encryption_key(secret_id='oneliac/encryption-key', region_name='us-east-1'):
    """Fetch the encryption key from AWS Secrets Manager.

    Args:
        secret_id: Name or ARN of the secret to read. Defaults to the
            original hard-coded value.
        region_name: AWS region of the Secrets Manager endpoint. Defaults to
            the original hard-coded value.

    Returns:
        The value of the secret's ``SecretString`` field.

    Raises:
        KeyError: If the response has no ``SecretString`` entry.
    """
    client = boto3.client('secretsmanager', region_name=region_name)
    response = client.get_secret_value(SecretId=secret_id)
    return response['SecretString']
Option 2: HashiCorp Vault
import hvac
def get_encryption_key():
    """Fetch the encryption key from HashiCorp Vault using AppRole auth.

    The Vault address is read from ``VAULT_ADDR`` (falling back to the
    example host over HTTPS). The AppRole credentials are read from
    ``VAULT_ROLE_ID`` and ``VAULT_SECRET_ID`` so that no secret is embedded
    in source code.

    Returns:
        The ``key`` field of the KV secret at ``oneliac/encryption-key``.

    Raises:
        KeyError: If an AppRole environment variable is missing, or the
            secret has no ``key`` field.
    """
    import os

    # Always talk to Vault over TLS: plain http:// would expose both the
    # AppRole credentials and the returned key on the network.
    vault_url = os.environ.get('VAULT_ADDR', 'https://vault.example.com:8200')
    client = hvac.Client(url=vault_url)
    client.auth.approle.login(
        role_id=os.environ['VAULT_ROLE_ID'],
        secret_id=os.environ['VAULT_SECRET_ID'],
    )
    secret = client.secrets.kv.read_secret_version(path='oneliac/encryption-key')
    return secret['data']['data']['key']
Option 3: Environment Variables
# .env file (add it to .gitignore; never commit it)
ENCRYPTION_KEY=base64_encoded_key_here
API_KEY=sk_live_abc123
# Load in code
from dotenv import load_dotenv
import os
# Call load_dotenv() before reading the variables below.
load_dotenv()
# os.getenv returns None when the variable is not set, so validate the value
# before using it as a key.
encryption_key = os.getenv('ENCRYPTION_KEY')
Rotate Keys Regularly
def rotate_encryption_key():
    """Rotate encryption key quarterly.

    Generates a new Fernet key, persists it, re-encrypts every stored
    ciphertext under the new key, and records a ``key_rotation`` audit event.

    Relies on helpers defined elsewhere: ``load_current_key``, ``save_key``,
    ``get_all_encrypted_data``, ``update_data`` and ``log_event``.
    """
    from cryptography.fernet import Fernet
    import datetime

    old_key = load_current_key()

    # Generate new key
    new_key = Fernet.generate_key()

    # Persist the new key BEFORE re-encrypting anything. If the process dies
    # mid-rotation, records already re-encrypted would otherwise be protected
    # by a key that exists only in memory and would be unrecoverable.
    # NOTE(review): assumes save_key(..., backup_old_key=True) keeps the old
    # key retrievable so not-yet-rotated records stay readable; confirm.
    save_key(new_key, backup_old_key=True)

    # Re-encrypt all data with new key
    old_cipher = Fernet(old_key)
    new_cipher = Fernet(new_key)
    for data in get_all_encrypted_data():
        decrypted = old_cipher.decrypt(data)
        new_encrypted = new_cipher.encrypt(decrypted)
        # NOTE(review): update_data receives only the new ciphertext; confirm
        # how it identifies which stored record to overwrite.
        update_data(new_encrypted)

    # Log rotation
    log_event({
        "event": "key_rotation",
        "timestamp": datetime.datetime.utcnow().isoformat(),
        "status": "success"
    })
API Security
Use Strong API Keys
# API key requirements
- Minimum length: 32 characters
- Include uppercase, lowercase, numbers, symbols
- Unique per environment (dev, staging, prod)
- Rotate every 90 days
API Key Whitelisting
# Restrict API key to specific IPs
import requests
def whitelist_api_key(api_key, ip_addresses, timeout=10):
    """Restrict an API key to a set of IP addresses via the Oneliac API.

    Args:
        api_key: The API key to restrict.
        ip_addresses: List of IP address strings allowed to use the key.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        True when the server answers with HTTP 200, False otherwise.

    Note:
        ``admin_token`` must be provided by the surrounding module; load it
        from the environment or a secrets manager, never from source code.
    """
    from urllib.parse import quote

    # Percent-encode the key: keys may contain symbols such as "/" or "?"
    # that would otherwise change the request path.
    response = requests.post(
        f'https://api.oneliac.io/api/keys/{quote(api_key, safe="")}/whitelist',
        json={"ips": ip_addresses},
        headers={"Authorization": f"Bearer {admin_token}"},
        timeout=timeout,  # never block forever on a stalled connection
    )
    return response.status_code == 200
Rate Limiting
from collections import deque
from functools import wraps
from time import monotonic, time


class RateLimitExceeded(Exception):
    """Raised when a rate-limited function is called more often than allowed."""


def rate_limit(max_calls, time_window):
    """Rate limit decorator.

    Allows at most ``max_calls`` invocations of the decorated function within
    any sliding window of ``time_window`` seconds. The counter is kept per
    decorated function and is shared by all callers in the process.

    Args:
        max_calls: Maximum number of calls permitted inside the window.
        time_window: Window length in seconds.

    Raises:
        RateLimitExceeded: (a subclass of ``Exception``) when the limit is hit.
    """
    def decorator(func):
        # Timestamps of accepted calls, oldest first.
        call_times = deque()

        @wraps(func)
        def wrapper(*args, **kwargs):
            # The monotonic clock is immune to wall-clock adjustments
            # (NTP, DST), which could otherwise reset or extend the window.
            now = monotonic()
            cutoff = now - time_window
            # Remove old calls outside time window
            while call_times and call_times[0] <= cutoff:
                call_times.popleft()
            if len(call_times) >= max_calls:
                raise RateLimitExceeded(
                    f"Rate limit exceeded: {max_calls} calls per {time_window}s"
                )
            call_times.append(now)
            return func(*args, **kwargs)
        return wrapper
    return decorator
@rate_limit(max_calls=10, time_window=60)
def api_call(endpoint, data):
    """Placeholder for an outbound API call, limited to 10 calls per 60 seconds."""
    # API call implementation goes here
    return None
Network Security
HTTPS/TLS Configuration
# nginx configuration: allow only strong TLS versions and ciphers
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
CORS Configuration
# Restrict CORS origins in production
from fastapi.middleware.cors import CORSMiddleware
# CORS policy: explicit origins, methods and headers only (no wildcards)
cors_options = {
    "allow_origins": ["https://yourdomain.com"],  # Specific domains
    "allow_credentials": True,
    "allow_methods": ["POST", "GET"],  # Only needed methods
    "allow_headers": ["Content-Type", "Authorization"],
}
app.add_middleware(CORSMiddleware, **cors_options)
IP Whitelisting
import ipaddress

# Restrict API access to known IPs.
# Entries may be single addresses ("203.0.113.1") or CIDR ranges
# ("203.0.113.0/24").
ALLOWED_IPS = [
    "203.0.113.0",
    "203.0.113.1",
]


def _is_allowed_ip(client_ip):
    """Return True when client_ip matches an entry of ALLOWED_IPS."""
    # Exact string match first, so existing plain entries keep working.
    if client_ip in ALLOWED_IPS:
        return True
    try:
        address = ipaddress.ip_address(client_ip)
    except ValueError:
        # Not a parseable IP address: cannot match any network entry.
        return False
    for entry in ALLOWED_IPS:
        try:
            network = ipaddress.ip_network(entry, strict=False)
        except ValueError:
            continue  # skip malformed allow-list entries instead of crashing
        if address in network:
            return True
    return False


def check_ip(request):
    """Reject the request with HTTP 403 unless the client IP is allow-listed.

    Args:
        request: Object exposing ``client.host``. ``client`` may be ``None``,
            in which case the request is rejected.

    Raises:
        HTTPException: With status 403 when the client address is unknown or
            not covered by ``ALLOWED_IPS``.
    """
    # NOTE(review): behind a reverse proxy, client.host is presumably the
    # proxy's address; confirm how the real client IP is propagated.
    client = request.client
    client_ip = client.host if client is not None else None
    # Fail closed: a missing client address is treated as not allowed.
    if client_ip is None or not _is_allowed_ip(client_ip):
        raise HTTPException(status_code=403, detail="IP not whitelisted")
Access Control
Role-Based Access Control (RBAC)
from enum import Enum
class Role(str, Enum):
    """User roles for access control; every member is also a plain ``str``."""

    ADMIN = "admin"
    DOCTOR = "doctor"
    STAFF = "staff"
    PATIENT = "patient"
def require_role(required_roles):
    """Check user has required role.

    Decorator factory: the wrapped handler runs only when
    ``get_user_role(request)`` is one of ``required_roles``. Works for both
    plain and ``async def`` handlers; an async handler gets an async wrapper
    so the framework still awaits it.

    Args:
        required_roles: Collection of roles allowed to call the handler.

    Raises:
        HTTPException: With status 403 when the caller's role is not allowed.
    """
    import functools
    import inspect

    def _authorize(request):
        user_role = get_user_role(request)
        if user_role not in required_roles:
            raise HTTPException(status_code=403, detail="Insufficient permissions")

    def decorator(func):
        if inspect.iscoroutinefunction(func):
            # A sync wrapper here would hand back an un-awaited coroutine
            # object instead of the handler's result.
            @functools.wraps(func)
            async def wrapper(request, *args, **kwargs):
                _authorize(request)
                return await func(request, *args, **kwargs)
        else:
            @functools.wraps(func)
            def wrapper(request, *args, **kwargs):
                _authorize(request)
                return func(request, *args, **kwargs)
        return wrapper
    return decorator
@app.post("/admin/config")
@require_role([Role.ADMIN])
async def admin_config(request: Request):
    """Admin-only configuration endpoint (placeholder body)."""
    # Only callers holding Role.ADMIN get past require_role
    return None
Audit Logging
Log All Operations
import logging
import json
from datetime import datetime
# Dedicated logger so audit records can be routed separately.
# NOTE: entries are emitted at INFO level; configure this logger's level and
# handlers accordingly, or the records will be filtered out.
audit_logger = logging.getLogger('audit')


def log_api_call(user_id, endpoint, action, status, data):
    """Log API call for audit trail.

    Writes one JSON document per call to the ``audit`` logger. The request
    payload itself is never logged; only its SHA-256 digest is stored.

    Args:
        user_id: Identifier of the caller.
        endpoint: API path that was invoked.
        action: Logical action name.
        status: Outcome of the call (e.g. "success").
        data: JSON-serializable request payload to fingerprint.
    """
    import hashlib  # previously missing: the original raised NameError here

    # sort_keys gives equal payloads the same digest regardless of the order
    # in which their keys were inserted.
    canonical_payload = json.dumps(data, sort_keys=True).encode()
    audit_logger.info(json.dumps({
        "timestamp": datetime.utcnow().isoformat(),
        "user_id": user_id,
        "endpoint": endpoint,
        "action": action,
        "status": status,
        "data_hash": hashlib.sha256(canonical_payload).hexdigest()
    }))
# Log eligibility check
# Example call: only the SHA-256 digest of `data` is written to the audit log,
# not the payload itself.
log_api_call(
user_id="doctor_001",
endpoint="/verify-eligibility",
action="check_coverage",
status="success",
data={"procedure": "PROC001"}
)
Audit Log Retention
# Keep audit logs for required retention period (e.g., 7 years for healthcare)
# Example: Store in immutable S3 with versioning
import boto3
def archive_audit_logs(log_path='audit.log', bucket='audit-logs-archive'):
    """Archive old audit logs to S3.

    Uploads the log file as ``audit-<today>.log`` with server-side
    encryption (AES256).

    Args:
        log_path: Local path of the audit log to upload.
        bucket: Destination S3 bucket name.

    Note:
        Immutability (Object Lock / versioning) is a property of the bucket
        configuration; this function does not enable it.
    """
    from datetime import date  # previously missing: date.today() raised NameError

    s3 = boto3.client('s3')
    # Upload to immutable S3 bucket
    with open(log_path, 'rb') as f:
        s3.put_object(
            Bucket=bucket,
            Key=f"audit-{date.today()}.log",
            # Pass the open file object instead of f.read() so the whole log
            # is not loaded into memory first.
            Body=f,
            ServerSideEncryption='AES256',
        )
Data Privacy
Minimize Data Collection
# Only collect necessary fields
# Allow-list of fields that may be stored.
required_fields = [
"patient_id", # De-identified
"encrypted_data", # Encrypted
"ipfs_cid", # Reference only
"data_hash" # Hash only
]
# Remove unnecessary fields
# Deny-list of fields to strip before storage. This snippet only declares the
# names; the code that removes them is not shown here.
unnecessary_fields = [
"patient_name", # Never needed
"patient_ssn", # Never store
"diagnosis_text" # Never store plaintext
]
Data Retention Policy
# Implement automatic data deletion
from datetime import datetime, timedelta
def delete_old_data(retention_years=7):
    """Delete data older than retention period.

    Args:
        retention_years: Whole calendar years a record must be kept
            (default 7, for healthcare).

    For every record returned by ``get_records_before(cutoff)``, the backing
    file is shredded, the database row is deleted, and a ``data_deletion``
    event is logged. Relies on helpers defined elsewhere:
    ``get_records_before``, ``shred_file``, ``delete_database_record`` and
    ``log_event``.
    """
    from datetime import timezone

    # Naive UTC "now": the same value datetime.utcnow() produced.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # Use a calendar cutoff rather than 365 * years days. A fixed day count
    # ignores leap days and would delete records 1-2 days before the
    # retention period has actually elapsed.
    try:
        cutoff_date = now.replace(year=now.year - retention_years)
    except ValueError:
        # Today is Feb 29 and the target year has no such day. Falling back
        # to Feb 28 keeps records slightly longer, never shorter.
        cutoff_date = now.replace(year=now.year - retention_years, day=28)

    old_records = get_records_before(cutoff_date)
    for record in old_records:
        # Securely delete
        shred_file(record.file_path)
        delete_database_record(record.id)
        # Log deletion
        log_event({
            "event": "data_deletion",
            "record_id": record.id,
            "reason": "retention_period_expired"
        })
Compliance Checklist
- All data encrypted at rest and in transit
- API keys rotated quarterly
- HTTPS/TLS enabled for all communications
- Access control implemented (RBAC)
- Audit logging enabled and archived
- Data retention policy implemented
- Incident response plan created
- Security assessments performed annually
- HIPAA Business Associate Agreement signed
- Staff trained on data protection
- Regular security updates applied
- Firewall rules restricted to minimum necessary
- Two-factor authentication enabled
- Database backups encrypted
- Disaster recovery plan tested
Security Tools
Dependency Scanning
# Check for vulnerable dependencies
pip install safety
safety check
# Or using Snyk
snyk test
Static Code Analysis
# Scan Python code for security issues
pip install bandit
bandit -r agents/
# Or using SonarQube
sonar-scanner
Secrets Detection
# Scan for hardcoded secrets
pip install detect-secrets
detect-secrets scan --all-files
Incident Response
Security Incident Procedure
- Detect: Monitor logs for suspicious activity
- Contain: Revoke compromised keys, block IP
- Investigate: Review logs, identify scope
- Remediate: Fix vulnerability, update code
- Communicate: Notify affected parties
- Learn: Post-mortem, prevent recurrence
Breach Notification
def notify_of_breach(affected_users, breach_details):
    """Email every affected user about a breach, then inform the authorities.

    Args:
        affected_users: Iterable of objects exposing an ``email`` attribute.
        breach_details: Mapping with ``date``, ``data`` and ``action`` keys.
    """
    for recipient in affected_users:
        address = recipient.email
        message = f"""
A security incident was detected on {breach_details['date']}.
Affected data: {breach_details['data']}
Action taken: {breach_details['action']}
"""
        send_email(to=address, subject="Security Notice", body=message)
    # Notify authorities if required by law
    notify_authorities(breach_details)