diff options
| author | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2016-08-14 21:09:54 +0100 |
|---|---|---|
| committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2016-08-14 21:09:54 +0100 |
| commit | 1d950748af199d76069a5fb71bd9f7ace7a2e50e (patch) | |
| tree | bf981b7ad7fe4f230af285fe76a8c9bcdab83b77 /doc/src | |
| parent | e779fec5f9eefa5fd2f943e15a785987feece679 (diff) | |
| parent | 01c552baa3847819d024a0f945ec2b4f3bbeadba (diff) | |
| download | psycopg2-1d950748af199d76069a5fb71bd9f7ace7a2e50e.tar.gz | |
Merge branch 'replication-protocol'
Diffstat (limited to 'doc/src')
| -rw-r--r-- | doc/src/advanced.rst | 89 | ||||
| -rw-r--r-- | doc/src/extras.rst | 368 |
2 files changed, 456 insertions, 1 deletions
diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst index f2e279f..5b5fb35 100644 --- a/doc/src/advanced.rst +++ b/doc/src/advanced.rst @@ -423,7 +423,7 @@ this will be probably implemented in a future release. Support for coroutine libraries ------------------------------- -.. versionadded:: 2.2.0 +.. versionadded:: 2.2 Psycopg can be used together with coroutine_\-based libraries and participate in cooperative multithreading. @@ -509,3 +509,90 @@ resources about the topic. conn.commit() cur.close() conn.close() + + + +.. index:: + single: Replication + +.. _replication-support: + +Replication protocol support +---------------------------- + +.. versionadded:: 2.7 + +Modern PostgreSQL servers (version 9.0 and above) support replication. The +replication protocol is built on top of the client-server protocol and can be +operated using ``libpq``, as such it can be also operated by ``psycopg2``. +The replication protocol can be operated on both synchronous and +:ref:`asynchronous <async-support>` connections. + +Server version 9.4 adds a new feature called *Logical Replication*. + +.. seealso:: + + - PostgreSQL `Streaming Replication Protocol`__ + + .. __: http://www.postgresql.org/docs/current/static/protocol-replication.html + + +Logical replication Quick-Start +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +You must be using PostgreSQL server version 9.4 or above to run this quick +start. + +Make sure that replication connections are permitted for user ``postgres`` in +``pg_hba.conf`` and reload the server configuration. You also need to set +``wal_level=logical`` and ``max_wal_senders``, ``max_replication_slots`` to +value greater than zero in ``postgresql.conf`` (these changes require a server +restart). Create a database ``psycopg2_test``. + +Then run the following code to quickly try the replication support out. This +is not production code -- it has no error handling, it sends feedback too +often, etc. -- and it's only intended as a simple demo of logical +replication:: + + from __future__ import print_function + import sys + import psycopg2 + import psycopg2.extras + + conn = psycopg2.connect('dbname=psycopg2_test user=postgres', + connection_factory=psycopg2.extras.LogicalReplicationConnection) + cur = conn.cursor() + try: + # test_decoding produces textual output + cur.start_replication(slot_name='pytest', decode=True) + except psycopg2.ProgrammingError: + cur.create_replication_slot('pytest', output_plugin='test_decoding') + cur.start_replication(slot_name='pytest', decode=True) + + class DemoConsumer(object): + def __call__(self, msg): + print(msg.payload) + msg.cursor.send_feedback(flush_lsn=msg.data_start) + + democonsumer = DemoConsumer() + + print("Starting streaming, press Control-C to end...", file=sys.stderr) + try: + cur.consume_stream(democonsumer) + except KeyboardInterrupt: + cur.close() + conn.close() + print("The slot 'pytest' still exists. Drop it with " + "SELECT pg_drop_replication_slot('pytest'); if no longer needed.", + file=sys.stderr) + print("WARNING: Transaction logs will accumulate in pg_xlog " + "until the slot is dropped.", file=sys.stderr) + + +You can now make changes to the ``psycopg2_test`` database using a normal +psycopg2 session, ``psql``, etc. and see the logical decoding stream printed +by this demo client. + +This will continue running until terminated with ``Control-C``. + +For the details see :ref:`replication-objects`. diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 0e21ae5..78e96ef 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -143,6 +143,374 @@ Logging cursor +.. _replication-objects: + +Replication connection and cursor classes +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +See :ref:`replication-support` for an introduction to the topic. + + +The following replication types are defined: + +.. data:: REPLICATION_LOGICAL +.. data:: REPLICATION_PHYSICAL + + +.. index:: + pair: Connection; replication + +.. autoclass:: LogicalReplicationConnection + + This connection factory class can be used to open a special type of + connection that is used for logical replication. + + Example:: + + from psycopg2.extras import LogicalReplicationConnection + log_conn = psycopg2.connect(dsn, connection_factory=LogicalReplicationConnection) + log_cur = log_conn.cursor() + + +.. autoclass:: PhysicalReplicationConnection + + This connection factory class can be used to open a special type of + connection that is used for physical replication. + + Example:: + + from psycopg2.extras import PhysicalReplicationConnection + phys_conn = psycopg2.connect(dsn, connection_factory=PhysicalReplicationConnection) + phys_cur = phys_conn.cursor() + + Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use + `ReplicationCursor` for actual communication with the server. + + +.. index:: + pair: Message; replication + +The individual messages in the replication stream are represented by +`ReplicationMessage` objects (both logical and physical type): + +.. autoclass:: ReplicationMessage + + .. attribute:: payload + + The actual data received from the server. + + An instance of either `bytes()` or `unicode()`, depending on the value + of `decode` option passed to `~ReplicationCursor.start_replication()` + on the connection. See `~ReplicationCursor.read_message()` for + details. + + .. attribute:: data_size + + The raw size of the message payload (before possible unicode + conversion). + + .. attribute:: data_start + + LSN position of the start of the message. + + .. attribute:: wal_end + + LSN position of the current end of WAL on the server. + + .. attribute:: send_time + + A `~datetime` object representing the server timestamp at the moment + when the message was sent. + + .. attribute:: cursor + + A reference to the corresponding `ReplicationCursor` object. + + +.. index:: + pair: Cursor; replication + +.. autoclass:: ReplicationCursor + + .. method:: create_replication_slot(slot_name, slot_type=None, output_plugin=None) + + Create streaming replication slot. + + :param slot_name: name of the replication slot to be created + :param slot_type: type of replication: should be either + `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL` + :param output_plugin: name of the logical decoding output plugin to be + used by the slot; required for logical + replication connections, disallowed for physical + + Example:: + + log_cur.create_replication_slot("logical1", "test_decoding") + phys_cur.create_replication_slot("physical1") + + # either logical or physical replication connection + cur.create_replication_slot("slot1", slot_type=REPLICATION_LOGICAL) + + When creating a slot on a logical replication connection, a logical + replication slot is created by default. Logical replication requires + name of the logical decoding output plugin to be specified. + + When creating a slot on a physical replication connection, a physical + replication slot is created by default. No output plugin parameter is + required or allowed when creating a physical replication slot. + + In either case the type of slot being created can be specified + explicitly using *slot_type* parameter. + + Replication slots are a feature of PostgreSQL server starting with + version 9.4. + + .. method:: drop_replication_slot(slot_name) + + Drop streaming replication slot. + + :param slot_name: name of the replication slot to drop + + Example:: + + # either logical or physical replication connection + cur.drop_replication_slot("slot1") + + Replication slots are a feature of PostgreSQL server starting with + version 9.4. + + .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None, decode=False) + + Start replication on the connection. + + :param slot_name: name of the replication slot to use; required for + logical replication, physical replication can work + with or without a slot + :param slot_type: type of replication: should be either + `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL` + :param start_lsn: the optional LSN position to start replicating from, + can be an integer or a string of hexadecimal digits + in the form ``XXX/XXX`` + :param timeline: WAL history timeline to start streaming from (optional, + can only be used with physical replication) + :param options: a dictionary of options to pass to logical replication + slot (not allowed with physical replication) + :param decode: a flag indicating that unicode conversion should be + performed on messages received from the server + + If a *slot_name* is specified, the slot must exist on the server and + its type must match the replication type used. + + If not specified using *slot_type* parameter, the type of replication + is defined by the type of replication connection. Logical replication + is only allowed on logical replication connection, but physical + replication can be used with both types of connection. + + On the other hand, physical replication doesn't require a named + replication slot to be used, only logical replication does. In any + case logical replication and replication slots are a feature of + PostgreSQL server starting with version 9.4. Physical replication can + be used starting with 9.0. + + If *start_lsn* is specified, the requested stream will start from that + LSN. The default is `!None` which passes the LSN ``0/0`` causing + replay to begin at the last point for which the server got flush + confirmation from the client, or the oldest available point for a new + slot. + + The server might produce an error if a WAL file for the given LSN has + already been recycled or it may silently start streaming from a later + position: the client can verify the actual position using information + provided by the `ReplicationMessage` attributes. The exact server + behavior depends on the type of replication and use of slots. + + The *timeline* parameter can only be specified with physical + replication and only starting with server version 9.3. + + A dictionary of *options* may be passed to the logical decoding plugin + on a logical replication slot. The set of supported options depends + on the output plugin that was used to create the slot. Must be + `!None` for physical replication. + + If *decode* is set to `!True` the messages received from the server + would be converted according to the connection `~connection.encoding`. + *This parameter should not be set with physical replication or with + logical replication plugins that produce binary output.* + + This function constructs a ``START_REPLICATION`` command and calls + `start_replication_expert()` internally. + + After starting the replication, to actually consume the incoming + server messages use `consume_stream()` or implement a loop around + `read_message()` in case of :ref:`asynchronous connection + <async-support>`. + + .. method:: start_replication_expert(command, decode=False) + + Start replication on the connection using provided + ``START_REPLICATION`` command. See `start_replication()` for + description of *decode* parameter. + + .. method:: consume_stream(consume, keepalive_interval=10) + + :param consume: a callable object with signature :samp:`consume({msg})` + :param keepalive_interval: interval (in seconds) to send keepalive + messages to the server + + This method can only be used with synchronous connection. For + asynchronous connections see `read_message()`. + + Before using this method to consume the stream call + `start_replication()` first. + + This method enters an endless loop reading messages from the server + and passing them to ``consume()`` one at a time, then waiting for more + messages from the server. In order to make this method break out of + the loop and return, ``consume()`` can throw a `StopReplication` + exception. Any unhandled exception will make it break out of the loop + as well. + + The *msg* object passed to ``consume()`` is an instance of + `ReplicationMessage` class. See `read_message()` for details about + message decoding. + + This method also sends keepalive messages to the server in case there + were no new data from the server for the duration of + *keepalive_interval* (in seconds). The value of this parameter must + be set to at least 1 second, but it can have a fractional part. + + After processing certain amount of messages the client should send a + confirmation message to the server. This should be done by calling + `send_feedback()` method on the corresponding replication cursor. A + reference to the cursor is provided in the `ReplicationMessage` as an + attribute. + + The following example is a sketch implementation of ``consume()`` + callable for logical replication:: + + class LogicalStreamConsumer(object): + + ... + + def __call__(self, msg): + self.process_message(msg.payload) + + if self.should_send_feedback(msg): + msg.cursor.send_feedback(flush_lsn=msg.data_start) + + consumer = LogicalStreamConsumer() + cur.consume_stream(consumer) + + .. warning:: + + When using replication with slots, failure to constantly consume + *and* report success to the server appropriately can eventually + lead to "disk full" condition on the server, because the server + retains all the WAL segments that might be needed to stream the + changes via all of the currently open replication slots. + + On the other hand, it is not recommended to send confirmation + after *every* processed message, since that will put an + unnecessary load on network and the server. A possible strategy + is to confirm after every COMMIT message. + + .. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) + + :param write_lsn: a LSN position up to which the client has written the data locally + :param flush_lsn: a LSN position up to which the client has processed the + data reliably (the server is allowed to discard all + and every data that predates this LSN) + :param apply_lsn: a LSN position up to which the warm standby server + has applied the changes (physical replication + master-slave protocol only) + :param reply: request the server to send back a keepalive message immediately + + Use this method to report to the server that all messages up to a + certain LSN position have been processed on the client and may be + discarded on the server. + + This method can also be called with all default parameters' values to + just send a keepalive message to the server. + + Low-level replication cursor methods for :ref:`asynchronous connection + <async-support>` operation. + + With the synchronous connection a call to `consume_stream()` handles all + the complexity of handling the incoming messages and sending keepalive + replies, but at times it might be beneficial to use low-level interface + for better control, in particular to `~select` on multiple sockets. The + following methods are provided for asynchronous operation: + + .. method:: read_message() + + Try to read the next message from the server without blocking and + return an instance of `ReplicationMessage` or `!None`, in case there + are no more data messages from the server at the moment. + + This method should be used in a loop with asynchronous connections + (after calling `start_replication()` once). For synchronous + connections see `consume_stream()`. + + The returned message's `~ReplicationMessage.payload` is an instance of + `!unicode` decoded according to connection `~connection.encoding` + *iff* *decode* was set to `!True` in the initial call to + `start_replication()` on this connection, otherwise it is an instance + of `!bytes` with no decoding. + + It is expected that the calling code will call this method repeatedly + in order to consume all of the messages that might have been buffered + until `!None` is returned. After receiving `!None` from this method + the caller should use `~select.select()` or `~select.poll()` on the + corresponding connection to block the process until there is more data + from the server. + + The server can send keepalive messages to the client periodically. + Such messages are silently consumed by this method and are never + reported to the caller. + + .. method:: fileno() + + Call the corresponding connection's `~connection.fileno()` method and + return the result. + + This is a convenience method which allows replication cursor to be + used directly in `~select.select()` or `~select.poll()` calls. + + .. attribute:: io_timestamp + + A `~datetime` object representing the timestamp at the moment of last + communication with the server (a data or keepalive message in either + direction). + + An actual example of asynchronous operation might look like this:: + + from select import select + from datetime import datetime + + def consume(msg): + ... + + keepalive_interval = 10.0 + while True: + msg = cur.read_message() + if msg: + consume(msg) + else: + now = datetime.now() + timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds() + try: + sel = select([cur], [], [], max(0, timeout)) + if not any(sel): + cur.send_feedback() # timed out, send keepalive message + except InterruptedError: + pass # recalculate timeout and continue + +.. index:: + pair: Cursor; Replication + +.. autoclass:: StopReplication + + .. index:: single: Data types; Additional |
