[FG:InPlacePodVerticalScaling] Add Pod resize complete event #130387
**pkg/kubelet/allocation/allocation_manager.go**

```diff
@@ -24,12 +24,14 @@ import (
 	"sync"
 	"time"
 
+	"encoding/json"
 	v1 "k8s.io/api/core/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/client-go/tools/record"
 	resourcehelper "k8s.io/component-helpers/resource"
 	"k8s.io/klog/v2"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -40,6 +42,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/kubelet/events"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/status"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
```
```diff
@@ -129,6 +132,8 @@ type manager struct {
 	allocationMutex        sync.Mutex
 	podsWithPendingResizes []types.UID
+
+	recorder record.EventRecorder
 }
 
 func NewManager(checkpointDirectory string,
@@ -138,6 +143,7 @@ func NewManager(checkpointDirectory string,
 	getActivePods func() []*v1.Pod,
 	getPodByUID func(types.UID) (*v1.Pod, bool),
 	sourcesReady config.SourcesReady,
+	recorder record.EventRecorder,
 ) Manager {
 	return &manager{
 		allocated: newStateImpl(checkpointDirectory, allocatedPodsStateFile),
@@ -152,9 +158,21 @@ func NewManager(checkpointDirectory string,
 		triggerPodSync: triggerPodSync,
 		getActivePods:  getActivePods,
 		getPodByUID:    getPodByUID,
+		recorder:       recorder,
 	}
 }
 
+type containerAllocation struct {
+	Name      string                  `json:"name"`
+	Resources v1.ResourceRequirements `json:"resources,omitempty"`
+}
+
+type podResourceSummary struct {
+	// TODO: resources v1.ResourceRequirements, add pod-level resources here once resizing pod-level resources is supported
+	InitContainers []containerAllocation `json:"initContainers,omitempty"`
+	Containers     []containerAllocation `json:"containers,omitempty"`
+}
+
 func newStateImpl(checkpointDirectory, checkpointName string) state.State {
 	if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
 		return state.NewNoopStateCheckpoint()
@@ -214,6 +232,33 @@ func (m *manager) Run(ctx context.Context) {
 	}()
 }
 
+// Generate the pod resize completed event message.
+func (m *manager) podResizeCompletionMsg(allocatedPod *v1.Pod) string {
+	podResizeSource := &podResourceSummary{}
+	podutil.VisitContainers(&allocatedPod.Spec, podutil.InitContainers|podutil.Containers,
+		func(allocatedContainer *v1.Container, containerType podutil.ContainerType) bool {
+			allocation := containerAllocation{
+				Name:      allocatedContainer.Name,
+				Resources: allocatedContainer.Resources,
+			}
+			switch containerType {
+			case podutil.InitContainers:
+				podResizeSource.InitContainers = append(podResizeSource.InitContainers, allocation)
+			case podutil.Containers:
+				podResizeSource.Containers = append(podResizeSource.Containers, allocation)
+			}
+			return true
+		})
+
+	podResizeMsgDetailsJSON, err := json.Marshal(podResizeSource)
+	if err != nil {
+		klog.ErrorS(err, "Failed to serialize resource summary", "pod", format.Pod(allocatedPod))
+		return "Pod resize completed"
+	}
+	podResizeCompletedMsg := fmt.Sprintf("Pod resize completed: %s", string(podResizeMsgDetailsJSON))
+	return podResizeCompletedMsg
+}
+
 func (m *manager) RetryPendingResizes() []*v1.Pod {
 	m.allocationMutex.Lock()
 	defer m.allocationMutex.Unlock()
```
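For reference, a minimal standalone sketch (not code from this PR) of the message format `podResizeCompletionMsg` produces. It copies the two summary types locally so it runs on its own; the container name `c0` and the resource values are illustrative, borrowed from the expected strings in the tests below.

```go
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// Local copies of the PR's summary types so the sketch is self-contained.
type containerAllocation struct {
	Name      string                  `json:"name"`
	Resources v1.ResourceRequirements `json:"resources,omitempty"`
}

type podResourceSummary struct {
	InitContainers []containerAllocation `json:"initContainers,omitempty"`
	Containers     []containerAllocation `json:"containers,omitempty"`
}

func main() {
	// One running container with CPU/memory requests, as in the tests.
	summary := podResourceSummary{
		Containers: []containerAllocation{{
			Name: "c0",
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse("100m"),
					v1.ResourceMemory: resource.MustParse("100"),
				},
			},
		}},
	}
	b, err := json.Marshal(summary)
	if err != nil {
		// The manager falls back to a bare "Pod resize completed" message here.
		fmt.Println("Pod resize completed")
		return
	}
	// Prints: Pod resize completed: {"containers":[{"name":"c0","resources":{"requests":{"cpu":"100m","memory":"100"}}}]}
	fmt.Printf("Pod resize completed: %s\n", string(b))
}
```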
```diff
@@ -709,6 +754,11 @@ func (m *manager) CheckPodResizeInProgress(allocatedPod *v1.Pod, podStatus *kube
 		m.statusManager.SetPodResizeInProgressCondition(allocatedPod.UID, "", "", allocatedPod.Generation)
 	} else if m.statusManager.ClearPodResizeInProgressCondition(allocatedPod.UID) {
 		// (Allocated == Actual) => clear the resize in-progress status.
+		// Generate Pod resize completed event
+		podResizeCompletedEventMsg := m.podResizeCompletionMsg(allocatedPod)
+		if m.recorder != nil {
+			m.recorder.Eventf(allocatedPod, v1.EventTypeNormal, events.ResizeCompleted, podResizeCompletedEventMsg)
+		}
 		// TODO(natasha41575): We only need to make this call if any of the resources were decreased.
 		m.RetryPendingResizes()
 	}
```

**Reviewer:** Maybe not for this PR (unless you find it quick and easy to do here), but perhaps as a follow-up, I think it would be useful for us to change [...]. Ideally, we could update this test to call [...].

**Author:** Very happy to get your quick and detailed review comments, thanks a lot. I modified the code as below. In `TestCheckPodResizeInProgress`, the logic is: [...]

**Reviewer:** Sorry, I'm not understanding this part.

**Author:** The resource update sequence is Spec -> allocated -> actuated; the actuated resources are what is actually applied to the container. That's why I say the actuated resources reflect the actual resized resources.
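To make the reply above concrete, here is a minimal sketch (illustrative values and variable names only, not the kubelet's actual bookkeeping) of the Spec -> allocated -> actuated sequence: a resize stays "in progress" while allocated and actuated differ, and the ResizeCompleted event fires once they converge.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Spec: the desired resources the user wrote to the pod spec.
	spec := v1.ResourceRequirements{
		Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m")},
	}

	// Allocated: what the kubelet has admitted and checkpointed; it follows
	// the spec once the resize is accepted.
	allocated := spec

	// Actuated: what has actually been applied to the running container; it
	// lags behind allocated until the runtime finishes the resize.
	actuated := v1.ResourceRequirements{
		Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
	}

	fmt.Println(!apiequality.Semantic.DeepEqual(allocated, actuated)) // true: resize in progress

	actuated = allocated // the runtime finishes applying the new resources
	fmt.Println(!apiequality.Semantic.DeepEqual(allocated, actuated)) // false: emit ResizeCompleted
}
```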
**pkg/kubelet/allocation/allocation_manager_test.go**

```diff
@@ -31,6 +31,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/tools/record"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
```
```diff
@@ -185,7 +186,16 @@ func TestUpdatePodFromAllocation(t *testing.T) {
 	}
 }
 
-func TestIsPodResizeInProgress(t *testing.T) {
+func getEventsFromFakeRecorder(t *testing.T, am Manager) string {
+	select {
+	case e := <-am.(*manager).recorder.(*record.FakeRecorder).Events:
+		return e
+	default:
+		return ""
+	}
+}
+
+func TestCheckPodResizeInProgress(t *testing.T) {
 	type testResources struct {
 		cpuReq, cpuLim, memReq, memLim int64
 	}
```
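For context on the helper above, a standalone sketch of how `record.FakeRecorder` behaves, using only its public API (the `ResizeCompleted` reason string is the one this PR emits): `Eventf` renders each event as a `"<type> <reason> <message>"` string onto the buffered `Events` channel, which the helper drains with a non-blocking `select`.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

func main() {
	rec := record.NewFakeRecorder(10) // buffers up to 10 formatted event strings

	// Instead of sending an Event object to the API server, FakeRecorder
	// writes the formatted string onto its Events channel.
	rec.Eventf(&v1.Pod{}, v1.EventTypeNormal, "ResizeCompleted",
		"Pod resize completed: %s", `{"containers":[{"name":"c0"}]}`)

	select {
	case e := <-rec.Events:
		fmt.Println(e) // Normal ResizeCompleted Pod resize completed: {"containers":[{"name":"c0"}]}
	default:
		fmt.Println("no event recorded")
	}
}
```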
```diff
@@ -198,72 +208,83 @@ func TestIsPodResizeInProgress(t *testing.T) {
 	}
 
 	tests := []struct {
-		name            string
-		containers      []testContainer
-		expectHasResize bool
+		name                            string
+		containers                      []testContainer
+		oldPodResizeInProgressCondition bool
+		expectHasResize                 bool
+		expectPodResizeCompletedMsg     string
 	}{{
 		name: "simple running container",
 		containers: []testContainer{{
 			allocated: testResources{100, 100, 100, 100},
 			actuated:  &testResources{100, 100, 100, 100},
 			isRunning: true,
 		}},
-		expectHasResize: false,
+		oldPodResizeInProgressCondition: true,
+		expectHasResize:                 false,
+		expectPodResizeCompletedMsg:     `Normal ResizeCompleted Pod resize completed: {"containers":[{"name":"c0","resources":{"limits":{"cpu":"100m","memory":"100"},"requests":{"cpu":"100m","memory":"100"}}}]}`,
 	}, {
 		name: "simple unstarted container",
 		containers: []testContainer{{
 			allocated: testResources{100, 100, 100, 100},
 			unstarted: true,
 		}},
-		expectHasResize: false,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 false,
 	}, {
 		name: "simple resized container/cpu req",
 		containers: []testContainer{{
 			allocated: testResources{100, 200, 100, 200},
 			actuated:  &testResources{150, 200, 100, 200},
 			isRunning: true,
 		}},
-		expectHasResize: true,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 true,
 	}, {
 		name: "simple resized container/cpu limit",
 		containers: []testContainer{{
 			allocated: testResources{100, 200, 100, 200},
 			actuated:  &testResources{100, 300, 100, 200},
 			isRunning: true,
 		}},
-		expectHasResize: true,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 true,
 	}, {
 		name: "simple resized container/mem req",
 		containers: []testContainer{{
 			allocated: testResources{100, 200, 100, 200},
 			actuated:  &testResources{100, 200, 150, 200},
 			isRunning: true,
 		}},
-		expectHasResize: true,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 true,
 	}, {
 		name: "simple resized container/cpu+mem req",
 		containers: []testContainer{{
 			allocated: testResources{100, 200, 100, 200},
 			actuated:  &testResources{150, 200, 150, 200},
 			isRunning: true,
 		}},
-		expectHasResize: true,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 true,
 	}, {
 		name: "simple resized container/mem limit",
 		containers: []testContainer{{
 			allocated: testResources{100, 200, 100, 200},
 			actuated:  &testResources{100, 200, 100, 300},
 			isRunning: true,
 		}},
-		expectHasResize: true,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 true,
 	}, {
 		name: "terminated resized container",
 		containers: []testContainer{{
 			allocated: testResources{100, 200, 100, 200},
 			actuated:  &testResources{200, 200, 100, 200},
 			isRunning: false,
 		}},
-		expectHasResize: false,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 false,
 	}, {
 		name: "non-sidecar init container",
 		containers: []testContainer{{
@@ -275,7 +296,8 @@ func TestIsPodResizeInProgress(t *testing.T) {
 			actuated:  &testResources{100, 200, 100, 200},
 			isRunning: true,
 		}},
-		expectHasResize: false,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 false,
 	}, {
 		name: "non-resized sidecar",
 		containers: []testContainer{{
@@ -288,7 +310,8 @@ func TestIsPodResizeInProgress(t *testing.T) {
 			actuated:  &testResources{100, 200, 100, 200},
 			isRunning: true,
 		}},
-		expectHasResize: false,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 false,
 	}, {
 		name: "resized sidecar",
 		containers: []testContainer{{
@@ -301,7 +324,8 @@ func TestIsPodResizeInProgress(t *testing.T) {
 			actuated:  &testResources{100, 200, 100, 200},
 			isRunning: true,
 		}},
-		expectHasResize: true,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 true,
 	}, {
 		name: "several containers and a resize",
 		containers: []testContainer{{
@@ -320,31 +344,55 @@ func TestIsPodResizeInProgress(t *testing.T) {
 			actuated:  &testResources{200, 200, 100, 200}, // Resized
 			isRunning: true,
 		}},
-		expectHasResize: true,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 true,
 	}, {
 		name: "several containers",
```
**Reviewer:** For this case, omit some resources in each container, e.g. [...]. It would also be good to add a test with init containers.

**Reviewer:** Alternatively, consider adding a separate test for [...].

**Author:** In order to simplify the modifications, I just changed the "several containers" case to cover these three cases, with both init and normal containers. I think it covers the test case for `emitPodResizeCompletionEvent`. If there are other requests [...]
```diff
 		containers: []testContainer{{
 			allocated: testResources{cpuReq: 100, cpuLim: 200},
 			actuated:  &testResources{cpuReq: 100, cpuLim: 200},
 			sidecar:   true,
 			isRunning: true,
 		}, {
 			allocated: testResources{memReq: 100, memLim: 200},
 			actuated:  &testResources{memReq: 100, memLim: 200},
 			isRunning: true,
 		}, {
 			allocated: testResources{cpuReq: 200, memReq: 100},
 			actuated:  &testResources{cpuReq: 200, memReq: 100},
 			isRunning: true,
 		}},
+		oldPodResizeInProgressCondition: true,
+		expectHasResize:                 false,
+		expectPodResizeCompletedMsg:     `Normal ResizeCompleted Pod resize completed: {"initContainers":[{"name":"c0","resources":{"limits":{"cpu":"200m"},"requests":{"cpu":"100m"}}}],"containers":[{"name":"c1","resources":{"limits":{"memory":"200"},"requests":{"memory":"100"}}},{"name":"c2","resources":{"requests":{"cpu":"200m","memory":"100"}}}]}`,
 	}, {
 		name: "best-effort pod",
 		containers: []testContainer{{
 			allocated: testResources{},
 			actuated:  &testResources{},
 			isRunning: true,
 		}},
-		expectHasResize: false,
+		oldPodResizeInProgressCondition: true,
+		expectHasResize:                 false,
+		expectPodResizeCompletedMsg:     `Normal ResizeCompleted Pod resize completed: {"containers":[{"name":"c0","resources":{}}]}`,
 	}, {
 		name: "burstable pod/not resizing",
 		containers: []testContainer{{
 			allocated: testResources{cpuReq: 100},
 			actuated:  &testResources{cpuReq: 100},
 			isRunning: true,
 		}},
-		expectHasResize: false,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 false,
 	}, {
 		name: "burstable pod/resized",
 		containers: []testContainer{{
 			allocated: testResources{cpuReq: 100},
 			actuated:  &testResources{cpuReq: 500},
 			isRunning: true,
 		}},
-		expectHasResize: true,
+		oldPodResizeInProgressCondition: false,
+		expectHasResize:                 true,
 	}}
 
 	mkRequirements := func(r testResources) v1.ResourceRequirements {
```
```diff
@@ -431,8 +479,32 @@ func TestIsPodResizeInProgress(t *testing.T) {
 			}
 			require.NoError(t, am.SetAllocatedResources(pod))
 
-			hasResizedResources := am.(*manager).isPodResizeInProgress(pod, podStatus)
-			require.Equal(t, test.expectHasResize, hasResizedResources, "hasResizedResources")
+			am.(*manager).recorder = record.NewFakeRecorder(200)
+
+			// Set the old pod condition to InProgress, so that ClearPodResizeInProgressCondition returns true and the resize completed event is emitted.
+			if test.oldPodResizeInProgressCondition {
+				am.(*manager).statusManager.SetPodResizeInProgressCondition(pod.UID, "", "", int64(1))
+			}
+
+			am.CheckPodResizeInProgress(pod, podStatus)
+
+			// Verify the pod resize completed event is emitted.
+			podResizeCompletionEvent := getEventsFromFakeRecorder(t, am)
+			assert.Equal(t, test.expectPodResizeCompletedMsg, podResizeCompletionEvent)
+
+			if test.expectHasResize {
+				// Verify the status manager has the InProgress condition set on it.
+				gotResizeConditions := am.(*manager).statusManager.GetPodResizeConditions(pod.UID)
+				for _, c := range gotResizeConditions {
+					require.Equal(t, v1.PodResizeInProgress, c.Type, "ResizeConditions Type should be PodResizeInProgress")
+					require.Empty(t, c.Reason, "ResizeConditions Error")
+					require.Empty(t, c.Message, "ResizeConditions Message")
+				}
+			} else {
+				// Verify the pod resize InProgress condition is cleared.
+				gotResizeConditions := am.(*manager).statusManager.GetPodResizeConditions(pod.UID)
+				require.Empty(t, gotResizeConditions, "ResizeConditions Error")
+			}
 		})
 	}
 }
```