summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordvora-h <67596500+dvora-h@users.noreply.github.com>2022-05-03 01:14:06 +0300
committerGitHub <noreply@github.com>2022-05-03 01:14:06 +0300
commite1988c6c04ea53bda32db60441e3475e8e6c8e9a (patch)
treeda49a140dd7a8abfdbb5b1572873bbfb236a542a
parentfdb9075745060e7a3633248fa6f419e895f010b7 (diff)
downloadredis-py-e1988c6c04ea53bda32db60441e3475e8e6c8e9a.tar.gz
Add support for redis 7 streams features (#2157)
* xadd * streams redis 7 * linters * test xinfo stream * test xinfo stream * test xclaim
-rw-r--r--redis/commands/core.py10
-rw-r--r--tests/test_commands.py55
2 files changed, 55 insertions, 10 deletions
diff --git a/redis/commands/core.py b/redis/commands/core.py
index ac1b6c7..8bbcda3 100644
--- a/redis/commands/core.py
+++ b/redis/commands/core.py
@@ -3505,6 +3505,7 @@ class StreamCommands(CommandsProtocol):
groupname: GroupT,
id: StreamIdT = "$",
mkstream: bool = False,
+ entries_read: Optional[int] = None,
) -> ResponseT:
"""
Create a new consumer group associated with a stream.
@@ -3517,6 +3518,9 @@ class StreamCommands(CommandsProtocol):
pieces: list[EncodableT] = ["XGROUP CREATE", name, groupname, id]
if mkstream:
pieces.append(b"MKSTREAM")
+ if entries_read is not None:
+ pieces.extend(["ENTRIESREAD", entries_read])
+
return self.execute_command(*pieces)
def xgroup_delconsumer(
@@ -3572,6 +3576,7 @@ class StreamCommands(CommandsProtocol):
name: KeyT,
groupname: GroupT,
id: StreamIdT,
+ entries_read: Optional[int] = None,
) -> ResponseT:
"""
Set the consumer group last delivered ID to something else.
@@ -3581,7 +3586,10 @@ class StreamCommands(CommandsProtocol):
For more information see https://redis.io/commands/xgroup-setid
"""
- return self.execute_command("XGROUP SETID", name, groupname, id)
+ pieces = [name, groupname, id]
+ if entries_read is not None:
+ pieces.extend(["ENTRIESREAD", entries_read])
+ return self.execute_command("XGROUP SETID", *pieces)
def xinfo_consumers(self, name: KeyT, groupname: GroupT) -> ResponseT:
"""
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 11c9939..5975412 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -3735,6 +3735,13 @@ class TestRedisCommands:
r.xadd(stream, {"foo": "bar"})
assert r.xadd(stream, {"foo": "bar"}, approximate=True, minid=m3)
+ @skip_if_server_version_lt("7.0.0")
+ def test_xadd_explicit_ms(self, r: redis.Redis):
+ stream = "stream"
+ message_id = r.xadd(stream, {"foo": "bar"}, "9999999999999999999-*")
+ ms = message_id[: message_id.index(b"-")]
+ assert ms == b"9999999999999999999"
+
@skip_if_server_version_lt("6.2.0")
def test_xautoclaim(self, r):
stream = "stream"
@@ -3820,7 +3827,7 @@ class TestRedisCommands:
== [message_id]
)
- @skip_if_server_version_lt("5.0.0")
+ @skip_if_server_version_lt("7.0.0")
def test_xclaim_trimmed(self, r):
# xclaim should not raise an exception if the item is not there
stream = "stream"
@@ -3841,9 +3848,8 @@ class TestRedisCommands:
# xclaim them from consumer2
# the item that is still in the stream should be returned
item = r.xclaim(stream, group, "consumer2", 0, [sid1, sid2])
- assert len(item) == 2
- assert item[0] == (None, None)
- assert item[1][0] == sid2
+ assert len(item) == 1
+ assert item[0][0] == sid2
@skip_if_server_version_lt("5.0.0")
def test_xdel(self, r):
@@ -3860,7 +3866,7 @@ class TestRedisCommands:
assert r.xdel(stream, m1) == 1
assert r.xdel(stream, m2, m3) == 2
- @skip_if_server_version_lt("5.0.0")
+ @skip_if_server_version_lt("7.0.0")
def test_xgroup_create(self, r):
# tests xgroup_create and xinfo_groups
stream = "stream"
@@ -3877,11 +3883,13 @@ class TestRedisCommands:
"consumers": 0,
"pending": 0,
"last-delivered-id": b"0-0",
+ "entries-read": None,
+ "lag": 1,
}
]
assert r.xinfo_groups(stream) == expected
- @skip_if_server_version_lt("5.0.0")
+ @skip_if_server_version_lt("7.0.0")
def test_xgroup_create_mkstream(self, r):
# tests xgroup_create and xinfo_groups
stream = "stream"
@@ -3901,6 +3909,30 @@ class TestRedisCommands:
"consumers": 0,
"pending": 0,
"last-delivered-id": b"0-0",
+ "entries-read": None,
+ "lag": 0,
+ }
+ ]
+ assert r.xinfo_groups(stream) == expected
+
+ @skip_if_server_version_lt("7.0.0")
+ def test_xgroup_create_entriesread(self, r: redis.Redis):
+ stream = "stream"
+ group = "group"
+ r.xadd(stream, {"foo": "bar"})
+
+ # no group is setup yet, no info to obtain
+ assert r.xinfo_groups(stream) == []
+
+ assert r.xgroup_create(stream, group, 0, entries_read=7)
+ expected = [
+ {
+ "name": group.encode(),
+ "consumers": 0,
+ "pending": 0,
+ "last-delivered-id": b"0-0",
+ "entries-read": 7,
+ "lag": -6,
}
]
assert r.xinfo_groups(stream) == expected
@@ -3951,7 +3983,7 @@ class TestRedisCommands:
r.xgroup_create(stream, group, 0)
assert r.xgroup_destroy(stream, group)
- @skip_if_server_version_lt("5.0.0")
+ @skip_if_server_version_lt("7.0.0")
def test_xgroup_setid(self, r):
stream = "stream"
group = "group"
@@ -3959,13 +3991,15 @@ class TestRedisCommands:
r.xgroup_create(stream, group, 0)
# advance the last_delivered_id to the message_id
- r.xgroup_setid(stream, group, message_id)
+ r.xgroup_setid(stream, group, message_id, entries_read=2)
expected = [
{
"name": group.encode(),
"consumers": 0,
"pending": 0,
"last-delivered-id": message_id,
+ "entries-read": 2,
+ "lag": -1,
}
]
assert r.xinfo_groups(stream) == expected
@@ -3995,7 +4029,7 @@ class TestRedisCommands:
assert isinstance(info[1].pop("idle"), int)
assert info == expected
- @skip_if_server_version_lt("5.0.0")
+ @skip_if_server_version_lt("7.0.0")
def test_xinfo_stream(self, r):
stream = "stream"
m1 = r.xadd(stream, {"foo": "bar"})
@@ -4005,6 +4039,9 @@ class TestRedisCommands:
assert info["length"] == 2
assert info["first-entry"] == get_stream_message(r, stream, m1)
assert info["last-entry"] == get_stream_message(r, stream, m2)
+ assert info["max-deleted-entry-id"] == b"0-0"
+ assert info["entries-added"] == 2
+ assert info["recorded-first-entry-id"] == m1
@skip_if_server_version_lt("6.0.0")
def test_xinfo_stream_full(self, r):