[BE-TEST-011] test: Add integration tests for ownership checks

- Added comprehensive integration tests for ownership middleware:
  * Track owner access (should succeed)
  * Track non-owner access (should be forbidden)
  * Track admin access (should succeed with override)
  * Playlist owner access (should succeed)
  * Playlist non-owner access (should be forbidden)
  * Resource not found (should return 404)
  * Unauthenticated access (should return 401)
  * Complete flow with multiple resources
- Tests use real services and in-memory database for end-to-end testing
- All tests tagged with integration build tag
This commit is contained in:
senke 2025-12-25 01:41:42 +01:00
parent 42d0e5785e
commit aeaf4620da
2 changed files with 605 additions and 5 deletions

View file

@ -5460,7 +5460,7 @@
"description": "Test that ownership middleware works correctly",
"owner": "backend",
"estimated_hours": 4,
"status": "todo",
"status": "completed",
"files_involved": [],
"implementation_steps": [
{
@ -5481,7 +5481,18 @@
"Unit tests",
"Integration tests"
],
"notes": ""
"notes": "",
"completion": {
"completed_at": "2025-12-25T01:41:41.590645",
"completed_by": "autonomous-agent",
"notes": "Added comprehensive integration tests for ownership checks middleware. Tests cover: Track owner access, Track non-owner access (forbidden), Track admin access (override), Playlist owner access, Playlist non-owner access (forbidden), Resource not found (404), Unauthenticated access (401), and Complete flow with multiple resources. All tests use real services and in-memory database for end-to-end testing.",
"files_modified": [
"veza-backend-api/internal/middleware/ownership_integration_test.go"
]
},
"progress_tracking": {
"last_updated": "2025-12-25T01:41:41.590661"
}
},
{
"id": "BE-TEST-012",
@ -11232,11 +11243,11 @@
]
},
"progress_tracking": {
"completed": 130,
"completed": 131,
"in_progress": 0,
"todo": 141,
"blocked": 0,
"last_updated": "2025-12-25T01:39:42.334643",
"completion_percentage": 48.68913857677903
"last_updated": "2025-12-25T01:41:41.590680",
"completion_percentage": 49.063670411985015
}
}

View file

