veza/veza-backend-api/internal/infrastructure/events/eventbus.go

65 lines
1.8 KiB
Go

package events
import (
"context"
"encoding/json"
"fmt"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
)
// EventBus définit l'interface pour le système d'événements
type EventBus interface {
Publish(ctx context.Context, topic string, payload interface{}) error
Subscribe(ctx context.Context, topic string, handler func(payload []byte) error)
}
// RedisEventBus implémente EventBus avec Redis Pub/Sub
type RedisEventBus struct {
client *redis.Client
logger *zap.Logger
}
// NewRedisEventBus crée une nouvelle instance de RedisEventBus
func NewRedisEventBus(client *redis.Client, logger *zap.Logger) *RedisEventBus {
return &RedisEventBus{
client: client,
logger: logger,
}
}
// Publish publie un événement sur un topic
func (b *RedisEventBus) Publish(ctx context.Context, topic string, payload interface{}) error {
data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal payload: %w", err)
}
if err := b.client.Publish(ctx, topic, data).Err(); err != nil {
b.logger.Error("Failed to publish event", zap.String("topic", topic), zap.Error(err))
return err
}
b.logger.Debug("Event published", zap.String("topic", topic))
return nil
}
// Subscribe souscrit à un topic et exécute le handler pour chaque message
// Note: Cette méthode est bloquante ou doit être lancée dans une goroutine
func (b *RedisEventBus) Subscribe(ctx context.Context, topic string, handler func(payload []byte) error) {
pubsub := b.client.Subscribe(ctx, topic)
defer pubsub.Close()
ch := pubsub.Channel()
b.logger.Info("Subscribed to topic", zap.String("topic", topic))
for msg := range ch {
if err := handler([]byte(msg.Payload)); err != nil {
b.logger.Error("Error handling event",
zap.String("topic", topic),
zap.Error(err))
}
}
}