author Oleksandr Shulgin <oleksandr.shulgin@zalando.de> 2015-10-20 12:36:13 +0200
committer Oleksandr Shulgin <oleksandr.shulgin@zalando.de> 2015-10-20 12:36:13 +0200
commit 23abe4f501ce60468e9e6b089910068265342368
tree ffc981cb8198d8ff90529635afc4bbf1f1ea0e6e /doc/src
parent 0bb81fc84811134bca70b59daa4661bd0697f2ff
Add quick start to the replication doc, minor doc fixes.
Diffstat (limited to 'doc/src')
-rw-r--r-- doc/src/extras.rst 248
1 files changed, 159 insertions, 89 deletions
diff --git a/doc/src/extras.rst b/doc/src/extras.rst
index 9384a96..2a7bed2 100644
--- a/doc/src/extras.rst
+++ b/doc/src/extras.rst
@@ -141,8 +141,81 @@ Logging cursor
.. autoclass:: MinTimeLoggingCursor
-Replication cursor
-^^^^^^^^^^^^^^^^^^
+Replication protocol support
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Modern PostgreSQL servers (version 9.0 and above) support replication. The
+replication protocol is built on top of the client-server protocol and can be
+operated using ``libpq``, as such it can be also operated by ``psycopg2``.
+The replication protocol can be operated on both synchronous and
+:ref:`asynchronous <async-support>` connections.
+
+Server version 9.4 adds a new feature called *Logical Replication*.
+
+.. seealso::
+
+ - PostgreSQL `Streaming Replication Protocol`__
+
+ .. __: http://www.postgresql.org/docs/current/static/protocol-replication.html
+
+
+Logical replication Quick-Start
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You must be using PostgreSQL server version 9.4 or above to run this quick
+start.
+
+Make sure that replication connections are permitted for user ``postgres`` in
+``pg_hba.conf`` and reload the server configuration. You also need to set
+``wal_level=logical`` and ``max_wal_senders``, ``max_replication_slots`` to
+value greater than zero in ``postgresql.conf`` (these changes require a server
+restart). Create a database ``psycopg2test``.
+
+Then run the following code to quickly try the replication support out. This
+is not production code -- it has no error handling, it sends feedback too
+often, etc. -- and it's only intended as a simple demo of logical
+replication::
+
+ from __future__ import print_function
+ import sys
+ import psycopg2
+ import psycopg2.extras
+
+ conn = psycopg2.connect('dbname=psycopg2test user=postgres',
+ connection_factory=psycopg2.extras.LogicalReplicationConnection)
+ cur = conn.cursor()
+ try:
+ cur.start_replication(slot_name='pytest')
+ except psycopg2.ProgrammingError:
+ cur.create_replication_slot('pytest', output_plugin='test_decoding')
+ cur.start_replication(slot_name='pytest')
+
+ class DemoConsumer(object):
+ def __call__(self, msg):
+ print(msg.payload)
+ msg.cursor.send_feedback(flush_lsn=msg.data_start)
+
+ democonsumer = DemoConsumer()
+
+ print("Starting streaming, press Control-C to end...", file=sys.stderr)
+ try:
+ cur.consume_stream(democonsumer)
+ except KeyboardInterrupt:
+ cur.close()
+ conn.close()
+ print("The slot 'pytest' still exists. Drop it with SELECT pg_drop_replication_slot('pytest'); if no longer needed.", file=sys.stderr)
+ print("WARNING: Transaction logs will accumulate in pg_xlog until the slot is dropped.", file=sys.stderr)
+
+
+You can now make changes to the ``psycopg2test`` database using a normal
+psycopg2 session, ``psql``, etc. and see the logical decoding stream printed
+by this demo client.
+
+This will continue running until terminated with ``Control-C``.
+
+
+Replication connection and cursor classes
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autoclass:: ReplicationConnectionBase
@@ -177,17 +250,11 @@ The following replication types are defined:
phys_cur = phys_conn.cursor()
Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use
- `ReplicationCursor` for actual communication on the connection.
-
-.. seealso::
-
- - PostgreSQL `Streaming Replication Protocol`__
-
- .. __: http://www.postgresql.org/docs/current/static/protocol-replication.html
+ `ReplicationCursor` for actual communication with the server.
-The individual messages in the replication stream are presented by
-`ReplicationMessage` objects:
+The individual messages in the replication stream are represented by
+`ReplicationMessage` objects (both logical and physical type):
.. autoclass:: ReplicationMessage
@@ -249,7 +316,7 @@ The individual messages in the replication stream are presented by
replication slot is created by default. No output plugin parameter is
required or allowed when creating a physical replication slot.
- In either case, the type of slot being created can be specified
+ In either case the type of slot being created can be specified
explicitly using *slot_type* parameter.
Replication slots are a feature of PostgreSQL server starting with
@@ -295,25 +362,25 @@ The individual messages in the replication stream are presented by
replication can be used with both types of connection.
On the other hand, physical replication doesn't require a named
- replication slot to be used, only logical one does. In any case,
- logical replication and replication slots are a feature of PostgreSQL
- server starting with version 9.4. Physical replication can be used
- starting with 9.0.
+ replication slot to be used, only logical replication does. In any
+ case logical replication and replication slots are a feature of
+ PostgreSQL server starting with version 9.4. Physical replication can
+ be used starting with 9.0.
If *start_lsn* is specified, the requested stream will start from that
- LSN. The default is `!None`, which passes the LSN ``0/0``, causing
- replay to begin at the last point at which the server got replay
- confirmation from the client for, or the oldest available point for a
- new slot.
+ LSN. The default is `!None` which passes the LSN ``0/0`` causing
+ replay to begin at the last point for which the server got flush
+ confirmation from the client, or the oldest available point for a new
+ slot.
The server might produce an error if a WAL file for the given LSN has
- already been recycled, or it may silently start streaming from a later
+ already been recycled or it may silently start streaming from a later
position: the client can verify the actual position using information
- provided the `ReplicationMessage` attributes. The exact server
+ provided by the `ReplicationMessage` attributes. The exact server
behavior depends on the type of replication and use of slots.
- A *timeline* parameter can only be specified with physical replication
- and only starting with server version 9.3.
+ The *timeline* parameter can only be specified with physical
+ replication and only starting with server version 9.3.
A dictionary of *options* may be passed to the logical decoding plugin
on a logical replication slot. The set of supported options depends
@@ -324,8 +391,9 @@ The individual messages in the replication stream are presented by
`start_replication_expert()` internally.
After starting the replication, to actually consume the incoming
- server messages, use `consume_stream()` or implement a loop around
- `read_message()` in case of asynchronous connection.
+ server messages use `consume_stream()` or implement a loop around
+ `read_message()` in case of :ref:`asynchronous connection
+ <async-support>`.
.. method:: start_replication_expert(command)
@@ -343,66 +411,66 @@ The individual messages in the replication stream are presented by
This method can only be used with synchronous connection. For
asynchronous connections see `read_message()`.
- Before calling this method to consume the stream, use
+ Before calling this method to consume the stream use
`start_replication()` first.
- When called, this method enters an endless loop, reading messages from
- the server and passing them to ``consume()``, then waiting for more
- messages from the server. In order to make this method break out of
- the loop and return, ``consume()`` can throw a `StopReplication`
- exception (any unhandled exception will make it break out of the loop
- as well).
+ This method enters an endless loop reading messages from the server
+ and passing them to ``consume()``, then waiting for more messages from
+ the server. In order to make this method break out of the loop and
+ return, ``consume()`` can throw a `StopReplication` exception. Any
+ unhandled exception will make it break out of the loop as well.
- If *decode* is set to `!True`, the messages read from the server are
+ If *decode* is set to `!True` the messages read from the server are
converted according to the connection `~connection.encoding`. This
parameter should not be set with physical replication.
- This method also sends keepalive messages to the server, in case there
+ This method also sends keepalive messages to the server in case there
were no new data from the server for the duration of
*keepalive_interval* (in seconds). The value of this parameter must
- be equal to at least 1 second, but it can have a fractional part.
+ be set to at least 1 second, but it can have a fractional part.
+
+ The *msg* objects passed to ``consume()`` are instances of
+ `ReplicationMessage` class.
+
+ After processing certain amount of messages the client should send a
+ confirmation message to the server. This should be done by calling
+ `send_feedback()` method on the corresponding replication cursor. A
+ reference to the cursor is provided in the `ReplicationMessage` as an
+ attribute.
The following example is a sketch implementation of ``consume()``
callable for logical replication::
class LogicalStreamConsumer(object):
+ ...
+
def __call__(self, msg):
- self.store_message_data(msg.payload)
+ self.process_message(msg.payload)
- if self.should_report_to_the_server_now(msg):
+ if self.should_send_feedback(msg):
msg.cursor.send_feedback(flush_lsn=msg.data_start)
consumer = LogicalStreamConsumer()
cur.consume_stream(consumer, decode=True)
- The *msg* objects passed to ``consume()`` are instances of
- `ReplicationMessage` class.
-
- After storing certain amount of messages' data reliably, the client
- should send a confirmation message to the server. This should be done
- by calling `send_feedback()` method on the corresponding replication
- cursor. A reference to the cursor is provided in the
- `ReplicationMessage` as an attribute.
-
.. warning::
- When using replication with slots, failure to properly notify the
- server by constantly consuming and reporting success at
- appropriate times can eventually lead to "disk full" condition on
- the server, because the server retains all the WAL segments that
- might be needed to stream the changes via all of the currently
- open replication slots.
+ When using replication with slots, failure to constantly consume
+ *and* report success to the server appropriately can eventually
+ lead to "disk full" condition on the server, because the server
+ retains all the WAL segments that might be needed to stream the
+ changes via all of the currently open replication slots.
- On the other hand, it is not recommended to send a confirmation
- after every processed message, since that will put an unnecessary
- load on network and the server. A possible strategy is to confirm
- after every COMMIT message.
+ On the other hand, it is not recommended to send confirmation
+ after *every* processed message, since that will put an
+ unnecessary load on network and the server. A possible strategy
+ is to confirm after every COMMIT message.
.. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
:param write_lsn: a LSN position up to which the client has written the data locally
- :param flush_lsn: a LSN position up to which the client has stored the
+ :param flush_lsn: a LSN position up to which the client has processed the
data reliably (the server is allowed to discard all
and every data that predates this LSN)
:param apply_lsn: a LSN position up to which the warm standby server
@@ -411,7 +479,7 @@ The individual messages in the replication stream are presented by
:param reply: request the server to send back a keepalive message immediately
Use this method to report to the server that all messages up to a
- certain LSN position have been stored on the client and may be
+ certain LSN position have been processed on the client and may be
discarded on the server.
This method can also be called with all default parameters' values to
@@ -433,13 +501,14 @@ The individual messages in the replication stream are presented by
Returns `!True` if the feedback message was sent successfully,
`!False` otherwise.
- Low-level methods for asynchronous connection operation.
+ Low-level replication cursor methods for :ref:`asynchronous connection
+ <async-support>` operation.
- With the synchronous connection, a call to `consume_stream()` handles all
+ With the synchronous connection a call to `consume_stream()` handles all
the complexity of handling the incoming messages and sending keepalive
replies, but at times it might be beneficial to use low-level interface
- for better control, in particular to `~select.select()` on multiple
- sockets. The following methods are provided for asynchronous operation:
+ for better control, in particular to `~select` on multiple sockets. The
+ following methods are provided for asynchronous operation:
.. method:: read_message(decode=True)
@@ -449,16 +518,16 @@ The individual messages in the replication stream are presented by
This method should be used in a loop with asynchronous connections
after calling `start_replication()` once.
- It tries to read the next message from the server, without blocking
- and returns an instance of `ReplicationMessage` or `!None`, in case
- there are no more data messages from the server at the moment.
+ It tries to read the next message from the server without blocking and
+ returns an instance of `ReplicationMessage` or `!None`, in case there
+ are no more data messages from the server at the moment.
It is expected that the calling code will call this method repeatedly
- in order to consume all of the messages that might have been buffered,
- until `!None` is returned. After receiving a `!None` value from this
- method, the caller should use `~select.select()` or `~select.poll()`
- on the corresponding connection to block the process until there is
- more data from the server.
+ in order to consume all of the messages that might have been buffered
+ until `!None` is returned. After receiving `!None` from this method
+ the caller should use `~select.select()` or `~select.poll()` on the
+ corresponding connection to block the process until there is more data
+ from the server.
The server can send keepalive messages to the client periodically.
Such messages are silently consumed by this method and are never
@@ -480,24 +549,25 @@ The individual messages in the replication stream are presented by
An actual example of asynchronous operation might look like this::
- def consume(msg):
- ...
-
- keepalive_interval = 10.0
- while True:
- msg = cur.read_message()
- if msg:
- consume(msg)
- else:
- now = datetime.now()
- timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds()
- if timeout > 0:
- sel = select.select([cur], [], [], timeout)
- else:
- sel = ([], [], [])
-
- if not sel[0]:
- cur.send_feedback()
+ def consume(msg):
+ ...
+
+ keepalive_interval = 10.0
+ while True:
+ msg = cur.read_message()
+ if msg:
+ consume(msg)
+ else:
+ now = datetime.now()
+ timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds()
+ if timeout > 0:
+ sel = select.select([cur], [], [], timeout)
+ else:
+ sel = ([], [], [])
+
+ if not sel[0]:
+ # timed out, send keepalive message
+ cur.send_feedback()
.. index::
pair: Cursor; Replication
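
Note (not part of the commit): the warning added in this patch suggests confirming after every COMMIT message rather than after every message. The following is a minimal sketch of that strategy, not the library's own implementation. It assumes the ``test_decoding`` output plugin and ``decode=True``, so that each payload is text and a transaction ends with a line starting with ``COMMIT``. The names ``CommitFeedbackConsumer``, ``process`` and ``FakeCursor`` are hypothetical, and the stand-in objects only mimic the ``payload``, ``data_start`` and ``cursor`` attributes described in the patch so the logic can be tried without a server (Python 3 is assumed).

```python
from types import SimpleNamespace


class CommitFeedbackConsumer(object):
    """Send feedback to the server only after COMMIT messages.

    Assumption: payloads come from the test_decoding plugin as text,
    where the last message of a transaction starts with "COMMIT".
    """

    def __init__(self, process=print):
        # `process` is a hypothetical callback that handles one payload.
        self.process = process

    def __call__(self, msg):
        self.process(msg.payload)
        if msg.payload.startswith("COMMIT"):
            # The whole transaction has been processed, so confirm it.
            msg.cursor.send_feedback(flush_lsn=msg.data_start)


class FakeCursor(object):
    """Stand-in for a replication cursor; records confirmed LSNs."""

    def __init__(self):
        self.confirmed = []

    def send_feedback(self, flush_lsn=0):
        self.confirmed.append(flush_lsn)


# Offline check with made-up messages and LSN values.
fake_cur = FakeCursor()
seen = []
consumer = CommitFeedbackConsumer(process=seen.append)
for lsn, payload in [(100, "BEGIN 693"),
                     (110, "table public.t: INSERT: id[integer]:1"),
                     (120, "COMMIT 693")]:
    consumer(SimpleNamespace(payload=payload, data_start=lsn, cursor=fake_cur))
```

With a real replication cursor the same object would be passed as in the patch's own example, ``cur.consume_stream(consumer, decode=True)``. Whether confirming per transaction is frequent enough depends on the workload, since very long transactions delay feedback.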