summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-10-23 11:31:55 +0200
committerOleksandr Shulgin <oleksandr.shulgin@zalando.de>2015-10-23 11:31:55 +0200
commite69dafbeccf4a1ff096759bd531fd771955592da (patch)
tree498e6e9ec8103ea89dde52c0c93c6f077494c604
parent76c7f4a0b5f3ff69499239917fb0aec8b0da6adf (diff)
downloadpsycopg2-e69dafbeccf4a1ff096759bd531fd771955592da.tar.gz
Move the `decode` parameter to `start_replication()`.
It makes more sense this way, because otherwise it must be passed to every call of `read_message()`.
-rw-r--r--doc/src/extras.rst75
-rw-r--r--lib/extras.py4
-rw-r--r--psycopg/pqpath.c12
-rw-r--r--psycopg/pqpath.h4
-rw-r--r--psycopg/replication_cursor.h7
-rw-r--r--psycopg/replication_cursor_type.c41
6 files changed, 72 insertions, 71 deletions
diff --git a/doc/src/extras.rst b/doc/src/extras.rst
index 2a7bed2..7df68a7 100644
--- a/doc/src/extras.rst
+++ b/doc/src/extras.rst
@@ -185,10 +185,10 @@ replication::
connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()
try:
- cur.start_replication(slot_name='pytest')
+ cur.start_replication(slot_name='pytest', decode=True) # test_decoding produces textual output
except psycopg2.ProgrammingError:
cur.create_replication_slot('pytest', output_plugin='test_decoding')
- cur.start_replication(slot_name='pytest')
+ cur.start_replication(slot_name='pytest', decode=True)
class DemoConsumer(object):
def __call__(self, msg):
@@ -260,9 +260,12 @@ The individual messages in the replication stream are represented by
.. attribute:: payload
- The actual data received from the server. An instance of either
- ``str`` or ``unicode``, depending on the method that was used to
- produce this message.
+ The actual data received from the server.
+
+ An instance of either `bytes()` or `unicode()`, depending on the value
+ of `decode` option passed to `ReplicationCursor.start_replication()`
+ on the connection. See `ReplicationCursor.read_message()` for
+ details.
.. attribute:: data_size
@@ -336,7 +339,7 @@ The individual messages in the replication stream are represented by
Replication slots are a feature of PostgreSQL server starting with
version 9.4.
- .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None)
+ .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None, decode=False)
Start replication on the connection.
@@ -352,6 +355,8 @@ The individual messages in the replication stream are represented by
can only be used with physical replication)
:param options: a dictionary of options to pass to logical replication
slot (not allowed with physical replication)
+ :param decode: a flag indicating that unicode conversion should be
+ performed on messages received from the server
If a *slot_name* is specified, the slot must exist on the server and
its type must match the replication type used.
@@ -387,6 +392,11 @@ The individual messages in the replication stream are represented by
on the output plugin that was used to create the slot. Must be
`!None` for physical replication.
+ If *decode* is set to `!True` the messages received from the server
+ would be converted according to the connection `~connection.encoding`.
+ *This parameter should not be set with physical replication or with
+ logical replication plugins that produce binary output.*
+
This function constructs a ``START_REPLICATION`` command and calls
`start_replication_expert()` internally.
@@ -395,43 +405,40 @@ The individual messages in the replication stream are represented by
`read_message()` in case of :ref:`asynchronous connection
<async-support>`.
- .. method:: start_replication_expert(command)
+ .. method:: start_replication_expert(command, decode=False)
- Start replication on the connection using provided ``START_REPLICATION``
- command.
+ Start replication on the connection using provided
+ ``START_REPLICATION`` command. See `start_replication()` for
+ description of *decode* parameter.
- .. method:: consume_stream(consume, decode=False, keepalive_interval=10)
+ .. method:: consume_stream(consume, keepalive_interval=10)
:param consume: a callable object with signature ``consume(msg)``
- :param decode: a flag indicating that unicode conversion should be
- performed on the messages received from the server
:param keepalive_interval: interval (in seconds) to send keepalive
messages to the server
This method can only be used with synchronous connection. For
asynchronous connections see `read_message()`.
- Before calling this method to consume the stream use
+ Before using this method to consume the stream call
`start_replication()` first.
This method enters an endless loop reading messages from the server
- and passing them to ``consume()``, then waiting for more messages from
- the server. In order to make this method break out of the loop and
- return, ``consume()`` can throw a `StopReplication` exception. Any
- unhandled exception will make it break out of the loop as well.
+ and passing them to ``consume()`` one at a time, then waiting for more
+ messages from the server. In order to make this method break out of
+ the loop and return, ``consume()`` can throw a `StopReplication`
+ exception. Any unhandled exception will make it break out of the loop
+ as well.
- If *decode* is set to `!True` the messages read from the server are
- converted according to the connection `~connection.encoding`. This
- parameter should not be set with physical replication.
+ The *msg* object passed to ``consume()`` is an instance of
+ `ReplicationMessage` class. See `read_message()` for details about
+ message decoding.
This method also sends keepalive messages to the server in case there
were no new data from the server for the duration of
*keepalive_interval* (in seconds). The value of this parameter must
be set to at least 1 second, but it can have a fractional part.
- The *msg* objects passed to ``consume()`` are instances of
- `ReplicationMessage` class.
-
After processing certain amount of messages the client should send a
confirmation message to the server. This should be done by calling
`send_feedback()` method on the corresponding replication cursor. A
@@ -452,7 +459,7 @@ The individual messages in the replication stream are represented by
msg.cursor.send_feedback(flush_lsn=msg.data_start)
consumer = LogicalStreamConsumer()
- cur.consume_stream(consumer, decode=True)
+ cur.consume_stream(consumer)
.. warning::
@@ -510,17 +517,21 @@ The individual messages in the replication stream are represented by
for better control, in particular to `~select` on multiple sockets. The
following methods are provided for asynchronous operation:
- .. method:: read_message(decode=True)
+ .. method:: read_message()
- :param decode: a flag indicating that unicode conversion should be
- performed on the data received from the server
+ Try to read the next message from the server without blocking and
+ return an instance of `ReplicationMessage` or `!None`, in case there
+ are no more data messages from the server at the moment.
This method should be used in a loop with asynchronous connections
- after calling `start_replication()` once.
-
- It tries to read the next message from the server without blocking and
- returns an instance of `ReplicationMessage` or `!None`, in case there
- are no more data messages from the server at the moment.
+ (after calling `start_replication()` once). For synchronous
+ connections see `consume_stream()`.
+
+ The returned message's `ReplicationMessage.payload` is an instance of
+ `unicode()` decoded according to connection `connection.encoding`
+ *iff* `decode` was set to `!True` in the initial call to
+ `start_replication()` on this connection, otherwise it is an instance
+ of `bytes()` with no decoding.
It is expected that the calling code will call this method repeatedly
in order to consume all of the messages that might have been buffered
diff --git a/lib/extras.py b/lib/extras.py
index 8e1373c..8a8d34f 100644
--- a/lib/extras.py
+++ b/lib/extras.py
@@ -548,7 +548,7 @@ class ReplicationCursor(_replicationCursor):
self.execute(command)
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
- timeline=0, options=None):
+ timeline=0, options=None, decode=False):
"""Start replication stream."""
command = "START_REPLICATION "
@@ -597,7 +597,7 @@ class ReplicationCursor(_replicationCursor):
command += "%s %s" % (quote_ident(k, self), _A(str(v)))
command += ")"
- self.start_replication_expert(command)
+ self.start_replication_expert(command, decode=decode)
# allows replication cursors to be used in select.select() directly
def fileno(self):
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index 30a3d39..424ed90 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -1543,7 +1543,7 @@ exit:
are never returned to the caller.
*/
PyObject *
-pq_read_replication_message(replicationCursorObject *repl, int decode)
+pq_read_replication_message(replicationCursorObject *repl)
{
cursorObject *curs = &repl->cur;
connectionObject *conn = curs->conn;
@@ -1555,7 +1555,7 @@ pq_read_replication_message(replicationCursorObject *repl, int decode)
PyObject *str = NULL, *result = NULL;
replicationMessageObject *msg = NULL;
- Dprintf("pq_read_replication_message(decode=%d)", decode);
+ Dprintf("pq_read_replication_message");
consumed = 0;
retry:
@@ -1629,8 +1629,7 @@ retry:
Dprintf("pq_read_replication_message: >>%.*s<<", data_size, buffer + hdr);
- /* XXX it would be wise to check if it's really a logical replication */
- if (decode) {
+ if (repl->decode) {
str = PyUnicode_Decode(buffer + hdr, data_size, conn->codec, NULL);
} else {
str = Bytes_FromStringAndSize(buffer + hdr, data_size);
@@ -1730,8 +1729,7 @@ pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested)
manages to send keepalive messages to the server as needed.
*/
int
-pq_copy_both(replicationCursorObject *repl, PyObject *consume, int decode,
- double keepalive_interval)
+pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_interval)
{
cursorObject *curs = &repl->cur;
connectionObject *conn = curs->conn;
@@ -1752,7 +1750,7 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, int decode,
keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6;
while (1) {
- msg = pq_read_replication_message(repl, decode);
+ msg = pq_read_replication_message(repl);
if (!msg) {
goto exit;
}
diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h
index 568f076..1348d9c 100644
--- a/psycopg/pqpath.h
+++ b/psycopg/pqpath.h
@@ -75,8 +75,8 @@ RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres,
/* replication protocol support */
HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer,
- int decode, double keepalive_interval);
-HIDDEN PyObject *pq_read_replication_message(replicationCursorObject *repl, int decode);
+ double keepalive_interval);
+HIDDEN PyObject *pq_read_replication_message(replicationCursorObject *repl);
HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested);
#endif /* !defined(PSYCOPG_PQPATH_H) */
diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h
index 1b6dbfa..07bf7b5 100644
--- a/psycopg/replication_cursor.h
+++ b/psycopg/replication_cursor.h
@@ -38,10 +38,11 @@ extern HIDDEN PyTypeObject replicationCursorType;
typedef struct replicationCursorObject {
cursorObject cur;
- int started:1; /* if replication is started */
- int consuming:1; /* if running the consume loop */
+ int started:1; /* if replication is started */
+ int consuming:1; /* if running the consume loop */
+ int decode:1; /* if we should use character decoding on the messages */
- struct timeval last_io; /* timestamp of the last exchange with the server */
+ struct timeval last_io ; /* timestamp of the last exchange with the server */
struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */
XLogRecPtr write_lsn; /* LSN stats for replication feedback messages */
diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c
index d1f7939..1fd5ea3 100644
--- a/psycopg/replication_cursor_type.c
+++ b/psycopg/replication_cursor_type.c
@@ -38,8 +38,8 @@
#include "datetime.h"
-#define psyco_repl_curs_start_replication_expert_doc \
-"start_replication_expert(command, writer=None, keepalive_interval=10) -- Start replication stream with a directly given command."
+#define psyco_repl_curs_start_replication_expert_doc \
+"start_replication_expert(command, decode=False) -- Start replication with a given command."
static PyObject *
psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
@@ -49,9 +49,10 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
connectionObject *conn = self->cur.conn;
PyObject *res = NULL;
char *command;
- static char *kwlist[] = {"command", NULL};
+ long int decode = 0;
+ static char *kwlist[] = {"command", "decode", NULL};
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s", kwlist, &command)) {
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|l", kwlist, &command, &decode)) {
return NULL;
}
@@ -60,17 +61,15 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
EXC_IF_TPC_PREPARED(conn, start_replication_expert);
EXC_IF_REPLICATING(self, start_replication_expert);
- Dprintf("psyco_repl_curs_start_replication_expert: %s", command);
-
- /* self->copysize = 0;*/
-
- gettimeofday(&self->last_io, NULL);
+ Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %d", command, decode);
if (pq_execute(curs, command, conn->async, 1 /* no_result */, 1 /* no_begin */) >= 0) {
res = Py_None;
Py_INCREF(res);
self->started = 1;
+ self->decode = decode;
+ gettimeofday(&self->last_io, NULL);
}
return res;
@@ -85,12 +84,11 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self,
{
cursorObject *curs = &self->cur;
PyObject *consume = NULL, *res = NULL;
- int decode = 0;
double keepalive_interval = 10;
- static char *kwlist[] = {"consume", "decode", "keepalive_interval", NULL};
+ static char *kwlist[] = {"consume", "keepalive_interval", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist,
- &consume, &decode, &keepalive_interval)) {
+ &consume, &keepalive_interval)) {
return NULL;
}
@@ -115,7 +113,7 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self,
self->consuming = 1;
- if (pq_copy_both(self, consume, decode, keepalive_interval) >= 0) {
+ if (pq_copy_both(self, consume, keepalive_interval) >= 0) {
res = Py_None;
Py_INCREF(res);
}
@@ -126,27 +124,19 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self,
}
#define psyco_repl_curs_read_message_doc \
-"read_message(decode=True) -- Try reading a replication message from the server (non-blocking)."
+"read_message() -- Try reading a replication message from the server (non-blocking)."
static PyObject *
-psyco_repl_curs_read_message(replicationCursorObject *self,
- PyObject *args, PyObject *kwargs)
+psyco_repl_curs_read_message(replicationCursorObject *self)
{
cursorObject *curs = &self->cur;
- int decode = 1;
- static char *kwlist[] = {"decode", NULL};
EXC_IF_CURS_CLOSED(curs);
EXC_IF_GREEN(read_message);
EXC_IF_TPC_PREPARED(self->cur.conn, read_message);
EXC_IF_NOT_REPLICATING(self, read_message);
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
- &decode)) {
- return NULL;
- }
-
- return pq_read_replication_message(self, decode);
+ return pq_read_replication_message(self);
}
static PyObject *
@@ -267,7 +257,7 @@ static struct PyMethodDef replicationCursorObject_methods[] = {
{"consume_stream", (PyCFunction)psyco_repl_curs_consume_stream,
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_consume_stream_doc},
{"read_message", (PyCFunction)psyco_repl_curs_read_message,
- METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_read_message_doc},
+ METH_NOARGS, psyco_repl_curs_read_message_doc},
{"send_feedback", (PyCFunction)psyco_repl_curs_send_feedback,
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_send_feedback_doc},
{"flush_feedback", (PyCFunction)psyco_repl_curs_flush_feedback,
@@ -289,6 +279,7 @@ replicationCursor_setup(replicationCursorObject* self)
{
self->started = 0;
self->consuming = 0;
+ self->decode = 0;
self->write_lsn = InvalidXLogRecPtr;
self->flush_lsn = InvalidXLogRecPtr;