Skip to content

Commit f9a8821

Browse files
committed
feat(billing): meter active keys for Deploy
1 parent d3e1ba0 commit f9a8821

18 files changed

Lines changed: 280 additions & 15 deletions

File tree

pkg/clickhouse/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
33
go_library(
44
name = "clickhouse",
55
srcs = [
6+
"active_keys.go",
67
"audit_logs.go",
78
"billable_ratelimits.go",
89
"billable_usage.go",
@@ -39,6 +40,7 @@ go_test(
3940
name = "clickhouse_test",
4041
size = "large",
4142
srcs = [
43+
"active_keys_test.go",
4244
"billable_source_test.go",
4345
"billable_usage_test.go",
4446
"errors_test.go",

pkg/clickhouse/active_keys.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package clickhouse
2+
3+
import (
4+
"context"
5+
"strconv"
6+
7+
"github.com/unkeyed/unkey/pkg/fault"
8+
)
9+
10+
// GetActiveKeysUsageRequest scopes the active-keys count to one billing month
11+
// and optionally a single workspace.
12+
type GetActiveKeysUsageRequest struct {
13+
// WorkspaceID restricts the query to one workspace. Empty aggregates
14+
// across every workspace (the reconciliation / shadow-mode path).
15+
WorkspaceID string
16+
17+
// Month is any instant inside the billing month (unix millis); the query
18+
// buckets it to the month start to match the per-month rollup grain.
19+
Month int64
20+
}
21+
22+
// ActiveKeysUsage is the number of distinct active keys for one workspace in
23+
// the requested month.
24+
type ActiveKeysUsage struct {
25+
WorkspaceID string `ch:"workspace_id"`
26+
ActiveKeys int64 `ch:"active_keys"`
27+
}
28+
29+
// GetActiveKeysUsage counts the distinct keys verified through the Deploy
30+
// gateway (source = 'gateway') in the billing month, per workspace. A key is
31+
// active once it has at least one verification in the month, regardless of
32+
// outcome: a RATE_LIMITED or DISABLED verification is still work done for
33+
// that key. API-sourced verifications never count; they are the API product's
34+
// usage, not Deploy's.
35+
func (c *Client) GetActiveKeysUsage(
36+
ctx context.Context,
37+
req GetActiveKeysUsageRequest,
38+
) ([]ActiveKeysUsage, error) {
39+
query := `
40+
SELECT
41+
workspace_id,
42+
toInt64(uniqExact(key_id)) AS active_keys
43+
FROM default.key_verifications_per_month_v3
44+
WHERE time = toDate(toStartOfMonth(fromUnixTimestamp64Milli({month:Int64})))
45+
AND source = 'gateway'
46+
AND ({workspace_id:String} = '' OR workspace_id = {workspace_id:String})
47+
GROUP BY workspace_id
48+
`
49+
50+
usage, err := Select[ActiveKeysUsage](ctx, c.conn, query, map[string]string{
51+
"month": strconv.FormatInt(req.Month, 10),
52+
"workspace_id": req.WorkspaceID,
53+
})
54+
if err != nil {
55+
return nil, fault.Wrap(err, fault.Internal("failed to query active keys usage"))
56+
}
57+
58+
return usage, nil
59+
}

pkg/clickhouse/active_keys_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package clickhouse_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
ch "github.com/ClickHouse/clickhouse-go/v2"
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
"github.com/unkeyed/unkey/pkg/clickhouse"
12+
"github.com/unkeyed/unkey/pkg/clickhouse/schema"
13+
"github.com/unkeyed/unkey/pkg/testutil/containers"
14+
"github.com/unkeyed/unkey/pkg/uid"
15+
)
16+
17+
// Active keys = distinct keys verified through the gateway in the month,
18+
// regardless of outcome. API-sourced verifications never count.
19+
func TestGetActiveKeysUsage(t *testing.T) {
20+
chCfg := containers.ClickHouse(t)
21+
22+
client, err := clickhouse.New(clickhouse.Config{URL: chCfg.DSN})
23+
require.NoError(t, err)
24+
t.Cleanup(func() { require.NoError(t, client.Close()) })
25+
26+
opts, err := ch.ParseDSN(chCfg.DSN)
27+
require.NoError(t, err)
28+
conn, err := ch.Open(opts)
29+
require.NoError(t, err)
30+
t.Cleanup(func() { require.NoError(t, conn.Close()) })
31+
32+
ctx := context.Background()
33+
require.NoError(t, conn.Ping(ctx))
34+
35+
now := time.Now()
36+
workspaceID := uid.New(uid.WorkspacePrefix)
37+
38+
// 3 distinct gateway keys: one VALID, one RATE_LIMITED (still active),
39+
// one verified twice (counted once). Plus 2 API-sourced keys (never
40+
// counted) and one gateway key in another workspace.
41+
gateway := func(keyID, outcome string) schema.KeyVerification {
42+
v := createVerifications(workspaceID, 1, now, outcome)[0]
43+
v.KeyID = keyID
44+
v.Source = schema.SourceGateway
45+
return v
46+
}
47+
rows := []schema.KeyVerification{
48+
gateway("key_a", "VALID"),
49+
gateway("key_b", "RATE_LIMITED"),
50+
gateway("key_c", "VALID"),
51+
gateway("key_c", "VALID"),
52+
}
53+
api := createVerifications(workspaceID, 2, now, "VALID")
54+
for i := range api {
55+
api[i].Source = schema.SourceAPI
56+
}
57+
other := createVerifications(uid.New(uid.WorkspacePrefix), 1, now, "VALID")
58+
other[0].Source = schema.SourceGateway
59+
insertVerifications(t, ctx, conn, append(append(rows, api...), other...))
60+
61+
require.EventuallyWithT(t, func(c *assert.CollectT) {
62+
usage, err := client.GetActiveKeysUsage(ctx, clickhouse.GetActiveKeysUsageRequest{
63+
WorkspaceID: workspaceID,
64+
Month: now.UnixMilli(),
65+
})
66+
require.NoError(c, err)
67+
require.Len(c, usage, 1)
68+
assert.Equal(c, workspaceID, usage[0].WorkspaceID)
69+
assert.Equal(c, int64(3), usage[0].ActiveKeys)
70+
}, time.Minute, time.Second)
71+
72+
// All-workspaces variant includes the other workspace's key.
73+
require.EventuallyWithT(t, func(c *assert.CollectT) {
74+
usage, err := client.GetActiveKeysUsage(ctx, clickhouse.GetActiveKeysUsageRequest{
75+
WorkspaceID: "",
76+
Month: now.UnixMilli(),
77+
})
78+
require.NoError(c, err)
79+
assert.GreaterOrEqual(c, len(usage), 2)
80+
}, time.Minute, time.Second)
81+
}

svc/ctrl/internal/billingmeter/interface.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,13 @@ type MeterValues struct {
3434
EgressGiB float64
3535
// DiskGiBSeconds is allocated disk integrated over time, in GiB-seconds.
3636
DiskGiBSeconds float64
37+
// ActiveKeys is the number of distinct keys verified through the Deploy
38+
// gateway this period (month-to-date, like every other meter).
39+
ActiveKeys float64
3740
}
3841

3942
// Positive reports whether any meter has usage worth pushing.
4043
func (v MeterValues) Positive() bool {
41-
return v.CPUSeconds > 0 || v.MemoryGiBSeconds > 0 || v.EgressGiB > 0 || v.DiskGiBSeconds > 0
44+
return v.CPUSeconds > 0 || v.MemoryGiBSeconds > 0 || v.EgressGiB > 0 || v.DiskGiBSeconds > 0 ||
45+
v.ActiveKeys > 0
4246
}

svc/ctrl/internal/billingmeter/stripe.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const (
2121
eventMemory = "memory_gib_seconds"
2222
eventEgress = "egress_public_gib"
2323
eventDisk = "disk_gib_seconds"
24+
eventKeys = "active_keys"
2425
)
2526

2627
// payloadKeyCustomer and payloadKeyValue are the meter's
@@ -88,6 +89,7 @@ func (p *stripePusher) Push(ctx context.Context, req PushRequest) (int, error) {
8889
{eventMemory, req.Values.MemoryGiBSeconds},
8990
{eventEgress, req.Values.EgressGiB},
9091
{eventDisk, req.Values.DiskGiBSeconds},
92+
{eventKeys, req.Values.ActiveKeys},
9193
}
9294

9395
pushed := 0

svc/ctrl/worker/cron/deploybilling/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ go_test(
3232
embed = [":deploybilling"],
3333
deps = [
3434
"//pkg/clickhouse",
35+
"//svc/ctrl/internal/billingmeter",
3536
"@com_github_stretchr_testify//require",
3637
],
3738
)

svc/ctrl/worker/cron/deploybilling/billing.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
// purpose so the handler depends only on the one query it needs.
1313
type UsageReader interface {
1414
GetInstanceMeterUsage(ctx context.Context, req clickhouse.GetInstanceMeterUsageRequest) ([]clickhouse.InstanceMeterUsage, error)
15+
GetActiveKeysUsage(ctx context.Context, req clickhouse.GetActiveKeysUsageRequest) ([]clickhouse.ActiveKeysUsage, error)
1516
}
1617

1718
const (
@@ -57,7 +58,23 @@ func aggregateUsage(rows []clickhouse.InstanceMeterUsage) map[string]billingmete
5758
MemoryGiBSeconds: a.memoryGiBHours * secondsPerHour,
5859
EgressGiB: float64(a.egressBytes) / bytesPerGiB,
5960
DiskGiBSeconds: a.diskGiBHours * secondsPerHour,
61+
ActiveKeys: 0, // merged in from the active-keys query below
6062
}
6163
}
6264
return out
6365
}
66+
67+
// mergeActiveKeys folds the per-workspace active-key counts into the meter
68+
// values, adding entries for workspaces that have key activity but no
69+
// instance usage (possible: a deployment can be scaled to zero while its
70+
// keys keep verifying through the gateway).
71+
func mergeActiveKeys(
72+
values map[string]billingmeter.MeterValues,
73+
rows []clickhouse.ActiveKeysUsage,
74+
) {
75+
for _, r := range rows {
76+
v := values[r.WorkspaceID] // zero value when instance usage is absent
77+
v.ActiveKeys = float64(r.ActiveKeys)
78+
values[r.WorkspaceID] = v
79+
}
80+
}

svc/ctrl/worker/cron/deploybilling/billing_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/stretchr/testify/require"
77
"github.com/unkeyed/unkey/pkg/clickhouse"
8+
"github.com/unkeyed/unkey/svc/ctrl/internal/billingmeter"
89
)
910

