summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/amqp/Session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Session.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp120
1 files changed, 76 insertions, 44 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 0e44374d19..3b65e6a64d 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -61,6 +61,8 @@ namespace qpid {
namespace broker {
namespace amqp {
+using namespace qpid::amqp::transaction;
+
namespace {
pn_bytes_t convert(const std::string& s)
{
@@ -209,6 +211,7 @@ class IncomingToCoordinator : public DecodingIncoming
public:
IncomingToCoordinator(pn_link_t* link, Broker& broker, Session& parent)
: DecodingIncoming(link, broker, parent, std::string(), "txn-ctrl", pn_link_name(link)) {}
+
~IncomingToCoordinator() { session.abort(); }
void deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>, pn_delivery_t*);
void handle(qpid::broker::Message&, qpid::broker::TxBuffer*) {}
@@ -218,7 +221,9 @@ class IncomingToCoordinator : public DecodingIncoming
Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
: ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false),
authorise(connection.getUserId(), connection.getBroker().getAcl()),
- detachRequested(), txnId((boost::format("%1%") % s).str()) {}
+ detachRequested(),
+ tx(*this)
+{}
Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
@@ -636,11 +641,12 @@ void Session::readable(pn_link_t* link, pn_delivery_t* delivery)
if (target->second->haveWork()) out.activateOutput();
}
}
+
void Session::writable(pn_link_t* link, pn_delivery_t* delivery)
{
OutgoingLinks::iterator sender = outgoing.find(link);
if (sender == outgoing.end()) {
- QPID_LOG(error, "Delivery returned for unknown link");
+ QPID_LOG(error, "Delivery returned for unknown link " << pn_link_name(link));
} else {
sender->second->handle(delivery);
}
@@ -649,7 +655,7 @@ void Session::writable(pn_link_t* link, pn_delivery_t* delivery)
bool Session::dispatch()
{
bool output(false);
- if (commitPending.boolCompareAndSwap(true, false)) {
+ if (tx.commitPending.boolCompareAndSwap(true, false)) {
committed(true);
}
for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) {
@@ -737,7 +743,7 @@ void Session::detachedByManagement()
TxBuffer* Session::getTransaction(const std::string& id)
{
- return (txn.get() && id == txnId) ? txn.get() : 0;
+ return (tx.buffer.get() && id == tx.id) ? tx.buffer.get() : 0;
}
TxBuffer* Session::getTransaction(pn_delivery_t* delivery)
@@ -748,42 +754,41 @@ TxBuffer* Session::getTransaction(pn_delivery_t* delivery)
std::pair<TxBuffer*,uint64_t> Session::getTransactionalState(pn_delivery_t* delivery)
{
std::pair<TxBuffer*,uint64_t> result((TxBuffer*)0, 0);
- if (pn_delivery_remote_state(delivery) == qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE) {
+ if (pn_delivery_remote_state(delivery) == TRANSACTIONAL_STATE_CODE) {
pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery));
- if (data && pn_data_next(data)) {
- size_t count = pn_data_get_list(data);
- if (count > 0) {
+ pn_data_rewind(data);
+ size_t count = 0;
+ if (data && pn_data_next(data) && (count = pn_data_get_list(data)) > 0) {
+ pn_data_enter(data);
+ pn_data_next(data);
+ std::string id = convert(pn_data_get_binary(data));
+ result.first = getTransaction(id);
+ if (!result.first) {
+ QPID_LOG(error, "Transaction not found for id: " << id);
+ }
+ if (count > 1 && pn_data_next(data)) {
pn_data_enter(data);
pn_data_next(data);
- std::string id = convert(pn_data_get_binary(data));
- result.first = getTransaction(id);
- if (!result.first) {
- QPID_LOG(error, "Transaction not found for id: " << id);
- }
- if (count > 1 && pn_data_next(data) && pn_data_is_described(data)) {
- pn_data_enter(data);
- pn_data_next(data);
- result.second = pn_data_get_ulong(data);
- }
- pn_data_exit(data);
+ result.second = pn_data_get_ulong(data);
}
- } else {
- QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data");
}
+ else
+ QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data");
}
return result;
}
std::string Session::declare()
{
- if (txn.get()) {
+ if (tx.buffer.get()) {
//not sure what the error code should be; none in spec really fit well.
- throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "Session only supports one transaction active at a time");
+ throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK,
+ "Session only supports one transaction active at a time");
}
- txn = boost::intrusive_ptr<TxBuffer>(new TxBuffer());
- connection.getBroker().getBrokerObservers().startTx(txn);
+ tx.buffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer());
+ connection.getBroker().getBrokerObservers().startTx(tx.buffer);
txStarted();
- return txnId;
+ return tx.id;
}
namespace {
@@ -797,32 +802,41 @@ namespace {
boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new AsyncCommit(session));
return copy;
}
+
private:
boost::shared_ptr<Session> session;
};
}
-void Session::discharge(const std::string& id, bool failed)
+void Session::discharge(const std::string& id, bool failed, pn_delivery_t* delivery)
{
- if (!txn.get() || id != txnId) {
- throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, "No transaction declared with that id");
+ QPID_LOG(debug, "Coordinator " << (failed ? " rollback" : " commit")
+ << " transaction " << id);
+ if (!tx.buffer.get() || id != tx.id) {
+ throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID,
+ Msg() << "Cannot discharge transaction " << id
+ << (tx.buffer.get() ? Msg() << ", current transaction is " << tx.id :
+ Msg() << ", no current transaction"));
}
+ tx.discharge = delivery;
if (failed) {
abort();
} else {
- txn->begin();
- txn->startCommit(&connection.getBroker().getStore());
+ tx.buffer->begin();
+ tx.buffer->startCommit(&connection.getBroker().getStore());
AsyncCommit callback(shared_from_this());
- txn->end(callback);
+ tx.buffer->end(callback);
}
}
void Session::abort()
{
- if (txn) {
- txn->rollback();
+ if (tx.buffer) {
+ tx.dischargeComplete();
+ tx.buffer->rollback();
txAborted();
- txn = boost::intrusive_ptr<TxBuffer>();
+ tx.buffer.reset();
+ QPID_LOG(debug, "Transaction " << tx.id << " rolled back");
}
}
@@ -830,16 +844,18 @@ void Session::committed(bool sync)
{
if (sync) {
//this is on IO thread
- if (txn.get()) {
- txn->endCommit(&connection.getBroker().getStore());
+ tx.dischargeComplete();
+ if (tx.buffer.get()) {
+ tx.buffer->endCommit(&connection.getBroker().getStore());
txCommitted();
- txn = boost::intrusive_ptr<TxBuffer>();
+ tx.buffer.reset();
+ QPID_LOG(debug, "Transaction " << tx.id << " comitted");
} else {
throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "tranaction vanished during async commit");
}
} else {
//this is not on IO thread, need to delay processing until on IO thread
- if (commitPending.boolCompareAndSwap(false, true)) {
+ if (tx.commitPending.boolCompareAndSwap(false, true)) {
qpid::sys::Mutex::ScopedLock l(lock);
if (!deleted) {
out.activateOutput();
@@ -880,7 +896,7 @@ void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Mes
{
if (message && message->isTypedBody()) {
QPID_LOG(debug, "Coordinator got message: @" << message->getBodyDescriptor() << " " << message->getTypedBody());
- if (message->getBodyDescriptor().match(qpid::amqp::transaction::DECLARE_SYMBOL, qpid::amqp::transaction::DECLARE_CODE)) {
+ if (message->getBodyDescriptor().match(DECLARE_SYMBOL, DECLARE_CODE)) {
std::string id = session.declare();
//encode the txn id in a 'declared' list on the disposition
pn_data_t* data = pn_disposition_data(pn_delivery_local(delivery));
@@ -889,22 +905,38 @@ void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Mes
pn_data_put_binary(data, convert(id));
pn_data_exit(data);
pn_data_exit(data);
- pn_delivery_update(delivery, qpid::amqp::transaction::DECLARED_CODE);
+ pn_delivery_update(delivery, DECLARED_CODE);
pn_delivery_settle(delivery);
session.incomingMessageAccepted();
- } else if (message->getBodyDescriptor().match(qpid::amqp::transaction::DISCHARGE_SYMBOL, qpid::amqp::transaction::DISCHARGE_CODE)) {
+ QPID_LOG(debug, "Coordinator declared transaction " << id);
+ } else if (message->getBodyDescriptor().match(DISCHARGE_SYMBOL, DISCHARGE_CODE)) {
if (message->getTypedBody().getType() == qpid::types::VAR_LIST) {
qpid::types::Variant::List args = message->getTypedBody().asList();
qpid::types::Variant::List::const_iterator i = args.begin();
if (i != args.end()) {
std::string id = *i;
bool failed = ++i != args.end() ? i->asBool() : false;
- session.discharge(id, failed);
- DecodingIncoming::deliver(message, delivery);//ensures async completion of commit is taken care of
+ session.discharge(id, failed, delivery);
}
+
+ } else {
+ throw framing::IllegalArgumentException(
+ Msg() << "Coordinator unknown message: @" <<
+ message->getBodyDescriptor() << " " << message->getTypedBody());
}
}
}
}
+Session::Transaction::Transaction(Session& s) :
+ session(s), id((boost::format("%1%") % &s).str()), discharge(0) {}
+
+// Called in IO thread to signal completion of dischage by settling discharge message.
+void Session::Transaction::dischargeComplete() {
+ if (buffer.get() && discharge) {
+ session.accepted(discharge, false); // Queue up accept and activate output.
+ discharge = 0;
+ }
+}
+
}}} // namespace qpid::broker::amqp