Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incorrect lag and entries-read value with tombstone #13379

Open
wants to merge 4 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
int streamDeleteItem(stream *s, streamID *id);
void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
long long streamEstimateDistance(stream *s, streamCG *cg, streamID *next_id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);

Expand Down
92 changes: 38 additions & 54 deletions src/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1400,16 +1400,6 @@ int streamIDEqZero(streamID *id) {
int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
streamID start_id, end_id;

if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) {
/* The stream is empty or has no tombstones. */
return 0;
}

if (streamCompareID(&s->first_id,&s->max_deleted_entry_id) > 0) {
/* The latest tombstone is before the first entry. */
return 0;
}

if (start) {
start_id = *start;
} else {
Expand Down Expand Up @@ -1441,33 +1431,48 @@ int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
/* Reply with the "entries-read" and "lag" fields of consumer group 'cg' on
 * stream 's'; called from the XINFO reply code (xinfoReplyWithStreamInfo()
 * and the group listing) after the "last-delivered-id" field. Both fields are
 * replied as null when no valid read counter can be obtained.
 *
 * NOTE(review): this text comes from a GitHub diff view whose +/- markers
 * were lost, so pre-change and post-change lines are interleaved below. The
 * `!s->entries_added` / `cg->entries_read` branches, the
 * streamEstimateDistanceFromFirstEverEntry() fallback and the bare
 * `addReplyLongLong(c,lag);` look like the removed implementation; the
 * streamEstimateDistance() call and the field-name replies look like the
 * replacement. As rendered, the braces do not balance and `entries_read` is
 * declared twice -- confirm against the actual patch before editing.
 *
 * NOTE(review): in the new version both fields come from
 * streamEstimateDistance(), which ignores cg->entries_read whenever
 * streamRangeHasTombstones() reports a tombstone at or after cg->last_id.
 * A group whose own counter may still be exact therefore gets a null
 * "entries-read". Consider replying with cg->entries_read when it is valid
 * and only nulling the lag. */
void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) {
int valid = 0;
long long lag = 0;

if (!s->entries_added) {
/* The lag of a newly-initialized stream is 0. */
lag = 0;
valid = 1;
} else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) {
/* No fragmentation ahead means that the group's logical reads counter
* is valid for performing the lag calculation. */
lag = (long long)s->entries_added - cg->entries_read;
/* Attempt to retrieve the group's last ID logical read counter. */
/* next_id is cg->last_id itself, so streamEstimateDistance() adds no
 * step: the result is the counter of the last delivered entry. */
long long entries_read = streamEstimateDistance(s, cg, &cg->last_id);
if (entries_read != SCG_INVALID_ENTRIES_READ) {
/* A valid counter was obtained. */
lag = (long long)s->entries_added - entries_read;
valid = 1;
} else {
/* Attempt to retrieve the group's last ID logical read counter. */
long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id);
if (entries_read != SCG_INVALID_ENTRIES_READ) {
/* A valid counter was obtained. */
lag = (long long)s->entries_added - entries_read;
valid = 1;
}
}

if (valid) {
/* NOTE(review): the next line looks like the removed pre-change reply;
 * kept alongside the field-name replies it would emit the lag twice. */
addReplyLongLong(c,lag);
/* Read counter of the last delivered ID */
addReplyBulkCString(c, "entries-read");
addReplyLongLong(c, entries_read);
/* Group lag */
addReplyBulkCString(c, "lag");
addReplyLongLong(c, lag);
} else {
addReplyBulkCString(c, "entries-read");
addReplyNull(c);
addReplyBulkCString(c, "lag");
addReplyNull(c);
}
}

/* Return the logical read counter ("entries-read") that corresponds to
 * 'next_id' for consumer group 'cg' of stream 's'.
 *
 * Fast path: when the group's own counter is valid and no tombstone lies at
 * or after the group's last delivered ID, the answer is derived from that
 * counter. It is returned unchanged when 'next_id' equals cg->last_id (so the
 * result lines up with streamEstimateDistanceFromFirstEverEntry() for the
 * same ID), and advanced by one for any other ID.
 *
 * Slow path: otherwise the value is estimated from the stream itself via
 * streamEstimateDistanceFromFirstEverEntry(), which may return
 * SCG_INVALID_ENTRIES_READ.
 */
long long streamEstimateDistance(stream *s, streamCG *cg, streamID *next_id) {
    int counter_is_trustworthy =
        cg->entries_read != SCG_INVALID_ENTRIES_READ &&
        !streamRangeHasTombstones(s, &cg->last_id, NULL);

    if (!counter_is_trustworthy) {
        return streamEstimateDistanceFromFirstEverEntry(s, next_id);
    }

    /* Same ID as the last delivered one: nothing new has been read. */
    if (streamCompareID(&cg->last_id, next_id) == 0) {
        return cg->entries_read;
    }

    /* A newer ID is being delivered: one more logical read. */
    return cg->entries_read + 1;
}

/* This function returns a value that is the ID's logical read counter, or its
* distance (the number of entries) from the first entry ever to have been added
* to the stream.
Expand Down Expand Up @@ -1502,6 +1507,10 @@ long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
return s->entries_added;
}

/* There are fragmentations between the `id` and the stream's last-generated-id. */
if (!streamIDEqZero(id) && streamCompareID(id,&s->max_deleted_entry_id) < 0)
return SCG_INVALID_ENTRIES_READ;

