summaryrefslogtreecommitdiff
path: root/contrib
diff options
context:
space:
mode:
authorJoe Conway <mail@joeconway.com>2006-09-02 21:11:15 +0000
committerJoe Conway <mail@joeconway.com>2006-09-02 21:11:15 +0000
commit52a3ed9fac32e16f6061cbc49046c0bd97f8f77a (patch)
treee6ad0c82b727568e7a90cc283566fe56606cca92 /contrib
parent1cc9299a7a963582f076dd1184f41c772ab622d9 (diff)
downloadpostgresql-52a3ed9fac32e16f6061cbc49046c0bd97f8f77a.tar.gz
Added async query capability. Original patch by
Kai Londenberg, modified by Joe Conway
Diffstat (limited to 'contrib')
-rw-r--r--contrib/dblink/README.dblink74
-rw-r--r--contrib/dblink/dblink.c404
-rw-r--r--contrib/dblink/dblink.h8
-rw-r--r--contrib/dblink/dblink.sql.in35
-rw-r--r--contrib/dblink/doc/misc93
-rw-r--r--contrib/dblink/doc/query122
-rw-r--r--contrib/dblink/expected/dblink.out87
-rw-r--r--contrib/dblink/sql/dblink.sql29
8 files changed, 686 insertions, 166 deletions
diff --git a/contrib/dblink/README.dblink b/contrib/dblink/README.dblink
index 9e1bdba6cb..c765cf5b97 100644
--- a/contrib/dblink/README.dblink
+++ b/contrib/dblink/README.dblink
@@ -7,6 +7,7 @@
* And contributors:
* Darko Prenosil <Darko.Prenosil@finteh.hr>
* Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
+ * Kai Londenberg (K.Londenberg@librics.de)
*
* Copyright (c) 2001-2006, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
@@ -31,6 +32,9 @@
*/
Release Notes:
+ 27 August 2006
+ - Added async query capability. Original patch by
+ Kai Londenberg (K.Londenberg@librics.de), modified by Joe Conway
Version 0.7 (as of 25 Feb, 2004)
- Added new version of dblink, dblink_exec, dblink_open, dblink_close,
and, dblink_fetch -- allows ERROR on remote side of connection to
@@ -85,75 +89,7 @@ Installation:
psql template1 < dblink.sql
- installs following functions into database template1:
-
- connection
- ------------
- dblink_connect(text) RETURNS text
- - opens an unnamed connection that will persist for duration of
- current backend or until it is disconnected
- dblink_connect(text,text) RETURNS text
- - opens a named connection that will persist for duration of current
- backend or until it is disconnected
- dblink_disconnect() RETURNS text
- - disconnects the unnamed persistent connection
- dblink_disconnect(text) RETURNS text
- - disconnects a named persistent connection
-
- cursor
- ------------
- dblink_open(text,text [, bool fail_on_error]) RETURNS text
- - opens a cursor using unnamed connection already opened with
- dblink_connect() that will persist for duration of current backend
- or until it is closed
- dblink_open(text,text,text [, bool fail_on_error]) RETURNS text
- - opens a cursor using a named connection already opened with
- dblink_connect() that will persist for duration of current backend
- or until it is closed
- dblink_fetch(text, int [, bool fail_on_error]) RETURNS setof record
- - fetches data from an already opened cursor on the unnamed connection
- dblink_fetch(text, text, int [, bool fail_on_error]) RETURNS setof record
- - fetches data from an already opened cursor on a named connection
- dblink_close(text [, bool fail_on_error]) RETURNS text
- - closes a cursor on the unnamed connection
- dblink_close(text,text [, bool fail_on_error]) RETURNS text
- - closes a cursor on a named connection
-
- query
- ------------
- dblink(text,text [, bool fail_on_error]) RETURNS setof record
- - returns a set of results from remote SELECT query; the first argument
- is either a connection string, or the name of an already opened
- persistant connection
- dblink(text [, bool fail_on_error]) RETURNS setof record
- - returns a set of results from remote SELECT query, using the unnamed
- connection already opened with dblink_connect()
-
- execute
- ------------
- dblink_exec(text, text [, bool fail_on_error]) RETURNS text
- - executes an INSERT/UPDATE/DELETE query remotely; the first argument
- is either a connection string, or the name of an already opened
- persistant connection
- dblink_exec(text [, bool fail_on_error]) RETURNS text
- - executes an INSERT/UPDATE/DELETE query remotely, using connection
- already opened with dblink_connect()
-
- misc
- ------------
- dblink_current_query() RETURNS text
- - returns the current query string
- dblink_get_pkey(text) RETURNS setof text
- - returns the field names of a relation's primary key fields
- dblink_build_sql_insert(text,int2vector,int2,_text,_text) RETURNS text
- - builds an insert statement using a local tuple, replacing the
- selection key field values with alternate supplied values
- dblink_build_sql_delete(text,int2vector,int2,_text) RETURNS text
- - builds a delete statement using supplied values for selection
- key field values
- dblink_build_sql_update(text,int2vector,int2,_text,_text) RETURNS text
- - builds an update statement using a local tuple, replacing the
- selection key field values with alternate supplied values
+ installs dblink functions into database template1
Documentation:
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 3405ddeaa1..7a46673b6b 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -8,7 +8,7 @@
* Darko Prenosil <Darko.Prenosil@finteh.hr>
* Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
*
- * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.57 2006/07/11 16:35:31 momjian Exp $
+ * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.58 2006/09/02 21:11:15 joe Exp $
* Copyright (c) 2001-2006, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
@@ -73,6 +73,7 @@ typedef struct remoteConn
/*
* Internal declarations
*/
+static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn * rconn);
@@ -691,6 +692,26 @@ PG_FUNCTION_INFO_V1(dblink_record);
Datum
dblink_record(PG_FUNCTION_ARGS)
{
+ return dblink_record_internal(fcinfo, false, false);
+}
+
+PG_FUNCTION_INFO_V1(dblink_send_query);
+Datum
+dblink_send_query(PG_FUNCTION_ARGS)
+{
+ return dblink_record_internal(fcinfo, true, false);
+}
+
+PG_FUNCTION_INFO_V1(dblink_get_result);
+Datum
+dblink_get_result(PG_FUNCTION_ARGS)
+{
+ return dblink_record_internal(fcinfo, true, true);
+}
+
+static Datum
+dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
+{
FuncCallContext *funcctx;
TupleDesc tupdesc = NULL;
int call_cntr;
@@ -723,128 +744,187 @@ dblink_record(PG_FUNCTION_ARGS)
*/
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
- if (PG_NARGS() == 3)
+ if (!is_async)
{
- /* text,text,bool */
- DBLINK_GET_CONN;
- sql = GET_STR(PG_GETARG_TEXT_P(1));
- fail = PG_GETARG_BOOL(2);
- }
- else if (PG_NARGS() == 2)
- {
- /* text,text or text,bool */
- if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+ if (PG_NARGS() == 3)
{
+ /* text,text,bool */
+ DBLINK_GET_CONN;
+ sql = GET_STR(PG_GETARG_TEXT_P(1));
+ fail = PG_GETARG_BOOL(2);
+ }
+ else if (PG_NARGS() == 2)
+ {
+ /* text,text or text,bool */
+ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+ {
+ conn = pconn->conn;
+ sql = GET_STR(PG_GETARG_TEXT_P(0));
+ fail = PG_GETARG_BOOL(1);
+ }
+ else
+ {
+ DBLINK_GET_CONN;
+ sql = GET_STR(PG_GETARG_TEXT_P(1));
+ }
+ }
+ else if (PG_NARGS() == 1)
+ {
+ /* text */
conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
- fail = PG_GETARG_BOOL(1);
}
else
+ /* shouldn't happen */
+ elog(ERROR, "wrong number of arguments");
+ }
+ else if (is_async && do_get)
+ {
+ /* get async result */
+ if (PG_NARGS() == 2)
{
+ /* text,bool */
DBLINK_GET_CONN;
- sql = GET_STR(PG_GETARG_TEXT_P(1));
+ fail = PG_GETARG_BOOL(2);
}
+ else if (PG_NARGS() == 1)
+ {
+ /* text */
+ DBLINK_GET_CONN;
+ }
+ else
+ /* shouldn't happen */
+ elog(ERROR, "wrong number of arguments");
}
- else if (PG_NARGS() == 1)
+ else
{
- /* text */
- conn = pconn->conn;
- sql = GET_STR(PG_GETARG_TEXT_P(0));
+ /* send async query */
+ if (PG_NARGS() == 2)
+ {
+ DBLINK_GET_CONN;
+ sql = GET_STR(PG_GETARG_TEXT_P(1));
+ }
+ else
+ /* shouldn't happen */
+ elog(ERROR, "wrong number of arguments");
}
- else
- /* shouldn't happen */
- elog(ERROR, "wrong number of arguments");
if (!conn)
DBLINK_CONN_NOT_AVAIL;
- res = PQexec(conn, sql);
- if (!res ||
- (PQresultStatus(res) != PGRES_COMMAND_OK &&
- PQresultStatus(res) != PGRES_TUPLES_OK))
+ if (!is_async || (is_async && do_get))
{
- if (fail)
- DBLINK_RES_ERROR("sql error");
+ /* synchronous query, or async result retrieval */
+ if (!is_async)
+ res = PQexec(conn, sql);
else
{
- DBLINK_RES_ERROR_AS_NOTICE("sql error");
- if (freeconn)
- PQfinish(conn);
- SRF_RETURN_DONE(funcctx);
+ res = PQgetResult(conn);
+ /* NULL means we're all done with the async results */
+ if (!res)
+ SRF_RETURN_DONE(funcctx);
}
- }
- if (PQresultStatus(res) == PGRES_COMMAND_OK)
- {
- is_sql_cmd = true;
-
- /* need a tuple descriptor representing one TEXT column */
- tupdesc = CreateTemplateTupleDesc(1, false);
- TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
- TEXTOID, -1, 0);
-
- /*
- * and save a copy of the command status string to return as our
- * result tuple
- */
- sql_cmd_status = PQcmdStatus(res);
- funcctx->max_calls = 1;
- }
- else
- funcctx->max_calls = PQntuples(res);
-
- /* got results, keep track of them */
- funcctx->user_fctx = res;
-
- /* if needed, close the connection to the database and cleanup */
- if (freeconn)
- PQfinish(conn);
-
- if (!is_sql_cmd)
- {
- /* get a tuple descriptor for our result type */
- switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+ if (!res ||
+ (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK))
{
- case TYPEFUNC_COMPOSITE:
- /* success */
- break;
- case TYPEFUNC_RECORD:
- /* failed to determine actual type of RECORD */
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("function returning record called in context "
- "that cannot accept type record")));
- break;
- default:
- /* result type isn't composite */
- elog(ERROR, "return type must be a row type");
- break;
+ if (fail)
+ DBLINK_RES_ERROR("sql error");
+ else
+ {
+ DBLINK_RES_ERROR_AS_NOTICE("sql error");
+ if (freeconn)
+ PQfinish(conn);
+ SRF_RETURN_DONE(funcctx);
+ }
}
-
- /* make sure we have a persistent copy of the tupdesc */
- tupdesc = CreateTupleDescCopy(tupdesc);
+
+ if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ {
+ is_sql_cmd = true;
+
+ /* need a tuple descriptor representing one TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+ TEXTOID, -1, 0);
+
+ /*
+ * and save a copy of the command status string to return as our
+ * result tuple
+ */
+ sql_cmd_status = PQcmdStatus(res);
+ funcctx->max_calls = 1;
+ }
+ else
+ funcctx->max_calls = PQntuples(res);
+
+ /* got results, keep track of them */
+ funcctx->user_fctx = res;
+
+ /* if needed, close the connection to the database and cleanup */
+ if (freeconn)
+ PQfinish(conn);
+
+ if (!is_sql_cmd)
+ {
+ /* get a tuple descriptor for our result type */
+ switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+ {
+ case TYPEFUNC_COMPOSITE:
+ /* success */
+ break;
+ case TYPEFUNC_RECORD:
+ /* failed to determine actual type of RECORD */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record")));
+ break;
+ default:
+ /* result type isn't composite */
+ elog(ERROR, "return type must be a row type");
+ break;
+ }
+
+ /* make sure we have a persistent copy of the tupdesc */
+ tupdesc = CreateTupleDescCopy(tupdesc);
+ }
+
+ /* check result and tuple descriptor have the same number of columns */
+ if (PQnfields(res) != tupdesc->natts)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+
+ /* fast track when no results */
+ if (funcctx->max_calls < 1)
+ {
+ if (res)
+ PQclear(res);
+ SRF_RETURN_DONE(funcctx);
+ }
+
+ /* store needed metadata for subsequent calls */
+ attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ funcctx->attinmeta = attinmeta;
+
+ MemoryContextSwitchTo(oldcontext);
}
-
- /* check result and tuple descriptor have the same number of columns */
- if (PQnfields(res) != tupdesc->natts)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("remote query result rowtype does not match "
- "the specified FROM clause rowtype")));
-
- /* fast track when no results */
- if (funcctx->max_calls < 1)
+ else
{
- if (res)
- PQclear(res);
- SRF_RETURN_DONE(funcctx);
+ /* async query send */
+ MemoryContextSwitchTo(oldcontext);
+ PG_RETURN_INT32(PQsendQuery(conn, sql));
}
+ }
- /* store needed metadata for subsequent calls */
- attinmeta = TupleDescGetAttInMetadata(tupdesc);
- funcctx->attinmeta = attinmeta;
+ if (is_async && !do_get)
+ {
+ /* async query send -- should not happen */
+ elog(ERROR, "async query send called more than once");
- MemoryContextSwitchTo(oldcontext);
}
/* stuff done on every call of the function */
@@ -903,6 +983,140 @@ dblink_record(PG_FUNCTION_ARGS)
}
/*
+ * List all open dblink connections by name.
+ * Returns an array of all connection names.
+ * Takes no params
+ */
+PG_FUNCTION_INFO_V1(dblink_get_connections);
+Datum
+dblink_get_connections(PG_FUNCTION_ARGS)
+{
+ HASH_SEQ_STATUS status;
+ remoteConnHashEnt *hentry;
+ ArrayBuildState *astate = NULL;
+
+ if (remoteConnHash)
+ {
+ hash_seq_init(&status, remoteConnHash);
+ while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
+ {
+ /* stash away current value */
+ astate = accumArrayResult(astate,
+ PointerGetDatum(GET_TEXT(hentry->name)),
+ false, TEXTOID, CurrentMemoryContext);
+ }
+ }
+
+ if (astate)
+ PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
+ CurrentMemoryContext));
+ else
+ PG_RETURN_NULL();
+}
+
+/*
+ * Checks if a given remote connection is busy
+ *
+ * Returns 1 if the connection is busy, 0 otherwise
+ * Params:
+ * text connection_name - name of the connection to check
+ *
+ */
+PG_FUNCTION_INFO_V1(dblink_is_busy);
+Datum
+dblink_is_busy(PG_FUNCTION_ARGS)
+{
+ char *msg;
+ PGconn *conn = NULL;
+ char *conname = NULL;
+ char *connstr = NULL;
+ remoteConn *rconn = NULL;
+ bool freeconn = false;
+
+ DBLINK_INIT;
+ DBLINK_GET_CONN;
+ if (!conn)
+ DBLINK_CONN_NOT_AVAIL;
+
+ PQconsumeInput(conn);
+ PG_RETURN_INT32(PQisBusy(conn));
+}
+
+/*
+ * Cancels a running request on a connection
+ *
+ * Returns text:
+ * "OK" if the cancel request has been sent correctly,
+ * an error message otherwise
+ *
+ * Params:
+ * text connection_name - name of the connection to check
+ *
+ */
+PG_FUNCTION_INFO_V1(dblink_cancel_query);
+Datum
+dblink_cancel_query(PG_FUNCTION_ARGS)
+{
+ char *msg;
+ int res = 0;
+ PGconn *conn = NULL;
+ char *conname = NULL;
+ char *connstr = NULL;
+ remoteConn *rconn = NULL;
+ bool freeconn = false;
+ PGcancel *cancel;
+ char errbuf[256];
+
+ DBLINK_INIT;
+ DBLINK_GET_CONN;
+ if (!conn)
+ DBLINK_CONN_NOT_AVAIL;
+ cancel = PQgetCancel(conn);
+
+ res = PQcancel(cancel, errbuf, 256);
+ PQfreeCancel(cancel);
+
+ if (res == 0)
+ PG_RETURN_TEXT_P(GET_TEXT("OK"));
+ else
+ PG_RETURN_TEXT_P(GET_TEXT(errbuf));
+}
+
+
+/*
+ * Get error message from a connection
+ *
+ * Returns text:
+ * "OK" if no error, an error message otherwise
+ *
+ * Params:
+ * text connection_name - name of the connection to check
+ *
+ */
+PG_FUNCTION_INFO_V1(dblink_error_message);
+Datum
+dblink_error_message(PG_FUNCTION_ARGS)
+{
+ char *msg;
+ PGconn *conn = NULL;
+ char *conname = NULL;
+ char *connstr = NULL;
+ remoteConn *rconn = NULL;
+ bool freeconn = false;
+
+ DBLINK_INIT;
+ DBLINK_GET_CONN;
+ if (!conn)
+ DBLINK_CONN_NOT_AVAIL;
+
+ msg = PQerrorMessage(conn);
+ if (!msg)
+ PG_RETURN_TEXT_P(GET_TEXT("OK"));
+ else
+ PG_RETURN_TEXT_P(GET_TEXT(msg));
+}
+
+/*
* Execute an SQL non-SELECT command
*/
PG_FUNCTION_INFO_V1(dblink_exec);
diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h
index 969843765f..a479744621 100644
--- a/contrib/dblink/dblink.h
+++ b/contrib/dblink/dblink.h
@@ -8,7 +8,7 @@
* Darko Prenosil <Darko.Prenosil@finteh.hr>
* Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
*
- * $PostgreSQL: pgsql/contrib/dblink/dblink.h,v 1.16 2006/07/10 18:40:16 momjian Exp $
+ * $PostgreSQL: pgsql/contrib/dblink/dblink.h,v 1.17 2006/09/02 21:11:15 joe Exp $
* Copyright (c) 2001-2006, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
@@ -45,6 +45,12 @@ extern Datum dblink_open(PG_FUNCTION_ARGS);
extern Datum dblink_close(PG_FUNCTION_ARGS);
extern Datum dblink_fetch(PG_FUNCTION_ARGS);
extern Datum dblink_record(PG_FUNCTION_ARGS);
+extern Datum dblink_send_query(PG_FUNCTION_ARGS);
+extern Datum dblink_get_result(PG_FUNCTION_ARGS);
+extern Datum dblink_get_connections(PG_FUNCTION_ARGS);
+extern Datum dblink_is_busy(PG_FUNCTION_ARGS);
+extern Datum dblink_cancel_query(PG_FUNCTION_ARGS);
+extern Datum dblink_error_message(PG_FUNCTION_ARGS);
extern Datum dblink_exec(PG_FUNCTION_ARGS);
extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
diff --git a/contrib/dblink/dblink.sql.in b/contrib/dblink/dblink.sql.in
index 7cd705ba54..e99ea05ec7 100644
--- a/contrib/dblink/dblink.sql.in
+++ b/contrib/dblink/dblink.sql.in
@@ -144,3 +144,38 @@ CREATE OR REPLACE FUNCTION dblink_current_query ()
RETURNS text
AS 'MODULE_PATHNAME','dblink_current_query'
LANGUAGE C;
+
+CREATE OR REPLACE FUNCTION dblink_send_query(text, text)
+RETURNS int4
+AS 'MODULE_PATHNAME', 'dblink_send_query'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_is_busy(text)
+RETURNS int4
+AS 'MODULE_PATHNAME', 'dblink_is_busy'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_get_result(text)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'dblink_get_result'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_get_result(text, bool)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'dblink_get_result'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_get_connections()
+RETURNS text[]
+AS 'MODULE_PATHNAME', 'dblink_get_connections'
+LANGUAGE C;
+
+CREATE OR REPLACE FUNCTION dblink_cancel_query(text)
+RETURNS text
+AS 'MODULE_PATHNAME', 'dblink_cancel_query'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_error_message(text)
+RETURNS text
+AS 'MODULE_PATHNAME', 'dblink_error_message'
+LANGUAGE C STRICT;
diff --git a/contrib/dblink/doc/misc b/contrib/dblink/doc/misc
index ae79cf88ba..3834afd872 100644
--- a/contrib/dblink/doc/misc
+++ b/contrib/dblink/doc/misc
@@ -1,4 +1,4 @@
-$PostgreSQL: pgsql/contrib/dblink/doc/misc,v 1.3 2006/03/11 04:38:29 momjian Exp $
+$PostgreSQL: pgsql/contrib/dblink/doc/misc,v 1.4 2006/09/02 21:11:15 joe Exp $
==================================================================
Name
@@ -139,3 +139,94 @@ test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
(1 row)
+
+==================================================================
+Name
+
+dblink_get_connections -- returns a text array of all active named
+ dblink connections
+
+Synopsis
+
+dblink_get_connections() RETURNS text[]
+
+Inputs
+
+ none
+
+Outputs
+
+ Returns text array of all active named dblink connections
+
+Example usage
+
+ SELECT dblink_get_connections();
+
+==================================================================
+Name
+
+dblink_is_busy -- checks to see if named connection is busy
+ with an async query
+
+Synopsis
+
+dblink_is_busy(text connname) RETURNS int
+
+Inputs
+
+ connname
+ The specific connection name to use.
+
+Outputs
+
+ Returns 1 if connection is busy, 0 if it is not busy.
+ If this function returns 0, it is guaranteed that dblink_get_result
+ will not block.
+
+Example usage
+
+ SELECT dblink_is_busy('dtest1');
+
+==================================================================
+Name
+
+dblink_cancel_query -- cancels any active query on the named connection
+
+Synopsis
+
+dblink_cancel_query(text connname) RETURNS text
+
+Inputs
+
+ connname
+ The specific connection name to use.
+
+Outputs
+
+ Returns "OK" on success, or an error message on failure.
+
+Example usage
+
+ SELECT dblink_cancel_query('dtest1');
+
+==================================================================
+Name
+
+dblink_error_message -- gets last error message on the named connection
+
+Synopsis
+
+dblink_error_message(text connname) RETURNS text
+
+Inputs
+
+ connname
+ The specific connection name to use.
+
+Outputs
+
+ Returns last error message.
+
+Example usage
+
+ SELECT dblink_error_message('dtest1');
diff --git a/contrib/dblink/doc/query b/contrib/dblink/doc/query
index cd58a36142..42427b5d5c 100644
--- a/contrib/dblink/doc/query
+++ b/contrib/dblink/doc/query
@@ -118,3 +118,125 @@ Then you can simply write:
select * from myremote_pg_proc where proname like 'bytea%';
+
+==================================================================
+Name
+
+dblink_send_query -- Sends an async query to a remote database
+
+Synopsis
+
+dblink_send_query(text connname, text sql)
+
+Inputs
+
+ connname
+ The specific connection name to use.
+
+ sql
+
+ sql statement that you wish to execute on the remote host
+ e.g. "select * from pg_class"
+
+Outputs
+
+ Returns int. A return value of 1 if the query was successfully dispatched,
+ 0 otherwise. If 1, results must be fetched by dblink_get_result(connname).
+ A running query may be cancelled by dblink_cancel_query(connname).
+
+Example usage
+
+ SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+ SELECT * from
+ dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+
+==================================================================
+Name
+
+dblink_get_result -- Gets an async query result
+
+Synopsis
+
+dblink_get_result(text connname [, bool fail_on_error])
+
+Inputs
+
+ connname
+ The specific connection name to use. An asynchronous query must
+ have already been sent using dblink_send_query()
+
+ fail_on_error
+
+ If true (default when not present) then an ERROR thrown on the remote side
+ of the connection causes an ERROR to also be thrown locally. If false, the
+ remote ERROR is locally treated as a NOTICE, and no rows are returned.
+
+Outputs
+
+ Returns setof record
+
+Notes
+ Blocks until a result gets available.
+
+ This function *must* be called if dblink_send_query returned
+ a 1, even on cancelled queries - otherwise the connection
+ can't be used anymore. It must be called once for each query
+ sent, and one additional time to obtain an empty set result,
+ prior to using the connection again.
+
+Example usage
+
+contrib_regression=# SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+ dblink_connect
+----------------
+ OK
+(1 row)
+
+contrib_regression=# SELECT * from
+contrib_regression-# dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+ t1
+----
+ 1
+(1 row)
+
+contrib_regression=# SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+ f1 | f2 | f3
+----+----+------------
+ 0 | a | {a0,b0,c0}
+ 1 | b | {a1,b1,c1}
+ 2 | c | {a2,b2,c2}
+(3 rows)
+
+contrib_regression=# SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+ f1 | f2 | f3
+----+----+----
+(0 rows)
+
+contrib_regression=# SELECT * from
+ dblink_send_query('dtest1', 'select * from foo where f1 < 3; select * from foo where f1 > 6') as t1;
+ t1
+----
+ 1
+(1 row)
+
+contrib_regression=# SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+ f1 | f2 | f3
+----+----+------------
+ 0 | a | {a0,b0,c0}
+ 1 | b | {a1,b1,c1}
+ 2 | c | {a2,b2,c2}
+(3 rows)
+
+contrib_regression=# SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+ f1 | f2 | f3
+----+----+---------------
+ 7 | h | {a7,b7,c7}
+ 8 | i | {a8,b8,c8}
+ 9 | j | {a9,b9,c9}
+ 10 | k | {a10,b10,c10}
+(4 rows)
+
+contrib_regression=# SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]);
+ f1 | f2 | f3
+----+----+----
+(0 rows)
diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out
index f2e364a942..c98ae5cf6b 100644
--- a/contrib/dblink/expected/dblink.out
+++ b/contrib/dblink/expected/dblink.out
@@ -669,3 +669,90 @@ SELECT dblink_disconnect('myconn');
-- should get 'connection "myconn" not available' error
SELECT dblink_disconnect('myconn');
ERROR: connection "myconn" not available
+-- test asynchronous queries
+SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+ dblink_connect
+----------------
+ OK
+(1 row)
+
+SELECT * from
+ dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+ t1
+----
+ 1
+(1 row)
+
+SELECT dblink_connect('dtest2', 'dbname=contrib_regression');
+ dblink_connect
+----------------
+ OK
+(1 row)
+
+SELECT * from
+ dblink_send_query('dtest2', 'select * from foo where f1 > 2 and f1 < 7') as t1;
+ t1
+----
+ 1
+(1 row)
+
+SELECT dblink_connect('dtest3', 'dbname=contrib_regression');
+ dblink_connect
+----------------
+ OK
+(1 row)
+
+SELECT * from
+ dblink_send_query('dtest3', 'select * from foo where f1 > 6') as t1;
+ t1
+----
+ 1
+(1 row)
+
+CREATE TEMPORARY TABLE result AS
+(SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]))
+UNION
+(SELECT * from dblink_get_result('dtest2') as t2(f1 int, f2 text, f3 text[]))
+UNION
+(SELECT * from dblink_get_result('dtest3') as t3(f1 int, f2 text, f3 text[]))
+ORDER by f1;
+SELECT dblink_get_connections();
+ dblink_get_connections
+------------------------
+ {dtest1,dtest2,dtest3}
+(1 row)
+
+SELECT dblink_disconnect('dtest1');
+ dblink_disconnect
+-------------------
+ OK
+(1 row)
+
+SELECT dblink_disconnect('dtest2');
+ dblink_disconnect
+-------------------
+ OK
+(1 row)
+
+SELECT dblink_disconnect('dtest3');
+ dblink_disconnect
+-------------------
+ OK
+(1 row)
+
+SELECT * from result;
+ f1 | f2 | f3
+----+----+---------------
+ 0 | a | {a0,b0,c0}
+ 1 | b | {a1,b1,c1}
+ 2 | c | {a2,b2,c2}
+ 3 | d | {a3,b3,c3}
+ 4 | e | {a4,b4,c4}
+ 5 | f | {a5,b5,c5}
+ 6 | g | {a6,b6,c6}
+ 7 | h | {a7,b7,c7}
+ 8 | i | {a8,b8,c8}
+ 9 | j | {a9,b9,c9}
+ 10 | k | {a10,b10,c10}
+(11 rows)
+
diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql
index 66e2607cfe..52a3d049b9 100644
--- a/contrib/dblink/sql/dblink.sql
+++ b/contrib/dblink/sql/dblink.sql
@@ -319,3 +319,32 @@ SELECT dblink_disconnect('myconn');
-- close the named persistent connection again
-- should get 'connection "myconn" not available' error
SELECT dblink_disconnect('myconn');
+
+-- test asynchronous queries
+SELECT dblink_connect('dtest1', 'dbname=contrib_regression');
+SELECT * from
+ dblink_send_query('dtest1', 'select * from foo where f1 < 3') as t1;
+
+SELECT dblink_connect('dtest2', 'dbname=contrib_regression');
+SELECT * from
+ dblink_send_query('dtest2', 'select * from foo where f1 > 2 and f1 < 7') as t1;
+
+SELECT dblink_connect('dtest3', 'dbname=contrib_regression');
+SELECT * from
+ dblink_send_query('dtest3', 'select * from foo where f1 > 6') as t1;
+
+CREATE TEMPORARY TABLE result AS
+(SELECT * from dblink_get_result('dtest1') as t1(f1 int, f2 text, f3 text[]))
+UNION
+(SELECT * from dblink_get_result('dtest2') as t2(f1 int, f2 text, f3 text[]))
+UNION
+(SELECT * from dblink_get_result('dtest3') as t3(f1 int, f2 text, f3 text[]))
+ORDER by f1;
+
+SELECT dblink_get_connections();
+
+SELECT dblink_disconnect('dtest1');
+SELECT dblink_disconnect('dtest2');
+SELECT dblink_disconnect('dtest3');
+SELECT * from result;
+