diff options
author | Teodor Sigaev <teodor@sigaev.ru> | 2005-06-20 10:29:37 +0000 |
---|---|---|
committer | Teodor Sigaev <teodor@sigaev.ru> | 2005-06-20 10:29:37 +0000 |
commit | d544ec8bbd70d50c1b50a00437b8061cabeeb5f2 (patch) | |
tree | 2eed78c3728c26840f525aa2aa8e2a4e6092f69e /src/backend/access/gist/gist.c | |
parent | 0b62bbe086261a12cc6779244e979c54233da055 (diff) | |
download | postgresql-d544ec8bbd70d50c1b50a00437b8061cabeeb5f2.tar.gz |
1. full functional WAL for GiST
2. improve vacuum for gist
- use FSM
- full vacuum:
- reforms parent tuple if it's needed
( tuples was deleted on child page or parent tuple remains invalid
after crash recovery )
- truncate index file if possible
3. fixes bugs and mistakes
Diffstat (limited to 'src/backend/access/gist/gist.c')
-rw-r--r-- | src/backend/access/gist/gist.c | 575 |
1 files changed, 164 insertions, 411 deletions
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 4e3faccdf9..340f6b9b4f 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.119 2005/06/14 11:45:13 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.120 2005/06/20 10:29:36 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -29,7 +29,7 @@ typedef struct GISTSTATE giststate; int numindexattrs; double indtuples; - MemoryContext tmpCxt; + MemoryContext tmpCtx; } GISTBuildState; @@ -47,37 +47,14 @@ static void gistfindleaf(GISTInsertState *state, GISTSTATE *giststate); -typedef struct PageLayout { - gistxlogPage block; - OffsetNumber *list; - Buffer buffer; /* to write after all proceed */ - - struct PageLayout *next; -} PageLayout; - - #define ROTATEDIST(d) do { \ - PageLayout *tmp=(PageLayout*)palloc(sizeof(PageLayout)); \ - memset(tmp,0,sizeof(PageLayout)); \ + SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \ + memset(tmp,0,sizeof(SplitedPageLayout)); \ tmp->next = (d); \ (d)=tmp; \ } while(0) -static IndexTuple *gistSplit(Relation r, - Buffer buffer, - IndexTuple *itup, - int *len, - PageLayout **dist, - GISTSTATE *giststate); - - -#undef GISTDEBUG - -#ifdef GISTDEBUG -static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff); -#endif - /* * Create and return a temporary memory context for use by GiST. We * _always_ invoke user-provided methods in a temporary memory @@ -124,7 +101,7 @@ gistbuild(PG_FUNCTION_ARGS) initGISTstate(&buildstate.giststate, index); /* initialize the root page */ - buffer = ReadBuffer(index, P_NEW); + buffer = gistReadBuffer(index, P_NEW); GISTInitBuffer(buffer, F_LEAF); if ( !index->rd_istemp ) { XLogRecPtr recptr; @@ -155,23 +132,20 @@ gistbuild(PG_FUNCTION_ARGS) * create a temporary memory context that is reset once for each * tuple inserted into the index */ - buildstate.tmpCxt = createTempGistContext(); + buildstate.tmpCtx = createTempGistContext(); /* do the heap scan */ reltuples = IndexBuildHeapScan(heap, index, indexInfo, gistbuildCallback, (void *) &buildstate); /* okay, all heap tuples are indexed */ - MemoryContextDelete(buildstate.tmpCxt); + MemoryContextDelete(buildstate.tmpCtx); /* since we just counted the # of tuples, may as well update stats */ IndexCloseAndUpdateStats(heap, reltuples, index, buildstate.indtuples); freeGISTstate(&buildstate.giststate); -#ifdef GISTDEBUG - gist_dumptree(index, 0, GIST_ROOT_BLKNO, 0); -#endif PG_RETURN_VOID(); } @@ -190,13 +164,13 @@ gistbuildCallback(Relation index, IndexTuple itup; GISTENTRY tmpcentry; int i; - MemoryContext oldCxt; + MemoryContext oldCtx; /* GiST cannot index tuples with leading NULLs */ if (isnull[0]) return; - oldCxt = MemoryContextSwitchTo(buildstate->tmpCxt); + oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); /* immediately compress keys to normalize */ for (i = 0; i < buildstate->numindexattrs; i++) @@ -226,8 +200,8 @@ gistbuildCallback(Relation index, gistdoinsert(index, itup, &buildstate->giststate); buildstate->indtuples += 1; - MemoryContextSwitchTo(oldCxt); - MemoryContextReset(buildstate->tmpCxt); + MemoryContextSwitchTo(oldCtx); + MemoryContextReset(buildstate->tmpCtx); } /* @@ -251,8 +225,8 @@ gistinsert(PG_FUNCTION_ARGS) GISTSTATE giststate; GISTENTRY tmpentry; int i; - MemoryContext oldCxt; - MemoryContext insertCxt; + MemoryContext oldCtx; + MemoryContext insertCtx; /* * Since GIST is not marked "amconcurrent" in pg_am, caller should @@ -264,8 +238,8 @@ gistinsert(PG_FUNCTION_ARGS) if (isnull[0]) PG_RETURN_BOOL(false); - insertCxt = createTempGistContext(); - oldCxt = MemoryContextSwitchTo(insertCxt); + insertCtx = createTempGistContext(); + oldCtx = MemoryContextSwitchTo(insertCtx); initGISTstate(&giststate, r); @@ -289,8 +263,8 @@ gistinsert(PG_FUNCTION_ARGS) /* cleanup */ freeGISTstate(&giststate); - MemoryContextSwitchTo(oldCxt); - MemoryContextDelete(insertCxt); + MemoryContextSwitchTo(oldCtx); + MemoryContextDelete(insertCtx); PG_RETURN_BOOL(true); } @@ -315,7 +289,6 @@ gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate) state.r = r; state.key = itup->t_tid; state.needInsertComplete = true; - state.xlog_mode = false; state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack)); memset( state.stack, 0, sizeof(GISTInsertStack)); @@ -335,80 +308,27 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { IndexTuple *itvec, *newitup; int tlen,olen; - PageLayout *dist=NULL, *ptr; + SplitedPageLayout *dist=NULL, *ptr; - memset(&dist, 0, sizeof(PageLayout)); is_splitted = true; itvec = gistextractbuffer(state->stack->buffer, &tlen); olen=tlen; itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen); newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate); - if ( !state->r->rd_istemp && !state->xlog_mode) { - gistxlogPageSplit xlrec; - XLogRecPtr recptr; - XLogRecData *rdata; - int i, npage = 0, cur=1; - - ptr=dist; - while( ptr ) { - npage++; - ptr=ptr->next; - } - - rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + state->ituplen + 2)); - - xlrec.node = state->r->rd_node; - xlrec.origblkno = state->stack->blkno; - xlrec.npage = npage; - xlrec.nitup = state->ituplen; - xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber; - xlrec.key = state->key; - xlrec.pathlen = (uint16)state->pathlen; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof( gistxlogPageSplit ); - rdata[0].next = NULL; - - if ( state->pathlen>=0 ) { - rdata[0].next = &(rdata[1]); - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) (state->path); - rdata[1].len = sizeof( BlockNumber ) * state->pathlen; - rdata[1].next = NULL; - cur++; - } - - /* new tuples */ - for(i=0;i<state->ituplen;i++) { - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char*)(state->itup[i]); - rdata[cur].len = IndexTupleSize(state->itup[i]); - rdata[cur-1].next = &(rdata[cur]); - cur++; + if ( !state->r->rd_istemp ) { + OffsetNumber noffs=0, offs[ MAXALIGN( sizeof(OffsetNumber) ) / sizeof(OffsetNumber) ]; + XLogRecPtr recptr; + XLogRecData *rdata; + + if ( state->stack->todelete ) { + offs[0] = state->stack->childoffnum; + noffs=1; } - /* new page layout */ - ptr=dist; - while(ptr) { - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char*)&(ptr->block); - rdata[cur].len = sizeof(gistxlogPage); - rdata[cur-1].next = &(rdata[cur]); - cur++; - - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char*)(ptr->list); - rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num); - if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num ) - rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len ); - rdata[cur-1].next = &(rdata[cur]); - rdata[cur].next=NULL; - cur++; - - ptr=ptr->next; - } + rdata = formSplitRdata(state->r->rd_node, state->stack->blkno, + offs, noffs, state->itup, state->ituplen, + &(state->key), state->path, state->pathlen, dist); START_CRIT_SECTION(); @@ -433,57 +353,36 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { state->ituplen = tlen; /* now tlen >= 2 */ if ( state->stack->blkno == GIST_ROOT_BLKNO ) { - gistnewroot(state->r, state->itup, state->ituplen, &(state->key), state->xlog_mode); + gistnewroot(state->r, state->itup, state->ituplen, &(state->key)); state->needInsertComplete=false; } - if ( state->xlog_mode ) - LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(state->stack->buffer); } else { /* enough space */ OffsetNumber off, l; + bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false; off = (PageIsEmpty(state->stack->page)) ? FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page)); l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off); - if ( !state->r->rd_istemp && !state->xlog_mode) { - gistxlogEntryUpdate xlrec; - XLogRecPtr recptr; - XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( state->ituplen + 2 ) ); - int i, cur=0; - - xlrec.node = state->r->rd_node; - xlrec.blkno = state->stack->blkno; - xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber; - xlrec.key = state->key; - xlrec.pathlen = (uint16)state->pathlen; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof( gistxlogEntryUpdate ); - rdata[0].next = NULL; - - if ( state->pathlen>=0 ) { - rdata[0].next = &(rdata[1]); - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) (state->path); - rdata[1].len = sizeof( BlockNumber ) * state->pathlen; - rdata[1].next = NULL; - cur++; + if ( !state->r->rd_istemp ) { + OffsetNumber noffs=0, offs[ MAXALIGN( sizeof(OffsetNumber) ) / sizeof(OffsetNumber) ]; + XLogRecPtr recptr; + XLogRecData *rdata; + + if ( state->stack->todelete ) { + offs[0] = state->stack->childoffnum; + noffs=1; } + + rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno, + offs, noffs, false, state->itup, state->ituplen, + &(state->key), state->path, state->pathlen); - for(i=1; i<=state->ituplen; i++) { /* adding tuples */ - rdata[i+cur].buffer = InvalidBuffer; - rdata[i+cur].data = (char*)(state->itup[i-1]); - rdata[i+cur].len = IndexTupleSize(state->itup[i-1]); - rdata[i+cur].next = NULL; - rdata[i-1+cur].next = &(rdata[i+cur]); - } - START_CRIT_SECTION(); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); @@ -495,9 +394,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { if ( state->stack->blkno == GIST_ROOT_BLKNO ) state->needInsertComplete=false; - - if ( state->xlog_mode ) - LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(state->stack->buffer); if (state->ituplen > 1) @@ -507,9 +403,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { * parent */ IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate); - ItemPointerSet(&(newtup->t_tid), state->stack->blkno, FirstOffsetNumber); + ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno); state->itup[0] = newtup; state->ituplen = 1; + } else if (is_leaf) { + /* itup[0] store key to adjust parent, we set it to valid + to correct check by GistTupleIsInvalid macro in gistgetadjusted() */ + ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno); + GistTupleSetValid( state->itup[0] ); } } return is_splitted; @@ -524,13 +425,10 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) /* walk down */ while( true ) { - GISTPageOpaque opaque; - - state->stack->buffer = ReadBuffer(state->r, state->stack->blkno); + state->stack->buffer = gistReadBuffer(state->r, state->stack->blkno); state->stack->page = (Page) BufferGetPage(state->stack->buffer); - opaque = (GISTPageOpaque) PageGetSpecialPointer(state->stack->page); - - if (!(opaque->flags & F_LEAF)) + + if (!GistPageIsLeaf(state->stack->page)) { /* * This is an internal page, so continue to walk down the @@ -564,7 +462,7 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) state->pathlen++; ptr=ptr->parent; } - state->path=(BlockNumber*)palloc(sizeof(BlockNumber)*state->pathlen); + state->path=(BlockNumber*)palloc(MAXALIGN(sizeof(BlockNumber)*state->pathlen)); ptr = state->stack; state->pathlen=0; while( ptr ) { @@ -591,7 +489,7 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { * then itup contains additional for adjustment of current key */ - is_splitted = gistplacetopage(state, giststate ); + is_splitted = gistplacetopage(state, giststate); /* pop page from stack */ state->stack = state->stack->parent; @@ -623,6 +521,7 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { * an insert in a child node. Therefore, remove the old * version of this node's key. */ + gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum); PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); if ( !state->r->rd_istemp ) @@ -639,42 +538,32 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { /* release all buffers */ while( state->stack ) { - if ( state->xlog_mode ) - LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(state->stack->buffer); state->stack = state->stack->parent; } /* say to xlog that insert is completed */ - if ( !state->xlog_mode && state->needInsertComplete && !state->r->rd_istemp ) { - gistxlogInsertComplete xlrec; - XLogRecData rdata; - - xlrec.node = state->r->rd_node; - xlrec.key = state->key; - - rdata.buffer = InvalidBuffer; - rdata.data = (char *) &xlrec; - rdata.len = sizeof( gistxlogInsertComplete ); - rdata.next = NULL; - - START_CRIT_SECTION(); + if ( state->needInsertComplete && !state->r->rd_istemp ) + gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1); +} - XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, &rdata); +static void +gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset) { + int i; - END_CRIT_SECTION(); - } + for(i=0;i<len;i++) + arr[i] = reasloffset[ arr[i] ]; } /* * gistSplit -- split a page in the tree. */ -static IndexTuple * +IndexTuple * gistSplit(Relation r, Buffer buffer, IndexTuple *itup, /* contains compressed entry */ int *len, - PageLayout **dist, + SplitedPageLayout **dist, GISTSTATE *giststate) { Page p; @@ -690,8 +579,11 @@ gistSplit(Relation r, GISTPageOpaque opaque; GIST_SPLITVEC v; GistEntryVector *entryvec; - int i, + int i, fakeoffset, nlen; + OffsetNumber *realoffset; + IndexTuple *cleaneditup = itup; + int lencleaneditup = *len; p = (Page) BufferGetPage(buffer); opaque = (GISTPageOpaque) PageGetSpecialPointer(p); @@ -703,8 +595,8 @@ gistSplit(Relation r, */ if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO) { - leftbuf = ReadBuffer(r, P_NEW); - GISTInitBuffer(leftbuf, opaque->flags); + leftbuf = gistReadBuffer(r, P_NEW); + GISTInitBuffer(leftbuf, opaque->flags&F_LEAF); lbknum = BufferGetBlockNumber(leftbuf); left = (Page) BufferGetPage(leftbuf); } @@ -716,74 +608,99 @@ gistSplit(Relation r, left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData)); } - rightbuf = ReadBuffer(r, P_NEW); - GISTInitBuffer(rightbuf, opaque->flags); + rightbuf = gistReadBuffer(r, P_NEW); + GISTInitBuffer(rightbuf, opaque->flags&F_LEAF); rbknum = BufferGetBlockNumber(rightbuf); right = (Page) BufferGetPage(rightbuf); /* generate the item array */ + realoffset = palloc((*len + 1) * sizeof(OffsetNumber)); entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY)); entryvec->n = *len + 1; + fakeoffset = FirstOffsetNumber; for (i = 1; i <= *len; i++) { Datum datum; bool IsNull; + if (!GistPageIsLeaf(p) && GistTupleIsInvalid( itup[i - 1] )) { + entryvec->n--; + /* remember position of invalid tuple */ + realoffset[ entryvec->n ] = i; + continue; + } + datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull); - gistdentryinit(giststate, 0, &(entryvec->vector[i]), + gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]), datum, r, p, i, ATTSIZE(datum, giststate->tupdesc, 1, IsNull), FALSE, IsNull); + realoffset[ fakeoffset ] = i; + fakeoffset++; } - /* - * now let the user-defined picksplit function set up the split - * vector; in entryvec have no null value!! - */ - FunctionCall2(&giststate->picksplitFn[0], - PointerGetDatum(entryvec), - PointerGetDatum(&v)); - - /* compatibility with old code */ - if (v.spl_left[v.spl_nleft - 1] == InvalidOffsetNumber) - v.spl_left[v.spl_nleft - 1] = (OffsetNumber) *len; - if (v.spl_right[v.spl_nright - 1] == InvalidOffsetNumber) - v.spl_right[v.spl_nright - 1] = (OffsetNumber) *len; - - v.spl_lattr[0] = v.spl_ldatum; - v.spl_rattr[0] = v.spl_rdatum; - v.spl_lisnull[0] = false; - v.spl_risnull[0] = false; - - /* - * if index is multikey, then we must to try get smaller bounding box - * for subkey(s) - */ - if (r->rd_att->natts > 1) - { - int MaxGrpId; - - v.spl_idgrp = (int *) palloc0(sizeof(int) * (*len + 1)); - v.spl_grpflag = (char *) palloc0(sizeof(char) * (*len + 1)); - v.spl_ngrp = (int *) palloc(sizeof(int) * (*len + 1)); - - MaxGrpId = gistfindgroup(giststate, entryvec->vector, &v); - - /* form union of sub keys for each page (l,p) */ - gistunionsubkey(r, giststate, itup, &v); - - /* - * if possible, we insert equivalent tuples with control by - * penalty for a subkey(s) - */ - if (MaxGrpId > 1) - gistadjsubkey(r, itup, len, &v, giststate); + /* + * if it was invalid tuple then we need special processing. If + * it's possible, we move all invalid tuples on right page. + * We should remember, that union with invalid tuples + * is a invalid tuple. + */ + if ( entryvec->n != *len + 1 ) { + lencleaneditup = entryvec->n-1; + cleaneditup = (IndexTuple*)palloc(lencleaneditup * sizeof(IndexTuple)); + for(i=1;i<entryvec->n;i++) + cleaneditup[i-1] = itup[ realoffset[ i ]-1 ]; + + if ( gistnospace( left, cleaneditup, lencleaneditup ) ) { + /* no space on left to put all good tuples, so picksplit */ + gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate); + v.spl_leftvalid = true; + v.spl_rightvalid = false; + gistToRealOffset( v.spl_left, v.spl_nleft, realoffset ); + gistToRealOffset( v.spl_right, v.spl_nright, realoffset ); + } else { + /* we can try to store all valid tuples on one page */ + v.spl_right = (OffsetNumber*)palloc( entryvec->n * sizeof(OffsetNumber) ); + v.spl_left = (OffsetNumber*)palloc( entryvec->n * sizeof(OffsetNumber) ); + + if ( lencleaneditup==0 ) { + /* all tuples are invalid, so moves half of its to right */ + v.spl_leftvalid = v.spl_rightvalid = false; + v.spl_nright = 0; + v.spl_nleft = 0; + for(i=1;i<=*len;i++) + if ( i-1<*len/2 ) + v.spl_left[ v.spl_nleft++ ] = i; + else + v.spl_right[ v.spl_nright++ ] = i; + } else { + /* we will not call gistUserPicksplit, just put good + tuples on left and invalid on right */ + v.spl_nleft = lencleaneditup; + v.spl_nright = 0; + for(i=1;i<entryvec->n;i++) + v.spl_left[i-1] = i; + gistToRealOffset( v.spl_left, v.spl_nleft, realoffset ); + v.spl_lattr[0] = v.spl_ldatum = (Datum)0; + v.spl_rattr[0] = v.spl_rdatum = (Datum)0; + v.spl_lisnull[0] = true; + v.spl_risnull[0] = true; + gistunionsubkey(r, giststate, itup, &v, true); + v.spl_leftvalid = true; + v.spl_rightvalid = false; + } + } + } else { + /* there is no invalid tuples, so usial processing */ + gistUserPicksplit(r, entryvec, &v, itup, *len, giststate); + v.spl_leftvalid = v.spl_rightvalid = true; } + /* form left and right vector */ - lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nleft); - rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nright); + lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len+1)); + rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len+1)); for (i = 0; i < v.spl_nleft; i++) lvectup[i] = itup[v.spl_left[i] - 1]; @@ -791,12 +708,16 @@ gistSplit(Relation r, for (i = 0; i < v.spl_nright; i++) rvectup[i] = itup[v.spl_right[i] - 1]; + /* place invalid tuples on right page if itsn't done yet */ + for (fakeoffset = entryvec->n; fakeoffset < *len+1 && lencleaneditup; fakeoffset++) { + rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1]; + } /* write on disk (may need another split) */ if (gistnospace(right, rvectup, v.spl_nright)) { int i; - PageLayout *d, *origd=*dist; + SplitedPageLayout *d, *origd=*dist; nlen = v.spl_nright; newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate); @@ -824,8 +745,9 @@ gistSplit(Relation r, nlen = 1; newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1); - newtup[0] = gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull); - ItemPointerSet(&(newtup[0]->t_tid), rbknum, FirstOffsetNumber); + newtup[0] = ( v.spl_rightvalid ) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull) + : gist_form_invalid_tuple( rbknum ); + ItemPointerSetBlockNumber(&(newtup[0]->t_tid), rbknum); } if (gistnospace(left, lvectup, v.spl_nleft)) @@ -833,7 +755,7 @@ gistSplit(Relation r, int llen = v.spl_nleft; IndexTuple *lntup; int i; - PageLayout *d, *origd=*dist; + SplitedPageLayout *d, *origd=*dist; lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate); @@ -867,49 +789,35 @@ gistSplit(Relation r, nlen += 1; newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen); - newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull); - ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, FirstOffsetNumber); + newtup[nlen - 1] = ( v.spl_leftvalid ) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull) + : gist_form_invalid_tuple( lbknum ); + ItemPointerSetBlockNumber(&(newtup[nlen - 1]->t_tid), lbknum); } + GistClearTuplesDeleted(p); + *len = nlen; return newtup; } void -gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode) +gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key) { Buffer buffer; Page page; - buffer = (xlog_mode) ? XLogReadBuffer(false, r, GIST_ROOT_BLKNO) : ReadBuffer(r, GIST_ROOT_BLKNO); + buffer = gistReadBuffer(r, GIST_ROOT_BLKNO); GISTInitBuffer(buffer, 0); page = BufferGetPage(buffer); gistfillbuffer(r, page, itup, len, FirstOffsetNumber); - if ( !xlog_mode && !r->rd_istemp ) { - gistxlogEntryUpdate xlrec; + if ( !r->rd_istemp ) { XLogRecPtr recptr; - XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( len + 1 ) ); - int i; + XLogRecData *rdata; - xlrec.node = r->rd_node; - xlrec.blkno = GIST_ROOT_BLKNO; - xlrec.todeleteoffnum = InvalidOffsetNumber; - xlrec.key = *key; - xlrec.pathlen=0; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof( gistxlogEntryUpdate ); - rdata[0].next = NULL; - - for(i=1; i<=len; i++) { - rdata[i].buffer = InvalidBuffer; - rdata[i].data = (char*)(itup[i-1]); - rdata[i].len = IndexTupleSize(itup[i-1]); - rdata[i].next = NULL; - rdata[i-1].next = &(rdata[i]); - } + rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO, + NULL, 0, false, itup, len, + key, NULL, 0); START_CRIT_SECTION(); @@ -919,118 +827,9 @@ gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mo END_CRIT_SECTION(); } - if ( xlog_mode ) - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); } - -/* - * Bulk deletion of all index entries pointing to a set of heap tuples. - * The set of target tuples is specified via a callback routine that tells - * whether any given heap tuple (identified by ItemPointer) is being deleted. - * - * Result: a palloc'd struct containing statistical info for VACUUM displays. - */ -Datum -gistbulkdelete(PG_FUNCTION_ARGS) -{ - Relation rel = (Relation) PG_GETARG_POINTER(0); - IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1); - void *callback_state = (void *) PG_GETARG_POINTER(2); - IndexBulkDeleteResult *result; - BlockNumber num_pages; - double tuples_removed; - double num_index_tuples; - IndexScanDesc iscan; - - tuples_removed = 0; - num_index_tuples = 0; - - /* - * Since GIST is not marked "amconcurrent" in pg_am, caller should - * have acquired exclusive lock on index relation. We need no locking - * here. - */ - - /* - * XXX generic implementation --- should be improved! - */ - - /* walk through the entire index */ - iscan = index_beginscan(NULL, rel, SnapshotAny, 0, NULL); - /* including killed tuples */ - iscan->ignore_killed_tuples = false; - - while (index_getnext_indexitem(iscan, ForwardScanDirection)) - { - vacuum_delay_point(); - - if (callback(&iscan->xs_ctup.t_self, callback_state)) - { - ItemPointerData indextup = iscan->currentItemData; - BlockNumber blkno; - OffsetNumber offnum; - Buffer buf; - Page page; - - blkno = ItemPointerGetBlockNumber(&indextup); - offnum = ItemPointerGetOffsetNumber(&indextup); - - /* adjust any scans that will be affected by this deletion */ - gistadjscans(rel, GISTOP_DEL, blkno, offnum); - - /* delete the index tuple */ - buf = ReadBuffer(rel, blkno); - page = BufferGetPage(buf); - - PageIndexTupleDelete(page, offnum); - if ( !rel->rd_istemp ) { - gistxlogEntryUpdate xlrec; - XLogRecPtr recptr; - XLogRecData rdata; - - xlrec.node = rel->rd_node; - xlrec.blkno = blkno; - xlrec.todeleteoffnum = offnum; - xlrec.pathlen=0; - ItemPointerSetInvalid( &(xlrec.key) ); - - rdata.buffer = InvalidBuffer; - rdata.data = (char *) &xlrec; - rdata.len = sizeof( gistxlogEntryUpdate ); - rdata.next = NULL; - - START_CRIT_SECTION(); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_DELETE, &rdata); - PageSetLSN(page, recptr); - PageSetTLI(page, ThisTimeLineID); - - END_CRIT_SECTION(); - } - - WriteBuffer(buf); - - tuples_removed += 1; - } - else - num_index_tuples += 1; - } - - index_endscan(iscan); - - /* return statistics */ - num_pages = RelationGetNumberOfBlocks(rel); - - result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); - result->num_pages = num_pages; - result->num_index_tuples = num_index_tuples; - result->tuples_removed = tuples_removed; - - PG_RETURN_POINTER(result); -} - void initGISTstate(GISTSTATE *giststate, Relation index) { @@ -1074,49 +873,3 @@ freeGISTstate(GISTSTATE *giststate) /* no work */ } -#ifdef GISTDEBUG -static void -gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) -{ - Buffer buffer; - Page page; - GISTPageOpaque opaque; - IndexTuple which; - ItemId iid; - OffsetNumber i, - maxoff; - BlockNumber cblk; - char *pred; - - pred = (char *) palloc(sizeof(char) * level + 1); - MemSet(pred, '\t', level); - pred[level] = '\0'; - - buffer = ReadBuffer(r, blk); - page = (Page) BufferGetPage(buffer); - opaque = (GISTPageOpaque) PageGetSpecialPointer(page); - - maxoff = PageGetMaxOffsetNumber(page); - - elog(DEBUG4, "%sPage: %d %s blk: %d maxoff: %d free: %d", pred, - coff, (opaque->flags & F_LEAF) ? "LEAF" : "INTE", (int) blk, - (int) maxoff, PageGetFreeSpace(page)); - - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) - { - iid = PageGetItemId(page, i); - which = (IndexTuple) PageGetItem(page, iid); - cblk = ItemPointerGetBlockNumber(&(which->t_tid)); -#ifdef PRINTTUPLE - elog(DEBUG4, "%s Tuple. blk: %d size: %d", pred, (int) cblk, - IndexTupleSize(which)); -#endif - - if (!(opaque->flags & F_LEAF)) - gist_dumptree(r, level + 1, cblk, i); - } - ReleaseBuffer(buffer); - pfree(pred); -} -#endif /* defined GISTDEBUG */ - |