diff options
author | Teodor Sigaev <teodor@sigaev.ru> | 2005-06-20 10:29:37 +0000 |
---|---|---|
committer | Teodor Sigaev <teodor@sigaev.ru> | 2005-06-20 10:29:37 +0000 |
commit | d544ec8bbd70d50c1b50a00437b8061cabeeb5f2 (patch) | |
tree | 2eed78c3728c26840f525aa2aa8e2a4e6092f69e /src/backend/access/gist/gistxlog.c | |
parent | 0b62bbe086261a12cc6779244e979c54233da055 (diff) | |
download | postgresql-d544ec8bbd70d50c1b50a00437b8061cabeeb5f2.tar.gz |
1. full functional WAL for GiST
2. improve vacuum for gist
- use FSM
- full vacuum:
- reforms parent tuple if it's needed
( tuples was deleted on child page or parent tuple remains invalid
after crash recovery )
- truncate index file if possible
3. fixes bugs and mistakes
Diffstat (limited to 'src/backend/access/gist/gistxlog.c')
-rw-r--r-- | src/backend/access/gist/gistxlog.c | 540 |
1 files changed, 406 insertions, 134 deletions
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index b99ab24761..b6c0696e1a 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.1 2005/06/14 11:45:13 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.2 2005/06/20 10:29:36 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -22,11 +22,13 @@ #include "miscadmin.h" #include "utils/memutils.h" + typedef struct { gistxlogEntryUpdate *data; int len; IndexTuple *itup; BlockNumber *path; + OffsetNumber *todelete; } EntryUpdateRecord; typedef struct { @@ -44,6 +46,7 @@ typedef struct { NewPage *page; IndexTuple *itup; BlockNumber *path; + OffsetNumber *todelete; } PageSplitRecord; /* track for incomplete inserts, idea was taken from nbtxlog.c */ @@ -55,6 +58,7 @@ typedef struct gistIncompleteInsert { BlockNumber *blkno; int pathlen; BlockNumber *path; + XLogRecPtr lsn; } gistIncompleteInsert; @@ -65,12 +69,12 @@ static List *incomplete_inserts; #define ItemPointerEQ( a, b ) \ ( \ - ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(a) && \ + ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \ ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \ ) static void -pushIncompleteInsert(RelFileNode node, ItemPointerData key, +pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, BlockNumber *blkno, int lenblk, BlockNumber *path, int pathlen, PageSplitRecord *xlinfo /* to extract blkno info */ ) { @@ -79,6 +83,7 @@ pushIncompleteInsert(RelFileNode node, ItemPointerData key, ninsert->node = node; ninsert->key = key; + ninsert->lsn = lsn; if ( lenblk && blkno ) { ninsert->lenblk = lenblk; @@ -95,7 +100,7 @@ pushIncompleteInsert(RelFileNode node, ItemPointerData key, } Assert( ninsert->lenblk>0 ); - if ( path && ninsert->pathlen ) { + if ( path && pathlen ) { ninsert->pathlen = pathlen; ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen ); memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen); @@ -135,11 +140,17 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) { decoded->data = (gistxlogEntryUpdate*)begin; if ( decoded->data->pathlen ) { - addpath = sizeof(BlockNumber) * decoded->data->pathlen; + addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen ); decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate )); } else decoded->path = NULL; + if ( decoded->data->ntodelete ) { + decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogEntryUpdate ) + addpath); + addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete ); + } else + decoded->todelete = NULL; + decoded->len=0; ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath; while( ptr - begin < record->xl_len ) { @@ -157,7 +168,9 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) { } } - +/* + * redo any page update (except page split) + */ static void gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { EntryUpdateRecord xlrec; @@ -191,19 +204,39 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { } } - if ( isnewroot ) - GISTInitBuffer(buffer, 0); - else if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber ) - PageIndexTupleDelete(page, xlrec.data->todeleteoffnum); + if ( xlrec.data->isemptypage ) { + while( !PageIsEmpty(page) ) + PageIndexTupleDelete( page, FirstOffsetNumber ); + + if ( xlrec.data->blkno == GIST_ROOT_BLKNO ) + GistPageSetLeaf( page ); + else + GistPageSetDeleted( page ); + } else { + if ( isnewroot ) + GISTInitBuffer(buffer, 0); + else if ( xlrec.data->ntodelete ) { + int i; + for(i=0; i < xlrec.data->ntodelete ; i++) + PageIndexTupleDelete(page, xlrec.todelete[i]); + if ( GistPageIsLeaf(page) ) + GistMarkTuplesDeleted(page); + } - /* add tuples */ - if ( xlrec.len > 0 ) { - OffsetNumber off = (PageIsEmpty(page)) ? - FirstOffsetNumber - : - OffsetNumberNext(PageGetMaxOffsetNumber(page)); + /* add tuples */ + if ( xlrec.len > 0 ) { + OffsetNumber off = (PageIsEmpty(page)) ? + FirstOffsetNumber + : + OffsetNumberNext(PageGetMaxOffsetNumber(page)); - gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off); + gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off); + } + + /* special case: leafpage, nothing to insert, nothing to delete, then + vacuum marks page */ + if ( GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0 ) + GistClearTuplesDeleted(page); } PageSetLSN(page, lsn); @@ -216,7 +249,7 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO ) - pushIncompleteInsert(xlrec.data->node, xlrec.data->key, + pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, &(xlrec.data->blkno), 1, xlrec.path, xlrec.data->pathlen, NULL); @@ -233,11 +266,17 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) { decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup ); if ( decoded->data->pathlen ) { - addpath = sizeof(BlockNumber) * decoded->data->pathlen; - decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate )); + addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen ); + decoded->path = (BlockNumber*)(begin+sizeof( gistxlogPageSplit )); } else decoded->path = NULL; + if ( decoded->data->ntodelete ) { + decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogPageSplit ) + addpath); + addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete ); + } else + decoded->todelete = NULL; + ptr=begin+sizeof( gistxlogPageSplit ) + addpath; for(i=0;i<decoded->data->nitup;i++) { Assert( ptr - begin < record->xl_len ); @@ -285,19 +324,23 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) { return; } - if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber ) - PageIndexTupleDelete(page, xlrec.data->todeleteoffnum); + if ( xlrec.data->ntodelete ) { + int i; + for(i=0; i < xlrec.data->ntodelete ; i++) + PageIndexTupleDelete(page, xlrec.todelete[i]); + } itup = gistextractbuffer(buffer, &len); itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup); institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len ); opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + /* read and fill all pages */ for(i=0;i<xlrec.data->npage;i++) { int j; NewPage *newpage = xlrec.page + i; - /* prepare itup vector */ + /* prepare itup vector per page */ for(j=0;j<newpage->header->num;j++) institup[j] = itup[ newpage->offnum[j] - 1 ]; @@ -311,9 +354,9 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) { if (!BufferIsValid(newpage->buffer)) elog(PANIC, "gistRedoPageSplitRecord: lost page"); newpage->page = (Page) BufferGetPage(newpage->buffer); - if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(newpage->page))) { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); + if (!PageIsNew((PageHeader) newpage->page) && XLByteLE(lsn, PageGetLSN(newpage->page))) { + LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(newpage->buffer); newpage->is_ok=true; continue; /* good page */ } else { @@ -350,7 +393,7 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) { if ( incomplete_inserts != NIL ) forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); - pushIncompleteInsert(xlrec.data->node, xlrec.data->key, + pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, NULL, 0, xlrec.path, xlrec.data->pathlen, &xlrec); @@ -386,6 +429,21 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) { WriteBuffer(buffer); } +static void +gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) { + char *begin = XLogRecGetData(record), *ptr; + gistxlogInsertComplete *xlrec; + + xlrec = (gistxlogInsertComplete*)begin; + + ptr = begin + sizeof( gistxlogInsertComplete ); + while( ptr - begin < record->xl_len ) { + Assert( record->xl_len - (ptr - begin) >= sizeof(ItemPointerData) ); + forgetIncompleteInsert( xlrec->node, *((ItemPointerData*)ptr) ); + ptr += sizeof(ItemPointerData); + } +} + void gist_redo(XLogRecPtr lsn, XLogRecord *record) { @@ -408,8 +466,7 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) gistRedoCreateIndex(lsn, record); break; case XLOG_GIST_INSERT_COMPLETE: - forgetIncompleteInsert( ((gistxlogInsertComplete*)XLogRecGetData(record))->node, - ((gistxlogInsertComplete*)XLogRecGetData(record))->key ); + gistRedoCompleteInsert(lsn, record); break; default: elog(PANIC, "gist_redo: unknown op code %u", info); @@ -431,16 +488,16 @@ out_target(char *buf, RelFileNode node, ItemPointerData key) static void out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) { out_target(buf, xlrec->node, xlrec->key); - sprintf(buf + strlen(buf), "; block number %u; update offset %u;", - xlrec->blkno, xlrec->todeleteoffnum); + sprintf(buf + strlen(buf), "; block number %u", + xlrec->blkno); } static void out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) { strcat(buf, "page_split: "); out_target(buf, xlrec->node, xlrec->key); - sprintf(buf + strlen(buf), "; block number %u; update offset %u; add %d tuples; split to %d pages", - xlrec->origblkno, xlrec->todeleteoffnum, + sprintf(buf + strlen(buf), "; block number %u; add %d tuples; split to %d pages", + xlrec->origblkno, xlrec->nitup, xlrec->npage); } @@ -472,135 +529,172 @@ gist_desc(char *buf, uint8 xl_info, char *rec) ((RelFileNode*)rec)->relNode); break; case XLOG_GIST_INSERT_COMPLETE: - strcat(buf, "insert_complete: "); - out_target(buf, ((gistxlogInsertComplete*)rec)->node, ((gistxlogInsertComplete*)rec)->key); + sprintf(buf + strlen(buf), "complete_insert: rel %u/%u/%u", + ((gistxlogInsertComplete*)rec)->node.spcNode, + ((gistxlogInsertComplete*)rec)->node.dbNode, + ((gistxlogInsertComplete*)rec)->node.relNode); default: elog(PANIC, "gist_desc: unknown op code %u", info); } } +IndexTuple +gist_form_invalid_tuple(BlockNumber blkno) { + /* we don't alloc space for null's bitmap, this is invalid tuple, + be carefull in read and write code */ + Size size = IndexInfoFindDataOffset(0); + IndexTuple tuple=(IndexTuple)palloc0( size ); + + tuple->t_info |= size; + + ItemPointerSetBlockNumber(&(tuple->t_tid), blkno); + GistTupleSetInvalid( tuple ); + + return tuple; +} -#ifdef GIST_INCOMPLETE_INSERT static void gistContinueInsert(gistIncompleteInsert *insert) { - GISTSTATE giststate; - GISTInsertState state; - int i; + IndexTuple *itup; + int i, lenitup; MemoryContext oldCxt; + Relation index; + oldCxt = MemoryContextSwitchTo(opCtx); - state.r = XLogOpenRelation(insert->node); - if (!RelationIsValid(state.r)) + index = XLogOpenRelation(insert->node); + if (!RelationIsValid(index)) return; - initGISTstate(&giststate, state.r); + elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index", + insert->node.spcNode, insert->node.dbNode, insert->node.relNode); - state.needInsertComplete=false; - ItemPointerSetInvalid( &(state.key) ); - state.path=NULL; - state.pathlen=0; - state.xlog_mode = true; + /* needed vector itup never will be more than initial lenblkno+2, + because during this processing Indextuple can be only smaller */ + lenitup = insert->lenblk; + itup = (IndexTuple*)palloc(sizeof(IndexTuple)*(lenitup+2 /*guarantee root split*/)); - /* form union tuples */ - state.itup = (IndexTuple*)palloc(sizeof(IndexTuple)*insert->lenblk); - state.ituplen = insert->lenblk; - for(i=0;i<insert->lenblk;i++) { - int len=0; - IndexTuple *itup; - Buffer buffer; - Page page; + for(i=0;i<insert->lenblk;i++) + itup[i] = gist_form_invalid_tuple( insert->blkno[i] ); - buffer = XLogReadBuffer(false, state.r, insert->blkno[i]); - if (!BufferIsValid(buffer)) - elog(PANIC, "gistContinueInsert: block unfound"); - page = (Page) BufferGetPage(buffer); - if ( PageIsNew((PageHeader)page) ) - elog(PANIC, "gistContinueInsert: uninitialized page"); + if ( insert->pathlen==0 ) { + /*it was split root, so we should only make new root*/ + Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO); + Page page; - itup = gistextractbuffer(buffer, &len); - state.itup[i] = gistunion(state.r, itup, len, &giststate); + if (!BufferIsValid(buffer)) + elog(PANIC, "gistContinueInsert: root block unfound"); - ItemPointerSet( &(state.itup[i]->t_tid), insert->blkno[i], FirstOffsetNumber ); - + GISTInitBuffer(buffer, 0); + page = BufferGetPage(buffer); + gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - } - - if ( insert->pathlen==0 ) { - /*it was split root, so we should only make new root*/ - gistnewroot(state.r, state.itup, state.ituplen, &(state.key), true); - MemoryContextSwitchTo(oldCxt); - MemoryContextReset(opCtx); - return; - } + WriteBuffer(buffer); + } else { + Buffer *buffers; + Page *pages; + int numbuffer; + + buffers= (Buffer*) palloc( sizeof(Buffer) * (insert->lenblk+2/*guarantee root split*/) ); + pages = (Page*) palloc( sizeof(Page ) * (insert->lenblk+2/*guarantee root split*/) ); - /* form stack */ - state.stack=NULL; - for(i=0;i<insert->pathlen;i++) { - int j,len=0; - IndexTuple *itup; - GISTInsertStack *top = (GISTInsertStack*)palloc( sizeof(GISTInsertStack) ); - - top->blkno = insert->path[i]; - top->buffer = XLogReadBuffer(false, state.r, top->blkno); - if (!BufferIsValid(top->buffer)) - elog(PANIC, "gistContinueInsert: block unfound"); - top->page = (Page) BufferGetPage(top->buffer); - if ( PageIsNew((PageHeader)(top->page)) ) - elog(PANIC, "gistContinueInsert: uninitialized page"); - - top->todelete = false; - - /* find childoffnum */ - itup = gistextractbuffer(top->buffer, &len); - top->childoffnum=InvalidOffsetNumber; - for(j=0;j<len && top->childoffnum==InvalidOffsetNumber;j++) { - BlockNumber blkno = ItemPointerGetBlockNumber( &(itup[j]->t_tid) ); + for(i=0;i<insert->pathlen;i++) { + int j, k, pituplen=0, childfound=0; + + numbuffer=1; + buffers[numbuffer-1] = XLogReadBuffer(false, index, insert->path[i]); + if (!BufferIsValid(buffers[numbuffer-1])) + elog(PANIC, "gistContinueInsert: block %u unfound", insert->path[i]); + pages[numbuffer-1] = BufferGetPage( buffers[numbuffer-1] ); + if ( PageIsNew((PageHeader)(pages[numbuffer-1])) ) + elog(PANIC, "gistContinueInsert: uninitialized page"); + + pituplen = PageGetMaxOffsetNumber(pages[numbuffer-1]); - if ( i==0 ) { - int k; - for(k=0;k<insert->lenblk;k++) - if ( insert->blkno[k] == blkno ) { - top->childoffnum = j+1; + /* remove old IndexTuples */ + for(j=0;j<pituplen && childfound<lenitup;j++) { + BlockNumber blkno; + ItemId iid = PageGetItemId(pages[numbuffer-1], j+FirstOffsetNumber); + IndexTuple idxtup = (IndexTuple) PageGetItem(pages[numbuffer-1], iid); + + blkno = ItemPointerGetBlockNumber( &(idxtup->t_tid) ); + + for(k=0;k<lenitup;k++) + if ( ItemPointerGetBlockNumber( &(itup[k]->t_tid) ) == blkno ) { + PageIndexTupleDelete(pages[numbuffer-1], j+FirstOffsetNumber); + j--; pituplen--; + childfound++; break; } - } else if ( insert->path[i-1]==blkno ) - top->childoffnum = j+1; - } + } - if ( top->childoffnum==InvalidOffsetNumber ) { - elog(WARNING, "gistContinueInsert: unknown parent, REINDEX GiST Indexes"); - return; + if ( gistnospace(pages[numbuffer-1], itup, lenitup) ) { + /* no space left on page, so we should split */ + buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW); + if (!BufferIsValid(buffers[numbuffer])) + elog(PANIC, "gistContinueInsert: can't create new block"); + GISTInitBuffer(buffers[numbuffer], 0); + pages[numbuffer] = BufferGetPage( buffers[numbuffer] ); + gistfillbuffer( index, pages[numbuffer], itup, lenitup, FirstOffsetNumber ); + numbuffer++; + + if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) { + IndexTuple *parentitup; + + parentitup = gistextractbuffer(buffers[numbuffer-1], &pituplen); + + /* we split root, just copy tuples from old root to new page */ + if ( i+1 != insert->pathlen ) + elog(PANIC,"gistContinueInsert: can't restore index '%s'", + RelationGetRelationName( index )); + + /* fill new page */ + buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW); + if (!BufferIsValid(buffers[numbuffer])) + elog(PANIC, "gistContinueInsert: can't create new block"); + GISTInitBuffer(buffers[numbuffer], 0); + pages[numbuffer] = BufferGetPage( buffers[numbuffer] ); + gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber); + numbuffer++; + + /* fill root page */ + GISTInitBuffer(buffers[0], 0); + for(j=1;j<numbuffer;j++) { + IndexTuple tuple = gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) ); + if ( InvalidOffsetNumber == PageAddItem(pages[0], + (Item)tuple, + IndexTupleSize( tuple ), + (OffsetNumber)j, + LP_USED) ) + elog( PANIC,"gistContinueInsert: can't restore index '%s'", + RelationGetRelationName( index )); + } + } + } else + gistfillbuffer( index, pages[numbuffer-1], itup, lenitup, + (PageIsEmpty(pages[numbuffer-1])) ? + FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(pages[numbuffer-1])) ); + + lenitup=numbuffer; + for(j=0;j<numbuffer;j++) { + itup[j]=gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) ); + PageSetLSN(pages[j], insert->lsn); + PageSetTLI(pages[j], ThisTimeLineID); + LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK); + WriteBuffer( buffers[j] ); + } } - - if ( i==0 ) - PageIndexTupleDelete(top->page, top->childoffnum); - - /* install item on right place in stack */ - top->parent=NULL; - if ( state.stack ) { - GISTInsertStack *ptr = state.stack; - while( ptr->parent ) - ptr = ptr->parent; - ptr->parent=top; - } else - state.stack = top; } - /* Good. Now we can continue insert */ - - gistmakedeal(&state, &giststate); - MemoryContextSwitchTo(oldCxt); MemoryContextReset(opCtx); } -#endif void gist_xlog_startup(void) { incomplete_inserts=NIL; insertCtx = AllocSetContextCreate(CurrentMemoryContext, - "GiST insert in xlog temporary context", + "GiST recovery temporary context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); @@ -613,16 +707,194 @@ gist_xlog_cleanup(void) { foreach(l, incomplete_inserts) { gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l); - char buf[1024]; - - *buf='\0'; - out_target(buf, insert->node, insert->key); - elog(LOG,"Incomplete insert: %s; It's needed to reindex", buf); -#ifdef GIST_INCOMPLETE_INSERT gistContinueInsert(insert); -#endif } MemoryContextDelete(opCtx); MemoryContextDelete(insertCtx); } + +XLogRecData * +formSplitRdata(RelFileNode node, BlockNumber blkno, + OffsetNumber *todelete, int ntodelete, + IndexTuple *itup, int ituplen, ItemPointer key, + BlockNumber *path, int pathlen, SplitedPageLayout *dist ) { + + XLogRecData *rdata; + gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit)); + SplitedPageLayout *ptr; + int npage = 0, cur=1, i; + + ptr=dist; + while( ptr ) { + npage++; + ptr=ptr->next; + } + + rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + ituplen + 3)); + + xlrec->node = node; + xlrec->origblkno = blkno; + xlrec->npage = (uint16)npage; + xlrec->nitup = (uint16)ituplen; + xlrec->ntodelete = (uint16)ntodelete; + xlrec->pathlen = (uint16)pathlen; + if ( key ) + xlrec->key = *key; + else + ItemPointerSetInvalid( &(xlrec->key) ); + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) xlrec; + rdata[0].len = sizeof( gistxlogPageSplit ); + rdata[0].next = NULL; + + if ( pathlen ) { + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)path; + rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen); + rdata[cur].next = NULL; + cur++; + } + + if ( ntodelete ) { + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)todelete; + rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete); + rdata[cur].next = NULL; + cur++; + } + + /* new tuples */ + for(i=0;i<ituplen;i++) { + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)(itup[i]); + rdata[cur].len = IndexTupleSize(itup[i]); + rdata[cur].next = NULL; + rdata[cur-1].next = &(rdata[cur]); + cur++; + } + + ptr=dist; + while(ptr) { + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)&(ptr->block); + rdata[cur].len = sizeof(gistxlogPage); + rdata[cur-1].next = &(rdata[cur]); + cur++; + + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)(ptr->list); + rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num); + if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num ) + rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len ); + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].next=NULL; + cur++; + ptr=ptr->next; + } + + return rdata; +} + + +XLogRecData * +formUpdateRdata(RelFileNode node, BlockNumber blkno, + OffsetNumber *todelete, int ntodelete, bool emptypage, + IndexTuple *itup, int ituplen, ItemPointer key, + BlockNumber *path, int pathlen) { + XLogRecData *rdata; + gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate*)palloc(sizeof(gistxlogEntryUpdate)); + + xlrec->node = node; + xlrec->blkno = blkno; + if ( key ) + xlrec->key = *key; + else + ItemPointerSetInvalid( &(xlrec->key) ); + + if ( emptypage ) { + xlrec->isemptypage = true; + xlrec->ntodelete = 0; + xlrec->pathlen = 0; + + rdata = (XLogRecData*)palloc( sizeof(XLogRecData) ); + rdata->buffer = InvalidBuffer; + rdata->data = (char*)xlrec; + rdata->len = sizeof(gistxlogEntryUpdate); + rdata->next = NULL; + } else { + int cur=1,i; + + xlrec->isemptypage = false; + xlrec->ntodelete = ntodelete; + xlrec->pathlen = pathlen; + + rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 3 + ituplen ) ); + + rdata->buffer = InvalidBuffer; + rdata->data = (char*)xlrec; + rdata->len = sizeof(gistxlogEntryUpdate); + rdata->next = NULL; + + if ( pathlen ) { + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)path; + rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen); + rdata[cur].next = NULL; + cur++; + } + + if ( ntodelete ) { + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)todelete; + rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete); + rdata[cur].next = NULL; + cur++; + } + + /* new tuples */ + for(i=0;i<ituplen;i++) { + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)(itup[i]); + rdata[cur].len = IndexTupleSize(itup[i]); + rdata[cur].next = NULL; + rdata[cur-1].next = &(rdata[cur]); + cur++; + } + } + + return rdata; +} + +XLogRecPtr +gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) { + gistxlogInsertComplete xlrec; + XLogRecData rdata[2]; + XLogRecPtr recptr; + + Assert(len>0); + xlrec.node = node; + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof( gistxlogInsertComplete ); + rdata[0].next = &(rdata[1]); + + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *) keys; + rdata[1].len = sizeof( ItemPointerData ) * len; + rdata[1].next = NULL; + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata); + + END_CRIT_SECTION(); + + return recptr; +} |