summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/gin/ginbtree.c182
-rw-r--r--src/backend/access/gin/gindatapage.c168
-rw-r--r--src/backend/access/gin/ginentrypage.c96
-rw-r--r--src/backend/access/gin/ginget.c2
-rw-r--r--src/backend/access/gin/gininsert.c11
-rw-r--r--src/backend/access/gin/ginxlog.c22
-rw-r--r--src/include/access/gin_private.h46
7 files changed, 309 insertions, 218 deletions
diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
index 7248b06326..f3c0dfd128 100644
--- a/src/backend/access/gin/ginbtree.c
+++ b/src/backend/access/gin/ginbtree.c
@@ -19,7 +19,7 @@
#include "utils/rel.h"
/*
- * Locks buffer by needed method for search.
+ * Lock buffer by needed method for search.
*/
static int
ginTraverseLock(Buffer buffer, bool searchMode)
@@ -53,9 +53,9 @@ ginTraverseLock(Buffer buffer, bool searchMode)
}
/*
- * Descends the tree to the leaf page that contains or would contain the
- * key we're searching for. The key should already be filled in 'btree',
- * in tree-type specific manner. If btree->fullScan is true, descends to the
+ * Descend the tree to the leaf page that contains or would contain the key
+ * we're searching for. The key should already be filled in 'btree', in
+ * tree-type specific manner. If btree->fullScan is true, descends to the
* leftmost leaf page.
*
* If 'searchmode' is false, on return stack->buffer is exclusively locked,
@@ -63,13 +63,13 @@ ginTraverseLock(Buffer buffer, bool searchMode)
* is share-locked, and stack->parent is NULL.
*/
GinBtreeStack *
-ginFindLeafPage(GinBtree btree, BlockNumber rootBlkno, bool searchMode)
+ginFindLeafPage(GinBtree btree, bool searchMode)
{
GinBtreeStack *stack;
stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
- stack->blkno = rootBlkno;
- stack->buffer = ReadBuffer(btree->index, rootBlkno);
+ stack->blkno = btree->rootBlkno;
+ stack->buffer = ReadBuffer(btree->index, btree->rootBlkno);
stack->parent = NULL;
stack->predictNumber = 1;
@@ -89,7 +89,7 @@ ginFindLeafPage(GinBtree btree, BlockNumber rootBlkno, bool searchMode)
* ok, page is correctly locked, we should check to move right ..,
* root never has a right link, so small optimization
*/
- while (btree->fullScan == FALSE && stack->blkno != rootBlkno &&
+ while (btree->fullScan == FALSE && stack->blkno != btree->rootBlkno &&
btree->isMoveRight(btree, page))
{
BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
@@ -146,7 +146,7 @@ ginStepRight(Buffer buffer, Relation index, int lockmode)
Page page = BufferGetPage(buffer);
bool isLeaf = GinPageIsLeaf(page);
bool isData = GinPageIsData(page);
- BlockNumber blkno = GinPageGetOpaque(page)->rightlink;
+ BlockNumber blkno = GinPageGetOpaque(page)->rightlink;
nextbuffer = ReadBuffer(index, blkno);
LockBuffer(nextbuffer, lockmode);
@@ -158,10 +158,10 @@ ginStepRight(Buffer buffer, Relation index, int lockmode)
elog(ERROR, "right sibling of GIN page is of different type");
/*
- * Given the proper lock sequence above, we should never land on a
- * deleted page.
+ * Given the proper lock sequence above, we should never land on a deleted
+ * page.
*/
- if (GinPageIsDeleted(page))
+ if (GinPageIsDeleted(page))
elog(ERROR, "right sibling of GIN page was deleted");
return nextbuffer;
@@ -183,14 +183,12 @@ freeGinBtreeStack(GinBtreeStack *stack)
}
/*
- * Try to find parent for current stack position, returns correct
- * parent and child's offset in stack->parent.
- * Function should never release root page to prevent conflicts
- * with vacuum process
+ * Try to find parent for current stack position. Returns correct parent and
+ * child's offset in stack->parent. The root page is never released, to
+ * to prevent conflict with vacuum process.
*/
void
-ginFindParents(GinBtree btree, GinBtreeStack *stack,
- BlockNumber rootBlkno)
+ginFindParents(GinBtree btree, GinBtreeStack *stack)
{
Page page;
Buffer buffer;
@@ -204,8 +202,8 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
{
/* XLog mode... */
root = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
- root->blkno = rootBlkno;
- root->buffer = ReadBuffer(btree->index, rootBlkno);
+ root->blkno = btree->rootBlkno;
+ root->buffer = ReadBuffer(btree->index, btree->rootBlkno);
LockBuffer(root->buffer, GIN_EXCLUSIVE);
root->parent = NULL;
}
@@ -221,8 +219,8 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
root = root->parent;
}
- Assert(root->blkno == rootBlkno);
- Assert(BufferGetBlockNumber(root->buffer) == rootBlkno);
+ Assert(root->blkno == btree->rootBlkno);
+ Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
LockBuffer(root->buffer, GIN_EXCLUSIVE);
}
root->off = InvalidOffsetNumber;
@@ -268,7 +266,7 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
ptr->blkno = blkno;
ptr->buffer = buffer;
- ptr->parent = root; /* it's may be wrong, but in next call we will
+ ptr->parent = root; /* it may be wrong, but in next call we will
* correct */
ptr->off = offset;
stack->parent = ptr;
@@ -280,21 +278,35 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
}
/*
- * Returns true if the insertion is done, false if the page was split and
- * downlink insertion is pending.
+ * Insert a new item to a page.
+ *
+ * Returns true if the insertion was finished. On false, the page was split and
+ * the parent needs to be updated. (a root split returns true as it doesn't
+ * need any further action by the caller to complete)
+ *
+ * When inserting a downlink to a internal page, the existing item at the
+ * given location is updated to point to 'updateblkno'.
*
* stack->buffer is locked on entry, and is kept locked.
*/
static bool
-ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
+ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
+ void *insertdata, BlockNumber updateblkno,
GinStatsData *buildStats)
{
Page page = BufferGetPage(stack->buffer);
XLogRecData *rdata;
bool fit;
+ /*
+ * Try to put the incoming tuple on the page. If it doesn't fit,
+ * placeToPage method will return false and leave the page unmodified, and
+ * we'll have to split the page.
+ */
START_CRIT_SECTION();
- fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
+ fit = btree->placeToPage(btree, stack->buffer, stack->off,
+ insertdata, updateblkno,
+ &rdata);
if (fit)
{
MarkBufferDirty(stack->buffer);
@@ -324,18 +336,7 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
END_CRIT_SECTION();
rbuffer = GinNewBuffer(btree->index);
-
- savedRightLink = GinPageGetOpaque(page)->rightlink;
-
- /*
- * newlpage is a pointer to memory page, it is not associated with
- * a buffer. stack->buffer is not touched yet.
- */
- newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
-
- ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
-
- /* During index build, count the newly-split page */
+ /* During index build, count the new page */
if (buildStats)
{
if (btree->isData)
@@ -344,6 +345,18 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
buildStats->nEntryPages++;
}
+ savedRightLink = GinPageGetOpaque(page)->rightlink;
+
+ /*
+ * newlpage is a pointer to memory page, it is not associated with a
+ * buffer. stack->buffer is not touched yet.
+ */
+ newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off,
+ insertdata, updateblkno,
+ &rdata);
+
+ ((ginxlogSplit *) (rdata->data))->rootBlkno = btree->rootBlkno;
+
parent = stack->parent;
if (parent == NULL)
@@ -354,6 +367,15 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
*/
Buffer lbuffer = GinNewBuffer(btree->index);
+ /* During index build, count the new page */
+ if (buildStats)
+ {
+ if (btree->isData)
+ buildStats->nDataPages++;
+ else
+ buildStats->nEntryPages++;
+ }
+
((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
@@ -434,46 +456,27 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
}
/*
- * Insert value (stored in GinBtree) to tree described by stack
+ * Finish a split by inserting the downlink for the new page to parent.
*
- * During an index build, buildStats is non-null and the counters
- * it contains are incremented as needed.
+ * On entry, stack->buffer is exclusively locked.
*
* NB: the passed-in stack is freed, as though by freeGinBtreeStack.
*/
void
-ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
+ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
{
- GinBtreeStack *parent;
- BlockNumber rootBlkno;
Page page;
-
- /* extract root BlockNumber from stack */
- Assert(stack != NULL);
- parent = stack;
- while (parent->parent)
- parent = parent->parent;
- rootBlkno = parent->blkno;
- Assert(BlockNumberIsValid(rootBlkno));
+ bool done;
/* this loop crawls up the stack until the insertion is complete */
- for (;;)
+ do
{
- bool done;
-
- done = ginPlaceToPage(btree, rootBlkno, stack, buildStats);
-
- /* just to be extra sure we don't delete anything by accident... */
- btree->isDelete = FALSE;
-
- if (done)
- {
- LockBuffer(stack->buffer, GIN_UNLOCK);
- freeGinBtreeStack(stack);
- break;
- }
+ GinBtreeStack *parent = stack->parent;
+ void *insertdata;
+ BlockNumber updateblkno;
- btree->prepareDownlink(btree, stack->buffer);
+ insertdata = btree->prepareDownlink(btree, stack->buffer);
+ updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
/* search parent to lock */
LockBuffer(parent->buffer, GIN_EXCLUSIVE);
@@ -491,7 +494,7 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
* plain search...
*/
LockBuffer(parent->buffer, GIN_UNLOCK);
- ginFindParents(btree, stack, rootBlkno);
+ ginFindParents(btree, stack);
parent = stack->parent;
Assert(parent != NULL);
break;
@@ -502,8 +505,49 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
page = BufferGetPage(parent->buffer);
}
+ /* release the child */
UnlockReleaseBuffer(stack->buffer);
pfree(stack);
stack = parent;
+
+ /* insert the downlink to parent */
+ done = ginPlaceToPage(btree, stack,
+ insertdata, updateblkno,
+ buildStats);
+ pfree(insertdata);
+ } while (!done);
+ LockBuffer(stack->buffer, GIN_UNLOCK);
+
+ /* free the rest of the stack */
+ freeGinBtreeStack(stack);
+}
+
+/*
+ * Insert a value to tree described by stack.
+ *
+ * The value to be inserted is given in 'insertdata'. Its format depends
+ * on whether this is an entry or data tree, ginInsertValue just passes it
+ * through to the tree-specific callback function.
+ *
+ * During an index build, buildStats is non-null and the counters it contains
+ * are incremented as needed.
+ *
+ * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
+ */
+void
+ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
+ GinStatsData *buildStats)
+{
+ bool done;
+
+ done = ginPlaceToPage(btree, stack,
+ insertdata, InvalidBlockNumber,
+ buildStats);
+ if (done)
+ {
+ LockBuffer(stack->buffer, GIN_UNLOCK);
+ freeGinBtreeStack(stack);
}
+ else
+ ginFinishSplit(btree, stack, buildStats);
}
diff --git a/src/backend/access/gin/gindatapage.c b/src/backend/access/gin/gindatapage.c
index fbdde1dc02..14e5e8b61f 100644
--- a/src/backend/access/gin/gindatapage.c
+++ b/src/backend/access/gin/gindatapage.c
@@ -30,7 +30,7 @@ dataIsMoveRight(GinBtree btree, Page page)
if (GinPageRightMost(page))
return FALSE;
- return (ginCompareItemPointers(btree->items + btree->curitem, iptr) > 0) ? TRUE : FALSE;
+ return (ginCompareItemPointers(&btree->itemptr, iptr) > 0) ? TRUE : FALSE;
}
/*
@@ -80,7 +80,7 @@ dataLocateItem(GinBtree btree, GinBtreeStack *stack)
else
{
pitem = GinDataPageGetPostingItem(page, mid);
- result = ginCompareItemPointers(btree->items + btree->curitem, &(pitem->key));
+ result = ginCompareItemPointers(&btree->itemptr, &(pitem->key));
}
if (result == 0)
@@ -138,7 +138,7 @@ dataLocateLeafItem(GinBtree btree, GinBtreeStack *stack)
{
OffsetNumber mid = low + ((high - low) / 2);
- result = ginCompareItemPointers(btree->items + btree->curitem,
+ result = ginCompareItemPointers(&btree->itemptr,
GinDataPageGetItemPointer(page, mid));
if (result == 0)
@@ -298,18 +298,19 @@ GinPageDeletePostingItem(Page page, OffsetNumber offset)
* item pointer never deletes!
*/
static bool
-dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
+dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off, void *insertdata)
{
Page page = BufferGetPage(buf);
Assert(GinPageIsData(page));
- Assert(!btree->isDelete);
if (GinPageIsLeaf(page))
{
+ GinBtreeDataLeafInsertData *items = insertdata;
+
if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
{
- if ((btree->nitem - btree->curitem) * sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
+ if ((items->nitem - items->curitem) * sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
return true;
}
else if (sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
@@ -322,42 +323,21 @@ dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
}
/*
- * In case of previous split update old child blkno to
- * new right page
- * item pointer never deletes!
- */
-static BlockNumber
-dataPrepareData(GinBtree btree, Page page, OffsetNumber off)
-{
- BlockNumber ret = InvalidBlockNumber;
-
- Assert(GinPageIsData(page));
-
- if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
- {
- PostingItem *pitem = GinDataPageGetPostingItem(page, off);
-
- PostingItemSetBlockNumber(pitem, btree->rightblkno);
- ret = btree->rightblkno;
- }
-
- btree->rightblkno = InvalidBlockNumber;
-
- return ret;
-}
-
-/*
* Places keys to page and fills WAL record. In case leaf page and
* build mode puts all ItemPointers to page.
*
* If none of the keys fit, returns false without modifying the page.
+ *
+ * On insertion to an internal node, in addition to inserting the given item,
+ * the downlink of the existing item at 'off' is updated to point to
+ * 'updateblkno'.
*/
static bool
dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
+ void *insertdata, BlockNumber updateblkno,
XLogRecData **prdata)
{
Page page = BufferGetPage(buf);
- int sizeofitem = GinSizeOfDataPageItem(page);
int cnt = 0;
/* these must be static so they can be returned to caller */
@@ -365,14 +345,21 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
static ginxlogInsert data;
/* quick exit if it doesn't fit */
- if (!dataIsEnoughSpace(btree, buf, off))
+ if (!dataIsEnoughSpace(btree, buf, off, insertdata))
return false;
*prdata = rdata;
Assert(GinPageIsData(page));
- data.updateBlkno = dataPrepareData(btree, page, off);
+ /* Update existing downlink to point to next page (on internal page) */
+ if (!GinPageIsLeaf(page))
+ {
+ PostingItem *pitem = GinDataPageGetPostingItem(page, off);
+ PostingItemSetBlockNumber(pitem, updateblkno);
+ }
+
+ data.updateBlkno = updateblkno;
data.node = btree->index->rd_node;
data.blkno = BufferGetBlockNumber(buf);
data.offset = off;
@@ -405,34 +392,43 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
cnt++;
rdata[cnt].buffer = InvalidBuffer;
- rdata[cnt].data = (GinPageIsLeaf(page)) ? ((char *) (btree->items + btree->curitem)) : ((char *) &(btree->pitem));
- rdata[cnt].len = sizeofitem;
+ /* data and len filled in below */
rdata[cnt].next = NULL;
if (GinPageIsLeaf(page))
{
+ GinBtreeDataLeafInsertData *items = insertdata;
+ uint32 savedPos = items->curitem;
+
if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
{
/* usually, create index... */
- uint32 savedPos = btree->curitem;
-
- while (btree->curitem < btree->nitem)
+ while (items->curitem < items->nitem)
{
- GinDataPageAddItemPointer(page, btree->items + btree->curitem, off);
+ GinDataPageAddItemPointer(page, items->items + items->curitem, off);
off++;
- btree->curitem++;
+ items->curitem++;
}
- data.nitem = btree->curitem - savedPos;
- rdata[cnt].len = sizeofitem * data.nitem;
+ data.nitem = items->curitem - savedPos;
}
else
{
- GinDataPageAddItemPointer(page, btree->items + btree->curitem, off);
- btree->curitem++;
+ GinDataPageAddItemPointer(page, items->items + items->curitem, off);
+ items->curitem++;
}
+
+ rdata[cnt].data = (char *) &items->items[savedPos];
+ rdata[cnt].len = sizeof(ItemPointerData) * data.nitem;
}
else
- GinDataPageAddPostingItem(page, &(btree->pitem), off);
+ {
+ PostingItem *pitem = insertdata;
+
+ GinDataPageAddPostingItem(page, pitem, off);
+
+ rdata[cnt].data = (char *) pitem;
+ rdata[cnt].len = sizeof(PostingItem);
+ }
return true;
}
@@ -444,7 +440,8 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
* left page
*/
static Page
-dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
+dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
+ void *insertdata, BlockNumber updateblkno, XLogRecData **prdata)
{
char *ptr;
OffsetNumber separator;
@@ -457,7 +454,6 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
Page rpage = BufferGetPage(rbuf);
Size pageSize = PageGetPageSize(lpage);
Size freeSpace;
- uint32 nCopied = 1;
/* these must be static so they can be returned to caller */
static ginxlogSplit data;
@@ -468,9 +464,14 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
freeSpace = GinDataPageGetFreeSpace(rpage);
*prdata = rdata;
- data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
- InvalidOffsetNumber : PostingItemGetBlockNumber(&(btree->pitem));
- data.updateBlkno = dataPrepareData(btree, lpage, off);
+
+ /* Update existing downlink to point to next page (on internal page) */
+ if (!isleaf)
+ {
+ PostingItem *pitem = GinDataPageGetPostingItem(lpage, off);
+
+ PostingItemSetBlockNumber(pitem, updateblkno);
+ }
if (isleaf)
{
@@ -487,16 +488,16 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
if (isleaf && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff)
{
- nCopied = 0;
- while (btree->curitem < btree->nitem &&
+ GinBtreeDataLeafInsertData *items = insertdata;
+
+ while (items->curitem < items->nitem &&
maxoff * sizeof(ItemPointerData) < 2 * (freeSpace - sizeof(ItemPointerData)))
{
memcpy(vector + maxoff * sizeof(ItemPointerData),
- btree->items + btree->curitem,
+ items->items + items->curitem,
sizeof(ItemPointerData));
maxoff++;
- nCopied++;
- btree->curitem++;
+ items->curitem++;
}
}
else
@@ -506,11 +507,17 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
memmove(ptr + sizeofitem, ptr, (maxoff - off + 1) * sizeofitem);
if (isleaf)
{
- memcpy(ptr, btree->items + btree->curitem, sizeofitem);
- btree->curitem++;
+ GinBtreeDataLeafInsertData *items = insertdata;
+
+ memcpy(ptr, items->items + items->curitem, sizeofitem);
+ items->curitem++;
}
else
- memcpy(ptr, &(btree->pitem), sizeofitem);
+ {
+ PostingItem *pitem = insertdata;
+
+ memcpy(ptr, pitem, sizeofitem);
+ }
maxoff++;
}
@@ -553,7 +560,7 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
GinPageGetOpaque(lpage)->maxoff);
else
*bound = GinDataPageGetPostingItem(lpage,
- GinPageGetOpaque(lpage)->maxoff)->key;
+ GinPageGetOpaque(lpage)->maxoff)->key;
/* set up right bound for right page */
bound = GinDataPageGetRightBound(rpage);
@@ -584,16 +591,18 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
}
/*
- * Prepare the state in 'btree' for inserting a downlink for given buffer.
+ * Construct insertion payload for inserting the downlink for given buffer.
*/
-static void
+static void *
dataPrepareDownlink(GinBtree btree, Buffer lbuf)
{
+ PostingItem *pitem = palloc(sizeof(PostingItem));
Page lpage = BufferGetPage(lbuf);
- PostingItemSetBlockNumber(&(btree->pitem), BufferGetBlockNumber(lbuf));
- btree->pitem.key = *GinDataPageGetRightBound(lpage);
- btree->rightblkno = GinPageGetOpaque(lpage)->rightlink;
+ PostingItemSetBlockNumber(pitem, BufferGetBlockNumber(lbuf));
+ pitem->key = *GinDataPageGetRightBound(lpage);
+
+ return pitem;
}
/*
@@ -698,11 +707,12 @@ createPostingTree(Relation index, ItemPointerData *items, uint32 nitems,
}
void
-ginPrepareDataScan(GinBtree btree, Relation index)
+ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno)
{
memset(btree, 0, sizeof(GinBtreeData));
btree->index = index;
+ btree->rootBlkno = rootBlkno;
btree->findChildPage = dataLocateItem;
btree->getLeftMostChild = dataGetLeftMostPage;
@@ -715,7 +725,6 @@ ginPrepareDataScan(GinBtree btree, Relation index)
btree->prepareDownlink = dataPrepareDownlink;
btree->isData = TRUE;
- btree->isDelete = FALSE;
btree->fullScan = FALSE;
btree->isBuild = FALSE;
}
@@ -729,29 +738,32 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
GinStatsData *buildStats)
{
GinBtreeData btree;
+ GinBtreeDataLeafInsertData insertdata;
GinBtreeStack *stack;
- ginPrepareDataScan(&btree, index);
+ ginPrepareDataScan(&btree, index, rootBlkno);
btree.isBuild = (buildStats != NULL);
- btree.items = items;
- btree.nitem = nitem;
- btree.curitem = 0;
+ insertdata.items = items;
+ insertdata.nitem = nitem;
+ insertdata.curitem = 0;
- while (btree.curitem < btree.nitem)
+ while (insertdata.curitem < insertdata.nitem)
{
- stack = ginFindLeafPage(&btree, rootBlkno, false);
+ /* search for the leaf page where the first item should go to */
+ btree.itemptr = insertdata.items[insertdata.curitem];
+ stack = ginFindLeafPage(&btree, false);
if (btree.findItem(&btree, stack))
{
/*
- * btree.items[btree.curitem] already exists in index
+ * Current item already exists in index.
*/
- btree.curitem++;
+ insertdata.curitem++;
LockBuffer(stack->buffer, GIN_UNLOCK);
freeGinBtreeStack(stack);
}
else
- ginInsertValue(&btree, stack, buildStats);
+ ginInsertValue(&btree, stack, &insertdata, buildStats);
}
}
@@ -764,11 +776,11 @@ ginScanBeginPostingTree(Relation index, BlockNumber rootBlkno)
GinBtreeData btree;
GinBtreeStack *stack;
- ginPrepareDataScan(&btree, index);
+ ginPrepareDataScan(&btree, index, rootBlkno);
btree.fullScan = TRUE;
- stack = ginFindLeafPage(&btree, rootBlkno, TRUE);
+ stack = ginFindLeafPage(&btree, TRUE);
return stack;
}
diff --git a/src/backend/access/gin/ginentrypage.c b/src/backend/access/gin/ginentrypage.c
index ec0114e7d3..1949f24b97 100644
--- a/src/backend/access/gin/ginentrypage.c
+++ b/src/backend/access/gin/ginentrypage.c
@@ -112,6 +112,7 @@ GinFormTuple(GinState *ginstate,
if (newsize != IndexTupleSize(itup))
{
itup = repalloc(itup, newsize);
+
/*
* PostgreSQL 9.3 and earlier did not clear this new space, so we
* might find uninitialized padding when reading tuples from disk.
@@ -431,22 +432,26 @@ entryGetLeftMostPage(GinBtree btree, Page page)
}
static bool
-entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
+entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off,
+ GinBtreeEntryInsertData *insertData)
{
- Size itupsz = 0;
+ Size releasedsz = 0;
+ Size addedsz;
Page page = BufferGetPage(buf);
- Assert(btree->entry);
+ Assert(insertData->entry);
Assert(!GinPageIsData(page));
- if (btree->isDelete)
+ if (insertData->isDelete)
{
IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
- itupsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
+ releasedsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
}
- if (PageGetFreeSpace(page) + itupsz >= MAXALIGN(IndexTupleSize(btree->entry)) + sizeof(ItemIdData))
+ addedsz = MAXALIGN(IndexTupleSize(insertData->entry)) + sizeof(ItemIdData);
+
+ if (PageGetFreeSpace(page) + releasedsz >= addedsz)
return true;
return false;
@@ -457,42 +462,42 @@ entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
* should update it, update old child blkno to new right page
* if child split occurred
*/
-static BlockNumber
-entryPreparePage(GinBtree btree, Page page, OffsetNumber off)
+static void
+entryPreparePage(GinBtree btree, Page page, OffsetNumber off,
+ GinBtreeEntryInsertData *insertData, BlockNumber updateblkno)
{
- BlockNumber ret = InvalidBlockNumber;
-
- Assert(btree->entry);
+ Assert(insertData->entry);
Assert(!GinPageIsData(page));
- if (btree->isDelete)
+ if (insertData->isDelete)
{
Assert(GinPageIsLeaf(page));
PageIndexTupleDelete(page, off);
}
- if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
+ if (!GinPageIsLeaf(page) && updateblkno != InvalidBlockNumber)
{
IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
- GinSetDownlink(itup, btree->rightblkno);
- ret = btree->rightblkno;
+ GinSetDownlink(itup, updateblkno);
}
-
- btree->rightblkno = InvalidBlockNumber;
-
- return ret;
}
/*
* Place tuple on page and fills WAL record
*
* If the tuple doesn't fit, returns false without modifying the page.
+ *
+ * On insertion to an internal node, in addition to inserting the given item,
+ * the downlink of the existing item at 'off' is updated to point to
+ * 'updateblkno'.
*/
static bool
entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
+ void *insertPayload, BlockNumber updateblkno,
XLogRecData **prdata)
{
+ GinBtreeEntryInsertData *insertData = insertPayload;
Page page = BufferGetPage(buf);
OffsetNumber placed;
int cnt = 0;
@@ -502,13 +507,17 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
static ginxlogInsert data;
/* quick exit if it doesn't fit */
- if (!entryIsEnoughSpace(btree, buf, off))
+ if (!entryIsEnoughSpace(btree, buf, off, insertData))
return false;
*prdata = rdata;
- data.updateBlkno = entryPreparePage(btree, page, off);
+ entryPreparePage(btree, page, off, insertData, updateblkno);
+ data.updateBlkno = updateblkno;
- placed = PageAddItem(page, (Item) btree->entry, IndexTupleSize(btree->entry), off, false, false);
+ placed = PageAddItem(page,
+ (Item) insertData->entry,
+ IndexTupleSize(insertData->entry),
+ off, false, false);
if (placed != off)
elog(ERROR, "failed to add item to index page in \"%s\"",
RelationGetRelationName(btree->index));
@@ -517,7 +526,7 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
data.blkno = BufferGetBlockNumber(buf);
data.offset = off;
data.nitem = 1;
- data.isDelete = btree->isDelete;
+ data.isDelete = insertData->isDelete;
data.isData = false;
data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
@@ -545,12 +554,10 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
cnt++;
rdata[cnt].buffer = InvalidBuffer;
- rdata[cnt].data = (char *) btree->entry;
- rdata[cnt].len = IndexTupleSize(btree->entry);
+ rdata[cnt].data = (char *) insertData->entry;
+ rdata[cnt].len = IndexTupleSize(insertData->entry);
rdata[cnt].next = NULL;
- btree->entry = NULL;
-
return true;
}
@@ -561,8 +568,11 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
* an equal number!
*/
static Page
-entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
+entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
+ void *insertPayload,
+ BlockNumber updateblkno, XLogRecData **prdata)
{
+ GinBtreeEntryInsertData *insertData = insertPayload;
OffsetNumber i,
maxoff,
separator = InvalidOffsetNumber;
@@ -583,8 +593,9 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
*prdata = rdata;
data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
- InvalidOffsetNumber : GinGetDownlink(btree->entry);
- data.updateBlkno = entryPreparePage(btree, lpage, off);
+ InvalidOffsetNumber : GinGetDownlink(insertData->entry);
+ data.updateBlkno = updateblkno;
+ entryPreparePage(btree, lpage, off, insertData, updateblkno);
maxoff = PageGetMaxOffsetNumber(lpage);
ptr = tupstore;
@@ -593,8 +604,8 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
{
if (i == off)
{
- size = MAXALIGN(IndexTupleSize(btree->entry));
- memcpy(ptr, btree->entry, size);
+ size = MAXALIGN(IndexTupleSize(insertData->entry));
+ memcpy(ptr, insertData->entry, size);
ptr += size;
totalsize += size + sizeof(ItemIdData);
}
@@ -608,8 +619,8 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
if (off == maxoff + 1)
{
- size = MAXALIGN(IndexTupleSize(btree->entry));
- memcpy(ptr, btree->entry, size);
+ size = MAXALIGN(IndexTupleSize(insertData->entry));
+ memcpy(ptr, insertData->entry, size);
ptr += size;
totalsize += size + sizeof(ItemIdData);
}
@@ -667,20 +678,23 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
}
/*
- * Prepare the state in 'btree' for inserting a downlink for given buffer.
+ * Construct insertion payload for inserting the downlink for given buffer.
*/
-static void
+static void *
entryPrepareDownlink(GinBtree btree, Buffer lbuf)
{
+ GinBtreeEntryInsertData *insertData;
Page lpage = BufferGetPage(lbuf);
+ BlockNumber lblkno = BufferGetBlockNumber(lbuf);
IndexTuple itup;
itup = getRightMostTuple(lpage);
- btree->entry = GinFormInteriorTuple(itup,
- lpage,
- BufferGetBlockNumber(lbuf));
- btree->rightblkno = GinPageGetOpaque(lpage)->rightlink;
+ insertData = palloc(sizeof(GinBtreeEntryInsertData));
+ insertData->entry = GinFormInteriorTuple(itup, lpage, lblkno);
+ insertData->isDelete = false;
+
+ return insertData;
}
/*
@@ -724,6 +738,7 @@ ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
memset(btree, 0, sizeof(GinBtreeData));
btree->index = ginstate->index;
+ btree->rootBlkno = GIN_ROOT_BLKNO;
btree->ginstate = ginstate;
btree->findChildPage = entryLocateEntry;
@@ -743,5 +758,4 @@ ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
btree->entryAttnum = attnum;
btree->entryKey = key;
btree->entryCategory = category;
- btree->isDelete = FALSE;
}
diff --git a/src/backend/access/gin/ginget.c b/src/backend/access/gin/ginget.c
index 2a3991f1d1..f2ee2fb966 100644
--- a/src/backend/access/gin/ginget.c
+++ b/src/backend/access/gin/ginget.c
@@ -374,7 +374,7 @@ restartScanEntry:
ginPrepareEntryScan(&btreeEntry, entry->attnum,
entry->queryKey, entry->queryCategory,
ginstate);
- stackEntry = ginFindLeafPage(&btreeEntry, GIN_ROOT_BLKNO, true);
+ stackEntry = ginFindLeafPage(&btreeEntry, true);
page = BufferGetPage(stackEntry->buffer);
needUnlock = TRUE;
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index 0a2883aae3..556e31854e 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -163,17 +163,20 @@ ginEntryInsert(GinState *ginstate,
GinStatsData *buildStats)
{
GinBtreeData btree;
+ GinBtreeEntryInsertData insertdata;
GinBtreeStack *stack;
IndexTuple itup;
Page page;
+ insertdata.isDelete = FALSE;
+
/* During index build, count the to-be-inserted entry */
if (buildStats)
buildStats->nEntries++;
ginPrepareEntryScan(&btree, attnum, key, category, ginstate);
- stack = ginFindLeafPage(&btree, GIN_ROOT_BLKNO, false);
+ stack = ginFindLeafPage(&btree, false);
page = BufferGetPage(stack->buffer);
if (btree.findItem(&btree, stack))
@@ -201,7 +204,7 @@ ginEntryInsert(GinState *ginstate,
itup = addItemPointersToLeafTuple(ginstate, itup,
items, nitem, buildStats);
- btree.isDelete = TRUE;
+ insertdata.isDelete = TRUE;
}
else
{
@@ -211,8 +214,8 @@ ginEntryInsert(GinState *ginstate,
}
/* Insert the new or modified leaf tuple */
- btree.entry = itup;
- ginInsertValue(&btree, stack, buildStats);
+ insertdata.entry = itup;
+ ginInsertValue(&btree, stack, &insertdata, buildStats);
pfree(itup);
}
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index ddac343061..ca7bee1f34 100644
--- a/src/backend/access/gin/ginxlog.c
+++ b/src/backend/access/gin/ginxlog.c
@@ -774,7 +774,7 @@ ginContinueSplit(ginIncompleteSplit *split)
GinState ginstate;
Relation reln;
Buffer buffer;
- GinBtreeStack stack;
+ GinBtreeStack *stack;
/*
* elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno,
@@ -802,22 +802,22 @@ ginContinueSplit(ginIncompleteSplit *split)
}
else
{
- ginPrepareDataScan(&btree, reln);
+ ginPrepareDataScan(&btree, reln, split->rootBlkno);
}
- stack.blkno = split->leftBlkno;
- stack.buffer = buffer;
- stack.off = InvalidOffsetNumber;
- stack.parent = NULL;
+ stack = palloc(sizeof(GinBtreeStack));
+ stack->blkno = split->leftBlkno;
+ stack->buffer = buffer;
+ stack->off = InvalidOffsetNumber;
+ stack->parent = NULL;
- ginFindParents(&btree, &stack, split->rootBlkno);
+ ginFindParents(&btree, stack);
+ LockBuffer(stack->parent->buffer, GIN_UNLOCK);
+ ginFinishSplit(&btree, stack, NULL);
- btree.prepareDownlink(&btree, buffer);
- ginInsertValue(&btree, stack.parent, NULL);
+ /* buffer is released by ginFinishSplit */
FreeFakeRelcacheEntry(reln);
-
- UnlockReleaseBuffer(buffer);
}
void
diff --git a/src/include/access/gin_private.h b/src/include/access/gin_private.h
index 39529353d1..bd407fe342 100644
--- a/src/include/access/gin_private.h
+++ b/src/include/access/gin_private.h
@@ -485,41 +485,59 @@ typedef struct GinBtreeData
/* insert methods */
OffsetNumber (*findChildPtr) (GinBtree, Page, BlockNumber, OffsetNumber);
- bool (*placeToPage) (GinBtree, Buffer, OffsetNumber, XLogRecData **);
- Page (*splitPage) (GinBtree, Buffer, Buffer, OffsetNumber, XLogRecData **);
- void (*prepareDownlink) (GinBtree, Buffer);
+ bool (*placeToPage) (GinBtree, Buffer, OffsetNumber, void *, BlockNumber, XLogRecData **);
+ Page (*splitPage) (GinBtree, Buffer, Buffer, OffsetNumber, void *, BlockNumber, XLogRecData **);
+ void *(*prepareDownlink) (GinBtree, Buffer);
void (*fillRoot) (GinBtree, Buffer, Buffer, Buffer);
bool isData;
Relation index;
+ BlockNumber rootBlkno;
GinState *ginstate; /* not valid in a data scan */
bool fullScan;
bool isBuild;
- BlockNumber rightblkno;
-
- /* Entry options */
+ /* Search key for Entry tree */
OffsetNumber entryAttnum;
Datum entryKey;
GinNullCategory entryCategory;
- IndexTuple entry;
- bool isDelete;
- /* Data (posting tree) options */
+ /* Search key for data tree (posting tree) */
+ ItemPointerData itemptr;
+} GinBtreeData;
+
+/* This represents a tuple to be inserted to entry tree. */
+typedef struct
+{
+ IndexTuple entry; /* tuple to insert */
+ bool isDelete; /* delete old tuple at same offset? */
+} GinBtreeEntryInsertData;
+
+/*
+ * This represents an itempointer, or many itempointers, to be inserted to
+ * a data (posting tree) leaf page
+ */
+typedef struct
+{
ItemPointerData *items;
uint32 nitem;
uint32 curitem;
+} GinBtreeDataLeafInsertData;
- PostingItem pitem;
-} GinBtreeData;
+/*
+ * For internal data (posting tree) pages, the insertion payload is a
+ * PostingItem
+ */
-extern GinBtreeStack *ginFindLeafPage(GinBtree btree, BlockNumber rootBlkno, bool searchMode);
+extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode);
extern Buffer ginStepRight(Buffer buffer, Relation index, int lockmode);
extern void freeGinBtreeStack(GinBtreeStack *stack);
extern void ginInsertValue(GinBtree btree, GinBtreeStack *stack,
+ void *insertdata, GinStatsData *buildStats);
+extern void ginFindParents(GinBtree btree, GinBtreeStack *stack);
+extern void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
GinStatsData *buildStats);
-extern void ginFindParents(GinBtree btree, GinBtreeStack *stack, BlockNumber rootBlkno);
/* ginentrypage.c */
extern IndexTuple GinFormTuple(GinState *ginstate,
@@ -543,7 +561,7 @@ extern void ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
GinStatsData *buildStats);
extern GinBtreeStack *ginScanBeginPostingTree(Relation index, BlockNumber rootBlkno);
extern void ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf);
-extern void ginPrepareDataScan(GinBtree btree, Relation index);
+extern void ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno);
/* ginscan.c */