veza/veza-backend-api/internal/websocket/chat/hub.go
senke d3e3ba9b33 feat(chat): Redis rate limiter, persistent presence, PostgreSQL full-text search
- Rewrite chat rate limiter with Redis sliding window (sorted sets) and
  automatic in-memory fallback when Redis is unavailable
- Add ChatPresenceService with Redis-backed online/offline/heartbeat
  tracking (2min TTL), integrated into Hub register/unregister
- Add migration 113: tsvector column with GIN index and auto-update
  trigger on messages table for full-text search
- Update Search repository method to use ts_rank ordering instead of ILIKE
- Wire Redis client into chat WebSocket setup in router.go
- Add comprehensive tests: rate limiter, presence, 100-user concurrent benchmark
2026-02-22 21:17:51 +01:00

206 lines
4.2 KiB
Go

package chat
import (
"context"
"sync"
"github.com/google/uuid"
"go.uber.org/zap"
)
// Hub keeps track of connected chat clients, the rooms they have joined and
// the clients belonging to each user. Registration, unregistration and room
// broadcasts are delivered through channels and processed by Run; the maps
// are additionally guarded by mu so that the other methods can access them
// directly.
type Hub struct {
// clients is the set of currently registered clients.
clients map[*Client]bool
// rooms maps a room ID to the set of clients that joined that room.
rooms map[uuid.UUID]map[*Client]bool
// userIndex maps a user ID to every registered client of that user.
userIndex map[uuid.UUID][]*Client
// register carries clients to be added by Run (unbuffered in NewHub).
register chan *Client
// unregister carries clients to be removed by Run (unbuffered in NewHub).
unregister chan *Client
// broadcast queues room messages for Run (capacity 256 in NewHub).
broadcast chan *RoomBroadcast
// mu guards clients, rooms and userIndex.
mu sync.RWMutex
logger *zap.Logger
// presenceService is optional; Run skips presence updates when it is nil.
presenceService *ChatPresenceService
}
// RoomBroadcast describes one message to be delivered to the members of a
// room by the hub's Run loop.
type RoomBroadcast struct {
// RoomID identifies the room whose members receive Data.
RoomID uuid.UUID
// Data is the payload pushed onto each recipient's send channel.
Data []byte
// Exclude, when non-nil, is a client that is skipped during delivery.
Exclude *Client
}
// NewHub builds a Hub with empty client, room and user tables, unbuffered
// register/unregister channels and a broadcast queue of 256 entries.
//
// A nil logger is replaced by a no-op logger. presenceService is stored as
// given and may be nil.
func NewHub(logger *zap.Logger, presenceService *ChatPresenceService) *Hub {
	hub := &Hub{
		clients:         make(map[*Client]bool),
		rooms:           make(map[uuid.UUID]map[*Client]bool),
		userIndex:       make(map[uuid.UUID][]*Client),
		register:        make(chan *Client),
		unregister:      make(chan *Client),
		broadcast:       make(chan *RoomBroadcast, 256),
		presenceService: presenceService,
	}

	if logger != nil {
		hub.logger = logger
	} else {
		hub.logger = zap.NewNop()
	}
	return hub
}
// Run is the hub's event loop. It serves client registrations, client
// unregistrations and room broadcasts, one event at a time, and never
// returns, so callers need to run it in a dedicated goroutine.
func (h *Hub) Run() {
	for {
		select {
		case client := <-h.register:
			h.handleRegister(client)
		case client := <-h.unregister:
			h.handleUnregister(client)
		case msg := <-h.broadcast:
			h.handleBroadcast(msg)
		}
	}
}

// handleRegister records client as connected, indexes it under its user ID
// and, when a presence service is configured, marks the user online.
func (h *Hub) handleRegister(client *Client) {
	h.mu.Lock()
	// Index a client only once. Unregistration removes a single userIndex
	// entry, so a duplicate would leave a client with a closed send channel
	// reachable from SendToUser and keep the user "online" forever.
	if !h.clients[client] {
		h.clients[client] = true
		h.userIndex[client.UserID] = append(h.userIndex[client.UserID], client)
	}
	h.mu.Unlock()

	// Presence is best-effort: a failure is reported but does not prevent
	// the client from being registered.
	if h.presenceService != nil {
		if err := h.presenceService.SetOnline(context.Background(), client.UserID); err != nil {
			h.logger.Warn("Failed to set user online",
				zap.String("user_id", client.UserID.String()),
				zap.Error(err))
		}
	}
	h.logger.Info("Client registered",
		zap.String("user_id", client.UserID.String()))
}

