diff options
Diffstat (limited to 'src/backend/access/gist/gistxlog.c')
-rw-r--r-- | src/backend/access/gist/gistxlog.c | 269 |
1 files changed, 107 insertions, 162 deletions
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 2999d21191..0a4f04810f 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -18,18 +18,6 @@ #include "access/xlogutils.h" #include "utils/memutils.h" -typedef struct -{ - gistxlogPage *header; - IndexTuple *itup; -} NewPage; - -typedef struct -{ - gistxlogPageSplit *data; - NewPage *page; -} PageSplitRecord; - static MemoryContext opCtx; /* working memory for operations */ /* @@ -44,9 +32,9 @@ static MemoryContext opCtx; /* working memory for operations */ * action.) */ static void -gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index, - RelFileNode node, BlockNumber childblkno) +gistRedoClearFollowRight(XLogReaderState *record, uint8 block_id) { + XLogRecPtr lsn = record->EndRecPtr; Buffer buffer; Page page; XLogRedoAction action; @@ -55,8 +43,7 @@ gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index, * Note that we still update the page even if it was restored from a full * page image, because the updated NSN is not included in the image. */ - action = XLogReadBufferForRedo(lsn, record, block_index, node, childblkno, - &buffer); + action = XLogReadBufferForRedo(record, block_id, &buffer); if (action == BLK_NEEDS_REDO || action == BLK_RESTORED) { page = BufferGetPage(buffer); @@ -75,20 +62,23 @@ gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index, * redo any page update (except page split) */ static void -gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) +gistRedoPageUpdateRecord(XLogReaderState *record) { - char *begin = XLogRecGetData(record); - gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin; + XLogRecPtr lsn = record->EndRecPtr; + gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record); Buffer buffer; Page page; - char *data; - if (XLogReadBufferForRedo(lsn, record, 0, xldata->node, xldata->blkno, - &buffer) == BLK_NEEDS_REDO) + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { - page = (Page) BufferGetPage(buffer); + char *begin; + char *data; + Size datalen; + int ninserted = 0; - data = begin + sizeof(gistxlogPageUpdate); + data = begin = XLogRecGetBlockData(record, 0, &datalen); + + page = (Page) BufferGetPage(buffer); /* Delete old tuples */ if (xldata->ntodelete > 0) @@ -105,12 +95,12 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) } /* add tuples */ - if (data - begin < record->xl_len) + if (data - begin < datalen) { OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(page)); - while (data - begin < record->xl_len) + while (data - begin < datalen) { IndexTuple itup = (IndexTuple) data; Size sz = IndexTupleSize(itup); @@ -123,9 +113,12 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) elog(ERROR, "failed to add item to GiST index page, size %d bytes", (int) sz); off++; + ninserted++; } } + Assert(ninserted == xldata->ntoinsert); + PageSetLSN(page, lsn); MarkBufferDirty(buffer); } @@ -137,58 +130,51 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) * that even if the target page no longer exists, we still attempt to * replay the change on the child page. */ - if (BlockNumberIsValid(xldata->leftchild)) - gistRedoClearFollowRight(lsn, record, 1, - xldata->node, xldata->leftchild); + if (XLogRecHasBlockRef(record, 1)) + gistRedoClearFollowRight(record, 1); if (BufferIsValid(buffer)) UnlockReleaseBuffer(buffer); } -static void -decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) +/* + * Returns an array of index pointers. + */ +static IndexTuple * +decodePageSplitRecord(char *begin, int len, int *n) { - char *begin = XLogRecGetData(record), - *ptr; - int j, - i = 0; + char *ptr; + int i = 0; + IndexTuple *tuples; + + /* extract the number of tuples */ + memcpy(n, begin, sizeof(int)); + ptr = begin + sizeof(int); - decoded->data = (gistxlogPageSplit *) begin; - decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage); + tuples = palloc(*n * sizeof(IndexTuple)); - ptr = begin + sizeof(gistxlogPageSplit); - for (i = 0; i < decoded->data->npage; i++) + for (i = 0; i < *n; i++) { - Assert(ptr - begin < record->xl_len); - decoded->page[i].header = (gistxlogPage *) ptr; - ptr += sizeof(gistxlogPage); - - decoded->page[i].itup = (IndexTuple *) - palloc(sizeof(IndexTuple) * decoded->page[i].header->num); - j = 0; - while (j < decoded->page[i].header->num) - { - Assert(ptr - begin < record->xl_len); - decoded->page[i].itup[j] = (IndexTuple) ptr; - ptr += IndexTupleSize((IndexTuple) ptr); - j++; - } + Assert(ptr - begin < len); + tuples[i] = (IndexTuple) ptr; + ptr += IndexTupleSize((IndexTuple) ptr); } + Assert(ptr - begin == len); + + return tuples; } static void -gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) +gistRedoPageSplitRecord(XLogReaderState *record) { + XLogRecPtr lsn = record->EndRecPtr; gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record); - PageSplitRecord xlrec; Buffer firstbuffer = InvalidBuffer; Buffer buffer; Page page; int i; bool isrootsplit = false; - decodePageSplitRecord(&xlrec, record); - /* * We must hold lock on the first-listed page throughout the action, * including while updating the left child page (if any). We can unlock @@ -198,32 +184,39 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) */ /* loop around all pages */ - for (i = 0; i < xlrec.data->npage; i++) + for (i = 0; i < xldata->npage; i++) { - NewPage *newpage = xlrec.page + i; int flags; - - if (newpage->header->blkno == GIST_ROOT_BLKNO) + char *data; + Size datalen; + int num; + BlockNumber blkno; + IndexTuple *tuples; + + XLogRecGetBlockTag(record, i + 1, NULL, NULL, &blkno); + if (blkno == GIST_ROOT_BLKNO) { Assert(i == 0); isrootsplit = true; } - buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, i + 1); page = (Page) BufferGetPage(buffer); + data = XLogRecGetBlockData(record, i + 1, &datalen); + + tuples = decodePageSplitRecord(data, datalen, &num); /* ok, clear buffer */ - if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO) + if (xldata->origleaf && blkno != GIST_ROOT_BLKNO) flags = F_LEAF; else flags = 0; GISTInitBuffer(buffer, flags); /* and fill it */ - gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber); + gistfillbuffer(page, tuples, num, FirstOffsetNumber); - if (newpage->header->blkno == GIST_ROOT_BLKNO) + if (blkno == GIST_ROOT_BLKNO) { GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; GistPageSetNSN(page, xldata->orignsn); @@ -231,12 +224,17 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) } else { - if (i < xlrec.data->npage - 1) - GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno; + if (i < xldata->npage - 1) + { + BlockNumber nextblkno; + + XLogRecGetBlockTag(record, i + 2, NULL, NULL, &nextblkno); + GistPageGetOpaque(page)->rightlink = nextblkno; + } else GistPageGetOpaque(page)->rightlink = xldata->origrlink; GistPageSetNSN(page, xldata->orignsn); - if (i < xlrec.data->npage - 1 && !isrootsplit && + if (i < xldata->npage - 1 && !isrootsplit && xldata->markfollowright) GistMarkFollowRight(page); else @@ -253,26 +251,22 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) } /* Fix follow-right data on left child page, if any */ - if (BlockNumberIsValid(xldata->leftchild)) - gistRedoClearFollowRight(lsn, record, 0, - xldata->node, xldata->leftchild); + if (XLogRecHasBlockRef(record, 0)) + gistRedoClearFollowRight(record, 0); /* Finally, release lock on the first page */ UnlockReleaseBuffer(firstbuffer); } static void -gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) +gistRedoCreateIndex(XLogReaderState *record) { - RelFileNode *node = (RelFileNode *) XLogRecGetData(record); + XLogRecPtr lsn = record->EndRecPtr; Buffer buffer; Page page; - /* Backup blocks are not used in create_index records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); - - buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true); - Assert(BufferIsValid(buffer)); + buffer = XLogInitBufferForRedo(record, 0); + Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO); page = (Page) BufferGetPage(buffer); GISTInitBuffer(buffer, F_LEAF); @@ -284,9 +278,9 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) } void -gist_redo(XLogRecPtr lsn, XLogRecord *record) +gist_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; MemoryContext oldCxt; /* @@ -299,13 +293,13 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) switch (info) { case XLOG_GIST_PAGE_UPDATE: - gistRedoPageUpdateRecord(lsn, record); + gistRedoPageUpdateRecord(record); break; case XLOG_GIST_PAGE_SPLIT: - gistRedoPageSplitRecord(lsn, record); + gistRedoPageSplitRecord(record); break; case XLOG_GIST_CREATE_INDEX: - gistRedoCreateIndex(lsn, record); + gistRedoCreateIndex(record); break; default: elog(PANIC, "gist_redo: unknown op code %u", info); @@ -336,70 +330,49 @@ gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf, BlockNumber origrlink, GistNSN orignsn, Buffer leftchildbuf, bool markfollowright) { - XLogRecData rdata[GIST_MAX_SPLIT_PAGES * 2 + 2]; gistxlogPageSplit xlrec; SplitedPageLayout *ptr; - int npage = 0, - cur; + int npage = 0; XLogRecPtr recptr; + int i; for (ptr = dist; ptr; ptr = ptr->next) npage++; - /* - * the caller should've checked this already, but doesn't hurt to check - * again. - */ - if (npage > GIST_MAX_SPLIT_PAGES) - elog(ERROR, "GiST page split into too many halves"); - - xlrec.node = node; - xlrec.origblkno = blkno; xlrec.origrlink = origrlink; xlrec.orignsn = orignsn; xlrec.origleaf = page_is_leaf; xlrec.npage = (uint16) npage; - xlrec.leftchild = - BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; xlrec.markfollowright = markfollowright; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof(gistxlogPageSplit); - rdata[0].buffer = InvalidBuffer; - - cur = 1; + XLogBeginInsert(); /* * Include a full page image of the child buf. (only necessary if a * checkpoint happened since the child page was split) */ if (BufferIsValid(leftchildbuf)) - { - rdata[cur - 1].next = &(rdata[cur]); - rdata[cur].data = NULL; - rdata[cur].len = 0; - rdata[cur].buffer = leftchildbuf; - rdata[cur].buffer_std = true; - cur++; - } + XLogRegisterBuffer(0, leftchildbuf, REGBUF_STANDARD); + /* + * NOTE: We register a lot of data. The caller must've called + * XLogEnsureRecordSpace() to prepare for that. We cannot do it here, + * because we're already in a critical section. If you change the number + * of buffer or data registrations here, make sure you modify the + * XLogEnsureRecordSpace() calls accordingly! + */ + XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageSplit)); + + i = 1; for (ptr = dist; ptr; ptr = ptr->next) { - rdata[cur - 1].next = &(rdata[cur]); - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char *) &(ptr->block); - rdata[cur].len = sizeof(gistxlogPage); - cur++; - - rdata[cur - 1].next = &(rdata[cur]); - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char *) (ptr->list); - rdata[cur].len = ptr->lenlist; - cur++; + XLogRegisterBuffer(i, ptr->buffer, REGBUF_WILL_INIT); + XLogRegisterBufData(i, (char *) &(ptr->block.num), sizeof(int)); + XLogRegisterBufData(i, (char *) ptr->list, ptr->lenlist); + i++; } - rdata[cur - 1].next = NULL; - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT); return recptr; } @@ -413,9 +386,7 @@ gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf, * * Note that both the todelete array and the tuples are marked as belonging * to the target buffer; they need not be stored in XLOG if XLogInsert decides - * to log the whole buffer contents instead. Also, we take care that there's - * at least one rdata item referencing the buffer, even when ntodelete and - * ituplen are both zero; this ensures that XLogInsert knows about the buffer. + * to log the whole buffer contents instead. */ XLogRecPtr gistXLogUpdate(RelFileNode node, Buffer buffer, @@ -423,57 +394,31 @@ gistXLogUpdate(RelFileNode node, Buffer buffer, IndexTuple *itup, int ituplen, Buffer leftchildbuf) { - XLogRecData rdata[MaxIndexTuplesPerPage + 3]; gistxlogPageUpdate xlrec; - int cur, - i; + int i; XLogRecPtr recptr; - xlrec.node = node; - xlrec.blkno = BufferGetBlockNumber(buffer); xlrec.ntodelete = ntodelete; - xlrec.leftchild = - BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; - - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof(gistxlogPageUpdate); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + xlrec.ntoinsert = ituplen; - rdata[1].data = (char *) todelete; - rdata[1].len = sizeof(OffsetNumber) * ntodelete; - rdata[1].buffer = buffer; - rdata[1].buffer_std = true; + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageUpdate)); - cur = 2; + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + XLogRegisterBufData(0, (char *) todelete, sizeof(OffsetNumber) * ntodelete); /* new tuples */ for (i = 0; i < ituplen; i++) - { - rdata[cur - 1].next = &(rdata[cur]); - rdata[cur].data = (char *) (itup[i]); - rdata[cur].len = IndexTupleSize(itup[i]); - rdata[cur].buffer = buffer; - rdata[cur].buffer_std = true; - cur++; - } + XLogRegisterBufData(0, (char *) (itup[i]), IndexTupleSize(itup[i])); /* * Include a full page image of the child buf. (only necessary if a * checkpoint happened since the child page was split) */ if (BufferIsValid(leftchildbuf)) - { - rdata[cur - 1].next = &(rdata[cur]); - rdata[cur].data = NULL; - rdata[cur].len = 0; - rdata[cur].buffer = leftchildbuf; - rdata[cur].buffer_std = true; - cur++; - } - rdata[cur - 1].next = NULL; + XLogRegisterBuffer(1, leftchildbuf, REGBUF_STANDARD); - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE); return recptr; } |