Skip to content

Commit 7fe7474

Browse files
committed
test(billing): shadow-mode meter validation (ENG-2879)
Fixture tests proving the meter query maps to known dollar bills, plus a 'dev stripe shadow' command that diffs a workspace's live ClickHouse usage against the values Stripe is holding for its meters.
1 parent 4dc389e commit 7fe7474

6 files changed

Lines changed: 436 additions & 1 deletion

File tree

cmd/dev/stripe/BUILD.bazel

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
load("@rules_go//go:def.bzl", "go_library")
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "stripe",
@@ -7,14 +7,23 @@ go_library(
77
"grants.go",
88
"invoices.go",
99
"reset.go",
10+
"shadow.go",
1011
"stripe.go",
1112
],
1213
importpath = "github.com/unkeyed/unkey/cmd/dev/stripe",
1314
visibility = ["//visibility:public"],
1415
deps = [
16+
"//pkg/billingperiod",
1517
"//pkg/cli",
18+
"//pkg/clickhouse",
1619
"//pkg/db",
1720
"//pkg/tui",
1821
"@com_github_stripe_stripe_go_v86//:stripe-go",
1922
],
2023
)
24+
25+
go_test(
26+
name = "stripe_test",
27+
srcs = ["shadow_test.go"],
28+
embed = [":stripe"],
29+
)

