Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ if (repo != null) {
'docs/inkless/**',
'dump-schema-compose.yml',
'inkless-benchmarks/**',
'tests/kafkatest/services/inkless/**',
'tests/kafkatest/tests/inkless/**',
'inkless-sync/**',
'core/src/main/scala/io/aiven/inkless/**',
Expand Down
17 changes: 16 additions & 1 deletion tests/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ ARG ducker_creator=default
LABEL ducker.creator=$ducker_creator

# Update Linux and install necessary utilities.
RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute iproute2 iputils-ping && apt-get -y clean
# postgresql-client (psql) is used by the inkless consolidation pipeline test to
# query the Postgres control plane (WAL batches / log_start_offset) and assert WAL pruning.
RUN apt update && apt install -y sudo git netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute iproute2 iputils-ping postgresql-client && apt-get -y clean
RUN python3 -m pip install -U pip==21.1.1;
# NOTE: ducktape 0.12.0 supports py 3.9, 3.10, 3.11 and 3.12
COPY requirements.txt requirements.txt
Expand Down Expand Up @@ -132,6 +134,19 @@ RUN mkdir -p /opt/tiered-storage-plugin/core /opt/tiered-storage-plugin/s3 && \
tar xzf /tmp/s3.tgz -C /opt/tiered-storage-plugin/s3 --strip-components=1 && \
rm /tmp/*.tgz

# MinIO client (mc) for consolidation system tests that assert object movement
# (diskless WAL -> tiered-storage prefix) in MinIO.
# The release is pinned through the mc_version build arg. The RUN step maps the
# dpkg architecture to the matching download (amd64 or arm64; any other
# architecture fails the build), fetches the binary with retries, verifies it
# against the hard-coded per-architecture SHA-256, and only then marks it
# executable. Changing mc_version therefore requires updating both checksums.
ARG mc_version=mc.RELEASE.2025-08-13T08-35-41Z
RUN arch="$(dpkg --print-architecture)"; \
case "$arch" in \
amd64) mc_arch=amd64; mc_sha=01f866e9c5f9b87c2b09116fa5d7c06695b106242d829a8bb32990c00312e891 ;; \
arm64) mc_arch=arm64; mc_sha=14c8c9616cfce4636add161304353244e8de383b2e2752c0e9dad01d4c27c12c ;; \
*) echo "unsupported arch for mc: $arch" >&2; exit 1 ;; \
esac; \
curl -fsSL --retry 5 --retry-delay 5 "https://dl.min.io/client/mc/release/linux-${mc_arch}/archive/${mc_version}" -o /usr/local/bin/mc && \
echo "${mc_sha} /usr/local/bin/mc" | sha256sum -c - && \
chmod +x /usr/local/bin/mc
Comment thread
viktorsomogyi marked this conversation as resolved.

# To ensure the Kafka cluster starts successfully under JDK 17, we need to update the Zookeeper
# client from version 3.4.x to 3.5.7 in Kafka versions 2.1.1, 2.2.2, and 2.3.1, as the older Zookeeper
# client is incompatible with JDK 17. See KAFKA-17888 for more details.
Expand Down
15 changes: 15 additions & 0 deletions tests/kafkatest/services/inkless/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Inkless
# Copyright (C) 2024 - 2026 Aiven OY
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
273 changes: 273 additions & 0 deletions tests/kafkatest/services/inkless/consolidation_verifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
# Inkless
# Copyright (C) 2024 - 2026 Aiven OY
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Helper to assert the inkless consolidation pipeline moved and pruned data.

Exposes three signals about a consolidating topic: the JMX consolidation gauges,
MinIO object counts (via ``mc``), and the Postgres control plane (via ``psql``).
Construct it with a started ``KafkaService`` and call the helpers from a test;
the Postgres/MinIO deps and the mc/psql clients are provided by the system-test
harness (see ``.github/workflows/inkless-system-tests.yml`` and the Dockerfile).
"""

import csv
import json
import shlex
import time


class ConsolidationVerifier(object):
    """Probes used by system tests to assert inkless consolidation progress.

    Wraps a started ``KafkaService`` and exposes three signals about a
    consolidating topic: the broker consolidation JMX gauges (read from the
    JmxTool CSV log on each node), MinIO object listings (via ``mc``) and the
    Postgres control plane (via ``psql``). All shell commands are executed over
    SSH on the service's nodes.
    """

    # Broker-aggregate consolidation gauges; Yammer gauges expose "Value".
    JMX_PACKAGE = "io.aiven.inkless.consolidation"
    JMX_TYPE = "ConsolidationMetrics"
    TOTAL_LAG = "ConsolidationTotalLag"
    LOCAL_LAG = "ConsolidationLocalLag"
    DELETABLE_MESSAGES = "ConsolidationDeletableMessages"
    JMX_ATTRIBUTE = "Value"

    # MinIO is reached through the ducknet host alias "storage" (see
    # MINIO_ENDPOINT); MINIO_ALIAS is the name registered with `mc alias set`.
    MINIO_ALIAS = "systest"
    MINIO_ENDPOINT = "http://storage:9000"
    MINIO_BUCKET = "inkless"
    MINIO_ACCESS_KEY = "minioadmin"
    MINIO_SECRET_KEY = "minioadmin"
    # rsm.config.key.prefix; WAL objects live at the bucket root instead.
    TIERED_PREFIX = "tiered-storage/"

    # Postgres control plane (on ducknet alias "postgres").
    PG_HOST = "postgres"
    PG_PORT = 5432
    PG_DB = "inkless"
    PG_USER = "admin"
    PG_PASSWORD = "admin"

    def __init__(self, kafka):
        """Bind the verifier to ``kafka``, a started KafkaService.

        The service's logger is reused, and the ``mc`` alias is configured
        lazily on first object-store access.
        """
        self.kafka = kafka
        self.logger = kafka.logger
        self._mc_alias_ready = False

    # ------------------------------------------------------------------ JMX --

    @classmethod
    def aggregate_object_name(cls, name):
        """Return the JMX object name of the broker-aggregate gauge ``name``."""
        return "%s:type=%s,name=%s" % (cls.JMX_PACKAGE, cls.JMX_TYPE, name)

    @classmethod
    def partition_object_name(cls, name, topic, partition):
        """Return the JMX object name of a per-partition gauge.

        ``topic`` must not contain dots; ``partition`` is formatted as an integer.
        """
        return "%s:type=%s,name=%s,topic=%s,partition=%d" % (
            cls.JMX_PACKAGE, cls.JMX_TYPE, name, topic, partition)

    @classmethod
    def aggregate_object_names(cls):
        """Return the object names of the three broker-aggregate gauges."""
        return [cls.aggregate_object_name(gauge)
                for gauge in (cls.TOTAL_LAG, cls.LOCAL_LAG, cls.DELETABLE_MESSAGES)]

    @classmethod
    def find_aggregate_key(cls, keys, name):
        """Find the CSV column key of the broker-aggregate gauge ``name``.

        Keys have the form ``<package>:<k=v,...>:<attribute>``. The key
        properties are parsed and compared exactly, so the match is independent
        of property ordering, a look-alike type or name sharing a prefix is not
        mistaken for the gauge, and per-partition keys (those carrying a
        ``topic`` or ``partition`` property) are skipped.

        Returns the first matching key, or ``None`` if there is none.
        """
        prefix = cls.JMX_PACKAGE + ":"
        suffix = ":" + cls.JMX_ATTRIBUTE
        for key in keys:
            if not (key.startswith(prefix) and key.endswith(suffix)):
                continue
            props = {}
            for prop in key[len(prefix):-len(suffix)].split(","):
                prop_name, sep, prop_value = prop.partition("=")
                if sep:
                    props[prop_name.strip()] = prop_value.strip()
            if "topic" in props or "partition" in props:
                continue
            if props.get("type") == cls.JMX_TYPE and props.get("name") == name:
                return key
        return None

    def start_jmx(self):
        """Start JmxTool for the aggregate gauges on every node of the service.

        Done here, on the broker nodes only, rather than through KafkaService
        wiring: the controller never exposes these MBeans, so waiting for them
        there would hang --wait.
        """
        self.kafka.jmx_object_names = self.aggregate_object_names()
        self.kafka.jmx_attributes = [self.JMX_ATTRIBUTE]
        for node in self.kafka.nodes:
            self.kafka.start_jmx_tool(self.kafka.idx(node), node)

    def latest_jmx_values(self):
        """Return the latest sample of each monitored column, summed over nodes.

        Summing is considered safe because only the partition leader reports a
        non-zero value.
        """
        totals = {}
        for node in self.kafka.nodes:
            for key, value in self._latest_jmx_sample_on(node).items():
                totals[key] = totals.get(key, 0.0) + value
        return totals

    def _latest_jmx_sample_on(self, node):
        """Return ``{column: float}`` for the last JmxTool CSV row on ``node``.

        Best effort: an unreadable, missing or header-only log yields ``{}``.
        """
        log = self.kafka.jmx_tool_log
        try:
            header = list(node.account.ssh_capture("head -n 1 %s" % log, allow_fail=True))
            last = list(node.account.ssh_capture("tail -n 1 %s" % log, allow_fail=True))
        except Exception as e:  # noqa: BLE001 - best effort; missing log -> no sample
            self.logger.debug("Could not read JMX log on %s: %s" % (node.account, e))
            return {}
        if not header or not last:
            return {}
        return self._parse_jmx_sample(header[0], last[0])

    @staticmethod
    def _parse_jmx_sample(header_line, data_line):
        """Pair the JmxTool CSV header columns with the values of one data row.

        Parsed with ``csv`` because JMX ObjectNames contain commas (property
        separators) and are quoted in the header, so a naive comma split would
        misalign column names and values. Header: "time","<objectName>:<attr>",...

        The leading time column is dropped, non-numeric cells are skipped, and
        ``{}`` is returned when the row is blank or is itself the header (no
        sample written yet).
        """
        data_line = data_line.strip()
        if not data_line or data_line.startswith('"'):
            # Only the header has been written so far; no sample yet.
            return {}
        names = next(csv.reader([header_line.strip()]), [])
        fields = next(csv.reader([data_line]), [])
        sample = {}
        # zip stops at the shorter row, so a truncated data line is tolerated.
        for column, raw in zip(names[1:], fields[1:]):
            try:
                sample[column] = float(raw)
            except ValueError:
                continue
        return sample

    def _aggregate_value(self, name):
        """Return the summed latest value of gauge ``name`` (0.0 if unseen)."""
        values = self.latest_jmx_values()
        key = self.find_aggregate_key(values.keys(), name)
        if key is None:
            return 0.0
        return values.get(key, 0.0)

    def total_lag(self):
        """Return the latest ConsolidationTotalLag summed across nodes."""
        return self._aggregate_value(self.TOTAL_LAG)

    def local_lag(self):
        """Return the latest ConsolidationLocalLag summed across nodes."""
        return self._aggregate_value(self.LOCAL_LAG)

    def deletable_messages(self):
        """Return the latest ConsolidationDeletableMessages summed across nodes."""
        return self._aggregate_value(self.DELETABLE_MESSAGES)

    # ----------------------------------------------------------- Object store --

    def _storage_node(self):
        """Return the node used to run mc/psql.

        Any broker node resolves the postgres/storage aliases on ducknet, so the
        first one is used.
        """
        return self.kafka.nodes[0]

    def _ensure_mc_alias(self, node):
        """Register the MinIO alias with ``mc`` on ``node`` once per verifier.

        Tries up to five times, pausing two seconds between attempts (not after
        the last one).

        Raises:
            AssertionError: if every attempt exits non-zero.
        """
        if self._mc_alias_ready:
            return
        cmd = "mc alias set %s %s %s %s" % (
            self.MINIO_ALIAS, self.MINIO_ENDPOINT,
            self.MINIO_ACCESS_KEY, self.MINIO_SECRET_KEY)
        rc = None
        for attempt in range(5):
            if attempt:
                time.sleep(2)
            rc = node.account.ssh(cmd, allow_fail=True)
            if rc == 0:
                self._mc_alias_ready = True
                return
        raise AssertionError(
            "Failed to configure mc alias '%s' against %s (rc=%s). Check MinIO "
            "connectivity/credentials for the inkless system-test dependencies."
            % (self.MINIO_ALIAS, self.MINIO_ENDPOINT, rc))

    def object_keys(self, prefix=""):
        """List object keys under ``prefix`` in the bucket via ``mc ls --recursive --json``.

        Returns the ``key`` field of every entry whose status is "success".

        Raises:
            AssertionError: if mc prints a non-JSON line or reports an error entry.
        """
        node = self._storage_node()
        self._ensure_mc_alias(node)
        target = "%s/%s" % (self.MINIO_ALIAS, self.MINIO_BUCKET)
        if prefix:
            target += "/" + prefix
        cmd = "mc ls --recursive --json %s" % target
        keys = []
        for line in node.account.ssh_capture(cmd, allow_fail=True):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except ValueError:
                raise AssertionError(
                    "Unexpected non-JSON output from '%s': %s" % (cmd, line))
            status = obj.get("status")
            if status == "error":
                err = obj.get("error") or {}
                detail = err.get("message") or err.get("cause") or obj
                raise AssertionError("mc failed listing %s: %s" % (target, detail))
            if status != "success":
                continue
            key = obj.get("key")
            if key:
                keys.append(key)
        return keys

    def tiered_object_count(self):
        """Return the number of objects under the tiered-storage prefix.

        Only that prefix is listed: the bucket persists across runs, so scanning
        the whole bucket here (called repeatedly in wait_until) would get slower
        and flakier over time.
        """
        return len(self.object_keys(self.TIERED_PREFIX))

    def wal_object_count(self):
        """Return the number of objects outside the tiered-storage prefix (WAL files)."""
        return sum(1 for key in self.object_keys()
                   if not key.startswith(self.TIERED_PREFIX))

    # ----------------------------------------------------------- Control plane --

    def _psql_command(self, sql):
        """Build the shell command that runs ``sql`` through psql.

        Every interpolated string is passed through ``shlex.quote`` so that
        quotes, ``$``, backticks or backslashes in the SQL reach psql verbatim
        instead of being interpreted by the remote shell.
        """
        return "PGPASSWORD=%s psql -h %s -p %d -U %s -d %s -tAc %s" % (
            shlex.quote(self.PG_PASSWORD), shlex.quote(self.PG_HOST), self.PG_PORT,
            shlex.quote(self.PG_USER), shlex.quote(self.PG_DB), shlex.quote(sql))

    def _psql(self, sql):
        """Run ``sql`` with psql (-tA: tuples only, unaligned) and return stripped output.

        The command is executed with ``allow_fail=False``.
        """
        node = self._storage_node()
        cmd = self._psql_command(sql)
        return "".join(node.account.ssh_capture(cmd, allow_fail=False)).strip()

    @staticmethod
    def _sql_literal(value):
        """Return ``value`` as a SQL string literal with single quotes doubled.

        Standard escaping, so a topic with a quote can't break or inject into
        the query.
        """
        return "'%s'" % str(value).replace("'", "''")

    def _psql_int(self, sql, default=0):
        """Run ``sql`` and return the first output line as an int.

        Returns ``default`` when the output is empty or not an integer.
        """
        out = self._psql(sql)
        if not out:
            return default
        first_line = out.splitlines()[0].strip()
        try:
            return int(first_line)
        except ValueError:
            # Keep the lenient default, but leave a trace for debugging.
            self.logger.debug(
                "Non-integer psql output %r for query: %s" % (first_line, sql))
            return default

    def wal_batch_count(self, topic):
        """Return the number of WAL batch rows still tracked for ``topic``.

        Drops to 0 as the pruner runs.
        """
        return self._psql_int(
            "SELECT count(*) FROM batches b JOIN logs l ON b.topic_id = l.topic_id "
            "WHERE l.topic_name = %s" % self._sql_literal(topic))

    def min_log_start_offset(self, topic):
        """Return the minimum log_start_offset across the topic's partitions.

        Advances past 0 once WAL is pruned; 0 when there are no rows.
        """
        return self._psql_int(
            "SELECT coalesce(min(log_start_offset), 0) FROM logs WHERE topic_name = %s"
            % self._sql_literal(topic))

    def high_watermark(self, topic):
        """Return the maximum high_watermark across the topic's partitions (0 if none)."""
        return self._psql_int(
            "SELECT coalesce(max(high_watermark), 0) FROM logs WHERE topic_name = %s"
            % self._sql_literal(topic))

    # --------------------------------------------------------------- Tooling --

    def verify_tooling(self):
        """Fail fast if the ducker image lacks the mc/psql clients.

        Raises:
            AssertionError: naming the missing tool(s). Raised explicitly rather
                than with an ``assert`` statement so the check survives ``python -O``.
        """
        node = self._storage_node()
        missing = [tool for tool in ("mc", "psql")
                   if node.account.ssh("command -v %s" % tool, allow_fail=True) != 0]
        if missing:
            raise AssertionError(
                "Missing client tool(s) %s on the broker node. Rebuild the ducker image "
                "(tests/docker/Dockerfile installs `mc` and `postgresql-client`) with "
                "`tests/docker/ducker-ak up` before running consolidation pipeline tests." % missing)
Loading
Loading