|
| 1 | +From a39f3f7e30dc36812881e8f43d50b73abb5d2aba Mon Sep 17 00:00:00 2001 |
| 2 | +From: Rahul Ganesh <rahulgab@amazon.com> |
| 3 | +Date: Fri, 20 Mar 2026 11:51:16 -0700 |
| 4 | +Subject: [PATCH 2/2] retry on 403/401 in ServicesWatcher with exponential backoff |
| 5 | + |
| 6 | +Retry WatchFn with exponential backoff when service watcher starts against |
| 7 | +transient 403 and 401 errors. On joining CP nodes with K8s 1.34+, the local |
| 8 | +etcd may still be a learner when kube-vip starts, causing RBAC data |
| 9 | +to be unavailable through the learner until it graduates. |
| 10 | + |
| 11 | +Signed-off-by: Rahul Ganesh <rahulgab@amazon.com> |
| 12 | +--- |
| 13 | + pkg/services/watch_services.go | 39 +++++++++++++++++++++++++++++++++- |
| 14 | + 1 file changed, 38 insertions(+), 1 deletion(-) |
| 15 | + |
| 16 | +diff --git a/pkg/services/watch_services.go b/pkg/services/watch_services.go |
| 17 | +index 8c27ec6..58631ba 100644 |
| 18 | +--- a/pkg/services/watch_services.go |
| 19 | ++++ b/pkg/services/watch_services.go |
| 20 | +@@ -3,6 +3,7 @@ package services |
| 21 | + import ( |
| 22 | + "context" |
| 23 | + "fmt" |
| 24 | ++ "time" |
| 25 | + |
| 26 | + log "log/slog" |
| 27 | + |
| 28 | +@@ -14,11 +15,45 @@ import ( |
| 29 | + v1 "k8s.io/api/core/v1" |
| 30 | + apierrors "k8s.io/apimachinery/pkg/api/errors" |
| 31 | + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| 32 | ++ "k8s.io/apimachinery/pkg/util/wait" |
| 33 | + "k8s.io/apimachinery/pkg/watch" |
| 34 | + "k8s.io/client-go/tools/cache" |
| 35 | + watchtools "k8s.io/client-go/tools/watch" |
| 36 | + ) |
| 37 | + |
| 38 | ++// watchWithAuthRetry retries watchFn with exponential backoff on transient 403 Forbidden |
| 39 | ++// and 401 Unauthorized errors. On joining control plane nodes with K8s 1.34+, the local |
| 40 | ++// etcd may still be a learner when kube-vip starts, causing RBAC data to be unavailable. |
| 41 | ++// These auth errors resolve once etcd is promoted to a full member (typically within seconds). |
| 42 | ++// Non-auth errors are returned immediately. Context cancellation stops the retry loop. |
| 43 | ++func watchWithAuthRetry(ctx context.Context, watchFn func(context.Context) (watch.Interface, error)) (watch.Interface, error) { |
| 44 | ++ var w watch.Interface |
| 45 | ++ var lastErr error |
| 46 | ++ err := wait.ExponentialBackoffWithContext(ctx, wait.Backoff{ |
| 47 | ++ Duration: 2 * time.Second, |
| 48 | ++ Factor: 2.0, |
| 49 | ++ Jitter: 0.1, |
| 50 | ++ Steps: 10, |
| 51 | ++ Cap: 30 * time.Second, |
| 52 | ++ }, func(ctx context.Context) (bool, error) { |
| 53 | ++ var watchErr error |
| 54 | ++ w, watchErr = watchFn(ctx) |
| 55 | ++ if watchErr == nil { |
| 56 | ++ return true, nil |
| 57 | ++ } |
| 58 | ++ if !apierrors.IsForbidden(watchErr) && !apierrors.IsUnauthorized(watchErr) { |
| 59 | ++ return false, watchErr |
| 60 | ++ } |
| 61 | ++ lastErr = watchErr |
| 62 | ++ log.Warn("(svcs) services watch auth error, retrying", "err", watchErr) |
| 63 | ++ return false, nil |
| 64 | ++ }) |
| 65 | ++ if err != nil { |
| 66 | ++ return nil, fmt.Errorf("(svcs) services watch failed after retries: %w (last: %v)", err, lastErr) |
| 67 | ++ } |
| 68 | ++ return w, nil |
| 69 | ++} |
| 70 | ++ |
| 71 | + // This function handles the watching of a services endpoints and updates a load balancers endpoint configurations accordingly |
| 72 | + func (p *Processor) ServicesWatcher(ctx context.Context, serviceFunc func(*servicecontext.Context, *v1.Service) error) error { |
| 73 | + // first start port mirroring if enabled |
| 74 | +@@ -44,7 +79,9 @@ func (p *Processor) ServicesWatcher(ctx context.Context, serviceFunc func(*servi |
| 75 | + // Use a restartable watcher, as this should help in the event of etcd or timeout issues |
| 76 | + rw, err := watchtools.NewRetryWatcherWithContext(ctx, "1", &cache.ListWatch{ |
| 77 | + WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) { |
| 78 | +- return p.rwClientSet.CoreV1().Services(p.config.ServiceNamespace).Watch(ctx, metav1.ListOptions{}) |
| 79 | ++ return watchWithAuthRetry(ctx, func(ctx context.Context) (watch.Interface, error) { |
| 80 | ++ return p.rwClientSet.CoreV1().Services(p.config.ServiceNamespace).Watch(ctx, metav1.ListOptions{}) |
| 81 | ++ }) |
| 82 | + }, |
| 83 | + }) |
| 84 | + if err != nil { |
| 85 | +-- |
| 86 | +2.46.0 |
| 87 | + |
0 commit comments