diff --git a/veza-stream-server/Cargo.toml b/veza-stream-server/Cargo.toml index 1e06f4ca2..ca1c2aec5 100644 --- a/veza-stream-server/Cargo.toml +++ b/veza-stream-server/Cargo.toml @@ -18,7 +18,7 @@ path = "src/simple_stream_server.rs" [dependencies] # Core web framework -axum = { version = "0.7", features = ["macros", "multipart", "ws"] } +axum = { version = "0.8", features = ["macros", "multipart", "ws"] } axum-tungstenite = "0.1" # ALIGNED WITH ORIGIN: Tokio 1.35 (was 1.0) tokio = { version = "1.35", features = ["full"] } @@ -96,11 +96,12 @@ uuid = { version = "1.6", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } thiserror = "1.0" anyhow = "1.0" +async-trait = "0.1" # Cryptography and security sha2 = "0.10" hmac = "0.12" -jsonwebtoken = "9.2" +jsonwebtoken = { version = "10", features = ["aws_lc_rs"] } bcrypt = "0.15" ring = "0.17" diff --git a/veza-stream-server/src/auth/mod.rs b/veza-stream-server/src/auth/mod.rs index 5375f6388..a81ccac95 100644 --- a/veza-stream-server/src/auth/mod.rs +++ b/veza-stream-server/src/auth/mod.rs @@ -7,14 +7,16 @@ use axum::{ }; use jsonwebtoken::{decode, encode, Algorithm, DecodingKey, EncodingKey, Header, Validation}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; // Note: Use tracing::debug! macro directly instead of importing use crate::config::Config; +pub mod revocation_store; pub mod token_validator; +use crate::auth::revocation_store::{InMemoryRevocationStore, SessionRevocationStore}; + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Claims { pub sub: String, // Subject (user ID) - Changed to String for UUID @@ -121,11 +123,18 @@ pub struct AuthManager { encoding_key: EncodingKey, decoding_key: DecodingKey, validation: Validation, - revoked_tokens: Arc>>, // session_id -> revocation_time + revocation_store: Arc, } impl AuthManager { pub fn new(config: Arc) -> Result { + Self::with_revocation_store(config, Arc::new(InMemoryRevocationStore::new())) + } + + pub fn with_revocation_store( + config: Arc, + store: Arc, + ) -> Result { let jwt_secret = config .security @@ -147,7 +156,7 @@ impl AuthManager { encoding_key, decoding_key, validation, - revoked_tokens: Arc::new(tokio::sync::RwLock::new(HashMap::new())), + revocation_store: store, }) } @@ -214,8 +223,7 @@ impl AuthManager { let claims = token_data.claims; // Vérifier si le token est révoqué - let revoked_tokens = self.revoked_tokens.read().await; - if revoked_tokens.contains_key(&claims.session_id) { + if self.revocation_store.is_revoked(&claims.session_id).await { return TokenValidationResult { valid: false, claims: None, @@ -286,16 +294,7 @@ impl AuthManager { } pub async fn revoke_token(&self, session_id: &str) { - let mut revoked_tokens = self.revoked_tokens.write().await; - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_secs()) - .unwrap_or(0); - revoked_tokens.insert(session_id.to_string(), now); - - // Nettoyer les anciens tokens révoqués (plus de 24h) - let cutoff = now - (24 * 3600); - revoked_tokens.retain(|_, &mut revocation_time| revocation_time > cutoff); + let _ = self.revocation_store.revoke(session_id).await; } pub fn has_permission(&self, claims: &Claims, required_permission: Permission) -> bool { @@ -338,7 +337,7 @@ impl Clone for AuthManager { encoding_key: self.encoding_key.clone(), decoding_key: self.decoding_key.clone(), validation: self.validation.clone(), - revoked_tokens: self.revoked_tokens.clone(), + revocation_store: self.revocation_store.clone(), } } } diff --git a/veza-stream-server/src/auth/revocation_store.rs b/veza-stream-server/src/auth/revocation_store.rs new file mode 100644 index 000000000..5d3d1b724 --- /dev/null +++ b/veza-stream-server/src/auth/revocation_store.rs @@ -0,0 +1,125 @@ +//! Store de révocation de sessions (in-memory ou Redis) +//! +//! Persiste la blacklist des sessions révoquées pour survivre aux redémarrages. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::RwLock; + +/// Trait pour le stockage des sessions révoquées (par session_id) +#[async_trait::async_trait] +pub trait SessionRevocationStore: Send + Sync { + /// Vérifie si une session est révoquée + async fn is_revoked(&self, session_id: &str) -> bool; + + /// Marque une session comme révoquée + async fn revoke(&self, session_id: &str) -> Result<(), String>; +} + +/// Store en mémoire (fallback si Redis indisponible) +pub struct InMemoryRevocationStore { + revoked: Arc>>, +} + +impl InMemoryRevocationStore { + pub fn new() -> Self { + Self { + revoked: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Nettoie les entrées expirées (plus de 24h) + async fn cleanup(&self) { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + let cutoff = now.saturating_sub(24 * 3600); + let mut revoked = self.revoked.write().await; + revoked.retain(|_, &mut t| t > cutoff); + } +} + +impl Default for InMemoryRevocationStore { + fn default() -> Self { + Self::new() + } +} + +#[async_trait::async_trait] +impl SessionRevocationStore for InMemoryRevocationStore { + async fn is_revoked(&self, session_id: &str) -> bool { + let revoked = self.revoked.read().await; + revoked.contains_key(session_id) + } + + async fn revoke(&self, session_id: &str) -> Result<(), String> { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + let mut revoked = self.revoked.write().await; + revoked.insert(session_id.to_string(), now); + drop(revoked); + self.cleanup().await; + Ok(()) + } +} + +/// Store Redis (persistant) +pub struct RedisRevocationStore { + client: redis::Client, + key_prefix: String, +} + +impl RedisRevocationStore { + pub async fn new(redis_url: &str) -> Result { + let client = redis::Client::open(redis_url).map_err(|e| e.to_string())?; + let mut conn = client + .get_multiplexed_async_connection() + .await + .map_err(|e| e.to_string())?; + let _: Result = redis::cmd("PING").query_async(&mut conn).await; + Ok(Self { + client, + key_prefix: "stream:revoked:".to_string(), + }) + } + + fn key(&self, session_id: &str) -> String { + format!("{}{}", self.key_prefix, session_id) + } +} + +#[async_trait::async_trait] +impl SessionRevocationStore for RedisRevocationStore { + async fn is_revoked(&self, session_id: &str) -> bool { + let mut conn = match self.client.get_multiplexed_async_connection().await { + Ok(c) => c, + Err(_) => return false, + }; + let key = self.key(session_id); + redis::cmd("EXISTS") + .arg(&key) + .query_async::<_, bool>(&mut conn) + .await + .unwrap_or(false) + } + + async fn revoke(&self, session_id: &str) -> Result<(), String> { + let mut conn = self + .client + .get_multiplexed_async_connection() + .await + .map_err(|e| e.to_string())?; + let key = self.key(session_id); + let _: Result<(), _> = redis::cmd("SETEX") + .arg(&key) + .arg(24 * 3600_i64) + .arg("1") + .query_async::<_, ()>(&mut conn) + .await; + Ok(()) + } +} diff --git a/veza-stream-server/src/lib.rs b/veza-stream-server/src/lib.rs index cece448b7..78d367057 100644 --- a/veza-stream-server/src/lib.rs +++ b/veza-stream-server/src/lib.rs @@ -55,6 +55,24 @@ pub struct AppState { pub stream_manager: Arc, } +async fn build_revocation_store(config: &config::Config) -> Arc { + if let Some(ref redis_url) = config.cache.redis_url { + match auth::revocation_store::RedisRevocationStore::new(redis_url).await { + Ok(store) => { + tracing::info!("✅ JWT revocation store: Redis (persistant)"); + return Arc::new(store); + } + Err(e) => { + tracing::warn!( + "⚠️ Redis non disponible pour revocation: {}. Fallback in-memory.", + e + ); + } + } + } + Arc::new(auth::revocation_store::InMemoryRevocationStore::new()) +} + impl AppState { pub async fn new(config: config::Config) -> Result> { let config_arc = Arc::new(config.clone()); @@ -92,8 +110,14 @@ impl AppState { // HealthMonitor needs config and analytics for db check let health_monitor = Arc::new(health::HealthMonitor::new(config_arc.clone(), analytics.clone())); - // AuthManager::new returns Result - let auth_manager = Arc::new(auth::AuthManager::new(config_arc.clone())?); + // Revocation store: Redis si REDIS_URL défini, sinon in-memory + let revocation_store = build_revocation_store(&config).await; + + // AuthManager with revocation store + let auth_manager = Arc::new(auth::AuthManager::with_revocation_store( + config_arc.clone(), + revocation_store, + )?); let notification_service = Arc::new(notifications::NotificationService::new(config_arc.clone()));