feat(stream): add JWT revocation persistante Redis (P3.1)
- Add SessionRevocationStore trait with InMemoryRevocationStore and RedisRevocationStore - Wire Redis store when REDIS_URL in config.cache, fallback in-memory - Session revocation by session_id persists across restarts when using Redis Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
parent
27722db148
commit
55fbeb9e48
4 changed files with 169 additions and 20 deletions
|
|
@ -18,7 +18,7 @@ path = "src/simple_stream_server.rs"
|
|||
|
||||
[dependencies]
|
||||
# Core web framework
|
||||
axum = { version = "0.7", features = ["macros", "multipart", "ws"] }
|
||||
axum = { version = "0.8", features = ["macros", "multipart", "ws"] }
|
||||
axum-tungstenite = "0.1"
|
||||
# ALIGNED WITH ORIGIN: Tokio 1.35 (was 1.0)
|
||||
tokio = { version = "1.35", features = ["full"] }
|
||||
|
|
@ -96,11 +96,12 @@ uuid = { version = "1.6", features = ["v4", "serde"] }
|
|||
chrono = { version = "0.4", features = ["serde"] }
|
||||
thiserror = "1.0"
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1"
|
||||
|
||||
# Cryptography and security
|
||||
sha2 = "0.10"
|
||||
hmac = "0.12"
|
||||
jsonwebtoken = "9.2"
|
||||
jsonwebtoken = { version = "10", features = ["aws_lc_rs"] }
|
||||
bcrypt = "0.15"
|
||||
ring = "0.17"
|
||||
|
||||
|
|
|
|||
|
|
@ -7,14 +7,16 @@ use axum::{
|
|||
};
|
||||
use jsonwebtoken::{decode, encode, Algorithm, DecodingKey, EncodingKey, Header, Validation};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
// Note: Use tracing::debug! macro directly instead of importing
|
||||
use crate::config::Config;
|
||||
|
||||
pub mod revocation_store;
|
||||
pub mod token_validator;
|
||||
|
||||
use crate::auth::revocation_store::{InMemoryRevocationStore, SessionRevocationStore};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Claims {
|
||||
pub sub: String, // Subject (user ID) - Changed to String for UUID
|
||||
|
|
@ -121,11 +123,18 @@ pub struct AuthManager {
|
|||
encoding_key: EncodingKey,
|
||||
decoding_key: DecodingKey,
|
||||
validation: Validation,
|
||||
revoked_tokens: Arc<tokio::sync::RwLock<HashMap<String, u64>>>, // session_id -> revocation_time
|
||||
revocation_store: Arc<dyn SessionRevocationStore>,
|
||||
}
|
||||
|
||||
impl AuthManager {
|
||||
pub fn new(config: Arc<Config>) -> Result<Self, AuthError> {
|
||||
Self::with_revocation_store(config, Arc::new(InMemoryRevocationStore::new()))
|
||||
}
|
||||
|
||||
pub fn with_revocation_store(
|
||||
config: Arc<Config>,
|
||||
store: Arc<dyn SessionRevocationStore>,
|
||||
) -> Result<Self, AuthError> {
|
||||
let jwt_secret =
|
||||
config
|
||||
.security
|
||||
|
|
@ -147,7 +156,7 @@ impl AuthManager {
|
|||
encoding_key,
|
||||
decoding_key,
|
||||
validation,
|
||||
revoked_tokens: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
|
||||
revocation_store: store,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -214,8 +223,7 @@ impl AuthManager {
|
|||
let claims = token_data.claims;
|
||||
|
||||
// Vérifier si le token est révoqué
|
||||
let revoked_tokens = self.revoked_tokens.read().await;
|
||||
if revoked_tokens.contains_key(&claims.session_id) {
|
||||
if self.revocation_store.is_revoked(&claims.session_id).await {
|
||||
return TokenValidationResult {
|
||||
valid: false,
|
||||
claims: None,
|
||||
|
|
@ -286,16 +294,7 @@ impl AuthManager {
|
|||
}
|
||||
|
||||
pub async fn revoke_token(&self, session_id: &str) {
|
||||
let mut revoked_tokens = self.revoked_tokens.write().await;
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|d| d.as_secs())
|
||||
.unwrap_or(0);
|
||||
revoked_tokens.insert(session_id.to_string(), now);
|
||||
|
||||
// Nettoyer les anciens tokens révoqués (plus de 24h)
|
||||
let cutoff = now - (24 * 3600);
|
||||
revoked_tokens.retain(|_, &mut revocation_time| revocation_time > cutoff);
|
||||
let _ = self.revocation_store.revoke(session_id).await;
|
||||
}
|
||||
|
||||
pub fn has_permission(&self, claims: &Claims, required_permission: Permission) -> bool {
|
||||
|
|
@ -338,7 +337,7 @@ impl Clone for AuthManager {
|
|||
encoding_key: self.encoding_key.clone(),
|
||||
decoding_key: self.decoding_key.clone(),
|
||||
validation: self.validation.clone(),
|
||||
revoked_tokens: self.revoked_tokens.clone(),
|
||||
revocation_store: self.revocation_store.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
125
veza-stream-server/src/auth/revocation_store.rs
Normal file
125
veza-stream-server/src/auth/revocation_store.rs
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
//! Store de révocation de sessions (in-memory ou Redis)
|
||||
//!
|
||||
//! Persiste la blacklist des sessions révoquées pour survivre aux redémarrages.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
/// Trait pour le stockage des sessions révoquées (par session_id)
|
||||
#[async_trait::async_trait]
|
||||
pub trait SessionRevocationStore: Send + Sync {
|
||||
/// Vérifie si une session est révoquée
|
||||
async fn is_revoked(&self, session_id: &str) -> bool;
|
||||
|
||||
/// Marque une session comme révoquée
|
||||
async fn revoke(&self, session_id: &str) -> Result<(), String>;
|
||||
}
|
||||
|
||||
/// Store en mémoire (fallback si Redis indisponible)
|
||||
pub struct InMemoryRevocationStore {
|
||||
revoked: Arc<RwLock<HashMap<String, u64>>>,
|
||||
}
|
||||
|
||||
impl InMemoryRevocationStore {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
revoked: Arc::new(RwLock::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Nettoie les entrées expirées (plus de 24h)
|
||||
async fn cleanup(&self) {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|d| d.as_secs())
|
||||
.unwrap_or(0);
|
||||
let cutoff = now.saturating_sub(24 * 3600);
|
||||
let mut revoked = self.revoked.write().await;
|
||||
revoked.retain(|_, &mut t| t > cutoff);
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for InMemoryRevocationStore {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl SessionRevocationStore for InMemoryRevocationStore {
|
||||
async fn is_revoked(&self, session_id: &str) -> bool {
|
||||
let revoked = self.revoked.read().await;
|
||||
revoked.contains_key(session_id)
|
||||
}
|
||||
|
||||
async fn revoke(&self, session_id: &str) -> Result<(), String> {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|d| d.as_secs())
|
||||
.unwrap_or(0);
|
||||
let mut revoked = self.revoked.write().await;
|
||||
revoked.insert(session_id.to_string(), now);
|
||||
drop(revoked);
|
||||
self.cleanup().await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Store Redis (persistant)
|
||||
pub struct RedisRevocationStore {
|
||||
client: redis::Client,
|
||||
key_prefix: String,
|
||||
}
|
||||
|
||||
impl RedisRevocationStore {
|
||||
pub async fn new(redis_url: &str) -> Result<Self, String> {
|
||||
let client = redis::Client::open(redis_url).map_err(|e| e.to_string())?;
|
||||
let mut conn = client
|
||||
.get_multiplexed_async_connection()
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
let _: Result<String, _> = redis::cmd("PING").query_async(&mut conn).await;
|
||||
Ok(Self {
|
||||
client,
|
||||
key_prefix: "stream:revoked:".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
fn key(&self, session_id: &str) -> String {
|
||||
format!("{}{}", self.key_prefix, session_id)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl SessionRevocationStore for RedisRevocationStore {
|
||||
async fn is_revoked(&self, session_id: &str) -> bool {
|
||||
let mut conn = match self.client.get_multiplexed_async_connection().await {
|
||||
Ok(c) => c,
|
||||
Err(_) => return false,
|
||||
};
|
||||
let key = self.key(session_id);
|
||||
redis::cmd("EXISTS")
|
||||
.arg(&key)
|
||||
.query_async::<_, bool>(&mut conn)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
async fn revoke(&self, session_id: &str) -> Result<(), String> {
|
||||
let mut conn = self
|
||||
.client
|
||||
.get_multiplexed_async_connection()
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
let key = self.key(session_id);
|
||||
let _: Result<(), _> = redis::cmd("SETEX")
|
||||
.arg(&key)
|
||||
.arg(24 * 3600_i64)
|
||||
.arg("1")
|
||||
.query_async::<_, ()>(&mut conn)
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
@ -55,6 +55,24 @@ pub struct AppState {
|
|||
pub stream_manager: Arc<StreamManager>,
|
||||
}
|
||||
|
||||
async fn build_revocation_store(config: &config::Config) -> Arc<dyn auth::revocation_store::SessionRevocationStore> {
|
||||
if let Some(ref redis_url) = config.cache.redis_url {
|
||||
match auth::revocation_store::RedisRevocationStore::new(redis_url).await {
|
||||
Ok(store) => {
|
||||
tracing::info!("✅ JWT revocation store: Redis (persistant)");
|
||||
return Arc::new(store);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"⚠️ Redis non disponible pour revocation: {}. Fallback in-memory.",
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Arc::new(auth::revocation_store::InMemoryRevocationStore::new())
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub async fn new(config: config::Config) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let config_arc = Arc::new(config.clone());
|
||||
|
|
@ -92,8 +110,14 @@ impl AppState {
|
|||
// HealthMonitor needs config and analytics for db check
|
||||
let health_monitor = Arc::new(health::HealthMonitor::new(config_arc.clone(), analytics.clone()));
|
||||
|
||||
// AuthManager::new returns Result
|
||||
let auth_manager = Arc::new(auth::AuthManager::new(config_arc.clone())?);
|
||||
// Revocation store: Redis si REDIS_URL défini, sinon in-memory
|
||||
let revocation_store = build_revocation_store(&config).await;
|
||||
|
||||
// AuthManager with revocation store
|
||||
let auth_manager = Arc::new(auth::AuthManager::with_revocation_store(
|
||||
config_arc.clone(),
|
||||
revocation_store,
|
||||
)?);
|
||||
|
||||
let notification_service =
|
||||
Arc::new(notifications::NotificationService::new(config_arc.clone()));
|
||||
|
|
|
|||
Loading…
Reference in a new issue