Suspicious Activity Detection System — Night-Mode Surveillance
Technical Design Document
Version: 1.0
Scope: 8-Channel Indoor/Industrial Night Surveillance
Operating Window: 22:00 - 06:00 (configurable per camera)
Classification: Confidential — Internal Use
Table of Contents
- System Architecture Overview
- Detection Modules
- 2.1 Intrusion Detection
- 2.2 Loitering Detection
- 2.3 Running Detection
- 2.4 Crowding / Group Formation
- 2.5 Fall Detection
- 2.6 Abandoned Object Detection
- 2.7 Unusual After-Hours Presence
- 2.8 Zone Breach
- 2.9 Repeated Re-entry Patterns
- 2.10 Suspicious Dwell Time
- Activity Scoring Engine
- Configuration Schema
- Alert Generation Logic
- Night Mode Scheduler
- Pipeline Integration
- CV Models Reference
- Pseudocode Reference
- Performance & Resource Budget
- Testing & Validation Strategy
1. System Architecture Overview
1.1 High-Level Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ NIGHT-MODE SURVEILLANCE SYSTEM │
├─────────────────────────────────────────────────────────────────────────────┤
│ Camera Layer ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ (8 Channels) │ Cam 1 │ │ Cam 2 │ │ ... │ │ Cam 8 │ │
│ └───┬─────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │
├────────────────────────────┼────────────┼───────────┼───────────┼──────────┤
│ Base AI Pipeline ▼ ▼ ▼ ▼ │
│ (Shared Detections) ┌─────────────────────────────────────────────────┐ │
│ │ Person Detection (YOLOv8) │ │
│ │ Object Detection (YOLOv8) │ │
│ │ Multi-Object Tracking (ByteTrack) │ │
│ └────────────┬────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ SUSPICIOUS ACTIVITY ANALYSIS LAYER │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Intrusion│ │Loitering │ │ Running │ │ Crowding │ │ Fall │ │ │
│ │ │ Detection│ │ Detection│ │ Detection│ │ Detection│ │ Detection│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │Abandoned │ │ After- │ │ Zone │ │ Repeated │ │ Suspicious│ │ │
│ │ │ Object │ │ Hours │ │ Breach │ │ Re-entry │ │ Dwell │ │ │
│ │ │ Detection│ │ Presence │ │ Detection│ │ Patterns │ │ Time │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────┬──────────────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ ACTIVITY SCORING ENGINE │ │
│ │ Composite Score = weighted_sum(detection_signals) │ │
│ │ Time-decay applied | Escalation thresholds │ │
│ └──────────────────────────┬──────────────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ ALERT GENERATION & MANAGEMENT │ │
│ │ Severity Assignment | Suppression Rules | Evidence Attachment │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
1.2 Data Flow
Video Frame (RTSP/ONVIF)
│
▼
┌─────────────────┐
│ Frame Capture │ ◄── 25 FPS target, 1920x1080 resolution
│ & Preprocessing │ ◄── Night enhancement (histogram equalization, denoise)
└────────┬────────┘
│
▼
┌─────────────────┐
│ Base Detections │ ◄── Person boxes (bbox, confidence, track_id)
│ │ ◄── Object boxes (backpack, suitcase, box, etc.)
│ │ ◄── Pose keypoints (if person detected)
└────────┬────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Detection State Buffer │
│ - Track history (last 5 minutes per track_id) │
│ - Zone occupancy (which tracks in which zones) │
│ - Velocity vectors per track │
│ - Pose history per track (last 10 seconds) │
│ - Object-to-person association map │
└────────┬──────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Rule Engine — All 10 Detection Modules Evaluate │
│ Each module reads from state buffer and checks its rules │
│ Outputs: detection_event (type, severity, confidence) │
└────────┬──────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Scoring Engine — Composite suspicious activity score │
│ S_total = sum(weight_i * signal_i * decay(t_i)) │
└────────┬──────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Alert Manager — Severity-based alert decision │
│ Suppression check → Evidence capture → Alert dispatch │
└──────────────────────────────────────────────────────────────┘
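The scoring step above applies the formula S_total = sum(weight_i * signal_i * decay(t_i)). The document does not specify the decay function or per-signal weights, so the following sketch assumes an exponential half-life decay and illustrative weights:

```python
# Illustrative per-event-type weights and half-life (assumptions, not from the spec)
WEIGHTS = {"INTRUSION": 1.0, "LOITERING": 0.5, "RUNNING": 0.5}
HALF_LIFE_SECONDS = 120.0

def decay(age_seconds, half_life=HALF_LIFE_SECONDS):
    """Exponential time decay: a signal's contribution halves every half_life seconds."""
    return 0.5 ** (age_seconds / half_life)

def composite_score(events, now):
    """S_total = sum(weight_i * signal_i * decay(t_i)) over recent events."""
    total = 0.0
    for e in events:
        w = WEIGHTS.get(e["event_type"], 0.25)  # default weight for unlisted types
        total += w * e["confidence"] * decay(now - e["timestamp"])
    return total
```

A fresh INTRUSION at full confidence contributes its full weight; the same event two minutes old contributes half.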
1.3 Key Design Principles
| Principle | Implementation |
|---|---|
| Real-time Processing | All detection modules run in parallel on shared state; target latency < 200ms per frame |
| Modularity | Each detection module is independently enable/disable configurable per camera |
| Extensibility | New detection types can be added as plug-in modules following the DetectionModule interface |
| False-Positive Resilience | Confidence thresholds, temporal filtering, and composite scoring reduce noise |
| Resource Efficiency | Shared base detections (person, object, tracking) feed all modules; no redundant inference |
| Audit Trail | Every detection event, scoring calculation, and alert decision is logged with timestamp |
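The Extensibility principle above references a DetectionModule interface that the document does not define. A minimal sketch of what that plug-in contract could look like (all names here are assumptions for illustration):

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field

@dataclass
class DetectionEvent:
    event_type: str
    severity: str
    confidence: float
    metadata: dict = field(default_factory=dict)

class DetectionModule(ABC):
    """Illustrative plug-in contract: each module reads the shared
    state buffer and emits zero or more DetectionEvents per frame."""
    def __init__(self, config: dict):
        self.enabled = config.get("enabled", True)

    @abstractmethod
    def evaluate(self, state_buffer) -> list:
        """Check module-specific rules against the shared state."""
        ...

class NullModule(DetectionModule):
    """Trivial module used to exercise the interface."""
    def evaluate(self, state_buffer):
        if not self.enabled:
            return []
        return [DetectionEvent("NOOP", "LOW", 1.0)]
```

New detection types would subclass DetectionModule and be registered per camera, honoring the per-camera enable/disable flag.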
2. Detection Modules
2.1 Intrusion Detection
Description
Detects when a person enters a user-defined restricted zone during night hours. Zones are specified as polygons drawn on the camera's field of view.
Algorithm
Inputs:
- person_detections: list of (bbox, track_id, confidence) from base pipeline
- restricted_zones: list of polygon vertices [(x1,y1), (x2,y2), ...] per camera
- confidence_threshold: float (default 0.65)
- overlap_threshold: float (default 0.30) — min IoU to trigger
Processing (each frame):
1. Filter detections: confidence >= confidence_threshold AND class == 'person'
2. For each person detection:
a. Compute bounding box polygon
b. For each restricted_zone:
i. Compute overlap_area = area(intersection(bbox_poly, zone_poly))
ii. Compute overlap_ratio = overlap_area / area(bbox_poly)
iii. If overlap_ratio >= overlap_threshold:
→ Trigger INTRUSION event
3. Intrusion event structure:
{
event_type: "INTRUSION",
track_id: <id>,
zone_id: <zone_identifier>,
overlap_ratio: <float>,
confidence: <float>,
timestamp: <ISO8601>,
severity: "HIGH",
bbox: [x1, y1, x2, y2],
camera_id: <cam_id>
}
Zone Overlap Calculation
def compute_zone_overlap(person_bbox, zone_polygon):
    """
    Calculate overlap between person bounding box and restricted zone polygon.
    Uses shapely polygon intersection.
    """
    from shapely.geometry import box, Polygon

    person_poly = box(person_bbox[0], person_bbox[1], person_bbox[2], person_bbox[3])
    zone_poly = Polygon(zone_polygon)
    if not zone_poly.is_valid:
        zone_poly = zone_poly.buffer(0)  # Repair self-intersecting polygons
    intersection_area = person_poly.intersection(zone_poly).area
    bbox_area = person_poly.area
    overlap_ratio = intersection_area / bbox_area if bbox_area > 0 else 0.0
    return overlap_ratio, intersection_area
Configuration Parameters
| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| confidence_threshold | float | 0.65 | 0.0 - 1.0 | Minimum person detection confidence |
| overlap_threshold | float | 0.30 | 0.0 - 1.0 | Minimum bbox/zone overlap ratio |
| restricted_zones | list of polygons | [] | — | Each polygon: list of (x, y) normalized coordinates |
| min_zone_area | float | 0.01 | 0.0 - 1.0 | Minimum zone area as fraction of frame |
| cooldown_seconds | int | 30 | 0 - 3600 | Cooldown between repeated alerts for same track in same zone |
| schedule_override | bool | false | — | If true, enforce even outside night hours |
Hysteresis (Anti-Flutter)
To prevent rapid on/off triggering when a person stands at the zone boundary:
INTRUSION_STATE per (track_id, zone_id):
- NONE → INTRUDING: requires overlap >= overlap_threshold for 3 consecutive frames
- INTRUDING → NONE: requires overlap < (overlap_threshold - 0.10) for 5 consecutive frames
- Alert is generated only on NONE → INTRUDING transition
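The state machine above can be sketched as a small per-(track, zone) class. Thresholds and frame counts come from the rules above; the class name and structure are illustrative:

```python
class IntrusionHysteresis:
    """Anti-flutter state machine per (track_id, zone_id)."""
    ENTER_FRAMES = 3  # NONE -> INTRUDING after 3 consecutive frames at/above threshold
    EXIT_FRAMES = 5   # INTRUDING -> NONE after 5 consecutive frames below (threshold - 0.10)

    def __init__(self, overlap_threshold=0.30):
        self.threshold = overlap_threshold
        self.state = "NONE"
        self.above = 0
        self.below = 0

    def update(self, overlap_ratio):
        """Feed one frame's overlap ratio; return True only on the
        NONE -> INTRUDING transition (when the alert should fire)."""
        if self.state == "NONE":
            self.above = self.above + 1 if overlap_ratio >= self.threshold else 0
            if self.above >= self.ENTER_FRAMES:
                self.state, self.above = "INTRUDING", 0
                return True
        else:
            self.below = self.below + 1 if overlap_ratio < self.threshold - 0.10 else 0
            if self.below >= self.EXIT_FRAMES:
                self.state, self.below = "NONE", 0
        return False
```

The asymmetric exit threshold (0.10 below the entry threshold) is what prevents flutter at the zone boundary.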
Severity: HIGH
Rationale: Unauthorized entry into restricted zones during night hours is a critical security event requiring immediate attention.
2.2 Loitering Detection
Description
Detects when a person remains within a defined area for longer than a configurable dwell time threshold. It differs from Suspicious Dwell Time (Section 2.10), which targets specific sensitive locations.
Algorithm
Inputs:
- track_history: dict mapping track_id → list of (timestamp, centroid_x, centroid_y, bbox)
- loitering_zones: list of polygon-defined zones (optional; if empty, use any area)
- dwell_time_threshold_seconds: int (default 300 = 5 minutes)
- movement_tolerance_pixels: int (default 50) — centroid must stay within this radius
Processing:
1. For each active track_id:
a. Retrieve position history for last T seconds (T = dwell_time_threshold)
b. Compute bounding circle of all centroids in window
c. If bounding_circle_radius <= movement_tolerance_pixels:
→ Candidate loitering detected
d. Require candidate state for 3 consecutive evaluations (anti-flutter)
2. If zone-restricted: only trigger if loitering centroid falls inside any loitering_zone
3. Loiter event structure:
{
event_type: "LOITERING",
track_id: <id>,
zone_id: <zone_or_null>,
dwell_time_seconds: <float>,
centroid_stability_px: <float>,
severity: "MEDIUM",
...
}
Dwell Time Measurement
def euclidean_distance(a, b):
    return ((a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2) ** 0.5

def measure_dwell_time(track_history, current_time, track_id,
                       movement_tolerance_pixels=50,
                       dwell_time_threshold=300,
                       max_gap_seconds=2.0):
    """
    Calculate continuous dwell time within a radius.
    Returns: (dwell_seconds, stability_radius, is_loitering)
    """
    positions = track_history.get(track_id, [])  # list of (t, cx, cy)
    if len(positions) < 2:
        return 0.0, float('inf'), False
    # Walk backward from the current position until the track either
    # leaves the tolerance radius or has a gap in coverage
    current_pos = positions[-1]
    contiguous_start = current_time
    max_distance = 0.0
    for i in range(len(positions) - 2, -1, -1):
        prev_pos = positions[i]
        time_gap = positions[i + 1][0] - prev_pos[0]  # gap between consecutive samples
        distance = euclidean_distance(current_pos[1:3], prev_pos[1:3])
        if time_gap > max_gap_seconds or distance > movement_tolerance_pixels:
            break
        max_distance = max(max_distance, distance)
        contiguous_start = prev_pos[0]
    dwell_seconds = current_time - contiguous_start
    return dwell_seconds, max_distance, dwell_seconds >= dwell_time_threshold
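Processing step 1b above calls for a bounding circle of recent centroids. A standalone sketch of that test, using the mean centroid as an approximate circle center (an approximation, not the exact minimum enclosing circle):

```python
import math

def bounding_circle_radius(centroids):
    """Approximate bounding-circle radius: distance from the mean
    centroid to the farthest centroid in the window."""
    if not centroids:
        return 0.0
    cx = sum(p[0] for p in centroids) / len(centroids)
    cy = sum(p[1] for p in centroids) / len(centroids)
    return max(math.hypot(p[0] - cx, p[1] - cy) for p in centroids)

def is_stationary(centroids, movement_tolerance_pixels=50):
    """Candidate loitering if all recent centroids fit within the tolerance radius."""
    return bounding_circle_radius(centroids) <= movement_tolerance_pixels
```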
Configuration Parameters
| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| dwell_time_threshold_seconds | int | 300 | 30 - 3600 | Seconds before loitering alert |
| movement_tolerance_pixels | int | 50 | 10 - 500 | Max centroid displacement to count as stationary |
| loitering_zones | list | [] | — | Optional zone restriction; empty = entire frame |
| consecutive_confirmations | int | 3 | 1 - 10 | Required consecutive evaluations before alert |
| cooldown_seconds | int | 60 | 0 - 3600 | Cooldown for same track_id |
Severity: MEDIUM
Rationale: Loitering may indicate surveillance of the facility or waiting for an opportunity; not immediately dangerous but warrants monitoring.
2.3 Running Detection
Description
Detects abnormally fast pedestrian movement, which may indicate fleeing, rushing toward a target, or emergency situations.
Algorithm
Inputs:
- track_history: dict mapping track_id → list of (timestamp, centroid_x, centroid_y)
- camera_calibration: homography matrix (optional, for real-world speed)
- speed_threshold_pixels_per_second: float (default 150)
- speed_threshold_kmh: float (default 15.0) — if calibration available
Processing:
1. For each active track_id with at least N positions (N = 5):
a. Compute velocity vector over sliding window (last 1 second)
b. speed = magnitude(velocity) in pixels/second
c. If calibrated: convert to km/h using homography + assumed person height
d. Compute speed_percentile_90 (90th percentile speed over last 3 seconds)
e. If speed_percentile_90 >= threshold AND person is upright (not falling):
→ Candidate running
2. Require candidate state for 2 consecutive seconds to suppress noise
3. Running event structure:
{
event_type: "RUNNING",
track_id: <id>,
estimated_speed_kmh: <float>,
speed_percentile_90: <float>,
direction_vector: [dx, dy],
severity: "MEDIUM",
...
}
Speed Estimation
import numpy as np
from math import sqrt

def estimate_speed(track_id, track_buffer, homography=None, window_seconds=1.0):
    """
    Estimate pedestrian speed from tracking data.
    """
    history = track_buffer.get(track_id, [])
    if len(history) < 2:
        return 0.0
    now = history[-1][0]
    cutoff = now - window_seconds
    recent = [h for h in history if h[0] >= cutoff]
    if len(recent) < 2:
        return 0.0
    # Compute per-frame speeds between consecutive positions
    speeds = []
    for i in range(1, len(recent)):
        dt = recent[i][0] - recent[i - 1][0]
        if dt > 0:
            dx = recent[i][1] - recent[i - 1][1]
            dy = recent[i][2] - recent[i - 1][2]
            speeds.append(sqrt(dx * dx + dy * dy) / dt)
    if not speeds:
        return 0.0
    # Use 90th percentile to filter out brief stops
    speed_p90 = np.percentile(speeds, 90)
    if homography is not None:
        # Convert pixel speed to real-world km/h using the local ground-plane
        # scale (meters per pixel) at the person's position
        scale_factor = homography.get_scale_at_point(recent[-1][1:3])
        return speed_p90 * scale_factor * 3.6
    return speed_p90  # pixels/second fallback
Configuration Parameters
| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| speed_threshold_pixels_per_second | float | 150 | 50 - 500 | Speed threshold in pixels/second (uncalibrated) |
| speed_threshold_kmh | float | 15.0 | 5.0 - 30.0 | Speed threshold in km/h (calibrated cameras) |
| confirmation_duration_seconds | float | 2.0 | 0.5 - 5.0 | Seconds of sustained speed before alert |
| speed_percentile | float | 90 | 50 - 99 | Percentile for speed robustness |
| min_track_history_seconds | float | 1.0 | 0.5 - 3.0 | Minimum tracking duration before speed estimation |
| exclude_falling_state | bool | true | — | Don't trigger if fall is concurrently detected |
Per-Camera Calibration
# Example camera calibration entry
camera_01:
homography_matrix: [[h11, h12, h13], [h21, h22, h23], [h31, h32, h33]]
ground_plane_reference: # Known points in (image_x, image_y) → (world_x, world_y_meters)
- {img: [100, 800], world: [0.0, 0.0]}
- {img: [900, 800], world: [10.0, 0.0]}
- {img: [100, 400], world: [0.0, 5.0]}
- {img: [900, 400], world: [10.0, 5.0]}
pixels_per_meter_at_center: 45.0
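Given a calibration entry like the one above, pixel displacements can be mapped to ground-plane meters by applying the homography to both endpoints. A minimal sketch (the matrix below is illustrative, not a real calibration):

```python
import numpy as np

def project_to_ground(pt, H):
    """Map an image point (x, y) to world meters via the 3x3 homography H."""
    v = H @ np.array([pt[0], pt[1], 1.0])
    return v[:2] / v[2]  # homogeneous divide

def pixel_speed_to_kmh(p_prev, p_curr, dt_seconds, H):
    """Convert a pixel displacement over dt seconds to km/h on the ground plane."""
    w_prev = project_to_ground(p_prev, H)
    w_curr = project_to_ground(p_curr, H)
    meters = float(np.linalg.norm(w_curr - w_prev))
    return meters / dt_seconds * 3.6
```

With a pure scaling homography of 0.01 m/px, a 100 px displacement in one second maps to 1 m/s, i.e. 3.6 km/h.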
Severity: MEDIUM
Rationale: Running may indicate an emergency or malicious intent; requires attention but may also be a false positive (employee rushing).
2.4 Crowding / Group Formation
Description
Detects when multiple persons gather in a small area, which may indicate unauthorized group activity, confrontation, or coordinated intrusion.
Algorithm
Inputs:
- person_detections: list of (bbox, track_id, confidence)
- count_threshold: int (default 3)
- area_threshold: float (default 0.15) — max area as fraction of frame
- density_threshold: float (default 0.05) — persons per normalized area unit
Processing:
1. Filter detections: confidence >= 0.50 AND class == 'person'
2. If count < count_threshold: return NO_EVENT
3. Compute bounding box of all person centroids
4. group_area = area(bbox_of_centroids) / area(frame)
5. density = count / group_area (if group_area > 0)
6. If count >= count_threshold AND group_area <= area_threshold:
→ Candidate crowding
7. Apply temporal confirmation: candidate must persist for 5 consecutive frames
8. Crowding event structure:
{
event_type: "CROWDING",
person_count: <int>,
group_area_ratio: <float>,
density_score: <float>,
track_ids: [<id1>, <id2>, ...],
centroid_bbox: [x1, y1, x2, y2],
severity: "MEDIUM",
...
}
DBSCAN-Based Clustering (Alternative)
import numpy as np

def detect_crowding_dbscan(detections, frame_width, frame_height,
                           eps=0.08, min_samples=3):
    """
    Use DBSCAN clustering on normalized centroid positions for robust group detection.
    """
    from sklearn.cluster import DBSCAN
    centroids = np.array([
        [(d.bbox[0] + d.bbox[2]) / 2, (d.bbox[1] + d.bbox[3]) / 2]
        for d in detections if d.confidence >= 0.50
    ])
    if len(centroids) < min_samples:
        return []
    # Normalize to [0, 1] range so eps is resolution-independent
    centroids_norm = centroids / np.array([frame_width, frame_height])
    clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(centroids_norm)
    labels = clustering.labels_
    groups = []
    for label in set(labels):
        if label == -1:  # Noise points (not part of any cluster)
            continue
        group_mask = labels == label
        group_centroids = centroids_norm[group_mask]
        group_size = len(group_centroids)
        # Compute bounding box of the group in normalized coordinates
        min_xy = group_centroids.min(axis=0)
        max_xy = group_centroids.max(axis=0)
        group_area_ratio = (max_xy[0] - min_xy[0]) * (max_xy[1] - min_xy[1])
        density = group_size / max(group_area_ratio, 0.001)
        groups.append({
            'count': group_size,
            'area_ratio': group_area_ratio,
            'density': density,
            'members': np.where(group_mask)[0].tolist()
        })
    return groups
Configuration Parameters
| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| count_threshold | int | 3 | 2 - 20 | Minimum persons to form a crowd |
| area_threshold | float | 0.15 | 0.01 - 0.50 | Max group area as fraction of frame |
| density_threshold | float | 0.05 | 0.01 - 0.50 | Min persons per normalized area unit |
| confirmation_frames | int | 5 | 1 - 30 | Consecutive frames before alert |
| use_dbscan | bool | true | — | Use DBSCAN instead of bounding box method |
| dbscan_eps | float | 0.08 | 0.01 - 0.30 | DBSCAN neighborhood radius (normalized) |
| cooldown_seconds | int | 60 | 0 - 3600 | Cooldown between crowding alerts |
Severity: MEDIUM
Rationale: Group formation may indicate coordinated activity; requires monitoring but could be legitimate.
2.5 Fall Detection
Description
Detects when a person falls to the ground, which may indicate an accident, medical emergency, or assault.
Algorithm
Inputs:
- pose_estimation_output: keypoints for each detected person (COCO format: 17 keypoints)
- track_id: associated track from MOT
- fall_confidence_threshold: float (default 0.75)
Processing (per person):
1. Extract keypoints: nose, shoulders, hips, knees, ankles
2. Compute torso angle:
- torso_vector = midpoint(shoulders) - midpoint(hips)
- angle = arctan2(torso_vector.y, torso_vector.x) # angle from horizontal
- upright_angle ≈ 90 degrees (vertical torso)
3. Compute aspect ratio:
- person_height = max(y) - min(y) of all keypoints
- person_width = max(x) - min(x) of all keypoints
- aspect_ratio = width / height
4. Fall classification (multi-criteria):
a. Torso angle is near horizontal: |angle - 90| > 45 degrees
b. Aspect ratio > 1.5 (person wider than tall)
c. Center of mass (y-coordinate) is low (near ground)
d. All keypoints confidence > 0.30
5. Temporal consistency: fall state must persist for >= 1 second
6. Distinguish from bending:
- If only torso angle is abnormal but aspect_ratio < 1.2: likely bending → suppress
- If hip keypoint y is close to ankle y: confirmed fall
7. Fall event structure:
{
event_type: "FALL",
track_id: <id>,
torso_angle_degrees: <float>,
aspect_ratio: <float>,
fall_confidence: <float>,
keypoint_confidence_avg: <float>,
severity: "HIGH",
...
}
Pose-Based Fall Classification
import numpy as np
from math import atan2, degrees, exp

def sigmoid(x):
    """Logistic squashing for soft thresholding."""
    return 1.0 / (1.0 + exp(-x))

def midpoint(kp_a, kp_b):
    """Midpoint of two (x, y, confidence) keypoints."""
    return ((kp_a[0] + kp_b[0]) / 2, (kp_a[1] + kp_b[1]) / 2)

def classify_fall(pose_keypoints, track_id, track_history):
    """
    Classify fall from pose keypoints using multiple geometric features.
    pose_keypoints: array of shape (17, 3) → (x, y, confidence)
    Returns: (is_fall, confidence, features_dict)
    """
    # COCO keypoint indices (only torso and leg keypoints are used here)
    L_SHOULDER, R_SHOULDER = 5, 6
    L_HIP, R_HIP = 11, 12
    L_KNEE, R_KNEE = 13, 14
    L_ANKLE, R_ANKLE = 15, 16
    # Extract relevant keypoints
    left_shoulder = pose_keypoints[L_SHOULDER]
    right_shoulder = pose_keypoints[R_SHOULDER]
    left_hip = pose_keypoints[L_HIP]
    right_hip = pose_keypoints[R_HIP]
    keypoints = [left_shoulder, right_shoulder, left_hip, right_hip,
                 pose_keypoints[L_KNEE], pose_keypoints[R_KNEE],
                 pose_keypoints[L_ANKLE], pose_keypoints[R_ANKLE]]
    # Filter low-confidence poses
    avg_confidence = np.mean([kp[2] for kp in keypoints])
    if avg_confidence < 0.3:
        return False, 0.0, {}
    # 1. Torso angle (degrees from vertical)
    shoulder_mid = midpoint(left_shoulder, right_shoulder)
    hip_mid = midpoint(left_hip, right_hip)
    torso_vector = (shoulder_mid[0] - hip_mid[0], shoulder_mid[1] - hip_mid[1])
    torso_angle = abs(degrees(atan2(torso_vector[1], torso_vector[0])))
    torso_angle_from_vertical = abs(torso_angle - 90)
    # 2. Aspect ratio (width / height of keypoint extent)
    all_x = [kp[0] for kp in keypoints]
    all_y = [kp[1] for kp in keypoints]
    width = max(all_x) - min(all_x)
    height = max(all_y) - min(all_y)
    aspect_ratio = width / max(height, 1)
    # 3. Height ratio — current height vs historical average height
    if track_id in track_history:
        historical_heights = [h['height'] for h in track_history[track_id][-30:]]
        avg_height = np.mean(historical_heights) if historical_heights else height
        height_ratio = height / max(avg_height, 1)
    else:
        height_ratio = 1.0
    # 4. Center of mass (vertical), relative to the lowest keypoint
    com_y = np.mean([kp[1] for kp in keypoints])
    ground_y = max(all_y)  # lowest point in image coordinates
    com_to_ground_ratio = (ground_y - com_y) / max(height, 1)
    # Fall scoring (weighted combination of soft-thresholded features)
    score = 0.0
    score += sigmoid((torso_angle_from_vertical - 45) / 15) * 0.30  # 30% weight
    score += sigmoid((aspect_ratio - 1.2) / 0.4) * 0.25             # 25% weight
    score += sigmoid((1.0 - height_ratio - 0.3) / 0.2) * 0.25       # 25% weight
    score += sigmoid((0.3 - com_to_ground_ratio) / 0.1) * 0.20      # 20% weight
    is_fall = score > 0.75
    # Bending suppression: near-upright pose with near-normal height is likely bending
    if aspect_ratio < 1.2 and height_ratio > 0.6:
        is_fall = False
        score *= 0.3  # Reduce confidence
    features = {
        'torso_angle_from_vertical': torso_angle_from_vertical,
        'aspect_ratio': aspect_ratio,
        'height_ratio': height_ratio,
        'com_to_ground_ratio': com_to_ground_ratio,
        'avg_keypoint_confidence': avg_confidence
    }
    return is_fall, score, features
Configuration Parameters
| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| fall_score_threshold | float | 0.75 | 0.0 - 1.0 | Minimum fall classification score |
| min_keypoint_confidence | float | 0.30 | 0.0 - 1.0 | Minimum avg keypoint confidence |
| torso_angle_threshold_deg | float | 45 | 15 - 75 | Degrees from vertical to consider fall |
| aspect_ratio_threshold | float | 1.2 | 1.0 - 3.0 | Width/height ratio threshold |
| height_ratio_threshold | float | 0.6 | 0.3 - 0.9 | Current height / avg height threshold |
| temporal_confirmation_ms | int | 1000 | 500 - 5000 | Milliseconds fall must persist |
| bending_suppression | bool | true | — | Reduce false positives from bending |
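The temporal confirmation step (a fall must persist for temporal_confirmation_ms before alerting) can be sketched as per-track state. The class name and structure are assumptions:

```python
class FallConfirmation:
    """Require a fall pose to persist before alerting (anti-noise)."""
    def __init__(self, confirmation_ms=1000):
        self.confirmation_s = confirmation_ms / 1000.0
        self.fall_start = {}  # track_id -> timestamp of first fall-classified frame

    def update(self, track_id, is_fall_frame, now):
        """Feed one frame's classification; return True once the fall
        has persisted for the configured duration."""
        if not is_fall_frame:
            self.fall_start.pop(track_id, None)  # any non-fall frame resets the timer
            return False
        start = self.fall_start.setdefault(track_id, now)
        return now - start >= self.confirmation_s
```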
Severity: HIGH
Rationale: Falls represent immediate safety concerns and require urgent response.
2.6 Abandoned Object Detection
Description
Detects when an object is left unattended for an extended period. Uses background subtraction and frame differencing combined with object detection.
Algorithm
Inputs:
- current_frame: raw image
- object_detections: list of (bbox, class, confidence) from base pipeline
- person_detections: list of (bbox, track_id, confidence)
- background_model: accumulated background reference
- unattended_time_threshold_seconds: int (default 60)
Processing:
1. Background Subtraction:
a. Compute foreground mask: fg_mask = |current_frame - background_model| > threshold
b. Apply morphological operations (open + close) to clean mask
c. Find connected components in fg_mask
2. Object-Person Association:
a. For each detected object (backpack, suitcase, box, bag):
- Compute distance to nearest person bbox
- If distance < proximity_threshold_px: object is "attended"
- Else: object is "unattended"
3. Temporal Tracking of Unattended Objects:
a. Match current unattended objects to previously tracked ones (IoU-based)
b. For each tracked object:
- If still unattended: increment unattended_duration
- If attended again: reset timer
- If moved significantly: reset timer
4. Trigger Condition:
- unattended_duration >= unattended_time_threshold_seconds
- Object class is in watchlist (backpack, suitcase, box, bag, package)
- Object remains stationary (centroid shift < 10 pixels)
5. Abandoned object event:
{
event_type: "ABANDONED_OBJECT",
object_class: <class_name>,
object_id: <tracking_id>,
unattended_duration_seconds: <float>,
owner_track_id: <track_id_or_null>,
detection_confidence: <float>,
severity: "HIGH",
...
}
Background Subtraction Pipeline
import cv2

class BackgroundSubtractionModule:
    def __init__(self, learn_rate=0.005, history=500):
        self.bg_subtractor = cv2.createBackgroundSubtractorMOG2(
            history=history,
            varThreshold=16,
            detectShadows=True
        )
        self.learn_rate = learn_rate
        self.history = history
        self.stable_bg = None
        self.frame_count = 0

    def process(self, frame):
        # Apply background subtractor
        fg_mask = self.bg_subtractor.apply(frame, learningRate=self.learn_rate)
        # Shadow removal (MOG2 marks shadows as 127; keep only hard foreground at 255)
        _, fg_mask = cv2.threshold(fg_mask, 200, 255, cv2.THRESH_BINARY)
        # Morphological cleaning: open removes speckle, close fills small holes
        kernel_open = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
        kernel_close = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7))
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel_open)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel_close)
        self.frame_count += 1
        if self.frame_count == self.history:
            # Capture stable background for reference once the model has warmed up
            self.stable_bg = self.bg_subtractor.getBackgroundImage()
        return fg_mask
import numpy as np

def detect_abandoned_objects(current_frame, detections, person_detections,
                             bg_mask, object_tracker, unattended_threshold=60):
    """
    Main abandoned object detection logic.
    Assumes helpers: extract_mask_region (crop mask to a bbox) and
    min_person_distance (pixel distance from a bbox to the nearest person bbox).
    """
    abandoned_events = []
    watchlist_classes = {'backpack', 'suitcase', 'box', 'bag', 'package', 'luggage'}
    for det in detections:
        if det.class_name not in watchlist_classes:
            continue
        # Check if object is in the foreground (newly introduced)
        obj_mask = extract_mask_region(bg_mask, det.bbox)
        fg_ratio = np.sum(obj_mask > 0) / obj_mask.size
        if fg_ratio < 0.3:
            continue  # Object is part of background, not newly introduced
        # Check proximity to persons
        nearest_person_distance = min_person_distance(det.bbox, person_detections)
        is_attended = nearest_person_distance < 100  # pixels
        # Track object state and accumulate unattended time
        obj_state = object_tracker.update(det, is_attended)
        if not is_attended and obj_state.unattended_duration >= unattended_threshold:
            abandoned_events.append({
                'event_type': 'ABANDONED_OBJECT',
                'object_class': det.class_name,
                'unattended_duration': obj_state.unattended_duration,
                'bbox': det.bbox,
                'confidence': det.confidence
            })
    return abandoned_events
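The object_tracker used above (IoU-based matching plus unattended timers, per Processing step 3) is not defined in the document. A minimal sketch, with an assumed update signature that also takes the current time:

```python
class UnattendedObjectTracker:
    """Match detections to known objects by IoU and accumulate unattended time.
    Class name, update signature, and thresholds are assumptions."""
    def __init__(self, iou_threshold=0.5):
        self.iou_threshold = iou_threshold
        self.objects = []  # each: {"bbox", "first_unattended", "unattended_duration"}

    @staticmethod
    def iou(a, b):
        """Intersection-over-union of two (x1, y1, x2, y2) boxes."""
        x1, y1 = max(a[0], b[0]), max(a[1], b[1])
        x2, y2 = min(a[2], b[2]), min(a[3], b[3])
        inter = max(0, x2 - x1) * max(0, y2 - y1)
        area_a = (a[2] - a[0]) * (a[3] - a[1])
        area_b = (b[2] - b[0]) * (b[3] - b[1])
        return inter / max(area_a + area_b - inter, 1e-9)

    def update(self, bbox, is_attended, now):
        # Match to an existing tracked object by IoU
        for obj in self.objects:
            if self.iou(obj["bbox"], bbox) >= self.iou_threshold:
                obj["bbox"] = bbox
                if is_attended:
                    # A person returned: reset the abandonment timer
                    obj["first_unattended"] = None
                    obj["unattended_duration"] = 0.0
                else:
                    if obj["first_unattended"] is None:
                        obj["first_unattended"] = now
                    obj["unattended_duration"] = now - obj["first_unattended"]
                return obj
        # No match: start tracking a new object
        obj = {"bbox": bbox,
               "first_unattended": None if is_attended else now,
               "unattended_duration": 0.0}
        self.objects.append(obj)
        return obj
```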
Configuration Parameters
| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| unattended_time_threshold_seconds | int | 60 | 10 - 600 | Seconds before object is considered abandoned |
| proximity_threshold_pixels | int | 100 | 30 - 300 | Max distance (px) to nearest person for "attended" |
| watchlist_classes | list | ["backpack", "suitcase", "box", "bag"] | — | Object classes to monitor |
| fg_ratio_threshold | float | 0.3 | 0.1 - 0.9 | Min foreground ratio in object region |
| bg_history_frames | int | 500 | 100 - 2000 | Background model history length |
| bg_learning_rate | float | 0.005 | 0.001 - 0.05 | Background model learning rate |
| stationary_threshold_pixels | int | 10 | 5 - 50 | Max centroid shift to count as stationary |
Severity: HIGH
Rationale: Abandoned objects in industrial environments may be explosive devices, hazardous materials, or stolen goods.
2.7 Unusual After-Hours Presence
Description
Generates an alert for any person detected during night hours, separate from watchlist-based alerts. This is a catch-all detection for unexpected human presence.
Algorithm
Inputs:
- person_detections: list of (bbox, track_id, confidence)
- is_night_mode: boolean (from Night Mode Scheduler)
- authorized_personnel_db: list of known personnel (optional, for suppression)
- detection_confidence_threshold: float (default 0.60)
- min_detection_frames: int (default 10)
Processing:
1. If not is_night_mode: return NO_EVENT (unless override enabled)
2. For each person detection:
a. If confidence < detection_confidence_threshold: skip
b. Check if track_id is in authorized_personnel (if DB available)
c. Require track to be visible for at least min_detection_frames
d. Check if track has already triggered an alert (deduplication)
3. After-hours presence event:
{
event_type: "AFTER_HOURS_PRESENCE",
track_id: <id>,
detection_confidence: <float>,
is_authorized: <bool_or_null>,
first_seen_timestamp: <ISO8601>,
duration_seconds: <float>,
severity: "LOW", # upgraded to MEDIUM if in restricted zone
...
}
Severity Escalation
def assign_after_hours_severity(event, restricted_zone_ids):
    """Escalate severity based on location and concurrency context."""
    base_severity = "LOW"
    # Escalate if in a restricted zone
    if event.get('zone_id') and event['zone_id'] in restricted_zone_ids:
        base_severity = "MEDIUM"
    # Escalate if loitering is simultaneously detected
    if event.get('concurrent_loitering', False):
        base_severity = "MEDIUM"
    # Escalate if multiple after-hours persons are detected
    if event.get('concurrent_presence_count', 1) >= 3:
        base_severity = "MEDIUM"
    return base_severity
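Processing step 2d above requires per-track alert deduplication, and the configuration adds a per-zone cooldown. A minimal sketch combining both (class name and structure are assumptions):

```python
class PerTrackAlertDeduplicator:
    """Emit at most one AFTER_HOURS_PRESENCE alert per track_id,
    with a per-zone cooldown mirroring cooldown_per_zone_seconds."""
    def __init__(self, zone_cooldown_seconds=300):
        self.zone_cooldown = zone_cooldown_seconds
        self.alerted_tracks = set()
        self.last_zone_alert = {}  # zone_id -> timestamp of last alert

    def should_alert(self, track_id, zone_id, now):
        if track_id in self.alerted_tracks:
            return False  # this track already alerted
        last = self.last_zone_alert.get(zone_id)
        if last is not None and now - last < self.zone_cooldown:
            return False  # zone still in cooldown
        self.alerted_tracks.add(track_id)
        self.last_zone_alert[zone_id] = now
        return True
```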
Configuration Parameters
| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| detection_confidence_threshold | float | 0.60 | 0.0 - 1.0 | Minimum person detection confidence |
| min_detection_frames | int | 10 | 1 - 60 | Frames of continuous detection before alert |
| check_authorized_personnel | bool | false | — | Cross-reference with authorized personnel DB |
| escalate_in_restricted_zones | bool | true | — | Upgrade severity in restricted zones |
| alert_per_track | bool | true | — | One alert per unique track_id |
| cooldown_per_zone_seconds | int | 300 | 0 - 3600 | Cooldown for same zone |
Severity: LOW (escalatable to MEDIUM)
Rationale: After-hours presence may be completely legitimate (authorized worker); escalation rules prevent alert fatigue while maintaining coverage.
2.8 Zone Breach
Description
Detects when a person crosses a predefined boundary line in a specific direction. Supports virtual tripwires for entry/exit monitoring.
Algorithm
Inputs:
- track_history: dict of track positions over time
- boundary_lines: list of line definitions {
line_id, point_a(x1,y1), point_b(x2,y2),
allowed_direction: "both" | "a_to_b" | "b_to_a",
trigger_on: "cross" | "enter" | "exit"
}
- crossing_threshold_pixels: int (default 20)
Processing:
1. For each active track_id:
a. Get previous position P_prev and current position P_curr (centroids)
b. For each boundary_line:
i. Check if segment [P_prev → P_curr] intersects boundary_line
ii. Compute intersection point
iii. Determine crossing direction:
- direction_vector = normalize(P_curr - P_prev)
- line_normal = perpendicular(line_vector)
- dot_product = direction_vector · line_normal
- if dot_product > 0: crossing A→B, else B→A
iv. If crossing_direction matches allowed_direction:
→ Trigger ZONE_BREACH
2. Hysteresis: don't re-trigger same track on same line for cooldown period
3. Zone breach event:
{
event_type: "ZONE_BREACH",
track_id: <id>,
line_id: <line_identifier>,
crossing_direction: "a_to_b" | "b_to_a",
crossing_point: [x, y],
severity: <configurable>,
...
}
Line-Crossing Geometry
from shapely.geometry import LineString

def check_line_crossing(p_prev, p_curr, line_a, line_b):
    """
    Check if segment p_prev→p_curr crosses the boundary line_a→line_b.
    Returns: (did_cross, intersection_point, direction)
    """
    path_segment = LineString([p_prev, p_curr])
    boundary = LineString([line_a, line_b])
    if not path_segment.intersects(boundary):
        return False, None, None
    intersection = path_segment.intersection(boundary)
    if intersection.is_empty or intersection.geom_type != 'Point':
        # Collinear overlap (a LineString result) or degenerate geometry — no clean crossing
        return False, None, None
    # Determine direction using the 2D cross product of the line and movement vectors
    line_vec = (line_b[0] - line_a[0], line_b[1] - line_a[1])
    move_vec = (p_curr[0] - p_prev[0], p_curr[1] - p_prev[1])
    cross = line_vec[0] * move_vec[1] - line_vec[1] * move_vec[0]
    direction = "a_to_b" if cross > 0 else "b_to_a"
    return True, (intersection.x, intersection.y), direction
class ZoneBreachDetector:
    def __init__(self, boundary_lines):
        self.lines = boundary_lines
        self.crossed_state = {}  # (track_id, line_id) → last_cross_time

    def evaluate(self, track_id, position_history):
        # position_history entries are (timestamp, x, y)
        if len(position_history) < 2:
            return []
        p_prev = position_history[-2][1:3]
        p_curr = position_history[-1][1:3]
        events = []
        now = position_history[-1][0]
for line in self.lines:
line_key = (track_id, line['line_id'])
# Cooldown check
if line_key in self.crossed_state:
if now - self.crossed_state[line_key] < line.get('cooldown_seconds', 30):
continue
did_cross, cross_point, direction = check_line_crossing(
p_prev, p_curr, line['point_a'], line['point_b']
)
if did_cross:
allowed = line.get('allowed_direction', 'both')
if allowed == 'both' or allowed == direction:
events.append({
'event_type': 'ZONE_BREACH',
'track_id': track_id,
'line_id': line['line_id'],
'direction': direction,
'crossing_point': cross_point,
'timestamp': now,
'severity': line.get('severity', 'MEDIUM')
})
self.crossed_state[line_key] = now
return events
Configuration Parameters
| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| boundary_lines | list | [] | — | Each line: {line_id, point_a, point_b, allowed_direction, severity} |
| crossing_threshold_pixels | int | 20 | 5 - 100 | Min movement across line to register |
| cooldown_seconds | int | 30 | 0 - 3600 | Cooldown per (track, line) |
| default_severity | string | "MEDIUM" | LOW/MEDIUM/HIGH | Default severity for breaches |
| require_continuous_tracking | bool | true | — | Track must not be lost during crossing |
Severity: Configurable (default MEDIUM)
Rationale: Zone breach severity depends heavily on context — crossing into a server room is HIGH, while crossing a general lobby boundary may be LOW.
2.9 Repeated Re-entry Patterns
Description
Detects when the same person enters and exits a defined area multiple times within a time window, which may indicate surveillance, probing, or unusual behavior patterns.
Algorithm
Inputs:
- track_history: dict of track positions
- reentry_zone: polygon defining the monitored area
- time_window_seconds: int (default 600 = 10 minutes)
- reentry_threshold: int (default 3) — min entry/exit cycles
- entry_exit_line: boundary line for counting entries/exits
Processing:
1. For each active track_id:
a. Maintain entry/exit history: list of (timestamp, type: "entry"|"exit")
b. Detect entry when track centroid crosses into reentry_zone (was outside, now inside)
c. Detect exit when track centroid crosses out of reentry_zone (was inside, now outside)
d. Store events with timestamps
2. Pattern Evaluation:
a. For each track, examine all entry/exit events in last time_window_seconds
b. Count complete cycles: entry → exit = 1 cycle
c. If cycle_count >= reentry_threshold:
→ Trigger REENTRY_PATTERN alert
3. Re-entry event:
{
event_type: "REENTRY_PATTERN",
track_id: <id>,
cycle_count: <int>,
time_window_seconds: <int>,
first_entry_time: <ISO8601>,
last_exit_time: <ISO8601>,
avg_cycle_duration_seconds: <float>,
severity: "MEDIUM",
...
}
State Machine for Entry/Exit Tracking
from shapely.geometry import Point, Polygon

class ReentryTracker:
    def __init__(self, zone_polygon, time_window=600, threshold=3):
        self.zone = Polygon(zone_polygon)
        self.time_window = time_window
        self.threshold = threshold
        self.track_states = {}  # track_id → {"inside": bool, "transitions": [...]}
def update(self, track_id, centroid, timestamp):
events = []
is_inside = self.zone.contains(Point(centroid))
if track_id not in self.track_states:
self.track_states[track_id] = {
'inside': is_inside,
'transitions': []
}
return events
state = self.track_states[track_id]
# Detect transitions
if not state['inside'] and is_inside:
# Entry detected
state['transitions'].append({'time': timestamp, 'type': 'entry'})
state['inside'] = True
elif state['inside'] and not is_inside:
# Exit detected
state['transitions'].append({'time': timestamp, 'type': 'exit'})
state['inside'] = False
# Evaluate pattern
cycles = self._count_cycles(state['transitions'], timestamp)
if cycles >= self.threshold:
events.append({
'event_type': 'REENTRY_PATTERN',
'track_id': track_id,
'cycle_count': cycles,
'transitions': state['transitions'].copy()
})
# Reset after alert to allow re-detection
state['transitions'] = []
return events
def _count_cycles(self, transitions, current_time):
"""Count complete entry→exit cycles in time window."""
cutoff = current_time - self.time_window
recent = [t for t in transitions if t['time'] >= cutoff]
cycles = 0
looking_for_exit = False
for t in recent:
if t['type'] == 'entry' and not looking_for_exit:
looking_for_exit = True
elif t['type'] == 'exit' and looking_for_exit:
cycles += 1
looking_for_exit = False
return cycles
Configuration Parameters
| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| reentry_zone | polygon | required | — | Polygon defining the monitored area |
| time_window_seconds | int | 600 | 60 - 3600 | Time window for counting re-entries |
| reentry_threshold | int | 3 | 2 - 10 | Min entry/exit cycles to trigger |
| cooldown_seconds | int | 300 | 0 - 3600 | Cooldown after alert |
| min_cycle_duration_seconds | int | 30 | 5 - 300 | Min duration of one cycle (prevents flicker) |
Severity: MEDIUM
Rationale: Repeated re-entry is suspicious behavior that warrants investigation but may also be legitimate work activity.
2.10 Suspicious Dwell Time
Description
Detects when a person remains near sensitive areas (doors, equipment, storage zones) for longer than a zone-specific threshold. Different from general loitering (Section 2.2) which applies anywhere.
Algorithm
Inputs:
- track_history: dict of track positions
- sensitive_zones: list of {
zone_id, polygon, dwell_threshold_seconds,
is_sensitive (bool), alert_severity
}
- detection_confidence: float
Processing:
1. For each active track_id:
a. Compute current centroid position
b. For each sensitive_zone:
i. If centroid is inside zone:
- Increment dwell timer for (track_id, zone_id)
- Track continuous presence (reset on exit)
ii. If dwell_timer >= zone.dwell_threshold_seconds:
→ Trigger SUSPICIOUS_DWELL event
iii. If track exits zone: reset dwell_timer for (track_id, zone_id)
2. Multi-zone handling:
- A track can have independent dwell timers for multiple zones
- Alert is generated per (track, zone) pair
3. Dwell event:
{
event_type: "SUSPICIOUS_DWELL",
track_id: <id>,
zone_id: <zone_id>,
dwell_duration_seconds: <float>,
zone_sensitivity_level: <string>,
severity: <zone-configurable>,
...
}
Dwell Timer Management
from shapely.geometry import Point

class SuspiciousDwellDetector:
    def __init__(self, sensitive_zones, max_gap_seconds=5.0):
        """
        sensitive_zones: list of zone configs with dwell thresholds
        (each zone carries a shapely polygon under 'polygon').
        max_gap_seconds: longest tracking gap tolerated before the timer resets.
        """
        self.zones = sensitive_zones
        self.max_gap_seconds = max_gap_seconds
        # dwell_timers: {(track_id, zone_id): {
        #     'start_time': float, 'accumulated': float, 'last_seen': float
        # }}
        self.dwell_timers = {}
        self.alerted_pairs = set()  # (track_id, zone_id) that already triggered

    def update(self, track_id, centroid, timestamp):
        events = []
        for zone in self.zones:
            zone_id = zone['zone_id']
            zone_poly = zone['polygon']
            timer_key = (track_id, zone_id)
            is_inside = zone_poly.contains(Point(centroid))
            if is_inside:
                if timer_key not in self.dwell_timers:
                    self.dwell_timers[timer_key] = {
                        'start_time': timestamp,
                        'accumulated': 0.0,
                        'last_seen': timestamp
                    }
                else:
                    # Accumulate dwell time; restart if the track was gone too long
                    dt = timestamp - self.dwell_timers[timer_key]['last_seen']
                    if dt < self.max_gap_seconds:  # Allow brief disappearance
                        self.dwell_timers[timer_key]['accumulated'] += dt
                    else:
                        self.dwell_timers[timer_key]['accumulated'] = 0.0
                        self.dwell_timers[timer_key]['start_time'] = timestamp
                    self.dwell_timers[timer_key]['last_seen'] = timestamp
# Check threshold
accumulated = self.dwell_timers[timer_key]['accumulated']
threshold = zone['dwell_threshold_seconds']
if accumulated >= threshold and timer_key not in self.alerted_pairs:
events.append({
'event_type': 'SUSPICIOUS_DWELL',
'track_id': track_id,
'zone_id': zone_id,
'dwell_duration': accumulated,
'zone_type': zone.get('zone_type', 'generic'),
'severity': zone.get('severity', 'MEDIUM')
})
self.alerted_pairs.add(timer_key)
else:
# Track exited zone — reset timer
if timer_key in self.dwell_timers:
del self.dwell_timers[timer_key]
if timer_key in self.alerted_pairs:
self.alerted_pairs.remove(timer_key)
return events
Configuration Parameters
| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| sensitive_zones | list | required | — | Zones with specific dwell thresholds |
| default_dwell_threshold_seconds | int | 120 | 10 - 1800 | Default threshold for zones without specific config |
| max_gap_seconds | float | 5.0 | 1.0 - 30.0 | Max disappearance gap before timer reset |
| zone_severity_map | dict | {} | — | Override severity per zone |
| cooldown_seconds | int | 180 | 0 - 3600 | Cooldown before re-alerting same track in same zone |
Predefined Sensitive Zone Types
| Zone Type | Default Threshold | Default Severity | Examples |
|---|---|---|---|
| main_entrance | 60s | MEDIUM | Lobby doors, revolving doors |
| emergency_exit | 30s | HIGH | Fire exits, emergency doors |
| equipment_room | 45s | HIGH | Server room, electrical panel |
| storage_area | 120s | MEDIUM | Warehouse shelves, supply closet |
| elevator_bank | 90s | LOW | Elevator waiting areas |
| parking_access | 60s | MEDIUM | Garage entry points |
Severity: MEDIUM (configurable per zone, up to HIGH)
Rationale: Dwell time near sensitive equipment or access points is context-dependent; severity scales with zone criticality.
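For implementation, the zone-type defaults in the table above can be expressed as a lookup table. The `resolve_zone_config` helper below is hypothetical — a sketch of merging an explicit zone config over its type defaults:

```python
ZONE_TYPE_DEFAULTS = {
    'main_entrance':  {'dwell_threshold_seconds': 60,  'severity': 'MEDIUM'},
    'emergency_exit': {'dwell_threshold_seconds': 30,  'severity': 'HIGH'},
    'equipment_room': {'dwell_threshold_seconds': 45,  'severity': 'HIGH'},
    'storage_area':   {'dwell_threshold_seconds': 120, 'severity': 'MEDIUM'},
    'elevator_bank':  {'dwell_threshold_seconds': 90,  'severity': 'LOW'},
    'parking_access': {'dwell_threshold_seconds': 60,  'severity': 'MEDIUM'},
}

def resolve_zone_config(zone: dict) -> dict:
    """Merge an explicit zone config over its zone_type defaults.

    Zones with an unknown/missing zone_type fall back to the module-wide
    defaults (120 s / MEDIUM, per the configuration table above)."""
    defaults = ZONE_TYPE_DEFAULTS.get(
        zone.get('zone_type', ''),
        {'dwell_threshold_seconds': 120, 'severity': 'MEDIUM'})
    return {**defaults, **{k: v for k, v in zone.items() if v is not None}}
```

Explicit per-zone values always win, so operators can tighten a single fire exit without redefining the whole type.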
3. Activity Scoring Engine
3.1 Composite Suspicious Activity Score
The Activity Scoring Engine combines signals from all detection modules into a single composite suspiciousness score per camera. This score enables:
- Unified threat assessment across all detection dimensions
- Escalation triggering when multiple lower-severity events converge
- Trend analysis for security dashboard visualization
- Intelligent alert suppression when overall threat is low despite individual triggers
3.2 Score Formula
S_total(t) = sum_over_modules(weight_i * signal_i(t) * decay(t - t_i)) + bonus_cross_module
Where:
- weight_i: module-specific weight (see table below)
- signal_i(t): normalized signal value from module i at time t [0, 1]
- decay(delta_t): exponential time-decay function
- bonus_cross_module: extra score when multiple modules fire simultaneously
- t_i: timestamp of most recent event from module i
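As a worked illustration, a minimal sketch of the weighted-decay sum (helper names here are illustrative; weights and the decay half-life follow the definitions in this section):

```python
import math

def time_decay(delta_t_seconds, half_life=300.0):
    # Exponential decay: the contribution halves every `half_life` seconds.
    return math.exp(-math.log(2) * delta_t_seconds / half_life)

def composite_score(events, now, weights, bonus=0.0):
    """events: list of (module, signal, timestamp). Signals are clipped at 1.0."""
    total = 0.0
    for module, signal, t in events:
        total += weights.get(module, 0.05) * min(signal, 1.0) * time_decay(now - t)
    return total + bonus

weights = {'INTRUSION': 0.25, 'LOITERING': 0.15}
# Intrusion signal 0.8 fired just now; loitering signal 1.0 fired 5 minutes ago.
events = [('INTRUSION', 0.8, 600.0), ('LOITERING', 1.0, 300.0)]
score = composite_score(events, now=600.0, weights=weights)
# 0.25*0.8*1.0 + 0.15*1.0*0.5 = 0.200 + 0.075 = 0.275
```

At a half-life of 300 s, the 5-minute-old loitering event contributes exactly half its full weight, which is the behavior the decay table below tabulates.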
3.3 Module Weights and Signal Definitions
| Module | Weight | Signal Source | Signal Range | Max Contribution |
|---|---|---|---|---|
| Intrusion Detection | 0.25 | overlap_ratio * confidence | 0.0 - 1.0 | 0.25 |
| Loitering Detection | 0.15 | dwell_ratio (dwell_time / threshold) | 0.0 - 1.0+ | 0.15 |
| Running Detection | 0.10 | speed_ratio (speed / threshold) normalized | 0.0 - 1.0+ | 0.10 |
| Crowding Detection | 0.12 | crowd_density_score | 0.0 - 1.0 | 0.12 |
| Fall Detection | 0.20 | fall_confidence_score | 0.0 - 1.0 | 0.20 |
| Abandoned Object | 0.18 | unattended_ratio (duration / threshold) | 0.0 - 1.0+ | 0.18 |
| After-Hours Presence | 0.05 | binary (1 if detected) * zone_severity_multiplier | 0.0 - 1.0 | 0.05 |
| Zone Breach | 0.12 | severity_mapped (LOW=0.3, MED=0.6, HIGH=1.0) | 0.0 - 1.0 | 0.12 |
| Re-entry Patterns | 0.10 | cycle_ratio (count / threshold) | 0.0 - 1.0+ | 0.10 |
| Suspicious Dwell | 0.13 | dwell_ratio (duration / zone_threshold) | 0.0 - 1.0+ | 0.13 |
Note: Weights sum to 1.40 — this is intentional to allow cross-module amplification. Normalized scores above 1.0 for individual modules are clipped at 1.0.
3.4 Time-Decay Function
import math

def time_decay(delta_t_seconds, half_life=300):
    """
    Exponential decay with a 5-minute half-life by default.
    Events older than 30 minutes contribute < 2%.
    """
    return math.exp(-math.log(2) * delta_t_seconds / half_life)
# Decay table:
# 0 min → 1.000
# 1 min → 0.871
# 5 min → 0.500
# 10 min → 0.250
# 20 min → 0.063
# 30 min → 0.016
3.5 Cross-Module Amplification Bonus
When multiple detection modules fire simultaneously for the same track or in close proximity, a cross-module bonus is applied:
def group_by_key(active_signals, key):
    """Group signal dicts by a shared key (e.g. 'track_id' or 'zone_id')."""
    groups = {}
    for sig in active_signals:
        value = sig.get(key)
        if value is not None:
            groups.setdefault(value, []).append(sig)
    return groups

def compute_cross_module_bonus(active_signals, proximity_weight=0.15):
    """
    Apply bonus when multiple modules detect simultaneously.
    active_signals: list of dicts with 'module', 'signal', 'track_id', 'zone_id'.
    """
    n_modules = len({sig['module'] for sig in active_signals})
    if n_modules <= 1:
        return 0.0
    # Base bonus: +15% per additional distinct module (beyond the first)
    base_bonus = proximity_weight * (n_modules - 1)
    # Track overlap bonus: same person triggering multiple rules → higher threat
    track_bonus = 0.0
    for track_id, signals in group_by_key(active_signals, 'track_id').items():
        if len(signals) >= 2:
            track_bonus += 0.10 * (len(signals) - 1)
    # Zone overlap bonus: multiple signals concentrated in the same zone
    zone_bonus = 0.0
    for zone_id, signals in group_by_key(active_signals, 'zone_id').items():
        if len(signals) >= 2:
            zone_bonus += 0.08 * (len(signals) - 1)
    return min(base_bonus + track_bonus + zone_bonus, 0.50)  # Cap at +0.50
3.6 Escalation Thresholds
| Score Range | Threat Level | Action |
|---|---|---|
| 0.00 - 0.20 | NONE | Log only, no alert |
| 0.20 - 0.40 | LOW | Log + dashboard indicator |
| 0.40 - 0.60 | MEDIUM | Log + alert dispatch (non-urgent) |
| 0.60 - 0.80 | HIGH | Log + immediate alert + highlight |
| 0.80 - 1.00 | CRITICAL | Log + immediate alert + SMS/email + security dispatch recommendation |
| ≥ 1.00 | EMERGENCY | All channels + automatic escalation to security team lead |
3.7 Score Computation Pseudocode
from collections import deque

class ActivityScoringEngine:
    def __init__(self, config):
        self.weights = config['module_weights']
        self.decay_half_life = config.get('decay_half_life_seconds', 300)
        self.event_history = deque(maxlen=1000)  # Recent events with timestamps
        self.score_history = deque(maxlen=3600)  # Score over the last hour
def compute_score(self, current_time, camera_id):
"""Compute composite suspicious activity score for a camera."""
# Get recent events for this camera (last 30 minutes)
recent_events = [
e for e in self.event_history
if e['camera_id'] == camera_id
and (current_time - e['timestamp']) < 1800
]
active_signals = []
weighted_sum = 0.0
for event in recent_events:
module = event['event_type']
weight = self.weights.get(module, 0.05)
signal = self._normalize_signal(event)
age = current_time - event['timestamp']
decay = time_decay(age, self.decay_half_life)
contribution = weight * signal * decay
weighted_sum += contribution
active_signals.append({
'module': module,
'signal': signal,
'track_id': event.get('track_id'),
'zone_id': event.get('zone_id'),
'decayed_contribution': contribution
})
# Cross-module bonus
cross_bonus = compute_cross_module_bonus(active_signals)
total_score = min(weighted_sum + cross_bonus, 1.5) # Cap at 1.5
# Store for history
self.score_history.append({
'timestamp': current_time,
'camera_id': camera_id,
'score': total_score,
'components': active_signals
})
return total_score, self._classify_threat(total_score)
def _normalize_signal(self, event):
"""Normalize raw event data to [0, 1] signal value."""
event_type = event['event_type']
normalizers = {
'INTRUSION': lambda e: min(e.get('overlap_ratio', 0) * e.get('confidence', 0) / 0.65, 1.0),
'LOITERING': lambda e: min(e.get('dwell_time_seconds', 0) / e.get('threshold', 300), 1.0),
'RUNNING': lambda e: min(e.get('speed_percentile_90', 0) / e.get('threshold', 150), 1.0),
'CROWDING': lambda e: min(e.get('density_score', 0) / 0.5, 1.0),
'FALL': lambda e: e.get('fall_confidence', 0),
'ABANDONED_OBJECT': lambda e: min(e.get('unattended_duration', 0) / e.get('threshold', 60), 1.0),
'AFTER_HOURS_PRESENCE': lambda e: 1.0 if e.get('is_unauthorized', True) else 0.3,
'ZONE_BREACH': lambda e: {'LOW': 0.3, 'MEDIUM': 0.6, 'HIGH': 1.0}.get(e.get('severity', 'MEDIUM'), 0.6),
'REENTRY_PATTERN': lambda e: min(e.get('cycle_count', 0) / e.get('threshold', 3), 1.0),
'SUSPICIOUS_DWELL': lambda e: min(e.get('dwell_duration', 0) / e.get('threshold', 120), 1.0)
}
normalizer = normalizers.get(event_type, lambda e: 0.5)
return normalizer(event)
def _classify_threat(self, score):
if score >= 1.0: return 'EMERGENCY'
if score >= 0.80: return 'CRITICAL'
if score >= 0.60: return 'HIGH'
if score >= 0.40: return 'MEDIUM'
if score >= 0.20: return 'LOW'
return 'NONE'
3.8 Score Visualization (Per Camera)
The score is exposed as a time series for dashboard visualization:
- Current Score: Real-time value (updated every second)
- 5-Minute Average: Smoothed trend
- Peak Score: Maximum in last hour
- Alert Count: Number of alerts generated in current shift
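The dashboard aggregates can be derived from the `score_history` entries kept by the scoring engine; a minimal sketch, assuming the entry shape stored in Section 3.7's pseudocode (`{'timestamp', 'camera_id', 'score', ...}`):

```python
def rolling_average(score_history, camera_id, now, window_seconds=300):
    """Mean composite score over the trailing window for one camera."""
    recent = [s['score'] for s in score_history
              if s['camera_id'] == camera_id
              and 0 <= now - s['timestamp'] <= window_seconds]
    return sum(recent) / len(recent) if recent else 0.0

def peak_score(score_history, camera_id, now, window_seconds=3600):
    """Maximum composite score over the trailing window (0.0 if no samples)."""
    recent = [s['score'] for s in score_history
              if s['camera_id'] == camera_id
              and 0 <= now - s['timestamp'] <= window_seconds]
    return max(recent, default=0.0)
```

Because `score_history` is bounded (a deque of 3600 entries at 1 Hz), a linear scan per dashboard refresh is cheap; no separate time-series store is required for the hour-scale views.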
4. Configuration Schema
4.1 Full Configuration Schema (YAML)
# ============================================================
# Suspicious Activity Detection — Master Configuration
# ============================================================
system:
name: "Night-Mode Surveillance"
version: "1.0"
max_channels: 8
processing_fps: 25
frame_resolution: [1920, 1080]
night_mode_schedule:
enabled: true
start_time: "22:00"
end_time: "06:00"
timezone: "local"
gradual_transition_minutes: 15 # Ramp sensitivity over 15 min
override_manual: false # Admin can manually toggle
# ============================================================
# Activity Scoring Engine
# ============================================================
scoring_engine:
decay_half_life_seconds: 300
max_history_minutes: 30
module_weights:
INTRUSION: 0.25
LOITERING: 0.15
RUNNING: 0.10
CROWDING: 0.12
FALL: 0.20
ABANDONED_OBJECT: 0.18
AFTER_HOURS_PRESENCE: 0.05
ZONE_BREACH: 0.12
REENTRY_PATTERN: 0.10
SUSPICIOUS_DWELL: 0.13
escalation_thresholds:
NONE: 0.00
LOW: 0.20
MEDIUM: 0.40
HIGH: 0.60
CRITICAL: 0.80
EMERGENCY: 1.00
cross_module_bonus:
enabled: true
proximity_weight: 0.15
max_bonus: 0.50
# ============================================================
# Alert Generation
# ============================================================
alert_manager:
suppression_enabled: true
default_suppression_minutes: 5
max_alerts_per_hour_per_camera: 20
evidence_capture:
snapshot_enabled: true
snapshot_frames_before: 5
snapshot_frames_after: 10
clip_enabled: true
clip_duration_seconds: 10
clip_pre_buffer_seconds: 5
severity_actions:
LOW:
log: true
dashboard: true
notification: false
email: false
sms: false
MEDIUM:
log: true
dashboard: true
notification: true
email: false
sms: false
HIGH:
log: true
dashboard: true
notification: true
email: true
sms: true
CRITICAL:
log: true
dashboard: true
notification: true
email: true
sms: true
auto_dispatch: true
# ============================================================
# Per-Camera Configuration Template
# ============================================================
cameras:
# Camera 1 — Main Entrance
cam_01:
enabled: true
location: "Main Entrance Lobby"
night_mode:
enabled: true
custom_schedule: null # Use system default
sensitivity_multiplier: 1.0
# Restricted Zones (Intrusion Detection)
intrusion_detection:
enabled: true
confidence_threshold: 0.65
overlap_threshold: 0.30
cooldown_seconds: 30
restricted_zones:
- zone_id: "server_room_door"
name: "Server Room Entry"
polygon:
- [0.65, 0.20]
- [0.85, 0.20]
- [0.85, 0.60]
- [0.65, 0.60]
severity: "HIGH"
- zone_id: "admin_office"
name: "Administration Office"
polygon:
- [0.10, 0.30]
- [0.40, 0.30]
- [0.40, 0.80]
- [0.10, 0.80]
severity: "HIGH"
# Loitering Detection
loitering_detection:
enabled: true
dwell_time_threshold_seconds: 300
movement_tolerance_pixels: 50
loitering_zones: [] # Empty = entire frame
consecutive_confirmations: 3
cooldown_seconds: 60
# Running Detection
running_detection:
enabled: true
speed_threshold_pixels_per_second: 150
speed_threshold_kmh: 15.0
confirmation_duration_seconds: 2.0
speed_percentile: 90
min_track_history_seconds: 1.0
exclude_falling_state: true
calibration:
pixels_per_meter_at_center: 45.0
homography_matrix: null # Set if calibrated
# Crowding Detection
crowding_detection:
enabled: true
count_threshold: 3
area_threshold: 0.15
density_threshold: 0.05
confirmation_frames: 5
use_dbscan: true
dbscan_eps: 0.08
cooldown_seconds: 60
# Fall Detection
fall_detection:
enabled: true
fall_score_threshold: 0.75
min_keypoint_confidence: 0.30
torso_angle_threshold_deg: 45
aspect_ratio_threshold: 1.2
height_ratio_threshold: 0.6
temporal_confirmation_ms: 1000
bending_suppression: true
# Abandoned Object Detection
abandoned_object_detection:
enabled: true
unattended_time_threshold_seconds: 60
proximity_threshold_pixels: 100
watchlist_classes: ["backpack", "suitcase", "box", "bag", "package"]
fg_ratio_threshold: 0.3
bg_history_frames: 500
bg_learning_rate: 0.005
stationary_threshold_pixels: 10
# After-Hours Presence
after_hours_presence:
enabled: true
detection_confidence_threshold: 0.60
min_detection_frames: 10
check_authorized_personnel: false
escalate_in_restricted_zones: true
alert_per_track: true
cooldown_per_zone_seconds: 300
# Zone Breach
zone_breach:
enabled: true
boundary_lines:
- line_id: "lobby_entry"
name: "Lobby Entry Line"
point_a: [0.0, 0.5]
point_b: [1.0, 0.5]
allowed_direction: "both"
trigger_on: "cross"
severity: "MEDIUM"
cooldown_seconds: 30
- line_id: "secure_corridor"
name: "Secure Corridor Entry"
point_a: [0.5, 0.0]
point_b: [0.5, 1.0]
allowed_direction: "b_to_a" # Only flag entering from B side
trigger_on: "cross"
severity: "HIGH"
cooldown_seconds: 60
# Repeated Re-entry
reentry_detection:
enabled: true
reentry_zone:
zone_id: "lobby_area"
polygon:
- [0.0, 0.0]
- [1.0, 0.0]
- [1.0, 1.0]
- [0.0, 1.0]
time_window_seconds: 600
reentry_threshold: 3
cooldown_seconds: 300
min_cycle_duration_seconds: 30
# Suspicious Dwell Time
suspicious_dwell:
enabled: true
sensitive_zones:
- zone_id: "main_door"
zone_type: "main_entrance"
dwell_threshold_seconds: 60
severity: "MEDIUM"
polygon:
- [0.40, 0.10]
- [0.60, 0.10]
- [0.60, 0.40]
- [0.40, 0.40]
- zone_id: "equipment_panel"
zone_type: "equipment_room"
dwell_threshold_seconds: 45
severity: "HIGH"
polygon:
- [0.70, 0.50]
- [0.90, 0.50]
- [0.90, 0.80]
- [0.70, 0.80]
max_gap_seconds: 5.0
# Camera 2 — Warehouse Floor
cam_02:
enabled: true
location: "Warehouse Floor North"
intrusion_detection:
enabled: true
confidence_threshold: 0.60 # Slightly lower for wide area
overlap_threshold: 0.25
restricted_zones:
- zone_id: "high_value_storage"
polygon:
- [0.20, 0.20]
- [0.60, 0.20]
- [0.60, 0.70]
- [0.20, 0.70]
severity: "HIGH"
# ... (same structure, camera-specific values)
# Cameras 3-8 follow same pattern
4.2 JSON Schema for API Configuration
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SuspiciousActivityConfig",
"type": "object",
"properties": {
"system": {
"type": "object",
"properties": {
"night_mode_schedule": {
"type": "object",
"properties": {
"enabled": {"type": "boolean", "default": true},
"start_time": {"type": "string", "pattern": "^([01]?[0-9]|2[0-3]):[0-5][0-9]$", "default": "22:00"},
"end_time": {"type": "string", "pattern": "^([01]?[0-9]|2[0-3]):[0-5][0-9]$", "default": "06:00"},
"gradual_transition_minutes": {"type": "integer", "minimum": 0, "maximum": 60, "default": 15}
}
}
}
},
"cameras": {
"type": "object",
"patternProperties": {
"^cam_[0-9]+$": {
"type": "object",
"properties": {
"enabled": {"type": "boolean", "default": true},
"intrusion_detection": {"type": "object", "properties": {
"enabled": {"type": "boolean", "default": true},
"confidence_threshold": {"type": "number", "minimum": 0, "maximum": 1, "default": 0.65},
"overlap_threshold": {"type": "number", "minimum": 0, "maximum": 1, "default": 0.30},
"restricted_zones": {
"type": "array",
"items": {
"type": "object",
"properties": {
"zone_id": {"type": "string"},
"polygon": {
"type": "array",
"items": {"type": "array", "items": {"type": "number"}, "minItems": 2, "maxItems": 2},
"minItems": 3
}
},
"required": ["zone_id", "polygon"]
}
}
}},
"loitering_detection": {"type": "object", "properties": {
"enabled": {"type": "boolean", "default": true},
"dwell_time_threshold_seconds": {"type": "integer", "minimum": 30, "maximum": 3600, "default": 300}
}},
"fall_detection": {"type": "object", "properties": {
"enabled": {"type": "boolean", "default": true},
"fall_score_threshold": {"type": "number", "minimum": 0, "maximum": 1, "default": 0.75}
}},
"abandoned_object_detection": {"type": "object", "properties": {
"enabled": {"type": "boolean", "default": true},
"unattended_time_threshold_seconds": {"type": "integer", "minimum": 10, "maximum": 600, "default": 60}
}},
"zone_breach": {"type": "object", "properties": {
"enabled": {"type": "boolean", "default": true},
"boundary_lines": {
"type": "array",
"items": {
"type": "object",
"properties": {
"line_id": {"type": "string"},
"point_a": {"type": "array", "items": {"type": "number"}, "minItems": 2, "maxItems": 2},
"point_b": {"type": "array", "items": {"type": "number"}, "minItems": 2, "maxItems": 2},
"allowed_direction": {"type": "string", "enum": ["both", "a_to_b", "b_to_a"], "default": "both"},
"severity": {"type": "string", "enum": ["LOW", "MEDIUM", "HIGH"], "default": "MEDIUM"}
},
"required": ["line_id", "point_a", "point_b"]
}
}
}}
}
}
}
}
}
}
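Configurations should be validated against this schema at load time. In production a full Draft-07 validator (e.g. the third-party `jsonschema` package) would be used; below is a stdlib-only sketch that spot-checks a few of the constraints above (time pattern, camera key pattern, polygon `minItems`):

```python
import re

TIME_RE = re.compile(r"^([01]?[0-9]|2[0-3]):[0-5][0-9]$")  # Same pattern as the schema

def validate_camera_config(cfg: dict) -> list[str]:
    """Spot-check a few schema constraints; returns human-readable errors."""
    errors = []
    sched = cfg.get('system', {}).get('night_mode_schedule', {})
    for key in ('start_time', 'end_time'):
        value = sched.get(key)
        if value is not None and not TIME_RE.match(value):
            errors.append(f"night_mode_schedule.{key}: '{value}' is not HH:MM")
    for cam_id, cam in cfg.get('cameras', {}).items():
        if not re.match(r"^cam_[0-9]+$", cam_id):
            errors.append(f"camera key '{cam_id}' does not match ^cam_[0-9]+$")
        zones = cam.get('intrusion_detection', {}).get('restricted_zones', [])
        for zone in zones:
            if len(zone.get('polygon', [])) < 3:
                errors.append(
                    f"{cam_id}: zone '{zone.get('zone_id')}' polygon needs >= 3 points")
    return errors
```

Rejecting a bad config at startup (empty error list means valid) is far cheaper than debugging a camera that silently runs with a degenerate two-point "polygon".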
4.3 Configuration Management API
class ConfigurationManager:
"""
Runtime configuration management for suspicious activity detection.
Supports per-camera, per-rule configuration with hot-reload.
"""
def get_camera_config(self, camera_id: str) -> CameraConfig:
"""Retrieve full configuration for a camera."""
def update_rule_config(self, camera_id: str, rule_name: str, config: dict) -> bool:
"""Update configuration for a specific rule on a camera."""
def add_restricted_zone(self, camera_id: str, zone: ZoneConfig) -> str:
"""Add a new restricted zone to a camera. Returns zone_id."""
def remove_restricted_zone(self, camera_id: str, zone_id: str) -> bool:
"""Remove a restricted zone from a camera."""
def add_boundary_line(self, camera_id: str, line: BoundaryLineConfig) -> str:
"""Add a new boundary line for zone breach detection."""
def toggle_rule(self, camera_id: str, rule_name: str, enabled: bool) -> bool:
"""Enable or disable a detection rule for a camera."""
def reload_config(self) -> bool:
"""Hot-reload configuration from file without restart."""
def export_config(self) -> dict:
"""Export full configuration for backup/transfer."""
def validate_config(self, config: dict) -> list[ValidationError]:
"""Validate configuration and return list of errors."""
5. Alert Generation Logic
5.1 Alert Lifecycle
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ DETECTED │────▶│ SUPPRESSED │────▶│ EVIDENCE │────▶│ DISPATCHED │
│ (Rule fire)│ │ (Deduplic.) │ │ (Capture) │ │ (Send alert)│
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘
│
▼
┌──────────────┐
│ ACKNOWLEDGE │
│ or AUTO │
└──────────────┘
5.2 When to Alert vs Log
| Condition | Action | Reason |
|---|---|---|
| Detection confidence < rule minimum | Log only | Insufficient evidence |
| Threat score < LOW threshold (0.2) | Log only | Below alert threshold |
| Duplicate alert within suppression window | Log + increment counter | Prevent spam |
| Exceeded max alerts/hour for camera | Log + rate-limit flag | Prevent overflow |
| Severity LOW and no escalation | Log + dashboard | Reduce noise |
| Severity MEDIUM or higher | Full alert pipeline | Actionable event |
| Multiple concurrent detections (score spike) | Immediate alert | Convergent threat |
5.3 Suppression Rules
import time
from datetime import datetime

class AlertSuppressor:
    def __init__(self, scoring_engine=None):
        # suppression_log: {(camera_id, rule_type, track_id, zone_id): last_alert_time}
        self.suppression_log = {}
        self.hourly_counters = {}  # (camera_id, hour): count
        self.scoring_engine = scoring_engine  # Injected for composite-score gating

    def should_suppress(self, event) -> tuple[bool, str]:
        """
        Check if an event should be suppressed.
        Returns: (is_suppressed, reason)
        """
        camera_id = event['camera_id']
        rule_type = event['event_type']
        track_id = event.get('track_id', '*')
        zone_id = event.get('zone_id', '*')
        # 1. Rule-specific cooldown
        suppression_key = (camera_id, rule_type, track_id, zone_id)
        cooldown = self._get_cooldown_seconds(rule_type, camera_id)
        if suppression_key in self.suppression_log:
            elapsed = time.time() - self.suppression_log[suppression_key]
            if elapsed < cooldown:
                return True, f"COOLDOWN: {elapsed:.0f}s / {cooldown}s"
        # 2. Hourly rate limit
        hour_key = (camera_id, datetime.now().hour)
        current_count = self.hourly_counters.get(hour_key, 0)
        max_per_hour = self._get_max_alerts_per_hour(camera_id)
        if current_count >= max_per_hour:
            return True, f"RATE_LIMIT: {current_count}/{max_per_hour} this hour"
        # 3. Composite score gating (don't alert if overall threat is low)
        if event.get('severity') == 'LOW' and self.scoring_engine is not None:
            current_score = self.scoring_engine.get_current_score(camera_id)
            if current_score < 0.20:
                return True, f"LOW_SEVERITY_AND_LOW_SCORE: {current_score:.2f}"
        return False, "PASS"
def record_alert(self, event):
"""Record that an alert was generated."""
suppression_key = (event['camera_id'], event['event_type'],
event.get('track_id', '*'), event.get('zone_id', '*'))
self.suppression_log[suppression_key] = time.time()
hour_key = (event['camera_id'], datetime.now().hour)
self.hourly_counters[hour_key] = self.hourly_counters.get(hour_key, 0) + 1
5.4 Severity Assignment
def assign_alert_severity(detection_event, context):
"""
Assign final severity considering all context.
"""
base_severity = detection_event['severity']
score = context.get('current_composite_score', 0)
severity_levels = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4}
base_level = severity_levels.get(base_severity, 2)
# Escalation conditions
if score >= 0.80 and base_level < 3:
base_level = min(base_level + 1, 4) # Bump up one level
# Multiple concurrent detections for same track
if context.get('concurrent_detections_count', 0) >= 2:
base_level = min(base_level + 1, 4)
# Zone-specific escalation
if detection_event.get('zone_severity_override'):
zone_level = severity_levels.get(detection_event['zone_severity_override'], base_level)
base_level = max(base_level, zone_level)
reverse_levels = {v: k for k, v in severity_levels.items()}
return reverse_levels.get(base_level, 'MEDIUM')
5.5 Evidence Attachment
Every alert includes visual evidence:
class EvidenceCapture:
def __init__(self, config):
self.frame_buffer = {} # camera_id → ring buffer of recent frames
self.config = config
def capture_evidence(self, event, camera_id):
"""Capture snapshot and clip for an alert event."""
evidence = {}
# Snapshot: annotated frame at event moment
if self.config['snapshot_enabled']:
frame = self.get_frame_with_annotations(camera_id, event)
snapshot_path = self.save_snapshot(frame, event)
evidence['snapshot_path'] = snapshot_path
# Clip: video segment around event
if self.config['clip_enabled']:
pre = self.config['clip_pre_buffer_seconds']
post = self.config['clip_duration_seconds'] - pre
clip_path = self.save_clip(camera_id, event['timestamp'], pre, post, event)
evidence['clip_path'] = clip_path
# Metadata
evidence['metadata'] = {
'camera_id': camera_id,
'timestamp': event['timestamp'],
'event_type': event['event_type'],
'track_ids': event.get('track_ids', [event.get('track_id')]),
'confidence': event.get('confidence', event.get('detection_confidence', 0)),
'annotated_bboxes': self._get_annotations(event)
}
return evidence
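The per-camera frame ring buffer referenced in `__init__` can be sketched with `collections.deque`; the sizing here is an assumed example (5 s of pre-event footage at the system's 25 fps):

```python
from collections import deque

class FrameRingBuffer:
    """Keeps the last N frames per camera so pre-event evidence is available."""
    def __init__(self, fps=25, pre_buffer_seconds=5):
        self.maxlen = fps * pre_buffer_seconds  # 125 frames by default
        self.buffers = {}  # camera_id → deque of (timestamp, frame)

    def push(self, camera_id, timestamp, frame):
        """Append a frame; deque(maxlen=...) drops the oldest automatically."""
        buf = self.buffers.setdefault(camera_id, deque(maxlen=self.maxlen))
        buf.append((timestamp, frame))

    def frames_since(self, camera_id, since_ts):
        """Frames at or after since_ts (oldest first), for clip assembly."""
        return [(t, f) for t, f in self.buffers.get(camera_id, ()) if t >= since_ts]
```

The bounded deque caps memory per camera regardless of uptime, and appends are O(1), so the buffer adds negligible load to the per-frame pipeline.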
5.6 Alert Dispatch Format
{
  "alert_id": "alert_20240115_033045_cam01_001",
  "timestamp": "2024-01-15T03:30:45.123Z",
  "camera_id": "cam_01",
  "camera_location": "Main Entrance Lobby",
  "event_type": "INTRUSION",
  "event_subtype": "restricted_zone_entry",
  "severity": "HIGH",
  "threat_score": 0.72,
  "threat_level": "HIGH",
  "description": "Person detected in restricted zone 'server_room_door' with 68% overlap",
  "detected_objects": [
    {
      "track_id": 42,
      "class": "person",
      "confidence": 0.89,
      "bbox": [820, 310, 950, 580],
      "zone_overlap_ratio": 0.68,
      "zone_id": "server_room_door"
    }
  ],
  "evidence": {
    "snapshot_url": "/evidence/snapshots/alert_20240115_033045_cam01_001.jpg",
    "clip_url": "/evidence/clips/alert_20240115_033045_cam01_001.mp4",
    "metadata": {
      "frame_number": 452310,
      "processing_latency_ms": 145
    }
  },
  "recommendation": "Dispatch security personnel to investigate server room entry",
  "suppression_info": {
    "cooldown_remaining_seconds": 0,
    "alerts_this_hour": 3,
    "hourly_limit": 20
  }
}
6. Night Mode Scheduler
6.1 Activation Logic
from datetime import datetime, timedelta

class NightModeScheduler:
    """
    Manages automatic activation/deactivation of suspicious activity detection
    during night hours with gradual sensitivity transition.
    """
    def __init__(self, config):
        self.start_time = self._parse_time(config['start_time'])  # default 22:00
        self.end_time = self._parse_time(config['end_time'])      # default 06:00
        self.transition_minutes = config.get('gradual_transition_minutes', 15)
        self.override_state = None  # True=force on, False=force off, None=auto

    def is_night_mode(self, now=None) -> tuple[bool, float]:
        """
        Check if night mode should be active.
        Returns: (is_active, sensitivity_multiplier)
        """
        if now is None:
            now = datetime.now()
        # Manual override takes precedence
        if self.override_state is not None:
            return self.override_state, 1.0
        current_time = now.time()
        # Night window: 22:00 - 06:00 (spans midnight)
        if self.start_time <= self.end_time:
            in_window = self.start_time <= current_time <= self.end_time
        else:
            in_window = current_time >= self.start_time or current_time <= self.end_time
        if not in_window:
            return False, 0.0
        # Gradual sensitivity transition
        sensitivity = self._compute_transition_sensitivity(current_time)
        return True, sensitivity

    def _compute_transition_sensitivity(self, current_time) -> float:
        """
        Compute sensitivity multiplier with ramp-up/ramp-down.
        During transition periods, sensitivity gradually increases/decreases.
        """
        now = datetime.combine(datetime.today(), current_time)
        # Ramp-up at start of night window
        start_dt = datetime.combine(datetime.today(), self.start_time)
        ramp_end = start_dt + timedelta(minutes=self.transition_minutes)
        if start_dt <= now <= ramp_end:
            elapsed = (now - start_dt).total_seconds()
            return 0.3 + 0.7 * (elapsed / (self.transition_minutes * 60))
        # Ramp-down at end of night window
        end_dt = datetime.combine(datetime.today(), self.end_time)
        ramp_start = end_dt - timedelta(minutes=self.transition_minutes)
        if ramp_start <= now <= end_dt:
            remaining = (end_dt - now).total_seconds()
            return 0.3 + 0.7 * (remaining / (self.transition_minutes * 60))
        return 1.0  # Full sensitivity during core night hours

    def force_override(self, state: bool):
        """Manually override night mode. state=True forces ON, False forces OFF."""
        self.override_state = state

    def release_override(self):
        """Return to automatic scheduling."""
        self.override_state = None
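The two non-obvious pieces of the scheduler are the midnight-spanning window test and the linear 0.3 → 1.0 ramp. A standalone sketch, assuming the default 22:00–06:00 window and 15-minute transition (ramp-down at the end of the window is symmetric and omitted for brevity):

```python
from datetime import datetime, time, timedelta

START, END = time(22, 0), time(6, 0)
RAMP_MIN = 15

def in_night_window(t: time) -> bool:
    # A window spanning midnight is the union of two ranges
    if START <= END:
        return START <= t <= END
    return t >= START or t <= END

def sensitivity_at(t: time) -> float:
    if not in_night_window(t):
        return 0.0
    now = datetime.combine(datetime.today(), t)
    start = datetime.combine(datetime.today(), START)
    # Linear ramp from 0.3 to 1.0 over the first RAMP_MIN minutes
    if start <= now <= start + timedelta(minutes=RAMP_MIN):
        return 0.3 + 0.7 * (now - start).total_seconds() / (RAMP_MIN * 60)
    return 1.0

# 22:06 is 6 minutes into the 15-minute ramp: 0.3 + 0.7 * (6/15) = 0.58
```

At 02:00 the ramp has long finished, so sensitivity is the full 1.0; at noon the window test short-circuits to 0.0.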
6.2 Transition Timeline
21:45 ─┬─ Pre-night: Normal daytime mode (sensitivity 0.0)
│
22:00 ─┼─ Night mode START → Ramp-up begins
│ Sensitivity: 0.30 → 0.50 → 0.70 → 1.0
│
22:15 ─┼─ Ramp-up complete → Full night sensitivity (1.0x)
│
│ [Full night mode active — all detection modules at configured sensitivity]
│
05:45 ─┼─ Ramp-down begins → Sensitivity: 1.0 → 0.70 → 0.50 → 0.30
│
06:00 ─┼─ Night mode END → Return to daytime mode (sensitivity 0.0)
│
└─ Normal daytime mode
6.3 Per-Camera Override
# Per-camera night mode overrides
cameras:
  cam_01:
    night_mode:
      enabled: true
      custom_schedule:
        start_time: "21:00"         # Earlier for this camera
        end_time: "07:00"           # Later for this camera
      sensitivity_multiplier: 1.2   # 20% more sensitive
  cam_05:
    night_mode:
      enabled: true
      custom_schedule: null         # Use system default
      sensitivity_multiplier: 0.8   # 20% less sensitive (e.g., always-lit area)
  cam_07:
    night_mode:
      enabled: false                # Always in "day" mode (24h staffed area)
6.4 Scheduler Integration with Detection Pipeline
import time
from copy import deepcopy

class NightModePipelineAdapter:
    """
    Adapts detection module sensitivity based on night mode state.
    """
    def __init__(self, scheduler: NightModeScheduler, config_manager):
        self.scheduler = scheduler
        self.config_manager = config_manager
        self.current_state = {}  # camera_id → {is_night, sensitivity, ...}

    def apply_night_mode(self, camera_id):
        """
        Called each frame to apply night mode adjustments.
        Returns modified detection thresholds for this frame.
        """
        is_night, sensitivity = self.scheduler.is_night_mode()
        base_config = self.config_manager.get_camera_config(camera_id)
        camera_sensitivity = base_config.get('sensitivity_multiplier', 1.0)
        effective_sensitivity = sensitivity * camera_sensitivity
        # Adjust thresholds inversely to sensitivity:
        # higher sensitivity → lower thresholds
        threshold_multiplier = 1.0 / max(effective_sensitivity, 0.1)
        adjusted_config = self._scale_thresholds(base_config, threshold_multiplier)
        self.current_state[camera_id] = {
            'is_night': is_night,
            'sensitivity': effective_sensitivity,
            'threshold_multiplier': threshold_multiplier,
            'timestamp': time.time()
        }
        return adjusted_config

    def _scale_thresholds(self, config, multiplier):
        """Scale all threshold parameters by the given multiplier."""
        adjusted = deepcopy(config)
        threshold_fields = [
            'confidence_threshold',
            'overlap_threshold',
            'fall_score_threshold',
            'fg_ratio_threshold',
            'detection_confidence_threshold'
        ]
        for rule_name, rule_config in adjusted.items():
            if isinstance(rule_config, dict):
                for field in threshold_fields:
                    if field in rule_config:
                        rule_config[field] = min(rule_config[field] * multiplier, 1.0)
                # Time-based thresholds scale the same way: higher sensitivity
                # shortens dwell/confirmation windows so alerts fire sooner.
                # Clamp to at least 1 second.
                for field in ['dwell_time_threshold_seconds',
                              'unattended_time_threshold_seconds',
                              'confirmation_duration_seconds']:
                    if field in rule_config:
                        rule_config[field] = max(1, int(rule_config[field] * multiplier))
        return adjusted
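A standalone numeric sketch of the inverse scaling (illustrative values; here both the detection threshold and the time window scale by the same multiplier, so raising sensitivity makes alerts fire sooner on both axes):

```python
def scale_example(sensitivity: float) -> dict:
    """Scale one confidence threshold and one dwell window by 1/sensitivity."""
    m = 1.0 / max(sensitivity, 0.1)   # same floor as the adapter above
    return {
        'confidence_threshold': min(0.5 * m, 1.0),
        'dwell_time_threshold_seconds': max(1, int(300 * m)),
    }

day = scale_example(1.0)
night = scale_example(1.2)   # thresholds drop ≈17%: alerts fire sooner
```

At sensitivity 1.2 the multiplier is 1/1.2 ≈ 0.833, so a 0.5 confidence threshold drops to ≈0.417 and a 300 s dwell window shrinks to 250 s; at sensitivity 1.0 both values pass through unchanged.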
7. Pipeline Integration
7.1 Integration Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ MAIN AI PIPELINE │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Frame Input │───▶│ Preprocessing│───▶│ Shared Inference │ │
│ │ (8 chans) │ │ (resize, │ │ (YOLOv8 + Track) │ │
│ │ │ │ night enh.) │ │ │ │
│ └─────────────┘ └──────────────┘ └────────┬─────────┘ │
│ │ │
│ ┌────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ SHARED DETECTION OUTPUTS │ │
│ │ ├── Person detections: [{bbox, confidence, class, track_id}] │ │
│ │ ├── Object detections: [{bbox, confidence, class}] │ │
│ │ ├── Track states: {track_id → {centroid, velocity, history}} │ │
│ │ ├── Pose keypoints: {track_id → [17x3 keypoints]} (optional) │ │
│ │ └── Frame metadata: {timestamp, camera_id, frame_number} │ │
│ └────────────────────────────────┬─────────────────────────────────────┘ │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────┐ ┌──────────┐ ┌────────────────────┐ │
│ │ SUSPICIOUS ACTIVITY │ │ WATCHLIST│ │ OTHER MODULES │ │
│ │ ANALYSIS LAYER │ │ MATCHING │ │ (counting, etc.) │ │
│ │ │ │ │ │ │ │
│ │ 10 Detection Rules │ │ Face/ │ │ │ │
│ │ Scoring Engine │ │ Person │ │ │ │
│ │ Alert Manager │ │ Matching │ │ │ │
│ └──────────┬───────────┘ └──────────┘ └────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ ALERT BUS / DISPATCH │ │
│ │ All alerts from all modules are normalized and routed to channels │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
7.2 Shared Detection Outputs
The suspicious activity layer consumes the following shared outputs from the base AI pipeline:
| Output | Source | Format | Used By |
|---|---|---|---|
| Person bounding boxes | YOLOv8 detection | [[x1,y1,x2,y2], confidence, track_id] | All 10 modules |
| Object bounding boxes | YOLOv8 detection | [[x1,y1,x2,y2], confidence, class] | Abandoned object, intrusion |
| Multi-object tracks | ByteTrack | {track_id: {centroid, bbox, history}} | Loitering, running, re-entry, dwell, zone breach |
| Pose keypoints | YOLOv8-pose | (17, 3) per person | Fall detection |
| Background mask | MOG2 subtractor | HxW binary mask | Abandoned object |
7.3 Detection Module Interface
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum

class Severity(Enum):
    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    CRITICAL = "CRITICAL"

@dataclass
class DetectionEvent:
    event_type: str
    camera_id: str
    timestamp: float
    severity: Severity
    confidence: float
    track_id: Optional[int] = None
    zone_id: Optional[str] = None
    bbox: Optional[List[float]] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class SharedDetections:
    """Input from base AI pipeline — shared across all modules."""
    camera_id: str
    timestamp: float
    frame: Any                      # numpy array
    person_detections: List[Dict]   # [{bbox, confidence, track_id, class}]
    object_detections: List[Dict]   # [{bbox, confidence, class}]
    track_states: Dict[int, Dict]   # {track_id: {centroid, velocity, history}}
    pose_keypoints: Dict[int, Any]  # {track_id: (17, 3) array}
    background_mask: Optional[Any] = None

class DetectionModule(ABC):
    """Abstract base class for all suspicious activity detection modules."""
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.enabled = config.get('enabled', True)
        self.module_name = self.__class__.__name__

    @abstractmethod
    def process(self, detections: SharedDetections) -> List[DetectionEvent]:
        """
        Process shared detections and return a list of detection events.
        Called every frame for each active camera.
        """

    @abstractmethod
    def get_config_schema(self) -> Dict[str, Any]:
        """Return the configuration schema for this module."""

    def is_enabled(self) -> bool:
        return self.enabled

    def set_enabled(self, enabled: bool):
        self.enabled = enabled
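A concrete module against this interface can be very small. The sketch below uses reduced stand-ins for `DetectionEvent` and `DetectionModule` so it runs self-contained; `CrowdCountModule` and its `max_persons` key are hypothetical examples, not one of the ten shipped modules.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class Event:                      # reduced stand-in for DetectionEvent
    event_type: str
    camera_id: str
    confidence: float

class Module(ABC):                # reduced stand-in for DetectionModule
    def __init__(self, config: Dict):
        self.config = config

    @abstractmethod
    def process(self, detections: Dict) -> List[Event]:
        ...

class CrowdCountModule(Module):
    """Toy module: emit one event when the per-frame person count is high."""
    def process(self, detections: Dict) -> List[Event]:
        persons = detections['person_detections']
        if len(persons) >= self.config.get('max_persons', 3):
            # Report the weakest detection confidence as the event confidence
            return [Event('CROWDING', detections['camera_id'],
                          min(p['confidence'] for p in persons))]
        return []

mod = CrowdCountModule({'max_persons': 2})
events = mod.process({'camera_id': 'cam_01',
                      'person_detections': [{'confidence': 0.9},
                                            {'confidence': 0.8}]})
```

Because `process` is stateless here, the orchestrator can call it every frame with no warm-up; stateful modules (loitering, dwell) keep per-track history in instance attributes instead.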
7.4 Pipeline Orchestrator
import time
import logging

logger = logging.getLogger(__name__)

class SuspiciousActivityPipeline:
    """
    Orchestrates all detection modules and the scoring engine.
    """
    def __init__(self, config_manager: ConfigurationManager):
        self.config_manager = config_manager
        self.modules: Dict[str, DetectionModule] = {}
        self.scoring_engine = ActivityScoringEngine(config_manager.get_scoring_config())
        self.alert_manager = AlertManager(config_manager.get_alert_config())
        self.night_mode = NightModeScheduler(config_manager.get_night_mode_config())
        self.evidence_capture = EvidenceCapture(config_manager.get_evidence_config())
        # Initialize all detection modules
        self._init_modules()

    def _init_modules(self):
        """Instantiate all 10 detection modules."""
        module_classes = {
            'intrusion': IntrusionDetector,
            'loitering': LoiteringDetector,
            'running': RunningDetector,
            'crowding': CrowdingDetector,
            'fall': FallDetector,
            'abandoned_object': AbandonedObjectDetector,
            'after_hours': AfterHoursDetector,
            'zone_breach': ZoneBreachDetector,
            'reentry': ReentryPatternDetector,
            'suspicious_dwell': SuspiciousDwellDetector
        }
        for module_name, module_class in module_classes.items():
            module_config = self.config_manager.get_module_config(module_name)
            self.modules[module_name] = module_class(module_config)

    def process_frame(self, camera_id: str, shared_detections: SharedDetections):
        """
        Main entry point: process one frame's detections for one camera.
        """
        # Check night mode
        is_night, sensitivity = self.night_mode.is_night_mode()
        if not is_night:
            return []  # No suspicious activity detection during the day
        # Get camera-specific config
        camera_config = self.config_manager.get_camera_config(camera_id)
        # Run all enabled detection modules
        all_events = []
        for module_name, module in self.modules.items():
            if not module.is_enabled():
                continue
            try:
                events = module.process(shared_detections)
                all_events.extend(events)
            except Exception as e:
                logger.error(f"Module {module_name} failed: {e}")
                continue
        # Feed events to scoring engine
        current_time = time.time()
        for event in all_events:
            self.scoring_engine.record_event(event)
        # Compute composite score
        score, threat_level = self.scoring_engine.compute_score(current_time, camera_id)
        # Generate alerts for qualifying events
        alerts = []
        for event in all_events:
            if self.alert_manager.should_alert(event, score):
                evidence = self.evidence_capture.capture_evidence(event, camera_id)
                alert = self.alert_manager.create_alert(event, score, evidence)
                self.alert_manager.dispatch_alert(alert)
                alerts.append(alert)
        return alerts
7.5 Latency Budget
| Pipeline Stage | Target Latency | Max Latency |
|---|---|---|
| Frame preprocessing | 5 ms | 10 ms |
| Base detection (YOLOv8) | 15 ms | 30 ms |
| Tracking (ByteTrack) | 5 ms | 10 ms |
| Suspicious activity analysis (all 10 modules) | 50 ms | 100 ms |
| Scoring engine | 2 ms | 5 ms |
| Alert generation + evidence | 10 ms | 50 ms |
| Total end-to-end | ~87 ms | ~205 ms |
8. CV Models Reference
8.1 Required Models
| # | Model | Purpose | Input | Output | Framework | Performance Target |
|---|---|---|---|---|---|---|
| 1 | YOLOv8n-pose | Person detection + pose estimation (fall detection) | 640x640 RGB | Person bbox (x4) + 17 keypoints (x3) | Ultralytics | 15ms @ TensorRT |
| 2 | YOLOv8s | General object detection (abandoned objects) | 640x640 RGB | Object bboxes + classes | Ultralytics | 10ms @ TensorRT |
| 3 | ByteTrack | Multi-object tracking | Detection bboxes | Track IDs + trajectories | Custom/MOT | 5ms |
| 4 | MOG2 / ViBe | Background subtraction | Grayscale frame | Foreground mask | OpenCV / Custom | 3ms |
| 5 | RAFT / Farneback | Optical flow (running speed estimation) | 2 consecutive frames | Flow field (dx, dy per pixel) | OpenCV / RAFT | 8ms |
8.2 Model Specifications
Model 1: YOLOv8n-pose (Person Detection + Pose)
model: yolov8n-pose.pt
source: Ultralytics
purpose: >
  Detect persons and estimate 17-keypoint body pose.
  Keypoints used for fall detection and running confirmation.
input:
  resolution: [640, 640]
  format: RGB
  preprocessing: letterbox + normalize(/255)
output:
  - person bounding boxes: [x1, y1, x2, y2, confidence]
  - 17 COCO keypoints per person: [x, y, visibility] x 17
keypoint_mapping:
  0: nose
  1-2: left_eye, right_eye
  3-4: left_ear, right_ear
  5-6: left_shoulder, right_shoulder
  7-8: left_elbow, right_elbow
  9-10: left_wrist, right_wrist
  11-12: left_hip, right_hip
  13-14: left_knee, right_knee
  15-16: left_ankle, right_ankle
optimization:
  tensorrt: FP16
  batch_size: 1
  target_latency_ms: 15
confidence_threshold: 0.5
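To show how the 17-keypoint output feeds the fall detector, here is a minimal torso-angle heuristic: the mid-shoulder → mid-hip vector should be near-vertical for a standing person. COCO indices follow the mapping above; the 60° threshold and function names are illustrative, not the shipped fall-detection logic.

```python
import math

# COCO keypoint indices from the mapping above
L_SHO, R_SHO, L_HIP, R_HIP = 5, 6, 11, 12

def torso_angle_deg(kpts):
    """Angle of the torso from vertical, given [(x, y, visibility)] * 17."""
    sx = (kpts[L_SHO][0] + kpts[R_SHO][0]) / 2   # mid-shoulder
    sy = (kpts[L_SHO][1] + kpts[R_SHO][1]) / 2
    hx = (kpts[L_HIP][0] + kpts[R_HIP][0]) / 2   # mid-hip
    hy = (kpts[L_HIP][1] + kpts[R_HIP][1]) / 2
    # atan2(horizontal offset, vertical offset): 0° = upright, 90° = horizontal
    return math.degrees(math.atan2(abs(hx - sx), abs(hy - sy)))

def looks_fallen(kpts, angle_thresh_deg=60.0):
    return torso_angle_deg(kpts) > angle_thresh_deg

standing = [(0, 0, 1)] * 17
standing[L_SHO] = (100, 100, 1); standing[R_SHO] = (140, 100, 1)
standing[L_HIP] = (105, 200, 1); standing[R_HIP] = (135, 200, 1)

lying = [(0, 0, 1)] * 17
lying[L_SHO] = (100, 300, 1); lying[R_SHO] = (100, 340, 1)
lying[L_HIP] = (250, 305, 1); lying[R_HIP] = (250, 335, 1)
```

In practice this angle check would be combined with the bounding-box aspect ratio and keypoint visibility scores before confirming a fall.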
Model 2: YOLOv8s (Object Detection)
model: yolov8s.pt
source: Ultralytics
purpose: >
  Detect objects that may be abandoned: backpacks, suitcases, boxes, bags.
  Also used for secondary person detection verification.
classes_of_interest:
  - person       # For verification
  - backpack     # Abandoned object watchlist
  - suitcase     # Abandoned object watchlist
  - handbag      # Abandoned object watchlist
  - cell phone   # Optional
optimization:
  tensorrt: FP16
  batch_size: 1
  target_latency_ms: 10
confidence_threshold: 0.4
Model 3: ByteTrack (Multi-Object Tracking)
model: ByteTrack
source: https://github.com/ifzhang/ByteTrack
purpose: >
  Maintain consistent track IDs across frames.
  Essential for loitering, running, re-entry, and dwell time detection.
parameters:
  track_thresh: 0.5
  match_thresh: 0.8
  track_buffer: 30   # Frames to keep lost tracks
  frame_rate: 25
input: Detection bboxes + scores from YOLOv8
output:
  - track_id: int (persistent across frames)
  - bbox: [x1, y1, x2, y2]
  - score: confidence
  - track_state: tracked / lost / removed
latency_target_ms: 5
Model 4: Background Subtraction (MOG2 / ViBe)
model: cv2.BackgroundSubtractorMOG2
source: OpenCV
purpose: >
  Identify static foreground objects for abandoned object detection.
  Distinguish newly introduced objects from background.
parameters:
  history: 500
  varThreshold: 16
  detectShadows: true
  learningRate: 0.005
input: Grayscale or RGB frame
output: Binary foreground mask (255=foreground, 0=background, 127=shadow)
postprocessing:
  - Threshold: 200 (remove shadows)
  - Morphological open: 3x3 ellipse
  - Morphological close: 7x7 ellipse
latency_target_ms: 3
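The shadow-removal step above works because MOG2 encodes shadow pixels as 127; thresholding at 200 keeps only confident foreground (255). A dependency-light sketch using NumPy (the morphological open/close would follow with `cv2.morphologyEx` and is omitted here):

```python
import numpy as np

def remove_shadows(mask: np.ndarray, thresh: int = 200) -> np.ndarray:
    """Binarize an MOG2 mask: foreground (255) survives, shadow (127) drops."""
    return np.where(mask > thresh, 255, 0).astype(np.uint8)

# Toy 2x3 mask: background, shadow, and foreground pixels
raw = np.array([[0, 127, 255],
                [255, 127, 0]], dtype=np.uint8)
clean = remove_shadows(raw)
```

Only the 255-valued pixels survive; shadows and background both map to 0, which is what the abandoned-object module expects before contour extraction.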
Model 5: Optical Flow (Farneback)
model: cv2.calcOpticalFlowFarneback
source: OpenCV
purpose: >
  Estimate pixel motion between consecutive frames.
  Used for speed estimation in running detection and
  for distinguishing moving persons from stationary objects.
parameters:
  pyr_scale: 0.5
  levels: 3
  winsize: 15
  iterations: 3
  poly_n: 5
  poly_sigma: 1.2
  flags: 0
input: 2 consecutive grayscale frames
output: Flow field [H, W, 2] → (dx, dy) per pixel
latency_target_ms: 8
8.3 Model Inference Pipeline
Frame Input (1920x1080)
│
├──▶ Night Enhancement (histogram equalization + denoising)
│ └── Enhanced Frame
│
├──▶ Resize to 640x640
│ │
│ ├──▶ YOLOv8n-pose ──▶ Person bboxes + Keypoints
│ │ │
│ │ ├──▶ Fall Detection Module
│ │ └──▶ ByteTrack ──▶ Track IDs
│ │ │
│ │ ├──▶ Loitering Detection
│ │ ├──▶ Running Detection
│ │ ├──▶ Re-entry Patterns
│ │ ├──▶ Zone Breach
│ │ ├──▶ Suspicious Dwell
│ │ └──▶ After-Hours Presence
│ │
│ └──▶ YOLOv8s ──▶ Object bboxes (backpack, suitcase, etc.)
│ │
│ └──▶ Abandoned Object Detection
│
├──▶ Grayscale conversion ──▶ MOG2 Background Subtraction
│ │
│ └──▶ Foreground mask ──▶ Abandoned Object Detection
│
└──▶ Grayscale ──▶ Farneback Optical Flow
│
└──▶ Running Detection (speed validation)
8.4 Hardware Requirements
| Component | Minimum | Recommended |
|---|---|---|
| GPU | NVIDIA GTX 1660 (6GB) | NVIDIA RTX 3060 (12GB) or Jetson AGX Orin |
| RAM | 16 GB | 32 GB |
| Storage | 256 GB SSD | 512 GB NVMe SSD |
| CPU | Intel i5-8400 / AMD Ryzen 5 2600 | Intel i7-11700 / AMD Ryzen 7 5800X |
| Network | Gigabit Ethernet | Gigabit Ethernet |
9. Pseudocode Reference
9.1 Main Processing Loop
function main_loop():
    initialize_pipeline()
    initialize_all_detection_modules()
    initialize_scoring_engine()
    initialize_alert_manager()
    for each camera in camera_list:
        start_capture_thread(camera)

    while system_running:
        for each camera in camera_list:
            frame = get_next_frame(camera)
            if frame is None: continue

            # Step 1: Night mode check
            is_night, sensitivity = night_mode_scheduler.check(camera)
            if not is_night:
                continue

            # Step 2: Base AI inference (shared)
            person_detections = yolov8_pose.detect(frame)
            object_detections = yolov8s.detect(frame)
            tracks = bytetrack.update(person_detections)
            bg_mask = mog2.process(frame)
            flow = optical_flow.compute(prev_frame, frame)

            # Step 3: Build shared detections object
            shared = SharedDetections(
                camera_id=camera.id,
                timestamp=now(),
                frame=frame,
                person_detections=person_detections,
                object_detections=object_detections,
                track_states=tracks,
                pose_keypoints={t.id: t.keypoints for t in person_detections},
                background_mask=bg_mask,
                optical_flow=flow
            )

            # Step 4: Run all detection modules
            all_events = []
            for module in detection_modules:
                if module.is_enabled():
                    events = module.process(shared)
                    all_events.extend(events)

            # Step 5: Scoring
            for event in all_events:
                scoring_engine.record_event(event)
            score, level = scoring_engine.compute_score(now(), camera.id)

            # Step 6: Alert generation
            for event in all_events:
                if alert_manager.should_alert(event, score):
                    evidence = evidence_capture.capture(event, camera)
                    alert = alert_manager.create_alert(event, score, evidence)
                    alert_manager.dispatch(alert)

            # Step 7: Periodic maintenance
            if frame_count % 3000 == 0:   # Every ~2 minutes at 25 FPS
                scoring_engine.purge_old_events()
                alert_manager.cleanup_suppression_cache()
        sleep(frame_interval)
9.2 Zone Polygon Editor (Admin UI)
function open_zone_editor(camera_id, zone_type):
    frame = get_reference_frame(camera_id)
    display_frame_on_canvas(frame)
    zone = {
        zone_id: generate_uuid(),
        polygon: [],
        zone_type: zone_type,   # "intrusion", "loitering", "sensitive_dwell"
        color: get_color_for_type(zone_type)
    }

    on_mouse_click(x, y):
        # Convert pixel coordinates to normalized [0,1]
        nx = x / frame_width
        ny = y / frame_height
        zone.polygon.append([nx, ny])
        redraw_canvas()

    on_mouse_right_click():
        # Close polygon (minimum 3 points)
        if len(zone.polygon) >= 3:
            finalize_polygon(zone)
            save_zone(camera_id, zone)
            close_editor()
        else:
            show_error("Polygon requires at least 3 points")

    on_double_click():
        # Remove last point
        if zone.polygon:
            zone.polygon.pop()
            redraw_canvas()
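Zones are stored in normalized [0, 1] coordinates so they survive resolution changes. At detection time, a module tests a track's point against the saved polygon; a standard ray-casting check works on the normalized points directly (sketch; function names are illustrative):

```python
def to_normalized(x: float, y: float, frame_w: int, frame_h: int):
    """Pixel → normalized [0, 1] coordinates, as the editor stores them."""
    return x / frame_w, y / frame_h

def point_in_polygon(px: float, py: float, polygon) -> bool:
    """Ray casting: count edge crossings of a horizontal ray from (px, py)."""
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        if (y1 > py) != (y2 > py):               # edge straddles the ray
            x_cross = x1 + (py - y1) * (x2 - x1) / (y2 - y1)
            if px < x_cross:
                inside = not inside
    return inside

zone = [[0.2, 0.2], [0.8, 0.2], [0.8, 0.8], [0.2, 0.8]]   # normalized square
px, py = to_normalized(960, 540, 1920, 1080)              # frame center
```

The frame center maps to (0.5, 0.5), inside the square zone; a point at x = 0.1 falls outside.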
9.3 Alert Correlation Engine
function correlate_alerts(camera_id, time_window=300):
    """
    Find correlated alerts that may indicate coordinated activity.
    Returns alert groups that should be escalated together.
    """
    recent_alerts = get_alerts(camera_id, since=now() - time_window)

    # Group by proximity in time and space
    groups = []
    for alert in recent_alerts:
        assigned = false
        for group in groups:
            if time_distance(alert, group[-1]) < 60:                    # Within 60 s
                if spatial_distance(alert.zone, group[-1].zone) < 0.3:  # 30% of frame
                    group.append(alert)
                    assigned = true
                    break
        if not assigned:
            groups.append([alert])

    # Escalate groups with multiple correlated alerts
    for group in groups:
        if len(group) >= 3:
            for alert in group:
                alert.severity = max(alert.severity, "HIGH")
                alert.correlation_group = group[0].alert_id
                alert.correlation_note = f"Correlated with {len(group)-1} other alerts"
    return groups
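The grouping step above can be exercised as runnable Python. This simplification keeps only the temporal rule (alerts within 60 s of a group's last member are merged) and drops the spatial check to stay minimal; alerts are (alert_id, timestamp) pairs here.

```python
def group_by_time(alerts, gap_seconds=60):
    """Greedy temporal grouping: chain alerts whose gaps stay under the limit."""
    groups = []
    for alert in sorted(alerts, key=lambda a: a[1]):
        if groups and alert[1] - groups[-1][-1][1] < gap_seconds:
            groups[-1].append(alert)    # extend the most recent group
        else:
            groups.append([alert])      # start a new group
    return groups

alerts = [("a1", 0), ("a2", 30), ("a3", 50), ("a4", 200)]
groups = group_by_time(alerts)
escalate = [g for g in groups if len(g) >= 3]   # groups big enough to escalate
```

a1–a3 chain together (gaps of 30 s and 20 s) and qualify for escalation; a4 arrives 150 s later and starts its own group.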
10. Performance & Resource Budget
10.1 Per-Camera Processing Budget (1080p @ 25 FPS)
| Stage | GPU Memory | GPU Compute | CPU | Latency |
|---|---|---|---|---|
| Frame preprocessing | 50 MB | Low | Medium | 5ms |
| YOLOv8n-pose inference | 200 MB | High | Low | 15ms |
| YOLOv8s inference | 150 MB | Medium | Low | 10ms |
| ByteTrack tracking | 20 MB | Low | Medium | 5ms |
| MOG2 background subtraction | 30 MB | Low | Low | 3ms |
| Optical flow | 100 MB | Medium | Low | 8ms |
| 10 Detection modules | 50 MB | Low | Medium | 50ms |
| Scoring + Alerts | 10 MB | None | Low | 12ms |
| Total per camera | ~610 MB | Burst | Medium | ~108ms |
10.2 8-Camera Total Budget
| Resource | Total Required |
|---|---|
| GPU Memory | ~4.9 GB (with batching optimizations) |
| GPU Compute | ~60% sustained on RTX 3060 |
| System RAM | ~8 GB for buffers and state |
| Storage (alerts) | ~10 GB/day (with 30-day retention) |
| Network (alerts) | ~500 KB/day (metadata only) |
11. Testing & Validation Strategy
11.1 Test Scenarios
| Test ID | Scenario | Expected Detection | Modules Tested |
|---|---|---|---|
| TC-001 | Person walks into restricted zone | INTRUSION alert | Intrusion |
| TC-002 | Person stands still for 5+ minutes | LOITERING alert | Loitering |
| TC-003 | Person runs across frame | RUNNING alert | Running |
| TC-004 | 3+ people gather in corner | CROWDING alert | Crowding |
| TC-005 | Person falls to ground | FALL alert | Fall |
| TC-006 | Backpack left unattended | ABANDONED_OBJECT alert | Abandoned Object |
| TC-007 | Person detected at 2 AM | AFTER_HOURS alert | After-Hours |
| TC-008 | Person crosses boundary line | ZONE_BREACH alert | Zone Breach |
| TC-009 | Person enters/exits 3+ times | REENTRY alert | Re-entry |
| TC-010 | Person lingers near server room door | SUSPICIOUS_DWELL alert | Suspicious Dwell |
| TC-011 | Multiple simultaneous detections | Escalated composite score | Scoring Engine |
| TC-012 | Alert suppression within cooldown | Alert suppressed + logged | Alert Manager |
11.2 False Positive Mitigation
| Source of FP | Mitigation Strategy |
|---|---|
| Shadow entering zone | Require 3-frame confirmation + overlap threshold |
| Person bending (not falling) | Aspect ratio + height ratio check in fall detector |
| Quick stop (not loitering) | Movement tolerance radius + temporal confirmation |
| Reflections/ghost detections | Confidence threshold + track consistency check |
| Authorized after-hours worker | Authorized personnel DB + zone-based escalation |
| Cat/dog/animal | YOLOv8 class filter (person only) |
Appendix A: Data Retention
| Data Type | Retention Period | Storage Location |
|---|---|---|
| Raw video | 7 days | NAS / SAN |
| Alert clips | 90 days | NAS / SAN |
| Alert snapshots | 1 year | Object storage |
| Detection events | 1 year | Time-series DB |
| Composite scores | 30 days | Time-series DB |
| Audit logs | 3 years | Log aggregator |
| Configuration changes | Permanent | Version control |
Appendix B: API Endpoints
| Endpoint | Method | Description |
|---|---|---|
| /api/v1/cameras/{id}/rules | GET | List all rules for camera |
| /api/v1/cameras/{id}/rules/{rule} | PUT | Update rule configuration |
| /api/v1/cameras/{id}/zones | POST | Add new zone |
| /api/v1/cameras/{id}/zones/{zone_id} | DELETE | Remove zone |
| /api/v1/cameras/{id}/lines | POST | Add boundary line |
| /api/v1/alerts | GET | List alerts (with filters) |
| /api/v1/alerts/{id}/acknowledge | POST | Acknowledge alert |
| /api/v1/scores/{camera_id} | GET | Current composite score |
| /api/v1/scores/{camera_id}/history | GET | Score time series |
| /api/v1/nightmode/override | POST | Manual night mode override |
| /api/v1/nightmode/status | GET | Current night mode state |
Appendix C: Revision History
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2024-01-15 | AI Engineering | Initial design document |
End of Document