Self-Learning & Training System Design for Surveillance AI Platform
Feedback loops, retraining workflow, and model governance.
Table of Contents
- Architecture Overview
- Data Collection Pipeline
- Candidate Training Examples
- Label Conflict Detection
- Approval Workflow
- Retraining Pipeline
- Model Versioning
- Quality Gates
- Embedding Update Strategy
- Training Dataset Management
- Performance Monitoring
- UI for Training Management
- Database Schema
- API Specification
- Implementation Roadmap
1. Architecture Overview
1.1 System Architecture Diagram
+-------------------+ +-------------------+ +-------------------+
| Face Recognition | | User Correction | | Admin Console |
| Pipeline (Prod) | | UI / Actions | | (Training Mgmt) |
+--------+----------+ +--------+----------+ +--------+----------+
| | |
v v v
+-------------------+ +-------------------+ +-------------------+
| Detection Store | | Training Event | | Training Queue |
| (Face Detections)| | Collector | | & Scheduler |
+-------------------+ +--------+----------+ +--------+----------+
| | |
v v v
+-------------------+ +-------------------+ +-------------------+
| Embedding Index |<--->| Training Data | | Training Engine |
| (FAISS/Milvus) | | Warehouse | | (GPU Workers) |
+-------------------+ +--------+----------+ +--------+----------+
| | |
v v v
+-------------------+ +-------------------+ +-------------------+
| Identity DB | | Conflict Detector| | Model Registry |
| (Postgres) | | (Auto-blocker) | | (MLflow) |
+-------------------+ +-------------------+ +-------------------+
|
+-----+-----+
v v
+-----------+ +-----------+
| Quality | | A/B Test |
| Gates | | Manager |
+-----------+ +-----------+
1.2 Core Principles
+------------------------+-------------------------------------------------------------+
| Principle              | Description                                                 |
+------------------------+-------------------------------------------------------------+
| Never Silent           | All training actions are explicit, logged, and auditable    |
| Safety First           | Conflicting data is automatically blocked from training     |
| Rollback Ready         | Every deployed model can be rolled back within 60 seconds   |
| Human in Loop          | User corrections drive learning, but humans approve changes |
| Incremental by Default | Prefer embedding updates over full retraining               |
| Observable             | All training jobs, model versions, and metrics are tracked  |
+------------------------+-------------------------------------------------------------+
1.3 Technology Stack
+------------------------+---------------------------------------+-----------------------------------------+
| Component              | Technology                            | Purpose                                 |
+------------------------+---------------------------------------+-----------------------------------------+
| Training Engine        | PyTorch + PyTorch Lightning           | Model training and fine-tuning          |
| Model Registry         | MLflow                                | Model versioning and artifact storage   |
| GPU Orchestration      | Kubernetes + KEDA                     | Auto-scaling GPU training jobs          |
| Embedding Store        | FAISS (local) / Milvus (distributed)  | Vector search for face matching         |
| Metadata DB            | PostgreSQL                            | Training events, labels, model versions |
| Object Storage         | MinIO / S3                            | Training images and model artifacts     |
| Message Queue          | Redis / Apache Kafka                  | Training job queue and events           |
| Pipeline Orchestration | Apache Airflow                        | Scheduled retraining workflows          |
| Monitoring             | Prometheus + Grafana                  | Model performance metrics               |
+------------------------+---------------------------------------+-----------------------------------------+
2. Data Collection Pipeline
2.1 Training Event Types
The system captures five categories of training events from user interactions:
+------------------+------------------------------------------------------+
| Event Type | Description |
+------------------+------------------------------------------------------+
| IDENTITY_NAMED | User assigns a name to an unknown face detection |
| MATCH_CONFIRMED | User confirms an identity match is correct |
| MATCH_CORRECTED | User corrects a wrong match to a different identity |
| IDENTITY_MERGED | User merges two duplicate identities into one |
| MATCH_REJECTED | User explicitly rejects a match suggestion |
+------------------+------------------------------------------------------+
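The five event types (plus the event lifecycle used later in this document) can be sketched as a pair of enums; the class and value names here are assumptions that mirror the fields referenced throughout this design:

```python
from enum import Enum

class TrainingEventType(str, Enum):
    """The five user-driven training event categories."""
    IDENTITY_NAMED = "identity_named"    # name assigned to an unknown face
    MATCH_CONFIRMED = "match_confirmed"  # user confirms a match is correct
    MATCH_CORRECTED = "match_corrected"  # user moves a face to another identity
    IDENTITY_MERGED = "identity_merged"  # duplicate identities merged into one
    MATCH_REJECTED = "match_rejected"    # suggestion explicitly rejected

class EventStatus(str, Enum):
    """Lifecycle of a captured training event."""
    PENDING_REVIEW = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    CONFLICTED = "conflicted"

# Corrections and rejections are the event types that yield negative examples
NEGATIVE_PRODUCING = {
    TrainingEventType.MATCH_CORRECTED,
    TrainingEventType.MATCH_REJECTED,
}
```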
2.2 Event Collection Flow
User Action in UI
|
v
+--------------+ +------------------+ +---------------------+
| Capture Event| --> | Enrich Metadata | --> | Write to Training |
| (API Call) | | (face quality, | | Event Queue (Redis) |
+--------------+ | source, user) | +---------------------+
+------------------+ |
v
+------------------+
| Async Processor |
| (Celery Worker) |
+--------+---------+
|
+------------------------+------------------------+
| | |
v v v
+-------------+ +-------------+ +-------------+
| Quality | | Label Index | | Conflict |
| Filter | | Update | | Detector |
+-------------+ +-------------+ +-------------+
2.3 Quality Filtering Criteria
Every training example must pass the following quality gates before being accepted:
class QualityFilterConfig:
    """Quality thresholds for training data acceptance."""

    # Face quality thresholds
    min_face_size_px: int = 80            # Minimum face dimension in pixels
    min_confidence: float = 0.85          # Face detection confidence
    min_image_quality_score: float = 0.6  # Blur/lighting quality score

    # Pose thresholds
    max_yaw_degrees: float = 45.0         # Maximum head yaw angle
    max_pitch_degrees: float = 30.0       # Maximum head pitch angle
    max_roll_degrees: float = 30.0        # Maximum head roll angle

    # Occlusion thresholds
    max_occlusion_ratio: float = 0.3      # Maximum occluded face area

    # Consistency thresholds
    min_embedding_quality: float = 0.7    # Embedding vector quality metric

    # Acceptance logic
    require_frontal: bool = False         # If true, only near-frontal faces
    require_eyes_visible: bool = True     # Both eyes must be visible
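Applying these thresholds amounts to a single predicate over the detection's quality metrics. A minimal sketch, where the `FaceDetection` shape and the `passes_quality_filter` helper are illustrative (the threshold values match `QualityFilterConfig` above):

```python
from dataclasses import dataclass

@dataclass
class FaceDetection:
    face_size_px: int
    confidence: float
    quality_score: float
    yaw: float
    pitch: float
    roll: float
    occlusion_ratio: float

def passes_quality_filter(d: FaceDetection) -> bool:
    """Apply the acceptance thresholds from QualityFilterConfig."""
    return (
        d.face_size_px >= 80
        and d.confidence >= 0.85
        and d.quality_score >= 0.6
        and abs(d.yaw) <= 45.0
        and abs(d.pitch) <= 30.0
        and abs(d.roll) <= 30.0
        and d.occlusion_ratio <= 0.3
    )

good = FaceDetection(120, 0.97, 0.80, 10.0, 5.0, 2.0, 0.1)       # passes all gates
too_small = FaceDetection(40, 0.97, 0.80, 10.0, 5.0, 2.0, 0.1)   # fails min face size
```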
2.4 Training Event Schema
class TrainingEvent(BaseModel):
    """A single training data capture event."""

    event_id: UUID                     # Unique event identifier
    event_type: TrainingEventType      # NAMED, CONFIRMED, CORRECTED, MERGED, REJECTED
    timestamp: datetime                # Event timestamp (UTC)

    # Source detection
    detection_id: UUID                 # Reference to original face detection
    face_image_path: str               # S3/MinIO path to cropped face image
    full_frame_path: str               # S3/MinIO path to source frame (optional)

    # Identity information
    identity_id: UUID                  # Assigned/corrected identity ID
    previous_identity_id: UUID | None  # For corrections: previous (wrong) identity
    is_unknown: bool                   # Whether this was previously unknown

    # Embedding data
    embedding_vector: list[float]      # 512-dim face embedding
    embedding_model_version: str       # Model version that produced embedding

    # Quality metrics
    face_quality_score: float          # Combined quality score (0-1)
    face_size_px: int                  # Face bounding box size
    detection_confidence: float        # Face detection confidence
    pose_angles: dict[str, float]      # yaw, pitch, roll

    # Metadata
    source_camera_id: str | None       # Camera that captured the face
    captured_at: datetime              # Original capture timestamp
    reviewed_by_user_id: UUID          # User who performed the action
    review_timestamp: datetime         # When user reviewed
    review_method: str                 # "manual_click", "bulk_approve", "auto"

    # Training status
    training_status: EventStatus       # pending, approved, rejected, conflicted
    conflict_id: UUID | None           # Set if a conflict was detected
    approved_at: datetime | None
2.5 Event Collection API
# POST /api/v1/training/events
class SubmitTrainingEventRequest(BaseModel):
    event_type: TrainingEventType
    detection_id: UUID
    identity_id: UUID
    previous_identity_id: UUID | None = None
    reviewed_by_user_id: UUID
    review_method: str = "manual_click"
    confidence_override: float | None = None  # User confidence in correction
2.6 Bulk Event Collection
For batch operations (e.g., bulk confirmation of matches):
# POST /api/v1/training/events/bulk
class BulkTrainingEventRequest(BaseModel):
    events: list[SubmitTrainingEventRequest]
    common_fields: dict         # Shared fields (user_id, timestamp, etc.)
    auto_approve: bool = False  # If True, skip suggestion queue

# Processing: Events are validated, quality-filtered, and queued
# Response: { accepted: int, rejected: int, conflicts: int, event_ids: list[UUID] }
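Server-side handling of a bulk request reduces to two steps: fan the shared fields out into each event, then fold per-event outcomes into the response counters. A sketch under those assumptions (helper names are illustrative):

```python
def expand_bulk_events(events: list[dict], common_fields: dict) -> list[dict]:
    """Merge shared fields into every event; per-event values take precedence."""
    return [{**common_fields, **e} for e in events]

def summarize_outcomes(outcomes: list[str]) -> dict:
    """Fold per-event outcomes into the bulk response counters described above."""
    return {
        "accepted": outcomes.count("accepted"),
        "rejected": outcomes.count("rejected"),
        "conflicts": outcomes.count("conflicted"),
    }
```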
3. Candidate Training Examples
3.1 Dataset Structure Per Identity
For each identity, the system maintains a structured dataset:
Identity: "John Smith" (ID: uuid-1234)
|
|-- Positive Examples (confirmed faces of John Smith)
| |-- Auto-captured: high-confidence detections confirmed by system
| |-- User-confirmed: matches user explicitly confirmed
| |-- User-named: original naming of unknown person
| |-- Merged-in: faces from merged duplicate identity
| |
| +-- Embedding centroid (running average)
| +-- Confidence radius (max distance from centroid)
|
|-- Negative Examples (faces confirmed NOT John Smith)
| |-- Corrected-away: faces user moved to different identity
| |-- Rejected matches: system suggestions user rejected
| |-- Hard negatives: similar-looking faces from other identities
| |
| +-- Hard negative mining pool
|
|-- Statistics
|-- Total positive examples
|-- Total negative examples
|-- Last trained timestamp
|-- Model version last trained on
|-- Average embedding quality
3.2 Dataset Balance Logic
class IdentityDatasetManager:
    """Manages training examples per identity with balancing."""

    MIN_POSITIVE_EXAMPLES: int = 5        # Min positives before identity included
    MAX_POSITIVE_EXAMPLES: int = 500      # Cap to prevent over-representation
    MAX_NEGATIVE_EXAMPLES: int = 200      # Cap negatives per identity
    NEGATIVE_POSITIVE_RATIO: float = 0.5  # Negatives per positive (target)

    def build_training_batch(self, identity_id: UUID) -> TrainingBatch:
        """Build a balanced training batch for an identity."""
        positives = self.get_positive_examples(
            identity_id,
            max_count=self.MAX_POSITIVE_EXAMPLES,
            prefer_high_quality=True,
            prefer_recent=True,
            ensure_diversity=True,  # Spread across time/cameras
        )
        if len(positives) < self.MIN_POSITIVE_EXAMPLES:
            raise InsufficientSamplesError(
                f"Identity {identity_id} has only {len(positives)} positive "
                f"examples (min: {self.MIN_POSITIVE_EXAMPLES})"
            )

        target_negatives = int(len(positives) * self.NEGATIVE_POSITIVE_RATIO)
        negatives = self.select_negative_examples(
            identity_id,
            count=min(target_negatives, self.MAX_NEGATIVE_EXAMPLES),
            include_hard_negatives=True,
            diversity_weight=0.3,
        )

        return TrainingBatch(
            identity_id=identity_id,
            positive_examples=positives,
            negative_examples=negatives,
            positive_count=len(positives),
            negative_count=len(negatives),
        )
3.3 Hard Negative Mining
class HardNegativeMiner:
    """Find hard negative examples to improve model discrimination."""

    def mine_hard_negatives(
        self,
        identity_id: UUID,
        model_version: str,
        top_k: int = 50,
    ) -> list[TrainingExample]:
        """
        Find faces that are similar to the target identity but
        belong to other people. These are 'hard' because the model
        might confuse them.
        """
        centroid = self.get_embedding_centroid(identity_id)

        # Search embedding space for nearest neighbors
        # that belong to OTHER identities
        candidates = self.embedding_index.search(
            query=centroid,
            top_k=top_k * 3,  # Over-fetch to filter
            exclude_identity=identity_id,
        )

        # Filter to those within the identity's confidence radius
        radius = self.get_confidence_radius(identity_id)
        hard_negatives = [
            c for c in candidates
            if c.distance < radius * 1.5  # Within 1.5x the identity radius
            and c.identity_id != identity_id
        ]

        # Score by 'hardness' (closer = harder)
        hard_negatives.sort(key=lambda x: x.distance)
        return hard_negatives[:top_k]
3.4 Dataset Statistics Per Identity
+-----------------------+------------------------+---------------------------------+
| Metric                | Threshold              | Action if Not Met               |
+-----------------------+------------------------+---------------------------------+
| Positive examples     | >= 5                   | Identity excluded from training |
| Negative examples     | >= 2 (or ratio 0.2)    | Use global negatives            |
| Embedding quality     | >= 0.7 avg             | Flag for review                 |
| Age of newest sample  | < 90 days              | Consider identity stale         |
| Diversity score       | >= 0.4 (cameras/time)  | Add diversity warning           |
+-----------------------+------------------------+---------------------------------+
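The table above is effectively a per-identity health check. A minimal sketch that evaluates each threshold and returns the resulting actions (the function and action names are illustrative, not part of the production API):

```python
from datetime import datetime, timedelta, timezone

def identity_health(pos: int, neg: int, avg_quality: float,
                    newest_sample_at: datetime, diversity: float) -> list[str]:
    """Evaluate the per-identity dataset thresholds and return actions."""
    actions = []
    if pos < 5:
        actions.append("exclude_from_training")       # too few positives
    if neg < 2 and (pos == 0 or neg / pos < 0.2):
        actions.append("use_global_negatives")        # too few negatives
    if avg_quality < 0.7:
        actions.append("flag_for_review")             # weak embeddings
    if datetime.now(timezone.utc) - newest_sample_at > timedelta(days=90):
        actions.append("mark_stale")                  # no recent samples
    if diversity < 0.4:
        actions.append("diversity_warning")           # too few cameras/times
    return actions
```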
4. Label Conflict Detection
4.1 Conflict Types
+-------------------+------------------------------------------------------+
| Conflict Type | Description & Example |
+-------------------+------------------------------------------------------+
| SAME_FACE_DIFF_ID | Same face image assigned to different identities |
| | Ex: Face-A labeled as "John" then corrected to "Bob" |
+-------------------+------------------------------------------------------+
| TRANSITIVE_CHAIN | A->B and B->C implies A->C which may be wrong |
| | Ex: "John" merged into "Bob", but some "John" faces |
| | were previously corrected to "Alice" |
+-------------------+------------------------------------------------------+
| SELF_CONTRADICTION| Same user contradicts their own previous label |
| | Ex: User confirmed face as "John", later says "Bob" |
+-------------------+------------------------------------------------------+
| TEMPORAL_DRIFT | Identity labels drift over time for same person |
| | Ex: Early captures labeled "Unknown", later "John" |
+-------------------+------------------------------------------------------+
| MERGE_COLLISION | Merging identities creates conflicting embeddings |
| | Ex: Two identities with very different face clusters |
+-------------------+------------------------------------------------------+
4.2 Conflict Detection Pipeline
class ConflictDetector:
    """Detects and manages label conflicts in training data."""

    async def check_for_conflicts(
        self,
        new_event: TrainingEvent,
    ) -> list[LabelConflict]:
        """
        Check if a new training event creates any label conflicts.
        Called synchronously on every training event submission.
        """
        conflicts = []

        # 1. Check SAME_FACE_DIFF_ID
        same_face_events = await self.db.find_events_by_detection(
            new_event.detection_id
        )
        conflicts.extend(
            self._detect_same_face_different_identity(
                new_event, same_face_events
            )
        )

        # 2. Check if new assignment contradicts existing corrections
        if new_event.event_type == TrainingEventType.MATCH_CORRECTED:
            conflicts.extend(
                await self._detect_transitive_conflicts(new_event)
            )

        # 3. Check for self-contradiction by same user
        conflicts.extend(
            await self._detect_user_self_contradiction(new_event)
        )

        # 4. Check merge compatibility
        if new_event.event_type == TrainingEventType.IDENTITY_MERGED:
            conflicts.extend(
                await self._detect_merge_collisions(new_event)
            )

        return conflicts

    def _detect_same_face_different_identity(
        self,
        new_event: TrainingEvent,
        existing_events: list[TrainingEvent],
    ) -> list[LabelConflict]:
        conflicts = []
        for existing in existing_events:
            if existing.identity_id != new_event.identity_id:
                conflict = LabelConflict(
                    conflict_type=ConflictType.SAME_FACE_DIFF_ID,
                    severity=ConflictSeverity.HIGH,
                    detection_id=new_event.detection_id,
                    face_image_path=new_event.face_image_path,
                    conflicting_labels=[
                        (existing.event_id, existing.identity_id),
                        (new_event.event_id, new_event.identity_id),
                    ],
                    description=(
                        f"Same face assigned to both "
                        f"{existing.identity_id} and {new_event.identity_id}"
                    ),
                    blocks_training=True,  # Hard block
                    detected_at=datetime.utcnow(),
                )
                conflicts.append(conflict)
        return conflicts
4.3 Conflict Resolution Workflow
+-----------------------------------------+
| Conflict Detected |
+-----------------------------------------+
|
v
+-----------------------------------------+
| 1. Set conflict_id on affected events |
| 2. Set event.training_status = CONFLICTED|
| 3. Block all conflicting events from |
| training dataset |
+-----------------------------------------+
|
v
+-----------------------------------------+
| 4. Create conflict ticket in review UI |
| 5. Notify admin (in-app + email) |
+-----------------------------------------+
|
v
+-----------------------------------------+
| Admin reviews conflict in UI |
+-----------------------------------------+
| |
v v
+----------------+ +-------------------+
| RESOLVE: Pick | | ESCALATE: Flag for|
| correct label | | further review |
+--------+-------+ +--------+----------+
| |
v v
+----------------+ +-------------------+
| Winning label | | Assigned to senior|
| marked as | | reviewer |
| APPROVED, | +-------------------+
| losing marked |
| as REJECTED |
+--------+-------+
|
v
+----------------+
| All events now |
| non-conflicted |
| Unblock for |
| training |
+----------------+
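The resolution step at the bottom of this workflow can be sketched as a single pass over the conflicting events: the winning label is approved, the losing labels are rejected, and every event is unblocked for training. The dict shape used here is illustrative:

```python
def resolve_conflict(events: list[dict], winning_identity_id: str) -> list[dict]:
    """Apply the resolution steps above to a set of conflicting events.

    Each event dict carries identity_id / training_status / conflict_id
    keys (an illustrative stand-in for the TrainingEvent records).
    """
    for e in events:
        if e["identity_id"] == winning_identity_id:
            e["training_status"] = "approved"   # winning label
        else:
            e["training_status"] = "rejected"   # losing label
        e["conflict_id"] = None                 # no longer blocked from training
    return events
```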
4.4 Conflict Database Schema
CREATE TABLE label_conflicts (
    conflict_id     UUID PRIMARY KEY,
    conflict_type   VARCHAR(32) NOT NULL,
    severity        VARCHAR(16) NOT NULL,        -- LOW, MEDIUM, HIGH, CRITICAL
    status          VARCHAR(16) DEFAULT 'open',  -- open, under_review, resolved, escalated

    -- References
    detection_id    UUID REFERENCES detections(detection_id),
    face_image_path TEXT NOT NULL,

    -- Conflicting assignments (JSON array of event_id + identity_id pairs)
    conflicting_labels JSONB NOT NULL,

    -- Resolution
    resolution          VARCHAR(16),  -- label_selected, both_valid, escalated, auto_resolved
    resolved_by_user_id UUID REFERENCES users(user_id),
    winning_identity_id UUID REFERENCES identities(identity_id),
    resolution_notes    TEXT,

    -- Timestamps
    detected_at TIMESTAMP NOT NULL DEFAULT NOW(),
    resolved_at TIMESTAMP,

    -- Training block
    blocks_training       BOOLEAN NOT NULL DEFAULT TRUE,
    training_unblocked_at TIMESTAMP
);

-- Indexes for fast lookups
CREATE INDEX idx_label_conflicts_status ON label_conflicts(status);
CREATE INDEX idx_label_conflicts_detection ON label_conflicts(detection_id);
CREATE INDEX idx_label_conflicts_severity ON label_conflicts(severity);
5. Approval Workflow
5.1 Three Learning Modes
+==========================================================================+
| MODE 1: MANUAL ONLY |
+==========================================================================+
| |
| User Corrections Training Events Admin |
| | | | |
| v v v |
| +---------+ +---------+ +----------+ |
| | Capture |--------->| Store |-------->| Queue | |
| | Events | | (DB) | | (Wait) | |
| +---------+ +---------+ +----------+ |
| | |
| | Admin manually |
| | triggers training |
| v |
| +------------+ |
| | Training | |
| | Job | |
| +------------+ |
+==========================================================================+
| Best for: High-security environments, regulated industries |
| |
+==========================================================================+
+==========================================================================+
| MODE 2: SUGGESTED LEARNING (Recommended) |
+==========================================================================+
| |
| User Corrections Training Events Suggestion Engine |
| | | | |
| v v v |
| +---------+ +---------+ +----------+ |
| | Capture |--------->| Quality |-------->| Generate | |
| | Events | | Filter | | Suggestions |
| +---------+ +---------+ +----------+ |
| | |
| v |
| +----------+ |
| | Review | |
| | Queue | |
| +----------+ |
| | |
| +--------------------+------------------+ |
| | | | |
| v v v |
| +---------+ +---------+ +--------+|
| | Approve | | Reject | | Correct||
| | (Train) | | (Drop) | | Label ||
| +---------+ +---------+ +--------+|
| |
+==========================================================================+
| Best for: Most production environments. Human reviews every suggestion |
+==========================================================================+
+==========================================================================+
| MODE 3: APPROVED AUTO-UPDATE |
+==========================================================================+
| |
| +-----------------------------------------------------------+ |
| | Trusted Users / Auto-Approval Rules | |
| +-----------------------------------------------------------+ |
| | | |
| v v |
| +----------------+ Auto-approve if: +-------------------+ |
| | > High-confidence| | > From trusted user| |
| | system matches | | > No conflicts | |
| | (score > 0.95) | | > Identity has 10+ | |
| | > User has >100 | | existing samples | |
| | prior approvals | | > Single correction| |
| +----------------+ +-------------------+ |
| | | |
| v v |
| Direct to Training Dataset Into Suggestion Queue |
| (Skip Review) (Mode 2 behavior) |
| |
| Automatic retraining triggers on thresholds: |
| - >50 new approved examples total |
| - >10 new examples for any single identity |
| - Daily schedule at 02:00 AM |
| - Performance degradation detected |
| |
+==========================================================================+
| Best for: Mature systems with well-established identity databases |
+==========================================================================+
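The Mode 3 routing above reduces to a single eligibility predicate. The diagram lists the conditions in two boxes without fully specifying how they combine, so the and/or structure below is an assumption (thresholds are taken from the diagram):

```python
def auto_approve_eligible(match_score: float, user_prior_approvals: int,
                          has_conflict: bool, identity_sample_count: int,
                          is_single_correction: bool) -> bool:
    """Sketch of the Mode 3 auto-approval rules.

    Assumed combination: a high-confidence match OR a trusted user,
    AND all of the safety conditions (no conflicts, established
    identity, single correction).
    """
    high_confidence = match_score > 0.95          # high-confidence system match
    trusted_user = user_prior_approvals > 100     # user with >100 prior approvals
    return (
        (high_confidence or trusted_user)
        and not has_conflict
        and identity_sample_count >= 10           # identity has 10+ samples
        and is_single_correction
    )
```

Events that fail this check fall back to the Mode 2 suggestion queue.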
5.2 Suggestion Generation Engine
class TrainingSuggestionEngine:
    """Generates training suggestions from captured events."""

    async def generate_suggestions(self) -> list[TrainingSuggestion]:
        """
        Aggregate training events into actionable suggestions.
        Grouped by identity for efficient review.
        """
        # Get all pending training events
        pending_events = await self.db.get_pending_events(
            status=EventStatus.PENDING_REVIEW,
            min_quality_score=0.7,
            exclude_conflicted=True,
        )

        # Group by target identity
        by_identity = self._group_by_identity(pending_events)

        suggestions = []
        for identity_id, events in by_identity.items():
            suggestion = TrainingSuggestion(
                suggestion_id=uuid4(),
                identity_id=identity_id,
                identity_name=await self._get_identity_name(identity_id),
                # Grouped changes
                new_positive_examples=[
                    e for e in events
                    if e.event_type in (
                        TrainingEventType.IDENTITY_NAMED,
                        TrainingEventType.MATCH_CONFIRMED,
                    )
                ],
                corrected_examples=[
                    e for e in events
                    if e.event_type == TrainingEventType.MATCH_CORRECTED
                ],
                merged_examples=[
                    e for e in events
                    if e.event_type == TrainingEventType.IDENTITY_MERGED
                ],
                # Impact assessment
                estimated_accuracy_change=self._estimate_impact(events),
                conflict_risk_score=self._assess_conflict_risk(events),
                sample_diversity_score=self._assess_diversity(events),
                # Metadata
                generated_at=datetime.utcnow(),
                expires_at=datetime.utcnow() + timedelta(days=7),
                auto_approve_eligible=self._check_auto_approve(events),
                # Current state
                existing_positive_count=await self._get_positive_count(identity_id),
                existing_negative_count=await self._get_negative_count(identity_id),
            )
            suggestions.append(suggestion)

        # Sort by priority (impact x risk)
        suggestions.sort(
            key=lambda s: s.estimated_accuracy_change * (1 - s.conflict_risk_score),
            reverse=True,
        )
        return suggestions
5.3 Suggestion Review API
# GET /api/v1/training/suggestions
# Returns paginated list of pending suggestions

# POST /api/v1/training/suggestions/{id}/approve
class ApproveSuggestionRequest(BaseModel):
    review_method: str = "manual"  # manual, bulk, auto
    notes: str | None = None
# On approval: events marked as APPROVED, added to training dataset

# POST /api/v1/training/suggestions/{id}/reject
class RejectSuggestionRequest(BaseModel):
    reason: str                              # Required: why rejected
    reject_events: list[UUID] | None = None  # Reject specific events, or all

# POST /api/v1/training/suggestions/{id}/correct
class CorrectSuggestionRequest(BaseModel):
    corrections: list[EventCorrection]  # Fix specific event labels
# Events are updated and re-checked for conflicts

# POST /api/v1/training/suggestions/bulk-approve
class BulkApproveRequest(BaseModel):
    suggestion_ids: list[UUID]
    review_method: str = "bulk"

# GET /api/v1/training/suggestions/{id}/details
# Returns full details: all events, face thumbnails, impact analysis
5.4 Suggestion Review UI State Machine
+-------------+
| PENDING |
| (Queue) |
+------+------+
|
+----------------+----------------+
| | |
v v v
+---------+ +---------+ +---------+
| APPROVED| | REJECTED| |CORRECTED|
+----+----+ +----+----+ +----+----+
| | |
v v v
+---------+ +---------+ +---------+
| Added to| | Dropped | | Re-check|
| Training| | (Log) | | Conflict|
| Dataset | +---------+ +---------+
+---------+
|
v
+---------+
| Marked |
| TRAINED |
+---------+
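The state machine above maps directly onto a transition table that the review API can use as a guard. A minimal sketch; the CORRECTED -> PENDING re-queue edge is an assumption based on the "Re-check Conflict" box:

```python
# Valid transitions for a suggestion, per the state diagram above
VALID_TRANSITIONS: dict[str, set[str]] = {
    "PENDING": {"APPROVED", "REJECTED", "CORRECTED"},
    "APPROVED": {"TRAINED"},     # added to dataset, later marked TRAINED
    "REJECTED": set(),           # terminal: dropped (logged)
    "CORRECTED": {"PENDING"},    # re-checked for conflicts, then re-queued
    "TRAINED": set(),            # terminal
}

def can_transition(current: str, target: str) -> bool:
    """Guard suggestion state changes against the diagram above."""
    return target in VALID_TRANSITIONS.get(current, set())
```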
6. Retraining Pipeline
6.1 Pipeline Architecture
+------------------------------------------------------------------+
| RETRAINING PIPELINE |
+------------------------------------------------------------------+
| |
| +----------+ +----------+ +----------+ +----------+ |
| | Trigger | | Dataset | | Training | | Quality | |
| | Sources |->| Builder |->| Engine |->| Gates | |
| +----------+ +----------+ +----------+ +----+-----+ |
| | | |
| | v |
| | +----------+ |
| | | Deploy? | |
| | +----+-----+ |
| | | |
| | +----------+----------+ |
| | | | |
| | v v |
| | +---------+ +---------+ |
| | | DEPLOY | | HOLD | |
| | | (New | | (Keep | |
| | | Model) | | Current)| |
| | +----+----+ +---------+ |
| | | |
| | v |
| | +---------+ |
| | | Rollback| |
| | | Ready | |
| | +---------+ |
| | |
| +----+------------------------------------------------------+ |
| | TRIGGER SOURCES | |
| +------------------------------------------------------------+ |
| | 1. SCHEDULED: Daily at 02:00, Weekly on Sunday | |
| | 2. THRESHOLD: >50 new approved examples since last train | |
| | 3. MANUAL: Admin triggers via UI/API | |
| | 4. DEGRADATION: Performance monitor signals accuracy drop | |
| | 5. NEW_IDENTITY: New identity with enough samples added | |
| +------------------------------------------------------------+ |
+------------------------------------------------------------------+
6.2 Training Job Orchestration
class TrainingOrchestrator:
    """Orchestrates the complete retraining pipeline."""

    async def create_training_job(
        self,
        trigger: TrainingTrigger,
        config: TrainingConfig,
    ) -> TrainingJob:
        """Create and queue a new training job."""
        job = TrainingJob(
            job_id=uuid4(),
            trigger=trigger,  # scheduled, threshold, manual, degradation
            status=JobStatus.QUEUED,
            config=config,
            created_at=datetime.utcnow(),
            gpu_requirement=config.gpu_type or GPUType.V100,
            priority=self._calculate_priority(trigger),
        )

        # Write to job queue (Redis sorted set by priority)
        await self.job_queue.enqueue(job)
        logger.info(f"Training job {job.job_id} queued with priority {job.priority}")
        return job

    async def execute_training_job(self, job: TrainingJob):
        """Execute a training job end-to-end."""
        gpu_pod = None  # Ensure the name is bound for the finally block
        try:
            # Phase 1: Acquire GPU resources
            await self._update_status(job, JobStatus.ACQUIRING_RESOURCES)
            gpu_pod = await self.gpu_manager.acquire_gpu(
                gpu_type=job.config.gpu_type,
                max_wait_seconds=3600,
            )

            # Phase 2: Build training dataset
            await self._update_status(job, JobStatus.BUILDING_DATASET)
            dataset = await self._build_training_dataset(job)
            dataset_version = await self._version_dataset(dataset)

            # Phase 3: Determine training strategy
            await self._update_status(job, JobStatus.DETERMINING_STRATEGY)
            strategy = self._determine_training_strategy(
                job=job,
                dataset=dataset,
                current_model=await self._get_current_production_model(),
            )

            # Phase 4: Execute training
            if strategy == TrainingStrategy.INCREMENTAL_EMBEDDING:
                await self._update_status(job, JobStatus.INCREMENTAL_UPDATE)
                result = await self._run_incremental_update(job, dataset, gpu_pod)
            elif strategy == TrainingStrategy.FULL_RETRAIN:
                await self._update_status(job, JobStatus.FULL_TRAINING)
                result = await self._run_full_retrain(job, dataset, gpu_pod)
            elif strategy == TrainingStrategy.FINE_TUNE:
                await self._update_status(job, JobStatus.FINE_TUNING)
                result = await self._run_fine_tuning(job, dataset, gpu_pod)

            # Phase 5: Quality gates
            await self._update_status(job, JobStatus.QUALITY_CHECK)
            passed = await self._run_quality_gates(result)
            if not passed:
                await self._update_status(job, JobStatus.QUALITY_FAILED)
                await self._handle_quality_failure(job, result)
                return

            # Phase 6: Register model
            await self._update_status(job, JobStatus.REGISTERING)
            model_version = await self._register_model(result, dataset_version)

            # Phase 7: Deploy (if auto-deploy enabled) or hold for approval
            if job.config.auto_deploy and job.trigger != TrainingTrigger.MANUAL:
                await self._update_status(job, JobStatus.DEPLOYING)
                await self._deploy_model(model_version)
                await self._update_status(job, JobStatus.COMPLETED)
            else:
                await self._update_status(
                    job, JobStatus.AWAITING_DEPLOYMENT_APPROVAL
                )
        except Exception as e:
            await self._update_status(job, JobStatus.FAILED, error=str(e))
            logger.error(f"Training job {job.job_id} failed: {e}")
            raise
        finally:
            # Only release a GPU that was actually acquired
            if gpu_pod is not None:
                await self.gpu_manager.release_gpu(gpu_pod)
6.3 Training Strategy Decision Logic
def determine_training_strategy(
    self,
    new_examples_count: int,
    new_identities_count: int,
    days_since_last_train: int,
    current_model_age_days: int,
    drift_detected: bool,
) -> TrainingStrategy:
    """
    Decide between incremental update, fine-tuning, or full retraining.

    Decision Matrix:
    +----------------------+----------------------+-----------------------+
    | Scenario             | Strategy             | Reason                |
    +----------------------+----------------------+-----------------------+
    | < 100 new examples   | INCREMENTAL_EMBEDDING| Just update centroid  |
    | Existing identities  |                      | vectors, no model     |
    | No drift             |                      | retraining needed     |
    +----------------------+----------------------+-----------------------+
    | 100-1000 examples    | FINE_TUNE            | Update top layers,    |
    | Some new identities  |                      | keep base frozen      |
    | Minor drift          |                      |                       |
    +----------------------+----------------------+-----------------------+
    | > 1000 examples      | FULL_RETRAIN         | Significant new data, |
    | Many new identities  |                      | retrain from scratch  |
    | Major drift          |                      | or from checkpoint    |
    | Model > 30 days old  |                      |                       |
    +----------------------+----------------------+-----------------------+
    """
    if drift_detected and current_model_age_days > 30:
        return TrainingStrategy.FULL_RETRAIN
    if new_examples_count > 1000 or new_identities_count > 50:
        return TrainingStrategy.FULL_RETRAIN
    if new_examples_count > 100:
        return TrainingStrategy.FINE_TUNE
    return TrainingStrategy.INCREMENTAL_EMBEDDING
6.4 Training Job States
+---------+ +-----------+ +------------+ +--------------+
| QUEUED |-->| ACQUIRING |-->| BUILDING |-->| DETERMINING |
| | | RESOURCES | | DATASET | | STRATEGY |
+---------+ +-----------+ +------------+ +------+-------+
|
+----------------------------------------+--------------------------------+
| | |
v v v
+-------------+ +------------+ +------------+
| INCREMENTAL | | FINE_TUNING| | FULL |
| UPDATE | | | | TRAINING |
+------+------+ +------+-----+ +------+-----+
| | |
+------------------------+----------------+-------------------------------+
|
v
+--------------+
| QUALITY_CHECK|
+------+-------+
|
+----------+----------+
| |
v v
+---------+ +-----------+
| PASSED | | FAILED |
+----+----+ +-----+-----+
| |
v v
+---------+ +-----------+
|REGISTER | | NOTIFIED |
| MODEL | | (Admin) |
+----+----+ +-----------+
|
v
+----------+-----------+
| |
v v
+-------------+ +-------------------+
| DEPLOYED | | AWAITING_APPROVAL |
| (Auto) | | (Manual) |
+-------------+ +-------------------+
6.5 GPU Resource Management
class GPUResourceManager:
    """Manages GPU allocation for training jobs."""

    # Configuration
    GPU_POOL_CONFIG = {
        GPUType.V100: {"count": 4, "memory_gb": 32, "cost_per_hour": 2.5},
        GPUType.A100: {"count": 2, "memory_gb": 80, "cost_per_hour": 3.5},
        GPUType.T4:   {"count": 8, "memory_gb": 16, "cost_per_hour": 0.35},
    }

    # Job to GPU mapping
    JOB_GPU_REQUIREMENTS = {
        TrainingStrategy.INCREMENTAL_EMBEDDING: GPUType.T4,  # Fast, light
        TrainingStrategy.FINE_TUNE: GPUType.V100,            # Moderate
        TrainingStrategy.FULL_RETRAIN: GPUType.A100,         # Heavy
    }

    async def acquire_gpu(
        self,
        gpu_type: GPUType,
        max_wait_seconds: int = 3600,
    ) -> GPUPod:
        """Acquire a GPU for training with fair queuing."""
        # Check if a GPU is available
        available = await self.kubernetes.get_available_gpus(gpu_type)
        if available > 0:
            pod = await self.kubernetes.create_training_pod(
                image="face-training:latest",
                gpu_type=gpu_type,
                resources={"memory": "32Gi", "cpu": 8},
            )
            return pod

        # Queue and wait with priority
        queue_position = await self.waiting_queue.add({
            "gpu_type": gpu_type,
            "requested_at": datetime.utcnow().isoformat(),
        })
        logger.info(
            f"GPU {gpu_type.value} not available, queued at position {queue_position}"
        )

        # Wait with timeout
        pod = await self._wait_for_gpu(gpu_type, timeout=max_wait_seconds)
        return pod

    async def auto_scale_gpu_pool(self):
        """
        Auto-scale the GPU pool based on queue depth and cost constraints.
        Uses KEDA for Kubernetes event-driven autoscaling.
        """
        queue_depth = await self.waiting_queue.depth()
        running_jobs = await self.kubernetes.get_running_training_jobs()
        current_cost = self._calculate_current_cost(running_jobs)

        # Scale up if the queue is backed up
        if queue_depth > 5 and current_cost < self.max_hourly_budget:
            await self.kubernetes.scale_gpu_node_pool(
                target_replicas=min(queue_depth // 2, self.max_gpus)
            )

        # Scale down if idle
        if queue_depth == 0 and len(running_jobs) == 0:
            await self.kubernetes.scale_gpu_node_pool(target_replicas=1)
7. Model Versioning
7.1 Model Registry Structure
Model Registry (MLflow-like)
|
|-- face-recognition-model (Registered Model)
| |
| |-- Version 1.0.0 (Production, 2024-01-15)
| | |-- Model artifact (PyTorch .pt)
| | |-- Training dataset version: dataset-v1.0.0
| | |-- Training config JSON
| | |-- Performance metrics JSON
| | |-- Stage: Production
| | |-- Deployment date: 2024-01-15T08:30:00Z
| | +-- Rollback ready: Yes
| |
| |-- Version 1.1.0 (Staging, 2024-01-22)
| | |-- Model artifact
| | |-- Dataset: dataset-v1.1.0
| | |-- Stage: Staging (awaiting quality gates)
| | +-- A/B test: 10% traffic
| |
| |-- Version 1.1.1 (Archived, 2024-01-23)
| | |-- Stage: Archived
| | +-- Reason: Failed quality gate (precision drop)
| |
| +-- Version 1.2.0 (Staging, 2024-01-25)
| |-- Dataset: dataset-v1.2.0
| |-- Stage: Staging
| +-- Changelog: "Added 45 new identities, fine-tuned embeddings"
7.2 Versioning Schema
class ModelVersion(BaseModel):
"""A versioned model in the registry."""
version_id: UUID
version_string: str # Semantic: 1.2.3
# Model metadata
model_name: str = "face-recognition"
model_type: str # "arcface", "facenet", "custom"
architecture: str # "iresnet100", "mobilenet", etc.
embedding_dimension: int = 512
# Training lineage
training_job_id: UUID
training_dataset_version: str # References dataset version
training_config: TrainingConfig
base_model_version: str | None # For fine-tuning: what we started from
training_strategy: TrainingStrategy
# Model artifacts
model_artifact_path: str # S3/MinIO path to .pt file
checkpoint_path: str # Full training checkpoint
onnx_export_path: str | None # ONNX optimized version
tensorrt_path: str | None # TensorRT optimized (GPU)
# Performance metrics
training_metrics: TrainingMetrics
evaluation_metrics: EvaluationMetrics # On hold-out test set
identity_performance: dict[UUID, IdentityMetrics] # Per-identity breakdown
# Deployment state
stage: ModelStage # STAGING, PRODUCTION, ARCHIVED
deployment_date: datetime | None
traffic_percentage: float = 0.0 # For A/B testing
# Rollback
can_rollback: bool = True
rollback_to_version: str | None # Previous version
# Timestamps
created_at: datetime
updated_at: datetime
# Audit
created_by_user_id: UUID
approved_by_user_id: UUID | None
approval_notes: str | None
7.3 Semantic Versioning for Models
Version Format: MAJOR.MINOR.PATCH
MAJOR (X.0.0): Full retraining, architecture change, or breaking embedding change
- New base model (e.g., ArcFace -> FaceNet)
- Embedding dimension change
- All embeddings invalidated, full database rebuild required
MINOR (x.Y.0): Fine-tuning or significant new training data
- 50+ new identities added
- Fine-tuning with substantial new data
- Embeddings compatible, incremental update possible
PATCH (x.y.Z): Incremental update, hotfix
- New examples for existing identities
- Centroid updates only
- Backward compatible, rolling deployment safe
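The bump rules above can be sketched as a small helper. This is an illustrative sketch, not part of the registry API; `classify_bump` and `next_version` are hypothetical names, and the 50-identity threshold is taken from the MINOR rule above.

```python
from enum import Enum

class BumpLevel(Enum):
    MAJOR = "major"
    MINOR = "minor"
    PATCH = "patch"

def classify_bump(architecture_changed: bool,
                  embedding_dim_changed: bool,
                  new_identities: int,
                  fine_tuned: bool) -> BumpLevel:
    """Map a training change to the bump level described above."""
    if architecture_changed or embedding_dim_changed:
        return BumpLevel.MAJOR   # embeddings invalidated, full reindex required
    if new_identities >= 50 or fine_tuned:
        return BumpLevel.MINOR   # embeddings compatible, incremental update possible
    return BumpLevel.PATCH       # centroid updates only

def next_version(current: str, level: BumpLevel) -> str:
    """Compute the next MAJOR.MINOR.PATCH string."""
    major, minor, patch = (int(p) for p in current.split("."))
    if level is BumpLevel.MAJOR:
        return f"{major + 1}.0.0"
    if level is BumpLevel.MINOR:
        return f"{major}.{minor + 1}.0"
    return f"{major}.{minor}.{patch + 1}"

# Adding 60 new identities to 1.1.0 is a MINOR bump:
version = next_version("1.1.0", classify_bump(False, False, 60, False))
```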
7.4 Rollback Mechanism
class RollbackManager:
"""Manages model rollback with zero-downtime switching."""
ROLLBACK_TIMEOUT_SECONDS = 60
async def rollback_to_version(
self,
target_version: str,
reason: str,
initiated_by: UUID
) -> RollbackResult:
"""
Rollback to a previous model version.
"""
# 1. Validate target version exists and is rollback-eligible
target = await self.model_registry.get_version(target_version)
if not target.can_rollback:
raise RollbackError(f"Version {target_version} is not rollback-eligible")
current = await self.model_registry.get_production_version()
# 2. Pre-warm: Load target model into memory
logger.info(f"Pre-warming model {target_version} for rollback")
await self.inference_service.load_model(target_version)
# 3. Atomic switch with traffic mirroring
await self._atomic_model_switch(
from_version=current.version_string,
to_version=target.version_string,
mirror_traffic=True, # Send to both, compare results
mirror_duration_seconds=30
)
# 4. Validate switch health
health = await self._check_model_health(target_version)
if not health.healthy:
# Auto-revert to current
logger.error(f"Rollback health check failed, reverting to {current.version_string}")
await self._atomic_model_switch(
from_version=target.version_string,
to_version=current.version_string
)
raise RollbackError(f"Rollback health check failed: {health.details}")
# 5. Commit the rollback
await self.model_registry.update_stage(
version=target.version_string,
stage=ModelStage.PRODUCTION,
traffic_percentage=100.0
)
await self.model_registry.update_stage(
version=current.version_string,
stage=ModelStage.ARCHIVED,
archive_reason=f"Rolled back to {target.version_string}: {reason}"
)
# 6. Log rollback event
await self.audit_log.record(RollbackEvent(
from_version=current.version_string,
to_version=target.version_string,
reason=reason,
initiated_by=initiated_by,
completed_at=datetime.utcnow(),
# Record the measured switch time, not the timeout constant
duration_seconds=health.switch_duration_ms / 1000.0
))
return RollbackResult(
success=True,
previous_version=current.version_string,
new_version=target.version_string,
switch_duration_ms=health.switch_duration_ms
)
7.5 A/B Testing Framework
class ABTestManager:
"""Manages A/B testing of model versions."""
async def start_ab_test(
self,
new_version: str,
traffic_split: dict[str, float], # {"current": 0.9, "new": 0.1}
test_duration_hours: int = 48,
success_criteria: ABTestCriteria | None = None
) -> ABTest:
"""
Start an A/B test between current and new model.
"""
test = ABTest(
test_id=uuid4(),
new_version=new_version,
traffic_split=traffic_split,
started_at=datetime.utcnow(),
ends_at=datetime.utcnow() + timedelta(hours=test_duration_hours),
criteria=success_criteria or ABTestCriteria(
min_accuracy_improvement=0.005, # 0.5% improvement required
max_latency_regression=0.1, # 10% latency increase max
max_false_positive_increase=0.05 # 5% FP increase max
),
status=ABTestStatus.RUNNING
)
# Configure traffic routing
await self.traffic_router.set_split(
model_a=(await self.model_registry.get_production_version()).version_string,
model_b=new_version,
split_a=traffic_split.get("current", 0.9),
split_b=traffic_split.get("new", 0.1)
)
# Start metrics collection
await self.metrics_collector.start_comparison(test)
return test
async def evaluate_ab_test(self, test_id: UUID) -> ABTestResult:
"""
Evaluate whether A/B test passed or failed.
"""
test = await self.ab_test_store.get(test_id)
metrics = await self.metrics_collector.get_comparison_metrics(test)
passed = (
metrics.accuracy_improvement >= test.criteria.min_accuracy_improvement
and metrics.latency_regression <= test.criteria.max_latency_regression
and metrics.false_positive_change <= test.criteria.max_false_positive_increase
)
if passed:
await self._promote_to_full(test)
else:
await self._revert_traffic(test)
return ABTestResult(
test_id=test_id,
passed=passed,
metrics=metrics,
recommendation="promote" if passed else "revert",
confidence=metrics.statistical_confidence
)
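The `statistical_confidence` field above is referenced but not defined. One common way to compute it, assuming per-request match outcomes are recorded as correct/incorrect for each arm, is a pooled two-proportion z-test; the sketch below is an assumption about how that field could be derived, using only the standard library.

```python
from math import erf, sqrt

def two_proportion_confidence(successes_a: int, trials_a: int,
                              successes_b: int, trials_b: int) -> float:
    """
    One-sided confidence that variant B's success rate exceeds variant A's,
    via a pooled two-proportion z-test. Returns a value in (0, 1);
    0.5 means no evidence either way.
    """
    p_a = successes_a / trials_a
    p_b = successes_b / trials_b
    pooled = (successes_a + successes_b) / (trials_a + trials_b)
    se = sqrt(pooled * (1 - pooled) * (1 / trials_a + 1 / trials_b))
    if se == 0:
        return 0.5
    z = (p_b - p_a) / se
    # Standard normal CDF expressed through erf
    return 0.5 * (1 + erf(z / sqrt(2)))
```

For example, 950/1000 correct matches on the new model versus 900/1000 on the current one yields confidence well above 0.99 that the new model is better.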
8. Quality Gates
8.1 Quality Gate Pipeline
+----------------+ +----------------+ +----------------+
| Hold-Out Test | | Model Compare | | Identity |
| Evaluation | --> | vs Current | --> | Regression |
| | | Production | | Check |
+----------------+ +----------------+ +----------------+
| | |
v v v
+--------+---------+ +--------+---------+ +--------+---------+
| PASS/FAIL | | PASS/FAIL | | PASS/FAIL |
| Threshold: | | Threshold: | | Threshold: |
| Precision >= 0.97| | No regression > | | 100% recall on |
| Recall >= 0.95 | | 2% on any metric | | all known IDs |
+------------------+ +------------------+ +------------------+
| | |
+----------+-----------+----------------------+
|
v
+----------+-----------+
| ALL PASSED? |
+----------+-----------+
|
+--------+--------+
| |
v v
+-------------+ +------------------+
| DEPLOY | | REJECT & NOTIFY |
| (Proceed) | | (Log + Alert) |
+-------------+ +------------------+
8.2 Quality Gate Implementation
class QualityGates:
"""Quality gate checks before model deployment."""
# Thresholds
MIN_PRECISION: float = 0.97
MIN_RECALL: float = 0.95
MIN_F1_SCORE: float = 0.96
MAX_LATENCY_MS: float = 150.0
MAX_REGRESSION_PCT: float = 0.02 # 2% regression allowed
MIN_KNOWN_IDENTITY_ACCURACY: float = 1.0 # 100% on known IDs
async def run_all_gates(
self,
candidate_model: ModelVersion,
current_model: ModelVersion,
test_dataset: TestDataset
) -> QualityGateResult:
"""Run all quality gates and return combined result."""
results = []
# Gate 1: Hold-out test performance
results.append(await self._gate_holdout_performance(
candidate_model, test_dataset
))
# Gate 2: Comparison against production model
results.append(await self._gate_no_regression(
candidate_model, current_model, test_dataset
))
# Gate 3: Known identity accuracy
results.append(await self._gate_known_identity_accuracy(
candidate_model, test_dataset
))
# Gate 4: Latency check
results.append(await self._gate_latency_requirement(
candidate_model
))
# Gate 5: Confusion matrix analysis
results.append(await self._gate_confusion_analysis(
candidate_model, test_dataset
))
# Combine results
all_passed = all(r.passed for r in results)
return QualityGateResult(
passed=all_passed,
gate_results=results,
overall_recommendation="deploy" if all_passed else "reject",
can_override=not any(r.critical and not r.passed for r in results),  # only non-critical failures may be overridden
details={r.gate_name: r.details for r in results}
)
async def _gate_holdout_performance(
self,
model: ModelVersion,
test_dataset: TestDataset
) -> GateResult:
"""Gate 1: Must meet minimum performance on hold-out set."""
metrics = await self.evaluator.evaluate(model, test_dataset)
passed = (
metrics.precision >= self.MIN_PRECISION
and metrics.recall >= self.MIN_RECALL
and metrics.f1_score >= self.MIN_F1_SCORE
)
return GateResult(
gate_name="holdout_performance",
passed=passed,
critical=True, # Cannot override
details={
"precision": metrics.precision,
"precision_threshold": self.MIN_PRECISION,
"recall": metrics.recall,
"recall_threshold": self.MIN_RECALL,
"f1_score": metrics.f1_score,
"f1_threshold": self.MIN_F1_SCORE,
"false_positive_rate": metrics.fpr,
"false_negative_rate": metrics.fnr
},
failure_reason=None if passed else (
f"Precision {metrics.precision:.4f} < {self.MIN_PRECISION} "
f"or Recall {metrics.recall:.4f} < {self.MIN_RECALL}"
)
)
async def _gate_no_regression(
self,
candidate: ModelVersion,
current: ModelVersion,
test_dataset: TestDataset
) -> GateResult:
"""Gate 2: New model must not regress on any key metric."""
candidate_metrics = await self.evaluator.evaluate(candidate, test_dataset)
current_metrics = await self.evaluator.evaluate(current, test_dataset)
regressions = {}
for metric_name in ["precision", "recall", "f1_score"]:
candidate_val = getattr(candidate_metrics, metric_name)
current_val = getattr(current_metrics, metric_name)
regression = (current_val - candidate_val) / current_val if current_val > 0 else 0
regressions[metric_name] = regression
max_regression = max(regressions.values())
passed = max_regression <= self.MAX_REGRESSION_PCT
return GateResult(
gate_name="no_regression",
passed=passed,
critical=False, # Can be overridden by admin
details={
"max_regression_pct": max_regression * 100,
"threshold_pct": self.MAX_REGRESSION_PCT * 100,
"per_metric_regression": regressions,
"candidate_metrics": candidate_metrics.dict(),
"current_metrics": current_metrics.dict()
},
failure_reason=None if passed else (
f"Maximum regression {max_regression*100:.2f}% exceeds "
f"threshold {self.MAX_REGRESSION_PCT*100:.2f}%"
)
)
async def _gate_known_identity_accuracy(
self,
model: ModelVersion,
test_dataset: TestDataset
) -> GateResult:
"""Gate 3: Must have 100% accuracy on known identities."""
known_identities = test_dataset.get_known_identities()
per_identity_metrics = await self.evaluator.evaluate_per_identity(
model, known_identities
)
failing_identities = [
(id_id, m.accuracy)
for id_id, m in per_identity_metrics.items()
if m.accuracy < self.MIN_KNOWN_IDENTITY_ACCURACY
]
passed = len(failing_identities) == 0
return GateResult(
gate_name="known_identity_accuracy",
passed=passed,
critical=True,
details={
"total_known_identities": len(known_identities),
"failing_identities": [
{"identity_id": str(i), "accuracy": a}
for i, a in failing_identities
]
},
failure_reason=None if passed else (
f"{len(failing_identities)} known identities have "
f"accuracy below {self.MIN_KNOWN_IDENTITY_ACCURACY}"
)
)
8.3 Quality Gate Report
{
"gate_run_id": "uuid-string",
"candidate_model_version": "1.2.0",
"baseline_model_version": "1.1.0",
"timestamp": "2024-01-25T10:30:00Z",
"overall_result": "PASSED",
"gates": [
{
"name": "holdout_performance",
"status": "PASSED",
"critical": true,
"metrics": {
"precision": 0.9842,
"recall": 0.9678,
"f1_score": 0.9759
}
},
{
"name": "no_regression",
"status": "PASSED",
"metrics": {
"max_regression_pct": 0.8,
"per_metric": {
"precision": 0.003,
"recall": -0.008,
"f1_score": -0.002
}
}
},
{
"name": "known_identity_accuracy",
"status": "PASSED",
"metrics": {
"known_identities_tested": 142,
"perfect_accuracy": 142,
"accuracy_below_threshold": 0
}
},
{
"name": "latency_requirement",
"status": "PASSED",
"metrics": {
"p50_latency_ms": 45,
"p99_latency_ms": 128,
"threshold_ms": 150
}
}
]
}
9. Embedding Update Strategy
9.1 Update Strategy Decision Matrix
class EmbeddingUpdateStrategy(Enum):
"""Strategies for updating the embedding database."""
CENTROID_UPDATE = "centroid_update" # Update running mean only
FULL_REINDEX = "full_reindex" # Recompute all embeddings
INCREMENTAL_ADD = "incremental_add" # Add new, keep existing
MERGE_AND_UPDATE = "merge_and_update" # Handle identity merges
ROLLBACK_REINDEX = "rollback_reindex" # Restore previous embeddings
def select_embedding_strategy(
change_type: str,
affected_identities: int,
total_identities: int,
model_version_changed: bool
) -> EmbeddingUpdateStrategy:
"""
Select the optimal embedding update strategy.
+------------------+------------------+------------------+------------------+
| Scenario | Model Same | Model Changed | Action |
+------------------+------------------+------------------+------------------+
| Few new examples | Centroid update | Full reindex | CENTROID_UPDATE |
| (< 10/identity) | (fast, online) | (required) | or FULL_REINDEX |
+------------------+------------------+------------------+------------------+
| Many new examples| Incremental add | Full reindex | INCREMENTAL_ADD |
| (10-100/identity)| + centroid | | or FULL_REINDEX |
+------------------+------------------+------------------+------------------+
| Identity merge | Merge & update | Full reindex | MERGE_AND_UPDATE |
+------------------+------------------+------------------+------------------+
| Model rollback | N/A | Rollback | ROLLBACK_REINDEX |
+------------------+------------------+------------------+------------------+
"""
if model_version_changed:
return EmbeddingUpdateStrategy.FULL_REINDEX
if change_type == "merge":
return EmbeddingUpdateStrategy.MERGE_AND_UPDATE
if change_type == "rollback":
return EmbeddingUpdateStrategy.ROLLBACK_REINDEX
if affected_identities / total_identities > 0.1: # > 10% changed
return EmbeddingUpdateStrategy.FULL_REINDEX
if change_type == "new_examples" and affected_identities < 10:
return EmbeddingUpdateStrategy.CENTROID_UPDATE
return EmbeddingUpdateStrategy.INCREMENTAL_ADD
9.2 Centroid Update (Online, Fast)
class CentroidUpdater:
"""
Fast online update of identity centroids without full model inference.
Suitable for small batches of new confirmed examples.
"""
async def update_centroid(
self,
identity_id: UUID,
new_embeddings: list[np.ndarray]
) -> UpdatedIdentity:
"""
Update the centroid embedding for an identity with new examples.
Uses running average with configurable decay.
"""
identity = await self.identity_store.get(identity_id)
# Get current centroid and count
current_centroid = identity.embedding_centroid
current_count = identity.positive_example_count
# Compute new embeddings centroid
new_centroid = np.mean(new_embeddings, axis=0)
new_count = len(new_embeddings)
# Weighted average with configurable decay
# Higher alpha = faster adaptation to new examples
alpha = self.config.centroid_update_alpha # e.g., 0.3
updated_centroid = (
(1 - alpha) * current_centroid * current_count +
alpha * new_centroid * new_count
) / ((1 - alpha) * current_count + alpha * new_count)
# Normalize (embeddings are typically L2 normalized)
updated_centroid = updated_centroid / np.linalg.norm(updated_centroid)
# Update confidence radius
distances = [
np.linalg.norm(emb - updated_centroid)
for emb in new_embeddings
]
new_radius = max(distances) if distances else identity.confidence_radius
updated_radius = max(
identity.confidence_radius * 0.95, # Decay old radius slightly
new_radius
)
# Update in database and index
await self.identity_store.update_embedding(
identity_id=identity_id,
centroid=updated_centroid.tolist(),
confidence_radius=updated_radius,
example_count=current_count + new_count,
last_updated=datetime.utcnow()
)
# Update FAISS index
await self.embedding_index.update_vector(
identity_id=identity_id,
new_vector=updated_centroid
)
return UpdatedIdentity(
identity_id=identity_id,
previous_centroid=current_centroid.tolist(),
new_centroid=updated_centroid.tolist(),
update_type="centroid_online",
examples_added=new_count,
processing_time_ms=0 # Near-instant
)
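The alpha-weighted update in `update_centroid` can be illustrated numerically. This is a minimal standalone re-statement of the same formula on plain lists (no NumPy), showing that an established centroid dominates a small batch of new examples and that the result stays unit length.

```python
from math import sqrt

def weighted_centroid_update(current: list[float], current_count: int,
                             new_mean: list[float], new_count: int,
                             alpha: float = 0.3) -> list[float]:
    """Alpha-weighted running average of two centroids, re-normalized to unit length."""
    denom = (1 - alpha) * current_count + alpha * new_count
    merged = [((1 - alpha) * c * current_count + alpha * n * new_count) / denom
              for c, n in zip(current, new_mean)]
    norm = sqrt(sum(x * x for x in merged))
    return [x / norm for x in merged]

# 40 existing examples along axis 0, 10 new examples along axis 1:
# the update nudges the centroid toward the new data but the old
# centroid (weight (1 - alpha) * 40 = 28 vs alpha * 10 = 3) dominates.
updated = weighted_centroid_update([1.0, 0.0], 40, [0.0, 1.0], 10, alpha=0.3)
```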
9.3 Full Reindex (Batch, Comprehensive)
class FullReindexer:
"""
Complete reindexing of all embeddings. Required when:
- Model version changes
- Large batch updates (> 10% of identities)
- Embedding dimension changes
- Periodic consistency maintenance
"""
async def full_reindex(
self,
model_version: str,
batch_size: int = 256,
parallel_workers: int = 4
) -> ReindexResult:
"""
Recompute all embeddings for all training examples using
the specified model version. Build a new FAISS index.
"""
start_time = datetime.utcnow()
# Phase 1: Collect all training examples
all_examples = await self.training_store.get_all_approved_examples()
# Phase 2: Group by identity
by_identity = defaultdict(list)
for ex in all_examples:
by_identity[ex.identity_id].append(ex)
# Phase 3: Recompute embeddings in batches
new_embeddings = {}
model = await self.model_registry.load_model(model_version)
for identity_id, examples in tqdm(by_identity.items()):
# Load face images
images = await self._load_face_images(
[ex.face_image_path for ex in examples]
)
# Batch inference
embeddings = await self._batch_inference(model, images, batch_size)
# Compute new centroid
centroid = np.mean(embeddings, axis=0)
centroid = centroid / np.linalg.norm(centroid)
# Compute confidence radius
distances = [np.linalg.norm(e - centroid) for e in embeddings]
radius = np.percentile(distances, 95) # 95th percentile
new_embeddings[identity_id] = {
"centroid": centroid,
"radius": radius,
"count": len(examples),
"all_embeddings": embeddings
}
# Phase 4: Build new FAISS index
new_index = self._build_faiss_index(new_embeddings)
# Phase 5: Atomic swap (dual-index strategy)
await self._atomic_index_swap(
old_index=self.embedding_index.current_index(),
new_index=new_index
)
# Phase 6: Update identity records
for identity_id, data in new_embeddings.items():
await self.identity_store.update_embedding(
identity_id=identity_id,
centroid=data["centroid"].tolist(),
confidence_radius=data["radius"],
example_count=data["count"],
model_version=model_version,
last_reindexed=datetime.utcnow()
)
# Phase 7: Consistency check
consistency = await self._run_consistency_check(new_index, new_embeddings)
duration = (datetime.utcnow() - start_time).total_seconds()
return ReindexResult(
model_version=model_version,
identities_reindexed=len(new_embeddings),
total_examples=len(all_examples),
index_build_time_seconds=duration,
consistency_check=consistency,
index_size_mb=new_index.size() / (1024 * 1024)
)
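Phase 5's `_atomic_index_swap` is not shown above; one minimal in-process interpretation of the dual-index strategy is a holder object that swaps the active index reference under a lock, so readers never observe a half-built index. The class name and shape here are assumptions for illustration; in production the swap would typically also coordinate across inference replicas.

```python
import threading

class DualIndexHolder:
    """
    Dual-index sketch: queries read whichever index the holder currently
    points at; a fully built replacement index is swapped in atomically.
    """
    def __init__(self, initial_index):
        self._lock = threading.Lock()
        self._active = initial_index

    def current_index(self):
        with self._lock:
            return self._active

    def swap(self, new_index):
        """Atomically replace the active index; return the old one for teardown."""
        with self._lock:
            old, self._active = self._active, new_index
        return old

# Build the new index fully off to the side, then swap:
holder = DualIndexHolder({"version": "1.1.0"})
old = holder.swap({"version": "1.2.0"})
```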
9.4 Identity Merge Embedding Update
class MergeEmbeddingUpdater:
"""Handle embedding updates when identities are merged."""
async def handle_identity_merge(
self,
source_identity_id: UUID,
target_identity_id: UUID,
merge_method: str = "weighted_centroid"
) -> MergeResult:
"""
Merge embeddings from source identity into target identity.
"""
source = await self.identity_store.get(source_identity_id)
target = await self.identity_store.get(target_identity_id)
started_at = datetime.utcnow()
# Combined example count is needed by both merge methods and in the result
total = source.positive_example_count + target.positive_example_count
if merge_method == "weighted_centroid":
# Weighted average based on example counts
merged_centroid = (
np.array(source.embedding_centroid) * source.positive_example_count +
np.array(target.embedding_centroid) * target.positive_example_count
) / total
merged_centroid = merged_centroid / np.linalg.norm(merged_centroid)
elif merge_method == "recompute_all":
# Recompute from all combined examples (slower but more accurate)
all_embeddings = (
source.get_all_embeddings() +
target.get_all_embeddings()
)
merged_centroid = np.mean(all_embeddings, axis=0)
merged_centroid = merged_centroid / np.linalg.norm(merged_centroid)
else:
raise ValueError(f"Unknown merge method: {merge_method}")
# Update target identity
await self.identity_store.update_embedding(
identity_id=target_identity_id,
centroid=merged_centroid.tolist(),
confidence_radius=max(source.confidence_radius, target.confidence_radius),
example_count=total,
merged_from=[source_identity_id]
)
# Remove source from embedding index
await self.embedding_index.delete_vector(source_identity_id)
# Update target in embedding index
await self.embedding_index.update_vector(
identity_id=target_identity_id,
new_vector=merged_centroid
)
# Mark source identity as merged
await self.identity_store.mark_merged(
source_identity_id,
merged_into=target_identity_id
)
return MergeResult(
source_identity_id=source_identity_id,
target_identity_id=target_identity_id,
combined_example_count=total,
new_centroid=merged_centroid.tolist(),
processing_time_ms=(datetime.utcnow() - started_at).total_seconds() * 1000
)
9.5 Consistency Checks
class EmbeddingConsistencyChecker:
"""Verify embedding database consistency after updates."""
async def run_consistency_check(self) -> ConsistencyReport:
"""
Run post-update consistency verification.
"""
issues = []
# Check 1: Every identity has a centroid
identities = await self.identity_store.get_all_active()
for identity in identities:
if identity.embedding_centroid is None:
issues.append(ConsistencyIssue(
type="missing_centroid",
identity_id=identity.identity_id,
severity="critical",
message=f"Identity {identity.identity_id} has no embedding centroid"
))
# Check 2: Every centroid is in the FAISS index
index = self.embedding_index.current_index()
indexed_ids = set(index.get_all_ids())
for identity in identities:
if identity.identity_id not in indexed_ids:
issues.append(ConsistencyIssue(
type="missing_from_index",
identity_id=identity.identity_id,
severity="critical",
message=f"Identity {identity.identity_id} not found in FAISS index"
))
# Check 3: Centroid matches index vector
for identity in identities:
if identity.identity_id in indexed_ids:
index_vector = index.get_vector(identity.identity_id)
centroid = np.array(identity.embedding_centroid)
if not np.allclose(index_vector, centroid, atol=1e-5):
issues.append(ConsistencyIssue(
type="centroid_mismatch",
identity_id=identity.identity_id,
severity="high",
message="Database centroid differs from index vector"
))
# Check 4: No orphaned index entries
active_ids = {i.identity_id for i in identities}
orphaned = indexed_ids - active_ids
for orphan_id in orphaned:
issues.append(ConsistencyIssue(
type="orphaned_index_entry",
identity_id=orphan_id,
severity="medium",
message=f"FAISS entry for deleted identity: {orphan_id}"
))
# Check 5: Embedding dimension consistency
expected_dim = self.config.embedding_dimension
for identity in identities:
if len(identity.embedding_centroid) != expected_dim:
issues.append(ConsistencyIssue(
type="dimension_mismatch",
identity_id=identity.identity_id,
severity="critical",
message=f"Expected dim {expected_dim}, got {len(identity.embedding_centroid)}"
))
# Auto-fix minor issues
fixed = await self._auto_fix_issues([i for i in issues if i.severity == "medium"])
return ConsistencyReport(
total_identities=len(identities),
total_issues=len(issues),
critical_issues=len([i for i in issues if i.severity == "critical"]),
auto_fixed=len(fixed),
requires_manual_fix=len([i for i in issues if i.severity in ("high", "critical")]),
details=issues
)
10. Training Dataset Management
10.1 Dataset Versioning
class TrainingDatasetVersion(BaseModel):
"""A versioned snapshot of the training dataset."""
version_id: str # Semantic version: "1.2.3"
created_at: datetime
created_by: UUID # User or system
# Composition
total_identities: int
total_positive_examples: int
total_negative_examples: int
total_examples: int
# Per-identity breakdown
identity_breakdown: list[IdentityDatasetStats]
# Source events
included_event_ids: list[UUID] # All training events in this version
excluded_event_ids: list[UUID] # Events excluded (quality/conflict)
# Quality summary
avg_face_quality_score: float
avg_embedding_quality: float
conflict_count: int # Should be 0 (conflicts resolved before versioning)
# Storage
dataset_manifest_path: str # S3 path to manifest.json
images_tar_path: str # S3 path to images tarball
embeddings_path: str # S3 path to precomputed embeddings
# Audit
parent_version: str | None # Previous version this was derived from
changelog: str # Human-readable changes
retention_until: datetime # When this version can be deleted
class IdentityDatasetStats(BaseModel):
"""Statistics for a single identity in a dataset version."""
identity_id: UUID
identity_name: str
positive_examples: int
negative_examples: int
avg_quality_score: float
camera_diversity: int # Number of unique cameras
time_span_days: int # Time span of examples
newest_example_age_days: int
10.2 Dataset Export
class DatasetExporter:
"""Export training datasets in various formats."""
async def export_dataset(
self,
version: str,
format: ExportFormat,
include_images: bool = True,
anonymize: bool = False,
filter_criteria: ExportFilter | None = None
) -> ExportResult:
"""
Export a dataset version to a portable format.
Formats:
- STRUCTURED_JSON: Organized by identity with metadata
- COCO_FORMAT: Standard COCO detection format
- REID_FORMAT: Person re-identification format
- RAW_IMAGES: Flat folder structure with label files
"""
dataset = await self.dataset_store.load_version(version)
if filter_criteria:
dataset = self._apply_filter(dataset, filter_criteria)
# Build manifest
manifest = self._build_manifest(dataset, format)
# Export images (if requested)
if include_images:
image_archive = await self._export_images(
dataset, anonymize=anonymize
)
else:
image_archive = None
# Write to export location
export_path = f"exports/{version}/{format.value}_{datetime.utcnow().isoformat()}.tar.gz"
await self.object_store.upload(
path=export_path,
data={"manifest": manifest, "images": image_archive}
)
return ExportResult(
export_path=export_path,
format=format,
total_identities=len(dataset.identities),
total_examples=dataset.total_examples,
file_size_mb=await self.object_store.get_size(export_path)
)
10.3 Retention Policy
class RetentionPolicy:
"""Manage training data retention with privacy controls."""
DEFAULT_POLICY = {
# Event retention
"rejected_events_days": 30, # Keep rejected events for 30 days
"approved_events_years": 2, # Keep approved events for 2 years
"conflicted_events_years": 1, # Keep conflict records for 1 year
# Dataset versions
"dataset_versions_keep": 10, # Keep last 10 dataset versions
"dataset_versions_min_days": 90, # Minimum retention for any version
# Images
"training_images_years": 2, # Keep training images for 2 years
"deleted_identity_images_days": 30, # Keep images of deleted identities 30d
# Model artifacts
"model_versions_keep": 20, # Keep last 20 model versions
"archived_models_days": 365, # Archived models kept 1 year
# Audit
"audit_logs_years": 5, # Audit logs kept 5 years
}
def __init__(self, policy: dict | None = None):
self.policy = policy or self.DEFAULT_POLICY
async def apply_retention(self):
"""
Apply retention policy: soft-delete expired data,
hard-delete after grace period.
"""
# Soft-delete expired training events
expired_events = await self.db.find_expired_events(
before=datetime.utcnow() - timedelta(days=self.policy["rejected_events_days"])
)
for event in expired_events:
await self.db.soft_delete_event(event.event_id)
# Hard-delete after grace period
grace_period_events = await self.db.find_soft_deleted_before(
before=datetime.utcnow() - timedelta(days=30)
)
for event in grace_period_events:
# Delete image files
await self.object_store.delete(event.face_image_path)
await self.db.hard_delete_event(event.event_id)
# Cleanup old dataset versions
versions = await self.dataset_store.list_versions()
if len(versions) > self.policy["dataset_versions_keep"]:
to_delete = versions[self.policy["dataset_versions_keep"]:]
for version in to_delete:
if version.age_days > self.policy["dataset_versions_min_days"]:
await self.dataset_store.archive_version(version.version_id)
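The `retention_until` timestamp on dataset versions (Section 10.1) can be derived from the policy table. A minimal sketch, assuming year-based policies are approximated as 365-day periods and event statuses named "rejected", "approved", and "conflicted"; the function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone

POLICY = {
    "rejected_events_days": 30,
    "approved_events_years": 2,
    "conflicted_events_years": 1,
}

def retention_until(event_status: str, created_at: datetime,
                    policy: dict = POLICY) -> datetime:
    """Deadline after which an event becomes eligible for soft deletion."""
    if event_status == "rejected":
        return created_at + timedelta(days=policy["rejected_events_days"])
    if event_status == "approved":
        return created_at + timedelta(days=365 * policy["approved_events_years"])
    if event_status == "conflicted":
        return created_at + timedelta(days=365 * policy["conflicted_events_years"])
    raise ValueError(f"Unknown event status: {event_status}")

created = datetime(2024, 1, 1, tzinfo=timezone.utc)
rejected_deadline = retention_until("rejected", created)  # 30 days out
```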
10.4 Privacy Controls
class PrivacyControls:
"""Access control and anonymization for training data."""
PERMISSION_MATRIX = {
# Role -> [permissions]
"admin": ["view_all", "export", "delete", "anonymize", "change_retention"],
"trainer": ["view_approved", "export_filtered", "request_delete"],
"reviewer": ["view_pending", "approve_reject", "view_own_actions"],
"auditor": ["view_analytics", "view_audit_log", "export_metrics_only"],
"viewer": ["view_own_identities", "view_own_examples"],
}
async def can_view_training_image(
self,
user_id: UUID,
image_path: str,
identity_id: UUID
) -> bool:
"""
Check if user can view a specific training image.
"""
user = await self.auth.get_user(user_id)
# Admins can view all
if user.role == "admin":
return True
# Users can view images they reviewed
if await self.db.user_reviewed_image(user_id, image_path):
return True
# Users can view images for identities they manage
if await self.db.user_manages_identity(user_id, identity_id):
return True
# Training role can view approved examples (with audit)
if user.role == "trainer":
await self.audit_log.record_image_view(user_id, image_path)
return True
return False
async def anonymize_identity(
self,
identity_id: UUID,
method: str = "blur_faces"
) -> AnonymizeResult:
"""
Anonymize all training images for an identity.
Used for GDPR/privacy compliance.
"""
examples = await self.training_store.get_identity_examples(identity_id)
for example in examples:
if method == "blur_faces":
await self._apply_face_blur(example.face_image_path)
elif method == "remove_images":
await self.object_store.delete(example.face_image_path)
await self.training_store.mark_image_deleted(example.event_id)
elif method == "synthetic_replace":
# Replace with synthetic face preserving attributes
await self._replace_with_synthetic(example)
return AnonymizeResult(
identity_id=identity_id,
method=method,
examples_processed=len(examples),
completion_time=datetime.utcnow()
)
11. Performance Monitoring
11.1 Monitoring Architecture
+------------------------------------------------------------------+
| PERFORMANCE MONITORING |
+------------------------------------------------------------------+
| |
| +------------------+ +------------------+ +------------------+ |
| | Detection Events | | Training Events | | Model Predictions| |
| +--------+---------+ +--------+---------+ +--------+---------+ |
| | | | |
| v v v |
| +--------+---------+ +--------+---------+ +--------+---------+ |
| | Metrics | | Metrics | | Metrics | |
| | Collector | | Collector | | Collector | |
| +--------+---------+ +--------+---------+ +--------+---------+ |
| | | | |
| +----------+----------+----------+----------+ |
| | | |
| v v |
| +-------+-------+ +--------+-------+ |
| | Prometheus | | Alerting | |
| | Time-Series | | Manager | |
| | Database | +--------+-------+ |
| +-------+-------+ | |
| | v |
| v +--------+-------+ |
| +-------+-------+ | Slack/Email/ | |
| | Grafana | | PagerDuty | |
| | Dashboards | +----------------+ |
| +---------------+ |
+------------------------------------------------------------------+
11.2 Key Metrics
from datetime import datetime
from uuid import UUID

from pydantic import BaseModel


class PerformanceMetrics:
    """Metrics tracked for the face recognition system."""

    # Per-identity metrics (tracked over time)
    class IdentityMetrics(BaseModel):
        identity_id: UUID
        timestamp: datetime
        # Accuracy
        true_positives: int
        false_positives: int
        true_negatives: int
        false_negatives: int
        precision: float
        recall: float
        f1_score: float
        # Confidence
        avg_match_confidence: float
        confidence_std: float
        # Drift
        embedding_centroid_drift: float  # Distance from training centroid
        match_distance_trend: float      # Increasing = model struggling
        # Coverage
        detection_rate: float            # % of appearances detected
        unique_cameras_detected: int

    # Global metrics
    class GlobalMetrics(BaseModel):
        timestamp: datetime
        model_version: str
        # Overall accuracy
        overall_precision: float
        overall_recall: float
        overall_f1: float
        # Error rates
        false_positive_rate: float
        false_negative_rate: float
        false_discovery_rate: float
        # System health
        avg_inference_latency_ms: float
        p99_inference_latency_ms: float
        embedding_index_size: int
        active_identities: int
        # Training
        total_training_examples: int
        examples_last_24h: int
        pending_suggestions: int
        open_conflicts: int

    # Drift detection
    class DriftMetrics(BaseModel):
        timestamp: datetime
        model_version: str
        # Embedding drift
        avg_centroid_drift: float        # Average drift across all identities
        max_centroid_drift: float        # Worst-case drift
        drift_alert_threshold: float
        # Performance drift
        accuracy_trend: float            # Slope over last 7 days
        precision_trend: float
        recall_trend: float
        # Data distribution drift
        input_distribution_shift: float  # KL divergence from training distribution
        new_identity_rate: float         # % of detections that are new faces
        # Recommendations
        retrain_recommended: bool
        retrain_reason: str | None
        estimated_accuracy_if_retrained: float | None
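The per-identity accuracy fields above are derived values; a minimal sketch of how they could be computed from the raw confusion counts (the function name `compute_accuracy_fields` is illustrative, not part of the system):

```python
def compute_accuracy_fields(tp: int, fp: int, tn: int, fn: int) -> dict[str, float]:
    """Derive precision/recall/F1 from raw confusion counts, guarding zero division."""
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return {"precision": precision, "recall": recall, "f1_score": f1}
```

The zero-division guards matter in practice: a newly enrolled identity with no matches yet would otherwise crash the metrics collector.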
11.3 Drift Detection
from datetime import datetime


class ModelDriftDetector:
    """Detects when model performance is degrading over time."""

    DRIFT_THRESHOLDS = {
        "accuracy_drop_pct": 0.03,            # 3% drop triggers alert
        "latency_increase_pct": 0.20,         # 20% latency increase triggers alert
        "centroid_drift_threshold": 0.15,     # Average drift > 0.15 triggers alert
        "new_identity_rate_threshold": 0.3,   # >30% unknowns suggests model gap
        "consecutive_degraded_days": 3,       # 3 days of degradation triggers retrain
    }

    async def detect_drift(self) -> DriftReport:
        """Run drift detection and return a report with recommendations."""
        report = DriftReport(timestamp=datetime.utcnow())

        # 1. Compare recent accuracy vs. baseline
        recent_accuracy = await self._get_recent_accuracy(days=7)
        baseline_accuracy = await self._get_baseline_accuracy()
        accuracy_drop = baseline_accuracy - recent_accuracy
        if accuracy_drop > self.DRIFT_THRESHOLDS["accuracy_drop_pct"]:
            report.alerts.append(DriftAlert(
                type="accuracy_degradation",
                severity="high",
                message=f"Accuracy dropped {accuracy_drop*100:.1f}% from baseline",
                recent_value=recent_accuracy,
                baseline_value=baseline_accuracy
            ))

        # 2. Check embedding centroid drift
        avg_drift = await self._calculate_avg_centroid_drift(days=7)
        if avg_drift > self.DRIFT_THRESHOLDS["centroid_drift_threshold"]:
            report.alerts.append(DriftAlert(
                type="embedding_drift",
                severity="medium",
                message=f"Average centroid drift: {avg_drift:.3f}",
                recent_value=avg_drift,
                baseline_value=0.0
            ))

        # 3. Check latency trends
        recent_latency = await self._get_recent_p99_latency(days=7)
        baseline_latency = await self._get_baseline_latency()
        latency_increase = (recent_latency - baseline_latency) / baseline_latency
        if latency_increase > self.DRIFT_THRESHOLDS["latency_increase_pct"]:
            report.alerts.append(DriftAlert(
                type="latency_increase",
                severity="medium",
                message=f"P99 latency increased {latency_increase*100:.1f}%",
                recent_value=recent_latency,
                baseline_value=baseline_latency
            ))

        # 4. Check for consecutive degradation
        consecutive_degraded = await self._check_consecutive_degraded_days()
        if consecutive_degraded >= self.DRIFT_THRESHOLDS["consecutive_degraded_days"]:
            report.recommendations.append(RetrainRecommendation(
                trigger="consecutive_degradation",
                urgency="high",
                reason=f"{consecutive_degraded} consecutive days of performance degradation",
                estimated_benefit=f"Restore accuracy to ~{baseline_accuracy:.3f}"
            ))

        # Determine overall drift status
        report.is_drifting = len(report.alerts) > 0
        report.retrain_recommended = any(
            r.urgency == "high" for r in report.recommendations
        )
        return report
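`_calculate_avg_centroid_drift` is left abstract above. A minimal sketch of the per-identity computation it could aggregate, assuming drift is measured as cosine distance between the mean of recent embeddings and the stored training centroid (the function name is illustrative):

```python
import numpy as np


def centroid_drift(recent_embeddings: np.ndarray, training_centroid: np.ndarray) -> float:
    """Cosine distance between the mean of recent embeddings and the training centroid.

    recent_embeddings: shape (n, d) matrix of embeddings observed since training.
    training_centroid: shape (d,) centroid stored at training time.
    Returns 0.0 when the identity's appearance is stable, approaching 2.0 at worst.
    """
    current = recent_embeddings.mean(axis=0)
    cos = np.dot(current, training_centroid) / (
        np.linalg.norm(current) * np.linalg.norm(training_centroid)
    )
    return float(1.0 - cos)
```

Averaging this value across all identities yields the `avg_centroid_drift` compared against `centroid_drift_threshold` (0.15) above.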
11.4 Alert Configuration
class AlertConfiguration:
    """Alert rules for the training system."""

    ALERT_RULES = [
        # Performance alerts
        {
            "name": "accuracy_drop_critical",
            "condition": "overall_precision < 0.95",
            "severity": "critical",
            "channels": ["pagerduty", "slack"],
            "cooldown_minutes": 30
        },
        {
            "name": "accuracy_drop_warning",
            "condition": "overall_precision >= 0.95 AND overall_precision < 0.97",
            "severity": "warning",
            "channels": ["slack"],
            "cooldown_minutes": 60
        },
        {
            "name": "identity_accuracy_degradation",
            "condition": "any(identity.precision < 0.9 for identity in identities)",
            "severity": "warning",
            "channels": ["slack"],
            "cooldown_minutes": 120
        },
        {
            "name": "drift_detected",
            "condition": "drift_report.is_drifting == True",
            "severity": "warning",
            "channels": ["slack"],
            "cooldown_minutes": 360
        },
        {
            "name": "high_retrain_recommended",
            "condition": "drift_report.retrain_recommended == True",
            "severity": "info",
            "channels": ["slack", "email"],
            "cooldown_minutes": 1440
        },
        # Training queue alerts
        {
            "name": "training_queue_backlog",
            "condition": "pending_suggestions > 500",
            "severity": "warning",
            "channels": ["slack"],
            "cooldown_minutes": 120
        },
        {
            "name": "label_conflicts_open",
            "condition": "open_conflicts > 10",
            "severity": "warning",
            "channels": ["slack"],
            "cooldown_minutes": 240
        },
        {
            "name": "gpu_resource_exhausted",
            "condition": "gpu_queue_wait_time_minutes > 60",
            "severity": "warning",
            "channels": ["slack"],
            "cooldown_minutes": 60
        },
    ]
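Each rule carries a `cooldown_minutes` field; a minimal sketch of how the alerting manager might enforce it per rule name (the `AlertDispatcher` class is illustrative, not part of the spec):

```python
from datetime import datetime, timedelta


class AlertDispatcher:
    """Fires an alert at most once per cooldown window, keyed by rule name."""

    def __init__(self) -> None:
        self._last_fired: dict[str, datetime] = {}

    def should_fire(self, rule: dict, now: datetime) -> bool:
        """Return True and record the firing time unless the rule is cooling down."""
        last = self._last_fired.get(rule["name"])
        cooldown = timedelta(minutes=rule["cooldown_minutes"])
        if last is not None and now - last < cooldown:
            return False  # suppressed: still inside the cooldown window
        self._last_fired[rule["name"]] = now
        return True
```

Suppression is per rule, so a critical precision drop and a queue backlog can still alert in the same window.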
12. UI for Training Management
12.1 Training Dashboard Layout
+------------------------------------------------------------------+
| [Logo] Surveillance AI Training Center [User: Admin] |
+------------------------------------------------------------------+
| |
| +------------------+ +------------------+ +------------------+ |
| | Training Status | | Suggestions | | Model Health | |
| | | | Pending Review | | | |
| | [=====> ] 67% | | | | Precision: 98.4% | |
| | Job: #1247 | | 42 suggestions | | Recall: 96.7% | |
| | ETA: 12 min | | from 8 identities| | F1: 97.5% | |
| | | | | | | |
| | [Pause] [Cancel] | | [Review All] | | [View Details] | |
| +------------------+ +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ +------------------+ |
| | Recent Activity | | Conflicts | | Model Version | |
| | | | | | | |
| | + Identity | | 3 open conflicts | | Current: v1.2.0 | |
| | 'John Smith' | | (needs review) | | Staging: v1.3.0 | |
| | +12 examples | | | | | |
| | | | [View Conflicts] | | [View History] | |
| | + 2 corrections | | | | [Trigger Train] | |
| | processed | | | | | |
| +------------------+ +------------------+ +------------------+ |
| |
+------------------------------------------------------------------+
| [Dashboard] [Suggestions] [Conflicts] [Models] [Datasets] [Logs] |
+------------------------------------------------------------------+
12.2 Suggestion Review Interface
+------------------------------------------------------------------+
| Suggestions > Identity: John Smith [Learning Mode: Suggested] |
+------------------------------------------------------------------+
| |
| Identity: John Smith (#uuid-1234) |
| Current examples: 23 positive, 8 negative |
| Estimated accuracy impact: +1.2% |
| |
| +------------------+ +------------------+ +------------------+ |
| | [Thumbnail 1] | | [Thumbnail 2] | | [Thumbnail 3] | |
| | Confidence: 0.97 | | Confidence: 0.94 | | Confidence: 0.91 | |
| | Camera: Cam-01 | | Camera: Cam-03 | | Camera: Cam-01 | |
| | 2024-01-20 14:32 | | 2024-01-21 09:15 | | 2024-01-22 16:45 | |
| | | | | | | |
| | [x] Approve | | [x] Approve | | [ ] Approve | |
| | [ ] Reject | | [ ] Reject | | [x] Reject | |
| | [ ] Correct... | | [ ] Correct... | | [ ] Correct... | |
| +------------------+ +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ +------------------+ |
| | [Thumbnail 4] | | [Thumbnail 5] | | [Thumbnail 6] | |
| | Confidence: 0.89 | | Confidence: 0.87 | | Confidence: 0.85 | |
| | | | | | | |
| | [x] Approve | | [ ] Approve | | [ ] Approve | |
| | [ ] Reject | | [ ] Reject | | [ ] Reject | |
| | [ ] Correct... | | [ ] Correct... | | [ ] Correct... | |
| +------------------+ +------------------+ +------------------+ |
| |
| [Select All] [Select None] [Approve Selected] [Reject Selected] |
| |
| Conflict Risk: LOW | Diversity Score: 0.73 | Quality: GOOD |
+------------------------------------------------------------------+
12.3 Conflict Resolution Interface
+------------------------------------------------------------------+
| Conflicts > #conf-456 (SAME_FACE_DIFF_ID) |
+------------------------------------------------------------------+
| |
| ALERT: Same face assigned to different identities |
| Face Detection: #det-789 | Quality: 0.94 |
| |
| +--------------------+ +--------------------+ |
| | [Large Face Image] | | Assignment History | |
| | | | | |
| | | | + 2024-01-15 | |
| | | | By: user_a | |
| | High-res face | | Assigned to: | |
| | bounding box | | "John Smith" | |
| | with landmarks | | [View Event] | |
| | | | | |
| | | | + 2024-01-22 | |
| | | | By: user_b | |
| | | | Corrected to: | |
| | | | "Bob Johnson" | |
| | | | [View Event] | |
| +--------------------+ +--------------------+ |
| |
| Resolution: |
| ( ) This face is "John Smith" - keep first assignment |
| ( ) This face is "Bob Johnson" - accept the correction |
| ( ) This is a different person - create new identity |
| ( ) Cannot determine - escalate to senior reviewer |
| |
| [Submit Resolution] [View Similar Faces] [Escalate] |
+------------------------------------------------------------------+
12.4 Model Version History
+------------------------------------------------------------------+
| Models > Version History |
+------------------------------------------------------------------+
| |
| [Filter: All] [Sort: Newest] [Compare] [Export Report] |
| |
| +------+----------+--------+----------+----------+---------------+ |
| | Ver | Date | Status | Precision| Recall | Actions | |
| +------+----------+--------+----------+----------+---------------+ |
| | 1.3.0| Jan 25 | Staging| 98.4% | 96.8% | [View][Deploy][A/B]| |
| | 1.2.1| Jan 23 | Prod | 98.2% | 96.7% | [View][Rollback] | |
| | 1.2.0| Jan 20 | Arch | 97.8% | 96.1% | [View][Restore] | |
| | 1.1.0| Jan 15 | Arch | 97.5% | 95.8% | [View] | |
| | 1.0.0| Jan 10 | Arch | 96.2% | 94.1% | [View] | |
| +------+----------+--------+----------+----------+---------------+ |
| |
| Version 1.3.0 Details: |
| - Training data: dataset-v1.3.0 (1,245 examples, 89 identities) |
| - Training strategy: Fine-tune (from v1.2.0 checkpoint) |
| - Training time: 45 minutes |
| - GPU used: NVIDIA V100 x 2 |
| - New identities added: 12 |
| - Quality gates: ALL PASSED |
| |
| [Download Model] [Download Dataset] [View Training Log] |
+------------------------------------------------------------------+
12.5 Training Trigger UI
+------------------------------------------------------------------+
| Manual Training Trigger |
+------------------------------------------------------------------+
| |
| Training Mode: |
| ( ) Incremental Update - Fast, update embeddings only |
| (*) Fine-tuning - Moderate, update model weights |
| ( ) Full Retraining - Slow, retrain from scratch |
| |
| Data Selection: |
| [x] Include all approved examples since last training |
| [x] Include new identities (5 new since last train) |
| [ ] Force retrain on all identities |
| [ ] Use custom dataset version: [________] [Browse] |
| |
| Advanced Options: |
| [ ] Override quality gates (admin only) |
| [ ] Auto-deploy on success |
| [ ] Enable A/B testing: [__10__]% traffic to new model |
| |
| Estimated: |
| - Training time: ~45 minutes |
| - GPU requirement: V100 x 2 |
| - Dataset size: ~1,245 examples |
| |
| [Preview Changes] [Schedule for Later] [START TRAINING NOW] |
+------------------------------------------------------------------+
13. Database Schema
13.1 Core Tables
-- ============================================================
-- TRAINING EVENTS
-- Captures every user action that could generate training data
-- ============================================================
CREATE TABLE training_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type VARCHAR(32) NOT NULL, -- IDENTITY_NAMED, MATCH_CONFIRMED, etc.
status VARCHAR(16) NOT NULL DEFAULT 'pending',
-- pending, approved, rejected, conflicted, trained
-- Source detection reference
detection_id UUID NOT NULL,
face_image_path TEXT NOT NULL,
full_frame_path TEXT,
-- Identity assignment
identity_id UUID NOT NULL,
previous_identity_id UUID, -- For corrections: old identity
is_unknown BOOLEAN NOT NULL DEFAULT FALSE,
-- Embedding (at time of event)
embedding_vector VECTOR(512), -- pgvector extension
embedding_model_version VARCHAR(32) NOT NULL,
-- Quality metrics
face_quality_score FLOAT NOT NULL,
face_size_px INTEGER NOT NULL,
detection_confidence FLOAT NOT NULL,
pose_yaw FLOAT,
pose_pitch FLOAT,
pose_roll FLOAT,
-- Metadata
source_camera_id VARCHAR(64),
captured_at TIMESTAMP NOT NULL,
reviewed_by_user_id UUID NOT NULL,
review_timestamp TIMESTAMP NOT NULL,
review_method VARCHAR(32) NOT NULL DEFAULT 'manual_click',
-- Training lifecycle
suggestion_id UUID, -- Links to training suggestion
conflict_id UUID, -- Links to label conflict
approved_at TIMESTAMP,
trained_at TIMESTAMP,
model_version_trained VARCHAR(32),
-- Timestamps
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
-- Indexes
CONSTRAINT valid_status CHECK (status IN (
'pending', 'approved', 'rejected', 'conflicted', 'trained'
)),
CONSTRAINT valid_event_type CHECK (event_type IN (
'IDENTITY_NAMED', 'MATCH_CONFIRMED', 'MATCH_CORRECTED',
'IDENTITY_MERGED', 'MATCH_REJECTED'
))
);
CREATE INDEX idx_training_events_identity ON training_events(identity_id, status);
CREATE INDEX idx_training_events_detection ON training_events(detection_id);
CREATE INDEX idx_training_events_status ON training_events(status) WHERE status = 'pending';
CREATE INDEX idx_training_events_suggestion ON training_events(suggestion_id);
CREATE INDEX idx_training_events_conflict ON training_events(conflict_id);
CREATE INDEX idx_training_events_created ON training_events(created_at);
-- HNSW index for vector similarity search on embeddings
CREATE INDEX idx_training_events_embedding ON training_events
USING hnsw (embedding_vector vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- ============================================================
-- LABEL CONFLICTS
-- Tracks detected label conflicts requiring resolution
-- ============================================================
CREATE TABLE label_conflicts (
conflict_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
conflict_type VARCHAR(32) NOT NULL, -- SAME_FACE_DIFF_ID, TRANSITIVE_CHAIN, etc.
severity VARCHAR(16) NOT NULL, -- LOW, MEDIUM, HIGH, CRITICAL
status VARCHAR(16) NOT NULL DEFAULT 'open',
-- open, under_review, resolved, escalated
detection_id UUID NOT NULL,
face_image_path TEXT NOT NULL,
conflicting_labels JSONB NOT NULL, -- Array of {event_id, identity_id, user_id}
-- Resolution
resolution VARCHAR(16), -- label_selected, both_valid, escalated
resolved_by_user_id UUID,
winning_identity_id UUID,
resolution_notes TEXT,
blocks_training BOOLEAN NOT NULL DEFAULT TRUE,
detected_at TIMESTAMP NOT NULL DEFAULT NOW(),
resolved_at TIMESTAMP,
CONSTRAINT valid_severity CHECK (severity IN ('LOW', 'MEDIUM', 'HIGH', 'CRITICAL')),
CONSTRAINT valid_status CHECK (status IN ('open', 'under_review', 'resolved', 'escalated'))
);
CREATE INDEX idx_label_conflicts_status ON label_conflicts(status);
CREATE INDEX idx_label_conflicts_detection ON label_conflicts(detection_id);
-- ============================================================
-- TRAINING SUGGESTIONS
-- Aggregated suggestions for admin review
-- ============================================================
CREATE TABLE training_suggestions (
suggestion_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
identity_id UUID NOT NULL,
identity_name VARCHAR(256) NOT NULL,
status VARCHAR(16) NOT NULL DEFAULT 'pending',
-- pending, approved, rejected, corrected
-- Content summary
new_positive_count INTEGER NOT NULL DEFAULT 0,
corrected_count INTEGER NOT NULL DEFAULT 0,
merged_count INTEGER NOT NULL DEFAULT 0,
-- Impact assessment
estimated_accuracy_change FLOAT, -- Predicted accuracy delta
conflict_risk_score FLOAT NOT NULL, -- 0-1, higher = more risk
diversity_score FLOAT, -- 0-1, camera/time diversity
-- Current state
existing_positive_count INTEGER NOT NULL,
existing_negative_count INTEGER NOT NULL,
-- Review
reviewed_by UUID,
reviewed_at TIMESTAMP,
review_notes TEXT,
auto_approve_eligible BOOLEAN DEFAULT FALSE,
generated_at TIMESTAMP NOT NULL DEFAULT NOW(),
expires_at TIMESTAMP, -- Suggestions expire after 7 days
CONSTRAINT valid_status CHECK (status IN ('pending', 'approved', 'rejected', 'corrected'))
);
CREATE INDEX idx_training_suggestions_status ON training_suggestions(status);
CREATE INDEX idx_training_suggestions_identity ON training_suggestions(identity_id);
-- ============================================================
-- MODEL VERSIONS
-- Registry of all trained model versions
-- ============================================================
CREATE TABLE model_versions (
version_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
version_string VARCHAR(16) NOT NULL UNIQUE, -- Semantic: 1.2.3
model_name VARCHAR(64) NOT NULL DEFAULT 'face-recognition',
model_type VARCHAR(32) NOT NULL, -- arcface, facenet, custom
architecture VARCHAR(32) NOT NULL, -- iresnet100, mobilenet, etc.
embedding_dimension INTEGER NOT NULL DEFAULT 512,
-- Training lineage
training_job_id UUID NOT NULL,
training_dataset_version VARCHAR(32) NOT NULL,
base_model_version VARCHAR(16), -- Previous version (for fine-tuning)
training_strategy VARCHAR(32) NOT NULL, -- incremental, fine_tune, full_retrain
-- Artifacts
model_artifact_path TEXT NOT NULL,
checkpoint_path TEXT,
onnx_export_path TEXT,
tensorrt_path TEXT,
-- Performance (from quality gates)
test_precision FLOAT,
test_recall FLOAT,
test_f1_score FLOAT,
test_false_positive_rate FLOAT,
test_false_negative_rate FLOAT,
test_latency_p50_ms FLOAT,
test_latency_p99_ms FLOAT,
-- Deployment state
stage VARCHAR(16) NOT NULL DEFAULT 'staging',
-- staging, production, archived
deployment_date TIMESTAMP,
traffic_percentage FLOAT NOT NULL DEFAULT 0.0,
-- Rollback
can_rollback BOOLEAN NOT NULL DEFAULT TRUE,
rollback_to_version VARCHAR(16),
-- Audit
created_by UUID NOT NULL,
approved_by UUID,
approval_notes TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
CONSTRAINT valid_stage CHECK (stage IN ('staging', 'production', 'archived'))
);
CREATE INDEX idx_model_versions_stage ON model_versions(stage);
CREATE INDEX idx_model_versions_string ON model_versions(version_string);
-- ============================================================
-- TRAINING JOBS
-- Track all training job executions
-- ============================================================
CREATE TABLE training_jobs (
job_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
trigger_type VARCHAR(32) NOT NULL, -- scheduled, threshold, manual, degradation
status VARCHAR(32) NOT NULL DEFAULT 'queued',
-- See state machine below
-- Configuration
training_mode VARCHAR(32) NOT NULL, -- incremental, fine_tune, full_retrain
dataset_version VARCHAR(32),
gpu_type VARCHAR(16),
gpu_count INTEGER NOT NULL DEFAULT 1,
auto_deploy BOOLEAN DEFAULT FALSE,
override_quality_gates BOOLEAN DEFAULT FALSE,
-- Progress tracking
current_phase VARCHAR(32),
progress_percent FLOAT DEFAULT 0.0,
current_step TEXT,
-- Results
model_version_id UUID REFERENCES model_versions(version_id),
quality_gate_result JSONB,
training_metrics JSONB,
error_message TEXT,
-- Timing
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
started_at TIMESTAMP,
completed_at TIMESTAMP,
duration_seconds INTEGER,
-- Audit
created_by UUID NOT NULL,
CONSTRAINT valid_trigger CHECK (trigger_type IN (
'scheduled', 'threshold', 'manual', 'degradation', 'new_identity'
)),
CONSTRAINT valid_status CHECK (status IN (
'queued', 'acquiring_resources', 'building_dataset', 'determining_strategy',
'incremental_update', 'fine_tuning', 'full_training', 'quality_check',
'quality_failed', 'registering', 'deploying', 'completed',
'awaiting_deployment_approval', 'failed', 'cancelled'
))
);
CREATE INDEX idx_training_jobs_status ON training_jobs(status);
CREATE INDEX idx_training_jobs_created ON training_jobs(created_at);
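The CHECK constraint above enumerates the job statuses but cannot express which transitions are legal; that is enforced in application code. A sketch of a transition validator (the exact transition graph is an assumption consistent with the status list, not a spec):

```python
# Allowed forward transitions for training_jobs.status. The graph below is an
# illustrative assumption derived from the status names in the CHECK constraint.
JOB_TRANSITIONS: dict[str, set[str]] = {
    "queued": {"acquiring_resources", "cancelled"},
    "acquiring_resources": {"building_dataset", "failed", "cancelled"},
    "building_dataset": {"determining_strategy", "failed"},
    "determining_strategy": {"incremental_update", "fine_tuning", "full_training"},
    "incremental_update": {"quality_check", "failed"},
    "fine_tuning": {"quality_check", "failed"},
    "full_training": {"quality_check", "failed"},
    "quality_check": {"registering", "quality_failed"},
    "quality_failed": {"failed"},
    "registering": {"deploying", "awaiting_deployment_approval", "completed"},
    "awaiting_deployment_approval": {"deploying", "cancelled"},
    "deploying": {"completed", "failed"},
}


def can_transition(current: str, new: str) -> bool:
    """Reject status updates that would skip or reverse the job lifecycle."""
    return new in JOB_TRANSITIONS.get(current, set())
```

Terminal states (`completed`, `failed`, `cancelled`) have no outgoing edges, so any update from them is rejected.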
-- ============================================================
-- DATASET VERSIONS
-- Track dataset snapshots used for training
-- ============================================================
CREATE TABLE dataset_versions (
version_id VARCHAR(32) PRIMARY KEY, -- Semantic: "dataset-v1.2.3"
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
created_by UUID NOT NULL,
total_identities INTEGER NOT NULL,
total_positive_examples INTEGER NOT NULL,
total_negative_examples INTEGER NOT NULL,
avg_face_quality_score FLOAT,
avg_embedding_quality FLOAT,
conflict_count INTEGER NOT NULL DEFAULT 0,
-- Storage
manifest_path TEXT NOT NULL,
images_archive_path TEXT,
embeddings_path TEXT,
-- Lineage
parent_version VARCHAR(32) REFERENCES dataset_versions(version_id),
changelog TEXT,
-- Retention
retention_until TIMESTAMP NOT NULL
);
-- ============================================================
-- PERFORMANCE METRICS (Time-series, optional: use TimescaleDB)
-- ============================================================
CREATE TABLE identity_performance_metrics (
metric_id BIGSERIAL PRIMARY KEY,
identity_id UUID NOT NULL,
model_version VARCHAR(16) NOT NULL,
timestamp TIMESTAMP NOT NULL DEFAULT NOW(),
true_positives INTEGER NOT NULL DEFAULT 0,
false_positives INTEGER NOT NULL DEFAULT 0,
true_negatives INTEGER NOT NULL DEFAULT 0,
false_negatives INTEGER NOT NULL DEFAULT 0,
"precision" FLOAT, -- quoted: PRECISION is a reserved word in PostgreSQL
recall FLOAT,
f1_score FLOAT,
avg_match_confidence FLOAT,
embedding_centroid_drift FLOAT,
detection_rate FLOAT,
CONSTRAINT positive_metrics CHECK (
true_positives >= 0 AND false_positives >= 0 AND
true_negatives >= 0 AND false_negatives >= 0
)
);
-- If the TimescaleDB extension is installed, convert to a hypertable
SELECT create_hypertable('identity_performance_metrics', 'timestamp');
CREATE INDEX idx_identity_perf_identity ON identity_performance_metrics(identity_id, timestamp);
CREATE INDEX idx_identity_perf_model ON identity_performance_metrics(model_version, timestamp);
-- ============================================================
-- AUDIT LOG
-- All administrative actions on training system
-- ============================================================
CREATE TABLE training_audit_log (
audit_id BIGSERIAL PRIMARY KEY,
action VARCHAR(64) NOT NULL,
actor_id UUID NOT NULL,
actor_role VARCHAR(32) NOT NULL,
target_type VARCHAR(32) NOT NULL, -- model, dataset, event, conflict, suggestion
target_id UUID NOT NULL,
details JSONB,
ip_address INET,
timestamp TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_audit_log_target ON training_audit_log(target_type, target_id);
CREATE INDEX idx_audit_log_actor ON training_audit_log(actor_id, timestamp);
CREATE INDEX idx_audit_log_timestamp ON training_audit_log(timestamp);
14. API Specification
14.1 Training Events API
openapi: 3.0.0
paths:
/api/v1/training/events:
post:
summary: Submit a training event
requestBody:
content:
application/json:
schema:
type: object
required: [event_type, detection_id, identity_id, reviewed_by_user_id]
properties:
event_type:
type: string
enum: [IDENTITY_NAMED, MATCH_CONFIRMED, MATCH_CORRECTED, IDENTITY_MERGED, MATCH_REJECTED]
detection_id:
type: string
format: uuid
identity_id:
type: string
format: uuid
previous_identity_id:
type: string
format: uuid
reviewed_by_user_id:
type: string
format: uuid
review_method:
type: string
enum: [manual_click, bulk_approve, auto, api]
responses:
201:
description: Event created
content:
application/json:
schema:
$ref: '#/components/schemas/TrainingEvent'
409:
description: Conflict detected - requires resolution
/api/v1/training/events/bulk:
post:
summary: Submit multiple training events in bulk
requestBody:
content:
application/json:
schema:
type: object
properties:
events:
type: array
items:
$ref: '#/components/schemas/SingleEvent'
auto_approve:
type: boolean
responses:
201:
description: Events processed
content:
application/json:
schema:
type: object
properties:
accepted:
type: integer
rejected:
type: integer
conflicts:
type: integer
event_ids:
type: array
items:
type: string
format: uuid
/api/v1/training/suggestions:
get:
summary: List pending training suggestions
parameters:
- name: status
in: query
schema:
type: string
enum: [pending, approved, rejected, all]
- name: identity_id
in: query
schema:
type: string
format: uuid
- name: limit
in: query
schema:
type: integer
default: 50
- name: offset
in: query
schema:
type: integer
default: 0
responses:
200:
description: List of suggestions
/api/v1/training/suggestions/{id}/approve:
post:
summary: Approve a training suggestion
requestBody:
content:
application/json:
schema:
type: object
properties:
notes:
type: string
responses:
200:
description: Suggestion approved
/api/v1/training/suggestions/{id}/reject:
post:
summary: Reject a training suggestion
requestBody:
content:
application/json:
schema:
type: object
required: [reason]
properties:
reason:
type: string
responses:
200:
description: Suggestion rejected
/api/v1/training/suggestions/bulk-approve:
post:
summary: Bulk approve multiple suggestions
requestBody:
content:
application/json:
schema:
type: object
required: [suggestion_ids]
properties:
suggestion_ids:
type: array
items:
type: string
format: uuid
responses:
200:
description: Suggestions approved
/api/v1/training/jobs:
post:
summary: Trigger a new training job
requestBody:
content:
application/json:
schema:
type: object
properties:
training_mode:
type: string
enum: [incremental, fine_tune, full_retrain]
dataset_version:
type: string
auto_deploy:
type: boolean
ab_test_config:
type: object
responses:
201:
description: Training job created
get:
summary: List training jobs
parameters:
- name: status
in: query
schema:
type: string
- name: limit
in: query
schema:
type: integer
default: 20
responses:
200:
description: List of training jobs
/api/v1/training/jobs/{id}:
get:
summary: Get training job details and status
responses:
200:
description: Job details
delete:
summary: Cancel a running training job
responses:
200:
description: Job cancelled
/api/v1/models:
get:
summary: List model versions
responses:
200:
description: List of models
/api/v1/models/{version}/rollback:
post:
summary: Rollback to a specific model version
requestBody:
content:
application/json:
schema:
type: object
required: [reason]
properties:
reason:
type: string
responses:
200:
description: Rollback initiated
/api/v1/models/{version}/ab-test:
post:
summary: Start A/B test for a model version
requestBody:
content:
application/json:
schema:
type: object
properties:
traffic_split:
type: object
properties:
current:
type: number
new:
type: number
duration_hours:
type: integer
responses:
201:
description: A/B test started
/api/v1/metrics/drift:
get:
summary: Get drift detection report
responses:
200:
description: Drift report
/api/v1/metrics/performance:
get:
summary: Get model performance metrics
parameters:
- name: model_version
in: query
schema:
type: string
- name: identity_id
in: query
schema:
type: string
format: uuid
- name: from
in: query
schema:
type: string
format: date-time
- name: to
in: query
schema:
type: string
format: date-time
responses:
200:
description: Performance metrics
/api/v1/conflicts:
get:
summary: List label conflicts
parameters:
- name: status
in: query
schema:
type: string
enum: [open, under_review, resolved, all]
responses:
200:
description: List of conflicts
/api/v1/conflicts/{id}/resolve:
post:
summary: Resolve a label conflict
requestBody:
content:
application/json:
schema:
type: object
required: [resolution, winning_identity_id]
properties:
resolution:
type: string
enum: [label_selected, both_valid, escalated]
winning_identity_id:
type: string
format: uuid
notes:
type: string
responses:
200:
description: Conflict resolved
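Server-side validation of `POST /api/v1/training/events` follows directly from the schema above (required fields, `event_type` enum, UUID formats). A minimal sketch, assuming a plain-dict payload; the function name is illustrative:

```python
from uuid import UUID

EVENT_TYPES = {
    "IDENTITY_NAMED", "MATCH_CONFIRMED", "MATCH_CORRECTED",
    "IDENTITY_MERGED", "MATCH_REJECTED",
}
REQUIRED_FIELDS = ("event_type", "detection_id", "identity_id", "reviewed_by_user_id")


def validate_event_payload(payload: dict) -> list[str]:
    """Return a list of validation errors; an empty list means the payload is acceptable."""
    errors = [f"missing field: {f}" for f in REQUIRED_FIELDS if f not in payload]
    if "event_type" in payload and payload["event_type"] not in EVENT_TYPES:
        errors.append(f"invalid event_type: {payload['event_type']}")
    for field in ("detection_id", "identity_id", "reviewed_by_user_id"):
        if field in payload:
            try:
                UUID(str(payload[field]))
            except ValueError:
                errors.append(f"{field} is not a valid UUID")
    return errors
```

A non-empty error list would map to a 422 response; the documented 409 is reserved for payloads that validate but trigger the conflict detector.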
14.2 WebSocket Events
TrainingWebSocket:
/ws/training:
events:
job_status_update:
description: Real-time training job progress
payload:
job_id: uuid
status: string
progress_percent: float
current_step: string
eta_seconds: int
suggestion_created:
description: New training suggestion available
payload:
suggestion_id: uuid
identity_id: uuid
identity_name: string
new_example_count: int
conflict_detected:
description: New label conflict detected
payload:
conflict_id: uuid
conflict_type: string
severity: string
detection_id: uuid
quality_gate_result:
description: Quality gate completed for training job
payload:
job_id: uuid
model_version: string
passed: boolean
gate_results: object
model_deployed:
description: New model deployed to production
payload:
version: string
previous_version: string
deployment_method: string # auto, manual, ab_test_promoted
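A dashboard client consuming `/ws/training` needs to route each message type above to the right UI handler. A minimal sketch of such a router, assuming messages arrive as `{"event": <name>, "payload": {...}}` (the message envelope and class name are assumptions, not part of the spec):

```python
from typing import Any, Callable


class TrainingEventRouter:
    """Route incoming /ws/training messages to registered handlers by event name."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[[dict], Any]]] = {}

    def on(self, event: str, handler: Callable[[dict], Any]) -> None:
        """Register a handler for one of the event names (e.g. job_status_update)."""
        self._handlers.setdefault(event, []).append(handler)

    def dispatch(self, message: dict) -> int:
        """Invoke all handlers for the message's event; return how many ran."""
        handlers = self._handlers.get(message.get("event", ""), [])
        for handler in handlers:
            handler(message.get("payload", {}))
        return len(handlers)
```

Unknown event names dispatch to zero handlers rather than raising, so the client tolerates new server-side event types.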
15. Implementation Roadmap
15.1 Phase 1: Foundation (Weeks 1-4)
| Week | Deliverable | Description |
|------|-------------|-------------|
| 1 | Database schema | Create all core tables, indexes, constraints |
| 1 | Event collection API | Submit and bulk-submit training events |
| 2 | Quality filter | Face quality assessment pipeline |
| 2 | Conflict detector | Automatic conflict detection on event submission |
| 3 | Conflict UI | Conflict review and resolution interface |
| 3 | Suggestion engine | Aggregate events into reviewable suggestions |
| 4 | Suggestion UI | Admin review interface for suggestions |
| 4 | Learning mode config | Support manual-only and suggested modes |
15.2 Phase 2: Training Pipeline (Weeks 5-8)
| Week | Deliverable | Description |
|------|-------------|-------------|
| 5 | Dataset builder | Build training datasets from approved events |
| 5 | Dataset versioning | Snapshot and version datasets |
| 6 | Training orchestrator | Job creation, queuing, GPU allocation |
| 6 | Incremental updater | Centroid-based embedding updates |
| 7 | Full retrain pipeline | Complete model retraining workflow |
| 7 | Fine-tuning pipeline | Fine-tune from existing checkpoint |
| 8 | Quality gates | All 5 gate checks implemented |
| 8 | Model registry | Version registration and artifact storage |
15.3 Phase 3: Deployment & Safety (Weeks 9-11)
| Week | Deliverable | Description |
|------|-------------|-------------|
| 9 | Model deployment | Atomic model switching |
| 9 | Rollback system | One-click rollback to previous version |
| 10 | A/B testing | Traffic splitting and comparison |
| 10 | Auto-deploy config | Configurable auto-deployment rules |
| 11 | Embedding migration | Safe embedding index updates |
| 11 | Consistency checker | Post-update verification |
15.4 Phase 4: Monitoring & Polish (Weeks 12-14)
| Week | Deliverable | Description |
|------|-------------|-------------|
| 12 | Performance metrics | Per-identity and global metrics collection |
| 12 | Drift detector | Automatic drift detection and alerts |
| 13 | Alerting system | Slack/email/PagerDuty integration |
| 13 | Training dashboard | Full admin dashboard |
| 14 | Dataset export | Multiple export formats |
| 14 | Retention policy | Automated data lifecycle management |
| 14 | Documentation | Full API docs and runbooks |
Appendix A: Key Design Decisions
| Decision | Choice | Rationale |
|----------|--------|-----------|
| Embedding store | FAISS + pgvector | Fast ANN search with relational metadata |
| Model training | PyTorch Lightning | Structured training with built-in checkpointing |
| Model registry | MLflow | Industry standard, supports versioning and lineage |
| GPU scheduling | Kubernetes + KEDA | Auto-scaling based on queue depth |
| Orchestration | Apache Airflow | Robust DAG-based pipeline execution |
| Conflicts | Hard block on detection | Never train on conflicting data |
| Rollback | Dual-index strategy | Zero-downtime atomic switching |
| Updates | Centroid-first | 95% of updates are incremental; full retrain only when needed |
Appendix B: Failure Mode Handling
| Failure | Detection | Mitigation |
|---------|-----------|------------|
| Training job fails | Job status = FAILED, alert sent | Auto-cleanup, no model deployed |
| Quality gate fails | Gate report = FAILED | Model held, admin notified, error logged |
| GPU OOM | Worker crash, timeout | Job retried with smaller batch size |
| Conflict detected | Synchronous on event submission | Training blocked, conflict ticket created |
| Drift detected | Periodic drift analysis | Retrain recommended, alert sent |
| Model deployment fails | Health check failure | Automatic rollback to previous version |
| Embedding index corruption | Consistency checker | Rebuild from dataset version |
| Database connection lost | Connection timeout | Retry with exponential backoff |
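The database mitigation above calls for retry with exponential backoff; a minimal sketch (function name and default parameters are illustrative, and the injectable `sleep` exists so the delay schedule can be tested):

```python
import time


def retry_with_backoff(fn, max_attempts: int = 5, base_delay: float = 0.5,
                       sleep=time.sleep):
    """Call fn(), retrying on any exception with delays of base_delay * 2**attempt.

    Re-raises the last exception once max_attempts is exhausted.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))
```

A production version would also cap the maximum delay and add jitter so reconnecting workers do not retry in lockstep.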
Appendix C: Configuration Reference
# training_config.yaml
training:
learning_mode: "suggested" # manual, suggested, approved_auto
min_positive_examples: 5
max_positive_examples: 500
negative_positive_ratio: 0.5
centroid_update:
alpha: 0.3 # Centroid adaptation rate
auto_trigger_threshold: 10 # New examples to trigger incremental update
scheduled_training:
daily_time: "02:00"
weekly_day: "sunday"
enabled: true
thresholds:
full_retrain_examples: 1000
full_retrain_identities: 50
full_retrain_model_age_days: 30
drift_retrain_accuracy_drop: 0.03
consecutive_degraded_days: 3
quality_gates:
min_precision: 0.97
min_recall: 0.95
min_f1_score: 0.96
max_regression_pct: 0.02
max_latency_p99_ms: 150
known_identity:
require_perfect_accuracy: true
test_all_identities: true
gpu:
pool:
v100_count: 4
a100_count: 2
t4_count: 8
auto_scale:
enabled: true
max_hourly_cost: 50.0
max_queue_depth: 20
job_timeouts:
incremental_minutes: 30
fine_tune_minutes: 120
full_retrain_minutes: 480
embedding_index:
type: "faiss"
metric: "cosine"
nlist: 100 # IVF clusters
nprobe: 10 # Clusters to search
hnsw_m: 16 # HNSW connections
ef_construction: 64
ef_search: 32
retention:
rejected_events_days: 30
approved_events_years: 2
dataset_versions_keep: 10
training_images_years: 2
model_versions_keep: 20
monitoring:
metrics_interval_minutes: 5
drift_check_interval_hours: 6
alert_channels:
- type: "slack"
webhook_url: "${SLACK_WEBHOOK}"
- type: "pagerduty"
service_key: "${PAGERDUTY_KEY}"