Skip to main content

Security Best Practices

Guidelines for secure deployment and usage of Oneliac.

Data Security

Encryption at Rest

from cryptography.fernet import Fernet
import json
import os

# Generate strong encryption key (random, url-safe base64 bytes)
encryption_key = Fernet.generate_key()

# Store key securely: create the file readable/writable by the owner only
# (0o600). A plain open(..., 'wb') would use the process umask, which
# commonly leaves the key readable by other local users.
# NOTE: the mode is applied only when the file is created; an existing
# file keeps whatever permissions it already has.
key_fd = os.open(
    '/secure/path/encryption_key.bin',
    os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
    0o600,
)
with os.fdopen(key_fd, 'wb') as f:
    f.write(encryption_key)

# Encrypt sensitive data before storage
# `patient_data` is expected to be a JSON-serializable object defined by
# the surrounding application.
cipher = Fernet(encryption_key)
plaintext = json.dumps(patient_data).encode()
ciphertext = cipher.encrypt(plaintext)

# Store ciphertext (never the plaintext) on disk
with open('patient_data.enc', 'wb') as f:
    f.write(ciphertext)

Encryption in Transit

# ALWAYS use HTTPS/TLS for API calls
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Configure session with retries and HTTPS
# NOTE: urllib3's Retry applies status-based retries only to idempotent
# methods by default, so the POST below is not retried on the 5xx codes in
# status_forcelist unless POST is explicitly allowed. Only allow it if the
# endpoint is safe to call more than once.
session = requests.Session()
session.mount('https://', HTTPAdapter(max_retries=Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[500, 502, 503, 504]
)))

# Make HTTPS request
response = session.post(
    'https://api.oneliac.io/verify-eligibility',
    json=payload,
    # (connect, read) timeouts in seconds. requests has no default timeout,
    # so without this a stalled server blocks the caller indefinitely.
    timeout=(5, 30),
    verify=True  # Verify SSL certificate
)

Key Management

Store Keys Securely

Never do this:

# WRONG: Hardcoding keys
# Anyone who can read this source file (or its version-control history)
# can read these values.
encryption_key = "gAAAAABlmZ..."
api_key = "sk_live_abc123"

Do this instead:

import os
from dotenv import load_dotenv

# Read secrets from the process environment; load_dotenv() first merges in
# any values from a local .env file.
load_dotenv()
encryption_key = os.environ.get('ENCRYPTION_KEY')
api_key = os.environ.get('API_KEY')

# Alternative: fetch the key from AWS Secrets Manager instead.
# This assignment replaces the environment-derived value above.
import boto3
secrets_client = boto3.client('secretsmanager')
secret = secrets_client.get_secret_value(SecretId='oneliac-key')
encryption_key = secret['SecretString']

Use Secrets Management

Option 1: AWS Secrets Manager

import boto3

def get_encryption_key():
    """Fetch the encryption key from AWS Secrets Manager.

    Returns:
        The ``SecretString`` field of the ``oneliac/encryption-key`` secret
        in the ``us-east-1`` region.
    """
    client = boto3.client('secretsmanager', region_name='us-east-1')
    response = client.get_secret_value(SecretId='oneliac/encryption-key')
    return response['SecretString']

Option 2: HashiCorp Vault

import hvac

def get_encryption_key():
    """Fetch the encryption key from HashiCorp Vault using AppRole auth.

    Returns:
        The ``key`` field of the KV secret at ``oneliac/encryption-key``.
    """
    # Talk to Vault over TLS: the login request carries the AppRole
    # secret_id and the response carries the encryption key, so a plain
    # http:// URL would expose both on the network.
    client = hvac.Client(url='https://vault.example.com:8200')
    # '...' are placeholders; supply the real role_id/secret_id from the
    # environment or another secret store rather than writing them here.
    client.auth.approle.login(role_id='...', secret_id='...')
    secret = client.secrets.kv.read_secret_version(path='oneliac/encryption-key')
    # The secret payload is nested under data -> data in the response.
    return secret['data']['data']['key']

Option 3: Environment Variables

# .env file (not in git)
ENCRYPTION_KEY=base64_encoded_key_here
API_KEY=sk_live_abc123

# Load in code: merge the .env file into the environment, then read the key.
from dotenv import load_dotenv
import os
load_dotenv()
encryption_key = os.environ.get('ENCRYPTION_KEY')

Rotate Keys Regularly

