summaryrefslogtreecommitdiff
path: root/lib/extras.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/extras.py')
-rw-r--r--lib/extras.py169
1 files changed, 168 insertions, 1 deletions
diff --git a/lib/extras.py b/lib/extras.py
index 2713d6f..8a8d34f 100644
--- a/lib/extras.py
+++ b/lib/extras.py
@@ -39,7 +39,9 @@ import psycopg2
from psycopg2 import extensions as _ext
from psycopg2.extensions import cursor as _cursor
from psycopg2.extensions import connection as _connection
-from psycopg2.extensions import adapt as _A
+from psycopg2.extensions import ReplicationCursor as _replicationCursor
+from psycopg2.extensions import ReplicationMessage
+from psycopg2.extensions import adapt as _A, quote_ident
from psycopg2.extensions import b
@@ -437,6 +439,171 @@ class MinTimeLoggingCursor(LoggingCursor):
return LoggingCursor.callproc(self, procname, vars)
+"""Replication connection types."""
+REPLICATION_LOGICAL = "LOGICAL"
+REPLICATION_PHYSICAL = "PHYSICAL"
+
+
+class ReplicationConnectionBase(_connection):
+ """
+ Base class for Logical and Physical replication connection
+ classes. Uses `ReplicationCursor` automatically.
+ """
+
+ def __init__(self, *args, **kwargs):
+ """
+ Initializes a replication connection by adding appropriate
+ parameters to the provided DSN and tweaking the connection
+ attributes.
+ """
+
+ # replication_type is set in subclasses
+ if self.replication_type == REPLICATION_LOGICAL:
+ replication = 'database'
+
+ elif self.replication_type == REPLICATION_PHYSICAL:
+ replication = 'true'
+
+ else:
+ raise psycopg2.ProgrammingError("unrecognized replication type: %s" % self.replication_type)
+
+ items = _ext.parse_dsn(args[0])
+
+ # we add an appropriate replication keyword parameter, unless
+ # user has specified one explicitly in the DSN
+ items.setdefault('replication', replication)
+
+ dsn = " ".join(["%s=%s" % (k, psycopg2._param_escape(str(v)))
+ for (k, v) in items.iteritems()])
+
+ args = [dsn] + list(args[1:]) # async is the possible 2nd arg
+ super(ReplicationConnectionBase, self).__init__(*args, **kwargs)
+
+ # prevent auto-issued BEGIN statements
+ if not self.async:
+ self.autocommit = True
+
+ if self.cursor_factory is None:
+ self.cursor_factory = ReplicationCursor
+
+
+class LogicalReplicationConnection(ReplicationConnectionBase):
+
+ def __init__(self, *args, **kwargs):
+ self.replication_type = REPLICATION_LOGICAL
+ super(LogicalReplicationConnection, self).__init__(*args, **kwargs)
+
+
+class PhysicalReplicationConnection(ReplicationConnectionBase):
+
+ def __init__(self, *args, **kwargs):
+ self.replication_type = REPLICATION_PHYSICAL
+ super(PhysicalReplicationConnection, self).__init__(*args, **kwargs)
+
+
+class StopReplication(Exception):
+ """
+ Exception used to break out of the endless loop in
+ `~ReplicationCursor.consume_stream()`.
+
+ Subclass of `~exceptions.Exception`. Intentionally *not* inherited from
+ `~psycopg2.Error` as occurrence of this exception does not indicate an
+ error.
+ """
+ pass
+
+
+class ReplicationCursor(_replicationCursor):
+ """A cursor used for communication on replication connections."""
+
+ def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
+ """Create streaming replication slot."""
+
+ command = "CREATE_REPLICATION_SLOT %s " % quote_ident(slot_name, self)
+
+ if slot_type is None:
+ slot_type = self.connection.replication_type
+
+ if slot_type == REPLICATION_LOGICAL:
+ if output_plugin is None:
+ raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot")
+
+ command += "%s %s" % (slot_type, quote_ident(output_plugin, self))
+
+ elif slot_type == REPLICATION_PHYSICAL:
+ if output_plugin is not None:
+ raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot")
+
+ command += slot_type
+
+ else:
+ raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type)
+
+ self.execute(command)
+
+ def drop_replication_slot(self, slot_name):
+ """Drop streaming replication slot."""
+
+ command = "DROP_REPLICATION_SLOT %s" % quote_ident(slot_name, self)
+ self.execute(command)
+
+ def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
+ timeline=0, options=None, decode=False):
+ """Start replication stream."""
+
+ command = "START_REPLICATION "
+
+ if slot_type is None:
+ slot_type = self.connection.replication_type
+
+ if slot_type == REPLICATION_LOGICAL:
+ if slot_name:
+ command += "SLOT %s " % quote_ident(slot_name, self)
+ else:
+ raise psycopg2.ProgrammingError("slot name is required for logical replication")
+
+ command += "%s " % slot_type
+
+ elif slot_type == REPLICATION_PHYSICAL:
+ if slot_name:
+ command += "SLOT %s " % quote_ident(slot_name, self)
+ # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
+
+ else:
+ raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type)
+
+ if type(start_lsn) is str:
+ lsn = start_lsn.split('/')
+ lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
+ else:
+ lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF, start_lsn & 0xFFFFFFFF)
+
+ command += lsn
+
+ if timeline != 0:
+ if slot_type == REPLICATION_LOGICAL:
+ raise psycopg2.ProgrammingError("cannot specify timeline for logical replication")
+
+ command += " TIMELINE %d" % timeline
+
+ if options:
+ if slot_type == REPLICATION_PHYSICAL:
+ raise psycopg2.ProgrammingError("cannot specify output plugin options for physical replication")
+
+ command += " ("
+ for k,v in options.iteritems():
+ if not command.endswith('('):
+ command += ", "
+ command += "%s %s" % (quote_ident(k, self), _A(str(v)))
+ command += ")"
+
+ self.start_replication_expert(command, decode=decode)
+
+ # allows replication cursors to be used in select.select() directly
+ def fileno(self):
+ return self.connection.fileno()
+
+
# a dbtype and adapter for Python UUID type
class UUID_adapter(object):