diff options
| author | Gordon Sim <gsim@apache.org> | 2013-10-15 12:41:53 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-10-15 12:41:53 +0000 |
| commit | 16a317a30e82a3f83739f8953ad67babedc6c646 (patch) | |
| tree | bd7f569ad56383b51a93579c83ab582d72b60b9b /qpid/cpp/src | |
| parent | 8ec3ef4b4bd0fa0478542e194f1ea81953de21b9 (diff) | |
| download | qpid-python-16a317a30e82a3f83739f8953ad67babedc6c646.tar.gz | |
QPID-5229: implement release and reject
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1532307 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
6 files changed, 34 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 6b9da64017..31704cca73 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -136,11 +136,12 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) outgoingMessageRejected(); break; case PN_RELEASED: - if (preAcquires()) queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented + if (preAcquires()) queue->release(r.cursor, false);//for PN_RELEASED, delivery count should not be incremented outgoingMessageRejected();//TODO: not quite true... break; case PN_MODIFIED: - if (preAcquires()) queue->release(r.cursor, true);//TODO: proper handling of modified + if (preAcquires()) queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery))); + //TODO: handle undeliverable-here and message-annotations outgoingMessageRejected();//TODO: not quite true... break; default: diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 13d943757e..b9af682aa0 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -234,6 +234,14 @@ void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid: wakeupDriver(); } +void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + checkClosed(ssn); + ssn->nack(MessageImplAccess::get(message).getInternalId(), reject); + wakeupDriver(); +} + void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 2fdba7a3b2..eb1be8e5fd 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -81,6 +81,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative); + void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject); void setOption(const std::string& name, const qpid::types::Variant& value); std::string getAuthenticatedUsername(); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 8f2a7d15d8..e13ccdbc36 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -140,6 +140,23 @@ void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool c } } +void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject) +{ + DeliveryMap::iterator i = unacked.find(id); + if (i != unacked.end()) { + if (reject) { + QPID_LOG(debug, "rejecting message with id=" << id); + pn_delivery_update(i->second, PN_REJECTED); + } else { + QPID_LOG(debug, "releasing message with id=" << id); + pn_delivery_update(i->second, PN_MODIFIED); + pn_disposition_set_failed(pn_delivery_local(i->second), true); + } + pn_delivery_settle(i->second); + unacked.erase(i); + } +} + bool SessionContext::settled() { bool result = true; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index 5d68f6d8de..75a67a7d15 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -79,6 +79,7 @@ class SessionContext void acknowledge(); void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); + void nack(const qpid::framing::SequenceNumber& id, bool reject); }; }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp index 45635e4ced..50fe4ef30e 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp @@ -57,18 +57,17 @@ void SessionHandle::acknowledge(bool /*sync*/) void SessionHandle::acknowledge(qpid::messaging::Message& msg, bool cumulative) { - //TODO: handle cumulative connection->acknowledge(session, &msg, cumulative); } -void SessionHandle::reject(qpid::messaging::Message&) +void SessionHandle::reject(qpid::messaging::Message& msg) { - + connection->nack(session, msg, true); } -void SessionHandle::release(qpid::messaging::Message&) +void SessionHandle::release(qpid::messaging::Message& msg) { - + connection->nack(session, msg, false); } void SessionHandle::close() |
