fix(backend): J4 — GDPR-compliant hard delete with Redis and ES cleanup
Closes TODO(HIGH-007). When the hard-delete worker anonymizes a user past
their recovery deadline, it now also cleans the user's residual data from
Redis and Elasticsearch, not just PostgreSQL. Without this, a user who
had invoked their right to erasure could still appear in cached feed/profile
responses and in ES search results until the next reindex cycle.
Worker changes (internal/workers/hard_delete_worker.go):
WithRedis / WithElasticsearch builder methods inject the clients. Both
are optional: if either is nil (feature disabled or unreachable), the
corresponding cleanup is skipped with a debug log and the worker keeps
going. Partial progress beats panic.
cleanRedisKeys uses SCAN with a cursor loop (COUNT 100), NEVER KEYS —
KEYS would block the Redis server on multi-million-key deployments.
The pattern is user:{id}:*. Transient SCAN errors are retried up to 3 times
with linear backoff (retry * 100ms); persistent errors return an error
without panicking.
DEL errors on a batch are logged but non-fatal so subsequent batches
are still attempted.
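In essence the loop looks like this (simplified sketch of cleanRedisKeys; the
full version with retries and logging is in the diff below — rdb is just a
local name for the injected *redis.Client):

    pattern := fmt.Sprintf("user:%s:*", userID)
    var cursor uint64
    for {
        keys, next, err := rdb.Scan(ctx, cursor, pattern, 100).Result()
        if err != nil {
            return deleted, err // real code retries up to 3 times first
        }
        if len(keys) > 0 {
            _, _ = rdb.Del(ctx, keys...).Result() // DEL errors logged, non-fatal
        }
        if cursor = next; cursor == 0 {
            break
        }
    }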
cleanESDocs hits three indices independently:
- users index: DELETE doc by _id (the user UUID); 404 treated as
success (already gone = desired state)
- tracks index: DeleteByQuery with a terms filter on _id, using the
list of track IDs collected from PostgreSQL BEFORE anonymization
- playlists index: same pattern as tracks
A failure on one index does not prevent the others from being tried;
the first error is returned so the caller can log.
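On the wire, with the "myenv" index prefix used in the tests, the three
cleanup calls look roughly like this (illustrative request shapes, matching
what the mock server asserts on; not a new code path):

    DELETE /myenv-users/_doc/{user-uuid}?refresh=true
    POST   /myenv-tracks/_delete_by_query?refresh=true
           {"query":{"terms":{"_id":["<track-uuid>", "..."]}}}
    POST   /myenv-playlists/_delete_by_query?refresh=true
           {"query":{"terms":{"_id":["<playlist-uuid>", "..."]}}}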
Track/playlist IDs are pre-collected (collectTrackIDs, collectPlaylistIDs)
before the UPDATE anonymization runs, because the anonymization does NOT
cascade (no DELETE on users), so tracks and playlists rows remain with
their creator_id / user_id intact and resolvable at query time.
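Per user, the order inside runOnce is therefore (condensed from the worker
code below; the real code uses runCtx and logs each failure):

    trackIDs := w.collectTrackIDs(ctx, logger, id)       // SELECT id FROM tracks    WHERE creator_id = ?
    playlistIDs := w.collectPlaylistIDs(ctx, logger, id)  // SELECT id FROM playlists WHERE user_id = ?
    // ... anonymization UPDATE on users + deletes on related PG tables ...
    _, _ = w.cleanRedisKeys(ctx, id)                      // errors logged, non-fatal
    _ = w.cleanESDocs(ctx, id, trackIDs, playlistIDs)     // errors logged, non-fatal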
Wiring (cmd/api/main.go):
The worker now receives cfg.RedisClient directly, plus an optional ES
client built from elasticsearch.LoadConfig() + NewClient. If ES is
disabled or unreachable at startup, main logs a warning and the worker
proceeds with Redis-only cleanup.
Tests (internal/workers/hard_delete_worker_test.go, +260 lines):
Pure-function unit tests:
- TestUUIDsToStrings
- TestEsIndexNameFor
Nil-client safety tests:
- TestCleanRedisKeys_NilClientIsNoop
- TestCleanESDocs_NilClientIsNoop
ES mock-server tests (httptest.Server mimicking /_doc and
/_delete_by_query endpoints with valid ES 8.11 responses):
- TestCleanESDocs_CallsAllThreeIndices — verifies the three expected
HTTP calls land with the right paths and request bodies containing
the provided UUIDs
- TestCleanESDocs_SkipsEmptyIDLists — verifies no DeleteByQuery is
issued when the ID lists are empty
Redis testcontainer integration test (gated by VEZA_SKIP_INTEGRATION):
- TestCleanRedisKeys_Integration — seeds 154 keys (4 fixed + 150 bulk
to force the SCAN loop past a single batch) plus 4 unrelated keys
from another user / global, runs cleanRedisKeys, asserts all 154
own keys are gone and all 4 unrelated keys remain.
Verification:
go build ./... OK
go vet ./... OK
VEZA_SKIP_INTEGRATION=1 go test -short ./internal/workers/... OK
go test ./internal/workers/ -run TestCleanRedisKeys_Integration
→ testcontainers spins up redis:7-alpine, test passes in 1.34s
Out of J4 scope (noted for a follow-up):
- No "activity" ES index exists in the codebase today (the audit plan
mentioned it as a possible target). The three real indices with user
data — users, tracks, playlists — are all now cleaned.
- Track artist strings (free-form) may still contain the user's
display name as a cached value in the tracks index after this
cleanup. Actual user-owned tracks are deleted here, but if a third
party's track referenced the removed user in its artist field, that
reference is not touched. Strict GDPR handling of that edge case is a
separate ticket.
Refs: AUDIT_REPORT.md §8.5, §10 P5, §12 item 1
parent 67f18892af
commit 9cdfc6d898
3 changed files with 609 additions and 18 deletions
veza-backend-api/cmd/api/main.go

@@ -21,6 +21,7 @@ import (
    "veza-backend-api/internal/api"
    "veza-backend-api/internal/config"
    "veza-backend-api/internal/core/marketplace"
    vezaes "veza-backend-api/internal/elasticsearch"
    "veza-backend-api/internal/metrics"
    "veza-backend-api/internal/services"
    "veza-backend-api/internal/shutdown"

@@ -218,10 +219,26 @@ func main() {

    // v0.10.8 F065: Hard delete worker (GDPR - final anonymization after 30 days)
    if os.Getenv("HARD_DELETE_CRON_ENABLED") != "false" {
        hardDeleteWorker := workers.NewHardDeleteWorker(db.GormDB, logger, 24*time.Hour)
        // Optional ES client for the worker's RGPD cleanup (users/tracks/playlists indices).
        // Non-fatal if ES is disabled or unreachable — the worker will just skip ES cleanup.
        var hardDeleteESClient *vezaes.Client
        if esCfg := vezaes.LoadConfig(); esCfg.Enabled {
            if esc, esErr := vezaes.NewClient(esCfg, logger); esErr != nil {
                logger.Warn("Elasticsearch unavailable for hard delete worker, ES cleanup disabled",
                    zap.Error(esErr))
            } else {
                hardDeleteESClient = esc
            }
        }
        hardDeleteWorker := workers.NewHardDeleteWorker(db.GormDB, logger, 24*time.Hour).
            WithRedis(cfg.RedisClient).
            WithElasticsearch(hardDeleteESClient)
        hardDeleteCtx, hardDeleteCancel := context.WithCancel(context.Background())
        go hardDeleteWorker.Start(hardDeleteCtx)
        logger.Info("Hard delete worker started (24h interval)")
        logger.Info("Hard delete worker started (24h interval)",
            zap.Bool("redis_cleanup", cfg.RedisClient != nil),
            zap.Bool("es_cleanup", hardDeleteESClient != nil),
        )

        shutdownManager.Register(shutdown.NewShutdownFunc("hard_delete_worker", func(ctx context.Context) error {
            hardDeleteWorker.Stop()
veza-backend-api/internal/workers/hard_delete_worker.go

@@ -1,25 +1,49 @@
package workers

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/elastic/go-elasticsearch/v8/esapi"
    "github.com/google/uuid"
    "github.com/redis/go-redis/v9"
    "go.uber.org/zap"
    "gorm.io/gorm"

    vezaes "veza-backend-api/internal/elasticsearch"
)

// HardDeleteWorker performs final GDPR cleanup for users past recovery deadline (v0.10.8 F065)
// Runs periodically to irreversibly anonymize accounts that were soft-deleted 30+ days ago
// HardDeleteWorker performs final GDPR cleanup for users past recovery deadline (v0.10.8 F065).
// Runs periodically to irreversibly anonymize accounts that were soft-deleted 30+ days ago.
//
// The worker cleans three stores:
// - PostgreSQL: anonymize users row, delete from user_profiles, user_sessions, user_settings,
//   user_follows, notifications, and null user_id in audit_logs.
// - Redis (optional): all keys matching user:{id}:* using SCAN with a cursor — never KEYS,
//   which would block the server on large keyspaces.
// - Elasticsearch (optional): the user doc from the users index, plus all track docs and
//   playlist docs authored by the user (collected from PG before anonymization, then removed
//   from ES via DeleteByQuery on the _id terms).
//
// Both Redis and Elasticsearch are optional. If either client is nil (feature disabled or
// unreachable at startup), the corresponding cleanup is skipped with a debug log and the
// worker continues. Transient Redis errors trigger a bounded retry loop; persistent errors
// are logged and the worker moves on to the next user (no silent panic).
type HardDeleteWorker struct {
    db       *gorm.DB
    logger   *zap.Logger
    interval time.Duration
    stopChan chan struct{}
    running  bool
    db          *gorm.DB
    redisClient *redis.Client
    esClient    *vezaes.Client
    logger      *zap.Logger
    interval    time.Duration
    stopChan    chan struct{}
    running     bool
}

// NewHardDeleteWorker creates a new hard delete worker
// NewHardDeleteWorker creates a new hard delete worker with only the required PostgreSQL
// dependency. Use WithRedis and WithElasticsearch to enable the corresponding cleanups.
func NewHardDeleteWorker(db *gorm.DB, logger *zap.Logger, interval time.Duration) *HardDeleteWorker {
    if logger == nil {
        logger = zap.NewNop()
@@ -36,14 +60,32 @@ func NewHardDeleteWorker(db *gorm.DB, logger *zap.Logger, interval time.Duration
    }
}

// Start runs the worker periodically
// WithRedis enables Redis cache key cleanup for the user:{id}:* pattern.
// Pass nil to explicitly disable; pass a live *redis.Client to enable.
func (w *HardDeleteWorker) WithRedis(client *redis.Client) *HardDeleteWorker {
    w.redisClient = client
    return w
}

// WithElasticsearch enables Elasticsearch document cleanup. The veza wrapper is expected
// (it carries the index prefix via Config). Pass nil to explicitly disable.
func (w *HardDeleteWorker) WithElasticsearch(client *vezaes.Client) *HardDeleteWorker {
    w.esClient = client
    return w
}

// Start runs the worker periodically.
func (w *HardDeleteWorker) Start(ctx context.Context) {
    if w.running {
        w.logger.Warn("Hard delete worker is already running")
        return
    }
    w.running = true
    w.logger.Info("Starting hard delete worker", zap.Duration("interval", w.interval))
    w.logger.Info("Starting hard delete worker",
        zap.Duration("interval", w.interval),
        zap.Bool("redis_enabled", w.redisClient != nil),
        zap.Bool("elasticsearch_enabled", w.esClient != nil),
    )

    go w.runOnce(ctx)
@@ -66,7 +108,7 @@ func (w *HardDeleteWorker) Start(ctx context.Context) {
    }
}

// Stop stops the worker
// Stop stops the worker.
func (w *HardDeleteWorker) Stop() {
    if !w.running {
        return
@@ -74,7 +116,7 @@ func (w *HardDeleteWorker) Stop() {
    close(w.stopChan)
}

// runOnce executes one pass of hard delete / final anonymization
// runOnce executes one pass of hard delete / final anonymization.
func (w *HardDeleteWorker) runOnce(ctx context.Context) {
    logger := w.logger.With(zap.String("worker", "hard_delete"))
    runCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
@@ -97,7 +139,13 @@ func (w *HardDeleteWorker) runOnce(ctx context.Context) {
    logger.Info("Processing hard delete", zap.Int("count", len(userIDs)))

    for _, id := range userIDs {
        // Final anonymization pass (ORIGIN §19.3) - ensure no PII remains
        // Collect ES doc IDs BEFORE PG anonymization, while creator/user relations
        // are still resolvable. The anonymization UPDATE does not cascade, so the
        // tracks and playlists rows still exist afterwards — we just need their IDs.
        trackIDs := w.collectTrackIDs(runCtx, logger, id)
        playlistIDs := w.collectPlaylistIDs(runCtx, logger, id)

        // Final anonymization pass (ORIGIN §19.3) — ensure no PII remains in users row.
        res := w.db.WithContext(runCtx).Exec(`
            UPDATE users SET
                password_hash = '',
@@ -124,9 +172,275 @@ func (w *HardDeleteWorker) runOnce(ctx context.Context) {
        w.db.WithContext(runCtx).Exec("DELETE FROM user_follows WHERE follower_id = ? OR following_id = ?", id, id)
        w.db.WithContext(runCtx).Exec("DELETE FROM notifications WHERE user_id = ? OR actor_id = ?", id, id)
        w.db.WithContext(runCtx).Exec("UPDATE audit_logs SET user_id = NULL, ip_address = NULL WHERE user_id = ?", id)
        // TODO(HIGH-007): Clean Redis cache keys (user:{id}:*) and Elasticsearch user documents.
        // Requires injecting Redis/ES clients into HardDeleteWorker.

        logger.Info("Hard delete completed", zap.String("user_id", id.String()))
        // Redis cache cleanup — user:{id}:* keys.
        if deleted, rerr := w.cleanRedisKeys(runCtx, id); rerr != nil {
            logger.Error("Redis cleanup failed",
                zap.Error(rerr),
                zap.String("user_id", id.String()),
            )
            // Non-fatal: continue to ES so we still make partial progress.
        } else if deleted > 0 {
            logger.Info("Redis cleanup complete",
                zap.String("user_id", id.String()),
                zap.Int("deleted", deleted),
            )
        }

        // Elasticsearch doc cleanup — user + owned tracks + owned playlists.
        if eerr := w.cleanESDocs(runCtx, id, trackIDs, playlistIDs); eerr != nil {
            logger.Error("Elasticsearch cleanup failed",
                zap.Error(eerr),
                zap.String("user_id", id.String()),
            )
        }

        logger.Info("Hard delete completed",
            zap.String("user_id", id.String()),
            zap.Int("tracks_removed_from_es", len(trackIDs)),
            zap.Int("playlists_removed_from_es", len(playlistIDs)),
        )
    }
}

// collectTrackIDs returns the UUIDs of tracks whose creator_id is the given user.
// creator_id is the canonical ownership column; user_id is a denormalized legacy alias.
func (w *HardDeleteWorker) collectTrackIDs(ctx context.Context, logger *zap.Logger, userID uuid.UUID) []uuid.UUID {
    var ids []uuid.UUID
    if err := w.db.WithContext(ctx).Raw(
        `SELECT id FROM tracks WHERE creator_id = ?`, userID,
    ).Scan(&ids).Error; err != nil {
        logger.Warn("Failed to collect track IDs for ES cleanup",
            zap.Error(err), zap.String("user_id", userID.String()))
        return nil
    }
    return ids
}

// collectPlaylistIDs returns the UUIDs of playlists owned by the user.
func (w *HardDeleteWorker) collectPlaylistIDs(ctx context.Context, logger *zap.Logger, userID uuid.UUID) []uuid.UUID {
    var ids []uuid.UUID
    if err := w.db.WithContext(ctx).Raw(
        `SELECT id FROM playlists WHERE user_id = ?`, userID,
    ).Scan(&ids).Error; err != nil {
        logger.Warn("Failed to collect playlist IDs for ES cleanup",
            zap.Error(err), zap.String("user_id", userID.String()))
        return nil
    }
    return ids
}

// cleanRedisKeys removes all Redis keys matching user:{userID}:*.
//
// Uses SCAN with a cursor in a loop — NEVER KEYS, which would block the Redis server
// on keyspaces with millions of keys. COUNT hints at ~100 keys per round trip; Redis may
// return slightly fewer or more depending on internal bucket distribution.
//
// Transient Scan errors trigger up to 3 retries with linear backoff (retry * 100ms). A persistent
// error returns immediately without panicking. DEL errors are logged but non-fatal —
// subsequent batches are still attempted so we don't leave the cache half-clean.
func (w *HardDeleteWorker) cleanRedisKeys(ctx context.Context, userID uuid.UUID) (int, error) {
    if w.redisClient == nil {
        w.logger.Debug("Redis cleanup skipped (no client)", zap.String("user_id", userID.String()))
        return 0, nil
    }

    const (
        batchSize  = 100
        maxRetries = 3
    )
    pattern := fmt.Sprintf("user:%s:*", userID.String())
    var cursor uint64
    totalDeleted := 0

    for {
        var (
            keys    []string
            scanErr error
        )
        retries := 0
        for {
            keys, cursor, scanErr = w.redisClient.Scan(ctx, cursor, pattern, batchSize).Result()
            if scanErr == nil {
                break
            }
            retries++
            w.logger.Warn("Redis SCAN failed, retrying",
                zap.Error(scanErr),
                zap.String("user_id", userID.String()),
                zap.String("pattern", pattern),
                zap.Int("retry", retries),
            )
            if retries >= maxRetries {
                return totalDeleted, fmt.Errorf("redis SCAN failed after %d retries: %w", maxRetries, scanErr)
            }
            select {
            case <-ctx.Done():
                return totalDeleted, ctx.Err()
            case <-time.After(time.Duration(retries) * 100 * time.Millisecond):
            }
        }

        if len(keys) > 0 {
            n, delErr := w.redisClient.Del(ctx, keys...).Result()
            if delErr != nil {
                w.logger.Error("Redis DEL failed for batch",
                    zap.Error(delErr),
                    zap.String("user_id", userID.String()),
                    zap.Int("batch_size", len(keys)),
                )
                // Non-fatal: continue scanning so later batches still get cleaned.
            } else {
                totalDeleted += int(n)
            }
        }

        if cursor == 0 {
            break
        }
    }
    return totalDeleted, nil
}

// cleanESDocs removes user-bound documents from Elasticsearch.
//
// - users index: delete the user doc by its document ID (the user UUID).
// - tracks index: DeleteByQuery with a terms filter on _id for the collected track IDs.
// - playlists index: DeleteByQuery with a terms filter on _id for the collected playlist IDs.
//
// Each index is handled independently: a failure on one does not prevent the others
// from being attempted. The first encountered error is returned so the caller can log it,
// but partial progress is still made.
func (w *HardDeleteWorker) cleanESDocs(
    ctx context.Context,
    userID uuid.UUID,
    trackIDs []uuid.UUID,
    playlistIDs []uuid.UUID,
) error {
    if w.esClient == nil {
        w.logger.Debug("Elasticsearch cleanup skipped (no client)", zap.String("user_id", userID.String()))
        return nil
    }

    prefix := w.esClient.Config.Index
    var firstErr error

    // 1. Users index — delete by document ID.
    usersIdx := esIndexNameFor(prefix, vezaes.IdxUsers)
    if err := w.esDeleteDoc(ctx, usersIdx, userID.String()); err != nil {
        w.logger.Error("ES user doc delete failed",
            zap.Error(err),
            zap.String("user_id", userID.String()),
            zap.String("index", usersIdx),
        )
        if firstErr == nil {
            firstErr = err
        }
    }

    // 2. Tracks index — batch delete by collected IDs.
    if len(trackIDs) > 0 {
        tracksIdx := esIndexNameFor(prefix, vezaes.IdxTracks)
        if err := w.esDeleteByIDs(ctx, tracksIdx, uuidsToStrings(trackIDs)); err != nil {
            w.logger.Error("ES tracks delete_by_query failed",
                zap.Error(err),
                zap.String("user_id", userID.String()),
                zap.String("index", tracksIdx),
                zap.Int("count", len(trackIDs)),
            )
            if firstErr == nil {
                firstErr = err
            }
        }
    }

    // 3. Playlists index — batch delete by collected IDs.
    if len(playlistIDs) > 0 {
        playlistsIdx := esIndexNameFor(prefix, vezaes.IdxPlaylists)
        if err := w.esDeleteByIDs(ctx, playlistsIdx, uuidsToStrings(playlistIDs)); err != nil {
            w.logger.Error("ES playlists delete_by_query failed",
                zap.Error(err),
                zap.String("user_id", userID.String()),
                zap.String("index", playlistsIdx),
                zap.Int("count", len(playlistIDs)),
            )
            if firstErr == nil {
                firstErr = err
            }
        }
    }

    return firstErr
}

// esDeleteDoc deletes a single document by ID. A 404 response is treated as success
// (the doc was already gone, which is the desired end state).
func (w *HardDeleteWorker) esDeleteDoc(ctx context.Context, index, docID string) error {
    req := esapi.DeleteRequest{
        Index:      index,
        DocumentID: docID,
        Refresh:    "true",
    }
    res, err := req.Do(ctx, w.esClient)
    if err != nil {
        return fmt.Errorf("es delete: %w", err)
    }
    defer res.Body.Close()
    if res.StatusCode == 404 {
        return nil
    }
    if res.IsError() {
        return fmt.Errorf("es delete status %d: %s", res.StatusCode, res.String())
    }
    return nil
}

// esDeleteByIDs runs a DeleteByQuery with a terms filter on _id. Safe for empty input
// (returns nil immediately).
func (w *HardDeleteWorker) esDeleteByIDs(ctx context.Context, index string, ids []string) error {
    if len(ids) == 0 {
        return nil
    }
    body := map[string]any{
        "query": map[string]any{
            "terms": map[string]any{
                "_id": ids,
            },
        },
    }
    buf, err := json.Marshal(body)
    if err != nil {
        return fmt.Errorf("marshal delete_by_query body: %w", err)
    }
    refresh := true
    req := esapi.DeleteByQueryRequest{
        Index:   []string{index},
        Body:    bytes.NewReader(buf),
        Refresh: &refresh,
    }
    res, err := req.Do(ctx, w.esClient)
    if err != nil {
        return fmt.Errorf("es delete_by_query: %w", err)
    }
    defer res.Body.Close()
    if res.IsError() {
        return fmt.Errorf("es delete_by_query status %d: %s", res.StatusCode, res.String())
    }
    return nil
}

// esIndexNameFor mirrors the internal/elasticsearch.indexName helper (private there).
func esIndexNameFor(prefix, name string) string {
    if prefix != "" {
        return prefix + "-" + name
    }
    return "veza-" + name
}

func uuidsToStrings(ids []uuid.UUID) []string {
    out := make([]string, len(ids))
    for i, id := range ids {
        out[i] = id.String()
    }
    return out
}
veza-backend-api/internal/workers/hard_delete_worker_test.go (new file, 260 additions)
@@ -0,0 +1,260 @@
package workers

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/http/httptest"
    "strings"
    "sync"
    "testing"
    "time"

    elasticsearch "github.com/elastic/go-elasticsearch/v8"
    "github.com/google/uuid"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "go.uber.org/zap"

    vezaes "veza-backend-api/internal/elasticsearch"
    "veza-backend-api/internal/testutils"
)

// ---------------------------------------------------------------------------
// Pure helpers
// ---------------------------------------------------------------------------

func TestUUIDsToStrings(t *testing.T) {
    u1 := uuid.New()
    u2 := uuid.New()
    got := uuidsToStrings([]uuid.UUID{u1, u2})
    require.Len(t, got, 2)
    assert.Equal(t, u1.String(), got[0])
    assert.Equal(t, u2.String(), got[1])

    assert.Empty(t, uuidsToStrings(nil))
    assert.Empty(t, uuidsToStrings([]uuid.UUID{}))
}

func TestEsIndexNameFor(t *testing.T) {
    assert.Equal(t, "veza-users", esIndexNameFor("", "users"))
    assert.Equal(t, "veza-tracks", esIndexNameFor("", "tracks"))
    assert.Equal(t, "myprefix-users", esIndexNameFor("myprefix", "users"))
}

// ---------------------------------------------------------------------------
// Nil-client safety
// ---------------------------------------------------------------------------

func TestCleanRedisKeys_NilClientIsNoop(t *testing.T) {
    w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour)
    // redisClient stays nil — WithRedis never called
    deleted, err := w.cleanRedisKeys(context.Background(), uuid.New())
    require.NoError(t, err)
    assert.Equal(t, 0, deleted)
}

func TestCleanESDocs_NilClientIsNoop(t *testing.T) {
    w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour)
    // esClient stays nil
    err := w.cleanESDocs(context.Background(), uuid.New(), []uuid.UUID{uuid.New()}, []uuid.UUID{uuid.New()})
    require.NoError(t, err)
}

// ---------------------------------------------------------------------------
// Redis integration test (gated by VEZA_SKIP_INTEGRATION)
// ---------------------------------------------------------------------------

func TestCleanRedisKeys_Integration(t *testing.T) {
    testutils.SkipIfNoIntegration(t)

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    client, err := testutils.GetTestRedisClient(ctx)
    require.NoError(t, err, "redis testcontainer setup")
    require.NotNil(t, client)

    userID := uuid.New()
    otherID := uuid.New()

    // Seed a bunch of keys across two users + some unrelated keys.
    ownKeys := []string{
        fmt.Sprintf("user:%s:session", userID),
        fmt.Sprintf("user:%s:profile", userID),
        fmt.Sprintf("user:%s:feed:page:0", userID),
        fmt.Sprintf("user:%s:feed:page:1", userID),
    }
    // Also seed a larger batch to exercise the SCAN loop (>100 keys).
    for i := 0; i < 150; i++ {
        ownKeys = append(ownKeys, fmt.Sprintf("user:%s:bulk:%d", userID, i))
    }
    unrelated := []string{
        fmt.Sprintf("user:%s:session", otherID),
        fmt.Sprintf("user:%s:profile", otherID),
        "global:config",
        "cache:homepage",
    }
    for _, k := range append(ownKeys, unrelated...) {
        require.NoError(t, client.Set(ctx, k, "x", time.Minute).Err(), "seed key %s", k)
    }

    w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour).WithRedis(client)
    deleted, err := w.cleanRedisKeys(ctx, userID)
    require.NoError(t, err)
    assert.GreaterOrEqual(t, deleted, len(ownKeys),
        "expected at least %d keys deleted, got %d", len(ownKeys), deleted)

    // All own keys must be gone.
    for _, k := range ownKeys {
        n, err := client.Exists(ctx, k).Result()
        require.NoError(t, err)
        assert.Equal(t, int64(0), n, "key %s should be gone", k)
    }
    // All unrelated keys must remain.
    for _, k := range unrelated {
        n, err := client.Exists(ctx, k).Result()
        require.NoError(t, err)
        assert.Equal(t, int64(1), n, "key %s should still exist", k)
    }

    // Cleanup the unrelated keys we seeded so the shared container stays tidy.
    _ = client.Del(ctx, unrelated...).Err()
}

// ---------------------------------------------------------------------------
// Elasticsearch test using a mock HTTP server
// ---------------------------------------------------------------------------

// esMockRecord captures a received request for later assertion.
type esMockRecord struct {
    Method string
    Path   string
    Query  string
    Body   string
}

func newESMockServer(t *testing.T) (*httptest.Server, *[]esMockRecord, *sync.Mutex) {
    t.Helper()
    var (
        records []esMockRecord
        mu      sync.Mutex
    )
    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        body, _ := io.ReadAll(r.Body)
        mu.Lock()
        records = append(records, esMockRecord{
            Method: r.Method,
            Path:   r.URL.Path,
            Query:  r.URL.RawQuery,
            Body:   string(body),
        })
        mu.Unlock()

        // The go-elasticsearch client first calls GET / (Info) during NewClient-less flows,
        // and also sends HEAD requests for index existence checks. We answer minimal valid JSON.
        switch {
        case r.Method == http.MethodGet && r.URL.Path == "/":
            w.Header().Set("X-Elastic-Product", "Elasticsearch")
            w.WriteHeader(http.StatusOK)
            _, _ = w.Write([]byte(`{"version":{"number":"8.11.0"},"tagline":"You Know, for Search"}`))
        case r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/_doc/"):
            // Single doc delete.
            w.Header().Set("X-Elastic-Product", "Elasticsearch")
            w.WriteHeader(http.StatusOK)
            _, _ = w.Write([]byte(`{"_index":"x","_id":"y","result":"deleted","_shards":{"total":1,"successful":1,"failed":0}}`))
        case r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/_delete_by_query"):
            w.Header().Set("X-Elastic-Product", "Elasticsearch")
            w.WriteHeader(http.StatusOK)
            _, _ = w.Write([]byte(`{"took":1,"timed_out":false,"total":2,"deleted":2,"batches":1,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}`))
        default:
            // Fallback OK for any other request (e.g. internal health probes).
            w.Header().Set("X-Elastic-Product", "Elasticsearch")
            w.WriteHeader(http.StatusOK)
            _, _ = w.Write([]byte(`{}`))
        }
    }))
    return srv, &records, &mu
}

// newMockVezaESClient builds a veza wrapper pointing at the mock server.
func newMockVezaESClient(t *testing.T, baseURL, indexPrefix string) *vezaes.Client {
    t.Helper()
    rawCfg := elasticsearch.Config{Addresses: []string{baseURL}}
    raw, err := elasticsearch.NewClient(rawCfg)
    require.NoError(t, err)
    return &vezaes.Client{
        Client: raw,
        Config: vezaes.Config{URL: baseURL, Index: indexPrefix, Enabled: true},
        Logger: zap.NewNop(),
    }
}

func TestCleanESDocs_CallsAllThreeIndices(t *testing.T) {
    srv, records, mu := newESMockServer(t)
    defer srv.Close()

    esClient := newMockVezaESClient(t, srv.URL, "myenv")
    w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour).WithElasticsearch(esClient)

    userID := uuid.New()
    trackIDs := []uuid.UUID{uuid.New(), uuid.New()}
    playlistIDs := []uuid.UUID{uuid.New()}

    err := w.cleanESDocs(context.Background(), userID, trackIDs, playlistIDs)
    require.NoError(t, err)

    mu.Lock()
    defer mu.Unlock()

    // Inspect recorded requests.
    var sawUserDelete, sawTracksDBQ, sawPlaylistsDBQ bool
    for _, rec := range *records {
        switch {
        case rec.Method == http.MethodDelete &&
            strings.HasPrefix(rec.Path, "/myenv-users/_doc/") &&
            strings.HasSuffix(rec.Path, userID.String()):
            sawUserDelete = true
        case rec.Method == http.MethodPost &&
            rec.Path == "/myenv-tracks/_delete_by_query":
            sawTracksDBQ = true
            // body must contain both track UUIDs
            var parsed map[string]any
            require.NoError(t, json.Unmarshal([]byte(rec.Body), &parsed))
            assert.Contains(t, rec.Body, trackIDs[0].String())
            assert.Contains(t, rec.Body, trackIDs[1].String())
        case rec.Method == http.MethodPost &&
            rec.Path == "/myenv-playlists/_delete_by_query":
            sawPlaylistsDBQ = true
            assert.Contains(t, rec.Body, playlistIDs[0].String())
        }
    }
    assert.True(t, sawUserDelete, "expected DELETE /myenv-users/_doc/{userID}")
    assert.True(t, sawTracksDBQ, "expected POST /myenv-tracks/_delete_by_query")
    assert.True(t, sawPlaylistsDBQ, "expected POST /myenv-playlists/_delete_by_query")
}

func TestCleanESDocs_SkipsEmptyIDLists(t *testing.T) {
    srv, records, mu := newESMockServer(t)
    defer srv.Close()

    esClient := newMockVezaESClient(t, srv.URL, "")
    w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour).WithElasticsearch(esClient)

    // No tracks, no playlists — only the user doc delete should happen.
    err := w.cleanESDocs(context.Background(), uuid.New(), nil, nil)
    require.NoError(t, err)

    mu.Lock()
    defer mu.Unlock()

    var dbqCount int
    for _, rec := range *records {
        if rec.Method == http.MethodPost && strings.HasSuffix(rec.Path, "/_delete_by_query") {
            dbqCount++
        }
    }
    assert.Equal(t, 0, dbqCount, "no delete_by_query should have been issued")
}