summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--psycopg/libpq_support.c7
-rw-r--r--psycopg/libpq_support.h7
-rw-r--r--psycopg/pqpath.c8
-rw-r--r--psycopg/psycopgmodule.c1
-rw-r--r--psycopg/replication_message.h4
-rw-r--r--psycopg/replication_message_type.c49
6 files changed, 62 insertions, 14 deletions
diff --git a/psycopg/libpq_support.c b/psycopg/libpq_support.c
index 95a3ebc..160c849 100644
--- a/psycopg/libpq_support.c
+++ b/psycopg/libpq_support.c
@@ -41,13 +41,6 @@
/* support routines taken from pg_basebackup/streamutil.c */
-/* Julian-date equivalents of Day 0 in Unix and Postgres reckoning */
-#define UNIX_EPOCH_JDATE 2440588 /* == date2j(1970, 1, 1) */
-#define POSTGRES_EPOCH_JDATE 2451545 /* == date2j(2000, 1, 1) */
-
-#define SECS_PER_DAY 86400
-#define USECS_PER_SEC 1000000LL
-
/*
* Frontend version of GetCurrentTimestamp(), since we are not linked with
* backend code. The protocol always uses integer timestamps, regardless of
diff --git a/psycopg/libpq_support.h b/psycopg/libpq_support.h
index e597d24..ab35fef 100644
--- a/psycopg/libpq_support.h
+++ b/psycopg/libpq_support.h
@@ -37,6 +37,13 @@ typedef unsigned PG_INT64_TYPE XLogRecPtr;
#define XLOGFMTSTR "%x/%x"
#define XLOGFMTARGS(x) ((uint32)((x) >> 32)), ((uint32)((x) & 0xFFFFFFFF))
+/* Julian-date equivalents of Day 0 in Unix and Postgres reckoning */
+#define UNIX_EPOCH_JDATE 2440588 /* == date2j(1970, 1, 1) */
+#define POSTGRES_EPOCH_JDATE 2451545 /* == date2j(2000, 1, 1) */
+
+#define SECS_PER_DAY 86400
+#define USECS_PER_SEC 1000000LL
+
HIDDEN pg_int64 feGetCurrentTimestamp(void);
HIDDEN void fe_sendint64(pg_int64 i, char *buf);
HIDDEN pg_int64 fe_recvint64(char *buf);
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index 4ae6297..7a3ec19 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -1572,6 +1572,7 @@ _pq_copy_both_v3(cursorObject *curs)
XLogRecPtr written_lsn = InvalidXLogRecPtr,
fsync_lsn = InvalidXLogRecPtr,
data_start, wal_end;
+ pg_int64 send_time;
if (!curs->copyfile) {
PyErr_SetString(ProgrammingError,
@@ -1669,10 +1670,10 @@ _pq_copy_both_v3(cursorObject *curs)
data_start = fe_recvint64(buffer + 1);
wal_end = fe_recvint64(buffer + 1 + 8);
- /*send_time = fe_recvint64(buffer + 1 + 8 + 8);*/
+ send_time = fe_recvint64(buffer + 1 + 8 + 8);
- Dprintf("_pq_copy_both_v3: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR,
- XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end));
+ Dprintf("_pq_copy_both_v3: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR", send_time=%lld",
+ XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end), send_time);
if (is_text) {
obj = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL);
@@ -1690,6 +1691,7 @@ _pq_copy_both_v3(cursorObject *curs)
msg->data_start = data_start;
msg->wal_end = wal_end;
+ msg->send_time = send_time;
tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL);
diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c
index 67393c3..27af211 100644
--- a/psycopg/psycopgmodule.c
+++ b/psycopg/psycopgmodule.c
@@ -869,6 +869,7 @@ INIT_MODULE(_psycopg)(void)
/* Initialize the PyDateTimeAPI everywhere is used */
PyDateTime_IMPORT;
if (psyco_adapter_datetime_init()) { goto exit; }
+ if (psyco_replmsg_datetime_init()) { goto exit; }
Py_TYPE(&pydatetimeType) = &PyType_Type;
if (PyType_Ready(&pydatetimeType) == -1) goto exit;
diff --git a/psycopg/replication_message.h b/psycopg/replication_message.h
index bf2b5f1..b03d1c4 100644
--- a/psycopg/replication_message.h
+++ b/psycopg/replication_message.h
@@ -42,9 +42,11 @@ struct replicationMessageObject {
XLogRecPtr data_start;
XLogRecPtr wal_end;
- /* send_time */
+ pg_int64 send_time;
};
+RAISES_NEG int psyco_replmsg_datetime_init(void);
+
#ifdef __cplusplus
}
#endif
diff --git a/psycopg/replication_message_type.c b/psycopg/replication_message_type.c
index 6968955..5d15ca6 100644
--- a/psycopg/replication_message_type.c
+++ b/psycopg/replication_message_type.c
@@ -27,14 +27,31 @@
#include "psycopg/psycopg.h"
#include "psycopg/replication_message.h"
+#include "psycopg/libpq_support.h"
+
+#include "datetime.h"
+
+RAISES_NEG int
+psyco_replmsg_datetime_init(void)
+{
+ Dprintf("psyco_replmsg_datetime_init: datetime init");
+
+ PyDateTime_IMPORT;
+
+ if (!PyDateTimeAPI) {
+ PyErr_SetString(PyExc_ImportError, "datetime initialization failed");
+ return -1;
+ }
+ return 0;
+}
static PyObject *
replmsg_repr(replicationMessageObject *self)
{
return PyString_FromFormat(
- "<replicationMessage object at %p; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR">",
- self, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end));
+ "<replicationMessage object at %p; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR"; send_time: %lld>",
+ self, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end), self->send_time);
}
static int
@@ -65,6 +82,26 @@ replmsg_dealloc(PyObject* obj)
replmsg_clear(obj);
}
+#define psyco_replmsg_send_time_doc \
+"send_time - Timestamp of the replication message departure from the server."
+
+static PyObject *
+psyco_replmsg_get_send_time(replicationMessageObject *self)
+{
+ PyObject *tval, *res = NULL;
+ double t;
+
+ t = (double)self->send_time / USECS_PER_SEC +
+ ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+ tval = Py_BuildValue("(d)", t);
+ if (tval) {
+ res = PyDateTime_FromTimestamp(tval);
+ Py_DECREF(tval);
+ }
+
+ return res;
+}
#define OFFSETOF(x) offsetof(replicationMessageObject, x)
@@ -80,6 +117,12 @@ static struct PyMemberDef replicationMessageObject_members[] = {
{NULL}
};
+static struct PyGetSetDef replicationMessageObject_getsets[] = {
+ { "send_time", (getter)psyco_replmsg_get_send_time, NULL,
+ psyco_replmsg_send_time_doc, NULL },
+ {NULL}
+};
+
/* object type */
#define replicationMessageType_doc \
@@ -115,7 +158,7 @@ PyTypeObject replicationMessageType = {
0, /*tp_iternext*/
0, /*tp_methods*/
replicationMessageObject_members, /*tp_members*/
- 0, /*tp_getset*/
+ replicationMessageObject_getsets, /*tp_getset*/
0, /*tp_base*/
0, /*tp_dict*/
0, /*tp_descr_get*/