293 lines
9.5 KiB
Markdown
293 lines
9.5 KiB
Markdown
|
|
# 🔍 AUDIT INITIAL - Stream Processing Thread
|
||
|
|
|
||
|
|
**Date**: 2025-12-05
|
||
|
|
**Service**: `veza-stream-server`
|
||
|
|
**Mission**: P1 - Implémentation complète du thread de Stream Processing
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
## 1. ÉTAT ACTUEL DU CODE
|
||
|
|
|
||
|
|
### 1.1 Quelles parties sont déjà prêtes ?
|
||
|
|
|
||
|
|
#### ✅ **Pool FFmpeg (`src/core/encoding_pool.rs`)**
|
||
|
|
- **État**: ✅ **100% Fonctionnel**
|
||
|
|
- Pool de workers avec `async_channel` pour la queue
|
||
|
|
- Workers qui spawn des processus FFmpeg
|
||
|
|
- Capture stderr en streaming (lignes 202-232)
|
||
|
|
- Parsing de progression via `FfmpegProgress` (lignes 214-224)
|
||
|
|
- Timeout configurable (10 minutes par défaut)
|
||
|
|
- Mise à jour DB des statuts (`pending`, `encoding`, `done`, `error`)
|
||
|
|
- Parse et stocke les segments après complétion (lignes 281-349)
|
||
|
|
|
||
|
|
#### ✅ **Command Builder (`src/transcoding/ffmpeg/command_builder.rs`)**
|
||
|
|
- **État**: ✅ **Fonctionnel**
|
||
|
|
- Builder pattern pour commandes FFmpeg
|
||
|
|
- Support HLS complet (`-f hls`, `-hls_time`, `-hls_playlist_type vod`)
|
||
|
|
- Pattern de nommage segments: `segment_%05d.ts` (ligne 148)
|
||
|
|
- Isolation CPU: `-threads 1` (ligne 105)
|
||
|
|
- Audio-only: `-map 0:a` (ligne 108)
|
||
|
|
- Redirection stdout/stderr: `Stdio::piped()` (lignes 156-157)
|
||
|
|
|
||
|
|
#### ✅ **Progress Parser (`src/transcoding/ffmpeg/progress_parser.rs`)**
|
||
|
|
- **État**: ✅ **Fonctionnel**
|
||
|
|
- Regex pour parser `time=HH:MM:SS.mm`
|
||
|
|
- Support audio-only (sans frame/fps)
|
||
|
|
- Extraction de `time`, `speed`, `bitrate`
|
||
|
|
|
||
|
|
#### ✅ **Routes API (`src/routes/transcode.rs`)**
|
||
|
|
- **État**: ✅ **Fonctionnel**
|
||
|
|
- `POST /v1/stream/transcode` - Soumettre un job
|
||
|
|
- `GET /v1/stream/job/{id}` - Statut d'un job
|
||
|
|
- `GET /v1/stream/hls/{job_id}/index.m3u8` - Manifest HLS
|
||
|
|
- `GET /v1/stream/hls/{job_id}/{segment}` - Segment .ts
|
||
|
|
|
||
|
|
#### ✅ **Database Schema**
|
||
|
|
- **État**: ✅ **Prêt**
|
||
|
|
- `stream_jobs`: id, track_id, status, created_at, updated_at, error_message
|
||
|
|
- `stream_segments`: id, track_id, quality, segment_index, path, duration, created_at
|
||
|
|
- Index optimisés pour requêtes fréquentes
|
||
|
|
|
||
|
|
#### ✅ **Job Structure (`src/core/job.rs`)**
|
||
|
|
- **État**: ✅ **Fonctionnel**
|
||
|
|
- `EncodeJob`: structure complète avec tous les paramètres
|
||
|
|
- `EncodeJobStatus`: enum (Pending, Encoding, Done, Error)
|
||
|
|
|
||
|
|
#### ❌ **Stream Processing Thread (`src/core/encoder.rs` ligne 459)**
|
||
|
|
- **État**: ❌ **TODO non implémenté**
|
||
|
|
- `EncoderPipeline::start_processing()` ne fait rien (ligne 459)
|
||
|
|
- Aucun thread de traitement temps réel
|
||
|
|
- Aucun monitoring incrémental des segments
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### 1.2 Qu'est-ce que le thread doit réellement faire ?
|
||
|
|
|
||
|
|
Le thread de Stream Processing doit :
|
||
|
|
|
||
|
|
1. **Spawn FFmpeg**
|
||
|
|
- Utiliser `tokio::process::Command` depuis `FfmpegCommandBuilder`
|
||
|
|
- Rediriger stdout/stderr
|
||
|
|
- Enregistrer PID + start_time en DB
|
||
|
|
|
||
|
|
2. **Monitoring Temps Réel**
|
||
|
|
- Lire stderr ligne par ligne (déjà fait dans `encoding_pool.rs` mais de manière basique)
|
||
|
|
- Détecter les événements FFmpeg:
|
||
|
|
- Ouverture de segments: `Opening 'segment_00000.ts'`
|
||
|
|
- Progression: `time=00:00:05.12`
|
||
|
|
- Erreurs: `error`, `Error`, `ERROR`
|
||
|
|
- Fin: `muxing overhead`
|
||
|
|
- Extraire les timestamps et détecter la génération de segments
|
||
|
|
|
||
|
|
3. **Suivi des Segments**
|
||
|
|
- Détecter chaque segment généré en temps réel (pas seulement après complétion)
|
||
|
|
- Enregistrer immédiatement en DB (`stream_segments`)
|
||
|
|
- Mettre à jour `current_duration` dans `stream_jobs`
|
||
|
|
|
||
|
|
4. **Persistance DB Temps Réel**
|
||
|
|
- Insérer chaque segment dès qu'il est détecté
|
||
|
|
- Mettre à jour `stream_jobs.status = 'encoding'` avec progression
|
||
|
|
- Mettre à jour `stream_jobs.current_duration` (si colonne existe)
|
||
|
|
|
||
|
|
5. **Callbacks de Fin**
|
||
|
|
- `on_success`: Mettre à jour status = 'done', finaliser playlist HLS
|
||
|
|
- `on_error`: Mettre à jour status = 'error', logger l'erreur
|
||
|
|
- `on_timeout`: Tuer le processus, nettoyer fichiers temporaires
|
||
|
|
|
||
|
|
6. **Gestion Stop/Retry**
|
||
|
|
- Support pour arrêter un job en cours
|
||
|
|
- Retry automatique en cas d'erreur temporaire
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### 1.3 Quels événements FFmpeg sont détectables (stderr lines) ?
|
||
|
|
|
||
|
|
#### Événements Détectables dans stderr FFmpeg:
|
||
|
|
|
||
|
|
1. **Ouverture de Segment**
|
||
|
|
```
|
||
|
|
Opening 'segment_00000.ts' for writing
|
||
|
|
```
|
||
|
|
- Regex: `Opening '([^']+)' for writing`
|
||
|
|
- Indique qu'un nouveau segment est en cours de création
|
||
|
|
|
||
|
|
2. **Progression**
|
||
|
|
```
|
||
|
|
frame= 123 fps=0.0 q=-1.0 size= 1024kB time=00:00:05.12 bitrate=1638.4kbits/s speed=10.2x
|
||
|
|
```
|
||
|
|
- Déjà parsé par `FfmpegProgress::parse()`
|
||
|
|
- Contient `time` (timestamp actuel)
|
||
|
|
|
||
|
|
3. **Erreurs**
|
||
|
|
```
|
||
|
|
error: Invalid data found when processing input
|
||
|
|
Error opening input file
|
||
|
|
ERROR: Failed to open output file
|
||
|
|
```
|
||
|
|
- Détection: `line.contains("error") || line.contains("Error") || line.contains("ERROR")`
|
||
|
|
- Déjà détecté dans `encoding_pool.rs` ligne 227
|
||
|
|
|
||
|
|
4. **Fin de Muxing**
|
||
|
|
```
|
||
|
|
muxing overhead: 1.234567%
|
||
|
|
```
|
||
|
|
- Indique que FFmpeg a terminé l'encodage
|
||
|
|
- Regex: `muxing overhead: ([\d.]+)%`
|
||
|
|
|
||
|
|
5. **Segment Complet**
|
||
|
|
- Pas d'événement direct dans stderr
|
||
|
|
- **Solution**: Détecter quand un nouveau segment apparaît dans le répertoire
|
||
|
|
- Ou: Parser le manifest `.m3u8` en temps réel (mais il est mis à jour de manière asynchrone)
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### 1.4 Comment les segments sont-ils nommés ?
|
||
|
|
|
||
|
|
D'après `command_builder.rs` ligne 148:
|
||
|
|
```rust
|
||
|
|
let segment_filename = output_path.with_file_name("segment_%05d.ts");
|
||
|
|
cmd.arg("-hls_segment_filename").arg(segment_filename);
|
||
|
|
```
|
||
|
|
|
||
|
|
**Pattern**: `segment_%05d.ts` où `%05d` est le numéro de séquence (00000, 00001, 00002, ...)
|
||
|
|
|
||
|
|
**Exemple**:
|
||
|
|
- `segment_00000.ts`
|
||
|
|
- `segment_00001.ts`
|
||
|
|
- `segment_00002.ts`
|
||
|
|
|
||
|
|
**Répertoire**: `/data/streams/<track_id>/<quality>/segment_*.ts`
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### 1.5 Est-ce que la playlist live / vod est déjà générée automatiquement ?
|
||
|
|
|
||
|
|
**Oui**, mais de manière asynchrone:
|
||
|
|
|
||
|
|
1. **Génération Automatique**: FFmpeg génère automatiquement `index.m3u8` avec les segments
|
||
|
|
2. **Type VOD**: `-hls_playlist_type vod` (ligne 141 de `command_builder.rs`)
|
||
|
|
3. **Liste Complète**: `-hls_list_size 0` (ligne 143) = tous les segments dans la playlist
|
||
|
|
4. **Mise à Jour**: La playlist est mise à jour par FFmpeg au fur et à mesure
|
||
|
|
|
||
|
|
**Problème Actuel**:
|
||
|
|
- La playlist n'est parsée qu'**après** la complétion du job (ligne 247 de `encoding_pool.rs`)
|
||
|
|
- Pas de parsing temps réel pendant l'encodage
|
||
|
|
|
||
|
|
**Solution**:
|
||
|
|
- Parser le manifest `.m3u8` périodiquement (toutes les 2-3 secondes)
|
||
|
|
- Ou: Surveiller le répertoire pour détecter les nouveaux fichiers `.ts`
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### 1.6 Le système supporte-t-il ABR (multi-qualité) ?
|
||
|
|
|
||
|
|
**Oui**, mais de manière séquentielle:
|
||
|
|
|
||
|
|
1. **Qualités Supportées**: `low`, `medium`, `high`, `hi_res` (défini dans `job.rs`)
|
||
|
|
2. **Encodage Multi-Qualité**: Chaque qualité est encodée dans un job séparé
|
||
|
|
3. **Master Playlist**: Pas de master playlist `.m3u8` pour ABR actuellement
|
||
|
|
4. **Structure**:
|
||
|
|
```
|
||
|
|
/data/streams/<track_id>/
|
||
|
|
low/index.m3u8 + segments
|
||
|
|
medium/index.m3u8 + segments
|
||
|
|
high/index.m3u8 + segments
|
||
|
|
hi_res/index.m3u8 + segments
|
||
|
|
```
|
||
|
|
|
||
|
|
**ABR Support Futur**:
|
||
|
|
- Créer un `master.m3u8` qui référence les playlists de chaque qualité
|
||
|
|
- Format:
|
||
|
|
```m3u8
|
||
|
|
#EXTM3U
|
||
|
|
#EXT-X-STREAM-INF:BANDWIDTH=64000,CODECS="mp4a.40.2"
|
||
|
|
low/playlist.m3u8
|
||
|
|
#EXT-X-STREAM-INF:BANDWIDTH=128000,CODECS="mp4a.40.2"
|
||
|
|
medium/playlist.m3u8
|
||
|
|
...
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
## 2. GAPS IDENTIFIÉS
|
||
|
|
|
||
|
|
### 2.1 Ce qui manque pour le Processing Thread
|
||
|
|
|
||
|
|
1. **Module `StreamProcessor`**
|
||
|
|
- Structure pour gérer un job de processing
|
||
|
|
- Méthode `run()` qui orchestre spawn → monitor → finalize
|
||
|
|
|
||
|
|
2. **Module `FFmpegMonitor`**
|
||
|
|
- Lecture stderr ligne par ligne
|
||
|
|
- Détection événements (segments, erreurs, fin)
|
||
|
|
- Extraction timestamps et progression
|
||
|
|
|
||
|
|
3. **Module `SegmentTracker`**
|
||
|
|
- Suivi des segments détectés
|
||
|
|
- Persistance DB temps réel
|
||
|
|
- Mise à jour `current_duration`
|
||
|
|
|
||
|
|
4. **Module `ProcessingCallbacks`**
|
||
|
|
- Callbacks `on_success`, `on_error`, `on_timeout`
|
||
|
|
- Finalisation playlist HLS
|
||
|
|
- Nettoyage fichiers temporaires
|
||
|
|
|
||
|
|
5. **API Status Temps Réel**
|
||
|
|
- Route `GET /api/streams/jobs/{id}/status`
|
||
|
|
- Retourne segments détectés + progression
|
||
|
|
|
||
|
|
6. **Intégration dans `EncoderPipeline`**
|
||
|
|
- Remplacer le TODO ligne 459 par un vrai thread
|
||
|
|
- Utiliser `StreamProcessor` pour le traitement
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
## 3. DESIGN PROPOSÉ
|
||
|
|
|
||
|
|
### 3.1 Architecture
|
||
|
|
|
||
|
|
```
|
||
|
|
EncoderPipeline::start_processing()
|
||
|
|
└─> StreamProcessor::new(job, db, config)
|
||
|
|
└─> StreamProcessor::run()
|
||
|
|
├─> spawn_ffmpeg() → tokio::process::Command
|
||
|
|
├─> monitor_output() → FFmpegMonitor
|
||
|
|
│ ├─> Lire stderr ligne par ligne
|
||
|
|
│ ├─> Détecter événements
|
||
|
|
│ └─> SegmentTracker::register(segment)
|
||
|
|
├─> SegmentTracker::persist() → DB
|
||
|
|
└─> finalize() → ProcessingCallbacks
|
||
|
|
├─> on_success → DB status = 'done'
|
||
|
|
├─> on_error → DB status = 'error'
|
||
|
|
└─> on_timeout → kill process, cleanup
|
||
|
|
```
|
||
|
|
|
||
|
|
### 3.2 Flux de Données
|
||
|
|
|
||
|
|
```
|
||
|
|
FFmpeg stderr → FFmpegMonitor → SegmentTracker → DB
|
||
|
|
↓
|
||
|
|
ProcessingStatus (mpsc)
|
||
|
|
↓
|
||
|
|
API Status Endpoint
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
## 4. PROCHAINES ÉTAPES
|
||
|
|
|
||
|
|
1. ✅ Audit initial (ce document)
|
||
|
|
2. ⏳ Implémenter `StreamProcessor`
|
||
|
|
3. ⏳ Implémenter `FFmpegMonitor`
|
||
|
|
4. ⏳ Implémenter `SegmentTracker`
|
||
|
|
5. ⏳ Implémenter `ProcessingCallbacks`
|
||
|
|
6. ⏳ Créer API status endpoint
|
||
|
|
7. ⏳ Intégrer dans `EncoderPipeline`
|
||
|
|
8. ⏳ Tests unitaires et intégration
|
||
|
|
9. ⏳ Documentation complète
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
**Auteur**: Veza Stream Server Team
|
||
|
|
**Dernière mise à jour**: 2025-12-05
|