summaryrefslogtreecommitdiff
path: root/src/t_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/t_stream.c')
-rw-r--r--src/t_stream.c506
1 files changed, 406 insertions, 100 deletions
diff --git a/src/t_stream.c b/src/t_stream.c
index e47194926..cd7d9723e 100644
--- a/src/t_stream.c
+++ b/src/t_stream.c
@@ -68,8 +68,13 @@ stream *streamNew(void) {
stream *s = zmalloc(sizeof(*s));
s->rax = raxNew();
s->length = 0;
+ s->first_id.ms = 0;
+ s->first_id.seq = 0;
s->last_id.ms = 0;
s->last_id.seq = 0;
+ s->max_deleted_entry_id.seq = 0;
+ s->max_deleted_entry_id.ms = 0;
+ s->entries_added = 0;
s->cgroups = NULL; /* Created on demand to save memory when not used. */
return s;
}
@@ -184,7 +189,10 @@ robj *streamDup(robj *o) {
new_lp, NULL);
}
new_s->length = s->length;
+ new_s->first_id = s->first_id;
new_s->last_id = s->last_id;
+ new_s->max_deleted_entry_id = s->max_deleted_entry_id;
+ new_s->entries_added = s->entries_added;
raxStop(&ri);
if (s->cgroups == NULL) return sobj;
@@ -196,7 +204,8 @@ robj *streamDup(robj *o) {
while (raxNext(&ri_cgroups)) {
streamCG *cg = ri_cgroups.data;
streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key,
- ri_cgroups.key_len, &cg->last_id);
+ ri_cgroups.key_len, &cg->last_id,
+ cg->entries_read);
serverAssert(new_cg != NULL);
@@ -378,37 +387,21 @@ int streamCompareID(streamID *a, streamID *b) {
return 0;
}
-void streamGetEdgeID(stream *s, int first, streamID *edge_id)
+/* Retrieves the ID of the stream edge entry. An edge is either the first or
+ * the last ID in the stream, and may be a tombstone. To filter out tombstones,
+ * set the'skip_tombstones' argument to 1. */
+void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id)
{
- raxIterator ri;
- raxStart(&ri, s->rax);
- int empty;
- if (first) {
- raxSeek(&ri, "^", NULL, 0);
- empty = !raxNext(&ri);
- } else {
- raxSeek(&ri, "$", NULL, 0);
- empty = !raxPrev(&ri);
- }
-
- if (empty) {
- /* Stream is empty, mark edge ID as lowest/highest possible. */
- edge_id->ms = first ? UINT64_MAX : 0;
- edge_id->seq = first ? UINT64_MAX : 0;
- raxStop(&ri);
- return;
+ streamIterator si;
+ int64_t numfields;
+ streamIteratorStart(&si,s,NULL,NULL,!first);
+ si.skip_tombstones = skip_tombstones;
+ int found = streamIteratorGetID(&si,edge_id,&numfields);
+ if (!found) {
+ streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX};
+ *edge_id = first ? max_id : min_id;
}
- unsigned char *lp = ri.data;
-
- /* Read the master ID from the radix tree key. */
- streamID master_id;
- streamDecodeID(ri.key, &master_id);
-
- /* Construct edge ID. */
- lpGetEdgeStreamID(lp, first, &master_id, edge_id);
-
- raxStop(&ri);
}
/* Adds a new item into the stream 's' having the specified number of
@@ -664,7 +657,9 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
if (ri.data != lp)
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
s->length++;
+ s->entries_added++;
s->last_id = id;
+ if (s->length == 1) s->first_id = id;
if (added_id) *added_id = id;
return C_OK;
}
@@ -842,7 +837,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
}
deleted += deleted_from_lp;
- /* Now we the entries/deleted counters. */
+ /* Now we update the entries/deleted counters. */
p = lpFirst(lp);
lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp);
p = lpNext(lp,p); /* Skip deleted field. */
@@ -864,8 +859,16 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
break; /* If we are here, there was enough to delete in the current
node, so no need to go to the next node. */
}
-
raxStop(&ri);
+
+ /* Update the stream's first ID after the trimming. */
+ if (s->length == 0) {
+ s->first_id.ms = 0;
+ s->first_id.seq = 0;
+ } else if (deleted) {
+ streamGetEdgeID(s,1,1,&s->first_id);
+ }
+
return deleted;
}
@@ -1089,9 +1092,10 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
}
}
si->stream = s;
- si->lp = NULL; /* There is no current listpack right now. */
+ si->lp = NULL; /* There is no current listpack right now. */
si->lp_ele = NULL; /* Current listpack cursor. */
- si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
+ si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
+ si->skip_tombstones = 1; /* By default tombstones aren't emitted. */
}
/* Return 1 and store the current item ID at 'id' if there are still
@@ -1189,10 +1193,10 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
serverAssert(*numfields>=0);
/* If current >= start, and the entry is not marked as
- * deleted, emit it. */
+ * deleted or tombstones are included, emit it. */
if (!si->rev) {
if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
- !(flags & STREAM_ITEM_FLAG_DELETED))
+ (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
{
if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
return 0; /* We are already out of range. */
@@ -1203,7 +1207,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
}
} else {
if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 &&
- !(flags & STREAM_ITEM_FLAG_DELETED))
+ (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
{
if (memcmp(buf,si->start_key,sizeof(streamID)) < 0)
return 0; /* We are already out of range. */
@@ -1270,7 +1274,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
int64_t aux;
/* We do not really delete the entry here. Instead we mark it as
- * deleted flagging it, and also incrementing the count of the
+ * deleted by flagging it, and also incrementing the count of the
* deleted entries in the listpack header.
*
* We start flagging: */
@@ -1314,7 +1318,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
streamIteratorStop(si);
streamIteratorStart(si,si->stream,&start,&end,si->rev);
- /* TODO: perform a garbage collection here if the ration between
+ /* TODO: perform a garbage collection here if the ratio between
* deleted and valid goes over a certain limit. */
}
@@ -1325,6 +1329,20 @@ void streamIteratorStop(streamIterator *si) {
raxStop(&si->ri);
}
+/* Return 1 if `id` exists in `s` (and not marked as deleted) */
+int streamEntryExists(stream *s, streamID *id) {
+ streamIterator si;
+ streamIteratorStart(&si,s,id,id,0);
+ streamID myid;
+ int64_t numfields;
+ int found = streamIteratorGetID(&si,&myid,&numfields);
+ streamIteratorStop(&si);
+ if (!found)
+ return 0;
+ serverAssert(streamCompareID(id,&myid) == 0);
+ return 1;
+}
+
/* Delete the specified item ID from the stream, returning 1 if the item
* was deleted 0 otherwise (if it does not exist). */
int streamDeleteItem(stream *s, streamID *id) {
@@ -1372,6 +1390,148 @@ robj *createObjectFromStreamID(streamID *id) {
id->ms,id->seq));
}
+/* Returns non-zero if the ID is 0-0. */
+int streamIDEqZero(streamID *id) {
+ return !(id->ms || id->seq);
+}
+
+/* A helper that returns non-zero if the range from 'start' to `end`
+ * contains a tombstone.
+ *
+ * NOTE: this assumes that the caller had verified that 'start' is less than
+ * 's->last_id'. */
+int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
+ streamID start_id, end_id;
+
+ if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) {
+ /* The stream is empty or has no tombstones. */
+ return 0;
+ }
+
+ if (streamCompareID(&s->first_id,&s->max_deleted_entry_id) > 0) {
+ /* The latest tombstone is before the first entry. */
+ return 0;
+ }
+
+ if (start) {
+ start_id = *start;
+ } else {
+ start_id.ms = 0;
+ start_id.seq = 0;
+ }
+
+ if (end) {
+ end_id = *end;
+ } else {
+ end_id.ms = UINT64_MAX;
+ end_id.seq = UINT64_MAX;
+ }
+
+ if (streamCompareID(&start_id,&s->max_deleted_entry_id) <= 0 &&
+ streamCompareID(&s->max_deleted_entry_id,&end_id) <= 0)
+ {
+ /* start_id <= max_deleted_entry_id <= end_id: The range does include a tombstone. */
+ return 1;
+ }
+
+ /* The range doesn't includes a tombstone. */
+ return 0;
+}
+
+/* Replies with a consumer group's current lag, that is the number of messages
+ * in the stream that are yet to be delivered. In case that the lag isn't
+ * available due to fragmentation, the reply to the client is a null. */
+void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) {
+ int valid = 0;
+ long long lag = 0;
+
+ if (!s->entries_added) {
+ /* The lag of a newly-initialized stream is 0. */
+ lag = 0;
+ valid = 1;
+ } else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) {
+ /* No fragmentation ahead means that the group's logical reads counter
+ * is valid for performing the lag calculation. */
+ lag = (long long)s->entries_added - cg->entries_read;
+ valid = 1;
+ } else {
+ /* Attempt to retrieve the group's last ID logical read counter. */
+ long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id);
+ if (entries_read != SCG_INVALID_ENTRIES_READ) {
+ /* A valid counter was obtained. */
+ lag = (long long)s->entries_added - entries_read;
+ valid = 1;
+ }
+ }
+
+ if (valid) {
+ addReplyLongLong(c,lag);
+ } else {
+ addReplyNull(c);
+ }
+}
+
+/* This function returns a value that is the ID's logical read counter, or its
+ * distance (the number of entries) from the first entry ever to have been added
+ * to the stream.
+ *
+ * A counter is returned only in one of the following cases:
+ * 1. The ID is the same as the stream's last ID. In this case, the returned
+ * is the same as the stream's entries_added counter.
+ * 2. The ID equals that of the currently first entry in the stream, and the
+ * stream has no tombstones. The returned value, in this case, is the result
+ * of subtracting the stream's length from its added_entries, incremented by
+ * one.
+ * 3. The ID less than the stream's first current entry's ID, and there are no
+ * tombstones. Here the estimated counter is the result of subtracting the
+ * stream's length from its added_entries.
+ * 4. The stream's added_entries is zero, meaning that no entries were ever
+ * added.
+ *
+ * The special return value of ULLONG_MAX signals that the counter's value isn't
+ * obtainable. It is returned in these cases:
+ * 1. The provided ID, if it even exists, is somewhere between the stream's
+ * current first and last entries' IDs, or in the future.
+ * 2. The stream contains one or more tombstones. */
+long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
+ /* The counter of any ID in an empty, never-before-used stream is 0. */
+ if (!s->entries_added) {
+ return 0;
+ }
+
+ /* In the empty stream, if the ID is smaller or equal to the last ID,
+ * it can set to the current added_entries value. */
+ if (!s->length && streamCompareID(id,&s->last_id) < 1) {
+ return s->entries_added;
+ }
+
+ int cmp_last = streamCompareID(id,&s->last_id);
+ if (cmp_last == 0) {
+ /* Return the exact counter of the last entry in the stream. */
+ return s->entries_added;
+ } else if (cmp_last > 0) {
+ /* The counter of a future ID is unknown. */
+ return SCG_INVALID_ENTRIES_READ;
+ }
+
+ int cmp_id_first = streamCompareID(id,&s->first_id);
+ int cmp_xdel_first = streamCompareID(&s->max_deleted_entry_id,&s->first_id);
+ if (streamIDEqZero(&s->max_deleted_entry_id) || cmp_xdel_first < 0) {
+ /* There's definitely no fragmentation ahead. */
+ if (cmp_id_first < 0) {
+ /* Return the estimated counter. */
+ return s->entries_added - s->length;
+ } else if (cmp_id_first == 0) {
+ /* Return the exact counter of the first entry in the stream. */
+ return s->entries_added - s->length + 1;
+ }
+ }
+
+ /* The ID is either before an XDEL that fragments the stream or an arbitrary
+ * ID. Either case, so we can't make a prediction. */
+ return SCG_INVALID_ENTRIES_READ;
+}
+
/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
* are created in the pending list of the stream and consumers. We need
* to propagate this changes in the form of XCLAIM commands. */
@@ -1411,19 +1571,22 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
* that was consumed by XREADGROUP with the NOACK option: in that case we can't
* propagate the last ID just using the XCLAIM LASTID option, so we emit
*
- * XGROUP SETID <key> <groupname> <id>
+ * XGROUP SETID <key> <groupname> <id> ENTRIESREAD <entries_read>
*/
void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
- robj *argv[5];
+ robj *argv[7];
argv[0] = shared.xgroup;
argv[1] = shared.setid;
argv[2] = key;
argv[3] = groupname;
argv[4] = createObjectFromStreamID(&group->last_id);
+ argv[5] = shared.entriesread;
+ argv[6] = createStringObjectFromLongLong(group->entries_read);
- alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
+ alsoPropagate(c->db->id,argv,7,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[4]);
+ decrRefCount(argv[6]);
}
/* We need this when we want to propagate creation of consumer that was created
@@ -1462,6 +1625,10 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds
* function will not return it to the client.
* 3. An entry in the pending list will be created for every entry delivered
* for the first time to this consumer.
+ * 4. The group's read counter is incremented if it is already valid and there
+ * are no future tombstones, or is invalidated (set to 0) otherwise. If the
+ * counter is invalid to begin with, we try to obtain it for the last
+ * delivered ID.
*
* The behavior may be modified passing non-zero flags:
*
@@ -1518,6 +1685,15 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0) {
+ if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&id,NULL)) {
+ /* A valid counter and no future tombstones mean we can
+ * increment the read counter to keep tracking the group's
+ * progress. */
+ group->entries_read++;
+ } else if (s->entries_added) {
+ /* The group's counter may be invalid, so we try to obtain it. */
+ group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id);
+ }
group->last_id = id;
/* Group last ID should be propagated only if NOACK was
* specified, otherwise the last id will be included
@@ -1791,7 +1967,7 @@ void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx)
arg = createStringObjectFromLongLong(s->length);
} else {
streamID first_id;
- streamGetEdgeID(s, 1, &first_id);
+ streamGetEdgeID(s,1,0,&first_id);
arg = createObjectFromStreamID(&first_id);
}
@@ -2284,10 +2460,10 @@ void streamFreeConsumer(streamConsumer *sc) {
}
/* Create a new consumer group in the context of the stream 's', having the
- * specified name and last server ID. If a consumer group with the same name
- * already existed NULL is returned, otherwise the pointer to the consumer
- * group is returned. */
-streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
+ * specified name, last server ID and reads counter. If a consumer group with
+ * the same name already exists NULL is returned, otherwise the pointer to the
+ * consumer group is returned. */
+streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) {
if (s->cgroups == NULL) s->cgroups = raxNew();
if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
return NULL;
@@ -2296,6 +2472,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
cg->pel = raxNew();
cg->consumers = raxNew();
cg->last_id = *id;
+ cg->entries_read = entries_read;
raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
return cg;
}
@@ -2375,8 +2552,8 @@ void streamDelConsumer(streamCG *cg, streamConsumer *consumer) {
* Consumer groups commands
* ----------------------------------------------------------------------- */
-/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
- * XGROUP SETID <key> <groupname> <id or $>
+/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] [ENTRIESADDED count]
+ * XGROUP SETID <key> <groupname> <id or $> [ENTRIESADDED count]
* XGROUP DESTROY <key> <groupname>
* XGROUP CREATECONSUMER <key> <groupname> <consumer>
* XGROUP DELCONSUMER <key> <groupname> <consumername> */
@@ -2386,21 +2563,33 @@ void xgroupCommand(client *c) {
streamCG *cg = NULL;
char *opt = c->argv[1]->ptr; /* Subcommand name. */
int mkstream = 0;
+ long long entries_read = SCG_INVALID_ENTRIES_READ;
robj *o;
- /* CREATE has an MKSTREAM option that creates the stream if it
- * does not exist. */
- if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {
- if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {
- addReplySubcommandSyntaxError(c);
- return;
- }
- mkstream = 1;
- grpname = c->argv[3]->ptr;
- }
-
/* Everything but the "HELP" option requires a key and group name. */
if (c->argc >= 4) {
+ /* Parse optional arguments for CREATE and SETID */
+ int i = 5;
+ int create_subcmd = !strcasecmp(opt,"CREATE");
+ int setid_subcmd = !strcasecmp(opt,"SETID");
+ while (i < c->argc) {
+ if (create_subcmd && !strcasecmp(c->argv[i]->ptr,"MKSTREAM")) {
+ mkstream = 1;
+ i++;
+ } else if ((create_subcmd || setid_subcmd) && !strcasecmp(c->argv[i]->ptr,"ENTRIESREAD") && i + 1 < c->argc) {
+ if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_read,NULL) != C_OK)
+ return;
+ if (entries_read < 0 && entries_read != SCG_INVALID_ENTRIES_READ) {
+ addReplyError(c,"value for ENTRIESREAD must be positive or -1");
+ return;
+ }
+ i += 2;
+ } else {
+ addReplySubcommandSyntaxError(c);
+ return;
+ }
+ }
+
o = lookupKeyWrite(c->db,c->argv[2]);
if (o) {
if (checkType(c,o,OBJ_STREAM)) return;
@@ -2440,18 +2629,20 @@ void xgroupCommand(client *c) {
" Create a new consumer group. Options are:",
" * MKSTREAM",
" Create the empty stream if it does not exist.",
+" * ENTRIESREAD entries_read",
+" Set the group's entries_read counter (internal use).",
"CREATECONSUMER <key> <groupname> <consumer>",
" Create a new consumer in the specified group.",
"DELCONSUMER <key> <groupname> <consumer>",
" Remove the specified consumer.",
"DESTROY <key> <groupname>",
" Remove the specified group.",
-"SETID <key> <groupname> <id|$>",
-" Set the current group ID.",
+"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]",
+" Set the current group ID and entries_read counter.",
NULL
};
addReplyHelp(c, help);
- } else if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) {
+ } else if (!strcasecmp(opt,"CREATE") && (c->argc >= 5 && c->argc <= 8)) {
streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) {
if (s) {
@@ -2473,7 +2664,7 @@ NULL
signalModifiedKey(c,c->db,c->argv[2]);
}
- streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
+ streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read);
if (cg) {
addReply(c,shared.ok);
server.dirty++;
@@ -2482,7 +2673,7 @@ NULL
} else {
addReplyError(c,"-BUSYGROUP Consumer Group name already exists");
}
- } else if (!strcasecmp(opt,"SETID") && c->argc == 5) {
+ } else if (!strcasecmp(opt,"SETID") && (c->argc == 5 || c->argc == 7)) {
streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) {
id = s->last_id;
@@ -2490,6 +2681,7 @@ NULL
return;
}
cg->last_id = id;
+ cg->entries_read = entries_read;
addReply(c,shared.ok);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id);
@@ -2528,16 +2720,46 @@ NULL
}
}
-/* XSETID <stream> <id>
+/* XSETID <stream> <id> [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id]
*
- * Set the internal "last ID" of a stream. */
+ * Set the internal "last ID", "added entries" and "maximal deleted entry ID"
+ * of a stream. */
void xsetidCommand(client *c) {
+ streamID id, max_xdel_id = {0, 0};
+ long long entries_added = -1;
+
+ if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK)
+ return;
+
+ int i = 3;
+ while (i < c->argc) {
+ int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
+ char *opt = c->argv[i]->ptr;
+ if (!strcasecmp(opt,"ENTRIESADDED") && moreargs) {
+ if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_added,NULL) != C_OK) {
+ return;
+ } else if (entries_added < 0) {
+ addReplyError(c,"entries_added must be positive");
+ return;
+ }
+ i += 2;
+ } else if (!strcasecmp(opt,"MAXDELETEDID") && moreargs) {
+ if (streamParseStrictIDOrReply(c,c->argv[i+1],&max_xdel_id,0,NULL) != C_OK) {
+ return;
+ } else if (streamCompareID(&id,&max_xdel_id) < 0) {
+ addReplyError(c,"The ID specified in XSETID is smaller than the provided max_deleted_entry_id");
+ return;
+ }
+ i += 2;
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+ }
+
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
-
stream *s = o->ptr;
- streamID id;
- if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) return;
/* If the stream has at least one item, we want to check that the user
* is setting a last ID that is equal or greater than the current top
@@ -2547,12 +2769,22 @@ void xsetidCommand(client *c) {
streamLastValidID(s,&maxid);
if (streamCompareID(&id,&maxid) < 0) {
- addReplyError(c,"The ID specified in XSETID is smaller than the "
- "target stream top item");
+ addReplyError(c,"The ID specified in XSETID is smaller than the target stream top item");
+ return;
+ }
+
+ /* If an entries_added was provided, it can't be lower than the length. */
+ if (entries_added != -1 && s->length > (uint64_t)entries_added) {
+ addReplyError(c,"The entries_added specified in XSETID is smaller than the target stream length");
return;
}
}
+
s->last_id = id;
+ if (entries_added != -1)
+ s->entries_added = entries_added;
+ if (!streamIDEqZero(&max_xdel_id))
+ s->max_deleted_entry_id = max_xdel_id;
addReply(c,shared.ok);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id);
@@ -2980,23 +3212,28 @@ void xclaimCommand(client *c) {
/* Lookup the ID in the group PEL. */
streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
+ /* Item must exist for us to transfer it to another consumer. */
+ if (!streamEntryExists(o->ptr,&id)) {
+ /* Clear this entry from the PEL, it no longer exists */
+ if (nack != raxNotFound) {
+ /* Propagate this change (we are going to delete the NACK). */
+ streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
+ propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
+ server.dirty++;
+ /* Release the NACK */
+ raxRemove(group->pel,buf,sizeof(buf),NULL);
+ raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
+ streamFreeNACK(nack);
+ }
+ continue;
+ }
+
/* If FORCE is passed, let's check if at least the entry
* exists in the Stream. In such case, we'll create a new
* entry in the PEL from scratch, so that XCLAIM can also
* be used to create entries in the PEL. Useful for AOF
* and replication of consumer groups. */
if (force && nack == raxNotFound) {
- streamIterator myiterator;
- streamIteratorStart(&myiterator,o->ptr,&id,&id,0);
- int64_t numfields;
- int found = 0;
- streamID item_id;
- if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1;
- streamIteratorStop(&myiterator);
-
- /* Item must exist for us to create a NACK for it. */
- if (!found) continue;
-
/* Create the NACK. */
nack = streamCreateNACK(NULL);
raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
@@ -3013,6 +3250,7 @@ void xclaimCommand(client *c) {
mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle) continue;
}
+
if (consumer == NULL &&
(consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL)
{
@@ -3042,9 +3280,7 @@ void xclaimCommand(client *c) {
if (justid) {
addReplyStreamID(c,&id);
} else {
- size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
- NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);
- if (!emitted) addReplyNull(c);
+ serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1);
}
arraylen++;
@@ -3138,9 +3374,9 @@ void xautoclaimCommand(client *c) {
streamConsumer *consumer = NULL;
long long attempts = count*10;
- addReplyArrayLen(c, 2);
- void *endidptr = addReplyDeferredLen(c);
- void *arraylenptr = addReplyDeferredLen(c);
+ addReplyArrayLen(c, 3); /* We add another reply later */
+ void *endidptr = addReplyDeferredLen(c); /* reply[0] */
+ void *arraylenptr = addReplyDeferredLen(c); /* reply[1] */
unsigned char startkey[sizeof(streamID)];
streamEncodeID(startkey,&startid);
@@ -3150,18 +3386,37 @@ void xautoclaimCommand(client *c) {
size_t arraylen = 0;
mstime_t now = mstime();
sds name = c->argv[3]->ptr;
+ streamID *deleted_ids = zmalloc(count * sizeof(streamID));
+ int deleted_id_num = 0;
while (attempts-- && count && raxNext(&ri)) {
streamNACK *nack = ri.data;
+ streamID id;
+ streamDecodeID(ri.key, &id);
+
+ /* Item must exist for us to transfer it to another consumer. */
+ if (!streamEntryExists(o->ptr,&id)) {
+ /* Propagate this change (we are going to delete the NACK). */
+ robj *idstr = createObjectFromStreamID(&id);
+ streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
+ decrRefCount(idstr);
+ server.dirty++;
+ /* Clear this entry from the PEL, it no longer exists */
+ raxRemove(group->pel,ri.key,ri.key_len,NULL);
+ raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
+ streamFreeNACK(nack);
+ /* Remember the ID for later */
+ deleted_ids[deleted_id_num++] = id;
+ raxSeek(&ri,">=",ri.key,ri.key_len);
+ continue;
+ }
+
if (minidle) {
mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle)
continue;
}
- streamID id;
- streamDecodeID(ri.key, &id);
-
if (consumer == NULL &&
(consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL)
{
@@ -3191,11 +3446,7 @@ void xautoclaimCommand(client *c) {
if (justid) {
addReplyStreamID(c,&id);
} else {
- size_t emitted =
- streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,
- STREAM_RWR_RAWENTRIES,NULL);
- if (!emitted)
- addReplyNull(c);
+ serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1);
}
arraylen++;
count--;
@@ -3221,6 +3472,12 @@ void xautoclaimCommand(client *c) {
setDeferredArrayLen(c,arraylenptr,arraylen);
setDeferredReplyStreamID(c,endidptr,&endid);
+ addReplyArrayLen(c, deleted_id_num); /* reply[2] */
+ for (int i = 0; i < deleted_id_num; i++) {
+ addReplyStreamID(c, &deleted_ids[i]);
+ }
+ zfree(deleted_ids);
+
preventCommandPropagation(c);
}
@@ -3250,8 +3507,31 @@ void xdelCommand(client *c) {
/* Actually apply the command. */
int deleted = 0;
+ int first_entry = 0;
for (int j = 2; j < c->argc; j++) {
- deleted += streamDeleteItem(s,&ids[j-2]);
+ streamID *id = &ids[j-2];
+ if (streamDeleteItem(s,id)) {
+ /* We want to know if the first entry in the stream was deleted
+ * so we can later set the new one. */
+ if (streamCompareID(id,&s->first_id) == 0) {
+ first_entry = 1;
+ }
+ /* Update the stream's maximal tombstone if needed. */
+ if (streamCompareID(id,&s->max_deleted_entry_id) > 0) {
+ s->max_deleted_entry_id = *id;
+ }
+ deleted++;
+ };
+ }
+
+ /* Update the stream's first ID. */
+ if (deleted) {
+ if (s->length == 0) {
+ s->first_id.ms = 0;
+ s->first_id.seq = 0;
+ } else if (first_entry) {
+ streamGetEdgeID(s,1,1,&s->first_id);
+ }
}
/* Propagate the write if needed. */
@@ -3359,7 +3639,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
}
}
- addReplyMapLen(c,full ? 6 : 7);
+ addReplyMapLen(c,full ? 9 : 10);
addReplyBulkCString(c,"length");
addReplyLongLong(c,s->length);
addReplyBulkCString(c,"radix-tree-keys");
@@ -3368,6 +3648,12 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
addReplyLongLong(c,s->rax->numnodes);
addReplyBulkCString(c,"last-generated-id");
addReplyStreamID(c,&s->last_id);
+ addReplyBulkCString(c,"max-deleted-entry-id");
+ addReplyStreamID(c,&s->max_deleted_entry_id);
+ addReplyBulkCString(c,"entries-added");
+ addReplyLongLong(c,s->entries_added);
+ addReplyBulkCString(c,"recorded-first-entry-id");
+ addReplyStreamID(c,&s->first_id);
if (!full) {
/* XINFO STREAM <key> */
@@ -3406,7 +3692,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
raxSeek(&ri_cgroups,"^",NULL,0);
while(raxNext(&ri_cgroups)) {
streamCG *cg = ri_cgroups.data;
- addReplyMapLen(c,5);
+ addReplyMapLen(c,7);
/* Name */
addReplyBulkCString(c,"name");
@@ -3416,6 +3702,18 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
addReplyBulkCString(c,"last-delivered-id");
addReplyStreamID(c,&cg->last_id);
+ /* Read counter of the last delivered ID */
+ addReplyBulkCString(c,"entries-read");
+ if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
+ addReplyLongLong(c,cg->entries_read);
+ } else {
+ addReplyNull(c);
+ }
+
+ /* Group lag */
+ addReplyBulkCString(c,"lag");
+ streamReplyWithCGLag(c,s,cg);
+
/* Group PEL count */
addReplyBulkCString(c,"pel-count");
addReplyLongLong(c,raxSize(cg->pel));
@@ -3593,7 +3891,7 @@ NULL
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *cg = ri.data;
- addReplyMapLen(c,4);
+ addReplyMapLen(c,6);
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,ri.key,ri.key_len);
addReplyBulkCString(c,"consumers");
@@ -3602,6 +3900,14 @@ NULL
addReplyLongLong(c,raxSize(cg->pel));
addReplyBulkCString(c,"last-delivered-id");
addReplyStreamID(c,&cg->last_id);
+ addReplyBulkCString(c,"entries-read");
+ if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
+ addReplyLongLong(c,cg->entries_read);
+ } else {
+ addReplyNull(c);
+ }
+ addReplyBulkCString(c,"lag");
+ streamReplyWithCGLag(c,s,cg);
}
raxStop(&ri);
} else if (!strcasecmp(opt,"STREAM")) {