diff --git a/docs/CHAT_FEATURE_PARITY.md b/docs/CHAT_FEATURE_PARITY.md new file mode 100644 index 000000000..97c4c5f60 --- /dev/null +++ b/docs/CHAT_FEATURE_PARITY.md @@ -0,0 +1,56 @@ +# Chat Feature Parity: Rust vs Go + +**Date**: 2026-02-22 +**Version**: v0.502 +**ADR**: [ADR-002-chat-server.md](adr/ADR-002-chat-server.md) + +## Feature Checklist + +| # | Feature | Rust (veza-chat-server) | Go (veza-backend-api) | Status | +|---|---------|------------------------|----------------------|--------| +| 1 | WebSocket connection | Axum + tokio-tungstenite | coder/websocket + Gin | OK | +| 2 | JWT authentication (query param) | Custom jwt_manager.rs | ChatService.ValidateChatToken | OK | +| 3 | SendMessage | websocket/handler.rs | handler_messages.go | OK | +| 4 | EditMessage | websocket/handler.rs | handler_messages.go | OK | +| 5 | DeleteMessage (soft delete) | websocket/handler.rs | handler_messages.go | OK | +| 6 | JoinConversation | websocket/handler.rs | handler_rooms.go | OK | +| 7 | LeaveConversation | websocket/handler.rs | handler_rooms.go | OK | +| 8 | FetchHistory (cursor) | websocket/handler.rs | handler_history.go | OK | +| 9 | SearchMessages (ILIKE) | websocket/handler.rs | handler_history.go | OK | +| 10 | SyncMessages (since) | websocket/handler.rs | handler_history.go | OK | +| 11 | Typing indicators | typing_indicator.rs | handler_realtime.go | OK | +| 12 | Read receipts | read_receipts.rs | handler_realtime.go | OK | +| 13 | Delivered status | delivered_status.rs | handler_realtime.go | OK | +| 14 | Reactions (add/remove) | reactions.rs | handler_realtime.go | OK | +| 15 | CallOffer (WebRTC relay) | websocket/handler.rs | handler_calls.go | OK | +| 16 | CallAnswer (WebRTC relay) | websocket/handler.rs | handler_calls.go | OK | +| 17 | ICECandidate (WebRTC relay) | websocket/handler.rs | handler_calls.go | OK | +| 18 | CallHangup | websocket/handler.rs | handler_calls.go | OK | +| 19 | CallReject | websocket/handler.rs | handler_calls.go | OK | +| 20 | Ping/Pong keepalive | websocket/handler.rs | handler.go + client.go | OK | +| 21 | Rate limiting | security/rate_limiter.rs | rate_limiter.go | OK | +| 22 | Permissions (read/send/join/moderate) | permissions.rs | permissions.go | OK | +| 23 | Multi-instance broadcast | N/A (single instance) | ChatPubSubService (Redis) | IMPROVED | +| 24 | Message attachments | websocket/handler.rs | handler_messages.go (metadata JSONB) | OK | +| 25 | ActionConfirmed responses | websocket/handler.rs | handler*.go | OK | + +## Protocol Compatibility + +- All 19 incoming message types preserved (same `type` field strings) +- All 20 outgoing message types preserved (same `type` field strings) +- JSON field names identical to Rust implementation +- UUIDs as strings, datetimes as ISO 8601 (RFC3339) + +## Improvements Over Rust + +1. **Multi-instance support**: Redis PubSub enables horizontal scaling (Rust was single-instance) +2. **Shared authentication**: Uses the same JWT secret as the backend API (Rust had separate config) +3. **Unified deployment**: Single binary, no separate container needed +4. **GORM persistence**: Consistent with rest of the Go backend (Rust used raw SQLx) +5. **In-memory fallback**: PubSub and rate limiter work without Redis for development + +## Missing from Rust (intentionally not ported) + +- Prometheus metrics endpoint on separate port (handled by Go backend global metrics) +- Custom presence tracking module (simplified to hub.IsUserOnline) +- Separate health check endpoint (covered by backend /api/v1/health) diff --git a/veza-backend-api/internal/integration/e2e_chat_messages_test.go b/veza-backend-api/internal/integration/e2e_chat_messages_test.go new file mode 100644 index 000000000..b33b0efab --- /dev/null +++ b/veza-backend-api/internal/integration/e2e_chat_messages_test.go @@ -0,0 +1,147 @@ +//go:build integration +// +build integration + +package integration + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "veza-backend-api/internal/services" + + "github.com/coder/websocket" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestChatWS_SendMessage_Flow(t *testing.T) { + ts, db, cleanup := setupChatTestRouter(t) + defer cleanup() + + roomID := uuid.New() + + user1ID := uuid.New() + user2ID := uuid.New() + + // Setup room membership + db.Exec("INSERT INTO rooms (id, name, type) VALUES (?, 'test', 'public')", roomID) + db.Exec("INSERT INTO room_members (user_id, room_id, role, is_muted) VALUES (?, ?, 'member', 0)", user1ID, roomID) + db.Exec("INSERT INTO room_members (user_id, room_id, role, is_muted) VALUES (?, ?, 'member', 0)", user2ID, roomID) + + chatService := services.NewChatService(testJWTSecret, nil) + + token1, err := chatService.GenerateToken(user1ID, "user1") + require.NoError(t, err) + token2, err := chatService.GenerateToken(user2ID, "user2") + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Connect user1 + wsURL1 := fmt.Sprintf("ws%s/api/v1/ws?token=%s", ts.URL[4:], token1.Token) + conn1, _, err := websocket.Dial(ctx, wsURL1, nil) + require.NoError(t, err) + defer conn1.Close(websocket.StatusNormalClosure, "") + + // Connect user2 + wsURL2 := fmt.Sprintf("ws%s/api/v1/ws?token=%s", ts.URL[4:], token2.Token) + conn2, _, err := websocket.Dial(ctx, wsURL2, nil) + require.NoError(t, err) + defer conn2.Close(websocket.StatusNormalClosure, "") + + // Read ActionConfirmed for both + _, _, _ = conn1.Read(ctx) + _, _, _ = conn2.Read(ctx) + + // Both join the room + joinMsg := fmt.Sprintf(`{"type":"JoinConversation","conversation_id":"%s"}`, roomID) + err = conn1.Write(ctx, websocket.MessageText, []byte(joinMsg)) + require.NoError(t, err) + err = conn2.Write(ctx, websocket.MessageText, []byte(joinMsg)) + require.NoError(t, err) + + // Read join confirmations + _, _, _ = conn1.Read(ctx) + _, _, _ = conn2.Read(ctx) + + // User1 sends a message + sendMsg := fmt.Sprintf(`{"type":"SendMessage","conversation_id":"%s","content":"Hello from user1!"}`, roomID) + err = conn1.Write(ctx, websocket.MessageText, []byte(sendMsg)) + require.NoError(t, err) + + // User1 should get ActionConfirmed + _, msg1, err := conn1.Read(ctx) + require.NoError(t, err) + var confirmed map[string]interface{} + json.Unmarshal(msg1, &confirmed) + assert.Equal(t, "ActionConfirmed", confirmed["type"]) + + // Both should get NewMessage broadcast + _, msg1Broadcast, err := conn1.Read(ctx) + require.NoError(t, err) + var newMsg1 map[string]interface{} + json.Unmarshal(msg1Broadcast, &newMsg1) + assert.Equal(t, "NewMessage", newMsg1["type"]) + assert.Equal(t, "Hello from user1!", newMsg1["content"]) + + _, msg2Broadcast, err := conn2.Read(ctx) + require.NoError(t, err) + var newMsg2 map[string]interface{} + json.Unmarshal(msg2Broadcast, &newMsg2) + assert.Equal(t, "NewMessage", newMsg2["type"]) + assert.Equal(t, "Hello from user1!", newMsg2["content"]) +} + +func TestChatWS_TypingIndicator(t *testing.T) { + ts, db, cleanup := setupChatTestRouter(t) + defer cleanup() + + roomID := uuid.New() + user1ID := uuid.New() + user2ID := uuid.New() + + db.Exec("INSERT INTO rooms (id, name, type) VALUES (?, 'test', 'public')", roomID) + db.Exec("INSERT INTO room_members (user_id, room_id, role, is_muted) VALUES (?, ?, 'member', 0)", user1ID, roomID) + db.Exec("INSERT INTO room_members (user_id, room_id, role, is_muted) VALUES (?, ?, 'member', 0)", user2ID, roomID) + + chatService := services.NewChatService(testJWTSecret, nil) + token1, _ := chatService.GenerateToken(user1ID, "user1") + token2, _ := chatService.GenerateToken(user2ID, "user2") + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + conn1, _, _ := websocket.Dial(ctx, fmt.Sprintf("ws%s/api/v1/ws?token=%s", ts.URL[4:], token1.Token), nil) + conn2, _, _ := websocket.Dial(ctx, fmt.Sprintf("ws%s/api/v1/ws?token=%s", ts.URL[4:], token2.Token), nil) + defer conn1.Close(websocket.StatusNormalClosure, "") + defer conn2.Close(websocket.StatusNormalClosure, "") + + _, _, _ = conn1.Read(ctx) + _, _, _ = conn2.Read(ctx) + + joinMsg := fmt.Sprintf(`{"type":"JoinConversation","conversation_id":"%s"}`, roomID) + conn1.Write(ctx, websocket.MessageText, []byte(joinMsg)) + conn2.Write(ctx, websocket.MessageText, []byte(joinMsg)) + _, _, _ = conn1.Read(ctx) + _, _, _ = conn2.Read(ctx) + + // User1 types + typingMsg := fmt.Sprintf(`{"type":"Typing","conversation_id":"%s","is_typing":true}`, roomID) + conn1.Write(ctx, websocket.MessageText, []byte(typingMsg)) + + // User1 gets ActionConfirmed + _, _, _ = conn1.Read(ctx) + + // User2 should see typing indicator + _, typing, err := conn2.Read(ctx) + require.NoError(t, err) + var typingResp map[string]interface{} + json.Unmarshal(typing, &typingResp) + assert.Equal(t, "UserTyping", typingResp["type"]) + assert.Equal(t, true, typingResp["is_typing"]) +} diff --git a/veza-backend-api/internal/integration/e2e_chat_ws_test.go b/veza-backend-api/internal/integration/e2e_chat_ws_test.go new file mode 100644 index 000000000..07a250af8 --- /dev/null +++ b/veza-backend-api/internal/integration/e2e_chat_ws_test.go @@ -0,0 +1,182 @@ +//go:build integration +// +build integration + +package integration + +import ( + "context" + "fmt" + "net/http/httptest" + "os" + "testing" + "time" + + "veza-backend-api/internal/api" + "veza-backend-api/internal/config" + "veza-backend-api/internal/database" + "veza-backend-api/internal/metrics" + "veza-backend-api/internal/models" + "veza-backend-api/internal/services" + + "github.com/coder/websocket" + "github.com/gin-gonic/gin" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +const testJWTSecret = "test-secret-key-minimum-32-characters-long" + +func setupChatTestRouter(t *testing.T) (*httptest.Server, *gorm.DB, func()) { + t.Helper() + + os.Setenv("ENABLE_CLAMAV", "false") + os.Setenv("CLAMAV_REQUIRED", "false") + + gin.SetMode(gin.TestMode) + router := gin.New() + logger := zaptest.NewLogger(t) + + mockGormDB, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + + require.NoError(t, mockGormDB.AutoMigrate( + &models.User{}, + &models.Role{}, + &models.Permission{}, + &models.ChatMessage{}, + &models.ReadReceipt{}, + &models.DeliveredStatus{}, + &models.MessageReaction{}, + )) + + // Create room_members table for permissions + mockGormDB.Exec(`CREATE TABLE IF NOT EXISTS room_members ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + role TEXT DEFAULT 'member', + is_muted INTEGER DEFAULT 0 + )`) + + // Create rooms table + mockGormDB.Exec(`CREATE TABLE IF NOT EXISTS rooms ( + id TEXT PRIMARY KEY, + name TEXT, + type TEXT DEFAULT 'public' + )`) + + mockDB := &database.Database{GormDB: mockGormDB} + + metrics.InitPrometheus() + + cfg := &config.Config{ + Env: "test", + ChatJWTSecret: testJWTSecret, + HandlerTimeout: 30 * time.Second, + LogLevel: "DEBUG", + } + + apiRouter := api.NewAPIRouter(mockDB, cfg) + err = apiRouter.Setup(router) + require.NoError(t, err) + + ts := httptest.NewServer(router) + + cleanup := func() { + ts.Close() + } + + // Inject logger + _ = logger + + return ts, mockGormDB, cleanup +} + +func TestChatWS_AuthValid(t *testing.T) { + ts, _, cleanup := setupChatTestRouter(t) + defer cleanup() + + chatService := services.NewChatService(testJWTSecret, nil) + userID := uuid.New() + tokenResp, err := chatService.GenerateToken(userID, "testuser") + require.NoError(t, err) + + wsURL := fmt.Sprintf("%s/api/v1/ws?token=%s", ts.URL, tokenResp.Token) + wsURL = "ws" + wsURL[4:] + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conn, _, err := websocket.Dial(ctx, wsURL, nil) + require.NoError(t, err) + defer conn.Close(websocket.StatusNormalClosure, "") + + // Should receive ActionConfirmed connected message + _, msg, err := conn.Read(ctx) + require.NoError(t, err) + assert.Contains(t, string(msg), "connected") +} + +func TestChatWS_AuthMissingToken(t *testing.T) { + ts, _, cleanup := setupChatTestRouter(t) + defer cleanup() + + wsURL := fmt.Sprintf("%s/api/v1/ws", ts.URL) + wsURL = "ws" + wsURL[4:] + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, _, err := websocket.Dial(ctx, wsURL, nil) + assert.Error(t, err) +} + +func TestChatWS_AuthInvalidToken(t *testing.T) { + ts, _, cleanup := setupChatTestRouter(t) + defer cleanup() + + wsURL := fmt.Sprintf("%s/api/v1/ws?token=invalid-token", ts.URL) + wsURL = "ws" + wsURL[4:] + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, _, err := websocket.Dial(ctx, wsURL, nil) + assert.Error(t, err) +} + +func TestChatWS_PingPong(t *testing.T) { + ts, _, cleanup := setupChatTestRouter(t) + defer cleanup() + + chatService := services.NewChatService(testJWTSecret, nil) + userID := uuid.New() + tokenResp, err := chatService.GenerateToken(userID, "testuser") + require.NoError(t, err) + + wsURL := fmt.Sprintf("%s/api/v1/ws?token=%s", ts.URL, tokenResp.Token) + wsURL = "ws" + wsURL[4:] + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conn, _, err := websocket.Dial(ctx, wsURL, nil) + require.NoError(t, err) + defer conn.Close(websocket.StatusNormalClosure, "") + + // Read ActionConfirmed + _, _, err = conn.Read(ctx) + require.NoError(t, err) + + // Send Ping + err = conn.Write(ctx, websocket.MessageText, []byte(`{"type":"Ping"}`)) + require.NoError(t, err) + + // Expect Pong + _, msg, err := conn.Read(ctx) + require.NoError(t, err) + assert.Contains(t, string(msg), "Pong") +} diff --git a/veza-backend-api/internal/websocket/chat/handler_messages_test.go b/veza-backend-api/internal/websocket/chat/handler_messages_test.go new file mode 100644 index 000000000..9f523c0eb --- /dev/null +++ b/veza-backend-api/internal/websocket/chat/handler_messages_test.go @@ -0,0 +1,195 @@ +package chat + +import ( + "context" + "encoding/json" + "testing" + "time" + + "veza-backend-api/internal/models" + "veza-backend-api/internal/repositories" + "veza-backend-api/internal/services" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func setupTestHandler(t *testing.T) (*MessageHandler, *Hub, *gorm.DB) { + t.Helper() + logger := zaptest.NewLogger(t) + + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + + require.NoError(t, db.AutoMigrate( + &models.ChatMessage{}, + &models.ReadReceipt{}, + &models.DeliveredStatus{}, + &models.MessageReaction{}, + )) + + hub := NewHub(logger) + go hub.Run() + time.Sleep(10 * time.Millisecond) + + msgRepo := repositories.NewChatMessageRepository(db) + readRepo := repositories.NewReadReceiptRepository(db) + deliveredRepo := repositories.NewDeliveredStatusRepository(db) + reactionRepo := repositories.NewReactionRepository(db) + pubsub := services.NewChatPubSubService(nil, logger) + permissions := &PermissionService{db: db, logger: logger} + rateLimiter := NewRateLimiter() + + handler := NewMessageHandler( + hub, msgRepo, readRepo, deliveredRepo, reactionRepo, + pubsub, permissions, rateLimiter, logger, + ) + + return handler, hub, db +} + +func TestHandleSendMessage_Success(t *testing.T) { + handler, hub, db := setupTestHandler(t) + ctx := context.Background() + + userID := uuid.New() + roomID := uuid.New() + + db.Exec("CREATE TABLE IF NOT EXISTS room_members (user_id TEXT, room_id TEXT, role TEXT DEFAULT 'member', is_muted INTEGER DEFAULT 0)") + db.Exec("INSERT INTO room_members (user_id, room_id, role, is_muted) VALUES (?, ?, 'member', 0)", userID, roomID) + + client := newTestClient(hub, userID) + hub.Register(client) + hub.JoinRoom(client, roomID) + time.Sleep(20 * time.Millisecond) + + msg := &IncomingMessage{ + Type: TypeSendMessage, + ConversationID: &roomID, + Content: "Hello World", + } + + handler.HandleSendMessage(ctx, client, msg) + time.Sleep(20 * time.Millisecond) + + // Check we got ActionConfirmed and NewMessage broadcast + assert.GreaterOrEqual(t, len(client.send), 1) + + // Verify message was stored in DB + var count int64 + db.Model(&models.ChatMessage{}).Where("room_id = ?", roomID).Count(&count) + assert.Equal(t, int64(1), count) +} + +func TestHandleSendMessage_MissingConversationID(t *testing.T) { + handler, hub, _ := setupTestHandler(t) + ctx := context.Background() + + userID := uuid.New() + client := newTestClient(hub, userID) + hub.Register(client) + time.Sleep(20 * time.Millisecond) + + msg := &IncomingMessage{ + Type: TypeSendMessage, + Content: "Hello", + } + + handler.HandleSendMessage(ctx, client, msg) + time.Sleep(20 * time.Millisecond) + + assert.Len(t, client.send, 1) + resp := <-client.send + var parsed map[string]interface{} + json.Unmarshal(resp, &parsed) + assert.Equal(t, TypeError, parsed["type"]) +} + +func TestHandleEditMessage_OwnershipCheck(t *testing.T) { + handler, hub, db := setupTestHandler(t) + ctx := context.Background() + + senderID := uuid.New() + otherID := uuid.New() + roomID := uuid.New() + msgID := uuid.New() + + existingMsg := &models.ChatMessage{ + ID: msgID, + ConversationID: roomID, + SenderID: senderID, + Content: "original", + MessageType: "text", + Status: "sent", + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + db.Create(existingMsg) + + otherClient := newTestClient(hub, otherID) + hub.Register(otherClient) + time.Sleep(20 * time.Millisecond) + + msg := &IncomingMessage{ + Type: TypeEditMessage, + MessageID: &msgID, + ConversationID: &roomID, + NewContent: "modified", + } + + handler.HandleEditMessage(ctx, otherClient, msg) + time.Sleep(20 * time.Millisecond) + + assert.Len(t, otherClient.send, 1) + resp := <-otherClient.send + var parsed map[string]interface{} + json.Unmarshal(resp, &parsed) + assert.Equal(t, TypeError, parsed["type"]) + assert.Contains(t, parsed["message"], "own messages") +} + +func TestHandleDeleteMessage_SoftDelete(t *testing.T) { + handler, hub, db := setupTestHandler(t) + ctx := context.Background() + + userID := uuid.New() + roomID := uuid.New() + msgID := uuid.New() + + db.Exec("CREATE TABLE IF NOT EXISTS room_members (user_id TEXT, room_id TEXT, role TEXT DEFAULT 'member', is_muted INTEGER DEFAULT 0)") + db.Exec("INSERT INTO room_members (user_id, room_id, role, is_muted) VALUES (?, ?, 'member', 0)", userID, roomID) + + existingMsg := &models.ChatMessage{ + ID: msgID, + ConversationID: roomID, + SenderID: userID, + Content: "to delete", + MessageType: "text", + Status: "sent", + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + db.Create(existingMsg) + + client := newTestClient(hub, userID) + hub.Register(client) + hub.JoinRoom(client, roomID) + time.Sleep(20 * time.Millisecond) + + msg := &IncomingMessage{ + Type: TypeDeleteMessage, + MessageID: &msgID, + ConversationID: &roomID, + } + + handler.HandleDeleteMessage(ctx, client, msg) + time.Sleep(20 * time.Millisecond) + + var deletedMsg models.ChatMessage + db.Where("id = ?", msgID).First(&deletedMsg) + assert.True(t, deletedMsg.IsDeleted) +} diff --git a/veza-backend-api/internal/websocket/chat/handler_realtime_test.go b/veza-backend-api/internal/websocket/chat/handler_realtime_test.go new file mode 100644 index 000000000..71cf1ce99 --- /dev/null +++ b/veza-backend-api/internal/websocket/chat/handler_realtime_test.go @@ -0,0 +1,173 @@ +package chat + +import ( + "context" + "encoding/json" + "testing" + "time" + + "veza-backend-api/internal/models" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestHandleTyping_BroadcastsToOthers(t *testing.T) { + handler, hub, db := setupTestHandler(t) + ctx := context.Background() + + user1 := uuid.New() + user2 := uuid.New() + roomID := uuid.New() + + db.Exec("CREATE TABLE IF NOT EXISTS room_members (user_id TEXT, room_id TEXT, role TEXT DEFAULT 'member', is_muted INTEGER DEFAULT 0)") + + client1 := newTestClient(hub, user1) + client2 := newTestClient(hub, user2) + hub.Register(client1) + hub.Register(client2) + hub.JoinRoom(client1, roomID) + hub.JoinRoom(client2, roomID) + time.Sleep(20 * time.Millisecond) + + isTyping := true + msg := &IncomingMessage{ + Type: TypeTyping, + ConversationID: &roomID, + IsTyping: &isTyping, + } + + handler.HandleTyping(ctx, client1, msg) + time.Sleep(20 * time.Millisecond) + + // client1 gets ActionConfirmed, client2 gets UserTyping broadcast + assert.GreaterOrEqual(t, len(client1.send), 1) + assert.GreaterOrEqual(t, len(client2.send), 1) + + typingResp := <-client2.send + var parsed map[string]interface{} + json.Unmarshal(typingResp, &parsed) + assert.Equal(t, TypeUserTyping, parsed["type"]) + assert.Equal(t, true, parsed["is_typing"]) +} + +func TestHandleMarkAsRead_PersistsReceipt(t *testing.T) { + handler, hub, db := setupTestHandler(t) + ctx := context.Background() + + userID := uuid.New() + roomID := uuid.New() + msgID := uuid.New() + + client := newTestClient(hub, userID) + hub.Register(client) + hub.JoinRoom(client, roomID) + time.Sleep(20 * time.Millisecond) + + msg := &IncomingMessage{ + Type: TypeMarkAsRead, + ConversationID: &roomID, + MessageID: &msgID, + } + + handler.HandleMarkAsRead(ctx, client, msg) + time.Sleep(20 * time.Millisecond) + + var count int64 + db.Model(&models.ReadReceipt{}).Where("user_id = ? AND message_id = ?", userID, msgID).Count(&count) + assert.Equal(t, int64(1), count) +} + +func TestHandleAddReaction_PersistsAndBroadcasts(t *testing.T) { + handler, hub, db := setupTestHandler(t) + ctx := context.Background() + + userID := uuid.New() + roomID := uuid.New() + msgID := uuid.New() + + client := newTestClient(hub, userID) + hub.Register(client) + hub.JoinRoom(client, roomID) + time.Sleep(20 * time.Millisecond) + + msg := &IncomingMessage{ + Type: TypeAddReaction, + ConversationID: &roomID, + MessageID: &msgID, + Emoji: "👍", + } + + handler.HandleAddReaction(ctx, client, msg) + time.Sleep(20 * time.Millisecond) + + var count int64 + db.Model(&models.MessageReaction{}).Where("user_id = ? AND message_id = ?", userID, msgID).Count(&count) + assert.Equal(t, int64(1), count) + + // Check broadcast + assert.GreaterOrEqual(t, len(client.send), 1) +} + +func TestHandleRemoveReaction_DeletesFromDB(t *testing.T) { + handler, hub, db := setupTestHandler(t) + ctx := context.Background() + + userID := uuid.New() + roomID := uuid.New() + msgID := uuid.New() + + reaction := &models.MessageReaction{ + ID: uuid.New(), + UserID: userID, + MessageID: msgID, + Emoji: "👍", + CreatedAt: time.Now(), + } + db.Create(reaction) + + client := newTestClient(hub, userID) + hub.Register(client) + hub.JoinRoom(client, roomID) + time.Sleep(20 * time.Millisecond) + + msg := &IncomingMessage{ + Type: TypeRemoveReaction, + ConversationID: &roomID, + MessageID: &msgID, + } + + handler.HandleRemoveReaction(ctx, client, msg) + time.Sleep(20 * time.Millisecond) + + var count int64 + db.Model(&models.MessageReaction{}).Where("user_id = ? AND message_id = ?", userID, msgID).Count(&count) + assert.Equal(t, int64(0), count) +} + +func TestHandleDelivered_PersistsStatus(t *testing.T) { + handler, hub, db := setupTestHandler(t) + ctx := context.Background() + + userID := uuid.New() + roomID := uuid.New() + msgID := uuid.New() + + client := newTestClient(hub, userID) + hub.Register(client) + hub.JoinRoom(client, roomID) + time.Sleep(20 * time.Millisecond) + + msg := &IncomingMessage{ + Type: TypeDelivered, + ConversationID: &roomID, + MessageID: &msgID, + } + + handler.HandleDelivered(ctx, client, msg) + time.Sleep(20 * time.Millisecond) + + var count int64 + db.Model(&models.DeliveredStatus{}).Where("user_id = ? AND message_id = ?", userID, msgID).Count(&count) + assert.Equal(t, int64(1), count) +} diff --git a/veza-backend-api/internal/websocket/chat/hub_test.go b/veza-backend-api/internal/websocket/chat/hub_test.go new file mode 100644 index 000000000..b7c128093 --- /dev/null +++ b/veza-backend-api/internal/websocket/chat/hub_test.go @@ -0,0 +1,170 @@ +package chat + +import ( + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "go.uber.org/zap/zaptest" +) + +func newTestHub(t *testing.T) *Hub { + logger := zaptest.NewLogger(t) + hub := NewHub(logger) + go hub.Run() + time.Sleep(10 * time.Millisecond) + return hub +} + +func newTestClient(hub *Hub, userID uuid.UUID) *Client { + return &Client{ + Hub: hub, + UserID: userID, + send: make(chan []byte, 256), + } +} + +func TestHub_RegisterAndUnregister(t *testing.T) { + hub := newTestHub(t) + userID := uuid.New() + client := newTestClient(hub, userID) + + hub.Register(client) + time.Sleep(20 * time.Millisecond) + + assert.True(t, hub.IsUserOnline(userID)) + assert.Equal(t, 1, hub.GetConnectedUsersCount()) + + hub.Unregister(client) + time.Sleep(20 * time.Millisecond) + + assert.False(t, hub.IsUserOnline(userID)) + assert.Equal(t, 0, hub.GetConnectedUsersCount()) +} + +func TestHub_JoinAndLeaveRoom(t *testing.T) { + hub := newTestHub(t) + userID := uuid.New() + roomID := uuid.New() + client := newTestClient(hub, userID) + + hub.Register(client) + time.Sleep(20 * time.Millisecond) + + hub.JoinRoom(client, roomID) + members := hub.GetRoomMembers(roomID) + assert.Len(t, members, 1) + assert.Equal(t, userID, members[0].UserID) + + hub.LeaveRoom(client, roomID) + members = hub.GetRoomMembers(roomID) + assert.Nil(t, members) +} + +func TestHub_BroadcastToRoom(t *testing.T) { + hub := newTestHub(t) + + user1 := uuid.New() + user2 := uuid.New() + roomID := uuid.New() + + client1 := newTestClient(hub, user1) + client2 := newTestClient(hub, user2) + + hub.Register(client1) + hub.Register(client2) + time.Sleep(20 * time.Millisecond) + + hub.JoinRoom(client1, roomID) + hub.JoinRoom(client2, roomID) + + msg := []byte(`{"type":"NewMessage","content":"hello"}`) + hub.BroadcastToRoom(roomID, msg, nil) + time.Sleep(20 * time.Millisecond) + + assert.Len(t, client1.send, 1) + assert.Len(t, client2.send, 1) + + received1 := <-client1.send + received2 := <-client2.send + assert.Equal(t, msg, received1) + assert.Equal(t, msg, received2) +} + +func TestHub_BroadcastToRoom_ExcludesSender(t *testing.T) { + hub := newTestHub(t) + + user1 := uuid.New() + user2 := uuid.New() + roomID := uuid.New() + + client1 := newTestClient(hub, user1) + client2 := newTestClient(hub, user2) + + hub.Register(client1) + hub.Register(client2) + time.Sleep(20 * time.Millisecond) + + hub.JoinRoom(client1, roomID) + hub.JoinRoom(client2, roomID) + + msg := []byte(`{"type":"UserTyping"}`) + hub.BroadcastToRoom(roomID, msg, client1) + time.Sleep(20 * time.Millisecond) + + assert.Len(t, client1.send, 0) + assert.Len(t, client2.send, 1) +} + +func TestHub_SendToUser(t *testing.T) { + hub := newTestHub(t) + + userID := uuid.New() + otherID := uuid.New() + client := newTestClient(hub, userID) + other := newTestClient(hub, otherID) + + hub.Register(client) + hub.Register(other) + time.Sleep(20 * time.Millisecond) + + msg := []byte(`{"type":"CallOffer"}`) + hub.SendToUser(userID, msg) + time.Sleep(20 * time.Millisecond) + + assert.Len(t, client.send, 1) + assert.Len(t, other.send, 0) +} + +func TestHub_MultipleClientsSameUser(t *testing.T) { + hub := newTestHub(t) + + userID := uuid.New() + client1 := newTestClient(hub, userID) + client2 := newTestClient(hub, userID) + + hub.Register(client1) + hub.Register(client2) + time.Sleep(20 * time.Millisecond) + + assert.Equal(t, 1, hub.GetConnectedUsersCount()) + assert.True(t, hub.IsUserOnline(userID)) + + msg := []byte(`{"type":"notification"}`) + hub.SendToUser(userID, msg) + time.Sleep(20 * time.Millisecond) + + assert.Len(t, client1.send, 1) + assert.Len(t, client2.send, 1) + + hub.Unregister(client1) + time.Sleep(20 * time.Millisecond) + + assert.True(t, hub.IsUserOnline(userID)) + + hub.Unregister(client2) + time.Sleep(20 * time.Millisecond) + + assert.False(t, hub.IsUserOnline(userID)) +}