Video Ingestion

Video Ingestion

RTSP, edge gateway, buffering, and live stream handling.

AI-Powered Industrial Surveillance Platform — Video Ingestion Subsystem Design

Document Information

Attribute Value
Document Version 1.0.0
Target DVR CP PLUS ORANGE CP-UVR-0801E1-CV2
DVR Hardware V1.0, System V4.001.00AT001.3.0
Channels 8 active
Resolution 960x1080 per channel
DVR Network 192.168.29.200/24, Static
ONVIF V2.6.1.867657 (Server V19.06)
Date 2025
Classification Technical Specification — Video Ingestion Subsystem

Table of Contents

  1. System Architecture Overview
  2. RTSP Stream Configuration
  3. Edge Gateway Design
  4. Stream Discovery & Management
  5. Stream Processing Pipeline
  6. Live Streaming for Dashboard
  7. Edge Buffering & Local Storage
  8. Implementation Specifications
  9. Appendix

1. System Architecture Overview

1.1 High-Level Topology

+-----------------------------------------------------------------------------+
|                              CLOUD VPC (VPN TUNNEL)                          |
|  +------------------+  +------------------+  +---------------------------+  |
|  | Stream Relay     |  | AI Inference     |  | Dashboard / NVR           |  |
|  | (HLS/DASH/WebRTC)|  | Service          |  | Web UI                    |  |
|  | Port: 443        |  | GPU Cluster      |  | React/Vue Frontend        |  |
|  +--------+---------+  +--------+---------+  +---------------------------+  |
|           |                     |                                            |
|  +--------+---------+           |                                            |
|  | Metadata Bus     |           | (WebSocket: frame metadata)               |
|  | MQTT/Kafka       |<----------+                                            |
|  | Port: 8883/9094  |                                                        |
|  +--------+---------+                                                        |
+----------|------------------------------------------------------------------+
           |
           | WireGuard / OpenVPN Tunnel (UDP 51820 or TCP 443)
           |
+----------v------------------------------------------------------------------+
|                         EDGE GATEWAY (Local Network)                         |
|  Hardware: Intel NUC 13 Pro / Raspberry Pi 5 / NVIDIA Jetson Nano            |
|  OS: Ubuntu 22.04 LTS (Server)                                             |
|  LAN IP: 192.168.29.10/24                                                   |
|                                                                              |
|  +------------------+  +------------------+  +---------------------------+  |
|  | Stream Manager   |  | FFmpeg Pipeline  |  | HLS Segmenter           |  |
|  | (Python/asyncio) |  | (8x RTSP feeds)  |  | (live streaming)        |  |
|  | Port: 8080       |  | RAM buffers      |  | Port: 8081              |  |
|  +--------+---------+  +--------+---------+  +---------------------------+  |
|           |                     |                                           |
|  +--------v---------+  +--------v---------+  +---------------------------+ |
|  | Health Monitor   |  | Frame Extractor  |  | Buffer Manager            | |
|  | & Watchdog       |  | (AI decimation)  |  | (ring buffer + upload)    | |
|  +------------------+  +--------+---------+  +---------------------------+ |
|                                 |                                            |
|           +---------------------v---------------------+                      |
|           |           VPN Client (WireGuard)          |                      |
|           +---------------------+---------------------+                      |
+-------------------------------|----------------------------------------------+
                                |
                                | RTSP over TCP/UDP
                                | ONVIF over HTTP
                                |
+-------------------------------v----------------------------------------------+
|                         CP PLUS DVR (192.168.29.200)                         |
|  +----+----+----+----+----+----+----+----+                                 |
|  |CH1 |CH2 |CH3 |CH4 |CH5 |CH6 |CH7 |CH8 |  8x 960x1080 @ 25fps         |
|  +----+----+----+----+----+----+----+----+                                 |
|  RTSP:554  ONVIF:80  TCP:25001  UDP:25002  HTTP:80  HTTPS:443             |
+------------------------------------------------------------------------------+

1.2 Data Flow Summary

[DVR RTSP Stream] --> [Edge Gateway] --> [FFmpeg Demux] --> [Multi-target routing]
                                                                   |
                                    +----------------+-------------+----------------+
                                    |                |             |                |
                              [HLS Segmenter]  [Frame Extractor] [Buffer Mgr]   [Live Relay]
                                    |                |             |                |
                              [Dashboard]      [AI Inference] [Cloud Upload]  [Cloud Stream]
                                    |                |             |                |
                              WebSocket      MQTT/Kafka    S3/MinIO      CDN/Relay

1.3 Network Specification

Parameter Value
DVR IP 192.168.29.200
DVR Subnet 255.255.255.0 (/24)
DVR Gateway 192.168.29.1
Edge Gateway LAN IP 192.168.29.10 (static)
VPN Tunnel Network 10.100.0.0/24 (WireGuard)
Cloud Relay Endpoint 10.100.0.1 (VPN server)
RTSP Transport TCP interleaved (preferred) / UDP
MTU 1400 (WireGuard tunnel)
QoS DSCP EF (46) for RTSP, DSCP AF41 (34) for HLS

1.4 Bandwidth Budget (960x1080 @ 25fps)

Stream Type Codec Expected Bitrate 8-Channel Total
Main Stream (primary) H.265 2-4 Mbps 16-32 Mbps
Sub Stream (secondary) H.264 512-1024 Kbps 4-8 Mbps
AI Extraction Stream H.264 256-512 Kbps 2-4 Mbps
HLS Output (live) H.264 1-2 Mbps 8-16 Mbps
Total LAN Ingress ~30-60 Mbps
Cloud Egress (VPN) ~8-16 Mbps (optimized)

2. RTSP Stream Configuration

2.1 RTSP URL Patterns for CP PLUS ORANGE Series

CP PLUS ORANGE Series DVRs use a Dahua-compatible RTSP URL scheme. Two URL formats are supported:

Primary (Main Stream) — Full Resolution, Higher Bitrate

rtsp://admin:{PASSWORD}@192.168.29.200:554/cam/realmonitor?channel={N}&subtype=0

Secondary (Sub Stream) — Lower Resolution, Lower Bitrate

rtsp://admin:{PASSWORD}@192.168.29.200:554/cam/realmonitor?channel={N}&subtype=1

Alternative Format (Media URL with authentication in headers)

rtsp://192.168.29.200:554/cam/realmonitor?channel={N}&subtype={M}&unicast=true&proto=Onvif

Auth via DESCRIBE request Authorization: Digest username="admin", ... header.

2.2 Channel-to-URL Mapping

Channel Location Primary URL (subtype=0) Secondary URL (subtype=1)
CH1 Gate/Entry rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0 rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1
CH2 Production Floor A rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=0 rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=1
CH3 Production Floor B rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=0 rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=1
CH4 Warehouse rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=0 rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=1
CH5 Loading Dock rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=0 rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=1
CH6 Office Corridor rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=0 rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=1
CH7 Server Room rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=0 rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=1
CH8 Perimeter rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=0 rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=1

2.3 Stream Usage Strategy

Purpose Stream Type Rationale
AI Inference / Frame Extraction Sub Stream (subtype=1) Lower bandwidth, sufficient for detection, reduces compute
Live Dashboard (grid view) Sub Stream (subtype=1) 8-camera grid needs lower per-stream bandwidth
Live Dashboard (fullscreen) Main Stream (subtype=0) Full quality for single-camera focus
Event Recording / Alert Clips Main Stream (subtype=0) Maximum quality for forensic review
Cloud Archive Main Stream (subtype=0) Best quality for long-term storage

2.4 Codec Expectations

Parameter Main Stream (subtype=0) Sub Stream (subtype=1)
Video Codec H.265 (HEVC) or H.264 H.264 (AVC)
Audio Codec G.711A/u (if enabled) G.711A/u (if enabled)
Resolution 960x1080 CIF to D1 (typically 352x288 or 704x576)
Frame Rate 25 fps 15-25 fps
Bitrate 2048-4096 Kbps 512-1024 Kbps
GOP Size 25-50 frames 25-50 frames
Pixel Format YUV420P YUV420P

Note: The DVR disk is FULL (0 bytes free). This means the DVR cannot perform local recording. Stream ingestion must not depend on DVR-side recording or playback features. All archival is edge-managed.

2.5 FFmpeg RTSP Connection Parameters

-rtsp_transport tcp          # Force TCP interleaved (NAT/firewall friendly)
-stimeout 5000000            # Socket timeout in microseconds (5 seconds)
-reconnect 1                 # Enable reconnection
-reconnect_at_eof 1          # Reconnect at EOF
-reconnect_streamed 1        # Reconnect streamed content
-reconnect_delay_max 30      # Max reconnect delay (seconds)
-fflags +genpts              # Generate presentation timestamps
-flags +low_delay            # Low latency mode
-probesize 5000000           # Probe size for codec detection
-analyzeduration 2000000     # Analyze duration in microseconds

2.6 Critical RTSP Notes for CP PLUS DVR

  1. TCP Interleaved is mandatory — UDP may drop behind NAT; CP PLUS DVRs work most reliably with rtsp_transport tcp
  2. Authentication — Uses Digest authentication (Basic is disabled on newer firmware)
  3. Session limit — The DVR supports a finite number of concurrent RTSP sessions (typically 10-20). With 8 channels and dual streams, the gateway may open up to 16 concurrent sessions. Stay within limits.
  4. No audio in subtype=1 — Sub streams typically carry no audio; main stream may include audio
  5. Channel numbering — Channels are 1-indexed (1-8), not 0-indexed
  6. Disk full impact — Zero bytes free on DVR means: (a) no local recording possible, (b) some DVR functions may be degraded, (c) stream serving typically still works but monitor performance

3. Edge Gateway Design

3.1 Hardware Specifications

Recommended Platform: Intel NUC 13 Pro (i5-1340P)

Component Specification
CPU Intel Core i5-1340P (12 cores, 16 threads)
RAM 16GB DDR4-3200
Storage 512GB NVMe SSD (for buffering)
LAN Intel i226-V 2.5GbE
USB 3x USB 3.2 (for expansion)
Power 19V DC, 65W
OS Ubuntu 22.04.4 LTS Server (no GUI)
Docker Docker CE 25.x, Docker Compose 2.x

Alternative: NVIDIA Jetson Orin Nano (for on-device AI inference)

Component Specification
CPU 6-core Arm Cortex-A78AE
GPU 1024-core NVIDIA Ampere
RAM 8GB LPDDR5
AI Perf 40 TOPS
Storage 256GB NVMe via M.2
Power 7W-15W
OS JetPack 6.0 (Ubuntu 22.04 based)

Minimum Viable: Raspberry Pi 5

Component Specification
CPU BCM2712 2.4GHz quad-core
RAM 8GB LPDDR4X
Storage 256GB USB3 NVMe
LAN Gigabit Ethernet
Power 5V 5A USB-C
Note Sufficient for 8x sub-stream ingestion + HLS relay; may struggle with 8x main stream

3.2 Edge Gateway Software Architecture

+-------------------------- Edge Gateway (Docker Host) ------------------------+
|                                                                             |
|  +-------------------------+  +-------------------------+  +--------------+ |
|  |   stream-manager        |  |   hls-segmenter         |  |   vpn-client | |
|  |   (Python/FastAPI)      |  |   (FFmpeg/nginx-rtmp)   |  |   (WireGuard)| |
|  |   Port: 8080            |  |   Port: 8081            |  |   Port: 51820| |
|  |                         |  |                         |  |              | |
|  |  - Camera registry      |  |  - HLS manifest gen     |  |  - Persistent| |
|  |  - Stream lifecycle     |  |  - Segment rotation     |  |    tunnel    | |
|  |  - Health monitoring    |  |  - DRM/auth tokens      |  |  - Auto-rec  | |
|  |  - REST API             |  |  - Adaptive bitrate     |  |  - Kill sw   | |
|  +-------------------------+  +-------------------------+  +--------------+ |
|            |                           |                                   |
|  +---------v---------+        +--------v---------+  +-------------------+ |
|  |   frame-extractor |        |   buffer-manager |  |   health-monitor  | |
|  |   (Python/CV2)    |        |   (Python)       |  |   (Python)        | |
|  |                   |        |                  |  |                   | |
|  |  - Keyframe decim.|        |  - Ring buffer   |  |  - DVR ping       | |
|  |  - JPEG/PNG encode|        |  - Upload queue  |  |  - Stream FPS chk | |
|  |  - MQTT publish   |        |  - Retry logic   |  |  - Watchdog       | |
|  |  - Buffer frames  |        |  - Disk mgmt     |  |  - Alert emit     | |
|  +---------+---------+        +--------+---------+  +-------------------+ |
|            |                           |                                   |
|  +---------v---------+                 |                                   |
|  |   mqtt-client     |<----------------+                                   |
|  |   (Eclipse-MQTT)  |                                                     |
|  |   Port: 1883      |                                                     |
|  +-------------------+                                                     |
|                                                                             |
|  Shared Volumes:                                                            |
|    /data/buffer/    - Alert clip ring buffer (20GB max)                    |
|    /data/hls/       - HLS segments for dashboard (5GB max)                 |
|    /data/config/    - YAML/JSON configuration files                        |
|    /data/logs/      - Application logs (rotated, 1GB max)                  |
|    /tmp/ffmpeg/     - Temporary FFmpeg pipes and segments                  |
+-----------------------------------------------------------------------------+

3.3 Container Specifications

Container Image CPU Limit Memory Ports Purpose
stream-manager Custom Python 3.11 slim 2 cores 512MB 8080/tcp Core orchestration
ffmpeg-worker-1 jrottenberg/ffmpeg:6.1 2 cores 256MB CH1-CH4 RTSP ingestion
ffmpeg-worker-2 jrottenberg/ffmpeg:6.1 2 cores 256MB CH5-CH8 RTSP ingestion
hls-segmenter alpine/nginx + ffmpeg 1 core 256MB 8081/tcp HLS manifest serving
buffer-manager Custom Python 3.11 slim 1 core 256MB Clip storage & upload
mqtt-client eclipse-mosquitto:2.0 0.5 core 128MB 1883/tcp Local MQTT broker
vpn-client lscr.io/linuxserver/wireguard 0.5 core 128MB VPN tunnel
health-monitor Custom Python 3.11 slim 0.5 core 128MB System watchdog
prometheus prom/prometheus 0.5 core 256MB 9090/tcp Metrics collection
node-exporter prom/node-exporter 0.25 core 64MB 9100/tcp Host metrics

3.4 VPN Tunnel Specification (WireGuard)

# /etc/wireguard/wg0.conf (Edge Gateway)
[Interface]
PrivateKey = {EDGE_PRIVATE_KEY}
Address = 10.100.0.10/24
DNS = 10.100.0.1
MTU = 1400

# Persistent keepalive for NAT traversal
PersistentKeepalive = 25

# Post-up rules: mark packets for routing
PostUp = iptables -t mangle -A OUTPUT -p tcp --dport 554 -j DSCP --set-dscp-class EF
PostUp = iptables -t nat -A POSTROUTING -o wg0 -j MASQUERADE

# Post-down: cleanup
PostDown = iptables -t mangle -D OUTPUT -p tcp --dport 554 -j DSCP --set-dscp-class EF || true
PostDown = iptables -t nat -D POSTROUTING -o wg0 -j MASQUERADE || true

[Peer]
PublicKey = {CLOUD_PUBLIC_KEY}
PresharedKey = {PSK}
AllowedIPs = 10.100.0.0/24
Endpoint = cloud-gateway.example.com:51820
PersistentKeepalive = 25

Cloud-side peer configuration:

# Cloud VPN Server
[Peer]
PublicKey = {EDGE_PUBLIC_KEY}
PresharedKey = {PSK}
AllowedIPs = 10.100.0.10/32

VPN Routing Table:

Destination Gateway Interface Notes
192.168.29.0/24 192.168.29.1 eth0 Local DVR network
10.100.0.0/24 wg0 VPN tunnel network
0.0.0.0/0 192.168.29.1 eth0 Default route
10.100.0.1 wg0 Cloud relay (via tunnel)

3.5 Edge Gateway Environment Variables

# Core
GATEWAY_ID=edge-ind-01
GATEWAY_LOCATION="Plant Floor A"
LOG_LEVEL=INFO

# DVR Connection
DVR_HOST=192.168.29.200
DVR_RTSP_PORT=554
DVR_HTTP_PORT=80
DVR_USERNAME=admin
DVR_PASSWORD={SECRET_FROM_ENV_FILE}
DVR_CHANNELS=8

# Streams
STREAM_TRANSPORT=tcp          # tcp or udp
STREAM_PRIMARY_SUBTYPE=0      # main stream
STREAM_SECONDARY_SUBTYPE=1    # sub stream
STREAM_BUFFER_SIZE=65536      # RTSP buffer
STREAM_RECONNECT_MAX_DELAY=30 # seconds
STREAM_STALL_TIMEOUT=10       # seconds without frame = stalled

# AI Pipeline
AI_INFERENCE_FPS=5            # extract 5 fps for AI
AI_KEYFRAME_ONLY=true         # only I-frames for analysis
AI_RESOLUTION=640x480         # resize for inference
AI_MQTT_TOPIC=ai/frames       # MQTT topic prefix
AI_CONFIDENCE_THRESHOLD=0.6

# HLS
HLS_SEGMENT_DURATION=2        # seconds per segment
HLS_WINDOW_SIZE=5             # segments in playlist
HLS_OUTPUT_DIR=/data/hls
HLS_CLEANUP_INTERVAL=60       # seconds

# Buffer/Storage
BUFFER_MAX_SIZE_GB=20         # ring buffer max size
BUFFER_RETENTION_HOURS=72     # max age before deletion
UPLOAD_ENDPOINT=https://10.100.0.1/api/upload
UPLOAD_MAX_RETRIES=5
UPLOAD_RETRY_BASE_DELAY=5     # seconds
UPLOAD_CHUNK_SIZE_MB=10

# Monitoring
HEALTH_CHECK_INTERVAL=30      # seconds
METRICS_PORT=9090
WEB_UI_PORT=8080

# VPN
VPN_ENABLED=true
VPN_TUNNEL_IP=10.100.0.10
CLOUD_RELAY_IP=10.100.0.1

4. Stream Discovery & Management

4.1 ONVIF-Based Discovery

CP PLUS DVR ONVIF server V19.06 (V2.6.1.867657) supports:

  • Device Management Service (core)
  • Media Service (stream URI retrieval)
  • PTZ Service (if applicable)
  • Event Service (basic)

ONVIF Endpoints:

Device Service:    http://192.168.29.200:80/onvif/device_service
Media Service:     http://192.168.29.200:80/onvif/media_service
Event Service:     http://192.168.29.200:80/onvif/event_service

Discovery workflow:

# Pseudocode for ONVIF discovery
from onvif import ONVIFCamera