cmd/dev/stripe/shadow.go

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
package stripe
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math"
7+
"os"
8+
"time"
9+
10+
stripesdk "github.com/stripe/stripe-go/v86"
11+
"github.com/unkeyed/unkey/pkg/billingperiod"
12+
"github.com/unkeyed/unkey/pkg/cli"
13+
"github.com/unkeyed/unkey/pkg/clickhouse"
14+
"github.com/unkeyed/unkey/pkg/tui"
15+
)
16+
17+
// Worker unit conversions (svc/ctrl/worker/cron/deploybilling/billing.go): the
18+
// ClickHouse query reports memory/disk in GiB-hours and egress in bytes, while
19+
// the meters bill in GiB-seconds and GiB. Kept in sync by hand; the fixture
20+
// test TestMeterUsageHandComputedBill guards the same factors.
21+
const (
22+
secondsPerHour = 3600.0
23+
bytesPerGiB = float64(int64(1024 * 1024 * 1024))
24+
)
25+
26+
// relativeTolerance is the drift a meter may show before shadow flags it. The
27+
// hourly push can lag the live query by up to an hour, so a small difference
28+
// is usually that lag rather than a real problem. Set above the lag, below
29+
// any difference that would indicate the pipeline is dropping or double-counting.
30+
const relativeTolerance = 0.001 // 0.1%
31+
32+
// shadowCmd recomputes a workspace's month-to-date Deploy usage from ClickHouse
33+
// and diffs it against the values Stripe holds for the customer's meters. Use it
34+
// before enabling billing to check the query, hourly push, and meters agree.
35+
var shadowCmd = &cli.Command{
36+
Name: "shadow",
37+
Usage: "Diff a workspace's ClickHouse usage against Stripe's meter values",
38+
Flags: []cli.Flag{
39+
keyFlag(),
40+
cli.String("clickhouse-url", "ClickHouse DSN", cli.EnvVar("CLICKHOUSE_URL"), cli.Required()),
41+
cli.String("workspace", "Workspace id (for the ClickHouse query)", cli.Required()),
42+
cli.String("customer", "Stripe customer id (cus_...)", cli.Required()),
43+
cli.String("month", "Billing month YYYY-MM (default: current)", cli.Default("")),
44+
},
45+
Action: shadow,
46+
}
47+
48+
// meterValues holds one workspace's usage in the exact units each Stripe meter
49+
// expects, keyed by the meter's event_name so it lines up with the summaries.
50+
type meterValues map[string]float64
51+
52+
func shadow(ctx context.Context, cmd *cli.Command) error {
53+
sc, err := newClient(cmd)
54+
if err != nil {
55+
return err
56+
}
57+
out := tui.New(os.Stdout)
58+
59+
period, err := resolvePeriod(ctx, sc, cmd.String("month"))
60+
if err != nil {
61+
return err
62+
}
63+
start := period.Start()
64+
end := period.End()
65+
66+
out.Printf("Shadow %s workspace=%s customer=%s\n",
67+
out.Bold(start.Format("2006-01")), cmd.RequireString("workspace"), cmd.RequireString("customer"))
68+
out.Println(out.Dim(fmt.Sprintf(" window %s -> %s", start.Format("2006-01-02"), end.Format("2006-01-02"))))
69+
70+
chValues, err := clickhouseUsage(ctx, cmd, start.UnixMilli(), end.UnixMilli())
71+
if err != nil {
72+
return err
73+
}
74+
75+
stripeValues, err := stripeMeterValues(ctx, sc, cmd.RequireString("customer"), start.Unix(), end.Unix())
76+
if err != nil {
77+
return err
78+
}
79+
80+
return report(out, chValues, stripeValues)
81+
}
82+
83+
// resolvePeriod picks the billing period: the --month flag if given, else the
84+
// current calendar month from the local clock. For clocked test customers
85+
// whose "now" is not wall-clock time, pass --month explicitly.
86+
func resolvePeriod(_ context.Context, _ *stripesdk.Client, month string) (billingperiod.Period, error) {
87+
if month != "" {
88+
return billingperiod.Parse(month)
89+
}
90+
now := time.Now().UTC()
91+
return billingperiod.Period{Year: now.Year(), Month: now.Month()}, nil
92+
}
93+
94+
// clickhouseUsage runs the production billing queries for the window and folds
95+
// them into meter units, exactly as the worker does before pushing.
96+
func clickhouseUsage(ctx context.Context, cmd *cli.Command, startMillis, endMillis int64) (meterValues, error) {
97+
ch, err := clickhouse.New(clickhouse.Config{URL: cmd.RequireString("clickhouse-url")})
98+
if err != nil {
99+
return nil, fmt.Errorf("connect clickhouse: %w", err)
100+
}
101+
defer func() { _ = ch.Close() }()
102+
103+
workspace := cmd.RequireString("workspace")
104+
105+
usage, err := ch.GetInstanceMeterUsage(ctx, clickhouse.GetInstanceMeterUsageRequest{
106+
WorkspaceID: workspace,
107+
Start: startMillis,
108+
End: endMillis,
109+
})
110+
if err != nil {
111+
return nil, fmt.Errorf("query instance usage: %w", err)
112+
}
113+
114+
var cpuSeconds, memoryGiBHours, diskGiBHours float64
115+
var egressBytes int64
116+
for _, r := range usage {
117+
cpuSeconds += r.CPUSeconds
118+
memoryGiBHours += r.MemoryGiBHours
119+
diskGiBHours += r.DiskGiBHours
120+
egressBytes += r.EgressBytes
121+
}
122+
123+
keys, err := ch.GetActiveKeysUsage(ctx, clickhouse.GetActiveKeysUsageRequest{
124+
WorkspaceID: workspace,
125+
Month: startMillis,
126+
})
127+
if err != nil {
128+
return nil, fmt.Errorf("query active keys: %w", err)
129+
}
130+
var activeKeys float64
131+
for _, k := range keys {
132+
activeKeys += float64(k.ActiveKeys)
133+
}
134+
135+
return meterValues{
136+
"cpu_seconds": cpuSeconds,
137+
"memory_gib_seconds": memoryGiBHours * secondsPerHour,
138+
"egress_public_gib": float64(egressBytes) / bytesPerGiB,
139+
"disk_gib_seconds": diskGiBHours * secondsPerHour,
140+
"active_keys": activeKeys,
141+
}, nil
142+
}
143+
144+
// stripeMeterValues reads the value Stripe currently holds for each meter: the
145+
// "last"-aggregated summary over the window is the most recent month-to-date
146+
// total the worker pushed.
147+
func stripeMeterValues(ctx context.Context, sc *stripesdk.Client, customer string, startUnix, endUnix int64) (meterValues, error) {
148+
values := meterValues{}
149+
150+
meters := sc.V1BillingMeters.List(ctx, &stripesdk.BillingMeterListParams{
151+
ListParams: stripesdk.ListParams{Limit: stripesdk.Int64(100)},
152+
})
153+
for meter, err := range meters.All(ctx) {
154+
if err != nil {
155+
return nil, fmt.Errorf("list meters: %w", err)
156+
}
157+
158+
summaries := sc.V1BillingMeterEventSummaries.List(ctx, &stripesdk.BillingMeterEventSummaryListParams{
159+
ID: stripesdk.String(meter.ID),
160+
Customer: stripesdk.String(customer),
161+
StartTime: stripesdk.Int64(startUnix),
162+
EndTime: stripesdk.Int64(endUnix),
163+
})
164+
var value float64
165+
for summary, sErr := range summaries.All(ctx) {
166+
if sErr != nil {
167+
return nil, fmt.Errorf("meter %s summaries: %w", meter.EventName, sErr)
168+
}
169+
// One summary for the whole window (no grouping requested); for a
170+
// "last" meter its aggregated value is the latest MTD push.
171+
value = summary.AggregatedValue
172+
}
173+
values[meter.EventName] = value
174+
}
175+
176+
return values, nil
177+
}
178+
179+
// report prints a per-meter ClickHouse-vs-Stripe table and returns an error if
180+
// any meter drifts beyond the tolerance, so scripts can gate on the exit code.
181+
func report(out *tui.Renderer, ch, stripe meterValues) error {
182+
order := []string{"cpu_seconds", "memory_gib_seconds", "egress_public_gib", "disk_gib_seconds", "active_keys"}
183+
184+
drifted := 0
185+
out.Blank()
186+
for _, meter := range order {
187+
chVal := ch[meter]
188+
stripeVal := stripe[meter]
189+
diff := chVal - stripeVal
190+
191+
ok := withinTolerance(chVal, stripeVal)
192+
verdict := out.Green("ok")
193+
if !ok {
194+
verdict = out.Red("DRIFT")
195+
drifted++
196+
}
197+
out.Printf(" %-20s clickhouse=%-16.6f stripe=%-16.6f diff=%-14.6f %s\n",
198+
meter, chVal, stripeVal, diff, verdict)
199+
}
200+
out.Blank()
201+
202+
if drifted > 0 {
203+
return fmt.Errorf("%d meter(s) drifted beyond %.3f%%; a gap this large is more than the hourly push lag can explain",
204+
drifted, relativeTolerance*100)
205+
}
206+
out.Println(out.Green("All meters agree within tolerance."))
207+
return nil
208+
}
209+
210+
// withinTolerance compares two meter values with a relative tolerance, treating
211+
// two zeros as agreement and any zero-vs-nonzero as drift.
212+
func withinTolerance(a, b float64) bool {
213+
if a == 0 && b == 0 {
214+
return true
215+
}
216+
scale := math.Max(math.Abs(a), math.Abs(b))
217+
return math.Abs(a-b)/scale <= relativeTolerance
218+
}

