summaryrefslogtreecommitdiff
path: root/src/backend/catalog/pg_largeobject.c
diff options
context:
space:
mode:
authorItagaki Takahiro <itagaki.takahiro@gmail.com>2009-12-11 03:34:57 +0000
committerItagaki Takahiro <itagaki.takahiro@gmail.com>2009-12-11 03:34:57 +0000
commitf1325ce213ae1843d2ee636ff6780c3f8ac9ada6 (patch)
tree2fab9db3d075fcca27a87e92a9be02263865b93a /src/backend/catalog/pg_largeobject.c
parent64579962bbe522bf9ced8e4ed712b9072fb89142 (diff)
downloadpostgresql-f1325ce213ae1843d2ee636ff6780c3f8ac9ada6.tar.gz
Add large object access control.
A new system catalog pg_largeobject_metadata manages ownership and access privileges of large objects. KaiGai Kohei, reviewed by Jaime Casanova.
Diffstat (limited to 'src/backend/catalog/pg_largeobject.c')
-rw-r--r--src/backend/catalog/pg_largeobject.c257
1 files changed, 205 insertions, 52 deletions
diff --git a/src/backend/catalog/pg_largeobject.c b/src/backend/catalog/pg_largeobject.c
index 313ccdd3f0..517a0f3932 100644
--- a/src/backend/catalog/pg_largeobject.c
+++ b/src/backend/catalog/pg_largeobject.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/catalog/pg_largeobject.c,v 1.33 2009/08/04 16:08:36 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/catalog/pg_largeobject.c,v 1.34 2009/12/11 03:34:55 itagaki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -16,8 +16,16 @@
#include "access/genam.h"
#include "access/heapam.h"
+#include "access/sysattr.h"
+#include "catalog/catalog.h"
+#include "catalog/dependency.h"
#include "catalog/indexing.h"
+#include "catalog/pg_authid.h"
#include "catalog/pg_largeobject.h"
+#include "catalog/pg_largeobject_metadata.h"
+#include "catalog/toasting.h"
+#include "miscadmin.h"
+#include "utils/acl.h"
#include "utils/bytea.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
@@ -27,113 +35,258 @@
/*
* Create a large object having the given LO identifier.
*
- * We do this by inserting an empty first page, so that the object will
- * appear to exist with size 0. Note that the unique index will reject
- * an attempt to create a duplicate page.
+ * We create a new large object by inserting an entry into
+ * pg_largeobject_metadata without any data pages, so that the object
+ * will appear to exist with size 0.
*/
-void
+Oid
LargeObjectCreate(Oid loid)
{
- Relation pg_largeobject;
+ Relation pg_lo_meta;
HeapTuple ntup;
- Datum values[Natts_pg_largeobject];
- bool nulls[Natts_pg_largeobject];
- int i;
+ Oid loid_new;
+ Datum values[Natts_pg_largeobject_metadata];
+ bool nulls[Natts_pg_largeobject_metadata];
- pg_largeobject = heap_open(LargeObjectRelationId, RowExclusiveLock);
+ pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+ RowExclusiveLock);
/*
- * Form new tuple
+ * Insert metadata of the largeobject
*/
- for (i = 0; i < Natts_pg_largeobject; i++)
- {
- values[i] = (Datum) NULL;
- nulls[i] = false;
- }
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
- i = 0;
- values[i++] = ObjectIdGetDatum(loid);
- values[i++] = Int32GetDatum(0);
- values[i++] = DirectFunctionCall1(byteain,
- CStringGetDatum(""));
+ values[Anum_pg_largeobject_metadata_lomowner - 1]
+ = ObjectIdGetDatum(GetUserId());
+ nulls[Anum_pg_largeobject_metadata_lomacl - 1] = true;
- ntup = heap_form_tuple(pg_largeobject->rd_att, values, nulls);
+ ntup = heap_form_tuple(RelationGetDescr(pg_lo_meta),
+ values, nulls);
+ if (OidIsValid(loid))
+ HeapTupleSetOid(ntup, loid);
- /*
- * Insert it
- */
- simple_heap_insert(pg_largeobject, ntup);
-
- /* Update indexes */
- CatalogUpdateIndexes(pg_largeobject, ntup);
+ loid_new = simple_heap_insert(pg_lo_meta, ntup);
+ Assert(!OidIsValid(loid) || loid == loid_new);
- heap_close(pg_largeobject, RowExclusiveLock);
+ CatalogUpdateIndexes(pg_lo_meta, ntup);
heap_freetuple(ntup);
+
+ heap_close(pg_lo_meta, RowExclusiveLock);
+
+ return loid_new;
}
+/*
+ * Drop a large object having the given LO identifier.
+ *
+ * When we drop a large object, it is necessary to drop both of metadata
+ * and data pages in same time.
+ */
void
LargeObjectDrop(Oid loid)
{
- bool found = false;
+ Relation pg_lo_meta;
Relation pg_largeobject;
ScanKeyData skey[1];
- SysScanDesc sd;
+ SysScanDesc scan;
HeapTuple tuple;
+ pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+ RowExclusiveLock);
+
+ pg_largeobject = heap_open(LargeObjectRelationId,
+ RowExclusiveLock);
+
+ /*
+ * Delete an entry from pg_largeobject_metadata
+ */
ScanKeyInit(&skey[0],
- Anum_pg_largeobject_loid,
+ ObjectIdAttributeNumber,
BTEqualStrategyNumber, F_OIDEQ,
- ObjectIdGetDatum(loid));
+ ObjectIdGetDatum(loid));
- pg_largeobject = heap_open(LargeObjectRelationId, RowExclusiveLock);
+ scan = systable_beginscan(pg_lo_meta,
+ LargeObjectMetadataOidIndexId, true,
+ SnapshotNow, 1, skey);
- sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
- SnapshotNow, 1, skey);
+ tuple = systable_getnext(scan);
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("large object %u does not exist", loid)));
+
+ simple_heap_delete(pg_lo_meta, &tuple->t_self);
+
+ systable_endscan(scan);
+
+ /*
+ * Delete all the associated entries from pg_largeobject
+ */
+ ScanKeyInit(&skey[0],
+ Anum_pg_largeobject_loid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(loid));
- while ((tuple = systable_getnext(sd)) != NULL)
+ scan = systable_beginscan(pg_largeobject,
+ LargeObjectLOidPNIndexId, true,
+ SnapshotNow, 1, skey);
+ while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
simple_heap_delete(pg_largeobject, &tuple->t_self);
- found = true;
}
- systable_endscan(sd);
+ systable_endscan(scan);
heap_close(pg_largeobject, RowExclusiveLock);
- if (!found)
+ heap_close(pg_lo_meta, RowExclusiveLock);
+}
+
+/*
+ * LargeObjectAlterOwner
+ *
+ * Implementation of ALTER LARGE OBJECT statement
+ */
+void
+LargeObjectAlterOwner(Oid loid, Oid newOwnerId)
+{
+ Form_pg_largeobject_metadata form_lo_meta;
+ Relation pg_lo_meta;
+ ScanKeyData skey[1];
+ SysScanDesc scan;
+ HeapTuple oldtup;
+ HeapTuple newtup;
+
+ pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+ RowExclusiveLock);
+
+ ScanKeyInit(&skey[0],
+ ObjectIdAttributeNumber,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(loid));
+
+ scan = systable_beginscan(pg_lo_meta,
+ LargeObjectMetadataOidIndexId, true,
+ SnapshotNow, 1, skey);
+
+ oldtup = systable_getnext(scan);
+ if (!HeapTupleIsValid(oldtup))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("large object %u does not exist", loid)));
+
+ form_lo_meta = (Form_pg_largeobject_metadata) GETSTRUCT(oldtup);
+ if (form_lo_meta->lomowner != newOwnerId)
+ {
+ Datum values[Natts_pg_largeobject_metadata];
+ bool nulls[Natts_pg_largeobject_metadata];
+ bool replaces[Natts_pg_largeobject_metadata];
+ Acl *newAcl;
+ Datum aclDatum;
+ bool isnull;
+
+ /* Superusers can always do it */
+ if (!superuser())
+ {
+ /*
+ * The 'lo_compat_privileges' is not checked here, because we
+ * don't have any access control features in the 8.4.x series
+ * or earlier release.
+ * So, it is not a place we can define a compatible behavior.
+ */
+
+ /* Otherwise, must be owner of the existing object */
+ if (!pg_largeobject_ownercheck(loid, GetUserId()))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be owner of large object %u", loid)));
+
+ /* Must be able to become new owner */
+ check_is_member_of_role(GetUserId(), newOwnerId);
+ }
+
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(nulls));
+
+ values[Anum_pg_largeobject_metadata_lomowner - 1]
+ = ObjectIdGetDatum(newOwnerId);
+ replaces[Anum_pg_largeobject_metadata_lomowner - 1] = true;
+
+ /*
+ * Determine the modified ACL for the new owner.
+ * This is only necessary when the ACL is non-null.
+ */
+ aclDatum = heap_getattr(oldtup,
+ Anum_pg_largeobject_metadata_lomacl,
+ RelationGetDescr(pg_lo_meta), &isnull);
+ if (!isnull)
+ {
+ newAcl = aclnewowner(DatumGetAclP(aclDatum),
+ form_lo_meta->lomowner, newOwnerId);
+ values[Anum_pg_largeobject_metadata_lomacl - 1]
+ = PointerGetDatum(newAcl);
+ replaces[Anum_pg_largeobject_metadata_lomacl - 1] = true;
+ }
+
+ newtup = heap_modify_tuple(oldtup, RelationGetDescr(pg_lo_meta),
+ values, nulls, replaces);
+
+ simple_heap_update(pg_lo_meta, &newtup->t_self, newtup);
+ CatalogUpdateIndexes(pg_lo_meta, newtup);
+
+ heap_freetuple(newtup);
+
+ /* Update owner dependency reference */
+ changeDependencyOnOwner(LargeObjectRelationId,
+ loid, newOwnerId);
+ }
+ systable_endscan(scan);
+
+ heap_close(pg_lo_meta, RowExclusiveLock);
}
+/*
+ * LargeObjectExists
+ *
+ * Currently, we don't use system cache to contain metadata of
+ * large objects, because massive number of large objects can
+ * consume not a small amount of process local memory.
+ *
+ * Note that LargeObjectExists always scans the system catalog
+ * with SnapshotNow, so it is unavailable to use to check
+ * existence in read-only accesses.
+ */
bool
LargeObjectExists(Oid loid)
{
+ Relation pg_lo_meta;
+ ScanKeyData skey[1];
+ SysScanDesc sd;
+ HeapTuple tuple;
bool retval = false;
- Relation pg_largeobject;
- ScanKeyData skey[1];
- SysScanDesc sd;
- /*
- * See if we can find any tuples belonging to the specified LO
- */
ScanKeyInit(&skey[0],
- Anum_pg_largeobject_loid,
+ ObjectIdAttributeNumber,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(loid));
- pg_largeobject = heap_open(LargeObjectRelationId, AccessShareLock);
+ pg_lo_meta = heap_open(LargeObjectMetadataRelationId,
+ AccessShareLock);
- sd = systable_beginscan(pg_largeobject, LargeObjectLOidPNIndexId, true,
+ sd = systable_beginscan(pg_lo_meta,
+ LargeObjectMetadataOidIndexId, true,
SnapshotNow, 1, skey);
- if (systable_getnext(sd) != NULL)
+ tuple = systable_getnext(sd);
+ if (HeapTupleIsValid(tuple))
retval = true;
systable_endscan(sd);
- heap_close(pg_largeobject, AccessShareLock);
+ heap_close(pg_lo_meta, AccessShareLock);
return retval;
}