veza/veza-stream-server/docs/STREAM_PROCESSING_THREAD.md
okinrev b7955a680c P0: stabilisation backend/chat/stream + nouvelle base migrations v1
Backend Go:
- Remplacement complet des anciennes migrations par la base V1 alignée sur ORIGIN.
- Durcissement global du parsing JSON (BindAndValidateJSON + RespondWithAppError).
- Sécurisation de config.go, CORS, statuts de santé et monitoring.
- Implémentation des transactions P0 (RBAC, duplication de playlists, social toggles).
- Ajout d’un job worker structuré (emails, analytics, thumbnails) + tests associés.
- Nouvelle doc backend : AUDIT_CONFIG, BACKEND_CONFIG, AUTH_PASSWORD_RESET, JOB_WORKER_*.

Chat server (Rust):
- Refonte du pipeline JWT + sécurité, audit et rate limiting avancé.
- Implémentation complète du cycle de message (read receipts, delivered, edit/delete, typing).
- Nettoyage des panics, gestion d’erreurs robuste, logs structurés.
- Migrations chat alignées sur le schéma UUID et nouvelles features.

Stream server (Rust):
- Refonte du moteur de streaming (encoding pipeline + HLS) et des modules core.
- Transactions P0 pour les jobs et segments, garanties d’atomicité.
- Documentation détaillée de la pipeline (AUDIT_STREAM_*, DESIGN_STREAM_PIPELINE, TRANSACTIONS_P0_IMPLEMENTATION).

Documentation & audits:
- TRIAGE.md et AUDIT_STABILITY.md à jour avec l’état réel des 3 services.
- Cartographie complète des migrations et des transactions (DB_MIGRATIONS_*, DB_TRANSACTION_PLAN, AUDIT_DB_TRANSACTIONS, TRANSACTION_TESTS_PHASE3).
- Scripts de reset et de cleanup pour la lab DB et la V1.

Ce commit fige l’ensemble du travail de stabilisation P0 (UUID, backend, chat et stream) avant les phases suivantes (Coherence Guardian, WS hardening, etc.).
2025-12-06 11:14:38 +01:00

15 KiB

🎬 Stream Processing Thread - Documentation Complète

Date: 2025-12-05
Service: veza-stream-server
Version: 1.0.0
Statut: Implémenté


📋 Table des Matières

  1. Vue d'ensemble
  2. Architecture
  3. Cycle de Vie
  4. Modules
  5. Format des Logs FFmpeg
  6. Mapping Timestamp → Segments
  7. ABR Support
  8. API Status
  9. Troubleshooting
  10. Checklist Production

🎯 Vue d'ensemble

Le Stream Processing Thread est le module central qui gère le traitement temps réel des jobs d'encodage audio via FFmpeg. Il orchestre:

  • Spawn et monitoring du processus FFmpeg
  • Détection en temps réel des segments HLS générés
  • Persistance DB incrémentale des segments
  • Gestion des erreurs et timeouts
  • Callbacks de fin (success, error, timeout)
  • Mise à jour des métadonnées HLS

Composants Principaux

StreamProcessor
  ├─ FFmpegMonitor (parsing stderr)
  ├─ SegmentTracker (suivi segments)
  └─ ProcessingCallbacks (fin de traitement)

🏗️ Architecture

Diagramme de la Pipeline

┌─────────────────────────────────────────────────────────┐
│              EncoderPipeline::start_processing()         │
└────────────────────┬────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────┐
│              StreamProcessor::new()                      │
│  - job_id, job, db, output_dir                         │
└────────────────────┬────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────┐
│              StreamProcessor::run()                      │
│                                                         │
│  1. spawn_ffmpeg()                                      │
│     └─> FfmpegCommandBuilder                            │
│     └─> tokio::process::Command                         │
│                                                         │
│  2. monitor_output()                                    │
│     ├─> FFmpegMonitor::monitor_stderr()                 │
│     │   └─> Lire stderr ligne par ligne                │
│     │   └─> Détecter événements                        │
│     │       ├─ SegmentOpened                            │
│     │       ├─ Progress                                │
│     │       ├─ Error                                   │
│     │       └─ MuxingComplete                          │
│     │                                                   │
│     └─> SegmentTracker::register()                     │
│         └─> Persist immédiatement en DB                │
│                                                         │
│  3. finalize()                                          │
│     └─> ProcessingCallbacks                            │
│         ├─ on_success()                                 │
│         ├─ on_error()                                   │
│         └─ on_timeout()                                │
└─────────────────────────────────────────────────────────┘

