diff options
Diffstat (limited to 'src/backend/catalog/heap.c')
| -rw-r--r-- | src/backend/catalog/heap.c | 441 |
1 files changed, 333 insertions, 108 deletions
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 5ad0f7c413..5cb9da1cae 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.99 1999/09/29 16:05:56 wieck Exp $ + * $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.100 1999/10/03 23:55:26 tgl Exp $ * * * INTERFACE ROUTINES @@ -44,9 +44,14 @@ #include "catalog/pg_proc.h" #include "catalog/pg_relcheck.h" #include "commands/trigger.h" +#include "optimizer/clauses.h" +#include "optimizer/planmain.h" #include "optimizer/tlist.h" +#include "optimizer/var.h" +#include "parser/parse_clause.h" #include "parser/parse_coerce.h" #include "parser/parse_expr.h" +#include "parser/parse_relation.h" #include "rewrite/rewriteRemove.h" #include "storage/smgr.h" #include "tcop/tcopprot.h" @@ -68,6 +73,9 @@ static void RelationRemoveIndexes(Relation relation); static void RelationRemoveInheritance(Relation relation); static void RemoveFromNoNameRelList(Relation r); static void AddNewRelationType(char *typeName, Oid new_rel_oid); +static void StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin, + bool updatePgAttribute); +static void StoreRelCheck(Relation rel, char *ccname, char *ccbin); static void StoreConstraints(Relation rel); static void RemoveConstraints(Relation rel); @@ -1650,86 +1658,66 @@ DestroyNoNameRels(void) tempRels = NULL; } - +/* + * Store a default expression for column attnum of relation rel. + * The expression must be presented as a nodeToString() string. + * If updatePgAttribute is true, update the pg_attribute entry + * for the column to show that a default exists. + */ static void -StoreAttrDefault(Relation rel, AttrDefault *attrdef) +StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin, + bool updatePgAttribute) { - char str[MAX_PARSE_BUFFER]; - char cast[2 * NAMEDATALEN] = {0}; - Form_pg_attribute atp = rel->rd_att->attrs[attrdef->adnum - 1]; - List *queryTree_list; - List *planTree_list; - Query *query; - TargetEntry *te; - Resdom *resdom; + Form_pg_attribute atp = rel->rd_att->attrs[attnum - 1]; Node *expr; Oid type; - char *adbin; - MemoryContext oldcxt; + RangeTblEntry *rte; + char *adsrc; Relation adrel; Relation idescs[Num_pg_attrdef_indices]; HeapTuple tuple; Datum values[4]; - char nulls[4] = {' ', ' ', ' ', ' '}; - extern GlobalMemory CacheCxt; - -start: + static char nulls[4] = {' ', ' ', ' ', ' '}; + Relation attrrel; + Relation attridescs[Num_pg_attr_indices]; + HeapTuple atttup; + Form_pg_attribute attStruct; - /* - * Surround table name with double quotes to allow mixed-case and - * whitespaces in names. - BGA 1998-11-14 - */ - snprintf(str, MAX_PARSE_BUFFER, - "select %s%s from \"%.*s\"", attrdef->adsrc, cast, - NAMEDATALEN, rel->rd_rel->relname.data); - setheapoverride(true); - planTree_list = pg_parse_and_plan(str, NULL, 0, - &queryTree_list, None, FALSE); - setheapoverride(false); - query = (Query *) lfirst(queryTree_list); - - if (length(query->rtable) > 1 || - flatten_tlist(query->targetList) != NIL) - elog(ERROR, "Cannot use attribute(s) in DEFAULT clause"); - te = (TargetEntry *) lfirst(query->targetList); - resdom = te->resdom; - expr = te->expr; + expr = stringToNode(adbin); type = exprType(expr); if (type != atp->atttypid) { - if (IS_BINARY_COMPATIBLE(type, atp->atttypid)) - ; /* use without change */ - else if (can_coerce_type(1, &(type), &(atp->atttypid))) - expr = coerce_type(NULL, (Node *) expr, type, atp->atttypid, - atp->atttypmod); - else if (IsA(expr, Const)) - { - if (*cast != 0) - elog(ERROR, "DEFAULT clause const type '%s' mismatched with column type '%s'", - typeidTypeName(type), typeidTypeName(atp->atttypid)); - snprintf(cast, 2 * NAMEDATALEN, ":: %s", typeidTypeName(atp->atttypid)); - goto start; - } - else - elog(ERROR, "DEFAULT clause type '%s' mismatched with column type '%s'", - typeidTypeName(type), typeidTypeName(atp->atttypid)); + /* + * Check that it will be possible to coerce the expression + * to the column's type. We store the expression without + * coercion, however, to avoid premature coercion in cases like + * CREATE TABLE tbl (fld datetime DEFAULT 'now'); + */ + coerce_type(NULL, expr, type, atp->atttypid, atp->atttypmod); } - adbin = nodeToString(expr); - oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt); - attrdef->adbin = pstrdup(adbin); - (void) MemoryContextSwitchTo(oldcxt); - pfree(adbin); + /* + * deparse_expression needs a RangeTblEntry list, so make one + */ + rte = makeNode(RangeTblEntry); + rte->relname = RelationGetRelationName(rel)->data; + rte->refname = RelationGetRelationName(rel)->data; + rte->relid = RelationGetRelid(rel); + rte->inh = false; + rte->inFromCl = true; + rte->skipAcl = false; + adsrc = deparse_expression(expr, lcons(lcons(rte, NIL), NIL), false); values[Anum_pg_attrdef_adrelid - 1] = rel->rd_id; - values[Anum_pg_attrdef_adnum - 1] = attrdef->adnum; - values[Anum_pg_attrdef_adbin - 1] = PointerGetDatum(textin(attrdef->adbin)); - values[Anum_pg_attrdef_adsrc - 1] = PointerGetDatum(textin(attrdef->adsrc)); + values[Anum_pg_attrdef_adnum - 1] = attnum; + values[Anum_pg_attrdef_adbin - 1] = PointerGetDatum(textin(adbin)); + values[Anum_pg_attrdef_adsrc - 1] = PointerGetDatum(textin(adsrc)); adrel = heap_openr(AttrDefaultRelationName, RowExclusiveLock); tuple = heap_formtuple(adrel->rd_att, values, nulls); - CatalogOpenIndices(Num_pg_attrdef_indices, Name_pg_attrdef_indices, idescs); heap_insert(adrel, tuple); + CatalogOpenIndices(Num_pg_attrdef_indices, Name_pg_attrdef_indices, + idescs); CatalogIndexInsert(idescs, Num_pg_attrdef_indices, adrel, tuple); CatalogCloseIndices(Num_pg_attrdef_indices, idescs); heap_close(adrel, RowExclusiveLock); @@ -1737,62 +1725,78 @@ start: pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1])); pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1])); pfree(tuple); - + pfree(adsrc); + + if (! updatePgAttribute) + return; /* done if pg_attribute is OK */ + + attrrel = heap_openr(AttributeRelationName, RowExclusiveLock); + atttup = SearchSysCacheTupleCopy(ATTNUM, + ObjectIdGetDatum(rel->rd_id), + (Datum) attnum, 0, 0); + if (!HeapTupleIsValid(atttup)) + elog(ERROR, "cache lookup of attribute %d in relation %u failed", + attnum, rel->rd_id); + attStruct = (Form_pg_attribute) GETSTRUCT(atttup); + if (! attStruct->atthasdef) + { + attStruct->atthasdef = true; + heap_replace(attrrel, &atttup->t_self, atttup, NULL); + /* keep catalog indices current */ + CatalogOpenIndices(Num_pg_attr_indices, Name_pg_attr_indices, + attridescs); + CatalogIndexInsert(attridescs, Num_pg_attr_indices, attrrel, atttup); + CatalogCloseIndices(Num_pg_attr_indices, attridescs); + } + heap_close(attrrel, RowExclusiveLock); + pfree(atttup); } +/* + * Store a constraint expression for the given relation. + * The expression must be presented as a nodeToString() string. + * + * Caller is responsible for updating the count of constraints + * in the pg_class entry for the relation. + */ static void -StoreRelCheck(Relation rel, ConstrCheck *check) +StoreRelCheck(Relation rel, char *ccname, char *ccbin) { - char str[MAX_PARSE_BUFFER]; - List *queryTree_list; - List *planTree_list; - Query *query; - Plan *plan; - List *qual; - char *ccbin; - MemoryContext oldcxt; + Node *expr; + RangeTblEntry *rte; + char *ccsrc; Relation rcrel; Relation idescs[Num_pg_relcheck_indices]; HeapTuple tuple; Datum values[4]; - char nulls[4] = {' ', ' ', ' ', ' '}; - extern GlobalMemory CacheCxt; + static char nulls[4] = {' ', ' ', ' ', ' '}; /* - * Check for table's existance. Surround table name with double-quotes - * to allow mixed-case and whitespace names. - thomas 1998-11-12 + * Convert condition to a normal boolean expression tree. */ - snprintf(str, MAX_PARSE_BUFFER, - "select 1 from \"%.*s\" where %s", - NAMEDATALEN, rel->rd_rel->relname.data, check->ccsrc); - setheapoverride(true); - planTree_list = pg_parse_and_plan(str, NULL, 0, - &queryTree_list, None, FALSE); - setheapoverride(false); - query = (Query *) lfirst(queryTree_list); - - if (length(query->rtable) > 1) - elog(ERROR, "Only relation '%.*s' can be referenced", - NAMEDATALEN, rel->rd_rel->relname.data); - - plan = (Plan *) lfirst(planTree_list); - qual = plan->qual; - - ccbin = nodeToString(qual); - oldcxt = MemoryContextSwitchTo((MemoryContext) CacheCxt); - check->ccbin = (char *) palloc(strlen(ccbin) + 1); - strcpy(check->ccbin, ccbin); - (void) MemoryContextSwitchTo(oldcxt); - pfree(ccbin); + expr = stringToNode(ccbin); + expr = (Node *) make_ands_explicit((List *) expr); + /* + * deparse_expression needs a RangeTblEntry list, so make one + */ + rte = makeNode(RangeTblEntry); + rte->relname = RelationGetRelationName(rel)->data; + rte->refname = RelationGetRelationName(rel)->data; + rte->relid = RelationGetRelid(rel); + rte->inh = false; + rte->inFromCl = true; + rte->skipAcl = false; + ccsrc = deparse_expression(expr, lcons(lcons(rte, NIL), NIL), false); values[Anum_pg_relcheck_rcrelid - 1] = rel->rd_id; - values[Anum_pg_relcheck_rcname - 1] = PointerGetDatum(namein(check->ccname)); - values[Anum_pg_relcheck_rcbin - 1] = PointerGetDatum(textin(check->ccbin)); - values[Anum_pg_relcheck_rcsrc - 1] = PointerGetDatum(textin(check->ccsrc)); + values[Anum_pg_relcheck_rcname - 1] = PointerGetDatum(namein(ccname)); + values[Anum_pg_relcheck_rcbin - 1] = PointerGetDatum(textin(ccbin)); + values[Anum_pg_relcheck_rcsrc - 1] = PointerGetDatum(textin(ccsrc)); rcrel = heap_openr(RelCheckRelationName, RowExclusiveLock); tuple = heap_formtuple(rcrel->rd_att, values, nulls); - CatalogOpenIndices(Num_pg_relcheck_indices, Name_pg_relcheck_indices, idescs); heap_insert(rcrel, tuple); + CatalogOpenIndices(Num_pg_relcheck_indices, Name_pg_relcheck_indices, + idescs); CatalogIndexInsert(idescs, Num_pg_relcheck_indices, rcrel, tuple); CatalogCloseIndices(Num_pg_relcheck_indices, idescs); heap_close(rcrel, RowExclusiveLock); @@ -1801,8 +1805,19 @@ StoreRelCheck(Relation rel, ConstrCheck *check) pfree(DatumGetPointer(values[Anum_pg_relcheck_rcbin - 1])); pfree(DatumGetPointer(values[Anum_pg_relcheck_rcsrc - 1])); pfree(tuple); + pfree(ccsrc); } +/* + * Store defaults and constraints passed in via the tuple constraint struct. + * + * NOTE: only pre-cooked expressions will be passed this way, which is to + * say expressions inherited from an existing relation. Newly parsed + * expressions can be added later, by direct calls to StoreAttrDefault + * and StoreRelCheck (see AddRelationRawConstraints()). We assume that + * pg_attribute and pg_class entries for the relation were already set + * to reflect the existence of these defaults/constraints. + */ static void StoreConstraints(Relation rel) { @@ -1812,19 +1827,229 @@ StoreConstraints(Relation rel) if (!constr) return; - if (constr->num_defval > 0) + for (i = 0; i < constr->num_defval; i++) + StoreAttrDefault(rel, constr->defval[i].adnum, + constr->defval[i].adbin, false); + + for (i = 0; i < constr->num_check; i++) + StoreRelCheck(rel, constr->check[i].ccname, + constr->check[i].ccbin); +} + +/* + * AddRelationRawConstraints + * + * Add raw (not-yet-transformed) column default expressions and/or constraint + * check expressions to an existing relation. This is defined to do both + * for efficiency in DefineRelation, but of course you can do just one or + * the other by passing empty lists. + * + * rel: relation to be modified + * rawColDefaults: list of RawColumnDefault structures + * rawConstraints: list of Constraint nodes + * + * All entries in rawColDefaults will be processed. Entries in rawConstraints + * will be processed only if they are CONSTR_CHECK type and contain a "raw" + * expression. + * + * NB: caller should have opened rel with AccessExclusiveLock, and should + * hold that lock till end of transaction. + */ +void +AddRelationRawConstraints(Relation rel, + List *rawColDefaults, + List *rawConstraints) +{ + char *relname = RelationGetRelationName(rel)->data; + TupleDesc tupleDesc; + TupleConstr *oldconstr; + int numoldchecks; + ConstrCheck *oldchecks; + ParseState *pstate; + int numchecks; + List *listptr; + Relation relrel; + Relation relidescs[Num_pg_class_indices]; + HeapTuple reltup; + Form_pg_class relStruct; + + /* + * Get info about existing constraints. + */ + tupleDesc = RelationGetDescr(rel); + oldconstr = tupleDesc->constr; + if (oldconstr) + { + numoldchecks = oldconstr->num_check; + oldchecks = oldconstr->check; + } + else { - for (i = 0; i < constr->num_defval; i++) - StoreAttrDefault(rel, &(constr->defval[i])); + numoldchecks = 0; + oldchecks = NULL; } - if (constr->num_check > 0) + /* + * Create a dummy ParseState and insert the target relation as + * its sole rangetable entry. We need a ParseState for transformExpr. + */ + pstate = make_parsestate(NULL); + makeRangeTable(pstate, NULL, NULL); + addRangeTableEntry(pstate, relname, relname, false, true); + + /* + * Process column default expressions. + */ + foreach(listptr, rawColDefaults) { - for (i = 0; i < constr->num_check; i++) - StoreRelCheck(rel, &(constr->check[i])); + RawColumnDefault *colDef = (RawColumnDefault *) lfirst(listptr); + Node *expr; + + Assert(colDef->raw_default != NULL); + /* + * Transform raw parsetree to executable expression. + */ + expr = transformExpr(pstate, colDef->raw_default, EXPR_COLUMN_FIRST); + /* + * Make sure default expr does not refer to any vars. + */ + if (contain_var_clause(expr)) + elog(ERROR, "Cannot use attribute(s) in DEFAULT clause"); + /* + * Might as well try to reduce any constant expressions. + */ + expr = eval_const_expressions(expr); + /* + * Must fix opids, in case any operators remain... + */ + fix_opids(expr); + /* + * OK, store it. + */ + StoreAttrDefault(rel, colDef->attnum, nodeToString(expr), true); } - return; + /* + * Process constraint expressions. + */ + numchecks = numoldchecks; + foreach(listptr, rawConstraints) + { + Constraint *cdef = (Constraint *) lfirst(listptr); + char *ccname; + Node *expr; + + if (cdef->contype != CONSTR_CHECK || cdef->raw_expr == NULL) + continue; + Assert(cdef->cooked_expr == NULL); + + /* Check name uniqueness, or generate a new name */ + if (cdef->name != NULL) + { + int i; + List *listptr2; + + ccname = cdef->name; + /* Check against old constraints */ + for (i = 0; i < numoldchecks; i++) + { + if (strcmp(oldchecks[i].ccname, ccname) == 0) + elog(ERROR, "Duplicate CHECK constraint name: '%s'", + ccname); + } + /* Check against other new constraints */ + foreach(listptr2, rawConstraints) + { + Constraint *cdef2 = (Constraint *) lfirst(listptr2); + + if (cdef2 == cdef || + cdef2->contype != CONSTR_CHECK || + cdef2->raw_expr == NULL || + cdef2->name == NULL) + continue; + if (strcmp(cdef2->name, ccname) == 0) + elog(ERROR, "Duplicate CHECK constraint name: '%s'", + ccname); + } + } + else + { + ccname = (char *) palloc(NAMEDATALEN); + snprintf(ccname, NAMEDATALEN, "$%d", numchecks + 1); + } + /* + * Transform raw parsetree to executable expression. + */ + expr = transformExpr(pstate, cdef->raw_expr, EXPR_COLUMN_FIRST); + /* + * Make sure no outside relations are referred to. + */ + if (length(pstate->p_rtable) != 1) + elog(ERROR, "Only relation '%s' can be referenced in CHECK", + relname); + /* + * Might as well try to reduce any constant expressions. + */ + expr = eval_const_expressions(expr); + /* + * Constraints are evaluated with execQual, which expects an + * implicit-AND list, so convert expression to implicit-AND form. + * (We could go so far as to convert to CNF, but that's probably + * overkill...) + */ + expr = (Node *) make_ands_implicit((Expr *) expr); + /* + * Must fix opids in operator clauses. + */ + fix_opids(expr); + /* + * OK, store it. + */ + StoreRelCheck(rel, ccname, nodeToString(expr)); + + numchecks++; + } + + /* + * Update the count of constraints in the relation's pg_class tuple. + * We do this even if there was no change, in order to ensure that an + * SI update message is sent out for the pg_class tuple, which will + * force other backends to rebuild their relcache entries for the rel. + * (Of course, for a newly created rel there is no need for an SI message, + * but for ALTER TABLE ADD ATTRIBUTE this'd be important.) + */ + relrel = heap_openr(RelationRelationName, RowExclusiveLock); + reltup = SearchSysCacheTupleCopy(RELOID, + ObjectIdGetDatum(rel->rd_id), + 0, 0, 0); + if (!HeapTupleIsValid(reltup)) + elog(ERROR, "cache lookup of relation %u failed", rel->rd_id); + relStruct = (Form_pg_class) GETSTRUCT(reltup); + + relStruct->relchecks = numchecks; + + heap_replace(relrel, &reltup->t_self, reltup, NULL); + + /* keep catalog indices current */ + CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, + relidescs); + CatalogIndexInsert(relidescs, Num_pg_class_indices, relrel, reltup); + CatalogCloseIndices(Num_pg_class_indices, relidescs); + + heap_close(relrel, RowExclusiveLock); + pfree(reltup); + + /* + * Force rebuild of our own relcache entry, otherwise subsequent commands + * in this transaction won't see the new defaults/constraints. + * Must bump command counter or relcache rebuild won't see 'em either. + * + * (This might seem unnecessary, since we are sending out an SI message; + * but if the relation has just been created then relcache.c will ignore + * the SI message on the grounds that the rel is transaction-local...) + */ + CommandCounterIncrement(); + RelationRebuildRelation(rel); } static void |