def rotate_encryption_key():
    """Rotate encryption key quarterly.

    Generates a new Fernet key, re-encrypts every item returned by
    ``get_all_encrypted_data()`` under it, stores the new key (asking
    ``save_key`` to back up the old one) and logs a ``key_rotation`` event.

    Raises:
        cryptography.fernet.InvalidToken: if an item cannot be decrypted
            with the current key.
    """
    from cryptography.fernet import Fernet
    import datetime

    # Generate new key
    new_key = Fernet.generate_key()
    old_key = load_current_key()

    # Re-encrypt all data with new key
    old_cipher = Fernet(old_key)
    new_cipher = Fernet(new_key)

    # NOTE(review): the new key is persisted only after this loop, so a
    # failure part-way through leaves some items encrypted under a key that
    # was never saved. Confirm that update_data/save_key make this safe.
    for data in get_all_encrypted_data():
        decrypted = old_cipher.decrypt(data)
        new_encrypted = new_cipher.encrypt(decrypted)
        # NOTE(review): update_data receives only the new ciphertext;
        # presumably it knows which record to replace. Verify.
        update_data(new_encrypted)

    # Store new key
    save_key(new_key, backup_old_key=True)

    # Log rotation with a timezone-aware UTC timestamp. utcnow() returns a
    # naive datetime and is deprecated since Python 3.12.
    log_event({
        "event": "key_rotation",
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "status": "success"
    })

API Security

Use Strong API Keys

# API key requirements
- Minimum length: 32 characters
- Include uppercase, lowercase, numbers, symbols
- Unique per environment (dev, staging, prod)
- Rotate every 90 days

API Key Whitelisting

# Restrict API key to specific IPs
import requests

def whitelist_api_key(api_key, ip_addresses):
    """Restrict an API key to a list of IP addresses.

    Args:
        api_key: The key to restrict; it is interpolated into the URL path.
        ip_addresses: IPs sent as the ``ips`` field of the JSON body.

    Returns:
        True if the API answered with HTTP 200, False otherwise.
    """
    # `admin_token` is read from the enclosing module scope.
    # NOTE(review): the key travels in the URL path, which access logs and
    # proxies typically record. Confirm this is acceptable.
    response = requests.post(
        f'https://api.oneliac.io/api/keys/{api_key}/whitelist',
        json={"ips": ip_addresses},
        headers={"Authorization": f"Bearer {admin_token}"},
        # requests has no default timeout; bound the wait explicitly.
        timeout=10,
    )
    return response.status_code == 200

Rate Limiting

from functools import wraps
from time import time

def rate_limit(max_calls, time_window):
    """Rate limit decorator.

    Allows at most ``max_calls`` invocations of the decorated function
    within any sliding window of ``time_window`` seconds. Each decorated
    function keeps its own independent call history.

    Args:
        max_calls: Maximum number of calls permitted per window.
        time_window: Window length in seconds.

    Raises:
        Exception: from the wrapped call site when the limit is exceeded.
    """
    from collections import deque
    from time import monotonic

    def decorator(func):
        # Timestamps of accepted calls, oldest first.
        call_times = deque()

        @wraps(func)
        def wrapper(*args, **kwargs):
            # A monotonic clock cannot jump backwards or forwards when the
            # system time is adjusted, so the window stays correct.
            now = monotonic()
            cutoff = now - time_window

            # Drop calls that have aged out of the window. Timestamps are
            # appended in order, so only the left end needs checking.
            while call_times and call_times[0] <= cutoff:
                call_times.popleft()

            if len(call_times) >= max_calls:
                raise Exception(f"Rate limit exceeded: {max_calls} calls per {time_window}s")

            call_times.append(now)
            return func(*args, **kwargs)

        return wrapper
    return decorator

@rate_limit(max_calls=10, time_window=60)
def api_call(endpoint, data):
    """Placeholder API call, limited to 10 calls per 60 seconds."""
    # API call implementation
    pass

Network Security

HTTPS/TLS Configuration

# Use strong TLS versions and ciphers
# Only TLS 1.2 and TLS 1.3 are enabled.
ssl_protocols TLSv1.2 TLSv1.3;
# Cipher string: the HIGH group, minus anonymous (aNULL) and MD5-based suites.
ssl_ciphers HIGH:!aNULL:!MD5;
# Prefer the server's cipher order over the client's.
ssl_prefer_server_ciphers on;

CORS Configuration

# Restrict CORS origins in production
from fastapi.middleware.cors import CORSMiddleware

