diff options
Diffstat (limited to 'src/backend/catalog/pg_enum.c')
| -rw-r--r-- | src/backend/catalog/pg_enum.c | 402 |
1 files changed, 368 insertions, 34 deletions
diff --git a/src/backend/catalog/pg_enum.c b/src/backend/catalog/pg_enum.c index d544c1f477..0c384def7b 100644 --- a/src/backend/catalog/pg_enum.c +++ b/src/backend/catalog/pg_enum.c @@ -15,15 +15,24 @@ #include "access/genam.h" #include "access/heapam.h" +#include "access/xact.h" #include "catalog/catalog.h" #include "catalog/indexing.h" #include "catalog/pg_enum.h" +#include "catalog/pg_type.h" +#include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/rel.h" +#include "utils/syscache.h" #include "utils/tqual.h" + +Oid binary_upgrade_next_pg_enum_oid = InvalidOid; + +static void RenumberEnumType(Relation pg_enum, HeapTuple *existing, int nelems); static int oid_cmp(const void *p1, const void *p2); +static int sort_order_cmp(const void *p1, const void *p2); /* @@ -33,11 +42,9 @@ static int oid_cmp(const void *p1, const void *p2); * vals is a list of Value strings. */ void -EnumValuesCreate(Oid enumTypeOid, List *vals, - Oid binary_upgrade_next_pg_enum_oid) +EnumValuesCreate(Oid enumTypeOid, List *vals) { Relation pg_enum; - TupleDesc tupDesc; NameData enumlabel; Oid *oids; int elemno, @@ -50,48 +57,42 @@ EnumValuesCreate(Oid enumTypeOid, List *vals, num_elems = list_length(vals); /* - * XXX we do not bother to check the list of values for duplicates --- if + * We do not bother to check the list of values for duplicates --- if * you have any, you'll get a less-than-friendly unique-index violation. - * Is it worth trying harder? + * It is probably not worth trying harder. */ pg_enum = heap_open(EnumRelationId, RowExclusiveLock); - tupDesc = pg_enum->rd_att; /* - * Allocate oids + * Allocate OIDs for the enum's members. + * + * While this method does not absolutely guarantee that we generate no + * duplicate OIDs (since we haven't entered each oid into the table + * before allocating the next), trouble could only occur if the OID + * counter wraps all the way around before we finish. Which seems + * unlikely. */ oids = (Oid *) palloc(num_elems * sizeof(Oid)); - if (OidIsValid(binary_upgrade_next_pg_enum_oid)) - { - if (num_elems != 1) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("EnumValuesCreate() can only set a single OID"))); - oids[0] = binary_upgrade_next_pg_enum_oid; - binary_upgrade_next_pg_enum_oid = InvalidOid; - } - else + + for (elemno = 0; elemno < num_elems; elemno++) { /* - * While this method does not absolutely guarantee that we generate no - * duplicate oids (since we haven't entered each oid into the table - * before allocating the next), trouble could only occur if the oid - * counter wraps all the way around before we finish. Which seems - * unlikely. + * We assign even-numbered OIDs to all the new enum labels. This + * tells the comparison functions the OIDs are in the correct sort + * order and can be compared directly. */ - for (elemno = 0; elemno < num_elems; elemno++) - { - /* - * The pg_enum.oid is stored in user tables. This oid must be - * preserved by binary upgrades. - */ - oids[elemno] = GetNewOid(pg_enum); - } - /* sort them, just in case counter wrapped from high to low */ - qsort(oids, num_elems, sizeof(Oid), oid_cmp); + Oid new_oid; + + do { + new_oid = GetNewOid(pg_enum); + } while (new_oid & 1); + oids[elemno] = new_oid; } + /* sort them, just in case OID counter wrapped from high to low */ + qsort(oids, num_elems, sizeof(Oid), oid_cmp); + /* and make the entries */ memset(nulls, false, sizeof(nulls)); @@ -112,10 +113,11 @@ EnumValuesCreate(Oid enumTypeOid, List *vals, NAMEDATALEN - 1))); values[Anum_pg_enum_enumtypid - 1] = ObjectIdGetDatum(enumTypeOid); + values[Anum_pg_enum_enumsortorder - 1] = Float4GetDatum(elemno + 1); namestrcpy(&enumlabel, lab); values[Anum_pg_enum_enumlabel - 1] = NameGetDatum(&enumlabel); - tup = heap_form_tuple(tupDesc, values, nulls); + tup = heap_form_tuple(RelationGetDescr(pg_enum), values, nulls); HeapTupleSetOid(tup, oids[elemno]); simple_heap_insert(pg_enum, tup); @@ -164,7 +166,322 @@ EnumValuesDelete(Oid enumTypeOid) } -/* qsort comparison function */ +/* + * AddEnumLabel + * Add a new label to the enum set. By default it goes at + * the end, but the user can choose to place it before or + * after any existing set member. + */ +void +AddEnumLabel(Oid enumTypeOid, + const char *newVal, + const char *neighbor, + bool newValIsAfter) +{ + Relation pg_enum; + Oid newOid; + Datum values[Natts_pg_enum]; + bool nulls[Natts_pg_enum]; + NameData enumlabel; + HeapTuple enum_tup; + float4 newelemorder; + HeapTuple *existing; + CatCList *list; + int nelems; + int i; + + /* check length of new label is ok */ + if (strlen(newVal) > (NAMEDATALEN - 1)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("invalid enum label \"%s\"", newVal), + errdetail("Labels must be %d characters or less.", + NAMEDATALEN - 1))); + + /* + * Acquire a lock on the enum type, which we won't release until commit. + * This ensures that two backends aren't concurrently modifying the same + * enum type. Without that, we couldn't be sure to get a consistent + * view of the enum members via the syscache. Note that this does not + * block other backends from inspecting the type; see comments for + * RenumberEnumType. + */ + LockDatabaseObject(TypeRelationId, enumTypeOid, 0, ExclusiveLock); + + pg_enum = heap_open(EnumRelationId, RowExclusiveLock); + + /* If we have to renumber the existing members, we restart from here */ +restart: + + /* Get the list of existing members of the enum */ + list = SearchSysCacheList1(ENUMTYPOIDNAME, + ObjectIdGetDatum(enumTypeOid)); + nelems = list->n_members; + + /* Sort the existing members by enumsortorder */ + existing = (HeapTuple *) palloc(nelems * sizeof(HeapTuple)); + for (i = 0; i < nelems; i++) + existing[i] = &(list->members[i]->tuple); + + qsort(existing, nelems, sizeof(HeapTuple), sort_order_cmp); + + if (neighbor == NULL) + { + /* + * Put the new label at the end of the list. + * No change to existing tuples is required. + */ + if (nelems > 0) + { + Form_pg_enum en = (Form_pg_enum) GETSTRUCT(existing[nelems - 1]); + + newelemorder = en->enumsortorder + 1; + } + else + newelemorder = 1; + } + else + { + /* BEFORE or AFTER was specified */ + int nbr_index; + int other_nbr_index; + Form_pg_enum nbr_en; + Form_pg_enum other_nbr_en; + + /* Locate the neighbor element */ + for (nbr_index = 0; nbr_index < nelems; nbr_index++) + { + Form_pg_enum en = (Form_pg_enum) GETSTRUCT(existing[nbr_index]); + + if (strcmp(NameStr(en->enumlabel), neighbor) == 0) + break; + } + if (nbr_index >= nelems) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("\"%s\" is not an existing enum label", + neighbor))); + nbr_en = (Form_pg_enum) GETSTRUCT(existing[nbr_index]); + + /* + * Attempt to assign an appropriate enumsortorder value: one less + * than the smallest member, one more than the largest member, + * or halfway between two existing members. + * + * In the "halfway" case, because of the finite precision of float4, + * we might compute a value that's actually equal to one or the + * other of its neighbors. In that case we renumber the existing + * members and try again. + */ + if (newValIsAfter) + other_nbr_index = nbr_index + 1; + else + other_nbr_index = nbr_index - 1; + + if (other_nbr_index < 0) + newelemorder = nbr_en->enumsortorder - 1; + else if (other_nbr_index >= nelems) + newelemorder = nbr_en->enumsortorder + 1; + else + { + other_nbr_en = (Form_pg_enum) GETSTRUCT(existing[other_nbr_index]); + newelemorder = (nbr_en->enumsortorder + + other_nbr_en->enumsortorder) / 2; + if (newelemorder == nbr_en->enumsortorder || + newelemorder == other_nbr_en->enumsortorder) + { + RenumberEnumType(pg_enum, existing, nelems); + /* Clean up and start over */ + pfree(existing); + ReleaseCatCacheList(list); + goto restart; + } + } + } + + /* Get a new OID for the new label */ + if (OidIsValid(binary_upgrade_next_pg_enum_oid)) + { + /* + * In binary upgrades, just add the new label with the predetermined + * Oid. It's pg_upgrade's responsibility that the Oid meets + * requirements. + */ + if (neighbor != NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("ALTER TYPE ADD BEFORE/AFTER is incompatible with binary upgrade"))); + + newOid = binary_upgrade_next_pg_enum_oid; + binary_upgrade_next_pg_enum_oid = InvalidOid; + } + else + { + /* + * Normal case: we need to allocate a new Oid for the value. + * + * We want to give the new element an even-numbered Oid if it's safe, + * which is to say it compares correctly to all pre-existing even + * numbered Oids in the enum. Otherwise, we must give it an odd Oid. + */ + for (;;) + { + bool sorts_ok; + + /* Get a new OID (different from all existing pg_enum tuples) */ + newOid = GetNewOid(pg_enum); + + /* + * Detect whether it sorts correctly relative to existing + * even-numbered labels of the enum. We can ignore existing + * labels with odd Oids, since a comparison involving one of + * those will not take the fast path anyway. + */ + sorts_ok = true; + for (i = 0; i < nelems; i++) + { + HeapTuple exists_tup = existing[i]; + Form_pg_enum exists_en = (Form_pg_enum) GETSTRUCT(exists_tup); + Oid exists_oid = HeapTupleGetOid(exists_tup); + + if (exists_oid & 1) + continue; /* ignore odd Oids */ + + if (exists_en->enumsortorder < newelemorder) + { + /* should sort before */ + if (exists_oid >= newOid) + { + sorts_ok = false; + break; + } + } + else + { + /* should sort after */ + if (exists_oid <= newOid) + { + sorts_ok = false; + break; + } + } + } + + if (sorts_ok) + { + /* If it's even and sorts OK, we're done. */ + if ((newOid & 1) == 0) + break; + + /* + * If it's odd, and sorts OK, loop back to get another OID + * and try again. Probably, the next available even OID + * will sort correctly too, so it's worth trying. + */ + } + else + { + /* + * If it's odd, and does not sort correctly, we're done. + * (Probably, the next available even OID would sort + * incorrectly too, so no point in trying again.) + */ + if (newOid & 1) + break; + + /* + * If it's even, and does not sort correctly, loop back to get + * another OID and try again. (We *must* reject this case.) + */ + } + } + } + + /* Done with info about existing members */ + pfree(existing); + ReleaseCatCacheList(list); + + /* Create the new pg_enum entry */ + memset(nulls, false, sizeof(nulls)); + values[Anum_pg_enum_enumtypid - 1] = ObjectIdGetDatum(enumTypeOid); + values[Anum_pg_enum_enumsortorder - 1] = Float4GetDatum(newelemorder); + namestrcpy(&enumlabel, newVal); + values[Anum_pg_enum_enumlabel - 1] = NameGetDatum(&enumlabel); + enum_tup = heap_form_tuple(RelationGetDescr(pg_enum), values, nulls); + HeapTupleSetOid(enum_tup, newOid); + simple_heap_insert(pg_enum, enum_tup); + CatalogUpdateIndexes(pg_enum, enum_tup); + heap_freetuple(enum_tup); + + heap_close(pg_enum, RowExclusiveLock); +} + + +/* + * RenumberEnumType + * Renumber existing enum elements to have sort positions 1..n. + * + * We avoid doing this unless absolutely necessary; in most installations + * it will never happen. The reason is that updating existing pg_enum + * entries creates hazards for other backends that are concurrently reading + * pg_enum with SnapshotNow semantics. A concurrent SnapshotNow scan could + * see both old and new versions of an updated row as valid, or neither of + * them, if the commit happens between scanning the two versions. It's + * also quite likely for a concurrent scan to see an inconsistent set of + * rows (some members updated, some not). + * + * We can avoid these risks by reading pg_enum with an MVCC snapshot + * instead of SnapshotNow, but that forecloses use of the syscaches. + * We therefore make the following choices: + * + * 1. Any code that is interested in the enumsortorder values MUST read + * pg_enum with an MVCC snapshot, or else acquire lock on the enum type + * to prevent concurrent execution of AddEnumLabel(). The risk of + * seeing inconsistent values of enumsortorder is too high otherwise. + * + * 2. Code that is not examining enumsortorder can use a syscache + * (for example, enum_in and enum_out do so). The worst that can happen + * is a transient failure to find any valid value of the row. This is + * judged acceptable in view of the infrequency of use of RenumberEnumType. + */ +static void +RenumberEnumType(Relation pg_enum, HeapTuple *existing, int nelems) +{ + int i; + + /* + * We should only need to increase existing elements' enumsortorders, + * never decrease them. Therefore, work from the end backwards, to avoid + * unwanted uniqueness violations. + */ + for (i = nelems - 1; i >= 0; i--) + { + HeapTuple newtup; + Form_pg_enum en; + float4 newsortorder; + + newtup = heap_copytuple(existing[i]); + en = (Form_pg_enum) GETSTRUCT(newtup); + + newsortorder = i + 1; + if (en->enumsortorder != newsortorder) + { + en->enumsortorder = newsortorder; + + simple_heap_update(pg_enum, &newtup->t_self, newtup); + + CatalogUpdateIndexes(pg_enum, newtup); + } + + heap_freetuple(newtup); + } + + /* Make the updates visible */ + CommandCounterIncrement(); +} + + +/* qsort comparison function for oids */ static int oid_cmp(const void *p1, const void *p2) { @@ -177,3 +494,20 @@ oid_cmp(const void *p1, const void *p2) return 1; return 0; } + +/* qsort comparison function for tuples by sort order */ +static int +sort_order_cmp(const void *p1, const void *p2) +{ + HeapTuple v1 = *((const HeapTuple *) p1); + HeapTuple v2 = *((const HeapTuple *) p2); + Form_pg_enum en1 = (Form_pg_enum) GETSTRUCT(v1); + Form_pg_enum en2 = (Form_pg_enum) GETSTRUCT(v2); + + if (en1->enumsortorder < en2->enumsortorder) + return -1; + else if (en1->enumsortorder > en2->enumsortorder) + return 1; + else + return 0; +} |
