feat(chat): Sprint 3 -- message handlers, real-time features, permissions
- Implement full MessageHandler dispatch with all 18 incoming message types - Add handler_messages.go: SendMessage, EditMessage, DeleteMessage with ownership checks - Add handler_rooms.go: JoinConversation, LeaveConversation - Add handler_history.go: FetchHistory (cursor-based), SearchMessages (ILIKE), SyncMessages - Add handler_realtime.go: Typing, MarkAsRead, Delivered, AddReaction, RemoveReaction - Add handler_calls.go: WebRTC signaling relay (CallOffer/Answer/ICE/Hangup/Reject) - Add PermissionService: CanRead/CanSend/CanJoin/CanModerate based on room_members - Add RateLimiter: per-user per-action sliding window (in-memory) - Wire all dependencies in router.go setupChatWebSocket
This commit is contained in:
parent
e8d97741e4
commit
c7fb240dc3
9 changed files with 849 additions and 4 deletions
|
|
@ -327,7 +327,19 @@ func (r *APIRouter) setupChatWebSocket(router *gin.RouterGroup) {
|
|||
hub := chatws.NewHub(r.logger)
|
||||
go hub.Run()
|
||||
|
||||
msgHandler := chatws.NewMessageHandler(r.logger)
|
||||
msgRepo := repositories.NewChatMessageRepository(r.db.GormDB)
|
||||
readRepo := repositories.NewReadReceiptRepository(r.db.GormDB)
|
||||
deliveredRepo := repositories.NewDeliveredStatusRepository(r.db.GormDB)
|
||||
reactionRepo := repositories.NewReactionRepository(r.db.GormDB)
|
||||
|
||||
pubsub := services.NewChatPubSubService(r.config.RedisClient, r.logger)
|
||||
permissions := chatws.NewPermissionService(r.db.GormDB, r.logger)
|
||||
rateLimiter := chatws.NewRateLimiter()
|
||||
|
||||
msgHandler := chatws.NewMessageHandler(
|
||||
hub, msgRepo, readRepo, deliveredRepo, reactionRepo,
|
||||
pubsub, permissions, rateLimiter, r.logger,
|
||||
)
|
||||
|
||||
wsHandler := handlers.NewChatWebSocketHandler(chatService, hub, msgHandler, r.logger)
|
||||
|
||||
|
|
|
|||
|
|
@ -3,19 +3,48 @@ package chat
|
|||
import (
|
||||
"context"
|
||||
|
||||
"veza-backend-api/internal/repositories"
|
||||
"veza-backend-api/internal/services"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type MessageHandler struct {
|
||||
logger *zap.Logger
|
||||
msgRepo *repositories.ChatMessageRepository
|
||||
readRepo *repositories.ReadReceiptRepository
|
||||
deliveredRepo *repositories.DeliveredStatusRepository
|
||||
reactionRepo *repositories.ReactionRepository
|
||||
pubsub *services.ChatPubSubService
|
||||
permissions *PermissionService
|
||||
rateLimiter *RateLimiter
|
||||
hub *Hub
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewMessageHandler(logger *zap.Logger) *MessageHandler {
|
||||
func NewMessageHandler(
|
||||
hub *Hub,
|
||||
msgRepo *repositories.ChatMessageRepository,
|
||||
readRepo *repositories.ReadReceiptRepository,
|
||||
deliveredRepo *repositories.DeliveredStatusRepository,
|
||||
reactionRepo *repositories.ReactionRepository,
|
||||
pubsub *services.ChatPubSubService,
|
||||
permissions *PermissionService,
|
||||
rateLimiter *RateLimiter,
|
||||
logger *zap.Logger,
|
||||
) *MessageHandler {
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
return &MessageHandler{
|
||||
logger: logger,
|
||||
hub: hub,
|
||||
msgRepo: msgRepo,
|
||||
readRepo: readRepo,
|
||||
deliveredRepo: deliveredRepo,
|
||||
reactionRepo: reactionRepo,
|
||||
pubsub: pubsub,
|
||||
permissions: permissions,
|
||||
rateLimiter: rateLimiter,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -24,6 +53,47 @@ func (h *MessageHandler) Dispatch(ctx context.Context, client *Client, msg *Inco
|
|||
case TypePing:
|
||||
client.SendJSON(NewPongResponse())
|
||||
|
||||
case TypeSendMessage:
|
||||
h.HandleSendMessage(ctx, client, msg)
|
||||
case TypeEditMessage:
|
||||
h.HandleEditMessage(ctx, client, msg)
|
||||
case TypeDeleteMessage:
|
||||
h.HandleDeleteMessage(ctx, client, msg)
|
||||
|
||||
case TypeJoinConversation:
|
||||
h.HandleJoinConversation(ctx, client, msg)
|
||||
case TypeLeaveConversation:
|
||||
h.HandleLeaveConversation(ctx, client, msg)
|
||||
|
||||
case TypeFetchHistory:
|
||||
h.HandleFetchHistory(ctx, client, msg)
|
||||
case TypeSearchMessages:
|
||||
h.HandleSearchMessages(ctx, client, msg)
|
||||
case TypeSyncMessages:
|
||||
h.HandleSyncMessages(ctx, client, msg)
|
||||
|
||||
case TypeTyping:
|
||||
h.HandleTyping(ctx, client, msg)
|
||||
case TypeMarkAsRead:
|
||||
h.HandleMarkAsRead(ctx, client, msg)
|
||||
case TypeDelivered:
|
||||
h.HandleDelivered(ctx, client, msg)
|
||||
case TypeAddReaction:
|
||||
h.HandleAddReaction(ctx, client, msg)
|
||||
case TypeRemoveReaction:
|
||||
h.HandleRemoveReaction(ctx, client, msg)
|
||||
|
||||
case TypeCallOffer:
|
||||
h.HandleCallOffer(ctx, client, msg)
|
||||
case TypeCallAnswer:
|
||||
h.HandleCallAnswer(ctx, client, msg)
|
||||
case TypeICECandidate:
|
||||
h.HandleICECandidate(ctx, client, msg)
|
||||
case TypeCallHangup:
|
||||
h.HandleCallHangup(ctx, client, msg)
|
||||
case TypeCallReject:
|
||||
h.HandleCallReject(ctx, client, msg)
|
||||
|
||||
default:
|
||||
h.logger.Warn("Unknown message type",
|
||||
zap.String("type", msg.Type),
|
||||
|
|
|
|||
80
veza-backend-api/internal/websocket/chat/handler_calls.go
Normal file
80
veza-backend-api/internal/websocket/chat/handler_calls.go
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (h *MessageHandler) HandleCallOffer(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil || msg.TargetUserID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id and target_user_id are required"))
|
||||
return
|
||||
}
|
||||
if msg.SDP == "" {
|
||||
client.SendJSON(NewErrorResponse("sdp is required"))
|
||||
return
|
||||
}
|
||||
|
||||
outgoing := NewCallOfferResponse(*msg.ConversationID, client.UserID, msg.SDP, msg.CallType)
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.SendToUser(*msg.TargetUserID, data)
|
||||
|
||||
h.logger.Info("Call offer relayed",
|
||||
zap.String("from", client.UserID.String()),
|
||||
zap.String("to", msg.TargetUserID.String()),
|
||||
zap.String("type", msg.CallType))
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleCallAnswer(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil || msg.CallerUserID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id and caller_user_id are required"))
|
||||
return
|
||||
}
|
||||
if msg.SDP == "" {
|
||||
client.SendJSON(NewErrorResponse("sdp is required"))
|
||||
return
|
||||
}
|
||||
|
||||
outgoing := NewCallAnswerResponse(*msg.ConversationID, *msg.CallerUserID, client.UserID, msg.SDP)
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.SendToUser(*msg.CallerUserID, data)
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleICECandidate(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil || msg.TargetUserID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id and target_user_id are required"))
|
||||
return
|
||||
}
|
||||
if msg.Candidate == "" {
|
||||
client.SendJSON(NewErrorResponse("candidate is required"))
|
||||
return
|
||||
}
|
||||
|
||||
outgoing := NewICECandidateResponse(*msg.ConversationID, client.UserID, msg.Candidate)
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.SendToUser(*msg.TargetUserID, data)
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleCallHangup(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil || msg.TargetUserID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id and target_user_id are required"))
|
||||
return
|
||||
}
|
||||
|
||||
outgoing := NewCallHangupResponse(*msg.ConversationID, client.UserID)
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.SendToUser(*msg.TargetUserID, data)
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleCallReject(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil || msg.CallerUserID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id and caller_user_id are required"))
|
||||
return
|
||||
}
|
||||
|
||||
outgoing := NewCallRejectedResponse(*msg.ConversationID, client.UserID)
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.SendToUser(*msg.CallerUserID, data)
|
||||
}
|
||||
189
veza-backend-api/internal/websocket/chat/handler_history.go
Normal file
189
veza-backend-api/internal/websocket/chat/handler_history.go
Normal file
|
|
@ -0,0 +1,189 @@
|
|||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultHistoryLimit = 50
|
||||
maxHistoryLimit = 100
|
||||
)
|
||||
|
||||
func (h *MessageHandler) HandleFetchHistory(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id is required"))
|
||||
return
|
||||
}
|
||||
|
||||
if !h.rateLimiter.Allow(client.UserID, "fetch_history") {
|
||||
client.SendJSON(NewErrorResponse("rate limit exceeded"))
|
||||
return
|
||||
}
|
||||
|
||||
if !h.permissions.CanRead(ctx, client.UserID, *msg.ConversationID) {
|
||||
client.SendJSON(NewErrorResponse("not allowed to read this conversation"))
|
||||
return
|
||||
}
|
||||
|
||||
limit := defaultHistoryLimit
|
||||
if msg.Limit != nil && *msg.Limit > 0 {
|
||||
limit = *msg.Limit
|
||||
if limit > maxHistoryLimit {
|
||||
limit = maxHistoryLimit
|
||||
}
|
||||
}
|
||||
|
||||
messages, err := h.msgRepo.GetConversationMessages(ctx, *msg.ConversationID, limit+1, 0)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to fetch history", zap.Error(err))
|
||||
client.SendJSON(NewErrorResponse("failed to fetch history"))
|
||||
return
|
||||
}
|
||||
|
||||
hasMoreBefore := len(messages) > limit
|
||||
if hasMoreBefore {
|
||||
messages = messages[:limit]
|
||||
}
|
||||
|
||||
dtos := make([]MessageDTO, 0, len(messages))
|
||||
for _, m := range messages {
|
||||
dtos = append(dtos, MessageDTO{
|
||||
ID: m.ID,
|
||||
ConversationID: m.ConversationID,
|
||||
SenderID: m.SenderID,
|
||||
Content: m.Content,
|
||||
MessageType: m.MessageType,
|
||||
ParentMessageID: m.ParentMessageID,
|
||||
ReplyToID: m.ReplyToID,
|
||||
IsPinned: m.IsPinned,
|
||||
IsEdited: m.IsEdited,
|
||||
IsDeleted: m.IsDeleted,
|
||||
EditedAt: m.EditedAt,
|
||||
Status: m.Status,
|
||||
Metadata: m.Metadata,
|
||||
CreatedAt: m.CreatedAt,
|
||||
UpdatedAt: m.UpdatedAt,
|
||||
})
|
||||
}
|
||||
|
||||
client.SendJSON(NewHistoryChunkResponse(*msg.ConversationID, dtos, hasMoreBefore, false))
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleSearchMessages(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id is required"))
|
||||
return
|
||||
}
|
||||
if msg.Query == "" {
|
||||
client.SendJSON(NewErrorResponse("query is required"))
|
||||
return
|
||||
}
|
||||
|
||||
if !h.rateLimiter.Allow(client.UserID, "search") {
|
||||
client.SendJSON(NewErrorResponse("rate limit exceeded"))
|
||||
return
|
||||
}
|
||||
|
||||
if !h.permissions.CanRead(ctx, client.UserID, *msg.ConversationID) {
|
||||
client.SendJSON(NewErrorResponse("not allowed to read this conversation"))
|
||||
return
|
||||
}
|
||||
|
||||
limit := defaultHistoryLimit
|
||||
if msg.Limit != nil && *msg.Limit > 0 {
|
||||
limit = *msg.Limit
|
||||
if limit > maxHistoryLimit {
|
||||
limit = maxHistoryLimit
|
||||
}
|
||||
}
|
||||
|
||||
offset := 0
|
||||
if msg.Offset != nil && *msg.Offset > 0 {
|
||||
offset = *msg.Offset
|
||||
}
|
||||
|
||||
messages, total, err := h.msgRepo.Search(ctx, *msg.ConversationID, msg.Query, limit, offset)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to search messages", zap.Error(err))
|
||||
client.SendJSON(NewErrorResponse("failed to search messages"))
|
||||
return
|
||||
}
|
||||
|
||||
dtos := make([]MessageDTO, 0, len(messages))
|
||||
for _, m := range messages {
|
||||
dtos = append(dtos, MessageDTO{
|
||||
ID: m.ID,
|
||||
ConversationID: m.ConversationID,
|
||||
SenderID: m.SenderID,
|
||||
Content: m.Content,
|
||||
MessageType: m.MessageType,
|
||||
ParentMessageID: m.ParentMessageID,
|
||||
ReplyToID: m.ReplyToID,
|
||||
IsPinned: m.IsPinned,
|
||||
IsEdited: m.IsEdited,
|
||||
IsDeleted: m.IsDeleted,
|
||||
EditedAt: m.EditedAt,
|
||||
Status: m.Status,
|
||||
Metadata: m.Metadata,
|
||||
CreatedAt: m.CreatedAt,
|
||||
UpdatedAt: m.UpdatedAt,
|
||||
})
|
||||
}
|
||||
|
||||
client.SendJSON(NewSearchResultsResponse(*msg.ConversationID, dtos, msg.Query, total))
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleSyncMessages(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id is required"))
|
||||
return
|
||||
}
|
||||
if msg.Since == nil {
|
||||
client.SendJSON(NewErrorResponse("since is required"))
|
||||
return
|
||||
}
|
||||
|
||||
if !h.permissions.CanRead(ctx, client.UserID, *msg.ConversationID) {
|
||||
client.SendJSON(NewErrorResponse("not allowed to read this conversation"))
|
||||
return
|
||||
}
|
||||
|
||||
since, err := time.Parse(time.RFC3339, *msg.Since)
|
||||
if err != nil {
|
||||
client.SendJSON(NewErrorResponse("invalid since format, expected RFC3339"))
|
||||
return
|
||||
}
|
||||
|
||||
messages, err := h.msgRepo.GetMessagesSince(ctx, *msg.ConversationID, since, maxHistoryLimit)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to sync messages", zap.Error(err))
|
||||
client.SendJSON(NewErrorResponse("failed to sync messages"))
|
||||
return
|
||||
}
|
||||
|
||||
dtos := make([]MessageDTO, 0, len(messages))
|
||||
for _, m := range messages {
|
||||
dtos = append(dtos, MessageDTO{
|
||||
ID: m.ID,
|
||||
ConversationID: m.ConversationID,
|
||||
SenderID: m.SenderID,
|
||||
Content: m.Content,
|
||||
MessageType: m.MessageType,
|
||||
ParentMessageID: m.ParentMessageID,
|
||||
ReplyToID: m.ReplyToID,
|
||||
IsPinned: m.IsPinned,
|
||||
IsEdited: m.IsEdited,
|
||||
IsDeleted: m.IsDeleted,
|
||||
EditedAt: m.EditedAt,
|
||||
Status: m.Status,
|
||||
Metadata: m.Metadata,
|
||||
CreatedAt: m.CreatedAt,
|
||||
UpdatedAt: m.UpdatedAt,
|
||||
})
|
||||
}
|
||||
|
||||
client.SendJSON(NewSyncChunkResponse(*msg.ConversationID, dtos, time.Now()))
|
||||
}
|
||||
170
veza-backend-api/internal/websocket/chat/handler_messages.go
Normal file
170
veza-backend-api/internal/websocket/chat/handler_messages.go
Normal file
|
|
@ -0,0 +1,170 @@
|
|||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"veza-backend-api/internal/models"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (h *MessageHandler) HandleSendMessage(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id is required"))
|
||||
return
|
||||
}
|
||||
if msg.Content == "" {
|
||||
client.SendJSON(NewErrorResponse("content is required"))
|
||||
return
|
||||
}
|
||||
|
||||
if !h.rateLimiter.Allow(client.UserID, "send_message") {
|
||||
client.SendJSON(NewErrorResponse("rate limit exceeded"))
|
||||
return
|
||||
}
|
||||
|
||||
if !h.permissions.CanSend(ctx, client.UserID, *msg.ConversationID) {
|
||||
client.SendJSON(NewErrorResponse("not allowed to send messages in this conversation"))
|
||||
return
|
||||
}
|
||||
|
||||
var metadata []byte
|
||||
if len(msg.Attachments) > 0 {
|
||||
metadata, _ = json.Marshal(map[string]interface{}{
|
||||
"attachments": msg.Attachments,
|
||||
})
|
||||
}
|
||||
|
||||
chatMsg := &models.ChatMessage{
|
||||
ID: uuid.New(),
|
||||
ConversationID: *msg.ConversationID,
|
||||
SenderID: client.UserID,
|
||||
Content: msg.Content,
|
||||
MessageType: "text",
|
||||
ParentMessageID: msg.ParentMessageID,
|
||||
Status: "sent",
|
||||
Metadata: metadata,
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
|
||||
if err := h.msgRepo.Create(ctx, chatMsg); err != nil {
|
||||
h.logger.Error("Failed to save message",
|
||||
zap.Error(err),
|
||||
zap.String("user_id", client.UserID.String()))
|
||||
client.SendJSON(NewErrorResponse("failed to send message"))
|
||||
return
|
||||
}
|
||||
|
||||
client.SendJSON(NewActionConfirmedResponse("message_sent", true))
|
||||
|
||||
outgoing := NewNewMessageResponse(
|
||||
chatMsg.ConversationID,
|
||||
chatMsg.ID,
|
||||
chatMsg.SenderID,
|
||||
chatMsg.Content,
|
||||
chatMsg.CreatedAt,
|
||||
msg.Attachments,
|
||||
)
|
||||
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.BroadcastToRoom(*msg.ConversationID, data, nil)
|
||||
|
||||
if h.pubsub != nil {
|
||||
_ = h.pubsub.Publish(ctx, *msg.ConversationID, data)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleEditMessage(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.MessageID == nil || msg.ConversationID == nil {
|
||||
client.SendJSON(NewErrorResponse("message_id and conversation_id are required"))
|
||||
return
|
||||
}
|
||||
if msg.NewContent == "" {
|
||||
client.SendJSON(NewErrorResponse("new_content is required"))
|
||||
return
|
||||
}
|
||||
|
||||
chatMsg, err := h.msgRepo.GetByID(ctx, *msg.MessageID)
|
||||
if err != nil {
|
||||
client.SendJSON(NewErrorResponse("message not found"))
|
||||
return
|
||||
}
|
||||
|
||||
if chatMsg.SenderID != client.UserID {
|
||||
client.SendJSON(NewErrorResponse("can only edit your own messages"))
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
chatMsg.Content = msg.NewContent
|
||||
chatMsg.IsEdited = true
|
||||
chatMsg.EditedAt = &now
|
||||
|
||||
if err := h.msgRepo.Update(ctx, chatMsg); err != nil {
|
||||
h.logger.Error("Failed to update message", zap.Error(err))
|
||||
client.SendJSON(NewErrorResponse("failed to edit message"))
|
||||
return
|
||||
}
|
||||
|
||||
client.SendJSON(NewActionConfirmedResponse("message_edited", true))
|
||||
|
||||
outgoing := NewMessageEditedResponse(
|
||||
chatMsg.ID,
|
||||
chatMsg.ConversationID,
|
||||
client.UserID,
|
||||
now,
|
||||
msg.NewContent,
|
||||
)
|
||||
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.BroadcastToRoom(*msg.ConversationID, data, nil)
|
||||
|
||||
if h.pubsub != nil {
|
||||
_ = h.pubsub.Publish(ctx, *msg.ConversationID, data)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleDeleteMessage(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.MessageID == nil || msg.ConversationID == nil {
|
||||
client.SendJSON(NewErrorResponse("message_id and conversation_id are required"))
|
||||
return
|
||||
}
|
||||
|
||||
chatMsg, err := h.msgRepo.GetByID(ctx, *msg.MessageID)
|
||||
if err != nil {
|
||||
client.SendJSON(NewErrorResponse("message not found"))
|
||||
return
|
||||
}
|
||||
|
||||
isModerator := h.permissions.CanModerate(ctx, client.UserID, *msg.ConversationID)
|
||||
if chatMsg.SenderID != client.UserID && !isModerator {
|
||||
client.SendJSON(NewErrorResponse("can only delete your own messages"))
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.msgRepo.SoftDelete(ctx, *msg.MessageID); err != nil {
|
||||
h.logger.Error("Failed to delete message", zap.Error(err))
|
||||
client.SendJSON(NewErrorResponse("failed to delete message"))
|
||||
return
|
||||
}
|
||||
|
||||
client.SendJSON(NewActionConfirmedResponse("message_deleted", true))
|
||||
|
||||
outgoing := NewMessageDeletedResponse(
|
||||
*msg.MessageID,
|
||||
*msg.ConversationID,
|
||||
client.UserID,
|
||||
time.Now(),
|
||||
)
|
||||
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.BroadcastToRoom(*msg.ConversationID, data, nil)
|
||||
|
||||
if h.pubsub != nil {
|
||||
_ = h.pubsub.Publish(ctx, *msg.ConversationID, data)
|
||||
}
|
||||
}
|
||||
152
veza-backend-api/internal/websocket/chat/handler_realtime.go
Normal file
152
veza-backend-api/internal/websocket/chat/handler_realtime.go
Normal file
|
|
@ -0,0 +1,152 @@
|
|||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"veza-backend-api/internal/models"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (h *MessageHandler) HandleTyping(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id is required"))
|
||||
return
|
||||
}
|
||||
|
||||
if !h.rateLimiter.Allow(client.UserID, "typing") {
|
||||
return
|
||||
}
|
||||
|
||||
isTyping := true
|
||||
if msg.IsTyping != nil {
|
||||
isTyping = *msg.IsTyping
|
||||
}
|
||||
|
||||
outgoing := NewUserTypingResponse(*msg.ConversationID, client.UserID, isTyping)
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.BroadcastToRoom(*msg.ConversationID, data, client)
|
||||
|
||||
client.SendJSON(NewActionConfirmedResponse("typing_indicator", true))
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleMarkAsRead(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil || msg.MessageID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id and message_id are required"))
|
||||
return
|
||||
}
|
||||
|
||||
receipt := &models.ReadReceipt{
|
||||
ID: uuid.New(),
|
||||
UserID: client.UserID,
|
||||
MessageID: *msg.MessageID,
|
||||
ReadAt: time.Now(),
|
||||
}
|
||||
|
||||
if err := h.readRepo.MarkRead(ctx, receipt); err != nil {
|
||||
h.logger.Error("Failed to mark as read", zap.Error(err))
|
||||
client.SendJSON(NewErrorResponse("failed to mark as read"))
|
||||
return
|
||||
}
|
||||
|
||||
client.SendJSON(NewActionConfirmedResponse("marked_as_read", true))
|
||||
|
||||
outgoing := NewMessageReadResponse(*msg.MessageID, client.UserID, *msg.ConversationID, receipt.ReadAt)
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.BroadcastToRoom(*msg.ConversationID, data, client)
|
||||
|
||||
if h.pubsub != nil {
|
||||
_ = h.pubsub.Publish(ctx, *msg.ConversationID, data)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleDelivered(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil || msg.MessageID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id and message_id are required"))
|
||||
return
|
||||
}
|
||||
|
||||
status := &models.DeliveredStatus{
|
||||
ID: uuid.New(),
|
||||
UserID: client.UserID,
|
||||
MessageID: *msg.MessageID,
|
||||
DeliveredAt: time.Now(),
|
||||
}
|
||||
|
||||
if err := h.deliveredRepo.MarkDelivered(ctx, status); err != nil {
|
||||
h.logger.Error("Failed to mark as delivered", zap.Error(err))
|
||||
client.SendJSON(NewErrorResponse("failed to mark as delivered"))
|
||||
return
|
||||
}
|
||||
|
||||
client.SendJSON(NewActionConfirmedResponse("marked_as_delivered", true))
|
||||
|
||||
outgoing := NewMessageDeliveredResponse(*msg.MessageID, client.UserID, *msg.ConversationID, status.DeliveredAt)
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.BroadcastToRoom(*msg.ConversationID, data, client)
|
||||
|
||||
if h.pubsub != nil {
|
||||
_ = h.pubsub.Publish(ctx, *msg.ConversationID, data)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleAddReaction(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.MessageID == nil || msg.ConversationID == nil {
|
||||
client.SendJSON(NewErrorResponse("message_id and conversation_id are required"))
|
||||
return
|
||||
}
|
||||
if msg.Emoji == "" {
|
||||
client.SendJSON(NewErrorResponse("emoji is required"))
|
||||
return
|
||||
}
|
||||
|
||||
reaction := &models.MessageReaction{
|
||||
ID: uuid.New(),
|
||||
UserID: client.UserID,
|
||||
MessageID: *msg.MessageID,
|
||||
Emoji: msg.Emoji,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
if err := h.reactionRepo.Add(ctx, reaction); err != nil {
|
||||
h.logger.Error("Failed to add reaction", zap.Error(err))
|
||||
client.SendJSON(NewErrorResponse("failed to add reaction"))
|
||||
return
|
||||
}
|
||||
|
||||
client.SendJSON(NewActionConfirmedResponse("reaction_added", true))
|
||||
|
||||
outgoing := NewReactionAddedResponse(*msg.MessageID, *msg.ConversationID, client.UserID, msg.Emoji)
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.BroadcastToRoom(*msg.ConversationID, data, nil)
|
||||
|
||||
if h.pubsub != nil {
|
||||
_ = h.pubsub.Publish(ctx, *msg.ConversationID, data)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleRemoveReaction(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.MessageID == nil || msg.ConversationID == nil {
|
||||
client.SendJSON(NewErrorResponse("message_id and conversation_id are required"))
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.reactionRepo.Remove(ctx, client.UserID, *msg.MessageID); err != nil {
|
||||
h.logger.Error("Failed to remove reaction", zap.Error(err))
|
||||
client.SendJSON(NewErrorResponse("failed to remove reaction"))
|
||||
return
|
||||
}
|
||||
|
||||
client.SendJSON(NewActionConfirmedResponse("reaction_removed", true))
|
||||
|
||||
outgoing := NewReactionRemovedResponse(*msg.MessageID, *msg.ConversationID, client.UserID)
|
||||
data, _ := json.Marshal(outgoing)
|
||||
h.hub.BroadcastToRoom(*msg.ConversationID, data, nil)
|
||||
|
||||
if h.pubsub != nil {
|
||||
_ = h.pubsub.Publish(ctx, *msg.ConversationID, data)
|
||||
}
|
||||
}
|
||||
42
veza-backend-api/internal/websocket/chat/handler_rooms.go
Normal file
42
veza-backend-api/internal/websocket/chat/handler_rooms.go
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (h *MessageHandler) HandleJoinConversation(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id is required"))
|
||||
return
|
||||
}
|
||||
|
||||
if !h.permissions.CanJoin(ctx, client.UserID, *msg.ConversationID) {
|
||||
client.SendJSON(NewErrorResponse("not allowed to join this conversation"))
|
||||
return
|
||||
}
|
||||
|
||||
h.hub.JoinRoom(client, *msg.ConversationID)
|
||||
|
||||
h.logger.Info("User joined conversation",
|
||||
zap.String("user_id", client.UserID.String()),
|
||||
zap.String("conversation_id", msg.ConversationID.String()))
|
||||
|
||||
client.SendJSON(NewActionConfirmedResponse("joined_conversation", true))
|
||||
}
|
||||
|
||||
func (h *MessageHandler) HandleLeaveConversation(ctx context.Context, client *Client, msg *IncomingMessage) {
|
||||
if msg.ConversationID == nil {
|
||||
client.SendJSON(NewErrorResponse("conversation_id is required"))
|
||||
return
|
||||
}
|
||||
|
||||
h.hub.LeaveRoom(client, *msg.ConversationID)
|
||||
|
||||
h.logger.Info("User left conversation",
|
||||
zap.String("user_id", client.UserID.String()),
|
||||
zap.String("conversation_id", msg.ConversationID.String()))
|
||||
|
||||
client.SendJSON(NewActionConfirmedResponse("left_conversation", true))
|
||||
}
|
||||
68
veza-backend-api/internal/websocket/chat/permissions.go
Normal file
68
veza-backend-api/internal/websocket/chat/permissions.go
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go.uber.org/zap"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type PermissionService struct {
|
||||
db *gorm.DB
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewPermissionService(db *gorm.DB, logger *zap.Logger) *PermissionService {
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
return &PermissionService{
|
||||
db: db,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
type roomMemberCheck struct {
|
||||
Role string `gorm:"column:role"`
|
||||
IsMuted bool `gorm:"column:is_muted"`
|
||||
}
|
||||
|
||||
func (p *PermissionService) getMembership(ctx context.Context, userID, roomID uuid.UUID) (*roomMemberCheck, bool) {
|
||||
var member roomMemberCheck
|
||||
err := p.db.WithContext(ctx).
|
||||
Table("room_members").
|
||||
Select("role, is_muted").
|
||||
Where("user_id = ? AND room_id = ?", userID, roomID).
|
||||
First(&member).Error
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
return &member, true
|
||||
}
|
||||
|
||||
func (p *PermissionService) CanRead(ctx context.Context, userID, roomID uuid.UUID) bool {
|
||||
_, isMember := p.getMembership(ctx, userID, roomID)
|
||||
return isMember
|
||||
}
|
||||
|
||||
func (p *PermissionService) CanSend(ctx context.Context, userID, roomID uuid.UUID) bool {
|
||||
member, isMember := p.getMembership(ctx, userID, roomID)
|
||||
if !isMember {
|
||||
return false
|
||||
}
|
||||
return !member.IsMuted
|
||||
}
|
||||
|
||||
func (p *PermissionService) CanJoin(ctx context.Context, userID, roomID uuid.UUID) bool {
|
||||
_, isMember := p.getMembership(ctx, userID, roomID)
|
||||
return isMember
|
||||
}
|
||||
|
||||
func (p *PermissionService) CanModerate(ctx context.Context, userID, roomID uuid.UUID) bool {
|
||||
member, isMember := p.getMembership(ctx, userID, roomID)
|
||||
if !isMember {
|
||||
return false
|
||||
}
|
||||
return member.Role == "admin" || member.Role == "moderator" || member.Role == "owner"
|
||||
}
|
||||
62
veza-backend-api/internal/websocket/chat/rate_limiter.go
Normal file
62
veza-backend-api/internal/websocket/chat/rate_limiter.go
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
package chat
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type RateLimiter struct {
|
||||
limits map[string]rateConfig
|
||||
entries map[string]*rateBucket
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
type rateConfig struct {
|
||||
maxRequests int
|
||||
window time.Duration
|
||||
}
|
||||
|
||||
type rateBucket struct {
|
||||
count int
|
||||
windowAt time.Time
|
||||
}
|
||||
|
||||
func NewRateLimiter() *RateLimiter {
|
||||
return &RateLimiter{
|
||||
limits: map[string]rateConfig{
|
||||
"send_message": {maxRequests: 10, window: time.Second},
|
||||
"typing": {maxRequests: 5, window: time.Second},
|
||||
"search": {maxRequests: 2, window: time.Second},
|
||||
"fetch_history": {maxRequests: 5, window: time.Second},
|
||||
},
|
||||
entries: make(map[string]*rateBucket),
|
||||
}
|
||||
}
|
||||
|
||||
func (rl *RateLimiter) Allow(userID uuid.UUID, action string) bool {
|
||||
cfg, ok := rl.limits[action]
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
|
||||
key := userID.String() + ":" + action
|
||||
now := time.Now()
|
||||
|
||||
rl.mu.Lock()
|
||||
defer rl.mu.Unlock()
|
||||
|
||||
bucket, exists := rl.entries[key]
|
||||
if !exists || now.Sub(bucket.windowAt) > cfg.window {
|
||||
rl.entries[key] = &rateBucket{count: 1, windowAt: now}
|
||||
return true
|
||||
}
|
||||
|
||||
if bucket.count >= cfg.maxRequests {
|
||||
return false
|
||||
}
|
||||
|
||||
bucket.count++
|
||||
return true
|
||||
}
|
||||
Loading…
Reference in a new issue