use lapin::{ options::ExchangeDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties, Error as LapinError, ExchangeKind, }; use tokio::time::{sleep, Duration}; use tracing::{error, info, warn}; use crate::{ config, error::{AppError, Result}, }; /// Erreur spécifique pour l'indisponibilité de l'Event Bus #[derive(Debug, thiserror::Error)] #[error("RabbitMQ EventBus is unavailable: {message}")] pub struct EventBusUnavailableError { message: String, } impl EventBusUnavailableError { pub fn new(message: &str) -> Self { Self { message: message.to_string(), } } } /// Gestionnaire de connexion RabbitMQ pub struct RabbitMQEventBus { config: config::RabbitMQConfig, // Use the canonical config type connection: Option, channel: Option, pub is_enabled: bool, } impl RabbitMQEventBus { /// Tente de se connecter à RabbitMQ avec une politique de retry pub async fn new_with_retry(config: config::RabbitMQConfig) -> Result { if !config.enable { info!("📴 EventBus RabbitMQ désactivé par configuration."); return Ok(Self { config, connection: None, channel: None, is_enabled: false, }); } let mut attempts = 0; let max_attempts = config.max_retries; let retry_interval = Duration::from_secs(config.retry_interval_secs); while attempts < max_attempts { info!( "🔄 Tentative de connexion à RabbitMQ (url: {}) - Essai {}/{}", config.url, attempts + 1, max_attempts ); match Connection::connect(&config.url, ConnectionProperties::default()).await { Ok(conn) => { info!("✅ Connexion à RabbitMQ établie avec succès."); let channel = conn.create_channel() .await .map_err(|e| AppError::InternalError { message: format!("Failed to open RabbitMQ channel: {}", e), })?; return Ok(Self { config, connection: Some(conn), channel: Some(channel), is_enabled: true, }); } Err(e) => { warn!( "❌ Échec de connexion à RabbitMQ (url: {}): {}. Retrying in {} seconds...", config.url, e, config.retry_interval_secs ); attempts += 1; sleep(retry_interval).await; } } } error!( "❌ Échec de connexion à RabbitMQ après {} tentatives. L'EventBus est indisponible.", max_attempts ); Err(AppError::InternalError { message: EventBusUnavailableError::new( "Failed to connect to RabbitMQ after multiple retries", ) .to_string(), }) } /// Publie un message sur un exchange RabbitMQ pub async fn publish(&self, exchange: &str, routing_key: &str, payload: &[u8]) -> Result<()> { if !self.is_enabled { warn!( "⚠️ Tentative de publication sur EventBus désactivé (exchange: {}, routing_key: {})", exchange, routing_key ); return Err(AppError::InternalError { message: EventBusUnavailableError::new("EventBus is disabled").to_string(), }); } let channel = self .channel .as_ref() .ok_or_else(|| AppError::InternalError { message: EventBusUnavailableError::new("RabbitMQ channel is not available") .to_string(), })?; channel .basic_publish( exchange, routing_key, lapin::options::BasicPublishOptions::default(), payload, lapin::BasicProperties::default(), ) .await .map(|_| ()) .map_err(|e| AppError::InternalError { message: format!("Failed to publish message: {}", e), }) } /// Déclare un exchange pub async fn declare_exchange(&self, exchange_name: &str, exchange_type: &str) -> Result<()> { if !self.is_enabled { return Err(AppError::InternalError { message: EventBusUnavailableError::new("EventBus is disabled").to_string(), }); } let channel = self .channel .as_ref() .ok_or_else(|| AppError::InternalError { message: EventBusUnavailableError::new("RabbitMQ channel is not available") .to_string(), })?; channel .exchange_declare( exchange_name, Self::exchange_kind_from_str(exchange_type), ExchangeDeclareOptions { durable: true, auto_delete: false, ..ExchangeDeclareOptions::default() }, FieldTable::default(), ) .await .map(|_| ()) .map_err(|e| AppError::InternalError { message: format!("Failed to declare exchange {}: {}", exchange_name, e), }) } /// Crée une queue pub async fn create_queue(&self, queue_name: &str) -> Result<()> { let channel = self .channel .as_ref() .ok_or_else(|| AppError::InternalError { message: EventBusUnavailableError::new("RabbitMQ channel is not available") .to_string(), })?; channel .queue_declare( queue_name, lapin::options::QueueDeclareOptions { durable: true, ..lapin::options::QueueDeclareOptions::default() }, FieldTable::default(), ) .await .map(|_| ()) .map_err(|e| AppError::InternalError { message: format!("Failed to declare queue {}: {}", queue_name, e), }) } /// Lie une queue à un exchange pub async fn bind_queue( &self, queue_name: &str, exchange_name: &str, routing_key: &str, ) -> Result<()> { if !self.is_enabled { warn!( "⚠️ Tentative de lier une queue sur EventBus désactivé (queue: {}, exchange: {})", queue_name, exchange_name ); return Err(AppError::InternalError { message: EventBusUnavailableError::new("EventBus is disabled").to_string(), }); } let channel = self .channel .as_ref() .ok_or_else(|| AppError::InternalError { message: EventBusUnavailableError::new("RabbitMQ channel is not available") .to_string(), })?; channel .queue_bind( queue_name, exchange_name, routing_key, lapin::options::QueueBindOptions::default(), FieldTable::default(), ) .await .map(|_| ()) .map_err(|e| AppError::InternalError { message: format!( "Failed to bind queue {} to exchange {}: {}", queue_name, exchange_name, e ), }) } fn exchange_kind_from_str(s: &str) -> lapin::ExchangeKind { match s.to_lowercase().as_str() { "direct" => lapin::ExchangeKind::Direct, "fanout" => lapin::ExchangeKind::Fanout, "topic" => lapin::ExchangeKind::Topic, "headers" => lapin::ExchangeKind::Headers, _ => { warn!("Unknown exchange type: {}. Defaulting to Topic.", s); lapin::ExchangeKind::Topic } } } }