From 9cdfc6d898cdfbe1750886bc9522707d10b64123 Mon Sep 17 00:00:00 2001
From: senke
Date: Wed, 15 Apr 2026 12:25:39 +0200
Subject: [PATCH] =?UTF-8?q?fix(backend):=20J4=20=E2=80=94=20GDPR-compliant?=
 =?UTF-8?q?=20hard=20delete=20with=20Redis=20and=20ES=20cleanup?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes TODO(HIGH-007).

When the hard-delete worker anonymizes a user past their recovery deadline,
it now also cleans the user's residual data from Redis and Elasticsearch,
not just PostgreSQL. Without this, a user who invoked their right to erasure
would still appear in cached feed/profile responses and in ES search results
until the next reindex cycle.

Worker changes (internal/workers/hard_delete_worker.go):

WithRedis / WithElasticsearch builder methods inject the clients. Both are
optional: if either is nil (feature disabled or unreachable), the
corresponding cleanup is skipped with a debug log and the worker keeps going.
Partial progress beats panic.

cleanRedisKeys uses SCAN with a cursor loop (COUNT 100), NEVER KEYS — KEYS
would block the Redis server on multi-million-key deployments. Pattern is
user:{id}:*. Transient SCAN errors retry up to 3 times with 100ms * retry
linear backoff; persistent errors return without panic. DEL errors on a
batch are logged but non-fatal so subsequent batches are still attempted.

cleanESDocs hits three indices independently:
- users index: DELETE doc by _id (the user UUID); 404 treated as success
  (already gone = desired state)
- tracks index: DeleteByQuery with a terms filter on _id, using the list of
  track IDs collected from PostgreSQL BEFORE anonymization
- playlists index: same pattern as tracks
A failure on one index does not prevent the others from being tried; the
first error is returned so the caller can log it.

Track/playlist IDs are pre-collected (collectTrackIDs, collectPlaylistIDs)
before the UPDATE anonymization runs, because the anonymization does NOT
cascade (no DELETE on users), so the tracks and playlists rows remain with
their creator_id / user_id intact and resolvable at query time.

Wiring (cmd/api/main.go):

The worker now receives cfg.RedisClient directly, and an optional ES client
built from elasticsearch.LoadConfig() + NewClient. If ES is disabled or
unreachable at startup, a warning is logged and the worker proceeds with
Redis-only cleanup.

Tests (internal/workers/hard_delete_worker_test.go, +260 lines):

Pure-function unit tests:
- TestUUIDsToStrings
- TestEsIndexNameFor

Nil-client safety tests:
- TestCleanRedisKeys_NilClientIsNoop
- TestCleanESDocs_NilClientIsNoop

ES mock-server tests (httptest.Server mimicking /_doc and /_delete_by_query
endpoints with valid ES 8.11 responses):
- TestCleanESDocs_CallsAllThreeIndices — verifies the three expected HTTP
  calls land with the right paths and request bodies containing the provided
  UUIDs
- TestCleanESDocs_SkipsEmptyIDLists — verifies no DeleteByQuery is issued
  when the ID lists are empty

Redis testcontainer integration test (gated by VEZA_SKIP_INTEGRATION):
- TestCleanRedisKeys_Integration — seeds 154 keys (4 fixed + 150 bulk to
  force the SCAN loop past a single batch) plus 4 unrelated keys from another
  user / global, runs cleanRedisKeys, asserts all 154 own keys are gone and
  all 4 unrelated keys remain.

Verification:
  go build ./...                                                    OK
  go vet ./...                                                      OK
  VEZA_SKIP_INTEGRATION=1 go test ./internal/workers/...
  short                                                             OK
  go test ./internal/workers/ -run TestCleanRedisKeys_Integration
    → testcontainers spins redis:7-alpine, test passes in 1.34s

Out of J4 scope (noted for a follow-up):
- No "activity" ES index exists in the codebase today (the audit plan
  mentioned it as a possible target). The three real indices with user data
  — users, tracks, playlists — are all now cleaned.
- Track artist strings (free-form) may still contain the user's display name
  as a cached value in the tracks index after this cleanup. Actual user-owned
  tracks are deleted here, but if a third party's track referenced the
  removed user in its artist field, that reference is not touched. Strict
  GDPR handling of that edge case is a separate ticket.

Refs: AUDIT_REPORT.md §8.5, §10 P5, §12 item 1
---
 veza-backend-api/cmd/api/main.go               |  21 +-
 .../internal/workers/hard_delete_worker.go     | 346 +++++++++++++++++-
 .../workers/hard_delete_worker_test.go         | 260 +++++++++++++
 3 files changed, 609 insertions(+), 18 deletions(-)
 create mode 100644 veza-backend-api/internal/workers/hard_delete_worker_test.go

diff --git a/veza-backend-api/cmd/api/main.go b/veza-backend-api/cmd/api/main.go
index 85f455ba7..37afd84ae 100644
--- a/veza-backend-api/cmd/api/main.go
+++ b/veza-backend-api/cmd/api/main.go
@@ -21,6 +21,7 @@ import (
 	"veza-backend-api/internal/api"
 	"veza-backend-api/internal/config"
 	"veza-backend-api/internal/core/marketplace"
+	vezaes "veza-backend-api/internal/elasticsearch"
 	"veza-backend-api/internal/metrics"
 	"veza-backend-api/internal/services"
 	"veza-backend-api/internal/shutdown"
@@ -218,10 +219,26 @@ func main() {
 	// v0.10.8 F065: Hard delete worker (GDPR - final anonymization after 30 days)
 	if os.Getenv("HARD_DELETE_CRON_ENABLED") != "false" {
-		hardDeleteWorker := workers.NewHardDeleteWorker(db.GormDB, logger, 24*time.Hour)
+		// Optional ES client for the worker's GDPR cleanup (users/tracks/playlists indices).
+		// Non-fatal if ES is disabled or unreachable — the worker will just skip ES cleanup.
+		var hardDeleteESClient *vezaes.Client
+		if esCfg := vezaes.LoadConfig(); esCfg.Enabled {
+			if esc, esErr := vezaes.NewClient(esCfg, logger); esErr != nil {
+				logger.Warn("Elasticsearch unavailable for hard delete worker, ES cleanup disabled",
+					zap.Error(esErr))
+			} else {
+				hardDeleteESClient = esc
+			}
+		}
+		hardDeleteWorker := workers.NewHardDeleteWorker(db.GormDB, logger, 24*time.Hour).
+			WithRedis(cfg.RedisClient).
+ WithElasticsearch(hardDeleteESClient) hardDeleteCtx, hardDeleteCancel := context.WithCancel(context.Background()) go hardDeleteWorker.Start(hardDeleteCtx) - logger.Info("Hard delete worker started (24h interval)") + logger.Info("Hard delete worker started (24h interval)", + zap.Bool("redis_cleanup", cfg.RedisClient != nil), + zap.Bool("es_cleanup", hardDeleteESClient != nil), + ) shutdownManager.Register(shutdown.NewShutdownFunc("hard_delete_worker", func(ctx context.Context) error { hardDeleteWorker.Stop() diff --git a/veza-backend-api/internal/workers/hard_delete_worker.go b/veza-backend-api/internal/workers/hard_delete_worker.go index 4af444e50..1cb0b3a8b 100644 --- a/veza-backend-api/internal/workers/hard_delete_worker.go +++ b/veza-backend-api/internal/workers/hard_delete_worker.go @@ -1,25 +1,49 @@ package workers import ( + "bytes" "context" + "encoding/json" + "fmt" "time" + "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/google/uuid" + "github.com/redis/go-redis/v9" "go.uber.org/zap" "gorm.io/gorm" + + vezaes "veza-backend-api/internal/elasticsearch" ) -// HardDeleteWorker performs final GDPR cleanup for users past recovery deadline (v0.10.8 F065) -// Runs periodically to irreversibly anonymize accounts that were soft-deleted 30+ days ago +// HardDeleteWorker performs final GDPR cleanup for users past recovery deadline (v0.10.8 F065). +// Runs periodically to irreversibly anonymize accounts that were soft-deleted 30+ days ago. +// +// The worker cleans three stores: +// - PostgreSQL: anonymize users row, delete from user_profiles, user_sessions, user_settings, +// user_follows, notifications, and null user_id in audit_logs. +// - Redis (optional): all keys matching user:{id}:* using SCAN with a cursor — never KEYS, +// which would block the server on large keyspaces. +// - Elasticsearch (optional): the user doc from the users index, plus all track docs and +// playlist docs authored by the user (collected from PG before anonymization, then removed +// from ES via DeleteByQuery on the _id terms). +// +// Both Redis and Elasticsearch are optional. If either client is nil (feature disabled or +// unreachable at startup), the corresponding cleanup is skipped with a debug log and the +// worker continues. Transient Redis errors trigger a bounded retry loop; persistent errors +// are logged and the worker moves on to the next user (no silent panic). type HardDeleteWorker struct { - db *gorm.DB - logger *zap.Logger - interval time.Duration - stopChan chan struct{} - running bool + db *gorm.DB + redisClient *redis.Client + esClient *vezaes.Client + logger *zap.Logger + interval time.Duration + stopChan chan struct{} + running bool } -// NewHardDeleteWorker creates a new hard delete worker +// NewHardDeleteWorker creates a new hard delete worker with only the required PostgreSQL +// dependency. Use WithRedis and WithElasticsearch to enable the corresponding cleanups. func NewHardDeleteWorker(db *gorm.DB, logger *zap.Logger, interval time.Duration) *HardDeleteWorker { if logger == nil { logger = zap.NewNop() @@ -36,14 +60,32 @@ func NewHardDeleteWorker(db *gorm.DB, logger *zap.Logger, interval time.Duration } } -// Start runs the worker periodically +// WithRedis enables Redis cache key cleanup for the user:{id}:* pattern. +// Pass nil to explicitly disable; pass a live *redis.Client to enable. +func (w *HardDeleteWorker) WithRedis(client *redis.Client) *HardDeleteWorker { + w.redisClient = client + return w +} + +// WithElasticsearch enables Elasticsearch document cleanup. 
The veza wrapper is expected +// (it carries the index prefix via Config). Pass nil to explicitly disable. +func (w *HardDeleteWorker) WithElasticsearch(client *vezaes.Client) *HardDeleteWorker { + w.esClient = client + return w +} + +// Start runs the worker periodically. func (w *HardDeleteWorker) Start(ctx context.Context) { if w.running { w.logger.Warn("Hard delete worker is already running") return } w.running = true - w.logger.Info("Starting hard delete worker", zap.Duration("interval", w.interval)) + w.logger.Info("Starting hard delete worker", + zap.Duration("interval", w.interval), + zap.Bool("redis_enabled", w.redisClient != nil), + zap.Bool("elasticsearch_enabled", w.esClient != nil), + ) go w.runOnce(ctx) @@ -66,7 +108,7 @@ func (w *HardDeleteWorker) Start(ctx context.Context) { } } -// Stop stops the worker +// Stop stops the worker. func (w *HardDeleteWorker) Stop() { if !w.running { return @@ -74,7 +116,7 @@ func (w *HardDeleteWorker) Stop() { close(w.stopChan) } -// runOnce executes one pass of hard delete / final anonymization +// runOnce executes one pass of hard delete / final anonymization. func (w *HardDeleteWorker) runOnce(ctx context.Context) { logger := w.logger.With(zap.String("worker", "hard_delete")) runCtx, cancel := context.WithTimeout(ctx, 30*time.Minute) @@ -97,7 +139,13 @@ func (w *HardDeleteWorker) runOnce(ctx context.Context) { logger.Info("Processing hard delete", zap.Int("count", len(userIDs))) for _, id := range userIDs { - // Final anonymization pass (ORIGIN §19.3) - ensure no PII remains + // Collect ES doc IDs BEFORE PG anonymization, while creator/user relations + // are still resolvable. The anonymization UPDATE does not cascade, so the + // tracks and playlists rows still exist afterwards — we just need their IDs. + trackIDs := w.collectTrackIDs(runCtx, logger, id) + playlistIDs := w.collectPlaylistIDs(runCtx, logger, id) + + // Final anonymization pass (ORIGIN §19.3) — ensure no PII remains in users row. res := w.db.WithContext(runCtx).Exec(` UPDATE users SET password_hash = '', @@ -124,9 +172,275 @@ func (w *HardDeleteWorker) runOnce(ctx context.Context) { w.db.WithContext(runCtx).Exec("DELETE FROM user_follows WHERE follower_id = ? OR following_id = ?", id, id) w.db.WithContext(runCtx).Exec("DELETE FROM notifications WHERE user_id = ? OR actor_id = ?", id, id) w.db.WithContext(runCtx).Exec("UPDATE audit_logs SET user_id = NULL, ip_address = NULL WHERE user_id = ?", id) - // TODO(HIGH-007): Clean Redis cache keys (user:{id}:*) and Elasticsearch user documents. - // Requires injecting Redis/ES clients into HardDeleteWorker. - logger.Info("Hard delete completed", zap.String("user_id", id.String())) + // Redis cache cleanup — user:{id}:* keys. + if deleted, rerr := w.cleanRedisKeys(runCtx, id); rerr != nil { + logger.Error("Redis cleanup failed", + zap.Error(rerr), + zap.String("user_id", id.String()), + ) + // Non-fatal: continue to ES so we still make partial progress. + } else if deleted > 0 { + logger.Info("Redis cleanup complete", + zap.String("user_id", id.String()), + zap.Int("deleted", deleted), + ) + } + + // Elasticsearch doc cleanup — user + owned tracks + owned playlists. 
+		if eerr := w.cleanESDocs(runCtx, id, trackIDs, playlistIDs); eerr != nil {
+			logger.Error("Elasticsearch cleanup failed",
+				zap.Error(eerr),
+				zap.String("user_id", id.String()),
+			)
+		}
+
+		logger.Info("Hard delete completed",
+			zap.String("user_id", id.String()),
+			zap.Int("tracks_removed_from_es", len(trackIDs)),
+			zap.Int("playlists_removed_from_es", len(playlistIDs)),
+		)
 	}
 }
+
+// collectTrackIDs returns the UUIDs of tracks whose creator_id is the given user.
+// creator_id is the canonical ownership column; user_id is a denormalized legacy alias.
+func (w *HardDeleteWorker) collectTrackIDs(ctx context.Context, logger *zap.Logger, userID uuid.UUID) []uuid.UUID {
+	var ids []uuid.UUID
+	if err := w.db.WithContext(ctx).Raw(
+		`SELECT id FROM tracks WHERE creator_id = ?`, userID,
+	).Scan(&ids).Error; err != nil {
+		logger.Warn("Failed to collect track IDs for ES cleanup",
+			zap.Error(err), zap.String("user_id", userID.String()))
+		return nil
+	}
+	return ids
+}
+
+// collectPlaylistIDs returns the UUIDs of playlists owned by the user.
+func (w *HardDeleteWorker) collectPlaylistIDs(ctx context.Context, logger *zap.Logger, userID uuid.UUID) []uuid.UUID {
+	var ids []uuid.UUID
+	if err := w.db.WithContext(ctx).Raw(
+		`SELECT id FROM playlists WHERE user_id = ?`, userID,
+	).Scan(&ids).Error; err != nil {
+		logger.Warn("Failed to collect playlist IDs for ES cleanup",
+			zap.Error(err), zap.String("user_id", userID.String()))
+		return nil
+	}
+	return ids
+}
+
+// cleanRedisKeys removes all Redis keys matching user:{userID}:*.
+//
+// Uses SCAN with a cursor in a loop — NEVER KEYS, which would block the Redis server
+// on keyspaces with millions of keys. COUNT hints at ~100 keys per round trip; Redis may
+// return slightly fewer or more depending on internal bucket distribution.
+//
+// Transient SCAN errors are retried with linear backoff (100ms × attempt number), up to
+// 3 attempts in total. A persistent error returns immediately without panicking. DEL
+// errors are logged but non-fatal — subsequent batches are still attempted so we don't
+// leave the cache half-clean.
+func (w *HardDeleteWorker) cleanRedisKeys(ctx context.Context, userID uuid.UUID) (int, error) {
+	if w.redisClient == nil {
+		w.logger.Debug("Redis cleanup skipped (no client)", zap.String("user_id", userID.String()))
+		return 0, nil
+	}
+
+	const (
+		batchSize  = 100
+		maxRetries = 3
+	)
+	pattern := fmt.Sprintf("user:%s:*", userID.String())
+	var cursor uint64
+	totalDeleted := 0
+
+	for {
+		var (
+			keys    []string
+			scanErr error
+		)
+		retries := 0
+		for {
+			keys, cursor, scanErr = w.redisClient.Scan(ctx, cursor, pattern, batchSize).Result()
+			if scanErr == nil {
+				break
+			}
+			retries++
+			w.logger.Warn("Redis SCAN failed, retrying",
+				zap.Error(scanErr),
+				zap.String("user_id", userID.String()),
+				zap.String("pattern", pattern),
+				zap.Int("retry", retries),
+			)
+			if retries >= maxRetries {
+				return totalDeleted, fmt.Errorf("redis SCAN failed after %d retries: %w", maxRetries, scanErr)
+			}
+			select {
+			case <-ctx.Done():
+				return totalDeleted, ctx.Err()
+			case <-time.After(time.Duration(retries) * 100 * time.Millisecond):
+			}
+		}
+
+		if len(keys) > 0 {
+			n, delErr := w.redisClient.Del(ctx, keys...).Result()
+			if delErr != nil {
+				w.logger.Error("Redis DEL failed for batch",
+					zap.Error(delErr),
+					zap.String("user_id", userID.String()),
+					zap.Int("batch_size", len(keys)),
+				)
+				// Non-fatal: continue scanning so later batches still get cleaned.
+ } else { + totalDeleted += int(n) + } + } + + if cursor == 0 { + break + } + } + return totalDeleted, nil +} + +// cleanESDocs removes user-bound documents from Elasticsearch. +// +// - users index: delete the user doc by its document ID (the user UUID). +// - tracks index: DeleteByQuery with a terms filter on _id for the collected track IDs. +// - playlists index: DeleteByQuery with a terms filter on _id for the collected playlist IDs. +// +// Each index is handled independently: a failure on one does not prevent the others +// from being attempted. The first encountered error is returned so the caller can log it, +// but partial progress is still made. +func (w *HardDeleteWorker) cleanESDocs( + ctx context.Context, + userID uuid.UUID, + trackIDs []uuid.UUID, + playlistIDs []uuid.UUID, +) error { + if w.esClient == nil { + w.logger.Debug("Elasticsearch cleanup skipped (no client)", zap.String("user_id", userID.String())) + return nil + } + + prefix := w.esClient.Config.Index + var firstErr error + + // 1. Users index — delete by document ID. + usersIdx := esIndexNameFor(prefix, vezaes.IdxUsers) + if err := w.esDeleteDoc(ctx, usersIdx, userID.String()); err != nil { + w.logger.Error("ES user doc delete failed", + zap.Error(err), + zap.String("user_id", userID.String()), + zap.String("index", usersIdx), + ) + if firstErr == nil { + firstErr = err + } + } + + // 2. Tracks index — batch delete by collected IDs. + if len(trackIDs) > 0 { + tracksIdx := esIndexNameFor(prefix, vezaes.IdxTracks) + if err := w.esDeleteByIDs(ctx, tracksIdx, uuidsToStrings(trackIDs)); err != nil { + w.logger.Error("ES tracks delete_by_query failed", + zap.Error(err), + zap.String("user_id", userID.String()), + zap.String("index", tracksIdx), + zap.Int("count", len(trackIDs)), + ) + if firstErr == nil { + firstErr = err + } + } + } + + // 3. Playlists index — batch delete by collected IDs. + if len(playlistIDs) > 0 { + playlistsIdx := esIndexNameFor(prefix, vezaes.IdxPlaylists) + if err := w.esDeleteByIDs(ctx, playlistsIdx, uuidsToStrings(playlistIDs)); err != nil { + w.logger.Error("ES playlists delete_by_query failed", + zap.Error(err), + zap.String("user_id", userID.String()), + zap.String("index", playlistsIdx), + zap.Int("count", len(playlistIDs)), + ) + if firstErr == nil { + firstErr = err + } + } + } + + return firstErr +} + +// esDeleteDoc deletes a single document by ID. A 404 response is treated as success +// (the doc was already gone, which is the desired end state). +func (w *HardDeleteWorker) esDeleteDoc(ctx context.Context, index, docID string) error { + req := esapi.DeleteRequest{ + Index: index, + DocumentID: docID, + Refresh: "true", + } + res, err := req.Do(ctx, w.esClient) + if err != nil { + return fmt.Errorf("es delete: %w", err) + } + defer res.Body.Close() + if res.StatusCode == 404 { + return nil + } + if res.IsError() { + return fmt.Errorf("es delete status %d: %s", res.StatusCode, res.String()) + } + return nil +} + +// esDeleteByIDs runs a DeleteByQuery with a terms filter on _id. Safe for empty input +// (returns nil immediately). 
+func (w *HardDeleteWorker) esDeleteByIDs(ctx context.Context, index string, ids []string) error { + if len(ids) == 0 { + return nil + } + body := map[string]any{ + "query": map[string]any{ + "terms": map[string]any{ + "_id": ids, + }, + }, + } + buf, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("marshal delete_by_query body: %w", err) + } + refresh := true + req := esapi.DeleteByQueryRequest{ + Index: []string{index}, + Body: bytes.NewReader(buf), + Refresh: &refresh, + } + res, err := req.Do(ctx, w.esClient) + if err != nil { + return fmt.Errorf("es delete_by_query: %w", err) + } + defer res.Body.Close() + if res.IsError() { + return fmt.Errorf("es delete_by_query status %d: %s", res.StatusCode, res.String()) + } + return nil +} + +// esIndexNameFor mirrors the internal/elasticsearch.indexName helper (private there). +func esIndexNameFor(prefix, name string) string { + if prefix != "" { + return prefix + "-" + name + } + return "veza-" + name +} + +func uuidsToStrings(ids []uuid.UUID) []string { + out := make([]string, len(ids)) + for i, id := range ids { + out[i] = id.String() + } + return out +} diff --git a/veza-backend-api/internal/workers/hard_delete_worker_test.go b/veza-backend-api/internal/workers/hard_delete_worker_test.go new file mode 100644 index 000000000..086aa9d57 --- /dev/null +++ b/veza-backend-api/internal/workers/hard_delete_worker_test.go @@ -0,0 +1,260 @@ +package workers + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + elasticsearch "github.com/elastic/go-elasticsearch/v8" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + vezaes "veza-backend-api/internal/elasticsearch" + "veza-backend-api/internal/testutils" +) + +// --------------------------------------------------------------------------- +// Pure helpers +// --------------------------------------------------------------------------- + +func TestUUIDsToStrings(t *testing.T) { + u1 := uuid.New() + u2 := uuid.New() + got := uuidsToStrings([]uuid.UUID{u1, u2}) + require.Len(t, got, 2) + assert.Equal(t, u1.String(), got[0]) + assert.Equal(t, u2.String(), got[1]) + + assert.Empty(t, uuidsToStrings(nil)) + assert.Empty(t, uuidsToStrings([]uuid.UUID{})) +} + +func TestEsIndexNameFor(t *testing.T) { + assert.Equal(t, "veza-users", esIndexNameFor("", "users")) + assert.Equal(t, "veza-tracks", esIndexNameFor("", "tracks")) + assert.Equal(t, "myprefix-users", esIndexNameFor("myprefix", "users")) +} + +// --------------------------------------------------------------------------- +// Nil-client safety +// --------------------------------------------------------------------------- + +func TestCleanRedisKeys_NilClientIsNoop(t *testing.T) { + w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour) + // redisClient stays nil — WithRedis never called + deleted, err := w.cleanRedisKeys(context.Background(), uuid.New()) + require.NoError(t, err) + assert.Equal(t, 0, deleted) +} + +func TestCleanESDocs_NilClientIsNoop(t *testing.T) { + w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour) + // esClient stays nil + err := w.cleanESDocs(context.Background(), uuid.New(), []uuid.UUID{uuid.New()}, []uuid.UUID{uuid.New()}) + require.NoError(t, err) +} + +// --------------------------------------------------------------------------- +// Redis integration test (gated by VEZA_SKIP_INTEGRATION) +// 
---------------------------------------------------------------------------
+
+func TestCleanRedisKeys_Integration(t *testing.T) {
+	testutils.SkipIfNoIntegration(t)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	client, err := testutils.GetTestRedisClient(ctx)
+	require.NoError(t, err, "redis testcontainer setup")
+	require.NotNil(t, client)
+
+	userID := uuid.New()
+	otherID := uuid.New()
+
+	// Seed a bunch of keys across two users + some unrelated keys.
+	ownKeys := []string{
+		fmt.Sprintf("user:%s:session", userID),
+		fmt.Sprintf("user:%s:profile", userID),
+		fmt.Sprintf("user:%s:feed:page:0", userID),
+		fmt.Sprintf("user:%s:feed:page:1", userID),
+	}
+	// Also seed a larger batch to exercise the SCAN loop (>100 keys).
+	for i := 0; i < 150; i++ {
+		ownKeys = append(ownKeys, fmt.Sprintf("user:%s:bulk:%d", userID, i))
+	}
+	unrelated := []string{
+		fmt.Sprintf("user:%s:session", otherID),
+		fmt.Sprintf("user:%s:profile", otherID),
+		"global:config",
+		"cache:homepage",
+	}
+	for _, k := range append(ownKeys, unrelated...) {
+		require.NoError(t, client.Set(ctx, k, "x", time.Minute).Err(), "seed key %s", k)
+	}
+
+	w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour).WithRedis(client)
+	deleted, err := w.cleanRedisKeys(ctx, userID)
+	require.NoError(t, err)
+	assert.GreaterOrEqual(t, deleted, len(ownKeys),
+		"expected at least %d keys deleted, got %d", len(ownKeys), deleted)
+
+	// All own keys must be gone.
+	for _, k := range ownKeys {
+		n, err := client.Exists(ctx, k).Result()
+		require.NoError(t, err)
+		assert.Equal(t, int64(0), n, "key %s should be gone", k)
+	}
+	// All unrelated keys must remain.
+	for _, k := range unrelated {
+		n, err := client.Exists(ctx, k).Result()
+		require.NoError(t, err)
+		assert.Equal(t, int64(1), n, "key %s should still exist", k)
+	}
+
+	// Cleanup the unrelated keys we seeded so the shared container stays tidy.
+	_ = client.Del(ctx, unrelated...).Err()
+}
+
+// ---------------------------------------------------------------------------
+// Elasticsearch test using a mock HTTP server
+// ---------------------------------------------------------------------------
+
+// esMockRecord captures a received request for later assertion.
+type esMockRecord struct {
+	Method string
+	Path   string
+	Query  string
+	Body   string
+}
+
+func newESMockServer(t *testing.T) (*httptest.Server, *[]esMockRecord, *sync.Mutex) {
+	t.Helper()
+	var (
+		records []esMockRecord
+		mu      sync.Mutex
+	)
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		body, _ := io.ReadAll(r.Body)
+		mu.Lock()
+		records = append(records, esMockRecord{
+			Method: r.Method,
+			Path:   r.URL.Path,
+			Query:  r.URL.RawQuery,
+			Body:   string(body),
+		})
+		mu.Unlock()
+
+		// The go-elasticsearch v8 client issues a product check (GET /) before its first
+		// real request and may send HEAD requests for index existence checks. We answer
+		// with minimal valid JSON and the X-Elastic-Product header so those checks pass.
+		switch {
+		case r.Method == http.MethodGet && r.URL.Path == "/":
+			w.Header().Set("X-Elastic-Product", "Elasticsearch")
+			w.WriteHeader(http.StatusOK)
+			_, _ = w.Write([]byte(`{"version":{"number":"8.11.0"},"tagline":"You Know, for Search"}`))
+		case r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/_doc/"):
+			// Single doc delete.
+ w.Header().Set("X-Elastic-Product", "Elasticsearch") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"_index":"x","_id":"y","result":"deleted","_shards":{"total":1,"successful":1,"failed":0}}`)) + case r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/_delete_by_query"): + w.Header().Set("X-Elastic-Product", "Elasticsearch") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"took":1,"timed_out":false,"total":2,"deleted":2,"batches":1,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}`)) + default: + // Fallback OK for any other request (e.g. internal health probes). + w.Header().Set("X-Elastic-Product", "Elasticsearch") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{}`)) + } + })) + return srv, &records, &mu +} + +// newMockVezaESClient builds a veza wrapper pointing at the mock server. +func newMockVezaESClient(t *testing.T, baseURL, indexPrefix string) *vezaes.Client { + t.Helper() + rawCfg := elasticsearch.Config{Addresses: []string{baseURL}} + raw, err := elasticsearch.NewClient(rawCfg) + require.NoError(t, err) + return &vezaes.Client{ + Client: raw, + Config: vezaes.Config{URL: baseURL, Index: indexPrefix, Enabled: true}, + Logger: zap.NewNop(), + } +} + +func TestCleanESDocs_CallsAllThreeIndices(t *testing.T) { + srv, records, mu := newESMockServer(t) + defer srv.Close() + + esClient := newMockVezaESClient(t, srv.URL, "myenv") + w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour).WithElasticsearch(esClient) + + userID := uuid.New() + trackIDs := []uuid.UUID{uuid.New(), uuid.New()} + playlistIDs := []uuid.UUID{uuid.New()} + + err := w.cleanESDocs(context.Background(), userID, trackIDs, playlistIDs) + require.NoError(t, err) + + mu.Lock() + defer mu.Unlock() + + // Inspect recorded requests. + var sawUserDelete, sawTracksDBQ, sawPlaylistsDBQ bool + for _, rec := range *records { + switch { + case rec.Method == http.MethodDelete && + strings.HasPrefix(rec.Path, "/myenv-users/_doc/") && + strings.HasSuffix(rec.Path, userID.String()): + sawUserDelete = true + case rec.Method == http.MethodPost && + rec.Path == "/myenv-tracks/_delete_by_query": + sawTracksDBQ = true + // body must contain both track UUIDs + var parsed map[string]any + require.NoError(t, json.Unmarshal([]byte(rec.Body), &parsed)) + assert.Contains(t, rec.Body, trackIDs[0].String()) + assert.Contains(t, rec.Body, trackIDs[1].String()) + case rec.Method == http.MethodPost && + rec.Path == "/myenv-playlists/_delete_by_query": + sawPlaylistsDBQ = true + assert.Contains(t, rec.Body, playlistIDs[0].String()) + } + } + assert.True(t, sawUserDelete, "expected DELETE /myenv-users/_doc/{userID}") + assert.True(t, sawTracksDBQ, "expected POST /myenv-tracks/_delete_by_query") + assert.True(t, sawPlaylistsDBQ, "expected POST /myenv-playlists/_delete_by_query") +} + +func TestCleanESDocs_SkipsEmptyIDLists(t *testing.T) { + srv, records, mu := newESMockServer(t) + defer srv.Close() + + esClient := newMockVezaESClient(t, srv.URL, "") + w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour).WithElasticsearch(esClient) + + // No tracks, no playlists — only the user doc delete should happen. 
+ err := w.cleanESDocs(context.Background(), uuid.New(), nil, nil) + require.NoError(t, err) + + mu.Lock() + defer mu.Unlock() + + var dbqCount int + for _, rec := range *records { + if rec.Method == http.MethodPost && strings.HasSuffix(rec.Path, "/_delete_by_query") { + dbqCount++ + } + } + assert.Equal(t, 0, dbqCount, "no delete_by_query should have been issued") +}
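
Not part of the patch: a minimal sketch of how the new builder API composes outside
cmd/api/main.go, for example in a one-off admin command that replays a single cleanup
pass. openGorm and openRedis are hypothetical placeholders for whatever connection
setup the operator already has; the worker, its builder methods, and the vezaes
constructors are the ones added or used by this change.

```go
package main

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
	"gorm.io/gorm"

	vezaes "veza-backend-api/internal/elasticsearch"
	"veza-backend-api/internal/workers"
)

// Hypothetical helpers: replace with the operator's real connection setup.
func openGorm() *gorm.DB       { panic("wire up PostgreSQL here") }
func openRedis() *redis.Client { panic("wire up Redis here") }

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	// Optional ES client, same pattern as cmd/api/main.go: nil simply disables ES cleanup.
	var esClient *vezaes.Client
	if esCfg := vezaes.LoadConfig(); esCfg.Enabled {
		if c, err := vezaes.NewClient(esCfg, logger); err != nil {
			logger.Warn("ES unavailable, proceeding with Redis-only cleanup", zap.Error(err))
		} else {
			esClient = c
		}
	}

	w := workers.NewHardDeleteWorker(openGorm(), logger, time.Hour).
		WithRedis(openRedis()).
		WithElasticsearch(esClient)

	// Start runs one pass immediately (runOnce) and then ticks at the interval.
	// Each pass is internally capped at 30 minutes, so a 45-minute context gives
	// the first pass room to finish before the worker is stopped.
	ctx, cancel := context.WithTimeout(context.Background(), 45*time.Minute)
	defer cancel()
	go w.Start(ctx)

	<-ctx.Done()
	w.Stop()
}
```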
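Also not part of the patch: a sketch of a post-run spot check an operator could script
to confirm nothing is left behind for a given user. It walks the same user:{id}:*
pattern with SCAN and counts remaining docs in the users index. residualCount is a
hypothetical helper, the hard-coded "veza-users" name assumes the default prefix from
esIndexNameFor, and both clients are assumed to be already connected.

```go
package ops

import (
	"context"
	"encoding/json"
	"strings"

	elasticsearch "github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/redis/go-redis/v9"
)

// residualCount reports how many Redis keys and how many ES user docs still
// reference the given user ID after a hard-delete pass.
func residualCount(ctx context.Context, rdb *redis.Client, es *elasticsearch.Client, userID string) (int, int, error) {
	// Redis: same SCAN-with-cursor walk the worker uses (never KEYS).
	redisLeft := 0
	var cursor uint64
	for {
		keys, next, err := rdb.Scan(ctx, cursor, "user:"+userID+":*", 100).Result()
		if err != nil {
			return 0, 0, err
		}
		redisLeft += len(keys)
		if next == 0 {
			break
		}
		cursor = next
	}

	// Elasticsearch: count docs whose _id is the user UUID in the users index.
	body := strings.NewReader(`{"query":{"terms":{"_id":["` + userID + `"]}}}`)
	res, err := esapi.CountRequest{Index: []string{"veza-users"}, Body: body}.Do(ctx, es)
	if err != nil {
		return redisLeft, 0, err
	}
	defer res.Body.Close()
	var out struct {
		Count int `json:"count"`
	}
	if err := json.NewDecoder(res.Body).Decode(&out); err != nil {
		return redisLeft, 0, err
	}
	return redisLeft, out.Count, nil
}
```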