# AI-Powered Industrial Surveillance Platform — Video Ingestion Subsystem Design

## Document Information
| Attribute | Value |
|---|---|
| **Document Version** | 1.0.0 |
| **Target DVR** | CP PLUS ORANGE CP-UVR-0801E1-CV2 |
| **DVR Hardware** | V1.0, System V4.001.00AT001.3.0 |
| **Channels** | 8 active |
| **Resolution** | 960x1080 per channel |
| **DVR Network** | 192.168.29.200/24, Static |
| **ONVIF** | V2.6.1.867657 (Server V19.06) |
| **Date** | 2025 |
| **Classification** | Technical Specification — Video Ingestion Subsystem |

---

## Table of Contents

1. [System Architecture Overview](#1-system-architecture-overview)
2. [RTSP Stream Configuration](#2-rtsp-stream-configuration)
3. [Edge Gateway Design](#3-edge-gateway-design)
4. [Stream Discovery & Management](#4-stream-discovery--management)
5. [Stream Processing Pipeline](#5-stream-processing-pipeline)
6. [Live Streaming for Dashboard](#6-live-streaming-for-dashboard)
7. [Edge Buffering & Local Storage](#7-edge-buffering--local-storage)
8. [Implementation Specifications](#8-implementation-specifications)
9. [Appendix](#9-appendix)

---

## 1. System Architecture Overview

### 1.1 High-Level Topology

```
+-----------------------------------------------------------------------------+
|                              CLOUD VPC (VPN TUNNEL)                          |
|  +------------------+  +------------------+  +---------------------------+  |
|  | Stream Relay     |  | AI Inference     |  | Dashboard / NVR           |  |
|  | (HLS/DASH/WebRTC)|  | Service          |  | Web UI                    |  |
|  | Port: 443        |  | GPU Cluster      |  | React/Vue Frontend        |  |
|  +--------+---------+  +--------+---------+  +---------------------------+  |
|           |                     |                                            |
|  +--------+---------+           |                                            |
|  | Metadata Bus     |           | (WebSocket: frame metadata)               |
|  | MQTT/Kafka       |<----------+                                            |
|  | Port: 8883/9094  |                                                        |
|  +--------+---------+                                                        |
+----------|------------------------------------------------------------------+
           |
           | WireGuard / OpenVPN Tunnel (UDP 51820 or TCP 443)
           |
+----------v------------------------------------------------------------------+
|                         EDGE GATEWAY (Local Network)                         |
|  Hardware: Intel NUC 13 Pro / Raspberry Pi 5 / NVIDIA Jetson Nano            |
|  OS: Ubuntu 22.04 LTS (Server)                                             |
|  LAN IP: 192.168.29.10/24                                                   |
|                                                                              |
|  +------------------+  +------------------+  +---------------------------+  |
|  | Stream Manager   |  | FFmpeg Pipeline  |  | HLS Segmenter           |  |
|  | (Python/asyncio) |  | (8x RTSP feeds)  |  | (live streaming)        |  |
|  | Port: 8080       |  | RAM buffers      |  | Port: 8081              |  |
|  +--------+---------+  +--------+---------+  +---------------------------+  |
|           |                     |                                           |
|  +--------v---------+  +--------v---------+  +---------------------------+ |
|  | Health Monitor   |  | Frame Extractor  |  | Buffer Manager            | |
|  | & Watchdog       |  | (AI decimation)  |  | (ring buffer + upload)    | |
|  +------------------+  +--------+---------+  +---------------------------+ |
|                                 |                                            |
|           +---------------------v---------------------+                      |
|           |           VPN Client (WireGuard)          |                      |
|           +---------------------+---------------------+                      |
+-------------------------------|----------------------------------------------+
                                |
                                | RTSP over TCP/UDP
                                | ONVIF over HTTP
                                |
+-------------------------------v----------------------------------------------+
|                         CP PLUS DVR (192.168.29.200)                         |
|  +----+----+----+----+----+----+----+----+                                 |
|  |CH1 |CH2 |CH3 |CH4 |CH5 |CH6 |CH7 |CH8 |  8x 960x1080 @ 25fps         |
|  +----+----+----+----+----+----+----+----+                                 |
|  RTSP:554  ONVIF:80  TCP:25001  UDP:25002  HTTP:80  HTTPS:443             |
+------------------------------------------------------------------------------+
```

### 1.2 Data Flow Summary

```
[DVR RTSP Stream] --> [Edge Gateway] --> [FFmpeg Demux] --> [Multi-target routing]
                                                                   |
                                    +----------------+-------------+----------------+
                                    |                |             |                |
                              [HLS Segmenter]  [Frame Extractor] [Buffer Mgr]   [Live Relay]
                                    |                |             |                |
                              [Dashboard]      [AI Inference] [Cloud Upload]  [Cloud Stream]
                                    |                |             |                |
                              WebSocket      MQTT/Kafka    S3/MinIO      CDN/Relay
```

### 1.3 Network Specification

| Parameter | Value |
|---|---|
| DVR IP | 192.168.29.200 |
| DVR Subnet | 255.255.255.0 (/24) |
| DVR Gateway | 192.168.29.1 |
| Edge Gateway LAN IP | 192.168.29.10 (static) |
| VPN Tunnel Network | 10.100.0.0/24 (WireGuard) |
| Cloud Relay Endpoint | 10.100.0.1 (VPN server) |
| RTSP Transport | TCP interleaved (preferred) / UDP |
| MTU | 1400 (WireGuard tunnel) |
| QoS | DSCP EF (46) for RTSP, DSCP AF41 (34) for HLS |

### 1.4 Bandwidth Budget (960x1080 @ 25fps)

| Stream Type | Codec | Expected Bitrate | 8-Channel Total |
|---|---|---|---|
| Main Stream (primary) | H.265 | 2-4 Mbps | 16-32 Mbps |
| Sub Stream (secondary) | H.264 | 512-1024 Kbps | 4-8 Mbps |
| AI Extraction Stream | H.264 | 256-512 Kbps | 2-4 Mbps |
| HLS Output (live) | H.264 | 1-2 Mbps | 8-16 Mbps |
| **Total LAN Ingress** | — | — | **~30-60 Mbps** |
| **Cloud Egress (VPN)** | — | — | **~8-16 Mbps (optimized)** |

---

## 2. RTSP Stream Configuration

### 2.1 RTSP URL Patterns for CP PLUS ORANGE Series

CP PLUS ORANGE Series DVRs use a Dahua-compatible RTSP URL scheme. Two URL formats are supported:

#### Primary (Main Stream) — Full Resolution, Higher Bitrate
```
rtsp://admin:{PASSWORD}@192.168.29.200:554/cam/realmonitor?channel={N}&subtype=0
```

#### Secondary (Sub Stream) — Lower Resolution, Lower Bitrate
```
rtsp://admin:{PASSWORD}@192.168.29.200:554/cam/realmonitor?channel={N}&subtype=1
```

#### Alternative Format (Media URL with authentication in headers)
```
rtsp://192.168.29.200:554/cam/realmonitor?channel={N}&subtype={M}&unicast=true&proto=Onvif
```
> Auth via `DESCRIBE` request `Authorization: Digest username="admin", ...` header.

### 2.2 Channel-to-URL Mapping

| Channel | Location | Primary URL (subtype=0) | Secondary URL (subtype=1) |
|---|---|---|---|
| CH1 | Gate/Entry | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0` | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1` |
| CH2 | Production Floor A | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=0` | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=1` |
| CH3 | Production Floor B | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=0` | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=1` |
| CH4 | Warehouse | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=0` | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=1` |
| CH5 | Loading Dock | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=0` | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=1` |
| CH6 | Office Corridor | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=0` | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=1` |
| CH7 | Server Room | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=0` | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=1` |
| CH8 | Perimeter | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=0` | `rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=1` |

### 2.3 Stream Usage Strategy

| Purpose | Stream Type | Rationale |
|---|---|---|
| AI Inference / Frame Extraction | Sub Stream (subtype=1) | Lower bandwidth, sufficient for detection, reduces compute |
| Live Dashboard (grid view) | Sub Stream (subtype=1) | 8-camera grid needs lower per-stream bandwidth |
| Live Dashboard (fullscreen) | Main Stream (subtype=0) | Full quality for single-camera focus |
| Event Recording / Alert Clips | Main Stream (subtype=0) | Maximum quality for forensic review |
| Cloud Archive | Main Stream (subtype=0) | Best quality for long-term storage |

### 2.4 Codec Expectations

| Parameter | Main Stream (subtype=0) | Sub Stream (subtype=1) |
|---|---|---|
| Video Codec | H.265 (HEVC) or H.264 | H.264 (AVC) |
| Audio Codec | G.711A/u (if enabled) | G.711A/u (if enabled) |
| Resolution | 960x1080 | CIF to D1 (typically 352x288 or 704x576) |
| Frame Rate | 25 fps | 15-25 fps |
| Bitrate | 2048-4096 Kbps | 512-1024 Kbps |
| GOP Size | 25-50 frames | 25-50 frames |
| Pixel Format | YUV420P | YUV420P |

**Note:** The DVR disk is FULL (0 bytes free). This means the DVR cannot perform local recording. Stream ingestion must not depend on DVR-side recording or playback features. All archival is edge-managed.

### 2.5 FFmpeg RTSP Connection Parameters

```
-rtsp_transport tcp          # Force TCP interleaved (NAT/firewall friendly)
-stimeout 5000000            # Socket timeout in microseconds (5 seconds)
-reconnect 1                 # Enable reconnection
-reconnect_at_eof 1          # Reconnect at EOF
-reconnect_streamed 1        # Reconnect streamed content
-reconnect_delay_max 30      # Max reconnect delay (seconds)
-fflags +genpts              # Generate presentation timestamps
-flags +low_delay            # Low latency mode
-probesize 5000000           # Probe size for codec detection
-analyzeduration 2000000     # Analyze duration in microseconds
```

### 2.6 Critical RTSP Notes for CP PLUS DVR

1. **TCP Interleaved is mandatory** — UDP may drop behind NAT; CP PLUS DVRs work most reliably with `rtsp_transport tcp`
2. **Authentication** — Uses Digest authentication (Basic is disabled on newer firmware)
3. **Session limit** — The DVR supports a finite number of concurrent RTSP sessions (typically 10-20). With 8 channels and dual streams, the gateway may open up to 16 concurrent sessions. Stay within limits.
4. **No audio in subtype=1** — Sub streams typically carry no audio; main stream may include audio
5. **Channel numbering** — Channels are 1-indexed (1-8), not 0-indexed
6. **Disk full impact** — Zero bytes free on DVR means: (a) no local recording possible, (b) some DVR functions may be degraded, (c) stream serving typically still works but monitor performance

---

## 3. Edge Gateway Design

### 3.1 Hardware Specifications

**Recommended Platform: Intel NUC 13 Pro (i5-1340P)**

| Component | Specification |
|---|---|
| CPU | Intel Core i5-1340P (12 cores, 16 threads) |
| RAM | 16GB DDR4-3200 |
| Storage | 512GB NVMe SSD (for buffering) |
| LAN | Intel i226-V 2.5GbE |
| USB | 3x USB 3.2 (for expansion) |
| Power | 19V DC, 65W |
| OS | Ubuntu 22.04.4 LTS Server (no GUI) |
| Docker | Docker CE 25.x, Docker Compose 2.x |

**Alternative: NVIDIA Jetson Orin Nano (for on-device AI inference)**

| Component | Specification |
|---|---|
| CPU | 6-core Arm Cortex-A78AE |
| GPU | 1024-core NVIDIA Ampere |
| RAM | 8GB LPDDR5 |
| AI Perf | 40 TOPS |
| Storage | 256GB NVMe via M.2 |
| Power | 7W-15W |
| OS | JetPack 6.0 (Ubuntu 22.04 based) |

**Minimum Viable: Raspberry Pi 5**

| Component | Specification |
|---|---|
| CPU | BCM2712 2.4GHz quad-core |
| RAM | 8GB LPDDR4X |
| Storage | 256GB USB3 NVMe |
| LAN | Gigabit Ethernet |
| Power | 5V 5A USB-C |
| Note | Sufficient for 8x sub-stream ingestion + HLS relay; may struggle with 8x main stream |

### 3.2 Edge Gateway Software Architecture

```
+-------------------------- Edge Gateway (Docker Host) ------------------------+
|                                                                             |
|  +-------------------------+  +-------------------------+  +--------------+ |
|  |   stream-manager        |  |   hls-segmenter         |  |   vpn-client | |
|  |   (Python/FastAPI)      |  |   (FFmpeg/nginx-rtmp)   |  |   (WireGuard)| |
|  |   Port: 8080            |  |   Port: 8081            |  |   Port: 51820| |
|  |                         |  |                         |  |              | |
|  |  - Camera registry      |  |  - HLS manifest gen     |  |  - Persistent| |
|  |  - Stream lifecycle     |  |  - Segment rotation     |  |    tunnel    | |
|  |  - Health monitoring    |  |  - DRM/auth tokens      |  |  - Auto-rec  | |
|  |  - REST API             |  |  - Adaptive bitrate     |  |  - Kill sw   | |
|  +-------------------------+  +-------------------------+  +--------------+ |
|            |                           |                                   |
|  +---------v---------+        +--------v---------+  +-------------------+ |
|  |   frame-extractor |        |   buffer-manager |  |   health-monitor  | |
|  |   (Python/CV2)    |        |   (Python)       |  |   (Python)        | |
|  |                   |        |                  |  |                   | |
|  |  - Keyframe decim.|        |  - Ring buffer   |  |  - DVR ping       | |
|  |  - JPEG/PNG encode|        |  - Upload queue  |  |  - Stream FPS chk | |
|  |  - MQTT publish   |        |  - Retry logic   |  |  - Watchdog       | |
|  |  - Buffer frames  |        |  - Disk mgmt     |  |  - Alert emit     | |
|  +---------+---------+        +--------+---------+  +-------------------+ |
|            |                           |                                   |
|  +---------v---------+                 |                                   |
|  |   mqtt-client     |<----------------+                                   |
|  |   (Eclipse-MQTT)  |                                                     |
|  |   Port: 1883      |                                                     |
|  +-------------------+                                                     |
|                                                                             |
|  Shared Volumes:                                                            |
|    /data/buffer/    - Alert clip ring buffer (20GB max)                    |
|    /data/hls/       - HLS segments for dashboard (5GB max)                 |
|    /data/config/    - YAML/JSON configuration files                        |
|    /data/logs/      - Application logs (rotated, 1GB max)                  |
|    /tmp/ffmpeg/     - Temporary FFmpeg pipes and segments                  |
+-----------------------------------------------------------------------------+
```

### 3.3 Container Specifications

| Container | Image | CPU Limit | Memory | Ports | Purpose |
|---|---|---|---|---|---|
| stream-manager | Custom Python 3.11 slim | 2 cores | 512MB | 8080/tcp | Core orchestration |
| ffmpeg-worker-1 | jrottenberg/ffmpeg:6.1 | 2 cores | 256MB | — | CH1-CH4 RTSP ingestion |
| ffmpeg-worker-2 | jrottenberg/ffmpeg:6.1 | 2 cores | 256MB | — | CH5-CH8 RTSP ingestion |
| hls-segmenter | alpine/nginx + ffmpeg | 1 core | 256MB | 8081/tcp | HLS manifest serving |
| buffer-manager | Custom Python 3.11 slim | 1 core | 256MB | — | Clip storage & upload |
| mqtt-client | eclipse-mosquitto:2.0 | 0.5 core | 128MB | 1883/tcp | Local MQTT broker |
| vpn-client | lscr.io/linuxserver/wireguard | 0.5 core | 128MB | — | VPN tunnel |
| health-monitor | Custom Python 3.11 slim | 0.5 core | 128MB | — | System watchdog |
| prometheus | prom/prometheus | 0.5 core | 256MB | 9090/tcp | Metrics collection |
| node-exporter | prom/node-exporter | 0.25 core | 64MB | 9100/tcp | Host metrics |

### 3.4 VPN Tunnel Specification (WireGuard)

```ini
# /etc/wireguard/wg0.conf (Edge Gateway)
[Interface]
PrivateKey = {EDGE_PRIVATE_KEY}
Address = 10.100.0.10/24
DNS = 10.100.0.1
MTU = 1400

# Persistent keepalive for NAT traversal
PersistentKeepalive = 25

# Post-up rules: mark packets for routing
PostUp = iptables -t mangle -A OUTPUT -p tcp --dport 554 -j DSCP --set-dscp-class EF
PostUp = iptables -t nat -A POSTROUTING -o wg0 -j MASQUERADE

# Post-down: cleanup
PostDown = iptables -t mangle -D OUTPUT -p tcp --dport 554 -j DSCP --set-dscp-class EF || true
PostDown = iptables -t nat -D POSTROUTING -o wg0 -j MASQUERADE || true

[Peer]
PublicKey = {CLOUD_PUBLIC_KEY}
PresharedKey = {PSK}
AllowedIPs = 10.100.0.0/24
Endpoint = cloud-gateway.example.com:51820
PersistentKeepalive = 25
```

**Cloud-side peer configuration:**
```ini
# Cloud VPN Server
[Peer]
PublicKey = {EDGE_PUBLIC_KEY}
PresharedKey = {PSK}
AllowedIPs = 10.100.0.10/32
```

**VPN Routing Table:**
| Destination | Gateway | Interface | Notes |
|---|---|---|---|
| 192.168.29.0/24 | 192.168.29.1 | eth0 | Local DVR network |
| 10.100.0.0/24 | — | wg0 | VPN tunnel network |
| 0.0.0.0/0 | 192.168.29.1 | eth0 | Default route |
| 10.100.0.1 | — | wg0 | Cloud relay (via tunnel) |

### 3.5 Edge Gateway Environment Variables

```bash
# Core
GATEWAY_ID=edge-ind-01
GATEWAY_LOCATION="Plant Floor A"
LOG_LEVEL=INFO

# DVR Connection
DVR_HOST=192.168.29.200
DVR_RTSP_PORT=554
DVR_HTTP_PORT=80
DVR_USERNAME=admin
DVR_PASSWORD={SECRET_FROM_ENV_FILE}
DVR_CHANNELS=8

# Streams
STREAM_TRANSPORT=tcp          # tcp or udp
STREAM_PRIMARY_SUBTYPE=0      # main stream
STREAM_SECONDARY_SUBTYPE=1    # sub stream
STREAM_BUFFER_SIZE=65536      # RTSP buffer
STREAM_RECONNECT_MAX_DELAY=30 # seconds
STREAM_STALL_TIMEOUT=10       # seconds without frame = stalled

# AI Pipeline
AI_INFERENCE_FPS=5            # extract 5 fps for AI
AI_KEYFRAME_ONLY=true         # only I-frames for analysis
AI_RESOLUTION=640x480         # resize for inference
AI_MQTT_TOPIC=ai/frames       # MQTT topic prefix
AI_CONFIDENCE_THRESHOLD=0.6

# HLS
HLS_SEGMENT_DURATION=2        # seconds per segment
HLS_WINDOW_SIZE=5             # segments in playlist
HLS_OUTPUT_DIR=/data/hls
HLS_CLEANUP_INTERVAL=60       # seconds

# Buffer/Storage
BUFFER_MAX_SIZE_GB=20         # ring buffer max size
BUFFER_RETENTION_HOURS=72     # max age before deletion
UPLOAD_ENDPOINT=https://10.100.0.1/api/upload
UPLOAD_MAX_RETRIES=5
UPLOAD_RETRY_BASE_DELAY=5     # seconds
UPLOAD_CHUNK_SIZE_MB=10

# Monitoring
HEALTH_CHECK_INTERVAL=30      # seconds
METRICS_PORT=9090
WEB_UI_PORT=8080

# VPN
VPN_ENABLED=true
VPN_TUNNEL_IP=10.100.0.10
CLOUD_RELAY_IP=10.100.0.1
```



---

## 4. Stream Discovery & Management

### 4.1 ONVIF-Based Discovery

CP PLUS DVR ONVIF server V19.06 (V2.6.1.867657) supports:
- Device Management Service (core)
- Media Service (stream URI retrieval)
- PTZ Service (if applicable)
- Event Service (basic)

**ONVIF Endpoints:**
```
Device Service:    http://192.168.29.200:80/onvif/device_service
Media Service:     http://192.168.29.200:80/onvif/media_service
Event Service:     http://192.168.29.200:80/onvif/event_service
```

**Discovery workflow:**
```python
# Pseudocode for ONVIF discovery
from onvif import ONVIFCamera

def discover_cameras(dvr_host, dvr_port, username, password):
    camera = ONVIFCamera(dvr_host, dvr_port, username, password)
    
    # Get device info
    devicemgmt = camera.create_devicemgmt_service()
    device_info = devicemgmt.GetDeviceInformation()
    # Returns: Manufacturer, Model, FirmwareVersion, SerialNumber, HardwareId
    
    # Get media profiles (streams)
    media = camera.create_media_service()
    profiles = media.GetProfiles()
    
    for profile in profiles:
        # Get stream URI for each profile
        token = profile.token
        stream_uri = media.GetStreamUri(
            StreamSetup={'Stream': 'RTP_unicast', 'Transport': {'Protocol': 'RTSP'}},
            ProfileToken=token
        )
        # Returns: rtsp://192.168.29.200:554/cam/realmonitor?channel=N&subtype=M
        
        # Get video encoder configuration
        if profile.VideoEncoderConfiguration:
            ve = profile.VideoEncoderConfiguration
            # ve.Resolution, ve.RateControl.BitrateLimit, ve.RateControl.FrameRateLimit
            
        # Get video source configuration
        if profile.VideoSourceConfiguration:
            vs = profile.VideoSourceConfiguration
            # vs.Bounds (resolution), vs.ViewMode
    
    return profiles
```

**Expected ONVIF Response for CP PLUS CP-UVR-0801E1-CV2:**
```xml
<GetDeviceInformationResponse>
    <Manufacturer>CP PLUS</Manufacturer>
    <Model>CP-UVR-0801E1-CV2</Model>
    <FirmwareVersion>V4.001.00AT001.3.0</FirmwareVersion>
    <SerialNumber>{device_serial}</SerialNumber>
    <HardwareId>V1.0</HardwareId>
</GetDeviceInformationResponse>
```

### 4.2 Manual Camera Configuration

When ONVIF is unavailable or for manual override, cameras are defined in YAML:

```yaml
# /data/config/cameras.yaml
cameras:
  - channel: 1
    name: "Gate/Entry"
    enabled: true
    primary_stream:
      url: "rtsp://admin:{PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0"
      codec: "h265"
      resolution: "960x1080"
      fps: 25
    secondary_stream:
      url: "rtsp://admin:{PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1"
      codec: "h264"
      resolution: "704x576"
      fps: 15
    ai_config:
      enabled: true
      use_secondary: true
      inference_fps: 5
      keyframe_only: true
      roi: [0, 0, 960, 1080]
    live_config:
      dashboard_default: "secondary"  # grid view uses sub stream
      fullscreen_uses: "primary"
    recording_config:
      enabled: true
      use_primary: true
      pre_event_seconds: 10
      post_event_seconds: 30

  - channel: 2
    name: "Production Floor A"
    enabled: true
    # ... (same structure)

  # ... CH3 through CH8

settings:
  default_ai_fps: 5
  default_inference_resolution: "640x480"
  stream_stall_timeout: 10
  reconnect_max_delay: 30
  circuit_breaker:
    failure_threshold: 5
    recovery_timeout: 60
```

### 4.3 Stream Puller with FFmpeg — Full Command Reference

#### 4.3.1 Basic Stream Pull (Validation)
```bash
# Channel 1 - Main Stream (validation probe)
ffmpeg -rtsp_transport tcp \
  -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c copy -f null - \
  -v quiet -print_format json \
  -show_format -show_streams \
  2>/dev/null | jq .
```

#### 4.3.2 AI Frame Extraction (Keyframe, Decimated)
```bash
# Extract I-frames only at 5fps for AI inference (sub stream)
ffmpeg -rtsp_transport tcp -stimeout 5000000 -reconnect 1 -reconnect_at_eof 1 \
  -reconnect_streamed 1 -reconnect_delay_max 30 \
  -fflags +genpts+discardcorrupt -flags +low_delay \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1" \
  -vf "select='eq(pict_type,I)',fps=5,scale=640:480" \
  -f image2pipe -vcodec mjpeg -q:v 2 \
  pipe:1
```

#### 4.3.3 HLS Segment Generation (All 8 Channels)
```bash
# Single channel HLS (primary stream)
ffmpeg -rtsp_transport tcp -stimeout 5000000 -reconnect 1 -reconnect_at_eof 1 \
  -reconnect_streamed 1 -reconnect_delay_max 30 \
  -fflags +genpts+discardcorrupt -flags +low_delay \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a aac -b:a 128k \
  -f hls -hls_time 2 -hls_list_size 5 -hls_flags delete_segments+omit_endlist \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8"
```

#### 4.3.4 Simultaneous Multi-Output (Single FFmpeg Instance)
```bash
# Single RTSP input -> HLS + frame extraction + recording (tee pseudo-muxer)
ffmpeg -rtsp_transport tcp -stimeout 5000000 -reconnect 1 -reconnect_at_eof 1 \
  -reconnect_streamed 1 -reconnect_delay_max 30 \
  -fflags +genpts+discardcorrupt -flags +low_delay \
  -probesize 5000000 -analyzeduration 2000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  \
  # Output 1: HLS for live viewing
  -c:v copy -c:a copy \
  -f hls -hls_time 2 -hls_list_size 5 \
  -hls_flags delete_segments+omit_endlist \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8" \
  \
  # Output 2: Transcoded sub-stream for AI (scaled, keyframes)
  -vf "select='eq(pict_type,I)',fps=5,scale=640:480" \
  -f image2pipe -vcodec mjpeg -q:v 2 \
  pipe:1 \
  \
  # Output 3: MP4 recording (on event trigger)
  -c:v copy -c:a copy \
  -f segment -segment_time 60 -segment_format mp4 \
  -reset_timestamps 1 \
  -strftime 1 "/data/buffer/ch1_%Y%m%d_%H%M%S.mp4"
```

#### 4.3.5 UDP Fallback Command (when TCP fails)
```bash
ffmpeg -rtsp_transport udp -buffer_size 65536 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c copy -f hls -hls_time 2 -hls_list_size 5 \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8"
```

#### 4.3.8 Complete 8-Channel Parallel Launch Script
```bash
#!/bin/bash
# /usr/local/bin/start_streams.sh
# Launches all 8 channel streams with individual control

DVR_IP="192.168.29.200"
DVR_USER="admin"
DVR_PASS="${DVR_PASSWORD}"
HLS_DIR="/data/hls"
BUFFER_DIR="/data/buffer"
FFMPEG_OPTS="-rtsp_transport tcp -stimeout 5000000 -reconnect 1 -reconnect_at_eof 1 \
  -reconnect_streamed 1 -reconnect_delay_max 30 -fflags +genpts+discardcorrupt \
  -flags +low_delay -probesize 5000000 -analyzeduration 2000000 -nostdin"

for CH in {1..8}; do
  # Determine stream type: channels 1-4 use main, 5-8 use sub (bandwidth management)
  if [ $CH -le 4 ]; then
    SUBTYPE=0
    QUALITY="high"
  else
    SUBTYPE=1
    QUALITY="medium"
  fi

  RTSP_URL="rtsp://${DVR_USER}:${DVR_PASS}@${DVR_IP}:554/cam/realmonitor?channel=${CH}&subtype=${SUBTYPE}"

  # Launch with process monitoring
  ffmpeg ${FFMPEG_OPTS} \
    -i "${RTSP_URL}" \
    -c:v copy -c:a copy \
    -f hls -hls_time 2 -hls_list_size 5 \
    -hls_flags delete_segments+omit_endlist \
    -hls_segment_filename "${HLS_DIR}/ch${CH}_%03d.ts" \
    "${HLS_DIR}/ch${CH}.m3u8" \
    2>"${BUFFER_DIR}/ch${CH}_ffmpeg.log" &

  echo $! > "/var/run/stream_ch${CH}.pid"
  echo "Started channel ${CH} (${QUALITY}) with PID $!"
done

echo "All 8 channels launched. PIDs stored in /var/run/stream_ch*.pid"
```

### 4.4 Auto-Reconnect Strategy

```python
# Auto-reconnect with exponential backoff
import asyncio
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)

class StreamState(Enum):
    CONNECTING = "connecting"
    ONLINE = "online"
    STALLED = "stalled"
    OFFLINE = "offline"
    CIRCUIT_OPEN = "circuit_open"

@dataclass
class ReconnectConfig:
    initial_delay: float = 1.0
    max_delay: float = 30.0
    multiplier: float = 2.0
    jitter: float = 0.1
    stall_timeout: float = 10.0
    circuit_breaker_threshold: int = 5
    circuit_breaker_recovery: float = 60.0

class StreamConnector:
    """
    Manages a single RTSP stream connection with:
    - Exponential backoff reconnect
    - Frame-level stall detection
    - Circuit breaker for persistent failures
    - Graceful shutdown
    """

    def __init__(self, channel: int, config: ReconnectConfig, ffmpeg_cmd: list):
        self.channel = channel
        self.cfg = config
        self.ffmpeg_cmd = ffmpeg_cmd
        self.state = StreamState.OFFLINE
        self.current_delay = config.initial_delay
        self.consecutive_failures = 0
        self.circuit_open_time: Optional[float] = None
        self.last_frame_time: Optional[float] = None
        self._process: Optional[asyncio.subprocess.Process] = None
        self._task: Optional[asyncio.Task] = None
        self._shutdown = False

    async def start(self):
        self._task = asyncio.create_task(self._connection_loop())
        asyncio.create_task(self._stall_monitor())

    async def _connection_loop(self):
        while not self._shutdown:
            if self.state == StreamState.CIRCUIT_OPEN:
                elapsed = asyncio.get_event_loop().time() - self.circuit_open_time
                if elapsed < self.cfg.circuit_breaker_recovery:
                    await asyncio.sleep(1)
                    continue
                # Circuit breaker recovery
                self.consecutive_failures = 0
                self.state = StreamState.OFFLINE
                logger.info(f"CH{self.channel}: Circuit breaker recovered")

            if self.state == StreamState.OFFLINE:
                await self._attempt_connect()
            elif self.state == StreamState.STALLED:
                await self._handle_stall()

            await asyncio.sleep(0.5)

    async def _attempt_connect(self):
        self.state = StreamState.CONNECTING
        logger.info(f"CH{self.channel}: Connecting (delay={self.current_delay:.1f}s)...")
        await asyncio.sleep(self.current_delay)

        try:
            self._process = await asyncio.create_subprocess_exec(
                *self.ffmpeg_cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )

            # Wait for process to stabilize
            await asyncio.sleep(3)

            if self._process.returncode is not None:
                raise ConnectionError(f"FFmpeg exited with code {self._process.returncode}")

            self.state = StreamState.ONLINE
            self.last_frame_time = asyncio.get_event_loop().time()
            self.current_delay = self.cfg.initial_delay
            self.consecutive_failures = 0
            logger.info(f"CH{self.channel}: Online")

            # Monitor the stream
            await self._monitor_stream()

        except Exception as e:
            self.consecutive_failures += 1
            self.current_delay = min(
                self.current_delay * self.cfg.multiplier,
                self.cfg.max_delay
            )

            if self.consecutive_failures >= self.cfg.circuit_breaker_threshold:
                self.state = StreamState.CIRCUIT_OPEN
                self.circuit_open_time = asyncio.get_event_loop().time()
                logger.error(
                    f"CH{self.channel}: Circuit breaker OPEN after "
                    f"{self.consecutive_failures} failures"
                )
            else:
                self.state = StreamState.OFFLINE
                logger.warning(f"CH{self.channel}: Connection failed ({e}), "
                               f"retry in {self.current_delay:.1f}s")

    async def _monitor_stream(self):
        """Monitor stderr output for frame reception indicators."""
        while self.state == StreamState.ONLINE and self._process:
            try:
                line = await asyncio.wait_for(
                    self._process.stderr.readline(), timeout=5.0
                )
                if line:
                    self.last_frame_time = asyncio.get_event_loop().time()
                    # Parse frame info from FFmpeg output
                    # Example: "frame=  125 fps= 25 q=-1.0 Lsize= ..."
                    decoded = line.decode('utf-8', errors='ignore').strip()
                    if 'frame=' in decoded:
                        self._update_metrics(decoded)
            except asyncio.TimeoutError:
                logger.debug(f"CH{self.channel}: No stderr output (may be normal)")

    async def _stall_monitor(self):
        """Independent task that detects stalls based on frame timestamps."""
        while not self._shutdown:
            await asyncio.sleep(1)
            if self.state == StreamState.ONLINE and self.last_frame_time:
                elapsed = asyncio.get_event_loop().time() - self.last_frame_time
                if elapsed > self.cfg.stall_timeout:
                    logger.warning(
                        f"CH{self.channel}: STALL detected — "
                        f"no frames for {elapsed:.1f}s"
                    )
                    self.state = StreamState.STALLED

    async def _handle_stall(self):
        """Kill the stuck process and reconnect."""
        logger.info(f"CH{self.channel}: Restarting stalled stream")
        await self._kill_process()
        self.state = StreamState.OFFLINE
        self.current_delay = self.cfg.initial_delay

    async def _kill_process(self):
        if self._process and self._process.returncode is None:
            try:
                self._process.terminate()
                await asyncio.wait_for(self._process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                self._process.kill()
                await self._process.wait()
        self._process = None

    def _update_metrics(self, line: str):
        """Parse and publish frame metrics."""
        # Parse: frame=  125 fps= 25 q=-1.0
        metrics = {}
        for part in line.split():
            if '=' in part:
                key, _, val = part.partition('=')
                metrics[key] = val
        if 'fps' in metrics:
            self.current_fps = float(metrics['fps'])

    async def stop(self):
        self._shutdown = True
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        await self._kill_process()

    @property
    def is_healthy(self) -> bool:
        return self.state == StreamState.ONLINE
```

### 4.5 Circuit Breaker Pattern

```python
# circuit_breaker.py
import time
from enum import Enum
from dataclasses import dataclass

class CircuitState(Enum):
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Failing fast
    HALF_OPEN = "half_open" # Testing recovery

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5      # failures before opening
    recovery_timeout: float = 60.0   # seconds before half-open
    half_open_max_calls: int = 3    # test calls in half-open state
    success_threshold: int = 2      # consecutive successes to close

class CircuitBreaker:
    """Circuit breaker for persistent stream failures."""

    def __init__(self, name: str, config: CircuitBreakerConfig):
        self.name = name
        self.cfg = config
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.successes = 0
        self.last_failure_time: Optional[float] = None
        self.half_open_calls = 0

    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.successes += 1
            self.half_open_calls += 1
            if self.successes >= self.cfg.success_threshold:
                self._close_circuit()
        elif self.state == CircuitState.CLOSED:
            self.failures = max(0, self.failures - 1)

    def record_failure(self) -> bool:
        self.failures += 1
        self.last_failure_time = time.time()

        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.cfg.half_open_max_calls:
                self._open_circuit()
            return False

        if self.failures >= self.cfg.failure_threshold:
            self._open_circuit()
            return False
        return True  # Still allowed

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.cfg.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                self.successes = 0
                return True
            return False
        if self.state == CircuitState.HALF_OPEN:
            return self.half_open_calls < self.cfg.half_open_max_calls
        return False

    def _open_circuit(self):
        self.state = CircuitState.OPEN
        self.failures = self.cfg.failure_threshold
        # Alert published here

    def _close_circuit(self):
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.successes = 0
        self.half_open_calls = 0
```

### 4.6 Per-Camera Status Model

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class CameraStatus:
    """Real-time status for a single camera stream."""
    channel: int
    name: str
    enabled: bool = True
    state: str = "OFFLINE"           # OFFLINE, CONNECTING, ONLINE, STALLED, ERROR
    stream_type: str = "secondary"   # primary or secondary currently active
    current_fps: float = 0.0
    target_fps: float = 25.0
    current_bitrate_kbps: float = 0.0
    resolution: str = "unknown"
    codec: str = "unknown"
    last_frame_time: Optional[datetime] = None
    connection_time: Optional[datetime] = None
    disconnect_count: int = 0
    total_bytes_received: int = 0
    latency_ms: float = 0.0
    keyframe_interval: float = 0.0
    circuit_breaker_state: str = "CLOSED"
    error_message: Optional[str] = None
    ai_fps_actual: float = 0.0
    ai_frames_processed: int = 0
    hls_segments_available: int = 0
    recording_active: bool = False
    tags: dict = field(default_factory=dict)

    @property
    def is_healthy(self) -> bool:
        return self.state == "ONLINE" and self.current_fps > 0

    @property
    def uptime_seconds(self) -> float:
        if self.connection_time:
            return (datetime.utcnow() - self.connection_time).total_seconds()
        return 0.0

    def to_dict(self) -> dict:
        return {
            "channel": self.channel,
            "name": self.name,
            "enabled": self.enabled,
            "state": self.state,
            "stream_type": self.stream_type,
            "fps": {
                "current": round(self.current_fps, 1),
                "target": self.target_fps,
            },
            "bitrate_kbps": round(self.current_bitrate_kbps, 1),
            "resolution": self.resolution,
            "codec": self.codec,
            "last_frame_time": self.last_frame_time.isoformat() if self.last_frame_time else None,
            "uptime_seconds": self.uptime_seconds,
            "disconnect_count": self.disconnect_count,
            "circuit_breaker": self.circuit_breaker_state,
            "ai_fps_actual": round(self.ai_fps_actual, 1),
            "hls_ready": self.hls_segments_available > 0,
            "healthy": self.is_healthy,
        }
```



---

## 5. Stream Processing Pipeline

### 5.1 Pipeline Architecture

```
+----------+    +-------------+    +------------------+    +---------------+
|  RTSP    | -> |   FFmpeg    | -> |  Frame Router    | -> |  Multiple     |
|  Source  |    |   Demuxer   |    |  & Tee           |    |  Consumers    |
+----------+    +-------------+    +------------------+    +---------------+
                                           |
                    +----------------------+----------------------+
                    |                      |                      |
            +-------v-------+    +---------v---------+    +-------v--------+
            | HLS Segmenter |    | Frame Extractor   |    | Clip Recorder  |
            | (live view)   |    | (AI inference)    |    | (events)       |
            +---------------+    +---------+---------+    +----------------+
                                          |
                              +-----------+-----------+
                              |                       |
                    +---------v---------+    +--------v---------+
                    | Keyframe Decoder  |    | JPEG Encoder     |
                    | (I-frame select)  |    | (AI input)       |
                    +---------+---------+    +--------+---------+
                              |                       |
                    +---------v---------+    +--------v---------+
                    | Frame Buffer      |    | MQTT Publisher   |
                    | (ring queue)      |    | (metadata)       |
                    +-------------------+    +------------------+
```

### 5.2 Frame Extraction for AI Inference

The AI inference pipeline extracts frames from the sub-stream (lower bandwidth) at a configurable decimated rate. Keyframes (I-frames) are preferred to reduce decode overhead.

#### 5.2.1 Keyframe-Based Extraction Strategy

| Mode | Description | Use Case |
|---|---|---|
| `keyframe_only` | Extract only I-frames | Maximum efficiency, may skip motion |
| `fixed_fps` | Extract at fixed FPS regardless of frame type | Smooth temporal analysis |
| `adaptive` | I-frames + P-frames when motion detected | Balanced quality and efficiency |
| `hybrid` | Fixed FPS with I-frame priority | Best overall |

**Recommended: `hybrid` mode — 5 FPS extraction with I-frame priority**

```python
# frame_extractor.py — Frame extraction pipeline for AI inference
import asyncio
import cv2
import numpy as np
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Callable, AsyncIterator
import subprocess
import io
import hashlib

logger = logging.getLogger(__name__)

@dataclass
class FrameConfig:
    target_fps: float = 5.0
    mode: str = "hybrid"          # keyframe_only, fixed_fps, adaptive, hybrid
    resolution: tuple = (640, 480)
    jpeg_quality: int = 85
    use_keyframes: bool = True
    motion_threshold: float = 0.02  # for adaptive mode

@dataclass
class ExtractedFrame:
    channel: int
    timestamp: datetime
    frame_number: int
    is_keyframe: bool
    raw_bytes: bytes
    width: int
    height: int
    motion_score: float = 0.0
    jpeg_bytes: Optional[bytes] = None

class FrameExtractor:
    """
    Extracts decimated frames from RTSP stream for AI inference.
    Supports multiple extraction modes with configurable FPS.
    """

    def __init__(self, channel: int, config: FrameConfig):
        self.channel = channel
        self.cfg = config
        self._callback: Optional[Callable[[ExtractedFrame], None]] = None
        self._running = False
        self._frame_count = 0
        self._last_extract_time = 0.0
        self._extract_interval = 1.0 / config.target_fps
        self._prev_gray: Optional[np.ndarray] = None

    def on_frame(self, callback: Callable[[ExtractedFrame], None]):
        """Register callback for extracted frames."""
        self._callback = callback

    async def start(self, rtsp_url: str):
        """Start frame extraction from RTSP stream."""
        self._running = True
        ffmpeg_cmd = self._build_ffmpeg_cmd(rtsp_url)

        process = await asyncio.create_subprocess_exec(
            *ffmpeg_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )

        try:
            await self._read_frames(process)
        finally:
            self._running = False
            if process.returncode is None:
                process.kill()
                await process.wait()

    def _build_ffmpeg_cmd(self, rtsp_url: str) -> list:
        """Build FFmpeg command for frame extraction."""
        w, h = self.cfg.resolution

        # Base filters
        filters = f"fps={self.cfg.target_fps},scale={w}:{h}"

        if self.cfg.use_keyframes:
            if self.cfg.mode == "keyframe_only":
                filters = f"select='eq(pict_type,I)',{filters}"
            elif self.cfg.mode == "hybrid":
                # I-frame priority with fixed FPS fallback
                filters = f"select='eq(pict_type,I)+gte(n,prev_selected_n+{int(25/self.cfg.target_fps)})',{filters}"

        cmd = [
            "ffmpeg",
            "-rtsp_transport", "tcp",
            "-stimeout", "5000000",
            "-reconnect", "1",
            "-reconnect_at_eof", "1",
            "-reconnect_streamed", "1",
            "-reconnect_delay_max", "30",
            "-fflags", "+genpts+discardcorrupt",
            "-flags", "+low_delay",
            "-i", rtsp_url,
            "-vf", filters,
            "-f", "image2pipe",
            "-vcodec", "mjpeg",
            "-q:v", str(int((100 - self.cfg.jpeg_quality) / 10)),
            "-"
        ]
        return cmd

    async def _read_frames(self, process: asyncio.subprocess.Process):
        """Read MJPEG frames from FFmpeg pipe."""
        buffer = b""
        jpeg_start = b'\xff\xd8'  # JPEG SOI marker
        jpeg_end = b'\xff\xd9'    # JPEG EOI marker

        while self._running:
            chunk = await process.stdout.read(65536)
            if not chunk:
                break

            buffer += chunk

            # Find complete JPEG frames
            start_idx = buffer.find(jpeg_start)
            while start_idx != -1:
                end_idx = buffer.find(jpeg_end, start_idx)
                if end_idx == -1:
                    break

                jpeg_data = buffer[start_idx:end_idx + 2]
                buffer = buffer[end_idx + 2:]

                await self._process_jpeg(jpeg_data)
                start_idx = buffer.find(jpeg_start)

            # Prevent buffer bloat
            if len(buffer) > 10 * 1024 * 1024:
                buffer = buffer[-5 * 1024 * 1024:]

    async def _process_jpeg(self, jpeg_data: bytes):
        """Process extracted JPEG frame."""
        self._frame_count += 1

        # Decode for metadata and motion detection
        nparr = np.frombuffer(jpeg_data, np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

        if frame is None:
            return

        h, w = frame.shape[:2]
        is_keyframe = self._frame_count % int(25 / self.cfg.target_fps) == 1

        # Motion detection (for adaptive mode)
        motion_score = 0.0
        if self.cfg.mode in ("adaptive", "hybrid") and self._prev_gray is not None:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            resized = cv2.resize(gray, (w // 4, h // 4))
            flow = cv2.calcOpticalFlowFarneback(
                self._prev_gray, resized, None,
                0.5, 3, 15, 3, 5, 1.2, 0
            )
            motion_score = np.mean(np.abs(flow))
            self._prev_gray = resized
        elif self.cfg.mode in ("adaptive", "hybrid"):
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            self._prev_gray = cv2.resize(gray, (w // 4, h // 4))

        extracted = ExtractedFrame(
            channel=self.channel,
            timestamp=datetime.utcnow(),
            frame_number=self._frame_count,
            is_keyframe=is_keyframe,
            raw_bytes=frame.tobytes(),
            width=w,
            height=h,
            motion_score=float(motion_score),
            jpeg_bytes=jpeg_data,
        )

        if self._callback:
            try:
                if asyncio.iscoroutinefunction(self._callback):
                    await self._callback(extracted)
                else:
                    self._callback(extracted)
            except Exception as e:
                logger.error(f"Frame callback error: {e}")

    async def stop(self):
        self._running = False


class MultiChannelExtractor:
    """Manages frame extraction across all 8 channels."""

    def __init__(self, max_concurrent: int = 8):
        self.extractors: dict[int, FrameExtractor] = {}
        self.tasks: dict[int, asyncio.Task] = {}
        self.max_concurrent = max_concurrent
        self._frame_queue: asyncio.Queue[ExtractedFrame] = asyncio.Queue(maxsize=100)

    async def add_channel(self, channel: int, rtsp_url: str, config: FrameConfig):
        extractor = FrameExtractor(channel, config)
        extractor.on_frame(self._on_frame)
        self.extractors[channel] = extractor
        self.tasks[channel] = asyncio.create_task(
            extractor.start(rtsp_url),
            name=f"extractor-ch{channel}"
        )

    async def _on_frame(self, frame: ExtractedFrame):
        try:
            self._frame_queue.put_nowait(frame)
        except asyncio.QueueFull:
            # Drop oldest frame to prevent blocking
            try:
                self._frame_queue.get_nowait()
                self._frame_queue.put_nowait(frame)
            except asyncio.QueueEmpty:
                pass

    async def frame_consumer(self, handler: Callable[[ExtractedFrame], None]):
        """Consume frames from the queue."""
        while True:
            frame = await self._frame_queue.get()
            try:
                if asyncio.iscoroutinefunction(handler):
                    await handler(frame)
                else:
                    handler(frame)
            except Exception as e:
                logger.error(f"Frame handler error: {e}")
            finally:
                self._frame_queue.task_done()

    async def stop_all(self):
        for extractor in self.extractors.values():
            await extractor.stop()
        for task in self.tasks.values():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
```

### 5.3 Simultaneous Stream Forking (Live + AI)

For each channel, the gateway must simultaneously provide:
1. **HLS live stream** — for dashboard viewing (full quality)
2. **AI frame stream** — decimated frames for inference
3. **Alert clip stream** — triggered recordings

**Approach: Single FFmpeg → Multi-target output using `tee` pseudo-muxer**

```bash
# Advanced: Single RTSP input, multiple outputs with tee muxer
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -fflags +genpts+discardcorrupt -flags +low_delay \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  \
  # Split into two video streams using split filter
  -filter_complex "[0:v]split=2[live][ai];[ai]fps=5,scale=640:480[aiout]" \
  \
  # Output 1: HLS live stream (copy codec, no re-encode)
  -map "[live]" -map 0:a? -c:v copy -c:a copy \
  -f hls -hls_time 2 -hls_list_size 5 \
  -hls_flags delete_segments+omit_endlist+program_date_time \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8" \
  \
  # Output 2: AI frame stream (MJPEG to pipe)
  -map "[aiout]" -an \
  -f image2pipe -vcodec mjpeg -q:v 3 \
  pipe:1 \
  \
  # Output 3: Continuous recording (segmented MP4)
  -map 0:v -map 0:a? -c:v copy -c:a copy \
  -f segment -segment_time 60 -segment_format mp4 \
  -reset_timestamps 1 -strftime 1 \
  "/data/buffer/ch1_continuous/%Y%m%d_%H%M%S.mp4"
```

### 5.4 Resolution Adaptation

The gateway supports dynamic resolution adaptation based on network conditions:

| Scenario | Action | Trigger |
|---|---|---|
| Bandwidth sufficient | Full resolution (960x1080) | VPN throughput > 16 Mbps |
| Bandwidth constrained | Downscale to 640x720 | VPN throughput < 8 Mbps |
| Severe constraint | Switch to sub-stream (352x288) | VPN throughput < 4 Mbps |
| Recovery detected | Gradually upscale | Sustained > threshold for 30s |

```python
class AdaptiveBitrateController:
    """
    Monitors VPN tunnel bandwidth and adjusts stream quality.
    Uses EWMA (exponentially weighted moving average) for smoothing.
    """

    QUALITY_LEVELS = {
        "full": {"resolution": (960, 1080), "stream": "primary"},
        "medium": {"resolution": (640, 720), "stream": "primary"},
        "low": {"resolution": (352, 288), "stream": "secondary"},
    }

    def __init__(self, min_bandwidth_mbps: float = 4.0):
        self.ewma_alpha = 0.3
        self.current_bandwidth = 100.0  # Start optimistic
        self.min_bandwidth = min_bandwidth_mbps
        self.current_quality = "full"
        self._history: list[float] = []

    def update_bandwidth(self, measured_mbps: float):
        self.current_bandwidth = (
            self.ewma_alpha * measured_mbps +
            (1 - self.ewma_alpha) * self.current_bandwidth
        )
        self._history.append(self.current_bandwidth)
        if len(self._history) > 100:
            self._history = self._history[-50:]

    def get_quality(self) -> str:
        bw = self.current_bandwidth
        if bw > 16:
            target = "full"
        elif bw > 8:
            target = "medium"
        else:
            target = "low"

        # Hysteresis: only downgrade immediately, upgrade with delay
        if self.QUALITY_LEVELS[target]["resolution"][0] < \
           self.QUALITY_LEVELS[self.current_quality]["resolution"][0]:
            # Downgrade immediately
            self.current_quality = target
        elif target != self.current_quality:
            # Check sustained improvement before upgrading
            if len(self._history) >= 10 and \
               all(b > self.QUALITY_LEVELS[target]["resolution"][0] / 60
                   for b in self._history[-10:]):
                self.current_quality = target

        return self.current_quality
```

### 5.5 Bandwidth Optimization for Cloud Upload

```
+------------------+    +-------------------+    +------------------+
|  Raw RTSP Stream | -> |  Edge Transcoder  | -> |  Cloud-Optimized |
|  (H.265, 4 Mbps) |    |  (H.264, 1 Mbps)  |    |  Upload Stream   |
+------------------+    +-------------------+    +------------------+
                              |
                    +---------v---------+
                    | Optimization:     |
                    | - H.265→H.264     |
                    | - CRF 28 (quality)|
                    | - 2-pass VBR      |
                    | - De-duplication  |
                    | - Delta frames    |
                    +-------------------+
```

**FFmpeg Cloud Upload Encoding:**
```bash
# Transcode H.265 main stream to H.264 for cloud compatibility
ffmpeg -rtsp_transport tcp -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v libx264 -preset fast -tune zerolatency \
  -crf 28 -maxrate 1500k -bufsize 3000k \
  -g 50 -keyint_min 25 \
  -profile:v main -level 4.1 \
  -movflags +faststart \
  -c:a aac -b:a 64k -ac 1 \
  -f mp4 pipe:1 | curl -X POST -H "Content-Type: video/mp4" \
  --data-binary @- https://10.100.0.1/api/upload/ch1
```

**Upload Optimization Techniques:**

| Technique | Bandwidth Savings | Implementation |
|---|---|---|
| H.265→H.264 with CRF 28 | 40-60% | FFmpeg libx264 |
| Frame deduplication (static scenes) | 10-30% | Skip identical frames |
| Adaptive GOP | 5-15% | Dynamic I-frame interval |
| Resolution scaling | 30-70% | Scale to 720p or 480p |
| Audio mono + low bitrate | 5% | AAC 64kbps mono |
| Upload compression (gzip) | 5-10% | HTTP Content-Encoding |

---

## 6. Live Streaming for Dashboard

### 6.1 HLS Stream Generation

HLS (HTTP Live Streaming) is the primary delivery mechanism for the dashboard due to its wide browser support and adaptive bitrate capabilities.

#### 6.1.1 HLS Playlist Format (Generated per Channel)

```m3u8
# /data/hls/ch1.m3u8
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:1245
#EXT-X-DISCONTINUITY-SEQUENCE:0
#EXTINF:2.000,
ch1_245.ts
#EXTINF:2.000,
ch1_246.ts
#EXTINF:2.000,
ch1_247.ts
#EXTINF:2.000,
ch1_248.ts
#EXTINF:2.000,
ch1_249.ts
```

#### 6.1.2 Multi-Quality HLS (Master Playlist)

For adaptive streaming, generate multiple quality variants:

```m3u8
# /data/hls/ch1_master.m3u8
#EXTM3U
#EXT-X-STREAM-INF:BANDWIDTH=4000000,RESOLUTION=960x1080,CODECS="avc1.640020,mp4a.40.2"
ch1_high.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=1500000,RESOLUTION=640x720,CODECS="avc1.4d001f,mp4a.40.2"
ch1_mid.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=500000,RESOLUTION=352x288,CODECS="avc1.42001e,mp4a.40.2"
ch1_low.m3u8
```

#### 6.1.3 FFmpeg HLS Generation Commands

```bash
# High quality variant (primary stream)
ffmpeg -rtsp_transport tcp -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v libx264 -preset ultrafast -tune zerolatency -profile:v high -level 4.1 \
  -b:v 3000k -maxrate 4000k -bufsize 6000k -g 50 -keyint_min 25 \
  -c:a aac -b:a 128k -ar 44100 -ac 2 \
  -f hls -hls_time 2 -hls_list_size 5 -hls_flags delete_segments+omit_endlist \
  -hls_segment_type mpegts -hls_segment_filename "/data/hls/ch1_high_%03d.ts" \
  "/data/hls/ch1_high.m3u8" \
  \
# Mid quality variant (transcoded)
  -c:v libx264 -preset ultrafast -tune zerolatency -profile:v main -level 3.1 \
  -vf "scale=640:720" -b:v 1000k -maxrate 1500k -bufsize 2000k -g 50 \
  -c:a aac -b:a 96k -ar 44100 -ac 2 \
  -f hls -hls_time 2 -hls_list_size 5 -hls_flags delete_segments+omit_endlist \
  -hls_segment_filename "/data/hls/ch1_mid_%03d.ts" \
  "/data/hls/ch1_mid.m3u8"
```

### 6.2 WebSocket + WebRTC for Low-Latency Preview

For sub-500ms latency preview (single camera fullscreen), WebRTC is used:

```
+-------------+    +------------------+    +-----------------+
|  Browser    | -> |  Edge Gateway    | -> |  CP PLUS DVR    |
| (WebRTC)    |    |  (WHIP Bridge)   |    |  (RTSP Source)  |
|             |<-   |                  |<-   |                 |
+-------------+    +------------------+    +-----------------+

Latency breakdown:
  RTSP ingest:    ~200-500ms
  FFmpeg decode:  ~50-100ms
  WebRTC encode:  ~30-80ms
  Network (LAN):  ~1-5ms
  Browser decode: ~20-50ms
  TOTAL:          ~300-750ms
```

**WebRTC Bridge (Python/aiortc):**
```python
# webrtc_bridge.py — Low-latency WebRTC bridge for single-camera preview
import asyncio
import logging
from aiortc import RTCPeerConnection, VideoStreamTrack
from aiortc.contrib.signaling import object_from_string, object_to_string
from av import VideoFrame
import cv2
import numpy as np

logger = logging.getLogger(__name__)

class RTSPVideoStreamTrack(VideoStreamTrack):
    """Reads RTSP via FFmpeg, outputs WebRTC VideoFrame."""

    def __init__(self, rtsp_url: str):
        super().__init__()
        self.rtsp_url = rtsp_url
        self._ffmpeg = None
        self._task = None
        self._frame_queue = asyncio.Queue(maxsize=2)  # Minimize latency

    async def start(self):
        cmd = [
            "ffmpeg", "-rtsp_transport", "tcp", "-stimeout", "5000000",
            "-reconnect", "1", "-reconnect_at_eof", "1",
            "-reconnect_streamed", "1", "-reconnect_delay_max", "30",
            "-fflags", "+genpts+discardcorrupt", "-flags", "+low_delay",
            "-i", self.rtsp_url,
            "-vf", "scale=640:480,format=yuv420p",
            "-c:v", "rawvideo", "-pix_fmt", "yuv420p",
            "-f", "rawvideo", "-"
        ]
        self._ffmpeg = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        self._task = asyncio.create_task(self._read_frames())

    async def _read_frames(self):
        width, height = 640, 480
        frame_size = width * height * 3 // 2  # YUV420P
        while True:
            raw = await self._ffmpeg.stdout.read(frame_size)
            if len(raw) < frame_size:
                break
            frame = VideoFrame.from_bytes(raw, format="yuv420p", width=width, height=height)
            try:
                self._frame_queue.put_nowait(frame)
            except asyncio.QueueFull:
                # Drop oldest to minimize latency
                self._frame_queue.get_nowait()
                self._frame_queue.put_nowait(frame)

    async def recv(self):
        frame = await self._frame_queue.get()
        pts, time_base = await self.next_timestamp()
        frame.pts = pts
        frame.time_base = time_base
        return frame

    async def stop(self):
        if self._task:
            self._task.cancel()
        if self._ffmpeg:
            self._ffmpeg.kill()
            await self._ffmpeg.wait()


class WebRTCBridge:
    """Manages WebRTC peer connections for low-latency camera preview."""

    def __init__(self, rtsp_url: str):
        self.rtsp_url = rtsp_url
        self.pc = None
        self.track = None

    async def create_offer(self):
        self.pc = RTCPeerConnection()
        self.track = RTSPVideoStreamTrack(self.rtsp_url)
        await self.track.start()
        self.pc.addTrack(self.track)

        offer = await self.pc.createOffer()
        await self.pc.setLocalDescription(offer)
        return {"sdp": self.pc.localDescription.sdp, "type": self.pc.localDescription.type}

    async def handle_answer(self, answer_sdp: str):
        answer = object_from_string(json.dumps({"sdp": answer_sdp, "type": "answer"}))
        await self.pc.setRemoteDescription(answer)

    async def close(self):
        if self.track:
            await self.track.stop()
        if self.pc:
            await self.pc.close()
```

### 6.3 Multi-Camera Grid Layout

The dashboard supports multiple grid layouts with automatic stream selection:

| Layout | Cameras Visible | Stream Used | Resolution | Per-Stream BW |
|---|---|---|---|---|
| 1x1 (fullscreen) | 1 | Primary (subtype=0) | 960x1080 | ~3 Mbps |
| 2x2 | 4 | Secondary (subtype=1) | 352x288 | ~0.5 Mbps each |
| 3x3 | 8+1 empty | Secondary (subtype=1) | 352x288 | ~0.5 Mbps each |
| 4x2 | 8 | Secondary (subtype=1) | 352x288 | ~0.5 Mbps each |
| Custom | User-defined | Mixed | Mixed | Sum of selected |

**Grid Switching Logic:**
```python
def get_stream_for_layout(channel: int, layout: str, is_focused: bool) -> str:
    """Determine which RTSP stream to use based on layout context."""
    if layout == "1x1" or is_focused:
        # Fullscreen — use main stream for quality
        return f"rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel={channel}&subtype=0"
    else:
        # Grid view — use sub stream for bandwidth efficiency
        return f"rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel={channel}&subtype=1"
```

### 6.4 Stream Relay Through Cloud

```
[Edge Gateway HLS] --VPN--> [Cloud Nginx] --> [CDN / Dashboard Users]
                              (10.100.0.1:443)
                              
Nginx Configuration:
- Reverse proxy to edge gateway HLS endpoint
- Token-based authentication (JWT)
- Rate limiting per client
- HTTPS termination
- HLS caching (5-second segments)
```

**Nginx HLS Relay Configuration:**
```nginx
# /etc/nginx/conf.d/hls-relay.conf
upstream edge_gateway_hls {
    server 10.100.0.10:8081;  # Via WireGuard tunnel
    keepalive 32;
}

server {
    listen 443 ssl http2;
    server_name streams.example.com;

    ssl_certificate /etc/ssl/certs/stream.crt;
    ssl_certificate_key /etc/ssl/private/stream.key;

    # HLS endpoint authentication
    location /hls/ {
        # JWT validation
        auth_jwt "HLS Stream";
        auth_jwt_key_file /etc/nginx/jwt_key.pub;

        # Rate limiting
        limit_req zone=hls_limit burst=10 nodelay;

        proxy_pass http://edge_gateway_hls/hls/;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_buffering off;
        proxy_cache off;

        # CORS for web dashboard
        add_header Access-Control-Allow-Origin "https://dashboard.example.com" always;
        add_header Access-Control-Allow-Methods "GET, OPTIONS" always;

        # HLS-specific headers
        add_header Cache-Control "no-cache" always;
        add_header Access-Control-Expose-Headers "Content-Length" always;

        # Segment caching (brief)
        location ~* \.ts$ {
            proxy_pass http://edge_gateway_hls;
            proxy_cache hls_cache;
            proxy_cache_valid 200 10s;
            expires 10s;
        }
    }

    # WebSocket for real-time status
    location /ws/ {
        proxy_pass http://edge_gateway_hls/ws/;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
```

### 6.5 Fallback Handling

When a camera stream fails, the following fallback chain is executed:

```
1. RTSP connection fails
   ├──> Retry with exponential backoff (3 attempts)
   ├──> Attempt UDP transport if TCP fails
   ├──> Circuit breaker opens after 5 consecutive failures
   │
2. Stream stall detected (no frames for 10s)
   ├──> Kill FFmpeg process
   ├──> Restart stream with fresh connection
   │
3. Camera marked OFFLINE
   ├──> Dashboard shows "Camera Offline" placeholder
   ├──> HLS playlist returns 404
   ├──> Last known frame displayed with timestamp overlay
   ├──> Alert sent to operations team
   │
4. Camera recovers
   ├──> Circuit breaker transitions to HALF_OPEN
   ├──> Test stream pulled for 10 seconds
   ├──> On success: circuit CLOSED, stream resumes
   ├──> Dashboard auto-refreshes
```

**HLS Error Response (Camera Offline):**
```m3u8
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-ERROR: "Camera OFFLINE - Channel 1"
#EXTINF:2.000,
offline_placeholder.ts
```



---

## 7. Edge Buffering & Local Storage

### 7.1 Storage Architecture

The edge gateway uses a tiered storage system optimized for limited SSD capacity:

```
+-------------------------------------------------------------------+
|                     Edge Gateway Storage (512GB NVMe)              |
|                                                                    |
|  +---------------------+  +---------------------+  +-----------+  |
|  | Ring Buffer (20GB)  |  | HLS Cache (5GB)     |  | Logs (1GB)|  |
|  | /data/buffer/       |  | /data/hls/          |  |/data/logs/|  |
|  |                     |  |                     |  |           |  |
|  | Alert clips (MP4)   |  | HLS segments (.ts)  |  | Rotated   |  |
|  | Pre-event buffer    |  | Playlists (.m3u8)   |  | daily     |  |
|  | Upload queue        |  | 5-segment window    |  |           |  |
|  | Retry staging       |  | Auto-cleanup        |  |           |  |
|  +----------+----------+  +---------------------+  +-----------+  |
|             |                                                      |
|  +----------v----------+                                           |
|  | Upload Manager      | ---> VPN ---> Cloud S3/MinIO             |
|  | - Parallel uploads  |      (10.100.0.1:9000)                   |
|  | - Retry with backoff|                                           |
|  | - Integrity verify  |                                           |
|  +---------------------+                                           |
+-------------------------------------------------------------------+
```

**Storage Quotas:**

| Directory | Max Size | Max Age | Cleanup Strategy |
|---|---|---|---|
| `/data/buffer/alert_clips/` | 15 GB | 72 hours | LRU + age-based |
| `/data/buffer/pre_event/` | 3 GB | 24 hours | Ring buffer (FIFO) |
| `/data/buffer/upload_queue/` | 2 GB | 48 hours | On successful upload |
| `/data/hls/` | 5 GB | 10 seconds | Immediate (segment rotation) |
| `/data/logs/` | 1 GB | 30 days | Logrotate daily |
| `/tmp/ffmpeg/` | 2 GB | On process exit | Cleanup on restart |
| **Total Reserved** | **~28 GB** | — | — |

### 7.2 Ring Buffer for Alert Clips

The ring buffer stores video clips triggered by AI detection events (motion, intrusion, safety violation, etc.).

#### 7.2.1 Ring Buffer Design

```python
# ring_buffer.py — Alert clip ring buffer with disk management
import os
import time
import json
import asyncio
import logging
import shutil
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, List
import aiofiles
import hashlib

logger = logging.getLogger(__name__)

@dataclass
class ClipMetadata:
    clip_id: str
    channel: int
    camera_name: str
    event_type: str          # motion, intrusion, safety_violation, etc.
    event_confidence: float
    start_time: datetime     # clip start (includes pre-event)
    event_time: datetime     # actual detection time
    end_time: datetime       # clip end (includes post-event)
    duration_seconds: float
    file_path: str
    file_size_bytes: int
    checksum_sha256: str
    upload_status: str       # pending, uploading, uploaded, failed
    upload_attempts: int = 0
    resolution: str = "960x1080"
    fps: int = 25
    tags: List[str] = None

    def __post_init__(self):
        if self.tags is None:
            self.tags = []

class RingBuffer:
    """
    Fixed-size ring buffer for alert clips.
    When full, oldest clips are deleted to make room.
    """

    def __init__(self, base_path: str, max_size_bytes: int, max_age_hours: int):
        self.base_path = Path(base_path)
        self.max_size = max_size_bytes
        self.max_age = timedelta(hours=max_age_hours)
        self.metadata_file = self.base_path / "index.jsonl"
        self._clips: List[ClipMetadata] = []
        self._current_size = 0
        self._lock = asyncio.Lock()

    async def initialize(self):
        """Load existing clips from disk and calculate current usage."""
        self.base_path.mkdir(parents=True, exist_ok=True)

        if self.metadata_file.exists():
            async with aiofiles.open(self.metadata_file, 'r') as f:
                async for line in f:
                    line = line.strip()
                    if line:
                        data = json.loads(line)
                        clip = ClipMetadata(**data)
                        clip.start_time = datetime.fromisoformat(data['start_time'])
                        clip.event_time = datetime.fromisoformat(data['event_time'])
                        clip.end_time = datetime.fromisoformat(data['end_time'])
                        self._clips.append(clip)
                        self._current_size += clip.file_size_bytes

        # Clean old clips on startup
        await self._cleanup_old_clips()
        logger.info(f"Ring buffer initialized: {len(self._clips)} clips, "
                    f"{self._current_size / 1e9:.2f} GB")

    async def store_clip(self, source_path: str, metadata: ClipMetadata) -> ClipMetadata:
        """Store a new clip in the ring buffer."""
        async with self._lock:
            # Make room if needed
            while self._current_size + metadata.file_size_bytes > self.max_size:
                if not self._clips:
                    break
                await self._remove_oldest_clip()

            # Copy file to buffer
            dest_path = self.base_path / f"{metadata.clip_id}.mp4"
            shutil.copy2(source_path, dest_path)

            # Verify checksum
            actual_checksum = await self._calculate_checksum(dest_path)
            if actual_checksum != metadata.checksum_sha256:
                os.remove(dest_path)
                raise ValueError("Checksum mismatch — possible corruption")

            metadata.file_path = str(dest_path)
            self._clips.append(metadata)
            self._current_size += metadata.file_size_bytes

            # Persist metadata
            await self._append_metadata(metadata)

            logger.info(f"Stored clip {metadata.clip_id} "
                        f"({metadata.file_size_bytes / 1e6:.1f} MB)")
            return metadata

    async def get_pending_uploads(self, batch_size: int = 10) -> List[ClipMetadata]:
        """Get clips pending upload, ordered by age."""
        pending = [
            c for c in self._clips
            if c.upload_status in ("pending", "failed")
            and c.upload_attempts < 5
        ]
        return sorted(pending, key=lambda c: c.event_time)[:batch_size]

    async def mark_uploaded(self, clip_id: str):
        """Mark a clip as successfully uploaded."""
        for clip in self._clips:
            if clip.clip_id == clip_id:
                clip.upload_status = "uploaded"
                await self._rewrite_metadata()
                # Optionally delete local file after successful upload
                # await self._remove_clip_file(clip)
                break

    async def mark_failed(self, clip_id: str):
        """Increment failure counter for a clip."""
        for clip in self._clips:
            if clip.clip_id == clip_id:
                clip.upload_attempts += 1
                clip.upload_status = "failed"
                await self._rewrite_metadata()
                break

    async def _remove_oldest_clip(self):
        """Remove the oldest clip to free space."""
        if not self._clips:
            return
        oldest = min(self._clips, key=lambda c: c.event_time)
        await self._remove_clip(oldest)

    async def _remove_clip(self, clip: ClipMetadata):
        """Remove a specific clip from buffer."""
        try:
            if os.path.exists(clip.file_path):
                os.remove(clip.file_path)
        except OSError:
            pass
        self._clips.remove(clip)
        self._current_size -= clip.file_size_bytes

    async def _cleanup_old_clips(self):
        """Remove clips older than max_age."""
        cutoff = datetime.utcnow() - self.max_age
        old_clips = [c for c in self._clips if c.event_time < cutoff]
        for clip in old_clips:
            await self._remove_clip(clip)
        if old_clips:
            await self._rewrite_metadata()
            logger.info(f"Cleaned {len(old_clips)} old clips")

    async def _calculate_checksum(self, file_path: Path) -> str:
        sha = hashlib.sha256()
        async with aiofiles.open(file_path, 'rb') as f:
            while chunk := await f.read(8192):
                sha.update(chunk)
        return sha.hexdigest()

    async def _append_metadata(self, clip: ClipMetadata):
        async with aiofiles.open(self.metadata_file, 'a') as f:
            data = asdict(clip)
            data['start_time'] = clip.start_time.isoformat()
            data['event_time'] = clip.event_time.isoformat()
            data['end_time'] = clip.end_time.isoformat()
            await f.write(json.dumps(data) + '\n')

    async def _rewrite_metadata(self):
        async with aiofiles.open(self.metadata_file, 'w') as f:
            for clip in self._clips:
                data = asdict(clip)
                data['start_time'] = clip.start_time.isoformat()
                data['event_time'] = clip.event_time.isoformat()
                data['end_time'] = clip.end_time.isoformat()
                await f.write(json.dumps(data) + '\n')

    @property
    def stats(self) -> dict:
        return {
            "clip_count": len(self._clips),
            "total_size_bytes": self._current_size,
            "total_size_gb": round(self._current_size / 1e9, 2),
            "max_size_gb": round(self.max_size / 1e9, 2),
            "usage_percent": round(self._current_size / self.max_size * 100, 1),
            "pending_uploads": len([c for c in self._clips if c.upload_status == "pending"]),
            "failed_uploads": len([c for c in self._clips if c.upload_status == "failed"]),
        }
```

### 7.3 Upload Queue with Retry

```python
# upload_manager.py — Cloud upload with retry and integrity verification
import asyncio
import aiohttp
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import json

logger = logging.getLogger(__name__)

@dataclass
class UploadConfig:
    endpoint_url: str = "https://10.100.0.1/api/upload"
    max_retries: int = 5
    base_delay: float = 5.0          # seconds
    max_delay: float = 300.0         # 5 minutes
    timeout: float = 60.0            # per-request timeout
    concurrent_uploads: int = 3
    verify_ssl: bool = True          # False for self-signed in dev

class UploadManager:
    """
    Manages cloud uploads with:
    - Exponential backoff retry
    - Parallel upload limiting
    - Integrity verification
    - Bandwidth throttling
    """

    def __init__(self, config: UploadConfig, ring_buffer: RingBuffer):
        self.cfg = config
        self.buffer = ring_buffer
        self._semaphore = asyncio.Semaphore(config.concurrent_uploads)
        self._session: Optional[aiohttp.ClientSession] = None
        self._running = False

    async def start(self):
        ssl_ctx = False if not self.cfg.verify_ssl else None
        connector = aiohttp.TCPConnector(
            limit=10,
            limit_per_host=5,
            enable_cleanup_closed=True,
            force_close=True,
        )
        timeout = aiohttp.ClientTimeout(total=self.cfg.timeout)
        self._session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
        )
        self._running = True
        asyncio.create_task(self._upload_loop())

    async def _upload_loop(self):
        """Background task that continuously processes the upload queue."""
        while self._running:
            try:
                pending = await self.buffer.get_pending_uploads(batch_size=5)
                if not pending:
                    await asyncio.sleep(5)
                    continue

                tasks = [
                    self._upload_with_retry(clip)
                    for clip in pending
                ]
                await asyncio.gather(*tasks, return_exceptions=True)

            except Exception as e:
                logger.error(f"Upload loop error: {e}")
                await asyncio.sleep(10)

    async def _upload_with_retry(self, clip: ClipMetadata):
        """Upload a clip with exponential backoff retry."""
        async with self._semaphore:
            for attempt in range(1, self.cfg.max_retries + 1):
                try:
                    await self._do_upload(clip)
                    await self.buffer.mark_uploaded(clip.clip_id)
                    logger.info(f"Uploaded clip {clip.clip_id} "
                                f"(attempt {attempt})")
                    return

                except aiohttp.ClientError as e:
                    delay = min(
                        self.cfg.base_delay * (2 ** (attempt - 1)),
                        self.cfg.max_delay
                    )
                    logger.warning(
                        f"Upload failed for {clip.clip_id} "
                        f"(attempt {attempt}/{self.cfg.max_retries}): {e}. "
                        f"Retry in {delay}s"
                    )
                    await asyncio.sleep(delay)

            # All retries exhausted
            await self.buffer.mark_failed(clip.clip_id)
            logger.error(f"Upload permanently failed for {clip.clip_id}")

    async def _do_upload(self, clip: ClipMetadata):
        """Perform the actual HTTP upload."""
        upload_url = f"{self.cfg.endpoint_url}/{clip.channel}"

        form_data = aiohttp.FormData()
        form_data.add_field(
            'file',
            open(clip.file_path, 'rb'),
            filename=f"{clip.clip_id}.mp4",
            content_type='video/mp4',
        )
        form_data.add_field('metadata', json.dumps({
            'clip_id': clip.clip_id,
            'channel': clip.channel,
            'event_type': clip.event_type,
            'event_time': clip.event_time.isoformat(),
            'checksum': clip.checksum_sha256,
        }))

        async with self._session.post(upload_url, data=form_data) as resp:
            if resp.status != 200:
                body = await resp.text()
                raise aiohttp.ClientError(f"HTTP {resp.status}: {body}")

            result = await resp.json()
            # Verify server-side checksum matches
            if result.get('checksum') != clip.checksum_sha256:
                raise ValueError("Server checksum mismatch")

    async def stop(self):
        self._running = False
        if self._session:
            await self._session.close()

    async def get_queue_status(self) -> dict:
        """Get current upload queue status."""
        pending = await self.buffer.get_pending_uploads(batch_size=1000)
        return {
            "pending_count": len(pending),
            "in_progress": self.cfg.concurrent_uploads - self._semaphore._value,
            "endpoint": self.cfg.endpoint_url,
            "last_check": datetime.utcnow().isoformat(),
        }
```

### 7.4 Disk Management — Automatic Cleanup

```python
# disk_manager.py — Proactive disk space management
import os
import shutil
import asyncio
import logging
from pathlib import Path
from typing import List, Dict

logger = logging.getLogger(__name__)

class DiskManager:
    """
    Monitors disk usage and enforces quotas.
    Implements emergency cleanup when disk is critically full.
    """

    CRITICAL_THRESHOLD = 95  # % — emergency cleanup
    WARNING_THRESHOLD = 85   # % — proactive cleanup
    TARGET_USAGE = 80        # % — cleanup target

    def __init__(self, mounts: Dict[str, dict]):
        """
        mounts: { path: { max_size_gb, max_age_hours, priority } }
        priority: lower = deleted first during cleanup
        """
        self.mounts = mounts

    async def monitor_loop(self, interval_seconds: float = 60.0):
        """Continuous disk monitoring loop."""
        while True:
            try:
                for path, config in self.mounts.items():
                    usage = self._get_usage(path)
                    logger.debug(f"Disk {path}: {usage['percent']:.1f}% used")

                    if usage['percent'] > self.CRITICAL_THRESHOLD:
                        logger.critical(f"CRITICAL: Disk {path} at "
                                        f"{usage['percent']:.1f}%")
                        await self._emergency_cleanup(path, config)
                    elif usage['percent'] > self.WARNING_THRESHOLD:
                        logger.warning(f"Disk {path} at {usage['percent']:.1f}%")
                        await self._proactive_cleanup(path, config)

            except Exception as e:
                logger.error(f"Disk monitor error: {e}")

            await asyncio.sleep(interval_seconds)

    def _get_usage(self, path: str) -> dict:
        stat = shutil.disk_usage(path)
        total = stat.total
        used = stat.used
        free = stat.free
        return {
            "total_gb": total / 1e9,
            "used_gb": used / 1e9,
            "free_gb": free / 1e9,
            "percent": (used / total) * 100,
        }

    async def _emergency_cleanup(self, path: str, config: dict):
        """Aggressive cleanup to free space immediately."""
        # 1. Delete HLS cache oldest segments aggressively
        hls_dir = Path(path) / "hls"
        if hls_dir.exists():
            segments = sorted(hls_dir.glob("*.ts"), key=lambda f: f.stat().st_mtime)
            for seg in segments[:-3]:  # Keep only newest 3 segments
                seg.unlink()
                logger.info(f"Emergency deleted HLS segment: {seg}")

        # 2. Delete already-uploaded clips
        buffer_dir = Path(path) / "buffer"
        if buffer_dir.exists():
            for clip_file in sorted(buffer_dir.glob("*.mp4"), key=lambda f: f.stat().st_mtime):
                clip_file.unlink()
                logger.info(f"Emergency deleted uploaded clip: {clip_file}")
                usage = self._get_usage(path)
                if usage['percent'] < self.TARGET_USAGE:
                    break

        # 3. Compress logs
        logs_dir = Path(path) / "logs"
        if logs_dir.exists():
            for log_file in sorted(logs_dir.glob("*.log"), key=lambda f: f.stat().st_mtime):
                if not str(log_file).endswith('.gz'):
                    os.system(f"gzip {log_file}")

    async def _proactive_cleanup(self, path: str, config: dict):
        """Normal proactive cleanup — age-based deletion."""
        # Age-based cleanup handled by RingBuffer
        pass
```

### 7.5 Pre-Event Buffer (Continuous Circular Recording)

For capturing video before an AI-detected event, a continuous circular buffer is maintained:

```bash
# Continuous 60-second rolling buffer per channel (primary stream)
ffmpeg -rtsp_transport tcp -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a copy -f segment -segment_time 10 -segment_wrap 6 \
  -reset_timestamps 1 -strftime 0 \
  "/data/buffer/pre_event/ch1_%d.mp4"
```

This creates 6 x 10-second segments that continuously overwrite:
- `ch1_0.mp4` — newest
- `ch1_1.mp4`
- ...
- `ch1_5.mp4` — oldest (will be overwritten next)

**On event detection:**
1. Lock current segments (prevent overwrite)
2. Continue recording for `post_event_seconds` (e.g., 30s)
3. Concatenate pre-event + event + post-event segments
4. Store in ring buffer
5. Queue for cloud upload

```python
async def handle_event_detection(self, channel: int, event_time: datetime):
    """Trigger clip capture with pre/post-event padding."""
    pre_buffer_dir = f"/data/buffer/pre_event/ch{channel}"
    clip_id = f"evt_{channel}_{event_time.strftime('%Y%m%d_%H%M%S')}"

    # Collect pre-event segments (last 60 seconds)
    pre_segments = sorted(
        Path(pre_buffer_dir).glob("*.mp4"),
        key=lambda f: f.stat().st_mtime
    )

    # Start post-event recording
    post_event_path = f"/tmp/{clip_id}_post.mp4"
    post_task = asyncio.create_task(
        self._record_post_event(channel, post_event_path, duration=30)
    )
    await post_task

    # Concatenate: pre-event + post-event
    concat_list = f"/tmp/{clip_id}_concat.txt"
    with open(concat_list, 'w') as f:
        for seg in pre_segments:
            f.write(f"file '{seg.absolute()}'\n")
        f.write(f"file '{post_event_path}'\n")

    output_path = f"/data/buffer/alert_clips/{clip_id}.mp4"
    subprocess.run([
        "ffmpeg", "-y", "-f", "concat", "-safe", "0",
        "-i", concat_list, "-c", "copy", output_path
    ], check=True)

    # Store in ring buffer
    file_size = os.path.getsize(output_path)
    checksum = await self._calculate_checksum(output_path)
    metadata = ClipMetadata(
        clip_id=clip_id,
        channel=channel,
        event_type="ai_detection",
        event_confidence=0.85,
        start_time=event_time - timedelta(seconds=60),
        event_time=event_time,
        end_time=event_time + timedelta(seconds=30),
        duration_seconds=90,
        file_path=output_path,
        file_size_bytes=file_size,
        checksum_sha256=checksum,
        upload_status="pending",
    )
    await self.ring_buffer.store_clip(output_path, metadata)
```



---

## 8. Implementation Specifications

### 8.1 Python Project Structure

```
edge-gateway/
├── docker/
│   ├── Dockerfile.stream-manager
│   ├── Dockerfile.ffmpeg-worker
│   ├── Dockerfile.hls-segmenter
│   ├── Dockerfile.buffer-manager
│   └── Dockerfile.health-monitor
├── docker-compose.yml
├── requirements.txt
├── config/
│   ├── cameras.yaml
│   ├── gateway.yaml
│   └── logging.yaml
├── src/
│   ├── __init__.py
│   ├── main.py                    # Application entry point
│   ├── api/
│   │   ├── __init__.py
│   │   ├── server.py              # FastAPI HTTP server
│   │   ├── routes/
│   │   │   ├── __init__.py
│   │   │   ├── cameras.py         # Camera CRUD endpoints
│   │   │   ├── streams.py         # Stream control endpoints
│   │   │   ├── hls.py             # HLS playlist proxy
│   │   │   ├── status.py          # Health & status
│   │   │   └── config.py          # Configuration endpoints
│   │   └── middleware/
│   │       ├── auth.py            # JWT authentication
│   │       ├── cors.py            # CORS headers
│   │       └── logging.py         # Request logging
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py              # Configuration loader
│   │   ├── events.py              # Event bus (asyncio)
│   │   ├── logging_setup.py       # Structured logging
│   │   └── exceptions.py          # Custom exceptions
│   ├── streams/
│   │   ├── __init__.py
│   │   ├── manager.py             # StreamManager (orchestrator)
│   │   ├── connector.py           # StreamConnector (FFmpeg wrapper)
│   │   ├── registry.py            # CameraRegistry
│   │   ├── circuit_breaker.py     # CircuitBreaker
│   │   ├── status.py              # CameraStatus model
│   │   └── ffmpeg_builder.py      # FFmpeg command builder
│   ├── processing/
│   │   ├── __init__.py
│   │   ├── frame_extractor.py     # FrameExtractor
│   │   ├── multi_extractor.py     # MultiChannelExtractor
│   │   ├── adaptive_bitrate.py    # AdaptiveBitrateController
│   │   └── motion_detector.py     # Motion detection utilities
│   ├── hls/
│   │   ├── __init__.py
│   │   ├── segmenter.py           # HLS segment generation
│   │   ├── playlist.py            # Playlist manager
│   │   └── server.py              # HLS HTTP server
│   ├── storage/
│   │   ├── __init__.py
│   │   ├── ring_buffer.py         # RingBuffer
│   │   ├── upload_manager.py      # UploadManager
│   │   ├── disk_manager.py        # DiskManager
│   │   └── pre_event_buffer.py    # Circular pre-event recording
│   ├── discovery/
│   │   ├── __init__.py
│   │   └── onvif_discovery.py     # ONVIF camera discovery
│   ├── mqtt/
│   │   ├── __init__.py
│   │   └── client.py              # MQTT publisher/subscriber
│   ├── health/
│   │   ├── __init__.py
│   │   ├── monitor.py             # HealthMonitor & Watchdog
│   │   └── metrics.py             # Prometheus metrics
│   └── utils/
│       ├── __init__.py
│       ├── async_utils.py         # Async helpers
│       ├── validators.py          # Input validation
│       └── time_utils.py          # Timestamp utilities
├── tests/
│   ├── unit/
│   ├── integration/
│   └── fixtures/
├── scripts/
│   ├── start.sh
│   ├── health_check.sh
│   └── backup_config.sh
└── docs/
    ├── ARCHITECTURE.md
    └── API.md
```

### 8.2 Core Classes — Full Specification

#### 8.2.1 StreamManager (Main Orchestrator)

```python
# src/streams/manager.py
import asyncio
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime

from .connector import StreamConnector, ReconnectConfig
from .registry import CameraRegistry
from .circuit_breaker import CircuitBreaker, CircuitBreakerConfig
from .status import CameraStatus
from .ffmpeg_builder import FFmpegCommandBuilder
from ..core.config import GatewayConfig
from ..health.metrics import MetricsCollector

logger = logging.getLogger(__name__)

@dataclass
class StreamManagerConfig:
    reconnect: ReconnectConfig
    circuit_breaker: CircuitBreakerConfig
    ffmpeg: FFmpegCommandBuilder
    max_concurrent_streams: int = 8

class StreamManager:
    """
    Central orchestrator for all camera streams.
    Manages lifecycle: discovery -> connection -> monitoring -> cleanup.
    """

    def __init__(self, config: StreamManagerConfig, registry: CameraRegistry,
                 metrics: MetricsCollector):
        self.cfg = config
        self.registry = registry
        self.metrics = metrics
        self.connectors: Dict[int, StreamConnector] = {}
        self.circuit_breakers: Dict[int, CircuitBreaker] = {}
        self._monitor_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self):
        """Start the stream manager and connect all enabled cameras."""
        self._running = True
        cameras = await self.registry.get_all_cameras()

        for camera in cameras:
            if camera.enabled:
                await self._start_camera_stream(camera.channel)

        self._monitor_task = asyncio.create_task(
            self._health_monitor_loop(),
            name="stream-health-monitor"
        )
        logger.info(f"StreamManager started: {len(self.connectors)} streams active")

    async def _start_camera_stream(self, channel: int):
        """Initialize and start a single camera stream."""
        camera = await self.registry.get_camera(channel)
        if not camera:
            logger.error(f"Camera {channel} not found in registry")
            return

        # Create circuit breaker
        cb = CircuitBreaker(f"ch{channel}", self.cfg.circuit_breaker)
        self.circuit_breakers[channel] = cb

        # Build FFmpeg command
        rtsp_url = camera.get_rtsp_url(primary=True)
        ffmpeg_cmd = self.cfg.ffmpeg.build(
            rtsp_url=rtsp_url,
            hls_output=f"/data/hls/ch{channel}.m3u8",
            segment_pattern=f"/data/hls/ch{channel}_%03d.ts",
        )

        # Create and start connector
        connector = StreamConnector(
            channel=channel,
            config=self.cfg.reconnect,
            ffmpeg_cmd=ffmpeg_cmd,
        )
        self.connectors[channel] = connector
        await connector.start()

        logger.info(f"Started stream for channel {channel}: {camera.name}")

    async def stop_camera_stream(self, channel: int):
        """Gracefully stop a camera stream."""
        if channel in self.connectors:
            await self.connectors[channel].stop()
            del self.connectors[channel]
        if channel in self.circuit_breakers:
            del self.circuit_breakers[channel]
        logger.info(f"Stopped stream for channel {channel}")

    async def restart_camera_stream(self, channel: int):
        """Restart a specific camera stream."""
        await self.stop_camera_stream(channel)
        await asyncio.sleep(2)
        await self._start_camera_stream(channel)

    async def get_status(self, channel: Optional[int] = None) -> List[CameraStatus]:
        """Get status for one or all cameras."""
        if channel is not None:
            connector = self.connectors.get(channel)
            if connector:
                return [self._build_status(channel, connector)]
            return []

        return [
            self._build_status(ch, conn)
            for ch, conn in self.connectors.items()
        ]

    def _build_status(self, channel: int, connector: StreamConnector) -> CameraStatus:
        camera = self.registry.get_camera_sync(channel)
        cb = self.circuit_breakers.get(channel)

        return CameraStatus(
            channel=channel,
            name=camera.name if camera else f"CH{channel}",
            enabled=camera.enabled if camera else False,
            state=connector.state.value if hasattr(connector, 'state') else "unknown",
            stream_type="primary" if camera and camera.use_primary else "secondary",
            current_fps=getattr(connector, 'current_fps', 0),
            disconnect_count=connector.consecutive_failures if hasattr(connector, 'consecutive_failures') else 0,
            circuit_breaker_state=cb.state.value if cb else "CLOSED",
        )

    async def _health_monitor_loop(self):
        """Periodic health check for all streams."""
        while self._running:
            try:
                for channel, connector in list(self.connectors.items()):
                    status = self._build_status(channel, connector)

                    # Record metrics
                    self.metrics.record_fps(channel, status.current_fps)
                    self.metrics.record_stream_state(channel, status.state)

                    # Check for anomalies
                    if status.state == "ONLINE" and status.current_fps < 5:
                        logger.warning(f"CH{channel}: Low FPS detected "
                                       f"({status.current_fps:.1f})")

            except Exception as e:
                logger.error(f"Health monitor error: {e}")

            await asyncio.sleep(30)

    async def stop(self):
        """Graceful shutdown of all streams."""
        self._running = False
        if self._monitor_task:
            self._monitor_task.cancel()
            try:
                await self._monitor_task
            except asyncio.CancelledError:
                pass

        # Stop all connectors concurrently
        await asyncio.gather(*[
            connector.stop()
            for connector in self.connectors.values()
        ])
        self.connectors.clear()
        self.circuit_breakers.clear()
        logger.info("StreamManager stopped")
```

#### 8.2.2 FFmpegCommandBuilder

```python
# src/streams/ffmpeg_builder.py
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class FFmpegPreset:
    """Predefined FFmpeg configuration preset."""
    name: str
    transport: str = "tcp"
    timeout_us: int = 5_000_000
    reconnect: bool = True
    reconnect_max_delay: int = 30
    flags: List[str] = field(default_factory=lambda: [
        "+genpts", "+discardcorrupt", "+low_delay"
    ])
    probesize: int = 5_000_000
    analyzeduration: int = 2_000_000

class FFmpegCommandBuilder:
    """
    Builds FFmpeg command lines for various stream operations.
    Supports presets for different use cases.
    """

    PRESETS = {
        "ingest": FFmpegPreset("ingest"),
        "hls": FFmpegPreset("hls", reconnect_max_delay=60),
        "ai_extraction": FFmpegPreset("ai_extraction", probesize=1_000_000),
        "recording": FFmpegPreset("recording", reconnect_max_delay=300),
    }

    def build(self, rtsp_url: str, hls_output: str, segment_pattern: str,
              preset: str = "hls", audio: bool = True,
              segment_duration: int = 2, playlist_size: int = 5) -> List[str]:
        """Build complete FFmpeg command for HLS output."""
        p = self.PRESETS.get(preset, self.PRESETS["hls"])

        cmd = ["ffmpeg", "-nostdin", "-y"]

        # Input options
        cmd.extend(["-rtsp_transport", p.transport])
        cmd.extend(["-stimeout", str(p.timeout_us)])

        if p.reconnect:
            cmd.extend([
                "-reconnect", "1",
                "-reconnect_at_eof", "1",
                "-reconnect_streamed", "1",
                "-reconnect_delay_max", str(p.reconnect_max_delay),
            ])

        cmd.extend(["-fflags", "".join(p.flags)])
        cmd.extend(["-flags", "+low_delay"])
        cmd.extend(["-probesize", str(p.probesize)])
        cmd.extend(["-analyzeduration", str(p.analyzeduration)])

        # Input
        cmd.extend(["-i", rtsp_url])

        # Video codec (copy for efficiency)
        cmd.extend(["-c:v", "copy"])

        # Audio
        if audio:
            cmd.extend(["-c:a", "copy"])
        else:
            cmd.extend(["-an"])

        # HLS output
        cmd.extend(["-f", "hls"])
        cmd.extend(["-hls_time", str(segment_duration)])
        cmd.extend(["-hls_list_size", str(playlist_size)])
        cmd.extend(["-hls_flags", "delete_segments+omit_endlist+program_date_time"])
        cmd.extend(["-hls_segment_type", "mpegts"])
        cmd.extend(["-hls_segment_filename", segment_pattern])
        cmd.append(hls_output)

        return cmd

    def build_ai_extraction(self, rtsp_url: str, target_fps: float = 5.0,
                            resolution: tuple = (640, 480),
                            mode: str = "hybrid") -> List[str]:
        """Build FFmpeg command for AI frame extraction."""
        w, h = resolution
        filters = f"fps={target_fps},scale={w}:{h}"

        if mode == "keyframe_only":
            filters = f"select='eq(pict_type,I)',{filters}"
        elif mode == "hybrid":
            # I-frame priority with fixed FPS fallback
            interval = int(25 / target_fps)
            filters = f"select='eq(pict_type,I)+gte(n,prev_selected_n+{interval})',{filters}"

        return [
            "ffmpeg", "-nostdin", "-y",
            "-rtsp_transport", "tcp",
            "-stimeout", "5000000",
            "-reconnect", "1",
            "-reconnect_at_eof", "1",
            "-reconnect_streamed", "1",
            "-reconnect_delay_max", "30",
            "-fflags", "+genpts+discardcorrupt",
            "-flags", "+low_delay",
            "-i", rtsp_url,
            "-vf", filters,
            "-an",  # No audio for AI
            "-f", "image2pipe",
            "-vcodec", "mjpeg",
            "-q:v", "3",
            "-",
        ]

    def build_recording(self, rtsp_url: str, output_pattern: str,
                        segment_seconds: int = 60) -> List[str]:
        """Build FFmpeg command for segmented recording."""
        return [
            "ffmpeg", "-nostdin", "-y",
            "-rtsp_transport", "tcp",
            "-stimeout", "5000000",
            "-reconnect", "1",
            "-reconnect_at_eof", "1",
            "-reconnect_streamed", "1",
            "-reconnect_delay_max", "300",
            "-fflags", "+genpts+discardcorrupt",
            "-flags", "+low_delay",
            "-i", rtsp_url,
            "-c:v", "copy",
            "-c:a", "copy",
            "-f", "segment",
            "-segment_time", str(segment_seconds),
            "-segment_format", "mp4",
            "-reset_timestamps", "1",
            "-strftime", "1",
            output_pattern,
        ]

    def build_webrtc_bridge(self, rtsp_url: str,
                            output_resolution: tuple = (640, 480)) -> List[str]:
        """Build FFmpeg command for WebRTC bridge (raw video output)."""
        w, h = output_resolution
        return [
            "ffmpeg", "-nostdin", "-y",
            "-rtsp_transport", "tcp",
            "-stimeout", "5000000",
            "-reconnect", "1",
            "-reconnect_at_eof", "1",
            "-reconnect_streamed", "1",
            "-reconnect_delay_max", "30",
            "-fflags", "+genpts+discardcorrupt",
            "-flags", "+low_delay",
            "-i", rtsp_url,
            "-vf", f"scale={w}:{h},format=yuv420p",
            "-c:v", "rawvideo",
            "-pix_fmt", "yuv420p",
            "-f", "rawvideo",
            "-",
        ]
```

#### 8.2.3 CameraRegistry

```python
# src/streams/registry.py
import yaml
import asyncio
import logging
from typing import List, Optional, Dict
from dataclasses import dataclass, field
from pathlib import Path
import aiofiles

logger = logging.getLogger(__name__)

@dataclass
class CameraConfig:
    channel: int
    name: str
    enabled: bool = True
    primary_stream_url: str = ""
    secondary_stream_url: str = ""
    use_primary: bool = False
    ai_enabled: bool = True
    ai_fps: float = 5.0
    ai_resolution: str = "640x480"
    ai_mode: str = "hybrid"
    recording_enabled: bool = True
    pre_event_seconds: int = 10
    post_event_seconds: int = 30
    tags: Dict[str, str] = field(default_factory=dict)

    def get_rtsp_url(self, primary: bool = False) -> str:
        if primary and self.primary_stream_url:
            return self.primary_stream_url
        return self.secondary_stream_url or self.primary_stream_url

class CameraRegistry:
    """
    Manages camera configuration from YAML file.
    Supports hot-reloading and ONVIF discovery integration.
    """

    def __init__(self, config_path: str):
        self.config_path = Path(config_path)
        self._cameras: Dict[int, CameraConfig] = {}
        self._lock = asyncio.Lock()

    async def load(self):
        """Load camera configuration from YAML."""
        async with self._lock:
            if not self.config_path.exists():
                logger.warning(f"Config file not found: {self.config_path}")
                return

            async with aiofiles.open(self.config_path, 'r') as f:
                content = await f.read()

            data = yaml.safe_load(content)
            self._cameras = {}

            for cam_data in data.get('cameras', []):
                cam = CameraConfig(
                    channel=cam_data['channel'],
                    name=cam_data['name'],
                    enabled=cam_data.get('enabled', True),
                    primary_stream_url=cam_data.get('primary_stream', {}).get('url', ''),
                    secondary_stream_url=cam_data.get('secondary_stream', {}).get('url', ''),
                    ai_enabled=cam_data.get('ai_config', {}).get('enabled', True),
                    ai_fps=cam_data.get('ai_config', {}).get('inference_fps', 5.0),
                    ai_resolution=cam_data.get('ai_config', {}).get('resolution', '640x480'),
                    ai_mode=cam_data.get('ai_config', {}).get('mode', 'hybrid'),
                    recording_enabled=cam_data.get('recording_config', {}).get('enabled', True),
                    pre_event_seconds=cam_data.get('recording_config', {}).get('pre_event_seconds', 10),
                    post_event_seconds=cam_data.get('recording_config', {}).get('post_event_seconds', 30),
                    tags=cam_data.get('tags', {}),
                )
                self._cameras[cam.channel] = cam

            logger.info(f"Loaded {len(self._cameras)} cameras from config")

    async def save(self):
        """Persist current configuration to YAML."""
        async with self._lock:
            data = {
                'cameras': [],
                'settings': {
                    'default_ai_fps': 5.0,
                    'default_inference_resolution': '640x480',
                }
            }
            for cam in self._cameras.values():
                data['cameras'].append({
                    'channel': cam.channel,
                    'name': cam.name,
                    'enabled': cam.enabled,
                    'primary_stream': {'url': cam.primary_stream_url},
                    'secondary_stream': {'url': cam.secondary_stream_url},
                    'ai_config': {
                        'enabled': cam.ai_enabled,
                        'inference_fps': cam.ai_fps,
                        'resolution': cam.ai_resolution,
                        'mode': cam.ai_mode,
                    },
                    'recording_config': {
                        'enabled': cam.recording_enabled,
                        'pre_event_seconds': cam.pre_event_seconds,
                        'post_event_seconds': cam.post_event_seconds,
                    },
                    'tags': cam.tags,
                })

            async with aiofiles.open(self.config_path, 'w') as f:
                await f.write(yaml.dump(data, default_flow_style=False, sort_keys=False))

    async def get_all_cameras(self) -> List[CameraConfig]:
        return list(self._cameras.values())

    async def get_camera(self, channel: int) -> Optional[CameraConfig]:
        return self._cameras.get(channel)

    def get_camera_sync(self, channel: int) -> Optional[CameraConfig]:
        return self._cameras.get(channel)

    async def update_camera(self, channel: int, updates: dict):
        async with self._lock:
            if channel not in self._cameras:
                raise ValueError(f"Camera {channel} not found")
            cam = self._cameras[channel]
            for key, value in updates.items():
                if hasattr(cam, key):
                    setattr(cam, key, value)
            await self.save()

    async def add_camera(self, camera: CameraConfig):
        async with self._lock:
            if camera.channel in self._cameras:
                raise ValueError(f"Camera {camera.channel} already exists")
            self._cameras[camera.channel] = camera
            await self.save()

    async def remove_camera(self, channel: int):
        async with self._lock:
            if channel in self._cameras:
                del self._cameras[channel]
                await self.save()
```

#### 8.2.4 HealthMonitor & Watchdog

```python
# src/health/monitor.py
import asyncio
import logging
import subprocess
from datetime import datetime
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)

@dataclass
class HealthReport:
    timestamp: datetime
    gateway_id: str
    overall_status: str  # healthy, degraded, critical
    dvr_reachable: bool
    dvr_latency_ms: float
    streams_online: int
    streams_total: int
    vpn_tunnel_up: bool
    vpn_latency_ms: float
    disk_usage_percent: float
    memory_usage_percent: float
    cpu_usage_percent: float
    issues: List[str] = field(default_factory=list)
    stream_details: Dict[int, dict] = field(default_factory=dict)

class HealthMonitor:
    """
    Comprehensive health monitoring for the edge gateway.
    Checks DVR connectivity, stream health, VPN tunnel, and system resources.
    """

    def __init__(self, gateway_id: str, dvr_host: str, vpn_endpoint: str,
                 check_interval: float = 30.0):
        self.gateway_id = gateway_id
        self.dvr_host = dvr_host
        self.vpn_endpoint = vpn_endpoint
        self.check_interval = check_interval
        self._callbacks: List[Callable[[HealthReport], None]] = []
        self._running = False
        self._task: Optional[asyncio.Task] = None

    def on_health_change(self, callback: Callable[[HealthReport], None]):
        self._callbacks.append(callback)

    async def start(self):
        self._running = True
        self._task = asyncio.create_task(self._monitor_loop(), name="health-monitor")

    async def _monitor_loop(self):
        while self._running:
            try:
                report = await self._run_checks()
                for cb in self._callbacks:
                    try:
                        if asyncio.iscoroutinefunction(cb):
                            await cb(report)
                        else:
                            cb(report)
                    except Exception as e:
                        logger.error(f"Health callback error: {e}")
            except Exception as e:
                logger.error(f"Health check error: {e}")
            await asyncio.sleep(self.check_interval)

    async def _run_checks(self) -> HealthReport:
        timestamp = datetime.utcnow()
        issues = []

        # Check DVR reachability
        dvr_reachable, dvr_latency = await self._ping_host(self.dvr_host)
        if not dvr_reachable:
            issues.append(f"DVR unreachable: {self.dvr_host}")

        # Check VPN tunnel
        vpn_up, vpn_latency = await self._ping_host(self.vpn_endpoint)
        if not vpn_up:
            issues.append(f"VPN tunnel down: {self.vpn_endpoint}")

        # Check system resources
        disk_pct = await self._get_disk_usage()
        memory_pct = await self._get_memory_usage()
        cpu_pct = await self._get_cpu_usage()

        if disk_pct > 90:
            issues.append(f"Disk critical: {disk_pct:.1f}%")
        elif disk_pct > 80:
            issues.append(f"Disk warning: {disk_pct:.1f}%")

        if memory_pct > 90:
            issues.append(f"Memory critical: {memory_pct:.1f}%")

        # Determine overall status
        if len(issues) == 0:
            status = "healthy"
        elif any("critical" in i for i in issues):
            status = "critical"
        else:
            status = "degraded"

        return HealthReport(
            timestamp=timestamp,
            gateway_id=self.gateway_id,
            overall_status=status,
            dvr_reachable=dvr_reachable,
            dvr_latency_ms=dvr_latency,
            streams_online=0,  # Filled by StreamManager
            streams_total=8,
            vpn_tunnel_up=vpn_up,
            vpn_latency_ms=vpn_latency,
            disk_usage_percent=disk_pct,
            memory_usage_percent=memory_pct,
            cpu_usage_percent=cpu_pct,
            issues=issues,
        )

    async def _ping_host(self, host: str, count: int = 3) -> tuple:
        """Ping a host and return (reachable, avg_latency_ms)."""
        try:
            proc = await asyncio.create_subprocess_exec(
                "ping", "-c", str(count), "-W", "2", host,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
            output = stdout.decode()

            if "0% packet loss" in output or proc.returncode == 0:
                # Parse latency
                try:
                    avg_line = [l for l in output.split('\n') if 'avg' in l][-1]
                    avg_ms = float(avg_line.split('/')[-3])
                    return True, avg_ms
                except (IndexError, ValueError):
                    return True, 0.0
            return False, -1.0
        except Exception:
            return False, -1.0

    async def _get_disk_usage(self) -> float:
        try:
            proc = await asyncio.create_subprocess_exec(
                "df", "/data", "--output=pcent",
                stdout=asyncio.subprocess.PIPE,
            )
            stdout, _ = await proc.communicate()
            lines = stdout.decode().strip().split('\n')
            if len(lines) >= 2:
                return float(lines[1].replace('%', ''))
        except Exception:
            pass
        return 0.0

    async def _get_memory_usage(self) -> float:
        try:
            proc = await asyncio.create_subprocess_exec(
                "free", "|", "grep", "Mem",
                stdout=asyncio.subprocess.PIPE,
            )
            stdout, _ = await proc.communicate()
            # Parse: Mem: total used free shared buff/cache available
            parts = stdout.decode().split()
            total = float(parts[1])
            used = float(parts[2])
            return (used / total) * 100
        except Exception:
            return 0.0

    async def _get_cpu_usage(self) -> float:
        try:
            proc = await asyncio.create_subprocess_exec(
                "top", "-bn1", "|", "grep", "Cpu(s)",
                stdout=asyncio.subprocess.PIPE,
            )
            stdout, _ = await proc.communicate()
            line = stdout.decode()
            # Parse: %Cpu(s): 10.5 us,  5.2 sy, ...
            if 'us' in line:
                us = float(line.split('us')[0].split()[-1])
                sy = float(line.split('sy')[0].split()[-1])
                return us + sy
        except Exception:
            return 0.0

    async def stop(self):
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
```

#### 8.2.5 FastAPI Server (Diagnostic UI & API)

```python
# src/api/server.py
from fastapi import FastAPI, HTTPException, Depends, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, FileResponse
import asyncio
import logging
from typing import List, Optional

from ..streams.manager import StreamManager
from ..streams.status import CameraStatus
from ..health.monitor import HealthMonitor, HealthReport
from ..core.config import GatewayConfig

logger = logging.getLogger(__name__)

# HTML for minimal diagnostic UI
DIAGNOSTIC_HTML = """
<!DOCTYPE html>
<html>
<head>
    <title>Edge Gateway Diagnostics</title>
    <style>
        body { font-family: monospace; background: #1a1a2e; color: #eee; padding: 20px; }
        h1 { color: #00d4ff; }
        .status-healthy { color: #00ff88; }
        .status-degraded { color: #ffaa00; }
        .status-critical { color: #ff4444; }
        .status-offline { color: #888; }
        table { border-collapse: collapse; width: 100%; margin-top: 20px; }
        th, td { border: 1px solid #333; padding: 8px; text-align: left; }
        th { background: #16213e; }
        tr:nth-child(even) { background: #0f3460; }
        .metric { display: inline-block; margin: 10px 20px 10px 0; }
        .metric-label { color: #888; font-size: 12px; }
        .metric-value { font-size: 24px; font-weight: bold; }
        #log { background: #0a0a1a; padding: 10px; height: 200px; overflow-y: auto; 
               font-size: 11px; margin-top: 20px; border: 1px solid #333; }
    </style>
</head>
<body>
    <h1>Edge Gateway Diagnostics — {{gateway_id}}</h1>
    <div>
        <div class="metric">
            <div class="metric-label">Status</div>
            <div class="metric-value status-{{health_status}}">{{health_status.upper()}}</div>
        </div>
        <div class="metric">
            <div class="metric-label">Streams Online</div>
            <div class="metric-value">{{streams_online}}/{{streams_total}}</div>
        </div>
        <div class="metric">
            <div class="metric-label">DVR</div>
            <div class="metric-value">{{dvr_status}}</div>
        </div>
        <div class="metric">
            <div class="metric-label">VPN</div>
            <div class="metric-value">{{vpn_status}}</div>
        </div>
        <div class="metric">
            <div class="metric-label">Disk</div>
            <div class="metric-value">{{disk_usage}}%</div>
        </div>
    </div>
    <h2>Camera Streams</h2>
    <table>
        <tr>
            <th>CH</th><th>Name</th><th>State</th><th>FPS</th>
            <th>Bitrate</th><th>Resolution</th><th>Uptime</th><th>Circuit</th>
        </tr>
        {{#cameras}}
        <tr>
            <td>{{channel}}</td>
            <td>{{name}}</td>
            <td class="status-{{state}}">{{state}}</td>
            <td>{{fps}}</td>
            <td>{{bitrate}}</td>
            <td>{{resolution}}</td>
            <td>{{uptime}}</td>
            <td>{{circuit_breaker}}</td>
        </tr>
        {{/cameras}}
    </table>
    <h2>System Log</h2>
    <div id="log">{{system_log}}</div>
    <script>
        setTimeout(() => location.reload(), 5000);
    </script>
</body>
</html>
"""

def create_app(stream_manager: StreamManager, health_monitor: HealthMonitor,
               config: GatewayConfig) -> FastAPI:
    app = FastAPI(title="Edge Gateway API", version="1.0.0")

    # CORS
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["https://dashboard.example.com"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    @app.get("/", response_class=HTMLResponse)
    async def diagnostic_ui():
        """Minimal diagnostic HTML UI."""
        status = await stream_manager.get_status()
        health = await health_monitor._run_checks()

        # Simple template rendering (in production use Jinja2)
        cameras_html = ""
        for cam in status:
            cameras_html += f"""
            <tr>
                <td>{cam.channel}</td>
                <td>{cam.name}</td>
                <td class="status-{cam.state}">{cam.state}</td>
                <td>{cam.current_fps:.1f}</td>
                <td>{cam.current_bitrate_kbps:.0f}</td>
                <td>{cam.resolution}</td>
                <td>{cam.uptime_seconds:.0f}s</td>
                <td>{cam.circuit_breaker_state}</td>
            </tr>"""

        html = DIAGNOSTIC_HTML
        html = html.replace("{{gateway_id}}", config.gateway_id)
        html = html.replace("{{health_status}}", health.overall_status)
        html = html.replace("{{streams_online}}", str(health.streams_online))
        html = html.replace("{{streams_total}}", str(health.streams_total))
        html = html.replace("{{dvr_status}}", "ONLINE" if health.dvr_reachable else "OFFLINE")
        html = html.replace("{{vpn_status}}", "UP" if health.vpn_tunnel_up else "DOWN")
        html = html.replace("{{disk_usage}}", f"{health.disk_usage_percent:.1f}")
        html = html.replace("{{#cameras}}", "")
        html = html.replace("{{/cameras}}", "")
        # Insert camera rows between table rows
        html_parts = html.split("<tr>\n            <th>CH</th>")
        if len(html_parts) == 2:
            html = html_parts[0] + cameras_html + "<tr>\n            <th>CH</th>" + html_parts[1]

        return HTMLResponse(content=html)

    @app.get("/api/v1/status")
    async def get_status():
        """Get overall gateway status."""
        streams = await stream_manager.get_status()
        health = await health_monitor._run_checks()
        return {
            "gateway_id": config.gateway_id,
            "timestamp": datetime.utcnow().isoformat(),
            "health": health.overall_status,
            "dvr": {"reachable": health.dvr_reachable, "latency_ms": health.dvr_latency_ms},
            "vpn": {"up": health.vpn_tunnel_up, "latency_ms": health.vpn_latency_ms},
            "system": {
                "disk_percent": health.disk_usage_percent,
                "memory_percent": health.memory_usage_percent,
                "cpu_percent": health.cpu_usage_percent,
            },
            "streams": [s.to_dict() for s in streams],
        }

    @app.get("/api/v1/cameras")
    async def list_cameras():
        """List all configured cameras."""
        return await stream_manager.registry.get_all_cameras()

    @app.get("/api/v1/cameras/{channel}")
    async def get_camera(channel: int):
        camera = await stream_manager.registry.get_camera(channel)
        if not camera:
            raise HTTPException(status_code=404, detail=f"Camera {channel} not found")
        return camera

    @app.post("/api/v1/cameras/{channel}/restart")
    async def restart_camera_stream(channel: int):
        """Restart a specific camera stream."""
        await stream_manager.restart_camera_stream(channel)
        return {"status": "restarted", "channel": channel}

    @app.get("/api/v1/streams/{channel}/status")
    async def get_stream_status(channel: int):
        """Get detailed stream status."""
        status = await stream_manager.get_status(channel)
        if not status:
            raise HTTPException(status_code=404, detail=f"Stream {channel} not found")
        return status[0].to_dict()

    @app.get("/api/v1/hls/{channel}.m3u8")
    async def get_hls_playlist(channel: int):
        """Proxy HLS playlist for a channel."""
        playlist_path = f"/data/hls/ch{channel}.m3u8"
        if not os.path.exists(playlist_path):
            raise HTTPException(status_code=404, detail="Playlist not available")
        return FileResponse(playlist_path, media_type="application/vnd.apple.mpegurl")

    @app.get("/api/v1/hls/{segment}")
    async def get_hls_segment(segment: str):
        """Serve HLS segment file."""
        segment_path = f"/data/hls/{segment}"
        if not os.path.exists(segment_path):
            raise HTTPException(status_code=404, detail="Segment not found")
        return FileResponse(segment_path, media_type="video/mp2t")

    @app.websocket("/ws/live")
    async def websocket_status(websocket: WebSocket):
        """WebSocket for real-time status updates."""
        await websocket.accept()
        try:
            while True:
                streams = await stream_manager.get_status()
                health = await health_monitor._run_checks()
                await websocket.send_json({
                    "timestamp": datetime.utcnow().isoformat(),
                    "health": health.overall_status,
                    "streams": [s.to_dict() for s in streams],
                })
                await asyncio.sleep(5)
        except Exception:
            await websocket.close()

    return app
```



### 8.3 Docker Compose Configuration

```yaml
# docker-compose.yml — Edge Gateway Full Stack
version: "3.8"

services:
  # ============================================
  # Core: Stream Manager (API + Orchestration)
  # ============================================
  stream-manager:
    build:
      context: .
      dockerfile: docker/Dockerfile.stream-manager
    container_name: stream-manager
    hostname: stream-manager
    restart: unless-stopped
    ports:
      - "8080:8080"       # API + Diagnostic UI
    volumes:
      - /data/config:/app/config:ro
      - /data/logs:/app/logs
      - /data/hls:/data/hls:ro         # Read HLS segments
      - /data/buffer:/data/buffer       # Access buffer
      - /var/run/docker.sock:/var/run/docker.sock:ro  # Container mgmt
    environment:
      - GATEWAY_ID=${GATEWAY_ID:-edge-ind-01}
      - DVR_HOST=${DVR_HOST:-192.168.29.200}
      - DVR_RTSP_PORT=${DVR_RTSP_PORT:-554}
      - DVR_USERNAME=${DVR_USERNAME:-admin}
      - DVR_PASSWORD=${DVR_PASSWORD}
      - LOG_LEVEL=${LOG_LEVEL:-INFO}
      - STREAM_TRANSPORT=${STREAM_TRANSPORT:-tcp}
      - HEALTH_CHECK_INTERVAL=${HEALTH_CHECK_INTERVAL:-30}
      - MQTT_BROKER_HOST=mqtt-broker
      - MQTT_BROKER_PORT=1883
      - HLS_OUTPUT_DIR=/data/hls
      - BUFFER_DIR=/data/buffer
    networks:
      - edge-internal
    cpus: "2.0"
    mem_limit: 512m
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "5"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/api/v1/status"]
      interval: 30s
      timeout: 5s
      retries: 3
      start_period: 30s
    depends_on:
      - mqtt-broker

  # ============================================
  # FFmpeg Workers (CH1-CH4 — Primary Streams)
  # ============================================
  ffmpeg-worker-1:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-1
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -probesize 5000000 -analyzeduration 2000000
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch1_%03d.ts"
      "/data/hls/ch1.m3u8"
      -c:v copy -c:a copy
      -f segment -segment_time 60 -segment_format mp4
      -reset_timestamps 1 -strftime 1
      "/data/buffer/continuous/ch1_%Y%m%d_%H%M%S.mp4"
    volumes:
      - /data/hls:/data/hls
      - /data/buffer/continuous:/data/buffer/continuous
    environment:
      - DVR_PASSWORD=${DVR_PASSWORD}
    networks:
      - edge-internal
    cpus: "2.0"
    mem_limit: 256m
    logging:
      driver: "json-file"
      options:
        max-size: "5m"
        max-file: "3"
    deploy:
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 10

  ffmpeg-worker-2:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-2
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -probesize 5000000 -analyzeduration 2000000
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=2&subtype=0"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch2_%03d.ts"
      "/data/hls/ch2.m3u8"
      -c:v copy -c:a copy
      -f segment -segment_time 60 -segment_format mp4
      -reset_timestamps 1 -strftime 1
      "/data/buffer/continuous/ch2_%Y%m%d_%H%M%S.mp4"
    volumes:
      - /data/hls:/data/hls
      - /data/buffer/continuous:/data/buffer/continuous
    environment:
      - DVR_PASSWORD=${DVR_PASSWORD}
    networks:
      - edge-internal
    cpus: "2.0"
    mem_limit: 256m
    logging:
      driver: "json-file"
      options:
        max-size: "5m"
        max-file: "3"
    deploy:
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 10

  ffmpeg-worker-3:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-3
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -probesize 5000000 -analyzeduration 2000000
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=3&subtype=0"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch3_%03d.ts"
      "/data/hls/ch3.m3u8"
    volumes:
      - /data/hls:/data/hls
    environment:
      - DVR_PASSWORD=${DVR_PASSWORD}
    networks:
      - edge-internal
    cpus: "2.0"
    mem_limit: 256m

  ffmpeg-worker-4:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-4
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -probesize 5000000 -analyzeduration 2000000
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=4&subtype=0"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch4_%03d.ts"
      "/data/hls/ch4.m3u8"
    volumes:
      - /data/hls:/data/hls
    environment:
      - DVR_PASSWORD=${DVR_PASSWORD}
    networks:
      - edge-internal
    cpus: "2.0"
    mem_limit: 256m

  # ============================================
  # FFmpeg Workers (CH5-CH8 — Secondary Streams)
  # ============================================
  ffmpeg-worker-5:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-5
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=5&subtype=1"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch5_%03d.ts"
      "/data/hls/ch5.m3u8"
    volumes:
      - /data/hls:/data/hls
    networks:
      - edge-internal
    cpus: "1.0"
    mem_limit: 128m

  ffmpeg-worker-6:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-6
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=6&subtype=1"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch6_%03d.ts"
      "/data/hls/ch6.m3u8"
    volumes:
      - /data/hls:/data/hls
    networks:
      - edge-internal
    cpus: "1.0"
    mem_limit: 128m

  ffmpeg-worker-7:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-7
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=7&subtype=1"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch7_%03d.ts"
      "/data/hls/ch7.m3u8"
    volumes:
      - /data/hls:/data/hls
    networks:
      - edge-internal
    cpus: "1.0"
    mem_limit: 128m

  ffmpeg-worker-8:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-8
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=8&subtype=1"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch8_%03d.ts"
      "/data/hls/ch8.m3u8"
    volumes:
      - /data/hls:/data/hls
    networks:
      - edge-internal
    cpus: "1.0"
    mem_limit: 128m

  # ============================================
  # AI Frame Extractor
  # ============================================
  frame-extractor:
    build:
      context: .
      dockerfile: docker/Dockerfile.frame-extractor
    container_name: frame-extractor
    restart: unless-stopped
    volumes:
      - /data/config:/app/config:ro
      - /data/logs:/app/logs
    environment:
      - DVR_HOST=192.168.29.200
      - DVR_USERNAME=${DVR_USERNAME:-admin}
      - DVR_PASSWORD=${DVR_PASSWORD}
      - AI_INFERENCE_FPS=${AI_INFERENCE_FPS:-5}
      - AI_KEYFRAME_ONLY=${AI_KEYFRAME_ONLY:-true}
      - AI_RESOLUTION=${AI_RESOLUTION:-640x480}
      - MQTT_BROKER_HOST=mqtt-broker
      - MQTT_BROKER_PORT=1883
      - MQTT_TOPIC_PREFIX=ai/frames
      - LOG_LEVEL=${LOG_LEVEL:-INFO}
    networks:
      - edge-internal
    cpus: "2.0"
    mem_limit: 512m
    depends_on:
      - mqtt-broker

  # ============================================
  # Buffer Manager (Upload Queue)
  # ============================================
  buffer-manager:
    build:
      context: .
      dockerfile: docker/Dockerfile.buffer-manager
    container_name: buffer-manager
    restart: unless-stopped
    volumes:
      - /data/buffer:/data/buffer
      - /data/config:/app/config:ro
      - /data/logs:/app/logs
    environment:
      - BUFFER_MAX_SIZE_GB=${BUFFER_MAX_SIZE_GB:-20}
      - BUFFER_RETENTION_HOURS=${BUFFER_RETENTION_HOURS:-72}
      - UPLOAD_ENDPOINT=${UPLOAD_ENDPOINT:-https://10.100.0.1/api/upload}
      - UPLOAD_MAX_RETRIES=${UPLOAD_MAX_RETRIES:-5}
      - UPLOAD_RETRY_BASE_DELAY=${UPLOAD_RETRY_BASE_DELAY:-5}
      - UPLOAD_CHUNK_SIZE_MB=${UPLOAD_CHUNK_SIZE_MB:-10}
      - VPN_ENABLED=${VPN_ENABLED:-true}
      - LOG_LEVEL=${LOG_LEVEL:-INFO}
    networks:
      - edge-internal
      - vpn-net
    cpus: "1.0"
    mem_limit: 256m
    depends_on:
      - stream-manager

  # ============================================
  # MQTT Broker (Internal Messaging)
  # ============================================
  mqtt-broker:
    image: eclipse-mosquitto:2.0.18
    container_name: mqtt-broker
    restart: unless-stopped
    ports:
      - "127.0.0.1:1883:1883"   # Local only — not exposed externally
    volumes:
      - ./config/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
      - /data/mqtt/data:/mosquitto/data
      - /data/mqtt/logs:/mosquitto/log
    networks:
      - edge-internal
    cpus: "0.5"
    mem_limit: 128m

  # ============================================
  # VPN Client (WireGuard)
  # ============================================
  vpn-client:
    image: lscr.io/linuxserver/wireguard:latest
    container_name: vpn-client
    restart: unless-stopped
    cap_add:
      - NET_ADMIN
      - SYS_MODULE
    environment:
      - PUID=1000
      - PGID=1000
    volumes:
      - ./config/wireguard:/config/wg0.conf:ro
      - /lib/modules:/lib/modules:ro
    sysctls:
      - net.ipv4.conf.all.src_valid_mark=1
      - net.ipv4.ip_forward=1
    networks:
      - vpn-net
    cpus: "0.5"
    mem_limit: 128m

  # ============================================
  # Prometheus (Metrics)
  # ============================================
  prometheus:
    image: prom/prometheus:v2.50.0
    container_name: prometheus
    restart: unless-stopped
    ports:
      - "127.0.0.1:9090:9090"
    volumes:
      - ./config/prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - /data/prometheus:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=7d'
    networks:
      - edge-internal
    cpus: "0.5"
    mem_limit: 256m

  # ============================================
  # Node Exporter (Host Metrics)
  # ============================================
  node-exporter:
    image: prom/node-exporter:v1.7.0
    container_name: node-exporter
    restart: unless-stopped
    volumes:
      - /proc:/host/proc:ro
      - /sys:/host/sys:ro
      - /:/rootfs:ro
    command:
      - '--path.procfs=/host/proc'
      - '--path.rootfs=/rootfs'
      - '--path.sysfs=/host/sys'
      - '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)'
    networks:
      - edge-internal
    cpus: "0.25"
    mem_limit: 64m

networks:
  edge-internal:
    driver: bridge
    ipam:
      config:
        - subnet: 172.28.0.0/16
  vpn-net:
    driver: bridge
    internal: true  # Only containers can access this network
```

### 8.4 Dockerfiles

#### 8.4.1 Stream Manager Dockerfile

```dockerfile
# docker/Dockerfile.stream-manager
FROM python:3.11-slim-bookworm AS base

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    ffmpeg \
    libsm6 \
    libxext6 \
    libgl1-mesa-glx \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY src/ ./src/

# Create non-root user
RUN useradd -m -u 1000 appuser && \
    mkdir -p /data/hls /data/buffer /data/logs /app/logs && \
    chown -R appuser:appuser /app /data

USER appuser

# Expose API port
EXPOSE 8080

# Health check
HEALTHCHECK --interval=30s --timeout=5s --start-period=30s --retries=3 \
    CMD curl -f http://localhost:8080/api/v1/status || exit 1

# Run the application
CMD ["python", "-m", "src.main"]
```

#### 8.4.2 Frame Extractor Dockerfile

```dockerfile
# docker/Dockerfile.frame-extractor
FROM python:3.11-slim-bookworm

RUN apt-get update && apt-get install -y --no-install-recommends \
    ffmpeg \
    libsm6 \
    libxext6 \
    libgl1-mesa-glx \
    libgomp1 \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/processing/ ./src/processing/
COPY src/mqtt/ ./src/mqtt/
COPY src/core/ ./src/core/
COPY src/utils/ ./src/utils/

RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

CMD ["python", "-m", "src.processing.frame_extractor"]
```

#### 8.4.3 Buffer Manager Dockerfile

```dockerfile
# docker/Dockerfile.buffer-manager
FROM python:3.11-slim-bookworm

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/storage/ ./src/storage/
COPY src/core/ ./src/core/
COPY src/utils/ ./src/utils/

RUN useradd -m -u 1000 appuser && \
    mkdir -p /data/buffer && \
    chown -R appuser:appuser /app /data
USER appuser

CMD ["python", "-m", "src.storage.buffer_manager"]
```

### 8.5 Requirements.txt

```txt
# Core Framework
fastapi==0.110.0
uvicorn[standard]==0.27.1
python-multipart==0.0.9
websockets==12.0

# Async
aiohttp==3.9.3
aiofiles==23.2.1
aiortc==1.8.0

# MQTT
paho-mqtt==1.6.1

# Computer Vision
opencv-python-headless==4.9.0.84
numpy==1.26.4
Pillow==10.2.0

# ONVIF
onvif-zeep==0.2.12

# Configuration
PyYAML==6.0.1
python-dotenv==1.0.1

# Data Validation
pydantic==2.6.3
pydantic-settings==2.2.1

# Monitoring
prometheus-client==0.20.0

# Testing
pytest==8.0.2
pytest-asyncio==0.23.5
pytest-cov==4.1.0
httpx==0.27.0

# Utilities
httpx==0.27.0
tenacity==8.2.3
structlog==24.1.0
orjson==3.9.15
```

### 8.6 Configuration Files

#### 8.6.1 Camera Configuration (cameras.yaml)

```yaml
# config/cameras.yaml
cameras:
  - channel: 1
    name: "Gate/Entry"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "main_gate"
      criticality: "high"

  - channel: 2
    name: "Production Floor A"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "production_a"
      criticality: "high"

  - channel: 3
    name: "Production Floor B"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "production_b"
      criticality: "medium"

  - channel: 4
    name: "Warehouse"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "warehouse"
      criticality: "medium"

  - channel: 5
    name: "Loading Dock"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "loading_dock"
      criticality: "medium"

  - channel: 6
    name: "Office Corridor"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: false
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "office"
      criticality: "low"

  - channel: 7
    name: "Server Room"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 15
    post_event_seconds: 45
    tags:
      location: "server_room"
      criticality: "high"

  - channel: 8
    name: "Perimeter"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "perimeter"
      criticality: "high"

settings:
  default_ai_fps: 5.0
  default_inference_resolution: "640x480"
  default_ai_mode: "hybrid"
  stream_stall_timeout: 10
  reconnect_max_delay: 30
  circuit_breaker:
    failure_threshold: 5
    recovery_timeout: 60
    half_open_max_calls: 3
    success_threshold: 2
```

#### 8.6.2 Gateway Configuration (gateway.yaml)

```yaml
# config/gateway.yaml
gateway:
  id: "edge-ind-01"
  location: "Plant Floor A"
  description: "Edge gateway for CP PLUS DVR surveillance"

network:
  lan_interface: "eth0"
  lan_ip: "192.168.29.10"
  subnet: "255.255.255.0"
  gateway: "192.168.29.1"
  dns:
    - "8.8.8.8"
    - "1.1.1.1"

vpn:
  enabled: true
  type: "wireguard"
  tunnel_ip: "10.100.0.10"
  server_endpoint: "cloud-gateway.example.com:51820"
  persistent_keepalive: 25
  mtu: 1400

dvr:
  host: "192.168.29.200"
  rtsp_port: 554
  http_port: 80
  https_port: 443
  username: "admin"
  password: "${DVR_PASSWORD}"  # From environment
  channels: 8
  onvif_enabled: true

streams:
  transport: "tcp"
  buffer_size: 65536
  stall_timeout: 10
  reconnect:
    initial_delay: 1.0
    max_delay: 30.0
    multiplier: 2.0
    jitter: 0.1

ai:
  inference_fps: 5.0
  keyframe_only: false
  mode: "hybrid"
  resolution: "640x480"
  confidence_threshold: 0.6
  mqtt_topic_prefix: "ai/frames"

hls:
  segment_duration: 2
  window_size: 5
  output_dir: "/data/hls"
  cleanup_interval: 60

storage:
  buffer_max_size_gb: 20
  buffer_retention_hours: 72
  hls_cache_max_size_gb: 5
  log_max_size_gb: 1
  upload:
    endpoint: "https://10.100.0.1/api/upload"
    max_retries: 5
    retry_base_delay: 5
    max_retry_delay: 300
    concurrent_uploads: 3
    verify_ssl: true

monitoring:
  health_check_interval: 30
  metrics_port: 9090
  web_ui_port: 8080
  log_level: "INFO"
  log_format: "json"

security:
  api_auth_enabled: true
  jwt_secret: "${JWT_SECRET}"
  jwt_expiry_hours: 24
  allowed_dashboard_hosts:
    - "https://dashboard.example.com"
```

#### 8.6.3 Mosquitto MQTT Configuration

```conf
# config/mosquitto.conf
# MQTT Broker for internal edge communication

listener 1883 0.0.0.0
protocol mqtt

# Persistence
persistence true
persistence_location /mosquitto/data/

# Logging
log_dest file /mosquitto/log/mosquitto.log
log_type error
log_type warning
log_type information
connection_messages true

# Security — local network only
allow_anonymous true
# In production, use password_file and ACL
# password_file /mosquitto/config/passwd
# acl_file /mosquitto/config/acl

# Limits
max_connections 100
message_size_limit 10485760  # 10MB max (for image frames)
max_inflight_messages 100
max_queued_messages 1000
```

#### 8.6.4 Prometheus Configuration

```yaml
# config/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  - job_name: 'node-exporter'
    static_configs:
      - targets: ['node-exporter:9100']

  - job_name: 'stream-manager'
    static_configs:
      - targets: ['stream-manager:8080']
    metrics_path: '/metrics'
```

### 8.7 Environment File (.env)

```bash
# .env — Environment variables for Docker Compose
# Copy to .env and fill in sensitive values

# Gateway Identity
GATEWAY_ID=edge-ind-01
LOG_LEVEL=INFO

# DVR Credentials (REQUIRED)
DVR_USERNAME=admin
DVR_PASSWORD=your_dvr_password_here

# AI Pipeline
AI_INFERENCE_FPS=5
AI_KEYFRAME_ONLY=true
AI_RESOLUTION=640x480

# Cloud Upload
UPLOAD_ENDPOINT=https://10.100.0.1/api/upload
UPLOAD_MAX_RETRIES=5

# VPN
VPN_ENABLED=true

# Buffer
BUFFER_MAX_SIZE_GB=20
BUFFER_RETENTION_HOURS=72

# JWT Secret for API Auth
JWT_SECRET=change_this_to_a_random_32_char_string
```

### 8.8 Main Application Entry Point

```python
# src/main.py
import asyncio
import logging
import signal
import sys
from contextlib import asynccontextmanager

import uvicorn
from fastapi import FastAPI

from .api.server import create_app
from .core.config import GatewayConfig
from .core.logging_setup import setup_logging
from .streams.manager import StreamManager, StreamManagerConfig
from .streams.registry import CameraRegistry
from .streams.ffmpeg_builder import FFmpegCommandBuilder
from .streams.connector import ReconnectConfig
from .streams.circuit_breaker import CircuitBreakerConfig
from .health.monitor import HealthMonitor
from .health.metrics import MetricsCollector

logger = logging.getLogger(__name__)

class EdgeGateway:
    """Main application class — initializes and coordinates all services."""

    def __init__(self):
        self.config = GatewayConfig.load()
        setup_logging(self.config.log_level, self.config.log_format)
        self.registry = CameraRegistry(self.config.camera_config_path)
        self.stream_manager: Optional[StreamManager] = None
        self.health_monitor: Optional[HealthMonitor] = None
        self.metrics = MetricsCollector()
        self._shutdown_event = asyncio.Event()

    async def initialize(self):
        """Initialize all services."""
        logger.info(f"Starting Edge Gateway: {self.config.gateway_id}")

        # Load camera configuration
        await self.registry.load()

        # Build FFmpeg command builder
        ffmpeg_builder = FFmpegCommandBuilder()

        # Configure stream manager
        manager_config = StreamManagerConfig(
            reconnect=ReconnectConfig(
                initial_delay=self.config.reconnect_initial_delay,
                max_delay=self.config.reconnect_max_delay,
                stall_timeout=self.config.stream_stall_timeout,
            ),
            circuit_breaker=CircuitBreakerConfig(
                failure_threshold=self.config.cb_failure_threshold,
                recovery_timeout=self.config.cb_recovery_timeout,
            ),
            ffmpeg=ffmpeg_builder,
        )

        self.stream_manager = StreamManager(
            config=manager_config,
            registry=self.registry,
            metrics=self.metrics,
        )

        self.health_monitor = HealthMonitor(
            gateway_id=self.config.gateway_id,
            dvr_host=self.config.dvr_host,
            vpn_endpoint=self.config.vpn_server_ip,
            check_interval=self.config.health_check_interval,
        )

        # Wire up health callbacks
        self.health_monitor.on_health_change(self._on_health_change)

        # Start services
        await self.stream_manager.start()
        await self.health_monitor.start()

        logger.info("Edge Gateway initialized successfully")

    def _on_health_change(self, report):
        """Handle health status changes."""
        if report.overall_status == "critical":
            logger.critical(f"Gateway health CRITICAL: {report.issues}")
        elif report.overall_status == "degraded":
            logger.warning(f"Gateway health degraded: {report.issues}")

    async def run_api_server(self):
        """Run the FastAPI HTTP server."""
        app = create_app(self.stream_manager, self.health_monitor, self.config)

        config = uvicorn.Config(
            app=app,
            host="0.0.0.0",
            port=self.config.web_ui_port,
            log_level=self.config.log_level.lower(),
            access_log=True,
        )
        server = uvicorn.Server(config)
        await server.serve()

    async def run(self):
        """Main run loop."""
        await self.initialize()

        # Run API server and wait for shutdown
        try:
            await asyncio.gather(
                self.run_api_server(),
                self._shutdown_event.wait(),
                return_exceptions=True,
            )
        except asyncio.CancelledError:
            pass
        finally:
            await self.shutdown()

    async def shutdown(self):
        """Graceful shutdown."""
        logger.info("Shutting down Edge Gateway...")
        if self.stream_manager:
            await self.stream_manager.stop()
        if self.health_monitor:
            await self.health_monitor.stop()
        logger.info("Edge Gateway stopped")

    def handle_signal(self, sig):
        """Handle OS signals for graceful shutdown."""
        logger.info(f"Received signal {sig.name}")
        self._shutdown_event.set()


def main():
    gateway = EdgeGateway()

    # Register signal handlers
    loop = asyncio.get_event_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, lambda s=sig: gateway.handle_signal(s))

    try:
        loop.run_until_complete(gateway.run())
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
    finally:
        loop.run_until_complete(gateway.shutdown())
        loop.close()


if __name__ == "__main__":
    main()
```

### 8.9 Startup Script

```bash
#!/bin/bash
# scripts/start.sh — Edge Gateway startup script

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
ENV_FILE="${PROJECT_DIR}/.env"

# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'

echo "========================================"
echo "  Edge Gateway Startup"
echo "========================================"

# Check environment file
if [[ ! -f "$ENV_FILE" ]]; then
    echo -e "${RED}ERROR: .env file not found at ${ENV_FILE}${NC}"
    echo "Please copy .env.example to .env and fill in credentials"
    exit 1
fi

# Load environment
set -a
source "$ENV_FILE"
set +a

# Validate required variables
if [[ -z "${DVR_PASSWORD:-}" ]]; then
    echo -e "${RED}ERROR: DVR_PASSWORD is not set in .env${NC}"
    exit 1
fi

# Create data directories
echo "Creating data directories..."
mkdir -p /data/{hls,buffer/alert_clips,buffer/continuous,buffer/pre_event,logs,config,mqtt,prometheus}
chmod 755 /data/*

# Check DVR connectivity
echo "Checking DVR connectivity (${DVR_HOST:-192.168.29.200})..."
if ping -c 1 -W 2 "${DVR_HOST:-192.168.29.200}" > /dev/null 2>&1; then
    echo -e "${GREEN}DVR is reachable${NC}"
else
    echo -e "${YELLOW}WARNING: DVR is not reachable — streams will fail${NC}"
fi

# Check Docker
echo "Checking Docker..."
if ! docker info > /dev/null 2>&1; then
    echo -e "${RED}ERROR: Docker is not running${NC}"
    exit 1
fi

# Pull latest images
echo "Pulling Docker images..."
docker compose -f "${PROJECT_DIR}/docker-compose.yml" pull

# Start services
echo "Starting Edge Gateway services..."
docker compose -f "${PROJECT_DIR}/docker-compose.yml" up -d --remove-orphans

# Wait for services
echo "Waiting for services to start..."
sleep 10

# Health check
echo "Running health check..."
MAX_RETRIES=12
RETRY=0
while [[ $RETRY -lt $MAX_RETRIES ]]; do
    if curl -sf http://localhost:8080/api/v1/status > /dev/null 2>&1; then
        echo -e "${GREEN}Edge Gateway is healthy!${NC}"
        echo ""
        echo "Diagnostic UI: http://localhost:8080/"
        echo "API Status:    http://localhost:8080/api/v1/status"
        echo "HLS Streams:   http://localhost:8081/hls/ch{N}.m3u8"
        exit 0
    fi
    RETRY=$((RETRY + 1))
    echo "  Retry ${RETRY}/${MAX_RETRIES}..."
    sleep 5
done

echo -e "${RED}ERROR: Gateway failed health check${NC}"
echo "Check logs: docker compose logs stream-manager"
exit 1
```



---

## 9. Appendix

### 9.1 Complete FFmpeg Command Reference

#### A. Single-Channel Operations

```bash
# A1. Probe stream information (codec, resolution, bitrate)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -v quiet -print_format json -show_format -show_streams 2>/dev/null

# A2. Save single-frame snapshot
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -vf "select='eq(n,0)'" -vframes 1 -q:v 2 \
  "/data/snapshots/ch1_$(date +%Y%m%d_%H%M%S).jpg"

# A3. Record 60-second clip to MP4
ffmpeg -rtsp_transport tcp -stimeout 5000000 -t 60 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a copy -movflags +faststart \
  "/data/clips/ch1_$(date +%Y%m%d_%H%M%S).mp4"

# A4. Transcode to H.264 for cloud compatibility
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v libx264 -preset fast -tune zerolatency -crf 23 \
  -maxrate 2000k -bufsize 4000k -g 50 \
  -c:a aac -b:a 128k \
  -f mp4 -movflags +faststart \
  "/data/clips/ch1_transcoded_$(date +%Y%m%d_%H%M%S).mp4"

# A5. Extract audio only
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -vn -c:a aac -b:a 128k \
  "/data/audio/ch1_$(date +%Y%m%d_%H%M%S).aac"

# A6. Stream to RTMP relay (e.g., YouTube, custom server)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a aac -b:a 128k -ar 44100 \
  -f flv "rtmp://relay-server.example.com/live/ch1"

# A7. Generate thumbnail grid (contact sheet)
ffmpeg -rtsp_transport tcp -stimeout 5000000 -t 60 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -vf "fps=1/5,scale=320:240,tile=4x3" -vframes 1 \
  "/data/snapshots/ch1_contact.jpg"

# A8. Motion detection using FFmpeg scene filter
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -vf "select='gt(scene,0.1)',scale=640:480" -f image2pipe -vcodec mjpeg - \
  > /dev/null 2>&1 | grep -o 'scene:[0-9.]*' | head -20

# A9. Low-latency WebRTC-compatible output
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v libx264 -preset ultrafast -tune zerolatency -profile:v baseline \
  -vf "scale=640:480" -b:v 1000k -g 30 \
  -c:a opus -b:a 64k \
  -f webm pipe:1

# A10. MJPEG stream for simple HTTP embedding
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v mjpeg -q:v 5 -vf "fps=15,scale=640:480" \
  -f mpjpeg -boundary_tag ffmpeg \
  "http://localhost:8081/mjpeg/ch1"
```

#### B. Multi-Channel Operations

```bash
# B1. Simultaneous 4-channel recording
for CH in 1 2 3 4; do
  ffmpeg -rtsp_transport tcp -stimeout 5000000 \
    -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=${CH}&subtype=0" \
    -c:v copy -c:a copy -t 300 \
    -movflags +faststart \
    "/data/clips/ch${CH}_$(date +%Y%m%d_%H%M%S).mp4" &
done
wait

# B2. 8-channel status check (probe all streams)
for CH in $(seq 1 8); do
  echo "=== Channel ${CH} ==="
  ffprobe -v error -rtsp_transport tcp -stimeout 3000000 \
    -select_streams v:0 \
    -show_entries stream=codec_name,width,height,r_frame_rate,bit_rate \
    -of csv=s=,:p=0 \
    "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=${CH}&subtype=0" \
    2>/dev/null || echo "  UNREACHABLE"
done

# B3. Mosaic view (all 8 channels in 4x2 grid)
ffmpeg \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=1" \
  -filter_complex "
    [0:v]scale=480:270[v0]; [1:v]scale=480:270[v1];
    [2:v]scale=480:270[v2]; [3:v]scale=480:270[v3];
    [4:v]scale=480:270[v4]; [5:v]scale=480:270[v5];
    [6:v]scale=480:270[v6]; [7:v]scale=480:270[v7];
    [v0][v1][v2][v3]hstack=inputs=4[row1];
    [v4][v5][v6][v7]hstack=inputs=4[row2];
    [row1][row2]vstack=inputs=2[out]
  " \
  -map "[out]" -c:v libx264 -preset fast -b:v 4000k -f hls \
  -hls_time 2 -hls_list_size 5 -hls_flags delete_segments \
  "/data/hls/mosaic.m3u8"

# B4. Batch transcode all channels to H.264
for CH in $(seq 1 8); do
  ffmpeg -rtsp_transport tcp -stimeout 5000000 -t 60 \
    -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=${CH}&subtype=0" \
    -c:v libx264 -preset fast -crf 28 -vf "scale=640:480" \
    -c:a aac -b:a 64k -ac 1 \
    -movflags +faststart \
    "/data/clips/ch${CH}_compact.mp4" &
done
wait
echo "All channels transcoded"
```

#### C. HLS-Specific Operations

```bash
# C1. HLS with AES-128 encryption
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a copy \
  -f hls -hls_time 2 -hls_list_size 5 \
  -hls_key_info_file "/data/config/hls_key_info.txt" \
  -hls_segment_filename "/data/hls/ch1_enc_%03d.ts" \
  "/data/hls/ch1_enc.m3u8"

# C2. HLS with independent segments (for CDN compatibility)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a copy \
  -f hls -hls_time 2 -hls_list_size 5 \
  -hls_flags independent_segments+delete_segments+omit_endlist \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8"

# C3. Multi-variant HLS (adaptive bitrate)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -filter_complex "[0:v]split=2[high][low];[low]scale=640:480[lowv]" \
  -map "[high]" -map 0:a -c:v:0 copy -c:a:0 copy \
  -map "[lowv]" -c:v:1 libx264 -preset fast -b:v:1 800k -c:a:1 aac -b:a:1 64k \
  -var_stream_map "v:0,a:0,name:high v:1,a:1,name:low" \
  -f hls -hls_time 2 -hls_list_size 5 \
  -hls_flags delete_segments+omit_endlist \
  -master_pl_name "ch1_master.m3u8" \
  -hls_segment_filename "/data/hls/ch1_%v_%03d.ts" \
  "/data/hls/ch1_%v.m3u8"
```

### 9.2 API Endpoint Reference

| Method | Endpoint | Description | Auth |
|---|---|---|---|
| `GET` | `/` | Diagnostic HTML UI | No |
| `GET` | `/api/v1/status` | Full gateway status | No |
| `GET` | `/api/v1/cameras` | List all cameras | Yes |
| `GET` | `/api/v1/cameras/{ch}` | Get camera config | Yes |
| `PUT` | `/api/v1/cameras/{ch}` | Update camera config | Yes |
| `POST` | `/api/v1/cameras/{ch}/restart` | Restart stream | Yes |
| `POST` | `/api/v1/cameras/{ch}/enable` | Enable camera | Yes |
| `POST` | `/api/v1/cameras/{ch}/disable` | Disable camera | Yes |
| `GET` | `/api/v1/streams/{ch}/status` | Stream status detail | Yes |
| `GET` | `/api/v1/streams/{ch}/stats` | Stream statistics | Yes |
| `POST` | `/api/v1/streams/{ch}/start` | Start stream | Yes |
| `POST` | `/api/v1/streams/{ch}/stop` | Stop stream | Yes |
| `GET` | `/api/v1/hls/{ch}.m3u8` | HLS playlist | No* |
| `GET` | `/api/v1/hls/{segment}.ts` | HLS segment | No* |
| `GET` | `/api/v1/buffer/stats` | Buffer statistics | Yes |
| `GET` | `/api/v1/buffer/uploads/pending` | Pending uploads | Yes |
| `POST` | `/api/v1/buffer/uploads/{id}/retry` | Retry upload | Yes |
| `GET` | `/api/v1/health` | Health check (simple) | No |
| `GET` | `/api/v1/health/detailed` | Detailed health | Yes |
| `GET` | `/api/v1/metrics` | Prometheus metrics | No |
| `WS` | `/ws/live` | Real-time status WebSocket | Yes |

*HLS endpoints use token-based URL signing for security.

### 9.3 MQTT Topic Structure

| Topic | Direction | Payload | Description |
|---|---|---|---|
| `ai/frames/{channel}` | Edge → Cloud | JPEG bytes + metadata | AI inference frames |
| `ai/detections/{channel}` | Edge → Cloud | JSON | Detection results |
| `ai/events/{channel}` | Edge → Cloud | JSON | Triggered events |
| `streams/status/{channel}` | Edge → Cloud | JSON | Stream health status |
| `gateway/health` | Edge → Cloud | JSON | Gateway health report |
| `gateway/metrics` | Edge → Cloud | JSON | Resource metrics |
| `control/cameras/{channel}/restart` | Cloud → Edge | JSON | Restart command |
| `control/cameras/{channel}/config` | Cloud → Edge | JSON | Config update |
| `control/gateway/reboot` | Cloud → Edge | Empty | Gateway reboot |
| `alerts/critical` | Edge → Cloud | JSON | Critical alerts |
| `buffer/uploads/status` | Edge → Cloud | JSON | Upload progress |

### 9.4 Failure Mode Matrix

| Failure | Detection | Response | Fallback | Recovery |
|---|---|---|---|---|
| DVR network unreachable | ICMP ping fails | Mark all streams OFFLINE | Use cached last frame | Auto-retry every 30s |
| Single channel RTSP fail | No frames for 10s | Exponential backoff reconnect | Circuit breaker opens | Auto-recovery test |
| All 8 channels fail | Mass disconnect | Alert to cloud, enter degraded mode | Static placeholder | DVR power cycle suggestion |
| VPN tunnel down | WG handshake fail | Queue uploads locally | Local-only mode | Auto-reconnect WG |
| Disk full (>95%) | df monitor | Emergency cleanup | Drop oldest clips | Alert + cleanup |
| FFmpeg crash | Process exit code | Restart container | Retry with delay | Docker auto-restart |
| MQTT broker down | Connection timeout | Buffer messages in memory | File-based queuing | Auto-reconnect |
| High CPU usage | >90% for 60s | Reduce AI FPS | Skip non-key frames | Auto-scale down |
| Memory pressure | >90% usage | Drop frames, OOM protection | Reduce buffer sizes | Kernel OOM killer |
| Cloud upload fail | HTTP 5xx | Retry with backoff | Local retention | Infinite retry |
| SSL certificate expiry | TLS handshake fail | Log warning | Insecure fallback (dev only) | Auto-renewal |
| Power loss | Hardware event | UPS graceful shutdown | None (hardware) | Auto-start on boot |

### 9.5 Network Port Reference

| Port | Protocol | Service | Direction | Notes |
|---|---|---|---|---|
| 80/tcp | HTTP | DVR Web UI | Edge → DVR | Local only |
| 443/tcp | HTTPS | DVR Web UI | Edge → DVR | Local only |
| 554/tcp | RTSP | Video streams | Edge → DVR | Primary protocol |
| 554/udp | RTSP/UDP | Video streams | Edge → DVR | Fallback |
| 25001/tcp | TCP | DVR control | Edge → DVR | CP PLUS protocol |
| 25002/udp | UDP | DVR control | Edge → DVR | CP PLUS protocol |
| 123/udp | NTP | Time sync | Edge → Gateway | Or local NTP |
| 8080/tcp | HTTP | Gateway API | External → Edge | Diagnostic UI |
| 8081/tcp | HTTP | HLS server | External → Edge | Video segments |
| 1883/tcp | MQTT | Message broker | Internal | Local only |
| 51820/udp | WireGuard | VPN tunnel | Edge ↔ Cloud | Encrypted |
| 9090/tcp | HTTP | Prometheus | Internal | Metrics |
| 9100/tcp | HTTP | Node Exporter | Internal | Host metrics |

### 9.6 Performance Benchmarks (Expected)

| Metric | Intel NUC i5 | RPi 5 | Jetson Orin Nano |
|---|---|---|---|
| 8x RTSP ingest (sub) | <5% CPU | <15% CPU | <10% CPU |
| 8x RTSP ingest (main) | <10% CPU | <30% CPU | <15% CPU |
| HLS segment generation | <5% CPU | <10% CPU | <5% CPU |
| AI frame extraction (8ch) | <20% CPU | <50% CPU* | <10% CPU (GPU) |
| Upload throughput | 50+ Mbps | 20+ Mbps | 30+ Mbps |
| End-to-end latency | 500-1000ms | 800-1500ms | 600-1200ms |
| Power consumption | 15-25W | 5-8W | 7-15W |

*RPi 5 may struggle with 8-channel simultaneous AI extraction; consider 4-channel max.

### 9.7 Security Checklist

- [ ] DVR password changed from default
- [ ] WireGuard PSK configured
- [ ] VPN uses strong crypto (Curve25519, ChaCha20-Poly1305)
- [ ] API uses JWT authentication
- [ ] HLS URLs use signed tokens (time-limited)
- [ ] DVR Web UI not exposed externally
- [ ] MQTT broker bound to localhost only
- [ ] Docker containers run as non-root
- [ ] Secrets in `.env`, not in code
- [ ] `.env` file permissions: 600
- [ ] SSL certificates for API HTTPS
- [ ] Log rotation configured
- [ ] Fail2ban for API brute force protection
- [ ] Network segmentation (DVR on isolated VLAN)
- [ ] Regular firmware updates for DVR
- [ ] Backup of gateway configuration

### 9.8 Glossary

| Term | Definition |
|---|---|
| **DVR** | Digital Video Recorder — CP PLUS CP-UVR-0801E1-CV2 |
| **UVR** | Universal Video Recorder (supports analog + IP cameras) |
| **RTSP** | Real-Time Streaming Protocol (RFC 7826) |
| **ONVIF** | Open Network Video Interface Forum standard |
| **HLS** | HTTP Live Streaming (Apple) |
| **DASH** | Dynamic Adaptive Streaming over HTTP |
| **WebRTC** | Web Real-Time Communication |
| **GOP** | Group of Pictures (I-frame + P-frames + B-frames) |
| **CRF** | Constant Rate Factor (quality-based encoding) |
| **DSCP** | Differentiated Services Code Point (QoS marking) |
| **MTU** | Maximum Transmission Unit |
| **EWMA** | Exponentially Weighted Moving Average |
| **Circuit Breaker** | Fault tolerance pattern for failing services |
| **Ring Buffer** | Circular fixed-size buffer overwriting oldest data |
| **Keyframe/I-frame** | Intra-coded frame (full image, no reference) |
| **P-frame** | Predicted frame (references previous frame) |
| **MJPEG** | Motion JPEG (sequence of JPEG images) |
| **WireGuard** | Modern VPN protocol using UDP |

---

*Document version 1.0.0 — Video Ingestion Subsystem Design for CP PLUS ORANGE Series DVR*
*Generated for AI-powered industrial surveillance platform*
