veza/veza-stream-server/docs/AUDIT_STREAM_PROCESSING.md
okinrev b7955a680c P0: stabilisation backend/chat/stream + nouvelle base migrations v1
Backend Go:
- Remplacement complet des anciennes migrations par la base V1 alignée sur ORIGIN.
- Durcissement global du parsing JSON (BindAndValidateJSON + RespondWithAppError).
- Sécurisation de config.go, CORS, statuts de santé et monitoring.
- Implémentation des transactions P0 (RBAC, duplication de playlists, social toggles).
- Ajout d’un job worker structuré (emails, analytics, thumbnails) + tests associés.
- Nouvelle doc backend : AUDIT_CONFIG, BACKEND_CONFIG, AUTH_PASSWORD_RESET, JOB_WORKER_*.

Chat server (Rust):
- Refonte du pipeline JWT + sécurité, audit et rate limiting avancé.
- Implémentation complète du cycle de message (read receipts, delivered, edit/delete, typing).
- Nettoyage des panics, gestion d’erreurs robuste, logs structurés.
- Migrations chat alignées sur le schéma UUID et nouvelles features.

Stream server (Rust):
- Refonte du moteur de streaming (encoding pipeline + HLS) et des modules core.
- Transactions P0 pour les jobs et segments, garanties d’atomicité.
- Documentation détaillée de la pipeline (AUDIT_STREAM_*, DESIGN_STREAM_PIPELINE, TRANSACTIONS_P0_IMPLEMENTATION).

Documentation & audits:
- TRIAGE.md et AUDIT_STABILITY.md à jour avec l’état réel des 3 services.
- Cartographie complète des migrations et des transactions (DB_MIGRATIONS_*, DB_TRANSACTION_PLAN, AUDIT_DB_TRANSACTIONS, TRANSACTION_TESTS_PHASE3).
- Scripts de reset et de cleanup pour la lab DB et la V1.

Ce commit fige l’ensemble du travail de stabilisation P0 (UUID, backend, chat et stream) avant les phases suivantes (Coherence Guardian, WS hardening, etc.).
2025-12-06 11:14:38 +01:00

9.5 KiB

🔍 AUDIT INITIAL - Stream Processing Thread

Date: 2025-12-05
Service: veza-stream-server
Mission: P1 - Implémentation complète du thread de Stream Processing


1. ÉTAT ACTUEL DU CODE

1.1 Quelles parties sont déjà prêtes ?

Pool FFmpeg (src/core/encoding_pool.rs)

  • État: 100% Fonctionnel
  • Pool de workers avec async_channel pour la queue
  • Workers qui spawn des processus FFmpeg
  • Capture stderr en streaming (lignes 202-232)
  • Parsing de progression via FfmpegProgress (lignes 214-224)
  • Timeout configurable (10 minutes par défaut)
  • Mise à jour DB des statuts (pending, encoding, done, error)
  • Parse et stocke les segments après complétion (lignes 281-349)

Command Builder (src/transcoding/ffmpeg/command_builder.rs)

  • État: Fonctionnel
  • Builder pattern pour commandes FFmpeg
  • Support HLS complet (-f hls, -hls_time, -hls_playlist_type vod)
  • Pattern de nommage segments: segment_%05d.ts (ligne 148)
  • Isolation CPU: -threads 1 (ligne 105)
  • Audio-only: -map 0:a (ligne 108)
  • Redirection stdout/stderr: Stdio::piped() (lignes 156-157)

Progress Parser (src/transcoding/ffmpeg/progress_parser.rs)

  • État: Fonctionnel
  • Regex pour parser time=HH:MM:SS.mm
  • Support audio-only (sans frame/fps)
  • Extraction de time, speed, bitrate

Routes API (src/routes/transcode.rs)

  • État: Fonctionnel
  • POST /v1/stream/transcode - Soumettre un job
  • GET /v1/stream/job/{id} - Statut d'un job
  • GET /v1/stream/hls/{job_id}/index.m3u8 - Manifest HLS
  • GET /v1/stream/hls/{job_id}/{segment} - Segment .ts

