summaryrefslogtreecommitdiff
path: root/src/backend/utils
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2004-07-17 03:32:14 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2004-07-17 03:32:14 +0000
commitfe548629c50b753e96515ba2cfd8a85e8fba10de (patch)
treee80b54f71cb7868db3f9f9c97bccf4c48859951a /src/backend/utils
parentf4c069ca8fc80640bd1bff510697371ffaf45267 (diff)
downloadpostgresql-fe548629c50b753e96515ba2cfd8a85e8fba10de.tar.gz
Invent ResourceOwner mechanism as per my recent proposal, and use it to
keep track of portal-related resources separately from transaction-related resources. This allows cursors to work in a somewhat sane fashion with nested transactions. For now, cursor behavior is non-subtransactional, that is a cursor's state does not roll back if you abort a subtransaction that fetched from the cursor. We might want to change that later.
Diffstat (limited to 'src/backend/utils')
-rw-r--r--src/backend/utils/Makefile4
-rw-r--r--src/backend/utils/cache/catcache.c232
-rw-r--r--src/backend/utils/cache/relcache.c207
-rw-r--r--src/backend/utils/mmgr/portalmem.c191
-rw-r--r--src/backend/utils/resowner/Makefile30
-rw-r--r--src/backend/utils/resowner/README74
-rw-r--r--src/backend/utils/resowner/resowner.c840
7 files changed, 1183 insertions, 395 deletions
diff --git a/src/backend/utils/Makefile b/src/backend/utils/Makefile
index 657144ccd8..d48db1e818 100644
--- a/src/backend/utils/Makefile
+++ b/src/backend/utils/Makefile
@@ -1,14 +1,14 @@
#
# Makefile for utils
#
-# $PostgreSQL: pgsql/src/backend/utils/Makefile,v 1.22 2004/01/04 05:57:21 tgl Exp $
+# $PostgreSQL: pgsql/src/backend/utils/Makefile,v 1.23 2004/07/17 03:29:15 tgl Exp $
#
subdir = src/backend/utils/
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-SUBDIRS := adt cache error fmgr hash init misc mmgr sort time mb
+SUBDIRS := adt cache error fmgr hash init mb misc mmgr resowner sort time
SUBDIROBJS := $(SUBDIRS:%=%/SUBSYS.o)
diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index 8bfa3610bd..c382d74974 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/cache/catcache.c,v 1.113 2004/07/01 00:51:17 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/cache/catcache.c,v 1.114 2004/07/17 03:29:25 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -31,6 +31,7 @@
#include "utils/fmgroids.h"
#include "utils/catcache.h"
#include "utils/relcache.h"
+#include "utils/resowner.h"
#include "utils/syscache.h"
@@ -360,8 +361,6 @@ CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
/* free associated tuple data */
if (ct->tuple.t_data != NULL)
pfree(ct->tuple.t_data);
- if (ct->prev_refcount != NULL)
- pfree(ct->prev_refcount);
pfree(ct);
--cache->cc_ntup;
@@ -396,8 +395,6 @@ CatCacheRemoveCList(CatCache *cache, CatCList *cl)
/* free associated tuple data */
if (cl->tuple.t_data != NULL)
pfree(cl->tuple.t_data);
- if (cl->prev_refcount != NULL)
- pfree(cl->prev_refcount);
pfree(cl);
}
@@ -531,7 +528,7 @@ CreateCacheMemoryContext(void)
/*
* AtEOXact_CatCache
*
- * Clean up catcaches at end of transaction (either commit or abort)
+ * Clean up catcaches at end of main transaction (either commit or abort)
*
* We scan the caches to reset refcounts to zero. This is of course
* necessary in the abort case, since elog() may have interrupted routines.
@@ -564,13 +561,6 @@ AtEOXact_CatCache(bool isCommit)
cl->refcount = 0;
}
- /*
- * Reset the refcount stack. Drop the item count to zero,
- * but don't deallocate the stack itself, so it can be used by
- * future subtransactions.
- */
- cl->numpushes = 0;
-
/* Clean up any now-deletable dead entries */
if (cl->dead)
CatCacheRemoveCList(ccp, cl);
@@ -596,13 +586,6 @@ AtEOXact_CatCache(bool isCommit)
ct->refcount = 0;
}
- /*
- * Reset the refcount stack. Drop the item count to zero,
- * but don't deallocate the stack itself, so it can be used by
- * future subtransactions.
- */
- ct->numpushes = 0;
-
/* Clean up any now-deletable dead entries */
if (ct->dead)
CatCacheRemoveCTup(ct->my_cache, ct);
@@ -610,161 +593,6 @@ AtEOXact_CatCache(bool isCommit)
}
/*
- * AtSubStart_CatCache
- *
- * Saves reference counts of each entry at subtransaction start so they
- * can be restored if the subtransaction later aborts.
- */
-void
-AtSubStart_CatCache(void)
-{
- CatCache *ccp;
- Dlelem *elt,
- *nextelt;
- MemoryContext old_cxt;
-
-
- old_cxt = MemoryContextSwitchTo(CacheMemoryContext);
-
- /*
- * Prepare CLists
- */
- for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next)
- {
- for (elt = DLGetHead(&ccp->cc_lists); elt; elt = nextelt)
- {
- CatCList *cl = (CatCList *) DLE_VAL(elt);
-
- nextelt = DLGetSucc(elt);
-
- if (cl->numpushes == cl->numalloc)
- {
- if (cl->numalloc == 0)
- {
- cl->numalloc = 8;
- cl->prev_refcount = palloc(sizeof(int) * cl->numalloc);
- }
- else
- {
- cl->numalloc *= 2;
- cl->prev_refcount = repalloc(cl->prev_refcount, cl->numalloc * sizeof(int));
- }
- }
-
- cl->prev_refcount[cl->numpushes++] = cl->refcount;
- }
- }
-
- /*
- * Prepare CTuples
- */
- for (elt = DLGetHead(&CacheHdr->ch_lrulist); elt; elt = nextelt)
- {
- CatCTup *ct = (CatCTup *) DLE_VAL(elt);
-
- nextelt = DLGetSucc(elt);
-
- if (ct->numpushes == ct->numalloc)
- {
- if (ct->numalloc == 0)
- {
- ct->numalloc = 8;
- ct->prev_refcount = palloc(sizeof(int) * ct->numalloc);
- }
- else
- {
- ct->numalloc *= 2;
- ct->prev_refcount = repalloc(ct->prev_refcount, sizeof(int) * ct->numalloc);
- }
- }
-
- ct->prev_refcount[ct->numpushes++] = ct->refcount;
- }
-
- MemoryContextSwitchTo(old_cxt);
-}
-
-void
-AtEOSubXact_CatCache(bool isCommit)
-{
- CatCache *ccp;
- Dlelem *elt,
- *nextelt;
-
- /*
- * Restore CLists
- */
- for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next)
- {
- for (elt = DLGetHead(&ccp->cc_lists); elt; elt = nextelt)
- {
- CatCList *cl = (CatCList *) DLE_VAL(elt);
-
- nextelt = DLGetSucc(elt);
-
- /*
- * During commit, check whether the count is what
- * we expect.
- */
- if (isCommit)
- {
- int expected_refcount;
- if (cl->numpushes > 0)
- expected_refcount = cl->prev_refcount[cl->numpushes - 1];
- else
- expected_refcount = 0;
-
- if (cl->refcount != expected_refcount)
- elog(WARNING, "catcache reference leak");
- }
-
- /*
- * During abort we have to restore the original count;
- * during commit, we have to restore in case of a leak,
- * and it won't harm if this is the expected count.
- */
- if (cl->numpushes > 0)
- cl->refcount = cl->prev_refcount[--cl->numpushes];
- else
- cl->refcount = 0;
- }
- }
-
- /*
- * Prepare CTuples
- */
- for (elt = DLGetHead(&CacheHdr->ch_lrulist); elt; elt = nextelt)
- {
- CatCTup *ct = (CatCTup *) DLE_VAL(elt);
-
- nextelt = DLGetSucc(elt);
-
- if (isCommit)
- {
- int expected_refcount;
-
- if (ct->numpushes > 0)
- expected_refcount = ct->prev_refcount[ct->numpushes - 1];
- else
- expected_refcount = 0;
-
- if (ct->refcount != expected_refcount)
- elog(WARNING, "catcache reference leak");
- }
-
- /*
- * During abort we have to restore the original count;
- * during commit, we have to restore in case of a leak,
- * and it won't harm if this is the expected count.
- */
- if (ct->numpushes > 0)
- ct->refcount = ct->prev_refcount[--ct->numpushes];
- else
- ct->refcount = 0;
- }
-}
-
-/*
* ResetCatalogCache
*
* Reset one catalog cache to empty.
@@ -1334,7 +1162,9 @@ SearchCatCache(CatCache *cache,
*/
if (!ct->negative)
{
+ ResourceOwnerEnlargeCatCacheRefs(CurrentResourceOwner);
ct->refcount++;
+ ResourceOwnerRememberCatCacheRef(CurrentResourceOwner, &ct->tuple);
CACHE3_elog(DEBUG2, "SearchCatCache(%s): found in bucket %d",
cache->cc_relname, hashIndex);
@@ -1389,6 +1219,10 @@ SearchCatCache(CatCache *cache,
ct = CatalogCacheCreateEntry(cache, ntp,
hashValue, hashIndex,
false);
+ /* immediately set the refcount to 1 */
+ ResourceOwnerEnlargeCatCacheRefs(CurrentResourceOwner);
+ ct->refcount++;
+ ResourceOwnerRememberCatCacheRef(CurrentResourceOwner, &ct->tuple);
break; /* assume only one match */
}
@@ -1415,10 +1249,9 @@ SearchCatCache(CatCache *cache,
cache->cc_relname, hashIndex);
/*
- * We are not returning the new entry to the caller, so reset its
- * refcount.
+ * We are not returning the negative entry to the caller, so leave
+ * its refcount zero.
*/
- ct->refcount = 0; /* negative entries never have refs */
return NULL;
}
@@ -1457,6 +1290,7 @@ ReleaseCatCache(HeapTuple tuple)
Assert(ct->refcount > 0);
ct->refcount--;
+ ResourceOwnerForgetCatCacheRef(CurrentResourceOwner, &ct->tuple);
if (ct->refcount == 0
#ifndef CATCACHE_FORCE_RELEASE
@@ -1564,7 +1398,10 @@ SearchCatCacheList(CatCache *cache,
* do not move the members to the fronts of their hashbucket
* lists, however, since there's no point in that unless they are
* searched for individually.) Also bump the members' refcounts.
+ * (member refcounts are NOT registered separately with the
+ * resource owner.)
*/
+ ResourceOwnerEnlargeCatCacheListRefs(CurrentResourceOwner);
for (i = 0; i < cl->n_members; i++)
{
cl->members[i]->refcount++;
@@ -1574,6 +1411,7 @@ SearchCatCacheList(CatCache *cache,
/* Bump the list's refcount and return it */
cl->refcount++;
+ ResourceOwnerRememberCatCacheListRef(CurrentResourceOwner, cl);
CACHE2_elog(DEBUG2, "SearchCatCacheList(%s): found list",
cache->cc_relname);
@@ -1639,9 +1477,7 @@ SearchCatCacheList(CatCache *cache,
if (ct->c_list)
continue;
- /* Found a match, so bump its refcount and move to front */
- ct->refcount++;
-
+ /* Found a match, so move it to front */
DLMoveToFront(&ct->lrulist_elem);
break;
@@ -1655,6 +1491,16 @@ SearchCatCacheList(CatCache *cache,
false);
}
+ /*
+ * We have to bump the member refcounts immediately to ensure they
+ * won't get dropped from the cache while loading other members.
+ * If we get an error before we finish constructing the CatCList
+ * then we will leak those reference counts. This is annoying but
+ * it has no real consequence beyond possibly generating some
+ * warning messages at the next transaction commit, so it's not
+ * worth fixing.
+ */
+ ct->refcount++;
ctlist = lcons(ct, ctlist);
nmembers++;
}
@@ -1677,10 +1523,7 @@ SearchCatCacheList(CatCache *cache,
cl->cl_magic = CL_MAGIC;
cl->my_cache = cache;
DLInitElem(&cl->cache_elem, (void *) cl);
- cl->refcount = 1; /* count this first reference */
- cl->prev_refcount = NULL;
- cl->numpushes = 0;
- cl->numalloc = 0;
+ cl->refcount = 0; /* for the moment */
cl->dead = false;
cl->ordered = ordered;
cl->nkeys = nkeys;
@@ -1704,6 +1547,11 @@ SearchCatCacheList(CatCache *cache,
CACHE3_elog(DEBUG2, "SearchCatCacheList(%s): made list of %d members",
cache->cc_relname, nmembers);
+ /* Finally, bump the list's refcount and return it */
+ ResourceOwnerEnlargeCatCacheListRefs(CurrentResourceOwner);
+ cl->refcount++;
+ ResourceOwnerRememberCatCacheListRef(CurrentResourceOwner, cl);
+
return cl;
}
@@ -1735,6 +1583,7 @@ ReleaseCatCacheList(CatCList *list)
}
list->refcount--;
+ ResourceOwnerForgetCatCacheListRef(CurrentResourceOwner, list);
if (list->refcount == 0
#ifndef CATCACHE_FORCE_RELEASE
@@ -1748,7 +1597,7 @@ ReleaseCatCacheList(CatCList *list)
/*
* CatalogCacheCreateEntry
* Create a new CatCTup entry, copying the given HeapTuple and other
- * supplied data into it. The new entry is given refcount 1.
+ * supplied data into it. The new entry initially has refcount 0.
*/
static CatCTup *
CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
@@ -1775,13 +1624,10 @@ CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
DLInitElem(&ct->lrulist_elem, (void *) ct);
DLInitElem(&ct->cache_elem, (void *) ct);
ct->c_list = NULL;
- ct->refcount = 1; /* count this first reference */
+ ct->refcount = 0; /* for the moment */
ct->dead = false;
ct->negative = negative;
ct->hash_value = hashValue;
- ct->prev_refcount = NULL;
- ct->numpushes = 0;
- ct->numalloc = 0;
DLAddHead(&CacheHdr->ch_lrulist, &ct->lrulist_elem);
DLAddHead(&cache->cc_bucket[hashIndex], &ct->cache_elem);
@@ -1791,8 +1637,8 @@ CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
/*
* If we've exceeded the desired size of the caches, try to throw away
- * the least recently used entry. NB: the newly-built entry cannot
- * get thrown away here, because it has positive refcount.
+ * the least recently used entry. NB: be careful not to throw away
+ * the newly-built entry...
*/
if (CacheHdr->ch_ntup > CacheHdr->ch_maxtup)
{
@@ -1805,7 +1651,7 @@ CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
prevelt = DLGetPred(elt);
- if (oldct->refcount == 0)
+ if (oldct->refcount == 0 && oldct != ct)
{
CACHE2_elog(DEBUG2, "CatCacheCreateEntry(%s): Overflow, LRU removal",
cache->cc_relname);
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 2342899272..c4787042c0 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.206 2004/07/01 00:51:17 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.207 2004/07/17 03:29:25 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -62,6 +62,7 @@
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
+#include "utils/resowner.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
@@ -273,8 +274,6 @@ static void IndexSupportInitialize(Form_pg_index iform,
static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
StrategyNumber numStrats,
StrategyNumber numSupport);
-static inline void RelationPushReferenceCount(Relation rel);
-static inline void RelationPopReferenceCount(Relation rel);
/*
@@ -830,16 +829,12 @@ RelationBuildDesc(RelationBuildDescInfo buildinfo,
RelationGetRelid(relation) = relid;
/*
- * initialize relation->rd_refcnt
- */
- RelationSetReferenceCount(relation, 1);
-
- /*
* normal relations are not nailed into the cache; nor can a
* pre-existing relation be new. It could be temp though. (Actually,
* it could be new too, but it's okay to forget that fact if forced to
* flush the entry.)
*/
+ relation->rd_refcnt = 0;
relation->rd_isnailed = 0;
relation->rd_isnew = false;
relation->rd_istemp = isTempNamespace(relation->rd_rel->relnamespace);
@@ -1280,9 +1275,9 @@ formrdesc(const char *relationName,
relation->rd_smgr = NULL;
/*
- * initialize reference count
+ * initialize reference count: 1 because it is nailed in cache
*/
- RelationSetReferenceCount(relation, 1);
+ relation->rd_refcnt = 1;
/*
* all entries built with this routine are nailed-in-cache; none are
@@ -1487,6 +1482,8 @@ RelationIdGetRelation(Oid relationId)
buildinfo.i.info_id = relationId;
rd = RelationBuildDesc(buildinfo, NULL);
+ if (RelationIsValid(rd))
+ RelationIncrementReferenceCount(rd);
return rd;
}
@@ -1516,6 +1513,8 @@ RelationSysNameGetRelation(const char *relationName)
buildinfo.i.info_name = (char *) relationName;
rd = RelationBuildDesc(buildinfo, NULL);
+ if (RelationIsValid(rd))
+ RelationIncrementReferenceCount(rd);
return rd;
}
@@ -1525,6 +1524,36 @@ RelationSysNameGetRelation(const char *relationName)
*/
/*
+ * RelationIncrementReferenceCount
+ * Increments relation reference count.
+ *
+ * Note: bootstrap mode has its own weird ideas about relation refcount
+ * behavior; we ought to fix it someday, but for now, just disable
+ * reference count ownership tracking in bootstrap mode.
+ */
+void
+RelationIncrementReferenceCount(Relation rel)
+{
+ ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
+ rel->rd_refcnt += 1;
+ if (!IsBootstrapProcessingMode())
+ ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
+}
+
+/*
+ * RelationDecrementReferenceCount
+ * Decrements relation reference count.
+ */
+void
+RelationDecrementReferenceCount(Relation rel)
+{
+ Assert(rel->rd_refcnt > 0);
+ rel->rd_refcnt -= 1;
+ if (!IsBootstrapProcessingMode())
+ ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
+}
+
+/*
* RelationClose - close an open relation
*
* Actually, we just decrement the refcount.
@@ -1680,8 +1709,6 @@ RelationClearRelation(Relation relation, bool rebuild)
list_free(relation->rd_indexlist);
if (relation->rd_indexcxt)
MemoryContextDelete(relation->rd_indexcxt);
- if (relation->rd_prevrefcnt)
- pfree(relation->rd_prevrefcnt);
/*
* If we're really done with the relcache entry, blow it away. But if
@@ -1704,6 +1731,10 @@ RelationClearRelation(Relation relation, bool rebuild)
* When rebuilding an open relcache entry, must preserve ref count
* and rd_isnew flag. Also attempt to preserve the tupledesc and
* rewrite-rule substructures in place.
+ *
+ * Note that this process does not touch CurrentResourceOwner;
+ * which is good because whatever ref counts the entry may have
+ * do not necessarily belong to that resource owner.
*/
int old_refcnt = relation->rd_refcnt;
bool old_isnew = relation->rd_isnew;
@@ -1726,7 +1757,7 @@ RelationClearRelation(Relation relation, bool rebuild)
elog(ERROR, "relation %u deleted while still in use",
buildinfo.i.info_id);
}
- RelationSetReferenceCount(relation, old_refcnt);
+ relation->rd_refcnt = old_refcnt;
relation->rd_isnew = old_isnew;
if (equalTupleDescs(old_att, relation->rd_att))
{
@@ -1964,7 +1995,7 @@ RelationCacheInvalidate(void)
/*
* AtEOXact_RelationCache
*
- * Clean up the relcache at transaction commit or abort.
+ * Clean up the relcache at main-transaction commit or abort.
*
* Note: this must be called *before* processing invalidation messages.
* In the case of abort, we don't want to try to rebuild any invalidated
@@ -2031,22 +2062,16 @@ AtEOXact_RelationCache(bool isCommit)
elog(WARNING, "relcache reference leak: relation \"%s\" has refcnt %d instead of %d",
RelationGetRelationName(relation),
relation->rd_refcnt, expected_refcnt);
- RelationSetReferenceCount(relation, expected_refcnt);
+ relation->rd_refcnt = expected_refcnt;
}
}
else
{
/* abort case, just reset it quietly */
- RelationSetReferenceCount(relation, expected_refcnt);
+ relation->rd_refcnt = expected_refcnt;
}
/*
- * Reset the refcount stack. Just drop the item count; don't deallocate
- * the stack itself so it can be reused by future subtransactions.
- */
- relation->rd_numpushed = 0;
-
- /*
* Flush any temporary index list.
*/
if (relation->rd_indexvalid == 2)
@@ -2059,131 +2084,6 @@ AtEOXact_RelationCache(bool isCommit)
}
/*
- * RelationPushReferenceCount
- *
- * Push the current reference count into the stack. Don't modify the
- * reference count itself.
- */
-static inline void
-RelationPushReferenceCount(Relation rel)
-{
- /* Enlarge the stack if we run out of space. */
- if (rel->rd_numpushed == rel->rd_numalloc)
- {
- MemoryContext old_cxt = MemoryContextSwitchTo(CacheMemoryContext);
-
- if (rel->rd_numalloc == 0)
- {
- rel->rd_numalloc = 8;
- rel->rd_prevrefcnt = palloc(rel->rd_numalloc * sizeof(int));
- }
- else
- {
- rel->rd_numalloc *= 2;
- rel->rd_prevrefcnt = repalloc(rel->rd_prevrefcnt, rel->rd_numalloc * sizeof(int));
- }
-
- MemoryContextSwitchTo(old_cxt);
- }
-
- rel->rd_prevrefcnt[rel->rd_numpushed++] = rel->rd_refcnt;
-}
-
-/*
- * RelationPopReferenceCount
- *
- * Pop the latest stored reference count. If there is none, drop it
- * to zero; the entry was created in the current subtransaction.
- */
-static inline void
-RelationPopReferenceCount(Relation rel)
-{
- if (rel->rd_numpushed == 0)
- {
- rel->rd_refcnt = rel->rd_isnailed ? 1 : 0;
- return;
- }
-
- rel->rd_refcnt = rel->rd_prevrefcnt[--rel->rd_numpushed];
-}
-
-/*
- * AtEOSubXact_RelationCache
- */
-void
-AtEOSubXact_RelationCache(bool isCommit)
-{
- HASH_SEQ_STATUS status;
- RelIdCacheEnt *idhentry;
-
- /* We'd better not be bootstrapping. */
- Assert(!IsBootstrapProcessingMode());
-
- hash_seq_init(&status, RelationIdCache);
-
- while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
- {
- Relation relation = idhentry->reldesc;
-
- /*
- * During subtransaction commit, we first check whether the
- * current refcount is correct: if there is no item in the stack,
- * the relcache entry was created during this subtransaction, it should
- * be 0 (or 1 for nailed relations). If the stack has at least one
- * item, the expected count is whatever that item is.
- */
- if (isCommit)
- {
- int expected_refcnt;
-
- if (relation->rd_numpushed == 0)
- expected_refcnt = relation->rd_isnailed ? 1 : 0;
- else
- expected_refcnt = relation->rd_prevrefcnt[relation->rd_numpushed - 1];
-
- if (relation->rd_refcnt != expected_refcnt)
- {
- elog(WARNING, "relcache reference leak: relation \"%s\" has refcnt %d instead of %d",
- RelationGetRelationName(relation),
- relation->rd_refcnt, expected_refcnt);
- }
- }
-
- /*
- * On commit, the expected count is stored so there's no harm in
- * popping it (and we may need to fix if there was a leak); and during
- * abort, the correct refcount has to be restored.
- */
- RelationPopReferenceCount(relation);
- }
-}
-
-/*
- * AtSubStart_RelationCache
- *
- * At subtransaction start, we push the current reference count into
- * the refcount stack, so it can be restored if the subtransaction aborts.
- */
-void
-AtSubStart_RelationCache(void)
-{
- HASH_SEQ_STATUS status;
- RelIdCacheEnt *idhentry;
-
- /* We'd better not be bootstrapping. */
- Assert(!IsBootstrapProcessingMode());
-
- hash_seq_init(&status, RelationIdCache);
-
- while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
- {
- Relation relation = idhentry->reldesc;
-
- RelationPushReferenceCount(relation);
- }
-}
-
-/*
* RelationBuildLocalRelation
* Build a relcache entry for an about-to-be-created relation,
* and enter it into the relcache.
@@ -2223,7 +2123,7 @@ RelationBuildLocalRelation(const char *relname,
/* make sure relation is marked as having no open file yet */
rel->rd_smgr = NULL;
- RelationSetReferenceCount(rel, 1);
+ rel->rd_refcnt = nailit ? 1 : 0;
/* it's being created in this transaction */
rel->rd_isnew = true;
@@ -2305,6 +2205,11 @@ RelationBuildLocalRelation(const char *relname,
*/
MemoryContextSwitchTo(oldcxt);
+ /*
+ * Caller expects us to pin the returned entry.
+ */
+ RelationIncrementReferenceCount(rel);
+
return rel;
}
@@ -2422,7 +2327,7 @@ RelationCacheInitializePhase2(void)
buildinfo.i.info_name = (indname); \
ird = RelationBuildDesc(buildinfo, NULL); \
ird->rd_isnailed = 1; \
- RelationSetReferenceCount(ird, 1); \
+ ird->rd_refcnt = 1; \
} while (0)
LOAD_CRIT_INDEX(ClassNameNspIndex);
@@ -3201,9 +3106,9 @@ load_relcache_init_file(void)
rel->rd_smgr = NULL;
rel->rd_targblock = InvalidBlockNumber;
if (rel->rd_isnailed)
- RelationSetReferenceCount(rel, 1);
+ rel->rd_refcnt = 1;
else
- RelationSetReferenceCount(rel, 0);
+ rel->rd_refcnt = 0;
rel->rd_indexvalid = 0;
rel->rd_indexlist = NIL;
MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c
index 466b2fc97b..2093dc2a46 100644
--- a/src/backend/utils/mmgr/portalmem.c
+++ b/src/backend/utils/mmgr/portalmem.c
@@ -12,7 +12,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/mmgr/portalmem.c,v 1.66 2004/07/01 00:51:29 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/mmgr/portalmem.c,v 1.67 2004/07/17 03:29:46 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -186,6 +186,10 @@ CreatePortal(const char *name, bool allowDup, bool dupSilent)
ALLOCSET_SMALL_INITSIZE,
ALLOCSET_SMALL_MAXSIZE);
+ /* create a resource owner for the portal */
+ portal->resowner = ResourceOwnerCreate(CurTransactionResourceOwner,
+ "Portal");
+
/* initialize portal fields that don't start off zero */
portal->cleanup = PortalCleanup;
portal->createXact = GetCurrentTransactionId();
@@ -291,17 +295,14 @@ PortalCreateHoldStore(Portal portal)
/*
* PortalDrop
* Destroy the portal.
- *
- * isError: if true, we are destroying portals at the end of a failed
- * transaction. (This causes PortalCleanup to skip unneeded steps.)
*/
void
-PortalDrop(Portal portal, bool isError)
+PortalDrop(Portal portal, bool isTopCommit)
{
AssertArg(PortalIsValid(portal));
/* Not sure if this case can validly happen or not... */
- if (portal->portalActive)
+ if (portal->status == PORTAL_ACTIVE)
elog(ERROR, "cannot drop active portal");
/*
@@ -314,7 +315,49 @@ PortalDrop(Portal portal, bool isError)
/* let portalcmds.c clean up the state it knows about */
if (PointerIsValid(portal->cleanup))
- (*portal->cleanup) (portal, isError);
+ (*portal->cleanup) (portal);
+
+ /*
+ * Release any resources still attached to the portal. There are
+ * several cases being covered here:
+ *
+ * Top transaction commit (indicated by isTopCommit): normally we should
+ * do nothing here and let the regular end-of-transaction resource
+ * releasing mechanism handle these resources too. However, if we have
+ * a FAILED portal (eg, a cursor that got an error), we'd better clean
+ * up its resources to avoid resource-leakage warning messages.
+ *
+ * Sub transaction commit: never comes here at all, since we don't
+ * kill any portals in AtSubCommit_Portals().
+ *
+ * Main or sub transaction abort: we will do nothing here because
+ * portal->resowner was already set NULL; the resources were already
+ * cleaned up in transaction abort.
+ *
+ * Ordinary portal drop: must release resources. However, if the portal
+ * is not FAILED then we do not release its locks. The locks become
+ * the responsibility of the transaction's ResourceOwner (since it is
+ * the parent of the portal's owner) and will be released when the
+ * transaction eventually ends.
+ */
+ if (portal->resowner &&
+ (!isTopCommit || portal->status == PORTAL_FAILED))
+ {
+ bool isCommit = (portal->status != PORTAL_FAILED);
+
+ ResourceOwnerRelease(portal->resowner,
+ RESOURCE_RELEASE_BEFORE_LOCKS,
+ isCommit, false);
+ ResourceOwnerRelease(portal->resowner,
+ RESOURCE_RELEASE_LOCKS,
+ isCommit, false);
+ ResourceOwnerRelease(portal->resowner,
+ RESOURCE_RELEASE_AFTER_LOCKS,
+ isCommit, false);
+ if (!isCommit)
+ ResourceOwnerDelete(portal->resowner);
+ }
+ portal->resowner = NULL;
/*
* Delete tuplestore if present. We should do this even under error
@@ -396,19 +439,29 @@ AtCommit_Portals(void)
/*
* Do not touch active portals --- this can only happen in the
* case of a multi-transaction utility command, such as VACUUM.
+ *
+ * Note however that any resource owner attached to such a portal
+ * is still going to go away, so don't leave a dangling pointer.
*/
- if (portal->portalActive)
+ if (portal->status == PORTAL_ACTIVE)
+ {
+ portal->resowner = NULL;
continue;
+ }
- if (portal->cursorOptions & CURSOR_OPT_HOLD)
- {
- /*
- * Do nothing to cursors held over from a previous
- * transaction.
- */
- if (portal->createXact != xact)
- continue;
+ /*
+ * Do nothing else to cursors held over from a previous
+ * transaction. (This test must include checking CURSOR_OPT_HOLD,
+ * else we will fail to clean up a VACUUM portal if it fails after
+ * its first sub-transaction.)
+ */
+ if (portal->createXact != xact &&
+ (portal->cursorOptions & CURSOR_OPT_HOLD))
+ continue;
+ if ((portal->cursorOptions & CURSOR_OPT_HOLD) &&
+ portal->status == PORTAL_READY)
+ {
/*
* We are exiting the transaction that created a holdable
* cursor. Instead of dropping the portal, prepare it for
@@ -420,11 +473,18 @@ AtCommit_Portals(void)
*/
PortalCreateHoldStore(portal);
PersistHoldablePortal(portal);
+
+ /*
+ * Any resources belonging to the portal will be released in the
+ * upcoming transaction-wide cleanup; the portal will no
+ * longer have its own resources.
+ */
+ portal->resowner = NULL;
}
else
{
/* Zap all non-holdable portals */
- PortalDrop(portal, false);
+ PortalDrop(portal, true);
}
}
}
@@ -432,13 +492,11 @@ AtCommit_Portals(void)
/*
* Abort processing for portals.
*
- * At this point we reset the "active" flags and run the cleanup hook if
+ * At this point we reset "active" status and run the cleanup hook if
* present, but we can't release memory until the cleanup call.
*
* The reason we need to reset active is so that we can replace the unnamed
- * portal, else we'll fail to execute ROLLBACK when it arrives. Also, we
- * want to run the cleanup hook now to be certain it knows that we had an
- * error abort and not successful conclusion.
+ * portal, else we'll fail to execute ROLLBACK when it arrives.
*/
void
AtAbort_Portals(void)
@@ -453,7 +511,8 @@ AtAbort_Portals(void)
{
Portal portal = hentry->portal;
- portal->portalActive = false;
+ if (portal->status == PORTAL_ACTIVE)
+ portal->status = PORTAL_FAILED;
/*
* Do nothing else to cursors held over from a previous
@@ -468,17 +527,22 @@ AtAbort_Portals(void)
/* let portalcmds.c clean up the state it knows about */
if (PointerIsValid(portal->cleanup))
{
- (*portal->cleanup) (portal, true);
+ (*portal->cleanup) (portal);
portal->cleanup = NULL;
}
+ /*
+ * Any resources belonging to the portal will be released in the
+ * upcoming transaction-wide cleanup; they will be gone before
+ * we run PortalDrop.
+ */
+ portal->resowner = NULL;
}
}
/*
* Post-abort cleanup for portals.
*
- * Delete all portals not held over from prior transactions.
- */
+ * Delete all portals not held over from prior transactions. */
void
AtCleanup_Portals(void)
{
@@ -492,10 +556,9 @@ AtCleanup_Portals(void)
{
Portal portal = hentry->portal;
- /*
- * Let's just make sure no one's active...
- */
- portal->portalActive = false;
+ /* AtAbort_Portals should have fixed these: */
+ Assert(portal->status != PORTAL_ACTIVE);
+ Assert(portal->resowner == NULL);
/*
* Do nothing else to cursors held over from a previous
@@ -507,8 +570,8 @@ AtCleanup_Portals(void)
(portal->cursorOptions & CURSOR_OPT_HOLD))
continue;
- /* Else zap it with prejudice. */
- PortalDrop(portal, true);
+ /* Else zap it. */
+ PortalDrop(portal, false);
}
}
@@ -516,11 +579,11 @@ AtCleanup_Portals(void)
* Pre-subcommit processing for portals.
*
* Reassign the portals created in the current subtransaction to the parent
- * transaction. (XXX perhaps we should reassign only holdable cursors,
- * and drop the rest?)
+ * transaction.
*/
void
-AtSubCommit_Portals(TransactionId parentXid)
+AtSubCommit_Portals(TransactionId parentXid,
+ ResourceOwner parentXactOwner)
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
@@ -533,19 +596,24 @@ AtSubCommit_Portals(TransactionId parentXid)
Portal portal = hentry->portal;
if (portal->createXact == curXid)
+ {
portal->createXact = parentXid;
+ if (portal->resowner)
+ ResourceOwnerNewParent(portal->resowner, parentXactOwner);
+ }
}
}
/*
* Subtransaction abort handling for portals.
*
- * Deactivate all portals created during the failed subtransaction.
+ * Deactivate failed portals created during the failed subtransaction.
* Note that per AtSubCommit_Portals, this will catch portals created
* in descendants of the subtransaction too.
*/
void
-AtSubAbort_Portals(void)
+AtSubAbort_Portals(TransactionId parentXid,
+ ResourceOwner parentXactOwner)
{
HASH_SEQ_STATUS status;
PortalHashEnt *hentry;
@@ -560,13 +628,39 @@ AtSubAbort_Portals(void)
if (portal->createXact != curXid)
continue;
- portal->portalActive = false;
+ /*
+ * Force any active portals of my own transaction into FAILED state.
+ * This is mostly to ensure that a portal running a FETCH will go
+ * FAILED if the underlying cursor fails. (Note we do NOT want to
+ * do this to upper-level portals, since they may be able to continue.)
+ */
+ if (portal->status == PORTAL_ACTIVE)
+ portal->status = PORTAL_FAILED;
- /* let portalcmds.c clean up the state it knows about */
- if (PointerIsValid(portal->cleanup))
+ /*
+ * If the portal is READY then allow it to survive into the
+ * parent transaction; otherwise shut it down.
+ */
+ if (portal->status == PORTAL_READY)
{
- (*portal->cleanup) (portal, true);
- portal->cleanup = NULL;
+ portal->createXact = parentXid;
+ if (portal->resowner)
+ ResourceOwnerNewParent(portal->resowner, parentXactOwner);
+ }
+ else
+ {
+ /* let portalcmds.c clean up the state it knows about */
+ if (PointerIsValid(portal->cleanup))
+ {
+ (*portal->cleanup) (portal);
+ portal->cleanup = NULL;
+ }
+ /*
+ * Any resources belonging to the portal will be released in the
+ * upcoming transaction-wide cleanup; they will be gone before
+ * we run PortalDrop.
+ */
+ portal->resowner = NULL;
}
}
}
@@ -574,8 +668,8 @@ AtSubAbort_Portals(void)
/*
* Post-subabort cleanup for portals.
*
- * Drop all portals created in the finishing subtransaction and all
- * its descendants.
+ * Drop all portals created in the failed subtransaction (but note that
+ * we will not drop any that were reassigned to the parent above).
*/
void
AtSubCleanup_Portals(void)
@@ -593,12 +687,11 @@ AtSubCleanup_Portals(void)
if (portal->createXact != curXid)
continue;
- /*
- * Let's just make sure no one's active...
- */
- portal->portalActive = false;
+ /* AtSubAbort_Portals should have fixed these: */
+ Assert(portal->status != PORTAL_ACTIVE);
+ Assert(portal->resowner == NULL);
- /* Zap it with prejudice. */
- PortalDrop(portal, true);
+ /* Zap it. */
+ PortalDrop(portal, false);
}
}
diff --git a/src/backend/utils/resowner/Makefile b/src/backend/utils/resowner/Makefile
new file mode 100644
index 0000000000..691d316f16
--- /dev/null
+++ b/src/backend/utils/resowner/Makefile
@@ -0,0 +1,30 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for utils/resowner
+#
+# IDENTIFICATION
+# $PostgreSQL: pgsql/src/backend/utils/resowner/Makefile,v 1.1 2004/07/17 03:30:10 tgl Exp $
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/utils/resowner
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = resowner.o
+
+all: SUBSYS.o
+
+SUBSYS.o: $(OBJS)
+ $(LD) $(LDREL) $(LDOUT) SUBSYS.o $(OBJS)
+
+depend dep:
+ $(CC) -MM $(CFLAGS) *.c >depend
+
+clean:
+ rm -f SUBSYS.o $(OBJS)
+
+ifeq (depend,$(wildcard depend))
+include depend
+endif
diff --git a/src/backend/utils/resowner/README b/src/backend/utils/resowner/README
new file mode 100644
index 0000000000..27180f6aff
--- /dev/null
+++ b/src/backend/utils/resowner/README
@@ -0,0 +1,74 @@
+$PostgreSQL: pgsql/src/backend/utils/resowner/README,v 1.1 2004/07/17 03:30:10 tgl Exp $
+
+Notes about resource owners
+---------------------------
+
+ResourceOwner objects are a concept invented to simplify management of
+query-related resources, such as buffer pins and table locks. These
+resources need to be tracked in a reliable way to ensure that they will
+be released at query end, even if the query fails due to an error.
+Rather than expecting the entire executor to have bulletproof data
+structures, we localize the tracking of such resources into a single
+module.
+
+The design of the ResourceOwner API is modeled on our MemoryContext API,
+which has proven very flexible and successful in preventing memory leaks.
+In particular we allow ResourceOwners to have child ResourceOwner objects
+so that there can be forests of the things; releasing a parent
+ResourceOwner acts on all its direct and indirect children as well.
+
+(It is tempting to consider unifying ResourceOwners and MemoryContexts
+into a single object type, but their usage patterns are sufficiently
+different that this is probably not really a helpful thing to do.)
+
+We create a ResourceOwner for each transaction or subtransaction as
+well as one for each Portal. During execution of a Portal, the global
+variable CurrentResourceOwner points to the Portal's ResourceOwner.
+This causes operations such as ReadBuffer and LockAcquire to record
+ownership of the acquired resources in that ResourceOwner object.
+
+When a Portal is closed, any remaining resources (typically only locks)
+become the responsibility of the current transaction. This is represented
+by making the Portal's ResourceOwner a child of the current transaction's
+ResourceOwner. Similarly, subtransaction ResourceOwners are children of
+their immediate parent.
+
+We need transaction-related ResourceOwners as well as Portal-related ones
+because transactions may initiate operations that require resources (such
+as query parsing) when no associated Portal exists yet.
+
+
+API overview
+------------
+
+The basic operations on a ResourceOwner are:
+
+* create a ResourceOwner
+
+* associate or deassociate some resource with a ResourceOwner
+
+* release a ResourceOwner's assets (free all owned resources, but not the
+ owner object itself)
+
+* delete a ResourceOwner (including child owner objects); all resources
+ must have been released beforehand
+
+Currently, ResourceOwners contain direct support for recording ownership
+of buffer pins, lmgr locks, and catcache and relcache references. Other
+objects can be associated with a ResourceOwner by recording the address of
+the owning ResourceOwner in such an object. There is an API for other
+modules to get control during ResourceOwner release, so that they can scan
+their own data structures to find the objects that need to be deleted.
+
+Whenever we are inside a transaction, the global variable
+CurrentResourceOwner shows which resource owner should be assigned
+ownership of acquired resources. Note however that CurrentResourceOwner
+is NULL when not inside any transaction (or when inside a failed
+transaction). In this case it is not valid to acquire query-lifespan
+resources.
+
+When unpinning a buffer or releasing a lock or cache reference,
+CurrentResourceOwner must point to the same resource owner that was current
+when the buffer, lock, or cache reference was acquired. It would be possible
+to relax this restriction given additional bookkeeping effort, but at present
+there seems no need.
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
new file mode 100644
index 0000000000..e2eb1183ef
--- /dev/null
+++ b/src/backend/utils/resowner/resowner.c
@@ -0,0 +1,840 @@
+/*-------------------------------------------------------------------------
+ *
+ * resowner.c
+ * POSTGRES resource owner management code.
+ *
+ * Query-lifespan resources are tracked by associating them with
+ * ResourceOwner objects. This provides a simple mechanism for ensuring
+ * that such resources are freed at the right time.
+ * See utils/resowner/README for more info.
+ *
+ *
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/utils/resowner/resowner.c,v 1.1 2004/07/17 03:30:10 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "utils/resowner.h"
+#include "access/gistscan.h"
+#include "access/hash.h"
+#include "access/rtree.h"
+#include "storage/bufmgr.h"
+#include "storage/proc.h"
+#include "utils/memutils.h"
+#include "utils/relcache.h"
+
+
+/*
+ * Info needed to identify/release a lock
+ */
+typedef struct LockIdData
+{
+ /* we assume lockmethodid is part of locktag */
+ LOCKTAG locktag;
+ TransactionId xid;
+ LOCKMODE lockmode;
+} LockIdData;
+
+
+/*
+ * ResourceOwner objects look like this
+ */
+typedef struct ResourceOwnerData
+{
+ ResourceOwner parent; /* NULL if no parent (toplevel owner) */
+ ResourceOwner firstchild; /* head of linked list of children */
+ ResourceOwner nextchild; /* next child of same parent */
+ const char *name; /* name (just for debugging) */
+
+ /* We have built-in support for remembering owned buffers */
+ int nbuffers; /* number of owned buffer pins */
+ Buffer *buffers; /* dynamically allocated array */
+ int maxbuffers; /* currently allocated array size */
+
+ /* We have built-in support for remembering owned locks */
+ int nlocks; /* number of owned locks */
+ LockIdData *locks; /* dynamically allocated array */
+ int maxlocks; /* currently allocated array size */
+
+ /* We have built-in support for remembering catcache references */
+ int ncatrefs; /* number of owned catcache pins */
+ HeapTuple *catrefs; /* dynamically allocated array */
+ int maxcatrefs; /* currently allocated array size */
+
+ int ncatlistrefs; /* number of owned catcache-list pins */
+ CatCList **catlistrefs; /* dynamically allocated array */
+ int maxcatlistrefs; /* currently allocated array size */
+
+ /* We have built-in support for remembering relcache references */
+ int nrelrefs; /* number of owned relcache pins */
+ Relation *relrefs; /* dynamically allocated array */
+ int maxrelrefs; /* currently allocated array size */
+} ResourceOwnerData;
+
+
+/*****************************************************************************
+ * GLOBAL MEMORY *
+ *****************************************************************************/
+
+ResourceOwner CurrentResourceOwner = NULL;
+ResourceOwner CurTransactionResourceOwner = NULL;
+ResourceOwner TopTransactionResourceOwner = NULL;
+
+/*
+ * List of add-on callbacks for resource releasing
+ */
+typedef struct ResourceReleaseCallbackItem
+{
+ struct ResourceReleaseCallbackItem *next;
+ ResourceReleaseCallback callback;
+ void *arg;
+} ResourceReleaseCallbackItem;
+
+static ResourceReleaseCallbackItem *ResourceRelease_callbacks = NULL;
+
+
+/*****************************************************************************
+ * EXPORTED ROUTINES *
+ *****************************************************************************/
+
+
+/*
+ * ResourceOwnerCreate
+ * Create an empty ResourceOwner.
+ *
+ * All ResourceOwner objects are kept in TopMemoryContext, since they should
+ * only be freed explicitly.
+ */
+ResourceOwner
+ResourceOwnerCreate(ResourceOwner parent, const char *name)
+{
+ ResourceOwner owner;
+
+ owner = (ResourceOwner) MemoryContextAllocZero(TopMemoryContext,
+ sizeof(ResourceOwnerData));
+ owner->name = name;
+
+ if (parent)
+ {
+ owner->parent = parent;
+ owner->nextchild = parent->firstchild;
+ parent->firstchild = owner;
+ }
+
+ return owner;
+}
+
+/*
+ * ResourceOwnerRelease
+ * Release all resources owned by a ResourceOwner and its descendants,
+ * but don't delete the owner objects themselves.
+ *
+ * Note that this executes just one phase of release, and so typically
+ * must be called three times. We do it this way because (a) we want to
+ * do all the recursion separately for each phase, thereby preserving
+ * the needed order of operations; and (b) xact.c may have other operations
+ * to do between the phases.
+ *
+ * phase: release phase to execute
+ * isCommit: true for successful completion of a query or transaction,
+ * false for unsuccessful
+ * isTopLevel: true if completing a main transaction, else false
+ *
+ * isCommit is passed because some modules may expect that their resources
+ * were all released already if the transaction or portal finished normally.
+ * If so it is reasonable to give a warning (NOT an error) should any
+ * unreleased resources be present. When isCommit is false, such warnings
+ * are generally inappropriate.
+ *
+ * isTopLevel is passed when we are releasing TopTransactionResourceOwner
+ * at completion of a main transaction. This generally means that *all*
+ * resources will be released, and so we can optimize things a bit.
+ */
+void
+ResourceOwnerRelease(ResourceOwner owner,
+ ResourceReleasePhase phase,
+ bool isCommit,
+ bool isTopLevel)
+{
+ ResourceOwner child;
+ ResourceOwner save;
+ ResourceReleaseCallbackItem *item;
+
+ /* Recurse to handle descendants */
+ for (child = owner->firstchild; child != NULL; child = child->nextchild)
+ ResourceOwnerRelease(child, phase, isCommit, isTopLevel);
+
+ /*
+ * Make CurrentResourceOwner point to me, so that ReleaseBuffer etc
+ * don't get confused.
+ */
+ save = CurrentResourceOwner;
+ CurrentResourceOwner = owner;
+
+ if (phase == RESOURCE_RELEASE_BEFORE_LOCKS)
+ {
+ /* Release buffer pins */
+ if (isTopLevel)
+ {
+ /*
+ * For a top-level xact we are going to release all buffers,
+ * so just do a single bufmgr call at the top of the recursion.
+ */
+ if (owner == TopTransactionResourceOwner)
+ AtEOXact_Buffers(isCommit);
+ /* Mark object as owning no buffers, just for sanity */
+ owner->nbuffers = 0;
+ }
+ else
+ {
+ /*
+ * Release buffers retail. Note that ReleaseBuffer will remove
+ * the buffer entry from my list, so I just have to iterate till
+ * there are none.
+ *
+ * XXX this is fairly inefficient due to multiple BufMgrLock grabs
+ * if there are lots of buffers to be released, but we don't
+ * expect many (indeed none in the success case) so it's probably
+ * not worth optimizing.
+ *
+ * We are however careful to release back-to-front, so as to
+ * avoid O(N^2) behavior in ResourceOwnerForgetBuffer().
+ */
+ while (owner->nbuffers > 0)
+ ReleaseBuffer(owner->buffers[owner->nbuffers - 1]);
+ }
+ /* Release relcache references */
+ if (isTopLevel)
+ {
+ /*
+ * For a top-level xact we are going to release all references,
+ * so just do a single relcache call at the top of the recursion.
+ */
+ if (owner == TopTransactionResourceOwner)
+ AtEOXact_RelationCache(isCommit);
+ /* Mark object as owning no relrefs, just for sanity */
+ owner->nrelrefs = 0;
+ }
+ else
+ {
+ /*
+ * Release relcache refs retail. Note that RelationClose will
+ * remove the relref entry from my list, so I just have to iterate
+ * till there are none.
+ */
+ while (owner->nrelrefs > 0)
+ RelationClose(owner->relrefs[owner->nrelrefs - 1]);
+ }
+ }
+ else if (phase == RESOURCE_RELEASE_LOCKS)
+ {
+ if (isTopLevel)
+ {
+ /*
+ * For a top-level xact we are going to release all locks (or at
+ * least all non-session locks), so just do a single lmgr call
+ * at the top of the recursion.
+ */
+ if (owner == TopTransactionResourceOwner)
+ ProcReleaseLocks(isCommit);
+ /* Mark object as holding no locks, just for sanity */
+ owner->nlocks = 0;
+ }
+ else if (!isCommit)
+ {
+ /*
+ * Release locks retail. Note that LockRelease will remove
+ * the lock entry from my list, so I just have to iterate till
+ * there are none. Also note that if we are committing a
+ * subtransaction, we do NOT release its locks yet.
+ *
+ * XXX as above, this is a bit inefficient but probably not worth
+ * the trouble to optimize more.
+ */
+ while (owner->nlocks > 0)
+ {
+ LockIdData *lockid = &owner->locks[owner->nlocks - 1];
+
+ LockRelease(lockid->locktag.lockmethodid,
+ &lockid->locktag,
+ lockid->xid,
+ lockid->lockmode);
+ }
+ }
+ }
+ else if (phase == RESOURCE_RELEASE_AFTER_LOCKS)
+ {
+ /* Release catcache references */
+ if (isTopLevel)
+ {
+ /*
+ * For a top-level xact we are going to release all references,
+ * so just do a single catcache call at the top of the recursion.
+ */
+ if (owner == TopTransactionResourceOwner)
+ AtEOXact_CatCache(isCommit);
+ /* Mark object as owning no catrefs, just for sanity */
+ owner->ncatrefs = 0;
+ owner->ncatlistrefs = 0;
+ }
+ else
+ {
+ /*
+ * Release catcache refs retail. Note that ReleaseCatCache will
+ * remove the catref entry from my list, so I just have to iterate
+ * till there are none. Ditto for catcache lists.
+ */
+ while (owner->ncatrefs > 0)
+ ReleaseCatCache(owner->catrefs[owner->ncatrefs - 1]);
+ while (owner->ncatlistrefs > 0)
+ ReleaseCatCacheList(owner->catlistrefs[owner->ncatlistrefs - 1]);
+ }
+ /* Clean up index scans too */
+ ReleaseResources_gist();
+ ReleaseResources_hash();
+ ReleaseResources_rtree();
+ }
+
+ /* Let add-on modules get a chance too */
+ for (item = ResourceRelease_callbacks; item; item = item->next)
+ (*item->callback) (phase, isCommit, isTopLevel, item->arg);
+
+ CurrentResourceOwner = save;
+}
+
+/*
+ * ResourceOwnerDelete
+ * Delete an owner object and its descendants.
+ *
+ * The caller must have already released all resources in the object tree.
+ */
+void
+ResourceOwnerDelete(ResourceOwner owner)
+{
+ /* We had better not be deleting CurrentResourceOwner ... */
+ Assert(owner != CurrentResourceOwner);
+
+ /* And it better not own any resources, either */
+ Assert(owner->nbuffers == 0);
+ Assert(owner->nlocks == 0);
+ Assert(owner->ncatrefs == 0);
+ Assert(owner->ncatlistrefs == 0);
+ Assert(owner->nrelrefs == 0);
+
+ /*
+ * Delete children. The recursive call will delink the child
+ * from me, so just iterate as long as there is a child.
+ */
+ while (owner->firstchild != NULL)
+ ResourceOwnerDelete(owner->firstchild);
+
+ /*
+ * We delink the owner from its parent before deleting it, so that
+ * if there's an error we won't have deleted/busted owners still
+ * attached to the owner tree. Better a leak than a crash.
+ */
+ ResourceOwnerNewParent(owner, NULL);
+
+ /* And free the object. */
+ if (owner->buffers)
+ pfree(owner->buffers);
+ if (owner->locks)
+ pfree(owner->locks);
+ if (owner->catrefs)
+ pfree(owner->catrefs);
+ if (owner->catlistrefs)
+ pfree(owner->catlistrefs);
+ if (owner->relrefs)
+ pfree(owner->relrefs);
+
+ pfree(owner);
+}
+
+/*
+ * Reassign a ResourceOwner to have a new parent
+ */
+void
+ResourceOwnerNewParent(ResourceOwner owner,
+ ResourceOwner newparent)
+{
+ ResourceOwner oldparent = owner->parent;
+
+ if (oldparent)
+ {
+ if (owner == oldparent->firstchild)
+ oldparent->firstchild = owner->nextchild;
+ else
+ {
+ ResourceOwner child;
+
+ for (child = oldparent->firstchild; child; child = child->nextchild)
+ {
+ if (owner == child->nextchild)
+ {
+ child->nextchild = owner->nextchild;
+ break;
+ }
+ }
+ }
+ }
+
+ if (newparent)
+ {
+ Assert(owner != newparent);
+ owner->parent = newparent;
+ owner->nextchild = newparent->firstchild;
+ newparent->firstchild = owner;
+ }
+ else
+ {
+ owner->parent = NULL;
+ owner->nextchild = NULL;
+ }
+}
+
+/*
+ * Register or deregister callback functions for resource cleanup
+ *
+ * These functions are intended for use by dynamically loaded modules.
+ * For built-in modules we generally just hardwire the appropriate calls.
+ *
+ * Note that the callback occurs post-commit or post-abort, so the callback
+ * functions can only do noncritical cleanup.
+ */
+void
+RegisterResourceReleaseCallback(ResourceReleaseCallback callback, void *arg)
+{
+ ResourceReleaseCallbackItem *item;
+
+ item = (ResourceReleaseCallbackItem *)
+ MemoryContextAlloc(TopMemoryContext,
+ sizeof(ResourceReleaseCallbackItem));
+ item->callback = callback;
+ item->arg = arg;
+ item->next = ResourceRelease_callbacks;
+ ResourceRelease_callbacks = item;
+}
+
+void
+UnregisterResourceReleaseCallback(ResourceReleaseCallback callback, void *arg)
+{
+ ResourceReleaseCallbackItem *item;
+ ResourceReleaseCallbackItem *prev;
+
+ prev = NULL;
+ for (item = ResourceRelease_callbacks; item; prev = item, item = item->next)
+ {
+ if (item->callback == callback && item->arg == arg)
+ {
+ if (prev)
+ prev->next = item->next;
+ else
+ ResourceRelease_callbacks = item->next;
+ pfree(item);
+ break;
+ }
+ }
+}
+
+
+/*
+ * Make sure there is room for at least one more entry in a ResourceOwner's
+ * buffer array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ *
+ * We allow the case owner == NULL because the bufmgr is sometimes invoked
+ * outside any transaction (for example, in the bgwriter).
+ */
+void
+ResourceOwnerEnlargeBuffers(ResourceOwner owner)
+{
+ int newmax;
+
+ if (owner == NULL ||
+ owner->nbuffers < owner->maxbuffers)
+ return; /* nothing to do */
+
+ if (owner->buffers == NULL)
+ {
+ newmax = 16;
+ owner->buffers = (Buffer *)
+ MemoryContextAlloc(TopMemoryContext, newmax * sizeof(Buffer));
+ owner->maxbuffers = newmax;
+ }
+ else
+ {
+ newmax = owner->maxbuffers * 2;
+ owner->buffers = (Buffer *)
+ repalloc(owner->buffers, newmax * sizeof(Buffer));
+ owner->maxbuffers = newmax;
+ }
+}
+
+/*
+ * Remember that a buffer pin is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeBuffers()
+ *
+ * We allow the case owner == NULL because the bufmgr is sometimes invoked
+ * outside any transaction (for example, in the bgwriter).
+ */
+void
+ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer)
+{
+ if (owner != NULL)
+ {
+ Assert(owner->nbuffers < owner->maxbuffers);
+ owner->buffers[owner->nbuffers] = buffer;
+ owner->nbuffers++;
+ }
+}
+
+/*
+ * Forget that a buffer pin is owned by a ResourceOwner
+ *
+ * We allow the case owner == NULL because the bufmgr is sometimes invoked
+ * outside any transaction (for example, in the bgwriter).
+ */
+void
+ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer)
+{
+ if (owner != NULL)
+ {
+ Buffer *buffers = owner->buffers;
+ int nb1 = owner->nbuffers - 1;
+ int i;
+
+ /*
+ * Scan back-to-front because it's more likely we are releasing
+ * a recently pinned buffer. This isn't always the case of course,
+ * but it's the way to bet.
+ */
+ for (i = nb1; i >= 0; i--)
+ {
+ if (buffers[i] == buffer)
+ {
+ while (i < nb1)
+ {
+ buffers[i] = buffers[i + 1];
+ i++;
+ }
+ owner->nbuffers = nb1;
+ return;
+ }
+ }
+ elog(ERROR, "buffer %d is not owned by resource owner %s",
+ buffer, owner->name);
+ }
+}
+
+/*
+ * Make sure there is room for at least one more entry in a ResourceOwner's
+ * lock array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeLocks(ResourceOwner owner)
+{
+ int newmax;
+
+ if (owner->nlocks < owner->maxlocks)
+ return; /* nothing to do */
+
+ if (owner->locks == NULL)
+ {
+ newmax = 16;
+ owner->locks = (LockIdData *)
+ MemoryContextAlloc(TopMemoryContext, newmax * sizeof(LockIdData));
+ owner->maxlocks = newmax;
+ }
+ else
+ {
+ newmax = owner->maxlocks * 2;
+ owner->locks = (LockIdData *)
+ repalloc(owner->locks, newmax * sizeof(LockIdData));
+ owner->maxlocks = newmax;
+ }
+}
+
+/*
+ * Remember that a lock is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeLocks()
+ */
+void
+ResourceOwnerRememberLock(ResourceOwner owner,
+ LOCKTAG *locktag,
+ TransactionId xid,
+ LOCKMODE lockmode)
+{
+ /* Session locks and user locks are not transactional */
+ if (xid != InvalidTransactionId &&
+ locktag->lockmethodid == DEFAULT_LOCKMETHOD)
+ {
+ Assert(owner->nlocks < owner->maxlocks);
+ owner->locks[owner->nlocks].locktag = *locktag;
+ owner->locks[owner->nlocks].xid = xid;
+ owner->locks[owner->nlocks].lockmode = lockmode;
+ owner->nlocks++;
+ }
+}
+
+/*
+ * Forget that a lock is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetLock(ResourceOwner owner,
+ LOCKTAG *locktag,
+ TransactionId xid,
+ LOCKMODE lockmode)
+{
+ /* Session locks and user locks are not transactional */
+ if (xid != InvalidTransactionId &&
+ locktag->lockmethodid == DEFAULT_LOCKMETHOD)
+ {
+ LockIdData *locks = owner->locks;
+ int nl1 = owner->nlocks - 1;
+ int i;
+
+ for (i = nl1; i >= 0; i--)
+ {
+ if (memcmp(&locks[i].locktag, locktag, sizeof(LOCKTAG)) == 0 &&
+ locks[i].xid == xid &&
+ locks[i].lockmode == lockmode)
+ {
+ while (i < nl1)
+ {
+ locks[i] = locks[i + 1];
+ i++;
+ }
+ owner->nlocks = nl1;
+ return;
+ }
+ }
+ elog(ERROR, "lock %u/%u/%u is not owned by resource owner %s",
+ locktag->relId, locktag->dbId, locktag->objId.xid, owner->name);
+ }
+}
+
+/*
+ * Make sure there is room for at least one more entry in a ResourceOwner's
+ * catcache reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeCatCacheRefs(ResourceOwner owner)
+{
+ int newmax;
+
+ if (owner->ncatrefs < owner->maxcatrefs)
+ return; /* nothing to do */
+
+ if (owner->catrefs == NULL)
+ {
+ newmax = 16;
+ owner->catrefs = (HeapTuple *)
+ MemoryContextAlloc(TopMemoryContext, newmax * sizeof(HeapTuple));
+ owner->maxcatrefs = newmax;
+ }
+ else
+ {
+ newmax = owner->maxcatrefs * 2;
+ owner->catrefs = (HeapTuple *)
+ repalloc(owner->catrefs, newmax * sizeof(HeapTuple));
+ owner->maxcatrefs = newmax;
+ }
+}
+
+/*
+ * Remember that a catcache reference is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeCatCacheRefs()
+ */
+void
+ResourceOwnerRememberCatCacheRef(ResourceOwner owner, HeapTuple tuple)
+{
+ Assert(owner->ncatrefs < owner->maxcatrefs);
+ owner->catrefs[owner->ncatrefs] = tuple;
+ owner->ncatrefs++;
+}
+
+/*
+ * Forget that a catcache reference is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetCatCacheRef(ResourceOwner owner, HeapTuple tuple)
+{
+ HeapTuple *catrefs = owner->catrefs;
+ int nc1 = owner->ncatrefs - 1;
+ int i;
+
+ for (i = nc1; i >= 0; i--)
+ {
+ if (catrefs[i] == tuple)
+ {
+ while (i < nc1)
+ {
+ catrefs[i] = catrefs[i + 1];
+ i++;
+ }
+ owner->ncatrefs = nc1;
+ return;
+ }
+ }
+ elog(ERROR, "catcache reference %p is not owned by resource owner %s",
+ tuple, owner->name);
+}
+
+/*
+ * Make sure there is room for at least one more entry in a ResourceOwner's
+ * catcache-list reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeCatCacheListRefs(ResourceOwner owner)
+{
+ int newmax;
+
+ if (owner->ncatlistrefs < owner->maxcatlistrefs)
+ return; /* nothing to do */
+
+ if (owner->catlistrefs == NULL)
+ {
+ newmax = 16;
+ owner->catlistrefs = (CatCList **)
+ MemoryContextAlloc(TopMemoryContext, newmax * sizeof(CatCList *));
+ owner->maxcatlistrefs = newmax;
+ }
+ else
+ {
+ newmax = owner->maxcatlistrefs * 2;
+ owner->catlistrefs = (CatCList **)
+ repalloc(owner->catlistrefs, newmax * sizeof(CatCList *));
+ owner->maxcatlistrefs = newmax;
+ }
+}
+
+/*
+ * Remember that a catcache-list reference is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeCatCacheListRefs()
+ */
+void
+ResourceOwnerRememberCatCacheListRef(ResourceOwner owner, CatCList *list)
+{
+ Assert(owner->ncatlistrefs < owner->maxcatlistrefs);
+ owner->catlistrefs[owner->ncatlistrefs] = list;
+ owner->ncatlistrefs++;
+}
+
+/*
+ * Forget that a catcache-list reference is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetCatCacheListRef(ResourceOwner owner, CatCList *list)
+{
+ CatCList **catlistrefs = owner->catlistrefs;
+ int nc1 = owner->ncatlistrefs - 1;
+ int i;
+
+ for (i = nc1; i >= 0; i--)
+ {
+ if (catlistrefs[i] == list)
+ {
+ while (i < nc1)
+ {
+ catlistrefs[i] = catlistrefs[i + 1];
+ i++;
+ }
+ owner->ncatlistrefs = nc1;
+ return;
+ }
+ }
+ elog(ERROR, "catcache list reference %p is not owned by resource owner %s",
+ list, owner->name);
+}
+
+/*
+ * Make sure there is room for at least one more entry in a ResourceOwner's
+ * relcache reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeRelationRefs(ResourceOwner owner)
+{
+ int newmax;
+
+ if (owner->nrelrefs < owner->maxrelrefs)
+ return; /* nothing to do */
+
+ if (owner->relrefs == NULL)
+ {
+ newmax = 16;
+ owner->relrefs = (Relation *)
+ MemoryContextAlloc(TopMemoryContext, newmax * sizeof(Relation));
+ owner->maxrelrefs = newmax;
+ }
+ else
+ {
+ newmax = owner->maxrelrefs * 2;
+ owner->relrefs = (Relation *)
+ repalloc(owner->relrefs, newmax * sizeof(Relation));
+ owner->maxrelrefs = newmax;
+ }
+}
+
+/*
+ * Remember that a relcache reference is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeRelationRefs()
+ */
+void
+ResourceOwnerRememberRelationRef(ResourceOwner owner, Relation rel)
+{
+ Assert(owner->nrelrefs < owner->maxrelrefs);
+ owner->relrefs[owner->nrelrefs] = rel;
+ owner->nrelrefs++;
+}
+
+/*
+ * Forget that a relcache reference is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetRelationRef(ResourceOwner owner, Relation rel)
+{
+ Relation *relrefs = owner->relrefs;
+ int nr1 = owner->nrelrefs - 1;
+ int i;
+
+ for (i = nr1; i >= 0; i--)
+ {
+ if (relrefs[i] == rel)
+ {
+ while (i < nr1)
+ {
+ relrefs[i] = relrefs[i + 1];
+ i++;
+ }
+ owner->nrelrefs = nr1;
+ return;
+ }
+ }
+ elog(ERROR, "relcache reference %s is not owned by resource owner %s",
+ RelationGetRelationName(rel), owner->name);
+}