[FG:InPlacePodVerticalScaling] Move resize allocation logic out of the sync loop #131612
Changes from all commits: cb2baef, 87fe249, e6d165b, 35bf152, b46a124, c925243, b89ab30, 381b3f3
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,10 +17,12 @@ limitations under the License. | |
| package allocation | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "path/filepath" | ||
| "slices" | ||
| "sync" | ||
| "time" | ||
|
|
||
| v1 "k8s.io/api/core/v1" | ||
| apiequality "k8s.io/apimachinery/pkg/api/equality" | ||
|
|
@@ -34,6 +36,7 @@ import ( | |
| "k8s.io/kubernetes/pkg/features" | ||
| "k8s.io/kubernetes/pkg/kubelet/allocation/state" | ||
| "k8s.io/kubernetes/pkg/kubelet/cm" | ||
| "k8s.io/kubernetes/pkg/kubelet/config" | ||
| kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" | ||
| "k8s.io/kubernetes/pkg/kubelet/lifecycle" | ||
| "k8s.io/kubernetes/pkg/kubelet/status" | ||
|
|
@@ -45,6 +48,9 @@ import ( | |
| const ( | ||
| allocatedPodsStateFile = "allocated_pods_state" | ||
| actuatedPodsStateFile = "actuated_pods_state" | ||
|
|
||
| initialRetryDelay = 30 * time.Second | ||
| retryDelay = 3 * time.Minute | ||
| ) | ||
|
|
||
| // AllocationManager tracks pod resource allocations. | ||
|
|
@@ -71,6 +77,10 @@ type Manager interface { | |
| // TODO: See if we can remove this and just add them in the allocation manager constructor. | ||
| AddPodAdmitHandlers(handlers lifecycle.PodAdmitHandlers) | ||
|
|
||
| // SetContainerRuntime sets the allocation manager's container runtime. | ||
| // TODO: See if we can remove this and just add it in the allocation manager constructor. | ||
| SetContainerRuntime(runtime kubecontainer.Runtime) | ||
|
|
||
| // AddPod checks if a pod can be admitted. If so, it admits the pod and updates the allocation. | ||
| // The function returns a boolean value indicating whether the pod | ||
| // can be admitted, a brief single-word reason and a message explaining why | ||
|
|
@@ -85,10 +95,19 @@ type Manager interface { | |
| // RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods. | ||
| RemoveOrphanedPods(remainingPods sets.Set[types.UID]) | ||
|
|
||
| // HandlePodResourcesResize returns the "allocated pod", which should be used for all resource | ||
| // calculations after this function is called. It also updates the cached ResizeStatus according to | ||
| // the allocation decision and pod status. | ||
| HandlePodResourcesResize(runtime kubecontainer.Runtime, allocatedPods []*v1.Pod, pod *v1.Pod, podStatus *kubecontainer.PodStatus) (*v1.Pod, error) | ||
| // Run starts the allocation manager. This is currently only used to handle periodic retry of | ||
| // pending resizes. | ||
| Run(ctx context.Context) | ||
|
|
||
| // PushPendingResize queues a pod with a pending resize request for later reevaluation. | ||
| PushPendingResize(uid types.UID) | ||
|
|
||
| // RetryPendingResizes retries all pending resizes. It returns a list of successful resizes. | ||
| RetryPendingResizes() []*v1.Pod | ||
|
|
||
| // CheckPodResizeInProgress checks whether the actuated resizable resources differ from the allocated resources | ||
| // for any running containers. | ||
| CheckPodResizeInProgress(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus) | ||
| } | ||
|
|
||
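The interface changes above replace the synchronous HandlePodResourcesResize call with an asynchronous queue: PushPendingResize records the request, Run retries it periodically, and RetryPendingResizes can be called whenever capacity may have been freed. The sketch below shows how a caller might drive that split; the trimmed-down interface and stand-in UID type are illustrative only, not the kubelet's actual wiring.

```go
package allocsketch

import (
	"context"
	"fmt"
)

// UID is a stand-in for types.UID.
type UID string

// pendingResizer is a hypothetical, trimmed-down view of the queue-related methods added
// to the Manager interface above.
type pendingResizer interface {
	Run(ctx context.Context)    // start the periodic retry loop
	PushPendingResize(uid UID)  // record a resize request for later evaluation
	RetryPendingResizes() []UID // evaluate all pending resizes now; returns the ones that fit
}

// onPodSpecChanged is what a sync-loop caller might do: rather than allocating the resize
// inline, it only queues the pod for the allocation manager to evaluate.
func onPodSpecChanged(m pendingResizer, uid UID) {
	m.PushPendingResize(uid)
}

// onPodTerminated illustrates an event that frees capacity, which is a natural point to
// retry deferred resizes immediately instead of waiting for the next tick.
func onPodTerminated(m pendingResizer) {
	for _, uid := range m.RetryPendingResizes() {
		fmt.Println("resize now allocated for pod", uid)
	}
}
```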
| type manager struct { | ||
|
|
@@ -97,19 +116,40 @@ type manager struct { | |
|
|
||
| admitHandlers lifecycle.PodAdmitHandlers | ||
| containerManager cm.ContainerManager | ||
| containerRuntime kubecontainer.Runtime | ||
| statusManager status.Manager | ||
| sourcesReady config.SourcesReady | ||
|
|
||
| ticker *time.Ticker | ||
| triggerPodSync func(pod *v1.Pod) | ||
| getActivePods func() []*v1.Pod | ||
| getPodByUID func(types.UID) (*v1.Pod, bool) | ||
|
|
||
| allocationMutex sync.Mutex | ||
| allocationMutex sync.Mutex | ||
| podsWithPendingResizes []types.UID | ||
| } | ||
|
|
||
| func NewManager(checkpointDirectory string, containerManager cm.ContainerManager, statusManager status.Manager) Manager { | ||
| func NewManager(checkpointDirectory string, | ||
| containerManager cm.ContainerManager, | ||
| statusManager status.Manager, | ||
| triggerPodSync func(pod *v1.Pod), | ||
| getActivePods func() []*v1.Pod, | ||
| getPodByUID func(types.UID) (*v1.Pod, bool), | ||
| sourcesReady config.SourcesReady, | ||
| ) Manager { | ||
| return &manager{ | ||
| allocated: newStateImpl(checkpointDirectory, allocatedPodsStateFile), | ||
| actuated: newStateImpl(checkpointDirectory, actuatedPodsStateFile), | ||
|
|
||
| containerManager: containerManager, | ||
| statusManager: statusManager, | ||
| admitHandlers: lifecycle.PodAdmitHandlers{}, | ||
| sourcesReady: sourcesReady, | ||
|
|
||
| ticker: time.NewTicker(initialRetryDelay), | ||
| triggerPodSync: triggerPodSync, | ||
| getActivePods: getActivePods, | ||
| getPodByUID: getPodByUID, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -131,17 +171,117 @@ func newStateImpl(checkpointDirectory, checkpointName string) state.State { | |
|
|
||
| // NewInMemoryManager returns an allocation manager that doesn't persist state. | ||
| // For testing purposes only! | ||
| func NewInMemoryManager(containerManager cm.ContainerManager, statusManager status.Manager) Manager { | ||
| func NewInMemoryManager(containerManager cm.ContainerManager, | ||
| statusManager status.Manager, | ||
| triggerPodSync func(pod *v1.Pod), | ||
| getActivePods func() []*v1.Pod, | ||
| getPodByUID func(types.UID) (*v1.Pod, bool), | ||
| sourcesReady config.SourcesReady, | ||
| ) Manager { | ||
| return &manager{ | ||
| allocated: state.NewStateMemory(nil), | ||
| actuated: state.NewStateMemory(nil), | ||
|
|
||
| containerManager: containerManager, | ||
| statusManager: statusManager, | ||
| admitHandlers: lifecycle.PodAdmitHandlers{}, | ||
| sourcesReady: sourcesReady, | ||
|
|
||
| ticker: time.NewTicker(initialRetryDelay), | ||
| triggerPodSync: triggerPodSync, | ||
| getActivePods: getActivePods, | ||
| getPodByUID: getPodByUID, | ||
| } | ||
| } | ||
|
|
||
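Both constructors now take closures (triggerPodSync, getActivePods, getPodByUID) and a SourcesReady gate instead of holding a reference back to the kubelet, which is what lets the retry loop live inside the allocation manager. Below is a minimal sketch of that dependency-injection shape; the Pod type and field names are simplified stand-ins rather than the real kubelet types, and a real caller would typically pass method values so no back-reference is needed.

```go
package allocsketch

// Pod is a simplified stand-in for *v1.Pod.
type Pod struct {
	UID  string
	Name string
}

// manager mirrors the shape of the new constructor arguments: the owner hands in only the
// callbacks the allocation manager needs.
type manager struct {
	triggerPodSync func(pod *Pod)                // ask the owner to re-sync a single pod
	getActivePods  func() []*Pod                 // snapshot of currently active pods
	getPodByUID    func(uid string) (*Pod, bool) // look up a pod by UID
	sourcesReady   func() bool                   // stand-in for config.SourcesReady.AllReady
}

func newManager(
	triggerPodSync func(pod *Pod),
	getActivePods func() []*Pod,
	getPodByUID func(uid string) (*Pod, bool),
	sourcesReady func() bool,
) *manager {
	return &manager{
		triggerPodSync: triggerPodSync,
		getActivePods:  getActivePods,
		getPodByUID:    getPodByUID,
		sourcesReady:   sourcesReady,
	}
}
```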
| func (m *manager) Run(ctx context.Context) { | ||
| // Start a goroutine to periodically check for pending resizes and process them if needed. | ||
| go func() { | ||
| for { | ||
| select { | ||
| case <-m.ticker.C: | ||
| successfulResizes := m.RetryPendingResizes() | ||
| for _, po := range successfulResizes { | ||
| klog.InfoS("Successfully retried resize after timeout", "pod", klog.KObj(po)) | ||
| } | ||
| case <-ctx.Done(): | ||
| m.ticker.Stop() | ||
| return | ||
| } | ||
| } | ||
| }() | ||
| } | ||
|
|
||
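Run starts a single goroutine driven by a time.Ticker: the first retry pass fires after initialRetryDelay (30s), and once the pod sources are ready subsequent passes fire every retryDelay (3 minutes); RetryPendingResizes resets the ticker, so an explicit retry postpones the next periodic one. A minimal standalone sketch of the same ticker/cancellation pattern, with delays shortened for the demo:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// retryLoop mirrors the pattern in Run above: a ticker drives periodic work, and the
// context cancels the goroutine and stops the ticker.
func retryLoop(ctx context.Context, retry func() int) {
	ticker := time.NewTicker(100 * time.Millisecond) // initial delay
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				ticker.Reset(300 * time.Millisecond) // steady-state delay
				fmt.Println("retried, successes:", retry())
			case <-ctx.Done():
				return
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	retryLoop(ctx, func() int { return 0 })
	<-ctx.Done()
}
```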
| func (m *manager) RetryPendingResizes() []*v1.Pod { | ||
| m.allocationMutex.Lock() | ||
| defer m.allocationMutex.Unlock() | ||
|
|
||
| if !m.sourcesReady.AllReady() { | ||
| klog.V(4).InfoS("Skipping evaluation of pending resizes; sources are not ready") | ||
|
||
| m.ticker.Reset(initialRetryDelay) | ||
| return nil | ||
| } | ||
|
|
||
| m.ticker.Reset(retryDelay) | ||
|
|
||
| var newPendingResizes []types.UID | ||
| var successfulResizes []*v1.Pod | ||
|
|
||
| // Retry all pending resizes. | ||
| for _, uid := range m.podsWithPendingResizes { | ||
| pod, found := m.getPodByUID(uid) | ||
| if !found { | ||
| klog.V(4).InfoS("Pod not found; removing from pending resizes", "podUID", uid) | ||
| continue | ||
| } | ||
|
|
||
| oldResizeStatus := m.statusManager.GetPodResizeConditions(uid) | ||
|
|
||
| resizeAllocated, err := m.handlePodResourcesResize(pod) | ||
| switch { | ||
| case err != nil: | ||
| klog.ErrorS(err, "Failed to handle pod resources resize", "pod", klog.KObj(pod)) | ||
| newPendingResizes = append(newPendingResizes, uid) | ||
| case m.statusManager.IsPodResizeDeferred(uid): | ||
| klog.V(4).InfoS("Pod resize is deferred; will reevaluate later", "pod", klog.KObj(pod)) | ||
| newPendingResizes = append(newPendingResizes, uid) | ||
| case m.statusManager.IsPodResizeInfeasible(uid): | ||
| klog.V(4).InfoS("Pod resize is infeasible", "pod", klog.KObj(pod)) | ||
| default: | ||
| klog.V(4).InfoS("Pod resize successfully allocated", "pod", klog.KObj(pod)) | ||
| successfulResizes = append(successfulResizes, pod) | ||
| } | ||
|
|
||
| // If the pod resize status has changed, we need to update the pod status. | ||
| newResizeStatus := m.statusManager.GetPodResizeConditions(uid) | ||
| if resizeAllocated || !apiequality.Semantic.DeepEqual(oldResizeStatus, newResizeStatus) { | ||
| m.triggerPodSync(pod) | ||
| } | ||
| } | ||
|
|
||
| m.podsWithPendingResizes = newPendingResizes | ||
| return successfulResizes | ||
|
|
||
| } | ||
|
|
||
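Each retry pass classifies every queued pod: errors and deferred resizes are requeued, infeasible resizes are dropped (their pending condition already records why), and successful allocations are returned to the caller; a pod sync is triggered whenever a resize was allocated or its resize conditions changed. A simplified, self-contained sketch of that classify-and-requeue step, where the outcome type and evaluate callback are illustrative stand-ins for handlePodResourcesResize plus the status-manager checks:

```go
package main

import "fmt"

type outcome int

const (
	outcomeError      outcome = iota // transient failure: keep in the queue
	outcomeDeferred                  // does not fit right now: keep in the queue
	outcomeInfeasible                // can never fit on this node: drop from the queue
	outcomeAllocated                 // resize accepted: drop and report success
)

// retryPending mirrors the requeue logic in RetryPendingResizes.
func retryPending(queue []string, evaluate func(uid string) outcome) (requeued, successful []string) {
	for _, uid := range queue {
		switch evaluate(uid) {
		case outcomeError, outcomeDeferred:
			requeued = append(requeued, uid)
		case outcomeAllocated:
			successful = append(successful, uid)
		case outcomeInfeasible:
			// Dropped: the pending condition already explains the infeasible reason.
		}
	}
	return requeued, successful
}

func main() {
	q, ok := retryPending([]string{"a", "b"}, func(string) outcome { return outcomeDeferred })
	fmt.Println(q, ok) // [a b] []
}
```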
| func (m *manager) PushPendingResize(uid types.UID) { | ||
| m.allocationMutex.Lock() | ||
| defer m.allocationMutex.Unlock() | ||
|
|
||
| for _, p := range m.podsWithPendingResizes { | ||
| if p == uid { | ||
| // Pod is already in the pending resizes queue. | ||
| return | ||
| } | ||
| } | ||
|
|
||
| // Add the pod to the pending resizes list | ||
| m.podsWithPendingResizes = append(m.podsWithPendingResizes, uid) | ||
|
|
||
| // TODO (natasha41575): Sort the pending resizes list by priority. | ||
| // See https://github.com/kubernetes/enhancements/pull/5266. | ||
| } | ||
|
|
||
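PushPendingResize is a mutex-guarded, deduplicating append, so a pod is evaluated at most once per retry pass no matter how many spec updates arrive in between; the TODO notes that the queue will later be ordered by priority. The membership loop could equivalently use slices.Contains (the slices package is already imported in this file), as in this self-contained sketch:

```go
package main

import (
	"fmt"
	"slices"
)

type UID string

// pushPending appends uid only if it is not already queued, mirroring PushPendingResize
// but using slices.Contains (Go 1.21+) instead of a manual loop.
func pushPending(queue []UID, uid UID) []UID {
	if slices.Contains(queue, uid) {
		return queue // already pending; it will be evaluated on the next retry pass
	}
	return append(queue, uid)
}

func main() {
	q := pushPending(nil, "pod-a")
	q = pushPending(q, "pod-a") // duplicate push is a no-op
	fmt.Println(q)              // [pod-a]
}
```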
| // GetContainerResourceAllocation returns the last checkpointed AllocatedResources values | ||
| // If checkpoint manager has not been initialized, it returns nil, false | ||
| func (m *manager) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) { | ||
|
|
@@ -157,6 +297,9 @@ func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) { | |
| } | ||
|
|
||
| func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceInfoMap) (*v1.Pod, bool) { | ||
| if pod == nil { | ||
| return pod, false | ||
| } | ||
| allocated, found := allocs[pod.UID] | ||
| if !found { | ||
| return pod, false | ||
|
|
@@ -222,6 +365,10 @@ func (m *manager) AddPodAdmitHandlers(handlers lifecycle.PodAdmitHandlers) { | |
| } | ||
| } | ||
|
|
||
| func (m *manager) SetContainerRuntime(runtime kubecontainer.Runtime) { | ||
| m.containerRuntime = runtime | ||
| } | ||
|
|
||
| func (m *manager) AddPod(activePods []*v1.Pod, pod *v1.Pod) (bool, string, string) { | ||
| m.allocationMutex.Lock() | ||
| defer m.allocationMutex.Unlock() | ||
|
|
@@ -277,65 +424,44 @@ func (m *manager) GetActuatedResources(podUID types.UID, containerName string) ( | |
| return m.actuated.GetContainerResources(podUID, containerName) | ||
| } | ||
|
|
||
| func (m *manager) HandlePodResourcesResize( | ||
| runtime kubecontainer.Runtime, | ||
| allocatedPods []*v1.Pod, | ||
| pod *v1.Pod, | ||
| podStatus *kubecontainer.PodStatus) (allocatedPod *v1.Pod, err error) { | ||
| // Always check whether a resize is in progress so we can set the PodResizeInProgressCondition | ||
| // accordingly. | ||
| defer func() { | ||
| if err != nil { | ||
| return | ||
| } | ||
| if m.isPodResizeInProgress(allocatedPod, podStatus) { | ||
| // If a resize is in progress, make sure the cache has the correct state in case the Kubelet restarted. | ||
| m.statusManager.SetPodResizeInProgressCondition(pod.UID, "", "", false) | ||
| } else { | ||
| // (Allocated == Actual) => clear the resize in-progress status. | ||
| m.statusManager.ClearPodResizeInProgressCondition(pod.UID) | ||
| } | ||
| }() | ||
|
|
||
| podFromAllocation, updated := m.UpdatePodFromAllocation(pod) | ||
| func (m *manager) handlePodResourcesResize(pod *v1.Pod) (bool, error) { | ||
| allocatedPod, updated := m.UpdatePodFromAllocation(pod) | ||
| if !updated { | ||
| // Desired resources == allocated resources. Pod allocation does not need to be updated. | ||
| m.statusManager.ClearPodResizePendingCondition(pod.UID) | ||
| return podFromAllocation, nil | ||
| return false, nil | ||
|
|
||
| } else if resizable, msg := IsInPlacePodVerticalScalingAllowed(pod); !resizable { | ||
| // If there is a pending resize but the resize is not allowed, always use the allocated resources. | ||
| m.statusManager.SetPodResizePendingCondition(pod.UID, v1.PodReasonInfeasible, msg) | ||
| return podFromAllocation, nil | ||
| } else if resizeNotAllowed, msg := disallowResizeForSwappableContainers(runtime, pod, podFromAllocation); resizeNotAllowed { | ||
| return false, nil | ||
| } else if resizeNotAllowed, msg := disallowResizeForSwappableContainers(m.containerRuntime, pod, allocatedPod); resizeNotAllowed { | ||
| // If this resize involves swap recalculation, set as infeasible, as IPPR with swap is not supported for beta. | ||
| m.statusManager.SetPodResizePendingCondition(pod.UID, v1.PodReasonInfeasible, msg) | ||
| return podFromAllocation, nil | ||
| return false, nil | ||
| } | ||
|
|
||
| m.allocationMutex.Lock() | ||
| defer m.allocationMutex.Unlock() | ||
| // Desired resources != allocated resources. Can we update the allocation to the desired resources? | ||
| fit, reason, message := m.canResizePod(allocatedPods, pod) | ||
| fit, reason, message := m.canResizePod(m.getAllocatedPods(m.getActivePods()), pod) | ||
| if fit { | ||
| // Update pod resource allocation checkpoint | ||
| if err := m.SetAllocatedResources(pod); err != nil { | ||
| return nil, err | ||
| return false, err | ||
| } | ||
| m.statusManager.ClearPodResizePendingCondition(pod.UID) | ||
|
|
||
| // Clear any errors that may have been surfaced from a previous resize. The condition will be | ||
| // added back as needed in the defer block, but this prevents old errors from being preserved. | ||
| m.statusManager.ClearPodResizeInProgressCondition(pod.UID) | ||
| // re-assessed in the sync loop but this prevents old errors from being preserved. | ||
| m.statusManager.SetPodResizeInProgressCondition(pod.UID, "", "", true) | ||
|
|
||
| return pod, nil | ||
| return true, nil | ||
| } | ||
|
|
||
| if reason != "" { | ||
| m.statusManager.SetPodResizePendingCondition(pod.UID, reason, message) | ||
| } | ||
|
|
||
| return podFromAllocation, nil | ||
| return false, nil | ||
| } | ||
|
|
||
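handlePodResourcesResize now reports only whether a new allocation was made, and the ladder it applies is: desired == allocated → clear any pending condition and stop; pod not resizable, or the resize would require swap recalculation → mark the resize Infeasible; otherwise ask canResizePod, and either checkpoint the new allocation and set the in-progress condition (actuation still happens in the sync loop), or mark the resize Deferred (Infeasible if the node can never fit it). A condensed, self-contained sketch of that ladder, with booleans standing in for the real checks:

```go
package main

import "fmt"

type decision string

const (
	noChange   decision = "no-change"  // desired == allocated
	infeasible decision = "infeasible" // never fits (or not resizable / swap involved)
	deferred   decision = "deferred"   // does not fit right now; retried later
	allocated  decision = "allocated"  // allocation updated; actuation happens in the sync loop
)

// decideResize condenses the ladder in handlePodResourcesResize. The boolean inputs stand
// in for UpdatePodFromAllocation, the resizable/swap checks, and canResizePod respectively.
func decideResize(specChanged, resizable, fitsNode, fitsNow bool) decision {
	switch {
	case !specChanged:
		return noChange
	case !resizable || !fitsNode:
		return infeasible
	case !fitsNow:
		return deferred
	default:
		return allocated
	}
}

func main() {
	fmt.Println(decideResize(true, true, true, false)) // deferred
}
```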
| func disallowResizeForSwappableContainers(runtime kubecontainer.Runtime, desiredPod, allocatedPod *v1.Pod) (bool, string) { | ||
|
|
@@ -433,19 +559,44 @@ func (m *manager) canResizePod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, stri | |
| msg = "Node didn't have enough capacity: " + msg | ||
| klog.V(3).InfoS(msg, "pod", klog.KObj(pod)) | ||
| return false, v1.PodReasonInfeasible, msg | ||
| } | ||
|
|
||
| for i := range allocatedPods { | ||
| for j, c := range allocatedPods[i].Status.ContainerStatuses { | ||
| actuatedResources, exists := m.GetActuatedResources(allocatedPods[i].UID, c.Name) | ||
| if exists { | ||
| // Overwrite the actual resources in the status with the actuated resources. | ||
| // This lets us reuse the existing scheduler libraries without having to wait | ||
| // for the actual resources in the status to be updated. | ||
| allocatedPods[i].Status.ContainerStatuses[j].Resources = &actuatedResources | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if ok, failReason, failMessage := m.canAdmitPod(allocatedPods, pod); !ok { | ||
| // Log reason and return. Let the next sync iteration retry the resize | ||
| // Log reason and return. | ||
| klog.V(3).InfoS("Resize cannot be accommodated", "pod", klog.KObj(pod), "reason", failReason, "message", failMessage) | ||
| return false, v1.PodReasonDeferred, failMessage | ||
| } | ||
|
|
||
| return true, "", "" | ||
| } | ||
|
|
||
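Before running the admit handlers, canResizePod overwrites each running container's status resources with the kubelet's actuated resources, so a resize that has been applied but not yet written back to the API status still counts against node capacity, and the existing scheduler admission libraries can be reused unchanged. A small stand-in sketch of that substitution (the real code keys this off whether actuated resources exist, not off a zero value):

```go
package main

import "fmt"

// container is a simplified stand-in for a container status with its reported resources.
type container struct {
	name        string
	statusCPU   int64 // millicores as last written to the API status (may be stale)
	actuatedCPU int64 // millicores the kubelet has actually applied, or 0 if unknown
}

// effectiveCPU mirrors the overwrite step in canResizePod: when an actuated value is
// known, it takes precedence over the (possibly stale) status value for fit checks.
func effectiveCPU(c container) int64 {
	if c.actuatedCPU != 0 {
		return c.actuatedCPU
	}
	return c.statusCPU
}

func main() {
	c := container{name: "app", statusCPU: 500, actuatedCPU: 750}
	fmt.Println(effectiveCPU(c)) // 750: the in-flight resize counts against capacity
}
```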
| // CheckPodResizeInProgress checks whether the actuated resizable resources differ from the allocated resources | ||
| func (m *manager) CheckPodResizeInProgress(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus) { | ||
|
Review thread on this function:

- "Let's leave this here for now, but maybe we should move all the actuated resources & in-progress logic out of allocation manager in a follow-up PR. It can almost all go to the runtime. That would also break the circular dependency with the kuberuntime. WDYT? I need to think about this more."
- "I spent far too long thinking about this, but I like the idea of making actuated resources an implementation detail of kuberuntime. Once this PR merges, I can take this as a follow-up task."
- "I don't have anything against it, and it makes sense as a mental model, but you'd have to either plumb the status manager down into kuberuntime or bubble up the result somehow? I have a vague recollection of trying something like this elsewhere and running into dependency issues but I'm sure you'll have a better solution than me, good luck :)"
||
| // If a resize is in progress, make sure the cache has the correct state in case the Kubelet restarted. | ||
| if m.isPodResizeInProgress(allocatedPod, podStatus) { | ||
| m.statusManager.SetPodResizeInProgressCondition(allocatedPod.UID, "", "", false) | ||
| } else { | ||
| // (Allocated == Actual) => clear the resize in-progress status. | ||
| conditionCleared := m.statusManager.ClearPodResizeInProgressCondition(allocatedPod.UID) | ||
| if conditionCleared { | ||
| // TODO(natasha41575): We only need to make this call if any of the resources were decreased. | ||
| m.RetryPendingResizes() | ||
| } | ||
| } | ||
| } | ||
|
|
||
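CheckPodResizeInProgress is what the sync loop calls after actuation: it restores the in-progress condition if the kubelet restarted mid-resize, and when the condition is cleared (allocated == actuated) it immediately retries pending resizes, since a completed downsize may have freed capacity for a deferred resize. A compact stand-in sketch of that flow:

```go
package main

import "fmt"

// reconcileInProgress mirrors CheckPodResizeInProgress with plain function stand-ins for
// the status manager and the retry entry point.
func reconcileInProgress(
	resizeInProgress bool,
	setInProgress func(),        // stand-in for SetPodResizeInProgressCondition
	clearInProgress func() bool, // stand-in for ClearPodResizeInProgressCondition; true if a condition was removed
	retryPending func(),         // stand-in for RetryPendingResizes
) {
	if resizeInProgress {
		// Make sure the cached condition reflects the in-progress resize (e.g. after a restart).
		setInProgress()
		return
	}
	if clearInProgress() {
		// Allocated == actuated: the resize finished, so deferred resizes may now fit.
		retryPending()
	}
}

func main() {
	reconcileInProgress(false,
		func() {},
		func() bool { return true },
		func() { fmt.Println("retrying pending resizes") },
	)
}
```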
| // isPodResizeInProgress checks whether the actuated resizable resources differ from the allocated resources | ||
| // for any running containers. Specifically, the following differences are ignored: | ||
| // - Non-resizable containers: non-restartable init containers, ephemeral containers | ||
| // - Non-resizable resources: only CPU & memory are resizable | ||
|
|
||
Review comment on this hunk: "same energy as #131801 (comment)"