Database Schema

  • État: Prêt
  • stream_jobs: id, track_id, status, created_at, updated_at, error_message
  • stream_segments: id, track_id, quality, segment_index, path, duration, created_at
  • Index optimisés pour requêtes fréquentes

Job Structure (src/core/job.rs)

  • État: Fonctionnel
  • EncodeJob: structure complète avec tous les paramètres
  • EncodeJobStatus: enum (Pending, Encoding, Done, Error)

Stream Processing Thread (src/core/encoder.rs ligne 459)

  • État: TODO non implémenté
  • EncoderPipeline::start_processing() ne fait rien (ligne 459)
  • Aucun thread de traitement temps réel
  • Aucun monitoring incrémental des segments

1.2 Qu'est-ce que le thread doit réellement faire ?

Le thread de Stream Processing doit :

  1. Spawn FFmpeg

    • Utiliser tokio::process::Command depuis FfmpegCommandBuilder
    • Rediriger stdout/stderr
    • Enregistrer PID + start_time en DB
  2. Monitoring Temps Réel

    • Lire stderr ligne par ligne (déjà fait dans encoding_pool.rs mais de manière basique)
    • Détecter les événements FFmpeg:
      • Ouverture de segments: Opening 'segment_00000.ts'
      • Progression: time=00:00:05.12
      • Erreurs: error, Error, ERROR
      • Fin: muxing overhead
    • Extraire les timestamps et détecter la génération de segments
  3. Suivi des Segments

    • Détecter chaque segment généré en temps réel (pas seulement après complétion)
    • Enregistrer immédiatement en DB (stream_segments)
    • Mettre à jour current_duration dans stream_jobs
  4. Persistance DB Temps Réel

    • Insérer chaque segment dès qu'il est détecté
    • Mettre à jour stream_jobs.status = 'encoding' avec progression
    • Mettre à jour stream_jobs.current_duration (si colonne existe)
  5. Callbacks de Fin

    • on_success: Mettre à jour status = 'done', finaliser playlist HLS
    • on_error: Mettre à jour status = 'error', logger l'erreur
    • on_timeout: Tuer le processus, nettoyer fichiers temporaires
  6. Gestion Stop/Retry

    • Support pour arrêter un job en cours
    • Retry automatique en cas d'erreur temporaire

1.3 Quels événements FFmpeg sont détectables (stderr lines) ?

Événements Détectables dans stderr FFmpeg:

  1. Ouverture de Segment

    Opening 'segment_00000.ts' for writing
    
    • Regex: Opening '([^']+)' for writing
    • Indique qu'un nouveau segment est en cours de création
  2. Progression

    frame=  123 fps=0.0 q=-1.0 size=    1024kB time=00:00:05.12 bitrate=1638.4kbits/s speed=10.2x
    
    • Déjà parsé par FfmpegProgress::parse()
    • Contient time (timestamp actuel)
  3. Erreurs

    error: Invalid data found when processing input
    Error opening input file
    ERROR: Failed to open output file
    
    • Détection: line.contains("error") || line.contains("Error") || line.contains("ERROR")
    • Déjà détecté dans encoding_pool.rs ligne 227
  4. Fin de Muxing

    muxing overhead: 1.234567%
    
    • Indique que FFmpeg a terminé l'encodage
    • Regex: muxing overhead: ([\d.]+)%
  5. Segment Complet

    • Pas d'événement direct dans stderr
    • Solution: Détecter quand un nouveau segment apparaît dans le répertoire
    • Ou: Parser le manifest .m3u8 en temps réel (mais il est mis à jour de manière asynchrone)

1.4 Comment les segments sont-ils nommés ?

D'après command_builder.rs ligne 148:

let segment_filename = output_path.with_file_name("segment_%05d.ts");
cmd.arg("-hls_segment_filename").arg(segment_filename);

Pattern: segment_%05d.ts%05d est le numéro de séquence (00000, 00001, 00002, ...)

Exemple:

  • segment_00000.ts
  • segment_00001.ts
  • segment_00002.ts

Répertoire: /data/streams/<track_id>/<quality>/segment_*.ts


1.5 Est-ce que la playlist live / vod est déjà générée automatiquement ?

