veza/veza-chat-server/src/websocket/mod.rs
2026-02-22 03:46:10 +01:00

363 lines
10 KiB
Rust

//! Module WebSocket pour le chat server
//!
//! Ce module contient:
//! - Les types de messages WebSocket (IncomingMessage, OutgoingMessage)
//! - Le gestionnaire de connexions (WebSocketManager)
//! - Le handler Axum (handler.rs)
//! - Le système de broadcast (broadcast.rs)
pub mod broadcast;
pub mod handler;
use crate::error::Result;
use axum::extract::ws::{Message, WebSocket};
use futures_util::{stream::SplitSink, SinkExt};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
/// Message WebSocket entrant
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum IncomingMessage {
/// Envoi d'un message de chat
SendMessage {
conversation_id: Uuid,
content: String,
parent_message_id: Option<Uuid>,
attachments: Option<Vec<MessageAttachment>>,
},
/// Rejoindre une conversation
JoinConversation { conversation_id: Uuid },
/// Quitter une conversation
LeaveConversation { conversation_id: Uuid },
/// Marquer des messages comme lus
MarkAsRead {
conversation_id: Uuid,
message_id: Uuid,
},
/// Indicateur de frappe (typing indicator)
Typing {
conversation_id: Uuid,
is_typing: bool,
},
/// Marquer un message comme délivré (reçu par le client)
Delivered {
conversation_id: Uuid,
message_id: Uuid,
},
/// Éditer un message
EditMessage {
message_id: Uuid,
conversation_id: Uuid,
new_content: String,
},
/// Supprimer un message
DeleteMessage {
message_id: Uuid,
conversation_id: Uuid,
},
/// Ajouter une réaction
AddReaction {
message_id: Uuid,
conversation_id: Uuid,
emoji: String, // String representation from ReactionEmoji
},
/// Retirer une réaction
RemoveReaction {
message_id: Uuid,
conversation_id: Uuid,
},
/// Récupérer l'historique avec pagination
FetchHistory {
conversation_id: Uuid,
before: Option<chrono::DateTime<chrono::Utc>>,
after: Option<chrono::DateTime<chrono::Utc>>,
limit: Option<usize>,
},
/// Rechercher des messages
SearchMessages {
conversation_id: Uuid,
query: String,
limit: Option<usize>,
offset: Option<usize>,
},
/// Synchroniser les messages depuis un timestamp (offline sync)
SyncMessages {
conversation_id: Uuid,
since: chrono::DateTime<chrono::Utc>,
},
/// Initier un appel WebRTC
CallOffer {
conversation_id: Uuid,
target_user_id: Uuid,
sdp: String,
call_type: String, // "audio" | "video"
},
/// Accepter un appel
CallAnswer {
conversation_id: Uuid,
caller_user_id: Uuid,
sdp: String,
},
/// Échanger un candidat ICE
ICECandidate {
conversation_id: Uuid,
target_user_id: Uuid,
candidate: String,
},
/// Raccrocher
CallHangup {
conversation_id: Uuid,
target_user_id: Uuid,
},
/// Refuser un appel
CallReject {
conversation_id: Uuid,
caller_user_id: Uuid,
},
/// Ping de connexion
Ping,
}
/// Pièce jointe à un message
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageAttachment {
pub file_name: String,
pub file_type: String, // 'image', 'audio', 'video', 'file'
pub file_url: String,
pub file_size: Option<u64>,
}
/// Message WebSocket sortant
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum OutgoingMessage {
/// Nouveau message reçu
NewMessage {
conversation_id: Uuid,
message_id: Uuid,
sender_id: Uuid,
content: String,
created_at: chrono::DateTime<chrono::Utc>,
attachments: Option<Vec<MessageAttachment>>,
},
/// Message marqué comme lu
MessageRead {
message_id: Uuid,
user_id: Uuid,
conversation_id: Uuid,
read_at: chrono::DateTime<chrono::Utc>,
},
/// Message délivré (reçu par le client)
MessageDelivered {
message_id: Uuid,
user_id: Uuid,
conversation_id: Uuid,
delivered_at: chrono::DateTime<chrono::Utc>,
},
/// Indicateur de frappe (typing indicator)
UserTyping {
conversation_id: Uuid,
user_id: Uuid,
is_typing: bool,
},
/// Message édité
MessageEdited {
message_id: Uuid,
conversation_id: Uuid,
editor_id: Uuid,
edited_at: chrono::DateTime<chrono::Utc>,
new_content: String,
},
/// Message supprimé
MessageDeleted {
message_id: Uuid,
conversation_id: Uuid,
deleter_id: Uuid,
deleted_at: chrono::DateTime<chrono::Utc>,
},
/// Réaction ajoutée
ReactionAdded {
message_id: Uuid,
conversation_id: Uuid,
user_id: Uuid,
emoji: String,
},
/// Réaction retirée
ReactionRemoved {
message_id: Uuid,
conversation_id: Uuid,
user_id: Uuid,
},
/// Chunk d'historique (pagination)
HistoryChunk {
conversation_id: Uuid,
messages: Vec<crate::models::message::Message>,
has_more_before: bool,
has_more_after: bool,
},
/// Résultats de recherche
SearchResults {
conversation_id: Uuid,
messages: Vec<crate::models::message::Message>,
query: String,
total: i64,
},
/// Chunk de synchronisation (offline sync)
SyncChunk {
conversation_id: Uuid,
messages: Vec<crate::models::message::Message>,
last_sync: chrono::DateTime<chrono::Utc>,
},
/// Confirmation d'action
ActionConfirmed { action: String, success: bool },
/// Erreur
Error { message: String },
/// Appel WebRTC — offre
CallOffer {
conversation_id: Uuid,
caller_user_id: Uuid,
sdp: String,
call_type: String,
},
/// Appel WebRTC — réponse
CallAnswer {
conversation_id: Uuid,
target_user_id: Uuid,
from_user_id: Uuid,
sdp: String,
},
/// Appel WebRTC — candidat ICE
ICECandidate {
conversation_id: Uuid,
from_user_id: Uuid,
candidate: String,
},
/// Appel WebRTC — raccrocher
CallHangup {
conversation_id: Uuid,
user_id: Uuid,
},
/// Appel WebRTC — refusé
CallRejected {
conversation_id: Uuid,
user_id: Uuid,
},
/// Pong de connexion
Pong,
}
/// Client WebSocket connecté
pub struct WebSocketClient {
pub id: Uuid,
pub user_id: String,
pub stream: Arc<RwLock<SplitSink<WebSocket, Message>>>,
pub conversations: Arc<RwLock<HashSet<Uuid>>>,
}
impl WebSocketClient {
pub fn new(id: Uuid, user_id: String, stream: SplitSink<WebSocket, Message>) -> Self {
Self {
id,
user_id,
stream: Arc::new(RwLock::new(stream)),
conversations: Arc::new(RwLock::new(HashSet::new())),
}
}
/// Envoie un message au client
pub async fn send_message(&self, message: OutgoingMessage) -> Result<()> {
let json = serde_json::to_string(&message)
.map_err(|e| crate::error::ChatError::serialization_error("OutgoingMessage", "", e))?;
let mut stream = self.stream.write().await;
stream.send(Message::Text(json.into())).await.map_err(|e| {
crate::error::ChatError::internal_error(format!("WebSocket send error: {}", e))
})?;
Ok(())
}
/// Ajoute une conversation à la liste du client
pub async fn add_conversation(&self, conversation_id: Uuid) {
let mut conversations = self.conversations.write().await;
conversations.insert(conversation_id);
}
/// Supprime une conversation de la liste du client
pub async fn remove_conversation(&self, conversation_id: Uuid) {
let mut conversations = self.conversations.write().await;
conversations.remove(&conversation_id);
}
}
/// Gestionnaire de connexions WebSocket
pub struct WebSocketManager {
clients: Arc<RwLock<Vec<Arc<WebSocketClient>>>>,
}
impl WebSocketManager {
pub fn new() -> Self {
Self {
clients: Arc::new(RwLock::new(Vec::new())),
}
}
/// Ajoute un nouveau client
pub async fn add_client(&self, client: Arc<WebSocketClient>) {
let mut clients = self.clients.write().await;
clients.push(client);
}
/// Supprime un client
pub async fn remove_client(&self, client_id: Uuid) {
let mut clients = self.clients.write().await;
clients.retain(|c| c.id != client_id);
}
/// Diffuse un message à tous les clients d'une conversation
pub async fn broadcast_to_conversation(
&self,
conversation_id: Uuid,
message: OutgoingMessage,
) -> Result<()> {
let clients = self.clients.read().await;
for client in clients.iter() {
let conversations = client.conversations.read().await;
if conversations.contains(&conversation_id) {
// Ignore send errors during broadcast to avoid stopping
let _ = client.send_message(message.clone()).await;
}
}
Ok(())
}
/// Récupère un client par son ID
pub async fn get_client(&self, client_id: Uuid) -> Option<Arc<WebSocketClient>> {
let clients = self.clients.read().await;
clients.iter().find(|c| c.id == client_id).cloned()
}
/// Envoie un message à un utilisateur spécifique (1-to-1, pour appels)
/// exclude_client_id : ne pas envoyer au client qui a initié (éviter echo)
pub async fn send_to_user(
&self,
user_id: &str,
message: OutgoingMessage,
exclude_client_id: Option<Uuid>,
) -> Result<()> {
let clients = self.clients.read().await;
for client in clients.iter() {
if client.user_id == user_id && Some(client.id) != exclude_client_id {
let _ = client.send_message(message.clone()).await;
return Ok(());
}
}
Ok(())
}
}