fix(backend): J4 — GDPR-compliant hard delete with Redis and ES cleanup

Closes TODO(HIGH-007). When the hard-delete worker anonymizes a user past
their recovery deadline, it now also cleans the user's residual data from
Redis and Elasticsearch, not just PostgreSQL. Without this, a user who
invoked their right to erasure could still appear in cached feed/profile
responses and in ES search results until the next reindex cycle.

Worker changes (internal/workers/hard_delete_worker.go):

  WithRedis / WithElasticsearch builder methods inject the clients. Both
  are optional: if either is nil (feature disabled or unreachable), the
  corresponding cleanup is skipped with a debug log and the worker keeps
  going. Partial progress beats panic.
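
  Wiring sketch (mirrors the cmd/api/main.go change further down; names are
  abbreviated from main.go, and either argument may be nil to disable that
  cleanup):

    worker := workers.NewHardDeleteWorker(db.GormDB, logger, 24*time.Hour).
        WithRedis(cfg.RedisClient).        // nil => Redis cleanup skipped
        WithElasticsearch(esClient)        // nil => ES cleanup skipped
    go worker.Start(ctx)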

  cleanRedisKeys uses SCAN with a cursor loop (COUNT 100), NEVER KEYS —
  KEYS would block the Redis server on multi-million-key deployments.
  The pattern is user:{id}:*. Transient SCAN errors are retried up to 3
  times with linear backoff (100ms * retry); persistent errors return
  without panicking. DEL errors on a batch are logged but non-fatal, so
  subsequent batches are still attempted.
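
  Per batch, the loop amounts to roughly the following wire commands
  (illustrative; {id} is the user's UUID):

    SCAN <cursor> MATCH user:{id}:* COUNT 100
    DEL <keys returned by that SCAN round>

  repeated until SCAN reports cursor 0.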

  cleanESDocs hits three indices independently:
    - users index: DELETE doc by _id (the user UUID); 404 treated as
      success (already gone = desired state)
    - tracks index: DeleteByQuery with a terms filter on _id, using the
      list of track IDs collected from PostgreSQL BEFORE anonymization
    - playlists index: same pattern as tracks
  A failure on one index does not prevent the others from being tried;
  the first error is returned so the caller can log.
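
  For the tracks/playlists indices, each DeleteByQuery boils down to one
  request of the following shape (sketch; index names carry the configured
  prefix, shown here with the default "veza-"):

    POST /veza-tracks/_delete_by_query?refresh=true
    { "query": { "terms": { "_id": ["<track-id-1>", "<track-id-2>"] } } }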

  Track/playlist IDs are pre-collected (collectTrackIDs, collectPlaylistIDs)
  before the UPDATE anonymization runs. The anonymization does NOT cascade
  (there is no DELETE on users), so the tracks and playlists rows remain with
  their creator_id / user_id intact and resolvable at query time.

Wiring (cmd/api/main.go):

  The worker now receives cfg.RedisClient directly, and an optional ES
  client built from elasticsearch.LoadConfig() + NewClient. If ES is
  disabled or unreachable at startup, a warning is logged and the worker
  proceeds with Redis-only cleanup.

Tests (internal/workers/hard_delete_worker_test.go, +260 lines):

  Pure-function unit tests:
    - TestUUIDsToStrings
    - TestEsIndexNameFor
  Nil-client safety tests:
    - TestCleanRedisKeys_NilClientIsNoop
    - TestCleanESDocs_NilClientIsNoop
  ES mock-server tests (httptest.Server mimicking /_doc and
  /_delete_by_query endpoints with valid ES 8.11 responses):
    - TestCleanESDocs_CallsAllThreeIndices — verifies the three expected
      HTTP calls land with the right paths and request bodies containing
      the provided UUIDs
    - TestCleanESDocs_SkipsEmptyIDLists — verifies no DeleteByQuery is
      issued when the ID lists are empty
  Redis testcontainer integration test (gated by VEZA_SKIP_INTEGRATION):
    - TestCleanRedisKeys_Integration — seeds 154 keys (4 fixed + 150 bulk
      to force the SCAN loop past a single batch) plus 4 unrelated keys
      from another user / global, runs cleanRedisKeys, asserts all 154
      own keys are gone and all 4 unrelated keys remain.

Verification:
  go build ./...                                                OK
  go vet ./...                                                  OK
  VEZA_SKIP_INTEGRATION=1 go test ./internal/workers/... -short  OK
  go test ./internal/workers/ -run TestCleanRedisKeys_Integration
    → testcontainers spins up redis:7-alpine, test passes in 1.34s

Out of J4 scope (noted for a follow-up):
  - No "activity" ES index exists in the codebase today (the audit plan
    mentioned it as a possible target). The three real indices with user
    data — users, tracks, playlists — are all now cleaned.
  - Track artist strings (free-form) may still contain the user's
    display name as a cached value in the tracks index after this
    cleanup. Actual user-owned tracks are deleted here, but if a third
    party's track referenced the removed user in its artist field, that
    reference is not touched. Strict GDPR handling of that edge case is a
    separate ticket.

Refs: AUDIT_REPORT.md §8.5, §10 P5, §12 item 1
senke 2026-04-15 12:25:39 +02:00
parent 67f18892af
commit 9cdfc6d898
3 changed files with 609 additions and 18 deletions

cmd/api/main.go

@@ -21,6 +21,7 @@ import (
"veza-backend-api/internal/api"
"veza-backend-api/internal/config"
"veza-backend-api/internal/core/marketplace"
vezaes "veza-backend-api/internal/elasticsearch"
"veza-backend-api/internal/metrics"
"veza-backend-api/internal/services"
"veza-backend-api/internal/shutdown"
@@ -218,10 +219,26 @@ func main() {
// v0.10.8 F065: Hard delete worker (GDPR - final anonymization after 30 days)
if os.Getenv("HARD_DELETE_CRON_ENABLED") != "false" {
// Optional ES client for the worker's GDPR cleanup (users/tracks/playlists indices).
// Non-fatal if ES is disabled or unreachable — the worker will just skip ES cleanup.
var hardDeleteESClient *vezaes.Client
if esCfg := vezaes.LoadConfig(); esCfg.Enabled {
if esc, esErr := vezaes.NewClient(esCfg, logger); esErr != nil {
logger.Warn("Elasticsearch unavailable for hard delete worker, ES cleanup disabled",
zap.Error(esErr))
} else {
hardDeleteESClient = esc
}
}
hardDeleteWorker := workers.NewHardDeleteWorker(db.GormDB, logger, 24*time.Hour).
WithRedis(cfg.RedisClient).
WithElasticsearch(hardDeleteESClient)
hardDeleteCtx, hardDeleteCancel := context.WithCancel(context.Background())
go hardDeleteWorker.Start(hardDeleteCtx)
logger.Info("Hard delete worker started (24h interval)")
logger.Info("Hard delete worker started (24h interval)",
zap.Bool("redis_cleanup", cfg.RedisClient != nil),
zap.Bool("es_cleanup", hardDeleteESClient != nil),
)
shutdownManager.Register(shutdown.NewShutdownFunc("hard_delete_worker", func(ctx context.Context) error {
hardDeleteWorker.Stop()

internal/workers/hard_delete_worker.go

@@ -1,25 +1,49 @@
package workers
import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gorm.io/gorm"
vezaes "veza-backend-api/internal/elasticsearch"
)
// HardDeleteWorker performs final GDPR cleanup for users past recovery deadline (v0.10.8 F065).
// Runs periodically to irreversibly anonymize accounts that were soft-deleted 30+ days ago.
//
// The worker cleans three stores:
// - PostgreSQL: anonymize users row, delete from user_profiles, user_sessions, user_settings,
// user_follows, notifications, and null user_id in audit_logs.
// - Redis (optional): all keys matching user:{id}:* using SCAN with a cursor — never KEYS,
// which would block the server on large keyspaces.
// - Elasticsearch (optional): the user doc from the users index, plus all track docs and
// playlist docs authored by the user (collected from PG before anonymization, then removed
// from ES via DeleteByQuery on the _id terms).
//
// Both Redis and Elasticsearch are optional. If either client is nil (feature disabled or
// unreachable at startup), the corresponding cleanup is skipped with a debug log and the
// worker continues. Transient Redis errors trigger a bounded retry loop; persistent errors
// are logged and the worker moves on to the next user (no silent panic).
type HardDeleteWorker struct {
db *gorm.DB
redisClient *redis.Client
esClient *vezaes.Client
logger *zap.Logger
interval time.Duration
stopChan chan struct{}
running bool
}
// NewHardDeleteWorker creates a new hard delete worker with only the required PostgreSQL
// dependency. Use WithRedis and WithElasticsearch to enable the corresponding cleanups.
func NewHardDeleteWorker(db *gorm.DB, logger *zap.Logger, interval time.Duration) *HardDeleteWorker {
if logger == nil {
logger = zap.NewNop()
@@ -36,14 +60,32 @@ func NewHardDeleteWorker(db *gorm.DB, logger *zap.Logger, interval time.Duration
}
}
// WithRedis enables Redis cache key cleanup for the user:{id}:* pattern.
// Pass nil to explicitly disable; pass a live *redis.Client to enable.
func (w *HardDeleteWorker) WithRedis(client *redis.Client) *HardDeleteWorker {
w.redisClient = client
return w
}
// WithElasticsearch enables Elasticsearch document cleanup. The veza wrapper is expected
// (it carries the index prefix via Config). Pass nil to explicitly disable.
func (w *HardDeleteWorker) WithElasticsearch(client *vezaes.Client) *HardDeleteWorker {
w.esClient = client
return w
}
// Start runs the worker periodically.
func (w *HardDeleteWorker) Start(ctx context.Context) {
if w.running {
w.logger.Warn("Hard delete worker is already running")
return
}
w.running = true
w.logger.Info("Starting hard delete worker", zap.Duration("interval", w.interval))
w.logger.Info("Starting hard delete worker",
zap.Duration("interval", w.interval),
zap.Bool("redis_enabled", w.redisClient != nil),
zap.Bool("elasticsearch_enabled", w.esClient != nil),
)
go w.runOnce(ctx)
@@ -66,7 +108,7 @@ func (w *HardDeleteWorker) Start(ctx context.Context) {
}
}
// Stop stops the worker.
func (w *HardDeleteWorker) Stop() {
if !w.running {
return
@@ -74,7 +116,7 @@ func (w *HardDeleteWorker) Stop() {
close(w.stopChan)
}
// runOnce executes one pass of hard delete / final anonymization.
func (w *HardDeleteWorker) runOnce(ctx context.Context) {
logger := w.logger.With(zap.String("worker", "hard_delete"))
runCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
@@ -97,7 +139,13 @@ func (w *HardDeleteWorker) runOnce(ctx context.Context) {
logger.Info("Processing hard delete", zap.Int("count", len(userIDs)))
for _, id := range userIDs {
// Collect ES doc IDs BEFORE PG anonymization, while creator/user relations
// are still resolvable. The anonymization UPDATE does not cascade, so the
// tracks and playlists rows still exist afterwards — we just need their IDs.
trackIDs := w.collectTrackIDs(runCtx, logger, id)
playlistIDs := w.collectPlaylistIDs(runCtx, logger, id)
// Final anonymization pass (ORIGIN §19.3) — ensure no PII remains in users row.
res := w.db.WithContext(runCtx).Exec(`
UPDATE users SET
password_hash = '',
@@ -124,9 +172,275 @@ func (w *HardDeleteWorker) runOnce(ctx context.Context) {
w.db.WithContext(runCtx).Exec("DELETE FROM user_follows WHERE follower_id = ? OR following_id = ?", id, id)
w.db.WithContext(runCtx).Exec("DELETE FROM notifications WHERE user_id = ? OR actor_id = ?", id, id)
w.db.WithContext(runCtx).Exec("UPDATE audit_logs SET user_id = NULL, ip_address = NULL WHERE user_id = ?", id)
// Redis cache cleanup — user:{id}:* keys.
if deleted, rerr := w.cleanRedisKeys(runCtx, id); rerr != nil {
logger.Error("Redis cleanup failed",
zap.Error(rerr),
zap.String("user_id", id.String()),
)
// Non-fatal: continue to ES so we still make partial progress.
} else if deleted > 0 {
logger.Info("Redis cleanup complete",
zap.String("user_id", id.String()),
zap.Int("deleted", deleted),
)
}
// Elasticsearch doc cleanup — user + owned tracks + owned playlists.
if eerr := w.cleanESDocs(runCtx, id, trackIDs, playlistIDs); eerr != nil {
logger.Error("Elasticsearch cleanup failed",
zap.Error(eerr),
zap.String("user_id", id.String()),
)
}
logger.Info("Hard delete completed",
zap.String("user_id", id.String()),
zap.Int("tracks_removed_from_es", len(trackIDs)),
zap.Int("playlists_removed_from_es", len(playlistIDs)),
)
}
}
// collectTrackIDs returns the UUIDs of tracks whose creator_id is the given user.
// creator_id is the canonical ownership column; user_id is a denormalized legacy alias.
func (w *HardDeleteWorker) collectTrackIDs(ctx context.Context, logger *zap.Logger, userID uuid.UUID) []uuid.UUID {
var ids []uuid.UUID
if err := w.db.WithContext(ctx).Raw(
`SELECT id FROM tracks WHERE creator_id = ?`, userID,
).Scan(&ids).Error; err != nil {
logger.Warn("Failed to collect track IDs for ES cleanup",
zap.Error(err), zap.String("user_id", userID.String()))
return nil
}
return ids
}
// collectPlaylistIDs returns the UUIDs of playlists owned by the user.
func (w *HardDeleteWorker) collectPlaylistIDs(ctx context.Context, logger *zap.Logger, userID uuid.UUID) []uuid.UUID {
var ids []uuid.UUID
if err := w.db.WithContext(ctx).Raw(
`SELECT id FROM playlists WHERE user_id = ?`, userID,
).Scan(&ids).Error; err != nil {
logger.Warn("Failed to collect playlist IDs for ES cleanup",
zap.Error(err), zap.String("user_id", userID.String()))
return nil
}
return ids
}
// cleanRedisKeys removes all Redis keys matching user:{userID}:*.
//
// Uses SCAN with a cursor in a loop — NEVER KEYS, which would block the Redis server
// on keyspaces with millions of keys. COUNT hints at ~100 keys per round trip; Redis may
// return slightly fewer or more depending on internal bucket distribution.
//
// Transient Scan errors trigger up to 3 retries with linear backoff (100ms * retry). A persistent
// error returns immediately without panicking. DEL errors are logged but non-fatal —
// subsequent batches are still attempted so we don't leave the cache half-clean.
func (w *HardDeleteWorker) cleanRedisKeys(ctx context.Context, userID uuid.UUID) (int, error) {
if w.redisClient == nil {
w.logger.Debug("Redis cleanup skipped (no client)", zap.String("user_id", userID.String()))
return 0, nil
}
const (
batchSize = 100
maxRetries = 3
)
pattern := fmt.Sprintf("user:%s:*", userID.String())
var cursor uint64
totalDeleted := 0
for {
var (
keys []string
scanErr error
)
retries := 0
for {
keys, cursor, scanErr = w.redisClient.Scan(ctx, cursor, pattern, batchSize).Result()
if scanErr == nil {
break
}
retries++
w.logger.Warn("Redis SCAN failed, retrying",
zap.Error(scanErr),
zap.String("user_id", userID.String()),
zap.String("pattern", pattern),
zap.Int("retry", retries),
)
if retries >= maxRetries {
return totalDeleted, fmt.Errorf("redis SCAN failed after %d retries: %w", maxRetries, scanErr)
}
select {
case <-ctx.Done():
return totalDeleted, ctx.Err()
case <-time.After(time.Duration(retries) * 100 * time.Millisecond):
}
}
if len(keys) > 0 {
n, delErr := w.redisClient.Del(ctx, keys...).Result()
if delErr != nil {
w.logger.Error("Redis DEL failed for batch",
zap.Error(delErr),
zap.String("user_id", userID.String()),
zap.Int("batch_size", len(keys)),
)
// Non-fatal: continue scanning so later batches still get cleaned.
} else {
totalDeleted += int(n)
}
}
if cursor == 0 {
break
}
}
return totalDeleted, nil
}
// cleanESDocs removes user-bound documents from Elasticsearch.
//
// - users index: delete the user doc by its document ID (the user UUID).
// - tracks index: DeleteByQuery with a terms filter on _id for the collected track IDs.
// - playlists index: DeleteByQuery with a terms filter on _id for the collected playlist IDs.
//
// Each index is handled independently: a failure on one does not prevent the others
// from being attempted. The first encountered error is returned so the caller can log it,
// but partial progress is still made.
func (w *HardDeleteWorker) cleanESDocs(
ctx context.Context,
userID uuid.UUID,
trackIDs []uuid.UUID,
playlistIDs []uuid.UUID,
) error {
if w.esClient == nil {
w.logger.Debug("Elasticsearch cleanup skipped (no client)", zap.String("user_id", userID.String()))
return nil
}
prefix := w.esClient.Config.Index
var firstErr error
// 1. Users index — delete by document ID.
usersIdx := esIndexNameFor(prefix, vezaes.IdxUsers)
if err := w.esDeleteDoc(ctx, usersIdx, userID.String()); err != nil {
w.logger.Error("ES user doc delete failed",
zap.Error(err),
zap.String("user_id", userID.String()),
zap.String("index", usersIdx),
)
if firstErr == nil {
firstErr = err
}
}
// 2. Tracks index — batch delete by collected IDs.
if len(trackIDs) > 0 {
tracksIdx := esIndexNameFor(prefix, vezaes.IdxTracks)
if err := w.esDeleteByIDs(ctx, tracksIdx, uuidsToStrings(trackIDs)); err != nil {
w.logger.Error("ES tracks delete_by_query failed",
zap.Error(err),
zap.String("user_id", userID.String()),
zap.String("index", tracksIdx),
zap.Int("count", len(trackIDs)),
)
if firstErr == nil {
firstErr = err
}
}
}
// 3. Playlists index — batch delete by collected IDs.
if len(playlistIDs) > 0 {
playlistsIdx := esIndexNameFor(prefix, vezaes.IdxPlaylists)
if err := w.esDeleteByIDs(ctx, playlistsIdx, uuidsToStrings(playlistIDs)); err != nil {
w.logger.Error("ES playlists delete_by_query failed",
zap.Error(err),
zap.String("user_id", userID.String()),
zap.String("index", playlistsIdx),
zap.Int("count", len(playlistIDs)),
)
if firstErr == nil {
firstErr = err
}
}
}
return firstErr
}
// esDeleteDoc deletes a single document by ID. A 404 response is treated as success
// (the doc was already gone, which is the desired end state).
func (w *HardDeleteWorker) esDeleteDoc(ctx context.Context, index, docID string) error {
req := esapi.DeleteRequest{
Index: index,
DocumentID: docID,
Refresh: "true",
}
res, err := req.Do(ctx, w.esClient)
if err != nil {
return fmt.Errorf("es delete: %w", err)
}
defer res.Body.Close()
if res.StatusCode == 404 {
return nil
}
if res.IsError() {
return fmt.Errorf("es delete status %d: %s", res.StatusCode, res.String())
}
return nil
}
// esDeleteByIDs runs a DeleteByQuery with a terms filter on _id. Safe for empty input
// (returns nil immediately).
func (w *HardDeleteWorker) esDeleteByIDs(ctx context.Context, index string, ids []string) error {
if len(ids) == 0 {
return nil
}
body := map[string]any{
"query": map[string]any{
"terms": map[string]any{
"_id": ids,
},
},
}
buf, err := json.Marshal(body)
if err != nil {
return fmt.Errorf("marshal delete_by_query body: %w", err)
}
refresh := true
req := esapi.DeleteByQueryRequest{
Index: []string{index},
Body: bytes.NewReader(buf),
Refresh: &refresh,
}
res, err := req.Do(ctx, w.esClient)
if err != nil {
return fmt.Errorf("es delete_by_query: %w", err)
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("es delete_by_query status %d: %s", res.StatusCode, res.String())
}
return nil
}
// esIndexNameFor mirrors the internal/elasticsearch.indexName helper (private there).
func esIndexNameFor(prefix, name string) string {
if prefix != "" {
return prefix + "-" + name
}
return "veza-" + name
}
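// uuidsToStrings converts a slice of UUIDs to their canonical string representations.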
func uuidsToStrings(ids []uuid.UUID) []string {
out := make([]string, len(ids))
for i, id := range ids {
out[i] = id.String()
}
return out
}

internal/workers/hard_delete_worker_test.go

@@ -0,0 +1,260 @@
package workers
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
elasticsearch "github.com/elastic/go-elasticsearch/v8"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
vezaes "veza-backend-api/internal/elasticsearch"
"veza-backend-api/internal/testutils"
)
// ---------------------------------------------------------------------------
// Pure helpers
// ---------------------------------------------------------------------------
func TestUUIDsToStrings(t *testing.T) {
u1 := uuid.New()
u2 := uuid.New()
got := uuidsToStrings([]uuid.UUID{u1, u2})
require.Len(t, got, 2)
assert.Equal(t, u1.String(), got[0])
assert.Equal(t, u2.String(), got[1])
assert.Empty(t, uuidsToStrings(nil))
assert.Empty(t, uuidsToStrings([]uuid.UUID{}))
}
func TestEsIndexNameFor(t *testing.T) {
assert.Equal(t, "veza-users", esIndexNameFor("", "users"))
assert.Equal(t, "veza-tracks", esIndexNameFor("", "tracks"))
assert.Equal(t, "myprefix-users", esIndexNameFor("myprefix", "users"))
}
// ---------------------------------------------------------------------------
// Nil-client safety
// ---------------------------------------------------------------------------
func TestCleanRedisKeys_NilClientIsNoop(t *testing.T) {
w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour)
// redisClient stays nil — WithRedis never called
deleted, err := w.cleanRedisKeys(context.Background(), uuid.New())
require.NoError(t, err)
assert.Equal(t, 0, deleted)
}
func TestCleanESDocs_NilClientIsNoop(t *testing.T) {
w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour)
// esClient stays nil
err := w.cleanESDocs(context.Background(), uuid.New(), []uuid.UUID{uuid.New()}, []uuid.UUID{uuid.New()})
require.NoError(t, err)
}
// ---------------------------------------------------------------------------
// Redis integration test (gated by VEZA_SKIP_INTEGRATION)
// ---------------------------------------------------------------------------
func TestCleanRedisKeys_Integration(t *testing.T) {
testutils.SkipIfNoIntegration(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
client, err := testutils.GetTestRedisClient(ctx)
require.NoError(t, err, "redis testcontainer setup")
require.NotNil(t, client)
userID := uuid.New()
otherID := uuid.New()
// Seed a bunch of keys across two users + some unrelated keys.
ownKeys := []string{
fmt.Sprintf("user:%s:session", userID),
fmt.Sprintf("user:%s:profile", userID),
fmt.Sprintf("user:%s:feed:page:0", userID),
fmt.Sprintf("user:%s:feed:page:1", userID),
}
// Also seed a larger batch to exercise the SCAN loop (>100 keys).
for i := 0; i < 150; i++ {
ownKeys = append(ownKeys, fmt.Sprintf("user:%s:bulk:%d", userID, i))
}
unrelated := []string{
fmt.Sprintf("user:%s:session", otherID),
fmt.Sprintf("user:%s:profile", otherID),
"global:config",
"cache:homepage",
}
for _, k := range append(ownKeys, unrelated...) {
require.NoError(t, client.Set(ctx, k, "x", time.Minute).Err(), "seed key %s", k)
}
w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour).WithRedis(client)
deleted, err := w.cleanRedisKeys(ctx, userID)
require.NoError(t, err)
assert.GreaterOrEqual(t, deleted, len(ownKeys),
"expected at least %d keys deleted, got %d", len(ownKeys), deleted)
// All own keys must be gone.
for _, k := range ownKeys {
n, err := client.Exists(ctx, k).Result()
require.NoError(t, err)
assert.Equal(t, int64(0), n, "key %s should be gone", k)
}
// All unrelated keys must remain.
for _, k := range unrelated {
n, err := client.Exists(ctx, k).Result()
require.NoError(t, err)
assert.Equal(t, int64(1), n, "key %s should still exist", k)
}
// Cleanup the unrelated keys we seeded so the shared container stays tidy.
_ = client.Del(ctx, unrelated...).Err()
}
// ---------------------------------------------------------------------------
// Elasticsearch test using a mock HTTP server
// ---------------------------------------------------------------------------
// esMockRecord captures a received request for later assertion.
type esMockRecord struct {
Method string
Path string
Query string
Body string
}
func newESMockServer(t *testing.T) (*httptest.Server, *[]esMockRecord, *sync.Mutex) {
t.Helper()
var (
records []esMockRecord
mu sync.Mutex
)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
mu.Lock()
records = append(records, esMockRecord{
Method: r.Method,
Path: r.URL.Path,
Query: r.URL.RawQuery,
Body: string(body),
})
mu.Unlock()
// The go-elasticsearch v8 client may probe GET / (Info) and send HEAD requests for
// index existence checks. We answer minimal valid JSON and set the X-Elastic-Product
// header so the client accepts the responses.
switch {
case r.Method == http.MethodGet && r.URL.Path == "/":
w.Header().Set("X-Elastic-Product", "Elasticsearch")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"version":{"number":"8.11.0"},"tagline":"You Know, for Search"}`))
case r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/_doc/"):
// Single doc delete.
w.Header().Set("X-Elastic-Product", "Elasticsearch")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"_index":"x","_id":"y","result":"deleted","_shards":{"total":1,"successful":1,"failed":0}}`))
case r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/_delete_by_query"):
w.Header().Set("X-Elastic-Product", "Elasticsearch")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"took":1,"timed_out":false,"total":2,"deleted":2,"batches":1,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}`))
default:
// Fallback OK for any other request (e.g. internal health probes).
w.Header().Set("X-Elastic-Product", "Elasticsearch")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{}`))
}
}))
return srv, &records, &mu
}
// newMockVezaESClient builds a veza wrapper pointing at the mock server.
func newMockVezaESClient(t *testing.T, baseURL, indexPrefix string) *vezaes.Client {
t.Helper()
rawCfg := elasticsearch.Config{Addresses: []string{baseURL}}
raw, err := elasticsearch.NewClient(rawCfg)
require.NoError(t, err)
return &vezaes.Client{
Client: raw,
Config: vezaes.Config{URL: baseURL, Index: indexPrefix, Enabled: true},
Logger: zap.NewNop(),
}
}
func TestCleanESDocs_CallsAllThreeIndices(t *testing.T) {
srv, records, mu := newESMockServer(t)
defer srv.Close()
esClient := newMockVezaESClient(t, srv.URL, "myenv")
w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour).WithElasticsearch(esClient)
userID := uuid.New()
trackIDs := []uuid.UUID{uuid.New(), uuid.New()}
playlistIDs := []uuid.UUID{uuid.New()}
err := w.cleanESDocs(context.Background(), userID, trackIDs, playlistIDs)
require.NoError(t, err)
mu.Lock()
defer mu.Unlock()
// Inspect recorded requests.
var sawUserDelete, sawTracksDBQ, sawPlaylistsDBQ bool
for _, rec := range *records {
switch {
case rec.Method == http.MethodDelete &&
strings.HasPrefix(rec.Path, "/myenv-users/_doc/") &&
strings.HasSuffix(rec.Path, userID.String()):
sawUserDelete = true
case rec.Method == http.MethodPost &&
rec.Path == "/myenv-tracks/_delete_by_query":
sawTracksDBQ = true
// body must contain both track UUIDs
var parsed map[string]any
require.NoError(t, json.Unmarshal([]byte(rec.Body), &parsed))
assert.Contains(t, rec.Body, trackIDs[0].String())
assert.Contains(t, rec.Body, trackIDs[1].String())
case rec.Method == http.MethodPost &&
rec.Path == "/myenv-playlists/_delete_by_query":
sawPlaylistsDBQ = true
assert.Contains(t, rec.Body, playlistIDs[0].String())
}
}
assert.True(t, sawUserDelete, "expected DELETE /myenv-users/_doc/{userID}")
assert.True(t, sawTracksDBQ, "expected POST /myenv-tracks/_delete_by_query")
assert.True(t, sawPlaylistsDBQ, "expected POST /myenv-playlists/_delete_by_query")
}
func TestCleanESDocs_SkipsEmptyIDLists(t *testing.T) {
srv, records, mu := newESMockServer(t)
defer srv.Close()
esClient := newMockVezaESClient(t, srv.URL, "")
w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour).WithElasticsearch(esClient)
// No tracks, no playlists — only the user doc delete should happen.
err := w.cleanESDocs(context.Background(), uuid.New(), nil, nil)
require.NoError(t, err)
mu.Lock()
defer mu.Unlock()
var dbqCount int
for _, rec := range *records {
if rec.Method == http.MethodPost && strings.HasSuffix(rec.Path, "/_delete_by_query") {
dbqCount++
}
}
assert.Equal(t, 0, dbqCount, "no delete_by_query should have been issued")
}