diff options
Diffstat (limited to 'src/backend/commands/portalcmds.c')
| -rw-r--r-- | src/backend/commands/portalcmds.c | 634 |
1 files changed, 124 insertions, 510 deletions
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index 9dc7583f06..35ed8a270b 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -1,14 +1,20 @@ /*------------------------------------------------------------------------- * * portalcmds.c - * portal support code + * Utility commands affecting portals (that is, SQL cursor commands) + * + * Note: see also tcop/pquery.c, which implements portal operations for + * the FE/BE protocol. This module uses pquery.c for some operations. + * And both modules depend on utils/mmgr/portalmem.c, which controls + * storage management for portals (but doesn't run any queries in them). + * * * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.12 2003/04/29 03:21:29 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.13 2003/05/02 20:54:33 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -22,18 +28,10 @@ #include "executor/executor.h" #include "optimizer/planner.h" #include "rewrite/rewriteHandler.h" +#include "tcop/pquery.h" #include "utils/memutils.h" -static long DoRelativeFetch(Portal portal, - bool forward, - long count, - CommandDest dest); -static uint32 RunFromStore(Portal portal, ScanDirection direction, long count); -static void DoPortalRewind(Portal portal); -static Portal PreparePortal(DeclareCursorStmt *stmt); - - /* * PerformCursorOpen * Execute SQL DECLARE CURSOR command. @@ -46,8 +44,13 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest) Plan *plan; Portal portal; MemoryContext oldContext; - char *cursorName; - QueryDesc *queryDesc; + + /* + * Disallow empty-string cursor name (conflicts with protocol-level + * unnamed portal). + */ + if (strlen(stmt->portalname) == 0) + elog(ERROR, "Invalid cursor name: must not be empty"); /* * If this is a non-holdable cursor, we require that this statement @@ -77,38 +80,53 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest) plan = planner(query, true, stmt->options); - /* If binary cursor, switch to alternate output format */ - if ((stmt->options & CURSOR_OPT_BINARY) && dest == Remote) - dest = RemoteInternal; - /* * Create a portal and copy the query and plan into its memory context. + * (If a duplicate cursor name already exists, warn and drop it.) */ - portal = PreparePortal(stmt); + portal = CreatePortal(stmt->portalname, true, false); oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); + query = copyObject(query); plan = copyObject(plan); + PortalDefineQuery(portal, + NULL, /* unfortunately don't have sourceText */ + "SELECT", /* cursor's query is always a SELECT */ + makeList1(query), + makeList1(plan), + PortalGetHeapMemory(portal)); + + MemoryContextSwitchTo(oldContext); + /* - * Create the QueryDesc object in the portal context, too. + * Set up options for portal. + * + * If the user didn't specify a SCROLL type, allow or disallow + * scrolling based on whether it would require any additional + * runtime overhead to do so. */ - cursorName = pstrdup(stmt->portalname); - queryDesc = CreateQueryDesc(query, plan, dest, cursorName, NULL, false); + portal->cursorOptions = stmt->options; + if (!(portal->cursorOptions & (CURSOR_OPT_SCROLL | CURSOR_OPT_NO_SCROLL))) + { + if (ExecSupportsBackwardScan(plan)) + portal->cursorOptions |= CURSOR_OPT_SCROLL; + else + portal->cursorOptions |= CURSOR_OPT_NO_SCROLL; + } /* - * call ExecStart to prepare the plan for execution + * Start execution --- never any params for a cursor. */ - ExecutorStart(queryDesc); + PortalStart(portal, NULL); - /* Arrange to shut down the executor if portal is dropped */ - PortalSetQuery(portal, queryDesc); + Assert(portal->strategy == PORTAL_ONE_SELECT); /* * We're done; the query won't actually be run until PerformPortalFetch * is called. */ - MemoryContextSwitchTo(oldContext); } /* @@ -130,10 +148,6 @@ PerformPortalFetch(FetchStmt *stmt, Portal portal; long nprocessed; - /* initialize completion status in case of early exit */ - if (completionTag) - strcpy(completionTag, stmt->ismove ? "MOVE 0" : "FETCH 0"); - /* get the portal from the portal name */ portal = GetPortalByName(stmt->portalname); if (!PortalIsValid(portal)) @@ -141,14 +155,27 @@ PerformPortalFetch(FetchStmt *stmt, /* FIXME: shouldn't this be an ERROR? */ elog(WARNING, "PerformPortalFetch: portal \"%s\" not found", stmt->portalname); + if (completionTag) + strcpy(completionTag, stmt->ismove ? "MOVE 0" : "FETCH 0"); return; } + /* + * Adjust dest if needed. MOVE wants dest = None. + * + * If fetching from a binary cursor and the requested destination is + * Remote, change it to RemoteInternal. + */ + if (stmt->ismove) + dest = None; + else if (dest == Remote && (portal->cursorOptions & CURSOR_OPT_BINARY)) + dest = RemoteInternal; + /* Do it */ - nprocessed = DoPortalFetch(portal, - stmt->direction, - stmt->howMany, - stmt->ismove ? None : dest); + nprocessed = PortalRunFetch(portal, + stmt->direction, + stmt->howMany, + dest); /* Return command status if wanted */ if (completionTag) @@ -158,416 +185,6 @@ PerformPortalFetch(FetchStmt *stmt, } /* - * DoPortalFetch - * Guts of PerformPortalFetch --- shared with SPI cursor operations. - * Caller must already have validated the Portal. - * - * Returns number of rows processed (suitable for use in result tag) - */ -long -DoPortalFetch(Portal portal, - FetchDirection fdirection, - long count, - CommandDest dest) -{ - bool forward; - - switch (fdirection) - { - case FETCH_FORWARD: - if (count < 0) - { - fdirection = FETCH_BACKWARD; - count = -count; - } - /* fall out of switch to share code with FETCH_BACKWARD */ - break; - case FETCH_BACKWARD: - if (count < 0) - { - fdirection = FETCH_FORWARD; - count = -count; - } - /* fall out of switch to share code with FETCH_FORWARD */ - break; - case FETCH_ABSOLUTE: - if (count > 0) - { - /* - * Definition: Rewind to start, advance count-1 rows, return - * next row (if any). In practice, if the goal is less than - * halfway back to the start, it's better to scan from where - * we are. In any case, we arrange to fetch the target row - * going forwards. - */ - if (portal->posOverflow || portal->portalPos == LONG_MAX || - count-1 <= portal->portalPos / 2) - { - DoPortalRewind(portal); - if (count > 1) - DoRelativeFetch(portal, true, count-1, None); - } - else - { - long pos = portal->portalPos; - - if (portal->atEnd) - pos++; /* need one extra fetch if off end */ - if (count <= pos) - DoRelativeFetch(portal, false, pos-count+1, None); - else if (count > pos+1) - DoRelativeFetch(portal, true, count-pos-1, None); - } - return DoRelativeFetch(portal, true, 1L, dest); - } - else if (count < 0) - { - /* - * Definition: Advance to end, back up abs(count)-1 rows, - * return prior row (if any). We could optimize this if we - * knew in advance where the end was, but typically we won't. - * (Is it worth considering case where count > half of size - * of query? We could rewind once we know the size ...) - */ - DoRelativeFetch(portal, true, FETCH_ALL, None); - if (count < -1) - DoRelativeFetch(portal, false, -count-1, None); - return DoRelativeFetch(portal, false, 1L, dest); - } - else /* count == 0 */ - { - /* Rewind to start, return zero rows */ - DoPortalRewind(portal); - return DoRelativeFetch(portal, true, 0L, dest); - } - break; - case FETCH_RELATIVE: - if (count > 0) - { - /* - * Definition: advance count-1 rows, return next row (if any). - */ - if (count > 1) - DoRelativeFetch(portal, true, count-1, None); - return DoRelativeFetch(portal, true, 1L, dest); - } - else if (count < 0) - { - /* - * Definition: back up abs(count)-1 rows, return prior row - * (if any). - */ - if (count < -1) - DoRelativeFetch(portal, false, -count-1, None); - return DoRelativeFetch(portal, false, 1L, dest); - } - else /* count == 0 */ - { - /* Same as FETCH FORWARD 0, so fall out of switch */ - fdirection = FETCH_FORWARD; - } - break; - default: - elog(ERROR, "DoPortalFetch: bogus direction"); - break; - } - - /* - * Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, - * and count >= 0. - */ - forward = (fdirection == FETCH_FORWARD); - - /* - * Zero count means to re-fetch the current row, if any (per SQL92) - */ - if (count == 0) - { - bool on_row; - - /* Are we sitting on a row? */ - on_row = (!portal->atStart && !portal->atEnd); - - if (dest == None) - { - /* MOVE 0 returns 0/1 based on if FETCH 0 would return a row */ - return on_row ? 1L : 0L; - } - else - { - /* - * If we are sitting on a row, back up one so we can re-fetch it. - * If we are not sitting on a row, we still have to start up and - * shut down the executor so that the destination is initialized - * and shut down correctly; so keep going. To DoRelativeFetch, - * count == 0 means we will retrieve no row. - */ - if (on_row) - { - DoRelativeFetch(portal, false, 1L, None); - /* Set up to fetch one row forward */ - count = 1; - forward = true; - } - } - } - - /* - * Optimize MOVE BACKWARD ALL into a Rewind. - */ - if (!forward && count == FETCH_ALL && dest == None) - { - long result = portal->portalPos; - - if (result > 0 && !portal->atEnd) - result--; - DoPortalRewind(portal); - /* result is bogus if pos had overflowed, but it's best we can do */ - return result; - } - - return DoRelativeFetch(portal, forward, count, dest); -} - -/* - * DoRelativeFetch - * Do fetch for a simple N-rows-forward-or-backward case. - * - * count <= 0 is interpreted as a no-op: the destination gets started up - * and shut down, but nothing else happens. Also, count == FETCH_ALL is - * interpreted as "all rows". - * - * Caller must already have validated the Portal. - * - * Returns number of rows processed (suitable for use in result tag) - */ -static long -DoRelativeFetch(Portal portal, - bool forward, - long count, - CommandDest dest) -{ - QueryDesc *queryDesc; - QueryDesc temp_queryDesc; - ScanDirection direction; - uint32 nprocessed; - - queryDesc = PortalGetQueryDesc(portal); - - /* - * If the requested destination is not the same as the query's - * original destination, make a temporary QueryDesc with the proper - * destination. This supports MOVE, for example, which will pass in - * dest = None. - * - * EXCEPTION: if the query's original dest is RemoteInternal (ie, it's a - * binary cursor) and the request is Remote, we do NOT override the - * original dest. This is necessary since a FETCH command will pass - * dest = Remote, not knowing whether the cursor is binary or not. - */ - if (dest != queryDesc->dest && - !(queryDesc->dest == RemoteInternal && dest == Remote)) - { - memcpy(&temp_queryDesc, queryDesc, sizeof(QueryDesc)); - temp_queryDesc.dest = dest; - queryDesc = &temp_queryDesc; - } - - /* - * Determine which direction to go in, and check to see if we're - * already at the end of the available tuples in that direction. If - * so, set the direction to NoMovement to avoid trying to fetch any - * tuples. (This check exists because not all plan node types are - * robust about being called again if they've already returned NULL - * once.) Then call the executor (we must not skip this, because the - * destination needs to see a setup and shutdown even if no tuples are - * available). Finally, update the portal position state depending on - * the number of tuples that were retrieved. - */ - if (forward) - { - if (portal->atEnd || count <= 0) - direction = NoMovementScanDirection; - else - direction = ForwardScanDirection; - - /* In the executor, zero count processes all rows */ - if (count == FETCH_ALL) - count = 0; - - if (portal->holdStore) - nprocessed = RunFromStore(portal, direction, count); - else - { - Assert(portal->executorRunning); - ExecutorRun(queryDesc, direction, count); - nprocessed = queryDesc->estate->es_processed; - } - - if (direction != NoMovementScanDirection) - { - long oldPos; - - if (nprocessed > 0) - portal->atStart = false; /* OK to go backward now */ - if (count == 0 || - (unsigned long) nprocessed < (unsigned long) count) - portal->atEnd = true; /* we retrieved 'em all */ - oldPos = portal->portalPos; - portal->portalPos += nprocessed; - /* portalPos doesn't advance when we fall off the end */ - if (portal->portalPos < oldPos) - portal->posOverflow = true; - } - } - else - { - if (portal->scrollType == DISABLE_SCROLL) - elog(ERROR, "Cursor can only scan forward" - "\n\tDeclare it with SCROLL option to enable backward scan"); - - if (portal->atStart || count <= 0) - direction = NoMovementScanDirection; - else - direction = BackwardScanDirection; - - /* In the executor, zero count processes all rows */ - if (count == FETCH_ALL) - count = 0; - - if (portal->holdStore) - nprocessed = RunFromStore(portal, direction, count); - else - { - Assert(portal->executorRunning); - ExecutorRun(queryDesc, direction, count); - nprocessed = queryDesc->estate->es_processed; - } - - if (direction != NoMovementScanDirection) - { - if (nprocessed > 0 && portal->atEnd) - { - portal->atEnd = false; /* OK to go forward now */ - portal->portalPos++; /* adjust for endpoint case */ - } - if (count == 0 || - (unsigned long) nprocessed < (unsigned long) count) - { - portal->atStart = true; /* we retrieved 'em all */ - portal->portalPos = 0; - portal->posOverflow = false; - } - else - { - long oldPos; - - oldPos = portal->portalPos; - portal->portalPos -= nprocessed; - if (portal->portalPos > oldPos || - portal->portalPos <= 0) - portal->posOverflow = true; - } - } - } - - return nprocessed; -} - -/* - * RunFromStore - * Fetch tuples from the portal's tuple store. - * - * Calling conventions are similar to ExecutorRun, except that we - * do not depend on having an estate, and therefore return the number - * of tuples processed as the result, not in estate->es_processed. - * - * One difference from ExecutorRun is that the destination receiver functions - * are run in the caller's memory context (since we have no estate). Watch - * out for memory leaks. - */ -static uint32 -RunFromStore(Portal portal, ScanDirection direction, long count) -{ - QueryDesc *queryDesc = PortalGetQueryDesc(portal); - DestReceiver *destfunc; - long current_tuple_count = 0; - - destfunc = DestToFunction(queryDesc->dest); - (*destfunc->setup) (destfunc, queryDesc->operation, - queryDesc->portalName, queryDesc->tupDesc); - - if (direction == NoMovementScanDirection) - { - /* do nothing except start/stop the destination */ - } - else - { - bool forward = (direction == ForwardScanDirection); - - for (;;) - { - MemoryContext oldcontext; - HeapTuple tup; - bool should_free; - - oldcontext = MemoryContextSwitchTo(portal->holdContext); - - tup = tuplestore_getheaptuple(portal->holdStore, forward, - &should_free); - - MemoryContextSwitchTo(oldcontext); - - if (tup == NULL) - break; - - (*destfunc->receiveTuple) (tup, queryDesc->tupDesc, destfunc); - - if (should_free) - pfree(tup); - - /* - * check our tuple count.. if we've processed the proper number - * then quit, else loop again and process more tuples. Zero - * count means no limit. - */ - current_tuple_count++; - if (count && count == current_tuple_count) - break; - } - } - - (*destfunc->cleanup) (destfunc); - - return (uint32) current_tuple_count; -} - -/* - * DoPortalRewind - rewind a Portal to starting point - */ -static void -DoPortalRewind(Portal portal) -{ - if (portal->holdStore) - { - MemoryContext oldcontext; - - oldcontext = MemoryContextSwitchTo(portal->holdContext); - tuplestore_rescan(portal->holdStore); - MemoryContextSwitchTo(oldcontext); - } - if (portal->executorRunning) - { - ExecutorRewind(PortalGetQueryDesc(portal)); - } - - portal->atStart = true; - portal->atEnd = false; - portal->portalPos = 0; - portal->posOverflow = false; -} - -/* * PerformPortalClose * Close a cursor. */ @@ -594,53 +211,6 @@ PerformPortalClose(char *name) } /* - * PreparePortal - * Given a DECLARE CURSOR statement, returns the Portal data - * structure based on that statement that is used to manage the - * Portal internally. If a portal with specified name already - * exists, it is replaced. - */ -static Portal -PreparePortal(DeclareCursorStmt *stmt) -{ - Portal portal; - - /* - * Check for already-in-use portal name. - */ - portal = GetPortalByName(stmt->portalname); - if (PortalIsValid(portal)) - { - /* - * XXX Should we raise an error rather than closing the old - * portal? - */ - elog(WARNING, "Closing pre-existing portal \"%s\"", - stmt->portalname); - PortalDrop(portal, false); - } - - /* - * Create the new portal. - */ - portal = CreatePortal(stmt->portalname); - - /* - * Modify the newly created portal based on the options specified in - * the DECLARE CURSOR statement. - */ - if (stmt->options & CURSOR_OPT_SCROLL) - portal->scrollType = ENABLE_SCROLL; - else if (stmt->options & CURSOR_OPT_NO_SCROLL) - portal->scrollType = DISABLE_SCROLL; - - if (stmt->options & CURSOR_OPT_HOLD) - portal->holdOpen = true; - - return portal; -} - -/* * PortalCleanup * * Clean up a portal when it's dropped. This is the standard cleanup hook @@ -649,6 +219,8 @@ PreparePortal(DeclareCursorStmt *stmt) void PortalCleanup(Portal portal, bool isError) { + QueryDesc *queryDesc; + /* * sanity checks */ @@ -658,7 +230,8 @@ PortalCleanup(Portal portal, bool isError) /* * Delete tuplestore if present. (Note: portalmem.c is responsible * for removing holdContext.) We should do this even under error - * conditions. + * conditions; since the tuplestore would have been using cross- + * transaction storage, its temp files need to be explicitly deleted. */ if (portal->holdStore) { @@ -674,11 +247,12 @@ PortalCleanup(Portal portal, bool isError) * abort, since other mechanisms will take care of releasing executor * resources, and we can't be sure that ExecutorEnd itself wouldn't fail. */ - if (portal->executorRunning) + queryDesc = PortalGetQueryDesc(portal); + if (queryDesc) { - portal->executorRunning = false; + portal->queryDesc = NULL; if (!isError) - ExecutorEnd(PortalGetQueryDesc(portal)); + ExecutorEnd(queryDesc); } } @@ -694,9 +268,10 @@ void PersistHoldablePortal(Portal portal) { QueryDesc *queryDesc = PortalGetQueryDesc(portal); + Portal saveCurrentPortal; + MemoryContext savePortalContext; + MemoryContext saveQueryContext; MemoryContext oldcxt; - CommandDest olddest; - TupleDesc tupdesc; /* * If we're preserving a holdable portal, we had better be @@ -704,6 +279,9 @@ PersistHoldablePortal(Portal portal) */ Assert(portal->createXact == GetCurrentTransactionId()); Assert(portal->holdStore == NULL); + Assert(queryDesc != NULL); + Assert(portal->portalReady); + Assert(!portal->portalDone); /* * This context is used to store the tuple set. @@ -716,6 +294,34 @@ PersistHoldablePortal(Portal portal) portal->holdStore = tuplestore_begin_heap(true, true, SortMem); /* + * Before closing down the executor, we must copy the tupdesc, since + * it was created in executor memory. Note we are copying it into + * the holdContext. + */ + portal->tupDesc = CreateTupleDescCopy(portal->tupDesc); + + MemoryContextSwitchTo(oldcxt); + + /* + * Check for improper portal use, and mark portal active. + */ + if (portal->portalActive) + elog(ERROR, "Portal \"%s\" already active", portal->name); + portal->portalActive = true; + + /* + * Set global portal and context pointers. + */ + saveCurrentPortal = CurrentPortal; + CurrentPortal = portal; + savePortalContext = PortalContext; + PortalContext = PortalGetHeapMemory(portal); + saveQueryContext = QueryContext; + QueryContext = portal->queryContext; + + MemoryContextSwitchTo(PortalContext); + + /* * Rewind the executor: we need to store the entire result set in * the tuplestore, so that subsequent backward FETCHs can be * processed. @@ -723,40 +329,40 @@ PersistHoldablePortal(Portal portal) ExecutorRewind(queryDesc); /* Set the destination to output to the tuplestore */ - olddest = queryDesc->dest; queryDesc->dest = Tuplestore; /* Fetch the result set into the tuplestore */ ExecutorRun(queryDesc, ForwardScanDirection, 0L); - queryDesc->dest = olddest; - - /* - * Before closing down the executor, we must copy the tupdesc, since - * it was created in executor memory. - */ - tupdesc = CreateTupleDescCopy(queryDesc->tupDesc); - /* * Now shut down the inner executor. */ - portal->executorRunning = false; + portal->queryDesc = NULL; /* prevent double shutdown */ ExecutorEnd(queryDesc); - /* ExecutorEnd clears this, so must wait to save copied pointer */ - queryDesc->tupDesc = tupdesc; + /* Mark portal not active */ + portal->portalActive = false; + + CurrentPortal = saveCurrentPortal; + PortalContext = savePortalContext; + QueryContext = saveQueryContext; /* * Reset the position in the result set: ideally, this could be * implemented by just skipping straight to the tuple # that we need * to be at, but the tuplestore API doesn't support that. So we * start at the beginning of the tuplestore and iterate through it - * until we reach where we need to be. + * until we reach where we need to be. FIXME someday? */ + MemoryContextSwitchTo(portal->holdContext); + if (!portal->atEnd) { long store_pos; + if (portal->posOverflow) /* oops, cannot trust portalPos */ + elog(ERROR, "Unable to reposition held cursor"); + tuplestore_rescan(portal->holdStore); for (store_pos = 0; store_pos < portal->portalPos; store_pos++) @@ -777,4 +383,12 @@ PersistHoldablePortal(Portal portal) } MemoryContextSwitchTo(oldcxt); + + /* + * We can now release any subsidiary memory of the portal's heap + * context; we'll never use it again. The executor already dropped + * its context, but this will clean up anything that glommed onto + * the portal's heap via PortalContext. + */ + MemoryContextDeleteChildren(PortalGetHeapMemory(portal)); } |
