Some checks failed
Veza CI / Backend (Go) (push) Failing after 8m56s
Veza CI / Frontend (Web) (push) Has been cancelled
E2E Playwright / e2e (full) (push) Has been cancelled
Veza CI / Notify on failure (push) Blocked by required conditions
Veza CI / Rust (Stream Server) (push) Successful in 5m3s
Security Scan / Secret Scanning (gitleaks) (push) Failing after 53s
Three Incus containers, each running redis-server + redis-sentinel (co-located). redis-1 = master at first boot, redis-2/3 = replicas. Sentinel quorum=2 of 3; failover-timeout=30s satisfies the W3 acceptance criterion.

- internal/config/redis_init.go: initRedis branches on REDIS_SENTINEL_ADDRS; non-empty -> redis.NewFailoverClient with MasterName + SentinelAddrs + SentinelPassword. Empty -> existing single-instance NewClient (dev/local stays parametric).
- internal/config/config.go: 3 new fields (RedisSentinelAddrs, RedisSentinelMasterName, RedisSentinelPassword) read from env. parseRedisSentinelAddrs trims+filters CSV.
- internal/metrics/cache_hit_rate.go: new RecordCacheHit / Miss counters, labelled by subsystem. Cardinality bounded.
- internal/middleware/rate_limiter.go: instrument 3 Eval call sites (DDoS, frontend log throttle, upload throttle). Hit = Redis answered, Miss = error -> in-memory fallback.
- internal/services/chat_pubsub.go: instrument Publish + PublishPresence.
- internal/websocket/chat/presence_service.go: instrument SetOnline / SetOffline / Heartbeat / GetPresence. redis.Nil counts as a hit (legitimate empty result).
- infra/ansible/roles/redis_sentinel/: install Redis 7 + Sentinel, render redis.conf + sentinel.conf, systemd units. Vault assertion prevents shipping placeholder passwords to staging/prod.
- infra/ansible/playbooks/redis_sentinel.yml: provisions the 3 containers + applies common baseline + role.
- infra/ansible/inventory/lab.yml: new groups redis_ha + redis_ha_master.
- infra/ansible/tests/test_redis_failover.sh: kills the master container, polls Sentinel for the new master, asserts elapsed < 30s.
- config/grafana/dashboards/redis-cache-overview.json: 3 hit-rate stats (rate_limiter / chat_pubsub / presence) + ops/s breakdown.
- docs/ENV_VARIABLES.md §3: 3 new REDIS_SENTINEL_* env vars.
- veza-backend-api/.env.template: 3 placeholders (empty default).
Acceptance (Day 11): Sentinel failover < 30s; cache hit-rate dashboard populated. Lab test pending Sentinel deployment.

W3 verification gate progress: Redis Sentinel ✓ (this commit), MinIO EC4+2 ⏳ Day 12, CDN ⏳ Day 13, DMCA ⏳ Day 14, embed ⏳ Day 15.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
135 lines
3.4 KiB
Go
135 lines
3.4 KiB
Go
package chat
|
|
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"veza-backend-api/internal/metrics"

	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
)
|
|
|
|
// presenceTTL is the expiration handed to Redis every time a presence record
// is written.
const presenceTTL = 2 * time.Minute

