veza/veza-backend-api/internal/monitoring/ledger_metrics_test.go
senke 94dfc80b73 feat(metrics): ledger-health gauges + alert rules — v1.0.7 item F
Five Prometheus gauges + reconciler metrics + Grafana dashboard +
three alert rules. Closes axis-1 P1.8 and adds observability for
item C's reconciler (user review: "F should include reconciler_*
metrics, otherwise tag is blind on the worker we just shipped").

Gauges (veza_ledger_, sampled every 60s; declaration sketch after this list):
  * orphan_refund_rows — THE canary. Pending refunds with empty
    hyperswitch_refund_id older than 5m = Phase 2 crash in
    RefundOrder. Alert: > 0 for 5m → page.
  * stuck_orders_pending — order pending > 30m with non-empty
    payment_id. Alert: > 0 for 10m → page.
  * stuck_refunds_pending — refund pending > 30m with hs_id.
  * failed_transfers_at_max_retry — permanently_failed rows.
  * reversal_pending_transfers — item B rows stuck > 30m.
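
A minimal sketch of how these five gauges could be declared. The metric
names and package-level identifiers come from this change and the tests
below; the promauto registration style and Help strings are assumptions,
not the shipped code:

package monitoring

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	// THE canary: pending refunds with an empty hyperswitch_refund_id
	// older than 5m (Phase 2 crash in RefundOrder).
	LedgerOrphanRefundRows = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "veza_ledger_orphan_refund_rows",
		Help: "Pending refunds with empty hyperswitch_refund_id older than 5m.",
	})
	LedgerStuckOrdersPending = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "veza_ledger_stuck_orders_pending",
		Help: "Orders pending > 30m with a non-empty payment_id.",
	})
	LedgerStuckRefundsPending = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "veza_ledger_stuck_refunds_pending",
		Help: "Refunds pending > 30m that already have a hyperswitch_refund_id.",
	})
	LedgerFailedTransfersAtMaxRetry = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "veza_ledger_failed_transfers_at_max_retry",
		Help: "Seller transfers in permanently_failed.",
	})
	LedgerReversalPendingTransfers = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "veza_ledger_reversal_pending_transfers",
		Help: "reversal_pending transfers (item B) stuck > 30m.",
	})
)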

Reconciler metrics (veza_reconciler_; sketch after this list):
  * actions_total{phase} — counter by phase.
  * orphan_refunds_total — two-phase-bug canary.
  * sweep_duration_seconds — exponential histogram.
  * last_run_timestamp — alert: stale > 2h → page (worker dead).
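
A sketch of the reconciler recorders; the shapes are inferred from
TestReconcilerRecorders below, while the histogram variable name and
bucket bounds are assumptions ("exponential histogram" per above):

package monitoring

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	ReconcilerActionsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "veza_reconciler_actions_total",
		Help: "Reconciler actions taken, labelled by sweep phase.",
	}, []string{"phase"})
	ReconcilerOrphanRefundsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "veza_reconciler_orphan_refunds_total",
		Help: "Orphan refunds repaired — the two-phase-bug canary.",
	})
	ReconcilerSweepDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "veza_reconciler_sweep_duration_seconds",
		Help:    "Wall time of a full reconciler sweep.",
		Buckets: prometheus.ExponentialBuckets(0.01, 2, 12), // assumed bounds
	})
	ReconcilerLastRunTimestamp = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "veza_reconciler_last_run_timestamp",
		Help: "Unix time of the last completed sweep; staleness > 2h pages.",
	})
)

func RecordReconcilerAction(phase string) {
	ReconcilerActionsTotal.WithLabelValues(phase).Inc()
}

func RecordReconcilerOrphanRefund() { ReconcilerOrphanRefundsTotal.Inc() }

// RecordReconcilerSweepDuration also stamps last_run_timestamp, which is
// why TestReconcilerRecorders asserts the gauge is non-zero afterwards.
func RecordReconcilerSweepDuration(d time.Duration) {
	ReconcilerSweepDuration.Observe(d.Seconds())
	ReconcilerLastRunTimestamp.Set(float64(time.Now().Unix()))
}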

Implementation notes (sampler-loop sketch after this list):
  * Sampler thresholds hardcoded to match reconciler defaults — if
    the two ever drift, the mismatch is intentionally allowed (an
    alert firing while the reconciler is already working the rows
    = correct behavior).
  * Query error sets gauge to -1 (sentinel for "sampler broken").
  * marketplace package routes through monitoring recorders so it
    doesn't import prometheus directly.
  * Sampler runs regardless of Hyperswitch enablement; gauges
    default 0 when pipeline idle.
  * Graceful shutdown wired in cmd/api/main.go.
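
A sketch of the sampler loop and the -1 convention; only
SampleLedgerHealth and the gauge names come from the shipped code —
RunLedgerHealthSampler and the setOrSentinel helper are hypothetical:

package monitoring

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
	"gorm.io/gorm"
)

// RunLedgerHealthSampler ticks every 60s regardless of Hyperswitch
// enablement and exits on ctx cancellation (the graceful-shutdown
// hook wired in cmd/api/main.go).
func RunLedgerHealthSampler(ctx context.Context, db *gorm.DB, log *zap.Logger) {
	ticker := time.NewTicker(60 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			SampleLedgerHealth(ctx, db, log)
		}
	}
}

// setOrSentinel (hypothetical) shows the -1 convention: a negative
// gauge means "the sampler is broken", never "the ledger is clean".
func setOrSentinel(g prometheus.Gauge, n int64, err error, log *zap.Logger) {
	if err != nil {
		log.Warn("ledger health query failed", zap.Error(err))
		g.Set(-1)
		return
	}
	g.Set(float64(n))
}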

Alert rules in config/alertmanager/ledger.yml with runbook
pointers + detailed descriptions — each alert explains WHAT
happened, WHY the reconciler may not resolve it, and WHERE to
look first.

Grafana dashboard config/grafana/dashboards/ledger-health.json —
top row = 5 stat panels (orphan first, color-coded red on > 0),
middle row = trend timeseries + reconciler action rate by phase,
bottom row = sweep duration p50/p95/p99 + seconds-since-last-tick
+ orphan cumulative.

Tests — 6 cases, all green (sqlite :memory:):
  * CountsStuckOrdersPending (includes the filter on
    non-empty payment_id)
  * StuckOrdersZeroWhenAllCompleted
  * CountsOrphanRefunds (THE canary)
  * CountsStuckRefundsWithHsID (gauge-orthogonality check)
  * CountsFailedAndReversalPendingTransfers
  * ReconcilerRecorders (counter + gauge shape)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 03:40:14 +02:00


package monitoring

import (
	"context"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)
// --- test DB helpers --------------------------------------------------------

// Minimal table schemas: we don't import marketplace here to avoid
// circular deps (monitoring is a leaf); we create just what the
// sampler queries.

type testOrder struct {
	ID                   uuid.UUID `gorm:"type:uuid;primaryKey"`
	Status               string
	HyperswitchPaymentID string
	CreatedAt            time.Time
}

func (testOrder) TableName() string { return "orders" }

type testRefund struct {
	ID                  uuid.UUID `gorm:"type:uuid;primaryKey"`
	Status              string
	HyperswitchRefundID string
	CreatedAt           time.Time
}

func (testRefund) TableName() string { return "refunds" }

type testSellerTransfer struct {
	ID        uuid.UUID `gorm:"type:uuid;primaryKey"`
	Status    string
	UpdatedAt time.Time
}

func (testSellerTransfer) TableName() string { return "seller_transfers" }
func setupSamplerTestDB(t *testing.T) *gorm.DB {
	t.Helper()
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	require.NoError(t, err)
	require.NoError(t, db.AutoMigrate(&testOrder{}, &testRefund{}, &testSellerTransfer{}))
	return db
}

// gaugeValue reads the raw value of a Prometheus gauge for assertion.
func gaugeValue(t *testing.T, g prometheus.Gauge) float64 {
	t.Helper()
	m := &dto.Metric{}
	require.NoError(t, g.Write(m))
	return m.GetGauge().GetValue()
}

// --- stuck_orders_pending gauge ---------------------------------------------

func TestSampler_CountsStuckOrdersPending(t *testing.T) {
	db := setupSamplerTestDB(t)
	now := time.Now()

	// 2 stuck (over 30m old) + 1 recent (under 30m) + 1 stuck but no payment_id.
	require.NoError(t, db.Create(&testOrder{
		ID: uuid.New(), Status: "pending", HyperswitchPaymentID: "pay_1",
		CreatedAt: now.Add(-1 * time.Hour),
	}).Error)
	require.NoError(t, db.Create(&testOrder{
		ID: uuid.New(), Status: "pending", HyperswitchPaymentID: "pay_2",
		CreatedAt: now.Add(-45 * time.Minute),
	}).Error)
	require.NoError(t, db.Create(&testOrder{
		ID: uuid.New(), Status: "pending", HyperswitchPaymentID: "pay_recent",
		CreatedAt: now.Add(-5 * time.Minute),
	}).Error)
	// Stuck BUT no payment_id → excluded (pre-PSP, not the gauge's concern).
	require.NoError(t, db.Create(&testOrder{
		ID: uuid.New(), Status: "pending", HyperswitchPaymentID: "",
		CreatedAt: now.Add(-1 * time.Hour),
	}).Error)

	SampleLedgerHealth(context.Background(), db, zap.NewNop())

	assert.Equal(t, 2.0, gaugeValue(t, LedgerStuckOrdersPending),
		"gauge must count only rows pending > 30m with non-empty payment_id")
}

func TestSampler_StuckOrdersZeroWhenAllCompleted(t *testing.T) {
	db := setupSamplerTestDB(t)
	require.NoError(t, db.Create(&testOrder{
		ID: uuid.New(), Status: "completed", HyperswitchPaymentID: "pay_done",
		CreatedAt: time.Now().Add(-1 * time.Hour),
	}).Error)

	SampleLedgerHealth(context.Background(), db, zap.NewNop())

	assert.Equal(t, 0.0, gaugeValue(t, LedgerStuckOrdersPending))
}

// --- orphan_refund_rows gauge (THE alert gauge) -----------------------------

func TestSampler_CountsOrphanRefunds(t *testing.T) {
	db := setupSamplerTestDB(t)
	now := time.Now()

	// Three orphans (empty hs_id, >5m old) + one recent orphan (should not count)
	// + one with an hs_id (should not count).
	for i := 0; i < 3; i++ {
		require.NoError(t, db.Create(&testRefund{
			ID: uuid.New(), Status: "pending", HyperswitchRefundID: "",
			CreatedAt: now.Add(-10 * time.Minute),
		}).Error)
	}
	require.NoError(t, db.Create(&testRefund{
		ID: uuid.New(), Status: "pending", HyperswitchRefundID: "",
		CreatedAt: now.Add(-2 * time.Minute), // too recent
	}).Error)
	require.NoError(t, db.Create(&testRefund{
		ID: uuid.New(), Status: "pending", HyperswitchRefundID: "ref_ok",
		CreatedAt: now.Add(-10 * time.Minute), // has hs_id → not orphan
	}).Error)

	SampleLedgerHealth(context.Background(), db, zap.NewNop())

	assert.Equal(t, 3.0, gaugeValue(t, LedgerOrphanRefundRows),
		"orphan gauge counts pending refunds with empty hs_id older than 5m — the two-phase-commit-bug canary")
}

// --- stuck_refunds_pending gauge --------------------------------------------

func TestSampler_CountsStuckRefundsWithHsID(t *testing.T) {
	db := setupSamplerTestDB(t)
	now := time.Now()
	require.NoError(t, db.Create(&testRefund{
		ID: uuid.New(), Status: "pending", HyperswitchRefundID: "ref_stuck",
		CreatedAt: now.Add(-45 * time.Minute),
	}).Error)
	// Orphan, counted by a different gauge, not this one:
	require.NoError(t, db.Create(&testRefund{
		ID: uuid.New(), Status: "pending", HyperswitchRefundID: "",
		CreatedAt: now.Add(-45 * time.Minute),
	}).Error)

	SampleLedgerHealth(context.Background(), db, zap.NewNop())

	assert.Equal(t, 1.0, gaugeValue(t, LedgerStuckRefundsPending))
	// And the orphan gauge catches the other one.
	assert.Equal(t, 1.0, gaugeValue(t, LedgerOrphanRefundRows))
}

// --- permanently_failed transfers + reversal_pending ------------------------

func TestSampler_CountsFailedAndReversalPendingTransfers(t *testing.T) {
	db := setupSamplerTestDB(t)

	// 2 permanently_failed, 1 completed (ignored by failed gauge),
	// 1 reversal_pending updated 45m ago, 1 reversal_pending recent (ignored).
	require.NoError(t, db.Create(&testSellerTransfer{
		ID: uuid.New(), Status: "permanently_failed", UpdatedAt: time.Now(),
	}).Error)
	require.NoError(t, db.Create(&testSellerTransfer{
		ID: uuid.New(), Status: "permanently_failed", UpdatedAt: time.Now().Add(-1 * time.Hour),
	}).Error)
	require.NoError(t, db.Create(&testSellerTransfer{
		ID: uuid.New(), Status: "completed", UpdatedAt: time.Now(),
	}).Error)
	require.NoError(t, db.Create(&testSellerTransfer{
		ID: uuid.New(), Status: "reversal_pending", UpdatedAt: time.Now().Add(-45 * time.Minute),
	}).Error)
	require.NoError(t, db.Create(&testSellerTransfer{
		ID: uuid.New(), Status: "reversal_pending", UpdatedAt: time.Now().Add(-5 * time.Minute),
	}).Error)

	SampleLedgerHealth(context.Background(), db, zap.NewNop())

	assert.Equal(t, 2.0, gaugeValue(t, LedgerFailedTransfersAtMaxRetry))
	assert.Equal(t, 1.0, gaugeValue(t, LedgerReversalPendingTransfers),
		"reversal_pending counts only rows that have been stuck past the threshold")
}

// --- reconciler counters are writable + readable ----------------------------

func TestReconcilerRecorders(t *testing.T) {
	// Capture starting values so this test doesn't assume a clean
	// slate — other tests in the package share the same process-global
	// Prometheus registry.
	counterValue := func(c prometheus.Counter) float64 {
		m := &dto.Metric{}
		require.NoError(t, c.Write(m))
		return m.GetCounter().GetValue()
	}
	actionsStart := counterValue(ReconcilerActionsTotal.WithLabelValues("stuck_orders"))
	orphanStart := counterValue(ReconcilerOrphanRefundsTotal)

	RecordReconcilerAction("stuck_orders")
	RecordReconcilerAction("stuck_orders")
	RecordReconcilerOrphanRefund()
	RecordReconcilerSweepDuration(500 * time.Millisecond)

	actionsAfter := counterValue(ReconcilerActionsTotal.WithLabelValues("stuck_orders"))
	orphanAfter := counterValue(ReconcilerOrphanRefundsTotal)
	lastRun := gaugeValue(t, ReconcilerLastRunTimestamp)

	assert.Equal(t, 2.0, actionsAfter-actionsStart)
	assert.Equal(t, 1.0, orphanAfter-orphanStart)
	assert.Greater(t, lastRun, float64(0), "RecordReconcilerSweepDuration must stamp last-run timestamp")
}