Skip to content

Commit

Permalink
fix(stream_family): Fix journaling in the XADD and XTRIM commands
Browse files Browse the repository at this point in the history
fixes #4202

Signed-off-by: Stepan Bagritsevich <[email protected]>
  • Loading branch information
BagritsevichStepan committed Jan 13, 2025
1 parent f6441df commit 2ddf4cc
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 151 deletions.
8 changes: 5 additions & 3 deletions src/redis/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
int streamDeleteItem(stream *s, streamID *id);
void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
int64_t streamTrim(stream *s, streamAddTrimArgs *args);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);
int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id);
int64_t streamTrimByLengthLimited(stream *s, long long maxlen, int approx, long long limit, streamID *last_id);
int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id);
int64_t streamTrimByIDLimited(stream *s, streamID minid, int approx, long long limit, streamID *last_id);
void streamFreeCG(streamCG *cg);
void streamDelConsumer(streamCG *cg, streamConsumer *consumer);
void streamLastValidID(stream *s, streamID *maxid);
Expand Down
52 changes: 39 additions & 13 deletions src/redis/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i
* that should be trimmed, there is a chance we will still have entries with
* IDs < 'id' (or number of elements >= maxlen in case of MAXLEN).
*/
int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) {
size_t maxlen = args->maxlen;
streamID *id = &args->minid;
int approx = args->approx_trim;
Expand All @@ -315,6 +315,8 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
raxSeek(&ri,"^",NULL,0);

int64_t deleted = 0;
streamID last_deleted_id = {0, 0}; // Initialize last deleted ID

while (raxNext(&ri)) {
if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen)
break;
Expand All @@ -331,16 +333,24 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
streamID master_id = {0}; /* For MINID */
if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
remove_node = s->length - entries >= maxlen;
if (remove_node) {
streamDecodeID(ri.key, &master_id);
// Write last ID to last_deleted_id
lpGetEdgeStreamID(lp, 0, &master_id, &last_deleted_id);
}
} else {
/* Read the master ID from the radix tree key. */
streamDecodeID(ri.key, &master_id);

/* Read last ID. */
streamID last_id = {0, 0};
lpGetEdgeStreamID(lp, 0, &master_id, &last_id);

/* We can remove the entire node id its last ID < 'id' */
remove_node = streamCompareID(&last_id, id) < 0;
if (remove_node) {
last_deleted_id = last_id;
}
}

if (remove_node) {
Expand All @@ -356,6 +366,10 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
* stop here. */
if (approx) break;

if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
streamDecodeID(ri.key, &master_id);
}

/* Now we have to trim entries from within 'lp' */
int64_t deleted_from_lp = 0;

Expand Down Expand Up @@ -386,11 +400,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
int64_t seq_delta = lpGetInteger(p);
p = lpNext(lp, p); /* Skip ID seq delta */

streamID currid = {0}; /* For MINID */
if (trim_strategy == TRIM_STRATEGY_MINID) {
currid.ms = master_id.ms + ms_delta;
currid.seq = master_id.seq + seq_delta;
}
streamID currid = {master_id.ms + ms_delta, master_id.seq + seq_delta};

int stop;
if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
Expand Down Expand Up @@ -422,6 +432,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
deleted_from_lp++;
s->length--;
p = lp + delta;
last_deleted_id = currid;
}
}
deleted += deleted_from_lp;
Expand Down Expand Up @@ -458,29 +469,44 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
streamGetEdgeID(s,1,1,&s->first_id);
}

/* Set the last deleted ID, if applicable. */
if (last_id) {
*last_id = last_deleted_id;
}

return deleted;
}

/* Trims a stream by length. Returns the number of deleted items. */
int64_t streamTrimByLength(stream *s, long long maxlen, int approx) {
int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id) {
return streamTrimByLengthLimited(s, maxlen, approx, approx ? 100 * server.stream_node_max_entries : 0, last_id);
}

/* Trims a stream by length. Returns the number of deleted items. */
int64_t streamTrimByLengthLimited(stream *s, long long maxlen, int approx, long long limit, streamID *last_id) {
streamAddTrimArgs args = {
.trim_strategy = TRIM_STRATEGY_MAXLEN,
.approx_trim = approx,
.limit = approx ? 100 * server.stream_node_max_entries : 0,
.limit = limit,
.maxlen = maxlen
};
return streamTrim(s, &args);
return streamTrim(s, &args, last_id);
}

/* Trims a stream by minimum ID. Returns the number of deleted items. */
int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id) {
return streamTrimByIDLimited(s, minid, approx, approx ? 100 * server.stream_node_max_entries : 0, last_id);
}

/* Trims a stream by minimum ID. Returns the number of deleted items. */
int64_t streamTrimByID(stream *s, streamID minid, int approx) {
int64_t streamTrimByIDLimited(stream *s, streamID minid, int approx, long long limit, streamID *last_id) {
streamAddTrimArgs args = {
.trim_strategy = TRIM_STRATEGY_MINID,
.approx_trim = approx,
.limit = approx ? 100 * server.stream_node_max_entries : 0,
.limit = limit,
.minid = minid
};
return streamTrim(s, &args);
return streamTrim(s, &args, last_id);
}

/* Initialize the stream iterator, so that we can call iterating functions
Expand Down
Loading

0 comments on commit 2ddf4cc

Please sign in to comment.