diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qpid/broker/SessionAdapter.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 693 |
1 files changed, 0 insertions, 693 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp deleted file mode 100644 index 63c4b660b2..0000000000 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ /dev/null @@ -1,693 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/broker/SessionAdapter.h" -#include "qpid/broker/Connection.h" -#include "qpid/broker/Queue.h" -#include "qpid/Exception.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/enum.h" -#include "qpid/log/Statement.h" -#include "qpid/framing/SequenceSet.h" -#include "qpid/management/ManagementAgent.h" -#include "qpid/broker/SessionState.h" -#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" -#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" -#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" -#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" -#include "qmf/org/apache/qpid/broker/EventBind.h" -#include "qmf/org/apache/qpid/broker/EventUnbind.h" -#include "qmf/org/apache/qpid/broker/EventSubscribe.h" -#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h" -#include <boost/format.hpp> -#include <boost/cast.hpp> -#include <boost/bind.hpp> - -namespace qpid { -namespace broker { - -using namespace qpid; -using namespace qpid::framing; -using namespace qpid::framing::dtx; -using namespace qpid::management; -namespace _qmf = qmf::org::apache::qpid::broker; - -typedef std::vector<Queue::shared_ptr> QueueVector; - -SessionAdapter::SessionAdapter(SemanticState& s) : - HandlerImpl(s), - exchangeImpl(s), - queueImpl(s), - messageImpl(s), - executionImpl(s), - txImpl(s), - dtxImpl(s) -{} - -static const std::string _TRUE("true"); -static const std::string _FALSE("false"); - -void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type, - const string& alternateExchange, - bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ - - //TODO: implement autoDelete - Exchange::shared_ptr alternate; - if (!alternateExchange.empty()) { - alternate = getBroker().getExchanges().get(alternateExchange); - } - if(passive){ - AclModule* acl = getBroker().getAcl(); - if (acl) { - //TODO: why does a passive declare require create - //permission? The purpose of the passive flag is to state - //that the exchange should *not* created. For - //authorisation a passive declare is similar to - //exchange-query. - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_TYPE, type)); - params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, _TRUE)); - params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); - if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,¶ms) ) - throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId())); - } - Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange)); - checkType(actual, type); - checkAlternate(actual, alternate); - }else{ - if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) { - throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")")); - } - try{ - std::pair<Exchange::shared_ptr, bool> response = - getBroker().createExchange(exchange, type, durable, alternateExchange, args, - getConnection().getUserId(), getConnection().getUrl()); - if (!response.second) { - //exchange already there, not created - checkType(response.first, type); - checkAlternate(response.first, alternate); - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), - getConnection().getUserId(), - exchange, - type, - alternateExchange, - durable, - false, - ManagementAgent::toMap(args), - "existing")); - } - }catch(UnknownExchangeTypeException& /*e*/){ - throw NotFoundException(QPID_MSG("Exchange type not implemented: " << type)); - } - } -} - -void SessionAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type) -{ - if (!type.empty() && exchange->getType() != type) { - throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type)); - } -} - -void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate) -{ - if (alternate && ((exchange->getAlternate() && alternate != exchange->getAlternate()) - || !exchange->getAlternate())) - throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange " - << (exchange->getAlternate() ? exchange->getAlternate()->getName() : "<nonexistent>") - << ", requested " - << alternate->getName())); -} - -void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/) -{ - //TODO: implement if-unused - getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl()); -} - -ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) -{ - AclModule* acl = getBroker().getAcl(); - if (acl) { - if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,name,NULL) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange query request from " << getConnection().getUserId())); - } - - try { - Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); - } catch (const NotFoundException& /*e*/) { - return ExchangeQueryResult("", false, true, FieldTable()); - } -} - -void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, - const string& exchangeName, const string& routingKey, - const FieldTable& arguments) -{ - getBroker().bind(queueName, exchangeName, routingKey, arguments, - getConnection().getUserId(), getConnection().getUrl()); -} - -void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, - const string& exchangeName, - const string& routingKey) -{ - getBroker().unbind(queueName, exchangeName, routingKey, - getConnection().getUserId(), getConnection().getUrl()); -} - -ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, - const std::string& queueName, - const std::string& key, - const framing::FieldTable& args) -{ - AclModule* acl = getBroker().getAcl(); - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); - params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); - if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bound request from " << getConnection().getUserId())); - } - - Exchange::shared_ptr exchange; - try { - exchange = getBroker().getExchanges().get(exchangeName); - } catch (const NotFoundException&) {} - - Queue::shared_ptr queue; - if (!queueName.empty()) { - queue = getBroker().getQueues().find(queueName); - } - - if (!exchange) { - return ExchangeBoundResult(true, (!queueName.empty() && !queue), false, false, false); - } else if (!queueName.empty() && !queue) { - return ExchangeBoundResult(false, true, false, false, false); - } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - return ExchangeBoundResult(false, false, false, false, false); - } else { - //need to test each specified option individually - bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); - bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); - bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - - return ExchangeBoundResult(false, false, !queueMatched, !keyMatched, !argsMatched); - } -} - -SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : HandlerHelper(session), broker(getBroker()) -{} - - -SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl() -{ - try { - destroyExclusiveQueues(); - } catch (std::exception& e) { - QPID_LOG(error, e.what()); - } -} - -void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues() -{ - while (!exclusiveQueues.empty()) { - Queue::shared_ptr q(exclusiveQueues.front()); - q->releaseExclusiveOwnership(); - if (q->canAutoDelete()) { - Queue::tryAutoDelete(broker, q); - } - exclusiveQueues.erase(exclusiveQueues.begin()); - } -} - -bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const -{ - return session.isLocal(t); -} - - -QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) -{ - AclModule* acl = getBroker().getAcl(); - if (acl) { - if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,NULL) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << getConnection().getUserId())); - } - - Queue::shared_ptr queue = session.getBroker().getQueues().find(name); - if (queue) { - - Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); - - return QueueQueryResult(queue->getName(), - alternateExchange ? alternateExchange->getName() : "", - queue->isDurable(), - queue->hasExclusiveOwner(), - queue->isAutoDelete(), - queue->getSettings(), - queue->getMessageCount(), - queue->getConsumerCount()); - } else { - return QueueQueryResult(); - } -} - -void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange, - bool passive, bool durable, bool exclusive, - bool autoDelete, const qpid::framing::FieldTable& arguments) -{ - Queue::shared_ptr queue; - if (passive && !name.empty()) { - AclModule* acl = getBroker().getAcl(); - if (acl) { - //TODO: why does a passive declare require create - //permission? The purpose of the passive flag is to state - //that the queue should *not* created. For - //authorisation a passive declare is similar to - //queue-query (or indeed a qmf query). - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, _TRUE)); - params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE))); - params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE))); - params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE))); - params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); - params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); - params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); - if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId())); - } - queue = getQueue(name); - //TODO: check alternate-exchange is as expected - } else { - std::pair<Queue::shared_ptr, bool> queue_created = - getBroker().createQueue(name, durable, - autoDelete, - exclusive ? &session : 0, - alternateExchange, - arguments, - getConnection().getUserId(), - getConnection().getUrl()); - queue = queue_created.first; - assert(queue); - if (queue_created.second) { // This is a new queue - //handle automatic cleanup: - if (exclusive) { - exclusiveQueues.push_back(queue); - } - } else { - if (exclusive && queue->setExclusiveOwner(&session)) { - exclusiveQueues.push_back(queue); - } - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), - name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments), - "existing")); - } - - } - - if (exclusive && !queue->isExclusiveOwner(&session)) - throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue " - << queue->getName())); -} - -void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::ACT_PURGE,acl::OBJ_QUEUE,queue,NULL) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId())); - } - getQueue(queue)->purge(); -} - -void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty) -{ - if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) { - throw ResourceLockedException(QPID_MSG("Cannot delete queue " - << queue->getName() << "; it is exclusive to another session")); - } else if(ifEmpty && queue->getMessageCount() > 0) { - throw PreconditionFailedException(QPID_MSG("Cannot delete queue " - << queue->getName() << "; queue not empty")); - } else if(ifUnused && queue->getConsumerCount() > 0) { - throw PreconditionFailedException(QPID_MSG("Cannot delete queue " - << queue->getName() << "; queue in use")); - } else if (queue->isExclusiveOwner(&session)) { - //remove the queue from the list of exclusive queues if necessary - QueueVector::iterator i = std::find(exclusiveQueues.begin(), - exclusiveQueues.end(), - queue); - if (i < exclusiveQueues.end()) exclusiveQueues.erase(i); - } -} - -void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty) -{ - getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(), - boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty)); -} - -SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : - HandlerHelper(s), - releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)), - releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)), - rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)) - {} - -// -// Message class method handlers -// - -void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/, - uint8_t /*acceptMode*/, - uint8_t /*acquireMode*/) -{ - //not yet used (content containing assemblies treated differently at present - std::cout << "SessionAdapter::MessageHandlerImpl::transfer() called" << std::endl; -} - -void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool setRedelivered) -{ - transfers.for_each(setRedelivered ? releaseRedeliveredOp : releaseOp); -} - -void -SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, - const string& destination, - uint8_t acceptMode, - uint8_t acquireMode, - bool exclusive, - const string& resumeId, - uint64_t resumeTtl, - const FieldTable& arguments) -{ - - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId())); - } - - Queue::shared_ptr queue = getQueue(queueName); - if(!destination.empty() && state.exists(destination)) - throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); - - if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session) && acquireMode == 0) - throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue " - << queue->getName())); - - state.consume(destination, queue, - acceptMode == 0, acquireMode == 0, exclusive, - resumeId, resumeTtl, arguments); - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(), - queueName, destination, exclusive, ManagementAgent::toMap(arguments))); -} - -void -SessionAdapter::MessageHandlerImpl::cancel(const string& destination ) -{ - if (!state.cancel(destination)) { - throw NotFoundException(QPID_MSG("No such subscription: " << destination)); - } - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination)); -} - -void -SessionAdapter::MessageHandlerImpl::reject(const SequenceSet& transfers, uint16_t /*code*/, const string& /*text*/ ) -{ - transfers.for_each(rejectOp); -} - -void SessionAdapter::MessageHandlerImpl::flow(const std::string& destination, uint8_t unit, uint32_t value) -{ - if (unit == 0) { - //message - state.addMessageCredit(destination, value); - } else if (unit == 1) { - //bytes - state.addByteCredit(destination, value); - } else { - //unknown - throw InvalidArgumentException(QPID_MSG("Invalid value for unit " << unit)); - } - -} - -void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destination, uint8_t mode) -{ - if (mode == 0) { - //credit - state.setCreditMode(destination); - } else if (mode == 1) { - //window - state.setWindowMode(destination); - } else{ - throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode)); - } -} - -void SessionAdapter::MessageHandlerImpl::flush(const std::string& destination) -{ - state.flush(destination); -} - -void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination) -{ - state.stop(destination); -} - -void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands) -{ - state.accepted(commands); -} - -framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers) -{ - // FIXME aconway 2008-05-12: create SequenceSet directly, no need for intermediate results vector. - SequenceNumberSet results; - RangedOperation f = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results)); - transfers.for_each(f); - - results = results.condense(); - SequenceSet acquisitions; - RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2); - results.processRanges(g); - - return MessageAcquireResult(acquisitions); -} - -framing::MessageResumeResult SessionAdapter::MessageHandlerImpl::resume(const std::string& /*destination*/, - const std::string& /*resumeId*/) -{ - throw NotImplementedException("resuming transfers not yet supported"); -} - - - -void SessionAdapter::ExecutionHandlerImpl::sync() -{ - session.addPendingExecutionSync(); - /** @todo KAG - need a generic mechanism to allow a command to returning "not completed" status back to SessionState */ - -} - -void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/, const string& /*value*/) -{ - //TODO: but currently never used client->server -} - -void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/, - const SequenceNumber& /*commandId*/, - uint8_t /*classCode*/, - uint8_t /*commandCode*/, - uint8_t /*fieldIndex*/, - const std::string& /*description*/, - const framing::FieldTable& /*errorInfo*/) -{ - //TODO: again, not really used client->server but may be important - //for inter-broker links -} - - - -void SessionAdapter::TxHandlerImpl::select() -{ - state.startTx(); -} - -void SessionAdapter::TxHandlerImpl::commit() -{ - state.commit(&getBroker().getStore()); -} - -void SessionAdapter::TxHandlerImpl::rollback() -{ - state.rollback(); -} - -std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid& xid) -{ - std::string encoded; - encode(xid, encoded); - return encoded; -} - -void SessionAdapter::DtxHandlerImpl::select() -{ - state.selectDtx(); -} - -XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, - bool fail, - bool suspend) -{ - try { - if (fail) { - state.endDtx(convert(xid), true); - if (suspend) { - throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); - } else { - return XaResult(XA_STATUS_XA_RBROLLBACK); - } - } else { - if (suspend) { - state.suspendDtx(convert(xid)); - } else { - state.endDtx(convert(xid), false); - } - return XaResult(XA_STATUS_XA_OK); - } - } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); - } -} - -XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid, - bool join, - bool resume) -{ - if (join && resume) { - throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set.")); - } - try { - if (resume) { - state.resumeDtx(convert(xid)); - } else { - state.startDtx(convert(xid), getBroker().getDtxManager(), join); - } - return XaResult(XA_STATUS_XA_OK); - } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); - } -} - -XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid) -{ - try { - bool ok = getBroker().getDtxManager().prepare(convert(xid)); - return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK); - } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); - } -} - -XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid, - bool onePhase) -{ - try { - bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase); - return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK); - } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); - } -} - - -XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid) -{ - try { - getBroker().getDtxManager().rollback(convert(xid)); - return XaResult(XA_STATUS_XA_OK); - } catch (const DtxTimeoutException& /*e*/) { - return XaResult(XA_STATUS_XA_RBTIMEOUT); - } -} - -DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover() -{ - std::set<std::string> xids; - getBroker().getStore().collectPreparedXids(xids); - /* - * create array of long structs - */ - Array indoubt(0xAB); - for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { - boost::shared_ptr<FieldValue> xid(new Struct32Value(*i)); - indoubt.add(xid); - } - return DtxRecoverResult(indoubt); -} - -void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid) -{ - //Currently no heuristic completion is supported, so this should never be used. - throw NotImplementedException(QPID_MSG("Forget not implemented. Branch with xid " << xid << " not heuristically completed!")); -} - -DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid) -{ - uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid)); - return DtxGetTimeoutResult(timeout); -} - - -void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid, - uint32_t timeout) -{ - getBroker().getDtxManager().setTimeout(convert(xid), timeout); -} - - -Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const { - Queue::shared_ptr queue; - if (name.empty()) { - throw framing::IllegalArgumentException(QPID_MSG("No queue name specified.")); - } else { - queue = session.getBroker().getQueues().find(name); - if (!queue) - throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name)); - } - return queue; -} - -}} // namespace qpid::broker - - |