Suspicious Activity Detection System — Night-Mode Surveillance

Night-mode behavior detection and alert scoring.

Technical Design Document

Version: 1.0
Scope: 8-Channel Indoor/Industrial Night Surveillance
Operating Window: 22:00 - 06:00 (configurable per camera)
Classification: Confidential — Internal Use


Table of Contents

  1. System Architecture Overview
  2. Detection Modules
    • 2.1 Intrusion Detection
    • 2.2 Loitering Detection
    • 2.3 Running Detection
    • 2.4 Crowding / Group Formation
    • 2.5 Fall Detection
    • 2.6 Abandoned Object Detection
    • 2.7 Unusual After-Hours Presence
    • 2.8 Zone Breach
    • 2.9 Repeated Re-entry Patterns
    • 2.10 Suspicious Dwell Time
  3. Activity Scoring Engine
  4. Configuration Schema
  5. Alert Generation Logic
  6. Night Mode Scheduler
  7. Pipeline Integration
  8. CV Models Reference
  9. Pseudocode Reference
  10. Performance & Resource Budget
  11. Testing & Validation Strategy

1. System Architecture Overview

1.1 High-Level Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                         NIGHT-MODE SURVEILLANCE SYSTEM                       │
├─────────────────────────────────────────────────────────────────────────────┤
│  Camera Layer          ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐    │
│  (8 Channels)          │  Cam 1  │ │  Cam 2  │ │  ...    │ │  Cam 8  │    │
│                        └───┬─────┘ └────┬────┘ └────┬────┘ └────┬────┘    │
│                            │            │           │           │          │
├────────────────────────────┼────────────┼───────────┼───────────┼──────────┤
│  Base AI Pipeline          ▼            ▼           ▼           ▼          │
│  (Shared Detections)  ┌─────────────────────────────────────────────────┐   │
│                       │    Person Detection (YOLOv8)                    │   │
│                       │    Object Detection (YOLOv8)                    │   │
│                       │    Multi-Object Tracking (ByteTrack)            │   │
│                       └────────────┬────────────────────────────────────┘   │
│                                    │                                         │
│                                    ▼                                         │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │              SUSPICIOUS ACTIVITY ANALYSIS LAYER                      │    │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐  │    │
│  │  │ Intrusion│ │Loitering │ │ Running  │ │ Crowding │ │   Fall   │  │    │
│  │  │ Detection│ │ Detection│ │ Detection│ │ Detection│ │ Detection│  │    │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘  │    │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐  │    │
│  │  │Abandoned │ │  After-  │ │  Zone    │ │ Repeated │ │ Suspicious│  │    │
│  │  │  Object  │ │  Hours   │ │  Breach  │ │ Re-entry │ │ Dwell    │  │    │
│  │  │ Detection│ │ Presence │ │ Detection│ │ Patterns │ │  Time    │  │    │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘  │    │
│  └──────────────────────────┬──────────────────────────────────────────┘    │
│                             ▼                                               │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │              ACTIVITY SCORING ENGINE                                 │    │
│  │   Composite Score = weighted_sum(detection_signals)                 │    │
│  │   Time-decay applied  |  Escalation thresholds                      │    │
│  └──────────────────────────┬──────────────────────────────────────────┘    │
│                             ▼                                               │
│  ┌─────────────────────────────────────────────────────────────────────┐    │
│  │              ALERT GENERATION & MANAGEMENT                           │    │
│  │   Severity Assignment | Suppression Rules | Evidence Attachment     │    │
│  └─────────────────────────────────────────────────────────────────────┘    │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

1.2 Data Flow

Video Frame (RTSP/ONVIF)
    │
    ▼
┌─────────────────┐
│ Frame Capture   │ ◄── 25 FPS target, 1920x1080 resolution
│ & Preprocessing │ ◄── Night enhancement (histogram equalization, denoise)
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ Base Detections │ ◄── Person boxes (bbox, confidence, track_id)
│                 │ ◄── Object boxes (backpack, suitcase, box, etc.)
│                 │ ◄── Pose keypoints (if person detected)
└────────┬────────┘
         │
         ▼
┌──────────────────────────────────────────────────────────────┐
│ Detection State Buffer                                        │
│   - Track history (last 5 minutes per track_id)               │
│   - Zone occupancy (which tracks in which zones)              │
│   - Velocity vectors per track                                │
│   - Pose history per track (last 10 seconds)                  │
│   - Object-to-person association map                          │
└────────┬──────────────────────────────────────────────────────┘
         │
         ▼
┌──────────────────────────────────────────────────────────────┐
│ Rule Engine — All 10 Detection Modules Evaluate               │
│   Each module reads from state buffer and checks its rules    │
│   Outputs: detection_event (type, severity, confidence)       │
└────────┬──────────────────────────────────────────────────────┘
         │
         ▼
┌──────────────────────────────────────────────────────────────┐
│ Scoring Engine — Composite suspicious activity score          │
│   S_total = sum(weight_i * signal_i * decay(t_i))            │
└────────┬──────────────────────────────────────────────────────┘
         │
         ▼
┌──────────────────────────────────────────────────────────────┐
│ Alert Manager — Severity-based alert decision                 │
│   Suppression check → Evidence capture → Alert dispatch       │
└──────────────────────────────────────────────────────────────┘
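The composite score formula in the scoring stage can be illustrated with a small sketch (the weights and half-life below are placeholder values for illustration, not the tuned defaults from Section 3):

```python
def composite_score(signals, now, weights, half_life_s=60.0):
    """
    signals: list of (event_type, signal_strength in [0, 1], event_timestamp)
    weights: dict mapping event_type -> weight
    Each signal decays exponentially with age: decay(t) = 0.5 ** (age / half_life).
    """
    total = 0.0
    for event_type, strength, t_event in signals:
        age = max(0.0, now - t_event)
        decay = 0.5 ** (age / half_life_s)
        total += weights.get(event_type, 0.0) * strength * decay
    return total

# A fresh intrusion contributes fully; a loitering signal from one
# half-life ago contributes at half strength.
weights = {"INTRUSION": 1.0, "LOITERING": 0.5}
signals = [("INTRUSION", 0.9, 100.0), ("LOITERING", 0.8, 40.0)]
score = composite_score(signals, now=100.0, weights=weights)  # 0.9 + 0.2 = 1.1
```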

1.3 Key Design Principles

  • Real-time Processing: all detection modules run in parallel on shared state; target latency < 200 ms per frame
  • Modularity: each detection module can be independently enabled or disabled per camera
  • Extensibility: new detection types can be added as plug-in modules implementing the DetectionModule interface
  • False-Positive Resilience: confidence thresholds, temporal filtering, and composite scoring reduce noise
  • Resource Efficiency: shared base detections (person, object, tracking) feed all modules; no redundant inference
  • Audit Trail: every detection event, scoring calculation, and alert decision is logged with a timestamp
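As an illustration of the plug-in principle, the DetectionModule interface might look like the following sketch (the class and method names here are assumptions, not the shipped contract):

```python
from abc import ABC, abstractmethod

class DetectionModule(ABC):
    """Base class each detection module implements (illustrative sketch)."""

    def __init__(self, config: dict):
        self.enabled = config.get("enabled", True)
        self.config = config

    @abstractmethod
    def evaluate(self, state_buffer) -> list:
        """Read shared state and return a list of detection_event dicts."""

class LoiteringModule(DetectionModule):
    def evaluate(self, state_buffer) -> list:
        if not self.enabled:
            return []
        # ... module-specific rules would read the state buffer here ...
        return []

mod = LoiteringModule({"enabled": True, "dwell_time_threshold_seconds": 300})
events = mod.evaluate(state_buffer=None)
```

Registering a new detection type then amounts to subclassing and adding the module to the per-camera configuration.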

2. Detection Modules

2.1 Intrusion Detection

Description

Detects when a person enters a user-defined restricted zone during night hours. Zones are specified as polygons drawn on the camera's field of view.

Algorithm

Inputs:
  - person_detections: list of (bbox, track_id, confidence) from base pipeline
  - restricted_zones: list of polygon vertices [(x1,y1), (x2,y2), ...] per camera
  - confidence_threshold: float (default 0.65)
  - overlap_threshold: float (default 0.30) — min IoU to trigger

Processing (each frame):
  1. Filter detections: confidence >= confidence_threshold AND class == 'person'
  2. For each person detection:
     a. Compute bounding box polygon
     b. For each restricted_zone:
        i. Compute overlap_area = area(intersection(bbox_poly, zone_poly))
        ii. Compute overlap_ratio = overlap_area / area(bbox_poly)
        iii. If overlap_ratio >= overlap_threshold:
             → Trigger INTRUSION event
  3. Intrusion event structure:
     {
       event_type: "INTRUSION",
       track_id: <id>,
       zone_id: <zone_identifier>,
       overlap_ratio: <float>,
       confidence: <float>,
       timestamp: <ISO8601>,
       severity: "HIGH",
       bbox: [x1, y1, x2, y2],
       camera_id: <cam_id>
     }

Zone Overlap Calculation

def compute_zone_overlap(person_bbox, zone_polygon):
    """
    Calculate overlap between person bounding box and restricted zone polygon.
    Uses shapely-like polygon intersection.
    """
    from shapely.geometry import box, Polygon

    person_poly = box(person_bbox[0], person_bbox[1], person_bbox[2], person_bbox[3])
    zone_poly = Polygon(zone_polygon)

    if not zone_poly.is_valid:
        zone_poly = zone_poly.buffer(0)

    intersection_area = person_poly.intersection(zone_poly).area
    bbox_area = person_poly.area

    overlap_ratio = intersection_area / bbox_area if bbox_area > 0 else 0
    return overlap_ratio, intersection_area

Configuration Parameters

  • enabled (bool, default true): enable/disable the module
  • confidence_threshold (float, default 0.65, range 0.0-1.0): minimum person detection confidence
  • overlap_threshold (float, default 0.30, range 0.0-1.0): minimum bbox/zone overlap ratio
  • restricted_zones (list of polygons, default []): each polygon is a list of (x, y) normalized coordinates
  • min_zone_area (float, default 0.01, range 0.0-1.0): minimum zone area as a fraction of the frame
  • cooldown_seconds (int, default 30, range 0-3600): cooldown between repeated alerts for the same track in the same zone
  • schedule_override (bool, default false): if true, enforce even outside night hours

Hysteresis (Anti-Flutter)

To prevent rapid on/off triggering when a person stands at the zone boundary:

INTRUSION_STATE per (track_id, zone_id):
  - NONE → INTRUDING: requires overlap >= overlap_threshold for 3 consecutive frames
  - INTRUDING → NONE: requires overlap < (overlap_threshold - 0.10) for 5 consecutive frames
  - Alert is generated only on NONE → INTRUDING transition
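The two-threshold state machine above can be written out as a minimal sketch using the default frame counts and thresholds:

```python
class IntrusionHysteresis:
    """Two-threshold debounce for one (track_id, zone_id) pair."""

    def __init__(self, on_threshold=0.30, off_margin=0.10,
                 on_frames=3, off_frames=5):
        self.on_threshold = on_threshold
        self.off_threshold = on_threshold - off_margin  # lower exit threshold
        self.on_frames = on_frames
        self.off_frames = off_frames
        self.state = "NONE"
        self.counter = 0

    def update(self, overlap_ratio):
        """Returns True exactly once, on the NONE -> INTRUDING transition."""
        if self.state == "NONE":
            # Count consecutive frames at or above the entry threshold
            self.counter = self.counter + 1 if overlap_ratio >= self.on_threshold else 0
            if self.counter >= self.on_frames:
                self.state, self.counter = "INTRUDING", 0
                return True  # alert fires only here
        else:
            # Count consecutive frames below the (lower) exit threshold
            self.counter = self.counter + 1 if overlap_ratio < self.off_threshold else 0
            if self.counter >= self.off_frames:
                self.state, self.counter = "NONE", 0
        return False
```

Because the exit threshold sits 0.10 below the entry threshold, a person hovering at the zone boundary cannot toggle the state every frame.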

Severity: HIGH

Rationale: Unauthorized entry into restricted zones during night hours is a critical security event requiring immediate attention.


2.2 Loitering Detection

Description

Detects when a person remains within a defined area for longer than a configurable dwell time threshold. It differs from Suspicious Dwell Time (Section 2.10), which targets specific sensitive locations rather than arbitrary areas.

Algorithm

Inputs:
  - track_history: dict mapping track_id → list of (timestamp, centroid_x, centroid_y, bbox)
  - loitering_zones: list of polygon-defined zones (optional; if empty, use any area)
  - dwell_time_threshold_seconds: int (default 300 = 5 minutes)
  - movement_tolerance_pixels: int (default 50) — centroid must stay within this radius

Processing:
  1. For each active track_id:
     a. Retrieve position history for last T seconds (T = dwell_time_threshold)
     b. Compute bounding circle of all centroids in window
     c. If bounding_circle_radius <= movement_tolerance_pixels:
        → Candidate loitering detected
     d. Require candidate state for 3 consecutive evaluations (anti-flutter)
  2. If zone-restricted: only trigger if loitering centroid falls inside any loitering_zone
  3. Loiter event structure:
     {
       event_type: "LOITERING",
       track_id: <id>,
       zone_id: <zone_or_null>,
       dwell_time_seconds: <float>,
       centroid_stability_px: <float>,
       severity: "MEDIUM",
       ...
     }

Dwell Time Measurement

import math

def measure_dwell_time(track_history, current_time, track_id,
                       movement_tolerance_pixels=50,
                       dwell_time_threshold=300,
                       max_gap_seconds=2.0):
    """
    Calculate continuous dwell time within a radius.
    Returns: (dwell_seconds, stability_radius, is_loitering)
    """
    positions = track_history[track_id]  # list of (t, cx, cy)
    if len(positions) < 2:
        return 0.0, float('inf'), False

    # Walk backward from the most recent position until the track either
    # leaves the tolerance radius or has a gap in tracking coverage
    current_pos = positions[-1]
    contiguous_start = current_time
    max_distance = 0.0

    for i in range(len(positions) - 2, -1, -1):
        prev_pos = positions[i]
        time_gap = positions[i + 1][0] - prev_pos[0]  # gap between consecutive samples
        distance = math.hypot(current_pos[1] - prev_pos[1],
                              current_pos[2] - prev_pos[2])

        if time_gap > max_gap_seconds or distance > movement_tolerance_pixels:
            break
        max_distance = max(max_distance, distance)
        contiguous_start = prev_pos[0]

    dwell_seconds = current_time - contiguous_start
    return dwell_seconds, max_distance, dwell_seconds >= dwell_time_threshold

Configuration Parameters

  • enabled (bool, default true): enable/disable the module
  • dwell_time_threshold_seconds (int, default 300, range 30-3600): seconds before a loitering alert
  • movement_tolerance_pixels (int, default 50, range 10-500): max centroid displacement to count as stationary
  • loitering_zones (list, default []): optional zone restriction; empty = entire frame
  • consecutive_confirmations (int, default 3, range 1-10): consecutive evaluations required before an alert
  • cooldown_seconds (int, default 60, range 0-3600): cooldown for the same track_id

Severity: MEDIUM

Rationale: Loitering may indicate surveillance of the facility or waiting for an opportunity; not immediately dangerous but warrants monitoring.


2.3 Running Detection

Description

Detects abnormally fast pedestrian movement, which may indicate fleeing, rushing toward a target, or emergency situations.

Algorithm

Inputs:
  - track_history: dict mapping track_id → list of (timestamp, centroid_x, centroid_y)
  - camera_calibration: homography matrix (optional, for real-world speed)
  - speed_threshold_pixels_per_second: float (default 150)
  - speed_threshold_kmh: float (default 15.0) — if calibration available

Processing:
  1. For each active track_id with at least N positions (N = 5):
     a. Compute velocity vector over sliding window (last 1 second)
     b. speed = magnitude(velocity) in pixels/second
     c. If calibrated: convert to km/h using homography + assumed person height
     d. Compute speed_percentile_90 (90th percentile speed over last 3 seconds)
     e. If speed_percentile_90 >= threshold AND person is upright (not falling):
        → Candidate running
  2. Require candidate state for 2 consecutive seconds to suppress noise
  3. Running event structure:
     {
       event_type: "RUNNING",
       track_id: <id>,
       estimated_speed_kmh: <float>,
       speed_percentile_90: <float>,
       direction_vector: [dx, dy],
       severity: "MEDIUM",
       ...
     }

Speed Estimation

import numpy as np
from math import sqrt

def estimate_speed(track_id, track_buffer, homography=None, window_seconds=1.0):
    """
    Estimate pedestrian speed from tracking data.
    Returns km/h if a homography is available, else pixels/second.
    """
    history = track_buffer.get(track_id, [])
    if len(history) < 2:
        return 0.0

    now = history[-1][0]
    cutoff = now - window_seconds
    recent = [h for h in history if h[0] >= cutoff]

    if len(recent) < 2:
        return 0.0

    # Compute per-step speeds between consecutive frames
    speeds = []
    for i in range(1, len(recent)):
        dt = recent[i][0] - recent[i-1][0]
        if dt > 0:
            dx = recent[i][1] - recent[i-1][1]
            dy = recent[i][2] - recent[i-1][2]
            speeds.append(sqrt(dx*dx + dy*dy) / dt)

    if not speeds:
        return 0.0

    # Use the 90th percentile to filter out brief stops
    speed_p90 = np.percentile(speeds, 90)

    if homography is not None:
        # Convert pixel speed to real-world km/h using the local ground-plane
        # scale (meters per pixel) at the track's current position
        scale_factor = homography.get_scale_at_point(recent[-1][1:3])
        return speed_p90 * scale_factor * 3.6

    return speed_p90  # pixels/second fallback

Configuration Parameters

  • enabled (bool, default true): enable/disable the module
  • speed_threshold_pixels_per_second (float, default 150, range 50-500): speed threshold in pixels/second (uncalibrated)
  • speed_threshold_kmh (float, default 15.0, range 5.0-30.0): speed threshold in km/h (calibrated cameras)
  • confirmation_duration_seconds (float, default 2.0, range 0.5-5.0): seconds of sustained speed before an alert
  • speed_percentile (float, default 90, range 50-99): percentile used for speed robustness
  • min_track_history_seconds (float, default 1.0, range 0.5-3.0): minimum tracking duration before speed estimation
  • exclude_falling_state (bool, default true): do not trigger if a fall is concurrently detected

Per-Camera Calibration

# Example camera calibration entry
camera_01:
  homography_matrix: [[h11, h12, h13], [h21, h22, h23], [h31, h32, h33]]
  ground_plane_reference:  # Known points in (image_x, image_y) → (world_x, world_y_meters)
    - {img: [100, 800], world: [0.0, 0.0]}
    - {img: [900, 800], world: [10.0, 0.0]}
    - {img: [100, 400], world: [0.0, 5.0]}
    - {img: [900, 400], world: [10.0, 5.0]}
  pixels_per_meter_at_center: 45.0
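Given a calibration like the one above, the pixel-to-world conversion reduces to applying the homography to track centroids. The matrix below is a hypothetical uniform 0.1 m/px scale, chosen only so the numbers are easy to check:

```python
import numpy as np

def image_to_world(H, pt):
    """Map an image point to ground-plane coordinates via homography H."""
    x, y, w = H @ np.array([pt[0], pt[1], 1.0])
    return np.array([x / w, y / w])

def speed_kmh(H, p_prev, p_curr, dt_seconds):
    """Ground-plane speed between two image-space centroids, in km/h."""
    d_m = np.linalg.norm(image_to_world(H, p_curr) - image_to_world(H, p_prev))
    return (d_m / dt_seconds) * 3.6

# Hypothetical calibration: 0.1 meters per pixel, no perspective distortion
H = np.array([[0.1, 0.0, 0.0],
              [0.0, 0.1, 0.0],
              [0.0, 0.0, 1.0]])

# 50 px in 1 s at 0.1 m/px = 5 m/s = 18 km/h
v = speed_kmh(H, (100, 800), (150, 800), dt_seconds=1.0)
```

In a real deployment H would come from the ground_plane_reference correspondences (e.g. via cv2.findHomography), and the scale varies across the frame.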

Severity: MEDIUM

Rationale: Running may indicate an emergency or malicious intent; requires attention but may also be a false positive (employee rushing).


2.4 Crowding / Group Formation

Description

Detects when multiple persons gather in a small area, which may indicate unauthorized group activity, confrontation, or coordinated intrusion.

Algorithm

Inputs:
  - person_detections: list of (bbox, track_id, confidence)
  - count_threshold: int (default 3)
  - area_threshold: float (default 0.15) — max area as fraction of frame
  - density_threshold: float (default 0.05) — persons per normalized area unit

Processing:
  1. Filter detections: confidence >= 0.50 AND class == 'person'
  2. If count < count_threshold: return NO_EVENT
  3. Compute bounding box of all person centroids
  4. group_area = area(bbox_of_centroids) / area(frame)
  5. density = count / group_area  (if group_area > 0)
  6. If count >= count_threshold AND group_area <= area_threshold:
     → Candidate crowding
  7. Apply temporal confirmation: candidate must persist for 5 consecutive frames
  8. Crowding event structure:
     {
       event_type: "CROWDING",
       person_count: <int>,
       group_area_ratio: <float>,
       density_score: <float>,
       track_ids: [<id1>, <id2>, ...],
       centroid_bbox: [x1, y1, x2, y2],
       severity: "MEDIUM",
       ...
     }

DBSCAN-Based Clustering (Alternative)

import numpy as np
from sklearn.cluster import DBSCAN

def detect_crowding_dbscan(detections, frame_width, frame_height,
                           eps=0.08, min_samples=3):
    """
    Use DBSCAN clustering on normalized centroid positions for robust group detection.
    """
    centroids = np.array([
        [(d.bbox[0] + d.bbox[2]) / 2, (d.bbox[1] + d.bbox[3]) / 2]
        for d in detections if d.confidence >= 0.50
    ])

    if len(centroids) < min_samples:
        return []

    # Normalize to [0, 1] range so eps is resolution-independent
    centroids_norm = centroids / np.array([frame_width, frame_height])

    clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(centroids_norm)
    labels = clustering.labels_

    groups = []
    for label in set(labels):
        if label == -1:  # Noise points
            continue
        group_mask = labels == label
        group_centroids = centroids_norm[group_mask]
        group_size = len(group_centroids)

        # Compute bounding box of the group (normalized units)
        min_xy = group_centroids.min(axis=0)
        max_xy = group_centroids.max(axis=0)
        group_area_ratio = (max_xy[0] - min_xy[0]) * (max_xy[1] - min_xy[1])
        density = group_size / max(group_area_ratio, 0.001)

        groups.append({
            'count': group_size,
            'area_ratio': group_area_ratio,
            'density': density,
            'members': np.where(group_mask)[0].tolist()
        })

    return groups

Configuration Parameters

  • enabled (bool, default true): enable/disable the module
  • count_threshold (int, default 3, range 2-20): minimum persons to form a crowd
  • area_threshold (float, default 0.15, range 0.01-0.50): max group area as a fraction of the frame
  • density_threshold (float, default 0.05, range 0.01-0.50): min persons per normalized area unit
  • confirmation_frames (int, default 5, range 1-30): consecutive frames before an alert
  • use_dbscan (bool, default true): use DBSCAN instead of the bounding-box method
  • dbscan_eps (float, default 0.08, range 0.01-0.30): DBSCAN neighborhood radius (normalized)
  • cooldown_seconds (int, default 60, range 0-3600): cooldown between crowding alerts

Severity: MEDIUM

Rationale: Group formation may indicate coordinated activity; requires monitoring but could be legitimate.


2.5 Fall Detection

Description

Detects when a person falls to the ground, which may indicate an accident, medical emergency, or assault.

Algorithm

Inputs:
  - pose_estimation_output: keypoints for each detected person (COCO format: 17 keypoints)
  - track_id: associated track from MOT
  - fall_confidence_threshold: float (default 0.75)

Processing (per person):
  1. Extract keypoints: nose, shoulders, hips, knees, ankles
  2. Compute torso angle:
     - torso_vector = midpoint(shoulders) - midpoint(hips)
     - angle = arctan2(torso_vector.y, torso_vector.x)  # angle from horizontal
     - upright_angle ≈ 90 degrees (vertical torso)
  3. Compute aspect ratio:
     - person_height = max(y) - min(y) of all keypoints
     - person_width = max(x) - min(x) of all keypoints
     - aspect_ratio = width / height
  4. Fall classification (multi-criteria):
     a. Torso angle is near horizontal: |angle - 90| > 45 degrees
     b. Aspect ratio > 1.5 (person wider than tall)
     c. Center of mass (y-coordinate) is low (near ground)
     d. All keypoints confidence > 0.30
  5. Temporal consistency: fall state must persist for >= 1 second
  6. Distinguish from bending:
     - If only torso angle is abnormal but aspect_ratio < 1.2: likely bending → suppress
     - If hip keypoint y is close to ankle y: confirmed fall
  7. Fall event structure:
     {
       event_type: "FALL",
       track_id: <id>,
       torso_angle_degrees: <float>,
       aspect_ratio: <float>,
       fall_confidence: <float>,
       keypoint_confidence_avg: <float>,
       severity: "HIGH",
       ...
     }

Pose-Based Fall Classification

import numpy as np
from math import atan2, degrees, exp

def midpoint(kp_a, kp_b):
    """Midpoint of two keypoints (x, y only)."""
    return ((kp_a[0] + kp_b[0]) / 2, (kp_a[1] + kp_b[1]) / 2)

def sigmoid(x):
    """Logistic squashing used for soft feature scoring."""
    return 1.0 / (1.0 + exp(-x))

def classify_fall(pose_keypoints, track_id, track_history):
    """
    Classify fall from pose keypoints using multiple geometric features.
    pose_keypoints: array of shape (17, 3) → (x, y, confidence)
    Returns: (is_fall, confidence, features_dict)
    """
    # COCO keypoint indices
    NOSE, L_EYE, R_EYE, L_EAR, R_EAR = 0, 1, 2, 3, 4
    L_SHOULDER, R_SHOULDER = 5, 6
    L_ELBOW, R_ELBOW = 7, 8
    L_WRIST, R_WRIST = 9, 10
    L_HIP, R_HIP = 11, 12
    L_KNEE, R_KNEE = 13, 14
    L_ANKLE, R_ANKLE = 15, 16

    # Extract relevant keypoints
    left_shoulder = pose_keypoints[L_SHOULDER]
    right_shoulder = pose_keypoints[R_SHOULDER]
    left_hip = pose_keypoints[L_HIP]
    right_hip = pose_keypoints[R_HIP]
    left_knee = pose_keypoints[L_KNEE]
    right_knee = pose_keypoints[R_KNEE]
    left_ankle = pose_keypoints[L_ANKLE]
    right_ankle = pose_keypoints[R_ANKLE]

    # Filter low-confidence keypoints
    keypoints = [left_shoulder, right_shoulder, left_hip, right_hip,
                 left_knee, right_knee, left_ankle, right_ankle]
    avg_confidence = np.mean([kp[2] for kp in keypoints])
    if avg_confidence < 0.3:
        return False, 0.0, {}

    # 1. Torso angle
    shoulder_mid = midpoint(left_shoulder, right_shoulder)
    hip_mid = midpoint(left_hip, right_hip)
    torso_vector = (shoulder_mid[0] - hip_mid[0], shoulder_mid[1] - hip_mid[1])
    torso_angle = abs(degrees(atan2(torso_vector[1], torso_vector[0])))
    torso_angle_from_vertical = abs(torso_angle - 90)

    # 2. Aspect ratio
    all_x = [kp[0] for kp in keypoints]
    all_y = [kp[1] for kp in keypoints]
    width = max(all_x) - min(all_x)
    height = max(all_y) - min(all_y)
    aspect_ratio = width / max(height, 1)

    # 3. Height ratio — current height vs historical average height
    current_height = height
    if track_id in track_history:
        historical_heights = [h['height'] for h in track_history[track_id][-30:]]
        avg_height = np.mean(historical_heights) if historical_heights else current_height
        height_ratio = current_height / max(avg_height, 1)
    else:
        height_ratio = 1.0

    # 4. Center of mass (vertical)
    com_y = np.mean([kp[1] for kp in keypoints])
    ground_y = max(all_y)  # lowest point
    com_to_ground_ratio = (ground_y - com_y) / max(height, 1)

    # Fall scoring (weighted combination)
    score = 0.0
    score += sigmoid((torso_angle_from_vertical - 45) / 15) * 0.30  # 30% weight
    score += sigmoid((aspect_ratio - 1.2) / 0.4) * 0.25  # 25% weight
    score += sigmoid((1.0 - height_ratio - 0.3) / 0.2) * 0.25  # 25% weight
    score += sigmoid((0.3 - com_to_ground_ratio) / 0.1) * 0.20  # 20% weight

    is_fall = score > 0.75

    # Bending suppression
    if aspect_ratio < 1.2 and height_ratio > 0.6:
        is_fall = False
        score *= 0.3  # Reduce confidence

    features = {
        'torso_angle_from_vertical': torso_angle_from_vertical,
        'aspect_ratio': aspect_ratio,
        'height_ratio': height_ratio,
        'com_to_ground_ratio': com_to_ground_ratio,
        'avg_keypoint_confidence': avg_confidence
    }

    return is_fall, score, features

Configuration Parameters

  • enabled (bool, default true): enable/disable the module
  • fall_score_threshold (float, default 0.75, range 0.0-1.0): minimum fall classification score
  • min_keypoint_confidence (float, default 0.30, range 0.0-1.0): minimum average keypoint confidence
  • torso_angle_threshold_deg (float, default 45, range 15-75): degrees from vertical to consider a fall
  • aspect_ratio_threshold (float, default 1.2, range 1.0-3.0): width/height ratio threshold
  • height_ratio_threshold (float, default 0.6, range 0.3-0.9): current height / average height threshold
  • temporal_confirmation_ms (int, default 1000, range 500-5000): milliseconds the fall must persist
  • bending_suppression (bool, default true): reduce false positives from bending

Severity: HIGH

Rationale: Falls represent immediate safety concerns and require urgent response.


2.6 Abandoned Object Detection

Description

Detects when an object is left unattended for an extended period. Uses background subtraction and frame differencing combined with object detection.

Algorithm

Inputs:
  - current_frame: raw image
  - object_detections: list of (bbox, class, confidence) from base pipeline
  - person_detections: list of (bbox, track_id, confidence)
  - background_model: accumulated background reference
  - unattended_time_threshold_seconds: int (default 60)

Processing:
  1. Background Subtraction:
     a. Compute foreground mask: fg_mask = |current_frame - background_model| > threshold
     b. Apply morphological operations (open + close) to clean mask
     c. Find connected components in fg_mask

  2. Object-Person Association:
     a. For each detected object (backpack, suitcase, box, bag):
        - Compute distance to nearest person bbox
        - If distance < proximity_threshold_px: object is "attended"
        - Else: object is "unattended"

  3. Temporal Tracking of Unattended Objects:
     a. Match current unattended objects to previously tracked ones (IoU-based)
     b. For each tracked object:
        - If still unattended: increment unattended_duration
        - If attended again: reset timer
        - If moved significantly: reset timer

  4. Trigger Condition:
     - unattended_duration >= unattended_time_threshold_seconds
     - Object class is in watchlist (backpack, suitcase, box, bag, package)
     - Object remains stationary (centroid shift < 10 pixels)

  5. Abandoned object event:
     {
       event_type: "ABANDONED_OBJECT",
       object_class: <class_name>,
       object_id: <tracking_id>,
       unattended_duration_seconds: <float>,
       owner_track_id: <track_id_or_null>,
       detection_confidence: <float>,
       severity: "HIGH",
       ...
     }

Background Subtraction Pipeline

import cv2

class BackgroundSubtractionModule:
    def __init__(self, learn_rate=0.005, history=500):
        self.bg_subtractor = cv2.createBackgroundSubtractorMOG2(
            history=history,
            varThreshold=16,
            detectShadows=True
        )
        self.learn_rate = learn_rate
        self.history = history
        self.stable_bg = None
        self.frame_count = 0

    def process(self, frame):
        # Apply background subtractor
        fg_mask = self.bg_subtractor.apply(frame, learningRate=self.learn_rate)

        # Shadow removal (shadows = 127 in MOG2 output)
        _, fg_mask = cv2.threshold(fg_mask, 200, 255, cv2.THRESH_BINARY)

        # Morphological cleaning
        kernel_open = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
        kernel_close = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7))
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel_open)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel_close)

        self.frame_count += 1
        if self.frame_count == self.history:
            # Capture stable background once the model has warmed up
            self.stable_bg = self.bg_subtractor.getBackgroundImage()

        return fg_mask

def detect_abandoned_objects(current_frame, detections, person_detections,
                             bg_mask, object_tracker, unattended_threshold=60,
                             proximity_threshold_px=100):
    """
    Main abandoned object detection logic.
    extract_mask_region() and min_person_distance() are shared geometry
    helpers from the base pipeline.
    """
    abandoned_events = []
    watchlist_classes = {'backpack', 'suitcase', 'box', 'bag', 'package', 'luggage'}

    for det in detections:
        if det.class_name not in watchlist_classes:
            continue

        # Check if object is in foreground (newly introduced, not static scenery)
        obj_mask = extract_mask_region(bg_mask, det.bbox)
        fg_ratio = np.sum(obj_mask > 0) / obj_mask.size

        if fg_ratio < 0.3:
            continue  # Object is part of background, not newly introduced

        # Check proximity to persons
        nearest_person_distance = min_person_distance(det.bbox, person_detections)
        is_attended = nearest_person_distance < proximity_threshold_px

        # Track object state
        obj_state = object_tracker.update(det, is_attended)

        if not is_attended and obj_state.unattended_duration >= unattended_threshold:
            abandoned_events.append({
                'event_type': 'ABANDONED_OBJECT',
                'object_class': det.class_name,
                'unattended_duration': obj_state.unattended_duration,
                'bbox': det.bbox,
                'confidence': det.confidence
            })

    return abandoned_events
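The object_tracker used above is assumed to implement step 3's IoU matching and timer logic; a minimal sketch of that bookkeeping (the class name and update signature are illustrative, not from the codebase):

```python
import time

class UnattendedObjectTracker:
    """Tracks unattended duration per object via IoU matching (sketch)."""

    def __init__(self, iou_match_threshold=0.5):
        self.iou_match_threshold = iou_match_threshold
        self.objects = []  # each entry: {'bbox': ..., 'unattended_since': ...}

    @staticmethod
    def iou(a, b):
        """Intersection-over-union of two (x1, y1, x2, y2) boxes."""
        ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
        ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
        inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
        area_a = (a[2] - a[0]) * (a[3] - a[1])
        area_b = (b[2] - b[0]) * (b[3] - b[1])
        return inter / max(area_a + area_b - inter, 1e-9)

    def update(self, bbox, is_attended, now=None):
        """Match by IoU and return the object's current unattended duration."""
        now = time.time() if now is None else now
        for obj in self.objects:
            if self.iou(obj["bbox"], bbox) >= self.iou_match_threshold:
                obj["bbox"] = bbox
                if is_attended:
                    obj["unattended_since"] = now  # person nearby: reset timer
                return now - obj["unattended_since"]
        # Unmatched: start tracking a new object
        self.objects.append({"bbox": bbox, "unattended_since": now})
        return 0.0
```

A production version would also expire stale entries and reset the timer on significant centroid movement, per step 3 above.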

Configuration Parameters

  • enabled (bool, default true): enable/disable the module
  • unattended_time_threshold_seconds (int, default 60, range 10-600): seconds before an object is considered abandoned
  • proximity_threshold_pixels (int, default 100, range 30-300): max distance (px) to the nearest person for "attended"
  • watchlist_classes (list, default ["backpack", "suitcase", "box", "bag"]): object classes to monitor
  • fg_ratio_threshold (float, default 0.3, range 0.1-0.9): min foreground ratio in the object region
  • bg_history_frames (int, default 500, range 100-2000): background model history length
  • bg_learning_rate (float, default 0.005, range 0.001-0.05): background model learning rate
  • stationary_threshold_pixels (int, default 10, range 5-50): max centroid shift to count as stationary

Severity: HIGH

Rationale: Abandoned objects in industrial environments may be explosive devices, hazardous materials, or stolen goods.


2.7 Unusual After-Hours Presence

Description

Generates an alert for any person detected during night hours, separate from watchlist-based alerts. This is a catch-all detection for unexpected human presence.

Algorithm

Inputs:
  - person_detections: list of (bbox, track_id, confidence)
  - is_night_mode: boolean (from Night Mode Scheduler)
  - authorized_personnel_db: list of known personnel (optional, for suppression)
  - detection_confidence_threshold: float (default 0.60)
  - min_detection_frames: int (default 10)

Processing:
  1. If not is_night_mode: return NO_EVENT (unless override enabled)
  2. For each person detection:
     a. If confidence < detection_confidence_threshold: skip
     b. Check if track_id is in authorized_personnel (if DB available)
     c. Require track to be visible for at least min_detection_frames
     d. Check if track has already triggered an alert (deduplication)
  3. After-hours presence event:
     {
       event_type: "AFTER_HOURS_PRESENCE",
       track_id: <id>,
       detection_confidence: <float>,
       is_authorized: <bool_or_null>,
       first_seen_timestamp: <ISO8601>,
       duration_seconds: <float>,
       severity: "LOW",  # upgraded to MEDIUM if in restricted zone
       ...
     }
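The gating in steps 1-2 can be sketched as follows (function and argument names are illustrative, not from the codebase):

```python
def check_after_hours_presence(person_detections, is_night_mode, alerted_tracks,
                               frames_seen, confidence_threshold=0.60,
                               min_detection_frames=10):
    """
    person_detections: list of (bbox, track_id, confidence)
    frames_seen: dict track_id -> consecutive frames the track has been visible
    alerted_tracks: set of track_ids that have already produced an alert
    Returns the track_ids that should raise AFTER_HOURS_PRESENCE now.
    """
    if not is_night_mode:
        return []  # module is inert outside the night window

    triggered = []
    for _bbox, track_id, confidence in person_detections:
        if confidence < confidence_threshold:
            continue
        if frames_seen.get(track_id, 0) < min_detection_frames:
            continue  # not yet stable enough to alert
        if track_id in alerted_tracks:
            continue  # one alert per unique track_id
        alerted_tracks.add(track_id)
        triggered.append(track_id)
    return triggered
```

The authorized-personnel cross-reference (step 2b) would slot in as one more suppression check before appending to `triggered`.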

Severity Escalation

def assign_after_hours_severity(event):
    """Escalate severity based on location and concurrency context."""
    base_severity = "LOW"

    # Escalate if in restricted zone
    if event['zone_id'] and event['zone_id'] in RESTRICTED_ZONE_IDS:
        base_severity = "MEDIUM"

    # Escalate if loitering simultaneously detected
    if event.get('concurrent_loitering', False):
        base_severity = "MEDIUM"

    # Escalate if multiple after-hours persons detected
    if event.get('concurrent_presence_count', 1) >= 3:
        base_severity = "MEDIUM"

    return base_severity

Configuration Parameters

| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| detection_confidence_threshold | float | 0.60 | 0.0 - 1.0 | Minimum person detection confidence |
| min_detection_frames | int | 10 | 1 - 60 | Frames of continuous detection before alert |
| check_authorized_personnel | bool | false | — | Cross-reference with authorized personnel DB |
| escalate_in_restricted_zones | bool | true | — | Upgrade severity in restricted zones |
| alert_per_track | bool | true | — | One alert per unique track_id |
| cooldown_per_zone_seconds | int | 300 | 0 - 3600 | Cooldown for same zone |

Severity: LOW (escalatable to MEDIUM)

Rationale: After-hours presence may be completely legitimate (authorized worker); escalation rules prevent alert fatigue while maintaining coverage.


2.8 Zone Breach

Description

Detects when a person crosses a predefined boundary line in a specific direction. Supports virtual tripwires for entry/exit monitoring.

Algorithm

Inputs:
  - track_history: dict of track positions over time
  - boundary_lines: list of line definitions {
      line_id, point_a(x1,y1), point_b(x2,y2),
      allowed_direction: "both" | "a_to_b" | "b_to_a",
      trigger_on: "cross" | "enter" | "exit"
    }
  - crossing_threshold_pixels: int (default 20)

Processing:
  1. For each active track_id:
     a. Get previous position P_prev and current position P_curr (centroids)
     b. For each boundary_line:
        i. Check if segment [P_prev → P_curr] intersects boundary_line
        ii. Compute intersection point
        iii. Determine crossing direction:
             - direction_vector = normalize(P_curr - P_prev)
             - line_normal = perpendicular(line_vector)
             - dot_product = direction_vector · line_normal
             - if dot_product > 0: crossing A→B, else B→A
        iv. If crossing_direction matches allowed_direction:
            → Trigger ZONE_BREACH
  2. Hysteresis: don't re-trigger same track on same line for cooldown period
  3. Zone breach event:
     {
       event_type: "ZONE_BREACH",
       track_id: <id>,
       line_id: <line_identifier>,
       crossing_direction: "a_to_b" | "b_to_a",
       crossing_point: [x, y],
       severity: <configurable>,
       ...
     }

Line-Crossing Geometry

def check_line_crossing(p_prev, p_curr, line_a, line_b):
    """
    Check if segment p_prev→p_curr crosses line_a→line_b.
    Returns: (did_cross, intersection_point, direction)
    """
    from shapely.geometry import LineString

    path_segment = LineString([p_prev, p_curr])
    boundary = LineString([line_a, line_b])

    intersection = path_segment.intersection(boundary)
    if intersection.is_empty or intersection.geom_type != 'Point':
        # No crossing, or a degenerate (collinear/overlapping) contact — ignore
        return False, None, None

    # Determine direction using the 2D cross product of line and motion vectors
    line_vec = (line_b[0] - line_a[0], line_b[1] - line_a[1])
    move_vec = (p_curr[0] - p_prev[0], p_curr[1] - p_prev[1])
    cross = line_vec[0] * move_vec[1] - line_vec[1] * move_vec[0]
    direction = "a_to_b" if cross > 0 else "b_to_a"

    return True, (intersection.x, intersection.y), direction
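Where shapely is unavailable, the same crossing test reduces to a parametric segment-segment intersection. The sketch below is a dependency-free equivalent, not part of the system's code; `check_line_crossing_nodeps` is a hypothetical name, and it follows the same return convention and direction sign as `check_line_crossing`:

```python
def check_line_crossing_nodeps(p_prev, p_curr, line_a, line_b):
    """Dependency-free segment-segment intersection (parametric form).
    Returns (did_cross, intersection_point, direction)."""
    def cross2(ax, ay, bx, by):
        return ax * by - ay * bx

    rx, ry = p_curr[0] - p_prev[0], p_curr[1] - p_prev[1]   # motion vector
    sx, sy = line_b[0] - line_a[0], line_b[1] - line_a[1]   # boundary vector
    rxs = cross2(rx, ry, sx, sy)
    if rxs == 0:
        return False, None, None  # parallel or collinear: no clean crossing

    qpx, qpy = line_a[0] - p_prev[0], line_a[1] - p_prev[1]
    t = cross2(qpx, qpy, sx, sy) / rxs   # fraction along the motion segment
    u = cross2(qpx, qpy, rx, ry) / rxs   # fraction along the boundary line
    if not (0.0 <= t <= 1.0 and 0.0 <= u <= 1.0):
        return False, None, None  # segments do not actually touch

    point = (p_prev[0] + t * rx, p_prev[1] + t * ry)
    # Same convention as above: sign of line_vec x move_vec picks the side
    direction = "a_to_b" if cross2(sx, sy, rx, ry) > 0 else "b_to_a"
    return True, point, direction
```

For a horizontal boundary from (0, 0) to (10, 0), a track moving from (5, -5) to (5, 5) crosses at (5.0, 0.0) in the "a_to_b" direction.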


class ZoneBreachDetector:
    def __init__(self, boundary_lines):
        self.lines = boundary_lines
        self.crossed_state = {}  # (track_id, line_id) → last_cross_time

    def evaluate(self, track_id, position_history):
        # position_history entries are (timestamp, x, y)
        if len(position_history) < 2:
            return []

        now = position_history[-1][0]
        p_prev = position_history[-2][1:3]
        p_curr = position_history[-1][1:3]
        events = []

        for line in self.lines:
            line_key = (track_id, line['line_id'])

            # Cooldown check
            if line_key in self.crossed_state:
                if now - self.crossed_state[line_key] < line.get('cooldown_seconds', 30):
                    continue

            did_cross, cross_point, direction = check_line_crossing(
                p_prev, p_curr, line['point_a'], line['point_b']
            )

            if did_cross:
                allowed = line.get('allowed_direction', 'both')
                if allowed == 'both' or allowed == direction:
                    events.append({
                        'event_type': 'ZONE_BREACH',
                        'track_id': track_id,
                        'line_id': line['line_id'],
                        'direction': direction,
                        'crossing_point': cross_point,
                        'timestamp': now,
                        'severity': line.get('severity', 'MEDIUM')
                    })
                    self.crossed_state[line_key] = now

        return events

Configuration Parameters

| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| boundary_lines | list | [] | — | Each line: {line_id, point_a, point_b, allowed_direction, severity} |
| crossing_threshold_pixels | int | 20 | 5 - 100 | Min movement across line to register |
| cooldown_seconds | int | 30 | 0 - 3600 | Cooldown per (track, line) |
| default_severity | string | "MEDIUM" | LOW/MEDIUM/HIGH | Default severity for breaches |
| require_continuous_tracking | bool | true | — | Track must not be lost during crossing |

Severity: Configurable (default MEDIUM)

Rationale: Zone breach severity depends heavily on context — crossing into a server room is HIGH, while crossing a general lobby boundary may be LOW.


2.9 Repeated Re-entry Patterns

Description

Detects when the same person enters and exits a defined area multiple times within a time window, which may indicate surveillance, probing, or unusual behavior patterns.

Algorithm

Inputs:
  - track_history: dict of track positions
  - reentry_zone: polygon defining the monitored area
  - time_window_seconds: int (default 600 = 10 minutes)
  - reentry_threshold: int (default 3) — min entry/exit cycles
  - entry_exit_line: boundary line for counting entries/exits

Processing:
  1. For each active track_id:
     a. Maintain entry/exit history: list of (timestamp, type: "entry"|"exit")
     b. Detect entry when track centroid crosses into reentry_zone (was outside, now inside)
     c. Detect exit when track centroid crosses out of reentry_zone (was inside, now outside)
     d. Store events with timestamps

  2. Pattern Evaluation:
     a. For each track, examine all entry/exit events in last time_window_seconds
     b. Count complete cycles: entry → exit = 1 cycle
     c. If cycle_count >= reentry_threshold:
        → Trigger REENTRY_PATTERN alert

  3. Re-entry event:
     {
       event_type: "REENTRY_PATTERN",
       track_id: <id>,
       cycle_count: <int>,
       time_window_seconds: <int>,
       first_entry_time: <ISO8601>,
       last_exit_time: <ISO8601>,
       avg_cycle_duration_seconds: <float>,
       severity: "MEDIUM",
       ...
     }

State Machine for Entry/Exit Tracking

from shapely.geometry import Point, Polygon

class ReentryTracker:
    def __init__(self, zone_polygon, time_window=600, threshold=3):
        self.zone = Polygon(zone_polygon)
        self.time_window = time_window
        self.threshold = threshold
        self.track_states = {}  # track_id → {"inside": bool, "transitions": [...]}

    def update(self, track_id, centroid, timestamp):
        events = []
        is_inside = self.zone.contains(Point(centroid))

        if track_id not in self.track_states:
            self.track_states[track_id] = {
                'inside': is_inside,
                'transitions': []
            }
            return events

        state = self.track_states[track_id]

        # Detect transitions
        if not state['inside'] and is_inside:
            # Entry detected
            state['transitions'].append({'time': timestamp, 'type': 'entry'})
            state['inside'] = True

        elif state['inside'] and not is_inside:
            # Exit detected
            state['transitions'].append({'time': timestamp, 'type': 'exit'})
            state['inside'] = False

        # Evaluate pattern
        cycles = self._count_cycles(state['transitions'], timestamp)
        if cycles >= self.threshold:
            events.append({
                'event_type': 'REENTRY_PATTERN',
                'track_id': track_id,
                'cycle_count': cycles,
                'transitions': state['transitions'].copy()
            })
            # Reset after alert to allow re-detection
            state['transitions'] = []

        return events

    def _count_cycles(self, transitions, current_time):
        """Count complete entry→exit cycles in time window."""
        cutoff = current_time - self.time_window
        recent = [t for t in transitions if t['time'] >= cutoff]

        cycles = 0
        looking_for_exit = False
        for t in recent:
            if t['type'] == 'entry' and not looking_for_exit:
                looking_for_exit = True
            elif t['type'] == 'exit' and looking_for_exit:
                cycles += 1
                looking_for_exit = False

        return cycles
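For intuition, the entry→exit pairing in `_count_cycles` can be exercised on its own. The standalone function below replicates the same logic outside the tracker, with a sample transition sequence:

```python
def count_cycles(transitions, current_time, time_window=600):
    """Count complete entry→exit cycles among transitions inside the window.
    transitions: list of {'time': float, 'type': 'entry'|'exit'}."""
    cutoff = current_time - time_window
    recent = [t for t in transitions if t['time'] >= cutoff]

    cycles = 0
    looking_for_exit = False
    for t in recent:
        if t['type'] == 'entry' and not looking_for_exit:
            looking_for_exit = True   # open a cycle
        elif t['type'] == 'exit' and looking_for_exit:
            cycles += 1               # close the cycle
            looking_for_exit = False
    return cycles
```

For transitions entry@0, exit@30, entry@100, exit@130, entry@200, evaluating at t=300 counts 2 complete cycles (the final entry is still open); evaluating at t=700 counts only 1, because the first cycle has aged out of the 600-second window.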

Configuration Parameters

| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| reentry_zone | polygon | required | — | Polygon defining the monitored area |
| time_window_seconds | int | 600 | 60 - 3600 | Time window for counting re-entries |
| reentry_threshold | int | 3 | 2 - 10 | Min entry/exit cycles to trigger |
| cooldown_seconds | int | 300 | 0 - 3600 | Cooldown after alert |
| min_cycle_duration_seconds | int | 30 | 5 - 300 | Min duration of one cycle (prevent flicker) |

Severity: MEDIUM

Rationale: Repeated re-entry is suspicious behavior that warrants investigation but may also be legitimate work activity.


2.10 Suspicious Dwell Time

Description

Detects when a person remains near sensitive areas (doors, equipment, storage zones) for longer than a zone-specific threshold. Unlike general loitering (Section 2.2), which can apply anywhere in the frame, this module is tied to designated sensitive zones, each with its own threshold.

Algorithm

Inputs:
  - track_history: dict of track positions
  - sensitive_zones: list of {
      zone_id, polygon, dwell_threshold_seconds,
      is_sensitive (bool), alert_severity
    }
  - detection_confidence: float

Processing:
  1. For each active track_id:
     a. Compute current centroid position
     b. For each sensitive_zone:
        i. If centroid is inside zone:
           - Increment dwell timer for (track_id, zone_id)
           - Track continuous presence (reset on exit)
        ii. If dwell_timer >= zone.dwell_threshold_seconds:
            → Trigger SUSPICIOUS_DWELL event
        iii. If track exits zone: reset dwell_timer for (track_id, zone_id)

  2. Multi-zone handling:
     - A track can have independent dwell timers for multiple zones
     - Alert is generated per (track, zone) pair

  3. Dwell event:
     {
       event_type: "SUSPICIOUS_DWELL",
       track_id: <id>,
       zone_id: <zone_id>,
       dwell_duration_seconds: <float>,
       zone_sensitivity_level: <string>,
       severity: <zone-configurable>,
       ...
     }

Dwell Timer Management

from shapely.geometry import Point

class SuspiciousDwellDetector:
    def __init__(self, sensitive_zones, max_gap_seconds=5.0):
        """
        sensitive_zones: list of zone configs with dwell thresholds.
        max_gap_seconds: longest tracking dropout still counted as presence.
        """
        self.zones = sensitive_zones
        self.max_gap_seconds = max_gap_seconds
        # dwell_timers: {(track_id, zone_id): {
        #     'start_time': float, 'accumulated': float, 'last_seen': float
        # }}
        self.dwell_timers = {}
        self.alerted_pairs = set()  # (track_id, zone_id) that already triggered

    def update(self, track_id, centroid, timestamp):
        events = []

        for zone in self.zones:
            zone_id = zone['zone_id']
            zone_poly = zone['polygon']
            timer_key = (track_id, zone_id)
            is_inside = zone_poly.contains(Point(centroid))

            if is_inside:
                if timer_key not in self.dwell_timers:
                    self.dwell_timers[timer_key] = {
                        'start_time': timestamp,
                        'accumulated': 0.0,
                        'last_seen': timestamp
                    }
                else:
                    # Accumulate dwell time, tolerating brief disappearances
                    dt = timestamp - self.dwell_timers[timer_key]['last_seen']
                    if dt < self.max_gap_seconds:
                        self.dwell_timers[timer_key]['accumulated'] += dt
                    self.dwell_timers[timer_key]['last_seen'] = timestamp

                # Check threshold
                accumulated = self.dwell_timers[timer_key]['accumulated']
                threshold = zone['dwell_threshold_seconds']

                if accumulated >= threshold and timer_key not in self.alerted_pairs:
                    events.append({
                        'event_type': 'SUSPICIOUS_DWELL',
                        'track_id': track_id,
                        'zone_id': zone_id,
                        'dwell_duration': accumulated,
                        'zone_type': zone.get('zone_type', 'generic'),
                        'severity': zone.get('severity', 'MEDIUM')
                    })
                    self.alerted_pairs.add(timer_key)

            else:
                # Track exited zone — reset timer
                if timer_key in self.dwell_timers:
                    del self.dwell_timers[timer_key]
                if timer_key in self.alerted_pairs:
                    self.alerted_pairs.remove(timer_key)

        return events
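The gap-tolerance rule (count elapsed time only when the dropout is shorter than the allowed gap) can be isolated into a tiny helper for testing. This is a sketch with an assumed default gap of 5.0 seconds matching `max_gap_seconds` above; the function name is illustrative:

```python
def accumulate_dwell(observations, max_gap_seconds=5.0):
    """Sum dwell time from a sequence of in-zone observation timestamps,
    discarding intervals longer than max_gap_seconds (tracking dropouts)."""
    total = 0.0
    last = None
    for ts in observations:
        if last is not None:
            dt = ts - last
            if dt < max_gap_seconds:
                total += dt  # continuous presence: count the elapsed time
            # else: dropout too long — do not count the gap
        last = ts
    return total
```

Observations at t = 0, 1, 2, 3, then a 17-second dropout, then t = 20, 21, 22 accumulate 5.0 seconds: the long gap itself contributes nothing, but accumulation resumes afterwards.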

Configuration Parameters

| Parameter | Type | Default | Range | Description |
|---|---|---|---|---|
| enabled | bool | true | — | Enable/disable module |
| sensitive_zones | list | required | — | Zones with specific dwell thresholds |
| default_dwell_threshold_seconds | int | 120 | 10 - 1800 | Default threshold for zones without specific config |
| max_gap_seconds | float | 5.0 | 1.0 - 30.0 | Max disappearance gap before timer reset |
| zone_severity_map | dict | {} | — | Override severity per zone |
| cooldown_seconds | int | 180 | 0 - 3600 | Cooldown before re-alerting same track in same zone |

Predefined Sensitive Zone Types

| Zone Type | Default Threshold | Default Severity | Examples |
|---|---|---|---|
| main_entrance | 60s | MEDIUM | Lobby doors, revolving doors |
| emergency_exit | 30s | HIGH | Fire exits, emergency doors |
| equipment_room | 45s | HIGH | Server room, electrical panel |
| storage_area | 120s | MEDIUM | Warehouse shelves, supply closet |
| elevator_bank | 90s | LOW | Elevator waiting areas |
| parking_access | 60s | MEDIUM | Garage entry points |

Severity: MEDIUM (configurable per zone, up to HIGH)

Rationale: Dwell time near sensitive equipment or access points is context-dependent; severity scales with zone criticality.


3. Activity Scoring Engine

3.1 Composite Suspicious Activity Score

The Activity Scoring Engine combines signals from all detection modules into a single composite suspiciousness score per camera. This score enables:

  • Unified threat assessment across all detection dimensions
  • Escalation triggering when multiple lower-severity events converge
  • Trend analysis for security dashboard visualization
  • Intelligent alert suppression when overall threat is low despite individual triggers

3.2 Score Formula

S_total(t) = sum_over_modules(weight_i * signal_i(t) * decay(t - t_i)) + bonus_cross_module

Where:
  - weight_i: module-specific weight (see table below)
  - signal_i(t): normalized signal value from module i at time t [0, 1]
  - decay(delta_t): exponential time-decay function
  - bonus_cross_module: extra score when multiple modules fire simultaneously
  - t_i: timestamp of most recent event from module i

3.3 Module Weights and Signal Definitions

| Module | Weight | Signal Source | Signal Range | Max Contribution |
|---|---|---|---|---|
| Intrusion Detection | 0.25 | overlap_ratio * confidence | 0.0 - 1.0 | 0.25 |
| Loitering Detection | 0.15 | dwell_ratio (dwell_time / threshold) | 0.0 - 1.0+ | 0.15 |
| Running Detection | 0.10 | speed_ratio (speed / threshold), normalized | 0.0 - 1.0+ | 0.10 |
| Crowding Detection | 0.12 | crowd_density_score | 0.0 - 1.0 | 0.12 |
| Fall Detection | 0.20 | fall_confidence_score | 0.0 - 1.0 | 0.20 |
| Abandoned Object | 0.18 | unattended_ratio (duration / threshold) | 0.0 - 1.0+ | 0.18 |
| After-Hours Presence | 0.05 | binary (1 if detected) * zone_severity_multiplier | 0.0 - 1.0 | 0.05 |
| Zone Breach | 0.12 | severity_mapped (LOW=0.3, MED=0.6, HIGH=1.0) | 0.0 - 1.0 | 0.12 |
| Re-entry Patterns | 0.10 | cycle_ratio (count / threshold) | 0.0 - 1.0+ | 0.10 |
| Suspicious Dwell | 0.13 | dwell_ratio (duration / zone_threshold) | 0.0 - 1.0+ | 0.13 |

Note: Weights sum to 1.40 — this is intentional to allow cross-module amplification. Normalized scores above 1.0 for individual modules are clipped at 1.0.
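As a worked example of the formula in Section 3.2 (the numbers are illustrative, not from a real deployment): a loitering signal of 0.8 observed 5 minutes ago combined with a fresh intrusion signal of 1.0 yields:

```python
import math

def time_decay(delta_t_seconds, half_life=300):
    """Exponential decay with a 5-minute half-life (as in Section 3.4)."""
    return math.exp(-math.log(2) * delta_t_seconds / half_life)

# S_total = sum_i weight_i * signal_i * decay(dt_i) + cross-module bonus
loitering = 0.15 * 0.8 * time_decay(300)   # 5 min old -> decay 0.5 -> 0.06
intrusion = 0.25 * 1.0 * time_decay(0)     # fresh -> decay 1.0 -> 0.25
bonus = 0.15 * (2 - 1)                     # two modules active -> +0.15
s_total = loitering + intrusion + bonus    # 0.46 -> MEDIUM threat band
```

The resulting 0.46 falls in the 0.40-0.60 MEDIUM band of the escalation table in Section 3.6, even though neither signal alone would have reached it: the cross-module bonus supplies the difference.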

3.4 Time-Decay Function

import math

def time_decay(delta_t_seconds, half_life=300):
    """
    Exponential decay with 5-minute half-life by default.
    Events older than 30 minutes contribute < 2%.
    """
    return math.exp(-math.log(2) * delta_t_seconds / half_life)

# Decay table:
#   0 min → 1.000
#   1 min → 0.871
#   5 min → 0.500
#  10 min → 0.250
#  20 min → 0.063
#  30 min → 0.016

3.5 Cross-Module Amplification Bonus

When multiple detection modules fire simultaneously for the same track or in close proximity, a cross-module bonus is applied:

def compute_cross_module_bonus(active_signals, proximity_weight=0.15):
    """
    Apply bonus when multiple modules detect simultaneously.
    active_signals: list of dicts with keys
        'module', 'signal', 'track_id', 'zone_id'
    """
    from collections import defaultdict

    n_modules = len({s['module'] for s in active_signals})  # distinct modules
    if n_modules <= 1:
        return 0.0

    # Base bonus: +15% per additional module (beyond the first)
    base_bonus = proximity_weight * (n_modules - 1)

    # Track overlap bonus: if signals share the same track_id
    track_groups = defaultdict(list)
    for s in active_signals:
        if s.get('track_id') is not None:
            track_groups[s['track_id']].append(s)
    track_bonus = 0.0
    for track_id, signals in track_groups.items():
        if len(signals) >= 2:
            # Same person triggering multiple rules → higher threat
            track_bonus += 0.10 * (len(signals) - 1)

    # Zone overlap bonus: if signals are in the same zone
    zone_groups = defaultdict(list)
    for s in active_signals:
        if s.get('zone_id') is not None:
            zone_groups[s['zone_id']].append(s)
    zone_bonus = 0.0
    for zone_id, signals in zone_groups.items():
        if len(signals) >= 2:
            zone_bonus += 0.08 * (len(signals) - 1)

    return min(base_bonus + track_bonus + zone_bonus, 0.50)  # Cap at +0.50

3.6 Escalation Thresholds

| Score Range | Threat Level | Action |
|---|---|---|
| 0.00 - 0.20 | NONE | Log only, no alert |
| 0.20 - 0.40 | LOW | Log + dashboard indicator |
| 0.40 - 0.60 | MEDIUM | Log + alert dispatch (non-urgent) |
| 0.60 - 0.80 | HIGH | Log + immediate alert + highlight |
| 0.80 - 1.00 | CRITICAL | Log + immediate alert + SMS/email + security dispatch recommendation |
| > 1.00 | EMERGENCY | All channels + automatic escalation to security team lead |

3.7 Score Computation Pseudocode

from collections import deque

class ActivityScoringEngine:
    def __init__(self, config):
        self.weights = config['module_weights']
        self.decay_half_life = config.get('decay_half_life_seconds', 300)
        self.event_history = deque(maxlen=1000)  # Recent events with timestamps
        self.score_history = deque(maxlen=3600)  # Score samples over the last hour

    def compute_score(self, current_time, camera_id):
        """Compute composite suspicious activity score for a camera."""
        # Get recent events for this camera (last 30 minutes)
        recent_events = [
            e for e in self.event_history
            if e['camera_id'] == camera_id
            and (current_time - e['timestamp']) < 1800
        ]

        active_signals = []
        weighted_sum = 0.0

        for event in recent_events:
            module = event['event_type']
            weight = self.weights.get(module, 0.05)
            signal = self._normalize_signal(event)
            age = current_time - event['timestamp']
            decay = time_decay(age, self.decay_half_life)

            contribution = weight * signal * decay
            weighted_sum += contribution

            active_signals.append({
                'module': module,
                'signal': signal,
                'track_id': event.get('track_id'),
                'zone_id': event.get('zone_id'),
                'decayed_contribution': contribution
            })

        # Cross-module bonus
        cross_bonus = compute_cross_module_bonus(active_signals)

        total_score = min(weighted_sum + cross_bonus, 1.5)  # Cap at 1.5

        # Store for history
        self.score_history.append({
            'timestamp': current_time,
            'camera_id': camera_id,
            'score': total_score,
            'components': active_signals
        })

        return total_score, self._classify_threat(total_score)

    def _normalize_signal(self, event):
        """Normalize raw event data to [0, 1] signal value."""
        event_type = event['event_type']

        normalizers = {
            'INTRUSION': lambda e: min(e.get('overlap_ratio', 0) * e.get('confidence', 0) / 0.65, 1.0),
            'LOITERING': lambda e: min(e.get('dwell_time_seconds', 0) / e.get('threshold', 300), 1.0),
            'RUNNING': lambda e: min(e.get('speed_percentile_90', 0) / e.get('threshold', 150), 1.0),
            'CROWDING': lambda e: min(e.get('density_score', 0) / 0.5, 1.0),
            'FALL': lambda e: e.get('fall_confidence', 0),
            'ABANDONED_OBJECT': lambda e: min(e.get('unattended_duration', 0) / e.get('threshold', 60), 1.0),
            'AFTER_HOURS_PRESENCE': lambda e: 0.3 if e.get('is_authorized') else 1.0,  # null = unknown → treat as unauthorized
            'ZONE_BREACH': lambda e: {'LOW': 0.3, 'MEDIUM': 0.6, 'HIGH': 1.0}.get(e.get('severity', 'MEDIUM'), 0.6),
            'REENTRY_PATTERN': lambda e: min(e.get('cycle_count', 0) / e.get('threshold', 3), 1.0),
            'SUSPICIOUS_DWELL': lambda e: min(e.get('dwell_duration', 0) / e.get('threshold', 120), 1.0)
        }

        normalizer = normalizers.get(event_type, lambda e: 0.5)
        return normalizer(event)

    def _classify_threat(self, score):
        if score >= 1.0: return 'EMERGENCY'
        if score >= 0.80: return 'CRITICAL'
        if score >= 0.60: return 'HIGH'
        if score >= 0.40: return 'MEDIUM'
        if score >= 0.20: return 'LOW'
        return 'NONE'

3.8 Score Visualization (Per Camera)

The score is exposed as a time series for dashboard visualization:

  • Current Score: Real-time value (updated every second)
  • 5-Minute Average: Smoothed trend
  • Peak Score: Maximum in last hour
  • Alert Count: Number of alerts generated in current shift

4. Configuration Schema

4.1 Full Configuration Schema (YAML)

# ============================================================
# Suspicious Activity Detection — Master Configuration
# ============================================================

system:
  name: "Night-Mode Surveillance"
  version: "1.0"
  max_channels: 8
  processing_fps: 25
  frame_resolution: [1920, 1080]
  night_mode_schedule:
    enabled: true
    start_time: "22:00"
    end_time: "06:00"
    timezone: "local"
    gradual_transition_minutes: 15  # Ramp sensitivity over 15 min
    override_manual: false          # Admin can manually toggle

# ============================================================
# Activity Scoring Engine
# ============================================================
scoring_engine:
  decay_half_life_seconds: 300
  max_history_minutes: 30
  module_weights:
    INTRUSION: 0.25
    LOITERING: 0.15
    RUNNING: 0.10
    CROWDING: 0.12
    FALL: 0.20
    ABANDONED_OBJECT: 0.18
    AFTER_HOURS_PRESENCE: 0.05
    ZONE_BREACH: 0.12
    REENTRY_PATTERN: 0.10
    SUSPICIOUS_DWELL: 0.13
  escalation_thresholds:
    NONE: 0.00
    LOW: 0.20
    MEDIUM: 0.40
    HIGH: 0.60
    CRITICAL: 0.80
    EMERGENCY: 1.00
  cross_module_bonus:
    enabled: true
    proximity_weight: 0.15
    max_bonus: 0.50

# ============================================================
# Alert Generation
# ============================================================
alert_manager:
  suppression_enabled: true
  default_suppression_minutes: 5
  max_alerts_per_hour_per_camera: 20
  evidence_capture:
    snapshot_enabled: true
    snapshot_frames_before: 5
    snapshot_frames_after: 10
    clip_enabled: true
    clip_duration_seconds: 10
    clip_pre_buffer_seconds: 5
  severity_actions:
    LOW:
      log: true
      dashboard: true
      notification: false
      email: false
      sms: false
    MEDIUM:
      log: true
      dashboard: true
      notification: true
      email: false
      sms: false
    HIGH:
      log: true
      dashboard: true
      notification: true
      email: true
      sms: true
    CRITICAL:
      log: true
      dashboard: true
      notification: true
      email: true
      sms: true
      auto_dispatch: true

# ============================================================
# Per-Camera Configuration Template
# ============================================================
cameras:
  # Camera 1 — Main Entrance
  cam_01:
    enabled: true
    location: "Main Entrance Lobby"
    night_mode:
      enabled: true
      custom_schedule: null  # Use system default
      sensitivity_multiplier: 1.0

    # Restricted Zones (Intrusion Detection)
    intrusion_detection:
      enabled: true
      confidence_threshold: 0.65
      overlap_threshold: 0.30
      cooldown_seconds: 30
      restricted_zones:
        - zone_id: "server_room_door"
          name: "Server Room Entry"
          polygon:
            - [0.65, 0.20]
            - [0.85, 0.20]
            - [0.85, 0.60]
            - [0.65, 0.60]
          severity: "HIGH"
        - zone_id: "admin_office"
          name: "Administration Office"
          polygon:
            - [0.10, 0.30]
            - [0.40, 0.30]
            - [0.40, 0.80]
            - [0.10, 0.80]
          severity: "HIGH"

    # Loitering Detection
    loitering_detection:
      enabled: true
      dwell_time_threshold_seconds: 300
      movement_tolerance_pixels: 50
      loitering_zones: []  # Empty = entire frame
      consecutive_confirmations: 3
      cooldown_seconds: 60

    # Running Detection
    running_detection:
      enabled: true
      speed_threshold_pixels_per_second: 150
      speed_threshold_kmh: 15.0
      confirmation_duration_seconds: 2.0
      speed_percentile: 90
      min_track_history_seconds: 1.0
      exclude_falling_state: true
      calibration:
        pixels_per_meter_at_center: 45.0
        homography_matrix: null  # Set if calibrated

    # Crowding Detection
    crowding_detection:
      enabled: true
      count_threshold: 3
      area_threshold: 0.15
      density_threshold: 0.05
      confirmation_frames: 5
      use_dbscan: true
      dbscan_eps: 0.08
      cooldown_seconds: 60

    # Fall Detection
    fall_detection:
      enabled: true
      fall_score_threshold: 0.75
      min_keypoint_confidence: 0.30
      torso_angle_threshold_deg: 45
      aspect_ratio_threshold: 1.2
      height_ratio_threshold: 0.6
      temporal_confirmation_ms: 1000
      bending_suppression: true

    # Abandoned Object Detection
    abandoned_object_detection:
      enabled: true
      unattended_time_threshold_seconds: 60
      proximity_threshold_pixels: 100
      watchlist_classes: ["backpack", "suitcase", "box", "bag", "package"]
      fg_ratio_threshold: 0.3
      bg_history_frames: 500
      bg_learning_rate: 0.005
      stationary_threshold_pixels: 10

    # After-Hours Presence
    after_hours_presence:
      enabled: true
      detection_confidence_threshold: 0.60
      min_detection_frames: 10
      check_authorized_personnel: false
      escalate_in_restricted_zones: true
      alert_per_track: true
      cooldown_per_zone_seconds: 300

    # Zone Breach
    zone_breach:
      enabled: true
      boundary_lines:
        - line_id: "lobby_entry"
          name: "Lobby Entry Line"
          point_a: [0.0, 0.5]
          point_b: [1.0, 0.5]
          allowed_direction: "both"
          trigger_on: "cross"
          severity: "MEDIUM"
          cooldown_seconds: 30
        - line_id: "secure_corridor"
          name: "Secure Corridor Entry"
          point_a: [0.5, 0.0]
          point_b: [0.5, 1.0]
          allowed_direction: "b_to_a"  # Only flag entering from B side
          trigger_on: "cross"
          severity: "HIGH"
          cooldown_seconds: 60

    # Repeated Re-entry
    reentry_detection:
      enabled: true
      reentry_zone:
        zone_id: "lobby_area"
        polygon:
          - [0.0, 0.0]
          - [1.0, 0.0]
          - [1.0, 1.0]
          - [0.0, 1.0]
      time_window_seconds: 600
      reentry_threshold: 3
      cooldown_seconds: 300
      min_cycle_duration_seconds: 30

    # Suspicious Dwell Time
    suspicious_dwell:
      enabled: true
      sensitive_zones:
        - zone_id: "main_door"
          zone_type: "main_entrance"
          dwell_threshold_seconds: 60
          severity: "MEDIUM"
          polygon:
            - [0.40, 0.10]
            - [0.60, 0.10]
            - [0.60, 0.40]
            - [0.40, 0.40]
        - zone_id: "equipment_panel"
          zone_type: "equipment_room"
          dwell_threshold_seconds: 45
          severity: "HIGH"
          polygon:
            - [0.70, 0.50]
            - [0.90, 0.50]
            - [0.90, 0.80]
            - [0.70, 0.80]
      max_gap_seconds: 5.0

  # Camera 2 — Warehouse Floor
  cam_02:
    enabled: true
    location: "Warehouse Floor North"
    intrusion_detection:
      enabled: true
      confidence_threshold: 0.60  # Slightly lower for wide area
      overlap_threshold: 0.25
      restricted_zones:
        - zone_id: "high_value_storage"
          polygon:
            - [0.20, 0.20]
            - [0.60, 0.20]
            - [0.60, 0.70]
            - [0.20, 0.70]
          severity: "HIGH"
    # ... (same structure, camera-specific values)

  # Cameras 3-8 follow same pattern
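Note that the default 22:00-06:00 `night_mode_schedule` wraps past midnight, so a naive `start <= now < end` comparison would never match. A sketch of the wrap-around check (`is_in_night_window` is an illustrative name, not the scheduler's actual API):

```python
from datetime import time

def is_in_night_window(now, start=time(22, 0), end=time(6, 0)):
    """True if `now` falls inside a daily window that may wrap past midnight."""
    if start <= end:
        # Same-day window, e.g. 01:00-05:00
        return start <= now < end
    # Wrapping window, e.g. 22:00-06:00: after start OR before end
    return now >= start or now < end
```

With the defaults, 23:30 and 03:00 are inside the window, while 12:00 and exactly 06:00 are outside (the end time is exclusive).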

4.2 JSON Schema for API Configuration

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "SuspiciousActivityConfig",
  "type": "object",
  "properties": {
    "system": {
      "type": "object",
      "properties": {
        "night_mode_schedule": {
          "type": "object",
          "properties": {
            "enabled": {"type": "boolean", "default": true},
            "start_time": {"type": "string", "pattern": "^([01]?[0-9]|2[0-3]):[0-5][0-9]$", "default": "22:00"},
            "end_time": {"type": "string", "pattern": "^([01]?[0-9]|2[0-3]):[0-5][0-9]$", "default": "06:00"},
            "gradual_transition_minutes": {"type": "integer", "minimum": 0, "maximum": 60, "default": 15}
          }
        }
      }
    },
    "cameras": {
      "type": "object",
      "patternProperties": {
        "^cam_[0-9]+$": {
          "type": "object",
          "properties": {
            "enabled": {"type": "boolean", "default": true},
            "intrusion_detection": {"type": "object", "properties": {
              "enabled": {"type": "boolean", "default": true},
              "confidence_threshold": {"type": "number", "minimum": 0, "maximum": 1, "default": 0.65},
              "overlap_threshold": {"type": "number", "minimum": 0, "maximum": 1, "default": 0.30},
              "restricted_zones": {
                "type": "array",
                "items": {
                  "type": "object",
                  "properties": {
                    "zone_id": {"type": "string"},
                    "polygon": {
                      "type": "array",
                      "items": {"type": "array", "items": {"type": "number"}, "minItems": 2, "maxItems": 2},
                      "minItems": 3
                    }
                  },
                  "required": ["zone_id", "polygon"]
                }
              }
            }},
            "loitering_detection": {"type": "object", "properties": {
              "enabled": {"type": "boolean", "default": true},
              "dwell_time_threshold_seconds": {"type": "integer", "minimum": 30, "maximum": 3600, "default": 300}
            }},
            "fall_detection": {"type": "object", "properties": {
              "enabled": {"type": "boolean", "default": true},
              "fall_score_threshold": {"type": "number", "minimum": 0, "maximum": 1, "default": 0.75}
            }},
            "abandoned_object_detection": {"type": "object", "properties": {
              "enabled": {"type": "boolean", "default": true},
              "unattended_time_threshold_seconds": {"type": "integer", "minimum": 10, "maximum": 600, "default": 60}
            }},
            "zone_breach": {"type": "object", "properties": {
              "enabled": {"type": "boolean", "default": true},
              "boundary_lines": {
                "type": "array",
                "items": {
                  "type": "object",
                  "properties": {
                    "line_id": {"type": "string"},
                    "point_a": {"type": "array", "items": {"type": "number"}, "minItems": 2, "maxItems": 2},
                    "point_b": {"type": "array", "items": {"type": "number"}, "minItems": 2, "maxItems": 2},
                    "allowed_direction": {"type": "string", "enum": ["both", "a_to_b", "b_to_a"], "default": "both"},
                    "severity": {"type": "string", "enum": ["LOW", "MEDIUM", "HIGH"], "default": "MEDIUM"}
                  },
                  "required": ["line_id", "point_a", "point_b"]
                }
              }
            }}
          }
        }
      }
    }
  }
}
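
The HH:MM pattern used by `start_time`/`end_time` can be exercised directly; a quick stdlib-only check of which strings it accepts:

```python
import re

# Pattern copied from the schema above: hours 0-23 (leading zero optional),
# minutes 00-59.
TIME_RE = re.compile(r"^([01]?[0-9]|2[0-3]):[0-5][0-9]$")

for value in ["22:00", "6:05", "24:00", "06:60"]:
    print(value, bool(TIME_RE.match(value)))
# 22:00 True, 6:05 True, 24:00 False, 06:60 False
```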

4.3 Configuration Management API

class ConfigurationManager:
    """
    Runtime configuration management for suspicious activity detection.
    Supports per-camera, per-rule configuration with hot-reload.
    """

    def get_camera_config(self, camera_id: str) -> CameraConfig:
        """Retrieve full configuration for a camera."""

    def update_rule_config(self, camera_id: str, rule_name: str, config: dict) -> bool:
        """Update configuration for a specific rule on a camera."""

    def add_restricted_zone(self, camera_id: str, zone: ZoneConfig) -> str:
        """Add a new restricted zone to a camera. Returns zone_id."""

    def remove_restricted_zone(self, camera_id: str, zone_id: str) -> bool:
        """Remove a restricted zone from a camera."""

    def add_boundary_line(self, camera_id: str, line: BoundaryLineConfig) -> str:
        """Add a new boundary line for zone breach detection."""

    def toggle_rule(self, camera_id: str, rule_name: str, enabled: bool) -> bool:
        """Enable or disable a detection rule for a camera."""

    def reload_config(self) -> bool:
        """Hot-reload configuration from file without restart."""

    def export_config(self) -> dict:
        """Export full configuration for backup/transfer."""

    def validate_config(self, config: dict) -> list[ValidationError]:
        """Validate configuration and return list of errors."""
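
The manager above is deliberately abstract. One plausible backing store is a lock-guarded nested dict mirroring the schema in 4.2; the class below is an illustrative sketch (its name and layout are not part of the design):

```python
import threading
from copy import deepcopy

class InMemoryConfigStore:
    """Illustrative backing store: thread-safe reads/writes on a nested dict."""

    def __init__(self, initial: dict):
        self._lock = threading.Lock()
        self._config = deepcopy(initial)

    def toggle_rule(self, camera_id: str, rule_name: str, enabled: bool) -> bool:
        """Flip a rule's 'enabled' flag; returns False if camera/rule is unknown."""
        with self._lock:
            rule = self._config.get("cameras", {}).get(camera_id, {}).get(rule_name)
            if rule is None:
                return False
            rule["enabled"] = enabled
            return True

    def get_camera_config(self, camera_id: str) -> dict:
        """Return a defensive copy so callers cannot mutate live config."""
        with self._lock:
            return deepcopy(self._config.get("cameras", {}).get(camera_id, {}))

store = InMemoryConfigStore(
    {"cameras": {"cam_01": {"loitering_detection": {"enabled": True}}}})
store.toggle_rule("cam_01", "loitering_detection", False)
print(store.get_camera_config("cam_01")["loitering_detection"]["enabled"])  # False
```

Hot-reload (`reload_config`) then reduces to building a fresh dict from file and swapping it in under the same lock.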

5. Alert Generation Logic

5.1 Alert Lifecycle

┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│   DETECTED   │────▶│  SUPPRESSED  │────▶│   EVIDENCE   │────▶│  DISPATCHED  │
│   (Rule fire)│     │  (Deduplic.) │     │   (Capture)  │     │  (Send alert)│
└──────────────┘     └──────────────┘     └──────────────┘     └──────┬───────┘
                                                                      │
                                                                      ▼
                                                               ┌──────────────┐
                                                               │  ACKNOWLEDGE │
                                                               │   or AUTO    │
                                                               └──────────────┘

5.2 When to Alert vs Log

| Condition                                     | Action                  | Reason                |
|-----------------------------------------------|-------------------------|-----------------------|
| Detection confidence < rule minimum           | Log only                | Insufficient evidence |
| Threat score < LOW threshold (0.2)            | Log only                | Below alert threshold |
| Duplicate alert within suppression window     | Log + increment counter | Prevent spam          |
| Exceeded max alerts/hour for camera           | Log + rate-limit flag   | Prevent overflow      |
| Severity LOW and no escalation                | Log + dashboard         | Reduce noise          |
| Severity MEDIUM or higher                     | Full alert pipeline     | Actionable event      |
| Multiple concurrent detections (score spike)  | Immediate alert         | Convergent threat     |
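
The table collapses to a first-match decision function; the sketch below follows the row order above (action names illustrative):

```python
def alert_action(confidence: float, rule_min: float, score: float,
                 severity: str, is_duplicate: bool, rate_limited: bool) -> str:
    """Return the action for an event; the first matching table row wins."""
    if confidence < rule_min:
        return "log_only"            # insufficient evidence
    if score < 0.20:
        return "log_only"            # below alert threshold
    if is_duplicate:
        return "log_and_increment"   # prevent spam
    if rate_limited:
        return "log_and_flag"        # prevent overflow
    if severity == "LOW":
        return "log_and_dashboard"   # reduce noise
    return "full_alert"              # actionable event

print(alert_action(0.89, 0.65, 0.72, "HIGH", False, False))  # full_alert
print(alert_action(0.50, 0.65, 0.72, "HIGH", False, False))  # log_only
```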

5.3 Suppression Rules

class AlertSuppressor:
    def __init__(self):
        # suppression_log: {(camera_id, rule_type, track_id, zone_id): last_alert_time}
        self.suppression_log = {}
        self.hourly_counters = {}  # (camera_id, hour): count

    def should_suppress(self, event) -> tuple[bool, str]:
        """
        Check if an event should be suppressed.
        Returns: (is_suppressed, reason)
        """
        camera_id = event['camera_id']
        rule_type = event['event_type']
        track_id = event.get('track_id', '*')
        zone_id = event.get('zone_id', '*')

        # 1. Rule-specific cooldown
        suppression_key = (camera_id, rule_type, track_id, zone_id)
        cooldown = self._get_cooldown_seconds(rule_type, camera_id)

        if suppression_key in self.suppression_log:
            elapsed = time.time() - self.suppression_log[suppression_key]
            if elapsed < cooldown:
                return True, f"COOLDOWN: {elapsed:.0f}s / {cooldown}s"

        # 2. Hourly rate limit
        hour_key = (camera_id, datetime.now().hour)
        current_count = self.hourly_counters.get(hour_key, 0)
        max_per_hour = self._get_max_alerts_per_hour(camera_id)
        if current_count >= max_per_hour:
            return True, f"RATE_LIMIT: {current_count}/{max_per_hour} this hour"

        # 3. Composite score gating (don't alert if overall threat is low)
        if event.get('severity') == 'LOW':
            current_score = scoring_engine.get_current_score(camera_id)
            if current_score < 0.20:
                return True, f"LOW_SEVERITY_AND_LOW_SCORE: {current_score:.2f}"

        return False, "PASS"

    def record_alert(self, event):
        """Record that an alert was generated."""
        suppression_key = (event['camera_id'], event['event_type'],
                          event.get('track_id', '*'), event.get('zone_id', '*'))
        self.suppression_log[suppression_key] = time.time()

        hour_key = (event['camera_id'], datetime.now().hour)
        self.hourly_counters[hour_key] = self.hourly_counters.get(hour_key, 0) + 1

5.4 Severity Assignment

def assign_alert_severity(detection_event, context):
    """
    Assign final severity considering all context.
    """
    base_severity = detection_event['severity']
    score = context.get('current_composite_score', 0)

    severity_levels = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4}
    base_level = severity_levels.get(base_severity, 2)

    # Escalation conditions
    if score >= 0.80 and base_level < 3:
        base_level = min(base_level + 1, 4)  # Bump up one level

    # Multiple concurrent detections for same track
    if context.get('concurrent_detections_count', 0) >= 2:
        base_level = min(base_level + 1, 4)

    # Zone-specific escalation
    if detection_event.get('zone_severity_override'):
        zone_level = severity_levels.get(detection_event['zone_severity_override'], base_level)
        base_level = max(base_level, zone_level)

    reverse_levels = {v: k for k, v in severity_levels.items()}
    return reverse_levels.get(base_level, 'MEDIUM')
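
A runnable trace of the escalation rules (the function is reproduced so the example is self-contained):

```python
def assign_alert_severity(detection_event, context):
    base_severity = detection_event['severity']
    score = context.get('current_composite_score', 0)
    severity_levels = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4}
    base_level = severity_levels.get(base_severity, 2)
    if score >= 0.80 and base_level < 3:
        base_level = min(base_level + 1, 4)
    if context.get('concurrent_detections_count', 0) >= 2:
        base_level = min(base_level + 1, 4)
    if detection_event.get('zone_severity_override'):
        zone_level = severity_levels.get(
            detection_event['zone_severity_override'], base_level)
        base_level = max(base_level, zone_level)
    reverse_levels = {v: k for k, v in severity_levels.items()}
    return reverse_levels.get(base_level, 'MEDIUM')

# MEDIUM (2) + high-score bump (score >= 0.80) + concurrent bump (>= 2 events)
# -> level 4 -> CRITICAL
event = {'severity': 'MEDIUM'}
ctx = {'current_composite_score': 0.85, 'concurrent_detections_count': 2}
print(assign_alert_severity(event, ctx))  # CRITICAL
```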

5.5 Evidence Attachment

Every alert includes visual evidence:

class EvidenceCapture:
    def __init__(self, config):
        self.frame_buffer = {}  # camera_id → ring buffer of recent frames
        self.config = config

    def capture_evidence(self, event, camera_id):
        """Capture snapshot and clip for an alert event."""
        evidence = {}

        # Snapshot: annotated frame at event moment
        if self.config['snapshot_enabled']:
            frame = self.get_frame_with_annotations(camera_id, event)
            snapshot_path = self.save_snapshot(frame, event)
            evidence['snapshot_path'] = snapshot_path

        # Clip: video segment around event
        if self.config['clip_enabled']:
            pre = self.config['clip_pre_buffer_seconds']
            post = self.config['clip_duration_seconds'] - pre
            clip_path = self.save_clip(camera_id, event['timestamp'], pre, post, event)
            evidence['clip_path'] = clip_path

        # Metadata
        evidence['metadata'] = {
            'camera_id': camera_id,
            'timestamp': event['timestamp'],
            'event_type': event['event_type'],
            'track_ids': event.get('track_ids', [event.get('track_id')]),
            'confidence': event.get('confidence', event.get('detection_confidence', 0)),
            'annotated_bboxes': self._get_annotations(event)
        }

        return evidence
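
The clip pre-buffer assumes recent frames are retained per camera; a minimal ring buffer with `collections.deque` (25 fps and a 5 s pre-buffer assumed for illustration):

```python
from collections import deque

class FrameRingBuffer:
    """Keep the most recent fps*seconds frames for pre-event clip context."""

    def __init__(self, fps: int = 25, seconds: int = 5):
        self.frames = deque(maxlen=fps * seconds)

    def push(self, frame):
        self.frames.append(frame)  # oldest frame drops automatically when full

    def snapshot(self) -> list:
        return list(self.frames)

buf = FrameRingBuffer()
for i in range(200):          # simulate 8 s of frames at 25 fps
    buf.push(i)
print(len(buf.snapshot()))    # 125 (only the last 5 s survive)
print(buf.snapshot()[0])      # 75
```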

5.6 Alert Dispatch Format

{
  "alert_id": "alert_20240115_033045_cam01_001",
  "timestamp": "2024-01-15T03:30:45.123Z",
  "camera_id": "cam_01",
  "camera_location": "Main Entrance Lobby",
  "event_type": "INTRUSION",
  "event_subtype": "restricted_zone_entry",
  "severity": "HIGH",
  "threat_score": 0.72,
  "threat_level": "HIGH",
  "description": "Person detected in restricted zone 'server_room_door' with 68% overlap",
  "detected_objects": [
    {
      "track_id": 42,
      "class": "person",
      "confidence": 0.89,
      "bbox": [820, 310, 950, 580],
      "zone_overlap_ratio": 0.68,
      "zone_id": "server_room_door"
    }
  ],
  "evidence": {
    "snapshot_url": "/evidence/snapshots/alert_20240115_033045_cam01_001.jpg",
    "clip_url": "/evidence/clips/alert_20240115_033045_cam01_001.mp4",
    "metadata": {
      "frame_number": 452310,
      "processing_latency_ms": 145
    }
  },
  "recommendation": "Dispatch security personnel to investigate server room entry",
  "suppression_info": {
    "cooldown_remaining_seconds": 0,
    "alerts_this_hour": 3,
    "hourly_limit": 20
  }
}
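
A downstream consumer can sanity-check a payload before routing it; a minimal required-field check against the format above (the field set shown is a subset, chosen for illustration):

```python
import json

REQUIRED = {"alert_id", "timestamp", "camera_id", "event_type",
            "severity", "threat_score", "evidence"}

def missing_fields(alert_json: str) -> set:
    """Return the required top-level keys absent from an alert payload."""
    return REQUIRED - json.loads(alert_json).keys()

sample = json.dumps({
    "alert_id": "alert_20240115_033045_cam01_001",
    "timestamp": "2024-01-15T03:30:45.123Z",
    "camera_id": "cam_01",
    "event_type": "INTRUSION",
    "severity": "HIGH",
    "threat_score": 0.72,
    "evidence": {},
})
print(missing_fields(sample))                # set()
print(missing_fields('{"alert_id": "a1"}'))  # everything except alert_id
```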

6. Night Mode Scheduler

6.1 Activation Logic

class NightModeScheduler:
    """
    Manages automatic activation/deactivation of suspicious activity detection
    during night hours with gradual sensitivity transition.
    """

    def __init__(self, config):
        self.start_time = self._parse_time(config['start_time'])  # default 22:00
        self.end_time = self._parse_time(config['end_time'])       # default 06:00
        self.transition_minutes = config.get('gradual_transition_minutes', 15)
        self.override_active = False
        self.override_state = None  # True=force on, False=force off, None=auto

    def is_night_mode(self, now=None) -> tuple[bool, float]:
        """
        Check if night mode should be active.
        Returns: (is_active, sensitivity_multiplier)
        """
        if now is None:
            now = datetime.now()

        # Manual override takes precedence
        if self.override_state is not None:
            return self.override_state, 1.0

        current_time = now.time()

        # Night window: 22:00 - 06:00 (spans midnight)
        if self.start_time <= self.end_time:
            in_window = self.start_time <= current_time <= self.end_time
        else:
            in_window = current_time >= self.start_time or current_time <= self.end_time

        if not in_window:
            return False, 0.0

        # Gradual sensitivity transition
        sensitivity = self._compute_transition_sensitivity(current_time)
        return True, sensitivity

    def _compute_transition_sensitivity(self, current_time) -> float:
        """
        Compute sensitivity multiplier with ramp-up/ramp-down.
        During transition periods, sensitivity gradually increases/decreases.
        """
        now = datetime.combine(datetime.today(), current_time)

        # Ramp-up at start of night window
        start_dt = datetime.combine(datetime.today(), self.start_time)
        ramp_end = start_dt + timedelta(minutes=self.transition_minutes)

        if start_dt <= now <= ramp_end:
            elapsed = (now - start_dt).total_seconds()
            return 0.3 + 0.7 * (elapsed / (self.transition_minutes * 60))

        # Ramp-down at end of night window
        end_dt = datetime.combine(datetime.today(), self.end_time)
        ramp_start = end_dt - timedelta(minutes=self.transition_minutes)

        if ramp_start <= now <= end_dt:
            remaining = (end_dt - now).total_seconds()
            return 0.3 + 0.7 * (remaining / (self.transition_minutes * 60))

        return 1.0  # Full sensitivity during core night hours

    def force_override(self, state: bool):
        """Manually override night mode. state=True forces ON, False forces OFF."""
        self.override_state = state

    def release_override(self):
        """Return to automatic scheduling."""
        self.override_state = None
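
The midnight-spanning window test in `is_night_mode` is the subtle part; isolated, it reads:

```python
from datetime import time

def in_night_window(now_t: time, start: time = time(22, 0),
                    end: time = time(6, 0)) -> bool:
    """True when now_t falls inside [start, end], including windows
    that span midnight (start > end)."""
    if start <= end:
        return start <= now_t <= end
    return now_t >= start or now_t <= end

print(in_night_window(time(23, 30)))  # True  (before midnight)
print(in_night_window(time(3, 0)))    # True  (after midnight)
print(in_night_window(time(12, 0)))   # False (daytime)
```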

6.2 Transition Timeline

21:45 ─┬─ Pre-night: Normal daytime mode (sensitivity 0.0)
       │
22:00 ─┼─ Night mode START → Ramp-up begins
       │   Sensitivity: 0.30 → 0.50 → 0.70 → 1.0
       │
22:15 ─┼─ Ramp-up complete → Full night sensitivity (1.0x)
       │
       │   [Full night mode active — all detection modules at configured sensitivity]
       │
05:45 ─┼─ Ramp-down begins → Sensitivity: 1.0 → 0.70 → 0.50 → 0.30
       │
06:00 ─┼─ Night mode END → Return to daytime mode (sensitivity 0.0)
       │
       └─ Normal daytime mode
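
The ramp values shown follow a linear interpolation from 0.3 to 1.0 over the transition window, matching `_compute_transition_sensitivity`:

```python
def ramp_sensitivity(elapsed_s: float, transition_minutes: int = 15) -> float:
    """Linear ramp: 0.3 at window start, 1.0 once the ramp completes."""
    frac = min(max(elapsed_s / (transition_minutes * 60), 0.0), 1.0)
    return 0.3 + 0.7 * frac

for t in (0, 300, 450, 900):  # 22:00, 22:05, 22:07:30, 22:15
    print(t, round(ramp_sensitivity(t), 2))  # 0.3, 0.53, 0.65, 1.0
```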

6.3 Per-Camera Override

# Per-camera night mode overrides
cameras:
  cam_01:
    night_mode:
      enabled: true
      custom_schedule:
        start_time: "21:00"  # Earlier for this camera
        end_time: "07:00"    # Later for this camera
      sensitivity_multiplier: 1.2  # 20% more sensitive

  cam_05:
    night_mode:
      enabled: true
      custom_schedule: null  # Use system default
      sensitivity_multiplier: 0.8  # 20% less sensitive (e.g., always-lit area)

  cam_07:
    night_mode:
      enabled: false  # This camera is always in "day" mode (24h staffed area)

6.4 Scheduler Integration with Detection Pipeline

class NightModePipelineAdapter:
    """
    Adapts detection module sensitivity based on night mode state.
    """

    def __init__(self, scheduler: NightModeScheduler, config_manager):
        self.scheduler = scheduler
        self.config_manager = config_manager
        self.current_state = {}  # camera_id → (is_night, sensitivity, timestamp)

    def apply_night_mode(self, camera_id):
        """
        Called each frame to apply night mode adjustments.
        Returns modified detection thresholds for this frame.
        """
        is_night, sensitivity = self.scheduler.is_night_mode()
        base_config = self.config_manager.get_camera_config(camera_id)
        camera_sensitivity = base_config.get('sensitivity_multiplier', 1.0)
        effective_sensitivity = sensitivity * camera_sensitivity

        # Adjust thresholds inversely to sensitivity
        # Higher sensitivity → lower thresholds
        threshold_multiplier = 1.0 / max(effective_sensitivity, 0.1)

        adjusted_config = self._scale_thresholds(base_config, threshold_multiplier)

        self.current_state[camera_id] = {
            'is_night': is_night,
            'sensitivity': effective_sensitivity,
            'threshold_multiplier': threshold_multiplier,
            'timestamp': time.time()
        }

        return adjusted_config

    def _scale_thresholds(self, config, multiplier):
        """Scale all threshold parameters by the given multiplier."""
        adjusted = deepcopy(config)
        threshold_fields = [
            'confidence_threshold',
            'overlap_threshold',
            'fall_score_threshold',
            'fg_ratio_threshold',
            'detection_confidence_threshold'
        ]

        for rule_name, rule_config in adjusted.items():
            if isinstance(rule_config, dict):
                for field in threshold_fields:
                    if field in rule_config:
                        rule_config[field] = min(rule_config[field] * multiplier, 1.0)

                # Time-based thresholds scale with the same multiplier:
                # lower sensitivity → larger multiplier → longer required
                # duration before a rule fires.
                for field in ['dwell_time_threshold_seconds',
                              'unattended_time_threshold_seconds',
                              'confirmation_duration_seconds']:
                    if field in rule_config:
                        rule_config[field] = max(int(rule_config[field] * multiplier), 1)

        return adjusted

7. Pipeline Integration

7.1 Integration Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                           MAIN AI PIPELINE                                   │
│                                                                              │
│  ┌─────────────┐    ┌──────────────┐    ┌──────────────────┐               │
│  │ Frame Input │───▶│ Preprocessing│───▶│ Shared Inference  │               │
│  │  (8 chans)  │    │ (resize,     │    │ (YOLOv8 + Track) │               │
│  │             │    │  night enh.) │    │                  │               │
│  └─────────────┘    └──────────────┘    └────────┬─────────┘               │
│                                                  │                          │
│                         ┌────────────────────────┘                          │
│                         │                                                   │
│                         ▼                                                   │
│  ┌──────────────────────────────────────────────────────────────────────┐   │
│  │              SHARED DETECTION OUTPUTS                                 │   │
│  │  ├── Person detections: [{bbox, confidence, class, track_id}]       │   │
│  │  ├── Object detections: [{bbox, confidence, class}]                 │   │
│  │  ├── Track states: {track_id → {centroid, velocity, history}}       │   │
│  │  ├── Pose keypoints: {track_id → [17x3 keypoints]} (optional)       │   │
│  │  └── Frame metadata: {timestamp, camera_id, frame_number}            │   │
│  └────────────────────────────────┬─────────────────────────────────────┘   │
│                                   │                                         │
│                    ┌──────────────┼──────────────┐                          │
│                    │              │              │                          │
│                    ▼              ▼              ▼                          │
│  ┌──────────────────────┐ ┌──────────┐ ┌────────────────────┐              │
│  │ SUSPICIOUS ACTIVITY  │ │ WATCHLIST│ │   OTHER MODULES    │              │
│  │   ANALYSIS LAYER     │ │ MATCHING │ │ (counting, etc.)   │              │
│  │                      │ │          │ │                    │              │
│  │  10 Detection Rules  │ │ Face/    │ │                    │              │
│  │  Scoring Engine      │ │ Person   │ │                    │              │
│  │  Alert Manager       │ │ Matching │ │                    │              │
│  └──────────┬───────────┘ └──────────┘ └────────────────────┘              │
│             │                                                               │
│             ▼                                                               │
│  ┌──────────────────────────────────────────────────────────────────────┐   │
│  │                    ALERT BUS / DISPATCH                              │   │
│  │   All alerts from all modules are normalized and routed to channels  │   │
│  └──────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

7.2 Shared Detection Outputs

The suspicious activity layer consumes the following shared outputs from the base AI pipeline:

| Output                | Source           | Format                                | Used By                                          |
|-----------------------|------------------|---------------------------------------|--------------------------------------------------|
| Person bounding boxes | YOLOv8 detection | [[x1,y1,x2,y2], confidence, track_id] | All 10 modules                                   |
| Object bounding boxes | YOLOv8 detection | [[x1,y1,x2,y2], confidence, class]    | Abandoned object, intrusion                      |
| Multi-object tracks   | ByteTrack        | {track_id: {centroid, bbox, history}} | Loitering, running, re-entry, dwell, zone breach |
| Pose keypoints        | YOLOv8-pose      | (17, 3) per person                    | Fall detection                                   |
| Background mask       | MOG2 subtractor  | HxW binary mask                       | Abandoned object                                 |

7.3 Detection Module Interface

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum

class Severity(Enum):
    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    CRITICAL = "CRITICAL"

@dataclass
class DetectionEvent:
    event_type: str
    camera_id: str
    timestamp: float
    severity: Severity
    confidence: float
    track_id: Optional[int] = None
    zone_id: Optional[str] = None
    bbox: Optional[List[float]] = None
    metadata: Optional[Dict[str, Any]] = None

@dataclass
class SharedDetections:
    """Input from base AI pipeline — shared across all modules."""
    camera_id: str
    timestamp: float
    frame: Any  # numpy array
    person_detections: List[Dict]  # [{bbox, confidence, track_id, class}]
    object_detections: List[Dict]  # [{bbox, confidence, class}]
    track_states: Dict[int, Dict]  # {track_id: {centroid, velocity, history}}
    pose_keypoints: Dict[int, Any]  # {track_id: (17, 3) array}
    background_mask: Optional[Any] = None

class DetectionModule(ABC):
    """Abstract base class for all suspicious activity detection modules."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.enabled = config.get('enabled', True)
        self.module_name = self.__class__.__name__

    @abstractmethod
    def process(self, detections: SharedDetections) -> List[DetectionEvent]:
        """
        Process shared detections and return a list of detection events.
        Called every frame for each active camera.
        """
        pass

    @abstractmethod
    def get_config_schema(self) -> Dict[str, Any]:
        """Return the configuration schema for this module."""
        pass

    def is_enabled(self) -> bool:
        return self.enabled

    def set_enabled(self, enabled: bool):
        self.enabled = enabled
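
A concrete module following the `process` contract, with the pipeline types stubbed as plain dicts so the sketch runs standalone (the class and threshold are illustrative, not one of the ten production modules):

```python
from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class Event:  # stand-in for DetectionEvent
    event_type: str
    camera_id: str
    track_id: int
    severity: str

class MinimalLoiteringDetector:
    """Flags tracks whose accumulated dwell time exceeds a threshold."""

    def __init__(self, config: Dict[str, Any]):
        self.threshold = config.get("dwell_time_threshold_seconds", 300)

    def process(self, camera_id: str, track_states: Dict[int, Dict]) -> List[Event]:
        return [Event("LOITERING", camera_id, tid, "MEDIUM")
                for tid, state in track_states.items()
                if state["dwell_seconds"] >= self.threshold]

det = MinimalLoiteringDetector({"dwell_time_threshold_seconds": 300})
tracks = {1: {"dwell_seconds": 420}, 2: {"dwell_seconds": 60}}
print([e.track_id for e in det.process("cam_01", tracks)])  # [1]
```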

7.4 Pipeline Orchestrator

class SuspiciousActivityPipeline:
    """
    Orchestrates all detection modules and the scoring engine.
    """

    def __init__(self, config_manager: ConfigurationManager):
        self.config_manager = config_manager
        self.modules: Dict[str, DetectionModule] = {}
        self.scoring_engine = ActivityScoringEngine(config_manager.get_scoring_config())
        self.alert_manager = AlertManager(config_manager.get_alert_config())
        self.night_mode = NightModeScheduler(config_manager.get_night_mode_config())
        self.evidence_capture = EvidenceCapture(config_manager.get_evidence_config())

        # Initialize all detection modules
        self._init_modules()

    def _init_modules(self):
        """Instantiate all 10 detection modules."""
        module_classes = {
            'intrusion': IntrusionDetector,
            'loitering': LoiteringDetector,
            'running': RunningDetector,
            'crowding': CrowdingDetector,
            'fall': FallDetector,
            'abandoned_object': AbandonedObjectDetector,
            'after_hours': AfterHoursDetector,
            'zone_breach': ZoneBreachDetector,
            'reentry': ReentryPatternDetector,
            'suspicious_dwell': SuspiciousDwellDetector
        }

        for module_name, module_class in module_classes.items():
            module_config = self.config_manager.get_module_config(module_name)
            self.modules[module_name] = module_class(module_config)

    def process_frame(self, camera_id: str, shared_detections: SharedDetections):
        """
        Main entry point: process one frame's detections for one camera.
        """
        # Check night mode
        is_night, sensitivity = self.night_mode.is_night_mode()
        if not is_night:
            return []  # No suspicious activity detection during day

        # Get camera-specific config
        camera_config = self.config_manager.get_camera_config(camera_id)

        # Run all enabled detection modules
        all_events = []
        for module_name, module in self.modules.items():
            if not module.is_enabled():
                continue

            try:
                events = module.process(shared_detections)
                all_events.extend(events)
            except Exception as e:
                logger.error(f"Module {module_name} failed: {e}")
                continue

        # Feed events to scoring engine
        current_time = time.time()
        for event in all_events:
            self.scoring_engine.record_event(event)

        # Compute composite score
        score, threat_level = self.scoring_engine.compute_score(current_time, camera_id)

        # Generate alerts for qualifying events
        alerts = []
        for event in all_events:
            if self.alert_manager.should_alert(event, score):
                evidence = self.evidence_capture.capture_evidence(event, camera_id)
                alert = self.alert_manager.create_alert(event, score, evidence)
                self.alert_manager.dispatch_alert(alert)
                alerts.append(alert)

        return alerts

7.5 Latency Budget

| Pipeline Stage                                | Target Latency | Max Latency |
|-----------------------------------------------|----------------|-------------|
| Frame preprocessing                           | 5 ms           | 10 ms       |
| Base detection (YOLOv8)                       | 15 ms          | 30 ms       |
| Tracking (ByteTrack)                          | 5 ms           | 10 ms       |
| Suspicious activity analysis (all 10 modules) | 50 ms          | 100 ms      |
| Scoring engine                                | 2 ms           | 5 ms        |
| Alert generation + evidence                   | 10 ms          | 50 ms       |
| Total end-to-end                              | ~87 ms         | ~205 ms     |

8. CV Models Reference

8.1 Required Models

| # | Model            | Purpose                                             | Input                | Output                               | Framework       | Performance Target |
|---|------------------|-----------------------------------------------------|----------------------|--------------------------------------|-----------------|--------------------|
| 1 | YOLOv8n-pose     | Person detection + pose estimation (fall detection) | 640x640 RGB          | Person bbox (x4) + 17 keypoints (x3) | Ultralytics     | 15 ms @ TensorRT   |
| 2 | YOLOv8s          | General object detection (abandoned objects)        | 640x640 RGB          | Object bboxes + classes              | Ultralytics     | 10 ms @ TensorRT   |
| 3 | ByteTrack        | Multi-object tracking                               | Detection bboxes     | Track IDs + trajectories             | Custom/MOT      | 5 ms               |
| 4 | MOG2 / ViBe      | Background subtraction                              | Grayscale frame      | Foreground mask                      | OpenCV / Custom | 3 ms               |
| 5 | RAFT / Farneback | Optical flow (running speed estimation)             | 2 consecutive frames | Flow field (dx, dy per pixel)        | OpenCV / RAFT   | 8 ms               |

8.2 Model Specifications

Model 1: YOLOv8n-pose (Person Detection + Pose)

model: yolov8n-pose.pt
source: Ultralytics
purpose: >
  Detect persons and estimate 17-keypoint body pose.
  Keypoints used for fall detection and running confirmation.
input:
  resolution: [640, 640]
  format: RGB
  preprocessing: letterbox + normalize(/255)
output:
  - person bounding boxes: [x1, y1, x2, y2, confidence]
  - 17 COCO keypoints per person: [x, y, visibility] x 17
keypoint_mapping:
  0: nose
  1-2: left_eye, right_eye
  3-4: left_ear, right_ear
  5-6: left_shoulder, right_shoulder
  7-8: left_elbow, right_elbow
  9-10: left_wrist, right_wrist
  11-12: left_hip, right_hip
  13-14: left_knee, right_knee
  15-16: left_ankle, right_ankle
optimization:
  tensorrt: FP16
  batch_size: 1
  target_latency_ms: 15
confidence_threshold: 0.5

Model 2: YOLOv8s (Object Detection)

model: yolov8s.pt
source: Ultralytics
purpose: >
  Detect objects that may be abandoned: backpacks, suitcases, boxes, bags.
  Also used for secondary person detection verification.
classes_of_interest:
  - person        # For verification
  - backpack      # Abandoned object watchlist
  - suitcase      # Abandoned object watchlist
  - handbag       # Abandoned object watchlist
  - cell phone    # Optional
optimization:
  tensorrt: FP16
  batch_size: 1
  target_latency_ms: 10
confidence_threshold: 0.4

Model 3: ByteTrack (Multi-Object Tracking)

model: ByteTrack
source: https://github.com/ifzhang/ByteTrack
purpose: >
  Maintain consistent track IDs across frames.
  Essential for loitering, running, re-entry, and dwell time detection.
parameters:
  track_thresh: 0.5
  match_thresh: 0.8
  track_buffer: 30        # Frames to keep lost tracks
  frame_rate: 25
input: Detection bboxes + scores from YOLOv8
output:
  - track_id: int (persistent across frames)
  - bbox: [x1, y1, x2, y2]
  - score: confidence
  - track_state: tracked / lost / removed
latency_target_ms: 5

Model 4: Background Subtraction (MOG2 / ViBe)

model: cv2.BackgroundSubtractorMOG2
source: OpenCV
purpose: >
  Identify static foreground objects for abandoned object detection.
  Distinguish newly introduced objects from background.
parameters:
  history: 500
  varThreshold: 16
  detectShadows: true
  learningRate: 0.005
input: Grayscale or RGB frame
output: Binary foreground mask (255=foreground, 0=background, 127=shadow)
postprocessing:
  - Threshold: 200 (remove shadows)
  - Morphological open: 3x3 ellipse
  - Morphological close: 7x7 ellipse
latency_target_ms: 3

Model 5: Optical Flow (Farneback)

model: cv2.calcOpticalFlowFarneback
source: OpenCV
purpose: >
  Estimate pixel motion between consecutive frames.
  Used for speed estimation in running detection and
  for distinguishing moving persons from stationary objects.
parameters:
  pyr_scale: 0.5
  levels: 3
  winsize: 15
  iterations: 3
  poly_n: 5
  poly_sigma: 1.2
  flags: 0
input: 2 consecutive grayscale frames
output: Flow field [H, W, 2] → (dx, dy) per pixel
latency_target_ms: 8

8.3 Model Inference Pipeline

Frame Input (1920x1080)
    │
    ├──▶ Night Enhancement (histogram equalization + denoising)
    │       └── Enhanced Frame
    │
    ├──▶ Resize to 640x640
    │       │
    │       ├──▶ YOLOv8n-pose ──▶ Person bboxes + Keypoints
    │       │                       │
    │       │                       ├──▶ Fall Detection Module
    │       │                       └──▶ ByteTrack ──▶ Track IDs
    │       │                               │
    │       │                               ├──▶ Loitering Detection
    │       │                               ├──▶ Running Detection
    │       │                               ├──▶ Re-entry Patterns
    │       │                               ├──▶ Zone Breach
    │       │                               ├──▶ Suspicious Dwell
    │       │                               └──▶ After-Hours Presence
    │       │
    │       └──▶ YOLOv8s ──▶ Object bboxes (backpack, suitcase, etc.)
    │               │
    │               └──▶ Abandoned Object Detection
    │
    ├──▶ Grayscale conversion ──▶ MOG2 Background Subtraction
    │       │
    │       └──▶ Foreground mask ──▶ Abandoned Object Detection
    │
    └──▶ Grayscale ──▶ Farneback Optical Flow
            │
            └──▶ Running Detection (speed validation)

8.4 Hardware Requirements

| Component | Minimum                          | Recommended                               |
|-----------|----------------------------------|-------------------------------------------|
| GPU       | NVIDIA GTX 1660 (6GB)            | NVIDIA RTX 3060 (12GB) or Jetson AGX Orin |
| RAM       | 16 GB                            | 32 GB                                     |
| Storage   | 256 GB SSD                       | 512 GB NVMe SSD                           |
| CPU       | Intel i5-8400 / AMD Ryzen 5 2600 | Intel i7-11700 / AMD Ryzen 7 5800X        |
| Network   | Gigabit Ethernet                 | Gigabit Ethernet                          |

9. Pseudocode Reference

9.1 Main Processing Loop

function main_loop():
    initialize_pipeline()
    initialize_all_detection_modules()
    initialize_scoring_engine()
    initialize_alert_manager()

    for each camera in camera_list:
        start_capture_thread(camera)

    while system_running:
        for each camera in camera_list:
            frame = get_next_frame(camera)
            if frame is None: continue

            # Step 1: Night mode check
            is_night, sensitivity = night_mode_scheduler.check(camera)
            if not is_night:
                continue

            # Step 2: Base AI inference (shared)
            person_detections = yolo8_pose.detect(frame)
            object_detections = yolo8s.detect(frame)
            tracks = bytetrack.update(person_detections)
            bg_mask = mog2.process(frame)
            flow = optical_flow.compute(prev_frame, frame)

            # Step 3: Build shared detections object
            shared = SharedDetections(
                camera_id=camera.id,
                timestamp=now(),
                frame=frame,
                person_detections=person_detections,
                object_detections=object_detections,
                track_states=tracks,
                pose_keypoints={t.id: t.keypoints for t in person_detections},
                background_mask=bg_mask,
                optical_flow=flow
            )

            # Step 4: Run all detection modules
            all_events = []
            for module in detection_modules:
                if module.is_enabled():
                    events = module.process(shared)
                    all_events.extend(events)

            # Step 5: Scoring
            for event in all_events:
                scoring_engine.record_event(event)

            score, level = scoring_engine.compute_score(now(), camera.id)

            # Step 6: Alert generation
            for event in all_events:
                if alert_manager.should_alert(event, score):
                    evidence = evidence_capture.capture(event, camera)
                    alert = alert_manager.create_alert(event, score, evidence)
                    alert_manager.dispatch(alert)

            # Step 7: Periodic maintenance
            if frame_count % 3000 == 0:  # Every ~2 minutes
                scoring_engine.purge_old_events()
                alert_manager.cleanup_suppression_cache()

        frame_count += 1
        sleep(frame_interval)

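The `SharedDetections` bundle built in Step 3 maps naturally onto a Python dataclass. A minimal sketch, assuming the field names from the pseudocode above (the concrete types are illustrative, not from the spec):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class SharedDetections:
    """Per-frame outputs of the base pipeline, shared by all detection modules."""
    camera_id: str
    timestamp: float                       # epoch seconds
    frame: Any                             # decoded frame (e.g. a numpy BGR array)
    person_detections: List[dict] = field(default_factory=list)
    object_detections: List[dict] = field(default_factory=list)
    track_states: List[dict] = field(default_factory=list)
    pose_keypoints: Dict[int, list] = field(default_factory=dict)  # track_id -> keypoints
    background_mask: Any = None            # MOG2 foreground mask
    optical_flow: Any = None               # dense Farneback flow field
```

Each detection module then receives one read-only bundle per frame instead of re-running inference.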
9.2 Zone Polygon Editor (Admin UI)

function open_zone_editor(camera_id, zone_type):
    frame = get_reference_frame(camera_id)
    display_frame_on_canvas(frame)

    zone = {
        zone_id: generate_uuid(),
        polygon: [],
        zone_type: zone_type,  # "intrusion", "loitering", "sensitive_dwell"
        color: get_color_for_type(zone_type)
    }

    on_mouse_click(x, y):
        # Convert pixel coordinates to normalized [0,1]
        nx = x / frame_width
        ny = y / frame_height
        zone.polygon.append([nx, ny])
        redraw_canvas()

    on_mouse_right_click():
        # Close polygon (minimum 3 points)
        if len(zone.polygon) >= 3:
            finalize_polygon(zone)
            save_zone(camera_id, zone)
            close_editor()
        else:
            show_error("Polygon requires at least 3 points")

    on_double_click():
        # Remove last point
        if zone.polygon:
            zone.polygon.pop()
            redraw_canvas()

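Zones saved by the editor are normalized polygons, so every module that consumes them needs a point-in-polygon test. A minimal ray-casting sketch (the function name is ours; coordinates are the same normalized `[0, 1]` values the editor writes):

```python
def point_in_polygon(x: float, y: float, polygon: list) -> bool:
    """Ray-casting test: polygon is a list of [nx, ny] vertices in [0, 1]."""
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Count crossings of the horizontal ray from (x, y) going right.
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside
```

For a person whose foot point is at normalized `(0.5, 0.5)`, a unit-square zone returns `True`; a point outside the frame returns `False`.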
9.3 Alert Correlation Engine

function correlate_alerts(camera_id, time_window=300):
    """
    Find correlated alerts that may indicate coordinated activity.
    Returns alert groups that should be escalated together.
    """
    recent_alerts = get_alerts(camera_id, since=now() - time_window)

    # Group by proximity in time and space
    groups = []
    for alert in recent_alerts:
        assigned = false
        for group in groups:
            if time_distance(alert, group[-1]) < 60:  # Within 60s
                if spatial_distance(alert.zone, group[-1].zone) < 0.3:  # 30% frame
                    group.append(alert)
                    assigned = true
                    break
        if not assigned:
            groups.append([alert])

    # Escalate groups with multiple correlated alerts
    for group in groups:
        if len(group) >= 3:
            for alert in group:
                # Escalate to at least HIGH (compare severity levels, not raw strings)
                alert.severity = max_severity(alert.severity, "HIGH")
                alert.correlation_group = group[0].alert_id
                alert.correlation_note = f"Correlated with {len(group)-1} other alerts"

    return groups
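The grouping pass above translates almost directly into runnable Python. A simplified sketch over synthetic alerts (the `Alert` shape and helper names are ours; the 60 s / 0.3-frame thresholds mirror the pseudocode):

```python
from dataclasses import dataclass

@dataclass
class Alert:
    alert_id: int
    ts: float            # seconds since epoch
    zone_xy: tuple       # normalized zone centroid (x, y)
    severity: str = "MEDIUM"

def correlate(alerts, max_dt=60.0, max_dist=0.3):
    """Group alerts that are close in both time and frame space."""
    groups = []
    for a in sorted(alerts, key=lambda a: a.ts):
        for g in groups:
            last = g[-1]
            dx = a.zone_xy[0] - last.zone_xy[0]
            dy = a.zone_xy[1] - last.zone_xy[1]
            if a.ts - last.ts < max_dt and (dx * dx + dy * dy) ** 0.5 < max_dist:
                g.append(a)
                break
        else:
            groups.append([a])
    return groups
```

Three alerts within a minute of each other in the same corner of the frame land in one group (and would be escalated); a fourth alert minutes later starts a new group.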

10. Performance & Resource Budget

10.1 Per-Camera Processing Budget (1080p @ 25 FPS)

| Stage | GPU Memory | GPU Compute | CPU | Latency |
|---|---|---|---|---|
| Frame preprocessing | 50 MB | Low | Medium | 5 ms |
| YOLOv8n-pose inference | 200 MB | High | Low | 15 ms |
| YOLOv8s inference | 150 MB | Medium | Low | 10 ms |
| ByteTrack tracking | 20 MB | Low | Medium | 5 ms |
| MOG2 background subtraction | 30 MB | Low | Low | 3 ms |
| Optical flow | 100 MB | Medium | Low | 8 ms |
| 10 detection modules | 50 MB | Low | Medium | 50 ms |
| Scoring + alerts | 10 MB | None | Low | 12 ms |
| **Total per camera** | ~610 MB | Burst | Medium | ~108 ms |
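As a sanity check, the per-stage latencies sum exactly to the quoted per-camera total. Note that the serial sum exceeds the 40 ms frame interval at 25 FPS, so in practice stages presumably overlap across cameras or frames rather than running strictly back-to-back:

```python
# Stage latencies from the table above, in milliseconds.
stage_ms = {
    "frame preprocessing": 5,
    "YOLOv8n-pose inference": 15,
    "YOLOv8s inference": 10,
    "ByteTrack tracking": 5,
    "MOG2 background subtraction": 3,
    "optical flow": 8,
    "10 detection modules": 50,
    "scoring + alerts": 12,
}
total_ms = sum(stage_ms.values())
print(total_ms)  # 108 -> matches the table's ~108 ms per camera
```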

10.2 8-Camera Total Budget

| Resource | Total Required |
|---|---|
| GPU Memory | ~4.9 GB (with batching optimizations) |
| GPU Compute | ~60% sustained on RTX 3060 |
| System RAM | ~8 GB for buffers and state |
| Storage (alerts) | ~10 GB/day (with 30-day retention) |
| Network (alerts) | ~500 KB/day (metadata only) |
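The GPU memory line follows directly from the per-camera budget in Section 10.1: 8 cameras at ~610 MB each, before any batching savings:

```python
per_camera_mb = 610   # "Total per camera" from Section 10.1
cameras = 8
total_gb = per_camera_mb * cameras / 1000
print(total_gb)  # 4.88 -> quoted as ~4.9 GB with batching optimizations
```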

11. Testing & Validation Strategy

11.1 Test Scenarios

| Test ID | Scenario | Expected Detection | Modules Tested |
|---|---|---|---|
| TC-001 | Person walks into restricted zone | INTRUSION alert | Intrusion |
| TC-002 | Person stands still for 5+ minutes | LOITERING alert | Loitering |
| TC-003 | Person runs across frame | RUNNING alert | Running |
| TC-004 | 3+ people gather in corner | CROWDING alert | Crowding |
| TC-005 | Person falls to ground | FALL alert | Fall |
| TC-006 | Backpack left unattended | ABANDONED_OBJECT alert | Abandoned Object |
| TC-007 | Person detected at 2 AM | AFTER_HOURS alert | After-Hours |
| TC-008 | Person crosses boundary line | ZONE_BREACH alert | Zone Breach |
| TC-009 | Person enters/exits 3+ times | REENTRY alert | Re-entry |
| TC-010 | Person lingers near server room door | SUSPICIOUS_DWELL alert | Suspicious Dwell |
| TC-011 | Multiple simultaneous detections | Escalated composite score | Scoring Engine |
| TC-012 | Alert suppression within cooldown | Alert suppressed + logged | Alert Manager |

11.2 False Positive Mitigation

| Source of FP | Mitigation Strategy |
|---|---|
| Shadow entering zone | Require 3-frame confirmation + overlap threshold |
| Person bending (not falling) | Aspect ratio + height ratio check in fall detector |
| Quick stop (not loitering) | Movement tolerance radius + temporal confirmation |
| Reflections / ghost detections | Confidence threshold + track consistency check |
| Authorized after-hours worker | Authorized personnel DB + zone-based escalation |
| Cat / dog / other animal | YOLOv8 class filter (person only) |
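The 3-frame confirmation used against shadow false positives amounts to a small debounce counter: only report the condition once it has held for N consecutive frames. A sketch (class name and API are illustrative, not from the spec):

```python
class FrameConfirmer:
    """Emit True only after a condition holds for n_frames consecutive frames."""

    def __init__(self, n_frames: int = 3):
        self.n_frames = n_frames
        self.streak = 0

    def update(self, condition_met: bool) -> bool:
        # Reset the streak on any miss; confirm once the streak reaches n_frames.
        self.streak = self.streak + 1 if condition_met else 0
        return self.streak >= self.n_frames
```

A one-frame shadow flicker never confirms; three consecutive hits do.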

Appendix A: Data Retention

| Data Type | Retention Period | Storage Location |
|---|---|---|
| Raw video | 7 days | NAS / SAN |
| Alert clips | 90 days | NAS / SAN |
| Alert snapshots | 1 year | Object storage |
| Detection events | 1 year | Time-series DB |
| Composite scores | 30 days | Time-series DB |
| Audit logs | 3 years | Log aggregator |
| Configuration changes | Permanent | Version control |

Appendix B: API Endpoints

| Endpoint | Method | Description |
|---|---|---|
| `/api/v1/cameras/{id}/rules` | GET | List all rules for camera |
| `/api/v1/cameras/{id}/rules/{rule}` | PUT | Update rule configuration |
| `/api/v1/cameras/{id}/zones` | POST | Add new zone |
| `/api/v1/cameras/{id}/zones/{zone_id}` | DELETE | Remove zone |
| `/api/v1/cameras/{id}/lines` | POST | Add boundary line |
| `/api/v1/alerts` | GET | List alerts (with filters) |
| `/api/v1/alerts/{id}/acknowledge` | POST | Acknowledge alert |
| `/api/v1/scores/{camera_id}` | GET | Current composite score |
| `/api/v1/scores/{camera_id}/history` | GET | Score time series |
| `/api/v1/nightmode/override` | POST | Manual night mode override |
| `/api/v1/nightmode/status` | GET | Current night mode state |
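A thin client for these endpoints can simply build the paths and hand them to any HTTP library. A sketch covering three of the routes above (the class, method names, and default base URL are our assumptions; the paths come from the table):

```python
class SurveillanceAPI:
    """Builds URLs for the v1 REST endpoints listed above."""

    def __init__(self, base_url: str = "http://localhost:8080"):
        self.base = base_url.rstrip("/")

    def camera_rules(self, camera_id: str) -> str:
        return f"{self.base}/api/v1/cameras/{camera_id}/rules"

    def acknowledge_alert(self, alert_id: str) -> str:
        return f"{self.base}/api/v1/alerts/{alert_id}/acknowledge"

    def score(self, camera_id: str) -> str:
        return f"{self.base}/api/v1/scores/{camera_id}"
```

For example, acknowledging alert `a1` would POST to `api.acknowledge_alert("a1")`.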

Appendix C: Revision History

| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2024-01-15 | AI Engineering | Initial design document |

End of Document