summaryrefslogtreecommitdiff
path: root/lib/extras.py
diff options
context:
space:
mode:
authorOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-10-01 11:08:56 +0200
committerOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-10-01 19:33:16 +0200
commit937a7a90246916bff0e956947b1bab6058c72d08 (patch)
treeba7ad38dddd28a1ade7f4e66a58a9c59e600de88 /lib/extras.py
parentf872a2aabbf69bc7f16a4c25f226d634f9d019c9 (diff)
downloadpsycopg2-937a7a90246916bff0e956947b1bab6058c72d08.tar.gz
Cleanup start replication wrt. slot type a bit.
Diffstat (limited to 'lib/extras.py')
-rw-r--r--lib/extras.py51
1 files changed, 26 insertions, 25 deletions
diff --git a/lib/extras.py b/lib/extras.py
index 85debc6..36138c6 100644
--- a/lib/extras.py
+++ b/lib/extras.py
@@ -480,8 +480,8 @@ class ReplicationConnection(_connection):
"""Streamging replication types."""
-REPLICATION_PHYSICAL = 0
-REPLICATION_LOGICAL = 1
+REPLICATION_LOGICAL = "LOGICAL"
+REPLICATION_PHYSICAL = "PHYSICAL"
class ReplicationCursor(_cursor):
"""A cursor used for replication commands."""
@@ -504,18 +504,18 @@ class ReplicationCursor(_cursor):
if slot_type == REPLICATION_LOGICAL:
if output_plugin is None:
- raise psycopg2.ProgrammingError("output_plugin is required for logical replication slot")
+ raise psycopg2.ProgrammingError("output plugin name is required for logical replication slot")
- command += "LOGICAL %s" % self.quote_ident(output_plugin)
+ command += "%s %s" % (slot_type, self.quote_ident(output_plugin))
elif slot_type == REPLICATION_PHYSICAL:
if output_plugin is not None:
- raise psycopg2.ProgrammingError("output_plugin is not applicable to physical replication")
+ raise psycopg2.ProgrammingError("cannot specify output plugin name for physical replication slot")
- command += "PHYSICAL"
+ command += slot_type
else:
- raise psycopg2.ProgrammingError("unrecognized replication slot type")
+ raise psycopg2.ProgrammingError("unrecognized replication slot type: %s" % slot_type)
self.execute(command)
@@ -525,44 +525,45 @@ class ReplicationCursor(_cursor):
command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name)
self.execute(command)
- def start_replication(self, slot_type, slot_name=None, writer=None, start_lsn=None,
+ def start_replication(self, slot_type, slot_name=None, writer=None, start_lsn=0,
timeline=0, keepalive_interval=10, options=None):
"""Start and consume replication stream."""
command = "START_REPLICATION "
- if slot_type == REPLICATION_LOGICAL and slot_name is None:
- raise psycopg2.ProgrammingError("slot_name is required for logical replication slot")
+ if slot_type == REPLICATION_LOGICAL:
+ if slot_name:
+ command += "SLOT %s " % self.quote_ident(slot_name)
+ else:
+ raise psycopg2.ProgrammingError("slot name is required for logical replication")
- if slot_name:
- command += "SLOT %s " % self.quote_ident(slot_name)
+ command += "%s " % slot_type
- if slot_type == REPLICATION_LOGICAL:
- command += "LOGICAL "
elif slot_type == REPLICATION_PHYSICAL:
- command += "PHYSICAL "
+ if slot_name:
+ command += "SLOT %s " % self.quote_ident(slot_name)
+
+ # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
else:
- raise psycopg2.ProgrammingError("unrecognized replication slot type")
+ raise psycopg2.ProgrammingError("unrecognized replication slot type: %s" % slot_type)
- if start_lsn is None:
- start_lsn = '0/0'
+ if type(start_lsn) is str:
+ lsn = start_lsn.split('/')
+ lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
+ else:
+ lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF, start_lsn & 0xFFFFFFFF)
- # reparse lsn to catch possible garbage
- lsn = start_lsn.split('/')
- command += "%X/%X" % (int(lsn[0], 16), int(lsn[1], 16))
+ command += lsn
if timeline != 0:
if slot_type == REPLICATION_LOGICAL:
raise psycopg2.ProgrammingError("cannot specify timeline for logical replication")
- if timeline < 0:
- raise psycopg2.ProgrammingError("timeline must be >= 0: %d" % timeline)
-
command += " TIMELINE %d" % timeline
if options:
if slot_type == REPLICATION_PHYSICAL:
- raise psycopg2.ProgrammingError("cannot specify plugin options for physical replication")
+ raise psycopg2.ProgrammingError("cannot specify output plugin options for physical replication")
command += " ("
for k,v in options.iteritems():