veza/veza-stream-server/docs/STREAM_PROCESSING_THREAD.md
okinrev b7955a680c P0: stabilisation backend/chat/stream + nouvelle base migrations v1
Backend Go:
- Remplacement complet des anciennes migrations par la base V1 alignée sur ORIGIN.
- Durcissement global du parsing JSON (BindAndValidateJSON + RespondWithAppError).
- Sécurisation de config.go, CORS, statuts de santé et monitoring.
- Implémentation des transactions P0 (RBAC, duplication de playlists, social toggles).
- Ajout d’un job worker structuré (emails, analytics, thumbnails) + tests associés.
- Nouvelle doc backend : AUDIT_CONFIG, BACKEND_CONFIG, AUTH_PASSWORD_RESET, JOB_WORKER_*.

Chat server (Rust):
- Refonte du pipeline JWT + sécurité, audit et rate limiting avancé.
- Implémentation complète du cycle de message (read receipts, delivered, edit/delete, typing).
- Nettoyage des panics, gestion d’erreurs robuste, logs structurés.
- Migrations chat alignées sur le schéma UUID et nouvelles features.

Stream server (Rust):
- Refonte du moteur de streaming (encoding pipeline + HLS) et des modules core.
- Transactions P0 pour les jobs et segments, garanties d’atomicité.
- Documentation détaillée de la pipeline (AUDIT_STREAM_*, DESIGN_STREAM_PIPELINE, TRANSACTIONS_P0_IMPLEMENTATION).

Documentation & audits:
- TRIAGE.md et AUDIT_STABILITY.md à jour avec l’état réel des 3 services.
- Cartographie complète des migrations et des transactions (DB_MIGRATIONS_*, DB_TRANSACTION_PLAN, AUDIT_DB_TRANSACTIONS, TRANSACTION_TESTS_PHASE3).
- Scripts de reset et de cleanup pour la lab DB et la V1.

Ce commit fige l’ensemble du travail de stabilisation P0 (UUID, backend, chat et stream) avant les phases suivantes (Coherence Guardian, WS hardening, etc.).
2025-12-06 11:14:38 +01:00

614 lines
15 KiB
Markdown

