diff options
Diffstat (limited to 'src/backend/access/gist/gistxlog.c')
-rw-r--r-- | src/backend/access/gist/gistxlog.c | 797 |
1 files changed, 181 insertions, 616 deletions
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index a90303e547..7d25728d8d 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -20,15 +20,6 @@ #include "utils/memutils.h" #include "utils/rel.h" - -typedef struct -{ - gistxlogPageUpdate *data; - int len; - IndexTuple *itup; - OffsetNumber *todelete; -} PageUpdateRecord; - typedef struct { gistxlogPage *header; @@ -41,144 +32,37 @@ typedef struct NewPage *page; } PageSplitRecord; -/* track for incomplete inserts, idea was taken from nbtxlog.c */ - -typedef struct gistIncompleteInsert -{ - RelFileNode node; - BlockNumber origblkno; /* for splits */ - ItemPointerData key; - int lenblk; - BlockNumber *blkno; - XLogRecPtr lsn; - BlockNumber *path; - int pathlen; -} gistIncompleteInsert; - - static MemoryContext opCtx; /* working memory for operations */ -static MemoryContext insertCtx; /* holds incomplete_inserts list */ -static List *incomplete_inserts; - - -#define ItemPointerEQ(a, b) \ - ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \ - ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) ) - +/* + * Replay the clearing of F_FOLLOW_RIGHT flag. + */ static void -pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, - BlockNumber *blkno, int lenblk, - PageSplitRecord *xlinfo /* to extract blkno info */ ) +gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn, + BlockNumber leftblkno) { - MemoryContext oldCxt; - gistIncompleteInsert *ninsert; + Buffer buffer; - if (!ItemPointerIsValid(&key)) + buffer = XLogReadBuffer(node, leftblkno, false); + if (BufferIsValid(buffer)) + { + Page page = (Page) BufferGetPage(buffer); /* - * if key is null then we should not store insertion as incomplete, - * because it's a vacuum operation.. + * Note that we still update the page even if page LSN is equal to the + * LSN of this record, because the updated NSN is not included in the + * full page image. */ - return; - - oldCxt = MemoryContextSwitchTo(insertCtx); - ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert)); - - ninsert->node = node; - ninsert->key = key; - ninsert->lsn = lsn; - - if (lenblk && blkno) - { - ninsert->lenblk = lenblk; - ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk); - memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk); - ninsert->origblkno = *blkno; - } - else - { - int i; - - Assert(xlinfo); - ninsert->lenblk = xlinfo->data->npage; - ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk); - for (i = 0; i < ninsert->lenblk; i++) - ninsert->blkno[i] = xlinfo->page[i].header->blkno; - ninsert->origblkno = xlinfo->data->origblkno; - } - Assert(ninsert->lenblk > 0); - - /* - * Stick the new incomplete insert onto the front of the list, not the - * back. This is so that gist_xlog_cleanup will process incompletions in - * last-in-first-out order. - */ - incomplete_inserts = lcons(ninsert, incomplete_inserts); - - MemoryContextSwitchTo(oldCxt); -} - -static void -forgetIncompleteInsert(RelFileNode node, ItemPointerData key) -{ - ListCell *l; - - if (!ItemPointerIsValid(&key)) - return; - - if (incomplete_inserts == NIL) - return; - - foreach(l, incomplete_inserts) - { - gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); - - if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key))) + if (!XLByteLT(lsn, PageGetLSN(page))) { - /* found */ - incomplete_inserts = list_delete_ptr(incomplete_inserts, insert); - pfree(insert->blkno); - pfree(insert); - break; - } - } -} + GistPageGetOpaque(page)->nsn = lsn; + GistClearFollowRight(page); -static void -decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record) -{ - char *begin = XLogRecGetData(record), - *ptr; - int i = 0, - addpath = 0; - - decoded->data = (gistxlogPageUpdate *) begin; - - if (decoded->data->ntodelete) - { - decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath); - addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete); - } - else - decoded->todelete = NULL; - - decoded->len = 0; - ptr = begin + sizeof(gistxlogPageUpdate) + addpath; - while (ptr - begin < record->xl_len) - { - decoded->len++; - ptr += IndexTupleSize((IndexTuple) ptr); - } - - decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len); - - ptr = begin + sizeof(gistxlogPageUpdate) + addpath; - while (ptr - begin < record->xl_len) - { - decoded->itup[i] = (IndexTuple) ptr; - ptr += IndexTupleSize(decoded->itup[i]); - i++; + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); + } + UnlockReleaseBuffer(buffer); } } @@ -186,29 +70,22 @@ decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record) * redo any page update (except page split) */ static void -gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) +gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) { - gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record); - PageUpdateRecord xlrec; + char *begin = XLogRecGetData(record); + gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin; Buffer buffer; Page page; + char *data; - /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */ - forgetIncompleteInsert(xldata->node, xldata->key); + if (BlockNumberIsValid(xldata->leftchild)) + gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); - if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO) - /* operation with root always finalizes insertion */ - pushIncompleteInsert(xldata->node, lsn, xldata->key, - &(xldata->blkno), 1, - NULL); - - /* nothing else to do if page was backed up (and no info to do it with) */ + /* nothing more to do if page was backed up (and no info to do it with) */ if (record->xl_info & XLR_BKP_BLOCK_1) return; - decodePageUpdateRecord(&xlrec, record); - - buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false); + buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); @@ -219,28 +96,49 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) return; } - if (isnewroot) - GISTInitBuffer(buffer, 0); - else if (xlrec.data->ntodelete) + data = begin + sizeof(gistxlogPageUpdate); + + /* Delete old tuples */ + if (xldata->ntodelete > 0) { int i; + OffsetNumber *todelete = (OffsetNumber *) data; + data += sizeof(OffsetNumber) * xldata->ntodelete; - for (i = 0; i < xlrec.data->ntodelete; i++) - PageIndexTupleDelete(page, xlrec.todelete[i]); + for (i = 0; i < xldata->ntodelete; i++) + PageIndexTupleDelete(page, todelete[i]); if (GistPageIsLeaf(page)) GistMarkTuplesDeleted(page); } /* add tuples */ - if (xlrec.len > 0) - gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber); - - /* - * special case: leafpage, nothing to insert, nothing to delete, then - * vacuum marks page - */ - if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0) - GistClearTuplesDeleted(page); + if (data - begin < record->xl_len) + { + OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber : + OffsetNumberNext(PageGetMaxOffsetNumber(page)); + while (data - begin < record->xl_len) + { + IndexTuple itup = (IndexTuple) data; + Size sz = IndexTupleSize(itup); + OffsetNumber l; + data += sz; + + l = PageAddItem(page, (Item) itup, sz, off, false, false); + if (l == InvalidOffsetNumber) + elog(ERROR, "failed to add item to GiST index page, size %d bytes", + (int) sz); + off++; + } + } + else + { + /* + * special case: leafpage, nothing to insert, nothing to delete, then + * vacuum marks page + */ + if (GistPageIsLeaf(page) && xldata->ntodelete == 0) + GistClearTuplesDeleted(page); + } if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO) @@ -315,41 +213,67 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) static void gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) { + gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record); PageSplitRecord xlrec; Buffer buffer; Page page; int i; - int flags; + bool isrootsplit = false; + if (BlockNumberIsValid(xldata->leftchild)) + gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); decodePageSplitRecord(&xlrec, record); - flags = xlrec.data->origleaf ? F_LEAF : 0; /* loop around all pages */ for (i = 0; i < xlrec.data->npage; i++) { NewPage *newpage = xlrec.page + i; + int flags; + + if (newpage->header->blkno == GIST_ROOT_BLKNO) + { + Assert(i == 0); + isrootsplit = true; + } buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); /* ok, clear buffer */ + if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO) + flags = F_LEAF; + else + flags = 0; GISTInitBuffer(buffer, flags); /* and fill it */ gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber); + if (newpage->header->blkno == GIST_ROOT_BLKNO) + { + GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; + GistPageGetOpaque(page)->nsn = xldata->orignsn; + GistClearFollowRight(page); + } + else + { + if (i < xlrec.data->npage - 1) + GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno; + else + GistPageGetOpaque(page)->rightlink = xldata->origrlink; + GistPageGetOpaque(page)->nsn = xldata->orignsn; + if (i < xlrec.data->npage - 1 && !isrootsplit) + GistMarkFollowRight(page); + else + GistClearFollowRight(page); + } + PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } - - forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); - - pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, - NULL, 0, - &xlrec); } static void @@ -372,24 +296,6 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); } -static void -gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) -{ - char *begin = XLogRecGetData(record), - *ptr; - gistxlogInsertComplete *xlrec; - - xlrec = (gistxlogInsertComplete *) begin; - - ptr = begin + sizeof(gistxlogInsertComplete); - while (ptr - begin < record->xl_len) - { - Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData)); - forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr)); - ptr += sizeof(ItemPointerData); - } -} - void gist_redo(XLogRecPtr lsn, XLogRecord *record) { @@ -401,30 +307,23 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) * implement a similar optimization we have in b-tree, and remove killed * tuples outside VACUUM, we'll need to handle that here. */ - RestoreBkpBlocks(lsn, record, false); oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_GIST_PAGE_UPDATE: - gistRedoPageUpdateRecord(lsn, record, false); + gistRedoPageUpdateRecord(lsn, record); break; case XLOG_GIST_PAGE_DELETE: gistRedoPageDeleteRecord(lsn, record); break; - case XLOG_GIST_NEW_ROOT: - gistRedoPageUpdateRecord(lsn, record, true); - break; case XLOG_GIST_PAGE_SPLIT: gistRedoPageSplitRecord(lsn, record); break; case XLOG_GIST_CREATE_INDEX: gistRedoCreateIndex(lsn, record); break; - case XLOG_GIST_INSERT_COMPLETE: - gistRedoCompleteInsert(lsn, record); - break; default: elog(PANIC, "gist_redo: unknown op code %u", info); } @@ -434,20 +333,16 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) } static void -out_target(StringInfo buf, RelFileNode node, ItemPointerData key) +out_target(StringInfo buf, RelFileNode node) { appendStringInfo(buf, "rel %u/%u/%u", node.spcNode, node.dbNode, node.relNode); - if (ItemPointerIsValid(&key)) - appendStringInfo(buf, "; tid %u/%u", - ItemPointerGetBlockNumber(&key), - ItemPointerGetOffsetNumber(&key)); } static void out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) { - out_target(buf, xlrec->node, xlrec->key); + out_target(buf, xlrec->node); appendStringInfo(buf, "; block number %u", xlrec->blkno); } @@ -463,7 +358,7 @@ static void out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) { appendStringInfo(buf, "page_split: "); - out_target(buf, xlrec->node, xlrec->key); + out_target(buf, xlrec->node); appendStringInfo(buf, "; block number %u splits to %d pages", xlrec->origblkno, xlrec->npage); } @@ -482,10 +377,6 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) case XLOG_GIST_PAGE_DELETE: out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec); break; - case XLOG_GIST_NEW_ROOT: - appendStringInfo(buf, "new_root: "); - out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key); - break; case XLOG_GIST_PAGE_SPLIT: out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec); break; @@ -495,415 +386,102 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) ((RelFileNode *) rec)->dbNode, ((RelFileNode *) rec)->relNode); break; - case XLOG_GIST_INSERT_COMPLETE: - appendStringInfo(buf, "complete_insert: rel %u/%u/%u", - ((gistxlogInsertComplete *) rec)->node.spcNode, - ((gistxlogInsertComplete *) rec)->node.dbNode, - ((gistxlogInsertComplete *) rec)->node.relNode); - break; default: appendStringInfo(buf, "unknown gist op code %u", info); break; } } -IndexTuple -gist_form_invalid_tuple(BlockNumber blkno) -{ - /* - * we don't alloc space for null's bitmap, this is invalid tuple, be - * carefull in read and write code - */ - Size size = IndexInfoFindDataOffset(0); - IndexTuple tuple = (IndexTuple) palloc0(size); - - tuple->t_info |= size; - - ItemPointerSetBlockNumber(&(tuple->t_tid), blkno); - GistTupleSetInvalid(tuple); - - return tuple; -} - - -static void -gistxlogFindPath(Relation index, gistIncompleteInsert *insert) -{ - GISTInsertStack *top; - - insert->pathlen = 0; - insert->path = NULL; - - if ((top = gistFindPath(index, insert->origblkno)) != NULL) - { - int i; - GISTInsertStack *ptr; - - for (ptr = top; ptr; ptr = ptr->parent) - insert->pathlen++; - - insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen); - - i = 0; - for (ptr = top; ptr; ptr = ptr->parent) - insert->path[i++] = ptr->blkno; - } - else - elog(ERROR, "lost parent for block %u", insert->origblkno); -} - -static SplitedPageLayout * -gistMakePageLayout(Buffer *buffers, int nbuffers) -{ - SplitedPageLayout *res = NULL, - *resptr; - - while (nbuffers-- > 0) - { - Page page = BufferGetPage(buffers[nbuffers]); - IndexTuple *vec; - int veclen; - - resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout)); - - resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]); - resptr->block.num = PageGetMaxOffsetNumber(page); - - vec = gistextractpage(page, &veclen); - resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist)); - - resptr->next = res; - res = resptr; - } - - return res; -} - -/* - * Continue insert after crash. In normal situations, there aren't any - * incomplete inserts, but if a crash occurs partway through an insertion - * sequence, we'll need to finish making the index valid at the end of WAL - * replay. - * - * Note that we assume the index is now in a valid state, except for the - * unfinished insertion. In particular it's safe to invoke gistFindPath(); - * there shouldn't be any garbage pages for it to run into. - * - * To complete insert we can't use basic insertion algorithm because - * during insertion we can't call user-defined support functions of opclass. - * So, we insert 'invalid' tuples without real key and do it by separate algorithm. - * 'invalid' tuple should be updated by vacuum full. - */ -static void -gistContinueInsert(gistIncompleteInsert *insert) -{ - IndexTuple *itup; - int i, - lenitup; - Relation index; - - index = CreateFakeRelcacheEntry(insert->node); - - /* - * needed vector itup never will be more than initial lenblkno+2, because - * during this processing Indextuple can be only smaller - */ - lenitup = insert->lenblk; - itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ )); - - for (i = 0; i < insert->lenblk; i++) - itup[i] = gist_form_invalid_tuple(insert->blkno[i]); - - /* - * any insertion of itup[] should make LOG message about - */ - - if (insert->origblkno == GIST_ROOT_BLKNO) - { - /* - * it was split root, so we should only make new root. it can't be - * simple insert into root, we should replace all content of root. - */ - Buffer buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true); - - gistnewroot(index, buffer, itup, lenitup, NULL); - UnlockReleaseBuffer(buffer); - } - else - { - Buffer *buffers; - Page *pages; - int numbuffer; - OffsetNumber *todelete; - - /* construct path */ - gistxlogFindPath(index, insert); - - Assert(insert->pathlen > 0); - - buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ )); - pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ )); - todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ )); - - for (i = 0; i < insert->pathlen; i++) - { - int j, - k, - pituplen = 0; - uint8 xlinfo; - XLogRecData *rdata; - XLogRecPtr recptr; - Buffer tempbuffer = InvalidBuffer; - int ntodelete = 0; - - numbuffer = 1; - buffers[0] = ReadBuffer(index, insert->path[i]); - LockBuffer(buffers[0], GIST_EXCLUSIVE); - - /* - * we check buffer, because we restored page earlier - */ - gistcheckpage(index, buffers[0]); - - pages[0] = BufferGetPage(buffers[0]); - Assert(!GistPageIsLeaf(pages[0])); - - pituplen = PageGetMaxOffsetNumber(pages[0]); - - /* find remove old IndexTuples to remove */ - for (j = 0; j < pituplen && ntodelete < lenitup; j++) - { - BlockNumber blkno; - ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber); - IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid); - - blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid)); - - for (k = 0; k < lenitup; k++) - if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno) - { - todelete[ntodelete] = j + FirstOffsetNumber - ntodelete; - ntodelete++; - break; - } - } - - if (ntodelete == 0) - elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)"); - - /* - * we check space with subtraction only first tuple to delete, - * hope, that wiil be enough space.... - */ - - if (gistnospace(pages[0], itup, lenitup, *todelete, 0)) - { - - /* no space left on page, so we must split */ - buffers[numbuffer] = ReadBuffer(index, P_NEW); - LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); - GISTInitBuffer(buffers[numbuffer], 0); - pages[numbuffer] = BufferGetPage(buffers[numbuffer]); - gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber); - numbuffer++; - - if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO) - { - Buffer tmp; - - /* - * we split root, just copy content from root to new page - */ - - /* sanity check */ - if (i + 1 != insert->pathlen) - elog(PANIC, "unexpected pathlen in index \"%s\"", - RelationGetRelationName(index)); - - /* fill new page, root will be changed later */ - tempbuffer = ReadBuffer(index, P_NEW); - LockBuffer(tempbuffer, GIST_EXCLUSIVE); - memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer)); - - /* swap buffers[0] (was root) and temp buffer */ - tmp = buffers[0]; - buffers[0] = tempbuffer; - tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO, - * it is still unchanged */ - - pages[0] = BufferGetPage(buffers[0]); - } - - START_CRIT_SECTION(); - - for (j = 0; j < ntodelete; j++) - PageIndexTupleDelete(pages[0], todelete[j]); - - xlinfo = XLOG_GIST_PAGE_SPLIT; - rdata = formSplitRdata(index->rd_node, insert->path[i], - false, &(insert->key), - gistMakePageLayout(buffers, numbuffer)); - - } - else - { - START_CRIT_SECTION(); - - for (j = 0; j < ntodelete; j++) - PageIndexTupleDelete(pages[0], todelete[j]); - gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber); - - xlinfo = XLOG_GIST_PAGE_UPDATE; - rdata = formUpdateRdata(index->rd_node, buffers[0], - todelete, ntodelete, - itup, lenitup, &(insert->key)); - } - - /* - * use insert->key as mark for completion of insert (form*Rdata() - * above) for following possible replays - */ - - /* write pages, we should mark it dirty befor XLogInsert() */ - for (j = 0; j < numbuffer; j++) - { - GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber; - MarkBufferDirty(buffers[j]); - } - recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata); - for (j = 0; j < numbuffer; j++) - { - PageSetLSN(pages[j], recptr); - PageSetTLI(pages[j], ThisTimeLineID); - } - - END_CRIT_SECTION(); - - lenitup = numbuffer; - for (j = 0; j < numbuffer; j++) - { - itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j])); - UnlockReleaseBuffer(buffers[j]); - } - - if (tempbuffer != InvalidBuffer) - { - /* - * it was a root split, so fill it by new values - */ - gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key)); - UnlockReleaseBuffer(tempbuffer); - } - } - } - - FreeFakeRelcacheEntry(index); - - ereport(LOG, - (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery", - insert->node.spcNode, insert->node.dbNode, insert->node.relNode), - errdetail("Incomplete insertion detected during crash replay."))); -} - void gist_xlog_startup(void) { - incomplete_inserts = NIL; - insertCtx = AllocSetContextCreate(CurrentMemoryContext, - "GiST recovery temporary context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); opCtx = createTempGistContext(); } void gist_xlog_cleanup(void) { - ListCell *l; - MemoryContext oldCxt; - - oldCxt = MemoryContextSwitchTo(opCtx); - - foreach(l, incomplete_inserts) - { - gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); - - gistContinueInsert(insert); - MemoryContextReset(opCtx); - } - MemoryContextSwitchTo(oldCxt); - MemoryContextDelete(opCtx); - MemoryContextDelete(insertCtx); -} - -bool -gist_safe_restartpoint(void) -{ - if (incomplete_inserts) - return false; - return true; } - -XLogRecData * -formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, - ItemPointer key, SplitedPageLayout *dist) +/* + * Write WAL record of a page split. + */ +XLogRecPtr +gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf, + SplitedPageLayout *dist, + BlockNumber origrlink, GistNSN orignsn, + Buffer leftchildbuf) { XLogRecData *rdata; - gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit)); + gistxlogPageSplit xlrec; SplitedPageLayout *ptr; int npage = 0, - cur = 1; + cur; + XLogRecPtr recptr; - ptr = dist; - while (ptr) - { + for (ptr = dist; ptr; ptr = ptr->next) npage++; - ptr = ptr->next; - } rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2)); - xlrec->node = node; - xlrec->origblkno = blkno; - xlrec->origleaf = page_is_leaf; - xlrec->npage = (uint16) npage; - if (key) - xlrec->key = *key; - else - ItemPointerSetInvalid(&(xlrec->key)); + xlrec.node = node; + xlrec.origblkno = blkno; + xlrec.origrlink = origrlink; + xlrec.orignsn = orignsn; + xlrec.origleaf = page_is_leaf; + xlrec.npage = (uint16) npage; + xlrec.leftchild = + BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) xlrec; + rdata[0].data = (char *) &xlrec; rdata[0].len = sizeof(gistxlogPageSplit); - rdata[0].next = NULL; + rdata[0].buffer = InvalidBuffer; - ptr = dist; - while (ptr) + cur = 1; + + /* + * Include a full page image of the child buf. (only necessary if a + * checkpoint happened since the child page was split) + */ + if (BufferIsValid(leftchildbuf)) { + rdata[cur - 1].next = &(rdata[cur]); + rdata[cur].data = NULL; + rdata[cur].len = 0; + rdata[cur].buffer = leftchildbuf; + rdata[cur].buffer_std = true; + cur++; + } + + for (ptr = dist; ptr; ptr = ptr->next) + { + rdata[cur - 1].next = &(rdata[cur]); rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char *) &(ptr->block); rdata[cur].len = sizeof(gistxlogPage); - rdata[cur - 1].next = &(rdata[cur]); cur++; + rdata[cur - 1].next = &(rdata[cur]); rdata[cur].buffer = InvalidBuffer; rdata[cur].data = (char *) (ptr->list); rdata[cur].len = ptr->lenlist; - rdata[cur - 1].next = &(rdata[cur]); - rdata[cur].next = NULL; cur++; - ptr = ptr->next; } + rdata[cur - 1].next = NULL; + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); - return rdata; + pfree(rdata); + return recptr; } /* - * Construct the rdata array for an XLOG record describing a page update - * (deletion and/or insertion of tuples on a single index page). + * Write XLOG record describing a page update. The update can include any + * number of deletions and/or insertions of tuples on a single index page. + * + * If this update inserts a downlink for a split page, also record that + * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set. * * Note that both the todelete array and the tuples are marked as belonging * to the target buffer; they need not be stored in XLOG if XLogInsert decides @@ -911,27 +489,26 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, * at least one rdata item referencing the buffer, even when ntodelete and * ituplen are both zero; this ensures that XLogInsert knows about the buffer. */ -XLogRecData * -formUpdateRdata(RelFileNode node, Buffer buffer, - OffsetNumber *todelete, int ntodelete, - IndexTuple *itup, int ituplen, ItemPointer key) +XLogRecPtr +gistXLogUpdate(RelFileNode node, Buffer buffer, + OffsetNumber *todelete, int ntodelete, + IndexTuple *itup, int ituplen, + Buffer leftchildbuf) { XLogRecData *rdata; gistxlogPageUpdate *xlrec; int cur, i; + XLogRecPtr recptr; - rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen)); + rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen)); xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec->node = node; xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntodelete = ntodelete; - - if (key) - xlrec->key = *key; - else - ItemPointerSetInvalid(&(xlrec->key)); + xlrec->leftchild = + BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; rdata[0].buffer = buffer; rdata[0].buffer_std = true; @@ -945,13 +522,13 @@ formUpdateRdata(RelFileNode node, Buffer buffer, rdata[1].next = &(rdata[2]); rdata[2].data = (char *) todelete; - rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); + rdata[2].len = sizeof(OffsetNumber) * ntodelete; rdata[2].buffer = buffer; rdata[2].buffer_std = true; - rdata[2].next = NULL; - /* new tuples */ cur = 3; + + /* new tuples */ for (i = 0; i < ituplen; i++) { rdata[cur - 1].next = &(rdata[cur]); @@ -959,38 +536,26 @@ formUpdateRdata(RelFileNode node, Buffer buffer, rdata[cur].len = IndexTupleSize(itup[i]); rdata[cur].buffer = buffer; rdata[cur].buffer_std = true; - rdata[cur].next = NULL; cur++; } - return rdata; -} - -XLogRecPtr -gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) -{ - gistxlogInsertComplete xlrec; - XLogRecData rdata[2]; - XLogRecPtr recptr; - - Assert(len > 0); - xlrec.node = node; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof(gistxlogInsertComplete); - rdata[0].next = &(rdata[1]); - - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) keys; - rdata[1].len = sizeof(ItemPointerData) * len; - rdata[1].next = NULL; - - START_CRIT_SECTION(); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata); + /* + * Include a full page image of the child buf. (only necessary if + * a checkpoint happened since the child page was split) + */ + if (BufferIsValid(leftchildbuf)) + { + rdata[cur - 1].next = &(rdata[cur]); + rdata[cur].data = NULL; + rdata[cur].len = 0; + rdata[cur].buffer = leftchildbuf; + rdata[cur].buffer_std = true; + cur++; + } + rdata[cur - 1].next = NULL; - END_CRIT_SECTION(); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); + pfree(rdata); return recptr; } |