veza/veza-stream-server/src/auth/mod.rs
senke 7af9c98a73 style(stream-server): apply rustfmt and fix golangci-lint v2 install
Two fixes surfaced by run #55:

1. veza-stream-server (47 files): cargo fmt had been run locally but
   never committed — the working tree was clean locally while HEAD
   had unformatted code. CI's `cargo fmt -- --check` caught the drift.
   This commit lands the formatting that was already staged.

2. ci.yml Install Go tools: `go install .../cmd/golangci-lint@latest`
   resolves to v1.64.8 (the old /cmd/ module path). The repo's
   .golangci.yml is v2-format, so v1 refuses with:
     "you are using a configuration file for golangci-lint v2
      with golangci-lint v1: please use golangci-lint v2"
   Switch to the /v2/cmd/ path so @latest actually gets v2.x.
2026-04-14 15:30:32 +02:00

825 lines
27 KiB
Rust

use axum::{
extract::{Request, State},
http::{HeaderMap, StatusCode},
middleware::Next,
response::Response,
Json,
};
use jsonwebtoken::{decode, encode, Algorithm, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};
use std::fs;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
// Note: Use tracing::debug! macro directly instead of importing
use crate::config::Config;
pub mod revocation_store;
pub mod token_validator;
use crate::auth::revocation_store::{InMemoryRevocationStore, SessionRevocationStore};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Claims {
pub sub: String, // Subject (user ID) - Changed to String for UUID
pub username: String, // Username
pub email: Option<String>,
pub roles: Vec<Role>,
pub permissions: Vec<Permission>,
pub exp: u64, // Expiration time
pub iat: u64, // Issued at
pub iss: String, // Issuer
pub aud: String, // Audience
pub session_id: String, // Session ID pour la révocation
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Role {
Admin,
Moderator,
User,
Premium,
Artist,
Guest,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Permission {
// Permissions de streaming
StreamAudio,
StreamHighQuality,
StreamUnlimited,
// Permissions de contenu
UploadAudio,
DeleteAudio,
ModifyMetadata,
// Permissions administratives
ViewAnalytics,
ManageUsers,
SystemAdmin,
// Permissions sociales
CreatePlaylists,
ShareContent,
Comment,
Like,
// Permissions avancées
AccessAPI,
ManageSubscriptions,
ViewReports,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoginRequest {
pub username: String,
pub password: String,
pub remember_me: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoginResponse {
pub access_token: String,
pub refresh_token: String,
pub token_type: String,
pub expires_in: u64,
pub user_info: UserInfo,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserInfo {
pub id: String, // Changed to String for UUID
pub username: String,
pub email: Option<String>,
pub roles: Vec<Role>,
pub permissions: Vec<Permission>,
pub subscription_tier: SubscriptionTier,
pub created_at: u64,
pub last_login: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SubscriptionTier {
Free,
Premium,
Artist,
Enterprise,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RefreshTokenRequest {
pub refresh_token: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenValidationResult {
pub valid: bool,
pub claims: Option<Claims>,
pub error: Option<String>,
}
pub struct AuthManager {
config: Arc<Config>,
encoding_key: Option<EncodingKey>,
/// v0.9.1 RS256: verify tokens signed by backend with RSA
decoding_key_rs256: Option<DecodingKey>,
validation_rs256: Validation,
/// HS256 fallback (dev / transition)
decoding_key_hs256: Option<DecodingKey>,
validation_hs256: Validation,
revocation_store: Arc<dyn SessionRevocationStore>,
}
impl AuthManager {
pub fn new(config: Arc<Config>) -> Result<Self, AuthError> {
Self::with_revocation_store(config, Arc::new(InMemoryRevocationStore::new()))
}
pub fn with_revocation_store(
config: Arc<Config>,
store: Arc<dyn SessionRevocationStore>,
) -> Result<Self, AuthError> {
let mut validation_rs256 = Validation::new(Algorithm::RS256);
validation_rs256.set_audience(&["veza-services"]);
validation_rs256.set_issuer(&["veza-platform"]);
let mut validation_hs256 = Validation::new(Algorithm::HS256);
validation_hs256.set_audience(&["veza-services"]);
validation_hs256.set_issuer(&["veza-platform"]);
let decoding_key_rs256 = config
.security
.jwt_public_key_path
.as_ref()
.map(|path| {
let pem = fs::read_to_string(path).map_err(|e| {
AuthError::ConfigurationError(format!(
"Failed to read JWT public key from {}: {}",
path, e
))
})?;
DecodingKey::from_rsa_pem(pem.as_bytes()).map_err(|e| {
AuthError::ConfigurationError(format!("Invalid RSA public key: {}", e))
})
})
.transpose()?;
let (encoding_key, decoding_key_hs256) =
if let Some(ref secret) = config.security.jwt_secret {
(
Some(EncodingKey::from_secret(secret.as_bytes())),
Some(DecodingKey::from_secret(secret.as_bytes())),
)
} else {
(None, None)
};
if decoding_key_rs256.is_none() && decoding_key_hs256.is_none() {
return Err(AuthError::ConfigurationError(
"JWT_PUBLIC_KEY_PATH or JWT_SECRET must be configured".to_string(),
));
}
Ok(Self {
config,
encoding_key,
decoding_key_rs256,
validation_rs256,
decoding_key_hs256,
validation_hs256,
revocation_store: store,
})
}
/// Authenticate user. Stream server does NOT support direct login.
/// Users must obtain JWT from backend API (/api/v1/auth/login). This endpoint
/// always returns InvalidCredentials for security.
pub async fn authenticate_user(
&self,
_username: &str,
_password: &str,
) -> Result<UserInfo, AuthError> {
Err(AuthError::InvalidCredentials)
}
pub async fn generate_tokens(
&self,
user_info: &UserInfo,
remember_me: bool,
) -> Result<(String, String), AuthError> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let session_id = uuid::Uuid::new_v4().to_string();
// Durée d'expiration selon remember_me
let expires_in = if remember_me {
self.config.security.jwt_expiration.as_secs() * 7 // 7 fois plus long
} else {
self.config.security.jwt_expiration.as_secs()
};
let claims = Claims {
sub: user_info.id.clone(), // Changed to String
username: user_info.username.clone(),
email: user_info.email.clone(),
roles: user_info.roles.clone(),
permissions: user_info.permissions.clone(),
exp: now + expires_in,
iat: now,
iss: "stream_server".to_string(),
aud: "stream_server".to_string(),
session_id: session_id.clone(),
};
let enc_key = self
.encoding_key
.as_ref()
.ok_or(AuthError::ConfigurationError(
"JWT_SECRET required for token generation (use RS256 for validation only)"
.to_string(),
))?;
let access_token = encode(&Header::default(), &claims, enc_key)
.map_err(|e| AuthError::TokenGenerationError(e.to_string()))?;
// Refresh token avec une durée plus longue
let refresh_claims = Claims {
exp: now + (expires_in * 2), // 2x plus long que l'access token
..claims.clone()
};
let refresh_token = encode(&Header::default(), &refresh_claims, enc_key)
.map_err(|e| AuthError::TokenGenerationError(e.to_string()))?;
Ok((access_token, refresh_token))
}
pub async fn validate_token(&self, token: &str) -> TokenValidationResult {
// v0.9.1: try RS256 first, then HS256 fallback
let mut last_err = None;
if let Some(ref key) = self.decoding_key_rs256 {
match decode::<Claims>(token, key, &self.validation_rs256) {
Ok(token_data) => {
let claims = token_data.claims;
if self.revocation_store.is_revoked(&claims.session_id).await {
return TokenValidationResult {
valid: false,
claims: None,
error: Some("Token has been revoked".to_string()),
};
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
if claims.exp < now {
return TokenValidationResult {
valid: false,
claims: None,
error: Some("Token has expired".to_string()),
};
}
return TokenValidationResult {
valid: true,
claims: Some(claims),
error: None,
};
}
Err(e) => last_err = Some(e.to_string()),
}
}
if let Some(ref key) = self.decoding_key_hs256 {
match decode::<Claims>(token, key, &self.validation_hs256) {
Ok(token_data) => {
let claims = token_data.claims;
if self.revocation_store.is_revoked(&claims.session_id).await {
return TokenValidationResult {
valid: false,
claims: None,
error: Some("Token has been revoked".to_string()),
};
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
if claims.exp < now {
return TokenValidationResult {
valid: false,
claims: None,
error: Some("Token has expired".to_string()),
};
}
return TokenValidationResult {
valid: true,
claims: Some(claims),
error: None,
};
}
Err(e) => last_err = Some(e.to_string()),
}
}
TokenValidationResult {
valid: false,
claims: None,
error: last_err.or_else(|| Some("No JWT verification key configured".to_string())),
}
}
pub async fn refresh_token(&self, refresh_token: &str) -> Result<(String, String), AuthError> {
let validation_result = self.validate_token(refresh_token).await;
if !validation_result.valid {
return Err(AuthError::InvalidToken(
validation_result
.error
.unwrap_or("Invalid refresh token".to_string()),
));
}
let claims = validation_result
.claims
.ok_or(AuthError::InvalidToken("No claims in token".to_string()))?;
// Créer un nouveau UserInfo à partir des claims
let user_info = UserInfo {
id: claims.sub,
username: claims.username,
email: claims.email,
roles: claims.roles,
permissions: claims.permissions,
subscription_tier: SubscriptionTier::Free, // À déterminer selon la logique métier
created_at: claims.iat,
last_login: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0),
};
// Révoquer l'ancien token
self.revoke_token(&claims.session_id).await;
// Générer de nouveaux tokens
self.generate_tokens(&user_info, false).await
}
pub async fn revoke_token(&self, session_id: &str) {
let _ = self.revocation_store.revoke(session_id).await;
}
pub fn has_permission(&self, claims: &Claims, required_permission: Permission) -> bool {
claims.permissions.contains(&required_permission)
}
pub fn has_role(&self, claims: &Claims, required_role: Role) -> bool {
claims.roles.contains(&required_role)
}
pub fn has_any_role(&self, claims: &Claims, required_roles: &[Role]) -> bool {
claims
.roles
.iter()
.any(|role| required_roles.contains(role))
}
pub async fn login(&self, request: LoginRequest) -> Result<LoginResponse, AuthError> {
let user_info = self
.authenticate_user(&request.username, &request.password)
.await?;
let (access_token, refresh_token) = self
.generate_tokens(&user_info, request.remember_me.unwrap_or(false))
.await?;
Ok(LoginResponse {
access_token,
refresh_token,
token_type: "Bearer".to_string(),
expires_in: self.config.security.jwt_expiration.as_secs(),
user_info,
})
}
}
impl Clone for AuthManager {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
encoding_key: self.encoding_key.clone(),
decoding_key_rs256: self.decoding_key_rs256.clone(),
validation_rs256: self.validation_rs256.clone(),
decoding_key_hs256: self.decoding_key_hs256.clone(),
validation_hs256: self.validation_hs256.clone(),
revocation_store: self.revocation_store.clone(),
}
}
}
// Middleware d'authentification
pub async fn auth_middleware(
State(auth_manager): State<Arc<AuthManager>>,
mut request: Request,
next: Next,
) -> Result<Response, StatusCode> {
let headers = request.headers();
let token = extract_token_from_headers(headers).ok_or(StatusCode::UNAUTHORIZED)?;
let validation_result = auth_manager.validate_token(&token).await;
if !validation_result.valid {
tracing::warn!("Token validation failed: {:?}", validation_result.error);
return Err(StatusCode::UNAUTHORIZED);
}
let claims = validation_result.claims.ok_or(StatusCode::UNAUTHORIZED)?;
// Ajouter les claims à la requête pour les handlers suivants
request.extensions_mut().insert(claims);
Ok(next.run(request).await)
}
// Middleware de vérification des permissions
pub fn require_permission(
required_permission: Permission,
) -> impl Fn(
Request,
Next,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<Response, StatusCode>> + Send>,
> + Clone {
move |request: Request, next: Next| {
let required_permission = required_permission.clone();
Box::pin(async move {
let claims = request
.extensions()
.get::<Claims>()
.ok_or(StatusCode::UNAUTHORIZED)?;
if !claims.permissions.contains(&required_permission) {
tracing::warn!(
"User {} lacks required permission: {:?}",
claims.username,
required_permission
);
return Err(StatusCode::FORBIDDEN);
}
Ok(next.run(request).await)
})
}
}
// Middleware de vérification des rôles
pub fn require_role(
required_role: Role,
) -> impl Fn(
Request,
Next,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<Response, StatusCode>> + Send>,
> + Clone {
move |request: Request, next: Next| {
let required_role = required_role.clone();
Box::pin(async move {
let claims = request
.extensions()
.get::<Claims>()
.ok_or(StatusCode::UNAUTHORIZED)?;
if !claims.roles.contains(&required_role) {
tracing::warn!(
"User {} lacks required role: {:?}",
claims.username,
required_role
);
return Err(StatusCode::FORBIDDEN);
}
Ok(next.run(request).await)
})
}
}
pub(crate) fn extract_token_from_headers(headers: &HeaderMap) -> Option<String> {
let auth_header = headers.get("Authorization")?;
let auth_str = auth_header.to_str().ok()?;
if auth_str.starts_with("Bearer ") {
Some(auth_str[7..].to_string())
} else {
None
}
}
/// Extracts JWT from request: Authorization header (priority) or ?token= query param.
/// Used for HLS endpoints where clients may pass token via header or query.
pub(crate) fn extract_token_from_request(
headers: &HeaderMap,
uri: &axum::http::Uri,
) -> Option<String> {
extract_token_from_headers(headers).or_else(|| {
uri.query().and_then(|q| {
url::form_urlencoded::parse(q.as_bytes())
.find(|(k, _)| k == "token")
.map(|(_, v)| v.into_owned())
})
})
}
/// Middleware for HLS routes: requires valid JWT (Bearer header or ?token= query).
/// Audit 1.3 P0: Protect HLS endpoints from unauthorized access.
pub async fn hls_auth_middleware(
State(state): State<crate::AppState>,
mut request: Request,
next: Next,
) -> Result<Response, StatusCode> {
let auth_manager = &state.auth_manager;
let token = extract_token_from_request(request.headers(), request.uri()).ok_or_else(|| {
tracing::warn!("HLS request rejected: no token (Authorization or ?token=)");
StatusCode::UNAUTHORIZED
})?;
let validation_result = auth_manager.validate_token(&token).await;
if !validation_result.valid {
tracing::warn!("HLS token validation failed: {:?}", validation_result.error);
return Err(StatusCode::UNAUTHORIZED);
}
if let Some(claims) = validation_result.claims {
request.extensions_mut().insert(claims);
}
Ok(next.run(request).await)
}
// Handlers pour les routes d'authentification
pub async fn login_handler(
State(auth_manager): State<Arc<AuthManager>>,
Json(request): Json<LoginRequest>,
) -> Result<Json<LoginResponse>, (StatusCode, String)> {
match auth_manager.login(request).await {
Ok(response) => Ok(Json(response)),
Err(e) => {
tracing::error!("Login failed: {:?}", e);
match e {
AuthError::InvalidCredentials => {
Err((StatusCode::UNAUTHORIZED, "Invalid credentials".to_string()))
}
AuthError::TokenGenerationError(msg) => {
Err((StatusCode::INTERNAL_SERVER_ERROR, msg))
}
_ => Err((
StatusCode::INTERNAL_SERVER_ERROR,
"Authentication failed".to_string(),
)),
}
}
}
}
pub async fn refresh_handler(
State(auth_manager): State<Arc<AuthManager>>,
Json(request): Json<RefreshTokenRequest>,
) -> Result<Json<LoginResponse>, (StatusCode, String)> {
match auth_manager.refresh_token(&request.refresh_token).await {
Ok((access_token, refresh_token)) => {
// Valider le refresh token pour récupérer les claims
let validation_result = auth_manager.validate_token(&request.refresh_token).await;
let claims = validation_result.claims.ok_or((
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to extract claims".to_string(),
))?;
// Créer UserInfo à partir des claims avec i64 aligned
let user_info = UserInfo {
id: claims.sub, // ALIGNED WITH i64 - use claims.sub directly
username: claims.username.clone(),
email: claims.email.clone(),
roles: claims.roles.clone(),
permissions: claims.permissions.clone(),
subscription_tier: SubscriptionTier::Free,
created_at: claims.iat,
last_login: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0),
};
let response = LoginResponse {
access_token,
refresh_token,
token_type: "Bearer".to_string(),
expires_in: auth_manager.config.security.jwt_expiration.as_secs(),
user_info,
};
Ok(Json(response))
}
Err(e) => {
tracing::error!("Token refresh failed: {:?}", e);
Err((StatusCode::UNAUTHORIZED, "Token refresh failed".to_string()))
}
}
}
pub async fn logout_handler(
State(auth_manager): State<Arc<AuthManager>>,
request: Request,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
if let Some(claims) = request.extensions().get::<Claims>() {
auth_manager.revoke_token(&claims.session_id).await;
tracing::debug!("User {} logged out", claims.username);
}
Ok(Json(serde_json::json!({
"message": "Successfully logged out"
})))
}
pub async fn user_info_handler(request: Request) -> Result<Json<Claims>, StatusCode> {
let claims = request
.extensions()
.get::<Claims>()
.ok_or(StatusCode::UNAUTHORIZED)?;
Ok(Json(claims.clone()))
}
#[derive(Debug, thiserror::Error)]
pub enum AuthError {
#[error("Invalid credentials")]
InvalidCredentials,
#[error("Invalid token: {0}")]
InvalidToken(String),
#[error("Token generation error: {0}")]
TokenGenerationError(String),
#[error("Configuration error: {0}")]
ConfigurationError(String),
#[error("Database error: {0}")]
DatabaseError(String),
}
#[cfg(test)]
mod tests {
use super::*;
use axum::http::{HeaderMap, HeaderValue, Uri};
#[test]
fn extract_token_from_request_prefers_authorization_header() {
let mut headers = HeaderMap::new();
headers.insert(
"Authorization",
HeaderValue::from_static("Bearer my-jwt-token"),
);
let uri = "/hls/123/master.m3u8".parse::<Uri>().unwrap();
let token = extract_token_from_request(&headers, &uri);
assert_eq!(token.as_deref(), Some("my-jwt-token"));
}
#[test]
fn extract_token_from_request_fallback_to_query_param() {
let headers = HeaderMap::new();
let uri = "/hls/123/master.m3u8?token=query-jwt-token"
.parse::<Uri>()
.unwrap();
let token = extract_token_from_request(&headers, &uri);
assert_eq!(token.as_deref(), Some("query-jwt-token"));
}
#[test]
fn extract_token_from_request_header_overrides_query() {
let mut headers = HeaderMap::new();
headers.insert(
"Authorization",
HeaderValue::from_static("Bearer header-token"),
);
let uri = "/hls/123/master.m3u8?token=query-token"
.parse::<Uri>()
.unwrap();
let token = extract_token_from_request(&headers, &uri);
assert_eq!(token.as_deref(), Some("header-token"));
}
#[test]
fn extract_token_from_request_returns_none_when_missing() {
let headers = HeaderMap::new();
let uri = "/hls/123/master.m3u8".parse::<Uri>().unwrap();
let token = extract_token_from_request(&headers, &uri);
assert!(token.is_none());
}
#[test]
fn extract_token_from_request_query_param_only() {
let headers = HeaderMap::new();
let uri = "/stream/123?token=query-only-token".parse::<Uri>().unwrap();
let token = extract_token_from_request(&headers, &uri);
assert_eq!(token.as_deref(), Some("query-only-token"));
}
#[test]
fn test_auth_error_display() {
assert_eq!(
format!("{}", AuthError::InvalidCredentials),
"Invalid credentials"
);
assert!(format!("{}", AuthError::InvalidToken("bad".to_string())).contains("bad"));
}
#[test]
fn test_auth_error_configuration() {
let err = AuthError::ConfigurationError("missing config".to_string());
assert!(format!("{}", err).contains("missing config"));
}
#[test]
fn test_has_permission() {
let config = Arc::new(Config::default());
let manager = AuthManager::new(config).unwrap();
let claims = Claims {
sub: "user-1".to_string(),
username: "test".to_string(),
email: None,
roles: vec![Role::User],
permissions: vec![Permission::StreamAudio, Permission::UploadAudio],
exp: 9999999999,
iat: 0,
iss: "test".to_string(),
aud: "test".to_string(),
session_id: "sess-1".to_string(),
};
assert!(manager.has_permission(&claims, Permission::StreamAudio));
assert!(!manager.has_permission(&claims, Permission::SystemAdmin));
}
#[test]
fn test_has_role() {
let config = Arc::new(Config::default());
let manager = AuthManager::new(config).unwrap();
let claims = Claims {
sub: "user-1".to_string(),
username: "admin".to_string(),
email: None,
roles: vec![Role::Admin, Role::User],
permissions: vec![],
exp: 9999999999,
iat: 0,
iss: "test".to_string(),
aud: "test".to_string(),
session_id: "sess-1".to_string(),
};
assert!(manager.has_role(&claims, Role::Admin));
assert!(!manager.has_role(&claims, Role::Moderator));
}
#[test]
fn test_has_any_role() {
let config = Arc::new(Config::default());
let manager = AuthManager::new(config).unwrap();
let claims = Claims {
sub: "user-1".to_string(),
username: "artist".to_string(),
email: None,
roles: vec![Role::Artist],
permissions: vec![],
exp: 9999999999,
iat: 0,
iss: "test".to_string(),
aud: "test".to_string(),
session_id: "sess-1".to_string(),
};
assert!(manager.has_any_role(&claims, &[Role::Admin, Role::Artist]));
assert!(!manager.has_any_role(&claims, &[Role::Admin, Role::Moderator]));
}
#[test]
fn test_extract_token_from_headers() {
let mut headers = HeaderMap::new();
headers.insert("Authorization", HeaderValue::from_static("Bearer token123"));
let token = extract_token_from_headers(&headers);
assert_eq!(token.as_deref(), Some("token123"));
}
#[test]
fn test_extract_token_from_headers_no_bearer() {
let mut headers = HeaderMap::new();
headers.insert("Authorization", HeaderValue::from_static("Basic xyz"));
let token = extract_token_from_headers(&headers);
assert!(token.is_none());
}
#[test]
fn test_extract_token_from_headers_missing() {
let headers = HeaderMap::new();
let token = extract_token_from_headers(&headers);
assert!(token.is_none());
}
}