|
| 1 | +package stripe |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + "math" |
| 7 | + "os" |
| 8 | + "time" |
| 9 | + |
| 10 | + stripesdk "github.com/stripe/stripe-go/v86" |
| 11 | + "github.com/unkeyed/unkey/pkg/billingperiod" |
| 12 | + "github.com/unkeyed/unkey/pkg/cli" |
| 13 | + "github.com/unkeyed/unkey/pkg/clickhouse" |
| 14 | + "github.com/unkeyed/unkey/pkg/tui" |
| 15 | +) |
| 16 | + |
| 17 | +// Worker unit conversions (svc/ctrl/worker/cron/deploybilling/billing.go): the |
| 18 | +// ClickHouse query reports memory/disk in GiB-hours and egress in bytes, while |
| 19 | +// the meters bill in GiB-seconds and GiB. Kept in sync by hand; the fixture |
| 20 | +// test TestMeterUsageHandComputedBill guards the same factors. |
| 21 | +const ( |
| 22 | + secondsPerHour = 3600.0 |
| 23 | + bytesPerGiB = float64(int64(1024 * 1024 * 1024)) |
| 24 | +) |
| 25 | + |
| 26 | +// relativeTolerance is the drift a meter may show before shadow flags it. The |
| 27 | +// hourly push can lag the live query by up to an hour, so a small difference |
| 28 | +// is usually that lag rather than a real problem. Set above the lag, below |
| 29 | +// any difference that would indicate the pipeline is dropping or double-counting. |
| 30 | +const relativeTolerance = 0.001 // 0.1% |
| 31 | + |
| 32 | +// shadowCmd recomputes a workspace's month-to-date Deploy usage from ClickHouse |
| 33 | +// and diffs it against the values Stripe holds for the customer's meters. Use it |
| 34 | +// before enabling billing to check the query, hourly push, and meters agree. |
| 35 | +var shadowCmd = &cli.Command{ |
| 36 | + Name: "shadow", |
| 37 | + Usage: "Diff a workspace's ClickHouse usage against Stripe's meter values", |
| 38 | + Flags: []cli.Flag{ |
| 39 | + keyFlag(), |
| 40 | + cli.String("clickhouse-url", "ClickHouse DSN", cli.EnvVar("CLICKHOUSE_URL"), cli.Required()), |
| 41 | + cli.String("workspace", "Workspace id (for the ClickHouse query)", cli.Required()), |
| 42 | + cli.String("customer", "Stripe customer id (cus_...)", cli.Required()), |
| 43 | + cli.String("month", "Billing month YYYY-MM (default: current)", cli.Default("")), |
| 44 | + }, |
| 45 | + Action: shadow, |
| 46 | +} |
| 47 | + |
| 48 | +// meterValues holds one workspace's usage in the exact units each Stripe meter |
| 49 | +// expects, keyed by the meter's event_name so it lines up with the summaries. |
| 50 | +type meterValues map[string]float64 |
| 51 | + |
| 52 | +func shadow(ctx context.Context, cmd *cli.Command) error { |
| 53 | + sc, err := newClient(cmd) |
| 54 | + if err != nil { |
| 55 | + return err |
| 56 | + } |
| 57 | + out := tui.New(os.Stdout) |
| 58 | + |
| 59 | + period, err := resolvePeriod(ctx, sc, cmd.String("month")) |
| 60 | + if err != nil { |
| 61 | + return err |
| 62 | + } |
| 63 | + start := period.Start() |
| 64 | + end := period.End() |
| 65 | + |
| 66 | + out.Printf("Shadow %s workspace=%s customer=%s\n", |
| 67 | + out.Bold(start.Format("2006-01")), cmd.RequireString("workspace"), cmd.RequireString("customer")) |
| 68 | + out.Println(out.Dim(fmt.Sprintf(" window %s -> %s", start.Format("2006-01-02"), end.Format("2006-01-02")))) |
| 69 | + |
| 70 | + chValues, err := clickhouseUsage(ctx, cmd, start.UnixMilli(), end.UnixMilli()) |
| 71 | + if err != nil { |
| 72 | + return err |
| 73 | + } |
| 74 | + |
| 75 | + stripeValues, err := stripeMeterValues(ctx, sc, cmd.RequireString("customer"), start.Unix(), end.Unix()) |
| 76 | + if err != nil { |
| 77 | + return err |
| 78 | + } |
| 79 | + |
| 80 | + return report(out, chValues, stripeValues) |
| 81 | +} |
| 82 | + |
| 83 | +// resolvePeriod picks the billing period: the --month flag if given, else the |
| 84 | +// current calendar month from the local clock. For clocked test customers |
| 85 | +// whose "now" is not wall-clock time, pass --month explicitly. |
| 86 | +func resolvePeriod(_ context.Context, _ *stripesdk.Client, month string) (billingperiod.Period, error) { |
| 87 | + if month != "" { |
| 88 | + return billingperiod.Parse(month) |
| 89 | + } |
| 90 | + now := time.Now().UTC() |
| 91 | + return billingperiod.Period{Year: now.Year(), Month: now.Month()}, nil |
| 92 | +} |
| 93 | + |
| 94 | +// clickhouseUsage runs the production billing queries for the window and folds |
| 95 | +// them into meter units, exactly as the worker does before pushing. |
| 96 | +func clickhouseUsage(ctx context.Context, cmd *cli.Command, startMillis, endMillis int64) (meterValues, error) { |
| 97 | + ch, err := clickhouse.New(clickhouse.Config{URL: cmd.RequireString("clickhouse-url")}) |
| 98 | + if err != nil { |
| 99 | + return nil, fmt.Errorf("connect clickhouse: %w", err) |
| 100 | + } |
| 101 | + defer func() { _ = ch.Close() }() |
| 102 | + |
| 103 | + workspace := cmd.RequireString("workspace") |
| 104 | + |
| 105 | + usage, err := ch.GetInstanceMeterUsage(ctx, clickhouse.GetInstanceMeterUsageRequest{ |
| 106 | + WorkspaceID: workspace, |
| 107 | + Start: startMillis, |
| 108 | + End: endMillis, |
| 109 | + }) |
| 110 | + if err != nil { |
| 111 | + return nil, fmt.Errorf("query instance usage: %w", err) |
| 112 | + } |
| 113 | + |
| 114 | + var cpuSeconds, memoryGiBHours, diskGiBHours float64 |
| 115 | + var egressBytes int64 |
| 116 | + for _, r := range usage { |
| 117 | + cpuSeconds += r.CPUSeconds |
| 118 | + memoryGiBHours += r.MemoryGiBHours |
| 119 | + diskGiBHours += r.DiskGiBHours |
| 120 | + egressBytes += r.EgressBytes |
| 121 | + } |
| 122 | + |
| 123 | + keys, err := ch.GetActiveKeysUsage(ctx, clickhouse.GetActiveKeysUsageRequest{ |
| 124 | + WorkspaceID: workspace, |
| 125 | + Month: startMillis, |
| 126 | + }) |
| 127 | + if err != nil { |
| 128 | + return nil, fmt.Errorf("query active keys: %w", err) |
| 129 | + } |
| 130 | + var activeKeys float64 |
| 131 | + for _, k := range keys { |
| 132 | + activeKeys += float64(k.ActiveKeys) |
| 133 | + } |
| 134 | + |
| 135 | + return meterValues{ |
| 136 | + "cpu_seconds": cpuSeconds, |
| 137 | + "memory_gib_seconds": memoryGiBHours * secondsPerHour, |
| 138 | + "egress_public_gib": float64(egressBytes) / bytesPerGiB, |
| 139 | + "disk_gib_seconds": diskGiBHours * secondsPerHour, |
| 140 | + "active_keys": activeKeys, |
| 141 | + }, nil |
| 142 | +} |
| 143 | + |
| 144 | +// stripeMeterValues reads the value Stripe currently holds for each meter: the |
| 145 | +// "last"-aggregated summary over the window is the most recent month-to-date |
| 146 | +// total the worker pushed. |
| 147 | +func stripeMeterValues(ctx context.Context, sc *stripesdk.Client, customer string, startUnix, endUnix int64) (meterValues, error) { |
| 148 | + values := meterValues{} |
| 149 | + |
| 150 | + meters := sc.V1BillingMeters.List(ctx, &stripesdk.BillingMeterListParams{ |
| 151 | + ListParams: stripesdk.ListParams{Limit: stripesdk.Int64(100)}, |
| 152 | + }) |
| 153 | + for meter, err := range meters.All(ctx) { |
| 154 | + if err != nil { |
| 155 | + return nil, fmt.Errorf("list meters: %w", err) |
| 156 | + } |
| 157 | + |
| 158 | + summaries := sc.V1BillingMeterEventSummaries.List(ctx, &stripesdk.BillingMeterEventSummaryListParams{ |
| 159 | + ID: stripesdk.String(meter.ID), |
| 160 | + Customer: stripesdk.String(customer), |
| 161 | + StartTime: stripesdk.Int64(startUnix), |
| 162 | + EndTime: stripesdk.Int64(endUnix), |
| 163 | + }) |
| 164 | + var value float64 |
| 165 | + for summary, sErr := range summaries.All(ctx) { |
| 166 | + if sErr != nil { |
| 167 | + return nil, fmt.Errorf("meter %s summaries: %w", meter.EventName, sErr) |
| 168 | + } |
| 169 | + // One summary for the whole window (no grouping requested); for a |
| 170 | + // "last" meter its aggregated value is the latest MTD push. |
| 171 | + value = summary.AggregatedValue |
| 172 | + } |
| 173 | + values[meter.EventName] = value |
| 174 | + } |
| 175 | + |
| 176 | + return values, nil |
| 177 | +} |
| 178 | + |
| 179 | +// report prints a per-meter ClickHouse-vs-Stripe table and returns an error if |
| 180 | +// any meter drifts beyond the tolerance, so scripts can gate on the exit code. |
| 181 | +func report(out *tui.Renderer, ch, stripe meterValues) error { |
| 182 | + order := []string{"cpu_seconds", "memory_gib_seconds", "egress_public_gib", "disk_gib_seconds", "active_keys"} |
| 183 | + |
| 184 | + drifted := 0 |
| 185 | + out.Blank() |
| 186 | + for _, meter := range order { |
| 187 | + chVal := ch[meter] |
| 188 | + stripeVal := stripe[meter] |
| 189 | + diff := chVal - stripeVal |
| 190 | + |
| 191 | + ok := withinTolerance(chVal, stripeVal) |
| 192 | + verdict := out.Green("ok") |
| 193 | + if !ok { |
| 194 | + verdict = out.Red("DRIFT") |
| 195 | + drifted++ |
| 196 | + } |
| 197 | + out.Printf(" %-20s clickhouse=%-16.6f stripe=%-16.6f diff=%-14.6f %s\n", |
| 198 | + meter, chVal, stripeVal, diff, verdict) |
| 199 | + } |
| 200 | + out.Blank() |
| 201 | + |
| 202 | + if drifted > 0 { |
| 203 | + return fmt.Errorf("%d meter(s) drifted beyond %.3f%%; a gap this large is more than the hourly push lag can explain", |
| 204 | + drifted, relativeTolerance*100) |
| 205 | + } |
| 206 | + out.Println(out.Green("All meters agree within tolerance.")) |
| 207 | + return nil |
| 208 | +} |
| 209 | + |
| 210 | +// withinTolerance compares two meter values with a relative tolerance, treating |
| 211 | +// two zeros as agreement and any zero-vs-nonzero as drift. |
| 212 | +func withinTolerance(a, b float64) bool { |
| 213 | + if a == 0 && b == 0 { |
| 214 | + return true |
| 215 | + } |
| 216 | + scale := math.Max(math.Abs(a), math.Abs(b)) |
| 217 | + return math.Abs(a-b)/scale <= relativeTolerance |
| 218 | +} |
0 commit comments