diff options
author | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2021-03-15 18:13:42 -0300 |
---|---|---|
committer | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2021-03-15 18:13:42 -0300 |
commit | acb7e4eb6b1c614c68a62fb3a6a5bba1af0a2659 (patch) | |
tree | ff5dccb6a8372d0373a442841d8df4333a234eaa /src/interfaces/libpq/fe-exec.c | |
parent | 146cb3889c3ccb3fce198fe7464a1296a9e107c3 (diff) | |
download | postgresql-acb7e4eb6b1c614c68a62fb3a6a5bba1af0a2659.tar.gz |
Implement pipeline mode in libpq
Pipeline mode in libpq lets an application avoid the Sync messages in
the FE/BE protocol that are implicit in the old libpq API after each
query. The application can then insert Sync at its leisure with a new
libpq function PQpipelineSync. This can lead to substantial reductions
in query latency.
Co-authored-by: Craig Ringer <craig.ringer@enterprisedb.com>
Co-authored-by: Matthieu Garrigues <matthieu.garrigues@gmail.com>
Co-authored-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Aya Iwata <iwata.aya@jp.fujitsu.com>
Reviewed-by: Daniel Vérité <daniel@manitou-mail.org>
Reviewed-by: David G. Johnston <david.g.johnston@gmail.com>
Reviewed-by: Justin Pryzby <pryzby@telsasoft.com>
Reviewed-by: Kirk Jamison <k.jamison@fujitsu.com>
Reviewed-by: Michael Paquier <michael.paquier@gmail.com>
Reviewed-by: Nikhil Sontakke <nikhils@2ndquadrant.com>
Reviewed-by: Vaishnavi Prabakaran <VaishnaviP@fast.au.fujitsu.com>
Reviewed-by: Zhihong Yu <zyu@yugabyte.com>
Discussion: https://postgr.es/m/CAMsr+YFUjJytRyV4J-16bEoiZyH=4nj+sQ7JP9ajwz=B4dMMZw@mail.gmail.com
Discussion: https://postgr.es/m/CAJkzx4T5E-2cQe3dtv2R78dYFvz+in8PY7A8MArvLhs_pg75gg@mail.gmail.com
Diffstat (limited to 'src/interfaces/libpq/fe-exec.c')
-rw-r--r-- | src/interfaces/libpq/fe-exec.c | 717 |
1 files changed, 636 insertions, 81 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 9a038043b2..f3443708a6 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -39,7 +39,9 @@ char *const pgresStatus[] = { "PGRES_NONFATAL_ERROR", "PGRES_FATAL_ERROR", "PGRES_COPY_BOTH", - "PGRES_SINGLE_TUPLE" + "PGRES_SINGLE_TUPLE", + "PGRES_PIPELINE_SYNC", + "PGRES_PIPELINE_ABORTED" }; /* @@ -71,6 +73,8 @@ static PGresult *PQexecFinish(PGconn *conn); static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target); static int check_field_number(const PGresult *res, int field_num); +static void pqPipelineProcessQueue(PGconn *conn); +static int pqPipelineFlush(PGconn *conn); /* ---------------- @@ -1171,7 +1175,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) conn->next_result = conn->result; conn->result = res; /* And mark the result ready to return */ - conn->asyncStatus = PGASYNC_READY; + conn->asyncStatus = PGASYNC_READY_MORE; } return 1; @@ -1185,6 +1189,87 @@ fail: /* + * pqAllocCmdQueueEntry + * Get a command queue entry for caller to fill. + * + * If the recycle queue has a free element, that is returned; if not, a + * fresh one is allocated. Caller is responsible for adding it to the + * command queue (pqAppendCmdQueueEntry) once the struct is filled in, or + * releasing the memory (pqRecycleCmdQueueEntry) if an error occurs. + * + * If allocation fails, sets the error message and returns NULL. + */ +static PGcmdQueueEntry * +pqAllocCmdQueueEntry(PGconn *conn) +{ + PGcmdQueueEntry *entry; + + if (conn->cmd_queue_recycle == NULL) + { + entry = (PGcmdQueueEntry *) malloc(sizeof(PGcmdQueueEntry)); + if (entry == NULL) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("out of memory\n")); + return NULL; + } + } + else + { + entry = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry->next; + } + entry->next = NULL; + entry->query = NULL; + + return entry; +} + +/* + * pqAppendCmdQueueEntry + * Append a caller-allocated command queue entry to the queue. + * + * The query itself must already have been put in the output buffer by the + * caller. + */ +static void +pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry) +{ + Assert(entry->next == NULL); + + if (conn->cmd_queue_head == NULL) + conn->cmd_queue_head = entry; + else + conn->cmd_queue_tail->next = entry; + + conn->cmd_queue_tail = entry; +} + +/* + * pqRecycleCmdQueueEntry + * Push a command queue entry onto the freelist. + */ +static void +pqRecycleCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry) +{ + if (entry == NULL) + return; + + /* recyclable entries should not have a follow-on command */ + Assert(entry->next == NULL); + + if (entry->query) + { + free(entry->query); + entry->query = NULL; + } + + entry->next = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry; +} + + +/* * PQsendQuery * Submit a query, but don't wait for it to finish * @@ -1209,9 +1294,15 @@ PQsendQueryContinue(PGconn *conn, const char *query) static int PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery) { + PGcmdQueueEntry *entry = NULL; + if (!PQsendQueryStart(conn, newQuery)) return 0; + entry = pqAllocCmdQueueEntry(conn); + if (entry == NULL) + return 0; /* error msg already set */ + /* check the argument */ if (!query) { @@ -1220,37 +1311,75 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery) return 0; } - /* construct the outgoing Query message */ - if (pqPutMsgStart('Q', conn) < 0 || - pqPuts(query, conn) < 0 || - pqPutMsgEnd(conn) < 0) + /* Send the query message(s) */ + if (conn->pipelineStatus == PQ_PIPELINE_OFF) { - /* error message should be set up already */ - return 0; - } + /* construct the outgoing Query message */ + if (pqPutMsgStart('Q', conn) < 0 || + pqPuts(query, conn) < 0 || + pqPutMsgEnd(conn) < 0) + { + /* error message should be set up already */ + return 0; + } - /* remember we are using simple query protocol */ - conn->queryclass = PGQUERY_SIMPLE; + /* remember we are using simple query protocol */ + entry->queryclass = PGQUERY_SIMPLE; + /* and remember the query text too, if possible */ + entry->query = strdup(query); + } + else + { + /* + * In pipeline mode we cannot use the simple protocol, so we send + * Parse, Bind, Describe Portal, Execute. + */ + if (pqPutMsgStart('P', conn) < 0 || + pqPuts("", conn) < 0 || + pqPuts(query, conn) < 0 || + pqPutInt(0, 2, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + if (pqPutMsgStart('B', conn) < 0 || + pqPuts("", conn) < 0 || + pqPuts("", conn) < 0 || + pqPutInt(0, 2, conn) < 0 || + pqPutInt(0, 2, conn) < 0 || + pqPutInt(0, 2, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + if (pqPutMsgStart('D', conn) < 0 || + pqPutc('P', conn) < 0 || + pqPuts("", conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + if (pqPutMsgStart('E', conn) < 0 || + pqPuts("", conn) < 0 || + pqPutInt(0, 4, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; - /* and remember the query text too, if possible */ - /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); - conn->last_query = strdup(query); + entry->queryclass = PGQUERY_EXTENDED; + entry->query = strdup(query); + } /* * Give the data a push. In nonblock mode, don't complain if we're unable * to send it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) - { - /* error message should be set up already */ - return 0; - } + if (pqPipelineFlush(conn) < 0) + goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + pqAppendCmdQueueEntry(conn, entry); + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + conn->asyncStatus = PGASYNC_BUSY; return 1; + +sendFailed: + pqRecycleCmdQueueEntry(conn, entry); + /* error message should be set up already */ + return 0; } /* @@ -1307,6 +1436,8 @@ PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes) { + PGcmdQueueEntry *entry = NULL; + if (!PQsendQueryStart(conn, true)) return 0; @@ -1330,6 +1461,10 @@ PQsendPrepare(PGconn *conn, return 0; } + entry = pqAllocCmdQueueEntry(conn); + if (entry == NULL) + return 0; /* error msg already set */ + /* construct the Parse message */ if (pqPutMsgStart('P', conn) < 0 || pqPuts(stmtName, conn) < 0 || @@ -1356,32 +1491,38 @@ PQsendPrepare(PGconn *conn, if (pqPutMsgEnd(conn) < 0) goto sendFailed; - /* construct the Sync message */ - if (pqPutMsgStart('S', conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + /* Add a Sync, unless in pipeline mode. */ + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + { + if (pqPutMsgStart('S', conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are doing just a Parse */ - conn->queryclass = PGQUERY_PREPARE; + entry->queryclass = PGQUERY_PREPARE; /* and remember the query text too, if possible */ - /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); - conn->last_query = strdup(query); + /* if insufficient memory, query just winds up NULL */ + entry->query = strdup(query); + + pqAppendCmdQueueEntry(conn, entry); + + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + conn->asyncStatus = PGASYNC_BUSY; /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in pipeline mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqPipelineFlush(conn) < 0) goto sendFailed; - /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + pqRecycleCmdQueueEntry(conn, entry); /* error message should be set up already */ return 0; } @@ -1429,7 +1570,8 @@ PQsendQueryPrepared(PGconn *conn, } /* - * Common startup code for PQsendQuery and sibling routines + * PQsendQueryStart + * Common startup code for PQsendQuery and sibling routines */ static bool PQsendQueryStart(PGconn *conn, bool newQuery) @@ -1450,20 +1592,57 @@ PQsendQueryStart(PGconn *conn, bool newQuery) libpq_gettext("no connection to the server\n")); return false; } - /* Can't send while already busy, either. */ - if (conn->asyncStatus != PGASYNC_IDLE) + + /* Can't send while already busy, either, unless enqueuing for later */ + if (conn->asyncStatus != PGASYNC_IDLE && + conn->pipelineStatus == PQ_PIPELINE_OFF) { appendPQExpBufferStr(&conn->errorMessage, libpq_gettext("another command is already in progress\n")); return false; } - /* initialize async result-accumulation state */ - pqClearAsyncResult(conn); + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + /* + * When enqueuing commands we don't change much of the connection + * state since it's already in use for the current command. The + * connection state will get updated when pqPipelineProcessQueue() + * advances to start processing the queued message. + * + * Just make sure we can safely enqueue given the current connection + * state. We can enqueue behind another queue item, or behind a + * non-queue command (one that sends its own sync), but we can't + * enqueue if the connection is in a copy state. + */ + switch (conn->asyncStatus) + { + case PGASYNC_IDLE: + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* ok to queue */ + break; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot queue commands during COPY\n")); + return false; + } + } + else + { + /* + * This command's results will come in immediately. Initialize async + * result-accumulation state + */ + pqClearAsyncResult(conn); - /* reset single-row processing mode */ - conn->singleRowMode = false; + /* reset single-row processing mode */ + conn->singleRowMode = false; + } /* ready to send command message */ return true; } @@ -1487,10 +1666,16 @@ PQsendQueryGuts(PGconn *conn, int resultFormat) { int i; + PGcmdQueueEntry *entry; + + entry = pqAllocCmdQueueEntry(conn); + if (entry == NULL) + return 0; /* error msg already set */ /* - * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync, - * using specified statement name and the unnamed portal. + * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync + * (if not in pipeline mode), using specified statement name and the + * unnamed portal. */ if (command) @@ -1600,35 +1785,38 @@ PQsendQueryGuts(PGconn *conn, pqPutMsgEnd(conn) < 0) goto sendFailed; - /* construct the Sync message */ - if (pqPutMsgStart('S', conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + /* construct the Sync message if not in pipeline mode */ + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + { + if (pqPutMsgStart('S', conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are using extended query protocol */ - conn->queryclass = PGQUERY_EXTENDED; + entry->queryclass = PGQUERY_EXTENDED; /* and remember the query text too, if possible */ - /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); + /* if insufficient memory, query just winds up NULL */ if (command) - conn->last_query = strdup(command); - else - conn->last_query = NULL; + entry->query = strdup(command); /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in pipeline mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqPipelineFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + pqAppendCmdQueueEntry(conn, entry); + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + pqRecycleCmdQueueEntry(conn, entry); /* error message should be set up already */ return 0; } @@ -1647,8 +1835,9 @@ PQsetSingleRowMode(PGconn *conn) return 0; if (conn->asyncStatus != PGASYNC_BUSY) return 0; - if (conn->queryclass != PGQUERY_SIMPLE && - conn->queryclass != PGQUERY_EXTENDED) + if (!conn->cmd_queue_head || + (conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE && + conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED)) return 0; if (conn->result) return 0; @@ -1726,14 +1915,17 @@ PQisBusy(PGconn *conn) return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed; } - /* * PQgetResult * Get the next PGresult produced by a query. Returns NULL if no * query work remains or an error has occurred (e.g. out of * memory). + * + * In pipeline mode, once all the result of a query have been returned, + * PQgetResult returns NULL to let the user know that the next + * query is being processed. At the end of the pipeline, returns a + * result with PQresultStatus(result) == PGRES_PIPELINE_SYNC. */ - PGresult * PQgetResult(PGconn *conn) { @@ -1803,8 +1995,62 @@ PQgetResult(PGconn *conn) { case PGASYNC_IDLE: res = NULL; /* query is complete */ + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + /* + * We're about to return the NULL that terminates the round of + * results from the current query; prepare to send the results + * of the next query when we're called next. Also, since this + * is the start of the results of the next query, clear any + * prior error message. + */ + resetPQExpBuffer(&conn->errorMessage); + pqPipelineProcessQueue(conn); + } break; case PGASYNC_READY: + + /* + * For any query type other than simple query protocol, we advance + * the command queue here. This is because for simple query + * protocol we can get the READY state multiple times before the + * command is actually complete, since the command string can + * contain many queries. In simple query protocol, the queue + * advance is done by fe-protocol3 when it receives ReadyForQuery. + */ + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE) + pqCommandQueueAdvance(conn); + res = pqPrepareAsyncResult(conn); + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + /* + * We're about to send the results of the current query. Set + * us idle now, and ... + */ + conn->asyncStatus = PGASYNC_IDLE; + + /* + * ... in cases when we're sending a pipeline-sync result, + * move queue processing forwards immediately, so that next + * time we're called, we're prepared to return the next result + * received from the server. In all other cases, leave the + * queue state change for next time, so that a terminating + * NULL result is sent. + * + * (In other words: we don't return a NULL after a pipeline + * sync.) + */ + if (res && res->resultStatus == PGRES_PIPELINE_SYNC) + pqPipelineProcessQueue(conn); + } + else + { + /* Set the state back to BUSY, allowing parsing to proceed. */ + conn->asyncStatus = PGASYNC_BUSY; + } + break; + case PGASYNC_READY_MORE: res = pqPrepareAsyncResult(conn); /* Set the state back to BUSY, allowing parsing to proceed. */ conn->asyncStatus = PGASYNC_BUSY; @@ -1985,6 +2231,13 @@ PQexecStart(PGconn *conn) if (!conn) return false; + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("synchronous command execution functions are not allowed in pipeline mode\n")); + return false; + } + /* * Since this is the beginning of a query cycle, reset the error buffer. */ @@ -2148,6 +2401,8 @@ PQsendDescribePortal(PGconn *conn, const char *portal) static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) { + PGcmdQueueEntry *entry = NULL; + /* Treat null desc_target as empty string */ if (!desc_target) desc_target = ""; @@ -2155,6 +2410,10 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) if (!PQsendQueryStart(conn, true)) return 0; + entry = pqAllocCmdQueueEntry(conn); + if (entry == NULL) + return 0; /* error msg already set */ + /* construct the Describe message */ if (pqPutMsgStart('D', conn) < 0 || pqPutc(desc_type, conn) < 0 || @@ -2163,32 +2422,32 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) goto sendFailed; /* construct the Sync message */ - if (pqPutMsgStart('S', conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; - - /* remember we are doing a Describe */ - conn->queryclass = PGQUERY_DESCRIBE; - - /* reset last_query string (not relevant now) */ - if (conn->last_query) + if (conn->pipelineStatus == PQ_PIPELINE_OFF) { - free(conn->last_query); - conn->last_query = NULL; + if (pqPutMsgStart('S', conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; } + /* remember we are doing a Describe */ + entry->queryclass = PGQUERY_DESCRIBE; + /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in pipeline mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqPipelineFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + pqAppendCmdQueueEntry(conn, entry); + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + pqRecycleCmdQueueEntry(conn, entry); /* error message should be set up already */ return 0; } @@ -2327,7 +2586,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg) * If we sent the COPY command in extended-query mode, we must issue a * Sync as well. */ - if (conn->queryclass != PGQUERY_SIMPLE) + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE) { if (pqPutMsgStart('S', conn) < 0 || pqPutMsgEnd(conn) < 0) @@ -2541,6 +2801,13 @@ PQfn(PGconn *conn, */ resetPQExpBuffer(&conn->errorMessage); + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("PQfn not allowed in pipeline mode\n")); + return NULL; + } + if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE || conn->result != NULL) { @@ -2555,6 +2822,277 @@ PQfn(PGconn *conn, args, nargs); } +/* ====== Pipeline mode support ======== */ + +/* + * PQenterPipelineMode + * Put an idle connection in pipeline mode. + * + * Returns 1 on success. On failure, errorMessage is set and 0 is returned. + * + * Commands submitted after this can be pipelined on the connection; + * there's no requirement to wait for one to finish before the next is + * dispatched. + * + * Queuing of a new query or syncing during COPY is not allowed. + * + * A set of commands is terminated by a PQpipelineSync. Multiple sync + * points can be established while in pipeline mode. Pipeline mode can + * be exited by calling PQexitPipelineMode() once all results are processed. + * + * This doesn't actually send anything on the wire, it just puts libpq + * into a state where it can pipeline work. + */ +int +PQenterPipelineMode(PGconn *conn) +{ + if (!conn) + return 0; + + /* succeed with no action if already in pipeline mode */ + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + return 1; + + if (conn->asyncStatus != PGASYNC_IDLE) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot enter pipeline mode, connection not idle\n")); + return 0; + } + + conn->pipelineStatus = PQ_PIPELINE_ON; + + return 1; +} + +/* + * PQexitPipelineMode + * End pipeline mode and return to normal command mode. + * + * Returns 1 in success (pipeline mode successfully ended, or not in pipeline + * mode). + * + * Returns 0 if in pipeline mode and cannot be ended yet. Error message will + * be set. + */ +int +PQexitPipelineMode(PGconn *conn) +{ + if (!conn) + return 0; + + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + return 1; + + switch (conn->asyncStatus) + { + case PGASYNC_READY: + case PGASYNC_READY_MORE: + /* there are some uncollected results */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot exit pipeline mode with uncollected results\n")); + return 0; + + case PGASYNC_BUSY: + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot exit pipeline mode while busy\n")); + return 0; + + default: + /* OK */ + break; + } + + /* still work to process */ + if (conn->cmd_queue_head != NULL) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot exit pipeline mode with uncollected results\n")); + return 0; + } + + conn->pipelineStatus = PQ_PIPELINE_OFF; + conn->asyncStatus = PGASYNC_IDLE; + + /* Flush any pending data in out buffer */ + if (pqFlush(conn) < 0) + return 0; /* error message is setup already */ + return 1; +} + +/* + * pqCommandQueueAdvance + * Remove one query from the command queue, when we receive + * all results from the server that pertain to it. + */ +void +pqCommandQueueAdvance(PGconn *conn) +{ + PGcmdQueueEntry *prevquery; + + if (conn->cmd_queue_head == NULL) + return; + + /* delink from queue */ + prevquery = conn->cmd_queue_head; + conn->cmd_queue_head = conn->cmd_queue_head->next; + + /* and make it recyclable */ + prevquery->next = NULL; + pqRecycleCmdQueueEntry(conn, prevquery); +} + +/* + * pqPipelineProcessQueue: subroutine for PQgetResult + * In pipeline mode, start processing the results of the next query in the queue. + */ +void +pqPipelineProcessQueue(PGconn *conn) +{ + switch (conn->asyncStatus) + { + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* client still has to process current query or results */ + return; + case PGASYNC_IDLE: + /* next query please */ + break; + } + + /* Nothing to do if not in pipeline mode, or queue is empty */ + if (conn->pipelineStatus == PQ_PIPELINE_OFF || + conn->cmd_queue_head == NULL) + return; + + /* Initialize async result-accumulation state */ + pqClearAsyncResult(conn); + + /* + * Reset single-row processing mode. (Client has to set it up for each + * query, if desired.) + */ + conn->singleRowMode = false; + + if (conn->pipelineStatus == PQ_PIPELINE_ABORTED && + conn->cmd_queue_head->queryclass != PGQUERY_SYNC) + { + /* + * In an aborted pipeline we don't get anything from the server for + * each result; we're just discarding commands from the queue until we + * get to the next sync from the server. + * + * The PGRES_PIPELINE_ABORTED results tell the client that its queries + * got aborted. + */ + conn->result = PQmakeEmptyPGresult(conn, PGRES_PIPELINE_ABORTED); + if (!conn->result) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("out of memory\n")); + pqSaveErrorResult(conn); + return; + } + conn->asyncStatus = PGASYNC_READY; + } + else + { + /* allow parsing to continue */ + conn->asyncStatus = PGASYNC_BUSY; + } +} + +/* + * PQpipelineSync + * Send a Sync message as part of a pipeline, and flush to server + * + * It's legal to start submitting more commands in the pipeline immediately, + * without waiting for the results of the current pipeline. There's no need to + * end pipeline mode and start it again. + * + * If a command in a pipeline fails, every subsequent command up to and including + * the result to the Sync message sent by PQpipelineSync gets set to + * PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without + * error, a PGresult with PGRES_PIPELINE_SYNC is produced. + * + * Queries can already have been sent before PQpipelineSync is called, but + * PQpipelineSync need to be called before retrieving command results. + * + * The connection will remain in pipeline mode and unavailable for new + * synchronous command execution functions until all results from the pipeline + * are processed by the client. + */ +int +PQpipelineSync(PGconn *conn) +{ + PGcmdQueueEntry *entry; + + if (!conn) + return 0; + + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot send pipeline when not in pipeline mode\n")); + return 0; + } + + switch (conn->asyncStatus) + { + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + /* should be unreachable */ + appendPQExpBufferStr(&conn->errorMessage, + "internal error: cannot send pipeline while in COPY\n"); + return 0; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + case PGASYNC_IDLE: + /* OK to send sync */ + break; + } + + entry = pqAllocCmdQueueEntry(conn); + if (entry == NULL) + return 0; /* error msg already set */ + + entry->queryclass = PGQUERY_SYNC; + entry->query = NULL; + + /* construct the Sync message */ + if (pqPutMsgStart('S', conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + pqAppendCmdQueueEntry(conn, entry); + + /* + * Give the data a push. In nonblock mode, don't complain if we're unable + * to send it all; PQgetResult() will do any additional flushing needed. + */ + if (PQflush(conn) < 0) + goto sendFailed; + + /* + * Call pqPipelineProcessQueue so the user can call start calling + * PQgetResult. + */ + pqPipelineProcessQueue(conn); + + return 1; + +sendFailed: + pqRecycleCmdQueueEntry(conn, entry); + /* error message should be set up already */ + return 0; +} + /* ====== accessor funcs for PGresult ======== */ @@ -2569,7 +3107,7 @@ PQresultStatus(const PGresult *res) char * PQresStatus(ExecStatusType status) { - if ((unsigned int) status >= sizeof pgresStatus / sizeof pgresStatus[0]) + if ((unsigned int) status >= lengthof(pgresStatus)) return libpq_gettext("invalid ExecStatusType code"); return pgresStatus[status]; } @@ -3152,6 +3690,23 @@ PQflush(PGconn *conn) return pqFlush(conn); } +/* + * pqPipelineFlush + * + * In pipeline mode, data will be flushed only when the out buffer reaches the + * threshold value. In non-pipeline mode, it behaves as stock pqFlush. + * + * Returns 0 on success. + */ +static int +pqPipelineFlush(PGconn *conn) +{ + if ((conn->pipelineStatus != PQ_PIPELINE_ON) || + (conn->outCount >= OUTBUFFER_THRESHOLD)) + return pqFlush(conn); + return 0; +} + /* * PQfreemem - safely frees memory allocated |