summaryrefslogtreecommitdiff
path: root/doc
diff options
context:
space:
mode:
authorOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-07-01 14:08:32 +0200
committerOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-07-01 14:08:32 +0200
commit9386653d721229eae3f9e691a93d711575d2e5c6 (patch)
tree2035067ff82a85df684e9ee2d9a3ff717a7d4050 /doc
parent0d731aa12e6d9a59e61cebe9c0a7d71025f000f8 (diff)
downloadpsycopg2-9386653d721229eae3f9e691a93d711575d2e5c6.tar.gz
Update docs on ReplicationCursor
Diffstat (limited to 'doc')
-rw-r--r--doc/src/extras.rst117
1 files changed, 60 insertions, 57 deletions
diff --git a/doc/src/extras.rst b/doc/src/extras.rst
index 7cca840..19c8152 100644
--- a/doc/src/extras.rst
+++ b/doc/src/extras.rst
@@ -165,12 +165,12 @@ Replication cursor
.. method:: identify_system()
- This method executes ``IDENTIFY_SYSTEM`` command of the streaming
- replication protocol and returns a result as a dictionary.
+ Execute ``IDENTIFY_SYSTEM`` command of the streaming replication
+ protocol and return the result as a dictionary.
Example::
- >>> print cur.identify_system()
+ >>> cur.identify_system()
{'timeline': 1, 'systemid': '1234567890123456789', 'dbname': 'test', 'xlogpos': '0/1ABCDEF'}
.. method:: create_replication_slot(slot_type, slot_name, output_plugin=None)
@@ -199,82 +199,81 @@ Replication cursor
.. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None)
- Start and consume replication stream.
+ Start a replication stream. On non-asynchronous connection, also
+ consume the stream messages.
:param slot_type: type of replication: either `REPLICATION_PHYSICAL` or
`REPLICATION_LOGICAL`
:param slot_name: name of the replication slot to use (required for
logical replication)
:param writer: a file-like object to write replication messages to
- :param start_lsn: the point in replication stream (WAL position) to start
- from, in the form ``XXX/XXX`` (forward-slash separated
- pair of hexadecimals)
+ :param start_lsn: the LSN position to start from, in the form
+ ``XXX/XXX`` (forward-slash separated pair of
+ hexadecimals)
:param timeline: WAL history timeline to start streaming from (optional,
can only be used with physical replication)
:param keepalive_interval: interval (in seconds) to send keepalive
messages to the server
- :param options: an dictionary of options to pass to logical replication
+ :param options: a dictionary of options to pass to logical replication
slot
- With non-asynchronous connection, this method enters an endless loop,
- reading messages from the server and passing them to ``write()`` method
- of the *writer* object. This is similar to operation of the
+ When used on non-asynchronous connection this method enters an endless
+ loop, reading messages from the server and passing them to ``write()``
+ method of the *writer* object. This is similar to operation of the
`~cursor.copy_to()` method. It also sends keepalive messages to the
server, in case there were no new data from it for the duration of
- *keepalive_interval* seconds (this parameter must be greater than 1
- second, but it can have a fractional part).
+ *keepalive_interval* seconds (this parameter's value must be equal to
+ at least than 1 second, but it can have a fractional part).
With asynchronous connection, this method returns immediately and the
calling code can start reading the replication messages in a loop.
- A sketch implementation of the *writer* object might look similar to
- the following::
+ A sketch implementation of the *writer* object for logical replication
+ might look similar to the following::
from io import TextIOBase
- class ReplicationStreamWriter(TextIOBase):
+ class LogicalStreamWriter(TextIOBase):
def write(self, msg):
- self.store_data_reliably(msg)
+ self.store_message_data(msg.payload)
- if self.should_report_to_the_server(msg):
+ if self.should_report_to_the_server_now(msg):
msg.cursor.send_replication_feedback(flush_lsn=msg.wal_end)
- def store_data_reliably(self, msg):
- ...
-
- def shoud_report_to_the_server(self, msg):
- ...
-
- First, like with the `~cursor.copy_to()` method, the code that is
- calling the provided write method checks if the *writer* object is
+ First, like with the `~cursor.copy_to()` method, the code that calls
+ the provided ``write()`` method checks if the *writer* object is
inherited from `~io.TextIOBase`. If that is the case, the message
payload to be passed is converted to unicode using the connection's
- encoding information. Otherwise, the message is passed as is.
+ `~connection.encoding` information. Otherwise, the message is passed
+ as is.
The *msg* object being passed is an instance of `~ReplicationMessage`
class.
- After storing the data passed in the message object, the writer object
- should consider sending a confirmation message to the server. This is
- done by calling `~send_replication_feedback()` method on the
- corresponding replication cursor. A reference to the cursor producing
- a given message is provided in the `~ReplicationMessage` as an
- attribute.
+ After storing certain amount of messages' data reliably, the client
+ should send a confirmation message to the server. This should be done
+ by calling `~send_replication_feedback()` method on the corresponding
+ replication cursor. A reference to the cursor is provided in the
+ `~ReplicationMessage` as an attribute.
+
+ .. warning::
- .. note::
+ Failure to properly notify the server by constantly consuming and
+ reporting success at appropriate times can eventually lead to "disk
+ full" condition on the server, because the server retains all the
+ WAL segments that might be needed to stream the changes via all of
+ the currently open replication slots.
- One needs to be aware that failure to properly notify the server on
- any one replication slot by constantly consuming and reporting
- success to the server at appropriate times can eventually lead to
- "disk full" condition on the server, because the server retains all
- the WAL segments that might be needed to stream the changes via
- currently open replication slots.
+ On the other hand, it is not recommended to send a confirmation
+ after every processed message, since that will put an unnecessary
+ load on network and the server. A possible strategy is to confirm
+ after every COMMIT message.
.. method:: stop_replication()
In non-asynchronous connection, when called from the ``write()``
- method tells the code in `~start_replication` to break out of the
+ method, tell the code in `~start_replication` to break out of the
endless loop and return.
.. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
@@ -291,12 +290,12 @@ Replication cursor
Use this method to report to the server that all messages up to a
certain LSN position have been stored and may be discarded.
- This method can also be called with default parameters to send a
- keepalive message to the server.
+ This method can also be called with all default parameters' values to
+ send a keepalive message to the server.
- In case the message cannot be sent at the moment, remembers the
- positions for a later successful call or call to
- `~flush_replication_feedback()`.
+ In case of asynchronous connection, if the feedback message cannot be
+ sent at the moment, remembers the passed LSN positions for a later
+ hopefully successful call or call to `~flush_replication_feedback()`.
.. method:: flush_replication_feedback(reply=False)
@@ -307,10 +306,10 @@ Replication cursor
Low-level methods for asynchronous connection operation.
- While with the non-asynchronous connection, a single call to
- `~start_replication()` handles all the complexity, at times it might be
- beneficial to use low-level interface for better control, in particular to
- `~select.select()` on multiple sockets. The following methods are
+ With the non-asynchronous connection, a single call to
+ `~start_replication()` handles all the complexity, but at times it might
+ be beneficial to use low-level interface for better control, in particular
+ to `~select.select()` on multiple sockets. The following methods are
provided for asynchronous operation:
.. method:: read_replication_message(decode=True)
@@ -319,14 +318,18 @@ Replication cursor
performed on the data received from the server
This method should be used in a loop with asynchronous connections
- after calling `~start_replication()`.
+ after calling `~start_replication()` once.
It tries to read the next message from the server, without blocking
and returns an instance of `~ReplicationMessage` or *None*, in case
- there are no more data messages from the server at the moment. After
- receiving a *None* value from this method, one should use a
- `~select.select()` or `~select.poll()` on the corresponding connection
- to block the process until there is more data from the server.
+ there are no more data messages from the server at the moment.
+
+ It is expected that the calling code will call this method repeatedly
+ in order to consume all of the messages that might have been buffered,
+ until *None* is returned. After receiving a *None* value from this
+ method, one might use `~select.select()` or `~select.poll()` on the
+ corresponding connection to block the process until there is more data
+ from the server.
The server can send keepalive messages to the client periodically.
Such messages are silently consumed by this method and are never
@@ -334,8 +337,8 @@ Replication cursor
.. method:: fileno()
- Calls the corresponding connection's `~connection.fileno()` method
- and returns the result.
+ Call the corresponding connection's `~connection.fileno()` method and
+ return the result.
This is a convenience method which allows replication cursor to be
used directly in `~select.select()` or `~select.poll()` calls.