Backend Go: - Remplacement complet des anciennes migrations par la base V1 alignée sur ORIGIN. - Durcissement global du parsing JSON (BindAndValidateJSON + RespondWithAppError). - Sécurisation de config.go, CORS, statuts de santé et monitoring. - Implémentation des transactions P0 (RBAC, duplication de playlists, social toggles). - Ajout d’un job worker structuré (emails, analytics, thumbnails) + tests associés. - Nouvelle doc backend : AUDIT_CONFIG, BACKEND_CONFIG, AUTH_PASSWORD_RESET, JOB_WORKER_*. Chat server (Rust): - Refonte du pipeline JWT + sécurité, audit et rate limiting avancé. - Implémentation complète du cycle de message (read receipts, delivered, edit/delete, typing). - Nettoyage des panics, gestion d’erreurs robuste, logs structurés. - Migrations chat alignées sur le schéma UUID et nouvelles features. Stream server (Rust): - Refonte du moteur de streaming (encoding pipeline + HLS) et des modules core. - Transactions P0 pour les jobs et segments, garanties d’atomicité. - Documentation détaillée de la pipeline (AUDIT_STREAM_*, DESIGN_STREAM_PIPELINE, TRANSACTIONS_P0_IMPLEMENTATION). Documentation & audits: - TRIAGE.md et AUDIT_STABILITY.md à jour avec l’état réel des 3 services. - Cartographie complète des migrations et des transactions (DB_MIGRATIONS_*, DB_TRANSACTION_PLAN, AUDIT_DB_TRANSACTIONS, TRANSACTION_TESTS_PHASE3). - Scripts de reset et de cleanup pour la lab DB et la V1. Ce commit fige l’ensemble du travail de stabilisation P0 (UUID, backend, chat et stream) avant les phases suivantes (Coherence Guardian, WS hardening, etc.).
251 lines
7.2 KiB
Rust
251 lines
7.2 KiB
Rust
//! Module WebSocket pour le chat server
|
|
//!
|
|
//! Ce module contient:
|
|
//! - Les types de messages WebSocket (IncomingMessage, OutgoingMessage)
|
|
//! - Le gestionnaire de connexions (WebSocketManager)
|
|
//! - Le handler Axum (handler.rs)
|
|
//! - Le système de broadcast (broadcast.rs)
|
|
|
|
pub mod broadcast;
|
|
pub mod handler;
|
|
|
|
use crate::error::Result;
|
|
use axum::extract::ws::{Message, WebSocket};
|
|
use futures_util::{stream::SplitSink, SinkExt};
|
|
use serde::{Deserialize, Serialize};
|
|
use std::collections::HashSet;
|
|
use std::sync::Arc;
|
|
use tokio::sync::RwLock;
|
|
use uuid::Uuid;
|
|
|
|
/// Message WebSocket entrant
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
#[serde(tag = "type")]
|
|
pub enum IncomingMessage {
|
|
/// Envoi d'un message de chat
|
|
SendMessage {
|
|
conversation_id: Uuid,
|
|
content: String,
|
|
parent_message_id: Option<Uuid>,
|
|
},
|
|
/// Rejoindre une conversation
|
|
JoinConversation { conversation_id: Uuid },
|
|
/// Quitter une conversation
|
|
LeaveConversation { conversation_id: Uuid },
|
|
/// Marquer des messages comme lus
|
|
MarkAsRead {
|
|
conversation_id: Uuid,
|
|
message_id: Uuid,
|
|
},
|
|
/// Indicateur de frappe (typing indicator)
|
|
Typing {
|
|
conversation_id: Uuid,
|
|
is_typing: bool,
|
|
},
|
|
/// Marquer un message comme délivré (reçu par le client)
|
|
Delivered {
|
|
conversation_id: Uuid,
|
|
message_id: Uuid,
|
|
},
|
|
/// Éditer un message
|
|
EditMessage {
|
|
message_id: Uuid,
|
|
conversation_id: Uuid,
|
|
new_content: String,
|
|
},
|
|
/// Supprimer un message
|
|
DeleteMessage {
|
|
message_id: Uuid,
|
|
conversation_id: Uuid,
|
|
},
|
|
/// Récupérer l'historique avec pagination
|
|
FetchHistory {
|
|
conversation_id: Uuid,
|
|
before: Option<chrono::DateTime<chrono::Utc>>,
|
|
after: Option<chrono::DateTime<chrono::Utc>>,
|
|
limit: Option<usize>,
|
|
},
|
|
/// Rechercher des messages
|
|
SearchMessages {
|
|
conversation_id: Uuid,
|
|
query: String,
|
|
limit: Option<usize>,
|
|
offset: Option<usize>,
|
|
},
|
|
/// Synchroniser les messages depuis un timestamp (offline sync)
|
|
SyncMessages {
|
|
conversation_id: Uuid,
|
|
since: chrono::DateTime<chrono::Utc>,
|
|
},
|
|
/// Ping de connexion
|
|
Ping,
|
|
}
|
|
|
|
/// Message WebSocket sortant
|
|
#[derive(Debug, Clone, Serialize)]
|
|
#[serde(tag = "type")]
|
|
pub enum OutgoingMessage {
|
|
/// Nouveau message reçu
|
|
NewMessage {
|
|
conversation_id: Uuid,
|
|
message_id: Uuid,
|
|
sender_id: Uuid,
|
|
content: String,
|
|
created_at: chrono::DateTime<chrono::Utc>,
|
|
},
|
|
/// Message marqué comme lu
|
|
MessageRead {
|
|
message_id: Uuid,
|
|
user_id: Uuid,
|
|
conversation_id: Uuid,
|
|
read_at: chrono::DateTime<chrono::Utc>,
|
|
},
|
|
/// Message délivré (reçu par le client)
|
|
MessageDelivered {
|
|
message_id: Uuid,
|
|
user_id: Uuid,
|
|
conversation_id: Uuid,
|
|
delivered_at: chrono::DateTime<chrono::Utc>,
|
|
},
|
|
/// Indicateur de frappe (typing indicator)
|
|
UserTyping {
|
|
conversation_id: Uuid,
|
|
user_id: Uuid,
|
|
is_typing: bool,
|
|
},
|
|
/// Message édité
|
|
MessageEdited {
|
|
message_id: Uuid,
|
|
conversation_id: Uuid,
|
|
editor_id: Uuid,
|
|
edited_at: chrono::DateTime<chrono::Utc>,
|
|
new_content: String,
|
|
},
|
|
/// Message supprimé
|
|
MessageDeleted {
|
|
message_id: Uuid,
|
|
conversation_id: Uuid,
|
|
deleter_id: Uuid,
|
|
deleted_at: chrono::DateTime<chrono::Utc>,
|
|
},
|
|
/// Chunk d'historique (pagination)
|
|
HistoryChunk {
|
|
conversation_id: Uuid,
|
|
messages: Vec<crate::models::message::Message>,
|
|
has_more_before: bool,
|
|
has_more_after: bool,
|
|
},
|
|
/// Résultats de recherche
|
|
SearchResults {
|
|
conversation_id: Uuid,
|
|
messages: Vec<crate::models::message::Message>,
|
|
query: String,
|
|
total: i64,
|
|
},
|
|
/// Chunk de synchronisation (offline sync)
|
|
SyncChunk {
|
|
conversation_id: Uuid,
|
|
messages: Vec<crate::models::message::Message>,
|
|
last_sync: chrono::DateTime<chrono::Utc>,
|
|
},
|
|
/// Confirmation d'action
|
|
ActionConfirmed { action: String, success: bool },
|
|
/// Erreur
|
|
Error { message: String },
|
|
/// Pong de connexion
|
|
Pong,
|
|
}
|
|
|
|
/// Client WebSocket connecté
|
|
pub struct WebSocketClient {
|
|
pub id: Uuid,
|
|
pub user_id: String,
|
|
pub stream: Arc<RwLock<SplitSink<WebSocket, Message>>>,
|
|
pub conversations: Arc<RwLock<HashSet<Uuid>>>,
|
|
}
|
|
|
|
impl WebSocketClient {
|
|
pub fn new(id: Uuid, user_id: String, stream: SplitSink<WebSocket, Message>) -> Self {
|
|
Self {
|
|
id,
|
|
user_id,
|
|
stream: Arc::new(RwLock::new(stream)),
|
|
conversations: Arc::new(RwLock::new(HashSet::new())),
|
|
}
|
|
}
|
|
|
|
/// Envoie un message au client
|
|
pub async fn send_message(&self, message: OutgoingMessage) -> Result<()> {
|
|
let json = serde_json::to_string(&message)
|
|
.map_err(|e| crate::error::ChatError::serialization_error("OutgoingMessage", "", e))?;
|
|
|
|
let mut stream = self.stream.write().await;
|
|
stream.send(Message::Text(json.into())).await.map_err(|e| {
|
|
crate::error::ChatError::internal_error(format!("WebSocket send error: {}", e))
|
|
})?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Ajoute une conversation à la liste du client
|
|
pub async fn add_conversation(&self, conversation_id: Uuid) {
|
|
let mut conversations = self.conversations.write().await;
|
|
conversations.insert(conversation_id);
|
|
}
|
|
|
|
/// Supprime une conversation de la liste du client
|
|
pub async fn remove_conversation(&self, conversation_id: Uuid) {
|
|
let mut conversations = self.conversations.write().await;
|
|
conversations.remove(&conversation_id);
|
|
}
|
|
}
|
|
|
|
/// Gestionnaire de connexions WebSocket
|
|
pub struct WebSocketManager {
|
|
clients: Arc<RwLock<Vec<Arc<WebSocketClient>>>>,
|
|
}
|
|
|
|
impl WebSocketManager {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
clients: Arc::new(RwLock::new(Vec::new())),
|
|
}
|
|
}
|
|
|
|
/// Ajoute un nouveau client
|
|
pub async fn add_client(&self, client: Arc<WebSocketClient>) {
|
|
let mut clients = self.clients.write().await;
|
|
clients.push(client);
|
|
}
|
|
|
|
/// Supprime un client
|
|
pub async fn remove_client(&self, client_id: Uuid) {
|
|
let mut clients = self.clients.write().await;
|
|
clients.retain(|c| c.id != client_id);
|
|
}
|
|
|
|
/// Diffuse un message à tous les clients d'une conversation
|
|
pub async fn broadcast_to_conversation(
|
|
&self,
|
|
conversation_id: Uuid,
|
|
message: OutgoingMessage,
|
|
) -> Result<()> {
|
|
let clients = self.clients.read().await;
|
|
|
|
for client in clients.iter() {
|
|
let conversations = client.conversations.read().await;
|
|
if conversations.contains(&conversation_id) {
|
|
// Ignore send errors during broadcast to avoid stopping
|
|
let _ = client.send_message(message.clone()).await;
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Récupère un client par son ID
|
|
pub async fn get_client(&self, client_id: Uuid) -> Option<Arc<WebSocketClient>> {
|
|
let clients = self.clients.read().await;
|
|
clients.iter().find(|c| c.id == client_id).cloned()
|
|
}
|
|
}
|