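From 4f2fa51f84e31ef9150ef24872ed1d2e193cf5d2 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 17 Jun 2026 15:08:58 +0200 Subject: [PATCH 1/2] fix(inkless:systest): avoid false data-loss failures from slow diskless tail reads in switch tests Exact-count reads in _consume_all_from_beginning (wait_for_completion=True) used to treat the console consumer exiting on its fixed 30s idle timeout as completion. A record that is already acked and committed to the diskless log but not yet served by the leader could stall past that window, so the consumer stopped short and the test reported a false data-loss failure. Decide completion by record count instead: return as soon as expected_count records have been delivered, and stop early only if the consumer drains and exits on its own before reaching that count, which is the genuine data-loss signal. Exact-count reads now use a longer idle timeout (CONSUME_COMPLETION_IDLE_SEC, 60s) so a transiently-slow diskless tail is not mistaken for end-of-stream; minimum-count reads keep the 30s timeout. The wait_until err_msg is now a lambda, so the reported count reflects the consumer's state when the wait fails rather than when wait_until is called. --- .../inkless/inkless_topic_switch_test.py | 39 ++++++++++++++++--- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/tests/kafkatest/tests/inkless/inkless_topic_switch_test.py b/tests/kafkatest/tests/inkless/inkless_topic_switch_test.py index ab0855e490..ecb1180131 100644 --- a/tests/kafkatest/tests/inkless/inkless_topic_switch_test.py +++ b/tests/kafkatest/tests/inkless/inkless_topic_switch_test.py @@ -72,6 +72,18 @@ class InklessClassicToDisklessSwitchTest(Test): CONSUME_TIMEOUT_SEC = 120 BROKER_STARTUP_TIMEOUT_SEC = 120 + # Idle window after which kafka-console-consumer self-terminates when no new + # record arrives. For exact-count reads this doubles as the "end of stream" + # signal, so it must be comfortably longer than the worst-case latency for a + # freshly-committed diskless batch to become readable from the leader. + # Otherwise a transiently-slow diskless tail (a record that is already acked + # and committed but not yet served) looks like end-of-stream and the read + # stops short, producing a false data-loss failure. The wait loop returns as + # soon as the expected count is reached, so a generous value here costs + # nothing on the happy path and only widens the stall we tolerate before + # concluding the data is genuinely missing. + CONSUME_COMPLETION_IDLE_SEC = 60 + def __init__(self, test_context: TestContext) -> None: super(InklessClassicToDisklessSwitchTest, self).__init__(test_context=test_context) self.num_brokers = 3 @@ -403,12 +415,28 @@ def _consume_all_from_beginning(self, expected_count, topic=None, timeout_sec=No validation that catches post-restart classic data availability bugs.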
Set wait_for_completion when the caller needs an exact count rather than stopping as soon as the expected minimum is observed. + + Completion is decided by record count, not by the console consumer's + idle timeout: the wait returns as soon as ``expected_count`` records + have been delivered (for a no-duplicate log this caps exactly at + ``expected_count``, since there is nothing more to read, so exact-count + callers still observe ``consumed == expected_count``). Only if the + consumer drains and exits on its own *before* reaching the expected + count do we stop early and report the shortfall, which is the genuine + data-loss signal. The console consumer's idle timeout is set generously + (see ``CONSUME_COMPLETION_IDLE_SEC``) so that a transiently-slow + diskless tail does not look like end-of-stream and truncate the read. """ if topic is None: topic = self.topic if timeout_sec is None: timeout_sec = self.CONSUME_TIMEOUT_SEC + # For an exact-count read, keep the consumer alive across a slow diskless + # tail; a short idle timeout is fine when the caller only wants a minimum. + consumer_idle_ms = (self.CONSUME_COMPLETION_IDLE_SEC * 1000 + if wait_for_completion else 30000) + group_id = "fresh-%s" % str(uuid.uuid4())[:8] consumer = ConsoleConsumer( context=self.test_context, @@ -417,7 +445,7 @@ def _consume_all_from_beginning(self, expected_count, topic=None, timeout_sec=No topic=topic, group_id=group_id, from_beginning=True, - consumer_timeout_ms=30000, + consumer_timeout_ms=consumer_idle_ms, isolation_level="read_committed", print_key=True, ) @@ -429,18 +457,19 @@ def _check_consumed(): is_alive = consumer.alive(consumer.nodes[0]) if is_alive: consumer_seen_alive[0] = True - if wait_for_completion: - has_consumed = len(consumer.messages_consumed[1]) > 0 - return (consumer_seen_alive[0] or has_consumed) and not is_alive + # Done as soon as every expected record has been delivered. 
if len(consumer.messages_consumed[1]) >= expected_count: return True + # The consumer drained and exited on its own short of the expected + # count: stop waiting so the caller sees the shortfall (genuine data + # loss) instead of blocking until timeout_sec. return consumer_seen_alive[0] and not is_alive wait_until( _check_consumed, timeout_sec=timeout_sec, backoff_sec=2, - err_msg="Fresh consumer consumed only %d out of %d expected messages in %ds" % + err_msg=lambda: "Fresh consumer consumed only %d out of %d expected messages in %ds" % (len(consumer.messages_consumed[1]), expected_count, timeout_sec) ) From efc2498f8cf80edd3565522c4d05a79f8dd519f1 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 17 Jun 2026 15:45:22 +0200 Subject: [PATCH 2/2] fix(inkless:systest): fix sigstop and verify broker was stopped MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test passed KafkaService.java_class_name() (regex kafka\.Kafka) to Trogdor's ProcessStopFaultSpec, but Trogdor's worker matches the target JVM by literal substring against jcmd -l. The escaped form never matched the real kafka.Kafka line, so SIGSTOP/SIGCONT were sent to zero pids and the leader was never actually frozen — the scenario passed without testing anything. Fix by passing the literal main-class name (kafka.Kafka) so the signal reaches the broker, and verify the fault actually took effect: assert the broker JVM reaches ps state T (stopped) during the pause and returns to running after SIGCONT, so any future no-op fails loudly instead of silently exercising nothing. 
--- .../inkless/inkless_topic_switch_test.py | 59 ++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/tests/kafkatest/tests/inkless/inkless_topic_switch_test.py b/tests/kafkatest/tests/inkless/inkless_topic_switch_test.py index ecb1180131..80d346e5c5 100644 --- a/tests/kafkatest/tests/inkless/inkless_topic_switch_test.py +++ b/tests/kafkatest/tests/inkless/inkless_topic_switch_test.py @@ -829,15 +829,67 @@ def _degrade_network(self, latency_ms=200, rate_kbit=2000, duration_ms=5 * 60 * ) return self.trogdor.create_task(task_name, spec) + # Trogdor's ProcessStopFaultWorker selects the target JVM by a *literal* + # substring match against each `jcmd -l` line (String.contains), so it must + # be given the plain main-class name as it appears there. Note this differs + # from KafkaService.java_class_name(), which returns the regex form + # "kafka\.Kafka" (escaped dot) intended for pgrep/jps regex matching and + # would not match literally here. + BROKER_JCMD_PROCESS_NAME = "kafka.Kafka" + def _pause_broker_process(self, node, duration_ms, task_name="pause-broker"): """SIGSTOP the broker JVM on ``node`` for ``duration_ms`` via Trogdor. Returns the Trogdor task; Trogdor will SIGCONT automatically when the task duration elapses or ``.stop()`` is called.""" spec = ProcessStopFaultSpec( - 0, duration_ms, [node], self.kafka.java_class_name(), + 0, duration_ms, [node], self.BROKER_JCMD_PROCESS_NAME, ) return self.trogdor.create_task(task_name, spec) + def _broker_pid(self, node): + """Return the broker JVM pid on ``node`` (the process Trogdor signals).""" + pids = self.kafka.pids(node) + assert pids, "No running broker JVM found on %s" % node.account.hostname + return pids[0] + + def _broker_process_state(self, node, pid): + """Return the primary ``ps`` state character for ``pid`` on ``node``. + ``T`` means stopped by a job-control signal (i.e. 
SIGSTOP took effect); + ``""`` means the process is no longer present.""" + for line in node.account.ssh_capture("ps -o stat= -p %s || true" % pid, + allow_fail=True): + line = line.decode("utf-8") if isinstance(line, bytes) else line + stat = line.strip() + if stat: + return stat[0] + return "" + + def _wait_for_broker_stopped(self, node, pid, timeout_sec=60): + """Assert the broker JVM actually froze (``ps`` state ``T``). Fails + loudly if the SIGSTOP fault is a no-op rather than silently exercising + nothing.""" + wait_until( + lambda: self._broker_process_state(node, pid) == "T", + timeout_sec=timeout_sec, + backoff_sec=1, + err_msg=lambda: "SIGSTOP fault was a no-op: broker pid %s on %s never " + "reached stopped (T) state (last state=%r)" % + (pid, node.account.hostname, + self._broker_process_state(node, pid)), + ) + + def _wait_for_broker_running(self, node, pid, timeout_sec=60): + """Assert the broker JVM resumed after SIGCONT (still alive, not ``T``).""" + wait_until( + lambda: self._broker_process_state(node, pid) not in ("", "T"), + timeout_sec=timeout_sec, + backoff_sec=1, + err_msg=lambda: "Broker pid %s on %s did not resume after SIGCONT " + "(last state=%r)" % + (pid, node.account.hostname, + self._broker_process_state(node, pid)), + ) + def _wait_for_mid_switch_state( self, state, min_partition_seconds=None, min_oldest_age_ms=None, timeout_sec=120) -> None: """Poll historical JMX until the injected fault is observed overlapping ``state``. @@ -1231,8 +1283,13 @@ def test_switch_leader_crash(self, metadata_quorum, leader_failure_mode) -> None self._stop_broker(leader_node, clean_shutdown=False) self._start_broker(leader_node) elif leader_failure_mode == "sigstop": + leader_pid = self._broker_pid(leader_node) pause = self._pause_broker_process(leader_node, duration_ms=pause_duration_ms) + # Confirm the fault actually took effect: a silent no-op here would + # let the whole scenario pass without ever freezing the leader. 
+ self._wait_for_broker_stopped(leader_node, leader_pid) pause.wait_for_done(timeout_sec=pause_duration_ms // 1000 + 60) + self._wait_for_broker_running(leader_node, leader_pid) else: raise AssertionError("Unknown leader_failure_mode: %s" % leader_failure_mode)