diff options
Diffstat (limited to 'lib/extras.py')
-rw-r--r-- | lib/extras.py | 169 |
1 files changed, 168 insertions, 1 deletions
diff --git a/lib/extras.py b/lib/extras.py index 2713d6f..8a8d34f 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -39,7 +39,9 @@ import psycopg2 from psycopg2 import extensions as _ext from psycopg2.extensions import cursor as _cursor from psycopg2.extensions import connection as _connection -from psycopg2.extensions import adapt as _A +from psycopg2.extensions import ReplicationCursor as _replicationCursor +from psycopg2.extensions import ReplicationMessage +from psycopg2.extensions import adapt as _A, quote_ident from psycopg2.extensions import b @@ -437,6 +439,171 @@ class MinTimeLoggingCursor(LoggingCursor): return LoggingCursor.callproc(self, procname, vars) +"""Replication connection types.""" +REPLICATION_LOGICAL = "LOGICAL" +REPLICATION_PHYSICAL = "PHYSICAL" + + +class ReplicationConnectionBase(_connection): + """ + Base class for Logical and Physical replication connection + classes. Uses `ReplicationCursor` automatically. + """ + + def __init__(self, *args, **kwargs): + """ + Initializes a replication connection by adding appropriate + parameters to the provided DSN and tweaking the connection + attributes. + """ + + # replication_type is set in subclasses + if self.replication_type == REPLICATION_LOGICAL: + replication = 'database' + + elif self.replication_type == REPLICATION_PHYSICAL: + replication = 'true' + + else: + raise psycopg2.ProgrammingError("unrecognized replication type: %s" % self.replication_type) + + items = _ext.parse_dsn(args[0]) + + # we add an appropriate replication keyword parameter, unless + # user has specified one explicitly in the DSN + items.setdefault('replication', replication) + + dsn = " ".join(["%s=%s" % (k, psycopg2._param_escape(str(v))) + for (k, v) in items.iteritems()]) + + args = [dsn] + list(args[1:]) # async is the possible 2nd arg + super(ReplicationConnectionBase, self).__init__(*args, **kwargs) + + # prevent auto-issued BEGIN statements + if not self.async: + self.autocommit = True + + if self.cursor_factory is None: + self.cursor_factory = ReplicationCursor + + +class LogicalReplicationConnection(ReplicationConnectionBase): + + def __init__(self, *args, **kwargs): + self.replication_type = REPLICATION_LOGICAL + super(LogicalReplicationConnection, self).__init__(*args, **kwargs) + + +class PhysicalReplicationConnection(ReplicationConnectionBase): + + def __init__(self, *args, **kwargs): + self.replication_type = REPLICATION_PHYSICAL + super(PhysicalReplicationConnection, self).__init__(*args, **kwargs) + + +class StopReplication(Exception): + """ + Exception used to break out of the endless loop in + `~ReplicationCursor.consume_stream()`. + + Subclass of `~exceptions.Exception`. Intentionally *not* inherited from + `~psycopg2.Error` as occurrence of this exception does not indicate an + error. + """ + pass + + +class ReplicationCursor(_replicationCursor): + """A cursor used for communication on replication connections.""" + + def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None): + """Create streaming replication slot.""" + + command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self) + + if slot_type is None: + slot_type = self.connection.replication_type + + if slot_type == REPLICATION_LOGICAL: + if output_plugin is None: + raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot") + + command += "%s %s" % (slot_type, quote_ident(output_plugin, self)) + + elif slot_type == REPLICATION_PHYSICAL: + if output_plugin is not None: + raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot") + + command += slot_type + + else: + raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type) + + self.execute(command) + + def drop_replication_slot(self, slot_name): + """Drop streaming replication slot.""" + + command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self) + self.execute(command) + + def start_replication(self, slot_name=None, slot_type=None, start_lsn=0, + timeline=0, options=None, decode=False): + """Start replication stream.""" + + command = "START_REPLICATION " + + if slot_type is None: + slot_type = self.connection.replication_type + + if slot_type == REPLICATION_LOGICAL: + if slot_name: + command += "SLOT %s " % quote_ident(slot_name, self) + else: + raise psycopg2.ProgrammingError("slot name is required for logical replication") + + command += "%s " % slot_type + + elif slot_type == REPLICATION_PHYSICAL: + if slot_name: + command += "SLOT %s " % quote_ident(slot_name, self) + # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX + + else: + raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type) + + if type(start_lsn) is str: + lsn = start_lsn.split('/') + lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16)) + else: + lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF, start_lsn & 0xFFFFFFFF) + + command += lsn + + if timeline != 0: + if slot_type == REPLICATION_LOGICAL: + raise psycopg2.ProgrammingError("cannot specify timeline for logical replication") + + command += " TIMELINE %d" % timeline + + if options: + if slot_type == REPLICATION_PHYSICAL: + raise psycopg2.ProgrammingError("cannot specify output plugin options for physical replication") + + command += " (" + for k,v in options.iteritems(): + if not command.endswith('('): + command += ", " + command += "%s %s" % (quote_ident(k, self), _A(str(v))) + command += ")" + + self.start_replication_expert(command, decode=decode) + + # allows replication cursors to be used in select.select() directly + def fileno(self): + return self.connection.fileno() + + # a dbtype and adapter for Python UUID type class UUID_adapter(object): |