summaryrefslogtreecommitdiff
path: root/lib/extras.py
diff options
context:
space:
mode:
authorOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-06-10 09:06:08 +0200
committerOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-06-30 10:38:18 +0200
commit61e52ce8793472ff1348ab93ccdeb682a1e7b3df (patch)
tree4ddf3fd948a0b250af06a02c4eea760d1f8c1b18 /lib/extras.py
parent9ed90b1216828351ccbd9e9e28951bf7933fb1b3 (diff)
downloadpsycopg2-61e52ce8793472ff1348ab93ccdeb682a1e7b3df.tar.gz
Rework replication protocol
This change exposes lower level functions for operating the (logical) replication protocol, while keeping the high-level start_replication function that does all the job for you in case of a synchronous connection. A number of other changes and fixes are put into this commit.
Diffstat (limited to 'lib/extras.py')
-rw-r--r--lib/extras.py36
1 files changed, 19 insertions, 17 deletions
diff --git a/lib/extras.py b/lib/extras.py
index 2f32bf1..85debc6 100644
--- a/lib/extras.py
+++ b/lib/extras.py
@@ -471,7 +471,8 @@ class ReplicationConnection(_connection):
super(ReplicationConnection, self).__init__(*args, **kwargs)
# prevent auto-issued BEGIN statements
- self.autocommit = True
+ if not self.async:
+ self.autocommit = True
def cursor(self, *args, **kwargs):
kwargs.setdefault('cursor_factory', ReplicationCursor)
@@ -503,18 +504,18 @@ class ReplicationCursor(_cursor):
if slot_type == REPLICATION_LOGICAL:
if output_plugin is None:
- raise RuntimeError("output_plugin is required for logical replication slot")
+ raise psycopg2.ProgrammingError("output_plugin is required for logical replication slot")
command += "LOGICAL %s" % self.quote_ident(output_plugin)
elif slot_type == REPLICATION_PHYSICAL:
if output_plugin is not None:
- raise RuntimeError("output_plugin is not applicable to physical replication")
+ raise psycopg2.ProgrammingError("output_plugin is not applicable to physical replication")
command += "PHYSICAL"
else:
- raise RuntimeError("unrecognized replication slot type")
+ raise psycopg2.ProgrammingError("unrecognized replication slot type")
self.execute(command)
@@ -524,17 +525,14 @@ class ReplicationCursor(_cursor):
command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name)
self.execute(command)
- def start_replication(self, o, slot_type, slot_name=None, start_lsn=None,
+ def start_replication(self, slot_type, slot_name=None, writer=None, start_lsn=None,
timeline=0, keepalive_interval=10, options=None):
"""Start and consume replication stream."""
- if keepalive_interval <= 0:
- raise RuntimeError("keepalive_interval must be > 0: %d" % keepalive_interval)
-
command = "START_REPLICATION "
if slot_type == REPLICATION_LOGICAL and slot_name is None:
- raise RuntimeError("slot_name is required for logical replication slot")
+ raise psycopg2.ProgrammingError("slot_name is required for logical replication slot")
if slot_name:
command += "SLOT %s " % self.quote_ident(slot_name)
@@ -544,7 +542,7 @@ class ReplicationCursor(_cursor):
elif slot_type == REPLICATION_PHYSICAL:
command += "PHYSICAL "
else:
- raise RuntimeError("unrecognized replication slot type")
+ raise psycopg2.ProgrammingError("unrecognized replication slot type")
if start_lsn is None:
start_lsn = '0/0'
@@ -555,16 +553,16 @@ class ReplicationCursor(_cursor):
if timeline != 0:
if slot_type == REPLICATION_LOGICAL:
- raise RuntimeError("cannot specify timeline for logical replication")
+ raise psycopg2.ProgrammingError("cannot specify timeline for logical replication")
if timeline < 0:
- raise RuntimeError("timeline must be >= 0: %d" % timeline)
+ raise psycopg2.ProgrammingError("timeline must be >= 0: %d" % timeline)
command += " TIMELINE %d" % timeline
if options:
if slot_type == REPLICATION_PHYSICAL:
- raise RuntimeError("cannot specify plugin options for physical replication")
+ raise psycopg2.ProgrammingError("cannot specify plugin options for physical replication")
command += " ("
for k,v in options.iteritems():
@@ -573,11 +571,15 @@ class ReplicationCursor(_cursor):
command += "%s %s" % (self.quote_ident(k), _A(str(v)))
command += ")"
- return self.start_replication_expert(o, command, keepalive_interval)
+ return self.start_replication_expert(command, writer=writer,
+ keepalive_interval=keepalive_interval)
+
+ def send_feedback_message(self, written_lsn=0, sync_lsn=0, apply_lsn=0, reply_requested=False):
+ return self.send_replication_feedback(written_lsn, sync_lsn, apply_lsn, reply_requested)
- # thin wrapper
- def sync_server(self, msg):
- return self.replication_sync_server(msg)
+ # allows replication cursors to be used in select.select() directly
+ def fileno(self):
+ return self.connection.fileno()
# a dbtype and adapter for Python UUID type