diff options
| author | Oleksandr Shulgin <oleksandr.shulgin@zalando.de> | 2015-10-23 11:31:55 +0200 |
|---|---|---|
| committer | Oleksandr Shulgin <oleksandr.shulgin@zalando.de> | 2015-10-23 11:31:55 +0200 |
| commit | e69dafbeccf4a1ff096759bd531fd771955592da (patch) | |
| tree | 498e6e9ec8103ea89dde52c0c93c6f077494c604 | |
| parent | 76c7f4a0b5f3ff69499239917fb0aec8b0da6adf (diff) | |
| download | psycopg2-e69dafbeccf4a1ff096759bd531fd771955592da.tar.gz | |
Move the `decode` parameter to `start_replication()`.
It makes more sense this way, because otherwise it must be passed to every call
of `read_message()`.
| -rw-r--r-- | doc/src/extras.rst | 75 | ||||
| -rw-r--r-- | lib/extras.py | 4 | ||||
| -rw-r--r-- | psycopg/pqpath.c | 12 | ||||
| -rw-r--r-- | psycopg/pqpath.h | 4 | ||||
| -rw-r--r-- | psycopg/replication_cursor.h | 7 | ||||
| -rw-r--r-- | psycopg/replication_cursor_type.c | 41 |
6 files changed, 72 insertions, 71 deletions
diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 2a7bed2..7df68a7 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -185,10 +185,10 @@ replication:: connection_factory=psycopg2.extras.LogicalReplicationConnection) cur = conn.cursor() try: - cur.start_replication(slot_name='pytest') + cur.start_replication(slot_name='pytest', decode=True) # test_decoding produces textual output except psycopg2.ProgrammingError: cur.create_replication_slot('pytest', output_plugin='test_decoding') - cur.start_replication(slot_name='pytest') + cur.start_replication(slot_name='pytest', decode=True) class DemoConsumer(object): def __call__(self, msg): @@ -260,9 +260,12 @@ The individual messages in the replication stream are represented by .. attribute:: payload - The actual data received from the server. An instance of either - ``str`` or ``unicode``, depending on the method that was used to - produce this message. + The actual data received from the server. + + An instance of either `bytes()` or `unicode()`, depending on the value + of `decode` option passed to `ReplicationCursor.start_replication()` + on the connection. See `ReplicationCursor.read_message()` for + details. .. attribute:: data_size @@ -336,7 +339,7 @@ The individual messages in the replication stream are represented by Replication slots are a feature of PostgreSQL server starting with version 9.4. - .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None) + .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None, decode=False) Start replication on the connection. @@ -352,6 +355,8 @@ The individual messages in the replication stream are represented by can only be used with physical replication) :param options: a dictionary of options to pass to logical replication slot (not allowed with physical replication) + :param decode: a flag indicating that unicode conversion should be + performed on messages received from the server If a *slot_name* is specified, the slot must exist on the server and its type must match the replication type used. @@ -387,6 +392,11 @@ The individual messages in the replication stream are represented by on the output plugin that was used to create the slot. Must be `!None` for physical replication. + If *decode* is set to `!True` the messages received from the server + would be converted according to the connection `~connection.encoding`. + *This parameter should not be set with physical replication or with + logical replication plugins that produce binary output.* + This function constructs a ``START_REPLICATION`` command and calls `start_replication_expert()` internally. @@ -395,43 +405,40 @@ The individual messages in the replication stream are represented by `read_message()` in case of :ref:`asynchronous connection <async-support>`. - .. method:: start_replication_expert(command) + .. method:: start_replication_expert(command, decode=False) - Start replication on the connection using provided ``START_REPLICATION`` - command. + Start replication on the connection using provided + ``START_REPLICATION`` command. See `start_replication()` for + description of *decode* parameter. - .. method:: consume_stream(consume, decode=False, keepalive_interval=10) + .. method:: consume_stream(consume, keepalive_interval=10) :param consume: a callable object with signature ``consume(msg)`` - :param decode: a flag indicating that unicode conversion should be - performed on the messages received from the server :param keepalive_interval: interval (in seconds) to send keepalive messages to the server This method can only be used with synchronous connection. For asynchronous connections see `read_message()`. - Before calling this method to consume the stream use + Before using this method to consume the stream call `start_replication()` first. This method enters an endless loop reading messages from the server - and passing them to ``consume()``, then waiting for more messages from - the server. In order to make this method break out of the loop and - return, ``consume()`` can throw a `StopReplication` exception. Any - unhandled exception will make it break out of the loop as well. + and passing them to ``consume()`` one at a time, then waiting for more + messages from the server. In order to make this method break out of + the loop and return, ``consume()`` can throw a `StopReplication` + exception. Any unhandled exception will make it break out of the loop + as well. - If *decode* is set to `!True` the messages read from the server are - converted according to the connection `~connection.encoding`. This - parameter should not be set with physical replication. + The *msg* object passed to ``consume()`` is an instance of + `ReplicationMessage` class. See `read_message()` for details about + message decoding. This method also sends keepalive messages to the server in case there were no new data from the server for the duration of *keepalive_interval* (in seconds). The value of this parameter must be set to at least 1 second, but it can have a fractional part. - The *msg* objects passed to ``consume()`` are instances of - `ReplicationMessage` class. - After processing certain amount of messages the client should send a confirmation message to the server. This should be done by calling `send_feedback()` method on the corresponding replication cursor. A @@ -452,7 +459,7 @@ The individual messages in the replication stream are represented by msg.cursor.send_feedback(flush_lsn=msg.data_start) consumer = LogicalStreamConsumer() - cur.consume_stream(consumer, decode=True) + cur.consume_stream(consumer) .. warning:: @@ -510,17 +517,21 @@ The individual messages in the replication stream are represented by for better control, in particular to `~select` on multiple sockets. The following methods are provided for asynchronous operation: - .. method:: read_message(decode=True) + .. method:: read_message() - :param decode: a flag indicating that unicode conversion should be - performed on the data received from the server + Try to read the next message from the server without blocking and + return an instance of `ReplicationMessage` or `!None`, in case there + are no more data messages from the server at the moment. This method should be used in a loop with asynchronous connections - after calling `start_replication()` once. - - It tries to read the next message from the server without blocking and - returns an instance of `ReplicationMessage` or `!None`, in case there - are no more data messages from the server at the moment. + (after calling `start_replication()` once). For synchronous + connections see `consume_stream()`. + + The returned message's `ReplicationMessage.payload` is an instance of + `unicode()` decoded according to connection `connection.encoding` + *iff* `decode` was set to `!True` in the initial call to + `start_replication()` on this connection, otherwise it is an instance + of `bytes()` with no decoding. It is expected that the calling code will call this method repeatedly in order to consume all of the messages that might have been buffered diff --git a/lib/extras.py b/lib/extras.py index 8e1373c..8a8d34f 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -548,7 +548,7 @@ class ReplicationCursor(_replicationCursor): self.execute(command) def start_replication(self, slot_name=None, slot_type=None, start_lsn=0, - timeline=0, options=None): + timeline=0, options=None, decode=False): """Start replication stream.""" command = "START_REPLICATION " @@ -597,7 +597,7 @@ class ReplicationCursor(_replicationCursor): command += "%s %s" % (quote_ident(k, self), _A(str(v))) command += ")" - self.start_replication_expert(command) + self.start_replication_expert(command, decode=decode) # allows replication cursors to be used in select.select() directly def fileno(self): diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 30a3d39..424ed90 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1543,7 +1543,7 @@ exit: are never returned to the caller. */ PyObject * -pq_read_replication_message(replicationCursorObject *repl, int decode) +pq_read_replication_message(replicationCursorObject *repl) { cursorObject *curs = &repl->cur; connectionObject *conn = curs->conn; @@ -1555,7 +1555,7 @@ pq_read_replication_message(replicationCursorObject *repl, int decode) PyObject *str = NULL, *result = NULL; replicationMessageObject *msg = NULL; - Dprintf("pq_read_replication_message(decode=%d)", decode); + Dprintf("pq_read_replication_message"); consumed = 0; retry: @@ -1629,8 +1629,7 @@ retry: Dprintf("pq_read_replication_message: >>%.*s<<", data_size, buffer + hdr); - /* XXX it would be wise to check if it's really a logical replication */ - if (decode) { + if (repl->decode) { str = PyUnicode_Decode(buffer + hdr, data_size, conn->codec, NULL); } else { str = Bytes_FromStringAndSize(buffer + hdr, data_size); @@ -1730,8 +1729,7 @@ pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested) manages to send keepalive messages to the server as needed. */ int -pq_copy_both(replicationCursorObject *repl, PyObject *consume, int decode, - double keepalive_interval) +pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_interval) { cursorObject *curs = &repl->cur; connectionObject *conn = curs->conn; @@ -1752,7 +1750,7 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, int decode, keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6; while (1) { - msg = pq_read_replication_message(repl, decode); + msg = pq_read_replication_message(repl); if (!msg) { goto exit; } diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index 568f076..1348d9c 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -75,8 +75,8 @@ RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres, /* replication protocol support */ HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer, - int decode, double keepalive_interval); -HIDDEN PyObject *pq_read_replication_message(replicationCursorObject *repl, int decode); + double keepalive_interval); +HIDDEN PyObject *pq_read_replication_message(replicationCursorObject *repl); HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested); #endif /* !defined(PSYCOPG_PQPATH_H) */ diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h index 1b6dbfa..07bf7b5 100644 --- a/psycopg/replication_cursor.h +++ b/psycopg/replication_cursor.h @@ -38,10 +38,11 @@ extern HIDDEN PyTypeObject replicationCursorType; typedef struct replicationCursorObject { cursorObject cur; - int started:1; /* if replication is started */ - int consuming:1; /* if running the consume loop */ + int started:1; /* if replication is started */ + int consuming:1; /* if running the consume loop */ + int decode:1; /* if we should use character decoding on the messages */ - struct timeval last_io; /* timestamp of the last exchange with the server */ + struct timeval last_io ; /* timestamp of the last exchange with the server */ struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */ XLogRecPtr write_lsn; /* LSN stats for replication feedback messages */ diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c index d1f7939..1fd5ea3 100644 --- a/psycopg/replication_cursor_type.c +++ b/psycopg/replication_cursor_type.c @@ -38,8 +38,8 @@ #include "datetime.h" -#define psyco_repl_curs_start_replication_expert_doc \ -"start_replication_expert(command, writer=None, keepalive_interval=10) -- Start replication stream with a directly given command." +#define psyco_repl_curs_start_replication_expert_doc \ +"start_replication_expert(command, decode=False) -- Start replication with a given command." static PyObject * psyco_repl_curs_start_replication_expert(replicationCursorObject *self, @@ -49,9 +49,10 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self, connectionObject *conn = self->cur.conn; PyObject *res = NULL; char *command; - static char *kwlist[] = {"command", NULL}; + long int decode = 0; + static char *kwlist[] = {"command", "decode", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s", kwlist, &command)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|l", kwlist, &command, &decode)) { return NULL; } @@ -60,17 +61,15 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self, EXC_IF_TPC_PREPARED(conn, start_replication_expert); EXC_IF_REPLICATING(self, start_replication_expert); - Dprintf("psyco_repl_curs_start_replication_expert: %s", command); - - /* self->copysize = 0;*/ - - gettimeofday(&self->last_io, NULL); + Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %d", command, decode); if (pq_execute(curs, command, conn->async, 1 /* no_result */, 1 /* no_begin */) >= 0) { res = Py_None; Py_INCREF(res); self->started = 1; + self->decode = decode; + gettimeofday(&self->last_io, NULL); } return res; @@ -85,12 +84,11 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self, { cursorObject *curs = &self->cur; PyObject *consume = NULL, *res = NULL; - int decode = 0; double keepalive_interval = 10; - static char *kwlist[] = {"consume", "decode", "keepalive_interval", NULL}; + static char *kwlist[] = {"consume", "keepalive_interval", NULL}; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist, - &consume, &decode, &keepalive_interval)) { + &consume, &keepalive_interval)) { return NULL; } @@ -115,7 +113,7 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self, self->consuming = 1; - if (pq_copy_both(self, consume, decode, keepalive_interval) >= 0) { + if (pq_copy_both(self, consume, keepalive_interval) >= 0) { res = Py_None; Py_INCREF(res); } @@ -126,27 +124,19 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self, } #define psyco_repl_curs_read_message_doc \ -"read_message(decode=True) -- Try reading a replication message from the server (non-blocking)." +"read_message() -- Try reading a replication message from the server (non-blocking)." static PyObject * -psyco_repl_curs_read_message(replicationCursorObject *self, - PyObject *args, PyObject *kwargs) +psyco_repl_curs_read_message(replicationCursorObject *self) { cursorObject *curs = &self->cur; - int decode = 1; - static char *kwlist[] = {"decode", NULL}; EXC_IF_CURS_CLOSED(curs); EXC_IF_GREEN(read_message); EXC_IF_TPC_PREPARED(self->cur.conn, read_message); EXC_IF_NOT_REPLICATING(self, read_message); - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, - &decode)) { - return NULL; - } - - return pq_read_replication_message(self, decode); + return pq_read_replication_message(self); } static PyObject * @@ -267,7 +257,7 @@ static struct PyMethodDef replicationCursorObject_methods[] = { {"consume_stream", (PyCFunction)psyco_repl_curs_consume_stream, METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_consume_stream_doc}, {"read_message", (PyCFunction)psyco_repl_curs_read_message, - METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_read_message_doc}, + METH_NOARGS, psyco_repl_curs_read_message_doc}, {"send_feedback", (PyCFunction)psyco_repl_curs_send_feedback, METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_send_feedback_doc}, {"flush_feedback", (PyCFunction)psyco_repl_curs_flush_feedback, @@ -289,6 +279,7 @@ replicationCursor_setup(replicationCursorObject* self) { self->started = 0; self->consuming = 0; + self->decode = 0; self->write_lsn = InvalidXLogRecPtr; self->flush_lsn = InvalidXLogRecPtr; |
