package workers

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
	"gorm.io/gorm"

	vezaes "veza-backend-api/internal/elasticsearch"
)

// HardDeleteWorker performs final GDPR cleanup for users past their recovery deadline
// (v0.10.8 F065). It runs periodically and irreversibly anonymizes accounts that were
// soft-deleted 30+ days ago.
//
// The worker cleans three stores:
//   - PostgreSQL: anonymize the users row; delete from user_profiles, user_sessions,
//     user_settings, user_follows, and notifications; null out user_id in audit_logs.
//   - Redis (optional): all keys matching user:{id}:* using SCAN with a cursor — never
//     KEYS, which would block the server on large keyspaces.
//   - Elasticsearch (optional): the user doc from the users index, plus all track docs
//     and playlist docs authored by the user (IDs collected from PG before anonymization,
//     then removed from ES via DeleteByQuery on the _id terms).
//
// Both Redis and Elasticsearch are optional. If either client is nil (feature disabled
// or unreachable at startup), the corresponding cleanup is skipped with a debug log and
// the worker continues. Transient Redis errors trigger a bounded retry loop; persistent
// errors are logged and the worker moves on to the next user (no silent panic).
type HardDeleteWorker struct {
	db          *gorm.DB
	redisClient *redis.Client
	esClient    *vezaes.Client
	logger      *zap.Logger
	interval    time.Duration
	stopChan    chan struct{}
	running     bool
}

// NewHardDeleteWorker creates a new hard delete worker with only the required PostgreSQL
// dependency. Use WithRedis and WithElasticsearch to enable the corresponding cleanups.
func NewHardDeleteWorker(db *gorm.DB, logger *zap.Logger, interval time.Duration) *HardDeleteWorker {
	if logger == nil {
		logger = zap.NewNop()
	}
	if interval <= 0 {
		interval = 24 * time.Hour
	}
	return &HardDeleteWorker{
		db:       db,
		logger:   logger,
		interval: interval,
		stopChan: make(chan struct{}),
		running:  false,
	}
}

// WithRedis enables Redis cache key cleanup for the user:{id}:* pattern.
// Pass nil to explicitly disable; pass a live *redis.Client to enable.
func (w *HardDeleteWorker) WithRedis(client *redis.Client) *HardDeleteWorker {
	w.redisClient = client
	return w
}

// WithElasticsearch enables Elasticsearch document cleanup. The veza wrapper is expected
// (it carries the index prefix via Config). Pass nil to explicitly disable.
func (w *HardDeleteWorker) WithElasticsearch(client *vezaes.Client) *HardDeleteWorker {
	w.esClient = client
	return w
}

// Start runs the worker periodically until ctx is cancelled or Stop is called.
func (w *HardDeleteWorker) Start(ctx context.Context) {
	if w.running {
		w.logger.Warn("Hard delete worker is already running")
		return
	}
	w.running = true
	w.logger.Info("Starting hard delete worker",
		zap.Duration("interval", w.interval),
		zap.Bool("redis_enabled", w.redisClient != nil),
		zap.Bool("elasticsearch_enabled", w.esClient != nil),
	)

	go w.runOnce(ctx)

	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			w.logger.Info("Stopping hard delete worker")
			w.running = false
			return
		case <-w.stopChan:
			w.logger.Info("Stopping hard delete worker (stop requested)")
			w.running = false
			return
		case <-ticker.C:
			go w.runOnce(ctx)
		}
	}
}

// Stop stops the worker. Clearing running before the close makes a repeated Stop
// call a no-op rather than a panic on double close of stopChan.
func (w *HardDeleteWorker) Stop() {
	if !w.running {
		return
	}
	w.running = false
	close(w.stopChan)
}
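// Example wiring at service startup (an illustrative sketch only; the db, logger,
// redisClient, and esClient variables below stand in for whatever the bootstrap
// code actually provides and are not defined in this package):
//
//	worker := workers.NewHardDeleteWorker(db, logger, 24*time.Hour).
//		WithRedis(redisClient).        // pass nil to skip Redis cleanup
//		WithElasticsearch(esClient)    // pass nil to skip ES cleanup
//	go worker.Start(ctx)
//	defer worker.Stop()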
// runOnce executes one pass of hard delete / final anonymization.
func (w *HardDeleteWorker) runOnce(ctx context.Context) {
	logger := w.logger.With(zap.String("worker", "hard_delete"))

	runCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
	defer cancel()

	var userIDs []uuid.UUID
	if err := w.db.WithContext(runCtx).Raw(`
		SELECT id FROM users
		WHERE deleted_at IS NOT NULL
		  AND recovery_deadline IS NOT NULL
		  AND recovery_deadline < NOW()
	`).Scan(&userIDs).Error; err != nil {
		logger.Error("Failed to query users for hard delete", zap.Error(err))
		return
	}
	if len(userIDs) == 0 {
		logger.Debug("No users eligible for hard delete")
		return
	}

	logger.Info("Processing hard delete", zap.Int("count", len(userIDs)))

	for _, id := range userIDs {
		// Collect ES doc IDs BEFORE PG anonymization, while creator/user relations
		// are still resolvable. The anonymization UPDATE does not cascade, so the
		// tracks and playlists rows still exist afterwards — we just need their IDs.
		trackIDs := w.collectTrackIDs(runCtx, logger, id)
		playlistIDs := w.collectPlaylistIDs(runCtx, logger, id)

		// Final anonymization pass (ORIGIN §19.3) — ensure no PII remains in the users row.
		res := w.db.WithContext(runCtx).Exec(`
			UPDATE users SET
				password_hash = '',
				first_name = NULL,
				last_name = NULL,
				bio = '',
				location = '',
				avatar = '',
				banner_url = '',
				recovery_deadline = NULL,
				updated_at = NOW()
			WHERE id = ? AND deleted_at IS NOT NULL
		`, id)
		if res.Error != nil {
			logger.Error("Failed to anonymize user", zap.Error(res.Error), zap.String("user_id", id.String()))
			continue
		}

		// Delete user_profiles (may contain PII).
		w.db.WithContext(runCtx).Exec("DELETE FROM user_profiles WHERE user_id = ?", id)

		// SECURITY(HIGH-007): GDPR — clean additional PII-containing tables.
		w.db.WithContext(runCtx).Exec("DELETE FROM user_sessions WHERE user_id = ?", id)
		w.db.WithContext(runCtx).Exec("DELETE FROM user_settings WHERE user_id = ?", id)
		w.db.WithContext(runCtx).Exec("DELETE FROM user_follows WHERE follower_id = ? OR following_id = ?", id, id)
		w.db.WithContext(runCtx).Exec("DELETE FROM notifications WHERE user_id = ? OR actor_id = ?", id, id)
		w.db.WithContext(runCtx).Exec("UPDATE audit_logs SET user_id = NULL, ip_address = NULL WHERE user_id = ?", id)

		// Redis cache cleanup — user:{id}:* keys.
		if deleted, rerr := w.cleanRedisKeys(runCtx, id); rerr != nil {
			logger.Error("Redis cleanup failed",
				zap.Error(rerr),
				zap.String("user_id", id.String()),
			)
			// Non-fatal: continue to ES so we still make partial progress.
		} else if deleted > 0 {
			logger.Info("Redis cleanup complete",
				zap.String("user_id", id.String()),
				zap.Int("deleted", deleted),
			)
		}

		// Elasticsearch doc cleanup — user + owned tracks + owned playlists.
		if eerr := w.cleanESDocs(runCtx, id, trackIDs, playlistIDs); eerr != nil {
			logger.Error("Elasticsearch cleanup failed",
				zap.Error(eerr),
				zap.String("user_id", id.String()),
			)
		}

		logger.Info("Hard delete completed",
			zap.String("user_id", id.String()),
			zap.Int("tracks_removed_from_es", len(trackIDs)),
			zap.Int("playlists_removed_from_es", len(playlistIDs)),
		)
	}
}
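// The eligibility query above scans users on every tick unless an index covers it.
// On large tables a partial index keeps the poll cheap; a hypothetical migration
// (the index name and exact form are illustrative, not an existing schema object):
//
//	CREATE INDEX CONCURRENTLY idx_users_recovery_deadline
//	    ON users (recovery_deadline)
//	    WHERE deleted_at IS NOT NULL;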
// collectTrackIDs returns the UUIDs of tracks whose creator_id is the given user.
// creator_id is the canonical ownership column; user_id is a denormalized legacy alias.
func (w *HardDeleteWorker) collectTrackIDs(ctx context.Context, logger *zap.Logger, userID uuid.UUID) []uuid.UUID {
	var ids []uuid.UUID
	if err := w.db.WithContext(ctx).Raw(
		`SELECT id FROM tracks WHERE creator_id = ?`, userID,
	).Scan(&ids).Error; err != nil {
		logger.Warn("Failed to collect track IDs for ES cleanup",
			zap.Error(err), zap.String("user_id", userID.String()))
		return nil
	}
	return ids
}

// collectPlaylistIDs returns the UUIDs of playlists owned by the user.
func (w *HardDeleteWorker) collectPlaylistIDs(ctx context.Context, logger *zap.Logger, userID uuid.UUID) []uuid.UUID {
	var ids []uuid.UUID
	if err := w.db.WithContext(ctx).Raw(
		`SELECT id FROM playlists WHERE user_id = ?`, userID,
	).Scan(&ids).Error; err != nil {
		logger.Warn("Failed to collect playlist IDs for ES cleanup",
			zap.Error(err), zap.String("user_id", userID.String()))
		return nil
	}
	return ids
}

// cleanRedisKeys removes all Redis keys matching user:{userID}:*.
//
// Uses SCAN with a cursor in a loop — NEVER KEYS, which would block the Redis server
// on keyspaces with millions of keys. COUNT hints at ~100 keys per round trip; Redis
// may return slightly fewer or more depending on internal bucket distribution.
//
// Transient Scan errors trigger up to 3 retries with a short linear backoff
// (100ms per attempt). A persistent error returns immediately without panicking.
// DEL errors are logged but non-fatal — subsequent batches are still attempted so
// we don't leave the cache half-clean.
func (w *HardDeleteWorker) cleanRedisKeys(ctx context.Context, userID uuid.UUID) (int, error) {
	if w.redisClient == nil {
		w.logger.Debug("Redis cleanup skipped (no client)", zap.String("user_id", userID.String()))
		return 0, nil
	}

	const (
		batchSize  = 100
		maxRetries = 3
	)
	pattern := fmt.Sprintf("user:%s:*", userID.String())

	var cursor uint64
	totalDeleted := 0

	for {
		var (
			keys    []string
			scanErr error
		)
		retries := 0
		for {
			// Keep the current cursor across retries: on error go-redis returns
			// zero values, and overwriting cursor would restart the scan from 0.
			var nextCursor uint64
			keys, nextCursor, scanErr = w.redisClient.Scan(ctx, cursor, pattern, batchSize).Result()
			if scanErr == nil {
				cursor = nextCursor
				break
			}
			retries++
			w.logger.Warn("Redis SCAN failed, retrying",
				zap.Error(scanErr),
				zap.String("user_id", userID.String()),
				zap.String("pattern", pattern),
				zap.Int("retry", retries),
			)
			if retries >= maxRetries {
				return totalDeleted, fmt.Errorf("redis SCAN failed after %d retries: %w", maxRetries, scanErr)
			}
			select {
			case <-ctx.Done():
				return totalDeleted, ctx.Err()
			case <-time.After(time.Duration(retries) * 100 * time.Millisecond):
			}
		}

		if len(keys) > 0 {
			n, delErr := w.redisClient.Del(ctx, keys...).Result()
			if delErr != nil {
				w.logger.Error("Redis DEL failed for batch",
					zap.Error(delErr),
					zap.String("user_id", userID.String()),
					zap.Int("batch_size", len(keys)),
				)
				// Non-fatal: continue scanning so later batches still get cleaned.
			} else {
				totalDeleted += int(n)
			}
		}

		if cursor == 0 {
			break
		}
	}
	return totalDeleted, nil
}
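// For reference, the pattern above matches namespaced cache keys such as
// (illustrative examples; actual key names depend on the cache layer):
//
//	user:7f3e9b2a-1c4d-4e5f-8a6b-9c0d1e2f3a4b:profile
//	user:7f3e9b2a-1c4d-4e5f-8a6b-9c0d1e2f3a4b:feed:page:1
//
// SCAN guarantees that keys present for the full duration of the scan are returned
// at least once, and DEL is idempotent, so rerunning the loop after a partial
// failure is safe. Keys cached outside the user:{id}: namespace are out of scope
// here and need their own expiry or cleanup path.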
// cleanESDocs removes user-bound documents from Elasticsearch.
//
//   - users index: delete the user doc by its document ID (the user UUID).
//   - tracks index: DeleteByQuery with a terms filter on _id for the collected track IDs.
//   - playlists index: DeleteByQuery with a terms filter on _id for the collected playlist IDs.
//
// Each index is handled independently: a failure on one does not prevent the others
// from being attempted. The first encountered error is returned so the caller can
// log it, but partial progress is still made.
func (w *HardDeleteWorker) cleanESDocs(
	ctx context.Context,
	userID uuid.UUID,
	trackIDs []uuid.UUID,
	playlistIDs []uuid.UUID,
) error {
	if w.esClient == nil {
		w.logger.Debug("Elasticsearch cleanup skipped (no client)", zap.String("user_id", userID.String()))
		return nil
	}

	prefix := w.esClient.Config.Index
	var firstErr error

	// 1. Users index — delete by document ID.
	usersIdx := esIndexNameFor(prefix, vezaes.IdxUsers)
	if err := w.esDeleteDoc(ctx, usersIdx, userID.String()); err != nil {
		w.logger.Error("ES user doc delete failed",
			zap.Error(err),
			zap.String("user_id", userID.String()),
			zap.String("index", usersIdx),
		)
		if firstErr == nil {
			firstErr = err
		}
	}

	// 2. Tracks index — batch delete by collected IDs.
	if len(trackIDs) > 0 {
		tracksIdx := esIndexNameFor(prefix, vezaes.IdxTracks)
		if err := w.esDeleteByIDs(ctx, tracksIdx, uuidsToStrings(trackIDs)); err != nil {
			w.logger.Error("ES tracks delete_by_query failed",
				zap.Error(err),
				zap.String("user_id", userID.String()),
				zap.String("index", tracksIdx),
				zap.Int("count", len(trackIDs)),
			)
			if firstErr == nil {
				firstErr = err
			}
		}
	}

	// 3. Playlists index — batch delete by collected IDs.
	if len(playlistIDs) > 0 {
		playlistsIdx := esIndexNameFor(prefix, vezaes.IdxPlaylists)
		if err := w.esDeleteByIDs(ctx, playlistsIdx, uuidsToStrings(playlistIDs)); err != nil {
			w.logger.Error("ES playlists delete_by_query failed",
				zap.Error(err),
				zap.String("user_id", userID.String()),
				zap.String("index", playlistsIdx),
				zap.Int("count", len(playlistIDs)),
			)
			if firstErr == nil {
				firstErr = err
			}
		}
	}

	return firstErr
}

// esDeleteDoc deletes a single document by ID. A 404 response is treated as success
// (the doc was already gone, which is the desired end state).
func (w *HardDeleteWorker) esDeleteDoc(ctx context.Context, index, docID string) error {
	req := esapi.DeleteRequest{
		Index:      index,
		DocumentID: docID,
		Refresh:    "true",
	}
	res, err := req.Do(ctx, w.esClient)
	if err != nil {
		return fmt.Errorf("es delete: %w", err)
	}
	defer res.Body.Close()

	if res.StatusCode == 404 {
		return nil
	}
	if res.IsError() {
		return fmt.Errorf("es delete status %d: %s", res.StatusCode, res.String())
	}
	return nil
}

// esDeleteByIDs runs a DeleteByQuery with a terms filter on _id. Safe for empty input
// (returns nil immediately).
func (w *HardDeleteWorker) esDeleteByIDs(ctx context.Context, index string, ids []string) error {
	if len(ids) == 0 {
		return nil
	}

	body := map[string]any{
		"query": map[string]any{
			"terms": map[string]any{
				"_id": ids,
			},
		},
	}
	buf, err := json.Marshal(body)
	if err != nil {
		return fmt.Errorf("marshal delete_by_query body: %w", err)
	}

	refresh := true
	req := esapi.DeleteByQueryRequest{
		Index:   []string{index},
		Body:    bytes.NewReader(buf),
		Refresh: &refresh,
	}
	res, err := req.Do(ctx, w.esClient)
	if err != nil {
		return fmt.Errorf("es delete_by_query: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return fmt.Errorf("es delete_by_query status %d: %s", res.StatusCode, res.String())
	}
	return nil
}

// esIndexNameFor mirrors the internal/elasticsearch indexName helper (private there).
func esIndexNameFor(prefix, name string) string {
	if prefix != "" {
		return prefix + "-" + name
	}
	return "veza-" + name
}

func uuidsToStrings(ids []uuid.UUID) []string {
	out := make([]string, len(ids))
	for i, id := range ids {
		out[i] = id.String()
	}
	return out
}
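// For reference, esDeleteByIDs serializes a request body of the form:
//
//	{"query": {"terms": {"_id": ["<uuid>", "<uuid>", ...]}}}
//
// Elasticsearch also accepts the equivalent ids query
// ({"query": {"ids": {"values": [...]}}}) for the same purpose; the _id field
// supports term-level queries, so either form works here.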