diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2ee972c7b..2cc496c80 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -38,6 +38,66 @@
 auto-reversed; the backfill CLI queries Stripe's transfers.List by
 metadata[order_id] to populate missing ids, acceptable to leave
 NULL per v107-plan.
+
+### Item F — ledger-health metrics
+
+Five Prometheus gauges expose money-movement pipeline state so ops
+dashboards + alert rules can spot a stall before a customer does.
+Paired with counter/histogram metrics for the item-C reconciler so
+the dashboard tells the whole story at a glance ("we have N stuck
+orders and the reconciler has resolved M of them today").
+
+Gauges (sampled every 60s via `ScheduleLedgerHealthSampler`):
+ * `veza_ledger_orphan_refund_rows` — THE alert gauge. Pending
+   refunds with empty hyperswitch_refund_id older than 5m.
+   Non-zero = Phase 2 crash in RefundOrder. Pages on > 0 for 5m.
+ * `veza_ledger_stuck_orders_pending` — orders pending > 30m
+   with non-empty payment_id (webhook never arrived). Pages on
+   > 0 for 10m.
+ * `veza_ledger_stuck_refunds_pending` — refunds with hs_id but
+   still pending > 30m.
+ * `veza_ledger_failed_transfers_at_max_retry` —
+   seller_transfers in permanently_failed.
+ * `veza_ledger_reversal_pending_transfers` — item B rows stuck
+   in reversal_pending > 30m (worker behind or Stripe down).
+
+Reconciler metrics (item F extends item C observability):
+ * `veza_reconciler_actions_total{phase}` — counter labelled by
+   phase (stuck_orders | stuck_refunds | orphan_refunds).
+ * `veza_reconciler_orphan_refunds_total` — dedicated counter for
+   the two-phase-commit-bug canary.
+ * `veza_reconciler_sweep_duration_seconds` — histogram with 10
+   exponential buckets (0.1s to ~100s).
+ * `veza_reconciler_last_run_timestamp` — unix ts of last tick.
+   Alert fires if `time() - ts > 7200` (2 * default
+   RECONCILE_INTERVAL).
+
+Sampler queries are all indexed on `status + created_at` (or
+`status + updated_at` for reversal_pending). Query errors set the
+gauge to -1 — a distinctive value dashboards filter on ("sampler
+broken, don't trust the number") instead of leaking a stale value.
+
+Alert rules in `config/alertmanager/ledger.yml`:
+ * `VezaOrphanRefundRows` — page on > 0 for 5m (two-phase bug)
+ * `VezaStuckOrdersPending` — page on > 0 for 10m (webhook
+   pipeline stuck)
+ * `VezaReconcilerStale` — page on last-run > 2h (worker dead,
+   stuck/orphan rows accumulating)
+
+Grafana dashboard `config/grafana/dashboards/ledger-health.json`:
+5 stat panels (top row) + stuck-state timeseries + reconciler
+action rate + sweep duration quantiles + seconds-since-last-tick
++ orphan refunds cumulative.
+
+Worker instrumentation: ReconcileHyperswitchWorker now emits
+RecordReconcilerAction / RecordReconcilerOrphanRefund /
+RecordReconcilerSweepDuration at the right points. Tests cover
+the sampler's count queries (5 cases, all branches) plus the
+recorder shape.
+
+Sampler wired in cmd/api/main.go with graceful shutdown; runs
+regardless of Hyperswitch enablement (gauges default to 0, which
+is the correct story for "Hyperswitch not configured").
+
 ### Item C — Hyperswitch reconciliation sweep
 
 New `ReconcileHyperswitchWorker` sweeps for pending orders and
diff --git a/config/alertmanager/ledger.yml b/config/alertmanager/ledger.yml
new file mode 100644
index 000000000..0f28ed657
--- /dev/null
+++ b/config/alertmanager/ledger.yml
@@ -0,0 +1,82 @@
+# Prometheus alert rules for VEZA ledger health (v1.0.7 item F).
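+#
+# To sanity-check rule syntax after editing (assuming promtool from
+# the Prometheus release is available on PATH):
+#
+#   promtool check rules config/alertmanager/ledger.yml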
+#
+# Loaded by Prometheus via its `rule_files:` directive — point your
+# prometheus.yml at this file (or a glob that covers it) and reload.
+#
+# Only two gauge-backed alerts for v1.0.7, intentionally (plus the
+# VezaReconcilerStale liveness check below). The other three ledger
+# gauges (stuck_refunds, failed_transfers, reversal_pending) are
+# visible on the dashboard and deserve *human* investigation when
+# they trend up, not a page that fires at 3am — they're symptoms of
+# slower PSP / Connect health issues, not data-integrity bugs.
+#
+# A non-zero orphan_refund_rows is different: it means a Phase 2
+# crash happened (between our DB commit and the Hyperswitch PSP
+# call) and the row sits with money on our side but no refund in
+# flight. That's a real two-phase-commit bug; the reconciler will
+# auto-fail it after 5m but ops needs to know WHY Phase 2 crashed.
+
+groups:
+  - name: veza_ledger_health
+    interval: 30s
+    rules:
+      - alert: VezaStuckOrdersPending
+        expr: veza_ledger_stuck_orders_pending > 0
+        for: 10m
+        labels:
+          severity: page
+          team: payments
+          runbook: "docs/runbooks/stuck-orders.md"
+        annotations:
+          summary: "{{ $value }} order(s) stuck in `pending` for >30m"
+          description: |
+            An order sat in status=pending for more than 30 minutes
+            with a non-empty hyperswitch_payment_id. This means we
+            opened the payment at Hyperswitch but never received the
+            terminal webhook. The ReconcileHyperswitchWorker should
+            resolve this automatically at its next tick (default 1h).
+            If the count keeps growing across ticks, the reconciler
+            itself is stuck — check veza_reconciler_last_run_timestamp.
+
+      - alert: VezaOrphanRefundRows
+        expr: veza_ledger_orphan_refund_rows > 0
+        for: 5m
+        labels:
+          severity: page
+          team: payments
+          runbook: "docs/runbooks/orphan-refunds.md"
+        annotations:
+          summary: "{{ $value }} orphan refund row(s) — Phase 2 crash"
+          description: |
+            A Refund row exists in 'pending' with no
+            hyperswitch_refund_id, older than 5 minutes. This is a
+            bug in the two-phase commit between our DB and
+            Hyperswitch: Phase 1 (create pending Refund row +
+            flip order to refund_pending) ran, Phase 2 (POST
+            /refunds at Hyperswitch) never did. The reconciler
+            will auto-fail the row at its next tick, but the ROOT
+            CAUSE of the Phase 2 crash must be investigated — this
+            indicates a panic, OOM, or network timeout in
+            RefundOrder. Check app logs for the affected refund_id /
+            timestamp and look for the crash signal.
+
+      # -- Reconciler liveness (item C self-monitoring) ---------
+      # Fires if the reconciler hasn't ticked within 2 intervals.
+      # RECONCILE_INTERVAL default is 1h, so 2h without a tick.
+      - alert: VezaReconcilerStale
+        expr: time() - veza_reconciler_last_run_timestamp > 7200
+        for: 5m
+        labels:
+          severity: page
+          team: payments
+          runbook: "docs/runbooks/reconciler-stale.md"
+        annotations:
+          summary: "Reconciliation worker has not run in >2h"
+          description: |
+            veza_reconciler_last_run_timestamp is stale by more than
+            2 * RECONCILE_INTERVAL (default 1h, so 2h threshold).
+            Either the worker's goroutine crashed or ctx was
+            cancelled without restart. Without the reconciler,
+            stuck orders + orphan refunds accumulate indefinitely.
+            Restart the backend; if it persists, check the logs
+            for 'ReconcileHyperswitchWorker stopped' or a panic
+            trace.
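+
+# Wiring sketch (illustrative; real prometheus.yml layouts vary by
+# deployment). Add this file to rule_files and reload:
+#
+#   rule_files:
+#     - "config/alertmanager/ledger.yml"
+#
+# Reload via SIGHUP, or POST /-/reload when Prometheus runs with
+# --web.enable-lifecycle.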
diff --git a/config/grafana/dashboards/ledger-health.json b/config/grafana/dashboards/ledger-health.json
new file mode 100644
index 000000000..889a9a9c3
--- /dev/null
+++ b/config/grafana/dashboards/ledger-health.json
@@ -0,0 +1,136 @@
+{
+  "title": "VEZA Ledger Health (v1.0.7)",
+  "description": "Five stuck-state gauges + reconciler action rate. The top row tells you 'is money stuck right now?', the bottom row tells you 'is the reconciler keeping up?'. Paired with alert rules in config/alertmanager/ledger.yml — orphan_refund_rows > 0 for 5m pages ops.",
+  "tags": ["veza", "ledger", "money-movement", "v1.0.7"],
+  "timezone": "browser",
+  "refresh": "1m",
+  "schemaVersion": 39,
+  "version": 1,
+  "panels": [
+    {
+      "id": 1,
+      "title": "Orphan refund rows (PAGE if > 0 for 5m)",
+      "type": "stat",
+      "gridPos": {"x": 0, "y": 0, "w": 5, "h": 5},
+      "targets": [{"expr": "veza_ledger_orphan_refund_rows", "refId": "A"}],
+      "fieldConfig": {
+        "defaults": {
+          "thresholds": {
+            "mode": "absolute",
+            "steps": [
+              {"color": "green", "value": null},
+              {"color": "red", "value": 1}
+            ]
+          }
+        }
+      }
+    },
+    {
+      "id": 2,
+      "title": "Stuck orders (pending > 30m)",
+      "type": "stat",
+      "gridPos": {"x": 5, "y": 0, "w": 5, "h": 5},
+      "targets": [{"expr": "veza_ledger_stuck_orders_pending", "refId": "A"}],
+      "fieldConfig": {
+        "defaults": {
+          "thresholds": {
+            "mode": "absolute",
+            "steps": [
+              {"color": "green", "value": null},
+              {"color": "yellow", "value": 1},
+              {"color": "red", "value": 5}
+            ]
+          }
+        }
+      }
+    },
+    {
+      "id": 3,
+      "title": "Stuck refunds (pending w/ PSP id > 30m)",
+      "type": "stat",
+      "gridPos": {"x": 10, "y": 0, "w": 5, "h": 5},
+      "targets": [{"expr": "veza_ledger_stuck_refunds_pending", "refId": "A"}]
+    },
+    {
+      "id": 4,
+      "title": "Reversal pending transfers (> 30m)",
+      "type": "stat",
+      "gridPos": {"x": 15, "y": 0, "w": 5, "h": 5},
+      "targets": [{"expr": "veza_ledger_reversal_pending_transfers", "refId": "A"}]
+    },
+    {
+      "id": 5,
+      "title": "Permanently-failed transfers",
+      "type": "stat",
+      "gridPos": {"x": 20, "y": 0, "w": 4, "h": 5},
+      "targets": [{"expr": "veza_ledger_failed_transfers_at_max_retry", "refId": "A"}]
+    },
+    {
+      "id": 6,
+      "title": "Stuck-state trends (last 6h)",
+      "type": "timeseries",
+      "gridPos": {"x": 0, "y": 5, "w": 12, "h": 8},
+      "targets": [
+        {"expr": "veza_ledger_stuck_orders_pending", "refId": "A", "legendFormat": "stuck orders"},
+        {"expr": "veza_ledger_stuck_refunds_pending", "refId": "B", "legendFormat": "stuck refunds"},
+        {"expr": "veza_ledger_orphan_refund_rows", "refId": "C", "legendFormat": "orphan refunds"},
+        {"expr": "veza_ledger_reversal_pending_transfers", "refId": "D", "legendFormat": "reversal pending"},
+        {"expr": "veza_ledger_failed_transfers_at_max_retry", "refId": "E", "legendFormat": "permanently failed transfers"}
+      ]
+    },
+    {
+      "id": 7,
+      "title": "Reconciler actions (rate, by phase)",
+      "type": "timeseries",
+      "gridPos": {"x": 12, "y": 5, "w": 12, "h": 8},
+      "description": "Actions the reconciler took per phase. A healthy system shows occasional stuck_orders + stuck_refunds spikes (PSP hiccups) and near-zero orphan_refunds (Phase 2 crashes should be rare).",
+      "targets": [
+        {"expr": "rate(veza_reconciler_actions_total[5m])", "refId": "A", "legendFormat": "{{phase}}"}
+      ]
+    },
+    {
+      "id": 8,
+      "title": "Reconciler sweep duration",
+      "type": "timeseries",
+      "gridPos": {"x": 0, "y": 13, "w": 8, "h": 6},
+      "targets": [
+        {"expr": "histogram_quantile(0.5, rate(veza_reconciler_sweep_duration_seconds_bucket[5m]))", "refId": "A", "legendFormat": "p50"},
+        {"expr": "histogram_quantile(0.95, rate(veza_reconciler_sweep_duration_seconds_bucket[5m]))", "refId": "B", "legendFormat": "p95"},
+        {"expr": "histogram_quantile(0.99, rate(veza_reconciler_sweep_duration_seconds_bucket[5m]))", "refId": "C", "legendFormat": "p99"}
+      ]
+    },
+    {
+      "id": 9,
+      "title": "Seconds since last reconciler tick",
+      "type": "stat",
+      "gridPos": {"x": 8, "y": 13, "w": 8, "h": 6},
+      "description": "Alerts fire if this exceeds 2h (2 * RECONCILE_INTERVAL default). Sustained growth = worker is dead.",
+      "targets": [
+        {"expr": "time() - veza_reconciler_last_run_timestamp", "refId": "A"}
+      ],
+      "fieldConfig": {
+        "defaults": {
+          "unit": "s",
+          "thresholds": {
+            "mode": "absolute",
+            "steps": [
+              {"color": "green", "value": null},
+              {"color": "yellow", "value": 3600},
+              {"color": "red", "value": 7200}
+            ]
+          }
+        }
+      }
+    },
+    {
+      "id": 10,
+      "title": "Orphan refunds auto-failed (cumulative)",
+      "type": "timeseries",
+      "gridPos": {"x": 16, "y": 13, "w": 8, "h": 6},
+      "description": "Each increment = a Phase 2 crash the reconciler caught. Non-zero rate = investigate root cause.",
+      "targets": [
+        {"expr": "veza_reconciler_orphan_refunds_total", "refId": "A", "legendFormat": "total"}
+      ]
+    }
+  ]
+}
diff --git a/veza-backend-api/cmd/api/main.go b/veza-backend-api/cmd/api/main.go
index 790a5c280..243d47c8e 100644
--- a/veza-backend-api/cmd/api/main.go
+++ b/veza-backend-api/cmd/api/main.go
@@ -24,6 +24,7 @@ import (
 	vezaes "veza-backend-api/internal/elasticsearch"
 	"veza-backend-api/internal/jobs"
 	"veza-backend-api/internal/metrics"
+	"veza-backend-api/internal/monitoring"
 	"veza-backend-api/internal/services"
 	"veza-backend-api/internal/services/hyperswitch"
 	"veza-backend-api/internal/shutdown"
@@ -350,6 +351,16 @@ func main() {
 	// backlog doesn't lock the table.
 	jobs.ScheduleHyperswitchWebhookLogCleanup(db, logger, cfg.HyperswitchWebhookLogRetentionDays)
 
+	// v1.0.7 item F: 60s sampler feeds five ledger-health gauges +
+	// reconciler_* counters. Grafana dashboard in
+	// config/grafana/dashboards/ledger-health.json, alert rules in
+	// config/alertmanager/ledger.yml.
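+	//
+	// Illustrative /metrics exposition once the sampler has ticked
+	// (zeros are the healthy steady state):
+	//
+	//	veza_ledger_orphan_refund_rows 0
+	//	veza_ledger_stuck_orders_pending 0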
+	ledgerSamplerCtx, ledgerSamplerCancel := context.WithCancel(context.Background())
+	monitoring.ScheduleLedgerHealthSampler(ledgerSamplerCtx, db.GormDB, logger)
+	shutdownManager.Register(shutdown.NewShutdownFunc("ledger_health_sampler", func(ctx context.Context) error {
+		ledgerSamplerCancel()
+		return nil
+	}))
+
 	// HTTP server configuration
 	port := fmt.Sprintf("%d", cfg.AppPort)
 	if cfg.AppPort == 0 {
diff --git a/veza-backend-api/internal/core/marketplace/reconcile_hyperswitch.go b/veza-backend-api/internal/core/marketplace/reconcile_hyperswitch.go
index c7838bd5f..11f52471a 100644
--- a/veza-backend-api/internal/core/marketplace/reconcile_hyperswitch.go
+++ b/veza-backend-api/internal/core/marketplace/reconcile_hyperswitch.go
@@ -7,6 +7,8 @@ import (
 
 	"go.uber.org/zap"
 	"gorm.io/gorm"
+
+	"veza-backend-api/internal/monitoring"
 )
 
 // HyperswitchReadClient is the abstraction ReconcileHyperswitchWorker
@@ -142,9 +144,11 @@ func (w *ReconcileHyperswitchWorker) Start(ctx context.Context) {
 // during incident response). Three independent phases, each bounded
 // by batchLimit; failures in one phase don't stop the others.
 func (w *ReconcileHyperswitchWorker) RunOnce(ctx context.Context) {
+	start := time.Now()
 	w.reconcileStuckOrders(ctx)
 	w.reconcileOrphanRefunds(ctx)
 	w.reconcileStuckRefunds(ctx)
+	monitoring.RecordReconcilerSweepDuration(time.Since(start))
 }
 
 // reconcileStuckOrders finds orders in `pending` older than
@@ -188,6 +192,7 @@ func (w *ReconcileHyperswitchWorker) syncOrder(ctx context.Context, o *Order) {
 			zap.Error(err))
 		return
 	}
+	monitoring.RecordReconcilerAction("stuck_orders")
 	w.logger.Info("reconcile: stuck order synced via synthetic webhook",
 		zap.String("order_id", o.ID.String()),
 		zap.String("payment_id", o.HyperswitchPaymentID),
@@ -239,6 +244,8 @@ func (w *ReconcileHyperswitchWorker) reconcileOrphanRefunds(ctx context.Context)
 				zap.Error(err))
 			continue
 		}
+		monitoring.RecordReconcilerAction("orphan_refunds")
+		monitoring.RecordReconcilerOrphanRefund()
 		w.logger.Error("reconcile: ORPHAN REFUND auto-failed — operator attention needed",
 			zap.String("refund_id", r.ID.String()),
 			zap.String("order_id", r.OrderID.String()),
@@ -288,6 +295,7 @@ func (w *ReconcileHyperswitchWorker) syncRefund(ctx context.Context, r *Refund) {
 			zap.Error(err))
 		return
 	}
+	monitoring.RecordReconcilerAction("stuck_refunds")
 	w.logger.Info("reconcile: stuck refund synced via synthetic webhook",
 		zap.String("refund_id", r.ID.String()),
 		zap.String("hyperswitch_refund_id", r.HyperswitchRefundID),
diff --git a/veza-backend-api/internal/monitoring/ledger_metrics.go b/veza-backend-api/internal/monitoring/ledger_metrics.go
new file mode 100644
index 000000000..b6361b6aa
--- /dev/null
+++ b/veza-backend-api/internal/monitoring/ledger_metrics.go
@@ -0,0 +1,225 @@
+package monitoring
+
+import (
+	"context"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"go.uber.org/zap"
+	"gorm.io/gorm"
+)
+
+// Ledger-health metrics (v1.0.7 item F, audit P1.8).
+//
+// Five gauges expose stuck-state counts so ops dashboards + alert
+// rules can spot a money-movement pipeline stall before a customer
+// does. Sampled every LedgerSamplerInterval via a 60s ticker that
+// runs a cheap indexed SELECT COUNT(*) per gauge. The query cost is
+// bounded: each clause filters on `status + created_at` (or
+// `status + updated_at` for reversal_pending), all indexed on the
+// relevant tables.
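+//
+// Supporting indexes the sampler assumes (illustrative DDL; the
+// index names here are made up, check the actual migrations):
+//
+//	CREATE INDEX idx_orders_status_created ON orders (status, created_at);
+//	CREATE INDEX idx_refunds_status_created ON refunds (status, created_at);
+//	CREATE INDEX idx_transfers_status_updated ON seller_transfers (status, updated_at);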
+//
+// Paired with reconciler_* counters so the dashboard can tell the
+// whole story at a glance: "we have N stuck orders and the
+// reconciler has resolved M of them today."
+//
+// Plus three alert rules in config/alertmanager/ledger.yml:
+//   * veza_ledger_stuck_orders_pending > 0 for 10m → page
+//   * veza_ledger_orphan_refund_rows > 0 for 5m → page (bug in
+//     two-phase commit between DB and PSP — immediate ops attention)
+//   * time() - veza_reconciler_last_run_timestamp > 7200 → page
+//     (reconciler liveness)
+var (
+	// LedgerStuckOrdersPending is the count of orders sitting in
+	// `pending` past the staleness threshold (30m by default). Should
+	// be 0 in steady state; non-zero means webhooks from the PSP
+	// stopped arriving or our endpoint is rejecting them.
+	LedgerStuckOrdersPending = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "veza_ledger_stuck_orders_pending",
+		Help: "Orders in 'pending' status older than the staleness threshold (30m). Non-zero triggers ops alert.",
+	})
+
+	// LedgerStuckRefundsPending is the count of refunds with a PSP id
+	// but still in `pending` past the threshold. Symptom: the refund
+	// was accepted by Hyperswitch but our webhook handler never
+	// received the terminal event.
+	LedgerStuckRefundsPending = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "veza_ledger_stuck_refunds_pending",
+		Help: "Refunds with hyperswitch_refund_id set but status still 'pending' older than 30m.",
+	})
+
+	// LedgerFailedTransfersAtMaxRetry is the count of seller_transfers
+	// that exhausted the retry worker's attempts. Non-zero = ops
+	// investigation required; the Stripe Connect side is stuck
+	// somehow.
+	LedgerFailedTransfersAtMaxRetry = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "veza_ledger_failed_transfers_at_max_retry",
+		Help: "seller_transfers with status='permanently_failed' — retry worker gave up.",
+	})
+
+	// LedgerOrphanRefundRows is THE alert gauge. Non-zero means a
+	// Refund row exists in 'pending' with no hyperswitch_refund_id
+	// (past 5m) — i.e., Phase 1 ran, Phase 2 crashed. This is a
+	// two-phase-commit bug. Page on > 0 for 5m.
+	LedgerOrphanRefundRows = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "veza_ledger_orphan_refund_rows",
+		Help: "Refunds pending with empty hyperswitch_refund_id older than 5m. Non-zero = crash between Phase 1 and Phase 2 of RefundOrder, page ops immediately.",
+	})
+
+	// LedgerReversalPendingTransfers tracks rows waiting for the
+	// Stripe reversal worker (v1.0.7 item B). Non-zero during a
+	// reversal is normal and transient; sustained > 0 means the
+	// reversal worker is stuck or Stripe Connect is down.
+	LedgerReversalPendingTransfers = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "veza_ledger_reversal_pending_transfers",
+		Help: "seller_transfers in 'reversal_pending' older than 30m — reversal worker is behind.",
+	})
+
+	// --- Reconciler metrics (v1.0.7 item F, covers item C) ---
+
+	// ReconcilerActionsTotal counts actions the reconciliation worker
+	// has taken, labelled by phase. Lets dashboards show "reconciler
+	// fixed N stuck orders this week" vs "orphan refunds auto-failed
+	// this week" without re-parsing logs.
+	ReconcilerActionsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "veza_reconciler_actions_total",
+		Help: "Reconciler actions taken, by phase (stuck_orders|stuck_refunds|orphan_refunds).",
+	}, []string{"phase"})
+
+	// ReconcilerOrphanRefundsTotal is the load-bearing counter for
+	// detecting two-phase-commit bugs. Each orphan refund the
+	// reconciler auto-fails increments this; a sustained non-zero
+	// rate is the ledger-health equivalent of a fire alarm.
+	ReconcilerOrphanRefundsTotal = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "veza_reconciler_orphan_refunds_total",
+		Help: "Orphan refunds (Phase 2 crash) auto-failed by the reconciler. Non-zero rate = investigate root cause.",
+	})
+
+	// ReconcilerSweepDurationSeconds measures one RunOnce tick so
+	// slow sweeps show up in alerting.
+	ReconcilerSweepDurationSeconds = promauto.NewHistogram(prometheus.HistogramOpts{
+		Name:    "veza_reconciler_sweep_duration_seconds",
+		Help:    "Duration of one ReconcileHyperswitchWorker.RunOnce tick in seconds.",
+		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), // 0.1s to ~100s
+	})
+
+	// ReconcilerLastRunTimestamp is set to time.Now().Unix() at the
+	// end of every RunOnce. Alert rule: `time() - timestamp >
+	// 2 * RECONCILE_INTERVAL` → worker is dead.
+	ReconcilerLastRunTimestamp = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "veza_reconciler_last_run_timestamp",
+		Help: "Unix timestamp of the last successful ReconcileHyperswitchWorker tick. Stale = worker dead.",
+	})
+)
+
+// Convenience recorders for the reconciler worker. Called from
+// internal/core/marketplace/reconcile_hyperswitch.go so that package
+// doesn't import Prometheus directly (keeps marketplace clean of
+// observability plumbing).
+func RecordReconcilerAction(phase string) {
+	ReconcilerActionsTotal.WithLabelValues(phase).Inc()
+}
+
+func RecordReconcilerOrphanRefund() {
+	ReconcilerOrphanRefundsTotal.Inc()
+}
+
+func RecordReconcilerSweepDuration(d time.Duration) {
+	ReconcilerSweepDurationSeconds.Observe(d.Seconds())
+	ReconcilerLastRunTimestamp.Set(float64(time.Now().Unix()))
+}
+
+// --- Sampler ---
+
+// LedgerSamplerInterval is how often the sampler re-queries the DB.
+// 60s is the sweet spot for our volumes — query cost negligible, and
+// stale-by-up-to-a-minute is fine for the alert-rule windows (10m /
+// 5m).
+const LedgerSamplerInterval = 60 * time.Second
+
+// Staleness thresholds match the reconciler's defaults. If ops tunes
+// the reconciler thresholds via env vars, the sampler still reports
+// against these constants — intentional: the two serve different
+// audiences (reconciler = auto-recovery, sampler = human visibility).
+// A mismatch means alerts fire while the reconciler has already
+// started working on the issue, which is the correct behavior.
+const (
+	ledgerStuckOrderAgeThreshold   = 30 * time.Minute
+	ledgerStuckRefundAgeThreshold  = 30 * time.Minute
+	ledgerOrphanRefundAgeThreshold = 5 * time.Minute
+	ledgerReversalPendingThreshold = 30 * time.Minute
+)
+
+// SampleLedgerHealth runs the five count queries and updates the
+// gauges. Safe to call concurrently (gauge writes are atomic). Any
+// query error sets the corresponding gauge to -1 — a distinctive
+// value that dashboards can filter on ("sampler is broken, don't
+// trust the number").
+//
+// Exposed as a function rather than a method so tests can drive it
+// directly against a sqlite in-memory DB.
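+//
+// Minimal usage sketch (mirrors the package tests):
+//
+//	db, _ := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
+//	SampleLedgerHealth(context.Background(), db, zap.NewNop())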
+func SampleLedgerHealth(ctx context.Context, db *gorm.DB, logger *zap.Logger) {
+	now := time.Now()
+
+	sample := func(name string, gauge prometheus.Gauge, query string, args ...interface{}) {
+		var count int64
+		if err := db.WithContext(ctx).Raw(query, args...).Scan(&count).Error; err != nil {
+			logger.Error("ledger sampler: query failed",
+				zap.String("gauge", name),
+				zap.Error(err))
+			gauge.Set(-1)
+			return
+		}
+		gauge.Set(float64(count))
+	}
+
+	sample("stuck_orders_pending", LedgerStuckOrdersPending,
+		`SELECT COUNT(*) FROM orders
+		 WHERE status = 'pending'
+		   AND hyperswitch_payment_id IS NOT NULL AND hyperswitch_payment_id <> ''
+		   AND created_at < ?`,
+		now.Add(-ledgerStuckOrderAgeThreshold))
+
+	sample("stuck_refunds_pending", LedgerStuckRefundsPending,
+		`SELECT COUNT(*) FROM refunds
+		 WHERE status = 'pending'
+		   AND hyperswitch_refund_id IS NOT NULL AND hyperswitch_refund_id <> ''
+		   AND created_at < ?`,
+		now.Add(-ledgerStuckRefundAgeThreshold))
+
+	sample("failed_transfers_at_max_retry", LedgerFailedTransfersAtMaxRetry,
+		`SELECT COUNT(*) FROM seller_transfers WHERE status = 'permanently_failed'`)
+
+	sample("orphan_refund_rows", LedgerOrphanRefundRows,
+		`SELECT COUNT(*) FROM refunds
+		 WHERE status = 'pending'
+		   AND (hyperswitch_refund_id IS NULL OR hyperswitch_refund_id = '')
+		   AND created_at < ?`,
+		now.Add(-ledgerOrphanRefundAgeThreshold))
+
+	sample("reversal_pending_transfers", LedgerReversalPendingTransfers,
+		`SELECT COUNT(*) FROM seller_transfers
+		 WHERE status = 'reversal_pending'
+		   AND updated_at < ?`,
+		now.Add(-ledgerReversalPendingThreshold))
+}
+
+// ScheduleLedgerHealthSampler runs SampleLedgerHealth once at startup
+// (so dashboards aren't blank for the first minute) and then every
+// LedgerSamplerInterval until ctx is cancelled.
+func ScheduleLedgerHealthSampler(ctx context.Context, db *gorm.DB, logger *zap.Logger) {
+	ticker := time.NewTicker(LedgerSamplerInterval)
+	go func() {
+		defer ticker.Stop()
+		SampleLedgerHealth(ctx, db, logger)
+		for {
+			select {
+			case <-ctx.Done():
+				logger.Info("Ledger health sampler stopped")
+				return
+			case <-ticker.C:
+				SampleLedgerHealth(ctx, db, logger)
+			}
+		}
+	}()
+	logger.Info("Ledger health sampler scheduled",
+		zap.Duration("interval", LedgerSamplerInterval))
+}
diff --git a/veza-backend-api/internal/monitoring/ledger_metrics_test.go b/veza-backend-api/internal/monitoring/ledger_metrics_test.go
new file mode 100644
index 000000000..fbe557ce0
--- /dev/null
+++ b/veza-backend-api/internal/monitoring/ledger_metrics_test.go
@@ -0,0 +1,222 @@
+package monitoring
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+	"gorm.io/driver/sqlite"
+	"gorm.io/gorm"
+)
+
+// --- test DB helpers --------------------------------------------------------
+
+// minimal table schemas: we don't import marketplace here to avoid
+// circular deps (monitoring is a leaf); we create just what the
+// sampler queries.
+type testOrder struct {
+	ID                   uuid.UUID `gorm:"type:uuid;primaryKey"`
+	Status               string
+	HyperswitchPaymentID string
+	CreatedAt            time.Time
+}
+
+func (testOrder) TableName() string { return "orders" }
+
+type testRefund struct {
+	ID                  uuid.UUID `gorm:"type:uuid;primaryKey"`
+	Status              string
+	HyperswitchRefundID string
+	CreatedAt           time.Time
+}
+
+func (testRefund) TableName() string { return "refunds" }
+
+type testSellerTransfer struct {
+	ID        uuid.UUID `gorm:"type:uuid;primaryKey"`
+	Status    string
+	UpdatedAt time.Time
+}
+
+func (testSellerTransfer) TableName() string { return "seller_transfers" }
+
+func setupSamplerTestDB(t *testing.T) *gorm.DB {
+	t.Helper()
+	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
+	require.NoError(t, err)
+	require.NoError(t, db.AutoMigrate(&testOrder{}, &testRefund{}, &testSellerTransfer{}))
+	return db
+}
+
+// gaugeValue reads the raw value of a Prometheus gauge for assertion.
+func gaugeValue(t *testing.T, g prometheus.Gauge) float64 {
+	t.Helper()
+	m := &dto.Metric{}
+	require.NoError(t, g.Write(m))
+	return m.GetGauge().GetValue()
+}
+
+// --- stuck_orders_pending gauge ---------------------------------------------
+
+func TestSampler_CountsStuckOrdersPending(t *testing.T) {
+	db := setupSamplerTestDB(t)
+	now := time.Now()
+
+	// 2 stuck (over 30m old) + 1 recent (under 30m) + 1 stuck but no payment_id.
+	require.NoError(t, db.Create(&testOrder{
+		ID: uuid.New(), Status: "pending", HyperswitchPaymentID: "pay_1",
+		CreatedAt: now.Add(-1 * time.Hour),
+	}).Error)
+	require.NoError(t, db.Create(&testOrder{
+		ID: uuid.New(), Status: "pending", HyperswitchPaymentID: "pay_2",
+		CreatedAt: now.Add(-45 * time.Minute),
+	}).Error)
+	require.NoError(t, db.Create(&testOrder{
+		ID: uuid.New(), Status: "pending", HyperswitchPaymentID: "pay_recent",
+		CreatedAt: now.Add(-5 * time.Minute),
+	}).Error)
+	// Stuck BUT no payment_id → excluded (pre-PSP, not the gauge's concern).
+	require.NoError(t, db.Create(&testOrder{
+		ID: uuid.New(), Status: "pending", HyperswitchPaymentID: "",
+		CreatedAt: now.Add(-1 * time.Hour),
+	}).Error)
+
+	SampleLedgerHealth(context.Background(), db, zap.NewNop())
+
+	assert.Equal(t, 2.0, gaugeValue(t, LedgerStuckOrdersPending),
+		"gauge must count only rows pending > 30m with non-empty payment_id")
+}
+
+func TestSampler_StuckOrdersZeroWhenAllCompleted(t *testing.T) {
+	db := setupSamplerTestDB(t)
+	require.NoError(t, db.Create(&testOrder{
+		ID: uuid.New(), Status: "completed", HyperswitchPaymentID: "pay_done",
+		CreatedAt: time.Now().Add(-1 * time.Hour),
+	}).Error)
+
+	SampleLedgerHealth(context.Background(), db, zap.NewNop())
+	assert.Equal(t, 0.0, gaugeValue(t, LedgerStuckOrdersPending))
+}
+
+// --- orphan_refund_rows gauge (THE alert gauge) -----------------------------
+
+func TestSampler_CountsOrphanRefunds(t *testing.T) {
+	db := setupSamplerTestDB(t)
+	now := time.Now()
+
+	// Three orphans (empty hs_id, >5m old) + one recent orphan (should not count)
+	// + one with an hs_id (should not count).
+	for i := 0; i < 3; i++ {
+		require.NoError(t, db.Create(&testRefund{
+			ID: uuid.New(), Status: "pending", HyperswitchRefundID: "",
+			CreatedAt: now.Add(-10 * time.Minute),
+		}).Error)
+	}
+	require.NoError(t, db.Create(&testRefund{
+		ID: uuid.New(), Status: "pending", HyperswitchRefundID: "",
+		CreatedAt: now.Add(-2 * time.Minute), // too recent
+	}).Error)
+	require.NoError(t, db.Create(&testRefund{
+		ID: uuid.New(), Status: "pending", HyperswitchRefundID: "ref_ok",
+		CreatedAt: now.Add(-10 * time.Minute), // has hs_id → not orphan
+	}).Error)
+
+	SampleLedgerHealth(context.Background(), db, zap.NewNop())
+	assert.Equal(t, 3.0, gaugeValue(t, LedgerOrphanRefundRows),
+		"orphan gauge counts pending refunds with empty hs_id older than 5m — the two-phase-commit-bug canary")
+}
+
+// --- stuck_refunds_pending gauge --------------------------------------------
+
+func TestSampler_CountsStuckRefundsWithHsID(t *testing.T) {
+	db := setupSamplerTestDB(t)
+	now := time.Now()
+
+	require.NoError(t, db.Create(&testRefund{
+		ID: uuid.New(), Status: "pending", HyperswitchRefundID: "ref_stuck",
+		CreatedAt: now.Add(-45 * time.Minute),
+	}).Error)
+	// Orphan, counted by a different gauge, not this one:
+	require.NoError(t, db.Create(&testRefund{
+		ID: uuid.New(), Status: "pending", HyperswitchRefundID: "",
+		CreatedAt: now.Add(-45 * time.Minute),
+	}).Error)
+
+	SampleLedgerHealth(context.Background(), db, zap.NewNop())
+	assert.Equal(t, 1.0, gaugeValue(t, LedgerStuckRefundsPending))
+	// And the orphan gauge catches the other one.
+	assert.Equal(t, 1.0, gaugeValue(t, LedgerOrphanRefundRows))
+}
+
+// --- permanently_failed transfers + reversal_pending ------------------------
+
+func TestSampler_CountsFailedAndReversalPendingTransfers(t *testing.T) {
+	db := setupSamplerTestDB(t)
+
+	// 2 permanently_failed, 1 completed (ignored by failed gauge),
+	// 1 reversal_pending updated 45m ago, 1 reversal_pending recent (ignored).
+	require.NoError(t, db.Create(&testSellerTransfer{
+		ID: uuid.New(), Status: "permanently_failed", UpdatedAt: time.Now(),
+	}).Error)
+	require.NoError(t, db.Create(&testSellerTransfer{
+		ID: uuid.New(), Status: "permanently_failed", UpdatedAt: time.Now().Add(-1 * time.Hour),
+	}).Error)
+	require.NoError(t, db.Create(&testSellerTransfer{
+		ID: uuid.New(), Status: "completed", UpdatedAt: time.Now(),
+	}).Error)
+	require.NoError(t, db.Create(&testSellerTransfer{
+		ID: uuid.New(), Status: "reversal_pending", UpdatedAt: time.Now().Add(-45 * time.Minute),
+	}).Error)
+	require.NoError(t, db.Create(&testSellerTransfer{
+		ID: uuid.New(), Status: "reversal_pending", UpdatedAt: time.Now().Add(-5 * time.Minute),
+	}).Error)
+
+	SampleLedgerHealth(context.Background(), db, zap.NewNop())
+	assert.Equal(t, 2.0, gaugeValue(t, LedgerFailedTransfersAtMaxRetry))
+	assert.Equal(t, 1.0, gaugeValue(t, LedgerReversalPendingTransfers),
+		"reversal_pending counts only rows that have been stuck past the threshold")
+}
+
+// --- reconciler counters are writable + readable ----------------------------
+
+func TestReconcilerRecorders(t *testing.T) {
+	// Capture starting values so repeated test runs don't assume a
+	// clean slate (Prometheus registries are global).
+	actionsStart := func() float64 {
+		m := &dto.Metric{}
+		require.NoError(t, ReconcilerActionsTotal.WithLabelValues("stuck_orders").Write(m))
+		return m.GetCounter().GetValue()
+	}()
+	orphanStart := func() float64 {
+		m := &dto.Metric{}
+		require.NoError(t, ReconcilerOrphanRefundsTotal.Write(m))
+		return m.GetCounter().GetValue()
+	}()
+
+	RecordReconcilerAction("stuck_orders")
+	RecordReconcilerAction("stuck_orders")
+	RecordReconcilerOrphanRefund()
+	RecordReconcilerSweepDuration(500 * time.Millisecond)
+
+	actionsAfter := func() float64 {
+		m := &dto.Metric{}
+		require.NoError(t, ReconcilerActionsTotal.WithLabelValues("stuck_orders").Write(m))
+		return m.GetCounter().GetValue()
+	}()
+	orphanAfter := func() float64 {
+		m := &dto.Metric{}
+		require.NoError(t, ReconcilerOrphanRefundsTotal.Write(m))
+		return m.GetCounter().GetValue()
+	}()
+	lastRun := gaugeValue(t, ReconcilerLastRunTimestamp)
+
+	assert.Equal(t, 2.0, actionsAfter-actionsStart)
+	assert.Equal(t, 1.0, orphanAfter-orphanStart)
+	assert.Greater(t, lastRun, float64(0), "RecordReconcilerSweepDuration must stamp last-run timestamp")
+}
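+
+// --- error path: gauges flip to the -1 sentinel ------------------------------
+
+// Sketch of the -1 sentinel path (an assumption of this example:
+// dropping the refunds table is a convenient way to force the count
+// query to fail under sqlite). On error the sampler must publish -1
+// rather than leak the previous value.
+func TestSampler_QueryErrorSetsSentinel(t *testing.T) {
+	db := setupSamplerTestDB(t)
+	require.NoError(t, db.Migrator().DropTable(&testRefund{}))
+
+	SampleLedgerHealth(context.Background(), db, zap.NewNop())
+
+	// Both refund-backed gauges hit the missing table and go to -1;
+	// the orders/seller_transfers gauges are unaffected.
+	assert.Equal(t, -1.0, gaugeValue(t, LedgerOrphanRefundRows))
+	assert.Equal(t, -1.0, gaugeValue(t, LedgerStuckRefundsPending))
+	assert.Equal(t, 0.0, gaugeValue(t, LedgerStuckOrdersPending))
+}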