diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Session.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 120 |
1 files changed, 76 insertions, 44 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 0e44374d19..3b65e6a64d 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -61,6 +61,8 @@ namespace qpid { namespace broker { namespace amqp { +using namespace qpid::amqp::transaction; + namespace { pn_bytes_t convert(const std::string& s) { @@ -209,6 +211,7 @@ class IncomingToCoordinator : public DecodingIncoming public: IncomingToCoordinator(pn_link_t* link, Broker& broker, Session& parent) : DecodingIncoming(link, broker, parent, std::string(), "txn-ctrl", pn_link_name(link)) {} + ~IncomingToCoordinator() { session.abort(); } void deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>, pn_delivery_t*); void handle(qpid::broker::Message&, qpid::broker::TxBuffer*) {} @@ -218,7 +221,9 @@ class IncomingToCoordinator : public DecodingIncoming Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o) : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false), authorise(connection.getUserId(), connection.getBroker().getAcl()), - detachRequested(), txnId((boost::format("%1%") % s).str()) {} + detachRequested(), + tx(*this) +{} Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming) @@ -636,11 +641,12 @@ void Session::readable(pn_link_t* link, pn_delivery_t* delivery) if (target->second->haveWork()) out.activateOutput(); } } + void Session::writable(pn_link_t* link, pn_delivery_t* delivery) { OutgoingLinks::iterator sender = outgoing.find(link); if (sender == outgoing.end()) { - QPID_LOG(error, "Delivery returned for unknown link"); + QPID_LOG(error, "Delivery returned for unknown link " << pn_link_name(link)); } else { sender->second->handle(delivery); } @@ -649,7 +655,7 @@ void Session::writable(pn_link_t* link, pn_delivery_t* delivery) bool Session::dispatch() { bool output(false); - if (commitPending.boolCompareAndSwap(true, false)) { + if (tx.commitPending.boolCompareAndSwap(true, false)) { committed(true); } for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) { @@ -737,7 +743,7 @@ void Session::detachedByManagement() TxBuffer* Session::getTransaction(const std::string& id) { - return (txn.get() && id == txnId) ? txn.get() : 0; + return (tx.buffer.get() && id == tx.id) ? tx.buffer.get() : 0; } TxBuffer* Session::getTransaction(pn_delivery_t* delivery) @@ -748,42 +754,41 @@ TxBuffer* Session::getTransaction(pn_delivery_t* delivery) std::pair<TxBuffer*,uint64_t> Session::getTransactionalState(pn_delivery_t* delivery) { std::pair<TxBuffer*,uint64_t> result((TxBuffer*)0, 0); - if (pn_delivery_remote_state(delivery) == qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE) { + if (pn_delivery_remote_state(delivery) == TRANSACTIONAL_STATE_CODE) { pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery)); - if (data && pn_data_next(data)) { - size_t count = pn_data_get_list(data); - if (count > 0) { + pn_data_rewind(data); + size_t count = 0; + if (data && pn_data_next(data) && (count = pn_data_get_list(data)) > 0) { + pn_data_enter(data); + pn_data_next(data); + std::string id = convert(pn_data_get_binary(data)); + result.first = getTransaction(id); + if (!result.first) { + QPID_LOG(error, "Transaction not found for id: " << id); + } + if (count > 1 && pn_data_next(data)) { pn_data_enter(data); pn_data_next(data); - std::string id = convert(pn_data_get_binary(data)); - result.first = getTransaction(id); - if (!result.first) { - QPID_LOG(error, "Transaction not found for id: " << id); - } - if (count > 1 && pn_data_next(data) && pn_data_is_described(data)) { - pn_data_enter(data); - pn_data_next(data); - result.second = pn_data_get_ulong(data); - } - pn_data_exit(data); + result.second = pn_data_get_ulong(data); } - } else { - QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data"); } + else + QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data"); } return result; } std::string Session::declare() { - if (txn.get()) { + if (tx.buffer.get()) { //not sure what the error code should be; none in spec really fit well. - throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "Session only supports one transaction active at a time"); + throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, + "Session only supports one transaction active at a time"); } - txn = boost::intrusive_ptr<TxBuffer>(new TxBuffer()); - connection.getBroker().getBrokerObservers().startTx(txn); + tx.buffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer()); + connection.getBroker().getBrokerObservers().startTx(tx.buffer); txStarted(); - return txnId; + return tx.id; } namespace { @@ -797,32 +802,41 @@ namespace { boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new AsyncCommit(session)); return copy; } + private: boost::shared_ptr<Session> session; }; } -void Session::discharge(const std::string& id, bool failed) +void Session::discharge(const std::string& id, bool failed, pn_delivery_t* delivery) { - if (!txn.get() || id != txnId) { - throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, "No transaction declared with that id"); + QPID_LOG(debug, "Coordinator " << (failed ? " rollback" : " commit") + << " transaction " << id); + if (!tx.buffer.get() || id != tx.id) { + throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, + Msg() << "Cannot discharge transaction " << id + << (tx.buffer.get() ? Msg() << ", current transaction is " << tx.id : + Msg() << ", no current transaction")); } + tx.discharge = delivery; if (failed) { abort(); } else { - txn->begin(); - txn->startCommit(&connection.getBroker().getStore()); + tx.buffer->begin(); + tx.buffer->startCommit(&connection.getBroker().getStore()); AsyncCommit callback(shared_from_this()); - txn->end(callback); + tx.buffer->end(callback); } } void Session::abort() { - if (txn) { - txn->rollback(); + if (tx.buffer) { + tx.dischargeComplete(); + tx.buffer->rollback(); txAborted(); - txn = boost::intrusive_ptr<TxBuffer>(); + tx.buffer.reset(); + QPID_LOG(debug, "Transaction " << tx.id << " rolled back"); } } @@ -830,16 +844,18 @@ void Session::committed(bool sync) { if (sync) { //this is on IO thread - if (txn.get()) { - txn->endCommit(&connection.getBroker().getStore()); + tx.dischargeComplete(); + if (tx.buffer.get()) { + tx.buffer->endCommit(&connection.getBroker().getStore()); txCommitted(); - txn = boost::intrusive_ptr<TxBuffer>(); + tx.buffer.reset(); + QPID_LOG(debug, "Transaction " << tx.id << " comitted"); } else { throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "tranaction vanished during async commit"); } } else { //this is not on IO thread, need to delay processing until on IO thread - if (commitPending.boolCompareAndSwap(false, true)) { + if (tx.commitPending.boolCompareAndSwap(false, true)) { qpid::sys::Mutex::ScopedLock l(lock); if (!deleted) { out.activateOutput(); @@ -880,7 +896,7 @@ void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Mes { if (message && message->isTypedBody()) { QPID_LOG(debug, "Coordinator got message: @" << message->getBodyDescriptor() << " " << message->getTypedBody()); - if (message->getBodyDescriptor().match(qpid::amqp::transaction::DECLARE_SYMBOL, qpid::amqp::transaction::DECLARE_CODE)) { + if (message->getBodyDescriptor().match(DECLARE_SYMBOL, DECLARE_CODE)) { std::string id = session.declare(); //encode the txn id in a 'declared' list on the disposition pn_data_t* data = pn_disposition_data(pn_delivery_local(delivery)); @@ -889,22 +905,38 @@ void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Mes pn_data_put_binary(data, convert(id)); pn_data_exit(data); pn_data_exit(data); - pn_delivery_update(delivery, qpid::amqp::transaction::DECLARED_CODE); + pn_delivery_update(delivery, DECLARED_CODE); pn_delivery_settle(delivery); session.incomingMessageAccepted(); - } else if (message->getBodyDescriptor().match(qpid::amqp::transaction::DISCHARGE_SYMBOL, qpid::amqp::transaction::DISCHARGE_CODE)) { + QPID_LOG(debug, "Coordinator declared transaction " << id); + } else if (message->getBodyDescriptor().match(DISCHARGE_SYMBOL, DISCHARGE_CODE)) { if (message->getTypedBody().getType() == qpid::types::VAR_LIST) { qpid::types::Variant::List args = message->getTypedBody().asList(); qpid::types::Variant::List::const_iterator i = args.begin(); if (i != args.end()) { std::string id = *i; bool failed = ++i != args.end() ? i->asBool() : false; - session.discharge(id, failed); - DecodingIncoming::deliver(message, delivery);//ensures async completion of commit is taken care of + session.discharge(id, failed, delivery); } + + } else { + throw framing::IllegalArgumentException( + Msg() << "Coordinator unknown message: @" << + message->getBodyDescriptor() << " " << message->getTypedBody()); } } } } +Session::Transaction::Transaction(Session& s) : + session(s), id((boost::format("%1%") % &s).str()), discharge(0) {} + +// Called in IO thread to signal completion of dischage by settling discharge message. +void Session::Transaction::dischargeComplete() { + if (buffer.get() && discharge) { + session.accepted(discharge, false); // Queue up accept and activate output. + discharge = 0; + } +} + }}} // namespace qpid::broker::amqp |