summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-10-15 12:41:53 +0000
committerGordon Sim <gsim@apache.org>2013-10-15 12:41:53 +0000
commit16a317a30e82a3f83739f8953ad67babedc6c646 (patch)
treebd7f569ad56383b51a93579c83ab582d72b60b9b /qpid/cpp/src
parent8ec3ef4b4bd0fa0478542e194f1ea81953de21b9 (diff)
downloadqpid-python-16a317a30e82a3f83739f8953ad67babedc6c646.tar.gz
QPID-5229: implement release and reject
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1532307 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp5
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp8
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp17
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp9
6 files changed, 34 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 6b9da64017..31704cca73 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -136,11 +136,12 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
outgoingMessageRejected();
break;
case PN_RELEASED:
- if (preAcquires()) queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented
+ if (preAcquires()) queue->release(r.cursor, false);//for PN_RELEASED, delivery count should not be incremented
outgoingMessageRejected();//TODO: not quite true...
break;
case PN_MODIFIED:
- if (preAcquires()) queue->release(r.cursor, true);//TODO: proper handling of modified
+ if (preAcquires()) queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
+ //TODO: handle undeliverable-here and message-annotations
outgoingMessageRejected();//TODO: not quite true...
break;
default:
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 13d943757e..b9af682aa0 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -234,6 +234,14 @@ void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid:
wakeupDriver();
}
+void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn);
+ ssn->nack(MessageImplAccess::get(message).getInternalId(), reject);
+ wakeupDriver();
+}
+
void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 2fdba7a3b2..eb1be8e5fd 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -81,6 +81,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
+ void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject);
void setOption(const std::string& name, const qpid::types::Variant& value);
std::string getAuthenticatedUsername();
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index 8f2a7d15d8..e13ccdbc36 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -140,6 +140,23 @@ void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool c
}
}
+void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject)
+{
+ DeliveryMap::iterator i = unacked.find(id);
+ if (i != unacked.end()) {
+ if (reject) {
+ QPID_LOG(debug, "rejecting message with id=" << id);
+ pn_delivery_update(i->second, PN_REJECTED);
+ } else {
+ QPID_LOG(debug, "releasing message with id=" << id);
+ pn_delivery_update(i->second, PN_MODIFIED);
+ pn_disposition_set_failed(pn_delivery_local(i->second), true);
+ }
+ pn_delivery_settle(i->second);
+ unacked.erase(i);
+ }
+}
+
bool SessionContext::settled()
{
bool result = true;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
index 5d68f6d8de..75a67a7d15 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
@@ -79,6 +79,7 @@ class SessionContext
void acknowledge();
void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);
+ void nack(const qpid::framing::SequenceNumber& id, bool reject);
};
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
index 45635e4ced..50fe4ef30e 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
@@ -57,18 +57,17 @@ void SessionHandle::acknowledge(bool /*sync*/)
void SessionHandle::acknowledge(qpid::messaging::Message& msg, bool cumulative)
{
- //TODO: handle cumulative
connection->acknowledge(session, &msg, cumulative);
}
-void SessionHandle::reject(qpid::messaging::Message&)
+void SessionHandle::reject(qpid::messaging::Message& msg)
{
-
+ connection->nack(session, msg, true);
}
-void SessionHandle::release(qpid::messaging::Message&)
+void SessionHandle::release(qpid::messaging::Message& msg)
{
-
+ connection->nack(session, msg, false);
}
void SessionHandle::close()