veza/veza-chat-server/src/delivered_status.rs

315 lines
10 KiB
Rust

//! Module de gestion des delivered status (messages reçus mais pas encore lus)
//!
//! Ce module fournit un système complet pour tracker quels messages
//! ont été délivrés (reçus par le client WebSocket) par quels utilisateurs.
use serde::{Deserialize, Serialize};
use sqlx::types::chrono::{DateTime, Utc};
use sqlx::{FromRow, Pool, Postgres};
use tracing::{debug, info, instrument, warn};
use uuid::Uuid;
/// One row of the `delivered_status` table: a record that a message was
/// delivered to a given user.
#[derive(Clone, Debug, Deserialize, FromRow, Serialize)]
pub struct DeliveredStatus {
    /// Identifier of this delivered-status row.
    pub id: Uuid,
    /// Message the row refers to.
    pub message_id: Uuid,
    /// User the message was delivered to.
    pub user_id: Uuid,
    /// Conversation stored alongside the message when the row was inserted.
    pub conversation_id: Uuid,
    /// Delivery timestamp (UTC).
    pub delivered_at: DateTime<Utc>,
    /// Row creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Last-modification timestamp (UTC).
    pub updated_at: DateTime<Utc>,
}
/// Database-backed service for recording and querying delivered statuses.
pub struct DeliveredStatusManager {
    /// PostgreSQL connection pool used by every query issued by the manager.
    pool: Pool<Postgres>,
}
impl DeliveredStatusManager {
    /// Builds a manager that runs all of its queries against `pool`.
    pub fn new(pool: Pool<Postgres>) -> Self {
        Self { pool }
    }

    /// Records that `message_id` was delivered to `user_id`.
    ///
    /// If a row already exists for this `(message_id, user_id)` pair, its
    /// `delivered_at` and `updated_at` columns are refreshed to `NOW()` and the
    /// stored `conversation_id` is left untouched. Otherwise a new row is
    /// inserted with the given `conversation_id`. In both cases the resulting
    /// row is returned.
    ///
    /// # Errors
    ///
    /// Returns the underlying [`sqlx::Error`] if any of the queries fails.
    // NOTE(review): the lookup and the write are separate statements issued
    // outside a transaction, so two concurrent calls for the same pair can both
    // take the insert path. Whether that ends in a duplicate row or an error
    // depends on the table's constraints, which are not visible from this file.
    #[instrument(skip(self))]
    pub async fn mark_delivered(
        &self,
        user_id: Uuid,
        message_id: Uuid,
        conversation_id: Uuid,
    ) -> Result<DeliveredStatus, sqlx::Error> {
        // Reuse the dedicated lookup instead of duplicating its SELECT here.
        match self.get_delivered_status(message_id, user_id).await? {
            Some(existing) => {
                // Already delivered once: only bump the timestamps.
                let refreshed = sqlx::query_as::<_, DeliveredStatus>(
                    "UPDATE delivered_status\n\
                     SET delivered_at = NOW(), updated_at = NOW()\n\
                     WHERE id = $1\n\
                     RETURNING id, message_id, user_id, conversation_id, delivered_at, created_at, updated_at",
                )
                .bind(existing.id)
                .fetch_one(&self.pool)
                .await?;

                debug!(
                    message_id = %message_id,
                    user_id = %user_id,
                    conversation_id = %conversation_id,
                    "Delivered status updated"
                );

                Ok(refreshed)
            }
            None => {
                // First delivery for this pair: create the row.
                let created = sqlx::query_as::<_, DeliveredStatus>(
                    "INSERT INTO delivered_status (message_id, user_id, conversation_id, delivered_at, created_at, updated_at)\n\
                     VALUES ($1, $2, $3, NOW(), NOW(), NOW())\n\
                     RETURNING id, message_id, user_id, conversation_id, delivered_at, created_at, updated_at",
                )
                .bind(message_id)
                .bind(user_id)
                .bind(conversation_id)
                .fetch_one(&self.pool)
                .await?;

                info!(
                    message_id = %message_id,
                    user_id = %user_id,
                    conversation_id = %conversation_id,
                    "Message marked as delivered"
                );

                Ok(created)
            }
        }
    }

    /// Returns every delivered status recorded for `message_id`, ordered by
    /// ascending `delivered_at`.
    ///
    /// The vector is empty when no row matches.
    ///
    /// # Errors
    ///
    /// Returns the underlying [`sqlx::Error`] if the query fails.
    #[instrument(skip(self))]
    pub async fn get_delivered_for_message(
        &self,
        message_id: Uuid,
    ) -> Result<Vec<DeliveredStatus>, sqlx::Error> {
        sqlx::query_as::<_, DeliveredStatus>(
            "SELECT id, message_id, user_id, conversation_id, delivered_at, created_at, updated_at\n\
             FROM delivered_status\n\
             WHERE message_id = $1\n\
             ORDER BY delivered_at ASC",
        )
        .bind(message_id)
        .fetch_all(&self.pool)
        .await
    }

    /// Looks up the delivered status for the `(message_id, user_id)` pair.
    ///
    /// Returns `Ok(None)` when no row matches.
    ///
    /// # Errors
    ///
    /// Returns the underlying [`sqlx::Error`] if the query fails.
    #[instrument(skip(self))]
    pub async fn get_delivered_status(
        &self,
        message_id: Uuid,
        user_id: Uuid,
    ) -> Result<Option<DeliveredStatus>, sqlx::Error> {
        sqlx::query_as::<_, DeliveredStatus>(
            "SELECT id, message_id, user_id, conversation_id, delivered_at, created_at, updated_at\n\
             FROM delivered_status\n\
             WHERE message_id = $1 AND user_id = $2",
        )
        .bind(message_id)
        .bind(user_id)
        .fetch_optional(&self.pool)
        .await
    }

