Backend Go: - Remplacement complet des anciennes migrations par la base V1 alignée sur ORIGIN. - Durcissement global du parsing JSON (BindAndValidateJSON + RespondWithAppError). - Sécurisation de config.go, CORS, statuts de santé et monitoring. - Implémentation des transactions P0 (RBAC, duplication de playlists, social toggles). - Ajout d’un job worker structuré (emails, analytics, thumbnails) + tests associés. - Nouvelle doc backend : AUDIT_CONFIG, BACKEND_CONFIG, AUTH_PASSWORD_RESET, JOB_WORKER_*. Chat server (Rust): - Refonte du pipeline JWT + sécurité, audit et rate limiting avancé. - Implémentation complète du cycle de message (read receipts, delivered, edit/delete, typing). - Nettoyage des panics, gestion d’erreurs robuste, logs structurés. - Migrations chat alignées sur le schéma UUID et nouvelles features. Stream server (Rust): - Refonte du moteur de streaming (encoding pipeline + HLS) et des modules core. - Transactions P0 pour les jobs et segments, garanties d’atomicité. - Documentation détaillée de la pipeline (AUDIT_STREAM_*, DESIGN_STREAM_PIPELINE, TRANSACTIONS_P0_IMPLEMENTATION). Documentation & audits: - TRIAGE.md et AUDIT_STABILITY.md à jour avec l’état réel des 3 services. - Cartographie complète des migrations et des transactions (DB_MIGRATIONS_*, DB_TRANSACTION_PLAN, AUDIT_DB_TRANSACTIONS, TRANSACTION_TESTS_PHASE3). - Scripts de reset et de cleanup pour la lab DB et la V1. Ce commit fige l’ensemble du travail de stabilisation P0 (UUID, backend, chat et stream) avant les phases suivantes (Coherence Guardian, WS hardening, etc.).
292 lines
9.5 KiB
Markdown
292 lines
9.5 KiB
Markdown
# 🔍 AUDIT INITIAL - Stream Processing Thread
|
|
|
|
**Date**: 2025-12-05
|
|
**Service**: `veza-stream-server`
|
|
**Mission**: P1 - Implémentation complète du thread de Stream Processing
|
|
|
|
---
|
|
|
|
## 1. ÉTAT ACTUEL DU CODE
|
|
|
|
### 1.1 Quelles parties sont déjà prêtes ?
|
|
|
|
#### ✅ **Pool FFmpeg (`src/core/encoding_pool.rs`)**
|
|
- **État**: ✅ **100% Fonctionnel**
|
|
- Pool de workers avec `async_channel` pour la queue
|
|
- Workers qui spawn des processus FFmpeg
|
|
- Capture stderr en streaming (lignes 202-232)
|
|
- Parsing de progression via `FfmpegProgress` (lignes 214-224)
|
|
- Timeout configurable (10 minutes par défaut)
|
|
- Mise à jour DB des statuts (`pending`, `encoding`, `done`, `error`)
|
|
- Parse et stocke les segments après complétion (lignes 281-349)
|
|
|
|
#### ✅ **Command Builder (`src/transcoding/ffmpeg/command_builder.rs`)**
|
|
- **État**: ✅ **Fonctionnel**
|
|
- Builder pattern pour commandes FFmpeg
|
|
- Support HLS complet (`-f hls`, `-hls_time`, `-hls_playlist_type vod`)
|
|
- Pattern de nommage segments: `segment_%05d.ts` (ligne 148)
|
|
- Isolation CPU: `-threads 1` (ligne 105)
|
|
- Audio-only: `-map 0:a` (ligne 108)
|
|
- Redirection stdout/stderr: `Stdio::piped()` (lignes 156-157)
|
|
|
|
#### ✅ **Progress Parser (`src/transcoding/ffmpeg/progress_parser.rs`)**
|
|
- **État**: ✅ **Fonctionnel**
|
|
- Regex pour parser `time=HH:MM:SS.mm`
|
|
- Support audio-only (sans frame/fps)
|
|
- Extraction de `time`, `speed`, `bitrate`
|
|
|
|
#### ✅ **Routes API (`src/routes/transcode.rs`)**
|
|
- **État**: ✅ **Fonctionnel**
|
|
- `POST /v1/stream/transcode` - Soumettre un job
|
|
- `GET /v1/stream/job/{id}` - Statut d'un job
|
|
- `GET /v1/stream/hls/{job_id}/index.m3u8` - Manifest HLS
|
|
- `GET /v1/stream/hls/{job_id}/{segment}` - Segment .ts
|
|
|
|
#### ✅ **Database Schema**
|
|
- **État**: ✅ **Prêt**
|
|
- `stream_jobs`: id, track_id, status, created_at, updated_at, error_message
|
|
- `stream_segments`: id, track_id, quality, segment_index, path, duration, created_at
|
|
- Index optimisés pour requêtes fréquentes
|
|
|
|
#### ✅ **Job Structure (`src/core/job.rs`)**
|
|
- **État**: ✅ **Fonctionnel**
|
|
- `EncodeJob`: structure complète avec tous les paramètres
|
|
- `EncodeJobStatus`: enum (Pending, Encoding, Done, Error)
|
|
|
|
#### ❌ **Stream Processing Thread (`src/core/encoder.rs` ligne 459)**
|
|
- **État**: ❌ **TODO non implémenté**
|
|
- `EncoderPipeline::start_processing()` ne fait rien (ligne 459)
|
|
- Aucun thread de traitement temps réel
|
|
- Aucun monitoring incrémental des segments
|
|
|
|
---
|
|
|
|
### 1.2 Qu'est-ce que le thread doit réellement faire ?
|
|
|
|
Le thread de Stream Processing doit :
|
|
|
|
1. **Spawn FFmpeg**
|
|
- Utiliser `tokio::process::Command` depuis `FfmpegCommandBuilder`
|
|
- Rediriger stdout/stderr
|
|
- Enregistrer PID + start_time en DB
|
|
|
|
2. **Monitoring Temps Réel**
|
|
- Lire stderr ligne par ligne (déjà fait dans `encoding_pool.rs` mais de manière basique)
|
|
- Détecter les événements FFmpeg:
|
|
- Ouverture de segments: `Opening 'segment_00000.ts'`
|
|
- Progression: `time=00:00:05.12`
|
|
- Erreurs: `error`, `Error`, `ERROR`
|
|
- Fin: `muxing overhead`
|
|
- Extraire les timestamps et détecter la génération de segments
|
|
|
|
3. **Suivi des Segments**
|
|
- Détecter chaque segment généré en temps réel (pas seulement après complétion)
|
|
- Enregistrer immédiatement en DB (`stream_segments`)
|
|
- Mettre à jour `current_duration` dans `stream_jobs`
|
|
|
|
4. **Persistance DB Temps Réel**
|
|
- Insérer chaque segment dès qu'il est détecté
|
|
- Mettre à jour `stream_jobs.status = 'encoding'` avec progression
|
|
- Mettre à jour `stream_jobs.current_duration` (si colonne existe)
|
|
|
|
5. **Callbacks de Fin**
|
|
- `on_success`: Mettre à jour status = 'done', finaliser playlist HLS
|
|
- `on_error`: Mettre à jour status = 'error', logger l'erreur
|
|
- `on_timeout`: Tuer le processus, nettoyer fichiers temporaires
|
|
|
|
6. **Gestion Stop/Retry**
|
|
- Support pour arrêter un job en cours
|
|
- Retry automatique en cas d'erreur temporaire
|
|
|
|
---
|
|
|
|
### 1.3 Quels événements FFmpeg sont détectables (stderr lines) ?
|
|
|
|
#### Événements Détectables dans stderr FFmpeg:
|
|
|
|
1. **Ouverture de Segment**
|
|
```
|
|
Opening 'segment_00000.ts' for writing
|
|
```
|
|
- Regex: `Opening '([^']+)' for writing`
|
|
- Indique qu'un nouveau segment est en cours de création
|
|
|
|
2. **Progression**
|
|
```
|
|
frame= 123 fps=0.0 q=-1.0 size= 1024kB time=00:00:05.12 bitrate=1638.4kbits/s speed=10.2x
|
|
```
|
|
- Déjà parsé par `FfmpegProgress::parse()`
|
|
- Contient `time` (timestamp actuel)
|
|
|
|
3. **Erreurs**
|
|
```
|
|
error: Invalid data found when processing input
|
|
Error opening input file
|
|
ERROR: Failed to open output file
|
|
```
|
|
- Détection: `line.contains("error") || line.contains("Error") || line.contains("ERROR")`
|
|
- Déjà détecté dans `encoding_pool.rs` ligne 227
|
|
|
|
4. **Fin de Muxing**
|
|
```
|
|
muxing overhead: 1.234567%
|
|
```
|
|
- Indique que FFmpeg a terminé l'encodage
|
|
- Regex: `muxing overhead: ([\d.]+)%`
|
|
|
|
5. **Segment Complet**
|
|
- Pas d'événement direct dans stderr
|
|
- **Solution**: Détecter quand un nouveau segment apparaît dans le répertoire
|
|
- Ou: Parser le manifest `.m3u8` en temps réel (mais il est mis à jour de manière asynchrone)
|
|
|
|
---
|
|
|
|
### 1.4 Comment les segments sont-ils nommés ?
|
|
|
|
D'après `command_builder.rs` ligne 148:
|
|
```rust
|
|
let segment_filename = output_path.with_file_name("segment_%05d.ts");
|
|
cmd.arg("-hls_segment_filename").arg(segment_filename);
|
|
```
|
|
|
|
**Pattern**: `segment_%05d.ts` où `%05d` est le numéro de séquence (00000, 00001, 00002, ...)
|
|
|
|
**Exemple**:
|
|
- `segment_00000.ts`
|
|
- `segment_00001.ts`
|
|
- `segment_00002.ts`
|
|
|
|
**Répertoire**: `/data/streams/<track_id>/<quality>/segment_*.ts`
|
|
|
|
---
|
|
|
|
### 1.5 Est-ce que la playlist live / vod est déjà générée automatiquement ?
|
|
|
|
**Oui**, mais de manière asynchrone:
|
|
|
|
1. **Génération Automatique**: FFmpeg génère automatiquement `index.m3u8` avec les segments
|
|
2. **Type VOD**: `-hls_playlist_type vod` (ligne 141 de `command_builder.rs`)
|
|
3. **Liste Complète**: `-hls_list_size 0` (ligne 143) = tous les segments dans la playlist
|
|
4. **Mise à Jour**: La playlist est mise à jour par FFmpeg au fur et à mesure
|
|
|
|
**Problème Actuel**:
|
|
- La playlist n'est parsée qu'**après** la complétion du job (ligne 247 de `encoding_pool.rs`)
|
|
- Pas de parsing temps réel pendant l'encodage
|
|
|
|
**Solution**:
|
|
- Parser le manifest `.m3u8` périodiquement (toutes les 2-3 secondes)
|
|
- Ou: Surveiller le répertoire pour détecter les nouveaux fichiers `.ts`
|
|
|
|
---
|
|
|
|
### 1.6 Le système supporte-t-il ABR (multi-qualité) ?
|
|
|
|
**Oui**, mais de manière séquentielle:
|
|
|
|
1. **Qualités Supportées**: `low`, `medium`, `high`, `hi_res` (défini dans `job.rs`)
|
|
2. **Encodage Multi-Qualité**: Chaque qualité est encodée dans un job séparé
|
|
3. **Master Playlist**: Pas de master playlist `.m3u8` pour ABR actuellement
|
|
4. **Structure**:
|
|
```
|
|
/data/streams/<track_id>/
|
|
low/index.m3u8 + segments
|
|
medium/index.m3u8 + segments
|
|
high/index.m3u8 + segments
|
|
hi_res/index.m3u8 + segments
|
|
```
|
|
|
|
**ABR Support Futur**:
|
|
- Créer un `master.m3u8` qui référence les playlists de chaque qualité
|
|
- Format:
|
|
```m3u8
|
|
#EXTM3U
|
|
#EXT-X-STREAM-INF:BANDWIDTH=64000,CODECS="mp4a.40.2"
|
|
low/playlist.m3u8
|
|
#EXT-X-STREAM-INF:BANDWIDTH=128000,CODECS="mp4a.40.2"
|
|
medium/playlist.m3u8
|
|
...
|
|
```
|
|
|
|
---
|
|
|
|
## 2. GAPS IDENTIFIÉS
|
|
|
|
### 2.1 Ce qui manque pour le Processing Thread
|
|
|
|
1. **Module `StreamProcessor`**
|
|
- Structure pour gérer un job de processing
|
|
- Méthode `run()` qui orchestre spawn → monitor → finalize
|
|
|
|
2. **Module `FFmpegMonitor`**
|
|
- Lecture stderr ligne par ligne
|
|
- Détection événements (segments, erreurs, fin)
|
|
- Extraction timestamps et progression
|
|
|
|
3. **Module `SegmentTracker`**
|
|
- Suivi des segments détectés
|
|
- Persistance DB temps réel
|
|
- Mise à jour `current_duration`
|
|
|
|
4. **Module `ProcessingCallbacks`**
|
|
- Callbacks `on_success`, `on_error`, `on_timeout`
|
|
- Finalisation playlist HLS
|
|
- Nettoyage fichiers temporaires
|
|
|
|
5. **API Status Temps Réel**
|
|
- Route `GET /api/streams/jobs/{id}/status`
|
|
- Retourne segments détectés + progression
|
|
|
|
6. **Intégration dans `EncoderPipeline`**
|
|
- Remplacer le TODO ligne 459 par un vrai thread
|
|
- Utiliser `StreamProcessor` pour le traitement
|
|
|
|
---
|
|
|
|
## 3. DESIGN PROPOSÉ
|
|
|
|
### 3.1 Architecture
|
|
|
|
```
|
|
EncoderPipeline::start_processing()
|
|
└─> StreamProcessor::new(job, db, config)
|
|
└─> StreamProcessor::run()
|
|
├─> spawn_ffmpeg() → tokio::process::Command
|
|
├─> monitor_output() → FFmpegMonitor
|
|
│ ├─> Lire stderr ligne par ligne
|
|
│ ├─> Détecter événements
|
|
│ └─> SegmentTracker::register(segment)
|
|
├─> SegmentTracker::persist() → DB
|
|
└─> finalize() → ProcessingCallbacks
|
|
├─> on_success → DB status = 'done'
|
|
├─> on_error → DB status = 'error'
|
|
└─> on_timeout → kill process, cleanup
|
|
```
|
|
|
|
### 3.2 Flux de Données
|
|
|
|
```
|
|
FFmpeg stderr → FFmpegMonitor → SegmentTracker → DB
|
|
↓
|
|
ProcessingStatus (mpsc)
|
|
↓
|
|
API Status Endpoint
|
|
```
|
|
|
|
---
|
|
|
|
## 4. PROCHAINES ÉTAPES
|
|
|
|
1. ✅ Audit initial (ce document)
|
|
2. ⏳ Implémenter `StreamProcessor`
|
|
3. ⏳ Implémenter `FFmpegMonitor`
|
|
4. ⏳ Implémenter `SegmentTracker`
|
|
5. ⏳ Implémenter `ProcessingCallbacks`
|
|
6. ⏳ Créer API status endpoint
|
|
7. ⏳ Intégrer dans `EncoderPipeline`
|
|
8. ⏳ Tests unitaires et intégration
|
|
9. ⏳ Documentation complète
|
|
|
|
---
|
|
|
|
**Auteur**: Veza Stream Server Team
|
|
**Dernière mise à jour**: 2025-12-05
|