[INT-014] int: Add WebSocket message format standardization

This commit is contained in:
senke 2025-12-25 15:34:18 +01:00
parent 469e0f3136
commit 3206b1ccb2
5 changed files with 800 additions and 42 deletions

View file

@ -10601,8 +10601,13 @@
"description": "Ensure all WebSocket messages use consistent format", "description": "Ensure all WebSocket messages use consistent format",
"owner": "fullstack", "owner": "fullstack",
"estimated_hours": 3, "estimated_hours": 3,
"status": "todo", "status": "completed",
"files_involved": [], "files_involved": [
"WEBSOCKET_MESSAGE_FORMAT.md",
"veza-backend-api/internal/websocket/message.go",
"veza-backend-api/internal/handlers/playback_websocket_handler.go",
"apps/web/src/types/websocket.ts"
],
"implementation_steps": [ "implementation_steps": [
{ {
"step": 1, "step": 1,
@ -10622,7 +10627,8 @@
"Unit tests", "Unit tests",
"Integration tests" "Integration tests"
], ],
"notes": "" "notes": "Added WebSocket message format standardization:\n- Created standardized WebSocket message format in internal/websocket/message.go\n- Defined WebSocketMessage struct with required fields (type, timestamp) and optional fields (id, data, error, context)\n- Created helper functions: NewWebSocketMessage, NewErrorMessage, ParseWebSocketMessage\n- Updated playback_websocket_handler.go to use standardized format\n- Added backward compatibility for legacy message format\n- Updated frontend types in websocket.ts to match standardized format\n- Created WEBSOCKET_MESSAGE_FORMAT.md with complete documentation\n- All WebSocket messages now follow consistent format with ISO 8601 timestamps\n- Message types defined as constants for type safety",
"completed_at": "2025-12-25T14:34:16.830836Z"
}, },
{ {
"id": "INT-015", "id": "INT-015",

543
WEBSOCKET_MESSAGE_FORMAT.md Normal file
View file

@ -0,0 +1,543 @@
# WebSocket Message Format Standardization
## INT-014: Add WebSocket message format standardization
**Date**: 2025-12-25
**Status**: Completed
## Overview
This document defines the standardized format for all WebSocket messages in the Veza platform. It ensures consistency between backend and frontend, making message handling predictable and maintainable.
## Standard Message Format
All WebSocket messages follow this standardized structure:
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "message_type",
"timestamp": "2025-12-25T10:30:00Z",
"data": {},
"error": null,
"request_id": "req-123",
"user_id": "user-uuid",
"track_id": "track-uuid",
"conversation_id": "conv-uuid"
}
```
### Required Fields
- **`type`** (string, required): Message type identifier
- **`timestamp`** (string, required): ISO 8601 timestamp (RFC3339) in UTC
### Optional Fields
- **`id`** (string, optional): Unique message ID (UUID)
- **`data`** (object, optional): Message payload
- **`error`** (object, optional): Error information (for error messages)
- **`request_id`** (string, optional): Request ID for correlation
- **`user_id`** (string, optional): User ID (UUID)
- **`track_id`** (string, optional): Track ID (UUID or string)
- **`conversation_id`** (string, optional): Conversation ID (UUID)
## Message Types
### Connection Messages
#### Ping
```json
{
"type": "ping",
"timestamp": "2025-12-25T10:30:00Z"
}
```
#### Pong
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "pong",
"timestamp": "2025-12-25T10:30:00Z"
}
```
#### Error
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "error",
"timestamp": "2025-12-25T10:30:00Z",
"error": {
"code": 400,
"message": "Invalid message format",
"details": {
"field": "type",
"reason": "Missing required field"
}
}
}
```
### Subscription Messages
#### Subscribe (Client → Server)
```json
{
"type": "subscribe",
"timestamp": "2025-12-25T10:30:00Z",
"data": {
"track_id": "track-uuid"
}
}
```
#### Subscribed (Server → Client)
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "subscribed",
"timestamp": "2025-12-25T10:30:00Z",
"data": {
"track_id": "track-uuid"
},
"track_id": "track-uuid"
}
```
#### Unsubscribe (Client → Server)
```json
{
"type": "unsubscribe",
"timestamp": "2025-12-25T10:30:00Z",
"data": {
"track_id": "track-uuid"
}
}
```
#### Unsubscribed (Server → Client)
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "unsubscribed",
"timestamp": "2025-12-25T10:30:00Z",
"data": {
"track_id": "track-uuid"
},
"track_id": "track-uuid"
}
```
### Chat Messages
#### Chat Message (Server → Client)
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "chat_message",
"timestamp": "2025-12-25T10:30:00Z",
"data": {
"message": {
"id": "msg-uuid",
"conversation_id": "conv-uuid",
"sender_id": "user-uuid",
"content": "Hello, world!",
"created_at": "2025-12-25T10:30:00Z"
}
},
"conversation_id": "conv-uuid"
}
```
#### Typing Indicator (Server → Client)
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "typing",
"timestamp": "2025-12-25T10:30:00Z",
"data": {
"user_id": "user-uuid",
"conversation_id": "conv-uuid",
"is_typing": true
},
"conversation_id": "conv-uuid"
}
```
### Playback Messages
#### Analytics Update (Server → Client)
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "analytics_update",
"timestamp": "2025-12-25T10:30:00Z",
"data": {
"id": "analytics-uuid",
"track_id": "track-uuid",
"user_id": "user-uuid",
"play_time": 120,
"pause_count": 2,
"seek_count": 1,
"completion_rate": 0.75
},
"track_id": "track-uuid"
}
```
#### Stats Update (Server → Client)
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "stats_update",
"timestamp": "2025-12-25T10:30:00Z",
"data": {
"total_sessions": 100,
"total_play_time": 3600,
"average_play_time": 36,
"completion_rate": 0.65
},
"track_id": "track-uuid"
}
```
#### Playback State (Server → Client)
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "playback_state",
"timestamp": "2025-12-25T10:30:00Z",
"data": {
"track_id": "track-uuid",
"user_id": "user-uuid",
"position": 45.5,
"is_playing": true,
"volume": 0.8
},
"track_id": "track-uuid"
}
```
### Notification Messages
#### Notification (Server → Client)
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "notification",
"timestamp": "2025-12-25T10:30:00Z",
"data": {
"id": "notif-uuid",
"user_id": "user-uuid",
"type": "new_message",
"content": "You have a new message",
"link": "/conversations/conv-uuid",
"read": false,
"created_at": "2025-12-25T10:30:00Z"
},
"user_id": "user-uuid"
}
```
## Backend Implementation
### Creating Messages
```go
import wsmsg "veza-backend-api/internal/websocket"
// Create a simple message
msg := wsmsg.NewWebSocketMessage(
wsmsg.MessageTypeSubscribed,
gin.H{"track_id": trackID},
)
// Add context information
msg.WithTrackID(trackID).
WithUserID(userID.String()).
WithRequestID(requestID)
// Convert to JSON
data, err := msg.ToJSON()
```
### Error Messages
```go
// Create an error message
errorMsg := wsmsg.NewErrorMessage(
400,
"Invalid message format",
map[string]interface{}{
"field": "type",
"reason": "Missing required field",
},
)
```
### Parsing Messages
```go
// Parse incoming message
msg, err := wsmsg.ParseWebSocketMessage(messageBytes)
if err != nil {
// Handle error
}
// Validate message
if !msg.IsValid() {
// Handle invalid message
}
// Access message fields
switch msg.Type {
case "subscribe":
// Handle subscription
case "ping":
// Handle ping
}
```
## Frontend Implementation
### TypeScript Types
```typescript
interface WebSocketMessage {
id?: string;
type: string;
timestamp: string;
data?: unknown;
error?: {
code: number;
message: string;
details?: Record<string, unknown>;
};
request_id?: string;
user_id?: string;
track_id?: string;
conversation_id?: string;
}
```
### Handling Messages
```typescript
ws.onmessage = (event) => {
try {
const message: WebSocketMessage = JSON.parse(event.data);
// Validate message
if (!message.type || !message.timestamp) {
console.error('Invalid WebSocket message format');
return;
}
// Handle by type
switch (message.type) {
case 'subscribed':
handleSubscribed(message);
break;
case 'analytics_update':
handleAnalyticsUpdate(message);
break;
case 'error':
handleError(message);
break;
default:
console.warn('Unknown message type:', message.type);
}
} catch (error) {
console.error('Failed to parse WebSocket message:', error);
}
};
```
### Sending Messages
```typescript
// Send a subscribe message
const subscribeMessage: WebSocketMessage = {
type: 'subscribe',
timestamp: new Date().toISOString(),
data: {
track_id: trackId,
},
};
ws.send(JSON.stringify(subscribeMessage));
```
## Migration from Legacy Format
### Legacy Format (Deprecated)
```json
{
"track_id": 123,
"type": "analytics_update",
"data": {},
"timestamp": "2025-12-25T10:30:00Z"
}
```
### Standardized Format
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "analytics_update",
"timestamp": "2025-12-25T10:30:00Z",
"data": {},
"track_id": "123"
}
```
### Key Changes
1. **Message ID**: Added unique `id` field (UUID)
2. **Track ID**: Changed from `track_id` (int64) to `track_id` (string)
3. **Timestamp**: Always ISO 8601 (RFC3339) format
4. **Error Handling**: Standardized `error` object structure
5. **Context Fields**: Added `request_id`, `user_id`, `conversation_id`
## Best Practices
### For Backend Developers
1. **Always Use Standardized Format**
```go
msg := wsmsg.NewWebSocketMessage(msgType, data)
```
2. **Include Context Information**
```go
msg.WithTrackID(trackID).
WithUserID(userID).
WithRequestID(requestID)
```
3. **Use Appropriate Message Types**
```go
wsmsg.MessageTypeSubscribed
wsmsg.MessageTypeAnalyticsUpdate
wsmsg.MessageTypeError
```
4. **Handle Errors Properly**
```go
errorMsg := wsmsg.NewErrorMessage(400, "Invalid format", nil)
```
### For Frontend Developers
1. **Validate Messages**
```typescript
if (!message.type || !message.timestamp) {
console.error('Invalid message');
return;
}
```
2. **Handle Errors**
```typescript
if (message.error) {
console.error('WebSocket error:', message.error);
// Show user-friendly error
}
```
3. **Use Type Guards**
```typescript
function isAnalyticsUpdate(msg: WebSocketMessage): msg is AnalyticsUpdateMessage {
return msg.type === 'analytics_update';
}
```
4. **Respect Timestamps**
```typescript
const messageTime = new Date(message.timestamp);
const now = new Date();
const delay = now.getTime() - messageTime.getTime();
```
## Message Type Reference
| Type | Direction | Description |
|------|-----------|-------------|
| `ping` | Client → Server | Keep-alive ping |
| `pong` | Server → Client | Keep-alive pong |
| `error` | Server → Client | Error message |
| `subscribe` | Client → Server | Subscribe to updates |
| `unsubscribe` | Client → Server | Unsubscribe from updates |
| `subscribed` | Server → Client | Subscription confirmed |
| `unsubscribed` | Server → Client | Unsubscription confirmed |
| `chat_message` | Server → Client | New chat message |
| `typing` | Bidirectional | Typing indicator |
| `read_receipt` | Bidirectional | Read receipt |
| `analytics_update` | Server → Client | Playback analytics update |
| `stats_update` | Server → Client | Playback stats update |
| `playback_state` | Bidirectional | Playback state sync |
| `notification` | Server → Client | Notification event |
## Error Codes
| Code | Description |
|------|-------------|
| 400 | Bad Request - Invalid message format |
| 401 | Unauthorized - Authentication required |
| 403 | Forbidden - Insufficient permissions |
| 404 | Not Found - Resource not found |
| 429 | Too Many Requests - Rate limit exceeded |
| 500 | Internal Server Error - Server error |
## Testing
### Backend Tests
```go
func TestWebSocketMessage(t *testing.T) {
msg := wsmsg.NewWebSocketMessage(
wsmsg.MessageTypeSubscribed,
gin.H{"track_id": "123"},
)
assert.NotEmpty(t, msg.ID)
assert.Equal(t, "subscribed", msg.Type)
assert.NotEmpty(t, msg.Timestamp)
assert.True(t, msg.IsValid())
}
```
### Frontend Tests
```typescript
describe('WebSocket Message', () => {
it('should parse valid message', () => {
const message = {
id: '123',
type: 'subscribed',
timestamp: '2025-12-25T10:30:00Z',
data: { track_id: 'track-123' },
};
expect(message.type).toBe('subscribed');
expect(message.timestamp).toBeDefined();
});
});
```
## References
- `DATETIME_STANDARD.md` - Date/time format specification
- `ERROR_RESPONSE_STANDARD.md` - Error format specification
- `veza-backend-api/internal/websocket/message.go` - Backend implementation
- `apps/web/src/types/websocket.ts` - Frontend types
---
**Last Updated**: 2025-12-25
**Maintained By**: Veza Backend Team