Flux de Données

FFmpeg Process
  └─> stderr (lignes)
      └─> FFmpegMonitor
          └─> FFmpegEvent
              └─> StreamProcessor
                  └─> SegmentTracker
                      ├─> Vec<SegmentInfo> (mémoire)
                      └─> DB (stream_segments)
                          └─> API Status Endpoint

🔄 Cycle de Vie

1. Initialisation

let processor = StreamProcessor::new(
    job_id,
    job,
    db_pool,
    output_dir,
)
.with_timeout(Duration::from_secs(600))
.with_status_sender(status_tx);

2. Spawn FFmpeg

// Créer répertoire de sortie
tokio::fs::create_dir_all(&output_dir).await?;

// Construire commande FFmpeg
let command = FfmpegCommandBuilder::new()
    .input(&job.input_path)
    .output(&manifest_path)
    .audio_codec(job.codec)
    .bitrate(job.bitrate)
    .container(ContainerFormat::HLS)
    .hls_time(job.hls_segment_time)
    .build()?;

// Spawn processus
let child = command.spawn()?;

3. Monitoring Temps Réel

// Créer monitor
let (monitor, mut event_rx) = FFmpegMonitor::new(output_dir);

// Démarrer monitoring stderr
let monitor_handle = tokio::spawn(async move {
    monitor.monitor_stderr(stderr).await
});

// Traiter événements
while let Some(event) = event_rx.recv().await {
    match event {
        FFmpegEvent::SegmentOpened { path, index } => {
            tracker.register(segment).await?;
        }
        FFmpegEvent::Progress { time, speed } => {
            // Log progression
        }
        FFmpegEvent::Error(msg) => {
            // Log erreur
        }
        FFmpegEvent::MuxingComplete { overhead } => {
            // Fin de muxing
        }
    }
}

4. Persistance DB

// Chaque segment est persisté immédiatement
sqlx::query!(
    r#"
    INSERT INTO stream_segments (track_id, quality, segment_index, path, duration, created_at)
    VALUES ($1, $2, $3, $4, $5, NOW())
    ON CONFLICT (track_id, quality, segment_index) DO UPDATE
    SET path = EXCLUDED.path, duration = EXCLUDED.duration
    "#,
    track_id,
    quality,
    segment_index,
    path,
    duration
)
.execute(&db)
.await?;

5. Finalisation

// Succès
callbacks.on_success().await?;
callbacks.finalize_playlist().await?;

// Erreur
callbacks.on_error(&error_msg).await?;

// Timeout
callbacks.on_timeout().await?;

📦 Modules

1. processor.rs - StreamProcessor

Responsabilités:

  • Orchestration du traitement complet
  • Spawn FFmpeg
  • Coordination monitor + tracker
  • Gestion timeout et erreurs

API Principale:

pub struct StreamProcessor {
    job_id: Uuid,
    job: EncodeJob,
    db: PgPool,
    output_dir: PathBuf,
    job_timeout: Duration,
}

impl StreamProcessor {
    pub fn new(job_id, job, db, output_dir) -> Self;
    pub fn with_timeout(self, timeout: Duration) -> Self;
    pub fn with_status_sender(self, tx: mpsc::Sender<ProcessingStatus>) -> Self;
    pub async fn run(&mut self) -> Result<()>;
}

2. ffmpeg_monitor.rs - FFmpegMonitor

Responsabilités:

  • Lecture stderr ligne par ligne
  • Parsing des événements FFmpeg
  • Détection segments, erreurs, progression

Événements Détectés:

pub enum FFmpegEvent {
    SegmentOpened { path: PathBuf, index: u32 },
    Progress { time: Option<Duration>, speed: Option<f32> },
    Error(String),
    MuxingComplete { overhead: f32 },
}