def discover_cameras(dvr_host, dvr_port, username, password):
    camera = ONVIFCamera(dvr_host, dvr_port, username, password)
    
    # Get device info
    devicemgmt = camera.create_devicemgmt_service()
    device_info = devicemgmt.GetDeviceInformation()
    # Returns: Manufacturer, Model, FirmwareVersion, SerialNumber, HardwareId
    
    # Get media profiles (streams)
    media = camera.create_media_service()
    profiles = media.GetProfiles()
    
    for profile in profiles:
        # Get stream URI for each profile
        token = profile.token
        stream_uri = media.GetStreamUri(
            StreamSetup={'Stream': 'RTP_unicast', 'Transport': {'Protocol': 'RTSP'}},
            ProfileToken=token
        )
        # Returns: rtsp://192.168.29.200:554/cam/realmonitor?channel=N&subtype=M
        
        # Get video encoder configuration
        if profile.VideoEncoderConfiguration:
            ve = profile.VideoEncoderConfiguration
            # ve.Resolution, ve.RateControl.BitrateLimit, ve.RateControl.FrameRateLimit
            
        # Get video source configuration
        if profile.VideoSourceConfiguration:
            vs = profile.VideoSourceConfiguration
            # vs.Bounds (resolution), vs.ViewMode
    
    return profiles

Expected ONVIF Response for CP PLUS CP-UVR-0801E1-CV2:

<GetDeviceInformationResponse>
    <Manufacturer>CP PLUS</Manufacturer>
    <Model>CP-UVR-0801E1-CV2</Model>
    <FirmwareVersion>V4.001.00AT001.3.0</FirmwareVersion>
    <SerialNumber>{device_serial}</SerialNumber>
    <HardwareId>V1.0</HardwareId>
</GetDeviceInformationResponse>

4.2 Manual Camera Configuration

When ONVIF is unavailable or for manual override, cameras are defined in YAML:

# /data/config/cameras.yaml
cameras:
  - channel: 1
    name: "Gate/Entry"
    enabled: true
    primary_stream:
      url: "rtsp://admin:{PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0"
      codec: "h265"
      resolution: "960x1080"
      fps: 25
    secondary_stream:
      url: "rtsp://admin:{PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1"
      codec: "h264"
      resolution: "704x576"
      fps: 15
    ai_config:
      enabled: true
      use_secondary: true
      inference_fps: 5
      keyframe_only: true
      roi: [0, 0, 960, 1080]
    live_config:
      dashboard_default: "secondary"  # grid view uses sub stream
      fullscreen_uses: "primary"
    recording_config:
      enabled: true
      use_primary: true
      pre_event_seconds: 10
      post_event_seconds: 30

  - channel: 2
    name: "Production Floor A"
    enabled: true
    # ... (same structure)

  # ... CH3 through CH8

settings:
  default_ai_fps: 5
  default_inference_resolution: "640x480"
  stream_stall_timeout: 10
  reconnect_max_delay: 30
  circuit_breaker:
    failure_threshold: 5
    recovery_timeout: 60

4.3 Stream Puller with FFmpeg — Full Command Reference

4.3.1 Basic Stream Pull (Validation)

# Channel 1 - Main Stream (validation probe)
ffmpeg -rtsp_transport tcp \
  -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c copy -f null - \
  -v quiet -print_format json \
  -show_format -show_streams \
  2>/dev/null | jq .

4.3.2 AI Frame Extraction (Keyframe, Decimated)

# Extract I-frames only at 5fps for AI inference (sub stream)
ffmpeg -rtsp_transport tcp -stimeout 5000000 -reconnect 1 -reconnect_at_eof 1 \
  -reconnect_streamed 1 -reconnect_delay_max 30 \
  -fflags +genpts+discardcorrupt -flags +low_delay \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1" \
  -vf "select='eq(pict_type,I)',fps=5,scale=640:480" \
  -f image2pipe -vcodec mjpeg -q:v 2 \
  pipe:1

4.3.3 HLS Segment Generation (All 8 Channels)

# Single channel HLS (primary stream)
ffmpeg -rtsp_transport tcp -stimeout 5000000 -reconnect 1 -reconnect_at_eof 1 \
  -reconnect_streamed 1 -reconnect_delay_max 30 \
  -fflags +genpts+discardcorrupt -flags +low_delay \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a aac -b:a 128k \
  -f hls -hls_time 2 -hls_list_size 5 -hls_flags delete_segments+omit_endlist \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8"

4.3.4 Simultaneous Multi-Output (Single FFmpeg Instance)

# Single RTSP input -> HLS + frame extraction + recording (tee pseudo-muxer)
ffmpeg -rtsp_transport tcp -stimeout 5000000 -reconnect 1 -reconnect_at_eof 1 \
  -reconnect_streamed 1 -reconnect_delay_max 30 \
  -fflags +genpts+discardcorrupt -flags +low_delay \
  -probesize 5000000 -analyzeduration 2000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  \
  # Output 1: HLS for live viewing
  -c:v copy -c:a copy \
  -f hls -hls_time 2 -hls_list_size 5 \
  -hls_flags delete_segments+omit_endlist \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8" \
  \
  # Output 2: Transcoded sub-stream for AI (scaled, keyframes)
  -vf "select='eq(pict_type,I)',fps=5,scale=640:480" \
  -f image2pipe -vcodec mjpeg -q:v 2 \
  pipe:1 \
  \
  # Output 3: MP4 recording (on event trigger)
  -c:v copy -c:a copy \
  -f segment -segment_time 60 -segment_format mp4 \
  -reset_timestamps 1 \
  -strftime 1 "/data/buffer/ch1_%Y%m%d_%H%M%S.mp4"

4.3.5 UDP Fallback Command (when TCP fails)

ffmpeg -rtsp_transport udp -buffer_size 65536 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c copy -f hls -hls_time 2 -hls_list_size 5 \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8"

4.3.8 Complete 8-Channel Parallel Launch Script

#!/bin/bash
# /usr/local/bin/start_streams.sh
# Launches all 8 channel streams with individual control

DVR_IP="192.168.29.200"
DVR_USER="admin"
DVR_PASS="${DVR_PASSWORD}"
HLS_DIR="/data/hls"
BUFFER_DIR="/data/buffer"
FFMPEG_OPTS="-rtsp_transport tcp -stimeout 5000000 -reconnect 1 -reconnect_at_eof 1 \
  -reconnect_streamed 1 -reconnect_delay_max 30 -fflags +genpts+discardcorrupt \
  -flags +low_delay -probesize 5000000 -analyzeduration 2000000 -nostdin"

for CH in {1..8}; do
  # Determine stream type: channels 1-4 use main, 5-8 use sub (bandwidth management)
  if [ $CH -le 4 ]; then
    SUBTYPE=0
    QUALITY="high"
  else
    SUBTYPE=1
    QUALITY="medium"
  fi

  RTSP_URL="rtsp://${DVR_USER}:${DVR_PASS}@${DVR_IP}:554/cam/realmonitor?channel=${CH}&subtype=${SUBTYPE}"

  # Launch with process monitoring
  ffmpeg ${FFMPEG_OPTS} \
    -i "${RTSP_URL}" \
    -c:v copy -c:a copy \
    -f hls -hls_time 2 -hls_list_size 5 \
    -hls_flags delete_segments+omit_endlist \
    -hls_segment_filename "${HLS_DIR}/ch${CH}_%03d.ts" \
    "${HLS_DIR}/ch${CH}.m3u8" \
    2>"${BUFFER_DIR}/ch${CH}_ffmpeg.log" &

  echo $! > "/var/run/stream_ch${CH}.pid"
  echo "Started channel ${CH} (${QUALITY}) with PID $!"
done

echo "All 8 channels launched. PIDs stored in /var/run/stream_ch*.pid"

4.4 Auto-Reconnect Strategy

# Auto-reconnect with exponential backoff
import asyncio
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)

class StreamState(Enum):
    CONNECTING = "connecting"
    ONLINE = "online"
    STALLED = "stalled"
    OFFLINE = "offline"
    CIRCUIT_OPEN = "circuit_open"

@dataclass
class ReconnectConfig:
    initial_delay: float = 1.0
    max_delay: float = 30.0
    multiplier: float = 2.0
    jitter: float = 0.1
    stall_timeout: float = 10.0
    circuit_breaker_threshold: int = 5
    circuit_breaker_recovery: float = 60.0

