veza/veza-stream-server/src/lib.rs
senke 7af9c98a73 style(stream-server): apply rustfmt and fix golangci-lint v2 install
Two fixes surfaced by run #55:

1. veza-stream-server (47 files): cargo fmt had been run locally but
   never committed — the working tree was clean locally while HEAD
   had unformatted code. CI's `cargo fmt -- --check` caught the drift.
   This commit lands the formatting that was already staged.

2. ci.yml Install Go tools: `go install .../cmd/golangci-lint@latest`
   resolves to v1.64.8 (the old /cmd/ module path). The repo's
   .golangci.yml is v2-format, so v1 refuses with:
     "you are using a configuration file for golangci-lint v2
      with golangci-lint v1: please use golangci-lint v2"
   Switch to the /v2/cmd/ path so @latest actually gets v2.x.
2026-04-14 15:30:32 +02:00

203 lines
7 KiB
Rust

//! Stream Server pour Veza
//!
//! Serveur de streaming audio temps réel avec WebRTC
#![allow(dead_code)] // Stub fields in codecs/core for future features
pub mod analytics;
pub mod audio;
pub mod auth;
pub mod cache;
pub mod codecs;
pub mod config;
pub mod core;
pub mod database;
pub mod error;
pub mod event_bus;
pub mod health;
pub mod middleware;
pub mod monitoring;
pub mod notifications;
pub mod routes;
pub mod streaming;
pub mod structured_logging;
pub mod transcoding; // NEW: Phase 3 Transcoding Engine
pub mod utils; // ORIGIN Architecture: Event-driven via RabbitMQ
use crate::core::stream::{StreamConfig as CoreStreamConfig, StreamManager};
use crate::core::sync::{DriftCompensator, SyncConfig as CoreSyncConfig, SyncEngine, TimeServer};
use crate::streaming::websocket_transport::WebSocketSyncTransport;
use parking_lot::RwLock as PlRwLock;
use sqlx::PgPool;
use std::sync::Arc;
/// État global de l'application
/// Cette structure contient tous les services et composants nécessaires au serveur
#[derive(Clone)]
pub struct AppState {
pub config: Arc<config::Config>,
pub pool: PgPool,
// Ces champs sont initialisés dans main.rs via create_app_state()
// Utilisation de pub pour permettre l'accès depuis main.rs et les routes
pub cache: Arc<cache::FileCache>,
pub metrics: Arc<utils::metrics::Metrics>,
pub analytics: Arc<analytics::AnalyticsEngine>,
pub audio_processor: Arc<audio::processing::AudioProcessor>,
pub adaptive_streaming: Arc<streaming::adaptive::AdaptiveStreamingManager>,
pub health_monitor: Arc<health::HealthMonitor>,
pub auth_manager: Arc<auth::AuthManager>,
pub compression_engine: Arc<audio::compression::CompressionEngine>,
pub notification_service: Arc<notifications::NotificationService>,
pub websocket_manager: Arc<streaming::websocket::WebSocketManager>,
pub event_bus: Option<Arc<event_bus::RabbitMQEventBus>>, // Add RabbitMQEventBus, wrapped in Arc for Clone trait
pub transcoding_engine: Arc<transcoding::TranscodingEngine>, // Transcoding engine for audio encoding
pub sync_engine: Arc<SyncEngine>,
pub stream_manager: Arc<StreamManager>,
}
async fn build_revocation_store(
config: &config::Config,
) -> Arc<dyn auth::revocation_store::SessionRevocationStore> {
if let Some(ref redis_url) = config.cache.redis_url {
match auth::revocation_store::RedisRevocationStore::new(redis_url).await {
Ok(store) => {
tracing::info!("✅ JWT revocation store: Redis (persistant)");
return Arc::new(store);
}
Err(e) => {
tracing::warn!(
"⚠️ Redis non disponible pour revocation: {}. Fallback in-memory.",
e
);
}
}
}
Arc::new(auth::revocation_store::InMemoryRevocationStore::new())
}
impl AppState {
pub async fn new(config: config::Config) -> Result<Self, Box<dyn std::error::Error>> {
let config_arc = Arc::new(config.clone());
// Create database pool
let pool = database::create_pool_from_config(&config.database).await?;
// Run migrations if enabled
if config.database.migrate_on_start {
tracing::info!("🔄 Exécution des migrations de base de données...");
sqlx::migrate!("./migrations")
.run(&pool)
.await
.map_err(|e| format!("Migration failed: {}", e))?;
tracing::info!("✅ Migrations terminées avec succès");
}
// Cache needs max_size_mb as usize
let cache = Arc::new(cache::FileCache::new(config.cache.max_size_mb as usize));
// Metrics needs config
let metrics = Arc::new(utils::metrics::Metrics::new(config_arc.clone()));
// AnalyticsEngine uses the shared pool
let analytics =
Arc::new(analytics::AnalyticsEngine::new(pool.clone(), config_arc.clone()).await?);
let audio_processor = Arc::new(audio::processing::AudioProcessor::new(config_arc.clone()));
let compression_engine = Arc::new(audio::compression::CompressionEngine::new(
config_arc.clone(),
));
// HealthMonitor needs config and analytics for db check
let health_monitor = Arc::new(health::HealthMonitor::new(
config_arc.clone(),
analytics.clone(),
));
// Revocation store: Redis si REDIS_URL défini, sinon in-memory
let revocation_store = build_revocation_store(&config).await;
// AuthManager with revocation store
let auth_manager = Arc::new(auth::AuthManager::with_revocation_store(
config_arc.clone(),
revocation_store,
)?);
let notification_service =
Arc::new(notifications::NotificationService::new(config_arc.clone()));
// WebSocketManager::new takes no arguments
let websocket_manager = Arc::new(streaming::websocket::WebSocketManager::new());
// Adaptive streaming manager
let adaptive_streaming = Arc::new(streaming::adaptive::AdaptiveStreamingManager::new(
config_arc.clone(),
));
// Transcoding engine
let worker_count = num_cpus::get();
let transcoding_engine = Arc::new(transcoding::TranscodingEngine::new(worker_count));
// Démarrer les workers
transcoding_engine.start();
// SyncEngine initialization
let time_server = Arc::new(
TimeServer::new(vec![])
.await
.map_err(|e| format!("Failed to init TimeServer: {}", e))?,
);
let drift_compensator = Arc::new(DriftCompensator::new());
let sync_config = Arc::new(PlRwLock::new(CoreSyncConfig::default()));
let sync_transport = Arc::new(WebSocketSyncTransport::new(websocket_manager.clone()));
let sync_engine = Arc::new(SyncEngine::new(
time_server,
drift_compensator,
sync_config,
Some(sync_transport), // Now using real transport!
));
// StreamManager initialization
let stream_manager = Arc::new(
StreamManager::new(
CoreStreamConfig::default(),
pool.clone(),
sync_engine.clone(),
)
.await?,
);
// Start the sync loop
stream_manager.start_sync_loop().await;
Ok(Self {
config: config_arc,
pool,
cache,
metrics,
analytics,
audio_processor,
adaptive_streaming,
health_monitor,
auth_manager,
compression_engine,
notification_service,
websocket_manager,
event_bus: None, // Initialisé à None, sera assigné dans main.rs
transcoding_engine,
sync_engine,
stream_manager,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_app_state_structure() {
// Just verify struct exists
assert!(true);
}
}