author: Daniele Varrazzo <daniele.varrazzo@gmail.com> 2016-08-14 21:09:54 +0100
committer: Daniele Varrazzo <daniele.varrazzo@gmail.com> 2016-08-14 21:09:54 +0100
commit: 1d950748af199d76069a5fb71bd9f7ace7a2e50e
tree: bf981b7ad7fe4f230af285fe76a8c9bcdab83b77 /doc/src
parent: e779fec5f9eefa5fd2f943e15a785987feece679
parent: 01c552baa3847819d024a0f945ec2b4f3bbeadba
Merge branch 'replication-protocol'
Diffstat (limited to 'doc/src')
-rw-r--r-- doc/src/advanced.rst 89
-rw-r--r-- doc/src/extras.rst 368
2 files changed, 456 insertions, 1 deletions
diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst
index f2e279f..5b5fb35 100644
--- a/doc/src/advanced.rst
+++ b/doc/src/advanced.rst
@@ -423,7 +423,7 @@ this will be probably implemented in a future release.
Support for coroutine libraries
-------------------------------
-.. versionadded:: 2.2.0
+.. versionadded:: 2.2
Psycopg can be used together with coroutine_\-based libraries and participate
in cooperative multithreading.
@@ -509,3 +509,90 @@ resources about the topic.
conn.commit()
cur.close()
conn.close()
+
+
+
+.. index::
+ single: Replication
+
+.. _replication-support:
+
+Replication protocol support
+----------------------------
+
+.. versionadded:: 2.7
+
+Modern PostgreSQL servers (version 9.0 and above) support replication. The
+replication protocol is built on top of the client-server protocol and can be
+operated using ``libpq``; as such, it can also be operated by ``psycopg2``.
+The replication protocol can be operated on both synchronous and
+:ref:`asynchronous <async-support>` connections.
+
+Server version 9.4 adds a new feature called *Logical Replication*.
+
+.. seealso::
+
+ - PostgreSQL `Streaming Replication Protocol`__
+
+ .. __: http://www.postgresql.org/docs/current/static/protocol-replication.html
+
+
+Logical replication Quick-Start
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You must be using PostgreSQL server version 9.4 or above to run this quick
+start.
+
+Make sure that replication connections are permitted for user ``postgres`` in
+``pg_hba.conf`` and reload the server configuration. You also need to set
+``wal_level=logical`` and set both ``max_wal_senders`` and
+``max_replication_slots`` to a value greater than zero in ``postgresql.conf``
+(these changes require a server restart). Create a database ``psycopg2_test``.
+
+Then run the following code to quickly try the replication support out. This
+is not production code -- it has no error handling, it sends feedback too
+often, etc. -- and it's only intended as a simple demo of logical
+replication::
+
+    from __future__ import print_function
+    import sys
+    import psycopg2
+    import psycopg2.extras
+
+    conn = psycopg2.connect('dbname=psycopg2_test user=postgres',
+        connection_factory=psycopg2.extras.LogicalReplicationConnection)
+    cur = conn.cursor()
+    try:
+        # test_decoding produces textual output
+        cur.start_replication(slot_name='pytest', decode=True)
+    except psycopg2.ProgrammingError:
+        cur.create_replication_slot('pytest', output_plugin='test_decoding')
+        cur.start_replication(slot_name='pytest', decode=True)
+
+    class DemoConsumer(object):
+        def __call__(self, msg):
+            print(msg.payload)
+            msg.cursor.send_feedback(flush_lsn=msg.data_start)
+
+    democonsumer = DemoConsumer()
+
+    print("Starting streaming, press Control-C to end...", file=sys.stderr)
+    try:
+        cur.consume_stream(democonsumer)
+    except KeyboardInterrupt:
+        cur.close()
+        conn.close()
+        print("The slot 'pytest' still exists. Drop it with "
+            "SELECT pg_drop_replication_slot('pytest'); if no longer needed.",
+            file=sys.stderr)
+        print("WARNING: Transaction logs will accumulate in pg_xlog "
+            "until the slot is dropped.", file=sys.stderr)
+
+
+You can now make changes to the ``psycopg2_test`` database using a normal
+psycopg2 session, ``psql``, etc. and see the logical decoding stream printed
+by this demo client.
+
+This will continue running until terminated with ``Control-C``.
+
+For the details see :ref:`replication-objects`.
diff --git a/doc/src/extras.rst b/doc/src/extras.rst
index 0e21ae5..78e96ef 100644
--- a/doc/src/extras.rst
+++ b/doc/src/extras.rst
@@ -143,6 +143,374 @@ Logging cursor
+.. _replication-objects:
+
+Replication connection and cursor classes
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+See :ref:`replication-support` for an introduction to the topic.
+
+
+The following replication types are defined:
+
+.. data:: REPLICATION_LOGICAL
+.. data:: REPLICATION_PHYSICAL
+
+
+.. index::
+ pair: Connection; replication
+
+.. autoclass:: LogicalReplicationConnection
+
+ This connection factory class can be used to open a special type of
+ connection that is used for logical replication.
+
+ Example::
+
+ from psycopg2.extras import LogicalReplicationConnection
+ log_conn = psycopg2.connect(dsn, connection_factory=LogicalReplicationConnection)
+ log_cur = log_conn.cursor()
+
+
+.. autoclass:: PhysicalReplicationConnection
+
+ This connection factory class can be used to open a special type of
+ connection that is used for physical replication.
+
+ Example::
+
+ from psycopg2.extras import PhysicalReplicationConnection
+ phys_conn = psycopg2.connect(dsn, connection_factory=PhysicalReplicationConnection)
+ phys_cur = phys_conn.cursor()
+
+ Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use
+ `ReplicationCursor` for actual communication with the server.
+
+
+.. index::
+ pair: Message; replication
+
+The individual messages in the replication stream are represented by
+`ReplicationMessage` objects (both logical and physical type):
+
+.. autoclass:: ReplicationMessage
+
+ .. attribute:: payload
+
+ The actual data received from the server.
+
+ An instance of either `bytes()` or `unicode()`, depending on the value
+ of `decode` option passed to `~ReplicationCursor.start_replication()`
+ on the connection. See `~ReplicationCursor.read_message()` for
+ details.
+
+ .. attribute:: data_size
+
+ The raw size of the message payload (before possible unicode
+ conversion).
+
+ .. attribute:: data_start
+
+ LSN position of the start of the message.
+
+ .. attribute:: wal_end
+
+ LSN position of the current end of WAL on the server.
+
+ .. attribute:: send_time
+
+ A `~datetime` object representing the server timestamp at the moment
+ when the message was sent.
+
+ .. attribute:: cursor
+
+ A reference to the corresponding `ReplicationCursor` object.
+
+
+.. index::
+ pair: Cursor; replication
+
+.. autoclass:: ReplicationCursor
+
+ .. method:: create_replication_slot(slot_name, slot_type=None, output_plugin=None)
+
+ Create streaming replication slot.
+
+ :param slot_name: name of the replication slot to be created
+ :param slot_type: type of replication: should be either
+ `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL`
+ :param output_plugin: name of the logical decoding output plugin to be
+ used by the slot; required for logical
+ replication connections, disallowed for physical
+
+ Example::
+
+ log_cur.create_replication_slot("logical1", output_plugin="test_decoding")
+ phys_cur.create_replication_slot("physical1")
+
+ # either logical or physical replication connection
+ cur.create_replication_slot("slot1", slot_type=REPLICATION_LOGICAL,
+     output_plugin="test_decoding")
+
+ When creating a slot on a logical replication connection, a logical
+ replication slot is created by default. Logical replication requires
+ the name of the logical decoding output plugin to be specified.
+
+ When creating a slot on a physical replication connection, a physical
+ replication slot is created by default. No output plugin parameter is
+ required or allowed when creating a physical replication slot.
+
+ In either case the type of slot being created can be specified
+ explicitly using *slot_type* parameter.
+
+ Replication slots are a feature of PostgreSQL server starting with
+ version 9.4.
+
+ .. method:: drop_replication_slot(slot_name)
+
+ Drop streaming replication slot.
+
+ :param slot_name: name of the replication slot to drop
+
+ Example::
+
+ # either logical or physical replication connection
+ cur.drop_replication_slot("slot1")
+
+ Replication slots are a feature of PostgreSQL server starting with
+ version 9.4.
+
+ .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None, decode=False)
+
+ Start replication on the connection.
+
+ :param slot_name: name of the replication slot to use; required for
+ logical replication, physical replication can work
+ with or without a slot
+ :param slot_type: type of replication: should be either
+ `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL`
+ :param start_lsn: the optional LSN position to start replicating from,
+ can be an integer or a string of hexadecimal digits
+ in the form ``XXX/XXX``
+ :param timeline: WAL history timeline to start streaming from (optional,
+ can only be used with physical replication)
+ :param options: a dictionary of options to pass to logical replication
+ slot (not allowed with physical replication)
+ :param decode: a flag indicating that unicode conversion should be
+ performed on messages received from the server
+
+ If a *slot_name* is specified, the slot must exist on the server and
+ its type must match the replication type used.
+
+ If not specified using the *slot_type* parameter, the type of replication
+ is defined by the type of replication connection. Logical replication
+ is only allowed on a logical replication connection, but physical
+ replication can be used with both types of connection.
+
+ Physical replication doesn't require a named replication slot; only
+ logical replication does. In any case, logical replication and
+ replication slots are a feature of PostgreSQL server starting with
+ version 9.4. Physical replication can be used starting with 9.0.
+
+ If *start_lsn* is specified, the requested stream will start from that
+ LSN. The default is ``0``, which passes the LSN ``0/0``, causing
+ replay to begin at the last point for which the server got flush
+ confirmation from the client, or the oldest available point for a new
+ slot.
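The two accepted forms of *start_lsn* (an integer, or a string of hexadecimal digits in the form ``XXX/XXX``) describe the same 64-bit WAL position: the part before the slash is the high 32 bits and the part after it is the low 32 bits. The following is a minimal sketch of converting between them. The helper names ``lsn_to_int`` and ``int_to_lsn`` are hypothetical and are not part of psycopg2.

```python
def lsn_to_int(lsn):
    """Convert an LSN string such as '16/B374D848' to a 64-bit integer.

    Hypothetical helper, not a psycopg2 function.
    """
    high, low = lsn.split("/")
    # High 32 bits come before the slash, low 32 bits after it.
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(value):
    """Format a 64-bit integer LSN in the 'XXX/XXX' hexadecimal form.

    Hypothetical helper, not a psycopg2 function.
    """
    return "%X/%X" % (value >> 32, value & 0xFFFFFFFF)
```

Either representation could then be passed as *start_lsn*, and the integer form is convenient for comparing positions such as `data_start` and `wal_end`.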
+
+ The server might produce an error if a WAL file for the given LSN has
+ already been recycled or it may silently start streaming from a later
+ position: the client can verify the actual position using information
+ provided by the `ReplicationMessage` attributes. The exact server
+ behavior depends on the type of replication and use of slots.
+
+ The *timeline* parameter can only be specified with physical
+ replication and only starting with server version 9.3.
+
+ A dictionary of *options* may be passed to the logical decoding plugin
+ on a logical replication slot. The set of supported options depends
+ on the output plugin that was used to create the slot. Must be
+ `!None` for physical replication.
+
+ If *decode* is set to `!True`, the messages received from the server
+ are converted according to the connection `~connection.encoding`.
+ *This parameter should not be set with physical replication or with
+ logical replication plugins that produce binary output.*
+
+ This function constructs a ``START_REPLICATION`` command and calls
+ `start_replication_expert()` internally.
+
+ After starting the replication, to actually consume the incoming
+ server messages use `consume_stream()` or implement a loop around
+ `read_message()` in case of :ref:`asynchronous connection
+ <async-support>`.
+
+ .. method:: start_replication_expert(command, decode=False)
+
+ Start replication on the connection using provided
+ ``START_REPLICATION`` command. See `start_replication()` for
+ description of *decode* parameter.
+
+ .. method:: consume_stream(consume, keepalive_interval=10)
+
+ :param consume: a callable object with signature :samp:`consume({msg})`
+ :param keepalive_interval: interval (in seconds) to send keepalive
+ messages to the server
+
+ This method can only be used with a synchronous connection. For
+ asynchronous connections see `read_message()`.
+
+ Call `start_replication()` before using this method to consume the
+ stream.
+
+ This method enters an endless loop reading messages from the server
+ and passing them to ``consume()`` one at a time, then waiting for more
+ messages from the server. In order to make this method break out of
+ the loop and return, ``consume()`` can throw a `StopReplication`
+ exception. Any unhandled exception will make it break out of the loop
+ as well.
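As an illustration of leaving the loop described above, the sketch below shows a callable that raises `StopReplication` after a fixed number of messages. The class name ``LimitedConsumer`` and its ``max_messages`` parameter are hypothetical. The fallback exception class exists only so that the sketch can be run where psycopg2 is not installed.

```python
try:
    from psycopg2.extras import StopReplication
except ImportError:
    # Stand-in so the sketch runs without psycopg2; with psycopg2
    # installed the real exception class is used instead.
    class StopReplication(Exception):
        pass


class LimitedConsumer(object):
    """Hypothetical consumer that ends the stream after max_messages."""

    def __init__(self, max_messages):
        self.max_messages = max_messages
        self.payloads = []

    def __call__(self, msg):
        # Keep the payload, then stop once enough messages were seen.
        self.payloads.append(msg.payload)
        if len(self.payloads) >= self.max_messages:
            raise StopReplication()
```

Under these assumptions, ``cur.consume_stream(LimitedConsumer(10))`` would return after the tenth message instead of looping forever.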
+
+ The *msg* object passed to ``consume()`` is an instance of
+ `ReplicationMessage` class. See `read_message()` for details about
+ message decoding.
+
+ This method also sends keepalive messages to the server if no new data
+ has arrived from the server for the duration of
+ *keepalive_interval* (in seconds). The value of this parameter must
+ be set to at least 1 second, but it can have a fractional part.
+
+ After processing a certain number of messages the client should send a
+ confirmation message to the server. This should be done by calling the
+ `send_feedback()` method on the corresponding replication cursor. A
+ reference to the cursor is provided in the `ReplicationMessage` as the
+ `~ReplicationMessage.cursor` attribute.
+
+ The following example is a sketch implementation of ``consume()``
+ callable for logical replication::
+
+    class LogicalStreamConsumer(object):
+
+        ...
+
+        def __call__(self, msg):
+            self.process_message(msg.payload)
+
+            if self.should_send_feedback(msg):
+                msg.cursor.send_feedback(flush_lsn=msg.data_start)
+
+    consumer = LogicalStreamConsumer()
+    cur.consume_stream(consumer)
+
+ .. warning::
+
+ When using replication with slots, failure to constantly consume
+ *and* report success to the server appropriately can eventually
+ lead to a "disk full" condition on the server, because the server
+ retains all the WAL segments that might be needed to stream the
+ changes via all of the currently open replication slots.
+
+ On the other hand, it is not recommended to send confirmation
+ after *every* processed message, since that will put an
+ unnecessary load on the network and the server. A possible strategy
+ is to confirm after every COMMIT message.
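The "confirm after every COMMIT" strategy can be sketched as follows. This assumes the slot uses the ``test_decoding`` output plugin and that replication was started with ``decode=True``, so that each payload is text and the end of a transaction arrives as a line starting with ``COMMIT``. The class name ``CommitFeedbackConsumer`` and its ``process`` callback are hypothetical. Other output plugins mark transaction boundaries differently.

```python
class CommitFeedbackConsumer(object):
    """Hypothetical consumer that sends feedback only on COMMIT lines.

    Assumes textual test_decoding output (decode=True).
    """

    def __init__(self, process):
        # process: any callable taking the message payload
        self.process = process

    def __call__(self, msg):
        self.process(msg.payload)
        # test_decoding ends each transaction with a "COMMIT ..." line;
        # confirm the position only at that point.
        if msg.payload.startswith("COMMIT"):
            msg.cursor.send_feedback(flush_lsn=msg.data_start)
```

Such an object could be passed to `consume_stream()` in place of the consumer shown above. Messages inside a transaction are processed without a network round trip, and the server is told it may discard WAL only once the whole transaction has been handled.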
+
+ .. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
+
+ :param write_lsn: an LSN position up to which the client has written the data locally
+ :param flush_lsn: an LSN position up to which the client has processed the
+ data reliably (the server is allowed to discard any
+ data that predates this LSN)
+ :param apply_lsn: an LSN position up to which the warm standby server
+ has applied the changes (physical replication
+ master-slave protocol only)
+ :param reply: request the server to send back a keepalive message immediately
+
+ Use this method to report to the server that all messages up to a
+ certain LSN position have been processed on the client and may be
+ discarded on the server.
+
+ This method can also be called with all parameters left at their
+ default values to just send a keepalive message to the server.
+
+ Low-level replication cursor methods for :ref:`asynchronous connection
+ <async-support>` operation.
+
+ With a synchronous connection, a call to `consume_stream()` handles all
+ the complexity of handling the incoming messages and sending keepalive
+ replies, but at times it might be beneficial to use the low-level interface
+ for better control, in particular to `~select` on multiple sockets. The
+ following methods are provided for asynchronous operation:
+
+ .. method:: read_message()
+
+ Try to read the next message from the server without blocking and
+ return an instance of `ReplicationMessage` or `!None`, in case there
+ are no more data messages from the server at the moment.
+
+ This method should be used in a loop with asynchronous connections
+ (after calling `start_replication()` once). For synchronous
+ connections see `consume_stream()`.
+
+ The returned message's `~ReplicationMessage.payload` is an instance of
+ `!unicode` decoded according to connection `~connection.encoding`
+ *iff* *decode* was set to `!True` in the initial call to
+ `start_replication()` on this connection, otherwise it is an instance
+ of `!bytes` with no decoding.
+
+ It is expected that the calling code will call this method repeatedly
+ in order to consume all of the messages that might have been buffered
+ until `!None` is returned. After receiving `!None` from this method
+ the caller should use `~select.select()` or `~select.poll()` on the
+ corresponding connection to block the process until there is more data
+ from the server.
+
+ The server can send keepalive messages to the client periodically.
+ Such messages are silently consumed by this method and are never
+ reported to the caller.
+
+ .. method:: fileno()
+
+ Call the corresponding connection's `~connection.fileno()` method and
+ return the result.
+
+ This is a convenience method which allows replication cursor to be
+ used directly in `~select.select()` or `~select.poll()` calls.
+
+ .. attribute:: io_timestamp
+
+ A `~datetime` object representing the timestamp at the moment of last
+ communication with the server (a data or keepalive message in either
+ direction).
+
+ An actual example of asynchronous operation might look like this::
+
+    from select import select
+    from datetime import datetime
+
+    def consume(msg):
+        ...
+
+    keepalive_interval = 10.0
+    while True:
+        msg = cur.read_message()
+        if msg:
+            consume(msg)
+        else:
+            now = datetime.now()
+            timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds()
+            try:
+                sel = select([cur], [], [], max(0, timeout))
+                if not any(sel):
+                    cur.send_feedback()  # timed out, send keepalive message
+            except InterruptedError:
+                pass  # recalculate timeout and continue
+
+.. index::
+ pair: Cursor; Replication
+
+.. autoclass:: StopReplication
+
+
.. index::
single: Data types; Additional