# Register CORS handling with an explicit allow-list for origins, methods
# and headers; credentials are permitted for the listed origin only.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["https://yourdomain.com"],  # Specific domains
    allow_methods=["POST", "GET"],  # Only needed methods
    allow_headers=["Content-Type", "Authorization"],
)

IP Whitelisting

# Restrict API access to known IPs
# Exact client addresses that may call the API.
ALLOWED_IPS = [
    "203.0.113.0",
    "203.0.113.1",
]


def check_ip(request):
    """Reject requests whose client address is not in ``ALLOWED_IPS``.

    Args:
        request: Object exposing ``client.host`` (e.g. a Starlette/FastAPI
            request). ``client`` may be None when the peer is unknown.

    Raises:
        HTTPException: 403 when the address is unknown or not whitelisted.
    """
    # NOTE(review): behind a reverse proxy, client.host is presumably the
    # proxy's address rather than the caller's. Confirm the deployment.
    client = request.client
    client_ip = client.host if client is not None else None
    if client_ip not in ALLOWED_IPS:
        # Imported here so the snippet is self-contained.
        from fastapi import HTTPException
        raise HTTPException(status_code=403, detail="IP not whitelisted")

Access Control

Role-Based Access Control (RBAC)

from enum import Enum

class Role(str, Enum):
    """User roles; members are also plain strings equal to their value."""

    ADMIN = "admin"
    DOCTOR = "doctor"
    STAFF = "staff"
    PATIENT = "patient"

def require_role(required_roles):
    """Check user has required role.

    Decorator factory for handlers whose first positional argument is the
    request. The role returned by ``get_user_role(request)`` must be in
    ``required_roles`` or the call is rejected.

    Works for both plain and ``async def`` handlers: an async handler gets
    an async wrapper, so frameworks that inspect the callable still see a
    coroutine function and await it.

    Args:
        required_roles: Container of roles allowed to call the handler.

    Raises:
        HTTPException: 403 from the wrapped call when the role is missing.
    """
    import inspect

    def _ensure_allowed(request):
        # Shared permission check used by both wrapper flavours.
        user_role = get_user_role(request)
        if user_role not in required_roles:
            raise HTTPException(status_code=403, detail="Insufficient permissions")

    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @wraps(func)
            async def wrapper(request, *args, **kwargs):
                _ensure_allowed(request)
                return await func(request, *args, **kwargs)
        else:
            @wraps(func)
            def wrapper(request, *args, **kwargs):
                _ensure_allowed(request)
                return func(request, *args, **kwargs)
        return wrapper
    return decorator

@app.post("/admin/config")
@require_role([Role.ADMIN])
async def admin_config(request: Request):
    """Placeholder handler for POST /admin/config, restricted to Role.ADMIN."""
    # Only admins can access
    pass

Audit Logging

Log All Operations

import logging
import json
from datetime import datetime

# Dedicated logger for the audit trail.
# NOTE: records are emitted at INFO level; the application's logging
# configuration must enable INFO for the 'audit' logger and attach a
# handler, otherwise these entries are dropped.
audit_logger = logging.getLogger('audit')


def log_api_call(user_id, endpoint, action, status, data):
    """Log API call for audit trail.

    Emits one JSON object per call. The request data itself is not logged,
    only a SHA-256 digest of its JSON form.

    Args:
        user_id: Identifier of the caller.
        endpoint: API path that was invoked.
        action: Logical operation performed.
        status: Outcome label (e.g. "success").
        data: JSON-serializable request data to fingerprint.
    """
    # Local imports: this snippet's import list omits hashlib and timezone.
    import hashlib
    from datetime import timezone

    # sort_keys makes the digest independent of dict key order, so equal
    # payloads always produce the same hash.
    canonical = json.dumps(data, sort_keys=True).encode()

    audit_logger.info(json.dumps({
        # Timezone-aware UTC timestamp (utcnow() is naive and deprecated).
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "endpoint": endpoint,
        "action": action,
        "status": status,
        "data_hash": hashlib.sha256(canonical).hexdigest()
    }))

# Example: record a successful eligibility check made by doctor_001.
log_api_call(
    user_id="doctor_001",
    endpoint="/verify-eligibility",
    action="check_coverage",
    status="success",
    data={"procedure": "PROC001"},
)

Audit Log Retention

# Keep audit logs for required retention period (e.g., 7 years for healthcare)
# Example: Store in immutable S3 with versioning

import boto3