# 🎬 Stream Processing Thread - Documentation Complète
**Date**: 2025-12-05
**Service**: `veza-stream-server`
**Version**: 1.0.0
**Statut**: ✅ **Implémenté**
---
## 📋 Table des Matières
1. [Vue d'ensemble](#vue-densemble)
2. [Architecture](#architecture)
3. [Cycle de Vie](#cycle-de-vie)
4. [Modules](#modules)
5. [Format des Logs FFmpeg](#format-des-logs-ffmpeg)
6. [Mapping Timestamp → Segments](#mapping-timestamp--segments)
7. [ABR Support](#abr-support)
8. [API Status](#api-status)
9. [Troubleshooting](#troubleshooting)
10. [Checklist Production](#checklist-production)
---
## 🎯 Vue d'ensemble
Le **Stream Processing Thread** est le module central qui gère le traitement temps réel des jobs d'encodage audio via FFmpeg. Il orchestre:
- ✅ Spawn et monitoring du processus FFmpeg
- ✅ Détection en temps réel des segments HLS générés
- ✅ Persistance DB incrémentale des segments
- ✅ Gestion des erreurs et timeouts
- ✅ Callbacks de fin (success, error, timeout)
- ✅ Mise à jour des métadonnées HLS
### Composants Principaux
```
StreamProcessor
├─ FFmpegMonitor (parsing stderr)
├─ SegmentTracker (suivi segments)
└─ ProcessingCallbacks (fin de traitement)
```
---
## 🏗️ Architecture
### Diagramme de la Pipeline
```
┌─────────────────────────────────────────────────────────┐
│ EncoderPipeline::start_processing() │
└────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ StreamProcessor::new() │
│ - job_id, job, db, output_dir │
└────────────────────┬────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ StreamProcessor::run() │
│ │
│ 1. spawn_ffmpeg() │
│ └─> FfmpegCommandBuilder │
│ └─> tokio::process::Command │
│ │
│ 2. monitor_output() │
│ ├─> FFmpegMonitor::monitor_stderr() │
│ │ └─> Lire stderr ligne par ligne │
│ │ └─> Détecter événements │
│ │ ├─ SegmentOpened │
│ │ ├─ Progress │
│ │ ├─ Error │
│ │ └─ MuxingComplete │
│ │ │
│ └─> SegmentTracker::register() │
│ └─> Persist immédiatement en DB │
│ │
│ 3. finalize() │
│ └─> ProcessingCallbacks │
│ ├─ on_success() │
│ ├─ on_error() │
│ └─ on_timeout() │
└─────────────────────────────────────────────────────────┘
```
### Flux de Données
```
FFmpeg Process
└─> stderr (lignes)
└─> FFmpegMonitor
└─> FFmpegEvent
└─> StreamProcessor
└─> SegmentTracker
├─> Vec<SegmentInfo> (mémoire)
└─> DB (stream_segments)
└─> API Status Endpoint
```
---
## 🔄 Cycle de Vie
### 1. Initialisation
```rust
let processor = StreamProcessor::new(
job_id,
job,
db_pool,
output_dir,
)
.with_timeout(Duration::from_secs(600))
.with_status_sender(status_tx);
```
### 2. Spawn FFmpeg
```rust
// Créer répertoire de sortie
tokio::fs::create_dir_all(&output_dir).await?;
// Construire commande FFmpeg
let command = FfmpegCommandBuilder::new()
.input(&job.input_path)
.output(&manifest_path)
.audio_codec(job.codec)
.bitrate(job.bitrate)
.container(ContainerFormat::HLS)
.hls_time(job.hls_segment_time)
.build()?;
// Spawn processus
let child = command.spawn()?;
```
### 3. Monitoring Temps Réel
```rust
// Créer monitor
let (monitor, mut event_rx) = FFmpegMonitor::new(output_dir);
// Démarrer monitoring stderr
let monitor_handle = tokio::spawn(async move {
monitor.monitor_stderr(stderr).await
});
// Traiter événements
while let Some(event) = event_rx.recv().await {
match event {
FFmpegEvent::SegmentOpened { path, index } => {
tracker.register(segment).await?;
}
FFmpegEvent::Progress { time, speed } => {
// Log progression
}
FFmpegEvent::Error(msg) => {
// Log erreur
}
FFmpegEvent::MuxingComplete { overhead } => {
// Fin de muxing
}
}
}
```
### 4. Persistance DB
```rust
// Chaque segment est persisté immédiatement
sqlx::query!(
r#"
INSERT INTO stream_segments (track_id, quality, segment_index, path, duration, created_at)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (track_id, quality, segment_index) DO UPDATE
SET path = EXCLUDED.path, duration = EXCLUDED.duration
"#,
track_id,
quality,
segment_index,
path,
duration
)
.execute(&db)
.await?;
```
### 5. Finalisation
```rust
// Succès
callbacks.on_success().await?;
callbacks.finalize_playlist().await?;
// Erreur
callbacks.on_error(&error_msg).await?;
// Timeout
callbacks.on_timeout().await?;
```
---
## 📦 Modules
### 1. `processor.rs` - StreamProcessor
**Responsabilités**:
- Orchestration du traitement complet
- Spawn FFmpeg
- Coordination monitor + tracker
- Gestion timeout et erreurs
**API Principale**:
```rust
pub struct StreamProcessor {
job_id: Uuid,
job: EncodeJob,
db: PgPool,
output_dir: PathBuf,
job_timeout: Duration,
}
impl StreamProcessor {
pub fn new(job_id, job, db, output_dir) -> Self;
pub fn with_timeout(self, timeout: Duration) -> Self;
pub fn with_status_sender(self, tx: mpsc::Sender<ProcessingStatus>) -> Self;
pub async fn run(&mut self) -> Result<()>;
}
```
### 2. `ffmpeg_monitor.rs` - FFmpegMonitor
**Responsabilités**:
- Lecture stderr ligne par ligne
- Parsing des événements FFmpeg
- Détection segments, erreurs, progression
**Événements Détectés**:
```rust
pub enum FFmpegEvent {
SegmentOpened { path: PathBuf, index: u32 },
Progress { time: Option<Duration>, speed: Option<f32> },
Error(String),
MuxingComplete { overhead: f32 },
}
```
**Regex Utilisées**:
- `Opening '([^']+)' for writing` → Segment ouvert
- `time=(\d{2}:\d{2}:\d{2}\.\d{2})` → Progression
- `(?i)(error|failed|invalid)` → Erreurs
- `muxing overhead: ([\d.]+)%` → Fin muxing
### 3. `segment_tracker.rs` - SegmentTracker
**Responsabilités**:
- Suivi des segments détectés
- Persistance DB temps réel
- Calcul durée totale
**API Principale**:
```rust
pub struct SegmentTracker {
segments: Arc<RwLock<Vec<SegmentInfo>>>,
job_id: Uuid,
track_id: Uuid,
quality: String,
db: PgPool,
}
impl SegmentTracker {
pub async fn register(&self, segment: SegmentInfo) -> Result<()>;
pub async fn get_segments(&self) -> Vec<SegmentInfo>;
pub async fn get_total_duration(&self) -> f64;
pub async fn persist_all(&self) -> Result<()>;
}
```
### 4. `callbacks.rs` - ProcessingCallbacks
**Responsabilités**:
- Callbacks de fin de traitement
- Mise à jour DB status
- Finalisation playlist HLS
**API Principale**:
```rust
pub struct ProcessingCallbacks {
job_id: Uuid,
track_id: Uuid,
db: PgPool,
output_dir: PathBuf,
}
impl ProcessingCallbacks {
pub async fn on_success(&self) -> Result<()>;
pub async fn on_error(&self, error_message: &str) -> Result<()>;
pub async fn on_timeout(&self) -> Result<()>;
pub async fn finalize_playlist(&self) -> Result<()>;
}
```
---
## 📝 Format des Logs FFmpeg
### Lignes de Progression
```
frame= 123 fps=0.0 q=-1.0 size= 1024kB time=00:00:05.12 bitrate=1638.4kbits/s speed=10.2x
```
**Parsé par**: `FfmpegProgress::parse()`
**Champs extraits**:
- `time`: `00:00:05.12``Duration::from_millis(5120)`
- `speed`: `10.2x``10.2`
- `bitrate`: `1638.4kbits/s``"1638.4kbits/s"`
### Ouverture de Segment
```
Opening 'segment_00000.ts' for writing
```
**Détecté par**: `OPENING_SEGMENT_REGEX`
**Extraction**:
- `path`: `segment_00000.ts`
- `index`: `0` (depuis le nom de fichier)
### Erreurs
```
error: Invalid data found when processing input
Error opening input file
ERROR: Failed to open output file
```
**Détecté par**: `ERROR_REGEX` (case-insensitive)
### Fin de Muxing
```
muxing overhead: 1.234567%
```
**Détecté par**: `MUXING_OVERHEAD_REGEX`
**Extraction**: `overhead`: `1.234567`
---
## 🗺️ Mapping Timestamp → Segments
### Détection des Segments
1. **Depuis stderr**: `Opening 'segment_00000.ts' for writing`
- Index extrait depuis le nom de fichier
- Durée par défaut: `4.0s` (sera mise à jour depuis le manifest)
2. **Depuis le répertoire**: Scan périodique des fichiers `.ts`
- Utile pour détecter les segments déjà générés
- Méthode: `FFmpegMonitor::detect_existing_segments()`
3. **Depuis le manifest**: Parsing du `.m3u8` (après complétion)
- Durée exacte depuis `#EXTINF:4.000,`
- Ordre des segments
### Structure SegmentInfo
```rust
pub struct SegmentInfo {
pub index: u32, // 0, 1, 2, ...
pub path: PathBuf, // /data/streams/track_id/quality/segment_00000.ts
pub duration: f64, // 4.0 (secondes)
pub timestamp: SystemTime, // Timestamp de création
}
```
### Persistance DB
```sql
INSERT INTO stream_segments (
track_id, quality, segment_index, path, duration, created_at
)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (track_id, quality, segment_index) DO UPDATE
SET path = EXCLUDED.path, duration = EXCLUDED.duration
```
---
## 🎚️ ABR Support
### Structure Multi-Qualité
```
/data/streams/<track_id>/
low/
index.m3u8
segment_00000.ts
segment_00001.ts
...
medium/
index.m3u8
segment_00000.ts
segment_00001.ts
...
high/
index.m3u8
segment_00000.ts
segment_00001.ts
...
hi_res/
index.m3u8
segment_00000.ts
segment_00001.ts
...
```
### Master Playlist (Futur)
Pour supporter ABR automatique, créer un `master.m3u8`:
```m3u8
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-STREAM-INF:BANDWIDTH=64000,CODECS="mp4a.40.2"
low/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=128000,CODECS="mp4a.40.2"
medium/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=192000,CODECS="mp4a.40.2"
high/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=320000,CODECS="mp4a.40.2"
hi_res/playlist.m3u8
```
**Note**: Le master playlist n'est pas encore généré automatiquement. C'est une feature future.
---
## 🌐 API Status
### Endpoint
```
GET /api/streams/jobs/{id}/status
```
### Response
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"track_id": "123e4567-e89b-12d3-a456-426614174000",
"status": "encoding",
"segments": [
{
"index": 0,
"path": "/data/streams/track_id/medium/segment_00000.ts",
"duration": 4.0,
"created_at": "2025-12-05T10:30:00Z"
},
{
"index": 1,
"path": "/data/streams/track_id/medium/segment_00001.ts",
"duration": 4.0,
"created_at": "2025-12-05T10:30:04Z"
}
],
"current_duration": 8.0,
"progress": 0.5,
"created_at": "2025-12-05T10:29:55Z",
"started_at": null,
"completed_at": null,
"error": null
}
```
### Statuts Possibles
- `pending`: Job en attente
- `encoding`: Job en cours de traitement
- `done`: Job terminé avec succès
- `error`: Job échoué
- `timeout`: Job expiré
---
## 🔧 Troubleshooting
### Problème: Segments non détectés
**Symptômes**:
- Aucun segment dans la DB
- `segments_count = 0` dans l'API status
**Solutions**:
1. Vérifier que FFmpeg génère bien les segments:
```bash
ls -la /data/streams/<track_id>/<quality>/
```
2. Vérifier les logs stderr:
```bash
# Les logs doivent contenir "Opening 'segment_*.ts' for writing"
```
3. Vérifier les permissions du répertoire:
```bash
chmod -R 755 /data/streams
```
### Problème: Timeout fréquents
**Symptômes**:
- Jobs qui expirent avant la fin
- `status = "timeout"`
**Solutions**:
1. Augmenter le timeout:
```rust
processor.with_timeout(Duration::from_secs(1200)); // 20 minutes
```
2. Vérifier la charge CPU:
```bash
top -p $(pgrep -f ffmpeg)
```
3. Réduire le nombre de workers simultanés
### Problème: Erreurs FFmpeg non détectées
**Symptômes**:
- Job en `encoding` indéfiniment
- Pas d'erreur dans les logs
**Solutions**:
1. Vérifier les logs stderr:
```rust
// Les erreurs sont loggées avec tracing::warn!
```
2. Vérifier le code de sortie FFmpeg:
```rust
// child.wait() retourne le status
```
### Problème: Segments dupliqués en DB
**Symptômes**:
- Plusieurs entrées pour le même `segment_index`
**Solutions**:
1. Utiliser `ON CONFLICT DO UPDATE` (déjà implémenté)
2. Vérifier les contraintes UNIQUE:
```sql
CONSTRAINT stream_segments_unique UNIQUE (track_id, quality, segment_index)
```
---
## ✅ Checklist Production
### Avant Déploiement
- [ ] Vérifier que FFmpeg est installé et accessible
- [ ] Vérifier les permissions `/data/streams`
- [ ] Configurer le timeout approprié (défaut: 10 minutes)
- [ ] Vérifier la connexion DB
- [ ] Tester avec un fichier audio de test
### Monitoring
- [ ] Logs structurés activés (`tracing`)
- [ ] Métriques Prometheus (si disponible)
- [ ] Alertes sur timeouts fréquents
- [ ] Alertes sur erreurs FFmpeg
### Performance
- [ ] Limiter le nombre de workers simultanés
- [ ] Monitorer l'utilisation CPU
- [ ] Monitorer l'espace disque (`/data/streams`)
- [ ] Optimiser les requêtes DB (index)
### Sécurité
- [ ] Validation des chemins d'entrée/sortie
- [ ] Isolation des processus FFmpeg
- [ ] Nettoyage des fichiers temporaires
- [ ] Rate limiting sur l'API
---
## 📚 Références
- [HLS Specification (Apple)](https://developer.apple.com/streaming/)
- [FFmpeg Documentation](https://ffmpeg.org/documentation.html)
- [Documentation STREAM_ENCODING_PIPELINE.md](./STREAM_ENCODING_PIPELINE.md)
- [Audit AUDIT_STREAM_PROCESSING.md](./AUDIT_STREAM_PROCESSING.md)
---
**Auteur**: Veza Stream Server Team
**Dernière mise à jour**: 2025-12-05