221 changes: 168 additions & 53 deletions internal/runtimehandlerhooks/high_performance_hooks_linux.go
@@ -61,11 +61,56 @@ const (
SharedCPUsEnvVar = "OPENSHIFT_SHARED_CPUS"
)

// ServiceManager interface for managing system services.
type ServiceManager interface {
IsServiceEnabled(serviceName string) bool
RestartService(serviceName string) error
}

// CommandRunner interface for running external commands.
type CommandRunner interface {
LookPath(file string) (string, error)
RunCommand(name string, env []string, arg ...string) error
}

// Default implementations.
type defaultServiceManager struct{}

func (d *defaultServiceManager) IsServiceEnabled(serviceName string) bool {
return isServiceEnabled(serviceName)
}

func (d *defaultServiceManager) RestartService(serviceName string) error {
return restartService(serviceName)
}

type defaultCommandRunner struct{}

func (d *defaultCommandRunner) LookPath(file string) (string, error) {
return exec.LookPath(file)
}

func (d *defaultCommandRunner) RunCommand(name string, env []string, arg ...string) error {
cmd := cmdrunner.Command(name, arg...)
if len(env) > 0 {
cmd.Env = env
}

return cmd.Run()
}

var (
serviceManager ServiceManager = &defaultServiceManager{}
commandRunner CommandRunner = &defaultCommandRunner{}
)
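
The ServiceManager and CommandRunner indirections above exist so the irqbalance interactions can be stubbed out in unit tests by swapping the package-level serviceManager and commandRunner variables. A minimal sketch of what such fakes could look like, assuming a test file in the same package; the fakeServiceManager and fakeCommandRunner types are hypothetical and not part of this change:

// Illustrative test doubles (hypothetical; not part of this pull request).
type fakeServiceManager struct {
	enabled  bool
	restarts int
}

func (f *fakeServiceManager) IsServiceEnabled(string) bool { return f.enabled }

func (f *fakeServiceManager) RestartService(string) error {
	f.restarts++
	return nil
}

type fakeCommandRunner struct {
	lookPathErr error
	commands    [][]string
}

func (f *fakeCommandRunner) LookPath(file string) (string, error) {
	return "/usr/sbin/" + file, f.lookPathErr
}

// RunCommand records the command and arguments; env is ignored for brevity.
func (f *fakeCommandRunner) RunCommand(name string, env []string, arg ...string) error {
	f.commands = append(f.commands, append([]string{name}, arg...))
	return nil
}

// A test would swap the defaults in and restore them afterwards, e.g.:
//
//	origSM, origCR := serviceManager, commandRunner
//	defer func() { serviceManager, commandRunner = origSM, origCR }()
//	serviceManager = &fakeServiceManager{enabled: true}
//	commandRunner = &fakeCommandRunner{}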

// HighPerformanceHooks used to run additional hooks that will configure a system for the latency sensitive workloads.
type HighPerformanceHooks struct {
irqBalanceConfigFile string
cpusetLock sync.Mutex
sharedCPUs string
irqBalanceConfigFile string
cpusetLock sync.Mutex
updateIRQSMPAffinityLock sync.Mutex
sharedCPUs string
irqSMPAffinityFile string
}

func (h *HighPerformanceHooks) PreCreate(ctx context.Context, specgen *generate.Generator, s *sandbox.Sandbox, c *oci.Container) error {
@@ -133,7 +178,8 @@ func (h *HighPerformanceHooks) PreStart(ctx context.Context, c *oci.Container, s
// disable the IRQ smp load balancing for the container CPUs
if shouldIRQLoadBalancingBeDisabled(ctx, s.Annotations()) {
log.Infof(ctx, "Disable irq smp balancing for container %q", c.ID())
if err := setIRQLoadBalancing(ctx, c, false, IrqSmpAffinityProcFile, h.irqBalanceConfigFile); err != nil {

if err := h.setIRQLoadBalancing(ctx, c, false); err != nil {
return fmt.Errorf("set IRQ load balancing: %w", err)
}
}
@@ -183,13 +229,6 @@ func (h *HighPerformanceHooks) PreStop(ctx context.Context, c *oci.Container, s
return nil
}

// enable the IRQ smp balancing for the container CPUs
if shouldIRQLoadBalancingBeDisabled(ctx, s.Annotations()) {
if err := setIRQLoadBalancing(ctx, c, true, IrqSmpAffinityProcFile, h.irqBalanceConfigFile); err != nil {
return fmt.Errorf("set IRQ load balancing: %w", err)
}
}

// disable the CPU load balancing for the container CPUs
if shouldCPULoadBalancingBeDisabled(ctx, s.Annotations()) {
podManager, containerManagers, err := libctrManagersForPodAndContainerCgroup(c, s.CgroupParent())
@@ -225,12 +264,23 @@ func (h *HighPerformanceHooks) PreStop(ctx context.Context, c *oci.Container, s
}

// If CPU load balancing is enabled, then *all* containers must run this PostStop hook.
func (*HighPerformanceHooks) PostStop(ctx context.Context, c *oci.Container, s *sandbox.Sandbox) error {
func (h *HighPerformanceHooks) PostStop(ctx context.Context, c *oci.Container, s *sandbox.Sandbox) error {
cSpec := c.Spec()
if shouldRunHooks(ctx, c.ID(), &cSpec, s) {
// enable the IRQ smp balancing for the container CPUs
if shouldIRQLoadBalancingBeDisabled(ctx, s.Annotations()) {
if err := h.setIRQLoadBalancing(ctx, c, true); err != nil {
return fmt.Errorf("set IRQ load balancing: %w", err)
}
}
}

// We could check if `!cpuLoadBalancingAllowed()` here, but it requires access to the config, which would be
// odd to plumb. Instead, always assume if they're using a HighPerformanceHook, they have CPULoadBalanceDisabled
// annotation allowed.
h := &DefaultCPULoadBalanceHooks{}
return h.PostStop(ctx, c, s)
dh := &DefaultCPULoadBalanceHooks{}

return dh.PostStop(ctx, c, s)
}

func shouldCPULoadBalancingBeDisabled(ctx context.Context, annotations fields.Set) bool {
@@ -537,7 +587,17 @@ func disableCPULoadBalancingV1(containerManagers []cgroups.Manager) error {
return nil
}

func setIRQLoadBalancing(ctx context.Context, c *oci.Container, enable bool, irqSmpAffinityFile, irqBalanceConfigFile string) error {
// setIRQLoadBalancing configures interrupt load balancing for the container CPUs by updating
// the IRQ SMP affinity mask and the irqbalance service configuration, and then handles the irqbalance restart.
// When enable is false (container added), it removes the container CPUs from interrupt handling to reduce latency;
// when enable is true (container removed), it restores them.
// Everything after reading the IRQ SMP affinity file must run under a single lock to avoid race conditions:
// once we read the SMP IRQ affinity file, we have to calculate new masks, write them to
// /proc/irq/default_smp_affinity and /etc/sysconfig/irqbalance, and restart irqbalance or run
// irqbalance --oneshot, all within the same lock.
// Without the lock, two threads could read the file and calculate new masks but overwrite each
// other's results, or start irqbalance --oneshot with different values.
func (h *HighPerformanceHooks) setIRQLoadBalancing(ctx context.Context, c *oci.Container, enable bool) error {
lspec := c.Spec().Linux
if lspec == nil ||
lspec.Resources == nil ||
@@ -546,44 +606,110 @@ func setIRQLoadBalancing(ctx context.Context, c *oci.Container, enable bool, irq
return fmt.Errorf("find container %s CPUs", c.ID())
}

content, err := os.ReadFile(irqSmpAffinityFile)
h.updateIRQSMPAffinityLock.Lock()
defer h.updateIRQSMPAffinityLock.Unlock()

newIRQBalanceSetting, err := h.updateNewIRQSMPAffinityMask(ctx, c.ID(), c.Name(), lspec.Resources.CPU.Cpus, enable)
if err != nil {
return err
}
currentIRQSMPSetting := strings.TrimSpace(string(content))
newIRQSMPSetting, newIRQBalanceSetting, err := UpdateIRQSmpAffinityMask(lspec.Resources.CPU.Cpus, currentIRQSMPSetting, enable)
// Now, restart the irqbalance service or run irqbalance --oneshot command.
// On failure, this will log errors but will not return them, as it is not critical for the pod to start.
if !h.handleIRQBalanceRestart(ctx, c.Name()) {
h.handleIRQBalanceOneShot(ctx, c.Name(), newIRQBalanceSetting)
}
return nil
}

// handleIRQBalanceRestart restarts the irqbalance service when it is enabled and the config file exists, and reports whether the restart was handled.
func (h *HighPerformanceHooks) handleIRQBalanceRestart(ctx context.Context, cName string) bool {
// If the irqbalance service is enabled, restart it and return.
// systemd's StartLimitBurst might cause issues here when container restarts occur in very
// quick succession and the parameter must be reconfigured for this to work correctly.
// See:
// https://github.com/cri-o/cri-o/pull/8834/commits/b96928dcbb7956e0ebde42238e88955831411216
if !serviceManager.IsServiceEnabled(irqBalancedName) || !fileExists(h.irqBalanceConfigFile) {
return false
}

log.Debugf(ctx, "Container %q restarting irqbalance service", cName)

if err := serviceManager.RestartService(irqBalancedName); err != nil {
log.Warnf(ctx, "Irqbalance service restart failed: %v", err)

return false
}

return true
}

// handleIRQBalanceOneShot runs the irqbalance --oneshot command with the new banned-CPUs mask.
func (h *HighPerformanceHooks) handleIRQBalanceOneShot(ctx context.Context, cName, newIRQBalanceSetting string) {
irqBalanceFullPath, err := commandRunner.LookPath(irqBalancedName)
if err != nil {
return err
// irqbalance is not installed, skip the rest; the pod should still start, so just return.
log.Warnf(ctx, "Irqbalance binary not found: %v", err)

return
}
if err := os.WriteFile(irqSmpAffinityFile, []byte(newIRQSMPSetting), 0o644); err != nil {
return err

env := fmt.Sprintf("%s=%s", irqBalanceBannedCpus, newIRQBalanceSetting)
log.Debugf(ctx, "Container %q running '%s %s %s'", cName, env, irqBalanceFullPath, "--oneshot")

if err := commandRunner.RunCommand(
irqBalanceFullPath,
[]string{env},
"--oneshot",
); err != nil {
log.Warnf(ctx, "Container %q failed to run '%s %s %s', err: %q",
cName, env, irqBalanceFullPath, "--oneshot", err)
}
}

// updateNewIRQSMPAffinityMask updates the IRQ SMP affinity and irqbalance configuration files and returns the new irqbalance banned-CPUs setting.
func (h *HighPerformanceHooks) updateNewIRQSMPAffinityMask(ctx context.Context, cID, cName, cpus string, enable bool) (newIRQBalanceSetting string, err error) {
content, err := os.ReadFile(h.irqSMPAffinityFile)
if err != nil {
return "", err
}

isIrqConfigExists := fileExists(irqBalanceConfigFile)
originalIRQSMPSetting := strings.TrimSpace(string(content))

if isIrqConfigExists {
if err := updateIrqBalanceConfigFile(irqBalanceConfigFile, newIRQBalanceSetting); err != nil {
return err
}
newIRQSMPSetting, newIRQBalanceSetting, err := calcIRQSMPAffinityMask(cpus, originalIRQSMPSetting, enable)
if err != nil {
return "", err
}

if !isServiceEnabled(irqBalancedName) || !isIrqConfigExists {
if _, err := exec.LookPath(irqBalancedName); err != nil {
// irqbalance is not installed, skip the rest; pod should still start, so return nil instead
log.Warnf(ctx, "Irqbalance binary not found: %v", err)
return nil
log.Debugf(ctx, "Container %q (%q) enable %t cpus %q set %q: %q -> %q; %q: %q",
cID, cName, enable, cpus,
h.irqSMPAffinityFile, originalIRQSMPSetting, newIRQSMPSetting,
h.irqBalanceConfigFile, newIRQBalanceSetting,
)

if err := os.WriteFile(h.irqSMPAffinityFile, []byte(newIRQSMPSetting), 0o644); err != nil {
return "", err
}

// Rollback IRQ SMP affinity file to maintain consistency if something goes wrong.
defer func() {
if err != nil {
if rollbackErr := os.WriteFile(h.irqSMPAffinityFile, []byte(originalIRQSMPSetting), 0o644); rollbackErr != nil {
log.Errorf(ctx, "Failed to rollback IRQ SMP affinity file after config update failure: err: %q, rollback err: %q",
err, rollbackErr)
}
}
// run irqbalance in daemon mode, so this won't cause delay
cmd := cmdrunner.Command(irqBalancedName, "--oneshot")
additionalEnv := irqBalanceBannedCpus + "=" + newIRQBalanceSetting
cmd.Env = append(os.Environ(), additionalEnv)
return cmd.Run()
}()

// Nothing else to do if the irqbalance config file does not exist.
if !fileExists(h.irqBalanceConfigFile) {
return newIRQBalanceSetting, nil
}

if err := restartIrqBalanceService(); err != nil {
log.Warnf(ctx, "Irqbalance service restart failed: %v", err)
if err := updateIrqBalanceConfigFile(h.irqBalanceConfigFile, newIRQBalanceSetting); err != nil {
return "", err
}
return nil

return newIRQBalanceSetting, nil
}
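
The heavy lifting in updateNewIRQSMPAffinityMask is delegated to calcIRQSMPAffinityMask, which is not shown in this diff. For intuition only, here is a rough, self-contained sketch of the underlying bitmask idea: each CPU corresponds to one bit of the hex affinity mask, disabling balancing clears the container's CPU bits from the SMP affinity value, and in this sketch the complement of the new affinity value becomes the banned-CPUs setting handed to irqbalance. The applyBan helper below is hypothetical, ignores the comma-separated 32-bit word grouping and zero padding of the real /proc/irq/default_smp_affinity contents, and is not the implementation CRI-O uses.

package main

import (
	"fmt"
	"math/big"
)

// applyBan clears (enable=false) or restores (enable=true) the given CPU bits in
// the SMP affinity mask and returns the new affinity mask plus the banned mask.
func applyBan(containerCPUs []int, currentMask string, enable bool) (newMask, bannedMask string, err error) {
	mask := new(big.Int)
	if _, ok := mask.SetString(currentMask, 16); !ok {
		return "", "", fmt.Errorf("invalid mask %q", currentMask)
	}

	for _, cpu := range containerCPUs {
		if enable {
			mask.SetBit(mask, cpu, 1) // give the CPU back to IRQ handling
		} else {
			mask.SetBit(mask, cpu, 0) // remove the CPU from IRQ handling
		}
	}

	// irqbalance expects the inverse: the set of CPUs it must not touch.
	allCPUs := new(big.Int).Lsh(big.NewInt(1), uint(len(currentMask)*4))
	allCPUs.Sub(allCPUs, big.NewInt(1))
	banned := new(big.Int).AndNot(allCPUs, mask)

	return fmt.Sprintf("%x", mask), fmt.Sprintf("%x", banned), nil
}

func main() {
	// Ban CPUs 2 and 3 on an 8-CPU system.
	fmt.Println(applyBan([]int{2, 3}, "ff", false))
}

Running the example bans CPUs 2 and 3 on an 8-CPU machine: the affinity mask goes from ff to f3, and the banned mask comes out as c (bits 2 and 3 set).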

func setCPUQuota(podManager cgroups.Manager, containerManagers []cgroups.Manager) error {
@@ -957,26 +1083,15 @@ func RestoreIrqBalanceConfig(ctx context.Context, irqBalanceConfigFile, irqBanne
if err := updateIrqBalanceConfigFile(irqBalanceConfigFile, origBannedCPUMasks); err != nil {
return err
}
if isServiceEnabled(irqBalancedName) {
if err := restartIrqBalanceService(); err != nil {

if serviceManager.IsServiceEnabled(irqBalancedName) {
if err := serviceManager.RestartService(irqBalancedName); err != nil {
log.Warnf(ctx, "Irqbalance service restart failed: %v", err)
}
}
return nil
}

func ShouldCPUQuotaBeDisabled(ctx context.Context, cid string, cSpec *specs.Spec, s *sandbox.Sandbox, annotations fields.Set) bool {
if !shouldRunHooks(ctx, cid, cSpec, s) {
return false
}
if annotations[crioannotations.CPUQuotaAnnotation] == annotationTrue {
log.Warnf(ctx, "%s", annotationValueDeprecationWarning(crioannotations.CPUQuotaAnnotation))
}

return annotations[crioannotations.CPUQuotaAnnotation] == annotationTrue ||
annotations[crioannotations.CPUQuotaAnnotation] == annotationDisable
}

func shouldRunHooks(ctx context.Context, id string, cSpec *specs.Spec, s *sandbox.Sandbox) bool {
if isCgroupParentBurstable(s) {
log.Infof(ctx, "Container %q is a burstable pod. Skip PreStart.", id)