summaryrefslogtreecommitdiff
path: root/src/backend/access/gist/gistproc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/gist/gistproc.c')
-rw-r--r--src/backend/access/gist/gistproc.c811
1 files changed, 557 insertions, 254 deletions
diff --git a/src/backend/access/gist/gistproc.c b/src/backend/access/gist/gistproc.c
index 43c4b1251b..f7eb9412f9 100644
--- a/src/backend/access/gist/gistproc.c
+++ b/src/backend/access/gist/gistproc.c
@@ -27,6 +27,9 @@ static double size_box(Datum dbox);
static bool rtree_internal_consistent(BOX *key, BOX *query,
StrategyNumber strategy);
+/* Minimum accepted ratio of split */
+#define LIMIT_RATIO 0.3
+
/**************************************************
* Box ops
@@ -49,30 +52,6 @@ rt_box_union(PG_FUNCTION_ARGS)
PG_RETURN_BOX_P(n);
}
-static Datum
-rt_box_inter(PG_FUNCTION_ARGS)
-{
- BOX *a = PG_GETARG_BOX_P(0);
- BOX *b = PG_GETARG_BOX_P(1);
- BOX *n;
-
- n = (BOX *) palloc(sizeof(BOX));
-
- n->high.x = Min(a->high.x, b->high.x);
- n->high.y = Min(a->high.y, b->high.y);
- n->low.x = Max(a->low.x, b->low.x);
- n->low.y = Max(a->low.y, b->low.y);
-
- if (n->high.x < n->low.x || n->high.y < n->low.y)
- {
- pfree(n);
- /* Indicate "no intersection" by returning NULL pointer */
- n = NULL;
- }
-
- PG_RETURN_BOX_P(n);
-}
-
/*
* The GiST Consistent method for boxes
*
@@ -194,86 +173,6 @@ gist_box_penalty(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(result);
}
-static void
-chooseLR(GIST_SPLITVEC *v,
- OffsetNumber *list1, int nlist1, BOX *union1,
- OffsetNumber *list2, int nlist2, BOX *union2)
-{
- bool firstToLeft = true;
-
- if (v->spl_ldatum_exists || v->spl_rdatum_exists)
- {
- if (v->spl_ldatum_exists && v->spl_rdatum_exists)
- {
- BOX LRl = *union1,
- LRr = *union2;
- BOX RLl = *union2,
- RLr = *union1;
- double sizeLR,
- sizeRL;
-
- adjustBox(&LRl, DatumGetBoxP(v->spl_ldatum));
- adjustBox(&LRr, DatumGetBoxP(v->spl_rdatum));
- adjustBox(&RLl, DatumGetBoxP(v->spl_ldatum));
- adjustBox(&RLr, DatumGetBoxP(v->spl_rdatum));
-
- sizeLR = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr)));
- sizeRL = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr)));
-
- if (sizeLR > sizeRL)
- firstToLeft = false;
-
- }
- else
- {
- float p1,
- p2;
- GISTENTRY oldUnion,
- addon;
-
- gistentryinit(oldUnion, (v->spl_ldatum_exists) ? v->spl_ldatum : v->spl_rdatum,
- NULL, NULL, InvalidOffsetNumber, FALSE);
-
- gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE);
- DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p1));
- gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE);
- DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p2));
-
- if ((v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2))
- firstToLeft = false;
- }
- }
-
- if (firstToLeft)
- {
- v->spl_left = list1;
- v->spl_right = list2;
- v->spl_nleft = nlist1;
- v->spl_nright = nlist2;
- if (v->spl_ldatum_exists)
- adjustBox(union1, DatumGetBoxP(v->spl_ldatum));
- v->spl_ldatum = BoxPGetDatum(union1);
- if (v->spl_rdatum_exists)
- adjustBox(union2, DatumGetBoxP(v->spl_rdatum));
- v->spl_rdatum = BoxPGetDatum(union2);
- }
- else
- {
- v->spl_left = list2;
- v->spl_right = list1;
- v->spl_nleft = nlist2;
- v->spl_nright = nlist1;
- if (v->spl_ldatum_exists)
- adjustBox(union2, DatumGetBoxP(v->spl_ldatum));
- v->spl_ldatum = BoxPGetDatum(union2);
- if (v->spl_rdatum_exists)
- adjustBox(union1, DatumGetBoxP(v->spl_rdatum));
- v->spl_rdatum = BoxPGetDatum(union1);
- }
-
- v->spl_ldatum_exists = v->spl_rdatum_exists = false;
-}
-
/*
* Trivial split: half of entries will be placed on one page
* and another half - to another
@@ -338,199 +237,603 @@ fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v)
}
/*
- * The GiST PickSplit method
+ * Represents information about an entry that can be placed to either group
+ * without affecting overlap over selected axis ("common entry").
+ */
+typedef struct
+{
+ /* Index of entry in the initial array */
+ int index;
+ /* Delta between penalties of entry insertion into different groups */
+ double delta;
+} CommonEntry;
+
+/*
+ * Context for g_box_consider_split. Contains information about currently
+ * selected split and some general information.
+ */
+typedef struct
+{
+ int entriesCount; /* total number of entries being split */
+ BOX boundingBox; /* minimum bounding box across all entries */
+
+ /* Information about currently selected split follows */
+
+ bool first; /* true if no split was selected yet */
+
+ double leftUpper; /* upper bound of left interval */
+ double rightLower; /* lower bound of right interval */
+
+ float4 ratio;
+ float4 overlap;
+ int dim; /* axis of this split */
+ double range; /* width of general MBR projection to the
+ * selected axis */
+} ConsiderSplitContext;
+
+/*
+ * Interval represents projection of box to axis.
+ */
+typedef struct
+{
+ double lower,
+ upper;
+} SplitInterval;
+
+/*
+ * Interval comparison function by lower bound of the interval;
+ */
+static int
+interval_cmp_lower(const void *i1, const void *i2)
+{
+ double lower1 = ((SplitInterval *) i1)->lower,
+ lower2 = ((SplitInterval *) i2)->lower;
+
+ if (lower1 < lower2)
+ return -1;
+ else if (lower1 > lower2)
+ return 1;
+ else
+ return 0;
+}
+
+/*
+ * Interval comparison function by upper bound of the interval;
+ */
+static int
+interval_cmp_upper(const void *i1, const void *i2)
+{
+ double upper1 = ((SplitInterval *) i1)->upper,
+ upper2 = ((SplitInterval *) i2)->upper;
+
+ if (upper1 < upper2)
+ return -1;
+ else if (upper1 > upper2)
+ return 1;
+ else
+ return 0;
+}
+
+/*
+ * Replace negative value with zero.
+ */
+static inline float
+non_negative(float val)
+{
+ if (val >= 0.0f)
+ return val;
+ else
+ return 0.0f;
+}
+
+/*
+ * Consider replacement of currently selected split with the better one.
+ */
+static void inline
+g_box_consider_split(ConsiderSplitContext *context, int dimNum,
+ double rightLower, int minLeftCount,
+ double leftUpper, int maxLeftCount)
+{
+ int leftCount,
+ rightCount;
+ float4 ratio,
+ overlap;
+ double range;
+
+ /*
+ * Calculate entries distribution ratio assuming most uniform distribution
+ * of common entries.
+ */
+ if (minLeftCount >= (context->entriesCount + 1) / 2)
+ {
+ leftCount = minLeftCount;
+ }
+ else
+ {
+ if (maxLeftCount <= context->entriesCount / 2)
+ leftCount = maxLeftCount;
+ else
+ leftCount = context->entriesCount / 2;
+ }
+ rightCount = context->entriesCount - leftCount;
+
+ /*
+ * Ratio of split - quotient between size of lesser group and total
+ * entries count.
+ */
+ ratio = ((float4) Min(leftCount, rightCount)) /
+ ((float4) context->entriesCount);
+
+ if (ratio > LIMIT_RATIO)
+ {
+ bool selectthis = false;
+
+ /*
+ * The ratio is acceptable, so compare current split with previously
+ * selected one. Between splits of one dimension we search for minimal
+ * overlap (allowing negative values) and minimal ration (between same
+ * overlaps. We switch dimension if find less overlap (non-negative)
+ * or less range with same overlap.
+ */
+ if (dimNum == 0)
+ range = context->boundingBox.high.x - context->boundingBox.low.x;
+ else
+ range = context->boundingBox.high.y - context->boundingBox.low.y;
+
+ overlap = (leftUpper - rightLower) / range;
+
+ /* If there is no previous selection, select this */
+ if (context->first)
+ selectthis = true;
+ else if (context->dim == dimNum)
+ {
+ /*
+ * Within the same dimension, choose the new split if it has a
+ * smaller overlap, or same overlap but better ratio.
+ */
+ if (overlap < context->overlap ||
+ (overlap == context->overlap && ratio > context->ratio))
+ selectthis = true;
+ }
+ else
+ {
+ /*
+ * Across dimensions, choose the new split if it has a smaller
+ * *non-negative* overlap, or same *non-negative* overlap but
+ * bigger range. This condition differs from the one described in
+ * the article. On the datasets where leaf MBRs don't overlap
+ * themselves, non-overlapping splits (i.e. splits which have zero
+ * *non-negative* overlap) are frequently possible. In this case
+ * splits tends to be along one dimension, because most distant
+ * non-overlapping splits (i.e. having lowest negative overlap)
+ * appears to be in the same dimension as in the previous split.
+ * Therefore MBRs appear to be very prolonged along another
+ * dimension, which leads to bad search performance. Using range
+ * as the second split criteria makes MBRs more quadratic. Using
+ * *non-negative* overlap instead of overlap as the first split
+ * criteria gives to range criteria a chance to matter, because
+ * non-overlapping splits are equivalent in this criteria.
+ */
+ if (non_negative(overlap) < non_negative(context->overlap) ||
+ (range > context->range &&
+ non_negative(overlap) <= non_negative(context->overlap)))
+ selectthis = true;
+ }
+
+ if (selectthis)
+ {
+ /* save information about selected split */
+ context->first = false;
+ context->ratio = ratio;
+ context->range = range;
+ context->overlap = overlap;
+ context->rightLower = rightLower;
+ context->leftUpper = leftUpper;
+ context->dim = dimNum;
+ }
+ }
+}
+
+/*
+ * Return increase of original BOX area by new BOX area insertion.
+ */
+static double
+box_penalty(BOX *original, BOX *new)
+{
+ double union_width,
+ union_height;
+
+ union_width = Max(original->high.x, new->high.x) -
+ Min(original->low.x, new->low.x);
+ union_height = Max(original->high.y, new->high.y) -
+ Min(original->low.y, new->low.y);
+ return union_width * union_height - (original->high.x - original->low.x) *
+ (original->high.y - original->low.y);
+}
+
+/*
+ * Compare common entries by their deltas.
+ */
+static int
+common_entry_cmp(const void *i1, const void *i2)
+{
+ double delta1 = ((CommonEntry *) i1)->delta,
+ delta2 = ((CommonEntry *) i2)->delta;
+
+ if (delta1 < delta2)
+ return -1;
+ else if (delta1 > delta2)
+ return 1;
+ else
+ return 0;
+}
+
+/*
+ * --------------------------------------------------------------------------
+ * Double sorting split algorithm. This is used for both boxes and points.
*
- * New linear algorithm, see 'New Linear Node Splitting Algorithm for R-tree',
- * C.H.Ang and T.C.Tan
+ * The algorithm finds split of boxes by considering splits along each axis.
+ * Each entry is first projected as an interval on the X-axis, and different
+ * ways to split the intervals into two groups are considered, trying to
+ * minimize the overlap of the groups. Then the same is repeated for the
+ * Y-axis, and the overall best split is chosen. The quality of a split is
+ * determined by overlap along that axis and some other criteria (see
+ * g_box_consider_split).
*
- * This is used for both boxes and points.
+ * After that, all the entries are divided into three groups:
+ *
+ * 1) Entries which should be placed to the left group
+ * 2) Entries which should be placed to the right group
+ * 3) "Common entries" which can be placed to any of groups without affecting
+ * of overlap along selected axis.
+ *
+ * The common entries are distributed by minimizing penalty.
+ *
+ * For details see:
+ * "A new double sorting-based node splitting algorithm for R-tree", A. Korotkov
+ * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
+ * --------------------------------------------------------------------------
*/
Datum
gist_box_picksplit(PG_FUNCTION_ARGS)
{
GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
- OffsetNumber i;
- OffsetNumber *listL,
- *listR,
- *listB,
- *listT;
- BOX *unionL,
- *unionR,
- *unionB,
- *unionT;
- int posL,
- posR,
- posB,
- posT;
- BOX pageunion;
- BOX *cur;
- char direction = ' ';
- bool allisequal = true;
- OffsetNumber maxoff;
- int nbytes;
+ OffsetNumber i,
+ maxoff;
+ ConsiderSplitContext context;
+ BOX *box,
+ *leftBox,
+ *rightBox;
+ int dim,
+ commonEntriesCount;
+ SplitInterval *intervalsLower,
+ *intervalsUpper;
+ CommonEntry *commonEntries;
+ int nentries;
+
+ memset(&context, 0, sizeof(ConsiderSplitContext));
- posL = posR = posB = posT = 0;
maxoff = entryvec->n - 1;
+ nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1;
- cur = DatumGetBoxP(entryvec->vector[FirstOffsetNumber].key);
- memcpy((void *) &pageunion, (void *) cur, sizeof(BOX));
+ /* Allocate arrays for intervals along axes */
+ intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
+ intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
- /* find MBR */
- for (i = OffsetNumberNext(FirstOffsetNumber); i <= maxoff; i = OffsetNumberNext(i))
+ /*
+ * Calculate the overall minimum bounding box over all the entries.
+ */
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
- cur = DatumGetBoxP(entryvec->vector[i].key);
- if (allisequal && (
- pageunion.high.x != cur->high.x ||
- pageunion.high.y != cur->high.y ||
- pageunion.low.x != cur->low.x ||
- pageunion.low.y != cur->low.y
- ))
- allisequal = false;
-
- adjustBox(&pageunion, cur);
+ box = DatumGetBoxP(entryvec->vector[i].key);
+ if (i == FirstOffsetNumber)
+ context.boundingBox = *box;
+ else
+ adjustBox(&context.boundingBox, box);
}
- if (allisequal)
+ /*
+ * Iterate over axes for optimal split searching.
+ */
+ context.first = true; /* nothing selected yet */
+ for (dim = 0; dim < 2; dim++)
{
+ double leftUpper,
+ rightLower;
+ int i1,
+ i2;
+
+ /* Project each entry as an interval on the selected axis. */
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ {
+ box = DatumGetBoxP(entryvec->vector[i].key);
+ if (dim == 0)
+ {
+ intervalsLower[i - FirstOffsetNumber].lower = box->low.x;
+ intervalsLower[i - FirstOffsetNumber].upper = box->high.x;
+ }
+ else
+ {
+ intervalsLower[i - FirstOffsetNumber].lower = box->low.y;
+ intervalsLower[i - FirstOffsetNumber].upper = box->high.y;
+ }
+ }
+
+ /*
+ * Make two arrays of intervals: one sorted by lower bound and another
+ * sorted by upper bound.
+ */
+ memcpy(intervalsUpper, intervalsLower,
+ sizeof(SplitInterval) * nentries);
+ qsort(intervalsLower, nentries, sizeof(SplitInterval),
+ interval_cmp_lower);
+ qsort(intervalsUpper, nentries, sizeof(SplitInterval),
+ interval_cmp_upper);
+
+ /*----
+ * The goal is to form a left and right interval, so that every entry
+ * interval is contained by either left or right interval (or both).
+ *
+ * For example, with the intervals (0,1), (1,3), (2,3), (2,4):
+ *
+ * 0 1 2 3 4
+ * +-+
+ * +---+
+ * +-+
+ * +---+
+ *
+ * The left and right intervals are of the form (0,a) and (b,4).
+ * We first consider splits where b is the lower bound of an entry.
+ * We iterate through all entries, and for each b, calculate the
+ * smallest possible a. Then we consider splits where a is the
+ * uppper bound of an entry, and for each a, calculate the greatest
+ * possible b.
+ *
+ * In the above example, the first loop would consider splits:
+ * b=0: (0,1)-(0,4)
+ * b=1: (0,1)-(1,4)
+ * b=2: (0,3)-(2,4)
+ *
+ * And the second loop:
+ * a=1: (0,1)-(1,4)
+ * a=3: (0,3)-(2,4)
+ * a=4: (0,4)-(2,4)
+ */
+
+ /*
+ * Iterate over lower bound of right group, finding smallest possible
+ * upper bound of left group.
+ */
+ i1 = 0;
+ i2 = 0;
+ rightLower = intervalsLower[i1].lower;
+ leftUpper = intervalsUpper[i2].lower;
+ while (true)
+ {
+ /*
+ * Find next lower bound of right group.
+ */
+ while (i1 < nentries && rightLower == intervalsLower[i1].lower)
+ {
+ leftUpper = Max(leftUpper, intervalsLower[i1].upper);
+ i1++;
+ }
+ if (i1 >= nentries)
+ break;
+ rightLower = intervalsLower[i1].lower;
+
+ /*
+ * Find count of intervals which anyway should be placed to the
+ * left group.
+ */
+ while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper)
+ i2++;
+
+ /*
+ * Consider found split.
+ */
+ g_box_consider_split(&context, dim, rightLower, i1, leftUpper, i2);
+ }
+
/*
- * All entries are the same
+ * Iterate over upper bound of left group finding greates possible
+ * lower bound of right group.
*/
+ i1 = nentries - 1;
+ i2 = nentries - 1;
+ rightLower = intervalsLower[i1].upper;
+ leftUpper = intervalsUpper[i2].upper;
+ while (true)
+ {
+ /*
+ * Find next upper bound of left group.
+ */
+ while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper)
+ {
+ rightLower = Min(rightLower, intervalsUpper[i2].lower);
+ i2--;
+ }
+ if (i2 < 0)
+ break;
+ leftUpper = intervalsUpper[i2].upper;
+
+ /*
+ * Find count of intervals which anyway should be placed to the
+ * right group.
+ */
+ while (i1 >= 0 && intervalsLower[i1].lower >= rightLower)
+ i1--;
+
+ /*
+ * Consider found split.
+ */
+ g_box_consider_split(&context, dim,
+ rightLower, i1 + 1, leftUpper, i2 + 1);
+ }
+ }
+
+ /*
+ * If we failed to find any acceptable splits, use trivial split.
+ */
+ if (context.first)
+ {
fallbackSplit(entryvec, v);
PG_RETURN_POINTER(v);
}
- nbytes = (maxoff + 2) * sizeof(OffsetNumber);
- listL = (OffsetNumber *) palloc(nbytes);
- listR = (OffsetNumber *) palloc(nbytes);
- listB = (OffsetNumber *) palloc(nbytes);
- listT = (OffsetNumber *) palloc(nbytes);
- unionL = (BOX *) palloc(sizeof(BOX));
- unionR = (BOX *) palloc(sizeof(BOX));
- unionB = (BOX *) palloc(sizeof(BOX));
- unionT = (BOX *) palloc(sizeof(BOX));
-
-#define ADDLIST( list, unionD, pos, num ) do { \
- if ( pos ) { \
- if ( (unionD)->high.x < cur->high.x ) (unionD)->high.x = cur->high.x; \
- if ( (unionD)->low.x > cur->low.x ) (unionD)->low.x = cur->low.x; \
- if ( (unionD)->high.y < cur->high.y ) (unionD)->high.y = cur->high.y; \
- if ( (unionD)->low.y > cur->low.y ) (unionD)->low.y = cur->low.y; \
- } else { \
- memcpy( (void*)(unionD), (void*) cur, sizeof( BOX ) ); \
- } \
- (list)[pos] = num; \
- (pos)++; \
-} while(0)
+ /*
+ * Ok, we have now selected the split across one axis.
+ *
+ * While considering the splits, we already determined that there will be
+ * enough entries in both groups to reach the desired ratio, but we did
+ * not memorize which entries go to which group. So determine that now.
+ */
- for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
- {
- cur = DatumGetBoxP(entryvec->vector[i].key);
- if (cur->low.x - pageunion.low.x < pageunion.high.x - cur->high.x)
- ADDLIST(listL, unionL, posL, i);
- else
- ADDLIST(listR, unionR, posR, i);
- if (cur->low.y - pageunion.low.y < pageunion.high.y - cur->high.y)
- ADDLIST(listB, unionB, posB, i);
- else
- ADDLIST(listT, unionT, posT, i);
- }
+ /* Allocate vectors for results */
+ v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
+ v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
+ v->spl_nleft = 0;
+ v->spl_nright = 0;
+
+ /* Allocate bounding boxes of left and right groups */
+ leftBox = palloc0(sizeof(BOX));
+ rightBox = palloc0(sizeof(BOX));
-#define LIMIT_RATIO 0.1
-#define _IS_BADRATIO(x,y) ( (y) == 0 || (float)(x)/(float)(y) < LIMIT_RATIO )
-#define IS_BADRATIO(x,y) ( _IS_BADRATIO((x),(y)) || _IS_BADRATIO((y),(x)) )
- /* bad disposition, try to split by centers of boxes */
- if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
+ /*
+ * Allocate an array for "common entries" - entries which can be placed to
+ * either group without affecting overlap along selected axis.
+ */
+ commonEntriesCount = 0;
+ commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
+
+ /* Helper macros to place an entry in the left or right group */
+#define PLACE_LEFT(box, off) \
+ do { \
+ if (v->spl_nleft > 0) \
+ adjustBox(leftBox, box); \
+ else \
+ *leftBox = *(box); \
+ v->spl_left[v->spl_nleft++] = off; \
+ } while(0)
+
+#define PLACE_RIGHT(box, off) \
+ do { \
+ if (v->spl_nright > 0) \
+ adjustBox(rightBox, box); \
+ else \
+ *rightBox = *(box); \
+ v->spl_right[v->spl_nright++] = off; \
+ } while(0)
+
+ /*
+ * Distribute entries which can be distributed unambiguously, and collect
+ * common entries.
+ */
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
- double avgCenterX = 0.0,
- avgCenterY = 0.0;
- double CenterX,
- CenterY;
+ double lower,
+ upper;
- for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ /*
+ * Get upper and lower bounds along selected axis.
+ */
+ box = DatumGetBoxP(entryvec->vector[i].key);
+ if (context.dim == 0)
{
- cur = DatumGetBoxP(entryvec->vector[i].key);
- avgCenterX += ((double) cur->high.x + (double) cur->low.x) / 2.0;
- avgCenterY += ((double) cur->high.y + (double) cur->low.y) / 2.0;
+ lower = box->low.x;
+ upper = box->high.x;
}
-
- avgCenterX /= maxoff;
- avgCenterY /= maxoff;
-
- posL = posR = posB = posT = 0;
- for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ else
{
- cur = DatumGetBoxP(entryvec->vector[i].key);
-
- CenterX = ((double) cur->high.x + (double) cur->low.x) / 2.0;
- CenterY = ((double) cur->high.y + (double) cur->low.y) / 2.0;
+ lower = box->low.y;
+ upper = box->high.y;
+ }
- if (CenterX < avgCenterX)
- ADDLIST(listL, unionL, posL, i);
- else if (CenterX == avgCenterX)
+ if (upper <= context.leftUpper)
+ {
+ /* Fits to the left group */
+ if (lower >= context.rightLower)
{
- if (posL > posR)
- ADDLIST(listR, unionR, posR, i);
- else
- ADDLIST(listL, unionL, posL, i);
+ /* Fits also to the right group, so "common entry" */
+ commonEntries[commonEntriesCount++].index = i;
}
else
- ADDLIST(listR, unionR, posR, i);
-
- if (CenterY < avgCenterY)
- ADDLIST(listB, unionB, posB, i);
- else if (CenterY == avgCenterY)
{
- if (posB > posT)
- ADDLIST(listT, unionT, posT, i);
- else
- ADDLIST(listB, unionB, posB, i);
+ /* Doesn't fit to the right group, so join to the left group */
+ PLACE_LEFT(box, i);
}
- else
- ADDLIST(listT, unionT, posT, i);
}
-
- if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
+ else
{
- fallbackSplit(entryvec, v);
- PG_RETURN_POINTER(v);
+ /*
+ * Each entry should fit on either left or right group. Since this
+ * entry didn't fit on the left group, it better fit in the right
+ * group.
+ */
+ Assert(lower >= context.rightLower);
+
+ /* Doesn't fit to the left group, so join to the right group */
+ PLACE_RIGHT(box, i);
}
}
- /* which split more optimal? */
- if (Max(posL, posR) < Max(posB, posT))
- direction = 'x';
- else if (Max(posL, posR) > Max(posB, posT))
- direction = 'y';
- else
+ /*
+ * Distribute "common entries", if any.
+ */
+ if (commonEntriesCount > 0)
{
- Datum interLR = DirectFunctionCall2(rt_box_inter,
- BoxPGetDatum(unionL),
- BoxPGetDatum(unionR));
- Datum interBT = DirectFunctionCall2(rt_box_inter,
- BoxPGetDatum(unionB),
- BoxPGetDatum(unionT));
- double sizeLR,
- sizeBT;
-
- sizeLR = size_box(interLR);
- sizeBT = size_box(interBT);
-
- if (sizeLR < sizeBT)
- direction = 'x';
- else
- direction = 'y';
- }
+ /*
+ * Calculate minimum number of entries that must be placed in both
+ * groups, to reach LIMIT_RATIO.
+ */
+ int m = ceil(LIMIT_RATIO * (double) nentries);
- if (direction == 'x')
- chooseLR(v,
- listL, posL, unionL,
- listR, posR, unionR);
- else
- chooseLR(v,
- listB, posB, unionB,
- listT, posT, unionT);
+ /*
+ * Calculate delta between penalties of join "common entries" to
+ * different groups.
+ */
+ for (i = 0; i < commonEntriesCount; i++)
+ {
+ box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
+ commonEntries[i].delta = Abs(box_penalty(leftBox, box) -
+ box_penalty(rightBox, box));
+ }
+
+ /*
+ * Sort "common entries" by calculated deltas in order to distribute
+ * the most ambiguous entries first.
+ */
+ qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp);
+
+ /*
+ * Distribute "common entries" between groups.
+ */
+ for (i = 0; i < commonEntriesCount; i++)
+ {
+ box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
+
+ /*
+ * Check if we have to place this entry in either group to achieve
+ * LIMIT_RATIO.
+ */
+ if (v->spl_nleft + (commonEntriesCount - i) <= m)
+ PLACE_LEFT(box, commonEntries[i].index);
+ else if (v->spl_nright + (commonEntriesCount - i) <= m)
+ PLACE_RIGHT(box, commonEntries[i].index);
+ else
+ {
+ /* Otherwise select the group by minimal penalty */
+ if (box_penalty(leftBox, box) < box_penalty(rightBox, box))
+ PLACE_LEFT(box, commonEntries[i].index);
+ else
+ PLACE_RIGHT(box, commonEntries[i].index);
+ }
+ }
+ }
+ v->spl_ldatum = PointerGetDatum(leftBox);
+ v->spl_rdatum = PointerGetDatum(rightBox);
PG_RETURN_POINTER(v);
}