Regex Utilisées:

  • Opening '([^']+)' for writing → Segment ouvert
  • time=(\d{2}:\d{2}:\d{2}\.\d{2}) → Progression
  • (?i)(error|failed|invalid) → Erreurs
  • muxing overhead: ([\d.]+)% → Fin muxing

3. segment_tracker.rs - SegmentTracker

Responsabilités:

  • Suivi des segments détectés
  • Persistance DB temps réel
  • Calcul durée totale

API Principale:

pub struct SegmentTracker {
    segments: Arc<RwLock<Vec<SegmentInfo>>>,
    job_id: Uuid,
    track_id: Uuid,
    quality: String,
    db: PgPool,
}

impl SegmentTracker {
    pub async fn register(&self, segment: SegmentInfo) -> Result<()>;
    pub async fn get_segments(&self) -> Vec<SegmentInfo>;
    pub async fn get_total_duration(&self) -> f64;
    pub async fn persist_all(&self) -> Result<()>;
}

4. callbacks.rs - ProcessingCallbacks

Responsabilités:

  • Callbacks de fin de traitement
  • Mise à jour DB status
  • Finalisation playlist HLS

API Principale:

pub struct ProcessingCallbacks {
    job_id: Uuid,
    track_id: Uuid,
    db: PgPool,
    output_dir: PathBuf,
}

impl ProcessingCallbacks {
    pub async fn on_success(&self) -> Result<()>;
    pub async fn on_error(&self, error_message: &str) -> Result<()>;
    pub async fn on_timeout(&self) -> Result<()>;
    pub async fn finalize_playlist(&self) -> Result<()>;
}

📝 Format des Logs FFmpeg

Lignes de Progression

frame=  123 fps=0.0 q=-1.0 size=    1024kB time=00:00:05.12 bitrate=1638.4kbits/s speed=10.2x

Parsé par: FfmpegProgress::parse()

Champs extraits:

  • time: 00:00:05.12Duration::from_millis(5120)
  • speed: 10.2x10.2
  • bitrate: 1638.4kbits/s"1638.4kbits/s"

Ouverture de Segment

Opening 'segment_00000.ts' for writing

Détecté par: OPENING_SEGMENT_REGEX

Extraction:

  • path: segment_00000.ts
  • index: 0 (depuis le nom de fichier)

Erreurs

error: Invalid data found when processing input
Error opening input file
ERROR: Failed to open output file

Détecté par: ERROR_REGEX (case-insensitive)

Fin de Muxing

muxing overhead: 1.234567%

Détecté par: MUXING_OVERHEAD_REGEX

Extraction: overhead: 1.234567


🗺️ Mapping Timestamp → Segments

Détection des Segments

  1. Depuis stderr: Opening 'segment_00000.ts' for writing

    • Index extrait depuis le nom de fichier
    • Durée par défaut: 4.0s (sera mise à jour depuis le manifest)
  2. Depuis le répertoire: Scan périodique des fichiers .ts

    • Utile pour détecter les segments déjà générés
    • Méthode: FFmpegMonitor::detect_existing_segments()
  3. Depuis le manifest: Parsing du .m3u8 (après complétion)

    • Durée exacte depuis #EXTINF:4.000,
    • Ordre des segments

Structure SegmentInfo

pub struct SegmentInfo {
    pub index: u32,              // 0, 1, 2, ...
    pub path: PathBuf,           // /data/streams/track_id/quality/segment_00000.ts
    pub duration: f64,           // 4.0 (secondes)
    pub timestamp: SystemTime,   // Timestamp de création
}

Persistance DB

INSERT INTO stream_segments (
    track_id, quality, segment_index, path, duration, created_at
)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (track_id, quality, segment_index) DO UPDATE
SET path = EXCLUDED.path, duration = EXCLUDED.duration

🎚️ ABR Support

Structure Multi-Qualité

/data/streams/<track_id>/
  low/
    index.m3u8
    segment_00000.ts
    segment_00001.ts
    ...
  medium/
    index.m3u8
    segment_00000.ts
    segment_00001.ts
    ...
  high/
    index.m3u8
    segment_00000.ts
    segment_00001.ts
    ...
  hi_res/
    index.m3u8
    segment_00000.ts
    segment_00001.ts
    ...

