diff options
author | Utkarsh Gupta <utkarshgupta137@gmail.com> | 2022-06-01 17:15:50 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-01 14:45:50 +0300 |
commit | 05fc203f68c24fbd54c7b338b4610fa62972c326 (patch) | |
tree | 4550f62dc9c2ce6d2822b9bbaeaf227cb82ad75b /benchmarks | |
parent | 7880460b72aca49aa5b9512f0995c0d17d884a7d (diff) | |
download | redis-py-05fc203f68c24fbd54c7b338b4610fa62972c326.tar.gz |
async_cluster: optimisations (#2205)
- return true from execute_pipeline if there are any errors
- use todo list to speedup retries
- store initialisation node in CommandsParser object
- add sync context manager for pipeline
- use if/else instead of try/except
- make command a function argument in _determine_nodes & _determine_slot
- add async cluster pipeline benchmark script
Diffstat (limited to 'benchmarks')
-rw-r--r-- | benchmarks/cluster_async.py | 4 | ||||
-rw-r--r-- | benchmarks/cluster_async_pipeline.py | 107 |
2 files changed, 109 insertions, 2 deletions
diff --git a/benchmarks/cluster_async.py b/benchmarks/cluster_async.py index fd2ab46..17dd52b 100644 --- a/benchmarks/cluster_async.py +++ b/benchmarks/cluster_async.py @@ -249,8 +249,8 @@ if __name__ == "__main__": port = 16379 password = None - count = 1000 - size = 16 + count = 10000 + size = 256 asyncio.run(main("asyncio")) asyncio.run(main("asyncio", gather=False)) diff --git a/benchmarks/cluster_async_pipeline.py b/benchmarks/cluster_async_pipeline.py new file mode 100644 index 0000000..af45b44 --- /dev/null +++ b/benchmarks/cluster_async_pipeline.py @@ -0,0 +1,107 @@ +import asyncio +import functools +import time + +import aioredis_cluster +import aredis +import uvloop + +import redis.asyncio as redispy + + +def timer(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + tic = time.perf_counter() + await func(*args, **kwargs) + toc = time.perf_counter() + return f"{toc - tic:.4f}" + + return wrapper + + +@timer +async def warmup(client): + await asyncio.gather( + *(asyncio.create_task(client.exists(f"bench:warmup_{i}")) for i in range(100)) + ) + + +@timer +async def run(client): + data_str = "a" * size + data_int = int("1" * size) + + for i in range(count): + with client.pipeline() as pipe: + await ( + pipe.set(f"bench:str_{i}", data_str) + .set(f"bench:int_{i}", data_int) + .get(f"bench:str_{i}") + .get(f"bench:int_{i}") + .hset("bench:hset", str(i), data_str) + .hget("bench:hset", str(i)) + .incr("bench:incr") + .lpush("bench:lpush", data_int) + .lrange("bench:lpush", 0, 300) + .lpop("bench:lpush") + .execute() + ) + + +async def main(loop): + arc = aredis.StrictRedisCluster( + host=host, + port=port, + password=password, + max_connections=2**31, + max_connections_per_node=2**31, + readonly=False, + reinitialize_steps=count, + skip_full_coverage_check=True, + decode_responses=False, + max_idle_time=count, + idle_check_interval=count, + ) + print(f"{loop} {await warmup(arc)} aredis") + print(await run(arc)) + arc.connection_pool.disconnect() + + aiorc = await aioredis_cluster.create_redis_cluster( + [(host, port)], + password=password, + state_reload_interval=count, + idle_connection_timeout=count, + pool_maxsize=2**31, + ) + print(f"{loop} {await warmup(aiorc)} aioredis-cluster") + print(await run(aiorc)) + aiorc.close() + await aiorc.wait_closed() + + async with redispy.RedisCluster( + host=host, + port=port, + password=password, + reinitialize_steps=count, + read_from_replicas=False, + decode_responses=False, + max_connections=2**31, + ) as rca: + print(f"{loop} {await warmup(rca)} redispy") + print(await run(rca)) + + +if __name__ == "__main__": + host = "localhost" + port = 16379 + password = None + + count = 10000 + size = 256 + + asyncio.run(main("asyncio")) + + uvloop.install() + + asyncio.run(main("uvloop")) |