Backend Go: - Remplacement complet des anciennes migrations par la base V1 alignée sur ORIGIN. - Durcissement global du parsing JSON (BindAndValidateJSON + RespondWithAppError). - Sécurisation de config.go, CORS, statuts de santé et monitoring. - Implémentation des transactions P0 (RBAC, duplication de playlists, social toggles). - Ajout d’un job worker structuré (emails, analytics, thumbnails) + tests associés. - Nouvelle doc backend : AUDIT_CONFIG, BACKEND_CONFIG, AUTH_PASSWORD_RESET, JOB_WORKER_*. Chat server (Rust): - Refonte du pipeline JWT + sécurité, audit et rate limiting avancé. - Implémentation complète du cycle de message (read receipts, delivered, edit/delete, typing). - Nettoyage des panics, gestion d’erreurs robuste, logs structurés. - Migrations chat alignées sur le schéma UUID et nouvelles features. Stream server (Rust): - Refonte du moteur de streaming (encoding pipeline + HLS) et des modules core. - Transactions P0 pour les jobs et segments, garanties d’atomicité. - Documentation détaillée de la pipeline (AUDIT_STREAM_*, DESIGN_STREAM_PIPELINE, TRANSACTIONS_P0_IMPLEMENTATION). Documentation & audits: - TRIAGE.md et AUDIT_STABILITY.md à jour avec l’état réel des 3 services. - Cartographie complète des migrations et des transactions (DB_MIGRATIONS_*, DB_TRANSACTION_PLAN, AUDIT_DB_TRANSACTIONS, TRANSACTION_TESTS_PHASE3). - Scripts de reset et de cleanup pour la lab DB et la V1. Ce commit fige l’ensemble du travail de stabilisation P0 (UUID, backend, chat et stream) avant les phases suivantes (Coherence Guardian, WS hardening, etc.).
15 KiB
🎬 Stream Processing Thread - Documentation Complète
Date: 2025-12-05
Service: veza-stream-server
Version: 1.0.0
Statut: ✅ Implémenté
📋 Table des Matières
- Vue d'ensemble
- Architecture
- Cycle de Vie
- Modules
- Format des Logs FFmpeg
- Mapping Timestamp → Segments
- ABR Support
- API Status
- Troubleshooting
- Checklist Production
🎯 Vue d'ensemble
Le Stream Processing Thread est le module central qui gère le traitement temps réel des jobs d'encodage audio via FFmpeg. Il orchestre:
- ✅ Spawn et monitoring du processus FFmpeg
- ✅ Détection en temps réel des segments HLS générés
- ✅ Persistance DB incrémentale des segments
- ✅ Gestion des erreurs et timeouts
- ✅ Callbacks de fin (success, error, timeout)
- ✅ Mise à jour des métadonnées HLS
Composants Principaux
StreamProcessor
├─ FFmpegMonitor (parsing stderr)
├─ SegmentTracker (suivi segments)
└─ ProcessingCallbacks (fin de traitement)
🏗️ Architecture
Diagramme de la Pipeline
┌─────────────────────────────────────────────────────────┐
│ EncoderPipeline::start_processing() │
└────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ StreamProcessor::new() │
│ - job_id, job, db, output_dir │
└────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ StreamProcessor::run() │
│ │
│ 1. spawn_ffmpeg() │
│ └─> FfmpegCommandBuilder │
│ └─> tokio::process::Command │
│ │
│ 2. monitor_output() │
│ ├─> FFmpegMonitor::monitor_stderr() │
│ │ └─> Lire stderr ligne par ligne │
│ │ └─> Détecter événements │
│ │ ├─ SegmentOpened │
│ │ ├─ Progress │
│ │ ├─ Error │
│ │ └─ MuxingComplete │
│ │ │
│ └─> SegmentTracker::register() │
│ └─> Persist immédiatement en DB │
│ │
│ 3. finalize() │
│ └─> ProcessingCallbacks │
│ ├─ on_success() │
│ ├─ on_error() │
│ └─ on_timeout() │
└─────────────────────────────────────────────────────────┘
Flux de Données
FFmpeg Process
└─> stderr (lignes)
└─> FFmpegMonitor
└─> FFmpegEvent
└─> StreamProcessor
└─> SegmentTracker
├─> Vec<SegmentInfo> (mémoire)
└─> DB (stream_segments)
└─> API Status Endpoint
🔄 Cycle de Vie
1. Initialisation
let processor = StreamProcessor::new(
job_id,
job,
db_pool,
output_dir,
)
.with_timeout(Duration::from_secs(600))
.with_status_sender(status_tx);
2. Spawn FFmpeg
// Créer répertoire de sortie
tokio::fs::create_dir_all(&output_dir).await?;
// Construire commande FFmpeg
let command = FfmpegCommandBuilder::new()
.input(&job.input_path)
.output(&manifest_path)
.audio_codec(job.codec)
.bitrate(job.bitrate)
.container(ContainerFormat::HLS)
.hls_time(job.hls_segment_time)
.build()?;
// Spawn processus
let child = command.spawn()?;
3. Monitoring Temps Réel
// Créer monitor
let (monitor, mut event_rx) = FFmpegMonitor::new(output_dir);
// Démarrer monitoring stderr
let monitor_handle = tokio::spawn(async move {
monitor.monitor_stderr(stderr).await
});
// Traiter événements
while let Some(event) = event_rx.recv().await {
match event {
FFmpegEvent::SegmentOpened { path, index } => {
tracker.register(segment).await?;
}
FFmpegEvent::Progress { time, speed } => {
// Log progression
}
FFmpegEvent::Error(msg) => {
// Log erreur
}
FFmpegEvent::MuxingComplete { overhead } => {
// Fin de muxing
}
}
}
4. Persistance DB
// Chaque segment est persisté immédiatement
sqlx::query!(
r#"
INSERT INTO stream_segments (track_id, quality, segment_index, path, duration, created_at)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (track_id, quality, segment_index) DO UPDATE
SET path = EXCLUDED.path, duration = EXCLUDED.duration
"#,
track_id,
quality,
segment_index,
path,
duration
)
.execute(&db)
.await?;
5. Finalisation
// Succès
callbacks.on_success().await?;
callbacks.finalize_playlist().await?;
// Erreur
callbacks.on_error(&error_msg).await?;
// Timeout
callbacks.on_timeout().await?;
📦 Modules
1. processor.rs - StreamProcessor
Responsabilités:
- Orchestration du traitement complet
- Spawn FFmpeg
- Coordination monitor + tracker
- Gestion timeout et erreurs
API Principale:
pub struct StreamProcessor {
job_id: Uuid,
job: EncodeJob,
db: PgPool,
output_dir: PathBuf,
job_timeout: Duration,
}
impl StreamProcessor {
pub fn new(job_id, job, db, output_dir) -> Self;
pub fn with_timeout(self, timeout: Duration) -> Self;
pub fn with_status_sender(self, tx: mpsc::Sender<ProcessingStatus>) -> Self;
pub async fn run(&mut self) -> Result<()>;
}
2. ffmpeg_monitor.rs - FFmpegMonitor
Responsabilités:
- Lecture stderr ligne par ligne
- Parsing des événements FFmpeg
- Détection segments, erreurs, progression
Événements Détectés:
pub enum FFmpegEvent {
SegmentOpened { path: PathBuf, index: u32 },
Progress { time: Option<Duration>, speed: Option<f32> },
Error(String),
MuxingComplete { overhead: f32 },
}
Regex Utilisées:
Opening '([^']+)' for writing→ Segment ouverttime=(\d{2}:\d{2}:\d{2}\.\d{2})→ Progression(?i)(error|failed|invalid)→ Erreursmuxing overhead: ([\d.]+)%→ Fin muxing
3. segment_tracker.rs - SegmentTracker
Responsabilités:
- Suivi des segments détectés
- Persistance DB temps réel
- Calcul durée totale
API Principale:
pub struct SegmentTracker {
segments: Arc<RwLock<Vec<SegmentInfo>>>,
job_id: Uuid,
track_id: Uuid,
quality: String,
db: PgPool,
}
impl SegmentTracker {
pub async fn register(&self, segment: SegmentInfo) -> Result<()>;
pub async fn get_segments(&self) -> Vec<SegmentInfo>;
pub async fn get_total_duration(&self) -> f64;
pub async fn persist_all(&self) -> Result<()>;
}
4. callbacks.rs - ProcessingCallbacks
Responsabilités:
- Callbacks de fin de traitement
- Mise à jour DB status
- Finalisation playlist HLS
API Principale:
pub struct ProcessingCallbacks {
job_id: Uuid,
track_id: Uuid,
db: PgPool,
output_dir: PathBuf,
}
impl ProcessingCallbacks {
pub async fn on_success(&self) -> Result<()>;
pub async fn on_error(&self, error_message: &str) -> Result<()>;
pub async fn on_timeout(&self) -> Result<()>;
pub async fn finalize_playlist(&self) -> Result<()>;
}
📝 Format des Logs FFmpeg
Lignes de Progression
frame= 123 fps=0.0 q=-1.0 size= 1024kB time=00:00:05.12 bitrate=1638.4kbits/s speed=10.2x
Parsé par: FfmpegProgress::parse()
Champs extraits:
time:00:00:05.12→Duration::from_millis(5120)speed:10.2x→10.2bitrate:1638.4kbits/s→"1638.4kbits/s"
Ouverture de Segment
Opening 'segment_00000.ts' for writing
Détecté par: OPENING_SEGMENT_REGEX
Extraction:
path:segment_00000.tsindex:0(depuis le nom de fichier)
Erreurs
error: Invalid data found when processing input
Error opening input file
ERROR: Failed to open output file
Détecté par: ERROR_REGEX (case-insensitive)
Fin de Muxing
muxing overhead: 1.234567%
Détecté par: MUXING_OVERHEAD_REGEX
Extraction: overhead: 1.234567
🗺️ Mapping Timestamp → Segments
Détection des Segments
-
Depuis stderr:
Opening 'segment_00000.ts' for writing- Index extrait depuis le nom de fichier
- Durée par défaut:
4.0s(sera mise à jour depuis le manifest)
-
Depuis le répertoire: Scan périodique des fichiers
.ts- Utile pour détecter les segments déjà générés
- Méthode:
FFmpegMonitor::detect_existing_segments()
-
Depuis le manifest: Parsing du
.m3u8(après complétion)- Durée exacte depuis
#EXTINF:4.000, - Ordre des segments
- Durée exacte depuis
Structure SegmentInfo
pub struct SegmentInfo {
pub index: u32, // 0, 1, 2, ...
pub path: PathBuf, // /data/streams/track_id/quality/segment_00000.ts
pub duration: f64, // 4.0 (secondes)
pub timestamp: SystemTime, // Timestamp de création
}
Persistance DB
INSERT INTO stream_segments (
track_id, quality, segment_index, path, duration, created_at
)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (track_id, quality, segment_index) DO UPDATE
SET path = EXCLUDED.path, duration = EXCLUDED.duration
🎚️ ABR Support
Structure Multi-Qualité
/data/streams/<track_id>/
low/
index.m3u8
segment_00000.ts
segment_00001.ts
...
medium/
index.m3u8
segment_00000.ts
segment_00001.ts
...
high/
index.m3u8
segment_00000.ts
segment_00001.ts
...
hi_res/
index.m3u8
segment_00000.ts
segment_00001.ts
...
Master Playlist (Futur)
Pour supporter ABR automatique, créer un master.m3u8:
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-STREAM-INF:BANDWIDTH=64000,CODECS="mp4a.40.2"
low/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=128000,CODECS="mp4a.40.2"
medium/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=192000,CODECS="mp4a.40.2"
high/playlist.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=320000,CODECS="mp4a.40.2"
hi_res/playlist.m3u8
Note: Le master playlist n'est pas encore généré automatiquement. C'est une feature future.
🌐 API Status
Endpoint
GET /api/streams/jobs/{id}/status
Response
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"track_id": "123e4567-e89b-12d3-a456-426614174000",
"status": "encoding",
"segments": [
{
"index": 0,
"path": "/data/streams/track_id/medium/segment_00000.ts",
"duration": 4.0,
"created_at": "2025-12-05T10:30:00Z"
},
{
"index": 1,
"path": "/data/streams/track_id/medium/segment_00001.ts",
"duration": 4.0,
"created_at": "2025-12-05T10:30:04Z"
}
],
"current_duration": 8.0,
"progress": 0.5,
"created_at": "2025-12-05T10:29:55Z",
"started_at": null,
"completed_at": null,
"error": null
}
Statuts Possibles
pending: Job en attenteencoding: Job en cours de traitementdone: Job terminé avec succèserror: Job échouétimeout: Job expiré
🔧 Troubleshooting
Problème: Segments non détectés
Symptômes:
- Aucun segment dans la DB
segments_count = 0dans l'API status
Solutions:
-
Vérifier que FFmpeg génère bien les segments:
ls -la /data/streams/<track_id>/<quality>/ -
Vérifier les logs stderr:
# Les logs doivent contenir "Opening 'segment_*.ts' for writing" -
Vérifier les permissions du répertoire:
chmod -R 755 /data/streams
Problème: Timeout fréquents
Symptômes:
- Jobs qui expirent avant la fin
status = "timeout"
Solutions:
-
Augmenter le timeout:
processor.with_timeout(Duration::from_secs(1200)); // 20 minutes -
Vérifier la charge CPU:
top -p $(pgrep -f ffmpeg) -
Réduire le nombre de workers simultanés
Problème: Erreurs FFmpeg non détectées
Symptômes:
- Job en
encodingindéfiniment - Pas d'erreur dans les logs
Solutions:
-
Vérifier les logs stderr:
// Les erreurs sont loggées avec tracing::warn! -
Vérifier le code de sortie FFmpeg:
// child.wait() retourne le status
Problème: Segments dupliqués en DB
Symptômes:
- Plusieurs entrées pour le même
segment_index
Solutions:
- Utiliser
ON CONFLICT DO UPDATE(déjà implémenté) - Vérifier les contraintes UNIQUE:
CONSTRAINT stream_segments_unique UNIQUE (track_id, quality, segment_index)
✅ Checklist Production
Avant Déploiement
- Vérifier que FFmpeg est installé et accessible
- Vérifier les permissions
/data/streams - Configurer le timeout approprié (défaut: 10 minutes)
- Vérifier la connexion DB
- Tester avec un fichier audio de test
Monitoring
- Logs structurés activés (
tracing) - Métriques Prometheus (si disponible)
- Alertes sur timeouts fréquents
- Alertes sur erreurs FFmpeg
Performance
- Limiter le nombre de workers simultanés
- Monitorer l'utilisation CPU
- Monitorer l'espace disque (
/data/streams) - Optimiser les requêtes DB (index)
Sécurité
- Validation des chemins d'entrée/sortie
- Isolation des processus FFmpeg
- Nettoyage des fichiers temporaires
- Rate limiting sur l'API
📚 Références
- HLS Specification (Apple)
- FFmpeg Documentation
- Documentation STREAM_ENCODING_PIPELINE.md
- Audit AUDIT_STREAM_PROCESSING.md
Auteur: Veza Stream Server Team
Dernière mise à jour: 2025-12-05