From c7fb240dc3a27be3b0c509d05c5cf6b8a9fad68b Mon Sep 17 00:00:00 2001 From: senke Date: Sun, 22 Feb 2026 20:43:44 +0100 Subject: [PATCH] feat(chat): Sprint 3 -- message handlers, real-time features, permissions - Implement full MessageHandler dispatch with all 18 incoming message types - Add handler_messages.go: SendMessage, EditMessage, DeleteMessage with ownership checks - Add handler_rooms.go: JoinConversation, LeaveConversation - Add handler_history.go: FetchHistory (cursor-based), SearchMessages (ILIKE), SyncMessages - Add handler_realtime.go: Typing, MarkAsRead, Delivered, AddReaction, RemoveReaction - Add handler_calls.go: WebRTC signaling relay (CallOffer/Answer/ICE/Hangup/Reject) - Add PermissionService: CanRead/CanSend/CanJoin/CanModerate based on room_members - Add RateLimiter: per-user per-action sliding window (in-memory) - Wire all dependencies in router.go setupChatWebSocket --- veza-backend-api/internal/api/router.go | 14 +- .../internal/websocket/chat/handler.go | 76 ++++++- .../internal/websocket/chat/handler_calls.go | 80 ++++++++ .../websocket/chat/handler_history.go | 189 ++++++++++++++++++ .../websocket/chat/handler_messages.go | 170 ++++++++++++++++ .../websocket/chat/handler_realtime.go | 152 ++++++++++++++ .../internal/websocket/chat/handler_rooms.go | 42 ++++ .../internal/websocket/chat/permissions.go | 68 +++++++ .../internal/websocket/chat/rate_limiter.go | 62 ++++++ 9 files changed, 849 insertions(+), 4 deletions(-) create mode 100644 veza-backend-api/internal/websocket/chat/handler_calls.go create mode 100644 veza-backend-api/internal/websocket/chat/handler_history.go create mode 100644 veza-backend-api/internal/websocket/chat/handler_messages.go create mode 100644 veza-backend-api/internal/websocket/chat/handler_realtime.go create mode 100644 veza-backend-api/internal/websocket/chat/handler_rooms.go create mode 100644 veza-backend-api/internal/websocket/chat/permissions.go create mode 100644 veza-backend-api/internal/websocket/chat/rate_limiter.go diff --git a/veza-backend-api/internal/api/router.go b/veza-backend-api/internal/api/router.go index 7c80d2467..3fc0fe815 100644 --- a/veza-backend-api/internal/api/router.go +++ b/veza-backend-api/internal/api/router.go @@ -327,7 +327,19 @@ func (r *APIRouter) setupChatWebSocket(router *gin.RouterGroup) { hub := chatws.NewHub(r.logger) go hub.Run() - msgHandler := chatws.NewMessageHandler(r.logger) + msgRepo := repositories.NewChatMessageRepository(r.db.GormDB) + readRepo := repositories.NewReadReceiptRepository(r.db.GormDB) + deliveredRepo := repositories.NewDeliveredStatusRepository(r.db.GormDB) + reactionRepo := repositories.NewReactionRepository(r.db.GormDB) + + pubsub := services.NewChatPubSubService(r.config.RedisClient, r.logger) + permissions := chatws.NewPermissionService(r.db.GormDB, r.logger) + rateLimiter := chatws.NewRateLimiter() + + msgHandler := chatws.NewMessageHandler( + hub, msgRepo, readRepo, deliveredRepo, reactionRepo, + pubsub, permissions, rateLimiter, r.logger, + ) wsHandler := handlers.NewChatWebSocketHandler(chatService, hub, msgHandler, r.logger) diff --git a/veza-backend-api/internal/websocket/chat/handler.go b/veza-backend-api/internal/websocket/chat/handler.go index 4d2ff6163..c203db7e1 100644 --- a/veza-backend-api/internal/websocket/chat/handler.go +++ b/veza-backend-api/internal/websocket/chat/handler.go @@ -3,19 +3,48 @@ package chat import ( "context" + "veza-backend-api/internal/repositories" + "veza-backend-api/internal/services" + "go.uber.org/zap" ) type MessageHandler struct { - logger *zap.Logger + msgRepo *repositories.ChatMessageRepository + readRepo *repositories.ReadReceiptRepository + deliveredRepo *repositories.DeliveredStatusRepository + reactionRepo *repositories.ReactionRepository + pubsub *services.ChatPubSubService + permissions *PermissionService + rateLimiter *RateLimiter + hub *Hub + logger *zap.Logger } -func NewMessageHandler(logger *zap.Logger) *MessageHandler { +func NewMessageHandler( + hub *Hub, + msgRepo *repositories.ChatMessageRepository, + readRepo *repositories.ReadReceiptRepository, + deliveredRepo *repositories.DeliveredStatusRepository, + reactionRepo *repositories.ReactionRepository, + pubsub *services.ChatPubSubService, + permissions *PermissionService, + rateLimiter *RateLimiter, + logger *zap.Logger, +) *MessageHandler { if logger == nil { logger = zap.NewNop() } return &MessageHandler{ - logger: logger, + hub: hub, + msgRepo: msgRepo, + readRepo: readRepo, + deliveredRepo: deliveredRepo, + reactionRepo: reactionRepo, + pubsub: pubsub, + permissions: permissions, + rateLimiter: rateLimiter, + logger: logger, } } @@ -24,6 +53,47 @@ func (h *MessageHandler) Dispatch(ctx context.Context, client *Client, msg *Inco case TypePing: client.SendJSON(NewPongResponse()) + case TypeSendMessage: + h.HandleSendMessage(ctx, client, msg) + case TypeEditMessage: + h.HandleEditMessage(ctx, client, msg) + case TypeDeleteMessage: + h.HandleDeleteMessage(ctx, client, msg) + + case TypeJoinConversation: + h.HandleJoinConversation(ctx, client, msg) + case TypeLeaveConversation: + h.HandleLeaveConversation(ctx, client, msg) + + case TypeFetchHistory: + h.HandleFetchHistory(ctx, client, msg) + case TypeSearchMessages: + h.HandleSearchMessages(ctx, client, msg) + case TypeSyncMessages: + h.HandleSyncMessages(ctx, client, msg) + + case TypeTyping: + h.HandleTyping(ctx, client, msg) + case TypeMarkAsRead: + h.HandleMarkAsRead(ctx, client, msg) + case TypeDelivered: + h.HandleDelivered(ctx, client, msg) + case TypeAddReaction: + h.HandleAddReaction(ctx, client, msg) + case TypeRemoveReaction: + h.HandleRemoveReaction(ctx, client, msg) + + case TypeCallOffer: + h.HandleCallOffer(ctx, client, msg) + case TypeCallAnswer: + h.HandleCallAnswer(ctx, client, msg) + case TypeICECandidate: + h.HandleICECandidate(ctx, client, msg) + case TypeCallHangup: + h.HandleCallHangup(ctx, client, msg) + case TypeCallReject: + h.HandleCallReject(ctx, client, msg) + default: h.logger.Warn("Unknown message type", zap.String("type", msg.Type), diff --git a/veza-backend-api/internal/websocket/chat/handler_calls.go b/veza-backend-api/internal/websocket/chat/handler_calls.go new file mode 100644 index 000000000..d8b962f20 --- /dev/null +++ b/veza-backend-api/internal/websocket/chat/handler_calls.go @@ -0,0 +1,80 @@ +package chat + +import ( + "context" + "encoding/json" + + "go.uber.org/zap" +) + +func (h *MessageHandler) HandleCallOffer(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil || msg.TargetUserID == nil { + client.SendJSON(NewErrorResponse("conversation_id and target_user_id are required")) + return + } + if msg.SDP == "" { + client.SendJSON(NewErrorResponse("sdp is required")) + return + } + + outgoing := NewCallOfferResponse(*msg.ConversationID, client.UserID, msg.SDP, msg.CallType) + data, _ := json.Marshal(outgoing) + h.hub.SendToUser(*msg.TargetUserID, data) + + h.logger.Info("Call offer relayed", + zap.String("from", client.UserID.String()), + zap.String("to", msg.TargetUserID.String()), + zap.String("type", msg.CallType)) +} + +func (h *MessageHandler) HandleCallAnswer(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil || msg.CallerUserID == nil { + client.SendJSON(NewErrorResponse("conversation_id and caller_user_id are required")) + return + } + if msg.SDP == "" { + client.SendJSON(NewErrorResponse("sdp is required")) + return + } + + outgoing := NewCallAnswerResponse(*msg.ConversationID, *msg.CallerUserID, client.UserID, msg.SDP) + data, _ := json.Marshal(outgoing) + h.hub.SendToUser(*msg.CallerUserID, data) +} + +func (h *MessageHandler) HandleICECandidate(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil || msg.TargetUserID == nil { + client.SendJSON(NewErrorResponse("conversation_id and target_user_id are required")) + return + } + if msg.Candidate == "" { + client.SendJSON(NewErrorResponse("candidate is required")) + return + } + + outgoing := NewICECandidateResponse(*msg.ConversationID, client.UserID, msg.Candidate) + data, _ := json.Marshal(outgoing) + h.hub.SendToUser(*msg.TargetUserID, data) +} + +func (h *MessageHandler) HandleCallHangup(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil || msg.TargetUserID == nil { + client.SendJSON(NewErrorResponse("conversation_id and target_user_id are required")) + return + } + + outgoing := NewCallHangupResponse(*msg.ConversationID, client.UserID) + data, _ := json.Marshal(outgoing) + h.hub.SendToUser(*msg.TargetUserID, data) +} + +func (h *MessageHandler) HandleCallReject(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil || msg.CallerUserID == nil { + client.SendJSON(NewErrorResponse("conversation_id and caller_user_id are required")) + return + } + + outgoing := NewCallRejectedResponse(*msg.ConversationID, client.UserID) + data, _ := json.Marshal(outgoing) + h.hub.SendToUser(*msg.CallerUserID, data) +} diff --git a/veza-backend-api/internal/websocket/chat/handler_history.go b/veza-backend-api/internal/websocket/chat/handler_history.go new file mode 100644 index 000000000..4f99a0b25 --- /dev/null +++ b/veza-backend-api/internal/websocket/chat/handler_history.go @@ -0,0 +1,189 @@ +package chat + +import ( + "context" + "time" + + "go.uber.org/zap" +) + +const ( + defaultHistoryLimit = 50 + maxHistoryLimit = 100 +) + +func (h *MessageHandler) HandleFetchHistory(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil { + client.SendJSON(NewErrorResponse("conversation_id is required")) + return + } + + if !h.rateLimiter.Allow(client.UserID, "fetch_history") { + client.SendJSON(NewErrorResponse("rate limit exceeded")) + return + } + + if !h.permissions.CanRead(ctx, client.UserID, *msg.ConversationID) { + client.SendJSON(NewErrorResponse("not allowed to read this conversation")) + return + } + + limit := defaultHistoryLimit + if msg.Limit != nil && *msg.Limit > 0 { + limit = *msg.Limit + if limit > maxHistoryLimit { + limit = maxHistoryLimit + } + } + + messages, err := h.msgRepo.GetConversationMessages(ctx, *msg.ConversationID, limit+1, 0) + if err != nil { + h.logger.Error("Failed to fetch history", zap.Error(err)) + client.SendJSON(NewErrorResponse("failed to fetch history")) + return + } + + hasMoreBefore := len(messages) > limit + if hasMoreBefore { + messages = messages[:limit] + } + + dtos := make([]MessageDTO, 0, len(messages)) + for _, m := range messages { + dtos = append(dtos, MessageDTO{ + ID: m.ID, + ConversationID: m.ConversationID, + SenderID: m.SenderID, + Content: m.Content, + MessageType: m.MessageType, + ParentMessageID: m.ParentMessageID, + ReplyToID: m.ReplyToID, + IsPinned: m.IsPinned, + IsEdited: m.IsEdited, + IsDeleted: m.IsDeleted, + EditedAt: m.EditedAt, + Status: m.Status, + Metadata: m.Metadata, + CreatedAt: m.CreatedAt, + UpdatedAt: m.UpdatedAt, + }) + } + + client.SendJSON(NewHistoryChunkResponse(*msg.ConversationID, dtos, hasMoreBefore, false)) +} + +func (h *MessageHandler) HandleSearchMessages(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil { + client.SendJSON(NewErrorResponse("conversation_id is required")) + return + } + if msg.Query == "" { + client.SendJSON(NewErrorResponse("query is required")) + return + } + + if !h.rateLimiter.Allow(client.UserID, "search") { + client.SendJSON(NewErrorResponse("rate limit exceeded")) + return + } + + if !h.permissions.CanRead(ctx, client.UserID, *msg.ConversationID) { + client.SendJSON(NewErrorResponse("not allowed to read this conversation")) + return + } + + limit := defaultHistoryLimit + if msg.Limit != nil && *msg.Limit > 0 { + limit = *msg.Limit + if limit > maxHistoryLimit { + limit = maxHistoryLimit + } + } + + offset := 0 + if msg.Offset != nil && *msg.Offset > 0 { + offset = *msg.Offset + } + + messages, total, err := h.msgRepo.Search(ctx, *msg.ConversationID, msg.Query, limit, offset) + if err != nil { + h.logger.Error("Failed to search messages", zap.Error(err)) + client.SendJSON(NewErrorResponse("failed to search messages")) + return + } + + dtos := make([]MessageDTO, 0, len(messages)) + for _, m := range messages { + dtos = append(dtos, MessageDTO{ + ID: m.ID, + ConversationID: m.ConversationID, + SenderID: m.SenderID, + Content: m.Content, + MessageType: m.MessageType, + ParentMessageID: m.ParentMessageID, + ReplyToID: m.ReplyToID, + IsPinned: m.IsPinned, + IsEdited: m.IsEdited, + IsDeleted: m.IsDeleted, + EditedAt: m.EditedAt, + Status: m.Status, + Metadata: m.Metadata, + CreatedAt: m.CreatedAt, + UpdatedAt: m.UpdatedAt, + }) + } + + client.SendJSON(NewSearchResultsResponse(*msg.ConversationID, dtos, msg.Query, total)) +} + +func (h *MessageHandler) HandleSyncMessages(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil { + client.SendJSON(NewErrorResponse("conversation_id is required")) + return + } + if msg.Since == nil { + client.SendJSON(NewErrorResponse("since is required")) + return + } + + if !h.permissions.CanRead(ctx, client.UserID, *msg.ConversationID) { + client.SendJSON(NewErrorResponse("not allowed to read this conversation")) + return + } + + since, err := time.Parse(time.RFC3339, *msg.Since) + if err != nil { + client.SendJSON(NewErrorResponse("invalid since format, expected RFC3339")) + return + } + + messages, err := h.msgRepo.GetMessagesSince(ctx, *msg.ConversationID, since, maxHistoryLimit) + if err != nil { + h.logger.Error("Failed to sync messages", zap.Error(err)) + client.SendJSON(NewErrorResponse("failed to sync messages")) + return + } + + dtos := make([]MessageDTO, 0, len(messages)) + for _, m := range messages { + dtos = append(dtos, MessageDTO{ + ID: m.ID, + ConversationID: m.ConversationID, + SenderID: m.SenderID, + Content: m.Content, + MessageType: m.MessageType, + ParentMessageID: m.ParentMessageID, + ReplyToID: m.ReplyToID, + IsPinned: m.IsPinned, + IsEdited: m.IsEdited, + IsDeleted: m.IsDeleted, + EditedAt: m.EditedAt, + Status: m.Status, + Metadata: m.Metadata, + CreatedAt: m.CreatedAt, + UpdatedAt: m.UpdatedAt, + }) + } + + client.SendJSON(NewSyncChunkResponse(*msg.ConversationID, dtos, time.Now())) +} diff --git a/veza-backend-api/internal/websocket/chat/handler_messages.go b/veza-backend-api/internal/websocket/chat/handler_messages.go new file mode 100644 index 000000000..7547c203c --- /dev/null +++ b/veza-backend-api/internal/websocket/chat/handler_messages.go @@ -0,0 +1,170 @@ +package chat + +import ( + "context" + "encoding/json" + "time" + + "veza-backend-api/internal/models" + + "github.com/google/uuid" + "go.uber.org/zap" +) + +func (h *MessageHandler) HandleSendMessage(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil { + client.SendJSON(NewErrorResponse("conversation_id is required")) + return + } + if msg.Content == "" { + client.SendJSON(NewErrorResponse("content is required")) + return + } + + if !h.rateLimiter.Allow(client.UserID, "send_message") { + client.SendJSON(NewErrorResponse("rate limit exceeded")) + return + } + + if !h.permissions.CanSend(ctx, client.UserID, *msg.ConversationID) { + client.SendJSON(NewErrorResponse("not allowed to send messages in this conversation")) + return + } + + var metadata []byte + if len(msg.Attachments) > 0 { + metadata, _ = json.Marshal(map[string]interface{}{ + "attachments": msg.Attachments, + }) + } + + chatMsg := &models.ChatMessage{ + ID: uuid.New(), + ConversationID: *msg.ConversationID, + SenderID: client.UserID, + Content: msg.Content, + MessageType: "text", + ParentMessageID: msg.ParentMessageID, + Status: "sent", + Metadata: metadata, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + if err := h.msgRepo.Create(ctx, chatMsg); err != nil { + h.logger.Error("Failed to save message", + zap.Error(err), + zap.String("user_id", client.UserID.String())) + client.SendJSON(NewErrorResponse("failed to send message")) + return + } + + client.SendJSON(NewActionConfirmedResponse("message_sent", true)) + + outgoing := NewNewMessageResponse( + chatMsg.ConversationID, + chatMsg.ID, + chatMsg.SenderID, + chatMsg.Content, + chatMsg.CreatedAt, + msg.Attachments, + ) + + data, _ := json.Marshal(outgoing) + h.hub.BroadcastToRoom(*msg.ConversationID, data, nil) + + if h.pubsub != nil { + _ = h.pubsub.Publish(ctx, *msg.ConversationID, data) + } +} + +func (h *MessageHandler) HandleEditMessage(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.MessageID == nil || msg.ConversationID == nil { + client.SendJSON(NewErrorResponse("message_id and conversation_id are required")) + return + } + if msg.NewContent == "" { + client.SendJSON(NewErrorResponse("new_content is required")) + return + } + + chatMsg, err := h.msgRepo.GetByID(ctx, *msg.MessageID) + if err != nil { + client.SendJSON(NewErrorResponse("message not found")) + return + } + + if chatMsg.SenderID != client.UserID { + client.SendJSON(NewErrorResponse("can only edit your own messages")) + return + } + + now := time.Now() + chatMsg.Content = msg.NewContent + chatMsg.IsEdited = true + chatMsg.EditedAt = &now + + if err := h.msgRepo.Update(ctx, chatMsg); err != nil { + h.logger.Error("Failed to update message", zap.Error(err)) + client.SendJSON(NewErrorResponse("failed to edit message")) + return + } + + client.SendJSON(NewActionConfirmedResponse("message_edited", true)) + + outgoing := NewMessageEditedResponse( + chatMsg.ID, + chatMsg.ConversationID, + client.UserID, + now, + msg.NewContent, + ) + + data, _ := json.Marshal(outgoing) + h.hub.BroadcastToRoom(*msg.ConversationID, data, nil) + + if h.pubsub != nil { + _ = h.pubsub.Publish(ctx, *msg.ConversationID, data) + } +} + +func (h *MessageHandler) HandleDeleteMessage(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.MessageID == nil || msg.ConversationID == nil { + client.SendJSON(NewErrorResponse("message_id and conversation_id are required")) + return + } + + chatMsg, err := h.msgRepo.GetByID(ctx, *msg.MessageID) + if err != nil { + client.SendJSON(NewErrorResponse("message not found")) + return + } + + isModerator := h.permissions.CanModerate(ctx, client.UserID, *msg.ConversationID) + if chatMsg.SenderID != client.UserID && !isModerator { + client.SendJSON(NewErrorResponse("can only delete your own messages")) + return + } + + if err := h.msgRepo.SoftDelete(ctx, *msg.MessageID); err != nil { + h.logger.Error("Failed to delete message", zap.Error(err)) + client.SendJSON(NewErrorResponse("failed to delete message")) + return + } + + client.SendJSON(NewActionConfirmedResponse("message_deleted", true)) + + outgoing := NewMessageDeletedResponse( + *msg.MessageID, + *msg.ConversationID, + client.UserID, + time.Now(), + ) + + data, _ := json.Marshal(outgoing) + h.hub.BroadcastToRoom(*msg.ConversationID, data, nil) + + if h.pubsub != nil { + _ = h.pubsub.Publish(ctx, *msg.ConversationID, data) + } +} diff --git a/veza-backend-api/internal/websocket/chat/handler_realtime.go b/veza-backend-api/internal/websocket/chat/handler_realtime.go new file mode 100644 index 000000000..4aa6bd15c --- /dev/null +++ b/veza-backend-api/internal/websocket/chat/handler_realtime.go @@ -0,0 +1,152 @@ +package chat + +import ( + "context" + "encoding/json" + "time" + + "veza-backend-api/internal/models" + + "github.com/google/uuid" + "go.uber.org/zap" +) + +func (h *MessageHandler) HandleTyping(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil { + client.SendJSON(NewErrorResponse("conversation_id is required")) + return + } + + if !h.rateLimiter.Allow(client.UserID, "typing") { + return + } + + isTyping := true + if msg.IsTyping != nil { + isTyping = *msg.IsTyping + } + + outgoing := NewUserTypingResponse(*msg.ConversationID, client.UserID, isTyping) + data, _ := json.Marshal(outgoing) + h.hub.BroadcastToRoom(*msg.ConversationID, data, client) + + client.SendJSON(NewActionConfirmedResponse("typing_indicator", true)) +} + +func (h *MessageHandler) HandleMarkAsRead(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil || msg.MessageID == nil { + client.SendJSON(NewErrorResponse("conversation_id and message_id are required")) + return + } + + receipt := &models.ReadReceipt{ + ID: uuid.New(), + UserID: client.UserID, + MessageID: *msg.MessageID, + ReadAt: time.Now(), + } + + if err := h.readRepo.MarkRead(ctx, receipt); err != nil { + h.logger.Error("Failed to mark as read", zap.Error(err)) + client.SendJSON(NewErrorResponse("failed to mark as read")) + return + } + + client.SendJSON(NewActionConfirmedResponse("marked_as_read", true)) + + outgoing := NewMessageReadResponse(*msg.MessageID, client.UserID, *msg.ConversationID, receipt.ReadAt) + data, _ := json.Marshal(outgoing) + h.hub.BroadcastToRoom(*msg.ConversationID, data, client) + + if h.pubsub != nil { + _ = h.pubsub.Publish(ctx, *msg.ConversationID, data) + } +} + +func (h *MessageHandler) HandleDelivered(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil || msg.MessageID == nil { + client.SendJSON(NewErrorResponse("conversation_id and message_id are required")) + return + } + + status := &models.DeliveredStatus{ + ID: uuid.New(), + UserID: client.UserID, + MessageID: *msg.MessageID, + DeliveredAt: time.Now(), + } + + if err := h.deliveredRepo.MarkDelivered(ctx, status); err != nil { + h.logger.Error("Failed to mark as delivered", zap.Error(err)) + client.SendJSON(NewErrorResponse("failed to mark as delivered")) + return + } + + client.SendJSON(NewActionConfirmedResponse("marked_as_delivered", true)) + + outgoing := NewMessageDeliveredResponse(*msg.MessageID, client.UserID, *msg.ConversationID, status.DeliveredAt) + data, _ := json.Marshal(outgoing) + h.hub.BroadcastToRoom(*msg.ConversationID, data, client) + + if h.pubsub != nil { + _ = h.pubsub.Publish(ctx, *msg.ConversationID, data) + } +} + +func (h *MessageHandler) HandleAddReaction(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.MessageID == nil || msg.ConversationID == nil { + client.SendJSON(NewErrorResponse("message_id and conversation_id are required")) + return + } + if msg.Emoji == "" { + client.SendJSON(NewErrorResponse("emoji is required")) + return + } + + reaction := &models.MessageReaction{ + ID: uuid.New(), + UserID: client.UserID, + MessageID: *msg.MessageID, + Emoji: msg.Emoji, + CreatedAt: time.Now(), + } + + if err := h.reactionRepo.Add(ctx, reaction); err != nil { + h.logger.Error("Failed to add reaction", zap.Error(err)) + client.SendJSON(NewErrorResponse("failed to add reaction")) + return + } + + client.SendJSON(NewActionConfirmedResponse("reaction_added", true)) + + outgoing := NewReactionAddedResponse(*msg.MessageID, *msg.ConversationID, client.UserID, msg.Emoji) + data, _ := json.Marshal(outgoing) + h.hub.BroadcastToRoom(*msg.ConversationID, data, nil) + + if h.pubsub != nil { + _ = h.pubsub.Publish(ctx, *msg.ConversationID, data) + } +} + +func (h *MessageHandler) HandleRemoveReaction(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.MessageID == nil || msg.ConversationID == nil { + client.SendJSON(NewErrorResponse("message_id and conversation_id are required")) + return + } + + if err := h.reactionRepo.Remove(ctx, client.UserID, *msg.MessageID); err != nil { + h.logger.Error("Failed to remove reaction", zap.Error(err)) + client.SendJSON(NewErrorResponse("failed to remove reaction")) + return + } + + client.SendJSON(NewActionConfirmedResponse("reaction_removed", true)) + + outgoing := NewReactionRemovedResponse(*msg.MessageID, *msg.ConversationID, client.UserID) + data, _ := json.Marshal(outgoing) + h.hub.BroadcastToRoom(*msg.ConversationID, data, nil) + + if h.pubsub != nil { + _ = h.pubsub.Publish(ctx, *msg.ConversationID, data) + } +} diff --git a/veza-backend-api/internal/websocket/chat/handler_rooms.go b/veza-backend-api/internal/websocket/chat/handler_rooms.go new file mode 100644 index 000000000..5d770e675 --- /dev/null +++ b/veza-backend-api/internal/websocket/chat/handler_rooms.go @@ -0,0 +1,42 @@ +package chat + +import ( + "context" + + "go.uber.org/zap" +) + +func (h *MessageHandler) HandleJoinConversation(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil { + client.SendJSON(NewErrorResponse("conversation_id is required")) + return + } + + if !h.permissions.CanJoin(ctx, client.UserID, *msg.ConversationID) { + client.SendJSON(NewErrorResponse("not allowed to join this conversation")) + return + } + + h.hub.JoinRoom(client, *msg.ConversationID) + + h.logger.Info("User joined conversation", + zap.String("user_id", client.UserID.String()), + zap.String("conversation_id", msg.ConversationID.String())) + + client.SendJSON(NewActionConfirmedResponse("joined_conversation", true)) +} + +func (h *MessageHandler) HandleLeaveConversation(ctx context.Context, client *Client, msg *IncomingMessage) { + if msg.ConversationID == nil { + client.SendJSON(NewErrorResponse("conversation_id is required")) + return + } + + h.hub.LeaveRoom(client, *msg.ConversationID) + + h.logger.Info("User left conversation", + zap.String("user_id", client.UserID.String()), + zap.String("conversation_id", msg.ConversationID.String())) + + client.SendJSON(NewActionConfirmedResponse("left_conversation", true)) +} diff --git a/veza-backend-api/internal/websocket/chat/permissions.go b/veza-backend-api/internal/websocket/chat/permissions.go new file mode 100644 index 000000000..41373a6e8 --- /dev/null +++ b/veza-backend-api/internal/websocket/chat/permissions.go @@ -0,0 +1,68 @@ +package chat + +import ( + "context" + + "github.com/google/uuid" + "go.uber.org/zap" + "gorm.io/gorm" +) + +type PermissionService struct { + db *gorm.DB + logger *zap.Logger +} + +func NewPermissionService(db *gorm.DB, logger *zap.Logger) *PermissionService { + if logger == nil { + logger = zap.NewNop() + } + return &PermissionService{ + db: db, + logger: logger, + } +} + +type roomMemberCheck struct { + Role string `gorm:"column:role"` + IsMuted bool `gorm:"column:is_muted"` +} + +func (p *PermissionService) getMembership(ctx context.Context, userID, roomID uuid.UUID) (*roomMemberCheck, bool) { + var member roomMemberCheck + err := p.db.WithContext(ctx). + Table("room_members"). + Select("role, is_muted"). + Where("user_id = ? AND room_id = ?", userID, roomID). + First(&member).Error + if err != nil { + return nil, false + } + return &member, true +} + +func (p *PermissionService) CanRead(ctx context.Context, userID, roomID uuid.UUID) bool { + _, isMember := p.getMembership(ctx, userID, roomID) + return isMember +} + +func (p *PermissionService) CanSend(ctx context.Context, userID, roomID uuid.UUID) bool { + member, isMember := p.getMembership(ctx, userID, roomID) + if !isMember { + return false + } + return !member.IsMuted +} + +func (p *PermissionService) CanJoin(ctx context.Context, userID, roomID uuid.UUID) bool { + _, isMember := p.getMembership(ctx, userID, roomID) + return isMember +} + +func (p *PermissionService) CanModerate(ctx context.Context, userID, roomID uuid.UUID) bool { + member, isMember := p.getMembership(ctx, userID, roomID) + if !isMember { + return false + } + return member.Role == "admin" || member.Role == "moderator" || member.Role == "owner" +} diff --git a/veza-backend-api/internal/websocket/chat/rate_limiter.go b/veza-backend-api/internal/websocket/chat/rate_limiter.go new file mode 100644 index 000000000..ae30a5a51 --- /dev/null +++ b/veza-backend-api/internal/websocket/chat/rate_limiter.go @@ -0,0 +1,62 @@ +package chat + +import ( + "sync" + "time" + + "github.com/google/uuid" +) + +type RateLimiter struct { + limits map[string]rateConfig + entries map[string]*rateBucket + mu sync.Mutex +} + +type rateConfig struct { + maxRequests int + window time.Duration +} + +type rateBucket struct { + count int + windowAt time.Time +} + +func NewRateLimiter() *RateLimiter { + return &RateLimiter{ + limits: map[string]rateConfig{ + "send_message": {maxRequests: 10, window: time.Second}, + "typing": {maxRequests: 5, window: time.Second}, + "search": {maxRequests: 2, window: time.Second}, + "fetch_history": {maxRequests: 5, window: time.Second}, + }, + entries: make(map[string]*rateBucket), + } +} + +func (rl *RateLimiter) Allow(userID uuid.UUID, action string) bool { + cfg, ok := rl.limits[action] + if !ok { + return true + } + + key := userID.String() + ":" + action + now := time.Now() + + rl.mu.Lock() + defer rl.mu.Unlock() + + bucket, exists := rl.entries[key] + if !exists || now.Sub(bucket.windowAt) > cfg.window { + rl.entries[key] = &rateBucket{count: 1, windowAt: now} + return true + } + + if bucket.count >= cfg.maxRequests { + return false + } + + bucket.count++ + return true +}