diff options
Diffstat (limited to 'src/backend/catalog')
| -rw-r--r-- | src/backend/catalog/Makefile | 8 | ||||
| -rw-r--r-- | src/backend/catalog/aclchk.c | 316 | ||||
| -rw-r--r-- | src/backend/catalog/dependency.c | 16 | ||||
| -rw-r--r-- | src/backend/catalog/pg_largeobject.c | 257 | ||||
| -rw-r--r-- | src/backend/catalog/pg_shdepend.c | 7 |
5 files changed, 545 insertions, 59 deletions
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index ec548990b1..02a2b01b81 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -2,7 +2,7 @@ # # Makefile for backend/catalog # -# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.73 2009/10/07 22:14:16 alvherre Exp $ +# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.74 2009/12/11 03:34:55 itagaki Exp $ # #------------------------------------------------------------------------- @@ -29,9 +29,9 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_proc.h pg_type.h pg_attribute.h pg_class.h \ pg_attrdef.h pg_constraint.h pg_inherits.h pg_index.h pg_operator.h \ pg_opfamily.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \ - pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \ - pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h pg_cast.h \ - pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \ + pg_language.h pg_largeobject_metadata.h pg_largeobject.h pg_aggregate.h \ + pg_statistic.h pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h \ + pg_cast.h pg_enum.h pg_namespace.h pg_conversion.h pg_depend.h \ pg_database.h pg_db_role_setting.h pg_tablespace.h pg_pltemplate.h \ pg_authid.h pg_auth_members.h pg_shdepend.h pg_shdescription.h \ pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \ diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 8b2599a99f..3c2fdb0cf1 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/catalog/aclchk.c,v 1.156 2009/10/12 20:39:39 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/aclchk.c,v 1.157 2009/12/11 03:34:55 itagaki Exp $ * * NOTES * See acl.h. @@ -31,6 +31,8 @@ #include "catalog/pg_foreign_data_wrapper.h" #include "catalog/pg_foreign_server.h" #include "catalog/pg_language.h" +#include "catalog/pg_largeobject.h" +#include "catalog/pg_largeobject_metadata.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" #include "catalog/pg_operator.h" @@ -103,6 +105,7 @@ static void ExecGrant_Fdw(InternalGrant *grantStmt); static void ExecGrant_ForeignServer(InternalGrant *grantStmt); static void ExecGrant_Function(InternalGrant *grantStmt); static void ExecGrant_Language(InternalGrant *grantStmt); +static void ExecGrant_Largeobject(InternalGrant *grantStmt); static void ExecGrant_Namespace(InternalGrant *grantStmt); static void ExecGrant_Tablespace(InternalGrant *grantStmt); @@ -251,6 +254,9 @@ restrict_and_check_grant(bool is_grant, AclMode avail_goptions, bool all_privs, case ACL_KIND_LANGUAGE: whole_mask = ACL_ALL_RIGHTS_LANGUAGE; break; + case ACL_KIND_LARGEOBJECT: + whole_mask = ACL_ALL_RIGHTS_LARGEOBJECT; + break; case ACL_KIND_NAMESPACE: whole_mask = ACL_ALL_RIGHTS_NAMESPACE; break; @@ -410,6 +416,10 @@ ExecuteGrantStmt(GrantStmt *stmt) all_privileges = ACL_ALL_RIGHTS_LANGUAGE; errormsg = gettext_noop("invalid privilege type %s for language"); break; + case ACL_OBJECT_LARGEOBJECT: + all_privileges = ACL_ALL_RIGHTS_LARGEOBJECT; + errormsg = gettext_noop("invalid privilege type %s for large object"); + break; case ACL_OBJECT_NAMESPACE: all_privileges = ACL_ALL_RIGHTS_NAMESPACE; errormsg = gettext_noop("invalid privilege type %s for schema"); @@ -513,6 +523,9 @@ ExecGrantStmt_oids(InternalGrant *istmt) case ACL_OBJECT_LANGUAGE: ExecGrant_Language(istmt); break; + case ACL_OBJECT_LARGEOBJECT: + ExecGrant_Largeobject(istmt); + break; case ACL_OBJECT_NAMESPACE: ExecGrant_Namespace(istmt); break; @@ -597,6 +610,20 @@ objectNamesToOids(GrantObjectType objtype, List *objnames) ReleaseSysCache(tuple); } break; + case ACL_OBJECT_LARGEOBJECT: + foreach(cell, objnames) + { + Oid lobjOid = intVal(lfirst(cell)); + + if (!LargeObjectExists(lobjOid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("large object %u does not exist", + lobjOid))); + + objects = lappend_oid(objects, lobjOid); + } + break; case ACL_OBJECT_NAMESPACE: foreach(cell, objnames) { @@ -1279,6 +1306,9 @@ RemoveRoleFromObjectACL(Oid roleid, Oid classid, Oid objid) case LanguageRelationId: istmt.objtype = ACL_OBJECT_LANGUAGE; break; + case LargeObjectRelationId: + istmt.objtype = ACL_OBJECT_LARGEOBJECT; + break; case NamespaceRelationId: istmt.objtype = ACL_OBJECT_NAMESPACE; break; @@ -2473,6 +2503,138 @@ ExecGrant_Language(InternalGrant *istmt) } static void +ExecGrant_Largeobject(InternalGrant *istmt) +{ + Relation relation; + ListCell *cell; + + if (istmt->all_privs && istmt->privileges == ACL_NO_RIGHTS) + istmt->privileges = ACL_ALL_RIGHTS_LARGEOBJECT; + + relation = heap_open(LargeObjectMetadataRelationId, + RowExclusiveLock); + + foreach(cell, istmt->objects) + { + Oid loid = lfirst_oid(cell); + Form_pg_largeobject_metadata form_lo_meta; + char loname[NAMEDATALEN]; + Datum aclDatum; + bool isNull; + AclMode avail_goptions; + AclMode this_privileges; + Acl *old_acl; + Acl *new_acl; + Oid grantorId; + Oid ownerId; + HeapTuple newtuple; + Datum values[Natts_pg_largeobject_metadata]; + bool nulls[Natts_pg_largeobject_metadata]; + bool replaces[Natts_pg_largeobject_metadata]; + int noldmembers; + int nnewmembers; + Oid *oldmembers; + Oid *newmembers; + ScanKeyData entry[1]; + SysScanDesc scan; + HeapTuple tuple; + + /* There's no syscache for pg_largeobject_metadata */ + ScanKeyInit(&entry[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(loid)); + + scan = systable_beginscan(relation, + LargeObjectMetadataOidIndexId, true, + SnapshotNow, 1, entry); + + tuple = systable_getnext(scan); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for large object %u", loid); + + form_lo_meta = (Form_pg_largeobject_metadata) GETSTRUCT(tuple); + + /* + * Get owner ID and working copy of existing ACL. If there's no ACL, + * substitute the proper default. + */ + ownerId = form_lo_meta->lomowner; + aclDatum = heap_getattr(tuple, + Anum_pg_largeobject_metadata_lomacl, + RelationGetDescr(relation), &isNull); + if (isNull) + old_acl = acldefault(ACL_OBJECT_LARGEOBJECT, ownerId); + else + old_acl = DatumGetAclPCopy(aclDatum); + + /* Determine ID to do the grant as, and available grant options */ + select_best_grantor(GetUserId(), istmt->privileges, + old_acl, ownerId, + &grantorId, &avail_goptions); + + /* + * Restrict the privileges to what we can actually grant, and emit the + * standards-mandated warning and error messages. + */ + snprintf(loname, sizeof(loname), "large object %u", loid); + this_privileges = + restrict_and_check_grant(istmt->is_grant, avail_goptions, + istmt->all_privs, istmt->privileges, + loid, grantorId, ACL_KIND_LARGEOBJECT, + loname, 0, NULL); + + /* + * Generate new ACL. + * + * We need the members of both old and new ACLs so we can correct the + * shared dependency information. + */ + noldmembers = aclmembers(old_acl, &oldmembers); + + new_acl = merge_acl_with_grant(old_acl, istmt->is_grant, + istmt->grant_option, istmt->behavior, + istmt->grantees, this_privileges, + grantorId, ownerId); + + nnewmembers = aclmembers(new_acl, &newmembers); + + /* finished building new ACL value, now insert it */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, false, sizeof(nulls)); + MemSet(replaces, false, sizeof(replaces)); + + replaces[Anum_pg_largeobject_metadata_lomacl - 1] = true; + values[Anum_pg_largeobject_metadata_lomacl - 1] + = PointerGetDatum(new_acl); + + newtuple = heap_modify_tuple(tuple, RelationGetDescr(relation), + values, nulls, replaces); + + simple_heap_update(relation, &newtuple->t_self, newtuple); + + /* keep the catalog indexes up to date */ + CatalogUpdateIndexes(relation, newtuple); + + /* Update the shared dependency ACL info */ + updateAclDependencies(LargeObjectRelationId, + HeapTupleGetOid(tuple), 0, + ownerId, istmt->is_grant, + noldmembers, oldmembers, + nnewmembers, newmembers); + + systable_endscan(scan); + + pfree(new_acl); + + /* prevent error when processing duplicate objects */ + CommandCounterIncrement(); + } + + heap_close(relation, RowExclusiveLock); +} + +static void ExecGrant_Namespace(InternalGrant *istmt) { Relation relation; @@ -2812,6 +2974,8 @@ static const char *const no_priv_msg[MAX_ACL_KIND] = gettext_noop("permission denied for type %s"), /* ACL_KIND_LANGUAGE */ gettext_noop("permission denied for language %s"), + /* ACL_KIND_LARGEOBJECT */ + gettext_noop("permission denied for large object %s"), /* ACL_KIND_NAMESPACE */ gettext_noop("permission denied for schema %s"), /* ACL_KIND_OPCLASS */ @@ -2850,6 +3014,8 @@ static const char *const not_owner_msg[MAX_ACL_KIND] = gettext_noop("must be owner of type %s"), /* ACL_KIND_LANGUAGE */ gettext_noop("must be owner of language %s"), + /* ACL_KIND_LARGEOBJECT */ + gettext_noop("must be owner of large object %s"), /* ACL_KIND_NAMESPACE */ gettext_noop("must be owner of schema %s"), /* ACL_KIND_OPCLASS */ @@ -2969,6 +3135,9 @@ pg_aclmask(AclObjectKind objkind, Oid table_oid, AttrNumber attnum, Oid roleid, return pg_proc_aclmask(table_oid, roleid, mask, how); case ACL_KIND_LANGUAGE: return pg_language_aclmask(table_oid, roleid, mask, how); + case ACL_KIND_LARGEOBJECT: + return pg_largeobject_aclmask_snapshot(table_oid, roleid, + mask, how, SnapshotNow); case ACL_KIND_NAMESPACE: return pg_namespace_aclmask(table_oid, roleid, mask, how); case ACL_KIND_TABLESPACE: @@ -3352,6 +3521,90 @@ pg_language_aclmask(Oid lang_oid, Oid roleid, } /* + * Exported routine for examining a user's privileges for a largeobject + * + * The reason why this interface has an argument of snapshot is that + * we apply a snapshot available on lo_open(), not SnapshotNow, when + * it is opened as read-only mode. + * If we could see the metadata and data from inconsistent viewpoint, + * it will give us much confusion. So, we need to provide an interface + * which takes an argument of snapshot. + * + * If the caller refers a large object with a certain snapshot except + * for SnapshotNow, its permission checks should be also applied in + * the same snapshot. + */ +AclMode +pg_largeobject_aclmask_snapshot(Oid lobj_oid, Oid roleid, + AclMode mask, AclMaskHow how, + Snapshot snapshot) +{ + AclMode result; + Relation pg_lo_meta; + ScanKeyData entry[1]; + SysScanDesc scan; + HeapTuple tuple; + Datum aclDatum; + bool isNull; + Acl *acl; + Oid ownerId; + + /* Superusers bypass all permission checking. */ + if (superuser_arg(roleid)) + return mask; + + /* + * Get the largeobject's ACL from pg_language_metadata + */ + pg_lo_meta = heap_open(LargeObjectMetadataRelationId, + AccessShareLock); + + ScanKeyInit(&entry[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(lobj_oid)); + + scan = systable_beginscan(pg_lo_meta, + LargeObjectMetadataOidIndexId, true, + snapshot, 1, entry); + + tuple = systable_getnext(scan); + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("large object %u does not exist", lobj_oid))); + + ownerId = ((Form_pg_largeobject_metadata) GETSTRUCT(tuple))->lomowner; + + aclDatum = heap_getattr(tuple, Anum_pg_largeobject_metadata_lomacl, + RelationGetDescr(pg_lo_meta), &isNull); + + if (isNull) + { + /* No ACL, so build default ACL */ + acl = acldefault(ACL_OBJECT_LARGEOBJECT, ownerId); + aclDatum = (Datum) 0; + } + else + { + /* detoast ACL if necessary */ + acl = DatumGetAclP(aclDatum); + } + + result = aclmask(acl, roleid, ownerId, mask, how); + + /* if we have a detoasted copy, free it */ + if (acl && (Pointer) acl != DatumGetPointer(aclDatum)) + pfree(acl); + + systable_endscan(scan); + + heap_close(pg_lo_meta, AccessShareLock); + + return result; +} + +/* * Exported routine for examining a user's privileges for a namespace */ AclMode @@ -3802,6 +4055,20 @@ pg_language_aclcheck(Oid lang_oid, Oid roleid, AclMode mode) } /* + * Exported routine for checking a user's access privileges to a largeobject + */ +AclResult +pg_largeobject_aclcheck_snapshot(Oid lobj_oid, Oid roleid, AclMode mode, + Snapshot snapshot) +{ + if (pg_largeobject_aclmask_snapshot(lobj_oid, roleid, mode, + ACLMASK_ANY, snapshot) != 0) + return ACLCHECK_OK; + else + return ACLCHECK_NO_PRIV; +} + +/* * Exported routine for checking a user's access privileges to a namespace */ AclResult @@ -3992,6 +4259,53 @@ pg_language_ownercheck(Oid lan_oid, Oid roleid) } /* + * Ownership check for a largeobject (specified by OID) + * + * Note that we have no candidate to call this routine with a certain + * snapshot except for SnapshotNow, so we don't provide an interface + * with _snapshot() version now. + */ +bool +pg_largeobject_ownercheck(Oid lobj_oid, Oid roleid) +{ + Relation pg_lo_meta; + ScanKeyData entry[1]; + SysScanDesc scan; + HeapTuple tuple; + Oid ownerId; + + /* Superusers bypass all permission checking. */ + if (superuser_arg(roleid)) + return true; + + /* There's no syscache for pg_largeobject_metadata */ + pg_lo_meta = heap_open(LargeObjectMetadataRelationId, + AccessShareLock); + + ScanKeyInit(&entry[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(lobj_oid)); + + scan = systable_beginscan(pg_lo_meta, + LargeObjectMetadataOidIndexId, true, + SnapshotNow, 1, entry); + + tuple = systable_getnext(scan); + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("large object %u does not exist", lobj_oid))); + + ownerId = ((Form_pg_largeobject_metadata) GETSTRUCT(tuple))->lomowner; + + systable_endscan(scan); + heap_close(pg_lo_meta, AccessShareLock); + + return has_privs_of_role(roleid, ownerId); +} + +/* * Ownership check for a namespace (specified by OID). */ bool diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 8a07c69c7e..4ef3bb3c56 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/catalog/dependency.c,v 1.92 2009/10/05 19:24:35 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/dependency.c,v 1.93 2009/12/11 03:34:55 itagaki Exp $ * *------------------------------------------------------------------------- */ @@ -37,6 +37,7 @@ #include "catalog/pg_foreign_data_wrapper.h" #include "catalog/pg_foreign_server.h" #include "catalog/pg_language.h" +#include "catalog/pg_largeobject.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" #include "catalog/pg_operator.h" @@ -131,6 +132,7 @@ static const Oid object_classes[MAX_OCLASS] = { ConversionRelationId, /* OCLASS_CONVERSION */ AttrDefaultRelationId, /* OCLASS_DEFAULT */ LanguageRelationId, /* OCLASS_LANGUAGE */ + LargeObjectRelationId, /* OCLASS_LARGEOBJECT */ OperatorRelationId, /* OCLASS_OPERATOR */ OperatorClassRelationId, /* OCLASS_OPCLASS */ OperatorFamilyRelationId, /* OCLASS_OPFAMILY */ @@ -1074,6 +1076,10 @@ doDeletion(const ObjectAddress *object) DropProceduralLanguageById(object->objectId); break; + case OCLASS_LARGEOBJECT: + LargeObjectDrop(object->objectId); + break; + case OCLASS_OPERATOR: RemoveOperatorById(object->objectId); break; @@ -1991,6 +1997,10 @@ getObjectClass(const ObjectAddress *object) Assert(object->objectSubId == 0); return OCLASS_LANGUAGE; + case LargeObjectRelationId: + Assert(object->objectSubId == 0); + return OCLASS_LARGEOBJECT; + case OperatorRelationId: Assert(object->objectSubId == 0); return OCLASS_OPERATOR; @@ -2243,6 +2253,10 @@ getObjectDescription(const ObjectAddress *object) ReleaseSysCache(langTup); break; } + case OCLASS_LARGEOBJECT: + appendStringInfo(&buffer, _("large object %u"), + object->objectId); + break; case OCLASS_OPERATOR: appendStringInfo(&buffer, _("operator %s"), diff --git a/src/backend/catalog/pg_largeobject.c b/src/backend/catalog/pg_largeobject.c index 313ccdd3f0..517a0f3932 100644 --- a/src/backend/catalog/pg_largeobject.c +++ b/src/backend/catalog/pg_largeobject.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/catalog/pg_largeobject.c,v 1.33 2009/08/04 16:08:36 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/pg_largeobject.c,v 1.34 2009/12/11 03:34:55 itagaki Exp $ * *------------------------------------------------------------------------- */ @@ -16,8 +16,16 @@ #include "access/genam.h" #include "access/heapam.h" +#include "access/sysattr.h" +#include "catalog/catalog.h" +#include "catalog/dependency.h" #include "catalog/indexing.h" +#include "catalog/pg_authid.h" #include "catalog/pg_largeobject.h" +#include "catalog/pg_largeobject_metadata.h" +#include "catalog/toasting.h" +#include "miscadmin.h" +#include "utils/acl.h" #include "utils/bytea.h" #include "utils/fmgroids.h" #include "utils/rel.h" @@ -27,113 +35,258 @@ /* * Create a large object having the given LO identifier. * - * We do this by inserting an empty first page, so that the object will - * appear to exist with size 0. Note that the unique index will reject - * an attempt to create a duplicate page. + * We create a new large object by inserting an entry into + * pg_largeobject_metadata without any data pages, so that the object + * will appear to exist with size 0. */ -void +Oid LargeObjectCreate(Oid loid) { - Relation pg_largeobject; + Relation pg_lo_meta; HeapTuple ntup; - Datum values[Natts_pg_largeobject]; - bool nulls[Natts_pg_largeobject]; - int i; + Oid loid_new; + Datum values[Natts_pg_largeobject_metadata]; + bool nulls[Natts_pg_largeobject_metadata]; - pg_largeobject = heap_open(LargeObjectRelationId, RowExclusiveLock); + pg_lo_meta = heap_open(LargeObjectMetadataRelationId, + RowExclusiveLock); /* - * Form new tuple + * Insert metadata of the largeobject */ - for (i = 0; i < Natts_pg_largeobject; i++) - { - values[i] = (Datum) NULL; - nulls[i] = false; - } + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); - i = 0; - values[i++] = ObjectIdGetDatum(loid); - values[i++] = Int32GetDatum(0); - values[i++] = DirectFunctionCall1(byteain, - CStringGetDatum("")); + values[Anum_pg_largeobject_metadata_lomowner - 1] + = ObjectIdGetDatum(GetUserId()); + nulls[Anum_pg_largeobject_metadata_lomacl - 1] = true; - ntup = heap_form_tuple(pg_largeobject->rd_att, values, nulls); + ntup = heap_form_tuple(RelationGetDescr(pg_lo_meta), + values, nulls); + if (OidIsValid(loid)) + HeapTupleSetOid(ntup, loid); - /* - * Insert it - */ - simple_heap_insert(pg_largeobject, ntup); - - /* Update indexes */ - CatalogUpdateIndexes(pg_largeobject, ntup); + loid_new = simple_heap_insert(pg_lo_meta, ntup); + Assert(!OidIsValid(loid) || loid == loid_new); - heap_close(pg_largeobject, RowExclusiveLock); + CatalogUpdateIndexes(pg_lo_meta, ntup); heap_freetuple(ntup); + + heap_close(pg_lo_meta, RowExclusiveLock); + + return loid_new; } +/* + * Drop a large object having the given LO identifier. + * + * When we drop a large object, it is necessary to drop both of metadata + * and data pages in same time. + */ void LargeObjectDrop(Oid loid) { - bool found = false; + Relation pg_lo_meta; Relation pg_largeobject; ScanKeyData skey[1]; - SysScanDesc sd; + SysScanDesc scan; HeapTuple tuple; + pg_lo_meta = heap_open(LargeObjectMetadataRelationId, + RowExclusiveLock); + + pg_largeobject = heap_open(LargeObjectRelationId, + RowExclusiveLock); + + /* + * Delete an entry from pg_largeobject_metadata + */ ScanKeyInit(&skey[0], - Anum_pg_largeobject_loid, + ObjectIdAttributeNumber, BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(loid)); + ObjectIdGetDatum(loid)); - pg_largeobject = heap_open(LargeObjectRelationId, RowExclusiveLock); + scan = systable_beginscan(pg_lo_meta, + LargeObjectMetadataOidIndexId, true, + SnapshotNow, 1, skey); - sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true, - SnapshotNow, 1, skey); + tuple = systable_getnext(scan); + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("large object %u does not exist", loid))); + + simple_heap_delete(pg_lo_meta, &tuple->t_self); + + systable_endscan(scan); + + /* + * Delete all the associated entries from pg_largeobject + */ + ScanKeyInit(&skey[0], + Anum_pg_largeobject_loid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(loid)); - while ((tuple = systable_getnext(sd)) != NULL) + scan = systable_beginscan(pg_largeobject, + LargeObjectLOidPNIndexId, true, + SnapshotNow, 1, skey); + while (HeapTupleIsValid(tuple = systable_getnext(scan))) { simple_heap_delete(pg_largeobject, &tuple->t_self); - found = true; } - systable_endscan(sd); + systable_endscan(scan); heap_close(pg_largeobject, RowExclusiveLock); - if (!found) + heap_close(pg_lo_meta, RowExclusiveLock); +} + +/* + * LargeObjectAlterOwner + * + * Implementation of ALTER LARGE OBJECT statement + */ +void +LargeObjectAlterOwner(Oid loid, Oid newOwnerId) +{ + Form_pg_largeobject_metadata form_lo_meta; + Relation pg_lo_meta; + ScanKeyData skey[1]; + SysScanDesc scan; + HeapTuple oldtup; + HeapTuple newtup; + + pg_lo_meta = heap_open(LargeObjectMetadataRelationId, + RowExclusiveLock); + + ScanKeyInit(&skey[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(loid)); + + scan = systable_beginscan(pg_lo_meta, + LargeObjectMetadataOidIndexId, true, + SnapshotNow, 1, skey); + + oldtup = systable_getnext(scan); + if (!HeapTupleIsValid(oldtup)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("large object %u does not exist", loid))); + + form_lo_meta = (Form_pg_largeobject_metadata) GETSTRUCT(oldtup); + if (form_lo_meta->lomowner != newOwnerId) + { + Datum values[Natts_pg_largeobject_metadata]; + bool nulls[Natts_pg_largeobject_metadata]; + bool replaces[Natts_pg_largeobject_metadata]; + Acl *newAcl; + Datum aclDatum; + bool isnull; + + /* Superusers can always do it */ + if (!superuser()) + { + /* + * The 'lo_compat_privileges' is not checked here, because we + * don't have any access control features in the 8.4.x series + * or earlier release. + * So, it is not a place we can define a compatible behavior. + */ + + /* Otherwise, must be owner of the existing object */ + if (!pg_largeobject_ownercheck(loid, GetUserId())) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be owner of large object %u", loid))); + + /* Must be able to become new owner */ + check_is_member_of_role(GetUserId(), newOwnerId); + } + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(nulls)); + + values[Anum_pg_largeobject_metadata_lomowner - 1] + = ObjectIdGetDatum(newOwnerId); + replaces[Anum_pg_largeobject_metadata_lomowner - 1] = true; + + /* + * Determine the modified ACL for the new owner. + * This is only necessary when the ACL is non-null. + */ + aclDatum = heap_getattr(oldtup, + Anum_pg_largeobject_metadata_lomacl, + RelationGetDescr(pg_lo_meta), &isnull); + if (!isnull) + { + newAcl = aclnewowner(DatumGetAclP(aclDatum), + form_lo_meta->lomowner, newOwnerId); + values[Anum_pg_largeobject_metadata_lomacl - 1] + = PointerGetDatum(newAcl); + replaces[Anum_pg_largeobject_metadata_lomacl - 1] = true; + } + + newtup = heap_modify_tuple(oldtup, RelationGetDescr(pg_lo_meta), + values, nulls, replaces); + + simple_heap_update(pg_lo_meta, &newtup->t_self, newtup); + CatalogUpdateIndexes(pg_lo_meta, newtup); + + heap_freetuple(newtup); + + /* Update owner dependency reference */ + changeDependencyOnOwner(LargeObjectRelationId, + loid, newOwnerId); + } + systable_endscan(scan); + + heap_close(pg_lo_meta, RowExclusiveLock); } +/* + * LargeObjectExists + * + * Currently, we don't use system cache to contain metadata of + * large objects, because massive number of large objects can + * consume not a small amount of process local memory. + * + * Note that LargeObjectExists always scans the system catalog + * with SnapshotNow, so it is unavailable to use to check + * existence in read-only accesses. + */ bool LargeObjectExists(Oid loid) { + Relation pg_lo_meta; + ScanKeyData skey[1]; + SysScanDesc sd; + HeapTuple tuple; bool retval = false; - Relation pg_largeobject; - ScanKeyData skey[1]; - SysScanDesc sd; - /* - * See if we can find any tuples belonging to the specified LO - */ ScanKeyInit(&skey[0], - Anum_pg_largeobject_loid, + ObjectIdAttributeNumber, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(loid)); - pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock); + pg_lo_meta = heap_open(LargeObjectMetadataRelationId, + AccessShareLock); - sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true, + sd = systable_beginscan(pg_lo_meta, + LargeObjectMetadataOidIndexId, true, SnapshotNow, 1, skey); - if (systable_getnext(sd) != NULL) + tuple = systable_getnext(sd); + if (HeapTupleIsValid(tuple)) retval = true; systable_endscan(sd); - heap_close(pg_largeobject, AccessShareLock); + heap_close(pg_lo_meta, AccessShareLock); return retval; } diff --git a/src/backend/catalog/pg_shdepend.c b/src/backend/catalog/pg_shdepend.c index be70143ea2..7130444448 100644 --- a/src/backend/catalog/pg_shdepend.c +++ b/src/backend/catalog/pg_shdepend.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.36 2009/10/07 22:14:18 alvherre Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.37 2009/12/11 03:34:55 itagaki Exp $ * *------------------------------------------------------------------------- */ @@ -25,6 +25,7 @@ #include "catalog/pg_database.h" #include "catalog/pg_default_acl.h" #include "catalog/pg_language.h" +#include "catalog/pg_largeobject.h" #include "catalog/pg_namespace.h" #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" @@ -1347,6 +1348,10 @@ shdepReassignOwned(List *roleids, Oid newrole) AlterLanguageOwner_oid(sdepForm->objid, newrole); break; + case LargeObjectRelationId: + LargeObjectAlterOwner(sdepForm->objid, newrole); + break; + case DefaultAclRelationId: /* * Ignore default ACLs; they should be handled by |
