veza/veza-backend-api/internal/handlers/live_stream_callback.go
senke eb2862092d
Some checks failed
Backend API CI / test-unit (push) Failing after 0s
Backend API CI / test-integration (push) Failing after 0s
Frontend CI / test (push) Failing after 0s
Storybook Audit / Build & audit Storybook (push) Failing after 0s
feat(v0.10.6): Livestreaming basique F471-F476
- Backend: callbacks on_publish/on_publish_done, UpdateStreamURL, GetByStreamKey
- Nginx-RTMP: config infra, docker-compose service (profil live)
- Frontend: stream_url dans LiveStream, HLS.js dans LiveViewPlayer, état Stream terminé
- Chat: rate limit send_live_message 1 msg/3s pour rooms live_streams
- Env: RTMP_CALLBACK_SECRET, STREAM_HLS_BASE_URL, NGINX_RTMP_HOST
- Roadmap v0.10.6 marquée DONE
2026-03-10 10:21:57 +01:00

120 lines
4.2 KiB
Go

package handlers
import (
	"crypto/subtle"
	"net/http"
	"os"

	"github.com/gin-gonic/gin"
	"go.uber.org/zap"

	apperrors "veza-backend-api/internal/errors"
	"veza-backend-api/internal/services"
)
// LiveStreamCallbackHandler serves the HTTP callbacks that Nginx-RTMP issues
// for on_publish and on_publish_done (v0.10.6 F471).
type LiveStreamCallbackHandler struct {
	// service is used to look up streams by key and to update their state.
	service *services.LiveStreamService
	// logger receives the handler's log entries.
	logger *zap.Logger
}
// NewLiveStreamCallbackHandler builds a LiveStreamCallbackHandler that uses
// service for stream lookups and updates, and logger for its log output.
func NewLiveStreamCallbackHandler(service *services.LiveStreamService, logger *zap.Logger) *LiveStreamCallbackHandler {
	handler := new(LiveStreamCallbackHandler)
	handler.service = service
	handler.logger = logger
	return handler
}
// validateCallbackSecret reports whether the callback request carries the
// shared secret configured in the RTMP_CALLBACK_SECRET environment variable.
//
// The candidate secret is read from the X-RTMP-Callback-Secret header and,
// when that header is absent or empty, from the "secret" query parameter.
// When RTMP_CALLBACK_SECRET is unset or empty, every request is accepted.
//
// NOTE(review): the empty-secret bypass fails open. It is intended for dev
// setups; confirm that production deployments always set the variable.
func validateCallbackSecret(c *gin.Context) bool {
	expect := os.Getenv("RTMP_CALLBACK_SECRET")
	if expect == "" {
		return true // Allow in dev when not configured
	}
	got := c.GetHeader("X-RTMP-Callback-Secret")
	if got == "" {
		got = c.Query("secret")
	}
	// Compare in constant time so response timing does not reveal how much of
	// a guessed secret matched. Only a length mismatch returns early.
	return subtle.ConstantTimeCompare([]byte(got), []byte(expect)) == 1
}
// HandlePublish is called by Nginx-RTMP on_publish. Params: name=stream_key
// (query string first, then form body).
//
// Flow:
//  1. Reject the request with an unauthorized error if the callback secret
//     does not validate.
//  2. Reject with a validation error if no stream key is supplied, or with a
//     not-found error if GetByStreamKey fails for that key.
//  3. Mark the stream live (SetIsLive true) and store its HLS playlist URL
//     (UpdateStreamURL); a failure of either responds with an internal error.
//  4. Call UpdateViewerCount with 0; a failure here is logged but does not
//     fail the callback.
//
// On success the response is an empty 200.
func (h *LiveStreamCallbackHandler) HandlePublish(c *gin.Context) {
	if !validateCallbackSecret(c) {
		RespondWithAppError(c, apperrors.New(apperrors.ErrCodeUnauthorized, "invalid callback secret"))
		return
	}

	streamKey := c.Query("name")
	if streamKey == "" {
		streamKey = c.PostForm("name")
	}
	if streamKey == "" {
		RespondWithAppError(c, apperrors.NewValidationError("missing stream key (name)"))
		return
	}

	ctx := c.Request.Context()

	stream, err := h.service.GetByStreamKey(ctx, streamKey)
	if err != nil {
		h.logger.Warn("Live publish: invalid stream key", zap.String("key", streamKey), zap.Error(err))
		RespondWithAppError(c, apperrors.NewNotFoundError("invalid stream key"))
		return
	}

	// Nginx-RTMP writes to /tmp/hls/{stream_key}/ so we use stream_key in the URL
	// NOTE(review): this puts the stream key into the URL stored for the
	// stream. If that URL is exposed to viewers, they presumably learn the key
	// used to publish; confirm whether a separate public identifier is needed.
	baseURL := os.Getenv("STREAM_HLS_BASE_URL")
	if baseURL == "" {
		baseURL = "http://localhost:18083/live"
	}
	streamURL := baseURL + "/" + stream.StreamKey + "/playlist.m3u8"

	if err := h.service.SetIsLive(ctx, stream.ID, true); err != nil {
		h.logger.Error("Live publish: SetIsLive failed", zap.Error(err))
		RespondWithAppError(c, apperrors.Wrap(apperrors.ErrCodeInternal, "failed to set stream live", err))
		return
	}
	// NOTE(review): if this call fails, the stream has already been marked
	// live above while the callback reports an error; confirm whether the
	// live flag should be reverted in that case.
	if err := h.service.UpdateStreamURL(ctx, stream.ID, streamURL); err != nil {
		h.logger.Error("Live publish: UpdateStreamURL failed", zap.Error(err))
		RespondWithAppError(c, apperrors.Wrap(apperrors.ErrCodeInternal, "failed to update stream URL", err))
		return
	}

	// Non-fatal: the stream is already live, so a viewer-count failure is only
	// logged instead of being silently dropped.
	// NOTE(review): HandlePublishDone passes -stream.ViewerCount to this same
	// method, which suggests the argument is a delta; if so, passing 0 here
	// changes nothing. Verify against LiveStreamService.UpdateViewerCount.
	if err := h.service.UpdateViewerCount(ctx, stream.ID, 0); err != nil {
		h.logger.Warn("Live publish: UpdateViewerCount failed",
			zap.String("stream_id", stream.ID.String()), zap.Error(err))
	}

	h.logger.Info("Live stream started", zap.String("stream_id", stream.ID.String()), zap.String("stream_key", streamKey))
	c.AbortWithStatus(http.StatusOK)
}
// HandlePublishDone is called by Nginx-RTMP on_publish_done. Params:
// name=stream_key (query string first, then form body).
//
// An invalid callback secret or a missing stream key is rejected with an
// error response. From then on the handler always answers with an empty 200:
// a failed stream lookup and failures of the individual state updates are
// logged but never turned into an error response.
//
// For a known stream it clears the live flag (SetIsLive false), clears the
// stored stream URL, and calls UpdateViewerCount with -stream.ViewerCount.
func (h *LiveStreamCallbackHandler) HandlePublishDone(c *gin.Context) {
	if !validateCallbackSecret(c) {
		RespondWithAppError(c, apperrors.New(apperrors.ErrCodeUnauthorized, "invalid callback secret"))
		return
	}

	streamKey := c.Query("name")
	if streamKey == "" {
		streamKey = c.PostForm("name")
	}
	if streamKey == "" {
		RespondWithAppError(c, apperrors.NewValidationError("missing stream key (name)"))
		return
	}

	ctx := c.Request.Context()

	stream, err := h.service.GetByStreamKey(ctx, streamKey)
	if err != nil {
		// Include the error so a lookup failure for another reason can be told
		// apart from a genuinely unknown key.
		h.logger.Warn("Live publish_done: stream not found", zap.String("key", streamKey), zap.Error(err))
		c.AbortWithStatus(http.StatusOK) // Nginx expects 2xx even if we don't know the stream
		return
	}

	// Each update below is best-effort: a failure is logged and the remaining
	// updates still run.
	if err := h.service.SetIsLive(ctx, stream.ID, false); err != nil {
		h.logger.Error("Live publish_done: SetIsLive failed", zap.Error(err))
	}
	if err := h.service.UpdateStreamURL(ctx, stream.ID, ""); err != nil {
		h.logger.Error("Live publish_done: UpdateStreamURL failed", zap.Error(err))
	}
	// Intended to reset the viewer count to 0.
	// NOTE(review): this assumes UpdateViewerCount takes a delta and that
	// stream.ViewerCount is still current; confirm against LiveStreamService.
	if err := h.service.UpdateViewerCount(ctx, stream.ID, -stream.ViewerCount); err != nil {
		h.logger.Warn("Live publish_done: UpdateViewerCount failed",
			zap.String("stream_id", stream.ID.String()), zap.Error(err))
	}

	h.logger.Info("Live stream ended", zap.String("stream_id", stream.ID.String()))
	c.AbortWithStatus(http.StatusOK)
}