summaryrefslogtreecommitdiff
path: root/docs/userguide/pools.rst
blob: 9cc6fb0023607a4ad68740bc6ee993b8df0b77a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
.. _guide-pools:

===============================
 Connection and Producer Pools
===============================

.. _default-pools:

Default Pools
=============

Kombu ships with two global pools: one connection pool,
and one producer pool.

These are convenient and the fact that they are global
may not be an issue as connections should often be limited
at the process level, rather than per thread/application
and so on, but if you need custom pools per thread
see :ref:`custom-pool-groups`.


.. _default-connections:

The connection pool group
-------------------------

The connection pools are available as :attr:`kombu.pools.connections`.
This is a pool group, which means you give it a connection instance,
and you get a pool instance back. We have one pool per connection
instance to support multiple connections in the same app.
All connection instances with the same connection parameters will
get the same pool:

.. code-block:: pycon

    >>> from kombu import Connection
    >>> from kombu.pools import connections

    >>> connections[Connection('redis://localhost:6379')]
    <kombu.connection.ConnectionPool object at 0x101805650>
    >>> connections[Connection('redis://localhost:6379')]
    <kombu.connection.ConnectionPool object at 0x101805650>

Let's acquire and release a connection:

.. code-block:: python

    from kombu import Connection
    from kombu.pools import connections

    connection = Connection('redis://localhost:6379')

    with connections[connection].acquire(block=True) as conn:
        print('Got connection: {0!r}'.format(connection.as_uri()))

.. note::

    The ``block=True`` here means that the acquire call will block
    until a connection is available in the pool.
    Note that this will block forever in case there is a deadlock
    in your code where a connection is not released. There
    is a ``timeout`` argument you can use to safeguard against this
    (see :meth:`kombu.connection.Resource.acquire`).

    If blocking is disabled and there aren't any connections
    left in the pool an :class:`kombu.exceptions.ConnectionLimitExceeded`
    exception will be raised.

That's about it. If you need to connect to multiple brokers
at once you can do that too:

.. code-block:: python

    from kombu import Connection
    from kombu.pools import connections

    c1 = Connection('amqp://')
    c2 = Connection('redis://')

    with connections[c1].acquire(block=True) as conn1:
        with connections[c2].acquire(block=True) as conn2:
            # ....

.. _default-producers:

The producer pool group
=======================

This is a pool group just like the connections, except
that it manages :class:`~kombu.Producer` instances
used to publish messages.

Here is an example using the producer pool to publish a message
to the ``news`` exchange:

.. code-block:: python

    from kombu import Connection, Exchange
    from kombu.pools import producers

    # The exchange we send our news articles to.
    news_exchange = Exchange('news')

    # The article we want to send
    article = {'title': 'No cellular coverage on the tube for 2012',
               'ingress': 'yadda yadda yadda'}

    # The broker where our exchange is.
    connection = Connection('amqp://guest:guest@localhost:5672//')

    with producers[connection].acquire(block=True) as producer:
        producer.publish(
            article,
            exchange=news_exchange,
            routing_key='domestic',
            declare=[news_exchange],
            serializer='json',
            compression='zlib')

.. _default-pool-limits:

Setting pool limits
-------------------

By default every connection instance has a limit of 200 connections.
You can change this limit using :func:`kombu.pools.set_limit`.
You are able to grow the pool at runtime, but you can't shrink it,
so it is best to set the limit as early as possible after your application
starts:

.. code-block:: pycon

    >>> from kombu import pools
    >>> pools.set_limit()

Resetting all pools
-------------------

You can close all active connections and reset all pool groups by
using the :func:`kombu.pools.reset` function. Note that this
will not respect anything currently using these connections,
so will just drag the connections away from under their feet:
you should be very careful before you use this.

Kombu will reset the pools if the process is forked,
so that forked processes start with clean pool groups.

.. _custom-pool-groups:

Custom Pool Groups
==================

To maintain your own pool groups you should create your own
:class:`~kombu.pools.Connections` and :class:`kombu.pools.Producers`
instances:

.. code-block:: python

    from kombu import pools
    from kombu import Connection

    connections = pools.Connections(limit=100)
    producers = pools.Producers(limit=connections.limit)

    connection = Connection('amqp://guest:guest@localhost:5672//')

    with connections[connection].acquire(block=True):
        # ...


If you want to use the global limit that can be set with
:func:`~kombu.pools.set_limit` you can use a special value as the ``limit``
argument:

.. code-block:: python

    from kombu import pools

    connections = pools.Connections(limit=pools.use_default_limit)