veza/veza-backend-api/internal/services/chat_pubsub.go
senke f80d46a153 fix(chat,config): require REDIS_URL in prod + error on in-memory fallback
Two connected failure modes that silently break multi-pod deployments:

  1. `RedisURL` has a struct-level default (`redis://<appDomain>:6379`)
     that makes `c.RedisURL == ""` always false. An operator forgetting
     to set `REDIS_URL` booted against a phantom host — every Redis call
     would then fail, and `ChatPubSubService` would quietly fall back to
     an in-memory map. On a single-pod deploy that "works"; on two pods
     it silently partitions chat (messages on pod A never reach
     subscribers on pod B).
  2. The fallback itself was logged at `Warn` level, buried under normal
     traffic. Operators only noticed when users reported stuck chats.

Changes:

  * `config.go` (`ValidateForEnvironment` prod branch): new check that
    `os.Getenv("REDIS_URL")` is non-empty. The struct field is left
    alone (dev + test still use the default); we inspect the raw env so
    the check is "explicitly set" rather than "non-empty after
    defaults". A sketch follows after this list.
  * `chat_pubsub.go` `NewChatPubSubService`: if `redisClient == nil`,
    emit an `ERROR` at construction time naming the failure mode
    ("cross-instance messages will be lost"). Same `Warn`→`Error`
    promotion for the `Publish` fallback path — runbook-worthy.
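
A minimal sketch of the prod guard, for reviewers without config.go open.
The `Config` stub, the method signature, and the error wording are
illustrative guesses; only the raw-env inspection is the actual change:

    package config

    import (
        "errors"
        "os"
    )

    // Config is stubbed down to the one field relevant to this sketch.
    type Config struct {
        RedisURL string
    }

    // ValidateForEnvironment: hypothetical shape of the prod branch.
    func (c *Config) ValidateForEnvironment(env string) error {
        if env == "production" && os.Getenv("REDIS_URL") == "" {
            // c.RedisURL is never empty (struct-level default), so "not
            // explicitly set" must be detected from the environment itself.
            return errors.New("REDIS_URL must be explicitly set in production")
        }
        return nil
    }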

Tests: new `chat_pubsub_test.go` with a `zaptest/observer` that asserts
the ERROR-level log fires exactly once when Redis is nil, plus an
in-memory fan-out happy-path so single-pod dev behaviour stays covered.
New `TestValidateForEnvironment_RedisURLRequiredInProduction` mirrors
the Hyperswitch guard test shape.
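
For reference, the nil-Redis assertion is roughly the following (test name
and structure are paraphrased from this message, not copied from the file;
the snippet matches the Error string emitted in NewChatPubSubService below):

    import (
        "testing"

        "go.uber.org/zap"
        "go.uber.org/zap/zapcore"
        "go.uber.org/zap/zaptest/observer"
    )

    func TestNewChatPubSubService_NilRedisLogsError(t *testing.T) {
        core, observed := observer.New(zapcore.ErrorLevel)

        _ = NewChatPubSubService(nil, zap.New(core))

        errs := observed.FilterMessageSnippet("falling back to in-memory PubSub").All()
        if len(errs) != 1 {
            t.Fatalf("want exactly 1 ERROR for nil Redis, got %d", len(errs))
        }
    }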

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 14:56:47 +02:00

172 lines
4 KiB
Go

package services

import (
	"context"
	"sync"

	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
)
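
// ChatPubSubService fans chat messages out to room subscribers. It prefers
// Redis Pub/Sub so delivery spans pods; with a nil redisClient it degrades to
// a process-local subscriber map, which is only correct on a single pod.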
type ChatPubSubService struct {
	redisClient         *redis.Client
	logger              *zap.Logger
	inMemorySubscribers map[string][]chan []byte
	mu                  sync.RWMutex
}
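
// NewChatPubSubService wires the service. A nil redisClient selects the
// in-memory fallback; that misconfiguration is logged loudly at startup.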
func NewChatPubSubService(redisClient *redis.Client, logger *zap.Logger) *ChatPubSubService {
	if logger == nil {
		logger = zap.NewNop()
	}
	if redisClient == nil {
		// In multi-pod deployments the in-memory fallback silently breaks:
		// messages published on pod A are never seen by subscribers on pod B.
		// Emit a loud startup error so the misconfiguration is noticed.
		logger.Error("Redis unavailable, falling back to in-memory PubSub — cross-instance messages will be lost. Set REDIS_URL and restart for multi-pod correctness")
	}
	return &ChatPubSubService{
		redisClient:         redisClient,
		logger:              logger,
		inMemorySubscribers: make(map[string][]chan []byte),
	}
}
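
// roomChannel maps a room ID to its Redis Pub/Sub channel name.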
func (s *ChatPubSubService) roomChannel(roomID uuid.UUID) string {
	return "chat:room:" + roomID.String()
}
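
// Publish is best-effort: a Redis error is logged and degraded to in-memory
// delivery rather than returned, so a Pub/Sub outage never fails a send.
// The error result is currently always nil.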
func (s *ChatPubSubService) Publish(ctx context.Context, roomID uuid.UUID, message []byte) error {
	channel := s.roomChannel(roomID)
	if s.redisClient != nil {
		if err := s.redisClient.Publish(ctx, channel, message).Err(); err != nil {
			// ERROR, not Warn: the in-memory fallback only reaches subscribers
			// on this pod — a multi-pod chat becomes partitioned until Redis
			// recovers. Operators should page on this log line.
			s.logger.Error("Redis publish failed, in-memory fallback will not reach other pods",
				zap.String("channel", channel),
				zap.Error(err),
			)
			s.publishInMemory(channel, message)
		}
		return nil
	}
	s.publishInMemory(channel, message)
	return nil
}
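
// Subscribe returns a buffered receive channel for a room plus a cancel func.
// Cancel closes the Redis subscription, which unblocks the reader goroutine
// and closes the returned channel. Slow consumers drop messages (with a Warn)
// instead of blocking the reader.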
func (s *ChatPubSubService) Subscribe(ctx context.Context, roomID uuid.UUID) (<-chan []byte, func(), error) {
	channel := s.roomChannel(roomID)
	if s.redisClient != nil {
		pubsub := s.redisClient.Subscribe(ctx, channel)
		ch := make(chan []byte, 256)
		go func() {
			defer close(ch)
			for {
				msg, err := pubsub.ReceiveMessage(ctx)
				if err != nil {
					return
				}
				select {
				case ch <- []byte(msg.Payload):
				default:
					s.logger.Warn("PubSub channel full, dropping message", zap.String("channel", channel))
				}
			}
		}()
		cancel := func() {
			_ = pubsub.Close()
		}
		return ch, cancel, nil
	}
	return s.subscribeInMemory(channel)
}
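
// PublishPresence broadcasts a presence event on the shared presence channel.
// Unlike Publish, a Redis error here is returned to the caller.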
func (s *ChatPubSubService) PublishPresence(ctx context.Context, event []byte) error {
	channel := "chat:presence"
	if s.redisClient != nil {
		return s.redisClient.Publish(ctx, channel, event).Err()
	}
	s.publishInMemory(channel, event)
	return nil
}
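
// SubscribePresence mirrors Subscribe for the presence channel; overflow
// events are dropped silently (no Warn, unlike the per-room path).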
func (s *ChatPubSubService) SubscribePresence(ctx context.Context) (<-chan []byte, func(), error) {
	channel := "chat:presence"
	if s.redisClient != nil {
		pubsub := s.redisClient.Subscribe(ctx, channel)
		ch := make(chan []byte, 256)
		go func() {
			defer close(ch)
			for {
				msg, err := pubsub.ReceiveMessage(ctx)
				if err != nil {
					return
				}
				select {
				case ch <- []byte(msg.Payload):
				default:
				}
			}
		}()
		cancel := func() {
			_ = pubsub.Close()
		}
		return ch, cancel, nil
	}
	return s.subscribeInMemory(channel)
}
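
// publishInMemory delivers to local subscribers only. Sends are non-blocking,
// so a subscriber with a full buffer misses the message rather than stalling
// the publisher. The RLock here pairs with the Lock taken in cancel, so a
// send can never race a concurrent close of the same channel.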
func (s *ChatPubSubService) publishInMemory(channel string, message []byte) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	subscribers, ok := s.inMemorySubscribers[channel]
	if !ok {
		return
	}
	for _, ch := range subscribers {
		select {
		case ch <- message:
		default:
		}
	}
}
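
// subscribeInMemory registers a local subscriber. The cancel func removes the
// channel from the map and closes it under the write lock, which is what
// makes publishInMemory's non-blocking sends safe.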
func (s *ChatPubSubService) subscribeInMemory(channel string) (<-chan []byte, func(), error) {
	ch := make(chan []byte, 256)
	s.mu.Lock()
	s.inMemorySubscribers[channel] = append(s.inMemorySubscribers[channel], ch)
	s.mu.Unlock()
	cancel := func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		subs := s.inMemorySubscribers[channel]
		for i, sub := range subs {
			if sub == ch {
				s.inMemorySubscribers[channel] = append(subs[:i], subs[i+1:]...)
				break
			}
		}
		close(ch)
	}
	return ch, cancel, nil
}