1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
from __future__ import with_statement
import os
import pytest
import redis
import time
from threading import Thread
from redis._compat import Queue
class DummyConnection(object):
def __init__(self, **kwargs):
self.kwargs = kwargs
self.pid = os.getpid()
class TestConnectionPoolCase(object):
def get_pool(self, connection_info=None, max_connections=None):
connection_info = connection_info or {'a': 1, 'b': 2, 'c': 3}
pool = redis.ConnectionPool(
connection_class=DummyConnection, max_connections=max_connections,
**connection_info)
return pool
def test_connection_creation(self):
connection_info = {'foo': 'bar', 'biz': 'baz'}
pool = self.get_pool(connection_info=connection_info)
connection = pool.get_connection('_')
assert connection.kwargs == connection_info
def test_multiple_connections(self):
pool = self.get_pool()
c1 = pool.get_connection('_')
c2 = pool.get_connection('_')
assert c1 != c2
def test_max_connections(self):
pool = self.get_pool(max_connections=2)
pool.get_connection('_')
pool.get_connection('_')
with pytest.raises(redis.ConnectionError):
pool.get_connection('_')
def test_reuse_previously_released_connection(self):
pool = self.get_pool()
c1 = pool.get_connection('_')
pool.release(c1)
c2 = pool.get_connection('_')
assert c1 == c2
class TestBlockingConnectionPool(object):
def get_pool(self, connection_info=None, max_connections=10, timeout=20):
connection_info = connection_info or {'a': 1, 'b': 2, 'c': 3}
pool = redis.BlockingConnectionPool(connection_class=DummyConnection,
max_connections=max_connections,
timeout=timeout, **connection_info)
return pool
def test_connection_creation(self):
connection_info = {'foo': 'bar', 'biz': 'baz'}
pool = self.get_pool(connection_info=connection_info)
connection = pool.get_connection('_')
assert connection.kwargs == connection_info
def test_multiple_connections(self):
pool = self.get_pool()
c1 = pool.get_connection('_')
c2 = pool.get_connection('_')
assert c1 != c2
def test_max_connections_blocks(self):
"""Getting a connection should block for until available."""
q = Queue()
q.put_nowait('Not yet got')
pool = self.get_pool(max_connections=2, timeout=5)
c1 = pool.get_connection('_')
pool.get_connection('_')
target = lambda: q.put_nowait(pool.get_connection('_'))
Thread(target=target).start()
# Blocks while non available.
time.sleep(0.05)
c3 = q.get_nowait()
assert c3 == 'Not yet got'
# Then got when available.
pool.release(c1)
time.sleep(0.05)
c3 = q.get_nowait()
assert c1 == c3
def test_max_connections_timeout(self):
"""Getting a connection raises ``ConnectionError`` after timeout."""
pool = self.get_pool(max_connections=2, timeout=0.1)
pool.get_connection('_')
pool.get_connection('_')
with pytest.raises(redis.ConnectionError):
pool.get_connection('_')
def test_reuse_previously_released_connection(self):
pool = self.get_pool()
c1 = pool.get_connection('_')
pool.release(c1)
c2 = pool.get_connection('_')
assert c1 == c2
|