def archive_audit_logs():
    """Archive old audit logs to S3.

    Uploads the local ``audit.log`` file to the ``audit-logs-archive``
    bucket under a key containing today's date, with AES256 server-side
    encryption.
    """
    # Local import: this snippet's import list omits `date`.
    from datetime import date

    s3 = boto3.client('s3')

    # Upload to immutable S3 bucket
    # NOTE(review): immutability depends on the bucket's own configuration
    # (e.g. versioning / object lock), which this code does not set.
    with open('audit.log', 'rb') as f:
        s3.put_object(
            Bucket='audit-logs-archive',
            Key=f"audit-{date.today()}.log",
            Body=f.read(),
            ServerSideEncryption='AES256'
        )

Data Privacy

Minimize Data Collection

# Fields the service needs, and therefore the only ones to collect.
required_fields = [
    "patient_id",      # De-identified
    "encrypted_data",  # Encrypted
    "ipfs_cid",        # Reference only
    "data_hash",       # Hash only
]

# Fields to strip from incoming records before anything is stored.
unnecessary_fields = [
    "patient_name",    # Never needed
    "patient_ssn",     # Never store
    "diagnosis_text",  # Never store plaintext
]

Data Retention Policy

# Implement automatic data deletion
from datetime import datetime, timedelta

def delete_old_data():
    """Delete data older than retention period.

    For every record returned by ``get_records_before(cutoff)``, shreds the
    record's file, deletes its database row and logs a ``data_deletion``
    event.
    """
    retention_days = 365 * 7  # 7 years for healthcare

    # Naive UTC datetime, kept as-is because it is handed to
    # get_records_before(); presumably stored timestamps are naive UTC too.
    # Verify before switching to timezone-aware values.
    cutoff_date = datetime.utcnow() - timedelta(days=retention_days)

    old_records = get_records_before(cutoff_date)

    for record in old_records:
        # Securely delete the file first, then the database row
        shred_file(record.file_path)
        delete_database_record(record.id)

        # Log deletion
        log_event({
            "event": "data_deletion",
            "record_id": record.id,
            "reason": "retention_period_expired"
        })

Compliance Checklist

  • All data encrypted at rest and in transit
  • API keys rotated quarterly
  • HTTPS/TLS enabled for all communications
  • Access control implemented (RBAC)
  • Audit logging enabled and archived
  • Data retention policy implemented
  • Incident response plan created
  • Security assessments performed annually
  • HIPAA Business Associate Agreement signed
  • Staff trained on data protection
  • Regular security updates applied
  • Firewall rules restricted to minimum necessary
  • Two-factor authentication enabled
  • Database backups encrypted
  • Disaster recovery plan tested

Security Tools

Dependency Scanning

# Check for vulnerable dependencies
# Installs the `safety` CLI, then runs its `check` command.
pip install safety
safety check

# Or using Snyk
# Requires the `snyk` CLI to be installed already.
snyk test

Static Code Analysis

# Scan Python code for security issues
# Installs `bandit`, then scans the agents/ directory recursively (-r).
pip install bandit
bandit -r agents/

# Or using SonarQube
# Requires the `sonar-scanner` CLI to be installed already.
sonar-scanner

Secrets Detection

# Scan for hardcoded secrets
# Installs `detect-secrets`, then runs a scan with the --all-files flag.
pip install detect-secrets
detect-secrets scan --all-files

Incident Response

Security Incident Procedure

  1. Detect: Monitor logs for suspicious activity
  2. Contain: Revoke compromised keys, block IP
  3. Investigate: Review logs, identify scope
  4. Remediate: Fix vulnerability, update code
  5. Communicate: Notify affected parties
  6. Learn: Post-mortem, prevent recurrence

Breach Notification

def notify_of_breach(affected_users, breach_details):
    """Notify users of security breach.

    Sends one "Security Notice" email per affected user, then notifies the
    authorities once.

    Args:
        affected_users: Iterable of objects with an ``email`` attribute.
        breach_details: Mapping with ``date``, ``data`` and ``action`` keys.
    """
    for user in affected_users:
        # Built line by line so the message text does not pick up the
        # function's indentation.
        body = (
            "\n"
            f"A security incident was detected on {breach_details['date']}.\n"
            f"Affected data: {breach_details['data']}\n"
            f"Action taken: {breach_details['action']}\n"
        )
        send_email(
            to=user.email,
            subject="Security Notice",
            body=body
        )

    # Notify authorities if required by law (once, after all users)
    notify_authorities(breach_details)

Next Steps