fix(inkless:controller): fix leader skew for managed diskless after rolling restart #643

Open
jeqo wants to merge 1 commit into main from jeqo/preferred-leader-election-on-consolidation

jeqo (Contributor) commented Jun 12, 2026

Three changes that together ensure balanced leadership for managed diskless topics:

  1. Enable preferred leader rebalance for managed diskless topics: the previous blanket skip for all diskless topics was correct for unmanaged topics (RF=1) but wrong for managed ones (RF>1), where tiered-storage uploads and deletions run only on the leader, so leadership skew concentrates that work on a single broker.

  2. Set the ISR to all replicas at topic creation, reassignment, and addPartitions: this matches unmanaged diskless semantics. Because the data lives in object storage, broker fencing does not affect availability.

  3. Expand the ISR on broker unfence: when a broker returns, re-add it to the ISR of every managed diskless partition for which it is a replica. This repairs ISRs left stale by earlier shrinks and lets preferred leader election redistribute leadership.
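Change 3 can be illustrated with a minimal Java sketch. It assumes a hypothetical, simplified PartitionState record in place of the controller's real partition registration types, and expandIsrOnUnfence is an illustrative name, not a method from this PR. Given the id of a broker that was just unfenced, it proposes a new ISR for every managed diskless partition where that broker is a replica but missing from the ISR; classic and unmanaged diskless partitions are left alone.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified partition model used only for this sketch.
record PartitionState(String name, boolean managedDiskless,
                      List<Integer> replicas, List<Integer> isr) { }

final class UnfenceIsrExpansion {
    // Returns partition name -> proposed ISR for every managed diskless partition
    // where the unfenced broker is a replica but not currently in the ISR.
    static Map<String, List<Integer>> expandIsrOnUnfence(List<PartitionState> partitions, int brokerId) {
        Map<String, List<Integer>> changes = new LinkedHashMap<>();
        for (PartitionState p : partitions) {
            if (!p.managedDiskless()) continue;             // classic and unmanaged diskless: untouched
            if (!p.replicas().contains(brokerId)) continue; // broker does not host this partition
            if (p.isr().contains(brokerId)) continue;       // already in the ISR, nothing to repair
            // New ISR = current ISR plus the returning broker, kept in replica order.
            List<Integer> newIsr = p.replicas().stream()
                .filter(r -> r == brokerId || p.isr().contains(r))
                .toList();
            changes.put(p.name(), newIsr);
        }
        return changes;
    }

    public static void main(String[] args) {
        List<PartitionState> partitions = List.of(
            new PartitionState("managed-0", true, List.of(1, 2, 3), List.of(2, 3)),
            new PartitionState("managed-1", true, List.of(2, 3, 1), List.of(2, 3, 1)),
            new PartitionState("classic-0", false, List.of(1, 2), List.of(2)));
        System.out.println(expandIsrOnUnfence(partitions, 1)); // {managed-0=[1, 2, 3]}
    }
}
```

Keeping the proposed ISR in replica order is only a readability choice here. What matters is that once broker 1 is back in the ISR of managed-0, its first (preferred) replica is eligible again for preferred leader election.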

Copilot AI left a comment

Pull request overview

This PR adjusts the controller's preferred-leader balancing for diskless topics: managed diskless topics (RF>1) now participate in periodic preferred leader election, so that leader-only tiered-storage work does not concentrate on a single broker after rolling restarts. Unmanaged/legacy diskless topics (RF=1) keep the existing skip behavior.

Changes:

  • Update imbalance tracking (imbalancedPartitions) to include diskless topics when isDisklessManagedReplicasEnabled is true.
  • Allow maybeBalancePartitionLeaders / preferred leader elections to run for diskless topics when managed replicas are enabled, while continuing to skip legacy/unmanaged diskless.
  • Refine and extend unit tests to distinguish unmanaged vs managed diskless periodic leader balancing behavior.
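The gating described in the first two bullets can be sketched as follows. This is a simplified model with a hypothetical PartitionView record and hypothetical helper names; it assumes the tracking condition has the same shape as the placement branch in the diff excerpt further down (!disklessEnabled || isDisklessManagedReplicasEnabled), and treats a partition as imbalanced when its leader is not its first replica.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified view of a partition for illustration only.
record PartitionView(String name, boolean diskless, List<Integer> replicas, int leader) { }

final class PreferredLeaderGate {
    // Assumed gating: classic topics are always tracked,
    // diskless topics only when managed replicas are enabled.
    static boolean shouldTrackPreferredLeader(boolean diskless, boolean managedReplicasEnabled) {
        return !diskless || managedReplicasEnabled;
    }

    // Partitions that periodic preferred leader election would consider:
    // tracked, and currently led by a broker other than the first (preferred) replica.
    static List<String> imbalanced(List<PartitionView> partitions, boolean managedReplicasEnabled) {
        List<String> result = new ArrayList<>();
        for (PartitionView p : partitions) {
            if (!shouldTrackPreferredLeader(p.diskless(), managedReplicasEnabled)) continue;
            if (p.replicas().get(0) != p.leader()) result.add(p.name());
        }
        return result;
    }

    public static void main(String[] args) {
        List<PartitionView> parts = List.of(
            new PartitionView("classic-0", false, List.of(1, 2), 2),
            new PartitionView("diskless-0", true, List.of(3, 1), 1),
            new PartitionView("diskless-1", true, List.of(2, 3), 2));
        System.out.println(imbalanced(parts, false)); // [classic-0]
        System.out.println(imbalanced(parts, true));  // [classic-0, diskless-0]
    }
}
```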

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

Files:
  • metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: Includes managed-diskless partitions in preferred-leader imbalance tracking and enables periodic preferred leader election for managed diskless topics.
  • metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java: Splits the prior diskless balancing test into unmanaged-skip vs managed-rebalance scenarios.


jeqo force-pushed the jeqo/preferred-leader-election-on-consolidation branch 2 times, most recently from 2cbb67f to 5cddaa3 (June 12, 2026 12:36)
jeqo changed the title from "fix(inkless:controller): enable preferred leader rebalance for managed diskless" to "fix(inkless:controller): fix leader skew for managed diskless after rolling restart" (Jun 12, 2026)
jeqo requested a review from Copilot (June 12, 2026 12:38)
jeqo (Contributor, Author) commented Jun 12, 2026

Looking good: [image]

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.

Outdated comment thread: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
Outdated comment thread: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
jeqo force-pushed the jeqo/preferred-leader-election-on-consolidation branch from 5cddaa3 to 7d0a0ff (June 12, 2026 14:29)
jeqo requested a review from Copilot (June 12, 2026 14:29)

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (2)

metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:2433

  • For diskless topics, brokerFilter currently allows all replicas (including fenced/controlled-shutdown) into the ISR. Since buildPartitionRegistration always selects the leader as isr.get(0), a manual assignment that lists a fenced broker first would create a partition whose leader is not active. Consider (a) rejecting diskless assignments where none of the replicas are active, and (b) ordering the diskless ISR so that active replicas come first (while still including all replicas) to ensure the initial leader is always active.
        int partitionId = startPartitionId;
        for (int i = 0; i < partitionAssignments.size(); i++) {
            PartitionAssignment partitionAssignment = partitionAssignments.get(i);
            List<Integer> isr = isrs.get(i).stream().
                filter(brokerFilter).toList();
            // If the ISR is empty, it means that all brokers are fenced or
            // in controlled shutdown. To be consistent with the replica placer,
            // we reject the create topic request with INVALID_REPLICATION_FACTOR.
            if (isr.isEmpty()) {
                throw new InvalidReplicationFactorException(
                    "Unable to replicate the partition " + replicationFactor +
                        " time(s): All brokers are currently fenced or in controlled shutdown.");
            }
            records.add(buildPartitionRegistration(partitionAssignment, isr)
                .toRecord(topicId, partitionId, new ImageWriterOptions.Builder(featureControl.metadataVersionOrThrow()).
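Copilot's suggestion above (reject diskless assignments with no active replica, and order the ISR so active replicas come first while still including every replica) can be sketched as a small helper. The class and method names are hypothetical, and IllegalArgumentException stands in for the controller's InvalidReplicationFactorException. Because the excerpt picks the initial leader as isr.get(0), putting active replicas first means a fenced broker is never chosen as the initial leader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

final class DisklessIsrOrdering {
    // Builds an initial diskless ISR: all replicas, active ones first.
    // Rejects the assignment if no replica is active.
    static List<Integer> activeFirstIsr(List<Integer> replicas, IntPredicate isActive) {
        List<Integer> active = new ArrayList<>();
        List<Integer> inactive = new ArrayList<>();
        for (int replica : replicas) {
            if (isActive.test(replica)) {
                active.add(replica);
            } else {
                inactive.add(replica);
            }
        }
        if (active.isEmpty()) {
            // Stand-in for the controller's InvalidReplicationFactorException.
            throw new IllegalArgumentException(
                "All replicas " + replicas + " are fenced or in controlled shutdown");
        }
        // Fenced replicas stay in the ISR, just after the active ones.
        active.addAll(inactive);
        return active;
    }

    public static void main(String[] args) {
        // Broker 3 is fenced: it stays in the ISR but cannot be the initial leader.
        System.out.println(activeFirstIsr(List.of(3, 1, 2), b -> b != 3)); // [1, 2, 3]
    }
}
```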

metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:996

  • Diskless topics created via manual assignments still compute ISR by filtering only active brokers (assignment.brokerIds().stream().filter(clusterControl::isActive)), which contradicts the new diskless semantics elsewhere in this PR (ISR should include all replicas regardless of fenced state, while still ensuring at least one replica is active for leader election). This means managed diskless topics created with a manual assignment can still end up with a reduced ISR.
                TopicAssignment topicAssignment;
                Predicate<Integer> brokerFilter;
                // Diskless managed-replicas uses standard rack-aware assignment
                // with user-defined RF (or defaultReplicationFactor if RF=-1)
                if (!disklessEnabled || isDisklessManagedReplicasEnabled) {
                    topicAssignment = clusterControl.replicaPlacer().place(new PlacementSpec(
                        0,
                        numPartitions,
                        replicationFactor
                    ), clusterDescriber);
                    // For diskless (managed or not): ISR = all replicas regardless of fenced state.
                    // Data lives in object storage, so broker fencing doesn't affect availability.
                    brokerFilter = disklessEnabled ? x -> true : clusterControl::isActive;
                } else {
                    topicAssignment = createDisklessAssignment(numPartitions);
                    if (topicAssignment == null) {
                        return new ApiError(Errors.BROKER_NOT_AVAILABLE, "No brokers available to create diskless topic.");
                    }
                    brokerFilter = x -> true;
                }

                for (int partitionId = 0; partitionId < topicAssignment.assignments().size(); partitionId++) {
                    PartitionAssignment partitionAssignment = topicAssignment.assignments().get(partitionId);
                    List<Integer> isr = partitionAssignment.replicas().stream().
                        filter(brokerFilter).toList();
                    // If the ISR is empty, it means that all brokers are fenced or

Outdated comment thread: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
jeqo force-pushed the jeqo/preferred-leader-election-on-consolidation branch from 7d0a0ff to 147acd9 (June 12, 2026 15:49)
jeqo requested a review from Copilot (June 12, 2026 15:53)

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.

Outdated comment thread: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
jeqo force-pushed the jeqo/preferred-leader-election-on-consolidation branch from 147acd9 to 5363857 (June 12, 2026 16:18)
jeqo requested a review from Copilot (June 12, 2026 16:19)

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

Outdated comment thread: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
jeqo force-pushed the jeqo/preferred-leader-election-on-consolidation branch from 5363857 to a9f7eee (June 12, 2026 16:43)
jeqo requested a review from Copilot (June 12, 2026 16:44)
jeqo force-pushed the jeqo/preferred-leader-election-on-consolidation branch from a9f7eee to 6074982 (June 12, 2026 16:46)

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (2)

metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:2419

  • For diskless, isrs is currently set to the full replica list (including fenced brokers), and later code uses isr.get(0) as the initial leader. If the replica placer can return fenced brokers first, this can create a partition with a fenced leader. Diskless can include fenced replicas in ISR, but we should still require at least one active replica and order ISR with active replicas first.
        for (int i = 0; i < partitionAssignments.size(); i++) {
            PartitionAssignment partitionAssignment = partitionAssignments.get(i);
            List<Integer> isr = isrs.get(i).stream().

metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:997

  • The subsequent ISR/leader selection logic relies on filter(brokerFilter) + isr.isEmpty() to reject the “all replicas fenced” case. But for diskless brokerFilter is x -> true, so that validation can never trigger and buildPartitionRegistration may pick a fenced broker as the initial leader (isr.get(0)). Diskless can include fenced replicas in ISR, but we still need at least one active replica and should order ISR with active replicas first.
                    brokerFilter = x -> true;
                }

                for (int partitionId = 0; partitionId < topicAssignment.assignments().size(); partitionId++) {
                    PartitionAssignment partitionAssignment = topicAssignment.assignments().get(partitionId);

jeqo force-pushed the jeqo/preferred-leader-election-on-consolidation branch from 6074982 to 607bd1e (June 12, 2026 17:06)
jeqo requested a review from Copilot (June 12, 2026 17:07)

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

Outdated comment thread: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
…olling restart

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
jeqo force-pushed the jeqo/preferred-leader-election-on-consolidation branch from 607bd1e to d3c25b3 (June 12, 2026 17:23)
jeqo requested a review from Copilot (June 12, 2026 17:23)

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated no new comments.

Comments suppressed due to low confidence (2)

metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:591

  • When preferred-leader tracking is disabled for a topic (e.g., unmanaged diskless), this block skips updating imbalancedPartitions but also never removes any existing entry for the partition. If the topic previously participated in tracking (before a config change), stale entries can keep arePartitionLeadersImbalanced() returning true and cause unnecessary periodic rebalance scheduling. Consider explicitly removing the partition from imbalancedPartitions when shouldTrackPreferredLeader is false.
        if (shouldTrackPreferredLeader(topicInfo.name)) {
            if (newPartInfo.hasPreferredLeader()) {
                imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
            } else {
                imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));

metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:639

  • Same issue as in replay(PartitionRecord): when shouldTrackPreferredLeader is false, this code skips updating imbalancedPartitions but does not remove any pre-existing entry. That can leave stale imbalances around after a topic transitions into a mode where preferred-leader tracking is disabled, keeping rebalance tasks rescheduled unnecessarily.
        if (shouldTrackPreferredLeader(topicInfo.name)) {
            if (newPartitionInfo.hasPreferredLeader()) {
                imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
            } else {
                imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
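A minimal sketch of the fix suggested in these two comments, assuming partitions are identified by plain strings instead of TopicIdPartition and using a hypothetical ImbalanceTracker class. The arePartitionLeadersImbalanced method only mirrors the name used in the comment; this is not the controller's code. Every path other than "tracked and not on the preferred leader" removes the entry, so disabling tracking for a topic also clears any stale imbalance.

```java
import java.util.HashSet;
import java.util.Set;

final class ImbalanceTracker {
    // Stand-in for the controller's imbalancedPartitions set (strings instead of TopicIdPartition).
    private final Set<String> imbalancedPartitions = new HashSet<>();

    // Hypothetical replay hook, called for every partition record or partition change.
    void onPartitionReplayed(String partition, boolean trackPreferredLeader, boolean hasPreferredLeader) {
        if (trackPreferredLeader && !hasPreferredLeader) {
            imbalancedPartitions.add(partition);
        } else {
            // Either the preferred leader is in place, or tracking is disabled for
            // this topic: in both cases drop any entry left over from earlier replays.
            imbalancedPartitions.remove(partition);
        }
    }

    boolean arePartitionLeadersImbalanced() {
        return !imbalancedPartitions.isEmpty();
    }

    public static void main(String[] args) {
        ImbalanceTracker tracker = new ImbalanceTracker();
        tracker.onPartitionReplayed("diskless-0", true, false);  // tracked, not on preferred leader
        tracker.onPartitionReplayed("diskless-0", false, false); // tracking disabled afterwards
        System.out.println(tracker.arePartitionLeadersImbalanced()); // false
    }
}
```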

jeqo marked this pull request as ready for review (June 12, 2026 17:29)
jeqo requested a review from giuseppelillo (June 12, 2026 17:30)