AI Surveillance Platform — Notification & Alerting System Design
Document Information
| Property | Value |
|---|---|
| Version | 1.0.0 |
| Status | Draft |
| Scope | Multi-channel notification system (Telegram + WhatsApp Business API) |
| Platform | AI Surveillance Platform — 8 cameras, real-time person detection & watchlist matching |
| Target Users | Non-technical admin (configuration), Security personnel (alert recipients) |
Table of Contents
- System Architecture Overview
- Telegram Integration
- WhatsApp Business API Integration
- Alert Routing Rules Engine
- Recipient Groups
- Quiet Hours
- Message Templates
- Retry Logic & Failure Handling
- Rate Limiting & Throttling
- Delivery Tracking
- Escalation Engine
- Media Attachments
- Notification Service API
- Configuration Schema
- Database Schema
- Deployment & Security
- Appendix: Delivery Flow Diagrams
1. System Architecture Overview
1.1 High-Level Architecture
+------------------------------------------------------------------+
| AI SURVEILLANCE PLATFORM |
| |
| +----------------+ +----------------+ +---------------------+ |
| | Person | | Watchlist | | Suspicious Activity | |
| | Detection | | Matching | | Detection | |
| +--------+-------+ +--------+-------+ +----------+----------+ |
| | | | |
+-----------|-------------------|--------------------|---------------+
| | |
v v v
+-----------+-------------------+--------------------+---------------+
| EVENT BUS (Redis Pub/Sub) |
+-----------+-------------------+--------------------+---------------+
| |
v v
+-----------+-------------------+ +-------------------------------+
| NOTIFICATION SERVICE | | MEDIA PROCESSING SERVICE |
| | | |
| +------------------------+ | | +-------------------------+ |
| | Alert Router Engine | | | | Image Resizer | |
| | - Rule Evaluation | | | | - Telegram: up to 10MB | |
| | - Severity Assignment | | | | - WhatsApp: up to 5MB | |
| | - Recipient Resolution | | | | - Thumbnail generation | |
| +------------------------+ | | +-------------------------+ |
| | | |
| +------------------------+ | | +-------------------------+ |
| | Template Engine | | | | Video Clip Processor | |
| | - Placeholder rendering| | | | - Telegram: up to 50MB | |
| | - Multi-lang support | | | | - WhatsApp: up to 16MB | |
| | - Format conversion | | | | - Format transcoding | |
| +------------------------+ | | +-------------------------+ |
| | | |
| +------------------------+ | | +-------------------------+ |
| | Queue Manager | | | | Temporary URL Generator | |
| | - Priority queues | | | | - Signed URLs w/ expiry | |
| | - Rate limiting | | | | - S3/MinIO presigned | |
| | - Deduplication | | | +-------------------------+ |
| +------------------------+ | +-------------------------------+
| |
| +------------------------+ |
| | Retry Manager | |
| | - Exponential backoff | |
| | - Dead letter queue | |
| | - Manual retry API | |
| +------------------------+ |
| |
| +------------------------+ |
| | Escalation Engine | |
| | - Timer-based triggers | |
| | - Level progression | |
| | - Recipient expansion | |
| +------------------------+ |
+-----------+-------+----------+
| |
+--------+ +--------+
| |
v v
+--+----------------+ +-----+------------------+
| TELEGRAM BOT API | | WHATSAPP BUSINESS API |
| | | (Meta Official) |
| - sendMessage | | - Messages endpoint |
| - sendPhoto | | - Template messages |
| - sendVideo | | - Media upload |
| - Inline keyboards | | - Webhook receipts |
+--------------------+ +-----------------------+
1.2 Core Components
| Component | Technology | Purpose |
|---|---|---|
| Event Bus | Redis Pub/Sub | Decouple detection services from notification service |
| Notification Service | Python/FastAPI | Core orchestration, routing, templating |
| Queue Manager | Redis Streams + Celery | Priority queues, rate limiting, task distribution |
| Media Processor | FFmpeg + Pillow | Image resizing, video compression, format conversion |
| Object Storage | MinIO (S3-compatible) | Media storage with presigned URL generation |
| Database | PostgreSQL | Persistent storage for configs, delivery status, audit logs |
| Cache | Redis | Rate limit counters, deduplication keys, session state |
1.3 Alert Lifecycle
DETECTION EVENT
|
v
[Event Normalized] --> {camera_id, event_type, person_id, timestamp, image_url, video_url, severity}
|
v
[Router Engine Evaluates Rules] --> matching_rules[]
|
v
[Recipients Resolved] --> recipient_groups[] + individual_recipients[]
|
v
[Template Selected & Rendered] --> message_text + formatting
|
v
[Media Processed] --> resized_image + compressed_video (if needed)
|
v
[Rate Limit Check] --> pass / throttle / deduplicate
|
v
[Queued per Channel] --> Telegram queue / WhatsApp queue
|
v
[Channel Adapter Sends] --> HTTP request to provider API
|
v
[Delivery Status Tracked] --> pending -> sent -> delivered/failed
|
v
[If failed] --> Retry Manager (exponential backoff, max 5)
|
v
[If unacknowledged] --> Escalation Engine (after threshold)
|
v
[Audit Log Written] --> persistent record with full trace
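The delivery states named in the lifecycle can be guarded with a small transition table. This is a minimal sketch under stated assumptions: the `ALLOWED` map and the `advance` helper are illustrative names, and treating a retry as `failed -> pending` is an assumption rather than something the diagram specifies.

```python
# Allowed delivery-status transitions (illustrative, not the platform's actual table)
ALLOWED = {
    "pending": {"sent", "failed"},
    "sent": {"delivered", "failed"},
    "failed": {"pending"},   # assumption: a retry re-queues the message
    "delivered": set(),      # terminal in this sketch
}


def advance(current: str, new: str) -> str:
    """Return the new status, or raise if the transition is not allowed."""
    if new not in ALLOWED.get(current, set()):
        raise ValueError(f"illegal transition {current} -> {new}")
    return new
```

Rejecting illegal transitions at write time keeps late or out-of-order provider callbacks from moving a delivered message back to an earlier state.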
2. Telegram Integration
2.1 Bot Configuration
Encrypted Token Storage
# config/telegram.yml (stored encrypted at rest)
telegram:
enabled: true
bot_token: "${ENCRYPTED:ENC[PKCS7,MIIBkAYJKoZIhvcNAQcDoIIB...]}" # Decrypted at runtime
bot_username: "SurveillanceAlertBot"
webhook_url: "https://api.surveillance.local/webhooks/telegram" # Must resolve to a publicly reachable HTTPS endpoint, or Telegram cannot deliver updates
# Token encryption: AES-256-GCM with KMS-managed key
# Rotation: automatic every 90 days
Runtime Decryption Flow
# Sketch: token decryption at service startup (vault, KMS, and cache clients are injected)
class SecureConfig:
    def get_telegram_token(self) -> str:
        # Serve from the in-memory cache while the entry is fresh
        # (assumes the cache returns None on a miss or after expiry)
        cached = self._cache.get("telegram_token")
        if cached is not None:
            return cached
        encrypted = self._load_from_vault("telegram/bot_token")
        # Decrypt using KMS (HashiCorp Vault / AWS KMS / Azure Key Vault)
        plaintext = self._kms.decrypt(
            ciphertext=encrypted,
            key_id="notification-service-key",
            context={"service": "notification", "environment": "prod"},
        )
        # Cache in memory with TTL (5 minutes), never persist to disk
        self._cache.set("telegram_token", plaintext, ttl=300)
        return plaintext
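The 5-minute in-memory TTL used for the decrypted token can be sketched as a small cache class. This is a hedged illustration: `TTLCache` and its injectable `clock` parameter are hypothetical names, not part of the platform, and a production service might prefer an existing caching library.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class TTLCache:
    """In-memory cache whose entries expire `ttl` seconds after being set."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._items: Dict[str, Tuple[str, float]] = {}

    def set(self, key: str, value: str, ttl: float) -> None:
        self._items[key] = (value, self._clock() + ttl)

    def get(self, key: str) -> Optional[str]:
        entry = self._items.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._items[key]  # drop the stale secret from memory
            return None
        return value
```

Using a monotonic clock avoids early or late expiry when the wall clock is adjusted.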
2.2 Chat ID / Group Configuration
# Admin-configured via UI, stored in database
channels:
telegram:
chats:
- id: "-1001234567890" # Group chat ID (negative for groups)
name: "Security Operations"
type: "group"
description: "Main security team group"
allowed_severities: ["low", "medium", "high", "critical"]
quiet_hours: null # No quiet hours (security-critical)
- id: "123456789" # Private chat ID (positive)
name: "Security Manager"
type: "private"
description: "Direct alerts for security manager"
allowed_severities: ["high", "critical"]
quiet_hours: null
- id: "-1009876543210"
name: "Management Alerts"
type: "group"
description: "Management-only critical alerts"
allowed_severities: ["critical"]
quiet_hours: null
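How `allowed_severities` gates delivery can be shown with a short filter. This is a sketch only: `chats_for_severity` is a hypothetical helper, and the dictionaries mirror the YAML above rather than the actual database rows.

```python
from typing import Dict, List


def chats_for_severity(chats: List[Dict], severity: str) -> List[str]:
    """Return the IDs of chats whose allowed_severities include `severity`."""
    return [c["id"] for c in chats if severity in c.get("allowed_severities", [])]


# Same three chats as in the configuration above
chats = [
    {"id": "-1001234567890", "allowed_severities": ["low", "medium", "high", "critical"]},
    {"id": "123456789", "allowed_severities": ["high", "critical"]},
    {"id": "-1009876543210", "allowed_severities": ["critical"]},
]

high_targets = chats_for_severity(chats, "high")
```

With this configuration a `high` alert reaches the operations group and the security manager, while the management group only sees `critical` alerts.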
2.3 Chat Discovery & Verification
# Bot command handlers for chat registration
class TelegramBotHandler:
async def cmd_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""User starts bot conversation."""
chat_id = update.effective_chat.id
chat_type = update.effective_chat.type # private/group/supergroup
welcome_msg = (
f"👋 Welcome to Surveillance Alert Bot!\n\n"
f"Chat ID: <code>{chat_id}</code>\n"
f"Type: {chat_type}\n\n"
f"Provide this Chat ID to your administrator to receive alerts.\n"
f"Use /help for available commands."
)
await update.message.reply_html(welcome_msg)
async def cmd_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Display help information."""
help_text = (
"📋 <b>Available Commands</b>\n\n"
"/start — Show welcome and Chat ID\n"
"/help — Show this help message\n"
"/status — Check bot connectivity\n"
"/acknowledge {alert_id} — Acknowledge an alert\n"
"/mute {minutes} — Temporarily mute non-critical alerts\n"
"/unmute — Resume all alerts\n"
"/settings — View your notification settings"
)
await update.message.reply_html(help_text)
async def cmd_acknowledge(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Acknowledge an alert by ID."""
alert_id = context.args[0] if context.args else None
if not alert_id:
await update.message.reply_text("❌ Usage: /acknowledge <alert_id>")
return
result = await self.escalation_service.acknowledge_alert(
alert_id=alert_id,
acknowledged_by=f"telegram:{update.effective_user.id}",
channel="telegram"
)
if result.success:
await update.message.reply_text(f"✅ Alert {alert_id} acknowledged.")
else:
await update.message.reply_text(f"❌ {result.error_message}")
2.4 Sending Messages — API Methods
Core Send Methods
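class TelegramChannelAdapter:
    BASE_URL = "https://api.telegram.org/bot{token}"

    def __init__(self, bot_token: str):
        self.token = bot_token
        # Substitute the token once; methods then build URLs as f"{self.BASE_URL}/<method>"
        self.BASE_URL = self.BASE_URL.format(token=bot_token)
        # No default Content-Type header: aiohttp sets it per request, and a
        # fixed "application/json" would break the multipart uploads below
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30)
        )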
# --- Text Messages ---
async def send_text_message(
self,
chat_id: str,
text: str,
parse_mode: str = "HTML", # HTML or MarkdownV2
disable_notification: bool = False,
reply_markup: Optional[dict] = None # Inline keyboard
) -> TelegramSendResult:
"""Send plain text message with optional inline keyboard."""
url = f"{self.BASE_URL}/sendMessage"
payload = {
"chat_id": chat_id,
"text": text,
"parse_mode": parse_mode,
"disable_notification": disable_notification,
"reply_markup": reply_markup,
"link_preview_options": {"is_disabled": True}
}
response = await self.session.post(url, json=payload)
data = await response.json()
return TelegramSendResult(
message_id=data["result"]["message_id"] if data.get("ok") else None,
status="sent" if data.get("ok") else "failed",
error=data.get("description") if not data.get("ok") else None,
timestamp_utc=datetime.utcnow()
)
# --- Photo Messages ---
async def send_photo(
self,
chat_id: str,
photo: Union[str, bytes], # URL, file_id, or binary
caption: str = "",
parse_mode: str = "HTML",
reply_markup: Optional[dict] = None
) -> TelegramSendResult:
"""Send photo with caption. Supports URL, file_id, or multipart upload."""
url = f"{self.BASE_URL}/sendPhoto"
if isinstance(photo, str) and photo.startswith("http"):
# Photo via URL
payload = {
"chat_id": chat_id,
"photo": photo,
"caption": caption,
"parse_mode": parse_mode,
"reply_markup": reply_markup
}
response = await self.session.post(url, json=payload)
else:
# Photo via multipart upload
form = aiohttp.FormData()
form.add_field("chat_id", chat_id)
form.add_field("photo", photo, filename="image.jpg")
form.add_field("caption", caption)
form.add_field("parse_mode", parse_mode)
if reply_markup:
form.add_field("reply_markup", json.dumps(reply_markup))
response = await self.session.post(url, data=form)
data = await response.json()
return self._parse_result(data)
# --- Video Messages ---
async def send_video(
self,
chat_id: str,
video: Union[str, bytes],
caption: str = "",
parse_mode: str = "HTML",
width: Optional[int] = None,
height: Optional[int] = None,
duration: Optional[int] = None,
thumbnail: Optional[str] = None,
reply_markup: Optional[dict] = None
) -> TelegramSendResult:
"""Send video clip. Max 50MB for bots."""
url = f"{self.BASE_URL}/sendVideo"
payload = {
"chat_id": chat_id,
"video": video,
"caption": caption,
"parse_mode": parse_mode,
"supports_streaming": True,
"reply_markup": reply_markup
}
if width: payload["width"] = width
if height: payload["height"] = height
if duration: payload["duration"] = duration
if thumbnail: payload["thumbnail"] = thumbnail
response = await self.session.post(url, json=payload)
data = await response.json()
return self._parse_result(data)
# --- Media Group (multiple photos) ---
async def send_media_group(
self,
chat_id: str,
media: List[MediaGroupItem], # List of photo/video items
caption: str = ""
) -> TelegramSendResult:
"""Send up to 10 photos/videos as an album."""
url = f"{self.BASE_URL}/sendMediaGroup"
items = media[:10]  # Telegram limit: 10 items per album
media_json = []
for idx, item in enumerate(items):
    media_item = {
        "type": item.type,  # "photo" or "video"
        "media": item.url
    }
    # Compare against the truncated list so the caption is kept when more than 10 items are passed
    if idx == len(items) - 1:  # Caption on last item
        media_item["caption"] = caption
        media_item["parse_mode"] = "HTML"
    media_json.append(media_item)
payload = {"chat_id": chat_id, "media": media_json}
response = await self.session.post(url, json=payload)
data = await response.json()
return self._parse_result(data)
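When an alert carries more than 10 media items, the list has to be split before calling `send_media_group`. A minimal sketch, assuming a hypothetical `chunk_media` helper; note that `sendMediaGroup` expects 2 to 10 items, so a trailing chunk of one item would need `send_photo` or `send_video` instead.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def chunk_media(items: Sequence[T], size: int = 10) -> List[List[T]]:
    """Split items into consecutive chunks of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


# 23 items become albums of 10, 10, and 3
albums = chunk_media(list(range(23)))
```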
2.5 Message Formatting (HTML Mode)
# HTML tag support in Telegram Bot API
TELEGRAM_HTML_TAGS = {
"allowed": [
"<b>", "<strong>", # Bold
"<i>", "<em>", # Italic
"<u>", # Underline
"<s>", "<strike>", "<del>", # Strikethrough
"<code>", # Inline code
"<pre>", # Code block
"<a href='...'>", # Hyperlinks
"<tg-spoiler>", # Spoiler
"<blockquote>", # Blockquote
],
"escape_required": ["<", ">", "&", '"'],
}
def escape_telegram_html(text: str) -> str:
    """Escape special characters for Telegram HTML parse mode."""
    # "&" must be replaced first so the entities produced below are not escaped again
    return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
def build_alert_message(template: str, placeholders: dict) -> str:
"""Render template with escaped placeholders for Telegram HTML."""
# Escape all placeholder values first
safe_values = {k: escape_telegram_html(str(v)) for k, v in placeholders.items()}
# Render template (template contains pre-approved HTML tags)
return template.format(**safe_values)
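The escape-then-render order matters: values are escaped, the template's own tags are not. The following self-contained sketch shows the effect using the standard library's `html.escape` with `quote=False`, which escapes `&`, `<`, and `>`. The `render_alert` name and the sample template are illustrative assumptions.

```python
import html


def render_alert(template: str, placeholders: dict) -> str:
    """Escape every placeholder value, then substitute into the trusted template."""
    safe = {k: html.escape(str(v), quote=False) for k, v in placeholders.items()}
    return template.format(**safe)


message = render_alert(
    "<b>{person_name}</b> detected at {camera_name}",
    {"person_name": "Tom & <Jerry>", "camera_name": "Gate 1"},
)
```

The `<b>` tag from the template survives, while the angle brackets and ampersand inside the person name are turned into entities, so a malicious or malformed name cannot inject markup.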
2.6 Inline Keyboard (Quick Actions)
def build_alert_keyboard(alert_id: str, details_url: str) -> dict:
"""Build inline keyboard for alert messages."""
return {
"inline_keyboard": [
[
{
"text": "✅ Acknowledge",
"callback_data": json.dumps({"action": "ack", "id": alert_id})
},
{
"text": "📹 View Live Feed",
"url": details_url # Opens camera live view in browser
}
],
[
{
"text": "📋 View Details",
"callback_data": json.dumps({"action": "details", "id": alert_id})
},
{
"text": "🔍 View on Map",
"callback_data": json.dumps({"action": "map", "id": alert_id})
}
],
[
{
"text": "🚨 Escalate",
"callback_data": json.dumps({"action": "escalate", "id": alert_id})
}
]
]
}
# Callback query handler
async def handle_callback_query(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle button clicks on inline keyboards."""
    query = update.callback_query
    data = json.loads(query.data)
    action = data["action"]
    alert_id = data["id"]
    handlers = {
        "ack": self._handle_acknowledge,
        "details": self._handle_view_details,
        "map": self._handle_view_map,
        "escalate": self._handle_escalate,
    }
    handler = handlers.get(action)
    if handler is None:
        await query.answer()  # Still answer, to stop the loading animation
        return
    result = await handler(alert_id, query.from_user.id)
    # Answer the query exactly once: a second answer for the same query is
    # rejected, so the optional popup text goes into this single call
    if result.notification_text:
        await query.answer(result.notification_text, show_alert=True)
    else:
        await query.answer()
    await query.edit_message_reply_markup(
        reply_markup=result.updated_keyboard if result.success else None
    )
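Telegram limits `callback_data` to 64 bytes, and a JSON object with a 36-character UUID as the alert ID exceeds that. One option is a compact `action:id` encoding. This is a sketch under assumptions: `encode_callback` and `decode_callback` are hypothetical helpers, and the UUID-style alert ID is an example, since the document does not state the ID format.

```python
import json
from typing import Tuple

MAX_CALLBACK_BYTES = 64  # Bot API limit for callback_data


def encode_callback(action: str, alert_id: str) -> str:
    """Encode as 'action:alert_id' and enforce the 64-byte limit."""
    data = f"{action}:{alert_id}"
    if len(data.encode("utf-8")) > MAX_CALLBACK_BYTES:
        raise ValueError(f"callback_data exceeds {MAX_CALLBACK_BYTES} bytes: {data!r}")
    return data


def decode_callback(data: str) -> Tuple[str, str]:
    """Split on the first ':' so IDs that contain ':' are preserved."""
    action, _, alert_id = data.partition(":")
    return action, alert_id


alert_id = "123e4567-e89b-12d3-a456-426614174000"  # example UUID-style ID
json_form = json.dumps({"action": "escalate", "id": alert_id})
compact = encode_callback("escalate", alert_id)
```

If alert IDs can be longer than the limit allows, storing a short lookup key server-side and sending only that key is a common alternative.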
2.7 Telegram Delivery Confirmation
async def telegram_webhook_handler(self, request: Request):
    """Handle Telegram webhook updates.

    The Bot API provides no delivery or read receipts. A successful send
    response (ok: true plus a message_id) is the only delivery signal, so
    the channel adapter records "sent" at send time. "message" updates are
    messages written by users, not confirmations of the bot's own messages,
    and must not be used to mark an alert as delivered.
    """
    update = await request.json()
    if "callback_query" in update:
        # User interacted with the message, which is treated as "engaged"
        callback = update["callback_query"]
        message = callback["message"]
        await self.delivery_tracker.update_status(
            channel_message_id=f"tg:{message['chat']['id']}:{message['message_id']}",
            status="engaged",
            engaged_at=datetime.utcnow()
        )
    return {"ok": True}
2.8 Telegram API Limits
| Limit | Value | Handling |
|---|---|---|
| Message length | 4096 chars | Truncate with ellipsis + "View details" link |
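| Photo size | 10MB via multipart upload, 5MB when passed by URL | Resize if needed |
| Video size | 50MB via multipart upload, 20MB when passed by URL | Compress if exceeds |
| Media group | 10 items max | Split into multiple messages |
| Rate limit | ~30 messages/sec per bot, about 1 message/sec per chat, 20 messages/min per group | Queue + throttle |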
| Caption length | 1024 chars | Truncate if needed |
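The "truncate with ellipsis + View details link" handling from the table could look like the sketch below. `truncate_with_link` is a hypothetical helper. It counts Python characters, which only approximates how Telegram measures length, and it should run on plain text before HTML tags are added, because cutting inside a tag would break the markup.

```python
TELEGRAM_TEXT_LIMIT = 4096
TELEGRAM_CAPTION_LIMIT = 1024


def truncate_with_link(text: str, limit: int, details_url: str) -> str:
    """Shorten text to `limit` characters, ending with an ellipsis and a details link."""
    if len(text) <= limit:
        return text
    suffix = f"… View details: {details_url}"
    if len(suffix) >= limit:
        raise ValueError("details link does not fit within the limit")
    # Reserve room for the suffix so the total length equals the limit
    return text[: limit - len(suffix)] + suffix
```

The same helper serves both limits: pass `TELEGRAM_TEXT_LIMIT` for `sendMessage` bodies and `TELEGRAM_CAPTION_LIMIT` for photo and video captions.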
3. WhatsApp Business API Integration
3.1 Architecture — Official Meta API
This integration uses Meta's official WhatsApp Business Platform Cloud API, not an unofficial client or gateway.
+----------------------------+
| Your Notification Service |
| (FastAPI) |
+------------+---------------+
|
| HTTPS POST (Graph API v18.0+)
v
+----------------------------+
| Meta Graph API |
| /v18.0/{phone_number_id} |
| /messages |
+------------+---------------+
|
| WhatsApp protocol
v
+----------------------------+
| WhatsApp Business API |
| (Meta-managed) |
+------------+---------------+
|
| Encrypted delivery
v
+----------------------------+
| Recipient WhatsApp App |
+----------------------------+
3.2 Configuration
# config/whatsapp.yml (encrypted at rest)
whatsapp:
enabled: true
api_version: "v18.0"
# Business Account credentials
access_token: "${ENCRYPTED:ENC[PKCS7,...]}" # System User access token
phone_number_id: "123456789012345" # WhatsApp Business Phone Number ID
business_account_id: "987654321098765" # WhatsApp Business Account ID
# Webhook configuration (for delivery receipts)
webhook:
verify_token: "${ENCRYPTED:...}" # Token for webhook verification
endpoint: "/webhooks/whatsapp"
# Rate limiting (enforced by Meta + our throttling)
rate_limit:
messages_per_second: 80 # Business tier limit
burst_capacity: 200
# Template namespace (auto-assigned by Meta)
template_namespace: "your_business_namespace"
3.3 Authentication
class WhatsAppAuth:
"""Handle WhatsApp Business API authentication."""
TOKEN_REFRESH_BUFFER = 300 # Refresh 5 min before expiry
def __init__(self, kms_client, config_store):
self.kms = kms_client
self.config = config_store
self._access_token: Optional[str] = None
self._token_expiry: Optional[datetime] = None
async def get_access_token(self) -> str:
"""Get valid access token, refresh if needed."""
if self._access_token and self._token_expiry:
if datetime.utcnow() < self._token_expiry - timedelta(seconds=self.TOKEN_REFRESH_BUFFER):
return self._access_token
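# Fetch and decrypt stored token
encrypted_token = await self.config.get("whatsapp.access_token")
decrypted = self.kms.decrypt(encrypted_token)
# Validate with Meta. debug_token reports "expires_at" as Unix seconds,
# with 0 meaning the token never expires (typical for System User tokens).
token_info = await self._introspect_token(decrypted)
expires_at = token_info.get("expires_at", 0)
self._access_token = decrypted
self._token_expiry = (
    datetime.utcfromtimestamp(expires_at)
    if expires_at
    else datetime.utcnow() + timedelta(days=1)  # re-validate non-expiring tokens daily
)
return self._access_token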
async def _introspect_token(self, token: str) -> dict:
"""Validate token with Meta's debug endpoint."""
url = "https://graph.facebook.com/debug_token"
params = {
"input_token": token,
"access_token": token # Use same token for validation
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as resp:
data = await resp.json()
return data["data"]
3.4 Message Sending — API Implementation
Core Message Sender
class WhatsAppChannelAdapter:
BASE_URL = "https://graph.facebook.com"
def __init__(self, auth: WhatsAppAuth, phone_number_id: str, api_version: str = "v18.0"):
self.auth = auth
self.phone_number_id = phone_number_id
self.api_version = api_version
self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30))
def _build_url(self, endpoint: str) -> str:
return f"{self.BASE_URL}/{self.api_version}/{self.phone_number_id}/{endpoint}"
# --- Template Messages (Business-Initiated) ---
async def send_template_message(
self,
to: str, # E.164 format: +1234567890
template_name: str,
language_code: str = "en",
components: Optional[List[dict]] = None
) -> WhatsAppSendResult:
"""
Send approved template message.
Required for business-initiated conversations outside 24h window.
"""
url = self._build_url("messages")
token = await self.auth.get_access_token()
payload = {
"messaging_product": "whatsapp",
"recipient_type": "individual",
"to": self._normalize_phone(to),
"type": "template",
"template": {
"name": template_name,
"language": {"code": language_code}
}
}
if components:
payload["template"]["components"] = components
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
response = await self.session.post(url, json=payload, headers=headers)
data = await response.json()
return WhatsAppSendResult(
message_id=data.get("messages", [{}])[0].get("id"),
status="sent" if response.status == 200 else "failed",
error=data.get("error", {}).get("message") if response.status != 200 else None,
response_code=response.status,
timestamp_utc=datetime.utcnow()
)
# --- Text Messages (Within 24h Session) ---
async def send_text_message(
self,
to: str,
text: str,
preview_url: bool = False
) -> WhatsAppSendResult:
"""
Send free-form text message.
Only works within 24h of last user message (session message).
"""
url = self._build_url("messages")
token = await self.auth.get_access_token()
# Truncate to WhatsApp limit
safe_text = text[:4096]  # Slicing leaves shorter strings unchanged
payload = {
"messaging_product": "whatsapp",
"recipient_type": "individual",
"to": self._normalize_phone(to),
"type": "text",
"text": {
"body": safe_text,
"preview_url": preview_url
}
}
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
response = await self.session.post(url, json=payload, headers=headers)
data = await response.json()
return self._parse_result(response, data)
# --- Image Messages ---
async def send_image(
self,
to: str,
image_url: str, # Public HTTPS URL
caption: str = ""
) -> WhatsAppSendResult:
"""Send image message. Image must be publicly accessible via HTTPS."""
url = self._build_url("messages")
token = await self.auth.get_access_token()
payload = {
    "messaging_product": "whatsapp",
    "recipient_type": "individual",
    "to": self._normalize_phone(to),
    "type": "image",
    "image": {
        "link": image_url,  # Public URL (not file upload)
        "caption": caption[:1024]  # Media captions are limited to 1024 characters
    }
}
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
response = await self.session.post(url, json=payload, headers=headers)
data = await response.json()
return self._parse_result(response, data)
# --- Video Messages ---
async def send_video(
self,
to: str,
video_url: str, # Public HTTPS URL
caption: str = ""
) -> WhatsAppSendResult:
"""Send video message. Max 16MB."""
url = self._build_url("messages")
token = await self.auth.get_access_token()
payload = {
    "messaging_product": "whatsapp",
    "recipient_type": "individual",
    "to": self._normalize_phone(to),
    "type": "video",
    "video": {
        "link": video_url,
        "caption": caption[:1024]  # Media captions are limited to 1024 characters
    }
}
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
response = await self.session.post(url, json=payload, headers=headers)
data = await response.json()
return self._parse_result(response, data)
# --- Media Upload (for non-URL media) ---
async def upload_media(self, media_path: str, mime_type: str) -> str:
"""
Upload media to WhatsApp servers.
Returns media_id that can be used in message sends.
"""
upload_url = f"{self.BASE_URL}/{self.api_version}/{self.phone_number_id}/media"
token = await self.auth.get_access_token()
headers = {"Authorization": f"Bearer {token}"}
# Keep the file open only for the duration of the upload
with open(media_path, "rb") as fh:
    data = aiohttp.FormData()
    data.add_field("messaging_product", "whatsapp")
    data.add_field("type", mime_type)
    data.add_field("file", fh, filename=os.path.basename(media_path), content_type=mime_type)
    response = await self.session.post(upload_url, data=data, headers=headers)
    result = await response.json()
return result["id"]  # Media ID for reuse
@staticmethod
def _normalize_phone(phone: str) -> str:
"""Normalize to E.164 format."""
cleaned = re.sub(r"[^\d+]", "", phone)
if not cleaned.startswith("+"):
raise ValueError(f"Phone number must include country code: {phone}")
return cleaned
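A stricter variant of the normalization step validates the full E.164 shape (a leading `+`, a non-zero first digit, at most 15 digits) after stripping common separators. This is a sketch, with `normalize_e164` as a hypothetical name, and it does not check that the country code actually exists.

```python
import re

# "+" followed by 2 to 15 digits, the first of which is non-zero
E164_PATTERN = re.compile(r"\+[1-9]\d{1,14}")


def normalize_e164(phone: str) -> str:
    """Strip spaces, dashes, dots, and parentheses, then validate as E.164."""
    cleaned = re.sub(r"[\s\-().]", "", phone)
    if not E164_PATTERN.fullmatch(cleaned):
        raise ValueError(f"Not a valid E.164 number: {phone!r}")
    return cleaned
```

Unlike a plain character filter, this rejects inputs such as `+12+34`, where a stray `+` appears in the middle of the number.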
3.5 Message Templates (Meta Approval Required)
Template Registration & Approval Workflow
class WhatsAppTemplateManager:
"""Manage WhatsApp message templates with Meta approval workflow."""
async def create_template(self, template: TemplateDefinition) -> TemplateResult:
"""
Submit new template for Meta approval.
Templates must be approved before use in business-initiated messages.
"""
url = f"{BASE_URL}/{api_version}/{business_account_id}/message_templates"
payload = {
    "name": template.name,  # lowercase_underscore format
    "category": template.category,  # "UTILITY", "MARKETING", or "AUTHENTICATION"
    "language": template.language,
    "components": self._build_components(template)
}
response = await self.session.post(url, json=payload, headers=headers)
result = await response.json()
# Template goes through approval: PENDING -> APPROVED/REJECTED
return TemplateResult(
template_id=result["id"],
name=template.name,
status="PENDING",  # Initial state; same value set as returned by get_template_status
estimated_approval_time="up to 24 hours"
)
async def get_template_status(self, template_name: str) -> str:
"""Check template approval status."""
# Statuses: PENDING, APPROVED, REJECTED, PAUSED, DISABLED
template = await self._fetch_template(template_name)
return template["status"]
async def list_approved_templates(self) -> List[dict]:
"""List all approved templates for the UI."""
url = f"{BASE_URL}/{api_version}/{business_account_id}/message_templates"
params = {"status": "APPROVED"}
response = await self.session.get(url, params=params, headers=headers)
return (await response.json()).get("data", [])
Required Templates (Pre-Submitted for Approval)
# Templates must be created and approved via Meta Business Manager
# or via API before sending business-initiated messages
whatsapp_templates:
# --- Alert: Person Detected (Known) ---
- name: "person_detected_known"
category: "UTILITY"
language: "en"
components:
header:
type: "TEXT"
text: "🔍 Person Detected"
body:
type: "BODY"
text: "{{1}} ({{2}}) detected at camera {{3}} on {{4}} at {{5}}.\n\nSeverity: {{6}}"
# {{1}} = person_name, {{2}} = role, {{3}} = camera_name,
# {{4}} = date, {{5}} = time, {{6}} = severity
footer:
type: "FOOTER"
text: "AI Surveillance Platform"
buttons:
- type: "QUICK_REPLY"
text: "Acknowledge"
- type: "URL"
text: "View Details"
url: "https://portal.example.com/alerts/{{1}}" # Button URL variables are numbered per component, so alert_id is {{1}} here
# --- Alert: Person Detected (Unknown) ---
- name: "person_detected_unknown"
category: "UTILITY"
language: "en"
components:
header:
type: "TEXT"
text: "❓ Unknown Person Detected"
body:
type: "BODY"
text: "An unknown person was detected at camera {{1}} on {{2}} at {{3}}.\n\nConfidence: {{4}}%"
footer:
type: "FOOTER"
text: "AI Surveillance Platform"
# --- Alert: Watchlist Match ---
- name: "watchlist_alert"
category: "UTILITY"
language: "en"
components:
header:
type: "TEXT"
text: "⚠️ WATCHLIST ALERT"
body:
type: "BODY"
text: "WATCHLIST: {{1}} ({{2}}) detected at camera {{3}} on {{4}} at {{5}}.\n\nThis person is on the {{6}} watchlist."
# {{1}} = name, {{2}} = watchlist_type, {{3}} = camera,
# {{4}} = date, {{5}} = time, {{6}} = watchlist_name
footer:
type: "FOOTER"
text: "AI Surveillance Platform"
buttons:
- type: "QUICK_REPLY"
text: "Acknowledge"
- type: "QUICK_REPLY"
text: "Escalate"
# --- Alert: Blacklist Match ---
- name: "blacklist_alert"
category: "UTILITY"
language: "en"
components:
header:
type: "TEXT"
text: "🚨 BLACKLIST ALERT"
body:
type: "BODY"
text: "BLACKLIST: {{1}} was detected at camera {{2}} on {{3}} at {{4}}.\n\nThis is a HIGH PRIORITY security alert."
footer:
type: "FOOTER"
text: "AI Surveillance Platform"
buttons:
- type: "QUICK_REPLY"
text: "Acknowledge"
- type: "QUICK_REPLY"
text: "Dispatch Security"
# --- Alert: Suspicious Activity ---
- name: "suspicious_activity"
category: "UTILITY"
language: "en"
components:
header:
type: "TEXT"
text: "🛑 Suspicious Activity"
body:
type: "BODY"
text: "Suspicious activity detected: {{1}} at camera {{2}} on {{3}} at {{4}}.\n\nConfidence: {{5}}%"
footer:
type: "FOOTER"
text: "AI Surveillance Platform"
# --- Alert: System ---
- name: "system_alert"
category: "UTILITY"
language: "en"
components:
header:
type: "TEXT"
text: "⚙️ System Alert"
body:
type: "BODY"
text: "{{1}}\n\nTime: {{2}}\nSeverity: {{3}}"
footer:
type: "FOOTER"
text: "AI Surveillance Platform"
# --- Alert: Escalation ---
- name: "escalation_notice"
category: "UTILITY"
language: "en"
components:
header:
type: "TEXT"
text: "⬆️ Alert Escalated"
body:
type: "BODY"
text: "Alert #{{1}} has been escalated to Level {{2}}.\n\nOriginal alert: {{3}}\n\nThis alert has not been acknowledged within {{4}} minutes."
footer:
type: "FOOTER"
text: "AI Surveillance Platform"
Template Parameter Mapping
class TemplateParameterMapper:
"""Map alert data to WhatsApp template parameters."""
MAPPINGS = {
"person_detected_known": {
1: "person_name",
2: "person_role",
3: "camera_name",
4: "date",
5: "time",
6: "severity",
7: "alert_id"  # Sent as the URL button's own parameter, not as a seventh body variable
},
"person_detected_unknown": {
1: "camera_name",
2: "date",
3: "time",
4: "confidence"
},
"watchlist_alert": {
1: "person_name",
2: "watchlist_type",
3: "camera_name",
4: "date",
5: "time",
6: "watchlist_name"
},
"blacklist_alert": {
1: "person_name",
2: "camera_name",
3: "date",
4: "time"
},
"suspicious_activity": {
1: "activity_type",
2: "camera_name",
3: "date",
4: "time",
5: "confidence"
},
"system_alert": {
1: "message",
2: "timestamp",
3: "severity"
},
"escalation_notice": {
1: "alert_id",
2: "escalation_level",
3: "alert_summary",
4: "threshold_minutes"
}
}
def map_parameters(self, template_name: str, alert_data: dict) -> List[dict]:
"""Generate ordered parameter list for template."""
mapping = self.MAPPINGS.get(template_name, {})
parameters = []
for param_index in sorted(mapping.keys()):
field_name = mapping[param_index]
value = self._resolve_field(alert_data, field_name)
parameters.append({"type": "text", "text": str(value)})
return parameters
def _resolve_field(self, data: dict, field: str) -> str:
"""Resolve nested field from alert data."""
if field in data:
return data[field]
# Handle special fields
resolvers = {
"date": lambda d: d["timestamp"].strftime("%Y-%m-%d"),
"time": lambda d: d["timestamp"].strftime("%H:%M:%S"),
}
resolver = resolvers.get(field)
return resolver(data) if resolver else "N/A"
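When a template is sent, the Cloud API takes parameters per component: body variables go in a `body` component, and each button gets its own `button` component with a `sub_type` and a positional `index`. The sketch below shows that split. `build_components` is a hypothetical helper, and it assumes quick-reply buttons precede the URL button in the template, as in `person_detected_known`.

```python
from typing import Dict, List, Optional, Sequence


def build_components(
    body_values: Sequence[object],
    quick_reply_payloads: Sequence[str] = (),
    url_suffix: Optional[str] = None,
) -> List[Dict]:
    """Build the `components` list for a template message send request."""
    components: List[Dict] = [{
        "type": "body",
        "parameters": [{"type": "text", "text": str(v)} for v in body_values],
    }]
    index = 0  # button position within the template, starting at 0
    for payload in quick_reply_payloads:
        components.append({
            "type": "button",
            "sub_type": "quick_reply",
            "index": str(index),
            "parameters": [{"type": "payload", "payload": payload}],
        })
        index += 1
    if url_suffix is not None:
        components.append({
            "type": "button",
            "sub_type": "url",
            "index": str(index),
            "parameters": [{"type": "text", "text": url_suffix}],
        })
    return components


components = build_components(
    ["Jane Doe", "Staff", "Gate 1", "2024-01-01", "12:00:00", "high"],
    quick_reply_payloads=["ack_A-1"],
    url_suffix="A-1",
)
```

Setting the quick-reply payload to a value such as `ack_<alert_id>` is what lets the webhook handler recover the alert ID when the recipient taps the button.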
3.6 WhatsApp Delivery Receipts (Webhooks)
class WhatsAppWebhookHandler:
"""Handle WhatsApp webhook events for delivery tracking."""
STATUS_MAPPING = {
"sent": "sent", # Message sent to WhatsApp servers
"delivered": "delivered", # Message delivered to recipient's device
"read": "read", # Recipient opened the message
"failed": "failed", # Message failed to deliver
}
async def handle_webhook(self, request: Request):
"""Process incoming webhook from Meta."""
body = await request.json()
for entry in body.get("entry", []):
for change in entry.get("changes", []):
value = change.get("value", {})
# Handle message status updates
if "statuses" in value:
for status in value["statuses"]:
await self._process_status_update(status)
# Handle incoming messages (for session tracking)
if "messages" in value:
for message in value["messages"]:
await self._process_incoming_message(message)
return {"success": True}
async def _process_status_update(self, status: dict):
"""Update delivery status based on webhook."""
message_id = status["id"] # WhatsApp message ID
recipient = status["recipient_id"] # Phone number
status_type = status["status"] # sent/delivered/read/failed
# Map to internal status
internal_status = self.STATUS_MAPPING.get(status_type, "unknown")
# Update delivery tracking
await self.delivery_tracker.update_by_provider_id(
provider_message_id=message_id,
channel="whatsapp",
status=internal_status,
timestamp=datetime.utcfromtimestamp(int(status["timestamp"])),  # Event time reported by Meta (Unix seconds)
error=status.get("errors", [{}])[0].get("title") if status_type == "failed" else None
)
# If delivered, cancel escalation timer
if internal_status in ("delivered", "read"):
await self.escalation_service.cancel_pending_escalation(message_id)
async def _process_incoming_message(self, message: dict):
"""Handle user replies — marks active session for 24h window."""
phone_number = message["from"]
# Track active session (allows free-form messages for 24h)
await self.session_tracker.update_session(
phone_number=phone_number,
last_user_message=datetime.utcnow()
)
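# Handle quick reply responses. Template quick-reply buttons arrive as
# type "button" with the payload set at send time; interactive reply
# buttons (session messages) arrive as type "interactive".
if message.get("type") == "button":
    await self._handle_button_reply(phone_number, message["button"]["payload"])
elif message.get("type") == "interactive":
    interactive = message["interactive"]
    if interactive["type"] == "button_reply":
        await self._handle_button_reply(phone_number, interactive["button_reply"]["id"])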
async def _handle_button_reply(self, phone: str, button_id: str):
"""Handle quick reply button clicks."""
if button_id.startswith("ack_"):
alert_id = button_id[4:]
await self.escalation_service.acknowledge_alert(
alert_id=alert_id,
acknowledged_by=f"whatsapp:{phone}",
channel="whatsapp"
)
elif button_id.startswith("esc_"):
alert_id = button_id[4:]
await self.escalation_service.escalate_alert(alert_id, reason="manual")
3.7 WhatsApp API Limits
| Limit | Value | Handling |
|---|---|---|
| Message text length | 4096 chars | Truncate with details link |
| Image size | 16MB max | Resize/compress before upload |
| Video size | 16MB max | Compress with FFmpeg |
| Video duration | 60 seconds | Trim if needed |
| Media upload | Max 5MB for direct upload | Use presigned URLs for larger |
| Rate limit | 80 msg/sec (business tier) | Token bucket throttling |
| Templates | Must be pre-approved | Submit 48h before needed |
| Session window | 24h from last user message | Use templates outside window |
| Phone number format | E.164 (+1234567890) | Normalize all inputs |
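Two of the handling strategies in the table, truncating long text and normalizing phone numbers to E.164, can be sketched as small pure functions. This is a minimal illustration rather than the platform's implementation: `normalize_e164`, `truncate_for_whatsapp`, the default country code, and the suffix wording are all hypothetical, and the length check counts Python characters rather than bytes.

```python
import re

WHATSAPP_TEXT_LIMIT = 4096  # message text limit from the table above

# E.164: "+" followed by up to 15 digits, first digit non-zero
E164_PATTERN = re.compile(r"^\+[1-9]\d{1,14}$")


def normalize_e164(raw: str, default_country_code: str = "1") -> str:
    """Normalize a phone number to E.164 (hypothetical helper).

    Assumption: numbers without a "+" or "00" prefix are national numbers
    that belong to `default_country_code`.
    """
    stripped = raw.strip()
    digits = re.sub(r"\D", "", stripped)
    if stripped.startswith("+"):
        candidate = "+" + digits
    elif stripped.startswith("00"):
        candidate = "+" + digits[2:]
    else:
        candidate = "+" + default_country_code + digits
    if not E164_PATTERN.match(candidate):
        raise ValueError(f"Cannot normalize phone number: {raw!r}")
    return candidate


def truncate_for_whatsapp(text: str, details_url: str,
                          limit: int = WHATSAPP_TEXT_LIMIT) -> str:
    """Truncate text to `limit` characters, appending a details link."""
    if len(text) <= limit:
        return text
    suffix = f"... Details: {details_url}"
    keep = limit - len(suffix)
    if keep < 0:
        raise ValueError("Limit too small for the details link")
    return text[:keep] + suffix
```

Normalizing once at the point where a member is added keeps deduplication by phone number reliable later on.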
4. Alert Routing Rules Engine
4.1 Rule Engine Architecture
class AlertRoutingEngine:
    """Evaluate routing rules to determine alert recipients."""

    def __init__(self, rule_store, recipient_resolver):
        self.rule_store = rule_store
        self.resolver = recipient_resolver

    async def evaluate(self, alert_event: AlertEvent) -> RoutingDecision:
        """
        Evaluate all rules against the alert event.
        Returns the final routing decision with recipients and channels.
        """
        # Fetch all active rules, highest priority first
        rules = await self.rule_store.get_active_rules(
            order_by="priority DESC"
        )
        matched_rules = []
        for rule in rules:
            if self._evaluate_rule(rule, alert_event):
                matched_rules.append(rule)
                # Stop at the first matching rule that sets stop_on_match
                if rule.stop_on_match:
                    break
        if not matched_rules:
            # Apply default routing (all enabled cameras -> security team)
            return await self._default_routing(alert_event)

        # Merge decisions from all matched rules (union of recipients and channels)
        recipients = set()
        channels = set()
        severity_override = None
        templates = {}
        for rule in matched_rules:
            # The alert is passed along so the resolver can apply
            # time-based group availability (see Section 5.3)
            rule_recipients = await self.resolver.resolve(
                rule.recipient_groups, alert_event
            )
            recipients.update(rule_recipients)
            channels.update(rule.channels)
            if rule.severity_override:
                # Keep the most severe override seen so far
                severity_override = self._higher_severity(
                    severity_override, rule.severity_override
                )
            # Rules are visited in priority order, so on a key collision
            # the lower-priority rule's template wins
            templates.update(rule.templates or {})
        return RoutingDecision(
            alert_id=alert_event.id,
            recipients=list(recipients),
            channels=list(channels),
            severity=severity_override or alert_event.severity,
            templates=templates,
            matched_rules=[r.id for r in matched_rules],
            quiet_hours_applicable=self._check_quiet_hours(channels, alert_event)
        )

    def _evaluate_rule(self, rule: AlertRule, event: AlertEvent) -> bool:
        """Evaluate a single rule against the alert event."""
        conditions = [
            self._evaluate_condition(condition, event)
            for condition in rule.conditions
        ]
        # Combine results using the rule's logic ("ALL" = AND, "ANY" = OR)
        if rule.logic == "ALL":
            return all(conditions)
        if rule.logic == "ANY":
            return any(conditions)
        # Any other value is treated as a custom boolean expression
        return self._evaluate_expression(rule.logic, conditions)
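The merge step in `evaluate` depends on two behaviors that are not spelled out above: walking rules from highest to lowest priority while honoring `stop_on_match`, and keeping the most severe override. The following is a minimal sketch of both, assuming severities are ranked the same way as in the `severity` condition of Section 4.2. The `Rule` dataclass, `select_matching_rules`, and `higher_severity` are hypothetical stand-ins, not the engine's actual types.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

SEVERITY_ORDER = {"low": 1, "medium": 2, "high": 3, "critical": 4}


def higher_severity(current: Optional[str], candidate: Optional[str]) -> Optional[str]:
    """Return the more severe of two levels; None means 'no override yet'."""
    if current is None:
        return candidate
    if candidate is None:
        return current
    if SEVERITY_ORDER.get(candidate, 0) > SEVERITY_ORDER.get(current, 0):
        return candidate
    return current


@dataclass
class Rule:  # hypothetical stand-in for AlertRule
    id: str
    priority: int
    matches: Callable[[dict], bool]
    stop_on_match: bool = False
    severity_override: Optional[str] = None


def select_matching_rules(rules: List[Rule], event: dict) -> List[Rule]:
    """Walk rules from highest to lowest priority, honoring stop_on_match."""
    matched = []
    for rule in sorted(rules, key=lambda r: r.priority, reverse=True):
        if rule.matches(event):
            matched.append(rule)
            if rule.stop_on_match:
                break
    return matched


rules = [
    Rule("front_entrance", 70, lambda e: e["camera"] == "cam_01",
         severity_override="medium"),
    Rule("blacklist", 100, lambda e: e["role"] == "blacklist",
         severity_override="critical"),
    Rule("restricted", 85, lambda e: e["zone"] == "server_room",
         stop_on_match=True, severity_override="high"),
]
event = {"camera": "cam_01", "role": "blacklist", "zone": "server_room"}
matched = select_matching_rules(rules, event)

severity = None
for rule in matched:
    severity = higher_severity(severity, rule.severity_override)

print([r.id for r in matched], severity)
```

In this example the `restricted` rule stops evaluation, so the lower-priority `front_entrance` rule never contributes recipients or a severity override.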
4.2 Condition Types
from datetime import timezone
from zoneinfo import ZoneInfo

class ConditionEvaluator:
    """Evaluate individual routing conditions."""

    def evaluate(self, condition: Condition, event: AlertEvent) -> bool:
        # Dispatch to the "_<type>" method, e.g. type "camera" -> self._camera
        condition_type = condition.type
        evaluator = getattr(self, f"_{condition_type}", None)
        if not evaluator:
            raise ValueError(f"Unknown condition type: {condition_type}")
        return evaluator(condition, event)

    # --- Camera-based ---
    def _camera(self, c: Condition, event: AlertEvent) -> bool:
        """Check if event camera matches condition."""
        return event.camera_id in c.values

    # --- Person-based ---
    def _person(self, c: Condition, event: AlertEvent) -> bool:
        """Check if detected person matches condition."""
        if not event.person_id:
            return False
        return event.person_id in c.values

    # --- Role-based ---
    def _person_role(self, c: Condition, event: AlertEvent) -> bool:
        """Check if person's role matches condition."""
        if not event.person_role:
            return False
        return event.person_role in c.values  # e.g., ["blacklist", "vip"]

    # --- Event type ---
    def _event_type(self, c: Condition, event: AlertEvent) -> bool:
        """Check if event type matches condition."""
        return event.event_type in c.values

    # --- Zone-based ---
    def _zone(self, c: Condition, event: AlertEvent) -> bool:
        """Check if detection zone matches condition."""
        return event.zone_id in c.values

    # --- Time-based ---
    def _time_range(self, c: Condition, event: AlertEvent) -> bool:
        """Check if event timestamp falls within time range."""
        # c.timezone is an IANA name such as "America/New_York"
        tz = ZoneInfo(c.timezone) if c.timezone else timezone.utc
        local_time = event.timestamp.astimezone(tz).time()
        if c.start_time <= c.end_time:
            return c.start_time <= local_time <= c.end_time
        # Window crosses midnight (e.g., 22:00 to 06:00)
        return local_time >= c.start_time or local_time <= c.end_time

    # --- Day of week ---
    def _day_of_week(self, c: Condition, event: AlertEvent) -> bool:
        """Check if event day matches condition."""
        tz = ZoneInfo(c.timezone) if c.timezone else timezone.utc
        weekday = event.timestamp.astimezone(tz).strftime("%A").lower()
        return weekday in [d.lower() for d in c.values]

    # --- Severity ---
    def _severity(self, c: Condition, event: AlertEvent) -> bool:
        """Check if event severity meets threshold."""
        severity_order = {"low": 1, "medium": 2, "high": 3, "critical": 4}
        event_level = severity_order.get(event.severity, 0)
        threshold = severity_order.get(c.threshold, 0)
        operators = {
            "eq": event_level == threshold,
            "gte": event_level >= threshold,
            "gt": event_level > threshold,
            "lte": event_level <= threshold,
            "lt": event_level < threshold,
        }
        # Unknown operators never match
        return operators.get(c.operator, False)

    # --- Watchlist ---
    def _watchlist(self, c: Condition, event: AlertEvent) -> bool:
        """Check if person is on specified watchlist."""
        if not event.watchlist_matches:
            return False
        matched_lists = [w["list_name"] for w in event.watchlist_matches]
        return any(wl in matched_lists for wl in c.values)

    # --- Confidence threshold ---
    def _confidence(self, c: Condition, event: AlertEvent) -> bool:
        """Check if detection confidence meets threshold."""
        operators = {
            "gte": event.confidence >= c.threshold,
            "gt": event.confidence > c.threshold,
            "lte": event.confidence <= c.threshold,
            "lt": event.confidence < c.threshold,  # used by the low-confidence rule
        }
        # Unknown operators never match
        return operators.get(c.operator, False)
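The time-range condition has two cases: a same-day window, and a window that wraps past midnight. A standalone sketch of that check follows. `in_time_range` is a hypothetical helper, and the fixed UTC-4 offset is used only to keep the example free of timezone-database dependencies; `zoneinfo.ZoneInfo("America/New_York")` would be the DST-aware choice in practice. The timestamp is assumed to be timezone-aware.

```python
from datetime import datetime, time, timedelta, timezone, tzinfo


def in_time_range(ts: datetime, start: time, end: time,
                  tz: tzinfo = timezone.utc) -> bool:
    """Return True if `ts`, converted to `tz`, falls within [start, end]."""
    local = ts.astimezone(tz).time()
    if start <= end:
        # Same-day window, e.g. 07:00 to 22:00
        return start <= local <= end
    # Window wraps past midnight, e.g. 22:00 to 06:00
    return local >= start or local <= end


# Fixed offset standing in for US Eastern summer time (assumption for the demo)
eastern_summer = timezone(timedelta(hours=-4))

# 03:15 UTC on June 15 is 23:15 local time on June 14
event_ts = datetime(2024, 6, 15, 3, 15, tzinfo=timezone.utc)

night = in_time_range(event_ts, time(22, 0), time(6, 0), eastern_summer)
business = in_time_range(event_ts, time(7, 0), time(22, 0), eastern_summer)
print(night, business)
```

Converting to the rule's timezone before comparing matters here: the same event falls in the night window locally even though its UTC time of 03:15 would also do so only by coincidence.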
4.3 Rule Configuration Examples
# routing_rules.yml — Configurable via admin UI
routing_rules:
# Rule 1: Always alert on blacklist persons (highest priority)
- id: "rule_blacklist_always"
name: "Blacklist Person Alert"
description: "Always notify security team when blacklist person is detected"
priority: 100 # Highest priority
enabled: true
logic: "ALL"
stop_on_match: false
conditions:
- type: "person_role"
operator: "in"
values: ["blacklist"]
actions:
recipient_groups: ["security_team", "management"]
channels: ["telegram", "whatsapp"]
severity_override: "critical"
templates:
telegram: "blacklist_alert_tg"
whatsapp: "blacklist_alert"
bypass_quiet_hours: true
# Rule 2: VIP watchlist alerts
- id: "rule_vip_watchlist"
name: "VIP Watchlist Match"
description: "Alert on VIP watchlist matches during business hours"
priority: 90
enabled: true
logic: "ALL"
conditions:
- type: "watchlist"
values: ["vip_watchlist"]
- type: "time_range"
start_time: "07:00"
end_time: "22:00"
timezone: "America/New_York"
actions:
recipient_groups: ["management"]
channels: ["telegram"]
severity_override: "high"
templates:
telegram: "watchlist_alert_tg"
# Rule 3: Suspicious activity at night
- id: "rule_night_suspicious"
name: "Night Suspicious Activity"
description: "Escalate suspicious activity detected during night hours"
priority: 80
enabled: true
logic: "ALL"
conditions:
- type: "event_type"
values: ["suspicious_activity"]
- type: "time_range"
start_time: "22:00"
end_time: "06:00"
timezone: "America/New_York"
actions:
recipient_groups: ["night_staff", "security_team"]
channels: ["telegram", "whatsapp"]
severity_override: "high"
bypass_quiet_hours: true
# Rule 4: Per-camera rule — Front entrance always monitored
- id: "rule_front_entrance"
name: "Front Entrance Monitoring"
description: "All person detection at front entrance"
priority: 70
enabled: true
logic: "ALL"
conditions:
- type: "camera"
values: ["cam_01_front_entrance"]
- type: "event_type"
values: ["person_detected"]
actions:
recipient_groups: ["security_team"]
channels: ["telegram"]
# Rule 5: Zone-based — Restricted area
- id: "rule_restricted_zone"
name: "Restricted Area Alert"
description: "Any detection in restricted zones"
priority: 85
enabled: true
logic: "ALL"
conditions:
- type: "zone"
values: ["restricted_warehouse", "server_room"]
actions:
recipient_groups: ["security_team", "management"]
channels: ["telegram", "whatsapp"]
severity_override: "high"
bypass_quiet_hours: true
# Rule 6: Unknown persons during off-hours
- id: "rule_unknown_after_hours"
name: "Unknown Person After Hours"
description: "Unknown person detection outside business hours"
priority: 75
enabled: true
logic: "ALL"
conditions:
- type: "event_type"
values: ["person_detected_unknown"]
- type: "time_range"
start_time: "18:00"
end_time: "07:00"
timezone: "America/New_York"
actions:
recipient_groups: ["night_staff", "security_team"]
channels: ["telegram"]
severity_override: "medium"
# Rule 7: System alerts
- id: "rule_system_alerts"
name: "System Health Alerts"
description: "Camera offline, system errors"
priority: 60
enabled: true
logic: "ALL"
conditions:
- type: "event_type"
values: ["system_alert", "camera_offline", "system_error"]
actions:
recipient_groups: ["management"]
channels: ["telegram"]
severity_override: "high"
# Rule 8: Critical severity — always to management
- id: "rule_critical_always"
name: "Critical Severity Alert"
description: "All critical severity alerts go to management"
priority: 95
enabled: true
logic: "ALL"
conditions:
- type: "severity"
operator: "gte"
threshold: "critical"
actions:
recipient_groups: ["management"]
channels: ["telegram", "whatsapp"]
bypass_quiet_hours: true
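# Rule 9: Low confidence suppression
# Note: matched rules are merged, so an empty recipient list cannot remove
# recipients added by other rules. This rule therefore only suppresses
# low-confidence alerts that match no other rule (it replaces default routing).
- id: "rule_low_confidence"
name: "Low Confidence Suppression"
description: "Don't alert on detections below 60% confidence"
priority: 10 # Lowest priority, evaluated last
enabled: true
logic: "ALL"
conditions:
- type: "confidence"
operator: "lt"
threshold: 60
actions:
recipient_groups: [] # No recipients: suppress
channels: []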
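Because these rules are edited through an admin UI, it may help to validate each rule before it is saved, so that an unsupported operator or a misspelled condition type is caught at save time instead of silently never matching. The sketch below is one possible check, not part of the specified system: `validate_rule` is hypothetical, and the operator sets are assumptions that would need to mirror whatever `ConditionEvaluator` actually supports.

```python
from typing import List

# Assumption: these sets must be kept in sync with ConditionEvaluator.
THRESHOLD_OPERATORS = {
    "severity": {"eq", "gt", "gte", "lt", "lte"},
    "confidence": {"gt", "gte", "lt", "lte"},
}
LIST_CONDITIONS = {
    "camera", "person", "person_role", "event_type",
    "zone", "watchlist", "day_of_week",
}


def validate_rule(rule: dict) -> List[str]:
    """Return a list of human-readable problems; empty means the rule is OK."""
    errors = []
    for key in ("id", "priority", "logic", "conditions", "actions"):
        if key not in rule:
            errors.append(f"missing field '{key}'")
    for i, cond in enumerate(rule.get("conditions", [])):
        ctype = cond.get("type")
        if ctype in THRESHOLD_OPERATORS:
            operator = cond.get("operator")
            if operator not in THRESHOLD_OPERATORS[ctype]:
                errors.append(f"condition {i}: unsupported operator {operator!r}")
            if "threshold" not in cond:
                errors.append(f"condition {i}: missing 'threshold'")
        elif ctype in LIST_CONDITIONS:
            if not cond.get("values"):
                errors.append(f"condition {i}: 'values' must be a non-empty list")
        elif ctype == "time_range":
            if "start_time" not in cond or "end_time" not in cond:
                errors.append(f"condition {i}: needs 'start_time' and 'end_time'")
        else:
            errors.append(f"condition {i}: unknown type {ctype!r}")
    return errors
```

Returning a list of messages rather than raising on the first problem lets the rule builder show every issue to a non-technical admin at once.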
4.4 Admin UI — Rule Builder (Non-Technical)
# Non-technical admin interface specification
rule_builder_ui:
# Visual drag-and-drop rule builder
components:
trigger_selector:
label: "When this happens..."
options:
- "Person is detected"
- "Watchlist person is detected"
- "Unknown person is detected"
- "Suspicious activity detected"
- "Camera goes offline"
- "System error occurs"
condition_builder:
label: "And these conditions match..."
condition_types:
- type: "camera"
label: "Camera"
widget: "multi_select"
source: "cameras" # Populated from camera registry
- type: "person"
label: "Specific Person"
widget: "person_search"
source: "person_database"
- type: "person_role"
label: "Person Type"
widget: "multi_select"
options:
- value: "blacklist"
label: "Blacklisted Person"
color: "red"
- value: "vip"
label: "VIP"
color: "gold"
- value: "employee"
label: "Employee"
color: "blue"
- value: "visitor"
label: "Visitor"
color: "green"
- type: "event_type"
label: "Event Type"
widget: "multi_select"
options:
- value: "person_detected"
label: "Person Detected"
- value: "person_detected_unknown"
label: "Unknown Person"
- value: "suspicious_activity"
label: "Suspicious Activity"
- value: "system_alert"
label: "System Alert"
- type: "zone"
label: "Zone/Area"
widget: "multi_select"
source: "zones" # Populated from zone definitions
- type: "time_range"
label: "Time of Day"
widget: "time_range_picker"
- type: "day_of_week"
label: "Days of Week"
widget: "day_selector"
- type: "severity"
label: "Severity Level"
widget: "severity_threshold"
options:
- value: "low"
label: "Low"
- value: "medium"
label: "Medium"
- value: "high"
label: "High"
- value: "critical"
label: "Critical"
- type: "watchlist"
label: "Watchlist"
widget: "multi_select"
source: "watchlists"
- type: "confidence"
label: "Confidence Level"
widget: "slider"
min: 0
max: 100
unit: "%"
action_builder:
label: "Then do this..."
actions:
- type: "notify"
label: "Send notification to..."
widget: "recipient_group_selector"
- type: "set_severity"
label: "Set alert severity to..."
widget: "severity_selector"
- type: "bypass_quiet_hours"
label: "Send even during quiet hours"
widget: "toggle"
- type: "require_acknowledgment"
label: "Require acknowledgment"
widget: "toggle"
- type: "escalate_after"
label: "Escalate if not acknowledged after..."
widget: "duration_picker"
default: "15 minutes"
5. Recipient Groups
5.1 Group Data Model
# Recipient group configuration
recipient_groups:
- id: "security_team"
name: "Security Team"
description: "Primary security operations team"
created_at: "2024-01-15T00:00:00Z"
updated_at: "2024-06-01T00:00:00Z"
# Channel assignments
channels:
telegram:
enabled: true
chat_ids:
- "-1001234567890" # Security Ops group
individual_chats: # Also DM each member
enabled: true
for_severity: ["high", "critical"]
whatsapp:
enabled: true
phone_numbers:
- "+12345678901"
- "+12345678902"
- "+12345678903"
# Default settings
default_settings:
quiet_hours: null # No quiet hours (security-critical)
min_severity: "low"
auto_acknowledge_after: null # Never auto-ack
members:
- user_id: "user_sec_001"
name: "John Smith"
role: "Security Supervisor"
telegram_id: "111111111"
phone: "+12345678901"
is_active: true
added_at: "2024-01-15T00:00:00Z"
- user_id: "user_sec_002"
name: "Jane Doe"
role: "Security Officer"
telegram_id: "222222222"
phone: "+12345678902"
is_active: true
added_at: "2024-02-01T00:00:00Z"
- user_id: "user_sec_003"
name: "Bob Johnson"
role: "Security Officer"
telegram_id: "333333333"
phone: "+12345678903"
is_active: true
added_at: "2024-03-10T00:00:00Z"
- id: "management"
name: "Management"
description: "Management-level personnel for critical alerts"
channels:
telegram:
enabled: true
chat_ids:
- "-1009876543210" # Management group
whatsapp:
enabled: true
phone_numbers:
- "+19998887777"
- "+19998887778"
default_settings:
quiet_hours: null
min_severity: "high"
members:
- user_id: "user_mgr_001"
name: "Alice Williams"
role: "Security Director"
telegram_id: "444444444"
phone: "+19998887777"
is_active: true
- user_id: "user_mgr_002"
name: "Charlie Brown"
role: "Operations Manager"
telegram_id: "555555555"
phone: "+19998887778"
is_active: true
- id: "night_staff"
name: "Night Staff"
description: "Night shift security personnel (22:00 - 06:00)"
channels:
telegram:
enabled: true
chat_ids:
- "-1005555666677" # Night shift group
whatsapp:
enabled: false
default_settings:
quiet_hours: null
min_severity: "medium"
active_time_range:
start: "22:00"
end: "06:00"
timezone: "America/New_York"
members:
- user_id: "user_night_001"
name: "David Lee"
role: "Night Supervisor"
telegram_id: "666666666"
phone: "+15551234567"
is_active: true
- user_id: "user_night_002"
name: "Eva Martinez"
role: "Night Officer"
telegram_id: "777777777"
phone: "+15551234568"
is_active: true
5.2 Member Management API
# API endpoints for member management
class RecipientGroupAPI:
# GET /recipient-groups — List all groups
async def list_groups(
self,
skip: int = 0,
limit: int = 50,
active_only: bool = True
) -> List[RecipientGroupSummary]:
...
# GET /recipient-groups/{group_id} — Get group details
async def get_group(self, group_id: str) -> RecipientGroup:
...
# POST /recipient-groups — Create new group
async def create_group(self, request: CreateGroupRequest) -> RecipientGroup:
"""
Request body:
{
"name": "New Group",
"description": "Optional description",
"channels": {
"telegram": {"enabled": true, "chat_ids": []},
"whatsapp": {"enabled": false}
}
}
"""
...
# PUT /recipient-groups/{group_id} — Update group
async def update_group(self, group_id: str, request: UpdateGroupRequest) -> RecipientGroup:
...
# DELETE /recipient-groups/{group_id} — Delete group
async def delete_group(self, group_id: str) -> None:
...
# POST /recipient-groups/{group_id}/members — Add member
async def add_member(
self,
group_id: str,
request: AddMemberRequest
) -> GroupMember:
"""
Request body:
{
"name": "John Smith",
"role": "Security Officer",
"telegram_id": "123456789", # Optional
"phone": "+12345678901", # Required for WhatsApp
"is_active": true
}
"""
...
# PUT /recipient-groups/{group_id}/members/{member_id} — Update member
async def update_member(
self,
group_id: str,
member_id: str,
request: UpdateMemberRequest
) -> GroupMember:
...
# DELETE /recipient-groups/{group_id}/members/{member_id} — Remove member
async def remove_member(self, group_id: str, member_id: str) -> None:
...
# POST /recipient-groups/{group_id}/members/{member_id}/toggle
async def toggle_member_active(self, group_id: str, member_id: str) -> GroupMember:
"""Quick toggle member active/inactive status."""
...
5.3 Multiple Groups Per Alert
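An alert can be routed to multiple groups simultaneously. The resolver below skips inactive or off-shift groups and deduplicates members by Telegram ID and phone number, so a person who belongs to several groups is notified once per channel: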
class MultiGroupResolver:
"""Resolve multiple recipient groups, deduplicating members."""
async def resolve(self, group_ids: List[str], alert: AlertEvent) -> List[ResolvedRecipient]:
"""Resolve all groups, deduplicate by contact info."""
all_recipients = []
seen_telegram_ids = set()
seen_phones = set()
for group_id in group_ids:
group = await self.group_store.get(group_id)
if not group or not group.is_active:
continue
# Check time-based group availability (e.g., night_staff)
if group.active_time_range:
if not self._is_within_active_hours(group.active_time_range, alert.timestamp):
continue
for member in group.members:
if not member.is_active:
continue
# Telegram deduplication
if member.telegram_id and member.telegram_id not in seen_telegram_ids:
seen_telegram_ids.add(member.telegram_id)
all_recipients.append(ResolvedRecipient(
type="telegram",
identifier=member.telegram_id,
name=member.name,
group_id=group_id,
role=member.role
))
# WhatsApp deduplication
if member.phone and member.phone not in seen_phones:
seen_phones.add(member.phone)
all_recipients.append(ResolvedRecipient(
type="whatsapp",
identifier=member.phone,
name=member.name,
group_id=group_id,
role=member.role
))
return all_recipients
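One detail the resolver above does not show is the per-group channel switch from Section 5.1 (for example, `night_staff` has WhatsApp disabled even though its members have phone numbers). The following sketch shows one way deduplication and the channel flag could be combined. The `Member` and `Group` dataclasses and the flat `channels` dictionary are simplified assumptions, not the platform's data model.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple


@dataclass
class Member:
    name: str
    telegram_id: Optional[str] = None
    phone: Optional[str] = None
    is_active: bool = True


@dataclass
class Group:
    id: str
    channels: Dict[str, bool]  # channel name -> enabled flag (simplified)
    members: List[Member] = field(default_factory=list)


def resolve(groups: List[Group]) -> List[Tuple[str, str, str]]:
    """Return unique (channel, identifier, group_id) tuples."""
    seen: Set[Tuple[str, str]] = set()
    resolved = []
    for group in groups:
        for member in group.members:
            if not member.is_active:
                continue
            contacts = {"telegram": member.telegram_id, "whatsapp": member.phone}
            for channel, identifier in contacts.items():
                # Skip missing contact details and channels disabled for this group
                if not identifier or not group.channels.get(channel, False):
                    continue
                # Deduplicate across groups, per channel
                if (channel, identifier) in seen:
                    continue
                seen.add((channel, identifier))
                resolved.append((channel, identifier, group.id))
    return resolved


david = Member("David Lee", "666666666", "+15551234567")
jane = Member("Jane Doe", "222222222", "+12345678902")
night = Group("night_staff", {"telegram": True, "whatsapp": False}, [david])
security = Group("security_team", {"telegram": True, "whatsapp": True}, [david, jane])

result = resolve([night, security])
```

Here David is reached on Telegram through `night_staff` and on WhatsApp only through `security_team`, since the night group has that channel disabled.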
6. Quiet Hours
6.1 Quiet Hours Configuration
# Default: NO quiet hours for security systems
# But configurable per-channel if needed
quiet_hours:
# Global default (can be overridden per-group, per-channel)
default_policy:
enabled: false # Default: disabled for security
description: "No quiet hours by default for security-critical alerts"
# Per-channel quiet hours (if enabled)
channels:
telegram:
enabled: false
schedule:
- days: ["Saturday", "Sunday"]
start: "00:00"
end: "23:59"
timezone: "America/New_York"
label: "Weekend quiet hours (optional)"
whatsapp:
enabled: false
schedule: []
# Per-group overrides (examples of when it might be used)
group_overrides:
# Night staff has no quiet hours (they work at night)
night_staff:
enabled: false
# Management might have quiet hours for non-critical
management:
enabled: true
schedule:
- days: ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
start: "23:00"
end: "06:00"
timezone: "America/New_York"
label: "Weeknight quiet hours"
- days: ["Saturday", "Sunday"]
start: "18:00"
end: "08:00"
timezone: "America/New_York"
label: "Weekend quiet hours"
# During quiet hours, only these severities come through
allowed_during_quiet: ["critical"]
# Emergency bypass configuration
emergency_bypass:
enabled: true
triggers:
- severity: "critical" # Critical alerts always bypass
- tag: "emergency" # Alerts tagged as emergency
- rule_override: "bypass_quiet_hours" # Individual rule setting
notification_method: "all_channels" # Send via all channels when bypassed
6.2 Quiet Hours Evaluation Logic
from datetime import datetime
from typing import List

class QuietHoursEvaluator:
    """Evaluate whether quiet hours apply to a given alert."""

    async def should_suppress(
        self,
        alert: AlertEvent,
        recipient: ResolvedRecipient,
        channel: str
    ) -> QuietHoursDecision:
        """
        Determine if alert should be suppressed due to quiet hours.
        Returns suppression decision with reason.
        """
        # Check emergency bypass (critical severity always sends)
        if alert.severity == "critical":
            return QuietHoursDecision(
                suppress=False,
                reason="Critical severity: emergency bypass active"
            )
        # Check rule-level bypass
        if alert.bypass_quiet_hours:
            return QuietHoursDecision(
                suppress=False,
                reason="Rule-level quiet hours bypass"
            )
        # Get quiet hours config for this recipient + channel
        config = await self._get_quiet_hours_config(recipient, channel)
        if not config or not config.enabled:
            return QuietHoursDecision(suppress=False, reason="Quiet hours not configured")
        # Check if current time falls within quiet hours.
        # config.timezone is expected to be a tzinfo object here
        # (e.g. ZoneInfo("America/New_York")), not the raw string from the config file.
        current_time = datetime.now(config.timezone)
        in_quiet_hours = self._is_in_quiet_hours(current_time, config.schedule)
        if not in_quiet_hours:
            return QuietHoursDecision(
                suppress=False,
                reason="Current time outside quiet hours"
            )
        # In quiet hours: check if severity is allowed through
        if alert.severity in config.allowed_during_quiet:
            return QuietHoursDecision(
                suppress=False,
                reason=f"Severity '{alert.severity}' allowed during quiet hours"
            )
        # Suppress the notification
        return QuietHoursDecision(
            suppress=True,
            reason=f"Quiet hours active ({self._describe_schedule(config.schedule)}). "
                   f"Severity '{alert.severity}' below threshold.",
            resume_at=self._next_non_quiet_time(config.schedule, current_time)
        )

    def _is_in_quiet_hours(self, current: datetime, schedule: List[ScheduleEntry]) -> bool:
        """Check if current time falls within any quiet hours schedule entry."""
        current_day = current.strftime("%A")
        current_time = current.time()
        for entry in schedule:
            if current_day not in entry.days:
                continue
            start = entry.start_time
            end = entry.end_time
            if start <= end:
                # Same-day window (e.g., 09:00 to 17:00)
                if start <= current_time <= end:
                    return True
            else:
                # Window crosses midnight (e.g., 23:00 to 06:00).
                # Only the current day name is checked, so the after-midnight
                # part applies on the listed days, not on the day after them.
                if current_time >= start or current_time <= end:
                    return True
        return False
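For windows that cross midnight, the meaning of `days` needs to be pinned down. One plausible reading is that `days` lists the days on which a window starts, so a Friday 23:00 to 06:00 window should still apply at 02:00 on Saturday. The sketch below implements that reading as an alternative; it is an assumption about intended behavior, and the `ScheduleEntry` shape is simplified for the example.

```python
from dataclasses import dataclass
from datetime import datetime, time, timedelta
from typing import List


@dataclass
class ScheduleEntry:
    days: List[str]      # day names on which the window STARTS (assumption)
    start_time: time
    end_time: time


def is_in_quiet_hours(current: datetime, schedule: List[ScheduleEntry]) -> bool:
    """Check quiet hours, carrying overnight windows into the next day."""
    today = current.strftime("%A")
    yesterday = (current - timedelta(days=1)).strftime("%A")
    now = current.time()
    for entry in schedule:
        if entry.start_time <= entry.end_time:
            # Same-day window
            if today in entry.days and entry.start_time <= now <= entry.end_time:
                return True
        else:
            # Evening part: window started today
            if today in entry.days and now >= entry.start_time:
                return True
            # Morning part: window started yesterday and is still running
            if yesterday in entry.days and now <= entry.end_time:
                return True
    return False


weeknights = [ScheduleEntry(
    days=["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"],
    start_time=time(23, 0),
    end_time=time(6, 0),
)]

friday_late = is_in_quiet_hours(datetime(2024, 6, 14, 23, 30), weeknights)
saturday_early = is_in_quiet_hours(datetime(2024, 6, 15, 2, 0), weeknights)
monday_early = is_in_quiet_hours(datetime(2024, 6, 17, 2, 0), weeknights)
```

With this reading, Saturday 02:00 is quiet (the Friday window is still open) while Monday 02:00 is not (no window started on Sunday). Day names rely on `strftime("%A")` returning English names, which holds under the default C locale.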
6.3 Admin UI — Quiet Hours Configuration
ui_quiet_hours:
layout: "simple_form" # Non-technical friendly
fields:
- id: "enable_quiet_hours"
label: "Enable Quiet Hours"
type: "toggle_switch"
help_text: "When enabled, non-critical alerts will be held during specified times."
default: false
warning: "⚠️ For security applications, quiet hours are NOT recommended."
- id: "quiet_hours_schedule"
label: "Quiet Hours Schedule"
type: "schedule_builder"
visible_when: "enable_quiet_hours == true"
options:
presets:
- label: "Never (default for security)"
value: "none"
- label: "Nights (22:00 - 06:00)"
value: "nights"
- label: "Weekends (all day)"
value: "weekends"
- label: "Custom"
value: "custom"
- id: "allow_critical_during_quiet"
label: "Always allow critical alerts"
type: "toggle_switch"
visible_when: "enable_quiet_hours == true"
default: true
disabled: true # Always on, non-configurable
help_text: "Critical alerts always bypass quiet hours. This cannot be disabled for security."
- id: "timezone"
label: "Timezone"
type: "timezone_dropdown"
default: "America/New_York"
7. Message Templates
7.1 Template Engine
class TemplateEngine:
"""Render message templates with placeholder substitution."""
# Placeholder pattern: {variable_name}
PLACEHOLDER_PATTERN = re.compile(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}")
# Supported variables
BUILT_IN_VARIABLES = {
"timestamp": lambda ctx: ctx["timestamp"].strftime("%Y-%m-%d %H:%M:%S"),
"date": lambda ctx: ctx["timestamp"].strftime("%Y-%m-%d"),
"time": lambda ctx: ctx["timestamp"].strftime("%H:%M:%S"),
"timezone": lambda ctx: str(ctx["timestamp"].tzinfo or "UTC"),
}
async def render(
self,
template_id: str,
context: dict,
channel: str, # "telegram" or "whatsapp"
include_media: bool = False
) -> RenderedMessage:
"""Render template for specified channel."""
template = await self.template_store.get(template_id, channel)
# Build variable context
variables = self._build_variables(context)
# Render text
text = self._render_text(template.text, variables)
# Apply channel-specific formatting
if channel == "telegram":
formatted_text = self._format_telegram(text, template.formatting)
keyboard = template.keyboard if template.keyboard else None
elif channel == "whatsapp":
formatted_text = self._format_whatsapp(text)
keyboard = None # WhatsApp uses template buttons
else:
formatted_text = text
keyboard = None
# Determine media attachments
media = None
if include_media and template.supports_media:
media = await self._resolve_media(context, template.media_config)
return RenderedMessage(
text=formatted_text,
raw_text=text,
keyboard=keyboard,
media=media,
template_id=template_id,
channel=channel
)
def _build_variables(self, context: dict) -> dict:
"""Build full variable dictionary from context."""
variables = dict(context)
# Add built-in computed variables
for name, resolver in self.BUILT_IN_VARIABLES.items():
if name not in variables:
try:
variables[name] = resolver(context)
except Exception:
variables[name] = "N/A"
return variables
def _render_text(self, template_text: str, variables: dict) -> str:
"""Substitute placeholders with values."""
def replace_match(match):
var_name = match.group(1)
value = variables.get(var_name, f"{{{var_name}}}")
return str(value)
return self.PLACEHOLDER_PATTERN.sub(replace_match, template_text)
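Since the Telegram templates in this document are sent with HTML formatting, substituted values such as person names or free-text descriptions should be escaped, otherwise a value containing `<`, `>` or `&` could break the markup. A minimal sketch of escaped substitution follows; `render_html_safe` is a hypothetical variant of `_render_text`, using the same placeholder pattern.

```python
import html
import re

PLACEHOLDER_PATTERN = re.compile(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}")


def render_html_safe(template_text: str, variables: dict) -> str:
    """Substitute {placeholders}, HTML-escaping each value.

    Unknown placeholders are left untouched, matching _render_text.
    The template's own tags (e.g. <b>) are not escaped, only the values.
    """
    def replace_match(match):
        var_name = match.group(1)
        if var_name not in variables:
            return match.group(0)
        return html.escape(str(variables[var_name]), quote=False)

    return PLACEHOLDER_PATTERN.sub(replace_match, template_text)


rendered = render_html_safe(
    "<b>{name}</b> at {camera_name} ({unknown})",
    {"name": "A <script> & B", "camera_name": "Lobby"},
)
print(rendered)
```

Escaping at substitution time keeps template authors free to use formatting tags while treating all alert data as plain text.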
7.2 Template Definitions
Telegram Templates
templates:
telegram:
# --- Person Detected (Known) ---
person_detected_known:
name: "Person Detected (Known)"
text: |
🔍 <b>Person Detected</b>
<b>{name}</b> ({role})
📍 Camera: {camera_name}
🕐 {date} at {time}
🎯 Confidence: {confidence}%
formatting: "HTML"
supports_media: true
media_config:
image:
caption_template: "{name} at {camera_name} — {time}"
resize: [1280, 720]
video:
caption_template: "{name} at {camera_name} — {time}"
max_duration: 10
max_size_mb: 50
keyboard: "alert_actions" # Reference to keyboard definition
# --- Person Detected (Known) WITH Image ---
person_detected_known_with_image:
name: "Person Detected (Known) + Photo"
text: "{name} ({role}) detected at {camera_name} — {timestamp}"
formatting: "HTML"
supports_media: true
media_config:
image:
caption_template: |
🔍 <b>{name}</b> ({role})
📍 {camera_name} | 🕐 {time}
🎯 {confidence}% confidence
resize: [1280, 720]
keyboard: "alert_actions"
# --- Person Detected (Known) WITH Video ---
person_detected_known_with_video:
name: "Person Detected (Known) + Video"
text: "{name} ({role}) detected at {camera_name} — {timestamp}"
formatting: "HTML"
supports_media: true
media_config:
video:
caption_template: |
🎥 <b>{name}</b> ({role})
📍 {camera_name} | 🕐 {time}
🎯 {confidence}% confidence
max_duration: 10
max_size_mb: 50
keyboard: "alert_actions"
# --- Person Detected (Unknown) ---
person_detected_unknown:
name: "Person Detected (Unknown)"
text: |
❓ <b>Unknown Person Detected</b>
📍 Camera: {camera_name}
🕐 {date} at {time}
🎯 Confidence: {confidence}%
<i>This person is not in the database.</i>
formatting: "HTML"
supports_media: true
media_config:
image:
caption_template: "Unknown person at {camera_name} — {time}"
keyboard: "alert_actions"
# --- Watchlist Alert ---
watchlist_alert:
name: "Watchlist Match"
text: |
⚠️ <b>WATCHLIST ALERT</b>
<b>{name}</b>
📋 Watchlist: {watchlist_type}
📍 Camera: {camera_name}
🕐 {date} at {time}
🎯 Confidence: {confidence}%
formatting: "HTML"
supports_media: true
media_config:
image:
caption_template: "WATCHLIST: {name} ({watchlist_type})"
keyboard: "watchlist_actions"
# --- Blacklist Alert ---
blacklist_alert:
name: "Blacklist Match"
text: |
🚨 <b>BLACKLIST ALERT</b> 🚨
⚠️ <b>{name}</b> has been detected!
📍 Camera: {camera_name}
🕐 {date} at {time}
🎯 Confidence: {confidence}%
<b>This person is BLACKLISTED. Immediate attention required.</b>
formatting: "HTML"
supports_media: true
media_config:
image:
caption_template: "BLACKLIST: {name} at {camera_name}"
video:
caption_template: "BLACKLIST: {name} at {camera_name}"
max_duration: 15
keyboard: "blacklist_actions"
# --- Suspicious Activity ---
suspicious_activity:
name: "Suspicious Activity"
text: |
🛑 <b>Suspicious Activity Detected</b>
Type: <b>{activity_type}</b>
📍 Camera: {camera_name}
🕐 {date} at {time}
🎯 Confidence: {confidence}%
{description}
formatting: "HTML"
supports_media: true
media_config:
image:
caption_template: "Suspicious: {activity_type} at {camera_name}"
video:
caption_template: "Suspicious: {activity_type} at {camera_name}"
max_duration: 15
keyboard: "alert_actions"
# --- System Alert ---
system_alert:
name: "System Alert"
text: |
⚙️ <b>System Alert</b>
{message}
🕐 {timestamp}
🔧 Severity: {severity}
formatting: "HTML"
supports_media: false
keyboard: null
# --- Escalation Notice ---
escalation_notice:
name: "Escalation Notice"
text: |
⬆️ <b>Alert Escalated</b>
Alert #{alert_id} has been escalated to <b>Level {escalation_level}</b>.
Original: {alert_summary}
⏱️ Unacknowledged for {elapsed_minutes} minutes
<i>Please review immediately.</i>
formatting: "HTML"
supports_media: false
keyboard: "escalation_actions"
WhatsApp Template References
WhatsApp templates are pre-defined in Meta Business Manager (see Section 3.5). The template engine maps alert data to template parameters:
WHATSAPP_TEMPLATE_MAPPING = {
"person_detected_known": {
"template_name": "person_detected_known",
"parameter_mapping": {
1: "person_name",
2: "person_role",
3: "camera_name",
4: "date",
5: "time",
6: "severity",
7: "alert_id"
}
},
"person_detected_unknown": {
"template_name": "person_detected_unknown",
"parameter_mapping": {
1: "camera_name",
2: "date",
3: "time",
4: "confidence"
}
},
"watchlist_alert": {
"template_name": "watchlist_alert",
"parameter_mapping": {
1: "person_name",
2: "watchlist_type",
3: "camera_name",
4: "date",
5: "time",
6: "watchlist_name"
}
},
"blacklist_alert": {
"template_name": "blacklist_alert",
"parameter_mapping": {
1: "person_name",
2: "camera_name",
3: "date",
4: "time"
}
},
"suspicious_activity": {
"template_name": "suspicious_activity",
"parameter_mapping": {
1: "activity_type",
2: "camera_name",
3: "date",
4: "time",
5: "confidence"
}
},
"system_alert": {
"template_name": "system_alert",
"parameter_mapping": {
1: "message",
2: "timestamp",
3: "severity"
}
},
"escalation_notice": {
"template_name": "escalation_notice",
"parameter_mapping": {
1: "alert_id",
2: "escalation_level",
3: "alert_summary",
4: "threshold_minutes"
}
}
}
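To illustrate how a `parameter_mapping` could be turned into an outgoing message, the sketch below orders the mapped values by position and places them in a template message body. The payload shape follows the WhatsApp Cloud API template message format as commonly documented, but it should be checked against Section 3.5 and Meta's current documentation; `build_body_parameters`, `build_template_payload`, and the `en_US` language code are assumptions for the example.

```python
from typing import Dict, List


def build_body_parameters(parameter_mapping: Dict[int, str], context: dict) -> List[dict]:
    """Turn {position: context_key} into an ordered list of text parameters."""
    parameters = []
    for position in sorted(parameter_mapping):
        key = parameter_mapping[position]
        if key not in context:
            # Fail loudly: a missing value would shift or blank a template slot
            raise KeyError(f"Missing template variable '{key}' for position {position}")
        parameters.append({"type": "text", "text": str(context[key])})
    return parameters


def build_template_payload(to: str, template_name: str,
                           parameter_mapping: Dict[int, str], context: dict,
                           language_code: str = "en_US") -> dict:
    """Assemble a template message payload (shape is an assumption to verify)."""
    return {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "template",
        "template": {
            "name": template_name,
            "language": {"code": language_code},
            "components": [{
                "type": "body",
                "parameters": build_body_parameters(parameter_mapping, context),
            }],
        },
    }


mapping = {1: "person_name", 2: "camera_name", 3: "date", 4: "time"}
context = {
    "person_name": "Mark Johnson",
    "camera_name": "Warehouse Rear",
    "date": "2024-06-15",
    "time": "03:15:42",
}
payload = build_template_payload("+12345678901", "blacklist_alert", mapping, context)
```

Raising on a missing variable is a deliberate choice in this sketch: sending a template with fewer parameters than it declares would be rejected by the provider anyway, and failing early gives a clearer error.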
7.3 Template Rendered Examples
Example 1: Known Person (Telegram)
🔍 Person Detected
John Smith (Employee)
📍 Camera: Front Entrance
🕐 2024-06-15 at 14:32:18
🎯 Confidence: 94%
[Inline buttons: Acknowledge | View Live | Details | Map]
Example 2: Blacklist Alert (Telegram + Image)
🚨 BLACKLIST ALERT 🚨
⚠️ Mark Johnson has been detected!
📍 Camera: Warehouse Rear
🕐 2024-06-15 at 03:15:42
🎯 Confidence: 97%
This person is BLACKLISTED. Immediate attention required.
[Image attached: detection frame]
[Inline buttons: Acknowledge NOW | View Live | Dispatch Security | Details | Escalate to Management]
Example 3: Watchlist Match (WhatsApp)
[Template: watchlist_alert]
⚠️ WATCHLIST ALERT
Sarah Williams (VIP Watchlist)
detected at camera Lobby Main
on 2024-06-15 at 09:45:30
This person is on the VIP watchlist.
[Buttons: Acknowledge | Escalate]
Example 4: System Alert (Telegram)
⚙️ System Alert
Camera "Parking Lot" has been offline for 5 minutes.
Please check connectivity.
🕐 2024-06-15 12:10:00
🔧 Severity: high
7.4 Keyboard Definitions
inline_keyboards:
alert_actions:
buttons:
- row:
- text: "✅ Acknowledge"
action: "acknowledge"
style: "primary"
- text: "📹 View Live"
action: "view_live"
style: "link"
- row:
- text: "📋 Details"
action: "view_details"
style: "secondary"
- text: "🗺️ Map"
action: "view_map"
style: "secondary"
watchlist_actions:
buttons:
- row:
- text: "✅ Acknowledge"
action: "acknowledge"
style: "primary"
- text: "📹 View Live"
action: "view_live"
style: "link"
- row:
- text: "📋 Details"
action: "view_details"
style: "secondary"
- text: "⬆️ Escalate"
action: "escalate"
style: "danger"
blacklist_actions:
buttons:
- row:
- text: "🚨 Acknowledge NOW"
action: "acknowledge"
style: "danger"
- text: "📹 View Live"
action: "view_live"
style: "link"
- row:
- text: "👮 Dispatch Security"
action: "dispatch_security"
style: "danger"
- text: "📋 Details"
action: "view_details"
style: "secondary"
- row:
- text: "⬆️ Escalate to Management"
action: "escalate"
style: "warning"
escalation_actions:
buttons:
- row:
- text: "✅ Acknowledge"
action: "acknowledge"
style: "primary"
- text: "📋 View Original Alert"
action: "view_original"
style: "link"
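The keyboard definitions above are channel-neutral, so they have to be translated into the provider's format before sending. A minimal sketch of that translation for Telegram follows. It assumes every button is sent as a callback button and that `callback_data` is encoded as `action:alert_id`; both choices, along with the names `ALERT_ACTIONS` and `build_reply_markup`, are illustrative and not part of the design. The `style` field is dropped in this sketch.

```python
from typing import Dict, List

# Hypothetical in-memory form of the alert_actions keyboard defined above.
ALERT_ACTIONS: List[List[Dict[str, str]]] = [
    [{"text": "✅ Acknowledge", "action": "acknowledge"},
     {"text": "📹 View Live", "action": "view_live"}],
    [{"text": "📋 Details", "action": "view_details"},
     {"text": "🗺️ Map", "action": "view_map"}],
]


def build_reply_markup(rows: List[List[Dict[str, str]]], alert_id: str) -> Dict:
    """Convert keyboard rows into a Telegram InlineKeyboardMarkup dict."""
    keyboard = []
    for row in rows:
        buttons = []
        for button in row:
            # Assumed encoding: "<action>:<alert_id>".
            callback_data = f"{button['action']}:{alert_id}"
            # Telegram limits callback_data to 64 bytes.
            if len(callback_data.encode("utf-8")) > 64:
                raise ValueError(f"callback_data too long: {callback_data!r}")
            buttons.append({"text": button["text"], "callback_data": callback_data})
        keyboard.append(buttons)
    return {"inline_keyboard": keyboard}


markup = build_reply_markup(ALERT_ACTIONS, "a1b2c3")
callback_values = [b["callback_data"] for row in markup["inline_keyboard"] for b in row]
print(callback_values)
```

The resulting dict can be serialized as the `reply_markup` field of a send request. With a 36-character UUID as the alert ID, the longest action name in 7.4 (`dispatch_security`) still fits within the 64-byte limit.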
8. Retry Logic & Failure Handling
8.1 Retry Configuration
retry_config:
# Global retry settings
max_retries: 5
base_delay_seconds: 2
max_delay_seconds: 300 # 5 minutes
exponential_base: 2
# Jitter to prevent thundering herd
jitter:
enabled: true
max_jitter_seconds: 1
# Per-channel retry settings
channels:
telegram:
max_retries: 5
base_delay: 2
retryable_errors:
- "timeout" # Network timeout
- "rate_limit" # 429 Too Many Requests
- "server_error" # 5xx errors
- "conflict" # 409 Conflict
non_retryable_errors:
- "unauthorized" # 401 — bad token
- "forbidden" # 403 — blocked by user
- "chat_not_found" # Chat doesn't exist
- "bad_request" # 400 — malformed request
whatsapp:
max_retries: 5
base_delay: 3
retryable_errors:
- "timeout"
- "rate_limit"
- "server_error"
- "temporarily_unavailable"
non_retryable_errors:
- "invalid_phone_number" # Bad phone number
- "unsupported_message_type" # Template not approved
- "access_denied" # Auth issue
- "recipient_not_in_allowed_list" # Recipient number is not on the allowed list
# Circuit breaker settings
circuit_breaker:
enabled: true
failure_threshold: 10 # Open after 10 consecutive failures
recovery_timeout_seconds: 60 # Try again after 60 seconds
half_open_max_calls: 3 # Test with 3 calls when half-open
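To make the retry settings concrete, the sketch below computes the un-jittered delay before each retry from the values in the configuration above. The function name `backoff_schedule` is illustrative; the formula is the one stated in the configuration (base delay multiplied by the exponential base raised to the attempt number, capped at the maximum delay).

```python
from typing import List


def backoff_schedule(
    max_retries: int = 5,
    base_delay: float = 2.0,
    exponential_base: float = 2.0,
    max_delay: float = 300.0,
) -> List[float]:
    """Return the un-jittered delay in seconds before each retry."""
    return [
        min(base_delay * exponential_base ** attempt, max_delay)
        for attempt in range(max_retries)
    ]


telegram_delays = backoff_schedule(base_delay=2.0)
whatsapp_delays = backoff_schedule(base_delay=3.0)
print(telegram_delays, sum(telegram_delays))
print(whatsapp_delays, sum(whatsapp_delays))
```

With five retries, a Telegram notification waits 2, 4, 8, 16, and 32 seconds (62 seconds in total) before it is dead-lettered, and a WhatsApp notification waits 93 seconds. Jitter adds at most one second per retry, and the 300-second cap is never reached at these settings.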
8.2 Retry Implementation (Pseudocode)
import asyncio
import random
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Optional
class DeliveryStatus(Enum):
PENDING = "pending"
SENT = "sent"
DELIVERED = "delivered"
FAILED = "failed"
RETRYING = "retrying"
DEAD_LETTER = "dead_letter"
class RetryableError(Exception):
"""Error that can be retried."""
pass
class NonRetryableError(Exception):
"""Error that should not be retried."""
pass
@dataclass
class RetryPolicy:
max_retries: int = 5
base_delay: float = 2.0
max_delay: float = 300.0
exponential_base: float = 2.0
jitter: bool = True
max_jitter: float = 1.0
class RetryManager:
"""Handle retry logic with exponential backoff."""
def __init__(
self,
policy: RetryPolicy,
circuit_breaker: Optional[CircuitBreaker] = None,
dead_letter_queue: Optional[DeadLetterQueue] = None,
delivery_tracker: Optional[DeliveryTracker] = None
):
self.policy = policy
self.circuit_breaker = circuit_breaker
self.dlq = dead_letter_queue
self.tracker = delivery_tracker
async def execute_with_retry(
self,
send_func: Callable,
notification_id: str,
*args,
**kwargs
) -> SendResult:
"""
Execute send function with exponential backoff retry.
Pseudocode:
1. Check circuit breaker — if OPEN, fail immediately
2. Attempt delivery
3. If success — record success, return result
4. If retryable error:
a. Calculate next delay: delay = min(base * 2^attempt, max_delay) + jitter
b. Schedule retry after delay
c. Increment attempt counter
d. If attempt > max_retries → move to dead letter queue
5. If non-retryable error → fail immediately, no retry
"""
last_error = None
for attempt in range(self.policy.max_retries + 1):
# Check circuit breaker
if self.circuit_breaker and self.circuit_breaker.is_open():
return SendResult(
status=DeliveryStatus.FAILED,
error="Circuit breaker is OPEN",
circuit_breaker_open=True
)
try:
# Attempt delivery
result = await send_func(*args, **kwargs)
if result.success:
# Success — record and return
if self.circuit_breaker:
self.circuit_breaker.record_success()
await self._record_success(notification_id, result)
return result
# Check if error is retryable
if not self._is_retryable(result.error_code):
# Non-retryable — fail immediately
await self._record_permanent_failure(
notification_id, result.error, attempt
)
return result
last_error = result.error
except RetryableError as e:
last_error = str(e)
except NonRetryableError as e:
# Non-retryable — fail immediately
await self._record_permanent_failure(
notification_id, str(e), attempt
)
return SendResult(
status=DeliveryStatus.FAILED,
error=str(e),
retryable=False
)
except Exception as e:
last_error = str(e)
if not self._is_retryable_exception(e):
await self._record_permanent_failure(
notification_id, str(e), attempt
)
return SendResult(
status=DeliveryStatus.FAILED,
error=str(e),
retryable=False
)
# Calculate retry delay (exponential backoff with jitter)
if attempt < self.policy.max_retries:
delay = self._calculate_delay(attempt)
# Update status to retrying
await self._record_retry(notification_id, attempt, delay, last_error)
# Wait before retry
await asyncio.sleep(delay)
# All retries exhausted — move to dead letter queue
await self._move_to_dead_letter(
notification_id=notification_id,
error=last_error,
total_attempts=self.policy.max_retries + 1
)
if self.circuit_breaker:
self.circuit_breaker.record_failure()
return SendResult(
status=DeliveryStatus.DEAD_LETTER,
error=f"All {self.policy.max_retries} retries exhausted. Last error: {last_error}",
total_attempts=self.policy.max_retries + 1
)
def _calculate_delay(self, attempt: int) -> float:
"""Calculate delay with exponential backoff and jitter."""
# Exponential: base * 2^attempt
delay = self.policy.base_delay * (self.policy.exponential_base ** attempt)
# Cap at max delay
delay = min(delay, self.policy.max_delay)
if self.policy.jitter:
# Add random jitter to prevent thundering herd
jitter = random.uniform(0, self.policy.max_jitter)
delay += jitter
return delay
def _is_retryable(self, error_code: str) -> bool:
"""Determine if error code is retryable."""
retryable_codes = {
"timeout", "rate_limit", "server_error", "conflict",
"temporarily_unavailable", "network_error", "connection_reset"
}
return error_code in retryable_codes
def _is_retryable_exception(self, exc: Exception) -> bool:
"""Determine if exception is retryable."""
retryable_types = (
asyncio.TimeoutError,
ConnectionError,
ConnectionResetError,
)
return isinstance(exc, retryable_types)
async def _record_retry(
self,
notification_id: str,
attempt: int,
next_delay: float,
error: str
):
"""Record retry attempt in delivery tracker."""
if self.tracker:
await self.tracker.update_status(
notification_id=notification_id,
status=DeliveryStatus.RETRYING,
attempt=attempt,
next_retry_at=datetime.utcnow() + timedelta(seconds=next_delay),
error=error
)
async def _record_success(self, notification_id: str, result: SendResult):
"""Record successful delivery."""
if self.tracker:
await self.tracker.update_status(
notification_id=notification_id,
status=DeliveryStatus.SENT,
provider_message_id=result.message_id,
sent_at=datetime.utcnow()
)
async def _record_permanent_failure(
self,
notification_id: str,
error: str,
attempt: int
):
"""Record permanent (non-retryable) failure."""
if self.tracker:
await self.tracker.update_status(
notification_id=notification_id,
status=DeliveryStatus.FAILED,
error=error,
failed_at=datetime.utcnow(),
total_attempts=attempt + 1,
retryable=False
)
async def _move_to_dead_letter(
self,
notification_id: str,
error: str,
total_attempts: int
):
"""Move notification to dead letter queue."""
if self.dlq:
await self.dlq.enqueue(
notification_id=notification_id,
error=error,
total_attempts=total_attempts,
enqueued_at=datetime.utcnow()
)
if self.tracker:
await self.tracker.update_status(
notification_id=notification_id,
status=DeliveryStatus.DEAD_LETTER,
error=error,
total_attempts=total_attempts,
moved_to_dlq_at=datetime.utcnow()
)
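The retry loop can be exercised without a real provider. The following is a reduced sketch of the same backoff pattern, assuming a single retryable exception type and an injectable `sleep` so the delays can be observed without waiting. `TransientError`, `send_with_backoff`, and the flaky sender are illustrative names and do not appear in the design.

```python
import asyncio
import random
from typing import Awaitable, Callable, List


class TransientError(Exception):
    """Stand-in for a retryable failure such as a timeout or HTTP 429."""


async def send_with_backoff(
    send: Callable[[], Awaitable[str]],
    max_retries: int = 5,
    base_delay: float = 2.0,
    max_delay: float = 300.0,
    max_jitter: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> str:
    """Call send() until it succeeds or max_retries retries have been used."""
    for attempt in range(max_retries + 1):
        try:
            return await send()
        except TransientError:
            if attempt == max_retries:
                raise  # Retries exhausted; the caller would dead-letter here.
            delay = min(base_delay * 2 ** attempt, max_delay)
            delay += random.uniform(0, max_jitter)
            await sleep(delay)
    raise RuntimeError("unreachable")


async def demo():
    calls = {"count": 0}
    waits: List[float] = []

    async def flaky_send() -> str:
        calls["count"] += 1
        if calls["count"] < 3:
            raise TransientError("timeout")
        return "sent"

    async def fake_sleep(seconds: float) -> None:
        waits.append(seconds)  # Record the delay instead of waiting.

    result = await send_with_backoff(flaky_send, sleep=fake_sleep)
    return result, calls["count"], waits


result, attempts, waits = asyncio.run(demo())
print(result, attempts, [round(w, 2) for w in waits])
```

Injecting the sleep function keeps the backoff arithmetic testable; the same approach would allow `RetryManager` to be unit-tested without multi-minute waits.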
8.3 Dead Letter Queue
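import json
from datetime import datetime
from typing import List
class DeadLetterQueue:
"""Queue for notifications that failed after all retries."""
def __init__(self, redis_client, notification_store, queue_manager):
self.redis = redis_client
self.store = notification_store
self.queue_manager = queue_manager # Used by retry_notification to re-submit
self.queue_key = "notifications:dead_letter"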
async def enqueue(self, notification_id: str, error: str, total_attempts: int, enqueued_at: datetime):
"""Add failed notification to DLQ."""
entry = {
"notification_id": notification_id,
"error": error,
"total_attempts": total_attempts,
"enqueued_at": enqueued_at.isoformat(),
"status": "pending_review"
}
await self.redis.lpush(self.queue_key, json.dumps(entry))
async def list_pending(
self,
skip: int = 0,
limit: int = 50
) -> List[DLQEntry]:
"""List notifications in DLQ for admin review."""
entries = await self.redis.lrange(self.queue_key, skip, skip + limit - 1)
return [DLQEntry(**json.loads(e)) for e in entries]
async def retry_notification(self, notification_id: str) -> bool:
"""Manually retry a notification from DLQ."""
# Remove from DLQ
await self._remove_from_queue(notification_id)
# Reset retry counter and re-queue
notification = await self.store.get(notification_id)
notification.retry_count = 0
notification.status = DeliveryStatus.PENDING
# Re-submit to queue manager
await self.queue_manager.enqueue(notification)
return True
async def retry_all(self) -> RetryAllResult:
"""Retry all notifications in DLQ."""
entries = await self.redis.lrange(self.queue_key, 0, -1)
results = {"success": 0, "failed": 0}
for entry_json in entries:
entry = json.loads(entry_json)
try:
await self.retry_notification(entry["notification_id"])
results["success"] += 1
except Exception:
results["failed"] += 1
return RetryAllResult(**results)
async def discard(self, notification_id: str):
"""Permanently discard a notification from DLQ."""
await self._remove_from_queue(notification_id)
await self.store.update_status(
notification_id,
status=DeliveryStatus.FAILED,
discarded_at=datetime.utcnow()
)
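The class above calls `_remove_from_queue` without defining it. One possible implementation is sketched here under the assumption that DLQ entries stay in a Redis list: because `LREM` removes by value, the exact serialized entry has to be located first. `InMemoryList` is a test double written for this example, and `remove_from_queue` is a hypothetical helper name.

```python
import asyncio
import json
from typing import List


class InMemoryList:
    """Test double implementing LRANGE and LREM for a single list key."""

    def __init__(self, items: List[str]):
        self.items = list(items)

    async def lrange(self, key: str, start: int, end: int) -> List[str]:
        stop = None if end == -1 else end + 1
        return self.items[start:stop]

    async def lrem(self, key: str, count: int, value: str) -> int:
        # Only the count=1 case (remove the first match) is modelled here.
        if value in self.items:
            self.items.remove(value)
            return 1
        return 0


async def remove_from_queue(client, queue_key: str, notification_id: str) -> bool:
    """Remove the first DLQ entry whose notification_id matches."""
    for raw in await client.lrange(queue_key, 0, -1):
        if json.loads(raw)["notification_id"] == notification_id:
            # LREM matches by value, so pass back the exact serialized entry.
            removed = await client.lrem(queue_key, 1, raw)
            return removed > 0
    return False


entries = [
    json.dumps({"notification_id": "n-1", "error": "timeout"}),
    json.dumps({"notification_id": "n-2", "error": "rate_limit"}),
]
client = InMemoryList(entries)
removed_existing = asyncio.run(remove_from_queue(client, "notifications:dead_letter", "n-1"))
removed_missing = asyncio.run(remove_from_queue(client, "notifications:dead_letter", "n-9"))
remaining_ids = [json.loads(item)["notification_id"] for item in client.items]
print(removed_existing, removed_missing, remaining_ids)
```

The linear scan is acceptable for a queue that is reviewed manually. If the DLQ is expected to grow large, a hash keyed by notification ID would avoid the scan.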
8.4 Circuit Breaker
from datetime import datetime
class CircuitBreaker:
"""Circuit breaker pattern to prevent cascading failures."""
STATE_CLOSED = "closed" # Normal operation
STATE_OPEN = "open" # Failing fast
STATE_HALF_OPEN = "half_open" # Testing if recovered
def __init__(
self,
failure_threshold: int = 10,
recovery_timeout: int = 60,
half_open_max_calls: int = 3
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = self.STATE_CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
self.half_open_calls = 0
def is_open(self) -> bool:
"""Check if circuit breaker is open (failing fast)."""
if self.state == self.STATE_OPEN:
# Check if recovery timeout has passed
if self.last_failure_time:
elapsed = (datetime.utcnow() - self.last_failure_time).total_seconds()
if elapsed >= self.recovery_timeout:
self.state = self.STATE_HALF_OPEN
self.half_open_calls = 0
return False
return True
return False
def record_success(self):
"""Record a successful call."""
if self.state == self.STATE_HALF_OPEN:
self.half_open_calls += 1
if self.half_open_calls >= self.half_open_max_calls:
# Successfully tested — close the circuit
self.state = self.STATE_CLOSED
self.failure_count = 0
self.half_open_calls = 0
else:
self.failure_count = 0 # A success ends the run of consecutive failures (see failure_threshold in 8.1)
def record_failure(self):
"""Record a failed call."""
self.failure_count += 1
self.last_failure_time = datetime.utcnow()
if self.state == self.STATE_HALF_OPEN:
# Failed during testing — re-open
self.state = self.STATE_OPEN
elif self.failure_count >= self.failure_threshold:
# Threshold reached — open circuit
self.state = self.STATE_OPEN
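The state transitions can be traced with a condensed breaker that takes an injectable clock. This is a sketch for illustration only: `MiniBreaker` is not part of the design, it uses small thresholds so the trace stays short, and it resets the failure count on success to match the "consecutive failures" wording in 8.1.

```python
from typing import Callable, List, Optional


class MiniBreaker:
    """Condensed circuit breaker with an injectable clock."""

    def __init__(self, failure_threshold: int, recovery_timeout: float,
                 half_open_max_calls: int, clock: Callable[[], float]):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at: Optional[float] = None
        self.trial_successes = 0

    def allow(self) -> bool:
        """Return True if a call may proceed; may move open -> half_open."""
        if self.state == "open" and self.clock() - self.opened_at >= self.recovery_timeout:
            self.state = "half_open"
            self.trial_successes = 0
        return self.state != "open"

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = self.clock()

    def record_success(self) -> None:
        if self.state == "half_open":
            self.trial_successes += 1
            if self.trial_successes >= self.half_open_max_calls:
                self.state = "closed"
                self.failures = 0
        else:
            self.failures = 0


now = [0.0]  # Mutable fake clock, in seconds.
breaker = MiniBreaker(failure_threshold=3, recovery_timeout=60.0,
                      half_open_max_calls=2, clock=lambda: now[0])
trace: List[object] = []
for _ in range(3):
    breaker.record_failure()
trace.append(breaker.state)    # Threshold reached.
trace.append(breaker.allow())  # Still inside the recovery timeout.
now[0] = 61.0
trace.append(breaker.allow())  # Timeout elapsed, trial calls allowed.
trace.append(breaker.state)
breaker.record_success()
breaker.record_success()
trace.append(breaker.state)    # Enough trial successes to close.
print(trace)
```

A fake clock avoids waiting for the recovery timeout and makes each transition deterministic.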
9. Rate Limiting & Throttling
9.1 Rate Limit Configuration
rate_limiting:
# Global rate limit (across all channels)
global:
enabled: true
max_messages_per_minute: 200
burst_capacity: 50
strategy: "token_bucket"
# Per-channel rate limits
per_channel:
telegram:
max_messages_per_second: 30
max_messages_per_minute: 600
burst_capacity: 30
# Per-chat limits
per_chat:
max_messages_per_second: 1
min_interval_seconds: 1.0 # Consistent with 1 message per second per chat
whatsapp:
max_messages_per_second: 80 # Business tier limit
max_messages_per_minute: 4800
burst_capacity: 200
# Per-recipient limits
per_recipient:
max_messages_per_minute: 20 # Avoid spam perception
# Per-camera burst handling
per_source:
camera:
max_alerts_per_minute: 30 # Prevent camera spam
burst_window_seconds: 60
cooldown_after_burst_seconds: 300 # 5 min cooldown
# Alert deduplication
deduplication:
enabled: true
window_minutes: 5 # Dedup within 5 minutes
keys:
- "camera_id + person_id + event_type"
- "camera_id + event_type" # For unknown persons
# Deduplication actions
on_duplicate:
action: "suppress" # Options: suppress, batch, increment_counter
batch_window_seconds: 30 # Batch duplicates within 30s
counter_template: " (+{count} more)"
# Queue configuration
queues:
high_priority:
name: "notifications:priority:high"
max_size: 10000
workers: 5
normal_priority:
name: "notifications:priority:normal"
max_size: 50000
workers: 3
low_priority:
name: "notifications:priority:low"
max_size: 100000
workers: 2
9.2 Token Bucket Implementation
import asyncio
from dataclasses import dataclass
from typing import Dict, Optional
import time
@dataclass
class TokenBucket:
"""Token bucket for rate limiting."""
capacity: float # Maximum tokens
refill_rate: float # Tokens per second
tokens: float = 0
last_refill: float = 0
def __post_init__(self):
self.tokens = self.capacity
self.last_refill = time.monotonic()
def consume(self, tokens: float = 1) -> bool:
"""Try to consume tokens. Returns True if allowed."""
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def _refill(self):
"""Refill tokens based on elapsed time."""
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(
self.capacity,
self.tokens + elapsed * self.refill_rate
)
self.last_refill = now
def time_until_available(self) -> float:
"""Seconds until at least 1 token is available."""
self._refill()
if self.tokens >= 1:
return 0
return (1 - self.tokens) / self.refill_rate
class RateLimiter:
"""Multi-level rate limiter. The distributed checks below use fixed-window Redis counters; TokenBucket is kept for in-process limits."""
def __init__(self, redis_client):
self.redis = redis_client
self.buckets: Dict[str, TokenBucket] = {}
async def check_rate_limit(
self,
channel: str,
recipient: Optional[str] = None,
source: Optional[str] = None
) -> RateLimitResult:
"""
Check all applicable rate limits.
Levels checked (in order):
1. Global rate limit
2. Per-channel rate limit
3. Per-recipient rate limit (if applicable)
4. Per-source rate limit (if applicable)
"""
checks = []
# 1. Global check
global_allowed = await self._check_global_limit()
checks.append(("global", global_allowed))
# 2. Channel check
channel_allowed = await self._check_channel_limit(channel)
checks.append(("channel", channel_allowed))
# 3. Per-recipient check
if recipient:
recipient_allowed = await self._check_recipient_limit(channel, recipient)
checks.append(("recipient", recipient_allowed))
# 4. Per-source check
if source:
source_allowed = await self._check_source_limit(source)
checks.append(("source", source_allowed))
# All must pass
all_allowed = all(allowed for _, allowed in checks)
if all_allowed:
# Each _check_* call above already incremented its Redis counter,
# so no separate consume step is needed here
return RateLimitResult(allowed=True, limiting_factor=None)
# Find which limit was hit
for name, allowed in checks:
if not allowed:
retry_after = await self._get_retry_after(name, channel, recipient)
return RateLimitResult(
allowed=False,
limiting_factor=name,
retry_after_seconds=retry_after
)
return RateLimitResult(allowed=False, limiting_factor="unknown")
async def _check_global_limit(self) -> bool:
"""Check global rate limit using Redis for distributed counting."""
key = "rate_limit:global"
current = await self.redis.incr(key)
if current == 1:
await self.redis.expire(key, 60) # 1-minute window
return current <= 200 # 200 messages per minute
async def _check_channel_limit(self, channel: str) -> bool:
"""Check per-channel rate limit."""
key = f"rate_limit:channel:{channel}"
current = await self.redis.incr(key)
if current == 1:
await self.redis.expire(key, 1) # 1-second window
limits = {"telegram": 30, "whatsapp": 80}
return current <= limits.get(channel, 30)
async def _check_recipient_limit(self, channel: str, recipient: str) -> bool:
"""Check per-recipient rate limit."""
key = f"rate_limit:recipient:{channel}:{recipient}"
current = await self.redis.incr(key)
if current == 1:
await self.redis.expire(key, 60)
return current <= 20 # 20 messages per minute per recipient
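The `_check_*` methods above increment their counter on every call, so a message rejected at one level still uses quota at the levels checked before it. A possible alternative is to inspect all levels first and commit only when every level has room. The sketch below shows that idea in memory with an injectable clock; `FixedWindowLimits` and `try_acquire` are illustrative names, and a distributed version would need the check and commit to run atomically (for example inside a Redis script), which is not shown.

```python
import time
from typing import Callable, Dict, List, Tuple

Limit = Tuple[str, int, int]  # (counter key, max count, window length in seconds)


class FixedWindowLimits:
    """In-memory fixed-window counters with check-then-commit semantics."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self.clock = clock
        self.counters: Dict[str, Tuple[int, int]] = {}  # key -> (window index, count)

    def _current(self, key: str, window_seconds: int) -> Tuple[int, int]:
        window = int(self.clock() // window_seconds)
        stored_window, count = self.counters.get(key, (window, 0))
        # A counter from an earlier window counts as empty.
        return window, (count if stored_window == window else 0)

    def try_acquire(self, limits: List[Limit]) -> Tuple[bool, str]:
        """Return (allowed, limiting key). Counters change only on success."""
        pending = []
        for key, max_count, window_seconds in limits:
            window, count = self._current(key, window_seconds)
            if count >= max_count:
                return False, key
            pending.append((key, window, count))
        for key, window, count in pending:
            self.counters[key] = (window, count + 1)
        return True, ""


now = [0.0]
limiter = FixedWindowLimits(clock=lambda: now[0])
limits = [("global", 200, 60), ("recipient:telegram:42", 2, 60)]
results = [limiter.try_acquire(limits) for _ in range(3)]
global_count = limiter.counters["global"][1]
now[0] = 60.0  # Move into the next one-minute window.
after_window = limiter.try_acquire(limits)
print(results, global_count, after_window)
```

The third request is rejected by the per-recipient limit, and the global counter stays at 2 because the rejected request did not consume global quota.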
9.3 Alert Deduplication
class AlertDeduplicator:
"""Deduplicate alerts within a time window."""
def __init__(self, redis_client, window_minutes: int = 5):
self.redis = redis_client
self.window_seconds = window_minutes * 60
def _generate_dedup_key(self, alert: AlertEvent) -> str:
"""Generate deduplication key from alert attributes."""
if alert.person_id:
# Deduplicate by camera + person + event type
key_parts = ["dedup", alert.camera_id, alert.person_id, alert.event_type]
else:
# Deduplicate by camera + event type (for unknown persons)
key_parts = ["dedup", alert.camera_id, alert.event_type]
return ":".join(key_parts)
async def check_duplicate(self, alert: AlertEvent) -> DedupResult:
"""Check if this alert is a duplicate within the window."""
key = self._generate_dedup_key(alert)
# Check if key exists
exists = await self.redis.exists(key)
if exists:
# Increment counter for this dedup key
count = await self.redis.incr(f"{key}:count")
return DedupResult(
is_duplicate=True,
suppressed=True,
original_alert_id=await self.redis.get(key),
duplicate_count=count
)
# Store alert ID with TTL
await self.redis.setex(key, self.window_seconds, alert.id)
await self.redis.setex(f"{key}:count", self.window_seconds, 0) # Start at 0 so the first duplicate is counted as 1
return DedupResult(is_duplicate=False)
async def update_with_count(self, alert_id: str) -> Optional[str]:
"""
If alert was sent and duplicates were suppressed,
append counter to indicate how many similar alerts were suppressed.
"""
# This would be called after sending to update the original message
# with a count of suppressed duplicates
pass
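The `exists` check followed by `setex` in `check_duplicate` leaves a gap in which two workers can both treat the same alert as the first occurrence. A possible way to close that gap is a single `SET` with the `NX` and `EX` options, which stores the key only if it is absent. The sketch below assumes a client whose `set` accepts `ex` and `nx` keyword arguments (as redis-py does) and uses an in-memory stand-in so it can run without a server. `FakeRedis` and `claim_or_count` are illustrative names, and the stand-in does not model expiry.

```python
import asyncio
from typing import Dict, Optional, Tuple


class FakeRedis:
    """In-memory stand-in for the three commands used below (no real expiry)."""

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}

    async def set(self, key: str, value: str, ex: Optional[int] = None,
                  nx: bool = False) -> Optional[bool]:
        if nx and key in self.data:
            return None  # Mirrors SET ... NX when the key already exists.
        self.data[key] = value
        return True

    async def get(self, key: str) -> Optional[str]:
        return self.data.get(key)

    async def incr(self, key: str) -> int:
        self.data[key] = str(int(self.data.get(key, "0")) + 1)
        return int(self.data[key])


async def claim_or_count(client, key: str, alert_id: str,
                         window_seconds: int) -> Tuple[bool, Optional[str], int]:
    """Return (is_duplicate, original_alert_id, suppressed_count)."""
    # One atomic command decides which alert owns the dedup window.
    claimed = await client.set(key, alert_id, ex=window_seconds, nx=True)
    if claimed:
        return False, alert_id, 0
    original = await client.get(key)
    suppressed = await client.incr(f"{key}:suppressed")
    return True, original, suppressed


async def demo():
    client = FakeRedis()
    key = "dedup:cam-3:person-17:watchlist_match"
    return [
        await claim_or_count(client, key, "alert-1", 300),
        await claim_or_count(client, key, "alert-2", 300),
        await claim_or_count(client, key, "alert-3", 300),
    ]


outcomes = asyncio.run(demo())
print(outcomes)
```

In a real deployment the suppressed counter would also need a TTL matching the dedup window so that it does not outlive the key it belongs to.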
10. Delivery Tracking
10.1 Delivery Status Model
from enum import Enum
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel
class DeliveryStatus(str, Enum):
PENDING = "pending" # Queued, not yet sent
SENT = "sent" # API request successful
DELIVERED = "delivered" # Provider confirmed delivery
READ = "read" # Recipient opened/read
ENGAGED = "engaged" # User interacted (button click)
FAILED = "failed" # Permanently failed
RETRYING = "retrying" # Scheduled for retry
DEAD_LETTER = "dead_letter" # Moved to DLQ
SUPPRESSED = "suppressed" # Quiet hours or dedup
CANCELLED = "cancelled" # Cancelled (e.g., acknowledged before send)
class NotificationDelivery(BaseModel):
"""Full delivery record for a notification."""
# IDs
notification_id: str # Internal UUID
alert_id: str # Parent alert ID
channel: str # "telegram" | "whatsapp"
recipient_id: str # Chat ID or phone number
recipient_name: Optional[str]
# Template info
template_id: str
template_rendered_text: str
# Status timeline
status: DeliveryStatus
status_history: List["StatusTransition"] # Forward reference: StatusTransition is defined below
# Timestamps
created_at: datetime # Notification created
queued_at: Optional[datetime]
sent_at: Optional[datetime]
delivered_at: Optional[datetime]
read_at: Optional[datetime]
failed_at: Optional[datetime]
acknowledged_at: Optional[datetime]
# Provider info
provider_message_id: Optional[str] # Telegram message_id or WhatsApp message ID
provider_error: Optional[str]
provider_error_code: Optional[str]
# Retry info
retry_count: int = 0
max_retries: int = 5
next_retry_at: Optional[datetime]
# Media
media_attached: bool = False
media_type: Optional[str] # "image" | "video"
media_url: Optional[str]
# Escalation
escalation_level: int = 0
escalation_triggered_at: Optional[datetime]
# Context
quiet_hours_suppressed: bool = False
deduplicated: bool = False
duplicate_of: Optional[str]
bypassed_quiet_hours: bool = False
class StatusTransition(BaseModel):
from_status: DeliveryStatus
to_status: DeliveryStatus
timestamp: datetime
reason: Optional[str]
# Database schema (PostgreSQL)
NOTIFICATIONS_TABLE = """
CREATE TABLE notifications (
notification_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
alert_id UUID NOT NULL REFERENCES alerts(alert_id),
channel VARCHAR(20) NOT NULL CHECK (channel IN ('telegram', 'whatsapp')),
recipient_id VARCHAR(100) NOT NULL,
recipient_name VARCHAR(255),
template_id VARCHAR(100) NOT NULL,
rendered_text TEXT,
status VARCHAR(30) NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
queued_at TIMESTAMPTZ,
sent_at TIMESTAMPTZ,
delivered_at TIMESTAMPTZ,
read_at TIMESTAMPTZ,
failed_at TIMESTAMPTZ,
acknowledged_at TIMESTAMPTZ,
provider_message_id VARCHAR(255),
provider_error TEXT,
provider_error_code VARCHAR(100),
retry_count INT NOT NULL DEFAULT 0,
next_retry_at TIMESTAMPTZ,
media_attached BOOLEAN NOT NULL DEFAULT FALSE,
media_type VARCHAR(20),
media_url TEXT,
escalation_level INT NOT NULL DEFAULT 0,
quiet_hours_suppressed BOOLEAN NOT NULL DEFAULT FALSE,
deduplicated BOOLEAN NOT NULL DEFAULT FALSE,
duplicate_of UUID,
bypassed_quiet_hours BOOLEAN NOT NULL DEFAULT FALSE,
-- Constraints
CONSTRAINT valid_status CHECK (status IN (
'pending', 'sent', 'delivered', 'read', 'engaged',
'failed', 'retrying', 'dead_letter', 'suppressed', 'cancelled'
))
);
-- Indexes for common queries
CREATE INDEX idx_notifications_alert_id ON notifications(alert_id);
CREATE INDEX idx_notifications_status ON notifications(status);
CREATE INDEX idx_notifications_channel ON notifications(channel);
CREATE INDEX idx_notifications_created_at ON notifications(created_at);
CREATE INDEX idx_notifications_recipient ON notifications(channel, recipient_id);
-- Status history table
CREATE TABLE notification_status_history (
history_id BIGSERIAL PRIMARY KEY,
notification_id UUID NOT NULL REFERENCES notifications(notification_id),
from_status VARCHAR(30),
to_status VARCHAR(30) NOT NULL,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
reason TEXT,
metadata JSONB
);
CREATE INDEX idx_status_history_notification ON notification_status_history(notification_id);
"""
10.2 Delivery Status API
# GET /delivery-status/{alert_id}
async def get_delivery_status(alert_id: str) -> DeliveryStatusResponse:
"""
Get delivery status for all notifications of an alert.
Response:
{
"alert_id": "uuid",
"event_type": "person_detected",
"total_notifications": 5,
"summary": {
"pending": 0,
"sent": 2,
"delivered": 2,
"read": 1,
"failed": 0,
"retrying": 0,
"dead_letter": 0,
"suppressed": 0
},
"notifications": [
{
"notification_id": "uuid-1",
"channel": "telegram",
"recipient": "Security Team (@sec_ops)",
"status": "delivered",
"provider_message_id": "12345",
"timestamps": {
"created": "2024-06-15T14:32:18Z",
"sent": "2024-06-15T14:32:19Z",
"delivered": "2024-06-15T14:32:20Z"
},
"retry_count": 0,
"media_attached": true,
"escalation_level": 0
},
{
"notification_id": "uuid-2",
"channel": "whatsapp",
"recipient": "+12345678901",
"status": "sent",
"provider_message_id": "wamid.xxx",
"timestamps": {
"created": "2024-06-15T14:32:18Z",
"sent": "2024-06-15T14:32:19Z"
},
"retry_count": 0,
"media_attached": true,
"escalation_level": 0
}
]
}
"""
...
# GET /notifications/{notification_id}/status
async def get_notification_status(notification_id: str) -> NotificationStatus:
"""Get detailed status for a single notification including history."""
...
# GET /delivery-status with filters
async def list_delivery_status(
alert_id: Optional[str] = None,
channel: Optional[str] = None,
status: Optional[str] = None,
recipient: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
skip: int = 0,
limit: int = 50
) -> PaginatedDeliveryList:
"""List delivery statuses with filtering."""
...
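The `summary` object in the response can be derived directly from the per-notification statuses. A minimal sketch follows, assuming the summary lists the same eight statuses as the example response. `SUMMARY_STATUSES` and `summarize` are illustrative names.

```python
from collections import Counter
from typing import Dict, Iterable

# Statuses reported in the summary, in the order used by the example response.
SUMMARY_STATUSES = [
    "pending", "sent", "delivered", "read",
    "failed", "retrying", "dead_letter", "suppressed",
]


def summarize(statuses: Iterable[str]) -> Dict[str, int]:
    """Count notifications per status, reporting zero for unused statuses."""
    counts = Counter(statuses)
    return {status: counts.get(status, 0) for status in SUMMARY_STATUSES}


statuses = ["sent", "sent", "delivered", "delivered", "read"]
summary = summarize(statuses)
total_notifications = len(statuses)
print(total_notifications, summary)
```

Computing both values from the same list keeps `total_notifications` equal to the sum of the summary counts, provided every status that can occur is included in `SUMMARY_STATUSES`.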
10.3 Dashboard Display
dashboard_display:
# Real-time delivery status widget
status_widget:
layout: "timeline"
elements:
- type: "status_badge"
states:
pending: { color: "gray", icon: "clock" }
sent: { color: "blue", icon: "paper-plane" }
delivered: { color: "green", icon: "check-circle" }
read: { color: "purple", icon: "eye" }
failed: { color: "red", icon: "times-circle" }
retrying: { color: "orange", icon: "sync" }
dead_letter: { color: "dark-red", icon: "exclamation-triangle" }
suppressed: { color: "yellow", icon: "ban" }
- type: "timeline"
show_transitions: true
timestamps:
- created: "Creation time"
- queued: "Queued for delivery"
- sent: "Sent to provider"
- delivered: "Delivered to device"
- read: "Read by recipient"
- type: "error_display"
show_for: ["failed", "dead_letter", "retrying"]
fields:
- error_message
- error_code
- retry_count
- next_retry_time
# Aggregated statistics
statistics_panel:
metrics:
- "Delivery rate (24h)"
- "Average delivery time"
- "Failure rate by channel"
- "Top failed recipients"
- "Dead letter queue size"
- "Retry queue depth"
11. Escalation Engine
11.1 Escalation Configuration
escalation_config:
# Global settings
enabled: true
default_threshold_minutes: 15
max_escalation_levels: 3
# Per-severity thresholds
thresholds:
critical:
level_1: 5 # 5 minutes
level_2: 10 # 10 minutes
level_3: 20 # 20 minutes
high:
level_1: 15 # 15 minutes
level_2: 30 # 30 minutes
level_3: 60 # 60 minutes
medium:
level_1: 30 # 30 minutes
level_2: 60 # 60 minutes
level_3: 120 # 2 hours
low:
level_1: 60 # 1 hour
level_2: 120 # 2 hours
level_3: 240 # 4 hours
# Level definitions
levels:
level_1:
name: "Primary Escalation"
description: "Notify additional recipients"
action: "expand_recipients"
config:
add_groups: ["management"]
channels: ["telegram", "whatsapp"]
level_2:
name: "Secondary Escalation"
description: "Escalate to higher severity, notify all"
action: "escalate_severity"
config:
severity_increase: 1 # low->medium, medium->high, high->critical
add_groups: ["management"]
channels: ["telegram", "whatsapp"]
repeat_to_original: true # Re-notify original recipients
level_3:
name: "Final Escalation"
description: "Maximum escalation — all hands"
action: "all_hands"
config:
severity: "critical"
notify_all_groups: true
channels: ["telegram", "whatsapp"]
bypass_quiet_hours: true
include_audit_trail: true
# Acknowledgment settings
acknowledgment:
channels:
- "telegram_inline_button" # Ack via Telegram button
- "whatsapp_quick_reply" # Ack via WhatsApp quick reply
- "dashboard_button" # Ack via web dashboard
- "api_call" # Ack via API
require_ack_for_severities: ["high", "critical"]
auto_acknowledge_after_minutes: null # Never auto-ack for security
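The configuration does not state whether each threshold is measured from the original alert or from the previous escalation. The `handle_escalation_timer` method in 11.2 schedules each follow-up check relative to the moment the previous timer fired, so under that reading the thresholds accumulate. The sketch below works out the resulting timeline. `THRESHOLDS` copies two severities from the configuration, and `escalation_offsets` is an illustrative name.

```python
from itertools import accumulate
from typing import Dict, List

# Per-level thresholds in minutes, copied from the configuration above.
THRESHOLDS: Dict[str, List[int]] = {
    "critical": [5, 10, 20],
    "high": [15, 30, 60],
}


def escalation_offsets(severity: str) -> List[int]:
    """Minutes after the first send at which each level fires, assuming each
    threshold is counted from the previous escalation."""
    return list(accumulate(THRESHOLDS[severity]))


critical_offsets = escalation_offsets("critical")
high_offsets = escalation_offsets("high")
print(critical_offsets, high_offsets)
```

Under this reading an unacknowledged critical alert reaches level 3 after 35 minutes rather than 20. If the thresholds are meant as offsets from the original alert, the scheduler would need to subtract the time already elapsed before scheduling the next check.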
11.2 Escalation Engine Implementation
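import logging
from datetime import datetime, timedelta
from typing import Dict, List
logger = logging.getLogger(__name__)
class EscalationEngine:
"""Handle alert escalation for unacknowledged notifications."""
def __init__(
self,
config: EscalationConfig,
notification_service: NotificationService,
timer_scheduler: TimerScheduler,
delivery_tracker: DeliveryTracker,
alert_store
):
self.config = config
self.notification_service = notification_service
self.scheduler = timer_scheduler
self.tracker = delivery_tracker
self.alert_store = alert_store # Used by _escalate, acknowledge_alert, and _is_acknowledged
self.active_timers: Dict[str, str] = {} # alert_id -> timer_id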
async def start_escalation_timer(self, alert: AlertEvent, notifications: List[Notification]):
"""Start escalation timer when alert is first sent."""
if alert.severity not in self.config.acknowledgment.require_ack_for_severities:
return
# Get threshold for this severity and level 1
threshold = self.config.get_threshold(alert.severity, level=1)
# Schedule escalation check
timer_id = await self.scheduler.schedule(
task="escalation_check",
execute_at=datetime.utcnow() + timedelta(minutes=threshold),
payload={
"alert_id": alert.id,
"current_level": 0,
"severity": alert.severity
}
)
self.active_timers[alert.id] = timer_id
# Store timer reference on alert
await self.tracker.set_escalation_timer(alert.id, timer_id, threshold)
async def handle_escalation_timer(self, payload: dict):
"""Handle escalation timer firing."""
alert_id = payload["alert_id"]
current_level = payload["current_level"]
# Check if alert has been acknowledged
if await self._is_acknowledged(alert_id):
logger.info(f"Alert {alert_id} acknowledged — canceling escalation")
return
# Check if max level reached
if current_level >= self.config.max_escalation_levels:
logger.info(f"Alert {alert_id} reached max escalation level")
return
# Perform escalation
next_level = current_level + 1
await self._escalate(alert_id, next_level)
# Schedule next escalation if not at max
if next_level < self.config.max_escalation_levels:
severity = payload["severity"]
next_threshold = self.config.get_threshold(severity, level=next_level + 1)
timer_id = await self.scheduler.schedule(
task="escalation_check",
execute_at=datetime.utcnow() + timedelta(minutes=next_threshold),
payload={
"alert_id": alert_id,
"current_level": next_level,
"severity": severity
}
)
self.active_timers[alert_id] = timer_id
async def _escalate(self, alert_id: str, level: int):
"""Execute escalation for the given level."""
level_config = self.config.levels.get(f"level_{level}")
if not level_config:
return
alert = await self.alert_store.get(alert_id)
logger.warning(f"Escalating alert {alert_id} to level {level}: {level_config.name}")
if level_config.action == "expand_recipients":
# Add additional recipient groups
await self.notification_service.send_escalation(
alert=alert,
level=level,
recipient_groups=level_config.config["add_groups"],
channels=level_config.config["channels"],
template="escalation_notice"
)
elif level_config.action == "escalate_severity":
# Increase severity
new_severity = self._increase_severity(alert.severity)
await self.alert_store.update_severity(alert_id, new_severity)
# Re-notify original recipients with higher severity
if level_config.config.get("repeat_to_original"):
await self.notification_service.send_escalation(
alert=alert,
level=level,
recipient_groups=level_config.config["add_groups"],
channels=level_config.config["channels"],
template="escalation_notice",
severity_override=new_severity
)
elif level_config.action == "all_hands":
# Notify all configured groups
await self.notification_service.send_escalation(
alert=alert,
level=level,
notify_all_groups=True,
channels=level_config.config["channels"],
template="escalation_notice",
severity_override="critical",
bypass_quiet_hours=True
)
# Update escalation tracking
await self.tracker.record_escalation(alert_id, level, datetime.utcnow())
async def acknowledge_alert(
self,
alert_id: str,
acknowledged_by: str,
channel: str
) -> AckResult:
"""Acknowledge an alert, canceling any pending escalation."""
# Update alert status
await self.alert_store.acknowledge(alert_id, acknowledged_by, channel)
# Cancel escalation timer
if alert_id in self.active_timers:
timer_id = self.active_timers[alert_id]
await self.scheduler.cancel(timer_id)
del self.active_timers[alert_id]
# Update all pending notifications for this alert
await self.tracker.acknowledge_all(alert_id, acknowledged_by, datetime.utcnow())
# Notify other recipients that alert was acknowledged
await self._notify_acknowledgment(alert_id, acknowledged_by)
return AckResult(success=True, acknowledged_at=datetime.utcnow())
async def _is_acknowledged(self, alert_id: str) -> bool:
"""Check if alert has been acknowledged."""
alert = await self.alert_store.get(alert_id)
return alert.acknowledged_at is not None
def _increase_severity(self, current: str) -> str:
"""Increase severity by one level."""
order = ["low", "medium", "high", "critical"]
idx = order.index(current)
return order[min(idx + 1, len(order) - 1)]
12. Media Attachments
12.1 Media Processing Pipeline
Detection Frame/Clip
|
v
[Store Original] --> MinIO/S3 (full resolution)
|
v
[Process for Channels]
|
+---> [Telegram Image] --> Resize to 1280x720 --> Generate presigned URL
| Format: JPEG, Quality: 85
|
+---> [WhatsApp Image] --> Resize to 1600x900 --> Generate presigned URL
| Format: JPEG, Quality: 80
| Max: 16MB
|
+---> [Telegram Video] --> Compress to 50MB limit
| Resolution: 1280x720
| Duration: max 10 seconds
| Format: MP4 (H.264)
|
+---> [WhatsApp Video] --> Compress to 16MB limit
Resolution: 1280x720
Duration: max 10 seconds
Format: MP4 (H.264)
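The size limits in the pipeline translate into a bitrate budget once the clip duration is fixed. The sketch below works through that arithmetic. It assumes 1 MB means 1024 × 1024 bytes (as in the image size check in 12.2), reserves a 10% safety margin for container overhead, and sets aside 64 kbps for audio; the margin, the audio figure, and the name `target_video_kbps` are illustrative choices, not values from the design.

```python
def target_video_kbps(
    max_size_mb: float,
    duration_sec: float,
    audio_kbps: int = 64,
    safety: float = 0.9,
) -> int:
    """Video bitrate (kbit/s) that keeps a clip under max_size_mb."""
    total_bits = max_size_mb * 1024 * 1024 * 8 * safety
    total_kbps = total_bits / duration_sec / 1000
    # Reserve the audio share; whatever remains is the video budget.
    return int(total_kbps - audio_kbps)


whatsapp_10s = target_video_kbps(max_size_mb=16, duration_sec=10)
telegram_10s = target_video_kbps(max_size_mb=50, duration_sec=10)
whatsapp_60s = target_video_kbps(max_size_mb=16, duration_sec=60)
print(whatsapp_10s, telegram_10s, whatsapp_60s)
```

For a 10-second clip, both channels allow more than 12,000 kbps, so the size limit is unlikely to constrain a 1280x720 clip at that length. At 60 seconds the WhatsApp budget falls to roughly 1,950 kbps, which is where the compression step starts to matter.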
12.2 Media Processor Implementation
import subprocess
import os
from PIL import Image
from typing import Optional, Tuple
from uuid import uuid4
class MediaProcessor:
"""Process media attachments for different channels."""
# Channel-specific limits
LIMITS = {
"telegram": {
"image": {"max_size_mb": 10, "max_dimensions": (2560, 2560)},
"video": {"max_size_mb": 50, "max_duration_sec": 60},
},
"whatsapp": {
"image": {"max_size_mb": 16, "max_dimensions": (1920, 1080)},
"video": {"max_size_mb": 16, "max_duration_sec": 60},
}
}
def __init__(self, storage: ObjectStorage, temp_dir: str = "/tmp/media"):
self.storage = storage
self.temp_dir = temp_dir
os.makedirs(temp_dir, exist_ok=True)
async def process_image(
self,
source_url: str,
channel: str,
max_dimensions: Optional[Tuple[int, int]] = None
) -> ProcessedMedia:
"""
Process image for specified channel.
Steps:
1. Download from source
2. Resize if exceeds max dimensions
3. Compress to meet size limit
4. Upload to object storage
5. Generate presigned URL
"""
limits = self.LIMITS[channel]["image"]
max_dims = max_dimensions or limits["max_dimensions"]
# Download
local_path = await self._download(source_url)
try:
# Open and process
with Image.open(local_path) as img:
# Convert to RGB if necessary
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
# Resize if needed
if img.width > max_dims[0] or img.height > max_dims[1]:
img.thumbnail(max_dims, Image.LANCZOS)
# Save with quality optimization
output_path = os.path.join(self.temp_dir, f"{uuid4()}.jpg")
# Iteratively reduce quality until under size limit
quality = 90
while quality >= 50:
img.save(output_path, "JPEG", quality=quality, optimize=True)
size_mb = os.path.getsize(output_path) / (1024 * 1024)
if size_mb <= limits["max_size_mb"]:
break
quality -= 5
# Upload to storage
storage_key = f"processed/images/{channel}/{uuid4()}.jpg"
await self.storage.upload(output_path, storage_key)
# Generate presigned URL (valid for 1 hour)
presigned_url = await self.storage.presigned_url(
storage_key,
expiration=3600
)
return ProcessedMedia(
url=presigned_url,
storage_key=storage_key,
size_bytes=os.path.getsize(output_path),
width=img.width,
height=img.height,
mime_type="image/jpeg"
)
finally:
# Clean up temp files; output_path is unset if the failure happened before the save step
self._cleanup(local_path, locals().get("output_path"))
async def process_video(
self,
source_url: str,
channel: str,
max_duration: int = 10,
target_resolution: Tuple[int, int] = (1280, 720)
) -> ProcessedMedia:
"""
Process video clip for specified channel.
Steps:
1. Download from source
2. Trim to max duration
3. Resize to target resolution
4. Compress to meet size limit
5. Upload to object storage
6. Generate presigned URL
"""
limits = self.LIMITS[channel]["video"]
max_size_mb = limits["max_size_mb"]
# Download
local_path = await self._download(source_url)
output_path = os.path.join(self.temp_dir, f"{uuid4()}.mp4")
try:
# Get video info
probe = await self._ffprobe(local_path)
duration = float(probe["format"]["duration"])
# Determine target bitrate to fit within size limit
target_size_bits = (max_size_mb * 0.9) * 8 * 1024 * 1024 # 90% of limit
effective_duration = min(duration, max_duration)
target_bitrate = int(target_size_bits / effective_duration)
# Two-pass encoding for better quality
await self._ffmpeg_encode(
input_path=local_path,
output_path=output_path,
duration=min(duration, max_duration),
resolution=target_resolution,
video_bitrate=f"{target_bitrate // 2}", # Half of the budget for video; leaves headroom for audio and container overhead
audio_bitrate="64k"
)
# Verify output size
output_size_mb = os.path.getsize(output_path) / (1024 * 1024)
if output_size_mb > max_size_mb:
# Reduce quality further
target_bitrate = int(target_bitrate * 0.7)
await self._ffmpeg_encode(
input_path=local_path,
output_path=output_path,
duration=min(duration, max_duration),
resolution=target_resolution,
video_bitrate=f"{target_bitrate // 2}",
audio_bitrate="32k"
)
# Upload to storage
storage_key = f"processed/videos/{channel}/{uuid4()}.mp4"
await self.storage.upload(output_path, storage_key)
presigned_url = await self.storage.presigned_url(storage_key, expiration=3600)
return ProcessedMedia(
url=presigned_url,
storage_key=storage_key,
size_bytes=os.path.getsize(output_path),
width=target_resolution[0],
height=target_resolution[1],
duration=effective_duration,
mime_type="video/mp4"
)
finally:
self._cleanup(local_path, output_path)
async def _ffmpeg_encode(
self,
input_path: str,
output_path: str,
duration: float,
resolution: Tuple[int, int],
video_bitrate: str,
audio_bitrate: str
):
"""Run FFmpeg to encode video."""
cmd = [
"ffmpeg",
"-y", # Overwrite output
"-i", input_path, # Input
"-t", str(duration), # Duration limit
"-vf", f"scale={resolution[0]}:{resolution[1]}", # Resolution
"-c:v", "libx264", # Video codec
"-b:v", video_bitrate, # Video bitrate
"-preset", "fast", # Encoding speed
"-c:a", "aac", # Audio codec
"-b:a", audio_bitrate, # Audio bitrate
"-movflags", "+faststart", # Web optimization
"-f", "mp4", # Output format
output_path
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise MediaProcessingError(f"FFmpeg failed: {stderr.decode()}")
async def cleanup_expired_media(self, max_age_hours: int = 24):
"""Clean up processed media older than specified age."""
cutoff = datetime.utcnow() - timedelta(hours=max_age_hours)
await self.storage.delete_before("processed/", cutoff)
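The bitrate budget computed in process_video can be checked by hand. The sketch below reproduces the same arithmetic (90% of the channel size limit, spread over the effective clip duration) as a standalone function. The name `target_bitrate_bps` and the `headroom` parameter are illustrative and not part of MediaProcessor; the guard against a zero-length clip is an added assumption, since the method above divides by the effective duration directly.

```python
def target_bitrate_bps(
    max_size_mb: float,
    duration_sec: float,
    max_duration_sec: float = 10.0,
    headroom: float = 0.9,
) -> int:
    """Total bits per second that keep a trimmed clip under max_size_mb."""
    # The clip is trimmed to max_duration_sec, so only that much is encoded.
    effective = min(duration_sec, max_duration_sec)
    if effective <= 0:
        raise ValueError("clip duration must be positive")
    # Same budget as process_video: a fraction of the limit, converted to bits.
    target_bits = max_size_mb * headroom * 8 * 1024 * 1024
    return int(target_bits / effective)


# A 25-second source clip is trimmed to 10 seconds on both channels.
whatsapp_bps = target_bitrate_bps(16, duration_sec=25)
telegram_bps = target_bitrate_bps(50, duration_sec=25)
print(whatsapp_bps, telegram_bps)
```

For a 10-second clip this gives roughly 12 Mbit/s for WhatsApp and 38 Mbit/s for Telegram, well above what 720p H.264 typically needs, so the size limit only becomes the binding constraint for longer clips.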
12.3 Media Size Limits Summary
| Channel | Type | Max Size | Max Duration | Recommended |
|---|---|---|---|---|
| Telegram | Photo | 10MB | N/A | 1280x720 JPEG |
| Telegram | Video | 50MB | 60 sec | 1280x720 MP4 |
| WhatsApp | Image | 16MB | N/A | 1600x900 JPEG |
| WhatsApp | Video | 16MB | 60 sec | 1280x720 MP4 |
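A pre-send check against these limits might look like the following minimal sketch. The limits are copied from the table above; `CHANNEL_LIMITS` and `limit_violations` are hypothetical names used only for illustration.

```python
from typing import List, Optional

MB = 1024 * 1024

# Limits from the summary table (size in MB, duration in seconds).
CHANNEL_LIMITS = {
    ("telegram", "image"): {"max_size_mb": 10, "max_duration_sec": None},
    ("telegram", "video"): {"max_size_mb": 50, "max_duration_sec": 60},
    ("whatsapp", "image"): {"max_size_mb": 16, "max_duration_sec": None},
    ("whatsapp", "video"): {"max_size_mb": 16, "max_duration_sec": 60},
}


def limit_violations(
    channel: str,
    media_type: str,
    size_bytes: int,
    duration_sec: Optional[float] = None,
) -> List[str]:
    """Return the names of the limits that a media file exceeds."""
    limits = CHANNEL_LIMITS[(channel, media_type)]
    violations = []
    if size_bytes > limits["max_size_mb"] * MB:
        violations.append("size")
    max_duration = limits["max_duration_sec"]
    if max_duration is not None and duration_sec is not None:
        if duration_sec > max_duration:
            violations.append("duration")
    return violations


# The same 20 MB clip fits Telegram but not WhatsApp.
print(limit_violations("telegram", "video", 20 * MB, duration_sec=8))
print(limit_violations("whatsapp", "video", 20 * MB, duration_sec=8))
```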
12.4 Temporary URL Generation
class ObjectStorage:
"""S3-compatible object storage with presigned URLs."""
async def presigned_url(
self,
key: str,
expiration: int = 3600, # 1 hour default
content_type: Optional[str] = None
) -> str:
"""
Generate presigned URL for media access.
Security considerations:
- URLs expire after `expiration` seconds
- URLs are signed with HMAC-SHA256
- Bucket policy restricts direct access
- IP restriction can be added for additional security
"""
params = {
"Bucket": self.bucket,
"Key": key,
}
if content_type:
params["ResponseContentType"] = content_type
url = self.client.generate_presigned_url(
"get_object",
Params=params,
ExpiresIn=expiration
)
# Log URL generation for audit
await self.audit_log.record(
action="presigned_url_generated",
object_key=key,
expiration_seconds=expiration
)
return url
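To illustrate why an expiring, signed URL cannot be extended or pointed at another object, the sketch below implements a deliberately simplified scheme with the standard library. It is not AWS Signature Version 4, which is what an S3-compatible client actually produces; the `SECRET` key, the query parameter names, and both function names are assumptions made for the example.

```python
import hashlib
import hmac
import time
from typing import Optional
from urllib.parse import parse_qs, urlencode, urlparse

SECRET = b"demo-secret"  # hypothetical signing key, for illustration only


def sign_url(base_url: str, key: str, expires_in: int, now: Optional[int] = None) -> str:
    """Append an expiry timestamp and an HMAC-SHA256 signature to the URL."""
    issued = now if now is not None else int(time.time())
    expires_at = issued + expires_in
    payload = f"{key}:{expires_at}".encode()
    signature = hmac.new(SECRET, payload, hashlib.sha256).hexdigest()
    query = urlencode({"expires": expires_at, "signature": signature})
    return f"{base_url}/{key}?{query}"


def verify_url(url: str, now: Optional[int] = None) -> bool:
    """Accept the URL only if the signature matches and it has not expired."""
    parsed = urlparse(url)
    query = parse_qs(parsed.query)
    try:
        expires_at = int(query["expires"][0])
        signature = query["signature"][0]
    except (KeyError, ValueError):
        return False
    key = parsed.path.lstrip("/")
    payload = f"{key}:{expires_at}".encode()
    expected = hmac.new(SECRET, payload, hashlib.sha256).hexdigest()
    current = now if now is not None else int(time.time())
    return hmac.compare_digest(signature.encode(), expected.encode()) and current < expires_at


url = sign_url("https://storage.internal", "processed/images/telegram/a.jpg", 3600, now=1000)
print(verify_url(url, now=2000))  # within the hour
print(verify_url(url, now=5000))  # after expiry
```

Because the object key and the expiry are both part of the signed payload, changing either one invalidates the signature.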
13. Notification Service API
13.1 API Specification
Base URL: /api/v1/notifications
POST /send-alert
Send a new alert notification.
Request:
{
"alert": {
"event_type": "person_detected",
"camera_id": "cam_01_front_entrance",
"zone_id": "main_lobby",
"timestamp": "2024-06-15T14:32:18Z",
"severity": "high",
"person_id": "person_123",
"person_name": "John Smith",
"person_role": "blacklist",
"confidence": 94.5,
"watchlist_matches": [
{
"list_name": "security_watchlist",
"match_confidence": 98.2
}
],
"image_url": "https://storage.internal/detection/abc123.jpg",
"video_url": "https://storage.internal/detection/abc123.mp4",
"metadata": {
"bounding_box": [100, 200, 300, 400],
"detection_model": "yolov8-face-v2"
}
},
"options": {
"include_image": true,
"include_video": false,
"bypass_quiet_hours": false,
"require_acknowledgment": true
}
}
Response (202 Accepted):
{
"alert_id": "alert-uuid-123",
"status": "accepted",
"routing_decision": {
"matched_rules": ["rule_blacklist_always", "rule_critical_always"],
"recipient_groups": ["security_team", "management"],
"channels": ["telegram", "whatsapp"],
"resolved_recipients": 5
},
"notifications": [
{
"notification_id": "notif-uuid-1",
"channel": "telegram",
"recipient": "-1001234567890",
"status": "pending"
},
{
"notification_id": "notif-uuid-2",
"channel": "whatsapp",
"recipient": "+12345678901",
"status": "pending"
}
],
"estimated_delivery": "2024-06-15T14:32:20Z"
}
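A client call to this endpoint could be assembled as in the sketch below, which only builds the request and does not send it. The Bearer token header follows section 13.3; the helper name, the host in the usage example, and the option defaults (taken from the example request above) are assumptions.

```python
import json
import urllib.request


def build_send_alert_request(
    base_url: str, token: str, alert: dict, **options
) -> urllib.request.Request:
    """Build (but do not send) a POST /send-alert request."""
    body = {
        "alert": alert,
        "options": {
            # Defaults mirror the example request; callers may override them.
            "include_image": True,
            "include_video": False,
            "bypass_quiet_hours": False,
            "require_acknowledgment": True,
            **options,
        },
    }
    return urllib.request.Request(
        url=f"{base_url}/api/v1/notifications/send-alert",
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


req = build_send_alert_request(
    "https://api.example.com",
    "test-token",
    {"event_type": "person_detected", "camera_id": "cam_01_front_entrance", "severity": "high"},
    include_video=True,
)
print(req.get_method(), req.full_url)
```

Passing `req` to `urllib.request.urlopen` would submit it; a 202 response carries the `alert_id` used by the delivery-status endpoint.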
GET /delivery-status/{alert_id}
Get delivery status for all notifications of an alert.
Response (200 OK):
{
"alert_id": "alert-uuid-123",
"event_type": "person_detected",
"severity": "high",
"created_at": "2024-06-15T14:32:18Z",
"total_notifications": 5,
"acknowledged": false,
"summary": {
"pending": 0,
"sent": 1,
"delivered": 2,
"read": 1,
"failed": 0,
"retrying": 1,
"dead_letter": 0,
"suppressed": 0
},
"notifications": [
{
"notification_id": "notif-uuid-1",
"channel": "telegram",
"recipient": {
"id": "-1001234567890",
"name": "Security Operations",
"type": "group"
},
"template": "blacklist_alert_tg",
"status": "delivered",
"provider_message_id": "12345",
"timestamps": {
"created": "2024-06-15T14:32:18Z",
"queued": "2024-06-15T14:32:18.100Z",
"sent": "2024-06-15T14:32:19Z",
"delivered": "2024-06-15T14:32:20Z"
},
"media": {
"attached": true,
"type": "image",
"url": "https://storage.internal/processed/img-1.jpg?signature=..."
},
"retry_count": 0,
"escalation_level": 0,
"quiet_hours_applied": false
}
]
}
POST /retry/{notification_id}
Manually retry a failed or dead-letter notification.
Request:
{
"reason": "Manual retry by admin"
}
Response (202 Accepted):
{
"notification_id": "notif-uuid-3",
"status": "retrying",
"previous_failures": 5,
"retry_scheduled_at": "2024-06-15T14:35:00Z"
}
POST /retry/dead-letter/batch
Retry all notifications in the dead letter queue.
Response (200 OK):
{
"total": 12,
"retried": 10,
"failed_to_retry": 2,
"details": [
{"notification_id": "notif-uuid-x", "status": "queued"},
{"notification_id": "notif-uuid-y", "status": "error", "reason": "Recipient no longer active"}
]
}
POST /acknowledge/{alert_id}
Manually acknowledge an alert.
Request:
{
"acknowledged_by": "admin@example.com",
"note": "False positive - authorized visitor"
}
Response (200 OK):
{
"alert_id": "alert-uuid-123",
"acknowledged": true,
"acknowledged_by": "admin@example.com",
"acknowledged_at": "2024-06-15T14:40:00Z",
"escalation_cancelled": true,
"other_recipients_notified": true
}
GET /templates
List all available message templates.
Response (200 OK):
{
"templates": [
{
"template_id": "person_detected_known",
"name": "Person Detected (Known)",
"channels": ["telegram", "whatsapp"],
"supports_media": true,
"placeholders": ["name", "role", "camera_name", "timestamp", "confidence"],
"whatsapp_template_name": "person_detected_known"
},
{
"template_id": "blacklist_alert",
"name": "Blacklist Alert",
"channels": ["telegram", "whatsapp"],
"supports_media": true,
"placeholders": ["name", "camera_name", "timestamp", "confidence"],
"whatsapp_template_name": "blacklist_alert"
}
]
}
POST /templates/render
Preview a rendered template (for testing).
Request:
{
"template_id": "person_detected_known",
"channel": "telegram",
"placeholders": {
"name": "John Smith",
"role": "Employee",
"camera_name": "Front Entrance",
"timestamp": "2024-06-15T14:32:18Z",
"confidence": 94.5
},
"include_media": false
}
Response (200 OK):
{
"template_id": "person_detected_known",
"channel": "telegram",
"rendered_text": "🔍 <b>Person Detected</b>\n\n<b>John Smith</b> (Employee)\n📍 Camera: Front Entrance\n🕐 2024-06-15 at 14:32:18\n🎯 Confidence: 94.5%",
"raw_text": "John Smith (Employee) detected at Front Entrance — 2024-06-15T14:32:18Z",
"character_count": 156,
"placeholders_resolved": 5,
"placeholders_missing": []
}
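The `placeholders_resolved` and `placeholders_missing` fields can be derived as in the sketch below. It assumes `{name}`-style placeholders, which may differ from the syntax the template engine in section 7 uses, and `render_preview` is a hypothetical helper.

```python
import string
from typing import Dict, List, Tuple


def render_preview(text: str, values: Dict[str, object]) -> Tuple[str, int, List[str]]:
    """Render a template, leaving unresolved placeholders visible."""
    # Collect distinct placeholder names in order of first appearance.
    fields = list(dict.fromkeys(
        name for _, name, _, _ in string.Formatter().parse(text) if name
    ))
    missing = [field for field in fields if field not in values]
    # Missing placeholders are substituted with themselves so they stay in the output.
    safe_values = {**{field: "{" + field + "}" for field in missing}, **values}
    return text.format_map(safe_values), len(fields) - len(missing), missing


rendered, resolved, missing = render_preview(
    "{name} ({role}) detected at {camera_name}",
    {"name": "John Smith", "role": "Employee"},
)
print(rendered, resolved, missing)
```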
GET /recipient-groups
List all recipient groups.
Response (200 OK):
{
"groups": [
{
"id": "security_team",
"name": "Security Team",
"member_count": 3,
"channels": ["telegram", "whatsapp"],
"is_active": true
},
{
"id": "management",
"name": "Management",
"member_count": 2,
"channels": ["telegram", "whatsapp"],
"is_active": true
}
]
}
GET /recipient-groups/{group_id}
Get detailed group information.
Response (200 OK):
{
"id": "security_team",
"name": "Security Team",
"description": "Primary security operations team",
"channels": {
"telegram": {
"enabled": true,
"chat_ids": ["-1001234567890"]
},
"whatsapp": {
"enabled": true,
"phone_numbers": ["+12345678901", "+12345678902"]
}
},
"members": [
{
"id": "user_sec_001",
"name": "John Smith",
"role": "Security Supervisor",
"telegram_id": "111111111",
"phone": "+12345678901",
"is_active": true
}
],
"settings": {
"quiet_hours": null,
"min_severity": "low"
}
}
POST /recipient-groups
Create a new recipient group.
Request:
{
"name": "Weekend Staff",
"description": "Weekend security coverage",
"channels": {
"telegram": {
"enabled": true,
"chat_ids": ["-1009999888877"]
},
"whatsapp": {
"enabled": false
}
},
"settings": {
"min_severity": "medium"
}
}
GET /routing-rules
List all routing rules.
Response (200 OK):
{
"rules": [
{
"id": "rule_blacklist_always",
"name": "Blacklist Person Alert",
"priority": 100,
"enabled": true,
"conditions_count": 1,
"recipient_groups": ["security_team", "management"],
"channels": ["telegram", "whatsapp"]
}
],
"total": 9,
"active": 8
}
POST /routing-rules
Create a new routing rule.
Request:
{
"name": "New Rule",
"priority": 50,
"enabled": true,
"logic": "ALL",
"conditions": [
{
"type": "camera",
"operator": "in",
"values": ["cam_01_front_entrance"]
},
{
"type": "time_range",
"start_time": "18:00",
"end_time": "06:00",
"timezone": "America/New_York"
}
],
"actions": {
"recipient_groups": ["night_staff"],
"channels": ["telegram"],
"severity_override": "high",
"bypass_quiet_hours": true
}
}
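The `time_range` condition in this example runs from 18:00 to 06:00, so it crosses midnight and cannot be evaluated with a plain `start <= now < end` comparison. A minimal sketch of the check follows; it assumes the event time has already been converted to the rule's timezone and that the end time is exclusive, neither of which this document specifies.

```python
from datetime import time


def in_time_range(now: time, start: time, end: time) -> bool:
    """True if `now` falls in [start, end), allowing ranges that wrap past midnight."""
    if start <= end:
        # Same-day range, e.g. 09:00 to 17:00.
        return start <= now < end
    # Overnight range, e.g. 18:00 to 06:00: match late evening or early morning.
    return now >= start or now < end


night_start, night_end = time(18, 0), time(6, 0)
print(in_time_range(time(23, 0), night_start, night_end))
print(in_time_range(time(12, 0), night_start, night_end))
```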
GET /dead-letter-queue
List dead letter queue entries.
Response (200 OK):
{
"total": 5,
"entries": [
{
"notification_id": "notif-dlq-1",
"alert_id": "alert-uuid-456",
"channel": "whatsapp",
"recipient": "+12345678901",
"error": "Rate limit exceeded after 5 retries",
"total_attempts": 5,
"enqueued_at": "2024-06-15T10:00:00Z",
"age_minutes": 240
}
]
}
GET /statistics
Get notification statistics.
Response (200 OK):
{
"period": "24h",
"total_alerts": 156,
"total_notifications": 420,
"delivery_rate": 97.6,
"by_channel": {
"telegram": {
"sent": 280,
"delivered": 275,
"failed": 5,
"avg_delivery_time_ms": 850
},
"whatsapp": {
"sent": 140,
"delivered": 135,
"failed": 5,
"avg_delivery_time_ms": 1200
}
},
"by_severity": {
"critical": 12,
"high": 45,
"medium": 67,
"low": 32
},
"escalations": {
"total": 8,
"acknowledged_before_escalation": 148,
"avg_ack_time_seconds": 180
},
"dead_letter_queue": {
"current_size": 5,
"peak_24h": 12
}
}
GET /health
Health check endpoint.
Response (200 OK):
{
"status": "healthy",
"services": {
"telegram_api": "connected",
"whatsapp_api": "connected",
"redis": "connected",
"postgresql": "connected",
"object_storage": "connected"
},
"queue_depths": {
"high_priority": 0,
"normal_priority": 3,
"low_priority": 12
},
"circuit_breakers": {
"telegram": "closed",
"whatsapp": "closed"
}
}
13.2 Webhooks
Telegram Webhook
POST /webhooks/telegram
Content-Type: application/json
Body: Telegram Update object (message, callback_query, etc.)
WhatsApp Webhook
POST /webhooks/whatsapp
Content-Type: application/json
X-Hub-Signature-256: sha256=... # Meta signature verification
Body: WhatsApp webhook payload (statuses, messages)
13.3 Authentication
All API endpoints require authentication:
security:
method: "Bearer Token (JWT)"
scopes:
- "notifications:read" # Read delivery status
- "notifications:write" # Send alerts, acknowledge
- "notifications:admin" # Manage templates, rules, groups
- "notifications:retry" # Retry failed notifications
# Webhook verification
telegram:
method: "Bot token in URL path"
whatsapp:
method: "X-Hub-Signature-256 HMAC verification"
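The WhatsApp webhook check can be sketched as follows. Meta sends `X-Hub-Signature-256` as `sha256=` followed by the hex HMAC-SHA256 of the raw request body, keyed with the app secret. The function name is hypothetical, and the handler must pass the body bytes exactly as received, before any JSON parsing.

```python
import hashlib
import hmac


def verify_meta_signature(app_secret: str, raw_body: bytes, header_value: str) -> bool:
    """Validate the X-Hub-Signature-256 header against the raw request body."""
    prefix = "sha256="
    if not header_value or not header_value.startswith(prefix):
        return False
    received = header_value[len(prefix):]
    expected = hmac.new(app_secret.encode(), raw_body, hashlib.sha256).hexdigest()
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected.encode(), received.encode())


body = b'{"entry": []}'
header = "sha256=" + hmac.new(b"app-secret", body, hashlib.sha256).hexdigest()
print(verify_meta_signature("app-secret", body, header))
```

Requests that fail this check should be rejected before the payload is processed.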
14. Configuration Schema
14.1 Full Configuration
# Full system configuration (managed via admin UI)
notification_service:
# Service settings
service:
name: "AI Surveillance Notifications"
log_level: "INFO"
max_concurrent_sends: 50
# Channel configurations
channels:
telegram:
enabled: true
bot_token: "${ENCRYPTED}"
webhook_url: "https://api.example.com/webhooks/telegram"
rate_limit: 30 # messages/sec
whatsapp:
enabled: true
access_token: "${ENCRYPTED}"
phone_number_id: "${ENCRYPTED}"
business_account_id: "${ENCRYPTED}"
api_version: "v18.0"
webhook_verify_token: "${ENCRYPTED}"
rate_limit: 80 # messages/sec
# Routing
routing:
default_recipient_groups: ["security_team"]
default_channels: ["telegram"]
stop_on_first_exclusive_match: false
# Escalation
escalation:
enabled: true
default_threshold_minutes: 15
max_levels: 3
# Quiet hours
quiet_hours:
default_enabled: false
critical_always_bypass: true
# Rate limiting
rate_limiting:
global_max_per_minute: 200
dedup_window_minutes: 5
# Retry
retry:
max_retries: 5
base_delay_seconds: 2
exponential_base: 2
# Media
media:
image_max_width: 1280
image_quality: 85
video_max_duration_seconds: 10
video_target_resolution: [1280, 720]
temp_url_expiry_seconds: 3600
cleanup_after_hours: 24
# Templates
templates:
default_language: "en"
timezone_display: "America/New_York"
date_format: "YYYY-MM-DD"
time_format: "HH:mm:ss"
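As a worked example of the `retry` settings above, the sketch below lists the delay before each retry. It assumes the common formula `base_delay_seconds * exponential_base ** attempt` with attempts counted from zero and no jitter or cap; the retry section of this document is authoritative if it defines the schedule differently.

```python
from typing import List


def retry_delays(
    max_retries: int = 5,
    base_delay_seconds: float = 2,
    exponential_base: float = 2,
) -> List[float]:
    """Delay in seconds before each retry attempt (attempt 0 is the first retry)."""
    return [base_delay_seconds * exponential_base ** attempt for attempt in range(max_retries)]


delays = retry_delays()
print(delays, sum(delays))
```

With the defaults, the five retries wait 2, 4, 8, 16, and 32 seconds, so a notification reaches the dead letter queue about one minute after its first failure.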
15. Database Schema
15.1 Entity Relationship Diagram
+----------------+ +------------------+ +------------------+
| alerts | | notifications | | recipient_groups |
+----------------+ +------------------+ +------------------+
| alert_id (PK) |<-----+| notification_id | | group_id (PK) |
| event_type | | alert_id (FK) | | name |
| camera_id (FK) | | channel | | description |
| person_id (FK) | | recipient_id | +| channels (json) |
| severity | | template_id | | settings (json) |
| confidence | | status | | is_active |
| image_url | | provider_msg_id | | created_at |
| video_url | | retry_count | +------------------+
| metadata(json) | | sent_at | |
| acknowledged | | delivered_at | |
| created_at | | failed_at | +------------------+
+----------------+ | error | | group_members |
| +------------------+ +------------------+
| | | member_id (PK) |
| +------------------+ | group_id (FK) |
| | status_history | | name |
| +------------------+ | role |
| | history_id (PK) | | telegram_id |
| | notification(FK) | | phone |
| | from_status | | is_active |
| | to_status | +------------------+
| | timestamp |
| +------------------+
|
v
+----------------+
| cameras |
+----------------+
| camera_id (PK) |
| name |
| location |
| zone_id (FK) |
| is_active |
+----------------+
15.2 Core Tables
-- Alerts table (create the cameras table below first: alerts.camera_id references cameras)
CREATE TABLE alerts (
alert_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type VARCHAR(50) NOT NULL,
camera_id VARCHAR(50) NOT NULL REFERENCES cameras(camera_id),
zone_id VARCHAR(50),
person_id VARCHAR(50),
person_name VARCHAR(255),
person_role VARCHAR(50),
confidence DECIMAL(5,2),
severity VARCHAR(20) NOT NULL CHECK (severity IN ('low', 'medium', 'high', 'critical')),
image_url TEXT,
video_url TEXT,
metadata JSONB DEFAULT '{}',
watchlist_matches JSONB DEFAULT '[]',
-- Acknowledgment
acknowledged BOOLEAN NOT NULL DEFAULT FALSE,
acknowledged_by VARCHAR(100),
acknowledged_via VARCHAR(20), -- telegram, whatsapp, dashboard
acknowledged_at TIMESTAMPTZ,
acknowledgment_note TEXT,
-- Escalation
escalation_level INT NOT NULL DEFAULT 0,
max_escalation INT NOT NULL DEFAULT 3,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_alerts_created_at ON alerts(created_at);
CREATE INDEX idx_alerts_camera ON alerts(camera_id);
CREATE INDEX idx_alerts_severity ON alerts(severity);
CREATE INDEX idx_alerts_event_type ON alerts(event_type);
CREATE INDEX idx_alerts_acknowledged ON alerts(acknowledged) WHERE NOT acknowledged;
-- Cameras table
CREATE TABLE cameras (
camera_id VARCHAR(50) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
location VARCHAR(255),
zone_id VARCHAR(50),
rtsp_url TEXT,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Recipient groups
CREATE TABLE recipient_groups (
group_id VARCHAR(50) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
channels JSONB NOT NULL DEFAULT '{}',
settings JSONB NOT NULL DEFAULT '{}',
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Group members
CREATE TABLE group_members (
member_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
group_id VARCHAR(50) NOT NULL REFERENCES recipient_groups(group_id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
role VARCHAR(100),
telegram_id VARCHAR(50),
phone VARCHAR(20),
is_active BOOLEAN NOT NULL DEFAULT TRUE,
added_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(group_id, telegram_id),
UNIQUE(group_id, phone)
);
-- Routing rules
CREATE TABLE routing_rules (
rule_id VARCHAR(50) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
priority INT NOT NULL DEFAULT 50,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
logic VARCHAR(10) NOT NULL DEFAULT 'ALL', -- ALL, ANY
stop_on_match BOOLEAN NOT NULL DEFAULT FALSE,
conditions JSONB NOT NULL DEFAULT '[]',
actions JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Templates
CREATE TABLE message_templates (
template_id VARCHAR(100) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
channel VARCHAR(20) NOT NULL,
text TEXT NOT NULL,
formatting VARCHAR(20) DEFAULT 'HTML',
supports_media BOOLEAN NOT NULL DEFAULT FALSE,
media_config JSONB,
keyboard_config JSONB,
placeholders JSONB NOT NULL DEFAULT '[]',
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Dead letter queue
CREATE TABLE dead_letter_queue (
entry_id BIGSERIAL PRIMARY KEY,
notification_id UUID NOT NULL,
alert_id UUID NOT NULL,
channel VARCHAR(20) NOT NULL,
recipient_id VARCHAR(100) NOT NULL,
error TEXT NOT NULL,
total_attempts INT NOT NULL,
enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
resolved_at TIMESTAMPTZ,
resolution VARCHAR(20) -- retried, discarded, manual
);
16. Deployment & Security
16.1 Deployment Architecture
deployment:
# Docker Compose / Kubernetes
services:
notification-api:
replicas: 2
resources:
cpu: "1"
memory: "2Gi"
notification-worker:
replicas: 3
resources:
cpu: "1"
memory: "2Gi"
queues:
- "notifications:priority:high"
- "notifications:priority:normal"
- "notifications:priority:low"
media-processor:
replicas: 2
resources:
cpu: "2"
memory: "4Gi" # For FFmpeg
redis:
mode: "cluster" # For production
postgresql:
mode: "primary-replica"
backups: "daily"
16.2 Security Checklist
| Category | Requirement | Implementation |
|---|---|---|
| Authentication | API endpoints secured | JWT Bearer tokens |
| Authorization | Role-based access | RBAC with scopes |
| Encryption at Rest | Bot tokens, API keys | AES-256-GCM via KMS |
| Encryption in Transit | All API calls | TLS 1.3 |
| Secret Management | No secrets in code | HashiCorp Vault / cloud KMS |
| Webhook Security | Verify webhook sources | HMAC signature verification |
| Input Validation | Prevent injection | Pydantic schemas, sanitization |
| Rate Limiting | Prevent abuse | Multi-level token buckets |
| Audit Logging | All actions logged | Structured JSON logs |
| PII Protection | Phone numbers, chat IDs | Encrypted, access-controlled |
17. Appendix: Delivery Flow Diagrams
17.1 Complete Alert Delivery Flow
┌─────────────────┐
│ Detection Event │
│ (AI Pipeline) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Event Bus │
│ (Redis Pub/Sub)│
└────────┬────────┘
│
▼
┌──────────────────────────┐
│ 1. Event Normalization │
│ - Validate schema │
│ - Enrich data │
│ - Assign severity │
└───────────┬──────────────┘
│
▼
┌──────────────────────────┐ No match
│ 2. Rule Engine │─────────────────►┌──────────────┐
│ - Evaluate rules │ │ Log & Drop │
│ - Match conditions │ └──────────────┘
│ - Resolve recipients │
└───────────┬──────────────┘
│ Match found
▼
┌──────────────────────────┐
│ 3. Deduplication Check │
│ - Check Redis key │
│ - Within 5 min window│
└───────────┬──────────────┘
│
┌──────┴──────┐
│ │
New Duplicate
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ Continue │ │ Suppress │
│ │ │ + Counter│
└────┬─────┘ └──────────┘
│
▼
┌──────────────────────────┐
│ 4. Template Rendering │
│ - Select template │
│ - Fill placeholders │
│ - Format per channel │
└───────────┬──────────────┘
│
▼
┌──────────────────────────┐ ┌──────────┐
│ 5. Media Processing │────►│ MinIO │
│ - Resize image │ │ Storage │
│ - Compress video │ └──────────┘
│ - Generate URL │
└───────────┬──────────────┘
│
▼
┌──────────────────────────┐ Over limit
│ 6. Rate Limit Check │─────────────────►┌──────────────┐
│ - Global check │ │ Queue with │
│ - Per-channel check │ │ delay │
│ - Per-recipient check│ └──────────────┘
└───────────┬──────────────┘
│ Under limit
▼
┌──────────────────────────┐
│ 7. Quiet Hours Check │
│ - Critical? │──Yes──►┌──────────────┐
│ - Bypass flag? │ │ Send anyway │
└───────────┬──────────────┘ └──────────────┘
│
┌──────┴──────┐
│ │
Allowed In quiet hours
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ Continue │ │ Suppress │
│ │ │ (log) │
└────┬─────┘ └──────────┘
│
▼
┌──────────────────────────┐
│ 8. Queue by Priority │
│ - Critical: High Q │
│ - High/Normal: Norm Q│
│ - Low: Low Q │
└───────────┬──────────────┘
│
▼
┌──────────────────────────┐
│ 9. Channel Adapter │
│ ┌──────────────┐ │
│ │ Telegram? │─────┼────► Send via Bot API
│ │ WhatsApp? │─────┼────► Send via Graph API
│ └──────────────┘ │
└───────────┬──────────────┘
│
┌──────┴──────┐
│ │
Success Failure
│ │
▼ ▼
┌──────────┐ ┌──────────────────┐
│ Track │ │ Retry Manager │
│ Delivery │ │ - Check retryable│
│ │ │ - Schedule retry │
└──────────┘ │ - Max 5 attempts │
└────────┬─────────┘
│
┌──────┴──────┐
│ │
Retryable Non-retryable
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ Re-queue │ │ Permanent│
│ w/ delay │ │ failure │
└──────────┘ └──────────┘
│
▼
┌──────────────────┐
│ After 5 failures │
│ Move to DLQ │
└──────────────────┘
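Step 3 of the flow above (deduplication within a 5-minute window, with a counter for suppressed duplicates) can be sketched with an in-memory stand-in for the Redis key. The class name, the composition of the dedup key, and the explicit `now` argument are assumptions for illustration; in production the same effect is usually obtained from a Redis key with a TTL.

```python
from typing import Dict, Tuple


class Deduplicator:
    """Suppress repeat alerts for the same key within a fixed window."""

    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        # key -> (time the window opened, number of suppressed duplicates)
        self._seen: Dict[str, Tuple[float, int]] = {}

    def should_send(self, key: str, now: float) -> bool:
        entry = self._seen.get(key)
        if entry is not None and now - entry[0] < self.window_seconds:
            # Duplicate inside the window: suppress and bump the counter.
            self._seen[key] = (entry[0], entry[1] + 1)
            return False
        # First occurrence, or the previous window has expired.
        self._seen[key] = (now, 0)
        return True

    def suppressed_count(self, key: str) -> int:
        return self._seen.get(key, (0.0, 0))[1]


dedup = Deduplicator()
key = "person_detected:cam_01_front_entrance:person_123"  # hypothetical key layout
results = [dedup.should_send(key, t) for t in (0, 120, 299)]
print(results, dedup.suppressed_count(key))
```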
17.2 Escalation Flow
Alert Sent
│
▼
┌──────────────────┐
│ Start Escalation │
│ Timer (15 min) │
└────────┬─────────┘
│
▼
┌─────────┐
│Acked? │────Yes──►┌────────────┐
└────┬────┘ │ Cancel timer│
│No └────────────┘
▼
┌─────────┐
│Timer │
│fired? │
└────┬────┘
│Yes
▼
┌──────────────────┐
│ Level 1 Escalation│
│ - Add management │
│ - Notify via all │
└────────┬─────────┘
│
▼
┌─────────┐
│Acked? │────Yes──► Done
└────┬────┘
│No
▼
┌──────────────────┐
│ Level 2 Escalation│
│ - Increase severity│
│ - Re-notify all │
└────────┬─────────┘
│
▼
┌─────────┐
│Acked? │────Yes──► Done
└────┬────┘
│No
▼
┌──────────────────┐
│ Level 3 Escalation│
│ - All hands │
│ - Critical severity│
│ - Bypass quiet hrs│
└──────────────────┘
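The escalation ladder above reduces to a small calculation if every level uses the same threshold. That is an assumption: the flow only states the initial 15-minute timer, and per-level thresholds may be configured differently. The function name is hypothetical; the defaults mirror `default_threshold_minutes` and `max_levels` from the configuration schema.

```python
def escalation_level(
    minutes_unacknowledged: float,
    threshold_minutes: int = 15,
    max_levels: int = 3,
) -> int:
    """Escalation level reached by an alert that is still unacknowledged."""
    if minutes_unacknowledged < 0:
        raise ValueError("elapsed time cannot be negative")
    # One level per full threshold interval, capped at the highest level.
    return min(int(minutes_unacknowledged // threshold_minutes), max_levels)


levels = [escalation_level(m) for m in (0, 14.9, 15, 30, 45, 120)]
print(levels)
```

An acknowledgment at any point cancels the timer, so the level only advances while the alert stays unacknowledged.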
17.3 Sequence Diagram: Blacklist Alert Flow
AI Detection Event Bus Notif Service Telegram WhatsApp Admin
│ │ │ │ │ │
│─Person detected────>│ │ │ │ │
│ (blacklist match) │ │ │ │ │
│ │─Alert event─────>│ │ │ │
│ │ │ │ │ │
│ │ │─Evaluate rules│ │ │
│ │ │ (rule_blacklist│ │ │
│ │ │ _always match)│ │ │
│ │ │ │ │ │
│ │ │─Resolve groups│ │ │
│ │ │ (security + │ │ │
│ │ │ management) │ │ │
│ │ │ │ │ │
│ │ │─Render template│ │ │
│ │ │ (blacklist │ │ │
│ │ │ template) │ │ │
│ │ │ │ │ │
│ │ │─Process image─>│ │ │
│ │ │ (resize + URL)│ │ │
│ │ │ │ │ │
│ │ │─Check rate limit │ │
│ │ │ (pass) │ │ │
│ │ │ │ │ │
│ │ │─────────Send via Telegram────>│ │
│ │ │ │─Message sent │ │
│ │ │ │<─message_id───│ │
│ │ │ │ │ │
│ │ │─────────Send via WhatsApp──────────────────>│
│ │ │ │ │─Templ msg sent
│ │ │ │ │<─wamid.xxx──│
│ │ │ │ │ │
│ │ │─Store delivery status │ │
│ │ │ │ │ │
│ │ │─Start escalation timer │ │
│ │ │ (15 min) │ │ │
│ │ │ │ │ │
│ │ │ │─Webhook: │ │
│ │ │ │ delivered │ │
│ │ │ │──────────────>│ │
│ │ │ │ │─Webhook: │
│ │ │ │ │ delivered │
│ │ │ │ │────────────>│
│ │ │ │ │ │
│ │ │ │◄──Ack button──│ │
│ │ │ │ (clicked) │ │
│ │ │ │ │ │
│ │ │◄─Acknowledge alert────────────│ │
│ │ │ │ │ │
│ │ │─Cancel escalation │ │
│ │ │ │ │ │
│ │ │─Notify others: "Ack by John" │ │
│ │ │───────────Update status──────>│ │
│ │ │ │ │────────────>│
│ │ │ │ │ │
│ │ │ │ │◄─Dashboard──│
│ │ │ │ │ refresh │
│ │ │ │ │ │
Appendix A: Environment Variables
| Variable | Description | Required | Default |
|---|---|---|---|
| NOTIF_TELEGRAM_ENABLED | Enable Telegram channel | No | true |
| NOTIF_TELEGRAM_BOT_TOKEN | Telegram bot token | If enabled | - |
| NOTIF_TELEGRAM_WEBHOOK_URL | Webhook URL for Telegram | No | auto |
| NOTIF_WHATSAPP_ENABLED | Enable WhatsApp channel | No | true |
| NOTIF_WHATSAPP_ACCESS_TOKEN | Meta access token | If enabled | - |
| NOTIF_WHATSAPP_PHONE_NUMBER_ID | WhatsApp phone number ID | If enabled | - |
| NOTIF_WHATSAPP_BUSINESS_ACCOUNT_ID | WABA ID | If enabled | - |
| NOTIF_REDIS_URL | Redis connection URL | Yes | - |
| NOTIF_POSTGRES_URL | PostgreSQL connection URL | Yes | - |
| NOTIF_MINIO_ENDPOINT | MinIO/S3 endpoint | Yes | - |
| NOTIF_KMS_TYPE | KMS type (vault/aws/azure) | Yes | - |
| NOTIF_LOG_LEVEL | Log level | No | INFO |
| NOTIF_MAX_RETRIES | Max retry attempts | No | 5 |
| NOTIF_RETRY_BASE_DELAY | Retry base delay (sec) | No | 2 |
| NOTIF_DEDUP_WINDOW_MINUTES | Deduplication window (minutes) | No | 5 |
| NOTIF_DEFAULT_ESCALATION_MINUTES | Escalation threshold (minutes) | No | 15 |
| NOTIF_MEDIA_TEMP_URL_EXPIRY | Presigned URL expiry (sec) | No | 3600 |
Appendix B: Technology Stack
| Layer | Technology | Version |
|---|---|---|
| API Framework | FastAPI | 0.110+ |
| HTTP Client | aiohttp | 3.9+ |
| Task Queue | Celery + Redis | 5.3+ |
| Database | PostgreSQL | 15+ |
| Cache | Redis | 7+ |
| Object Storage | MinIO | latest |
| Secret Management | HashiCorp Vault | 1.15+ |
| Image Processing | Pillow | 10+ |
| Video Processing | FFmpeg | 6+ |
| Telegram SDK | python-telegram-bot | 21+ |
| WhatsApp SDK | Custom (Graph API) | v18.0 |
End of Document