summaryrefslogtreecommitdiff
path: root/src/backend/access/gist/gistxlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/gist/gistxlog.c')
-rw-r--r--src/backend/access/gist/gistxlog.c797
1 files changed, 181 insertions, 616 deletions
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index a90303e547..7d25728d8d 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -20,15 +20,6 @@
#include "utils/memutils.h"
#include "utils/rel.h"
-
-typedef struct
-{
- gistxlogPageUpdate *data;
- int len;
- IndexTuple *itup;
- OffsetNumber *todelete;
-} PageUpdateRecord;
-
typedef struct
{
gistxlogPage *header;
@@ -41,144 +32,37 @@ typedef struct
NewPage *page;
} PageSplitRecord;
-/* track for incomplete inserts, idea was taken from nbtxlog.c */
-
-typedef struct gistIncompleteInsert
-{
- RelFileNode node;
- BlockNumber origblkno; /* for splits */
- ItemPointerData key;
- int lenblk;
- BlockNumber *blkno;
- XLogRecPtr lsn;
- BlockNumber *path;
- int pathlen;
-} gistIncompleteInsert;
-
-
static MemoryContext opCtx; /* working memory for operations */
-static MemoryContext insertCtx; /* holds incomplete_inserts list */
-static List *incomplete_inserts;
-
-
-#define ItemPointerEQ(a, b) \
- ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
- ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) )
-
+/*
+ * Replay the clearing of F_FOLLOW_RIGHT flag.
+ */
static void
-pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
- BlockNumber *blkno, int lenblk,
- PageSplitRecord *xlinfo /* to extract blkno info */ )
+gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn,
+ BlockNumber leftblkno)
{
- MemoryContext oldCxt;
- gistIncompleteInsert *ninsert;
+ Buffer buffer;
- if (!ItemPointerIsValid(&key))
+ buffer = XLogReadBuffer(node, leftblkno, false);
+ if (BufferIsValid(buffer))
+ {
+ Page page = (Page) BufferGetPage(buffer);
/*
- * if key is null then we should not store insertion as incomplete,
- * because it's a vacuum operation..
+ * Note that we still update the page even if page LSN is equal to the
+ * LSN of this record, because the updated NSN is not included in the
+ * full page image.
*/
- return;
-
- oldCxt = MemoryContextSwitchTo(insertCtx);
- ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
-
- ninsert->node = node;
- ninsert->key = key;
- ninsert->lsn = lsn;
-
- if (lenblk && blkno)
- {
- ninsert->lenblk = lenblk;
- ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
- memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk);
- ninsert->origblkno = *blkno;
- }
- else
- {
- int i;
-
- Assert(xlinfo);
- ninsert->lenblk = xlinfo->data->npage;
- ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
- for (i = 0; i < ninsert->lenblk; i++)
- ninsert->blkno[i] = xlinfo->page[i].header->blkno;
- ninsert->origblkno = xlinfo->data->origblkno;
- }
- Assert(ninsert->lenblk > 0);
-
- /*
- * Stick the new incomplete insert onto the front of the list, not the
- * back. This is so that gist_xlog_cleanup will process incompletions in
- * last-in-first-out order.
- */
- incomplete_inserts = lcons(ninsert, incomplete_inserts);
-
- MemoryContextSwitchTo(oldCxt);
-}
-
-static void
-forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
-{
- ListCell *l;
-
- if (!ItemPointerIsValid(&key))
- return;
-
- if (incomplete_inserts == NIL)
- return;
-
- foreach(l, incomplete_inserts)
- {
- gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
-
- if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
+ if (!XLByteLT(lsn, PageGetLSN(page)))
{
- /* found */
- incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
- pfree(insert->blkno);
- pfree(insert);
- break;
- }
- }
-}
+ GistPageGetOpaque(page)->nsn = lsn;
+ GistClearFollowRight(page);
-static void
-decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
-{
- char *begin = XLogRecGetData(record),
- *ptr;
- int i = 0,
- addpath = 0;
-
- decoded->data = (gistxlogPageUpdate *) begin;
-
- if (decoded->data->ntodelete)
- {
- decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath);
- addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
- }
- else
- decoded->todelete = NULL;
-
- decoded->len = 0;
- ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
- while (ptr - begin < record->xl_len)
- {
- decoded->len++;
- ptr += IndexTupleSize((IndexTuple) ptr);
- }
-
- decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);
-
- ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
- while (ptr - begin < record->xl_len)
- {
- decoded->itup[i] = (IndexTuple) ptr;
- ptr += IndexTupleSize(decoded->itup[i]);
- i++;
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ MarkBufferDirty(buffer);
+ }
+ UnlockReleaseBuffer(buffer);
}
}
@@ -186,29 +70,22 @@ decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
* redo any page update (except page split)
*/
static void
-gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
+gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
{
- gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
- PageUpdateRecord xlrec;
+ char *begin = XLogRecGetData(record);
+ gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
Buffer buffer;
Page page;
+ char *data;
- /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
- forgetIncompleteInsert(xldata->node, xldata->key);
+ if (BlockNumberIsValid(xldata->leftchild))
+ gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
- if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
- /* operation with root always finalizes insertion */
- pushIncompleteInsert(xldata->node, lsn, xldata->key,
- &(xldata->blkno), 1,
- NULL);
-
- /* nothing else to do if page was backed up (and no info to do it with) */
+ /* nothing more to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
- decodePageUpdateRecord(&xlrec, record);
-
- buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false);
+ buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
@@ -219,28 +96,49 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
return;
}
- if (isnewroot)
- GISTInitBuffer(buffer, 0);
- else if (xlrec.data->ntodelete)
+ data = begin + sizeof(gistxlogPageUpdate);
+
+ /* Delete old tuples */
+ if (xldata->ntodelete > 0)
{
int i;
+ OffsetNumber *todelete = (OffsetNumber *) data;
+ data += sizeof(OffsetNumber) * xldata->ntodelete;
- for (i = 0; i < xlrec.data->ntodelete; i++)
- PageIndexTupleDelete(page, xlrec.todelete[i]);
+ for (i = 0; i < xldata->ntodelete; i++)
+ PageIndexTupleDelete(page, todelete[i]);
if (GistPageIsLeaf(page))
GistMarkTuplesDeleted(page);
}
/* add tuples */
- if (xlrec.len > 0)
- gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
-
- /*
- * special case: leafpage, nothing to insert, nothing to delete, then
- * vacuum marks page
- */
- if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
- GistClearTuplesDeleted(page);
+ if (data - begin < record->xl_len)
+ {
+ OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
+ OffsetNumberNext(PageGetMaxOffsetNumber(page));
+ while (data - begin < record->xl_len)
+ {
+ IndexTuple itup = (IndexTuple) data;
+ Size sz = IndexTupleSize(itup);
+ OffsetNumber l;
+ data += sz;
+
+ l = PageAddItem(page, (Item) itup, sz, off, false, false);
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to GiST index page, size %d bytes",
+ (int) sz);
+ off++;
+ }
+ }
+ else
+ {
+ /*
+ * special case: leafpage, nothing to insert, nothing to delete, then
+ * vacuum marks page
+ */
+ if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
+ GistClearTuplesDeleted(page);
+ }
if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
@@ -315,41 +213,67 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
static void
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
{
+ gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
PageSplitRecord xlrec;
Buffer buffer;
Page page;
int i;
- int flags;
+ bool isrootsplit = false;
+ if (BlockNumberIsValid(xldata->leftchild))
+ gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
decodePageSplitRecord(&xlrec, record);
- flags = xlrec.data->origleaf ? F_LEAF : 0;
/* loop around all pages */
for (i = 0; i < xlrec.data->npage; i++)
{
NewPage *newpage = xlrec.page + i;
+ int flags;
+
+ if (newpage->header->blkno == GIST_ROOT_BLKNO)
+ {
+ Assert(i == 0);
+ isrootsplit = true;
+ }
buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
/* ok, clear buffer */
+ if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
+ flags = F_LEAF;
+ else
+ flags = 0;
GISTInitBuffer(buffer, flags);
/* and fill it */
gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
+ if (newpage->header->blkno == GIST_ROOT_BLKNO)
+ {
+ GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
+ GistPageGetOpaque(page)->nsn = xldata->orignsn;
+ GistClearFollowRight(page);
+ }
+ else
+ {
+ if (i < xlrec.data->npage - 1)
+ GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
+ else
+ GistPageGetOpaque(page)->rightlink = xldata->origrlink;
+ GistPageGetOpaque(page)->nsn = xldata->orignsn;
+ if (i < xlrec.data->npage - 1 && !isrootsplit)
+ GistMarkFollowRight(page);
+ else
+ GistClearFollowRight(page);
+ }
+
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
-
- forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
-
- pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
- NULL, 0,
- &xlrec);
}
static void
@@ -372,24 +296,6 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer);
}
-static void
-gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record)
-{
- char *begin = XLogRecGetData(record),
- *ptr;
- gistxlogInsertComplete *xlrec;
-
- xlrec = (gistxlogInsertComplete *) begin;
-
- ptr = begin + sizeof(gistxlogInsertComplete);
- while (ptr - begin < record->xl_len)
- {
- Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData));
- forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr));
- ptr += sizeof(ItemPointerData);
- }
-}
-
void
gist_redo(XLogRecPtr lsn, XLogRecord *record)
{
@@ -401,30 +307,23 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
* implement a similar optimization we have in b-tree, and remove killed
* tuples outside VACUUM, we'll need to handle that here.
*/
-
RestoreBkpBlocks(lsn, record, false);
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_GIST_PAGE_UPDATE:
- gistRedoPageUpdateRecord(lsn, record, false);
+ gistRedoPageUpdateRecord(lsn, record);
break;
case XLOG_GIST_PAGE_DELETE:
gistRedoPageDeleteRecord(lsn, record);
break;
- case XLOG_GIST_NEW_ROOT:
- gistRedoPageUpdateRecord(lsn, record, true);
- break;
case XLOG_GIST_PAGE_SPLIT:
gistRedoPageSplitRecord(lsn, record);
break;
case XLOG_GIST_CREATE_INDEX:
gistRedoCreateIndex(lsn, record);
break;
- case XLOG_GIST_INSERT_COMPLETE:
- gistRedoCompleteInsert(lsn, record);
- break;
default:
elog(PANIC, "gist_redo: unknown op code %u", info);
}
@@ -434,20 +333,16 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
}
static void
-out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
+out_target(StringInfo buf, RelFileNode node)
{
appendStringInfo(buf, "rel %u/%u/%u",
node.spcNode, node.dbNode, node.relNode);
- if (ItemPointerIsValid(&key))
- appendStringInfo(buf, "; tid %u/%u",
- ItemPointerGetBlockNumber(&key),
- ItemPointerGetOffsetNumber(&key));
}
static void
out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
{
- out_target(buf, xlrec->node, xlrec->key);
+ out_target(buf, xlrec->node);
appendStringInfo(buf, "; block number %u", xlrec->blkno);
}
@@ -463,7 +358,7 @@ static void
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
{
appendStringInfo(buf, "page_split: ");
- out_target(buf, xlrec->node, xlrec->key);
+ out_target(buf, xlrec->node);
appendStringInfo(buf, "; block number %u splits to %d pages",
xlrec->origblkno, xlrec->npage);
}
@@ -482,10 +377,6 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
case XLOG_GIST_PAGE_DELETE:
out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
break;
- case XLOG_GIST_NEW_ROOT:
- appendStringInfo(buf, "new_root: ");
- out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
- break;
case XLOG_GIST_PAGE_SPLIT:
out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
break;
@@ -495,415 +386,102 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
((RelFileNode *) rec)->dbNode,
((RelFileNode *) rec)->relNode);
break;
- case XLOG_GIST_INSERT_COMPLETE:
- appendStringInfo(buf, "complete_insert: rel %u/%u/%u",
- ((gistxlogInsertComplete *) rec)->node.spcNode,
- ((gistxlogInsertComplete *) rec)->node.dbNode,
- ((gistxlogInsertComplete *) rec)->node.relNode);
- break;
default:
appendStringInfo(buf, "unknown gist op code %u", info);
break;
}
}
-IndexTuple
-gist_form_invalid_tuple(BlockNumber blkno)
-{
- /*
- * we don't alloc space for null's bitmap, this is invalid tuple, be
- * carefull in read and write code
- */
- Size size = IndexInfoFindDataOffset(0);
- IndexTuple tuple = (IndexTuple) palloc0(size);
-
- tuple->t_info |= size;
-
- ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
- GistTupleSetInvalid(tuple);
-
- return tuple;
-}
-
-
-static void
-gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
-{
- GISTInsertStack *top;
-
- insert->pathlen = 0;
- insert->path = NULL;
-
- if ((top = gistFindPath(index, insert->origblkno)) != NULL)
- {
- int i;
- GISTInsertStack *ptr;
-
- for (ptr = top; ptr; ptr = ptr->parent)
- insert->pathlen++;
-
- insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);
-
- i = 0;
- for (ptr = top; ptr; ptr = ptr->parent)
- insert->path[i++] = ptr->blkno;
- }
- else
- elog(ERROR, "lost parent for block %u", insert->origblkno);
-}
-
-static SplitedPageLayout *
-gistMakePageLayout(Buffer *buffers, int nbuffers)
-{
- SplitedPageLayout *res = NULL,
- *resptr;
-
- while (nbuffers-- > 0)
- {
- Page page = BufferGetPage(buffers[nbuffers]);
- IndexTuple *vec;
- int veclen;
-
- resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout));
-
- resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]);
- resptr->block.num = PageGetMaxOffsetNumber(page);
-
- vec = gistextractpage(page, &veclen);
- resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist));
-
- resptr->next = res;
- res = resptr;
- }
-
- return res;
-}
-
-/*
- * Continue insert after crash. In normal situations, there aren't any
- * incomplete inserts, but if a crash occurs partway through an insertion
- * sequence, we'll need to finish making the index valid at the end of WAL
- * replay.
- *
- * Note that we assume the index is now in a valid state, except for the
- * unfinished insertion. In particular it's safe to invoke gistFindPath();
- * there shouldn't be any garbage pages for it to run into.
- *
- * To complete insert we can't use basic insertion algorithm because
- * during insertion we can't call user-defined support functions of opclass.
- * So, we insert 'invalid' tuples without real key and do it by separate algorithm.
- * 'invalid' tuple should be updated by vacuum full.
- */
-static void
-gistContinueInsert(gistIncompleteInsert *insert)
-{
- IndexTuple *itup;
- int i,
- lenitup;
- Relation index;
-
- index = CreateFakeRelcacheEntry(insert->node);
-
- /*
- * needed vector itup never will be more than initial lenblkno+2, because
- * during this processing Indextuple can be only smaller
- */
- lenitup = insert->lenblk;
- itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ ));
-
- for (i = 0; i < insert->lenblk; i++)
- itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
-
- /*
- * any insertion of itup[] should make LOG message about
- */
-
- if (insert->origblkno == GIST_ROOT_BLKNO)
- {
- /*
- * it was split root, so we should only make new root. it can't be
- * simple insert into root, we should replace all content of root.
- */
- Buffer buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true);
-
- gistnewroot(index, buffer, itup, lenitup, NULL);
- UnlockReleaseBuffer(buffer);
- }
- else
- {
- Buffer *buffers;
- Page *pages;
- int numbuffer;
- OffsetNumber *todelete;
-
- /* construct path */
- gistxlogFindPath(index, insert);
-
- Assert(insert->pathlen > 0);
-
- buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
- pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
- todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));
-
- for (i = 0; i < insert->pathlen; i++)
- {
- int j,
- k,
- pituplen = 0;
- uint8 xlinfo;
- XLogRecData *rdata;
- XLogRecPtr recptr;
- Buffer tempbuffer = InvalidBuffer;
- int ntodelete = 0;
-
- numbuffer = 1;
- buffers[0] = ReadBuffer(index, insert->path[i]);
- LockBuffer(buffers[0], GIST_EXCLUSIVE);
-
- /*
- * we check buffer, because we restored page earlier
- */
- gistcheckpage(index, buffers[0]);
-
- pages[0] = BufferGetPage(buffers[0]);
- Assert(!GistPageIsLeaf(pages[0]));
-
- pituplen = PageGetMaxOffsetNumber(pages[0]);
-
- /* find remove old IndexTuples to remove */
- for (j = 0; j < pituplen && ntodelete < lenitup; j++)
- {
- BlockNumber blkno;
- ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
- IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid);
-
- blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
-
- for (k = 0; k < lenitup; k++)
- if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
- {
- todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
- ntodelete++;
- break;
- }
- }
-
- if (ntodelete == 0)
- elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)");
-
- /*
- * we check space with subtraction only first tuple to delete,
- * hope, that wiil be enough space....
- */
-
- if (gistnospace(pages[0], itup, lenitup, *todelete, 0))
- {
-
- /* no space left on page, so we must split */
- buffers[numbuffer] = ReadBuffer(index, P_NEW);
- LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
- GISTInitBuffer(buffers[numbuffer], 0);
- pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
- gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber);
- numbuffer++;
-
- if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
- {
- Buffer tmp;
-
- /*
- * we split root, just copy content from root to new page
- */
-
- /* sanity check */
- if (i + 1 != insert->pathlen)
- elog(PANIC, "unexpected pathlen in index \"%s\"",
- RelationGetRelationName(index));
-
- /* fill new page, root will be changed later */
- tempbuffer = ReadBuffer(index, P_NEW);
- LockBuffer(tempbuffer, GIST_EXCLUSIVE);
- memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer));
-
- /* swap buffers[0] (was root) and temp buffer */
- tmp = buffers[0];
- buffers[0] = tempbuffer;
- tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO,
- * it is still unchanged */
-
- pages[0] = BufferGetPage(buffers[0]);
- }
-
- START_CRIT_SECTION();
-
- for (j = 0; j < ntodelete; j++)
- PageIndexTupleDelete(pages[0], todelete[j]);
-
- xlinfo = XLOG_GIST_PAGE_SPLIT;
- rdata = formSplitRdata(index->rd_node, insert->path[i],
- false, &(insert->key),
- gistMakePageLayout(buffers, numbuffer));
-
- }
- else
- {
- START_CRIT_SECTION();
-
- for (j = 0; j < ntodelete; j++)
- PageIndexTupleDelete(pages[0], todelete[j]);
- gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber);
-
- xlinfo = XLOG_GIST_PAGE_UPDATE;
- rdata = formUpdateRdata(index->rd_node, buffers[0],
- todelete, ntodelete,
- itup, lenitup, &(insert->key));
- }
-
- /*
- * use insert->key as mark for completion of insert (form*Rdata()
- * above) for following possible replays
- */
-
- /* write pages, we should mark it dirty befor XLogInsert() */
- for (j = 0; j < numbuffer; j++)
- {
- GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
- MarkBufferDirty(buffers[j]);
- }
- recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata);
- for (j = 0; j < numbuffer; j++)
- {
- PageSetLSN(pages[j], recptr);
- PageSetTLI(pages[j], ThisTimeLineID);
- }
-
- END_CRIT_SECTION();
-
- lenitup = numbuffer;
- for (j = 0; j < numbuffer; j++)
- {
- itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
- UnlockReleaseBuffer(buffers[j]);
- }
-
- if (tempbuffer != InvalidBuffer)
- {
- /*
- * it was a root split, so fill it by new values
- */
- gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
- UnlockReleaseBuffer(tempbuffer);
- }
- }
- }
-
- FreeFakeRelcacheEntry(index);
-
- ereport(LOG,
- (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
- insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
- errdetail("Incomplete insertion detected during crash replay.")));
-}
-
void
gist_xlog_startup(void)
{
- incomplete_inserts = NIL;
- insertCtx = AllocSetContextCreate(CurrentMemoryContext,
- "GiST recovery temporary context",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
opCtx = createTempGistContext();
}
void
gist_xlog_cleanup(void)
{
- ListCell *l;
- MemoryContext oldCxt;
-
- oldCxt = MemoryContextSwitchTo(opCtx);
-
- foreach(l, incomplete_inserts)
- {
- gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
-
- gistContinueInsert(insert);
- MemoryContextReset(opCtx);
- }
- MemoryContextSwitchTo(oldCxt);
-
MemoryContextDelete(opCtx);
- MemoryContextDelete(insertCtx);
-}
-
-bool
-gist_safe_restartpoint(void)
-{
- if (incomplete_inserts)
- return false;
- return true;
}
-
-XLogRecData *
-formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
- ItemPointer key, SplitedPageLayout *dist)
+/*
+ * Write WAL record of a page split.
+ */
+XLogRecPtr
+gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
+ SplitedPageLayout *dist,
+ BlockNumber origrlink, GistNSN orignsn,
+ Buffer leftchildbuf)
{
XLogRecData *rdata;
- gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
+ gistxlogPageSplit xlrec;
SplitedPageLayout *ptr;
int npage = 0,
- cur = 1;
+ cur;
+ XLogRecPtr recptr;
- ptr = dist;
- while (ptr)
- {
+ for (ptr = dist; ptr; ptr = ptr->next)
npage++;
- ptr = ptr->next;
- }
rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
- xlrec->node = node;
- xlrec->origblkno = blkno;
- xlrec->origleaf = page_is_leaf;
- xlrec->npage = (uint16) npage;
- if (key)
- xlrec->key = *key;
- else
- ItemPointerSetInvalid(&(xlrec->key));
+ xlrec.node = node;
+ xlrec.origblkno = blkno;
+ xlrec.origrlink = origrlink;
+ xlrec.orignsn = orignsn;
+ xlrec.origleaf = page_is_leaf;
+ xlrec.npage = (uint16) npage;
+ xlrec.leftchild =
+ BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) xlrec;
+ rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(gistxlogPageSplit);
- rdata[0].next = NULL;
+ rdata[0].buffer = InvalidBuffer;
- ptr = dist;
- while (ptr)
+ cur = 1;
+
+ /*
+ * Include a full page image of the child buf. (only necessary if a
+ * checkpoint happened since the child page was split)
+ */
+ if (BufferIsValid(leftchildbuf))
{
+ rdata[cur - 1].next = &(rdata[cur]);
+ rdata[cur].data = NULL;
+ rdata[cur].len = 0;
+ rdata[cur].buffer = leftchildbuf;
+ rdata[cur].buffer_std = true;
+ cur++;
+ }
+
+ for (ptr = dist; ptr; ptr = ptr->next)
+ {
+ rdata[cur - 1].next = &(rdata[cur]);
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char *) &(ptr->block);
rdata[cur].len = sizeof(gistxlogPage);
- rdata[cur - 1].next = &(rdata[cur]);
cur++;
+ rdata[cur - 1].next = &(rdata[cur]);
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char *) (ptr->list);
rdata[cur].len = ptr->lenlist;
- rdata[cur - 1].next = &(rdata[cur]);
- rdata[cur].next = NULL;
cur++;
- ptr = ptr->next;
}
+ rdata[cur - 1].next = NULL;
+
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
- return rdata;
+ pfree(rdata);
+ return recptr;
}
/*
- * Construct the rdata array for an XLOG record describing a page update
- * (deletion and/or insertion of tuples on a single index page).
+ * Write XLOG record describing a page update. The update can include any
+ * number of deletions and/or insertions of tuples on a single index page.
+ *
+ * If this update inserts a downlink for a split page, also record that
+ * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
*
* Note that both the todelete array and the tuples are marked as belonging
* to the target buffer; they need not be stored in XLOG if XLogInsert decides
@@ -911,27 +489,26 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
* at least one rdata item referencing the buffer, even when ntodelete and
* ituplen are both zero; this ensures that XLogInsert knows about the buffer.
*/
-XLogRecData *
-formUpdateRdata(RelFileNode node, Buffer buffer,
- OffsetNumber *todelete, int ntodelete,
- IndexTuple *itup, int ituplen, ItemPointer key)
+XLogRecPtr
+gistXLogUpdate(RelFileNode node, Buffer buffer,
+ OffsetNumber *todelete, int ntodelete,
+ IndexTuple *itup, int ituplen,
+ Buffer leftchildbuf)
{
XLogRecData *rdata;
gistxlogPageUpdate *xlrec;
int cur,
i;
+ XLogRecPtr recptr;
- rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
+ rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen));
xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
xlrec->node = node;
xlrec->blkno = BufferGetBlockNumber(buffer);
xlrec->ntodelete = ntodelete;
-
- if (key)
- xlrec->key = *key;
- else
- ItemPointerSetInvalid(&(xlrec->key));
+ xlrec->leftchild =
+ BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
rdata[0].buffer = buffer;
rdata[0].buffer_std = true;
@@ -945,13 +522,13 @@ formUpdateRdata(RelFileNode node, Buffer buffer,
rdata[1].next = &(rdata[2]);
rdata[2].data = (char *) todelete;
- rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
+ rdata[2].len = sizeof(OffsetNumber) * ntodelete;
rdata[2].buffer = buffer;
rdata[2].buffer_std = true;
- rdata[2].next = NULL;
- /* new tuples */
cur = 3;
+
+ /* new tuples */
for (i = 0; i < ituplen; i++)
{
rdata[cur - 1].next = &(rdata[cur]);
@@ -959,38 +536,26 @@ formUpdateRdata(RelFileNode node, Buffer buffer,
rdata[cur].len = IndexTupleSize(itup[i]);
rdata[cur].buffer = buffer;
rdata[cur].buffer_std = true;
- rdata[cur].next = NULL;
cur++;
}
- return rdata;
-}
-
-XLogRecPtr
-gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len)
-{
- gistxlogInsertComplete xlrec;
- XLogRecData rdata[2];
- XLogRecPtr recptr;
-
- Assert(len > 0);
- xlrec.node = node;
-
- rdata[0].buffer = InvalidBuffer;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = sizeof(gistxlogInsertComplete);
- rdata[0].next = &(rdata[1]);
-
- rdata[1].buffer = InvalidBuffer;
- rdata[1].data = (char *) keys;
- rdata[1].len = sizeof(ItemPointerData) * len;
- rdata[1].next = NULL;
-
- START_CRIT_SECTION();
-
- recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
+ /*
+ * Include a full page image of the child buf. (only necessary if
+ * a checkpoint happened since the child page was split)
+ */
+ if (BufferIsValid(leftchildbuf))
+ {
+ rdata[cur - 1].next = &(rdata[cur]);
+ rdata[cur].data = NULL;
+ rdata[cur].len = 0;
+ rdata[cur].buffer = leftchildbuf;
+ rdata[cur].buffer_std = true;
+ cur++;
+ }
+ rdata[cur - 1].next = NULL;
- END_CRIT_SECTION();
+ recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
+ pfree(rdata);
return recptr;
}