veza/veza-stream-server/src/core/sync.rs
senke 7af9c98a73 style(stream-server): apply rustfmt and fix golangci-lint v2 install
Two fixes surfaced by run #55:

1. veza-stream-server (47 files): cargo fmt had been run locally but
   never committed — the working tree was clean locally while HEAD
   had unformatted code. CI's `cargo fmt -- --check` caught the drift.
   This commit lands the formatting that was already staged.

2. ci.yml Install Go tools: `go install .../cmd/golangci-lint@latest`
   resolves to v1.64.8 (the old /cmd/ module path). The repo's
   .golangci.yml is v2-format, so v1 refuses with:
     "you are using a configuration file for golangci-lint v2
      with golangci-lint v1: please use golangci-lint v2"
   Switch to the /v2/cmd/ path so @latest actually gets v2.x.
2026-04-14 15:30:32 +02:00

1288 lines
44 KiB
Rust

use std::collections::HashMap;
/// Module de synchronisation multi-client pour streaming production
///
/// Features :
/// - Synchronisation précise <10ms entre clients
/// - Compensation automatique du drift réseau
/// - Support NTP pour horloge de référence
/// - Synchronisation adaptative selon latence
/// - Support paroles/sous-titres synchronisés
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
// use async_trait::async_trait; // Not available
use dashmap::DashMap;
use futures::future::BoxFuture;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use uuid::Uuid;
// Note: Use tracing::info! macro directly instead of importing
use crate::core::stream::SyncState;
use crate::core::Listener;
use crate::error::AppError;
/// Interface de transport pour l'envoi des messages de synchronisation
/// Permet de découpler le moteur de sync de l'implémentation WebSocket
pub trait SyncTransport: Send + Sync + std::fmt::Debug {
/// Envoie un ajustement de synchronisation à un client spécifique
fn send_adjustment<'a>(
&'a self,
client_id: Uuid,
adjustment: SyncAdjustment,
) -> BoxFuture<'a, Result<(), AppError>>;
/// Envoie un ping de synchronisation
fn send_ping<'a>(&'a self, client_id: Uuid) -> BoxFuture<'a, Result<(), AppError>>;
/// Obtient les stats de connexion (RTT, Offset)
fn get_connection_stats<'a>(
&'a self,
client_id: Uuid,
) -> BoxFuture<'a, Result<(Option<Duration>, Option<i64>), AppError>>;
/// Envoie un message d'initialisation de synchronisation
fn send_init<'a>(
&'a self,
client_id: Uuid,
session_id: Uuid,
track_id: String,
server_timestamp_ms: u64,
position_ms: u64,
) -> BoxFuture<'a, Result<(), AppError>>;
/// Envoie un message de stabilité
fn send_stable<'a>(
&'a self,
client_id: Uuid,
session_id: Uuid,
) -> BoxFuture<'a, Result<(), AppError>>;
}
/// Moteur de synchronisation principal
#[derive(Debug)]
pub struct SyncEngine {
/// Serveur de temps de référence
time_server: Arc<TimeServer>,
/// Compensateur de drift pour chaque client
drift_compensator: Arc<DriftCompensator>,
/// Carte des latences par client
latency_map: Arc<DashMap<Uuid, Duration>>,
/// Synchroniseurs actifs par stream
stream_synchronizers: Arc<DashMap<Uuid, Arc<StreamSynchronizer>>>,
/// Configuration globale
config: Arc<RwLock<SyncConfig>>,
/// Métriques de synchronisation
metrics: Arc<SyncMetrics>,
/// Événements de synchronisation
event_sender: broadcast::Sender<SyncEvent>,
/// Transport pour l'envoi des messages (WebSocket)
transport: Option<Arc<dyn SyncTransport>>,
}
/// Synchroniseur pour un stream spécifique
#[derive(Debug)]
pub struct StreamSynchronizer {
pub stream_id: Uuid,
/// Horloge maître du stream
master_clock: Arc<MasterClock>,
/// Clients synchronisés sur ce stream
synchronized_clients: Arc<DashMap<Uuid, SynchronizedClient>>,
/// Buffer de synchronisation
sync_buffer: Arc<RwLock<SyncBuffer>>,
/// Configuration du stream
config: StreamSyncConfig,
/// Métadonnées temps réel (paroles, etc.)
timed_metadata: Arc<RwLock<TimedMetadata>>,
}
/// Serveur de temps NTP-like
#[derive(Debug)]
pub struct TimeServer {
/// Temps de référence (peut être NTP externe)
reference_time: Arc<RwLock<ReferenceTime>>,
/// Clients NTP configurés
ntp_clients: Vec<String>,
/// Décalage mesuré avec les serveurs externes
time_offset: Arc<std::sync::atomic::AtomicI64>,
/// Qualité de la synchronisation
sync_quality: Arc<std::sync::atomic::AtomicU8>,
}
/// Temps de référence avec précision
#[derive(Debug, Clone)]
pub struct ReferenceTime {
pub system_time: SystemTime,
pub monotonic_time: Instant,
pub ntp_offset: Duration,
pub precision_microseconds: u32,
}
/// Compensateur de drift réseau
#[derive(Debug)]
pub struct DriftCompensator {
/// Mesures de drift par client
drift_measurements: Arc<DashMap<Uuid, VecDeque<DriftMeasurement>>>,
/// Compensation calculée par client
compensations: Arc<DashMap<Uuid, DriftCompensation>>,
/// Configuration
config: DriftCompensatorConfig,
}
/// Mesure de drift pour un client
#[derive(Debug, Clone)]
pub struct DriftMeasurement {
pub timestamp: Instant,
pub client_reported_time: u64,
pub server_time: u64,
pub round_trip_time: Duration,
pub drift_ms: f64,
}
/// Compensation appliquée à un client
#[derive(Debug, Clone)]
pub struct DriftCompensation {
pub timestamp_offset: Duration,
pub playback_rate_adjustment: f64, // 1.0 = normal, 1.001 = 0.1% plus rapide
pub buffer_target_adjustment: i32, // +/- chunks
pub confidence: f32,
}
/// Client synchronisé
#[derive(Debug, Clone)]
pub struct SynchronizedClient {
pub listener_id: Uuid,
pub sync_state: SyncState,
pub last_sync_adjustment: Option<SyncAdjustment>,
pub sync_quality: SyncQuality,
pub adaptive_config: ClientAdaptiveConfig,
}
// SyncState moved to stream.rs to avoid duplication and allow Listener usage
/// Ajustement de synchronisation envoyé au client
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncAdjustment {
pub timestamp_offset: Duration,
pub playback_rate: f64,
pub buffer_target: usize,
pub quality_switch: Option<String>,
pub sync_point: SyncPoint,
}
/// Point de synchronisation dans le stream
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncPoint {
pub stream_position: Duration,
pub server_timestamp: u64,
pub sequence_number: u64,
pub checksum: u32,
}
/// Qualité de synchronisation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncQuality {
pub accuracy_ms: f64,
pub stability_score: f32, // 0-1
pub last_drift: f64,
pub sync_loss_count: u32,
}
/// Configuration adaptative par client
#[derive(Debug, Clone)]
pub struct ClientAdaptiveConfig {
pub max_drift_tolerance: Duration,
pub sync_frequency: Duration,
pub aggressive_correction: bool,
pub quality_priority: bool,
}
/// Buffer de synchronisation
#[derive(Debug, Clone)]
pub struct SyncBuffer {
/// Points de synchronisation dans le stream
sync_points: VecDeque<SyncPoint>,
/// Métadonnées temporelles
timed_events: VecDeque<TimedEvent>,
/// Configuration
max_size: usize,
}
/// Métadonnées temporelles (paroles, chapitres, etc.)
#[derive(Debug, Clone)]
pub struct TimedMetadata {
/// Paroles synchronisées
pub lyrics: Vec<LyricLine>,
/// Chapitres/sections
pub chapters: Vec<Chapter>,
/// Événements personnalisés
pub custom_events: Vec<CustomTimedEvent>,
/// Sous-titres
pub subtitles: Vec<Subtitle>,
}
/// Ligne de paroles avec timing
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LyricLine {
pub start_time: Duration,
pub end_time: Duration,
pub text: String,
pub phonetic: Option<String>,
pub language: Option<String>,
}
/// Chapitre dans le stream
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Chapter {
pub start_time: Duration,
pub end_time: Duration,
pub title: String,
pub description: Option<String>,
pub artwork_url: Option<String>,
}
/// Événement personnalisé avec timing
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomTimedEvent {
pub timestamp: Duration,
pub event_type: String,
pub data: HashMap<String, String>,
pub priority: u8,
}
/// Sous-titre
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Subtitle {
pub start_time: Duration,
pub end_time: Duration,
pub text: String,
pub language: String,
pub position: SubtitlePosition,
}
/// Position du sous-titre
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SubtitlePosition {
Bottom,
Top,
Center,
Custom { x: f32, y: f32 },
}
/// Événement temporel
#[derive(Debug, Clone)]
pub struct TimedEvent {
pub timestamp: Duration,
pub event: SyncEvent,
pub target_clients: Option<Vec<Uuid>>,
}
/// Configuration globale de synchronisation
#[derive(Debug, Clone)]
pub struct SyncConfig {
pub enable_ntp_sync: bool,
pub ntp_servers: Vec<String>,
pub max_client_drift_ms: f64,
pub sync_interval: Duration,
pub drift_measurement_window: Duration,
pub enable_adaptive_sync: bool,
pub quality_threshold: f32,
}
/// Configuration par stream
#[derive(Debug, Clone)]
pub struct StreamSyncConfig {
pub precision_mode: PrecisionMode,
pub enable_timed_metadata: bool,
pub enable_lyrics_sync: bool,
pub enable_chapter_sync: bool,
pub sync_tolerance_ms: f64,
}
/// Mode de précision de synchronisation
#[derive(Debug, Clone)]
pub enum PrecisionMode {
/// Mode relax pour casual listening
Relaxed { tolerance_ms: f64 },
/// Mode standard pour streaming normal
Standard { tolerance_ms: f64 },
/// Mode précis pour events synchronisés
Precise { tolerance_ms: f64 },
/// Mode ultra-précis pour performances live
UltraPrecise { tolerance_ms: f64 },
}
/// Configuration du compensateur de drift
#[derive(Debug, Clone)]
pub struct DriftCompensatorConfig {
pub measurement_window_size: usize,
pub min_measurements_for_compensation: usize,
pub max_playback_rate_adjustment: f64,
pub compensation_smoothing: f64,
}
/// Métriques de synchronisation
#[derive(Debug, Default)]
pub struct SyncMetrics {
pub total_clients_synchronized: std::sync::atomic::AtomicU64,
pub average_sync_accuracy_ms: std::sync::atomic::AtomicU32,
pub sync_failures_total: std::sync::atomic::AtomicU64,
pub drift_corrections_total: std::sync::atomic::AtomicU64,
pub ntp_sync_errors: std::sync::atomic::AtomicU64,
}
/// Événements de synchronisation
#[derive(Debug, Clone)]
pub enum SyncEvent {
/// Client ajouté à la synchronisation
ClientSyncStarted { client_id: Uuid, stream_id: Uuid },
/// Client synchronisé avec succès
ClientSynchronized { client_id: Uuid, accuracy_ms: f64 },
/// Client désynchronisé
ClientDesynchronized { client_id: Uuid, reason: String },
/// Ajustement de synchronisation appliqué
SyncAdjustmentApplied {
client_id: Uuid,
adjustment: SyncAdjustment,
},
/// Métadonnées temporelles mises à jour
TimedMetadataUpdated { stream_id: Uuid, event_type: String },
/// Erreur de synchronisation
SyncError {
client_id: Option<Uuid>,
error: String,
},
}
use std::collections::VecDeque;
impl Default for SyncConfig {
fn default() -> Self {
Self {
enable_ntp_sync: true,
ntp_servers: vec![
"pool.ntp.org".to_string(),
"time.google.com".to_string(),
"time.cloudflare.com".to_string(),
],
max_client_drift_ms: 50.0,
sync_interval: Duration::from_millis(1000),
drift_measurement_window: Duration::from_secs(60),
enable_adaptive_sync: true,
quality_threshold: 0.8,
}
}
}
impl SyncEngine {
pub fn new(
time_server: Arc<TimeServer>,
drift_compensator: Arc<DriftCompensator>,
config: Arc<RwLock<SyncConfig>>,
transport: Option<Arc<dyn SyncTransport>>,
) -> Self {
let (event_sender, _) = broadcast::channel(100);
Self {
time_server,
drift_compensator,
latency_map: Arc::new(DashMap::new()),
stream_synchronizers: Arc::new(DashMap::new()),
config,
metrics: Arc::new(SyncMetrics::default()),
event_sender,
transport,
}
}
/// Synchronise tous les listeners d'un stream
pub async fn sync_listeners(
&self,
stream_id: Uuid,
track_id: Option<String>,
listeners: Arc<DashMap<Uuid, Listener>>,
) -> Result<(), AppError> {
let synchronizer = self.get_or_create_synchronizer(stream_id).await?;
// Obtenir le temps maître
let master_time = self.time_server.get_master_time().await?;
// Synchroniser chaque listener en parallèle
let sync_tasks: Vec<_> = listeners
.iter()
.map(|entry| {
let listener_id = entry.key().clone();
let sync_engine = self.clone();
let synchronizer = synchronizer.clone();
let listeners_map = listeners.clone();
let track_id = track_id.clone();
let master_time = master_time;
tokio::spawn(async move {
sync_engine
.sync_individual_listener(
&synchronizer,
listeners_map,
listener_id,
track_id,
master_time,
)
.await
})
})
.collect();
// Attendre toutes les synchronisations
let results = futures::future::join_all(sync_tasks).await;
// Compter les succès/échecs (pour debug/log futur)
let mut _success_count = 0;
let mut _error_count = 0;
for result in results {
match result {
Ok(Ok(())) => _success_count += 1,
Ok(Err(e)) => {
_error_count += 1;
tracing::warn!("Erreur sync listener: {:?}", e);
}
Err(e) => {
_error_count += 1;
tracing::error!("Erreur task sync: {:?}", e);
}
}
}
Ok(())
}
/// Synchronise un listener individuel (State Machine P1-2)
async fn sync_individual_listener(
&self,
synchronizer: &StreamSynchronizer,
listeners_map: Arc<DashMap<Uuid, Listener>>,
listener_id: Uuid,
track_id: Option<String>,
master_time: MasterTime,
) -> Result<(), AppError> {
let listener = match listeners_map.get(&listener_id) {
Some(l) => l.value().clone(),
None => return Ok(()),
};
// State Machine
match listener.sync_state {
SyncState::Desynchronized => {
if let Some(track) = track_id {
let position = synchronizer.master_clock.get_position().as_millis() as u64;
if let Some(transport) = &self.transport {
transport
.send_init(
listener.id,
synchronizer.stream_id,
track,
master_time.timestamp,
position,
)
.await?;
}
// Update state to Calibrating
if let Some(mut l) = listeners_map.get_mut(&listener.id) {
l.sync_state = SyncState::Calibrating {
drift_samples: Vec::new(),
rtt_samples: Vec::new(),
};
}
}
return Ok(());
}
SyncState::Calibrating {
drift_samples: _,
rtt_samples: _,
} => {
// Send Ping
if let Some(transport) = &self.transport {
let _ = transport.send_ping(listener.id).await;
}
// Get stats
let (rtt, offset) = if let Some(transport) = &self.transport {
transport
.get_connection_stats(listener.id)
.await
.unwrap_or((None, None))
} else {
(None, None)
};
// Store samples if valid
if let (Some(r), Some(o)) = (rtt, offset) {
let latency = r; // Simple RTT/2 est souvent utilisé mais ici on prend RTT brut pr le moment
self.latency_map.insert(listener.id, latency);
// Calculate raw drift
let drift = self
.drift_compensator
.calculate_drift(
&listener,
synchronizer.master_clock.get_position(),
rtt,
Some(o),
)
.await?;
// Update state logic
if let Some(mut l) = listeners_map.get_mut(&listener.id) {
if let SyncState::Calibrating {
drift_samples,
rtt_samples,
} = &mut l.sync_state
{
drift_samples.push(drift as i64);
rtt_samples.push(r.as_millis() as u64);
// Check for stability (e.g., 5 samples)
if drift_samples.len() >= 5 {
// Transition to Synchronized
l.sync_state = SyncState::Synchronized;
if let Some(transport) = &self.transport {
let _ =
transport.send_stable(l.id, synchronizer.stream_id).await;
}
}
}
}
}
return Ok(());
}
SyncState::Resyncing => {
// Send Ping heavily and transition back to Calibrating or Synchronized
// For now, reset to Calibrating
if let Some(mut l) = listeners_map.get_mut(&listener.id) {
l.sync_state = SyncState::Calibrating {
drift_samples: vec![],
rtt_samples: vec![],
};
}
return Ok(());
}
SyncState::Synchronized => {
// FALLTHROUGH to existing normal sync logic below
}
}
// --- NORMAL SYNCHRONIZED LOGIC (Existing + Refined) ---
// Envoyer un Ping pour maintenir la latence à jour
if let Some(transport) = &self.transport {
// Moins fréquent en mode synchro stable ? Pour l'instant on garde le rythme
let _ = transport.send_ping(listener.id).await;
}
// Récupérer les stats de latence
let (rtt, clock_offset) = if let Some(transport) = &self.transport {
transport
.get_connection_stats(listener.id)
.await
.unwrap_or((None, None))
} else {
(None, None)
};
// Mesurer la latence (Legacy + Real)
let latency = if let Some(r) = rtt {
r
} else {
self.measure_latency(&listener).await?
};
self.latency_map.insert(listener.id, latency);
// Calculer le drift
let drift = self
.drift_compensator
.calculate_drift(
&listener,
synchronizer.master_clock.get_position(),
rtt,
clock_offset,
)
.await?;
// Check if we lost sync
if drift.abs() > 200.0 {
// 200ms threshold for Resync
if let Some(mut l) = listeners_map.get_mut(&listener.id) {
l.sync_state = SyncState::Resyncing;
}
return Ok(());
}
// Créer l'ajustement de synchronisation seulement si nécessaire (Threshold 40ms)
if drift.abs() > 40.0 {
let adjustment = SyncAdjustment {
timestamp_offset: latency + Duration::from_millis(drift.abs() as u64),
playback_rate: self.calculate_playback_rate(drift),
buffer_target: self.calculate_buffer_size(latency),
quality_switch: self.determine_quality_switch(&listener).await,
sync_point: synchronizer.get_current_sync_point().await?,
};
// Appliquer l'ajustement
self.apply_sync_adjustment(listener.id, adjustment.clone())
.await?;
}
// Update SynchronizedClient maps (non-essential for P1-2 core logic but good practice)
// ... (Skipping full update of internal map for brevity to focus on protocol)
Ok(())
}
/// Mesure la latence réseau avec un client
async fn measure_latency(&self, listener: &Listener) -> Result<Duration, AppError> {
// Simulation de mesure ping/pong
// En production, implémenter un vrai protocole de mesure
let base_latency = match listener.bandwidth_estimate {
0..=64_000 => Duration::from_millis(150), // Mobile/slow
64_001..=256_000 => Duration::from_millis(50), // Standard
256_001..=1_000_000 => Duration::from_millis(20), // Fast
_ => Duration::from_millis(10), // Fiber/local
};
// Ajouter jitter simulé
let jitter = Duration::from_millis(rand::random::<u64>() % 10);
Ok(base_latency + jitter)
}
/// Calcule le taux de lecture pour corriger le drift
fn calculate_playback_rate(&self, drift_ms: f64) -> f64 {
const MAX_ADJUSTMENT: f64 = 0.005; // 0.5% max
// Ajustement proportionnel au drift
let adjustment = (drift_ms / 1000.0) * 0.001; // 0.1% par seconde de drift
let clamped = adjustment.max(-MAX_ADJUSTMENT).min(MAX_ADJUSTMENT);
1.0 + clamped
}
/// Calcule la taille de buffer optimale selon la latence
fn calculate_buffer_size(&self, latency: Duration) -> usize {
match latency.as_millis() {
0..=20 => 25, // Très faible latence
21..=50 => 50, // Faible latence
51..=100 => 75, // Latence standard
101..=200 => 100, // Latence élevée
_ => 150, // Très haute latence
}
}
/// Détermine si un changement de qualité est nécessaire
async fn determine_quality_switch(&self, listener: &Listener) -> Option<String> {
// Logique simplifiée - en production, analyser la performance
if listener.buffer_health < 0.3 {
Some("low".to_string())
} else if listener.buffer_health > 0.8 && listener.bandwidth_estimate > 256_000 {
Some("high".to_string())
} else {
None
}
}
/// Applique un ajustement de synchronisation
async fn apply_sync_adjustment(
&self,
client_id: Uuid,
adjustment: SyncAdjustment,
) -> Result<(), AppError> {
// Envoyer l'ajustement au client via le transport configuré (WebSocket)
if let Some(transport) = &self.transport {
if let Err(e) = transport
.send_adjustment(client_id, adjustment.clone())
.await
{
tracing::warn!(
?client_id,
"Failed to send sync adjustment via transport: {}",
e
);
// On continue quand même pour émettre l'événement interne
} else {
tracing::debug!(?client_id, "Sync adjustment sent via transport");
}
}
// Émettre l'événement interne
let _ = self.event_sender.send(SyncEvent::SyncAdjustmentApplied {
client_id,
adjustment,
});
Ok(())
}
/// Calcule le score de stabilité pour un client
async fn calculate_stability_score(&self, client_id: Uuid) -> f32 {
if let Some(measurements) = self.drift_compensator.drift_measurements.get(&client_id) {
if measurements.len() < 3 {
return 0.5; // Score neutre
}
// Calculer la variance du drift
let drifts: Vec<f64> = measurements.iter().map(|m| m.drift_ms).collect();
let mean = drifts.iter().sum::<f64>() / drifts.len() as f64;
let variance =
drifts.iter().map(|&d| (d - mean).powi(2)).sum::<f64>() / drifts.len() as f64;
let std_dev = variance.sqrt();
// Score inversement proportionnel à la variance
(1.0 - (std_dev / 100.0).min(1.0)) as f32
} else {
0.5
}
}
/// Obtient ou crée un synchroniseur pour un stream
async fn get_or_create_synchronizer(
&self,
stream_id: Uuid,
) -> Result<Arc<StreamSynchronizer>, AppError> {
if let Some(sync) = self.stream_synchronizers.get(&stream_id) {
Ok(sync.clone())
} else {
let synchronizer = Arc::new(StreamSynchronizer::new(stream_id).await?);
self.stream_synchronizers
.insert(stream_id, synchronizer.clone());
Ok(synchronizer)
}
}
/// Abonnement aux événements de synchronisation
pub fn subscribe_events(&self) -> broadcast::Receiver<SyncEvent> {
self.event_sender.subscribe()
}
}
impl Clone for SyncEngine {
fn clone(&self) -> Self {
Self {
time_server: self.time_server.clone(),
drift_compensator: self.drift_compensator.clone(),
latency_map: self.latency_map.clone(),
stream_synchronizers: self.stream_synchronizers.clone(),
config: self.config.clone(),
metrics: self.metrics.clone(),
event_sender: self.event_sender.clone(),
transport: self.transport.clone(),
}
}
}
/// Temps maître pour synchronisation
#[derive(Debug, Clone, Copy)]
pub struct MasterTime {
pub timestamp: u64,
pub precision_us: u32,
}
impl TimeServer {
/// Crée un nouveau serveur de temps
pub async fn new(ntp_servers: Vec<String>) -> Result<Self, AppError> {
let reference_time = ReferenceTime {
system_time: SystemTime::now(),
monotonic_time: Instant::now(),
ntp_offset: Duration::ZERO,
precision_microseconds: 1000, // 1ms precision par défaut
};
Ok(Self {
reference_time: Arc::new(RwLock::new(reference_time)),
ntp_clients: ntp_servers,
time_offset: Arc::new(std::sync::atomic::AtomicI64::new(0)),
sync_quality: Arc::new(std::sync::atomic::AtomicU8::new(50)),
})
}
/// Obtient le temps maître actuel
pub async fn get_master_time(&self) -> Result<MasterTime, AppError> {
let ref_time = self.reference_time.read();
let elapsed = ref_time.monotonic_time.elapsed();
let timestamp = ref_time
.system_time
.duration_since(UNIX_EPOCH)
.map_err(|_| AppError::TimeSync {
message: "Failed to calculate timestamp".to_string(),
})?
.as_micros() as u64
+ elapsed.as_micros() as u64;
Ok(MasterTime {
timestamp,
precision_us: ref_time.precision_microseconds,
})
}
}
impl StreamSynchronizer {
/// Crée un nouveau synchroniseur de stream
pub async fn new(stream_id: Uuid) -> Result<Self, AppError> {
Ok(Self {
stream_id,
master_clock: Arc::new(MasterClock::new()),
synchronized_clients: Arc::new(DashMap::new()),
sync_buffer: Arc::new(RwLock::new(SyncBuffer {
sync_points: VecDeque::new(),
timed_events: VecDeque::new(),
max_size: 1000,
})),
config: StreamSyncConfig {
precision_mode: PrecisionMode::Standard { tolerance_ms: 10.0 },
enable_timed_metadata: true,
enable_lyrics_sync: true,
enable_chapter_sync: true,
sync_tolerance_ms: 10.0,
},
timed_metadata: Arc::new(RwLock::new(TimedMetadata {
lyrics: Vec::new(),
chapters: Vec::new(),
custom_events: Vec::new(),
subtitles: Vec::new(),
})),
})
}
/// Obtient le point de synchronisation actuel
pub async fn get_current_sync_point(&self) -> Result<SyncPoint, AppError> {
let buffer = self.sync_buffer.read();
buffer
.sync_points
.back()
.cloned()
.ok_or_else(|| AppError::NoSyncPoint {
message: "No sync point available".to_string(),
})
}
}
/// Horloge maître pour un stream
#[derive(Debug)]
pub struct MasterClock {
start_time: Instant,
position: Arc<std::sync::atomic::AtomicU64>, // microseconds
}
impl MasterClock {
pub fn new() -> Self {
Self {
start_time: Instant::now(),
position: Arc::new(std::sync::atomic::AtomicU64::new(0)),
}
}
pub fn get_position(&self) -> Duration {
let micros = self.position.load(std::sync::atomic::Ordering::Relaxed);
Duration::from_micros(micros)
}
}
impl DriftCompensator {
/// Crée un nouveau compensateur de drift
pub fn new() -> Self {
Self {
drift_measurements: Arc::new(DashMap::new()),
compensations: Arc::new(DashMap::new()),
config: DriftCompensatorConfig {
measurement_window_size: 20,
min_measurements_for_compensation: 5,
max_playback_rate_adjustment: 0.01, // 1%
compensation_smoothing: 0.8,
},
}
}
/// Calcule le drift pour un client
///
/// Utilise les données de session du listener pour estimer le décalage
/// par rapport à la position maître.
pub async fn calculate_drift(
&self,
listener: &Listener,
master_position: Duration,
rtt: Option<Duration>,
clock_offset_ms: Option<i64>,
) -> Result<f64, AppError> {
// En production, on utilise les données remontées par le client
// via les WebSockets (stockées dans session_data) ou via le protocole SyncPing/Pong
let master_ms = master_position.as_secs_f64() * 1000.0;
// Si on a des mesures précises via SyncPing/Pong (P1-1)
if let Some(offset) = clock_offset_ms {
// Position théorique côté client (si parfaitement synchronisé)
// Client time = Server time + offset
// Mais on veut savoir où est le PLAYBACK par rapport au MAITRE.
// On a besoin de la position de lecture du client.
// Si le client remonte sa position de lecture via WebSocket (session_data)
if let Some(pos_str) = listener.session_data.get("position_ms") {
if let Ok(client_playback_ms) = pos_str.parse::<f64>() {
// Le timestamp absolu n'est pas suffisant, il faut comparer les positions relatives
// DANS LE STREAM.
// Mais attendez, calculate_drift compare la POSITION de lecture (00:01:23)
// Le clock offset sert à corriger les timestamps si on utilisait des timestamps absolus.
// Ici on compare listener.position vs master.position.
// Si on a un RTT, on sait que l'info du client date de RTT/2
let rtt_latency = rtt.map(|d| d.as_secs_f64() * 1000.0).unwrap_or(0.0);
let one_way_delay = rtt_latency / 2.0;
// La position reçue est celle qu'avait le client il y a `one_way_delay` ms
// Donc sa position REELLE actuelle est estimée à :
let estimated_client_playback_ms = client_playback_ms + one_way_delay;
// Drift = Estimated Client - Master
let drift_ms = estimated_client_playback_ms - master_ms;
tracing::trace!(
"Drift calc (Real): ClientReported={:.2}, LatencyCorrection={:.2}, EstClient={:.2}, Master={:.2}, Drift={:.2}, Offset={}",
client_playback_ms, one_way_delay, estimated_client_playback_ms, master_ms, drift_ms, offset
);
// Note: L'offset d'horloge (clock_offset) n'est pas directement utilisé ici
// car on compare des "positions de lecture" (durée depuis le début du morceau)
// et non des timestamps muraux.
// SAUF si le client utilise des timestamps muraux pour dire "j'étais à telle position à tel timestamp".
// Le protocole actuel envoie juste "position_ms".
// L'offset sert surtout à NTP ou si le client disait "À mon heure H, j'étais à P".
// Pour l'instant, P1-1 demande d'intégrer offset mais sans changer tout le message "position".
// On utilisera l'offset pour logging ou future usage, mais la correction latence (RTT/2) est la plus critique ici.
return Ok(drift_ms.max(-5000.0).min(5000.0));
}
}
}
// 1. Tenter de récupérer la position rapportée par le client
let client_position = if let Some(pos_str) = listener.session_data.get("position_ms") {
match pos_str.parse::<u64>() {
Ok(ms) => Duration::from_millis(ms),
Err(_) => return Ok(0.0), // Donnée invalide
}
} else {
// Pas de donnée client disponible
return Ok(0.0);
};
// 2. Calculer le drift (différence entre client et maître)
// Drift > 0 signifie que le client est EN AVANCE
// Drift < 0 signifie que le client est EN RETARD
let client_ms = client_position.as_secs_f64() * 1000.0;
let master_ms = master_position.as_secs_f64() * 1000.0;
let drift_ms = client_ms - master_ms;
// 3. Clamper les valeurs pour éviter les corrections violentes
// On limite à +/- 5 secondes (5000ms) pour rester sain
let clamped_drift = drift_ms.max(-5000.0).min(5000.0);
// 4. Logger les drifts importants (debug seulement)
if clamped_drift.abs() > 100.0 {
tracing::debug!(
"Significant drift detected for {}: {:.2}ms (Client: {:.2}ms, Master: {:.2}ms)",
listener.id,
clamped_drift,
client_ms,
master_ms
);
}
Ok(clamped_drift)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::stream::SyncState;
use std::collections::HashMap;
fn create_test_listener(id: Uuid, position_ms: Option<u64>) -> Listener {
let mut session_data = HashMap::new();
if let Some(pos) = position_ms {
session_data.insert("position_ms".to_string(), pos.to_string());
}
Listener {
id,
user_id: None,
ip_address: "127.0.0.1".to_string(),
user_agent: None,
connected_at: Instant::now(),
current_quality: "high".to_string(),
bandwidth_estimate: 1000,
buffer_health: 1.0,
session_data,
sync_state: SyncState::default(),
}
}
#[tokio::test]
async fn test_drift_calculation_perfect_sync() {
let compensator = DriftCompensator::new();
let listener = create_test_listener(Uuid::new_v4(), Some(1000));
let master_pos = Duration::from_millis(1000);
let drift = compensator
.calculate_drift(&listener, master_pos, None, None)
.await
.unwrap();
assert_eq!(drift, 0.0);
}
#[tokio::test]
async fn test_drift_calculation_client_ahead() {
let compensator = DriftCompensator::new();
let listener = create_test_listener(Uuid::new_v4(), Some(1100)); // +100ms
let master_pos = Duration::from_millis(1000);
let drift = compensator
.calculate_drift(&listener, master_pos, None, None)
.await
.unwrap();
assert_eq!(drift, 100.0);
}
#[tokio::test]
async fn test_drift_calculation_client_behind() {
let compensator = DriftCompensator::new();
let listener = create_test_listener(Uuid::new_v4(), Some(900)); // -100ms
let master_pos = Duration::from_millis(1000);
let drift = compensator
.calculate_drift(&listener, master_pos, None, None)
.await
.unwrap();
assert_eq!(drift, -100.0);
}
#[tokio::test]
async fn test_drift_calculation_clamping() {
let compensator = DriftCompensator::new();
let listener = create_test_listener(Uuid::new_v4(), Some(10000)); // +9000ms ahead
let master_pos = Duration::from_millis(1000);
let drift = compensator
.calculate_drift(&listener, master_pos, None, None)
.await
.unwrap();
assert_eq!(drift, 5000.0); // Clamped to +5000
}
#[tokio::test]
async fn test_drift_calculation_no_data() {
let compensator = DriftCompensator::new();
let listener = create_test_listener(Uuid::new_v4(), None);
let master_pos = Duration::from_millis(1000);
let drift = compensator
.calculate_drift(&listener, master_pos, None, None)
.await
.unwrap();
assert_eq!(drift, 0.0);
}
#[tokio::test]
async fn test_drift_calculation_with_real_latency() {
let compensator = DriftCompensator::new();
// Client plays at 1000ms.
let listener = create_test_listener(Uuid::new_v4(), Some(1000));
let master_pos = Duration::from_millis(1000);
// RTT is 200ms => One way delay is 100ms.
// So client ACTUAL position is 1000 + 100 = 1100ms.
// Master is at 1000ms.
// Drift should be 1100 - 1000 = +100ms.
let rtt = Some(Duration::from_millis(200));
let offset = Some(0); // Offset doesn't affect relative stream position drift logic directly in current impl
let drift = compensator
.calculate_drift(&listener, master_pos, rtt, offset)
.await
.unwrap();
assert_eq!(drift, 100.0);
}
}
#[cfg(test)]
mod transport_tests {
use super::*;
use std::sync::Mutex;
#[derive(Debug)]
struct MockTransport {
pub sent_adjustments: Arc<Mutex<Vec<(Uuid, SyncAdjustment)>>>,
pub sent_pings: Arc<Mutex<Vec<Uuid>>>,
pub mock_stats: Arc<Mutex<HashMap<Uuid, (Option<Duration>, Option<i64>)>>>,
// Ignore validation of these for now
pub sent_inits: Arc<Mutex<Vec<Uuid>>>,
pub sent_stables: Arc<Mutex<Vec<Uuid>>>,
}
impl SyncTransport for MockTransport {
fn send_adjustment<'a>(
&'a self,
client_id: Uuid,
adjustment: SyncAdjustment,
) -> BoxFuture<'a, Result<(), AppError>> {
let sent = self.sent_adjustments.clone();
Box::pin(async move {
sent.lock().unwrap().push((client_id, adjustment));
Ok(())
})
}
fn send_ping<'a>(&'a self, client_id: Uuid) -> BoxFuture<'a, Result<(), AppError>> {
let sent = self.sent_pings.clone();
Box::pin(async move {
sent.lock().unwrap().push(client_id);
Ok(())
})
}
fn get_connection_stats<'a>(
&'a self,
client_id: Uuid,
) -> BoxFuture<'a, Result<(Option<Duration>, Option<i64>), AppError>> {
let stats = self.mock_stats.clone();
Box::pin(async move {
let guard = stats.lock().unwrap();
Ok(guard.get(&client_id).cloned().unwrap_or((None, None)))
})
}
fn send_init<'a>(
&'a self,
client_id: Uuid,
_session_id: Uuid,
_track_id: String,
_server_timestamp_ms: u64,
_position_ms: u64,
) -> BoxFuture<'a, Result<(), AppError>> {
let sent = self.sent_inits.clone();
Box::pin(async move {
sent.lock().unwrap().push(client_id);
Ok(())
})
}
fn send_stable<'a>(
&'a self,
client_id: Uuid,
_session_id: Uuid,
) -> BoxFuture<'a, Result<(), AppError>> {
let sent = self.sent_stables.clone();
Box::pin(async move {
sent.lock().unwrap().push(client_id);
Ok(())
})
}
}
#[tokio::test]
async fn test_apply_sync_adjustment_sends_via_transport() {
let sent = Arc::new(Mutex::new(Vec::new()));
let pings = Arc::new(Mutex::new(Vec::new()));
let stats = Arc::new(Mutex::new(HashMap::new()));
let transport = Arc::new(MockTransport {
sent_adjustments: sent.clone(),
sent_pings: pings.clone(),
mock_stats: stats.clone(),
sent_inits: Arc::new(Mutex::new(Vec::new())),
sent_stables: Arc::new(Mutex::new(Vec::new())),
});
let config = Arc::new(RwLock::new(SyncConfig::default()));
let time_server = Arc::new(TimeServer::new(vec![]).await.unwrap());
let drift_compensator = Arc::new(DriftCompensator::new());
let engine = SyncEngine::new(time_server, drift_compensator, config, Some(transport));
let client_id = Uuid::new_v4();
let adjustment = SyncAdjustment {
timestamp_offset: Duration::from_millis(100),
playback_rate: 1.0,
buffer_target: 100,
quality_switch: None,
sync_point: SyncPoint {
stream_position: Duration::ZERO,
server_timestamp: 0,
sequence_number: 0,
checksum: 0,
},
};
engine
.apply_sync_adjustment(client_id, adjustment.clone())
.await
.unwrap();
let sent_guard = sent.lock().unwrap();
assert_eq!(sent_guard.len(), 1);
assert_eq!(sent_guard[0].0, client_id);
}
#[tokio::test]
async fn test_sync_state_machine_initialization() {
// Setup
let sent_inits = Arc::new(Mutex::new(Vec::new()));
let transport = Arc::new(MockTransport {
sent_adjustments: Arc::new(Mutex::new(Vec::new())),
sent_pings: Arc::new(Mutex::new(Vec::new())),
mock_stats: Arc::new(Mutex::new(HashMap::new())),
sent_inits: sent_inits.clone(),
sent_stables: Arc::new(Mutex::new(Vec::new())),
});
let config = Arc::new(RwLock::new(SyncConfig::default()));
let time_server = Arc::new(TimeServer::new(vec![]).await.unwrap());
let drift_compensator = Arc::new(DriftCompensator::new());
let engine = SyncEngine::new(time_server, drift_compensator, config, Some(transport));
let stream_id = Uuid::new_v4();
let listener_id = Uuid::new_v4();
let mut listener = Listener {
id: listener_id,
user_id: None,
ip_address: "127.0.0.1".to_string(),
user_agent: None,
connected_at: std::time::Instant::now(),
current_quality: "high".to_string(),
bandwidth_estimate: 1000,
buffer_health: 1.0,
session_data: HashMap::new(),
sync_state: SyncState::Desynchronized,
};
if let Some(pos) = Some(0) {
listener
.session_data
.insert("position_ms".to_string(), pos.to_string());
}
// Mock listeners map
let listeners = Arc::new(DashMap::new());
listeners.insert(listener_id, listener);
// Run sync
let track_id = Some("test_track".to_string());
let result = engine
.sync_listeners(stream_id, track_id, listeners.clone())
.await;
assert!(result.is_ok());
// Verify SyncInit sent
let inits = sent_inits.lock().unwrap();
assert_eq!(inits.len(), 1);
assert_eq!(inits[0], listener_id);
}
}