summaryrefslogtreecommitdiff
path: root/src/interfaces/libpq/fe-protocol3.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/interfaces/libpq/fe-protocol3.c')
-rw-r--r--src/interfaces/libpq/fe-protocol3.c77
1 files changed, 66 insertions, 11 deletions
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index eb55d528fb..306e89acfd 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -159,6 +159,18 @@ pqParseInput3(PGconn *conn)
return;
/*
+ * We're also notionally not-IDLE when in pipeline mode the state
+ * says "idle" (so we have completed receiving the results of one
+ * query from the server and dispatched them to the application)
+ * but another query is queued; yield back control to caller so
+ * that they can initiate processing of the next query in the
+ * queue.
+ */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
+ conn->cmd_queue_head != NULL)
+ return;
+
+ /*
* Unexpected message in IDLE state; need to recover somehow.
* ERROR messages are handled using the notice processor;
* ParameterStatus is handled normally; anything else is just
@@ -179,6 +191,7 @@ pqParseInput3(PGconn *conn)
}
else
{
+ /* Any other case is unexpected and we summarily skip it */
pqInternalNotice(&conn->noticeHooks,
"message type 0x%02x arrived from server while idle",
id);
@@ -217,10 +230,37 @@ pqParseInput3(PGconn *conn)
return;
conn->asyncStatus = PGASYNC_READY;
break;
- case 'Z': /* backend is ready for new query */
+ case 'Z': /* sync response, backend is ready for new
+ * query */
if (getReadyForQuery(conn))
return;
- conn->asyncStatus = PGASYNC_IDLE;
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ conn->result = PQmakeEmptyPGresult(conn,
+ PGRES_PIPELINE_SYNC);
+ if (!conn->result)
+ {
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("out of memory"));
+ pqSaveErrorResult(conn);
+ }
+ else
+ {
+ conn->pipelineStatus = PQ_PIPELINE_ON;
+ conn->asyncStatus = PGASYNC_READY;
+ }
+ }
+ else
+ {
+ /*
+ * In simple query protocol, advance the command queue
+ * (see PQgetResult).
+ */
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE)
+ pqCommandQueueAdvance(conn);
+ conn->asyncStatus = PGASYNC_IDLE;
+ }
break;
case 'I': /* empty query */
if (conn->result == NULL)
@@ -238,7 +278,8 @@ pqParseInput3(PGconn *conn)
break;
case '1': /* Parse Complete */
/* If we're doing PQprepare, we're done; else ignore */
- if (conn->queryclass == PGQUERY_PREPARE)
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_PREPARE)
{
if (conn->result == NULL)
{
@@ -285,7 +326,8 @@ pqParseInput3(PGconn *conn)
conn->inCursor += msgLength;
}
else if (conn->result == NULL ||
- conn->queryclass == PGQUERY_DESCRIBE)
+ (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
{
/* First 'T' in a query sequence */
if (getRowDescriptions(conn, msgLength))
@@ -316,7 +358,8 @@ pqParseInput3(PGconn *conn)
* instead of PGRES_TUPLES_OK. Otherwise we can just
* ignore this message.
*/
- if (conn->queryclass == PGQUERY_DESCRIBE)
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)
{
if (conn->result == NULL)
{
@@ -445,7 +488,7 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
id, msgLength);
/* build an error result holding the error message */
pqSaveErrorResult(conn);
- conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */
+ conn->asyncStatus = PGASYNC_READY; /* drop out of PQgetResult wait loop */
/* flush input data since we're giving up on processing it */
pqDropConnection(conn, true);
conn->status = CONNECTION_BAD; /* No more connection to backend */
@@ -471,7 +514,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
* PGresult created by getParamDescriptions, and we should fill data into
* that. Otherwise, create a new, empty PGresult.
*/
- if (conn->queryclass == PGQUERY_DESCRIBE)
+ if (!conn->cmd_queue_head ||
+ (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
{
if (conn->result)
result = conn->result;
@@ -568,7 +613,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
* If we're doing a Describe, we're done, and ready to pass the result
* back to the client.
*/
- if (conn->queryclass == PGQUERY_DESCRIBE)
+ if ((!conn->cmd_queue_head) ||
+ (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
{
conn->asyncStatus = PGASYNC_READY;
return 0;
@@ -841,6 +888,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
PQExpBufferData workBuf;
char id;
+ /* If in pipeline mode, set error indicator for it */
+ if (isError && conn->pipelineStatus != PQ_PIPELINE_OFF)
+ conn->pipelineStatus = PQ_PIPELINE_ABORTED;
+
/*
* If this is an error message, pre-emptively clear any incomplete query
* result we may have. We'd just throw it away below anyway, and
@@ -897,8 +948,8 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
* might need it for an error cursor display, which is only true if there
* is a PG_DIAG_STATEMENT_POSITION field.
*/
- if (have_position && conn->last_query && res)
- res->errQuery = pqResultStrdup(res, conn->last_query);
+ if (have_position && res && conn->cmd_queue_head && conn->cmd_queue_head->query)
+ res->errQuery = pqResultStrdup(res, conn->cmd_queue_head->query);
/*
* Now build the "overall" error message for PQresultErrorMessage.
@@ -1817,7 +1868,8 @@ pqEndcopy3(PGconn *conn)
* If we sent the COPY command in extended-query mode, we must issue a
* Sync as well.
*/
- if (conn->queryclass != PGQUERY_SIMPLE)
+ if (conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
{
if (pqPutMsgStart('S', conn) < 0 ||
pqPutMsgEnd(conn) < 0)
@@ -1897,6 +1949,9 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
int avail;
int i;
+ /* already validated by PQfn */
+ Assert(conn->pipelineStatus == PQ_PIPELINE_OFF);
+
/* PQfn already validated connection state */
if (pqPutMsgStart('F', conn) < 0 || /* function call msg */