diff options
| author | Itagaki Takahiro <itagaki.takahiro@gmail.com> | 2009-12-11 03:34:57 +0000 |
|---|---|---|
| committer | Itagaki Takahiro <itagaki.takahiro@gmail.com> | 2009-12-11 03:34:57 +0000 |
| commit | f1325ce213ae1843d2ee636ff6780c3f8ac9ada6 (patch) | |
| tree | 2fab9db3d075fcca27a87e92a9be02263865b93a /src/backend/catalog/pg_largeobject.c | |
| parent | 64579962bbe522bf9ced8e4ed712b9072fb89142 (diff) | |
| download | postgresql-f1325ce213ae1843d2ee636ff6780c3f8ac9ada6.tar.gz | |
Add large object access control.
A new system catalog pg_largeobject_metadata manages
ownership and access privileges of large objects.
KaiGai Kohei, reviewed by Jaime Casanova.
Diffstat (limited to 'src/backend/catalog/pg_largeobject.c')
| -rw-r--r-- | src/backend/catalog/pg_largeobject.c | 257 |
1 files changed, 205 insertions, 52 deletions
diff --git a/src/backend/catalog/pg_largeobject.c b/src/backend/catalog/pg_largeobject.c index 313ccdd3f0..517a0f3932 100644 --- a/src/backend/catalog/pg_largeobject.c +++ b/src/backend/catalog/pg_largeobject.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/catalog/pg_largeobject.c,v 1.33 2009/08/04 16:08:36 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/pg_largeobject.c,v 1.34 2009/12/11 03:34:55 itagaki Exp $ * *------------------------------------------------------------------------- */ @@ -16,8 +16,16 @@ #include "access/genam.h" #include "access/heapam.h" +#include "access/sysattr.h" +#include "catalog/catalog.h" +#include "catalog/dependency.h" #include "catalog/indexing.h" +#include "catalog/pg_authid.h" #include "catalog/pg_largeobject.h" +#include "catalog/pg_largeobject_metadata.h" +#include "catalog/toasting.h" +#include "miscadmin.h" +#include "utils/acl.h" #include "utils/bytea.h" #include "utils/fmgroids.h" #include "utils/rel.h" @@ -27,113 +35,258 @@ /* * Create a large object having the given LO identifier. * - * We do this by inserting an empty first page, so that the object will - * appear to exist with size 0. Note that the unique index will reject - * an attempt to create a duplicate page. + * We create a new large object by inserting an entry into + * pg_largeobject_metadata without any data pages, so that the object + * will appear to exist with size 0. */ -void +Oid LargeObjectCreate(Oid loid) { - Relation pg_largeobject; + Relation pg_lo_meta; HeapTuple ntup; - Datum values[Natts_pg_largeobject]; - bool nulls[Natts_pg_largeobject]; - int i; + Oid loid_new; + Datum values[Natts_pg_largeobject_metadata]; + bool nulls[Natts_pg_largeobject_metadata]; - pg_largeobject = heap_open(LargeObjectRelationId, RowExclusiveLock); + pg_lo_meta = heap_open(LargeObjectMetadataRelationId, + RowExclusiveLock); /* - * Form new tuple + * Insert metadata of the largeobject */ - for (i = 0; i < Natts_pg_largeobject; i++) - { - values[i] = (Datum) NULL; - nulls[i] = false; - } + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); - i = 0; - values[i++] = ObjectIdGetDatum(loid); - values[i++] = Int32GetDatum(0); - values[i++] = DirectFunctionCall1(byteain, - CStringGetDatum("")); + values[Anum_pg_largeobject_metadata_lomowner - 1] + = ObjectIdGetDatum(GetUserId()); + nulls[Anum_pg_largeobject_metadata_lomacl - 1] = true; - ntup = heap_form_tuple(pg_largeobject->rd_att, values, nulls); + ntup = heap_form_tuple(RelationGetDescr(pg_lo_meta), + values, nulls); + if (OidIsValid(loid)) + HeapTupleSetOid(ntup, loid); - /* - * Insert it - */ - simple_heap_insert(pg_largeobject, ntup); - - /* Update indexes */ - CatalogUpdateIndexes(pg_largeobject, ntup); + loid_new = simple_heap_insert(pg_lo_meta, ntup); + Assert(!OidIsValid(loid) || loid == loid_new); - heap_close(pg_largeobject, RowExclusiveLock); + CatalogUpdateIndexes(pg_lo_meta, ntup); heap_freetuple(ntup); + + heap_close(pg_lo_meta, RowExclusiveLock); + + return loid_new; } +/* + * Drop a large object having the given LO identifier. + * + * When we drop a large object, it is necessary to drop both of metadata + * and data pages in same time. + */ void LargeObjectDrop(Oid loid) { - bool found = false; + Relation pg_lo_meta; Relation pg_largeobject; ScanKeyData skey[1]; - SysScanDesc sd; + SysScanDesc scan; HeapTuple tuple; + pg_lo_meta = heap_open(LargeObjectMetadataRelationId, + RowExclusiveLock); + + pg_largeobject = heap_open(LargeObjectRelationId, + RowExclusiveLock); + + /* + * Delete an entry from pg_largeobject_metadata + */ ScanKeyInit(&skey[0], - Anum_pg_largeobject_loid, + ObjectIdAttributeNumber, BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(loid)); + ObjectIdGetDatum(loid)); - pg_largeobject = heap_open(LargeObjectRelationId, RowExclusiveLock); + scan = systable_beginscan(pg_lo_meta, + LargeObjectMetadataOidIndexId, true, + SnapshotNow, 1, skey); - sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true, - SnapshotNow, 1, skey); + tuple = systable_getnext(scan); + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("large object %u does not exist", loid))); + + simple_heap_delete(pg_lo_meta, &tuple->t_self); + + systable_endscan(scan); + + /* + * Delete all the associated entries from pg_largeobject + */ + ScanKeyInit(&skey[0], + Anum_pg_largeobject_loid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(loid)); - while ((tuple = systable_getnext(sd)) != NULL) + scan = systable_beginscan(pg_largeobject, + LargeObjectLOidPNIndexId, true, + SnapshotNow, 1, skey); + while (HeapTupleIsValid(tuple = systable_getnext(scan))) { simple_heap_delete(pg_largeobject, &tuple->t_self); - found = true; } - systable_endscan(sd); + systable_endscan(scan); heap_close(pg_largeobject, RowExclusiveLock); - if (!found) + heap_close(pg_lo_meta, RowExclusiveLock); +} + +/* + * LargeObjectAlterOwner + * + * Implementation of ALTER LARGE OBJECT statement + */ +void +LargeObjectAlterOwner(Oid loid, Oid newOwnerId) +{ + Form_pg_largeobject_metadata form_lo_meta; + Relation pg_lo_meta; + ScanKeyData skey[1]; + SysScanDesc scan; + HeapTuple oldtup; + HeapTuple newtup; + + pg_lo_meta = heap_open(LargeObjectMetadataRelationId, + RowExclusiveLock); + + ScanKeyInit(&skey[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(loid)); + + scan = systable_beginscan(pg_lo_meta, + LargeObjectMetadataOidIndexId, true, + SnapshotNow, 1, skey); + + oldtup = systable_getnext(scan); + if (!HeapTupleIsValid(oldtup)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("large object %u does not exist", loid))); + + form_lo_meta = (Form_pg_largeobject_metadata) GETSTRUCT(oldtup); + if (form_lo_meta->lomowner != newOwnerId) + { + Datum values[Natts_pg_largeobject_metadata]; + bool nulls[Natts_pg_largeobject_metadata]; + bool replaces[Natts_pg_largeobject_metadata]; + Acl *newAcl; + Datum aclDatum; + bool isnull; + + /* Superusers can always do it */ + if (!superuser()) + { + /* + * The 'lo_compat_privileges' is not checked here, because we + * don't have any access control features in the 8.4.x series + * or earlier release. + * So, it is not a place we can define a compatible behavior. + */ + + /* Otherwise, must be owner of the existing object */ + if (!pg_largeobject_ownercheck(loid, GetUserId())) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be owner of large object %u", loid))); + + /* Must be able to become new owner */ + check_is_member_of_role(GetUserId(), newOwnerId); + } + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(nulls)); + + values[Anum_pg_largeobject_metadata_lomowner - 1] + = ObjectIdGetDatum(newOwnerId); + replaces[Anum_pg_largeobject_metadata_lomowner - 1] = true; + + /* + * Determine the modified ACL for the new owner. + * This is only necessary when the ACL is non-null. + */ + aclDatum = heap_getattr(oldtup, + Anum_pg_largeobject_metadata_lomacl, + RelationGetDescr(pg_lo_meta), &isnull); + if (!isnull) + { + newAcl = aclnewowner(DatumGetAclP(aclDatum), + form_lo_meta->lomowner, newOwnerId); + values[Anum_pg_largeobject_metadata_lomacl - 1] + = PointerGetDatum(newAcl); + replaces[Anum_pg_largeobject_metadata_lomacl - 1] = true; + } + + newtup = heap_modify_tuple(oldtup, RelationGetDescr(pg_lo_meta), + values, nulls, replaces); + + simple_heap_update(pg_lo_meta, &newtup->t_self, newtup); + CatalogUpdateIndexes(pg_lo_meta, newtup); + + heap_freetuple(newtup); + + /* Update owner dependency reference */ + changeDependencyOnOwner(LargeObjectRelationId, + loid, newOwnerId); + } + systable_endscan(scan); + + heap_close(pg_lo_meta, RowExclusiveLock); } +/* + * LargeObjectExists + * + * Currently, we don't use system cache to contain metadata of + * large objects, because massive number of large objects can + * consume not a small amount of process local memory. + * + * Note that LargeObjectExists always scans the system catalog + * with SnapshotNow, so it is unavailable to use to check + * existence in read-only accesses. + */ bool LargeObjectExists(Oid loid) { + Relation pg_lo_meta; + ScanKeyData skey[1]; + SysScanDesc sd; + HeapTuple tuple; bool retval = false; - Relation pg_largeobject; - ScanKeyData skey[1]; - SysScanDesc sd; - /* - * See if we can find any tuples belonging to the specified LO - */ ScanKeyInit(&skey[0], - Anum_pg_largeobject_loid, + ObjectIdAttributeNumber, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(loid)); - pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock); + pg_lo_meta = heap_open(LargeObjectMetadataRelationId, + AccessShareLock); - sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true, + sd = systable_beginscan(pg_lo_meta, + LargeObjectMetadataOidIndexId, true, SnapshotNow, 1, skey); - if (systable_getnext(sd) != NULL) + tuple = systable_getnext(sd); + if (HeapTupleIsValid(tuple)) retval = true; systable_endscan(sd); - heap_close(pg_largeobject, AccessShareLock); + heap_close(pg_lo_meta, AccessShareLock); return retval; } |
