AI-Powered Industrial Surveillance Platform — Video Ingestion Subsystem Design
Document Information
| Attribute | Value |
|---|---|
| Document Version | 1.0.0 |
| Target DVR | CP PLUS ORANGE CP-UVR-0801E1-CV2 |
| DVR Hardware | V1.0, System V4.001.00AT001.3.0 |
| Channels | 8 active |
| Resolution | 960x1080 per channel |
| DVR Network | 192.168.29.200/24, Static |
| ONVIF | V2.6.1.867657 (Server V19.06) |
| Date | 2025 |
| Classification | Technical Specification — Video Ingestion Subsystem |
Table of Contents
- System Architecture Overview
- RTSP Stream Configuration
- Edge Gateway Design
- Stream Discovery & Management
- Stream Processing Pipeline
- Live Streaming for Dashboard
- Edge Buffering & Local Storage
- Implementation Specifications
- Appendix
1. System Architecture Overview
1.1 High-Level Topology
+-----------------------------------------------------------------------------+
| CLOUD VPC (VPN TUNNEL) |
| +------------------+ +------------------+ +---------------------------+ |
| | Stream Relay | | AI Inference | | Dashboard / NVR | |
| | (HLS/DASH/WebRTC)| | Service | | Web UI | |
| | Port: 443 | | GPU Cluster | | React/Vue Frontend | |
| +--------+---------+ +--------+---------+ +---------------------------+ |
| | | |
| +--------+---------+ | |
| | Metadata Bus | | (WebSocket: frame metadata) |
| | MQTT/Kafka |<----------+ |
| | Port: 8883/9094 | |
| +--------+---------+ |
+----------|------------------------------------------------------------------+
|
| WireGuard / OpenVPN Tunnel (UDP 51820 or TCP 443)
|
+----------v------------------------------------------------------------------+
| EDGE GATEWAY (Local Network) |
| Hardware: Intel NUC 13 Pro / Raspberry Pi 5 / NVIDIA Jetson Nano |
| OS: Ubuntu 22.04 LTS (Server) |
| LAN IP: 192.168.29.10/24 |
| |
| +------------------+ +------------------+ +---------------------------+ |
| | Stream Manager | | FFmpeg Pipeline | | HLS Segmenter | |
| | (Python/asyncio) | | (8x RTSP feeds) | | (live streaming) | |
| | Port: 8080 | | RAM buffers | | Port: 8081 | |
| +--------+---------+ +--------+---------+ +---------------------------+ |
| | | |
| +--------v---------+ +--------v---------+ +---------------------------+ |
| | Health Monitor | | Frame Extractor | | Buffer Manager | |
| | & Watchdog | | (AI decimation) | | (ring buffer + upload) | |
| +------------------+ +--------+---------+ +---------------------------+ |
| | |
| +---------------------v---------------------+ |
| | VPN Client (WireGuard) | |
| +---------------------+---------------------+ |
+-------------------------------|----------------------------------------------+
|
| RTSP over TCP/UDP
| ONVIF over HTTP
|
+-------------------------------v----------------------------------------------+
| CP PLUS DVR (192.168.29.200) |
| +----+----+----+----+----+----+----+----+ |
| |CH1 |CH2 |CH3 |CH4 |CH5 |CH6 |CH7 |CH8 | 8x 960x1080 @ 25fps |
| +----+----+----+----+----+----+----+----+ |
| RTSP:554 ONVIF:80 TCP:25001 UDP:25002 HTTP:80 HTTPS:443 |
+------------------------------------------------------------------------------+
1.2 Data Flow Summary
[DVR RTSP Stream] --> [Edge Gateway] --> [FFmpeg Demux] --> [Multi-target routing]
|
+----------------+-------------+----------------+
| | | |
[HLS Segmenter] [Frame Extractor] [Buffer Mgr] [Live Relay]
| | | |
[Dashboard] [AI Inference] [Cloud Upload] [Cloud Stream]
| | | |
WebSocket MQTT/Kafka S3/MinIO CDN/Relay
1.3 Network Specification
| Parameter | Value |
|---|---|
| DVR IP | 192.168.29.200 |
| DVR Subnet | 255.255.255.0 (/24) |
| DVR Gateway | 192.168.29.1 |
| Edge Gateway LAN IP | 192.168.29.10 (static) |
| VPN Tunnel Network | 10.100.0.0/24 (WireGuard) |
| Cloud Relay Endpoint | 10.100.0.1 (VPN server) |
| RTSP Transport | TCP interleaved (preferred) / UDP |
| MTU | 1400 (WireGuard tunnel) |
| QoS | DSCP EF (46) for RTSP, DSCP AF41 (34) for HLS |
1.4 Bandwidth Budget (960x1080 @ 25fps)
| Stream Type | Codec | Expected Bitrate | 8-Channel Total |
|---|---|---|---|
| Main Stream (primary) | H.265 | 2-4 Mbps | 16-32 Mbps |
| Sub Stream (secondary) | H.264 | 512-1024 Kbps | 4-8 Mbps |
| AI Extraction Stream | H.264 | 256-512 Kbps | 2-4 Mbps |
| HLS Output (live) | H.264 | 1-2 Mbps | 8-16 Mbps |
| Total LAN Ingress | — | — | ~30-60 Mbps |
| Cloud Egress (VPN) | — | — | ~8-16 Mbps (optimized) |
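The 8-channel totals above are straight multiples of the per-stream figures; a quick sanity check of the arithmetic (values taken from the table, ranges in Mbps):

```python
# Per-stream bitrate ranges from the bandwidth budget table (Mbps).
STREAMS = {
    "main": (2.0, 4.0),       # H.265 primary
    "sub":  (0.512, 1.024),   # H.264 secondary
    "ai":   (0.256, 0.512),   # AI extraction
    "hls":  (1.0, 2.0),       # HLS output
}
CHANNELS = 8

def eight_channel_total(lo: float, hi: float) -> tuple:
    """Scale a per-stream bitrate range to the 8-channel total."""
    return (lo * CHANNELS, hi * CHANNELS)

for name, (lo, hi) in STREAMS.items():
    t_lo, t_hi = eight_channel_total(lo, hi)
    print(f"{name}: {t_lo:.1f}-{t_hi:.1f} Mbps")
```

Main stream alone accounts for 16-32 Mbps, which is why the cloud egress path relies on the optimized sub-stream and HLS outputs rather than forwarding all primary streams.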
2. RTSP Stream Configuration
2.1 RTSP URL Patterns for CP PLUS ORANGE Series
CP PLUS ORANGE Series DVRs use a Dahua-compatible RTSP URL scheme. Two URL formats are supported:
Primary (Main Stream) — Full Resolution, Higher Bitrate
rtsp://admin:{PASSWORD}@192.168.29.200:554/cam/realmonitor?channel={N}&subtype=0
Secondary (Sub Stream) — Lower Resolution, Lower Bitrate
rtsp://admin:{PASSWORD}@192.168.29.200:554/cam/realmonitor?channel={N}&subtype=1
Alternative Format (Media URL with authentication in headers)
rtsp://192.168.29.200:554/cam/realmonitor?channel={N}&subtype={M}&unicast=true&proto=Onvif
Authentication is supplied in the DESCRIBE request via an `Authorization: Digest username="admin", ...` header.
2.2 Channel-to-URL Mapping
| Channel | Location | Primary URL (subtype=0) | Secondary URL (subtype=1) |
|---|---|---|---|
| CH1 | Gate/Entry | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0 | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1 |
| CH2 | Production Floor A | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=0 | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=1 |
| CH3 | Production Floor B | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=0 | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=1 |
| CH4 | Warehouse | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=0 | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=1 |
| CH5 | Loading Dock | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=0 | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=1 |
| CH6 | Office Corridor | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=0 | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=1 |
| CH7 | Server Room | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=0 | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=1 |
| CH8 | Perimeter | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=0 | rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=1 |
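Since every row follows one pattern, the URLs can be generated rather than hard-coded. A minimal sketch (the `build_rtsp_url` helper name is illustrative, not part of any library):

```python
from urllib.parse import quote

def build_rtsp_url(host: str, user: str, password: str,
                   channel: int, subtype: int, port: int = 554) -> str:
    """Build a CP PLUS / Dahua-style RTSP URL for one channel.

    channel: 1-8 (1-indexed); subtype: 0 = main stream, 1 = sub stream.
    The password is percent-encoded so special characters survive the URL.
    """
    if not 1 <= channel <= 8:
        raise ValueError("channel must be 1-8")
    if subtype not in (0, 1):
        raise ValueError("subtype must be 0 or 1")
    return (f"rtsp://{user}:{quote(password, safe='')}@{host}:{port}"
            f"/cam/realmonitor?channel={channel}&subtype={subtype}")
```

Example: `build_rtsp_url("192.168.29.200", "admin", "password", 1, 0)` reproduces the CH1 primary URL from the table.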
2.3 Stream Usage Strategy
| Purpose | Stream Type | Rationale |
|---|---|---|
| AI Inference / Frame Extraction | Sub Stream (subtype=1) | Lower bandwidth, sufficient for detection, reduces compute |
| Live Dashboard (grid view) | Sub Stream (subtype=1) | 8-camera grid needs lower per-stream bandwidth |
| Live Dashboard (fullscreen) | Main Stream (subtype=0) | Full quality for single-camera focus |
| Event Recording / Alert Clips | Main Stream (subtype=0) | Maximum quality for forensic review |
| Cloud Archive | Main Stream (subtype=0) | Best quality for long-term storage |
2.4 Codec Expectations
| Parameter | Main Stream (subtype=0) | Sub Stream (subtype=1) |
|---|---|---|
| Video Codec | H.265 (HEVC) or H.264 | H.264 (AVC) |
| Audio Codec | G.711A/u (if enabled) | G.711A/u (if enabled) |
| Resolution | 960x1080 | CIF to D1 (typically 352x288 or 704x576) |
| Frame Rate | 25 fps | 15-25 fps |
| Bitrate | 2048-4096 Kbps | 512-1024 Kbps |
| GOP Size | 25-50 frames | 25-50 frames |
| Pixel Format | YUV420P | YUV420P |
Note: The DVR disk is FULL (0 bytes free). This means the DVR cannot perform local recording. Stream ingestion must not depend on DVR-side recording or playback features. All archival is edge-managed.
2.5 FFmpeg RTSP Connection Parameters
-rtsp_transport tcp       # Force TCP interleaved (NAT/firewall friendly)
-stimeout 5000000         # Socket timeout in microseconds (5 seconds; deprecated alias of -timeout in FFmpeg 5.0+)
-fflags +genpts           # Generate presentation timestamps
-flags +low_delay         # Low latency mode
-probesize 5000000        # Probe size for codec detection
-analyzeduration 2000000  # Analyze duration in microseconds
Note: FFmpeg's -reconnect, -reconnect_at_eof, -reconnect_streamed, and -reconnect_delay_max options belong to the HTTP-based protocols and are rejected for RTSP inputs. RTSP reconnection is handled by the supervising process (Section 4.4).
2.6 Critical RTSP Notes for CP PLUS DVR
- TCP interleaved is mandatory — UDP may drop packets behind NAT; CP PLUS DVRs work most reliably with -rtsp_transport tcp
- Authentication — uses Digest authentication (Basic is disabled on newer firmware)
- Session limit — The DVR supports a finite number of concurrent RTSP sessions (typically 10-20). With 8 channels and dual streams, the gateway may open up to 16 concurrent sessions. Stay within limits.
- No audio in subtype=1 — Sub streams typically carry no audio; main stream may include audio
- Channel numbering — Channels are 1-indexed (1-8), not 0-indexed
- Disk full impact — Zero bytes free on DVR means: (a) no local recording possible, (b) some DVR functions may be degraded, (c) stream serving typically still works but monitor performance
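The session-limit constraint above can be checked before launch. A small sketch that counts planned RTSP sessions against an assumed limit (the default limit of 16 is a placeholder within the typical 10-20 range; confirm against the actual DVR):

```python
def planned_sessions(channels: int, streams_per_channel: int) -> int:
    """Total concurrent RTSP sessions the gateway would open."""
    return channels * streams_per_channel

def within_session_limit(channels: int = 8, streams_per_channel: int = 2,
                         dvr_limit: int = 16) -> bool:
    # dvr_limit is an assumption (typical range 10-20 per Section 2.6);
    # verify the real value against the DVR before deployment.
    return planned_sessions(channels, streams_per_channel) <= dvr_limit
```

With 8 channels pulling both main and sub streams, the gateway sits exactly at 16 sessions, so any ad-hoc debugging session (e.g. VLC) pushes past a 16-session limit.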
3. Edge Gateway Design
3.1 Hardware Specifications
Recommended Platform: Intel NUC 13 Pro (i5-1340P)
| Component | Specification |
|---|---|
| CPU | Intel Core i5-1340P (12 cores, 16 threads) |
| RAM | 16GB DDR4-3200 |
| Storage | 512GB NVMe SSD (for buffering) |
| LAN | Intel i226-V 2.5GbE |
| USB | 3x USB 3.2 (for expansion) |
| Power | 19V DC, 65W |
| OS | Ubuntu 22.04.4 LTS Server (no GUI) |
| Docker | Docker CE 25.x, Docker Compose 2.x |
Alternative: NVIDIA Jetson Orin Nano (for on-device AI inference)
| Component | Specification |
|---|---|
| CPU | 6-core Arm Cortex-A78AE |
| GPU | 1024-core NVIDIA Ampere |
| RAM | 8GB LPDDR5 |
| AI Perf | 40 TOPS |
| Storage | 256GB NVMe via M.2 |
| Power | 7W-15W |
| OS | JetPack 6.0 (Ubuntu 22.04 based) |
Minimum Viable: Raspberry Pi 5
| Component | Specification |
|---|---|
| CPU | BCM2712 2.4GHz quad-core |
| RAM | 8GB LPDDR4X |
| Storage | 256GB USB3 NVMe |
| LAN | Gigabit Ethernet |
| Power | 5V 5A USB-C |
| Note | Sufficient for 8x sub-stream ingestion + HLS relay; may struggle with 8x main stream |
3.2 Edge Gateway Software Architecture
+-------------------------- Edge Gateway (Docker Host) ------------------------+
| |
| +-------------------------+ +-------------------------+ +--------------+ |
| | stream-manager | | hls-segmenter | | vpn-client | |
| | (Python/FastAPI) | | (FFmpeg/nginx-rtmp) | | (WireGuard)| |
| | Port: 8080 | | Port: 8081 | | Port: 51820| |
| | | | | | | |
| | - Camera registry | | - HLS manifest gen | | - Persistent| |
| | - Stream lifecycle | | - Segment rotation | | tunnel | |
| | - Health monitoring | | - DRM/auth tokens | | - Auto-rec | |
| | - REST API | | - Adaptive bitrate | | - Kill sw | |
| +-------------------------+ +-------------------------+ +--------------+ |
| | | |
| +---------v---------+ +--------v---------+ +-------------------+ |
| | frame-extractor | | buffer-manager | | health-monitor | |
| | (Python/CV2) | | (Python) | | (Python) | |
| | | | | | | |
| | - Keyframe decim.| | - Ring buffer | | - DVR ping | |
| | - JPEG/PNG encode| | - Upload queue | | - Stream FPS chk | |
| | - MQTT publish | | - Retry logic | | - Watchdog | |
| | - Buffer frames | | - Disk mgmt | | - Alert emit | |
| +---------+---------+ +--------+---------+ +-------------------+ |
| | | |
| +---------v---------+ | |
| | mqtt-client |<----------------+ |
| | (Eclipse-MQTT) | |
| | Port: 1883 | |
| +-------------------+ |
| |
| Shared Volumes: |
| /data/buffer/ - Alert clip ring buffer (20GB max) |
| /data/hls/ - HLS segments for dashboard (5GB max) |
| /data/config/ - YAML/JSON configuration files |
| /data/logs/ - Application logs (rotated, 1GB max) |
| /tmp/ffmpeg/ - Temporary FFmpeg pipes and segments |
+-----------------------------------------------------------------------------+
3.3 Container Specifications
| Container | Image | CPU Limit | Memory | Ports | Purpose |
|---|---|---|---|---|---|
| stream-manager | Custom Python 3.11 slim | 2 cores | 512MB | 8080/tcp | Core orchestration |
| ffmpeg-worker-1 | jrottenberg/ffmpeg:6.1 | 2 cores | 256MB | — | CH1-CH4 RTSP ingestion |
| ffmpeg-worker-2 | jrottenberg/ffmpeg:6.1 | 2 cores | 256MB | — | CH5-CH8 RTSP ingestion |
| hls-segmenter | alpine/nginx + ffmpeg | 1 core | 256MB | 8081/tcp | HLS manifest serving |
| buffer-manager | Custom Python 3.11 slim | 1 core | 256MB | — | Clip storage & upload |
| mqtt-client | eclipse-mosquitto:2.0 | 0.5 core | 128MB | 1883/tcp | Local MQTT broker |
| vpn-client | lscr.io/linuxserver/wireguard | 0.5 core | 128MB | — | VPN tunnel |
| health-monitor | Custom Python 3.11 slim | 0.5 core | 128MB | — | System watchdog |
| prometheus | prom/prometheus | 0.5 core | 256MB | 9090/tcp | Metrics collection |
| node-exporter | prom/node-exporter | 0.25 core | 64MB | 9100/tcp | Host metrics |
3.4 VPN Tunnel Specification (WireGuard)
# /etc/wireguard/wg0.conf (Edge Gateway)
[Interface]
PrivateKey = {EDGE_PRIVATE_KEY}
Address = 10.100.0.10/24
DNS = 10.100.0.1
MTU = 1400
# Post-up rules: mark packets for routing
PostUp = iptables -t mangle -A OUTPUT -p tcp --dport 554 -j DSCP --set-dscp-class EF
PostUp = iptables -t nat -A POSTROUTING -o wg0 -j MASQUERADE
# Post-down: cleanup
PostDown = iptables -t mangle -D OUTPUT -p tcp --dport 554 -j DSCP --set-dscp-class EF || true
PostDown = iptables -t nat -D POSTROUTING -o wg0 -j MASQUERADE || true
[Peer]
PublicKey = {CLOUD_PUBLIC_KEY}
PresharedKey = {PSK}
AllowedIPs = 10.100.0.0/24
Endpoint = cloud-gateway.example.com:51820
PersistentKeepalive = 25
Cloud-side peer configuration:
# Cloud VPN Server
[Peer]
PublicKey = {EDGE_PUBLIC_KEY}
PresharedKey = {PSK}
AllowedIPs = 10.100.0.10/32
VPN Routing Table:
| Destination | Gateway | Interface | Notes |
|---|---|---|---|
| 192.168.29.0/24 | 192.168.29.1 | eth0 | Local DVR network |
| 10.100.0.0/24 | — | wg0 | VPN tunnel network |
| 0.0.0.0/0 | 192.168.29.1 | eth0 | Default route |
| 10.100.0.1 | — | wg0 | Cloud relay (via tunnel) |
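The 1400-byte tunnel MTU leaves headroom under WireGuard's encapsulation overhead (60 bytes over IPv4: outer IP 20 + UDP 8 + WireGuard headers and auth tag 32). A quick check of the arithmetic, treating 1500 as the physical MTU:

```python
PHYSICAL_MTU = 1500
WG_OVERHEAD_IPV4 = 60   # outer IP(20) + UDP(8) + WireGuard framing/tag(32)
TUNNEL_MTU = 1400       # value configured in wg0.conf (deliberately conservative)

def max_tunnel_mtu(physical_mtu: int = PHYSICAL_MTU,
                   overhead: int = WG_OVERHEAD_IPV4) -> int:
    """Largest inner MTU that avoids fragmenting the outer UDP packet."""
    return physical_mtu - overhead

# 1400 < 1440, so the configured MTU fits with 40 bytes to spare
headroom = max_tunnel_mtu() - TUNNEL_MTU
```

The 40-byte margin absorbs occasional extra encapsulation (e.g. PPPoE or VLAN tags on the WAN path) without path-MTU problems for the RTSP-over-tunnel traffic.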
3.5 Edge Gateway Environment Variables
# Core
GATEWAY_ID=edge-ind-01
GATEWAY_LOCATION="Plant Floor A"
LOG_LEVEL=INFO
# DVR Connection
DVR_HOST=192.168.29.200
DVR_RTSP_PORT=554
DVR_HTTP_PORT=80
DVR_USERNAME=admin
DVR_PASSWORD={SECRET_FROM_ENV_FILE}
DVR_CHANNELS=8
# Streams
STREAM_TRANSPORT=tcp # tcp or udp
STREAM_PRIMARY_SUBTYPE=0 # main stream
STREAM_SECONDARY_SUBTYPE=1 # sub stream
STREAM_BUFFER_SIZE=65536 # RTSP buffer
STREAM_RECONNECT_MAX_DELAY=30 # seconds
STREAM_STALL_TIMEOUT=10 # seconds without frame = stalled
# AI Pipeline
AI_INFERENCE_FPS=5 # extract 5 fps for AI
AI_KEYFRAME_ONLY=true # only I-frames for analysis
AI_RESOLUTION=640x480 # resize for inference
AI_MQTT_TOPIC=ai/frames # MQTT topic prefix
AI_CONFIDENCE_THRESHOLD=0.6
# HLS
HLS_SEGMENT_DURATION=2 # seconds per segment
HLS_WINDOW_SIZE=5 # segments in playlist
HLS_OUTPUT_DIR=/data/hls
HLS_CLEANUP_INTERVAL=60 # seconds
# Buffer/Storage
BUFFER_MAX_SIZE_GB=20 # ring buffer max size
BUFFER_RETENTION_HOURS=72 # max age before deletion
UPLOAD_ENDPOINT=https://10.100.0.1/api/upload
UPLOAD_MAX_RETRIES=5
UPLOAD_RETRY_BASE_DELAY=5 # seconds
UPLOAD_CHUNK_SIZE_MB=10
# Monitoring
HEALTH_CHECK_INTERVAL=30 # seconds
METRICS_PORT=9090
WEB_UI_PORT=8080
# VPN
VPN_ENABLED=true
VPN_TUNNEL_IP=10.100.0.10
CLOUD_RELAY_IP=10.100.0.1
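At startup these variables can be folded into a typed configuration object so missing secrets fail fast. A minimal sketch using only the standard library (field names mirror the `DVR_*` variables above; this is an illustrative loader, not the shipped one):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class DvrConfig:
    host: str
    rtsp_port: int
    username: str
    password: str
    channels: int

def load_dvr_config(env=os.environ) -> DvrConfig:
    """Read DVR_* variables, applying the defaults documented in this section."""
    return DvrConfig(
        host=env.get("DVR_HOST", "192.168.29.200"),
        rtsp_port=int(env.get("DVR_RTSP_PORT", "554")),
        username=env.get("DVR_USERNAME", "admin"),
        password=env["DVR_PASSWORD"],   # no default: the secret must be injected
        channels=int(env.get("DVR_CHANNELS", "8")),
    )
```

Indexing `env["DVR_PASSWORD"]` (rather than `.get`) makes a missing secret raise immediately at boot instead of producing malformed RTSP URLs later.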
4. Stream Discovery & Management
4.1 ONVIF-Based Discovery
CP PLUS DVR ONVIF server V19.06 (V2.6.1.867657) supports:
- Device Management Service (core)
- Media Service (stream URI retrieval)
- PTZ Service (if applicable)
- Event Service (basic)
ONVIF Endpoints:
Device Service: http://192.168.29.200:80/onvif/device_service
Media Service: http://192.168.29.200:80/onvif/media_service
Event Service: http://192.168.29.200:80/onvif/event_service
Discovery workflow:
# Pseudocode for ONVIF discovery
from onvif import ONVIFCamera

def discover_cameras(dvr_host, dvr_port, username, password):
    camera = ONVIFCamera(dvr_host, dvr_port, username, password)

    # Get device info
    devicemgmt = camera.create_devicemgmt_service()
    device_info = devicemgmt.GetDeviceInformation()
    # Returns: Manufacturer, Model, FirmwareVersion, SerialNumber, HardwareId

    # Get media profiles (streams)
    media = camera.create_media_service()
    profiles = media.GetProfiles()

    for profile in profiles:
        # Get stream URI for each profile
        token = profile.token
        stream_uri = media.GetStreamUri(
            StreamSetup={'Stream': 'RTP-Unicast', 'Transport': {'Protocol': 'RTSP'}},
            ProfileToken=token
        )
        # Returns: rtsp://192.168.29.200:554/cam/realmonitor?channel=N&subtype=M

        # Get video encoder configuration
        if profile.VideoEncoderConfiguration:
            ve = profile.VideoEncoderConfiguration
            # ve.Resolution, ve.RateControl.BitrateLimit, ve.RateControl.FrameRateLimit

        # Get video source configuration
        if profile.VideoSourceConfiguration:
            vs = profile.VideoSourceConfiguration
            # vs.Bounds (resolution), vs.ViewMode

    return profiles
Expected ONVIF Response for CP PLUS CP-UVR-0801E1-CV2:
<GetDeviceInformationResponse>
  <Manufacturer>CP PLUS</Manufacturer>
  <Model>CP-UVR-0801E1-CV2</Model>
  <FirmwareVersion>V4.001.00AT001.3.0</FirmwareVersion>
  <SerialNumber>{device_serial}</SerialNumber>
  <HardwareId>V1.0</HardwareId>
</GetDeviceInformationResponse>
4.2 Manual Camera Configuration
When ONVIF is unavailable or for manual override, cameras are defined in YAML:
# /data/config/cameras.yaml
cameras:
  - channel: 1
    name: "Gate/Entry"
    enabled: true
    primary_stream:
      url: "rtsp://admin:{PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0"
      codec: "h265"
      resolution: "960x1080"
      fps: 25
    secondary_stream:
      url: "rtsp://admin:{PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1"
      codec: "h264"
      resolution: "704x576"
      fps: 15
    ai_config:
      enabled: true
      use_secondary: true
      inference_fps: 5
      keyframe_only: true
      roi: [0, 0, 960, 1080]
    live_config:
      dashboard_default: "secondary"  # grid view uses sub stream
      fullscreen_uses: "primary"
    recording_config:
      enabled: true
      use_primary: true
      pre_event_seconds: 10
      post_event_seconds: 30

  - channel: 2
    name: "Production Floor A"
    enabled: true
    # ... (same structure)

  # ... CH3 through CH8

settings:
  default_ai_fps: 5
  default_inference_resolution: "640x480"
  stream_stall_timeout: 10
  reconnect_max_delay: 30
  circuit_breaker:
    failure_threshold: 5
    recovery_timeout: 60
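The `{PASSWORD}` placeholders should be filled from the environment at load time so the secret never lands on disk. A sketch of the substitution step (YAML parsing itself is left to the config loader; `render_camera_config` is an illustrative name):

```python
import os

def render_camera_config(raw_yaml: str, env=os.environ) -> str:
    """Replace {PASSWORD} placeholders with the DVR_PASSWORD secret.

    Raises KeyError if the secret is missing, rather than silently
    writing a literal '{PASSWORD}' into the RTSP URLs.
    """
    return raw_yaml.replace("{PASSWORD}", env["DVR_PASSWORD"])

# Usage:
# with open("/data/config/cameras.yaml") as f:
#     rendered = render_camera_config(f.read())
```

Substituting before parsing keeps the on-disk file free of credentials and works regardless of which YAML library the stream manager uses.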
4.3 Stream Puller with FFmpeg — Full Command Reference
4.3.1 Basic Stream Pull (Validation)
# Channel 1 - Main Stream (validation probe)
# Note: stream inspection uses ffprobe; mixing -show_format/-show_streams
# into an ffmpeg command does not work.
ffprobe -rtsp_transport tcp \
  -stimeout 5000000 \
  -v quiet -print_format json \
  -show_format -show_streams \
  "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  | jq .
4.3.2 AI Frame Extraction (Keyframe, Decimated)
# Extract I-frames only at 5fps for AI inference (sub stream)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -fflags +genpts+discardcorrupt -flags +low_delay \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1" \
  -vf "select='eq(pict_type,I)',fps=5,scale=640:480" \
  -f image2pipe -vcodec mjpeg -q:v 2 \
  pipe:1
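Because the command writes raw MJPEG to pipe:1, the consumer must split the byte stream on JPEG markers (SOI 0xFFD8, EOI 0xFFD9). A minimal splitter sketch (marker scanning is a simplification: an EOI byte pair can in principle occur inside entropy-coded data, so this is adequate for a pipeline sketch, not a bulletproof parser):

```python
def split_mjpeg(buffer: bytes):
    """Yield complete JPEG images from an MJPEG byte stream.

    Returns (frames, remainder): remainder holds any trailing partial
    frame and should be prepended to the next read from the pipe.
    """
    SOI, EOI = b"\xff\xd8", b"\xff\xd9"
    frames = []
    while True:
        start = buffer.find(SOI)
        if start < 0:
            return frames, b""
        end = buffer.find(EOI, start + 2)
        if end < 0:
            # Incomplete frame: keep it for the next chunk
            return frames, buffer[start:]
        frames.append(buffer[start:end + 2])
        buffer = buffer[end + 2:]
```

In the frame-extractor service this runs over each chunk read from the FFmpeg subprocess's stdout, carrying the remainder between reads.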
4.3.3 HLS Segment Generation (All 8 Channels)
# Single channel HLS (primary stream)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -fflags +genpts+discardcorrupt -flags +low_delay \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a aac -b:a 128k \
  -f hls -hls_time 2 -hls_list_size 5 -hls_flags delete_segments+omit_endlist \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8"
4.3.4 Simultaneous Multi-Output (Single FFmpeg Instance)
# Single RTSP input -> HLS + frame extraction + recording.
# Comments cannot appear inside a backslash-continued command, so the
# three outputs are annotated here instead:
#   Output 1: HLS for live viewing
#   Output 2: decimated keyframes piped to the AI extractor (stdout)
#   Output 3: 60-second MP4 segments for the event buffer
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -fflags +genpts+discardcorrupt -flags +low_delay \
  -probesize 5000000 -analyzeduration 2000000 \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v copy -c:a copy \
  -f hls -hls_time 2 -hls_list_size 5 \
  -hls_flags delete_segments+omit_endlist \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8" \
  -vf "select='eq(pict_type,I)',fps=5,scale=640:480" \
  -f image2pipe -vcodec mjpeg -q:v 2 \
  pipe:1 \
  -c:v copy -c:a copy \
  -f segment -segment_time 60 -segment_format mp4 \
  -reset_timestamps 1 \
  -strftime 1 "/data/buffer/ch1_%Y%m%d_%H%M%S.mp4"
4.3.5 UDP Fallback Command (when TCP fails)
ffmpeg -rtsp_transport udp -buffer_size 65536 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-c copy -f hls -hls_time 2 -hls_list_size 5 \
-hls_segment_filename "/data/hls/ch1_%03d.ts" \
"/data/hls/ch1.m3u8"
4.3.6 Complete 8-Channel Parallel Launch Script
#!/bin/bash
# /usr/local/bin/start_streams.sh
# Launches all 8 channel streams with individual control
DVR_IP="192.168.29.200"
DVR_USER="admin"
DVR_PASS="${DVR_PASSWORD}"
HLS_DIR="/data/hls"
BUFFER_DIR="/data/buffer"
FFMPEG_OPTS="-rtsp_transport tcp -stimeout 5000000 -fflags +genpts+discardcorrupt \
-flags +low_delay -probesize 5000000 -analyzeduration 2000000 -nostdin"
for CH in {1..8}; do
# Determine stream type: channels 1-4 use main, 5-8 use sub (bandwidth management)
if [ $CH -le 4 ]; then
SUBTYPE=0
QUALITY="high"
else
SUBTYPE=1
QUALITY="medium"
fi
RTSP_URL="rtsp://${DVR_USER}:${DVR_PASS}@${DVR_IP}:554/cam/realmonitor?channel=${CH}&subtype=${SUBTYPE}"
# Launch with process monitoring
ffmpeg ${FFMPEG_OPTS} \
-i "${RTSP_URL}" \
-c:v copy -c:a copy \
-f hls -hls_time 2 -hls_list_size 5 \
-hls_flags delete_segments+omit_endlist \
-hls_segment_filename "${HLS_DIR}/ch${CH}_%03d.ts" \
"${HLS_DIR}/ch${CH}.m3u8" \
2>"${BUFFER_DIR}/ch${CH}_ffmpeg.log" &
echo $! > "/var/run/stream_ch${CH}.pid"
echo "Started channel ${CH} (${QUALITY}) with PID $!"
done
echo "All 8 channels launched. PIDs stored in /var/run/stream_ch*.pid"
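The main/sub split in the script (CH1-4 main, CH5-8 sub) can be mirrored in the Python stream manager so both sides apply the same bandwidth policy. A sketch (the helper name is illustrative):

```python
def subtype_for_channel(channel: int) -> int:
    """Bandwidth policy from start_streams.sh: channels 1-4 pull the
    main stream (subtype 0), channels 5-8 the sub stream (subtype 1)."""
    if not 1 <= channel <= 8:
        raise ValueError("channel must be 1-8")
    return 0 if channel <= 4 else 1
```

Keeping the policy in one function means changing the split (say, to all-sub during VPN congestion) is a one-line edit rather than a search through the launch script.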
4.4 Auto-Reconnect Strategy
# Auto-reconnect with exponential backoff
import asyncio
import logging
import re
from dataclasses import dataclass
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)


class StreamState(Enum):
    CONNECTING = "connecting"
    ONLINE = "online"
    STALLED = "stalled"
    OFFLINE = "offline"
    CIRCUIT_OPEN = "circuit_open"


@dataclass
class ReconnectConfig:
    initial_delay: float = 1.0
    max_delay: float = 30.0
    multiplier: float = 2.0
    jitter: float = 0.1
    stall_timeout: float = 10.0
    circuit_breaker_threshold: int = 5
    circuit_breaker_recovery: float = 60.0


class StreamConnector:
    """
    Manages a single RTSP stream connection with:
    - Exponential backoff reconnect
    - Frame-level stall detection
    - Circuit breaker for persistent failures
    - Graceful shutdown
    """

    def __init__(self, channel: int, config: ReconnectConfig, ffmpeg_cmd: list):
        self.channel = channel
        self.cfg = config
        self.ffmpeg_cmd = ffmpeg_cmd
        self.state = StreamState.OFFLINE
        self.current_delay = config.initial_delay
        self.current_fps = 0.0
        self.consecutive_failures = 0
        self.circuit_open_time: Optional[float] = None
        self.last_frame_time: Optional[float] = None
        self._process: Optional[asyncio.subprocess.Process] = None
        self._task: Optional[asyncio.Task] = None
        self._shutdown = False

    async def start(self):
        self._task = asyncio.create_task(self._connection_loop())
        asyncio.create_task(self._stall_monitor())

    async def _connection_loop(self):
        while not self._shutdown:
            if self.state == StreamState.CIRCUIT_OPEN:
                elapsed = asyncio.get_event_loop().time() - self.circuit_open_time
                if elapsed < self.cfg.circuit_breaker_recovery:
                    await asyncio.sleep(1)
                    continue
                # Circuit breaker recovery
                self.consecutive_failures = 0
                self.state = StreamState.OFFLINE
                logger.info(f"CH{self.channel}: Circuit breaker recovered")
            if self.state == StreamState.OFFLINE:
                await self._attempt_connect()
            elif self.state == StreamState.STALLED:
                await self._handle_stall()
            await asyncio.sleep(0.5)

    async def _attempt_connect(self):
        self.state = StreamState.CONNECTING
        logger.info(f"CH{self.channel}: Connecting (delay={self.current_delay:.1f}s)...")
        await asyncio.sleep(self.current_delay)
        try:
            self._process = await asyncio.create_subprocess_exec(
                *self.ffmpeg_cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            # Wait for process to stabilize
            await asyncio.sleep(3)
            if self._process.returncode is not None:
                raise ConnectionError(f"FFmpeg exited with code {self._process.returncode}")
            self.state = StreamState.ONLINE
            self.last_frame_time = asyncio.get_event_loop().time()
            self.current_delay = self.cfg.initial_delay
            self.consecutive_failures = 0
            logger.info(f"CH{self.channel}: Online")
            # Monitor the stream
            await self._monitor_stream()
        except Exception as e:
            self.consecutive_failures += 1
            self.current_delay = min(
                self.current_delay * self.cfg.multiplier,
                self.cfg.max_delay
            )
            if self.consecutive_failures >= self.cfg.circuit_breaker_threshold:
                self.state = StreamState.CIRCUIT_OPEN
                self.circuit_open_time = asyncio.get_event_loop().time()
                logger.error(
                    f"CH{self.channel}: Circuit breaker OPEN after "
                    f"{self.consecutive_failures} failures"
                )
            else:
                self.state = StreamState.OFFLINE
                logger.warning(f"CH{self.channel}: Connection failed ({e}), "
                               f"retry in {self.current_delay:.1f}s")

    async def _monitor_stream(self):
        """Monitor stderr output for frame reception indicators."""
        while self.state == StreamState.ONLINE and self._process:
            try:
                line = await asyncio.wait_for(
                    self._process.stderr.readline(), timeout=5.0
                )
                if not line:
                    break  # EOF: FFmpeg exited
                self.last_frame_time = asyncio.get_event_loop().time()
                # Parse frame info from FFmpeg output
                # Example: "frame=  125 fps= 25 q=-1.0 Lsize= ..."
                decoded = line.decode('utf-8', errors='ignore').strip()
                if 'frame=' in decoded:
                    self._update_metrics(decoded)
            except asyncio.TimeoutError:
                logger.debug(f"CH{self.channel}: No stderr output (may be normal)")

    async def _stall_monitor(self):
        """Independent task that detects stalls based on frame timestamps."""
        while not self._shutdown:
            await asyncio.sleep(1)
            if self.state == StreamState.ONLINE and self.last_frame_time:
                elapsed = asyncio.get_event_loop().time() - self.last_frame_time
                if elapsed > self.cfg.stall_timeout:
                    logger.warning(
                        f"CH{self.channel}: STALL detected — "
                        f"no frames for {elapsed:.1f}s"
                    )
                    self.state = StreamState.STALLED

    async def _handle_stall(self):
        """Kill the stuck process and reconnect."""
        logger.info(f"CH{self.channel}: Restarting stalled stream")
        await self._kill_process()
        self.state = StreamState.OFFLINE
        self.current_delay = self.cfg.initial_delay

    async def _kill_process(self):
        if self._process and self._process.returncode is None:
            try:
                self._process.terminate()
                await asyncio.wait_for(self._process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                self._process.kill()
                await self._process.wait()
        self._process = None

    def _update_metrics(self, line: str):
        """Parse and publish frame metrics."""
        # FFmpeg progress lines pad values after '=' (e.g. "fps= 25"),
        # so a whitespace split would separate keys from values; use a regex.
        m = re.search(r'fps=\s*([\d.]+)', line)
        if m:
            self.current_fps = float(m.group(1))

    async def stop(self):
        self._shutdown = True
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        await self._kill_process()

    @property
    def is_healthy(self) -> bool:
        return self.state == StreamState.ONLINE
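With the ReconnectConfig defaults (initial 1.0 s, multiplier 2.0, cap 30 s), retry delays grow as 1, 2, 4, 8, 16, then hold at 30 seconds. The schedule without jitter can be computed directly:

```python
def backoff_delays(initial: float = 1.0, multiplier: float = 2.0,
                   max_delay: float = 30.0, attempts: int = 7):
    """Deterministic exponential-backoff schedule (jitter omitted)."""
    delay, out = initial, []
    for _ in range(attempts):
        out.append(delay)
        delay = min(delay * multiplier, max_delay)
    return out

# backoff_delays() -> [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

The circuit breaker opens after the fifth consecutive failure, i.e. roughly 31 seconds of cumulative backoff plus connection attempts, so a dead DVR stops consuming reconnect bandwidth within about a minute.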
4.5 Circuit Breaker Pattern
# circuit_breaker.py
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class CircuitState(Enum):
    CLOSED = "closed"          # Normal operation
    OPEN = "open"              # Failing fast
    HALF_OPEN = "half_open"    # Testing recovery


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5        # failures before opening
    recovery_timeout: float = 60.0    # seconds before half-open
    half_open_max_calls: int = 3      # test calls in half-open state
    success_threshold: int = 2        # consecutive successes to close


class CircuitBreaker:
    """Circuit breaker for persistent stream failures."""

    def __init__(self, name: str, config: CircuitBreakerConfig):
        self.name = name
        self.cfg = config
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.successes = 0
        self.last_failure_time: Optional[float] = None
        self.half_open_calls = 0

    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.successes += 1
            self.half_open_calls += 1
            if self.successes >= self.cfg.success_threshold:
                self._close_circuit()
        elif self.state == CircuitState.CLOSED:
            self.failures = max(0, self.failures - 1)

    def record_failure(self) -> bool:
        self.failures += 1
        self.last_failure_time = time.time()
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.cfg.half_open_max_calls:
                self._open_circuit()
                return False
        if self.failures >= self.cfg.failure_threshold:
            self._open_circuit()
            return False
        return True  # Still allowed

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.cfg.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                self.successes = 0
                return True
            return False
        if self.state == CircuitState.HALF_OPEN:
            return self.half_open_calls < self.cfg.half_open_max_calls
        return False

    def _open_circuit(self):
        self.state = CircuitState.OPEN
        self.failures = self.cfg.failure_threshold
        # Alert published here

    def _close_circuit(self):
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.successes = 0
        self.half_open_calls = 0
4.6 Per-Camera Status Model
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class CameraStatus:
    """Real-time status for a single camera stream."""
    channel: int
    name: str
    enabled: bool = True
    state: str = "OFFLINE"            # OFFLINE, CONNECTING, ONLINE, STALLED, ERROR
    stream_type: str = "secondary"    # primary or secondary currently active
    current_fps: float = 0.0
    target_fps: float = 25.0
    current_bitrate_kbps: float = 0.0
    resolution: str = "unknown"
    codec: str = "unknown"
    last_frame_time: Optional[datetime] = None
    connection_time: Optional[datetime] = None
    disconnect_count: int = 0
    total_bytes_received: int = 0
    latency_ms: float = 0.0
    keyframe_interval: float = 0.0
    circuit_breaker_state: str = "CLOSED"
    error_message: Optional[str] = None
    ai_fps_actual: float = 0.0
    ai_frames_processed: int = 0
    hls_segments_available: int = 0
    recording_active: bool = False
    tags: dict = field(default_factory=dict)

    @property
    def is_healthy(self) -> bool:
        return self.state == "ONLINE" and self.current_fps > 0

    @property
    def uptime_seconds(self) -> float:
        if self.connection_time:
            return (datetime.utcnow() - self.connection_time).total_seconds()
        return 0.0

    def to_dict(self) -> dict:
        return {
            "channel": self.channel,
            "name": self.name,
            "enabled": self.enabled,
            "state": self.state,
            "stream_type": self.stream_type,
            "fps": {
                "current": round(self.current_fps, 1),
                "target": self.target_fps,
            },
            "bitrate_kbps": round(self.current_bitrate_kbps, 1),
            "resolution": self.resolution,
            "codec": self.codec,
            "last_frame_time": self.last_frame_time.isoformat() if self.last_frame_time else None,
            "uptime_seconds": self.uptime_seconds,
            "disconnect_count": self.disconnect_count,
            "circuit_breaker": self.circuit_breaker_state,
            "ai_fps_actual": round(self.ai_fps_actual, 1),
            "hls_ready": self.hls_segments_available > 0,
            "healthy": self.is_healthy,
        }
5. Stream Processing Pipeline
5.1 Pipeline Architecture
+----------+ +-------------+ +------------------+ +---------------+
| RTSP | -> | FFmpeg | -> | Frame Router | -> | Multiple |
| Source | | Demuxer | | & Tee | | Consumers |
+----------+ +-------------+ +------------------+ +---------------+
|
+----------------------+----------------------+
| | |
+-------v-------+ +---------v---------+ +-------v--------+
| HLS Segmenter | | Frame Extractor | | Clip Recorder |
| (live view) | | (AI inference) | | (events) |
+---------------+ +---------+---------+ +----------------+
|
+-----------+-----------+
| |
+---------v---------+ +--------v---------+
| Keyframe Decoder | | JPEG Encoder |
| (I-frame select) | | (AI input) |
+---------+---------+ +--------+---------+
| |
+---------v---------+ +--------v---------+
| Frame Buffer | | MQTT Publisher |
| (ring queue) | | (metadata) |
+-------------------+ +------------------+
5.2 Frame Extraction for AI Inference
The AI inference pipeline extracts frames from the sub-stream (lower bandwidth) at a configurable, decimated frame rate. Keyframes (I-frames) are preferred because they decode independently, reducing decode overhead.
5.2.1 Keyframe-Based Extraction Strategy
| Mode | Description | Use Case |
|---|---|---|
| `keyframe_only` | Extract only I-frames | Maximum efficiency, may skip motion |
| `fixed_fps` | Extract at fixed FPS regardless of frame type | Smooth temporal analysis |
| `adaptive` | I-frames + P-frames when motion detected | Balanced quality and efficiency |
| `hybrid` | Fixed FPS with I-frame priority | Best overall |
Recommended: hybrid mode — 5 FPS extraction with I-frame priority
# frame_extractor.py — Frame extraction pipeline for AI inference
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Optional

import cv2
import numpy as np

logger = logging.getLogger(__name__)


@dataclass
class FrameConfig:
    target_fps: float = 5.0
    mode: str = "hybrid"  # keyframe_only, fixed_fps, adaptive, hybrid
    resolution: tuple = (640, 480)
    jpeg_quality: int = 85
    use_keyframes: bool = True
    motion_threshold: float = 0.02  # for adaptive mode


@dataclass
class ExtractedFrame:
    channel: int
    timestamp: datetime
    frame_number: int
    is_keyframe: bool
    raw_bytes: bytes
    width: int
    height: int
    motion_score: float = 0.0
    jpeg_bytes: Optional[bytes] = None


class FrameExtractor:
    """
    Extracts decimated frames from an RTSP stream for AI inference.
    Supports multiple extraction modes with configurable FPS.
    """

    def __init__(self, channel: int, config: FrameConfig):
        self.channel = channel
        self.cfg = config
        self._callback: Optional[Callable[[ExtractedFrame], None]] = None
        self._running = False
        self._frame_count = 0
        self._last_extract_time = 0.0
        self._extract_interval = 1.0 / config.target_fps
        self._prev_gray: Optional[np.ndarray] = None

    def on_frame(self, callback: Callable[[ExtractedFrame], None]):
        """Register callback for extracted frames."""
        self._callback = callback

    async def start(self, rtsp_url: str):
        """Start frame extraction from RTSP stream."""
        self._running = True
        ffmpeg_cmd = self._build_ffmpeg_cmd(rtsp_url)
        process = await asyncio.create_subprocess_exec(
            *ffmpeg_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        try:
            await self._read_frames(process)
        finally:
            self._running = False
            if process.returncode is None:
                process.kill()
                await process.wait()

    def _build_ffmpeg_cmd(self, rtsp_url: str) -> list:
        """Build FFmpeg command for frame extraction."""
        w, h = self.cfg.resolution
        # Base filters
        filters = f"fps={self.cfg.target_fps},scale={w}:{h}"
        if self.cfg.use_keyframes:
            if self.cfg.mode == "keyframe_only":
                filters = f"select='eq(pict_type,I)',{filters}"
            elif self.cfg.mode == "hybrid":
                # I-frame priority with fixed FPS fallback
                filters = (
                    f"select='eq(pict_type,I)"
                    f"+gte(n,prev_selected_n+{int(25 / self.cfg.target_fps)})',{filters}"
                )
        return [
            "ffmpeg",
            "-rtsp_transport", "tcp",
            "-stimeout", "5000000",
            # NOTE: the -reconnect/-reconnect_* options are HTTP-protocol-only
            # and have no effect on RTSP inputs; recovery is handled by the
            # stream supervisor restarting this process instead.
            "-fflags", "+genpts+discardcorrupt",
            "-flags", "+low_delay",
            "-i", rtsp_url,
            "-vf", filters,
            "-f", "image2pipe",
            "-vcodec", "mjpeg",
            # mjpeg qscale range is 2 (best) .. 31 (worst)
            "-q:v", str(max(2, (100 - self.cfg.jpeg_quality) // 10)),
            "-",
        ]

    async def _read_frames(self, process: asyncio.subprocess.Process):
        """Read MJPEG frames from FFmpeg pipe."""
        buffer = b""
        jpeg_start = b"\xff\xd8"  # JPEG SOI marker
        jpeg_end = b"\xff\xd9"  # JPEG EOI marker
        while self._running:
            chunk = await process.stdout.read(65536)
            if not chunk:
                break
            buffer += chunk
            # Find complete JPEG frames
            start_idx = buffer.find(jpeg_start)
            while start_idx != -1:
                end_idx = buffer.find(jpeg_end, start_idx)
                if end_idx == -1:
                    break
                jpeg_data = buffer[start_idx:end_idx + 2]
                buffer = buffer[end_idx + 2:]
                await self._process_jpeg(jpeg_data)
                start_idx = buffer.find(jpeg_start)
            # Prevent buffer bloat
            if len(buffer) > 10 * 1024 * 1024:
                buffer = buffer[-5 * 1024 * 1024:]

    async def _process_jpeg(self, jpeg_data: bytes):
        """Process extracted JPEG frame."""
        self._frame_count += 1
        # Decode for metadata and motion detection
        nparr = np.frombuffer(jpeg_data, np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if frame is None:
            return
        h, w = frame.shape[:2]
        # Approximation: the filter chain re-times frames, so the original
        # I-frame flag is lost at this point; treat the first frame of each
        # source-GOP-sized window (assuming 25 fps input) as the keyframe.
        is_keyframe = self._frame_count % max(1, int(25 / self.cfg.target_fps)) == 1
        # Motion detection (for adaptive mode)
        motion_score = 0.0
        if self.cfg.mode in ("adaptive", "hybrid") and self._prev_gray is not None:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            resized = cv2.resize(gray, (w // 4, h // 4))
            flow = cv2.calcOpticalFlowFarneback(
                self._prev_gray, resized, None,
                0.5, 3, 15, 3, 5, 1.2, 0
            )
            motion_score = np.mean(np.abs(flow))
            self._prev_gray = resized
        elif self.cfg.mode in ("adaptive", "hybrid"):
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            self._prev_gray = cv2.resize(gray, (w // 4, h // 4))
        extracted = ExtractedFrame(
            channel=self.channel,
            timestamp=datetime.utcnow(),
            frame_number=self._frame_count,
            is_keyframe=is_keyframe,
            raw_bytes=frame.tobytes(),
            width=w,
            height=h,
            motion_score=float(motion_score),
            jpeg_bytes=jpeg_data,
        )
        if self._callback:
            try:
                if asyncio.iscoroutinefunction(self._callback):
                    await self._callback(extracted)
                else:
                    self._callback(extracted)
            except Exception as e:
                logger.error(f"Frame callback error: {e}")

    async def stop(self):
        self._running = False


class MultiChannelExtractor:
    """Manages frame extraction across all 8 channels."""

    def __init__(self, max_concurrent: int = 8):
        self.extractors: dict[int, FrameExtractor] = {}
        self.tasks: dict[int, asyncio.Task] = {}
        self.max_concurrent = max_concurrent
        self._frame_queue: asyncio.Queue[ExtractedFrame] = asyncio.Queue(maxsize=100)

    async def add_channel(self, channel: int, rtsp_url: str, config: FrameConfig):
        extractor = FrameExtractor(channel, config)
        extractor.on_frame(self._on_frame)
        self.extractors[channel] = extractor
        self.tasks[channel] = asyncio.create_task(
            extractor.start(rtsp_url),
            name=f"extractor-ch{channel}"
        )

    async def _on_frame(self, frame: ExtractedFrame):
        try:
            self._frame_queue.put_nowait(frame)
        except asyncio.QueueFull:
            # Drop oldest frame to prevent blocking
            try:
                self._frame_queue.get_nowait()
                self._frame_queue.put_nowait(frame)
            except asyncio.QueueEmpty:
                pass

    async def frame_consumer(self, handler: Callable[[ExtractedFrame], None]):
        """Consume frames from the queue."""
        while True:
            frame = await self._frame_queue.get()
            try:
                if asyncio.iscoroutinefunction(handler):
                    await handler(frame)
                else:
                    handler(frame)
            except Exception as e:
                logger.error(f"Frame handler error: {e}")
            finally:
                self._frame_queue.task_done()

    async def stop_all(self):
        for extractor in self.extractors.values():
            await extractor.stop()
        for task in self.tasks.values():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
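The drop-oldest backpressure pattern used by `_on_frame` above can be demonstrated standalone. A minimal sketch — `DropOldestQueue` and `demo` are hypothetical names introduced only for illustration:

```python
import asyncio

class DropOldestQueue:
    """Bounded queue that discards the oldest item instead of blocking,
    so a slow consumer sees fresh frames rather than a growing backlog."""

    def __init__(self, maxsize: int):
        self._q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    def put(self, item) -> None:
        try:
            self._q.put_nowait(item)
        except asyncio.QueueFull:
            # Make room by dropping the stalest item, then retry once
            try:
                self._q.get_nowait()
            except asyncio.QueueEmpty:
                pass
            self._q.put_nowait(item)

    async def get(self):
        return await self._q.get()

async def demo() -> list:
    q = DropOldestQueue(maxsize=3)
    for frame_no in range(6):  # producer outruns the consumer
        q.put(frame_no)
    return [await q.get() for _ in range(3)]

print(asyncio.run(demo()))  # → [3, 4, 5]
```

Frames 0–2 are silently dropped: for live video, a fresh frame is always worth more than a stale one, which is why the extractor prefers dropping over blocking the FFmpeg reader.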
5.3 Simultaneous Stream Forking (Live + AI)
For each channel, the gateway must simultaneously provide:
- HLS live stream — for dashboard viewing (full quality)
- AI frame stream — decimated frames for inference
- Alert clip stream — triggered recordings
Approach: one FFmpeg invocation per channel with multiple output chains — the RTSP input is demuxed once, the AI branch passes through a filter graph, and the live/recording outputs use codec copy. Note that filtered streams cannot be codec-copied, so the copy outputs map the input video (0:v) directly.
# Single RTSP input, three output chains:
#   1) HLS live stream (codec copy, no re-encode)
#   2) decimated MJPEG frames for AI inference (pipe:1)
#   3) continuous 60-second MP4 segments (codec copy)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
  -fflags +genpts+discardcorrupt -flags +low_delay \
  -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -filter_complex "[0:v]fps=5,scale=640:480[aiout]" \
  -map 0:v -map 0:a? -c:v copy -c:a copy \
  -f hls -hls_time 2 -hls_list_size 5 \
  -hls_flags delete_segments+omit_endlist+program_date_time \
  -hls_segment_filename "/data/hls/ch1_%03d.ts" \
  "/data/hls/ch1.m3u8" \
  -map "[aiout]" -an \
  -f image2pipe -vcodec mjpeg -q:v 3 \
  pipe:1 \
  -map 0:v -map 0:a? -c:v copy -c:a copy \
  -f segment -segment_time 60 -segment_format mp4 \
  -reset_timestamps 1 -strftime 1 \
  "/data/buffer/ch1_continuous/%Y%m%d_%H%M%S.mp4"
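If this single forked process exits, every consumer for the channel stalls at once, so a supervisor loop is typically wrapped around the command above. A minimal sketch under stated assumptions — `supervise` and `next_backoff` are hypothetical helper names, and the 60-second "healthy run" reset is illustrative:

```python
import asyncio

def next_backoff(current: float, run_seconds: float,
                 healthy_after: float = 60.0, cap: float = 30.0) -> float:
    """Delay before the next restart: doubles on quick crashes,
    resets to 1s after a sufficiently long (healthy) run."""
    if run_seconds > healthy_after:
        return 1.0
    return min(current * 2, cap)

async def supervise(cmd: list) -> None:
    """Relaunch the forked FFmpeg pipeline whenever it exits."""
    loop = asyncio.get_running_loop()
    backoff = 1.0
    while True:
        started = loop.time()
        proc = await asyncio.create_subprocess_exec(*cmd)
        await proc.wait()
        backoff = next_backoff(backoff, loop.time() - started)
        await asyncio.sleep(backoff)

# Crash-looping process: restart delays double up to the 30s cap
delays, b = [], 1.0
for _ in range(6):
    b = next_backoff(b, run_seconds=2.0)
    delays.append(b)
print(delays)  # → [2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

The backoff reset after a healthy run prevents a camera that flaps once a day from permanently sitting at the 30-second ceiling.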
5.4 Resolution Adaptation
The gateway supports dynamic resolution adaptation based on network conditions:
| Scenario | Action | Trigger |
|---|---|---|
| Bandwidth sufficient | Full resolution (960x1080) | VPN throughput > 16 Mbps |
| Bandwidth constrained | Downscale to 640x720 | VPN throughput < 8 Mbps |
| Severe constraint | Switch to sub-stream (352x288) | VPN throughput < 4 Mbps |
| Recovery detected | Gradually upscale | Sustained > threshold for 30s |
class AdaptiveBitrateController:
    """
    Monitors VPN tunnel bandwidth and adjusts stream quality.
    Uses EWMA (exponentially weighted moving average) for smoothing.
    """

    QUALITY_LEVELS = {
        "full": {"resolution": (960, 1080), "stream": "primary"},
        "medium": {"resolution": (640, 720), "stream": "primary"},
        "low": {"resolution": (352, 288), "stream": "secondary"},
    }
    # Minimum sustained EWMA bandwidth (Mbps) required for each level
    BW_THRESHOLDS = {"full": 16.0, "medium": 8.0, "low": 0.0}
    _RANK = {"low": 0, "medium": 1, "full": 2}

    def __init__(self, min_bandwidth_mbps: float = 4.0):
        self.ewma_alpha = 0.3
        self.current_bandwidth = 100.0  # Start optimistic
        self.min_bandwidth = min_bandwidth_mbps
        self.current_quality = "full"
        self._history: list[float] = []

    def update_bandwidth(self, measured_mbps: float):
        self.current_bandwidth = (
            self.ewma_alpha * measured_mbps +
            (1 - self.ewma_alpha) * self.current_bandwidth
        )
        self._history.append(self.current_bandwidth)
        if len(self._history) > 100:
            self._history = self._history[-50:]

    def get_quality(self) -> str:
        bw = self.current_bandwidth
        if bw > self.BW_THRESHOLDS["full"]:
            target = "full"
        elif bw > self.BW_THRESHOLDS["medium"]:
            target = "medium"
        else:
            target = "low"
        # Hysteresis: downgrade immediately, upgrade only after sustained headroom
        if self._RANK[target] < self._RANK[self.current_quality]:
            self.current_quality = target
        elif target != self.current_quality:
            # Require the last 10 EWMA samples above the target's threshold
            if len(self._history) >= 10 and all(
                b > self.BW_THRESHOLDS[target] for b in self._history[-10:]
            ):
                self.current_quality = target
        return self.current_quality
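The EWMA update used by the controller can be checked numerically. A minimal sketch of the same smoothing rule with the controller's alpha of 0.3 (`ewma` is a hypothetical helper name):

```python
def ewma(prev: float, measured: float, alpha: float = 0.3) -> float:
    """One EWMA step: new = alpha * sample + (1 - alpha) * previous."""
    return alpha * measured + (1 - alpha) * prev

# Simulate a sudden VPN drop from ~100 Mbps to a sustained 5 Mbps
bw, samples = 100.0, []
for _ in range(10):
    bw = ewma(bw, 5.0)
    samples.append(round(bw, 2))

# First step lands at 71.5; after ten samples the EWMA is ~7.7 Mbps,
# below the 8 Mbps "medium" threshold, so the controller would downgrade
print(samples[0], samples[-1])
```

With alpha = 0.3, roughly 95% of the old estimate decays away within ten samples, so a genuine bandwidth collapse triggers a downgrade within ten measurement intervals while single-sample spikes barely move the estimate.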
5.5 Bandwidth Optimization for Cloud Upload
+------------------+ +-------------------+ +------------------+
| Raw RTSP Stream | -> | Edge Transcoder | -> | Cloud-Optimized |
| (H.265, 4 Mbps) | | (H.264, 1 Mbps) | | Upload Stream |
+------------------+ +-------------------+ +------------------+
|
+---------v---------+
| Optimization: |
| - H.265→H.264 |
| - CRF 28 (quality)|
| - 2-pass VBR |
| - De-duplication |
| - Delta frames |
+-------------------+
FFmpeg Cloud Upload Encoding:
# Transcode H.265 main stream to H.264 for cloud compatibility.
# Note: +faststart requires seekable output and cannot be used with a pipe;
# fragmented MP4 (+frag_keyframe+empty_moov) is used instead.
ffmpeg -rtsp_transport tcp -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
  -c:v libx264 -preset fast -tune zerolatency \
  -crf 28 -maxrate 1500k -bufsize 3000k \
  -g 50 -keyint_min 25 \
  -profile:v main -level 4.1 \
  -movflags +frag_keyframe+empty_moov \
  -c:a aac -b:a 64k -ac 1 \
  -f mp4 pipe:1 | curl -X POST -H "Content-Type: video/mp4" \
  --data-binary @- https://10.100.0.1/api/upload/ch1
Upload Optimization Techniques:
| Technique | Bandwidth Savings | Implementation |
|---|---|---|
| H.265→H.264 with CRF 28 | 40-60% | FFmpeg libx264 |
| Frame deduplication (static scenes) | 10-30% | Skip identical frames |
| Adaptive GOP | 5-15% | Dynamic I-frame interval |
| Resolution scaling | 30-70% | Scale to 720p or 480p |
| Audio mono + low bitrate | 5% | AAC 64kbps mono |
| Upload compression (gzip) | 5-10% | HTTP Content-Encoding |
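The "frame deduplication" row can be illustrated with an exact-byte comparison. A minimal sketch — `dedup_frames` is a hypothetical helper, and in practice encoders rarely emit byte-identical frames, so production code would compare a downscaled luma delta against a threshold rather than hashes:

```python
import hashlib

def dedup_frames(jpeg_frames: list) -> list:
    """Skip frames whose bytes are identical to the previous kept frame —
    a cheap bandwidth win on fully static scenes."""
    kept, last_digest = [], None
    for frame in jpeg_frames:
        digest = hashlib.sha256(frame).digest()
        if digest != last_digest:
            kept.append(frame)
            last_digest = digest
    return kept

frames = [b"frame-A", b"frame-A", b"frame-A", b"frame-B", b"frame-A"]
print(len(dedup_frames(frames)))  # → 3 (two duplicate frames dropped)
```

Note that only consecutive duplicates are dropped; the final `frame-A` is kept because the scene changed in between, preserving temporal ordering for the consumer.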
6. Live Streaming for Dashboard
6.1 HLS Stream Generation
HLS (HTTP Live Streaming) is the primary delivery mechanism for the dashboard due to its wide browser support and adaptive bitrate capabilities.
6.1.1 HLS Playlist Format (Generated per Channel)
# /data/hls/ch1.m3u8
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:1245
#EXT-X-DISCONTINUITY-SEQUENCE:0
#EXTINF:2.000,
ch1_245.ts
#EXTINF:2.000,
ch1_246.ts
#EXTINF:2.000,
ch1_247.ts
#EXTINF:2.000,
ch1_248.ts
#EXTINF:2.000,
ch1_249.ts
6.1.2 Multi-Quality HLS (Master Playlist)
For adaptive streaming, generate multiple quality variants:
# /data/hls/ch1_master.m3u8
#EXTM3U
#EXT-X-STREAM-INF:BANDWIDTH=4000000,RESOLUTION=960x1080,CODECS="avc1.640020,mp4a.40.2"
ch1_high.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=1500000,RESOLUTION=640x720,CODECS="avc1.4d001f,mp4a.40.2"
ch1_mid.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=500000,RESOLUTION=352x288,CODECS="avc1.42001e,mp4a.40.2"
ch1_low.m3u8
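The master playlist above is small enough to render from a variant table at runtime. A minimal sketch — `build_master_playlist` is a hypothetical helper name; the variant values mirror the example playlist:

```python
def build_master_playlist(variants: list) -> str:
    """Render an HLS master playlist from a list of
    {bandwidth, resolution, codecs, uri} variant dicts."""
    lines = ["#EXTM3U"]
    for v in variants:
        lines.append(
            f'#EXT-X-STREAM-INF:BANDWIDTH={v["bandwidth"]},'
            f'RESOLUTION={v["resolution"]},CODECS="{v["codecs"]}"'
        )
        lines.append(v["uri"])
    return "\n".join(lines) + "\n"

playlist = build_master_playlist([
    {"bandwidth": 4000000, "resolution": "960x1080",
     "codecs": "avc1.640020,mp4a.40.2", "uri": "ch1_high.m3u8"},
    {"bandwidth": 500000, "resolution": "352x288",
     "codecs": "avc1.42001e,mp4a.40.2", "uri": "ch1_low.m3u8"},
])
print(playlist)
```

Listing variants highest-bandwidth first matches common player heuristics that start from the top entry when no bandwidth estimate is available yet.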
6.1.3 FFmpeg HLS Generation Commands
# High quality variant (primary stream)
ffmpeg -rtsp_transport tcp -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-c:v libx264 -preset ultrafast -tune zerolatency -profile:v high -level 4.1 \
-b:v 3000k -maxrate 4000k -bufsize 6000k -g 50 -keyint_min 25 \
-c:a aac -b:a 128k -ar 44100 -ac 2 \
-f hls -hls_time 2 -hls_list_size 5 -hls_flags delete_segments+omit_endlist \
-hls_segment_type mpegts -hls_segment_filename "/data/hls/ch1_high_%03d.ts" \
"/data/hls/ch1_high.m3u8" \
\
# Mid quality variant (transcoded)
-c:v libx264 -preset ultrafast -tune zerolatency -profile:v main -level 3.1 \
-vf "scale=640:720" -b:v 1000k -maxrate 1500k -bufsize 2000k -g 50 \
-c:a aac -b:a 96k -ar 44100 -ac 2 \
-f hls -hls_time 2 -hls_list_size 5 -hls_flags delete_segments+omit_endlist \
-hls_segment_filename "/data/hls/ch1_mid_%03d.ts" \
"/data/hls/ch1_mid.m3u8"
6.2 WebSocket + WebRTC for Low-Latency Preview
For low-latency preview (single camera fullscreen, typically 300-750 ms end-to-end per the breakdown below), WebRTC is used:
+-------------+ +------------------+ +-----------------+
| Browser | -> | Edge Gateway | -> | CP PLUS DVR |
| (WebRTC) | | (WHIP Bridge) | | (RTSP Source) |
| |<- | |<- | |
+-------------+ +------------------+ +-----------------+
Latency breakdown:
RTSP ingest: ~200-500ms
FFmpeg decode: ~50-100ms
WebRTC encode: ~30-80ms
Network (LAN): ~1-5ms
Browser decode: ~20-50ms
TOTAL: ~300-750ms
WebRTC Bridge (Python/aiortc):
# webrtc_bridge.py — Low-latency WebRTC bridge for single-camera preview
import asyncio
import logging

import numpy as np
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
from av import VideoFrame

logger = logging.getLogger(__name__)


class RTSPVideoStreamTrack(VideoStreamTrack):
    """Reads RTSP via FFmpeg, outputs WebRTC VideoFrame."""

    def __init__(self, rtsp_url: str):
        super().__init__()
        self.rtsp_url = rtsp_url
        self._ffmpeg = None
        self._task = None
        self._frame_queue = asyncio.Queue(maxsize=2)  # Minimize latency

    async def start(self):
        cmd = [
            "ffmpeg", "-rtsp_transport", "tcp", "-stimeout", "5000000",
            # (-reconnect options are HTTP-only and omitted for RTSP input)
            "-fflags", "+genpts+discardcorrupt", "-flags", "+low_delay",
            "-i", self.rtsp_url,
            "-vf", "scale=640:480,format=yuv420p",
            "-c:v", "rawvideo", "-pix_fmt", "yuv420p",
            "-f", "rawvideo", "-"
        ]
        self._ffmpeg = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        self._task = asyncio.create_task(self._read_frames())

    async def _read_frames(self):
        width, height = 640, 480
        frame_size = width * height * 3 // 2  # YUV420P
        while True:
            try:
                raw = await self._ffmpeg.stdout.readexactly(frame_size)
            except asyncio.IncompleteReadError:
                break
            # PyAV has no VideoFrame.from_bytes; build the frame from an
            # ndarray shaped (height * 3/2, width) for planar YUV420P
            arr = np.frombuffer(raw, np.uint8).reshape(height * 3 // 2, width)
            frame = VideoFrame.from_ndarray(arr, format="yuv420p")
            try:
                self._frame_queue.put_nowait(frame)
            except asyncio.QueueFull:
                # Drop oldest to minimize latency
                self._frame_queue.get_nowait()
                self._frame_queue.put_nowait(frame)

    async def recv(self):
        frame = await self._frame_queue.get()
        pts, time_base = await self.next_timestamp()
        frame.pts = pts
        frame.time_base = time_base
        return frame

    async def stop(self):
        if self._task:
            self._task.cancel()
        if self._ffmpeg:
            self._ffmpeg.kill()
            await self._ffmpeg.wait()


class WebRTCBridge:
    """Manages WebRTC peer connections for low-latency camera preview."""

    def __init__(self, rtsp_url: str):
        self.rtsp_url = rtsp_url
        self.pc = None
        self.track = None

    async def create_offer(self):
        self.pc = RTCPeerConnection()
        self.track = RTSPVideoStreamTrack(self.rtsp_url)
        await self.track.start()
        self.pc.addTrack(self.track)
        offer = await self.pc.createOffer()
        await self.pc.setLocalDescription(offer)
        return {"sdp": self.pc.localDescription.sdp, "type": self.pc.localDescription.type}

    async def handle_answer(self, answer_sdp: str):
        answer = RTCSessionDescription(sdp=answer_sdp, type="answer")
        await self.pc.setRemoteDescription(answer)

    async def close(self):
        if self.track:
            await self.track.stop()
        if self.pc:
            await self.pc.close()
6.3 Multi-Camera Grid Layout
The dashboard supports multiple grid layouts with automatic stream selection:
| Layout | Cameras Visible | Stream Used | Resolution | Per-Stream BW |
|---|---|---|---|---|
| 1x1 (fullscreen) | 1 | Primary (subtype=0) | 960x1080 | ~3 Mbps |
| 2x2 | 4 | Secondary (subtype=1) | 352x288 | ~0.5 Mbps each |
| 3x3 | 8+1 empty | Secondary (subtype=1) | 352x288 | ~0.5 Mbps each |
| 4x2 | 8 | Secondary (subtype=1) | 352x288 | ~0.5 Mbps each |
| Custom | User-defined | Mixed | Mixed | Sum of selected |
Grid Switching Logic:
def get_stream_for_layout(channel: int, layout: str, is_focused: bool) -> str:
    """Determine which RTSP stream to use based on layout context."""
    if layout == "1x1" or is_focused:
        # Fullscreen — use main stream for quality
        return (f"rtsp://admin:password@192.168.29.200:554/cam/realmonitor"
                f"?channel={channel}&subtype=0")
    # Grid view — use sub-stream for bandwidth efficiency
    return (f"rtsp://admin:password@192.168.29.200:554/cam/realmonitor"
            f"?channel={channel}&subtype=1")
6.4 Stream Relay Through Cloud
[Edge Gateway HLS] --VPN--> [Cloud Nginx] --> [CDN / Dashboard Users]
(10.100.0.1:443)
Nginx Configuration:
- Reverse proxy to edge gateway HLS endpoint
- Token-based authentication (JWT)
- Rate limiting per client
- HTTPS termination
- HLS caching (5-second segments)
Nginx HLS Relay Configuration:
# /etc/nginx/conf.d/hls-relay.conf
# Prerequisites (defined in the http{} context): a limit_req_zone named
# "hls_limit" and a proxy_cache_path zone named "hls_cache". Note that
# auth_jwt is an NGINX Plus directive; open-source builds need an
# equivalent JWT module.

upstream edge_gateway_hls {
    server 10.100.0.10:8081;  # Via WireGuard tunnel
    keepalive 32;
}

server {
    listen 443 ssl http2;
    server_name streams.example.com;

    ssl_certificate /etc/ssl/certs/stream.crt;
    ssl_certificate_key /etc/ssl/private/stream.key;

    # HLS endpoint authentication
    location /hls/ {
        # JWT validation
        auth_jwt "HLS Stream";
        auth_jwt_key_file /etc/nginx/jwt_key.pub;

        # Rate limiting
        limit_req zone=hls_limit burst=10 nodelay;

        proxy_pass http://edge_gateway_hls/hls/;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_buffering off;
        proxy_cache off;

        # CORS for web dashboard
        add_header Access-Control-Allow-Origin "https://dashboard.example.com" always;
        add_header Access-Control-Allow-Methods "GET, OPTIONS" always;

        # HLS-specific headers
        add_header Cache-Control "no-cache" always;
        add_header Access-Control-Expose-Headers "Content-Length" always;

        # Segment caching (brief)
        location ~* \.ts$ {
            proxy_pass http://edge_gateway_hls;
            proxy_cache hls_cache;
            proxy_cache_valid 200 10s;
            expires 10s;
        }
    }

    # WebSocket for real-time status
    location /ws/ {
        proxy_pass http://edge_gateway_hls/ws/;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
6.5 Fallback Handling
When a camera stream fails, the following fallback chain is executed:
1. RTSP connection fails
├──> Retry with exponential backoff (3 attempts)
├──> Attempt UDP transport if TCP fails
├──> Circuit breaker opens after 5 consecutive failures
│
2. Stream stall detected (no frames for 10s)
├──> Kill FFmpeg process
├──> Restart stream with fresh connection
│
3. Camera marked OFFLINE
├──> Dashboard shows "Camera Offline" placeholder
├──> HLS playlist returns 404
├──> Last known frame displayed with timestamp overlay
├──> Alert sent to operations team
│
4. Camera recovers
├──> Circuit breaker transitions to HALF_OPEN
├──> Test stream pulled for 10 seconds
├──> On success: circuit CLOSED, stream resumes
├──> Dashboard auto-refreshes
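The circuit-breaker transitions in steps 1 and 4 above can be sketched as a small state machine. A minimal sketch matching the described 5-failure threshold and HALF_OPEN probe (class name and cooldown value are illustrative):

```python
class CircuitBreaker:
    """CLOSED -> OPEN after `threshold` consecutive failures;
    OPEN -> HALF_OPEN once `cooldown` seconds elapse; a successful
    probe in HALF_OPEN closes the circuit again."""

    def __init__(self, threshold: int = 5, cooldown: float = 30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.state = "CLOSED"
        self.opened_at = 0.0

    def record_failure(self, now: float) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.state = "OPEN"
            self.opened_at = now

    def record_success(self) -> None:
        self.failures = 0
        self.state = "CLOSED"

    def can_attempt(self, now: float) -> bool:
        if self.state == "OPEN" and now - self.opened_at >= self.cooldown:
            self.state = "HALF_OPEN"  # allow a single probe stream
        return self.state != "OPEN"

cb = CircuitBreaker()
for i in range(5):
    cb.record_failure(now=float(i))
print(cb.state)  # → OPEN
print(cb.can_attempt(now=100.0), cb.state)  # → True HALF_OPEN
```

Passing `now` explicitly (instead of calling `time.monotonic()` internally) keeps the transitions deterministic and trivially testable.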
HLS Error Response (Camera Offline; `#EXT-X-ERROR` is a custom, non-standard tag consumed only by the dashboard player — standard players ignore unknown tags):
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-ERROR: "Camera OFFLINE - Channel 1"
#EXTINF:2.000,
offline_placeholder.ts
7. Edge Buffering & Local Storage
7.1 Storage Architecture
The edge gateway uses a tiered storage system optimized for limited SSD capacity:
+-------------------------------------------------------------------+
| Edge Gateway Storage (512GB NVMe) |
| |
| +---------------------+ +---------------------+ +-----------+ |
| | Ring Buffer (20GB) | | HLS Cache (5GB) | | Logs (1GB)| |
| | /data/buffer/ | | /data/hls/ | |/data/logs/| |
| | | | | | | |
| | Alert clips (MP4) | | HLS segments (.ts) | | Rotated | |
| | Pre-event buffer | | Playlists (.m3u8) | | daily | |
| | Upload queue | | 5-segment window | | | |
| | Retry staging | | Auto-cleanup | | | |
| +----------+----------+ +---------------------+ +-----------+ |
| | |
| +----------v----------+ |
| | Upload Manager | ---> VPN ---> Cloud S3/MinIO |
| | - Parallel uploads | (10.100.0.1:9000) |
| | - Retry with backoff| |
| | - Integrity verify | |
| +---------------------+ |
+-------------------------------------------------------------------+
Storage Quotas:
| Directory | Max Size | Max Age | Cleanup Strategy |
|---|---|---|---|
| `/data/buffer/alert_clips/` | 15 GB | 72 hours | LRU + age-based |
| `/data/buffer/pre_event/` | 3 GB | 24 hours | Ring buffer (FIFO) |
| `/data/buffer/upload_queue/` | 2 GB | 48 hours | On successful upload |
| `/data/hls/` | 5 GB | 10 seconds | Immediate (segment rotation) |
| `/data/logs/` | 1 GB | 30 days | Logrotate daily |
| `/tmp/ffmpeg/` | 2 GB | On process exit | Cleanup on restart |
| Total Reserved | ~28 GB | — | — |
7.2 Ring Buffer for Alert Clips
The ring buffer stores video clips triggered by AI detection events (motion, intrusion, safety violation, etc.).
7.2.1 Ring Buffer Design
# ring_buffer.py — Alert clip ring buffer with disk management
import asyncio
import hashlib
import json
import logging
import os
import shutil
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional

import aiofiles

logger = logging.getLogger(__name__)


@dataclass
class ClipMetadata:
    clip_id: str
    channel: int
    camera_name: str
    event_type: str  # motion, intrusion, safety_violation, etc.
    event_confidence: float
    start_time: datetime  # clip start (includes pre-event)
    event_time: datetime  # actual detection time
    end_time: datetime  # clip end (includes post-event)
    duration_seconds: float
    file_path: str
    file_size_bytes: int
    checksum_sha256: str
    upload_status: str  # pending, uploading, uploaded, failed
    upload_attempts: int = 0
    resolution: str = "960x1080"
    fps: int = 25
    tags: Optional[List[str]] = None

    def __post_init__(self):
        if self.tags is None:
            self.tags = []


class RingBuffer:
    """
    Fixed-size ring buffer for alert clips.
    When full, oldest clips are deleted to make room.
    """

    def __init__(self, base_path: str, max_size_bytes: int, max_age_hours: int):
        self.base_path = Path(base_path)
        self.max_size = max_size_bytes
        self.max_age = timedelta(hours=max_age_hours)
        self.metadata_file = self.base_path / "index.jsonl"
        self._clips: List[ClipMetadata] = []
        self._current_size = 0
        self._lock = asyncio.Lock()

    async def initialize(self):
        """Load existing clips from disk and calculate current usage."""
        self.base_path.mkdir(parents=True, exist_ok=True)
        if self.metadata_file.exists():
            async with aiofiles.open(self.metadata_file, 'r') as f:
                async for line in f:
                    line = line.strip()
                    if line:
                        data = json.loads(line)
                        clip = ClipMetadata(**data)
                        clip.start_time = datetime.fromisoformat(data['start_time'])
                        clip.event_time = datetime.fromisoformat(data['event_time'])
                        clip.end_time = datetime.fromisoformat(data['end_time'])
                        self._clips.append(clip)
                        self._current_size += clip.file_size_bytes
        # Clean old clips on startup
        await self._cleanup_old_clips()
        logger.info(f"Ring buffer initialized: {len(self._clips)} clips, "
                    f"{self._current_size / 1e9:.2f} GB")

    async def store_clip(self, source_path: str, metadata: ClipMetadata) -> ClipMetadata:
        """Store a new clip in the ring buffer."""
        async with self._lock:
            # Make room if needed
            while self._current_size + metadata.file_size_bytes > self.max_size:
                if not self._clips:
                    break
                await self._remove_oldest_clip()
            # Copy file to buffer (blocking copy; acceptable for clip-sized
            # files — wrap in asyncio.to_thread for very large ones)
            dest_path = self.base_path / f"{metadata.clip_id}.mp4"
            shutil.copy2(source_path, dest_path)
            # Verify checksum
            actual_checksum = await self._calculate_checksum(dest_path)
            if actual_checksum != metadata.checksum_sha256:
                os.remove(dest_path)
                raise ValueError("Checksum mismatch — possible corruption")
            metadata.file_path = str(dest_path)
            self._clips.append(metadata)
            self._current_size += metadata.file_size_bytes
            # Persist metadata
            await self._append_metadata(metadata)
            logger.info(f"Stored clip {metadata.clip_id} "
                        f"({metadata.file_size_bytes / 1e6:.1f} MB)")
            return metadata

    async def get_pending_uploads(self, batch_size: int = 10) -> List[ClipMetadata]:
        """Get clips pending upload, ordered by age."""
        pending = [
            c for c in self._clips
            if c.upload_status in ("pending", "failed")
            and c.upload_attempts < 5
        ]
        return sorted(pending, key=lambda c: c.event_time)[:batch_size]

    async def mark_uploaded(self, clip_id: str):
        """Mark a clip as successfully uploaded."""
        for clip in self._clips:
            if clip.clip_id == clip_id:
                clip.upload_status = "uploaded"
                await self._rewrite_metadata()
                # Optionally delete local file after successful upload
                # await self._remove_clip_file(clip)
                break

    async def mark_failed(self, clip_id: str):
        """Increment failure counter for a clip."""
        for clip in self._clips:
            if clip.clip_id == clip_id:
                clip.upload_attempts += 1
                clip.upload_status = "failed"
                await self._rewrite_metadata()
                break

    async def _remove_oldest_clip(self):
        """Remove the oldest clip to free space."""
        if not self._clips:
            return
        oldest = min(self._clips, key=lambda c: c.event_time)
        await self._remove_clip(oldest)

    async def _remove_clip(self, clip: ClipMetadata):
        """Remove a specific clip from buffer."""
        try:
            if os.path.exists(clip.file_path):
                os.remove(clip.file_path)
        except OSError:
            pass
        self._clips.remove(clip)
        self._current_size -= clip.file_size_bytes

    async def _cleanup_old_clips(self):
        """Remove clips older than max_age."""
        cutoff = datetime.utcnow() - self.max_age
        old_clips = [c for c in self._clips if c.event_time < cutoff]
        for clip in old_clips:
            await self._remove_clip(clip)
        if old_clips:
            await self._rewrite_metadata()
            logger.info(f"Cleaned {len(old_clips)} old clips")

    async def _calculate_checksum(self, file_path: Path) -> str:
        sha = hashlib.sha256()
        async with aiofiles.open(file_path, 'rb') as f:
            while chunk := await f.read(8192):
                sha.update(chunk)
        return sha.hexdigest()

    async def _append_metadata(self, clip: ClipMetadata):
        async with aiofiles.open(self.metadata_file, 'a') as f:
            data = asdict(clip)
            data['start_time'] = clip.start_time.isoformat()
            data['event_time'] = clip.event_time.isoformat()
            data['end_time'] = clip.end_time.isoformat()
            await f.write(json.dumps(data) + '\n')

    async def _rewrite_metadata(self):
        async with aiofiles.open(self.metadata_file, 'w') as f:
            for clip in self._clips:
                data = asdict(clip)
                data['start_time'] = clip.start_time.isoformat()
                data['event_time'] = clip.event_time.isoformat()
                data['end_time'] = clip.end_time.isoformat()
                await f.write(json.dumps(data) + '\n')

    @property
    def stats(self) -> dict:
        return {
            "clip_count": len(self._clips),
            "total_size_bytes": self._current_size,
            "total_size_gb": round(self._current_size / 1e9, 2),
            "max_size_gb": round(self.max_size / 1e9, 2),
            "usage_percent": round(self._current_size / self.max_size * 100, 1),
            "pending_uploads": len([c for c in self._clips if c.upload_status == "pending"]),
            "failed_uploads": len([c for c in self._clips if c.upload_status == "failed"]),
        }
7.3 Upload Queue with Retry
# upload_manager.py — Cloud upload with retry and integrity verification
import asyncio
import aiohttp
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import json
logger = logging.getLogger(__name__)
@dataclass
class UploadConfig:
endpoint_url: str = "https://10.100.0.1/api/upload"
max_retries: int = 5
base_delay: float = 5.0 # seconds
max_delay: float = 300.0 # 5 minutes
timeout: float = 60.0 # per-request timeout
concurrent_uploads: int = 3
verify_ssl: bool = True # False for self-signed in dev
class UploadManager:
"""
Manages cloud uploads with:
- Exponential backoff retry
- Parallel upload limiting
- Integrity verification
- Bandwidth throttling
"""
def __init__(self, config: UploadConfig, ring_buffer: RingBuffer):
self.cfg = config
self.buffer = ring_buffer
self._semaphore = asyncio.Semaphore(config.concurrent_uploads)
self._session: Optional[aiohttp.ClientSession] = None
self._running = False
async def start(self):
ssl_ctx = False if not self.cfg.verify_ssl else None
connector = aiohttp.TCPConnector(
limit=10,
limit_per_host=5,
enable_cleanup_closed=True,
force_close=True,
)
timeout = aiohttp.ClientTimeout(total=self.cfg.timeout)
self._session = aiohttp.ClientSession(
connector=connector,
timeout=timeout,
)
self._running = True
asyncio.create_task(self._upload_loop())
async def _upload_loop(self):
"""Background task that continuously processes the upload queue."""
while self._running:
try:
pending = await self.buffer.get_pending_uploads(batch_size=5)
if not pending:
await asyncio.sleep(5)
continue
tasks = [
self._upload_with_retry(clip)
for clip in pending
]
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
                logger.error(f"Upload loop error: {e}")
                await asyncio.sleep(10)

    async def _upload_with_retry(self, clip: ClipMetadata):
        """Upload a clip with exponential backoff retry."""
        async with self._semaphore:
            for attempt in range(1, self.cfg.max_retries + 1):
                try:
                    await self._do_upload(clip)
                    await self.buffer.mark_uploaded(clip.clip_id)
                    logger.info(f"Uploaded clip {clip.clip_id} "
                                f"(attempt {attempt})")
                    return
                except (aiohttp.ClientError, ValueError) as e:
                    # ValueError covers server-side checksum mismatches
                    delay = min(
                        self.cfg.base_delay * (2 ** (attempt - 1)),
                        self.cfg.max_delay
                    )
                    logger.warning(
                        f"Upload failed for {clip.clip_id} "
                        f"(attempt {attempt}/{self.cfg.max_retries}): {e}. "
                        f"Retry in {delay}s"
                    )
                    await asyncio.sleep(delay)
            # All retries exhausted
            await self.buffer.mark_failed(clip.clip_id)
            logger.error(f"Upload permanently failed for {clip.clip_id}")

    async def _do_upload(self, clip: ClipMetadata):
        """Perform the actual HTTP upload."""
        upload_url = f"{self.cfg.endpoint_url}/{clip.channel}"
        # Read the clip up front so the file handle is closed promptly
        with open(clip.file_path, 'rb') as f:
            file_bytes = f.read()
        form_data = aiohttp.FormData()
        form_data.add_field(
            'file',
            file_bytes,
            filename=f"{clip.clip_id}.mp4",
            content_type='video/mp4',
        )
        form_data.add_field('metadata', json.dumps({
            'clip_id': clip.clip_id,
            'channel': clip.channel,
            'event_type': clip.event_type,
            'event_time': clip.event_time.isoformat(),
            'checksum': clip.checksum_sha256,
        }))
        async with self._session.post(upload_url, data=form_data) as resp:
            if resp.status != 200:
                body = await resp.text()
                raise aiohttp.ClientError(f"HTTP {resp.status}: {body}")
            result = await resp.json()
            # Verify server-side checksum matches
            if result.get('checksum') != clip.checksum_sha256:
                raise ValueError("Server checksum mismatch")

    async def stop(self):
        self._running = False
        if self._session:
            await self._session.close()

    async def get_queue_status(self) -> dict:
        """Get current upload queue status."""
        pending = await self.buffer.get_pending_uploads(batch_size=1000)
        return {
            "pending_count": len(pending),
            # _value is a private asyncio.Semaphore attribute; acceptable
            # for diagnostics only
            "in_progress": self.cfg.concurrent_uploads - self._semaphore._value,
            "endpoint": self.cfg.endpoint_url,
            "last_check": datetime.utcnow().isoformat(),
        }
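The retry delay in `_upload_with_retry` doubles per attempt and is capped at `max_delay`. A standalone sketch of the resulting schedule (the `base_delay=2`, `max_delay=60` values are illustrative, not taken from a shipped config):

```python
def backoff_schedule(base_delay: float, max_delay: float,
                     max_retries: int) -> list:
    """Delay before each retry: base * 2^(attempt-1), capped at max_delay."""
    return [min(base_delay * (2 ** (attempt - 1)), max_delay)
            for attempt in range(1, max_retries + 1)]

# With base_delay=2s, max_delay=60s, max_retries=6 the delays are
# 2, 4, 8, 16, 32, 60 seconds
print(backoff_schedule(2, 60, 6))
```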
7.4 Disk Management — Automatic Cleanup
# disk_manager.py — Proactive disk space management
import os
import shutil
import asyncio
import logging
import subprocess
from pathlib import Path
from typing import List, Dict

logger = logging.getLogger(__name__)


class DiskManager:
    """
    Monitors disk usage and enforces quotas.
    Implements emergency cleanup when disk is critically full.
    """
    CRITICAL_THRESHOLD = 95  # % — emergency cleanup
    WARNING_THRESHOLD = 85   # % — proactive cleanup
    TARGET_USAGE = 80        # % — cleanup target

    def __init__(self, mounts: Dict[str, dict]):
        """
        mounts: { path: { max_size_gb, max_age_hours, priority } }
        priority: lower = deleted first during cleanup
        """
        self.mounts = mounts

    async def monitor_loop(self, interval_seconds: float = 60.0):
        """Continuous disk monitoring loop."""
        while True:
            try:
                for path, config in self.mounts.items():
                    usage = self._get_usage(path)
                    logger.debug(f"Disk {path}: {usage['percent']:.1f}% used")
                    if usage['percent'] > self.CRITICAL_THRESHOLD:
                        logger.critical(f"CRITICAL: Disk {path} at "
                                        f"{usage['percent']:.1f}%")
                        await self._emergency_cleanup(path, config)
                    elif usage['percent'] > self.WARNING_THRESHOLD:
                        logger.warning(f"Disk {path} at {usage['percent']:.1f}%")
                        await self._proactive_cleanup(path, config)
            except Exception as e:
                logger.error(f"Disk monitor error: {e}")
            await asyncio.sleep(interval_seconds)

    def _get_usage(self, path: str) -> dict:
        stat = shutil.disk_usage(path)
        return {
            "total_gb": stat.total / 1e9,
            "used_gb": stat.used / 1e9,
            "free_gb": stat.free / 1e9,
            "percent": (stat.used / stat.total) * 100,
        }

    async def _emergency_cleanup(self, path: str, config: dict):
        """Aggressive cleanup to free space immediately."""
        # 1. Delete the oldest HLS cache segments aggressively
        hls_dir = Path(path) / "hls"
        if hls_dir.exists():
            segments = sorted(hls_dir.glob("*.ts"),
                              key=lambda f: f.stat().st_mtime)
            for seg in segments[:-3]:  # Keep only the newest 3 segments
                seg.unlink()
                logger.info(f"Emergency deleted HLS segment: {seg}")
        # 2. Delete buffered clips, oldest first, until usage drops
        #    below the target
        buffer_dir = Path(path) / "buffer"
        if buffer_dir.exists():
            for clip_file in sorted(buffer_dir.glob("*.mp4"),
                                    key=lambda f: f.stat().st_mtime):
                clip_file.unlink()
                logger.info(f"Emergency deleted buffered clip: {clip_file}")
                usage = self._get_usage(path)
                if usage['percent'] < self.TARGET_USAGE:
                    break
        # 3. Compress logs (os.system with an interpolated path is a shell
        #    injection risk; use an argument list instead)
        logs_dir = Path(path) / "logs"
        if logs_dir.exists():
            for log_file in sorted(logs_dir.glob("*.log"),
                                   key=lambda f: f.stat().st_mtime):
                subprocess.run(["gzip", str(log_file)], check=False)

    async def _proactive_cleanup(self, path: str, config: dict):
        """Normal proactive cleanup — age-based deletion handled by RingBuffer."""
        pass
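The `priority` field documented in the `mounts` mapping is not consumed by the methods above. One plausible use, sketched here with hypothetical paths and priorities, is ordering mounts so lower-priority data is cleaned first:

```python
def cleanup_order(mounts: dict) -> list:
    """Return mount paths sorted so lower-priority paths are cleaned first,
    matching the 'lower = deleted first' convention in the docstring."""
    return sorted(mounts, key=lambda p: mounts[p].get("priority", 0))

# Hypothetical mount table; priorities are illustrative
mounts = {
    "/data/hls":    {"priority": 0},  # transient live segments: cleaned first
    "/data/buffer": {"priority": 5},  # event clips awaiting upload
    "/data/logs":   {"priority": 9},  # cleaned last
}
print(cleanup_order(mounts))
```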
7.5 Pre-Event Buffer (Continuous Circular Recording)
For capturing video before an AI-detected event, a continuous circular buffer is maintained:
# Continuous 60-second rolling buffer per channel (primary stream)
ffmpeg -rtsp_transport tcp -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-c:v copy -c:a copy -f segment -segment_time 10 -segment_wrap 6 \
-reset_timestamps 1 -strftime 0 \
"/data/buffer/pre_event/ch1_%d.mp4"
This creates 6 x 10-second segments (ch1_0.mp4 through ch1_5.mp4) that continuously overwrite one another. Because the wrap index rotates, "newest" and "oldest" must be determined from file mtime rather than from the segment index.

On event detection:
- Lock current segments (prevent overwrite)
- Continue recording for post_event_seconds (e.g., 30s)
- Concatenate pre-event + event + post-event segments
- Store in ring buffer
- Queue for cloud upload
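The "lock current segments" step can be implemented without copying: hard-link the wrap segments into a per-event staging directory, so the circular recorder may keep overwriting the originals. A minimal sketch (the directory layout and helper name are illustrative, not part of the implementation above):

```python
import os
from pathlib import Path

def lock_segments(buffer_dir: str, staging_dir: str) -> list:
    """Hard-link every buffered segment into staging_dir so the circular
    recorder can keep overwriting the originals without losing the copies."""
    staging = Path(staging_dir)
    staging.mkdir(parents=True, exist_ok=True)
    locked = []
    for seg in sorted(Path(buffer_dir).glob("*.mp4"),
                      key=lambda f: f.stat().st_mtime):
        target = staging / seg.name
        os.link(seg, target)  # same inode: survives unlink of the original
        locked.append(target)
    return locked
```

Hard links only work within one filesystem; if the staging directory lives on a different mount, fall back to `shutil.copy2`.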
    async def handle_event_detection(self, channel: int, event_time: datetime):
        """Trigger clip capture with pre/post-event padding."""
        pre_buffer_dir = f"/data/buffer/pre_event/ch{channel}"
        clip_id = f"evt_{channel}_{event_time.strftime('%Y%m%d_%H%M%S')}"
        # Collect pre-event segments (last 60 seconds), oldest first
        pre_segments = sorted(
            Path(pre_buffer_dir).glob("*.mp4"),
            key=lambda f: f.stat().st_mtime
        )
        # Start post-event recording
        post_event_path = f"/tmp/{clip_id}_post.mp4"
        post_task = asyncio.create_task(
            self._record_post_event(channel, post_event_path, duration=30)
        )
        await post_task
        # Concatenate: pre-event + post-event
        concat_list = f"/tmp/{clip_id}_concat.txt"
        with open(concat_list, 'w') as f:
            for seg in pre_segments:
                f.write(f"file '{seg.absolute()}'\n")
            f.write(f"file '{post_event_path}'\n")
        output_path = f"/data/buffer/alert_clips/{clip_id}.mp4"
        # Blocking call; consider asyncio.create_subprocess_exec in production
        subprocess.run([
            "ffmpeg", "-y", "-f", "concat", "-safe", "0",
            "-i", concat_list, "-c", "copy", output_path
        ], check=True)
        # Store in ring buffer
        file_size = os.path.getsize(output_path)
        checksum = await self._calculate_checksum(output_path)
        metadata = ClipMetadata(
            clip_id=clip_id,
            channel=channel,
            event_type="ai_detection",
            event_confidence=0.85,
            start_time=event_time - timedelta(seconds=60),
            event_time=event_time,
            end_time=event_time + timedelta(seconds=30),
            duration_seconds=90,
            file_path=output_path,
            file_size_bytes=file_size,
            checksum_sha256=checksum,
            upload_status="pending",
        )
        await self.ring_buffer.store_clip(output_path, metadata)
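`_calculate_checksum` is referenced above but not listed. A minimal sketch of what it is assumed to do, hashing in chunks inside a worker thread so large clips neither block the event loop nor load fully into memory:

```python
import asyncio
import hashlib

async def calculate_checksum(path: str, chunk_size: int = 1 << 20) -> str:
    """SHA-256 hex digest of a file, computed off the event loop."""
    def _digest() -> str:
        h = hashlib.sha256()
        with open(path, "rb") as f:
            # Read 1 MiB at a time until EOF
            for chunk in iter(lambda: f.read(chunk_size), b""):
                h.update(chunk)
        return h.hexdigest()
    return await asyncio.to_thread(_digest)
```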
8. Implementation Specifications
8.1 Python Project Structure
edge-gateway/
├── docker/
│ ├── Dockerfile.stream-manager
│ ├── Dockerfile.ffmpeg-worker
│ ├── Dockerfile.hls-segmenter
│ ├── Dockerfile.buffer-manager
│ └── Dockerfile.health-monitor
├── docker-compose.yml
├── requirements.txt
├── config/
│ ├── cameras.yaml
│ ├── gateway.yaml
│ └── logging.yaml
├── src/
│ ├── __init__.py
│ ├── main.py # Application entry point
│ ├── api/
│ │ ├── __init__.py
│ │ ├── server.py # FastAPI HTTP server
│ │ ├── routes/
│ │ │ ├── __init__.py
│ │ │ ├── cameras.py # Camera CRUD endpoints
│ │ │ ├── streams.py # Stream control endpoints
│ │ │ ├── hls.py # HLS playlist proxy
│ │ │ ├── status.py # Health & status
│ │ │ └── config.py # Configuration endpoints
│ │ └── middleware/
│ │ ├── auth.py # JWT authentication
│ │ ├── cors.py # CORS headers
│ │ └── logging.py # Request logging
│ ├── core/
│ │ ├── __init__.py
│ │ ├── config.py # Configuration loader
│ │ ├── events.py # Event bus (asyncio)
│ │ ├── logging_setup.py # Structured logging
│ │ └── exceptions.py # Custom exceptions
│ ├── streams/
│ │ ├── __init__.py
│ │ ├── manager.py # StreamManager (orchestrator)
│ │ ├── connector.py # StreamConnector (FFmpeg wrapper)
│ │ ├── registry.py # CameraRegistry
│ │ ├── circuit_breaker.py # CircuitBreaker
│ │ ├── status.py # CameraStatus model
│ │ └── ffmpeg_builder.py # FFmpeg command builder
│ ├── processing/
│ │ ├── __init__.py
│ │ ├── frame_extractor.py # FrameExtractor
│ │ ├── multi_extractor.py # MultiChannelExtractor
│ │ ├── adaptive_bitrate.py # AdaptiveBitrateController
│ │ └── motion_detector.py # Motion detection utilities
│ ├── hls/
│ │ ├── __init__.py
│ │ ├── segmenter.py # HLS segment generation
│ │ ├── playlist.py # Playlist manager
│ │ └── server.py # HLS HTTP server
│ ├── storage/
│ │ ├── __init__.py
│ │ ├── ring_buffer.py # RingBuffer
│ │ ├── upload_manager.py # UploadManager
│ │ ├── disk_manager.py # DiskManager
│ │ └── pre_event_buffer.py # Circular pre-event recording
│ ├── discovery/
│ │ ├── __init__.py
│ │ └── onvif_discovery.py # ONVIF camera discovery
│ ├── mqtt/
│ │ ├── __init__.py
│ │ └── client.py # MQTT publisher/subscriber
│ ├── health/
│ │ ├── __init__.py
│ │ ├── monitor.py # HealthMonitor & Watchdog
│ │ └── metrics.py # Prometheus metrics
│ └── utils/
│ ├── __init__.py
│ ├── async_utils.py # Async helpers
│ ├── validators.py # Input validation
│ └── time_utils.py # Timestamp utilities
├── tests/
│ ├── unit/
│ ├── integration/
│ └── fixtures/
├── scripts/
│ ├── start.sh
│ ├── health_check.sh
│ └── backup_config.sh
└── docs/
├── ARCHITECTURE.md
└── API.md
8.2 Core Classes — Full Specification
8.2.1 StreamManager (Main Orchestrator)
# src/streams/manager.py
import asyncio
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime

from .connector import StreamConnector, ReconnectConfig
from .registry import CameraRegistry
from .circuit_breaker import CircuitBreaker, CircuitBreakerConfig
from .status import CameraStatus
from .ffmpeg_builder import FFmpegCommandBuilder
from ..core.config import GatewayConfig
from ..health.metrics import MetricsCollector

logger = logging.getLogger(__name__)


@dataclass
class StreamManagerConfig:
    reconnect: ReconnectConfig
    circuit_breaker: CircuitBreakerConfig
    ffmpeg: FFmpegCommandBuilder
    max_concurrent_streams: int = 8


class StreamManager:
    """
    Central orchestrator for all camera streams.
    Manages lifecycle: discovery -> connection -> monitoring -> cleanup.
    """

    def __init__(self, config: StreamManagerConfig, registry: CameraRegistry,
                 metrics: MetricsCollector):
        self.cfg = config
        self.registry = registry
        self.metrics = metrics
        self.connectors: Dict[int, StreamConnector] = {}
        self.circuit_breakers: Dict[int, CircuitBreaker] = {}
        self._monitor_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self):
        """Start the stream manager and connect all enabled cameras."""
        self._running = True
        cameras = await self.registry.get_all_cameras()
        for camera in cameras:
            if camera.enabled:
                await self._start_camera_stream(camera.channel)
        self._monitor_task = asyncio.create_task(
            self._health_monitor_loop(),
            name="stream-health-monitor"
        )
        logger.info(f"StreamManager started: {len(self.connectors)} streams active")

    async def _start_camera_stream(self, channel: int):
        """Initialize and start a single camera stream."""
        camera = await self.registry.get_camera(channel)
        if not camera:
            logger.error(f"Camera {channel} not found in registry")
            return
        # Create circuit breaker
        cb = CircuitBreaker(f"ch{channel}", self.cfg.circuit_breaker)
        self.circuit_breakers[channel] = cb
        # Build FFmpeg command, honoring the camera's primary/secondary
        # preference (the status report below uses the same flag)
        rtsp_url = camera.get_rtsp_url(primary=camera.use_primary)
        ffmpeg_cmd = self.cfg.ffmpeg.build(
            rtsp_url=rtsp_url,
            hls_output=f"/data/hls/ch{channel}.m3u8",
            segment_pattern=f"/data/hls/ch{channel}_%03d.ts",
        )
        # Create and start connector
        connector = StreamConnector(
            channel=channel,
            config=self.cfg.reconnect,
            ffmpeg_cmd=ffmpeg_cmd,
        )
        self.connectors[channel] = connector
        await connector.start()
        logger.info(f"Started stream for channel {channel}: {camera.name}")

    async def stop_camera_stream(self, channel: int):
        """Gracefully stop a camera stream."""
        if channel in self.connectors:
            await self.connectors[channel].stop()
            del self.connectors[channel]
        if channel in self.circuit_breakers:
            del self.circuit_breakers[channel]
        logger.info(f"Stopped stream for channel {channel}")

    async def restart_camera_stream(self, channel: int):
        """Restart a specific camera stream."""
        await self.stop_camera_stream(channel)
        await asyncio.sleep(2)
        await self._start_camera_stream(channel)

    async def get_status(self, channel: Optional[int] = None) -> List[CameraStatus]:
        """Get status for one or all cameras."""
        if channel is not None:
            connector = self.connectors.get(channel)
            if connector:
                return [self._build_status(channel, connector)]
            return []
        return [
            self._build_status(ch, conn)
            for ch, conn in self.connectors.items()
        ]

    def _build_status(self, channel: int, connector: StreamConnector) -> CameraStatus:
        camera = self.registry.get_camera_sync(channel)
        cb = self.circuit_breakers.get(channel)
        return CameraStatus(
            channel=channel,
            name=camera.name if camera else f"CH{channel}",
            enabled=camera.enabled if camera else False,
            state=connector.state.value if hasattr(connector, 'state') else "unknown",
            stream_type="primary" if camera and camera.use_primary else "secondary",
            current_fps=getattr(connector, 'current_fps', 0),
            disconnect_count=getattr(connector, 'consecutive_failures', 0),
            circuit_breaker_state=cb.state.value if cb else "CLOSED",
        )

    async def _health_monitor_loop(self):
        """Periodic health check for all streams."""
        while self._running:
            try:
                for channel, connector in list(self.connectors.items()):
                    status = self._build_status(channel, connector)
                    # Record metrics
                    self.metrics.record_fps(channel, status.current_fps)
                    self.metrics.record_stream_state(channel, status.state)
                    # Check for anomalies
                    if status.state == "ONLINE" and status.current_fps < 5:
                        logger.warning(f"CH{channel}: Low FPS detected "
                                       f"({status.current_fps:.1f})")
            except Exception as e:
                logger.error(f"Health monitor error: {e}")
            await asyncio.sleep(30)

    async def stop(self):
        """Graceful shutdown of all streams."""
        self._running = False
        if self._monitor_task:
            self._monitor_task.cancel()
            try:
                await self._monitor_task
            except asyncio.CancelledError:
                pass
        # Stop all connectors concurrently
        await asyncio.gather(*[
            connector.stop()
            for connector in self.connectors.values()
        ])
        self.connectors.clear()
        self.circuit_breakers.clear()
        logger.info("StreamManager stopped")
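`CircuitBreaker` (src/streams/circuit_breaker.py) is consumed above, but its listing falls outside this section. The sketch below shows the conventional CLOSED -> OPEN -> HALF_OPEN state machine it is assumed to implement; the constructor signature here is simplified and does not match `CircuitBreakerConfig`:

```python
import time
from enum import Enum

class CBState(Enum):
    CLOSED = "CLOSED"        # normal operation
    OPEN = "OPEN"            # failing: reject attempts until cooldown expires
    HALF_OPEN = "HALF_OPEN"  # probing: allow a single attempt through

class CircuitBreaker:
    """Minimal illustrative breaker; not the project's actual class."""

    def __init__(self, name: str, failure_threshold: int = 5,
                 cooldown_seconds: float = 60.0):
        self.name = name
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.failures = 0
        self.opened_at = 0.0
        self.state = CBState.CLOSED

    def allow(self) -> bool:
        """May a connection attempt proceed right now?"""
        if self.state is CBState.OPEN:
            if time.monotonic() - self.opened_at >= self.cooldown_seconds:
                self.state = CBState.HALF_OPEN
                return True
            return False
        return True

    def record_success(self):
        self.failures = 0
        self.state = CBState.CLOSED

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.state = CBState.OPEN
            self.opened_at = time.monotonic()
```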
8.2.2 FFmpegCommandBuilder
# src/streams/ffmpeg_builder.py
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FFmpegPreset:
    """Predefined FFmpeg configuration preset."""
    name: str
    transport: str = "tcp"
    timeout_us: int = 5_000_000
    # Reconnect policy is consumed by the StreamConnector supervisor, not
    # passed to FFmpeg: the -reconnect* CLI options belong to FFmpeg's HTTP
    # protocol and are rejected for rtsp:// inputs.
    reconnect: bool = True
    reconnect_max_delay: int = 30
    flags: List[str] = field(default_factory=lambda: [
        "+genpts", "+discardcorrupt", "+low_delay"
    ])
    probesize: int = 5_000_000
    analyzeduration: int = 2_000_000


class FFmpegCommandBuilder:
    """
    Builds FFmpeg command lines for various stream operations.
    Supports presets for different use cases.
    """
    PRESETS = {
        "ingest": FFmpegPreset("ingest"),
        "hls": FFmpegPreset("hls", reconnect_max_delay=60),
        "ai_extraction": FFmpegPreset("ai_extraction", probesize=1_000_000),
        "recording": FFmpegPreset("recording", reconnect_max_delay=300),
    }

    def build(self, rtsp_url: str, hls_output: str, segment_pattern: str,
              preset: str = "hls", audio: bool = True,
              segment_duration: int = 2, playlist_size: int = 5) -> List[str]:
        """Build complete FFmpeg command for HLS output."""
        p = self.PRESETS.get(preset, self.PRESETS["hls"])
        cmd = ["ffmpeg", "-nostdin", "-y"]
        # Input options
        cmd.extend(["-rtsp_transport", p.transport])
        # RTSP socket timeout in microseconds (-stimeout was renamed to
        # -timeout in FFmpeg >= 5.0)
        cmd.extend(["-stimeout", str(p.timeout_us)])
        cmd.extend(["-fflags", "".join(p.flags)])
        cmd.extend(["-flags", "+low_delay"])
        cmd.extend(["-probesize", str(p.probesize)])
        cmd.extend(["-analyzeduration", str(p.analyzeduration)])
        # Input
        cmd.extend(["-i", rtsp_url])
        # Video codec (copy for efficiency)
        cmd.extend(["-c:v", "copy"])
        # Audio
        if audio:
            cmd.extend(["-c:a", "copy"])
        else:
            cmd.extend(["-an"])
        # HLS output
        cmd.extend(["-f", "hls"])
        cmd.extend(["-hls_time", str(segment_duration)])
        cmd.extend(["-hls_list_size", str(playlist_size)])
        cmd.extend(["-hls_flags", "delete_segments+omit_endlist+program_date_time"])
        cmd.extend(["-hls_segment_type", "mpegts"])
        cmd.extend(["-hls_segment_filename", segment_pattern])
        cmd.append(hls_output)
        return cmd

    def build_ai_extraction(self, rtsp_url: str, target_fps: float = 5.0,
                            resolution: tuple = (640, 480),
                            mode: str = "hybrid") -> List[str]:
        """Build FFmpeg command for AI frame extraction."""
        w, h = resolution
        filters = f"fps={target_fps},scale={w}:{h}"
        if mode == "keyframe_only":
            filters = f"select='eq(pict_type,I)',{filters}"
        elif mode == "hybrid":
            # I-frame priority with a fixed-interval fallback
            # (assumes a ~25 fps source)
            interval = int(25 / target_fps)
            filters = f"select='eq(pict_type,I)+gte(n,prev_selected_n+{interval})',{filters}"
        # Reconnection is handled by the StreamConnector supervisor
        return [
            "ffmpeg", "-nostdin", "-y",
            "-rtsp_transport", "tcp",
            "-stimeout", "5000000",
            "-fflags", "+genpts+discardcorrupt",
            "-flags", "+low_delay",
            "-i", rtsp_url,
            "-vf", filters,
            "-an",  # No audio for AI
            "-f", "image2pipe",
            "-vcodec", "mjpeg",
            "-q:v", "3",
            "-",
        ]

    def build_recording(self, rtsp_url: str, output_pattern: str,
                        segment_seconds: int = 60) -> List[str]:
        """Build FFmpeg command for segmented recording."""
        # Reconnection is handled by the StreamConnector supervisor
        return [
            "ffmpeg", "-nostdin", "-y",
            "-rtsp_transport", "tcp",
            "-stimeout", "5000000",
            "-fflags", "+genpts+discardcorrupt",
            "-flags", "+low_delay",
            "-i", rtsp_url,
            "-c:v", "copy",
            "-c:a", "copy",
            "-f", "segment",
            "-segment_time", str(segment_seconds),
            "-segment_format", "mp4",
            "-reset_timestamps", "1",
            "-strftime", "1",
            output_pattern,
        ]

    def build_webrtc_bridge(self, rtsp_url: str,
                            output_resolution: tuple = (640, 480)) -> List[str]:
        """Build FFmpeg command for WebRTC bridge (raw video output)."""
        w, h = output_resolution
        # Reconnection is handled by the StreamConnector supervisor
        return [
            "ffmpeg", "-nostdin", "-y",
            "-rtsp_transport", "tcp",
            "-stimeout", "5000000",
            "-fflags", "+genpts+discardcorrupt",
            "-flags", "+low_delay",
            "-i", rtsp_url,
            "-vf", f"scale={w}:{h},format=yuv420p",
            "-c:v", "rawvideo",
            "-pix_fmt", "yuv420p",
            "-f", "rawvideo",
            "-",
        ]
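The `-vf` expression assembled by `build_ai_extraction` can be exercised standalone. This helper mirrors that construction (including its ~25 fps source assumption, exposed here as a parameter):

```python
def build_select_filter(mode: str, target_fps: float,
                        width: int = 640, height: int = 480,
                        source_fps: float = 25.0) -> str:
    """Mirror of the -vf construction in build_ai_extraction."""
    filters = f"fps={target_fps},scale={width}:{height}"
    if mode == "keyframe_only":
        # Pass only I-frames, then resample and scale
        filters = f"select='eq(pict_type,I)',{filters}"
    elif mode == "hybrid":
        # I-frames, plus a forced pick every `interval` frames as fallback
        interval = int(source_fps / target_fps)
        filters = (f"select='eq(pict_type,I)"
                   f"+gte(n,prev_selected_n+{interval})',{filters}")
    return filters

print(build_select_filter("hybrid", 5.0))
# select='eq(pict_type,I)+gte(n,prev_selected_n+5)',fps=5.0,scale=640:480
```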
8.2.3 CameraRegistry
# src/streams/registry.py
import yaml
import asyncio
import logging
from typing import List, Optional, Dict
from dataclasses import dataclass, field
from pathlib import Path

import aiofiles

logger = logging.getLogger(__name__)


@dataclass
class CameraConfig:
    channel: int
    name: str
    enabled: bool = True
    primary_stream_url: str = ""
    secondary_stream_url: str = ""
    use_primary: bool = False
    ai_enabled: bool = True
    ai_fps: float = 5.0
    ai_resolution: str = "640x480"
    ai_mode: str = "hybrid"
    recording_enabled: bool = True
    pre_event_seconds: int = 10
    post_event_seconds: int = 30
    tags: Dict[str, str] = field(default_factory=dict)

    def get_rtsp_url(self, primary: bool = False) -> str:
        if primary and self.primary_stream_url:
            return self.primary_stream_url
        return self.secondary_stream_url or self.primary_stream_url


class CameraRegistry:
    """
    Manages camera configuration from YAML file.
    Supports hot-reloading and ONVIF discovery integration.
    """

    def __init__(self, config_path: str):
        self.config_path = Path(config_path)
        self._cameras: Dict[int, CameraConfig] = {}
        self._lock = asyncio.Lock()

    async def load(self):
        """Load camera configuration from YAML."""
        async with self._lock:
            if not self.config_path.exists():
                logger.warning(f"Config file not found: {self.config_path}")
                return
            async with aiofiles.open(self.config_path, 'r') as f:
                content = await f.read()
            data = yaml.safe_load(content)
            self._cameras = {}
            for cam_data in data.get('cameras', []):
                ai_cfg = cam_data.get('ai_config', {})
                rec_cfg = cam_data.get('recording_config', {})
                cam = CameraConfig(
                    channel=cam_data['channel'],
                    name=cam_data['name'],
                    enabled=cam_data.get('enabled', True),
                    primary_stream_url=cam_data.get('primary_stream', {}).get('url', ''),
                    secondary_stream_url=cam_data.get('secondary_stream', {}).get('url', ''),
                    ai_enabled=ai_cfg.get('enabled', True),
                    ai_fps=ai_cfg.get('inference_fps', 5.0),
                    ai_resolution=ai_cfg.get('resolution', '640x480'),
                    ai_mode=ai_cfg.get('mode', 'hybrid'),
                    recording_enabled=rec_cfg.get('enabled', True),
                    pre_event_seconds=rec_cfg.get('pre_event_seconds', 10),
                    post_event_seconds=rec_cfg.get('post_event_seconds', 30),
                    tags=cam_data.get('tags', {}),
                )
                self._cameras[cam.channel] = cam
            logger.info(f"Loaded {len(self._cameras)} cameras from config")

    async def save(self):
        """Persist current configuration to YAML."""
        async with self._lock:
            await self._save_unlocked()

    async def _save_unlocked(self):
        """Write the config to disk; the caller must already hold self._lock.
        (asyncio.Lock is not reentrant, so mutators call this directly
        instead of save() to avoid deadlocking on their own lock.)"""
        data = {
            'cameras': [],
            'settings': {
                'default_ai_fps': 5.0,
                'default_inference_resolution': '640x480',
            }
        }
        for cam in self._cameras.values():
            data['cameras'].append({
                'channel': cam.channel,
                'name': cam.name,
                'enabled': cam.enabled,
                'primary_stream': {'url': cam.primary_stream_url},
                'secondary_stream': {'url': cam.secondary_stream_url},
                'ai_config': {
                    'enabled': cam.ai_enabled,
                    'inference_fps': cam.ai_fps,
                    'resolution': cam.ai_resolution,
                    'mode': cam.ai_mode,
                },
                'recording_config': {
                    'enabled': cam.recording_enabled,
                    'pre_event_seconds': cam.pre_event_seconds,
                    'post_event_seconds': cam.post_event_seconds,
                },
                'tags': cam.tags,
            })
        async with aiofiles.open(self.config_path, 'w') as f:
            await f.write(yaml.dump(data, default_flow_style=False, sort_keys=False))

    async def get_all_cameras(self) -> List[CameraConfig]:
        return list(self._cameras.values())

    async def get_camera(self, channel: int) -> Optional[CameraConfig]:
        return self._cameras.get(channel)

    def get_camera_sync(self, channel: int) -> Optional[CameraConfig]:
        return self._cameras.get(channel)

    async def update_camera(self, channel: int, updates: dict):
        async with self._lock:
            if channel not in self._cameras:
                raise ValueError(f"Camera {channel} not found")
            cam = self._cameras[channel]
            for key, value in updates.items():
                if hasattr(cam, key):
                    setattr(cam, key, value)
            await self._save_unlocked()

    async def add_camera(self, camera: CameraConfig):
        async with self._lock:
            if camera.channel in self._cameras:
                raise ValueError(f"Camera {camera.channel} already exists")
            self._cameras[camera.channel] = camera
            await self._save_unlocked()

    async def remove_camera(self, channel: int):
        async with self._lock:
            if channel in self._cameras:
                del self._cameras[channel]
                await self._save_unlocked()
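For reference, a `config/cameras.yaml` fragment matching the keys that `load()` reads and `save()` writes; the camera name, tags, and credentials are illustrative:

```yaml
# config/cameras.yaml — shape inferred from CameraRegistry.load()
cameras:
  - channel: 1
    name: "Loading Dock"
    enabled: true
    primary_stream:
      url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0"
    secondary_stream:
      url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1"
    ai_config:
      enabled: true
      inference_fps: 5.0
      resolution: "640x480"
      mode: "hybrid"
    recording_config:
      enabled: true
      pre_event_seconds: 10
      post_event_seconds: 30
    tags:
      zone: "exterior"
settings:
  default_ai_fps: 5.0
  default_inference_resolution: "640x480"
```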
8.2.4 HealthMonitor & Watchdog
# src/health/monitor.py
import asyncio
import logging
from datetime import datetime
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)


@dataclass
class HealthReport:
    timestamp: datetime
    gateway_id: str
    overall_status: str  # healthy, degraded, critical
    dvr_reachable: bool
    dvr_latency_ms: float
    streams_online: int
    streams_total: int
    vpn_tunnel_up: bool
    vpn_latency_ms: float
    disk_usage_percent: float
    memory_usage_percent: float
    cpu_usage_percent: float
    issues: List[str] = field(default_factory=list)
    stream_details: Dict[int, dict] = field(default_factory=dict)


class HealthMonitor:
    """
    Comprehensive health monitoring for the edge gateway.
    Checks DVR connectivity, stream health, VPN tunnel, and system resources.
    """

    def __init__(self, gateway_id: str, dvr_host: str, vpn_endpoint: str,
                 check_interval: float = 30.0):
        self.gateway_id = gateway_id
        self.dvr_host = dvr_host
        self.vpn_endpoint = vpn_endpoint
        self.check_interval = check_interval
        self._callbacks: List[Callable[[HealthReport], None]] = []
        self._running = False
        self._task: Optional[asyncio.Task] = None

    def on_health_change(self, callback: Callable[[HealthReport], None]):
        self._callbacks.append(callback)

    async def start(self):
        self._running = True
        self._task = asyncio.create_task(self._monitor_loop(), name="health-monitor")

    async def _monitor_loop(self):
        while self._running:
            try:
                report = await self._run_checks()
                for cb in self._callbacks:
                    try:
                        if asyncio.iscoroutinefunction(cb):
                            await cb(report)
                        else:
                            cb(report)
                    except Exception as e:
                        logger.error(f"Health callback error: {e}")
            except Exception as e:
                logger.error(f"Health check error: {e}")
            await asyncio.sleep(self.check_interval)

    async def _run_checks(self) -> HealthReport:
        timestamp = datetime.utcnow()
        issues = []
        # Check DVR reachability
        dvr_reachable, dvr_latency = await self._ping_host(self.dvr_host)
        if not dvr_reachable:
            issues.append(f"DVR unreachable: {self.dvr_host}")
        # Check VPN tunnel
        vpn_up, vpn_latency = await self._ping_host(self.vpn_endpoint)
        if not vpn_up:
            issues.append(f"VPN tunnel down: {self.vpn_endpoint}")
        # Check system resources
        disk_pct = await self._get_disk_usage()
        memory_pct = await self._get_memory_usage()
        cpu_pct = await self._get_cpu_usage()
        if disk_pct > 90:
            issues.append(f"Disk critical: {disk_pct:.1f}%")
        elif disk_pct > 80:
            issues.append(f"Disk warning: {disk_pct:.1f}%")
        if memory_pct > 90:
            issues.append(f"Memory critical: {memory_pct:.1f}%")
        # Determine overall status
        if len(issues) == 0:
            status = "healthy"
        elif any("critical" in i for i in issues):
            status = "critical"
        else:
            status = "degraded"
        return HealthReport(
            timestamp=timestamp,
            gateway_id=self.gateway_id,
            overall_status=status,
            dvr_reachable=dvr_reachable,
            dvr_latency_ms=dvr_latency,
            streams_online=0,  # Filled by StreamManager
            streams_total=8,
            vpn_tunnel_up=vpn_up,
            vpn_latency_ms=vpn_latency,
            disk_usage_percent=disk_pct,
            memory_usage_percent=memory_pct,
            cpu_usage_percent=cpu_pct,
            issues=issues,
        )

    async def _ping_host(self, host: str, count: int = 3) -> tuple:
        """Ping a host and return (reachable, avg_latency_ms)."""
        try:
            proc = await asyncio.create_subprocess_exec(
                "ping", "-c", str(count), "-W", "2", host,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
            output = stdout.decode()
            # Rely on the exit code: a substring test for "0% packet loss"
            # also matches "100% packet loss"
            if proc.returncode == 0:
                try:
                    # e.g. "rtt min/avg/max/mdev = 0.318/0.383/0.449/0.053 ms"
                    avg_line = [l for l in output.split('\n') if 'avg' in l][-1]
                    avg_ms = float(avg_line.split('/')[-3])
                    return True, avg_ms
                except (IndexError, ValueError):
                    return True, 0.0
            return False, -1.0
        except Exception:
            return False, -1.0

    async def _get_disk_usage(self) -> float:
        try:
            proc = await asyncio.create_subprocess_exec(
                "df", "/data", "--output=pcent",
                stdout=asyncio.subprocess.PIPE,
            )
            stdout, _ = await proc.communicate()
            lines = stdout.decode().strip().split('\n')
            if len(lines) >= 2:
                return float(lines[1].replace('%', ''))
        except Exception:
            pass
        return 0.0

    async def _get_memory_usage(self) -> float:
        # create_subprocess_exec does not spawn a shell, so a pipeline like
        # "free | grep Mem" cannot work; parse the full output in Python
        try:
            proc = await asyncio.create_subprocess_exec(
                "free",
                stdout=asyncio.subprocess.PIPE,
            )
            stdout, _ = await proc.communicate()
            for line in stdout.decode().splitlines():
                if line.startswith("Mem:"):
                    # Mem: total used free shared buff/cache available
                    parts = line.split()
                    total = float(parts[1])
                    used = float(parts[2])
                    return (used / total) * 100
        except Exception:
            pass
        return 0.0

    async def _get_cpu_usage(self) -> float:
        # Same constraint as above: no shell pipeline, so filter in Python
        try:
            proc = await asyncio.create_subprocess_exec(
                "top", "-bn1",
                stdout=asyncio.subprocess.PIPE,
            )
            stdout, _ = await proc.communicate()
            for line in stdout.decode().splitlines():
                # %Cpu(s): 10.5 us, 5.2 sy, ...
                if "Cpu(s)" in line and "us" in line:
                    us = float(line.split('us')[0].split()[-1])
                    sy = float(line.split('sy')[0].split()[-1])
                    return us + sy
        except Exception:
            pass
        return 0.0

    async def stop(self):
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
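`_ping_host` pulls the average rtt out of ping's summary line. The parsing can be exercised standalone; this is a slightly more defensive variant of the same idea (the sample line mimics iputils ping output):

```python
def parse_ping_avg(output: str) -> float:
    """Extract the average rtt (ms) from ping's summary line, or -1.0."""
    for line in output.splitlines():
        if "avg" in line and "/" in line:
            # e.g. "rtt min/avg/max/mdev = 0.318/0.383/0.449/0.053 ms"
            try:
                return float(line.split("=")[1].split("/")[1])
            except (IndexError, ValueError):
                pass
    return -1.0

sample = "rtt min/avg/max/mdev = 0.318/0.383/0.449/0.053 ms"
print(parse_ping_avg(sample))  # 0.383
```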
8.2.5 FastAPI Server (Diagnostic UI & API)
# src/api/server.py
from fastapi import FastAPI, HTTPException, Depends, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, FileResponse
import asyncio
import logging
from typing import List, Optional
from ..streams.manager import StreamManager
from ..streams.status import CameraStatus
from ..health.monitor import HealthMonitor, HealthReport
from ..core.config import GatewayConfig
logger = logging.getLogger(__name__)
# HTML for minimal diagnostic UI
DIAGNOSTIC_HTML = """
<!DOCTYPE html>
<html>
<head>
<title>Edge Gateway Diagnostics</title>
<style>
body { font-family: monospace; background: #1a1a2e; color: #eee; padding: 20px; }
h1 { color: #00d4ff; }
.status-healthy { color: #00ff88; }
.status-degraded { color: #ffaa00; }
.status-critical { color: #ff4444; }
.status-offline { color: #888; }
table { border-collapse: collapse; width: 100%; margin-top: 20px; }
th, td { border: 1px solid #333; padding: 8px; text-align: left; }
th { background: #16213e; }
tr:nth-child(even) { background: #0f3460; }
.metric { display: inline-block; margin: 10px 20px 10px 0; }
.metric-label { color: #888; font-size: 12px; }
.metric-value { font-size: 24px; font-weight: bold; }
#log { background: #0a0a1a; padding: 10px; height: 200px; overflow-y: auto;
font-size: 11px; margin-top: 20px; border: 1px solid #333; }
</style>
</head>
<body>
<h1>Edge Gateway Diagnostics — {{gateway_id}}</h1>
<div>
<div class="metric">
<div class="metric-label">Status</div>
<div class="metric-value status-{{health_status}}">{{health_status.upper()}}</div>
</div>
<div class="metric">
<div class="metric-label">Streams Online</div>
<div class="metric-value">{{streams_online}}/{{streams_total}}</div>
</div>
<div class="metric">
<div class="metric-label">DVR</div>
<div class="metric-value">{{dvr_status}}</div>
</div>
<div class="metric">
<div class="metric-label">VPN</div>
<div class="metric-value">{{vpn_status}}</div>
</div>
<div class="metric">
<div class="metric-label">Disk</div>
<div class="metric-value">{{disk_usage}}%</div>
</div>
</div>
<h2>Camera Streams</h2>
<table>
<tr>
<th>CH</th><th>Name</th><th>State</th><th>FPS</th>
<th>Bitrate</th><th>Resolution</th><th>Uptime</th><th>Circuit</th>
</tr>
{{#cameras}}
<tr>
<td>{{channel}}</td>
<td>{{name}}</td>
<td class="status-{{state}}">{{state}}</td>
<td>{{fps}}</td>
<td>{{bitrate}}</td>
<td>{{resolution}}</td>
<td>{{uptime}}</td>
<td>{{circuit_breaker}}</td>
</tr>
{{/cameras}}
</table>
<h2>System Log</h2>
<div id="log">{{system_log}}</div>
<script>
setTimeout(() => location.reload(), 5000);
</script>
</body>
</html>
"""
def create_app(stream_manager: StreamManager, health_monitor: HealthMonitor,
config: GatewayConfig) -> FastAPI:
app = FastAPI(title="Edge Gateway API", version="1.0.0")
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["https://dashboard.example.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/", response_class=HTMLResponse)
async def diagnostic_ui():
"""Minimal diagnostic HTML UI."""
status = await stream_manager.get_status()
health = await health_monitor._run_checks()
# Simple template rendering (in production use Jinja2)
cameras_html = ""
for cam in status:
cameras_html += f"""
<tr>
<td>{cam.channel}</td>
<td>{cam.name}</td>
<td class="status-{cam.state}">{cam.state}</td>
<td>{cam.current_fps:.1f}</td>
<td>{cam.current_bitrate_kbps:.0f}</td>
<td>{cam.resolution}</td>
<td>{cam.uptime_seconds:.0f}s</td>
<td>{cam.circuit_breaker_state}</td>
</tr>"""
html = DIAGNOSTIC_HTML
html = html.replace("{{gateway_id}}", config.gateway_id)
html = html.replace("{{health_status}}", health.overall_status)
html = html.replace("{{streams_online}}", str(health.streams_online))
html = html.replace("{{streams_total}}", str(health.streams_total))
html = html.replace("{{dvr_status}}", "ONLINE" if health.dvr_reachable else "OFFLINE")
html = html.replace("{{vpn_status}}", "UP" if health.vpn_tunnel_up else "DOWN")
html = html.replace("{{disk_usage}}", f"{health.disk_usage_percent:.1f}")
html = html.replace("{{#cameras}}", "")
html = html.replace("{{/cameras}}", "")
# Insert camera rows between table rows
html_parts = html.split("<tr>\n <th>CH</th>")
if len(html_parts) == 2:
html = html_parts[0] + cameras_html + "<tr>\n <th>CH</th>" + html_parts[1]
return HTMLResponse(content=html)
@app.get("/api/v1/status")
async def get_status():
"""Get overall gateway status."""
streams = await stream_manager.get_status()
health = await health_monitor._run_checks()
return {
"gateway_id": config.gateway_id,
"timestamp": datetime.utcnow().isoformat(),
"health": health.overall_status,
"dvr": {"reachable": health.dvr_reachable, "latency_ms": health.dvr_latency_ms},
"vpn": {"up": health.vpn_tunnel_up, "latency_ms": health.vpn_latency_ms},
"system": {
"disk_percent": health.disk_usage_percent,
"memory_percent": health.memory_usage_percent,
"cpu_percent": health.cpu_usage_percent,
},
"streams": [s.to_dict() for s in streams],
}
@app.get("/api/v1/cameras")
async def list_cameras():
"""List all configured cameras."""
return await stream_manager.registry.get_all_cameras()
@app.get("/api/v1/cameras/{channel}")
async def get_camera(channel: int):
camera = await stream_manager.registry.get_camera(channel)
if not camera:
raise HTTPException(status_code=404, detail=f"Camera {channel} not found")
return camera
@app.post("/api/v1/cameras/{channel}/restart")
async def restart_camera_stream(channel: int):
"""Restart a specific camera stream."""
await stream_manager.restart_camera_stream(channel)
return {"status": "restarted", "channel": channel}
@app.get("/api/v1/streams/{channel}/status")
async def get_stream_status(channel: int):
"""Get detailed stream status."""
status = await stream_manager.get_status(channel)
if not status:
raise HTTPException(status_code=404, detail=f"Stream {channel} not found")
return status[0].to_dict()
@app.get("/api/v1/hls/{channel}.m3u8")
async def get_hls_playlist(channel: int):
"""Proxy HLS playlist for a channel."""
playlist_path = f"/data/hls/ch{channel}.m3u8"
if not os.path.exists(playlist_path):
raise HTTPException(status_code=404, detail="Playlist not available")
return FileResponse(playlist_path, media_type="application/vnd.apple.mpegurl")
@app.get("/api/v1/hls/{segment}")
async def get_hls_segment(segment: str):
"""Serve HLS segment file (basename only, to prevent path traversal)."""
segment_path = os.path.join("/data/hls", os.path.basename(segment))
if not os.path.exists(segment_path):
raise HTTPException(status_code=404, detail="Segment not found")
return FileResponse(segment_path, media_type="video/mp2t")
@app.websocket("/ws/live")
async def websocket_status(websocket: WebSocket):
"""WebSocket for real-time status updates."""
await websocket.accept()
try:
while True:
streams = await stream_manager.get_status()
health = await health_monitor._run_checks()
await websocket.send_json({
"timestamp": datetime.utcnow().isoformat(),
"health": health.overall_status,
"streams": [s.to_dict() for s in streams],
})
await asyncio.sleep(5)
except Exception:
await websocket.close()
return app
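The endpoints above lend themselves to scripted maintenance. Below is a minimal sketch of a client that restarts unhealthy streams through the restart endpoint; the `consecutive_errors` field of each stream dict and the gateway base URL are assumptions for illustration, not guaranteed by the API above:

```python
from typing import List

def channels_to_restart(status: dict, max_errors: int = 3) -> List[int]:
    """Pick channels whose stream reports too many consecutive errors.
    ("consecutive_errors" is an assumed per-stream field.)"""
    return [
        s["channel"]
        for s in status.get("streams", [])
        if s.get("consecutive_errors", 0) >= max_errors
    ]

def restart_unhealthy(base_url: str = "http://192.168.29.10:8080/api/v1") -> List[int]:
    """Query /status and POST a restart for each unhealthy channel."""
    import httpx  # pinned in requirements.txt
    status = httpx.get(f"{base_url}/status", timeout=5).json()
    bad = channels_to_restart(status)
    for ch in bad:
        httpx.post(f"{base_url}/cameras/{ch}/restart", timeout=5)
    return bad
```

Run from cron or a systemd timer on the gateway host as a cheap watchdog alongside the container healthchecks.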
8.3 Docker Compose Configuration
# docker-compose.yml — Edge Gateway Full Stack
version: "3.8"
services:
# ============================================
# Core: Stream Manager (API + Orchestration)
# ============================================
stream-manager:
build:
context: .
dockerfile: docker/Dockerfile.stream-manager
container_name: stream-manager
hostname: stream-manager
restart: unless-stopped
ports:
- "8080:8080" # API + Diagnostic UI
volumes:
- /data/config:/app/config:ro
- /data/logs:/app/logs
- /data/hls:/data/hls:ro # Read HLS segments
- /data/buffer:/data/buffer # Access buffer
- /var/run/docker.sock:/var/run/docker.sock:ro # Container mgmt
environment:
- GATEWAY_ID=${GATEWAY_ID:-edge-ind-01}
- DVR_HOST=${DVR_HOST:-192.168.29.200}
- DVR_RTSP_PORT=${DVR_RTSP_PORT:-554}
- DVR_USERNAME=${DVR_USERNAME:-admin}
- DVR_PASSWORD=${DVR_PASSWORD}
- LOG_LEVEL=${LOG_LEVEL:-INFO}
- STREAM_TRANSPORT=${STREAM_TRANSPORT:-tcp}
- HEALTH_CHECK_INTERVAL=${HEALTH_CHECK_INTERVAL:-30}
- MQTT_BROKER_HOST=mqtt-broker
- MQTT_BROKER_PORT=1883
- HLS_OUTPUT_DIR=/data/hls
- BUFFER_DIR=/data/buffer
networks:
- edge-internal
cpus: "2.0"
mem_limit: 512m
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "5"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/api/v1/status"]
interval: 30s
timeout: 5s
retries: 3
start_period: 30s
depends_on:
- mqtt-broker
# ============================================
# FFmpeg Workers (CH1-CH4 — Primary Streams)
# ============================================
ffmpeg-worker-1:
image: jrottenberg/ffmpeg:6.1-ubuntu2204
container_name: ffmpeg-worker-1
restart: unless-stopped
command: >
ffmpeg -nostdin -y
-rtsp_transport tcp -stimeout 5000000
-fflags +genpts+discardcorrupt -flags +low_delay
-probesize 5000000 -analyzeduration 2000000
-i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0"
-c:v copy -c:a copy
-f hls -hls_time 2 -hls_list_size 5
-hls_flags delete_segments+omit_endlist+program_date_time
-hls_segment_filename "/data/hls/ch1_%03d.ts"
"/data/hls/ch1.m3u8"
-c:v copy -c:a copy
-f segment -segment_time 60 -segment_format mp4
-reset_timestamps 1 -strftime 1
"/data/buffer/continuous/ch1_%Y%m%d_%H%M%S.mp4"
volumes:
- /data/hls:/data/hls
- /data/buffer/continuous:/data/buffer/continuous
environment:
- DVR_PASSWORD=${DVR_PASSWORD}
networks:
- edge-internal
cpus: "2.0"
mem_limit: 256m
logging:
driver: "json-file"
options:
max-size: "5m"
max-file: "3"
deploy:
restart_policy:
condition: on-failure
delay: 5s
max_attempts: 10
ffmpeg-worker-2:
image: jrottenberg/ffmpeg:6.1-ubuntu2204
container_name: ffmpeg-worker-2
restart: unless-stopped
command: >
ffmpeg -nostdin -y
-rtsp_transport tcp -stimeout 5000000
-fflags +genpts+discardcorrupt -flags +low_delay
-probesize 5000000 -analyzeduration 2000000
-i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=2&subtype=0"
-c:v copy -c:a copy
-f hls -hls_time 2 -hls_list_size 5
-hls_flags delete_segments+omit_endlist+program_date_time
-hls_segment_filename "/data/hls/ch2_%03d.ts"
"/data/hls/ch2.m3u8"
-c:v copy -c:a copy
-f segment -segment_time 60 -segment_format mp4
-reset_timestamps 1 -strftime 1
"/data/buffer/continuous/ch2_%Y%m%d_%H%M%S.mp4"
volumes:
- /data/hls:/data/hls
- /data/buffer/continuous:/data/buffer/continuous
environment:
- DVR_PASSWORD=${DVR_PASSWORD}
networks:
- edge-internal
cpus: "2.0"
mem_limit: 256m
logging:
driver: "json-file"
options:
max-size: "5m"
max-file: "3"
deploy:
restart_policy:
condition: on-failure
delay: 5s
max_attempts: 10
ffmpeg-worker-3:
image: jrottenberg/ffmpeg:6.1-ubuntu2204
container_name: ffmpeg-worker-3
restart: unless-stopped
command: >
ffmpeg -nostdin -y
-rtsp_transport tcp -stimeout 5000000
-fflags +genpts+discardcorrupt -flags +low_delay
-probesize 5000000 -analyzeduration 2000000
-i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=3&subtype=0"
-c:v copy -c:a copy
-f hls -hls_time 2 -hls_list_size 5
-hls_flags delete_segments+omit_endlist+program_date_time
-hls_segment_filename "/data/hls/ch3_%03d.ts"
"/data/hls/ch3.m3u8"
volumes:
- /data/hls:/data/hls
environment:
- DVR_PASSWORD=${DVR_PASSWORD}
networks:
- edge-internal
cpus: "2.0"
mem_limit: 256m
ffmpeg-worker-4:
image: jrottenberg/ffmpeg:6.1-ubuntu2204
container_name: ffmpeg-worker-4
restart: unless-stopped
command: >
ffmpeg -nostdin -y
-rtsp_transport tcp -stimeout 5000000
-fflags +genpts+discardcorrupt -flags +low_delay
-probesize 5000000 -analyzeduration 2000000
-i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=4&subtype=0"
-c:v copy -c:a copy
-f hls -hls_time 2 -hls_list_size 5
-hls_flags delete_segments+omit_endlist+program_date_time
-hls_segment_filename "/data/hls/ch4_%03d.ts"
"/data/hls/ch4.m3u8"
volumes:
- /data/hls:/data/hls
environment:
- DVR_PASSWORD=${DVR_PASSWORD}
networks:
- edge-internal
cpus: "2.0"
mem_limit: 256m
# ============================================
# FFmpeg Workers (CH5-CH8 — Secondary Streams)
# ============================================
ffmpeg-worker-5:
image: jrottenberg/ffmpeg:6.1-ubuntu2204
container_name: ffmpeg-worker-5
restart: unless-stopped
command: >
ffmpeg -nostdin -y
-rtsp_transport tcp -stimeout 5000000
-fflags +genpts+discardcorrupt -flags +low_delay
-i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=5&subtype=1"
-c:v copy -c:a copy
-f hls -hls_time 2 -hls_list_size 5
-hls_flags delete_segments+omit_endlist+program_date_time
-hls_segment_filename "/data/hls/ch5_%03d.ts"
"/data/hls/ch5.m3u8"
volumes:
- /data/hls:/data/hls
networks:
- edge-internal
cpus: "1.0"
mem_limit: 128m
ffmpeg-worker-6:
image: jrottenberg/ffmpeg:6.1-ubuntu2204
container_name: ffmpeg-worker-6
restart: unless-stopped
command: >
ffmpeg -nostdin -y
-rtsp_transport tcp -stimeout 5000000
-fflags +genpts+discardcorrupt -flags +low_delay
-i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=6&subtype=1"
-c:v copy -c:a copy
-f hls -hls_time 2 -hls_list_size 5
-hls_flags delete_segments+omit_endlist+program_date_time
-hls_segment_filename "/data/hls/ch6_%03d.ts"
"/data/hls/ch6.m3u8"
volumes:
- /data/hls:/data/hls
networks:
- edge-internal
cpus: "1.0"
mem_limit: 128m
ffmpeg-worker-7:
image: jrottenberg/ffmpeg:6.1-ubuntu2204
container_name: ffmpeg-worker-7
restart: unless-stopped
command: >
ffmpeg -nostdin -y
-rtsp_transport tcp -stimeout 5000000
-fflags +genpts+discardcorrupt -flags +low_delay
-i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=7&subtype=1"
-c:v copy -c:a copy
-f hls -hls_time 2 -hls_list_size 5
-hls_flags delete_segments+omit_endlist+program_date_time
-hls_segment_filename "/data/hls/ch7_%03d.ts"
"/data/hls/ch7.m3u8"
volumes:
- /data/hls:/data/hls
networks:
- edge-internal
cpus: "1.0"
mem_limit: 128m
ffmpeg-worker-8:
image: jrottenberg/ffmpeg:6.1-ubuntu2204
container_name: ffmpeg-worker-8
restart: unless-stopped
command: >
ffmpeg -nostdin -y
-rtsp_transport tcp -stimeout 5000000
-fflags +genpts+discardcorrupt -flags +low_delay
-i "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor?channel=8&subtype=1"
-c:v copy -c:a copy
-f hls -hls_time 2 -hls_list_size 5
-hls_flags delete_segments+omit_endlist+program_date_time
-hls_segment_filename "/data/hls/ch8_%03d.ts"
"/data/hls/ch8.m3u8"
volumes:
- /data/hls:/data/hls
networks:
- edge-internal
cpus: "1.0"
mem_limit: 128m
# ============================================
# AI Frame Extractor
# ============================================
frame-extractor:
build:
context: .
dockerfile: docker/Dockerfile.frame-extractor
container_name: frame-extractor
restart: unless-stopped
volumes:
- /data/config:/app/config:ro
- /data/logs:/app/logs
environment:
- DVR_HOST=192.168.29.200
- DVR_USERNAME=${DVR_USERNAME:-admin}
- DVR_PASSWORD=${DVR_PASSWORD}
- AI_INFERENCE_FPS=${AI_INFERENCE_FPS:-5}
- AI_KEYFRAME_ONLY=${AI_KEYFRAME_ONLY:-true}
- AI_RESOLUTION=${AI_RESOLUTION:-640x480}
- MQTT_BROKER_HOST=mqtt-broker
- MQTT_BROKER_PORT=1883
- MQTT_TOPIC_PREFIX=ai/frames
- LOG_LEVEL=${LOG_LEVEL:-INFO}
networks:
- edge-internal
cpus: "2.0"
mem_limit: 512m
depends_on:
- mqtt-broker
# ============================================
# Buffer Manager (Upload Queue)
# ============================================
buffer-manager:
build:
context: .
dockerfile: docker/Dockerfile.buffer-manager
container_name: buffer-manager
restart: unless-stopped
volumes:
- /data/buffer:/data/buffer
- /data/config:/app/config:ro
- /data/logs:/app/logs
environment:
- BUFFER_MAX_SIZE_GB=${BUFFER_MAX_SIZE_GB:-20}
- BUFFER_RETENTION_HOURS=${BUFFER_RETENTION_HOURS:-72}
- UPLOAD_ENDPOINT=${UPLOAD_ENDPOINT:-https://10.100.0.1/api/upload}
- UPLOAD_MAX_RETRIES=${UPLOAD_MAX_RETRIES:-5}
- UPLOAD_RETRY_BASE_DELAY=${UPLOAD_RETRY_BASE_DELAY:-5}
- UPLOAD_CHUNK_SIZE_MB=${UPLOAD_CHUNK_SIZE_MB:-10}
- VPN_ENABLED=${VPN_ENABLED:-true}
- LOG_LEVEL=${LOG_LEVEL:-INFO}
networks:
- edge-internal
- vpn-net
cpus: "1.0"
mem_limit: 256m
depends_on:
- stream-manager
# ============================================
# MQTT Broker (Internal Messaging)
# ============================================
mqtt-broker:
image: eclipse-mosquitto:2.0.18
container_name: mqtt-broker
restart: unless-stopped
ports:
- "127.0.0.1:1883:1883" # Local only — not exposed externally
volumes:
- ./config/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
- /data/mqtt/data:/mosquitto/data
- /data/mqtt/logs:/mosquitto/log
networks:
- edge-internal
cpus: "0.5"
mem_limit: 128m
# ============================================
# VPN Client (WireGuard)
# ============================================
vpn-client:
image: lscr.io/linuxserver/wireguard:latest
container_name: vpn-client
restart: unless-stopped
cap_add:
- NET_ADMIN
- SYS_MODULE
environment:
- PUID=1000
- PGID=1000
volumes:
- ./config/wireguard:/config # linuxserver image expects a /config dir containing wg0.conf
- /lib/modules:/lib/modules:ro
sysctls:
- net.ipv4.conf.all.src_valid_mark=1
- net.ipv4.ip_forward=1
networks:
- vpn-net
cpus: "0.5"
mem_limit: 128m
# ============================================
# Prometheus (Metrics)
# ============================================
prometheus:
image: prom/prometheus:v2.50.0
container_name: prometheus
restart: unless-stopped
ports:
- "127.0.0.1:9090:9090"
volumes:
- ./config/prometheus.yml:/etc/prometheus/prometheus.yml:ro
- /data/prometheus:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
- '--storage.tsdb.retention.time=7d'
networks:
- edge-internal
cpus: "0.5"
mem_limit: 256m
# ============================================
# Node Exporter (Host Metrics)
# ============================================
node-exporter:
image: prom/node-exporter:v1.7.0
container_name: node-exporter
restart: unless-stopped
volumes:
- /proc:/host/proc:ro
- /sys:/host/sys:ro
- /:/rootfs:ro
command:
- '--path.procfs=/host/proc'
- '--path.rootfs=/rootfs'
- '--path.sysfs=/host/sys'
- '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)'
networks:
- edge-internal
cpus: "0.25"
mem_limit: 64m
networks:
edge-internal:
driver: bridge
ipam:
config:
- subnet: 172.28.0.0/16
vpn-net:
driver: bridge
internal: true # Only containers can access this network
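The eight ffmpeg-worker services differ only in channel number, stream subtype, and resource limits, so the definitions can be generated rather than maintained by hand. A sketch follows; the field values mirror the compose file above, but the command is simplified to the HLS output only (it omits the reconnect flags and the dual HLS+segment outputs of workers 1–2):

```python
# Generate the near-identical ffmpeg-worker service definitions.
RTSP = "rtsp://${DVR_USERNAME}:${DVR_PASSWORD}@192.168.29.200:554/cam/realmonitor"

def worker_service(channel: int) -> dict:
    primary = channel <= 4  # CH1-4 pull the main stream (subtype=0)
    subtype = 0 if primary else 1
    cmd = (
        "ffmpeg -nostdin -y -rtsp_transport tcp -stimeout 5000000 "
        "-fflags +genpts+discardcorrupt -flags +low_delay "
        f'-i "{RTSP}?channel={channel}&subtype={subtype}" '
        "-c:v copy -c:a copy -f hls -hls_time 2 -hls_list_size 5 "
        "-hls_flags delete_segments+omit_endlist+program_date_time "
        f'-hls_segment_filename "/data/hls/ch{channel}_%03d.ts" '
        f'"/data/hls/ch{channel}.m3u8"'
    )
    return {
        "image": "jrottenberg/ffmpeg:6.1-ubuntu2204",
        "container_name": f"ffmpeg-worker-{channel}",
        "restart": "unless-stopped",
        "command": cmd,
        "volumes": ["/data/hls:/data/hls"],
        "networks": ["edge-internal"],
        "cpus": "2.0" if primary else "1.0",
        "mem_limit": "256m" if primary else "128m",
    }

services = {f"ffmpeg-worker-{ch}": worker_service(ch) for ch in range(1, 9)}
```

Serialize with PyYAML (`yaml.safe_dump({"services": services}, sort_keys=False)`, PyYAML is already in requirements.txt) and splice the fragment into the compose file during deployment.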
8.4 Dockerfiles
8.4.1 Stream Manager Dockerfile
# docker/Dockerfile.stream-manager
FROM python:3.11-slim-bookworm AS base
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
ffmpeg \
libsm6 \
libxext6 \
libgl1 \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY src/ ./src/
# Create non-root user
RUN useradd -m -u 1000 appuser && \
mkdir -p /data/hls /data/buffer /data/logs /app/logs && \
chown -R appuser:appuser /app /data
USER appuser
# Expose API port
EXPOSE 8080
# Health check
HEALTHCHECK --interval=30s --timeout=5s --start-period=30s --retries=3 \
CMD curl -f http://localhost:8080/api/v1/status || exit 1
# Run the application
CMD ["python", "-m", "src.main"]
8.4.2 Frame Extractor Dockerfile
# docker/Dockerfile.frame-extractor
FROM python:3.11-slim-bookworm
RUN apt-get update && apt-get install -y --no-install-recommends \
ffmpeg \
libsm6 \
libxext6 \
libgl1 \
libgomp1 \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/processing/ ./src/processing/
COPY src/mqtt/ ./src/mqtt/
COPY src/core/ ./src/core/
COPY src/utils/ ./src/utils/
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
CMD ["python", "-m", "src.processing.frame_extractor"]
8.4.3 Buffer Manager Dockerfile
# docker/Dockerfile.buffer-manager
FROM python:3.11-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/storage/ ./src/storage/
COPY src/core/ ./src/core/
COPY src/utils/ ./src/utils/
RUN useradd -m -u 1000 appuser && \
mkdir -p /data/buffer && \
chown -R appuser:appuser /app /data
USER appuser
CMD ["python", "-m", "src.storage.buffer_manager"]
8.5 Requirements.txt
# Core Framework
fastapi==0.110.0
uvicorn[standard]==0.27.1
python-multipart==0.0.9
websockets==12.0
# Async
aiohttp==3.9.3
aiofiles==23.2.1
aiortc==1.8.0
# MQTT
paho-mqtt==1.6.1
# Computer Vision
opencv-python-headless==4.9.0.84
numpy==1.26.4
Pillow==10.2.0
# ONVIF
onvif-zeep==0.2.12
# Configuration
PyYAML==6.0.1
python-dotenv==1.0.1
# Data Validation
pydantic==2.6.3
pydantic-settings==2.2.1
# Monitoring
prometheus-client==0.20.0
# Testing
pytest==8.0.2
pytest-asyncio==0.23.5
pytest-cov==4.1.0
httpx==0.27.0
# Utilities
tenacity==8.2.3
structlog==24.1.0
orjson==3.9.15
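A pinned requirements file only helps if the running container actually matches it. The following sketch (not part of the project code; shown as an optional startup guard) compares installed versions against a pin map using only the standard library:

```python
from importlib import metadata

def check_pins(pins: dict) -> list:
    """Return (name, wanted, found) for each missing or mismatched pin."""
    problems = []
    for name, wanted in pins.items():
        try:
            found = metadata.version(name)
        except metadata.PackageNotFoundError:
            problems.append((name, wanted, None))
            continue
        if found != wanted:
            problems.append((name, wanted, found))
    return problems
```

Calling it with the parsed requirements at startup and logging any mismatches gives an early, explicit failure instead of a confusing ImportError deep in the pipeline.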
8.6 Configuration Files
8.6.1 Camera Configuration (cameras.yaml)
# config/cameras.yaml
cameras:
- channel: 1
name: "Gate/Entry"
enabled: true
primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0"
secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1"
use_primary: false
ai_enabled: true
ai_fps: 5.0
ai_resolution: "640x480"
ai_mode: "hybrid"
recording_enabled: true
pre_event_seconds: 10
post_event_seconds: 30
tags:
location: "main_gate"
criticality: "high"
- channel: 2
name: "Production Floor A"
enabled: true
primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=0"
secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=1"
use_primary: false
ai_enabled: true
ai_fps: 5.0
ai_resolution: "640x480"
ai_mode: "hybrid"
recording_enabled: true
pre_event_seconds: 10
post_event_seconds: 30
tags:
location: "production_a"
criticality: "high"
- channel: 3
name: "Production Floor B"
enabled: true
primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=0"
secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=1"
use_primary: false
ai_enabled: true
ai_fps: 5.0
ai_resolution: "640x480"
ai_mode: "hybrid"
recording_enabled: true
pre_event_seconds: 10
post_event_seconds: 30
tags:
location: "production_b"
criticality: "medium"
- channel: 4
name: "Warehouse"
enabled: true
primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=0"
secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=1"
use_primary: false
ai_enabled: true
ai_fps: 5.0
ai_resolution: "640x480"
ai_mode: "hybrid"
recording_enabled: true
pre_event_seconds: 10
post_event_seconds: 30
tags:
location: "warehouse"
criticality: "medium"
- channel: 5
name: "Loading Dock"
enabled: true
primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=0"
secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=1"
use_primary: false
ai_enabled: true
ai_fps: 5.0
ai_resolution: "640x480"
ai_mode: "hybrid"
recording_enabled: true
pre_event_seconds: 10
post_event_seconds: 30
tags:
location: "loading_dock"
criticality: "medium"
- channel: 6
name: "Office Corridor"
enabled: true
primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=0"
secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=1"
use_primary: false
ai_enabled: true
ai_fps: 5.0
ai_resolution: "640x480"
ai_mode: "hybrid"
recording_enabled: false
pre_event_seconds: 10
post_event_seconds: 30
tags:
location: "office"
criticality: "low"
- channel: 7
name: "Server Room"
enabled: true
primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=0"
secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=1"
use_primary: false
ai_enabled: true
ai_fps: 5.0
ai_resolution: "640x480"
ai_mode: "hybrid"
recording_enabled: true
pre_event_seconds: 15
post_event_seconds: 45
tags:
location: "server_room"
criticality: "high"
- channel: 8
name: "Perimeter"
enabled: true
primary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=0"
secondary_stream_url: "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=1"
use_primary: false
ai_enabled: true
ai_fps: 5.0
ai_resolution: "640x480"
ai_mode: "hybrid"
recording_enabled: true
pre_event_seconds: 10
post_event_seconds: 30
tags:
location: "perimeter"
criticality: "high"
settings:
default_ai_fps: 5.0
default_inference_resolution: "640x480"
default_ai_mode: "hybrid"
stream_stall_timeout: 10
reconnect_max_delay: 30
circuit_breaker:
failure_threshold: 5
recovery_timeout: 60
half_open_max_calls: 3
success_threshold: 2
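The camera entries above should be validated before the stream manager registers them. The project's requirements include pydantic, which is the natural fit; for brevity this sketch uses stdlib dataclasses, and the accepted `ai_mode` values are an assumption inferred from the config:

```python
from dataclasses import dataclass, field

VALID_AI_MODES = {"hybrid", "continuous", "keyframe"}  # assumed mode set

@dataclass
class CameraEntry:
    channel: int
    name: str
    enabled: bool = True
    ai_fps: float = 5.0
    ai_mode: str = "hybrid"
    pre_event_seconds: int = 10
    post_event_seconds: int = 30
    tags: dict = field(default_factory=dict)

def validate_camera(raw: dict) -> CameraEntry:
    """Build a CameraEntry from one cameras.yaml item, rejecting bad values."""
    known = {k: raw[k] for k in raw if k in CameraEntry.__dataclass_fields__}
    cam = CameraEntry(**known)
    if not 1 <= cam.channel <= 8:
        raise ValueError(f"channel {cam.channel} outside DVR range 1-8")
    if cam.ai_mode not in VALID_AI_MODES:
        raise ValueError(f"unknown ai_mode {cam.ai_mode!r}")
    if cam.ai_fps <= 0:
        raise ValueError("ai_fps must be positive")
    return cam
```

Failing fast here keeps a typo in one channel block from taking down the whole registry load.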
8.6.2 Gateway Configuration (gateway.yaml)
# config/gateway.yaml
gateway:
id: "edge-ind-01"
location: "Plant Floor A"
description: "Edge gateway for CP PLUS DVR surveillance"
network:
lan_interface: "eth0"
lan_ip: "192.168.29.10"
subnet: "255.255.255.0"
gateway: "192.168.29.1"
dns:
- "8.8.8.8"
- "1.1.1.1"
vpn:
enabled: true
type: "wireguard"
tunnel_ip: "10.100.0.10"
server_endpoint: "cloud-gateway.example.com:51820"
persistent_keepalive: 25
mtu: 1400
dvr:
host: "192.168.29.200"
rtsp_port: 554
http_port: 80
https_port: 443
username: "admin"
password: "${DVR_PASSWORD}" # From environment
channels: 8
onvif_enabled: true
streams:
transport: "tcp"
buffer_size: 65536
stall_timeout: 10
reconnect:
initial_delay: 1.0
max_delay: 30.0
multiplier: 2.0
jitter: 0.1
ai:
inference_fps: 5.0
keyframe_only: false
mode: "hybrid"
resolution: "640x480"
confidence_threshold: 0.6
mqtt_topic_prefix: "ai/frames"
hls:
segment_duration: 2
window_size: 5
output_dir: "/data/hls"
cleanup_interval: 60
storage:
buffer_max_size_gb: 20
buffer_retention_hours: 72
hls_cache_max_size_gb: 5
log_max_size_gb: 1
upload:
endpoint: "https://10.100.0.1/api/upload"
max_retries: 5
retry_base_delay: 5
max_retry_delay: 300
concurrent_uploads: 3
verify_ssl: true
monitoring:
health_check_interval: 30
metrics_port: 9090
web_ui_port: 8080
log_level: "INFO"
log_format: "json"
security:
api_auth_enabled: true
jwt_secret: "${JWT_SECRET}"
jwt_expiry_hours: 24
allowed_dashboard_hosts:
- "https://dashboard.example.com"
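The `"${DVR_PASSWORD}"` and `"${JWT_SECRET}"` values above are placeholders resolved from the environment at load time. A stdlib-only sketch of that expansion step (the real loader would run it over every string in the parsed YAML; the strict fail-on-unset behavior is a design choice, not mandated by the config):

```python
import os
import re

_PLACEHOLDER = re.compile(r"\$\{([A-Z0-9_]+)\}")

def expand_env(value: str, env=None) -> str:
    """Replace ${VAR} placeholders; fail loudly if a variable is unset."""
    env = os.environ if env is None else env
    def sub(match):
        var = match.group(1)
        if var not in env:
            raise KeyError(f"required environment variable {var} is not set")
        return env[var]
    return _PLACEHOLDER.sub(sub, value)
```

Raising on a missing variable is deliberate: silently keeping the literal `${DVR_PASSWORD}` string would produce confusing 401 errors from the DVR instead of a clear startup failure.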
8.6.3 Mosquitto MQTT Configuration
# config/mosquitto.conf
# MQTT Broker for internal edge communication
listener 1883 0.0.0.0
protocol mqtt
# Persistence
persistence true
persistence_location /mosquitto/data/
# Logging
log_dest file /mosquitto/log/mosquitto.log
log_type error
log_type warning
log_type information
connection_messages true
# Security — local network only
allow_anonymous true
# In production, use password_file and ACL
# password_file /mosquitto/config/passwd
# acl_file /mosquitto/config/acl
# Limits
max_connections 100
# 10MB max per message (image frames) — mosquitto.conf does not support trailing comments
message_size_limit 10485760
max_inflight_messages 100
max_queued_messages 1000
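The frame extractor publishes to this broker under `MQTT_TOPIC_PREFIX`. A sketch of the publish side follows; the per-channel topic layout (`ai/frames/ch<N>`) is an assumption consistent with the prefix, and the size guard mirrors `message_size_limit`:

```python
MAX_PAYLOAD = 10 * 1024 * 1024  # mirrors message_size_limit above (10 MB)

def frame_topic(prefix: str, channel: int) -> str:
    """Assumed topic layout: e.g. ai/frames/ch3."""
    return f"{prefix}/ch{channel}"

def check_payload(payload: bytes) -> bytes:
    """Reject frames the broker would drop anyway."""
    if len(payload) > MAX_PAYLOAD:
        raise ValueError(f"frame of {len(payload)} bytes exceeds broker limit")
    return payload

def publish_frame(channel: int, jpeg_bytes: bytes) -> None:
    import paho.mqtt.client as mqtt  # paho-mqtt, pinned in requirements.txt
    client = mqtt.Client()
    client.connect("mqtt-broker", 1883)
    client.publish(frame_topic("ai/frames", channel),
                   check_payload(jpeg_bytes), qos=0)
    client.disconnect()
```

QoS 0 suits high-rate frame traffic on a local broker: a dropped frame is cheaper than queueing pressure when the consumer falls behind.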
8.6.4 Prometheus Configuration
# config/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
- job_name: 'node-exporter'
static_configs:
- targets: ['node-exporter:9100']
- job_name: 'stream-manager'
static_configs:
- targets: ['stream-manager:8080']
metrics_path: '/metrics'
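The `stream-manager` job above scrapes `/metrics`, which the service exposes via prometheus-client. For orientation, this sketch shows the exposition-format line such an endpoint emits per gauge; the metric name is illustrative, not the project's actual one:

```python
def gauge_line(name: str, labels: dict, value: float) -> str:
    """Render one Prometheus exposition-format sample line."""
    label_str = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    return f"{name}{{{label_str}}} {value}"
```

In practice prometheus-client's `Gauge(...).labels(channel="1").set(12.5)` produces exactly this shape, plus the `# HELP` and `# TYPE` header lines.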
8.7 Environment File (.env)
# .env — Environment variables for Docker Compose
# Copy to .env and fill in sensitive values
# Gateway Identity
GATEWAY_ID=edge-ind-01
LOG_LEVEL=INFO
# DVR Credentials (REQUIRED)
DVR_USERNAME=admin
DVR_PASSWORD=your_dvr_password_here
# AI Pipeline
AI_INFERENCE_FPS=5
AI_KEYFRAME_ONLY=true
AI_RESOLUTION=640x480
# Cloud Upload
UPLOAD_ENDPOINT=https://10.100.0.1/api/upload
UPLOAD_MAX_RETRIES=5
# VPN
VPN_ENABLED=true
# Buffer
BUFFER_MAX_SIZE_GB=20
BUFFER_RETENTION_HOURS=72
# JWT Secret for API Auth
JWT_SECRET=change_this_to_a_random_32_char_string
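The start.sh script below loads this file with `source`; for Python-side tooling that must read the same file without a shell, a minimal parser is enough. This sketch handles only the simple `KEY=VALUE` lines used above (quoting and multi-line values are out of scope):

```python
def parse_env_file(text: str) -> dict:
    """Parse simple KEY=VALUE lines; skip blanks and '#' comments."""
    env = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip().strip('"').strip("'")
    return env
```

The project's requirements also pin python-dotenv and pydantic-settings, either of which covers this more robustly in the services themselves.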
8.8 Main Application Entry Point
# src/main.py
import asyncio
import logging
import signal
from typing import Optional
import uvicorn
from .api.server import create_app
from .core.config import GatewayConfig
from .core.logging_setup import setup_logging
from .streams.manager import StreamManager, StreamManagerConfig
from .streams.registry import CameraRegistry
from .streams.ffmpeg_builder import FFmpegCommandBuilder
from .streams.connector import ReconnectConfig
from .streams.circuit_breaker import CircuitBreakerConfig
from .health.monitor import HealthMonitor
from .health.metrics import MetricsCollector
logger = logging.getLogger(__name__)
class EdgeGateway:
"""Main application class — initializes and coordinates all services."""
def __init__(self):
self.config = GatewayConfig.load()
setup_logging(self.config.log_level, self.config.log_format)
self.registry = CameraRegistry(self.config.camera_config_path)
self.stream_manager: Optional[StreamManager] = None
self.health_monitor: Optional[HealthMonitor] = None
self.metrics = MetricsCollector()
self._shutdown_event = asyncio.Event()
async def initialize(self):
"""Initialize all services."""
logger.info(f"Starting Edge Gateway: {self.config.gateway_id}")
# Load camera configuration
await self.registry.load()
# Build FFmpeg command builder
ffmpeg_builder = FFmpegCommandBuilder()
# Configure stream manager
manager_config = StreamManagerConfig(
reconnect=ReconnectConfig(
initial_delay=self.config.reconnect_initial_delay,
max_delay=self.config.reconnect_max_delay,
stall_timeout=self.config.stream_stall_timeout,
),
circuit_breaker=CircuitBreakerConfig(
failure_threshold=self.config.cb_failure_threshold,
recovery_timeout=self.config.cb_recovery_timeout,
),
ffmpeg=ffmpeg_builder,
)
self.stream_manager = StreamManager(
config=manager_config,
registry=self.registry,
metrics=self.metrics,
)
self.health_monitor = HealthMonitor(
gateway_id=self.config.gateway_id,
dvr_host=self.config.dvr_host,
vpn_endpoint=self.config.vpn_server_ip,
check_interval=self.config.health_check_interval,
)
# Wire up health callbacks
self.health_monitor.on_health_change(self._on_health_change)
# Start services
await self.stream_manager.start()
await self.health_monitor.start()
logger.info("Edge Gateway initialized successfully")
def _on_health_change(self, report):
"""Handle health status changes."""
if report.overall_status == "critical":
logger.critical(f"Gateway health CRITICAL: {report.issues}")
elif report.overall_status == "degraded":
logger.warning(f"Gateway health degraded: {report.issues}")
async def run_api_server(self):
"""Run the FastAPI HTTP server."""
app = create_app(self.stream_manager, self.health_monitor, self.config)
config = uvicorn.Config(
app=app,
host="0.0.0.0",
port=self.config.web_ui_port,
log_level=self.config.log_level.lower(),
access_log=True,
)
server = uvicorn.Server(config)
await server.serve()
async def run(self):
"""Main run loop — serve the API until a shutdown signal arrives."""
await self.initialize()
# gather() would wait for BOTH tasks; use FIRST_COMPLETED so a shutdown
# signal actually stops the server
api_task = asyncio.create_task(self.run_api_server())
shutdown_task = asyncio.create_task(self._shutdown_event.wait())
try:
_, pending = await asyncio.wait({api_task, shutdown_task}, return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()
except asyncio.CancelledError:
pass
finally:
await self.shutdown()
async def shutdown(self):
"""Graceful shutdown."""
logger.info("Shutting down Edge Gateway...")
if self.stream_manager:
await self.stream_manager.stop()
if self.health_monitor:
await self.health_monitor.stop()
logger.info("Edge Gateway stopped")
def handle_signal(self, sig):
"""Handle OS signals for graceful shutdown."""
logger.info(f"Received signal {sig.name}")
self._shutdown_event.set()
def main():
gateway = EdgeGateway()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Register signal handlers for graceful shutdown
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, lambda s=sig: gateway.handle_signal(s))
try:
loop.run_until_complete(gateway.run())
except KeyboardInterrupt:
logger.info("Keyboard interrupt received")
finally:
# run() already performs shutdown in its own finally block
loop.close()
if __name__ == "__main__":
main()
8.9 Startup Script
#!/bin/bash
# scripts/start.sh — Edge Gateway startup script
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_DIR="$(dirname "$SCRIPT_DIR")"
ENV_FILE="${PROJECT_DIR}/.env"
# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
echo "========================================"
echo " Edge Gateway Startup"
echo "========================================"
# Check environment file
if [[ ! -f "$ENV_FILE" ]]; then
echo -e "${RED}ERROR: .env file not found at ${ENV_FILE}${NC}"
echo "Please copy .env.example to .env and fill in credentials"
exit 1
fi
# Load environment
set -a
source "$ENV_FILE"
set +a
# Validate required variables
if [[ -z "${DVR_PASSWORD:-}" ]]; then
echo -e "${RED}ERROR: DVR_PASSWORD is not set in .env${NC}"
exit 1
fi
# Create data directories
echo "Creating data directories..."
mkdir -p /data/{hls,buffer/alert_clips,buffer/continuous,buffer/pre_event,logs,config,mqtt,prometheus}
chmod 755 /data/*
# Check DVR connectivity
echo "Checking DVR connectivity (${DVR_HOST:-192.168.29.200})..."
if ping -c 1 -W 2 "${DVR_HOST:-192.168.29.200}" > /dev/null 2>&1; then
echo -e "${GREEN}DVR is reachable${NC}"
else
echo -e "${YELLOW}WARNING: DVR is not reachable — streams will fail${NC}"
fi
# Check Docker
echo "Checking Docker..."
if ! docker info > /dev/null 2>&1; then
echo -e "${RED}ERROR: Docker is not running${NC}"
exit 1
fi
# Pull latest images
echo "Pulling Docker images..."
docker compose -f "${PROJECT_DIR}/docker-compose.yml" pull
# Start services
echo "Starting Edge Gateway services..."
docker compose -f "${PROJECT_DIR}/docker-compose.yml" up -d --remove-orphans
# Wait for services
echo "Waiting for services to start..."
sleep 10
# Health check
echo "Running health check..."
MAX_RETRIES=12
RETRY=0
while [[ $RETRY -lt $MAX_RETRIES ]]; do
if curl -sf http://localhost:8080/api/v1/status > /dev/null 2>&1; then
echo -e "${GREEN}Edge Gateway is healthy!${NC}"
echo ""
echo "Diagnostic UI: http://localhost:8080/"
echo "API Status: http://localhost:8080/api/v1/status"
echo "HLS Streams: http://localhost:8081/hls/ch{N}.m3u8"
exit 0
fi
RETRY=$((RETRY + 1))
echo " Retry ${RETRY}/${MAX_RETRIES}..."
sleep 5
done
echo -e "${RED}ERROR: Gateway failed health check${NC}"
echo "Check logs: docker compose logs stream-manager"
exit 1
9. Appendix
9.1 Complete FFmpeg Command Reference
A. Single-Channel Operations
# A1. Probe stream information (codec, resolution, bitrate) — these are ffprobe options, not ffmpeg
ffprobe -v quiet -rtsp_transport tcp -stimeout 5000000 \
-print_format json -show_format -show_streams \
"rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0"
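The A1 probe is also useful from Python, e.g. in the stream manager's discovery step. A sketch of invoking it and summarising the JSON it prints; the sample in the test mirrors the shape of ffprobe's `-show_streams` output:

```python
import json
import subprocess

def summarize_probe(probe_json: str) -> dict:
    """Extract codec and resolution of the first video stream."""
    data = json.loads(probe_json)
    video = next(s for s in data["streams"] if s["codec_type"] == "video")
    return {
        "codec": video["codec_name"],
        "resolution": f'{video["width"]}x{video["height"]}',
    }

def probe(url: str) -> dict:
    """Run ffprobe against an RTSP URL and summarise the result."""
    out = subprocess.run(
        ["ffprobe", "-v", "quiet", "-rtsp_transport", "tcp",
         "-print_format", "json", "-show_streams", url],
        capture_output=True, text=True, check=True,
    ).stdout
    return summarize_probe(out)
```

Keeping the JSON parsing separate from the subprocess call makes the summary logic testable without a live DVR.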
# A2. Save single-frame snapshot
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-vf "select='eq(n,0)'" -vframes 1 -q:v 2 \
"/data/snapshots/ch1_$(date +%Y%m%d_%H%M%S).jpg"
# A3. Record 60-second clip to MP4
ffmpeg -rtsp_transport tcp -stimeout 5000000 -t 60 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-c:v copy -c:a copy -movflags +faststart \
"/data/clips/ch1_$(date +%Y%m%d_%H%M%S).mp4"
# A4. Transcode to H.264 for cloud compatibility
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-c:v libx264 -preset fast -tune zerolatency -crf 23 \
-maxrate 2000k -bufsize 4000k -g 50 \
-c:a aac -b:a 128k \
-f mp4 -movflags +faststart \
"/data/clips/ch1_transcoded_$(date +%Y%m%d_%H%M%S).mp4"
# A5. Extract audio only
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-vn -c:a aac -b:a 128k \
"/data/audio/ch1_$(date +%Y%m%d_%H%M%S).aac"
# A6. Stream to RTMP relay (e.g., YouTube, custom server)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-c:v copy -c:a aac -b:a 128k -ar 44100 \
-f flv "rtmp://relay-server.example.com/live/ch1"
# A7. Generate thumbnail grid (contact sheet)
ffmpeg -rtsp_transport tcp -stimeout 5000000 -t 60 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-vf "fps=1/5,scale=320:240,tile=4x3" -vframes 1 \
"/data/snapshots/ch1_contact.jpg"
# A8. Motion detection using the FFmpeg scene-change filter (scores logged via metadata=print)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-vf "select='gt(scene,0.1)',metadata=print" \
-f null - 2>&1 | grep 'scene_score' | head -20
# A9. Low-latency output for a WebRTC gateway (H.264 baseline + Opus; Matroska container, since WebM does not allow H.264)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-c:v libx264 -preset ultrafast -tune zerolatency -profile:v baseline \
-vf "scale=640:480" -b:v 1000k -g 30 \
-c:a libopus -b:a 64k \
-f matroska pipe:1
# A10. MJPEG stream for simple HTTP embedding (-listen 1 makes ffmpeg serve
# the stream directly to a single HTTP client at this URL)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-c:v mjpeg -q:v 5 -vf "fps=15,scale=640:480" \
-f mpjpeg -boundary_tag ffmpeg \
-listen 1 "http://localhost:8081/mjpeg/ch1"
B. Multi-Channel Operations
# B1. Simultaneous 4-channel recording
for CH in 1 2 3 4; do
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=${CH}&subtype=0" \
-c:v copy -c:a copy -t 300 \
-movflags +faststart \
"/data/clips/ch${CH}_$(date +%Y%m%d_%H%M%S).mp4" &
done
wait
# B2. 8-channel status check (probe all streams)
for CH in $(seq 1 8); do
echo "=== Channel ${CH} ==="
ffprobe -v error -rtsp_transport tcp -stimeout 3000000 \
-select_streams v:0 \
-show_entries stream=codec_name,width,height,r_frame_rate,bit_rate \
-of csv=s=,:p=0 \
"rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=${CH}&subtype=0" \
2>/dev/null || echo " UNREACHABLE"
done
# B3. Mosaic view (all 8 channels in 4x2 grid)
ffmpeg \
-rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=1" \
-rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=2&subtype=1" \
-rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=3&subtype=1" \
-rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=4&subtype=1" \
-rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=5&subtype=1" \
-rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=6&subtype=1" \
-rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=7&subtype=1" \
-rtsp_transport tcp -stimeout 5000000 -i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=8&subtype=1" \
-filter_complex "
[0:v]scale=480:270[v0]; [1:v]scale=480:270[v1];
[2:v]scale=480:270[v2]; [3:v]scale=480:270[v3];
[4:v]scale=480:270[v4]; [5:v]scale=480:270[v5];
[6:v]scale=480:270[v6]; [7:v]scale=480:270[v7];
[v0][v1][v2][v3]hstack=inputs=4[row1];
[v4][v5][v6][v7]hstack=inputs=4[row2];
[row1][row2]vstack=inputs=2[out]
" \
-map "[out]" -c:v libx264 -preset fast -b:v 4000k -f hls \
-hls_time 2 -hls_list_size 5 -hls_flags delete_segments \
"/data/hls/mosaic.m3u8"
# B4. Batch transcode all channels to H.264
for CH in $(seq 1 8); do
ffmpeg -rtsp_transport tcp -stimeout 5000000 -t 60 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=${CH}&subtype=0" \
-c:v libx264 -preset fast -crf 28 -vf "scale=640:480" \
-c:a aac -b:a 64k -ac 1 \
-movflags +faststart \
"/data/clips/ch${CH}_compact.mp4" &
done
wait
echo "All channels transcoded"
C. HLS-Specific Operations
# C1. HLS with AES-128 encryption
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-c:v copy -c:a copy \
-f hls -hls_time 2 -hls_list_size 5 \
-hls_key_info_file "/data/config/hls_key_info.txt" \
-hls_segment_filename "/data/hls/ch1_enc_%03d.ts" \
"/data/hls/ch1_enc.m3u8"
# C2. HLS with independent segments (for CDN compatibility)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-c:v copy -c:a copy \
-f hls -hls_time 2 -hls_list_size 5 \
-hls_flags independent_segments+delete_segments+omit_endlist \
-hls_segment_filename "/data/hls/ch1_%03d.ts" \
"/data/hls/ch1.m3u8"
# C3. Multi-variant HLS (adaptive bitrate)
ffmpeg -rtsp_transport tcp -stimeout 5000000 \
-i "rtsp://admin:password@192.168.29.200:554/cam/realmonitor?channel=1&subtype=0" \
-filter_complex "[0:v]scale=640:480[lowv]" \
-map 0:v -map 0:a -c:v:0 copy -c:a:0 copy \
-map "[lowv]" -map 0:a -c:v:1 libx264 -preset fast -b:v:1 800k -c:a:1 aac -b:a:1 64k \
-var_stream_map "v:0,a:0,name:high v:1,a:1,name:low" \
-f hls -hls_time 2 -hls_list_size 5 \
-hls_flags delete_segments+omit_endlist \
-master_pl_name "ch1_master.m3u8" \
-hls_segment_filename "/data/hls/ch1_%v_%03d.ts" \
"/data/hls/ch1_%v.m3u8"
9.2 API Endpoint Reference
| Method | Endpoint | Description | Auth |
|---|---|---|---|
| GET | / | Diagnostic HTML UI | No |
| GET | /api/v1/status | Full gateway status | No |
| GET | /api/v1/cameras | List all cameras | Yes |
| GET | /api/v1/cameras/{ch} | Get camera config | Yes |
| PUT | /api/v1/cameras/{ch} | Update camera config | Yes |
| POST | /api/v1/cameras/{ch}/restart | Restart stream | Yes |
| POST | /api/v1/cameras/{ch}/enable | Enable camera | Yes |
| POST | /api/v1/cameras/{ch}/disable | Disable camera | Yes |
| GET | /api/v1/streams/{ch}/status | Stream status detail | Yes |
| GET | /api/v1/streams/{ch}/stats | Stream statistics | Yes |
| POST | /api/v1/streams/{ch}/start | Start stream | Yes |
| POST | /api/v1/streams/{ch}/stop | Stop stream | Yes |
| GET | /api/v1/hls/{ch}.m3u8 | HLS playlist | No* |
| GET | /api/v1/hls/{segment}.ts | HLS segment | No* |
| GET | /api/v1/buffer/stats | Buffer statistics | Yes |
| GET | /api/v1/buffer/uploads/pending | Pending uploads | Yes |
| POST | /api/v1/buffer/uploads/{id}/retry | Retry upload | Yes |
| GET | /api/v1/health | Health check (simple) | No |
| GET | /api/v1/health/detailed | Detailed health | Yes |
| GET | /api/v1/metrics | Prometheus metrics | No |
| WS | /ws/live | Real-time status WebSocket | Yes |
*HLS endpoints use token-based URL signing for security.
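A small helper for exercising the endpoints above from the command line. The gateway address matches the edge LAN IP from the topology; the bearer-token handling shown in the comments is illustrative, not part of this spec:

```shell
# Build full URLs for the gateway API (host/port assumed from Section 1).
GATEWAY="${GATEWAY:-http://192.168.29.10:8080}"

api_url() {   # api_url <path-under-/api/v1> -> full URL
  printf '%s/api/v1/%s' "$GATEWAY" "$1"
}

# Example invocations (run manually on or near the gateway):
#   curl -s "$(api_url health)"                                        # no auth
#   curl -s -H "Authorization: Bearer $TOKEN" "$(api_url cameras)"
#   curl -s -X POST -H "Authorization: Bearer $TOKEN" "$(api_url cameras/3/restart)"
```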
9.3 MQTT Topic Structure
| Topic | Direction | Payload | Description |
|---|---|---|---|
| ai/frames/{channel} | Edge → Cloud | JPEG bytes + metadata | AI inference frames |
| ai/detections/{channel} | Edge → Cloud | JSON | Detection results |
| ai/events/{channel} | Edge → Cloud | JSON | Triggered events |
| streams/status/{channel} | Edge → Cloud | JSON | Stream health status |
| gateway/health | Edge → Cloud | JSON | Gateway health report |
| gateway/metrics | Edge → Cloud | JSON | Resource metrics |
| control/cameras/{channel}/restart | Cloud → Edge | JSON | Restart command |
| control/cameras/{channel}/config | Cloud → Edge | JSON | Config update |
| control/gateway/reboot | Cloud → Edge | Empty | Gateway reboot |
| alerts/critical | Edge → Cloud | JSON | Critical alerts |
| buffer/uploads/status | Edge → Cloud | JSON | Upload progress |
9.4 Failure Mode Matrix
| Failure | Detection | Response | Fallback | Recovery |
|---|---|---|---|---|
| DVR network unreachable | ICMP ping fails | Mark all streams OFFLINE | Use cached last frame | Auto-retry every 30s |
| Single channel RTSP fail | No frames for 10s | Exponential backoff reconnect | Circuit breaker opens | Auto-recovery test |
| All 8 channels fail | Mass disconnect | Alert to cloud, enter degraded mode | Static placeholder | DVR power cycle suggestion |
| VPN tunnel down | WG handshake fail | Queue uploads locally | Local-only mode | Auto-reconnect WG |
| Disk full (>95%) | df monitor | Emergency cleanup | Drop oldest clips | Alert + cleanup |
| FFmpeg crash | Process exit code | Restart container | Retry with delay | Docker auto-restart |
| MQTT broker down | Connection timeout | Buffer messages in memory | File-based queuing | Auto-reconnect |
| High CPU usage | >90% for 60s | Reduce AI FPS | Skip non-key frames | Auto-scale down |
| Memory pressure | >90% usage | Drop frames, OOM protection | Reduce buffer sizes | Kernel OOM killer |
| Cloud upload fail | HTTP 5xx | Retry with backoff | Local retention | Infinite retry |
| SSL certificate expiry | TLS handshake fail | Log warning | Insecure fallback (dev only) | Auto-renewal |
| Power loss | Hardware event | UPS graceful shutdown | None (hardware) | Auto-start on boot |
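The "exponential backoff reconnect" response in the matrix can be sketched as follows: the retry delay doubles per attempt up to a cap. The 2 s base and 60 s cap are assumptions for illustration, not values fixed by this spec:

```shell
# Compute the reconnect delay for a given attempt number (1, 2, 3, ...).
# Delay doubles each attempt: 2s, 4s, 8s, ... capped at 60s (assumed values).
backoff_delay() {   # backoff_delay <attempt-number> -> delay in seconds
  d=2
  i=1
  while [ "$i" -lt "$1" ]; do d=$((d * 2)); i=$((i + 1)); done
  [ "$d" -gt 60 ] && d=60
  echo "$d"
}
```

A reconnect loop would call `sleep "$(backoff_delay $attempt)"` between attempts and reset the attempt counter once frames flow again.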
9.5 Network Port Reference
| Port | Protocol | Service | Direction | Notes |
|---|---|---|---|---|
| 80/tcp | HTTP | DVR Web UI | Edge → DVR | Local only |
| 443/tcp | HTTPS | DVR Web UI | Edge → DVR | Local only |
| 554/tcp | RTSP | Video streams | Edge → DVR | Primary protocol |
| 554/udp | RTSP/UDP | Video streams | Edge → DVR | Fallback |
| 25001/tcp | TCP | DVR control | Edge → DVR | CP PLUS protocol |
| 25002/udp | UDP | DVR control | Edge → DVR | CP PLUS protocol |
| 123/udp | NTP | Time sync | Edge → Gateway | Or local NTP |
| 8080/tcp | HTTP | Gateway API | External → Edge | Diagnostic UI |
| 8081/tcp | HTTP | HLS server | External → Edge | Video segments |
| 1883/tcp | MQTT | Message broker | Internal | Local only |
| 51820/udp | WireGuard | VPN tunnel | Edge ↔ Cloud | Encrypted |
| 9090/tcp | HTTP | Prometheus | Internal | Metrics |
| 9100/tcp | HTTP | Node Exporter | Internal | Host metrics |
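The port table translates fairly directly into a host firewall policy. A hypothetical `ufw` ruleset for the gateway, assuming the 192.168.29.0/24 LAN from Section 1; adjust source ranges and run as root on your deployment:

```shell
# Illustrative ufw policy derived from the port table (not part of the spec).
ufw default deny incoming
ufw allow proto udp from any to any port 51820                 # WireGuard tunnel
ufw allow proto tcp from 192.168.29.0/24 to any port 8080      # Gateway API (LAN only)
ufw allow proto tcp from 192.168.29.0/24 to any port 8081      # HLS server (LAN only)
# 1883 (MQTT), 9090 (Prometheus), 9100 (Node Exporter) stay bound to
# localhost/internal interfaces and need no inbound rule.
ufw --force enable
```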
9.6 Performance Benchmarks (Expected)
| Metric | Intel NUC i5 | RPi 5 | Jetson Orin Nano |
|---|---|---|---|
| 8x RTSP ingest (sub) | <5% CPU | <15% CPU | <10% CPU |
| 8x RTSP ingest (main) | <10% CPU | <30% CPU | <15% CPU |
| HLS segment generation | <5% CPU | <10% CPU | <5% CPU |
| AI frame extraction (8ch) | <20% CPU | <50% CPU* | <10% CPU (GPU) |
| Upload throughput | 50+ Mbps | 20+ Mbps | 30+ Mbps |
| End-to-end latency | 500-1000ms | 800-1500ms | 600-1200ms |
| Power consumption | 15-25W | 5-8W | 7-15W |
*RPi 5 may struggle with 8-channel simultaneous AI extraction; consider 4-channel max.
9.7 Security Checklist
- DVR password changed from default
- WireGuard PSK configured
- VPN uses strong crypto (Curve25519, ChaCha20-Poly1305)
- API uses JWT authentication
- HLS URLs use signed tokens (time-limited)
- DVR Web UI not exposed externally
- MQTT broker bound to localhost only
- Docker containers run as non-root
- Secrets in .env, not in code
- .env file permissions: 600
- SSL certificates for API HTTPS
- Log rotation configured
- Fail2ban for API brute force protection
- Network segmentation (DVR on isolated VLAN)
- Regular firmware updates for DVR
- Backup of gateway configuration
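The "signed tokens (time-limited)" item for HLS URLs can be sketched as an HMAC over the path plus an expiry timestamp, appended as query parameters. The scheme, parameter names, and secret handling below are assumptions for illustration, not the spec's defined mechanism:

```shell
# Append a time-limited HMAC-SHA256 signature to an HLS URL (illustrative).
sign_hls_url() {   # sign_hls_url <path> <expiry-epoch> <secret>
  sig=$(printf '%s%s' "$1" "$2" | openssl dgst -sha256 -hmac "$3" -r | cut -d' ' -f1)
  printf '%s?expires=%s&sig=%s' "$1" "$2" "$sig"
}

# The HLS server recomputes the HMAC over path+expiry and rejects requests
# whose signature mismatches or whose expiry is in the past.
```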
9.8 Glossary
| Term | Definition |
|---|---|
| DVR | Digital Video Recorder — CP PLUS CP-UVR-0801E1-CV2 |
| UVR | Universal Video Recorder (supports analog + IP cameras) |
| RTSP | Real-Time Streaming Protocol (RFC 7826) |
| ONVIF | Open Network Video Interface Forum standard |
| HLS | HTTP Live Streaming (Apple) |
| DASH | Dynamic Adaptive Streaming over HTTP |
| WebRTC | Web Real-Time Communication |
| GOP | Group of Pictures (I-frame + P-frames + B-frames) |
| CRF | Constant Rate Factor (quality-based encoding) |
| DSCP | Differentiated Services Code Point (QoS marking) |
| MTU | Maximum Transmission Unit |
| EWMA | Exponentially Weighted Moving Average |
| Circuit Breaker | Fault tolerance pattern for failing services |
| Ring Buffer | Circular fixed-size buffer overwriting oldest data |
| Keyframe/I-frame | Intra-coded frame (full image, no reference) |
| P-frame | Predicted frame (references previous frame) |
| MJPEG | Motion JPEG (sequence of JPEG images) |
| WireGuard | Modern VPN protocol using UDP |
Document version 1.0.0 — Video Ingestion Subsystem Design for the CP PLUS ORANGE Series DVR. Generated for an AI-powered industrial surveillance platform.