summaryrefslogtreecommitdiff
path: root/lib/extras.py
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2016-08-14 21:09:54 +0100
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2016-08-14 21:09:54 +0100
commit1d950748af199d76069a5fb71bd9f7ace7a2e50e (patch)
treebf981b7ad7fe4f230af285fe76a8c9bcdab83b77 /lib/extras.py
parente779fec5f9eefa5fd2f943e15a785987feece679 (diff)
parent01c552baa3847819d024a0f945ec2b4f3bbeadba (diff)
downloadpsycopg2-1d950748af199d76069a5fb71bd9f7ace7a2e50e.tar.gz
Merge branch 'replication-protocol'
Diffstat (limited to 'lib/extras.py')
-rw-r--r--lib/extras.py123
1 files changed, 122 insertions, 1 deletions
diff --git a/lib/extras.py b/lib/extras.py
index 2713d6f..6ae9851 100644
--- a/lib/extras.py
+++ b/lib/extras.py
@@ -39,8 +39,12 @@ import psycopg2
from psycopg2 import extensions as _ext
from psycopg2.extensions import cursor as _cursor
from psycopg2.extensions import connection as _connection
-from psycopg2.extensions import adapt as _A
+from psycopg2.extensions import adapt as _A, quote_ident
from psycopg2.extensions import b
+from psycopg2._psycopg import REPLICATION_PHYSICAL, REPLICATION_LOGICAL
+from psycopg2._psycopg import ReplicationConnection as _replicationConnection
+from psycopg2._psycopg import ReplicationCursor as _replicationCursor
+from psycopg2._psycopg import ReplicationMessage
class DictCursorBase(_cursor):
@@ -437,6 +441,123 @@ class MinTimeLoggingCursor(LoggingCursor):
return LoggingCursor.callproc(self, procname, vars)
+class LogicalReplicationConnection(_replicationConnection):
+
+ def __init__(self, *args, **kwargs):
+ kwargs['replication_type'] = REPLICATION_LOGICAL
+ super(LogicalReplicationConnection, self).__init__(*args, **kwargs)
+
+
+class PhysicalReplicationConnection(_replicationConnection):
+
+ def __init__(self, *args, **kwargs):
+ kwargs['replication_type'] = REPLICATION_PHYSICAL
+ super(PhysicalReplicationConnection, self).__init__(*args, **kwargs)
+
+
+class StopReplication(Exception):
+ """
+ Exception used to break out of the endless loop in
+ `~ReplicationCursor.consume_stream()`.
+
+ Subclass of `~exceptions.Exception`. Intentionally *not* inherited from
+ `~psycopg2.Error` as occurrence of this exception does not indicate an
+ error.
+ """
+ pass
+
+
+class ReplicationCursor(_replicationCursor):
+ """A cursor used for communication on replication connections."""
+
+ def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
+ """Create streaming replication slot."""
+
+ command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)
+
+ if slot_type is None:
+ slot_type = self.connection.replication_type
+
+ if slot_type == REPLICATION_LOGICAL:
+ if output_plugin is None:
+ raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot")
+
+ command += "LOGICAL %s" % quote_ident(output_plugin, self)
+
+ elif slot_type == REPLICATION_PHYSICAL:
+ if output_plugin is not None:
+ raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot")
+
+ command += "PHYSICAL"
+
+ else:
+ raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))
+
+ self.execute(command)
+
+ def drop_replication_slot(self, slot_name):
+ """Drop streaming replication slot."""
+
+ command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self)
+ self.execute(command)
+
+ def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
+ timeline=0, options=None, decode=False):
+ """Start replication stream."""
+
+ command = "START_REPLICATION "
+
+ if slot_type is None:
+ slot_type = self.connection.replication_type
+
+ if slot_type == REPLICATION_LOGICAL:
+ if slot_name:
+ command += "SLOT %s " % quote_ident(slot_name, self)
+ else:
+ raise psycopg2.ProgrammingError("slot name is required for logical replication")
+
+ command += "LOGICAL "
+
+ elif slot_type == REPLICATION_PHYSICAL:
+ if slot_name:
+ command += "SLOT %s " % quote_ident(slot_name, self)
+ # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
+
+ else:
+ raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))
+
+ if type(start_lsn) is str:
+ lsn = start_lsn.split('/')
+ lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
+ else:
+ lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF, start_lsn & 0xFFFFFFFF)
+
+ command += lsn
+
+ if timeline != 0:
+ if slot_type == REPLICATION_LOGICAL:
+ raise psycopg2.ProgrammingError("cannot specify timeline for logical replication")
+
+ command += " TIMELINE %d" % timeline
+
+ if options:
+ if slot_type == REPLICATION_PHYSICAL:
+ raise psycopg2.ProgrammingError("cannot specify output plugin options for physical replication")
+
+ command += " ("
+ for k,v in options.iteritems():
+ if not command.endswith('('):
+ command += ", "
+ command += "%s %s" % (quote_ident(k, self), _A(str(v)))
+ command += ")"
+
+ self.start_replication_expert(command, decode=decode)
+
+ # allows replication cursors to be used in select.select() directly
+ def fileno(self):
+ return self.connection.fileno()
+
+
# a dbtype and adapter for Python UUID type
class UUID_adapter(object):