diff options
Diffstat (limited to 'src/backend/access/gist/gist.c')
-rw-r--r-- | src/backend/access/gist/gist.c | 1103 |
1 files changed, 485 insertions, 618 deletions
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 7b30aa72a9..969b3309f7 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -6,7 +6,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.67 2000/11/30 08:46:20 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.68 2001/01/12 00:12:58 scrappy Exp $ * *------------------------------------------------------------------------- */ @@ -25,37 +25,61 @@ #include "access/xlogutils.h" +/* result's status */ +#define INSERTED 0x01 +#define SPLITED 0x02 + /* non-export function prototypes */ -static InsertIndexResult gistdoinsert(Relation r, IndexTuple itup, - GISTSTATE *GISTstate); -static InsertIndexResult gistentryinsert(Relation r, GISTSTACK *stk, - IndexTuple tup, - GISTSTATE *giststate); -static void gistentryinserttwo(Relation r, GISTSTACK *stk, IndexTuple ltup, - IndexTuple rtup, GISTSTATE *giststate); -static void gistAdjustKeys(Relation r, GISTSTACK *stk, BlockNumber blk, - char *datum, int att_size, GISTSTATE *giststate); -static void gistintinsert(Relation r, GISTSTACK *stk, IndexTuple ltup, - IndexTuple rtup, GISTSTATE *giststate); -static InsertIndexResult gistSplit(Relation r, Buffer buffer, - GISTSTACK *stack, IndexTuple itup, - GISTSTATE *giststate); -static void gistnewroot(GISTSTATE *giststate, Relation r, IndexTuple lt, - IndexTuple rt); +static void gistdoinsert(Relation r, + IndexTuple itup, + InsertIndexResult *res, + GISTSTATE *GISTstate); +static int gistlayerinsert( Relation r, BlockNumber blkno, + IndexTuple **itup, + int *len, + InsertIndexResult *res, + GISTSTATE *giststate ); +static OffsetNumber gistwritebuffer( Relation r, + Page page, + IndexTuple *itup, + int len, + OffsetNumber off, + GISTSTATE *giststate ); +static int gistnospace( Page page, + IndexTuple *itvec, int len ); +static IndexTuple * gistreadbuffer( Relation r, + Buffer buffer, int *len ); +static IndexTuple * gistjoinvector( + IndexTuple *itvec, int *len, + IndexTuple *additvec, int addlen ); +static IndexTuple gistunion( Relation r, IndexTuple *itvec, + int len, GISTSTATE *giststate ); +static IndexTuple gistgetadjusted( Relation r, + IndexTuple oldtup, + IndexTuple addtup, + GISTSTATE *giststate ); +static IndexTuple * gistSplit(Relation r, + Buffer buffer, + IndexTuple *itup, + int *len, + GISTSTATE *giststate, + InsertIndexResult *res); +static void gistnewroot(GISTSTATE *giststate, Relation r, + IndexTuple *itup, int len); static void GISTInitBuffer(Buffer b, uint32 f); -static BlockNumber gistChooseSubtree(Relation r, IndexTuple itup, int level, - GISTSTATE *giststate, - GISTSTACK **retstack, Buffer *leafbuf); -static OffsetNumber gistchoose(Relation r, Page p, IndexTuple it, - GISTSTATE *giststate); -static int gistnospace(Page p, IndexTuple it); -static IndexTuple gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t); -static void gistcentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, - Relation r, Page pg, OffsetNumber o, int b, bool l); - +static OffsetNumber gistchoose(Relation r, Page p, + IndexTuple it, + GISTSTATE *giststate); +static IndexTuple gist_tuple_replacekey(Relation r, + GISTENTRY entry, IndexTuple t); +static void gistcentryinit(GISTSTATE *giststate, + GISTENTRY *e, char *pr, + Relation r, Page pg, + OffsetNumber o, int b, bool l); + +#undef GISTDEBUG #ifdef GISTDEBUG -static char *int_range_out(INTRANGE *r); - +static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff); #endif /* @@ -86,7 +110,6 @@ gistbuild(PG_FUNCTION_ARGS) TupleTableSlot *slot; #endif ExprContext *econtext; - InsertIndexResult res = NULL; GISTSTATE giststate; GISTENTRY tmpcentry; Buffer buffer = InvalidBuffer; @@ -223,14 +246,13 @@ gistbuild(PG_FUNCTION_ARGS) * not when you're initializing the whole index at once. */ - res = gistdoinsert(index, itup, &giststate); + gistdoinsert(index, itup, NULL, &giststate); for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++) if (compvec[i]) pfree(DatumGetPointer(attdata[i])); pfree(itup); - pfree(res); } /* okay, all heap tuples are indexed */ @@ -274,6 +296,10 @@ gistbuild(PG_FUNCTION_ARGS) } } +#ifdef GISTDEBUG +gist_dumptree(index, 0, GISTP_ROOT, 0); +#endif + PG_RETURN_VOID(); } @@ -324,7 +350,8 @@ gistinsert(PG_FUNCTION_ARGS) * RelationSetLockForWrite(r); */ - res = gistdoinsert(r, itup, &giststate); + res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); + gistdoinsert(r, itup, &res, &giststate); for (i = 0; i < r->rd_att->natts; i++) if (compvec[i] == TRUE) pfree((char *) datum[i]); @@ -353,6 +380,7 @@ gistPageAddItem(GISTSTATE *giststate, { GISTENTRY tmpcentry; IndexTuple itup = (IndexTuple) item; + OffsetNumber retval; /* * recompress the item given that we now know the exact page and @@ -364,295 +392,353 @@ gistPageAddItem(GISTSTATE *giststate, IndexTupleSize(itup) - sizeof(IndexTupleData), FALSE); gistcentryinit(giststate, &tmpcentry, dentry->pred, r, page, offsetNumber, dentry->bytes, FALSE); - *newtup = gist_tuple_replacekey(r, *dentry, itup); + *newtup = gist_tuple_replacekey(r, tmpcentry, itup); + retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup), + offsetNumber, flags); /* be tidy */ - if (tmpcentry.pred != dentry->pred + if (tmpcentry.pred && tmpcentry.pred != dentry->pred && tmpcentry.pred != (((char *) itup) + sizeof(IndexTupleData))) pfree(tmpcentry.pred); - - return (PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup), - offsetNumber, flags)); + return (retval); } +static void +gistdoinsert( Relation r, + IndexTuple itup, + InsertIndexResult *res, + GISTSTATE *giststate ) { + IndexTuple *instup; + int i,ret,len = 1; + + instup = ( IndexTuple* ) palloc( sizeof(IndexTuple) ); + instup[0] = ( IndexTuple ) palloc( IndexTupleSize( itup ) ); + memcpy( instup[0], itup, IndexTupleSize( itup ) ); + + ret = gistlayerinsert(r, GISTP_ROOT, &instup, &len, res, giststate); + if ( ret & SPLITED ) + gistnewroot( giststate, r, instup, len ); -static InsertIndexResult -gistdoinsert(Relation r, - IndexTuple itup, /* itup contains compressed entry */ - GISTSTATE *giststate) -{ - GISTENTRY tmpdentry; - InsertIndexResult res; - OffsetNumber l; - GISTSTACK *stack; - Buffer buffer; - BlockNumber blk; - Page page; - OffsetNumber off; - IndexTuple newtup; + for(i=0;i<len;i++) + pfree( instup[i] ); + pfree( instup ); +} - /* 3rd arg is ignored for now */ - blk = gistChooseSubtree(r, itup, 0, giststate, &stack, &buffer); - page = (Page) BufferGetPage(buffer); +static int +gistlayerinsert( Relation r, BlockNumber blkno, + IndexTuple **itup, /* in - out, has compressed entry */ + int *len , /* in - out */ + InsertIndexResult *res, /* out */ + GISTSTATE *giststate ) { + Buffer buffer; + Page page; + OffsetNumber child; + int ret; + GISTPageOpaque opaque; - if (gistnospace(page, itup)) - { - /* need to do a split */ - res = gistSplit(r, buffer, stack, itup, giststate); - gistfreestack(stack); - WriteBuffer(buffer); /* don't forget to release buffer! */ - return res; - } + buffer = ReadBuffer(r, blkno); + page = (Page) BufferGetPage(buffer); + opaque = (GISTPageOpaque) PageGetSpecialPointer(page); - if (PageIsEmpty(page)) - off = FirstOffsetNumber; - else - off = OffsetNumberNext(PageGetMaxOffsetNumber(page)); + if (!(opaque->flags & F_LEAF)) { + /* internal page, so we must walk on tree */ + /* len IS equial 1 */ + ItemId iid; + BlockNumber nblkno; + ItemPointerData oldtid; + IndexTuple oldtup; + + child = gistchoose( r, page, *(*itup), giststate ); + iid = PageGetItemId(page, child); + oldtup = (IndexTuple) PageGetItem(page, iid); + nblkno = ItemPointerGetBlockNumber(&(oldtup->t_tid)); + + /* + * After this call: + * 1. if child page was splited, then itup contains + * keys for each page + * 2. if child page wasn't splited, then itup contains + * additional for adjustement of current key + */ + ret = gistlayerinsert( r, nblkno, itup, len, res, giststate ); - /* add the item and write the buffer */ - l = gistPageAddItem(giststate, r, page, (Item) itup, IndexTupleSize(itup), - off, LP_USED, &tmpdentry, &newtup); - WriteBuffer(buffer); + /* nothing inserted in child */ + if ( ! (ret & INSERTED) ) { + ReleaseBuffer(buffer); + return 0x00; + } - /* now expand the page boundary in the parent to include the new child */ - gistAdjustKeys(r, stack, blk, tmpdentry.pred, tmpdentry.bytes, giststate); - gistfreestack(stack); + /* child does not splited */ + if ( ! (ret & SPLITED) ) { + IndexTuple newtup = gistgetadjusted( r, oldtup, (*itup)[0], giststate ); + if ( ! newtup ) { + /* not need to update key */ + ReleaseBuffer(buffer); + return 0x00; + } - /* be tidy */ - if (itup != newtup) - pfree(newtup); - if (tmpdentry.pred != (((char *) itup) + sizeof(IndexTupleData))) - pfree(tmpdentry.pred); + pfree( (*itup)[0] ); /* !!! */ + (*itup)[0] = newtup; + } - /* build and return an InsertIndexResult for this insertion */ - res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); - ItemPointerSet(&(res->pointerData), blk, l); + /* key is modified, so old version must be deleted */ + ItemPointerSet(&oldtid, blkno, child); + DirectFunctionCall2(gistdelete, + PointerGetDatum(r), + PointerGetDatum(&oldtid)); + } - return res; -} + ret = INSERTED; + + if ( gistnospace(page, (*itup), *len) ) { + /* no space for insertion */ + IndexTuple *itvec; + int tlen; + + ret |= SPLITED; + itvec = gistreadbuffer( r, buffer, &tlen ); + itvec = gistjoinvector( itvec, &tlen, (*itup), *len ); + pfree( (*itup) ); + (*itup) = gistSplit( r, buffer, itvec, &tlen, giststate, + (opaque->flags & F_LEAF) ? res : NULL ); /*res only for inserting in leaf*/ + ReleaseBuffer( buffer ); + pfree( itvec ); + *len = tlen; /* now tlen >= 2 */ + } else { + /* enogth space */ + OffsetNumber off, l; + + off = ( PageIsEmpty(page) ) ? + FirstOffsetNumber + : + OffsetNumberNext(PageGetMaxOffsetNumber(page)); + l = gistwritebuffer( r, page, (*itup), *len, off, giststate ); + WriteBuffer(buffer); + /* set res if insert into leaf page, in + this case, len = 1 always */ + if ( res && (opaque->flags & F_LEAF) ) + ItemPointerSet(&((*res)->pointerData), blkno, l); + + if ( *len > 1 ) { /* previos insert ret & SPLITED != 0 */ + int i; + /* child was splited, so we must form union + * for insertion in parent */ + IndexTuple newtup = gistunion(r, (*itup), *len, giststate); + for(i=0; i<*len; i++) + pfree( (*itup)[i] ); + (*itup)[0] = newtup; + *len = 1; + } + } + + return ret; +} -static BlockNumber -gistChooseSubtree(Relation r, IndexTuple itup, /* itup has compressed - * entry */ - int level, - GISTSTATE *giststate, - GISTSTACK **retstack /* out */ , - Buffer *leafbuf /* out */ ) -{ - Buffer buffer; - BlockNumber blk; - GISTSTACK *stack; - Page page; - GISTPageOpaque opaque; - IndexTuple which; +/* + * Write itup vector to page, has no control of free space + */ +static OffsetNumber +gistwritebuffer( Relation r, Page page, IndexTuple *itup, + int len, OffsetNumber off, GISTSTATE *giststate) { + OffsetNumber l = InvalidOffsetNumber; + int i; + GISTENTRY tmpdentry; + IndexTuple newtup; + + for(i=0; i<len; i++) { + l = gistPageAddItem(giststate, r, page, + (Item) itup[i], IndexTupleSize(itup[i]), + off, LP_USED, &tmpdentry, &newtup); + off = OffsetNumberNext( off ); + if (tmpdentry.pred != (((char *) itup[i]) + sizeof(IndexTupleData)) && tmpdentry.pred) + pfree(tmpdentry.pred); + if (itup[i] != newtup) + pfree(newtup); + } + return l; +} - blk = GISTP_ROOT; - buffer = InvalidBuffer; - stack = (GISTSTACK *) NULL; +/* + * Check space for itup vector on page + */ +static int +gistnospace( Page page, IndexTuple *itvec, int len ) { + int size = 0; + int i; + for(i=0; i<len; i++) + size += IndexTupleSize( itvec[i] )+4; /* ??? */ - do - { - /* let go of current buffer before getting next */ - if (buffer != InvalidBuffer) - ReleaseBuffer(buffer); + return (PageGetFreeSpace(page) < size); +} - /* get next buffer */ - buffer = ReadBuffer(r, blk); - page = (Page) BufferGetPage(buffer); +/* + * Read buffer into itup vector + */ +static IndexTuple * +gistreadbuffer( Relation r, Buffer buffer, int *len /*out*/) { + OffsetNumber i, maxoff; + IndexTuple *itvec; + Page p = (Page) BufferGetPage(buffer); - opaque = (GISTPageOpaque) PageGetSpecialPointer(page); - if (!(opaque->flags & F_LEAF)) - { - GISTSTACK *n; - ItemId iid; - - n = (GISTSTACK *) palloc(sizeof(GISTSTACK)); - n->gs_parent = stack; - n->gs_blk = blk; - n->gs_child = gistchoose(r, page, itup, giststate); - stack = n; - - iid = PageGetItemId(page, n->gs_child); - which = (IndexTuple) PageGetItem(page, iid); - blk = ItemPointerGetBlockNumber(&(which->t_tid)); - } - } while (!(opaque->flags & F_LEAF)); + *len=0; + maxoff = PageGetMaxOffsetNumber(p); + itvec = palloc( sizeof(IndexTuple) * maxoff ); + for(i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + itvec[ (*len)++ ] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); - *retstack = stack; - *leafbuf = buffer; + return itvec; +} - return blk; +/* + * join two vectors into one + */ +static IndexTuple * +gistjoinvector( IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen ) { + itvec = (IndexTuple*) repalloc( (void*)itvec, sizeof(IndexTuple) * ( (*len) + addlen ) ); + memmove( &itvec[*len], additvec, sizeof(IndexTuple) * addlen ); + *len += addlen; + return itvec; } +/* + * return union of itup vector + */ +static IndexTuple +gistunion( Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate ) { + bytea *evec; + char *datum; + int datumsize, i; + GISTENTRY centry; + char isnull; + IndexTuple newtup; + + evec = (bytea *) palloc(len * sizeof(GISTENTRY) + VARHDRSZ); + VARATT_SIZEP(evec) = len * sizeof(GISTENTRY) + VARHDRSZ; + + for ( i = 0 ; i< len ; i++ ) + gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[i], + (char*) itvec[i] + sizeof(IndexTupleData), + (Relation)NULL, (Page)NULL, (OffsetNumber)NULL, + IndexTupleSize((IndexTuple)itvec[i]) - sizeof(IndexTupleData), FALSE); -static void -gistAdjustKeys(Relation r, - GISTSTACK *stk, - BlockNumber blk, - char *datum, /* datum is uncompressed */ - int att_size, - GISTSTATE *giststate) -{ - char *oldud; - Page p; - Buffer b; - bool result; - bytea *evec; - GISTENTRY centry, - *ev0p, - *ev1p; - int size, - datumsize; - IndexTuple tid; + datum = (char *) + DatumGetPointer(FunctionCall2(&giststate->unionFn, + PointerGetDatum(evec), + PointerGetDatum(&datumsize))); - if (stk == (GISTSTACK *) NULL) - return; + for ( i = 0 ; i< len ; i++ ) + if ( ((GISTENTRY *) VARDATA(evec))[i].pred && + ((GISTENTRY *) VARDATA(evec))[i].pred != + ((char*)( itvec[i] )+ sizeof(IndexTupleData)) ) + pfree( ((GISTENTRY *) VARDATA(evec))[i].pred ); + + pfree( evec ); - b = ReadBuffer(r, stk->gs_blk); - p = BufferGetPage(b); + gistcentryinit(giststate, ¢ry, datum, + (Relation)NULL, (Page)NULL, (OffsetNumber)NULL, + datumsize, FALSE); + + isnull = (centry.pred) ? ' ' : 'n'; + newtup = (IndexTuple) index_formtuple( r->rd_att, (Datum *) ¢ry.pred, &isnull ); + if (centry.pred != datum) + pfree( datum ); - oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->gs_child)); - tid = (IndexTuple) oldud; - size = IndexTupleSize((IndexTuple) oldud) - sizeof(IndexTupleData); - oldud += sizeof(IndexTupleData); + return newtup; +} +/* + * Forms union of oldtup and addtup, if union == oldtup then return NULL + */ +static IndexTuple +gistgetadjusted( Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate ) { + bytea *evec; + char *datum; + int datumsize; + bool result; + char isnull; + GISTENTRY centry, *ev0p, *ev1p; + IndexTuple newtup = NULL; + evec = (bytea *) palloc(2 * sizeof(GISTENTRY) + VARHDRSZ); VARATT_SIZEP(evec) = 2 * sizeof(GISTENTRY) + VARHDRSZ; - /* insert decompressed oldud into entry vector */ gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[0], - oldud, r, p, stk->gs_child, - size, FALSE); + (char*) oldtup + sizeof(IndexTupleData), (Relation) NULL, + (Page) NULL, (OffsetNumber) 0, + IndexTupleSize((IndexTuple)oldtup) - sizeof(IndexTupleData), FALSE); ev0p = &((GISTENTRY *) VARDATA(evec))[0]; - /* insert datum entry into entry vector */ - gistentryinit(((GISTENTRY *) VARDATA(evec))[1], datum, - (Relation) NULL, (Page) NULL, (OffsetNumber) 0, att_size, FALSE); + gistdentryinit(giststate, &((GISTENTRY *) VARDATA(evec))[1], + (char*) addtup + sizeof(IndexTupleData), (Relation) NULL, + (Page) NULL, (OffsetNumber) 0, + IndexTupleSize((IndexTuple)addtup) - sizeof(IndexTupleData), FALSE); ev1p = &((GISTENTRY *) VARDATA(evec))[1]; - /* form union of decompressed entries */ datum = (char *) DatumGetPointer(FunctionCall2(&giststate->unionFn, - PointerGetDatum(evec), - PointerGetDatum(&datumsize))); - - /* did union leave decompressed version of oldud unchanged? */ - FunctionCall3(&giststate->equalFn, - PointerGetDatum(ev0p->pred), - PointerGetDatum(datum), - PointerGetDatum(&result)); - if (!result) - { - TupleDesc td = RelationGetDescr(r); + PointerGetDatum(evec), + PointerGetDatum(&datumsize))); + + if ( ! ( ev0p->pred && ev1p->pred ) ) { + result = ( ev0p->pred == NULL && ev1p->pred == NULL ); + } else { + FunctionCall3(&giststate->equalFn, + PointerGetDatum(ev0p->pred), + PointerGetDatum(datum), + PointerGetDatum(&result)); + } - /* compress datum for storage on page */ + if ( result ) { + /* not need to update key */ + pfree( datum ); + } else { gistcentryinit(giststate, ¢ry, datum, ev0p->rel, ev0p->page, - ev0p->offset, datumsize, FALSE); - if (td->attrs[0]->attlen >= 0) - { - memmove(oldud, centry.pred, att_size); - gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, datum, att_size, - giststate); - } - else if (VARSIZE(centry.pred) == VARSIZE(oldud)) - { - memmove(oldud, centry.pred, VARSIZE(centry.pred)); - gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, datum, att_size, - giststate); - } - else - { - - /* - * * new datum is not the same size as the old. * We have to - * delete the old entry and insert the new * one. Note that - * this may cause a split here! - */ - IndexTuple newtup; - ItemPointerData oldtid; - char *isnull; - TupleDesc tupDesc; - InsertIndexResult res; - - /* delete old tuple */ - ItemPointerSet(&oldtid, stk->gs_blk, stk->gs_child); - DirectFunctionCall2(gistdelete, - PointerGetDatum(r), - PointerGetDatum(&oldtid)); - - /* generate and insert new tuple */ - tupDesc = r->rd_att; - isnull = (char *) palloc(r->rd_rel->relnatts); - MemSet(isnull, ' ', r->rd_rel->relnatts); - newtup = (IndexTuple) index_formtuple(tupDesc, - (Datum *) ¢ry.pred, isnull); - pfree(isnull); - /* set pointer in new tuple to point to current child */ - ItemPointerSet(&oldtid, blk, 1); - newtup->t_tid = oldtid; - - /* inserting the new entry also adjust keys above */ - res = gistentryinsert(r, stk, newtup, giststate); - - /* in stack, set info to point to new tuple */ - stk->gs_blk = ItemPointerGetBlockNumber(&(res->pointerData)); - stk->gs_child = ItemPointerGetOffsetNumber(&(res->pointerData)); - - pfree(res); - } - WriteBuffer(b); + ev0p->offset, datumsize, FALSE); + isnull = (centry.pred) ? ' ' : 'n'; + newtup = (IndexTuple) index_formtuple( r->rd_att, (Datum *) ¢ry.pred, &isnull ); + newtup->t_tid = oldtup->t_tid; if (centry.pred != datum) - pfree(datum); + pfree( datum ); } - else - ReleaseBuffer(b); - pfree(evec); -} + if ( ev0p->pred && + ev0p->pred != (char*) oldtup + sizeof(IndexTupleData) ) + pfree( ev0p->pred ); + if ( ev1p->pred && + ev1p->pred != (char*) addtup + sizeof(IndexTupleData) ) + pfree( ev1p->pred ); + pfree( evec ); + + return newtup; +} + /* * gistSplit -- split a page in the tree. - * */ -static InsertIndexResult +static IndexTuple * gistSplit(Relation r, Buffer buffer, - GISTSTACK *stack, - IndexTuple itup, /* contains compressed entry */ - GISTSTATE *giststate) + IndexTuple *itup, /* contains compressed entry */ + int *len, + GISTSTATE *giststate, + InsertIndexResult *res) { Page p; - Buffer leftbuf, - rightbuf; - Page left, - right; - ItemId itemid; - IndexTuple item; - IndexTuple ltup, - rtup, - newtup; - OffsetNumber maxoff; - OffsetNumber i; - OffsetNumber leftoff, - rightoff; - BlockNumber lbknum, - rbknum; - BlockNumber bufblock; + Buffer leftbuf, rightbuf; + Page left, right; + OffsetNumber *spl_left, *spl_right; + IndexTuple *lvectup, *rvectup, *newtup; + int leftoff, rightoff; + BlockNumber lbknum, rbknum; GISTPageOpaque opaque; - int blank; - InsertIndexResult res; - char *isnull; + char isnull; GIST_SPLITVEC v; - TupleDesc tupDesc; bytea *entryvec; bool *decompvec; - IndexTuple item_1; - GISTENTRY tmpdentry, - tmpentry; + GISTENTRY tmpentry; + int i, nlen; - isnull = (char *) palloc(r->rd_rel->relnatts); - for (blank = 0; blank < r->rd_rel->relnatts; blank++) - isnull[blank] = ' '; p = (Page) BufferGetPage(buffer); opaque = (GISTPageOpaque) PageGetSpecialPointer(p); @@ -684,313 +770,138 @@ gistSplit(Relation r, right = (Page) BufferGetPage(rightbuf); /* generate the item array */ - maxoff = PageGetMaxOffsetNumber(p); - entryvec = (bytea *) palloc(VARHDRSZ + (maxoff + 2) * sizeof(GISTENTRY)); - decompvec = (bool *) palloc(VARHDRSZ + (maxoff + 2) * sizeof(bool)); - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + entryvec = (bytea *) palloc(VARHDRSZ + (*len+1) * sizeof(GISTENTRY)); + decompvec = (bool *) palloc(VARHDRSZ + (*len+1) * sizeof(bool)); + VARATT_SIZEP(entryvec) = (*len+1) * sizeof(GISTENTRY) + VARHDRSZ; + for (i = 1; i <= *len; i++) { - item_1 = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); gistdentryinit(giststate, &((GISTENTRY *) VARDATA(entryvec))[i], - (((char *) item_1) + sizeof(IndexTupleData)), + (((char *) itup[i-1]) + sizeof(IndexTupleData)), r, p, i, - IndexTupleSize(item_1) - sizeof(IndexTupleData), FALSE); + IndexTupleSize(itup[i-1]) - sizeof(IndexTupleData), FALSE); if ((char *) (((GISTENTRY *) VARDATA(entryvec))[i].pred) - == (((char *) item_1) + sizeof(IndexTupleData))) + == (((char *) itup[i-1]) + sizeof(IndexTupleData))) decompvec[i] = FALSE; else decompvec[i] = TRUE; } - /* add the new datum as the last entry */ - gistdentryinit(giststate, &(((GISTENTRY *) VARDATA(entryvec))[maxoff + 1]), - (((char *) itup) + sizeof(IndexTupleData)), - (Relation) NULL, (Page) NULL, - (OffsetNumber) 0, tmpentry.bytes, FALSE); - if ((char *) (((GISTENTRY *) VARDATA(entryvec))[maxoff + 1]).pred != - (((char *) itup) + sizeof(IndexTupleData))) - decompvec[maxoff + 1] = TRUE; - else - decompvec[maxoff + 1] = FALSE; - - VARATT_SIZEP(entryvec) = (maxoff + 2) * sizeof(GISTENTRY) + VARHDRSZ; - /* now let the user-defined picksplit function set up the split vector */ FunctionCall2(&giststate->picksplitFn, - PointerGetDatum(entryvec), - PointerGetDatum(&v)); - - /* compress ldatum and rdatum */ - gistcentryinit(giststate, &tmpentry, v.spl_ldatum, (Relation) NULL, - (Page) NULL, (OffsetNumber) 0, - ((GISTENTRY *) VARDATA(entryvec))[i].bytes, FALSE); - if (v.spl_ldatum != tmpentry.pred) - pfree(v.spl_ldatum); - v.spl_ldatum = tmpentry.pred; - - gistcentryinit(giststate, &tmpentry, v.spl_rdatum, (Relation) NULL, - (Page) NULL, (OffsetNumber) 0, - ((GISTENTRY *) VARDATA(entryvec))[i].bytes, FALSE); - if (v.spl_rdatum != tmpentry.pred) - pfree(v.spl_rdatum); - v.spl_rdatum = tmpentry.pred; + PointerGetDatum(entryvec), + PointerGetDatum(&v)); /* clean up the entry vector: its preds need to be deleted, too */ - for (i = FirstOffsetNumber; i <= maxoff + 1; i = OffsetNumberNext(i)) - if (decompvec[i]) + for (i = 1; i <= *len; i++) + if (decompvec[i] && ((GISTENTRY *) VARDATA(entryvec))[i].pred) pfree(((GISTENTRY *) VARDATA(entryvec))[i].pred); pfree(entryvec); pfree(decompvec); - leftoff = rightoff = FirstOffsetNumber; - maxoff = PageGetMaxOffsetNumber(p); - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) - { - itemid = PageGetItemId(p, i); - item = (IndexTuple) PageGetItem(p, itemid); - - if (i == *(v.spl_left)) - { - gistPageAddItem(giststate, r, left, (Item) item, - IndexTupleSize(item), - leftoff, LP_USED, &tmpdentry, &newtup); - leftoff = OffsetNumberNext(leftoff); - v.spl_left++; /* advance in left split vector */ - /* be tidy */ - if (tmpdentry.pred != (((char *) item) + sizeof(IndexTupleData))) - pfree(tmpdentry.pred); - if ((IndexTuple) item != newtup) - pfree(newtup); - } - else - { - gistPageAddItem(giststate, r, right, (Item) item, - IndexTupleSize(item), - rightoff, LP_USED, &tmpdentry, &newtup); - rightoff = OffsetNumberNext(rightoff); - v.spl_right++; /* advance in right split vector */ - /* be tidy */ - if (tmpdentry.pred != (((char *) item) + sizeof(IndexTupleData))) - pfree(tmpdentry.pred); - if (item != newtup) - pfree(newtup); + spl_left = v.spl_left; spl_right = v.spl_right; + + /* form left and right vector */ + lvectup = (IndexTuple*) palloc( sizeof( IndexTuple )*v.spl_nleft ); + rvectup = (IndexTuple*) palloc( sizeof( IndexTuple )*v.spl_nright ); + leftoff = rightoff = 0; + for( i=1; i <= *len; i++ ) { + if (i == *(spl_left) || ( i==*len && *(spl_left) != FirstOffsetNumber ) ) { + lvectup[ leftoff++ ] = itup[ i-1 ]; + spl_left++; + } else { + rvectup[ rightoff++ ] = itup[ i-1 ]; + spl_right++; } } - /* build an InsertIndexResult for this insertion */ - res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); - - /* now insert the new index tuple */ - if (*(v.spl_left) != FirstOffsetNumber) - { - gistPageAddItem(giststate, r, left, (Item) itup, - IndexTupleSize(itup), - leftoff, LP_USED, &tmpdentry, &newtup); - leftoff = OffsetNumberNext(leftoff); - ItemPointerSet(&(res->pointerData), lbknum, leftoff); - /* be tidy */ - if (tmpdentry.pred != (((char *) itup) + sizeof(IndexTupleData))) - pfree(tmpdentry.pred); - if (itup != newtup) - pfree(newtup); - } - else - { - gistPageAddItem(giststate, r, right, (Item) itup, - IndexTupleSize(itup), - rightoff, LP_USED, &tmpdentry, &newtup); - rightoff = OffsetNumberNext(rightoff); - ItemPointerSet(&(res->pointerData), rbknum, rightoff); - /* be tidy */ - if (tmpdentry.pred != (((char *) itup) + sizeof(IndexTupleData))) - pfree(tmpdentry.pred); - if (itup != newtup) - pfree(newtup); + /* write on disk (may be need another split) */ + if ( gistnospace(right, rvectup, v.spl_nright) ) { + nlen = v.spl_nright; + newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate, + ( res && rvectup[ nlen-1 ] == itup[ *len - 1 ] ) ? res : NULL ); + ReleaseBuffer( rightbuf ); + } else { + OffsetNumber l; + + l = gistwritebuffer( r, right, rvectup, v.spl_nright, FirstOffsetNumber, giststate ); + WriteBuffer(rightbuf); + + if ( res ) + ItemPointerSet(&((*res)->pointerData), rbknum, l); + gistcentryinit(giststate, &tmpentry, v.spl_rdatum, (Relation) NULL, + (Page) NULL, (OffsetNumber) 0, + -1, FALSE); + if (v.spl_rdatum != tmpentry.pred) + pfree(v.spl_rdatum); + v.spl_rdatum = tmpentry.pred; + + nlen = 1; + newtup = (IndexTuple*) palloc( sizeof(IndexTuple) * 1); + isnull = ( v.spl_rdatum ) ? ' ' : 'n'; + newtup[0] = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &(v.spl_rdatum), &isnull); + ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1); } - if ((bufblock = BufferGetBlockNumber(buffer)) != GISTP_ROOT) - PageRestoreTempPage(left, p); - WriteBuffer(leftbuf); - WriteBuffer(rightbuf); - - /* - * Okay, the page is split. We have three things left to do: - * - * 1) Adjust any active scans on this index to cope with changes we - * introduced in its structure by splitting this page. - * - * 2) "Tighten" the bounding box of the pointer to the left page in the - * parent node in the tree, if any. Since we moved a bunch of stuff - * off the left page, we expect it to get smaller. This happens in - * the internal insertion routine. - * - * 3) Insert a pointer to the right page in the parent. This may cause - * the parent to split. If it does, we need to repeat steps one and - * two for each split node in the tree. - */ - - /* adjust active scans */ - gistadjscans(r, GISTOP_SPLIT, bufblock, FirstOffsetNumber); - - tupDesc = r->rd_att; - - ltup = (IndexTuple) index_formtuple(tupDesc, - (Datum *) &(v.spl_ldatum), isnull); - rtup = (IndexTuple) index_formtuple(tupDesc, - (Datum *) &(v.spl_rdatum), isnull); - pfree(isnull); - - /* set pointers to new child pages in the internal index tuples */ - ItemPointerSet(&(ltup->t_tid), lbknum, 1); - ItemPointerSet(&(rtup->t_tid), rbknum, 1); - - gistintinsert(r, stack, ltup, rtup, giststate); - - pfree(ltup); - pfree(rtup); - - return res; -} - -/* -** After a split, we need to overwrite the old entry's key in the parent, -** and install install an entry for the new key into the parent. -*/ -static void -gistintinsert(Relation r, - GISTSTACK *stk, - IndexTuple ltup, /* new version of entry for old page */ - IndexTuple rtup, /* entry for new page */ - GISTSTATE *giststate) -{ - ItemPointerData ltid; - - if (stk == (GISTSTACK *) NULL) - { - gistnewroot(giststate, r, ltup, rtup); - return; + if ( gistnospace(left, lvectup, v.spl_nleft) ) { + int llen = v.spl_nleft; + IndexTuple *lntup; + + lntup = gistSplit(r, leftbuf, lvectup, &llen, giststate, + ( res && lvectup[ llen-1 ] == itup[ *len - 1 ] ) ? res : NULL ); + ReleaseBuffer( leftbuf ); + + newtup = gistjoinvector( newtup, &nlen, lntup, llen ); + pfree( lntup ); + } else { + OffsetNumber l; + + l = gistwritebuffer( r, left, lvectup, v.spl_nleft, FirstOffsetNumber, giststate ); + if ( BufferGetBlockNumber(buffer) != GISTP_ROOT) + PageRestoreTempPage(left, p); + + WriteBuffer(leftbuf); + + if ( res ) + ItemPointerSet(&((*res)->pointerData), lbknum, l); + gistcentryinit(giststate, &tmpentry, v.spl_ldatum, (Relation) NULL, + (Page) NULL, (OffsetNumber) 0, + -1, FALSE); + if (v.spl_ldatum != tmpentry.pred) + pfree(v.spl_ldatum); + v.spl_ldatum = tmpentry.pred; + + nlen += 1; + newtup = (IndexTuple*) repalloc( (void*)newtup, sizeof(IndexTuple) * nlen); + isnull = ( v.spl_ldatum ) ? ' ' : 'n'; + newtup[nlen-1] = (IndexTuple) index_formtuple(r->rd_att, (Datum *) &(v.spl_ldatum), &isnull); + ItemPointerSet(&(newtup[nlen-1]->t_tid), lbknum, 1); } - /* remove old left pointer, insert the 2 new entries */ - ItemPointerSet(<id, stk->gs_blk, stk->gs_child); - DirectFunctionCall2(gistdelete, - PointerGetDatum(r), - PointerGetDatum(<id)); - gistentryinserttwo(r, stk, ltup, rtup, giststate); -} - - -/* -** Insert two entries onto one page, handling a split for either one! -*/ -static void -gistentryinserttwo(Relation r, GISTSTACK *stk, IndexTuple ltup, - IndexTuple rtup, GISTSTATE *giststate) -{ - Buffer b; - Page p; - InsertIndexResult res; - GISTENTRY tmpentry; - IndexTuple newtup; - - b = ReadBuffer(r, stk->gs_blk); - p = BufferGetPage(b); - - if (gistnospace(p, ltup)) - { - res = gistSplit(r, b, stk->gs_parent, ltup, giststate); - WriteBuffer(b); /* don't forget to release buffer! - - * 01/31/94 */ - pfree(res); - gistdoinsert(r, rtup, giststate); - } - else - { - gistPageAddItem(giststate, r, p, (Item) ltup, - IndexTupleSize(ltup), InvalidOffsetNumber, - LP_USED, &tmpentry, &newtup); - WriteBuffer(b); - gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, tmpentry.pred, - tmpentry.bytes, giststate); - /* be tidy */ - if (tmpentry.pred != (((char *) ltup) + sizeof(IndexTupleData))) - pfree(tmpentry.pred); - if (ltup != newtup) - pfree(newtup); - gistentryinsert(r, stk, rtup, giststate); - } -} + /* adjust active scans */ + gistadjscans(r, GISTOP_SPLIT, BufferGetBlockNumber(buffer), FirstOffsetNumber); -/* -** Insert an entry onto a page -*/ -static InsertIndexResult -gistentryinsert(Relation r, GISTSTACK *stk, IndexTuple tup, - GISTSTATE *giststate) -{ - Buffer b; - Page p; - InsertIndexResult res; - OffsetNumber off; - GISTENTRY tmpentry; - IndexTuple newtup; - - b = ReadBuffer(r, stk->gs_blk); - p = BufferGetPage(b); + /* !!! pfree */ + pfree( rvectup ); + pfree( lvectup ); + pfree( v.spl_left ); + pfree( v.spl_right ); - if (gistnospace(p, tup)) - { - res = gistSplit(r, b, stk->gs_parent, tup, giststate); - WriteBuffer(b); /* don't forget to release buffer! - - * 01/31/94 */ - return res; - } - else - { - res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); - off = gistPageAddItem(giststate, r, p, (Item) tup, IndexTupleSize(tup), - InvalidOffsetNumber, LP_USED, &tmpentry, &newtup); - WriteBuffer(b); - ItemPointerSet(&(res->pointerData), stk->gs_blk, off); - gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, tmpentry.pred, - tmpentry.bytes, giststate); - /* be tidy */ - if (tmpentry.pred != (((char *) tup) + sizeof(IndexTupleData))) - pfree(tmpentry.pred); - if (tup != newtup) - pfree(newtup); - return res; - } + *len = nlen; + return newtup; } - static void -gistnewroot(GISTSTATE *giststate, Relation r, IndexTuple lt, IndexTuple rt) +gistnewroot(GISTSTATE *giststate, Relation r, IndexTuple *itup, int len) { Buffer b; Page p; - GISTENTRY tmpentry; - IndexTuple newtup; b = ReadBuffer(r, GISTP_ROOT); GISTInitBuffer(b, 0); p = BufferGetPage(b); - gistPageAddItem(giststate, r, p, (Item) lt, IndexTupleSize(lt), - FirstOffsetNumber, - LP_USED, &tmpentry, &newtup); - /* be tidy */ - if (tmpentry.pred != (((char *) lt) + sizeof(IndexTupleData))) - pfree(tmpentry.pred); - if (lt != newtup) - pfree(newtup); - gistPageAddItem(giststate, r, p, (Item) rt, IndexTupleSize(rt), - OffsetNumberNext(FirstOffsetNumber), LP_USED, - &tmpentry, &newtup); - /* be tidy */ - if (tmpentry.pred != (((char *) rt) + sizeof(IndexTupleData))) - pfree(tmpentry.pred); - if (rt != newtup) - pfree(newtup); + + gistwritebuffer( r, p, itup, len, FirstOffsetNumber, giststate ); WriteBuffer(b); } @@ -1057,21 +968,15 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ if (which_grow == 0) break; } - if (entry.pred != datum) + if (entry.pred && entry.pred != datum) pfree(entry.pred); } - if (identry.pred != id) + if (identry.pred && identry.pred != id) pfree(identry.pred); return which; } -static int -gistnospace(Page p, IndexTuple it) -{ - return PageGetFreeSpace(p) < IndexTupleSize(it); -} - void gistfreestack(GISTSTACK *s) { @@ -1193,7 +1098,7 @@ gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t) char *datum = (((char *) t) + sizeof(IndexTupleData)); /* if new entry fits in index tuple, copy it in */ - if ((Size) entry.bytes < IndexTupleSize(t) - sizeof(IndexTupleData)) + if ((Size) entry.bytes < IndexTupleSize(t) - sizeof(IndexTupleData) || (Size) entry.bytes == 0 ) { memcpy(datum, entry.pred, entry.bytes); /* clear out old size */ @@ -1208,17 +1113,13 @@ gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t) /* generate a new index tuple for the compressed entry */ TupleDesc tupDesc = r->rd_att; IndexTuple newtup; - char *isnull; - int blank; + char isnull; - isnull = (char *) palloc(r->rd_rel->relnatts); - for (blank = 0; blank < r->rd_rel->relnatts; blank++) - isnull[blank] = ' '; + isnull = ( entry.pred ) ? ' ' : 'n'; newtup = (IndexTuple) index_formtuple(tupDesc, (Datum *) &(entry.pred), - isnull); + &isnull); newtup->t_tid = t->t_tid; - pfree(isnull); return newtup; } } @@ -1269,81 +1170,46 @@ gistcentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r, } } - - #ifdef GISTDEBUG - -/* -** sloppy debugging support routine, requires recompilation with appropriate -** "out" method for the index keys. Could be fixed to find that info -** in the catalogs... -*/ -void -_gistdump(Relation r) +static void +gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) { - Buffer buf; + Buffer buffer; Page page; - OffsetNumber offnum, - maxoff; - BlockNumber blkno; - BlockNumber nblocks; - GISTPageOpaque po; - IndexTuple itup; - BlockNumber itblkno; - OffsetNumber itoffno; - char *datum; - char *itkey; + GISTPageOpaque opaque; + IndexTuple which; + ItemId iid; + OffsetNumber i,maxoff; + BlockNumber cblk; + char *pred; - nblocks = RelationGetNumberOfBlocks(r); - for (blkno = 0; blkno < nblocks; blkno++) - { - buf = ReadBuffer(r, blkno); - page = BufferGetPage(buf); - po = (GISTPageOpaque) PageGetSpecialPointer(page); - maxoff = PageGetMaxOffsetNumber(page); - printf("Page %d maxoff %d <%s>\n", blkno, maxoff, - (po->flags & F_LEAF ? "LEAF" : "INTERNAL")); - - if (PageIsEmpty(page)) - { - ReleaseBuffer(buf); - continue; - } + pred = (char*) palloc( sizeof(char)*level+1 ); + MemSet(pred, '\t', level); + pred[level]='\0'; - for (offnum = FirstOffsetNumber; - offnum <= maxoff; - offnum = OffsetNumberNext(offnum)) - { - itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); - itblkno = ItemPointerGetBlockNumber(&(itup->t_tid)); - itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid)); - datum = ((char *) itup); - datum += sizeof(IndexTupleData); - /* get out function for type of key, and out it! */ - itkey = (char *) int_range_out((INTRANGE *) datum); - /* itkey = " unable to print"; */ - printf("\t[%d] size %d heap <%d,%d> key:%s\n", - offnum, IndexTupleSize(itup), itblkno, itoffno, itkey); - pfree(itkey); + buffer = ReadBuffer(r, blk); + page = (Page) BufferGetPage(buffer); + opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + + maxoff = PageGetMaxOffsetNumber( page ); + + elog(NOTICE,"%sPage: %d %s blk: %d maxoff: %d free: %d", pred, coff, ( opaque->flags & F_LEAF ) ? "LEAF" : "INTE", (int)blk, (int)maxoff, PageGetFreeSpace(page)); + + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { + iid = PageGetItemId(page, i); + which = (IndexTuple) PageGetItem(page, iid); + cblk = ItemPointerGetBlockNumber(&(which->t_tid)); +#ifdef PRINTTUPLE + elog(NOTICE,"%s Tuple. blk: %d size: %d", pred, (int)cblk, IndexTupleSize( which ) ); +#endif + + if ( ! ( opaque->flags & F_LEAF ) ) { + gist_dumptree( r, level+1, cblk, i ); } - - ReleaseBuffer(buf); } + ReleaseBuffer(buffer); + pfree(pred); } - -static char * -int_range_out(INTRANGE *r) -{ - char *result; - - if (r == NULL) - return NULL; - result = (char *) palloc(80); - snprintf(result, 80, "[%d,%d): %d", r->lower, r->upper, r->flag); - - return result; -} - #endif /* defined GISTDEBUG */ void @@ -1362,3 +1228,4 @@ void gist_desc(char *buf, uint8 xl_info, char* rec) { } + |