diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2006-03-30 23:03:10 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2006-03-30 23:03:10 +0000 |
commit | 89395bfa6f2fafccec10be377fcf759030910654 (patch) | |
tree | 836d0cd5721066b626d5e58f9028a6989e1cb97d /src/backend/access/gist/gist.c | |
parent | 4243f2387a762b26c8e6be5c8094425798d86ea3 (diff) | |
download | postgresql-89395bfa6f2fafccec10be377fcf759030910654.tar.gz |
Improve gist XLOG code to follow the coding rules needed to prevent
torn-page problems. This introduces some issues of its own, mainly
that there are now some critical sections of unreasonably broad scope,
but it's a step forward anyway. Further cleanup will require some
code refactoring that I'd prefer to get Oleg and Teodor involved in.
Diffstat (limited to 'src/backend/access/gist/gist.c')
-rw-r--r-- | src/backend/access/gist/gist.c | 105 |
1 files changed, 53 insertions, 52 deletions
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index de880831bf..d997db37ef 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.129 2006/03/05 15:58:20 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.130 2006/03/30 23:03:09 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -90,6 +90,7 @@ gistbuild(PG_FUNCTION_ARGS) double reltuples; GISTBuildState buildstate; Buffer buffer; + Page page; /* * We expect to be called exactly once for any index relation. If that's @@ -104,33 +105,33 @@ gistbuild(PG_FUNCTION_ARGS) /* initialize the root page */ buffer = gistNewBuffer(index); + Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO); + page = BufferGetPage(buffer); + + START_CRIT_SECTION(); + GISTInitBuffer(buffer, F_LEAF); if (!index->rd_istemp) { XLogRecPtr recptr; XLogRecData rdata; - Page page; - rdata.buffer = InvalidBuffer; rdata.data = (char *) &(index->rd_node); rdata.len = sizeof(RelFileNode); + rdata.buffer = InvalidBuffer; rdata.next = NULL; - page = BufferGetPage(buffer); - - START_CRIT_SECTION(); - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); - - END_CRIT_SECTION(); } else - PageSetLSN(BufferGetPage(buffer), XLogRecPtrForTemp); + PageSetLSN(page, XLogRecPtrForTemp); LockBuffer(buffer, GIST_UNLOCK); WriteBuffer(buffer); + END_CRIT_SECTION(); + /* build the index */ buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs; buildstate.indtuples = 0; @@ -305,6 +306,15 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) bool is_splitted = false; bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false; + /* + * XXX this code really ought to work by locking, but not modifying, + * all the buffers it needs; then starting a critical section; then + * modifying the buffers in an already-determined way and writing an + * XLOG record to reflect that. Since it doesn't, we've got to put + * a critical section around the entire process, which is horrible + * from a robustness point of view. + */ + START_CRIT_SECTION(); if (!is_leaf) @@ -312,6 +322,11 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) * This node's key has been modified, either because a child split * occurred or because we needed to adjust our key for an insert in a * child node. Therefore, remove the old version of this node's key. + * + * Note: for WAL replay, in the non-split case we handle this by + * setting up a one-element todelete array; in the split case, it's + * handled implicitly because the tuple vector passed to gistSplit + * won't include this tuple. */ PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); @@ -336,9 +351,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) XLogRecData *rdata; rdata = formSplitRdata(state->r->rd_node, state->stack->blkno, - &(state->key), dist); - - START_CRIT_SECTION(); + is_leaf, &(state->key), dist); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); ptr = dist; @@ -348,8 +361,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID); ptr = ptr->next; } - - END_CRIT_SECTION(); } else { @@ -410,7 +421,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) else ourpage = dist; - /* now gets all needed data, and sets nsn's */ page = (Page) BufferGetPage(ourpage->buffer); opaque = GistPageGetOpaque(page); @@ -437,8 +447,11 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) WriteBuffer(ptr->buffer); ptr = ptr->next; } + + WriteNoReleaseBuffer(state->stack->buffer); } - WriteNoReleaseBuffer(state->stack->buffer); + + END_CRIT_SECTION(); } else { @@ -451,7 +464,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) if (!state->r->rd_istemp) { OffsetNumber noffs = 0, - offs[MAXALIGN(sizeof(OffsetNumber)) / sizeof(OffsetNumber)]; + offs[1]; XLogRecPtr recptr; XLogRecData *rdata; @@ -462,17 +475,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) noffs = 1; } - rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno, - offs, noffs, false, state->itup, state->ituplen, + rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer, + offs, noffs, false, + state->itup, state->ituplen, &(state->key)); - START_CRIT_SECTION(); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); PageSetLSN(state->stack->page, recptr); PageSetTLI(state->stack->page, ThisTimeLineID); - - END_CRIT_SECTION(); } else PageSetLSN(state->stack->page, XLogRecPtrForTemp); @@ -481,6 +491,8 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) state->needInsertComplete = false; WriteNoReleaseBuffer(state->stack->buffer); + END_CRIT_SECTION(); + if (!is_leaf) /* small optimization: inform scan ablout * deleting... */ gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, @@ -636,30 +648,14 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) } /* - * Should have the same interface as XLogReadBuffer - */ -static Buffer -gistReadAndLockBuffer(Relation r, BlockNumber blkno) -{ - Buffer buffer = ReadBuffer(r, blkno); - - LockBuffer(buffer, GIST_SHARE); - return buffer; -} - -/* - * Traverse the tree to find path from root page. + * Traverse the tree to find path from root page to specified "child" block. * * returns from the begining of closest parent; * - * Function is used in both regular and recovery mode, so must work with - * different read functions (gistReadAndLockBuffer and XLogReadBuffer) - * * To prevent deadlocks, this should lock only one page simultaneously. */ GISTInsertStack * -gistFindPath(Relation r, BlockNumber child, - Buffer (*myReadBuffer) (Relation, BlockNumber)) +gistFindPath(Relation r, BlockNumber child) { Page page; Buffer buffer; @@ -677,7 +673,8 @@ gistFindPath(Relation r, BlockNumber child, while (top && top->blkno != child) { - buffer = myReadBuffer(r, top->blkno); /* locks buffer */ + buffer = ReadBuffer(r, top->blkno); + LockBuffer(buffer, GIST_SHARE); gistcheckpage(r, buffer); page = (Page) BufferGetPage(buffer); @@ -833,7 +830,7 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child) } /* ok, find new path */ - ptr = parent = gistFindPath(r, child->blkno, gistReadAndLockBuffer); + ptr = parent = gistFindPath(r, child->blkno); Assert(ptr != NULL); /* read all buffers as expected by caller */ @@ -1192,27 +1189,31 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO); page = BufferGetPage(buffer); - GISTInitBuffer(buffer, 0); + START_CRIT_SECTION(); + + GISTInitBuffer(buffer, 0); /* XXX not F_LEAF? */ gistfillbuffer(r, page, itup, len, FirstOffsetNumber); + if (!r->rd_istemp) { XLogRecPtr recptr; XLogRecData *rdata; - rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO, - NULL, 0, false, itup, len, key); - - START_CRIT_SECTION(); + rdata = formUpdateRdata(r->rd_node, buffer, + NULL, 0, false, + itup, len, key); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); - - END_CRIT_SECTION(); } else PageSetLSN(page, XLogRecPtrForTemp); + + WriteNoReleaseBuffer(buffer); + + END_CRIT_SECTION(); } void |