summaryrefslogtreecommitdiff
path: root/src/backend/access/gist/gistget.c
diff options
context:
space:
mode:
authorTeodor Sigaev <teodor@sigaev.ru>2005-06-27 12:45:23 +0000
committerTeodor Sigaev <teodor@sigaev.ru>2005-06-27 12:45:23 +0000
commite8cab5fe49c45e9dc2990e36ecd6a42bf01dc4bc (patch)
treeb4950c8a1550ce26aa276a157b8680732ae3ac9b /src/backend/access/gist/gistget.c
parentc3be085ab7a21e01f530357d962fa22f74a637ef (diff)
downloadpostgresql-e8cab5fe49c45e9dc2990e36ecd6a42bf01dc4bc.tar.gz
Concurrency for GiST
- full concurrency for insert/update/select/vacuum: - select and vacuum never locks more than one page simultaneously - select (gettuple) hasn't any lock across it's calls - insert never locks more than two page simultaneously: - during search of leaf to insert it locks only one page simultaneously - while walk upward to the root it locked only parent (may be non-direct parent) and child. One of them X-lock, another may be S- or X-lock - 'vacuum full' locks index - improve gistgetmulti - simplify XLOG records Fix bug in index_beginscan_internal: LockRelation may clean rd_aminfo structure, so move GET_REL_PROCEDURE after LockRelation
Diffstat (limited to 'src/backend/access/gist/gistget.c')
-rw-r--r--src/backend/access/gist/gistget.c326
1 files changed, 204 insertions, 122 deletions
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index 4bce9962f3..823defa3a6 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.49 2005/06/20 10:29:36 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.50 2005/06/27 12:45:22 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@@ -21,10 +21,63 @@
static OffsetNumber gistfindnext(IndexScanDesc scan, OffsetNumber n,
ScanDirection dir);
-static bool gistnext(IndexScanDesc scan, ScanDirection dir);
+static int gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, bool ignore_killed_tuples);
static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan,
OffsetNumber offset);
+static void
+killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr) {
+ Buffer buffer = so->curbuf;
+
+ for(;;) {
+ Page p;
+ BlockNumber blkno;
+ OffsetNumber offset, maxoff;
+
+ LockBuffer( buffer, GIST_SHARE );
+ p = (Page)BufferGetPage( buffer );
+
+ if ( buffer == so->curbuf && XLByteEQ( so->stack->lsn, PageGetLSN(p) ) ) {
+ /* page unchanged, so all is simple */
+ offset = ItemPointerGetOffsetNumber(iptr);
+ PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
+ SetBufferCommitInfoNeedsSave(buffer);
+ LockBuffer( buffer, GIST_UNLOCK );
+ break;
+ }
+
+ maxoff = PageGetMaxOffsetNumber( p );
+
+ for(offset = FirstOffsetNumber; offset<= maxoff; offset = OffsetNumberNext(offset)) {
+ IndexTuple ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset));
+
+ if ( ItemPointerEquals( &(ituple->t_tid), iptr ) ) {
+ /* found */
+ PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
+ SetBufferCommitInfoNeedsSave(buffer);
+ LockBuffer( buffer, GIST_UNLOCK );
+ if ( buffer != so->curbuf )
+ ReleaseBuffer( buffer );
+ return;
+ }
+ }
+
+ /* follow right link */
+ /*
+ * ??? is it good? if tuple dropped by concurrent vacuum,
+ * we will read all leaf pages...
+ */
+ blkno = GistPageGetOpaque(p)->rightlink;
+ LockBuffer( buffer, GIST_UNLOCK );
+ if ( buffer != so->curbuf )
+ ReleaseBuffer( buffer );
+
+ if ( blkno==InvalidBlockNumber )
+ /* can't found, dropped by somebody else */
+ return;
+ buffer = ReadBuffer( r, blkno );
+ }
+}
/*
* gistgettuple() -- Get the next tuple in the scan
@@ -34,48 +87,27 @@ gistgettuple(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
- Page page;
- OffsetNumber offnum;
GISTScanOpaque so;
+ ItemPointerData tid;
+ bool res;
so = (GISTScanOpaque) scan->opaque;
/*
* If we have produced an index tuple in the past and the executor
* has informed us we need to mark it as "killed", do so now.
- *
- * XXX: right now there is no concurrent access. In the
- * future, we should (a) get a read lock on the page (b) check
- * that the location of the previously-fetched tuple hasn't
- * changed due to concurrent insertions.
*/
- if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData)))
- {
- offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
- page = BufferGetPage(so->curbuf);
- PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;
- SetBufferCommitInfoNeedsSave(so->curbuf);
- }
+ if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData)))
+ killtuple(scan->indexRelation, so, &(scan->currentItemData));
/*
* Get the next tuple that matches the search key. If asked to
* skip killed tuples, continue looping until we find a non-killed
* tuple that matches the search key.
*/
- for (;;)
- {
- bool res = gistnext(scan, dir);
-
- if (res == true && scan->ignore_killed_tuples)
- {
- offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
- page = BufferGetPage(so->curbuf);
- if (ItemIdDeleted(PageGetItemId(page, offnum)))
- continue;
- }
+ res = ( gistnext(scan, dir, &tid, 1, scan->ignore_killed_tuples) ) ? true : false;
- PG_RETURN_BOOL(res);
- }
+ PG_RETURN_BOOL(res);
}
Datum
@@ -85,36 +117,28 @@ gistgetmulti(PG_FUNCTION_ARGS)
ItemPointer tids = (ItemPointer) PG_GETARG_POINTER(1);
int32 max_tids = PG_GETARG_INT32(2);
int32 *returned_tids = (int32 *) PG_GETARG_POINTER(3);
- bool res = true;
- int32 ntids = 0;
- /* XXX generic implementation: loop around guts of gistgettuple */
- while (ntids < max_tids)
- {
- res = gistnext(scan, ForwardScanDirection);
- if (!res)
- break;
- tids[ntids] = scan->xs_ctup.t_self;
- ntids++;
- }
-
- *returned_tids = ntids;
- PG_RETURN_BOOL(res);
+ *returned_tids = gistnext(scan, ForwardScanDirection, tids, max_tids, false);
+
+ PG_RETURN_BOOL(*returned_tids == max_tids);
}
/*
- * Fetch a tuple that matchs the search key; this can be invoked
+ * Fetch a tuples that matchs the search key; this can be invoked
* either to fetch the first such tuple or subsequent matching
* tuples. Returns true iff a matching tuple was found.
*/
-static bool
-gistnext(IndexScanDesc scan, ScanDirection dir)
+static int
+gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, bool ignore_killed_tuples)
{
Page p;
OffsetNumber n;
GISTScanOpaque so;
- GISTSTACK *stk;
+ GISTSearchStack *stk;
IndexTuple it;
+ GISTPageOpaque opaque;
+ bool resetoffset=false;
+ int ntids=0;
so = (GISTScanOpaque) scan->opaque;
@@ -122,107 +146,164 @@ gistnext(IndexScanDesc scan, ScanDirection dir)
{
/* Being asked to fetch the first entry, so start at the root */
Assert(so->curbuf == InvalidBuffer);
- so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO);
- }
+ Assert(so->stack == NULL);
- p = BufferGetPage(so->curbuf);
+ so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO);
+
+ stk = so->stack = (GISTSearchStack*) palloc0( sizeof(GISTSearchStack) );
- if (ItemPointerIsValid(&scan->currentItemData) == false)
- {
- if (ScanDirectionIsBackward(dir))
- n = PageGetMaxOffsetNumber(p);
- else
- n = FirstOffsetNumber;
+ stk->next = NULL;
+ stk->block = GIST_ROOT_BLKNO;
+ } else if ( so->curbuf == InvalidBuffer ) {
+ return 0;
}
- else
- {
- n = ItemPointerGetOffsetNumber(&(scan->currentItemData));
- if (ScanDirectionIsBackward(dir))
- n = OffsetNumberPrev(n);
- else
- n = OffsetNumberNext(n);
- }
+ for(;;) {
+ /* First of all, we need lock buffer */
+ Assert( so->curbuf != InvalidBuffer );
+ LockBuffer( so->curbuf, GIST_SHARE );
+ p = BufferGetPage(so->curbuf);
+ opaque = GistPageGetOpaque( p );
+ resetoffset = false;
+
+ if ( XLogRecPtrIsInvalid( so->stack->lsn ) || !XLByteEQ( so->stack->lsn, PageGetLSN(p) ) ) {
+ /* page changed from last visit or visit first time , reset offset */
+ so->stack->lsn = PageGetLSN(p);
+ resetoffset = true;
+
+ /* check page split, occured from last visit or visit to parent */
+ if ( !XLogRecPtrIsInvalid( so->stack->parentlsn ) &&
+ XLByteLT( so->stack->parentlsn, opaque->nsn ) &&
+ opaque->rightlink != InvalidBlockNumber /* sanity check */ &&
+ (so->stack->next==NULL || so->stack->next->block != opaque->rightlink) /* check if already added */) {
+ /* detect page split, follow right link to add pages */
+
+ stk = (GISTSearchStack*) palloc( sizeof(GISTSearchStack) );
+ stk->next = so->stack->next;
+ stk->block = opaque->rightlink;
+ stk->parentlsn = so->stack->parentlsn;
+ memset( &(stk->lsn), 0, sizeof(GistNSN) );
+ so->stack->next = stk;
+ }
+ }
- for (;;)
- {
- n = gistfindnext(scan, n, dir);
+ /* if page is empty, then just skip it */
+ if ( PageIsEmpty(p) ) {
+ LockBuffer( so->curbuf, GIST_UNLOCK );
+ stk = so->stack->next;
+ pfree( so->stack );
+ so->stack = stk;
- if (!OffsetNumberIsValid(n))
- {
- /*
- * We ran out of matching index entries on the current
- * page, so pop the top stack entry and use it to continue
- * the search.
- */
- /* If we're out of stack entries, we're done */
- if (so->stack == NULL)
- {
+ if (so->stack == NULL) {
ReleaseBuffer(so->curbuf);
so->curbuf = InvalidBuffer;
- return false;
+ return ntids;
}
- stk = so->stack;
so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
- stk->block);
- p = BufferGetPage(so->curbuf);
+ stk->block);
+ continue;
+ }
+ if (!GistPageIsLeaf(p) || resetoffset || ItemPointerIsValid(&scan->currentItemData) == false)
+ {
if (ScanDirectionIsBackward(dir))
- n = OffsetNumberPrev(stk->offset);
+ n = PageGetMaxOffsetNumber(p);
else
- n = OffsetNumberNext(stk->offset);
-
- so->stack = stk->parent;
- pfree(stk);
-
- continue;
+ n = FirstOffsetNumber;
+ }
+ else
+ {
+ n = ItemPointerGetOffsetNumber(&(scan->currentItemData));
+
+ if (ScanDirectionIsBackward(dir))
+ n = OffsetNumberPrev(n);
+ else
+ n = OffsetNumberNext(n);
}
- if (GistPageIsLeaf(p))
+ /* wonderfull, we can look at page */
+
+ for(;;)
{
- /*
- * We've found a matching index entry in a leaf page, so
- * return success. Note that we keep "curbuf" pinned so
- * that we can efficiently resume the index scan later.
- */
- ItemPointerSet(&(scan->currentItemData),
+ n = gistfindnext(scan, n, dir);
+
+ if (!OffsetNumberIsValid(n))
+ {
+ /*
+ * We ran out of matching index entries on the current
+ * page, so pop the top stack entry and use it to continue
+ * the search.
+ */
+ LockBuffer( so->curbuf, GIST_UNLOCK );
+ stk = so->stack->next;
+ pfree( so->stack );
+ so->stack = stk;
+
+ /* If we're out of stack entries, we're done */
+
+ if (so->stack == NULL)
+ {
+ ReleaseBuffer(so->curbuf);
+ so->curbuf = InvalidBuffer;
+ return ntids;
+ }
+
+ so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
+ stk->block);
+ /* XXX go up */
+ break;
+ }
+
+ if (GistPageIsLeaf(p))
+ {
+ /*
+ * We've found a matching index entry in a leaf page, so
+ * return success. Note that we keep "curbuf" pinned so
+ * that we can efficiently resume the index scan later.
+ */
+
+ ItemPointerSet(&(scan->currentItemData),
BufferGetBlockNumber(so->curbuf), n);
- it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
- scan->xs_ctup.t_self = it->t_tid;
- return true;
- }
- else
- {
- /*
- * We've found an entry in an internal node whose key is
- * consistent with the search key, so continue the search
- * in the pointed-to child node (i.e. we search depth
- * first). Push the current node onto the stack so we
- * resume searching from this node later.
- */
- BlockNumber child_block;
-
- stk = (GISTSTACK *) palloc(sizeof(GISTSTACK));
- stk->offset = n;
- stk->block = BufferGetBlockNumber(so->curbuf);
- stk->parent = so->stack;
- so->stack = stk;
+ if ( ! ( ignore_killed_tuples && ItemIdDeleted(PageGetItemId(p, n)) ) ) {
+ it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
+ tids[ntids] = scan->xs_ctup.t_self = it->t_tid;
+ ntids++;
+
+ if ( ntids == maxtids ) {
+ LockBuffer( so->curbuf, GIST_UNLOCK );
+ return ntids;
+ }
+ }
+ }
+ else
+ {
+ /*
+ * We've found an entry in an internal node whose key is
+ * consistent with the search key, so push it to stack
+ */
- it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
- child_block = ItemPointerGetBlockNumber(&(it->t_tid));
+ stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
- so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
- child_block);
- p = BufferGetPage(so->curbuf);
+ it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
+ stk->block = ItemPointerGetBlockNumber(&(it->t_tid));
+ memset( &(stk->lsn), 0, sizeof(GistNSN) );
+ stk->parentlsn = so->stack->lsn;
+
+ stk->next = so->stack->next;
+ so->stack->next = stk;
+
+ }
if (ScanDirectionIsBackward(dir))
- n = PageGetMaxOffsetNumber(p);
+ n = OffsetNumberPrev(n);
else
- n = FirstOffsetNumber;
+ n = OffsetNumberNext(n);
}
}
+
+ return ntids;
}
/*
@@ -313,6 +394,7 @@ gistindex_keytest(IndexTuple tuple,
* Return the offset of the first index entry that is consistent with
* the search key after offset 'n' in the current page. If there are
* no more consistent entries, return InvalidOffsetNumber.
+ * Page should be locked....
*/
static OffsetNumber
gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir)