summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2021-03-15 18:13:42 -0300
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2021-03-15 18:13:42 -0300
commitacb7e4eb6b1c614c68a62fb3a6a5bba1af0a2659 (patch)
treeff5dccb6a8372d0373a442841d8df4333a234eaa /src
parent146cb3889c3ccb3fce198fe7464a1296a9e107c3 (diff)
downloadpostgresql-acb7e4eb6b1c614c68a62fb3a6a5bba1af0a2659.tar.gz
Implement pipeline mode in libpq
Pipeline mode in libpq lets an application avoid the Sync messages in the FE/BE protocol that are implicit in the old libpq API after each query. The application can then insert Sync at its leisure with a new libpq function PQpipelineSync. This can lead to substantial reductions in query latency. Co-authored-by: Craig Ringer <craig.ringer@enterprisedb.com> Co-authored-by: Matthieu Garrigues <matthieu.garrigues@gmail.com> Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Aya Iwata <iwata.aya@jp.fujitsu.com> Reviewed-by: Daniel Vérité <daniel@manitou-mail.org> Reviewed-by: David G. Johnston <david.g.johnston@gmail.com> Reviewed-by: Justin Pryzby <pryzby@telsasoft.com> Reviewed-by: Kirk Jamison <k.jamison@fujitsu.com> Reviewed-by: Michael Paquier <michael.paquier@gmail.com> Reviewed-by: Nikhil Sontakke <nikhils@2ndquadrant.com> Reviewed-by: Vaishnavi Prabakaran <VaishnaviP@fast.au.fujitsu.com> Reviewed-by: Zhihong Yu <zyu@yugabyte.com> Discussion: https://postgr.es/m/CAMsr+YFUjJytRyV4J-16bEoiZyH=4nj+sQ7JP9ajwz=B4dMMZw@mail.gmail.com Discussion: https://postgr.es/m/CAJkzx4T5E-2cQe3dtv2R78dYFvz+in8PY7A8MArvLhs_pg75gg@mail.gmail.com
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c6
-rw-r--r--src/bin/pg_amcheck/pg_amcheck.c2
-rw-r--r--src/interfaces/libpq/exports.txt4
-rw-r--r--src/interfaces/libpq/fe-connect.c37
-rw-r--r--src/interfaces/libpq/fe-exec.c717
-rw-r--r--src/interfaces/libpq/fe-protocol3.c77
-rw-r--r--src/interfaces/libpq/libpq-fe.h21
-rw-r--r--src/interfaces/libpq/libpq-int.h60
-rw-r--r--src/test/modules/Makefile1
-rw-r--r--src/test/modules/libpq_pipeline/.gitignore5
-rw-r--r--src/test/modules/libpq_pipeline/Makefile20
-rw-r--r--src/test/modules/libpq_pipeline/README1
-rw-r--r--src/test/modules/libpq_pipeline/libpq_pipeline.c1303
-rw-r--r--src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl28
-rw-r--r--src/tools/msvc/Mkvcbuild.pm9
-rw-r--r--src/tools/pgindent/typedefs.list2
16 files changed, 2182 insertions, 111 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 5272eed9ab..f74378110a 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -1019,6 +1019,12 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
walres->err = _("empty query");
break;
+ case PGRES_PIPELINE_SYNC:
+ case PGRES_PIPELINE_ABORTED:
+ walres->status = WALRCV_ERROR;
+ walres->err = _("unexpected pipeline mode");
+ break;
+
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
case PGRES_BAD_RESPONSE:
diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c
index 008a75d207..c9d9900693 100644
--- a/src/bin/pg_amcheck/pg_amcheck.c
+++ b/src/bin/pg_amcheck/pg_amcheck.c
@@ -929,6 +929,8 @@ should_processing_continue(PGresult *res)
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
+ case PGRES_PIPELINE_SYNC:
+ case PGRES_PIPELINE_ABORTED:
return false;
}
return true;
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index bbc1f90481..5c48c14191 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -179,3 +179,7 @@ PQgetgssctx 176
PQsetSSLKeyPassHook_OpenSSL 177
PQgetSSLKeyPassHook_OpenSSL 178
PQdefaultSSLKeyPassHook_OpenSSL 179
+PQenterPipelineMode 180
+PQexitPipelineMode 181
+PQpipelineSync 182
+PQpipelineStatus 183
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 4e21057d0f..53b354abb2 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -522,6 +522,23 @@ pqDropConnection(PGconn *conn, bool flushInput)
}
}
+/*
+ * pqFreeCommandQueue
+ * Free all the entries of PGcmdQueueEntry queue passed.
+ */
+static void
+pqFreeCommandQueue(PGcmdQueueEntry *queue)
+{
+ while (queue != NULL)
+ {
+ PGcmdQueueEntry *cur = queue;
+
+ queue = cur->next;
+ if (cur->query)
+ free(cur->query);
+ free(cur);
+ }
+}
/*
* pqDropServerData
@@ -553,6 +570,12 @@ pqDropServerData(PGconn *conn)
}
conn->notifyHead = conn->notifyTail = NULL;
+ pqFreeCommandQueue(conn->cmd_queue_head);
+ conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+
+ pqFreeCommandQueue(conn->cmd_queue_recycle);
+ conn->cmd_queue_recycle = NULL;
+
/* Reset ParameterStatus data, as well as variables deduced from it */
pstatus = conn->pstatus;
while (pstatus != NULL)
@@ -2459,6 +2482,7 @@ keep_going: /* We will come back to here until there is
/* Drop any PGresult we might have, too */
conn->asyncStatus = PGASYNC_IDLE;
conn->xactStatus = PQTRANS_IDLE;
+ conn->pipelineStatus = PQ_PIPELINE_OFF;
pqClearAsyncResult(conn);
/* Reset conn->status to put the state machine in the right state */
@@ -3917,6 +3941,7 @@ makeEmptyPGconn(void)
conn->status = CONNECTION_BAD;
conn->asyncStatus = PGASYNC_IDLE;
+ conn->pipelineStatus = PQ_PIPELINE_OFF;
conn->xactStatus = PQTRANS_IDLE;
conn->options_valid = false;
conn->nonblocking = false;
@@ -4084,8 +4109,6 @@ freePGconn(PGconn *conn)
if (conn->connip)
free(conn->connip);
/* Note that conn->Pfdebug is not ours to close or free */
- if (conn->last_query)
- free(conn->last_query);
if (conn->write_err_msg)
free(conn->write_err_msg);
if (conn->inBuffer)
@@ -4174,6 +4197,7 @@ closePGconn(PGconn *conn)
conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just absent */
conn->asyncStatus = PGASYNC_IDLE;
conn->xactStatus = PQTRANS_IDLE;
+ conn->pipelineStatus = PQ_PIPELINE_OFF;
pqClearAsyncResult(conn); /* deallocate result */
resetPQExpBuffer(&conn->errorMessage);
release_conn_addrinfo(conn);
@@ -6726,6 +6750,15 @@ PQbackendPID(const PGconn *conn)
return conn->be_pid;
}
+PGpipelineStatus
+PQpipelineStatus(const PGconn *conn)
+{
+ if (!conn)
+ return PQ_PIPELINE_OFF;
+
+ return conn->pipelineStatus;
+}
+
int
PQconnectionNeedsPassword(const PGconn *conn)
{
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 9a038043b2..f3443708a6 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char *const pgresStatus[] = {
"PGRES_NONFATAL_ERROR",
"PGRES_FATAL_ERROR",
"PGRES_COPY_BOTH",
- "PGRES_SINGLE_TUPLE"
+ "PGRES_SINGLE_TUPLE",
+ "PGRES_PIPELINE_SYNC",
+ "PGRES_PIPELINE_ABORTED"
};
/*
@@ -71,6 +73,8 @@ static PGresult *PQexecFinish(PGconn *conn);
static int PQsendDescribe(PGconn *conn, char desc_type,
const char *desc_target);
static int check_field_number(const PGresult *res, int field_num);
+static void pqPipelineProcessQueue(PGconn *conn);
+static int pqPipelineFlush(PGconn *conn);
/* ----------------
@@ -1171,7 +1175,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
conn->next_result = conn->result;
conn->result = res;
/* And mark the result ready to return */
- conn->asyncStatus = PGASYNC_READY;
+ conn->asyncStatus = PGASYNC_READY_MORE;
}
return 1;
@@ -1185,6 +1189,87 @@ fail:
/*
+ * pqAllocCmdQueueEntry
+ * Get a command queue entry for caller to fill.
+ *
+ * If the recycle queue has a free element, that is returned; if not, a
+ * fresh one is allocated. Caller is responsible for adding it to the
+ * command queue (pqAppendCmdQueueEntry) once the struct is filled in, or
+ * releasing the memory (pqRecycleCmdQueueEntry) if an error occurs.
+ *
+ * If allocation fails, sets the error message and returns NULL.
+ */
+static PGcmdQueueEntry *
+pqAllocCmdQueueEntry(PGconn *conn)
+{
+ PGcmdQueueEntry *entry;
+
+ if (conn->cmd_queue_recycle == NULL)
+ {
+ entry = (PGcmdQueueEntry *) malloc(sizeof(PGcmdQueueEntry));
+ if (entry == NULL)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("out of memory\n"));
+ return NULL;
+ }
+ }
+ else
+ {
+ entry = conn->cmd_queue_recycle;
+ conn->cmd_queue_recycle = entry->next;
+ }
+ entry->next = NULL;
+ entry->query = NULL;
+
+ return entry;
+}
+
+/*
+ * pqAppendCmdQueueEntry
+ * Append a caller-allocated command queue entry to the queue.
+ *
+ * The query itself must already have been put in the output buffer by the
+ * caller.
+ */
+static void
+pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
+{
+ Assert(entry->next == NULL);
+
+ if (conn->cmd_queue_head == NULL)
+ conn->cmd_queue_head = entry;
+ else
+ conn->cmd_queue_tail->next = entry;
+
+ conn->cmd_queue_tail = entry;
+}
+
+/*
+ * pqRecycleCmdQueueEntry
+ * Push a command queue entry onto the freelist.
+ */
+static void
+pqRecycleCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
+{
+ if (entry == NULL)
+ return;
+
+ /* recyclable entries should not have a follow-on command */
+ Assert(entry->next == NULL);
+
+ if (entry->query)
+ {
+ free(entry->query);
+ entry->query = NULL;
+ }
+
+ entry->next = conn->cmd_queue_recycle;
+ conn->cmd_queue_recycle = entry;
+}
+
+
+/*
* PQsendQuery
* Submit a query, but don't wait for it to finish
*
@@ -1209,9 +1294,15 @@ PQsendQueryContinue(PGconn *conn, const char *query)
static int
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
{
+ PGcmdQueueEntry *entry = NULL;
+
if (!PQsendQueryStart(conn, newQuery))
return 0;
+ entry = pqAllocCmdQueueEntry(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
+
/* check the argument */
if (!query)
{
@@ -1220,37 +1311,75 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
return 0;
}
- /* construct the outgoing Query message */
- if (pqPutMsgStart('Q', conn) < 0 ||
- pqPuts(query, conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
+ /* Send the query message(s) */
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
- /* error message should be set up already */
- return 0;
- }
+ /* construct the outgoing Query message */
+ if (pqPutMsgStart('Q', conn) < 0 ||
+ pqPuts(query, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ {
+ /* error message should be set up already */
+ return 0;
+ }
- /* remember we are using simple query protocol */
- conn->queryclass = PGQUERY_SIMPLE;
+ /* remember we are using simple query protocol */
+ entry->queryclass = PGQUERY_SIMPLE;
+ /* and remember the query text too, if possible */
+ entry->query = strdup(query);
+ }
+ else
+ {
+ /*
+ * In pipeline mode we cannot use the simple protocol, so we send
+ * Parse, Bind, Describe Portal, Execute.
+ */
+ if (pqPutMsgStart('P', conn) < 0 ||
+ pqPuts("", conn) < 0 ||
+ pqPuts(query, conn) < 0 ||
+ pqPutInt(0, 2, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ if (pqPutMsgStart('B', conn) < 0 ||
+ pqPuts("", conn) < 0 ||
+ pqPuts("", conn) < 0 ||
+ pqPutInt(0, 2, conn) < 0 ||
+ pqPutInt(0, 2, conn) < 0 ||
+ pqPutInt(0, 2, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ if (pqPutMsgStart('D', conn) < 0 ||
+ pqPutc('P', conn) < 0 ||
+ pqPuts("", conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ if (pqPutMsgStart('E', conn) < 0 ||
+ pqPuts("", conn) < 0 ||
+ pqPutInt(0, 4, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
- /* and remember the query text too, if possible */
- /* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
- conn->last_query = strdup(query);
+ entry->queryclass = PGQUERY_EXTENDED;
+ entry->query = strdup(query);
+ }
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
*/
- if (pqFlush(conn) < 0)
- {
- /* error message should be set up already */
- return 0;
- }
+ if (pqPipelineFlush(conn) < 0)
+ goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ pqAppendCmdQueueEntry(conn, entry);
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
+
+sendFailed:
+ pqRecycleCmdQueueEntry(conn, entry);
+ /* error message should be set up already */
+ return 0;
}
/*
@@ -1307,6 +1436,8 @@ PQsendPrepare(PGconn *conn,
const char *stmtName, const char *query,
int nParams, const Oid *paramTypes)
{
+ PGcmdQueueEntry *entry = NULL;
+
if (!PQsendQueryStart(conn, true))
return 0;
@@ -1330,6 +1461,10 @@ PQsendPrepare(PGconn *conn,
return 0;
}
+ entry = pqAllocCmdQueueEntry(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
+
/* construct the Parse message */
if (pqPutMsgStart('P', conn) < 0 ||
pqPuts(stmtName, conn) < 0 ||
@@ -1356,32 +1491,38 @@ PQsendPrepare(PGconn *conn,
if (pqPutMsgEnd(conn) < 0)
goto sendFailed;
- /* construct the Sync message */
- if (pqPutMsgStart('S', conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
- goto sendFailed;
+ /* Add a Sync, unless in pipeline mode. */
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ {
+ if (pqPutMsgStart('S', conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ }
/* remember we are doing just a Parse */
- conn->queryclass = PGQUERY_PREPARE;
+ entry->queryclass = PGQUERY_PREPARE;
/* and remember the query text too, if possible */
- /* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
- conn->last_query = strdup(query);
+ /* if insufficient memory, query just winds up NULL */
+ entry->query = strdup(query);
+
+ pqAppendCmdQueueEntry(conn, entry);
+
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ conn->asyncStatus = PGASYNC_BUSY;
/*
- * Give the data a push. In nonblock mode, don't complain if we're unable
- * to send it all; PQgetResult() will do any additional flushing needed.
+ * Give the data a push (in pipeline mode, only if we're past the size
+ * threshold). In nonblock mode, don't complain if we're unable to send
+ * it all; PQgetResult() will do any additional flushing needed.
*/
- if (pqFlush(conn) < 0)
+ if (pqPipelineFlush(conn) < 0)
goto sendFailed;
- /* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
+ pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
@@ -1429,7 +1570,8 @@ PQsendQueryPrepared(PGconn *conn,
}
/*
- * Common startup code for PQsendQuery and sibling routines
+ * PQsendQueryStart
+ * Common startup code for PQsendQuery and sibling routines
*/
static bool
PQsendQueryStart(PGconn *conn, bool newQuery)
@@ -1450,20 +1592,57 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
libpq_gettext("no connection to the server\n"));
return false;
}
- /* Can't send while already busy, either. */
- if (conn->asyncStatus != PGASYNC_IDLE)
+
+ /* Can't send while already busy, either, unless enqueuing for later */
+ if (conn->asyncStatus != PGASYNC_IDLE &&
+ conn->pipelineStatus == PQ_PIPELINE_OFF)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("another command is already in progress\n"));
return false;
}
- /* initialize async result-accumulation state */
- pqClearAsyncResult(conn);
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ /*
+ * When enqueuing commands we don't change much of the connection
+ * state since it's already in use for the current command. The
+ * connection state will get updated when pqPipelineProcessQueue()
+ * advances to start processing the queued message.
+ *
+ * Just make sure we can safely enqueue given the current connection
+ * state. We can enqueue behind another queue item, or behind a
+ * non-queue command (one that sends its own sync), but we can't
+ * enqueue if the connection is in a copy state.
+ */
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_IDLE:
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ /* ok to queue */
+ break;
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot queue commands during COPY\n"));
+ return false;
+ }
+ }
+ else
+ {
+ /*
+ * This command's results will come in immediately. Initialize async
+ * result-accumulation state
+ */
+ pqClearAsyncResult(conn);
- /* reset single-row processing mode */
- conn->singleRowMode = false;
+ /* reset single-row processing mode */
+ conn->singleRowMode = false;
+ }
/* ready to send command message */
return true;
}
@@ -1487,10 +1666,16 @@ PQsendQueryGuts(PGconn *conn,
int resultFormat)
{
int i;
+ PGcmdQueueEntry *entry;
+
+ entry = pqAllocCmdQueueEntry(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
/*
- * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
- * using specified statement name and the unnamed portal.
+ * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync
+ * (if not in pipeline mode), using specified statement name and the
+ * unnamed portal.
*/
if (command)
@@ -1600,35 +1785,38 @@ PQsendQueryGuts(PGconn *conn,
pqPutMsgEnd(conn) < 0)
goto sendFailed;
- /* construct the Sync message */
- if (pqPutMsgStart('S', conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
- goto sendFailed;
+ /* construct the Sync message if not in pipeline mode */
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ {
+ if (pqPutMsgStart('S', conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ }
/* remember we are using extended query protocol */
- conn->queryclass = PGQUERY_EXTENDED;
+ entry->queryclass = PGQUERY_EXTENDED;
/* and remember the query text too, if possible */
- /* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
+ /* if insufficient memory, query just winds up NULL */
if (command)
- conn->last_query = strdup(command);
- else
- conn->last_query = NULL;
+ entry->query = strdup(command);
/*
- * Give the data a push. In nonblock mode, don't complain if we're unable
- * to send it all; PQgetResult() will do any additional flushing needed.
+ * Give the data a push (in pipeline mode, only if we're past the size
+ * threshold). In nonblock mode, don't complain if we're unable to send
+ * it all; PQgetResult() will do any additional flushing needed.
*/
- if (pqFlush(conn) < 0)
+ if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ pqAppendCmdQueueEntry(conn, entry);
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
+ pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
@@ -1647,8 +1835,9 @@ PQsetSingleRowMode(PGconn *conn)
return 0;
if (conn->asyncStatus != PGASYNC_BUSY)
return 0;
- if (conn->queryclass != PGQUERY_SIMPLE &&
- conn->queryclass != PGQUERY_EXTENDED)
+ if (!conn->cmd_queue_head ||
+ (conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
+ conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
return 0;
if (conn->result)
return 0;
@@ -1726,14 +1915,17 @@ PQisBusy(PGconn *conn)
return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed;
}
-
/*
* PQgetResult
* Get the next PGresult produced by a query. Returns NULL if no
* query work remains or an error has occurred (e.g. out of
* memory).
+ *
+ * In pipeline mode, once all the result of a query have been returned,
+ * PQgetResult returns NULL to let the user know that the next
+ * query is being processed. At the end of the pipeline, returns a
+ * result with PQresultStatus(result) == PGRES_PIPELINE_SYNC.
*/
-
PGresult *
PQgetResult(PGconn *conn)
{
@@ -1803,8 +1995,62 @@ PQgetResult(PGconn *conn)
{
case PGASYNC_IDLE:
res = NULL; /* query is complete */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ /*
+ * We're about to return the NULL that terminates the round of
+ * results from the current query; prepare to send the results
+ * of the next query when we're called next. Also, since this
+ * is the start of the results of the next query, clear any
+ * prior error message.
+ */
+ resetPQExpBuffer(&conn->errorMessage);
+ pqPipelineProcessQueue(conn);
+ }
break;
case PGASYNC_READY:
+
+ /*
+ * For any query type other than simple query protocol, we advance
+ * the command queue here. This is because for simple query
+ * protocol we can get the READY state multiple times before the
+ * command is actually complete, since the command string can
+ * contain many queries. In simple query protocol, the queue
+ * advance is done by fe-protocol3 when it receives ReadyForQuery.
+ */
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
+ pqCommandQueueAdvance(conn);
+ res = pqPrepareAsyncResult(conn);
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ /*
+ * We're about to send the results of the current query. Set
+ * us idle now, and ...
+ */
+ conn->asyncStatus = PGASYNC_IDLE;
+
+ /*
+ * ... in cases when we're sending a pipeline-sync result,
+ * move queue processing forwards immediately, so that next
+ * time we're called, we're prepared to return the next result
+ * received from the server. In all other cases, leave the
+ * queue state change for next time, so that a terminating
+ * NULL result is sent.
+ *
+ * (In other words: we don't return a NULL after a pipeline
+ * sync.)
+ */
+ if (res && res->resultStatus == PGRES_PIPELINE_SYNC)
+ pqPipelineProcessQueue(conn);
+ }
+ else
+ {
+ /* Set the state back to BUSY, allowing parsing to proceed. */
+ conn->asyncStatus = PGASYNC_BUSY;
+ }
+ break;
+ case PGASYNC_READY_MORE:
res = pqPrepareAsyncResult(conn);
/* Set the state back to BUSY, allowing parsing to proceed. */
conn->asyncStatus = PGASYNC_BUSY;
@@ -1985,6 +2231,13 @@ PQexecStart(PGconn *conn)
if (!conn)
return false;
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("synchronous command execution functions are not allowed in pipeline mode\n"));
+ return false;
+ }
+
/*
* Since this is the beginning of a query cycle, reset the error buffer.
*/
@@ -2148,6 +2401,8 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
static int
PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
{
+ PGcmdQueueEntry *entry = NULL;
+
/* Treat null desc_target as empty string */
if (!desc_target)
desc_target = "";
@@ -2155,6 +2410,10 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
if (!PQsendQueryStart(conn, true))
return 0;
+ entry = pqAllocCmdQueueEntry(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
+
/* construct the Describe message */
if (pqPutMsgStart('D', conn) < 0 ||
pqPutc(desc_type, conn) < 0 ||
@@ -2163,32 +2422,32 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
goto sendFailed;
/* construct the Sync message */
- if (pqPutMsgStart('S', conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
- goto sendFailed;
-
- /* remember we are doing a Describe */
- conn->queryclass = PGQUERY_DESCRIBE;
-
- /* reset last_query string (not relevant now) */
- if (conn->last_query)
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
{
- free(conn->last_query);
- conn->last_query = NULL;
+ if (pqPutMsgStart('S', conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
}
+ /* remember we are doing a Describe */
+ entry->queryclass = PGQUERY_DESCRIBE;
+
/*
- * Give the data a push. In nonblock mode, don't complain if we're unable
- * to send it all; PQgetResult() will do any additional flushing needed.
+ * Give the data a push (in pipeline mode, only if we're past the size
+ * threshold). In nonblock mode, don't complain if we're unable to send
+ * it all; PQgetResult() will do any additional flushing needed.
*/
- if (pqFlush(conn) < 0)
+ if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ pqAppendCmdQueueEntry(conn, entry);
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
+ pqRecycleCmdQueueEntry(conn, entry);
/* error message should be set up already */
return 0;
}
@@ -2327,7 +2586,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
* If we sent the COPY command in extended-query mode, we must issue a
* Sync as well.
*/
- if (conn->queryclass != PGQUERY_SIMPLE)
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
@@ -2541,6 +2801,13 @@ PQfn(PGconn *conn,
*/
resetPQExpBuffer(&conn->errorMessage);
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("PQfn not allowed in pipeline mode\n"));
+ return NULL;
+ }
+
if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE ||
conn->result != NULL)
{
@@ -2555,6 +2822,277 @@ PQfn(PGconn *conn,
args, nargs);
}
+/* ====== Pipeline mode support ======== */
+
+/*
+ * PQenterPipelineMode
+ * Put an idle connection in pipeline mode.
+ *
+ * Returns 1 on success. On failure, errorMessage is set and 0 is returned.
+ *
+ * Commands submitted after this can be pipelined on the connection;
+ * there's no requirement to wait for one to finish before the next is
+ * dispatched.
+ *
+ * Queuing of a new query or syncing during COPY is not allowed.
+ *
+ * A set of commands is terminated by a PQpipelineSync. Multiple sync
+ * points can be established while in pipeline mode. Pipeline mode can
+ * be exited by calling PQexitPipelineMode() once all results are processed.
+ *
+ * This doesn't actually send anything on the wire, it just puts libpq
+ * into a state where it can pipeline work.
+ */
+int
+PQenterPipelineMode(PGconn *conn)
+{
+ if (!conn)
+ return 0;
+
+ /* succeed with no action if already in pipeline mode */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ return 1;
+
+ if (conn->asyncStatus != PGASYNC_IDLE)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot enter pipeline mode, connection not idle\n"));
+ return 0;
+ }
+
+ conn->pipelineStatus = PQ_PIPELINE_ON;
+
+ return 1;
+}
+
+/*
+ * PQexitPipelineMode
+ * End pipeline mode and return to normal command mode.
+ *
+ * Returns 1 in success (pipeline mode successfully ended, or not in pipeline
+ * mode).
+ *
+ * Returns 0 if in pipeline mode and cannot be ended yet. Error message will
+ * be set.
+ */
+int
+PQexitPipelineMode(PGconn *conn)
+{
+ if (!conn)
+ return 0;
+
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ return 1;
+
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ /* there are some uncollected results */
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
+ return 0;
+
+ case PGASYNC_BUSY:
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot exit pipeline mode while busy\n"));
+ return 0;
+
+ default:
+ /* OK */
+ break;
+ }
+
+ /* still work to process */
+ if (conn->cmd_queue_head != NULL)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
+ return 0;
+ }
+
+ conn->pipelineStatus = PQ_PIPELINE_OFF;
+ conn->asyncStatus = PGASYNC_IDLE;
+
+ /* Flush any pending data in out buffer */
+ if (pqFlush(conn) < 0)
+ return 0; /* error message is setup already */
+ return 1;
+}
+
+/*
+ * pqCommandQueueAdvance
+ * Remove one query from the command queue, when we receive
+ * all results from the server that pertain to it.
+ */
+void
+pqCommandQueueAdvance(PGconn *conn)
+{
+ PGcmdQueueEntry *prevquery;
+
+ if (conn->cmd_queue_head == NULL)
+ return;
+
+ /* delink from queue */
+ prevquery = conn->cmd_queue_head;
+ conn->cmd_queue_head = conn->cmd_queue_head->next;
+
+ /* and make it recyclable */
+ prevquery->next = NULL;
+ pqRecycleCmdQueueEntry(conn, prevquery);
+}
+
+/*
+ * pqPipelineProcessQueue: subroutine for PQgetResult
+ * In pipeline mode, start processing the results of the next query in the queue.
+ */
+void
+pqPipelineProcessQueue(PGconn *conn)
+{
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ /* client still has to process current query or results */
+ return;
+ case PGASYNC_IDLE:
+ /* next query please */
+ break;
+ }
+
+ /* Nothing to do if not in pipeline mode, or queue is empty */
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
+ conn->cmd_queue_head == NULL)
+ return;
+
+ /* Initialize async result-accumulation state */
+ pqClearAsyncResult(conn);
+
+ /*
+ * Reset single-row processing mode. (Client has to set it up for each
+ * query, if desired.)
+ */
+ conn->singleRowMode = false;
+
+ if (conn->pipelineStatus == PQ_PIPELINE_ABORTED &&
+ conn->cmd_queue_head->queryclass != PGQUERY_SYNC)
+ {
+ /*
+ * In an aborted pipeline we don't get anything from the server for
+ * each result; we're just discarding commands from the queue until we
+ * get to the next sync from the server.
+ *
+ * The PGRES_PIPELINE_ABORTED results tell the client that its queries
+ * got aborted.
+ */
+ conn->result = PQmakeEmptyPGresult(conn, PGRES_PIPELINE_ABORTED);
+ if (!conn->result)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("out of memory\n"));
+ pqSaveErrorResult(conn);
+ return;
+ }
+ conn->asyncStatus = PGASYNC_READY;
+ }
+ else
+ {
+ /* allow parsing to continue */
+ conn->asyncStatus = PGASYNC_BUSY;
+ }
+}
+
+/*
+ * PQpipelineSync
+ * Send a Sync message as part of a pipeline, and flush to server
+ *
+ * It's legal to start submitting more commands in the pipeline immediately,
+ * without waiting for the results of the current pipeline. There's no need to
+ * end pipeline mode and start it again.
+ *
+ * If a command in a pipeline fails, every subsequent command up to and including
+ * the result to the Sync message sent by PQpipelineSync gets set to
+ * PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without
+ * error, a PGresult with PGRES_PIPELINE_SYNC is produced.
+ *
+ * Queries can already have been sent before PQpipelineSync is called, but
+ * PQpipelineSync need to be called before retrieving command results.
+ *
+ * The connection will remain in pipeline mode and unavailable for new
+ * synchronous command execution functions until all results from the pipeline
+ * are processed by the client.
+ */
+int
+PQpipelineSync(PGconn *conn)
+{
+ PGcmdQueueEntry *entry;
+
+ if (!conn)
+ return 0;
+
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot send pipeline when not in pipeline mode\n"));
+ return 0;
+ }
+
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ /* should be unreachable */
+ appendPQExpBufferStr(&conn->errorMessage,
+ "internal error: cannot send pipeline while in COPY\n");
+ return 0;
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ case PGASYNC_IDLE:
+ /* OK to send sync */
+ break;
+ }
+
+ entry = pqAllocCmdQueueEntry(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
+
+ entry->queryclass = PGQUERY_SYNC;
+ entry->query = NULL;
+
+ /* construct the Sync message */
+ if (pqPutMsgStart('S', conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+
+ pqAppendCmdQueueEntry(conn, entry);
+
+ /*
+ * Give the data a push. In nonblock mode, don't complain if we're unable
+ * to send it all; PQgetResult() will do any additional flushing needed.
+ */
+ if (PQflush(conn) < 0)
+ goto sendFailed;
+
+ /*
+ * Call pqPipelineProcessQueue so the user can call start calling
+ * PQgetResult.
+ */
+ pqPipelineProcessQueue(conn);
+
+ return 1;
+
+sendFailed:
+ pqRecycleCmdQueueEntry(conn, entry);
+ /* error message should be set up already */
+ return 0;
+}
+
/* ====== accessor funcs for PGresult ======== */
@@ -2569,7 +3107,7 @@ PQresultStatus(const PGresult *res)
char *
PQresStatus(ExecStatusType status)
{
- if ((unsigned int) status >= sizeof pgresStatus / sizeof pgresStatus[0])
+ if ((unsigned int) status >= lengthof(pgresStatus))
return libpq_gettext("invalid ExecStatusType code");
return pgresStatus[status];
}
@@ -3152,6 +3690,23 @@ PQflush(PGconn *conn)
return pqFlush(conn);
}
+/*
+ * pqPipelineFlush
+ *
+ * In pipeline mode, data will be flushed only when the out buffer reaches the
+ * threshold value. In non-pipeline mode, it behaves as stock pqFlush.
+ *
+ * Returns 0 on success.
+ */
+static int
+pqPipelineFlush(PGconn *conn)
+{
+ if ((conn->pipelineStatus != PQ_PIPELINE_ON) ||
+ (conn->outCount >= OUTBUFFER_THRESHOLD))
+ return pqFlush(conn);
+ return 0;
+}
+
/*
* PQfreemem - safely frees memory allocated
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index eb55d528fb..306e89acfd 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -159,6 +159,18 @@ pqParseInput3(PGconn *conn)
return;
/*
+ * We're also notionally not-IDLE when in pipeline mode the state
+ * says "idle" (so we have completed receiving the results of one
+ * query from the server and dispatched them to the application)
+ * but another query is queued; yield back control to caller so
+ * that they can initiate processing of the next query in the
+ * queue.
+ */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
+ conn->cmd_queue_head != NULL)
+ return;
+
+ /*
* Unexpected message in IDLE state; need to recover somehow.
* ERROR messages are handled using the notice processor;
* ParameterStatus is handled normally; anything else is just
@@ -179,6 +191,7 @@ pqParseInput3(PGconn *conn)
}
else
{
+ /* Any other case is unexpected and we summarily skip it */
pqInternalNotice(&conn->noticeHooks,
"message type 0x%02x arrived from server while idle",
id);
@@ -217,10 +230,37 @@ pqParseInput3(PGconn *conn)
return;
conn->asyncStatus = PGASYNC_READY;
break;
- case 'Z': /* backend is ready for new query */
+ case 'Z': /* sync response, backend is ready for new
+ * query */
if (getReadyForQuery(conn))
return;
- conn->asyncStatus = PGASYNC_IDLE;
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ conn->result = PQmakeEmptyPGresult(conn,
+ PGRES_PIPELINE_SYNC);
+ if (!conn->result)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("out of memory"));
+ pqSaveErrorResult(conn);
+ }
+ else
+ {
+ conn->pipelineStatus = PQ_PIPELINE_ON;
+ conn->asyncStatus = PGASYNC_READY;
+ }
+ }
+ else
+ {
+ /*
+ * In simple query protocol, advance the command queue
+ * (see PQgetResult).
+ */
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE)
+ pqCommandQueueAdvance(conn);
+ conn->asyncStatus = PGASYNC_IDLE;
+ }
break;
case 'I': /* empty query */
if (conn->result == NULL)
@@ -238,7 +278,8 @@ pqParseInput3(PGconn *conn)
break;
case '1': /* Parse Complete */
/* If we're doing PQprepare, we're done; else ignore */
- if (conn->queryclass == PGQUERY_PREPARE)
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_PREPARE)
{
if (conn->result == NULL)
{
@@ -285,7 +326,8 @@ pqParseInput3(PGconn *conn)
conn->inCursor += msgLength;
}
else if (conn->result == NULL ||
- conn->queryclass == PGQUERY_DESCRIBE)
+ (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
{
/* First 'T' in a query sequence */
if (getRowDescriptions(conn, msgLength))
@@ -316,7 +358,8 @@ pqParseInput3(PGconn *conn)
* instead of PGRES_TUPLES_OK. Otherwise we can just
* ignore this message.
*/
- if (conn->queryclass == PGQUERY_DESCRIBE)
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)
{
if (conn->result == NULL)
{
@@ -445,7 +488,7 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
id, msgLength);
/* build an error result holding the error message */
pqSaveErrorResult(conn);
- conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */
+ conn->asyncStatus = PGASYNC_READY; /* drop out of PQgetResult wait loop */
/* flush input data since we're giving up on processing it */
pqDropConnection(conn, true);
conn->status = CONNECTION_BAD; /* No more connection to backend */
@@ -471,7 +514,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
* PGresult created by getParamDescriptions, and we should fill data into
* that. Otherwise, create a new, empty PGresult.
*/
- if (conn->queryclass == PGQUERY_DESCRIBE)
+ if (!conn->cmd_queue_head ||
+ (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
{
if (conn->result)
result = conn->result;
@@ -568,7 +613,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
* If we're doing a Describe, we're done, and ready to pass the result
* back to the client.
*/
- if (conn->queryclass == PGQUERY_DESCRIBE)
+ if ((!conn->cmd_queue_head) ||
+ (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
{
conn->asyncStatus = PGASYNC_READY;
return 0;
@@ -841,6 +888,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
PQExpBufferData workBuf;
char id;
+ /* If in pipeline mode, set error indicator for it */
+ if (isError && conn->pipelineStatus != PQ_PIPELINE_OFF)
+ conn->pipelineStatus = PQ_PIPELINE_ABORTED;
+
/*
* If this is an error message, pre-emptively clear any incomplete query
* result we may have. We'd just throw it away below anyway, and
@@ -897,8 +948,8 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
* might need it for an error cursor display, which is only true if there
* is a PG_DIAG_STATEMENT_POSITION field.
*/
- if (have_position && conn->last_query && res)
- res->errQuery = pqResultStrdup(res, conn->last_query);
+ if (have_position && res && conn->cmd_queue_head && conn->cmd_queue_head->query)
+ res->errQuery = pqResultStrdup(res, conn->cmd_queue_head->query);
/*
* Now build the "overall" error message for PQresultErrorMessage.
@@ -1817,7 +1868,8 @@ pqEndcopy3(PGconn *conn)
* If we sent the COPY command in extended-query mode, we must issue a
* Sync as well.
*/
- if (conn->queryclass != PGQUERY_SIMPLE)
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
@@ -1897,6 +1949,9 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
int avail;
int i;
+ /* already validated by PQfn */
+ Assert(conn->pipelineStatus == PQ_PIPELINE_OFF);
+
/* PQfn already validated connection state */
if (pqPutMsgStart('F', conn) < 0 || /* function call msg */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index fa9b62a844..cee42d4843 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -96,7 +96,10 @@ typedef enum
PGRES_NONFATAL_ERROR, /* notice or warning message */
PGRES_FATAL_ERROR, /* query failed */
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
- PGRES_SINGLE_TUPLE /* single tuple from larger resultset */
+ PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */
+ PGRES_PIPELINE_SYNC, /* pipeline synchronization point */
+ PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort
+ * earlier in a pipeline */
} ExecStatusType;
typedef enum
@@ -136,6 +139,16 @@ typedef enum
PQPING_NO_ATTEMPT /* connection not attempted (bad params) */
} PGPing;
+/*
+ * PGpipelineStatus - Current status of pipeline mode
+ */
+typedef enum
+{
+ PQ_PIPELINE_OFF,
+ PQ_PIPELINE_ON,
+ PQ_PIPELINE_ABORTED
+} PGpipelineStatus;
+
/* PGconn encapsulates a connection to the backend.
* The contents of this struct are not supposed to be known to applications.
*/
@@ -327,6 +340,7 @@ extern int PQserverVersion(const PGconn *conn);
extern char *PQerrorMessage(const PGconn *conn);
extern int PQsocket(const PGconn *conn);
extern int PQbackendPID(const PGconn *conn);
+extern PGpipelineStatus PQpipelineStatus(const PGconn *conn);
extern int PQconnectionNeedsPassword(const PGconn *conn);
extern int PQconnectionUsedPassword(const PGconn *conn);
extern int PQclientEncoding(const PGconn *conn);
@@ -434,6 +448,11 @@ extern PGresult *PQgetResult(PGconn *conn);
extern int PQisBusy(PGconn *conn);
extern int PQconsumeInput(PGconn *conn);
+/* Routines for pipeline mode management */
+extern int PQenterPipelineMode(PGconn *conn);
+extern int PQexitPipelineMode(PGconn *conn);
+extern int PQpipelineSync(PGconn *conn);
+
/* LISTEN/NOTIFY support */
extern PGnotify *PQnotifies(PGconn *conn);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 2f052f61f8..6374ec657a 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -217,21 +217,16 @@ typedef enum
{
PGASYNC_IDLE, /* nothing's happening, dude */
PGASYNC_BUSY, /* query in progress */
- PGASYNC_READY, /* result ready for PQgetResult */
+ PGASYNC_READY, /* query done, waiting for client to fetch
+ * result */
+ PGASYNC_READY_MORE, /* query done, waiting for client to fetch
+ * result, more results expected from this
+ * query */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
} PGAsyncStatusType;
-/* PGQueryClass tracks which query protocol we are now executing */
-typedef enum
-{
- PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */
- PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
- PGQUERY_PREPARE, /* Parse only (PQprepare) */
- PGQUERY_DESCRIBE /* Describe Statement or Portal */
-} PGQueryClass;
-
/* Target server type (decoded value of target_session_attrs) */
typedef enum
{
@@ -306,6 +301,29 @@ typedef enum pg_conn_host_type
} pg_conn_host_type;
/*
+ * PGQueryClass tracks which query protocol is in use for each command queue
+ * entry, or special operation in execution
+ */
+typedef enum
+{
+ PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */
+ PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
+ PGQUERY_PREPARE, /* Parse only (PQprepare) */
+ PGQUERY_DESCRIBE, /* Describe Statement or Portal */
+ PGQUERY_SYNC /* Sync (at end of a pipeline) */
+} PGQueryClass;
+
+/*
+ * An entry in the pending command queue.
+ */
+typedef struct PGcmdQueueEntry
+{
+ PGQueryClass queryclass; /* Query type */
+ char *query; /* SQL command, or NULL if none/unknown/OOM */
+ struct PGcmdQueueEntry *next; /* list link */
+} PGcmdQueueEntry;
+
+/*
* pg_conn_host stores all information about each of possibly several hosts
* mentioned in the connection string. Most fields are derived by splitting
* the relevant connection parameter (e.g., pghost) at commas.
@@ -389,12 +407,11 @@ struct pg_conn
ConnStatusType status;
PGAsyncStatusType asyncStatus;
PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
- PGQueryClass queryclass;
- char *last_query; /* last SQL command, or NULL if unknown */
char last_sqlstate[6]; /* last reported SQLSTATE */
bool options_valid; /* true if OK to attempt connection */
bool nonblocking; /* whether this connection is using nonblock
* sending semantics */
+ PGpipelineStatus pipelineStatus; /* status of pipeline mode */
bool singleRowMode; /* return current query result row-by-row? */
char copy_is_binary; /* 1 = copy binary, 0 = copy text */
int copy_already_done; /* # bytes already returned in COPY OUT */
@@ -407,6 +424,19 @@ struct pg_conn
pg_conn_host *connhost; /* details about each named host */
char *connip; /* IP address for current network connection */
+ /*
+ * The pending command queue as a singly-linked list. Head is the command
+ * currently in execution, tail is where new commands are added.
+ */
+ PGcmdQueueEntry *cmd_queue_head;
+ PGcmdQueueEntry *cmd_queue_tail;
+
+ /*
+ * To save malloc traffic, we don't free entries right away; instead we
+ * save them in this list for possible reuse.
+ */
+ PGcmdQueueEntry *cmd_queue_recycle;
+
/* Connection data */
pgsocket sock; /* FD for socket, PGINVALID_SOCKET if
* unconnected */
@@ -622,6 +652,7 @@ extern void pqSaveMessageField(PGresult *res, char code,
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
const char *value);
extern int pqRowProcessor(PGconn *conn, const char **errmsgp);
+extern void pqCommandQueueAdvance(PGconn *conn);
extern int PQsendQueryContinue(PGconn *conn, const char *query);
/* === in fe-protocol3.c === */
@@ -795,6 +826,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len);
*/
#define pqIsnonblocking(conn) ((conn)->nonblocking)
+/*
+ * Connection's outbuffer threshold, for pipeline mode.
+ */
+#define OUTBUFFER_THRESHOLD 65536
+
#ifdef ENABLE_NLS
extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1);
extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 5391f461a2..93e7829c67 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -10,6 +10,7 @@ SUBDIRS = \
delay_execution \
dummy_index_am \
dummy_seclabel \
+ libpq_pipeline \
plsample \
snapshot_too_old \
test_bloomfilter \
diff --git a/src/test/modules/libpq_pipeline/.gitignore b/src/test/modules/libpq_pipeline/.gitignore
new file mode 100644
index 0000000000..3a11e786b8
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/.gitignore
@@ -0,0 +1,5 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
+/libpq_pipeline
diff --git a/src/test/modules/libpq_pipeline/Makefile b/src/test/modules/libpq_pipeline/Makefile
new file mode 100644
index 0000000000..b798f5fbbc
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/Makefile
@@ -0,0 +1,20 @@
+# src/test/modules/libpq_pipeline/Makefile
+
+PROGRAM = libpq_pipeline
+OBJS = libpq_pipeline.o
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS_INTERNAL += $(libpq_pgport)
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/libpq_pipeline
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/libpq_pipeline/README b/src/test/modules/libpq_pipeline/README
new file mode 100644
index 0000000000..d8174dd579
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/README
@@ -0,0 +1 @@
+Test programs and libraries for libpq
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
new file mode 100644
index 0000000000..03eb3df504
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -0,0 +1,1303 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpq_pipeline.c
+ * Verify libpq pipeline execution functionality
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/test/modules/libpq_pipeline/libpq_pipeline.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <sys/time.h>
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+#include "catalog/pg_type_d.h"
+#include "common/fe_memutils.h"
+#include "libpq-fe.h"
+#include "portability/instr_time.h"
+
+
+static void exit_nicely(PGconn *conn);
+
+const char *const progname = "libpq_pipeline";
+
+
+#define DEBUG
+#ifdef DEBUG
+#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
+#else
+#define pg_debug(...)
+#endif
+
+static const char *const drop_table_sql =
+"DROP TABLE IF EXISTS pq_pipeline_demo";
+static const char *const create_table_sql =
+"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer);";
+static const char *const insert_sql =
+"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1);";
+
+/* max char length of an int32, plus sign and null terminator */
+#define MAXINTLEN 12
+
+static void
+exit_nicely(PGconn *conn)
+{
+ PQfinish(conn);
+ exit(1);
+}
+
+/*
+ * Print an error to stderr and terminate the program.
+ */
+#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
+static void
+pg_fatal_impl(int line, const char *fmt,...)
+{
+ va_list args;
+
+
+ fflush(stdout);
+
+ fprintf(stderr, "\n%s:%d: ", progname, line);
+ va_start(args, fmt);
+ vfprintf(stderr, fmt, args);
+ va_end(args);
+ Assert(fmt[strlen(fmt) - 1] != '\n');
+ fprintf(stderr, "\n");
+ exit(1);
+}
+
+static void
+test_disallowed_in_pipeline(PGconn *conn)
+{
+ PGresult *res = NULL;
+
+ fprintf(stderr, "test error cases... ");
+
+ if (PQisnonblocking(conn))
+ pg_fatal("Expected blocking connection mode");
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("Unable to enter pipeline mode");
+
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+ pg_fatal("Pipeline mode not activated properly");
+
+ /* PQexec should fail in pipeline mode */
+ res = PQexec(conn, "SELECT 1");
+ if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+ pg_fatal("PQexec should fail in pipeline mode but succeeded");
+
+ /* Entering pipeline mode when already in pipeline mode is OK */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("re-entering pipeline mode should be a no-op but failed");
+
+ if (PQisBusy(conn) != 0)
+ pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
+
+ /* ok, back to normal command mode */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("couldn't exit idle empty pipeline mode");
+
+ if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+ pg_fatal("Pipeline mode not terminated properly");
+
+ /* exiting pipeline mode when not in pipeline mode should be a no-op */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
+
+ /* can now PQexec again */
+ res = PQexec(conn, "SELECT 1");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
+ PQerrorMessage(conn));
+
+ fprintf(stderr, "ok\n");
+}
+
+static void
+test_multi_pipelines(PGconn *conn)
+{
+ PGresult *res = NULL;
+ const char *dummy_params[1] = {"1"};
+ Oid dummy_param_oids[1] = {INT4OID};
+
+ fprintf(stderr, "multi pipeline... ");
+
+ /*
+ * Queue up a couple of small pipelines and process each without returning
+ * to command mode first.
+ */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+ /* OK, start processing the results */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = NULL;
+
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("PQgetResult returned something extra after first result");
+
+ if (PQexitPipelineMode(conn) != 0)
+ pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when sync result expected: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s instead of sync result, error: %s",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+ PQclear(res);
+
+ /* second pipeline */
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from second pipeline item",
+ PQresStatus(PQresultStatus(res)));
+
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("Expected null result, got %s",
+ PQresStatus(PQresultStatus(res)));
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s from second pipeline sync",
+ PQresStatus(PQresultStatus(res)));
+
+ /* We're still in pipeline mode ... */
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+ pg_fatal("Fell out of pipeline mode somehow");
+
+ /* until we end it, which we can safely do now */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+ pg_fatal("exiting pipeline mode didn't seem to work");
+
+ fprintf(stderr, "ok\n");
+}
+
+/*
+ * When an operation in a pipeline fails the rest of the pipeline is flushed. We
+ * still have to get results for each pipeline item, but the item will just be
+ * a PGRES_PIPELINE_ABORTED code.
+ *
+ * This intentionally doesn't use a transaction to wrap the pipeline. You should
+ * usually use an xact, but in this case we want to observe the effects of each
+ * statement.
+ */
+static void
+test_pipeline_abort(PGconn *conn)
+{
+ PGresult *res = NULL;
+ const char *dummy_params[1] = {"1"};
+ Oid dummy_param_oids[1] = {INT4OID};
+ int i;
+ bool goterror;
+
+ fprintf(stderr, "aborted pipeline... ");
+
+ res = PQexec(conn, drop_table_sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
+
+ res = PQexec(conn, create_table_sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
+
+ /*
+ * Queue up a couple of small pipelines and process each without returning
+ * to command mode first. Make sure the second operation in the first
+ * pipeline ERRORs.
+ */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+ dummy_params[0] = "1";
+ if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
+ 1, dummy_param_oids, dummy_params,
+ NULL, NULL, 0) != 1)
+ pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
+
+ dummy_params[0] = "2";
+ if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+ dummy_params[0] = "3";
+ if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching second-pipeline insert failed: %s",
+ PQerrorMessage(conn));
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+ /*
+ * OK, start processing the pipeline results.
+ *
+ * We should get a command-ok for the first query, then a fatal error and
+ * a pipeline aborted message for the second insert, a pipeline-end, then
+ * a command-ok and a pipeline-ok for the second pipeline operation.
+ */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("Unexpected result status %s: %s",
+ PQresStatus(PQresultStatus(res)),
+ PQresultErrorMessage(res));
+ PQclear(res);
+
+ /* NULL result to signal end-of-results for this command */
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s",
+ PQresStatus(PQresultStatus(res)));
+
+ /* Second query caused error, so we expect an error next */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+ pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* NULL result to signal end-of-results for this command */
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s",
+ PQresStatus(PQresultStatus(res)));
+
+ /*
+ * pipeline should now be aborted.
+ *
+ * Note that we could still queue more queries at this point if we wanted;
+ * they'd get added to a new third pipeline since we've already sent a
+ * second. The aborted flag relates only to the pipeline being received.
+ */
+ if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
+ pg_fatal("pipeline should be flagged as aborted but isn't");
+
+ /* third query in pipeline, the second insert */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
+ pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* NULL result to signal end-of-results for this command */
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
+
+ if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
+ pg_fatal("pipeline should be flagged as aborted but isn't");
+
+ /* Ensure we're still in pipeline */
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+ pg_fatal("Fell out of pipeline mode somehow");
+
+ /*
+ * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
+ *
+ * (This is so clients know to start processing results normally again and
+ * can tell the difference between skipped commands and the sync.)
+ */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code from first pipeline sync\n"
+ "Expected PGRES_PIPELINE_SYNC, got %s",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
+ pg_fatal("sync should've cleared the aborted flag but didn't");
+
+ /* We're still in pipeline mode... */
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+ pg_fatal("Fell out of pipeline mode somehow");
+
+ /* the insert from the second pipeline */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("Unexpected result code %s from first item in second pipeline",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* Read the NULL result at the end of the command */
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
+
+ /* the second pipeline sync */
+ if ((res = PQgetResult(conn)) == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s from second pipeline sync",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s: %s",
+ PQresStatus(PQresultStatus(res)),
+ PQerrorMessage(conn));
+
+ /* Try to send two queries in one command */
+ if (PQsendQuery(conn, "SELECT 1; SELECT 2") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ goterror = false;
+ while ((res = PQgetResult(conn)) != NULL)
+ {
+ switch (PQresultStatus(res))
+ {
+ case PGRES_FATAL_ERROR:
+ if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
+ pg_fatal("expected error about multiple commands, got %s",
+ PQerrorMessage(conn));
+ printf("got expected %s", PQerrorMessage(conn));
+ goterror = true;
+ break;
+ default:
+ pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
+ break;
+ }
+ }
+ if (!goterror)
+ pg_fatal("did not get cannot-insert-multiple-commands error");
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("got NULL result");
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s from pipeline sync",
+ PQresStatus(PQresultStatus(res)));
+
+ /* Test single-row mode with an error partways */
+ if (PQsendQuery(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ PQsetSingleRowMode(conn);
+ goterror = false;
+ while ((res = PQgetResult(conn)) != NULL)
+ {
+ switch (PQresultStatus(res))
+ {
+ case PGRES_SINGLE_TUPLE:
+ printf("got row: %s\n", PQgetvalue(res, 0, 0));
+ break;
+ case PGRES_FATAL_ERROR:
+ if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
+ pg_fatal("expected division-by-zero, got: %s (%s)",
+ PQerrorMessage(conn),
+ PQresultErrorField(res, PG_DIAG_SQLSTATE));
+ printf("got expected division-by-zero\n");
+ goterror = true;
+ break;
+ default:
+ pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
+ }
+ PQclear(res);
+ }
+ if (!goterror)
+ pg_fatal("did not get division-by-zero error");
+ /* the third pipeline sync */
+ if ((res = PQgetResult(conn)) == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s from third pipeline sync",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* We're still in pipeline mode... */
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+ pg_fatal("Fell out of pipeline mode somehow");
+
+ /* until we end it, which we can safely do now */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+ pg_fatal("exiting pipeline mode didn't seem to work");
+
+ fprintf(stderr, "ok\n");
+
+ /*-
+ * Since we fired the pipelines off without a surrounding xact, the results
+ * should be:
+ *
+ * - Implicit xact started by server around 1st pipeline
+ * - First insert applied
+ * - Second statement aborted xact
+ * - Third insert skipped
+ * - Sync rolled back first implicit xact
+ * - Implicit xact created by server around 2nd pipeline
+ * - insert applied from 2nd pipeline
+ * - Sync commits 2nd xact
+ *
+ * So we should only have the value 3 that we inserted.
+ */
+ res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Expected tuples, got %s: %s",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+ if (PQntuples(res) != 1)
+ pg_fatal("expected 1 result, got %d", PQntuples(res));
+ for (i = 0; i < PQntuples(res); i++)
+ {
+ const char *val = PQgetvalue(res, i, 0);
+
+ if (strcmp(val, "3") != 0)
+ pg_fatal("expected only insert with value 3, got %s", val);
+ }
+
+ PQclear(res);
+}
+
+/* State machine enum for test_pipelined_insert */
+enum PipelineInsertStep
+{
+ BI_BEGIN_TX,
+ BI_DROP_TABLE,
+ BI_CREATE_TABLE,
+ BI_PREPARE,
+ BI_INSERT_ROWS,
+ BI_COMMIT_TX,
+ BI_SYNC,
+ BI_DONE
+};
+
+static void
+test_pipelined_insert(PGconn *conn, int n_rows)
+{
+ const char *insert_params[1];
+ Oid insert_param_oids[1] = {INT4OID};
+ char insert_param_0[MAXINTLEN];
+ enum PipelineInsertStep send_step = BI_BEGIN_TX,
+ recv_step = BI_BEGIN_TX;
+ int rows_to_send,
+ rows_to_receive;
+
+ insert_params[0] = &insert_param_0[0];
+
+ rows_to_send = rows_to_receive = n_rows;
+
+ /*
+ * Do a pipelined insert into a table created at the start of the pipeline
+ */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+ while (send_step != BI_PREPARE)
+ {
+ const char *sql;
+
+ switch (send_step)
+ {
+ case BI_BEGIN_TX:
+ sql = "BEGIN TRANSACTION";
+ send_step = BI_DROP_TABLE;
+ break;
+
+ case BI_DROP_TABLE:
+ sql = drop_table_sql;
+ send_step = BI_CREATE_TABLE;
+ break;
+
+ case BI_CREATE_TABLE:
+ sql = create_table_sql;
+ send_step = BI_PREPARE;
+ break;
+
+ default:
+ pg_fatal("invalid state");
+ }
+
+ pg_debug("sending: %s\n", sql);
+ if (PQsendQueryParams(conn, sql,
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
+ }
+
+ Assert(send_step == BI_PREPARE);
+ pg_debug("sending: %s\n", insert_sql);
+ if (PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids) != 1)
+ pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
+ send_step = BI_INSERT_ROWS;
+
+ /*
+ * Now we start inserting. We'll be sending enough data that we could fill
+ * our output buffer, so to avoid deadlocking we need to enter nonblocking
+ * mode and consume input while we send more output. As results of each
+ * query are processed we should pop them to allow processing of the next
+ * query. There's no need to finish the pipeline before processing
+ * results.
+ */
+ if (PQsetnonblocking(conn, 1) != 0)
+ pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
+
+ while (recv_step != BI_DONE)
+ {
+ int sock;
+ fd_set input_mask;
+ fd_set output_mask;
+
+ sock = PQsocket(conn);
+
+ if (sock < 0)
+ break; /* shouldn't happen */
+
+ FD_ZERO(&input_mask);
+ FD_SET(sock, &input_mask);
+ FD_ZERO(&output_mask);
+ FD_SET(sock, &output_mask);
+
+ if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
+ {
+ fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ exit_nicely(conn);
+ }
+
+ /*
+ * Process any results, so we keep the server's output buffer free
+ * flowing and it can continue to process input
+ */
+ if (FD_ISSET(sock, &input_mask))
+ {
+ PQconsumeInput(conn);
+
+ /* Read until we'd block if we tried to read */
+ while (!PQisBusy(conn) && recv_step < BI_DONE)
+ {
+ PGresult *res;
+ const char *cmdtag;
+ const char *description = "";
+ int status;
+
+ /*
+ * Read next result. If no more results from this query,
+ * advance to the next query
+ */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ continue;
+
+ status = PGRES_COMMAND_OK;
+ switch (recv_step)
+ {
+ case BI_BEGIN_TX:
+ cmdtag = "BEGIN";
+ recv_step++;
+ break;
+ case BI_DROP_TABLE:
+ cmdtag = "DROP TABLE";
+ recv_step++;
+ break;
+ case BI_CREATE_TABLE:
+ cmdtag = "CREATE TABLE";
+ recv_step++;
+ break;
+ case BI_PREPARE:
+ cmdtag = "";
+ description = "PREPARE";
+ recv_step++;
+ break;
+ case BI_INSERT_ROWS:
+ cmdtag = "INSERT";
+ rows_to_receive--;
+ if (rows_to_receive == 0)
+ recv_step++;
+ break;
+ case BI_COMMIT_TX:
+ cmdtag = "COMMIT";
+ recv_step++;
+ break;
+ case BI_SYNC:
+ cmdtag = "";
+ description = "SYNC";
+ status = PGRES_PIPELINE_SYNC;
+ recv_step++;
+ break;
+ case BI_DONE:
+ /* unreachable */
+ description = "";
+ abort();
+ }
+
+ if (PQresultStatus(res) != status)
+ pg_fatal("%s reported status %s, expected %s\n"
+ "Error message: \"%s\"",
+ description, PQresStatus(PQresultStatus(res)),
+ PQresStatus(status), PQerrorMessage(conn));
+
+ if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
+ pg_fatal("%s expected command tag '%s', got '%s'",
+ description, cmdtag, PQcmdStatus(res));
+
+ pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
+
+ PQclear(res);
+ }
+ }
+
+ /* Write more rows and/or the end pipeline message, if needed */
+ if (FD_ISSET(sock, &output_mask))
+ {
+ PQflush(conn);
+
+ if (send_step == BI_INSERT_ROWS)
+ {
+ snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send);
+
+ if (PQsendQueryPrepared(conn, "my_insert",
+ 1, insert_params, NULL, NULL, 0) == 1)
+ {
+ pg_debug("sent row %d\n", rows_to_send);
+
+ rows_to_send--;
+ if (rows_to_send == 0)
+ send_step++;
+ }
+ else
+ {
+ /*
+ * in nonblocking mode, so it's OK for an insert to fail
+ * to send
+ */
+ fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
+ rows_to_send, PQerrorMessage(conn));
+ }
+ }
+ else if (send_step == BI_COMMIT_TX)
+ {
+ if (PQsendQueryParams(conn, "COMMIT",
+ 0, NULL, NULL, NULL, NULL, 0) == 1)
+ {
+ pg_debug("sent COMMIT\n");
+ send_step++;
+ }
+ else
+ {
+ fprintf(stderr, "WARNING: failed to send commit: %s\n",
+ PQerrorMessage(conn));
+ }
+ }
+ else if (send_step == BI_SYNC)
+ {
+ if (PQpipelineSync(conn) == 1)
+ {
+ fprintf(stdout, "pipeline sync sent\n");
+ send_step++;
+ }
+ else
+ {
+ fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
+ PQerrorMessage(conn));
+ }
+ }
+ }
+ }
+
+ /* We've got the sync message and the pipeline should be done */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ if (PQsetnonblocking(conn, 0) != 0)
+ pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
+
+ fprintf(stderr, "ok\n");
+}
+
+static void
+test_prepared(PGconn *conn)
+{
+ PGresult *res = NULL;
+ Oid param_oids[1] = {INT4OID};
+ Oid expected_oids[4];
+ Oid typ;
+
+ fprintf(stderr, "prepared... ");
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+ if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
+ "interval '1 sec'",
+ 1, param_oids) != 1)
+ pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
+ expected_oids[0] = INT4OID;
+ expected_oids[1] = TEXTOID;
+ expected_oids[2] = NUMERICOID;
+ expected_oids[3] = INTERVALOID;
+ if (PQsendDescribePrepared(conn, "select_one") != 1)
+ pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("expected NULL result");
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned NULL");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
+ if (PQnfields(res) != lengthof(expected_oids))
+ pg_fatal("expected %d columns, got %d",
+ lengthof(expected_oids), PQnfields(res));
+ for (int i = 0; i < PQnfields(res); i++)
+ {
+ typ = PQftype(res, i);
+ if (typ != expected_oids[i])
+ pg_fatal("field %d: expected type %u, got %u",
+ i, expected_oids[i], typ);
+ }
+ PQclear(res);
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("expected NULL result");
+
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
+
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
+
+ PQexec(conn, "BEGIN");
+ PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
+ PQenterPipelineMode(conn);
+ if (PQsendDescribePortal(conn, "cursor_one") != 1)
+ pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("expected NULL result");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
+
+ typ = PQftype(res, 0);
+ if (typ != INT4OID)
+ pg_fatal("portal: expected type %u, got %u",
+ INT4OID, typ);
+ PQclear(res);
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("expected NULL result");
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
+
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
+
+ fprintf(stderr, "ok\n");
+}
+
+static void
+test_simple_pipeline(PGconn *conn)
+{
+ PGresult *res = NULL;
+ const char *dummy_params[1] = {"1"};
+ Oid dummy_param_oids[1] = {INT4OID};
+
+ fprintf(stderr, "simple pipeline... ");
+
+ /*
+ * Enter pipeline mode and dispatch a set of operations, which we'll then
+ * process the results of as they come in.
+ *
+ * For a simple case we should be able to do this without interim
+ * processing of results since our output buffer will give us enough slush
+ * to work with and we won't block on sending. So blocking mode is fine.
+ */
+ if (PQisnonblocking(conn))
+ pg_fatal("Expected blocking connection mode");
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn, "SELECT $1",
+ 1, dummy_param_oids, dummy_params,
+ NULL, NULL, 0) != 1)
+ pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
+
+ if (PQexitPipelineMode(conn) != 0)
+ pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+
+ PQclear(res);
+ res = NULL;
+
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("PQgetResult returned something extra after first query result.");
+
+ /*
+ * Even though we've processed the result there's still a sync to come and
+ * we can't exit pipeline mode yet
+ */
+ if (PQexitPipelineMode(conn) != 0)
+ pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+
+ PQclear(res);
+ res = NULL;
+
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("PQgetResult returned something extra after pipeline end: %s",
+ PQresStatus(PQresultStatus(res)));
+
+ /* We're still in pipeline mode... */
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+ pg_fatal("Fell out of pipeline mode somehow");
+
+ /* ... until we end it, which we can safely do now */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+ pg_fatal("Exiting pipeline mode didn't seem to work");
+
+ fprintf(stderr, "ok\n");
+}
+
+static void
+test_singlerowmode(PGconn *conn)
+{
+ PGresult *res;
+ int i;
+ bool pipeline_ended = false;
+
+ /* 1 pipeline, 3 queries in it */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s",
+ PQerrorMessage(conn));
+
+ for (i = 0; i < 3; i++)
+ {
+ char *param[1];
+
+ param[0] = psprintf("%d", 44 + i);
+
+ if (PQsendQueryParams(conn,
+ "SELECT generate_series(42, $1)",
+ 1,
+ NULL,
+ (const char **) param,
+ NULL,
+ NULL,
+ 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+ pfree(param[0]);
+ }
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+ for (i = 0; !pipeline_ended; i++)
+ {
+ bool first = true;
+ bool saw_ending_tuplesok;
+ bool isSingleTuple = false;
+
+ /* Set single row mode for only first 2 SELECT queries */
+ if (i < 2)
+ {
+ if (PQsetSingleRowMode(conn) != 1)
+ pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
+ }
+
+ /* Consume rows for this query */
+ saw_ending_tuplesok = false;
+ while ((res = PQgetResult(conn)) != NULL)
+ {
+ ExecStatusType est = PQresultStatus(res);
+
+ if (est == PGRES_PIPELINE_SYNC)
+ {
+ fprintf(stderr, "end of pipeline reached\n");
+ pipeline_ended = true;
+ PQclear(res);
+ if (i != 3)
+ pg_fatal("Expected three results, got %d", i);
+ break;
+ }
+
+ /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
+ if (first)
+ {
+ if (i <= 1 && est != PGRES_SINGLE_TUPLE)
+ pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
+ i, PQresStatus(est));
+ if (i >= 2 && est != PGRES_TUPLES_OK)
+ pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
+ i, PQresStatus(est));
+ first = false;
+ }
+
+ fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
+ switch (est)
+ {
+ case PGRES_TUPLES_OK:
+ fprintf(stderr, ", tuples: %d\n", PQntuples(res));
+ saw_ending_tuplesok = true;
+ if (isSingleTuple)
+ {
+ if (PQntuples(res) == 0)
+ fprintf(stderr, "all tuples received in query %d\n", i);
+ else
+ pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
+ }
+ break;
+
+ case PGRES_SINGLE_TUPLE:
+ isSingleTuple = true;
+ fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
+ break;
+
+ default:
+ pg_fatal("unexpected");
+ }
+ PQclear(res);
+ }
+ if (!pipeline_ended && !saw_ending_tuplesok)
+ pg_fatal("didn't get expected terminating TUPLES_OK");
+ }
+
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+}
+
+/*
+ * Simple test to verify that a pipeline is discarded as a whole when there's
+ * an error, ignoring transaction commands.
+ */
+static void
+test_transaction(PGconn *conn)
+{
+ PGresult *res;
+ bool expect_null;
+ int num_syncs = 0;
+
+ res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
+ "CREATE TABLE pq_pipeline_tst (id int)");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to create test table: %s",
+ PQerrorMessage(conn));
+ PQclear(res);
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s",
+ PQerrorMessage(conn));
+ if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
+ pg_fatal("could not send prepare on pipeline: %s",
+ PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn,
+ "BEGIN",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+ if (PQsendQueryParams(conn,
+ "SELECT 0/0",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+
+ /*
+ * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
+ * get out of the pipeline-aborted state first.
+ */
+ if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
+ pg_fatal("failed to execute prepared: %s",
+ PQerrorMessage(conn));
+
+ /* This insert fails because we're in pipeline-aborted state */
+ if (PQsendQueryParams(conn,
+ "INSERT INTO pq_pipeline_tst VALUES (1)",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ num_syncs++;
+
+ /*
+ * This insert fails even though the pipeline got a SYNC, because we're in
+ * an aborted transaction
+ */
+ if (PQsendQueryParams(conn,
+ "INSERT INTO pq_pipeline_tst VALUES (2)",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ num_syncs++;
+
+ /*
+ * Send ROLLBACK using prepared stmt. This one works because we just did
+ * PQpipelineSync above.
+ */
+ if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
+ pg_fatal("failed to execute prepared: %s",
+ PQerrorMessage(conn));
+
+ /*
+ * Now that we're out of a transaction and in pipeline-good mode, this
+ * insert works
+ */
+ if (PQsendQueryParams(conn,
+ "INSERT INTO pq_pipeline_tst VALUES (3)",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+ /* Send two syncs now -- match up to SYNC messages below */
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ num_syncs++;
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ num_syncs++;
+
+ expect_null = false;
+ for (int i = 0;; i++)
+ {
+ ExecStatusType restype;
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ {
+ printf("%d: got NULL result\n", i);
+ if (!expect_null)
+ pg_fatal("did not expect NULL here");
+ expect_null = false;
+ continue;
+ }
+ restype = PQresultStatus(res);
+ printf("%d: got status %s", i, PQresStatus(restype));
+ if (expect_null)
+ pg_fatal("expected NULL");
+ if (restype == PGRES_FATAL_ERROR)
+ printf("; error: %s", PQerrorMessage(conn));
+ else if (restype == PGRES_PIPELINE_ABORTED)
+ {
+ printf(": command didn't run because pipeline aborted\n");
+ }
+ else
+ printf("\n");
+ PQclear(res);
+
+ if (restype == PGRES_PIPELINE_SYNC)
+ num_syncs--;
+ else
+ expect_null = true;
+ if (num_syncs <= 0)
+ break;
+ }
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("returned something extra after all the syncs: %s",
+ PQresStatus(PQresultStatus(res)));
+
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+
+ /* We expect to find one tuple containing the value "3" */
+ res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
+ if (PQntuples(res) != 1)
+ pg_fatal("did not get 1 tuple");
+ if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
+ pg_fatal("did not get expected tuple");
+ PQclear(res);
+
+ fprintf(stderr, "ok\n");
+}
+
+static void
+usage(const char *progname)
+{
+ fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
+ fprintf(stderr, "Usage:\n");
+ fprintf(stderr, " %s tests", progname);
+ fprintf(stderr, " %s testname [conninfo [number_of_rows]]\n", progname);
+}
+
+static void
+print_test_list(void)
+{
+ printf("disallowed_in_pipeline\n");
+ printf("multi_pipelines\n");
+ printf("pipeline_abort\n");
+ printf("pipelined_insert\n");
+ printf("prepared\n");
+ printf("simple_pipeline\n");
+ printf("singlerow\n");
+ printf("transaction\n");
+}
+
+int
+main(int argc, char **argv)
+{
+ const char *conninfo = "";
+ PGconn *conn;
+ int numrows = 10000;
+ PGresult *res;
+
+ if (strcmp(argv[1], "tests") == 0)
+ {
+ print_test_list();
+ exit(0);
+ }
+
+ /*
+ * The testname parameter is mandatory; it can be followed by a conninfo
+ * string and number of rows.
+ */
+ if (argc < 2 || argc > 4)
+ {
+ usage(argv[0]);
+ exit(1);
+ }
+
+ if (argc >= 3)
+ conninfo = pg_strdup(argv[2]);
+
+ if (argc >= 4)
+ {
+ errno = 0;
+ numrows = strtol(argv[3], NULL, 10);
+ if (errno != 0 || numrows <= 0)
+ {
+ fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n", argv[3]);
+ exit(1);
+ }
+ }
+
+ /* Make a connection to the database */
+ conn = PQconnectdb(conninfo);
+ if (PQstatus(conn) != CONNECTION_OK)
+ {
+ fprintf(stderr, "Connection to database failed: %s\n",
+ PQerrorMessage(conn));
+ exit_nicely(conn);
+ }
+ res = PQexec(conn, "SET lc_messages TO \"C\"");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
+
+ if (strcmp(argv[1], "disallowed_in_pipeline") == 0)
+ test_disallowed_in_pipeline(conn);
+ else if (strcmp(argv[1], "multi_pipelines") == 0)
+ test_multi_pipelines(conn);
+ else if (strcmp(argv[1], "pipeline_abort") == 0)
+ test_pipeline_abort(conn);
+ else if (strcmp(argv[1], "pipelined_insert") == 0)
+ test_pipelined_insert(conn, numrows);
+ else if (strcmp(argv[1], "prepared") == 0)
+ test_prepared(conn);
+ else if (strcmp(argv[1], "simple_pipeline") == 0)
+ test_simple_pipeline(conn);
+ else if (strcmp(argv[1], "singlerow") == 0)
+ test_singlerowmode(conn);
+ else if (strcmp(argv[1], "transaction") == 0)
+ test_transaction(conn);
+ else
+ {
+ fprintf(stderr, "\"%s\" is not a recognized test name\n", argv[1]);
+ usage(argv[0]);
+ exit(1);
+ }
+
+ /* close the connection to the database and cleanup */
+ PQfinish(conn);
+ return 0;
+}
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
new file mode 100644
index 0000000000..ba15b64ca7
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -0,0 +1,28 @@
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+use Cwd;
+
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+
+my $numrows = 10000;
+$ENV{PATH} = "$ENV{PATH}:" . getcwd();
+
+my ($out, $err) = run_command(['libpq_pipeline', 'tests']);
+die "oops: $err" unless $err eq '';
+my @tests = split(/\s/, $out);
+
+for my $testname (@tests)
+{
+ $node->command_ok(
+ [ 'libpq_pipeline', $testname, $node->connstr('postgres'), $numrows ],
+ "libpq_pipeline $testname");
+}
+
+$node->stop('fast');
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 74fde40e3a..a184404e21 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -33,10 +33,11 @@ my @unlink_on_exit;
# Set of variables for modules in contrib/ and src/test/modules/
my $contrib_defines = { 'refint' => 'REFINT_VERBOSE' };
-my @contrib_uselibpq = ('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo');
-my @contrib_uselibpgport = ('oid2name', 'vacuumlo');
-my @contrib_uselibpgcommon = ('oid2name', 'vacuumlo');
-my $contrib_extralibs = undef;
+my @contrib_uselibpq =
+ ('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo', 'libpq_pipeline');
+my @contrib_uselibpgport = ('libpq_pipeline', 'oid2name', 'vacuumlo');
+my @contrib_uselibpgcommon = ('libpq_pipeline', 'oid2name', 'vacuumlo');
+my $contrib_extralibs = { 'libpq_pipeline' => ['ws2_32.lib'] };
my $contrib_extraincludes = { 'dblink' => ['src/backend'] };
my $contrib_extrasource = {
'cube' => [ 'contrib/cube/cubescan.l', 'contrib/cube/cubeparse.y' ],
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 61cf4eae1f..9e6777e9d0 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1563,10 +1563,12 @@ PG_Locale_Strategy
PG_Lock_Status
PG_init_t
PGcancel
+PGcmdQueueEntry
PGconn
PGdataValue
PGlobjfuncs
PGnotify
+PGpipelineStatus
PGresAttDesc
PGresAttValue
PGresParamDesc