diff options
Diffstat (limited to 'doc/src')
-rw-r--r-- | doc/src/extras.rst | 117 |
1 files changed, 60 insertions, 57 deletions
diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 7cca840..19c8152 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -165,12 +165,12 @@ Replication cursor .. method:: identify_system() - This method executes ``IDENTIFY_SYSTEM`` command of the streaming - replication protocol and returns a result as a dictionary. + Execute ``IDENTIFY_SYSTEM`` command of the streaming replication + protocol and return the result as a dictionary. Example:: - >>> print cur.identify_system() + >>> cur.identify_system() {'timeline': 1, 'systemid': '1234567890123456789', 'dbname': 'test', 'xlogpos': '0/1ABCDEF'} .. method:: create_replication_slot(slot_type, slot_name, output_plugin=None) @@ -199,82 +199,81 @@ Replication cursor .. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None) - Start and consume replication stream. + Start a replication stream. On non-asynchronous connection, also + consume the stream messages. :param slot_type: type of replication: either `REPLICATION_PHYSICAL` or `REPLICATION_LOGICAL` :param slot_name: name of the replication slot to use (required for logical replication) :param writer: a file-like object to write replication messages to - :param start_lsn: the point in replication stream (WAL position) to start - from, in the form ``XXX/XXX`` (forward-slash separated - pair of hexadecimals) + :param start_lsn: the LSN position to start from, in the form + ``XXX/XXX`` (forward-slash separated pair of + hexadecimals) :param timeline: WAL history timeline to start streaming from (optional, can only be used with physical replication) :param keepalive_interval: interval (in seconds) to send keepalive messages to the server - :param options: an dictionary of options to pass to logical replication + :param options: a dictionary of options to pass to logical replication slot - With non-asynchronous connection, this method enters an endless loop, - reading messages from the server and passing them to ``write()`` method - of the *writer* object. This is similar to operation of the + When used on non-asynchronous connection this method enters an endless + loop, reading messages from the server and passing them to ``write()`` + method of the *writer* object. This is similar to operation of the `~cursor.copy_to()` method. It also sends keepalive messages to the server, in case there were no new data from it for the duration of - *keepalive_interval* seconds (this parameter must be greater than 1 - second, but it can have a fractional part). + *keepalive_interval* seconds (this parameter's value must be equal to + at least than 1 second, but it can have a fractional part). With asynchronous connection, this method returns immediately and the calling code can start reading the replication messages in a loop. - A sketch implementation of the *writer* object might look similar to - the following:: + A sketch implementation of the *writer* object for logical replication + might look similar to the following:: from io import TextIOBase - class ReplicationStreamWriter(TextIOBase): + class LogicalStreamWriter(TextIOBase): def write(self, msg): - self.store_data_reliably(msg) + self.store_message_data(msg.payload) - if self.should_report_to_the_server(msg): + if self.should_report_to_the_server_now(msg): msg.cursor.send_replication_feedback(flush_lsn=msg.wal_end) - def store_data_reliably(self, msg): - ... - - def shoud_report_to_the_server(self, msg): - ... - - First, like with the `~cursor.copy_to()` method, the code that is - calling the provided write method checks if the *writer* object is + First, like with the `~cursor.copy_to()` method, the code that calls + the provided ``write()`` method checks if the *writer* object is inherited from `~io.TextIOBase`. If that is the case, the message payload to be passed is converted to unicode using the connection's - encoding information. Otherwise, the message is passed as is. + `~connection.encoding` information. Otherwise, the message is passed + as is. The *msg* object being passed is an instance of `~ReplicationMessage` class. - After storing the data passed in the message object, the writer object - should consider sending a confirmation message to the server. This is - done by calling `~send_replication_feedback()` method on the - corresponding replication cursor. A reference to the cursor producing - a given message is provided in the `~ReplicationMessage` as an - attribute. + After storing certain amount of messages' data reliably, the client + should send a confirmation message to the server. This should be done + by calling `~send_replication_feedback()` method on the corresponding + replication cursor. A reference to the cursor is provided in the + `~ReplicationMessage` as an attribute. + + .. warning:: - .. note:: + Failure to properly notify the server by constantly consuming and + reporting success at appropriate times can eventually lead to "disk + full" condition on the server, because the server retains all the + WAL segments that might be needed to stream the changes via all of + the currently open replication slots. - One needs to be aware that failure to properly notify the server on - any one replication slot by constantly consuming and reporting - success to the server at appropriate times can eventually lead to - "disk full" condition on the server, because the server retains all - the WAL segments that might be needed to stream the changes via - currently open replication slots. + On the other hand, it is not recommended to send a confirmation + after every processed message, since that will put an unnecessary + load on network and the server. A possible strategy is to confirm + after every COMMIT message. .. method:: stop_replication() In non-asynchronous connection, when called from the ``write()`` - method tells the code in `~start_replication` to break out of the + method, tell the code in `~start_replication` to break out of the endless loop and return. .. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) @@ -291,12 +290,12 @@ Replication cursor Use this method to report to the server that all messages up to a certain LSN position have been stored and may be discarded. - This method can also be called with default parameters to send a - keepalive message to the server. + This method can also be called with all default parameters' values to + send a keepalive message to the server. - In case the message cannot be sent at the moment, remembers the - positions for a later successful call or call to - `~flush_replication_feedback()`. + In case of asynchronous connection, if the feedback message cannot be + sent at the moment, remembers the passed LSN positions for a later + hopefully successful call or call to `~flush_replication_feedback()`. .. method:: flush_replication_feedback(reply=False) @@ -307,10 +306,10 @@ Replication cursor Low-level methods for asynchronous connection operation. - While with the non-asynchronous connection, a single call to - `~start_replication()` handles all the complexity, at times it might be - beneficial to use low-level interface for better control, in particular to - `~select.select()` on multiple sockets. The following methods are + With the non-asynchronous connection, a single call to + `~start_replication()` handles all the complexity, but at times it might + be beneficial to use low-level interface for better control, in particular + to `~select.select()` on multiple sockets. The following methods are provided for asynchronous operation: .. method:: read_replication_message(decode=True) @@ -319,14 +318,18 @@ Replication cursor performed on the data received from the server This method should be used in a loop with asynchronous connections - after calling `~start_replication()`. + after calling `~start_replication()` once. It tries to read the next message from the server, without blocking and returns an instance of `~ReplicationMessage` or *None*, in case - there are no more data messages from the server at the moment. After - receiving a *None* value from this method, one should use a - `~select.select()` or `~select.poll()` on the corresponding connection - to block the process until there is more data from the server. + there are no more data messages from the server at the moment. + + It is expected that the calling code will call this method repeatedly + in order to consume all of the messages that might have been buffered, + until *None* is returned. After receiving a *None* value from this + method, one might use `~select.select()` or `~select.poll()` on the + corresponding connection to block the process until there is more data + from the server. The server can send keepalive messages to the client periodically. Such messages are silently consumed by this method and are never @@ -334,8 +337,8 @@ Replication cursor .. method:: fileno() - Calls the corresponding connection's `~connection.fileno()` method - and returns the result. + Call the corresponding connection's `~connection.fileno()` method and + return the result. This is a convenience method which allows replication cursor to be used directly in `~select.select()` or `~select.poll()` calls. |