Master Playlist (Futur)

Pour supporter ABR automatique, créer un master.m3u8:

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-STREAM-INF:BANDWIDTH=64000,CODECS="mp4a.40.2"
low/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=128000,CODECS="mp4a.40.2"
medium/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=192000,CODECS="mp4a.40.2"
high/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=320000,CODECS="mp4a.40.2"
hi_res/playlist.m3u8

Note: Le master playlist n'est pas encore généré automatiquement. C'est une feature future.


🌐 API Status

Endpoint

GET /api/streams/jobs/{id}/status

Response

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "track_id": "123e4567-e89b-12d3-a456-426614174000",
  "status": "encoding",
  "segments": [
    {
      "index": 0,
      "path": "/data/streams/track_id/medium/segment_00000.ts",
      "duration": 4.0,
      "created_at": "2025-12-05T10:30:00Z"
    },
    {
      "index": 1,
      "path": "/data/streams/track_id/medium/segment_00001.ts",
      "duration": 4.0,
      "created_at": "2025-12-05T10:30:04Z"
    }
  ],
  "current_duration": 8.0,
  "progress": 0.5,
  "created_at": "2025-12-05T10:29:55Z",
  "started_at": null,
  "completed_at": null,
  "error": null
}

Statuts Possibles

  • pending: Job en attente
  • encoding: Job en cours de traitement
  • done: Job terminé avec succès
  • error: Job échoué
  • timeout: Job expiré

🔧 Troubleshooting

Problème: Segments non détectés

Symptômes:

  • Aucun segment dans la DB
  • segments_count = 0 dans l'API status

Solutions:

  1. Vérifier que FFmpeg génère bien les segments:

    ls -la /data/streams/<track_id>/<quality>/
    
  2. Vérifier les logs stderr:

    # Les logs doivent contenir "Opening 'segment_*.ts' for writing"
    
  3. Vérifier les permissions du répertoire:

    chmod -R 755 /data/streams
    

Problème: Timeout fréquents

Symptômes:

  • Jobs qui expirent avant la fin
  • status = "timeout"

Solutions:

  1. Augmenter le timeout:

    processor.with_timeout(Duration::from_secs(1200)); // 20 minutes
    
  2. Vérifier la charge CPU:

    top -p $(pgrep -f ffmpeg)
    
  3. Réduire le nombre de workers simultanés

Problème: Erreurs FFmpeg non détectées

Symptômes:

  • Job en encoding indéfiniment
  • Pas d'erreur dans les logs

Solutions:

  1. Vérifier les logs stderr:

    // Les erreurs sont loggées avec tracing::warn!
    
  2. Vérifier le code de sortie FFmpeg:

    // child.wait() retourne le status
    

Problème: Segments dupliqués en DB

Symptômes:

  • Plusieurs entrées pour le même segment_index

Solutions:

  1. Utiliser ON CONFLICT DO UPDATE (déjà implémenté)
  2. Vérifier les contraintes UNIQUE:
    CONSTRAINT stream_segments_unique UNIQUE (track_id, quality, segment_index)
    

Checklist Production

Avant Déploiement

  • Vérifier que FFmpeg est installé et accessible
  • Vérifier les permissions /data/streams
  • Configurer le timeout approprié (défaut: 10 minutes)
  • Vérifier la connexion DB
  • Tester avec un fichier audio de test

Monitoring

  • Logs structurés activés (tracing)
  • Métriques Prometheus (si disponible)
  • Alertes sur timeouts fréquents
  • Alertes sur erreurs FFmpeg

Performance

  • Limiter le nombre de workers simultanés
  • Monitorer l'utilisation CPU
  • Monitorer l'espace disque (/data/streams)
  • Optimiser les requêtes DB (index)

Sécurité

  • Validation des chemins d'entrée/sortie
  • Isolation des processus FFmpeg
  • Nettoyage des fichiers temporaires
  • Rate limiting sur l'API

📚 Références


Auteur: Veza Stream Server Team
Dernière mise à jour: 2025-12-05