- Implement full MessageHandler dispatch with all 18 incoming message types
- Add handler_messages.go: SendMessage, EditMessage, DeleteMessage with ownership checks
- Add handler_rooms.go: JoinConversation, LeaveConversation
- Add handler_history.go: FetchHistory (cursor-based), SearchMessages (ILIKE), SyncMessages
- Add handler_realtime.go: Typing, MarkAsRead, Delivered, AddReaction, RemoveReaction
- Add handler_calls.go: WebRTC signaling relay (CallOffer/Answer/ICE/Hangup/Reject)
- Add PermissionService: CanRead/CanSend/CanJoin/CanModerate based on room_members
- Add RateLimiter: per-user per-action sliding window (in-memory)
- Wire all dependencies in router.go setupChatWebSocket
189 lines
5.1 KiB
Go
189 lines
5.1 KiB
Go
package chat
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Page-size bounds shared by the history, search and sync handlers.
const (
	// defaultHistoryLimit is the page size used when a request carries no
	// limit, or a limit that is not positive.
	defaultHistoryLimit = 50

	// maxHistoryLimit is the largest page size a request may ask for; larger
	// values are clamped down to it. HandleSyncMessages also passes it to the
	// repository as its fixed limit.
	maxHistoryLimit = 100
)
|
|
|
|
func (h *MessageHandler) HandleFetchHistory(ctx context.Context, client *Client, msg *IncomingMessage) {
|
|
if msg.ConversationID == nil {
|
|
client.SendJSON(NewErrorResponse("conversation_id is required"))
|
|
return
|
|
}
|
|
|
|
if !h.rateLimiter.Allow(client.UserID, "fetch_history") {
|
|
client.SendJSON(NewErrorResponse("rate limit exceeded"))
|
|
return
|
|
}
|
|
|
|
if !h.permissions.CanRead(ctx, client.UserID, *msg.ConversationID) {
|
|
client.SendJSON(NewErrorResponse("not allowed to read this conversation"))
|
|
return
|
|
}
|
|
|
|
limit := defaultHistoryLimit
|
|
if msg.Limit != nil && *msg.Limit > 0 {
|
|
limit = *msg.Limit
|
|
if limit > maxHistoryLimit {
|
|
limit = maxHistoryLimit
|
|
}
|
|
}
|
|
|
|
messages, err := h.msgRepo.GetConversationMessages(ctx, *msg.ConversationID, limit+1, 0)
|
|
if err != nil {
|
|
h.logger.Error("Failed to fetch history", zap.Error(err))
|
|
client.SendJSON(NewErrorResponse("failed to fetch history"))
|
|
return
|
|
}
|
|
|
|
hasMoreBefore := len(messages) > limit
|
|
if hasMoreBefore {
|
|
messages = messages[:limit]
|
|
}
|
|
|
|
dtos := make([]MessageDTO, 0, len(messages))
|
|
for _, m := range messages {
|
|
dtos = append(dtos, MessageDTO{
|
|
ID: m.ID,
|
|
ConversationID: m.ConversationID,
|
|
SenderID: m.SenderID,
|
|
Content: m.Content,
|
|
MessageType: m.MessageType,
|
|
ParentMessageID: m.ParentMessageID,
|
|
ReplyToID: m.ReplyToID,
|
|
IsPinned: m.IsPinned,
|
|
IsEdited: m.IsEdited,
|
|
IsDeleted: m.IsDeleted,
|
|
EditedAt: m.EditedAt,
|
|
Status: m.Status,
|
|
Metadata: m.Metadata,
|
|
CreatedAt: m.CreatedAt,
|
|
UpdatedAt: m.UpdatedAt,
|
|
})
|
|
}
|
|
|
|
client.SendJSON(NewHistoryChunkResponse(*msg.ConversationID, dtos, hasMoreBefore, false))
|
|
}
|
|
|
|
func (h *MessageHandler) HandleSearchMessages(ctx context.Context, client *Client, msg *IncomingMessage) {
|
|
if msg.ConversationID == nil {
|
|
client.SendJSON(NewErrorResponse("conversation_id is required"))
|
|
return
|
|
}
|
|
if msg.Query == "" {
|
|
client.SendJSON(NewErrorResponse("query is required"))
|
|
return
|
|
}
|
|
|
|
if !h.rateLimiter.Allow(client.UserID, "search") {
|
|
client.SendJSON(NewErrorResponse("rate limit exceeded"))
|
|
return
|
|
}
|
|
|
|
if !h.permissions.CanRead(ctx, client.UserID, *msg.ConversationID) {
|
|
client.SendJSON(NewErrorResponse("not allowed to read this conversation"))
|
|
return
|
|
}
|
|
|
|
limit := defaultHistoryLimit
|
|
if msg.Limit != nil && *msg.Limit > 0 {
|
|
limit = *msg.Limit
|
|
if limit > maxHistoryLimit {
|
|
limit = maxHistoryLimit
|
|
}
|
|
}
|
|
|
|
offset := 0
|
|
if msg.Offset != nil && *msg.Offset > 0 {
|
|
offset = *msg.Offset
|
|
}
|
|
|
|
messages, total, err := h.msgRepo.Search(ctx, *msg.ConversationID, msg.Query, limit, offset)
|
|
if err != nil {
|
|
h.logger.Error("Failed to search messages", zap.Error(err))
|
|
client.SendJSON(NewErrorResponse("failed to search messages"))
|
|
return
|
|
}
|
|
|
|
dtos := make([]MessageDTO, 0, len(messages))
|
|
for _, m := range messages {
|
|
dtos = append(dtos, MessageDTO{
|
|
ID: m.ID,
|
|
ConversationID: m.ConversationID,
|
|
SenderID: m.SenderID,
|
|
Content: m.Content,
|
|
MessageType: m.MessageType,
|
|
ParentMessageID: m.ParentMessageID,
|
|
ReplyToID: m.ReplyToID,
|
|
IsPinned: m.IsPinned,
|
|
IsEdited: m.IsEdited,
|
|
IsDeleted: m.IsDeleted,
|
|
EditedAt: m.EditedAt,
|
|
Status: m.Status,
|
|
Metadata: m.Metadata,
|
|
CreatedAt: m.CreatedAt,
|
|
UpdatedAt: m.UpdatedAt,
|
|
})
|
|
}
|
|
|
|
client.SendJSON(NewSearchResultsResponse(*msg.ConversationID, dtos, msg.Query, total))
|
|
}
|
|
|
|
func (h *MessageHandler) HandleSyncMessages(ctx context.Context, client *Client, msg *IncomingMessage) {
|
|
if msg.ConversationID == nil {
|
|
client.SendJSON(NewErrorResponse("conversation_id is required"))
|
|
return
|
|
}
|
|
if msg.Since == nil {
|
|
client.SendJSON(NewErrorResponse("since is required"))
|
|
return
|
|
}
|
|
|
|
if !h.permissions.CanRead(ctx, client.UserID, *msg.ConversationID) {
|
|
client.SendJSON(NewErrorResponse("not allowed to read this conversation"))
|
|
return
|
|
}
|
|
|
|
since, err := time.Parse(time.RFC3339, *msg.Since)
|
|
if err != nil {
|
|
client.SendJSON(NewErrorResponse("invalid since format, expected RFC3339"))
|
|
return
|
|
}
|
|
|
|
messages, err := h.msgRepo.GetMessagesSince(ctx, *msg.ConversationID, since, maxHistoryLimit)
|
|
if err != nil {
|
|
h.logger.Error("Failed to sync messages", zap.Error(err))
|
|
client.SendJSON(NewErrorResponse("failed to sync messages"))
|
|
return
|
|
}
|
|
|
|
dtos := make([]MessageDTO, 0, len(messages))
|
|
for _, m := range messages {
|
|
dtos = append(dtos, MessageDTO{
|
|
ID: m.ID,
|
|
ConversationID: m.ConversationID,
|
|
SenderID: m.SenderID,
|
|
Content: m.Content,
|
|
MessageType: m.MessageType,
|
|
ParentMessageID: m.ParentMessageID,
|
|
ReplyToID: m.ReplyToID,
|
|
IsPinned: m.IsPinned,
|
|
IsEdited: m.IsEdited,
|
|
IsDeleted: m.IsDeleted,
|
|
EditedAt: m.EditedAt,
|
|
Status: m.Status,
|
|
Metadata: m.Metadata,
|
|
CreatedAt: m.CreatedAt,
|
|
UpdatedAt: m.UpdatedAt,
|
|
})
|
|
}
|
|
|
|
client.SendJSON(NewSyncChunkResponse(*msg.ConversationID, dtos, time.Now()))
|
|
}
|