diff options
Diffstat (limited to 'src/interfaces/libpq/fe-protocol3.c')
-rw-r--r-- | src/interfaces/libpq/fe-protocol3.c | 77 |
1 files changed, 66 insertions, 11 deletions
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index eb55d528fb..306e89acfd 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -159,6 +159,18 @@ pqParseInput3(PGconn *conn) return; /* + * We're also notionally not-IDLE when in pipeline mode the state + * says "idle" (so we have completed receiving the results of one + * query from the server and dispatched them to the application) + * but another query is queued; yield back control to caller so + * that they can initiate processing of the next query in the + * queue. + */ + if (conn->pipelineStatus != PQ_PIPELINE_OFF && + conn->cmd_queue_head != NULL) + return; + + /* * Unexpected message in IDLE state; need to recover somehow. * ERROR messages are handled using the notice processor; * ParameterStatus is handled normally; anything else is just @@ -179,6 +191,7 @@ pqParseInput3(PGconn *conn) } else { + /* Any other case is unexpected and we summarily skip it */ pqInternalNotice(&conn->noticeHooks, "message type 0x%02x arrived from server while idle", id); @@ -217,10 +230,37 @@ pqParseInput3(PGconn *conn) return; conn->asyncStatus = PGASYNC_READY; break; - case 'Z': /* backend is ready for new query */ + case 'Z': /* sync response, backend is ready for new + * query */ if (getReadyForQuery(conn)) return; - conn->asyncStatus = PGASYNC_IDLE; + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + conn->result = PQmakeEmptyPGresult(conn, + PGRES_PIPELINE_SYNC); + if (!conn->result) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("out of memory")); + pqSaveErrorResult(conn); + } + else + { + conn->pipelineStatus = PQ_PIPELINE_ON; + conn->asyncStatus = PGASYNC_READY; + } + } + else + { + /* + * In simple query protocol, advance the command queue + * (see PQgetResult). + */ + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE) + pqCommandQueueAdvance(conn); + conn->asyncStatus = PGASYNC_IDLE; + } break; case 'I': /* empty query */ if (conn->result == NULL) @@ -238,7 +278,8 @@ pqParseInput3(PGconn *conn) break; case '1': /* Parse Complete */ /* If we're doing PQprepare, we're done; else ignore */ - if (conn->queryclass == PGQUERY_PREPARE) + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_PREPARE) { if (conn->result == NULL) { @@ -285,7 +326,8 @@ pqParseInput3(PGconn *conn) conn->inCursor += msgLength; } else if (conn->result == NULL || - conn->queryclass == PGQUERY_DESCRIBE) + (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)) { /* First 'T' in a query sequence */ if (getRowDescriptions(conn, msgLength)) @@ -316,7 +358,8 @@ pqParseInput3(PGconn *conn) * instead of PGRES_TUPLES_OK. Otherwise we can just * ignore this message. */ - if (conn->queryclass == PGQUERY_DESCRIBE) + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE) { if (conn->result == NULL) { @@ -445,7 +488,7 @@ handleSyncLoss(PGconn *conn, char id, int msgLength) id, msgLength); /* build an error result holding the error message */ pqSaveErrorResult(conn); - conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */ + conn->asyncStatus = PGASYNC_READY; /* drop out of PQgetResult wait loop */ /* flush input data since we're giving up on processing it */ pqDropConnection(conn, true); conn->status = CONNECTION_BAD; /* No more connection to backend */ @@ -471,7 +514,9 @@ getRowDescriptions(PGconn *conn, int msgLength) * PGresult created by getParamDescriptions, and we should fill data into * that. Otherwise, create a new, empty PGresult. */ - if (conn->queryclass == PGQUERY_DESCRIBE) + if (!conn->cmd_queue_head || + (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)) { if (conn->result) result = conn->result; @@ -568,7 +613,9 @@ getRowDescriptions(PGconn *conn, int msgLength) * If we're doing a Describe, we're done, and ready to pass the result * back to the client. */ - if (conn->queryclass == PGQUERY_DESCRIBE) + if ((!conn->cmd_queue_head) || + (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)) { conn->asyncStatus = PGASYNC_READY; return 0; @@ -841,6 +888,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError) PQExpBufferData workBuf; char id; + /* If in pipeline mode, set error indicator for it */ + if (isError && conn->pipelineStatus != PQ_PIPELINE_OFF) + conn->pipelineStatus = PQ_PIPELINE_ABORTED; + /* * If this is an error message, pre-emptively clear any incomplete query * result we may have. We'd just throw it away below anyway, and @@ -897,8 +948,8 @@ pqGetErrorNotice3(PGconn *conn, bool isError) * might need it for an error cursor display, which is only true if there * is a PG_DIAG_STATEMENT_POSITION field. */ - if (have_position && conn->last_query && res) - res->errQuery = pqResultStrdup(res, conn->last_query); + if (have_position && res && conn->cmd_queue_head && conn->cmd_queue_head->query) + res->errQuery = pqResultStrdup(res, conn->cmd_queue_head->query); /* * Now build the "overall" error message for PQresultErrorMessage. @@ -1817,7 +1868,8 @@ pqEndcopy3(PGconn *conn) * If we sent the COPY command in extended-query mode, we must issue a * Sync as well. */ - if (conn->queryclass != PGQUERY_SIMPLE) + if (conn->cmd_queue_head && + conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE) { if (pqPutMsgStart('S', conn) < 0 || pqPutMsgEnd(conn) < 0) @@ -1897,6 +1949,9 @@ pqFunctionCall3(PGconn *conn, Oid fnid, int avail; int i; + /* already validated by PQfn */ + Assert(conn->pipelineStatus == PQ_PIPELINE_OFF); + /* PQfn already validated connection state */ if (pqPutMsgStart('F', conn) < 0 || /* function call msg */ |