summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-06-30 16:17:31 +0200
committerOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-06-30 16:17:31 +0200
commit318706f28c07444c1a73a3022eab2018ec73817c (patch)
tree1e51d69ea110781d0c8cfc7af453245674362909
parent058db5643011713390ce28fc50b1d6acfde2404f (diff)
downloadpsycopg2-318706f28c07444c1a73a3022eab2018ec73817c.tar.gz
Update docs for Replication protocol
-rw-r--r--doc/src/extras.rst199
1 files changed, 165 insertions, 34 deletions
diff --git a/doc/src/extras.rst b/doc/src/extras.rst
index 9bc302e..7cca840 100644
--- a/doc/src/extras.rst
+++ b/doc/src/extras.rst
@@ -165,8 +165,8 @@ Replication cursor
.. method:: identify_system()
- Get information about the cluster status in form of a dict with
- ``systemid``, ``timeline``, ``xlogpos`` and ``dbname`` as keys.
+ This method executes ``IDENTIFY_SYSTEM`` command of the streaming
+ replication protocol and returns a result as a dictionary.
Example::
@@ -197,65 +197,196 @@ Replication cursor
cur.drop_replication_slot("testslot")
- .. method:: start_replication(file, slot_type, slot_name=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None)
+ .. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None)
Start and consume replication stream.
- :param file: a file-like object to write replication stream messages to
:param slot_type: type of replication: either `REPLICATION_PHYSICAL` or
`REPLICATION_LOGICAL`
:param slot_name: name of the replication slot to use (required for
logical replication)
+ :param writer: a file-like object to write replication messages to
:param start_lsn: the point in replication stream (WAL position) to start
from, in the form ``XXX/XXX`` (forward-slash separated
pair of hexadecimals)
:param timeline: WAL history timeline to start streaming from (optional,
can only be used with physical replication)
:param keepalive_interval: interval (in seconds) to send keepalive
- messages to the server, in case there was no
- communication during that period of time
+ messages to the server
:param options: an dictionary of options to pass to logical replication
slot
- The ``keepalive_interval`` must be greater than zero.
+ With non-asynchronous connection, this method enters an endless loop,
+ reading messages from the server and passing them to ``write()`` method
+ of the *writer* object. This is similar to operation of the
+ `~cursor.copy_to()` method. It also sends keepalive messages to the
+ server, in case there were no new data from it for the duration of
+ *keepalive_interval* seconds (this parameter must be greater than 1
+ second, but it can have a fractional part).
- This method never returns unless an error message is sent from the
- server, or the server closes connection, or there is an exception in the
- ``write()`` method of the ``file`` object.
+ With asynchronous connection, this method returns immediately and the
+ calling code can start reading the replication messages in a loop.
- One can even use ``sys.stdout`` as the destination (this is only good for
- testing purposes, however)::
+ A sketch implementation of the *writer* object might look similar to
+ the following::
- >>> cur.start_replication(sys.stdout, "testslot")
- ...
+ from io import TextIOBase
- This method acts much like the `~cursor.copy_to()` with an important
- distinction that ``write()`` method return value is dirving the
- server-side replication cursor. In order to report to the server that
- the all the messages up to the current one have been stored reliably, one
- should return true value (i.e. something that satisfies ``if retval:``
- conidtion) from the ``write`` callback::
+ class ReplicationStreamWriter(TextIOBase):
- class ReplicationStreamWriter(object):
def write(self, msg):
- if store_message_reliably(msg):
- return True
+ self.store_data_reliably(msg)
- cur.start_replication(writer, "testslot")
- ...
+ if self.should_report_to_the_server(msg):
+ msg.cursor.send_replication_feedback(flush_lsn=msg.wal_end)
+
+ def store_data_reliably(self, msg):
+ ...
+
+ def shoud_report_to_the_server(self, msg):
+ ...
+
+ First, like with the `~cursor.copy_to()` method, the code that is
+ calling the provided write method checks if the *writer* object is
+ inherited from `~io.TextIOBase`. If that is the case, the message
+ payload to be passed is converted to unicode using the connection's
+ encoding information. Otherwise, the message is passed as is.
+
+ The *msg* object being passed is an instance of `~ReplicationMessage`
+ class.
+
+ After storing the data passed in the message object, the writer object
+ should consider sending a confirmation message to the server. This is
+ done by calling `~send_replication_feedback()` method on the
+ corresponding replication cursor. A reference to the cursor producing
+ a given message is provided in the `~ReplicationMessage` as an
+ attribute.
.. note::
- One needs to be aware that failure to update the server-side cursor
- on any one replication slot properly by constantly consuming and
- reporting success to the server can eventually lead to "disk full"
- condition on the server, because the server retains all the WAL
- segments that might be needed to stream the changes via currently
- open replication slots.
+ One needs to be aware that failure to properly notify the server on
+ any one replication slot by constantly consuming and reporting
+ success to the server at appropriate times can eventually lead to
+ "disk full" condition on the server, because the server retains all
+ the WAL segments that might be needed to stream the changes via
+ currently open replication slots.
+
+ .. method:: stop_replication()
+
+ In non-asynchronous connection, when called from the ``write()``
+ method tells the code in `~start_replication` to break out of the
+ endless loop and return.
+
+ .. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
+
+ :param write_lsn: a LSN position up to which the client has written the data locally
+ :param flush_lsn: a LSN position up to which the client has stored the
+ data reliably (the server is allowed to discard all
+ and every data that predates this LSN)
+ :param apply_lsn: a LSN position up to which the warm standby server
+ has applied the changes (physical replication
+ master-slave protocol only)
+ :param reply: request the server to send back a keepalive message immediately
+
+ Use this method to report to the server that all messages up to a
+ certain LSN position have been stored and may be discarded.
+
+ This method can also be called with default parameters to send a
+ keepalive message to the server.
+
+ In case the message cannot be sent at the moment, remembers the
+ positions for a later successful call or call to
+ `~flush_replication_feedback()`.
+
+ .. method:: flush_replication_feedback(reply=False)
+
+ :param reply: request the server to send back a keepalive message immediately
+
+ This method tries to flush the latest replication feedback message
+ that `~send_replication_feedback()` was trying to send, if any.
+
+ Low-level methods for asynchronous connection operation.
+
+ While with the non-asynchronous connection, a single call to
+ `~start_replication()` handles all the complexity, at times it might be
+ beneficial to use low-level interface for better control, in particular to
+ `~select.select()` on multiple sockets. The following methods are
+ provided for asynchronous operation:
+
+ .. method:: read_replication_message(decode=True)
+
+ :param decode: a flag indicating that unicode conversion should be
+ performed on the data received from the server
+
+ This method should be used in a loop with asynchronous connections
+ after calling `~start_replication()`.
+
+ It tries to read the next message from the server, without blocking
+ and returns an instance of `~ReplicationMessage` or *None*, in case
+ there are no more data messages from the server at the moment. After
+ receiving a *None* value from this method, one should use a
+ `~select.select()` or `~select.poll()` on the corresponding connection
+ to block the process until there is more data from the server.
+
+ The server can send keepalive messages to the client periodically.
+ Such messages are silently consumed by this method and are never
+ reported to the caller.
+
+ .. method:: fileno()
+
+ Calls the corresponding connection's `~connection.fileno()` method
+ and returns the result.
+
+ This is a convenience method which allows replication cursor to be
+ used directly in `~select.select()` or `~select.poll()` calls.
+
+ .. attribute:: replication_io_timestamp
+
+ A `~datetime` object representing the timestamp at the moment of last
+ communication with the server (a data or keepalive message in either
+ direction).
+
+ An actual example of asynchronous operation might look like this::
+
+ keepalive_interval = 10.0
+ while True:
+ if (datetime.now() - cur.replication_io_timestamp).total_seconds() >= keepalive_interval:
+ cur.send_replication_feedback()
+
+ while True:
+ msg = cur.read_replication_message()
+ if not msg:
+ break
+ writer.write(msg)
+
+ timeout = keepalive_interval - (datetime.now() - cur.replication_io_timestamp).total_seconds()
+ if timeout > 0:
+ select.select([cur], [], [], timeout)
+
+.. autoclass:: ReplicationMessage
+
+ .. attribute:: payload
+
+ The actual data received from the server. An instance of either
+ ``str`` or ``unicode``.
+
+ .. attribute:: data_start
+
+ LSN position of the start of the message.
+
+ .. attribute:: wal_end
+
+ LSN position of the end of the message.
+
+ .. attribute:: send_time
+
+ A `~datetime` object representing the server timestamp at the moment
+ when the message was sent.
+
+ .. attribute:: cursor
+
+ A reference to the corresponding `~ReplicationCursor` object.
- Drop any open replication slots that are no longer being used. The
- list of open slots can be obtained by running a query like ``SELECT *
- FROM pg_replication_slots``.
.. data:: REPLICATION_PHYSICAL