1011
func TestAggregateUsage(t *testing.T) {
@@ -38,3 +39,20 @@ func TestAggregateUsage(t *testing.T) {
3839
require.Empty(t, aggregateUsage(nil))
3940
})
4041
}
42+
43+
func TestMergeActiveKeys(t *testing.T) {
44+
values := map[string]billingmeter.MeterValues{
45+
"ws_with_usage": {CPUSeconds: 10, MemoryGiBSeconds: 0, EgressGiB: 0, DiskGiBSeconds: 0, ActiveKeys: 0},
46+
}
47+
mergeActiveKeys(values, []clickhouse.ActiveKeysUsage{
48+
{WorkspaceID: "ws_with_usage", ActiveKeys: 5},
49+
// Key activity without instance usage: deployment scaled to zero
50+
// while its keys keep verifying through the gateway.
51+
{WorkspaceID: "ws_keys_only", ActiveKeys: 2},
52+
})
53+
54+
require.Equal(t, 5.0, values["ws_with_usage"].ActiveKeys)
55+
require.Equal(t, 10.0, values["ws_with_usage"].CPUSeconds, "existing meters must survive the merge")
56+
require.Equal(t, 2.0, values["ws_keys_only"].ActiveKeys)
57+
require.True(t, values["ws_keys_only"].Positive())
58+
}

