veza/veza-chat-server/src/event_bus.rs

211 lines
6.9 KiB
Rust

use lapin::{
options::ExchangeDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties,
ExchangeKind,
};
use tokio::time::{sleep, Duration};
use tracing::{error, info, warn};
use crate::{
config,
error::{ChatError, Result},
};
/// Error describing why the RabbitMQ event bus cannot be used.
///
/// Its `Display` output is `RabbitMQ EventBus is unavailable: {message}`.
#[derive(Debug, thiserror::Error)]
#[error("RabbitMQ EventBus is unavailable: {message}")]
pub struct EventBusUnavailableError {
/// Reason for the unavailability; interpolated into the `Display` output.
message: String,
}
impl EventBusUnavailableError {
    /// Builds the error from a short description of why the bus is unavailable.
    ///
    /// The text is copied into an owned `String` held by the error.
    pub fn new(message: &str) -> Self {
        let message = String::from(message);
        Self { message }
    }
}
/// RabbitMQ connection manager.
///
/// Built by `new_with_retry`, which either returns a disabled bus (no
/// connection, no channel) or a bus holding an open connection and channel.
pub struct RabbitMQEventBus {
/// RabbitMQ settings this bus was built from (the canonical config type).
config: config::RabbitMQConfig,
/// `None` when the bus is disabled by configuration, `Some` once connected.
/// NOTE(review): never read in the visible code; presumably kept so the
/// connection stays alive as long as the channel is used — confirm.
connection: Option<Connection>,
/// Channel used for publishing and for queue operations; `None` when the bus
/// is disabled.
channel: Option<Channel>,
/// `false` when RabbitMQ is turned off in the configuration, `true` after a
/// successful connection.
pub is_enabled: bool,
}
impl RabbitMQEventBus {
/// Tente de se connecter à RabbitMQ avec une politique de retry
pub async fn new_with_retry(config: config::RabbitMQConfig) -> Result<Self> {
if !config.enable {
info!("📴 EventBus RabbitMQ désactivé par configuration.");
return Ok(Self {
config,
connection: None,
channel: None,
is_enabled: false,
});
}
let mut attempts = 0;
let max_attempts = config.max_retries;
let retry_interval = Duration::from_secs(config.retry_interval_secs);
while attempts < max_attempts {
info!(
"🔄 Tentative de connexion à RabbitMQ (url: {}) - Essai {}/{}",
config.url,
attempts + 1,
max_attempts
);
match Connection::connect(&config.url, ConnectionProperties::default()).await {
Ok(conn) => {
info!("✅ Connexion à RabbitMQ établie avec succès.");
let channel = conn.create_channel().await.map_err(|e| {
ChatError::internal_error(format!("Failed to open RabbitMQ channel: {}", e))
})?;
return Ok(Self {
config,
connection: Some(conn),
channel: Some(channel),
is_enabled: true,
});
}
Err(e) => {
warn!(
"❌ Échec de connexion à RabbitMQ (url: {}): {}. Retrying in {} seconds...",
config.url, e, config.retry_interval_secs
);
attempts += 1;
sleep(retry_interval).await;
}
}
}
error!(
"❌ Échec de connexion à RabbitMQ après {} tentatives. L'EventBus est indisponible.",
max_attempts
);
Err(ChatError::EventBusUnavailable {
source: EventBusUnavailableError::new(
"Failed to connect to RabbitMQ after multiple retries",
),
})
}
/// Publishes `payload` to `exchange` with the given `routing_key`, using
/// default publish options and default message properties.
///
/// # Errors
///
/// * `ChatError::EventBusUnavailable` when the bus is disabled or holds no
///   channel.
/// * The error built by `ChatError::internal_error` when the publish call
///   itself fails.
pub async fn publish(&self, exchange: &str, routing_key: &str, payload: &[u8]) -> Result<()> {
    let unavailable = |reason: &str| ChatError::EventBusUnavailable {
        source: EventBusUnavailableError::new(reason),
    };

    if !self.is_enabled {
        return Err(unavailable("EventBus is disabled"));
    }
    let channel = match &self.channel {
        Some(open) => open,
        None => return Err(unavailable("RabbitMQ channel is not available")),
    };

    let options = lapin::options::BasicPublishOptions::default();
    let properties = lapin::BasicProperties::default();
    // The value returned on success is intentionally discarded.
    channel
        .basic_publish(exchange, routing_key, options, payload, properties)
        .await
        .map_err(|e| ChatError::internal_error(format!("Failed to publish message: {}", e)))?;
    Ok(())
}
/// Declares a durable queue named `queue_name`.
///
/// All declare options other than `durable` keep their defaults and no extra
/// arguments are passed.
///
/// # Errors
///
/// * `ChatError::EventBusUnavailable` when the bus is disabled (a warning is
///   logged) or holds no channel.
/// * The error built by `ChatError::internal_error` when the declaration
///   fails.
pub async fn create_queue(&self, queue_name: &str) -> Result<()> {
    if !self.is_enabled {
        // The message names the queue operation; it previously mentioned an
        // exchange declaration, which was misleading when reading the logs.
        warn!(
            "⚠️ Tentative de déclaration de queue sur EventBus désactivé ({})",
            queue_name
        );
        return Err(ChatError::EventBusUnavailable {
            source: EventBusUnavailableError::new("EventBus is disabled"),
        });
    }
    let channel = self
        .channel
        .as_ref()
        .ok_or_else(|| ChatError::EventBusUnavailable {
            source: EventBusUnavailableError::new("RabbitMQ channel is not available"),
        })?;
    channel
        .queue_declare(
            queue_name,
            lapin::options::QueueDeclareOptions {
                durable: true,
                ..lapin::options::QueueDeclareOptions::default()
            },
            FieldTable::default(),
        )
        .await
        .map(|_| ())
        .map_err(|e| {
            ChatError::internal_error(format!("Failed to declare queue {}: {}", queue_name, e))
        })
}
/// Binds `queue_name` to `exchange_name` under `routing_key`, using default
/// bind options and no extra arguments.
///
/// # Errors
///
/// * `ChatError::EventBusUnavailable` when the bus is disabled (a warning is
///   logged) or holds no channel.
/// * The error built by `ChatError::internal_error` when the bind call fails.
pub async fn bind_queue(
    &self,
    queue_name: &str,
    exchange_name: &str,
    routing_key: &str,
) -> Result<()> {
    let unavailable = |reason: &str| ChatError::EventBusUnavailable {
        source: EventBusUnavailableError::new(reason),
    };

    if !self.is_enabled {
        warn!(
            "⚠️ Tentative de lier une queue sur EventBus désactivé (queue: {}, exchange: {})",
            queue_name, exchange_name
        );
        return Err(unavailable("EventBus is disabled"));
    }
    let channel = match &self.channel {
        Some(open) => open,
        None => return Err(unavailable("RabbitMQ channel is not available")),
    };

    let options = lapin::options::QueueBindOptions::default();
    let arguments = FieldTable::default();
    let bound = channel
        .queue_bind(queue_name, exchange_name, routing_key, options, arguments)
        .await;
    match bound {
        Ok(_) => Ok(()),
        Err(e) => Err(ChatError::internal_error(format!(
            "Failed to bind queue {} to exchange {}: {}",
            queue_name, exchange_name, e
        ))),
    }
}
/// Maps an exchange type name to the matching `lapin::ExchangeKind`.
///
/// The name is lowercased before comparison. Anything other than `direct`,
/// `fanout`, `topic` or `headers` logs a warning and falls back to `Topic`.
fn exchange_kind_from_str(s: &str) -> lapin::ExchangeKind {
    let normalized = s.to_lowercase();
    if normalized == "direct" {
        lapin::ExchangeKind::Direct
    } else if normalized == "fanout" {
        lapin::ExchangeKind::Fanout
    } else if normalized == "topic" {
        lapin::ExchangeKind::Topic
    } else if normalized == "headers" {
        lapin::ExchangeKind::Headers
    } else {
        // The warning reports the name exactly as the caller supplied it.
        warn!("Unknown exchange type: {}. Defaulting to Topic.", s);
        lapin::ExchangeKind::Topic
    }
}
}