- Create Hub with register/unregister/broadcast, room/user index
- Create Client with readPump/writePump goroutines, 30s ping keepalive
- Define all 18 incoming + 18 outgoing message types matching Rust protocol
- Add ValidateChatToken to ChatService for JWT validation
- Update WSUrl from /ws to /api/v1/ws
- Register GET /api/v1/ws endpoint in router
- Create ChatWebSocketHandler for WebSocket upgrade and auth
140 lines
2.9 KiB
Go
140 lines
2.9 KiB
Go
package chat
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/coder/websocket"
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Timing and size limits applied to each WebSocket client connection.
const (
	// writeTimeout bounds a single outbound write or ping in WritePump.
	writeTimeout = 10 * time.Second
	// readTimeout bounds a single call to Conn.Read in ReadPump.
	readTimeout = 60 * time.Second
	// pingInterval is the period of the keepalive ticker in WritePump.
	pingInterval = 30 * time.Second
	// maxMessageSize is the read limit set on the connection, in bytes (32 KiB).
	maxMessageSize = 32 * 1024
)
|
|
|
|
type Client struct {
|
|
Hub *Hub
|
|
Conn *websocket.Conn
|
|
UserID uuid.UUID
|
|
Username string
|
|
send chan []byte
|
|
handler *MessageHandler
|
|
logger *zap.Logger
|
|
}
|
|
|
|
func NewClient(hub *Hub, conn *websocket.Conn, userID uuid.UUID, username string, handler *MessageHandler, logger *zap.Logger) *Client {
|
|
return &Client{
|
|
Hub: hub,
|
|
Conn: conn,
|
|
UserID: userID,
|
|
Username: username,
|
|
send: make(chan []byte, 256),
|
|
handler: handler,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
func (c *Client) ReadPump(ctx context.Context) {
|
|
defer func() {
|
|
c.Hub.unregister <- c
|
|
_ = c.Conn.Close(websocket.StatusNormalClosure, "")
|
|
}()
|
|
|
|
c.Conn.SetReadLimit(maxMessageSize)
|
|
|
|
for {
|
|
readCtx, cancel := context.WithTimeout(ctx, readTimeout)
|
|
_, message, err := c.Conn.Read(readCtx)
|
|
cancel()
|
|
|
|
if err != nil {
|
|
if !errors.Is(err, net.ErrClosed) && !errors.Is(err, context.Canceled) {
|
|
status := websocket.CloseStatus(err)
|
|
if status != websocket.StatusGoingAway && status != websocket.StatusNormalClosure {
|
|
c.logger.Warn("WebSocket read error",
|
|
zap.Error(err),
|
|
zap.String("user_id", c.UserID.String()))
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
var incoming IncomingMessage
|
|
if err := json.Unmarshal(message, &incoming); err != nil {
|
|
c.logger.Warn("Invalid message format",
|
|
zap.Error(err),
|
|
zap.String("user_id", c.UserID.String()))
|
|
c.SendJSON(NewErrorResponse("Invalid message format"))
|
|
continue
|
|
}
|
|
|
|
if c.handler != nil {
|
|
c.handler.Dispatch(ctx, c, &incoming)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Client) WritePump(ctx context.Context) {
|
|
ticker := time.NewTicker(pingInterval)
|
|
defer func() {
|
|
ticker.Stop()
|
|
_ = c.Conn.Close(websocket.StatusNormalClosure, "")
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case message, ok := <-c.send:
|
|
if !ok {
|
|
_ = c.Conn.Close(websocket.StatusNormalClosure, "")
|
|
return
|
|
}
|
|
|
|
writeCtx, cancel := context.WithTimeout(ctx, writeTimeout)
|
|
err := c.Conn.Write(writeCtx, websocket.MessageText, message)
|
|
cancel()
|
|
|
|
if err != nil {
|
|
c.logger.Warn("WebSocket write error",
|
|
zap.Error(err),
|
|
zap.String("user_id", c.UserID.String()))
|
|
return
|
|
}
|
|
|
|
case <-ticker.C:
|
|
pingCtx, cancel := context.WithTimeout(ctx, writeTimeout)
|
|
err := c.Conn.Ping(pingCtx)
|
|
cancel()
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Client) SendJSON(v interface{}) {
|
|
data, err := json.Marshal(v)
|
|
if err != nil {
|
|
c.logger.Error("Failed to marshal message",
|
|
zap.Error(err),
|
|
zap.String("user_id", c.UserID.String()))
|
|
return
|
|
}
|
|
|
|
select {
|
|
case c.send <- data:
|
|
default:
|
|
c.logger.Warn("Client send buffer full",
|
|
zap.String("user_id", c.UserID.String()))
|
|
}
|
|
}
|