summaryrefslogtreecommitdiff
path: root/src/backend/access/gist/gistxlog.c
diff options
context:
space:
mode:
authorTeodor Sigaev <teodor@sigaev.ru>2005-06-20 10:29:37 +0000
committerTeodor Sigaev <teodor@sigaev.ru>2005-06-20 10:29:37 +0000
commitd544ec8bbd70d50c1b50a00437b8061cabeeb5f2 (patch)
tree2eed78c3728c26840f525aa2aa8e2a4e6092f69e /src/backend/access/gist/gistxlog.c
parent0b62bbe086261a12cc6779244e979c54233da055 (diff)
downloadpostgresql-d544ec8bbd70d50c1b50a00437b8061cabeeb5f2.tar.gz
1. full functional WAL for GiST
2. improve vacuum for gist - use FSM - full vacuum: - reforms parent tuple if it's needed ( tuples was deleted on child page or parent tuple remains invalid after crash recovery ) - truncate index file if possible 3. fixes bugs and mistakes
Diffstat (limited to 'src/backend/access/gist/gistxlog.c')
-rw-r--r--src/backend/access/gist/gistxlog.c540
1 files changed, 406 insertions, 134 deletions
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index b99ab24761..b6c0696e1a 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
+ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.2 2005/06/20 10:29:36 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
@@ -22,11 +22,13 @@
#include "miscadmin.h"
#include "utils/memutils.h"
+
typedef struct {
gistxlogEntryUpdate *data;
int len;
IndexTuple *itup;
BlockNumber *path;
+ OffsetNumber *todelete;
} EntryUpdateRecord;
typedef struct {
@@ -44,6 +46,7 @@ typedef struct {
NewPage *page;
IndexTuple *itup;
BlockNumber *path;
+ OffsetNumber *todelete;
} PageSplitRecord;
/* track for incomplete inserts, idea was taken from nbtxlog.c */
@@ -55,6 +58,7 @@ typedef struct gistIncompleteInsert {
BlockNumber *blkno;
int pathlen;
BlockNumber *path;
+ XLogRecPtr lsn;
} gistIncompleteInsert;
@@ -65,12 +69,12 @@ static List *incomplete_inserts;
#define ItemPointerEQ( a, b ) \
( \
- ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(a) && \
+ ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
)
static void
-pushIncompleteInsert(RelFileNode node, ItemPointerData key,
+pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
BlockNumber *blkno, int lenblk,
BlockNumber *path, int pathlen,
PageSplitRecord *xlinfo /* to extract blkno info */ ) {
@@ -79,6 +83,7 @@ pushIncompleteInsert(RelFileNode node, ItemPointerData key,
ninsert->node = node;
ninsert->key = key;
+ ninsert->lsn = lsn;
if ( lenblk && blkno ) {
ninsert->lenblk = lenblk;
@@ -95,7 +100,7 @@ pushIncompleteInsert(RelFileNode node, ItemPointerData key,
}
Assert( ninsert->lenblk>0 );
- if ( path && ninsert->pathlen ) {
+ if ( path && pathlen ) {
ninsert->pathlen = pathlen;
ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen );
memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen);
@@ -135,11 +140,17 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
decoded->data = (gistxlogEntryUpdate*)begin;
if ( decoded->data->pathlen ) {
- addpath = sizeof(BlockNumber) * decoded->data->pathlen;
+ addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
} else
decoded->path = NULL;
+ if ( decoded->data->ntodelete ) {
+ decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogEntryUpdate ) + addpath);
+ addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
+ } else
+ decoded->todelete = NULL;
+
decoded->len=0;
ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
while( ptr - begin < record->xl_len ) {
@@ -157,7 +168,9 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
}
}
-
+/*
+ * redo any page update (except page split)
+ */
static void
gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
EntryUpdateRecord xlrec;
@@ -191,19 +204,39 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
}
}
- if ( isnewroot )
- GISTInitBuffer(buffer, 0);
- else if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
- PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
+ if ( xlrec.data->isemptypage ) {
+ while( !PageIsEmpty(page) )
+ PageIndexTupleDelete( page, FirstOffsetNumber );
+
+ if ( xlrec.data->blkno == GIST_ROOT_BLKNO )
+ GistPageSetLeaf( page );
+ else
+ GistPageSetDeleted( page );
+ } else {
+ if ( isnewroot )
+ GISTInitBuffer(buffer, 0);
+ else if ( xlrec.data->ntodelete ) {
+ int i;
+ for(i=0; i < xlrec.data->ntodelete ; i++)
+ PageIndexTupleDelete(page, xlrec.todelete[i]);
+ if ( GistPageIsLeaf(page) )
+ GistMarkTuplesDeleted(page);
+ }
- /* add tuples */
- if ( xlrec.len > 0 ) {
- OffsetNumber off = (PageIsEmpty(page)) ?
- FirstOffsetNumber
- :
- OffsetNumberNext(PageGetMaxOffsetNumber(page));
+ /* add tuples */
+ if ( xlrec.len > 0 ) {
+ OffsetNumber off = (PageIsEmpty(page)) ?
+ FirstOffsetNumber
+ :
+ OffsetNumberNext(PageGetMaxOffsetNumber(page));
- gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off);
+ gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off);
+ }
+
+ /* special case: leafpage, nothing to insert, nothing to delete, then
+ vacuum marks page */
+ if ( GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0 )
+ GistClearTuplesDeleted(page);
}
PageSetLSN(page, lsn);
@@ -216,7 +249,7 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO )
- pushIncompleteInsert(xlrec.data->node, xlrec.data->key,
+ pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
&(xlrec.data->blkno), 1,
xlrec.path, xlrec.data->pathlen,
NULL);
@@ -233,11 +266,17 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup );
if ( decoded->data->pathlen ) {
- addpath = sizeof(BlockNumber) * decoded->data->pathlen;
- decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
+ addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
+ decoded->path = (BlockNumber*)(begin+sizeof( gistxlogPageSplit ));
} else
decoded->path = NULL;
+ if ( decoded->data->ntodelete ) {
+ decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogPageSplit ) + addpath);
+ addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
+ } else
+ decoded->todelete = NULL;
+
ptr=begin+sizeof( gistxlogPageSplit ) + addpath;
for(i=0;i<decoded->data->nitup;i++) {
Assert( ptr - begin < record->xl_len );
@@ -285,19 +324,23 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
return;
}
- if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
- PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
+ if ( xlrec.data->ntodelete ) {
+ int i;
+ for(i=0; i < xlrec.data->ntodelete ; i++)
+ PageIndexTupleDelete(page, xlrec.todelete[i]);
+ }
itup = gistextractbuffer(buffer, &len);
itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup);
institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len );
opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
+ /* read and fill all pages */
for(i=0;i<xlrec.data->npage;i++) {
int j;
NewPage *newpage = xlrec.page + i;
- /* prepare itup vector */
+ /* prepare itup vector per page */
for(j=0;j<newpage->header->num;j++)
institup[j] = itup[ newpage->offnum[j] - 1 ];
@@ -311,9 +354,9 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
if (!BufferIsValid(newpage->buffer))
elog(PANIC, "gistRedoPageSplitRecord: lost page");
newpage->page = (Page) BufferGetPage(newpage->buffer);
- if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
+ if (!PageIsNew((PageHeader) newpage->page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
+ LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(newpage->buffer);
newpage->is_ok=true;
continue; /* good page */
} else {
@@ -350,7 +393,7 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
if ( incomplete_inserts != NIL )
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
- pushIncompleteInsert(xlrec.data->node, xlrec.data->key,
+ pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
NULL, 0,
xlrec.path, xlrec.data->pathlen,
&xlrec);
@@ -386,6 +429,21 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
WriteBuffer(buffer);
}
+static void
+gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) {
+ char *begin = XLogRecGetData(record), *ptr;
+ gistxlogInsertComplete *xlrec;
+
+ xlrec = (gistxlogInsertComplete*)begin;
+
+ ptr = begin + sizeof( gistxlogInsertComplete );
+ while( ptr - begin < record->xl_len ) {
+ Assert( record->xl_len - (ptr - begin) >= sizeof(ItemPointerData) );
+ forgetIncompleteInsert( xlrec->node, *((ItemPointerData*)ptr) );
+ ptr += sizeof(ItemPointerData);
+ }
+}
+
void
gist_redo(XLogRecPtr lsn, XLogRecord *record)
{
@@ -408,8 +466,7 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
gistRedoCreateIndex(lsn, record);
break;
case XLOG_GIST_INSERT_COMPLETE:
- forgetIncompleteInsert( ((gistxlogInsertComplete*)XLogRecGetData(record))->node,
- ((gistxlogInsertComplete*)XLogRecGetData(record))->key );
+ gistRedoCompleteInsert(lsn, record);
break;
default:
elog(PANIC, "gist_redo: unknown op code %u", info);
@@ -431,16 +488,16 @@ out_target(char *buf, RelFileNode node, ItemPointerData key)
static void
out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) {
out_target(buf, xlrec->node, xlrec->key);
- sprintf(buf + strlen(buf), "; block number %u; update offset %u;",
- xlrec->blkno, xlrec->todeleteoffnum);
+ sprintf(buf + strlen(buf), "; block number %u",
+ xlrec->blkno);
}
static void
out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) {
strcat(buf, "page_split: ");
out_target(buf, xlrec->node, xlrec->key);
- sprintf(buf + strlen(buf), "; block number %u; update offset %u; add %d tuples; split to %d pages",
- xlrec->origblkno, xlrec->todeleteoffnum,
+ sprintf(buf + strlen(buf), "; block number %u; add %d tuples; split to %d pages",
+ xlrec->origblkno,
xlrec->nitup, xlrec->npage);
}
@@ -472,135 +529,172 @@ gist_desc(char *buf, uint8 xl_info, char *rec)
((RelFileNode*)rec)->relNode);
break;
case XLOG_GIST_INSERT_COMPLETE:
- strcat(buf, "insert_complete: ");
- out_target(buf, ((gistxlogInsertComplete*)rec)->node, ((gistxlogInsertComplete*)rec)->key);
+ sprintf(buf + strlen(buf), "complete_insert: rel %u/%u/%u",
+ ((gistxlogInsertComplete*)rec)->node.spcNode,
+ ((gistxlogInsertComplete*)rec)->node.dbNode,
+ ((gistxlogInsertComplete*)rec)->node.relNode);
default:
elog(PANIC, "gist_desc: unknown op code %u", info);
}
}
+IndexTuple
+gist_form_invalid_tuple(BlockNumber blkno) {
+ /* we don't alloc space for null's bitmap, this is invalid tuple,
+ be carefull in read and write code */
+ Size size = IndexInfoFindDataOffset(0);
+ IndexTuple tuple=(IndexTuple)palloc0( size );
+
+ tuple->t_info |= size;
+
+ ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
+ GistTupleSetInvalid( tuple );
+
+ return tuple;
+}
-#ifdef GIST_INCOMPLETE_INSERT
static void
gistContinueInsert(gistIncompleteInsert *insert) {
- GISTSTATE giststate;
- GISTInsertState state;
- int i;
+ IndexTuple *itup;
+ int i, lenitup;
MemoryContext oldCxt;
+ Relation index;
+
oldCxt = MemoryContextSwitchTo(opCtx);
- state.r = XLogOpenRelation(insert->node);
- if (!RelationIsValid(state.r))
+ index = XLogOpenRelation(insert->node);
+ if (!RelationIsValid(index))
return;
- initGISTstate(&giststate, state.r);
+ elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index",
+ insert->node.spcNode, insert->node.dbNode, insert->node.relNode);
- state.needInsertComplete=false;
- ItemPointerSetInvalid( &(state.key) );
- state.path=NULL;
- state.pathlen=0;
- state.xlog_mode = true;
+ /* needed vector itup never will be more than initial lenblkno+2,
+ because during this processing Indextuple can be only smaller */
+ lenitup = insert->lenblk;
+ itup = (IndexTuple*)palloc(sizeof(IndexTuple)*(lenitup+2 /*guarantee root split*/));
- /* form union tuples */
- state.itup = (IndexTuple*)palloc(sizeof(IndexTuple)*insert->lenblk);
- state.ituplen = insert->lenblk;
- for(i=0;i<insert->lenblk;i++) {
- int len=0;
- IndexTuple *itup;
- Buffer buffer;
- Page page;
+ for(i=0;i<insert->lenblk;i++)
+ itup[i] = gist_form_invalid_tuple( insert->blkno[i] );
- buffer = XLogReadBuffer(false, state.r, insert->blkno[i]);
- if (!BufferIsValid(buffer))
- elog(PANIC, "gistContinueInsert: block unfound");
- page = (Page) BufferGetPage(buffer);
- if ( PageIsNew((PageHeader)page) )
- elog(PANIC, "gistContinueInsert: uninitialized page");
+ if ( insert->pathlen==0 ) {
+ /*it was split root, so we should only make new root*/
+ Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
+ Page page;
- itup = gistextractbuffer(buffer, &len);
- state.itup[i] = gistunion(state.r, itup, len, &giststate);
+ if (!BufferIsValid(buffer))
+ elog(PANIC, "gistContinueInsert: root block unfound");
- ItemPointerSet( &(state.itup[i]->t_tid), insert->blkno[i], FirstOffsetNumber );
-
+ GISTInitBuffer(buffer, 0);
+ page = BufferGetPage(buffer);
+ gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- }
-
- if ( insert->pathlen==0 ) {
- /*it was split root, so we should only make new root*/
- gistnewroot(state.r, state.itup, state.ituplen, &(state.key), true);
- MemoryContextSwitchTo(oldCxt);
- MemoryContextReset(opCtx);
- return;
- }
+ WriteBuffer(buffer);
+ } else {
+ Buffer *buffers;
+ Page *pages;
+ int numbuffer;
+
+ buffers= (Buffer*) palloc( sizeof(Buffer) * (insert->lenblk+2/*guarantee root split*/) );
+ pages = (Page*) palloc( sizeof(Page ) * (insert->lenblk+2/*guarantee root split*/) );
- /* form stack */
- state.stack=NULL;
- for(i=0;i<insert->pathlen;i++) {
- int j,len=0;
- IndexTuple *itup;
- GISTInsertStack *top = (GISTInsertStack*)palloc( sizeof(GISTInsertStack) );
-
- top->blkno = insert->path[i];
- top->buffer = XLogReadBuffer(false, state.r, top->blkno);
- if (!BufferIsValid(top->buffer))
- elog(PANIC, "gistContinueInsert: block unfound");
- top->page = (Page) BufferGetPage(top->buffer);
- if ( PageIsNew((PageHeader)(top->page)) )
- elog(PANIC, "gistContinueInsert: uninitialized page");
-
- top->todelete = false;
-
- /* find childoffnum */
- itup = gistextractbuffer(top->buffer, &len);
- top->childoffnum=InvalidOffsetNumber;
- for(j=0;j<len && top->childoffnum==InvalidOffsetNumber;j++) {
- BlockNumber blkno = ItemPointerGetBlockNumber( &(itup[j]->t_tid) );
+ for(i=0;i<insert->pathlen;i++) {
+ int j, k, pituplen=0, childfound=0;
+
+ numbuffer=1;
+ buffers[numbuffer-1] = XLogReadBuffer(false, index, insert->path[i]);
+ if (!BufferIsValid(buffers[numbuffer-1]))
+ elog(PANIC, "gistContinueInsert: block %u unfound", insert->path[i]);
+ pages[numbuffer-1] = BufferGetPage( buffers[numbuffer-1] );
+ if ( PageIsNew((PageHeader)(pages[numbuffer-1])) )
+ elog(PANIC, "gistContinueInsert: uninitialized page");
+
+ pituplen = PageGetMaxOffsetNumber(pages[numbuffer-1]);
- if ( i==0 ) {
- int k;
- for(k=0;k<insert->lenblk;k++)
- if ( insert->blkno[k] == blkno ) {
- top->childoffnum = j+1;
+ /* remove old IndexTuples */
+ for(j=0;j<pituplen && childfound<lenitup;j++) {
+ BlockNumber blkno;
+ ItemId iid = PageGetItemId(pages[numbuffer-1], j+FirstOffsetNumber);
+ IndexTuple idxtup = (IndexTuple) PageGetItem(pages[numbuffer-1], iid);
+
+ blkno = ItemPointerGetBlockNumber( &(idxtup->t_tid) );
+
+ for(k=0;k<lenitup;k++)
+ if ( ItemPointerGetBlockNumber( &(itup[k]->t_tid) ) == blkno ) {
+ PageIndexTupleDelete(pages[numbuffer-1], j+FirstOffsetNumber);
+ j--; pituplen--;
+ childfound++;
break;
}
- } else if ( insert->path[i-1]==blkno )
- top->childoffnum = j+1;
- }
+ }
- if ( top->childoffnum==InvalidOffsetNumber ) {
- elog(WARNING, "gistContinueInsert: unknown parent, REINDEX GiST Indexes");
- return;
+ if ( gistnospace(pages[numbuffer-1], itup, lenitup) ) {
+ /* no space left on page, so we should split */
+ buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
+ if (!BufferIsValid(buffers[numbuffer]))
+ elog(PANIC, "gistContinueInsert: can't create new block");
+ GISTInitBuffer(buffers[numbuffer], 0);
+ pages[numbuffer] = BufferGetPage( buffers[numbuffer] );
+ gistfillbuffer( index, pages[numbuffer], itup, lenitup, FirstOffsetNumber );
+ numbuffer++;
+
+ if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) {
+ IndexTuple *parentitup;
+
+ parentitup = gistextractbuffer(buffers[numbuffer-1], &pituplen);
+
+ /* we split root, just copy tuples from old root to new page */
+ if ( i+1 != insert->pathlen )
+ elog(PANIC,"gistContinueInsert: can't restore index '%s'",
+ RelationGetRelationName( index ));
+
+ /* fill new page */
+ buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
+ if (!BufferIsValid(buffers[numbuffer]))
+ elog(PANIC, "gistContinueInsert: can't create new block");
+ GISTInitBuffer(buffers[numbuffer], 0);
+ pages[numbuffer] = BufferGetPage( buffers[numbuffer] );
+ gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
+ numbuffer++;
+
+ /* fill root page */
+ GISTInitBuffer(buffers[0], 0);
+ for(j=1;j<numbuffer;j++) {
+ IndexTuple tuple = gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
+ if ( InvalidOffsetNumber == PageAddItem(pages[0],
+ (Item)tuple,
+ IndexTupleSize( tuple ),
+ (OffsetNumber)j,
+ LP_USED) )
+ elog( PANIC,"gistContinueInsert: can't restore index '%s'",
+ RelationGetRelationName( index ));
+ }
+ }
+ } else
+ gistfillbuffer( index, pages[numbuffer-1], itup, lenitup,
+ (PageIsEmpty(pages[numbuffer-1])) ?
+ FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(pages[numbuffer-1])) );
+
+ lenitup=numbuffer;
+ for(j=0;j<numbuffer;j++) {
+ itup[j]=gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
+ PageSetLSN(pages[j], insert->lsn);
+ PageSetTLI(pages[j], ThisTimeLineID);
+ LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK);
+ WriteBuffer( buffers[j] );
+ }
}
-
- if ( i==0 )
- PageIndexTupleDelete(top->page, top->childoffnum);
-
- /* install item on right place in stack */
- top->parent=NULL;
- if ( state.stack ) {
- GISTInsertStack *ptr = state.stack;
- while( ptr->parent )
- ptr = ptr->parent;
- ptr->parent=top;
- } else
- state.stack = top;
}
- /* Good. Now we can continue insert */
-
- gistmakedeal(&state, &giststate);
-
MemoryContextSwitchTo(oldCxt);
MemoryContextReset(opCtx);
}
-#endif
void
gist_xlog_startup(void) {
incomplete_inserts=NIL;
insertCtx = AllocSetContextCreate(CurrentMemoryContext,
- "GiST insert in xlog temporary context",
+ "GiST recovery temporary context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
@@ -613,16 +707,194 @@ gist_xlog_cleanup(void) {
foreach(l, incomplete_inserts) {
gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
- char buf[1024];
-
- *buf='\0';
- out_target(buf, insert->node, insert->key);
- elog(LOG,"Incomplete insert: %s; It's needed to reindex", buf);
-#ifdef GIST_INCOMPLETE_INSERT
gistContinueInsert(insert);
-#endif
}
MemoryContextDelete(opCtx);
MemoryContextDelete(insertCtx);
}
+
+XLogRecData *
+formSplitRdata(RelFileNode node, BlockNumber blkno,
+ OffsetNumber *todelete, int ntodelete,
+ IndexTuple *itup, int ituplen, ItemPointer key,
+ BlockNumber *path, int pathlen, SplitedPageLayout *dist ) {
+
+ XLogRecData *rdata;
+ gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit));
+ SplitedPageLayout *ptr;
+ int npage = 0, cur=1, i;
+
+ ptr=dist;
+ while( ptr ) {
+ npage++;
+ ptr=ptr->next;
+ }
+
+ rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + ituplen + 3));
+
+ xlrec->node = node;
+ xlrec->origblkno = blkno;
+ xlrec->npage = (uint16)npage;
+ xlrec->nitup = (uint16)ituplen;
+ xlrec->ntodelete = (uint16)ntodelete;
+ xlrec->pathlen = (uint16)pathlen;
+ if ( key )
+ xlrec->key = *key;
+ else
+ ItemPointerSetInvalid( &(xlrec->key) );
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) xlrec;
+ rdata[0].len = sizeof( gistxlogPageSplit );
+ rdata[0].next = NULL;
+
+ if ( pathlen ) {
+ rdata[cur-1].next = &(rdata[cur]);
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)path;
+ rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen);
+ rdata[cur].next = NULL;
+ cur++;
+ }
+
+ if ( ntodelete ) {
+ rdata[cur-1].next = &(rdata[cur]);
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)todelete;
+ rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete);
+ rdata[cur].next = NULL;
+ cur++;
+ }
+
+ /* new tuples */
+ for(i=0;i<ituplen;i++) {
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)(itup[i]);
+ rdata[cur].len = IndexTupleSize(itup[i]);
+ rdata[cur].next = NULL;
+ rdata[cur-1].next = &(rdata[cur]);
+ cur++;
+ }
+
+ ptr=dist;
+ while(ptr) {
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)&(ptr->block);
+ rdata[cur].len = sizeof(gistxlogPage);
+ rdata[cur-1].next = &(rdata[cur]);
+ cur++;
+
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)(ptr->list);
+ rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num);
+ if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num )
+ rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len );
+ rdata[cur-1].next = &(rdata[cur]);
+ rdata[cur].next=NULL;
+ cur++;
+ ptr=ptr->next;
+ }
+
+ return rdata;
+}
+
+
+XLogRecData *
+formUpdateRdata(RelFileNode node, BlockNumber blkno,
+ OffsetNumber *todelete, int ntodelete, bool emptypage,
+ IndexTuple *itup, int ituplen, ItemPointer key,
+ BlockNumber *path, int pathlen) {
+ XLogRecData *rdata;
+ gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate*)palloc(sizeof(gistxlogEntryUpdate));
+
+ xlrec->node = node;
+ xlrec->blkno = blkno;
+ if ( key )
+ xlrec->key = *key;
+ else
+ ItemPointerSetInvalid( &(xlrec->key) );
+
+ if ( emptypage ) {
+ xlrec->isemptypage = true;
+ xlrec->ntodelete = 0;
+ xlrec->pathlen = 0;
+
+ rdata = (XLogRecData*)palloc( sizeof(XLogRecData) );
+ rdata->buffer = InvalidBuffer;
+ rdata->data = (char*)xlrec;
+ rdata->len = sizeof(gistxlogEntryUpdate);
+ rdata->next = NULL;
+ } else {
+ int cur=1,i;
+
+ xlrec->isemptypage = false;
+ xlrec->ntodelete = ntodelete;
+ xlrec->pathlen = pathlen;
+
+ rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 3 + ituplen ) );
+
+ rdata->buffer = InvalidBuffer;
+ rdata->data = (char*)xlrec;
+ rdata->len = sizeof(gistxlogEntryUpdate);
+ rdata->next = NULL;
+
+ if ( pathlen ) {
+ rdata[cur-1].next = &(rdata[cur]);
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)path;
+ rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen);
+ rdata[cur].next = NULL;
+ cur++;
+ }
+
+ if ( ntodelete ) {
+ rdata[cur-1].next = &(rdata[cur]);
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)todelete;
+ rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete);
+ rdata[cur].next = NULL;
+ cur++;
+ }
+
+ /* new tuples */
+ for(i=0;i<ituplen;i++) {
+ rdata[cur].buffer = InvalidBuffer;
+ rdata[cur].data = (char*)(itup[i]);
+ rdata[cur].len = IndexTupleSize(itup[i]);
+ rdata[cur].next = NULL;
+ rdata[cur-1].next = &(rdata[cur]);
+ cur++;
+ }
+ }
+
+ return rdata;
+}
+
+XLogRecPtr
+gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) {
+ gistxlogInsertComplete xlrec;
+ XLogRecData rdata[2];
+ XLogRecPtr recptr;
+
+ Assert(len>0);
+ xlrec.node = node;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = sizeof( gistxlogInsertComplete );
+ rdata[0].next = &(rdata[1]);
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *) keys;
+ rdata[1].len = sizeof( ItemPointerData ) * len;
+ rdata[1].next = NULL;
+
+ START_CRIT_SECTION();
+
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
+
+ END_CRIT_SECTION();
+
+ return recptr;
+}