Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 143 additions & 46 deletions internal/runtimehandlerhooks/high_performance_hooks_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,54 @@ const (
SharedCPUsEnvVar = "OPENSHIFT_SHARED_CPUS"
)

// ServiceManager interface for managing system services.
type ServiceManager interface {
IsServiceEnabled(serviceName string) bool
RestartService(serviceName string) error
}

// CommandRunner interface for running external commands.
type CommandRunner interface {
LookPath(file string) (string, error)
RunCommand(name string, env []string, arg ...string) error
}

// Default implementations.
type defaultServiceManager struct{}

func (d *defaultServiceManager) IsServiceEnabled(serviceName string) bool {
return isServiceEnabled(serviceName)
}

func (d *defaultServiceManager) RestartService(serviceName string) error {
return restartService(serviceName)
}

type defaultCommandRunner struct{}

func (d *defaultCommandRunner) LookPath(file string) (string, error) {
return exec.LookPath(file)
}

func (d *defaultCommandRunner) RunCommand(name string, env []string, arg ...string) error {
cmd := cmdrunner.Command(name, arg...)
if len(env) > 0 {
cmd.Env = env
}

return cmd.Run()
}

var (
serviceManager ServiceManager = &defaultServiceManager{}
commandRunner CommandRunner = &defaultCommandRunner{}
)

// HighPerformanceHooks used to run additional hooks that will configure a system for the latency sensitive workloads.
type HighPerformanceHooks struct {
irqBalanceConfigFile string
cpusetLock sync.Mutex
irqSMPAffinityFileLock sync.Mutex
irqBalanceConfigFileLock sync.Mutex
updateIRQSMPAffinityLock sync.Mutex
sharedCPUs string
irqSMPAffinityFile string
}
Expand Down Expand Up @@ -197,13 +239,6 @@ func (h *HighPerformanceHooks) PreStop(ctx context.Context, c *oci.Container, s
return nil
}

// enable the IRQ smp balancing for the container CPUs
if shouldIRQLoadBalancingBeDisabled(ctx, s.Annotations()) {
if err := h.setIRQLoadBalancing(ctx, c, true); err != nil {
return fmt.Errorf("set IRQ load balancing: %w", err)
}
}

