diff options
Diffstat (limited to 'lib/extras.py')
-rw-r--r-- | lib/extras.py | 57 |
1 files changed, 11 insertions, 46 deletions
diff --git a/lib/extras.py b/lib/extras.py index 8a8d34f..6e815d6 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -39,6 +39,8 @@ import psycopg2 from psycopg2 import extensions as _ext from psycopg2.extensions import cursor as _cursor from psycopg2.extensions import connection as _connection +from psycopg2.extensions import REPLICATION_PHYSICAL, REPLICATION_LOGICAL +from psycopg2.extensions import ReplicationConnection as _replicationConnection from psycopg2.extensions import ReplicationCursor as _replicationCursor from psycopg2.extensions import ReplicationMessage from psycopg2.extensions import adapt as _A, quote_ident @@ -439,65 +441,28 @@ class MinTimeLoggingCursor(LoggingCursor): return LoggingCursor.callproc(self, procname, vars) -"""Replication connection types.""" -REPLICATION_LOGICAL = "LOGICAL" -REPLICATION_PHYSICAL = "PHYSICAL" - - -class ReplicationConnectionBase(_connection): +class ReplicationConnectionBase(_replicationConnection): """ Base class for Logical and Physical replication connection classes. Uses `ReplicationCursor` automatically. """ def __init__(self, *args, **kwargs): - """ - Initializes a replication connection by adding appropriate - parameters to the provided DSN and tweaking the connection - attributes. - """ - - # replication_type is set in subclasses - if self.replication_type == REPLICATION_LOGICAL: - replication = 'database' - - elif self.replication_type == REPLICATION_PHYSICAL: - replication = 'true' - - else: - raise psycopg2.ProgrammingError("unrecognized replication type: %s" % self.replication_type) - - items = _ext.parse_dsn(args[0]) - - # we add an appropriate replication keyword parameter, unless - # user has specified one explicitly in the DSN - items.setdefault('replication', replication) - - dsn = " ".join(["%s=%s" % (k, psycopg2._param_escape(str(v))) - for (k, v) in items.iteritems()]) - - args = [dsn] + list(args[1:]) # async is the possible 2nd arg super(ReplicationConnectionBase, self).__init__(*args, **kwargs) - - # prevent auto-issued BEGIN statements - if not self.async: - self.autocommit = True - - if self.cursor_factory is None: - self.cursor_factory = ReplicationCursor + self.cursor_factory = ReplicationCursor class LogicalReplicationConnection(ReplicationConnectionBase): def __init__(self, *args, **kwargs): - self.replication_type = REPLICATION_LOGICAL + kwargs['replication_type'] = REPLICATION_LOGICAL super(LogicalReplicationConnection, self).__init__(*args, **kwargs) class PhysicalReplicationConnection(ReplicationConnectionBase): def __init__(self, *args, **kwargs): - self.replication_type = REPLICATION_PHYSICAL + kwargs['replication_type'] = REPLICATION_PHYSICAL super(PhysicalReplicationConnection, self).__init__(*args, **kwargs) @@ -528,16 +493,16 @@ class ReplicationCursor(_replicationCursor): if output_plugin is None: raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot") - command += "%s %s" % (slot_type, quote_ident(output_plugin, self)) + command += "LOGICAL %s" % quote_ident(output_plugin, self) elif slot_type == REPLICATION_PHYSICAL: if output_plugin is not None: raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot") - command += slot_type + command += "PHYSICAL" else: - raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type) + raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type)) self.execute(command) @@ -562,7 +527,7 @@ class ReplicationCursor(_replicationCursor): else: raise psycopg2.ProgrammingError("slot name is required for logical replication") - command += "%s " % slot_type + command += "LOGICAL " elif slot_type == REPLICATION_PHYSICAL: if slot_name: @@ -570,7 +535,7 @@ class ReplicationCursor(_replicationCursor): # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX else: - raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type) + raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type)) if type(start_lsn) is str: lsn = start_lsn.split('/') |