diff options
author | Oleksandr Shulgin <oleksandr.shulgin@zalando.de> | 2015-06-10 09:06:08 +0200 |
---|---|---|
committer | Oleksandr Shulgin <oleksandr.shulgin@zalando.de> | 2015-06-30 10:38:18 +0200 |
commit | 61e52ce8793472ff1348ab93ccdeb682a1e7b3df (patch) | |
tree | 4ddf3fd948a0b250af06a02c4eea760d1f8c1b18 /lib/extras.py | |
parent | 9ed90b1216828351ccbd9e9e28951bf7933fb1b3 (diff) | |
download | psycopg2-61e52ce8793472ff1348ab93ccdeb682a1e7b3df.tar.gz |
Rework replication protocol
This change exposes lower level functions for operating the
(logical) replication protocol, while keeping the high-level
start_replication function that does all the job for you in
case of a synchronous connection.
A number of other changes and fixes are put into this commit.
Diffstat (limited to 'lib/extras.py')
-rw-r--r-- | lib/extras.py | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/lib/extras.py b/lib/extras.py index 2f32bf1..85debc6 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -471,7 +471,8 @@ class ReplicationConnection(_connection): super(ReplicationConnection, self).__init__(*args, **kwargs) # prevent auto-issued BEGIN statements - self.autocommit = True + if not self.async: + self.autocommit = True def cursor(self, *args, **kwargs): kwargs.setdefault('cursor_factory', ReplicationCursor) @@ -503,18 +504,18 @@ class ReplicationCursor(_cursor): if slot_type == REPLICATION_LOGICAL: if output_plugin is None: - raise RuntimeError("output_plugin is required for logical replication slot") + raise psycopg2.ProgrammingError("output_plugin is required for logical replication slot") command += "LOGICAL %s" % self.quote_ident(output_plugin) elif slot_type == REPLICATION_PHYSICAL: if output_plugin is not None: - raise RuntimeError("output_plugin is not applicable to physical replication") + raise psycopg2.ProgrammingError("output_plugin is not applicable to physical replication") command += "PHYSICAL" else: - raise RuntimeError("unrecognized replication slot type") + raise psycopg2.ProgrammingError("unrecognized replication slot type") self.execute(command) @@ -524,17 +525,14 @@ class ReplicationCursor(_cursor): command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name) self.execute(command) - def start_replication(self, o, slot_type, slot_name=None, start_lsn=None, + def start_replication(self, slot_type, slot_name=None, writer=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None): """Start and consume replication stream.""" - if keepalive_interval <= 0: - raise RuntimeError("keepalive_interval must be > 0: %d" % keepalive_interval) - command = "START_REPLICATION " if slot_type == REPLICATION_LOGICAL and slot_name is None: - raise RuntimeError("slot_name is required for logical replication slot") + raise psycopg2.ProgrammingError("slot_name is required for logical replication slot") if slot_name: command += "SLOT %s " % self.quote_ident(slot_name) @@ -544,7 +542,7 @@ class ReplicationCursor(_cursor): elif slot_type == REPLICATION_PHYSICAL: command += "PHYSICAL " else: - raise RuntimeError("unrecognized replication slot type") + raise psycopg2.ProgrammingError("unrecognized replication slot type") if start_lsn is None: start_lsn = '0/0' @@ -555,16 +553,16 @@ class ReplicationCursor(_cursor): if timeline != 0: if slot_type == REPLICATION_LOGICAL: - raise RuntimeError("cannot specify timeline for logical replication") + raise psycopg2.ProgrammingError("cannot specify timeline for logical replication") if timeline < 0: - raise RuntimeError("timeline must be >= 0: %d" % timeline) + raise psycopg2.ProgrammingError("timeline must be >= 0: %d" % timeline) command += " TIMELINE %d" % timeline if options: if slot_type == REPLICATION_PHYSICAL: - raise RuntimeError("cannot specify plugin options for physical replication") + raise psycopg2.ProgrammingError("cannot specify plugin options for physical replication") command += " (" for k,v in options.iteritems(): @@ -573,11 +571,15 @@ class ReplicationCursor(_cursor): command += "%s %s" % (self.quote_ident(k), _A(str(v))) command += ")" - return self.start_replication_expert(o, command, keepalive_interval) + return self.start_replication_expert(command, writer=writer, + keepalive_interval=keepalive_interval) + + def send_feedback_message(self, written_lsn=0, sync_lsn=0, apply_lsn=0, reply_requested=False): + return self.send_replication_feedback(written_lsn, sync_lsn, apply_lsn, reply_requested) - # thin wrapper - def sync_server(self, msg): - return self.replication_sync_server(msg) + # allows replication cursors to be used in select.select() directly + def fileno(self): + return self.connection.fileno() # a dbtype and adapter for Python UUID type |