diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2016-01-21 19:47:15 -0500 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2016-01-21 19:47:15 -0500 |
commit | be44ed27b86ebd165bbedf06a4ac5a8eb943d43c (patch) | |
tree | a8371a5b5e3052dbb3b3118c24956f67c9d3b591 /src/backend/access/gist/gistvalidate.c | |
parent | b99551832e79c915e4d877cf0a072120bd248748 (diff) | |
download | postgresql-be44ed27b86ebd165bbedf06a4ac5a8eb943d43c.tar.gz |
Improve index AMs' opclass validation procedures.
The amvalidate functions added in commit 65c5fcd353a859da were on the
crude side. Improve them in a few ways:
* Perform signature checking for operators and support functions.
* Apply more thorough checks for missing operators and functions,
where possible.
* Instead of reporting problems as ERRORs, report most problems as INFO
messages and make the amvalidate function return FALSE. This allows
more than one problem to be discovered per run.
* Report object names rather than OIDs, and work a bit harder on making
the messages understandable.
Also, remove a few more opr_sanity regression test queries that are
now superseded by the amvalidate checks.
Diffstat (limited to 'src/backend/access/gist/gistvalidate.c')
-rw-r--r-- | src/backend/access/gist/gistvalidate.c | 241 |
1 files changed, 191 insertions, 50 deletions
diff --git a/src/backend/access/gist/gistvalidate.c b/src/backend/access/gist/gistvalidate.c index 86b5aeaec5..190b9787bb 100644 --- a/src/backend/access/gist/gistvalidate.c +++ b/src/backend/access/gist/gistvalidate.c @@ -13,12 +13,16 @@ */ #include "postgres.h" +#include "access/amvalidate.h" #include "access/gist_private.h" #include "access/htup_details.h" #include "catalog/pg_amop.h" #include "catalog/pg_amproc.h" #include "catalog/pg_opclass.h" -#include "utils/catcache.h" +#include "catalog/pg_opfamily.h" +#include "catalog/pg_type.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" #include "utils/syscache.h" @@ -28,15 +32,22 @@ bool gistvalidate(Oid opclassoid) { + bool result = true; HeapTuple classtup; Form_pg_opclass classform; Oid opfamilyoid; Oid opcintype; - int numclassops; - int32 classfuncbits; + Oid opckeytype; + char *opclassname; + HeapTuple familytup; + Form_pg_opfamily familyform; + char *opfamilyname; CatCList *proclist, *oprlist; + List *grouplist; + OpFamilyOpFuncGroup *opclassgroup; int i; + ListCell *lc; /* Fetch opclass information */ classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid)); @@ -46,88 +57,218 @@ gistvalidate(Oid opclassoid) opfamilyoid = classform->opcfamily; opcintype = classform->opcintype; + opckeytype = classform->opckeytype; + if (!OidIsValid(opckeytype)) + opckeytype = opcintype; + opclassname = NameStr(classform->opcname); - ReleaseSysCache(classtup); + /* Fetch opfamily information */ + familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid)); + if (!HeapTupleIsValid(familytup)) + elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid); + familyform = (Form_pg_opfamily) GETSTRUCT(familytup); + + opfamilyname = NameStr(familyform->opfname); /* Fetch all operators and support functions of the opfamily */ oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid)); proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid)); - /* We'll track the ops and functions belonging to the named opclass */ - numclassops = 0; - classfuncbits = 0; - - /* Check support functions */ + /* Check individual support functions */ for (i = 0; i < proclist->n_members; i++) { HeapTuple proctup = &proclist->members[i]->tuple; Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup); + bool ok; + + /* + * All GiST support functions should be registered with matching + * left/right types + */ + if (procform->amproclefttype != procform->amprocrighttype) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("gist opfamily %s contains support procedure %s with cross-type registration", + opfamilyname, + format_procedure(procform->amproc)))); + result = false; + } + + /* + * We can't check signatures except within the specific opclass, since + * we need to know the associated opckeytype in many cases. + */ + if (procform->amproclefttype != opcintype) + continue; - /* Check that only allowed procedure numbers exist */ - if (procform->amprocnum < 1 || - procform->amprocnum > GISTNProcs) - ereport(ERROR, + /* Check procedure numbers and function signatures */ + switch (procform->amprocnum) + { + case GIST_CONSISTENT_PROC: + ok = check_amproc_signature(procform->amproc, BOOLOID, false, + 5, 5, INTERNALOID, opcintype, + INT2OID, OIDOID, INTERNALOID); + break; + case GIST_UNION_PROC: + ok = check_amproc_signature(procform->amproc, opckeytype, false, + 2, 2, INTERNALOID, INTERNALOID); + break; + case GIST_COMPRESS_PROC: + case GIST_DECOMPRESS_PROC: + case GIST_FETCH_PROC: + ok = check_amproc_signature(procform->amproc, INTERNALOID, true, + 1, 1, INTERNALOID); + break; + case GIST_PENALTY_PROC: + ok = check_amproc_signature(procform->amproc, INTERNALOID, true, + 3, 3, INTERNALOID, + INTERNALOID, INTERNALOID); + break; + case GIST_PICKSPLIT_PROC: + ok = check_amproc_signature(procform->amproc, INTERNALOID, true, + 2, 2, INTERNALOID, INTERNALOID); + break; + case GIST_EQUAL_PROC: + ok = check_amproc_signature(procform->amproc, INTERNALOID, false, + 3, 3, opckeytype, opckeytype, + INTERNALOID); + break; + case GIST_DISTANCE_PROC: + ok = check_amproc_signature(procform->amproc, FLOAT8OID, false, + 5, 5, INTERNALOID, opcintype, + INT2OID, OIDOID, INTERNALOID); + break; + default: + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("gist opfamily %s contains function %s with invalid support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + continue; /* don't want additional message */ + } + + if (!ok) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist opfamily %u contains invalid support number %d for procedure %u", - opfamilyoid, - procform->amprocnum, procform->amproc))); - - /* Remember functions that are specifically for the named opclass */ - if (procform->amproclefttype == opcintype && - procform->amprocrighttype == opcintype) - classfuncbits |= (1 << procform->amprocnum); + errmsg("gist opfamily %s contains function %s with wrong signature for support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + } } - /* Check operators */ + /* Check individual operators */ for (i = 0; i < oprlist->n_members; i++) { HeapTuple oprtup = &oprlist->members[i]->tuple; Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup); + Oid op_rettype; /* TODO: Check that only allowed strategy numbers exist */ if (oprform->amopstrategy < 1) - ereport(ERROR, + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist opfamily %u contains invalid strategy number %d for operator %u", - opfamilyoid, - oprform->amopstrategy, oprform->amopopr))); - - /* GiST supports ORDER BY operators, but must have distance proc */ - if (oprform->amoppurpose != AMOP_SEARCH && - oprform->amoplefttype == opcintype && - oprform->amoprighttype == opcintype && - (classfuncbits & (1 << GIST_DISTANCE_PROC)) == 0) - ereport(ERROR, + errmsg("gist opfamily %s contains operator %s with invalid strategy number %d", + opfamilyname, + format_operator(oprform->amopopr), + oprform->amopstrategy))); + result = false; + } + + /* GiST supports ORDER BY operators */ + if (oprform->amoppurpose != AMOP_SEARCH) + { + /* ... but must have matching distance proc */ + if (!OidIsValid(get_opfamily_proc(opfamilyoid, + oprform->amoplefttype, + oprform->amoplefttype, + GIST_DISTANCE_PROC))) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("gist opfamily %s contains unsupported ORDER BY specification for operator %s", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } + /* ... and operator result must match the claimed btree opfamily */ + op_rettype = get_op_rettype(oprform->amopopr); + if (!opfamily_can_sort_type(oprform->amopsortfamily, op_rettype)) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("gist opfamily %s contains incorrect ORDER BY opfamily specification for operator %s", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } + } + else + { + /* Search operators must always return bool */ + op_rettype = BOOLOID; + } + + /* Check operator signature */ + if (!check_amop_signature(oprform->amopopr, op_rettype, + oprform->amoplefttype, + oprform->amoprighttype)) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist opfamily %u contains unsupported ORDER BY specification for operator %u", - opfamilyoid, oprform->amopopr))); + errmsg("gist opfamily %s contains operator %s with wrong signature", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } + } + + /* Now check for inconsistent groups of operators/functions */ + grouplist = identify_opfamily_groups(oprlist, proclist); + opclassgroup = NULL; + foreach(lc, grouplist) + { + OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc); - /* Count operators that are specifically for the named opclass */ - /* XXX we consider only lefttype here */ - if (oprform->amoplefttype == opcintype) - numclassops++; + /* Remember the group exactly matching the test opclass */ + if (thisgroup->lefttype == opcintype && + thisgroup->righttype == opcintype) + opclassgroup = thisgroup; + + /* + * There is not a lot we can do to check the operator sets, since each + * GiST opclass is more or less a law unto itself, and some contain + * only operators that are binary-compatible with the opclass datatype + * (meaning that empty operator sets can be OK). That case also means + * that we shouldn't insist on nonempty function sets except for the + * opclass's own group. + */ } - /* Check that the named opclass is complete */ - if (numclassops == 0) - ereport(ERROR, - (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist opclass %u is missing operator(s)", - opclassoid))); + /* Check that the originally-named opclass is complete */ for (i = 1; i <= GISTNProcs; i++) { - if ((classfuncbits & (1 << i)) != 0) + if (opclassgroup && (opclassgroup->functionset & (1 << i)) != 0) continue; /* got it */ if (i == GIST_DISTANCE_PROC || i == GIST_FETCH_PROC) continue; /* optional methods */ - ereport(ERROR, + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist opclass %u is missing required support function %d", - opclassoid, i))); + errmsg("gist opclass %s is missing support function %d", + opclassname, i))); + result = false; } ReleaseCatCacheList(proclist); ReleaseCatCacheList(oprlist); + ReleaseSysCache(familytup); + ReleaseSysCache(classtup); - return true; + return result; } |