summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2010-02-18 01:29:10 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2010-02-18 01:29:10 +0000
commitc0d5be5d6a736d2ee8141e920bc3de8e001bf6d9 (patch)
treef46f456a6b66217164dae1cbc36f975bd9679a00
parent2b44d74dd4caa0d5cec2aeb0ceec7923b69109d3 (diff)
downloadpostgresql-c0d5be5d6a736d2ee8141e920bc3de8e001bf6d9.tar.gz
Fix up pg_dump's treatment of large object ownership and ACLs. We now emit
a separate archive entry for each BLOB, and use pg_dump's standard methods for dealing with its ownership, ACL if any, and comment if any. This means that switches like --no-owner and --no-privileges do what they're supposed to. Preliminary testing says that performance is still reasonable even with many blobs, though we'll have to see how that shakes out in the field. KaiGai Kohei, revised by me
-rw-r--r--src/bin/pg_dump/dumputils.c5
-rw-r--r--src/bin/pg_dump/pg_backup_archiver.c94
-rw-r--r--src/bin/pg_dump/pg_backup_archiver.h13
-rw-r--r--src/bin/pg_dump/pg_backup_db.c24
-rw-r--r--src/bin/pg_dump/pg_backup_null.c15
-rw-r--r--src/bin/pg_dump/pg_dump.c344
-rw-r--r--src/bin/pg_dump/pg_dump.h13
-rw-r--r--src/bin/pg_dump/pg_dump_sort.c24
8 files changed, 284 insertions, 248 deletions
diff --git a/src/bin/pg_dump/dumputils.c b/src/bin/pg_dump/dumputils.c
index e448e388d4..3452944a42 100644
--- a/src/bin/pg_dump/dumputils.c
+++ b/src/bin/pg_dump/dumputils.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/bin/pg_dump/dumputils.c,v 1.53 2010/01/02 16:57:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/dumputils.c,v 1.54 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -486,7 +486,8 @@ parsePGArray(const char *atext, char ***itemarray, int *nitems)
* name: the object name, in the form to use in the commands (already quoted)
* subname: the sub-object name, if any (already quoted); NULL if none
* type: the object type (as seen in GRANT command: must be one of
- * TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, or TABLESPACE)
+ * TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, TABLESPACE,
+ * FOREIGN DATA WRAPPER, SERVER, or LARGE OBJECT)
* acls: the ACL string fetched from the database
* owner: username of object owner (will be passed through fmtId); can be
* NULL or empty string to indicate "no owner known"
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 4357d11971..f227ed1590 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -15,7 +15,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.178 2010/01/19 18:39:19 tgl Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.179 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -98,6 +98,7 @@ static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls);
+static bool _tocEntryIsACL(TocEntry *te);
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
@@ -329,9 +330,9 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
AH->currentTE = te;
reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ );
- if (((reqs & REQ_SCHEMA) != 0) && te->dropStmt)
+ /* We want anything that's selected and has a dropStmt */
+ if (((reqs & (REQ_SCHEMA|REQ_DATA)) != 0) && te->dropStmt)
{
- /* We want the schema */
ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
/* Select owner and schema as necessary */
_becomeOwner(AH, te);
@@ -381,7 +382,8 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
/* Work out what, if anything, we want from this entry */
reqs = _tocEntryRequired(te, ropt, true);
- if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
+ /* Both schema and data objects might now have ownership/ACLs */
+ if ((reqs & (REQ_SCHEMA|REQ_DATA)) != 0)
{
ahlog(AH, 1, "setting owner and privileges for %s %s\n",
te->desc, te->tag);
@@ -905,6 +907,7 @@ EndRestoreBlobs(ArchiveHandle *AH)
void
StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
{
+ bool old_blob_style = (AH->version < K_VERS_1_12);
Oid loOid;
AH->blobCount++;
@@ -914,24 +917,32 @@ StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
ahlog(AH, 2, "restoring large object with OID %u\n", oid);
- if (drop)
+ /* With an old archive we must do drop and create logic here */
+ if (old_blob_style && drop)
DropBlobIfExists(AH, oid);
if (AH->connection)
{
- loOid = lo_create(AH->connection, oid);
- if (loOid == 0 || loOid != oid)
- die_horribly(AH, modulename, "could not create large object %u\n",
- oid);
-
+ if (old_blob_style)
+ {
+ loOid = lo_create(AH->connection, oid);
+ if (loOid == 0 || loOid != oid)
+ die_horribly(AH, modulename, "could not create large object %u\n",
+ oid);
+ }
AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
if (AH->loFd == -1)
- die_horribly(AH, modulename, "could not open large object\n");
+ die_horribly(AH, modulename, "could not open large object %u\n",
+ oid);
}
else
{
- ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
- oid, INV_WRITE);
+ if (old_blob_style)
+ ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
+ oid, INV_WRITE);
+ else
+ ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
+ oid, INV_WRITE);
}
AH->writingBlob = 1;
@@ -1829,6 +1840,9 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
AH->vmin = K_VERS_MINOR;
AH->vrev = K_VERS_REV;
+ /* Make a convenient integer <maj><min><rev>00 */
+ AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
+
/* initialize for backwards compatible string processing */
AH->public.encoding = 0; /* PG_SQL_ASCII */
AH->public.std_strings = false;
@@ -2068,12 +2082,13 @@ ReadToc(ArchiveHandle *AH)
else
{
/*
- * rules for pre-8.4 archives wherein pg_dump hasn't classified
- * the entries into sections
+ * Rules for pre-8.4 archives wherein pg_dump hasn't classified
+ * the entries into sections. This list need not cover entry
+ * types added later than 8.4.
*/
if (strcmp(te->desc, "COMMENT") == 0 ||
strcmp(te->desc, "ACL") == 0 ||
- strcmp(te->desc, "DEFAULT ACL") == 0)
+ strcmp(te->desc, "ACL LANGUAGE") == 0)
te->section = SECTION_NONE;
else if (strcmp(te->desc, "TABLE DATA") == 0 ||
strcmp(te->desc, "BLOBS") == 0 ||
@@ -2228,10 +2243,10 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
return 0;
/* If it's an ACL, maybe ignore it */
- if ((!include_acls || ropt->aclsSkip) &&
- (strcmp(te->desc, "ACL") == 0 || strcmp(te->desc, "DEFAULT ACL") == 0))
+ if ((!include_acls || ropt->aclsSkip) && _tocEntryIsACL(te))
return 0;
+ /* Ignore DATABASE entry unless we should create it */
if (!ropt->create && strcmp(te->desc, "DATABASE") == 0)
return 0;
@@ -2286,9 +2301,18 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
if (!te->hadDumper)
{
/*
- * Special Case: If 'SEQUENCE SET' then it is considered a data entry
+ * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs,
+ * then it is considered a data entry. We don't need to check for
+ * the BLOBS entry or old-style BLOB COMMENTS, because they will
+ * have hadDumper = true ... but we do need to check new-style
+ * BLOB comments.
*/
- if (strcmp(te->desc, "SEQUENCE SET") == 0)
+ if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
+ strcmp(te->desc, "BLOB") == 0 ||
+ (strcmp(te->desc, "ACL") == 0 &&
+ strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+ (strcmp(te->desc, "COMMENT") == 0 &&
+ strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
res = res & REQ_DATA;
else
res = res & ~REQ_DATA;
@@ -2321,6 +2345,20 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
}
/*
+ * Identify TOC entries that are ACLs.
+ */
+static bool
+_tocEntryIsACL(TocEntry *te)
+{
+ /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
+ if (strcmp(te->desc, "ACL") == 0 ||
+ strcmp(te->desc, "ACL LANGUAGE") == 0 ||
+ strcmp(te->desc, "DEFAULT ACL") == 0)
+ return true;
+ return false;
+}
+
+/*
* Issue SET commands for parameters that we want to have set the same way
* at all times during execution of a restore script.
*/
@@ -2685,6 +2723,13 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
return;
}
+ /* BLOBs just have a name, but it's numeric so must not use fmtId */
+ if (strcmp(type, "BLOB") == 0)
+ {
+ appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
+ return;
+ }
+
/*
* These object types require additional decoration. Fortunately, the
* information needed is exactly what's in the DROP command.
@@ -2723,14 +2768,12 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
/* ACLs are dumped only during acl pass */
if (acl_pass)
{
- if (!(strcmp(te->desc, "ACL") == 0 ||
- strcmp(te->desc, "DEFAULT ACL") == 0))
+ if (!_tocEntryIsACL(te))
return;
}
else
{
- if (strcmp(te->desc, "ACL") == 0 ||
- strcmp(te->desc, "DEFAULT ACL") == 0)
+ if (_tocEntryIsACL(te))
return;
}
@@ -2824,6 +2867,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
{
if (strcmp(te->desc, "AGGREGATE") == 0 ||
+ strcmp(te->desc, "BLOB") == 0 ||
strcmp(te->desc, "CONVERSION") == 0 ||
strcmp(te->desc, "DATABASE") == 0 ||
strcmp(te->desc, "DOMAIN") == 0 ||
@@ -2873,7 +2917,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
* If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
* commands, so we can no longer assume we know the current auth setting.
*/
- if (strncmp(te->desc, "ACL", 3) == 0)
+ if (acl_pass)
{
if (AH->currUser)
free(AH->currUser);
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index b3eb8b7b10..dc2ccb5e95 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -17,7 +17,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.83 2009/12/14 00:39:11 itagaki Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.84 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -61,16 +61,16 @@ typedef struct _z_stream
typedef z_stream *z_streamp;
#endif
+/* Current archive version number (the format we can output) */
#define K_VERS_MAJOR 1
-#define K_VERS_MINOR 11
+#define K_VERS_MINOR 12
#define K_VERS_REV 0
/* Data block types */
#define BLK_DATA 1
-#define BLK_BLOB 2
#define BLK_BLOBS 3
-/* Some important version numbers (checked in code) */
+/* Historical version numbers (checked in code) */
#define K_VERS_1_0 (( (1 * 256 + 0) * 256 + 0) * 256 + 0)
#define K_VERS_1_2 (( (1 * 256 + 2) * 256 + 0) * 256 + 0) /* Allow No ZLIB */
#define K_VERS_1_3 (( (1 * 256 + 3) * 256 + 0) * 256 + 0) /* BLOBs */
@@ -87,8 +87,11 @@ typedef z_stream *z_streamp;
#define K_VERS_1_10 (( (1 * 256 + 10) * 256 + 0) * 256 + 0) /* add tablespace */
#define K_VERS_1_11 (( (1 * 256 + 11) * 256 + 0) * 256 + 0) /* add toc section
* indicator */
+#define K_VERS_1_12 (( (1 * 256 + 12) * 256 + 0) * 256 + 0) /* add separate BLOB
+ * entries */
-#define K_VERS_MAX (( (1 * 256 + 11) * 256 + 255) * 256 + 0)
+/* Newest format we can read */
+#define K_VERS_MAX (( (1 * 256 + 12) * 256 + 255) * 256 + 0)
/* Flags to indicate disposition of offsets stored in files */
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index 53711de119..1a9ddc9580 100644
--- a/src/bin/pg_dump/pg_backup_db.c
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -5,7 +5,7 @@
* Implements the basic DB functions used by the archiver.
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.87 2010/02/17 04:19:40 tgl Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.88 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -703,16 +703,26 @@ CommitTransaction(ArchiveHandle *AH)
void
DropBlobIfExists(ArchiveHandle *AH, Oid oid)
{
- /* Call lo_unlink only if exists to avoid not-found error. */
- if (PQserverVersion(AH->connection) >= 90000)
+ /*
+ * If we are not restoring to a direct database connection, we have to
+ * guess about how to detect whether the blob exists. Assume new-style.
+ */
+ if (AH->connection == NULL ||
+ PQserverVersion(AH->connection) >= 90000)
{
- ahprintf(AH, "SELECT pg_catalog.lo_unlink(oid) "
- "FROM pg_catalog.pg_largeobject_metadata "
- "WHERE oid = %u;\n", oid);
+ ahprintf(AH,
+ "SELECT pg_catalog.lo_unlink(oid) "
+ "FROM pg_catalog.pg_largeobject_metadata "
+ "WHERE oid = '%u';\n",
+ oid);
}
else
{
- ahprintf(AH, "SELECT CASE WHEN EXISTS(SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u') THEN pg_catalog.lo_unlink('%u') END;\n",
+ /* Restoring to pre-9.0 server, so do it the old way */
+ ahprintf(AH,
+ "SELECT CASE WHEN EXISTS("
+ "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
+ ") THEN pg_catalog.lo_unlink('%u') END;\n",
oid, oid);
}
}
diff --git a/src/bin/pg_dump/pg_backup_null.c b/src/bin/pg_dump/pg_backup_null.c
index 127ff57219..35b9a18a4e 100644
--- a/src/bin/pg_dump/pg_backup_null.c
+++ b/src/bin/pg_dump/pg_backup_null.c
@@ -17,7 +17,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_null.c,v 1.23 2009/12/14 00:39:11 itagaki Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_null.c,v 1.24 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -147,14 +147,21 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te)
static void
_StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
{
+ bool old_blob_style = (AH->version < K_VERS_1_12);
+
if (oid == 0)
die_horribly(AH, NULL, "invalid OID for large object\n");
- if (AH->ropt->dropSchema)
+ /* With an old archive we must do drop and create logic here */
+ if (old_blob_style && AH->ropt->dropSchema)
DropBlobIfExists(AH, oid);
- ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
- oid, INV_WRITE);
+ if (old_blob_style)
+ ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
+ oid, INV_WRITE);
+ else
+ ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
+ oid, INV_WRITE);
AH->WriteDataPtr = _WriteBlobData;
}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index ace4ffe34b..84d12b118f 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -12,7 +12,7 @@
* by PostgreSQL
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.571 2010/02/17 04:19:40 tgl Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.572 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -190,9 +190,9 @@ static void selectSourceSchema(const char *schemaName);
static char *getFormattedTypeName(Oid oid, OidOptions opts);
static char *myFormatType(const char *typname, int32 typmod);
static const char *fmtQualifiedId(const char *schema, const char *id);
-static bool hasBlobs(Archive *AH);
+static void getBlobs(Archive *AH);
+static void dumpBlob(Archive *AH, BlobInfo *binfo);
static int dumpBlobs(Archive *AH, void *arg);
-static int dumpBlobComments(Archive *AH, void *arg);
static void dumpDatabase(Archive *AH);
static void dumpEncoding(Archive *AH);
static void dumpStdStrings(Archive *AH);
@@ -701,25 +701,8 @@ main(int argc, char **argv)
getTableDataFKConstraints();
}
- if (outputBlobs && hasBlobs(g_fout))
- {
- /* Add placeholders to allow correct sorting of blobs */
- DumpableObject *blobobj;
- DumpableObject *blobcobj;
-
- blobobj = (DumpableObject *) malloc(sizeof(DumpableObject));
- blobobj->objType = DO_BLOBS;
- blobobj->catId = nilCatalogId;
- AssignDumpId(blobobj);
- blobobj->name = strdup("BLOBS");
-
- blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject));
- blobcobj->objType = DO_BLOB_COMMENTS;
- blobcobj->catId = nilCatalogId;
- AssignDumpId(blobcobj);
- blobcobj->name = strdup("BLOB COMMENTS");
- addObjectDependency(blobcobj, blobobj->dumpId);
- }
+ if (outputBlobs)
+ getBlobs(g_fout);
/*
* Collect dependency data to assist in ordering the objects.
@@ -1808,7 +1791,7 @@ dumpDatabase(Archive *AH)
appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid\n"
"FROM pg_catalog.pg_class\n"
- "WHERE oid = %d;\n",
+ "WHERE oid = %u;\n",
LargeObjectRelationId);
lo_res = PQexec(g_conn, loFrozenQry->data);
@@ -1825,7 +1808,7 @@ dumpDatabase(Archive *AH)
appendPQExpBuffer(loOutQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid.\n");
appendPQExpBuffer(loOutQry, "UPDATE pg_catalog.pg_class\n"
"SET relfrozenxid = '%u'\n"
- "WHERE oid = %d;\n",
+ "WHERE oid = %u;\n",
atoi(PQgetvalue(lo_res, 0, i_relfrozenxid)),
LargeObjectRelationId);
ArchiveEntry(AH, nilCatalogId, createDumpId(),
@@ -1938,40 +1921,135 @@ dumpStdStrings(Archive *AH)
/*
- * hasBlobs:
- * Test whether database contains any large objects
+ * getBlobs:
+ * Collect schema-level data about large objects
*/
-static bool
-hasBlobs(Archive *AH)
+static void
+getBlobs(Archive *AH)
{
- bool result;
- const char *blobQry;
- PGresult *res;
+ PQExpBuffer blobQry = createPQExpBuffer();
+ BlobInfo *binfo;
+ DumpableObject *bdata;
+ PGresult *res;
+ int ntups;
+ int i;
+
+ /* Verbose message */
+ if (g_verbose)
+ write_msg(NULL, "reading large objects\n");
/* Make sure we are in proper schema */
selectSourceSchema("pg_catalog");
- /* Check for BLOB OIDs */
+ /* Fetch BLOB OIDs, and owner/ACL data if >= 9.0 */
if (AH->remoteVersion >= 90000)
- blobQry = "SELECT oid FROM pg_largeobject_metadata LIMIT 1";
+ appendPQExpBuffer(blobQry,
+ "SELECT oid, (%s lomowner) AS rolname, lomacl"
+ " FROM pg_largeobject_metadata",
+ username_subquery);
else if (AH->remoteVersion >= 70100)
- blobQry = "SELECT loid FROM pg_largeobject LIMIT 1";
+ appendPQExpBuffer(blobQry,
+ "SELECT DISTINCT loid, NULL::oid, NULL::oid"
+ " FROM pg_largeobject");
else
- blobQry = "SELECT oid FROM pg_class WHERE relkind = 'l' LIMIT 1";
+ appendPQExpBuffer(blobQry,
+ "SELECT oid, NULL::oid, NULL::oid"
+ " FROM pg_class WHERE relkind = 'l'");
- res = PQexec(g_conn, blobQry);
- check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK);
+ res = PQexec(g_conn, blobQry->data);
+ check_sql_result(res, g_conn, blobQry->data, PGRES_TUPLES_OK);
+
+ ntups = PQntuples(res);
+ if (ntups > 0)
+ {
+ /*
+ * Each large object has its own BLOB archive entry.
+ */
+ binfo = (BlobInfo *) malloc(ntups * sizeof(BlobInfo));
- result = PQntuples(res) > 0;
+ for (i = 0; i < ntups; i++)
+ {
+ binfo[i].dobj.objType = DO_BLOB;
+ binfo[i].dobj.catId.tableoid = LargeObjectRelationId;
+ binfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, 0));
+ AssignDumpId(&binfo[i].dobj);
+
+ binfo[i].dobj.name = strdup(PQgetvalue(res, i, 0));
+ if (!PQgetisnull(res, i, 1))
+ binfo[i].rolname = strdup(PQgetvalue(res, i, 1));
+ else
+ binfo[i].rolname = "";
+ if (!PQgetisnull(res, i, 2))
+ binfo[i].blobacl = strdup(PQgetvalue(res, i, 2));
+ else
+ binfo[i].blobacl = NULL;
+ }
+
+ /*
+ * If we have any large objects, a "BLOBS" archive entry is needed.
+ * This is just a placeholder for sorting; it carries no data now.
+ */
+ bdata = (DumpableObject *) malloc(sizeof(DumpableObject));
+ bdata->objType = DO_BLOB_DATA;
+ bdata->catId = nilCatalogId;
+ AssignDumpId(bdata);
+ bdata->name = strdup("BLOBS");
+ }
PQclear(res);
+ destroyPQExpBuffer(blobQry);
+}
- return result;
+/*
+ * dumpBlob
+ *
+ * dump the definition (metadata) of the given large object
+ */
+static void
+dumpBlob(Archive *AH, BlobInfo *binfo)
+{
+ PQExpBuffer cquery = createPQExpBuffer();
+ PQExpBuffer dquery = createPQExpBuffer();
+
+ appendPQExpBuffer(cquery,
+ "SELECT pg_catalog.lo_create('%s');\n",
+ binfo->dobj.name);
+
+ appendPQExpBuffer(dquery,
+ "SELECT pg_catalog.lo_unlink('%s');\n",
+ binfo->dobj.name);
+
+ ArchiveEntry(AH, binfo->dobj.catId, binfo->dobj.dumpId,
+ binfo->dobj.name,
+ NULL, NULL,
+ binfo->rolname, false,
+ "BLOB", SECTION_PRE_DATA,
+ cquery->data, dquery->data, NULL,
+ binfo->dobj.dependencies, binfo->dobj.nDeps,
+ NULL, NULL);
+
+ /* set up tag for comment and/or ACL */
+ resetPQExpBuffer(cquery);
+ appendPQExpBuffer(cquery, "LARGE OBJECT %s", binfo->dobj.name);
+
+ /* Dump comment if any */
+ dumpComment(AH, cquery->data,
+ NULL, binfo->rolname,
+ binfo->dobj.catId, 0, binfo->dobj.dumpId);
+
+ /* Dump ACL if any */
+ if (binfo->blobacl)
+ dumpACL(AH, binfo->dobj.catId, binfo->dobj.dumpId, "LARGE OBJECT",
+ binfo->dobj.name, NULL, cquery->data,
+ NULL, binfo->rolname, binfo->blobacl);
+
+ destroyPQExpBuffer(cquery);
+ destroyPQExpBuffer(dquery);
}
/*
* dumpBlobs:
- * dump all blobs
+ * dump the data contents of all large objects
*/
static int
dumpBlobs(Archive *AH, void *arg)
@@ -1980,6 +2058,7 @@ dumpBlobs(Archive *AH, void *arg)
const char *blobFetchQry;
PGresult *res;
char buf[LOBBUFSIZE];
+ int ntups;
int i;
int cnt;
@@ -1989,7 +2068,10 @@ dumpBlobs(Archive *AH, void *arg)
/* Make sure we are in proper schema */
selectSourceSchema("pg_catalog");
- /* Cursor to get all BLOB OIDs */
+ /*
+ * Currently, we re-fetch all BLOB OIDs using a cursor. Consider
+ * scanning the already-in-memory dumpable objects instead...
+ */
if (AH->remoteVersion >= 90000)
blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata";
else if (AH->remoteVersion >= 70100)
@@ -2012,7 +2094,8 @@ dumpBlobs(Archive *AH, void *arg)
check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
/* Process the tuples, if any */
- for (i = 0; i < PQntuples(res); i++)
+ ntups = PQntuples(res);
+ for (i = 0; i < ntups; i++)
{
Oid blobOid;
int loFd;
@@ -2022,8 +2105,8 @@ dumpBlobs(Archive *AH, void *arg)
loFd = lo_open(g_conn, blobOid, INV_READ);
if (loFd == -1)
{
- write_msg(NULL, "dumpBlobs(): could not open large object: %s",
- PQerrorMessage(g_conn));
+ write_msg(NULL, "dumpBlobs(): could not open large object %u: %s",
+ blobOid, PQerrorMessage(g_conn));
exit_nicely();
}
@@ -2035,8 +2118,8 @@ dumpBlobs(Archive *AH, void *arg)
cnt = lo_read(g_conn, loFd, buf, LOBBUFSIZE);
if (cnt < 0)
{
- write_msg(NULL, "dumpBlobs(): error reading large object: %s",
- PQerrorMessage(g_conn));
+ write_msg(NULL, "dumpBlobs(): error reading large object %u: %s",
+ blobOid, PQerrorMessage(g_conn));
exit_nicely();
}
@@ -2047,141 +2130,13 @@ dumpBlobs(Archive *AH, void *arg)
EndBlob(AH, blobOid);
}
- } while (PQntuples(res) > 0);
+ } while (ntups > 0);
PQclear(res);
return 1;
}
-/*
- * dumpBlobComments
- * dump all blob properties.
- * It has "BLOB COMMENTS" tag due to the historical reason, but note
- * that it is the routine to dump all the properties of blobs.
- *
- * Since we don't provide any way to be selective about dumping blobs,
- * there's no need to be selective about their comments either. We put
- * all the comments into one big TOC entry.
- */
-static int
-dumpBlobComments(Archive *AH, void *arg)
-{
- const char *blobQry;
- const char *blobFetchQry;
- PQExpBuffer cmdQry = createPQExpBuffer();
- PGresult *res;
- int i;
-
- if (g_verbose)
- write_msg(NULL, "saving large object properties\n");
-
- /* Make sure we are in proper schema */
- selectSourceSchema("pg_catalog");
-
- /* Cursor to get all BLOB comments */
- if (AH->remoteVersion >= 90000)
- blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
- "obj_description(oid, 'pg_largeobject'), "
- "pg_get_userbyid(lomowner), lomacl "
- "FROM pg_largeobject_metadata";
- else if (AH->remoteVersion >= 70300)
- blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
- "obj_description(loid, 'pg_largeobject'), NULL, NULL "
- "FROM (SELECT DISTINCT loid FROM "
- "pg_description d JOIN pg_largeobject l ON (objoid = loid) "
- "WHERE classoid = 'pg_largeobject'::regclass) ss";
- else if (AH->remoteVersion >= 70200)
- blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
- "obj_description(loid, 'pg_largeobject'), NULL, NULL "
- "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
- else if (AH->remoteVersion >= 70100)
- blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
- "obj_description(loid), NULL, NULL "
- "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
- else
- blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
- " ( "
- " SELECT description "
- " FROM pg_description pd "
- " WHERE pd.objoid=pc.oid "
- " ), NULL, NULL "
- "FROM pg_class pc WHERE relkind = 'l'";
-
- res = PQexec(g_conn, blobQry);
- check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK);
-
- /* Command to fetch from cursor */
- blobFetchQry = "FETCH 100 IN blobcmt";
-
- do
- {
- PQclear(res);
-
- /* Do a fetch */
- res = PQexec(g_conn, blobFetchQry);
- check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
-
- /* Process the tuples, if any */
- for (i = 0; i < PQntuples(res); i++)
- {
- Oid blobOid = atooid(PQgetvalue(res, i, 0));
- char *lo_comment = PQgetvalue(res, i, 1);
- char *lo_owner = PQgetvalue(res, i, 2);
- char *lo_acl = PQgetvalue(res, i, 3);
- char lo_name[32];
-
- resetPQExpBuffer(cmdQry);
-
- /* comment on the blob */
- if (!PQgetisnull(res, i, 1))
- {
- appendPQExpBuffer(cmdQry,
- "COMMENT ON LARGE OBJECT %u IS ", blobOid);
- appendStringLiteralAH(cmdQry, lo_comment, AH);
- appendPQExpBuffer(cmdQry, ";\n");
- }
-
- /* dump blob ownership, if necessary */
- if (!PQgetisnull(res, i, 2))
- {
- appendPQExpBuffer(cmdQry,
- "ALTER LARGE OBJECT %u OWNER TO %s;\n",
- blobOid, lo_owner);
- }
-
- /* dump blob privileges, if necessary */
- if (!PQgetisnull(res, i, 3) &&
- !dataOnly && !aclsSkip)
- {
- snprintf(lo_name, sizeof(lo_name), "%u", blobOid);
- if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT",
- lo_acl, lo_owner, "",
- AH->remoteVersion, cmdQry))
- {
- write_msg(NULL, "could not parse ACL (%s) for "
- "large object %u", lo_acl, blobOid);
- exit_nicely();
- }
- }
-
- if (cmdQry->len > 0)
- {
- appendPQExpBuffer(cmdQry, "\n");
- archputs(cmdQry->data, AH);
- }
- }
- } while (PQntuples(res) > 0);
-
- PQclear(res);
-
- archputs("\n", AH);
-
- destroyPQExpBuffer(cmdQry);
-
- return 1;
-}
-
static void
binary_upgrade_set_type_oids_by_type_oid(PQExpBuffer upgrade_buffer,
Oid pg_type_oid)
@@ -6140,9 +6095,17 @@ dumpComment(Archive *fout, const char *target,
CommentItem *comments;
int ncomments;
- /* Comments are SCHEMA not data */
- if (dataOnly)
- return;
+ /* Comments are schema not data ... except blob comments are data */
+ if (strncmp(target, "LARGE OBJECT ", 13) != 0)
+ {
+ if (dataOnly)
+ return;
+ }
+ else
+ {
+ if (schemaOnly)
+ return;
+ }
/* Search for comments associated with catalogId, using table */
ncomments = findComments(fout, catalogId.tableoid, catalogId.oid,
@@ -6530,7 +6493,10 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
case DO_DEFAULT_ACL:
dumpDefaultACL(fout, (DefaultACLInfo *) dobj);
break;
- case DO_BLOBS:
+ case DO_BLOB:
+ dumpBlob(fout, (BlobInfo *) dobj);
+ break;
+ case DO_BLOB_DATA:
ArchiveEntry(fout, dobj->catId, dobj->dumpId,
dobj->name, NULL, NULL, "",
false, "BLOBS", SECTION_DATA,
@@ -6538,14 +6504,6 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
dobj->dependencies, dobj->nDeps,
dumpBlobs, NULL);
break;
- case DO_BLOB_COMMENTS:
- ArchiveEntry(fout, dobj->catId, dobj->dumpId,
- dobj->name, NULL, NULL, "",
- false, "BLOB COMMENTS", SECTION_DATA,
- "", "", NULL,
- dobj->dependencies, dobj->nDeps,
- dumpBlobComments, NULL);
- break;
}
}
@@ -10382,14 +10340,16 @@ dumpDefaultACL(Archive *fout, DefaultACLInfo *daclinfo)
*
* 'objCatId' is the catalog ID of the underlying object.
* 'objDumpId' is the dump ID of the underlying object.
- * 'type' must be TABLE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, or TABLESPACE.
+ * 'type' must be one of
+ * TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, TABLESPACE,
+ * FOREIGN DATA WRAPPER, SERVER, or LARGE OBJECT.
* 'name' is the formatted name of the object. Must be quoted etc. already.
* 'subname' is the formatted name of the sub-object, if any. Must be quoted.
* 'tag' is the tag for the archive entry (typ. unquoted name of object).
* 'nspname' is the namespace the object is in (NULL if none).
* 'owner' is the owner, NULL if there is no owner (for languages).
* 'acls' is the string read out of the fooacl system catalog field;
- * it will be parsed here.
+ * it will be parsed here.
*----------
*/
static void
@@ -10401,7 +10361,11 @@ dumpACL(Archive *fout, CatalogId objCatId, DumpId objDumpId,
PQExpBuffer sql;
/* Do nothing if ACL dump is not enabled */
- if (dataOnly || aclsSkip)
+ if (aclsSkip)
+ return;
+
+ /* --data-only skips ACLs *except* BLOB ACLs */
+ if (dataOnly && strcmp(type, "LARGE OBJECT") != 0)
return;
sql = createPQExpBuffer();
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index fa5972a55f..e71d03604b 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.h,v 1.162 2010/01/28 23:21:12 petere Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.h,v 1.163 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -115,8 +115,8 @@ typedef enum
DO_FDW,
DO_FOREIGN_SERVER,
DO_DEFAULT_ACL,
- DO_BLOBS,
- DO_BLOB_COMMENTS
+ DO_BLOB,
+ DO_BLOB_DATA
} DumpableObjectType;
typedef struct _dumpableObject
@@ -443,6 +443,13 @@ typedef struct _defaultACLInfo
char *defaclacl;
} DefaultACLInfo;
+typedef struct _blobInfo
+{
+ DumpableObject dobj;
+ char *rolname;
+ char *blobacl;
+} BlobInfo;
+
/* global decls */
extern bool force_quotes; /* double-quotes for identifiers flag */
extern bool g_verbose; /* verbose flag */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 1551af3dbe..f3761217d1 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.28 2010/02/15 19:59:47 petere Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.29 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -46,7 +46,7 @@ static const int oldObjectTypePriority[] =
16, /* DO_FK_CONSTRAINT */
2, /* DO_PROCLANG */
2, /* DO_CAST */
- 9, /* DO_TABLE_DATA */
+ 10, /* DO_TABLE_DATA */
7, /* DO_DUMMY_TYPE */
3, /* DO_TSPARSER */
4, /* DO_TSDICT */
@@ -55,8 +55,8 @@ static const int oldObjectTypePriority[] =
3, /* DO_FDW */
4, /* DO_FOREIGN_SERVER */
17, /* DO_DEFAULT_ACL */
- 10, /* DO_BLOBS */
- 11 /* DO_BLOB_COMMENTS */
+ 9, /* DO_BLOB */
+ 11 /* DO_BLOB_DATA */
};
/*
@@ -83,7 +83,7 @@ static const int newObjectTypePriority[] =
26, /* DO_FK_CONSTRAINT */
2, /* DO_PROCLANG */
8, /* DO_CAST */
- 19, /* DO_TABLE_DATA */
+ 20, /* DO_TABLE_DATA */
17, /* DO_DUMMY_TYPE */
10, /* DO_TSPARSER */
12, /* DO_TSDICT */
@@ -92,8 +92,8 @@ static const int newObjectTypePriority[] =
14, /* DO_FDW */
15, /* DO_FOREIGN_SERVER */
27, /* DO_DEFAULT_ACL */
- 20, /* DO_BLOBS */
- 21 /* DO_BLOB_COMMENTS */
+ 19, /* DO_BLOB */
+ 21 /* DO_BLOB_DATA */
};
@@ -1157,14 +1157,14 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
"DEFAULT ACL %s (ID %d OID %u)",
obj->name, obj->dumpId, obj->catId.oid);
return;
- case DO_BLOBS:
+ case DO_BLOB:
snprintf(buf, bufsize,
- "BLOBS (ID %d)",
- obj->dumpId);
+ "BLOB (ID %d OID %u)",
+ obj->dumpId, obj->catId.oid);
return;
- case DO_BLOB_COMMENTS:
+ case DO_BLOB_DATA:
snprintf(buf, bufsize,
- "BLOB COMMENTS (ID %d)",
+ "BLOB DATA (ID %d)",
obj->dumpId);
return;
}