Notification System

Alert routing and escalation design.

AI Surveillance Platform — Notification & Alerting System Design

Document Information

Property     | Value
-------------|---------------------------------------------------------------------
Version      | 1.0.0
Status       | Draft
Scope        | Multi-channel notification system (Telegram + WhatsApp Business API)
Platform     | AI Surveillance Platform: 8 cameras, real-time person detection & watchlist matching
Target Users | Non-technical admin (configuration), security personnel (alert recipients)

Table of Contents

  1. System Architecture Overview
  2. Telegram Integration
  3. WhatsApp Business API Integration
  4. Alert Routing Rules Engine
  5. Recipient Groups
  6. Quiet Hours
  7. Message Templates
  8. Retry Logic & Failure Handling
  9. Rate Limiting & Throttling
  10. Delivery Tracking
  11. Escalation Engine
  12. Media Attachments
  13. Notification Service API
  14. Configuration Schema
  15. Database Schema
  16. Deployment & Security
  17. Appendix: Delivery Flow Diagrams

1. System Architecture Overview

1.1 High-Level Architecture

+------------------------------------------------------------------+
|                        AI SURVEILLANCE PLATFORM                     |
|                                                                     |
|  +----------------+  +----------------+  +---------------------+  |
|  | Person         |  | Watchlist      |  | Suspicious Activity |  |
|  | Detection      |  | Matching       |  | Detection           |  |
|  +--------+-------+  +--------+-------+  +----------+----------+  |
|           |                   |                    |               |
+-----------|-------------------|--------------------|---------------+
            |                   |                    |
            v                   v                    v
+-----------+-------------------+--------------------+---------------+
|                         EVENT BUS (Redis Pub/Sub)                   |
+-----------+-------------------+--------------------+---------------+
            |                                        |
            v                                        v
+-----------+-------------------+    +-------------------------------+
|   NOTIFICATION SERVICE         |    |    MEDIA PROCESSING SERVICE   |
|                                |    |                               |
|  +------------------------+   |    |  +-------------------------+  |
|  | Alert Router Engine    |   |    |  | Image Resizer           |  |
|  | - Rule Evaluation      |   |    |  | - Telegram: up to 10MB  |  |
|  | - Severity Assignment  |   |    |  | - WhatsApp: up to 5MB   |  |
|  | - Recipient Resolution |   |    |  | - Thumbnail generation  |  |
|  +------------------------+   |    |  +-------------------------+  |
|                                |    |                               |
|  +------------------------+   |    |  +-------------------------+  |
|  | Template Engine        |   |    |  | Video Clip Processor    |  |
|  | - Placeholder rendering|   |    |  | - Telegram: up to 50MB  |  |
|  | - Multi-lang support   |   |    |  | - WhatsApp: up to 16MB  |  |
|  | - Format conversion    |   |    |  | - Format transcoding    |  |
|  +------------------------+   |    |  +-------------------------+  |
|                                |    |                               |
|  +------------------------+   |    |  +-------------------------+  |
|  | Queue Manager          |   |    |  | Temporary URL Generator |  |
|  | - Priority queues      |   |    |  | - Signed URLs w/ expiry |  |
|  | - Rate limiting        |   |    |  | - S3/MinIO presigned    |  |
|  | - Deduplication        |   |    |  +-------------------------+  |
|  +------------------------+   |    +-------------------------------+
|                                |
|  +------------------------+   |
|  | Retry Manager          |   |
|  | - Exponential backoff  |   |
|  | - Dead letter queue    |   |
|  | - Manual retry API     |   |
|  +------------------------+   |
|                                |
|  +------------------------+   |
|  | Escalation Engine      |   |
|  | - Timer-based triggers |   |
|  | - Level progression    |   |
|  | - Recipient expansion  |   |
|  +------------------------+   |
+-----------+-------+----------+
            |       |
   +--------+       +--------+
   |                        |
   v                        v
+--+----------------+ +-----+------------------+
| TELEGRAM BOT API   | | WHATSAPP BUSINESS API |
|                    | | (Meta Official)       |
| - sendMessage      | | - Messages endpoint   |
| - sendPhoto        | | - Template messages   |
| - sendVideo        | | - Media upload        |
| - Inline keyboards | | - Webhook receipts    |
+--------------------+ +-----------------------+

1.2 Core Components

Component            | Technology             | Purpose
---------------------|------------------------|------------------------------------------------------------
Event Bus            | Redis Pub/Sub          | Decouple detection services from notification service
Notification Service | Python/FastAPI         | Core orchestration, routing, templating
Queue Manager        | Redis Streams + Celery | Priority queues, rate limiting, task distribution
Media Processor      | FFmpeg + Pillow        | Image resizing, video compression, format conversion
Object Storage       | MinIO (S3-compatible)  | Media storage with presigned URL generation
Database             | PostgreSQL             | Persistent storage for configs, delivery status, audit logs
Cache                | Redis                  | Rate limit counters, deduplication keys, session state

1.3 Alert Lifecycle

DETECTION EVENT
      |
      v
[Event Normalized] --> {camera_id, event_type, person_id, timestamp, image_url, video_url, severity}
      |
      v
[Router Engine Evaluates Rules] --> matching_rules[]
      |
      v
[Recipients Resolved] --> recipient_groups[] + individual_recipients[]
      |
      v
[Template Selected & Rendered] --> message_text + formatting
      |
      v
[Media Processed] --> resized_image + compressed_video (if needed)
      |
      v
[Rate Limit Check] --> pass / throttle / deduplicate
      |
      v
[Queued per Channel] --> Telegram queue / WhatsApp queue
      |
      v
[Channel Adapter Sends] --> HTTP request to provider API
      |
      v
[Delivery Status Tracked] --> pending -> sent -> delivered/failed
      |
      v
[If failed] --> Retry Manager (exponential backoff, max 5)
      |
      v
[If unacknowledged] --> Escalation Engine (after threshold)
      |
      v
[Audit Log Written] --> persistent record with full trace
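
The retry step above names exponential backoff with at most five attempts but does not give the delays. A minimal sketch of one possible schedule, assuming a 2-second base delay, a 60-second cap, and full jitter (all three values are illustrative and not taken from this design):

```python
import random

MAX_RETRIES = 5       # from the lifecycle above
BASE_DELAY_S = 2.0    # assumption: base delay is not specified in this document
MAX_DELAY_S = 60.0    # assumption: cap so that late retries stay bounded


def backoff_delay(attempt: int, jitter: bool = True) -> float:
    """Return the wait in seconds before retry number `attempt` (1-based)."""
    if not 1 <= attempt <= MAX_RETRIES:
        raise ValueError(f"attempt must be in 1..{MAX_RETRIES}")
    ceiling = min(MAX_DELAY_S, BASE_DELAY_S * 2 ** (attempt - 1))
    # Full jitter spreads retries out so failed sends do not retry in lockstep.
    return random.uniform(0, ceiling) if jitter else ceiling


# Deterministic view of the schedule (jitter disabled).
schedule = [backoff_delay(n, jitter=False) for n in range(1, MAX_RETRIES + 1)]
print(schedule)  # [2.0, 4.0, 8.0, 16.0, 32.0]
```

With these values a message that fails every attempt lands in the dead letter queue after roughly one minute of cumulative waiting.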

2. Telegram Integration

2.1 Bot Configuration

Encrypted Token Storage

# config/telegram.yml (stored encrypted at rest)
telegram:
  enabled: true
  bot_token: "${ENCRYPTED:ENC[PKCS7,MIIBkAYJKoZIhvcNAQcDoIIB...]}"  # Decrypted at runtime
  bot_username: "SurveillanceAlertBot"
  webhook_url: "https://api.surveillance.local/webhooks/telegram"  # Placeholder; Telegram can only call a publicly reachable HTTPS URL
  # Token encryption: AES-256-GCM with KMS-managed key
  # Rotation: automatic every 90 days

Runtime Decryption Flow

# pseudocode: Token decryption on first use, then served from an in-memory cache
class SecureConfig:
    def get_telegram_token(self) -> str:
        # Serve the cached token while its TTL has not expired
        # (assumes the cache returns None on a miss or an expired entry)
        cached = self._cache.get("telegram_token")
        if cached is not None:
            return cached
        encrypted = self._load_from_vault("telegram/bot_token")
        # Decrypt using KMS (HashiCorp Vault / AWS KMS / Azure Key Vault)
        plaintext = self._kms.decrypt(
            ciphertext=encrypted,
            key_id="notification-service-key",
            context={"service": "notification", "environment": "prod"}
        )
        # Cache in memory with TTL (5 minutes), never persist to disk
        self._cache.set("telegram_token", plaintext, ttl=300)
        return plaintext
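
The `_cache` object used above is not defined in this document. A minimal sketch of an in-memory TTL cache with the same `set(key, value, ttl)` shape follows; the class name `TTLCache` and the injectable clock are assumptions made for illustration:

```python
import time
from typing import Callable, Dict, Optional, Tuple


class TTLCache:
    """Tiny in-memory cache whose entries expire `ttl` seconds after being set."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._items: Dict[str, Tuple[str, float]] = {}

    def set(self, key: str, value: str, ttl: float) -> None:
        self._items[key] = (value, self._clock() + ttl)

    def get(self, key: str) -> Optional[str]:
        entry = self._items.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._items[key]  # drop the expired secret from memory
            return None
        return value


# Usage with a fake clock so expiry is visible without sleeping.
now = [0.0]
cache = TTLCache(clock=lambda: now[0])
cache.set("telegram_token", "dummy-token", ttl=300)
fresh = cache.get("telegram_token")    # still within the 5-minute TTL
now[0] = 301.0
expired = cache.get("telegram_token")  # TTL elapsed, entry is gone
```

A monotonic clock is used by default so that wall-clock adjustments cannot extend the lifetime of a cached secret.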

2.2 Chat ID / Group Configuration

# Admin-configured via UI, stored in database
channels:
  telegram:
    chats:
      - id: "-1001234567890"           # Group chat ID (negative for groups)
        name: "Security Operations"
        type: "group"
        description: "Main security team group"
        allowed_severities: ["low", "medium", "high", "critical"]
        quiet_hours: null               # No quiet hours (security-critical)
        
      - id: "123456789"                # Private chat ID (positive)
        name: "Security Manager"
        type: "private"
        description: "Direct alerts for security manager"
        allowed_severities: ["high", "critical"]
        quiet_hours: null
        
      - id: "-1009876543210"
        name: "Management Alerts"
        type: "group"
        description: "Management-only critical alerts"
        allowed_severities: ["critical"]
        quiet_hours: null
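
The `allowed_severities` list acts as a per-chat filter. A minimal sketch of how a router might resolve target chats for one alert, assuming the chat entries are loaded as plain dictionaries (the function name `chats_for_severity` is hypothetical):

```python
from typing import Dict, List

# Mirrors the example configuration above.
CHATS: List[Dict] = [
    {"id": "-1001234567890", "name": "Security Operations",
     "allowed_severities": ["low", "medium", "high", "critical"]},
    {"id": "123456789", "name": "Security Manager",
     "allowed_severities": ["high", "critical"]},
    {"id": "-1009876543210", "name": "Management Alerts",
     "allowed_severities": ["critical"]},
]


def chats_for_severity(chats: List[Dict], severity: str) -> List[str]:
    """Return the IDs of all chats configured to receive this severity."""
    return [c["id"] for c in chats if severity in c["allowed_severities"]]


high_targets = chats_for_severity(CHATS, "high")
print(high_targets)  # ['-1001234567890', '123456789']
```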

2.3 Chat Discovery & Verification

# Bot command handlers for chat registration
class TelegramBotHandler:
    
    async def cmd_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """User starts bot conversation."""
        chat_id = update.effective_chat.id
        chat_type = update.effective_chat.type  # private/group/supergroup
        
        welcome_msg = (
            f"👋 Welcome to Surveillance Alert Bot!\n\n"
            f"Chat ID: <code>{chat_id}</code>\n"
            f"Type: {chat_type}\n\n"
            f"Provide this Chat ID to your administrator to receive alerts.\n"
            f"Use /help for available commands."
        )
        await update.message.reply_html(welcome_msg)
    
    async def cmd_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Display help information."""
        help_text = (
            "📋 <b>Available Commands</b>\n\n"
            "/start — Show welcome and Chat ID\n"
            "/help — Show this help message\n"
            "/status — Check bot connectivity\n"
            "/acknowledge {alert_id} — Acknowledge an alert\n"
            "/mute {minutes} — Temporarily mute non-critical alerts\n"
            "/unmute — Resume all alerts\n"
            "/settings — View your notification settings"
        )
        await update.message.reply_html(help_text)
    
    async def cmd_acknowledge(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Acknowledge an alert by ID."""
        alert_id = context.args[0] if context.args else None
        if not alert_id:
            await update.message.reply_text("❌ Usage: /acknowledge <alert_id>")
            return
        
        result = await self.escalation_service.acknowledge_alert(
            alert_id=alert_id,
            acknowledged_by=f"telegram:{update.effective_user.id}",
            channel="telegram"
        )
        
        if result.success:
            await update.message.reply_text(f"✅ Alert {alert_id} acknowledged.")
        else:
            await update.message.reply_text(f"❌ {result.error_message}")

2.4 Sending Messages — API Methods

Core Send Methods

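class TelegramChannelAdapter:
    BASE_URL = "https://api.telegram.org/bot{token}"
    
    def __init__(self, bot_token: str):
        self.token = bot_token
        # Fill in the token once. The instance attribute shadows the class template,
        # so f"{self.BASE_URL}/sendMessage" resolves to a real endpoint.
        self.BASE_URL = self.BASE_URL.format(token=bot_token)
        # No default Content-Type header: json= and FormData each set their own,
        # and a forced application/json would break multipart uploads.
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30)
        )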
    
    # --- Text Messages ---
    
    async def send_text_message(
        self,
        chat_id: str,
        text: str,
        parse_mode: str = "HTML",           # HTML or MarkdownV2
        disable_notification: bool = False,
        reply_markup: Optional[dict] = None  # Inline keyboard
    ) -> TelegramSendResult:
        """Send plain text message with optional inline keyboard."""
        url = f"{self.BASE_URL}/sendMessage"
        payload = {
            "chat_id": chat_id,
            "text": text,
            "parse_mode": parse_mode,
            "disable_notification": disable_notification,
            "link_preview_options": {"is_disabled": True}
        }
        if reply_markup is not None:
            payload["reply_markup"] = reply_markup  # Omit the field rather than send null
        
        response = await self.session.post(url, json=payload)
        data = await response.json()
        
        return TelegramSendResult(
            message_id=data["result"]["message_id"] if data.get("ok") else None,
            status="sent" if data.get("ok") else "failed",
            error=data.get("description") if not data.get("ok") else None,
            timestamp_utc=datetime.utcnow()
        )
    
    # --- Photo Messages ---
    
    async def send_photo(
        self,
        chat_id: str,
        photo: Union[str, bytes],  # URL, file_id, or binary
        caption: str = "",
        parse_mode: str = "HTML",
        reply_markup: Optional[dict] = None
    ) -> TelegramSendResult:
        """Send photo with caption. Supports URL, file_id, or multipart upload."""
        url = f"{self.BASE_URL}/sendPhoto"
        
        if isinstance(photo, str):
            # URL or existing file_id: both are passed as a plain string
            payload = {
                "chat_id": chat_id,
                "photo": photo,
                "caption": caption,
                "parse_mode": parse_mode
            }
            if reply_markup is not None:
                payload["reply_markup"] = reply_markup
            response = await self.session.post(url, json=payload)
        else:
            # Raw bytes: multipart upload
            form = aiohttp.FormData()
            form.add_field("chat_id", chat_id)
            form.add_field("photo", photo, filename="image.jpg", content_type="image/jpeg")
            form.add_field("caption", caption)
            form.add_field("parse_mode", parse_mode)
            if reply_markup:
                form.add_field("reply_markup", json.dumps(reply_markup))
            response = await self.session.post(url, data=form)
        
        data = await response.json()
        return self._parse_result(data)
    
    # --- Video Messages ---
    
    async def send_video(
        self,
        chat_id: str,
        video: str,  # URL or file_id; raw bytes would need a multipart upload as in send_photo
        caption: str = "",
        parse_mode: str = "HTML",
        width: Optional[int] = None,
        height: Optional[int] = None,
        duration: Optional[int] = None,
        thumbnail: Optional[str] = None,
        reply_markup: Optional[dict] = None
    ) -> TelegramSendResult:
        """Send video clip. Max 50MB for bots."""
        url = f"{self.BASE_URL}/sendVideo"
        
        payload = {
            "chat_id": chat_id,
            "video": video,
            "caption": caption,
            "parse_mode": parse_mode,
            "supports_streaming": True
        }
        if reply_markup is not None:
            payload["reply_markup"] = reply_markup  # Omit the field rather than send null
        if width: payload["width"] = width
        if height: payload["height"] = height
        if duration: payload["duration"] = duration
        if thumbnail: payload["thumbnail"] = thumbnail
        
        response = await self.session.post(url, json=payload)
        data = await response.json()
        return self._parse_result(data)
    
    # --- Media Group (multiple photos) ---
    
    async def send_media_group(
        self,
        chat_id: str,
        media: List[MediaGroupItem],  # List of photo/video items
        caption: str = ""
    ) -> TelegramSendResult:
        """Send up to 10 photos/videos as an album."""
        url = f"{self.BASE_URL}/sendMediaGroup"
        
        items = media[:10]  # Telegram limit: 10 items per album
        media_json = []
        for idx, item in enumerate(items):
            media_item = {
                "type": item.type,  # "photo" or "video"
                "media": item.url
            }
            if idx == len(items) - 1:  # Caption on the last item actually sent
                media_item["caption"] = caption
                media_item["parse_mode"] = "HTML"
            media_json.append(media_item)
        
        payload = {"chat_id": chat_id, "media": media_json}
        response = await self.session.post(url, json=payload)
        data = await response.json()
        return self._parse_result(data)

2.5 Message Formatting (HTML Mode)

# HTML tag support in Telegram Bot API
TELEGRAM_HTML_TAGS = {
    "allowed": [
        "<b>", "<strong>",        # Bold
        "<i>", "<em>",            # Italic
        "<u>",                    # Underline
        "<s>", "<strike>", "<del>",  # Strikethrough
        "<code>",                 # Inline code
        "<pre>",                  # Code block
        "<a href='...'>",         # Hyperlinks
        "<tg-spoiler>",           # Spoiler
        "<blockquote>",           # Blockquote
    ],
    # '"' needs escaping (&quot;) only inside attribute values such as href
    "escape_required": ["<", ">", "&"],
}

def escape_telegram_html(text: str) -> str:
    """Escape special characters for Telegram HTML parse mode."""
    return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")

def build_alert_message(template: str, placeholders: dict) -> str:
    """Render template with escaped placeholders for Telegram HTML."""
    # Escape all placeholder values first
    safe_values = {k: escape_telegram_html(str(v)) for k, v in placeholders.items()}
    # Render template (template contains pre-approved HTML tags)
    return template.format(**safe_values)
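
To see why placeholder values are escaped before rendering, here is a short self-contained run of the two helpers above. The template string and the sample values are made up for illustration:

```python
def escape_telegram_html(text: str) -> str:
    """Escape special characters for Telegram HTML parse mode."""
    # "&" must be replaced first, otherwise the entities added below get re-escaped.
    return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")


def build_alert_message(template: str, placeholders: dict) -> str:
    """Render the template with escaped placeholder values."""
    safe_values = {k: escape_telegram_html(str(v)) for k, v in placeholders.items()}
    return template.format(**safe_values)


# Hypothetical template: the <b> tag is trusted markup, the values are not.
template = "<b>{person_name}</b> detected at {camera_name}"
message = build_alert_message(
    template,
    {"person_name": "J. <Doe> & Co", "camera_name": "Gate 1"},
)
print(message)  # <b>J. &lt;Doe&gt; &amp; Co</b> detected at Gate 1
```

Because rendering uses `str.format`, a template that needs literal braces has to double them (`{{` and `}}`).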

2.6 Inline Keyboard (Quick Actions)

def build_alert_keyboard(alert_id: str, details_url: str) -> dict:
    """Build inline keyboard for alert messages."""
    return {
        "inline_keyboard": [
            [
                {
                    "text": "✅ Acknowledge",
                    "callback_data": json.dumps({"action": "ack", "id": alert_id})
                },
                {
                    "text": "📹 View Live Feed",
                    "url": details_url  # Opens camera live view in browser
                }
            ],
            [
                {
                    "text": "📋 View Details",
                    "callback_data": json.dumps({"action": "details", "id": alert_id})
                },
                {
                    "text": "🔍 View on Map",
                    "callback_data": json.dumps({"action": "map", "id": alert_id})
                }
            ],
            [
                {
                    "text": "🚨 Escalate",
                    "callback_data": json.dumps({"action": "escalate", "id": alert_id})
                }
            ]
        ]
    }

# Callback query handler
async def handle_callback_query(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle button clicks on inline keyboards."""
    query = update.callback_query
    
    data = json.loads(query.data)
    action = data["action"]
    alert_id = data["id"]
    
    handlers = {
        "ack": self._handle_acknowledge,
        "details": self._handle_view_details,
        "map": self._handle_view_map,
        "escalate": self._handle_escalate,
    }
    
    handler = handlers.get(action)
    if handler is None:
        await query.answer()  # Unknown action: just stop the loading animation
        return
    
    result = await handler(alert_id, query.from_user.id)
    # A callback query can be answered only once, so answer after the handler
    # has run and carry the notification text in that single call.
    await query.answer(
        text=result.notification_text or None,
        show_alert=bool(result.notification_text)
    )
    await query.edit_message_reply_markup(
        reply_markup=result.updated_keyboard if result.success else None
    )
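
Telegram limits `callback_data` to 64 bytes, and a JSON object with a long alert ID can exceed that. The alert ID format is not specified in this document; the sketch below assumes UUID-style IDs and shows a more compact `action:alert_id` encoding as one possible alternative (the helper names are hypothetical):

```python
import json
from typing import Tuple

MAX_CALLBACK_BYTES = 64  # Bot API limit on callback_data


def encode_callback(action: str, alert_id: str) -> str:
    """Pack action and alert ID as 'action:alert_id', enforcing the size limit."""
    data = f"{action}:{alert_id}"
    size = len(data.encode("utf-8"))
    if size > MAX_CALLBACK_BYTES:
        raise ValueError(f"callback_data is {size} bytes, limit is {MAX_CALLBACK_BYTES}")
    return data


def decode_callback(data: str) -> Tuple[str, str]:
    """Split on the first ':' so alert IDs may themselves contain colons."""
    action, _, alert_id = data.partition(":")
    return action, alert_id


alert_id = "3f2b8c1e-5d4a-4e6f-9a7b-0c1d2e3f4a5b"  # hypothetical UUID-style ID
json_form = json.dumps({"action": "escalate", "id": alert_id})
compact_form = encode_callback("escalate", alert_id)
print(len(json_form), len(compact_form))  # 68 45
```

If this encoding were adopted, the callback handler would call `decode_callback(query.data)` in place of `json.loads`.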

2.7 Telegram Delivery Confirmation

async def telegram_webhook_handler(self, request: Request):
    """Handle Telegram webhook updates that signal recipient engagement."""
    update = await request.json()
    
    # The Bot API has no delivery or read receipts. A successful send response
    # (ok: true) is the only confirmation, and the channel adapter records it
    # as "sent". A "message" update is an incoming user message, so it counts
    # as engagement only when it replies to one of our alerts.
    if "message" in update:
        message = update["message"]
        replied_to = message.get("reply_to_message")
        if replied_to:
            await self.delivery_tracker.update_status(
                channel_message_id=f"tg:{message['chat']['id']}:{replied_to['message_id']}",
                status="engaged",
                engaged_at=datetime.utcnow()
            )
    
    if "callback_query" in update:
        # User pressed an inline button on the alert: considered "engaged"
        callback = update["callback_query"]
        await self.delivery_tracker.update_status(
            channel_message_id=f"tg:{callback['message']['chat']['id']}:{callback['message']['message_id']}",
            status="engaged",
            engaged_at=datetime.utcnow()
        )
    
    return {"ok": True}
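
Webhook updates can arrive late or out of order, so status updates benefit from a guard that only allows forward transitions. A minimal sketch using the statuses named in this document (pending, sent, delivered, engaged, failed); the transition table itself is an assumption, not part of the design:

```python
from enum import Enum


class DeliveryStatus(str, Enum):
    PENDING = "pending"
    SENT = "sent"
    DELIVERED = "delivered"
    ENGAGED = "engaged"
    FAILED = "failed"


# Assumed transition table: each status maps to the statuses it may move to.
ALLOWED = {
    DeliveryStatus.PENDING: {DeliveryStatus.SENT, DeliveryStatus.FAILED},
    DeliveryStatus.SENT: {DeliveryStatus.DELIVERED, DeliveryStatus.ENGAGED,
                          DeliveryStatus.FAILED},
    DeliveryStatus.DELIVERED: {DeliveryStatus.ENGAGED},
    DeliveryStatus.ENGAGED: set(),
    DeliveryStatus.FAILED: {DeliveryStatus.PENDING},  # a retry re-queues the message
}


def can_transition(current: DeliveryStatus, new: DeliveryStatus) -> bool:
    """Return True if moving from `current` to `new` is a forward step."""
    return new in ALLOWED[current]


# A late "delivered" receipt must not overwrite an "engaged" message.
print(can_transition(DeliveryStatus.ENGAGED, DeliveryStatus.DELIVERED))  # False
```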

2.8 Telegram API Limits

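Limit          | Value                                                               | Handling
---------------|---------------------------------------------------------------------|---------------------------------------------
Message length | 4096 chars                                                          | Truncate with ellipsis + "View details" link
Photo size     | 10 MB via upload, 5 MB via URL                                      | Resize if needed
Video size     | 50 MB via upload, 20 MB via URL                                     | Compress if it exceeds the limit
Media group    | 2 to 10 items                                                       | Split into multiple messages
Rate limit     | ~30 messages/sec per bot, ~1 message/sec per chat, 20/min per group | Queue + throttle
Caption length | 1024 chars                                                          | Truncate if needed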
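
The truncation rule in the table can be sketched as below. The helper name `truncate` and the single-character ellipsis suffix are assumptions; appending the "View details" link is left out for brevity:

```python
TEXT_LIMIT = 4096     # Telegram message length limit
CAPTION_LIMIT = 1024  # Telegram caption length limit


def truncate(text: str, limit: int, suffix: str = "…") -> str:
    """Cut `text` so that the result, including `suffix`, fits within `limit`."""
    if len(text) <= limit:
        return text
    return text[: limit - len(suffix)] + suffix


long_body = "x" * 5000
short_body = truncate(long_body, TEXT_LIMIT)
print(len(short_body))  # 4096
```

Truncation is safest on the plain text before HTML tags are added, since cutting rendered markup can leave a tag unclosed and make Telegram reject the message.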

3. WhatsApp Business API Integration

3.1 Architecture — Official Meta API

This integration uses Meta's official WhatsApp Business Platform Cloud API, not unofficial client libraries or gateways.

+----------------------------+
|  Your Notification Service  |
|         (FastAPI)           |
+------------+---------------+
             |
             | HTTPS POST (Graph API v18.0+)
             v
+----------------------------+
|  Meta Graph API            |
|  /v18.0/{phone_number_id}  |
|  /messages                 |
+------------+---------------+
             |
             | WhatsApp protocol
             v
+----------------------------+
|  WhatsApp Business API     |
|  (Meta-managed)            |
+------------+---------------+
             |
             | Encrypted delivery
             v
+----------------------------+
|  Recipient WhatsApp App    |
+----------------------------+

3.2 Configuration

# config/whatsapp.yml (encrypted at rest)
whatsapp:
  enabled: true
  api_version: "v18.0"
  
  # Business Account credentials
  access_token: "${ENCRYPTED:ENC[PKCS7,...]}"    # System User access token
  phone_number_id: "123456789012345"              # WhatsApp Business Phone Number ID
  business_account_id: "987654321098765"          # WhatsApp Business Account ID
  
  # Webhook configuration (for delivery receipts)
  webhook:
    verify_token: "${ENCRYPTED:...}"              # Token for webhook verification
    endpoint: "/webhooks/whatsapp"
    
  # Rate limiting (enforced by Meta + our throttling)
  rate_limit:
    messages_per_second: 80                        # Business tier limit
    burst_capacity: 200
    
  # Template namespace (auto-assigned by Meta)
  template_namespace: "your_business_namespace"

3.3 Authentication

class WhatsAppAuth:
    """Handle WhatsApp Business API authentication."""
    
    TOKEN_REFRESH_BUFFER = 300  # Refresh 5 min before expiry
    
    def __init__(self, kms_client, config_store):
        self.kms = kms_client
        self.config = config_store
        self._access_token: Optional[str] = None
        self._token_expiry: Optional[datetime] = None
    
    async def get_access_token(self) -> str:
        """Get valid access token, refresh if needed."""
        if self._access_token and self._token_expiry:
            if datetime.utcnow() < self._token_expiry - timedelta(seconds=self.TOKEN_REFRESH_BUFFER):
                return self._access_token
        
        # Fetch and decrypt stored token
        encrypted_token = await self.config.get("whatsapp.access_token")
        decrypted = self.kms.decrypt(encrypted_token)
        
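        # Validate the stored token with Meta and read its expiry
        token_info = await self._introspect_token(decrypted)
        if not token_info.get("is_valid", False):
            raise RuntimeError("WhatsApp access token was rejected by Meta")
        
        self._access_token = decrypted
        # debug_token reports expires_at as a Unix timestamp; 0 means the token
        # never expires (typical for System User tokens), so re-validate hourly.
        expires_at = token_info.get("expires_at", 0)
        self._token_expiry = (
            datetime.utcfromtimestamp(expires_at) if expires_at
            else datetime.utcnow() + timedelta(hours=1)
        )
        
        return self._access_token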
    
    async def _introspect_token(self, token: str) -> dict:
        """Validate token with Meta's debug endpoint."""
        url = "https://graph.facebook.com/debug_token"
        params = {
            "input_token": token,
            "access_token": token  # Use same token for validation
        }
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as resp:
                data = await resp.json()
                return data["data"]

3.4 Message Sending — API Implementation

Core Message Sender

class WhatsAppChannelAdapter:
    BASE_URL = "https://graph.facebook.com"
    
    def __init__(self, auth: WhatsAppAuth, phone_number_id: str, api_version: str = "v18.0"):
        self.auth = auth
        self.phone_number_id = phone_number_id
        self.api_version = api_version
        self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30))
    
    def _build_url(self, endpoint: str) -> str:
        return f"{self.BASE_URL}/{self.api_version}/{self.phone_number_id}/{endpoint}"
    
    # --- Template Messages (Business-Initiated) ---
    
    async def send_template_message(
        self,
        to: str,                          # E.164 format: +1234567890
        template_name: str,
        language_code: str = "en",
        components: Optional[List[dict]] = None
    ) -> WhatsAppSendResult:
        """
        Send approved template message.
        Required for business-initiated conversations outside 24h window.
        """
        url = self._build_url("messages")
        token = await self.auth.get_access_token()
        
        payload = {
            "messaging_product": "whatsapp",
            "recipient_type": "individual",
            "to": self._normalize_phone(to),
            "type": "template",
            "template": {
                "name": template_name,
                "language": {"code": language_code}
            }
        }
        
        if components:
            payload["template"]["components"] = components
        
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
        
        response = await self.session.post(url, json=payload, headers=headers)
        data = await response.json()
        
        return WhatsAppSendResult(
            message_id=data.get("messages", [{}])[0].get("id"),
            status="sent" if response.status == 200 else "failed",
            error=data.get("error", {}).get("message") if response.status != 200 else None,
            response_code=response.status,
            timestamp_utc=datetime.utcnow()
        )
    
    # --- Text Messages (Within 24h Session) ---
    
    async def send_text_message(
        self,
        to: str,
        text: str,
        preview_url: bool = False
    ) -> WhatsAppSendResult:
        """
        Send free-form text message.
        Only works within 24h of last user message (session message).
        """
        url = self._build_url("messages")
        token = await self.auth.get_access_token()
        
        # Truncate to WhatsApp's 4096-character text body limit
        safe_text = text[:4096]
        
        payload = {
            "messaging_product": "whatsapp",
            "recipient_type": "individual",
            "to": self._normalize_phone(to),
            "type": "text",
            "text": {
                "body": safe_text,
                "preview_url": preview_url
            }
        }
        
        headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        response = await self.session.post(url, json=payload, headers=headers)
        data = await response.json()
        
        return self._parse_result(response, data)
    
    # --- Image Messages ---
    
    async def send_image(
        self,
        to: str,
        image_url: str,           # Public HTTPS URL
        caption: str = ""
    ) -> WhatsAppSendResult:
        """Send image message. Image must be publicly accessible via HTTPS."""
        url = self._build_url("messages")
        token = await self.auth.get_access_token()
        
        payload = {
            "messaging_product": "whatsapp",
            "recipient_type": "individual",
            "to": self._normalize_phone(to),
            "type": "image",
            "image": {
                "link": image_url,           # Public URL (not file upload)
                "caption": caption[:1024]    # Caption limit: 1024 characters
            }
        }
        
        headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        response = await self.session.post(url, json=payload, headers=headers)
        data = await response.json()
        
        return self._parse_result(response, data)
    
    # --- Video Messages ---
    
    async def send_video(
        self,
        to: str,
        video_url: str,            # Public HTTPS URL
        caption: str = ""
    ) -> WhatsAppSendResult:
        """Send video message. Max 16MB."""
        url = self._build_url("messages")
        token = await self.auth.get_access_token()
        
        payload = {
            "messaging_product": "whatsapp",
            "recipient_type": "individual",
            "to": self._normalize_phone(to),
            "type": "video",
            "video": {
                "link": video_url,
                "caption": caption[:1024]    # Caption limit: 1024 characters
            }
        }
        
        headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        response = await self.session.post(url, json=payload, headers=headers)
        data = await response.json()
        
        return self._parse_result(response, data)
    
    # --- Media Upload (for non-URL media) ---
    
    async def upload_media(self, media_path: str, mime_type: str) -> str:
        """
        Upload media to WhatsApp servers.
        Returns media_id that can be used in message sends.
        """
        upload_url = self._build_url("media")
        token = await self.auth.get_access_token()
        headers = {"Authorization": f"Bearer {token}"}
        
        # Keep the file open only for the duration of the upload
        with open(media_path, "rb") as fh:
            data = aiohttp.FormData()
            data.add_field("messaging_product", "whatsapp")
            data.add_field("type", mime_type)
            data.add_field(
                "file", fh,
                filename=os.path.basename(media_path),
                content_type=mime_type
            )
            response = await self.session.post(upload_url, data=data, headers=headers)
            result = await response.json()
        
        if "id" not in result:
            raise RuntimeError(f"Media upload failed: {result.get('error', {}).get('message')}")
        return result["id"]  # Media ID for reuse
    
    @staticmethod
    def _normalize_phone(phone: str) -> str:
        """Normalize to E.164 format."""
        cleaned = re.sub(r"[^\d+]", "", phone)
        if not cleaned.startswith("+"):
            raise ValueError(f"Phone number must include country code: {phone}")
        return cleaned
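
The `components` argument of `send_template_message` carries the values for the template's `{{n}}` variables. A sketch of how that list could be built, based on the Cloud API template payload shape as I understand it (verify field names against Meta's reference before relying on it; the helper name and its parameters are hypothetical):

```python
from typing import Dict, List, Optional


def build_template_components(
    body_params: List[str],
    url_suffix: Optional[str] = None,
    url_button_index: int = 0,
) -> List[Dict]:
    """Build the `components` list for a template message send."""
    components: List[Dict] = [{
        "type": "body",
        # Positional: the first entry fills {{1}}, the second {{2}}, and so on.
        "parameters": [{"type": "text", "text": str(v)} for v in body_params],
    }]
    if url_suffix is not None:
        components.append({
            "type": "button",
            "sub_type": "url",
            "index": str(url_button_index),  # position of the button in the template
            "parameters": [{"type": "text", "text": url_suffix}],
        })
    return components


# Sample values for a six-variable body plus a URL button in second position.
components = build_template_components(
    ["Jane Doe", "Staff", "Gate 1", "2024-05-01", "14:32", "high"],
    url_suffix="A-1001",
    url_button_index=1,
)
```

The result would be passed as `components=components` together with the approved template name.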

3.5 Message Templates (Meta Approval Required)

Template Registration & Approval Workflow

class WhatsAppTemplateManager:
    """Manage WhatsApp message templates with Meta approval workflow."""
    
    BASE_URL = "https://graph.facebook.com"
    
    # Constructor wiring mirrors WhatsAppChannelAdapter (assumed; not specified elsewhere)
    def __init__(self, auth: WhatsAppAuth, business_account_id: str, api_version: str = "v18.0"):
        self.auth = auth
        self.business_account_id = business_account_id
        self.api_version = api_version
        self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30))
    
    def _templates_url(self) -> str:
        return f"{self.BASE_URL}/{self.api_version}/{self.business_account_id}/message_templates"
    
    async def _auth_headers(self) -> dict:
        return {"Authorization": f"Bearer {await self.auth.get_access_token()}"}
    
    async def create_template(self, template: TemplateDefinition) -> TemplateResult:
        """
        Submit new template for Meta approval.
        Templates must be approved before use in business-initiated messages.
        """
        payload = {
            "name": template.name,           # lowercase_underscore format
            "category": template.category,    # "UTILITY", "MARKETING", or "AUTHENTICATION"
            "language": template.language,
            "components": self._build_components(template)
        }
        
        response = await self.session.post(
            self._templates_url(), json=payload, headers=await self._auth_headers()
        )
        result = await response.json()
        
        # Template goes through approval: PENDING -> APPROVED/REJECTED
        return TemplateResult(
            template_id=result["id"],
            name=template.name,
            status=result.get("status", "PENDING"),  # Initial state
            estimated_approval_time="up to 24 hours"
        )
    
    async def get_template_status(self, template_name: str) -> str:
        """Check template approval status."""
        # Statuses: PENDING, APPROVED, REJECTED, PAUSED, DISABLED
        template = await self._fetch_template(template_name)
        return template["status"]
    
    async def list_approved_templates(self) -> List[dict]:
        """List all approved templates for the UI."""
        params = {"status": "APPROVED"}
        response = await self.session.get(
            self._templates_url(), params=params, headers=await self._auth_headers()
        )
        return (await response.json()).get("data", [])
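
Checking the template name locally before submission avoids a round trip for an obvious rejection. A small sketch, assuming Meta's naming rule is lowercase letters, digits, and underscores with a maximum of 512 characters (check the current limits in Meta's documentation):

```python
import re

# Assumed rule: lowercase letters, digits, underscores; 1 to 512 characters.
_TEMPLATE_NAME_RE = re.compile(r"[a-z0-9_]{1,512}")


def is_valid_template_name(name: str) -> bool:
    """Return True if `name` matches the lowercase_underscore format."""
    return _TEMPLATE_NAME_RE.fullmatch(name) is not None


print(is_valid_template_name("person_detected_known"))  # True
print(is_valid_template_name("Person-Detected"))        # False
```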

Required Templates (Pre-Submitted for Approval)

# Templates must be created and approved via Meta Business Manager
# or via API before sending business-initiated messages

whatsapp_templates:
  # --- Alert: Person Detected (Known) ---
  - name: "person_detected_known"
    category: "UTILITY"
    language: "en"
    components:
      header:
        type: "TEXT"
        text: "🔍 Person Detected"
      body:
        type: "BODY"
        text: "{{1}} ({{2}}) detected at camera {{3}} on {{4}} at {{5}}.\n\nSeverity: {{6}}"
        # {{1}} = person_name, {{2}} = role, {{3}} = camera_name,
        # {{4}} = date, {{5}} = time, {{6}} = severity
      footer:
        type: "FOOTER"
        text: "AI Surveillance Platform"
      buttons:
        - type: "QUICK_REPLY"
          text: "Acknowledge"
        - type: "URL"
          text: "View Details"
          url: "https://portal.example.com/alerts/{{7}}"  # {{7}} = alert_id
    
  # --- Alert: Person Detected (Unknown) ---
  - name: "person_detected_unknown"
    category: "UTILITY"
    language: "en"
    components:
      header:
        type: "TEXT"
        text: "❓ Unknown Person Detected"
      body:
        type: "BODY"
        text: "An unknown person was detected at camera {{1}} on {{2}} at {{3}}.\n\nConfidence: {{4}}%"
      footer:
        type: "FOOTER"
        text: "AI Surveillance Platform"

  # --- Alert: Watchlist Match ---
  - name: "watchlist_alert"
    category: "UTILITY"
    language: "en"
    components:
      header:
        type: "TEXT"
        text: "⚠️ WATCHLIST ALERT"
      body:
        type: "BODY"
        text: "WATCHLIST: {{1}} ({{2}}) detected at camera {{3}} on {{4}} at {{5}}.\n\nThis person is on the {{6}} watchlist."
        # {{1}} = name, {{2}} = watchlist_type, {{3}} = camera,
        # {{4}} = date, {{5}} = time, {{6}} = watchlist_name
      footer:
        type: "FOOTER"
        text: "AI Surveillance Platform"
      buttons:
        - type: "QUICK_REPLY"
          text: "Acknowledge"
        - type: "QUICK_REPLY"
          text: "Escalate"

  # --- Alert: Blacklist Match ---
  - name: "blacklist_alert"
    category: "UTILITY"
    language: "en"
    components:
      header:
        type: "TEXT"
        text: "🚨 BLACKLIST ALERT"
      body:
        type: "BODY"
        text: "BLACKLIST: {{1}} was detected at camera {{2}} on {{3}} at {{4}}.\n\nThis is a HIGH PRIORITY security alert."
      footer:
        type: "FOOTER"
        text: "AI Surveillance Platform"
      buttons:
        - type: "QUICK_REPLY"
          text: "Acknowledge"
        - type: "QUICK_REPLY"
          text: "Dispatch Security"

  # --- Alert: Suspicious Activity ---
  - name: "suspicious_activity"
    category: "UTILITY"
    language: "en"
    components:
      header:
        type: "TEXT"
        text: "🛑 Suspicious Activity"
      body:
        type: "BODY"
        text: "Suspicious activity detected: {{1}} at camera {{2}} on {{3}} at {{4}}.\n\nConfidence: {{5}}%"
      footer:
        type: "FOOTER"
        text: "AI Surveillance Platform"

  # --- Alert: System ---
  - name: "system_alert"
    category: "UTILITY"
    language: "en"
    components:
      header:
        type: "TEXT"
        text: "⚙️ System Alert"
      body:
        type: "BODY"
        text: "{{1}}\n\nTime: {{2}}\nSeverity: {{3}}"
      footer:
        type: "FOOTER"
        text: "AI Surveillance Platform"

  # --- Alert: Escalation ---
  - name: "escalation_notice"
    category: "UTILITY"
    language: "en"
    components:
      header:
        type: "TEXT"
        text: "⬆️ Alert Escalated"
      body:
        type: "BODY"
        text: "Alert #{{1}} has been escalated to Level {{2}}.\n\nOriginal alert: {{3}}\n\nThis alert has not been acknowledged within {{4}} minutes."
      footer:
        type: "FOOTER"
        text: "AI Surveillance Platform"
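
Each template above relies on positional placeholders such as {{1}} and {{2}}. The sketch below shows one way to lint a component's text before submission, assuming placeholders within a single component must be numbered consecutively starting at {{1}}. The function names are hypothetical.

```python
import re
from typing import List

_PLACEHOLDER_RE = re.compile(r"\{\{(\d+)\}\}")


def placeholder_indices(text: str) -> List[int]:
    """Return the sorted, de-duplicated placeholder numbers found in `text`."""
    return sorted({int(n) for n in _PLACEHOLDER_RE.findall(text)})


def has_sequential_placeholders(text: str) -> bool:
    """True when placeholders are exactly {{1}}..{{n}} with no gaps (or there are none)."""
    found = placeholder_indices(text)
    return found == list(range(1, len(found) + 1))


if __name__ == "__main__":
    body = "BLACKLIST: {{1}} was detected at camera {{2}} on {{3}} at {{4}}."
    print(placeholder_indices(body))
    print(has_sequential_placeholders(body))
```

The check is applied per component, so a header, body, or button URL is validated on its own. Under the stated assumption, a component whose only placeholder is {{7}} would be flagged.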

Template Parameter Mapping

class TemplateParameterMapper:
    """Map alert data to WhatsApp template parameters."""
    
    MAPPINGS = {
        "person_detected_known": {
            1: "person_name",
            2: "person_role",
            3: "camera_name",
            4: "date",
            5: "time",
            6: "severity",
            7: "alert_id"
        },
        "person_detected_unknown": {
            1: "camera_name",
            2: "date",
            3: "time",
            4: "confidence"
        },
        "watchlist_alert": {
            1: "person_name",
            2: "watchlist_type",
            3: "camera_name",
            4: "date",
            5: "time",
            6: "watchlist_name"
        },
        "blacklist_alert": {
            1: "person_name",
            2: "camera_name",
            3: "date",
            4: "time"
        },
        "suspicious_activity": {
            1: "activity_type",
            2: "camera_name",
            3: "date",
            4: "time",
            5: "confidence"
        },
        "system_alert": {
            1: "message",
            2: "timestamp",
            3: "severity"
        },
        "escalation_notice": {
            1: "alert_id",
            2: "escalation_level",
            3: "alert_summary",
            4: "threshold_minutes"
        }
    }
    
    def map_parameters(self, template_name: str, alert_data: dict) -> List[dict]:
        """Generate ordered parameter list for template."""
        mapping = self.MAPPINGS.get(template_name, {})
        parameters = []
        
        for param_index in sorted(mapping.keys()):
            field_name = mapping[param_index]
            value = self._resolve_field(alert_data, field_name)
            parameters.append({"type": "text", "text": str(value)})
        
        return parameters
    
    def _resolve_field(self, data: dict, field: str) -> str:
        """Resolve nested field from alert data."""
        if field in data:
            return data[field]
        # Handle special fields
        resolvers = {
            "date": lambda d: d["timestamp"].strftime("%Y-%m-%d"),
            "time": lambda d: d["timestamp"].strftime("%H:%M:%S"),
        }
        resolver = resolvers.get(field)
        return resolver(data) if resolver else "N/A"
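
The parameter list returned by `map_parameters` still has to be wrapped in a send request. The sketch below assembles a template message body in the shape used by the WhatsApp Cloud API messages endpoint. The function name `build_template_message`, the `ack_<alert_id>` payload convention, and the assumption that quick-reply buttons occupy the first button positions in the template are all illustrative.

```python
from typing import Dict, List, Optional


def build_template_message(
    to: str,
    template_name: str,
    body_parameters: List[Dict[str, str]],
    language_code: str = "en",
    quick_reply_payloads: Optional[List[str]] = None,
) -> dict:
    """Build the JSON body for a business-initiated template message."""
    components: List[dict] = []
    if body_parameters:
        # Body parameters fill {{1}}..{{n}} in order.
        components.append({"type": "body", "parameters": body_parameters})
    # One button component per quick-reply button; `index` is the button position.
    for index, payload in enumerate(quick_reply_payloads or []):
        components.append({
            "type": "button",
            "sub_type": "quick_reply",
            "index": str(index),
            "parameters": [{"type": "payload", "payload": payload}],
        })
    return {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "template",
        "template": {
            "name": template_name,
            "language": {"code": language_code},
            "components": components,
        },
    }
```

Button parameters travel in their own components rather than in the body parameter list, which is why the quick-reply payloads are passed separately here.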

3.6 WhatsApp Delivery Receipts (Webhooks)

class WhatsAppWebhookHandler:
    """Handle WhatsApp webhook events for delivery tracking."""
    
    STATUS_MAPPING = {
        "sent": "sent",           # Message sent to WhatsApp servers
        "delivered": "delivered",  # Message delivered to recipient's device
        "read": "read",            # Recipient opened the message
        "failed": "failed",        # Message failed to deliver
    }
    
    async def handle_webhook(self, request: Request):
        """Process incoming webhook from Meta."""
        body = await request.json()
        
        for entry in body.get("entry", []):
            for change in entry.get("changes", []):
                value = change.get("value", {})
                
                # Handle message status updates
                if "statuses" in value:
                    for status in value["statuses"]:
                        await self._process_status_update(status)
                
                # Handle incoming messages (for session tracking)
                if "messages" in value:
                    for message in value["messages"]:
                        await self._process_incoming_message(message)
        
        return {"success": True}
    
    async def _process_status_update(self, status: dict):
        """Update delivery status based on webhook."""
        message_id = status["id"]           # WhatsApp message ID
        recipient = status["recipient_id"]   # Phone number
        status_type = status["status"]       # sent/delivered/read/failed
        
        # Map to internal status
        internal_status = self.STATUS_MAPPING.get(status_type, "unknown")
        
        # Failure details arrive in an "errors" list, which may be absent or empty
        errors = status.get("errors") or [{}]
        error_title = errors[0].get("title") if status_type == "failed" else None
        
        # Update delivery tracking
        await self.delivery_tracker.update_by_provider_id(
            provider_message_id=message_id,
            channel="whatsapp",
            status=internal_status,
            timestamp=datetime.now(timezone.utc),  # timezone-aware; utcnow() is deprecated
            error=error_title
        )
        
        # Once the message is delivered or read, cancel the pending escalation timer
        if internal_status in ("delivered", "read"):
            await self.escalation_service.cancel_pending_escalation(message_id)
    
    async def _process_incoming_message(self, message: dict):
        """Handle user replies — marks active session for 24h window."""
        phone_number = message["from"]
        
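        # Track active session (allows free-form messages for 24h)
        await self.session_tracker.update_session(
            phone_number=phone_number,
            last_user_message=datetime.now(timezone.utc)
        )
        
        # Handle button responses. Quick-reply buttons on template messages
        # arrive as type "button" with a payload; reply buttons on interactive
        # messages arrive as type "interactive" with a button_reply id.
        if message.get("type") == "button":
            button_id = message["button"]["payload"]
            await self._handle_button_reply(phone_number, button_id)
        elif message.get("type") == "interactive":
            interactive = message["interactive"]
            if interactive["type"] == "button_reply":
                button_id = interactive["button_reply"]["id"]
                await self._handle_button_reply(phone_number, button_id)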
    
    async def _handle_button_reply(self, phone: str, button_id: str):
        """Handle quick reply button clicks."""
        if button_id.startswith("ack_"):
            alert_id = button_id[4:]
            await self.escalation_service.acknowledge_alert(
                alert_id=alert_id,
                acknowledged_by=f"whatsapp:{phone}",
                channel="whatsapp"
            )
        elif button_id.startswith("esc_"):
            alert_id = button_id[4:]
            await self.escalation_service.escalate_alert(alert_id, reason="manual")
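
The handler above trusts the request body as received. Meta signs each webhook delivery with the app secret and sends the result in the `X-Hub-Signature-256` header as `sha256=<hex digest>`, so the raw body can be verified before it is parsed. The sketch below shows that check. The function name `verify_signature` is hypothetical, and how the app secret is loaded is left open.

```python
import hashlib
import hmac


def verify_signature(app_secret: str, raw_body: bytes, signature_header: str) -> bool:
    """Return True when the header matches the HMAC-SHA256 of the raw request body."""
    prefix = "sha256="
    if not signature_header or not signature_header.startswith(prefix):
        return False
    expected = hmac.new(app_secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
    received = signature_header[len(prefix):]
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected.encode("utf-8"), received.encode("utf-8"))
```

The comparison must use the exact bytes received on the wire. Re-serializing the parsed JSON can change whitespace or key order and would make a valid signature fail.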

3.7 WhatsApp API Limits

| Limit | Value | Handling |
|---|---|---|
| Message text length | 4096 chars | Truncate with details link |
| Image size | 16MB max | Resize/compress before upload |
| Video size | 16MB max | Compress with FFmpeg |
| Video duration | 60 seconds | Trim if needed |
| Media upload | Max 5MB for direct upload | Use presigned URLs for larger |
| Rate limit | 80 msg/sec (business tier) | Token bucket throttling |
| Templates | Must be pre-approved | Submit 48h before needed |
| Session window | 24h from last user message | Use templates outside window |
| Phone number format | E.164 (+1234567890) | Normalize all inputs |
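
The rate-limit row above names token bucket throttling as the handling strategy. The sketch below is a minimal, single-process version of that idea. The class name `TokenBucket` and the injectable `clock` parameter are assumptions made for illustration. A production limiter shared across workers would need shared state, for example in Redis.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow bursts up to `capacity` and a sustained `rate` of tokens per second."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity        # start with a full bucket
        self._last = clock()

    def try_acquire(self, tokens: float = 1.0) -> bool:
        """Consume `tokens` if available; return False when the caller should wait."""
        now = self._clock()
        # Refill in proportion to elapsed time, never exceeding capacity.
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= tokens:
            self._tokens -= tokens
            return True
        return False


if __name__ == "__main__":
    bucket = TokenBucket(rate=80.0, capacity=80.0)   # mirrors the 80 msg/sec limit
    print(bucket.try_acquire())
```

Passing the clock in as a parameter keeps the limiter deterministic under test, since time can be advanced manually instead of sleeping.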

4. Alert Routing Rules Engine

4.1 Rule Engine Architecture

class AlertRoutingEngine:
    """Evaluate routing rules to determine alert recipients."""
    
    def __init__(self, rule_store, recipient_resolver):
        self.rule_store = rule_store
        self.resolver = recipient_resolver
    
    async def evaluate(self, alert_event: AlertEvent) -> RoutingDecision:
        """
        Evaluate all rules against alert event.
        Returns final routing decision with recipients and channels.
        """
        # Fetch all active rules ordered by priority
        rules = await self.rule_store.get_active_rules(
            order_by="priority DESC"
        )
        
        matched_rules = []
        for rule in rules:
            if self._evaluate_rule(rule, alert_event):
                matched_rules.append(rule)
                
                # Stop on first exclusive match if configured
                if rule.stop_on_match:
                    break
        
        if not matched_rules:
            # Apply default routing (all enabled cameras -> security team)
            return await self._default_routing(alert_event)
        
        # Merge decisions from all matched rules
        recipients = set()
        channels = set()
        severity_override = None
        templates = {}
        
        for rule in matched_rules:
            rule_recipients = await self.resolver.resolve(rule.recipient_groups)
            recipients.update(rule_recipients)
            channels.update(rule.channels)
            
            if rule.severity_override:
                severity_override = self._higher_severity(
                    severity_override, rule.severity_override
                )
            
            templates.update(rule.templates or {})
        
        return RoutingDecision(
            alert_id=alert_event.id,
            recipients=list(recipients),
            channels=list(channels),
            severity=severity_override or alert_event.severity,
            templates=templates,
            matched_rules=[r.id for r in matched_rules],
            quiet_hours_applicable=self._check_quiet_hours(channels, alert_event)
        )
    
    def _evaluate_rule(self, rule: AlertRule, event: AlertEvent) -> bool:
        """Evaluate single rule against alert event."""
        conditions = []
        
        for condition in rule.conditions:
            result = self._evaluate_condition(condition, event)
            conditions.append(result)
        
        # Combine per-condition results according to the rule's logic mode
        if rule.logic == "ALL":
            return all(conditions)
        if rule.logic == "ANY":
            return any(conditions)
        # Any other value is treated as a CUSTOM boolean expression
        return self._evaluate_expression(rule.logic, conditions)
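
The engine delegates CUSTOM logic to `_evaluate_expression`, which is not shown. The sketch below is one possible implementation, assuming the expression refers to conditions by 1-based position and combines them with AND, OR, NOT and parentheses, for example `1 AND (2 OR NOT 3)`. That syntax is an assumption, not something the rule schema defines. Parsing goes through Python's `ast` module with a whitelist of node types, so nothing is executed.

```python
import ast
from typing import Sequence


def evaluate_expression(expression: str, results: Sequence[bool]) -> bool:
    """Evaluate e.g. "1 AND (2 OR NOT 3)", where N refers to results[N - 1]."""
    # Lower-casing turns AND/OR/NOT into Python's boolean keywords.
    tree = ast.parse(expression.lower(), mode="eval")

    def walk(node: ast.AST) -> bool:
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.BoolOp):
            values = [walk(v) for v in node.values]
            return all(values) if isinstance(node.op, ast.And) else any(values)
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
            return not walk(node.operand)
        if isinstance(node, ast.Constant) and type(node.value) is int:
            if not 1 <= node.value <= len(results):
                raise ValueError(f"Condition index out of range: {node.value}")
            return bool(results[node.value - 1])
        # Names, calls, arithmetic and everything else are rejected.
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    return walk(tree)


if __name__ == "__main__":
    print(evaluate_expression("1 AND (2 OR 3)", [True, False, True]))
```

Malformed input raises `SyntaxError` from `ast.parse`, and anything outside the whitelist raises `ValueError`, so a caller can reject the rule when it is saved instead of failing while routing an alert.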

4.2 Condition Types

class ConditionEvaluator:
    """Evaluate individual routing conditions."""
    
    def evaluate(self, condition: Condition, event: AlertEvent) -> bool:
        condition_type = condition.type
        evaluator = getattr(self, f"_{condition_type}", None)
        if not evaluator:
            raise ValueError(f"Unknown condition type: {condition_type}")
        return evaluator(condition, event)
    
    # --- Camera-based ---
    def _camera(self, c: Condition, event: AlertEvent) -> bool:
        """Check if event camera matches condition."""
        return event.camera_id in c.values
    
    # --- Person-based ---
    def _person(self, c: Condition, event: AlertEvent) -> bool:
        """Check if detected person matches condition."""
        if not event.person_id:
            return False
        return event.person_id in c.values
    
    # --- Role-based ---
    def _person_role(self, c: Condition, event: AlertEvent) -> bool:
        """Check if person's role matches condition."""
        if not event.person_role:
            return False
        return event.person_role in c.values  # e.g., ["blacklist", "vip"]
    
    # --- Event type ---
    def _event_type(self, c: Condition, event: AlertEvent) -> bool:
        """Check if event type matches condition."""
        return event.event_type in c.values
    
    # --- Zone-based ---
    def _zone(self, c: Condition, event: AlertEvent) -> bool:
        """Check if detection zone matches condition."""
        return event.zone_id in c.values
    
    # --- Time-based ---
    def _time_range(self, c: Condition, event: AlertEvent) -> bool:
        """Check if event timestamp falls within time range."""
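        # Resolve IANA names such as "America/New_York" with the standard-library
        # zoneinfo module (from zoneinfo import ZoneInfo); datetime.timezone
        # only accepts fixed UTC offsets.
        tz = ZoneInfo(c.timezone) if c.timezone else timezone.utc
        local_time = event.timestamp.astimezone(tz).time()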
        
        if c.start_time <= c.end_time:
            return c.start_time <= local_time <= c.end_time
        else:  # Crosses midnight
            return local_time >= c.start_time or local_time <= c.end_time
    
    # --- Day of week ---
    def _day_of_week(self, c: Condition, event: AlertEvent) -> bool:
        """Check if event day matches condition."""
        # ZoneInfo (from zoneinfo import ZoneInfo) resolves IANA timezone names
        tz = ZoneInfo(c.timezone) if c.timezone else timezone.utc
        weekday = event.timestamp.astimezone(tz).strftime("%A").lower()
        return weekday in [d.lower() for d in c.values]
    
    # --- Severity ---
    def _severity(self, c: Condition, event: AlertEvent) -> bool:
        """Check if event severity meets threshold."""
        severity_order = {"low": 1, "medium": 2, "high": 3, "critical": 4}
        event_level = severity_order.get(event.severity, 0)
        threshold = severity_order.get(c.threshold, 0)
        
        operators = {
            "eq": event_level == threshold,
            "gte": event_level >= threshold,
            "gt": event_level > threshold,
            "lte": event_level <= threshold,
            "lt": event_level < threshold,
        }
        return operators.get(c.operator, False)
    
    # --- Watchlist ---
    def _watchlist(self, c: Condition, event: AlertEvent) -> bool:
        """Check if person is on specified watchlist."""
        if not event.watchlist_matches:
            return False
        matched_lists = [w["list_name"] for w in event.watchlist_matches]
        return any(wl in matched_lists for wl in c.values)
    
    # --- Confidence threshold ---
    def _confidence(self, c: Condition, event: AlertEvent) -> bool:
        """Check if detection confidence meets threshold."""
        operators = {
            "gte": event.confidence >= c.threshold,
            "gt": event.confidence > c.threshold,
            "lte": event.confidence <= c.threshold,
            "lt": event.confidence < c.threshold,  # used by the low-confidence rule
        }
        return operators.get(c.operator, False)
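
The `_time_range` check depends on two details that are easy to get wrong: converting the event timestamp to the rule's local time, and handling ranges that cross midnight. The standalone sketch below isolates both. The function name `in_time_range` is hypothetical, and it assumes the start and end arrive as "HH:MM" strings, as they do in the rule configuration.

```python
from datetime import datetime, time, timedelta, timezone, tzinfo


def in_time_range(ts: datetime, start: str, end: str,
                  tz: tzinfo = timezone.utc) -> bool:
    """Return True when `ts`, viewed in `tz`, falls within [start, end] inclusive."""
    local = ts.astimezone(tz).time()
    start_t, end_t = time.fromisoformat(start), time.fromisoformat(end)
    if start_t <= end_t:
        return start_t <= local <= end_t
    # Range crosses midnight, e.g. 22:00 to 06:00.
    return local >= start_t or local <= end_t


if __name__ == "__main__":
    fixed_offset = timezone(timedelta(hours=-5))   # stand-in for a named zone
    event_ts = datetime(2024, 1, 15, 4, 30, tzinfo=timezone.utc)
    print(in_time_range(event_ts, "22:00", "06:00", fixed_offset))
```

Any `tzinfo` works for the `tz` argument, including `zoneinfo.ZoneInfo("America/New_York")`, which also accounts for daylight saving time. The timestamp should be timezone-aware, because `astimezone` interprets a naive datetime as system local time.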

4.3 Rule Configuration Examples

# routing_rules.yml — Configurable via admin UI

routing_rules:
  # Rule 1: Always alert on blacklist persons (highest priority)
  - id: "rule_blacklist_always"
    name: "Blacklist Person Alert"
    description: "Always notify security team when blacklist person is detected"
    priority: 100  # Highest priority
    enabled: true
    logic: "ALL"
    stop_on_match: false
    conditions:
      - type: "person_role"
        operator: "in"
        values: ["blacklist"]
    actions:
      recipient_groups: ["security_team", "management"]
      channels: ["telegram", "whatsapp"]
      severity_override: "critical"
      templates:
        telegram: "blacklist_alert_tg"
        whatsapp: "blacklist_alert"
      bypass_quiet_hours: true
    
  # Rule 2: VIP watchlist alerts
  - id: "rule_vip_watchlist"
    name: "VIP Watchlist Match"
    description: "Alert on VIP watchlist matches during business hours"
    priority: 90
    enabled: true
    logic: "ALL"
    conditions:
      - type: "watchlist"
        values: ["vip_watchlist"]
      - type: "time_range"
        start_time: "07:00"
        end_time: "22:00"
        timezone: "America/New_York"
    actions:
      recipient_groups: ["management"]
      channels: ["telegram"]
      severity_override: "high"
      templates:
        telegram: "watchlist_alert_tg"

  # Rule 3: Suspicious activity at night
  - id: "rule_night_suspicious"
    name: "Night Suspicious Activity"
    description: "Escalate suspicious activity detected during night hours"
    priority: 80
    enabled: true
    logic: "ALL"
    conditions:
      - type: "event_type"
        values: ["suspicious_activity"]
      - type: "time_range"
        start_time: "22:00"
        end_time: "06:00"
        timezone: "America/New_York"
    actions:
      recipient_groups: ["night_staff", "security_team"]
      channels: ["telegram", "whatsapp"]
      severity_override: "high"
      bypass_quiet_hours: true

  # Rule 4: Per-camera rule — Front entrance always monitored
  - id: "rule_front_entrance"
    name: "Front Entrance Monitoring"
    description: "All person detection at front entrance"
    priority: 70
    enabled: true
    logic: "ALL"
    conditions:
      - type: "camera"
        values: ["cam_01_front_entrance"]
      - type: "event_type"
        values: ["person_detected"]
    actions:
      recipient_groups: ["security_team"]
      channels: ["telegram"]

  # Rule 5: Zone-based — Restricted area
  - id: "rule_restricted_zone"
    name: "Restricted Area Alert"
    description: "Any detection in restricted zones"
    priority: 85
    enabled: true
    logic: "ALL"
    conditions:
      - type: "zone"
        values: ["restricted_warehouse", "server_room"]
    actions:
      recipient_groups: ["security_team", "management"]
      channels: ["telegram", "whatsapp"]
      severity_override: "high"
      bypass_quiet_hours: true

  # Rule 6: Unknown persons during off-hours
  - id: "rule_unknown_after_hours"
    name: "Unknown Person After Hours"
    description: "Unknown person detection outside business hours"
    priority: 75
    enabled: true
    logic: "ALL"
    conditions:
      - type: "event_type"
        values: ["person_detected_unknown"]
      - type: "time_range"
        start_time: "18:00"
        end_time: "07:00"
        timezone: "America/New_York"
    actions:
      recipient_groups: ["night_staff", "security_team"]
      channels: ["telegram"]
      severity_override: "medium"

  # Rule 7: System alerts
  - id: "rule_system_alerts"
    name: "System Health Alerts"
    description: "Camera offline, system errors"
    priority: 60
    enabled: true
    logic: "ALL"
    conditions:
      - type: "event_type"
        values: ["system_alert", "camera_offline", "system_error"]
    actions:
      recipient_groups: ["management"]
      channels: ["telegram"]
      severity_override: "high"

  # Rule 8: Critical severity — always to management
  - id: "rule_critical_always"
    name: "Critical Severity Alert"
    description: "All critical severity alerts go to management"
    priority: 95
    enabled: true
    logic: "ALL"
    conditions:
      - type: "severity"
        operator: "gte"
        threshold: "critical"
    actions:
      recipient_groups: ["management"]
      channels: ["telegram", "whatsapp"]
      bypass_quiet_hours: true

  # Rule 9: Low confidence suppression
  - id: "rule_low_confidence"
    name: "Low Confidence Suppression"
    description: "Don't alert on detections below 60% confidence"
    priority: 10  # Low priority — evaluated last
    enabled: true
    logic: "ALL"
    conditions:
      - type: "confidence"
        operator: "lt"
        threshold: 60
    actions:
      recipient_groups: []  # No recipients — suppress
      channels: []
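
How these rules interact depends on priority order and `stop_on_match`. The simplified model below mirrors the merge behaviour described in 4.1, where matched rules contribute the union of their recipient groups. The `Rule` dataclass and `route` function are illustrative stand-ins, not the engine's actual types. Under this model, a rule with empty recipients suppresses an alert only if it is evaluated before the other matching rules and stops evaluation.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Rule:
    id: str
    priority: int
    matches: Callable[[dict], bool]
    recipient_groups: List[str] = field(default_factory=list)
    stop_on_match: bool = False


def route(rules: List[Rule], event: dict) -> List[str]:
    """Union the recipient groups of matching rules, highest priority first."""
    groups: List[str] = []
    for rule in sorted(rules, key=lambda r: r.priority, reverse=True):
        if not rule.matches(event):
            continue
        for group in rule.recipient_groups:
            if group not in groups:
                groups.append(group)
        if rule.stop_on_match:
            break   # later (lower-priority) rules are never evaluated
    return groups


if __name__ == "__main__":
    event = {"camera": "cam_01_front_entrance", "confidence": 40}
    entrance = Rule("rule_front_entrance", 70,
                    lambda e: e["camera"] == "cam_01_front_entrance", ["security_team"])
    low_conf_last = Rule("rule_low_confidence", 10, lambda e: e["confidence"] < 60)
    low_conf_first = Rule("rule_low_confidence", 110, lambda e: e["confidence"] < 60,
                          stop_on_match=True)
    print(route([entrance, low_conf_last], event))    # entrance rule still notifies
    print(route([entrance, low_conf_first], event))   # suppression wins
```

Whether low-confidence suppression should also override the blacklist and critical-severity rules is a policy decision. Placing the suppression rule below them in priority keeps those alerts flowing.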

4.4 Admin UI — Rule Builder (Non-Technical)

# Non-technical admin interface specification

rule_builder_ui:
  # Visual drag-and-drop rule builder
  components:
    trigger_selector:
      label: "When this happens..."
      options:
        - "Person is detected"
        - "Watchlist person is detected"
        - "Unknown person is detected"
        - "Suspicious activity detected"
        - "Camera goes offline"
        - "System error occurs"
      
    condition_builder:
      label: "And these conditions match..."
      condition_types:
        - type: "camera"
          label: "Camera"
          widget: "multi_select"
          source: "cameras"  # Populated from camera registry
          
        - type: "person"
          label: "Specific Person"
          widget: "person_search"
          source: "person_database"
          
        - type: "person_role"
          label: "Person Type"
          widget: "multi_select"
          options:
            - value: "blacklist"
              label: "Blacklisted Person"
              color: "red"
            - value: "vip"
              label: "VIP"
              color: "gold"
            - value: "employee"
              label: "Employee"
              color: "blue"
            - value: "visitor"
              label: "Visitor"
              color: "green"
            
        - type: "event_type"
          label: "Event Type"
          widget: "multi_select"
          options:
            - value: "person_detected"
              label: "Person Detected"
            - value: "person_detected_unknown"
              label: "Unknown Person"
            - value: "suspicious_activity"
              label: "Suspicious Activity"
            - value: "system_alert"
              label: "System Alert"
              
        - type: "zone"
          label: "Zone/Area"
          widget: "multi_select"
          source: "zones"  # Populated from zone definitions
          
        - type: "time_range"
          label: "Time of Day"
          widget: "time_range_picker"
          
        - type: "day_of_week"
          label: "Days of Week"
          widget: "day_selector"
          
        - type: "severity"
          label: "Severity Level"
          widget: "severity_threshold"
          options:
            - value: "low"
              label: "Low"
            - value: "medium"
              label: "Medium"
            - value: "high"
              label: "High"
            - value: "critical"
              label: "Critical"
              
        - type: "watchlist"
          label: "Watchlist"
          widget: "multi_select"
          source: "watchlists"
          
        - type: "confidence"
          label: "Confidence Level"
          widget: "slider"
          min: 0
          max: 100
          unit: "%"
    
    action_builder:
      label: "Then do this..."
      actions:
        - type: "notify"
          label: "Send notification to..."
          widget: "recipient_group_selector"
          
        - type: "set_severity"
          label: "Set alert severity to..."
          widget: "severity_selector"
          
        - type: "bypass_quiet_hours"
          label: "Send even during quiet hours"
          widget: "toggle"
          
        - type: "require_acknowledgment"
          label: "Require acknowledgment"
          widget: "toggle"
          
        - type: "escalate_after"
          label: "Escalate if not acknowledged after..."
          widget: "duration_picker"
          default: "15 minutes"

5. Recipient Groups

5.1 Group Data Model

# Recipient group configuration

recipient_groups:
  - id: "security_team"
    name: "Security Team"
    description: "Primary security operations team"
    created_at: "2024-01-15T00:00:00Z"
    updated_at: "2024-06-01T00:00:00Z"
    
    # Channel assignments
    channels:
      telegram:
        enabled: true
        chat_ids:
          - "-1001234567890"      # Security Ops group
        individual_chats:          # Also DM each member
          enabled: true
          for_severity: ["high", "critical"]
      whatsapp:
        enabled: true
        phone_numbers:
          - "+12345678901"
          - "+12345678902"
          - "+12345678903"
    
    # Default settings
    default_settings:
      quiet_hours: null            # No quiet hours (security-critical)
      min_severity: "low"
      auto_acknowledge_after: null  # Never auto-ack
      
    members:
      - user_id: "user_sec_001"
        name: "John Smith"
        role: "Security Supervisor"
        telegram_id: "111111111"
        phone: "+12345678901"
        is_active: true
        added_at: "2024-01-15T00:00:00Z"
        
      - user_id: "user_sec_002"
        name: "Jane Doe"
        role: "Security Officer"
        telegram_id: "222222222"
        phone: "+12345678902"
        is_active: true
        added_at: "2024-02-01T00:00:00Z"
        
      - user_id: "user_sec_003"
        name: "Bob Johnson"
        role: "Security Officer"
        telegram_id: "333333333"
        phone: "+12345678903"
        is_active: true
        added_at: "2024-03-10T00:00:00Z"

  - id: "management"
    name: "Management"
    description: "Management-level personnel for critical alerts"
    
    channels:
      telegram:
        enabled: true
        chat_ids:
          - "-1009876543210"      # Management group
      whatsapp:
        enabled: true
        phone_numbers:
          - "+19998887777"
          - "+19998887778"
    
    default_settings:
      quiet_hours: null
      min_severity: "high"
      
    members:
      - user_id: "user_mgr_001"
        name: "Alice Williams"
        role: "Security Director"
        telegram_id: "444444444"
        phone: "+19998887777"
        is_active: true
        
      - user_id: "user_mgr_002"
        name: "Charlie Brown"
        role: "Operations Manager"
        telegram_id: "555555555"
        phone: "+19998887778"
        is_active: true

  - id: "night_staff"
    name: "Night Staff"
    description: "Night shift security personnel (22:00 - 06:00)"
    
    channels:
      telegram:
        enabled: true
        chat_ids:
          - "-1005555666677"      # Night shift group
      whatsapp:
        enabled: false
    
    default_settings:
      quiet_hours: null
      min_severity: "medium"
      active_time_range:
        start: "22:00"
        end: "06:00"
        timezone: "America/New_York"
        
    members:
      - user_id: "user_night_001"
        name: "David Lee"
        role: "Night Supervisor"
        telegram_id: "666666666"
        phone: "+15551234567"
        is_active: true
        
      - user_id: "user_night_002"
        name: "Eva Martinez"
        role: "Night Officer"
        telegram_id: "777777777"
        phone: "+15551234568"
        is_active: true
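
Every `phone` and `phone_numbers` entry above is expected in E.164 format. The sketch below shows one way to normalize admin-entered numbers before they are stored. The function name `normalize_phone` and the handling of a leading `00` as an international prefix are assumptions. Numbers without a country code are rejected, not guessed.

```python
import re

# E.164: a plus sign, a non-zero leading digit, and at most 15 digits in total.
_E164_RE = re.compile(r"\+[1-9]\d{1,14}")


def normalize_phone(raw: str) -> str:
    """Strip common separators and return an E.164 number, or raise ValueError."""
    cleaned = re.sub(r"[\s\-().]", "", raw)
    if cleaned.startswith("00"):
        # Assumed convention: "00" is an international dialling prefix.
        cleaned = "+" + cleaned[2:]
    if not _E164_RE.fullmatch(cleaned):
        raise ValueError(f"Not a valid E.164 phone number: {raw!r}")
    return cleaned


if __name__ == "__main__":
    print(normalize_phone("+1 (234) 567-8901"))
```

Normalizing at write time also makes the phone-based deduplication across groups reliable, since the same number always has a single stored form.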

5.2 Member Management API

# API endpoints for member management

class RecipientGroupAPI:
    
    # GET /recipient-groups — List all groups
    async def list_groups(
        self,
        skip: int = 0,
        limit: int = 50,
        active_only: bool = True
    ) -> List[RecipientGroupSummary]:
        ...
    
    # GET /recipient-groups/{group_id} — Get group details
    async def get_group(self, group_id: str) -> RecipientGroup:
        ...
    
    # POST /recipient-groups — Create new group
    async def create_group(self, request: CreateGroupRequest) -> RecipientGroup:
        """
        Request body:
        {
            "name": "New Group",
            "description": "Optional description",
            "channels": {
                "telegram": {"enabled": true, "chat_ids": []},
                "whatsapp": {"enabled": false}
            }
        }
        """
        ...
    
    # PUT /recipient-groups/{group_id} — Update group
    async def update_group(self, group_id: str, request: UpdateGroupRequest) -> RecipientGroup:
        ...
    
    # DELETE /recipient-groups/{group_id} — Delete group
    async def delete_group(self, group_id: str) -> None:
        ...
    
    # POST /recipient-groups/{group_id}/members — Add member
    async def add_member(
        self,
        group_id: str,
        request: AddMemberRequest
    ) -> GroupMember:
        """
        Request body:
        {
            "name": "John Smith",
            "role": "Security Officer",
            "telegram_id": "123456789",      # Optional
            "phone": "+12345678901",         # Required for WhatsApp
            "is_active": true
        }
        """
        ...
    
    # PUT /recipient-groups/{group_id}/members/{member_id} — Update member
    async def update_member(
        self,
        group_id: str,
        member_id: str,
        request: UpdateMemberRequest
    ) -> GroupMember:
        ...
    
    # DELETE /recipient-groups/{group_id}/members/{member_id} — Remove member
    async def remove_member(self, group_id: str, member_id: str) -> None:
        ...
    
    # POST /recipient-groups/{group_id}/members/{member_id}/toggle
    async def toggle_member_active(self, group_id: str, member_id: str) -> GroupMember:
        """Quick toggle member active/inactive status."""
        ...

5.3 Multiple Groups Per Alert

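An alert can be routed to multiple groups simultaneously. The resolver skips inactive groups and members, honours each group's active time range, and deduplicates recipients by Telegram ID and phone number: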

class MultiGroupResolver:
    """Resolve multiple recipient groups, deduplicating members."""
    
    async def resolve(self, group_ids: List[str], alert: AlertEvent) -> List[ResolvedRecipient]:
        """Resolve all groups, deduplicate by contact info."""
        all_recipients = []
        seen_telegram_ids = set()
        seen_phones = set()
        
        for group_id in group_ids:
            group = await self.group_store.get(group_id)
            if not group or not group.is_active:
                continue
            
            # Check time-based group availability (e.g., night_staff)
            if group.active_time_range:
                if not self._is_within_active_hours(group.active_time_range, alert.timestamp):
                    continue
            
            for member in group.members:
                if not member.is_active:
                    continue
                
                # Telegram deduplication
                if member.telegram_id and member.telegram_id not in seen_telegram_ids:
                    seen_telegram_ids.add(member.telegram_id)
                    all_recipients.append(ResolvedRecipient(
                        type="telegram",
                        identifier=member.telegram_id,
                        name=member.name,
                        group_id=group_id,
                        role=member.role
                    ))
                
                # WhatsApp deduplication
                if member.phone and member.phone not in seen_phones:
                    seen_phones.add(member.phone)
                    all_recipients.append(ResolvedRecipient(
                        type="whatsapp",
                        identifier=member.phone,
                        name=member.name,
                        group_id=group_id,
                        role=member.role
                    ))
        
        return all_recipients
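
To make the deduplication behaviour concrete, here is a minimal, self-contained sketch. It is not the platform's resolver: `Member` and `dedupe_recipients` are hypothetical names, the contact values are made up, and time-based availability is left out. It only shows that a member listed in two groups is resolved once per channel, attributed to the first group that contains them.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Member:
    # Hypothetical stand-in for a group member record
    name: str
    telegram_id: Optional[str] = None
    phone: Optional[str] = None
    is_active: bool = True


def dedupe_recipients(groups: Dict[str, List[Member]]) -> List[Tuple[str, str, str]]:
    """Return (channel, identifier, group_id) tuples; the first group wins."""
    seen = {"telegram": set(), "whatsapp": set()}
    resolved = []
    for group_id, members in groups.items():
        for member in members:
            if not member.is_active:
                continue
            contacts = (("telegram", member.telegram_id), ("whatsapp", member.phone))
            for channel, identifier in contacts:
                # Skip missing contact info and anything already resolved on this channel
                if identifier and identifier not in seen[channel]:
                    seen[channel].add(identifier)
                    resolved.append((channel, identifier, group_id))
    return resolved


alice = Member("Alice", telegram_id="1001", phone="+15550100")
bob = Member("Bob", phone="+15550101")
groups = {"night_staff": [alice, bob], "management": [alice]}
recipients = dedupe_recipients(groups)
```

Alice appears in both groups but is resolved once for Telegram and once for WhatsApp, both attributed to `night_staff` because that group is processed first.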

6. Quiet Hours

6.1 Quiet Hours Configuration

# Default: NO quiet hours for security systems
# But configurable per-channel if needed

quiet_hours:
  # Global default (can be overridden per-group, per-channel)
  default_policy:
    enabled: false                    # Default: disabled for security
    description: "No quiet hours by default for security-critical alerts"
  
  # Per-channel quiet hours (if enabled)
  channels:
    telegram:
      enabled: false
      schedule:
        - days: ["Saturday", "Sunday"]
          start: "00:00"
          end: "23:59"
          timezone: "America/New_York"
          label: "Weekend quiet hours (optional)"
      
    whatsapp:
      enabled: false
      schedule: []
  
  # Per-group overrides (examples of when it might be used)
  group_overrides:
    # Night staff has no quiet hours (they work at night)
    night_staff:
      enabled: false
    
    # Management might have quiet hours for non-critical
    management:
      enabled: true
      schedule:
        - days: ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
          start: "23:00"
          end: "06:00"
          timezone: "America/New_York"
          label: "Weeknight quiet hours"
        - days: ["Saturday", "Sunday"]
          start: "18:00"
          end: "08:00"
          timezone: "America/New_York"
          label: "Weekend quiet hours"
      # During quiet hours, only these severities come through
      allowed_during_quiet: ["critical"]
      
  # Emergency bypass configuration
  emergency_bypass:
    enabled: true
    triggers:
      - severity: "critical"           # Critical alerts always bypass
      - tag: "emergency"               # Alerts tagged as emergency
      - rule_override: "bypass_quiet_hours"  # Individual rule setting
    notification_method: "all_channels"  # Send via all channels when bypassed

6.2 Quiet Hours Evaluation Logic

class QuietHoursEvaluator:
    """Evaluate whether quiet hours apply to a given alert."""
    
    async def should_suppress(
        self,
        alert: AlertEvent,
        recipient: ResolvedRecipient,
        channel: str
    ) -> QuietHoursDecision:
        """
        Determine if alert should be suppressed due to quiet hours.
        Returns suppression decision with reason.
        """
        # Check emergency bypass (critical severity always sends)
        if alert.severity == "critical":
            return QuietHoursDecision(
                suppress=False,
                reason="Critical severity: emergency bypass active"
            )
        
        # Check rule-level bypass
        if alert.bypass_quiet_hours:
            return QuietHoursDecision(
                suppress=False,
                reason="Rule-level quiet hours bypass"
            )
        
        # Get quiet hours config for this recipient + channel
        config = await self._get_quiet_hours_config(recipient, channel)
        
        if not config or not config.enabled:
            return QuietHoursDecision(suppress=False, reason="Quiet hours not configured")
        
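        # Check if current time falls within quiet hours.
        # config.timezone is an IANA name (e.g., "America/New_York"), so it is
        # converted with zoneinfo.ZoneInfo (requires `from zoneinfo import ZoneInfo`).
        current_time = datetime.now(ZoneInfo(config.timezone))
        in_quiet_hours = self._is_in_quiet_hours(current_time, config.schedule)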
        
        if not in_quiet_hours:
            return QuietHoursDecision(
                suppress=False,
                reason="Current time outside quiet hours"
            )
        
        # In quiet hours — check if severity is allowed through
        if alert.severity in config.allowed_during_quiet:
            return QuietHoursDecision(
                suppress=False,
                reason=f"Severity '{alert.severity}' allowed during quiet hours"
            )
        
        # Suppress the notification
        return QuietHoursDecision(
            suppress=True,
            reason=f"Quiet hours active ({self._describe_schedule(config.schedule)}). "
                   f"Severity '{alert.severity}' below threshold.",
            resume_at=self._next_non_quiet_time(config.schedule, current_time)
        )
    
    def _is_in_quiet_hours(self, current: datetime, schedule: List[ScheduleEntry]) -> bool:
        """Check if current time falls within any quiet hours schedule entry."""
        current_day = current.strftime("%A")
        current_time = current.time()
        
        for entry in schedule:
            if current_day not in entry.days:
                continue
            
            start = entry.start_time
            end = entry.end_time
            
            if start <= end:
                # Same-day window (e.g., 09:00 to 17:00)
                if start <= current_time <= end:
                    return True
            else:
                # Overnight window that crosses midnight (e.g., 23:00 to 06:00)
                if current_time >= start or current_time <= end:
                    return True
        
        return False
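
One subtlety with overnight windows is which weekday the early-morning portion belongs to. The sketch below is one possible interpretation, not the platform's confirmed behaviour: it assumes `days` names the day on which a window starts, so a Friday 23:00 to 06:00 window still covers Saturday 02:00. The function name `in_quiet_window` and the `WEEKNIGHTS` constant are hypothetical.

```python
from datetime import datetime, time, timedelta
from typing import List

WEEKNIGHTS = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]


def in_quiet_window(now: datetime, days: List[str], start: time, end: time) -> bool:
    """Check one schedule entry, treating `days` as the day the window starts."""
    today = now.strftime("%A")
    yesterday = (now - timedelta(days=1)).strftime("%A")
    current = now.time()

    if start <= end:
        # Same-day window: only the current weekday matters
        return today in days and start <= current <= end

    # Overnight window: the evening part belongs to today's entry,
    # the morning part belongs to the window that started yesterday
    if current >= start:
        return today in days
    if current <= end:
        return yesterday in days
    return False


# 2024-06-14 is a Friday, 2024-06-15 a Saturday, 2024-06-17 a Monday
friday_night = in_quiet_window(datetime(2024, 6, 14, 23, 30), WEEKNIGHTS, time(23, 0), time(6, 0))
saturday_early = in_quiet_window(datetime(2024, 6, 15, 2, 0), WEEKNIGHTS, time(23, 0), time(6, 0))
monday_early = in_quiet_window(datetime(2024, 6, 17, 2, 0), WEEKNIGHTS, time(23, 0), time(6, 0))
```

Under this interpretation, Saturday 02:00 is still inside Friday's window, while Monday 02:00 is not quiet because no window started on Sunday. If the intended meaning of `days` is the calendar day being checked, the simpler per-day test is the right one.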

6.3 Admin UI — Quiet Hours Configuration

ui_quiet_hours:
  layout: "simple_form"  # Non-technical friendly
  
  fields:
    - id: "enable_quiet_hours"
      label: "Enable Quiet Hours"
      type: "toggle_switch"
      help_text: "When enabled, non-critical alerts will be held during specified times."
      default: false
      warning: "⚠️ For security applications, quiet hours are NOT recommended."
      
    - id: "quiet_hours_schedule"
      label: "Quiet Hours Schedule"
      type: "schedule_builder"
      visible_when: "enable_quiet_hours == true"
      options:
        presets:
          - label: "Never (default for security)"
            value: "none"
          - label: "Nights (22:00 - 06:00)"
            value: "nights"
          - label: "Weekends (all day)"
            value: "weekends"
          - label: "Custom"
            value: "custom"
            
    - id: "allow_critical_during_quiet"
      label: "Always allow critical alerts"
      type: "toggle_switch"
      visible_when: "enable_quiet_hours == true"
      default: true
      disabled: true  # Always on, non-configurable
      help_text: "Critical alerts always bypass quiet hours. This cannot be disabled for security."
      
    - id: "timezone"
      label: "Timezone"
      type: "timezone_dropdown"
      default: "America/New_York"

7. Message Templates

7.1 Template Engine

class TemplateEngine:
    """Render message templates with placeholder substitution."""
    
    # Placeholder pattern: {variable_name}
    PLACEHOLDER_PATTERN = re.compile(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}")
    
    # Supported variables
    BUILT_IN_VARIABLES = {
        "timestamp": lambda ctx: ctx["timestamp"].strftime("%Y-%m-%d %H:%M:%S"),
        "date": lambda ctx: ctx["timestamp"].strftime("%Y-%m-%d"),
        "time": lambda ctx: ctx["timestamp"].strftime("%H:%M:%S"),
        "timezone": lambda ctx: str(ctx["timestamp"].tzinfo or "UTC"),
    }
    
    async def render(
        self,
        template_id: str,
        context: dict,
        channel: str,           # "telegram" or "whatsapp"
        include_media: bool = False
    ) -> RenderedMessage:
        """Render template for specified channel."""
        template = await self.template_store.get(template_id, channel)
        
        # Build variable context
        variables = self._build_variables(context)
        
        # Render text
        text = self._render_text(template.text, variables)
        
        # Apply channel-specific formatting
        if channel == "telegram":
            formatted_text = self._format_telegram(text, template.formatting)
            keyboard = template.keyboard if template.keyboard else None
        elif channel == "whatsapp":
            formatted_text = self._format_whatsapp(text)
            keyboard = None  # WhatsApp uses template buttons
        else:
            formatted_text = text
            keyboard = None
        
        # Determine media attachments
        media = None
        if include_media and template.supports_media:
            media = await self._resolve_media(context, template.media_config)
        
        return RenderedMessage(
            text=formatted_text,
            raw_text=text,
            keyboard=keyboard,
            media=media,
            template_id=template_id,
            channel=channel
        )
    
    def _build_variables(self, context: dict) -> dict:
        """Build full variable dictionary from context."""
        variables = dict(context)
        
        # Add built-in computed variables
        for name, resolver in self.BUILT_IN_VARIABLES.items():
            if name not in variables:
                try:
                    variables[name] = resolver(context)
                except Exception:
                    variables[name] = "N/A"
        
        return variables
    
    def _render_text(self, template_text: str, variables: dict) -> str:
        """Substitute placeholders with values."""
        def replace_match(match):
            var_name = match.group(1)
            value = variables.get(var_name, f"{{{var_name}}}")
            return str(value)
        
        return self.PLACEHOLDER_PATTERN.sub(replace_match, template_text)
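
Because the Telegram templates use HTML formatting, substituted values that contain `<`, `>`, or `&` can break message parsing. The following sketch shows one way to escape values at substitution time with the standard-library `html.escape`. The function name `render_html_safe` is hypothetical and this is not part of the engine above; unknown placeholders are left untouched, matching the engine's behaviour.

```python
import html
import re

PLACEHOLDER_PATTERN = re.compile(r"\{([a-zA-Z_][a-zA-Z0-9_]*)\}")


def render_html_safe(template_text: str, variables: dict) -> str:
    """Substitute placeholders, HTML-escaping each value before insertion."""
    def replace_match(match: re.Match) -> str:
        var_name = match.group(1)
        if var_name not in variables:
            # Leave unknown placeholders as written in the template
            return match.group(0)
        # quote=False escapes only &, < and >
        return html.escape(str(variables[var_name]), quote=False)

    return PLACEHOLDER_PATTERN.sub(replace_match, template_text)


rendered = render_html_safe(
    "<b>{name}</b> at {camera_name} ({unknown})",
    {"name": "J. <Smith> & Co", "camera_name": "Front Entrance"},
)
```

The template's own `<b>` tags are preserved, while the angle brackets and ampersand inside the value are converted to entities.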

7.2 Template Definitions

Telegram Templates

templates:
  telegram:
    # --- Person Detected (Known) ---
    person_detected_known:
      name: "Person Detected (Known)"
      text: |
        🔍 <b>Person Detected</b>
        
        <b>{name}</b> ({role})
        📍 Camera: {camera_name}
        🕐 {date} at {time}
        🎯 Confidence: {confidence}%
      formatting: "HTML"
      supports_media: true
      media_config:
        image:
          caption_template: "{name} at {camera_name} — {time}"
          resize: [1280, 720]
        video:
          caption_template: "{name} at {camera_name} — {time}"
          max_duration: 10
          max_size_mb: 50
      keyboard: "alert_actions"  # Reference to keyboard definition
      
    # --- Person Detected (Known) WITH Image ---
    person_detected_known_with_image:
      name: "Person Detected (Known) + Photo"
      text: "{name} ({role}) detected at {camera_name} — {timestamp}"
      formatting: "HTML"
      supports_media: true
      media_config:
        image:
          caption_template: |
            🔍 <b>{name}</b> ({role})
            📍 {camera_name} | 🕐 {time}
            🎯 {confidence}% confidence
          resize: [1280, 720]
      keyboard: "alert_actions"
      
    # --- Person Detected (Known) WITH Video ---
    person_detected_known_with_video:
      name: "Person Detected (Known) + Video"
      text: "{name} ({role}) detected at {camera_name} — {timestamp}"
      formatting: "HTML"
      supports_media: true
      media_config:
        video:
          caption_template: |
            🎥 <b>{name}</b> ({role})
            📍 {camera_name} | 🕐 {time}
            🎯 {confidence}% confidence
          max_duration: 10
          max_size_mb: 50
      keyboard: "alert_actions"
    
    # --- Person Detected (Unknown) ---
    person_detected_unknown:
      name: "Person Detected (Unknown)"
      text: |
        ❓ <b>Unknown Person Detected</b>
        
        📍 Camera: {camera_name}
        🕐 {date} at {time}
        🎯 Confidence: {confidence}%
        
        <i>This person is not in the database.</i>
      formatting: "HTML"
      supports_media: true
      media_config:
        image:
          caption_template: "Unknown person at {camera_name} — {time}"
      keyboard: "alert_actions"
    
    # --- Watchlist Alert ---
    watchlist_alert:
      name: "Watchlist Match"
      text: |
        ⚠️ <b>WATCHLIST ALERT</b>
        
        <b>{name}</b>
        📋 Watchlist: {watchlist_type}
        📍 Camera: {camera_name}
        🕐 {date} at {time}
        🎯 Confidence: {confidence}%
      formatting: "HTML"
      supports_media: true
      media_config:
        image:
          caption_template: "WATCHLIST: {name} ({watchlist_type})"
      keyboard: "watchlist_actions"
      
    # --- Blacklist Alert ---
    blacklist_alert:
      name: "Blacklist Match"
      text: |
        🚨 <b>BLACKLIST ALERT</b> 🚨
        
        ⚠️ <b>{name}</b> has been detected!
        📍 Camera: {camera_name}
        🕐 {date} at {time}
        🎯 Confidence: {confidence}%
        
        <b>This person is BLACKLISTED. Immediate attention required.</b>
      formatting: "HTML"
      supports_media: true
      media_config:
        image:
          caption_template: "BLACKLIST: {name} at {camera_name}"
        video:
          caption_template: "BLACKLIST: {name} at {camera_name}"
          max_duration: 15
      keyboard: "blacklist_actions"
      
    # --- Suspicious Activity ---
    suspicious_activity:
      name: "Suspicious Activity"
      text: |
        🛑 <b>Suspicious Activity Detected</b>
        
        Type: <b>{activity_type}</b>
        📍 Camera: {camera_name}
        🕐 {date} at {time}
        🎯 Confidence: {confidence}%
        
        {description}
      formatting: "HTML"
      supports_media: true
      media_config:
        image:
          caption_template: "Suspicious: {activity_type} at {camera_name}"
        video:
          caption_template: "Suspicious: {activity_type} at {camera_name}"
          max_duration: 15
      keyboard: "alert_actions"
    
    # --- System Alert ---
    system_alert:
      name: "System Alert"
      text: |
        ⚙️ <b>System Alert</b>
        
        {message}
        
        🕐 {timestamp}
        🔧 Severity: {severity}
      formatting: "HTML"
      supports_media: false
      keyboard: null
      
    # --- Escalation Notice ---
    escalation_notice:
      name: "Escalation Notice"
      text: |
        ⬆️ <b>Alert Escalated</b>
        
        Alert #{alert_id} has been escalated to <b>Level {escalation_level}</b>.
        
        Original: {alert_summary}
        ⏱️ Unacknowledged for {elapsed_minutes} minutes
        
        <i>Please review immediately.</i>
      formatting: "HTML"
      supports_media: false
      keyboard: "escalation_actions"

WhatsApp Template References

WhatsApp templates are pre-defined in Meta Business Manager (see Section 3.5). The template engine maps alert data to each template's positional parameters, keyed by position number:

WHATSAPP_TEMPLATE_MAPPING = {
    "person_detected_known": {
        "template_name": "person_detected_known",
        "parameter_mapping": {
            1: "person_name",
            2: "person_role",
            3: "camera_name",
            4: "date",
            5: "time",
            6: "severity",
            7: "alert_id"
        }
    },
    "person_detected_unknown": {
        "template_name": "person_detected_unknown",
        "parameter_mapping": {
            1: "camera_name",
            2: "date",
            3: "time",
            4: "confidence"
        }
    },
    "watchlist_alert": {
        "template_name": "watchlist_alert",
        "parameter_mapping": {
            1: "person_name",
            2: "watchlist_type",
            3: "camera_name",
            4: "date",
            5: "time",
            6: "watchlist_name"
        }
    },
    "blacklist_alert": {
        "template_name": "blacklist_alert",
        "parameter_mapping": {
            1: "person_name",
            2: "camera_name",
            3: "date",
            4: "time"
        }
    },
    "suspicious_activity": {
        "template_name": "suspicious_activity",
        "parameter_mapping": {
            1: "activity_type",
            2: "camera_name",
            3: "date",
            4: "time",
            5: "confidence"
        }
    },
    "system_alert": {
        "template_name": "system_alert",
        "parameter_mapping": {
            1: "message",
            2: "timestamp",
            3: "severity"
        }
    },
    "escalation_notice": {
        "template_name": "escalation_notice",
        "parameter_mapping": {
            1: "alert_id",
            2: "escalation_level",
            3: "alert_summary",
            4: "threshold_minutes"
        }
    }
}
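
As an illustration of how a mapping entry might be turned into a request body, the sketch below builds a template message in the shape used by the WhatsApp Cloud API (a `template` object with a `body` component holding ordered text parameters). The function name `build_template_payload`, the `"N/A"` fallback for missing values, and the `en_US` language code are assumptions; the actual template names and languages are whatever was approved per Section 3.5.

```python
from typing import Dict


def build_template_payload(
    to: str,
    template_name: str,
    parameter_mapping: Dict[int, str],
    context: dict,
    language_code: str = "en_US",  # assumed; must match the approved template
) -> dict:
    """Build a template message body with parameters in positional order."""
    parameters = []
    for position in sorted(parameter_mapping):
        context_key = parameter_mapping[position]
        # Assumed fallback: send a placeholder rather than an empty parameter
        value = context.get(context_key, "N/A")
        parameters.append({"type": "text", "text": str(value)})

    return {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "template",
        "template": {
            "name": template_name,
            "language": {"code": language_code},
            "components": [{"type": "body", "parameters": parameters}],
        },
    }


payload = build_template_payload(
    to="15550100",
    template_name="blacklist_alert",
    parameter_mapping={1: "person_name", 2: "camera_name", 3: "date", 4: "time"},
    context={
        "person_name": "Mark Johnson",
        "camera_name": "Warehouse Rear",
        "date": "2024-06-15",
        "time": "03:15:42",
    },
)
```

Sorting by position keeps the parameter order stable even if the mapping dictionary is declared out of order.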

7.3 Template Rendered Examples

Example 1: Known Person (Telegram)

🔍 Person Detected

John Smith (Employee)
📍 Camera: Front Entrance
🕐 2024-06-15 at 14:32:18
🎯 Confidence: 94%
[Inline buttons: Acknowledge | View Live | Details | Map]

Example 2: Blacklist Alert (Telegram + Image)

🚨 BLACKLIST ALERT 🚨

⚠️ Mark Johnson has been detected!
📍 Camera: Warehouse Rear
🕐 2024-06-15 at 03:15:42
🎯 Confidence: 97%

This person is BLACKLISTED. Immediate attention required.
[Image attached: detection frame]
[Inline buttons: Acknowledge NOW | View Live | Dispatch Security | Details | Escalate to Management]

Example 3: Watchlist Match (WhatsApp)

[Template: watchlist_alert]
⚠️ WATCHLIST ALERT

Sarah Williams (VIP Watchlist)
detected at camera Lobby Main
on 2024-06-15 at 09:45:30

This person is on the VIP watchlist.
[Buttons: Acknowledge | Escalate]

Example 4: System Alert (Telegram)

⚙️ System Alert

Camera "Parking Lot" has been offline for 5 minutes.
Please check connectivity.

🕐 2024-06-15 12:10:00
🔧 Severity: high

7.4 Keyboard Definitions

inline_keyboards:
  alert_actions:
    buttons:
      - row:
        - text: "✅ Acknowledge"
          action: "acknowledge"
          style: "primary"
        - text: "📹 View Live"
          action: "view_live"
          style: "link"
      - row:
        - text: "📋 Details"
          action: "view_details"
          style: "secondary"
        - text: "🗺️ Map"
          action: "view_map"
          style: "secondary"

  watchlist_actions:
    buttons:
      - row:
        - text: "✅ Acknowledge"
          action: "acknowledge"
          style: "primary"
        - text: "📹 View Live"
          action: "view_live"
          style: "link"
      - row:
        - text: "📋 Details"
          action: "view_details"
          style: "secondary"
        - text: "⬆️ Escalate"
          action: "escalate"
          style: "danger"

  blacklist_actions:
    buttons:
      - row:
        - text: "🚨 Acknowledge NOW"
          action: "acknowledge"
          style: "danger"
        - text: "📹 View Live"
          action: "view_live"
          style: "link"
      - row:
        - text: "👮 Dispatch Security"
          action: "dispatch_security"
          style: "danger"
        - text: "📋 Details"
          action: "view_details"
          style: "secondary"
      - row:
        - text: "⬆️ Escalate to Management"
          action: "escalate"
          style: "warning"

  escalation_actions:
    buttons:
      - row:
        - text: "✅ Acknowledge"
          action: "acknowledge"
          style: "primary"
        - text: "📋 View Original Alert"
          action: "view_original"
          style: "link"

8. Retry Logic & Failure Handling

8.1 Retry Configuration

retry_config:
  # Global retry settings
  max_retries: 5
  base_delay_seconds: 2
  max_delay_seconds: 300  # 5 minutes
  exponential_base: 2
  
  # Jitter to prevent thundering herd
  jitter:
    enabled: true
    max_jitter_seconds: 1
  
  # Per-channel retry settings
  channels:
    telegram:
      max_retries: 5
      base_delay: 2
      retryable_errors:
        - "timeout"                    # Network timeout
        - "rate_limit"                 # 429 Too Many Requests
        - "server_error"               # 5xx errors
        - "conflict"                   # 409 Conflict
      non_retryable_errors:
        - "unauthorized"               # 401 — bad token
        - "forbidden"                  # 403 — blocked by user
        - "chat_not_found"             # Chat doesn't exist
        - "bad_request"                # 400 — malformed request
        
    whatsapp:
      max_retries: 5
      base_delay: 3
      retryable_errors:
        - "timeout"
        - "rate_limit"
        - "server_error"
        - "temporarily_unavailable"
      non_retryable_errors:
        - "invalid_phone_number"       # Bad phone number
        - "unsupported_message_type"   # Template not approved
        - "access_denied"              # Auth issue
        - "recipient_not_in_allowed_list"  # Phone not registered
  
  # Circuit breaker settings
  circuit_breaker:
    enabled: true
    failure_threshold: 10             # Open after 10 consecutive failures
    recovery_timeout_seconds: 60      # Try again after 60 seconds
    half_open_max_calls: 3            # Test with 3 calls when half-open

8.2 Retry Implementation (Pseudocode)

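from __future__ import annotations  # allow type hints that reference classes defined later

import asyncio
import random
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable, Optional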

class DeliveryStatus(Enum):
    PENDING = "pending"
    SENT = "sent"
    DELIVERED = "delivered"
    FAILED = "failed"
    RETRYING = "retrying"
    DEAD_LETTER = "dead_letter"

class RetryableError(Exception):
    """Error that can be retried."""
    pass

class NonRetryableError(Exception):
    """Error that should not be retried."""
    pass

@dataclass
class RetryPolicy:
    max_retries: int = 5
    base_delay: float = 2.0
    max_delay: float = 300.0
    exponential_base: float = 2.0
    jitter: bool = True
    max_jitter: float = 1.0

class RetryManager:
    """Handle retry logic with exponential backoff."""
    
    def __init__(
        self,
        policy: RetryPolicy,
        circuit_breaker: Optional[CircuitBreaker] = None,
        dead_letter_queue: Optional[DeadLetterQueue] = None,
        delivery_tracker: Optional[DeliveryTracker] = None
    ):
        self.policy = policy
        self.circuit_breaker = circuit_breaker
        self.dlq = dead_letter_queue
        self.tracker = delivery_tracker
    
    async def execute_with_retry(
        self,
        send_func: Callable,
        notification_id: str,
        *args,
        **kwargs
    ) -> SendResult:
        """
        Execute send function with exponential backoff retry.
        
        Pseudocode:
        
        1. Check circuit breaker — if OPEN, fail immediately
        2. Attempt delivery
        3. If success — record success, return result
        4. If retryable error:
           a. Calculate next delay: delay = min(base * 2^attempt, max_delay) + jitter
           b. Schedule retry after delay
           c. Increment attempt counter
           d. If attempt > max_retries → move to dead letter queue
        5. If non-retryable error → fail immediately, no retry
        """
        
        last_error = None
        
        for attempt in range(self.policy.max_retries + 1):
            # Check circuit breaker
            if self.circuit_breaker and self.circuit_breaker.is_open():
                return SendResult(
                    status=DeliveryStatus.FAILED,
                    error="Circuit breaker is OPEN",
                    circuit_breaker_open=True
                )
            
            try:
                # Attempt delivery
                result = await send_func(*args, **kwargs)
                
                if result.success:
                    # Success — record and return
                    if self.circuit_breaker:
                        self.circuit_breaker.record_success()
                    await self._record_success(notification_id, result)
                    return result
                
                # Check if error is retryable
                if not self._is_retryable(result.error_code):
                    # Non-retryable — fail immediately
                    await self._record_permanent_failure(
                        notification_id, result.error, attempt
                    )
                    return result
                
                last_error = result.error
                
            except RetryableError as e:
                last_error = str(e)
                
            except NonRetryableError as e:
                # Non-retryable — fail immediately
                await self._record_permanent_failure(
                    notification_id, str(e), attempt
                )
                return SendResult(
                    status=DeliveryStatus.FAILED,
                    error=str(e),
                    retryable=False
                )
            
            except Exception as e:
                last_error = str(e)
                if not self._is_retryable_exception(e):
                    await self._record_permanent_failure(
                        notification_id, str(e), attempt
                    )
                    return SendResult(
                        status=DeliveryStatus.FAILED,
                        error=str(e),
                        retryable=False
                    )
            
            # Calculate retry delay (exponential backoff with jitter)
            if attempt < self.policy.max_retries:
                delay = self._calculate_delay(attempt)
                
                # Update status to retrying
                await self._record_retry(notification_id, attempt, delay, last_error)
                
                # Wait before retry
                await asyncio.sleep(delay)
        
        # All retries exhausted — move to dead letter queue
        await self._move_to_dead_letter(
            notification_id=notification_id,
            error=last_error,
            total_attempts=self.policy.max_retries + 1
        )
        
        if self.circuit_breaker:
            self.circuit_breaker.record_failure()
        
        return SendResult(
            status=DeliveryStatus.DEAD_LETTER,
            error=f"All {self.policy.max_retries} retries exhausted. Last error: {last_error}",
            total_attempts=self.policy.max_retries + 1
        )
    
    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter."""
        # Exponential: base * 2^attempt
        delay = self.policy.base_delay * (self.policy.exponential_base ** attempt)
        # Cap at max delay
        delay = min(delay, self.policy.max_delay)
        
        if self.policy.jitter:
            # Add random jitter to prevent thundering herd
            jitter = random.uniform(0, self.policy.max_jitter)
            delay += jitter
        
        return delay
    
    def _is_retryable(self, error_code: str) -> bool:
        """Determine if error code is retryable."""
        retryable_codes = {
            "timeout", "rate_limit", "server_error", "conflict",
            "temporarily_unavailable", "network_error", "connection_reset"
        }
        return error_code in retryable_codes
    
    def _is_retryable_exception(self, exc: Exception) -> bool:
        """Determine if exception is retryable."""
        retryable_types = (
            asyncio.TimeoutError,
            ConnectionError,
            ConnectionResetError,
        )
        return isinstance(exc, retryable_types)
    
    async def _record_retry(
        self,
        notification_id: str,
        attempt: int,
        next_delay: float,
        error: str
    ):
        """Record retry attempt in delivery tracker."""
        if self.tracker:
            await self.tracker.update_status(
                notification_id=notification_id,
                status=DeliveryStatus.RETRYING,
                attempt=attempt,
                next_retry_at=datetime.utcnow() + timedelta(seconds=next_delay),
                error=error
            )
    
    async def _record_success(self, notification_id: str, result: SendResult):
        """Record successful delivery."""
        if self.tracker:
            await self.tracker.update_status(
                notification_id=notification_id,
                status=DeliveryStatus.SENT,
                provider_message_id=result.message_id,
                sent_at=datetime.utcnow()
            )
    
    async def _record_permanent_failure(
        self,
        notification_id: str,
        error: str,
        attempt: int
    ):
        """Record permanent (non-retryable) failure."""
        if self.tracker:
            await self.tracker.update_status(
                notification_id=notification_id,
                status=DeliveryStatus.FAILED,
                error=error,
                failed_at=datetime.utcnow(),
                total_attempts=attempt + 1,
                retryable=False
            )
    
    async def _move_to_dead_letter(
        self,
        notification_id: str,
        error: str,
        total_attempts: int
    ):
        """Move notification to dead letter queue."""
        if self.dlq:
            await self.dlq.enqueue(
                notification_id=notification_id,
                error=error,
                total_attempts=total_attempts,
                enqueued_at=datetime.utcnow()
            )
        
        if self.tracker:
            await self.tracker.update_status(
                notification_id=notification_id,
                status=DeliveryStatus.DEAD_LETTER,
                error=error,
                total_attempts=total_attempts,
                moved_to_dlq_at=datetime.utcnow()
            )
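
A backoff loop like the one above is easier to test when the sleep function can be swapped out. The following is a reduced, self-contained sketch of that idea, not the `RetryManager` itself: `send_with_backoff`, `TransientError`, and the injectable `sleep` parameter are hypothetical names, and the circuit breaker, tracker, and dead letter queue are omitted.

```python
import asyncio
import random


class TransientError(Exception):
    """Stand-in for a retryable delivery error."""


async def send_with_backoff(
    send,
    max_retries: int = 5,
    base_delay: float = 2.0,
    max_delay: float = 300.0,
    max_jitter: float = 1.0,
    sleep=asyncio.sleep,  # injectable so tests do not wait in real time
):
    """Call `send` until it succeeds; return (result, delays that were applied)."""
    delays = []
    for attempt in range(max_retries + 1):
        try:
            return await send(), delays
        except TransientError:
            if attempt == max_retries:
                raise  # retries exhausted; the caller decides what happens next
            delay = min(base_delay * 2 ** attempt, max_delay)
            delay += random.uniform(0, max_jitter)
            delays.append(delay)
            await sleep(delay)


async def _demo():
    calls = {"count": 0}

    async def flaky_send():
        # Fails twice with a retryable error, then succeeds
        calls["count"] += 1
        if calls["count"] < 3:
            raise TransientError("timeout")
        return "message-id-123"

    async def no_sleep(_seconds):
        return None

    outcome = await send_with_backoff(flaky_send, sleep=no_sleep)
    return outcome, calls["count"]


(result, delays), attempts = asyncio.run(_demo())
```

The send succeeds on the third attempt after two recorded delays, and the test finishes immediately because no real sleeping takes place.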

8.3 Dead Letter Queue

class DeadLetterQueue:
    """Queue for notifications that failed after all retries."""
    
    def __init__(self, redis_client, notification_store, queue_manager):
        self.redis = redis_client
        self.store = notification_store
        self.queue_manager = queue_manager  # used by retry_notification() to re-submit
        self.queue_key = "notifications:dead_letter"
    
    async def enqueue(self, notification_id: str, error: str, total_attempts: int, enqueued_at: datetime):
        """Add failed notification to DLQ."""
        entry = {
            "notification_id": notification_id,
            "error": error,
            "total_attempts": total_attempts,
            "enqueued_at": enqueued_at.isoformat(),
            "status": "pending_review"
        }
        await self.redis.lpush(self.queue_key, json.dumps(entry))
    
    async def list_pending(
        self,
        skip: int = 0,
        limit: int = 50
    ) -> List[DLQEntry]:
        """List notifications in DLQ for admin review."""
        entries = await self.redis.lrange(self.queue_key, skip, skip + limit - 1)
        return [DLQEntry(**json.loads(e)) for e in entries]
    
    async def retry_notification(self, notification_id: str) -> bool:
        """Manually retry a notification from DLQ."""
        # Remove from DLQ
        await self._remove_from_queue(notification_id)
        
        # Reset retry counter and re-queue
        notification = await self.store.get(notification_id)
        notification.retry_count = 0
        notification.status = DeliveryStatus.PENDING
        
        # Re-submit to queue manager
        await self.queue_manager.enqueue(notification)
        return True
    
    async def retry_all(self) -> RetryAllResult:
        """Retry all notifications in DLQ."""
        entries = await self.redis.lrange(self.queue_key, 0, -1)
        results = {"success": 0, "failed": 0}
        
        for entry_json in entries:
            entry = json.loads(entry_json)
            try:
                await self.retry_notification(entry["notification_id"])
                results["success"] += 1
            except Exception:
                results["failed"] += 1
        
        return RetryAllResult(**results)
    
    async def discard(self, notification_id: str):
        """Permanently discard a notification from DLQ."""
        await self._remove_from_queue(notification_id)
        await self.store.update_status(
            notification_id,
            status=DeliveryStatus.FAILED,
            discarded_at=datetime.utcnow()
        )
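
The `_remove_from_queue` helper is referenced above but not shown. One possible shape, assuming entries are stored as JSON strings in a Redis list as in `enqueue`, is to scan the list and remove matching entries with `LREM`. The sketch is written as a standalone function, and `FakeRedis` is a hypothetical in-memory stand-in used only so the example runs without a Redis server.

```python
import asyncio
import json


async def remove_from_queue(redis, queue_key: str, notification_id: str) -> int:
    """Remove every DLQ entry for notification_id; return how many were removed."""
    removed = 0
    for raw in await redis.lrange(queue_key, 0, -1):
        if json.loads(raw).get("notification_id") == notification_id:
            # count=0 removes all list elements equal to this serialized entry
            removed += await redis.lrem(queue_key, 0, raw)
    return removed


class FakeRedis:
    """Minimal in-memory stand-in for the three list commands used here."""

    def __init__(self):
        self.lists = {}

    async def lpush(self, key, value):
        self.lists.setdefault(key, []).insert(0, value)

    async def lrange(self, key, start, end):
        items = self.lists.get(key, [])
        return list(items) if end == -1 else items[start:end + 1]

    async def lrem(self, key, count, value):
        items = self.lists.get(key, [])
        kept = [item for item in items if item != value]
        self.lists[key] = kept
        return len(items) - len(kept)


async def _demo():
    redis = FakeRedis()
    key = "notifications:dead_letter"
    for notification_id in ("n-1", "n-2"):
        entry = {"notification_id": notification_id, "status": "pending_review"}
        await redis.lpush(key, json.dumps(entry))
    removed = await remove_from_queue(redis, key, "n-1")
    remaining = [json.loads(e)["notification_id"] for e in await redis.lrange(key, 0, -1)]
    return removed, remaining


removed, remaining = asyncio.run(_demo())
```

The scan is linear in the queue length, which is acceptable for a small dead letter queue. If the queue is expected to grow large, a hash keyed by notification ID would avoid the scan.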

8.4 Circuit Breaker

class CircuitBreaker:
    """Circuit breaker pattern to prevent cascading failures."""
    
    STATE_CLOSED = "closed"       # Normal operation
    STATE_OPEN = "open"           # Failing fast
    STATE_HALF_OPEN = "half_open"  # Testing if recovered
    
    def __init__(
        self,
        failure_threshold: int = 10,
        recovery_timeout: int = 60,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        
        self.state = self.STATE_CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0
    
    def is_open(self) -> bool:
        """Check if circuit breaker is open (failing fast)."""
        if self.state == self.STATE_OPEN:
            # Check if recovery timeout has passed
            if self.last_failure_time:
                elapsed = (datetime.utcnow() - self.last_failure_time).total_seconds()
                if elapsed >= self.recovery_timeout:
                    self.state = self.STATE_HALF_OPEN
                    self.half_open_calls = 0
                    return False
            return True
        return False
    
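    def record_success(self):
        """Record a successful call."""
        if self.state == self.STATE_HALF_OPEN:
            # half_open_calls counts successful trial calls; the circuit closes
            # once half_open_max_calls of them have succeeded
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max_calls:
                # Successfully tested, close the circuit
                self.state = self.STATE_CLOSED
                self.failure_count = 0
                self.half_open_calls = 0
        else:
            # In the closed state, each success forgives one earlier failure
            self.failure_count = max(0, self.failure_count - 1)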
    
    def record_failure(self):
        """Record a failed call."""
        self.failure_count += 1
        self.last_failure_time = datetime.utcnow()
        
        if self.state == self.STATE_HALF_OPEN:
            # Failed during testing — re-open
            self.state = self.STATE_OPEN
        elif self.failure_count >= self.failure_threshold:
            # Threshold reached — open circuit
            self.state = self.STATE_OPEN
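To illustrate how a sender might consult such a breaker around a provider call, here is a condensed, self-contained sketch. `MiniBreaker`, `CircuitOpenError`, and `guarded_call` are illustrative names that do not appear in the design above, the breaker is simplified (a single success closes it), and the clock is injectable only so the state changes can be exercised without waiting.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class MiniBreaker:
    """Condensed closed/open/half-open breaker (illustrative only)."""

    def __init__(self, failure_threshold: int = 3, recovery_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow(self) -> bool:
        """Return True if a call may proceed; open becomes half_open after the timeout."""
        if self.state == "open":
            if self.clock() - self.opened_at >= self.recovery_timeout:
                self.state = "half_open"
                return True
            return False
        return True

    def record_success(self) -> None:
        # Any success closes the circuit in this simplified version.
        self.state = "closed"
        self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = self.clock()


def guarded_call(breaker: MiniBreaker, fn: Callable[[], T]) -> T:
    """Run fn through the breaker, failing fast while the circuit is open."""
    if not breaker.allow():
        raise CircuitOpenError("circuit open; skipping provider call")
    try:
        result = fn()
    except Exception:
        breaker.record_failure()
        raise
    breaker.record_success()
    return result
```

Keeping one breaker per channel would let a Telegram outage trip its own circuit without blocking WhatsApp delivery.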

9. Rate Limiting & Throttling

9.1 Rate Limit Configuration

rate_limiting:
  # Global rate limit (across all channels)
  global:
    enabled: true
    max_messages_per_minute: 200
    burst_capacity: 50
    strategy: "token_bucket"
  
  # Per-channel rate limits
  per_channel:
    telegram:
      max_messages_per_second: 30
      max_messages_per_minute: 600
      burst_capacity: 30
      # Per-chat limits
      per_chat:
        max_messages_per_second: 1
        min_interval_seconds: 0.5
    
    whatsapp:
      max_messages_per_second: 80      # Business tier limit
      max_messages_per_minute: 4800
      burst_capacity: 200
      # Per-recipient limits
      per_recipient:
        max_messages_per_minute: 20     # Avoid spam perception
  
  # Per-camera burst handling
  per_source:
    camera:
      max_alerts_per_minute: 30        # Prevent camera spam
      burst_window_seconds: 60
      cooldown_after_burst_seconds: 300  # 5 min cooldown
  
  # Alert deduplication
  deduplication:
    enabled: true
    window_minutes: 5                   # Dedup within 5 minutes
    keys:
      - "camera_id + person_id + event_type"
      - "camera_id + event_type"        # For unknown persons
    
    # Deduplication actions
    on_duplicate:
      action: "suppress"                # Options: suppress, batch, increment_counter
      batch_window_seconds: 30          # Batch duplicates within 30s
      counter_template: " (+{count} more)"

  # Queue configuration
  queues:
    high_priority:
      name: "notifications:priority:high"
      max_size: 10000
      workers: 5
    normal_priority:
      name: "notifications:priority:normal"
      max_size: 50000
      workers: 3
    low_priority:
      name: "notifications:priority:low"
      max_size: 100000
      workers: 2
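The per-second and per-minute limits above interact, so it can help to work out the sustained rate each channel actually gets. The sketch below mirrors a subset of the config as a plain dict and derives token-bucket parameters from it. Taking the stricter of the two limits as the refill rate and `burst_capacity` as the bucket size is an assumption about how the config is meant to be read; `bucket_params` is a hypothetical helper.

```python
from typing import Dict, Tuple

# Subset of the rate_limiting config, written as plain dicts.
PER_CHANNEL: Dict[str, Dict[str, float]] = {
    "telegram": {"max_messages_per_second": 30, "max_messages_per_minute": 600, "burst_capacity": 30},
    "whatsapp": {"max_messages_per_second": 80, "max_messages_per_minute": 4800, "burst_capacity": 200},
}
GLOBAL: Dict[str, float] = {"max_messages_per_minute": 200, "burst_capacity": 50}


def bucket_params(limits: Dict[str, float]) -> Tuple[float, float]:
    """Return (capacity, refill_rate_per_second) for a token bucket.

    The sustained rate is the stricter of the per-second and per-minute
    limits; the bucket capacity is the configured burst size.
    """
    rates = []
    if "max_messages_per_second" in limits:
        rates.append(float(limits["max_messages_per_second"]))
    if "max_messages_per_minute" in limits:
        rates.append(limits["max_messages_per_minute"] / 60.0)
    if not rates:
        raise ValueError("no rate limit configured")
    return float(limits["burst_capacity"]), min(rates)
```

Under this reading Telegram's binding limit is the per-minute one (600 / 60 = 10 messages per second sustained, with bursts of up to 30), WhatsApp's two limits agree at 80 per second, and the global limit works out to roughly 3.3 messages per second with bursts of up to 50.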

9.2 Token Bucket Implementation

import asyncio
from dataclasses import dataclass
from typing import Dict, Optional
import time

@dataclass
class TokenBucket:
    """Token bucket for rate limiting."""
    capacity: float        # Maximum tokens
    refill_rate: float     # Tokens per second
    tokens: float = 0
    last_refill: float = 0
    
    def __post_init__(self):
        self.tokens = self.capacity
        self.last_refill = time.monotonic()
    
    def consume(self, tokens: float = 1) -> bool:
        """Try to consume tokens. Returns True if allowed."""
        self._refill()
        
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False
    
    def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now
    
    def time_until_available(self) -> float:
        """Seconds until at least 1 token is available."""
        self._refill()
        if self.tokens >= 1:
            return 0
        return (1 - self.tokens) / self.refill_rate


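class RateLimiter:
    """Multi-level rate limiter.

    The checks shown here use fixed-window Redis counters (INCR + EXPIRE)
    shared across workers, not the in-process TokenBucket above. Each check
    increments its counter, so rejected attempts also count toward the window.
    """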
    
    def __init__(self, redis_client):
        self.redis = redis_client
        self.buckets: Dict[str, TokenBucket] = {}
    
    async def check_rate_limit(
        self,
        channel: str,
        recipient: Optional[str] = None,
        source: Optional[str] = None
    ) -> RateLimitResult:
        """
        Check all applicable rate limits.
        
        Levels checked (in order):
        1. Global rate limit
        2. Per-channel rate limit
        3. Per-recipient rate limit (if applicable)
        4. Per-source rate limit (if applicable)
        """
        checks = []
        
        # 1. Global check
        global_allowed = await self._check_global_limit()
        checks.append(("global", global_allowed))
        
        # 2. Channel check
        channel_allowed = await self._check_channel_limit(channel)
        checks.append(("channel", channel_allowed))
        
        # 3. Per-recipient check
        if recipient:
            recipient_allowed = await self._check_recipient_limit(channel, recipient)
            checks.append(("recipient", recipient_allowed))
        
        # 4. Per-source check
        if source:
            source_allowed = await self._check_source_limit(source)
            checks.append(("source", source_allowed))
        
        # All must pass
        all_allowed = all(allowed for _, allowed in checks)
        
        if all_allowed:
            # Consume tokens from all buckets
            await self._consume_tokens(channel, recipient, source)
            return RateLimitResult(allowed=True, limiting_factor=None)
        
        # Find which limit was hit
        for name, allowed in checks:
            if not allowed:
                retry_after = await self._get_retry_after(name, channel, recipient)
                return RateLimitResult(
                    allowed=False,
                    limiting_factor=name,
                    retry_after_seconds=retry_after
                )
        
        return RateLimitResult(allowed=False, limiting_factor="unknown")
    
    async def _check_global_limit(self) -> bool:
        """Check global rate limit using Redis for distributed counting."""
        key = "rate_limit:global"
        current = await self.redis.incr(key)
        if current == 1:
            await self.redis.expire(key, 60)  # 1-minute window
        return current <= 200  # 200 messages per minute
    
    async def _check_channel_limit(self, channel: str) -> bool:
        """Check per-channel rate limit."""
        key = f"rate_limit:channel:{channel}"
        current = await self.redis.incr(key)
        if current == 1:
            await self.redis.expire(key, 1)  # 1-second window
        
        limits = {"telegram": 30, "whatsapp": 80}
        return current <= limits.get(channel, 30)
    
    async def _check_recipient_limit(self, channel: str, recipient: str) -> bool:
        """Check per-recipient rate limit."""
        key = f"rate_limit:recipient:{channel}:{recipient}"
        current = await self.redis.incr(key)
        if current == 1:
            await self.redis.expire(key, 60)
        return current <= 20  # 20 messages per minute per recipient
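`check_rate_limit` refers to `_check_source_limit` and `_get_retry_after`, which are not shown. The sketch below is one way the `per_source` settings (30 alerts per 60-second window, then a 300-second cooldown) could behave. It is an in-memory illustration with an injectable clock; `SourceBurstLimiter` and its method names are assumptions, and a multi-worker deployment would need to keep this state in Redis instead.

```python
import time
from collections import deque
from typing import Callable, Deque, Dict


class SourceBurstLimiter:
    """Per-source sliding-window limiter with a cooldown after a burst (sketch)."""

    def __init__(self, max_alerts: int = 30, window_seconds: float = 60.0,
                 cooldown_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_alerts = max_alerts
        self.window_seconds = window_seconds
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self._events: Dict[str, Deque[float]] = {}
        self._cooldown_until: Dict[str, float] = {}

    def allow(self, source_id: str) -> bool:
        now = self.clock()
        # Reject everything while the source is cooling down.
        if now < self._cooldown_until.get(source_id, 0.0):
            return False
        events = self._events.setdefault(source_id, deque())
        # Drop timestamps that have left the sliding window.
        while events and now - events[0] >= self.window_seconds:
            events.popleft()
        if len(events) >= self.max_alerts:
            # Burst detected: start the cooldown and reject this alert.
            self._cooldown_until[source_id] = now + self.cooldown_seconds
            events.clear()
            return False
        events.append(now)
        return True

    def retry_after(self, source_id: str) -> float:
        """Seconds until the source leaves cooldown (0 if it is not cooling down)."""
        return max(0.0, self._cooldown_until.get(source_id, 0.0) - self.clock())
```

Because state is keyed by source, a noisy camera enters cooldown without affecting alerts from the other cameras.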

9.3 Alert Deduplication

class AlertDeduplicator:
    """Deduplicate alerts within a time window."""
    
    def __init__(self, redis_client, window_minutes: int = 5):
        self.redis = redis_client
        self.window_seconds = window_minutes * 60
    
    def _generate_dedup_key(self, alert: AlertEvent) -> str:
        """Generate deduplication key from alert attributes."""
        if alert.person_id:
            # Deduplicate by camera + person + event type
            key_parts = ["dedup", alert.camera_id, alert.person_id, alert.event_type]
        else:
            # Deduplicate by camera + event type (for unknown persons)
            key_parts = ["dedup", alert.camera_id, alert.event_type]
        
        # IDs may be ints or UUIDs, so coerce each part to str before joining
        return ":".join(str(part) for part in key_parts)
    
    async def check_duplicate(self, alert: AlertEvent) -> DedupResult:
        """Check if this alert is a duplicate within the window."""
        key = self._generate_dedup_key(alert)
        count_key = f"{key}:count"
        
        # SET with NX and EX is atomic: only the first alert in the window
        # claims the key, so two concurrent alerts cannot both be treated as
        # the original (a separate EXISTS followed by SETEX can race)
        is_first = await self.redis.set(key, alert.id, nx=True, ex=self.window_seconds)
        
        if is_first:
            # Counter starts at 1 for the original alert
            await self.redis.setex(count_key, self.window_seconds, 1)
            return DedupResult(is_duplicate=False)
        
        # Increment counter for this dedup key
        count = await self.redis.incr(count_key)
        
        return DedupResult(
            is_duplicate=True,
            suppressed=True,
            original_alert_id=await self.redis.get(key),
            duplicate_count=count
        )
    
    async def update_with_count(self, alert_id: str) -> Optional[str]:
        """
        If alert was sent and duplicates were suppressed,
        append counter to indicate how many similar alerts were suppressed.
        """
        # This would be called after sending to update the original message
        # with a count of suppressed duplicates
        pass
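`update_with_count` is left as a stub. The text-formatting half of it could look like the sketch below, which applies the `counter_template` from the deduplication config. `append_duplicate_counter` is a hypothetical helper; editing the already-sent message through the provider API is not shown.

```python
def append_duplicate_counter(
    text: str,
    dedup_count: int,
    template: str = " (+{count} more)",
) -> str:
    """Append a suppressed-duplicate counter to a message.

    dedup_count is the value of the "<key>:count" counter, which starts at 1
    for the original alert, so the number of suppressed duplicates is
    dedup_count - 1.
    """
    suppressed = max(0, dedup_count - 1)
    if suppressed == 0:
        return text
    return text + template.format(count=suppressed)
```

For example, a counter value of 4 means one original plus three suppressed duplicates, so the suffix reads " (+3 more)".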

10. Delivery Tracking

10.1 Delivery Status Model

from enum import Enum
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel

class DeliveryStatus(str, Enum):
    PENDING = "pending"           # Queued, not yet sent
    SENT = "sent"                 # API request successful
    DELIVERED = "delivered"       # Provider confirmed delivery
    READ = "read"                 # Recipient opened/read
    ENGAGED = "engaged"           # User interacted (button click)
    FAILED = "failed"             # Permanently failed
    RETRYING = "retrying"         # Scheduled for retry
    DEAD_LETTER = "dead_letter"   # Moved to DLQ
    SUPPRESSED = "suppressed"     # Quiet hours or dedup
    CANCELLED = "cancelled"       # Cancelled (e.g., acknowledged before send)

class StatusTransition(BaseModel):
    """One entry in a notification's status history."""
    # Defined before NotificationDelivery so the annotation below resolves
    from_status: DeliveryStatus
    to_status: DeliveryStatus
    timestamp: datetime
    reason: Optional[str] = None

class NotificationDelivery(BaseModel):
    """Full delivery record for a notification."""
    
    # IDs
    notification_id: str           # Internal UUID
    alert_id: str                  # Parent alert ID
    channel: str                   # "telegram" | "whatsapp"
    recipient_id: str              # Chat ID or phone number
    recipient_name: Optional[str] = None
    
    # Template info
    template_id: str
    template_rendered_text: str
    
    # Status timeline
    status: DeliveryStatus
    status_history: List[StatusTransition]
    
    # Timestamps
    created_at: datetime           # Notification created
    queued_at: Optional[datetime] = None
    sent_at: Optional[datetime] = None
    delivered_at: Optional[datetime] = None
    read_at: Optional[datetime] = None
    failed_at: Optional[datetime] = None
    acknowledged_at: Optional[datetime] = None
    
    # Provider info
    provider_message_id: Optional[str] = None   # Telegram message_id or WhatsApp message ID
    provider_error: Optional[str] = None
    provider_error_code: Optional[str] = None
    
    # Retry info
    retry_count: int = 0
    max_retries: int = 5
    next_retry_at: Optional[datetime] = None
    
    # Media
    media_attached: bool = False
    media_type: Optional[str] = None       # "image" | "video"
    media_url: Optional[str] = None
    
    # Escalation
    escalation_level: int = 0
    escalation_triggered_at: Optional[datetime] = None
    
    # Context
    quiet_hours_suppressed: bool = False
    deduplicated: bool = False
    duplicate_of: Optional[str] = None
    bypassed_quiet_hours: bool = False

# Database schema (PostgreSQL)
NOTIFICATIONS_TABLE = """
CREATE TABLE notifications (
    notification_id         UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    alert_id                UUID NOT NULL REFERENCES alerts(alert_id),
    channel                 VARCHAR(20) NOT NULL CHECK (channel IN ('telegram', 'whatsapp')),
    recipient_id            VARCHAR(100) NOT NULL,
    recipient_name          VARCHAR(255),
    
    template_id             VARCHAR(100) NOT NULL,
    rendered_text           TEXT,
    
    status                  VARCHAR(30) NOT NULL DEFAULT 'pending',
    
    created_at              TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    queued_at               TIMESTAMPTZ,
    sent_at                 TIMESTAMPTZ,
    delivered_at            TIMESTAMPTZ,
    read_at                 TIMESTAMPTZ,
    failed_at               TIMESTAMPTZ,
    acknowledged_at         TIMESTAMPTZ,
    
    provider_message_id     VARCHAR(255),
    provider_error          TEXT,
    provider_error_code     VARCHAR(100),
    
    retry_count             INT NOT NULL DEFAULT 0,
    next_retry_at           TIMESTAMPTZ,
    
    media_attached          BOOLEAN NOT NULL DEFAULT FALSE,
    media_type              VARCHAR(20),
    media_url               TEXT,
    
    escalation_level        INT NOT NULL DEFAULT 0,
    
    quiet_hours_suppressed  BOOLEAN NOT NULL DEFAULT FALSE,
    deduplicated            BOOLEAN NOT NULL DEFAULT FALSE,
    duplicate_of            UUID,
    bypassed_quiet_hours    BOOLEAN NOT NULL DEFAULT FALSE,
    
    -- Constraints
    CONSTRAINT valid_status CHECK (status IN (
        'pending', 'sent', 'delivered', 'read', 'engaged',
        'failed', 'retrying', 'dead_letter', 'suppressed', 'cancelled'
    ))
);

-- Indexes for common queries
CREATE INDEX idx_notifications_alert_id ON notifications(alert_id);
CREATE INDEX idx_notifications_status ON notifications(status);
CREATE INDEX idx_notifications_channel ON notifications(channel);
CREATE INDEX idx_notifications_created_at ON notifications(created_at);
CREATE INDEX idx_notifications_recipient ON notifications(channel, recipient_id);

-- Status history table
CREATE TABLE notification_status_history (
    history_id      BIGSERIAL PRIMARY KEY,
    notification_id UUID NOT NULL REFERENCES notifications(notification_id),
    from_status     VARCHAR(30),
    to_status       VARCHAR(30) NOT NULL,
    timestamp       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    reason          TEXT,
    metadata        JSONB
);
CREATE INDEX idx_status_history_notification ON notification_status_history(notification_id);
"""

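The status model lists the states but not which moves between them are legal. A small guard such as the one below could be applied before writing a row to `notification_status_history`. The transition table is an assumption inferred from the status comments and the DLQ operations (manual retry returns a dead-lettered notification to `pending`, discard moves it to `failed`); it is not specified by the design and should be adjusted to the actual workflow.

```python
from enum import Enum
from typing import Dict, FrozenSet


class DeliveryStatus(str, Enum):
    PENDING = "pending"
    SENT = "sent"
    DELIVERED = "delivered"
    READ = "read"
    ENGAGED = "engaged"
    FAILED = "failed"
    RETRYING = "retrying"
    DEAD_LETTER = "dead_letter"
    SUPPRESSED = "suppressed"
    CANCELLED = "cancelled"


S = DeliveryStatus

# Assumed transition table; terminal states map to an empty set.
ALLOWED_TRANSITIONS: Dict[S, FrozenSet[S]] = {
    S.PENDING: frozenset({S.SENT, S.RETRYING, S.FAILED, S.SUPPRESSED, S.CANCELLED}),
    S.RETRYING: frozenset({S.SENT, S.RETRYING, S.FAILED, S.DEAD_LETTER, S.CANCELLED}),
    S.SENT: frozenset({S.DELIVERED, S.READ, S.FAILED}),
    S.DELIVERED: frozenset({S.READ, S.ENGAGED}),
    S.READ: frozenset({S.ENGAGED}),
    S.DEAD_LETTER: frozenset({S.PENDING, S.FAILED}),  # manual retry or discard
    S.ENGAGED: frozenset(),
    S.FAILED: frozenset(),
    S.SUPPRESSED: frozenset(),
    S.CANCELLED: frozenset(),
}


def can_transition(current: DeliveryStatus, new: DeliveryStatus) -> bool:
    """Return True if moving from current to new is allowed by the table."""
    return new in ALLOWED_TRANSITIONS[current]
```

Rejecting illegal moves at write time keeps out-of-order provider webhooks (for example a late "sent" callback arriving after "read") from rewinding a notification's status.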
10.2 Delivery Status API

# GET /delivery-status/{alert_id}
async def get_delivery_status(alert_id: str) -> DeliveryStatusResponse:
    """
    Get delivery status for all notifications of an alert.
    
    Response (the notifications list is abbreviated to two of the five entries):
    {
        "alert_id": "uuid",
        "event_type": "person_detected",
        "total_notifications": 5,
        "summary": {
            "pending": 0,
            "sent": 1,
            "delivered": 2,
            "read": 1,
            "failed": 0,
            "retrying": 1,
            "dead_letter": 0,
            "suppressed": 0
        },
        "notifications": [
            {
                "notification_id": "uuid-1",
                "channel": "telegram",
                "recipient": "Security Team (@sec_ops)",
                "status": "delivered",
                "provider_message_id": "12345",
                "timestamps": {
                    "created": "2024-06-15T14:32:18Z",
                    "sent": "2024-06-15T14:32:19Z",
                    "delivered": "2024-06-15T14:32:20Z"
                },
                "retry_count": 0,
                "media_attached": true,
                "escalation_level": 0
            },
            {
                "notification_id": "uuid-2",
                "channel": "whatsapp",
                "recipient": "+12345678901",
                "status": "sent",
                "provider_message_id": "wamid.xxx",
                "timestamps": {
                    "created": "2024-06-15T14:32:18Z",
                    "sent": "2024-06-15T14:32:19Z"
                },
                "retry_count": 0,
                "media_attached": true,
                "escalation_level": 0
            }
        ]
    }
    """
    ...

# GET /notifications/{notification_id}/status
async def get_notification_status(notification_id: str) -> NotificationStatus:
    """Get detailed status for a single notification including history."""
    ...

# GET /delivery-status with filters
async def list_delivery_status(
    alert_id: Optional[str] = None,
    channel: Optional[str] = None,
    status: Optional[str] = None,
    recipient: Optional[str] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    skip: int = 0,
    limit: int = 50
) -> PaginatedDeliveryList:
    """List delivery statuses with filtering."""
    ...
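The `summary` object in the response can be derived directly from the per-notification statuses. A minimal sketch, assuming the handler already has the list of status strings for the alert; `summarize` and `SUMMARY_KEYS` are hypothetical names.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

# Keys shown in the example response, in the same order.
SUMMARY_KEYS: Tuple[str, ...] = (
    "pending", "sent", "delivered", "read",
    "failed", "retrying", "dead_letter", "suppressed",
)


def summarize(statuses: Iterable[str]) -> Dict[str, int]:
    """Count notifications per status, reporting 0 for statuses not present.

    Statuses outside SUMMARY_KEYS (such as "engaged" or "cancelled") are not
    reported, so the counts may sum to less than the total.
    """
    counts = Counter(statuses)
    return {key: counts.get(key, 0) for key in SUMMARY_KEYS}
```

Each notification is counted once under its current status, so for an alert with no engaged or cancelled notifications the values add up to `total_notifications`.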

10.3 Dashboard Display

dashboard_display:
  # Real-time delivery status widget
  status_widget:
    layout: "timeline"
    elements:
      - type: "status_badge"
        states:
          pending: { color: "gray", icon: "clock" }
          sent: { color: "blue", icon: "paper-plane" }
          delivered: { color: "green", icon: "check-circle" }
          read: { color: "purple", icon: "eye" }
          failed: { color: "red", icon: "times-circle" }
          retrying: { color: "orange", icon: "sync" }
          dead_letter: { color: "dark-red", icon: "exclamation-triangle" }
          suppressed: { color: "yellow", icon: "ban" }
      
      - type: "timeline"
        show_transitions: true
        timestamps:
          - created: "Creation time"
          - queued: "Queued for delivery"
          - sent: "Sent to provider"
          - delivered: "Delivered to device"
          - read: "Read by recipient"
      
      - type: "error_display"
        show_for: ["failed", "dead_letter", "retrying"]
        fields:
          - error_message
          - error_code
          - retry_count
          - next_retry_time
          
  # Aggregated statistics
  statistics_panel:
    metrics:
      - "Delivery rate (24h)"
      - "Average delivery time"
      - "Failure rate by channel"
      - "Top failed recipients"
      - "Dead letter queue size"
      - "Retry queue depth"
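The statistics panel names its metrics without defining them. The sketch below shows one plausible definition of the first two: delivery rate as delivered notifications divided by sent notifications within the window, and average delivery time as the mean of `delivered_at - sent_at`. Both definitions, and the `delivery_stats` helper, are assumptions rather than part of the design.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, Optional


def delivery_stats(
    records: Iterable[dict],
    now: Optional[datetime] = None,
    window: timedelta = timedelta(hours=24),
) -> Dict[str, Optional[float]]:
    """Compute delivery rate and mean delivery time over a trailing window.

    Each record is a dict with timezone-aware created_at, and sent_at /
    delivered_at set to a datetime or None.
    """
    now = now or datetime.now(timezone.utc)
    recent = [r for r in records if now - r["created_at"] <= window]
    sent = [r for r in recent if r.get("sent_at") is not None]
    delivered = [r for r in sent if r.get("delivered_at") is not None]

    rate = len(delivered) / len(sent) if sent else None
    if delivered:
        total = sum((r["delivered_at"] - r["sent_at"]).total_seconds() for r in delivered)
        avg = total / len(delivered)
    else:
        avg = None
    return {"delivery_rate": rate, "avg_delivery_seconds": avg}
```

In production these figures would more likely come from an aggregate SQL query over the `notifications` table; the Python form is only meant to pin down the arithmetic.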

11. Escalation Engine

11.1 Escalation Configuration

escalation_config:
  # Global settings
  enabled: true
  default_threshold_minutes: 15
  max_escalation_levels: 3
  
  # Per-severity thresholds
  thresholds:
    critical:
      level_1: 5     # 5 minutes
      level_2: 10    # 10 minutes
      level_3: 20    # 20 minutes
    high:
      level_1: 15    # 15 minutes
      level_2: 30    # 30 minutes
      level_3: 60    # 60 minutes
    medium:
      level_1: 30    # 30 minutes
      level_2: 60    # 60 minutes
      level_3: 120   # 2 hours
    low:
      level_1: 60    # 1 hour
      level_2: 120   # 2 hours
      level_3: 240   # 4 hours
  
  # Level definitions
  levels:
    level_1:
      name: "Primary Escalation"
      description: "Notify additional recipients"
      action: "expand_recipients"
      config:
        add_groups: ["management"]
        channels: ["telegram", "whatsapp"]
        
    level_2:
      name: "Secondary Escalation"
      description: "Escalate to higher severity, notify all"
      action: "escalate_severity"
      config:
        severity_increase: 1           # low->medium, medium->high, high->critical
        add_groups: ["management"]
        channels: ["telegram", "whatsapp"]
        repeat_to_original: true       # Re-notify original recipients
        
    level_3:
      name: "Final Escalation"
      description: "Maximum escalation — all hands"
      action: "all_hands"
      config:
        severity: "critical"
        notify_all_groups: true
        channels: ["telegram", "whatsapp"]
        bypass_quiet_hours: true
        include_audit_trail: true
  
  # Acknowledgment settings
  acknowledgment:
    channels:
      - "telegram_inline_button"       # Ack via Telegram button
      - "whatsapp_quick_reply"         # Ack via WhatsApp quick reply
      - "dashboard_button"             # Ack via web dashboard
      - "api_call"                     # Ack via API
    
    require_ack_for_severities: ["high", "critical"]
    auto_acknowledge_after_minutes: null  # Never auto-ack for security
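The thresholds can be read either as minutes since the alert or as minutes since the previous level. The engine in section 11.2 schedules each timer relative to the moment the previous level fired, so the delays accumulate. The worked example below computes the resulting timeline under that reading; `escalation_timeline` is a hypothetical helper and the dict simply mirrors the `thresholds` block.

```python
from typing import Dict, List

# Mirrors escalation_config.thresholds (values in minutes).
THRESHOLDS: Dict[str, Dict[str, int]] = {
    "critical": {"level_1": 5, "level_2": 10, "level_3": 20},
    "high": {"level_1": 15, "level_2": 30, "level_3": 60},
    "medium": {"level_1": 30, "level_2": 60, "level_3": 120},
    "low": {"level_1": 60, "level_2": 120, "level_3": 240},
}


def escalation_timeline(
    severity: str,
    thresholds: Dict[str, Dict[str, int]] = THRESHOLDS,
) -> List[int]:
    """Minutes after the first send at which each level fires if nobody acknowledges.

    Each level's threshold is treated as a delay after the previous level.
    """
    levels = thresholds[severity]
    elapsed = 0
    timeline = []
    for n in range(1, len(levels) + 1):
        elapsed += levels[f"level_{n}"]
        timeline.append(elapsed)
    return timeline
```

An unacknowledged critical alert therefore escalates at 5, 15, and 35 minutes, and a high alert at 15, 45, and 105 minutes. With `require_ack_for_severities` limited to high and critical, the medium and low thresholds are not used unless that list is extended.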

11.2 Escalation Engine Implementation

class EscalationEngine:
    """Handle alert escalation for unacknowledged notifications."""
    
    def __init__(
        self,
        config: EscalationConfig,
        notification_service: NotificationService,
        timer_scheduler: TimerScheduler,
        delivery_tracker: DeliveryTracker,
        alert_store: AlertStore  # AlertStore is an assumed type name
    ):
        self.config = config
        self.notification_service = notification_service
        self.scheduler = timer_scheduler
        self.tracker = delivery_tracker
        # Used by _escalate, acknowledge_alert and _is_acknowledged
        self.alert_store = alert_store
        self.active_timers: Dict[str, str] = {}  # alert_id -> timer_id
    
    async def start_escalation_timer(self, alert: AlertEvent, notifications: List[Notification]):
        """Start escalation timer when alert is first sent."""
        if alert.severity not in self.config.require_ack_for_severities:
            return
        
        # Get threshold for this severity and level 1
        threshold = self.config.get_threshold(alert.severity, level=1)
        
        # Schedule escalation check
        timer_id = await self.scheduler.schedule(
            task="escalation_check",
            execute_at=datetime.utcnow() + timedelta(minutes=threshold),
            payload={
                "alert_id": alert.id,
                "current_level": 0,
                "severity": alert.severity
            }
        )
        
        self.active_timers[alert.id] = timer_id
        
        # Store timer reference on alert
        await self.tracker.set_escalation_timer(alert.id, timer_id, threshold)
    
    async def handle_escalation_timer(self, payload: dict):
        """Handle escalation timer firing."""
        alert_id = payload["alert_id"]
        current_level = payload["current_level"]
        
        # Check if alert has been acknowledged
        if await self._is_acknowledged(alert_id):
            logger.info(f"Alert {alert_id} acknowledged — canceling escalation")
            return
        
        # Check if max level reached
        if current_level >= self.config.max_escalation_levels:
            logger.info(f"Alert {alert_id} reached max escalation level")
            return
        
        # Perform escalation
        next_level = current_level + 1
        await self._escalate(alert_id, next_level)
        
        # Schedule next escalation if not at max
        if next_level < self.config.max_escalation_levels:
            severity = payload["severity"]
            next_threshold = self.config.get_threshold(severity, level=next_level + 1)
            
            timer_id = await self.scheduler.schedule(
                task="escalation_check",
                execute_at=datetime.utcnow() + timedelta(minutes=next_threshold),
                payload={
                    "alert_id": alert_id,
                    "current_level": next_level,
                    "severity": severity
                }
            )
            self.active_timers[alert_id] = timer_id
    
    async def _escalate(self, alert_id: str, level: int):
        """Execute escalation for the given level."""
        level_config = self.config.levels.get(f"level_{level}")
        if not level_config:
            return
        
        alert = await self.alert_store.get(alert_id)
        
        logger.warning(f"Escalating alert {alert_id} to level {level}: {level_config.name}")
        
        if level_config.action == "expand_recipients":
            # Add additional recipient groups
            await self.notification_service.send_escalation(
                alert=alert,
                level=level,
                recipient_groups=level_config.config["add_groups"],
                channels=level_config.config["channels"],
                template="escalation_notice"
            )
            
        elif level_config.action == "escalate_severity":
            # Increase severity
            new_severity = self._increase_severity(alert.severity)
            await self.alert_store.update_severity(alert_id, new_severity)
            
            # Notify the added groups at the higher severity. When
            # repeat_to_original is set, the original recipients are re-notified
            # too. include_original_recipients is an assumed keyword on
            # send_escalation; it is not defined elsewhere in this document.
            await self.notification_service.send_escalation(
                alert=alert,
                level=level,
                recipient_groups=level_config.config["add_groups"],
                channels=level_config.config["channels"],
                template="escalation_notice",
                severity_override=new_severity,
                include_original_recipients=level_config.config.get("repeat_to_original", False)
            )
            
        elif level_config.action == "all_hands":
            # Notify all configured groups
            await self.notification_service.send_escalation(
                alert=alert,
                level=level,
                notify_all_groups=True,
                channels=level_config.config["channels"],
                template="escalation_notice",
                severity_override="critical",
                bypass_quiet_hours=True
            )
        
        # Update escalation tracking
        await self.tracker.record_escalation(alert_id, level, datetime.utcnow())
    
    async def acknowledge_alert(
        self,
        alert_id: str,
        acknowledged_by: str,
        channel: str
    ) -> AckResult:
        """Acknowledge an alert, canceling any pending escalation."""
        # Update alert status
        await self.alert_store.acknowledge(alert_id, acknowledged_by, channel)
        
        # Cancel escalation timer
        if alert_id in self.active_timers:
            timer_id = self.active_timers[alert_id]
            await self.scheduler.cancel(timer_id)
            del self.active_timers[alert_id]
        
        # Capture one timestamp so the stored and returned acknowledgment times agree
        acknowledged_at = datetime.utcnow()
        
        # Update all pending notifications for this alert
        await self.tracker.acknowledge_all(alert_id, acknowledged_by, acknowledged_at)
        
        # Notify other recipients that alert was acknowledged
        await self._notify_acknowledgment(alert_id, acknowledged_by)
        
        return AckResult(success=True, acknowledged_at=acknowledged_at)
    
    async def _is_acknowledged(self, alert_id: str) -> bool:
        """Check if alert has been acknowledged."""
        alert = await self.alert_store.get(alert_id)
        return alert.acknowledged_at is not None
    
    def _increase_severity(self, current: str) -> str:
        """Increase severity by one level."""
        order = ["low", "medium", "high", "critical"]
        idx = order.index(current)
        return order[min(idx + 1, len(order) - 1)]
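The engine depends on a `TimerScheduler` with `schedule` and `cancel`, which is not defined in this section. For local testing, an asyncio-based stand-in along the following lines would satisfy the calls the engine makes. `InMemoryTimerScheduler` and its `register` method are assumptions; timers held this way are lost on restart, so a production scheduler would need persistent storage.

```python
import asyncio
from datetime import datetime, timezone
from typing import Awaitable, Callable, Dict
from uuid import uuid4

Handler = Callable[[dict], Awaitable[None]]


class InMemoryTimerScheduler:
    """asyncio-based stand-in for the scheduler the engine depends on (sketch)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}
        self._tasks: Dict[str, asyncio.Task] = {}

    def register(self, task: str, handler: Handler) -> None:
        """Associate a task name (e.g. "escalation_check") with a coroutine handler."""
        self._handlers[task] = handler

    async def schedule(self, task: str, execute_at: datetime, payload: dict) -> str:
        # Treat naive datetimes as UTC, matching the engine's datetime.utcnow() usage.
        if execute_at.tzinfo is None:
            execute_at = execute_at.replace(tzinfo=timezone.utc)
        delay = max(0.0, (execute_at - datetime.now(timezone.utc)).total_seconds())
        timer_id = str(uuid4())
        self._tasks[timer_id] = asyncio.create_task(
            self._run(timer_id, task, delay, payload)
        )
        return timer_id

    async def cancel(self, timer_id: str) -> bool:
        """Cancel a pending timer; returns False if it already fired or is unknown."""
        timer = self._tasks.pop(timer_id, None)
        if timer is None:
            return False
        timer.cancel()
        return True

    async def _run(self, timer_id: str, task: str, delay: float, payload: dict) -> None:
        await asyncio.sleep(delay)
        self._tasks.pop(timer_id, None)
        await self._handlers[task](payload)
```

Wiring it up would amount to calling `scheduler.register("escalation_check", engine.handle_escalation_timer)` before the first alert is sent.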

12. Media Attachments

12.1 Media Processing Pipeline

Detection Frame/Clip
       |
       v
[Store Original] --> MinIO/S3 (full resolution)
       |
       v
[Process for Channels]
       |
       +---> [Telegram Image] --> Resize to 1280x720 --> Generate presigned URL
       |                        Format: JPEG, Quality: 85
       |
       +---> [WhatsApp Image] --> Resize to 1600x900 --> Generate presigned URL
       |                        Format: JPEG, Quality: 80
       |                        Max: 5MB (WhatsApp image limit)
       |
       +---> [Telegram Video] --> Compress to 50MB limit
       |                        Resolution: 1280x720
       |                        Duration: max 10 seconds
       |                        Format: MP4 (H.264)
       |
       +---> [WhatsApp Video] --> Compress to 16MB limit
                                Resolution: 1280x720
                                Duration: max 10 seconds
                                Format: MP4 (H.264)
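The size limits in the pipeline translate into a bitrate budget once the clip length is fixed. The sketch below works through that arithmetic for the 10-second clips shown above, using the 90% headroom and 64 kbit/s audio track that the media processor in section 12.2 also assumes. `video_bitrate_budget` is a hypothetical helper and integer arithmetic is used so the results are exact.

```python
from typing import Dict

# Video size limits from the pipeline, in megabytes.
VIDEO_LIMIT_MB: Dict[str, int] = {"telegram": 50, "whatsapp": 16}


def video_bitrate_budget(
    channel: str,
    duration_seconds: int = 10,
    audio_bps: int = 64_000,
    headroom_percent: int = 90,
) -> int:
    """Return the video bitrate (bits per second) that fits the channel's size limit.

    Only headroom_percent of the limit is used, to leave room for container
    overhead, and the audio track's share is subtracted from the total.
    """
    if duration_seconds <= 0:
        raise ValueError("duration_seconds must be positive")
    size_bits = VIDEO_LIMIT_MB[channel] * 1024 * 1024 * 8
    usable_bits = size_bits * headroom_percent // 100
    total_bps = usable_bits // duration_seconds
    return max(0, total_bps - audio_bps)
```

For a 10-second clip this gives roughly 12 Mbit/s on WhatsApp and 37 Mbit/s on Telegram, far more than 1280x720 H.264 needs, so at this clip length the size limits should rarely be the binding constraint.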

12.2 Media Processor Implementation

import subprocess
import os
from PIL import Image
from typing import Optional, Tuple
from uuid import uuid4

class MediaProcessor:
    """Process media attachments for different channels."""
    
    # Channel-specific limits
    LIMITS = {
        "telegram": {
            "image": {"max_size_mb": 10, "max_dimensions": (2560, 2560)},
            "video": {"max_size_mb": 50, "max_duration_sec": 60},
        },
        "whatsapp": {
            # WhatsApp caps images at 5 MB; the 16 MB limit applies to video
            "image": {"max_size_mb": 5, "max_dimensions": (1920, 1080)},
            "video": {"max_size_mb": 16, "max_duration_sec": 60},
        }
    }
    
    def __init__(self, storage: ObjectStorage, temp_dir: str = "/tmp/media"):
        self.storage = storage
        self.temp_dir = temp_dir
        os.makedirs(temp_dir, exist_ok=True)
    
    async def process_image(
        self,
        source_url: str,
        channel: str,
        max_dimensions: Optional[Tuple[int, int]] = None
    ) -> ProcessedMedia:
        """
        Process image for specified channel.
        
        Steps:
        1. Download from source
        2. Resize if exceeds max dimensions
        3. Compress to meet size limit
        4. Upload to object storage
        5. Generate presigned URL
        """
        limits = self.LIMITS[channel]["image"]
        max_dims = max_dimensions or limits["max_dimensions"]
        
        # Download
        local_path = await self._download(source_url)
        output_path = None  # Set once the output file exists, so cleanup is safe
        
        try:
            # Open and process
            with Image.open(local_path) as img:
                # Convert to RGB if necessary
                if img.mode in ("RGBA", "P"):
                    img = img.convert("RGB")
                
                # Resize if needed
                if img.width > max_dims[0] or img.height > max_dims[1]:
                    img.thumbnail(max_dims, Image.LANCZOS)
                
                # Record final dimensions while the image is still open
                width, height = img.size
                
                # Save with quality optimization
                output_path = os.path.join(self.temp_dir, f"{uuid4()}.jpg")
                
                # Iteratively reduce quality until under size limit
                quality = 90
                while quality >= 50:
                    img.save(output_path, "JPEG", quality=quality, optimize=True)
                    size_mb = os.path.getsize(output_path) / (1024 * 1024)
                    if size_mb <= limits["max_size_mb"]:
                        break
                    quality -= 5
                else:
                    # Loop ended without meeting the limit
                    raise ValueError(
                        f"Image exceeds {limits['max_size_mb']} MB even at quality 50"
                    )
            
            # Upload to storage
            storage_key = f"processed/images/{channel}/{uuid4()}.jpg"
            await self.storage.upload(output_path, storage_key)
            
            # Generate presigned URL (valid for 1 hour)
            presigned_url = await self.storage.presigned_url(
                storage_key,
                expiration=3600
            )
            
            return ProcessedMedia(
                url=presigned_url,
                storage_key=storage_key,
                size_bytes=os.path.getsize(output_path),
                width=width,
                height=height,
                mime_type="image/jpeg"
            )
            
        finally:
            # Cleanup temp files
            self._cleanup(local_path, output_path)
    
    async def process_video(
        self,
        source_url: str,
        channel: str,
        max_duration: int = 10,
        target_resolution: Tuple[int, int] = (1280, 720)
    ) -> ProcessedMedia:
        """
        Process video clip for specified channel.
        
        Steps:
        1. Download from source
        2. Trim to max duration
        3. Resize to target resolution
        4. Compress to meet size limit
        5. Upload to object storage
        6. Generate presigned URL
        """
        limits = self.LIMITS[channel]["video"]
        max_size_mb = limits["max_size_mb"]
        
        # Download
        local_path = await self._download(source_url)
        output_path = os.path.join(self.temp_dir, f"{uuid4()}.mp4")
        
        try:
            # Get video info
            probe = await self._ffprobe(local_path)
            duration = float(probe["format"]["duration"])
            
            # Determine target bitrate to fit within size limit
            target_size_bits = (max_size_mb * 0.9) * 8 * 1024 * 1024  # 90% of limit
            effective_duration = min(duration, max_duration)
            if effective_duration <= 0:
                raise MediaProcessingError("Source video has zero duration")
            target_bitrate = int(target_size_bits / effective_duration)
            
            # Single-pass encode (see _ffmpeg_encode)
            await self._ffmpeg_encode(
                input_path=local_path,
                output_path=output_path,
                duration=effective_duration,
                resolution=target_resolution,
                # Video gets half the budget; the rest is headroom for audio and container overhead
                video_bitrate=f"{target_bitrate // 2}",
                audio_bitrate="64k"
            )
            
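            # Verify output size
            output_size_mb = os.path.getsize(output_path) / (1024 * 1024)
            if output_size_mb > max_size_mb:
                # Reduce quality further
                target_bitrate = int(target_bitrate * 0.7)
                await self._ffmpeg_encode(
                    input_path=local_path,
                    output_path=output_path,
                    duration=min(duration, max_duration),
                    resolution=target_resolution,
                    video_bitrate=f"{target_bitrate // 2}",
                    audio_bitrate="32k"
                )
                # Give up rather than upload a file the channel will reject
                if os.path.getsize(output_path) / (1024 * 1024) > max_size_mb:
                    raise MediaProcessingError(
                        f"Video exceeds {max_size_mb}MB for {channel} after re-encoding"
                    )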
            
            # Upload to storage
            storage_key = f"processed/videos/{channel}/{uuid4()}.mp4"
            await self.storage.upload(output_path, storage_key)
            
            presigned_url = await self.storage.presigned_url(storage_key, expiration=3600)
            
            return ProcessedMedia(
                url=presigned_url,
                storage_key=storage_key,
                size_bytes=os.path.getsize(output_path),
                width=target_resolution[0],
                height=target_resolution[1],
                duration=effective_duration,
                mime_type="video/mp4"
            )
            
        finally:
            self._cleanup(local_path, output_path)
    
    async def _ffmpeg_encode(
        self,
        input_path: str,
        output_path: str,
        duration: float,
        resolution: Tuple[int, int],
        video_bitrate: str,
        audio_bitrate: str
    ):
        """Run FFmpeg to encode video."""
        cmd = [
            "ffmpeg",
            "-y",                                           # Overwrite output
            "-i", input_path,                               # Input
            "-t", str(duration),                            # Duration limit
            "-vf", f"scale={resolution[0]}:{resolution[1]}",  # Resolution
            "-c:v", "libx264",                              # Video codec
            "-b:v", video_bitrate,                          # Video bitrate
            "-preset", "fast",                              # Encoding speed
            "-c:a", "aac",                                  # Audio codec
            "-b:a", audio_bitrate,                          # Audio bitrate
            "-movflags", "+faststart",                      # Web optimization
            "-f", "mp4",                                    # Output format
            output_path
        ]
        
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()
        
        if process.returncode != 0:
            raise MediaProcessingError(f"FFmpeg failed: {stderr.decode()}")
    
    async def cleanup_expired_media(self, max_age_hours: int = 24):
        """Clean up processed media older than specified age."""
        cutoff = datetime.utcnow() - timedelta(hours=max_age_hours)
        await self.storage.delete_before("processed/", cutoff)

12.3 Media Size Limits Summary

Channel   Type   Max Size  Max Duration  Recommended
Telegram  Photo  10MB      N/A           1280x720 JPEG
Telegram  Video  50MB      60 sec        1280x720 MP4
WhatsApp  Image  16MB      N/A           1600x900 JPEG
WhatsApp  Video  16MB      60 sec        1280x720 MP4
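
The size limits above drive the bitrate budget used by process_video. As a rough illustration, the arithmetic can be reproduced in a standalone function. The name `video_bitrate_budget` and its `headroom` parameter are hypothetical and only mirror the 90% margin used in the processor; this is a sketch for checking numbers by hand, not part of the service.

```python
def video_bitrate_budget(
    max_size_mb: float,
    duration_s: float,
    max_duration_s: float = 10,
    headroom: float = 0.9,
) -> int:
    """Return the total bitrate in bits/s that keeps a clip under max_size_mb."""
    if duration_s <= 0:
        raise ValueError("duration must be positive")
    # The clip is trimmed to max_duration_s before encoding
    effective = min(duration_s, max_duration_s)
    # Convert megabytes to bits, keeping a safety margin below the hard limit
    target_bits = max_size_mb * headroom * 8 * 1024 * 1024
    return int(target_bits / effective)


if __name__ == "__main__":
    # 16MB limit, 25 s source trimmed to 10 s
    print(video_bitrate_budget(16, 25))
```

For a 16MB limit and a 10-second clip this gives roughly 12 Mbit/s in total, so short clips are unlikely to be constrained by size; the limit matters mostly when `max_duration` is raised.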

12.4 Temporary URL Generation

class ObjectStorage:
    """S3-compatible object storage with presigned URLs."""
    
    async def presigned_url(
        self,
        key: str,
        expiration: int = 3600,  # 1 hour default
        content_type: Optional[str] = None
    ) -> str:
        """
        Generate presigned URL for media access.
        
        Security considerations:
        - URLs expire after `expiration` seconds
        - URLs are signed with HMAC-SHA256
        - Bucket policy restricts direct access
        - IP restriction can be added for additional security
        """
        params = {
            "Bucket": self.bucket,
            "Key": key,
        }
        if content_type:
            params["ResponseContentType"] = content_type
        
        url = self.client.generate_presigned_url(
            "get_object",
            Params=params,
            ExpiresIn=expiration
        )
        
        # Log URL generation for audit
        await self.audit_log.record(
            action="presigned_url_generated",
            object_key=key,
            expiration_seconds=expiration
        )
        
        return url

13. Notification Service API

13.1 API Specification

Base URL: /api/v1/notifications


POST /send-alert

Send a new alert notification.

Request:

{
  "alert": {
    "event_type": "person_detected",
    "camera_id": "cam_01_front_entrance",
    "zone_id": "main_lobby",
    "timestamp": "2024-06-15T14:32:18Z",
    "severity": "high",
    "person_id": "person_123",
    "person_name": "John Smith",
    "person_role": "blacklist",
    "confidence": 94.5,
    "watchlist_matches": [
      {
        "list_name": "security_watchlist",
        "match_confidence": 98.2
      }
    ],
    "image_url": "https://storage.internal/detection/abc123.jpg",
    "video_url": "https://storage.internal/detection/abc123.mp4",
    "metadata": {
      "bounding_box": [100, 200, 300, 400],
      "detection_model": "yolov8-face-v2"
    }
  },
  "options": {
    "include_image": true,
    "include_video": false,
    "bypass_quiet_hours": false,
    "require_acknowledgment": true
  }
}

Response (202 Accepted):

{
  "alert_id": "alert-uuid-123",
  "status": "accepted",
  "routing_decision": {
    "matched_rules": ["rule_blacklist_always", "rule_critical_always"],
    "recipient_groups": ["security_team", "management"],
    "channels": ["telegram", "whatsapp"],
    "resolved_recipients": 5
  },
  "notifications": [
    {
      "notification_id": "notif-uuid-1",
      "channel": "telegram",
      "recipient": "-1001234567890",
      "status": "pending"
    },
    {
      "notification_id": "notif-uuid-2",
      "channel": "whatsapp",
      "recipient": "+12345678901",
      "status": "pending"
    }
  ],
  "estimated_delivery": "2024-06-15T14:32:20Z"
}
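
Before an alert is accepted, the request body would typically be validated. The security checklist names Pydantic for this; the sketch below uses only the standard library so it stays self-contained. The function name `validate_alert_request` and the choice of required fields are assumptions, since the specification does not state which fields are optional.

```python
from datetime import datetime

# Severity values match the CHECK constraint on the alerts table.
ALLOWED_SEVERITIES = ("low", "medium", "high", "critical")
# Assumption: these four fields are mandatory.
REQUIRED_FIELDS = ("event_type", "camera_id", "timestamp", "severity")


def validate_alert_request(payload: dict) -> list:
    """Return a list of problems; an empty list means the payload passed."""
    alert = payload.get("alert")
    if not isinstance(alert, dict):
        return ["'alert' object is required"]

    errors = []
    for field in REQUIRED_FIELDS:
        if not alert.get(field):
            errors.append(f"missing field: {field}")

    severity = alert.get("severity")
    if severity and severity not in ALLOWED_SEVERITIES:
        errors.append(f"invalid severity: {severity}")

    confidence = alert.get("confidence")
    if confidence is not None:
        if not isinstance(confidence, (int, float)) or not 0 <= confidence <= 100:
            errors.append("confidence must be a number between 0 and 100")

    timestamp = alert.get("timestamp")
    if timestamp:
        try:
            # fromisoformat() before Python 3.11 rejects a trailing "Z"
            datetime.fromisoformat(str(timestamp).replace("Z", "+00:00"))
        except ValueError:
            errors.append("timestamp must be ISO 8601")
    return errors
```

A handler could return 400 with this list when it is non-empty and proceed to routing otherwise.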

GET /delivery-status/{alert_id}

Get delivery status for all notifications of an alert.

Response (200 OK):

{
  "alert_id": "alert-uuid-123",
  "event_type": "person_detected",
  "severity": "high",
  "created_at": "2024-06-15T14:32:18Z",
  "total_notifications": 5,
  "acknowledged": false,
  "summary": {
    "pending": 0,
    "sent": 1,
    "delivered": 2,
    "read": 1,
    "failed": 0,
    "retrying": 1,
    "dead_letter": 0,
    "suppressed": 0
  },
  "notifications": [
    {
      "notification_id": "notif-uuid-1",
      "channel": "telegram",
      "recipient": {
        "id": "-1001234567890",
        "name": "Security Operations",
        "type": "group"
      },
      "template": "blacklist_alert_tg",
      "status": "delivered",
      "provider_message_id": "12345",
      "timestamps": {
        "created": "2024-06-15T14:32:18Z",
        "queued": "2024-06-15T14:32:18.100Z",
        "sent": "2024-06-15T14:32:19Z",
        "delivered": "2024-06-15T14:32:20Z"
      },
      "media": {
        "attached": true,
        "type": "image",
        "url": "https://storage.internal/processed/img-1.jpg?signature=..."
      },
      "retry_count": 0,
      "escalation_level": 0,
      "quiet_hours_applied": false
    }
  ]
}
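
The `summary` object in this response is a per-status tally of the alert's notifications. A minimal sketch of how such a tally might be derived is shown below; the helper name `summarize` is hypothetical, and the status list is taken from the keys in the example response.

```python
from collections import Counter

# Status keys as they appear in the example "summary" object
STATUSES = (
    "pending", "sent", "delivered", "read",
    "failed", "retrying", "dead_letter", "suppressed",
)


def summarize(notifications: list) -> dict:
    """Count notifications per status, reporting zero for unused statuses."""
    counts = Counter(n["status"] for n in notifications)
    return {status: counts.get(status, 0) for status in STATUSES}
```

Reporting every status, including zeros, keeps the response shape stable for dashboard clients.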

POST /retry/{notification_id}

Manually retry a failed or dead-letter notification.

Request:

{
  "reason": "Manual retry by admin"
}

Response (202 Accepted):

{
  "notification_id": "notif-uuid-3",
  "status": "retrying",
  "previous_failures": 5,
  "retry_scheduled_at": "2024-06-15T14:35:00Z"
}

POST /retry/dead-letter/batch

Retry all notifications in the dead letter queue.

Response (200 OK):

{
  "total": 12,
  "retried": 10,
  "failed_to_retry": 2,
  "details": [
    {"notification_id": "notif-uuid-x", "status": "queued"},
    {"notification_id": "notif-uuid-y", "status": "error", "reason": "Recipient no longer active"}
  ]
}

POST /acknowledge/{alert_id}

Manually acknowledge an alert.

Request:

{
  "acknowledged_by": "admin@example.com",
  "note": "False positive - authorized visitor"
}

Response (200 OK):

{
  "alert_id": "alert-uuid-123",
  "acknowledged": true,
  "acknowledged_by": "admin@example.com",
  "acknowledged_at": "2024-06-15T14:40:00Z",
  "escalation_cancelled": true,
  "other_recipients_notified": true
}

GET /templates

List all available message templates.

Response (200 OK):

{
  "templates": [
    {
      "template_id": "person_detected_known",
      "name": "Person Detected (Known)",
      "channels": ["telegram", "whatsapp"],
      "supports_media": true,
      "placeholders": ["name", "role", "camera_name", "timestamp", "confidence"],
      "whatsapp_template_name": "person_detected_known"
    },
    {
      "template_id": "blacklist_alert",
      "name": "Blacklist Alert",
      "channels": ["telegram", "whatsapp"],
      "supports_media": true,
      "placeholders": ["name", "camera_name", "timestamp", "confidence"],
      "whatsapp_template_name": "blacklist_alert"
    }
  ]
}

POST /templates/render

Preview a rendered template (for testing).

Request:

{
  "template_id": "person_detected_known",
  "channel": "telegram",
  "placeholders": {
    "name": "John Smith",
    "role": "Employee",
    "camera_name": "Front Entrance",
    "timestamp": "2024-06-15T14:32:18Z",
    "confidence": 94.5
  },
  "include_media": false
}

Response (200 OK):

{
  "template_id": "person_detected_known",
  "channel": "telegram",
  "rendered_text": "🔍 <b>Person Detected</b>\n\n<b>John Smith</b> (Employee)\n📍 Camera: Front Entrance\n🕐 2024-06-15 at 14:32:18\n🎯 Confidence: 94.5%",
  "raw_text": "John Smith (Employee) detected at Front Entrance — 2024-06-15T14:32:18Z",
  "character_count": 156,
  "placeholders_resolved": 5,
  "placeholders_missing": []
}

GET /recipient-groups

List all recipient groups.

Response (200 OK):

{
  "groups": [
    {
      "id": "security_team",
      "name": "Security Team",
      "member_count": 3,
      "channels": ["telegram", "whatsapp"],
      "is_active": true
    },
    {
      "id": "management",
      "name": "Management",
      "member_count": 2,
      "channels": ["telegram", "whatsapp"],
      "is_active": true
    }
  ]
}

GET /recipient-groups/{group_id}

Get detailed group information.

Response (200 OK):

{
  "id": "security_team",
  "name": "Security Team",
  "description": "Primary security operations team",
  "channels": {
    "telegram": {
      "enabled": true,
      "chat_ids": ["-1001234567890"]
    },
    "whatsapp": {
      "enabled": true,
      "phone_numbers": ["+12345678901", "+12345678902"]
    }
  },
  "members": [
    {
      "id": "user_sec_001",
      "name": "John Smith",
      "role": "Security Supervisor",
      "telegram_id": "111111111",
      "phone": "+12345678901",
      "is_active": true
    }
  ],
  "settings": {
    "quiet_hours": null,
    "min_severity": "low"
  }
}

POST /recipient-groups

Create a new recipient group.

Request:

{
  "name": "Weekend Staff",
  "description": "Weekend security coverage",
  "channels": {
    "telegram": {
      "enabled": true,
      "chat_ids": ["-1009999888877"]
    },
    "whatsapp": {
      "enabled": false
    }
  },
  "settings": {
    "min_severity": "medium"
  }
}

GET /routing-rules

List all routing rules.

Response (200 OK):

{
  "rules": [
    {
      "id": "rule_blacklist_always",
      "name": "Blacklist Person Alert",
      "priority": 100,
      "enabled": true,
      "conditions_count": 1,
      "recipient_groups": ["security_team", "management"],
      "channels": ["telegram", "whatsapp"]
    }
  ],
  "total": 9,
  "active": 8
}

POST /routing-rules

Create a new routing rule.

Request:

{
  "name": "New Rule",
  "priority": 50,
  "enabled": true,
  "logic": "ALL",
  "conditions": [
    {
      "type": "camera",
      "operator": "in",
      "values": ["cam_01_front_entrance"]
    },
    {
      "type": "time_range",
      "start_time": "18:00",
      "end_time": "06:00",
      "timezone": "America/New_York"
    }
  ],
  "actions": {
    "recipient_groups": ["night_staff"],
    "channels": ["telegram"],
    "severity_override": "high",
    "bypass_quiet_hours": true
  }
}
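
The `time_range` condition in this example runs from 18:00 to 06:00, so it wraps past midnight and a plain `start <= now < end` comparison would never match. The sketch below shows one way to evaluate such a window. The function names are hypothetical, and treating the end time as exclusive is an assumption the specification does not settle.

```python
from datetime import datetime, time, timezone
from typing import Optional
from zoneinfo import ZoneInfo


def in_window(local: time, start: str, end: str) -> bool:
    """True if `local` falls in [start, end); the window may wrap past midnight."""
    start_t = time.fromisoformat(start)
    end_t = time.fromisoformat(end)
    if start_t <= end_t:
        # Same-day window, e.g. 09:00 to 17:00
        return start_t <= local < end_t
    # Overnight window, e.g. 18:00 to 06:00: late evening OR early morning
    return local >= start_t or local < end_t


def time_range_matches(condition: dict, now_utc: Optional[datetime] = None) -> bool:
    """Evaluate a time_range condition in the condition's own timezone."""
    now_utc = now_utc or datetime.now(timezone.utc)
    local = now_utc.astimezone(ZoneInfo(condition["timezone"])).time()
    return in_window(local, condition["start_time"], condition["end_time"])
```

Converting to the rule's timezone before comparing keeps the window stable across daylight saving changes, which a fixed UTC offset would not.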

GET /dead-letter-queue

List dead letter queue entries.

Response (200 OK):

{
  "total": 5,
  "entries": [
    {
      "notification_id": "notif-dlq-1",
      "alert_id": "alert-uuid-456",
      "channel": "whatsapp",
      "recipient": "+12345678901",
      "error": "Rate limit exceeded after 5 retries",
      "total_attempts": 5,
      "enqueued_at": "2024-06-15T10:00:00Z",
      "age_minutes": 240
    }
  ]
}

GET /statistics

Get notification statistics.

Response (200 OK):

{
  "period": "24h",
  "total_alerts": 156,
  "total_notifications": 420,
  "delivery_rate": 97.6,
  "by_channel": {
    "telegram": {
      "sent": 280,
      "delivered": 275,
      "failed": 5,
      "avg_delivery_time_ms": 850
    },
    "whatsapp": {
      "sent": 140,
      "delivered": 135,
      "failed": 5,
      "avg_delivery_time_ms": 1200
    }
  },
  "by_severity": {
    "critical": 12,
    "high": 45,
    "medium": 67,
    "low": 32
  },
  "escalations": {
    "total": 8,
    "acknowledged_before_escalation": 148,
    "avg_ack_time_seconds": 180
  },
  "dead_letter_queue": {
    "current_size": 5,
    "peak_24h": 12
  }
}

GET /health

Health check endpoint.

Response (200 OK):

{
  "status": "healthy",
  "services": {
    "telegram_api": "connected",
    "whatsapp_api": "connected",
    "redis": "connected",
    "postgresql": "connected",
    "object_storage": "connected"
  },
  "queue_depths": {
    "high_priority": 0,
    "normal_priority": 3,
    "low_priority": 12
  },
  "circuit_breakers": {
    "telegram": "closed",
    "whatsapp": "closed"
  }
}

13.2 Webhooks

Telegram Webhook

POST /webhooks/telegram
Content-Type: application/json

Body: Telegram Update object (message, callback_query, etc.)

WhatsApp Webhook

POST /webhooks/whatsapp
Content-Type: application/json
X-Hub-Signature-256: sha256=...  # Meta signature verification

Body: WhatsApp webhook payload (statuses, messages)

13.3 Authentication

All API endpoints require authentication:

security:
  method: "Bearer Token (JWT)"
  scopes:
    - "notifications:read"      # Read delivery status
    - "notifications:write"     # Send alerts, acknowledge
    - "notifications:admin"     # Manage templates, rules, groups
    - "notifications:retry"     # Retry failed notifications
  
  # Webhook verification
  telegram:
    method: "Bot token in URL path"
    
  whatsapp:
    method: "X-Hub-Signature-256 HMAC verification"
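
For the WhatsApp webhook, Meta signs the raw request body with the app secret using HMAC-SHA256 and sends the hex digest in the `X-Hub-Signature-256` header, prefixed with `sha256=`. A minimal verification sketch follows; the function name `verify_meta_signature` is hypothetical, and how the app secret is loaded is left to the deployment.

```python
import hashlib
import hmac


def verify_meta_signature(app_secret: str, raw_body: bytes, header_value: str) -> bool:
    """Check an X-Hub-Signature-256 header against the raw request body."""
    prefix = "sha256="
    if not header_value or not header_value.startswith(prefix):
        return False
    expected = hmac.new(app_secret.encode(), raw_body, hashlib.sha256).hexdigest()
    received = header_value[len(prefix):]
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(expected.encode(), received.encode())
```

The check has to run on the bytes exactly as received. Re-serializing parsed JSON can change whitespace or key order and make a valid signature fail.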

14. Configuration Schema

14.1 Full Configuration

# Full system configuration (managed via admin UI)

notification_service:
  # Service settings
  service:
    name: "AI Surveillance Notifications"
    log_level: "INFO"
    max_concurrent_sends: 50
    
  # Channel configurations
  channels:
    telegram:
      enabled: true
      bot_token: "${ENCRYPTED}"
      webhook_url: "https://api.example.com/webhooks/telegram"
      rate_limit: 30  # messages/sec
      
    whatsapp:
      enabled: true
      access_token: "${ENCRYPTED}"
      phone_number_id: "${ENCRYPTED}"
      business_account_id: "${ENCRYPTED}"
      api_version: "v18.0"
      webhook_verify_token: "${ENCRYPTED}"
      rate_limit: 80  # messages/sec
  
  # Routing
  routing:
    default_recipient_groups: ["security_team"]
    default_channels: ["telegram"]
    stop_on_first_exclusive_match: false
    
  # Escalation
  escalation:
    enabled: true
    default_threshold_minutes: 15
    max_levels: 3
    
  # Quiet hours
  quiet_hours:
    default_enabled: false
    critical_always_bypass: true
    
  # Rate limiting
  rate_limiting:
    global_max_per_minute: 200
    dedup_window_minutes: 5
    
  # Retry
  retry:
    max_retries: 5
    base_delay_seconds: 2
    exponential_base: 2
    
  # Media
  media:
    image_max_width: 1280
    image_quality: 85
    video_max_duration_seconds: 10
    video_target_resolution: [1280, 720]
    temp_url_expiry_seconds: 3600
    cleanup_after_hours: 24
    
  # Templates
  templates:
    default_language: "en"
    timezone_display: "America/New_York"
    date_format: "YYYY-MM-DD"
    time_format: "HH:mm:ss"
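
To make the `retry` settings concrete, the sketch below computes the delay before each retry from `base_delay_seconds`, `exponential_base`, and `max_retries`. It assumes plain exponential backoff with no jitter and no upper cap; the retry section of this document may define both, so treat the function `retry_delays` as illustrative only.

```python
def retry_delays(
    max_retries: int = 5,
    base_delay_seconds: float = 2,
    exponential_base: float = 2,
) -> list:
    """Delay in seconds before retry 1..max_retries (assumed: no jitter, no cap)."""
    return [
        base_delay_seconds * exponential_base ** attempt
        for attempt in range(max_retries)
    ]


if __name__ == "__main__":
    delays = retry_delays()
    print(delays, "total:", sum(delays), "seconds")
```

With the defaults shown in the configuration, the delays are 2, 4, 8, 16, and 32 seconds, so a notification that fails every attempt reaches the dead letter queue after about a minute of waiting.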

15. Database Schema

15.1 Entity Relationship Diagram

+----------------+       +------------------+       +------------------+
|     alerts     |       |  notifications   |       | recipient_groups |
+----------------+       +------------------+       +------------------+
| alert_id (PK)  |<-----+| notification_id  |       | group_id (PK)    |
| event_type     |       | alert_id (FK)    |       | name             |
| camera_id (FK) |       | channel          |       | description      |
| person_id (FK) |       | recipient_id     |      +| channels (json)  |
| severity       |       | template_id      |       | settings (json)  |
| confidence     |       | status           |       | is_active        |
| image_url      |       | provider_msg_id  |       | created_at       |
| video_url      |       | retry_count      |       +------------------+
| metadata(json) |       | sent_at          |              |
| acknowledged   |       | delivered_at     |              |
| created_at     |       | failed_at        |       +------------------+
+----------------+       | error            |       |  group_members   |
         |               +------------------+       +------------------+
         |                        |                  | member_id (PK)   |
         |               +------------------+       | group_id (FK)    |
         |               | status_history   |       | name             |
         |               +------------------+       | role             |
         |               | history_id (PK)  |       | telegram_id      |
         |               | notification(FK) |       | phone            |
         |               | from_status      |       | is_active        |
         |               | to_status        |       +------------------+
         |               | timestamp        |
         |               +------------------+
         |
         v
+----------------+
|   cameras      |
+----------------+
| camera_id (PK) |
| name           |
| location       |
| zone_id (FK)   |
| is_active      |
+----------------+

15.2 Core Tables

-- Cameras table (created first because alerts.camera_id references it)
CREATE TABLE cameras (
    camera_id   VARCHAR(50) PRIMARY KEY,
    name        VARCHAR(255) NOT NULL,
    location    VARCHAR(255),
    zone_id     VARCHAR(50),
    rtsp_url    TEXT,
    is_active   BOOLEAN NOT NULL DEFAULT TRUE,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Alerts table
CREATE TABLE alerts (
    alert_id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    event_type          VARCHAR(50) NOT NULL,
    camera_id           VARCHAR(50) NOT NULL REFERENCES cameras(camera_id),
    zone_id             VARCHAR(50),
    person_id           VARCHAR(50),
    person_name         VARCHAR(255),
    person_role         VARCHAR(50),
    confidence          DECIMAL(5,2),
    severity            VARCHAR(20) NOT NULL CHECK (severity IN ('low', 'medium', 'high', 'critical')),
    image_url           TEXT,
    video_url           TEXT,
    metadata            JSONB DEFAULT '{}',
    watchlist_matches   JSONB DEFAULT '[]',
    
    -- Acknowledgment
    acknowledged        BOOLEAN NOT NULL DEFAULT FALSE,
    acknowledged_by     VARCHAR(100),
    acknowledged_via    VARCHAR(20),  -- telegram, whatsapp, dashboard
    acknowledged_at     TIMESTAMPTZ,
    acknowledgment_note TEXT,
    
    -- Escalation
    escalation_level    INT NOT NULL DEFAULT 0,
    max_escalation      INT NOT NULL DEFAULT 3,
    
    created_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at          TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_alerts_created_at ON alerts(created_at);
CREATE INDEX idx_alerts_camera ON alerts(camera_id);
CREATE INDEX idx_alerts_severity ON alerts(severity);
CREATE INDEX idx_alerts_event_type ON alerts(event_type);
CREATE INDEX idx_alerts_acknowledged ON alerts(acknowledged) WHERE NOT acknowledged;

-- Recipient groups
CREATE TABLE recipient_groups (
    group_id    VARCHAR(50) PRIMARY KEY,
    name        VARCHAR(255) NOT NULL,
    description TEXT,
    channels    JSONB NOT NULL DEFAULT '{}',
    settings    JSONB NOT NULL DEFAULT '{}',
    is_active   BOOLEAN NOT NULL DEFAULT TRUE,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Group members
CREATE TABLE group_members (
    member_id   UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    group_id    VARCHAR(50) NOT NULL REFERENCES recipient_groups(group_id) ON DELETE CASCADE,
    name        VARCHAR(255) NOT NULL,
    role        VARCHAR(100),
    telegram_id VARCHAR(50),
    phone       VARCHAR(20),
    is_active   BOOLEAN NOT NULL DEFAULT TRUE,
    added_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(group_id, telegram_id),
    UNIQUE(group_id, phone)
);

-- Routing rules
CREATE TABLE routing_rules (
    rule_id         VARCHAR(50) PRIMARY KEY,
    name            VARCHAR(255) NOT NULL,
    description     TEXT,
    priority        INT NOT NULL DEFAULT 50,
    enabled         BOOLEAN NOT NULL DEFAULT TRUE,
    logic           VARCHAR(10) NOT NULL DEFAULT 'ALL',  -- ALL, ANY
    stop_on_match   BOOLEAN NOT NULL DEFAULT FALSE,
    conditions      JSONB NOT NULL DEFAULT '[]',
    actions         JSONB NOT NULL DEFAULT '{}',
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Templates (one row per template and channel)
CREATE TABLE message_templates (
    template_id     VARCHAR(100) NOT NULL,
    name            VARCHAR(255) NOT NULL,
    channel         VARCHAR(20) NOT NULL,
    text            TEXT NOT NULL,
    formatting      VARCHAR(20) DEFAULT 'HTML',
    supports_media  BOOLEAN NOT NULL DEFAULT FALSE,
    media_config    JSONB,
    keyboard_config JSONB,
    placeholders    JSONB NOT NULL DEFAULT '[]',
    is_active       BOOLEAN NOT NULL DEFAULT TRUE,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- A single-column key on template_id would make a per-channel UNIQUE redundant
    PRIMARY KEY (template_id, channel)
);

-- Dead letter queue
CREATE TABLE dead_letter_queue (
    entry_id        BIGSERIAL PRIMARY KEY,
    notification_id UUID NOT NULL,
    alert_id        UUID NOT NULL,
    channel         VARCHAR(20) NOT NULL,
    recipient_id    VARCHAR(100) NOT NULL,
    error           TEXT NOT NULL,
    total_attempts  INT NOT NULL,
    enqueued_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    resolved_at     TIMESTAMPTZ,
    resolution      VARCHAR(20)  -- retried, discarded, manual
);

16. Deployment & Security

16.1 Deployment Architecture

deployment:
  # Docker Compose / Kubernetes
  services:
    notification-api:
      replicas: 2
      resources:
        cpu: "1"
        memory: "2Gi"
      
    notification-worker:
      replicas: 3
      resources:
        cpu: "1"
        memory: "2Gi"
      queues:
        - "notifications:priority:high"
        - "notifications:priority:normal"
        - "notifications:priority:low"
    
    media-processor:
      replicas: 2
      resources:
        cpu: "2"
        memory: "4Gi"  # For FFmpeg
      
    redis:
      mode: "cluster"  # For production
      
    postgresql:
      mode: "primary-replica"
      backups: "daily"

16.2 Security Checklist

Category               Requirement              Implementation
Authentication         API endpoints secured    JWT Bearer tokens
Authorization          Role-based access        RBAC with scopes
Encryption at Rest     Bot tokens, API keys     AES-256-GCM via KMS
Encryption in Transit  All API calls            TLS 1.3
Secret Management      No secrets in code       HashiCorp Vault / cloud KMS
Webhook Security       Verify webhook sources   HMAC signature verification
Input Validation       Prevent injection        Pydantic schemas, sanitization
Rate Limiting          Prevent abuse            Multi-level token buckets
Audit Logging          All actions logged       Structured JSON logs
PII Protection         Phone numbers, chat IDs  Encrypted, access-controlled

17. Appendix: Delivery Flow Diagrams

17.1 Complete Alert Delivery Flow

┌─────────────────┐
│ Detection Event │
│  (AI Pipeline)  │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Event Bus      │
│  (Redis Pub/Sub)│
└────────┬────────┘
         │
         ▼
┌──────────────────────────┐
│  1. Event Normalization  │
│     - Validate schema    │
│     - Enrich data        │
│     - Assign severity    │
└───────────┬──────────────┘
            │
            ▼
┌──────────────────────────┐     No match
│  2. Rule Engine          │─────────────────►┌──────────────┐
│     - Evaluate rules     │                  │ Log & Drop   │
│     - Match conditions   │                  └──────────────┘
│     - Resolve recipients │
└───────────┬──────────────┘
            │ Match found
            ▼
┌──────────────────────────┐
│  3. Deduplication Check  │
│     - Check Redis key    │
│     - Within 5 min window│
└───────────┬──────────────┘
            │
     ┌──────┴──────┐
     │             │
   New         Duplicate
     │             │
     ▼             ▼
┌──────────┐  ┌──────────┐
│ Continue │  │ Suppress │
│          │  │ + Counter│
└────┬─────┘  └──────────┘
     │
     ▼
┌──────────────────────────┐
│  4. Template Rendering   │
│     - Select template    │
│     - Fill placeholders  │
│     - Format per channel │
└───────────┬──────────────┘
            │
            ▼
┌──────────────────────────┐     ┌──────────┐
│  5. Media Processing     │────►│  MinIO   │
│     - Resize image       │     │ Storage  │
│     - Compress video     │     └──────────┘
│     - Generate URL       │
└───────────┬──────────────┘
            │
            ▼
┌──────────────────────────┐     Over limit
│  6. Rate Limit Check     │─────────────────►┌──────────────┐
│     - Global check       │                  │ Queue with   │
│     - Per-channel check  │                  │ delay        │
│     - Per-recipient check│                  └──────────────┘
└───────────┬──────────────┘
            │ Under limit
            ▼
┌──────────────────────────┐
│  7. Quiet Hours Check    │
│     - Critical?          │──Yes──►┌──────────────┐
│     - Bypass flag?       │        │ Send anyway  │
└───────────┬──────────────┘        └──────────────┘
            │
     ┌──────┴──────┐
     │             │
  Allowed     In quiet hours
     │             │
     ▼             ▼
┌──────────┐  ┌──────────┐
│ Continue │  │ Suppress │
│          │  │ (log)    │
└────┬─────┘  └──────────┘
     │
     ▼
┌──────────────────────────┐
│  8. Queue by Priority    │
│     - Critical: High Q   │
│     - High/Medium: Norm Q│
│     - Low: Low Q         │
└───────────┬──────────────┘
            │
            ▼
┌──────────────────────────┐
│  9. Channel Adapter      │
│     ┌──────────────┐     │
│     │ Telegram?    │─────┼────► Send via Bot API
│     │ WhatsApp?    │─────┼────► Send via Graph API
│     └──────────────┘     │
└───────────┬──────────────┘
            │
     ┌──────┴──────┐
     │             │
   Success     Failure
     │             │
     ▼             ▼
┌──────────┐  ┌──────────────────┐
│ Track    │  │ Retry Manager    │
│ Delivery │  │ - Check retryable│
│          │  │ - Schedule retry │
└──────────┘  │ - Max 5 attempts │
              └────────┬─────────┘
                       │
                ┌──────┴──────┐
                │             │
             Retryable   Non-retryable
                │             │
                ▼             ▼
         ┌──────────┐   ┌──────────┐
         │ Re-queue │   │ Permanent│
         │ w/ delay │   │ failure  │
         └──────────┘   └──────────┘
                              │
                              ▼
                    ┌──────────────────┐
                    │ After 5 failures │
                    │ Move to DLQ      │
                    └──────────────────┘
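
Step 3 of the flow (deduplication) can be sketched with an in-memory stand-in for the Redis key. In production the same effect would likely come from a Redis `SET key value NX EX 300` call; the class below only illustrates the logic. The class name `DedupWindow`, the key format, and the choice not to extend the window on repeat events are all assumptions.

```python
import time


class DedupWindow:
    """Suppress repeat events for the same key within a fixed time window."""

    def __init__(self, window_seconds: int = 300, clock=time.monotonic):
        self._window = window_seconds
        self._clock = clock          # injectable so the window can be tested
        self._expiry = {}            # key -> time at which the window closes
        self.suppressed = {}         # key -> number of suppressed duplicates

    def is_duplicate(self, key: str) -> bool:
        now = self._clock()
        expires_at = self._expiry.get(key)
        if expires_at is not None and now < expires_at:
            # Still inside the window: suppress and bump the counter
            self.suppressed[key] = self.suppressed.get(key, 0) + 1
            return True
        # First sighting, or the previous window has closed: open a new one
        self._expiry[key] = now + self._window
        return False


def dedup_key(event_type: str, camera_id: str, person_id: str) -> str:
    """Hypothetical key layout: one window per event type, camera, and person."""
    return f"dedup:{event_type}:{camera_id}:{person_id}"
```

The suppression counter corresponds to the "Suppress + Counter" branch in the diagram and could be surfaced in the eventual notification, for example as "seen 4 more times".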

17.2 Escalation Flow

Alert Sent
    │
    ▼
┌──────────────────┐
│ Start Escalation │
│ Timer (15 min)   │
└────────┬─────────┘
         │
         ▼
    ┌─────────┐
    │ Acked?  │────Yes──►┌──────────────┐
    └────┬────┘          │ Cancel timer │
         │No             └──────────────┘
         ▼
    ┌─────────┐
    │ Timer   │
    │ fired?  │
    └────┬────┘
         │Yes
         ▼
┌────────────────────┐
│ Level 1 Escalation │
│ - Add management   │
│ - Notify via all   │
└────────┬───────────┘
         │
         ▼
    ┌─────────┐
    │ Acked?  │────Yes──► Done
    └────┬────┘
         │No
         ▼
┌────────────────────┐
│ Level 2 Escalation │
│ - Increase severity│
│ - Re-notify all    │
└────────┬───────────┘
         │
         ▼
    ┌─────────┐
    │ Acked?  │────Yes──► Done
    └────┬────┘
         │No
         ▼
┌────────────────────┐
│ Level 3 Escalation │
│ - All hands        │
│ - Critical severity│
│ - Bypass quiet hrs │
└────────────────────┘
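
The escalation ladder above can be expressed as a small function of how long an alert has gone unacknowledged. The sketch assumes every level waits the same `threshold_minutes` (15 in the diagram) and caps at `max_levels` (3); the diagram does not state the interval between levels 1, 2, and 3, so that spacing is an assumption, as is the function name.

```python
def escalation_level(
    minutes_unacknowledged: float,
    threshold_minutes: int = 15,
    max_levels: int = 3,
) -> int:
    """Return 0 for no escalation, otherwise the level reached (1..max_levels)."""
    if minutes_unacknowledged < 0:
        raise ValueError("elapsed time cannot be negative")
    if threshold_minutes <= 0:
        raise ValueError("threshold must be positive")
    # One level per full threshold interval elapsed, capped at the top level
    return min(int(minutes_unacknowledged // threshold_minutes), max_levels)
```

Acknowledgment is handled outside this function: once an alert is acknowledged the timer is cancelled and the level stops advancing.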

17.3 Sequence Diagram: Blacklist Alert Flow

AI Detection          Event Bus        Notif Service      Telegram        WhatsApp        Admin
    │                     │                  │               │               │             │
    │─Person detected────>│                  │               │               │             │
    │  (blacklist match)  │                  │               │               │             │
    │                     │─Alert event─────>│               │               │             │
    │                     │                  │               │               │             │
    │                     │                  │─Evaluate rules│               │             │
    │                     │                  │ (rule_blacklist│              │             │
    │                     │                  │  _always match)│              │             │
    │                     │                  │               │               │             │
    │                     │                  │─Resolve groups│               │             │
    │                     │                  │ (security +   │               │             │
    │                     │                  │  management)  │               │             │
    │                     │                  │               │               │             │
    │                     │                  │─Render template│              │             │
    │                     │                  │ (blacklist    │               │             │
    │                     │                  │  template)    │               │             │
    │                     │                  │               │               │             │
    │                     │                  │─Process image │               │             │
    │                     │                  │ (resize + URL)│               │             │
    │                     │                  │               │               │             │
    │                     │                  │─Check rate limit              │             │
    │                     │                  │ (pass)        │               │             │
    │                     │                  │               │               │             │
    │                     │                  │─Send message─>│               │             │
    │                     │                  │               │─Message sent  │             │
    │                     │                  │<─message_id───│               │             │
    │                     │                  │               │               │             │
    │                     │                  │─Send template message────────>│             │
    │                     │                  │               │               │─Templ msg sent
    │                     │                  │<─wamid.xxx────────────────────│             │
    │                     │                  │               │               │             │
    │                     │                  │─Store delivery status         │             │
    │                     │                  │               │               │             │
    │                     │                  │─Start escalation timer        │             │
    │                     │                  │ (15 min)      │               │             │
    │                     │                  │               │               │             │
    │                     │                  │               │─Webhook:      │             │
    │                     │                  │               │ delivered     │             │
    │                     │                  │               │──────────────>│             │
    │                     │                  │               │               │─Webhook:    │
    │                     │                  │               │               │ delivered   │
    │                     │                  │               │               │────────────>│
    │                     │                  │               │               │             │
    │                     │                  │               │◄──Ack button──│             │
    │                     │                  │               │  (clicked)    │             │
    │                     │                  │               │               │             │
    │                     │                  │◄─Acknowledge alert────────────│             │
    │                     │                  │               │               │             │
    │                     │                  │─Cancel escalation             │             │
    │                     │                  │               │               │             │
    │                     │                  │─Notify others: "Ack by John"  │             │
    │                     │                  │───────────Update status──────>│             │
    │                     │                  │               │               │────────────>│
    │                     │                  │               │               │             │
    │                     │                  │               │               │◄─Dashboard──│
    │                     │                  │               │               │  refresh    │
    │                     │                  │               │               │             │
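
The Notif Service steps in the sequence above can be sketched as one function that runs them in order. This is a rough illustration under stated assumptions, not the service's actual code: `handle_alert`, `Delivery`, the event keys (`match_type`, `person`, `camera`), and the stub senders `fake_telegram` and `fake_whatsapp` are all hypothetical. Image processing, status storage, and the escalation timer are omitted to keep the sketch short.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Delivery:
    channel: str
    recipient: str
    provider_message_id: str
    status: str = "sent"


def fake_telegram(recipient: str, text: str) -> str:
    """Stub sender; a real one would call the Telegram Bot API."""
    return f"tg-{recipient}"


def fake_whatsapp(recipient: str, text: str) -> str:
    """Stub sender; a real one would call the WhatsApp Business API."""
    return f"wamid.{recipient}"


def handle_alert(
    event: dict,
    groups: Dict[str, List[str]],
    senders: Dict[str, Callable[[str, str], str]],
    rate_limit_ok: Callable[[str], bool],
) -> List[Delivery]:
    """Run the diagram's steps in order for one alert event."""
    # 1. Evaluate rules: in this sketch only a blacklist match notifies.
    if event.get("match_type") != "blacklist":
        return []
    # 2. Resolve groups (security + management), de-duplicated in order.
    recipients = list(dict.fromkeys(groups["security"] + groups["management"]))
    # 3. Render the template.
    text = f"BLACKLIST MATCH: {event['person']} on {event['camera']}"
    deliveries: List[Delivery] = []
    for channel, send in senders.items():
        for recipient in recipients:
            # 4. Check the rate limit before each send.
            if not rate_limit_ok(channel):
                continue
            # 5. Send and keep the provider's message ID for delivery tracking.
            message_id = send(recipient, text)
            deliveries.append(Delivery(channel, recipient, message_id))
    return deliveries
```

Passing the senders and the rate limiter in as callables keeps the ordering logic testable without network access; the returned `Delivery` records are what a real service would persist before starting the escalation timer.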

Appendix A: Environment Variables

Variable                            Description                 Required    Default
NOTIF_TELEGRAM_ENABLED              Enable Telegram channel     No          true
NOTIF_TELEGRAM_BOT_TOKEN            Telegram bot token          If enabled  -
NOTIF_TELEGRAM_WEBHOOK_URL          Webhook URL for Telegram    No          auto
NOTIF_WHATSAPP_ENABLED              Enable WhatsApp channel     No          true
NOTIF_WHATSAPP_ACCESS_TOKEN         Meta access token           If enabled  -
NOTIF_WHATSAPP_PHONE_NUMBER_ID      WhatsApp phone number ID    If enabled  -
NOTIF_WHATSAPP_BUSINESS_ACCOUNT_ID  WABA ID                     If enabled  -
NOTIF_REDIS_URL                     Redis connection URL        Yes         -
NOTIF_POSTGRES_URL                  PostgreSQL connection URL   Yes         -
NOTIF_MINIO_ENDPOINT                MinIO/S3 endpoint           Yes         -
NOTIF_KMS_TYPE                      KMS type (vault/aws/azure)  Yes         -
NOTIF_LOG_LEVEL                     Log level                   No          INFO
NOTIF_MAX_RETRIES                   Max retry attempts          No          5
NOTIF_RETRY_BASE_DELAY              Retry base delay (sec)      No          2
NOTIF_DEDUP_WINDOW_MINUTES          Deduplication window (min)  No          5
NOTIF_DEFAULT_ESCALATION_MINUTES    Escalation threshold (min)  No          15
NOTIF_MEDIA_TEMP_URL_EXPIRY         Presigned URL expiry (sec)  No          3600
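
One way to read these variables at startup is sketched below. It is an illustration only and covers a subset of the table: `NotifSettings`, `load_settings`, and `_as_bool` are hypothetical names, and the set of strings accepted as "true" is an assumption. The defaults and the "If enabled" rule come from the table.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def _as_bool(value: str) -> bool:
    # Assumption: these spellings count as true; everything else is false.
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class NotifSettings:
    telegram_enabled: bool
    telegram_bot_token: Optional[str]
    whatsapp_enabled: bool
    whatsapp_access_token: Optional[str]
    redis_url: str
    postgres_url: str
    max_retries: int
    retry_base_delay: float
    dedup_window_minutes: int
    escalation_minutes: int


def load_settings(env: Mapping[str, str] = os.environ) -> NotifSettings:
    """Read a subset of the NOTIF_* variables, applying the table's defaults."""

    def required(name: str) -> str:
        value = env.get(name)
        if not value:
            raise ValueError(f"missing required variable: {name}")
        return value

    telegram_enabled = _as_bool(env.get("NOTIF_TELEGRAM_ENABLED", "true"))
    whatsapp_enabled = _as_bool(env.get("NOTIF_WHATSAPP_ENABLED", "true"))
    return NotifSettings(
        telegram_enabled=telegram_enabled,
        # "If enabled" variables are required only when their channel is on.
        telegram_bot_token=(
            required("NOTIF_TELEGRAM_BOT_TOKEN") if telegram_enabled else None
        ),
        whatsapp_enabled=whatsapp_enabled,
        whatsapp_access_token=(
            required("NOTIF_WHATSAPP_ACCESS_TOKEN") if whatsapp_enabled else None
        ),
        redis_url=required("NOTIF_REDIS_URL"),
        postgres_url=required("NOTIF_POSTGRES_URL"),
        max_retries=int(env.get("NOTIF_MAX_RETRIES", "5")),
        retry_base_delay=float(env.get("NOTIF_RETRY_BASE_DELAY", "2")),
        dedup_window_minutes=int(env.get("NOTIF_DEDUP_WINDOW_MINUTES", "5")),
        escalation_minutes=int(env.get("NOTIF_DEFAULT_ESCALATION_MINUTES", "15")),
    )
```

Failing fast on a missing required variable surfaces misconfiguration at startup rather than at the first alert.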

Appendix B: Technology Stack

Layer               Technology            Version
API Framework       FastAPI               0.110+
HTTP Client         aiohttp               3.9+
Task Queue          Celery + Redis        5.3+
Database            PostgreSQL            15+
Cache               Redis                 7+
Object Storage      MinIO                 latest
Secret Management   HashiCorp Vault       1.15+
Image Processing    Pillow                10+
Video Processing    FFmpeg                6+
Telegram SDK        python-telegram-bot   21+
WhatsApp SDK        Custom (Graph API)    v18.0

End of Document