diff options
author | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2016-08-14 21:09:54 +0100 |
---|---|---|
committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2016-08-14 21:09:54 +0100 |
commit | 1d950748af199d76069a5fb71bd9f7ace7a2e50e (patch) | |
tree | bf981b7ad7fe4f230af285fe76a8c9bcdab83b77 /lib/extras.py | |
parent | e779fec5f9eefa5fd2f943e15a785987feece679 (diff) | |
parent | 01c552baa3847819d024a0f945ec2b4f3bbeadba (diff) | |
download | psycopg2-1d950748af199d76069a5fb71bd9f7ace7a2e50e.tar.gz |
Merge branch 'replication-protocol'
Diffstat (limited to 'lib/extras.py')
-rw-r--r-- | lib/extras.py | 123 |
1 files changed, 122 insertions, 1 deletions
diff --git a/lib/extras.py b/lib/extras.py index 2713d6f..6ae9851 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -39,8 +39,12 @@ import psycopg2 from psycopg2 import extensions as _ext from psycopg2.extensions import cursor as _cursor from psycopg2.extensions import connection as _connection -from psycopg2.extensions import adapt as _A +from psycopg2.extensions import adapt as _A, quote_ident from psycopg2.extensions import b +from psycopg2._psycopg import REPLICATION_PHYSICAL, REPLICATION_LOGICAL +from psycopg2._psycopg import ReplicationConnection as _replicationConnection +from psycopg2._psycopg import ReplicationCursor as _replicationCursor +from psycopg2._psycopg import ReplicationMessage class DictCursorBase(_cursor): @@ -437,6 +441,123 @@ class MinTimeLoggingCursor(LoggingCursor): return LoggingCursor.callproc(self, procname, vars) +class LogicalReplicationConnection(_replicationConnection): + + def __init__(self, *args, **kwargs): + kwargs['replication_type'] = REPLICATION_LOGICAL + super(LogicalReplicationConnection, self).__init__(*args, **kwargs) + + +class PhysicalReplicationConnection(_replicationConnection): + + def __init__(self, *args, **kwargs): + kwargs['replication_type'] = REPLICATION_PHYSICAL + super(PhysicalReplicationConnection, self).__init__(*args, **kwargs) + + +class StopReplication(Exception): + """ + Exception used to break out of the endless loop in + `~ReplicationCursor.consume_stream()`. + + Subclass of `~exceptions.Exception`. Intentionally *not* inherited from + `~psycopg2.Error` as occurrence of this exception does not indicate an + error. + """ + pass + + +class ReplicationCursor(_replicationCursor): + """A cursor used for communication on replication connections.""" + + def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None): + """Create streaming replication slot.""" + + command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self) + + if slot_type is None: + slot_type = self.connection.replication_type + + if slot_type == REPLICATION_LOGICAL: + if output_plugin is None: + raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot") + + command += "LOGICAL %s" % quote_ident(output_plugin, self) + + elif slot_type == REPLICATION_PHYSICAL: + if output_plugin is not None: + raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot") + + command += "PHYSICAL" + + else: + raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type)) + + self.execute(command) + + def drop_replication_slot(self, slot_name): + """Drop streaming replication slot.""" + + command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self) + self.execute(command) + + def start_replication(self, slot_name=None, slot_type=None, start_lsn=0, + timeline=0, options=None, decode=False): + """Start replication stream.""" + + command = "START_REPLICATION " + + if slot_type is None: + slot_type = self.connection.replication_type + + if slot_type == REPLICATION_LOGICAL: + if slot_name: + command += "SLOT %s " % quote_ident(slot_name, self) + else: + raise psycopg2.ProgrammingError("slot name is required for logical replication") + + command += "LOGICAL " + + elif slot_type == REPLICATION_PHYSICAL: + if slot_name: + command += "SLOT %s " % quote_ident(slot_name, self) + # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX + + else: + raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type)) + + if type(start_lsn) is str: + lsn = start_lsn.split('/') + lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16)) + else: + lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF, start_lsn & 0xFFFFFFFF) + + command += lsn + + if timeline != 0: + if slot_type == REPLICATION_LOGICAL: + raise psycopg2.ProgrammingError("cannot specify timeline for logical replication") + + command += " TIMELINE %d" % timeline + + if options: + if slot_type == REPLICATION_PHYSICAL: + raise psycopg2.ProgrammingError("cannot specify output plugin options for physical replication") + + command += " (" + for k,v in options.iteritems(): + if not command.endswith('('): + command += ", " + command += "%s %s" % (quote_ident(k, self), _A(str(v))) + command += ")" + + self.start_replication_expert(command, decode=decode) + + # allows replication cursors to be used in select.select() directly + def fileno(self): + return self.connection.fileno() + + # a dbtype and adapter for Python UUID type class UUID_adapter(object): |