Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions pkg/kubelet/allocation/allocation_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"sync"
"time"

"encoding/json"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
resourcehelper "k8s.io/component-helpers/resource"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
Expand All @@ -40,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
Expand Down Expand Up @@ -129,6 +132,8 @@ type manager struct {

allocationMutex sync.Mutex
podsWithPendingResizes []types.UID

recorder record.EventRecorder
}

func NewManager(checkpointDirectory string,
Expand All @@ -138,6 +143,7 @@ func NewManager(checkpointDirectory string,
getActivePods func() []*v1.Pod,
getPodByUID func(types.UID) (*v1.Pod, bool),
sourcesReady config.SourcesReady,
recorder record.EventRecorder,
) Manager {
return &manager{
allocated: newStateImpl(checkpointDirectory, allocatedPodsStateFile),
Expand All @@ -152,9 +158,21 @@ func NewManager(checkpointDirectory string,
triggerPodSync: triggerPodSync,
getActivePods: getActivePods,
getPodByUID: getPodByUID,
recorder: recorder,
}
}

// containerAllocation records a single container's name and its allocated
// resource requirements, for serialization into the resize-completed event
// message. The json tags define the event's wire format — do not rename.
type containerAllocation struct {
	Name      string                  `json:"name"`
	Resources v1.ResourceRequirements `json:"resources,omitempty"`
}

// podResourceSummary aggregates the allocated resources of a pod's containers,
// grouped by container type, for serialization into the resize-completed
// event message.
type podResourceSummary struct {
	//TODO: resources v1.ResourceRequirements, add pod-level resources here once resizing pod-level resources is supported
	InitContainers []containerAllocation `json:"initContainers,omitempty"`
	Containers     []containerAllocation `json:"containers,omitempty"`
}

func newStateImpl(checkpointDirectory, checkpointName string) state.State {
if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
return state.NewNoopStateCheckpoint()
Expand Down Expand Up @@ -214,6 +232,33 @@ func (m *manager) Run(ctx context.Context) {
}()
}

// podResizeCompletionMsg generates the pod-resize-completed event message:
// a fixed prefix followed by a JSON summary of the allocated resources of
// every init container and regular container in the pod.
func (m *manager) podResizeCompletionMsg(allocatedPod *v1.Pod) string {
	podResizeSource := &podResourceSummary{}
	// Visit both init containers and regular containers, recording each
	// container's allocated resources under the matching summary list.
	podutil.VisitContainers(&allocatedPod.Spec, podutil.InitContainers|podutil.Containers,
		func(allocatedContainer *v1.Container, containerType podutil.ContainerType) bool {
			allocation := containerAllocation{
				Name:      allocatedContainer.Name,
				Resources: allocatedContainer.Resources,
			}
			switch containerType {
			case podutil.InitContainers:
				podResizeSource.InitContainers = append(podResizeSource.InitContainers, allocation)
			case podutil.Containers:
				podResizeSource.Containers = append(podResizeSource.Containers, allocation)
			}
			// Returning true continues visiting the remaining containers.
			return true
		})

	podResizeMsgDetailsJSON, err := json.Marshal(podResizeSource)
	if err != nil {
		// A serialization failure should not suppress the event; log it and
		// fall back to the message without the per-container details.
		klog.ErrorS(err, "Failed to serialize resource summary", "pod", format.Pod(allocatedPod))
		return "Pod resize completed"
	}
	podResizeCompletedMsg := fmt.Sprintf("Pod resize completed: %s", string(podResizeMsgDetailsJSON))
	return podResizeCompletedMsg
}

func (m *manager) RetryPendingResizes() []*v1.Pod {
m.allocationMutex.Lock()
defer m.allocationMutex.Unlock()
Expand Down Expand Up @@ -709,6 +754,11 @@ func (m *manager) CheckPodResizeInProgress(allocatedPod *v1.Pod, podStatus *kube
m.statusManager.SetPodResizeInProgressCondition(allocatedPod.UID, "", "", allocatedPod.Generation)
} else if m.statusManager.ClearPodResizeInProgressCondition(allocatedPod.UID) {
// (Allocated == Actual) => clear the resize in-progress status.
Copy link
Contributor

@natasha41575 natasha41575 Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not for this PR (unless you find it quick and easy to do this here), but perhaps as a follow-up, I think it would be useful for us to change TestIsPodResizeInProgress to TestCheckPodResizeInProgress

Ideally, we could update this test to call CheckPodResizeInProgress and verify:

  • For cases where isPodResizeInProgress is true, the InProgress condition gets set appropriately
  • For cases where isPodResizeInProgress is false, the InProgress condition is cleared and the pod resize complete event is emitted

Copy link
Contributor Author

@shiya0705 shiya0705 Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@natasha41575

Very happy to get your quick and detailed review comments. Thanks a lot, I modify code as below:

  1. Changed PodResourceSummary, PodResizeSource, PodResizeCompletedMsg,ContainerAllocation to lower case
  2. Remove PodResizeCompleteEventGen from interface and change its name to emitPodResizeCompletionEvent
  3. Removed non-resizable containers check, I agree with you, my original idea was to reduce the length of prints in a event, but some container's not printing might confuse the user, so I removed it.
  4. For unit test, I moved isPodResizeInProgress and emitPodResizeCompletionEvent to TestCheckPodResizeInProgress and call them separately. podResizeCompletedMsg is an internal variable of a function, call CheckPodResizeInProgress directly is hard to check the result.

In TestCheckPodResizeInProgress, the logic is:

  • For cases where isPodResizeInProgress is true, the InProgress condition gets set appropriately
    (1) If old condition is InProgress , use old pod condition
    (2) If old condition is non-InProgress , set pod condition type as InProgress, set condition Error and message as "".
  • For cases where isPodResizeInProgress is false and conditionCleared is cleared, InProgress condition is cleared and the pod resize complete event is emitted correctly
    If you don't think this modification works as expected, I can simplify the function and test only the isPodResizeInProgress and emitPodResizeCompletionEvent functions.
  1. "I think you can just use allocatedContainer.Resources directly (though please double check me on this)"
  • About this my consideration is, the actuatedResources is updated after the allocatedContainer, and is more reflective of the actual resized resources.
    -But it seems use allocatedContainer and actuatedResources may not significantly influence.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About this my consideration is, the actuatedResources is updated after the allocatedContainer, and is more reflective of the actual resized resources.

Sorry, I'm not understanding this part. actuatedResources should == allocatedContainer.Resources; otherwise the resize should still be in progress and is not complete? Are they different for some reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource update sequence is Spec -> allocated -> actuated; the actuated resource is what is actually applied to the container, which is why I say the actuated resource reflects the actual resized resources.
But in this case actuatedResources should == allocatedContainer.Resources, they are same.
In the new code, I get resource from allocated container resource.

// Generate Pod resize completed event
podResizeCompletedEventMsg := m.podResizeCompletionMsg(allocatedPod)
if m.recorder != nil {
m.recorder.Eventf(allocatedPod, v1.EventTypeNormal, events.ResizeCompleted, podResizeCompletedEventMsg)
}
// TODO(natasha41575): We only need to make this call if any of the resources were decreased.
m.RetryPendingResizes()
}
Expand Down
114 changes: 93 additions & 21 deletions pkg/kubelet/allocation/allocation_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/allocation/state"
Expand Down Expand Up @@ -185,7 +186,16 @@ func TestUpdatePodFromAllocation(t *testing.T) {
}
}

func TestIsPodResizeInProgress(t *testing.T) {
// getEventsFromFakeRecorder drains at most one event from the manager's
// FakeRecorder without blocking, returning "" when no event is queued.
// The *testing.T parameter is currently unused but kept for helper symmetry.
func getEventsFromFakeRecorder(t *testing.T, am Manager) string {
	eventCh := am.(*manager).recorder.(*record.FakeRecorder).Events
	select {
	case event := <-eventCh:
		return event
	default:
	}
	return ""
}

func TestCheckPodResizeInProgress(t *testing.T) {
type testResources struct {
cpuReq, cpuLim, memReq, memLim int64
}
Expand All @@ -198,72 +208,83 @@ func TestIsPodResizeInProgress(t *testing.T) {
}

tests := []struct {
name string
containers []testContainer
expectHasResize bool
name string
containers []testContainer
oldPodResizeInProgressCondition bool
expectHasResize bool
expectPodResizeCompletedMsg string
}{{
name: "simple running container",
containers: []testContainer{{
allocated: testResources{100, 100, 100, 100},
actuated: &testResources{100, 100, 100, 100},
isRunning: true,
}},
expectHasResize: false,
oldPodResizeInProgressCondition: true,
expectHasResize: false,
expectPodResizeCompletedMsg: `Normal ResizeCompleted Pod resize completed: {"containers":[{"name":"c0","resources":{"limits":{"cpu":"100m","memory":"100"},"requests":{"cpu":"100m","memory":"100"}}}]}`,
}, {
name: "simple unstarted container",
containers: []testContainer{{
allocated: testResources{100, 100, 100, 100},
unstarted: true,
}},
expectHasResize: false,
oldPodResizeInProgressCondition: false,
expectHasResize: false,
}, {
name: "simple resized container/cpu req",
containers: []testContainer{{
allocated: testResources{100, 200, 100, 200},
actuated: &testResources{150, 200, 100, 200},
isRunning: true,
}},
expectHasResize: true,
oldPodResizeInProgressCondition: false,
expectHasResize: true,
}, {
name: "simple resized container/cpu limit",
containers: []testContainer{{
allocated: testResources{100, 200, 100, 200},
actuated: &testResources{100, 300, 100, 200},
isRunning: true,
}},
expectHasResize: true,
oldPodResizeInProgressCondition: false,
expectHasResize: true,
}, {
name: "simple resized container/mem req",
containers: []testContainer{{
allocated: testResources{100, 200, 100, 200},
actuated: &testResources{100, 200, 150, 200},
isRunning: true,
}},
expectHasResize: true,
oldPodResizeInProgressCondition: false,
expectHasResize: true,
}, {
name: "simple resized container/cpu+mem req",
containers: []testContainer{{
allocated: testResources{100, 200, 100, 200},
actuated: &testResources{150, 200, 150, 200},
isRunning: true,
}},
expectHasResize: true,
oldPodResizeInProgressCondition: false,
expectHasResize: true,
}, {
name: "simple resized container/mem limit",
containers: []testContainer{{
allocated: testResources{100, 200, 100, 200},
actuated: &testResources{100, 200, 100, 300},
isRunning: true,
}},
expectHasResize: true,
oldPodResizeInProgressCondition: false,
expectHasResize: true,
}, {
name: "terminated resized container",
containers: []testContainer{{
allocated: testResources{100, 200, 100, 200},
actuated: &testResources{200, 200, 100, 200},
isRunning: false,
}},
expectHasResize: false,
oldPodResizeInProgressCondition: false,
expectHasResize: false,
}, {
name: "non-sidecar init container",
containers: []testContainer{{
Expand All @@ -275,7 +296,8 @@ func TestIsPodResizeInProgress(t *testing.T) {
actuated: &testResources{100, 200, 100, 200},
isRunning: true,
}},
expectHasResize: false,
oldPodResizeInProgressCondition: false,
expectHasResize: false,
}, {
name: "non-resized sidecar",
containers: []testContainer{{
Expand All @@ -288,7 +310,8 @@ func TestIsPodResizeInProgress(t *testing.T) {
actuated: &testResources{100, 200, 100, 200},
isRunning: true,
}},
expectHasResize: false,
oldPodResizeInProgressCondition: false,
expectHasResize: false,
}, {
name: "resized sidecar",
containers: []testContainer{{
Expand All @@ -301,7 +324,8 @@ func TestIsPodResizeInProgress(t *testing.T) {
actuated: &testResources{100, 200, 100, 200},
isRunning: true,
}},
expectHasResize: true,
oldPodResizeInProgressCondition: false,
expectHasResize: true,
}, {
name: "several containers and a resize",
containers: []testContainer{{
Expand All @@ -320,31 +344,55 @@ func TestIsPodResizeInProgress(t *testing.T) {
actuated: &testResources{200, 200, 100, 200}, // Resized
isRunning: true,
}},
expectHasResize: true,
oldPodResizeInProgressCondition: false,
expectHasResize: true,
}, {
name: "several containers",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for this case, omit some resources in each container, e.g.

  • one just has CPU requests & limits
  • one just has memory requests & limits
  • one just has CPU & memory requests

it would also be good to add a test with init containers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, consider adding a separate test for emitPodResizeCompletionEvent that has the more complete coverage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to simplify modifications, I just changed "several container" case to cover these three cases, both have init and normal container. I think it could cover the test case for emitPodResizeCompletionEvent. If there is other request
, we can add a separate func to test it.
- one just has CPU requests & limits (sidecar init container)
- one just has memory requests & limits (container)
- one just has CPU & memory requests (container)

containers: []testContainer{{
allocated: testResources{cpuReq: 100, cpuLim: 200},
actuated: &testResources{cpuReq: 100, cpuLim: 200},
sidecar: true,
isRunning: true,
}, {
allocated: testResources{memReq: 100, memLim: 200},
actuated: &testResources{memReq: 100, memLim: 200},
isRunning: true,
}, {
allocated: testResources{cpuReq: 200, memReq: 100},
actuated: &testResources{cpuReq: 200, memReq: 100},
isRunning: true,
}},
oldPodResizeInProgressCondition: true,
expectHasResize: false,
expectPodResizeCompletedMsg: `Normal ResizeCompleted Pod resize completed: {"initContainers":[{"name":"c0","resources":{"limits":{"cpu":"200m"},"requests":{"cpu":"100m"}}}],"containers":[{"name":"c1","resources":{"limits":{"memory":"200"},"requests":{"memory":"100"}}},{"name":"c2","resources":{"requests":{"cpu":"200m","memory":"100"}}}]}`,
}, {
name: "best-effort pod",
containers: []testContainer{{
allocated: testResources{},
actuated: &testResources{},
isRunning: true,
}},
expectHasResize: false,
oldPodResizeInProgressCondition: true,
expectHasResize: false,
expectPodResizeCompletedMsg: `Normal ResizeCompleted Pod resize completed: {"containers":[{"name":"c0","resources":{}}]}`,
}, {
name: "burstable pod/not resizing",
containers: []testContainer{{
allocated: testResources{cpuReq: 100},
actuated: &testResources{cpuReq: 100},
isRunning: true,
}},
expectHasResize: false,
oldPodResizeInProgressCondition: false,
expectHasResize: false,
}, {
name: "burstable pod/resized",
containers: []testContainer{{
allocated: testResources{cpuReq: 100},
actuated: &testResources{cpuReq: 500},
isRunning: true,
}},
expectHasResize: true,
oldPodResizeInProgressCondition: false,
expectHasResize: true,
}}

mkRequirements := func(r testResources) v1.ResourceRequirements {
Expand Down Expand Up @@ -431,8 +479,32 @@ func TestIsPodResizeInProgress(t *testing.T) {
}
require.NoError(t, am.SetAllocatedResources(pod))

hasResizedResources := am.(*manager).isPodResizeInProgress(pod, podStatus)
require.Equal(t, test.expectHasResize, hasResizedResources, "hasResizedResources")
am.(*manager).recorder = record.NewFakeRecorder(200)

// Set old Pod condition as Inprogress, so that ClearPodResizeInProgressCondition is true and emit resize completed event
if test.oldPodResizeInProgressCondition {
am.(*manager).statusManager.SetPodResizeInProgressCondition(pod.UID, "", "", int64(1))
}

am.CheckPodResizeInProgress(pod, podStatus)

// Verify pod resize completed event is emitted
podResizeCompletionEvent := getEventsFromFakeRecorder(t, am)
assert.Equal(t, test.expectPodResizeCompletedMsg, podResizeCompletionEvent)

if test.expectHasResize {
// Verify the status manager has the InProgress condition set on it
gotResizeConditions := am.(*manager).statusManager.GetPodResizeConditions(pod.UID)
for _, c := range gotResizeConditions {
require.Equal(t, v1.PodResizeInProgress, c.Type, "ResizeConditions Type should be PodResizeInProgress")
require.Empty(t, c.Reason, "ResizeConditions Error")
require.Empty(t, c.Message, "ResizeConditions Message")
}
} else {
// Verify pod resize Inprogress condition is cleared
gotResizeConditions := am.(*manager).statusManager.GetPodResizeConditions(pod.UID)
require.Empty(t, gotResizeConditions, "ResizeConditions Error")
}
})
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
NetworkNotReady = "NetworkNotReady"
ResizeDeferred = "ResizeDeferred"
ResizeInfeasible = "ResizeInfeasible"
ResizeCompleted = "ResizeCompleted"
)

// Image event reason list
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.GetActivePods,
klet.podManager.GetPodByUID,
klet.sourcesReady,
kubeDeps.Recorder,
)

klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
Expand Down