@ -0,0 +1,589 @@
//go:build integration
// +build integration
package middleware
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"veza-backend-api/internal/models"
"veza-backend-api/internal/services"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
// setupOwnershipIntegrationTestRouter crée un router de test avec le middleware d'ownership
func setupOwnershipIntegrationTestRouter(t *testing.T) (*gin.Engine, *gorm.DB, *AuthMiddleware, func()) {
gin.SetMode(gin.TestMode)
// Setup in-memory SQLite database
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
require.NoError(t, err)
// Enable foreign keys for SQLite
db.Exec("PRAGMA foreign_keys = ON")
// Auto-migrate all models
err = db.AutoMigrate(
&models.User{},
&models.Track{},
&models.Playlist{},
&models.Role{},
&models.Permission{},
&models.UserRole{},
&models.RolePermission{},
)
require.NoError(t, err)
// Setup logger
logger := zap.NewNop()
// Setup PermissionService
permissionService := services.NewPermissionService(db)
// Setup AuthMiddleware
authMiddleware := &AuthMiddleware{
permissionService: permissionService,
logger: logger,
}
// Create router
router := gin.New()
// Mock authentication middleware - set user_id from header
router.Use(func(c *gin.Context) {
userIDStr := c.GetHeader("X-User-ID")
if userIDStr == "" {
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
c.Abort()
return
}
uid, err := uuid.Parse(userIDStr)
if err != nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid user id"})
c.Abort()
return
}
c.Set("user_id", uid)
c.Next()
})
cleanup := func() {
// Database will be closed automatically
}
return router, db, authMiddleware, cleanup
}
// createTestUser crée un utilisateur de test
func createTestUserForOwnership(t *testing.T, db *gorm.DB, userID uuid.UUID, username string, isAdmin bool) *models.User {
user := &models.User{
ID: userID,
Username: username,
Slug: username,
Email: username + "@example.com",
PasswordHash: "hashed_password",
IsActive: true,
CreatedAt: time.Now(),
}
err := db.Create(user).Error
require.NoError(t, err)
if isAdmin {
// Create admin role
adminRole := &models.Role{
ID: uuid.New(),
Name: "admin",
IsActive: true,
IsSystem: true,
CreatedAt: time.Now(),
}
err = db.FirstOrCreate(adminRole, models.Role{Name: "admin"}).Error
require.NoError(t, err)
// Assign admin role to user
userRole := &models.UserRole{
ID: uuid.New(),
UserID: userID,
RoleID: adminRole.ID,
RoleName: "admin",
IsActive: true,
AssignedAt: time.Now(),
}
err = db.Create(userRole).Error
require.NoError(t, err)
}
return user
}
// createTestTrack crée un track de test
func createTestTrackForOwnership(t *testing.T, db *gorm.DB, trackID uuid.UUID, userID uuid.UUID) *models.Track {
track := &models.Track{
ID: trackID,
UserID: userID,
Title: "Test Track",
Artist: "Test Artist",
FilePath: "/tmp/test.mp3",
IsPublic: true,
Status: models.TrackStatusCompleted,
CreatedAt: time.Now(),
}
err := db.Create(track).Error
require.NoError(t, err)
return track
}
// createTestPlaylist crée une playlist de test
func createTestPlaylistForOwnership(t *testing.T, db *gorm.DB, playlistID uuid.UUID, userID uuid.UUID) *models.Playlist {
playlist := &models.Playlist{
ID: playlistID,
UserID: userID,
Title: "Test Playlist",
Description: "Test Description",
IsPublic: true,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
err := db.Create(playlist).Error
require.NoError(t, err)
return playlist
}
// TestOwnershipMiddleware_TrackOwnerAccess teste que le propriétaire d'un track peut y accéder
func TestOwnershipMiddleware_TrackOwnerAccess(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
router, db, authMiddleware, cleanup := setupOwnershipIntegrationTestRouter(t)
defer cleanup()
// Create test user
ownerID := uuid.New()
createTestUserForOwnership(t, db, ownerID, "owner", false)
// Create test track
trackID := uuid.New()
createTestTrackForOwnership(t, db, trackID, ownerID)
// Setup track owner resolver
trackOwnerResolver := func(c *gin.Context) (uuid.UUID, error) {
trackIDStr := c.Param("id")
trackID, err := uuid.Parse(trackIDStr)
if err != nil {
return uuid.Nil, err
}
var track models.Track
if err := db.First(&track, "id = ?", trackID).Error; err != nil {
return uuid.Nil, err
}
return track.UserID, nil
}
// Setup route with ownership middleware
router.PUT("/tracks/:id", authMiddleware.RequireOwnershipOrAdmin("track", trackOwnerResolver), func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"message": "success"})
})
// Test: Owner tries to access their own track
req := httptest.NewRequest("PUT", "/tracks/"+trackID.String(), nil)
req.Header.Set("X-User-ID", ownerID.String())
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var response map[string]interface{}
err := json.Unmarshal(w.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "success", response["message"])
}
// TestOwnershipMiddleware_TrackNonOwnerAccess teste qu'un non-propriétaire ne peut pas accéder à un track
func TestOwnershipMiddleware_TrackNonOwnerAccess(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
router, db, authMiddleware, cleanup := setupOwnershipIntegrationTestRouter(t)
defer cleanup()
// Create two users
ownerID := uuid.New()
otherUserID := uuid.New()
createTestUserForOwnership(t, db, ownerID, "owner", false)
createTestUserForOwnership(t, db, otherUserID, "otheruser", false)
// Create test track owned by owner
trackID := uuid.New()
createTestTrackForOwnership(t, db, trackID, ownerID)
// Setup track owner resolver
trackOwnerResolver := func(c *gin.Context) (uuid.UUID, error) {
trackIDStr := c.Param("id")
trackID, err := uuid.Parse(trackIDStr)
if err != nil {
return uuid.Nil, err
}
var track models.Track
if err := db.First(&track, "id = ?", trackID).Error; err != nil {
return uuid.Nil, err
}
return track.UserID, nil
}
// Setup route with ownership middleware
router.PUT("/tracks/:id", authMiddleware.RequireOwnershipOrAdmin("track", trackOwnerResolver), func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"message": "success"})
})
// Test: Other user tries to access owner's track
req := httptest.NewRequest("PUT", "/tracks/"+trackID.String(), nil)
req.Header.Set("X-User-ID", otherUserID.String())
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusForbidden, w.Code)
}
// TestOwnershipMiddleware_TrackAdminAccess teste qu'un admin peut accéder à n'importe quel track
func TestOwnershipMiddleware_TrackAdminAccess(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
router, db, authMiddleware, cleanup := setupOwnershipIntegrationTestRouter(t)
defer cleanup()
// Create owner and admin users
ownerID := uuid.New()
adminID := uuid.New()
createTestUserForOwnership(t, db, ownerID, "owner", false)
createTestUserForOwnership(t, db, adminID, "admin", true)
// Create test track owned by owner
trackID := uuid.New()
createTestTrackForOwnership(t, db, trackID, ownerID)
// Setup track owner resolver
trackOwnerResolver := func(c *gin.Context) (uuid.UUID, error) {
trackIDStr := c.Param("id")
trackID, err := uuid.Parse(trackIDStr)
if err != nil {
return uuid.Nil, err
}
var track models.Track
if err := db.First(&track, "id = ?", trackID).Error; err != nil {
return uuid.Nil, err
}
return track.UserID, nil
}
// Setup route with ownership middleware
router.PUT("/tracks/:id", authMiddleware.RequireOwnershipOrAdmin("track", trackOwnerResolver), func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"message": "success"})
})
// Test: Admin tries to access owner's track
req := httptest.NewRequest("PUT", "/tracks/"+trackID.String(), nil)
req.Header.Set("X-User-ID", adminID.String())
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var response map[string]interface{}
err := json.Unmarshal(w.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "success", response["message"])
}
// TestOwnershipMiddleware_PlaylistOwnerAccess teste que le propriétaire d'une playlist peut y accéder
func TestOwnershipMiddleware_PlaylistOwnerAccess(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
router, db, authMiddleware, cleanup := setupOwnershipIntegrationTestRouter(t)
defer cleanup()
// Create test user
ownerID := uuid.New()
createTestUserForOwnership(t, db, ownerID, "owner", false)
// Create test playlist
playlistID := uuid.New()
createTestPlaylistForOwnership(t, db, playlistID, ownerID)
// Setup playlist owner resolver
playlistOwnerResolver := func(c *gin.Context) (uuid.UUID, error) {
playlistIDStr := c.Param("id")
playlistID, err := uuid.Parse(playlistIDStr)
if err != nil {
return uuid.Nil, err
}
var playlist models.Playlist
if err := db.First(&playlist, "id = ?", playlistID).Error; err != nil {
return uuid.Nil, err
}
return playlist.UserID, nil
}
// Setup route with ownership middleware
router.PUT("/playlists/:id", authMiddleware.RequireOwnershipOrAdmin("playlist", playlistOwnerResolver), func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"message": "success"})
})
// Test: Owner tries to access their own playlist
req := httptest.NewRequest("PUT", "/playlists/"+playlistID.String(), nil)
req.Header.Set("X-User-ID", ownerID.String())
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var response map[string]interface{}
err := json.Unmarshal(w.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "success", response["message"])
}
// TestOwnershipMiddleware_PlaylistNonOwnerAccess teste qu'un non-propriétaire ne peut pas accéder à une playlist
func TestOwnershipMiddleware_PlaylistNonOwnerAccess(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
router, db, authMiddleware, cleanup := setupOwnershipIntegrationTestRouter(t)
defer cleanup()
// Create two users
ownerID := uuid.New()
otherUserID := uuid.New()
createTestUserForOwnership(t, db, ownerID, "owner", false)
createTestUserForOwnership(t, db, otherUserID, "otheruser", false)
// Create test playlist owned by owner
playlistID := uuid.New()
createTestPlaylistForOwnership(t, db, playlistID, ownerID)
// Setup playlist owner resolver
playlistOwnerResolver := func(c *gin.Context) (uuid.UUID, error) {
playlistIDStr := c.Param("id")
playlistID, err := uuid.Parse(playlistIDStr)
if err != nil {
return uuid.Nil, err
}
var playlist models.Playlist
if err := db.First(&playlist, "id = ?", playlistID).Error; err != nil {
return uuid.Nil, err
}
return playlist.UserID, nil
}
// Setup route with ownership middleware
router.PUT("/playlists/:id", authMiddleware.RequireOwnershipOrAdmin("playlist", playlistOwnerResolver), func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"message": "success"})
})
// Test: Other user tries to access owner's playlist
req := httptest.NewRequest("PUT", "/playlists/"+playlistID.String(), nil)
req.Header.Set("X-User-ID", otherUserID.String())
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusForbidden, w.Code)
}
// TestOwnershipMiddleware_ResourceNotFound teste que le middleware retourne 404 si la ressource n'existe pas
func TestOwnershipMiddleware_ResourceNotFound(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
router, db, authMiddleware, cleanup := setupOwnershipIntegrationTestRouter(t)
defer cleanup()
// Create test user
userID := uuid.New()
createTestUserForOwnership(t, db, userID, "user", false)
// Setup track owner resolver
trackOwnerResolver := func(c *gin.Context) (uuid.UUID, error) {
trackIDStr := c.Param("id")
trackID, err := uuid.Parse(trackIDStr)
if err != nil {
return uuid.Nil, err
}
var track models.Track
if err := db.First(&track, "id = ?", trackID).Error; err != nil {
return uuid.Nil, err // Resource not found
}
return track.UserID, nil
}
// Setup route with ownership middleware
router.PUT("/tracks/:id", authMiddleware.RequireOwnershipOrAdmin("track", trackOwnerResolver), func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"message": "success"})
})
// Test: Try to access non-existent track
nonExistentTrackID := uuid.New()
req := httptest.NewRequest("PUT", "/tracks/"+nonExistentTrackID.String(), nil)
req.Header.Set("X-User-ID", userID.String())
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusNotFound, w.Code)
}
// TestOwnershipMiddleware_UnauthenticatedAccess teste que le middleware rejette les requêtes non authentifiées
func TestOwnershipMiddleware_UnauthenticatedAccess(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
router, db, authMiddleware, cleanup := setupOwnershipIntegrationTestRouter(t)
defer cleanup()
// Create test track
ownerID := uuid.New()
createTestUserForOwnership(t, db, ownerID, "owner", false)
trackID := uuid.New()
createTestTrackForOwnership(t, db, trackID, ownerID)
// Setup track owner resolver
trackOwnerResolver := func(c *gin.Context) (uuid.UUID, error) {
trackIDStr := c.Param("id")
trackID, err := uuid.Parse(trackIDStr)
if err != nil {
return uuid.Nil, err
}
var track models.Track
if err := db.First(&track, "id = ?", trackID).Error; err != nil {
return uuid.Nil, err
}
return track.UserID, nil
}
// Setup route with ownership middleware
router.PUT("/tracks/:id", authMiddleware.RequireOwnershipOrAdmin("track", trackOwnerResolver), func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"message": "success"})
})
// Test: Try to access without authentication
req := httptest.NewRequest("PUT", "/tracks/"+trackID.String(), nil)
// No X-User-ID header
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusUnauthorized, w.Code)
}
// TestOwnershipMiddleware_CompleteFlow teste le flux complet d'ownership avec différentes ressources
func TestOwnershipMiddleware_CompleteFlow(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
router, db, authMiddleware, cleanup := setupOwnershipIntegrationTestRouter(t)
defer cleanup()
// Create users: owner, other user, and admin
ownerID := uuid.New()
otherUserID := uuid.New()
adminID := uuid.New()
createTestUserForOwnership(t, db, ownerID, "owner", false)
createTestUserForOwnership(t, db, otherUserID, "otheruser", false)
createTestUserForOwnership(t, db, adminID, "admin", true)
// Create resources
trackID := uuid.New()
playlistID := uuid.New()
createTestTrackForOwnership(t, db, trackID, ownerID)
createTestPlaylistForOwnership(t, db, playlistID, ownerID)
// Setup resolvers
trackOwnerResolver := func(c *gin.Context) (uuid.UUID, error) {
trackIDStr := c.Param("id")
trackID, err := uuid.Parse(trackIDStr)
if err != nil {
return uuid.Nil, err
}
var track models.Track
if err := db.First(&track, "id = ?", trackID).Error; err != nil {
return uuid.Nil, err
}
return track.UserID, nil
}
playlistOwnerResolver := func(c *gin.Context) (uuid.UUID, error) {
playlistIDStr := c.Param("id")
playlistID, err := uuid.Parse(playlistIDStr)
if err != nil {
return uuid.Nil, err
}
var playlist models.Playlist
if err := db.First(&playlist, "id = ?", playlistID).Error; err != nil {
return uuid.Nil, err
}
return playlist.UserID, nil
}
// Setup routes
router.PUT("/tracks/:id", authMiddleware.RequireOwnershipOrAdmin("track", trackOwnerResolver), func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"message": "track updated"})
})
router.PUT("/playlists/:id", authMiddleware.RequireOwnershipOrAdmin("playlist", playlistOwnerResolver), func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"message": "playlist updated"})
})
// Test 1: Owner can access their own resources
req := httptest.NewRequest("PUT", "/tracks/"+trackID.String(), nil)
req.Header.Set("X-User-ID", ownerID.String())
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
req = httptest.NewRequest("PUT", "/playlists/"+playlistID.String(), nil)
req.Header.Set("X-User-ID", ownerID.String())
w = httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
// Test 2: Other user cannot access owner's resources
req = httptest.NewRequest("PUT", "/tracks/"+trackID.String(), nil)
req.Header.Set("X-User-ID", otherUserID.String())
w = httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusForbidden, w.Code)
req = httptest.NewRequest("PUT", "/playlists/"+playlistID.String(), nil)
req.Header.Set("X-User-ID", otherUserID.String())
w = httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusForbidden, w.Code)
// Test 3: Admin can access any resource
req = httptest.NewRequest("PUT", "/tracks/"+trackID.String(), nil)
req.Header.Set("X-User-ID", adminID.String())
w = httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
req = httptest.NewRequest("PUT", "/playlists/"+playlistID.String(), nil)
req.Header.Set("X-User-ID", adminID.String())
w = httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
}