class StreamConnector:
    """
    Manages a single RTSP stream connection with:
    - Exponential backoff reconnect
    - Frame-level stall detection
    - Circuit breaker for persistent failures
    - Graceful shutdown
    """

    def __init__(self, channel: int, config: ReconnectConfig, ffmpeg_cmd: list):
        self.channel = channel
        self.cfg = config
        self.ffmpeg_cmd = ffmpeg_cmd
        self.state = StreamState.OFFLINE
        self.current_delay = config.initial_delay
        self.consecutive_failures = 0
        self.circuit_open_time: Optional[float] = None
        self.last_frame_time: Optional[float] = None
        self._process: Optional[asyncio.subprocess.Process] = None
        self._task: Optional[asyncio.Task] = None
        self._shutdown = False

    async def start(self):
        self._task = asyncio.create_task(self._connection_loop())
        asyncio.create_task(self._stall_monitor())

    async def _connection_loop(self):
        while not self._shutdown:
            if self.state == StreamState.CIRCUIT_OPEN:
                elapsed = asyncio.get_event_loop().time() - self.circuit_open_time
                if elapsed < self.cfg.circuit_breaker_recovery:
                    await asyncio.sleep(1)
                    continue
                # Circuit breaker recovery
                self.consecutive_failures = 0
                self.state = StreamState.OFFLINE
                logger.info(f"CH{self.channel}: Circuit breaker recovered")

            if self.state == StreamState.OFFLINE:
                await self._attempt_connect()
            elif self.state == StreamState.STALLED:
                await self._handle_stall()

            await asyncio.sleep(0.5)

    async def _attempt_connect(self):
        self.state = StreamState.CONNECTING
        logger.info(f"CH{self.channel}: Connecting (delay={self.current_delay:.1f}s)...")
        await asyncio.sleep(self.current_delay)

        try:
            self._process = await asyncio.create_subprocess_exec(
                *self.ffmpeg_cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )

            # Wait for process to stabilize
            await asyncio.sleep(3)

            if self._process.returncode is not None:
                raise ConnectionError(f"FFmpeg exited with code {self._process.returncode}")

            self.state = StreamState.ONLINE
            self.last_frame_time = asyncio.get_event_loop().time()
            self.current_delay = self.cfg.initial_delay
            self.consecutive_failures = 0
            logger.info(f"CH{self.channel}: Online")

            # Monitor the stream
            await self._monitor_stream()

        except Exception as e:
            self.consecutive_failures += 1
            self.current_delay = min(
                self.current_delay * self.cfg.multiplier,
                self.cfg.max_delay
            )

            if self.consecutive_failures >= self.cfg.circuit_breaker_threshold:
                self.state = StreamState.CIRCUIT_OPEN
                self.circuit_open_time = asyncio.get_event_loop().time()
                logger.error(
                    f"CH{self.channel}: Circuit breaker OPEN after "
                    f"{self.consecutive_failures} failures"
                )
            else:
                self.state = StreamState.OFFLINE
                logger.warning(f"CH{self.channel}: Connection failed ({e}), "
                               f"retry in {self.current_delay:.1f}s")

    async def _monitor_stream(self):
        """Monitor stderr output for frame reception indicators."""
        while self.state == StreamState.ONLINE and self._process:
            try:
                line = await asyncio.wait_for(
                    self._process.stderr.readline(), timeout=5.0
                )
                if line:
                    self.last_frame_time = asyncio.get_event_loop().time()
                    # Parse frame info from FFmpeg output
                    # Example: "frame=  125 fps= 25 q=-1.0 Lsize= ..."
                    decoded = line.decode('utf-8', errors='ignore').strip()
                    if 'frame=' in decoded:
                        self._update_metrics(decoded)
            except asyncio.TimeoutError:
                logger.debug(f"CH{self.channel}: No stderr output (may be normal)")

    async def _stall_monitor(self):
        """Independent task that detects stalls based on frame timestamps."""
        while not self._shutdown:
            await asyncio.sleep(1)
            if self.state == StreamState.ONLINE and self.last_frame_time:
                elapsed = asyncio.get_event_loop().time() - self.last_frame_time
                if elapsed > self.cfg.stall_timeout:
                    logger.warning(
                        f"CH{self.channel}: STALL detected — "
                        f"no frames for {elapsed:.1f}s"
                    )
                    self.state = StreamState.STALLED

    async def _handle_stall(self):
        """Kill the stuck process and reconnect."""
        logger.info(f"CH{self.channel}: Restarting stalled stream")
        await self._kill_process()
        self.state = StreamState.OFFLINE
        self.current_delay = self.cfg.initial_delay

    async def _kill_process(self):
        if self._process and self._process.returncode is None:
            try:
                self._process.terminate()
                await asyncio.wait_for(self._process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                self._process.kill()
                await self._process.wait()
        self._process = None

    def _update_metrics(self, line: str):
        """Parse and publish frame metrics."""
        # Parse: frame=  125 fps= 25 q=-1.0
        metrics = {}
        for part in line.split():
            if '=' in part:
                key, _, val = part.partition('=')
                metrics[key] = val
        if 'fps' in metrics:
            self.current_fps = float(metrics['fps'])

    async def stop(self):
        self._shutdown = True
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        await self._kill_process()

    @property
    def is_healthy(self) -> bool:
        return self.state == StreamState.ONLINE

4.5 Circuit Breaker Pattern

# circuit_breaker.py
import time
from enum import Enum
from dataclasses import dataclass

class CircuitState(Enum):
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Failing fast
    HALF_OPEN = "half_open" # Testing recovery

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5      # failures before opening
    recovery_timeout: float = 60.0   # seconds before half-open
    half_open_max_calls: int = 3    # test calls in half-open state
    success_threshold: int = 2      # consecutive successes to close

class CircuitBreaker:
    """Circuit breaker for persistent stream failures."""

    def __init__(self, name: str, config: CircuitBreakerConfig):
        self.name = name
        self.cfg = config
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.successes = 0
        self.last_failure_time: Optional[float] = None
        self.half_open_calls = 0

    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.successes += 1
            self.half_open_calls += 1
            if self.successes >= self.cfg.success_threshold:
                self._close_circuit()
        elif self.state == CircuitState.CLOSED:
            self.failures = max(0, self.failures - 1)

    def record_failure(self) -> bool:
        self.failures += 1
        self.last_failure_time = time.time()

        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.cfg.half_open_max_calls:
                self._open_circuit()
            return False

        if self.failures >= self.cfg.failure_threshold:
            self._open_circuit()
            return False
        return True  # Still allowed

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.cfg.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                self.successes = 0
                return True
            return False
        if self.state == CircuitState.HALF_OPEN:
            return self.half_open_calls < self.cfg.half_open_max_calls
        return False

    def _open_circuit(self):
        self.state = CircuitState.OPEN
        self.failures = self.cfg.failure_threshold
        # Alert published here

    def _close_circuit(self):
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.successes = 0
        self.half_open_calls = 0

4.6 Per-Camera Status Model

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class CameraStatus:
    """Real-time status for a single camera stream."""
    channel: int
    name: str
    enabled: bool = True
    state: str = "OFFLINE"           # OFFLINE, CONNECTING, ONLINE, STALLED, ERROR
    stream_type: str = "secondary"   # primary or secondary currently active
    current_fps: float = 0.0
    target_fps: float = 25.0
    current_bitrate_kbps: float = 0.0
    resolution: str = "unknown"
    codec: str = "unknown"
    last_frame_time: Optional[datetime] = None
    connection_time: Optional[datetime] = None
    disconnect_count: int = 0
    total_bytes_received: int = 0
    latency_ms: float = 0.0
    keyframe_interval: float = 0.0
    circuit_breaker_state: str = "CLOSED"
    error_message: Optional[str] = None
    ai_fps_actual: float = 0.0
    ai_frames_processed: int = 0
    hls_segments_available: int = 0
    recording_active: bool = False
    tags: dict = field(default_factory=dict)

    @property
    def is_healthy(self) -> bool:
        return self.state == "ONLINE" and self.current_fps > 0

    @property
    def uptime_seconds(self) -> float:
        if self.connection_time:
            return (datetime.utcnow() - self.connection_time).total_seconds()
        return 0.0

    def to_dict(self) -> dict:
        return {
            "channel": self.channel,
            "name": self.name,
            "enabled": self.enabled,
            "state": self.state,
            "stream_type": self.stream_type,
            "fps": {
                "current": round(self.current_fps, 1),
                "target": self.target_fps,
            },
            "bitrate_kbps": round(self.current_bitrate_kbps, 1),
            "resolution": self.resolution,
            "codec": self.codec,
            "last_frame_time": self.last_frame_time.isoformat() if self.last_frame_time else None,
            "uptime_seconds": self.uptime_seconds,
            "disconnect_count": self.disconnect_count,
            "circuit_breaker": self.circuit_breaker_state,
            "ai_fps_actual": round(self.ai_fps_actual, 1),
            "hls_ready": self.hls_segments_available > 0,
            "healthy": self.is_healthy,
        }

5. Stream Processing Pipeline

5.1 Pipeline Architecture

+----------+    +-------------+    +------------------+    +---------------+
|  RTSP    | -> |   FFmpeg    | -> |  Frame Router    | -> |  Multiple     |
|  Source  |    |   Demuxer   |    |  & Tee           |    |  Consumers    |
+----------+    +-------------+    +------------------+    +---------------+
                                           |
                    +----------------------+----------------------+
                    |                      |                      |
            +-------v-------+    +---------v---------+    +-------v--------+
            | HLS Segmenter |    | Frame Extractor   |    | Clip Recorder  |
            | (live view)   |    | (AI inference)    |    | (events)       |
            +---------------+    +---------+---------+    +----------------+
                                          |
                              +-----------+-----------+
                              |                       |
                    +---------v---------+    +--------v---------+
                    | Keyframe Decoder  |    | JPEG Encoder     |
                    | (I-frame select)  |    | (AI input)       |
                    +---------+---------+    +--------+---------+
                              |                       |
                    +---------v---------+    +--------v---------+
                    | Frame Buffer      |    | MQTT Publisher   |
                    | (ring queue)      |    | (metadata)       |
                    +-------------------+    +------------------+

5.2 Frame Extraction for AI Inference

The AI inference pipeline extracts frames from the sub-stream (lower bandwidth) at a configurable decimated rate. Keyframes (I-frames) are preferred to reduce decode overhead.

5.2.1 Keyframe-Based Extraction Strategy

Mode Description Use Case
keyframe_only Extract only I-frames Maximum efficiency, may skip motion
fixed_fps Extract at fixed FPS regardless of frame type Smooth temporal analysis
adaptive I-frames + P-frames when motion detected Balanced quality and efficiency
hybrid Fixed FPS with I-frame priority Best overall

Recommended: hybrid mode — 5 FPS extraction with I-frame priority

# frame_extractor.py — Frame extraction pipeline for AI inference
import asyncio
import cv2
import numpy as np
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Callable, AsyncIterator
import subprocess
import io
import hashlib

logger = logging.getLogger(__name__)

@dataclass
class FrameConfig:
    target_fps: float = 5.0
    mode: str = "hybrid"          # keyframe_only, fixed_fps, adaptive, hybrid
    resolution: tuple = (640, 480)
    jpeg_quality: int = 85
    use_keyframes: bool = True
    motion_threshold: float = 0.02  # for adaptive mode

@dataclass
class ExtractedFrame:
    channel: int
    timestamp: datetime
    frame_number: int
    is_keyframe: bool
    raw_bytes: bytes
    width: int
    height: int
    motion_score: float = 0.0
    jpeg_bytes: Optional[bytes] = None

class FrameExtractor:
    """
    Extracts decimated frames from RTSP stream for AI inference.
    Supports multiple extraction modes with configurable FPS.
    """

    def __init__(self, channel: int, config: FrameConfig):
        self.channel = channel
        self.cfg = config
        self._callback: Optional[Callable[[ExtractedFrame], None]] = None
        self._running = False
        self._frame_count = 0
        self._last_extract_time = 0.0
        self._extract_interval = 1.0 / config.target_fps
        self._prev_gray: Optional[np.ndarray] = None

    def on_frame(self, callback: Callable[[ExtractedFrame], None]):
        """Register callback for extracted frames."""
        self._callback = callback

    async def start(self, rtsp_url: str):
        """Start frame extraction from RTSP stream."""
        self._running = True
        ffmpeg_cmd = self._build_ffmpeg_cmd(rtsp_url)

        process = await asyncio.create_subprocess_exec(
            *ffmpeg_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )

        try:
            await self._read_frames(process)
        finally:
            self._running = False
            if process.returncode is None:
                process.kill()
                await process.wait()

    def _build_ffmpeg_cmd(self, rtsp_url: str) -> list:
        """Build FFmpeg command for frame extraction."""
        w, h = self.cfg.resolution

        # Base filters
        filters = f"fps={self.cfg.target_fps},scale={w}:{h}"

        if self.cfg.use_keyframes:
            if self.cfg.mode == "keyframe_only":
                filters = f"select='eq(pict_type,I)',{filters}"
            elif self.cfg.mode == "hybrid":
                # I-frame priority with fixed FPS fallback
                filters = f"select='eq(pict_type,I)+gte(n,prev_selected_n+{int(25/self.cfg.target_fps)})',{filters}"

        cmd = [
            "ffmpeg",
            "-rtsp_transport", "tcp",
            "-stimeout", "5000000",
            "-reconnect", "1",
            "-reconnect_at_eof", "1",
            "-reconnect_streamed", "1",
            "-reconnect_delay_max", "30",
            "-fflags", "+genpts+discardcorrupt",
            "-flags", "+low_delay",
            "-i", rtsp_url,
            "-vf", filters,
            "-f", "image2pipe",
            "-vcodec", "mjpeg",
            "-q:v", str(int((100 - self.cfg.jpeg_quality) / 10)),
            "-"
        ]
        return cmd

    async def _read_frames(self, process: asyncio.subprocess.Process):
        """Read MJPEG frames from FFmpeg pipe."""
        buffer = b""
        jpeg_start = b'\xff\xd8'  # JPEG SOI marker
        jpeg_end = b'\xff\xd9'    # JPEG EOI marker

        while self._running:
            chunk = await process.stdout.read(65536)
            if not chunk:
                break

            buffer += chunk

            # Find complete JPEG frames
            start_idx = buffer.find(jpeg_start)
            while start_idx != -1:
                end_idx = buffer.find(jpeg_end, start_idx)
                if end_idx == -1:
                    break

                jpeg_data = buffer[start_idx:end_idx + 2]
                buffer = buffer[end_idx + 2:]

                await self._process_jpeg(jpeg_data)
                start_idx = buffer.find(jpeg_start)

            # Prevent buffer bloat
            if len(buffer) > 10 * 1024 * 1024:
                buffer = buffer[-5 * 1024 * 1024:]

    async def _process_jpeg(self, jpeg_data: bytes):
        """Process extracted JPEG frame."""
        self._frame_count += 1

        # Decode for metadata and motion detection
        nparr = np.frombuffer(jpeg_data, np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

        if frame is None:
            return

        h, w = frame.shape[:2]
        is_keyframe = self._frame_count % int(25 / self.cfg.target_fps) == 1

        # Motion detection (for adaptive mode)
        motion_score = 0.0
        if self.cfg.mode in ("adaptive", "hybrid") and self._prev_gray is not None:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            resized = cv2.resize(gray, (w // 4, h // 4))
            flow = cv2.calcOpticalFlowFarneback(
                self._prev_gray, resized, None,
                0.5, 3, 15, 3, 5, 1.2, 0
            )
            motion_score = np.mean(np.abs(flow))
            self._prev_gray = resized
        elif self.cfg.mode in ("adaptive", "hybrid"):
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            self._prev_gray = cv2.resize(gray, (w // 4, h // 4))

        extracted = ExtractedFrame(
            channel=self.channel,
            timestamp=datetime.utcnow(),
            frame_number=self._frame_count,
            is_keyframe=is_keyframe,
            raw_bytes=frame.tobytes(),
            width=w,
            height=h,
            motion_score=float(motion_score),
            jpeg_bytes=jpeg_data,
        )

        if self._callback:
            try:
                if asyncio.iscoroutinefunction(self._callback):
                    await self._callback(extracted)
                else:
                    self._callback(extracted)
            except Exception as e:
                logger.error(f"Frame callback error: {e}")

    async def stop(self):
        self._running = False


class MultiChannelExtractor:
    """Manages frame extraction across all 8 channels."""

    def __init__(self, max_concurrent: int = 8):
        self.extractors: dict[int, FrameExtractor] = {}
        self.tasks: dict[int, asyncio.Task] = {}
        self.max_concurrent = max_concurrent
        self._frame_queue: asyncio.Queue[ExtractedFrame] = asyncio.Queue(maxsize=100)

    async def add_channel(self, channel: int, rtsp_url: str, config: FrameConfig):
        extractor = FrameExtractor(channel, config)
        extractor.on_frame(self._on_frame)
        self.extractors[channel] = extractor
        self.tasks[channel] = asyncio.create_task(
            extractor.start(rtsp_url),
            name=f"extractor-ch{channel}"
        )

    async def _on_frame(self, frame: ExtractedFrame):
        try:
            self._frame_queue.put_nowait(frame)
        except asyncio.QueueFull:
            # Drop oldest frame to prevent blocking
            try:
                self._frame_queue.get_nowait()
                self._frame_queue.put_nowait(frame)
            except asyncio.QueueEmpty:
                pass

    async def frame_consumer(self, handler: Callable[[ExtractedFrame], None]):
        """Consume frames from the queue."""
        while True:
            frame = await self._frame_queue.get()
            try:
                if asyncio.iscoroutinefunction(handler):
                    await handler(frame)
                else:
                    handler(frame)
            except Exception as e:
                logger.error(f"Frame handler error: {e}")
            finally:
                self._frame_queue.task_done()

    async def stop_all(self):
        for extractor in self.extractors.values():
            await extractor.stop()
        for task in self.tasks.values():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

5.3 Simultaneous Stream Forking (Live + AI)

For each channel, the gateway must simultaneously provide:

  1. HLS live stream — for dashboard viewing (full quality)
  2. AI frame stream — decimated frames for inference
  3. Alert clip stream — triggered recordings

Approach: Single FFmpeg → Multi-target output using tee pseudo-muxer

# Advanced: Single RTSP input, multiple outputs with tee muxer
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -fflags +genpts+discardcorrupt -flags +low_delay \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  \
  # Split into two video streams using split filter
  -filter_complex "[0:v]split=2[live][ai];[ai]fps=5,scale=640:480[aiout]" \
  \
  # Output 1: HLS live stream (copy codec, no re-encode)
  -map "[live]" -map 0:a? -c:v copy -c:a copy \
  -f hls -hls_time 2 -hls_list_size 5 \
  -hls_flags delete_segments+omit_endlist+program_date_time \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8" \
  \
  # Output 2: AI frame stream (MJPEG to pipe)
  -map "[aiout]" -an \
  -f image2pipe -vcodec mjpeg -q:v 3 \
  pipe:1 \
  \
  # Output 3: Continuous recording (segmented MP4)
  -map 0:v -map 0:a? -c:v copy -c:a copy \
  -f segment -segment_time 60 -segment_format mp4 \
  -reset_timestamps 1 -strftime 1 \
  "/data/buffer/ch1_continuous/%Y%m%d_%H%M%S.mp4"

5.4 Resolution Adaptation

The gateway supports dynamic resolution adaptation based on network conditions:

Scenario Action Trigger
Bandwidth sufficient Full resolution (960x1080) VPN throughput > 16 Mbps
Bandwidth constrained Downscale to 640x720 VPN throughput < 8 Mbps
Severe constraint Switch to sub-stream (352x288) VPN throughput < 4 Mbps
Recovery detected Gradually upscale Sustained > threshold for 30s
class AdaptiveBitrateController:
    """
    Monitors VPN tunnel bandwidth and adjusts stream quality.
    Uses EWMA (exponentially weighted moving average) for smoothing.
    """

    QUALITY_LEVELS = {
        "full": {"resolution": (960, 1080), "stream": "primary"},
        "medium": {"resolution": (640, 720), "stream": "primary"},
        "low": {"resolution": (352, 288), "stream": "secondary"},
    }

    def __init__(self, min_bandwidth_mbps: float = 4.0):
        self.ewma_alpha = 0.3
        self.current_bandwidth = 100.0  # Start optimistic
        self.min_bandwidth = min_bandwidth_mbps
        self.current_quality = "full"
        self._history: list[float] = []

    def update_bandwidth(self, measured_mbps: float):
        self.current_bandwidth = (
            self.ewma_alpha * measured_mbps +
            (1 - self.ewma_alpha) * self.current_bandwidth
        )
        self._history.append(self.current_bandwidth)
        if len(self._history) > 100:
            self._history = self._history[-50:]

    def get_quality(self) -> str:
        bw = self.current_bandwidth
        if bw > 16:
            target = "full"
        elif bw > 8:
            target = "medium"
        else:
            target = "low"

        # Hysteresis: only downgrade immediately, upgrade with delay
        if self.QUALITY_LEVELS[target]["resolution"][0] < \
           self.QUALITY_LEVELS[self.current_quality]["resolution"][0]:
            # Downgrade immediately
            self.current_quality = target
        elif target != self.current_quality:
            # Check sustained improvement before upgrading
            if len(self._history) >= 10 and \
               all(b > self.QUALITY_LEVELS[target]["resolution"][0] / 60
                   for b in self._history[-10:]):
                self.current_quality = target

        return self.current_quality

5.5 Bandwidth Optimization for Cloud Upload

+------------------+    +-------------------+    +------------------+
|  Raw RTSP Stream | -> |  Edge Transcoder  | -> |  Cloud-Optimized |
|  (H.265, 4 Mbps) |    |  (H.264, 1 Mbps)  |    |  Upload Stream   |
+------------------+    +-------------------+    +------------------+
                              |
                    +---------v---------+
                    | Optimization:     |
                    | - H.265→H.264     |
                    | - CRF 28 (quality)|
                    | - 2-pass VBR      |
                    | - De-duplication  |
                    | - Delta frames    |
                    +-------------------+

FFmpeg Cloud Upload Encoding:

# Transcode H.265 main stream to H.264 for cloud compatibility
ffmpeg -rtsp_transport tcp -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v libx264 -preset fast -tune zerolatency \
  -crf 28 -maxrate 1500k -bufsize 3000k \
  -g 50 -keyint_min 25 \
  -profile:v main -level 4.1 \
  -movflags +faststart \
  -c:a aac -b:a 64k -ac 1 \
  -f mp4 pipe:1 | curl -X POST -H "Content-Type: video/mp4" \
  --data-binary @- https://10.100.0.1/api/upload/ch1

Upload Optimization Techniques:

Technique Bandwidth Savings Implementation
H.265→H.264 with CRF 28 40-60% FFmpeg libx264
Frame deduplication (static scenes) 10-30% Skip identical frames
Adaptive GOP 5-15% Dynamic I-frame interval
Resolution scaling 30-70% Scale to 720p or 480p
Audio mono + low bitrate 5% AAC 64kbps mono
Upload compression (gzip) 5-10% HTTP Content-Encoding

6. Live Streaming for Dashboard

6.1 HLS Stream Generation

HLS (HTTP Live Streaming) is the primary delivery mechanism for the dashboard due to its wide browser support and adaptive bitrate capabilities.

6.1.1 HLS Playlist Format (Generated per Channel)

# /data/hls/ch1.m3u8
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:1245
#EXT-X-DISCONTINUITY-SEQUENCE:0
#EXTINF:2.000,
ch1_245.ts
#EXTINF:2.000,
ch1_246.ts
#EXTINF:2.000,
ch1_247.ts
#EXTINF:2.000,
ch1_248.ts
#EXTINF:2.000,
ch1_249.ts

6.1.2 Multi-Quality HLS (Master Playlist)

For adaptive streaming, generate multiple quality variants:

# /data/hls/ch1_master.m3u8
#EXTM3U
#EXT-X-STREAM-INF:BANDWIDTH=4000000,RESOLUTION=960x1080,CODECS="avc1.640020,mp4a.40.2"
ch1_high.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=1500000,RESOLUTION=640x720,CODECS="avc1.4d001f,mp4a.40.2"
ch1_mid.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=500000,RESOLUTION=352x288,CODECS="avc1.42001e,mp4a.40.2"
ch1_low.m3u8

6.1.3 FFmpeg HLS Generation Commands

# High quality variant (primary stream)
ffmpeg -rtsp_transport tcp -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v libx264 -preset ultrafast -tune zerolatency -profile:v high -level 4.1 \
  -b:v 3000k -maxrate 4000k -bufsize 6000k -g 50 -keyint_min 25 \
  -c:a aac -b:a 128k -ar 44100 -ac 2 \
  -f hls -hls_time 2 -hls_list_size 5 -hls_flags delete_segments+omit_endlist \
  -hls_segment_type mpegts -hls_segment_filename "/data/hls/ch1_high_%03d.ts" \
  "/data/hls/ch1_high.m3u8" \
  \
# Mid quality variant (transcoded)
  -c:v libx264 -preset ultrafast -tune zerolatency -profile:v main -level 3.1 \
  -vf "scale=640:720" -b:v 1000k -maxrate 1500k -bufsize 2000k -g 50 \
  -c:a aac -b:a 96k -ar 44100 -ac 2 \
  -f hls -hls_time 2 -hls_list_size 5 -hls_flags delete_segments+omit_endlist \
  -hls_segment_filename "/data/hls/ch1_mid_%03d.ts" \
  "/data/hls/ch1_mid.m3u8"

6.2 WebSocket + WebRTC for Low-Latency Preview

For sub-500ms latency preview (single camera fullscreen), WebRTC is used:

+-------------+    +------------------+    +-----------------+
|  Browser    | -> |  Edge Gateway    | -> |  CP PLUS DVR    |
| (WebRTC)    |    |  (WHIP Bridge)   |    |  (RTSP Source)  |
|             |<-   |                  |<-   |                 |
+-------------+    +------------------+    +-----------------+

Latency breakdown:
  RTSP ingest:    ~200-500ms
  FFmpeg decode:  ~50-100ms
  WebRTC encode:  ~30-80ms
  Network (LAN):  ~1-5ms
  Browser decode: ~20-50ms
  TOTAL:          ~300-750ms

WebRTC Bridge (Python/aiortc):

# webrtc_bridge.py — Low-latency WebRTC bridge for single-camera preview
import asyncio
import logging
from aiortc import RTCPeerConnection, VideoStreamTrack
from aiortc.contrib.signaling import object_from_string, object_to_string
from av import VideoFrame
import cv2
import numpy as np

logger = logging.getLogger(__name__)

class RTSPVideoStreamTrack(VideoStreamTrack):
    """Reads RTSP via FFmpeg, outputs WebRTC VideoFrame."""

    def __init__(self, rtsp_url: str):
        super().__init__()
        self.rtsp_url = rtsp_url
        self._ffmpeg = None
        self._task = None
        self._frame_queue = asyncio.Queue(maxsize=2)  # Minimize latency

    async def start(self):
        cmd = [
            "ffmpeg", "-rtsp_transport", "tcp", "-stimeout", "5000000",
            "-reconnect", "1", "-reconnect_at_eof", "1",
            "-reconnect_streamed", "1", "-reconnect_delay_max", "30",
            "-fflags", "+genpts+discardcorrupt", "-flags", "+low_delay",
            "-i", self.rtsp_url,
            "-vf", "scale=640:480,format=yuv420p",
            "-c:v", "rawvideo", "-pix_fmt", "yuv420p",
            "-f", "rawvideo", "-"
        ]
        self._ffmpeg = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        self._task = asyncio.create_task(self._read_frames())

    async def _read_frames(self):
        width, height = 640, 480
        frame_size = width * height * 3 // 2  # YUV420P
        while True:
            raw = await self._ffmpeg.stdout.read(frame_size)
            if len(raw) < frame_size:
                break
            frame = VideoFrame.from_bytes(raw, format="yuv420p", width=width, height=height)
            try:
                self._frame_queue.put_nowait(frame)
            except asyncio.QueueFull:
                # Drop oldest to minimize latency
                self._frame_queue.get_nowait()
                self._frame_queue.put_nowait(frame)

    async def recv(self):
        frame = await self._frame_queue.get()
        pts, time_base = await self.next_timestamp()
        frame.pts = pts
        frame.time_base = time_base
        return frame

    async def stop(self):
        if self._task:
            self._task.cancel()
        if self._ffmpeg:
            self._ffmpeg.kill()
            await self._ffmpeg.wait()


class WebRTCBridge:
    """Manages WebRTC peer connections for low-latency camera preview."""

    def __init__(self, rtsp_url: str):
        self.rtsp_url = rtsp_url
        self.pc = None
        self.track = None

    async def create_offer(self):
        self.pc = RTCPeerConnection()
        self.track = RTSPVideoStreamTrack(self.rtsp_url)
        await self.track.start()
        self.pc.addTrack(self.track)

        offer = await self.pc.createOffer()
        await self.pc.setLocalDescription(offer)
        return {"sdp": self.pc.localDescription.sdp, "type": self.pc.localDescription.type}

    async def handle_answer(self, answer_sdp: str):
        answer = object_from_string(json.dumps({"sdp": answer_sdp, "type": "answer"}))
        await self.pc.setRemoteDescription(answer)

    async def close(self):
        if self.track:
            await self.track.stop()
        if self.pc:
            await self.pc.close()

6.3 Multi-Camera Grid Layout

The dashboard supports multiple grid layouts with automatic stream selection:

Layout Cameras Visible Stream Used Resolution Per-Stream BW
1x1 (fullscreen) 1 Primary (subtype=0) 960x1080 ~3 Mbps
2x2 4 Secondary (subtype=1) 352x288 ~0.5 Mbps each
3x3 8+1 empty Secondary (subtype=1) 352x288 ~0.5 Mbps each
4x2 8 Secondary (subtype=1) 352x288 ~0.5 Mbps each
Custom User-defined Mixed Mixed Sum of selected

Grid Switching Logic:

def get_stream_for_layout(channel: int, layout: str, is_focused: bool) -> str:
    """Determine which RTSP stream to use based on layout context."""
    if layout == "1x1" or is_focused:
        # Fullscreen — use main stream for quality
        return f"rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel={channel}&subtype=0"
    else:
        # Grid view — use sub stream for bandwidth efficiency
        return f"rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel={channel}&subtype=1"

6.4 Stream Relay Through Cloud

[Edge Gateway HLS] --VPN--> [Cloud Nginx] --> [CDN / Dashboard Users]
                              (10.100.0.1:443)
                              
Nginx Configuration:
- Reverse proxy to edge gateway HLS endpoint
- Token-based authentication (JWT)
- Rate limiting per client
- HTTPS termination
- HLS caching (5-second segments)

Nginx HLS Relay Configuration:

# /etc/nginx/conf.d/hls-relay.conf
upstream edge_gateway_hls {
    server 10.100.0.10:8081;  # Via WireGuard tunnel
    keepalive 32;
}

server {
    listen 443 ssl http2;
    server_name streams.example.com;

    ssl_certificate /etc/ssl/certs/stream.crt;
    ssl_certificate_key /etc/ssl/private/stream.key;

    # HLS endpoint authentication
    location /hls/ {
        # JWT validation
        auth_jwt "HLS Stream";
        auth_jwt_key_file /etc/nginx/jwt_key.pub;

        # Rate limiting
        limit_req zone=hls_limit burst=10 nodelay;

        proxy_pass http://edge_gateway_hls/hls/;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_buffering off;
        proxy_cache off;

        # CORS for web dashboard
        add_header Access-Control-Allow-Origin "https://dashboard.example.com" always;
        add_header Access-Control-Allow-Methods "GET, OPTIONS" always;

        # HLS-specific headers
        add_header Cache-Control "no-cache" always;
        add_header Access-Control-Expose-Headers "Content-Length" always;

        # Segment caching (brief)
        location ~* \.ts$ {
            proxy_pass http://edge_gateway_hls;
            proxy_cache hls_cache;
            proxy_cache_valid 200 10s;
            expires 10s;
        }
    }

    # WebSocket for real-time status
    location /ws/ {
        proxy_pass http://edge_gateway_hls/ws/;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}

6.5 Fallback Handling

When a camera stream fails, the following fallback chain is executed:

1. RTSP connection fails
   ├──> Retry with exponential backoff (3 attempts)
   ├──> Attempt UDP transport if TCP fails
   ├──> Circuit breaker opens after 5 consecutive failures
   │
2. Stream stall detected (no frames for 10s)
   ├──> Kill FFmpeg process
   ├──> Restart stream with fresh connection
   │
3. Camera marked OFFLINE
   ├──> Dashboard shows "Camera Offline" placeholder
   ├──> HLS playlist returns 404
   ├──> Last known frame displayed with timestamp overlay
   ├──> Alert sent to operations team
   │
4. Camera recovers
   ├──> Circuit breaker transitions to HALF_OPEN
   ├──> Test stream pulled for 10 seconds
   ├──> On success: circuit CLOSED, stream resumes
   ├──> Dashboard auto-refreshes

HLS Error Response (Camera Offline):

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-ERROR: "Camera OFFLINE - Channel 1"
#EXTINF:2.000,
offline_placeholder.ts

7. Edge Buffering & Local Storage

7.1 Storage Architecture

The edge gateway uses a tiered storage system optimized for limited SSD capacity:

+-------------------------------------------------------------------+
|                     Edge Gateway Storage (512GB NVMe)              |
|                                                                    |
|  +---------------------+  +---------------------+  +-----------+  |
|  | Ring Buffer (20GB)  |  | HLS Cache (5GB)     |  | Logs (1GB)|  |
|  | /data/buffer/       |  | /data/hls/          |  |/data/logs/|  |
|  |                     |  |                     |  |           |  |
|  | Alert clips (MP4)   |  | HLS segments (.ts)  |  | Rotated   |  |
|  | Pre-event buffer    |  | Playlists (.m3u8)   |  | daily     |  |
|  | Upload queue        |  | 5-segment window    |  |           |  |
|  | Retry staging       |  | Auto-cleanup        |  |           |  |
|  +----------+----------+  +---------------------+  +-----------+  |
|             |                                                      |
|  +----------v----------+                                           |
|  | Upload Manager      | ---> VPN ---> Cloud S3/MinIO             |
|  | - Parallel uploads  |      (10.100.0.1:9000)                   |
|  | - Retry with backoff|                                           |
|  | - Integrity verify  |                                           |
|  +---------------------+                                           |
+-------------------------------------------------------------------+

Storage Quotas:

Directory Max Size Max Age Cleanup Strategy
/data/buffer/alert_clips/ 15 GB 72 hours LRU + age-based
/data/buffer/pre_event/ 3 GB 24 hours Ring buffer (FIFO)
/data/buffer/upload_queue/ 2 GB 48 hours On successful upload
/data/hls/ 5 GB 10 seconds Immediate (segment rotation)
/data/logs/ 1 GB 30 days Logrotate daily
/tmp/ffmpeg/ 2 GB On process exit Cleanup on restart
Total Reserved ~28 GB

7.2 Ring Buffer for Alert Clips

The ring buffer stores video clips triggered by AI detection events (motion, intrusion, safety violation, etc.).

7.2.1 Ring Buffer Design

# ring_buffer.py — Alert clip ring buffer with disk management
import os
import time
import json
import asyncio
import logging
import shutil
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, List
import aiofiles
import hashlib

logger = logging.getLogger(__name__)

@dataclass
class ClipMetadata:
    clip_id: str
    channel: int
    camera_name: str
    event_type: str          # motion, intrusion, safety_violation, etc.
    event_confidence: float
    start_time: datetime     # clip start (includes pre-event)
    event_time: datetime     # actual detection time
    end_time: datetime       # clip end (includes post-event)
    duration_seconds: float
    file_path: str
    file_size_bytes: int
    checksum_sha256: str
    upload_status: str       # pending, uploading, uploaded, failed
    upload_attempts: int = 0
    resolution: str = "960x1080"
    fps: int = 25
    tags: List[str] = None

    def __post_init__(self):
        if self.tags is None:
            self.tags = []

class RingBuffer:
    """
    Fixed-size ring buffer for alert clips.
    When full, oldest clips are deleted to make room.
    """

    def __init__(self, base_path: str, max_size_bytes: int, max_age_hours: int):
        self.base_path = Path(base_path)
        self.max_size = max_size_bytes
        self.max_age = timedelta(hours=max_age_hours)
        self.metadata_file = self.base_path / "index.jsonl"
        self._clips: List[ClipMetadata] = []
        self._current_size = 0
        self._lock = asyncio.Lock()

    async def initialize(self):
        """Load existing clips from disk and calculate current usage."""
        self.base_path.mkdir(parents=True, exist_ok=True)

        if self.metadata_file.exists():
            async with aiofiles.open(self.metadata_file, 'r') as f:
                async for line in f:
                    line = line.strip()
                    if line:
                        data = json.loads(line)
                        clip = ClipMetadata(**data)
                        clip.start_time = datetime.fromisoformat(data['start_time'])
                        clip.event_time = datetime.fromisoformat(data['event_time'])
                        clip.end_time = datetime.fromisoformat(data['end_time'])
                        self._clips.append(clip)
                        self._current_size += clip.file_size_bytes

        # Clean old clips on startup
        await self._cleanup_old_clips()
        logger.info(f"Ring buffer initialized: {len(self._clips)} clips, "
                    f"{self._current_size / 1e9:.2f} GB")

    async def store_clip(self, source_path: str, metadata: ClipMetadata) -> ClipMetadata:
        """Store a new clip in the ring buffer."""
        async with self._lock:
            # Make room if needed
            while self._current_size + metadata.file_size_bytes > self.max_size:
                if not self._clips:
                    break
                await self._remove_oldest_clip()

            # Copy file to buffer
            dest_path = self.base_path / f"{metadata.clip_id}.mp4"
            shutil.copy2(source_path, dest_path)

            # Verify checksum
            actual_checksum = await self._calculate_checksum(dest_path)
            if actual_checksum != metadata.checksum_sha256:
                os.remove(dest_path)
                raise ValueError("Checksum mismatch — possible corruption")

            metadata.file_path = str(dest_path)
            self._clips.append(metadata)
            self._current_size += metadata.file_size_bytes

            # Persist metadata
            await self._append_metadata(metadata)

            logger.info(f"Stored clip {metadata.clip_id} "
                        f"({metadata.file_size_bytes / 1e6:.1f} MB)")
            return metadata

    async def get_pending_uploads(self, batch_size: int = 10) -> List[ClipMetadata]:
        """Get clips pending upload, ordered by age."""
        pending = [
            c for c in self._clips
            if c.upload_status in ("pending", "failed")
            and c.upload_attempts < 5
        ]
        return sorted(pending, key=lambda c: c.event_time)[:batch_size]

    async def mark_uploaded(self, clip_id: str):
        """Mark a clip as successfully uploaded."""
        for clip in self._clips:
            if clip.clip_id == clip_id:
                clip.upload_status = "uploaded"
                await self._rewrite_metadata()
                # Optionally delete local file after successful upload
                # await self._remove_clip_file(clip)
                break

    async def mark_failed(self, clip_id: str):
        """Increment failure counter for a clip."""
        for clip in self._clips:
            if clip.clip_id == clip_id:
                clip.upload_attempts += 1
                clip.upload_status = "failed"
                await self._rewrite_metadata()
                break

    async def _remove_oldest_clip(self):
        """Remove the oldest clip to free space."""
        if not self._clips:
            return
        oldest = min(self._clips, key=lambda c: c.event_time)
        await self._remove_clip(oldest)

    async def _remove_clip(self, clip: ClipMetadata):
        """Remove a specific clip from buffer."""
        try:
            if os.path.exists(clip.file_path):
                os.remove(clip.file_path)
        except OSError:
            pass
        self._clips.remove(clip)
        self._current_size -= clip.file_size_bytes

    async def _cleanup_old_clips(self):
        """Remove clips older than max_age."""
        cutoff = datetime.utcnow() - self.max_age
        old_clips = [c for c in self._clips if c.event_time < cutoff]
        for clip in old_clips:
            await self._remove_clip(clip)
        if old_clips:
            await self._rewrite_metadata()
            logger.info(f"Cleaned {len(old_clips)} old clips")

    async def _calculate_checksum(self, file_path: Path) -> str:
        sha = hashlib.sha256()
        async with aiofiles.open(file_path, 'rb') as f:
            while chunk := await f.read(8192):
                sha.update(chunk)
        return sha.hexdigest()

    async def _append_metadata(self, clip: ClipMetadata):
        async with aiofiles.open(self.metadata_file, 'a') as f:
            data = asdict(clip)
            data['start_time'] = clip.start_time.isoformat()
            data['event_time'] = clip.event_time.isoformat()
            data['end_time'] = clip.end_time.isoformat()
            await f.write(json.dumps(data) + '\n')

    async def _rewrite_metadata(self):
        async with aiofiles.open(self.metadata_file, 'w') as f:
            for clip in self._clips:
                data = asdict(clip)
                data['start_time'] = clip.start_time.isoformat()
                data['event_time'] = clip.event_time.isoformat()
                data['end_time'] = clip.end_time.isoformat()
                await f.write(json.dumps(data) + '\n')

    @property
    def stats(self) -> dict:
        return {
            "clip_count": len(self._clips),
            "total_size_bytes": self._current_size,
            "total_size_gb": round(self._current_size / 1e9, 2),
            "max_size_gb": round(self.max_size / 1e9, 2),
            "usage_percent": round(self._current_size / self.max_size * 100, 1),
            "pending_uploads": len([c for c in self._clips if c.upload_status == "pending"]),
            "failed_uploads": len([c for c in self._clips if c.upload_status == "failed"]),
        }

7.3 Upload Queue with Retry

# upload_manager.py — Cloud upload with retry and integrity verification
import asyncio
import aiohttp
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import json

logger = logging.getLogger(__name__)

@dataclass
class UploadConfig:
    endpoint_url: str = "https://10.100.0.1/api/upload"
    max_retries: int = 5
    base_delay: float = 5.0          # seconds
    max_delay: float = 300.0         # 5 minutes
    timeout: float = 60.0            # per-request timeout
    concurrent_uploads: int = 3
    verify_ssl: bool = True          # False for self-signed in dev

class UploadManager:
    """
    Manages cloud uploads with:
    - Exponential backoff retry
    - Parallel upload limiting
    - Integrity verification
    - Bandwidth throttling
    """

    def __init__(self, config: UploadConfig, ring_buffer: RingBuffer):
        self.cfg = config
        self.buffer = ring_buffer
        self._semaphore = asyncio.Semaphore(config.concurrent_uploads)
        self._session: Optional[aiohttp.ClientSession] = None
        self._running = False

    async def start(self):
        ssl_ctx = False if not self.cfg.verify_ssl else None
        connector = aiohttp.TCPConnector(
            limit=10,
            limit_per_host=5,
            enable_cleanup_closed=True,
            force_close=True,
        )
        timeout = aiohttp.ClientTimeout(total=self.cfg.timeout)
        self._session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
        )
        self._running = True
        asyncio.create_task(self._upload_loop())

    async def _upload_loop(self):
        """Background task that continuously processes the upload queue."""
        while self._running:
            try:
                pending = await self.buffer.get_pending_uploads(batch_size=5)
                if not pending:
                    await asyncio.sleep(5)
                    continue

                tasks = [
                    self._upload_with_retry(clip)
                    for clip in pending
                ]
                await asyncio.gather(*tasks, return_exceptions=True)

            except Exception as e:
                logger.error(f"Upload loop error: {e}")
                await asyncio.sleep(10)

    async def _upload_with_retry(self, clip: ClipMetadata):
        """Upload a clip with exponential backoff retry."""
        async with self._semaphore:
            for attempt in range(1, self.cfg.max_retries + 1):
                try:
                    await self._do_upload(clip)
                    await self.buffer.mark_uploaded(clip.clip_id)
                    logger.info(f"Uploaded clip {clip.clip_id} "
                                f"(attempt {attempt})")
                    return

                except aiohttp.ClientError as e:
                    delay = min(
                        self.cfg.base_delay * (2 ** (attempt - 1)),
                        self.cfg.max_delay
                    )
                    logger.warning(
                        f"Upload failed for {clip.clip_id} "
                        f"(attempt {attempt}/{self.cfg.max_retries}): {e}. "
                        f"Retry in {delay}s"
                    )
                    await asyncio.sleep(delay)

            # All retries exhausted
            await self.buffer.mark_failed(clip.clip_id)
            logger.error(f"Upload permanently failed for {clip.clip_id}")

    async def _do_upload(self, clip: ClipMetadata):
        """Perform the actual HTTP upload."""
        upload_url = f"{self.cfg.endpoint_url}/{clip.channel}"

        form_data = aiohttp.FormData()
        form_data.add_field(
            'file',
            open(clip.file_path, 'rb'),
            filename=f"{clip.clip_id}.mp4",
            content_type='video/mp4',
        )
        form_data.add_field('metadata', json.dumps({
            'clip_id': clip.clip_id,
            'channel': clip.channel,
            'event_type': clip.event_type,
            'event_time': clip.event_time.isoformat(),
            'checksum': clip.checksum_sha256,
        }))

        async with self._session.post(upload_url, data=form_data) as resp:
            if resp.status != 200:
                body = await resp.text()
                raise aiohttp.ClientError(f"HTTP {resp.status}: {body}")

            result = await resp.json()
            # Verify server-side checksum matches
            if result.get('checksum') != clip.checksum_sha256:
                raise ValueError("Server checksum mismatch")

    async def stop(self):
        self._running = False
        if self._session:
            await self._session.close()

    async def get_queue_status(self) -> dict:
        """Get current upload queue status."""
        pending = await self.buffer.get_pending_uploads(batch_size=1000)
        return {
            "pending_count": len(pending),
            "in_progress": self.cfg.concurrent_uploads - self._semaphore._value,
            "endpoint": self.cfg.endpoint_url,
            "last_check": datetime.utcnow().isoformat(),
        }

7.4 Disk Management — Automatic Cleanup

# disk_manager.py — Proactive disk space management
import os
import shutil
import asyncio
import logging
from pathlib import Path
from typing import List, Dict

logger = logging.getLogger(__name__)

class DiskManager:
    """
    Monitors disk usage and enforces quotas.
    Implements emergency cleanup when disk is critically full.
    """

    CRITICAL_THRESHOLD = 95  # % — emergency cleanup
    WARNING_THRESHOLD = 85   # % — proactive cleanup
    TARGET_USAGE = 80        # % — cleanup target

    def __init__(self, mounts: Dict[str, dict]):
        """
        mounts: { path: { max_size_gb, max_age_hours, priority } }
        priority: lower = deleted first during cleanup
        """
        self.mounts = mounts

    async def monitor_loop(self, interval_seconds: float = 60.0):
        """Continuous disk monitoring loop."""
        while True:
            try:
                for path, config in self.mounts.items():
                    usage = self._get_usage(path)
                    logger.debug(f"Disk {path}: {usage['percent']:.1f}% used")

                    if usage['percent'] > self.CRITICAL_THRESHOLD:
                        logger.critical(f"CRITICAL: Disk {path} at "
                                        f"{usage['percent']:.1f}%")
                        await self._emergency_cleanup(path, config)
                    elif usage['percent'] > self.WARNING_THRESHOLD:
                        logger.warning(f"Disk {path} at {usage['percent']:.1f}%")
                        await self._proactive_cleanup(path, config)

            except Exception as e:
                logger.error(f"Disk monitor error: {e}")

            await asyncio.sleep(interval_seconds)

    def _get_usage(self, path: str) -> dict:
        stat = shutil.disk_usage(path)
        total = stat.total
        used = stat.used
        free = stat.free
        return {
            "total_gb": total / 1e9,
            "used_gb": used / 1e9,
            "free_gb": free / 1e9,
            "percent": (used / total) * 100,
        }

    async def _emergency_cleanup(self, path: str, config: dict):
        """Aggressive cleanup to free space immediately."""
        # 1. Delete HLS cache oldest segments aggressively
        hls_dir = Path(path) / "hls"
        if hls_dir.exists():
            segments = sorted(hls_dir.glob("*.ts"), key=lambda f: f.stat().st_mtime)
            for seg in segments[:-3]:  # Keep only newest 3 segments
                seg.unlink()
                logger.info(f"Emergency deleted HLS segment: {seg}")

        # 2. Delete already-uploaded clips
        buffer_dir = Path(path) / "buffer"
        if buffer_dir.exists():
            for clip_file in sorted(buffer_dir.glob("*.mp4"), key=lambda f: f.stat().st_mtime):
                clip_file.unlink()
                logger.info(f"Emergency deleted uploaded clip: {clip_file}")
                usage = self._get_usage(path)
                if usage['percent'] < self.TARGET_USAGE:
                    break

        # 3. Compress logs
        logs_dir = Path(path) / "logs"
        if logs_dir.exists():
            for log_file in sorted(logs_dir.glob("*.log"), key=lambda f: f.stat().st_mtime):
                if not str(log_file).endswith('.gz'):
                    os.system(f"gzip {log_file}")

    async def _proactive_cleanup(self, path: str, config: dict):
        """Normal proactive cleanup — age-based deletion."""
        # Age-based cleanup handled by RingBuffer
        pass

7.5 Pre-Event Buffer (Continuous Circular Recording)

For capturing video before an AI-detected event, a continuous circular buffer is maintained:

# Continuous 60-second rolling buffer per channel (primary stream)
ffmpeg -rtsp_transport tcp -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a copy -f segment -segment_time 10 -segment_wrap 6 \
  -reset_timestamps 1 -strftime 0 \
  "/data/buffer/pre_event/ch1_%d.mp4"

This creates 6 x 10-second segments that continuously overwrite:

  • ch1_0.mp4 — newest
  • ch1_1.mp4
  • ...
  • ch1_5.mp4 — oldest (will be overwritten next)

On event detection:

  1. Lock current segments (prevent overwrite)
  2. Continue recording for post_event_seconds (e.g., 30s)
  3. Concatenate pre-event + event + post-event segments
  4. Store in ring buffer
  5. Queue for cloud upload
async def handle_event_detection(self, channel: int, event_time: datetime):
    """Trigger clip capture with pre/post-event padding."""
    pre_buffer_dir = f"/data/buffer/pre_event/ch{channel}"
    clip_id = f"evt_{channel}_{event_time.strftime('%Y%m%d_%H%M%S')}"

    # Collect pre-event segments (last 60 seconds)
    pre_segments = sorted(
        Path(pre_buffer_dir).glob("*.mp4"),
        key=lambda f: f.stat().st_mtime
    )

    # Start post-event recording
    post_event_path = f"/tmp/{clip_id}_post.mp4"
    post_task = asyncio.create_task(
        self._record_post_event(channel, post_event_path, duration=30)
    )
    await post_task

    # Concatenate: pre-event + post-event
    concat_list = f"/tmp/{clip_id}_concat.txt"
    with open(concat_list, 'w') as f:
        for seg in pre_segments:
            f.write(f"file '{seg.absolute()}'\n")
        f.write(f"file '{post_event_path}'\n")

    output_path = f"/data/buffer/alert_clips/{clip_id}.mp4"
    subprocess.run([
        "ffmpeg", "-y", "-f", "concat", "-safe", "0",
        "-i", concat_list, "-c", "copy", output_path
    ], check=True)

    # Store in ring buffer
    file_size = os.path.getsize(output_path)
    checksum = await self._calculate_checksum(output_path)
    metadata = ClipMetadata(
        clip_id=clip_id,
        channel=channel,
        event_type="ai_detection",
        event_confidence=0.85,
        start_time=event_time - timedelta(seconds=60),
        event_time=event_time,
        end_time=event_time + timedelta(seconds=30),
        duration_seconds=90,
        file_path=output_path,
        file_size_bytes=file_size,
        checksum_sha256=checksum,
        upload_status="pending",
    )
    await self.ring_buffer.store_clip(output_path, metadata)

8. Implementation Specifications

8.1 Python Project Structure

edge-gateway/
├── docker/
│   ├── Dockerfile.stream-manager
│   ├── Dockerfile.ffmpeg-worker
│   ├── Dockerfile.hls-segmenter
│   ├── Dockerfile.buffer-manager
│   └── Dockerfile.health-monitor
├── docker-compose.yml
├── requirements.txt
├── config/
│   ├── cameras.yaml
│   ├── gateway.yaml
│   └── logging.yaml
├── src/
│   ├── __init__.py
│   ├── main.py                    # Application entry point
│   ├── api/
│   │   ├── __init__.py
│   │   ├── server.py              # FastAPI HTTP server
│   │   ├── routes/
│   │   │   ├── __init__.py
│   │   │   ├── cameras.py         # Camera CRUD endpoints
│   │   │   ├── streams.py         # Stream control endpoints
│   │   │   ├── hls.py             # HLS playlist proxy
│   │   │   ├── status.py          # Health & status
│   │   │   └── config.py          # Configuration endpoints
│   │   └── middleware/
│   │       ├── auth.py            # JWT authentication
│   │       ├── cors.py            # CORS headers
│   │       └── logging.py         # Request logging
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py              # Configuration loader
│   │   ├── events.py              # Event bus (asyncio)
│   │   ├── logging_setup.py       # Structured logging
│   │   └── exceptions.py          # Custom exceptions
│   ├── streams/
│   │   ├── __init__.py
│   │   ├── manager.py             # StreamManager (orchestrator)
│   │   ├── connector.py           # StreamConnector (FFmpeg wrapper)
│   │   ├── registry.py            # CameraRegistry
│   │   ├── circuit_breaker.py     # CircuitBreaker
│   │   ├── status.py              # CameraStatus model
│   │   └── ffmpeg_builder.py      # FFmpeg command builder
│   ├── processing/
│   │   ├── __init__.py
│   │   ├── frame_extractor.py     # FrameExtractor
│   │   ├── multi_extractor.py     # MultiChannelExtractor
│   │   ├── adaptive_bitrate.py    # AdaptiveBitrateController
│   │   └── motion_detector.py     # Motion detection utilities
│   ├── hls/
│   │   ├── __init__.py
│   │   ├── segmenter.py           # HLS segment generation
│   │   ├── playlist.py            # Playlist manager
│   │   └── server.py              # HLS HTTP server
│   ├── storage/
│   │   ├── __init__.py
│   │   ├── ring_buffer.py         # RingBuffer
│   │   ├── upload_manager.py      # UploadManager
│   │   ├── disk_manager.py        # DiskManager
│   │   └── pre_event_buffer.py    # Circular pre-event recording
│   ├── discovery/
│   │   ├── __init__.py
│   │   └── onvif_discovery.py     # ONVIF camera discovery
│   ├── mqtt/
│   │   ├── __init__.py
│   │   └── client.py              # MQTT publisher/subscriber
│   ├── health/
│   │   ├── __init__.py
│   │   ├── monitor.py             # HealthMonitor & Watchdog
│   │   └── metrics.py             # Prometheus metrics
│   └── utils/
│       ├── __init__.py
│       ├── async_utils.py         # Async helpers
│       ├── validators.py          # Input validation
│       └── time_utils.py          # Timestamp utilities
├── tests/
│   ├── unit/
│   ├── integration/
│   └── fixtures/
├── scripts/
│   ├── start.sh
│   ├── health_check.sh
│   └── backup_config.sh
└── docs/
    ├── ARCHITECTURE.md
    └── API.md

8.2 Core Classes — Full Specification

8.2.1 StreamManager (Main Orchestrator)

# src/streams/manager.py
import asyncio
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime

from .connector import StreamConnector, ReconnectConfig
from .registry import CameraRegistry
from .circuit_breaker import CircuitBreaker, CircuitBreakerConfig
from .status import CameraStatus
from .ffmpeg_builder import FFmpegCommandBuilder
from ..core.config import GatewayConfig
from ..health.metrics import MetricsCollector

logger = logging.getLogger(__name__)

@dataclass
class StreamManagerConfig:
    reconnect: ReconnectConfig
    circuit_breaker: CircuitBreakerConfig
    ffmpeg: FFmpegCommandBuilder
    max_concurrent_streams: int = 8

class StreamManager:
    """
    Central orchestrator for all camera streams.
    Manages lifecycle: discovery -> connection -> monitoring -> cleanup.
    """

    def __init__(self, config: StreamManagerConfig, registry: CameraRegistry,
                 metrics: MetricsCollector):
        self.cfg = config
        self.registry = registry
        self.metrics = metrics
        self.connectors: Dict[int, StreamConnector] = {}
        self.circuit_breakers: Dict[int, CircuitBreaker] = {}
        self._monitor_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self):
        """Start the stream manager and connect all enabled cameras."""
        self._running = True
        cameras = await self.registry.get_all_cameras()

        for camera in cameras:
            if camera.enabled:
                await self._start_camera_stream(camera.channel)

        self._monitor_task = asyncio.create_task(
            self._health_monitor_loop(),
            name="stream-health-monitor"
        )
        logger.info(f"StreamManager started: {len(self.connectors)} streams active")

    async def _start_camera_stream(self, channel: int):
        """Initialize and start a single camera stream."""
        camera = await self.registry.get_camera(channel)
        if not camera:
            logger.error(f"Camera {channel} not found in registry")
            return

        # Create circuit breaker
        cb = CircuitBreaker(f"ch{channel}", self.cfg.circuit_breaker)
        self.circuit_breakers[channel] = cb

        # Build FFmpeg command
        rtsp_url = camera.get_rtsp_url(primary=True)
        ffmpeg_cmd = self.cfg.ffmpeg.build(
            rtsp_url=rtsp_url,
            hls_output=f"/data/hls/ch{channel}.m3u8",
            segment_pattern=f"/data/hls/ch{channel}_%03d.ts",
        )

        # Create and start connector
        connector = StreamConnector(
            channel=channel,
            config=self.cfg.reconnect,
            ffmpeg_cmd=ffmpeg_cmd,
        )
        self.connectors[channel] = connector
        await connector.start()

        logger.info(f"Started stream for channel {channel}: {camera.name}")

    async def stop_camera_stream(self, channel: int):
        """Gracefully stop a camera stream."""
        if channel in self.connectors:
            await self.connectors[channel].stop()
            del self.connectors[channel]
        if channel in self.circuit_breakers:
            del self.circuit_breakers[channel]
        logger.info(f"Stopped stream for channel {channel}")

    async def restart_camera_stream(self, channel: int):
        """Restart a specific camera stream."""
        await self.stop_camera_stream(channel)
        await asyncio.sleep(2)
        await self._start_camera_stream(channel)

    async def get_status(self, channel: Optional[int] = None) -> List[CameraStatus]:
        """Get status for one or all cameras."""
        if channel is not None:
            connector = self.connectors.get(channel)
            if connector:
                return [self._build_status(channel, connector)]
            return []

        return [
            self._build_status(ch, conn)
            for ch, conn in self.connectors.items()
        ]

    def _build_status(self, channel: int, connector: StreamConnector) -> CameraStatus:
        camera = self.registry.get_camera_sync(channel)
        cb = self.circuit_breakers.get(channel)

        return CameraStatus(
            channel=channel,
            name=camera.name if camera else f"CH{channel}",
            enabled=camera.enabled if camera else False,
            state=connector.state.value if hasattr(connector, 'state') else "unknown",
            stream_type="primary" if camera and camera.use_primary else "secondary",
            current_fps=getattr(connector, 'current_fps', 0),
            disconnect_count=connector.consecutive_failures if hasattr(connector, 'consecutive_failures') else 0,
            circuit_breaker_state=cb.state.value if cb else "CLOSED",
        )

    async def _health_monitor_loop(self):
        """Periodic health check for all streams."""
        while self._running:
            try:
                for channel, connector in list(self.connectors.items()):
                    status = self._build_status(channel, connector)

                    # Record metrics
                    self.metrics.record_fps(channel, status.current_fps)
                    self.metrics.record_stream_state(channel, status.state)

                    # Check for anomalies
                    if status.state == "ONLINE" and status.current_fps < 5:
                        logger.warning(f"CH{channel}: Low FPS detected "
                                       f"({status.current_fps:.1f})")

            except Exception as e:
                logger.error(f"Health monitor error: {e}")

            await asyncio.sleep(30)

    async def stop(self):
        """Graceful shutdown of all streams."""
        self._running = False
        if self._monitor_task:
            self._monitor_task.cancel()
            try:
                await self._monitor_task
            except asyncio.CancelledError:
                pass

        # Stop all connectors concurrently
        await asyncio.gather(*[
            connector.stop()
            for connector in self.connectors.values()
        ])
        self.connectors.clear()
        self.circuit_breakers.clear()
        logger.info("StreamManager stopped")

8.2.2 FFmpegCommandBuilder

# src/streams/ffmpeg_builder.py
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class FFmpegPreset:
    """Predefined FFmpeg configuration preset."""
    name: str
    transport: str = "tcp"
    timeout_us: int = 5_000_000
    reconnect: bool = True
    reconnect_max_delay: int = 30
    flags: List[str] = field(default_factory=lambda: [
        "+genpts", "+discardcorrupt", "+low_delay"
    ])
    probesize: int = 5_000_000
    analyzeduration: int = 2_000_000

class FFmpegCommandBuilder:
    """
    Builds FFmpeg command lines for various stream operations.
    Supports presets for different use cases.
    """

    PRESETS = {
        "ingest": FFmpegPreset("ingest"),
        "hls": FFmpegPreset("hls", reconnect_max_delay=60),
        "ai_extraction": FFmpegPreset("ai_extraction", probesize=1_000_000),
        "recording": FFmpegPreset("recording", reconnect_max_delay=300),
    }

    def build(self, rtsp_url: str, hls_output: str, segment_pattern: str,
              preset: str = "hls", audio: bool = True,
              segment_duration: int = 2, playlist_size: int = 5) -> List[str]:
        """Build complete FFmpeg command for HLS output."""
        p = self.PRESETS.get(preset, self.PRESETS["hls"])

        cmd = ["ffmpeg", "-nostdin", "-y"]

        # Input options
        cmd.extend(["-rtsp_transport", p.transport])
        cmd.extend(["-stimeout", str(p.timeout_us)])

        if p.reconnect:
            cmd.extend([
                "-reconnect", "1",
                "-reconnect_at_eof", "1",
                "-reconnect_streamed", "1",
                "-reconnect_delay_max", str(p.reconnect_max_delay),
            ])

        cmd.extend(["-fflags", "".join(p.flags)])
        cmd.extend(["-flags", "+low_delay"])
        cmd.extend(["-probesize", str(p.probesize)])
        cmd.extend(["-analyzeduration", str(p.analyzeduration)])

        # Input
        cmd.extend(["-i", rtsp_url])

        # Video codec (copy for efficiency)
        cmd.extend(["-c:v", "copy"])

        # Audio
        if audio:
            cmd.extend(["-c:a", "copy"])
        else:
            cmd.extend(["-an"])

        # HLS output
        cmd.extend(["-f", "hls"])
        cmd.extend(["-hls_time", str(segment_duration)])
        cmd.extend(["-hls_list_size", str(playlist_size)])
        cmd.extend(["-hls_flags", "delete_segments+omit_endlist+program_date_time"])
        cmd.extend(["-hls_segment_type", "mpegts"])
        cmd.extend(["-hls_segment_filename", segment_pattern])
        cmd.append(hls_output)

        return cmd

    def build_ai_extraction(self, rtsp_url: str, target_fps: float = 5.0,
                            resolution: tuple = (640, 480),
                            mode: str = "hybrid") -> List[str]:
        """Build FFmpeg command for AI frame extraction."""
        w, h = resolution
        filters = f"fps={target_fps},scale={w}:{h}"

        if mode == "keyframe_only":
            filters = f"select='eq(pict_type,I)',{filters}"
        elif mode == "hybrid":
            # I-frame priority with fixed FPS fallback
            interval = int(25 / target_fps)
            filters = f"select='eq(pict_type,I)+gte(n,prev_selected_n+{interval})',{filters}"

        return [
            "ffmpeg", "-nostdin", "-y",
            "-rtsp_transport", "tcp",
            "-stimeout", "5000000",
            "-reconnect", "1",
            "-reconnect_at_eof", "1",
            "-reconnect_streamed", "1",
            "-reconnect_delay_max", "30",
            "-fflags", "+genpts+discardcorrupt",
            "-flags", "+low_delay",
            "-i", rtsp_url,
            "-vf", filters,
            "-an",  # No audio for AI
            "-f", "image2pipe",
            "-vcodec", "mjpeg",
            "-q:v", "3",
            "-",
        ]

    def build_recording(self, rtsp_url: str, output_pattern: str,
                        segment_seconds: int = 60) -> List[str]:
        """Build FFmpeg command for segmented recording."""
        return [
            "ffmpeg", "-nostdin", "-y",
            "-rtsp_transport", "tcp",
            "-stimeout", "5000000",
            "-reconnect", "1",
            "-reconnect_at_eof", "1",
            "-reconnect_streamed", "1",
            "-reconnect_delay_max", "300",
            "-fflags", "+genpts+discardcorrupt",
            "-flags", "+low_delay",
            "-i", rtsp_url,
            "-c:v", "copy",
            "-c:a", "copy",
            "-f", "segment",
            "-segment_time", str(segment_seconds),
            "-segment_format", "mp4",
            "-reset_timestamps", "1",
            "-strftime", "1",
            output_pattern,
        ]

    def build_webrtc_bridge(self, rtsp_url: str,
                            output_resolution: tuple = (640, 480)) -> List[str]:
        """Build FFmpeg command for WebRTC bridge (raw video output)."""
        w, h = output_resolution
        return [
            "ffmpeg", "-nostdin", "-y",
            "-rtsp_transport", "tcp",
            "-stimeout", "5000000",
            "-reconnect", "1",
            "-reconnect_at_eof", "1",
            "-reconnect_streamed", "1",
            "-reconnect_delay_max", "30",
            "-fflags", "+genpts+discardcorrupt",
            "-flags", "+low_delay",
            "-i", rtsp_url,
            "-vf", f"scale={w}:{h},format=yuv420p",
            "-c:v", "rawvideo",
            "-pix_fmt", "yuv420p",
            "-f", "rawvideo",
            "-",
        ]

8.2.3 CameraRegistry

# src/streams/registry.py
import yaml
import asyncio
import logging
from typing import List, Optional, Dict
from dataclasses import dataclass, field
from pathlib import Path
import aiofiles

logger = logging.getLogger(__name__)

@dataclass
class CameraConfig:
    channel: int
    name: str
    enabled: bool = True
    primary_stream_url: str = ""
    secondary_stream_url: str = ""
    use_primary: bool = False
    ai_enabled: bool = True
    ai_fps: float = 5.0
    ai_resolution: str = "640x480"
    ai_mode: str = "hybrid"
    recording_enabled: bool = True
    pre_event_seconds: int = 10
    post_event_seconds: int = 30
    tags: Dict[str, str] = field(default_factory=dict)

    def get_rtsp_url(self, primary: bool = False) -> str:
        if primary and self.primary_stream_url:
            return self.primary_stream_url
        return self.secondary_stream_url or self.primary_stream_url

class CameraRegistry:
    """
    Manages camera configuration from YAML file.
    Supports hot-reloading and ONVIF discovery integration.
    """

    def __init__(self, config_path: str):
        self.config_path = Path(config_path)
        self._cameras: Dict[int, CameraConfig] = {}
        self._lock = asyncio.Lock()

    async def load(self):
        """Load camera configuration from YAML."""
        async with self._lock:
            if not self.config_path.exists():
                logger.warning(f"Config file not found: {self.config_path}")
                return

            async with aiofiles.open(self.config_path, 'r') as f:
                content = await f.read()

            data = yaml.safe_load(content)
            self._cameras = {}

            for cam_data in data.get('cameras', []):
                cam = CameraConfig(
                    channel=cam_data['channel'],
                    name=cam_data['name'],
                    enabled=cam_data.get('enabled', True),
                    primary_stream_url=cam_data.get('primary_stream', {}).get('url', ''),
                    secondary_stream_url=cam_data.get('secondary_stream', {}).get('url', ''),
                    ai_enabled=cam_data.get('ai_config', {}).get('enabled', True),
                    ai_fps=cam_data.get('ai_config', {}).get('inference_fps', 5.0),
                    ai_resolution=cam_data.get('ai_config', {}).get('resolution', '640x480'),
                    ai_mode=cam_data.get('ai_config', {}).get('mode', 'hybrid'),
                    recording_enabled=cam_data.get('recording_config', {}).get('enabled', True),
                    pre_event_seconds=cam_data.get('recording_config', {}).get('pre_event_seconds', 10),
                    post_event_seconds=cam_data.get('recording_config', {}).get('post_event_seconds', 30),
                    tags=cam_data.get('tags', {}),
                )
                self._cameras[cam.channel] = cam

            logger.info(f"Loaded {len(self._cameras)} cameras from config")

    async def save(self):
        """Persist current configuration to YAML."""
        async with self._lock:
            data = {
                'cameras': [],
                'settings': {
                    'default_ai_fps': 5.0,
                    'default_inference_resolution': '640x480',
                }
            }
            for cam in self._cameras.values():
                data['cameras'].append({
                    'channel': cam.channel,
                    'name': cam.name,
                    'enabled': cam.enabled,
                    'primary_stream': {'url': cam.primary_stream_url},
                    'secondary_stream': {'url': cam.secondary_stream_url},
                    'ai_config': {
                        'enabled': cam.ai_enabled,
                        'inference_fps': cam.ai_fps,
                        'resolution': cam.ai_resolution,
                        'mode': cam.ai_mode,
                    },
                    'recording_config': {
                        'enabled': cam.recording_enabled,
                        'pre_event_seconds': cam.pre_event_seconds,
                        'post_event_seconds': cam.post_event_seconds,
                    },
                    'tags': cam.tags,
                })

            async with aiofiles.open(self.config_path, 'w') as f:
                await f.write(yaml.dump(data, default_flow_style=False, sort_keys=False))

    async def get_all_cameras(self) -> List[CameraConfig]:
        return list(self._cameras.values())

    async def get_camera(self, channel: int) -> Optional[CameraConfig]:
        return self._cameras.get(channel)

    def get_camera_sync(self, channel: int) -> Optional[CameraConfig]:
        return self._cameras.get(channel)

    async def update_camera(self, channel: int, updates: dict):
        async with self._lock:
            if channel not in self._cameras:
                raise ValueError(f"Camera {channel} not found")
            cam = self._cameras[channel]
            for key, value in updates.items():
                if hasattr(cam, key):
                    setattr(cam, key, value)
            await self.save()

    async def add_camera(self, camera: CameraConfig):
        async with self._lock:
            if camera.channel in self._cameras:
                raise ValueError(f"Camera {camera.channel} already exists")
            self._cameras[camera.channel] = camera
            await self.save()

    async def remove_camera(self, channel: int):
        async with self._lock:
            if channel in self._cameras:
                del self._cameras[channel]
                await self.save()

8.2.4 HealthMonitor & Watchdog

# src/health/monitor.py
import asyncio
import logging
import subprocess
from datetime import datetime
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)

@dataclass
class HealthReport:
    timestamp: datetime
    gateway_id: str
    overall_status: str  # healthy, degraded, critical
    dvr_reachable: bool
    dvr_latency_ms: float
    streams_online: int
    streams_total: int
    vpn_tunnel_up: bool
    vpn_latency_ms: float
    disk_usage_percent: float
    memory_usage_percent: float
    cpu_usage_percent: float
    issues: List[str] = field(default_factory=list)
    stream_details: Dict[int, dict] = field(default_factory=dict)

class HealthMonitor:
    """
    Comprehensive health monitoring for the edge gateway.
    Checks DVR connectivity, stream health, VPN tunnel, and system resources.
    """

    def __init__(self, gateway_id: str, dvr_host: str, vpn_endpoint: str,
                 check_interval: float = 30.0):
        self.gateway_id = gateway_id
        self.dvr_host = dvr_host
        self.vpn_endpoint = vpn_endpoint
        self.check_interval = check_interval
        self._callbacks: List[Callable[[HealthReport], None]] = []
        self._running = False
        self._task: Optional[asyncio.Task] = None

    def on_health_change(self, callback: Callable[[HealthReport], None]):
        self._callbacks.append(callback)

    async def start(self):
        self._running = True
        self._task = asyncio.create_task(self._monitor_loop(), name="health-monitor")

    async def _monitor_loop(self):
        while self._running:
            try:
                report = await self._run_checks()
                for cb in self._callbacks:
                    try:
                        if asyncio.iscoroutinefunction(cb):
                            await cb(report)
                        else:
                            cb(report)
                    except Exception as e:
                        logger.error(f"Health callback error: {e}")
            except Exception as e:
                logger.error(f"Health check error: {e}")
            await asyncio.sleep(self.check_interval)

    async def _run_checks(self) -> HealthReport:
        timestamp = datetime.utcnow()
        issues = []

        # Check DVR reachability
        dvr_reachable, dvr_latency = await self._ping_host(self.dvr_host)
        if not dvr_reachable:
            issues.append(f"DVR unreachable: {self.dvr_host}")

        # Check VPN tunnel
        vpn_up, vpn_latency = await self._ping_host(self.vpn_endpoint)
        if not vpn_up:
            issues.append(f"VPN tunnel down: {self.vpn_endpoint}")

        # Check system resources
        disk_pct = await self._get_disk_usage()
        memory_pct = await self._get_memory_usage()
        cpu_pct = await self._get_cpu_usage()

        if disk_pct > 90:
            issues.append(f"Disk critical: {disk_pct:.1f}%")
        elif disk_pct > 80:
            issues.append(f"Disk warning: {disk_pct:.1f}%")

        if memory_pct > 90:
            issues.append(f"Memory critical: {memory_pct:.1f}%")

        # Determine overall status
        if len(issues) == 0:
            status = "healthy"
        elif any("critical" in i for i in issues):
            status = "critical"
        else:
            status = "degraded"

        return HealthReport(
            timestamp=timestamp,
            gateway_id=self.gateway_id,
            overall_status=status,
            dvr_reachable=dvr_reachable,
            dvr_latency_ms=dvr_latency,
            streams_online=0,  # Filled by StreamManager
            streams_total=8,
            vpn_tunnel_up=vpn_up,
            vpn_latency_ms=vpn_latency,
            disk_usage_percent=disk_pct,
            memory_usage_percent=memory_pct,
            cpu_usage_percent=cpu_pct,
            issues=issues,
        )

    async def _ping_host(self, host: str, count: int = 3) -> tuple:
        """Ping a host and return (reachable, avg_latency_ms)."""
        try:
            proc = await asyncio.create_subprocess_exec(
                "ping", "-c", str(count), "-W", "2", host,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
            output = stdout.decode()

            if "0% packet loss" in output or proc.returncode == 0:
                # Parse latency
                try:
                    avg_line = [l for l in output.split('\n') if 'avg' in l][-1]
                    avg_ms = float(avg_line.split('/')[-3])
                    return True, avg_ms
                except (IndexError, ValueError):
                    return True, 0.0
            return False, -1.0
        except Exception:
            return False, -1.0

    async def _get_disk_usage(self) -> float:
        try:
            proc = await asyncio.create_subprocess_exec(
                "df", "/data", "--output=pcent",
                stdout=asyncio.subprocess.PIPE,
            )
            stdout, _ = await proc.communicate()
            lines = stdout.decode().strip().split('\n')
            if len(lines) >= 2:
                return float(lines[1].replace('%', ''))
        except Exception:
            pass
        return 0.0

    async def _get_memory_usage(self) -> float:
        try:
            proc = await asyncio.create_subprocess_exec(
                "free", "|", "grep", "Mem",
                stdout=asyncio.subprocess.PIPE,
            )
            stdout, _ = await proc.communicate()
            # Parse: Mem: total used free shared buff/cache available
            parts = stdout.decode().split()
            total = float(parts[1])
            used = float(parts[2])
            return (used / total) * 100
        except Exception:
            return 0.0

    async def _get_cpu_usage(self) -> float:
        try:
            proc = await asyncio.create_subprocess_exec(
                "top", "-bn1", "|", "grep", "Cpu(s)",
                stdout=asyncio.subprocess.PIPE,
            )
            stdout, _ = await proc.communicate()
            line = stdout.decode()
            # Parse: %Cpu(s): 10.5 us,  5.2 sy, ...
            if 'us' in line:
                us = float(line.split('us')[0].split()[-1])
                sy = float(line.split('sy')[0].split()[-1])
                return us + sy
        except Exception:
            return 0.0

    async def stop(self):
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

8.2.5 FastAPI Server (Diagnostic UI & API)

# src/api/server.py
from fastapi import FastAPI, HTTPException, Depends, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, FileResponse
import asyncio
import logging
from typing import List, Optional

from ..streams.manager import StreamManager
from ..streams.status import CameraStatus
from ..health.monitor import HealthMonitor, HealthReport
from ..core.config import GatewayConfig

logger = logging.getLogger(__name__)

# HTML for minimal diagnostic UI
DIAGNOSTIC_HTML = """
<!DOCTYPE html>
<html>
<head>
    <title>Edge Gateway Diagnostics</title>
    <style>
        body { font-family: monospace; background: #1a1a2e; color: #eee; padding: 20px; }
        h1 { color: #00d4ff; }
        .status-healthy { color: #00ff88; }
        .status-degraded { color: #ffaa00; }
        .status-critical { color: #ff4444; }
        .status-offline { color: #888; }
        table { border-collapse: collapse; width: 100%; margin-top: 20px; }
        th, td { border: 1px solid #333; padding: 8px; text-align: left; }
        th { background: #16213e; }
        tr:nth-child(even) { background: #0f3460; }
        .metric { display: inline-block; margin: 10px 20px 10px 0; }
        .metric-label { color: #888; font-size: 12px; }
        .metric-value { font-size: 24px; font-weight: bold; }
        #log { background: #0a0a1a; padding: 10px; height: 200px; overflow-y: auto; 
               font-size: 11px; margin-top: 20px; border: 1px solid #333; }
    </style>
</head>
<body>
    <h1>Edge Gateway Diagnostics — {{gateway_id}}</h1>
    <div>
        <div class="metric">
            <div class="metric-label">Status</div>
            <div class="metric-value status-{{health_status}}">{{health_status.upper()}}</div>
        </div>
        <div class="metric">
            <div class="metric-label">Streams Online</div>
            <div class="metric-value">{{streams_online}}/{{streams_total}}</div>
        </div>
        <div class="metric">
            <div class="metric-label">DVR</div>
            <div class="metric-value">{{dvr_status}}</div>
        </div>
        <div class="metric">
            <div class="metric-label">VPN</div>
            <div class="metric-value">{{vpn_status}}</div>
        </div>
        <div class="metric">
            <div class="metric-label">Disk</div>
            <div class="metric-value">{{disk_usage}}%</div>
        </div>
    </div>
    <h2>Camera Streams</h2>
    <table>
        <tr>
            <th>CH</th><th>Name</th><th>State</th><th>FPS</th>
            <th>Bitrate</th><th>Resolution</th><th>Uptime</th><th>Circuit</th>
        </tr>
        {{#cameras}}
        <tr>
            <td>{{channel}}</td>
            <td>{{name}}</td>
            <td class="status-{{state}}">{{state}}</td>
            <td>{{fps}}</td>
            <td>{{bitrate}}</td>
            <td>{{resolution}}</td>
            <td>{{uptime}}</td>
            <td>{{circuit_breaker}}</td>
        </tr>
        {{/cameras}}
    </table>
    <h2>System Log</h2>
    <div id="log">{{system_log}}</div>
    <script>
        setTimeout(() => location.reload(), 5000);
    </script>
</body>
</html>
"""

def create_app(stream_manager: StreamManager, health_monitor: HealthMonitor,
               config: GatewayConfig) -> FastAPI:
    app = FastAPI(title="Edge Gateway API", version="1.0.0")

    # CORS
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["https://dashboard.example.com"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    @app.get("/", response_class=HTMLResponse)
    async def diagnostic_ui():
        """Minimal diagnostic HTML UI."""
        status = await stream_manager.get_status()
        health = await health_monitor._run_checks()

        # Simple template rendering (in production use Jinja2)
        cameras_html = ""
        for cam in status:
            cameras_html += f"""
            <tr>
                <td>{cam.channel}</td>
                <td>{cam.name}</td>
                <td class="status-{cam.state}">{cam.state}</td>
                <td>{cam.current_fps:.1f}</td>
                <td>{cam.current_bitrate_kbps:.0f}</td>
                <td>{cam.resolution}</td>
                <td>{cam.uptime_seconds:.0f}s</td>
                <td>{cam.circuit_breaker_state}</td>
            </tr>"""

        html = DIAGNOSTIC_HTML
        html = html.replace("{{gateway_id}}", config.gateway_id)
        html = html.replace("{{health_status}}", health.overall_status)
        html = html.replace("{{streams_online}}", str(health.streams_online))
        html = html.replace("{{streams_total}}", str(health.streams_total))
        html = html.replace("{{dvr_status}}", "ONLINE" if health.dvr_reachable else "OFFLINE")
        html = html.replace("{{vpn_status}}", "UP" if health.vpn_tunnel_up else "DOWN")
        html = html.replace("{{disk_usage}}", f"{health.disk_usage_percent:.1f}")
        html = html.replace("{{#cameras}}", "")
        html = html.replace("{{/cameras}}", "")
        # Insert camera rows between table rows
        html_parts = html.split("<tr>\n            <th>CH</th>")
        if len(html_parts) == 2:
            html = html_parts[0] + cameras_html + "<tr>\n            <th>CH</th>" + html_parts[1]

        return HTMLResponse(content=html)

    @app.get("/api/v1/status")
    async def get_status():
        """Get overall gateway status."""
        streams = await stream_manager.get_status()
        health = await health_monitor._run_checks()
        return {
            "gateway_id": config.gateway_id,
            "timestamp": datetime.utcnow().isoformat(),
            "health": health.overall_status,
            "dvr": {"reachable": health.dvr_reachable, "latency_ms": health.dvr_latency_ms},
            "vpn": {"up": health.vpn_tunnel_up, "latency_ms": health.vpn_latency_ms},
            "system": {
                "disk_percent": health.disk_usage_percent,
                "memory_percent": health.memory_usage_percent,
                "cpu_percent": health.cpu_usage_percent,
            },
            "streams": [s.to_dict() for s in streams],
        }

    @app.get("/api/v1/cameras")
    async def list_cameras():
        """List all configured cameras."""
        return await stream_manager.registry.get_all_cameras()

    @app.get("/api/v1/cameras/{channel}")
    async def get_camera(channel: int):
        camera = await stream_manager.registry.get_camera(channel)
        if not camera:
            raise HTTPException(status_code=404, detail=f"Camera {channel} not found")
        return camera

    @app.post("/api/v1/cameras/{channel}/restart")
    async def restart_camera_stream(channel: int):
        """Restart a specific camera stream."""
        await stream_manager.restart_camera_stream(channel)
        return {"status": "restarted", "channel": channel}

    @app.get("/api/v1/streams/{channel}/status")
    async def get_stream_status(channel: int):
        """Get detailed stream status."""
        status = await stream_manager.get_status(channel)
        if not status:
            raise HTTPException(status_code=404, detail=f"Stream {channel} not found")
        return status[0].to_dict()

    @app.get("/api/v1/hls/{channel}.m3u8")
    async def get_hls_playlist(channel: int):
        """Proxy HLS playlist for a channel."""
        playlist_path = f"/data/hls/ch{channel}.m3u8"
        if not os.path.exists(playlist_path):
            raise HTTPException(status_code=404, detail="Playlist not available")
        return FileResponse(playlist_path, media_type="application/vnd.apple.mpegurl")

    @app.get("/api/v1/hls/{segment}")
    async def get_hls_segment(segment: str):
        """Serve HLS segment file."""
        segment_path = f"/data/hls/{segment}"
        if not os.path.exists(segment_path):
            raise HTTPException(status_code=404, detail="Segment not found")
        return FileResponse(segment_path, media_type="video/mp2t")

    @app.websocket("/ws/live")
    async def websocket_status(websocket: WebSocket):
        """WebSocket for real-time status updates."""
        await websocket.accept()
        try:
            while True:
                streams = await stream_manager.get_status()
                health = await health_monitor._run_checks()
                await websocket.send_json({
                    "timestamp": datetime.utcnow().isoformat(),
                    "health": health.overall_status,
                    "streams": [s.to_dict() for s in streams],
                })
                await asyncio.sleep(5)
        except Exception:
            await websocket.close()

    return app

8.3 Docker Compose Configuration

# docker-compose.yml — Edge Gateway Full Stack
version: "3.8"

services:
  # ============================================
  # Core: Stream Manager (API + Orchestration)
  # ============================================
  stream-manager:
    build:
      context: .
      dockerfile: docker/Dockerfile.stream-manager
    container_name: stream-manager
    hostname: stream-manager
    restart: unless-stopped
    ports:
      - "8080:8080"       # API + Diagnostic UI
    volumes:
      - /data/config:/app/config:ro
      - /data/logs:/app/logs
      - /data/hls:/data/hls:ro         # Read HLS segments
      - /data/buffer:/data/buffer       # Access buffer
      - /var/run/docker.sock:/var/run/docker.sock:ro  # Container mgmt
    environment:
      - GATEWAY_ID=${GATEWAY_ID:-edge-ind-01}
      - DVR_HOST=${DVR_HOST:-192.168.29.200}
      - DVR_RTSP_PORT=${DVR_RTSP_PORT:-554}
      - DVR_USERNAME=${DVR_USERNAME:-admin}
      - DVR_PASSWORD=${DVR_PASSWORD}
      - LOG_LEVEL=${LOG_LEVEL:-INFO}
      - STREAM_TRANSPORT=${STREAM_TRANSPORT:-tcp}
      - HEALTH_CHECK_INTERVAL=${HEALTH_CHECK_INTERVAL:-30}
      - MQTT_BROKER_HOST=mqtt-broker
      - MQTT_BROKER_PORT=1883
      - HLS_OUTPUT_DIR=/data/hls
      - BUFFER_DIR=/data/buffer
    networks:
      - edge-internal
    cpus: "2.0"
    mem_limit: 512m
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "5"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/api/v1/status"]
      interval: 30s
      timeout: 5s
      retries: 3
      start_period: 30s
    depends_on:
      - mqtt-broker

  # ============================================
  # FFmpeg Workers (CH1-CH4 — Primary Streams)
  # ============================================
  ffmpeg-worker-1:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-1
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -probesize 5000000 -analyzeduration 2000000
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch1_%03d.ts"
      "/data/hls/ch1.m3u8"
      -c:v copy -c:a copy
      -f segment -segment_time 60 -segment_format mp4
      -reset_timestamps 1 -strftime 1
      "/data/buffer/continuous/ch1_%Y%m%d_%H%M%S.mp4"
    volumes:
      - /data/hls:/data/hls
      - /data/buffer/continuous:/data/buffer/continuous
    environment:
      - DVR_PASSWORD=${DVR_PASSWORD}
    networks:
      - edge-internal
    cpus: "2.0"
    mem_limit: 256m
    logging:
      driver: "json-file"
      options:
        max-size: "5m"
        max-file: "3"
    deploy:
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 10

  ffmpeg-worker-2:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-2
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -probesize 5000000 -analyzeduration 2000000
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=2&subtype=0"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch2_%03d.ts"
      "/data/hls/ch2.m3u8"
      -c:v copy -c:a copy
      -f segment -segment_time 60 -segment_format mp4
      -reset_timestamps 1 -strftime 1
      "/data/buffer/continuous/ch2_%Y%m%d_%H%M%S.mp4"
    volumes:
      - /data/hls:/data/hls
      - /data/buffer/continuous:/data/buffer/continuous
    environment:
      - DVR_PASSWORD=${DVR_PASSWORD}
    networks:
      - edge-internal
    cpus: "2.0"
    mem_limit: 256m
    logging:
      driver: "json-file"
      options:
        max-size: "5m"
        max-file: "3"
    deploy:
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 10

  ffmpeg-worker-3:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-3
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -probesize 5000000 -analyzeduration 2000000
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=3&subtype=0"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch3_%03d.ts"
      "/data/hls/ch3.m3u8"
    volumes:
      - /data/hls:/data/hls
    environment:
      - DVR_PASSWORD=${DVR_PASSWORD}
    networks:
      - edge-internal
    cpus: "2.0"
    mem_limit: 256m

  ffmpeg-worker-4:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-4
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -probesize 5000000 -analyzeduration 2000000
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=4&subtype=0"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch4_%03d.ts"
      "/data/hls/ch4.m3u8"
    volumes:
      - /data/hls:/data/hls
    environment:
      - DVR_PASSWORD=${DVR_PASSWORD}
    networks:
      - edge-internal
    cpus: "2.0"
    mem_limit: 256m

  # ============================================
  # FFmpeg Workers (CH5-CH8 — Secondary Streams)
  # ============================================
  ffmpeg-worker-5:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-5
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=5&subtype=1"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch5_%03d.ts"
      "/data/hls/ch5.m3u8"
    volumes:
      - /data/hls:/data/hls
    networks:
      - edge-internal
    cpus: "1.0"
    mem_limit: 128m

  ffmpeg-worker-6:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-6
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=6&subtype=1"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch6_%03d.ts"
      "/data/hls/ch6.m3u8"
    volumes:
      - /data/hls:/data/hls
    networks:
      - edge-internal
    cpus: "1.0"
    mem_limit: 128m

  ffmpeg-worker-7:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-7
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=7&subtype=1"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch7_%03d.ts"
      "/data/hls/ch7.m3u8"
    volumes:
      - /data/hls:/data/hls
    networks:
      - edge-internal
    cpus: "1.0"
    mem_limit: 128m

  ffmpeg-worker-8:
    image: jrottenberg/ffmpeg:6.1-ubuntu2204
    container_name: ffmpeg-worker-8
    restart: unless-stopped
    command: >
      ffmpeg -nostdin -y
      -rtsp_transport tcp -stimeout 5000000
      -reconnect 1 -reconnect_at_eof 1 -reconnect_streamed 1 -reconnect_delay_max 30
      -fflags +genpts+discardcorrupt -flags +low_delay
      -i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=8&subtype=1"
      -c:v copy -c:a copy
      -f hls -hls_time 2 -hls_list_size 5
      -hls_flags delete_segments+omit_endlist+program_date_time
      -hls_segment_filename "/data/hls/ch8_%03d.ts"
      "/data/hls/ch8.m3u8"
    volumes:
      - /data/hls:/data/hls
    networks:
      - edge-internal
    cpus: "1.0"
    mem_limit: 128m

  # ============================================
  # AI Frame Extractor
  # ============================================
  frame-extractor:
    build:
      context: .
      dockerfile: docker/Dockerfile.frame-extractor
    container_name: frame-extractor
    restart: unless-stopped
    volumes:
      - /data/config:/app/config:ro
      - /data/logs:/app/logs
    environment:
      - DVR_HOST=192.168.29.200
      - DVR_USERNAME=${DVR_USERNAME:-admin}
      - DVR_PASSWORD=${DVR_PASSWORD}
      - AI_INFERENCE_FPS=${AI_INFERENCE_FPS:-5}
      - AI_KEYFRAME_ONLY=${AI_KEYFRAME_ONLY:-true}
      - AI_RESOLUTION=${AI_RESOLUTION:-640x480}
      - MQTT_BROKER_HOST=mqtt-broker
      - MQTT_BROKER_PORT=1883
      - MQTT_TOPIC_PREFIX=ai/frames
      - LOG_LEVEL=${LOG_LEVEL:-INFO}
    networks:
      - edge-internal
    cpus: "2.0"
    mem_limit: 512m
    depends_on:
      - mqtt-broker

  # ============================================
  # Buffer Manager (Upload Queue)
  # ============================================
  buffer-manager:
    build:
      context: .
      dockerfile: docker/Dockerfile.buffer-manager
    container_name: buffer-manager
    restart: unless-stopped
    volumes:
      - /data/buffer:/data/buffer
      - /data/config:/app/config:ro
      - /data/logs:/app/logs
    environment:
      - BUFFER_MAX_SIZE_GB=${BUFFER_MAX_SIZE_GB:-20}
      - BUFFER_RETENTION_HOURS=${BUFFER_RETENTION_HOURS:-72}
      - UPLOAD_ENDPOINT=${UPLOAD_ENDPOINT:-https://10.100.0.1/api/upload}
      - UPLOAD_MAX_RETRIES=${UPLOAD_MAX_RETRIES:-5}
      - UPLOAD_RETRY_BASE_DELAY=${UPLOAD_RETRY_BASE_DELAY:-5}
      - UPLOAD_CHUNK_SIZE_MB=${UPLOAD_CHUNK_SIZE_MB:-10}
      - VPN_ENABLED=${VPN_ENABLED:-true}
      - LOG_LEVEL=${LOG_LEVEL:-INFO}
    networks:
      - edge-internal
      - vpn-net
    cpus: "1.0"
    mem_limit: 256m
    depends_on:
      - stream-manager

  # ============================================
  # MQTT Broker (Internal Messaging)
  # ============================================
  mqtt-broker:
    image: eclipse-mosquitto:2.0.18
    container_name: mqtt-broker
    restart: unless-stopped
    ports:
      - "127.0.0.1:1883:1883"   # Local only — not exposed externally
    volumes:
      - ./config/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
      - /data/mqtt/data:/mosquitto/data
      - /data/mqtt/logs:/mosquitto/log
    networks:
      - edge-internal
    cpus: "0.5"
    mem_limit: 128m

  # ============================================
  # VPN Client (WireGuard)
  # ============================================
  vpn-client:
    image: lscr.io/linuxserver/wireguard:latest
    container_name: vpn-client
    restart: unless-stopped
    cap_add:
      - NET_ADMIN
      - SYS_MODULE
    environment:
      - PUID=1000
      - PGID=1000
    volumes:
      - ./config/wireguard:/config/wg0.conf:ro
      - /lib/modules:/lib/modules:ro
    sysctls:
      - net.ipv4.conf.all.src_valid_mark=1
      - net.ipv4.ip_forward=1
    networks:
      - vpn-net
    cpus: "0.5"
    mem_limit: 128m

  # ============================================
  # Prometheus (Metrics)
  # ============================================
  prometheus:
    image: prom/prometheus:v2.50.0
    container_name: prometheus
    restart: unless-stopped
    ports:
      - "127.0.0.1:9090:9090"
    volumes:
      - ./config/prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - /data/prometheus:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=7d'
    networks:
      - edge-internal
    cpus: "0.5"
    mem_limit: 256m

  # ============================================
  # Node Exporter (Host Metrics)
  # ============================================
  node-exporter:
    image: prom/node-exporter:v1.7.0
    container_name: node-exporter
    restart: unless-stopped
    volumes:
      - /proc:/host/proc:ro
      - /sys:/host/sys:ro
      - /:/rootfs:ro
    command:
      - '--path.procfs=/host/proc'
      - '--path.rootfs=/rootfs'
      - '--path.sysfs=/host/sys'
      - '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)'
    networks:
      - edge-internal
    cpus: "0.25"
    mem_limit: 64m

networks:
  edge-internal:
    driver: bridge
    ipam:
      config:
        - subnet: 172.28.0.0/16
  vpn-net:
    driver: bridge
    internal: true  # Only containers can access this network

8.4 Dockerfiles

8.4.1 Stream Manager Dockerfile

# docker/Dockerfile.stream-manager
FROM python:3.11-slim-bookworm AS base

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    ffmpeg \
    libsm6 \
    libxext6 \
    libgl1-mesa-glx \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY src/ ./src/

# Create non-root user
RUN useradd -m -u 1000 appuser && \
    mkdir -p /data/hls /data/buffer /data/logs /app/logs && \
    chown -R appuser:appuser /app /data

USER appuser

# Expose API port
EXPOSE 8080

# Health check
HEALTHCHECK --interval=30s --timeout=5s --start-period=30s --retries=3 \
    CMD curl -f http://localhost:8080/api/v1/status || exit 1

# Run the application
CMD ["python", "-m", "src.main"]

8.4.2 Frame Extractor Dockerfile

# docker/Dockerfile.frame-extractor
FROM python:3.11-slim-bookworm

RUN apt-get update && apt-get install -y --no-install-recommends \
    ffmpeg \
    libsm6 \
    libxext6 \
    libgl1-mesa-glx \
    libgomp1 \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/processing/ ./src/processing/
COPY src/mqtt/ ./src/mqtt/
COPY src/core/ ./src/core/
COPY src/utils/ ./src/utils/

RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

CMD ["python", "-m", "src.processing.frame_extractor"]

8.4.3 Buffer Manager Dockerfile

# docker/Dockerfile.buffer-manager
FROM python:3.11-slim-bookworm

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/storage/ ./src/storage/
COPY src/core/ ./src/core/
COPY src/utils/ ./src/utils/

RUN useradd -m -u 1000 appuser && \
    mkdir -p /data/buffer && \
    chown -R appuser:appuser /app /data
USER appuser

CMD ["python", "-m", "src.storage.buffer_manager"]

8.5 Requirements.txt

# Core Framework
fastapi==0.110.0
uvicorn[standard]==0.27.1
python-multipart==0.0.9
websockets==12.0

# Async
aiohttp==3.9.3
aiofiles==23.2.1
aiortc==1.8.0

# MQTT
paho-mqtt==1.6.1

# Computer Vision
opencv-python-headless==4.9.0.84
numpy==1.26.4
Pillow==10.2.0

# ONVIF
onvif-zeep==0.2.12

# Configuration
PyYAML==6.0.1
python-dotenv==1.0.1

# Data Validation
pydantic==2.6.3
pydantic-settings==2.2.1

# Monitoring
prometheus-client==0.20.0

# Testing
pytest==8.0.2
pytest-asyncio==0.23.5
pytest-cov==4.1.0
httpx==0.27.0

# Utilities
httpx==0.27.0
tenacity==8.2.3
structlog==24.1.0
orjson==3.9.15

8.6 Configuration Files

8.6.1 Camera Configuration (cameras.yaml)

# config/cameras.yaml
cameras:
  - channel: 1
    name: "Gate/Entry"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "main_gate"
      criticality: "high"

  - channel: 2
    name: "Production Floor A"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "production_a"
      criticality: "high"

  - channel: 3
    name: "Production Floor B"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "production_b"
      criticality: "medium"

  - channel: 4
    name: "Warehouse"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "warehouse"
      criticality: "medium"

  - channel: 5
    name: "Loading Dock"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "loading_dock"
      criticality: "medium"

  - channel: 6
    name: "Office Corridor"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: false
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "office"
      criticality: "low"

  - channel: 7
    name: "Server Room"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 15
    post_event_seconds: 45
    tags:
      location: "server_room"
      criticality: "high"

  - channel: 8
    name: "Perimeter"
    enabled: true
    primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=0"
    secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=1"
    use_primary: false
    ai_enabled: true
    ai_fps: 5.0
    ai_resolution: "640x480"
    ai_mode: "hybrid"
    recording_enabled: true
    pre_event_seconds: 10
    post_event_seconds: 30
    tags:
      location: "perimeter"
      criticality: "high"

settings:
  default_ai_fps: 5.0
  default_inference_resolution: "640x480"
  default_ai_mode: "hybrid"
  stream_stall_timeout: 10
  reconnect_max_delay: 30
  circuit_breaker:
    failure_threshold: 5
    recovery_timeout: 60
    half_open_max_calls: 3
    success_threshold: 2

8.6.2 Gateway Configuration (gateway.yaml)

# config/gateway.yaml
gateway:
  id: "edge-ind-01"
  location: "Plant Floor A"
  description: "Edge gateway for CP PLUS DVR surveillance"

network:
  lan_interface: "eth0"
  lan_ip: "192.168.29.10"
  subnet: "255.255.255.0"
  gateway: "192.168.29.1"
  dns:
    - "8.8.8.8"
    - "1.1.1.1"

vpn:
  enabled: true
  type: "wireguard"
  tunnel_ip: "10.100.0.10"
  server_endpoint: "cloud-gateway.example.com:51820"
  persistent_keepalive: 25
  mtu: 1400

dvr:
  host: "192.168.29.200"
  rtsp_port: 554
  http_port: 80
  https_port: 443
  username: "admin"
  password: "${DVR_PASSWORD}"  # From environment
  channels: 8
  onvif_enabled: true

streams:
  transport: "tcp"
  buffer_size: 65536
  stall_timeout: 10
  reconnect:
    initial_delay: 1.0
    max_delay: 30.0
    multiplier: 2.0
    jitter: 0.1

ai:
  inference_fps: 5.0
  keyframe_only: false
  mode: "hybrid"
  resolution: "640x480"
  confidence_threshold: 0.6
  mqtt_topic_prefix: "ai/frames"

hls:
  segment_duration: 2
  window_size: 5
  output_dir: "/data/hls"
  cleanup_interval: 60

storage:
  buffer_max_size_gb: 20
  buffer_retention_hours: 72
  hls_cache_max_size_gb: 5
  log_max_size_gb: 1
  upload:
    endpoint: "https://10.100.0.1/api/upload"
    max_retries: 5
    retry_base_delay: 5
    max_retry_delay: 300
    concurrent_uploads: 3
    verify_ssl: true

monitoring:
  health_check_interval: 30
  metrics_port: 9090
  web_ui_port: 8080
  log_level: "INFO"
  log_format: "json"

security:
  api_auth_enabled: true
  jwt_secret: "${JWT_SECRET}"
  jwt_expiry_hours: 24
  allowed_dashboard_hosts:
    - "https://dashboard.example.com"

8.6.3 Mosquitto MQTT Configuration

# config/mosquitto.conf
# MQTT Broker for internal edge communication

listener 1883 0.0.0.0
protocol mqtt

# Persistence
persistence true
persistence_location /mosquitto/data/

# Logging
log_dest file /mosquitto/log/mosquitto.log
log_type error
log_type warning
log_type information
connection_messages true

# Security — local network only
allow_anonymous true
# In production, use password_file and ACL
# password_file /mosquitto/config/passwd
# acl_file /mosquitto/config/acl

# Limits
max_connections 100
message_size_limit 10485760  # 10MB max (for image frames)
max_inflight_messages 100
max_queued_messages 1000

8.6.4 Prometheus Configuration

# config/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  - job_name: 'node-exporter'
    static_configs:
      - targets: ['node-exporter:9100']

  - job_name: 'stream-manager'
    static_configs:
      - targets: ['stream-manager:8080']
    metrics_path: '/metrics'

8.7 Environment File (.env)

# .env — Environment variables for Docker Compose
# Copy to .env and fill in sensitive values

# Gateway Identity
GATEWAY_ID=edge-ind-01
LOG_LEVEL=INFO

# DVR Credentials (REQUIRED)
DVR_USERNAME=admin
DVR_PASSWORD=your_dvr_password_here

# AI Pipeline
AI_INFERENCE_FPS=5
AI_KEYFRAME_ONLY=true
AI_RESOLUTION=640x480

# Cloud Upload
UPLOAD_ENDPOINT=https://10.100.0.1/api/upload
UPLOAD_MAX_RETRIES=5

# VPN
VPN_ENABLED=true

# Buffer
BUFFER_MAX_SIZE_GB=20
BUFFER_RETENTION_HOURS=72

# JWT Secret for API Auth
JWT_SECRET=change_this_to_a_random_32_char_string

8.8 Main Application Entry Point

# src/main.py
import asyncio
import logging
import signal
import sys
from contextlib import asynccontextmanager

import uvicorn
from fastapi import FastAPI

from .api.server import create_app
from .core.config import GatewayConfig
from .core.logging_setup import setup_logging
from .streams.manager import StreamManager, StreamManagerConfig
from .streams.registry import CameraRegistry
from .streams.ffmpeg_builder import FFmpegCommandBuilder
from .streams.connector import ReconnectConfig
from .streams.circuit_breaker import CircuitBreakerConfig
from .health.monitor import HealthMonitor
from .health.metrics import MetricsCollector

logger = logging.getLogger(__name__)

class EdgeGateway:
    """Main application class — initializes and coordinates all services."""

    def __init__(self):
        self.config = GatewayConfig.load()
        setup_logging(self.config.log_level, self.config.log_format)
        self.registry = CameraRegistry(self.config.camera_config_path)
        self.stream_manager: Optional[StreamManager] = None
        self.health_monitor: Optional[HealthMonitor] = None
        self.metrics = MetricsCollector()
        self._shutdown_event = asyncio.Event()

    async def initialize(self):
        """Initialize all services."""
        logger.info(f"Starting Edge Gateway: {self.config.gateway_id}")

        # Load camera configuration
        await self.registry.load()

        # Build FFmpeg command builder
        ffmpeg_builder = FFmpegCommandBuilder()

        # Configure stream manager
        manager_config = StreamManagerConfig(
            reconnect=ReconnectConfig(
                initial_delay=self.config.reconnect_initial_delay,
                max_delay=self.config.reconnect_max_delay,
                stall_timeout=self.config.stream_stall_timeout,
            ),
            circuit_breaker=CircuitBreakerConfig(
                failure_threshold=self.config.cb_failure_threshold,
                recovery_timeout=self.config.cb_recovery_timeout,
            ),
            ffmpeg=ffmpeg_builder,
        )

        self.stream_manager = StreamManager(
            config=manager_config,
            registry=self.registry,
            metrics=self.metrics,
        )

        self.health_monitor = HealthMonitor(
            gateway_id=self.config.gateway_id,
            dvr_host=self.config.dvr_host,
            vpn_endpoint=self.config.vpn_server_ip,
            check_interval=self.config.health_check_interval,
        )

        # Wire up health callbacks
        self.health_monitor.on_health_change(self._on_health_change)

        # Start services
        await self.stream_manager.start()
        await self.health_monitor.start()

        logger.info("Edge Gateway initialized successfully")

    def _on_health_change(self, report):
        """Handle health status changes."""
        if report.overall_status == "critical":
            logger.critical(f"Gateway health CRITICAL: {report.issues}")
        elif report.overall_status == "degraded":
            logger.warning(f"Gateway health degraded: {report.issues}")

    async def run_api_server(self):
        """Run the FastAPI HTTP server."""
        app = create_app(self.stream_manager, self.health_monitor, self.config)

        config = uvicorn.Config(
            app=app,
            host="0.0.0.0",
            port=self.config.web_ui_port,
            log_level=self.config.log_level.lower(),
            access_log=True,
        )
        server = uvicorn.Server(config)
        await server.serve()

    async def run(self):
        """Main run loop."""
        await self.initialize()

        # Run API server and wait for shutdown
        try:
            await asyncio.gather(
                self.run_api_server(),
                self._shutdown_event.wait(),
                return_exceptions=True,
            )
        except asyncio.CancelledError:
            pass
        finally:
            await self.shutdown()

    async def shutdown(self):
        """Graceful shutdown."""
        logger.info("Shutting down Edge Gateway...")
        if self.stream_manager:
            await self.stream_manager.stop()
        if self.health_monitor:
            await self.health_monitor.stop()
        logger.info("Edge Gateway stopped")

    def handle_signal(self, sig):
        """Handle OS signals for graceful shutdown."""
        logger.info(f"Received signal {sig.name}")
        self._shutdown_event.set()


def main():
    gateway = EdgeGateway()

    # Register signal handlers
    loop = asyncio.get_event_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, lambda s=sig: gateway.handle_signal(s))

    try:
        loop.run_until_complete(gateway.run())
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
    finally:
        loop.run_until_complete(gateway.shutdown())
        loop.close()


if __name__ == "__main__":
    main()

8.9 Startup Script

#!/bin/bash
# scripts/start.sh — Edge Gateway startup script

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
ENV_FILE="${PROJECT_DIR}/.env"

# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'

echo "========================================"
echo "  Edge Gateway Startup"
echo "========================================"

# Check environment file
if [[ ! -f "$ENV_FILE" ]]; then
    echo -e "${RED}ERROR: .env file not found at ${ENV_FILE}${NC}"
    echo "Please copy .env.example to .env and fill in credentials"
    exit 1
fi

# Load environment
set -a
source "$ENV_FILE"
set +a

# Validate required variables
if [[ -z "${DVR_PASSWORD:-}" ]]; then
    echo -e "${RED}ERROR: DVR_PASSWORD is not set in .env${NC}"
    exit 1
fi

# Create data directories
echo "Creating data directories..."
mkdir -p /data/{hls,buffer/alert_clips,buffer/continuous,buffer/pre_event,logs,config,mqtt,prometheus}
chmod 755 /data/*

# Check DVR connectivity
echo "Checking DVR connectivity (${DVR_HOST:-192.168.29.200})..."
if ping -c 1 -W 2 "${DVR_HOST:-192.168.29.200}" > /dev/null 2>&1; then
    echo -e "${GREEN}DVR is reachable${NC}"
else
    echo -e "${YELLOW}WARNING: DVR is not reachable — streams will fail${NC}"
fi

# Check Docker
echo "Checking Docker..."
if ! docker info > /dev/null 2>&1; then
    echo -e "${RED}ERROR: Docker is not running${NC}"
    exit 1
fi

# Pull latest images
echo "Pulling Docker images..."
docker compose -f "${PROJECT_DIR}/docker-compose.yml" pull

# Start services
echo "Starting Edge Gateway services..."
docker compose -f "${PROJECT_DIR}/docker-compose.yml" up -d --remove-orphans

# Wait for services
echo "Waiting for services to start..."
sleep 10

# Health check
echo "Running health check..."
MAX_RETRIES=12
RETRY=0
while [[ $RETRY -lt $MAX_RETRIES ]]; do
    if curl -sf http://localhost:8080/api/v1/status > /dev/null 2>&1; then
        echo -e "${GREEN}Edge Gateway is healthy!${NC}"
        echo ""
        echo "Diagnostic UI: http://localhost:8080/"
        echo "API Status:    http://localhost:8080/api/v1/status"
        echo "HLS Streams:   http://localhost:8081/hls/ch{N}.m3u8"
        exit 0
    fi
    RETRY=$((RETRY + 1))
    echo "  Retry ${RETRY}/${MAX_RETRIES}..."
    sleep 5
done

echo -e "${RED}ERROR: Gateway failed health check${NC}"
echo "Check logs: docker compose logs stream-manager"
exit 1

9. Appendix

9.1 Complete FFmpeg Command Reference

A. Single-Channel Operations

# A1. Probe stream information (codec, resolution, bitrate)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -v quiet -print_format json -show_format -show_streams 2>/dev/null

# A2. Save single-frame snapshot
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -vf "select='eq(n,0)'" -vframes 1 -q:v 2 \
  "/data/snapshots/ch1_$(date +%Y%m%d_%H%M%S).jpg"

# A3. Record 60-second clip to MP4
ffmpeg -rtsp_transport tcp -stimeout 5000000 -t 60 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a copy -movflags +faststart \
  "/data/clips/ch1_$(date +%Y%m%d_%H%M%S).mp4"

# A4. Transcode to H.264 for cloud compatibility
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v libx264 -preset fast -tune zerolatency -crf 23 \
  -maxrate 2000k -bufsize 4000k -g 50 \
  -c:a aac -b:a 128k \
  -f mp4 -movflags +faststart \
  "/data/clips/ch1_transcoded_$(date +%Y%m%d_%H%M%S).mp4"

# A5. Extract audio only
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -vn -c:a aac -b:a 128k \
  "/data/audio/ch1_$(date +%Y%m%d_%H%M%S).aac"

# A6. Stream to RTMP relay (e.g., YouTube, custom server)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a aac -b:a 128k -ar 44100 \
  -f flv "rtmp://relay-server.example.com/live/ch1"

# A7. Generate thumbnail grid (contact sheet)
ffmpeg -rtsp_transport tcp -stimeout 5000000 -t 60 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -vf "fps=1/5,scale=320:240,tile=4x3" -vframes 1 \
  "/data/snapshots/ch1_contact.jpg"

# A8. Motion detection using FFmpeg scene filter
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -vf "select='gt(scene,0.1)',scale=640:480" -f image2pipe -vcodec mjpeg - \
  > /dev/null 2>&1 | grep -o 'scene:[0-9.]*' | head -20

# A9. Low-latency WebRTC-compatible output
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v libx264 -preset ultrafast -tune zerolatency -profile:v baseline \
  -vf "scale=640:480" -b:v 1000k -g 30 \
  -c:a opus -b:a 64k \
  -f webm pipe:1

# A10. MJPEG stream for simple HTTP embedding
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v mjpeg -q:v 5 -vf "fps=15,scale=640:480" \
  -f mpjpeg -boundary_tag ffmpeg \
  "http://localhost:8081/mjpeg/ch1"

B. Multi-Channel Operations

# B1. Simultaneous 4-channel recording
for CH in 1 2 3 4; do
  ffmpeg -rtsp_transport tcp -stimeout 5000000 \
    -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=${CH}&subtype=0" \
    -c:v copy -c:a copy -t 300 \
    -movflags +faststart \
    "/data/clips/ch${CH}_$(date +%Y%m%d_%H%M%S).mp4" &
done
wait

# B2. 8-channel status check (probe all streams)
for CH in $(seq 1 8); do
  echo "=== Channel ${CH} ==="
  ffprobe -v error -rtsp_transport tcp -stimeout 3000000 \
    -select_streams v:0 \
    -show_entries stream=codec_name,width,height,r_frame_rate,bit_rate \
    -of csv=s=,:p=0 \
    "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=${CH}&subtype=0" \
    2>/dev/null || echo "  UNREACHABLE"
done

# B3. Mosaic view (all 8 channels in 4x2 grid)
ffmpeg \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=1" \
  -rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=1" \
  -filter_complex "
    [0:v]scale=480:270[v0]; [1:v]scale=480:270[v1];
    [2:v]scale=480:270[v2]; [3:v]scale=480:270[v3];
    [4:v]scale=480:270[v4]; [5:v]scale=480:270[v5];
    [6:v]scale=480:270[v6]; [7:v]scale=480:270[v7];
    [v0][v1][v2][v3]hstack=inputs=4[row1];
    [v4][v5][v6][v7]hstack=inputs=4[row2];
    [row1][row2]vstack=inputs=2[out]
  " \
  -map "[out]" -c:v libx264 -preset fast -b:v 4000k -f hls \
  -hls_time 2 -hls_list_size 5 -hls_flags delete_segments \
  "/data/hls/mosaic.m3u8"

# B4. Batch transcode all channels to H.264
for CH in $(seq 1 8); do
  ffmpeg -rtsp_transport tcp -stimeout 5000000 -t 60 \
    -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=${CH}&subtype=0" \
    -c:v libx264 -preset fast -crf 28 -vf "scale=640:480" \
    -c:a aac -b:a 64k -ac 1 \
    -movflags +faststart \
    "/data/clips/ch${CH}_compact.mp4" &
done
wait
echo "All channels transcoded"

C. HLS-Specific Operations

# C1. HLS with AES-128 encryption
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a copy \
  -f hls -hls_time 2 -hls_list_size 5 \
  -hls_key_info_file "/data/config/hls_key_info.txt" \
  -hls_segment_filename "/data/hls/ch1_enc_%03d.ts" \
  "/data/hls/ch1_enc.m3u8"

# C2. HLS with independent segments (for CDN compatibility)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a copy \
  -f hls -hls_time 2 -hls_list_size 5 \
  -hls_flags independent_segments+delete_segments+omit_endlist \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8"

# C3. Multi-variant HLS (adaptive bitrate)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -filter_complex "[0:v]split=2[high][low];[low]scale=640:480[lowv]" \
  -map "[high]" -map 0:a -c:v:0 copy -c:a:0 copy \
  -map "[lowv]" -c:v:1 libx264 -preset fast -b:v:1 800k -c:a:1 aac -b:a:1 64k \
  -var_stream_map "v:0,a:0,name:high v:1,a:1,name:low" \
  -f hls -hls_time 2 -hls_list_size 5 \
  -hls_flags delete_segments+omit_endlist \
  -master_pl_name "ch1_master.m3u8" \
  -hls_segment_filename "/data/hls/ch1_%v_%03d.ts" \
  "/data/hls/ch1_%v.m3u8"

9.2 API Endpoint Reference

Method Endpoint Description Auth
GET / Diagnostic HTML UI No
GET /api/v1/status Full gateway status No
GET /api/v1/cameras List all cameras Yes
GET /api/v1/cameras/{ch} Get camera config Yes
PUT /api/v1/cameras/{ch} Update camera config Yes
POST /api/v1/cameras/{ch}/restart Restart stream Yes
POST /api/v1/cameras/{ch}/enable Enable camera Yes
POST /api/v1/cameras/{ch}/disable Disable camera Yes
GET /api/v1/streams/{ch}/status Stream status detail Yes
GET /api/v1/streams/{ch}/stats Stream statistics Yes
POST /api/v1/streams/{ch}/start Start stream Yes
POST /api/v1/streams/{ch}/stop Stop stream Yes
GET /api/v1/hls/{ch}.m3u8 HLS playlist No*
GET /api/v1/hls/{segment}.ts HLS segment No*
GET /api/v1/buffer/stats Buffer statistics Yes
GET /api/v1/buffer/uploads/pending Pending uploads Yes
POST /api/v1/buffer/uploads/{id}/retry Retry upload Yes
GET /api/v1/health Health check (simple) No
GET /api/v1/health/detailed Detailed health Yes
GET /api/v1/metrics Prometheus metrics No
WS /ws/live Real-time status WebSocket Yes

*HLS endpoints use token-based URL signing for security.

9.3 MQTT Topic Structure

Topic Direction Payload Description
ai/frames/{channel} Edge → Cloud JPEG bytes + metadata AI inference frames
ai/detections/{channel} Edge → Cloud JSON Detection results
ai/events/{channel} Edge → Cloud JSON Triggered events
streams/status/{channel} Edge → Cloud JSON Stream health status
gateway/health Edge → Cloud JSON Gateway health report
gateway/metrics Edge → Cloud JSON Resource metrics
control/cameras/{channel}/restart Cloud → Edge JSON Restart command
control/cameras/{channel}/config Cloud → Edge JSON Config update
control/gateway/reboot Cloud → Edge Empty Gateway reboot
alerts/critical Edge → Cloud JSON Critical alerts
buffer/uploads/status Edge → Cloud JSON Upload progress

9.4 Failure Mode Matrix

Failure Detection Response Fallback Recovery
DVR network unreachable ICMP ping fails Mark all streams OFFLINE Use cached last frame Auto-retry every 30s
Single channel RTSP fail No frames for 10s Exponential backoff reconnect Circuit breaker opens Auto-recovery test
All 8 channels fail Mass disconnect Alert to cloud, enter degraded mode Static placeholder DVR power cycle suggestion
VPN tunnel down WG handshake fail Queue uploads locally Local-only mode Auto-reconnect WG
Disk full (>95%) df monitor Emergency cleanup Drop oldest clips Alert + cleanup
FFmpeg crash Process exit code Restart container Retry with delay Docker auto-restart
MQTT broker down Connection timeout Buffer messages in memory File-based queuing Auto-reconnect
High CPU usage >90% for 60s Reduce AI FPS Skip non-key frames Auto-scale down
Memory pressure >90% usage Drop frames, OOM protection Reduce buffer sizes Kernel OOM killer
Cloud upload fail HTTP 5xx Retry with backoff Local retention Infinite retry
SSL certificate expiry TLS handshake fail Log warning Insecure fallback (dev only) Auto-renewal
Power loss Hardware event UPS graceful shutdown None (hardware) Auto-start on boot

9.5 Network Port Reference

Port Protocol Service Direction Notes
80/tcp HTTP DVR Web UI Edge → DVR Local only
443/tcp HTTPS DVR Web UI Edge → DVR Local only
554/tcp RTSP Video streams Edge → DVR Primary protocol
554/udp RTSP/UDP Video streams Edge → DVR Fallback
25001/tcp TCP DVR control Edge → DVR CP PLUS protocol
25002/udp UDP DVR control Edge → DVR CP PLUS protocol
123/udp NTP Time sync Edge → Gateway Or local NTP
8080/tcp HTTP Gateway API External → Edge Diagnostic UI
8081/tcp HTTP HLS server External → Edge Video segments
1883/tcp MQTT Message broker Internal Local only
51820/udp WireGuard VPN tunnel Edge ↔ Cloud Encrypted
9090/tcp HTTP Prometheus Internal Metrics
9100/tcp HTTP Node Exporter Internal Host metrics

9.6 Performance Benchmarks (Expected)

Metric Intel NUC i5 RPi 5 Jetson Orin Nano
8x RTSP ingest (sub) <5% CPU <15% CPU <10% CPU
8x RTSP ingest (main) <10% CPU <30% CPU <15% CPU
HLS segment generation <5% CPU <10% CPU <5% CPU
AI frame extraction (8ch) <20% CPU <50% CPU* <10% CPU (GPU)
Upload throughput 50+ Mbps 20+ Mbps 30+ Mbps
End-to-end latency 500-1000ms 800-1500ms 600-1200ms
Power consumption 15-25W 5-8W 7-15W

*RPi 5 may struggle with 8-channel simultaneous AI extraction; consider 4-channel max.

9.7 Security Checklist

  • DVR password changed from default
  • WireGuard PSK configured
  • VPN uses strong crypto (Curve25519, ChaCha20-Poly1305)
  • API uses JWT authentication
  • HLS URLs use signed tokens (time-limited)
  • DVR Web UI not exposed externally
  • MQTT broker bound to localhost only
  • Docker containers run as non-root
  • Secrets in .env, not in code
  • .env file permissions: 600
  • SSL certificates for API HTTPS
  • Log rotation configured
  • Fail2ban for API brute force protection
  • Network segmentation (DVR on isolated VLAN)
  • Regular firmware updates for DVR
  • Backup of gateway configuration

9.8 Glossary

Term Definition
DVR Digital Video Recorder — CP PLUS CP-UVR-0801E1-CV2
UVR Universal Video Recorder (supports analog + IP cameras)
RTSP Real-Time Streaming Protocol (RFC 7826)
ONVIF Open Network Video Interface Forum standard
HLS HTTP Live Streaming (Apple)
DASH Dynamic Adaptive Streaming over HTTP
WebRTC Web Real-Time Communication
GOP Group of Pictures (I-frame + P-frames + B-frames)
CRF Constant Rate Factor (quality-based encoding)
DSCP Differentiated Services Code Point (QoS marking)
MTU Maximum Transmission Unit
EWMA Exponentially Weighted Moving Average
Circuit Breaker Fault tolerance pattern for failing services
Ring Buffer Circular fixed-size buffer overwriting oldest data
Keyframe/I-frame Intra-coded frame (full image, no reference)
P-frame Predicted frame (references previous frame)
MJPEG Motion JPEG (sequence of JPEG images)
WireGuard Modern VPN protocol using UDP

Document version 1.0.0 — Video Ingestion Subsystem Design for CP PLUS ORANGE Series DVR Generated for AI-powered industrial surveillance platform