diff options
-rw-r--r-- | psycopg/libpq_support.c | 7 | ||||
-rw-r--r-- | psycopg/libpq_support.h | 7 | ||||
-rw-r--r-- | psycopg/pqpath.c | 8 | ||||
-rw-r--r-- | psycopg/psycopgmodule.c | 1 | ||||
-rw-r--r-- | psycopg/replication_message.h | 4 | ||||
-rw-r--r-- | psycopg/replication_message_type.c | 49 |
6 files changed, 62 insertions, 14 deletions
diff --git a/psycopg/libpq_support.c b/psycopg/libpq_support.c index 95a3ebc..160c849 100644 --- a/psycopg/libpq_support.c +++ b/psycopg/libpq_support.c @@ -41,13 +41,6 @@ /* support routines taken from pg_basebackup/streamutil.c */ -/* Julian-date equivalents of Day 0 in Unix and Postgres reckoning */ -#define UNIX_EPOCH_JDATE 2440588 /* == date2j(1970, 1, 1) */ -#define POSTGRES_EPOCH_JDATE 2451545 /* == date2j(2000, 1, 1) */ - -#define SECS_PER_DAY 86400 -#define USECS_PER_SEC 1000000LL - /* * Frontend version of GetCurrentTimestamp(), since we are not linked with * backend code. The protocol always uses integer timestamps, regardless of diff --git a/psycopg/libpq_support.h b/psycopg/libpq_support.h index e597d24..ab35fef 100644 --- a/psycopg/libpq_support.h +++ b/psycopg/libpq_support.h @@ -37,6 +37,13 @@ typedef unsigned PG_INT64_TYPE XLogRecPtr; #define XLOGFMTSTR "%x/%x" #define XLOGFMTARGS(x) ((uint32)((x) >> 32)), ((uint32)((x) & 0xFFFFFFFF)) +/* Julian-date equivalents of Day 0 in Unix and Postgres reckoning */ +#define UNIX_EPOCH_JDATE 2440588 /* == date2j(1970, 1, 1) */ +#define POSTGRES_EPOCH_JDATE 2451545 /* == date2j(2000, 1, 1) */ + +#define SECS_PER_DAY 86400 +#define USECS_PER_SEC 1000000LL + HIDDEN pg_int64 feGetCurrentTimestamp(void); HIDDEN void fe_sendint64(pg_int64 i, char *buf); HIDDEN pg_int64 fe_recvint64(char *buf); diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 4ae6297..7a3ec19 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1572,6 +1572,7 @@ _pq_copy_both_v3(cursorObject *curs) XLogRecPtr written_lsn = InvalidXLogRecPtr, fsync_lsn = InvalidXLogRecPtr, data_start, wal_end; + pg_int64 send_time; if (!curs->copyfile) { PyErr_SetString(ProgrammingError, @@ -1669,10 +1670,10 @@ _pq_copy_both_v3(cursorObject *curs) data_start = fe_recvint64(buffer + 1); wal_end = fe_recvint64(buffer + 1 + 8); - /*send_time = fe_recvint64(buffer + 1 + 8 + 8);*/ + send_time = fe_recvint64(buffer + 1 + 8 + 8); - Dprintf("_pq_copy_both_v3: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR, - XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end)); + Dprintf("_pq_copy_both_v3: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR", send_time=%lld", + XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end), send_time); if (is_text) { obj = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL); @@ -1690,6 +1691,7 @@ _pq_copy_both_v3(cursorObject *curs) msg->data_start = data_start; msg->wal_end = wal_end; + msg->send_time = send_time; tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL); diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c index 67393c3..27af211 100644 --- a/psycopg/psycopgmodule.c +++ b/psycopg/psycopgmodule.c @@ -869,6 +869,7 @@ INIT_MODULE(_psycopg)(void) /* Initialize the PyDateTimeAPI everywhere is used */ PyDateTime_IMPORT; if (psyco_adapter_datetime_init()) { goto exit; } + if (psyco_replmsg_datetime_init()) { goto exit; } Py_TYPE(&pydatetimeType) = &PyType_Type; if (PyType_Ready(&pydatetimeType) == -1) goto exit; diff --git a/psycopg/replication_message.h b/psycopg/replication_message.h index bf2b5f1..b03d1c4 100644 --- a/psycopg/replication_message.h +++ b/psycopg/replication_message.h @@ -42,9 +42,11 @@ struct replicationMessageObject { XLogRecPtr data_start; XLogRecPtr wal_end; - /* send_time */ + pg_int64 send_time; }; +RAISES_NEG int psyco_replmsg_datetime_init(void); + #ifdef __cplusplus } #endif diff --git a/psycopg/replication_message_type.c b/psycopg/replication_message_type.c index 6968955..5d15ca6 100644 --- a/psycopg/replication_message_type.c +++ b/psycopg/replication_message_type.c @@ -27,14 +27,31 @@ #include "psycopg/psycopg.h" #include "psycopg/replication_message.h" +#include "psycopg/libpq_support.h" + +#include "datetime.h" + +RAISES_NEG int +psyco_replmsg_datetime_init(void) +{ + Dprintf("psyco_replmsg_datetime_init: datetime init"); + + PyDateTime_IMPORT; + + if (!PyDateTimeAPI) { + PyErr_SetString(PyExc_ImportError, "datetime initialization failed"); + return -1; + } + return 0; +} static PyObject * replmsg_repr(replicationMessageObject *self) { return PyString_FromFormat( - "<replicationMessage object at %p; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR">", - self, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end)); + "<replicationMessage object at %p; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR"; send_time: %lld>", + self, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end), self->send_time); } static int @@ -65,6 +82,26 @@ replmsg_dealloc(PyObject* obj) replmsg_clear(obj); } +#define psyco_replmsg_send_time_doc \ +"send_time - Timestamp of the replication message departure from the server." + +static PyObject * +psyco_replmsg_get_send_time(replicationMessageObject *self) +{ + PyObject *tval, *res = NULL; + double t; + + t = (double)self->send_time / USECS_PER_SEC + + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); + + tval = Py_BuildValue("(d)", t); + if (tval) { + res = PyDateTime_FromTimestamp(tval); + Py_DECREF(tval); + } + + return res; +} #define OFFSETOF(x) offsetof(replicationMessageObject, x) @@ -80,6 +117,12 @@ static struct PyMemberDef replicationMessageObject_members[] = { {NULL} }; +static struct PyGetSetDef replicationMessageObject_getsets[] = { + { "send_time", (getter)psyco_replmsg_get_send_time, NULL, + psyco_replmsg_send_time_doc, NULL }, + {NULL} +}; + /* object type */ #define replicationMessageType_doc \ @@ -115,7 +158,7 @@ PyTypeObject replicationMessageType = { 0, /*tp_iternext*/ 0, /*tp_methods*/ replicationMessageObject_members, /*tp_members*/ - 0, /*tp_getset*/ + replicationMessageObject_getsets, /*tp_getset*/ 0, /*tp_base*/ 0, /*tp_dict*/ 0, /*tp_descr_get*/ |