svc/ctrl/worker/cron/deploybilling/handler.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,22 @@ func (h *Handler) pushUsage(
153153
return 0, 0, 0, 0, fmt.Errorf("get period usage: %w", err)
154154
}
155155

156-
if len(rows) == 0 {
157-
logger.Info("no deploy usage this period", "billing_period", period)
158-
return 0, 0, 0, 0, nil
156+
keyRows, err := restate.Run(ctx, func(rc restate.RunContext) ([]clickhouse.ActiveKeysUsage, error) {
157+
return h.usage.GetActiveKeysUsage(rc, clickhouse.GetActiveKeysUsageRequest{
158+
WorkspaceID: "", // all workspaces; we filter to billable ones below
159+
Month: p.Start().UnixMilli(),
160+
})
161+
}, restate.WithName("get active keys"))
162+
if err != nil {
163+
return 0, 0, 0, 0, fmt.Errorf("get active keys: %w", err)
159164
}
160165

161166
valuesByWorkspace := aggregateUsage(rows)
167+
mergeActiveKeys(valuesByWorkspace, keyRows)
168+
if len(valuesByWorkspace) == 0 {
169+
logger.Info("no deploy usage this period", "billing_period", period)
170+
return 0, 0, 0, 0, nil
171+
}
162172

163173
// Sort so the downstream journaled steps (db fetch, per-workspace push)
164174
// replay in a stable order.

svc/ctrl/worker/cron/deploybilling/push.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ func (h *Handler) pushAll(ctx restate.ObjectContext, tasks []pushTask) (workspac
6161
"stripe_customer_id", task.req.StripeCustomerID,
6262
"cpu_seconds", task.req.Values.CPUSeconds,
6363
"memory_gib_seconds", task.req.Values.MemoryGiBSeconds,
64+
"active_keys", task.req.Values.ActiveKeys,
6465
"egress_gib", task.req.Values.EgressGiB,
6566
"disk_gib_seconds", task.req.Values.DiskGiBSeconds,
6667
"meters_pushed", n,

0 commit comments

Comments
 (0)