- Add migrations 109-112: read_receipts, delivered_status, message_reactions, messages extra columns - Create ReadReceipt, DeliveredStatus, MessageReaction GORM models - Update Message model with EditedAt, Status, IsPinned, Metadata fields - Enrich ChatMessageRepository with cursor pagination, search, soft delete - Create ReadReceiptRepository, DeliveredStatusRepository, ReactionRepository - Create ChatPubSubService with Redis PubSub and in-memory fallback
160 lines
3.3 KiB
Go
160 lines
3.3 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/redis/go-redis/v9"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type ChatPubSubService struct {
|
|
redisClient *redis.Client
|
|
logger *zap.Logger
|
|
inMemorySubscribers map[string][]chan []byte
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func NewChatPubSubService(redisClient *redis.Client, logger *zap.Logger) *ChatPubSubService {
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
return &ChatPubSubService{
|
|
redisClient: redisClient,
|
|
logger: logger,
|
|
inMemorySubscribers: make(map[string][]chan []byte),
|
|
}
|
|
}
|
|
|
|
func (s *ChatPubSubService) roomChannel(roomID uuid.UUID) string {
|
|
return "chat:room:" + roomID.String()
|
|
}
|
|
|
|
func (s *ChatPubSubService) Publish(ctx context.Context, roomID uuid.UUID, message []byte) error {
|
|
channel := s.roomChannel(roomID)
|
|
|
|
if s.redisClient != nil {
|
|
if err := s.redisClient.Publish(ctx, channel, message).Err(); err != nil {
|
|
s.logger.Warn("Redis publish failed, using in-memory fallback", zap.Error(err))
|
|
s.publishInMemory(channel, message)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
s.publishInMemory(channel, message)
|
|
return nil
|
|
}
|
|
|
|
func (s *ChatPubSubService) Subscribe(ctx context.Context, roomID uuid.UUID) (<-chan []byte, func(), error) {
|
|
channel := s.roomChannel(roomID)
|
|
|
|
if s.redisClient != nil {
|
|
pubsub := s.redisClient.Subscribe(ctx, channel)
|
|
ch := make(chan []byte, 256)
|
|
|
|
go func() {
|
|
defer close(ch)
|
|
for {
|
|
msg, err := pubsub.ReceiveMessage(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
select {
|
|
case ch <- []byte(msg.Payload):
|
|
default:
|
|
s.logger.Warn("PubSub channel full, dropping message", zap.String("channel", channel))
|
|
}
|
|
}
|
|
}()
|
|
|
|
cancel := func() {
|
|
_ = pubsub.Close()
|
|
}
|
|
return ch, cancel, nil
|
|
}
|
|
|
|
return s.subscribeInMemory(channel)
|
|
}
|
|
|
|
func (s *ChatPubSubService) PublishPresence(ctx context.Context, event []byte) error {
|
|
channel := "chat:presence"
|
|
|
|
if s.redisClient != nil {
|
|
return s.redisClient.Publish(ctx, channel, event).Err()
|
|
}
|
|
|
|
s.publishInMemory(channel, event)
|
|
return nil
|
|
}
|
|
|
|
func (s *ChatPubSubService) SubscribePresence(ctx context.Context) (<-chan []byte, func(), error) {
|
|
channel := "chat:presence"
|
|
|
|
if s.redisClient != nil {
|
|
pubsub := s.redisClient.Subscribe(ctx, channel)
|
|
ch := make(chan []byte, 256)
|
|
|
|
go func() {
|
|
defer close(ch)
|
|
for {
|
|
msg, err := pubsub.ReceiveMessage(ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
select {
|
|
case ch <- []byte(msg.Payload):
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
cancel := func() {
|
|
_ = pubsub.Close()
|
|
}
|
|
return ch, cancel, nil
|
|
}
|
|
|
|
return s.subscribeInMemory(channel)
|
|
}
|
|
|
|
func (s *ChatPubSubService) publishInMemory(channel string, message []byte) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
subscribers, ok := s.inMemorySubscribers[channel]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
for _, ch := range subscribers {
|
|
select {
|
|
case ch <- message:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *ChatPubSubService) subscribeInMemory(channel string) (<-chan []byte, func(), error) {
|
|
ch := make(chan []byte, 256)
|
|
|
|
s.mu.Lock()
|
|
s.inMemorySubscribers[channel] = append(s.inMemorySubscribers[channel], ch)
|
|
s.mu.Unlock()
|
|
|
|
cancel := func() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
subs := s.inMemorySubscribers[channel]
|
|
for i, sub := range subs {
|
|
if sub == ch {
|
|
s.inMemorySubscribers[channel] = append(subs[:i], subs[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
close(ch)
|
|
}
|
|
|
|
return ch, cancel, nil
|
|
}
|