// handleUnregister removes client from the hub, closes its send channel,
// drops it from every room and from the user index. When it was the user's
// last client and a presence service is configured, the user is marked
// offline. Unregistering a client that is not registered only logs.
func (h *Hub) handleUnregister(client *Client) {
	userOffline := false

	h.mu.Lock()
	if _, ok := h.clients[client]; ok {
		delete(h.clients, client)
		close(client.send)

		for roomID, members := range h.rooms {
			delete(members, client)
			if len(members) == 0 {
				delete(h.rooms, roomID)
			}
		}

		remaining := h.userIndex[client.UserID]
		for i, c := range remaining {
			if c == client {
				last := len(remaining) - 1
				copy(remaining[i:], remaining[i+1:])
				// Clear the vacated tail slot so the backing array does
				// not keep the removed client alive.
				remaining[last] = nil
				remaining = remaining[:last]
				break
			}
		}
		if len(remaining) == 0 {
			delete(h.userIndex, client.UserID)
			userOffline = true
		} else {
			h.userIndex[client.UserID] = remaining
		}
	}
	h.mu.Unlock()

	// Presence is best-effort: a failure is reported but not propagated.
	if h.presenceService != nil && userOffline {
		if err := h.presenceService.SetOffline(context.Background(), client.UserID); err != nil {
			h.logger.Warn("Failed to set user offline",
				zap.String("user_id", client.UserID.String()),
				zap.Error(err))
		}
	}
	h.logger.Info("Client unregistered",
		zap.String("user_id", client.UserID.String()))
}

// handleBroadcast delivers msg.Data to every member of msg.RoomID except
// msg.Exclude. Delivery never blocks: a member whose send channel cannot
// accept the message is scheduled for unregistration instead.
func (h *Hub) handleBroadcast(msg *RoomBroadcast) {
	h.mu.RLock()
	defer h.mu.RUnlock()

	members, ok := h.rooms[msg.RoomID]
	if !ok {
		return
	}
	for client := range members {
		if msg.Exclude != nil && client == msg.Exclude {
			continue
		}
		select {
		case client.send <- msg.Data:
		default:
			// The unregister channel is only drained by Run, which is
			// currently executing this function; hand the request to a
			// goroutine so the loop cannot deadlock on itself.
			go func(c *Client) {
				h.unregister <- c
			}(client)
		}
	}
}
// JoinRoom adds client to the member set of roomID, creating the room's
// member set first if the hub has none for that ID.
func (h *Hub) JoinRoom(client *Client, roomID uuid.UUID) {
	h.mu.Lock()
	defer h.mu.Unlock()

	room := h.rooms[roomID]
	if room == nil {
		room = make(map[*Client]bool)
		h.rooms[roomID] = room
	}
	room[client] = true
}
// LeaveRoom removes client from roomID. A room left without members is
// deleted from the hub. Unknown rooms are ignored.
func (h *Hub) LeaveRoom(client *Client, roomID uuid.UUID) {
	h.mu.Lock()
	defer h.mu.Unlock()

	room, exists := h.rooms[roomID]
	if !exists {
		return
	}

	delete(room, client)
	if len(room) > 0 {
		return
	}
	delete(h.rooms, roomID)
}
// BroadcastToRoom queues data for delivery to the members of roomID by the
// Run loop. exclude, when non-nil, names a client that will be skipped.
// The call blocks while the broadcast queue cannot accept the message.
func (h *Hub) BroadcastToRoom(roomID uuid.UUID, data []byte, exclude *Client) {
	message := new(RoomBroadcast)
	message.RoomID = roomID
	message.Data = data
	message.Exclude = exclude

	h.broadcast <- message
}
// SendToUser offers data to every registered client of userID without
// blocking. Clients whose send channel cannot accept the message right now
// are skipped; users with no registered client are ignored.
func (h *Hub) SendToUser(userID uuid.UUID, data []byte) {
	h.mu.RLock()
	defer h.mu.RUnlock()

	// Ranging over the nil slice of an unknown user is a no-op.
	for _, conn := range h.userIndex[userID] {
		select {
		case conn.send <- data:
		default:
			// Not ready to receive: drop the message for this client.
		}
	}
}
// SendToClient offers data to a single client without blocking. The message
// is dropped when the client's send channel cannot accept it immediately.
func (h *Hub) SendToClient(client *Client, data []byte) {
	outbox := client.send

	select {
	case outbox <- data:
	default:
		// Not ready to receive: drop the message.
	}
}
// Register hands client to the Run loop for registration. The call blocks
// until the hub's register channel accepts the client.
func (h *Hub) Register(client *Client) {
	var inbox chan<- *Client = h.register
	inbox <- client
}
// Unregister hands client to the Run loop for removal. The call blocks
// until the hub's unregister channel accepts the client.
func (h *Hub) Unregister(client *Client) {
	var inbox chan<- *Client = h.unregister
	inbox <- client
}
// IsUserOnline reports whether at least one client is currently indexed
// under userID in this hub.
func (h *Hub) IsUserOnline(userID uuid.UUID) bool {
	h.mu.RLock()
	connections := len(h.userIndex[userID])
	h.mu.RUnlock()

	return connections > 0
}
// GetRoomMembers returns a snapshot of the clients currently in roomID, in
// unspecified order. It returns nil when the hub has no such room.
func (h *Hub) GetRoomMembers(roomID uuid.UUID) []*Client {
	h.mu.RLock()
	defer h.mu.RUnlock()

	room, exists := h.rooms[roomID]
	if !exists {
		return nil
	}

	// Fill a slice of the exact size by position rather than appending.
	snapshot := make([]*Client, len(room))
	next := 0
	for member := range room {
		snapshot[next] = member
		next++
	}
	return snapshot
}
// GetConnectedUsersCount returns the number of distinct user IDs that
// currently have an entry in the hub's user index.
func (h *Hub) GetConnectedUsersCount() int {
	h.mu.RLock()
	users := len(h.userIndex)
	h.mu.RUnlock()

	return users
}