Backend Go: - Remplacement complet des anciennes migrations par la base V1 alignée sur ORIGIN. - Durcissement global du parsing JSON (BindAndValidateJSON + RespondWithAppError). - Sécurisation de config.go, CORS, statuts de santé et monitoring. - Implémentation des transactions P0 (RBAC, duplication de playlists, social toggles). - Ajout d’un job worker structuré (emails, analytics, thumbnails) + tests associés. - Nouvelle doc backend : AUDIT_CONFIG, BACKEND_CONFIG, AUTH_PASSWORD_RESET, JOB_WORKER_*. Chat server (Rust): - Refonte du pipeline JWT + sécurité, audit et rate limiting avancé. - Implémentation complète du cycle de message (read receipts, delivered, edit/delete, typing). - Nettoyage des panics, gestion d’erreurs robuste, logs structurés. - Migrations chat alignées sur le schéma UUID et nouvelles features. Stream server (Rust): - Refonte du moteur de streaming (encoding pipeline + HLS) et des modules core. - Transactions P0 pour les jobs et segments, garanties d’atomicité. - Documentation détaillée de la pipeline (AUDIT_STREAM_*, DESIGN_STREAM_PIPELINE, TRANSACTIONS_P0_IMPLEMENTATION). Documentation & audits: - TRIAGE.md et AUDIT_STABILITY.md à jour avec l’état réel des 3 services. - Cartographie complète des migrations et des transactions (DB_MIGRATIONS_*, DB_TRANSACTION_PLAN, AUDIT_DB_TRANSACTIONS, TRANSACTION_TESTS_PHASE3). - Scripts de reset et de cleanup pour la lab DB et la V1. Ce commit fige l’ensemble du travail de stabilisation P0 (UUID, backend, chat et stream) avant les phases suivantes (Coherence Guardian, WS hardening, etc.).
8.5 KiB
🎯 IMPLÉMENTATION PHASE 2 — Transactions P0 dans le Stream Server (Rust)
Date : 2025-01-27
Statut : ✅ COMPLÉTÉ
Référence : docs/DB_TRANSACTION_PLAN.md (sections Stream–P0)
📋 RÉSUMÉ EXÉCUTIF
Toutes les opérations critiques P0 du Stream Server Rust ont été rendues transactionnelles, garantissant l'atomicité et la cohérence des données en cas d'erreur ou de crash.
Opérations modifiées : 4
Fichiers modifiés : 3
Garanties ajoutées : Atomicité complète pour toutes les opérations multi-étapes
🔍 AUDIT SUMMARY
Opérations Non Transactionnelles Identifiées (P0)
| Fonction | Fichier | Risque | Priorité | Statut |
|---|---|---|---|---|
SegmentTracker.persist_segment |
segment_tracker.rs:82-106 |
Segments orphelins, incohérence durée | P0 | ✅ CORRIGÉ |
SegmentTracker.persist_all |
segment_tracker.rs:158-172 |
Segments partiellement persistés | P0 | ✅ CORRIGÉ |
StreamProcessor.finalize |
processor.rs:239-269 |
Job finalisé sans segments | P0 | ✅ CORRIGÉ |
EncoderWorker.parse_and_store_segments |
encoding_pool.rs:281-349 |
Playlist HLS incomplète | P1 | ✅ CORRIGÉ |
Risques Éliminés
✅ Segments orphelins : Impossible — INSERT segment + UPDATE job sont atomiques
✅ Jobs incomplets : Impossible — Finalisation inclut tous les segments dans une transaction
✅ Durées incorrectes : Impossible — Calcul et mise à jour dans la même transaction
✅ Playlists HLS incomplètes : Impossible — Tous les segments insérés en batch dans une transaction
🔧 MODIFICATIONS DÉTAILLÉES
1. SegmentTracker.persist_segment (P0)
Fichier : src/core/processing/segment_tracker.rs
Avant :
async fn persist_segment(&self, segment: &SegmentInfo) -> Result<(), AppError> {
// INSERT segment (sans transaction)
sqlx::query!("INSERT INTO stream_segments ...")
.execute(&self.db).await?;
// UPDATE job (séparé, pas atomique)
self.update_current_duration().await?;
}
Après (Transactionnel) :
async fn persist_segment(&self, segment: &SegmentInfo) -> Result<(), AppError> {
let mut tx = self.db.begin().await?;
// 1. VALIDATION : Job existe
// 2. INSERT segment
// 3. CALCUL : Durée totale
// 4. UPDATE job
// COMMIT
tx.commit().await?;
}
Garanties :
- ✅ INSERT segment + UPDATE job sont atomiques
- ✅ En cas d'erreur → rollback automatique
- ✅ Pas de segment orphelin possible
- ✅ Job toujours cohérent avec les segments
2. SegmentTracker.persist_all (P0)
Fichier : src/core/processing/segment_tracker.rs
Avant :
pub async fn persist_all(&self) -> Result<(), AppError> {
for segment in segments.iter() {
self.persist_segment(segment).await?; // Chaque segment dans sa propre transaction
}
}
Après (Transaction batch) :
pub async fn persist_all(&self) -> Result<(), AppError> {
let mut tx = self.db.begin().await?;
// 1. VALIDATION : Job existe
// 2. INSERT tous les segments en batch
// 3. CALCUL : Durée totale
// 4. UPDATE job
// COMMIT
tx.commit().await?;
}
Garanties :
- ✅ Tous les segments ou aucun (atomicité batch)
- ✅ Pas de playlist HLS partiellement créée
- ✅ Job toujours cohérent
3. StreamProcessor.finalize (P0)
Fichier : src/core/processing/processor.rs
Avant :
async fn finalize(&self, tracker: Arc<SegmentTracker>) -> Result<(), AppError> {
tracker.persist_all().await?; // Transaction séparée
callbacks.on_success().await?; // UPDATE job séparé
}
Après (Transaction complète) :
async fn finalize(&self, tracker: Arc<SegmentTracker>) -> Result<(), AppError> {
let mut tx = self.db.begin().await?;
// 1. VALIDATION : Job existe et est en "encoding"
// 2. INSERT tous les segments
// 3. CALCUL : Durée totale
// 4. UPDATE job (status = 'done', updated_at)
// COMMIT
tx.commit().await?;
}
Garanties :
- ✅ Job finalisé seulement si tous les segments sont persistés
- ✅ Pas de job "done" sans segments
- ✅ Durée totale toujours cohérente
4. EncoderWorker.parse_and_store_segments (P1)
Fichier : src/core/encoding_pool.rs
Avant :
async fn parse_and_store_segments(&self, job: &EncodeJob) -> Result<(), AppError> {
for segment in segments {
sqlx::query!("INSERT INTO stream_segments ...")
.execute(&self.db_pool).await?; // Pas de transaction
}
}
Après (Transaction batch) :
async fn parse_and_store_segments(&self, job: &EncodeJob) -> Result<(), AppError> {
let mut tx = self.db_pool.begin().await?;
// 1. VALIDATION : Job existe
// 2. INSERT tous les segments en batch
// 3. CALCUL : Durée totale
// 4. UPDATE job
// COMMIT
tx.commit().await?;
}
Garanties :
- ✅ Tous les segments ou aucun
- ✅ Playlist HLS complète ou vide
- ✅ Job toujours à jour
🛡️ GARANTIES DE COHÉRENCE
Invariants Garantis
-
Jamais de segment sans job valide
- Vérification dans chaque transaction
- Rollback si job n'existe pas
-
Job toujours cohérent avec segments
updated_atmis à jour dans la même transaction que les segments- Durée calculée depuis les segments réels
-
Pas de segments orphelins
- Si job supprimé, segments supprimés (CASCADE DB)
- Si INSERT segment échoue, UPDATE job annulé (rollback)
-
Pas de job finalisé sans segments
- Finalisation inclut persistance des segments
- Validation avant commit
-
Playlist HLS complète ou vide
- Batch INSERT garantit atomicité
- Pas de playlist partiellement générée
📊 PROPAGATION DES ERREURS
Toutes les erreurs sont correctement propagées via AppError :
// Pattern utilisé partout
tx.commit().await
.map_err(|e| AppError::InternalError {
message: format!("Failed to commit transaction: {}", e),
})?;
Types d'erreurs gérées :
- ❌ Erreur de connexion DB →
AppError::InternalError - ❌ Job n'existe pas →
AppError::NotFound - ❌ Contrainte violée →
AppError::InternalError(avec message détaillé) - ❌ Timeout transaction → Rollback automatique
🧵 SÉCURITÉ CONCURRENTIELLE
Les transactions SQLx garantissent l'isolation par défaut (READ COMMITTED) :
- ✅ Pas de race condition sur les segments (contrainte UNIQUE)
- ✅ Pas de double finalisation (UPDATE avec WHERE id = $1)
- ✅ Isolation des transactions concurrentes
Note : Pour des cas plus complexes, on pourrait utiliser FOR UPDATE dans les SELECT, mais ce n'est pas nécessaire pour les opérations P0 actuelles.
📝 LOGGING STRUCTURÉ
Tous les commits de transaction sont loggés avec tracing :
tracing::info!(
job_id = %self.job_id,
segment_count = segments.len(),
total_duration = total_duration,
"All segments persisted successfully in transaction"
);
Niveaux de log :
debug: Persistance individuelle de segmentinfo: Persistance batch, finalisation joberror: Erreurs de transaction (automatique via?)
✅ VALIDATION
Tests de Compilation
cd veza-stream-server && cargo check
Résultat : ✅ Compilation réussie (warnings non critiques uniquement)
Points de Validation
- Toutes les opérations P0 sont transactionnelles
- Propagation correcte des erreurs
- Logging structuré ajouté
- Code compile sans erreurs
- Pattern SQLx respecté (
begin(),commit(), rollback automatique)
🚀 PROCHAINES ÉTAPES (Phase 3)
Les tests transactionnels seront implémentés en Phase 3 :
- Tests unitaires : Simulation d'erreur au milieu de transaction
- Tests d'intégration : Vérification rollback en cas d'erreur
- Tests de charge : Validation performance avec transactions
Fichiers de tests à créer :
tests/segment_tracker_transaction_test.rstests/processor_finalize_transaction_test.rstests/encoding_pool_batch_transaction_test.rs
📚 RÉFÉRENCES
docs/DB_TRANSACTION_PLAN.md— Plan d'implémentation completdocs/AUDIT_DB_TRANSACTIONS.md— Audit initial des opérationsdocs/AUDIT_STABILITY.md— Audit de stabilité global
Date de création : 2025-01-27
Dernière mise à jour : 2025-01-27
Statut : ✅ Phase 2 complétée — Prêt pour Phase 3 (Tests)