From e1988c6c04ea53bda32db60441e3475e8e6c8e9a Mon Sep 17 00:00:00 2001 From: dvora-h <67596500+dvora-h@users.noreply.github.com> Date: Tue, 3 May 2022 01:14:06 +0300 Subject: Add support for redis 7 streams features (#2157) * xadd * streams redis 7 * linters * test xinfo stream * test xinfo stream * test xclaim --- redis/commands/core.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'redis/commands/core.py') diff --git a/redis/commands/core.py b/redis/commands/core.py index ac1b6c7..8bbcda3 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -3505,6 +3505,7 @@ class StreamCommands(CommandsProtocol): groupname: GroupT, id: StreamIdT = "$", mkstream: bool = False, + entries_read: Optional[int] = None, ) -> ResponseT: """ Create a new consumer group associated with a stream. @@ -3517,6 +3518,9 @@ class StreamCommands(CommandsProtocol): pieces: list[EncodableT] = ["XGROUP CREATE", name, groupname, id] if mkstream: pieces.append(b"MKSTREAM") + if entries_read is not None: + pieces.extend(["ENTRIESREAD", entries_read]) + return self.execute_command(*pieces) def xgroup_delconsumer( @@ -3572,6 +3576,7 @@ class StreamCommands(CommandsProtocol): name: KeyT, groupname: GroupT, id: StreamIdT, + entries_read: Optional[int] = None, ) -> ResponseT: """ Set the consumer group last delivered ID to something else. @@ -3581,7 +3586,10 @@ class StreamCommands(CommandsProtocol): For more information see https://redis.io/commands/xgroup-setid """ - return self.execute_command("XGROUP SETID", name, groupname, id) + pieces = [name, groupname, id] + if entries_read is not None: + pieces.extend(["ENTRIESREAD", entries_read]) + return self.execute_command("XGROUP SETID", *pieces) def xinfo_consumers(self, name: KeyT, groupname: GroupT) -> ResponseT: """ -- cgit v1.2.1