fix(backend): synchronous Hub.Shutdown to eliminate goleak failures
The chat Hub's Shutdown() only closed the done channel and returned immediately, racing against goleak.VerifyNone in TestHub_*. Worse, the broadcast saturation path spawned a fire-and-forget goroutine to send on the unregister channel, which could leak if Run() exited mid-flight. Fix: - Add `stopped` channel closed by Run() on exit; Shutdown() waits on it. - Buffer `unregister` (256) and replace the anonymous goroutine with a non-blocking select. Worst case the client is reaped on its next failed broadcast attempt. - handler_messages_test.go's setupTestHandler started a Hub but never shut it down, leaking Run() goroutines into the hub_test.go run that followed. Register t.Cleanup(hub.Shutdown) and close the gorm sqlite connection too — the connectionOpener goroutine was the secondary leak.
This commit is contained in:
parent
055b94c637
commit
87e1e0a5ab
2 changed files with 19 additions and 5 deletions
|
|
@ -24,6 +24,11 @@ func setupTestHandler(t *testing.T) (*MessageHandler, *Hub, *gorm.DB) {
|
|||
|
||||
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
if sqlDB, err := db.DB(); err == nil {
|
||||
_ = sqlDB.Close()
|
||||
}
|
||||
})
|
||||
|
||||
require.NoError(t, db.AutoMigrate(
|
||||
&models.ChatMessage{},
|
||||
|
|
@ -35,6 +40,7 @@ func setupTestHandler(t *testing.T) (*MessageHandler, *Hub, *gorm.DB) {
|
|||
hub := NewHub(logger, nil)
|
||||
go hub.Run()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
t.Cleanup(hub.Shutdown)
|
||||
|
||||
msgRepo := repositories.NewChatMessageRepository(db)
|
||||
readRepo := repositories.NewReadReceiptRepository(db)
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ type Hub struct {
|
|||
unregister chan *Client
|
||||
broadcast chan *RoomBroadcast
|
||||
done chan struct{} // TASK-DEBT-008: lifecycle - close to stop Run()
|
||||
stopped chan struct{} // closed by Run() on exit; Shutdown waits on it
|
||||
mu sync.RWMutex
|
||||
logger *zap.Logger
|
||||
presenceService *ChatPresenceService
|
||||
|
|
@ -36,15 +37,17 @@ func NewHub(logger *zap.Logger, presenceService *ChatPresenceService) *Hub {
|
|||
rooms: make(map[uuid.UUID]map[*Client]bool),
|
||||
userIndex: make(map[uuid.UUID][]*Client),
|
||||
register: make(chan *Client),
|
||||
unregister: make(chan *Client),
|
||||
unregister: make(chan *Client, 256),
|
||||
broadcast: make(chan *RoomBroadcast, 256),
|
||||
done: make(chan struct{}),
|
||||
stopped: make(chan struct{}),
|
||||
logger: logger,
|
||||
presenceService: presenceService,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) Run() {
|
||||
defer close(h.stopped)
|
||||
for {
|
||||
select {
|
||||
case <-h.done:
|
||||
|
|
@ -108,9 +111,12 @@ func (h *Hub) Run() {
|
|||
select {
|
||||
case client.send <- msg.Data:
|
||||
default:
|
||||
go func(c *Client) {
|
||||
h.unregister <- c
|
||||
}(client)
|
||||
// best-effort cleanup; if unregister is full, the client
|
||||
// will be reaped on its next failed broadcast attempt
|
||||
select {
|
||||
case h.unregister <- client:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -209,7 +215,9 @@ func (h *Hub) GetConnectedUsersCount() int {
|
|||
return len(h.userIndex)
|
||||
}
|
||||
|
||||
// Shutdown stops the Hub's Run loop (TASK-DEBT-008: goroutine lifecycle)
|
||||
// Shutdown stops the Hub's Run loop and waits for it to exit (TASK-DEBT-008: goroutine lifecycle).
|
||||
// Synchronous so goleak does not race against an in-flight Run() goroutine.
|
||||
func (h *Hub) Shutdown() {
|
||||
close(h.done)
|
||||
<-h.stopped
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue