diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2010-12-23 16:03:08 +0200 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2010-12-23 16:21:47 +0200 |
commit | 9de3aa65f01fb51cbc725e8508ea233e4e92c46c (patch) | |
tree | d78b3dcde6cd6955af5ddb913f9a32bffd1a25a9 /src/backend/access/gist/gistxlog.c | |
parent | 7a1ca8977fd109a86b540444f71f24bd2f38ea43 (diff) | |
download | postgresql-9de3aa65f01fb51cbc725e8508ea233e4e92c46c.tar.gz |
Rewrite the GiST insertion logic so that we don't need the post-recovery
cleanup stage to finish incomplete inserts or splits anymore. There was two
reasons for the cleanup step:
1. When a new tuple was inserted to a leaf page, the downlink in the parent
needed to be updated to contain (ie. to be consistent with) the new key.
Updating the parent in turn might require recursively updating the parent of
the parent. We now handle that by updating the parent while traversing down
the tree, so that when we insert the leaf tuple, all the parents are already
consistent with the new key, and the tree is consistent at every step.
2. When a page is split, we need to insert the downlink for the new right
page(s), and update the downlink for the original page to not include keys
that moved to the right page(s). We now handle that by setting a new flag,
F_FOLLOW_RIGHT, on the non-rightmost pages in the split. When that flag is
set, scans always follow the rightlink, regardless of the NSN mechanism used
to detect concurrent page splits. That way the tree is consistent right after
split, even though the downlink is still missing. This is very similar to the
way B-tree splits are handled. When the downlink is inserted in the parent,
the flag is cleared. To keep the insertion algorithm simple, when an
insertion sees an incomplete split, indicated by the F_FOLLOW_RIGHT flag, it
finishes the split before doing anything else.
These changes allow removing the whole "invalid tuple" mechanism, but I
retained the scan code to still follow invalid tuples correctly. While we
don't create any such tuples anymore, we want to handle them gracefully in
case you pg_upgrade a GiST index that has them. If we encounter any on an
insert, though, we just throw an error saying that you need to REINDEX.
The issue that got me into doing this is that if you did a checkpoint while
an insert or split was in progress, and the checkpoint finishes quickly so
that there is no WAL record related to the insert between RedoRecPtr and the
checkpoint record, recovery from that checkpoint would not know to finish
the incomplete insert. IOW, we have the same issue we solved with the
rm_safe_restartpoint mechanism during normal operation too. It's highly
unlikely to happen in practice, and this fix is far too large to backpatch,
so we're just going to live with in previous versions, but this refactoring
fixes it going forward.
With this patch, you don't get the annoying
'index "FOO" needs VACUUM or REINDEX to finish crash recovery' notices
anymore if you crash at an unfortunate moment.
Diffstat (limited to 'src/backend/access/gist/gistxlog.c')
-rw-r--r-- | src/backend/access/gist/gistxlog.c | 797 |
1 files changed, 181 insertions, 616 deletions
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index a90303e547..7d25728d8d 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -20,15 +20,6 @@ #include "utils/memutils.h" #include "utils/rel.h" - -typedef struct -{ - gistxlogPageUpdate *data; - int len; - IndexTuple *itup; - OffsetNumber *todelete; -} PageUpdateRecord; - typedef struct { gistxlogPage *header; @@ -41,144 +32,37 @@ typedef struct NewPage *page; } PageSplitRecord; -/* track for incomplete inserts, idea was taken from nbtxlog.c */ - -typedef struct gistIncompleteInsert -{ - RelFileNode node; - BlockNumber origblkno; /* for splits */ - ItemPointerData key; - int lenblk; - BlockNumber *blkno; - XLogRecPtr lsn; - BlockNumber *path; - int pathlen; -} gistIncompleteInsert; - - static MemoryContext opCtx; /* working memory for operations */ -static MemoryContext insertCtx; /* holds incomplete_inserts list */ -static List *incomplete_inserts; - - -#define ItemPointerEQ(a, b) \ - ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \ - ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) ) - +/* + * Replay the clearing of F_FOLLOW_RIGHT flag. + */ static void -pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, - BlockNumber *blkno, int lenblk, - PageSplitRecord *xlinfo /* to extract blkno info */ ) +gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn, + BlockNumber leftblkno) { - MemoryContext oldCxt; - gistIncompleteInsert *ninsert; + Buffer buffer; - if (!ItemPointerIsValid(&key)) + buffer = XLogReadBuffer(node, leftblkno, false); + if (BufferIsValid(buffer)) + { + Page page = (Page) BufferGetPage(buffer); /* - * if key is null then we should not store insertion as incomplete, - * because it's a vacuum operation.. + * Note that we still update the page even if page LSN is equal to the + * LSN of this record, because the updated NSN is not included in the + * full page image. */ - return; - - oldCxt = MemoryContextSwitchTo(insertCtx); - ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert)); - - ninsert->node = node; - ninsert->key = key; - ninsert->lsn = lsn; - - if (lenblk && blkno) - { - ninsert->lenblk = lenblk; - ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk); - memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk); - ninsert->origblkno = *blkno; - } - else - { - int i; - - Assert(xlinfo); - ninsert->lenblk = xlinfo->data->npage; - ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk); - for (i = 0; i < ninsert->lenblk; i++) - ninsert->blkno[i] = xlinfo->page[i].header->blkno; - ninsert->origblkno = xlinfo->data->origblkno; - } - Assert(ninsert->lenblk > 0); - - /* - * Stick the new incomplete insert onto the front of the list, not the - * back. This is so that gist_xlog_cleanup will process incompletions in - * last-in-first-out order. - */ - incomplete_inserts = lcons(ninsert, incomplete_inserts); - - MemoryContextSwitchTo(oldCxt); -} - -static void -forgetIncompleteInsert(RelFileNode node, ItemPointerData key) -{ - ListCell *l; - - if (!ItemPointerIsValid(&key)) - return; - - if (incomplete_inserts == NIL) - return; - - foreach(l, incomplete_inserts) - { - gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); - - if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key))) + if (!XLByteLT(lsn, PageGetLSN(page))) { - /* found */ - incomplete_inserts = list_delete_ptr(incomplete_inserts, insert); - pfree(insert->blkno); - pfree(insert); - break; - } - } -} + GistPageGetOpaque(page)->nsn = lsn; + GistClearFollowRight(page); -static void -decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record) -{ - char *begin = XLogRecGetData(record), - *ptr; - int i = 0, - addpath = 0; - - decoded->data = (gistxlogPageUpdate *) begin; - - if (decoded->data->ntodelete) - { - decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath); - addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete); - } - else - decoded->todelete = NULL; - - decoded->len = 0; - ptr = begin + sizeof(gistxlogPageUpdate) + addpath; - while (ptr - begin < record->xl_len) - { - decoded->len++; - ptr += IndexTupleSize((IndexTuple) ptr); - } - - decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len); - - ptr = begin + sizeof(gistxlogPageUpdate) + addpath; - while (ptr - begin < record->xl_len) - { - decoded->itup[i] = (IndexTuple) ptr; - ptr += IndexTupleSize(decoded->itup[i]); - i++; + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); + } + UnlockReleaseBuffer(buffer); } } @@ -186,29 +70,22 @@ decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record) * redo any page update (except page split) */ static void -gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) +gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) { - gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record); - PageUpdateRecord xlrec; + char *begin = XLogRecGetData(record); + gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin; Buffer buffer; Page page; + char *data; - /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */ - forgetIncompleteInsert(xldata->node, xldata->key); + if (BlockNumberIsValid(xldata->leftchild)) + gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); - if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO) - /* operation with root always finalizes insertion */ - pushIncompleteInsert(xldata->node, lsn, xldata->key, - &(xldata->blkno), 1, - NULL); - - /* nothing else to do if page was backed up (and no info to do it with) */ + /* nothing more to do if page was backed up (and no info to do it with) */ if (record->xl_info & XLR_BKP_BLOCK_1) return; - decodePageUpdateRecord(&xlrec, record); - - buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false); + buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); @@ -219,28 +96,49 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) return; } - if (isnewroot) - GISTInitBuffer(buffer, 0); - else if (xlrec.data->ntodelete) + data = begin + sizeof(gistxlogPageUpdate); + + /* Delete old tuples */ + if (xldata->ntodelete > 0) { int i; + OffsetNumber *todelete = (OffsetNumber *) data; + data += sizeof(OffsetNumber) * xldata->ntodelete; - for (i = 0; i < xlrec.data->ntodelete; i++) - PageIndexTupleDelete(page, xlrec.todelete[i]); + for (i = 0; i < xldata->ntodelete; i++) + PageIndexTupleDelete(page, todelete[i]); if (GistPageIsLeaf(page)) GistMarkTuplesDeleted(page); } /* add tuples */ - if (xlrec.len > 0) - gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber); - - /* - * special case: leafpage, nothing to insert, nothing to delete, then - * vacuum marks page - */ - if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0) - GistClearTuplesDeleted(page); + if (data - begin < record->xl_len) + { + OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber : + OffsetNumberNext(PageGetMaxOffsetNumber(page)); + while (data - begin < record->xl_len) + { + IndexTuple itup = (IndexTuple) data; + Size sz = IndexTupleSize(itup); + OffsetNumber l; + data += sz; + + l = PageAddItem(page, (Item) itup, sz, off, false, false); + if (l == InvalidOffsetNumber) + elog(ERROR, "failed to add item to GiST index page, size %d bytes", + (int) sz); + off++; + } + } + else + { + /* + * special case: leafpage, nothing to insert, nothing to delete, then + * vacuum marks page + */ + if (GistPageIsLeaf(page) && xldata->ntodelete == 0) + GistClearTuplesDeleted(page); + } if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO) @@ -315,41 +213,67 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) static void gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) { + gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record); PageSplitRecord xlrec; Buffer buffer; Page page; int i; - int flags; + bool isrootsplit = false; + if (BlockNumberIsValid(xldata->leftchild)) + gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); decodePageSplitRecord(&xlrec, record); - flags = xlrec.data->origleaf ? F_LEAF : 0; /* loop around all pages */ for (i = 0; i < xlrec.data->npage; i++) { NewPage *newpage = xlrec.page + i; + int flags; + + if (newpage->header->blkno == GIST_ROOT_BLKNO) + { + Assert(i == 0); + isrootsplit = true; + } buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); /* ok, clear buffer */ + if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO) + flags = F_LEAF; + else + flags = 0; GISTInitBuffer(buffer, flags); /* and fill it */ gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber); + if (newpage->header->blkno == GIST_ROOT_BLKNO) + { + GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; + GistPageGetOpaque(page)->nsn = xldata->orignsn; + GistClearFollowRight(page); + } + else + { + if (i < xlrec.data->npage - 1) + GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno; + else + GistPageGetOpaque(page)->rightlink = xldata->origrlink; + GistPageGetOpaque(page)->nsn = xldata->orignsn; + if (i < xlrec.data->npage - 1 && !isrootsplit) + GistMarkFollowRight(page); + else + GistClearFollowRight(page); + } + PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } - - forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); - - pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, - NULL, 0, - &xlrec); } static void @@ -372,24 +296,6 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); } -static void -gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) -{ - char *begin = XLogRecGetData(record), - *ptr; - gistxlogInsertComplete *xlrec; - - xlrec = (gistxlogInsertComplete *) begin; - - ptr = begin + sizeof(gistxlogInsertComplete); - while (ptr - begin < record->xl_len) - { - Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData)); - forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr)); - ptr += sizeof(ItemPointerData); - } -} - void gist_redo(XLogRecPtr lsn, XLogRecord *record) { @@ -401,30 +307,23 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) * implement a similar optimization we have in b-tree, and remove killed * tuples outside VACUUM, we'll need to handle that here. */ - RestoreBkpBlocks(lsn, record, false); oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_GIST_PAGE_UPDATE: - gistRedoPageUpdateRecord(lsn, record, false); + gistRedoPageUpdateRecord(lsn, record); break; case XLOG_GIST_PAGE_DELETE: gistRedoPageDeleteRecord(lsn, record); break; - case XLOG_GIST_NEW_ROOT: - gistRedoPageUpdateRecord(lsn, record, true); - break; case XLOG_GIST_PAGE_SPLIT: gistRedoPageSplitRecord(lsn, record); break; case XLOG_GIST_CREATE_INDEX: gistRedoCreateIndex(lsn, record); break; - case XLOG_GIST_INSERT_COMPLETE: - gistRedoCompleteInsert(lsn, record); - break; default: elog(PANIC, "gist_redo: unknown op code %u", info); } @@ -434,20 +333,16 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) } static void -out_target(StringInfo buf, RelFileNode node, ItemPointerData key) +out_target(StringInfo buf, RelFileNode node) { appendStringInfo(buf, "rel %u/%u/%u", node.spcNode, node.dbNode, node.relNode); - if (ItemPointerIsValid(&key)) - appendStringInfo(buf, "; tid %u/%u", - ItemPointerGetBlockNumber(&key), - ItemPointerGetOffsetNumber(&key)); } static void out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) { - out_target(buf, xlrec->node, xlrec->key); + out_target(buf, xlrec->node); appendStringInfo(buf, "; block number %u", xlrec->blkno); } @@ -463,7 +358,7 @@ static void out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) { appendStringInfo(buf, "page_split: "); - out_target(buf, xlrec->node, xlrec->key); + out_target(buf, xlrec->node); appendStringInfo(buf, "; block number %u splits to %d pages", xlrec->origblkno, xlrec->npage); } @@ -482,10 +377,6 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) case XLOG_GIST_PAGE_DELETE: out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec); break; - case XLOG_GIST_NEW_ROOT: - appendStringInfo(buf, "new_root: "); - out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key); - break; case XLOG_GIST_PAGE_SPLIT: out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec); break; @@ -495,415 +386,102 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) ((RelFileNode *) rec)->dbNode, ((RelFileNode *) rec)->relNode); break; - case XLOG_GIST_INSERT_COMPLETE: - appendStringInfo(buf, "complete_insert: rel %u/%u/%u", - ((gistxlogInsertComplete *) rec)->node.spcNode, - ((gistxlogInsertComplete *) rec)->node.dbNode, - ((gistxlogInsertComplete *) rec)->node.relNode); - break; default: appendStringInfo(buf, "unknown gist op code %u", info); break; } } -IndexTuple -gist_form_invalid_tuple(BlockNumber blkno) -{ - /* - * we don't alloc space for null's bitmap, this is invalid tuple, be - * carefull in read and write code - */ - Size size = IndexInfoFindDataOffset(0); - IndexTuple tuple = (IndexTuple) palloc0(size); - - tuple->t_info |= size; - - ItemPointerSetBlockNumber(&(tuple->t_tid), blkno); - GistTupleSetInvalid(tuple); - - return tuple; -} - - -static void -gistxlogFindPath(Relation index, gistIncompleteInsert *insert) -{ - GISTInsertStack *top; - - insert->pathlen = 0; - insert->path = NULL; - - if ((top = gistFindPath(index, insert->origblkno)) != NULL) - { - int i; - GISTInsertStack *ptr; - - for (ptr = top; ptr; ptr = ptr->parent) - insert->pathlen++; - - insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen); - - i = 0; - for (ptr = top; ptr; ptr = ptr->parent) - insert->path[i++] = ptr->blkno; - } - else - elog(ERROR, "lost parent for block %u", insert->origblkno); -} - -static SplitedPageLayout * -gistMakePageLayout(Buffer *buffers, int nbuffers) -{ - SplitedPageLayout *res = NULL, - *resptr; - - while (nbuffers-- > 0) - { - Page page = BufferGetPage(buffers[nbuffers]); - IndexTuple *vec; - int veclen; - - resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout)); - - resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]); - resptr->block.num = PageGetMaxOffsetNumber(page); - - vec = gistextractpage(page, &veclen); - resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist)); - - resptr->next = res; - res = resptr; - } - - return res; -} - -/* - * Continue insert after crash. In normal situations, there aren't any - * incomplete inserts, but if a crash occurs partway through an insertion - * sequence, we'll need to finish making the index valid at the end of WAL - * replay. - * - * Note that we assume the index is now in a valid state, except for the - * unfinished insertion. In particular it's safe to invoke gistFindPath(); - * there shouldn't be any garbage pages for it to run into. - * - * To complete insert we can't use basic insertion algorithm because - * during insertion we can't call user-defined support functions of opclass. - * So, we insert 'invalid' tuples without real key and do it by separate algorithm. - * 'invalid' tuple should be updated by vacuum full. - */ -static void -gistContinueInsert(gistIncompleteInsert *insert) -{ - IndexTuple *itup; - int i, - lenitup; - Relation index; - - index = CreateFakeRelcacheEntry(insert->node); - - /* - * needed vector itup never will be more than initial lenblkno+2, because - * during this processing Indextuple can be only smaller - */ - lenitup = insert->lenblk; - itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ )); - - for (i = 0; i < insert->lenblk; i++) - itup[i] = gist_form_invalid_tuple(insert->blkno[i]); - - /* - * any insertion of itup[] should make LOG message about - */ - - if (insert->origblkno == GIST_ROOT_BLKNO) - { - /* - * it was split root, so we should only make new root. it can't be - * simple insert into root, we should replace all content of root. - */ - Buffer buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true); - - gistnewroot(index, buffer, itup, lenitup, NULL); - UnlockReleaseBuffer(buffer); - } - else - { - Buffer *buffers; - Page *pages; - int numbuffer; - OffsetNumber *todelete; - - /* construct path */ - gistxlogFindPath(index, insert); - - Assert(insert->pathlen > 0); - - buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ )); - pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ )); - todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ )); - - for (i = 0; i < insert->pathlen; i++) - { - int j, - k, - pituplen = 0; - uint8 xlinfo; - XLogRecData *rdata; - XLogRecPtr recptr; - Buffer tempbuffer = InvalidBuffer; - int ntodelete = 0; - - numbuffer = 1; - buffers[0] = ReadBuffer(index, insert->path[i]); - LockBuffer(buffers[0], GIST_EXCLUSIVE); - - /* - * we check buffer, because we restored page earlier - */ - gistcheckpage(index, buffers[0]); - - pages[0] = BufferGetPage(buffers[0]); - Assert(!GistPageIsLeaf(pages[0])); - - pituplen = PageGetMaxOffsetNumber(pages[0]); - - /* find remove old IndexTuples to remove */ - for (j = 0; j < pituplen && ntodelete < lenitup; j++) - { - BlockNumber blkno; - ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber); - IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid); - - blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid)); - - for (k = 0; k < lenitup; k++) - if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno) - { - todelete[ntodelete] = j + FirstOffsetNumber - ntodelete; - ntodelete++; - break; - } - } - - if (ntodelete == 0) - elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)"); - - /* - * we check space with subtraction only first tuple to delete, - * hope, that wiil be enough space.... - */ - - if (gistnospace(pages[0], itup, lenitup, *todelete, 0)) - { - - /* no space left on page, so we must split */ - buffers[numbuffer] = ReadBuffer(index, P_NEW); - LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); - GISTInitBuffer(buffers[numbuffer], 0); - pages[numbuffer] = BufferGetPage(buffers[numbuffer]); - gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber); - numbuffer++; - - if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO) - { - Buffer tmp; - - /* - * we split root, just copy content from root to new page - */ - - /* sanity check */ - if (i + 1 != insert->pathlen) - elog(PANIC, "unexpected pathlen in index \"%s\"", - RelationGetRelationName(index)); - - /* fill new page, root will be changed later */ - tempbuffer = ReadBuffer(index, P_NEW); - LockBuffer(tempbuffer, GIST_EXCLUSIVE); - memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer)); - - /* swap buffers[0] (was root) and temp buffer */ - tmp = buffers[0]; - buffers[0] = tempbuffer; - tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO, - * it is still unchanged */ - - pages[0] = BufferGetPage(buffers[0]); - } - - START_CRIT_SECTION(); - - for (j = 0; j < ntodelete; j++) - PageIndexTupleDelete(pages[0], todelete[j]); - - xlinfo = XLOG_GIST_PAGE_SPLIT; - rdata = formSplitRdata(index->rd_node, insert->path[i], - false, &(insert->key), - gistMakePageLayout(buffers, numbuffer)); - - } - else - { - START_CRIT_SECTION(); - - for (j = 0; j < ntodelete; j++) - PageIndexTupleDelete(pages[0], todelete[j]); - gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber); - - xlinfo = XLOG_GIST_PAGE_UPDATE; - rdata = formUpdateRdata(index->rd_node, buffers[0], - todelete, ntodelete, - itup, lenitup, &(insert->key)); - } - - /* - * use insert->key as mark for completion of insert (form*Rdata() - * above) for following possible replays - */ - - /* write pages, we should mark it dirty befor XLogInsert() */ - for (j = 0; j < numbuffer; j++) - { - GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber; - MarkBufferDirty(buffers[j]); - } - recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata); - for (j = 0; j < numbuffer; j++) - { - PageSetLSN(pages[j], recptr); - PageSetTLI(pages[j], ThisTimeLineID); - } - - END_CRIT_SECTION(); - - lenitup = numbuffer; - for (j = 0; j < numbuffer; j++) - { - itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j])); - UnlockReleaseBuffer(buffers[j]); - } - - if (tempbuffer != InvalidBuffer) - { - /* - * it was a root split, so fill it by new values - */ - gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key)); - UnlockReleaseBuffer(tempbuffer); - } - } - } - - FreeFakeRelcacheEntry(index); - - ereport(LOG, - (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery", - insert->node.spcNode, insert->node.dbNode, insert->node.relNode), - errdetail("Incomplete insertion detected during crash replay."))); -} - void gist_xlog_startup(void) { - incomplete_inserts = NIL; - insertCtx = AllocSetContextCreate(CurrentMemoryContext, - "GiST recovery temporary context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); opCtx = createTempGistContext(); } void gist_xlog_cleanup(void) { - ListCell *l; - MemoryContext oldCxt; - - oldCxt = MemoryContextSwitchTo(opCtx); - - foreach(l, incomplete_inserts) - { - gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); - - gistContinueInsert(insert); - MemoryContextReset(opCtx); - } - MemoryContextSwitchTo(oldCxt); - MemoryContextDelete(opCtx); - MemoryContextDelete(insertCtx); -} - -bool -gist_safe_restartpoint(void) -{ - if (incomplete_inserts) - return false; - return true; } - -XLogRecData * -formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, - ItemPointer key, SplitedPageLayout *dist) +/* + * Write WAL record of a page split. + */ +XLogRecPtr +gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf, + SplitedPageLayout *dist, + BlockNumber origrlink, GistNSN orignsn, + Buffer leftchildbuf) { XLogRecData *rdata; - gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit)); + gistxlogPageSplit xlrec; SplitedPageLayout *ptr; int npage = 0, - cur = 1; + cur; + XLogRecPtr recptr; - ptr = dist; - while (ptr) - { + for (ptr = dist; ptr; ptr = ptr->next) npage++; - ptr = ptr->next; - } rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2)); - xlrec->node = node; - xlrec->origblkno = blkno; - xlrec->origleaf = page_is_leaf; - xlrec->npage = (uint16) npage; - if (key) - xlrec->key = *key; - else - ItemPointerSetInvalid(&(xlrec->key)); + xlrec.node = node; + xlrec.origblkno = blkno; + xlrec.origrlink = origrlink; + xlrec.orignsn = orignsn; + xlrec.origleaf = page_is_leaf; + xlrec.npage = (uint16) npage; + xlrec.leftchild = + BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) xlrec; + rdata[0].data = (char *) &xlrec; rdata[0].len = sizeof(gistxlogPageSplit); - rdata[0].next = NULL; + rdata[0].buffer = InvalidBuffer; - ptr = dist; - while (ptr) + cur = 1; + + /* + * Include a full page image of the child buf. (only necessary if a + * checkpoint happened since the child page was split) + */ + if (BufferIsValid(leftchildbuf)) { + rdata[cur - 1].next = &(rdata[cur]); + rdata[cur].data = NULL; + rdata[cur].len = 0; + rdata[cur].buffer = leftchildbuf; + rdata[cur].buffer_std = true; + cur++; + } + + for (ptr = dist; ptr; ptr = ptr->next) + { + rdata[cur - 1].next = &(rdata[cur]); rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char *) &(ptr->block); rdata[cur].len = sizeof(gistxlogPage); - rdata[cur - 1].next = &(rdata[cur]); cur++; + rdata[cur - 1].next = &(rdata[cur]); rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char *) (ptr->list); rdata[cur].len = ptr->lenlist; - rdata[cur - 1].next = &(rdata[cur]); - rdata[cur].next = NULL; cur++; - ptr = ptr->next; } + rdata[cur - 1].next = NULL; + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); - return rdata; + pfree(rdata); + return recptr; } /* - * Construct the rdata array for an XLOG record describing a page update - * (deletion and/or insertion of tuples on a single index page). + * Write XLOG record describing a page update. The update can include any + * number of deletions and/or insertions of tuples on a single index page. + * + * If this update inserts a downlink for a split page, also record that + * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set. * * Note that both the todelete array and the tuples are marked as belonging * to the target buffer; they need not be stored in XLOG if XLogInsert decides @@ -911,27 +489,26 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, * at least one rdata item referencing the buffer, even when ntodelete and * ituplen are both zero; this ensures that XLogInsert knows about the buffer. */ -XLogRecData * -formUpdateRdata(RelFileNode node, Buffer buffer, - OffsetNumber *todelete, int ntodelete, - IndexTuple *itup, int ituplen, ItemPointer key) +XLogRecPtr +gistXLogUpdate(RelFileNode node, Buffer buffer, + OffsetNumber *todelete, int ntodelete, + IndexTuple *itup, int ituplen, + Buffer leftchildbuf) { XLogRecData *rdata; gistxlogPageUpdate *xlrec; int cur, i; + XLogRecPtr recptr; - rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen)); + rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen)); xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec->node = node; xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntodelete = ntodelete; - - if (key) - xlrec->key = *key; - else - ItemPointerSetInvalid(&(xlrec->key)); + xlrec->leftchild = + BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; rdata[0].buffer = buffer; rdata[0].buffer_std = true; @@ -945,13 +522,13 @@ formUpdateRdata(RelFileNode node, Buffer buffer, rdata[1].next = &(rdata[2]); rdata[2].data = (char *) todelete; - rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); + rdata[2].len = sizeof(OffsetNumber) * ntodelete; rdata[2].buffer = buffer; rdata[2].buffer_std = true; - rdata[2].next = NULL; - /* new tuples */ cur = 3; + + /* new tuples */ for (i = 0; i < ituplen; i++) { rdata[cur - 1].next = &(rdata[cur]); @@ -959,38 +536,26 @@ formUpdateRdata(RelFileNode node, Buffer buffer, rdata[cur].len = IndexTupleSize(itup[i]); rdata[cur].buffer = buffer; rdata[cur].buffer_std = true; - rdata[cur].next = NULL; cur++; } - return rdata; -} - -XLogRecPtr -gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) -{ - gistxlogInsertComplete xlrec; - XLogRecData rdata[2]; - XLogRecPtr recptr; - - Assert(len > 0); - xlrec.node = node; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof(gistxlogInsertComplete); - rdata[0].next = &(rdata[1]); - - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) keys; - rdata[1].len = sizeof(ItemPointerData) * len; - rdata[1].next = NULL; - - START_CRIT_SECTION(); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata); + /* + * Include a full page image of the child buf. (only necessary if + * a checkpoint happened since the child page was split) + */ + if (BufferIsValid(leftchildbuf)) + { + rdata[cur - 1].next = &(rdata[cur]); + rdata[cur].data = NULL; + rdata[cur].len = 0; + rdata[cur].buffer = leftchildbuf; + rdata[cur].buffer_std = true; + cur++; + } + rdata[cur - 1].next = NULL; - END_CRIT_SECTION(); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); + pfree(rdata); return recptr; } |