int cmp_last = streamCompareID(id,&s->last_id);
if (cmp_last == 0) {
/* Return the exact counter of the last entry in the stream. */
Expand Down Expand Up @@ -1684,15 +1693,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0) {
if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&id,NULL)) {
/* A valid counter and no future tombstones mean we can
* increment the read counter to keep tracking the group's
* progress. */
group->entries_read++;
} else if (s->entries_added) {
/* The group's counter may be invalid, so we try to obtain it. */
group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id);
}
group->entries_read = streamEstimateDistance(s, group, &id);
group->last_id = id;
/* In the past, we would only set it when NOACK was specified. And in
* #9127, XCLAIM did not propagate entries_read in ACK, which would
Expand Down Expand Up @@ -3737,16 +3738,6 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
addReplyBulkCString(c,"last-delivered-id");
addReplyStreamID(c,&cg->last_id);

/* Read counter of the last delivered ID */
addReplyBulkCString(c,"entries-read");
if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
addReplyLongLong(c,cg->entries_read);
} else {
addReplyNull(c);
}

/* Group lag */
addReplyBulkCString(c,"lag");
streamReplyWithCGLag(c,s,cg);

/* Group PEL count */
Expand Down Expand Up @@ -3934,13 +3925,6 @@ NULL
addReplyLongLong(c,raxSize(cg->pel));
addReplyBulkCString(c,"last-delivered-id");
addReplyStreamID(c,&cg->last_id);
addReplyBulkCString(c,"entries-read");
if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
addReplyLongLong(c,cg->entries_read);
} else {
addReplyNull(c);
}
addReplyBulkCString(c,"lag");
streamReplyWithCGLag(c,s,cg);
}
raxStop(&ri);
Expand Down
69 changes: 64 additions & 5 deletions tests/unit/type/stream-cgroups.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,7 @@ start_server {
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $reply max-deleted-entry-id] "0-0"
assert_equal [dict get $reply entries-added] 0
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group entries-read] 0
assert_equal [dict get $group lag] 0

r XADD x 1-0 data a
Expand All @@ -1137,7 +1137,7 @@ start_server {
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $reply max-deleted-entry-id] "1-0"
assert_equal [dict get $reply entries-added] 1
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group entries-read] 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it 1? I don't see any XREADGROUP command.

assert_equal [dict get $group lag] 0
}

Expand All @@ -1152,7 +1152,7 @@ start_server {

set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group entries-read] 0
assert_equal [dict get $group lag] 5

r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
Expand Down Expand Up @@ -1226,7 +1226,7 @@ start_server {
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 1
set group [lindex [dict get $reply groups] 1]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group entries-read] 3
assert_equal [dict get $group lag] 3

r XTRIM x MINID = 5-0
Expand All @@ -1235,10 +1235,69 @@ start_server {
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 1
set group [lindex [dict get $reply groups] 1]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group entries-read] 4
assert_equal [dict get $group lag] 2
}

test {Consumer group check lag and entries-read consistency} {
# Two scenarios that end in the same state must report the same
# entries-read and lag for the first group in the reply: entries 0-1 and
# 0-2 delivered to group processing, with a tombstone at 0-3 after the
# group's last delivered ID. Group processing1 is created but never read.
#
# Scenario 1: read first, delete afterwards.
r DEL x
r XGROUP CREATE x processing $ MKSTREAM
r XGROUP CREATE x processing1 $ MKSTREAM
r XADD x 0-1 name Mercury
r XADD x 0-2 name Venus
r XADD x 0-3 name Earth
r XADD x 0-4 name Jupiter

r XREADGROUP GROUP processing alice COUNT 2 STREAMS x >
r XDEL x 0-3

# The tombstone at 0-3 lies after last_id 0-2, so the patch expects
# both fields to be reported as null.
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why isn't it 2?

assert_equal [dict get $group lag] {}

# Scenario 2: delete first, then read the same two entries.
r DEL x
r XGROUP CREATE x processing $ MKSTREAM
r XGROUP CREATE x processing1 $ MKSTREAM
r XADD x 0-1 name Mercury
r XADD x 0-2 name Venus
r XADD x 0-3 name Earth
r XADD x 0-4 name Jupiter

r XDEL x 0-3
r XREADGROUP GROUP processing alice COUNT 2 STREAMS x >

# Same final state as scenario 1, so the same null values are expected.
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}
}

test {Consumer Group Lag with XDELs and tombstone after the last_id of consume group} {
# A tombstone that lies after the group's last_id must be taken into
# account even when it also precedes the stream's first entry.
r DEL x
r XGROUP CREATE x g1 $ MKSTREAM
r XADD x 1-0 data a
r XREADGROUP GROUP g1 alice STREAMS x > ;# Read one entry
r XADD x 2-0 data c
r XADD x 3-0 data d
r XDEL x 1-0
r XDEL x 2-0
# Now the latest tombstone (2-0) is before the first entry (3-0), but there is
# still a tombstone (2-0) after the last_id (1-0) of the consumer group, so
# neither entries-read nor lag can be computed and both are reported as null.
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}

# The tombstone (2-0) after the group's last_id means the group's counter
# cannot simply be incremented when entry 3-0 is consumed. It is re-estimated
# from the stream instead; since 3-0 is the stream's last entry the estimate
# is exact (equal to entries-added, i.e. 3) and the lag is 0.
r XREADGROUP GROUP g1 alice STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 3
assert_equal [dict get $group lag] 0
}

test {Loading from legacy (Redis <= v6.2.x, rdb_ver < 10) persistence} {
# The payload was DUMPed from a v5 instance after:
# XADD x 1-0 data a
Expand Down
Loading