// disable the CPU load balancing for the container CPUs
if shouldCPULoadBalancingBeDisabled(ctx, s.Annotations()) {
podManager, containerManagers, err := libctrManagersForPodAndContainerCgroup(c, s.CgroupParent())
Expand Down Expand Up @@ -240,13 +275,23 @@ func (h *HighPerformanceHooks) PreStop(ctx context.Context, c *oci.Container, s
}

// If CPU load balancing is enabled, then *all* containers must run this PostStop hook.
func (*HighPerformanceHooks) PostStop(ctx context.Context, c *oci.Container, s *sandbox.Sandbox) error {
func (h *HighPerformanceHooks) PostStop(ctx context.Context, c *oci.Container, s *sandbox.Sandbox) error {
cSpec := c.Spec()
if shouldRunHooks(ctx, c.ID(), &cSpec, s) {
// enable the IRQ smp balancing for the container CPUs
if shouldIRQLoadBalancingBeDisabled(ctx, s.Annotations()) {
if err := h.setIRQLoadBalancing(ctx, c, true); err != nil {
return fmt.Errorf("set IRQ load balancing: %w", err)
}
}
}

// We could check if `!cpuLoadBalancingAllowed()` here, but it requires access to the config, which would be
// odd to plumb. Instead, always assume if they're using a HighPerformanceHook, they have CPULoadBalanceDisabled
// annotation allowed.
h := &DefaultCPULoadBalanceHooks{}
dh := &DefaultCPULoadBalanceHooks{}

return h.PostStop(ctx, c, s)
return dh.PostStop(ctx, c, s)
}

func shouldCPULoadBalancingBeDisabled(ctx context.Context, annotations fields.Set) bool {
Expand Down Expand Up @@ -572,6 +617,16 @@ func disableCPULoadBalancingV1(containerManagers []cgroups.Manager) error {
return nil
}

// setIRQLoadBalancing configures interrupt load balancing for container CPUs by updating
// the IRQ SMP affinity mask and IRQ balance service configuration. It then handles IRQ balance restart.
// When enable is false (= container added), removes container CPUs from interrupt handling to reduce latency;
// when true (= container removed), restores them.
// The entire function after reading IRQ SMP affinity must be wrapped inside a single lock to avoid race conditions.
// The reason for this is that once we read from the SMP IRQ affinity file, we have to calculate new masks and
// write those masks to /proc/irq/default_smp_affinity and /etc/sysconfig/irqbalance.
// We also must restart irqbalance or run irqbalance --oneshot within the same lock.
// Without this lock, 2 threads could read from the file and calculate the new mask but overwrite the
// results of each other, or start irbalance --oneshot with different values.
func (h *HighPerformanceHooks) setIRQLoadBalancing(ctx context.Context, c *oci.Container, enable bool) error {
lspec := c.Spec().Linux
if lspec == nil ||
Expand All @@ -581,69 +636,111 @@ func (h *HighPerformanceHooks) setIRQLoadBalancing(ctx context.Context, c *oci.C
return fmt.Errorf("find container %s CPUs", c.ID())
}

newIRQBalanceSetting, err := h.updateNewIRQSMPAffinityMask(lspec.Resources.CPU.Cpus, enable)
h.updateIRQSMPAffinityLock.Lock()
defer h.updateIRQSMPAffinityLock.Unlock()

newIRQBalanceSetting, err := h.updateNewIRQSMPAffinityMask(ctx, c.ID(), c.Name(), lspec.Resources.CPU.Cpus, enable)
if err != nil {
return err
}
// Now, restart the irqbalance service or run irqbalance --oneshot command.
// On failure, this will log errors but will not return them, as it is not critical for the pod to start.
if !h.handleIRQBalanceRestart(ctx, c.Name()) {
h.handleIRQBalanceOneShot(ctx, c.Name(), newIRQBalanceSetting)
}

isIrqConfigExists := fileExists(h.irqBalanceConfigFile)
return nil
}

if isIrqConfigExists {
if err := h.updateIrqBalanceConfigFile(newIRQBalanceSetting); err != nil {
return err
}
// handleIRQBalanceRestart handles the restart of the irqbalance service.
func (h *HighPerformanceHooks) handleIRQBalanceRestart(ctx context.Context, cName string) bool {
// If the irqbalance service is enabled, restart it and return.
// systemd's StartLimitBurst might cause issues here when container restarts occur in very
// quick succession and the parameter must be reconfigured for this to work correctly.
// See:
// https://github.com/cri-o/cri-o/pull/8834/commits/b96928dcbb7956e0ebde42238e88955831411216
if !serviceManager.IsServiceEnabled(irqBalancedName) || !fileExists(h.irqBalanceConfigFile) {
return false
}

if !isServiceEnabled(irqBalancedName) || !isIrqConfigExists {
if _, err := exec.LookPath(irqBalancedName); err != nil {
// irqbalance is not installed, skip the rest; pod should still start, so return nil instead
log.Warnf(ctx, "Irqbalance binary not found: %v", err)
log.Debugf(ctx, "Container %q restarting irqbalance service", cName)

return nil
}
// run irqbalance in daemon mode, so this won't cause delay
cmd := cmdrunner.Command(irqBalancedName, "--oneshot")
additionalEnv := irqBalanceBannedCpus + "=" + newIRQBalanceSetting
cmd.Env = append(os.Environ(), additionalEnv)
if err := serviceManager.RestartService(irqBalancedName); err != nil {
log.Warnf(ctx, "Irqbalance service restart failed: %v", err)

return cmd.Run()
return false
}

if err := restartIrqBalanceService(); err != nil {
log.Warnf(ctx, "Irqbalance service restart failed: %v", err)
return true
}

// handleIRQBalanceOneShot runs irqbalance --oneshot command.
func (h *HighPerformanceHooks) handleIRQBalanceOneShot(ctx context.Context, cName, newIRQBalanceSetting string) {
irqBalanceFullPath, err := commandRunner.LookPath(irqBalancedName)
if err != nil {
// irqbalance is not installed, skip the rest; pod should still start, so return nil instead.
log.Warnf(ctx, "Irqbalance binary not found: %v", err)

return
}

return nil
}
env := fmt.Sprintf("%s=%s", irqBalanceBannedCpus, newIRQBalanceSetting)
log.Debugf(ctx, "Container %q running '%s %s %s'", cName, env, irqBalanceFullPath, "--oneshot")

func (h *HighPerformanceHooks) updateNewIRQSMPAffinityMask(cpus string, enable bool) (string, error) {
h.irqSMPAffinityFileLock.Lock()
defer h.irqSMPAffinityFileLock.Unlock()
if err := commandRunner.RunCommand(
irqBalanceFullPath,
[]string{env},
"--oneshot",
); err != nil {
log.Warnf(ctx, "Container %q failed to run '%s %s %s', err: %q",
cName, env, irqBalanceFullPath, "--oneshot", err)
}
}

// updateNewIRQSMPAffinityMask updates SMP IRQ affinity and IRQ balance configuration files.
func (h *HighPerformanceHooks) updateNewIRQSMPAffinityMask(ctx context.Context, cID, cName, cpus string, enable bool) (newIRQBalanceSetting string, err error) {
content, err := os.ReadFile(h.irqSMPAffinityFile)
if err != nil {
return "", err
}

currentIRQSMPSetting := strings.TrimSpace(string(content))
originalIRQSMPSetting := strings.TrimSpace(string(content))

newIRQSMPSetting, newIRQBalanceSetting, err := calcIRQSMPAffinityMask(cpus, currentIRQSMPSetting, enable)
newIRQSMPSetting, newIRQBalanceSetting, err := calcIRQSMPAffinityMask(cpus, originalIRQSMPSetting, enable)
if err != nil {
return "", err
}

log.Debugf(ctx, "Container %q (%q) enable %t cpus %q set %q: %q -> %q; %q: %q",
cID, cName, enable, cpus,
h.irqSMPAffinityFile, originalIRQSMPSetting, newIRQSMPSetting,
h.irqBalanceConfigFile, newIRQBalanceSetting,
)

if err := os.WriteFile(h.irqSMPAffinityFile, []byte(newIRQSMPSetting), 0o644); err != nil {
return "", err
}

return newIRQBalanceSetting, nil
}
// Rollback IRQ SMP affinity file to maintain consistency if something goes wrong.
defer func() {
if err != nil {
if rollbackErr := os.WriteFile(h.irqSMPAffinityFile, []byte(originalIRQSMPSetting), 0o644); rollbackErr != nil {
log.Errorf(ctx, "Failed to rollback IRQ SMP affinity file after config update failure: err: %q, rollback err: %q",
err, rollbackErr)
}
}
}()

// Nothing else to do if irq balance config file does not exist.
if !fileExists(h.irqBalanceConfigFile) {
return newIRQBalanceSetting, nil
}

func (h *HighPerformanceHooks) updateIrqBalanceConfigFile(newIRQBalanceSetting string) error {
h.irqBalanceConfigFileLock.Lock()
defer h.irqBalanceConfigFileLock.Unlock()
if err := updateIrqBalanceConfigFile(h.irqBalanceConfigFile, newIRQBalanceSetting); err != nil {
return "", err
}

return updateIrqBalanceConfigFile(h.irqBalanceConfigFile, newIRQBalanceSetting)
return newIRQBalanceSetting, nil
}

func setCPUQuota(podManager cgroups.Manager, containerManagers []cgroups.Manager) error {
Expand Down Expand Up @@ -1047,8 +1144,8 @@ func RestoreIrqBalanceConfig(ctx context.Context, irqBalanceConfigFile, irqBanne
return err
}

if isServiceEnabled(irqBalancedName) {
if err := restartIrqBalanceService(); err != nil {
if serviceManager.IsServiceEnabled(irqBalancedName) {
if err := serviceManager.RestartService(irqBalancedName); err != nil {
log.Warnf(ctx, "Irqbalance service restart failed: %v", err)
}
}
Expand Down
Loading
Loading