feat(api): add PostgreSQL read replica support (3.7)

- Add DATABASE_READ_URL config and InitReadReplica in database package
- Add ForRead() helper for read-only handler routing
- Update TrackService and TrackSearchService to use read replica for reads
- Document setup in DEPLOYMENT_GUIDE.md and .env.template
This commit is contained in:
senke 2026-02-14 22:50:23 +01:00
parent 45ebcb8cad
commit b73387af3c
9 changed files with 155 additions and 18 deletions

View file

@ -20,6 +20,8 @@ APP_DOMAIN=veza.fr
# PostgreSQL connection string (host ports when using docker-compose: 15432)
# In Docker: postgres:5432 | On host: veza.fr:15432
DATABASE_URL=postgres://veza:password@veza.fr:15432/veza?sslmode=disable
# Optional: Read replica for scaling read-heavy workloads (same format as DATABASE_URL)
# DATABASE_READ_URL=postgres://veza:password@veza-read-replica:5432/veza?sslmode=disable
DATABASE_MAX_OPEN_CONNS=25
DATABASE_MAX_IDLE_CONNS=5
DATABASE_CONN_MAX_LIFETIME=5m

View file

@ -73,6 +73,8 @@ The API uses environment variables for configuration. Create a `.env` file or se
```bash
# Database
DATABASE_URL=postgres://user:password@host:5432/dbname?sslmode=disable
# Optional: Read replica for scaling read-heavy workloads (same format as DATABASE_URL)
# DATABASE_READ_URL=postgres://user:password@read-replica-host:5432/dbname?sslmode=disable
# Security
JWT_SECRET=<32+ character secret>
@ -659,6 +661,35 @@ migrate -path ./migrations -database "$DATABASE_URL" up
psql -h db.veza.internal -U veza_user -d veza_prod -c "SELECT version FROM schema_migrations;"
```
### PostgreSQL Read Replica Setup
For high-traffic deployments, you can offload read queries to a PostgreSQL read replica. The API routes read-only operations (e.g. `GET /tracks`, `GET /playlists`, `GET /search`) to the replica when configured.
#### 1. Configure PostgreSQL Streaming Replication
Set up a read replica using PostgreSQL streaming replication. The replica must use the same schema as the primary (migrations run only on the primary).
#### 2. Environment Variables
```bash
# Primary (write) connection - required
DATABASE_URL=postgres://user:password@primary-host:5432/veza_prod?sslmode=require
# Read replica - optional
DATABASE_READ_URL=postgres://user:password@read-replica-host:5432/veza_prod?sslmode=require
# Replica connection pool (optional, defaults shown). Connection lifetimes reuse DB_MAX_LIFETIME / DB_MAX_IDLE_TIME.
DB_READ_MAX_OPEN_CONNS=25
DB_READ_MAX_IDLE_CONNS=6
```
#### 3. Behavior
- When `DATABASE_READ_URL` is set, the API establishes a separate connection pool to the replica.
- Read-only handlers use the replica; write operations use the primary.
- If the replica connection fails at startup, the API logs a warning and uses the primary for all operations.
- Replication lag: streaming replication is asynchronous, so replica reads may lag the primary by a few seconds. Avoid reading your own writes immediately after a write — read-after-write consistency is not guaranteed on the replica.
## Health Checks and Monitoring
### Health Check Endpoints

View file

@ -32,7 +32,7 @@ func (r *APIRouter) setupInternalRoutes(router *gin.Engine) {
}
chunksDir := uploadDir + "/chunks"
trackService := trackcore.NewTrackService(r.db.GormDB, r.logger, uploadDir)
trackService := trackcore.NewTrackServiceWithDB(r.db, r.logger, uploadDir)
if r.config.CacheService != nil {
trackService.SetCacheService(r.config.CacheService)
}
@ -299,7 +299,7 @@ func (r *APIRouter) setupCoreProtectedRoutes(v1 *gin.RouterGroup) {
if uploadDir == "" {
uploadDir = "uploads/tracks"
}
trackServiceForDashboard := trackcore.NewTrackService(r.db.GormDB, r.logger, uploadDir)
trackServiceForDashboard := trackcore.NewTrackServiceWithDB(r.db, r.logger, uploadDir)
if r.config.CacheService != nil {
trackServiceForDashboard.SetCacheService(r.config.CacheService)
}

View file

@ -19,7 +19,7 @@ func (r *APIRouter) setupTrackRoutes(router *gin.RouterGroup) {
}
chunksDir := uploadDir + "/chunks"
trackService := trackcore.NewTrackService(r.db.GormDB, r.logger, uploadDir)
trackService := trackcore.NewTrackServiceWithDB(r.db, r.logger, uploadDir)
if r.config.CacheService != nil {
trackService.SetCacheService(r.config.CacheService)
}
@ -51,7 +51,7 @@ func (r *APIRouter) setupTrackRoutes(router *gin.RouterGroup) {
}
trackHandler.SetUploadValidator(uploadValidator)
trackSearchService := services.NewTrackSearchService(r.db.GormDB)
trackSearchService := services.NewTrackSearchServiceWithDB(r.db)
trackHandler.SetSearchService(trackSearchService)
trackVersionService := services.NewTrackVersionService(r.db.GormDB, r.logger, uploadDir)

View file

@ -77,7 +77,7 @@ func (r *APIRouter) setupUserRoutes(router *gin.RouterGroup) {
uploadDir = "uploads/tracks"
}
likeService := services.NewTrackLikeService(r.db.GormDB, r.logger)
trackService := trackcore.NewTrackService(r.db.GormDB, r.logger, uploadDir)
trackService := trackcore.NewTrackServiceWithDB(r.db, r.logger, uploadDir)
if r.config.CacheService != nil {
trackService.SetCacheService(r.config.CacheService)
}

View file

@ -75,6 +75,7 @@ type Config struct {
RedisURL string
RedisEnable bool // Enable/Disable Redis
DatabaseURL string
DatabaseReadURL string // Optional read replica URL (DATABASE_READ_URL)
UploadDir string // Répertoire d'upload
StreamServerURL string // URL du serveur de streaming
StreamServerInternalAPIKey string // API key for /internal/jobs/transcode (P1.1.2 - same as stream server INTERNAL_API_KEY)
@ -271,6 +272,7 @@ func NewConfig() (*Config, error) {
RedisEnable: getEnvBool("REDIS_ENABLE", true),
// SECURITY: DATABASE_URL est REQUIS - contient des credentials sensibles
DatabaseURL: databaseURL,
DatabaseReadURL: getEnv("DATABASE_READ_URL", ""),
UploadDir: getEnv("UPLOAD_DIR", "uploads"),
StreamServerURL: getEnv("STREAM_SERVER_URL", "http://"+appDomain+":8082"),
StreamServerInternalAPIKey: getEnv("STREAM_SERVER_INTERNAL_API_KEY", ""),
@ -572,7 +574,7 @@ func NewConfig() (*Config, error) {
}
// Initialiser la base de données avec retry
config.Database, err = initDatabaseWithRetry(config.DatabaseURL, config.DBMaxRetries, config.DBRetryInterval, dbLoggerZap)
config.Database, err = initDatabaseWithRetry(config.DatabaseURL, config.DatabaseReadURL, config.DBMaxRetries, config.DBRetryInterval, dbLoggerZap)
if err != nil {
// CRITICAL: Protect logger calls from broken pipe errors
func() {
@ -883,7 +885,7 @@ func (l *filteredRedisLogger) Printf(ctx context.Context, format string, v ...in
}
// initDatabaseWithRetry initialise la connexion à la base de données avec des tentatives de retry
func initDatabaseWithRetry(databaseURL string, maxRetries int, retryInterval time.Duration, logger *zap.Logger) (*database.Database, error) {
func initDatabaseWithRetry(databaseURL, databaseReadURL string, maxRetries int, retryInterval time.Duration, logger *zap.Logger) (*database.Database, error) {
dbConfig := &database.Config{
URL: databaseURL,
// BE-DB-015: Optimized connection pool settings for production
@ -901,8 +903,23 @@ func initDatabaseWithRetry(databaseURL string, maxRetries int, retryInterval tim
RetryInterval: retryInterval,
}
// Utiliser la fonction de connexion avec retry du package database
return database.NewDatabaseWithRetry(dbConfig, logger)
db, err := database.NewDatabaseWithRetry(dbConfig, logger)
if err != nil {
return nil, err
}
if databaseReadURL != "" {
readConfig := &database.Config{
URL: databaseReadURL,
MaxOpenConns: getEnvAsInt("DB_READ_MAX_OPEN_CONNS", 25),
MaxIdleConns: getEnvAsInt("DB_READ_MAX_IDLE_CONNS", 6),
MaxLifetime: getEnvAsDuration("DB_MAX_LIFETIME", 10*time.Minute),
MaxIdleTime: getEnvAsDuration("DB_MAX_IDLE_TIME", 5*time.Minute),
}
if err := db.InitReadReplica(readConfig); err != nil {
logger.Warn("Failed to init read replica, using primary for reads", zap.Error(err))
}
}
return db, nil
}
// EnvConfig représente la configuration de base chargée depuis les variables d'environnement

View file

@ -11,6 +11,7 @@ import (
"strings" // Removed strconv
"time" // MOD-P2-008: Ajouté pour timeout asynchrone
"veza-backend-api/internal/database"
"veza-backend-api/internal/models"
"veza-backend-api/internal/monitoring"
"veza-backend-api/internal/services"
@ -50,13 +51,22 @@ var (
// TrackService gère les opérations sur les tracks
// BE-SVC-001: Add cache service for track metadata
type TrackService struct {
db *gorm.DB
db *gorm.DB // Write operations (and read fallback when readDB is nil)
readDB *gorm.DB // Optional read replica for read-only operations
logger *zap.Logger
uploadDir string
maxFileSize int64
cacheService *services.CacheService
}
// forRead returns the DB to use for read operations (read replica if configured, else primary)
func (s *TrackService) forRead() *gorm.DB {
	if s.readDB == nil {
		// No replica configured: all reads go through the primary pool.
		return s.db
	}
	return s.readDB
}
// NewTrackService crée un nouveau service de tracks
func NewTrackService(db *gorm.DB, logger *zap.Logger, uploadDir string) *TrackService {
if uploadDir == "" {
@ -64,6 +74,21 @@ func NewTrackService(db *gorm.DB, logger *zap.Logger, uploadDir string) *TrackSe
}
return &TrackService{
db: db,
readDB: nil,
logger: logger,
uploadDir: uploadDir,
maxFileSize: 100 * 1024 * 1024, // 100MB
}
}
// NewTrackServiceWithDB crée un TrackService avec support read replica (utilise db.ForRead pour les lectures)
func NewTrackServiceWithDB(db *database.Database, logger *zap.Logger, uploadDir string) *TrackService {
if uploadDir == "" {
uploadDir = "uploads/tracks"
}
return &TrackService{
db: db.GormDB,
readDB: db.ForRead(),
logger: logger,
uploadDir: uploadDir,
maxFileSize: 100 * 1024 * 1024, // 100MB
@ -509,8 +534,8 @@ type TrackListParams struct {
// ListTracks récupère une liste de tracks avec pagination, filtres et tri
func (s *TrackService) ListTracks(ctx context.Context, params TrackListParams) ([]*models.Track, int64, error) {
// Créer la requête de base avec filtre sur le statut
query := s.db.WithContext(ctx).Model(&models.Track{}).Where("status = ?", models.TrackStatusCompleted)
// Créer la requête de base avec filtre sur le statut (read replica si configuré)
query := s.forRead().WithContext(ctx).Model(&models.Track{}).Where("status = ?", models.TrackStatusCompleted)
// Appliquer les filtres
if params.UserID != nil {
@ -594,9 +619,9 @@ func (s *TrackService) GetTrackByID(ctx context.Context, trackID uuid.UUID) (*mo
}
}
// Cache miss - fetch from database
// Cache miss - fetch from database (read replica si configuré)
var track models.Track
if err := s.db.WithContext(ctx).
if err := s.forRead().WithContext(ctx).
Preload("User").
First(&track, "id = ?", trackID).Error; err != nil { // Updated query
if err == gorm.ErrRecordNotFound {

View file

@ -40,9 +40,11 @@ type Config struct {
// Database représente la connexion principale à la base de données
type Database struct {
*sql.DB
GormDB *gorm.DB
config *Config
Logger *zap.Logger
GormDB *gorm.DB
ReadDB *sql.DB // Optional read replica connection
ReadGormDB *gorm.DB // GORM instance for read replica
config *Config
Logger *zap.Logger
}
// DB est un wrapper autour de sql.DB pour les repositories
@ -133,6 +135,48 @@ func NewDatabaseWithRetry(cfg *Config, logger *zap.Logger) (*Database, error) {
return nil, fmt.Errorf("échec de connexion à la base de données après %d tentatives: %w", cfg.MaxRetries, err)
}
// InitReadReplica opens a dedicated connection pool to a PostgreSQL read
// replica so that read-only queries can be routed away from the primary.
// It returns an error when cfg.URL is empty or the replica is unreachable;
// the caller decides whether to fall back to the primary on failure.
func (d *Database) InitReadReplica(cfg *Config) error {
	if cfg.URL == "" {
		return fmt.Errorf("read replica URL is required")
	}

	conn, err := sql.Open("postgres", cfg.URL)
	if err != nil {
		return fmt.Errorf("failed to open read replica: %w", err)
	}

	// Apply the replica-specific pool limits before first use.
	conn.SetMaxOpenConns(cfg.MaxOpenConns)
	conn.SetMaxIdleConns(cfg.MaxIdleConns)
	conn.SetConnMaxLifetime(cfg.MaxLifetime)
	conn.SetConnMaxIdleTime(cfg.MaxIdleTime)

	// Fail fast at startup if the replica cannot be reached.
	if pingErr := conn.Ping(); pingErr != nil {
		_ = conn.Close()
		return fmt.Errorf("failed to ping read replica: %w", pingErr)
	}

	// Wrap the existing *sql.DB in a GORM instance so services can share it.
	replicaGorm, err := gorm.Open(postgres.New(postgres.Config{Conn: conn}), &gorm.Config{
		Logger: gormlogger.Default.LogMode(gormlogger.Info),
	})
	if err != nil {
		_ = conn.Close()
		return fmt.Errorf("failed to initialize GORM for read replica: %w", err)
	}

	d.ReadDB = conn
	d.ReadGormDB = replicaGorm
	if d.Logger != nil {
		d.Logger.Info("Read replica connection established")
	}
	return nil
}
// ForRead returns the *gorm.DB to use for read-only queries: the read
// replica when one has been configured, otherwise the primary connection.
func (d *Database) ForRead() *gorm.DB {
	if d.ReadGormDB == nil {
		return d.GormDB
	}
	return d.ReadGormDB
}
// NewDatabase crée une nouvelle connexion à la base de données avec configuration
func NewDatabase(cfg *Config) (*Database, error) {
// NOTE: Do not create a logger here to avoid broken pipe errors
@ -463,6 +507,17 @@ func (d *Database) Close() error {
// GORM ferme automatiquement via sql.DB
}
// Fermer la connexion read replica si configurée
if d.ReadDB != nil {
if err := d.ReadDB.Close(); err != nil {
if d.Logger != nil {
d.Logger.Error("Erreur lors de la fermeture du read replica", zap.Error(err))
}
}
d.ReadDB = nil
d.ReadGormDB = nil
}
// Fermer le pool de connexions
if err := d.DB.Close(); err != nil {
if d.Logger != nil {

View file

@ -6,8 +6,10 @@ import (
"strings"
"time"
"gorm.io/gorm"
"veza-backend-api/internal/database"
"veza-backend-api/internal/models"
"gorm.io/gorm"
)
// TrackSearchParams représente les paramètres de recherche de tracks
@ -39,6 +41,11 @@ func NewTrackSearchService(db *gorm.DB) *TrackSearchService {
return &TrackSearchService{db: db}
}
// NewTrackSearchServiceWithDB builds a search service whose queries are
// routed through the read replica when the Database has one configured
// (falling back to the primary otherwise, via db.ForRead).
func NewTrackSearchServiceWithDB(db *database.Database) *TrackSearchService {
	readConn := db.ForRead()
	return &TrackSearchService{db: readConn}
}
// SearchTracks effectue une recherche avancée de tracks avec support de filtres combinés
func (s *TrackSearchService) SearchTracks(ctx context.Context, params TrackSearchParams) ([]*models.Track, int64, error) {
query := s.db.Model(&models.Track{}).Where("is_public = ? AND deleted_at IS NULL", true)