Diffstat (limited to 'src/backend/access/nbtree/nbtutils.c')
| Mode | File | Lines changed |
| --- | --- | --- |
| -rw-r--r-- | src/backend/access/nbtree/nbtutils.c | 201 |

1 file changed, 195 insertions(+), 6 deletions(-)
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 74d0605258..5db8f55288 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -8,17 +8,20 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtutils.c,v 1.73 2006/05/07 01:21:30 tgl Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtutils.c,v 1.74 2006/05/08 00:00:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
+#include <time.h>
+
 #include "access/genam.h"
 #include "access/nbtree.h"
 #include "catalog/catalog.h"
 #include "executor/execdebug.h"
+#include "miscadmin.h"
 
 
 static void _bt_mark_scankey_required(ScanKey skey);
@@ -820,11 +823,13 @@ _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc,
  * delete.  We cope with cases where items have moved right due to insertions.
  * If an item has moved off the current page due to a split, we'll fail to
  * find it and do nothing (this is not an error case --- we assume the item
- * will eventually get marked in a future indexscan).  Likewise, if the item
- * has moved left due to deletions or disappeared itself, we'll not find it,
- * but these cases are not worth optimizing.  (Since deletions are only done
- * by VACUUM, any deletion makes it highly likely that the dead item has been
- * removed itself, and therefore searching left is not worthwhile.)
+ * will eventually get marked in a future indexscan).  Note that because we
+ * hold pin on the target page continuously from initially reading the items
+ * until applying this function, VACUUM cannot have deleted any items from
+ * the page, and so there is no need to search left from the recorded offset.
+ * (This observation also guarantees that the item is still the right one
+ * to delete, which might otherwise be questionable since heap TIDs can get
+ * recycled.)
  */
 void
 _bt_killitems(IndexScanDesc scan, bool haveLock)
@@ -890,3 +895,187 @@ _bt_killitems(IndexScanDesc scan, bool haveLock)
 	 */
 	so->numKilled = 0;
 }
+
+
+/*
+ * The following routines manage a shared-memory area in which we track
+ * assignment of "vacuum cycle IDs" to currently-active btree vacuuming
+ * operations.  There is a single counter which increments each time we
+ * start a vacuum to assign it a cycle ID.  Since multiple vacuums could
+ * be active concurrently, we have to track the cycle ID for each active
+ * vacuum; this requires at most MaxBackends entries (usually far fewer).
+ * We assume at most one vacuum can be active for a given index.
+ *
+ * Access to the shared memory area is controlled by BtreeVacuumLock.
+ * In principle we could use a separate lmgr locktag for each index,
+ * but a single LWLock is much cheaper, and given the short time that
+ * the lock is ever held, the concurrency hit should be minimal.
+ */
+
+typedef struct BTOneVacInfo
+{
+	LockRelId	relid;			/* global identifier of an index */
+	BTCycleId	cycleid;		/* cycle ID for its active VACUUM */
+} BTOneVacInfo;
+
+typedef struct BTVacInfo
+{
+	BTCycleId	cycle_ctr;		/* cycle ID most recently assigned */
+	int			num_vacuums;	/* number of currently active VACUUMs */
+	int			max_vacuums;	/* allocated length of vacuums[] array */
+	BTOneVacInfo vacuums[1];	/* VARIABLE LENGTH ARRAY */
+} BTVacInfo;
+
+static BTVacInfo *btvacinfo;
+
+
+/*
+ * _bt_vacuum_cycleid --- get the active vacuum cycle ID for an index,
+ *		or zero if there is no active VACUUM
+ *
+ * Note: for correct interlocking, the caller must already hold pin and
+ * exclusive lock on each buffer it will store the cycle ID into.  This
+ * ensures that even if a VACUUM starts immediately afterwards, it cannot
+ * process those pages until the page split is complete.
+ */
+BTCycleId
+_bt_vacuum_cycleid(Relation rel)
+{
+	BTCycleId	result = 0;
+	int			i;
+
+	/* Share lock is enough since this is a read-only operation */
+	LWLockAcquire(BtreeVacuumLock, LW_SHARED);
+
+	for (i = 0; i < btvacinfo->num_vacuums; i++)
+	{
+		BTOneVacInfo *vac = &btvacinfo->vacuums[i];
+
+		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
+			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
+		{
+			result = vac->cycleid;
+			break;
+		}
+	}
+
+	LWLockRelease(BtreeVacuumLock);
+	return result;
+}
+
+/*
+ * _bt_start_vacuum --- assign a cycle ID to a just-starting VACUUM operation
+ *
+ * Note: the caller must guarantee (via PG_TRY) that it will eventually call
+ * _bt_end_vacuum, else we'll permanently leak an array slot.
+ */
+BTCycleId
+_bt_start_vacuum(Relation rel)
+{
+	BTCycleId	result;
+	int			i;
+	BTOneVacInfo *vac;
+
+	LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
+
+	/* Assign the next cycle ID, being careful to avoid zero */
+	do {
+		result = ++(btvacinfo->cycle_ctr);
+	} while (result == 0);
+
+	/* Let's just make sure there's no entry already for this index */
+	for (i = 0; i < btvacinfo->num_vacuums; i++)
+	{
+		vac = &btvacinfo->vacuums[i];
+		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
+			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
+			elog(ERROR, "multiple active vacuums for index \"%s\"",
+				 RelationGetRelationName(rel));
+	}
+
+	/* OK, add an entry */
+	if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
+		elog(ERROR, "out of btvacinfo slots");
+	vac = &btvacinfo->vacuums[btvacinfo->num_vacuums];
+	vac->relid = rel->rd_lockInfo.lockRelId;
+	vac->cycleid = result;
+	btvacinfo->num_vacuums++;
+
+	LWLockRelease(BtreeVacuumLock);
+	return result;
+}
+
+/*
+ * _bt_end_vacuum --- mark a btree VACUUM operation as done
+ *
+ * Note: this is deliberately coded not to complain if no entry is found;
+ * this allows the caller to put PG_TRY around the start_vacuum operation.
+ */
+void
+_bt_end_vacuum(Relation rel)
+{
+	int			i;
+
+	LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
+
+	/* Find the array entry */
+	for (i = 0; i < btvacinfo->num_vacuums; i++)
+	{
+		BTOneVacInfo *vac = &btvacinfo->vacuums[i];
+
+		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
+			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
+		{
+			/* Remove it by shifting down the last entry */
+			*vac = btvacinfo->vacuums[btvacinfo->num_vacuums - 1];
+			btvacinfo->num_vacuums--;
+			break;
+		}
+	}
+
+	LWLockRelease(BtreeVacuumLock);
+}
+
+/*
+ * BTreeShmemSize --- report amount of shared memory space needed
+ */
+Size
+BTreeShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(BTVacInfo, vacuums[0]);
+	size = add_size(size, mul_size(MaxBackends, sizeof(BTOneVacInfo)));
+	return size;
+}
+
+/*
+ * BTreeShmemInit --- initialize this module's shared memory
+ */
+void
+BTreeShmemInit(void)
+{
+	bool		found;
+
+	btvacinfo = (BTVacInfo *) ShmemInitStruct("BTree Vacuum State",
+											  BTreeShmemSize(),
+											  &found);
+
+	if (!IsUnderPostmaster)
+	{
+		/* Initialize shared memory area */
+		Assert(!found);
+
+		/*
+		 * It doesn't really matter what the cycle counter starts at, but
+		 * having it always start the same doesn't seem good.  Seed with
+		 * low-order bits of time() instead.
+		 */
+		btvacinfo->cycle_ctr = (BTCycleId) time(NULL);
+
+		btvacinfo->num_vacuums = 0;
+		btvacinfo->max_vacuums = MaxBackends;
+	}
+	else
+		Assert(found);
+}
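The comment on _bt_start_vacuum() requires the caller to guarantee, via PG_TRY, that _bt_end_vacuum() is eventually called, or a btvacinfo slot leaks permanently. Below is a minimal sketch of a conforming caller, not part of this commit: the names example_btree_bulkdelete() and do_btree_vacuum_scan() are hypothetical placeholders for the real nbtree bulk-delete code, and the declarations of _bt_start_vacuum()/_bt_end_vacuum() are assumed to be the ones this patch series adds to access/nbtree.h.

```c
/*
 * Sketch only: a bulk-delete entry point honoring the contract stated in
 * _bt_start_vacuum()'s comment.  example_btree_bulkdelete() and
 * do_btree_vacuum_scan() are hypothetical names, not part of this commit.
 */
#include "postgres.h"

#include "access/nbtree.h"
#include "utils/rel.h"

static void
do_btree_vacuum_scan(Relation rel, BTCycleId cycleid)
{
	/* placeholder for the actual index scan that removes dead tuples */
}

void
example_btree_bulkdelete(Relation rel)
{
	BTCycleId	cycleid;

	/* Register this vacuum in shared memory and get its cycle ID */
	cycleid = _bt_start_vacuum(rel);

	PG_TRY();
	{
		do_btree_vacuum_scan(rel, cycleid);
	}
	PG_CATCH();
	{
		/* Release the btvacinfo slot even if the scan errors out */
		_bt_end_vacuum(rel);
		PG_RE_THROW();
	}
	PG_END_TRY();

	/* Normal exit: release the slot */
	_bt_end_vacuum(rel);
}
```

Because _bt_end_vacuum() is deliberately coded not to complain when no entry is found, the cleanup call in PG_CATCH is safe even if the failure happened before or inside _bt_start_vacuum().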
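_bt_vacuum_cycleid()'s header comment also pins down the other side of the interlock: the page-split code must already hold pin and exclusive lock on every buffer it stamps with the cycle ID, so a VACUUM that starts immediately afterwards cannot process those pages until the split is complete. The sketch below illustrates that usage; the btpo_cycleid field and the call site belong to other files in this patch series, so the names here are assumptions, not something shown in this diff.

```c
/*
 * Sketch only: stamping the cycle ID during a page split, per the
 * interlocking rule in _bt_vacuum_cycleid()'s comment.  The btpo_cycleid
 * field is assumed to be added to BTPageOpaqueData elsewhere in the patch.
 */
#include "postgres.h"

#include "access/nbtree.h"
#include "storage/bufpage.h"
#include "utils/rel.h"

static void
example_tag_split_pages(Relation rel, Page leftpage, Page rightpage)
{
	/*
	 * The caller is assumed to hold pin and exclusive lock on both buffers,
	 * so a VACUUM starting now cannot look at these pages before the split
	 * is finished, which is exactly what the comment above requires.
	 */
	BTCycleId	cycleid = _bt_vacuum_cycleid(rel);
	BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
	BTPageOpaque ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);

	lopaque->btpo_cycleid = cycleid;	/* assumed field, see lead-in */
	ropaque->btpo_cycleid = cycleid;
}
```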
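Finally, BTreeShmemSize() and BTreeShmemInit() only take effect once they are hooked into the server's shared-memory setup (in PostgreSQL this is done during CreateSharedMemoryAndSemaphores()). The fragment below sketches that wiring pattern; the example_* wrappers are hypothetical stand-ins for the real call sites.

```c
/*
 * Sketch only: wiring the new shmem hooks into server startup.
 */
#include "postgres.h"

#include "access/nbtree.h"
#include "storage/shmem.h"

static Size
example_estimate_shmem(Size size)
{
	/* reserve room for the "BTree Vacuum State" array (MaxBackends entries) */
	return add_size(size, BTreeShmemSize());
}

static void
example_create_shmem(void)
{
	/* postmaster creates the area; backends attach to the existing one */
	BTreeShmemInit();
}
```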
