# 🎬 Stream Processing Thread - Documentation Complète **Date**: 2025-12-05 **Service**: `veza-stream-server` **Version**: 1.0.0 **Statut**: ✅ **Implémenté** --- ## 📋 Table des Matières 1. [Vue d'ensemble](#vue-densemble) 2. [Architecture](#architecture) 3. [Cycle de Vie](#cycle-de-vie) 4. [Modules](#modules) 5. [Format des Logs FFmpeg](#format-des-logs-ffmpeg) 6. [Mapping Timestamp → Segments](#mapping-timestamp--segments) 7. [ABR Support](#abr-support) 8. [API Status](#api-status) 9. [Troubleshooting](#troubleshooting) 10. [Checklist Production](#checklist-production) --- ## 🎯 Vue d'ensemble Le **Stream Processing Thread** est le module central qui gère le traitement temps réel des jobs d'encodage audio via FFmpeg. Il orchestre: - ✅ Spawn et monitoring du processus FFmpeg - ✅ Détection en temps réel des segments HLS générés - ✅ Persistance DB incrémentale des segments - ✅ Gestion des erreurs et timeouts - ✅ Callbacks de fin (success, error, timeout) - ✅ Mise à jour des métadonnées HLS ### Composants Principaux ``` StreamProcessor ├─ FFmpegMonitor (parsing stderr) ├─ SegmentTracker (suivi segments) └─ ProcessingCallbacks (fin de traitement) ``` --- ## 🏗️ Architecture ### Diagramme de la Pipeline ``` ┌─────────────────────────────────────────────────────────┐ │ EncoderPipeline::start_processing() │ └────────────────────┬────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ StreamProcessor::new() │ │ - job_id, job, db, output_dir │ └────────────────────┬────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ StreamProcessor::run() │ │ │ │ 1. spawn_ffmpeg() │ │ └─> FfmpegCommandBuilder │ │ └─> tokio::process::Command │ │ │ │ 2. monitor_output() │ │ ├─> FFmpegMonitor::monitor_stderr() │ │ │ └─> Lire stderr ligne par ligne │ │ │ └─> Détecter événements │ │ │ ├─ SegmentOpened │ │ │ ├─ Progress │ │ │ ├─ Error │ │ │ └─ MuxingComplete │ │ │ │ │ └─> SegmentTracker::register() │ │ └─> Persist immédiatement en DB │ │ │ │ 3. finalize() │ │ └─> ProcessingCallbacks │ │ ├─ on_success() │ │ ├─ on_error() │ │ └─ on_timeout() │ └─────────────────────────────────────────────────────────┘ ``` ### Flux de Données ``` FFmpeg Process └─> stderr (lignes) └─> FFmpegMonitor └─> FFmpegEvent └─> StreamProcessor └─> SegmentTracker ├─> Vec (mémoire) └─> DB (stream_segments) └─> API Status Endpoint ``` --- ## 🔄 Cycle de Vie ### 1. Initialisation ```rust let processor = StreamProcessor::new( job_id, job, db_pool, output_dir, ) .with_timeout(Duration::from_secs(600)) .with_status_sender(status_tx); ``` ### 2. Spawn FFmpeg ```rust // Créer répertoire de sortie tokio::fs::create_dir_all(&output_dir).await?; // Construire commande FFmpeg let command = FfmpegCommandBuilder::new() .input(&job.input_path) .output(&manifest_path) .audio_codec(job.codec) .bitrate(job.bitrate) .container(ContainerFormat::HLS) .hls_time(job.hls_segment_time) .build()?; // Spawn processus let child = command.spawn()?; ``` ### 3. Monitoring Temps Réel ```rust // Créer monitor let (monitor, mut event_rx) = FFmpegMonitor::new(output_dir); // Démarrer monitoring stderr let monitor_handle = tokio::spawn(async move { monitor.monitor_stderr(stderr).await }); // Traiter événements while let Some(event) = event_rx.recv().await { match event { FFmpegEvent::SegmentOpened { path, index } => { tracker.register(segment).await?; } FFmpegEvent::Progress { time, speed } => { // Log progression } FFmpegEvent::Error(msg) => { // Log erreur } FFmpegEvent::MuxingComplete { overhead } => { // Fin de muxing } } } ``` ### 4. Persistance DB ```rust // Chaque segment est persisté immédiatement sqlx::query!( r#" INSERT INTO stream_segments (track_id, quality, segment_index, path, duration, created_at) VALUES ($1, $2, $3, $4, $5, NOW()) ON CONFLICT (track_id, quality, segment_index) DO UPDATE SET path = EXCLUDED.path, duration = EXCLUDED.duration "#, track_id, quality, segment_index, path, duration ) .execute(&db) .await?; ``` ### 5. Finalisation ```rust // Succès callbacks.on_success().await?; callbacks.finalize_playlist().await?; // Erreur callbacks.on_error(&error_msg).await?; // Timeout callbacks.on_timeout().await?; ``` --- ## 📦 Modules ### 1. `processor.rs` - StreamProcessor **Responsabilités**: - Orchestration du traitement complet - Spawn FFmpeg - Coordination monitor + tracker - Gestion timeout et erreurs **API Principale**: ```rust pub struct StreamProcessor { job_id: Uuid, job: EncodeJob, db: PgPool, output_dir: PathBuf, job_timeout: Duration, } impl StreamProcessor { pub fn new(job_id, job, db, output_dir) -> Self; pub fn with_timeout(self, timeout: Duration) -> Self; pub fn with_status_sender(self, tx: mpsc::Sender) -> Self; pub async fn run(&mut self) -> Result<()>; } ``` ### 2. `ffmpeg_monitor.rs` - FFmpegMonitor **Responsabilités**: - Lecture stderr ligne par ligne - Parsing des événements FFmpeg - Détection segments, erreurs, progression **Événements Détectés**: ```rust pub enum FFmpegEvent { SegmentOpened { path: PathBuf, index: u32 }, Progress { time: Option, speed: Option }, Error(String), MuxingComplete { overhead: f32 }, } ``` **Regex Utilisées**: - `Opening '([^']+)' for writing` → Segment ouvert - `time=(\d{2}:\d{2}:\d{2}\.\d{2})` → Progression - `(?i)(error|failed|invalid)` → Erreurs - `muxing overhead: ([\d.]+)%` → Fin muxing ### 3. `segment_tracker.rs` - SegmentTracker **Responsabilités**: - Suivi des segments détectés - Persistance DB temps réel - Calcul durée totale **API Principale**: ```rust pub struct SegmentTracker { segments: Arc>>, job_id: Uuid, track_id: Uuid, quality: String, db: PgPool, } impl SegmentTracker { pub async fn register(&self, segment: SegmentInfo) -> Result<()>; pub async fn get_segments(&self) -> Vec; pub async fn get_total_duration(&self) -> f64; pub async fn persist_all(&self) -> Result<()>; } ``` ### 4. `callbacks.rs` - ProcessingCallbacks **Responsabilités**: - Callbacks de fin de traitement - Mise à jour DB status - Finalisation playlist HLS **API Principale**: ```rust pub struct ProcessingCallbacks { job_id: Uuid, track_id: Uuid, db: PgPool, output_dir: PathBuf, } impl ProcessingCallbacks { pub async fn on_success(&self) -> Result<()>; pub async fn on_error(&self, error_message: &str) -> Result<()>; pub async fn on_timeout(&self) -> Result<()>; pub async fn finalize_playlist(&self) -> Result<()>; } ``` --- ## 📝 Format des Logs FFmpeg ### Lignes de Progression ``` frame= 123 fps=0.0 q=-1.0 size= 1024kB time=00:00:05.12 bitrate=1638.4kbits/s speed=10.2x ``` **Parsé par**: `FfmpegProgress::parse()` **Champs extraits**: - `time`: `00:00:05.12` → `Duration::from_millis(5120)` - `speed`: `10.2x` → `10.2` - `bitrate`: `1638.4kbits/s` → `"1638.4kbits/s"` ### Ouverture de Segment ``` Opening 'segment_00000.ts' for writing ``` **Détecté par**: `OPENING_SEGMENT_REGEX` **Extraction**: - `path`: `segment_00000.ts` - `index`: `0` (depuis le nom de fichier) ### Erreurs ``` error: Invalid data found when processing input Error opening input file ERROR: Failed to open output file ``` **Détecté par**: `ERROR_REGEX` (case-insensitive) ### Fin de Muxing ``` muxing overhead: 1.234567% ``` **Détecté par**: `MUXING_OVERHEAD_REGEX` **Extraction**: `overhead`: `1.234567` --- ## 🗺️ Mapping Timestamp → Segments ### Détection des Segments 1. **Depuis stderr**: `Opening 'segment_00000.ts' for writing` - Index extrait depuis le nom de fichier - Durée par défaut: `4.0s` (sera mise à jour depuis le manifest) 2. **Depuis le répertoire**: Scan périodique des fichiers `.ts` - Utile pour détecter les segments déjà générés - Méthode: `FFmpegMonitor::detect_existing_segments()` 3. **Depuis le manifest**: Parsing du `.m3u8` (après complétion) - Durée exacte depuis `#EXTINF:4.000,` - Ordre des segments ### Structure SegmentInfo ```rust pub struct SegmentInfo { pub index: u32, // 0, 1, 2, ... pub path: PathBuf, // /data/streams/track_id/quality/segment_00000.ts pub duration: f64, // 4.0 (secondes) pub timestamp: SystemTime, // Timestamp de création } ``` ### Persistance DB ```sql INSERT INTO stream_segments ( track_id, quality, segment_index, path, duration, created_at ) VALUES ($1, $2, $3, $4, $5, NOW()) ON CONFLICT (track_id, quality, segment_index) DO UPDATE SET path = EXCLUDED.path, duration = EXCLUDED.duration ``` --- ## 🎚️ ABR Support ### Structure Multi-Qualité ``` /data/streams// low/ index.m3u8 segment_00000.ts segment_00001.ts ... medium/ index.m3u8 segment_00000.ts segment_00001.ts ... high/ index.m3u8 segment_00000.ts segment_00001.ts ... hi_res/ index.m3u8 segment_00000.ts segment_00001.ts ... ``` ### Master Playlist (Futur) Pour supporter ABR automatique, créer un `master.m3u8`: ```m3u8 #EXTM3U #EXT-X-VERSION:3 #EXT-X-STREAM-INF:BANDWIDTH=64000,CODECS="mp4a.40.2" low/playlist.m3u8 #EXT-X-STREAM-INF:BANDWIDTH=128000,CODECS="mp4a.40.2" medium/playlist.m3u8 #EXT-X-STREAM-INF:BANDWIDTH=192000,CODECS="mp4a.40.2" high/playlist.m3u8 #EXT-X-STREAM-INF:BANDWIDTH=320000,CODECS="mp4a.40.2" hi_res/playlist.m3u8 ``` **Note**: Le master playlist n'est pas encore généré automatiquement. C'est une feature future. --- ## 🌐 API Status ### Endpoint ``` GET /api/streams/jobs/{id}/status ``` ### Response ```json { "id": "550e8400-e29b-41d4-a716-446655440000", "track_id": "123e4567-e89b-12d3-a456-426614174000", "status": "encoding", "segments": [ { "index": 0, "path": "/data/streams/track_id/medium/segment_00000.ts", "duration": 4.0, "created_at": "2025-12-05T10:30:00Z" }, { "index": 1, "path": "/data/streams/track_id/medium/segment_00001.ts", "duration": 4.0, "created_at": "2025-12-05T10:30:04Z" } ], "current_duration": 8.0, "progress": 0.5, "created_at": "2025-12-05T10:29:55Z", "started_at": null, "completed_at": null, "error": null } ``` ### Statuts Possibles - `pending`: Job en attente - `encoding`: Job en cours de traitement - `done`: Job terminé avec succès - `error`: Job échoué - `timeout`: Job expiré --- ## 🔧 Troubleshooting ### Problème: Segments non détectés **Symptômes**: - Aucun segment dans la DB - `segments_count = 0` dans l'API status **Solutions**: 1. Vérifier que FFmpeg génère bien les segments: ```bash ls -la /data/streams/// ``` 2. Vérifier les logs stderr: ```bash # Les logs doivent contenir "Opening 'segment_*.ts' for writing" ``` 3. Vérifier les permissions du répertoire: ```bash chmod -R 755 /data/streams ``` ### Problème: Timeout fréquents **Symptômes**: - Jobs qui expirent avant la fin - `status = "timeout"` **Solutions**: 1. Augmenter le timeout: ```rust processor.with_timeout(Duration::from_secs(1200)); // 20 minutes ``` 2. Vérifier la charge CPU: ```bash top -p $(pgrep -f ffmpeg) ``` 3. Réduire le nombre de workers simultanés ### Problème: Erreurs FFmpeg non détectées **Symptômes**: - Job en `encoding` indéfiniment - Pas d'erreur dans les logs **Solutions**: 1. Vérifier les logs stderr: ```rust // Les erreurs sont loggées avec tracing::warn! ``` 2. Vérifier le code de sortie FFmpeg: ```rust // child.wait() retourne le status ``` ### Problème: Segments dupliqués en DB **Symptômes**: - Plusieurs entrées pour le même `segment_index` **Solutions**: 1. Utiliser `ON CONFLICT DO UPDATE` (déjà implémenté) 2. Vérifier les contraintes UNIQUE: ```sql CONSTRAINT stream_segments_unique UNIQUE (track_id, quality, segment_index) ``` --- ## ✅ Checklist Production ### Avant Déploiement - [ ] Vérifier que FFmpeg est installé et accessible - [ ] Vérifier les permissions `/data/streams` - [ ] Configurer le timeout approprié (défaut: 10 minutes) - [ ] Vérifier la connexion DB - [ ] Tester avec un fichier audio de test ### Monitoring - [ ] Logs structurés activés (`tracing`) - [ ] Métriques Prometheus (si disponible) - [ ] Alertes sur timeouts fréquents - [ ] Alertes sur erreurs FFmpeg ### Performance - [ ] Limiter le nombre de workers simultanés - [ ] Monitorer l'utilisation CPU - [ ] Monitorer l'espace disque (`/data/streams`) - [ ] Optimiser les requêtes DB (index) ### Sécurité - [ ] Validation des chemins d'entrée/sortie - [ ] Isolation des processus FFmpeg - [ ] Nettoyage des fichiers temporaires - [ ] Rate limiting sur l'API --- ## 📚 Références - [HLS Specification (Apple)](https://developer.apple.com/streaming/) - [FFmpeg Documentation](https://ffmpeg.org/documentation.html) - [Documentation STREAM_ENCODING_PIPELINE.md](./STREAM_ENCODING_PIPELINE.md) - [Audit AUDIT_STREAM_PROCESSING.md](./AUDIT_STREAM_PROCESSING.md) --- **Auteur**: Veza Stream Server Team **Dernière mise à jour**: 2025-12-05