summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-06-23 23:02:33 +0000
committerGordon Sim <gsim@apache.org>2014-06-23 23:02:33 +0000
commit93cb26f6a091767aba4d0ecbc7a21b0bbf642c02 (patch)
tree3cca21ee5cf30a3dd8d6b0741a5633451e8384b0 /qpid/cpp/src
parent48b2044314633271da97c1558dfcbb07641bc360 (diff)
downloadqpid-python-93cb26f6a091767aba4d0ecbc7a21b0bbf642c02.tar.gz
QPID-5828: Drop expired incoming messages
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1604953 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp22
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h1
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/FrameSet.cpp24
-rw-r--r--qpid/cpp/src/qpid/framing/FrameSet.h9
6 files changed, 51 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp
index 01e614e041..397067b37a 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp
@@ -480,6 +480,7 @@ void SessionImpl::deliver(AMQFrame& frame) // network thread
//as completion affects flow control; other commands will be
//considered completed as soon as processed here
if (arriving->isA<MessageTransferBody>()) {
+ arriving->setReceived();
Lock l(state);
incompleteIn.add(arriving->getId());
} else {
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index abf88c89c5..5d8a77999e 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -132,12 +132,16 @@ bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout)
AbsTime deadline(AbsTime::now(), timeout);
do {
//search through received list for any transfer of interest:
- for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
+ for (FrameSetQueue::iterator i = received.begin(); i != received.end();)
{
MessageTransfer transfer(*i, *this);
- if (handler.accept(transfer)) {
+ if (transfer.checkExpired()) {
+ i = received.erase(i);
+ } else if (handler.accept(transfer)) {
received.erase(i);
return true;
+ } else {
+ ++i;
}
}
if (inUse) {
@@ -260,7 +264,9 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
if (content->isA<MessageTransferBody>()) {
MessageTransfer transfer(content, *this);
- if (handler && handler->accept(transfer)) {
+ if (transfer.checkExpired()) {
+ QPID_LOG(debug, "Expired received transfer: " << *content->getMethod());
+ } else if (handler && handler->accept(transfer)) {
QPID_LOG(debug, "Delivered " << *content->getMethod() << " "
<< *content->getHeaders());
return true;
@@ -359,6 +365,16 @@ void IncomingMessages::MessageTransfer::retrieve(qpid::messaging::Message* messa
parent.retrieve(content, message);
}
+bool IncomingMessages::MessageTransfer::checkExpired()
+{
+ if (content->hasExpired()) {
+ retrieve(0);
+ parent.accept(content->getId(), false);
+ return true;
+ } else {
+ return false;
+ }
+}
namespace {
//TODO: unify conversion to and from 0-10 message that is currently
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index ff1fb37f3d..c9ea0673a3 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -57,6 +57,7 @@ class IncomingMessages
private:
FrameSetPtr content;
IncomingMessages& parent;
+ bool checkExpired();
MessageTransfer(FrameSetPtr, IncomingMessages&);
friend class IncomingMessages;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
index 250725da53..f2b205a78a 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -24,6 +24,7 @@
#include "qpid/types/encodings.h"
#include "qpid/types/Variant.h"
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/framing/enum.h"
diff --git a/qpid/cpp/src/qpid/framing/FrameSet.cpp b/qpid/cpp/src/qpid/framing/FrameSet.cpp
index 9aee7b98b9..4089475c7a 100644
--- a/qpid/cpp/src/qpid/framing/FrameSet.cpp
+++ b/qpid/cpp/src/qpid/framing/FrameSet.cpp
@@ -26,9 +26,11 @@
#include "qpid/framing/TypeFilter.h"
using namespace qpid::framing;
+using qpid::sys::AbsTime;
+using qpid::sys::TIME_MSEC;
-FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalculateSize(true) { }
-FrameSet::FrameSet(const FrameSet& original) : id(original.id), contentSize(0), recalculateSize(true)
+FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalculateSize(true),received(AbsTime::FarFuture()) { }
+FrameSet::FrameSet(const FrameSet& original) : id(original.id), contentSize(0), recalculateSize(true), received(AbsTime::FarFuture())
{
for (Frames::const_iterator i = original.begin(); i != original.end(); ++i) {
parts.push_back(AMQFrame(*(i->getBody())));
@@ -106,3 +108,21 @@ std::string FrameSet::getContent() const {
bool FrameSet::hasContent() const {
return parts.size() >= 3;
}
+
+void FrameSet::setReceived()
+{
+ received = AbsTime::now();
+}
+namespace {
+uint64_t MAX_TTL = std::numeric_limits<int64_t>::max()/TIME_MSEC;
+}
+
+bool FrameSet::hasExpired() const
+{
+ const DeliveryProperties* props = getHeaderProperties<DeliveryProperties>();
+ if (props && props->hasTtl() && props->getTtl() < MAX_TTL) {
+ AbsTime expiration(received, props->getTtl()*TIME_MSEC);
+ return expiration < AbsTime::now();
+ }
+ return false;
+}
diff --git a/qpid/cpp/src/qpid/framing/FrameSet.h b/qpid/cpp/src/qpid/framing/FrameSet.h
index 4188fd9b8c..9abd3ff096 100644
--- a/qpid/cpp/src/qpid/framing/FrameSet.h
+++ b/qpid/cpp/src/qpid/framing/FrameSet.h
@@ -26,6 +26,7 @@
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Time.h"
#include "qpid/CommonImportExport.h"
namespace qpid {
@@ -39,8 +40,9 @@ class FrameSet
typedef InlineVector<AMQFrame, 4> Frames;
const SequenceNumber id;
Frames parts;
- mutable uint64_t contentSize;
- mutable bool recalculateSize;
+ mutable uint64_t contentSize;
+ mutable bool recalculateSize;
+ qpid::sys::AbsTime received;
public:
typedef boost::shared_ptr<FrameSet> shared_ptr;
@@ -58,6 +60,9 @@ public:
QPID_COMMON_EXTERN std::string getContent() const;
QPID_COMMON_EXTERN bool hasContent() const;
+ QPID_COMMON_EXTERN void setReceived();
+ QPID_COMMON_EXTERN bool hasExpired() const;
+
bool isContentBearing() const;
QPID_COMMON_EXTERN const AMQMethodBody* getMethod() const;