Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions cli/cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cloudquery/cloudquery/cli/v6/internal/auth"
"github.com/cloudquery/cloudquery/cli/v6/internal/env"
"github.com/cloudquery/cloudquery/cli/v6/internal/otel"
cqplatform "github.com/cloudquery/cloudquery/cli/v6/internal/platform"
"github.com/cloudquery/cloudquery/cli/v6/internal/specs/v0"
"github.com/cloudquery/plugin-pb-go/managedplugin"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -206,7 +207,9 @@ func sync(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
log.Info().Strs("args", args).Msg("Loading spec(s)")
fmt.Printf("Loading spec(s) from %s\n", strings.Join(args, ", "))
specReader, err := specs.NewSpecReader(args)
// Validate after injection so a source-only spec isn't rejected before the
// platform destination is added.
specReader, err := specs.NewSpecReaderWithoutValidation(args)
if err != nil {
return fmt.Errorf("failed to load spec(s) from %s. Error: %w", strings.Join(args, ", "), err)
}
Expand All @@ -227,6 +230,26 @@ func sync(cmd *cobra.Command, args []string) error {

var otelReceiver *otel.OtelReceiver

authToken, err := auth.GetAuthTokenIfNeeded(log.Logger, sources, destinations, transformers)
if err != nil {
return fmt.Errorf("failed to get auth token: %w", err)
}
teamName, err := auth.GetTeamForToken(ctx, authToken)
if err != nil {
return fmt.Errorf("failed to get team name from token: %w", err)
}

// Must run before the needSummary/otel decisions below: the injected
// destination sets SyncSummary.
destinations, err = cqplatform.MaybeInjectDestination(ctx, log.Logger, authToken.Value, teamName, sources, destinations)
if err != nil {
return err
}

if err := specReader.SetDestinationsAndValidate(destinations); err != nil {
return fmt.Errorf("failed to load spec(s) from %s. Error: %w", strings.Join(args, ", "), err)
}

var destsWantSummary bool
for _, dest := range destinations {
destsWantSummary = destsWantSummary || dest.SyncSummary
Expand Down Expand Up @@ -262,15 +285,6 @@ func sync(cmd *cobra.Command, args []string) error {
fmt.Println(err)
}
}()
authToken, err := auth.GetAuthTokenIfNeeded(log.Logger, sources, destinations, transformers)
if err != nil {
return fmt.Errorf("failed to get auth token: %w", err)
}
teamName, err := auth.GetTeamForToken(ctx, authToken)
if err != nil {
return fmt.Errorf("failed to get team name from token: %w", err)
}

pluginVersionWarner, _ := managedplugin.NewPluginVersionWarner(log.Logger, authToken.Value)
specs.WarnOnOutdatedVersions(ctx, pluginVersionWarner, sources, destinations, transformers)

Expand Down
2 changes: 1 addition & 1 deletion cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/apache/arrow-go/v18 v18.6.0
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
github.com/cenkalti/backoff/v5 v5.0.3
github.com/cloudquery/cloudquery-api-go v1.14.10
github.com/cloudquery/cloudquery-api-go v1.14.11-0.20260616074206-c9fbe79d8954
github.com/cloudquery/codegen v0.4.1
github.com/cloudquery/plugin-pb-go v1.27.15
github.com/cloudquery/plugin-sdk/v4 v4.95.2
Expand Down
4 changes: 2 additions & 2 deletions cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWs
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/clipperhouse/uax29/v2 v2.7.0 h1:+gs4oBZ2gPfVrKPthwbMzWZDaAFPGYK72F0NJv2v7Vk=
github.com/clipperhouse/uax29/v2 v2.7.0/go.mod h1:EFJ2TJMRUaplDxHKj1qAEhCtQPW2tJSwu5BF98AuoVM=
github.com/cloudquery/cloudquery-api-go v1.14.10 h1:AzdqFc6hB2YZj4RA9kG/hGGeXDI2kPX3rb7S5JLGN7M=
github.com/cloudquery/cloudquery-api-go v1.14.10/go.mod h1:lXGJ0XbzQUUFN/xLUOosw/+yvlwNeeLD/GxlstUBc6w=
github.com/cloudquery/cloudquery-api-go v1.14.11-0.20260616074206-c9fbe79d8954 h1:GZ6Y68yJZCtdMMQTXC4ib56C7OVOMW8NMfGarnITC3I=
github.com/cloudquery/cloudquery-api-go v1.14.11-0.20260616074206-c9fbe79d8954/go.mod h1:lXGJ0XbzQUUFN/xLUOosw/+yvlwNeeLD/GxlstUBc6w=
github.com/cloudquery/codegen v0.4.1 h1:c9D18N925tUvnDeGHIl3JWKj37TyII9daHufkf8hU+Y=
github.com/cloudquery/codegen v0.4.1/go.mod h1:QWIOD6R1aCa+YM+th+9Qt9lZw+ztdJR9JDEMLWyazwM=
github.com/cloudquery/jsonschema v0.0.0-20260327151118-8dfb902740f6 h1:5KvFfR2HlsFFSUpnjrfp0D3jv69sYV3VlxEfmBd4EL0=
Expand Down
255 changes: 255 additions & 0 deletions cli/internal/platform/inject.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
// Package platform auto-injects a platform destination into syncs for teams
// with an active platform tenant.
package platform

import (
"context"
"errors"
"fmt"
"os"
"slices"
"strconv"
"strings"
"time"

cloudquery_api "github.com/cloudquery/cloudquery-api-go"
cqapiauth "github.com/cloudquery/cloudquery-api-go/auth"
"github.com/cloudquery/cloudquery/cli/v6/internal/api"
cqauth "github.com/cloudquery/cloudquery/cli/v6/internal/auth"
"github.com/cloudquery/cloudquery/cli/v6/internal/env"
specs "github.com/cloudquery/cloudquery/cli/v6/internal/specs/v0"
"github.com/rs/zerolog"
)

const (
envDisable = "CQ_DISABLE_PLATFORM_DESTINATION"
envTenantID = "CQ_PLATFORM_TENANT_ID"

envPluginRegistry = "CQ_PLATFORM_PLUGIN_REGISTRY"
envPluginPath = "CQ_PLATFORM_PLUGIN_PATH"
envPluginVersion = "CQ_PLATFORM_PLUGIN_VERSION"

destinationName = "platform"

requestTimeout = 10 * time.Second
)

// Tenant statuses that are eligible for platform destination injection.
var injectableStatuses = []cloudquery_api.PlatformTenantStatus{
cloudquery_api.PlatformTenantStatusActive,
cloudquery_api.PlatformTenantStatusCreated,
}

type pluginCoordinates struct {
Registry string
Path string
Version string
}

var defaultPlugin = pluginCoordinates{
Registry: "cloudquery",
Path: "cloudquery/platform",
Version: "v1.0.0",
}

// platformAPIURL appends /api (where /external-syncs/* is served) to the
// session api_url unless already present.
func platformAPIURL(sessionURL string) string {
url := strings.TrimRight(sessionURL, "/")
if !strings.HasSuffix(url, "/api") {
url += "/api"
}
return url
}

func pluginCoords() pluginCoordinates {
p := defaultPlugin
if v := os.Getenv(envPluginRegistry); v != "" {
p.Registry = v
}
if v := os.Getenv(envPluginPath); v != "" {
p.Path = v
}
if v := os.Getenv(envPluginVersion); v != "" {
p.Version = v
}
return p
}

// MaybeInjectDestination appends a `platform` destination carrying a freshly
// minted cqpd_ token when the team has an active platform tenant. Tenant/network
// failures skip injection silently; a pre-existing `platform` destination is a
// hard error rather than a silent overwrite.
func MaybeInjectDestination(ctx context.Context, logger zerolog.Logger, token, teamName string, sources []*specs.Source, destinations []*specs.Destination) ([]*specs.Destination, error) {
if os.Getenv(envDisable) == "1" {
return destinations, nil
}
if env.IsCloud() {
return destinations, nil
}
// The caller only fetches a token for cloudquery-registry specs; resolve
// directly so source-only specs can still inject. Failure just skips.
if token == "" {
var err error
if token, teamName, err = resolveCredentials(ctx); err != nil {
logger.Debug().Err(err).Msg("platform destination: credentials unavailable, skipping auto-injection")
return destinations, nil
}
}
if token == "" || teamName == "" {
return destinations, nil
}

cl, err := api.NewClient(token)
if err != nil {
logger.Debug().Err(err).Msg("platform destination: api client init failed, skipping auto-injection")
return destinations, nil
}

tenants, err := activeTenants(ctx, cl, teamName)
if err != nil {
logger.Debug().Err(err).Msg("platform destination: tenant discovery failed, skipping auto-injection")
return destinations, nil
}
tenant, ok := selectTenant(logger, tenants)
if !ok {
return destinations, nil
}

// Injecting: a pre-existing `platform` destination collides with the
// reserved name — fail rather than overwrite.
for _, d := range destinations {
if d.Name == destinationName {
return destinations, fmt.Errorf("a destination named %q already exists, but this name is reserved for the auto-injected CloudQuery Platform destination; remove it from your spec", destinationName)
}
}

session, err := mintSession(ctx, cl, tenant)
if err != nil {
logger.Warn().Err(err).Str("tenant_id", tenant.TenantId.String()).Msg("platform destination: session mint failed, skipping auto-injection")
return destinations, nil
}

plugin := pluginCoords()
parsedRegistry, err := specs.RegistryFromString(plugin.Registry)
if err != nil {
logger.Warn().Err(err).Str("registry", plugin.Registry).Msg("platform destination: unknown plugin registry; skipping auto-injection")
return destinations, nil
}

apiURL := platformAPIURL(session.ApiUrl)
dest := &specs.Destination{
Metadata: specs.Metadata{
Name: destinationName,
Path: plugin.Path,
Registry: parsedRegistry,
Version: plugin.Version,
},
SyncSummary: true,
// sync_group_id is rejected with the default overwrite-delete-stale mode.
WriteMode: specs.WriteModeAppend,
// Unique per invocation so concurrent runs don't wipe each other's rows.
SyncGroupId: strconv.FormatUint(allocateSyncGroupID(time.Now()), 10),
Spec: map[string]any{
"api_url": apiURL,
"token": session.Token,
},
}
dest.SetDefaults()
destinations = append(destinations, dest)

for _, s := range sources {
if !slices.Contains(s.Destinations, destinationName) {
s.Destinations = append(s.Destinations, destinationName)
}
}
logger.Info().
Str("platform_url", apiURL).
Str("tenant_id", tenant.TenantId.String()).
Str("registry", plugin.Registry).
Str("path", plugin.Path).
Str("version", plugin.Version).
Msg("auto-injected platform destination")
return destinations, nil
}

// resolveCredentials fetches a token and team for best-effort injection when
// the sync command didn't authenticate. Overridable in tests.
var resolveCredentials = func(ctx context.Context) (token, team string, err error) {
tok, err := cqapiauth.NewTokenClient().GetToken()
if err != nil {
return "", "", err
}
team, err = cqauth.GetTeamForToken(ctx, tok)
if err != nil {
return "", "", err
}
return tok.Value, team, nil
}

func selectTenant(logger zerolog.Logger, tenants []cloudquery_api.PlatformTenantSummary) (cloudquery_api.PlatformTenantSummary, bool) {
switch len(tenants) {
case 0:
return cloudquery_api.PlatformTenantSummary{}, false
case 1:
return tenants[0], true
}
want := os.Getenv(envTenantID)
if want == "" {
logger.Warn().Int("tenants", len(tenants)).Msgf("platform destination: team has multiple active tenants; set %s to choose one, skipping auto-injection", envTenantID)
return cloudquery_api.PlatformTenantSummary{}, false
}
for _, t := range tenants {
if t.TenantId.String() == want {
return t, true
}
}
logger.Warn().Str("tenant_id", want).Msgf("platform destination: %s does not match any active tenant, skipping auto-injection", envTenantID)
return cloudquery_api.PlatformTenantSummary{}, false
}

// YYYYMMDDhhmmssfff — same shape platform/syncs-transformer uses, so
// external-sync rows share the keyspace.
func allocateSyncGroupID(now time.Time) uint64 {
t := now.UTC()
base := t.Format("20060102150405") + fmt.Sprintf("%03d", t.Nanosecond()/1e6)
u, _ := strconv.ParseUint(base, 10, 64)
return u
}

func activeTenants(ctx context.Context, cl *cloudquery_api.ClientWithResponses, teamName string) ([]cloudquery_api.PlatformTenantSummary, error) {
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()

resp, err := cl.ListUserPlatformTenantsWithResponse(ctx)
if err != nil {
return nil, err
}
if resp.JSON200 == nil {
return nil, fmt.Errorf("unexpected status %d listing platform tenants: %s", resp.StatusCode(), strings.TrimSpace(string(resp.Body)))
}
active := make([]cloudquery_api.PlatformTenantSummary, 0, len(resp.JSON200.Items))
for _, t := range resp.JSON200.Items {
if t.TeamName == teamName && slices.Contains(injectableStatuses, t.Status) {
active = append(active, t)
}
}
return active, nil
}

func mintSession(ctx context.Context, cl *cloudquery_api.ClientWithResponses, tenant cloudquery_api.PlatformTenantSummary) (*cloudquery_api.CreatePlatformDestinationSession201Response, error) {
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()

resp, err := cl.CreatePlatformDestinationSessionWithResponse(ctx, cloudquery_api.CreatePlatformDestinationSessionRequest{TenantId: tenant.TenantId})
if err != nil {
return nil, err
}
if resp.JSON201 == nil {
return nil, fmt.Errorf("unexpected status %d minting platform destination session: %s", resp.StatusCode(), strings.TrimSpace(string(resp.Body)))
}
if resp.JSON201.Token == "" || resp.JSON201.ApiUrl == "" {
return nil, errors.New("platform destination session response missing token or api_url")
}
return resp.JSON201, nil
}
Loading
Loading