diff options
| author | Kevin Grittner <kgrittn@postgresql.org> | 2013-10-09 14:26:09 -0500 |
|---|---|---|
| committer | Kevin Grittner <kgrittn@postgresql.org> | 2013-10-09 14:26:09 -0500 |
| commit | f566515192461acd8d9c232f48ddac3fc965cfd8 (patch) | |
| tree | d8609adb64a67bcaadbd1142bd074911aa158d87 /src/backend/utils/adt | |
| parent | 1cccce50f374cfc6081850aedce8eb0f8b274bc5 (diff) | |
| download | postgresql-f566515192461acd8d9c232f48ddac3fc965cfd8.tar.gz | |
Add record_image_ops opclass for matview concurrent refresh.
REFRESH MATERIALIZED VIEW CONCURRENTLY was broken for any matview
containing a column of a type without a default btree operator
class. It also did not produce results consistent with a non-
concurrent REFRESH or a normal view if any column was of a type
which allowed user-visible differences between values which
compared as equal according to the type's default btree opclass.
Concurrent matview refresh was modified to use the new operators
to solve these problems.
Documentation was added for record comparison, both for the
default btree operator class for record, and the newly added
operators. Regression tests now check for proper behavior both
for a matview with a box column and a matview containing a citext
column.
Reviewed by Steve Singer, who suggested some of the doc language.
Diffstat (limited to 'src/backend/utils/adt')
| -rw-r--r-- | src/backend/utils/adt/rowtypes.c | 494 |
1 files changed, 494 insertions, 0 deletions
diff --git a/src/backend/utils/adt/rowtypes.c b/src/backend/utils/adt/rowtypes.c index 1bd473af65..0bcbb5d664 100644 --- a/src/backend/utils/adt/rowtypes.c +++ b/src/backend/utils/adt/rowtypes.c @@ -17,6 +17,7 @@ #include <ctype.h> #include "access/htup_details.h" +#include "access/tuptoaster.h" #include "catalog/pg_type.h" #include "libpq/pqformat.h" #include "utils/builtins.h" @@ -1281,3 +1282,496 @@ btrecordcmp(PG_FUNCTION_ARGS) { PG_RETURN_INT32(record_cmp(fcinfo)); } + + +/* + * record_image_cmp : + * Internal byte-oriented comparison function for records. + * + * Returns -1, 0 or 1 + * + * Note: The normal concepts of "equality" do not apply here; different + * representation of values considered to be equal are not considered to be + * identical. As an example, for the citext type 'A' and 'a' are equal, but + * they are not identical. + */ +static bool +record_image_cmp(PG_FUNCTION_ARGS) +{ + HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0); + HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1); + int32 result = 0; + Oid tupType1; + Oid tupType2; + int32 tupTypmod1; + int32 tupTypmod2; + TupleDesc tupdesc1; + TupleDesc tupdesc2; + HeapTupleData tuple1; + HeapTupleData tuple2; + int ncolumns1; + int ncolumns2; + RecordCompareData *my_extra; + int ncols; + Datum *values1; + Datum *values2; + bool *nulls1; + bool *nulls2; + int i1; + int i2; + int j; + + /* Extract type info from the tuples */ + tupType1 = HeapTupleHeaderGetTypeId(record1); + tupTypmod1 = HeapTupleHeaderGetTypMod(record1); + tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1); + ncolumns1 = tupdesc1->natts; + tupType2 = HeapTupleHeaderGetTypeId(record2); + tupTypmod2 = HeapTupleHeaderGetTypMod(record2); + tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2); + ncolumns2 = tupdesc2->natts; + + /* Build temporary HeapTuple control structures */ + tuple1.t_len = HeapTupleHeaderGetDatumLength(record1); + ItemPointerSetInvalid(&(tuple1.t_self)); + tuple1.t_tableOid = InvalidOid; + tuple1.t_data = record1; + tuple2.t_len = HeapTupleHeaderGetDatumLength(record2); + ItemPointerSetInvalid(&(tuple2.t_self)); + tuple2.t_tableOid = InvalidOid; + tuple2.t_data = record2; + + /* + * We arrange to look up the needed comparison info just once per series + * of calls, assuming the record types don't change underneath us. + */ + ncols = Max(ncolumns1, ncolumns2); + my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra; + if (my_extra == NULL || + my_extra->ncolumns < ncols) + { + fcinfo->flinfo->fn_extra = + MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, + sizeof(RecordCompareData) - sizeof(ColumnCompareData) + + ncols * sizeof(ColumnCompareData)); + my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra; + my_extra->ncolumns = ncols; + my_extra->record1_type = InvalidOid; + my_extra->record1_typmod = 0; + my_extra->record2_type = InvalidOid; + my_extra->record2_typmod = 0; + } + + if (my_extra->record1_type != tupType1 || + my_extra->record1_typmod != tupTypmod1 || + my_extra->record2_type != tupType2 || + my_extra->record2_typmod != tupTypmod2) + { + MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData)); + my_extra->record1_type = tupType1; + my_extra->record1_typmod = tupTypmod1; + my_extra->record2_type = tupType2; + my_extra->record2_typmod = tupTypmod2; + } + + /* Break down the tuples into fields */ + values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum)); + nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool)); + heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1); + values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum)); + nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool)); + heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2); + + /* + * Scan corresponding columns, allowing for dropped columns in different + * places in the two rows. i1 and i2 are physical column indexes, j is + * the logical column index. + */ + i1 = i2 = j = 0; + while (i1 < ncolumns1 || i2 < ncolumns2) + { + /* + * Skip dropped columns + */ + if (i1 < ncolumns1 && tupdesc1->attrs[i1]->attisdropped) + { + i1++; + continue; + } + if (i2 < ncolumns2 && tupdesc2->attrs[i2]->attisdropped) + { + i2++; + continue; + } + if (i1 >= ncolumns1 || i2 >= ncolumns2) + break; /* we'll deal with mismatch below loop */ + + /* + * Have two matching columns, they must be same type + */ + if (tupdesc1->attrs[i1]->atttypid != + tupdesc2->attrs[i2]->atttypid) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("cannot compare dissimilar column types %s and %s at record column %d", + format_type_be(tupdesc1->attrs[i1]->atttypid), + format_type_be(tupdesc2->attrs[i2]->atttypid), + j + 1))); + + /* + * We consider two NULLs equal; NULL > not-NULL. + */ + if (!nulls1[i1] || !nulls2[i2]) + { + int cmpresult; + + if (nulls1[i1]) + { + /* arg1 is greater than arg2 */ + result = 1; + break; + } + if (nulls2[i2]) + { + /* arg1 is less than arg2 */ + result = -1; + break; + } + + /* Compare the pair of elements */ + if (tupdesc1->attrs[i1]->attlen == -1) + { + Size len1, + len2; + struct varlena *arg1val; + struct varlena *arg2val; + + len1 = toast_raw_datum_size(values1[i1]); + len2 = toast_raw_datum_size(values2[i2]); + arg1val = PG_DETOAST_DATUM_PACKED(values1[i1]); + arg2val = PG_DETOAST_DATUM_PACKED(values2[i2]); + + cmpresult = memcmp(VARDATA_ANY(arg1val), + VARDATA_ANY(arg2val), + len1 - VARHDRSZ); + if ((cmpresult == 0) && (len1 != len2)) + cmpresult = (len1 < len2) ? -1 : 1; + + if ((Pointer) arg1val != (Pointer) values1[i1]) + pfree(arg1val); + if ((Pointer) arg2val != (Pointer) values2[i2]) + pfree(arg2val); + } + else if (tupdesc1->attrs[i1]->attbyval) + { + cmpresult = memcmp(&(values1[i1]), + &(values2[i2]), + tupdesc1->attrs[i1]->attlen); + } + else + { + cmpresult = memcmp(DatumGetPointer(values1[i1]), + DatumGetPointer(values2[i2]), + tupdesc1->attrs[i1]->attlen); + } + + if (cmpresult < 0) + { + /* arg1 is less than arg2 */ + result = -1; + break; + } + else if (cmpresult > 0) + { + /* arg1 is greater than arg2 */ + result = 1; + break; + } + } + + /* equal, so continue to next column */ + i1++, i2++, j++; + } + + /* + * If we didn't break out of the loop early, check for column count + * mismatch. (We do not report such mismatch if we found unequal column + * values; is that a feature or a bug?) + */ + if (result == 0) + { + if (i1 != ncolumns1 || i2 != ncolumns2) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("cannot compare record types with different numbers of columns"))); + } + + pfree(values1); + pfree(nulls1); + pfree(values2); + pfree(nulls2); + ReleaseTupleDesc(tupdesc1); + ReleaseTupleDesc(tupdesc2); + + /* Avoid leaking memory when handed toasted input. */ + PG_FREE_IF_COPY(record1, 0); + PG_FREE_IF_COPY(record2, 1); + + return result; +} + +/* + * record_image_eq : + * compares two records for identical contents, based on byte images + * result : + * returns true if the records are identical, false otherwise. + * + * Note: we do not use record_image_cmp here, since we can avoid + * de-toasting for unequal lengths this way. + */ +Datum +record_image_eq(PG_FUNCTION_ARGS) +{ + HeapTupleHeader record1 = PG_GETARG_HEAPTUPLEHEADER(0); + HeapTupleHeader record2 = PG_GETARG_HEAPTUPLEHEADER(1); + bool result = true; + Oid tupType1; + Oid tupType2; + int32 tupTypmod1; + int32 tupTypmod2; + TupleDesc tupdesc1; + TupleDesc tupdesc2; + HeapTupleData tuple1; + HeapTupleData tuple2; + int ncolumns1; + int ncolumns2; + RecordCompareData *my_extra; + int ncols; + Datum *values1; + Datum *values2; + bool *nulls1; + bool *nulls2; + int i1; + int i2; + int j; + + /* Extract type info from the tuples */ + tupType1 = HeapTupleHeaderGetTypeId(record1); + tupTypmod1 = HeapTupleHeaderGetTypMod(record1); + tupdesc1 = lookup_rowtype_tupdesc(tupType1, tupTypmod1); + ncolumns1 = tupdesc1->natts; + tupType2 = HeapTupleHeaderGetTypeId(record2); + tupTypmod2 = HeapTupleHeaderGetTypMod(record2); + tupdesc2 = lookup_rowtype_tupdesc(tupType2, tupTypmod2); + ncolumns2 = tupdesc2->natts; + + /* Build temporary HeapTuple control structures */ + tuple1.t_len = HeapTupleHeaderGetDatumLength(record1); + ItemPointerSetInvalid(&(tuple1.t_self)); + tuple1.t_tableOid = InvalidOid; + tuple1.t_data = record1; + tuple2.t_len = HeapTupleHeaderGetDatumLength(record2); + ItemPointerSetInvalid(&(tuple2.t_self)); + tuple2.t_tableOid = InvalidOid; + tuple2.t_data = record2; + + /* + * We arrange to look up the needed comparison info just once per series + * of calls, assuming the record types don't change underneath us. + */ + ncols = Max(ncolumns1, ncolumns2); + my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra; + if (my_extra == NULL || + my_extra->ncolumns < ncols) + { + fcinfo->flinfo->fn_extra = + MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, + sizeof(RecordCompareData) - sizeof(ColumnCompareData) + + ncols * sizeof(ColumnCompareData)); + my_extra = (RecordCompareData *) fcinfo->flinfo->fn_extra; + my_extra->ncolumns = ncols; + my_extra->record1_type = InvalidOid; + my_extra->record1_typmod = 0; + my_extra->record2_type = InvalidOid; + my_extra->record2_typmod = 0; + } + + if (my_extra->record1_type != tupType1 || + my_extra->record1_typmod != tupTypmod1 || + my_extra->record2_type != tupType2 || + my_extra->record2_typmod != tupTypmod2) + { + MemSet(my_extra->columns, 0, ncols * sizeof(ColumnCompareData)); + my_extra->record1_type = tupType1; + my_extra->record1_typmod = tupTypmod1; + my_extra->record2_type = tupType2; + my_extra->record2_typmod = tupTypmod2; + } + + /* Break down the tuples into fields */ + values1 = (Datum *) palloc(ncolumns1 * sizeof(Datum)); + nulls1 = (bool *) palloc(ncolumns1 * sizeof(bool)); + heap_deform_tuple(&tuple1, tupdesc1, values1, nulls1); + values2 = (Datum *) palloc(ncolumns2 * sizeof(Datum)); + nulls2 = (bool *) palloc(ncolumns2 * sizeof(bool)); + heap_deform_tuple(&tuple2, tupdesc2, values2, nulls2); + + /* + * Scan corresponding columns, allowing for dropped columns in different + * places in the two rows. i1 and i2 are physical column indexes, j is + * the logical column index. + */ + i1 = i2 = j = 0; + while (i1 < ncolumns1 || i2 < ncolumns2) + { + /* + * Skip dropped columns + */ + if (i1 < ncolumns1 && tupdesc1->attrs[i1]->attisdropped) + { + i1++; + continue; + } + if (i2 < ncolumns2 && tupdesc2->attrs[i2]->attisdropped) + { + i2++; + continue; + } + if (i1 >= ncolumns1 || i2 >= ncolumns2) + break; /* we'll deal with mismatch below loop */ + + /* + * Have two matching columns, they must be same type + */ + if (tupdesc1->attrs[i1]->atttypid != + tupdesc2->attrs[i2]->atttypid) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("cannot compare dissimilar column types %s and %s at record column %d", + format_type_be(tupdesc1->attrs[i1]->atttypid), + format_type_be(tupdesc2->attrs[i2]->atttypid), + j + 1))); + + /* + * We consider two NULLs equal; NULL > not-NULL. + */ + if (!nulls1[i1] || !nulls2[i2]) + { + if (nulls1[i1] || nulls2[i2]) + { + result = false; + break; + } + + /* Compare the pair of elements */ + if (tupdesc1->attrs[i1]->attlen == -1) + { + Size len1, + len2; + + len1 = toast_raw_datum_size(values1[i1]); + len2 = toast_raw_datum_size(values2[i2]); + /* No need to de-toast if lengths don't match. */ + if (len1 != len2) + result = false; + else + { + struct varlena *arg1val; + struct varlena *arg2val; + + arg1val = PG_DETOAST_DATUM_PACKED(values1[i1]); + arg2val = PG_DETOAST_DATUM_PACKED(values2[i2]); + + result = (memcmp(VARDATA_ANY(arg1val), + VARDATA_ANY(arg2val), + len1 - VARHDRSZ) == 0); + + /* Only free memory if it's a copy made here. */ + if ((Pointer) arg1val != (Pointer) values1[i1]) + pfree(arg1val); + if ((Pointer) arg2val != (Pointer) values2[i2]) + pfree(arg2val); + } + } + else if (tupdesc1->attrs[i1]->attbyval) + { + result = (memcmp(&(values1[i1]), + &(values2[i2]), + tupdesc1->attrs[i1]->attlen) == 0); + } + else + { + result = (memcmp(DatumGetPointer(values1[i1]), + DatumGetPointer(values2[i2]), + tupdesc1->attrs[i1]->attlen) == 0); + } + if (!result) + break; + } + + /* equal, so continue to next column */ + i1++, i2++, j++; + } + + /* + * If we didn't break out of the loop early, check for column count + * mismatch. (We do not report such mismatch if we found unequal column + * values; is that a feature or a bug?) + */ + if (result) + { + if (i1 != ncolumns1 || i2 != ncolumns2) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("cannot compare record types with different numbers of columns"))); + } + + pfree(values1); + pfree(nulls1); + pfree(values2); + pfree(nulls2); + ReleaseTupleDesc(tupdesc1); + ReleaseTupleDesc(tupdesc2); + + /* Avoid leaking memory when handed toasted input. */ + PG_FREE_IF_COPY(record1, 0); + PG_FREE_IF_COPY(record2, 1); + + PG_RETURN_BOOL(result); +} + +Datum +record_image_ne(PG_FUNCTION_ARGS) +{ + PG_RETURN_BOOL(!DatumGetBool(record_image_eq(fcinfo))); +} + +Datum +record_image_lt(PG_FUNCTION_ARGS) +{ + PG_RETURN_BOOL(record_image_cmp(fcinfo) < 0); +} + +Datum +record_image_gt(PG_FUNCTION_ARGS) +{ + PG_RETURN_BOOL(record_image_cmp(fcinfo) > 0); +} + +Datum +record_image_le(PG_FUNCTION_ARGS) +{ + PG_RETURN_BOOL(record_image_cmp(fcinfo) <= 0); +} + +Datum +record_image_ge(PG_FUNCTION_ARGS) +{ + PG_RETURN_BOOL(record_image_cmp(fcinfo) >= 0); +} + +Datum +btrecordimagecmp(PG_FUNCTION_ARGS) +{ + PG_RETURN_INT32(record_image_cmp(fcinfo)); +} |
