235 changes: 193 additions & 42 deletions pkg/kubelet/allocation/allocation_manager.go
@@ -17,10 +17,12 @@ limitations under the License.
package allocation

import (
"context"
"fmt"
"path/filepath"
"slices"
"sync"
"time"

v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
@@ -34,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/allocation/state"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/status"
@@ -45,6 +48,9 @@ import (
const (
allocatedPodsStateFile = "allocated_pods_state"
actuatedPodsStateFile = "actuated_pods_state"

initialRetryDelay = 30 * time.Second
retryDelay = 3 * time.Minute
)

// AllocationManager tracks pod resource allocations.
@@ -71,6 +77,10 @@ type Manager interface {
// TODO: See if we can remove this and just add them in the allocation manager constructor.
AddPodAdmitHandlers(handlers lifecycle.PodAdmitHandlers)

// SetContainerRuntime sets the allocation manager's container runtime.
// TODO: See if we can remove this and just add it in the allocation manager constructor.
SetContainerRuntime(runtime kubecontainer.Runtime)
Contributor Author:

same energy as #131801 (comment)


// AddPod checks if a pod can be admitted. If so, it admits the pod and updates the allocation.
// The function returns a boolean value indicating whether the pod
// can be admitted, a brief single-word reason and a message explaining why
@@ -85,10 +95,19 @@ type Manager interface {
// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
RemoveOrphanedPods(remainingPods sets.Set[types.UID])

// HandlePodResourcesResize returns the "allocated pod", which should be used for all resource
// calculations after this function is called. It also updates the cached ResizeStatus according to
// the allocation decision and pod status.
HandlePodResourcesResize(runtime kubecontainer.Runtime, allocatedPods []*v1.Pod, pod *v1.Pod, podStatus *kubecontainer.PodStatus) (*v1.Pod, error)
// Run starts the allocation manager. This is currently only used to handle periodic retry of
// pending resizes.
Run(ctx context.Context)

// PushPendingResize queues a pod with a pending resize request for later reevaluation.
PushPendingResize(uid types.UID)

// RetryPendingResizes retries all pending resizes. It returns a list of successful resizes.
RetryPendingResizes() []*v1.Pod

// CheckPodResizeInProgress checks whether the actuated resizable resources differ from the allocated resources
// for any running containers.
CheckPodResizeInProgress(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus)
}

type manager struct {
@@ -97,19 +116,40 @@ type manager struct {

admitHandlers lifecycle.PodAdmitHandlers
containerManager cm.ContainerManager
containerRuntime kubecontainer.Runtime
statusManager status.Manager
sourcesReady config.SourcesReady

ticker *time.Ticker
triggerPodSync func(pod *v1.Pod)
getActivePods func() []*v1.Pod
getPodByUID func(types.UID) (*v1.Pod, bool)

allocationMutex sync.Mutex
allocationMutex sync.Mutex
podsWithPendingResizes []types.UID
}

func NewManager(checkpointDirectory string, containerManager cm.ContainerManager, statusManager status.Manager) Manager {
func NewManager(checkpointDirectory string,
containerManager cm.ContainerManager,
statusManager status.Manager,
triggerPodSync func(pod *v1.Pod),
getActivePods func() []*v1.Pod,
getPodByUID func(types.UID) (*v1.Pod, bool),
sourcesReady config.SourcesReady,
) Manager {
return &manager{
allocated: newStateImpl(checkpointDirectory, allocatedPodsStateFile),
actuated: newStateImpl(checkpointDirectory, actuatedPodsStateFile),

containerManager: containerManager,
statusManager: statusManager,
admitHandlers: lifecycle.PodAdmitHandlers{},
sourcesReady: sourcesReady,

ticker: time.NewTicker(initialRetryDelay),
triggerPodSync: triggerPodSync,
getActivePods: getActivePods,
getPodByUID: getPodByUID,
}
}

@@ -131,17 +171,117 @@ func newStateImpl(checkpointDirectory, checkpointName string) state.State {

// NewInMemoryManager returns an allocation manager that doesn't persist state.
// For testing purposes only!
func NewInMemoryManager(containerManager cm.ContainerManager, statusManager status.Manager) Manager {
func NewInMemoryManager(containerManager cm.ContainerManager,
statusManager status.Manager,
triggerPodSync func(pod *v1.Pod),
getActivePods func() []*v1.Pod,
getPodByUID func(types.UID) (*v1.Pod, bool),
sourcesReady config.SourcesReady,
) Manager {
return &manager{
allocated: state.NewStateMemory(nil),
actuated: state.NewStateMemory(nil),

containerManager: containerManager,
statusManager: statusManager,
admitHandlers: lifecycle.PodAdmitHandlers{},
sourcesReady: sourcesReady,

ticker: time.NewTicker(initialRetryDelay),
triggerPodSync: triggerPodSync,
getActivePods: getActivePods,
getPodByUID: getPodByUID,
}
}

func (m *manager) Run(ctx context.Context) {
// Start a goroutine to periodically check for pending resizes and process them if needed.
go func() {
for {
select {
case <-m.ticker.C:
successfulResizes := m.RetryPendingResizes()
for _, po := range successfulResizes {
klog.InfoS("Successfully retried resize after timeout", "pod", klog.KObj(po))
}
case <-ctx.Done():
m.ticker.Stop()
return
}
}
}()
}

func (m *manager) RetryPendingResizes() []*v1.Pod {
m.allocationMutex.Lock()
defer m.allocationMutex.Unlock()

if !m.sourcesReady.AllReady() {
klog.V(4).InfoS("Skipping evaluation of pending resizes; sources are not ready")
m.ticker.Reset(initialRetryDelay)
return nil
}

m.ticker.Reset(retryDelay)

var newPendingResizes []types.UID
var successfulResizes []*v1.Pod

// Retry all pending resizes.
for _, uid := range m.podsWithPendingResizes {
pod, found := m.getPodByUID(uid)
if !found {
klog.V(4).InfoS("Pod not found; removing from pending resizes", "podUID", uid)
continue
}

oldResizeStatus := m.statusManager.GetPodResizeConditions(uid)

resizeAllocated, err := m.handlePodResourcesResize(pod)
switch {
case err != nil:
klog.ErrorS(err, "Failed to handle pod resources resize", "pod", klog.KObj(pod))
newPendingResizes = append(newPendingResizes, uid)
case m.statusManager.IsPodResizeDeferred(uid):
klog.V(4).InfoS("Pod resize is deferred; will reevaluate later", "pod", klog.KObj(pod))
newPendingResizes = append(newPendingResizes, uid)
case m.statusManager.IsPodResizeInfeasible(uid):
klog.V(4).InfoS("Pod resize is infeasible", "pod", klog.KObj(pod))
default:
klog.V(4).InfoS("Pod resize successfully allocated", "pod", klog.KObj(pod))
successfulResizes = append(successfulResizes, pod)
}

// If the pod resize status has changed, we need to update the pod status.
newResizeStatus := m.statusManager.GetPodResizeConditions(uid)
if resizeAllocated || !apiequality.Semantic.DeepEqual(oldResizeStatus, newResizeStatus) {
m.triggerPodSync(pod)
}
}

m.podsWithPendingResizes = newPendingResizes
return successfulResizes

}

func (m *manager) PushPendingResize(uid types.UID) {
m.allocationMutex.Lock()
defer m.allocationMutex.Unlock()

for _, p := range m.podsWithPendingResizes {
if p == uid {
// Pod is already in the pending resizes queue.
return
}
}

// Add the pod to the pending resizes list
m.podsWithPendingResizes = append(m.podsWithPendingResizes, uid)

// TODO (natasha41575): Sort the pending resizes list by priority.
// See https://github.com/kubernetes/enhancements/pull/5266.
}

// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
// If checkpoint manager has not been initialized, it returns nil, false
func (m *manager) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
@@ -157,6 +297,9 @@ func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
}

func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceInfoMap) (*v1.Pod, bool) {
if pod == nil {
return pod, false
}
allocated, found := allocs[pod.UID]
if !found {
return pod, false
@@ -222,6 +365,10 @@ func (m *manager) AddPodAdmitHandlers(handlers lifecycle.PodAdmitHandlers) {
}
}

func (m *manager) SetContainerRuntime(runtime kubecontainer.Runtime) {
m.containerRuntime = runtime
}

func (m *manager) AddPod(activePods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
m.allocationMutex.Lock()
defer m.allocationMutex.Unlock()
@@ -277,65 +424,44 @@ func (m *manager) GetActuatedResources(podUID types.UID, containerName string) (
return m.actuated.GetContainerResources(podUID, containerName)
}

func (m *manager) HandlePodResourcesResize(
runtime kubecontainer.Runtime,
allocatedPods []*v1.Pod,
pod *v1.Pod,
podStatus *kubecontainer.PodStatus) (allocatedPod *v1.Pod, err error) {
// Always check whether a resize is in progress so we can set the PodResizeInProgressCondition
// accordingly.
defer func() {
if err != nil {
return
}
if m.isPodResizeInProgress(allocatedPod, podStatus) {
// If a resize is in progress, make sure the cache has the correct state in case the Kubelet restarted.
m.statusManager.SetPodResizeInProgressCondition(pod.UID, "", "", false)
} else {
// (Allocated == Actual) => clear the resize in-progress status.
m.statusManager.ClearPodResizeInProgressCondition(pod.UID)
}
}()

podFromAllocation, updated := m.UpdatePodFromAllocation(pod)
func (m *manager) handlePodResourcesResize(pod *v1.Pod) (bool, error) {
allocatedPod, updated := m.UpdatePodFromAllocation(pod)
if !updated {
// Desired resources == allocated resources. Pod allocation does not need to be updated.
m.statusManager.ClearPodResizePendingCondition(pod.UID)
return podFromAllocation, nil
return false, nil

} else if resizable, msg := IsInPlacePodVerticalScalingAllowed(pod); !resizable {
// If there is a pending resize but the resize is not allowed, always use the allocated resources.
m.statusManager.SetPodResizePendingCondition(pod.UID, v1.PodReasonInfeasible, msg)
return podFromAllocation, nil
} else if resizeNotAllowed, msg := disallowResizeForSwappableContainers(runtime, pod, podFromAllocation); resizeNotAllowed {
return false, nil
} else if resizeNotAllowed, msg := disallowResizeForSwappableContainers(m.containerRuntime, pod, allocatedPod); resizeNotAllowed {
// If this resize involves swap recalculation, mark it as infeasible, since IPPR with swap is not supported for beta.
m.statusManager.SetPodResizePendingCondition(pod.UID, v1.PodReasonInfeasible, msg)
return podFromAllocation, nil
return false, nil
}

m.allocationMutex.Lock()
defer m.allocationMutex.Unlock()
// Desired resources != allocated resources. Can we update the allocation to the desired resources?
fit, reason, message := m.canResizePod(allocatedPods, pod)
fit, reason, message := m.canResizePod(m.getAllocatedPods(m.getActivePods()), pod)
if fit {
// Update pod resource allocation checkpoint
if err := m.SetAllocatedResources(pod); err != nil {
return nil, err
return false, err
}
m.statusManager.ClearPodResizePendingCondition(pod.UID)

// Clear any errors that may have been surfaced from a previous resize. The condition will be
// added back as needed in the defer block, but this prevents old errors from being preserved.
m.statusManager.ClearPodResizeInProgressCondition(pod.UID)
// re-assessed in the sync loop, but this prevents old errors from being preserved.
m.statusManager.SetPodResizeInProgressCondition(pod.UID, "", "", true)

return pod, nil
return true, nil
}

if reason != "" {
m.statusManager.SetPodResizePendingCondition(pod.UID, reason, message)
}

return podFromAllocation, nil
return false, nil
}

func disallowResizeForSwappableContainers(runtime kubecontainer.Runtime, desiredPod, allocatedPod *v1.Pod) (bool, string) {
@@ -433,19 +559,44 @@ func (m *manager) canResizePod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, stri
msg = "Node didn't have enough capacity: " + msg
klog.V(3).InfoS(msg, "pod", klog.KObj(pod))
return false, v1.PodReasonInfeasible, msg
}

for i := range allocatedPods {
for j, c := range allocatedPods[i].Status.ContainerStatuses {
actuatedResources, exists := m.GetActuatedResources(allocatedPods[i].UID, c.Name)
if exists {
// Overwrite the actual resources in the status with the actuated resources.
// This lets us reuse the existing scheduler libraries without having to wait
// for the actual resources in the status to be updated.
allocatedPods[i].Status.ContainerStatuses[j].Resources = &actuatedResources
}
}
}

if ok, failReason, failMessage := m.canAdmitPod(allocatedPods, pod); !ok {
// Log reason and return. Let the next sync iteration retry the resize
// Log reason and return.
klog.V(3).InfoS("Resize cannot be accommodated", "pod", klog.KObj(pod), "reason", failReason, "message", failMessage)
return false, v1.PodReasonDeferred, failMessage
}

return true, "", ""
}

// CheckPodResizeInProgress checks whether the actuated resizable resources differ from the allocated resources
func (m *manager) CheckPodResizeInProgress(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus) {
Member:

Let's leave this here for now, but maybe we should move all of the actuated-resources and in-progress logic out of the allocation manager in a follow-up PR. It can almost all go to the runtime. That would also break the circular dependency with the kuberuntime. WDYT?

I need to think about this more.

Member:

I spent far too long thinking about this, but I like the idea of making actuated resources an implementation detail of kuberuntime. Once this PR merges, I can take this as a follow-up task.

Contributor Author:

I don't have anything against it, and it makes sense as a mental model, but you'd have to either plumb the status manager down into kuberuntime or bubble up the result somehow. I have a vague recollection of trying something like this elsewhere and running into dependency issues, but I'm sure you'll have a better solution than me. Good luck :)
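
Not part of this PR, just a rough sketch of the "bubble up the result" option, using hypothetical names (ResizeProgress, ResizeProgressReporter) that don't exist in kuberuntime today:

```go
// Hypothetical sketch: kuberuntime keeps the actuated-resources bookkeeping internal
// and reports a summary upward, so it never needs a reference to the status manager.
package kuberuntime

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
)

// ResizeProgress is a hypothetical summary of the actuated-vs-allocated comparison.
type ResizeProgress struct {
	PodUID     types.UID
	InProgress bool   // actuated resizable resources still differ from allocated
	Message    string // optional detail for the PodResizeInProgress condition
}

// ResizeProgressReporter is a hypothetical interface the runtime manager could expose.
// The kubelet sync loop, which already owns the status manager, would call it and
// translate the result into Set/ClearPodResizeInProgressCondition.
type ResizeProgressReporter interface {
	ResizeProgressFor(pod *v1.Pod) ResizeProgress
}
```

The exact shape would obviously depend on where the in-progress check ends up living.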

// If a resize is in progress, make sure the cache has the correct state in case the Kubelet restarted.
if m.isPodResizeInProgress(allocatedPod, podStatus) {
m.statusManager.SetPodResizeInProgressCondition(allocatedPod.UID, "", "", false)
} else {
// (Allocated == Actual) => clear the resize in-progress status.
conditionCleared := m.statusManager.ClearPodResizeInProgressCondition(allocatedPod.UID)
if conditionCleared {
// TODO(natasha41575): We only need to make this call if any of the resources were decreased.
m.RetryPendingResizes()
}
}
}

// isPodResizeInProgress checks whether the actuated resizable resources differ from the allocated resources
// for any running containers. Specifically, the following differences are ignored:
// - Non-resizable containers: non-restartable init containers, ephemeral containers
// - Non-resizable resources: only CPU & memory are resizable
Expand Down
Loading