diff options
| author | Gordon Sim <gsim@apache.org> | 2014-06-23 23:02:33 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2014-06-23 23:02:33 +0000 |
| commit | 93cb26f6a091767aba4d0ecbc7a21b0bbf642c02 (patch) | |
| tree | 3cca21ee5cf30a3dd8d6b0741a5633451e8384b0 /qpid/cpp/src | |
| parent | 48b2044314633271da97c1558dfcbb07641bc360 (diff) | |
| download | qpid-python-93cb26f6a091767aba4d0ecbc7a21b0bbf642c02.tar.gz | |
QPID-5828: Drop expired incoming messages
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1604953 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 22 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/framing/FrameSet.cpp | 24 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/framing/FrameSet.h | 9 |
6 files changed, 51 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 01e614e041..397067b37a 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -480,6 +480,7 @@ void SessionImpl::deliver(AMQFrame& frame) // network thread //as completion affects flow control; other commands will be //considered completed as soon as processed here if (arriving->isA<MessageTransferBody>()) { + arriving->setReceived(); Lock l(state); incompleteIn.add(arriving->getId()); } else { diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index abf88c89c5..5d8a77999e 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -132,12 +132,16 @@ bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout) AbsTime deadline(AbsTime::now(), timeout); do { //search through received list for any transfer of interest: - for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++) + for (FrameSetQueue::iterator i = received.begin(); i != received.end();) { MessageTransfer transfer(*i, *this); - if (handler.accept(transfer)) { + if (transfer.checkExpired()) { + i = received.erase(i); + } else if (handler.accept(transfer)) { received.erase(i); return true; + } else { + ++i; } } if (inUse) { @@ -260,7 +264,9 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { if (content->isA<MessageTransferBody>()) { MessageTransfer transfer(content, *this); - if (handler && handler->accept(transfer)) { + if (transfer.checkExpired()) { + QPID_LOG(debug, "Expired received transfer: " << *content->getMethod()); + } else if (handler && handler->accept(transfer)) { QPID_LOG(debug, "Delivered " << *content->getMethod() << " " << *content->getHeaders()); return true; @@ -359,6 +365,16 @@ void IncomingMessages::MessageTransfer::retrieve(qpid::messaging::Message* messa parent.retrieve(content, message); } +bool IncomingMessages::MessageTransfer::checkExpired() +{ + if (content->hasExpired()) { + retrieve(0); + parent.accept(content->getId(), false); + return true; + } else { + return false; + } +} namespace { //TODO: unify conversion to and from 0-10 message that is currently diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index ff1fb37f3d..c9ea0673a3 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -57,6 +57,7 @@ class IncomingMessages private: FrameSetPtr content; IncomingMessages& parent; + bool checkExpired(); MessageTransfer(FrameSetPtr, IncomingMessages&); friend class IncomingMessages; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index 250725da53..f2b205a78a 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -24,6 +24,7 @@ #include "qpid/types/encodings.h" #include "qpid/types/Variant.h" #include "qpid/messaging/Address.h" +#include "qpid/messaging/Duration.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" #include "qpid/framing/enum.h" diff --git a/qpid/cpp/src/qpid/framing/FrameSet.cpp b/qpid/cpp/src/qpid/framing/FrameSet.cpp index 9aee7b98b9..4089475c7a 100644 --- a/qpid/cpp/src/qpid/framing/FrameSet.cpp +++ b/qpid/cpp/src/qpid/framing/FrameSet.cpp @@ -26,9 +26,11 @@ #include "qpid/framing/TypeFilter.h" using namespace qpid::framing; +using qpid::sys::AbsTime; +using qpid::sys::TIME_MSEC; -FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalculateSize(true) { } -FrameSet::FrameSet(const FrameSet& original) : id(original.id), contentSize(0), recalculateSize(true) +FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalculateSize(true),received(AbsTime::FarFuture()) { } +FrameSet::FrameSet(const FrameSet& original) : id(original.id), contentSize(0), recalculateSize(true), received(AbsTime::FarFuture()) { for (Frames::const_iterator i = original.begin(); i != original.end(); ++i) { parts.push_back(AMQFrame(*(i->getBody()))); @@ -106,3 +108,21 @@ std::string FrameSet::getContent() const { bool FrameSet::hasContent() const { return parts.size() >= 3; } + +void FrameSet::setReceived() +{ + received = AbsTime::now(); +} +namespace { +uint64_t MAX_TTL = std::numeric_limits<int64_t>::max()/TIME_MSEC; +} + +bool FrameSet::hasExpired() const +{ + const DeliveryProperties* props = getHeaderProperties<DeliveryProperties>(); + if (props && props->hasTtl() && props->getTtl() < MAX_TTL) { + AbsTime expiration(received, props->getTtl()*TIME_MSEC); + return expiration < AbsTime::now(); + } + return false; +} diff --git a/qpid/cpp/src/qpid/framing/FrameSet.h b/qpid/cpp/src/qpid/framing/FrameSet.h index 4188fd9b8c..9abd3ff096 100644 --- a/qpid/cpp/src/qpid/framing/FrameSet.h +++ b/qpid/cpp/src/qpid/framing/FrameSet.h @@ -26,6 +26,7 @@ #include "qpid/framing/amqp_framing.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/Time.h" #include "qpid/CommonImportExport.h" namespace qpid { @@ -39,8 +40,9 @@ class FrameSet typedef InlineVector<AMQFrame, 4> Frames; const SequenceNumber id; Frames parts; - mutable uint64_t contentSize; - mutable bool recalculateSize; + mutable uint64_t contentSize; + mutable bool recalculateSize; + qpid::sys::AbsTime received; public: typedef boost::shared_ptr<FrameSet> shared_ptr; @@ -58,6 +60,9 @@ public: QPID_COMMON_EXTERN std::string getContent() const; QPID_COMMON_EXTERN bool hasContent() const; + QPID_COMMON_EXTERN void setReceived(); + QPID_COMMON_EXTERN bool hasExpired() const; + bool isContentBearing() const; QPID_COMMON_EXTERN const AMQMethodBody* getMethod() const; |
