From e8cab5fe49c45e9dc2990e36ecd6a42bf01dc4bc Mon Sep 17 00:00:00 2001 From: Teodor Sigaev Date: Mon, 27 Jun 2005 12:45:23 +0000 Subject: Concurrency for GiST - full concurrency for insert/update/select/vacuum: - select and vacuum never locks more than one page simultaneously - select (gettuple) hasn't any lock across it's calls - insert never locks more than two page simultaneously: - during search of leaf to insert it locks only one page simultaneously - while walk upward to the root it locked only parent (may be non-direct parent) and child. One of them X-lock, another may be S- or X-lock - 'vacuum full' locks index - improve gistgetmulti - simplify XLOG records Fix bug in index_beginscan_internal: LockRelation may clean rd_aminfo structure, so move GET_REL_PROCEDURE after LockRelation --- src/backend/access/gist/gistget.c | 326 ++++++++++++++++++++++++-------------- 1 file changed, 204 insertions(+), 122 deletions(-) (limited to 'src/backend/access/gist/gistget.c') diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index 4bce9962f3..823defa3a6 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.49 2005/06/20 10:29:36 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.50 2005/06/27 12:45:22 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -21,10 +21,63 @@ static OffsetNumber gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir); -static bool gistnext(IndexScanDesc scan, ScanDirection dir); +static int gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, bool ignore_killed_tuples); static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan, OffsetNumber offset); +static void +killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr) { + Buffer buffer = so->curbuf; + + for(;;) { + Page p; + BlockNumber blkno; + OffsetNumber offset, maxoff; + + LockBuffer( buffer, GIST_SHARE ); + p = (Page)BufferGetPage( buffer ); + + if ( buffer == so->curbuf && XLByteEQ( so->stack->lsn, PageGetLSN(p) ) ) { + /* page unchanged, so all is simple */ + offset = ItemPointerGetOffsetNumber(iptr); + PageGetItemId(p, offset)->lp_flags |= LP_DELETE; + SetBufferCommitInfoNeedsSave(buffer); + LockBuffer( buffer, GIST_UNLOCK ); + break; + } + + maxoff = PageGetMaxOffsetNumber( p ); + + for(offset = FirstOffsetNumber; offset<= maxoff; offset = OffsetNumberNext(offset)) { + IndexTuple ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset)); + + if ( ItemPointerEquals( &(ituple->t_tid), iptr ) ) { + /* found */ + PageGetItemId(p, offset)->lp_flags |= LP_DELETE; + SetBufferCommitInfoNeedsSave(buffer); + LockBuffer( buffer, GIST_UNLOCK ); + if ( buffer != so->curbuf ) + ReleaseBuffer( buffer ); + return; + } + } + + /* follow right link */ + /* + * ??? is it good? if tuple dropped by concurrent vacuum, + * we will read all leaf pages... + */ + blkno = GistPageGetOpaque(p)->rightlink; + LockBuffer( buffer, GIST_UNLOCK ); + if ( buffer != so->curbuf ) + ReleaseBuffer( buffer ); + + if ( blkno==InvalidBlockNumber ) + /* can't found, dropped by somebody else */ + return; + buffer = ReadBuffer( r, blkno ); + } +} /* * gistgettuple() -- Get the next tuple in the scan @@ -34,48 +87,27 @@ gistgettuple(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1); - Page page; - OffsetNumber offnum; GISTScanOpaque so; + ItemPointerData tid; + bool res; so = (GISTScanOpaque) scan->opaque; /* * If we have produced an index tuple in the past and the executor * has informed us we need to mark it as "killed", do so now. - * - * XXX: right now there is no concurrent access. In the - * future, we should (a) get a read lock on the page (b) check - * that the location of the previously-fetched tuple hasn't - * changed due to concurrent insertions. */ - if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData))) - { - offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData)); - page = BufferGetPage(so->curbuf); - PageGetItemId(page, offnum)->lp_flags |= LP_DELETE; - SetBufferCommitInfoNeedsSave(so->curbuf); - } + if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData))) + killtuple(scan->indexRelation, so, &(scan->currentItemData)); /* * Get the next tuple that matches the search key. If asked to * skip killed tuples, continue looping until we find a non-killed * tuple that matches the search key. */ - for (;;) - { - bool res = gistnext(scan, dir); - - if (res == true && scan->ignore_killed_tuples) - { - offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData)); - page = BufferGetPage(so->curbuf); - if (ItemIdDeleted(PageGetItemId(page, offnum))) - continue; - } + res = ( gistnext(scan, dir, &tid, 1, scan->ignore_killed_tuples) ) ? true : false; - PG_RETURN_BOOL(res); - } + PG_RETURN_BOOL(res); } Datum @@ -85,36 +117,28 @@ gistgetmulti(PG_FUNCTION_ARGS) ItemPointer tids = (ItemPointer) PG_GETARG_POINTER(1); int32 max_tids = PG_GETARG_INT32(2); int32 *returned_tids = (int32 *) PG_GETARG_POINTER(3); - bool res = true; - int32 ntids = 0; - /* XXX generic implementation: loop around guts of gistgettuple */ - while (ntids < max_tids) - { - res = gistnext(scan, ForwardScanDirection); - if (!res) - break; - tids[ntids] = scan->xs_ctup.t_self; - ntids++; - } - - *returned_tids = ntids; - PG_RETURN_BOOL(res); + *returned_tids = gistnext(scan, ForwardScanDirection, tids, max_tids, false); + + PG_RETURN_BOOL(*returned_tids == max_tids); } /* - * Fetch a tuple that matchs the search key; this can be invoked + * Fetch a tuples that matchs the search key; this can be invoked * either to fetch the first such tuple or subsequent matching * tuples. Returns true iff a matching tuple was found. */ -static bool -gistnext(IndexScanDesc scan, ScanDirection dir) +static int +gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, bool ignore_killed_tuples) { Page p; OffsetNumber n; GISTScanOpaque so; - GISTSTACK *stk; + GISTSearchStack *stk; IndexTuple it; + GISTPageOpaque opaque; + bool resetoffset=false; + int ntids=0; so = (GISTScanOpaque) scan->opaque; @@ -122,107 +146,164 @@ gistnext(IndexScanDesc scan, ScanDirection dir) { /* Being asked to fetch the first entry, so start at the root */ Assert(so->curbuf == InvalidBuffer); - so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO); - } + Assert(so->stack == NULL); - p = BufferGetPage(so->curbuf); + so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO); + + stk = so->stack = (GISTSearchStack*) palloc0( sizeof(GISTSearchStack) ); - if (ItemPointerIsValid(&scan->currentItemData) == false) - { - if (ScanDirectionIsBackward(dir)) - n = PageGetMaxOffsetNumber(p); - else - n = FirstOffsetNumber; + stk->next = NULL; + stk->block = GIST_ROOT_BLKNO; + } else if ( so->curbuf == InvalidBuffer ) { + return 0; } - else - { - n = ItemPointerGetOffsetNumber(&(scan->currentItemData)); - if (ScanDirectionIsBackward(dir)) - n = OffsetNumberPrev(n); - else - n = OffsetNumberNext(n); - } + for(;;) { + /* First of all, we need lock buffer */ + Assert( so->curbuf != InvalidBuffer ); + LockBuffer( so->curbuf, GIST_SHARE ); + p = BufferGetPage(so->curbuf); + opaque = GistPageGetOpaque( p ); + resetoffset = false; + + if ( XLogRecPtrIsInvalid( so->stack->lsn ) || !XLByteEQ( so->stack->lsn, PageGetLSN(p) ) ) { + /* page changed from last visit or visit first time , reset offset */ + so->stack->lsn = PageGetLSN(p); + resetoffset = true; + + /* check page split, occured from last visit or visit to parent */ + if ( !XLogRecPtrIsInvalid( so->stack->parentlsn ) && + XLByteLT( so->stack->parentlsn, opaque->nsn ) && + opaque->rightlink != InvalidBlockNumber /* sanity check */ && + (so->stack->next==NULL || so->stack->next->block != opaque->rightlink) /* check if already added */) { + /* detect page split, follow right link to add pages */ + + stk = (GISTSearchStack*) palloc( sizeof(GISTSearchStack) ); + stk->next = so->stack->next; + stk->block = opaque->rightlink; + stk->parentlsn = so->stack->parentlsn; + memset( &(stk->lsn), 0, sizeof(GistNSN) ); + so->stack->next = stk; + } + } - for (;;) - { - n = gistfindnext(scan, n, dir); + /* if page is empty, then just skip it */ + if ( PageIsEmpty(p) ) { + LockBuffer( so->curbuf, GIST_UNLOCK ); + stk = so->stack->next; + pfree( so->stack ); + so->stack = stk; - if (!OffsetNumberIsValid(n)) - { - /* - * We ran out of matching index entries on the current - * page, so pop the top stack entry and use it to continue - * the search. - */ - /* If we're out of stack entries, we're done */ - if (so->stack == NULL) - { + if (so->stack == NULL) { ReleaseBuffer(so->curbuf); so->curbuf = InvalidBuffer; - return false; + return ntids; } - stk = so->stack; so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation, - stk->block); - p = BufferGetPage(so->curbuf); + stk->block); + continue; + } + if (!GistPageIsLeaf(p) || resetoffset || ItemPointerIsValid(&scan->currentItemData) == false) + { if (ScanDirectionIsBackward(dir)) - n = OffsetNumberPrev(stk->offset); + n = PageGetMaxOffsetNumber(p); else - n = OffsetNumberNext(stk->offset); - - so->stack = stk->parent; - pfree(stk); - - continue; + n = FirstOffsetNumber; + } + else + { + n = ItemPointerGetOffsetNumber(&(scan->currentItemData)); + + if (ScanDirectionIsBackward(dir)) + n = OffsetNumberPrev(n); + else + n = OffsetNumberNext(n); } - if (GistPageIsLeaf(p)) + /* wonderfull, we can look at page */ + + for(;;) { - /* - * We've found a matching index entry in a leaf page, so - * return success. Note that we keep "curbuf" pinned so - * that we can efficiently resume the index scan later. - */ - ItemPointerSet(&(scan->currentItemData), + n = gistfindnext(scan, n, dir); + + if (!OffsetNumberIsValid(n)) + { + /* + * We ran out of matching index entries on the current + * page, so pop the top stack entry and use it to continue + * the search. + */ + LockBuffer( so->curbuf, GIST_UNLOCK ); + stk = so->stack->next; + pfree( so->stack ); + so->stack = stk; + + /* If we're out of stack entries, we're done */ + + if (so->stack == NULL) + { + ReleaseBuffer(so->curbuf); + so->curbuf = InvalidBuffer; + return ntids; + } + + so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation, + stk->block); + /* XXX go up */ + break; + } + + if (GistPageIsLeaf(p)) + { + /* + * We've found a matching index entry in a leaf page, so + * return success. Note that we keep "curbuf" pinned so + * that we can efficiently resume the index scan later. + */ + + ItemPointerSet(&(scan->currentItemData), BufferGetBlockNumber(so->curbuf), n); - it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); - scan->xs_ctup.t_self = it->t_tid; - return true; - } - else - { - /* - * We've found an entry in an internal node whose key is - * consistent with the search key, so continue the search - * in the pointed-to child node (i.e. we search depth - * first). Push the current node onto the stack so we - * resume searching from this node later. - */ - BlockNumber child_block; - - stk = (GISTSTACK *) palloc(sizeof(GISTSTACK)); - stk->offset = n; - stk->block = BufferGetBlockNumber(so->curbuf); - stk->parent = so->stack; - so->stack = stk; + if ( ! ( ignore_killed_tuples && ItemIdDeleted(PageGetItemId(p, n)) ) ) { + it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); + tids[ntids] = scan->xs_ctup.t_self = it->t_tid; + ntids++; + + if ( ntids == maxtids ) { + LockBuffer( so->curbuf, GIST_UNLOCK ); + return ntids; + } + } + } + else + { + /* + * We've found an entry in an internal node whose key is + * consistent with the search key, so push it to stack + */ - it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); - child_block = ItemPointerGetBlockNumber(&(it->t_tid)); + stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack)); - so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation, - child_block); - p = BufferGetPage(so->curbuf); + it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); + stk->block = ItemPointerGetBlockNumber(&(it->t_tid)); + memset( &(stk->lsn), 0, sizeof(GistNSN) ); + stk->parentlsn = so->stack->lsn; + + stk->next = so->stack->next; + so->stack->next = stk; + + } if (ScanDirectionIsBackward(dir)) - n = PageGetMaxOffsetNumber(p); + n = OffsetNumberPrev(n); else - n = FirstOffsetNumber; + n = OffsetNumberNext(n); } } + + return ntids; } /* @@ -313,6 +394,7 @@ gistindex_keytest(IndexTuple tuple, * Return the offset of the first index entry that is consistent with * the search key after offset 'n' in the current page. If there are * no more consistent entries, return InvalidOffsetNumber. + * Page should be locked.... */ static OffsetNumber gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir) -- cgit v1.2.1