veza/veza-backend-api/internal/database/database.go
senke f0ba7de543 state-ownership: delete unused optimisticStoreUpdates.ts file
- Deleted apps/web/src/utils/optimisticStoreUpdates.ts (unused file)
- File was unused - no imports found in codebase
- Mutations already use React Query's onMutate pattern
- No TypeScript errors after deletion
- Actions 4.4.1.2 and 4.4.1.3 complete
2026-01-15 19:26:53 +01:00

638 lines
19 KiB
Go

package database
import (
"context"
"database/sql"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
"veza-backend-api/internal/models"
"github.com/google/uuid"
"go.uber.org/zap"
"gorm.io/driver/postgres"
"gorm.io/driver/sqlite" // Added sqlite driver
"gorm.io/gorm"
gormlogger "gorm.io/gorm/logger"
)
// Config contient la configuration de la base de données
type Config struct {
URL string
Host string
Port string
Username string
Password string
Database string
SSLMode string
MaxOpenConns int
MaxIdleConns int
MaxLifetime time.Duration
MaxIdleTime time.Duration
MaxRetries int // Nombre maximal de tentatives de connexion
RetryInterval time.Duration // Intervalle entre les tentatives
}
// Database représente la connexion principale à la base de données
type Database struct {
*sql.DB
GormDB *gorm.DB
config *Config
Logger *zap.Logger
}
// DB est un wrapper autour de sql.DB pour les repositories
type DB struct {
*sql.DB
}
// NewDatabaseWithRetry crée une nouvelle connexion à la base de données avec des tentatives de retry
func NewDatabaseWithRetry(cfg *Config, logger *zap.Logger) (*Database, error) {
if cfg.MaxRetries == 0 {
cfg.MaxRetries = 1 // Au moins une tentative
}
if cfg.RetryInterval == 0 {
cfg.RetryInterval = 5 * time.Second // 5 secondes par défaut
}
var db *Database
var err error
for i := 0; i < cfg.MaxRetries; i++ {
// CRITICAL: Protect logger calls from broken pipe errors
func() {
defer func() {
if r := recover(); r != nil {
_ = r
}
}()
if logger != nil {
logger.Info("🔌 Tentative de connexion à la base de données PostgreSQL",
zap.Int("attempt", i+1),
zap.Int("max_attempts", cfg.MaxRetries),
zap.String("host", cfg.Host),
zap.String("port", cfg.Port),
zap.String("database", cfg.Database))
}
}()
db, err = NewDatabase(cfg)
if err == nil {
// Set the logger on the database instance
db.Logger = logger
// CRITICAL: Protect logger calls from broken pipe errors
func() {
defer func() {
if r := recover(); r != nil {
_ = r
}
}()
if logger != nil {
logger.Info("✅ Connexion à la base de données établie avec succès après tentatives")
}
}()
return db, nil
}
// CRITICAL: Protect logger calls from broken pipe errors
func() {
defer func() {
if r := recover(); r != nil {
_ = r
}
}()
if logger != nil {
logger.Warn("❌ Échec de connexion à la base de données",
zap.Error(err),
zap.Int("attempt", i+1),
zap.Int("max_attempts", cfg.MaxRetries))
}
}()
if i < cfg.MaxRetries-1 {
// CRITICAL: Protect logger calls from broken pipe errors
func() {
defer func() {
if r := recover(); r != nil {
_ = r
}
}()
if logger != nil {
logger.Info("🔄 Nouvelle tentative dans quelques secondes...",
zap.Duration("interval", cfg.RetryInterval))
}
}()
time.Sleep(cfg.RetryInterval)
}
}
return nil, fmt.Errorf("échec de connexion à la base de données après %d tentatives: %w", cfg.MaxRetries, err)
}
// NewDatabase crée une nouvelle connexion à la base de données avec configuration
func NewDatabase(cfg *Config) (*Database, error) {
// NOTE: Do not create a logger here to avoid broken pipe errors
// The logger will be set by NewDatabaseWithRetry after successful connection
// This prevents broken pipe errors during database connection
// Construire l'URL de connexion
var dsn string
if cfg.URL != "" {
dsn = cfg.URL
} else {
dsn = fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s",
cfg.Host, cfg.Port, cfg.Username, cfg.Password, cfg.Database, cfg.SSLMode)
}
// Ouvrir la connexion
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
}
// Configurer le pool de connexions optimisé
db.SetMaxOpenConns(cfg.MaxOpenConns)
db.SetMaxIdleConns(cfg.MaxIdleConns)
db.SetConnMaxLifetime(cfg.MaxLifetime)
db.SetConnMaxIdleTime(cfg.MaxIdleTime)
// Tester la connexion
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("failed to ping database: %w", err)
}
// Initialiser GORM avec la même connexion
// Activer le logger GORM pour voir toutes les requêtes SQL au niveau Info
gormDB, err := gorm.Open(postgres.New(postgres.Config{
Conn: db,
}), &gorm.Config{
Logger: gormlogger.Default.LogMode(gormlogger.Info), // Afficher toutes les requêtes SQL
})
if err != nil {
return nil, fmt.Errorf("failed to initialize GORM: %w", err)
}
// Don't log here - logger will be set by NewDatabaseWithRetry after successful connection
// This prevents broken pipe errors during database connection
return &Database{
DB: db,
GormDB: gormDB,
config: cfg,
Logger: nil, // Will be set by NewDatabaseWithRetry
}, nil
}
// Initialize initialise la base de données avec les migrations
func (d *Database) Initialize() error {
if d.Logger != nil {
// Try to log, but ignore broken pipe errors
d.Logger.Info("🔧 Initialisation de la base de données...")
}
// Exécuter les migrations
if err := d.RunMigrations(); err != nil {
return fmt.Errorf("failed to run migrations: %w", err)
}
// Vérifier l'intégrité des données
if err := d.VerifyIntegrity(); err != nil {
if d.Logger != nil {
d.Logger.Warn("⚠️ Problèmes d'intégrité détectés", zap.Error(err))
}
}
if d.Logger != nil {
d.Logger.Info("✅ Base de données initialisée avec succès")
}
return nil
}
// RunMigrations exécute toutes les migrations en attente
func (d *Database) RunMigrations() error {
if d.Logger != nil {
d.Logger.Info("📦 Exécution des migrations...")
}
// STRATÉGIE 100% SQL : Les migrations SQL sont exécutées EN PREMIER
// GORM n'est plus utilisé pour créer/modifier les tables
if d.Logger != nil {
d.Logger.Info("📦 Exécution des migrations SQL...")
}
// Créer la table migrations si elle n'existe pas
createMigrationsTable := `
CREATE TABLE IF NOT EXISTS schema_migrations (
id SERIAL PRIMARY KEY,
version VARCHAR(50) NOT NULL UNIQUE,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`
if _, err := d.Exec(createMigrationsTable); err != nil {
return fmt.Errorf("failed to create migrations table: %w", err)
}
// Découvir les fichiers de migration
files, err := filepath.Glob("migrations/*.sql")
if err != nil {
return fmt.Errorf("failed to list migration files: %w", err)
}
sort.Strings(files)
if len(files) == 0 {
if d.Logger != nil {
d.Logger.Warn("⚠️ Aucune migration trouvée dans le dossier migrations/")
}
}
// Exécuter chaque migration
for _, file := range files {
migration := filepath.Base(file)
// Vérifier si la migration a déjà été appliquée
var exists bool
checkQuery := "SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE version = $1)"
if err := d.QueryRow(checkQuery, migration).Scan(&exists); err != nil && err != sql.ErrNoRows {
return fmt.Errorf("failed to check migration status: %w", err)
}
if exists {
if d.Logger != nil {
d.Logger.Info("Migration déjà appliquée", zap.String("migration", migration))
}
continue
}
// Lire le fichier de migration
content, err := os.ReadFile(file)
if err != nil {
if d.Logger != nil {
d.Logger.Warn("Migration non trouvée, skip", zap.String("migration", migration))
}
continue
}
migrationSQL := string(content)
// MOD-P1-002: Détecter si la migration contient CREATE EXTENSION (ne peut pas être dans transaction)
containsExtension := strings.Contains(strings.ToUpper(migrationSQL), "CREATE EXTENSION")
// MOD-P1-002: Exécuter migration avec rollback automatique en cas d'échec
if containsExtension {
// Extensions ne peuvent pas être dans une transaction PostgreSQL, exécuter directement
// Mais on doit quand même enregistrer dans schema_migrations de manière atomique
if d.Logger != nil {
d.Logger.Info("Exécution migration avec extensions (hors transaction)",
zap.String("migration", migration))
}
// Exécuter la migration directement (extensions)
if _, err := d.Exec(migrationSQL); err != nil {
return fmt.Errorf("failed to execute migration %s (extensions): %w", migration, err)
}
// Enregistrer la migration dans une transaction séparée pour atomicité
tx, err := d.Begin()
if err != nil {
return fmt.Errorf("failed to begin transaction for recording migration %s: %w", migration, err)
}
if _, err := tx.Exec("INSERT INTO schema_migrations (version) VALUES ($1)", migration); err != nil {
tx.Rollback()
return fmt.Errorf("failed to record migration %s: %w", migration, err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit migration record transaction: %w", err)
}
} else {
// MOD-P1-002: Migration normale dans transaction avec rollback automatique garanti
tx, err := d.Begin()
if err != nil {
return fmt.Errorf("failed to begin transaction for migration %s: %w", migration, err)
}
// MOD-P1-002: Utiliser defer pour garantir rollback en cas d'erreur (même si panic)
committed := false
defer func() {
if !committed {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
if d.Logger != nil {
d.Logger.Error("Failed to rollback migration transaction",
zap.String("migration", migration),
zap.Error(rollbackErr))
}
} else {
if d.Logger != nil {
d.Logger.Info("Migration transaction rolled back automatically",
zap.String("migration", migration))
}
}
}
}()
// Exécuter la migration dans la transaction
if _, err := tx.Exec(migrationSQL); err != nil {
return fmt.Errorf("failed to execute migration %s: %w", migration, err)
}
// Enregistrer la migration comme appliquée dans la même transaction
if _, err := tx.Exec("INSERT INTO schema_migrations (version) VALUES ($1)", migration); err != nil {
return fmt.Errorf("failed to record migration %s: %w", migration, err)
}
// Commit transaction
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit migration transaction for %s: %w", migration, err)
}
committed = true
}
if d.Logger != nil {
d.Logger.Info("Migration appliquée", zap.String("migration", migration))
}
}
if d.Logger != nil {
d.Logger.Info("✅ Toutes les migrations SQL ont été appliquées")
}
// Exécuter les migrations GORM APRÈS les migrations SQL
// (uniquement pour les indexes additionnels sur users, pas pour créer/modifier les tables)
if d.GormDB != nil {
if err := RunMigrations(d.GormDB); err != nil {
return fmt.Errorf("failed to run GORM migrations: %w", err)
}
if d.Logger != nil {
d.Logger.Info("✅ Migrations GORM appliquées (indexes additionnels)")
}
}
return nil
}
// VerifyIntegrity vérifie l'intégrité de base de la base de données
func (d *Database) VerifyIntegrity() error {
if d.Logger != nil {
d.Logger.Info("🔍 Vérification de l'intégrité de la base de données...")
}
// Vérifier que les tables principales existent
tables := []string{"users", "user_sessions", "tracks", "rooms", "messages"}
for _, table := range tables {
var exists bool
query := `SELECT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_schema = 'public' AND table_name = $1
)`
if err := d.QueryRow(query, table).Scan(&exists); err != nil {
return fmt.Errorf("failed to check table %s: %w", table, err)
}
if !exists {
return fmt.Errorf("required table %s does not exist", table)
}
}
// Vérifier quelques contraintes importantes
constraints := map[string]string{
"users_username_key": "users",
"users_email_key": "users",
"user_sessions_pkey": "user_sessions",
"tracks_pkey": "tracks",
"rooms_pkey": "rooms",
"messages_pkey": "messages",
}
for constraint, table := range constraints {
var exists bool
query := `SELECT EXISTS (
SELECT 1 FROM information_schema.table_constraints
WHERE table_name = $1 AND constraint_name = $2
)`
if err := d.QueryRow(query, table, constraint).Scan(&exists); err != nil {
if d.Logger != nil {
d.Logger.Warn("Impossible de vérifier la contrainte",
zap.String("constraint", constraint),
zap.Error(err))
}
continue
}
if !exists {
if d.Logger != nil {
d.Logger.Warn("Contrainte manquante",
zap.String("constraint", constraint),
zap.String("table", table))
}
}
}
if d.Logger != nil {
d.Logger.Info("✅ Vérification d'intégrité terminée")
}
return nil
}
// Close ferme la connexion à la base de données de manière gracieuse
func (d *Database) Close() error {
if d.Logger != nil {
d.Logger.Info("🔌 Fermeture de la connexion à la base de données")
}
// Fermeture gracieuse : attendre que les requêtes en cours se terminent
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Fermer GORM d'abord
if d.GormDB != nil {
// GORM ferme automatiquement via sql.DB
}
// Fermer le pool de connexions
if err := d.DB.Close(); err != nil {
if d.Logger != nil {
d.Logger.Error("Erreur lors de la fermeture de la base de données", zap.Error(err))
}
return err
}
// Vérifier que la fermeture a réussi en utilisant le contexte
select {
case <-ctx.Done():
if d.Logger != nil {
d.Logger.Warn("Timeout lors de la fermeture de la base de données")
}
return ctx.Err()
default:
if d.Logger != nil {
d.Logger.Info("✅ Connexion à la base de données fermée avec succès")
}
return nil
}
}
// Health vérifie la santé de la connexion à la base de données
func (d *Database) Health() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return d.PingContext(ctx)
}
// Stats retourne les statistiques de la base de données
func (d *Database) Stats() sql.DBStats {
return d.DB.Stats()
}
// GetUserByOAuthID récupère un utilisateur par son OAuth ID et provider
func (d *Database) GetUserByOAuthID(oauthID, provider string) (*models.User, error) {
// TODO: Implémenter OAuth user lookup
return nil, fmt.Errorf("not implemented")
}
// CreateUser crée un nouvel utilisateur
func (d *Database) CreateUser(user *models.User) error {
// TODO: Implémenter avec vraie DB
return fmt.Errorf("not implemented")
}
// UpdateUser met à jour un utilisateur existant
func (d *Database) UpdateUser(user *models.User) error {
// TODO: Implémenter avec vraie DB
return fmt.Errorf("not implemented")
}
// GetUserByID récupère un utilisateur par son ID
// MIGRATION UUID: Accepte maintenant uuid.UUID au lieu de int64
func (d *Database) GetUserByID(userID uuid.UUID) (*models.User, error) {
// TODO: Implémenter avec vraie DB
return nil, fmt.Errorf("not implemented")
}
// Chat methods - using interfaces to avoid import cycles
type Message struct {
ID uuid.UUID `json:"id"`
RoomID uuid.UUID `json:"room_id"`
UserID uuid.UUID `json:"user_id"`
Content string `json:"content"`
Type string `json:"type"`
ParentID *uuid.UUID `json:"parent_id,omitempty"`
IsEdited bool `json:"is_edited"`
IsDeleted bool `json:"is_deleted"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type Reaction struct {
ID uuid.UUID `json:"id"`
MessageID uuid.UUID `json:"message_id"`
UserID uuid.UUID `json:"user_id"`
Emoji string `json:"emoji"`
CreatedAt time.Time `json:"created_at"`
}
type Room struct {
ID uuid.UUID `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Type string `json:"type"`
IsPrivate bool `json:"is_private"`
CreatedBy uuid.UUID `json:"created_by"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
func (d *Database) CreateMessage(ctx context.Context, message *Message) error {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.CreateMessage(ctx, message)
}
func (d *Database) GetMessages(ctx context.Context, roomID uuid.UUID, page, limit int, beforeID *uuid.UUID) ([]*Message, error) {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.GetMessages(ctx, roomID, page, limit, beforeID)
}
func (d *Database) GetMessageByID(ctx context.Context, messageID uuid.UUID) (*Message, error) {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.GetMessageByID(ctx, messageID)
}
func (d *Database) UpdateMessage(ctx context.Context, message *Message) error {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.UpdateMessage(ctx, message)
}
func (d *Database) CreateReaction(ctx context.Context, reaction *Reaction) error {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.CreateReaction(ctx, reaction)
}
func (d *Database) DeleteReaction(ctx context.Context, messageID, userID uuid.UUID, emoji string) error {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.DeleteReaction(ctx, messageID, userID, emoji)
}
func (d *Database) CreateRoom(ctx context.Context, room *Room) error {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.CreateRoom(ctx, room)
}
func (d *Database) GetRooms(ctx context.Context, userID uuid.UUID, includePrivate bool) ([]*Room, error) {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.GetRooms(ctx, userID, includePrivate)
}
func (d *Database) GetDirectMessageRoom(ctx context.Context, userID1, userID2 uuid.UUID) (*Room, error) {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.GetDirectMessageRoom(ctx, userID1, userID2)
}
func (d *Database) AddUserToRoom(ctx context.Context, roomID, userID uuid.UUID) error {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.AddUserToRoom(ctx, roomID, userID)
}
func (d *Database) RemoveUserFromRoom(ctx context.Context, roomID, userID uuid.UUID) error {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.RemoveUserFromRoom(ctx, roomID, userID)
}
func (d *Database) GetRoomUserCount(ctx context.Context, roomID uuid.UUID) (int, error) {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.GetRoomUserCount(ctx, roomID)
}
func (d *Database) SearchMessages(ctx context.Context, roomID uuid.UUID, query string, limit int) ([]*Message, error) {
repo := NewChatRepository(&DB{DB: d.DB})
return repo.SearchMessages(ctx, roomID, query, limit)
}
// NewSQLiteTestDB crée une nouvelle connexion à une base de données SQLite en mémoire pour les tests.
// Pour les tests d'intégration, nous ne faisons pas d'AutoMigrate pour éviter les problèmes de DDL PostgreSQL.
// Les tests doivent mocker les interactions avec la base de données si nécessaire,
// ou s'appuyer sur des handlers qui ne touchent pas directement la base de données.
func NewSQLiteTestDB() (*Database, error) {
logger, _ := zap.NewProduction() // Ou un logger de test silencieux
// Ouvrir une connexion GORM avec SQLite en mémoire
gormDB, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
if err != nil {
return nil, fmt.Errorf("failed to open sqlite test database: %w", err)
}
// Ne pas exécuter AutoMigrate pour éviter les erreurs de DDL PostgreSQL.
// Les tests qui nécessitent des données devront les insérer manuellement
// ou les handlers devront être mockés/testés sans réelle interaction DB.
return &Database{
GormDB: gormDB,
Logger: logger,
}, nil
}