Skip to content

fix: reject scale down request if there is already an in-process request in the same rollout group #254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/admission/prep_downscale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type templateParams struct {
DownScalePortKey string
DownScalePort string
DownScaleLabelKey string
RolloutGroup string
}

type fakeHttpClient struct {
Expand Down Expand Up @@ -214,6 +215,7 @@ func testPrepDownscaleWebhook(t *testing.T, oldReplicas, newReplicas int, option
DownScalePortKey: config.PrepareDownscalePortAnnotationKey,
DownScalePort: u.Port(),
DownScaleLabelKey: config.PrepareDownscaleLabelKey,
RolloutGroup: "ingester",
}

newParams := templateParams{
Expand All @@ -223,6 +225,7 @@ func testPrepDownscaleWebhook(t *testing.T, oldReplicas, newReplicas int, option
DownScalePortKey: config.PrepareDownscalePortAnnotationKey,
DownScalePort: u.Port(),
DownScaleLabelKey: config.PrepareDownscaleLabelKey,
RolloutGroup: "ingester",
}

rawObject, err := statefulSetTemplate(newParams)
Expand Down Expand Up @@ -618,7 +621,7 @@ metadata:
name: web
labels:
{{.DownScaleLabelKey}}: "true"
rollout-group: "ingester"
rollout-group: "{{.RolloutGroup}}"
annotations:
{{.DownScalePathKey}}: {{.DownScalePath}}
{{.DownScalePortKey}}: "{{.DownScalePort}}"
Expand Down Expand Up @@ -686,6 +689,7 @@ func testPrepDownscaleWebhookWithZoneTracker(t *testing.T, oldReplicas, newRepli
DownScalePortKey: config.PrepareDownscalePortAnnotationKey,
DownScalePort: u.Port(),
DownScaleLabelKey: config.PrepareDownscaleLabelKey,
RolloutGroup: "ingester",
}

newParams := templateParams{
Expand All @@ -695,6 +699,7 @@ func testPrepDownscaleWebhookWithZoneTracker(t *testing.T, oldReplicas, newRepli
DownScalePortKey: config.PrepareDownscalePortAnnotationKey,
DownScalePort: u.Port(),
DownScaleLabelKey: config.PrepareDownscaleLabelKey,
RolloutGroup: "ingester",
}

rawObject, err := statefulSetTemplate(newParams)
Expand Down
13 changes: 13 additions & 0 deletions pkg/admission/zone_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type zoneTracker struct {
client kubernetes.Interface
namespace string
configMapName string

rolloutGroupDownscalingInProgress sync.Map
}

type zoneInfo struct {
Expand Down Expand Up @@ -156,6 +158,14 @@ func (zt *zoneTracker) prepareDownscale(ctx context.Context, l log.Logger, ar ad
level.Warn(logger).Log("msg", msg)
return deny(msg)
}
lockedFor, alreadyLocked := zt.rolloutGroupDownscalingInProgress.LoadOrStore(stsPrepareInfo.rolloutGroup, ar.Request.Name)
if alreadyLocked {
msg := fmt.Sprintf("downscale of %s/%s in %s from %d to %d replicas is not allowed because statefulset %s is already in process of updating replicas",
ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldInfo.replicas, *newInfo.replicas, lockedFor)
level.Warn(logger).Log("msg", msg)
return deny(msg)
}
defer zt.rolloutGroupDownscalingInProgress.Delete(stsPrepareInfo.rolloutGroup)
}

// It's a downscale, so we need to prepare the pods that are going away for shutdown.
Expand Down Expand Up @@ -267,6 +277,9 @@ func (zt *zoneTracker) getOrCreateConfigMap(ctx context.Context, stsList *appsv1

// Load the zones from the zoneTracker ConfigMap into the zones map
func (zt *zoneTracker) loadZones(ctx context.Context, stsList *appsv1.StatefulSetList) error {
zt.mu.Lock()
defer zt.mu.Unlock()

cm, err := zt.getOrCreateConfigMap(ctx, stsList)
if err != nil {
return err
Expand Down
156 changes: 156 additions & 0 deletions pkg/admission/zone_tracker_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
package admission

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
admissionv1 "k8s.io/api/admission/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"

"github.com/grafana/rollout-operator/pkg/config"
Expand Down Expand Up @@ -286,3 +295,150 @@ func TestLastDownscaledNonExistentZone(t *testing.T) {
t.Errorf("lastDownscaled did not handle non-existent zone correctly")
}
}

// TestZoneTrackerConcurrentDownscale verifies the per-rollout-group downscale
// lock: while one statefulset of a rollout group has a prepare-downscale
// request in flight, concurrent downscale requests for other statefulsets in
// the SAME group must be denied, while requests for a DIFFERENT rollout group
// must still be allowed. Once the in-flight request finishes, subsequent
// requests for the formerly locked group must succeed again.
func TestZoneTrackerConcurrentDownscale(t *testing.T) {
	// Default fake HTTP client: prepare-downscale endpoints answer 200 immediately.
	f := newFakeHttpClient(func(r *http.Request) (*http.Response, error) {
		return &http.Response{
			StatusCode: http.StatusOK,
			Body:       io.NopCloser(bytes.NewBuffer([]byte(""))),
		}, nil
	})

	logger := newDebugLogger()

	// Backing server only used so the statefulset template gets a valid port.
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer ts.Close()

	u, err := url.Parse(ts.URL)
	require.NoError(t, err)
	require.NotEmpty(t, u.Port())

	path := "/prepare-downscale"
	rolloutGroupIngester := "ingester"
	ingesterZoneA := "ingester-zone-a"
	ingesterZoneB := "ingester-zone-b"
	rolloutGroupIndexGateway := "index-gateway"
	indexGatewayZoneA := "index-gateway-zone-a"

	namespace := "test"
	dryRun := false
	// buildAdmissionRequest builds an AdmissionReview describing a downscale of
	// stsName (in rolloutGroup) from 5 to 2 replicas.
	buildAdmissionRequest := func(rolloutGroup string, stsName string) admissionv1.AdmissionReview {
		oldParams := templateParams{
			Replicas:          5,
			DownScalePathKey:  config.PrepareDownscalePathAnnotationKey,
			DownScalePath:     path,
			DownScalePortKey:  config.PrepareDownscalePortAnnotationKey,
			DownScalePort:     u.Port(),
			DownScaleLabelKey: config.PrepareDownscaleLabelKey,
			RolloutGroup:      rolloutGroup,
		}

		newParams := templateParams{
			Replicas:          2,
			DownScalePathKey:  config.PrepareDownscalePathAnnotationKey,
			DownScalePath:     path,
			DownScalePortKey:  config.PrepareDownscalePortAnnotationKey,
			DownScalePort:     u.Port(),
			DownScaleLabelKey: config.PrepareDownscaleLabelKey,
			RolloutGroup:      rolloutGroup,
		}

		rawObject, err := statefulSetTemplate(newParams)
		require.NoError(t, err)

		oldRawObject, err := statefulSetTemplate(oldParams)
		require.NoError(t, err)

		return admissionv1.AdmissionReview{
			Request: &admissionv1.AdmissionRequest{
				Kind: metav1.GroupVersionKind{
					Group:   "apps",
					Version: "v1",
					Kind:    "Statefulset",
				},
				Resource: metav1.GroupVersionResource{
					Group:    "apps",
					Version:  "v1",
					Resource: "statefulsets",
				},
				Name:      stsName,
				Namespace: namespace,
				Object: runtime.RawExtension{
					Raw: rawObject,
				},
				OldObject: runtime.RawExtension{
					Raw: oldRawObject,
				},
				DryRun: &dryRun,
			},
		}
	}

	api := fake.NewSimpleClientset()

	zt := newZoneTracker(api, namespace, "testconfigmap")

	// Start a downscale of ingester-zone-a whose prepare-downscale HTTP call
	// blocks until ingesterZoneAPrepDownscaleDone is closed, so the rollout
	// group "ingester" stays locked for the duration of the test.
	ingesterZoneAPrepDownscaleDone := make(chan struct{})
	ingesterZoneADownscaleInitiated := atomic.Bool{}
	go func() {
		blockingClient := newFakeHttpClient(func(r *http.Request) (*http.Response, error) {
			ingesterZoneADownscaleInitiated.Store(true)
			<-ingesterZoneAPrepDownscaleDone
			return &http.Response{
				StatusCode: http.StatusOK,
				Body:       io.NopCloser(bytes.NewBuffer([]byte(""))),
			}, nil
		})

		ar := buildAdmissionRequest(rolloutGroupIngester, ingesterZoneA)
		admissionResponse := zt.prepareDownscale(context.Background(), logger, ar, api, blockingClient)
		// NOTE: require.True calls t.FailNow, which must only be called from
		// the goroutine running the test; t.Errorf is documented as safe to
		// call from other goroutines, so use it here instead.
		if !admissionResponse.Allowed {
			t.Errorf("downscale of %s should have been allowed: %v", ingesterZoneA, admissionResponse.Result.Message)
		}
	}()

	// Wait until the ingester-zone-a request has actually taken the group lock
	// before checking that other requests for the same group are rejected.
	require.Eventually(t, func() bool {
		return ingesterZoneADownscaleInitiated.Load()
	}, time.Second, time.Millisecond)

	// A concurrent downscale of ingester-zone-b (same rollout group) is denied.
	ar := buildAdmissionRequest(rolloutGroupIngester, ingesterZoneB)
	admissionResponse := zt.prepareDownscale(context.Background(), logger, ar, api, f)
	require.False(t, admissionResponse.Allowed)
	require.Equal(t, "downscale of statefulsets/ingester-zone-b in test from 5 to 2 replicas is not allowed because statefulset ingester-zone-a is already in process of updating replicas", admissionResponse.Result.Message)

	// The denied request must not have recorded any zone as downscaled.
	require.NoError(t, zt.loadZones(context.Background(), nil))
	require.Len(t, zt.zones, 0)

	// A downscale in a different rollout group (index-gateway) is unaffected.
	ar = buildAdmissionRequest(rolloutGroupIndexGateway, indexGatewayZoneA)
	admissionResponse = zt.prepareDownscale(context.Background(), logger, ar, api, f)
	require.True(t, admissionResponse.Allowed)

	// Only index-gateway-zone-a should have been recorded so far.
	require.NoError(t, zt.loadZones(context.Background(), nil))
	require.Len(t, zt.zones, 1)
	_, zoneUpdated := zt.zones[indexGatewayZoneA]
	require.True(t, zoneUpdated)

	// Releasing the in-progress ingester downscale must unlock the group and
	// let new requests for it go through.
	close(ingesterZoneAPrepDownscaleDone)

	require.Eventually(t, func() bool {
		ar = buildAdmissionRequest(rolloutGroupIngester, ingesterZoneB)
		return zt.prepareDownscale(context.Background(), logger, ar, api, f).Allowed
	}, 5*time.Second, time.Millisecond)

	// All three statefulsets should now be recorded as downscaled.
	require.NoError(t, zt.loadZones(context.Background(), nil))
	require.Len(t, zt.zones, 3)
	_, zoneUpdated = zt.zones[indexGatewayZoneA]
	require.True(t, zoneUpdated)
	_, zoneUpdated = zt.zones[ingesterZoneA]
	require.True(t, zoneUpdated)
	_, zoneUpdated = zt.zones[ingesterZoneB]
	require.True(t, zoneUpdated)
}