summaryrefslogtreecommitdiff
path: root/lib/extras.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/extras.py')
-rw-r--r--lib/extras.py57
1 files changed, 11 insertions, 46 deletions
diff --git a/lib/extras.py b/lib/extras.py
index 8a8d34f..6e815d6 100644
--- a/lib/extras.py
+++ b/lib/extras.py
@@ -39,6 +39,8 @@ import psycopg2
from psycopg2 import extensions as _ext
from psycopg2.extensions import cursor as _cursor
from psycopg2.extensions import connection as _connection
+from psycopg2.extensions import REPLICATION_PHYSICAL, REPLICATION_LOGICAL
+from psycopg2.extensions import ReplicationConnection as _replicationConnection
from psycopg2.extensions import ReplicationCursor as _replicationCursor
from psycopg2.extensions import ReplicationMessage
from psycopg2.extensions import adapt as _A, quote_ident
@@ -439,65 +441,28 @@ class MinTimeLoggingCursor(LoggingCursor):
return LoggingCursor.callproc(self, procname, vars)
-"""Replication connection types."""
-REPLICATION_LOGICAL = "LOGICAL"
-REPLICATION_PHYSICAL = "PHYSICAL"
-
-
-class ReplicationConnectionBase(_connection):
+class ReplicationConnectionBase(_replicationConnection):
"""
Base class for Logical and Physical replication connection
classes. Uses `ReplicationCursor` automatically.
"""
def __init__(self, *args, **kwargs):
- """
- Initializes a replication connection by adding appropriate
- parameters to the provided DSN and tweaking the connection
- attributes.
- """
-
- # replication_type is set in subclasses
- if self.replication_type == REPLICATION_LOGICAL:
- replication = 'database'
-
- elif self.replication_type == REPLICATION_PHYSICAL:
- replication = 'true'
-
- else:
- raise psycopg2.ProgrammingError("unrecognized replication type: %s" % self.replication_type)
-
- items = _ext.parse_dsn(args[0])
-
- # we add an appropriate replication keyword parameter, unless
- # user has specified one explicitly in the DSN
- items.setdefault('replication', replication)
-
- dsn = " ".join(["%s=%s" % (k, psycopg2._param_escape(str(v)))
- for (k, v) in items.iteritems()])
-
- args = [dsn] + list(args[1:]) # async is the possible 2nd arg
super(ReplicationConnectionBase, self).__init__(*args, **kwargs)
-
- # prevent auto-issued BEGIN statements
- if not self.async:
- self.autocommit = True
-
- if self.cursor_factory is None:
- self.cursor_factory = ReplicationCursor
+ self.cursor_factory = ReplicationCursor
class LogicalReplicationConnection(ReplicationConnectionBase):
def __init__(self, *args, **kwargs):
- self.replication_type = REPLICATION_LOGICAL
+ kwargs['replication_type'] = REPLICATION_LOGICAL
super(LogicalReplicationConnection, self).__init__(*args, **kwargs)
class PhysicalReplicationConnection(ReplicationConnectionBase):
def __init__(self, *args, **kwargs):
- self.replication_type = REPLICATION_PHYSICAL
+ kwargs['replication_type'] = REPLICATION_PHYSICAL
super(PhysicalReplicationConnection, self).__init__(*args, **kwargs)
@@ -528,16 +493,16 @@ class ReplicationCursor(_replicationCursor):
if output_plugin is None:
raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot")
- command += "%s %s" % (slot_type, quote_ident(output_plugin, self))
+ command += "LOGICAL %s" % quote_ident(output_plugin, self)
elif slot_type == REPLICATION_PHYSICAL:
if output_plugin is not None:
raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot")
- command += slot_type
+ command += "PHYSICAL"
else:
- raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type)
+ raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))
self.execute(command)
@@ -562,7 +527,7 @@ class ReplicationCursor(_replicationCursor):
else:
raise psycopg2.ProgrammingError("slot name is required for logical replication")
- command += "%s " % slot_type
+ command += "LOGICAL "
elif slot_type == REPLICATION_PHYSICAL:
if slot_name:
@@ -570,7 +535,7 @@ class ReplicationCursor(_replicationCursor):
# don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
else:
- raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type)
+ raise psycopg2.ProgrammingError("unrecognized replication type: %s" % repr(slot_type))
if type(start_lsn) is str:
lsn = start_lsn.split('/')