summaryrefslogtreecommitdiff
path: root/src/interfaces/libpq/fe-exec.c
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2021-03-15 18:13:42 -0300
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2021-03-15 18:13:42 -0300
commitacb7e4eb6b1c614c68a62fb3a6a5bba1af0a2659 (patch)
treeff5dccb6a8372d0373a442841d8df4333a234eaa /src/interfaces/libpq/fe-exec.c
parent146cb3889c3ccb3fce198fe7464a1296a9e107c3 (diff)
downloadpostgresql-acb7e4eb6b1c614c68a62fb3a6a5bba1af0a2659.tar.gz
Implement pipeline mode in libpq
Pipeline mode in libpq lets an application avoid the Sync messages in the FE/BE protocol that are implicit in the old libpq API after each query. The application can then insert Sync at its leisure with a new libpq function PQpipelineSync. This can lead to substantial reductions in query latency. Co-authored-by: Craig Ringer <craig.ringer@enterprisedb.com> Co-authored-by: Matthieu Garrigues <matthieu.garrigues@gmail.com> Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Aya Iwata <iwata.aya@jp.fujitsu.com> Reviewed-by: Daniel Vérité <daniel@manitou-mail.org> Reviewed-by: David G. Johnston <david.g.johnston@gmail.com> Reviewed-by: Justin Pryzby <pryzby@telsasoft.com> Reviewed-by: Kirk Jamison <k.jamison@fujitsu.com> Reviewed-by: Michael Paquier <michael.paquier@gmail.com> Reviewed-by: Nikhil Sontakke <nikhils@2ndquadrant.com> Reviewed-by: Vaishnavi Prabakaran <VaishnaviP@fast.au.fujitsu.com> Reviewed-by: Zhihong Yu <zyu@yugabyte.com> Discussion: https://postgr.es/m/CAMsr+YFUjJytRyV4J-16bEoiZyH=4nj+sQ7JP9ajwz=B4dMMZw@mail.gmail.com Discussion: https://postgr.es/m/CAJkzx4T5E-2cQe3dtv2R78dYFvz+in8PY7A8MArvLhs_pg75gg@mail.gmail.com
Diffstat (limited to 'src/interfaces/libpq/fe-exec.c')
-rw-r--r--src/interfaces/libpq/fe-exec.c717
1 files changed, 636 insertions, 81 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 9a038043b2..f3443708a6 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char *const pgresStatus[] = {
"PGRES_NONFATAL_ERROR",
"PGRES_FATAL_ERROR",
"PGRES_COPY_BOTH",
- "PGRES_SINGLE_TUPLE"
+ "PGRES_SINGLE_TUPLE",
+ "PGRES_PIPELINE_SYNC",
+ "PGRES_PIPELINE_ABORTED"
};
/*
@@ -71,6 +73,8 @@ static PGresult *PQexecFinish(PGconn *conn);
static int PQsendDescribe(PGconn *conn, char desc_type,
const char *desc_target);
static int check_field_number(const PGresult *res, int field_num);
+static void pqPipelineProcessQueue(PGconn *conn);
+static int pqPipelineFlush(PGconn *conn);
/* ----------------
@@ -1171,7 +1175,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
conn->next_result = conn->result;
conn->result = res;
/* And mark the result ready to return */
- conn->asyncStatus = PGASYNC_READY;
+ conn->asyncStatus = PGASYNC_READY_MORE;
}
return 1;
@@ -1185,6 +1189,87 @@ fail:
/*
+ * pqAllocCmdQueueEntry
+ * Get a command queue entry for caller to fill.
+ *
+ * If the recycle queue has a free element, that is returned; if not, a
+ * fresh one is allocated. Caller is responsible for adding it to the
+ * command queue (pqAppendCmdQueueEntry) once the struct is filled in, or
+ * releasing the memory (pqRecycleCmdQueueEntry) if an error occurs.
+ *
+ * If allocation fails, sets the error message and returns NULL.
+ */
+static PGcmdQueueEntry *
+pqAllocCmdQueueEntry(PGconn *conn)
+{
+ PGcmdQueueEntry *entry;
+
+ if (conn->cmd_queue_recycle == NULL)
+ {
+ entry = (PGcmdQueueEntry *) malloc(sizeof(PGcmdQueueEntry));
+ if (entry == NULL)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("out of memory\n"));
+ return NULL;
+ }
+ }
+ else
+ {
+ entry = conn->cmd_queue_recycle;
+ conn->cmd_queue_recycle = entry->next;
+ }
+ entry->next = NULL;
+ entry->query = NULL;
+
+ return entry;
+}
+
+/*
+ * pqAppendCmdQueueEntry
+ * Append a caller-allocated command queue entry to the queue.
+ *
+ * The query itself must already have been put in the output buffer by the
+ * caller.
+ */
+static void
+pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
+{
+ Assert(entry->next == NULL);
+
+ if (conn->cmd_queue_head == NULL)
+ conn->cmd_queue_head = entry;
+ else
+ conn->cmd_queue_tail->next = entry;
+
+ conn->cmd_queue_tail = entry;
+}
+
+/*
+ * pqRecycleCmdQueueEntry
+ * Push a command queue entry onto the freelist.
+ */
+static void
+pqRecycleCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
+{
+ if (entry == NULL)
+ return;
+
+ /* recyclable entries should not have a follow-on command */
+ Assert(entry->next == NULL);
+
+ if (entry->query)
+ {
+ free(entry->query);
+ entry->query = NULL;
+ }
+
+ entry->next = conn->cmd_queue_recycle;
+ conn->cmd_queue_recycle = entry;
+}
+
+
+/*
* PQsendQuery
* Submit a query, but don't wait for it to finish
*
@@ -1209,9 +1294,15 @@ PQsendQueryContinue(PGconn *conn, const char *query)
static int
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
{
+ PGcmdQueueEntry *entry = NULL;
+
if (!PQsendQueryStart(conn, newQuery))
return 0;
+ entry = pqAllocCmdQueueEntry(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
+
/* check the argument */
if (!query)
{
@@ -1220,37 +1311,75 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
return 0;
}
- /* construct the outgoing Query message */
- if (pqPutMsgStart('Q', conn) < 0 ||
- pqPuts(query, conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
+ /* Send the query message(s) */
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
- /* error message should be set up already */
- return 0;
- }
+ /* construct the outgoing Query message */
+ if (pqPutMsgStart('Q', conn) < 0 ||
+ pqPuts(query, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ {
+ /* error message should be set up already */
+ return 0;
+ }
- /* remember we are using simple query protocol */
- conn->queryclass = PGQUERY_SIMPLE;
+ /* remember we are using simple query protocol */
+ entry->queryclass = PGQUERY_SIMPLE;
+ /* and remember the query text too, if possible */
+ entry->query = strdup(query);
+ }
+ else
+ {
+ /*
+ * In pipeline mode we cannot use the simple protocol, so we send
+ * Parse, Bind, Describe Portal, Execute.
+ */
+ if (pqPutMsgStart('P', conn) < 0 ||
+ pqPuts("", conn) < 0 ||
+ pqPuts(query, conn) < 0 ||
+ pqPutInt(0, 2, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ if (pqPutMsgStart('B', conn) < 0 ||
+ pqPuts("", conn) < 0 ||
+ pqPuts("", conn) < 0 ||
+ pqPutInt(0, 2, conn) < 0 ||
+ pqPutInt(0, 2, conn) < 0 ||
+ pqPutInt(0, 2, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ if (pqPutMsgStart('D', conn) < 0 ||
+ pqPutc('P', conn) < 0 ||
+ pqPuts("", conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ if (pqPutMsgStart('E', conn) < 0 ||
+ pqPuts("", conn) < 0 ||
+ pqPutInt(0, 4, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
- /* and remember the query text too, if possible */
- /* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
- conn->last_query = strdup(query);
+ entry->queryclass = PGQUERY_EXTENDED;
+ entry->query = strdup(query);
+ }
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
*/
- if (pqFlush(conn) < 0)
- {
- /* error message should be set up already */
- return 0;
- }
+ if (pqPipelineFlush(conn) < 0)
+ goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ pqAppendCmdQueueEntry(conn, entry);
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
+
+sendFailed:
+ pqRecycleCmdQueueEntry(conn, entry);
+ /* error message should be set up already */
+ return 0;
}
/*
@@ -1307,6 +1436,8 @@ PQsendPrepare(PGconn *conn,
const char *stmtName, const char *query,
int nParams, const Oid *paramTypes)
{
+ PGcmdQueueEntry *entry = NULL;
+
if (!PQsendQueryStart(conn, true))
return 0;
@@ -1330,6 +1461,10 @@ PQsendPrepare(PGconn *conn,
return 0;
}
+ entry = pqAllocCmdQueueEntry(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
+
/* construct the Parse message */
if (pqPutMsgStart('P', conn) < 0 ||
pqPuts(stmtName, conn) < 0 ||
@@ -1356,32 +1491,38 @@ PQsendPrepare(PGconn *conn,
if (pqPutMsgEnd(conn) < 0)
goto sendFailed;
- /* construct the Sync message */
- if (pqPutMsgStart('S', conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
- goto sendFailed;
+ /* Add a Sync, unless in pipeline mode. */
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ {
+ if (pqPutMsgStart('S', conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ }
/* remember we are doing just a Parse */
- conn->queryclass = PGQUERY_PREPARE;
+ entry->queryclass = PGQUERY_PREPARE;
/* and remember the query text too, if possible */
- /* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
- conn->last_query = strdup(query);
+ /* if insufficient memory, query just winds up NULL */
+ entry->query = strdup(query);
+
+ pqAppendCmdQueueEntry(conn, entry);
+
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ conn->asyncStatus = PGASYNC_BUSY;
/*
- * Give the data a push. In nonblock mode, don't complain if we're unable
- * to send it all; PQgetResult() will do any additional flushing needed.
+ * Give the data a push (in pipeline mode, only if we're past the size
+ * threshold). In nonblock mode, don't complain if we're unable to send
+ * it all; PQgetResult() will do any additional flushing needed.
*/
- if (pqFlush(conn) < 0)
+ if (pqPipelineFlush(conn) < 0)
goto sendFailed;
- /* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
+ pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
@@ -1429,7 +1570,8 @@ PQsendQueryPrepared(PGconn *conn,
}
/*
- * Common startup code for PQsendQuery and sibling routines
+ * PQsendQueryStart
+ * Common startup code for PQsendQuery and sibling routines
*/
static bool
PQsendQueryStart(PGconn *conn, bool newQuery)
@@ -1450,20 +1592,57 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
libpq_gettext("no connection to the server\n"));
return false;
}
- /* Can't send while already busy, either. */
- if (conn->asyncStatus != PGASYNC_IDLE)
+
+ /* Can't send while already busy, either, unless enqueuing for later */
+ if (conn->asyncStatus != PGASYNC_IDLE &&
+ conn->pipelineStatus == PQ_PIPELINE_OFF)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("another command is already in progress\n"));
return false;
}
- /* initialize async result-accumulation state */
- pqClearAsyncResult(conn);
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ /*
+ * When enqueuing commands we don't change much of the connection
+ * state since it's already in use for the current command. The
+ * connection state will get updated when pqPipelineProcessQueue()
+ * advances to start processing the queued message.
+ *
+ * Just make sure we can safely enqueue given the current connection
+ * state. We can enqueue behind another queue item, or behind a
+ * non-queue command (one that sends its own sync), but we can't
+ * enqueue if the connection is in a copy state.
+ */
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_IDLE:
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ /* ok to queue */
+ break;
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot queue commands during COPY\n"));
+ return false;
+ }
+ }
+ else
+ {
+ /*
+ * This command's results will come in immediately. Initialize async
+ * result-accumulation state
+ */
+ pqClearAsyncResult(conn);
- /* reset single-row processing mode */
- conn->singleRowMode = false;
+ /* reset single-row processing mode */
+ conn->singleRowMode = false;
+ }
/* ready to send command message */
return true;
}
@@ -1487,10 +1666,16 @@ PQsendQueryGuts(PGconn *conn,
int resultFormat)
{
int i;
+ PGcmdQueueEntry *entry;
+
+ entry = pqAllocCmdQueueEntry(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
/*
- * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
- * using specified statement name and the unnamed portal.
+ * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync
+ * (if not in pipeline mode), using specified statement name and the
+ * unnamed portal.
*/
if (command)
@@ -1600,35 +1785,38 @@ PQsendQueryGuts(PGconn *conn,
pqPutMsgEnd(conn) < 0)
goto sendFailed;
- /* construct the Sync message */
- if (pqPutMsgStart('S', conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
- goto sendFailed;
+ /* construct the Sync message if not in pipeline mode */
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ {
+ if (pqPutMsgStart('S', conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ }
/* remember we are using extended query protocol */
- conn->queryclass = PGQUERY_EXTENDED;
+ entry->queryclass = PGQUERY_EXTENDED;
/* and remember the query text too, if possible */
- /* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
+ /* if insufficient memory, query just winds up NULL */
if (command)
- conn->last_query = strdup(command);
- else
- conn->last_query = NULL;
+ entry->query = strdup(command);
/*
- * Give the data a push. In nonblock mode, don't complain if we're unable
- * to send it all; PQgetResult() will do any additional flushing needed.
+ * Give the data a push (in pipeline mode, only if we're past the size
+ * threshold). In nonblock mode, don't complain if we're unable to send
+ * it all; PQgetResult() will do any additional flushing needed.
*/
- if (pqFlush(conn) < 0)
+ if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ pqAppendCmdQueueEntry(conn, entry);
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
+ pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
@@ -1647,8 +1835,9 @@ PQsetSingleRowMode(PGconn *conn)
return 0;
if (conn->asyncStatus != PGASYNC_BUSY)
return 0;
- if (conn->queryclass != PGQUERY_SIMPLE &&
- conn->queryclass != PGQUERY_EXTENDED)
+ if (!conn->cmd_queue_head ||
+ (conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
+ conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
return 0;
if (conn->result)
return 0;
@@ -1726,14 +1915,17 @@ PQisBusy(PGconn *conn)
return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed;
}
-
/*
* PQgetResult
* Get the next PGresult produced by a query. Returns NULL if no
* query work remains or an error has occurred (e.g. out of
* memory).
+ *
+ * In pipeline mode, once all the result of a query have been returned,
+ * PQgetResult returns NULL to let the user know that the next
+ * query is being processed. At the end of the pipeline, returns a
+ * result with PQresultStatus(result) == PGRES_PIPELINE_SYNC.
*/
-
PGresult *
PQgetResult(PGconn *conn)
{
@@ -1803,8 +1995,62 @@ PQgetResult(PGconn *conn)
{
case PGASYNC_IDLE:
res = NULL; /* query is complete */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ /*
+ * We're about to return the NULL that terminates the round of
+ * results from the current query; prepare to send the results
+ * of the next query when we're called next. Also, since this
+ * is the start of the results of the next query, clear any
+ * prior error message.
+ */
+ resetPQExpBuffer(&conn->errorMessage);
+ pqPipelineProcessQueue(conn);
+ }
break;
case PGASYNC_READY:
+
+ /*
+ * For any query type other than simple query protocol, we advance
+ * the command queue here. This is because for simple query
+ * protocol we can get the READY state multiple times before the
+ * command is actually complete, since the command string can
+ * contain many queries. In simple query protocol, the queue
+ * advance is done by fe-protocol3 when it receives ReadyForQuery.
+ */
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
+ pqCommandQueueAdvance(conn);
+ res = pqPrepareAsyncResult(conn);
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ /*
+ * We're about to send the results of the current query. Set
+ * us idle now, and ...
+ */
+ conn->asyncStatus = PGASYNC_IDLE;
+
+ /*
+ * ... in cases when we're sending a pipeline-sync result,
+ * move queue processing forwards immediately, so that next
+ * time we're called, we're prepared to return the next result
+ * received from the server. In all other cases, leave the
+ * queue state change for next time, so that a terminating
+ * NULL result is sent.
+ *
+ * (In other words: we don't return a NULL after a pipeline
+ * sync.)
+ */
+ if (res && res->resultStatus == PGRES_PIPELINE_SYNC)
+ pqPipelineProcessQueue(conn);
+ }
+ else
+ {
+ /* Set the state back to BUSY, allowing parsing to proceed. */
+ conn->asyncStatus = PGASYNC_BUSY;
+ }
+ break;
+ case PGASYNC_READY_MORE:
res = pqPrepareAsyncResult(conn);
/* Set the state back to BUSY, allowing parsing to proceed. */
conn->asyncStatus = PGASYNC_BUSY;
@@ -1985,6 +2231,13 @@ PQexecStart(PGconn *conn)
if (!conn)
return false;
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("synchronous command execution functions are not allowed in pipeline mode\n"));
+ return false;
+ }
+
/*
* Since this is the beginning of a query cycle, reset the error buffer.
*/
@@ -2148,6 +2401,8 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
static int
PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
{
+ PGcmdQueueEntry *entry = NULL;
+
/* Treat null desc_target as empty string */
if (!desc_target)
desc_target = "";
@@ -2155,6 +2410,10 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
if (!PQsendQueryStart(conn, true))
return 0;
+ entry = pqAllocCmdQueueEntry(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
+
/* construct the Describe message */
if (pqPutMsgStart('D', conn) < 0 ||
pqPutc(desc_type, conn) < 0 ||
@@ -2163,32 +2422,32 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
goto sendFailed;
/* construct the Sync message */
- if (pqPutMsgStart('S', conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
- goto sendFailed;
-
- /* remember we are doing a Describe */
- conn->queryclass = PGQUERY_DESCRIBE;
-
- /* reset last_query string (not relevant now) */
- if (conn->last_query)
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
- free(conn->last_query);
- conn->last_query = NULL;
+ if (pqPutMsgStart('S', conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
}
+ /* remember we are doing a Describe */
+ entry->queryclass = PGQUERY_DESCRIBE;
+
/*
- * Give the data a push. In nonblock mode, don't complain if we're unable
- * to send it all; PQgetResult() will do any additional flushing needed.
+ * Give the data a push (in pipeline mode, only if we're past the size
+ * threshold). In nonblock mode, don't complain if we're unable to send
+ * it all; PQgetResult() will do any additional flushing needed.
*/
- if (pqFlush(conn) < 0)
+ if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ pqAppendCmdQueueEntry(conn, entry);
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
+ pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
@@ -2327,7 +2586,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
* If we sent the COPY command in extended-query mode, we must issue a
* Sync as well.
*/
- if (conn->queryclass != PGQUERY_SIMPLE)
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
@@ -2541,6 +2801,13 @@ PQfn(PGconn *conn,
*/
resetPQExpBuffer(&conn->errorMessage);
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("PQfn not allowed in pipeline mode\n"));
+ return NULL;
+ }
+
if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE ||
conn->result != NULL)
{
@@ -2555,6 +2822,277 @@ PQfn(PGconn *conn,
args, nargs);
}
+/* ====== Pipeline mode support ======== */
+
+/*
+ * PQenterPipelineMode
+ * Put an idle connection in pipeline mode.
+ *
+ * Returns 1 on success. On failure, errorMessage is set and 0 is returned.
+ *
+ * Commands submitted after this can be pipelined on the connection;
+ * there's no requirement to wait for one to finish before the next is
+ * dispatched.
+ *
+ * Queuing of a new query or syncing during COPY is not allowed.
+ *
+ * A set of commands is terminated by a PQpipelineSync. Multiple sync
+ * points can be established while in pipeline mode. Pipeline mode can
+ * be exited by calling PQexitPipelineMode() once all results are processed.
+ *
+ * This doesn't actually send anything on the wire, it just puts libpq
+ * into a state where it can pipeline work.
+ */
+int
+PQenterPipelineMode(PGconn *conn)
+{
+ if (!conn)
+ return 0;
+
+ /* succeed with no action if already in pipeline mode */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ return 1;
+
+ if (conn->asyncStatus != PGASYNC_IDLE)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot enter pipeline mode, connection not idle\n"));
+ return 0;
+ }
+
+ conn->pipelineStatus = PQ_PIPELINE_ON;
+
+ return 1;
+}
+
+/*
+ * PQexitPipelineMode
+ * End pipeline mode and return to normal command mode.
+ *
+ * Returns 1 in success (pipeline mode successfully ended, or not in pipeline
+ * mode).
+ *
+ * Returns 0 if in pipeline mode and cannot be ended yet. Error message will
+ * be set.
+ */
+int
+PQexitPipelineMode(PGconn *conn)
+{
+ if (!conn)
+ return 0;
+
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ return 1;
+
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ /* there are some uncollected results */
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
+ return 0;
+
+ case PGASYNC_BUSY:
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot exit pipeline mode while busy\n"));
+ return 0;
+
+ default:
+ /* OK */
+ break;
+ }
+
+ /* still work to process */
+ if (conn->cmd_queue_head != NULL)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
+ return 0;
+ }
+
+ conn->pipelineStatus = PQ_PIPELINE_OFF;
+ conn->asyncStatus = PGASYNC_IDLE;
+
+ /* Flush any pending data in out buffer */
+ if (pqFlush(conn) < 0)
+ return 0; /* error message is setup already */
+ return 1;
+}
+
+/*
+ * pqCommandQueueAdvance
+ * Remove one query from the command queue, when we receive
+ * all results from the server that pertain to it.
+ */
+void
+pqCommandQueueAdvance(PGconn *conn)
+{
+ PGcmdQueueEntry *prevquery;
+
+ if (conn->cmd_queue_head == NULL)
+ return;
+
+ /* delink from queue */
+ prevquery = conn->cmd_queue_head;
+ conn->cmd_queue_head = conn->cmd_queue_head->next;
+
+ /* and make it recyclable */
+ prevquery->next = NULL;
+ pqRecycleCmdQueueEntry(conn, prevquery);
+}
+
+/*
+ * pqPipelineProcessQueue: subroutine for PQgetResult
+ * In pipeline mode, start processing the results of the next query in the queue.
+ */
+void
+pqPipelineProcessQueue(PGconn *conn)
+{
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ /* client still has to process current query or results */
+ return;
+ case PGASYNC_IDLE:
+ /* next query please */
+ break;
+ }
+
+ /* Nothing to do if not in pipeline mode, or queue is empty */
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
+ conn->cmd_queue_head == NULL)
+ return;
+
+ /* Initialize async result-accumulation state */
+ pqClearAsyncResult(conn);
+
+ /*
+ * Reset single-row processing mode. (Client has to set it up for each
+ * query, if desired.)
+ */
+ conn->singleRowMode = false;
+
+ if (conn->pipelineStatus == PQ_PIPELINE_ABORTED &&
+ conn->cmd_queue_head->queryclass != PGQUERY_SYNC)
+ {
+ /*
+ * In an aborted pipeline we don't get anything from the server for
+ * each result; we're just discarding commands from the queue until we
+ * get to the next sync from the server.
+ *
+ * The PGRES_PIPELINE_ABORTED results tell the client that its queries
+ * got aborted.
+ */
+ conn->result = PQmakeEmptyPGresult(conn, PGRES_PIPELINE_ABORTED);
+ if (!conn->result)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("out of memory\n"));
+ pqSaveErrorResult(conn);
+ return;
+ }
+ conn->asyncStatus = PGASYNC_READY;
+ }
+ else
+ {
+ /* allow parsing to continue */
+ conn->asyncStatus = PGASYNC_BUSY;
+ }
+}
+
+/*
+ * PQpipelineSync
+ * Send a Sync message as part of a pipeline, and flush to server
+ *
+ * It's legal to start submitting more commands in the pipeline immediately,
+ * without waiting for the results of the current pipeline. There's no need to
+ * end pipeline mode and start it again.
+ *
+ * If a command in a pipeline fails, every subsequent command up to and including
+ * the result to the Sync message sent by PQpipelineSync gets set to
+ * PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without
+ * error, a PGresult with PGRES_PIPELINE_SYNC is produced.
+ *
+ * Queries can already have been sent before PQpipelineSync is called, but
+ * PQpipelineSync need to be called before retrieving command results.
+ *
+ * The connection will remain in pipeline mode and unavailable for new
+ * synchronous command execution functions until all results from the pipeline
+ * are processed by the client.
+ */
+int
+PQpipelineSync(PGconn *conn)
+{
+ PGcmdQueueEntry *entry;
+
+ if (!conn)
+ return 0;
+
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot send pipeline when not in pipeline mode\n"));
+ return 0;
+ }
+
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ /* should be unreachable */
+ appendPQExpBufferStr(&conn->errorMessage,
+ "internal error: cannot send pipeline while in COPY\n");
+ return 0;
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ case PGASYNC_IDLE:
+ /* OK to send sync */
+ break;
+ }
+
+ entry = pqAllocCmdQueueEntry(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
+
+ entry->queryclass = PGQUERY_SYNC;
+ entry->query = NULL;
+
+ /* construct the Sync message */
+ if (pqPutMsgStart('S', conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+
+ pqAppendCmdQueueEntry(conn, entry);
+
+ /*
+ * Give the data a push. In nonblock mode, don't complain if we're unable
+ * to send it all; PQgetResult() will do any additional flushing needed.
+ */
+ if (PQflush(conn) < 0)
+ goto sendFailed;
+
+ /*
+ * Call pqPipelineProcessQueue so the user can call start calling
+ * PQgetResult.
+ */
+ pqPipelineProcessQueue(conn);
+
+ return 1;
+
+sendFailed:
+ pqRecycleCmdQueueEntry(conn, entry);
+ /* error message should be set up already */
+ return 0;
+}
+
/* ====== accessor funcs for PGresult ======== */
@@ -2569,7 +3107,7 @@ PQresultStatus(const PGresult *res)
char *
PQresStatus(ExecStatusType status)
{
- if ((unsigned int) status >= sizeof pgresStatus / sizeof pgresStatus[0])
+ if ((unsigned int) status >= lengthof(pgresStatus))
return libpq_gettext("invalid ExecStatusType code");
return pgresStatus[status];
}
@@ -3152,6 +3690,23 @@ PQflush(PGconn *conn)
return pqFlush(conn);
}
+/*
+ * pqPipelineFlush
+ *
+ * In pipeline mode, data will be flushed only when the out buffer reaches the
+ * threshold value. In non-pipeline mode, it behaves as stock pqFlush.
+ *
+ * Returns 0 on success.
+ */
+static int
+pqPipelineFlush(PGconn *conn)
+{
+ if ((conn->pipelineStatus != PQ_PIPELINE_ON) ||
+ (conn->outCount >= OUTBUFFER_THRESHOLD))
+ return pqFlush(conn);
+ return 0;
+}
+
/*
* PQfreemem - safely frees memory allocated