# 🔍 AUDIT INITIAL - Stream Processing Thread **Date**: 2025-12-05 **Service**: `veza-stream-server` **Mission**: P1 - Implémentation complète du thread de Stream Processing --- ## 1. ÉTAT ACTUEL DU CODE ### 1.1 Quelles parties sont déjà prêtes ? #### ✅ **Pool FFmpeg (`src/core/encoding_pool.rs`)** - **État**: ✅ **100% Fonctionnel** - Pool de workers avec `async_channel` pour la queue - Workers qui spawn des processus FFmpeg - Capture stderr en streaming (lignes 202-232) - Parsing de progression via `FfmpegProgress` (lignes 214-224) - Timeout configurable (10 minutes par défaut) - Mise à jour DB des statuts (`pending`, `encoding`, `done`, `error`) - Parse et stocke les segments après complétion (lignes 281-349) #### ✅ **Command Builder (`src/transcoding/ffmpeg/command_builder.rs`)** - **État**: ✅ **Fonctionnel** - Builder pattern pour commandes FFmpeg - Support HLS complet (`-f hls`, `-hls_time`, `-hls_playlist_type vod`) - Pattern de nommage segments: `segment_%05d.ts` (ligne 148) - Isolation CPU: `-threads 1` (ligne 105) - Audio-only: `-map 0:a` (ligne 108) - Redirection stdout/stderr: `Stdio::piped()` (lignes 156-157) #### ✅ **Progress Parser (`src/transcoding/ffmpeg/progress_parser.rs`)** - **État**: ✅ **Fonctionnel** - Regex pour parser `time=HH:MM:SS.mm` - Support audio-only (sans frame/fps) - Extraction de `time`, `speed`, `bitrate` #### ✅ **Routes API (`src/routes/transcode.rs`)** - **État**: ✅ **Fonctionnel** - `POST /v1/stream/transcode` - Soumettre un job - `GET /v1/stream/job/{id}` - Statut d'un job - `GET /v1/stream/hls/{job_id}/index.m3u8` - Manifest HLS - `GET /v1/stream/hls/{job_id}/{segment}` - Segment .ts #### ✅ **Database Schema** - **État**: ✅ **Prêt** - `stream_jobs`: id, track_id, status, created_at, updated_at, error_message - `stream_segments`: id, track_id, quality, segment_index, path, duration, created_at - Index optimisés pour requêtes fréquentes #### ✅ **Job Structure (`src/core/job.rs`)** - **État**: ✅ **Fonctionnel** - `EncodeJob`: structure complète avec tous les paramètres - `EncodeJobStatus`: enum (Pending, Encoding, Done, Error) #### ❌ **Stream Processing Thread (`src/core/encoder.rs` ligne 459)** - **État**: ❌ **TODO non implémenté** - `EncoderPipeline::start_processing()` ne fait rien (ligne 459) - Aucun thread de traitement temps réel - Aucun monitoring incrémental des segments --- ### 1.2 Qu'est-ce que le thread doit réellement faire ? Le thread de Stream Processing doit : 1. **Spawn FFmpeg** - Utiliser `tokio::process::Command` depuis `FfmpegCommandBuilder` - Rediriger stdout/stderr - Enregistrer PID + start_time en DB 2. **Monitoring Temps Réel** - Lire stderr ligne par ligne (déjà fait dans `encoding_pool.rs` mais de manière basique) - Détecter les événements FFmpeg: - Ouverture de segments: `Opening 'segment_00000.ts'` - Progression: `time=00:00:05.12` - Erreurs: `error`, `Error`, `ERROR` - Fin: `muxing overhead` - Extraire les timestamps et détecter la génération de segments 3. **Suivi des Segments** - Détecter chaque segment généré en temps réel (pas seulement après complétion) - Enregistrer immédiatement en DB (`stream_segments`) - Mettre à jour `current_duration` dans `stream_jobs` 4. **Persistance DB Temps Réel** - Insérer chaque segment dès qu'il est détecté - Mettre à jour `stream_jobs.status = 'encoding'` avec progression - Mettre à jour `stream_jobs.current_duration` (si colonne existe) 5. **Callbacks de Fin** - `on_success`: Mettre à jour status = 'done', finaliser playlist HLS - `on_error`: Mettre à jour status = 'error', logger l'erreur - `on_timeout`: Tuer le processus, nettoyer fichiers temporaires 6. **Gestion Stop/Retry** - Support pour arrêter un job en cours - Retry automatique en cas d'erreur temporaire --- ### 1.3 Quels événements FFmpeg sont détectables (stderr lines) ? #### Événements Détectables dans stderr FFmpeg: 1. **Ouverture de Segment** ``` Opening 'segment_00000.ts' for writing ``` - Regex: `Opening '([^']+)' for writing` - Indique qu'un nouveau segment est en cours de création 2. **Progression** ``` frame= 123 fps=0.0 q=-1.0 size= 1024kB time=00:00:05.12 bitrate=1638.4kbits/s speed=10.2x ``` - Déjà parsé par `FfmpegProgress::parse()` - Contient `time` (timestamp actuel) 3. **Erreurs** ``` error: Invalid data found when processing input Error opening input file ERROR: Failed to open output file ``` - Détection: `line.contains("error") || line.contains("Error") || line.contains("ERROR")` - Déjà détecté dans `encoding_pool.rs` ligne 227 4. **Fin de Muxing** ``` muxing overhead: 1.234567% ``` - Indique que FFmpeg a terminé l'encodage - Regex: `muxing overhead: ([\d.]+)%` 5. **Segment Complet** - Pas d'événement direct dans stderr - **Solution**: Détecter quand un nouveau segment apparaît dans le répertoire - Ou: Parser le manifest `.m3u8` en temps réel (mais il est mis à jour de manière asynchrone) --- ### 1.4 Comment les segments sont-ils nommés ? D'après `command_builder.rs` ligne 148: ```rust let segment_filename = output_path.with_file_name("segment_%05d.ts"); cmd.arg("-hls_segment_filename").arg(segment_filename); ``` **Pattern**: `segment_%05d.ts` où `%05d` est le numéro de séquence (00000, 00001, 00002, ...) **Exemple**: - `segment_00000.ts` - `segment_00001.ts` - `segment_00002.ts` **Répertoire**: `/data/streams///segment_*.ts` --- ### 1.5 Est-ce que la playlist live / vod est déjà générée automatiquement ? **Oui**, mais de manière asynchrone: 1. **Génération Automatique**: FFmpeg génère automatiquement `index.m3u8` avec les segments 2. **Type VOD**: `-hls_playlist_type vod` (ligne 141 de `command_builder.rs`) 3. **Liste Complète**: `-hls_list_size 0` (ligne 143) = tous les segments dans la playlist 4. **Mise à Jour**: La playlist est mise à jour par FFmpeg au fur et à mesure **Problème Actuel**: - La playlist n'est parsée qu'**après** la complétion du job (ligne 247 de `encoding_pool.rs`) - Pas de parsing temps réel pendant l'encodage **Solution**: - Parser le manifest `.m3u8` périodiquement (toutes les 2-3 secondes) - Ou: Surveiller le répertoire pour détecter les nouveaux fichiers `.ts` --- ### 1.6 Le système supporte-t-il ABR (multi-qualité) ? **Oui**, mais de manière séquentielle: 1. **Qualités Supportées**: `low`, `medium`, `high`, `hi_res` (défini dans `job.rs`) 2. **Encodage Multi-Qualité**: Chaque qualité est encodée dans un job séparé 3. **Master Playlist**: Pas de master playlist `.m3u8` pour ABR actuellement 4. **Structure**: ``` /data/streams// low/index.m3u8 + segments medium/index.m3u8 + segments high/index.m3u8 + segments hi_res/index.m3u8 + segments ``` **ABR Support Futur**: - Créer un `master.m3u8` qui référence les playlists de chaque qualité - Format: ```m3u8 #EXTM3U #EXT-X-STREAM-INF:BANDWIDTH=64000,CODECS="mp4a.40.2" low/playlist.m3u8 #EXT-X-STREAM-INF:BANDWIDTH=128000,CODECS="mp4a.40.2" medium/playlist.m3u8 ... ``` --- ## 2. GAPS IDENTIFIÉS ### 2.1 Ce qui manque pour le Processing Thread 1. **Module `StreamProcessor`** - Structure pour gérer un job de processing - Méthode `run()` qui orchestre spawn → monitor → finalize 2. **Module `FFmpegMonitor`** - Lecture stderr ligne par ligne - Détection événements (segments, erreurs, fin) - Extraction timestamps et progression 3. **Module `SegmentTracker`** - Suivi des segments détectés - Persistance DB temps réel - Mise à jour `current_duration` 4. **Module `ProcessingCallbacks`** - Callbacks `on_success`, `on_error`, `on_timeout` - Finalisation playlist HLS - Nettoyage fichiers temporaires 5. **API Status Temps Réel** - Route `GET /api/streams/jobs/{id}/status` - Retourne segments détectés + progression 6. **Intégration dans `EncoderPipeline`** - Remplacer le TODO ligne 459 par un vrai thread - Utiliser `StreamProcessor` pour le traitement --- ## 3. DESIGN PROPOSÉ ### 3.1 Architecture ``` EncoderPipeline::start_processing() └─> StreamProcessor::new(job, db, config) └─> StreamProcessor::run() ├─> spawn_ffmpeg() → tokio::process::Command ├─> monitor_output() → FFmpegMonitor │ ├─> Lire stderr ligne par ligne │ ├─> Détecter événements │ └─> SegmentTracker::register(segment) ├─> SegmentTracker::persist() → DB └─> finalize() → ProcessingCallbacks ├─> on_success → DB status = 'done' ├─> on_error → DB status = 'error' └─> on_timeout → kill process, cleanup ``` ### 3.2 Flux de Données ``` FFmpeg stderr → FFmpegMonitor → SegmentTracker → DB ↓ ProcessingStatus (mpsc) ↓ API Status Endpoint ``` --- ## 4. PROCHAINES ÉTAPES 1. ✅ Audit initial (ce document) 2. ⏳ Implémenter `StreamProcessor` 3. ⏳ Implémenter `FFmpegMonitor` 4. ⏳ Implémenter `SegmentTracker` 5. ⏳ Implémenter `ProcessingCallbacks` 6. ⏳ Créer API status endpoint 7. ⏳ Intégrer dans `EncoderPipeline` 8. ⏳ Tests unitaires et intégration 9. ⏳ Documentation complète --- **Auteur**: Veza Stream Server Team **Dernière mise à jour**: 2025-12-05