AI Surveillance Platform — Comprehensive Security Architecture
Document Classification: CONFIDENTIAL — Security Architecture
Version: 1.0
Target Environment: Cloud-Hosted Web Application + Local Edge Gateway
Network Topology: Cloud Platform <-> WireGuard VPN <-> Edge Gateway <-> DVR (192.168.29.200)
Data Sensitivity: HIGH — Contains biometric facial data, PII, surveillance footage
Table of Contents
- SSL/TLS Security
- Authentication
- Role-Based Access Control (RBAC)
- VPN & Network Security
- Secret Management
- Audit Logging
- Media Access Security
- API Security
- Session Security
- Data Privacy & GDPR Compliance
- Edge Gateway Security
- Cloud Infrastructure Security
- Secrets Rotation Policy
- Incident Response Plan
- Security Checklist
- Appendix: Configuration Templates
1. SSL/TLS Security
1.1 HTTPS-Only Policy
All web traffic MUST use HTTPS. HTTP requests are permanently redirected to HTTPS.
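One way to implement the permanent redirect is a catch-all Nginx server block; a minimal sketch (the ACME webroot path is an assumption and only applies if Let's Encrypt HTTP-01 is used):

```nginx
# Redirect all plain-HTTP traffic to HTTPS with a permanent (301) redirect
server {
    listen 80;
    listen [::]:80;
    server_name surveillance.example.com;

    # Serve ACME HTTP-01 challenges over plain HTTP if Let's Encrypt is used
    location /.well-known/acme-challenge/ {
        root /var/www/letsencrypt;
    }

    location / {
        return 301 https://$host$request_uri;
    }
}
```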
1.2 Certificate Management
Option A: Let's Encrypt (Recommended for Cost Efficiency)
- Automatic certificate provisioning via ACMEv2 protocol
- 90-day certificate lifecycle with auto-renewal at day 60
- DNS-01 challenge for wildcard certificates (e.g., *.surveillance.example.com)
- HTTP-01 challenge for single-domain certificates
Option B: Managed Certificate Authority (Enterprise)
- Cloud provider-managed certificates (AWS ACM, Azure Key Vault, Google Certificate Manager)
- Annual validity with auto-renewal
- Extended Validation (EV) or Organization Validation (OV) recommended for trust
1.3 TLS Version & Cipher Requirements
Minimum TLS Version: TLS 1.2
Preferred: TLS 1.3
# Nginx SSL Configuration
server {
listen 443 ssl http2;
server_name surveillance.example.com;
ssl_certificate /etc/letsencrypt/live/surveillance.example.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/surveillance.example.com/privkey.pem;
# TLS Versions
ssl_protocols TLSv1.3 TLSv1.2;
# Cipher Suites — TLS 1.3 (automatic, no configuration needed)
# Cipher Suites — TLS 1.2
ssl_ciphers 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256';
ssl_prefer_server_ciphers off;
# Session Configuration
ssl_session_cache shared:SSL:50m;
ssl_session_timeout 1d;
ssl_session_tickets off;
# OCSP Stapling
ssl_stapling on;
ssl_stapling_verify on;
ssl_trusted_certificate /etc/letsencrypt/live/surveillance.example.com/chain.pem;
resolver 8.8.8.8 8.8.4.4 valid=300s;
resolver_timeout 5s;
# Security Headers
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
add_header X-Content-Type-Options "nosniff" always;
add_header X-Frame-Options "DENY" always;
add_header X-XSS-Protection "0" always; # Disabled in favor of CSP
add_header Referrer-Policy "strict-origin-when-cross-origin" always;
}
1.4 HSTS Configuration
Strict-Transport-Security: max-age=63072000; includeSubDomains; preload
- max-age=63072000 — 2 years enforcement
- includeSubDomains — applies to all subdomains
- preload — eligible for browser HSTS preload lists
1.5 Internal Service-to-Service TLS
All internal communications MUST be encrypted with TLS/mTLS:
| Communication Path | Security Mechanism |
|---|---|
| Web App -> Database | TLS 1.2+ with certificate verification |
| Web App -> Object Storage | HTTPS with signed URLs |
| Web App -> Message Queue | TLS 1.2+ |
| Web App -> Cache (Redis) | TLS + AUTH password |
| Edge Gateway -> Cloud API | TLS 1.3 via WireGuard tunnel |
| Edge Gateway -> DVR | RTSP over TLS (RTSPS) or isolated VLAN |
| Microservice -> Microservice | mTLS with service mesh (Istio/Linkerd) |
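On the client side, the table's "TLS 1.2+ with certificate verification" requirement can be enforced in Python services with a standard-library SSL context; a minimal sketch (the helper name is illustrative):

```python
import ssl
from typing import Optional

def internal_tls_context(ca_bundle: Optional[str] = None) -> ssl.SSLContext:
    """Client-side SSL context enforcing TLS 1.2+ with certificate verification."""
    # create_default_context enables hostname checking and chain verification
    ctx = ssl.create_default_context(cafile=ca_bundle)
    # Refuse TLS 1.0/1.1 regardless of OpenSSL defaults
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    return ctx
```

The same context can then be passed to database drivers, Redis clients, or HTTP libraries that accept an `ssl_context` parameter.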
1.6 TLS Configuration Verification
# Verify TLS configuration
nmap --script ssl-enum-ciphers -p 443 surveillance.example.com
# Test for known vulnerabilities
testssl.sh surveillance.example.com
# Verify HSTS
curl -s -D- https://surveillance.example.com | grep -i strict-transport
2. Authentication
2.1 Password Policy
| Policy | Requirement |
|---|---|
| Minimum Length | 12 characters |
| Maximum Length | 128 characters |
| Complexity | Uppercase, lowercase, digit, special character |
| Password History | Last 12 passwords cannot be reused |
| Dictionary Check | Reject common passwords (HaveIBeenPwned API check) |
| Maximum Age | 90 days (configurable) |
| Lockout Threshold | 5 failed attempts |
| Lockout Duration | 30 minutes (exponential backoff) |
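The length and complexity rows of the policy table translate directly into a validation helper; a sketch (lockout and HaveIBeenPwned checks are handled elsewhere, and the function name is illustrative):

```python
import re

def validate_password(pw: str) -> list:
    """Return a list of policy violations; an empty list means the password passes."""
    errors = []
    if len(pw) < 12:
        errors.append("must be at least 12 characters")
    if len(pw) > 128:
        errors.append("must be at most 128 characters")
    if not re.search(r"[A-Z]", pw):
        errors.append("must contain an uppercase letter")
    if not re.search(r"[a-z]", pw):
        errors.append("must contain a lowercase letter")
    if not re.search(r"\d", pw):
        errors.append("must contain a digit")
    if not re.search(r"[^A-Za-z0-9]", pw):
        errors.append("must contain a special character")
    return errors
```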
2.2 Password Hashing
Primary Algorithm: Argon2id (OWASP recommended)
from argon2 import PasswordHasher
from argon2.low_level import Type
# Argon2id configuration — memory-hard, resistant to GPU/ASIC attacks
ph = PasswordHasher(
time_cost=3, # iterations
memory_cost=65536, # 64 MB
parallelism=4, # parallel threads
hash_len=32,
salt_len=16,
type=Type.ID
)
# Hashing
def hash_password(plain_password: str) -> str:
return ph.hash(plain_password)
# Verification
def verify_password(plain_password: str, hashed_password: str) -> bool:
try:
ph.verify(hashed_password, plain_password)
# Check if rehash needed (parameters upgraded)
if ph.check_needs_rehash(hashed_password):
# Trigger async rehash
pass
return True
except Exception:
return False
Fallback/Compatibility: bcrypt (cost factor 12+)
import bcrypt
def hash_password_bcrypt(plain: str) -> str:
    # Note: bcrypt truncates input at 72 bytes — pre-hash longer passwords upstream if needed
    salt = bcrypt.gensalt(rounds=12)
    return bcrypt.hashpw(plain.encode(), salt).decode()
2.3 Session Management — JWT with Secure Cookies
Architecture: Short-lived JWT access tokens + long-lived refresh tokens (httpOnly cookies)
import secrets

import jwt
from datetime import datetime, timedelta
# Configuration
ACCESS_TOKEN_EXPIRY = 15 * 60 # 15 minutes
REFRESH_TOKEN_EXPIRY = 7 * 24 * 3600 # 7 days
JWT_ALGORITHM = "ES256" # ECDSA P-256 — faster, shorter signatures
class TokenManager:
def __init__(self, private_key: str, public_key: str):
self.private_key = private_key
self.public_key = public_key
def create_access_token(self, user_id: str, role: str,
permissions: list, session_id: str) -> str:
"""Create short-lived access token with minimal claims."""
now = datetime.utcnow()
payload = {
"sub": user_id, # Subject (user ID)
"role": role,
"perms": permissions, # Permission scope
"sid": session_id, # Session binding
"iat": now, # Issued at
"exp": now + timedelta(seconds=ACCESS_TOKEN_EXPIRY),
"nbf": now, # Not before
"iss": "surveillance-platform", # Issuer
"aud": "surveillance-api", # Audience
"jti": secrets.token_urlsafe(16) # Unique token ID
}
return jwt.encode(payload, self.private_key, algorithm=JWT_ALGORITHM)
def create_refresh_token(self, user_id: str, session_id: str,
fingerprint: str) -> str:
"""Create refresh token stored as httpOnly cookie."""
now = datetime.utcnow()
payload = {
"sub": user_id,
"sid": session_id,
"fp": fingerprint, # Browser fingerprint binding
"iat": now,
"exp": now + timedelta(seconds=REFRESH_TOKEN_EXPIRY),
"jti": secrets.token_urlsafe(32),
"type": "refresh"
}
return jwt.encode(payload, self.private_key, algorithm=JWT_ALGORITHM)
def verify_token(self, token: str, token_type: str = "access") -> dict:
"""Verify and decode token with full validation."""
try:
payload = jwt.decode(
token,
self.public_key,
algorithms=[JWT_ALGORITHM],
audience="surveillance-api",
issuer="surveillance-platform"
)
if payload.get("type", "access") != token_type:
raise jwt.InvalidTokenError("Invalid token type")
return payload
except jwt.ExpiredSignatureError:
raise AuthenticationError("Token expired")
except jwt.InvalidTokenError as e:
raise AuthenticationError(f"Invalid token: {e}")
2.4 Cookie Configuration
# Refresh token cookie settings
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True, # Not accessible via JavaScript
secure=True, # HTTPS only
samesite="Strict", # CSRF protection
max_age=7 * 24 * 3600,
path="/auth/refresh"
)
2.5 Session Timeout Configuration
| Timeout Type | Duration | Action |
|---|---|---|
| Idle Timeout | 30 minutes | Invalidate session, require re-login |
| Absolute Timeout | 8 hours | Force logout regardless of activity |
| Maximum Session Lifetime | 7 days | Refresh token expiry |
| Concurrent Sessions | 3 per user | Revoke oldest on new login |
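The idle and absolute timeouts from the table can be checked on every request; a sketch assuming each session record carries its creation and last-activity timestamps (names are illustrative):

```python
from datetime import datetime, timedelta

IDLE_TIMEOUT = timedelta(minutes=30)
ABSOLUTE_TIMEOUT = timedelta(hours=8)

def session_expired(created_at: datetime, last_seen: datetime,
                    now: datetime = None):
    """Return 'idle', 'absolute', or None if the session is still valid."""
    now = now or datetime.utcnow()
    # Absolute timeout wins: force logout regardless of activity
    if now - created_at >= ABSOLUTE_TIMEOUT:
        return "absolute"
    # Idle timeout: invalidate session, require re-login
    if now - last_seen >= IDLE_TIMEOUT:
        return "idle"
    return None
```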
2.6 Login Rate Limiting & Account Lockout
import redis
from functools import wraps
class LoginRateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
    def is_login_allowed(self, username: str, identifier: str) -> tuple[bool, dict]:
        """
        Multi-layer rate limiting (identifier = client IP address):
        1. Per-IP rate limiting
        2. Per-username rate limiting
        3. Account lockout on repeated failures
        """
        ip_key = f"login:ip:{identifier}"
        user_key = f"login:user:{username}"
        lockout_key = f"lockout:{username}"
# Check if account is locked
if self.redis.exists(lockout_key):
ttl = self.redis.ttl(lockout_key)
return False, {"error": "Account locked", "retry_after": ttl}
# IP-based: max 10 attempts per minute
ip_attempts = self.redis.incr(ip_key)
if ip_attempts == 1:
self.redis.expire(ip_key, 60)
if ip_attempts > 10:
return False, {"error": "Too many attempts from this IP"}
# Username-based: max 5 attempts per 15 minutes
user_attempts = self.redis.incr(user_key)
if user_attempts == 1:
self.redis.expire(user_key, 900)
# Account lockout: 5 failures = 30min lockout
if user_attempts >= 5:
lockout_duration = 1800 # 30 minutes
# Exponential backoff: 30min, 1hr, 2hr, 4hr, 8hr
consecutive_lockouts = int(self.redis.get(f"lockouts:{username}") or 0)
lockout_duration *= (2 ** min(consecutive_lockouts, 4))
self.redis.setex(lockout_key, lockout_duration, "1")
self.redis.incr(f"lockouts:{username}")
self.redis.expire(f"lockouts:{username}", 86400 * 7)
# Log security event
self.log_security_event("ACCOUNT_LOCKED", username, identifier)
return False, {"error": "Account locked", "retry_after": lockout_duration}
return True, {}
def record_success(self, username: str, ip: str):
"""Clear failure counters on successful login."""
self.redis.delete(f"login:user:{username}")
self.redis.delete(f"lockouts:{username}")
self.redis.delete(f"login:ip:{ip}")
2.7 Multi-Factor Authentication (TOTP)
import base64
import io
import secrets

import pyotp
import qrcode

class TOTPManager:
    def __init__(self):
        self.issuer = "AI-Surveillance-Platform"

    def enroll_user(self, user_id: str) -> dict:
        """Generate TOTP secret and QR code for enrollment."""
        secret = pyotp.random_base32()
        # Store encrypted secret in database
        encrypted_secret = encrypt_with_kms(secret)
        db.store_totp_secret(user_id, encrypted_secret)
        # Generate provisioning URI
        totp = pyotp.TOTP(secret)
        provisioning_uri = totp.provisioning_uri(
            name=user_id,
            issuer_name=self.issuer
        )
        # Generate QR code as a base64-encoded PNG
        qr = qrcode.make(provisioning_uri)
        buf = io.BytesIO()
        qr.save(buf, format="PNG")
        qr_base64 = base64.b64encode(buf.getvalue()).decode()
# Return backup codes (single-use)
backup_codes = [secrets.token_hex(4) for _ in range(10)]
hashed_backup_codes = [hash_code(code) for code in backup_codes]
db.store_backup_codes(user_id, hashed_backup_codes)
return {
"qr_code": qr_base64,
"backup_codes": backup_codes, # Display once, never stored plain
"manual_entry_key": secret
}
def verify_totp(self, user_id: str, token: str) -> bool:
"""Verify TOTP code with time-window tolerance."""
encrypted_secret = db.get_totp_secret(user_id)
secret = decrypt_with_kms(encrypted_secret)
totp = pyotp.TOTP(secret)
# Allow current, previous, and next window (90 seconds total tolerance)
return totp.verify(token, valid_window=1)
2.8 Authentication Event Logging
All authentication events are logged with:
- Timestamp (UTC, ISO 8601)
- Event type (LOGIN_SUCCESS, LOGIN_FAILURE, LOGOUT, PASSWORD_CHANGE, MFA_VERIFY)
- User ID (hashed for privacy)
- Source IP address
- User agent (parsed, truncated)
- Geographic location (if available)
- Session ID
- Result (success/failure/reason)
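A minimal builder for such an event record, hashing the user ID and truncating the user agent as described above (field names are illustrative):

```python
import hashlib
from datetime import datetime, timezone

def build_auth_event(event_type: str, user_id: str, source_ip: str,
                     user_agent: str, session_id: str, result: str) -> dict:
    """Assemble an authentication event with privacy-preserving fields."""
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),  # UTC, ISO 8601
        "event_type": event_type,
        # Hash the user ID so raw identifiers never land in log storage
        "user_id_hash": hashlib.sha256(user_id.encode()).hexdigest(),
        "source_ip": source_ip,
        "user_agent": user_agent[:256],  # truncate to bound log size
        "session_id": session_id,
        "result": result,
    }
```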
3. Role-Based Access Control (RBAC)
3.1 Role Definitions
roles:
super_admin:
name: "Super Administrator"
description: "Full platform access including user management"
level: 100
admin:
name: "Administrator"
description: "Camera, alert, person, and settings management"
level: 80
operator:
name: "Operator"
description: "Live viewing, alert monitoring and acknowledgment"
level: 50
viewer:
name: "Viewer"
description: "View-only access to live streams and historical events"
level: 20
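The numeric levels above admit a simple dominance check for coarse comparisons (fine-grained decisions still go through the permission matrix in 3.2); a sketch:

```python
# Role levels mirroring the YAML definitions above
ROLE_LEVELS = {"super_admin": 100, "admin": 80, "operator": 50, "viewer": 20}

def outranks(actor_role: str, target_role: str) -> bool:
    """True if actor_role is strictly higher, e.g. for 'manage roles below you' rules."""
    return ROLE_LEVELS[actor_role] > ROLE_LEVELS[target_role]
```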
3.2 Permission Matrix
| Permission | Super Admin | Admin | Operator | Viewer |
|---|---|---|---|---|
| User Management | | | | |
| users:create | YES | NO | NO | NO |
| users:read | YES | NO | NO | NO |
| users:update | YES | NO | NO | NO |
| users:delete | YES | NO | NO | NO |
| users:manage_roles | YES | NO | NO | NO |
| Camera Management | | | | |
| cameras:create | YES | YES | NO | NO |
| cameras:read | YES | YES | YES | YES |
| cameras:update | YES | YES | NO | NO |
| cameras:delete | YES | YES | NO | NO |
| cameras:configure | YES | YES | NO | NO |
| Live View | | | | |
| live:view_all | YES | YES | YES | YES |
| live:view_assigned | YES | YES | YES | YES |
| live:ptz_control | YES | YES | NO | NO |
| Alerts | | | | |
| alerts:read_all | YES | YES | YES | NO |
| alerts:read_assigned | YES | YES | YES | YES |
| alerts:acknowledge | YES | YES | YES | NO |
| alerts:configure | YES | YES | NO | NO |
| Persons (Facial Data) | | | | |
| persons:create | YES | YES | NO | NO |
| persons:read | YES | YES | YES | NO |
| persons:update | YES | YES | NO | NO |
| persons:delete | YES | YES | NO | NO |
| persons:export | YES | NO | NO | NO |
| persons:train_model | YES | YES | NO | NO |
| persons:approve_training | YES | YES | NO | NO |
| Events & Media | | | | |
| events:read | YES | YES | YES | YES |
| events:playback | YES | YES | YES | YES |
| media:download | YES | YES | NO | NO |
| media:export | YES | NO | NO | NO |
| System | | | | |
| settings:read | YES | YES | NO | NO |
| settings:update | YES | YES | NO | NO |
| audit:read | YES | NO | NO | NO |
| system:backup | YES | NO | NO | NO |
| system:restore | YES | NO | NO | NO |
| Reports | | | | |
| reports:generate | YES | YES | YES | NO |
| reports:schedule | YES | YES | NO | NO |
3.3 Resource-Level Permissions
class ResourcePermissionChecker:
"""
Fine-grained access control with resource-level permissions.
Supports per-camera, per-zone, and per-person restrictions.
"""
def can_access_camera(self, user: User, camera_id: str) -> bool:
# Super Admin bypass
if user.has_role("super_admin"):
return True
# Check if camera is in user's assigned cameras
assigned_cameras = self.get_user_camera_assignments(user.id)
if camera_id in assigned_cameras:
return True
# Check zone-based access
camera_zones = self.get_camera_zones(camera_id)
user_zones = self.get_user_zone_assignments(user.id)
if camera_zones & user_zones: # Intersection
return True
return False
def can_view_person(self, user: User, person_id: str) -> bool:
# Facial data access requires explicit permission
if not user.has_permission("persons:read"):
return False
# Check data classification level
person = self.get_person(person_id)
if person.privacy_level == "restricted":
return user.has_permission("persons:read_restricted")
return True
def filter_query_by_permission(self, user: User,
query: Query, resource_type: str):
"""Automatically apply permission filters to database queries."""
if user.has_role("super_admin"):
return query
if resource_type == "camera":
allowed_ids = self.get_user_camera_assignments(user.id)
return query.filter(Camera.id.in_(allowed_ids))
if resource_type == "person":
# Enforce privacy level restrictions
max_level = self.get_user_privacy_clearance(user.id)
return query.filter(Person.privacy_level <= max_level)
return query
3.4 Middleware for API Endpoint Protection
from functools import wraps
from fastapi import HTTPException, Depends, Request
class RBACMiddleware:
def require_permission(self, permission: str):
def decorator(func):
@wraps(func)
async def wrapper(request: Request, *args, **kwargs):
user = request.state.user
# Check permission
if not user.has_permission(permission):
# Log unauthorized access attempt
await self.log_unauthorized_access(request, permission)
raise HTTPException(
status_code=403,
detail="Insufficient permissions for this operation"
)
# Check resource-level permission if resource_id provided
resource_id = kwargs.get("resource_id") or kwargs.get("camera_id")
if resource_id:
resource_type = self.infer_resource_type(func)
if not self.check_resource_access(user, resource_type, resource_id):
raise HTTPException(
status_code=403,
detail="Access denied for this resource"
)
return await func(request, *args, **kwargs)
return wrapper
return decorator
# Usage in API routes
@app.get("/api/cameras/{camera_id}/live")
@rbac.require_permission("live:view_all")
async def get_live_stream(request: Request, camera_id: str):
...
@app.post("/api/persons")
@rbac.require_permission("persons:create")
async def create_person(request: Request, person_data: PersonCreate):
...
@app.post("/api/alerts/{alert_id}/acknowledge")
@rbac.require_permission("alerts:acknowledge")
async def acknowledge_alert(request: Request, alert_id: str):
...
3.5 Permission Caching
# Cache user permissions in Redis with TTL
PERMISSION_CACHE_TTL = 300 # 5 minutes
def get_user_permissions(user_id: str) -> set:
cache_key = f"permissions:{user_id}"
cached = redis.get(cache_key)
    if cached:
        return set(json.loads(cached))
# Fetch from database
permissions = compute_effective_permissions(user_id)
redis.setex(cache_key, PERMISSION_CACHE_TTL, json.dumps(list(permissions)))
return permissions
# Invalidate on role change
def invalidate_permission_cache(user_id: str):
redis.delete(f"permissions:{user_id}")
4. VPN & Network Security
4.1 Architecture Overview
INTERNET
|
[Cloudflare/CDN]
|
+-----v------+
| Nginx | <-- TLS termination, WAF
| (WAF) |
+-----+------+
|
+-----------+-----------+
| |
[App Servers] [VPN Server]
(Private Subnet) (WireGuard)
| |
+-----------+-----------+
|
[Database]
[Object Storage]
[Cache]
|
WIREGUARD VPN (UDP 51820)
|
+-----v------+
| Edge |
| Gateway | <-- Site network entry point
| 192.168.29.1|
+-----+------+
|
+---------+---------+
| |
[DVR/NVR] [Local Admin]
192.168.29.200 (Wired LAN only)
|
[IP Cameras]
192.168.29.128/25
4.2 WireGuard VPN Configuration
Cloud VPN Server Configuration:
# /etc/wireguard/wg0.conf
[Interface]
Address = 10.200.0.1/24
ListenPort = 51820
PrivateKey = <SERVER_PRIVATE_KEY>
# Hardening
MTU = 1420
Table = off
PostUp = iptables -A FORWARD -i wg0 -j ACCEPT; iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE
PostDown = iptables -D FORWARD -i wg0 -j ACCEPT; iptables -t nat -D POSTROUTING -o eth0 -j MASQUERADE
# Peer: Edge Gateway (Site 1)
[Peer]
PublicKey = <EDGE_PUBLIC_KEY>
PresharedKey = <PRESHARED_KEY>
AllowedIPs = 10.200.0.2/32, 192.168.29.0/24
PersistentKeepalive = 25
# Peer: Backup Edge (Site 2)
[Peer]
PublicKey = <BACKUP_EDGE_PUBLIC_KEY>
PresharedKey = <PRESHARED_KEY_2>
AllowedIPs = 10.200.0.3/32, 192.168.30.0/24
PersistentKeepalive = 25
Edge Gateway Configuration:
# /etc/wireguard/wg0.conf
[Interface]
Address = 10.200.0.2/32
PrivateKey = <EDGE_PRIVATE_KEY>
DNS = 1.1.1.1, 1.0.0.1
# Hardening
MTU = 1420
Table = off
[Peer]
PublicKey = <SERVER_PUBLIC_KEY>
PresharedKey = <PRESHARED_KEY>
AllowedIPs = 10.0.0.0/8 # Cloud private range only
Endpoint = vpn.surveillance.example.com:51820
PersistentKeepalive = 25
4.3 VPN Security Hardening
| Hardening Measure | Implementation |
|---|---|
| Preshared Keys | All peers use PSK for quantum-resistant layer |
| Persistent Keepalive | 25 seconds — maintains NAT traversal without excess traffic |
| AllowedIPs Restriction | Strict — only necessary networks allowed |
| Key Generation | `wg genkey \| wg pubkey` — generated locally on each peer |
| Key Storage | Private keys in /etc/wireguard/ with permissions 600 |
| No Logging | WireGuard is stateless — no connection logs by design |
| Endpoint Hiding | VPN endpoint not published in DNS; IP whitelisting on firewall |
| DDoS Protection | Rate limiting on UDP/51820 at cloud firewall level |
4.4 Firewall Rules — Cloud VPN Server
#!/bin/bash
# /usr/local/bin/vpn-firewall.sh
# Default deny
iptables -P INPUT DROP
iptables -P FORWARD DROP
iptables -P OUTPUT ACCEPT
# Allow loopback
iptables -A INPUT -i lo -j ACCEPT
# Allow established connections
iptables -A INPUT -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT
# Allow WireGuard (only from known peer IPs)
iptables -A INPUT -p udp --dport 51820 -s <EDGE_PUBLIC_IP> -j ACCEPT
iptables -A INPUT -p udp --dport 51820 -s <BACKUP_EDGE_IP> -j ACCEPT
# Allow SSH (bastion host only)
iptables -A INPUT -p tcp --dport 22 -s <BASTION_IP>/32 -j ACCEPT
# Allow return traffic for flows already permitted below
iptables -A FORWARD -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT
# Prevent VPN peers from talking to each other (isolation)
iptables -A FORWARD -i wg0 -o wg0 -j DROP
# Restrict what VPN peers can access — specific ACCEPT rules must
# precede the final LOG/DROP, or they never match
iptables -A FORWARD -i wg0 -d 10.0.1.0/24 -p tcp --match multiport --dports 443,8080 -j ACCEPT # App servers
iptables -A FORWARD -i wg0 -d 10.0.2.0/24 -p tcp --dport 5432 -j ACCEPT # Database
iptables -A FORWARD -i wg0 -j LOG --log-prefix "WG-DENIED: "
iptables -A FORWARD -i wg0 -j DROP
4.5 Firewall Rules — Edge Gateway
#!/bin/bash
# /usr/local/bin/edge-firewall.sh
# Default policies
iptables -P INPUT DROP
iptables -P FORWARD DROP
iptables -P OUTPUT DROP
### INPUT CHAIN ###
# Allow loopback
iptables -A INPUT -i lo -j ACCEPT
# Allow established
iptables -A INPUT -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT
# Allow WireGuard from cloud server ONLY
iptables -A INPUT -p udp --dport 51820 -s <CLOUD_VPN_IP> -j ACCEPT
# Allow local admin access from LAN only
iptables -A INPUT -s 192.168.29.0/24 -p tcp --dport 22 -j ACCEPT # SSH
iptables -A INPUT -s 192.168.29.0/24 -p tcp --dport 443 -j ACCEPT # Local HTTPS
# Allow DVR to communicate (if needed)
iptables -A INPUT -s 192.168.29.200/32 -j ACCEPT
# Drop everything else and log
iptables -A INPUT -j LOG --log-prefix "EDGE-IN-DENIED: " --log-level 4
iptables -A INPUT -j DROP
### OUTPUT CHAIN ###
# Allow DNS
iptables -A OUTPUT -p udp --dport 53 -j ACCEPT
iptables -A OUTPUT -p tcp --dport 53 -j ACCEPT
# Allow NTP
iptables -A OUTPUT -p udp --dport 123 -j ACCEPT
# Allow WireGuard tunnel
iptables -A OUTPUT -p udp --dport 51820 -d <CLOUD_VPN_IP> -j ACCEPT
# Allow HTTPS to cloud API
iptables -A OUTPUT -d 10.0.0.0/8 -p tcp --dport 443 -j ACCEPT
# Allow local network communication
iptables -A OUTPUT -d 192.168.29.0/24 -j ACCEPT
# Drop everything else
iptables -A OUTPUT -j LOG --log-prefix "EDGE-OUT-DENIED: " --log-level 4
iptables -A OUTPUT -j DROP
### FORWARD CHAIN ###
# No forwarding between interfaces (edge gateway is not a router)
iptables -A FORWARD -j DROP
4.6 Network Segmentation
Site Network: 192.168.29.0/24
+------------------------------------------+
| VLAN 10: Management |
| 192.168.29.0/26 |
| Gateway: 192.168.29.1 |
| Admin Workstations: 192.168.29.10-30 |
| Jump Host: 192.168.29.5 |
+------------------------------------------+
+------------------------------------------+
| VLAN 20: Cameras/DVR |
| 192.168.29.128/25 |
| DVR: 192.168.29.200 |
| Cameras: 192.168.29.201-250 |
+------------------------------------------+
Inter-VLAN Rules:
- VLAN 10 can initiate to VLAN 20 (for admin access)
- VLAN 20 CANNOT initiate to VLAN 10
- Internet access ONLY through WireGuard tunnel
- DVR has NO default gateway (one-way communication)
4.7 DVR Protection
DVR_Security:
network:
- No default gateway configured
- Static IP: 192.168.29.200/25
- DNS: None configured
- Can only be reached from edge gateway or local admin VLAN
access:
- Default admin credentials changed immediately
- Local admin account only (no cloud admin)
- Password: 32+ character random string
- Stored in encrypted vault, never in configuration files
services:
- Disable UPnP
- Disable cloud connectivity features
- Disable remote access services
- Disable telnet/SSH if not needed
- Keep only RTSP/RTSPS enabled
- Enable RTSPS (encrypted) if supported
physical:
- DVR in locked rack/cabinet
- No USB ports accessible
- BIOS password protected
5. Secret Management
5.1 Secret Categories & Storage
| Secret Type | Storage Method | Encryption |
|---|---|---|
| Database Passwords | HashiCorp Vault / AWS Secrets Manager | AES-256-GCM |
| Telegram Bot Tokens | Vault KV v2 | AES-256-GCM + Auto-rotation |
| WhatsApp API Keys | Vault KV v2 | AES-256-GCM + Auto-rotation |
| DVR Credentials | Vault KV v2 | AES-256-GCM |
| JWT Signing Keys | Vault Transit Engine | RSA-4096 / ECDSA P-256 |
| Database Encryption Key | Cloud KMS (AWS KMS / Azure Key Vault) | Hardware Security Module |
| API Keys (External) | Vault KV v2 | AES-256-GCM |
| TLS Certificates | Vault PKI Engine | RSA-4096 / ECDSA P-256 |
5.2 Database Encryption at Rest
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC as PBKDF2  # the class is PBKDF2HMAC
import base64
import os
class DatabaseFieldEncryption:
"""
AES-256-GCM field-level encryption for sensitive database columns.
Each field has unique IV. Authentication tag prevents tampering.
"""
def __init__(self, master_key: bytes):
self.master_key = master_key
def encrypt_field(self, plaintext: str, context: str = "") -> str:
"""Encrypt a field value with authenticated encryption."""
# Derive field-specific key using context
field_key = self._derive_key(self.master_key, context)
# Generate random 96-bit IV (never reuse with same key)
iv = os.urandom(12)
# Associated data for authentication binding
aad = context.encode()
aesgcm = AESGCM(field_key)
ciphertext = aesgcm.encrypt(iv, plaintext.encode(), aad)
# Format: base64(iv || ciphertext || auth_tag)
combined = iv + ciphertext
return base64.b64encode(combined).decode()
def decrypt_field(self, encrypted: str, context: str = "") -> str:
"""Decrypt and verify field value."""
combined = base64.b64decode(encrypted.encode())
iv = combined[:12]
ciphertext = combined[12:]
field_key = self._derive_key(self.master_key, context)
aad = context.encode()
aesgcm = AESGCM(field_key)
plaintext = aesgcm.decrypt(iv, ciphertext, aad)
return plaintext.decode()
def _derive_key(self, master_key: bytes, context: str) -> bytes:
"""Derive field-specific key from master key."""
kdf = PBKDF2(
algorithm=hashes.SHA256(),
length=32,
salt=context.encode(),
iterations=100000
)
return kdf.derive(master_key)
# Application: Database Model
class Person(Base):
__tablename__ = "persons"
id = Column(String(36), primary_key=True)
name_encrypted = Column(Text) # Encrypted: personal name
face_encoding_encrypted = Column(LargeBinary) # Encrypted: biometric
id_number_encrypted = Column(Text) # Encrypted: government ID
consent_status = Column(String(20)) # Plain: consent tracking
created_at = Column(DateTime) # Plain: audit timestamp
@property
def name(self) -> str:
return encryption.decrypt_field(self.name_encrypted, f"person:{self.id}:name")
@name.setter
def name(self, value: str):
self.name_encrypted = encryption.encrypt_field(value, f"person:{self.id}:name")
5.3 Telegram/WhatsApp Token Encryption
class TokenVault:
"""
Secure storage for third-party service tokens.
Tokens are encrypted at rest and only decrypted in memory when needed.
"""
def __init__(self, vault_client):
self.vault = vault_client
self._memory_cache = {} # TTL cache for decrypted tokens
def store_telegram_token(self, bot_id: str, token: str):
"""Store Telegram bot token in Vault."""
self.vault.secrets.kv.v2.create_or_update_secret(
path=f"integrations/telegram/{bot_id}",
secret={"token": token, "created_at": datetime.utcnow().isoformat()}
)
def get_telegram_token(self, bot_id: str) -> str:
"""Retrieve and decrypt Telegram token."""
# Check memory cache first (TTL: 5 minutes)
cached = self._memory_cache.get(f"tg:{bot_id}")
if cached and cached["expires"] > time.time():
return cached["value"]
# Fetch from Vault
secret = self.vault.secrets.kv.v2.read_secret_version(
path=f"integrations/telegram/{bot_id}"
)
token = secret["data"]["data"]["token"]
# Store in memory cache with short TTL
self._memory_cache[f"tg:{bot_id}"] = {
"value": token,
"expires": time.time() + 300
}
return token
def rotate_telegram_token(self, bot_id: str, new_token: str):
"""Rotate token with versioning."""
# Store new version
self.store_telegram_token(bot_id, new_token)
# Clear cache
self._memory_cache.pop(f"tg:{bot_id}", None)
5.4 Environment Variable Security
# docker-compose.yml — Environment variable handling
services:
app:
image: surveillance-app:latest
environment:
# Non-sensitive configuration
- APP_ENV=production
- LOG_LEVEL=info
# Secrets injected at runtime (NOT in compose file)
- DATABASE_PASSWORD_FILE=/run/secrets/db_password
- JWT_PRIVATE_KEY_FILE=/run/secrets/jwt_key
- KMS_KEY_ID_FILE=/run/secrets/kms_key
# Never set actual secret values here
secrets:
- db_password
- jwt_key
- kms_key
# Read-only filesystem
read_only: true
tmpfs:
- /tmp:noexec,nosuid,size=100m
secrets:
db_password:
external: true # Managed by Docker secrets or external vault
jwt_key:
external: true
kms_key:
external: true
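The *_FILE convention above implies the application resolves each secret from a mounted file rather than from the environment itself; a sketch of the reader (helper name is illustrative):

```python
import os

def read_secret(name: str) -> str:
    """Resolve a secret from <NAME>_FILE if set, else fall back to the plain env var."""
    file_path = os.environ.get(f"{name}_FILE")
    if file_path:
        with open(file_path) as f:
            # Strip the trailing newline most secret stores append
            return f.read().strip()
    value = os.environ.get(name)
    if value is None:
        raise RuntimeError(f"secret {name} not configured")
    return value
```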
5.5 Kubernetes Secret Management (if applicable)
# External Secrets Operator — sync from Vault
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
name: surveillance-secrets
namespace: surveillance
spec:
refreshInterval: "1h"
secretStoreRef:
kind: ClusterSecretStore
name: vault-backend
target:
name: surveillance-credentials
creationPolicy: Owner
template:
type: Opaque
data:
database-url: "postgresql://{{ .db_user }}:{{ .db_password }}@db:5432/surveillance"
jwt-private-key: "{{ .jwt_key }}"
data:
- secretKey: db_user
remoteRef:
key: surveillance/database
property: username
- secretKey: db_password
remoteRef:
key: surveillance/database
property: password
- secretKey: jwt_key
remoteRef:
key: surveillance/jwt
property: private_key
5.6 No Secrets in Logs
import copy
import re
SENSITIVE_PATTERNS = [
    # Keep the matched key and separator (group 1); redact only the value
    (r'(password["\']?\s*[:=]\s*["\']?)[^"\'\s&]+', r'\1***'),
    (r'(token["\']?\s*[:=]\s*["\']?)[^"\'\s&]+', r'\1***'),
    (r'(secret["\']?\s*[:=]\s*["\']?)[^"\'\s&]+', r'\1***'),
    (r'(key["\']?\s*[:=]\s*["\']?)[^"\'\s&]+', r'\1***'),
    (r'(authorization["\']?\s*[:=]\s*["\']?)[^"\'\s&]+', r'\1***'),
    (r'(api[_-]?key["\']?\s*[:=]\s*["\']?)[^"\'\s&]+', r'\1***'),
]
def sanitize_for_logging(data: dict) -> dict:
"""Remove all sensitive fields before logging."""
sanitized = copy.deepcopy(data)
sensitive_keys = {
'password', 'secret', 'token', 'api_key', 'private_key',
'credential', 'authorization', 'cookie', 'session',
'credit_card', 'ssn', 'face_encoding'
}
def _redact(obj):
if isinstance(obj, dict):
for key in obj:
if any(sk in key.lower() for sk in sensitive_keys):
obj[key] = "***REDACTED***"
else:
_redact(obj[key])
elif isinstance(obj, list):
for item in obj:
_redact(item)
_redact(sanitized)
return sanitized
def sanitize_log_message(message: str) -> str:
"""Redact sensitive patterns from log strings."""
for pattern, replacement in SENSITIVE_PATTERNS:
message = re.sub(pattern, replacement, message, flags=re.IGNORECASE)
return message
6. Audit Logging
6.1 Audit Log Schema
{
"timestamp": "2024-01-15T08:30:00.000Z",
"event_id": "uuid-v4",
"event_type": "PERSON_DATA_CREATED",
"severity": "INFO",
"actor": {
"user_id": "uuid-hashed",
"username": "admin@company.com",
"role": "admin",
"ip_address": "203.0.113.45",
"session_id": "sess-uuid-hashed",
"user_agent": "Mozilla/5.0 (truncated)"
},
"resource": {
"type": "person",
"id": "person-uuid",
"name": "John Doe (hashed)"
},
"action": {
"type": "CREATE",
"details": {
"fields_modified": ["name", "face_encoding", "department"],
"old_values": null,
"new_values": {
"department": "Engineering"
}
}
},
"result": "SUCCESS",
"metadata": {
"source": "web-ui",
"request_id": "req-uuid",
"correlation_id": "corr-uuid"
}
}
6.2 Event Types to Log
| Category | Event Type | Severity | Log Contents |
|---|---|---|---|
| Authentication | LOGIN_SUCCESS | INFO | User, IP, method (password/MFA) |
| | LOGIN_FAILURE | WARNING | Username attempted, IP, failure reason |
| | LOGOUT | INFO | User, session duration |
| | PASSWORD_CHANGE | NOTICE | User, self/admin initiated |
| | MFA_ENABLED | NOTICE | User, method |
| | MFA_DISABLED | WARNING | User, admin override |
| | SESSION_EXPIRED | INFO | User, expiry type (idle/absolute) |
| Authorization | ACCESS_DENIED | WARNING | User, resource, required permission |
| | PRIVILEGE_ESCALATION | NOTICE | User, old role, new role |
| Person Data | PERSON_CREATED | NOTICE | Creator, person fields (hashed) |
| | PERSON_UPDATED | NOTICE | Updater, changed fields |
| | PERSON_DELETED | WARNING | Deleter, person reference (hashed) |
| | PERSON_EXPORTED | WARNING | Exporter, filter criteria |
| | TRAINING_APPROVED | NOTICE | Approver, model version |
| | TRAINING_REJECTED | NOTICE | Rejector, reason |
| Camera | CAMERA_ADDED | NOTICE | Admin, camera details |
| | CAMERA_REMOVED | WARNING | Admin, camera reference |
| | CAMERA_CONFIG_CHANGED | NOTICE | Admin, changed settings |
| Alerts | ALERT_CREATED | INFO | System-generated |
| | ALERT_ACKNOWLEDGED | INFO | Operator, timestamp |
| | ALERT_ESCALATED | NOTICE | System/Admin, escalation path |
| Configuration | SETTINGS_CHANGED | NOTICE | Admin, changed keys |
| | POLICY_CHANGED | NOTICE | Admin, policy name, diff |
| | BACKUP_CREATED | INFO | Initiator, backup scope |
| Media | MEDIA_ACCESSED | INFO | User, media ID, access method |
| | MEDIA_EXPORTED | NOTICE | User, media IDs, destination |
| | MEDIA_DELETED | WARNING | User, retention policy/auto |
| System | SECURITY_EVENT | CRITICAL | Details of security incident |
| | VULNERABILITY_DETECTED | WARNING | CVE, affected component |
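The event catalogue above can be encoded in code so loggers cannot emit an event with the wrong severity. A sketch of such a mapping (shown partial; the full table would be transcribed the same way):

```python
# Event type -> canonical severity, derived from the table in 6.2 (partial).
EVENT_SEVERITY = {
    "LOGIN_SUCCESS": "INFO",
    "LOGIN_FAILURE": "WARNING",
    "LOGOUT": "INFO",
    "PASSWORD_CHANGE": "NOTICE",
    "ACCESS_DENIED": "WARNING",
    "PERSON_CREATED": "NOTICE",
    "PERSON_DELETED": "WARNING",
    "MEDIA_ACCESSED": "INFO",
    "SECURITY_EVENT": "CRITICAL",
}

def severity_for(event_type: str) -> str:
    """Look up the canonical severity; unknown events default to WARNING."""
    return EVENT_SEVERITY.get(event_type, "WARNING")
```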
6.3 Tamper-Resistant Log Storage
import hashlib
import hmac
import json
import uuid
from datetime import datetime
class TamperResistantLogger:
"""
Append-only audit log with cryptographic chain.
Each entry includes hash of previous entry for integrity verification.
"""
    def __init__(self, storage_backend, signing_key: bytes):
        self.storage = storage_backend
        self.signing_key = signing_key
        self.previous_hash = self._get_last_hash()

    def _get_last_hash(self) -> str:
        """Hash of the most recent stored entry, or a genesis value for an empty log."""
        entries = self.storage.get_all()
        return entries[-1]["entry_hash"] if entries else "0" * 64
def log_event(self, event: dict) -> str:
"""Append event with integrity chain."""
event_id = str(uuid.uuid4())
timestamp = datetime.utcnow().isoformat() + "Z"
entry = {
"event_id": event_id,
"timestamp": timestamp,
"previous_hash": self.previous_hash,
"event": event
}
# Serialize for hashing (canonical JSON)
entry_json = json.dumps(entry, sort_keys=True, ensure_ascii=True)
# Calculate hash
entry_hash = hashlib.sha256(entry_json.encode()).hexdigest()
# Sign the entry
signature = hmac.new(
self.signing_key,
entry_hash.encode(),
hashlib.sha256
).hexdigest()
final_entry = {
**entry,
"entry_hash": entry_hash,
"signature": signature
}
# Store in append-only log
self.storage.append(final_entry)
# Update chain
self.previous_hash = entry_hash
return event_id
def verify_chain(self) -> tuple[bool, list]:
"""Verify integrity of entire log chain."""
entries = self.storage.get_all()
tampered_entries = []
for i, entry in enumerate(entries):
# Verify entry hash
check_entry = {k: v for k, v in entry.items()
if k not in ("entry_hash", "signature")}
expected_hash = hashlib.sha256(
json.dumps(check_entry, sort_keys=True).encode()
).hexdigest()
if expected_hash != entry["entry_hash"]:
tampered_entries.append((i, "hash_mismatch"))
continue
# Verify signature
expected_sig = hmac.new(
self.signing_key,
entry["entry_hash"].encode(),
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(expected_sig, entry["signature"]):
tampered_entries.append((i, "signature_invalid"))
# Verify chain link (skip first entry)
if i > 0:
prev_entry = entries[i - 1]
if entry["previous_hash"] != prev_entry["entry_hash"]:
tampered_entries.append((i, "chain_broken"))
return len(tampered_entries) == 0, tampered_entries
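The chain mechanics can be exercised independently of any storage backend. The self-contained sketch below implements the same append/verify logic against an in-memory list; editing any stored event changes its hash and breaks verification:

```python
import hashlib
import hmac
import json

def append_entry(log: list, signing_key: bytes, event: dict) -> None:
    """Append an event, chaining it to the previous entry's hash."""
    prev = log[-1]["entry_hash"] if log else "0" * 64
    entry = {"previous_hash": prev, "event": event}
    entry_hash = hashlib.sha256(
        json.dumps(entry, sort_keys=True).encode()).hexdigest()
    sig = hmac.new(signing_key, entry_hash.encode(), hashlib.sha256).hexdigest()
    log.append({**entry, "entry_hash": entry_hash, "signature": sig})

def chain_intact(log: list, signing_key: bytes) -> bool:
    """Recompute every hash, HMAC signature, and chain link."""
    prev = "0" * 64
    for entry in log:
        body = {k: v for k, v in entry.items()
                if k not in ("entry_hash", "signature")}
        if entry["previous_hash"] != prev:
            return False  # chain broken
        h = hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()
        if h != entry["entry_hash"]:
            return False  # entry modified after signing
        sig = hmac.new(signing_key, h.encode(), hashlib.sha256).hexdigest()
        if not hmac.compare_digest(sig, entry["signature"]):
            return False  # signed with a different key
        prev = h
    return True
```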
6.4 Log Storage Architecture
Application Loggers
|
v
+--------------+ +--------------+ +------------------+
| Filebeat/ | --> | Logstash/ | --> | Elasticsearch |
| Fluent Bit | | Vector | | (Hot: 7 days) |
+--------------+ +--------------+ +------------------+
|
+--------v---------+
| S3/GCS (Warm: |
| 90 days, |
| WORM enabled) |
+------------------+
|
+--------v---------+
| Glacier/Coldline |
| (Cold: 7 years) |
| Immutable |
+------------------+
6.5 Log Retention Policy
| Log Type | Hot Storage | Warm Storage | Cold Storage | Total Retention |
|---|---|---|---|---|
| Authentication Events | 30 days | 1 year | 6 years | 7 years |
| Authorization Events | 30 days | 1 year | 6 years | 7 years |
| Person Data Changes | 30 days | 2 years | 5 years | 7 years |
| Media Access Logs | 7 days | 90 days | 1 year | ~2 years |
| System Events | 7 days | 90 days | 1 year | ~2 years |
| Debug/Info Logs | 3 days | 14 days | — | 14 days |
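Tier selection from the table above reduces to a lookup on record age. A sketch, with durations mirroring three representative rows (`None` marks an unused cold tier, as for debug logs):

```python
from datetime import timedelta

# (hot, warm, cold) durations per log type, mirroring the 6.5 table (partial).
# None for cold means the tier is unused (debug logs are dropped after warm).
RETENTION_TIERS = {
    "authentication": (timedelta(days=30), timedelta(days=365), timedelta(days=6 * 365)),
    "media_access":   (timedelta(days=7),  timedelta(days=90),  timedelta(days=365)),
    "debug":          (timedelta(days=3),  timedelta(days=14),  None),
}

def storage_tier(log_type: str, age: timedelta) -> str:
    """Return which storage tier a record of the given age belongs in."""
    hot, warm, cold = RETENTION_TIERS[log_type]
    if age <= hot:
        return "hot"
    if age <= hot + warm:
        return "warm"
    if cold is not None and age <= hot + warm + cold:
        return "cold"
    return "expired"
```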
7. Media Access Security
7.1 Signed URL Architecture
User Request
|
v
+--------+ +------------------+ +---------------+ +--------+
| API | -> | URL Signing | -> | Object Storage | | CDN |
| Server | | Service | | (Private) | | (Edge) |
+--------+ +------------------+ +---------------+ +--------+
| ^ |
| Generates signed | |
| URL (time-limited) | |
+-------------------------+ |
|
User <- Redirects to signed URL ---------------------------------+
7.2 Signed URL Implementation
import hashlib
import hmac
import base64
import time
from datetime import datetime
from urllib.parse import quote
class MediaAccessController:
"""
Time-limited signed URL generation for media access.
URLs expire after configured TTL and are single-use or limited-use.
"""
def __init__(self, storage_client, signing_secret: bytes,
url_ttl: int = 300):
self.storage = storage_client
self.signing_secret = signing_secret
self.url_ttl = url_ttl # Default 5 minutes
def generate_signed_url(self, user: User, media_id: str,
action: str = "view") -> dict:
"""Generate time-limited signed URL for media access."""
# Verify user has permission
if not self.check_media_permission(user, media_id, action):
raise PermissionError("Access denied for this media")
# Get media metadata
media = self.storage.get_metadata(media_id)
# Generate expiration timestamp
expiry = int(time.time()) + self.url_ttl
# Create signed URL (cloud-agnostic pattern)
object_path = f"media/{media.camera_id}/{media.date_path}/{media.filename}"
# Build signature base string
signature_data = f"GET\n{expiry}\n{object_path}\n{user.id}\n{media_id}"
signature = base64.b64encode(
hmac.new(self.signing_secret, signature_data.encode(),
hashlib.sha256).digest()
).decode()
# Build URL with signature parameters
signed_url = (
f"https://media.surveillance.example.com/{object_path}?"
f"expires={expiry}&"
f"user={quote(user.id)}&"
f"media={media_id}&"
f"signature={quote(signature)}"
)
# Log access grant
self.log_media_access(user, media_id, action, expiry)
return {
"signed_url": signed_url,
"expires_at": datetime.utcfromtimestamp(expiry).isoformat() + "Z",
"ttl_seconds": self.url_ttl,
"single_use": action == "download"
}
    def verify_signed_url(self, object_path: str, url_params: dict) -> bool:
        """Verify signed URL request at edge/CDN."""
        expiry = int(url_params.get("expires", 0))
        # Check expiration
        if time.time() > expiry:
            return False
        # Reconstruct and verify signature from the request parameters
        user_id = url_params.get("user", "")
        media_id = url_params.get("media", "")
        signature_data = f"GET\n{expiry}\n{object_path}\n{user_id}\n{media_id}"
        expected_sig = base64.b64encode(
            hmac.new(self.signing_secret, signature_data.encode(),
                     hashlib.sha256).digest()
        ).decode()
        if not hmac.compare_digest(expected_sig, url_params.get("signature", "")):
            return False
# Check single-use token if applicable
if url_params.get("token"):
if self.storage.is_token_used(url_params["token"]):
return False
self.storage.mark_token_used(url_params["token"], expiry)
return True
7.3 Cloud Storage Integration
# AWS S3 Signed URLs
import boto3
from botocore.config import Config
class S3MediaStorage:
def __init__(self, bucket_name: str):
self.s3 = boto3.client(
's3',
config=Config(signature_version='s3v4')
)
self.bucket = bucket_name
def generate_presigned_url(self, object_key: str,
expiry: int = 300) -> str:
return self.s3.generate_presigned_url(
'get_object',
Params={
'Bucket': self.bucket,
'Key': object_key,
'ResponseContentDisposition': 'attachment' # Force download
},
ExpiresIn=expiry
)
# Google Cloud Storage Signed URLs
from datetime import timedelta
from google.cloud import storage

class GCSMediaStorage:
    def __init__(self, bucket_name: str):
        self.client = storage.Client()
        self.bucket = self.client.bucket(bucket_name)

    def generate_signed_url(self, blob_name: str,
                            expiry: int = 300,
                            method: str = "GET") -> str:
        blob = self.bucket.blob(blob_name)
        return blob.generate_signed_url(
            version="v4",
            expiration=timedelta(seconds=expiry),
            method=method,
            # Resumable-upload header applies only to upload (POST) URLs
            headers={"x-goog-resumable": "start"} if method == "POST" else None
        )
7.4 Access Logging for Media
class MediaAccessLogger:
"""Comprehensive logging for all media access events."""
def log_media_access(self, user: User, media: Media,
access_type: str, delivery_method: str):
event = {
"event_type": "MEDIA_ACCESSED",
"timestamp": datetime.utcnow().isoformat() + "Z",
"actor": {
"user_id": hash_id(user.id),
"role": user.role,
"ip": user.current_ip
},
"media": {
"media_id": media.id,
"camera_id": media.camera_id,
"camera_zone": media.zone_id,
"recording_time": media.start_time.isoformat(),
"contains_persons": media.has_person_data,
"retention_classification": media.retention_class
},
"access": {
"type": access_type, # view, download, export, api
"delivery": delivery_method, # stream, signed_url, direct
"quality": media.requested_quality
},
"compliance": {
"gdpr_lawful_basis": "legitimate_interest", # or "consent"
"consent_verified": media.consent_verified if media.has_person_data else None,
"dpo_notified": False # Flagged if sensitive
}
}
# High-sensitivity media triggers additional review
if media.has_person_data and media.confidence_score > 0.9:
event["compliance"]["dpo_notified"] = True
self.notify_dpo(event)
self.audit_logger.log(event)
7.5 Retention-Based Auto-Deletion
class DataRetentionEnforcer:
"""
Automated enforcement of data retention policies.
Deletes media and associated metadata based on configured retention periods.
"""
RETENTION_POLICIES = {
"general_footage": timedelta(days=30),
"alert_footage": timedelta(days=90),
"person_detected": timedelta(days=365),
"investigation": timedelta(days=2555), # 7 years
"legal_hold": None # Never delete (manual override)
}
def enforce_retention(self):
"""Daily cron job to enforce retention policies."""
for policy_name, retention_period in self.RETENTION_POLICIES.items():
if retention_period is None:
continue # Legal hold — never delete
cutoff_date = datetime.utcnow() - retention_period
# Find expired media
expired_media = db.query(Media).filter(
Media.retention_policy == policy_name,
Media.created_at < cutoff_date,
Media.legal_hold == False
).all()
for media in expired_media:
self._securely_delete_media(media)
def _securely_delete_media(self, media: Media):
"""Cryptographically secure deletion with audit trail."""
# 1. Log deletion intent
self.audit_logger.log({
"event_type": "MEDIA_DELETION_SCHEDULED",
"media_id": media.id,
"reason": "retention_policy",
"scheduled_at": datetime.utcnow().isoformat()
})
# 2. Overwrite object metadata with deletion marker
media.deletion_marked_at = datetime.utcnow()
media.deletion_reason = "retention_expired"
# 3. Delete from object storage
self.storage.delete_object(media.object_key)
# 4. Delete face embeddings if associated
if media.face_embeddings:
for embedding_id in media.face_embeddings:
self.delete_face_embedding(embedding_id)
# 5. Soft-delete database record (retain metadata for audit)
media.deleted = True
media.object_key = None # Remove reference to deleted object
db.commit()
# 6. Log completion
self.audit_logger.log({
"event_type": "MEDIA_DELETED",
"media_id": media.id,
"retention_policy": media.retention_policy,
"deleted_at": datetime.utcnow().isoformat()
})
8. API Security
8.1 Rate Limiting
from redis import Redis
from functools import wraps
import time
class RateLimiter:
"""
Multi-tier rate limiting:
1. Global rate limit (all users)
2. Per-user rate limit
3. Per-endpoint rate limit
4. Per-resource rate limit
"""
RATE_LIMITS = {
# Authentication endpoints — strict
"auth:login": {"requests": 5, "window": 60},
"auth:refresh": {"requests": 10, "window": 60},
"auth:mfa": {"requests": 5, "window": 60},
# Media endpoints — moderate
"media:stream": {"requests": 60, "window": 60},
"media:download": {"requests": 30, "window": 60},
"media:export": {"requests": 5, "window": 3600},
# Data modification — moderate
"person:create": {"requests": 30, "window": 60},
"person:update": {"requests": 30, "window": 60},
"person:delete": {"requests": 10, "window": 60},
# Read operations — generous
"camera:list": {"requests": 120, "window": 60},
"alert:list": {"requests": 120, "window": 60},
"event:list": {"requests": 120, "window": 60},
# Admin operations — strict
"user:create": {"requests": 10, "window": 60},
"settings:update": {"requests": 10, "window": 60},
"backup:create": {"requests": 2, "window": 3600},
}
def __init__(self, redis_client: Redis):
self.redis = redis_client
def is_allowed(self, user_id: str, endpoint_key: str) -> tuple[bool, dict]:
"""Check if request is within rate limits using sliding window."""
limit_config = self.RATE_LIMITS.get(endpoint_key,
{"requests": 100, "window": 60})
key = f"ratelimit:{user_id}:{endpoint_key}"
window = limit_config["window"]
max_requests = limit_config["requests"]
now = time.time()
window_start = now - window
# Use Redis sorted set for sliding window
pipe = self.redis.pipeline()
# Remove entries outside the window
pipe.zremrangebyscore(key, 0, window_start)
# Count current entries in window
pipe.zcard(key)
# Add current request
pipe.zadd(key, {str(now): now})
# Set expiry on the key
pipe.expire(key, window)
results = pipe.execute()
current_count = results[1]
if current_count >= max_requests:
# Get time until oldest entry expires
oldest = self.redis.zrange(key, 0, 0, withscores=True)
retry_after = int(oldest[0][1] + window - now) if oldest else window
return False, {
"error": "Rate limit exceeded",
"limit": max_requests,
"window": window,
"retry_after": max(1, retry_after)
}
return True, {
"limit": max_requests,
"remaining": max_requests - current_count - 1,
"reset": int(now + window)
}
# FastAPI middleware integration
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
# Skip rate limiting for health checks
if request.url.path in ["/health", "/metrics"]:
return await call_next(request)
user = getattr(request.state, "user", None)
user_id = user.id if user else request.client.host
    # NOTE: map method+path onto the logical keys used in RATE_LIMITS
    # (e.g. "auth:login"); this raw method:path key otherwise falls
    # through to the default limit.
    endpoint_key = f"{request.method.lower()}:{request.url.path}"
allowed, headers = rate_limiter.is_allowed(user_id, endpoint_key)
if not allowed:
return JSONResponse(
status_code=429,
content={"error": headers["error"]},
headers={
"X-RateLimit-Limit": str(headers["limit"]),
"X-RateLimit-Retry-After": str(headers["retry_after"]),
"Retry-After": str(headers["retry_after"])
}
)
response = await call_next(request)
response.headers["X-RateLimit-Limit"] = str(headers["limit"])
response.headers["X-RateLimit-Remaining"] = str(headers["remaining"])
response.headers["X-RateLimit-Reset"] = str(headers["reset"])
return response
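The middleware above derives `endpoint_key` from method and path, which will not literally match logical keys such as `auth:login`. One way to bridge the two is an explicit route map; the paths below are assumptions about this API's routes, not confirmed endpoints:

```python
import re

# Hypothetical route -> logical rate-limit key mapping (illustrative paths).
ROUTE_LIMIT_KEYS = [
    (re.compile(r"^POST /auth/login$"), "auth:login"),
    (re.compile(r"^POST /auth/refresh$"), "auth:refresh"),
    (re.compile(r"^GET /api/cameras$"), "camera:list"),
    (re.compile(r"^POST /api/persons$"), "person:create"),
    (re.compile(r"^DELETE /api/persons/[^/]+$"), "person:delete"),
]

def limit_key_for(method: str, path: str) -> str:
    """Map a request onto its logical rate-limit key; unmapped routes get a default bucket."""
    route = f"{method.upper()} {path}"
    for pattern, key in ROUTE_LIMIT_KEYS:
        if pattern.match(route):
            return key
    return "default"
```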
8.2 Input Validation & Sanitization
from pydantic import BaseModel, Field, validator, constr
from email_validator import validate_email
import base64
import re
# Strict input models with validation
class PersonCreateRequest(BaseModel):
name: constr(min_length=1, max_length=100, strip_whitespace=True)
email: constr(max_length=255) | None = None
department: constr(max_length=50) | None = None
employee_id: constr(max_length=50) | None = None
consent_status: constr(regex="^(granted|denied|pending|withdrawn)$")
face_image: str # Base64 encoded
@validator('name')
def validate_name(cls, v):
# Reject potential script injection
if re.search(r'[<>&;{}\[\]]', v):
raise ValueError("Name contains invalid characters")
        # Normalize whitespace and capitalization
        return v.strip().title()
@validator('email')
def validate_email(cls, v):
if v is None:
return v
try:
validate_email(v)
return v.lower()
except Exception:
raise ValueError("Invalid email format")
@validator('face_image')
def validate_face_image(cls, v):
# Validate base64 encoding
try:
decoded = base64.b64decode(v)
except Exception:
raise ValueError("Invalid base64 encoding")
# Check file size (max 5MB)
if len(decoded) > 5 * 1024 * 1024:
raise ValueError("Image exceeds maximum size of 5MB")
        # Validate image format (must match the error message below)
        magic_bytes = {
            b'\xff\xd8\xff': 'jpeg',
            b'\x89PNG': 'png',
        }
        if not any(decoded.startswith(magic) for magic in magic_bytes):
            raise ValueError("Invalid image format. Only JPEG, PNG allowed.")
return v
class CameraConfigurationRequest(BaseModel):
camera_id: constr(min_length=1, max_length=36)
name: constr(max_length=100)
zone_id: constr(max_length=36)
retention_days: int = Field(..., ge=1, le=2555) # 1 day to 7 years
privacy_mask: list[dict] | None = None # Coordinate validation
    @validator('privacy_mask')
    def validate_privacy_mask(cls, v):
        if v is None:
            return v
        for mask in v:
            if not all(k in mask for k in ('x', 'y', 'width', 'height')):
                raise ValueError("Privacy mask missing required coordinates")
            if not all(0 <= mask[k] <= 1 for k in ('x', 'y', 'width', 'height')):
                raise ValueError("Coordinates must be normalized [0,1]")
        return v
8.3 SQL Injection Prevention
# ALWAYS use parameterized queries — NEVER concatenate SQL
# CORRECT — Parameterized query
def get_person_by_id(person_id: str) -> Person | None:
query = text("SELECT * FROM persons WHERE id = :person_id AND deleted = false")
result = db.execute(query, {"person_id": person_id})
return result.fetchone()
# CORRECT — ORM with automatic parameterization
def search_persons(name_pattern: str, department: str | None = None) -> list[Person]:
query = db.query(Person).filter(
        Person.name.ilike(f"%{name_pattern}%"),  # Bound as a parameter — no injection (but % and _ still act as wildcards)
Person.deleted == false()
)
if department:
query = query.filter(Person.department == department)
return query.limit(100).all()
# INCORRECT — NEVER DO THIS
def BAD_get_person_unsafe(user_input: str):
query = f"SELECT * FROM persons WHERE name = '{user_input}'" # SQL INJECTION!
return db.execute(query)
# Additional protection: Use ORM exclusively
# Raw SQL only for migrations and reporting with admin review
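Parameterization in the `ilike` example stops injection, but `%` and `_` typed by the user still act as wildcards inside the pattern. A small escaping helper (sketch; pair it with SQLAlchemy's `escape=` argument):

```python
def escape_like(value: str, escape_char: str = "\\") -> str:
    """Escape LIKE/ILIKE wildcards so user input matches literally."""
    return (value.replace(escape_char, escape_char * 2)
                 .replace("%", escape_char + "%")
                 .replace("_", escape_char + "_"))

# Usage with SQLAlchemy (sketch):
#   pattern = f"%{escape_like(name_pattern)}%"
#   query.filter(Person.name.ilike(pattern, escape="\\"))
```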
8.4 XSS Protection
from markupsafe import Markup, escape
import bleach
class XSSProtection:
"""
Defense in depth against XSS:
1. Output encoding on all user-generated content
2. Content Security Policy headers
3. Bleach sanitization for allowed HTML
"""
# Bleach configuration for rich text fields
ALLOWED_TAGS = ['p', 'br', 'strong', 'em', 'ul', 'ol', 'li']
ALLOWED_ATTRIBUTES = {}
ALLOWED_STYLES = []
@staticmethod
def sanitize_output(value: str, allow_markup: bool = False) -> str:
"""Sanitize output for safe HTML rendering."""
if not value:
return ""
if allow_markup:
return bleach.clean(
value,
tags=XSSProtection.ALLOWED_TAGS,
attributes=XSSProtection.ALLOWED_ATTRIBUTES,
strip=True
)
# Default: full HTML escape
return escape(value)
@staticmethod
    def sanitize_json_value(value: object) -> object:
"""Recursively sanitize values for JSON output."""
if isinstance(value, str):
return escape(value)
elif isinstance(value, dict):
return {k: XSSProtection.sanitize_json_value(v)
for k, v in value.items()}
elif isinstance(value, list):
return [XSSProtection.sanitize_json_value(v) for v in value]
return value
# Content Security Policy Headers
def get_csp_header(nonce: str) -> str:
    return (
        "default-src 'self'; "
        f"script-src 'self' 'nonce-{nonce}' https://cdn.jsdelivr.net; "
"style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; "
"img-src 'self' data: blob: https://media.surveillance.example.com; "
"media-src 'self' blob: https://media.surveillance.example.com; "
"connect-src 'self' https://api.surveillance.example.com wss://realtime.surveillance.example.com; "
"font-src 'self' https://fonts.gstatic.com; "
"frame-ancestors 'none'; "
"base-uri 'self'; "
"form-action 'self'; "
"upgrade-insecure-requests; "
"block-all-mixed-content"
)
8.5 CSRF Protection
# For cookie-based session authentication
from itsdangerous import URLSafeTimedSerializer, BadSignature
class CSRFProtection:
def __init__(self, secret_key: str):
self.serializer = URLSafeTimedSerializer(secret_key, salt="csrf-token")
def generate_token(self, session_id: str) -> str:
"""Generate CSRF token bound to session."""
return self.serializer.dumps(session_id)
def validate_token(self, token: str, session_id: str,
max_age: int = 3600) -> bool:
"""Validate CSRF token matches session."""
try:
stored_session = self.serializer.loads(token, max_age=max_age)
return hmac.compare_digest(stored_session, session_id)
except BadSignature:
return False
# Double-submit cookie pattern for state-changing operations
# Frontend must include X-CSRF-Token header matching cookie value
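The double-submit check itself is a constant-time comparison between the cookie and the header. A framework-agnostic sketch (cookie name `csrf_token` and header `X-CSRF-Token` follow the comment above; in FastAPI this would run as a dependency on state-changing routes):

```python
import hmac

def check_double_submit(cookies: dict, headers: dict) -> bool:
    """Double-submit cookie check: header token must equal the csrf_token cookie."""
    cookie_token = cookies.get("csrf_token", "")
    header_token = headers.get("x-csrf-token", "")
    if not cookie_token or not header_token:
        return False
    # Constant-time comparison to avoid timing side channels
    return hmac.compare_digest(cookie_token, header_token)
```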
8.6 API Authentication (Bearer Tokens)
from fastapi import Security, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer(auto_error=False)
async def authenticate_request(
credentials: HTTPAuthorizationCredentials = Security(security)
) -> User:
"""
Authenticate API requests using Bearer token (JWT access token).
Validates token signature, expiry, and binds to session.
"""
if not credentials:
raise HTTPException(status_code=401, detail="Authentication required")
scheme = credentials.scheme.lower()
if scheme != "bearer":
raise HTTPException(status_code=401, detail="Invalid authentication scheme")
token = credentials.credentials
try:
payload = token_manager.verify_token(token, token_type="access")
except AuthenticationError as e:
raise HTTPException(status_code=401, detail=str(e))
# Check if session is still valid (not revoked)
session_id = payload["sid"]
if not await session_store.is_valid(session_id):
raise HTTPException(status_code=401, detail="Session revoked")
# Load user
user = await user_store.get_by_id(payload["sub"])
if not user or not user.is_active:
raise HTTPException(status_code=401, detail="User inactive")
# Attach user to request state
return user
# Apply to protected routes
@app.get("/api/cameras")
async def list_cameras(user: User = Depends(authenticate_request)):
...
8.7 Request Size Limits
# Nginx request size limits
client_max_body_size 10M; # Upload limit
client_body_buffer_size 16K;
client_header_buffer_size 1K;
large_client_header_buffers 4 8K;
# Specific location limits
location /api/media/upload {
client_max_body_size 50M;
}
location /api/persons/batch {
client_max_body_size 20M;
}
8.8 CORS Configuration
from fastapi.middleware.cors import CORSMiddleware
# Strict CORS — only allow known origins
ALLOWED_ORIGINS = [
"https://surveillance.example.com",
"https://admin.surveillance.example.com",
# No wildcard origins
]
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS,
allow_credentials=True, # Required for cookies
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=[
"Authorization",
"Content-Type",
"X-CSRF-Token",
"X-Request-ID"
],
expose_headers=[
"X-RateLimit-Limit",
"X-RateLimit-Remaining",
"X-RateLimit-Reset"
],
max_age=600 # Preflight cache
)
9. Session Security
9.1 Secure Cookie Configuration
# Cookie security settings
COOKIE_CONFIG = {
"access_token": {
"name": "access_token",
"httponly": True, # Not accessible via JavaScript
"secure": True, # HTTPS only
"samesite": "Strict", # Strict CSRF protection
"max_age": 900, # 15 minutes
"path": "/",
},
"refresh_token": {
"name": "refresh_token",
"httponly": True,
"secure": True,
"samesite": "Strict",
"max_age": 604800, # 7 days
"path": "/auth/refresh", # Only sent to refresh endpoint
},
"session_id": {
"name": "session_id",
"httponly": True,
"secure": True,
"samesite": "Strict",
"max_age": 28800, # 8 hours (absolute timeout)
"path": "/",
}
}
# Cookie setting in response
def set_secure_cookie(response, cookie_name: str, value: str,
config: dict = None):
cfg = config or COOKIE_CONFIG.get(cookie_name, {})
response.set_cookie(
key=cfg.get("name", cookie_name),
value=value,
httponly=cfg.get("httponly", True),
secure=cfg.get("secure", True),
samesite=cfg.get("samesite", "Strict"),
max_age=cfg.get("max_age"),
path=cfg.get("path", "/"),
domain=None # Host-only cookie — no subdomain sharing
)
9.2 Session Fixation Protection
class SessionManager:
"""
Secure session management with fixation protection.
Regenerates session ID on privilege level change.
"""
async def create_session(self, user: User, request: Request) -> Session:
"""Create new session with secure attributes."""
session_id = secrets.token_urlsafe(32)
# Generate browser fingerprint for binding
fingerprint = self._generate_fingerprint(request)
session = Session(
id=session_id,
user_id=user.id,
fingerprint=fingerprint,
created_at=datetime.utcnow(),
last_activity=datetime.utcnow(),
absolute_expiry=datetime.utcnow() + timedelta(hours=8),
ip_address=request.client.host,
user_agent=request.headers.get("user-agent", "")[:200]
)
await self.store.save(session)
return session
    async def regenerate_session(self, user: User, old_session: Session,
                                 request: Request) -> Session:
        """Regenerate session ID on login/privilege change."""
        # Create new session
        new_session = await self.create_session(user, request)
# Transfer non-sensitive data from old session
new_session.preferred_language = old_session.preferred_language
# Invalidate old session
await self.store.revoke(old_session.id)
# Log session change
self.audit_logger.log({
"event_type": "SESSION_REGENERATED",
"old_session_id": hash_id(old_session.id),
"new_session_id": hash_id(new_session.id),
"user_id": hash_id(user.id)
})
return new_session
def _generate_fingerprint(self, request: Request) -> str:
"""Create browser fingerprint for session binding."""
components = [
request.headers.get("user-agent", ""),
request.headers.get("accept-language", ""),
request.headers.get("accept-encoding", ""),
]
fingerprint_data = "|".join(components)
return hashlib.sha256(fingerprint_data.encode()).hexdigest()[:16]
9.3 Concurrent Session Handling
class ConcurrentSessionManager:
"""
Manage concurrent sessions per user.
Limit: 3 concurrent sessions (configurable).
"""
MAX_CONCURRENT_SESSIONS = 3
async def handle_new_login(self, user: User, request: Request) -> Session:
"""Create new session, revoke oldest if limit reached."""
existing_sessions = await self.store.get_active_by_user(user.id)
if len(existing_sessions) >= self.MAX_CONCURRENT_SESSIONS:
# Sort by last activity (oldest first)
existing_sessions.sort(key=lambda s: s.last_activity)
# Revoke oldest sessions to make room
to_revoke = existing_sessions[:len(existing_sessions) -
self.MAX_CONCURRENT_SESSIONS + 1]
for old_session in to_revoke:
await self.revoke_session(old_session.id, reason="concurrent_limit")
return await self.session_manager.create_session(user, request)
async def revoke_session(self, session_id: str, reason: str = "logout"):
"""Revoke a session immediately."""
await self.store.revoke(session_id)
self.audit_logger.log({
"event_type": "SESSION_REVOKED",
"session_id": hash_id(session_id),
"reason": reason
})
async def revoke_all_user_sessions(self, user_id: str,
except_session: str = None):
"""Force logout all sessions for a user (e.g., password change)."""
sessions = await self.store.get_active_by_user(user_id)
for session in sessions:
if except_session and session.id == except_session:
continue
await self.revoke_session(session.id, reason="security_action")
9.4 Session Timeout Enforcement
class SessionTimeoutEnforcer:
"""
Enforce both idle and absolute session timeouts.
Runs as background task every minute.
"""
IDLE_TIMEOUT = 30 * 60 # 30 minutes
ABSOLUTE_TIMEOUT = 8 * 3600 # 8 hours
async def enforce_timeouts(self):
"""Check and expire timed-out sessions."""
now = datetime.utcnow()
# Check idle timeout
idle_cutoff = now - timedelta(seconds=self.IDLE_TIMEOUT)
idle_expired = await self.store.find_idle_since(idle_cutoff)
for session in idle_expired:
await self.revoke_session(session.id, reason="idle_timeout")
self.audit_logger.log({
"event_type": "SESSION_EXPIRED",
"session_id": hash_id(session.id),
"user_id": hash_id(session.user_id),
"reason": "idle_timeout",
"idle_duration_seconds": self.IDLE_TIMEOUT
})
# Check absolute timeout
abs_expired = await self.store.find_expired_before(now)
for session in abs_expired:
await self.revoke_session(session.id, reason="absolute_timeout")
self.audit_logger.log({
"event_type": "SESSION_EXPIRED",
"session_id": hash_id(session.id),
"user_id": hash_id(session.user_id),
"reason": "absolute_timeout",
"session_duration_hours": self.ABSOLUTE_TIMEOUT / 3600
})
async def update_activity(self, session_id: str):
"""Update last activity timestamp on each request."""
await self.store.update_last_activity(session_id, datetime.utcnow())
9.5 Force Logout Capability
@app.post("/api/admin/users/{user_id}/force-logout")
@rbac.require_permission("users:manage_sessions")
async def force_logout(user_id: str, request: Request,
reason: str = Body(...)):
"""
Admin endpoint to force logout all sessions for a user.
Requires super_admin or explicit user management permission.
"""
target_user = await user_store.get_by_id(user_id)
if not target_user:
raise HTTPException(status_code=404, detail="User not found")
# Prevent self-lockout for single admin
current_user = request.state.user
if target_user.id == current_user.id:
raise HTTPException(status_code=400,
detail="Cannot force logout yourself")
# Revoke all sessions
await session_manager.revoke_all_user_sessions(user_id)
# Log admin action
audit_logger.log({
"event_type": "ADMIN_FORCE_LOGOUT",
"actor_id": hash_id(current_user.id),
"target_user_id": hash_id(user_id),
"reason": reason,
"timestamp": datetime.utcnow().isoformat()
})
    return {"status": "success", "message": "All sessions for user revoked"}
10. Data Privacy & GDPR Compliance
10.1 GDPR Compliance Matrix
| GDPR Principle | Implementation |
|---|---|
| Lawful Basis | Documented legitimate interest assessment for each processing purpose |
| Data Minimization | Collect only necessary facial features; discard raw images after encoding |
| Purpose Limitation | Facial data used ONLY for security/safety purposes |
| Storage Limitation | Automated retention enforcement with cryptographic deletion |
| Accuracy | Regular review and correction procedures for person data |
| Integrity & Confidentiality | Access controls, encryption, audit logging |
| Accountability | DPO appointment, privacy impact assessment, Records of Processing |
| Transparency | Privacy notice at entry points, signage near cameras |
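The data-minimization row can be made concrete: compute the face encoding, keep only that plus a content hash, and never persist the raw image. In the sketch below, `compute_embedding` is a placeholder for the platform's actual model, not a real encoder:

```python
import hashlib

def compute_embedding(image_bytes: bytes) -> list[float]:
    """Placeholder for the real face-encoding model (assumption)."""
    digest = hashlib.sha256(image_bytes).digest()
    return [b / 255.0 for b in digest[:8]]

def enroll_face(image_bytes: bytes) -> dict:
    """Data minimization: derive the encoding, discard the raw image.

    Only the embedding and a content hash (for deduplication) are
    retained; the image itself is never written to storage.
    """
    record = {
        "embedding": compute_embedding(image_bytes),
        "source_hash": hashlib.sha256(image_bytes).hexdigest(),
    }
    del image_bytes  # raw biometric source is not persisted
    return record
```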
10.2 Consent Tracking
class ConsentManager:
"""
Comprehensive consent management for facial recognition data.
Tracks consent lifecycle: granted -> withdrawn -> deleted.
"""
CONSENT_STATES = ["pending", "granted", "denied", "withdrawn", "expired"]
def record_consent(self, person_id: str, consent_type: str,
granted: bool, method: str,
document_reference: str = None) -> ConsentRecord:
"""Record consent with full audit trail."""
record = ConsentRecord(
id=uuid.uuid4(),
person_id=person_id,
consent_type=consent_type, # "facial_recognition", "data_storage"
status="granted" if granted else "denied",
method=method, # "written", "digital", "verbal"
document_reference=document_reference,
recorded_by=current_user.id,
recorded_at=datetime.utcnow(),
expiry_date=datetime.utcnow() + timedelta(days=365) # Annual renewal
)
db.add(record)
db.commit()
# Log consent event
audit_logger.log({
"event_type": "CONSENT_RECORDED",
"person_id": hash_id(person_id),
"consent_status": record.status,
"method": method,
"recorded_by": hash_id(current_user.id)
})
return record
def withdraw_consent(self, person_id: str,
withdrawal_method: str = "user_request"):
"""
Process consent withdrawal.
Triggers data anonymization or deletion workflow.
"""
# Update consent status
consent = db.query(ConsentRecord).filter_by(
person_id=person_id,
consent_type="facial_recognition"
).order_by(ConsentRecord.recorded_at.desc()).first()
if consent:
consent.status = "withdrawn"
consent.withdrawn_at = datetime.utcnow()
consent.withdrawal_method = withdrawal_method
db.commit()
# Trigger data handling workflow
if withdrawal_method == "user_request":
# Full deletion as per GDPR Article 17
self.initiate_deletion_workflow(person_id)
else:
# Anonymize for statistical purposes
self.initiate_anonymization_workflow(person_id)
audit_logger.log({
"event_type": "CONSENT_WITHDRAWN",
"person_id": hash_id(person_id),
"method": withdrawal_method
})
def check_consent_valid(self, person_id: str,
consent_type: str = "facial_recognition") -> bool:
"""Check if valid consent exists for processing."""
consent = db.query(ConsentRecord).filter_by(
person_id=person_id,
consent_type=consent_type
).order_by(ConsentRecord.recorded_at.desc()).first()
if not consent:
return False
if consent.status != "granted":
return False
if consent.expiry_date and consent.expiry_date < datetime.utcnow():
consent.status = "expired"
db.commit()
return False
return True
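A hash_id helper is referenced throughout the audit-logging calls but never defined in this document. A minimal sketch, assuming a keyed HMAC so the pseudonyms cannot be reversed by hashing known IDs (the AUDIT_HASH_KEY environment variable is an assumption, not part of the original design):

```python
import hashlib
import hmac
import os

def hash_id(identifier: str) -> str:
    """One-way pseudonymize an identifier for audit logs.

    HMAC-SHA256 with a secret key lets log entries be correlated
    per subject without exposing, or allowing recovery of, the raw ID.
    """
    key = os.environ.get("AUDIT_HASH_KEY", "").encode()
    if not key:
        raise RuntimeError("AUDIT_HASH_KEY is not configured")
    digest = hmac.new(key, str(identifier).encode(), hashlib.sha256)
    return digest.hexdigest()[:32]  # truncated for log readability
```

The same key must be used on every service that writes audit logs, otherwise the per-subject correlation is lost.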
10.3 Right to Deletion (GDPR Article 17)
class DataDeletionService:
"""
Right to erasure implementation.
Securely deletes all personal data while preserving audit logs.
"""
def delete_person_data(self, person_id: str,
requested_by: str = None,
legal_basis: str = "user_request") -> dict:
"""
Complete deletion of person data:
1. Face encodings (biometric data)
2. Personal identifiers
3. Associated images with identifiable features
4. Historical detections linked to person
Retains:
- Audit logs (with hashed IDs)
- Anonymized statistics
"""
person = db.query(Person).get(person_id)
if not person:
raise ValueError("Person not found")
deletion_report = {
"person_id": hash_id(person_id),
"started_at": datetime.utcnow().isoformat(),
"deleted_items": []
}
# 1. Delete face embeddings (biometric data)
embeddings_deleted = db.query(FaceEmbedding).filter_by(
person_id=person_id
).delete()
deletion_report["deleted_items"].append({
"type": "face_embeddings", "count": embeddings_deleted
})
# 2. Delete personal images from storage
images = db.query(PersonImage).filter_by(person_id=person_id).all()
for img in images:
self.storage.delete_object(img.object_key)
db.delete(img)
deletion_report["deleted_items"].append({
"type": "person_images", "count": len(images)
})
# 3. Anonymize detection events (keep event, remove person link)
detections_updated = db.query(DetectionEvent).filter_by(
person_id=person_id
).update({
"person_id": None,
"person_anonymized": True
})
deletion_report["deleted_items"].append({
"type": "detection_events_anonymized",
"count": detections_updated
})
# 4. Delete person record
db.delete(person)
# 5. Delete consent records
consent_deleted = db.query(ConsentRecord).filter_by(
person_id=person_id
).delete()
deletion_report["deleted_items"].append({
"type": "consent_records", "count": consent_deleted
})
db.commit()
deletion_report["completed_at"] = datetime.utcnow().isoformat()
# Log deletion
audit_logger.log({
"event_type": "PERSON_DATA_DELETED",
"person_id_hash": hash_id(person_id),
"requested_by": hash_id(requested_by) if requested_by else "system",
"legal_basis": legal_basis,
"items_deleted": deletion_report["deleted_items"]
})
return deletion_report
10.4 Anonymization Capability
class AnonymizationService:
"""
Anonymize data for analytics while protecting privacy.
Used when full deletion would destroy valuable safety data.
"""
def anonymize_person(self, person_id: str) -> str:
"""
Replace personal data with anonymous reference.
Returns anonymous ID for statistical tracking.
"""
anonymous_id = f"anon_{secrets.token_hex(8)}"
person = db.query(Person).get(person_id)
# Replace identifying data
person.name = None
person.email = None
person.employee_id = None
person.phone = None
person.anonymous_id = anonymous_id
person.anonymized_at = datetime.utcnow()
person.anonymized = True
# Delete face encoding (cannot be anonymized — biometric)
db.query(FaceEmbedding).filter_by(person_id=person_id).delete()
# Delete images
images = db.query(PersonImage).filter_by(person_id=person_id).all()
for img in images:
self.storage.delete_object(img.object_key)
db.delete(img)
# Keep detection events with anonymous link
db.query(DetectionEvent).filter_by(person_id=person_id).update({
"person_id": None,
"anonymous_id": anonymous_id
})
db.commit()
audit_logger.log({
"event_type": "PERSON_ANONYMIZED",
"original_id_hash": hash_id(person_id),
"anonymous_id": anonymous_id
})
return anonymous_id
def anonymize_detection_zones(self, image: np.ndarray,
zones: list[dict]) -> np.ndarray:
"""
Real-time anonymization: blur faces in video streams
for users without facial data access permission.
"""
from PIL import Image, ImageDraw, ImageFilter
img = Image.fromarray(image)
for zone in zones:
if zone.get("type") == "face" and zone.get("should_anonymize"):
# Extract face region
x, y, w, h = zone["bbox"]
face_region = img.crop((x, y, x+w, y+h))
# Apply heavy blur
blurred = face_region.filter(ImageFilter.GaussianBlur(radius=30))
# Replace in original
img.paste(blurred, (x, y))
return np.array(img)
10.5 Privacy Mode Settings
class PrivacyModeManager:
"""
Privacy mode controls for cameras and zones.
When privacy mode is enabled:
- No recording
- No facial recognition
- No alerts generated
- Live view shows privacy overlay
"""
PRIVACY_MODES = {
"full_operation": {
"recording": True,
"face_recognition": True,
"alerts": True,
"live_view": True,
"retention": "standard"
},
"recording_only": {
"recording": True,
"face_recognition": False,
"alerts": False, # Motion only, no person alerts
"live_view": True,
"retention": "reduced"
},
"live_view_only": {
"recording": False,
"face_recognition": False,
"alerts": False,
"live_view": True, # Real-time only, no storage
"retention": "none"
},
"privacy_mode": {
"recording": False,
"face_recognition": False,
"alerts": False,
"live_view": False, # Privacy overlay
"retention": "none"
}
}
def set_camera_privacy(self, camera_id: str, mode: str,
schedule: dict = None):
"""
Set privacy mode for a camera.
Optional scheduling for automatic mode changes.
"""
config = self.PRIVACY_MODES.get(mode)
if not config:
raise ValueError(f"Invalid privacy mode: {mode}")
camera = db.query(Camera).get(camera_id)
camera.privacy_mode = mode
camera.privacy_schedule = schedule
camera.privacy_updated_at = datetime.utcnow()
camera.privacy_updated_by = current_user.id
db.commit()
# Apply to edge gateway immediately
self.edge_gateway.update_camera_config(camera_id, {
"privacy_mode": mode,
"face_recognition": config["face_recognition"],
"recording": config["recording"]
})
audit_logger.log({
"event_type": "PRIVACY_MODE_CHANGED",
"camera_id": camera_id,
"new_mode": mode,
"changed_by": hash_id(current_user.id)
})
10.6 Data Retention Enforcement
class PrivacyRetentionEnforcer:
"""
Enforce privacy-compliant data retention.
Different retention periods based on data sensitivity.
"""
RETENTION_SCHEDULE = {
"raw_video_no_detection": {"days": 30, "auto_delete": True},
"raw_video_with_detection": {"days": 90, "auto_delete": True},
"alert_footage": {"days": 365, "auto_delete": True},
"face_embedding_consented": {"days": 365, "auto_delete": True},
"face_embedding_withdrawn": {"days": 0, "immediate_delete": True},
"person_record_active": {"days": None, "retain": True},
"person_record_deleted": {
"audit_log_days": 2555, # 7 years audit retention
"auto_anonymize": True
},
"training_data": {"days": 90, "auto_delete": True},
"export_logs": {"days": 2555, "auto_delete": True} # 7 years
}
async def enforce_daily(self):
"""Daily cron: enforce all retention policies."""
# Handle withdrawn consent (immediate deletion)
withdrawn = db.query(Person).filter(
Person.consent_status == "withdrawn",
Person.data_deleted == False
).all()
for person in withdrawn:
await self.delete_person_data(person.id,
legal_basis="consent_withdrawn")
# Handle expired consent
expired = db.query(Person).join(ConsentRecord).filter(
ConsentRecord.expiry_date < datetime.utcnow(),
ConsentRecord.status == "granted"
).all()
for person in expired:
# Mark the person's current granted consent record as expired
consent = db.query(ConsentRecord).filter_by(
person_id=person.id, status="granted"
).first()
if consent:
consent.status = "expired"
# Notify for renewal
await self.notify_consent_expiry(person)
db.commit()
# Standard retention cleanup
for policy_name, policy in self.RETENTION_SCHEDULE.items():
if policy.get("auto_delete"):
await self._apply_retention_policy(policy_name, policy)
11. Edge Gateway Security
11.1 Minimal Attack Surface
# Edge Gateway Configuration — Ubuntu Server LTS
edge_gateway:
os: "Ubuntu Server 22.04 LTS"
services_enabled:
- wireguard: "VPN tunnel only"
- docker: "Container runtime for AI inference"
- ntp: "Time synchronization"
- ssh: "Local admin access ONLY (key-based)"
services_disabled:
- bluetooth: "No wireless interfaces"
- wifi: "No wireless interfaces"
- cups: "No printing"
- avahi: "No mDNS"
- snapd: "Minimize attack surface"
- modemmanager: "Not needed"
- thermald: "Not needed"
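The allow/deny split above can be audited automatically. A hedged sketch of such a check (on a real gateway the enabled set would come from `systemctl list-unit-files --state=enabled`; the unit names here mirror the policy and are assumptions):

```python
ALLOWED_SERVICES = {"wireguard", "docker", "ntp", "ssh"}
FORBIDDEN_SERVICES = {"bluetooth", "wifi", "cups", "avahi",
                      "snapd", "modemmanager", "thermald"}

def audit_services(enabled: set[str]) -> dict:
    """Compare enabled services against the hardening policy.

    Returns the forbidden services found enabled, plus anything
    enabled that is outside the explicit allowlist.
    """
    return {
        "forbidden_enabled": sorted(enabled & FORBIDDEN_SERVICES),
        "unexpected": sorted(enabled - ALLOWED_SERVICES),
    }
```

Running this from cron and alerting on any non-empty result gives a cheap drift check against the baseline.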
11.2 SSH Hardening
# /etc/ssh/sshd_config
# Only key-based authentication
PasswordAuthentication no
PubkeyAuthentication yes
PermitEmptyPasswords no
ChallengeResponseAuthentication no
# Root login disabled
PermitRootLogin no
# Only specific users allowed
AllowUsers edgeadmin
DenyUsers root
# Listen on LAN interface only
ListenAddress 192.168.29.1
# Hardening
MaxAuthTries 3
MaxSessions 2
ClientAliveInterval 300
ClientAliveCountMax 2
LoginGraceTime 30
# Note: the legacy "Protocol 2" directive is obsolete; protocol 1
# support was removed in OpenSSH 7.6, so only protocol 2 exists
# Ciphers and algorithms
Ciphers chacha20-poly1305@openssh.com,aes256-gcm@openssh.com
MACs hmac-sha2-512-etm@openssh.com,hmac-sha2-256-etm@openssh.com
KexAlgorithms curve25519-sha256,curve25519-sha256@libssh.org
# Disable forwarding
X11Forwarding no
AllowTcpForwarding no
AllowAgentForwarding no
GatewayPorts no
PermitTunnel no
# Logging
SyslogFacility AUTH
LogLevel VERBOSE
11.3 Automatic Security Updates
#!/bin/bash
# /usr/local/bin/security-update.sh
# Unattended security updates
apt-get update
apt-get -y upgrade -o APT::Get::Upgrade-Allow-New="false"
# Auto-restart services that need it
if [ -f /var/run/reboot-required ]; then
# Notify admin via Telegram
/usr/local/bin/notify-admin.sh "Security reboot required for $(hostname)"
# Schedule reboot during maintenance window
shutdown -r 04:00 "Rebooting for security updates"
fi
# Cron: Every day at 3 AM
# 0 3 * * * /usr/local/bin/security-update.sh >> /var/log/security-updates.log 2>&1
# /etc/apt/apt.conf.d/50unattended-upgrades
Unattended-Upgrade::Allowed-Origins {
"${distro_id}:${distro_codename}-security";
"${distro_id}ESMApps:${distro_codename}-apps-security";
"${distro_id}ESM:${distro_codename}-infra-security";
};
Unattended-Upgrade::AutoFixInterruptedDpkg "true";
Unattended-Upgrade::MinimalSteps "true";
Unattended-Upgrade::InstallOnShutdown "false";
Unattended-Upgrade::Remove-Unused-Dependencies "true";
Unattended-Upgrade::Remove-New-Unused-Dependencies "true";
11.4 Tamper Detection
#!/usr/bin/env python3
# /usr/local/bin/tamper-detection.py
import hashlib
import json
import os
from datetime import datetime
CRITICAL_FILES = [
"/etc/wireguard/wg0.conf",
"/etc/ssh/sshd_config",
"/etc/ssh/ssh_host_ed25519_key.pub",
"/etc/iptables/rules.v4",
"/etc/systemd/system/edge-gateway.service",
"/usr/local/bin/edge-firewall.sh",
"/usr/local/bin/security-update.sh",
]
CRITICAL_DIRS = [
"/etc/wireguard/",
"/etc/ssh/",
"/usr/local/bin/",
]
STATE_FILE = "/var/lib/edge-gateway/file-integrity-state.json"
def compute_hash(filepath: str) -> str:
h = hashlib.sha256()
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(8192), b''):
h.update(chunk)
return h.hexdigest()
def initialize_baseline():
"""Run once to establish baseline (explicit files plus critical dirs)."""
state = {}
# Include everything currently present in CRITICAL_DIRS so the
# NEW_FILE check below only fires on files added after baselining
paths = set(CRITICAL_FILES)
for directory in CRITICAL_DIRS:
for root, dirs, files in os.walk(directory):
for f in files:
paths.add(os.path.join(root, f))
for filepath in sorted(paths):
if os.path.exists(filepath):
state[filepath] = {
"hash": compute_hash(filepath),
"mtime": os.path.getmtime(filepath),
"size": os.path.getsize(filepath)
}
os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
with open(STATE_FILE, 'w') as f:
json.dump(state, f, indent=2)
print(f"Baseline established with {len(state)} files")
def check_integrity():
"""Check file integrity against baseline."""
with open(STATE_FILE, 'r') as f:
baseline = json.load(f)
alerts = []
for filepath, expected in baseline.items():
if not os.path.exists(filepath):
alerts.append(f"MISSING: {filepath}")
continue
current_hash = compute_hash(filepath)
if current_hash != expected["hash"]:
alerts.append(
f"MODIFIED: {filepath} "
f"(expected: {expected['hash'][:16]}..., "
f"got: {current_hash[:16]}...)"
)
current_mtime = os.path.getmtime(filepath)
if current_mtime != expected["mtime"]:
alerts.append(f"MTIME_CHANGED: {filepath}")
# Check for new files in critical directories
for directory in CRITICAL_DIRS:
for root, dirs, files in os.walk(directory):
for f in files:
full_path = os.path.join(root, f)
if full_path not in baseline:
alerts.append(f"NEW_FILE: {full_path}")
if alerts:
print(f"ALERT: {len(alerts)} integrity violations detected")
for alert in alerts:
print(f" - {alert}")
# Send critical alert to cloud
send_tamper_alert(alerts)
else:
print(f"OK: All {len(baseline)} files verified at {datetime.utcnow().isoformat()}")
def send_tamper_alert(alerts: list):
"""Send tamper detection alert to cloud monitoring."""
import requests
payload = {
"gateway_id": os.environ["GATEWAY_ID"],
"timestamp": datetime.utcnow().isoformat(),
"alert_type": "TAMPER_DETECTED",
"severity": "CRITICAL",
"details": alerts
}
# Send via VPN tunnel
requests.post(
"https://cloud-api.surveillance.example.com/alerts/tamper",
json=payload,
timeout=10,
verify="/etc/ssl/certs/cloud-ca.pem"
)
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "init":
initialize_baseline()
else:
check_integrity()
11.5 Secure Boot & Disk Encryption
# Disk encryption for edge gateway (if hardware supports)
# LUKS full disk encryption
cryptsetup luksFormat --type luks2 /dev/nvme0n1p2
cryptsetup open /dev/nvme0n1p2 cryptroot
# Key management
# Option A: Passphrase (requires local admin on boot)
# Option B: TPM2 auto-unseal (headless operation)
# TPM2 auto-unlock (Ubuntu 22.04 ships systemd 249+): bind a LUKS
# key slot to TPM2 PCR state so the disk only unlocks when the
# measured boot chain is unmodified
systemd-cryptenroll --tpm2-device=auto --tpm2-pcrs=0+7 /dev/nvme0n1p2
# Measure boot components for tamper detection
tpm2_pcrread sha256:0,1,2,3,4,5,6,7
11.6 Local Access Only
# Edge Gateway Access Policy
access_policy:
remote_admin:
enabled: false # No remote admin except via VPN
vpn_admin:
enabled: true
method: "WireGuard tunnel to cloud bastion"
requires: "Cloud VPN + 2FA"
local_admin:
enabled: true
method: "Physical LAN connection + SSH key"
requires: "Physical presence at site"
emergency_access:
enabled: true
method: "Serial console + hardware token"
requires: "Physical device access"
prohibited:
- "Direct SSH from internet"
- "Remote desktop/VNC"
- "Telnet or unencrypted protocols"
- "Default credentials"
- "Password authentication"
11.7 Container Security (Edge AI Inference)
# Edge AI container — security hardened
FROM python:3.11-slim-bookworm AS base
# Non-root user
RUN groupadd -r inference && useradd -r -g inference inference
# Install only required packages
RUN apt-get update && apt-get install -y --no-install-recommends \
libgl1-mesa-glx \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Copy application as non-root
COPY --chown=inference:inference requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY --chown=inference:inference ./src ./src
USER inference
# Runtime hardening (docker-compose options for the inference service;
# these are not Dockerfile directives and belong in docker-compose.yml):
security_opt:
- no-new-privileges:true
read_only: true
tmpfs:
- /tmp:noexec,nosuid,size=100m
- /cache:noexec,nosuid,size=500m
# Resource limits
mem_limit: 2g
cpus: 2.0
pids_limit: 100
# No network access except via the VPN container's namespace
network_mode: "container:wireguard"
healthcheck:
test: ["CMD", "python", "-c", "import requests; requests.get('http://localhost:8080/health', timeout=5)"]
interval: 30s
timeout: 10s
retries: 3
12. Cloud Infrastructure Security
12.1 Security Groups / Firewall Rules
# Terraform — AWS Security Group Configuration
resource "aws_security_group" "app_server" {
name = "surveillance-app"
description = "Security group for surveillance application servers"
vpc_id = aws_vpc.main.id
# HTTPS inbound from load balancer only
ingress {
from_port = 443
to_port = 443
protocol = "tcp"
security_groups = [aws_security_group.load_balancer.id]
description = "HTTPS from load balancer"
}
# Health checks from load balancer
ingress {
from_port = 8080
to_port = 8080
protocol = "tcp"
security_groups = [aws_security_group.load_balancer.id]
description = "Health check endpoint"
}
# WireGuard from edge gateways only
ingress {
from_port = 51820
to_port = 51820
protocol = "udp"
cidr_blocks = ["${var.edge_gateway_public_ip}/32"]
description = "WireGuard VPN from edge"
}
# Outbound — restrictive
egress {
from_port = 443
to_port = 443
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
description = "HTTPS outbound"
}
egress {
from_port = 5432
to_port = 5432
protocol = "tcp"
cidr_blocks = [aws_subnet.database.cidr_block]
description = "PostgreSQL to database subnet"
}
egress {
from_port = 6379
to_port = 6379
protocol = "tcp"
cidr_blocks = [aws_subnet.cache.cidr_block]
description = "Redis to cache subnet"
}
tags = {
Name = "surveillance-app"
}
}
# Database security group — NO public access
resource "aws_security_group" "database" {
name = "surveillance-database"
description = "Database security group — private only"
vpc_id = aws_vpc.main.id
ingress {
from_port = 5432
to_port = 5432
protocol = "tcp"
security_groups = [aws_security_group.app_server.id]
description = "PostgreSQL from app servers"
}
# NO egress to internet
# NO ingress from public subnets
# NO ingress from VPN (except via app servers)
}
12.2 Network Architecture (Private Subnets)
VPC: 10.0.0.0/16
+------------------+ +------------------+ +------------------+
| Public Subnet | | Private Subnet | | Private Subnet |
| 10.0.1.0/24 | | 10.0.2.0/24 | | 10.0.3.0/24 |
| | | | | |
| - Load Balancer |----->| - App Servers |----->| - Database |
| - NAT Gateway | | - API Workers | | - Redis |
| - VPN Gateway | | - AI Workers | | - Message Queue |
| - Bastion Host | | | | |
+------------------+ +------------------+ +------------------+
Internet access:
- Public subnet: Direct via IGW
- Private subnets: Via NAT Gateway only (egress only)
- Database subnet: NO internet access
Inter-subnet routing:
- App servers -> Database: ALLOWED (via security group)
- Database -> App servers: DENIED (no return path needed)
- Internet -> Database: DENIED
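The subnet layout above can be sanity-checked programmatically before it is applied. A minimal sketch using Python's ipaddress module, with the CIDR values taken from the diagram:

```python
import ipaddress
from itertools import combinations

VPC = ipaddress.ip_network("10.0.0.0/16")
SUBNETS = {
    "public": ipaddress.ip_network("10.0.1.0/24"),
    "private_app": ipaddress.ip_network("10.0.2.0/24"),
    "private_data": ipaddress.ip_network("10.0.3.0/24"),
}

def validate_layout() -> bool:
    """Every subnet must sit inside the VPC and none may overlap."""
    for name, net in SUBNETS.items():
        assert net.subnet_of(VPC), f"{name} is outside the VPC"
    for (a, na), (b, nb) in combinations(SUBNETS.items(), 2):
        assert not na.overlaps(nb), f"{a} overlaps {b}"
    return True
```

Running this kind of check in CI alongside the Terraform plan catches copy-paste CIDR mistakes before they reach the VPC.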
12.3 Bastion Host (Emergency Access)
# Terraform — Bastion Host Configuration
resource "aws_instance" "bastion" {
ami = "ami-ubuntu-22.04-lts"
instance_type = "t3.micro"
subnet_id = aws_subnet.public.id
# Security group
vpc_security_group_ids = [aws_security_group.bastion.id]
# Key pair only — no password
key_name = "emergency-access-key"
# Instance metadata protection
metadata_options {
http_tokens = "required" # IMDSv2
http_put_response_hop_limit = 1
http_endpoint = "enabled"
}
# Detailed monitoring
monitoring = true
tags = {
Name = "surveillance-bastion"
Purpose = "Emergency access only"
}
}
resource "aws_security_group" "bastion" {
name = "surveillance-bastion"
# SSH from specific admin IPs only
ingress {
from_port = 22
to_port = 22
protocol = "tcp"
cidr_blocks = var.emergency_admin_ips # Office IPs only
description = "SSH from authorized admin IPs"
}
# No other ingress
egress {
from_port = 22
to_port = 22
protocol = "tcp"
cidr_blocks = [aws_subnet.app.cidr_block]
description = "SSH to app servers only"
}
}
# Bastion host hardening
# /etc/ssh/sshd_config
# Non-standard SSH port
Port 2222
PasswordAuthentication no
PubkeyAuthentication yes
PermitRootLogin no
MaxAuthTries 2
ClientAliveInterval 60
ClientAliveCountMax 3
# Audit all commands
export PROMPT_COMMAND='RETRN_VAL=$?;logger -p local6.debug "$(whoami) [$$]: $(history 1 | sed "s/^[ ]*[0-9]\+[ ]*//" ) [$RETRN_VAL]"'
# Session recording
# Using script or ttyrec for complete session capture
12.4 Container Security
# Kubernetes Pod Security Standard — Restricted
apiVersion: v1
kind: Pod
metadata:
name: surveillance-app
namespace: surveillance
spec:
securityContext:
runAsNonRoot: true
runAsUser: 1000
runAsGroup: 1000
fsGroup: 1000
seccompProfile:
type: RuntimeDefault
containers:
- name: app
image: surveillance/app:v1.2.3
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop:
- ALL
seccompProfile:
type: RuntimeDefault
resources:
limits:
memory: "512Mi"
cpu: "500m"
requests:
memory: "256Mi"
cpu: "250m"
volumeMounts:
- name: tmp
mountPath: /tmp
- name: cache
mountPath: /app/cache
# No hostPath, no privileged mode
# No hostNetwork, no hostPID
volumes:
- name: tmp
emptyDir:
sizeLimit: 100Mi
- name: cache
emptyDir:
sizeLimit: 200Mi
12.5 Image Vulnerability Scanning
# CI/CD pipeline — Security scanning
stages:
- build
- scan
- deploy
build:
script:
- docker build -t $IMAGE:$TAG .
- docker push $IMAGE:$TAG
scan:
script:
# Trivy vulnerability scanner
- trivy image --exit-code 1 --severity HIGH,CRITICAL $IMAGE:$TAG
# Snyk container scan
- snyk container test $IMAGE:$TAG --severity-threshold=high
# Image signing with Cosign
- cosign sign --key $COSIGN_KEY $IMAGE:$TAG
allow_failure: false # Block deployment on vulnerabilities
deploy:
script:
# Verify image signature before deployment
- cosign verify --key $COSIGN_PUBKEY $IMAGE:$TAG
- kubectl apply -f k8s/
12.6 Infrastructure as Code Security
# Terraform security checks
# tfsec configuration
tfsec:
- rule: aws-vpc-no-public-egress-sgr
severity: HIGH
- rule: aws-ec2-enforce-http-token-imds
severity: CRITICAL
- rule: aws-rds-enable-performance-insights-encryption
severity: HIGH
- rule: aws-s3-enable-bucket-encryption
severity: CRITICAL
- rule: aws-s3-block-public-acls
severity: HIGH
13. Secrets Rotation Policy
13.1 Rotation Schedule
| Secret Type | Rotation Frequency | Method | Automation |
|---|---|---|---|
| Database Passwords | 90 days | Manual with maintenance window | Terraform + Vault |
| JWT Signing Keys | 180 days | Grace period with dual-key support | Automated (Vault) |
| API Keys (Internal) | 90 days | Zero-downtime rotation | Automated |
| Telegram Bot Tokens | 180 days or on suspicion | Revoke + regenerate via BotFather | Semi-automated |
| WhatsApp API Keys | 180 days or on suspicion | Regenerate via Meta dashboard | Semi-automated |
| TLS Certificates | 60 days (Let's Encrypt auto) | ACME auto-renewal | Fully automated |
| WireGuard Keys | 365 days | Planned maintenance window | Scripted |
| DVR Credentials | 180 days | Manual DVR interface change | Manual |
| Backup Encryption Keys | 365 days | Re-encrypt all backups | Automated |
| KMS/Data Keys | 365 days or on employee exit | Automatic via cloud provider | Cloud-managed |
| Session Secrets | On security incident | Immediate revocation | Admin trigger |
13.2 Zero-Downtime Key Rotation Procedure
class KeyRotationManager:
"""
Zero-downtime JWT key rotation using dual-key support.
"""
async def rotate_jwt_keys(self):
"""
1. Generate new key pair
2. Publish new public key alongside old one
3. New tokens signed with new key
4. Old tokens still valid (verified with old key)
5. After grace period, retire old key
"""
# 1. Generate new ECDSA key pair
new_private_key = ec.generate_private_key(ec.SECP256R1())
new_public_key = new_private_key.public_key()
# 2. Store new key with activation timestamp
await vault.store_jwt_key(
key_id="jwt-key-2024-02",
private_key=serialize_private(new_private_key),
public_key=serialize_public(new_public_key),
status="active",
created_at=datetime.utcnow(),
grace_period_end=datetime.utcnow() + timedelta(days=7)
)
# 3. Mark previous key as "retiring"
await vault.update_jwt_key_status(
key_id="jwt-key-2024-01",
status="retiring",
retire_at=datetime.utcnow() + timedelta(days=7)
)
# 4. Token manager now signs with new key
# but still accepts tokens signed with old key
# 5. Schedule final retirement
await scheduler.schedule(
run_at=datetime.utcnow() + timedelta(days=7),
task=self._retire_old_key,
key_id="jwt-key-2024-01"
)
async def verify_token_with_rotation(self, token: str) -> dict:
"""Verify token against active or retiring keys."""
# Try active key first
active_key = await vault.get_active_jwt_key()
try:
return jwt.decode(token, active_key.public_key,
algorithms=["ES256"])
except jwt.InvalidSignatureError:
pass
# Try retiring keys (grace period)
retiring_keys = await vault.get_retiring_jwt_keys()
for key in retiring_keys:
try:
payload = jwt.decode(token, key.public_key,
algorithms=["ES256"])
# Token is valid but should be refreshed
payload["_refresh_recommended"] = True
return payload
except jwt.InvalidSignatureError:
continue
raise jwt.InvalidTokenError("No valid signing key found")
13.3 Telegram/WhatsApp Token Rotation
class TokenRotationService:
"""Automated rotation for third-party service tokens."""
async def rotate_telegram_token(self, bot_id: str):
"""
1. Generate new token via BotFather API
2. Update vault with new token
3. Restart services to pick up new token
4. Revoke old token after grace period
"""
# Get current token (needed for BotFather API call)
current_token = await vault.get_telegram_token(bot_id)
# Generate new token (via BotFather or Telegram API)
new_token = await telegram_api.revoke_and_generate(
current_token
)
# Update vault with versioning
await vault.store_telegram_token(
bot_id=bot_id,
token=new_token,
version=await vault.get_next_version(f"integrations/telegram/{bot_id}")
)
# Notify services to reload
await service_mesh.notify_token_update(
service="telegram-bot",
bot_id=bot_id
)
# Grace period: 5 minutes
await asyncio.sleep(300)
# Revoke old token
await telegram_api.revoke_token(current_token)
audit_logger.log({
"event_type": "TOKEN_ROTATED",
"service": "telegram",
"bot_id": bot_id,
"rotated_at": datetime.utcnow().isoformat()
})
async def rotate_all_tokens(self):
"""Rotate all tokens on security incident."""
tasks = []
# Get all integrations
integrations = await vault.list_integrations()
for integration in integrations:
if integration.type == "telegram":
tasks.append(self.rotate_telegram_token(integration.id))
elif integration.type == "whatsapp":
tasks.append(self.rotate_whatsapp_token(integration.id))
elif integration.type == "api_key":
tasks.append(self.rotate_api_key(integration.id))
# Rotate all in parallel
results = await asyncio.gather(*tasks, return_exceptions=True)
# Report results
for integration, result in zip(integrations, results):
if isinstance(result, Exception):
audit_logger.log({
"event_type": "TOKEN_ROTATION_FAILED",
"service": integration.type,
"id": integration.id,
"error": str(result)
})
13.4 Certificate Rotation
#!/bin/bash
# /usr/local/bin/rotate-certs.sh
# Automated certificate rotation via Let's Encrypt
DOMAIN="surveillance.example.com"
CERT_DIR="/etc/letsencrypt/live/$DOMAIN"
NGINX_CONTAINER="surveillance-nginx"
# Check certificate expiry (renew if < 30 days remaining)
EXPIRY_EPOCH=$(openssl x509 -in "$CERT_DIR/cert.pem" -noout -enddate | \
cut -d= -f2 | xargs -I {} date -d "{}" +%s)
NOW_EPOCH=$(date +%s)
DAYS_UNTIL_EXPIRY=$(( (EXPIRY_EPOCH - NOW_EPOCH) / 86400 ))
if [ "$DAYS_UNTIL_EXPIRY" -lt 30 ]; then
echo "Certificate expires in $DAYS_UNTIL_EXPIRY days. Renewing..."
# Renew just this certificate (expiry already verified above)
certbot renew --cert-name "$DOMAIN" --force-renewal --quiet
if [ $? -eq 0 ]; then
# Reload nginx without dropping connections
docker kill --signal HUP "$NGINX_CONTAINER"
echo "Certificate renewed and nginx reloaded"
# Notify
curl -X POST "https://alerts.surveillance.example.com/cert-renewed" \
-d "{\"domain\":\"$DOMAIN\",\"expires\":\"$(date -d '+90 days' +%Y-%m-%d)\"}"
else
echo "Certificate renewal FAILED!"
# Critical alert
curl -X POST "https://alerts.surveillance.example.com/critical" \
-d "{\"alert\":\"CERT_RENEWAL_FAILED\",\"domain\":\"$DOMAIN\"}"
fi
else
echo "Certificate valid for $DAYS_UNTIL_EXPIRY days. No action needed."
fi
13.5 Credential Rotation Procedure
## Credential Rotation Checklist
### Pre-Rotation
- [ ] Announce maintenance window to users
- [ ] Verify backup of current credentials
- [ ] Prepare rollback procedure
- [ ] Have emergency contact available
### During Rotation
- [ ] Generate new credentials
- [ ] Update Vault/secret store
- [ ] Update dependent services
- [ ] Verify services pick up new credentials
- [ ] Test critical functionality
- [ ] Monitor error rates
### Post-Rotation
- [ ] Revoke old credentials
- [ ] Verify old credentials no longer work
- [ ] Update credential inventory
- [ ] Document rotation in audit log
- [ ] Notify stakeholders of completion
### Emergency Rotation (Security Incident)
- [ ] Immediately revoke suspected credentials
- [ ] Generate new credentials
- [ ] Force logout all sessions
- [ ] Audit all access with old credentials
- [ ] Notify security team
- [ ] Document incident
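The "verify old credentials no longer work" step in the checklist above can be scripted. A hedged sketch, where the two probe callables are assumptions standing in for real authenticated test calls made with the old and new credential respectively:

```python
from typing import Callable

def verify_rotation(old_works: Callable[[], bool],
                    new_works: Callable[[], bool]) -> str:
    """Classify the outcome of a credential rotation.

    old_works/new_works each attempt an authenticated call with the
    respective credential and report whether it succeeded.
    """
    if not new_works():
        return "NEW_CREDENTIAL_BROKEN"  # roll back immediately
    if old_works():
        return "OLD_CREDENTIAL_STILL_VALID"  # revocation incomplete
    return "ROTATION_OK"
```

Anything other than ROTATION_OK should block checklist sign-off and page the on-call engineer.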
14. Incident Response Plan
14.1 Security Event Detection
class SecurityEventDetector:
"""
Real-time detection of security events using rule-based
and behavioral analysis.
"""
DETECTION_RULES = {
"BRUTE_FORCE_LOGIN": {
"description": "Multiple failed login attempts",
"condition": "login_failures > 5 FROM same_ip IN 5_minutes",
"severity": "HIGH",
"auto_response": "block_ip"
},
"CREDENTIAL_STUFFING": {
"description": "Login attempts with many different usernames",
"condition": "unique_usernames > 10 FROM same_ip IN 5_minutes",
"severity": "HIGH",
"auto_response": "block_ip"
},
"IMPOSSIBLE_TRAVEL": {
"description": "Login from geographically distant locations",
"condition": "distance_between_logins > 500km IN 1_hour",
"severity": "MEDIUM",
"auto_response": "require_mfa"
},
"PRIVILEGE_ESCALATION": {
"description": "Unusual admin activity",
"condition": "admin_actions > 20 IN 10_minutes FROM user_with_no_history",
"severity": "MEDIUM",
"auto_response": "alert_security_team"
},
"DATA_EXFILTRATION": {
"description": "Unusual data download volume",
"condition": "data_downloaded > 1GB IN 1_hour BY single_user",
"severity": "CRITICAL",
"auto_response": "suspend_account"
},
"OFF_HOURS_ADMIN": {
"description": "Admin activity outside business hours",
"condition": "admin_action BETWEEN 22:00 AND 06:00",
"severity": "LOW",
"auto_response": "log_only"
},
"MFA_BYPASS_ATTEMPT": {
"description": "Attempts to access without MFA",
"condition": "mfa_failures > 3 THEN success_without_mfa",
"severity": "CRITICAL",
"auto_response": "block_account"
},
"SUSPICIOUS_MEDIA_ACCESS": {
"description": "Access to sensitive media outside normal pattern",
"condition": "media_access_pattern_deviation > 3_sigma",
"severity": "MEDIUM",
"auto_response": "alert_dpo"
}
}
async def analyze_event(self, event: dict):
"""Analyze single event against detection rules."""
for rule_name, rule in self.DETECTION_RULES.items():
if await self.check_rule(rule, event):
await self.trigger_response(rule_name, rule, event)
async def trigger_response(self, rule_name: str, rule: dict, event: dict):
"""Execute automated response for detected threat."""
response = rule["auto_response"]
if response == "block_ip":
await self.block_ip(event["source_ip"], duration=3600)
elif response in ("block_account", "suspend_account"):
await self.suspend_account(event["user_id"], reason=rule["description"])
elif response == "require_mfa":
await self.force_mfa_challenge(event["user_id"])
elif response == "alert_security_team":
await self.notify_security_team(rule, event)
elif response == "alert_dpo":
await self.notify_dpo(rule, event)
# Always log the detection
audit_logger.log({
"event_type": "SECURITY_EVENT_DETECTED",
"rule": rule_name,
"severity": rule["severity"],
"details": event,
"auto_response": response
})
14.2 Automated Brute Force Response
class AutomatedBruteForceResponse:
"""
Multi-layer automated response to brute force attacks.
"""
async def block_ip(self, ip: str, duration: int = 3600,
reason: str = "brute_force"):
"""Block IP at firewall level."""
# Add to dynamic blocklist
await redis.setex(f"blocklist:ip:{ip}", duration, reason)
# Update firewall (if using dynamic firewall)
await firewall.add_deny_rule(
source=ip,
duration=duration,
reason=reason
)
# Report to threat intelligence
await threat_intel.report(
ip=ip,
category="brute_force",
confidence="high"
)
audit_logger.log({
"event_type": "IP_BLOCKED",
"ip": ip,
"duration": duration,
"reason": reason
})
async def progressive_response(self, ip: str,
failure_count: int):
"""Escalate response based on failure count."""
responses = {
5: {"action": "captcha_required", "duration": 300},
10: {"action": "rate_limit", "duration": 600},
15: {"action": "temporary_block", "duration": 1800},
25: {"action": "extended_block", "duration": 86400},
50: {"action": "permanent_block", "duration": None}
}
# Apply only the single highest threshold reached
for threshold, response in sorted(responses.items(), reverse=True):
if failure_count >= threshold:
await self.execute_response(ip, response)
break
async def execute_response(self, ip: str, response: dict):
"""Execute the appropriate response action."""
action = response["action"]
if action == "captcha_required":
await redis.setex(f"captcha:required:{ip}",
response["duration"], "1")
elif action == "rate_limit":
# Assumed to be enforced by the gateway's rate limiter via this flag
await redis.setex(f"ratelimit:strict:{ip}", response["duration"], "1")
elif action in ["temporary_block", "extended_block", "permanent_block"]:
# block_ip expects an integer TTL; treat "permanent" as ten years
await self.block_ip(ip, response["duration"] or 10 * 365 * 86400)
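The escalation table above is easiest to get right when only the single highest threshold reached is applied. A minimal, synchronous sketch of that selection step (the `responses` table mirrors the one above; `select_response` is an illustrative helper, not part of the class):

```python
from typing import Optional

# Mirrors the escalation table in progressive_response above
RESPONSES = {
    5:  {"action": "captcha_required", "duration": 300},
    10: {"action": "rate_limit", "duration": 600},
    15: {"action": "temporary_block", "duration": 1800},
    25: {"action": "extended_block", "duration": 86400},
    50: {"action": "permanent_block", "duration": None},
}

def select_response(failure_count: int) -> Optional[dict]:
    """Return the response for the highest threshold reached, or None."""
    for threshold in sorted(RESPONSES, reverse=True):
        if failure_count >= threshold:
            return RESPONSES[threshold]
    return None
```

Keeping the selection pure makes the escalation policy trivially unit-testable, independent of Redis or the firewall.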
14.3 Suspicious Admin Activity Alerting
class AdminActivityMonitor:
"""
Monitor admin activities for anomalous behavior.
Alert on patterns that may indicate compromised accounts
or insider threats.
"""
SUSPICIOUS_PATTERNS = [
{
"name": "bulk_export",
"description": "Exporting large amounts of person data",
"threshold": {"exports_per_hour": 5, "records_per_export": 100}
},
{
"name": "after_hours_access",
"description": "Admin activity during off-hours",
"threshold": {"local_hour_before": 6, "local_hour_after": 22}
},
{
"name": "privilege_abuse",
"description": "Using elevated permissions for non-admin tasks",
"threshold": {"super_admin_actions_daily": 20}
},
{
"name": "unusual_deletion",
"description": "Deleting data at unusual rate",
"threshold": {"deletions_per_hour": 10}
},
{
"name": "configuration_tampering",
"description": "Multiple security setting changes",
"threshold": {"security_config_changes_per_day": 3}
}
]
async def analyze_admin_activity(self, user_id: str,
action: dict):
"""Check admin action against suspicious patterns."""
for pattern in self.SUSPICIOUS_PATTERNS:
if await self.matches_pattern(user_id, action, pattern):
await self.generate_alert(user_id, action, pattern)
async def generate_alert(self, user_id: str, action: dict,
pattern: dict):
"""Generate and dispatch security alert."""
alert = {
"alert_id": str(uuid.uuid4()),
"timestamp": datetime.utcnow().isoformat(),
"severity": "HIGH",
"category": "SUSPICIOUS_ADMIN_ACTIVITY",
"pattern": pattern["name"],
"description": pattern["description"],
"user_id": hash_id(user_id),
"action": action,
"recommendation": "Review activity and verify with admin"
}
# Send to security team
await notify_security_team(alert)
# If critical pattern, also notify CISO
if pattern["name"] in ["bulk_export", "configuration_tampering"]:
await notify_ciso(alert)
audit_logger.log({
"event_type": "SUSPICIOUS_ACTIVITY_ALERT",
**alert
})
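A `matches_pattern` check for the `bulk_export` rule reduces to a sliding-window counter per admin. A minimal in-memory sketch (production would keep these counters in Redis rather than process memory; the class name is illustrative):

```python
import time
from collections import defaultdict, deque
from typing import Optional

class ExportRateTracker:
    """Sketch of the 'bulk_export' check: flag an admin who performs
    more than `max_exports` exports within `window` seconds."""

    def __init__(self, max_exports: int = 5, window: int = 3600):
        self.max_exports = max_exports
        self.window = window
        self._events = defaultdict(deque)  # user_id -> export timestamps

    def record_export(self, user_id: str,
                      now: Optional[float] = None) -> bool:
        """Record an export; return True if the pattern is now matched."""
        now = time.time() if now is None else now
        q = self._events[user_id]
        q.append(now)
        # Drop events that have fallen out of the window
        while q and now - q[0] > self.window:
            q.popleft()
        return len(q) > self.max_exports
```

The thresholds match the `exports_per_hour: 5` figure in `SUSPICIOUS_PATTERNS` above.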
14.4 Breach Notification Procedure
## Data Breach Response and Notification Procedure
### Phase 1: Detection and Assessment (0-24 hours)
1. **Confirm Breach**
- Verify the security incident is a confirmed breach
- Determine scope: what data, how many records, what systems
- Assess whether personal data was accessed/exfiltrated
2. **Immediate Containment**
- Isolate affected systems
- Revoke compromised credentials
- Block malicious IPs/accounts
- Preserve evidence (forensic images)
3. **Assemble Response Team**
- Security Lead (incident commander)
- DPO (Data Protection Officer)
- Legal Counsel
- System Administrators
- Communications Lead
### Phase 2: Investigation (24-72 hours)
4. **Forensic Investigation**
- Determine attack vector
- Timeline of compromise
- Data accessed or exfiltrated
- Systems affected
5. **Impact Assessment**
- Number of data subjects affected
- Categories of personal data involved
- Risk level to data subjects
- Likely consequences
### Phase 3: Notification (Within 72 hours of discovery)
6. **Supervisory Authority Notification**
- Notify relevant Data Protection Authority
- Required information:
- Nature of breach
- Categories and approximate number of data subjects
- Likely consequences
- Measures taken/proposed
- Contact details for more information
7. **Data Subject Notification**
- Notify affected individuals if high risk
- Clear and plain language
- Description of incident
- Measures taken
- Recommendations for protection
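The 72-hour clock runs from the moment the organization becomes aware of the breach, not from when the investigation concludes. A tiny helper makes the deadline explicit in incident tooling:

```python
from datetime import datetime, timedelta, timezone

# Article 33 GDPR: notify the supervisory authority without undue delay
# and, where feasible, within 72 hours of becoming aware of the breach
GDPR_NOTIFICATION_WINDOW = timedelta(hours=72)

def dpa_notification_deadline(discovered_at: datetime) -> datetime:
    """Return the latest time the supervisory authority may be notified."""
    return discovered_at + GDPR_NOTIFICATION_WINDOW

deadline = dpa_notification_deadline(
    datetime(2024, 1, 15, 9, 30, tzinfo=timezone.utc))
# -> 2024-01-18 09:30 UTC
```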
### Phase 4: Recovery and Post-Incident
8. **System Recovery**
- Restore from clean backups
- Apply security patches
- Verify system integrity
- Gradual service restoration
9. **Post-Incident Review**
- Root cause analysis
- Lessons learned
- Security improvements
- Update incident response plan
- Update risk register
### Notification Templates
**To Supervisory Authority (within 72 hours):**
Date: [DATE]
To: [DPA Name]
From: [Organization] DPO
Subject: Personal Data Breach Notification — Article 33 GDPR
- Nature of breach: [Unauthorized access / Data exfiltration / ...]
- Categories of data: [Facial recognition data / Personal identifiers]
- Approximate number of data subjects: [NUMBER]
- Likely consequences: [DESCRIPTION]
- Measures taken: [CONTAINMENT ACTIONS]
- Contact: [DPO contact details]
**To Data Subjects (when required):**
Subject: Important Security Notice — [Organization]
We are writing to inform you of a security incident that may have affected your personal data held in our surveillance system.
What happened: [Brief description]
What data: [Categories of data involved]
What we are doing: [Remediation measures]
What you can do: [Protective measures]
Contact: [Contact for questions]
### Breach Severity Classification
| Level | Criteria | Notification Required |
|---|---|---|
| **Low** | No personal data accessed; system availability only | Internal only |
| **Medium** | Limited data accessed; no sensitive data | DPA notification |
| **High** | Sensitive personal data accessed; facial/biometric data | DPA + Data subjects |
| **Critical** | Large-scale biometric data exfiltration; ongoing threat | DPA + Data subjects + Public |
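The severity table above can be encoded as a small classifier so that incident tooling and humans apply the same thresholds. A sketch under the stated criteria (function name and attribute flags are illustrative):

```python
def classify_breach(personal_data_accessed: bool,
                    sensitive_data: bool,
                    large_scale: bool,
                    ongoing_threat: bool) -> tuple:
    """Map breach attributes to the severity table above.
    Returns (level, required notification audiences)."""
    if (large_scale and sensitive_data) or ongoing_threat:
        return "Critical", ["DPA", "data_subjects", "public"]
    if sensitive_data:
        # Facial/biometric data counts as sensitive
        return "High", ["DPA", "data_subjects"]
    if personal_data_accessed:
        return "Medium", ["DPA"]
    return "Low", ["internal"]
```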
14.5 Security Alert Escalation Matrix
Detection -> Analysis -> Containment -> Eradication -> Recovery -> Post-Incident

| Phase | Responder (target time) | Follow-on Actions |
|---|---|---|
| Detection | Automated + L1 Analyst (5 min) | Alert to dashboard; emergency response if needed |
| Analysis | L1 Analyst (15 min) | Escalate to L2 Engineer (30 min); L3/external if needed |
| Containment | Automated response (immediate) | Security Team Lead; stakeholder communication |
| Eradication | L2 Engineer (4 hours) | Forensic analysis; patch systems |
| Recovery | L2 Engineer (24 hours) | Service restoration; validate systems |
| Post-Incident | DPO review (72 hours) | Report to DPA if required; lessons learned; sign-off; close incident |
15. Security Checklist
15.1 Pre-Deployment Security Checklist
SSL/TLS
- TLS 1.3 enabled; TLS 1.2 minimum
- Strong cipher suites configured (no CBC, no RC4, no 3DES)
- HSTS header with includeSubDomains and preload
- OCSP stapling enabled
- Certificate auto-renewal configured (Let's Encrypt or managed)
- Internal service-to-service TLS enabled
- mTLS between microservices configured
- Certificate expiry monitoring in place
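For the "certificate expiry monitoring" item, the check reduces to parsing the certificate's `notAfter` field (the `'%b %d %H:%M:%S %Y %Z'` format used by `ssl.getpeercert()`) and computing days remaining. A minimal sketch:

```python
from datetime import datetime, timezone
from typing import Optional

def days_until_expiry(not_after: str,
                      now: Optional[datetime] = None) -> int:
    """Parse an OpenSSL-style notAfter string (as returned in
    ssl.getpeercert()['notAfter']) and return whole days remaining.
    Negative values mean the certificate has already expired."""
    expires = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
    expires = expires.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return (expires - now).days
```

Alerting at 30 days remaining leaves ample margin for Let's Encrypt's day-60 renewal to have failed silently.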
Authentication
- Password policy enforced (12+ chars, complexity, history)
- Argon2id or bcrypt password hashing
- Rate limiting on login endpoints (5 attempts / 15 min)
- Account lockout with exponential backoff
- JWT with ES256 signing algorithm
- Short-lived access tokens (15 min) + refresh tokens
- Session binding to browser fingerprint
- Session fixation protection (regenerate on login)
- Idle timeout: 30 minutes
- Absolute timeout: 8 hours
- Maximum 3 concurrent sessions per user
- Force logout capability for admins
- MFA (TOTP) available and encouraged for admins
- HaveIBeenPwned API integration for password checking
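The HaveIBeenPwned integration never needs to send the password: its range API works on a k-anonymity model where only the first 5 hex characters of the SHA-1 are transmitted and the suffix is compared locally. The hashing/splitting step:

```python
import hashlib

def hibp_range_query_parts(password: str) -> tuple:
    """Split the SHA-1 of a password into the 5-char prefix that is sent
    to https://api.pwnedpasswords.com/range/{prefix} and the suffix that
    is compared locally against the returned candidate list."""
    digest = hashlib.sha1(password.encode("utf-8")).hexdigest().upper()
    return digest[:5], digest[5:]

prefix, suffix = hibp_range_query_parts("password")
# prefix == "5BAA6" -- only these 5 characters ever leave the server
```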
Authorization (RBAC)
- 4 roles defined: Super Admin, Admin, Operator, Viewer
- Permission matrix documented and implemented
- Resource-level permissions (per-camera, per-zone)
- API endpoints protected with permission checks
- No privilege escalation via parameter manipulation
- Permission caching with invalidation on role change
- Default deny for unmapped permissions
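"Default deny for unmapped permissions" means an unknown role or an unlisted permission must fail closed, never fall through to an allow. A minimal sketch (the permission strings are illustrative; the real matrix is the one documented in the RBAC section):

```python
# Illustrative role -> permission mapping; "*" marks the full-access role
ROLE_PERMISSIONS = {
    "super_admin": {"*"},
    "admin":    {"person.read", "person.write", "camera.read", "camera.write"},
    "operator": {"person.read", "camera.read", "alert.ack"},
    "viewer":   {"camera.read"},
}

def has_permission(role: str, permission: str) -> bool:
    """Return True only for an explicit grant; unknown roles and
    unmapped permissions are denied by default."""
    granted = ROLE_PERMISSIONS.get(role, set())
    return "*" in granted or permission in granted
```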
VPN & Network
- WireGuard VPN between cloud and edge
- Preshared keys for all peer connections
- Strict AllowedIPs on all peers
- No direct DVR exposure to internet
- DVR has no default gateway
- Network segmentation (management VLAN + camera VLAN)
- Edge gateway firewall: default deny all
- Cloud firewall: database not publicly accessible
- Inter-VLAN routing restrictions enforced
- VPN endpoint not discoverable via scanning
Secret Management
- No secrets in source code
- No secrets in environment variables (files only)
- Vault (HashiCorp/AWS/Azure) for secret storage
- Database encryption at rest (AES-256-GCM)
- Field-level encryption for PII/biometric data
- Telegram/WhatsApp tokens encrypted in vault
- DVR credentials encrypted
- JWT keys managed by Vault with rotation
- Secret sanitization in all logs
- Memory-only secret caching with short TTL
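The "secret sanitization in all logs" item is typically a regex pass applied by the logging formatter before any sink sees the record. A sketch with a few common secret shapes (the pattern list is illustrative and should be extended per deployment):

```python
import re

# Common secret shapes; group 1 keeps the field name, the value is dropped
_SECRET_PATTERNS = [
    re.compile(r"(?i)(authorization:\s*bearer\s+)[A-Za-z0-9._-]+"),
    re.compile(r"(?i)(password\s*[=:]\s*)\S+"),
    re.compile(r"(?i)(api[_-]?key\s*[=:]\s*)\S+"),
]

def sanitize(line: str) -> str:
    """Replace secret values with [REDACTED], keeping the field name."""
    for pattern in _SECRET_PATTERNS:
        line = pattern.sub(r"\1[REDACTED]", line)
    return line
```

Wiring this into a `logging.Filter` guarantees every handler benefits, including third-party ones.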
Audit Logging
- All authentication events logged
- All authorization decisions logged
- All person data modifications logged
- All alert actions logged
- All configuration changes logged
- Tamper-resistant log chain (hash chain + signature)
- Centralized log aggregation (ELK/Loki)
- Log retention policy defined and enforced
- Immutable log storage for security events (WORM)
- Log access restricted to security team
- Real-time alerting on critical events
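The "tamper-resistant log chain" item works by having each entry hash the previous entry's hash together with its own canonical content, so rewriting any historic entry breaks every later hash. A minimal sketch (signatures, as in the `audit_log` schema, are layered on top of this):

```python
import hashlib
import json

def chain_entry(prev_hash: str, event: dict) -> str:
    """Compute the tamper-evident hash for one audit entry."""
    # Canonical JSON so that key ordering cannot change the hash
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + canonical).encode()).hexdigest()

def verify_chain(entries: list) -> bool:
    """Walk (event, stored_hash) pairs and recompute every link."""
    prev = "0" * 64  # genesis hash
    for event, stored_hash in entries:
        if chain_entry(prev, event) != stored_hash:
            return False
        prev = stored_hash
    return True
```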
Media Access
- No direct object storage URLs exposed to users
- Signed URLs with expiration for all media access
- URLs bound to user session
- Time-limited access (default 5 minutes)
- Single-use tokens for downloads
- Access logging for every media request
- Retention-based auto-deletion configured
- Secure deletion (overwrite + verify)
- DPO notification for sensitive media access
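Signed URLs bound to a session with a 5-minute expiry, as listed above, come down to an HMAC over the media ID, session ID, and expiry timestamp. A self-contained sketch (key, paths, and parameter names are illustrative; the real key lives in the vault):

```python
import hashlib
import hmac
import time
from typing import Optional
from urllib.parse import urlencode

SECRET = b"server-side-signing-key"  # illustrative; fetched from the vault

def sign_media_url(media_id: str, session_id: str,
                   ttl: int = 300, now: Optional[int] = None) -> str:
    """Build a media URL bound to the session, expiring after `ttl`
    seconds (default 5 minutes, matching the policy above)."""
    now = int(time.time()) if now is None else now
    expires = now + ttl
    msg = f"{media_id}:{session_id}:{expires}".encode()
    sig = hmac.new(SECRET, msg, hashlib.sha256).hexdigest()
    return f"/media/{media_id}?" + urlencode({"expires": expires, "sig": sig})

def verify_media_url(media_id: str, session_id: str,
                     expires: int, sig: str,
                     now: Optional[int] = None) -> bool:
    now = int(time.time()) if now is None else now
    if now > expires:
        return False
    msg = f"{media_id}:{session_id}:{expires}".encode()
    expected = hmac.new(SECRET, msg, hashlib.sha256).hexdigest()
    # Constant-time comparison prevents timing side channels
    return hmac.compare_digest(expected, sig)
```

Because the session ID is part of the signed message, a leaked URL is useless from any other session.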
API Security
- Rate limiting per endpoint per user
- Request size limits configured
- Input validation on all endpoints (Pydantic/strong types)
- Parameterized queries (no SQL injection)
- Output encoding for XSS prevention
- CSP headers configured
- CSRF tokens for state-changing operations
- CORS restricted to known origins only
- API authentication with Bearer tokens
- Security headers on all responses
- API versioning for backward compatibility
Session Security
- HttpOnly, Secure, SameSite=Strict cookies
- Session ID regeneration on login
- Session binding to IP/fingerprint
- Concurrent session limits enforced
- Idle and absolute timeouts enforced
- Server-side session invalidation capability
- Secure session storage (Redis with AUTH)
- Session cleanup on logout
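For "session binding to IP/fingerprint", one common trade-off is to bind to the user agent plus only the /24 prefix of the IPv4 address, tolerating mobile and NAT address churn while still catching cross-network token theft. An IPv4-only sketch:

```python
import hashlib

def session_fingerprint(user_agent: str, ip_prefix: str) -> str:
    """Derive the fingerprint a session is bound to at login."""
    return hashlib.sha256(f"{user_agent}|{ip_prefix}".encode()).hexdigest()

def validate_session(stored_fp: str, user_agent: str, ip: str) -> bool:
    """Recompute the fingerprint on each request and compare.
    IPv4-only sketch; production code would also handle IPv6 prefixes."""
    prefix = ".".join(ip.split(".")[:3])  # 203.0.113.7 -> 203.0.113
    return stored_fp == session_fingerprint(user_agent, prefix)
```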
Data Privacy (GDPR)
- Lawful basis documented for all processing
- Consent tracking implemented
- Consent withdrawal workflow
- Right to deletion implemented
- Right to access (data export) implemented
- Data minimization enforced (no raw images retained)
- Anonymization capability for analytics
- Privacy mode per camera implemented
- Privacy impact assessment completed
- DPO appointed and contactable
- Privacy notice displayed at camera entry points
- Data retention schedule enforced
- Cross-border transfer safeguards (if applicable)
Edge Gateway
- Minimal services running
- All unnecessary services disabled
- SSH key-based auth only (no passwords)
- SSH on LAN interface only
- Automatic security updates enabled
- File integrity monitoring configured
- Disk encryption (LUKS + TPM2 if available)
- Container runtime with security options
- Read-only container filesystems
- Non-root container execution
- Resource limits on containers
- No network access except via VPN
- Tamper detection alerts to cloud
Cloud Infrastructure
- Private subnets for all internal services
- Security groups: least privilege
- No public database access
- Bastion host for emergency access only
- IMDSv2 enforced (no IMDSv1)
- Container security (non-root, read-only, no new privileges)
- Image vulnerability scanning in CI/CD
- Image signing with Cosign
- Resource quotas and limits
- Network policies (Kubernetes)
- Pod Security Standards (Restricted)
Secrets Rotation
- Rotation schedule defined for all secret types
- Automated certificate rotation
- JWT key rotation with dual-key support
- Token rotation procedure for Telegram/WhatsApp
- Credential rotation checklist documented
- Emergency rotation procedure documented
- Rotation events logged to audit trail
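"JWT key rotation with dual-key support" means tokens are signed with the active key but verified against both the active and the previous key, so rotation never invalidates tokens issued just before the switch. A minimal key-ring sketch (kid values and key material are placeholders; in practice the ring is backed by the vault):

```python
class KeyRing:
    """Dual-key rotation: sign with the active key, verify with any key
    still in the ring (active or the immediately previous one)."""

    def __init__(self, active_kid: str, keys: dict):
        self.active_kid = active_kid
        self.keys = keys  # kid -> key material

    def signing_key(self) -> tuple:
        return self.active_kid, self.keys[self.active_kid]

    def verification_key(self, kid: str):
        """Return the key for a token's kid header, or None if retired."""
        return self.keys.get(kid)

    def rotate(self, new_kid: str, new_key) -> None:
        """Install a new active key, keeping only the old active key
        around for verification of in-flight tokens."""
        self.keys = {self.active_kid: self.keys[self.active_kid],
                     new_kid: new_key}
        self.active_kid = new_kid
```

After two rotations the oldest key drops out of the ring, which bounds the window in which a compromised old key remains usable.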
Incident Response
- Detection rules configured for common threats
- Automated brute force blocking
- Suspicious admin activity alerting
- Breach notification procedure documented
- DPA notification template prepared
- Data subject notification template prepared
- Incident response team contacts documented
- Forensic evidence preservation procedure
- Post-incident review process defined
15.2 Ongoing Security Operations
Daily
- Review security alerts
- Check failed login attempts
- Verify backup completion
- Monitor certificate expiry
Weekly
- Review audit logs for anomalies
- Check for new CVEs affecting dependencies
- Verify VPN tunnel status
- Review access patterns
Monthly
- Review user access rights
- Check for unused accounts
- Verify log integrity
- Review firewall rules
- Test backup restoration
Quarterly
- Full penetration test
- RBAC review and cleanup
- Security policy review
- Disaster recovery drill
- Privacy impact assessment review
Annually
- Comprehensive security audit
- Update threat model
- Review and update incident response plan
- Security awareness training
- Third-party security assessment
16. Appendix: Configuration Templates
16.1 Complete Nginx Security Configuration
# /etc/nginx/nginx.conf
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 4096;
use epoll;
multi_accept on;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
# Logging format with security-relevant fields
log_format security '$remote_addr - $remote_user [$time_local] '
'"$request" $status $body_bytes_sent '
'"$http_referer" "$http_user_agent" '
'$request_time $ssl_protocol $ssl_cipher '
'$http_x_forwarded_for';
access_log /var/log/nginx/access.log security;
# Security hardening
server_tokens off;
more_clear_headers Server; # requires the third-party headers-more module
# Connection and rate limiting
limit_req_zone $binary_remote_addr zone=login:10m rate=5r/m;
limit_req_zone $binary_remote_addr zone=api:10m rate=100r/m;
limit_conn_zone $binary_remote_addr zone=addr:10m;
# SSL configuration
ssl_protocols TLSv1.3 TLSv1.2;
ssl_ciphers 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305';
ssl_prefer_server_ciphers off;
ssl_session_cache shared:SSL:50m;
ssl_session_timeout 1d;
ssl_session_tickets off;
# Buffer sizes
client_body_buffer_size 16K;
client_header_buffer_size 1K;
client_max_body_size 10M;
large_client_header_buffers 4 8K;
# Timeouts
client_body_timeout 12;
client_header_timeout 12;
keepalive_timeout 15;
send_timeout 10;
# File handling
sendfile on;
tcp_nopush on;
tcp_nodelay on;
# Gzip (careful with BREACH)
gzip on;
gzip_vary on;
gzip_proxied any;
gzip_comp_level 5;
gzip_types text/plain text/css application/json application/javascript text/xml;
# Upstream
upstream app_servers {
least_conn;
server 10.0.2.10:8080 max_fails=3 fail_timeout=30s;
server 10.0.2.11:8080 max_fails=3 fail_timeout=30s;
keepalive 32;
}
include /etc/nginx/conf.d/*.conf;
}
# /etc/nginx/conf.d/surveillance.conf
server {
listen 80;
server_name surveillance.example.com;
# Redirect all HTTP to HTTPS
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name surveillance.example.com;
ssl_certificate /etc/letsencrypt/live/surveillance.example.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/surveillance.example.com/privkey.pem;
ssl_trusted_certificate /etc/letsencrypt/live/surveillance.example.com/chain.pem;
# OCSP stapling
ssl_stapling on;
ssl_stapling_verify on;
resolver 1.1.1.1 1.0.0.1 valid=300s;
resolver_timeout 5s;
# Security Headers
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
add_header X-Content-Type-Options "nosniff" always;
add_header X-Frame-Options "DENY" always;
add_header X-XSS-Protection "0" always;
add_header Referrer-Policy "strict-origin-when-cross-origin" always;
add_header Permissions-Policy "camera=(), microphone=(), geolocation=(), payment=()" always;
add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'nonce-$request_id' https://cdn.jsdelivr.net; style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; img-src 'self' data: blob: https://media.surveillance.example.com; media-src 'self' blob: https://media.surveillance.example.com; connect-src 'self' https://api.surveillance.example.com wss://realtime.surveillance.example.com; font-src 'self' https://fonts.gstatic.com; frame-ancestors 'none'; base-uri 'self'; form-action 'self'; upgrade-insecure-requests; block-all-mixed-content" always;
# Rate limiting for login
location /api/auth/login {
limit_req zone=login burst=3 nodelay;
limit_req_status 429;
proxy_pass http://app_servers;
include /etc/nginx/proxy_params.conf;
}
# API endpoints
location /api/ {
limit_req zone=api burst=20 nodelay;
limit_conn addr 10;
proxy_pass http://app_servers;
include /etc/nginx/proxy_params.conf;
# Additional API security headers
add_header Cache-Control "no-store, no-cache, must-revalidate" always;
add_header Pragma "no-cache" always;
# CORS preflight
if ($request_method = 'OPTIONS') {
add_header Access-Control-Allow-Origin "https://surveillance.example.com";
add_header Access-Control-Allow-Methods "GET, POST, PUT, DELETE, PATCH, OPTIONS";
add_header Access-Control-Allow-Headers "Authorization, Content-Type, X-CSRF-Token, X-Request-ID";
add_header Access-Control-Allow-Credentials "true";
add_header Access-Control-Max-Age "600";
add_header Content-Length 0;
return 204;
}
}
# WebSocket for real-time alerts
location /ws/ {
proxy_pass http://app_servers;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 86400;
include /etc/nginx/proxy_params.conf;
}
# Static files
location /static/ {
alias /var/www/static/;
expires 1d;
add_header Cache-Control "public, immutable";
# Additional security for static files
location ~* \.(js|css)$ {
add_header X-Content-Type-Options "nosniff";
}
}
# Health check (no auth)
location /health {
proxy_pass http://app_servers;
include /etc/nginx/proxy_params.conf;
access_log off;
}
# Deny access to hidden files
location ~ /\. {
deny all;
return 404;
}
# Security.txt (RFC 9116)
location /.well-known/security.txt {
alias /var/www/security.txt;
}
}
# /etc/nginx/proxy_params.conf
proxy_http_version 1.1;
proxy_cache_bypass $http_upgrade;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Port $server_port;
proxy_set_header X-Request-ID $request_id;
# Hide upstream server info
proxy_hide_header X-Powered-By;
proxy_hide_header Server;
# Timeouts
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
16.2 Traefik Security Configuration (Alternative)
# docker-compose.traefik.yml
services:
traefik:
image: traefik:v2.10
command:
- "--api.dashboard=false"
- "--providers.docker=true"
- "--providers.docker.exposedbydefault=false"
- "--entrypoints.web.address=:80"
- "--entrypoints.websecure.address=:443"
- "--entrypoints.web.http.redirections.entryPoint.to=websecure"
- "--entrypoints.web.http.redirections.entryPoint.scheme=https"
- "--entrypoints.web.http.redirections.entryPoint.permanent=true"
- "--certificatesresolvers.letsencrypt.acme.tlschallenge=true"
- "--certificatesresolvers.letsencrypt.acme.email=admin@example.com"
- "--certificatesresolvers.letsencrypt.acme.storage=/letsencrypt/acme.json"
- "--certificatesresolvers.letsencrypt.acme.caserver=https://acme-v02.api.letsencrypt.org/directory"
- "--log.level=WARN"
- "--accesslog=true"
- "--accesslog.format=json"
- "--ping=true"
- "--serverstransport.insecureskipverify=false"
ports:
- "80:80"
- "443:443"
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
- ./letsencrypt:/letsencrypt
labels:
- "traefik.enable=true"
# Global middleware
- "traefik.http.middlewares.security-headers.headers.stsSeconds=63072000"
- "traefik.http.middlewares.security-headers.headers.stsIncludeSubdomains=true"
- "traefik.http.middlewares.security-headers.headers.stsPreload=true"
- "traefik.http.middlewares.security-headers.headers.forceStsHeader=true"
- "traefik.http.middlewares.security-headers.headers.contentTypeNosniff=true"
- "traefik.http.middlewares.security-headers.headers.browserXssFilter=false"
- "traefik.http.middlewares.security-headers.headers.frameDeny=true"
- "traefik.http.middlewares.security-headers.headers.referrerPolicy=strict-origin-when-cross-origin"
- "traefik.http.middlewares.security-headers.headers.permissionsPolicy=camera=(), microphone=(), geolocation=()"
- "traefik.http.middlewares.security-headers.headers.customFrameOptionsValue=DENY"
- "traefik.http.middlewares.security-headers.headers.contentSecurityPolicy=default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' data:;"
# Rate limiting
- "traefik.http.middlewares.rate-limit.ratelimit.average=100"
- "traefik.http.middlewares.rate-limit.ratelimit.burst=50"
- "traefik.http.middlewares.rate-limit.ratelimit.period=1m"
# Compress
- "traefik.http.middlewares.compress.compress=true"
app:
image: surveillance-app:latest
labels:
- "traefik.enable=true"
- "traefik.http.routers.app.rule=Host(`surveillance.example.com`)"
- "traefik.http.routers.app.entrypoints=websecure"
- "traefik.http.routers.app.tls.certresolver=letsencrypt"
- "traefik.http.routers.app.middlewares=security-headers,rate-limit,compress"
- "traefik.http.services.app.loadbalancer.server.port=8080"
- "traefik.http.services.app.loadbalancer.healthcheck.path=/health"
- "traefik.http.services.app.loadbalancer.healthcheck.interval=10s"
16.3 Database Encryption Setup (PostgreSQL)
-- Enable pgcrypto extension for encryption functions
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Table for encrypted person data
CREATE TABLE persons (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name_encrypted BYTEA NOT NULL, -- AES-256-GCM encrypted
email_encrypted BYTEA,
phone_encrypted BYTEA,
employee_id_encrypted BYTEA,
face_encoding_encrypted BYTEA, -- Biometric data
department_encrypted BYTEA,
consent_status VARCHAR(20) NOT NULL DEFAULT 'pending',
privacy_level VARCHAR(20) NOT NULL DEFAULT 'standard',
anonymized BOOLEAN NOT NULL DEFAULT FALSE,
anonymous_id VARCHAR(20),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_by UUID NOT NULL,
updated_by UUID NOT NULL,
-- Constraints
CONSTRAINT valid_consent CHECK (consent_status IN ('pending', 'granted', 'denied', 'withdrawn')),
CONSTRAINT valid_privacy CHECK (privacy_level IN ('standard', 'sensitive', 'restricted'))
);
-- Index for non-encrypted fields (performance)
CREATE INDEX idx_persons_consent ON persons(consent_status);
CREATE INDEX idx_persons_privacy ON persons(privacy_level);
CREATE INDEX idx_persons_anonymized ON persons(anonymized);
-- Audit log table with integrity chain
CREATE TABLE audit_log (
event_id UUID NOT NULL DEFAULT gen_random_uuid(),
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
event_type VARCHAR(50) NOT NULL,
severity VARCHAR(10) NOT NULL,
actor_user_id UUID,
actor_username VARCHAR(100),
actor_role VARCHAR(30),
actor_ip INET,
actor_session_id VARCHAR(100),
resource_type VARCHAR(30),
resource_id UUID,
action_type VARCHAR(20),
action_details JSONB,
result VARCHAR(20),
previous_hash VARCHAR(64),
entry_hash VARCHAR(64) NOT NULL,
signature VARCHAR(128) NOT NULL,
metadata JSONB,
-- Partitioned tables must include the partition key in the primary key
PRIMARY KEY (event_id, timestamp),
CONSTRAINT valid_severity CHECK (severity IN ('INFO', 'NOTICE', 'WARNING', 'ERROR', 'CRITICAL')),
CONSTRAINT valid_result CHECK (result IN ('SUCCESS', 'FAILURE', 'DENIED', 'ERROR'))
) PARTITION BY RANGE (timestamp);
-- Indexes for audit log queries
CREATE INDEX idx_audit_timestamp ON audit_log(timestamp);
CREATE INDEX idx_audit_event_type ON audit_log(event_type);
CREATE INDEX idx_audit_user ON audit_log(actor_user_id);
CREATE INDEX idx_audit_resource ON audit_log(resource_type, resource_id);
CREATE INDEX idx_audit_severity ON audit_log(severity) WHERE severity IN ('WARNING', 'ERROR', 'CRITICAL');
-- Partition audit log by month for performance
CREATE TABLE audit_log_2024_01 PARTITION OF audit_log
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
-- Row Level Security (RLS) for multi-tenant data access
ALTER TABLE persons ENABLE ROW LEVEL SECURITY;
CREATE POLICY person_access_policy ON persons
FOR ALL
TO application_role
USING (
-- Users can only see non-anonymized persons within their privacy clearance;
-- map the text level to a rank so it can be compared numerically
anonymized = FALSE
AND array_position(ARRAY['standard', 'sensitive', 'restricted'], privacy_level)
<= current_setting('app.privacy_clearance')::int
);
-- Media access log
CREATE TABLE media_access_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
user_id UUID NOT NULL,
media_id UUID NOT NULL,
camera_id UUID,
access_type VARCHAR(20) NOT NULL,
delivery_method VARCHAR(20),
quality_requested VARCHAR(10),
contains_persons BOOLEAN,
consent_verified BOOLEAN,
ip_address INET,
session_id VARCHAR(100),
retention_classification VARCHAR(30),
CONSTRAINT valid_access_type CHECK (access_type IN ('view', 'download', 'export', 'stream', 'api'))
);
CREATE INDEX idx_media_access_user ON media_access_log(user_id, timestamp);
CREATE INDEX idx_media_access_media ON media_access_log(media_id, timestamp);
CREATE INDEX idx_media_access_timestamp ON media_access_log(timestamp);
-- Automatic partition maintenance function
CREATE OR REPLACE FUNCTION create_audit_partition()
RETURNS void AS $$
DECLARE
partition_date DATE;
partition_name TEXT;
start_date DATE;
end_date DATE;
BEGIN
partition_date := DATE_TRUNC('month', NOW() + INTERVAL '1 month');
partition_name := 'audit_log_' || TO_CHAR(partition_date, 'YYYY_MM');
start_date := partition_date;
end_date := partition_date + INTERVAL '1 month';
EXECUTE format(
'CREATE TABLE IF NOT EXISTS %I PARTITION OF audit_log FOR VALUES FROM (%L) TO (%L)',
partition_name, start_date, end_date
);
END;
$$ LANGUAGE plpgsql;
16.4 Docker Compose — Full Stack Security
# docker-compose.yml — Production Security Hardened
version: "3.8"
services:
# Application
app:
image: surveillance-app:${APP_VERSION}
read_only: true
user: "1000:1000"
security_opt:
- no-new-privileges:true
cap_drop:
- ALL
cap_add:
- NET_BIND_SERVICE
tmpfs:
- /tmp:noexec,nosuid,size=100m
- /app/tmp:noexec,nosuid,size=50m
environment:
- DATABASE_URL_FILE=/run/secrets/database_url
- JWT_PRIVATE_KEY_FILE=/run/secrets/jwt_private_key
- KMS_KEY_ID_FILE=/run/secrets/kms_key_id
- REDIS_URL_FILE=/run/secrets/redis_url
- VAULT_TOKEN_FILE=/run/secrets/vault_token
secrets:
- database_url
- jwt_private_key
- kms_key_id
- redis_url
- vault_token
deploy:
resources:
limits:
cpus: '2.0'
memory: 1G
reservations:
cpus: '0.5'
memory: 256M
replicas: 2
update_config:
parallelism: 1
delay: 10s
failure_action: rollback
restart_policy:
condition: on-failure
delay: 5s
max_attempts: 3
networks:
- app_network
- database_network
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
# Nginx reverse proxy
nginx:
image: nginx:alpine
read_only: true
ports:
- "80:80"
- "443:443"
tmpfs:
- /var/cache/nginx:noexec,nosuid,size=50m
- /var/run:noexec,nosuid,size=10m
- /tmp:noexec,nosuid,size=10m
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./ssl:/etc/nginx/ssl:ro
- ./static:/var/www/static:ro
- ./security.txt:/var/www/security.txt:ro
- ./letsencrypt:/etc/letsencrypt:ro
networks:
- app_network
depends_on:
- app
deploy:
resources:
limits:
cpus: '0.5'
memory: 128M
healthcheck:
test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost/health"]
interval: 30s
timeout: 10s
retries: 3
# Database
database:
image: postgres:15-alpine
user: "999:999" # postgres user
read_only: true
environment:
- POSTGRES_USER_FILE=/run/secrets/db_user
- POSTGRES_PASSWORD_FILE=/run/secrets/db_password
- POSTGRES_DB=surveillance
- PGDATA=/var/lib/postgresql/data/pgdata
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init-scripts:/docker-entrypoint-initdb.d:ro
- /etc/localtime:/etc/localtime:ro
tmpfs:
- /tmp:noexec,nosuid,size=100m
- /var/run/postgresql:noexec,nosuid,size=10m
secrets:
- db_user
- db_password
networks:
- database_network
deploy:
resources:
limits:
cpus: '2.0'
memory: 2G
reservations:
memory: 512M
healthcheck:
test: ["CMD-SHELL", "pg_isready -U $$(cat /run/secrets/db_user) -d surveillance"]
interval: 10s
timeout: 5s
retries: 5
start_period: 30s
# Redis session cache
redis:
image: redis:7-alpine
user: "999:999"
read_only: true
# sh -c is needed so the secret file is read at container start; $$ escapes $ for Compose
command: sh -c 'redis-server --requirepass "$$(cat /run/secrets/redis_password)" --maxmemory 256mb --maxmemory-policy allkeys-lru --appendonly yes'
volumes:
- redis_data:/data
tmpfs:
- /tmp:noexec,nosuid,size=50m
secrets:
- redis_password
networks:
- app_network
- database_network
deploy:
resources:
limits:
memory: 256M
healthcheck:
test: ["CMD-SHELL", "redis-cli -a \"$$(cat /run/secrets/redis_password)\" ping | grep -q PONG"]
interval: 10s
timeout: 5s
retries: 3
# Audit log forwarder
filebeat:
image: docker.elastic.co/beats/filebeat:8.11
user: "0:0"
read_only: true
volumes:
- /var/log/surveillance:/logs:ro
- ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
- filebeat_registry:/usr/share/filebeat/data
tmpfs:
- /tmp:noexec,nosuid,size=50m
networks:
- app_network
deploy:
resources:
limits:
memory: 128M
networks:
app_network:
driver: bridge
internal: false
database_network:
driver: bridge
internal: true # No external access
volumes:
postgres_data:
driver: local
redis_data:
driver: local
filebeat_registry:
driver: local
secrets:
database_url:
external: true
jwt_private_key:
external: true
kms_key_id:
external: true
redis_url:
external: true
vault_token:
external: true
db_user:
external: true
db_password:
external: true
redis_password:
external: true
16.5 Kubernetes Security Policies
# network-policy.yaml — Isolate namespaces
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-all
namespace: surveillance
spec:
podSelector: {}
policyTypes:
- Ingress
- Egress
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: app-allow-ingress
namespace: surveillance
spec:
podSelector:
matchLabels:
app: surveillance-app
policyTypes:
- Ingress
ingress:
- from:
- namespaceSelector:
matchLabels:
name: ingress-nginx
ports:
- protocol: TCP
port: 8080
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: app-allow-egress
namespace: surveillance
spec:
podSelector:
matchLabels:
app: surveillance-app
policyTypes:
- Egress
egress:
- to:
- podSelector:
matchLabels:
app: postgres
ports:
- protocol: TCP
port: 5432
- to:
- podSelector:
matchLabels:
app: redis
ports:
- protocol: TCP
port: 6379
- to:
- namespaceSelector:
matchLabels:
name: vault
ports:
- protocol: TCP
port: 8200
- to: [] # DNS
ports:
- protocol: UDP
port: 53
# pod-security-policy.yaml
# NOTE: PodSecurityPolicy (policy/v1beta1) was removed in Kubernetes 1.25.
# On current clusters, enforce the equivalent controls with Pod Security
# Admission (the "restricted" profile via namespace labels); this manifest
# applies only to clusters still running PSP.
apiVersion: policy/v1beta1
kind: PodSecurityPolicy
metadata:
name: surveillance-restricted
spec:
privileged: false
allowPrivilegeEscalation: false
requiredDropCapabilities:
- ALL
volumes:
- 'configMap'
- 'emptyDir'
- 'projected'
- 'secret'
- 'downwardAPI'
- 'persistentVolumeClaim'
runAsUser:
rule: 'MustRunAsNonRoot'
seLinux:
rule: 'RunAsAny'
fsGroup:
rule: 'RunAsAny'
runAsGroup:
rule: 'RunAsAny'
supplementalGroups:
rule: 'RunAsAny'
readOnlyRootFilesystem: true
16.6 Security Headers Reference
| Header | Value | Purpose |
|---|---|---|
| `Strict-Transport-Security` | `max-age=63072000; includeSubDomains; preload` | Force HTTPS |
| `X-Content-Type-Options` | `nosniff` | Prevent MIME sniffing |
| `X-Frame-Options` | `DENY` | Prevent clickjacking |
| `X-XSS-Protection` | `0` | Disabled (CSP preferred) |
| `Referrer-Policy` | `strict-origin-when-cross-origin` | Limit referrer info |
| `Permissions-Policy` | `camera=(), microphone=(), geolocation=()` | Restrict browser features |
| `Content-Security-Policy` | See full policy above | Prevent XSS, injection |
| `Cache-Control` | `no-store, no-cache, must-revalidate` | Prevent sensitive caching |
| `Clear-Site-Data` | `"cookies", "storage", "cache"` | Clear data on logout |
16.7 Security Tools Inventory
| Category | Tool | Purpose |
|---|---|---|
| SSL Testing | testssl.sh, SSL Labs | TLS configuration validation |
| Vulnerability Scanning | Trivy, Snyk | Container/image scanning |
| Dependency Check | Snyk, OWASP DC | Dependency vulnerability scanning |
| Static Analysis | Bandit (Python), Semgrep | Code security scanning |
| Dynamic Analysis | OWASP ZAP | Runtime vulnerability testing |
| Secret Detection | TruffleHog, GitLeaks | Secret scanning in code |
| Penetration Testing | Burp Suite, Metasploit | Security assessment |
| Compliance | tfsec, Checkov | IaC security scanning |
| Log Analysis | ELK Stack, Loki | Log aggregation and analysis |
| Monitoring | Prometheus + Alertmanager | Security metric alerting |
| Network Scanning | Nmap | Network discovery and audit |
| Fuzzing | OWASP ZAP Fuzzer | Input validation testing |
Document Information
Version History:
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2024-01-15 | Security Architecture Team | Initial comprehensive security architecture |
Approval:
| Role | Name | Date | Signature |
|---|---|---|---|
| CISO | [Name] | | |
| DPO | [Name] | | |
| CTO | [Name] | | |
Review Schedule: Quarterly
Next Review Date: 2024-04-15
This document contains sensitive security configuration information. Distribution is restricted to authorized security and infrastructure personnel. Handle as CONFIDENTIAL — Unauthorized disclosure prohibited.