Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 92 additions & 6 deletions tests/kafkatest/tests/inkless/inkless_topic_switch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ class InklessClassicToDisklessSwitchTest(Test):
CONSUME_TIMEOUT_SEC = 120
BROKER_STARTUP_TIMEOUT_SEC = 120

# Idle window after which kafka-console-consumer self-terminates when no new
# record arrives. For exact-count reads this doubles as the "end of stream"
# signal, so it must be comfortably longer than the worst-case latency for a
# freshly-committed diskless batch to become readable from the leader.
# Otherwise a transiently-slow diskless tail (a record that is already acked
# and committed but not yet served) looks like end-of-stream and the read
# stops short, producing a false data-loss failure. The wait loop returns as
# soon as the expected count is reached, so a generous value here costs
# nothing on the happy path and only widens the stall we tolerate before
# concluding the data is genuinely missing.
CONSUME_COMPLETION_IDLE_SEC = 60

def __init__(self, test_context: TestContext) -> None:
super(InklessClassicToDisklessSwitchTest, self).__init__(test_context=test_context)
self.num_brokers = 3
Expand Down Expand Up @@ -403,12 +415,28 @@ def _consume_all_from_beginning(self, expected_count, topic=None, timeout_sec=No
validation that catches post-restart classic data availability bugs.
Set wait_for_completion when the caller needs an exact count rather
than stopping as soon as the expected minimum is observed.

Completion is decided by record count, not by the console consumer's
idle timeout: the wait returns as soon as ``expected_count`` records
have been delivered (for a no-duplicate log this caps exactly at
``expected_count``, since there is nothing more to read, so exact-count
callers still observe ``consumed == expected_count``). Only if the
consumer drains and exits on its own *before* reaching the expected
count do we stop early and report the shortfall, which is the genuine
data-loss signal. The console consumer's idle timeout is set generously
(see ``CONSUME_COMPLETION_IDLE_SEC``) so that a transiently-slow
diskless tail does not look like end-of-stream and truncate the read.
"""
if topic is None:
topic = self.topic
if timeout_sec is None:
timeout_sec = self.CONSUME_TIMEOUT_SEC

# For an exact-count read, keep the consumer alive across a slow diskless
# tail; a short idle timeout is fine when the caller only wants a minimum.
consumer_idle_ms = (self.CONSUME_COMPLETION_IDLE_SEC * 1000
if wait_for_completion else 30000)

group_id = "fresh-%s" % str(uuid.uuid4())[:8]
consumer = ConsoleConsumer(
context=self.test_context,
Expand All @@ -417,7 +445,7 @@ def _consume_all_from_beginning(self, expected_count, topic=None, timeout_sec=No
topic=topic,
group_id=group_id,
from_beginning=True,
consumer_timeout_ms=30000,
consumer_timeout_ms=consumer_idle_ms,
isolation_level="read_committed",
print_key=True,
)
Expand All @@ -429,18 +457,19 @@ def _check_consumed():
is_alive = consumer.alive(consumer.nodes[0])
if is_alive:
consumer_seen_alive[0] = True
if wait_for_completion:
has_consumed = len(consumer.messages_consumed[1]) > 0
return (consumer_seen_alive[0] or has_consumed) and not is_alive
# Done as soon as every expected record has been delivered.
if len(consumer.messages_consumed[1]) >= expected_count:
return True
# The consumer drained and exited on its own short of the expected
# count: stop waiting so the caller sees the shortfall (genuine data
# loss) instead of blocking until timeout_sec.
return consumer_seen_alive[0] and not is_alive

wait_until(
_check_consumed,
timeout_sec=timeout_sec,
backoff_sec=2,
err_msg="Fresh consumer consumed only %d out of %d expected messages in %ds" %
err_msg=lambda: "Fresh consumer consumed only %d out of %d expected messages in %ds" %
(len(consumer.messages_consumed[1]), expected_count, timeout_sec)
)

Expand Down Expand Up @@ -800,15 +829,67 @@ def _degrade_network(self, latency_ms=200, rate_kbit=2000, duration_ms=5 * 60 *
)
return self.trogdor.create_task(task_name, spec)

# Trogdor's ProcessStopFaultWorker selects the target JVM by a *literal*
# substring match against each `jcmd -l` line (String.contains), so it must
# be given the plain main-class name as it appears there. Note this differs
# from KafkaService.java_class_name(), which returns the regex form
# "kafka\.Kafka" (escaped dot) intended for pgrep/jps regex matching and
# would not match literally here.
BROKER_JCMD_PROCESS_NAME = "kafka.Kafka"

def _pause_broker_process(self, node, duration_ms, task_name="pause-broker"):
    """Freeze the broker JVM on ``node`` with SIGSTOP for ``duration_ms`` via Trogdor.

    The fault starts at offset 0 and targets only ``node``. The JVM to signal
    is identified by ``BROKER_JCMD_PROCESS_NAME``.

    Returns the Trogdor task created under ``task_name``; Trogdor will SIGCONT
    automatically when the task duration elapses or ``.stop()`` is called.
    """
    # Pass the literal process name exactly once. self.kafka.java_class_name()
    # is the escaped-regex form and must not be supplied here, so the spec
    # gets one (start, duration, nodes, process-name) argument set only.
    spec = ProcessStopFaultSpec(
        0, duration_ms, [node], self.BROKER_JCMD_PROCESS_NAME,
    )
    return self.trogdor.create_task(task_name, spec)

def _broker_pid(self, node):
    """Look up the pid of the broker JVM on ``node`` (the process Trogdor signals).

    Takes the first entry reported by ``self.kafka.pids(node)`` and fails with
    an assertion when that list is empty.
    """
    running_pids = self.kafka.pids(node)
    assert running_pids, (
        "No running broker JVM found on %s" % node.account.hostname
    )
    first_pid = running_pids[0]
    return first_pid

def _broker_process_state(self, node, pid):
    """Report the leading ``ps`` state character of ``pid`` on ``node``.

    Runs ``ps -o stat=`` for the pid over SSH and returns the first character
    of the first non-blank output line. ``T`` means the process is stopped by
    a job-control signal (i.e. SIGSTOP took effect); an empty string means no
    state was printed, i.e. the process is no longer present.
    """
    ps_command = "ps -o stat= -p %s || true" % pid
    for raw in node.account.ssh_capture(ps_command, allow_fail=True):
        # Output lines may arrive as bytes or as text; normalise to text.
        text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        text = text.strip()
        if not text:
            continue
        return text[0]
    return ""

def _wait_for_broker_stopped(self, node, pid, timeout_sec=60):
    """Block until the broker JVM ``pid`` on ``node`` shows ``ps`` state ``T``.

    Polls once per second for up to ``timeout_sec`` seconds. If the process
    never reaches the stopped state the wait fails loudly, so a SIGSTOP fault
    that did nothing cannot pass unnoticed.
    """
    def _is_stopped():
        return self._broker_process_state(node, pid) == "T"

    def _failure_message():
        # Built lazily so the reported state is the one seen at failure time.
        hostname = node.account.hostname
        last_state = self._broker_process_state(node, pid)
        return ("SIGSTOP fault was a no-op: broker pid %s on %s never "
                "reached stopped (T) state (last state=%r)" %
                (pid, hostname, last_state))

    wait_until(
        _is_stopped,
        timeout_sec=timeout_sec,
        backoff_sec=1,
        err_msg=_failure_message,
    )

def _wait_for_broker_running(self, node, pid, timeout_sec=60):
    """Block until the broker JVM ``pid`` on ``node`` is running again after SIGCONT.

    "Running" means ``ps`` still reports a state for the pid and that state is
    not ``T``. Polls once per second for up to ``timeout_sec`` seconds.
    """
    def _has_resumed():
        state = self._broker_process_state(node, pid)
        # "" -> process gone; "T" -> still stopped. Neither counts as resumed.
        return state != "" and state != "T"

    def _failure_message():
        # Built lazily so the reported state is the one seen at failure time.
        hostname = node.account.hostname
        last_state = self._broker_process_state(node, pid)
        return ("Broker pid %s on %s did not resume after SIGCONT "
                "(last state=%r)" %
                (pid, hostname, last_state))

    wait_until(
        _has_resumed,
        timeout_sec=timeout_sec,
        backoff_sec=1,
        err_msg=_failure_message,
    )

def _wait_for_mid_switch_state(
self, state, min_partition_seconds=None, min_oldest_age_ms=None, timeout_sec=120) -> None:
"""Poll historical JMX until the injected fault is observed overlapping ``state``.
Expand Down Expand Up @@ -1202,8 +1283,13 @@ def test_switch_leader_crash(self, metadata_quorum, leader_failure_mode) -> None
self._stop_broker(leader_node, clean_shutdown=False)
self._start_broker(leader_node)
elif leader_failure_mode == "sigstop":
leader_pid = self._broker_pid(leader_node)
pause = self._pause_broker_process(leader_node, duration_ms=pause_duration_ms)
# Confirm the fault actually took effect: a silent no-op here would
# let the whole scenario pass without ever freezing the leader.
self._wait_for_broker_stopped(leader_node, leader_pid)
pause.wait_for_done(timeout_sec=pause_duration_ms // 1000 + 60)
self._wait_for_broker_running(leader_node, leader_pid)
else:
raise AssertionError("Unknown leader_failure_mode: %s" % leader_failure_mode)

Expand Down
Loading