Closes TODO(HIGH-007). When the hard-delete worker anonymizes a user past
their recovery deadline, it now also cleans the user's residual data from
Redis and Elasticsearch, not just PostgreSQL. Without this, a user who
exercised their right to erasure would still appear in cached feed/profile
responses and in ES search results until the next reindex cycle.
Worker changes (internal/workers/hard_delete_worker.go):
WithRedis / WithElasticsearch builder methods inject the clients. Both
are optional: if either is nil (feature disabled or unreachable), the
corresponding cleanup is skipped with a debug log and the worker keeps
going. Partial progress beats panic.
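
For orientation, a minimal sketch of what that injection looks like, assuming a go-redis v9 client and the internal vezaes.Client wrapper; the struct fields shown here are illustrative and omit the worker's DB handle, interval and other state:

package workers

// Hedged sketch only: field names are assumptions and the real struct has
// more state (DB handle, poll interval, ...). go-redis v9 is assumed too.

import (
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"

	vezaes "veza-backend-api/internal/elasticsearch"
)

type HardDeleteWorker struct {
	logger      *zap.Logger
	redisClient *redis.Client  // nil => Redis cleanup is skipped
	esClient    *vezaes.Client // nil => Elasticsearch cleanup is skipped
}

// WithRedis injects the Redis client; callers may simply never call it.
func (w *HardDeleteWorker) WithRedis(c *redis.Client) *HardDeleteWorker {
	w.redisClient = c
	return w
}

// WithElasticsearch injects the ES client wrapper; same optional contract.
func (w *HardDeleteWorker) WithElasticsearch(c *vezaes.Client) *HardDeleteWorker {
	w.esClient = c
	return w
}
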
cleanRedisKeys uses SCAN with a cursor loop (COUNT 100), NEVER KEYS —
KEYS would block the Redis server on multi-million-key deployments.
The pattern is user:{id}:*. Transient SCAN errors are retried up to 3 times
with linear backoff (100ms * retry count); persistent errors are returned
without panicking.
DEL errors on a batch are logged but non-fatal so subsequent batches
are still attempted.
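
Continuing the sketch above (extra imports assumed: context, fmt, time, github.com/google/uuid), the loop is roughly the following; it is a reconstruction from this description, not the literal implementation:

// cleanRedisKeys: hedged sketch of the SCAN cursor walk described above.
func (w *HardDeleteWorker) cleanRedisKeys(ctx context.Context, userID uuid.UUID) (int, error) {
	if w.redisClient == nil {
		w.logger.Debug("redis cleanup skipped: no client configured")
		return 0, nil
	}
	pattern := fmt.Sprintf("user:%s:*", userID)
	var cursor uint64
	deleted := 0
	for {
		var (
			keys []string
			next uint64
			err  error
		)
		// Up to 3 attempts per SCAN page, linear backoff (100ms * retry count).
		for attempt := 1; attempt <= 3; attempt++ {
			keys, next, err = w.redisClient.Scan(ctx, cursor, pattern, 100).Result()
			if err == nil {
				break
			}
			if attempt < 3 {
				time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
			}
		}
		if err != nil {
			// Persistent SCAN failure: give up on Redis, report partial progress.
			return deleted, fmt.Errorf("scan %q: %w", pattern, err)
		}
		if len(keys) > 0 {
			// DEL failures are logged but non-fatal; keep walking the cursor.
			if delErr := w.redisClient.Del(ctx, keys...).Err(); delErr != nil {
				w.logger.Warn("redis DEL batch failed", zap.Error(delErr))
			} else {
				deleted += len(keys)
			}
		}
		cursor = next
		if cursor == 0 {
			return deleted, nil
		}
	}
}
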
cleanESDocs hits three indices independently:
- users index: DELETE doc by _id (the user UUID); 404 treated as
success (already gone = desired state)
- tracks index: DeleteByQuery with a terms filter on _id, using the
list of track IDs collected from PostgreSQL BEFORE anonymization
- playlists index: same pattern as tracks
A failure on one index does not prevent the others from being tried;
the first error is returned so the caller can log it.
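
As a sketch continuing the file above (extra imports assumed: bytes, encoding/json, net/http; esIndexNameFor and uuidsToStrings are the real helpers exercised by the tests below), the flow could look like:

// cleanESDocs: hedged sketch of the per-index cleanup (go-elasticsearch v8).
func (w *HardDeleteWorker) cleanESDocs(ctx context.Context, userID uuid.UUID, trackIDs, playlistIDs []uuid.UUID) error {
	if w.esClient == nil {
		w.logger.Debug("es cleanup skipped: no client configured")
		return nil
	}
	es := w.esClient.Client
	prefix := w.esClient.Config.Index
	var firstErr error

	// users index: delete the single doc by _id; a 404 means it is already gone.
	if res, err := es.Delete(esIndexNameFor(prefix, "users"), userID.String(), es.Delete.WithContext(ctx)); err != nil {
		firstErr = err
	} else {
		res.Body.Close()
		if res.IsError() && res.StatusCode != http.StatusNotFound {
			firstErr = fmt.Errorf("delete user doc: %s", res.Status())
		}
	}

	// tracks and playlists: delete_by_query with a terms filter on _id.
	for _, target := range []struct {
		index string
		ids   []uuid.UUID
	}{{"tracks", trackIDs}, {"playlists", playlistIDs}} {
		if len(target.ids) == 0 {
			continue // nothing owned in this index, skip the round trip
		}
		body, _ := json.Marshal(map[string]any{
			"query": map[string]any{"terms": map[string]any{"_id": uuidsToStrings(target.ids)}},
		})
		res, err := es.DeleteByQuery(
			[]string{esIndexNameFor(prefix, target.index)},
			bytes.NewReader(body),
			es.DeleteByQuery.WithContext(ctx),
		)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue // one index failing must not block the others
		}
		res.Body.Close()
		if res.IsError() && firstErr == nil {
			firstErr = fmt.Errorf("delete_by_query %s: %s", target.index, res.Status())
		}
	}
	return firstErr
}
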
Track/playlist IDs are pre-collected (collectTrackIDs, collectPlaylistIDs)
before the anonymizing UPDATE runs: the anonymization does NOT cascade (no
DELETE on users), so the tracks and playlists rows keep their creator_id /
user_id and remain resolvable at query time.
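
For illustration, a pre-collection helper can stay tiny; this version assumes a plain database/sql handle (import database/sql) and a tracks.creator_id column, both of which may differ from the real worker:

// collectTrackIDs: hedged sketch of the pre-anonymization ID collection.
// collectPlaylistIDs would follow the same shape against playlists.user_id.
func collectTrackIDs(ctx context.Context, db *sql.DB, userID uuid.UUID) ([]uuid.UUID, error) {
	rows, err := db.QueryContext(ctx, `SELECT id FROM tracks WHERE creator_id = $1`, userID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var ids []uuid.UUID
	for rows.Next() {
		var id uuid.UUID
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		ids = append(ids, id) // collected BEFORE the anonymizing UPDATE runs
	}
	return ids, rows.Err()
}
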
Wiring (cmd/api/main.go):
The worker now receives cfg.RedisClient directly, and an optional ES
client built from elasticsearch.LoadConfig() + NewClient. If ES is
disabled or unreachable at startup, the worker logs a warning and
proceeds with Redis-only cleanup.
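
In rough strokes (an excerpt-style sketch; the LoadConfig/NewClient signatures, the cfg fields, and the retention value are assumptions, not copied from the real main.go):

// Hedged excerpt in the spirit of cmd/api/main.go; "elasticsearch" here is
// the internal veza-backend-api/internal/elasticsearch package.
worker := workers.NewHardDeleteWorker(db, logger, 24*time.Hour).
	WithRedis(cfg.RedisClient)

esCfg := elasticsearch.LoadConfig()
esClient, err := elasticsearch.NewClient(esCfg, logger) // assumed signature
if err != nil || esClient == nil {
	logger.Warn("elasticsearch unavailable, hard-delete cleanup will be Redis-only", zap.Error(err))
} else {
	worker = worker.WithElasticsearch(esClient)
}
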
Tests (internal/workers/hard_delete_worker_test.go, +260 lines):
Pure-function unit tests:
- TestUUIDsToStrings
- TestEsIndexNameFor
Nil-client safety tests:
- TestCleanRedisKeys_NilClientIsNoop
- TestCleanESDocs_NilClientIsNoop
ES mock-server tests (httptest.Server mimicking /_doc and
/_delete_by_query endpoints with valid ES 8.11 responses):
- TestCleanESDocs_CallsAllThreeIndices — verifies the three expected
HTTP calls land with the right paths and request bodies containing
the provided UUIDs
- TestCleanESDocs_SkipsEmptyIDLists — verifies no DeleteByQuery is
issued when the ID lists are empty
Redis testcontainer integration test (gated by VEZA_SKIP_INTEGRATION):
- TestCleanRedisKeys_Integration — seeds 154 keys (4 fixed + 150 bulk
to force the SCAN loop past a single batch) plus 4 unrelated keys
from another user / global, runs cleanRedisKeys, asserts all 154
own keys are gone and all 4 unrelated keys remain.
Verification:
go build ./... OK
go vet ./... OK
VEZA_SKIP_INTEGRATION=1 go test ./internal/workers/... short OK
go test ./internal/workers/ -run TestCleanRedisKeys_Integration
→ testcontainers spins up redis:7-alpine, test passes in 1.34s
Out of J4 scope (noted for a follow-up):
- No "activity" ES index exists in the codebase today (the audit plan
mentioned it as a possible target). The three real indices with user
data — users, tracks, playlists — are all now cleaned.
- Track artist strings (free-form) may still contain the user's
display name as a cached value in the tracks index after this
cleanup. Actual user-owned tracks are deleted here, but if a third
party's track referenced the removed user in its artist field, that
reference is not touched. Strict GDPR compliance on that edge case is a
separate ticket.
Refs: AUDIT_REPORT.md §8.5, §10 P5, §12 item 1
internal/workers/hard_delete_worker_test.go (260 lines, Go):
package workers

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
	"testing"
	"time"

	elasticsearch "github.com/elastic/go-elasticsearch/v8"
	"github.com/google/uuid"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"

	vezaes "veza-backend-api/internal/elasticsearch"
	"veza-backend-api/internal/testutils"
)

// ---------------------------------------------------------------------------
// Pure helpers
// ---------------------------------------------------------------------------

func TestUUIDsToStrings(t *testing.T) {
	u1 := uuid.New()
	u2 := uuid.New()
	got := uuidsToStrings([]uuid.UUID{u1, u2})
	require.Len(t, got, 2)
	assert.Equal(t, u1.String(), got[0])
	assert.Equal(t, u2.String(), got[1])

	assert.Empty(t, uuidsToStrings(nil))
	assert.Empty(t, uuidsToStrings([]uuid.UUID{}))
}

func TestEsIndexNameFor(t *testing.T) {
	assert.Equal(t, "veza-users", esIndexNameFor("", "users"))
	assert.Equal(t, "veza-tracks", esIndexNameFor("", "tracks"))
	assert.Equal(t, "myprefix-users", esIndexNameFor("myprefix", "users"))
}

// ---------------------------------------------------------------------------
// Nil-client safety
// ---------------------------------------------------------------------------

func TestCleanRedisKeys_NilClientIsNoop(t *testing.T) {
	w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour)
	// redisClient stays nil — WithRedis never called
	deleted, err := w.cleanRedisKeys(context.Background(), uuid.New())
	require.NoError(t, err)
	assert.Equal(t, 0, deleted)
}

func TestCleanESDocs_NilClientIsNoop(t *testing.T) {
	w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour)
	// esClient stays nil
	err := w.cleanESDocs(context.Background(), uuid.New(), []uuid.UUID{uuid.New()}, []uuid.UUID{uuid.New()})
	require.NoError(t, err)
}

// ---------------------------------------------------------------------------
// Redis integration test (gated by VEZA_SKIP_INTEGRATION)
// ---------------------------------------------------------------------------

func TestCleanRedisKeys_Integration(t *testing.T) {
	testutils.SkipIfNoIntegration(t)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	client, err := testutils.GetTestRedisClient(ctx)
	require.NoError(t, err, "redis testcontainer setup")
	require.NotNil(t, client)

	userID := uuid.New()
	otherID := uuid.New()

	// Seed a bunch of keys across two users + some unrelated keys.
	ownKeys := []string{
		fmt.Sprintf("user:%s:session", userID),
		fmt.Sprintf("user:%s:profile", userID),
		fmt.Sprintf("user:%s:feed:page:0", userID),
		fmt.Sprintf("user:%s:feed:page:1", userID),
	}
	// Also seed a larger batch to exercise the SCAN loop (>100 keys).
	for i := 0; i < 150; i++ {
		ownKeys = append(ownKeys, fmt.Sprintf("user:%s:bulk:%d", userID, i))
	}
	unrelated := []string{
		fmt.Sprintf("user:%s:session", otherID),
		fmt.Sprintf("user:%s:profile", otherID),
		"global:config",
		"cache:homepage",
	}
	for _, k := range append(ownKeys, unrelated...) {
		require.NoError(t, client.Set(ctx, k, "x", time.Minute).Err(), "seed key %s", k)
	}

	w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour).WithRedis(client)
	deleted, err := w.cleanRedisKeys(ctx, userID)
	require.NoError(t, err)
	assert.GreaterOrEqual(t, deleted, len(ownKeys),
		"expected at least %d keys deleted, got %d", len(ownKeys), deleted)

	// All own keys must be gone.
	for _, k := range ownKeys {
		n, err := client.Exists(ctx, k).Result()
		require.NoError(t, err)
		assert.Equal(t, int64(0), n, "key %s should be gone", k)
	}
	// All unrelated keys must remain.
	for _, k := range unrelated {
		n, err := client.Exists(ctx, k).Result()
		require.NoError(t, err)
		assert.Equal(t, int64(1), n, "key %s should still exist", k)
	}

	// Cleanup the unrelated keys we seeded so the shared container stays tidy.
	_ = client.Del(ctx, unrelated...).Err()
}

// ---------------------------------------------------------------------------
// Elasticsearch test using a mock HTTP server
// ---------------------------------------------------------------------------

// esMockRecord captures a received request for later assertion.
type esMockRecord struct {
	Method string
	Path   string
	Query  string
	Body   string
}

func newESMockServer(t *testing.T) (*httptest.Server, *[]esMockRecord, *sync.Mutex) {
	t.Helper()
	var (
		records []esMockRecord
		mu      sync.Mutex
	)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, _ := io.ReadAll(r.Body)
		mu.Lock()
		records = append(records, esMockRecord{
			Method: r.Method,
			Path:   r.URL.Path,
			Query:  r.URL.RawQuery,
			Body:   string(body),
		})
		mu.Unlock()

		// The go-elasticsearch client first calls GET / (Info) during NewClient-less flows,
		// and also sends HEAD requests for index existence checks. We answer minimal valid JSON.
		switch {
		case r.Method == http.MethodGet && r.URL.Path == "/":
			w.Header().Set("X-Elastic-Product", "Elasticsearch")
			w.WriteHeader(http.StatusOK)
			_, _ = w.Write([]byte(`{"version":{"number":"8.11.0"},"tagline":"You Know, for Search"}`))
		case r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/_doc/"):
			// Single doc delete.
			w.Header().Set("X-Elastic-Product", "Elasticsearch")
			w.WriteHeader(http.StatusOK)
			_, _ = w.Write([]byte(`{"_index":"x","_id":"y","result":"deleted","_shards":{"total":1,"successful":1,"failed":0}}`))
		case r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/_delete_by_query"):
			w.Header().Set("X-Elastic-Product", "Elasticsearch")
			w.WriteHeader(http.StatusOK)
			_, _ = w.Write([]byte(`{"took":1,"timed_out":false,"total":2,"deleted":2,"batches":1,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}`))
		default:
			// Fallback OK for any other request (e.g. internal health probes).
			w.Header().Set("X-Elastic-Product", "Elasticsearch")
			w.WriteHeader(http.StatusOK)
			_, _ = w.Write([]byte(`{}`))
		}
	}))
	return srv, &records, &mu
}

// newMockVezaESClient builds a veza wrapper pointing at the mock server.
func newMockVezaESClient(t *testing.T, baseURL, indexPrefix string) *vezaes.Client {
	t.Helper()
	rawCfg := elasticsearch.Config{Addresses: []string{baseURL}}
	raw, err := elasticsearch.NewClient(rawCfg)
	require.NoError(t, err)
	return &vezaes.Client{
		Client: raw,
		Config: vezaes.Config{URL: baseURL, Index: indexPrefix, Enabled: true},
		Logger: zap.NewNop(),
	}
}

func TestCleanESDocs_CallsAllThreeIndices(t *testing.T) {
	srv, records, mu := newESMockServer(t)
	defer srv.Close()

	esClient := newMockVezaESClient(t, srv.URL, "myenv")
	w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour).WithElasticsearch(esClient)

	userID := uuid.New()
	trackIDs := []uuid.UUID{uuid.New(), uuid.New()}
	playlistIDs := []uuid.UUID{uuid.New()}

	err := w.cleanESDocs(context.Background(), userID, trackIDs, playlistIDs)
	require.NoError(t, err)

	mu.Lock()
	defer mu.Unlock()

	// Inspect recorded requests.
	var sawUserDelete, sawTracksDBQ, sawPlaylistsDBQ bool
	for _, rec := range *records {
		switch {
		case rec.Method == http.MethodDelete &&
			strings.HasPrefix(rec.Path, "/myenv-users/_doc/") &&
			strings.HasSuffix(rec.Path, userID.String()):
			sawUserDelete = true
		case rec.Method == http.MethodPost &&
			rec.Path == "/myenv-tracks/_delete_by_query":
			sawTracksDBQ = true
			// body must contain both track UUIDs
			var parsed map[string]any
			require.NoError(t, json.Unmarshal([]byte(rec.Body), &parsed))
			assert.Contains(t, rec.Body, trackIDs[0].String())
			assert.Contains(t, rec.Body, trackIDs[1].String())
		case rec.Method == http.MethodPost &&
			rec.Path == "/myenv-playlists/_delete_by_query":
			sawPlaylistsDBQ = true
			assert.Contains(t, rec.Body, playlistIDs[0].String())
		}
	}
	assert.True(t, sawUserDelete, "expected DELETE /myenv-users/_doc/{userID}")
	assert.True(t, sawTracksDBQ, "expected POST /myenv-tracks/_delete_by_query")
	assert.True(t, sawPlaylistsDBQ, "expected POST /myenv-playlists/_delete_by_query")
}

func TestCleanESDocs_SkipsEmptyIDLists(t *testing.T) {
	srv, records, mu := newESMockServer(t)
	defer srv.Close()

	esClient := newMockVezaESClient(t, srv.URL, "")
	w := NewHardDeleteWorker(nil, zap.NewNop(), time.Hour).WithElasticsearch(esClient)

	// No tracks, no playlists — only the user doc delete should happen.
	err := w.cleanESDocs(context.Background(), uuid.New(), nil, nil)
	require.NoError(t, err)

	mu.Lock()
	defer mu.Unlock()

	var dbqCount int
	for _, rec := range *records {
		if rec.Method == http.MethodPost && strings.HasSuffix(rec.Path, "/_delete_by_query") {
			dbqCount++
		}
	}
	assert.Equal(t, 0, dbqCount, "no delete_by_query should have been issued")
}