veza/veza-chat-server/src/websocket/handler.rs
2026-02-22 03:46:10 +01:00

1231 lines
46 KiB
Rust

//! Handler WebSocket pour le chat server
//!
//! Ce module fournit un handler WebSocket utilisant Axum pour gérer
//! les connexions, déconnexions et le routage des messages de chat.
use axum::extract::ws::{Message, WebSocket};
use axum::extract::{Query, State, WebSocketUpgrade};
use axum::http::HeaderMap;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use futures_util::StreamExt;
use serde_json;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, error, info, info_span, warn, Instrument};
use uuid::Uuid;
use crate::delivered_status::DeliveredStatusManager;
use crate::error::ChatError;
use crate::jwt_manager::{AccessTokenClaims, JwtManager};
use crate::monitoring::ChatMetrics;
use crate::reactions::ReactionsManager;
use crate::read_receipts::ReadReceiptManager;
use crate::repository::MessageRepository;
use crate::security::permission::PermissionService;
use crate::services::MessageEditService;
use crate::typing_indicator::TypingIndicatorManager;
use crate::websocket::{IncomingMessage, OutgoingMessage, WebSocketClient, WebSocketManager};
/// Shared state handed to the WebSocket handler through Axum's `State` extractor.
///
/// The struct derives `Clone`; every service is held behind an `Arc`, so a clone
/// only bumps reference counts and copies the timeout value.
#[derive(Clone)]
pub struct WebSocketState {
    /// Inactivity (heartbeat) timeout in seconds, configurable via CHAT_KEEPALIVE_TIMEOUT_SECS.
    pub keepalive_timeout_secs: u64,
    // NOTE: a former `store: Arc<SimpleMessageStore>` field was replaced by `message_repo`.
    /// Message repository used to create, look up, search and fetch messages.
    pub message_repo: Arc<MessageRepository>,
    /// Records read receipts (used by the `MarkAsRead` action).
    pub read_receipt_manager: Arc<ReadReceiptManager>,
    /// Records delivery status (used by the `Delivered` action).
    pub delivered_status_manager: Arc<DeliveredStatusManager>,
    /// Tracks which users are currently typing in a conversation.
    pub typing_indicator_manager: Arc<TypingIndicatorManager>,
    /// Service performing message edits and deletions.
    pub message_edit_service: Arc<MessageEditService>,
    /// Stores and removes message reactions.
    pub reactions_manager: Arc<ReactionsManager>,
    /// Registry of connected clients; used to broadcast and to relay messages to a user.
    pub ws_manager: Arc<WebSocketManager>,
    /// Validates access tokens during the WebSocket upgrade.
    pub jwt_manager: Arc<JwtManager>,
    /// Authorization checks performed before each conversation action.
    pub permission_service: Arc<PermissionService>,
    /// Metrics sink notified on connection and disconnection.
    pub metrics: Arc<ChatMetrics>,
    /// Per-user rate limiter consulted for rate-limited actions.
    pub rate_limiter: Arc<crate::security::RateLimiter>,
}
/// Extracts the access token from the request.
///
/// Lookup order:
/// 1. the `token` query parameter (`?token=...`), returned as-is when present;
/// 2. the first non-empty `access_token` cookie found in any `Cookie` header.
///
/// Returns `None` when neither source provides a token. `Cookie` headers whose
/// value is not valid visible ASCII are skipped.
fn extract_token(params: &HashMap<String, String>, headers: &HeaderMap) -> Option<String> {
    if let Some(token) = params.get("token") {
        return Some(token.clone());
    }

    // HTTP/2 clients may split cookies across several `Cookie` header fields,
    // so every occurrence is inspected rather than only the first one.
    headers
        .get_all(axum::http::header::COOKIE)
        .iter()
        .filter_map(|value| value.to_str().ok())
        .flat_map(|cookies| cookies.split(';'))
        // `strip_prefix` removes the cookie name exactly once. The previous
        // `trim_start_matches` removed every repetition of the prefix and could
        // therefore corrupt a value that itself starts with "access_token=".
        .filter_map(|pair| pair.trim().strip_prefix("access_token="))
        .map(str::trim)
        .find(|value| !value.is_empty())
        .map(str::to_owned)
}
/// Entry point for WebSocket connection requests.
///
/// Authenticates the request with the token found by `extract_token`, then
/// upgrades the HTTP connection and hands the socket over to `handle_socket`.
/// Responds with `401 Unauthorized` when the token is missing or rejected by
/// the JWT manager.
///
/// The `request_id` is read from the Axum request extensions when available
/// (a fresh UUID is generated otherwise) and attached to the tracing span so
/// that log lines of the upgrade can be correlated.
pub async fn websocket_handler(
    ws: WebSocketUpgrade,
    Query(params): Query<HashMap<String, String>>,
    headers: HeaderMap,
    State(state): State<WebSocketState>,
    request_id: Option<axum::extract::Extension<Uuid>>,
) -> Response {
    // Reuse the correlation id from the extensions, or mint a new one.
    let request_id = match request_id {
        Some(axum::extract::Extension(id)) => id,
        None => Uuid::new_v4(),
    };

    let span = info_span!(
        "websocket_upgrade",
        request_id = %request_id,
    );

    let upgrade = async move {
        info!(request_id = %request_id, "🔌 Nouvelle connexion WebSocket demandée");

        // Guard 1: a token must be present.
        let token = match extract_token(&params, &headers) {
            None => {
                error!(request_id = %request_id, "❌ Token manquant (query ?token= ou cookie access_token)");
                return (StatusCode::UNAUTHORIZED, "Missing token").into_response();
            }
            Some(found) => found,
        };

        // Guard 2: the token must validate.
        let claims = match state.jwt_manager.validate_access_token(&token).await {
            Err(e) => {
                error!(request_id = %request_id, error = %e, "❌ Token invalide");
                return (StatusCode::UNAUTHORIZED, "Invalid token").into_response();
            }
            Ok(valid) => valid,
        };

        info!(
            request_id = %request_id,
            username = %claims.username,
            user_id = %claims.user_id,
            "✅ Connexion autorisée"
        );

        // Forward the request id so the connection span can be correlated too.
        ws.on_upgrade(move |socket| handle_socket(socket, state, claims, request_id))
    };

    upgrade.instrument(span).await
}
/// Gère une connexion WebSocket individuelle
///
/// Note: Toutes les erreurs sont gérées explicitement pour éviter les panics.
/// Tokio capture automatiquement les panics dans les handlers, mais nous
/// nous assurons que toutes les erreurs sont gérées explicitement avec `?` ou `match`.
/// FIX #13: Accepte le request_id pour la corrélation
async fn handle_socket(
socket: WebSocket,
state: WebSocketState,
claims: AccessTokenClaims,
request_id: Uuid,
) {
// FIX #13: Créer un span avec le request_id pour toute la durée de la connexion WebSocket
let span = info_span!(
"websocket_connection",
request_id = %request_id,
user_id = %claims.user_id,
username = %claims.username,
);
async move {
let (sender, mut receiver) = socket.split();
// ID du client (généré localement pour cette connexion)
let client_id = Uuid::new_v4();
// MIGRATION UUID: user_id est déjà String (UUID)
let client = Arc::new(WebSocketClient::new(
client_id,
claims.user_id.clone(),
sender,
));
state.ws_manager.add_client(client.clone()).await;
info!(
request_id = %request_id,
client_id = %client_id,
username = %claims.username,
"✅ Connexion WebSocket établie"
);
// Metrics: connection
state
.metrics
.websocket_connected(claims.user_id.clone())
.await;
// Envoyer un message de bienvenue
let welcome_msg = OutgoingMessage::ActionConfirmed {
action: "connected".to_string(),
success: true,
};
if client.send_message(welcome_msg).await.is_err() {
error!("❌ Impossible d'envoyer le message de bienvenue");
state.ws_manager.remove_client(client_id).await;
return;
}
let keepalive_timeout = std::time::Duration::from_secs(state.keepalive_timeout_secs);
// Boucle principale de gestion des messages avec timeout
loop {
match tokio::time::timeout(keepalive_timeout, receiver.next()).await {
Ok(Some(msg)) => {
match msg {
Ok(Message::Text(text)) => {
debug!("📨 Message WebSocket reçu: {}", text);
match handle_incoming_message(&text, &state, client.clone(), &claims).await
{
Ok(should_continue) => {
if !should_continue {
break;
}
}
Err(e) => {
error!("❌ Erreur lors du traitement du message: {}", e);
// Envoyer un message d'erreur au client
let error_msg = OutgoingMessage::Error {
message: format!("Erreur: {}", e),
};
if client.send_message(error_msg).await.is_err() {
error!("❌ Impossible d'envoyer le message d'erreur au client");
break; // Fermer la connexion si on ne peut même pas envoyer d'erreur
}
}
}
}
Ok(Message::Close(_)) => {
info!("👋 Connexion WebSocket fermée par le client");
break;
}
Ok(Message::Ping(_)) => {
debug!("🏓 Ping WebSocket reçu");
if client.send_message(OutgoingMessage::Pong).await.is_err() {
error!("❌ Erreur lors de l'envoi du Pong");
break;
}
}
Ok(Message::Pong(_)) => {
debug!("🏓 Pong WebSocket reçu");
}
Ok(_) => {
debug!("⚠️ Type de message WebSocket non géré");
}
Err(e) => {
error!("❌ Erreur WebSocket: {}", e);
break;
}
}
}
Ok(None) => {
// Fin du stream
break;
}
Err(_) => {
info!(
"💤 Timeout inactivité ({}s) pour client {}, fermeture",
keepalive_timeout.as_secs(),
client_id
);
break;
}
}
}
info!(
request_id = %request_id,
client_id = %client_id,
username = %claims.username,
"🔌 Connexion WebSocket terminée"
);
state.ws_manager.remove_client(client_id).await;
// Metrics: disconnection
state.metrics.websocket_disconnected(claims.user_id).await;
}
.instrument(span)
.await;
}
/// Traite un message entrant et route selon le type
///
/// Retourne `Ok(true)` si la connexion doit continuer, `Ok(false)` pour fermer
async fn handle_incoming_message(
text: &str,
state: &WebSocketState,
client: Arc<WebSocketClient>,
claims: &AccessTokenClaims,
) -> Result<bool, ChatError> {
// Parser le message JSON
let incoming: IncomingMessage = serde_json::from_str(text)
.map_err(|e| ChatError::serialization_error("IncomingMessage", text, e))?;
// Determine the rate limit action category for this message
let rate_action = match &incoming {
IncomingMessage::SendMessage { .. } => Some(crate::security::RateLimitAction::SendMessage),
IncomingMessage::AddReaction { .. } | IncomingMessage::RemoveReaction { .. } => {
Some(crate::security::RateLimitAction::AddReaction)
}
IncomingMessage::EditMessage { .. } => Some(crate::security::RateLimitAction::EditMessage),
IncomingMessage::DeleteMessage { .. } => {
Some(crate::security::RateLimitAction::DeleteMessage)
}
IncomingMessage::Typing { .. } => Some(crate::security::RateLimitAction::Typing),
IncomingMessage::JoinConversation { .. } => {
Some(crate::security::RateLimitAction::JoinConversation)
}
IncomingMessage::SearchMessages { .. } => {
Some(crate::security::RateLimitAction::SearchMessages)
}
IncomingMessage::CallOffer { .. }
| IncomingMessage::CallAnswer { .. }
| IncomingMessage::ICECandidate { .. }
| IncomingMessage::CallHangup { .. }
| IncomingMessage::CallReject { .. } => {
Some(crate::security::RateLimitAction::CallSignaling)
}
// Ping, MarkAsRead, Delivered, LeaveConversation, FetchHistory, SyncMessages
// are not rate-limited
_ => None,
};
// Check rate limit if applicable
if let Some(action) = rate_action {
match state.rate_limiter.check_rate_limit(&claims.user_id, action).await {
Ok(false) => {
tracing::warn!(
user_id = %claims.user_id,
action = ?action,
"Rate limit exceeded for WebSocket action"
);
// Send error to client via WebSocket and continue connection
let _ = client
.send_message(crate::websocket::OutgoingMessage::Error {
message: "Too many requests. Please slow down.".to_string(),
})
.await;
return Ok(true); // Continue connection, just skip this message
}
Ok(true) => { /* Allowed, proceed */ }
Err(e) => {
tracing::error!("Rate limiter error: {}", e);
// Fail open — don't block on rate limiter errors
}
}
}
match incoming {
IncomingMessage::SendMessage {
conversation_id,
content,
parent_message_id: _,
attachments,
} => {
info!(
"💬 Envoi de message via WebSocket par {} (conversation: {})",
claims.username, conversation_id
);
// MIGRATION UUID: user_id est déjà String (UUID), on le parse directement
let sender_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
// Vérifier les permissions avant d'envoyer le message
state
.permission_service
.can_send_message(sender_uuid, conversation_id)
.await
.map_err(|e| {
warn!(
user_id = %sender_uuid,
conversation_id = %conversation_id,
error = %e,
"Permission refusée pour l'envoi de message"
);
e
})?;
// Préparer les métadonnées pour les pièces jointes
let _metadata = attachments.as_ref().map(|a| serde_json::to_value(a).unwrap_or(serde_json::Value::Null));
// Enregistrer le message dans le store
// Note: On pourrait étendre MessageRepository::create pour accepter metadata et parent_message_id
let message = state
.message_repo
.create(conversation_id, sender_uuid, &content)
.await
.map_err(|e| {
ChatError::internal_error(format!(
"Erreur lors de l'enregistrement du message: {}",
e
))
})?;
// Diffuser le message à tous les clients de la conversation
let outgoing_message = OutgoingMessage::NewMessage {
conversation_id,
message_id: message.id,
sender_id: message.sender_id,
content: message.content.clone(),
created_at: message.created_at,
attachments,
};
state
.ws_manager
.broadcast_to_conversation(conversation_id, outgoing_message)
.await?;
// Envoyer confirmation au sender
let confirmation = OutgoingMessage::ActionConfirmed {
action: "message_sent".to_string(),
success: true,
};
client.send_message(confirmation).await?;
info!(
"✅ Message WebSocket envoyé et diffusé - ID: {}",
message.id
);
}
IncomingMessage::AddReaction {
message_id,
conversation_id,
emoji,
} => {
info!(
"❤️ Ajout de réaction {} au message {} par {}",
emoji, message_id, claims.username
);
let user_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
// Vérifier les permissions
state
.permission_service
.can_read_conversation(user_uuid, conversation_id)
.await
.map_err(|e| {
warn!(
user_id = %user_uuid,
conversation_id = %conversation_id,
error = %e,
"Permission refusée pour ajouter une réaction"
);
e
})?;
// Convertir l'emoji string en enum (optionnel, on peut aussi stocker le string directement)
if let Some(reaction_emoji) = crate::reactions::ReactionEmoji::from_str(&emoji) {
state
.reactions_manager
.add_reaction(message_id, user_uuid, reaction_emoji)
.await
.map_err(|e| ChatError::internal_error(format!("Erreur DB réaction: {}", e)))?;
// Diffuser la réaction
let reaction_msg = OutgoingMessage::ReactionAdded {
message_id,
conversation_id,
user_id: user_uuid,
emoji,
};
state
.ws_manager
.broadcast_to_conversation(conversation_id, reaction_msg)
.await?;
client.send_message(OutgoingMessage::ActionConfirmed {
action: "reaction_added".to_string(),
success: true,
}).await?;
} else {
return Err(ChatError::validation_error("Emoji non supporté"));
}
}
IncomingMessage::RemoveReaction {
message_id,
conversation_id,
} => {
info!(
"💔 Retrait de réaction du message {} par {}",
message_id, claims.username
);
let user_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
state
.reactions_manager
.remove_reaction(message_id, user_uuid)
.await
.map_err(|e| ChatError::internal_error(format!("Erreur DB réaction: {}", e)))?;
// Diffuser le retrait
let reaction_msg = OutgoingMessage::ReactionRemoved {
message_id,
conversation_id,
user_id: user_uuid,
};
state
.ws_manager
.broadcast_to_conversation(conversation_id, reaction_msg)
.await?;
client.send_message(OutgoingMessage::ActionConfirmed {
action: "reaction_removed".to_string(),
success: true,
}).await?;
}
IncomingMessage::JoinConversation { conversation_id } => {
info!(
"🔗 Client {} ({}) rejoint la conversation {}",
client.id, claims.username, conversation_id
);
// MIGRATION UUID: user_id est déjà String (UUID), on le parse directement
let user_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
// Vérifier les permissions avant de rejoindre
state
.permission_service
.can_join_conversation(user_uuid, conversation_id)
.await
.map_err(|e| {
warn!(
user_id = %user_uuid,
conversation_id = %conversation_id,
error = %e,
"Permission refusée pour rejoindre la conversation"
);
e
})?;
client.add_conversation(conversation_id).await;
let outgoing = OutgoingMessage::ActionConfirmed {
action: "joined_conversation".to_string(),
success: true,
};
client.send_message(outgoing).await?;
}
IncomingMessage::LeaveConversation { conversation_id } => {
info!(
"🔚 Client {} ({}) quitte la conversation {}",
client.id, claims.username, conversation_id
);
client.remove_conversation(conversation_id).await;
let outgoing = OutgoingMessage::ActionConfirmed {
action: "left_conversation".to_string(),
success: true,
};
client.send_message(outgoing).await?;
}
IncomingMessage::MarkAsRead {
conversation_id,
message_id,
} => {
info!(
"👁️ Client {} marque le message {} comme lu dans {}",
client.id, message_id, conversation_id
);
// Parser l'user_id depuis les claims JWT
let user_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
// Vérifier que le message existe
let message = state
.message_repo
.get_by_id(message_id)
.await
.map_err(|e| {
ChatError::internal_error(format!(
"Erreur lors de la récupération du message: {}",
e
))
})?;
let message =
message.ok_or_else(|| ChatError::not_found("Message", &message_id.to_string()))?;
// Vérifier que le message appartient à la conversation indiquée
if message.conversation_id != conversation_id {
return Err(ChatError::validation_error(
"Le message n'appartient pas à la conversation indiquée",
));
}
// Vérifier les permissions pour marquer comme lu
state
.permission_service
.can_mark_read(user_uuid, conversation_id)
.await
.map_err(|e| {
warn!(
user_id = %user_uuid,
conversation_id = %conversation_id,
error = %e,
"Permission refusée pour marquer comme lu"
);
e
})?;
// Marquer le message comme lu
let receipt = state
.read_receipt_manager
.mark_as_read(user_uuid, message_id, conversation_id)
.await
.map_err(|e| {
ChatError::internal_error(format!("Erreur lors du marquage comme lu: {}", e))
})?;
// Créer le message outbound pour notifier les autres participants
let message_read = OutgoingMessage::MessageRead {
message_id,
user_id: user_uuid,
conversation_id,
read_at: receipt.read_at,
};
// Broadcast aux autres participants de la conversation
state
.ws_manager
.broadcast_to_conversation(conversation_id, message_read.clone())
.await?;
// Envoyer confirmation au client qui a initié l'action
let confirmation = OutgoingMessage::ActionConfirmed {
action: "marked_as_read".to_string(),
success: true,
};
client.send_message(confirmation).await?;
info!(
"✅ Message {} marqué comme lu par {} dans la conversation {}",
message_id, user_uuid, conversation_id
);
}
IncomingMessage::Typing {
conversation_id,
is_typing,
} => {
info!(
"⌨️ Client {} ({}) typing indicator: {} dans conversation {}",
client.id, claims.username, is_typing, conversation_id
);
// Parser l'user_id depuis les claims JWT
let user_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
// Vérifier les permissions avant d'envoyer le signal typing
state
.permission_service
.can_send_message(user_uuid, conversation_id)
.await
.map_err(|e| {
warn!(
user_id = %user_uuid,
conversation_id = %conversation_id,
error = %e,
"Permission refusée pour typing indicator"
);
e
})?;
if is_typing {
// User a commencé à taper
state
.typing_indicator_manager
.user_started_typing(user_uuid, conversation_id)
.await;
} else {
// User a arrêté de taper
state
.typing_indicator_manager
.user_stopped_typing(user_uuid, conversation_id)
.await;
}
// Broadcast aux autres participants de la conversation
let typing_message = OutgoingMessage::UserTyping {
conversation_id,
user_id: user_uuid,
is_typing,
};
state
.ws_manager
.broadcast_to_conversation(conversation_id, typing_message.clone())
.await?;
// Envoyer confirmation au client qui a initié l'action
let confirmation = OutgoingMessage::ActionConfirmed {
action: "typing_indicator".to_string(),
success: true,
};
client.send_message(confirmation).await?;
info!(
"✅ Typing indicator {} diffusé pour {} dans la conversation {}",
if is_typing { "activé" } else { "désactivé" },
user_uuid,
conversation_id
);
}
IncomingMessage::Delivered {
conversation_id,
message_id,
} => {
info!(
"📬 Client {} ({}) marque le message {} comme délivré dans {}",
client.id, message_id, conversation_id, claims.username
);
// Parser l'user_id depuis les claims JWT
let user_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
// Vérifier les permissions pour marquer comme délivré
state
.permission_service
.can_read_conversation(user_uuid, conversation_id)
.await
.map_err(|e| {
warn!(
user_id = %user_uuid,
conversation_id = %conversation_id,
error = %e,
"Permission refusée pour marquer comme délivré"
);
e
})?;
// Vérifier que le message existe et appartient à la conversation
let message = state
.message_repo
.get_by_id(message_id)
.await
.map_err(|e| {
ChatError::internal_error(format!(
"Erreur lors de la récupération du message: {}",
e
))
})?;
let message =
message.ok_or_else(|| ChatError::not_found("Message", &message_id.to_string()))?;
// Vérifier que le message appartient à la conversation indiquée
if message.conversation_id != conversation_id {
return Err(ChatError::validation_error(
"Le message n'appartient pas à la conversation indiquée",
));
}
// Vérifier que le message appartient bien à la conversation (double vérification)
let belongs = state
.delivered_status_manager
.verify_message_belongs_to_conversation(message_id, conversation_id)
.await
.map_err(|e| {
ChatError::internal_error(format!(
"Erreur lors de la vérification du message: {}",
e
))
})?;
if !belongs {
return Err(ChatError::validation_error(
"Le message n'appartient pas à la conversation indiquée",
));
}
// Marquer le message comme délivré
let status = state
.delivered_status_manager
.mark_delivered(user_uuid, message_id, conversation_id)
.await
.map_err(|e| {
ChatError::internal_error(format!(
"Erreur lors du marquage comme délivré: {}",
e
))
})?;
// Créer le message outbound pour notifier les autres participants
let message_delivered = OutgoingMessage::MessageDelivered {
message_id,
user_id: user_uuid,
conversation_id,
delivered_at: status.delivered_at,
};
// Broadcast aux autres participants de la conversation
state
.ws_manager
.broadcast_to_conversation(conversation_id, message_delivered.clone())
.await?;
// Envoyer confirmation au client qui a initié l'action
let confirmation = OutgoingMessage::ActionConfirmed {
action: "marked_as_delivered".to_string(),
success: true,
};
client.send_message(confirmation).await?;
info!(
"✅ Message {} marqué comme délivré par {} dans la conversation {}",
message_id, user_uuid, conversation_id
);
}
IncomingMessage::EditMessage {
message_id,
conversation_id,
new_content,
} => {
info!(
"✏️ Client {} ({}) édite le message {} dans {}",
client.id, claims.username, message_id, conversation_id
);
// Parser l'user_id depuis les claims JWT
let user_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
// Éditer le message via le service
let updated_message = state
.message_edit_service
.edit_message(user_uuid, message_id, &new_content)
.await
.map_err(|e| {
warn!(
user_id = %user_uuid,
message_id = %message_id,
error = %e,
"Erreur lors de l'édition du message"
);
e
})?;
// Vérifier que le message appartient à la conversation indiquée
if updated_message.conversation_id != conversation_id {
return Err(ChatError::validation_error(
"Le message n'appartient pas à la conversation indiquée",
));
}
// Créer le message outbound pour notifier les autres participants
let message_edited = OutgoingMessage::MessageEdited {
message_id,
conversation_id,
editor_id: user_uuid,
edited_at: updated_message
.edited_at
.unwrap_or(updated_message.updated_at),
new_content: updated_message.content.clone(),
};
// Broadcast aux autres participants de la conversation
state
.ws_manager
.broadcast_to_conversation(conversation_id, message_edited.clone())
.await?;
// Envoyer confirmation au client qui a initié l'action
let confirmation = OutgoingMessage::ActionConfirmed {
action: "message_edited".to_string(),
success: true,
};
client.send_message(confirmation).await?;
info!(
"✅ Message {} édité par {} dans la conversation {}",
message_id, user_uuid, conversation_id
);
}
IncomingMessage::DeleteMessage {
message_id,
conversation_id,
} => {
info!(
"🗑️ Client {} ({}) supprime le message {} dans {}",
client.id, claims.username, message_id, conversation_id
);
// Parser l'user_id depuis les claims JWT
let user_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
// Supprimer le message via le service
let deleted_message = state
.message_edit_service
.delete_message(user_uuid, message_id)
.await
.map_err(|e| {
warn!(
user_id = %user_uuid,
message_id = %message_id,
error = %e,
"Erreur lors de la suppression du message"
);
e
})?;
// Vérifier que le message appartient à la conversation indiquée
if deleted_message.conversation_id != conversation_id {
return Err(ChatError::validation_error(
"Le message n'appartient pas à la conversation indiquée",
));
}
// Créer le message outbound pour notifier les autres participants
let message_deleted = OutgoingMessage::MessageDeleted {
message_id,
conversation_id,
deleter_id: user_uuid,
deleted_at: deleted_message
.deleted_at
.unwrap_or(deleted_message.updated_at),
};
// Broadcast aux autres participants de la conversation
state
.ws_manager
.broadcast_to_conversation(conversation_id, message_deleted.clone())
.await?;
// Envoyer confirmation au client qui a initié l'action
let confirmation = OutgoingMessage::ActionConfirmed {
action: "message_deleted".to_string(),
success: true,
};
client.send_message(confirmation).await?;
info!(
"✅ Message {} supprimé par {} dans la conversation {}",
message_id, user_uuid, conversation_id
);
}
IncomingMessage::FetchHistory {
conversation_id,
before,
after,
limit,
} => {
info!(
"📜 Client {} ({}) demande l'historique de la conversation {}",
client.id, claims.username, conversation_id
);
// Parser l'user_id depuis les claims JWT
let user_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
// Vérifier les permissions pour lire l'historique
state
.permission_service
.can_read_conversation(user_uuid, conversation_id)
.await
.map_err(|e| {
warn!(
user_id = %user_uuid,
conversation_id = %conversation_id,
error = %e,
"Permission refusée pour lire l'historique"
);
e
})?;
// Récupérer l'historique
let limit = limit.unwrap_or(50).min(100);
let (messages, has_more_before, has_more_after): (
Vec<crate::models::message::Message>,
bool,
bool,
) = state
.message_repo
.fetch_history(conversation_id, before, after, limit, false)
.await
.map_err(|e| {
ChatError::internal_error(format!(
"Erreur lors de la récupération de l'historique: {}",
e
))
})?;
// Envoyer le chunk d'historique
let message_count = messages.len();
let history_chunk = OutgoingMessage::HistoryChunk {
conversation_id,
messages,
has_more_before,
has_more_after,
};
client.send_message(history_chunk).await?;
info!(
"✅ Historique envoyé pour la conversation {} ({} messages)",
conversation_id, message_count
);
}
IncomingMessage::SearchMessages {
conversation_id,
query,
limit,
offset,
} => {
info!(
"🔍 Client {} ({}) recherche dans la conversation {}: '{}'",
client.id, claims.username, conversation_id, query
);
// Parser l'user_id depuis les claims JWT
let user_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
// Vérifier les permissions pour rechercher
state
.permission_service
.can_read_conversation(user_uuid, conversation_id)
.await
.map_err(|e| {
warn!(
user_id = %user_uuid,
conversation_id = %conversation_id,
error = %e,
"Permission refusée pour rechercher"
);
e
})?;
// Valider la query (ne pas être vide)
if query.trim().is_empty() {
return Err(ChatError::validation_error(
"La requête de recherche ne peut pas être vide",
));
}
// Rechercher les messages
let limit = limit.unwrap_or(50).min(100);
let offset = offset.unwrap_or(0);
let (messages, total) = state
.message_repo
.search_messages(conversation_id, &query, limit, offset, false)
.await
.map_err(|e| {
ChatError::internal_error(format!("Erreur lors de la recherche: {}", e))
})?;
// Envoyer les résultats
let search_results = OutgoingMessage::SearchResults {
conversation_id,
messages,
query: query.clone(),
total,
};
client.send_message(search_results).await?;
info!(
"✅ Recherche terminée pour '{}' dans {} ({} résultats)",
query, conversation_id, total
);
}
IncomingMessage::SyncMessages {
conversation_id,
since,
} => {
info!(
"🔄 Client {} ({}) synchronise la conversation {} depuis {}",
client.id, claims.username, conversation_id, since
);
// Parser l'user_id depuis les claims JWT
let user_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
// Vérifier les permissions pour synchroniser
state
.permission_service
.can_read_conversation(user_uuid, conversation_id)
.await
.map_err(|e| {
warn!(
user_id = %user_uuid,
conversation_id = %conversation_id,
error = %e,
"Permission refusée pour synchroniser"
);
e
})?;
// Récupérer les messages depuis since
let messages = state
.message_repo
.fetch_since(conversation_id, since)
.await
.map_err(|e| {
ChatError::internal_error(format!("Erreur lors de la synchronisation: {}", e))
})?;
// Calculer le dernier timestamp de sync (maintenant)
let last_sync = chrono::Utc::now();
// Envoyer le chunk de synchronisation
let message_count = messages.len();
let sync_chunk = OutgoingMessage::SyncChunk {
conversation_id,
messages,
last_sync,
};
client.send_message(sync_chunk).await?;
info!(
"✅ Synchronisation terminée pour {} ({} messages)",
conversation_id, message_count
);
}
IncomingMessage::CallOffer {
conversation_id,
target_user_id,
sdp,
call_type,
} => {
let sender_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
state
.permission_service
.can_send_message(sender_uuid, conversation_id)
.await
.map_err(|e| {
warn!(
user_id = %sender_uuid,
conversation_id = %conversation_id,
error = %e,
"Permission refusée pour CallOffer"
);
e
})?;
let msg = OutgoingMessage::CallOffer {
conversation_id,
caller_user_id: sender_uuid,
sdp: sdp.clone(),
call_type: call_type.clone(),
};
state
.ws_manager
.send_to_user(&target_user_id.to_string(), msg, Some(client.id))
.await?;
info!(
"📞 CallOffer relayé de {} vers {} (conversation: {})",
claims.username, target_user_id, conversation_id
);
}
IncomingMessage::CallAnswer {
conversation_id,
caller_user_id,
sdp,
} => {
let callee_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
let msg = OutgoingMessage::CallAnswer {
conversation_id,
target_user_id: caller_user_id,
from_user_id: callee_uuid,
sdp: sdp.clone(),
};
state
.ws_manager
.send_to_user(&caller_user_id.to_string(), msg, Some(client.id))
.await?;
info!(
"📞 CallAnswer relayé vers {} (conversation: {})",
caller_user_id, conversation_id
);
}
IncomingMessage::ICECandidate {
conversation_id,
target_user_id,
candidate,
} => {
let sender_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
let msg = OutgoingMessage::ICECandidate {
conversation_id,
from_user_id: sender_uuid,
candidate: candidate.clone(),
};
state
.ws_manager
.send_to_user(&target_user_id.to_string(), msg, Some(client.id))
.await?;
debug!(
"📞 ICECandidate relayé de {} vers {} (conversation: {})",
claims.username, target_user_id, conversation_id
);
}
IncomingMessage::CallHangup {
conversation_id,
target_user_id,
} => {
let sender_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
let msg = OutgoingMessage::CallHangup {
conversation_id,
user_id: sender_uuid,
};
state
.ws_manager
.send_to_user(&target_user_id.to_string(), msg, Some(client.id))
.await?;
info!(
"📞 CallHangup relayé de {} vers {} (conversation: {})",
claims.username, target_user_id, conversation_id
);
}
IncomingMessage::CallReject {
conversation_id,
caller_user_id,
} => {
let sender_uuid = Uuid::parse_str(&claims.user_id)
.map_err(|e| ChatError::validation_error(&format!("Invalid user UUID: {}", e)))?;
let msg = OutgoingMessage::CallRejected {
conversation_id,
user_id: sender_uuid,
};
state
.ws_manager
.send_to_user(&caller_user_id.to_string(), msg, Some(client.id))
.await?;
info!(
"📞 CallReject relayé vers {} (conversation: {})",
caller_user_id, conversation_id
);
}
IncomingMessage::Ping => {
debug!("🏓 Ping WebSocket reçu");
client.send_message(OutgoingMessage::Pong).await?;
}
}
Ok(true) // Continuer la connexion
}