diff options
Diffstat (limited to 'contrib/dbmirror/pending.c')
| -rw-r--r-- | contrib/dbmirror/pending.c | 243 |
1 files changed, 85 insertions, 158 deletions
diff --git a/contrib/dbmirror/pending.c b/contrib/dbmirror/pending.c index 3ed9d2128c..36f5837bbd 100644 --- a/contrib/dbmirror/pending.c +++ b/contrib/dbmirror/pending.c @@ -1,7 +1,7 @@ /**************************************************************************** * pending.c - * $Id: pending.c,v 1.21 2005/03/29 00:16:48 tgl Exp $ - * $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.21 2005/03/29 00:16:48 tgl Exp $ + * $Id: pending.c,v 1.22 2005/10/02 23:50:05 tgl Exp $ + * $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.22 2005/10/02 23:50:05 tgl Exp $ * * This file contains a trigger for Postgresql-7.x to record changes to tables * to a pending table for mirroring. @@ -30,12 +30,13 @@ * * ***************************************************************************/ -#include <postgres.h> +#include "postgres.h" -#include <executor/spi.h> -#include <commands/trigger.h> -#include <utils/lsyscache.h> -#include <utils/array.h> +#include "executor/spi.h" +#include "commands/sequence.h" +#include "commands/trigger.h" +#include "utils/lsyscache.h" +#include "utils/array.h" enum FieldUsage { @@ -81,11 +82,11 @@ PG_FUNCTION_INFO_V1(recordchange); -extern Datum nextval(PG_FUNCTION_ARGS); -extern Datum setval(PG_FUNCTION_ARGS); +extern Datum setval_mirror(PG_FUNCTION_ARGS); +extern Datum setval3_mirror(PG_FUNCTION_ARGS); +extern Datum nextval_mirror(PG_FUNCTION_ARGS); -int saveSequenceUpdate(const text *sequenceName, - int nextSequenceValue); +static void saveSequenceUpdate(Oid relid, int64 nextValue, bool iscalled); /***************************************************************************** @@ -310,11 +311,9 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData, SPI_pfree(cpKeyData); if (iRetCode != SPI_OK_INSERT) - { - ereport(ERROR, (errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION) - ,errmsg("error inserting row in pendingDelete"))); - return -1; - } + ereport(ERROR, + (errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("error inserting row in pendingDelete"))); debug_msg("insert successful"); @@ -583,161 +582,75 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid, } -PG_FUNCTION_INFO_V1(setval); +/* + * Support for mirroring sequence objects. + */ + +PG_FUNCTION_INFO_V1(setval_mirror); Datum -setval(PG_FUNCTION_ARGS) +setval_mirror(PG_FUNCTION_ARGS) { + Oid relid = PG_GETARG_OID(0); + int64 next = PG_GETARG_INT64(1); + int64 result; + result = DatumGetInt64(DirectFunctionCall2(setval_oid, + ObjectIdGetDatum(relid), + Int64GetDatum(next))); - text *sequenceName; - - Oid setvalArgTypes[3] = {TEXTOID, INT4OID,BOOLOID}; - int nextValue; - void *setvalPlan = NULL; - Datum setvalData[3]; - const char *setvalQuery = "SELECT setval_pg($1,$2,$3)"; - int ret; - char is_called; - - sequenceName = PG_GETARG_TEXT_P(0); - nextValue = PG_GETARG_INT32(1); - is_called = PG_GETARG_BOOL(2); - - setvalData[0] = PointerGetDatum(sequenceName); - setvalData[1] = Int32GetDatum(nextValue); - if(PG_NARGS() > 2) - { - setvalData[2] = BoolGetDatum(is_called); - } - else - { - setvalData[2]=1; - } - - if (SPI_connect() < 0) - { - ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), - errmsg("dbmirror:setval could not connect to SPI"))); - return -1; - } - - setvalPlan = SPI_prepare(setvalQuery, 3, setvalArgTypes); - if (setvalPlan == NULL) - { - ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), - errmsg("dbmirror:setval could not prepare plan"))); - return -1; - } - - ret = SPI_execp(setvalPlan, setvalData, NULL, 1); - - if (ret != SPI_OK_SELECT || SPI_processed != 1) - return -1; - - debug_msg2("dbmirror:setval: setval_pg returned ok:%d", nextValue); - - ret = saveSequenceUpdate(sequenceName, nextValue); - - SPI_pfree(setvalPlan); - - SPI_finish(); - debug_msg("dbmirror:setval about to return"); - return Int64GetDatum(nextValue); + saveSequenceUpdate(relid, result, true); + PG_RETURN_INT64(result); } - - -PG_FUNCTION_INFO_V1(nextval); +PG_FUNCTION_INFO_V1(setval3_mirror); Datum -nextval(PG_FUNCTION_ARGS) +setval3_mirror(PG_FUNCTION_ARGS) { - text *sequenceName; - - const char *nextvalQuery = "SELECT nextval_pg($1)"; - Oid nextvalArgTypes[1] = {TEXTOID}; - void *nextvalPlan = NULL; - Datum nextvalData[1]; - - - int ret; - HeapTuple resTuple; - char isNull; - int nextSequenceValue; - - - - debug_msg("dbmirror:nextval Starting pending.so:nextval"); - - - sequenceName = PG_GETARG_TEXT_P(0); - - if (SPI_connect() < 0) - { - ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), - errmsg("dbmirror:nextval could not connect to SPI"))); - return -1; - } - - nextvalPlan = SPI_prepare(nextvalQuery, 1, nextvalArgTypes); - - - debug_msg("prepared plan to call nextval_pg"); - - - if (nextvalPlan == NULL) - { - ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), - errmsg("dbmirror:nextval error creating plan"))); - return -1; - } - nextvalData[0] = PointerGetDatum(sequenceName); - - ret = SPI_execp(nextvalPlan, nextvalData, NULL, 1); - - debug_msg("dbmirror:Executed call to nextval_pg"); - - - if (ret != SPI_OK_SELECT || SPI_processed != 1) - return -1; - - resTuple = SPI_tuptable->vals[0]; + Oid relid = PG_GETARG_OID(0); + int64 next = PG_GETARG_INT64(1); + bool iscalled = PG_GETARG_BOOL(2); + int64 result; - debug_msg("dbmirror:nextval Set resTuple"); - - nextSequenceValue = *(unsigned int *) (DatumGetPointer(SPI_getbinval(resTuple, - SPI_tuptable->tupdesc, - 1, &isNull))); + result = DatumGetInt64(DirectFunctionCall3(setval3_oid, + ObjectIdGetDatum(relid), + Int64GetDatum(next), + BoolGetDatum(iscalled))); + saveSequenceUpdate(relid, result, iscalled); + PG_RETURN_INT64(result); +} - debug_msg2("dbmirror:nextval Set SPI_getbinval:%d", nextSequenceValue); +PG_FUNCTION_INFO_V1(nextval_mirror); +Datum +nextval_mirror(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + int64 result; - saveSequenceUpdate(sequenceName, nextSequenceValue); - SPI_pfree(resTuple); - SPI_pfree(nextvalPlan); + result = DatumGetInt64(DirectFunctionCall1(nextval_oid, + ObjectIdGetDatum(relid))); - SPI_finish(); + saveSequenceUpdate(relid, result, true); - return Int64GetDatum(nextSequenceValue); + PG_RETURN_INT64(result); } -int -saveSequenceUpdate(const text *sequenceName, - int nextSequenceVal) +static void +saveSequenceUpdate(Oid relid, int64 nextValue, bool iscalled) { - - Oid insertArgTypes[2] = {TEXTOID, INT4OID}; + Oid insertArgTypes[2] = {NAMEOID, INT4OID}; Oid insertDataArgTypes[1] = {NAMEOID}; - void *insertPlan = NULL; - void *insertDataPlan = NULL; + void *insertPlan; + void *insertDataPlan; Datum insertDatum[2]; Datum insertDataDatum[1]; - char nextSequenceText[32]; + char nextSequenceText[64]; const char *insertQuery = "INSERT INTO dbmirror_Pending (TableName,Op,XID) VALUES" \ @@ -746,36 +659,50 @@ saveSequenceUpdate(const text *sequenceName, "INSERT INTO dbmirror_PendingData(SeqId,IsKey,Data) VALUES " \ "(currval('dbmirror_pending_seqid_seq'),'t',$1)"; - int ret; - + if (SPI_connect() < 0) + ereport(ERROR, + (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:savesequenceupdate could not connect to SPI"))); insertPlan = SPI_prepare(insertQuery, 2, insertArgTypes); insertDataPlan = SPI_prepare(insertDataQuery, 1, insertDataArgTypes); - debug_msg("Prepared insert query"); - - if (insertPlan == NULL || insertDataPlan == NULL) - ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), errmsg("dbmirror:nextval error creating plan"))); + ereport(ERROR, + (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:savesequenceupdate error creating plan"))); + insertDatum[0] = PointerGetDatum(get_rel_name(relid)); insertDatum[1] = Int32GetDatum(GetCurrentTransactionId()); - insertDatum[0] = PointerGetDatum(sequenceName); - sprintf(nextSequenceText, "%d", nextSequenceVal); + snprintf(nextSequenceText, sizeof(nextSequenceText), + INT64_FORMAT ",'%c'", + nextValue, iscalled ? 't' : 'f'); + + /* + * note type cheat here: we prepare a C string and then claim it is a + * NAME, which the system will coerce to varchar for us. + */ insertDataDatum[0] = PointerGetDatum(nextSequenceText); - debug_msg2("dbmirror:savesequenceupdate: Setting value %s", + + debug_msg2("dbmirror:savesequenceupdate: Setting value as %s", nextSequenceText); debug_msg("dbmirror:About to execute insert query"); - ret = SPI_execp(insertPlan, insertDatum, NULL, 1); + if (SPI_execp(insertPlan, insertDatum, NULL, 1) != SPI_OK_INSERT) + ereport(ERROR, + (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("error inserting row in dbmirror_Pending"))); - ret = SPI_execp(insertDataPlan, insertDataDatum, NULL, 1); + if (SPI_execp(insertDataPlan, insertDataDatum, NULL, 1) != SPI_OK_INSERT) + ereport(ERROR, + (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("error inserting row in dbmirror_PendingData"))); debug_msg("dbmirror:Insert query finished"); SPI_pfree(insertPlan); SPI_pfree(insertDataPlan); - return ret; - + SPI_finish(); } |