Oui, mais de manière asynchrone:

  1. Génération Automatique: FFmpeg génère automatiquement index.m3u8 avec les segments
  2. Type VOD: -hls_playlist_type vod (ligne 141 de command_builder.rs)
  3. Liste Complète: -hls_list_size 0 (ligne 143) = tous les segments dans la playlist
  4. Mise à Jour: La playlist est mise à jour par FFmpeg au fur et à mesure

Problème Actuel:

  • La playlist n'est parsée qu'après la complétion du job (ligne 247 de encoding_pool.rs)
  • Pas de parsing temps réel pendant l'encodage

Solution:

  • Parser le manifest .m3u8 périodiquement (toutes les 2-3 secondes)
  • Ou: Surveiller le répertoire pour détecter les nouveaux fichiers .ts

1.6 Le système supporte-t-il ABR (multi-qualité) ?

Oui, mais de manière séquentielle:

  1. Qualités Supportées: low, medium, high, hi_res (défini dans job.rs)
  2. Encodage Multi-Qualité: Chaque qualité est encodée dans un job séparé
  3. Master Playlist: Pas de master playlist .m3u8 pour ABR actuellement
  4. Structure:
    /data/streams/<track_id>/
      low/index.m3u8 + segments
      medium/index.m3u8 + segments
      high/index.m3u8 + segments
      hi_res/index.m3u8 + segments
    

ABR Support Futur:

  • Créer un master.m3u8 qui référence les playlists de chaque qualité
  • Format:
    #EXTM3U
    #EXT-X-STREAM-INF:BANDWIDTH=64000,CODECS="mp4a.40.2"
    low/playlist.m3u8
    #EXT-X-STREAM-INF:BANDWIDTH=128000,CODECS="mp4a.40.2"
    medium/playlist.m3u8
    ...
    

2. GAPS IDENTIFIÉS

2.1 Ce qui manque pour le Processing Thread

  1. Module StreamProcessor

    • Structure pour gérer un job de processing
    • Méthode run() qui orchestre spawn → monitor → finalize
  2. Module FFmpegMonitor

    • Lecture stderr ligne par ligne
    • Détection événements (segments, erreurs, fin)
    • Extraction timestamps et progression
  3. Module SegmentTracker

    • Suivi des segments détectés
    • Persistance DB temps réel
    • Mise à jour current_duration
  4. Module ProcessingCallbacks

    • Callbacks on_success, on_error, on_timeout
    • Finalisation playlist HLS
    • Nettoyage fichiers temporaires
  5. API Status Temps Réel

    • Route GET /api/streams/jobs/{id}/status
    • Retourne segments détectés + progression
  6. Intégration dans EncoderPipeline

    • Remplacer le TODO ligne 459 par un vrai thread
    • Utiliser StreamProcessor pour le traitement

3. DESIGN PROPOSÉ

3.1 Architecture

EncoderPipeline::start_processing()
  └─> StreamProcessor::new(job, db, config)
      └─> StreamProcessor::run()
          ├─> spawn_ffmpeg() → tokio::process::Command
          ├─> monitor_output() → FFmpegMonitor
          │   ├─> Lire stderr ligne par ligne
          │   ├─> Détecter événements
          │   └─> SegmentTracker::register(segment)
          ├─> SegmentTracker::persist() → DB
          └─> finalize() → ProcessingCallbacks
              ├─> on_success → DB status = 'done'
              ├─> on_error → DB status = 'error'
              └─> on_timeout → kill process, cleanup

3.2 Flux de Données

FFmpeg stderr → FFmpegMonitor → SegmentTracker → DB
                                      ↓
                              ProcessingStatus (mpsc)
                                      ↓
                              API Status Endpoint

4. PROCHAINES ÉTAPES

  1. Audit initial (ce document)
  2. Implémenter StreamProcessor
  3. Implémenter FFmpegMonitor
  4. Implémenter SegmentTracker
  5. Implémenter ProcessingCallbacks
  6. Créer API status endpoint
  7. Intégrer dans EncoderPipeline
  8. Tests unitaires et intégration
  9. Documentation complète

Auteur: Veza Stream Server Team
Dernière mise à jour: 2025-12-05