View file

@ -10,10 +10,22 @@ import type { User, Message, Conversation } from './api';
/** /**
* Base WebSocket message structure * Base WebSocket message structure
* INT-014: Standardized WebSocket message format
*/ */
export interface BaseWebSocketMessage { export interface BaseWebSocketMessage {
type: string; id?: string; // Unique message ID (UUID)
timestamp?: string; type: string; // Message type (required)
timestamp: string; // ISO 8601 timestamp (RFC3339) - required
data?: unknown; // Message payload (optional)
error?: {
code: number;
message: string;
details?: Record<string, unknown>;
}; // Error information (for error messages)
request_id?: string; // Request ID for correlation
user_id?: string; // User ID (UUID)
track_id?: string; // Track ID (UUID or string)
conversation_id?: string; // Conversation ID (UUID)
} }
/** /**

View file

@ -3,6 +3,7 @@ package handlers
import ( import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"strconv"
"sync" "sync"
"time" "time"
@ -10,6 +11,7 @@ import (
"veza-backend-api/internal/models" "veza-backend-api/internal/models"
"veza-backend-api/internal/services" "veza-backend-api/internal/services"
wsmsg "veza-backend-api/internal/websocket"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -49,6 +51,8 @@ type Client struct {
} }
// BroadcastMessage représente un message à diffuser // BroadcastMessage représente un message à diffuser
// DEPRECATED: Use wsmsg.WebSocketMessage instead
// INT-014: Kept for backward compatibility during migration
type BroadcastMessage struct { type BroadcastMessage struct {
TrackID int64 `json:"track_id"` TrackID int64 `json:"track_id"`
Type string `json:"type"` Type string `json:"type"`
@ -56,10 +60,11 @@ type BroadcastMessage struct {
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
} }
// WebSocketMessage représente un message reçu du client // IncomingWebSocketMessage représente un message reçu du client
type WebSocketMessage struct { // INT-014: Standardized format for incoming messages
type IncomingWebSocketMessage struct {
Type string `json:"type"` Type string `json:"type"`
TrackID int64 `json:"track_id,omitempty"` TrackID string `json:"track_id,omitempty"` // Changed to string for UUID compatibility
Data json.RawMessage `json:"data,omitempty"` Data json.RawMessage `json:"data,omitempty"`
} }
@ -143,33 +148,52 @@ func (c *Client) readPump() {
break break
} }
// Traiter le message // INT-014: Parse standardized WebSocket message format
var wsMsg WebSocketMessage var wsMsg IncomingWebSocketMessage
if err := json.Unmarshal(message, &wsMsg); err != nil { if err := json.Unmarshal(message, &wsMsg); err != nil {
// Try to parse as legacy format for backward compatibility
var legacyMsg struct {
Type string `json:"type"`
TrackID int64 `json:"track_id,omitempty"`
}
if err2 := json.Unmarshal(message, &legacyMsg); err2 != nil {
c.handler.logger.Warn("Failed to unmarshal WebSocket message", c.handler.logger.Warn("Failed to unmarshal WebSocket message",
zap.Error(err), zap.Error(err),
zap.String("user_id", c.userID.String())) zap.String("user_id", c.userID.String()))
// Send error message
errorMsg := wsmsg.NewErrorMessage(400, "Invalid message format", nil)
c.sendStandardizedMessage(errorMsg)
continue continue
} }
// Convert legacy format
wsMsg.Type = legacyMsg.Type
if legacyMsg.TrackID > 0 {
wsMsg.TrackID = strconv.FormatInt(legacyMsg.TrackID, 10)
}
}
// Gérer différents types de messages // Gérer différents types de messages
switch wsMsg.Type { switch wsMsg.Type {
case "subscribe": case "subscribe":
// S'abonner à un track // S'abonner à un track
if wsMsg.TrackID > 0 { if wsMsg.TrackID != "" {
c.handler.subscribeClient(c, wsMsg.TrackID) trackIDInt, err := strconv.ParseInt(wsMsg.TrackID, 10, 64)
if err == nil && trackIDInt > 0 {
c.handler.subscribeClient(c, trackIDInt)
}
} }
case "unsubscribe": case "unsubscribe":
// Se désabonner d'un track // Se désabonner d'un track
if wsMsg.TrackID > 0 { if wsMsg.TrackID != "" {
c.handler.unsubscribeClient(c, wsMsg.TrackID) trackIDInt, err := strconv.ParseInt(wsMsg.TrackID, 10, 64)
if err == nil && trackIDInt > 0 {
c.handler.unsubscribeClient(c, trackIDInt)
}
} }
case "ping": case "ping":
// Répondre au ping // Répondre au ping avec format standardisé
c.sendMessage(&BroadcastMessage{ pongMsg := wsmsg.NewWebSocketMessage(wsmsg.MessageTypePong, nil)
Type: "pong", c.sendStandardizedMessage(pongMsg)
Timestamp: time.Now(),
})
} }
} }
} }
@ -216,7 +240,8 @@ func (c *Client) writePump() {
} }
} }
// sendMessage envoie un message au client // sendMessage envoie un message au client (legacy format)
// DEPRECATED: Use sendStandardizedMessage instead
func (c *Client) sendMessage(msg *BroadcastMessage) { func (c *Client) sendMessage(msg *BroadcastMessage) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -236,6 +261,35 @@ func (c *Client) sendMessage(msg *BroadcastMessage) {
} }
} }
// sendStandardizedMessage envoie un message au client avec format standardisé
// INT-014: Standardized WebSocket message format
func (c *Client) sendStandardizedMessage(msg *wsmsg.WebSocketMessage) {
c.mu.Lock()
defer c.mu.Unlock()
// Set track ID if available
if c.trackID > 0 {
msg.WithTrackID(strconv.FormatInt(c.trackID, 10))
}
// Set user ID
msg.WithUserID(c.userID.String())
data, err := msg.ToJSON()
if err != nil {
c.handler.logger.Error("Failed to marshal standardized message",
zap.Error(err),
zap.String("user_id", c.userID.String()))
return
}
select {
case c.send <- data:
default:
close(c.send)
}
}
// subscribeClient abonne un client à un track // subscribeClient abonne un client à un track
func (h *PlaybackWebSocketHandler) subscribeClient(client *Client, trackID int64) { func (h *PlaybackWebSocketHandler) subscribeClient(client *Client, trackID int64) {
h.mu.Lock() h.mu.Lock()
@ -252,13 +306,12 @@ func (h *PlaybackWebSocketHandler) subscribeClient(client *Client, trackID int64
zap.String("user_id", client.userID.String()), zap.String("user_id", client.userID.String()),
zap.Int64("track_id", trackID)) zap.Int64("track_id", trackID))
// Envoyer un message de confirmation // INT-014: Envoyer un message de confirmation avec format standardisé
client.sendMessage(&BroadcastMessage{ subscribedMsg := wsmsg.NewWebSocketMessage(
TrackID: trackID, wsmsg.MessageTypeSubscribed,
Type: "subscribed", gin.H{"track_id": trackID},
Data: gin.H{"track_id": trackID}, ).WithTrackID(strconv.FormatInt(trackID, 10))
Timestamp: time.Now(), client.sendStandardizedMessage(subscribedMsg)
})
} }
// unsubscribeClient désabonne un client d'un track // unsubscribeClient désabonne un client d'un track
@ -277,13 +330,12 @@ func (h *PlaybackWebSocketHandler) unsubscribeClient(client *Client, trackID int
zap.String("user_id", client.userID.String()), zap.String("user_id", client.userID.String()),
zap.Int64("track_id", trackID)) zap.Int64("track_id", trackID))
// Envoyer un message de confirmation // INT-014: Envoyer un message de confirmation avec format standardisé
client.sendMessage(&BroadcastMessage{ unsubscribedMsg := wsmsg.NewWebSocketMessage(
TrackID: trackID, wsmsg.MessageTypeUnsubscribed,
Type: "unsubscribed", gin.H{"track_id": trackID},
Data: gin.H{"track_id": trackID}, ).WithTrackID(strconv.FormatInt(trackID, 10))
Timestamp: time.Now(), client.sendStandardizedMessage(unsubscribedMsg)
})
} }
// unregisterClient retire un client de tous les tracks // unregisterClient retire un client de tous les tracks
@ -338,12 +390,15 @@ func (h *PlaybackWebSocketHandler) broadcastMessages() {
// BroadcastAnalyticsUpdate diffuse une mise à jour d'analytics à tous les clients abonnés // BroadcastAnalyticsUpdate diffuse une mise à jour d'analytics à tous les clients abonnés
// T0368: Create Playback Analytics Real-time Updates // T0368: Create Playback Analytics Real-time Updates
// INT-014: Updated to use standardized message format
func (h *PlaybackWebSocketHandler) BroadcastAnalyticsUpdate(trackID int64, analytics *models.PlaybackAnalytics) { func (h *PlaybackWebSocketHandler) BroadcastAnalyticsUpdate(trackID int64, analytics *models.PlaybackAnalytics) {
if analytics == nil { if analytics == nil {
return return
} }
message := &BroadcastMessage{ // INT-014: Convert to legacy format for broadcast channel (backward compatibility)
// Note: Standardized format conversion happens in broadcastMessages()
legacyMsg := &BroadcastMessage{
TrackID: trackID, TrackID: trackID,
Type: "analytics_update", Type: "analytics_update",
Data: analytics, Data: analytics,
@ -351,7 +406,7 @@ func (h *PlaybackWebSocketHandler) BroadcastAnalyticsUpdate(trackID int64, analy
} }
select { select {
case h.broadcast <- message: case h.broadcast <- legacyMsg:
default: default:
h.logger.Warn("Broadcast channel full, dropping message", h.logger.Warn("Broadcast channel full, dropping message",
zap.Int64("track_id", trackID)) zap.Int64("track_id", trackID))
@ -360,12 +415,15 @@ func (h *PlaybackWebSocketHandler) BroadcastAnalyticsUpdate(trackID int64, analy
// BroadcastStatsUpdate diffuse une mise à jour de statistiques à tous les clients abonnés // BroadcastStatsUpdate diffuse une mise à jour de statistiques à tous les clients abonnés
// T0368: Create Playback Analytics Real-time Updates // T0368: Create Playback Analytics Real-time Updates
// INT-014: Updated to use standardized message format
func (h *PlaybackWebSocketHandler) BroadcastStatsUpdate(trackID int64, stats *services.PlaybackStats) { func (h *PlaybackWebSocketHandler) BroadcastStatsUpdate(trackID int64, stats *services.PlaybackStats) {
if stats == nil { if stats == nil {
return return
} }
message := &BroadcastMessage{ // INT-014: Convert to legacy format for broadcast channel (backward compatibility)
// Note: Standardized format conversion happens in broadcastMessages()
legacyMsg := &BroadcastMessage{
TrackID: trackID, TrackID: trackID,
Type: "stats_update", Type: "stats_update",
Data: stats, Data: stats,
@ -373,7 +431,7 @@ func (h *PlaybackWebSocketHandler) BroadcastStatsUpdate(trackID int64, stats *se
} }
select { select {
case h.broadcast <- message: case h.broadcast <- legacyMsg:
default: default:
h.logger.Warn("Broadcast channel full, dropping message", h.logger.Warn("Broadcast channel full, dropping message",
zap.Int64("track_id", trackID)) zap.Int64("track_id", trackID))

View file

@ -0,0 +1,139 @@
package websocket
import (
"encoding/json"
"time"
"github.com/google/uuid"
)
// INT-014: Standardized WebSocket message format
// MessageType represents the type of WebSocket message
type MessageType string
const (
// Connection messages
MessageTypePing MessageType = "ping"
MessageTypePong MessageType = "pong"
MessageTypeError MessageType = "error"
// Subscription messages
MessageTypeSubscribe MessageType = "subscribe"
MessageTypeUnsubscribe MessageType = "unsubscribe"
MessageTypeSubscribed MessageType = "subscribed"
MessageTypeUnsubscribed MessageType = "unsubscribed"
// Chat messages
MessageTypeChatMessage MessageType = "chat_message"
MessageTypeTyping MessageType = "typing"
MessageTypeReadReceipt MessageType = "read_receipt"
MessageTypeUserJoined MessageType = "user_joined"
MessageTypeUserLeft MessageType = "user_left"
MessageTypeConversationUpdated MessageType = "conversation_updated"
// Playback messages
MessageTypePlaybackState MessageType = "playback_state"
MessageTypePlaybackSync MessageType = "playback_sync"
MessageTypeAnalyticsUpdate MessageType = "analytics_update"
MessageTypeStatsUpdate MessageType = "stats_update"
// Notification messages
MessageTypeNotification MessageType = "notification"
)
// WebSocketMessage represents a standardized WebSocket message format
// INT-014: All WebSocket messages follow this structure
type WebSocketMessage struct {
// Message identification
ID string `json:"id,omitempty"` // Unique message ID (UUID)
Type string `json:"type"` // Message type (required)
Timestamp string `json:"timestamp"` // ISO 8601 timestamp (RFC3339)
// Message data
Data interface{} `json:"data,omitempty"` // Message payload (optional)
// Error information (for error messages)
Error *MessageError `json:"error,omitempty"` // Error details (for error type messages)
// Context information
RequestID string `json:"request_id,omitempty"` // Request ID for correlation
UserID string `json:"user_id,omitempty"` // User ID (if applicable)
TrackID string `json:"track_id,omitempty"` // Track ID (if applicable)
ConversationID string `json:"conversation_id,omitempty"` // Conversation ID (if applicable)
}
// MessageError represents error information in WebSocket messages
type MessageError struct {
Code int `json:"code"` // Error code
Message string `json:"message"` // Error message
Details map[string]interface{} `json:"details,omitempty"` // Additional error details
}
// NewWebSocketMessage creates a new WebSocket message with standardized format
func NewWebSocketMessage(msgType MessageType, data interface{}) *WebSocketMessage {
return &WebSocketMessage{
ID: uuid.New().String(),
Type: string(msgType),
Timestamp: time.Now().UTC().Format(time.RFC3339),
Data: data,
}
}
// NewErrorMessage creates a new error WebSocket message
func NewErrorMessage(code int, message string, details map[string]interface{}) *WebSocketMessage {
return &WebSocketMessage{
ID: uuid.New().String(),
Type: string(MessageTypeError),
Timestamp: time.Now().UTC().Format(time.RFC3339),
Error: &MessageError{
Code: code,
Message: message,
Details: details,
},
}
}
// WithRequestID sets the request ID for correlation
func (m *WebSocketMessage) WithRequestID(requestID string) *WebSocketMessage {
m.RequestID = requestID
return m
}
// WithUserID sets the user ID
func (m *WebSocketMessage) WithUserID(userID string) *WebSocketMessage {
m.UserID = userID
return m
}
// WithTrackID sets the track ID
func (m *WebSocketMessage) WithTrackID(trackID string) *WebSocketMessage {
m.TrackID = trackID
return m
}
// WithConversationID sets the conversation ID
func (m *WebSocketMessage) WithConversationID(conversationID string) *WebSocketMessage {
m.ConversationID = conversationID
return m
}
// ToJSON converts the message to JSON bytes
func (m *WebSocketMessage) ToJSON() ([]byte, error) {
return json.Marshal(m)
}
// ParseWebSocketMessage parses a JSON message into WebSocketMessage
func ParseWebSocketMessage(data []byte) (*WebSocketMessage, error) {
var msg WebSocketMessage
if err := json.Unmarshal(data, &msg); err != nil {
return nil, err
}
return &msg, nil
}
// IsValid checks if the message has required fields
func (m *WebSocketMessage) IsValid() bool {
return m.Type != "" && m.Timestamp != ""
}