Skip to content

Commit 21b6b4a

Browse files
authored
Add readiness tracker to ensure caches have been loaded before serving traffic (#541)
* Add readiness tracker to ensure caches have been loaded before serving traffic This commit adds readiness.Tracker, responsible for managing expectations and observations of templates, constraints, and data. Expectations are set according to the initial state of the cluster when Gatekeeper starts - pre-existing templates and their corresponding constraints are tracked until they are loaded by their respective controllers into OPA. Similarly, cached data resources specified by the Config singleton are also tracked to completion. Templates or configured resource removed during warm-up are removed from the initial expected set and will not block readiness. The tracker is registered as a readiness probe - traffic will not be routed to the admission controller while the readiness probe is not returning success. Fixes: #405 Signed-off-by: Oren Shomron <shomron@gmail.com> * * Add circuit-breaker for readiness once expectations have been satisfied for the first time. * Fix expected kinds according to ConstraintTemplate / Config rosters, only use those sets for checking satisfied expectations. * Move config roster calculation into readiness package Signed-off-by: Oren Shomron <shomron@gmail.com> * Workaround for CRD parsing failure where envtest picks up kustomization.yaml A kustomization.yaml was [added](open-policy-agent/frameworks@aa59bd1) in the constraints framework deployment directory. This is read by the CRD loading code of controller-runtime's envtest and fails on parsing. This leads to no CRDs being loaded for tests. This commit works around the problem loading the framework CRDs specifically where needed, rather than using the directory-based loading code path. Signed-off-by: Oren Shomron <shomron@gmail.com> * Add test to verify config readiness with non-existent GVK references Signed-off-by: Oren Shomron <shomron@gmail.com> * Decouple tracker from watch manager, use API server directly. Resource are now listed directly from the API server in the readiness tracker and will not be cached. This means the same resources will be subsequently queried and cached as part of the normal controller operation, but removes some coordination complexity. Also add retry logic when tracking cached data, only short-circuit when a resource kind is unregistered. Signed-off-by: Oren Shomron <shomron@gmail.com> * Address remaining review comments * Unexport Unsatisfied() method, remove from Expectations interface * defer ExpectationsDone() where appropriate, avoids failures when listing resources cannot be completed (even after retries) * Remove unneeded metav1.Object, runtime,Object method implementations on objKey * Use RWMutex for synchronization in objectTracker * Rename isRecoverable -> predicate for in retryLister Signed-off-by: Oren Shomron <shomron@gmail.com> * Create pkg/keys for shared keys used across packages. Fix comments. Signed-off-by: Oren Shomron <shomron@gmail.com> * Fix race condition registering readiness probe Since controllers are registered asynchronously (waiting on certificate generation), it was possible for the readiness probe registration call to happen after the manager started, which caused an error and CrashLoopBackoff. This commit moves probe registration back to the main goroutine. Signed-off-by: Oren Shomron <shomron@gmail.com>
1 parent e1f25a5 commit 21b6b4a

46 files changed

Lines changed: 2563 additions & 90 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

main.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
configController "github.com/open-policy-agent/gatekeeper/pkg/controller/config"
3232
"github.com/open-policy-agent/gatekeeper/pkg/controller/constrainttemplate"
3333
"github.com/open-policy-agent/gatekeeper/pkg/metrics"
34+
"github.com/open-policy-agent/gatekeeper/pkg/readiness"
3435
"github.com/open-policy-agent/gatekeeper/pkg/target"
3536
"github.com/open-policy-agent/gatekeeper/pkg/upgrade"
3637
"github.com/open-policy-agent/gatekeeper/pkg/util"
@@ -114,15 +115,14 @@ func main() {
114115

115116
switch *logLevel {
116117
case "DEBUG":
117-
ctrl.SetLogger(crzap.Logger(true))
118+
ctrl.SetLogger(crzap.New(crzap.UseDevMode(true)))
118119
case "WARNING", "ERROR":
119120
setLoggerForProduction()
120121
case "INFO":
121122
fallthrough
122123
default:
123-
ctrl.SetLogger(crzap.Logger(false))
124+
ctrl.SetLogger(crzap.New(crzap.UseDevMode(false)))
124125
}
125-
ctrl.SetLogger(crzap.Logger(true))
126126

127127
// set default if --operation is not provided
128128
if len(operations) == 0 {
@@ -172,18 +172,22 @@ func main() {
172172
// ControllerSwitch will be used to disable controllers during our teardown process,
173173
// avoiding conflicts in finalizer cleanup.
174174
sw := watch.NewSwitch()
175-
go startControllers(mgr, sw, setupFinished)
176175

177-
// +kubebuilder:scaffold:builder
178-
179-
if err := mgr.AddReadyzCheck("default", healthz.Ping); err != nil {
180-
setupLog.Error(err, "unable to create ready check")
176+
// Setup tracker and register readiness probe.
177+
tracker, err := readiness.SetupTracker(mgr)
178+
if err != nil {
179+
setupLog.Error(err, "unable to register readiness tracker")
181180
os.Exit(1)
182181
}
182+
183+
// +kubebuilder:scaffold:builder
184+
183185
if err := mgr.AddHealthzCheck("default", healthz.Ping); err != nil {
184186
setupLog.Error(err, "unable to create health check")
185187
os.Exit(1)
186188
}
189+
// Setup controllers asynchronously, they will block for certificate generation if needed.
190+
go setupControllers(mgr, sw, tracker, setupFinished)
187191

188192
setupLog.Info("starting manager")
189193
hadError := false
@@ -223,8 +227,8 @@ func main() {
223227
}
224228
}
225229

226-
func startControllers(mgr ctrl.Manager, sw *watch.ControllerSwitch, setupFinished chan struct{}) {
227-
// Block until the setup finishes.
230+
func setupControllers(mgr ctrl.Manager, sw *watch.ControllerSwitch, tracker *readiness.Tracker, setupFinished chan struct{}) {
231+
// Block until the setup (certificate generation) finishes.
228232
<-setupFinished
229233

230234
// initialize OPA
@@ -258,10 +262,17 @@ func startControllers(mgr ctrl.Manager, sw *watch.ControllerSwitch, setupFinishe
258262

259263
// Setup all Controllers
260264
setupLog.Info("setting up controller")
261-
if err := controller.AddToManager(mgr, client, wm, sw); err != nil {
265+
opts := controller.Dependencies{
266+
Opa: client,
267+
WatchManger: wm,
268+
ControllerSwitch: sw,
269+
Tracker: tracker,
270+
}
271+
if err := controller.AddToManager(mgr, opts); err != nil {
262272
setupLog.Error(err, "unable to register controllers to the manager")
263273
os.Exit(1)
264274
}
275+
265276
if operations["webhook"] {
266277
setupLog.Info("setting up webhooks")
267278
if err := webhook.AddToManager(mgr, client); err != nil {

pkg/controller/config/config_controller.go

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@ import (
2323
opa "github.com/open-policy-agent/frameworks/constraint/pkg/client"
2424
configv1alpha1 "github.com/open-policy-agent/gatekeeper/api/v1alpha1"
2525
syncc "github.com/open-policy-agent/gatekeeper/pkg/controller/sync"
26+
"github.com/open-policy-agent/gatekeeper/pkg/keys"
2627
"github.com/open-policy-agent/gatekeeper/pkg/metrics"
28+
"github.com/open-policy-agent/gatekeeper/pkg/readiness"
2729
"github.com/open-policy-agent/gatekeeper/pkg/target"
28-
"github.com/open-policy-agent/gatekeeper/pkg/util"
2930
"github.com/open-policy-agent/gatekeeper/pkg/watch"
3031
"k8s.io/apimachinery/pkg/api/errors"
3132
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3233
"k8s.io/apimachinery/pkg/runtime"
3334
"k8s.io/apimachinery/pkg/runtime/schema"
34-
"k8s.io/apimachinery/pkg/types"
3535
"k8s.io/apimachinery/pkg/util/wait"
3636
"sigs.k8s.io/controller-runtime/pkg/client"
3737
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -51,19 +51,19 @@ const (
5151
finalizerName = "finalizers.gatekeeper.sh/config"
5252
)
5353

54-
var CfgKey = types.NamespacedName{Namespace: util.GetNamespace(), Name: "config"}
5554
var log = logf.Log.WithName("controller").WithValues("kind", "Config")
5655

5756
type Adder struct {
5857
Opa *opa.Client
5958
WatchManager *watch.Manager
6059
ControllerSwitch *watch.ControllerSwitch
60+
Tracker *readiness.Tracker
6161
}
6262

6363
// Add creates a new ConfigController and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
6464
// and Start it when the Manager is Started.
6565
func (a *Adder) Add(mgr manager.Manager) error {
66-
r, err := newReconciler(mgr, a.Opa, a.WatchManager, a.ControllerSwitch)
66+
r, err := newReconciler(mgr, a.Opa, a.WatchManager, a.ControllerSwitch, a.Tracker)
6767
if err != nil {
6868
return err
6969
}
@@ -83,8 +83,12 @@ func (a *Adder) InjectControllerSwitch(cs *watch.ControllerSwitch) {
8383
a.ControllerSwitch = cs
8484
}
8585

86+
func (a *Adder) InjectTracker(t *readiness.Tracker) {
87+
a.Tracker = t
88+
}
89+
8690
// newReconciler returns a new reconcile.Reconciler
87-
func newReconciler(mgr manager.Manager, opa syncc.OpaDataClient, wm *watch.Manager, cs *watch.ControllerSwitch) (reconcile.Reconciler, error) {
91+
func newReconciler(mgr manager.Manager, opa syncc.OpaDataClient, wm *watch.Manager, cs *watch.ControllerSwitch, tracker *readiness.Tracker) (reconcile.Reconciler, error) {
8892
watchSet := watch.NewSet()
8993
filteredOpa := syncc.NewFilteredOpaDataClient(opa, watchSet)
9094
syncMetricsCache := syncc.NewMetricsCache()
@@ -96,6 +100,7 @@ func newReconciler(mgr manager.Manager, opa syncc.OpaDataClient, wm *watch.Manag
96100
Opa: filteredOpa,
97101
Events: events,
98102
MetricsCache: syncMetricsCache,
103+
Tracker: tracker,
99104
}
100105
// Create subordinate controller - we will feed it events dynamically via watch
101106
if err := syncAdder.Add(mgr); err != nil {
@@ -118,6 +123,7 @@ func newReconciler(mgr manager.Manager, opa syncc.OpaDataClient, wm *watch.Manag
118123
watcher: w,
119124
watched: watchSet,
120125
syncMetricsCache: syncMetricsCache,
126+
tracker: tracker,
121127
}, nil
122128
}
123129

@@ -152,6 +158,7 @@ type ReconcileConfig struct {
152158
cs *watch.ControllerSwitch
153159
watcher *watch.Registrar
154160
watched *watch.Set
161+
tracker *readiness.Tracker
155162
}
156163

157164
// +kubebuilder:rbac:groups=*,resources=*,verbs=get;list;watch
@@ -173,13 +180,14 @@ func (r *ReconcileConfig) Reconcile(request reconcile.Request) (reconcile.Result
173180
}
174181

175182
// Fetch the Config instance
176-
if request.NamespacedName != CfgKey {
183+
if request.NamespacedName != keys.Config {
177184
log.Info("Ignoring unsupported config name", "namespace", request.NamespacedName.Namespace, "name", request.NamespacedName.Name)
178185
return reconcile.Result{}, nil
179186
}
180187
exists := true
181188
instance := &configv1alpha1.Config{}
182-
err := r.reader.Get(context.TODO(), request.NamespacedName, instance)
189+
ctx := context.Background()
190+
err := r.reader.Get(ctx, request.NamespacedName, instance)
183191
if err != nil {
184192
// if config is not found, we should remove cached data
185193
if errors.IsNotFound(err) {
@@ -210,6 +218,10 @@ func (r *ReconcileConfig) Reconcile(request reconcile.Request) (reconcile.Result
210218
}
211219
}
212220

221+
// Remove expectations for resources we no longer watch.
222+
diff := r.watched.Difference(newSyncOnly)
223+
r.removeStaleExpectations(diff)
224+
213225
// If the watch set has not changed, we're done here.
214226
if r.watched.Equals(newSyncOnly) {
215227
return reconcile.Result{}, nil
@@ -286,6 +298,13 @@ func (r *ReconcileConfig) replayData(ctx context.Context, w *watch.Set) error {
286298
return nil
287299
}
288300

301+
// removeStaleExpectations stops tracking data for any resources that are no longer watched.
302+
func (r *ReconcileConfig) removeStaleExpectations(stale *watch.Set) {
303+
for _, gvk := range stale.Items() {
304+
r.tracker.CancelData(gvk)
305+
}
306+
}
307+
289308
func containsString(s string, items []string) bool {
290309
for _, item := range items {
291310
if item == s {
@@ -310,13 +329,13 @@ func removeString(s string, items []string) []string {
310329
func TearDownState(c client.Client, finished chan struct{}) {
311330
defer close(finished)
312331
syncCfg := &configv1alpha1.Config{}
313-
if err := c.Get(context.Background(), CfgKey, syncCfg); err != nil {
332+
if err := c.Get(context.Background(), keys.Config, syncCfg); err != nil {
314333
log.Error(err, "while retrieving sync config")
315334
return
316335
}
317336
cleanFn := func() (bool, error) {
318337
syncCfg := &configv1alpha1.Config{}
319-
if err := c.Get(context.Background(), CfgKey, syncCfg); err != nil {
338+
if err := c.Get(context.Background(), keys.Config, syncCfg); err != nil {
320339
if errors.IsNotFound(err) {
321340
return true, nil
322341
}

pkg/controller/config/config_controller_test.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ import (
2727
opa "github.com/open-policy-agent/frameworks/constraint/pkg/client"
2828
"github.com/open-policy-agent/frameworks/constraint/pkg/client/drivers/local"
2929
constraintTypes "github.com/open-policy-agent/frameworks/constraint/pkg/types"
30+
configv1alpha1 "github.com/open-policy-agent/gatekeeper/api/v1alpha1"
31+
"github.com/open-policy-agent/gatekeeper/pkg/keys"
32+
"github.com/open-policy-agent/gatekeeper/pkg/readiness"
33+
"github.com/open-policy-agent/gatekeeper/pkg/target"
34+
"github.com/open-policy-agent/gatekeeper/pkg/watch"
35+
"github.com/open-policy-agent/gatekeeper/third_party/sigs.k8s.io/controller-runtime/pkg/dynamiccache"
3036
"github.com/prometheus/client_golang/prometheus"
3137
"golang.org/x/net/context"
3238
"k8s.io/apimachinery/pkg/api/meta"
@@ -45,11 +51,6 @@ import (
4551
"sigs.k8s.io/controller-runtime/pkg/manager"
4652
"sigs.k8s.io/controller-runtime/pkg/metrics"
4753
"sigs.k8s.io/controller-runtime/pkg/reconcile"
48-
49-
configv1alpha1 "github.com/open-policy-agent/gatekeeper/api/v1alpha1"
50-
"github.com/open-policy-agent/gatekeeper/pkg/target"
51-
"github.com/open-policy-agent/gatekeeper/pkg/watch"
52-
"github.com/open-policy-agent/gatekeeper/third_party/sigs.k8s.io/controller-runtime/pkg/dynamiccache"
5354
)
5455

5556
var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{
@@ -63,7 +64,7 @@ const timeout = time.Second * 20
6364
func setupManager(t *testing.T) (manager.Manager, *watch.Manager) {
6465
t.Helper()
6566

66-
ctrl.SetLogger(zap.Logger(true))
67+
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
6768
metrics.Registry = prometheus.NewRegistry()
6869
mgr, err := manager.New(cfg, manager.Options{
6970
MetricsBindAddress: "0",
@@ -126,7 +127,9 @@ func TestReconcile(t *testing.T) {
126127
}
127128

128129
cs := watch.NewSwitch()
129-
rec, _ := newReconciler(mgr, opa, wm, cs)
130+
tracker, err := readiness.SetupTracker(mgr)
131+
g.Expect(err).NotTo(gomega.HaveOccurred())
132+
rec, _ := newReconciler(mgr, opa, wm, cs, tracker)
130133
recFn, requests := SetupTestReconcile(rec)
131134
g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred())
132135

@@ -181,7 +184,7 @@ func TestReconcile(t *testing.T) {
181184

182185
g.Eventually(func() error {
183186
obj := &configv1alpha1.Config{}
184-
if err := newCli.Get(context.TODO(), CfgKey, obj); err != nil {
187+
if err := newCli.Get(context.TODO(), keys.Config, obj); err != nil {
185188
return err
186189
}
187190
if hasFinalizer(obj) {
@@ -215,7 +218,9 @@ func TestConfig_CacheContents(t *testing.T) {
215218

216219
opa := &fakeOpa{}
217220
cs := watch.NewSwitch()
218-
rec, _ := newReconciler(mgr, opa, wm, cs)
221+
tracker, err := readiness.SetupTracker(mgr)
222+
g.Expect(err).NotTo(gomega.HaveOccurred())
223+
rec, _ := newReconciler(mgr, opa, wm, cs, tracker)
219224
g.Expect(add(mgr, rec)).NotTo(gomega.HaveOccurred())
220225

221226
stopMgr, mgrStopped := StartTestManager(mgr, g)
@@ -230,7 +235,7 @@ func TestConfig_CacheContents(t *testing.T) {
230235
defer testMgrStopped()
231236

232237
// Create the Config object and expect the Reconcile to be created
233-
err := c.Create(context.TODO(), instance)
238+
err = c.Create(context.TODO(), instance)
234239
g.Expect(err).NotTo(gomega.HaveOccurred())
235240

236241
defer func() {

pkg/controller/constraint/constraint_controller.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/open-policy-agent/frameworks/constraint/pkg/core/constraints"
2626
"github.com/open-policy-agent/gatekeeper/pkg/logging"
2727
"github.com/open-policy-agent/gatekeeper/pkg/metrics"
28+
"github.com/open-policy-agent/gatekeeper/pkg/readiness"
2829
"github.com/open-policy-agent/gatekeeper/pkg/util"
2930
csutil "github.com/open-policy-agent/gatekeeper/pkg/util/constraint"
3031
"github.com/open-policy-agent/gatekeeper/pkg/watch"
@@ -56,19 +57,25 @@ type Adder struct {
5657
WatchManager *watch.Manager
5758
ControllerSwitch *watch.ControllerSwitch
5859
Events <-chan event.GenericEvent
60+
Tracker *readiness.Tracker
5961
}
6062

6163
func (a *Adder) InjectOpa(o *opa.Client) {
6264
a.Opa = o
6365
}
6466

6567
func (a *Adder) InjectWatchManager(w *watch.Manager) {
68+
a.WatchManager = w
6669
}
6770

6871
func (a *Adder) InjectControllerSwitch(cs *watch.ControllerSwitch) {
6972
a.ControllerSwitch = cs
7073
}
7174

75+
func (a *Adder) InjectTracker(t *readiness.Tracker) {
76+
a.Tracker = t
77+
}
78+
7279
// Add creates a new Constraint Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
7380
// and Start it when the Manager is Started.
7481
func (a *Adder) Add(mgr manager.Manager) error {
@@ -78,7 +85,7 @@ func (a *Adder) Add(mgr manager.Manager) error {
7885
return err
7986
}
8087

81-
r := newReconciler(mgr, a.Opa, a.ControllerSwitch, reporter, a.ConstraintsCache)
88+
r := newReconciler(mgr, a.Opa, a.ControllerSwitch, reporter, a.ConstraintsCache, a.Tracker)
8289
return add(mgr, r, a.Events)
8390
}
8491

@@ -98,7 +105,8 @@ func newReconciler(
98105
opa *opa.Client,
99106
cs *watch.ControllerSwitch,
100107
reporter StatsReporter,
101-
constraintsCache *ConstraintsCache) reconcile.Reconciler {
108+
constraintsCache *ConstraintsCache,
109+
tracker *readiness.Tracker) reconcile.Reconciler {
102110
return &ReconcileConstraint{
103111
// Separate reader and writer because manager's default client bypasses the cache for unstructured resources.
104112
writer: mgr.GetClient(),
@@ -111,6 +119,7 @@ func newReconciler(
111119
log: log,
112120
reporter: reporter,
113121
constraintsCache: constraintsCache,
122+
tracker: tracker,
114123
}
115124
}
116125

@@ -146,6 +155,7 @@ type ReconcileConstraint struct {
146155
log logr.Logger
147156
reporter StatsReporter
148157
constraintsCache *ConstraintsCache
158+
tracker *readiness.Tracker
149159
}
150160

151161
// +kubebuilder:rbac:groups=constraints.gatekeeper.sh,resources=*,verbs=get;list;watch;create;update;patch;delete
@@ -298,11 +308,22 @@ func logRemoval(l logr.Logger, constraint *unstructured.Unstructured, enforcemen
298308
}
299309

300310
func (r *ReconcileConstraint) cacheConstraint(instance *unstructured.Unstructured) error {
311+
t := r.tracker.For(instance.GroupVersionKind())
312+
301313
obj := instance.DeepCopy()
302314
// Remove the status field since we do not need it for OPA
303315
unstructured.RemoveNestedField(obj.Object, "status")
304316
_, err := r.opa.AddConstraint(context.Background(), obj)
305-
return err
317+
if err != nil {
318+
t.CancelExpect(obj)
319+
return err
320+
}
321+
322+
// Track for readiness
323+
t.Observe(instance)
324+
log.Info("[readiness] observed Constraint", "name", instance.GetName())
325+
326+
return nil
306327
}
307328

308329
func RemoveFinalizer(instance *unstructured.Unstructured) {

0 commit comments

Comments
 (0)