cmd/dev/stripe/shadow_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package stripe
2+
3+
import "testing"
4+
5+
func TestWithinTolerance(t *testing.T) {
6+
cases := []struct {
7+
name string
8+
a, b float64
9+
want bool
10+
}{
11+
{"both zero", 0, 0, true},
12+
{"zero vs nonzero drifts", 0, 1, false},
13+
{"identical", 3600, 3600, true},
14+
{"within 0.1% (push lag)", 100000, 100050, true},
15+
{"beyond 0.1% drifts", 100000, 100200, false},
16+
{"tiny absolute, equal", 0.234375, 0.234375, true},
17+
}
18+
for _, tc := range cases {
19+
t.Run(tc.name, func(t *testing.T) {
20+
if got := withinTolerance(tc.a, tc.b); got != tc.want {
21+
t.Fatalf("withinTolerance(%v, %v) = %v, want %v", tc.a, tc.b, got, tc.want)
22+
}
23+
})
24+
}
25+
}

cmd/dev/stripe/stripe.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ var Cmd = &cli.Command{
3131
invoicesCmd,
3232
grantsCmd,
3333
resetCmd,
34+
shadowCmd,
3435
},
3536
}
3637

pkg/clickhouse/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ go_test(
4545
"billable_usage_test.go",
4646
"errors_test.go",
4747
"instance_events_test.go",
48+
"instance_meter_bill_test.go",
4849
"instance_meter_test.go",
4950
"key_verifications_test.go",
5051
"ratelimits_test.go",

0 commit comments

Comments
 (0)