// presenceKeyPrefix is prepended to a user's ID to form the Redis key of that
// user's presence record.
const presenceKeyPrefix = "chat:presence:"
|
|
|
|
type PresenceInfo struct {
|
|
UserID uuid.UUID `json:"user_id"`
|
|
Online bool `json:"online"`
|
|
LastSeen time.Time `json:"last_seen"`
|
|
}
|
|
|
|
type ChatPresenceService struct {
|
|
redis *redis.Client
|
|
logger *zap.Logger
|
|
}
|
|
|
|
func NewChatPresenceService(redisClient *redis.Client, logger *zap.Logger) *ChatPresenceService {
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
return &ChatPresenceService{
|
|
redis: redisClient,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
func (s *ChatPresenceService) presenceKey(userID uuid.UUID) string {
|
|
return fmt.Sprintf("%s%s", presenceKeyPrefix, userID.String())
|
|
}
|
|
|
|
func (s *ChatPresenceService) SetOnline(ctx context.Context, userID uuid.UUID) error {
|
|
if s.redis == nil {
|
|
return nil
|
|
}
|
|
|
|
info := PresenceInfo{
|
|
UserID: userID,
|
|
Online: true,
|
|
LastSeen: time.Now(),
|
|
}
|
|
|
|
data, err := json.Marshal(info)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal presence: %w", err)
|
|
}
|
|
|
|
if err := s.redis.Set(ctx, s.presenceKey(userID), data, presenceTTL).Err(); err != nil {
|
|
metrics.RecordCacheMiss("presence")
|
|
s.logger.Warn("Failed to set online presence", zap.Error(err), zap.String("user_id", userID.String()))
|
|
return fmt.Errorf("set presence: %w", err)
|
|
}
|
|
metrics.RecordCacheHit("presence")
|
|
return nil
|
|
}
|
|
|
|
func (s *ChatPresenceService) SetOffline(ctx context.Context, userID uuid.UUID) error {
|
|
if s.redis == nil {
|
|
return nil
|
|
}
|
|
|
|
if err := s.redis.Del(ctx, s.presenceKey(userID)).Err(); err != nil {
|
|
metrics.RecordCacheMiss("presence")
|
|
s.logger.Warn("Failed to delete presence", zap.Error(err), zap.String("user_id", userID.String()))
|
|
return fmt.Errorf("delete presence: %w", err)
|
|
}
|
|
metrics.RecordCacheHit("presence")
|
|
return nil
|
|
}
|
|
|
|
func (s *ChatPresenceService) Heartbeat(ctx context.Context, userID uuid.UUID) error {
|
|
if s.redis == nil {
|
|
return nil
|
|
}
|
|
|
|
info := PresenceInfo{
|
|
UserID: userID,
|
|
Online: true,
|
|
LastSeen: time.Now(),
|
|
}
|
|
|
|
data, err := json.Marshal(info)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal presence: %w", err)
|
|
}
|
|
|
|
if err := s.redis.Set(ctx, s.presenceKey(userID), data, presenceTTL).Err(); err != nil {
|
|
metrics.RecordCacheMiss("presence")
|
|
s.logger.Warn("Failed to heartbeat presence", zap.Error(err), zap.String("user_id", userID.String()))
|
|
return fmt.Errorf("heartbeat presence: %w", err)
|
|
}
|
|
metrics.RecordCacheHit("presence")
|
|
return nil
|
|
}
|
|
|
|
func (s *ChatPresenceService) GetPresence(ctx context.Context, userID uuid.UUID) (*PresenceInfo, error) {
|
|
if s.redis == nil {
|
|
return &PresenceInfo{UserID: userID, Online: false}, nil
|
|
}
|
|
|
|
data, err := s.redis.Get(ctx, s.presenceKey(userID)).Bytes()
|
|
if err == redis.Nil {
|
|
// "redis.Nil" = key doesn't exist = user is offline. That's a
|
|
// legitimate read result, not an error — count as a hit so the
|
|
// hit-rate metric reflects "Redis answered correctly".
|
|
metrics.RecordCacheHit("presence")
|
|
return &PresenceInfo{UserID: userID, Online: false}, nil
|
|
}
|
|
if err != nil {
|
|
metrics.RecordCacheMiss("presence")
|
|
return nil, fmt.Errorf("get presence: %w", err)
|
|
}
|
|
metrics.RecordCacheHit("presence")
|
|
|
|
var info PresenceInfo
|
|
if err := json.Unmarshal(data, &info); err != nil {
|
|
return nil, fmt.Errorf("unmarshal presence: %w", err)
|
|
}
|
|
|
|
return &info, nil
|
|
}
|