package services

import (
	"context"
	"sync"

	"veza-backend-api/internal/metrics"

	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
)

// ChatPubSubService fans chat messages out over Redis Pub/Sub, with an
// in-process fallback registry for deployments that run without Redis.
type ChatPubSubService struct {
	redisClient         *redis.Client
	logger              *zap.Logger
	inMemorySubscribers map[string][]chan []byte
	mu                  sync.RWMutex
}

func NewChatPubSubService(redisClient *redis.Client, logger *zap.Logger) *ChatPubSubService {
	if logger == nil {
		logger = zap.NewNop()
	}
	if redisClient == nil {
		// In multi-pod deployments the in-memory fallback silently breaks:
		// messages published on pod A are never seen by subscribers on pod B.
		// Emit a loud startup error so the misconfiguration is noticed.
		logger.Error("Redis unavailable, falling back to in-memory PubSub; cross-instance messages will be lost. Set REDIS_URL and restart for multi-pod correctness")
	}
	return &ChatPubSubService{
		redisClient:         redisClient,
		logger:              logger,
		inMemorySubscribers: make(map[string][]chan []byte),
	}
}

func (s *ChatPubSubService) roomChannel(roomID uuid.UUID) string {
	return "chat:room:" + roomID.String()
}

func (s *ChatPubSubService) Publish(ctx context.Context, roomID uuid.UUID, message []byte) error {
	channel := s.roomChannel(roomID)
	if s.redisClient != nil {
		if err := s.redisClient.Publish(ctx, channel, message).Err(); err != nil {
			metrics.RecordCacheMiss("chat_pubsub")
			// ERROR, not Warn: the in-memory fallback only reaches subscribers
			// on this pod, so a multi-pod chat stays partitioned until Redis
			// recovers. Operators should page on this log line.
			s.logger.Error("Redis publish failed, in-memory fallback will not reach other pods",
				zap.String("channel", channel),
				zap.Error(err),
			)
			s.publishInMemory(channel, message)
			return nil
		}
		metrics.RecordCacheHit("chat_pubsub")
		return nil
	}
	metrics.RecordCacheMiss("chat_pubsub")
	s.publishInMemory(channel, message)
	return nil
}

func (s *ChatPubSubService) Subscribe(ctx context.Context, roomID uuid.UUID) (<-chan []byte, func(), error) {
	channel := s.roomChannel(roomID)
	if s.redisClient != nil {
		pubsub := s.redisClient.Subscribe(ctx, channel)
		ch := make(chan []byte, 256)
		go func() {
			defer close(ch)
			for {
				msg, err := pubsub.ReceiveMessage(ctx)
				if err != nil {
					// Closed subscription or cancelled context: stop the pump.
					return
				}
				select {
				case ch <- []byte(msg.Payload):
				default:
					// Slow consumer: drop rather than block the receive loop.
					s.logger.Warn("PubSub channel full, dropping message",
						zap.String("channel", channel))
				}
			}
		}()
		cancel := func() { _ = pubsub.Close() }
		return ch, cancel, nil
	}
	return s.subscribeInMemory(channel)
}

func (s *ChatPubSubService) PublishPresence(ctx context.Context, event []byte) error {
	channel := "chat:presence"
	if s.redisClient != nil {
		if err := s.redisClient.Publish(ctx, channel, event).Err(); err != nil {
			metrics.RecordCacheMiss("chat_pubsub")
			// Note: unlike Publish, presence surfaces the Redis error to the
			// caller instead of falling back to the in-memory path.
			return err
		}
		metrics.RecordCacheHit("chat_pubsub")
		return nil
	}
	metrics.RecordCacheMiss("chat_pubsub")
	s.publishInMemory(channel, event)
	return nil
}

func (s *ChatPubSubService) SubscribePresence(ctx context.Context) (<-chan []byte, func(), error) {
	channel := "chat:presence"
	if s.redisClient != nil {
		pubsub := s.redisClient.Subscribe(ctx, channel)
		ch := make(chan []byte, 256)
		go func() {
			defer close(ch)
			for {
				msg, err := pubsub.ReceiveMessage(ctx)
				if err != nil {
					return
				}
				select {
				case ch <- []byte(msg.Payload):
				default:
					// Mirror Subscribe: log drops instead of failing silently.
					s.logger.Warn("PubSub channel full, dropping message",
						zap.String("channel", channel))
				}
			}
		}()
		cancel := func() { _ = pubsub.Close() }
		return ch, cancel, nil
	}
	return s.subscribeInMemory(channel)
}

func (s *ChatPubSubService) publishInMemory(channel string, message []byte) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	subscribers, ok := s.inMemorySubscribers[channel]
	if !ok {
		return
	}
	for _, ch := range subscribers {
		select {
		case ch <- message:
		default:
			// Drop on a full buffer so one slow subscriber cannot stall the rest.
		}
	}
}

func (s *ChatPubSubService) subscribeInMemory(channel string) (<-chan []byte, func(), error) {
	ch := make(chan []byte, 256)
	s.mu.Lock()
	s.inMemorySubscribers[channel] = append(s.inMemorySubscribers[channel], ch)
	s.mu.Unlock()
	var once sync.Once
	cancel := func() {
		// sync.Once makes cancel idempotent: a second call must not close ch
		// twice, which would panic.
		once.Do(func() {
			s.mu.Lock()
			defer s.mu.Unlock()
			subs := s.inMemorySubscribers[channel]
			for i, sub := range subs {
				if sub == ch {
					s.inMemorySubscribers[channel] = append(subs[:i], subs[i+1:]...)
					break
				}
			}
			// Closing under the write lock is safe: publishInMemory sends
			// while holding the read lock, so no send can race this close.
			close(ch)
		})
	}
	return ch, cancel, nil
}
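
// exampleInMemoryRoundTrip is a minimal usage sketch, not part of the service
// API: it exercises the in-process fallback (nil Redis client) so the round
// trip is deterministic and needs no running Redis. The fresh room ID is
// illustrative; on the Redis path, delivery additionally depends on the
// subscription being established before the publish reaches the server.
func exampleInMemoryRoundTrip(ctx context.Context) []byte {
	// Passing a nil *redis.Client deliberately selects the in-memory path;
	// NewChatPubSubService logs a loud error to flag the single-pod limitation.
	svc := NewChatPubSubService(nil, zap.NewExample())

	roomID := uuid.New() // hypothetical room, for illustration only
	msgs, cancel, err := svc.Subscribe(ctx, roomID)
	if err != nil {
		return nil
	}
	defer cancel() // removes the subscriber and closes msgs exactly once

	// The in-memory publish delivers synchronously into the buffered channel,
	// so the receive below cannot block indefinitely.
	_ = svc.Publish(ctx, roomID, []byte(`{"text":"hello"}`))
	return <-msgs
}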