diff options
Diffstat (limited to 'src/backend/access/gist/gistutil.c')
-rw-r--r-- | src/backend/access/gist/gistutil.c | 421 |
1 files changed, 61 insertions, 360 deletions
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c index 987aed89f7..3be4fd31f5 100644 --- a/src/backend/access/gist/gistutil.c +++ b/src/backend/access/gist/gistutil.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.15 2006/05/29 12:50:06 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.16 2006/06/28 12:00:14 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -21,23 +21,12 @@ #include "miscadmin.h" #include "storage/freespace.h" -/* group flags ( in gistadjsubkey ) */ -#define LEFT_ADDED 0x01 -#define RIGHT_ADDED 0x02 -#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED ) - -static float gistpenalty(GISTSTATE *giststate, int attno, - GISTENTRY *key1, bool isNull1, - GISTENTRY *key2, bool isNull2); - - /* * static *S used for temrorary storage (saves stack and palloc() call) */ -static int attrsizeS[INDEX_MAX_KEYS]; -static Datum attrS[INDEX_MAX_KEYS]; -static bool isnullS[INDEX_MAX_KEYS]; +static Datum attrS[INDEX_MAX_KEYS]; +static bool isnullS[INDEX_MAX_KEYS]; /* * Write itup vector to page, has no control of free space @@ -156,24 +145,31 @@ gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) { * invalid tuple. Resulting Datums aren't compressed. */ -static bool +bool gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, - Datum *attr, bool *isnull, int *attrsize ) { + Datum *attr, bool *isnull ) { int i; GistEntryVector *evec; + int attrsize; - evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); + evec = (GistEntryVector *) palloc( ( len + 2 ) * sizeof(GISTENTRY) + GEVHDRSZ); for (i = startkey; i < giststate->tupdesc->natts; i++) { int j; evec->n = 0; + if ( !isnull[i] ) { + gistentryinit( evec->vector[evec->n], attr[i], + NULL, NULL, (OffsetNumber) 0, + FALSE); + evec->n++; + } for (j = 0; j < len; j++) { Datum datum; bool IsNull; - if (GistTupleIsInvalid(itvec[j])) + if (GistTupleIsInvalid(itvec[j])) return FALSE; /* signals that union with invalid tuple => result is invalid */ datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); @@ -184,7 +180,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke evec->vector + evec->n, datum, NULL, NULL, (OffsetNumber) 0, - ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), FALSE, IsNull); evec->n++; } @@ -192,7 +187,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke /* If this tuple vector was all NULLs, the union is NULL */ if ( evec->n == 0 ) { attr[i] = (Datum) 0; - attrsize[i] = (Datum) 0; isnull[i] = TRUE; } else { if (evec->n == 1) { @@ -203,7 +197,7 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke /* Make union and store in attr array */ attr[i] = FunctionCall2(&giststate->unionFn[i], PointerGetDatum(evec), - PointerGetDatum(attrsize + i)); + PointerGetDatum(&attrsize)); isnull[i] = FALSE; } @@ -219,20 +213,24 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke IndexTuple gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) { - if ( !gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS, attrsizeS) ) + memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts); + + if ( !gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS ) ) return gist_form_invalid_tuple(InvalidBlockNumber); - return gistFormTuple(giststate, r, attrS, attrsizeS, isnullS); + return gistFormTuple(giststate, r, attrS, isnullS, false); } /* * makes union of two key */ -static void +void gistMakeUnionKey( GISTSTATE *giststate, int attno, GISTENTRY *entry1, bool isnull1, GISTENTRY *entry2, bool isnull2, - Datum *dst, int *dstsize, bool *dstisnull ) { + Datum *dst, bool *dstisnull ) { + + int dstsize; static char storage[ 2 * sizeof(GISTENTRY) + GEVHDRSZ ]; GistEntryVector *evec = (GistEntryVector*)storage; @@ -242,7 +240,6 @@ gistMakeUnionKey( GISTSTATE *giststate, int attno, if ( isnull1 && isnull2 ) { *dstisnull = TRUE; *dst = (Datum)0; - *dstsize = 0; } else { if ( isnull1 == FALSE && isnull2 == FALSE ) { evec->vector[0] = *entry1; @@ -258,11 +255,11 @@ gistMakeUnionKey( GISTSTATE *giststate, int attno, *dstisnull = FALSE; *dst = FunctionCall2(&giststate->unionFn[attno], PointerGetDatum(evec), - PointerGetDatum(dstsize)); + PointerGetDatum(&dstsize)); } } -static bool +bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b) { bool result; @@ -273,6 +270,25 @@ gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b) { } /* + * Decompress all keys in tuple + */ +void +gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, + OffsetNumber o, GISTENTRY *attdata, bool *isnull) +{ + int i; + + for (i = 0; i < r->rd_att->natts; i++) + { + Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]); + + gistdentryinit(giststate, i, &attdata[i], + datum, r, p, o, + FALSE, isnull[i]); + } +} + +/* * Forms union of oldtup and addtup, if union == oldtup then return NULL */ IndexTuple @@ -299,7 +315,7 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis gistMakeUnionKey( giststate, i, oldentries + i, oldisnull[i], addentries + i, addisnull[i], - attrS + i, attrsizeS + i, isnullS + i ); + attrS + i, isnullS + i ); if ( neednew ) /* we already need new key, so we can skip check */ @@ -318,7 +334,7 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis if (neednew) { /* need to update key */ - newtup = gistFormTuple(giststate, r, attrS, attrsizeS, isnullS); + newtup = gistFormTuple(giststate, r, attrS, isnullS, false); newtup->t_tid = oldtup->t_tid; } @@ -326,237 +342,6 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis } /* - * Forms unions of subkeys after page split, but - * uses only tuples aren't in groups of equalent tuples - */ -void -gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec, - GistSplitVec *gsvp, int startkey) { - IndexTuple *cleanedItVec; - int i, cleanedLen=0; - - cleanedItVec = (IndexTuple*)palloc(sizeof(IndexTuple) * gsvp->len); - - for(i=0;i<gsvp->len;i++) { - if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[i]]) - continue; - - cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1]; - } - - gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen, startkey, - gsvp->attr, gsvp->isnull, gsvp->attrsize); - - pfree( cleanedItVec ); -} - -/* - * unions subkeys for after user picksplit over attno-1 column - */ -void -gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, int attno) -{ - GistSplitVec gsvp; - - gsvp.idgrp = spl->spl_idgrp; - - gsvp.attrsize = spl->spl_lattrsize; - gsvp.attr = spl->spl_lattr; - gsvp.len = spl->spl_nleft; - gsvp.entries = spl->spl_left; - gsvp.isnull = spl->spl_lisnull; - - gistunionsubkeyvec(giststate, itvec, &gsvp, attno); - - gsvp.attrsize = spl->spl_rattrsize; - gsvp.attr = spl->spl_rattr; - gsvp.len = spl->spl_nright; - gsvp.entries = spl->spl_right; - gsvp.isnull = spl->spl_risnull; - - gistunionsubkeyvec(giststate, itvec, &gsvp, attno); -} - -/* - * find group in vector with equal value - */ -static int -gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl, int attno) -{ - int i; - int curid = 1; - - /* - * attno key is always not null (see gistSplitByKey), so we may not check for - * nulls - */ - for (i = 0; i < spl->spl_nleft; i++) - { - int j; - int len; - - if (spl->spl_idgrp[spl->spl_left[i]]) - continue; - len = 0; - /* find all equal value in right part */ - for (j = 0; j < spl->spl_nright; j++) - { - if (spl->spl_idgrp[spl->spl_right[j]]) - continue; - if (gistKeyIsEQ(giststate, attno, valvec[spl->spl_left[i]].key, valvec[spl->spl_right[j]].key)) - { - spl->spl_idgrp[spl->spl_right[j]] = curid; - len++; - } - } - /* find all other equal value in left part */ - if (len) - { - /* add current val to list of equal values */ - spl->spl_idgrp[spl->spl_left[i]] = curid; - /* searching .. */ - for (j = i + 1; j < spl->spl_nleft; j++) - { - if (spl->spl_idgrp[spl->spl_left[j]]) - continue; - if (gistKeyIsEQ(giststate, attno, valvec[spl->spl_left[i]].key, valvec[spl->spl_left[j]].key)) - { - spl->spl_idgrp[spl->spl_left[j]] = curid; - len++; - } - } - spl->spl_ngrp[curid] = len + 1; - curid++; - } - } - - return curid; -} - -/* - * Insert equivalent tuples to left or right page with minimum - * penalty - */ -static void -gistadjsubkey(Relation r, - IndexTuple *itup, /* contains compressed entry */ - int len, - GIST_SPLITVEC *v, - GISTSTATE *giststate, - int attno) -{ - int curlen; - OffsetNumber *curwpos; - GISTENTRY entry, - identry[INDEX_MAX_KEYS]; - float lpenalty = 0, - rpenalty = 0; - bool isnull[INDEX_MAX_KEYS]; - int i, - j; - - /* clear vectors */ - curlen = v->spl_nleft; - curwpos = v->spl_left; - for (i = 0; i < v->spl_nleft; i++) - { - if (v->spl_idgrp[v->spl_left[i]] == 0) - { - *curwpos = v->spl_left[i]; - curwpos++; - } - else - curlen--; - } - v->spl_nleft = curlen; - - curlen = v->spl_nright; - curwpos = v->spl_right; - for (i = 0; i < v->spl_nright; i++) - { - if (v->spl_idgrp[v->spl_right[i]] == 0) - { - *curwpos = v->spl_right[i]; - curwpos++; - } - else - curlen--; - } - v->spl_nright = curlen; - - /* add equivalent tuple */ - for (i = 0; i < len; i++) - { - if (v->spl_idgrp[i + 1] == 0) /* already inserted */ - continue; - gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0, - identry, isnull); - - v->spl_ngrp[v->spl_idgrp[i + 1]]--; - if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 && - (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED) - { - /* force last in group */ - rpenalty = 1.0; - lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0; - } - else - { - /* where? */ - for (j = attno+1; j < r->rd_att->natts; j++) - { - gistentryinit(entry, v->spl_lattr[j], r, NULL, - (OffsetNumber) 0, v->spl_lattrsize[j], FALSE); - lpenalty = gistpenalty(giststate, j, &entry, v->spl_lisnull[j], - &identry[j], isnull[j]); - - gistentryinit(entry, v->spl_rattr[j], r, NULL, - (OffsetNumber) 0, v->spl_rattrsize[j], FALSE); - rpenalty = gistpenalty(giststate, j, &entry, v->spl_risnull[j], - &identry[j], isnull[j]); - - if (lpenalty != rpenalty) - break; - } - } - - /* - * add XXX: refactor this to avoid duplicating code - */ - if (lpenalty < rpenalty) - { - v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED; - v->spl_left[v->spl_nleft++] = i + 1; - - for (j = attno+1; j < r->rd_att->natts; j++) - { - gistentryinit(entry, v->spl_lattr[j], r, NULL, - (OffsetNumber) 0, v->spl_lattrsize[j], FALSE); - gistMakeUnionKey( giststate, j, - &entry, v->spl_lisnull[j], - identry + j, isnull[j], - v->spl_lattr + j, v->spl_lattrsize + j, v->spl_lisnull + j ); - } - } - else - { - v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED; - v->spl_right[v->spl_nright++] = i + 1; - - for (j = attno+1; j < r->rd_att->natts; j++) - { - gistentryinit(entry, v->spl_rattr[j], r, NULL, - (OffsetNumber) 0, v->spl_rattrsize[j], FALSE); - gistMakeUnionKey( giststate, j, - &entry, v->spl_risnull[j], - identry + j, isnull[j], - v->spl_rattr + j, v->spl_rattrsize + j, v->spl_risnull + j ); - } - } - } -} - -/* * find entry with lowest penalty */ OffsetNumber @@ -580,6 +365,9 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ it, NULL, (OffsetNumber) 0, identry, isnull); + Assert( maxoff >= FirstOffsetNumber ); + Assert( !GistPageIsLeaf(p) ); + for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i)) { int j; @@ -602,7 +390,6 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull); gistdentryinit(giststate, j, &entry, datum, r, p, i, - ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), FALSE, IsNull); usize = gistpenalty(giststate, j, &entry, IsNull, &identry[j], isnull[j]); @@ -637,23 +424,23 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, Page pg, OffsetNumber o, - int b, bool l, bool isNull) + bool l, bool isNull) { - if (b && !isNull) + if (!isNull) { GISTENTRY *dep; - gistentryinit(*e, k, r, pg, o, b, l); + gistentryinit(*e, k, r, pg, o, l); dep = (GISTENTRY *) DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey], PointerGetDatum(e))); /* decompressFn may just return the given pointer */ if (dep != e) gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset, - dep->bytes, dep->leafkey); + dep->leafkey); } else - gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); + gistentryinit(*e, (Datum) 0, r, pg, o, l); } @@ -663,28 +450,28 @@ gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, void gistcentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, - Page pg, OffsetNumber o, int b, bool l, bool isNull) + Page pg, OffsetNumber o, bool l, bool isNull) { if (!isNull) { GISTENTRY *cep; - gistentryinit(*e, k, r, pg, o, b, l); + gistentryinit(*e, k, r, pg, o, l); cep = (GISTENTRY *) DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey], PointerGetDatum(e))); /* compressFn may just return the given pointer */ if (cep != e) gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset, - cep->bytes, cep->leafkey); + cep->leafkey); } else - gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); + gistentryinit(*e, (Datum) 0, r, pg, o, l); } IndexTuple gistFormTuple(GISTSTATE *giststate, Relation r, - Datum attdata[], int datumsize[], bool isnull[]) + Datum attdata[], bool isnull[], bool newValues) { GISTENTRY centry[INDEX_MAX_KEYS]; Datum compatt[INDEX_MAX_KEYS]; @@ -699,8 +486,7 @@ gistFormTuple(GISTSTATE *giststate, Relation r, { gistcentryinit(giststate, i, ¢ry[i], attdata[i], r, NULL, (OffsetNumber) 0, - (datumsize) ? datumsize[i] : -1, - (datumsize) ? FALSE : TRUE, + newValues, FALSE); compatt[i] = centry[i].key; } @@ -711,24 +497,7 @@ gistFormTuple(GISTSTATE *giststate, Relation r, return res; } -void -gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, - OffsetNumber o, GISTENTRY *attdata, bool *isnull) -{ - int i; - - for (i = 0; i < r->rd_att->natts; i++) - { - Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]); - - gistdentryinit(giststate, i, &attdata[i], - datum, r, p, o, - ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]), - FALSE, isnull[i]); - } -} - -static float +float gistpenalty(GISTSTATE *giststate, int attno, GISTENTRY *orig, bool isNullOrig, GISTENTRY *add, bool isNullAdd) @@ -749,74 +518,6 @@ gistpenalty(GISTSTATE *giststate, int attno, } /* - * Calls user picksplit method for attno columns to split vector to - * two vectors. May use attno+n columns data to - * get better split. - * Returns TRUE if left and right unions of attno columns are the same, - * so caller may find better split - */ -bool -gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GIST_SPLITVEC *v, - IndexTuple *itup, int len, GISTSTATE *giststate) -{ - /* - * now let the user-defined picksplit function set up the split vector; in - * entryvec have no null value!! - */ - FunctionCall2(&giststate->picksplitFn[attno], - PointerGetDatum(entryvec), - PointerGetDatum(v)); - - /* compatibility with old code */ - if (v->spl_left[v->spl_nleft - 1] == InvalidOffsetNumber) - v->spl_left[v->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1); - if (v->spl_right[v->spl_nright - 1] == InvalidOffsetNumber) - v->spl_right[v->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1); - - v->spl_lattr[attno] = v->spl_ldatum; - v->spl_rattr[attno] = v->spl_rdatum; - v->spl_lisnull[attno] = false; - v->spl_risnull[attno] = false; - - /* - * if index is multikey, then we must to try get smaller bounding box for - * subkey(s) - */ - if (giststate->tupdesc->natts > 1 && attno+1 != giststate->tupdesc->natts) - { - if ( gistKeyIsEQ(giststate, attno, v->spl_ldatum, v->spl_rdatum) ) { - /* - * Left and right key's unions are equial, so - * we can get better split by following columns. Note, - * uninons for attno columns are already done. - */ - - return true; - } else { - int MaxGrpId; - - v->spl_idgrp = (int *) palloc0(sizeof(int) * entryvec->n); - v->spl_grpflag = (char *) palloc0(sizeof(char) * entryvec->n); - v->spl_ngrp = (int *) palloc(sizeof(int) * entryvec->n); - - MaxGrpId = gistfindgroup(giststate, entryvec->vector, v, attno); - - /* form union of sub keys for each page (l,p) */ - gistunionsubkey(giststate, itup, v, attno + 1); - - /* - * if possible, we insert equivalent tuples with control by penalty - * for a subkey(s) - */ - if (MaxGrpId > 1) - gistadjsubkey(r, itup, len, v, giststate, attno); - } - } - - return false; -} - -/* * Initialize a new index page */ void |