veza/veza-backend-api/internal/websocket/chat/hub.go
2026-03-06 19:13:16 +01:00

215 lines
4.4 KiB
Go

package chat
import (
"context"
"sync"
"github.com/google/uuid"
"go.uber.org/zap"
)
// Hub is the central registry for chat WebSocket clients. It tracks every
// registered client, room membership and a per-user index of connections.
// Its Run loop handles register, unregister and broadcast events one at a time.
type Hub struct {
// clients is the set of currently registered clients.
clients map[*Client]bool
// rooms maps a room ID to the set of clients that joined it.
rooms map[uuid.UUID]map[*Client]bool
// userIndex maps a user ID to that user's registered clients.
userIndex map[uuid.UUID][]*Client
// register, unregister and broadcast feed events into Run.
register chan *Client
unregister chan *Client
broadcast chan *RoomBroadcast
done chan struct{} // TASK-DEBT-008: lifecycle - close to stop Run()
// mu guards clients, rooms and userIndex.
mu sync.RWMutex
logger *zap.Logger
// presenceService is optional; Run skips presence updates when it is nil.
presenceService *ChatPresenceService
}
// RoomBroadcast is a message queued for delivery to the members of one room.
type RoomBroadcast struct {
// RoomID identifies the room whose members receive Data.
RoomID uuid.UUID
// Data is the payload pushed onto each recipient's send channel.
Data []byte
// Exclude, when non-nil, is skipped during delivery.
Exclude *Client
}
// NewHub builds a Hub with empty registries and freshly allocated channels.
// A nil logger is replaced by a no-op logger. presenceService is stored as
// given and may be nil.
func NewHub(logger *zap.Logger, presenceService *ChatPresenceService) *Hub {
	log := logger
	if log == nil {
		log = zap.NewNop()
	}

	hub := new(Hub)
	hub.logger = log
	hub.presenceService = presenceService

	// Registries.
	hub.clients = make(map[*Client]bool)
	hub.rooms = make(map[uuid.UUID]map[*Client]bool)
	hub.userIndex = make(map[uuid.UUID][]*Client)

	// Event channels: register/unregister are unbuffered, broadcast is buffered.
	hub.register = make(chan *Client)
	hub.unregister = make(chan *Client)
	hub.broadcast = make(chan *RoomBroadcast, 256)
	hub.done = make(chan struct{})

	return hub
}
// Run is the hub's event loop. It blocks, handling one register, unregister
// or broadcast event at a time, and returns once h.done is closed.
func (h *Hub) Run() {
	for {
		select {
		case <-h.done:
			return

		case client := <-h.register:
			h.mu.Lock()
			h.clients[client] = true
			h.userIndex[client.UserID] = append(h.userIndex[client.UserID], client)
			h.mu.Unlock()

			if h.presenceService != nil {
				// Presence is best-effort: its result is deliberately ignored so a
				// presence failure cannot block registration.
				_ = h.presenceService.SetOnline(context.Background(), client.UserID)
			}
			h.logger.Info("Client registered",
				zap.String("user_id", client.UserID.String()))

		case client := <-h.unregister:
			userOffline := false

			h.mu.Lock()
			// The membership check makes duplicate unregister requests harmless:
			// the send channel is closed only for a client that is still registered.
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.send)

				// Drop the client from every room, removing rooms that become empty.
				for roomID, members := range h.rooms {
					delete(members, client)
					if len(members) == 0 {
						delete(h.rooms, roomID)
					}
				}

				// Remove this connection from the user's list. The vacated tail
				// slot is cleared so the backing array does not keep the client alive.
				conns := h.userIndex[client.UserID]
				for i, c := range conns {
					if c == client {
						last := len(conns) - 1
						copy(conns[i:], conns[i+1:])
						conns[last] = nil
						h.userIndex[client.UserID] = conns[:last]
						break
					}
				}
				if len(h.userIndex[client.UserID]) == 0 {
					delete(h.userIndex, client.UserID)
					userOffline = true
				}
			}
			h.mu.Unlock()

			// Only the user's last connection going away marks them offline.
			if h.presenceService != nil && userOffline {
				// Best-effort, same as SetOnline above.
				_ = h.presenceService.SetOffline(context.Background(), client.UserID)
			}
			h.logger.Info("Client unregistered",
				zap.String("user_id", client.UserID.String()))

		case msg := <-h.broadcast:
			h.mu.RLock()
			// Ranging over a missing room (nil map) is a no-op.
			for client := range h.rooms[msg.RoomID] {
				if msg.Exclude != nil && client == msg.Exclude {
					continue
				}
				select {
				case client.send <- msg.Data:
				default:
					// The client cannot accept the message right now: evict it.
					// The request goes through a goroutine because this loop is
					// the only receiver on h.unregister. Selecting on h.done
					// lets that goroutine exit if the hub is shut down first
					// instead of blocking forever.
					go func(c *Client) {
						select {
						case h.unregister <- c:
						case <-h.done:
						}
					}(client)
				}
			}
			h.mu.RUnlock()
		}
	}
}
// JoinRoom adds client to the member set of roomID, creating the set on
// first use.
func (h *Hub) JoinRoom(client *Client, roomID uuid.UUID) {
	h.mu.Lock()
	defer h.mu.Unlock()

	members := h.rooms[roomID]
	if members == nil {
		members = make(map[*Client]bool)
		h.rooms[roomID] = members
	}
	members[client] = true
}
// LeaveRoom removes client from roomID and deletes the room once its last
// member has left. Unknown rooms are ignored.
func (h *Hub) LeaveRoom(client *Client, roomID uuid.UUID) {
	h.mu.Lock()
	defer h.mu.Unlock()

	members, found := h.rooms[roomID]
	if !found {
		return
	}

	delete(members, client)
	if len(members) == 0 {
		delete(h.rooms, roomID)
	}
}
// BroadcastToRoom queues data for delivery to every member of roomID except
// exclude (which may be nil). Delivery itself happens in Run.
//
// The call blocks while the broadcast buffer is full. If the hub has been
// shut down the message is dropped instead of blocking forever, since
// nothing would ever drain the buffer.
func (h *Hub) BroadcastToRoom(roomID uuid.UUID, data []byte, exclude *Client) {
	msg := &RoomBroadcast{
		RoomID:  roomID,
		Data:    data,
		Exclude: exclude,
	}
	select {
	case h.broadcast <- msg:
	case <-h.done:
	}
}
// SendToUser offers data to every registered connection of userID. Each send
// is non-blocking: a connection that cannot accept the message right now is
// skipped. Unknown users are a no-op.
func (h *Hub) SendToUser(userID uuid.UUID, data []byte) {
	h.mu.RLock()
	defer h.mu.RUnlock()

	// Ranging over a missing entry (nil slice) simply does nothing.
	for _, conn := range h.userIndex[userID] {
		select {
		case conn.send <- data:
		default:
		}
	}
}
// SendToClient offers data to a single client without blocking; the message
// is dropped if the client's send channel cannot accept it immediately.
//
// NOTE(review): Run closes client.send when the client is unregistered, and
// this method does not take h.mu. A send racing with that close would panic.
// Confirm callers only use this for clients that are still registered.
func (h *Hub) SendToClient(client *Client, data []byte) {
	outbox := client.send
	select {
	case outbox <- data:
	default:
		// Not ready to receive: drop rather than block the caller.
	}
}
// Register hands client to the Run loop for registration. It blocks until
// Run accepts the request. If the hub has been shut down the request is
// dropped so the caller does not block forever on a channel nobody reads.
func (h *Hub) Register(client *Client) {
	select {
	case h.register <- client:
	case <-h.done:
	}
}
// Unregister asks the Run loop to remove client. It blocks until Run accepts
// the request. If the hub has been shut down the request is dropped so the
// caller does not block forever on a channel nobody reads.
func (h *Hub) Unregister(client *Client) {
	select {
	case h.unregister <- client:
	case <-h.done:
	}
}
// IsUserOnline reports whether userID has at least one registered connection.
func (h *Hub) IsUserOnline(userID uuid.UUID) bool {
	h.mu.RLock()
	conns := h.userIndex[userID]
	h.mu.RUnlock()

	return len(conns) != 0
}
// GetRoomMembers returns a snapshot of the clients currently in roomID, in
// no particular order. It returns nil when the room does not exist.
func (h *Hub) GetRoomMembers(roomID uuid.UUID) []*Client {
	h.mu.RLock()
	defer h.mu.RUnlock()

	members, found := h.rooms[roomID]
	if !found {
		return nil
	}

	// Fill a pre-sized slice by index instead of appending.
	snapshot := make([]*Client, len(members))
	idx := 0
	for member := range members {
		snapshot[idx] = member
		idx++
	}
	return snapshot
}
// GetConnectedUsersCount returns the number of distinct users that have at
// least one registered connection.
func (h *Hub) GetConnectedUsersCount() int {
	h.mu.RLock()
	count := len(h.userIndex)
	h.mu.RUnlock()

	return count
}
// Shutdown stops the Hub's Run loop (TASK-DEBT-008: goroutine lifecycle).
//
// It is safe to call more than once and from multiple goroutines: closing an
// already-closed channel panics, so the close is performed only the first
// time, under h.mu.
func (h *Hub) Shutdown() {
	h.mu.Lock()
	defer h.mu.Unlock()

	select {
	case <-h.done:
		// Already shut down.
	default:
		close(h.done)
	}
}