    /// Reports whether a delivered status exists for the
    /// `(message_id, user_id)` pair.
    ///
    /// # Errors
    ///
    /// Returns the underlying [`sqlx::Error`] if the query fails.
    #[instrument(skip(self))]
    pub async fn is_delivered(&self, message_id: Uuid, user_id: Uuid) -> Result<bool, sqlx::Error> {
        sqlx::query_scalar::<_, bool>(
            "SELECT EXISTS(\n\
             SELECT 1 FROM delivered_status\n\
             WHERE message_id = $1 AND user_id = $2\n\
             )",
        )
        .bind(message_id)
        .bind(user_id)
        .fetch_one(&self.pool)
        .await
    }

    /// Reports whether the `messages` table holds a row whose `id` is
    /// `message_id` and whose `conversation_id` is `conversation_id`.
    ///
    /// A warning is logged when no such row exists.
    ///
    /// # Errors
    ///
    /// Returns the underlying [`sqlx::Error`] if the query fails.
    #[instrument(skip(self))]
    pub async fn verify_message_belongs_to_conversation(
        &self,
        message_id: Uuid,
        conversation_id: Uuid,
    ) -> Result<bool, sqlx::Error> {
        let belongs = sqlx::query_scalar::<_, bool>(
            "SELECT EXISTS(\n\
             SELECT 1 FROM messages\n\
             WHERE id = $1 AND conversation_id = $2\n\
             )",
        )
        .bind(message_id)
        .bind(conversation_id)
        .fetch_one(&self.pool)
        .await?;

        if !belongs {
            warn!(
                message_id = %message_id,
                conversation_id = %conversation_id,
                "Message does not belong to conversation"
            );
        }

        Ok(belongs)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::PgPool;

    /// Connects to the database named by the `DATABASE_URL` environment
    /// variable.
    ///
    /// Panics if the variable is unset or the connection cannot be opened.
    async fn setup_test_db() -> PgPool {
        let url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set for tests");
        PgPool::connect(&url)
            .await
            .expect("Failed to connect to test database")
    }

    /// Builds a manager backed by a fresh connection pool to the test database.
    async fn connected_manager() -> DeliveredStatusManager {
        DeliveredStatusManager::new(setup_test_db().await)
    }

    #[tokio::test]
    #[ignore] // Requires a test database
    async fn test_mark_delivered_creates_status() {
        let manager = connected_manager().await;

        // Fresh random identifiers so the test does not collide with existing rows.
        let user_id = Uuid::new_v4();
        let message_id = Uuid::new_v4();
        let conversation_id = Uuid::new_v4();

        let created = manager
            .mark_delivered(user_id, message_id, conversation_id)
            .await
            .expect("Should mark message as delivered");

        // The returned row must echo back exactly what was submitted.
        assert_eq!(created.message_id, message_id);
        assert_eq!(created.user_id, user_id);
        assert_eq!(created.conversation_id, conversation_id);
    }

    #[tokio::test]
    #[ignore] // Requires a test database
    async fn test_mark_delivered_updates_existing() {
        let manager = connected_manager().await;

        let user_id = Uuid::new_v4();
        let message_id = Uuid::new_v4();
        let conversation_id = Uuid::new_v4();

        // First delivery creates the row.
        let first = manager
            .mark_delivered(user_id, message_id, conversation_id)
            .await
            .expect("Should mark message as delivered");

        // Give the clock a moment to advance before the second delivery.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        // Second delivery for the same pair is expected to update that row.
        let second = manager
            .mark_delivered(user_id, message_id, conversation_id)
            .await
            .expect("Should update existing status");

        // Timestamp must not go backwards, and the row identity must be kept.
        assert!(second.delivered_at >= first.delivered_at);
        assert_eq!(first.id, second.id);
    }

    #[tokio::test]
    #[ignore] // Requires a test database
    async fn test_get_delivered_for_message() {
        let manager = connected_manager().await;

        let message_id = Uuid::new_v4();
        let conversation_id = Uuid::new_v4();
        let user1 = Uuid::new_v4();
        let user2 = Uuid::new_v4();

        // Deliver the same message to two distinct users, in order.
        for recipient in [user1, user2] {
            manager
                .mark_delivered(recipient, message_id, conversation_id)
                .await
                .expect("Should mark as delivered");
        }

        let statuses = manager
            .get_delivered_for_message(message_id)
            .await
            .expect("Should get statuses");

        // Exactly one row per recipient is expected.
        assert_eq!(statuses.len(), 2);
        assert!(statuses.iter().any(|s| s.user_id == user1));
        assert!(statuses.iter().any(|s| s.user_id == user2));
    }

    #[tokio::test]
    #[ignore] // Requires a test database
    async fn test_is_delivered() {
        let manager = connected_manager().await;

        let user_id = Uuid::new_v4();
        let message_id = Uuid::new_v4();
        let conversation_id = Uuid::new_v4();

        // Nothing has been recorded yet for this pair.
        let is_delivered_before = manager
            .is_delivered(message_id, user_id)
            .await
            .expect("Should check status");
        assert!(!is_delivered_before);

        manager
            .mark_delivered(user_id, message_id, conversation_id)
            .await
            .expect("Should mark as delivered");

        // Once marked, the same lookup must report the delivery.
        let is_delivered_after = manager
            .is_delivered(message_id, user_id)
            .await
            .expect("Should check status");
        assert!(is_delivered_after);
    }
}