veza/veza-backend-api/internal/workers/hard_delete_worker.go
senke 9cdfc6d898 fix(backend): J4 — GDPR-compliant hard delete with Redis and ES cleanup
Closes TODO(HIGH-007). When the hard-delete worker anonymizes a user past
their recovery deadline, it now also cleans the user's residual data from
Redis and Elasticsearch, not just PostgreSQL. Without this, a user who
invoked their right to erasure would still appear in cached feed/profile
responses and in ES search results until the next reindex cycle.

Worker changes (internal/workers/hard_delete_worker.go):

  WithRedis / WithElasticsearch builder methods inject the clients. Both
  are optional: if either is nil (feature disabled or unreachable), the
  corresponding cleanup is skipped with a debug log and the worker keeps
  going. Partial progress beats panic.

  cleanRedisKeys uses SCAN with a cursor loop (COUNT 100), NEVER KEYS —
  KEYS would block the Redis server on multi-million-key deployments.
  Pattern is user:{id}:*. Transient SCAN errors are retried up to 3 times
  with linear backoff (100 ms × attempt number); persistent errors return
  without panicking.
  DEL errors on a batch are logged but non-fatal so subsequent batches
  are still attempted.

  cleanESDocs hits three indices independently:
    - users index: DELETE doc by _id (the user UUID); 404 treated as
      success (already gone = desired state)
    - tracks index: DeleteByQuery with a terms filter on _id, using the
      list of track IDs collected from PostgreSQL BEFORE anonymization
    - playlists index: same pattern as tracks
  A failure on one index does not prevent the others from being tried;
  the first error is returned so the caller can log.

  Track/playlist IDs are pre-collected (collectTrackIDs, collectPlaylistIDs)
  before the UPDATE anonymization runs, because the anonymization does NOT
  cascade (no DELETE on users), so tracks and playlists rows remain with
  their creator_id / user_id intact and resolvable at query time.

Wiring (cmd/api/main.go):

  The worker now receives cfg.RedisClient directly, and an optional ES
  client built from elasticsearch.LoadConfig() + NewClient. If ES is
  disabled or unreachable at startup, the worker logs a warning and
  proceeds with Redis-only cleanup.
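
  A minimal sketch of what that wiring might look like. Names such as
  cfg.RedisClient, esCfg.Enabled, and the NewClient(esCfg, logger) signature
  are assumptions for illustration; the real cmd/api/main.go may differ:

      esCfg := elasticsearch.LoadConfig()
      var esClient *elasticsearch.Client
      if esCfg.Enabled {
          c, err := elasticsearch.NewClient(esCfg, logger)
          if err != nil {
              // Fallback described above: warn and continue with Redis-only cleanup.
              logger.Warn("ES unreachable, hard delete will skip ES cleanup", zap.Error(err))
          } else {
              esClient = c
          }
      }

      worker := workers.NewHardDeleteWorker(db, logger, 24*time.Hour).
          WithRedis(cfg.RedisClient).
          WithElasticsearch(esClient) // nil is fine: that cleanup is simply skipped
      go worker.Start(ctx)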

Tests (internal/workers/hard_delete_worker_test.go, +260 lines):

  Pure-function unit tests:
    - TestUUIDsToStrings
    - TestEsIndexNameFor
  Nil-client safety tests (one is sketched after this list):
    - TestCleanRedisKeys_NilClientIsNoop
    - TestCleanESDocs_NilClientIsNoop
  ES mock-server tests (httptest.Server mimicking /_doc and
  /_delete_by_query endpoints with valid ES 8.11 responses):
    - TestCleanESDocs_CallsAllThreeIndices — verifies the three expected
      HTTP calls land with the right paths and request bodies containing
      the provided UUIDs
    - TestCleanESDocs_SkipsEmptyIDLists — verifies no DeleteByQuery is
      issued when the ID lists are empty
  Redis testcontainer integration test (gated by VEZA_SKIP_INTEGRATION):
    - TestCleanRedisKeys_Integration — seeds 154 keys (4 fixed + 150 bulk
      to force the SCAN loop past a single batch) plus 4 unrelated keys
      from another user / global, runs cleanRedisKeys, asserts all 154
      own keys are gone and all 4 unrelated keys remain.
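
  A minimal sketch of the nil-client no-op case (the real test file may be
  structured differently; zap.NewNop and a nil *gorm.DB suffice here because
  cleanRedisKeys never touches the database):

      package workers

      import (
          "context"
          "testing"
          "time"

          "github.com/google/uuid"
          "go.uber.org/zap"
      )

      // Nil-client safety: no Redis client injected, so cleanRedisKeys must be a no-op.
      func TestCleanRedisKeys_NilClientIsNoop(t *testing.T) {
          w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour) // WithRedis never called
          deleted, err := w.cleanRedisKeys(context.Background(), uuid.New())
          if err != nil {
              t.Fatalf("expected nil error, got %v", err)
          }
          if deleted != 0 {
              t.Fatalf("expected 0 deleted keys, got %d", deleted)
          }
      }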

Verification:
  go build ./...                                                OK
  go vet ./...                                                  OK
  VEZA_SKIP_INTEGRATION=1 go test ./internal/workers/... -short        OK
  go test ./internal/workers/ -run TestCleanRedisKeys_Integration
    → testcontainers spins up redis:7-alpine, test passes in 1.34s

Out of J4 scope (noted for a follow-up):
  - No "activity" ES index exists in the codebase today (the audit plan
    mentioned it as a possible target). The three real indices with user
    data — users, tracks, playlists — are all now cleaned.
  - Track artist strings (free-form) may still contain the user's
    display name as a cached value in the tracks index after this
    cleanup. Actual user-owned tracks are deleted here, but if a third
    party's track referenced the removed user in its artist field, that
    reference is not touched. Strict GDPR handling of that edge case is a
    separate ticket.

Refs: AUDIT_REPORT.md §8.5, §10 P5, §12 item 1
2026-04-15 12:25:39 +02:00


package workers

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
	"gorm.io/gorm"

	vezaes "veza-backend-api/internal/elasticsearch"
)

// HardDeleteWorker performs final GDPR cleanup for users past recovery deadline (v0.10.8 F065).
// Runs periodically to irreversibly anonymize accounts that were soft-deleted 30+ days ago.
//
// The worker cleans three stores:
//   - PostgreSQL: anonymize the users row, delete from user_profiles, user_sessions, user_settings,
//     user_follows, notifications, and null user_id in audit_logs.
//   - Redis (optional): all keys matching user:{id}:* using SCAN with a cursor — never KEYS,
//     which would block the server on large keyspaces.
//   - Elasticsearch (optional): the user doc from the users index, plus all track docs and
//     playlist docs authored by the user (collected from PG before anonymization, then removed
//     from ES via DeleteByQuery on the _id terms).
//
// Both Redis and Elasticsearch are optional. If either client is nil (feature disabled or
// unreachable at startup), the corresponding cleanup is skipped with a debug log and the
// worker continues. Transient Redis errors trigger a bounded retry loop; persistent errors
// are logged and the worker moves on to the next user (no silent panic).
type HardDeleteWorker struct {
	db          *gorm.DB
	redisClient *redis.Client
	esClient    *vezaes.Client
	logger      *zap.Logger
	interval    time.Duration
	stopChan    chan struct{}
	running     bool
}

// NewHardDeleteWorker creates a new hard delete worker with only the required PostgreSQL
// dependency. Use WithRedis and WithElasticsearch to enable the corresponding cleanups.
func NewHardDeleteWorker(db *gorm.DB, logger *zap.Logger, interval time.Duration) *HardDeleteWorker {
	if logger == nil {
		logger = zap.NewNop()
	}
	if interval <= 0 {
		interval = 24 * time.Hour
	}
	return &HardDeleteWorker{
		db:       db,
		logger:   logger,
		interval: interval,
		stopChan: make(chan struct{}),
		running:  false,
	}
}

// WithRedis enables Redis cache key cleanup for the user:{id}:* pattern.
// Pass nil to explicitly disable; pass a live *redis.Client to enable.
func (w *HardDeleteWorker) WithRedis(client *redis.Client) *HardDeleteWorker {
	w.redisClient = client
	return w
}

// WithElasticsearch enables Elasticsearch document cleanup. The veza wrapper is expected
// (it carries the index prefix via Config). Pass nil to explicitly disable.
func (w *HardDeleteWorker) WithElasticsearch(client *vezaes.Client) *HardDeleteWorker {
	w.esClient = client
	return w
}

// Start runs the worker periodically.
func (w *HardDeleteWorker) Start(ctx context.Context) {
	if w.running {
		w.logger.Warn("Hard delete worker is already running")
		return
	}
	w.running = true
	w.logger.Info("Starting hard delete worker",
		zap.Duration("interval", w.interval),
		zap.Bool("redis_enabled", w.redisClient != nil),
		zap.Bool("elasticsearch_enabled", w.esClient != nil),
	)
	go w.runOnce(ctx)
	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			w.logger.Info("Stopping hard delete worker")
			w.running = false
			return
		case <-w.stopChan:
			w.logger.Info("Stopping hard delete worker (stop requested)")
			w.running = false
			return
		case <-ticker.C:
			go w.runOnce(ctx)
		}
	}
}

// Stop stops the worker.
func (w *HardDeleteWorker) Stop() {
	if !w.running {
		return
	}
	close(w.stopChan)
}

// runOnce executes one pass of hard delete / final anonymization.
func (w *HardDeleteWorker) runOnce(ctx context.Context) {
	logger := w.logger.With(zap.String("worker", "hard_delete"))
	runCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
	defer cancel()

	var userIDs []uuid.UUID
	if err := w.db.WithContext(runCtx).Raw(`
		SELECT id FROM users
		WHERE deleted_at IS NOT NULL AND recovery_deadline IS NOT NULL AND recovery_deadline < NOW()
	`).Scan(&userIDs).Error; err != nil {
		logger.Error("Failed to query users for hard delete", zap.Error(err))
		return
	}
	if len(userIDs) == 0 {
		logger.Debug("No users eligible for hard delete")
		return
	}
	logger.Info("Processing hard delete", zap.Int("count", len(userIDs)))

	for _, id := range userIDs {
		// Collect ES doc IDs BEFORE PG anonymization, while creator/user relations
		// are still resolvable. The anonymization UPDATE does not cascade, so the
		// tracks and playlists rows still exist afterwards — we just need their IDs.
		trackIDs := w.collectTrackIDs(runCtx, logger, id)
		playlistIDs := w.collectPlaylistIDs(runCtx, logger, id)

		// Final anonymization pass (ORIGIN §19.3) — ensure no PII remains in users row.
		res := w.db.WithContext(runCtx).Exec(`
			UPDATE users SET
				password_hash = '',
				first_name = NULL,
				last_name = NULL,
				bio = '',
				location = '',
				avatar = '',
				banner_url = '',
				recovery_deadline = NULL,
				updated_at = NOW()
			WHERE id = ? AND deleted_at IS NOT NULL
		`, id)
		if res.Error != nil {
			logger.Error("Failed to anonymize user", zap.Error(res.Error), zap.String("user_id", id.String()))
			continue
		}

		// Delete user_profiles (may contain PII)
		w.db.WithContext(runCtx).Exec("DELETE FROM user_profiles WHERE user_id = ?", id)
		// SECURITY(HIGH-007): RGPD — clean additional PII-containing tables
		w.db.WithContext(runCtx).Exec("DELETE FROM user_sessions WHERE user_id = ?", id)
		w.db.WithContext(runCtx).Exec("DELETE FROM user_settings WHERE user_id = ?", id)
		w.db.WithContext(runCtx).Exec("DELETE FROM user_follows WHERE follower_id = ? OR following_id = ?", id, id)
		w.db.WithContext(runCtx).Exec("DELETE FROM notifications WHERE user_id = ? OR actor_id = ?", id, id)
		w.db.WithContext(runCtx).Exec("UPDATE audit_logs SET user_id = NULL, ip_address = NULL WHERE user_id = ?", id)

		// Redis cache cleanup — user:{id}:* keys.
		if deleted, rerr := w.cleanRedisKeys(runCtx, id); rerr != nil {
			logger.Error("Redis cleanup failed",
				zap.Error(rerr),
				zap.String("user_id", id.String()),
			)
			// Non-fatal: continue to ES so we still make partial progress.
		} else if deleted > 0 {
			logger.Info("Redis cleanup complete",
				zap.String("user_id", id.String()),
				zap.Int("deleted", deleted),
			)
		}

		// Elasticsearch doc cleanup — user + owned tracks + owned playlists.
		if eerr := w.cleanESDocs(runCtx, id, trackIDs, playlistIDs); eerr != nil {
			logger.Error("Elasticsearch cleanup failed",
				zap.Error(eerr),
				zap.String("user_id", id.String()),
			)
		}

		logger.Info("Hard delete completed",
			zap.String("user_id", id.String()),
			zap.Int("tracks_removed_from_es", len(trackIDs)),
			zap.Int("playlists_removed_from_es", len(playlistIDs)),
		)
	}
}

// collectTrackIDs returns the UUIDs of tracks whose creator_id is the given user.
// creator_id is the canonical ownership column; user_id is a denormalized legacy alias.
func (w *HardDeleteWorker) collectTrackIDs(ctx context.Context, logger *zap.Logger, userID uuid.UUID) []uuid.UUID {
	var ids []uuid.UUID
	if err := w.db.WithContext(ctx).Raw(
		`SELECT id FROM tracks WHERE creator_id = ?`, userID,
	).Scan(&ids).Error; err != nil {
		logger.Warn("Failed to collect track IDs for ES cleanup",
			zap.Error(err), zap.String("user_id", userID.String()))
		return nil
	}
	return ids
}

// collectPlaylistIDs returns the UUIDs of playlists owned by the user.
func (w *HardDeleteWorker) collectPlaylistIDs(ctx context.Context, logger *zap.Logger, userID uuid.UUID) []uuid.UUID {
	var ids []uuid.UUID
	if err := w.db.WithContext(ctx).Raw(
		`SELECT id FROM playlists WHERE user_id = ?`, userID,
	).Scan(&ids).Error; err != nil {
		logger.Warn("Failed to collect playlist IDs for ES cleanup",
			zap.Error(err), zap.String("user_id", userID.String()))
		return nil
	}
	return ids
}

// cleanRedisKeys removes all Redis keys matching user:{userID}:*.
//
// Uses SCAN with a cursor in a loop — NEVER KEYS, which would block the Redis server
// on keyspaces with millions of keys. COUNT hints at ~100 keys per round trip; Redis may
// return slightly fewer or more depending on internal bucket distribution.
//
// Transient SCAN errors trigger up to 3 retries with linear backoff (100ms × attempt).
// A persistent error returns immediately without panicking. DEL errors are logged but
// non-fatal — subsequent batches are still attempted so we don't leave the cache half-clean.
func (w *HardDeleteWorker) cleanRedisKeys(ctx context.Context, userID uuid.UUID) (int, error) {
	if w.redisClient == nil {
		w.logger.Debug("Redis cleanup skipped (no client)", zap.String("user_id", userID.String()))
		return 0, nil
	}

	const (
		batchSize  = 100
		maxRetries = 3
	)
	pattern := fmt.Sprintf("user:%s:*", userID.String())

	var cursor uint64
	totalDeleted := 0
	for {
		var (
			keys    []string
			scanErr error
		)
		retries := 0
		for {
			keys, cursor, scanErr = w.redisClient.Scan(ctx, cursor, pattern, batchSize).Result()
			if scanErr == nil {
				break
			}
			retries++
			w.logger.Warn("Redis SCAN failed, retrying",
				zap.Error(scanErr),
				zap.String("user_id", userID.String()),
				zap.String("pattern", pattern),
				zap.Int("retry", retries),
			)
			if retries >= maxRetries {
				return totalDeleted, fmt.Errorf("redis SCAN failed after %d retries: %w", maxRetries, scanErr)
			}
			select {
			case <-ctx.Done():
				return totalDeleted, ctx.Err()
			case <-time.After(time.Duration(retries) * 100 * time.Millisecond):
			}
		}
		if len(keys) > 0 {
			n, delErr := w.redisClient.Del(ctx, keys...).Result()
			if delErr != nil {
				w.logger.Error("Redis DEL failed for batch",
					zap.Error(delErr),
					zap.String("user_id", userID.String()),
					zap.Int("batch_size", len(keys)),
				)
				// Non-fatal: continue scanning so later batches still get cleaned.
			} else {
				totalDeleted += int(n)
			}
		}
		if cursor == 0 {
			break
		}
	}
	return totalDeleted, nil
}

// cleanESDocs removes user-bound documents from Elasticsearch.
//
//   - users index: delete the user doc by its document ID (the user UUID).
//   - tracks index: DeleteByQuery with a terms filter on _id for the collected track IDs.
//   - playlists index: DeleteByQuery with a terms filter on _id for the collected playlist IDs.
//
// Each index is handled independently: a failure on one does not prevent the others
// from being attempted. The first encountered error is returned so the caller can log it,
// but partial progress is still made.
func (w *HardDeleteWorker) cleanESDocs(
	ctx context.Context,
	userID uuid.UUID,
	trackIDs []uuid.UUID,
	playlistIDs []uuid.UUID,
) error {
	if w.esClient == nil {
		w.logger.Debug("Elasticsearch cleanup skipped (no client)", zap.String("user_id", userID.String()))
		return nil
	}
	prefix := w.esClient.Config.Index
	var firstErr error

	// 1. Users index — delete by document ID.
	usersIdx := esIndexNameFor(prefix, vezaes.IdxUsers)
	if err := w.esDeleteDoc(ctx, usersIdx, userID.String()); err != nil {
		w.logger.Error("ES user doc delete failed",
			zap.Error(err),
			zap.String("user_id", userID.String()),
			zap.String("index", usersIdx),
		)
		if firstErr == nil {
			firstErr = err
		}
	}

	// 2. Tracks index — batch delete by collected IDs.
	if len(trackIDs) > 0 {
		tracksIdx := esIndexNameFor(prefix, vezaes.IdxTracks)
		if err := w.esDeleteByIDs(ctx, tracksIdx, uuidsToStrings(trackIDs)); err != nil {
			w.logger.Error("ES tracks delete_by_query failed",
				zap.Error(err),
				zap.String("user_id", userID.String()),
				zap.String("index", tracksIdx),
				zap.Int("count", len(trackIDs)),
			)
			if firstErr == nil {
				firstErr = err
			}
		}
	}

	// 3. Playlists index — batch delete by collected IDs.
	if len(playlistIDs) > 0 {
		playlistsIdx := esIndexNameFor(prefix, vezaes.IdxPlaylists)
		if err := w.esDeleteByIDs(ctx, playlistsIdx, uuidsToStrings(playlistIDs)); err != nil {
			w.logger.Error("ES playlists delete_by_query failed",
				zap.Error(err),
				zap.String("user_id", userID.String()),
				zap.String("index", playlistsIdx),
				zap.Int("count", len(playlistIDs)),
			)
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	return firstErr
}

// esDeleteDoc deletes a single document by ID. A 404 response is treated as success
// (the doc was already gone, which is the desired end state).
func (w *HardDeleteWorker) esDeleteDoc(ctx context.Context, index, docID string) error {
	req := esapi.DeleteRequest{
		Index:      index,
		DocumentID: docID,
		Refresh:    "true",
	}
	res, err := req.Do(ctx, w.esClient)
	if err != nil {
		return fmt.Errorf("es delete: %w", err)
	}
	defer res.Body.Close()

	if res.StatusCode == 404 {
		return nil
	}
	if res.IsError() {
		return fmt.Errorf("es delete status %d: %s", res.StatusCode, res.String())
	}
	return nil
}

// esDeleteByIDs runs a DeleteByQuery with a terms filter on _id. Safe for empty input
// (returns nil immediately).
func (w *HardDeleteWorker) esDeleteByIDs(ctx context.Context, index string, ids []string) error {
	if len(ids) == 0 {
		return nil
	}
	body := map[string]any{
		"query": map[string]any{
			"terms": map[string]any{
				"_id": ids,
			},
		},
	}
	buf, err := json.Marshal(body)
	if err != nil {
		return fmt.Errorf("marshal delete_by_query body: %w", err)
	}

	refresh := true
	req := esapi.DeleteByQueryRequest{
		Index:   []string{index},
		Body:    bytes.NewReader(buf),
		Refresh: &refresh,
	}
	res, err := req.Do(ctx, w.esClient)
	if err != nil {
		return fmt.Errorf("es delete_by_query: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return fmt.Errorf("es delete_by_query status %d: %s", res.StatusCode, res.String())
	}
	return nil
}

// esIndexNameFor mirrors the internal/elasticsearch.indexName helper (private there).
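// For example, esIndexNameFor("veza", "users") == "veza-users", and with an empty
// prefix the default is used: esIndexNameFor("", "users") == "veza-users".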
func esIndexNameFor(prefix, name string) string {
	if prefix != "" {
		return prefix + "-" + name
	}
	return "veza-" + name
}

func uuidsToStrings(ids []uuid.UUID) []string {
	out := make([]string, len(ids))
	for i, id := range ids {
		out[i] = id.String()
	}
	return out
}