Skip to content

Commit 7742c25

Browse files
committed
wait for services to be deleted
1 parent 1727419 commit 7742c25

4 files changed

Lines changed: 290 additions & 1 deletion

File tree

deploy/deploy.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,18 @@ func (d *Deployment) Deploy(ctx context.Context, kubecfg string) (rerr error) {
224224
log.Warningf("Failed to start pod watcher: %v", err)
225225
} else {
226226
w.SetProgress(d.Progress)
227+
// Restrict watcher to known namespaces managed during deployment to avoid noise
228+
// from unrelated user workloads in other namespaces.
229+
w.AllowNamespaces(
230+
"kube-system",
231+
"metallb-system",
232+
"meshnet",
233+
"arista-ceoslab-operator-system",
234+
"lemming-operator",
235+
"srlinux-controller-system",
236+
"ixiatg-op-system",
237+
"cdnos-controller-system",
238+
)
227239
defer func() {
228240
cancel()
229241
rerr = w.Cleanup(rerr)

pods/status.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type Watcher struct {
3535
progress bool
3636
currentNamespace string
3737
currentPod types.UID
38+
allowedNS map[string]struct{}
3839
}
3940

4041
// NewWatcher returns a Watcher on the provided client or an error. The cancel
@@ -78,6 +79,24 @@ func (w *Watcher) SetProgress(value bool) {
7879
w.mu.Unlock()
7980
}
8081

82+
// AllowNamespaces restricts the watcher to only consider pods in the provided namespaces.
83+
// If no namespaces are provided, all namespaces are considered.
84+
func (w *Watcher) AllowNamespaces(namespaces ...string) {
85+
w.mu.Lock()
86+
defer w.mu.Unlock()
87+
if len(namespaces) == 0 {
88+
w.allowedNS = nil
89+
return
90+
}
91+
w.allowedNS = make(map[string]struct{}, len(namespaces))
92+
for _, ns := range namespaces {
93+
if ns == "" {
94+
continue
95+
}
96+
w.allowedNS[ns] = struct{}{}
97+
}
98+
}
99+
81100
func (w *Watcher) stop() {
82101
w.mu.Lock()
83102
stop := w.wstop
@@ -128,6 +147,16 @@ func (w *Watcher) display(format string, v ...any) {
128147
}
129148

130149
func (w *Watcher) updatePod(s *PodStatus) bool {
150+
// If allowed namespaces are configured, ignore pods outside them.
151+
w.mu.Lock()
152+
allowed := w.allowedNS
153+
w.mu.Unlock()
154+
if allowed != nil {
155+
if _, ok := allowed[s.Namespace]; !ok {
156+
return true
157+
}
158+
}
159+
131160
newNamespace := s.Namespace != w.currentNamespace
132161
var newState string
133162

topo/node/drivenets/drivenets.go

Lines changed: 247 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,19 @@ import (
2626
"os"
2727
"path/filepath"
2828
"strings"
29+
"time"
2930

3031
"github.com/drivenets/cdnos-controller/api/v1/clientset"
3132
"github.com/openconfig/kne/topo/node"
3233
"google.golang.org/grpc/codes"
3334
"google.golang.org/grpc/status"
3435
"google.golang.org/protobuf/proto"
36+
"k8s.io/client-go/kubernetes"
3537
"k8s.io/client-go/rest"
3638

3739
cdnosv1 "github.com/drivenets/cdnos-controller/api/v1"
3840
corev1 "k8s.io/api/core/v1"
41+
apierrors "k8s.io/apimachinery/pkg/api/errors"
3942
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4043
log "k8s.io/klog/v2"
4144

@@ -236,9 +239,108 @@ func (n *Node) cdnosCreate(ctx context.Context) error {
236239
if _, err := cs.CdnosV1alpha1().Cdnoss(n.Namespace).Create(ctx, dut, metav1.CreateOptions{}); err != nil {
237240
return fmt.Errorf("failed to create cdnos: %v", err)
238241
}
242+
// Ensure the controller-created Service has required Azure LB annotations when on AKS.
243+
// Creation will fail if annotations cannot be applied within the timeout.
244+
if err := n.annotateCdnosService(ctx); err != nil {
245+
return fmt.Errorf("failed to annotate service for %s: %v", n.Name(), err)
246+
}
239247
return nil
240248
}
241249

250+
// annotateCdnosService waits for the controller-created Service named "service-<node>"
251+
// and adds Azure LoadBalancer annotations required by the user.
252+
func (n *Node) annotateCdnosService(ctx context.Context) error {
253+
if !isAzureAKS(n.KubeClient) {
254+
log.V(1).Infof("Azure AKS not detected; skipping service annotation for %q", n.Name())
255+
return nil
256+
}
257+
log.Infof("Azure AKS detected; annotating controller-managed Services for %q", n.Name())
258+
deadline := time.Now().Add(10 * time.Minute)
259+
desired := map[string]string{
260+
"service.beta.kubernetes.io/azure-load-balancer-internal": "true",
261+
}
262+
// Build no-probe rules from this node's services (outside ports).
263+
for port := range n.Proto.Services {
264+
key := fmt.Sprintf("service.beta.kubernetes.io/port_%d_no_probe_rule", port)
265+
desired[key] = "true"
266+
}
267+
for {
268+
if time.Now().After(deadline) {
269+
return fmt.Errorf("timeout waiting to annotate services for %q", n.Name())
270+
}
271+
svcs, err := n.servicesForNode(ctx)
272+
if err != nil || len(svcs) == 0 {
273+
time.Sleep(1 * time.Second)
274+
continue
275+
}
276+
allAnnotated := true
277+
for i := range svcs {
278+
s := &svcs[i]
279+
changed := false
280+
if s.Annotations == nil {
281+
s.Annotations = map[string]string{}
282+
changed = true
283+
}
284+
for k, v := range desired {
285+
if s.Annotations[k] != v {
286+
s.Annotations[k] = v
287+
changed = true
288+
}
289+
}
290+
if changed {
291+
// Use a short-lived background context to avoid parent ctx cancellations.
292+
updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
293+
_, err := n.KubeClient.CoreV1().Services(n.Namespace).Update(updateCtx, s, metav1.UpdateOptions{})
294+
cancel()
295+
if err != nil {
296+
// Retry once on conflict with a fresh GET
297+
if apierrors.IsConflict(err) {
298+
getCtx, cancelGet := context.WithTimeout(context.Background(), 5*time.Second)
299+
fresh, gerr := n.KubeClient.CoreV1().Services(n.Namespace).Get(getCtx, s.Name, metav1.GetOptions{})
300+
cancelGet()
301+
if gerr == nil {
302+
if fresh.Annotations == nil {
303+
fresh.Annotations = map[string]string{}
304+
}
305+
for k, v := range desired {
306+
fresh.Annotations[k] = v
307+
}
308+
updateCtx2, cancelUpd2 := context.WithTimeout(context.Background(), 5*time.Second)
309+
_, uerr := n.KubeClient.CoreV1().Services(n.Namespace).Update(updateCtx2, fresh, metav1.UpdateOptions{})
310+
cancelUpd2()
311+
if uerr == nil {
312+
log.Infof("Annotated Service %q with Azure LB annotations (after conflict retry)", s.Name)
313+
continue
314+
}
315+
}
316+
}
317+
allAnnotated = false
318+
continue
319+
}
320+
log.Infof("Annotated Service %q with Azure LB annotations", s.Name)
321+
}
322+
// Verify
323+
getCtx, cancelGet := context.WithTimeout(context.Background(), 5*time.Second)
324+
got, err := n.KubeClient.CoreV1().Services(n.Namespace).Get(getCtx, s.Name, metav1.GetOptions{})
325+
cancelGet()
326+
if err != nil {
327+
allAnnotated = false
328+
continue
329+
}
330+
for k, v := range desired {
331+
if got.Annotations[k] != v {
332+
allAnnotated = false
333+
break
334+
}
335+
}
336+
}
337+
if allAnnotated {
338+
return nil
339+
}
340+
time.Sleep(500 * time.Millisecond)
341+
}
342+
}
343+
242344
func (n *Node) Status(ctx context.Context) (node.Status, error) {
243345
if n.Impl.Proto.Model != modelCdnos {
244346
return node.StatusUnknown, fmt.Errorf("invalid model specified")
@@ -279,7 +381,111 @@ func (n *Node) cdnosDelete(ctx context.Context) error {
279381
if err != nil {
280382
return err
281383
}
282-
return cs.CdnosV1alpha1().Cdnoss(n.Namespace).Delete(ctx, n.Name(), metav1.DeleteOptions{})
384+
// 1) Start teardown by deleting all Cdnos CRs in the namespace
385+
// (controller will clean up owned objects for each).
386+
list, err := cs.CdnosV1alpha1().Cdnoss(n.Namespace).List(ctx, metav1.ListOptions{})
387+
if err != nil {
388+
return err
389+
}
390+
if len(list.Items) == 0 {
391+
log.V(1).Infof("No Cdnos CRs found in namespace %q", n.Namespace)
392+
} else {
393+
var crNames []string
394+
for _, item := range list.Items {
395+
crNames = append(crNames, item.Name)
396+
}
397+
log.Infof("Deleting Cdnos CRs in %q: %v", n.Namespace, crNames)
398+
for _, item := range list.Items {
399+
if err := cs.CdnosV1alpha1().Cdnoss(n.Namespace).Delete(ctx, item.Name, metav1.DeleteOptions{}); err != nil {
400+
return err
401+
}
402+
}
403+
}
404+
405+
// 2) Monitor Services associated with this node until the controller removes them.
406+
svcs, _ := n.servicesForNode(ctx)
407+
if len(svcs) == 0 {
408+
log.V(1).Infof("No Services found for node %q", n.Name())
409+
} else {
410+
var svcNames []string
411+
for _, s := range svcs {
412+
svcNames = append(svcNames, s.Name)
413+
}
414+
log.Infof("Monitoring Services for %q to be removed by controller: %v", n.Name(), svcNames)
415+
}
416+
// Wait for Services to be removed (longer on AKS due to LoadBalancer cleanup).
417+
waitDeadline := time.Now().Add(2 * time.Minute)
418+
if isAzureAKS(n.KubeClient) {
419+
waitDeadline = time.Now().Add(10 * time.Minute)
420+
log.Infof("AKS detected; waiting up to %v for all Services to be removed", time.Until(waitDeadline).Truncate(time.Second))
421+
} else {
422+
log.V(1).Infof("Azure AKS not detected; waiting up to %v for all Services to be removed", time.Until(waitDeadline).Truncate(time.Second))
423+
}
424+
start := time.Now()
425+
ticker := time.NewTicker(10 * time.Second)
426+
defer ticker.Stop()
427+
for {
428+
if time.Now().After(waitDeadline) {
429+
log.Warningf("Timeout waiting for Services removal; continuing teardown")
430+
break
431+
}
432+
svcs, _ = n.servicesForNode(ctx)
433+
remaining := len(svcs)
434+
if remaining == 0 {
435+
log.Infof("All Services for %q removed after %v", n.Name(), time.Since(start).Truncate(time.Second))
436+
break
437+
}
438+
select {
439+
case <-ticker.C:
440+
var names []string
441+
for _, s := range svcs {
442+
names = append(names, s.Name)
443+
}
444+
log.Infof("Waiting for Services removal for %q (%d remaining: %v, %v elapsed)", n.Name(), remaining, names, time.Since(start).Truncate(time.Second))
445+
default:
446+
}
447+
time.Sleep(2 * time.Second)
448+
}
449+
return nil
450+
}
451+
452+
// servicesForNode lists Services in the namespace that are associated with this node.
453+
// It matches by:
454+
// - name equals "service-<node>"
455+
// - label "name" equals node name (per controller)
456+
// - selector app == node name
457+
// - ownerReference is Cdnos/<node>
458+
func (n *Node) servicesForNode(ctx context.Context) ([]corev1.Service, error) {
459+
// Use a short-lived background context for API calls to avoid parent ctx deadline cancellations.
460+
listCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
461+
defer cancel()
462+
list, err := n.KubeClient.CoreV1().Services(n.Namespace).List(listCtx, metav1.ListOptions{})
463+
if err != nil {
464+
return nil, err
465+
}
466+
var out []corev1.Service
467+
wantName := fmt.Sprintf("service-%s", n.Name())
468+
for _, s := range list.Items {
469+
if s.Name == wantName {
470+
out = append(out, s)
471+
continue
472+
}
473+
if s.Labels["name"] == n.Name() {
474+
out = append(out, s)
475+
continue
476+
}
477+
if s.Spec.Selector != nil && s.Spec.Selector["app"] == n.Name() {
478+
out = append(out, s)
479+
continue
480+
}
481+
for _, or := range s.OwnerReferences {
482+
if or.Kind == "Cdnos" && or.Name == n.Name() {
483+
out = append(out, s)
484+
break
485+
}
486+
}
487+
}
488+
return out, nil
283489
}
284490

285491
func (n *Node) ResetCfg(ctx context.Context) error {
@@ -361,6 +567,46 @@ func init() {
361567
node.Vendor(tpb.Vendor_DRIVENETS, New)
362568
}
363569

570+
// isAzureAKS attempts to detect whether the current cluster is Azure AKS.
571+
// It returns true if any node has a providerID starting with "azure://"
572+
// or has any label prefixed with "kubernetes.azure.com/".
573+
func isAzureAKS(k kubernetes.Interface) bool {
574+
// Allow manual override for environments where listing nodes is restricted.
575+
if v := os.Getenv("KNE_FORCE_AKS"); v == "1" || strings.ToLower(v) == "true" {
576+
log.V(1).Infof("AKS detection overridden via KNE_FORCE_AKS")
577+
return true
578+
}
579+
if v := os.Getenv("KNE_FORCE_AZURE_ANNOTATIONS"); v == "1" || strings.ToLower(v) == "true" {
580+
log.V(1).Infof("AKS detection overridden via KNE_FORCE_AZURE_ANNOTATIONS")
581+
return true
582+
}
583+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
584+
defer cancel()
585+
nodes, err := k.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
586+
if err != nil {
587+
log.V(1).Infof("AKS detection: failed to list nodes: %v", err)
588+
return false
589+
}
590+
if len(nodes.Items) == 0 {
591+
log.V(1).Infof("AKS detection: no nodes found in cluster")
592+
return false
593+
}
594+
for _, n := range nodes.Items {
595+
if strings.HasPrefix(n.Spec.ProviderID, "azure://") {
596+
log.V(1).Infof("AKS detection: node %q providerID %q indicates Azure", n.Name, n.Spec.ProviderID)
597+
return true
598+
}
599+
for key := range n.Labels {
600+
if strings.HasPrefix(key, "kubernetes.azure.com/") {
601+
log.V(1).Infof("AKS detection: node %q has Azure label %q", n.Name, key)
602+
return true
603+
}
604+
}
605+
}
606+
log.V(1).Infof("AKS detection: no Azure providerID or labels found on any node")
607+
return false
608+
}
609+
364610
func (n *Node) CreateConfig(ctx context.Context) (*corev1.Volume, error) {
365611
pb := n.Proto
366612
var data []byte

topo/topo.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,8 @@ func (m *Manager) Create(ctx context.Context, timeout time.Duration) (rerr error
270270
log.Warningf("Failed to start pod watcher: %v", err)
271271
} else {
272272
w.SetProgress(m.progress)
273+
// Only watch pods in this topology's namespace to avoid unrelated failures.
274+
w.AllowNamespaces(m.topo.Name)
273275
defer func() {
274276
cancel()
275277
rerr = w.Cleanup(rerr)

0 commit comments

Comments
 (0)