diff options
author | Oran Agra <oran@redislabs.com> | 2022-02-28 15:35:46 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-28 15:35:46 +0200 |
commit | d2b5a579dd8b785690aa7714df8776ffc452d242 (patch) | |
tree | 1c54c71bae68eaa44efbf89020d75399a88dee40 /src/t_stream.c | |
parent | d5915a167f696644e210ee85e549c7ceb41b5791 (diff) | |
parent | 10dc57ab226155bbdbfb0b0d914e681aa346d7de (diff) | |
download | redis-7.0-rc2.tar.gz |
Merge pull request #10355 from oranagra/release-7.0-rc27.0-rc2
Release 7.0 RC2
Diffstat (limited to 'src/t_stream.c')
-rw-r--r-- | src/t_stream.c | 506 |
1 files changed, 406 insertions, 100 deletions
diff --git a/src/t_stream.c b/src/t_stream.c index e47194926..cd7d9723e 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -68,8 +68,13 @@ stream *streamNew(void) { stream *s = zmalloc(sizeof(*s)); s->rax = raxNew(); s->length = 0; + s->first_id.ms = 0; + s->first_id.seq = 0; s->last_id.ms = 0; s->last_id.seq = 0; + s->max_deleted_entry_id.seq = 0; + s->max_deleted_entry_id.ms = 0; + s->entries_added = 0; s->cgroups = NULL; /* Created on demand to save memory when not used. */ return s; } @@ -184,7 +189,10 @@ robj *streamDup(robj *o) { new_lp, NULL); } new_s->length = s->length; + new_s->first_id = s->first_id; new_s->last_id = s->last_id; + new_s->max_deleted_entry_id = s->max_deleted_entry_id; + new_s->entries_added = s->entries_added; raxStop(&ri); if (s->cgroups == NULL) return sobj; @@ -196,7 +204,8 @@ robj *streamDup(robj *o) { while (raxNext(&ri_cgroups)) { streamCG *cg = ri_cgroups.data; streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key, - ri_cgroups.key_len, &cg->last_id); + ri_cgroups.key_len, &cg->last_id, + cg->entries_read); serverAssert(new_cg != NULL); @@ -378,37 +387,21 @@ int streamCompareID(streamID *a, streamID *b) { return 0; } -void streamGetEdgeID(stream *s, int first, streamID *edge_id) +/* Retrieves the ID of the stream edge entry. An edge is either the first or + * the last ID in the stream, and may be a tombstone. To filter out tombstones, + * set the'skip_tombstones' argument to 1. */ +void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id) { - raxIterator ri; - raxStart(&ri, s->rax); - int empty; - if (first) { - raxSeek(&ri, "^", NULL, 0); - empty = !raxNext(&ri); - } else { - raxSeek(&ri, "$", NULL, 0); - empty = !raxPrev(&ri); - } - - if (empty) { - /* Stream is empty, mark edge ID as lowest/highest possible. */ - edge_id->ms = first ? UINT64_MAX : 0; - edge_id->seq = first ? UINT64_MAX : 0; - raxStop(&ri); - return; + streamIterator si; + int64_t numfields; + streamIteratorStart(&si,s,NULL,NULL,!first); + si.skip_tombstones = skip_tombstones; + int found = streamIteratorGetID(&si,edge_id,&numfields); + if (!found) { + streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX}; + *edge_id = first ? max_id : min_id; } - unsigned char *lp = ri.data; - - /* Read the master ID from the radix tree key. */ - streamID master_id; - streamDecodeID(ri.key, &master_id); - - /* Construct edge ID. */ - lpGetEdgeStreamID(lp, first, &master_id, edge_id); - - raxStop(&ri); } /* Adds a new item into the stream 's' having the specified number of @@ -664,7 +657,9 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ if (ri.data != lp) raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); s->length++; + s->entries_added++; s->last_id = id; + if (s->length == 1) s->first_id = id; if (added_id) *added_id = id; return C_OK; } @@ -842,7 +837,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { } deleted += deleted_from_lp; - /* Now we the entries/deleted counters. */ + /* Now we update the entries/deleted counters. */ p = lpFirst(lp); lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp); p = lpNext(lp,p); /* Skip deleted field. */ @@ -864,8 +859,16 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { break; /* If we are here, there was enough to delete in the current node, so no need to go to the next node. */ } - raxStop(&ri); + + /* Update the stream's first ID after the trimming. */ + if (s->length == 0) { + s->first_id.ms = 0; + s->first_id.seq = 0; + } else if (deleted) { + streamGetEdgeID(s,1,1,&s->first_id); + } + return deleted; } @@ -1089,9 +1092,10 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI } } si->stream = s; - si->lp = NULL; /* There is no current listpack right now. */ + si->lp = NULL; /* There is no current listpack right now. */ si->lp_ele = NULL; /* Current listpack cursor. */ - si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ + si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ + si->skip_tombstones = 1; /* By default tombstones aren't emitted. */ } /* Return 1 and store the current item ID at 'id' if there are still @@ -1189,10 +1193,10 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { serverAssert(*numfields>=0); /* If current >= start, and the entry is not marked as - * deleted, emit it. */ + * deleted or tombstones are included, emit it. */ if (!si->rev) { if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 && - !(flags & STREAM_ITEM_FLAG_DELETED)) + (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED))) { if (memcmp(buf,si->end_key,sizeof(streamID)) > 0) return 0; /* We are already out of range. */ @@ -1203,7 +1207,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { } } else { if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 && - !(flags & STREAM_ITEM_FLAG_DELETED)) + (!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED))) { if (memcmp(buf,si->start_key,sizeof(streamID)) < 0) return 0; /* We are already out of range. */ @@ -1270,7 +1274,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { int64_t aux; /* We do not really delete the entry here. Instead we mark it as - * deleted flagging it, and also incrementing the count of the + * deleted by flagging it, and also incrementing the count of the * deleted entries in the listpack header. * * We start flagging: */ @@ -1314,7 +1318,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { streamIteratorStop(si); streamIteratorStart(si,si->stream,&start,&end,si->rev); - /* TODO: perform a garbage collection here if the ration between + /* TODO: perform a garbage collection here if the ratio between * deleted and valid goes over a certain limit. */ } @@ -1325,6 +1329,20 @@ void streamIteratorStop(streamIterator *si) { raxStop(&si->ri); } +/* Return 1 if `id` exists in `s` (and not marked as deleted) */ +int streamEntryExists(stream *s, streamID *id) { + streamIterator si; + streamIteratorStart(&si,s,id,id,0); + streamID myid; + int64_t numfields; + int found = streamIteratorGetID(&si,&myid,&numfields); + streamIteratorStop(&si); + if (!found) + return 0; + serverAssert(streamCompareID(id,&myid) == 0); + return 1; +} + /* Delete the specified item ID from the stream, returning 1 if the item * was deleted 0 otherwise (if it does not exist). */ int streamDeleteItem(stream *s, streamID *id) { @@ -1372,6 +1390,148 @@ robj *createObjectFromStreamID(streamID *id) { id->ms,id->seq)); } +/* Returns non-zero if the ID is 0-0. */ +int streamIDEqZero(streamID *id) { + return !(id->ms || id->seq); +} + +/* A helper that returns non-zero if the range from 'start' to `end` + * contains a tombstone. + * + * NOTE: this assumes that the caller had verified that 'start' is less than + * 's->last_id'. */ +int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) { + streamID start_id, end_id; + + if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) { + /* The stream is empty or has no tombstones. */ + return 0; + } + + if (streamCompareID(&s->first_id,&s->max_deleted_entry_id) > 0) { + /* The latest tombstone is before the first entry. */ + return 0; + } + + if (start) { + start_id = *start; + } else { + start_id.ms = 0; + start_id.seq = 0; + } + + if (end) { + end_id = *end; + } else { + end_id.ms = UINT64_MAX; + end_id.seq = UINT64_MAX; + } + + if (streamCompareID(&start_id,&s->max_deleted_entry_id) <= 0 && + streamCompareID(&s->max_deleted_entry_id,&end_id) <= 0) + { + /* start_id <= max_deleted_entry_id <= end_id: The range does include a tombstone. */ + return 1; + } + + /* The range doesn't includes a tombstone. */ + return 0; +} + +/* Replies with a consumer group's current lag, that is the number of messages + * in the stream that are yet to be delivered. In case that the lag isn't + * available due to fragmentation, the reply to the client is a null. */ +void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) { + int valid = 0; + long long lag = 0; + + if (!s->entries_added) { + /* The lag of a newly-initialized stream is 0. */ + lag = 0; + valid = 1; + } else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) { + /* No fragmentation ahead means that the group's logical reads counter + * is valid for performing the lag calculation. */ + lag = (long long)s->entries_added - cg->entries_read; + valid = 1; + } else { + /* Attempt to retrieve the group's last ID logical read counter. */ + long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id); + if (entries_read != SCG_INVALID_ENTRIES_READ) { + /* A valid counter was obtained. */ + lag = (long long)s->entries_added - entries_read; + valid = 1; + } + } + + if (valid) { + addReplyLongLong(c,lag); + } else { + addReplyNull(c); + } +} + +/* This function returns a value that is the ID's logical read counter, or its + * distance (the number of entries) from the first entry ever to have been added + * to the stream. + * + * A counter is returned only in one of the following cases: + * 1. The ID is the same as the stream's last ID. In this case, the returned + * is the same as the stream's entries_added counter. + * 2. The ID equals that of the currently first entry in the stream, and the + * stream has no tombstones. The returned value, in this case, is the result + * of subtracting the stream's length from its added_entries, incremented by + * one. + * 3. The ID less than the stream's first current entry's ID, and there are no + * tombstones. Here the estimated counter is the result of subtracting the + * stream's length from its added_entries. + * 4. The stream's added_entries is zero, meaning that no entries were ever + * added. + * + * The special return value of ULLONG_MAX signals that the counter's value isn't + * obtainable. It is returned in these cases: + * 1. The provided ID, if it even exists, is somewhere between the stream's + * current first and last entries' IDs, or in the future. + * 2. The stream contains one or more tombstones. */ +long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) { + /* The counter of any ID in an empty, never-before-used stream is 0. */ + if (!s->entries_added) { + return 0; + } + + /* In the empty stream, if the ID is smaller or equal to the last ID, + * it can set to the current added_entries value. */ + if (!s->length && streamCompareID(id,&s->last_id) < 1) { + return s->entries_added; + } + + int cmp_last = streamCompareID(id,&s->last_id); + if (cmp_last == 0) { + /* Return the exact counter of the last entry in the stream. */ + return s->entries_added; + } else if (cmp_last > 0) { + /* The counter of a future ID is unknown. */ + return SCG_INVALID_ENTRIES_READ; + } + + int cmp_id_first = streamCompareID(id,&s->first_id); + int cmp_xdel_first = streamCompareID(&s->max_deleted_entry_id,&s->first_id); + if (streamIDEqZero(&s->max_deleted_entry_id) || cmp_xdel_first < 0) { + /* There's definitely no fragmentation ahead. */ + if (cmp_id_first < 0) { + /* Return the estimated counter. */ + return s->entries_added - s->length; + } else if (cmp_id_first == 0) { + /* Return the exact counter of the first entry in the stream. */ + return s->entries_added - s->length + 1; + } + } + + /* The ID is either before an XDEL that fragments the stream or an arbitrary + * ID. Either case, so we can't make a prediction. */ + return SCG_INVALID_ENTRIES_READ; +} + /* As a result of an explicit XCLAIM or XREADGROUP command, new entries * are created in the pending list of the stream and consumers. We need * to propagate this changes in the form of XCLAIM commands. */ @@ -1411,19 +1571,22 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam * that was consumed by XREADGROUP with the NOACK option: in that case we can't * propagate the last ID just using the XCLAIM LASTID option, so we emit * - * XGROUP SETID <key> <groupname> <id> + * XGROUP SETID <key> <groupname> <id> ENTRIESREAD <entries_read> */ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) { - robj *argv[5]; + robj *argv[7]; argv[0] = shared.xgroup; argv[1] = shared.setid; argv[2] = key; argv[3] = groupname; argv[4] = createObjectFromStreamID(&group->last_id); + argv[5] = shared.entriesread; + argv[6] = createStringObjectFromLongLong(group->entries_read); - alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + alsoPropagate(c->db->id,argv,7,PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[4]); + decrRefCount(argv[6]); } /* We need this when we want to propagate creation of consumer that was created @@ -1462,6 +1625,10 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds * function will not return it to the client. * 3. An entry in the pending list will be created for every entry delivered * for the first time to this consumer. + * 4. The group's read counter is incremented if it is already valid and there + * are no future tombstones, or is invalidated (set to 0) otherwise. If the + * counter is invalid to begin with, we try to obtain it for the last + * delivered ID. * * The behavior may be modified passing non-zero flags: * @@ -1518,6 +1685,15 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end while(streamIteratorGetID(&si,&id,&numfields)) { /* Update the group last_id if needed. */ if (group && streamCompareID(&id,&group->last_id) > 0) { + if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&id,NULL)) { + /* A valid counter and no future tombstones mean we can + * increment the read counter to keep tracking the group's + * progress. */ + group->entries_read++; + } else if (s->entries_added) { + /* The group's counter may be invalid, so we try to obtain it. */ + group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id); + } group->last_id = id; /* Group last ID should be propagated only if NOACK was * specified, otherwise the last id will be included @@ -1791,7 +1967,7 @@ void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) arg = createStringObjectFromLongLong(s->length); } else { streamID first_id; - streamGetEdgeID(s, 1, &first_id); + streamGetEdgeID(s,1,0,&first_id); arg = createObjectFromStreamID(&first_id); } @@ -2284,10 +2460,10 @@ void streamFreeConsumer(streamConsumer *sc) { } /* Create a new consumer group in the context of the stream 's', having the - * specified name and last server ID. If a consumer group with the same name - * already existed NULL is returned, otherwise the pointer to the consumer - * group is returned. */ -streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) { + * specified name, last server ID and reads counter. If a consumer group with + * the same name already exists NULL is returned, otherwise the pointer to the + * consumer group is returned. */ +streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) { if (s->cgroups == NULL) s->cgroups = raxNew(); if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound) return NULL; @@ -2296,6 +2472,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) { cg->pel = raxNew(); cg->consumers = raxNew(); cg->last_id = *id; + cg->entries_read = entries_read; raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); return cg; } @@ -2375,8 +2552,8 @@ void streamDelConsumer(streamCG *cg, streamConsumer *consumer) { * Consumer groups commands * ----------------------------------------------------------------------- */ -/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] - * XGROUP SETID <key> <groupname> <id or $> +/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] [ENTRIESADDED count] + * XGROUP SETID <key> <groupname> <id or $> [ENTRIESADDED count] * XGROUP DESTROY <key> <groupname> * XGROUP CREATECONSUMER <key> <groupname> <consumer> * XGROUP DELCONSUMER <key> <groupname> <consumername> */ @@ -2386,21 +2563,33 @@ void xgroupCommand(client *c) { streamCG *cg = NULL; char *opt = c->argv[1]->ptr; /* Subcommand name. */ int mkstream = 0; + long long entries_read = SCG_INVALID_ENTRIES_READ; robj *o; - /* CREATE has an MKSTREAM option that creates the stream if it - * does not exist. */ - if (c->argc == 6 && !strcasecmp(opt,"CREATE")) { - if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) { - addReplySubcommandSyntaxError(c); - return; - } - mkstream = 1; - grpname = c->argv[3]->ptr; - } - /* Everything but the "HELP" option requires a key and group name. */ if (c->argc >= 4) { + /* Parse optional arguments for CREATE and SETID */ + int i = 5; + int create_subcmd = !strcasecmp(opt,"CREATE"); + int setid_subcmd = !strcasecmp(opt,"SETID"); + while (i < c->argc) { + if (create_subcmd && !strcasecmp(c->argv[i]->ptr,"MKSTREAM")) { + mkstream = 1; + i++; + } else if ((create_subcmd || setid_subcmd) && !strcasecmp(c->argv[i]->ptr,"ENTRIESREAD") && i + 1 < c->argc) { + if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_read,NULL) != C_OK) + return; + if (entries_read < 0 && entries_read != SCG_INVALID_ENTRIES_READ) { + addReplyError(c,"value for ENTRIESREAD must be positive or -1"); + return; + } + i += 2; + } else { + addReplySubcommandSyntaxError(c); + return; + } + } + o = lookupKeyWrite(c->db,c->argv[2]); if (o) { if (checkType(c,o,OBJ_STREAM)) return; @@ -2440,18 +2629,20 @@ void xgroupCommand(client *c) { " Create a new consumer group. Options are:", " * MKSTREAM", " Create the empty stream if it does not exist.", +" * ENTRIESREAD entries_read", +" Set the group's entries_read counter (internal use).", "CREATECONSUMER <key> <groupname> <consumer>", " Create a new consumer in the specified group.", "DELCONSUMER <key> <groupname> <consumer>", " Remove the specified consumer.", "DESTROY <key> <groupname>", " Remove the specified group.", -"SETID <key> <groupname> <id|$>", -" Set the current group ID.", +"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]", +" Set the current group ID and entries_read counter.", NULL }; addReplyHelp(c, help); - } else if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) { + } else if (!strcasecmp(opt,"CREATE") && (c->argc >= 5 && c->argc <= 8)) { streamID id; if (!strcmp(c->argv[4]->ptr,"$")) { if (s) { @@ -2473,7 +2664,7 @@ NULL signalModifiedKey(c,c->db,c->argv[2]); } - streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id); + streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read); if (cg) { addReply(c,shared.ok); server.dirty++; @@ -2482,7 +2673,7 @@ NULL } else { addReplyError(c,"-BUSYGROUP Consumer Group name already exists"); } - } else if (!strcasecmp(opt,"SETID") && c->argc == 5) { + } else if (!strcasecmp(opt,"SETID") && (c->argc == 5 || c->argc == 7)) { streamID id; if (!strcmp(c->argv[4]->ptr,"$")) { id = s->last_id; @@ -2490,6 +2681,7 @@ NULL return; } cg->last_id = id; + cg->entries_read = entries_read; addReply(c,shared.ok); server.dirty++; notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id); @@ -2528,16 +2720,46 @@ NULL } } -/* XSETID <stream> <id> +/* XSETID <stream> <id> [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id] * - * Set the internal "last ID" of a stream. */ + * Set the internal "last ID", "added entries" and "maximal deleted entry ID" + * of a stream. */ void xsetidCommand(client *c) { + streamID id, max_xdel_id = {0, 0}; + long long entries_added = -1; + + if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) + return; + + int i = 3; + while (i < c->argc) { + int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ + char *opt = c->argv[i]->ptr; + if (!strcasecmp(opt,"ENTRIESADDED") && moreargs) { + if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_added,NULL) != C_OK) { + return; + } else if (entries_added < 0) { + addReplyError(c,"entries_added must be positive"); + return; + } + i += 2; + } else if (!strcasecmp(opt,"MAXDELETEDID") && moreargs) { + if (streamParseStrictIDOrReply(c,c->argv[i+1],&max_xdel_id,0,NULL) != C_OK) { + return; + } else if (streamCompareID(&id,&max_xdel_id) < 0) { + addReplyError(c,"The ID specified in XSETID is smaller than the provided max_deleted_entry_id"); + return; + } + i += 2; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); if (o == NULL || checkType(c,o,OBJ_STREAM)) return; - stream *s = o->ptr; - streamID id; - if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) return; /* If the stream has at least one item, we want to check that the user * is setting a last ID that is equal or greater than the current top @@ -2547,12 +2769,22 @@ void xsetidCommand(client *c) { streamLastValidID(s,&maxid); if (streamCompareID(&id,&maxid) < 0) { - addReplyError(c,"The ID specified in XSETID is smaller than the " - "target stream top item"); + addReplyError(c,"The ID specified in XSETID is smaller than the target stream top item"); + return; + } + + /* If an entries_added was provided, it can't be lower than the length. */ + if (entries_added != -1 && s->length > (uint64_t)entries_added) { + addReplyError(c,"The entries_added specified in XSETID is smaller than the target stream length"); return; } } + s->last_id = id; + if (entries_added != -1) + s->entries_added = entries_added; + if (!streamIDEqZero(&max_xdel_id)) + s->max_deleted_entry_id = max_xdel_id; addReply(c,shared.ok); server.dirty++; notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id); @@ -2980,23 +3212,28 @@ void xclaimCommand(client *c) { /* Lookup the ID in the group PEL. */ streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); + /* Item must exist for us to transfer it to another consumer. */ + if (!streamEntryExists(o->ptr,&id)) { + /* Clear this entry from the PEL, it no longer exists */ + if (nack != raxNotFound) { + /* Propagate this change (we are going to delete the NACK). */ + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); + propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ + server.dirty++; + /* Release the NACK */ + raxRemove(group->pel,buf,sizeof(buf),NULL); + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + streamFreeNACK(nack); + } + continue; + } + /* If FORCE is passed, let's check if at least the entry * exists in the Stream. In such case, we'll create a new * entry in the PEL from scratch, so that XCLAIM can also * be used to create entries in the PEL. Useful for AOF * and replication of consumer groups. */ if (force && nack == raxNotFound) { - streamIterator myiterator; - streamIteratorStart(&myiterator,o->ptr,&id,&id,0); - int64_t numfields; - int found = 0; - streamID item_id; - if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1; - streamIteratorStop(&myiterator); - - /* Item must exist for us to create a NACK for it. */ - if (!found) continue; - /* Create the NACK. */ nack = streamCreateNACK(NULL); raxInsert(group->pel,buf,sizeof(buf),nack,NULL); @@ -3013,6 +3250,7 @@ void xclaimCommand(client *c) { mstime_t this_idle = now - nack->delivery_time; if (this_idle < minidle) continue; } + if (consumer == NULL && (consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL) { @@ -3042,9 +3280,7 @@ void xclaimCommand(client *c) { if (justid) { addReplyStreamID(c,&id); } else { - size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0, - NULL,NULL,STREAM_RWR_RAWENTRIES,NULL); - if (!emitted) addReplyNull(c); + serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1); } arraylen++; @@ -3138,9 +3374,9 @@ void xautoclaimCommand(client *c) { streamConsumer *consumer = NULL; long long attempts = count*10; - addReplyArrayLen(c, 2); - void *endidptr = addReplyDeferredLen(c); - void *arraylenptr = addReplyDeferredLen(c); + addReplyArrayLen(c, 3); /* We add another reply later */ + void *endidptr = addReplyDeferredLen(c); /* reply[0] */ + void *arraylenptr = addReplyDeferredLen(c); /* reply[1] */ unsigned char startkey[sizeof(streamID)]; streamEncodeID(startkey,&startid); @@ -3150,18 +3386,37 @@ void xautoclaimCommand(client *c) { size_t arraylen = 0; mstime_t now = mstime(); sds name = c->argv[3]->ptr; + streamID *deleted_ids = zmalloc(count * sizeof(streamID)); + int deleted_id_num = 0; while (attempts-- && count && raxNext(&ri)) { streamNACK *nack = ri.data; + streamID id; + streamDecodeID(ri.key, &id); + + /* Item must exist for us to transfer it to another consumer. */ + if (!streamEntryExists(o->ptr,&id)) { + /* Propagate this change (we are going to delete the NACK). */ + robj *idstr = createObjectFromStreamID(&id); + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); + decrRefCount(idstr); + server.dirty++; + /* Clear this entry from the PEL, it no longer exists */ + raxRemove(group->pel,ri.key,ri.key_len,NULL); + raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); + streamFreeNACK(nack); + /* Remember the ID for later */ + deleted_ids[deleted_id_num++] = id; + raxSeek(&ri,">=",ri.key,ri.key_len); + continue; + } + if (minidle) { mstime_t this_idle = now - nack->delivery_time; if (this_idle < minidle) continue; } - streamID id; - streamDecodeID(ri.key, &id); - if (consumer == NULL && (consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL) { @@ -3191,11 +3446,7 @@ void xautoclaimCommand(client *c) { if (justid) { addReplyStreamID(c,&id); } else { - size_t emitted = - streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); - if (!emitted) - addReplyNull(c); + serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1); } arraylen++; count--; @@ -3221,6 +3472,12 @@ void xautoclaimCommand(client *c) { setDeferredArrayLen(c,arraylenptr,arraylen); setDeferredReplyStreamID(c,endidptr,&endid); + addReplyArrayLen(c, deleted_id_num); /* reply[2] */ + for (int i = 0; i < deleted_id_num; i++) { + addReplyStreamID(c, &deleted_ids[i]); + } + zfree(deleted_ids); + preventCommandPropagation(c); } @@ -3250,8 +3507,31 @@ void xdelCommand(client *c) { /* Actually apply the command. */ int deleted = 0; + int first_entry = 0; for (int j = 2; j < c->argc; j++) { - deleted += streamDeleteItem(s,&ids[j-2]); + streamID *id = &ids[j-2]; + if (streamDeleteItem(s,id)) { + /* We want to know if the first entry in the stream was deleted + * so we can later set the new one. */ + if (streamCompareID(id,&s->first_id) == 0) { + first_entry = 1; + } + /* Update the stream's maximal tombstone if needed. */ + if (streamCompareID(id,&s->max_deleted_entry_id) > 0) { + s->max_deleted_entry_id = *id; + } + deleted++; + }; + } + + /* Update the stream's first ID. */ + if (deleted) { + if (s->length == 0) { + s->first_id.ms = 0; + s->first_id.seq = 0; + } else if (first_entry) { + streamGetEdgeID(s,1,1,&s->first_id); + } } /* Propagate the write if needed. */ @@ -3359,7 +3639,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) { } } - addReplyMapLen(c,full ? 6 : 7); + addReplyMapLen(c,full ? 9 : 10); addReplyBulkCString(c,"length"); addReplyLongLong(c,s->length); addReplyBulkCString(c,"radix-tree-keys"); @@ -3368,6 +3648,12 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) { addReplyLongLong(c,s->rax->numnodes); addReplyBulkCString(c,"last-generated-id"); addReplyStreamID(c,&s->last_id); + addReplyBulkCString(c,"max-deleted-entry-id"); + addReplyStreamID(c,&s->max_deleted_entry_id); + addReplyBulkCString(c,"entries-added"); + addReplyLongLong(c,s->entries_added); + addReplyBulkCString(c,"recorded-first-entry-id"); + addReplyStreamID(c,&s->first_id); if (!full) { /* XINFO STREAM <key> */ @@ -3406,7 +3692,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) { raxSeek(&ri_cgroups,"^",NULL,0); while(raxNext(&ri_cgroups)) { streamCG *cg = ri_cgroups.data; - addReplyMapLen(c,5); + addReplyMapLen(c,7); /* Name */ addReplyBulkCString(c,"name"); @@ -3416,6 +3702,18 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) { addReplyBulkCString(c,"last-delivered-id"); addReplyStreamID(c,&cg->last_id); + /* Read counter of the last delivered ID */ + addReplyBulkCString(c,"entries-read"); + if (cg->entries_read != SCG_INVALID_ENTRIES_READ) { + addReplyLongLong(c,cg->entries_read); + } else { + addReplyNull(c); + } + + /* Group lag */ + addReplyBulkCString(c,"lag"); + streamReplyWithCGLag(c,s,cg); + /* Group PEL count */ addReplyBulkCString(c,"pel-count"); addReplyLongLong(c,raxSize(cg->pel)); @@ -3593,7 +3891,7 @@ NULL raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { streamCG *cg = ri.data; - addReplyMapLen(c,4); + addReplyMapLen(c,6); addReplyBulkCString(c,"name"); addReplyBulkCBuffer(c,ri.key,ri.key_len); addReplyBulkCString(c,"consumers"); @@ -3602,6 +3900,14 @@ NULL addReplyLongLong(c,raxSize(cg->pel)); addReplyBulkCString(c,"last-delivered-id"); addReplyStreamID(c,&cg->last_id); + addReplyBulkCString(c,"entries-read"); + if (cg->entries_read != SCG_INVALID_ENTRIES_READ) { + addReplyLongLong(c,cg->entries_read); + } else { + addReplyNull(c); + } + addReplyBulkCString(c,"lag"); + streamReplyWithCGLag(c,s,cg); } raxStop(&ri); } else if (!strcasecmp(opt,"STREAM")) { |