diff options
| author | Gordon Sim <gsim@apache.org> | 2014-06-23 23:02:26 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2014-06-23 23:02:26 +0000 |
| commit | 48b2044314633271da97c1558dfcbb07641bc360 (patch) | |
| tree | eab749b112ac0b0f14906aac16736342910be6c3 /qpid/cpp/src | |
| parent | 882fd35c3a84e4f2f6b84d1bccf324936de3f3fe (diff) | |
| download | qpid-python-48b2044314633271da97c1558dfcbb07641bc360.tar.gz | |
QPID-5828: Check and adjust ttl on resend attempt
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1604952 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
5 files changed, 60 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index e217a796f8..bbb310b0f4 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -642,9 +642,7 @@ void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::strin void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) { - m.message.getDeliveryProperties().setRoutingKey(m.getSubject()); - m.status = session.messageTransfer(arg::destination=name, arg::content=m.message); - QPID_LOG(debug, "Sending to exchange " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties()); + m.send(session, name, m.getSubject()); } void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&) @@ -663,9 +661,7 @@ void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&) } void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) { - m.message.getDeliveryProperties().setRoutingKey(name); - m.status = session.messageTransfer(arg::content=m.message); - QPID_LOG(debug, "Sending to queue " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties()); + m.send(session, name); } void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&) diff --git a/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h b/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h index 8d87a3c7bb..d66d2ecb3c 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h @@ -33,7 +33,7 @@ class Message; namespace client { namespace amqp0_10 { -struct OutgoingMessage; +class OutgoingMessage; /** * diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index 834ba1fe9f..250725da53 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -27,6 +27,7 @@ #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" #include "qpid/framing/enum.h" +#include "qpid/log/Statement.h" #include <sstream> namespace qpid { @@ -111,6 +112,7 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from) if (i != from.getProperties().end()) { message.getMessageProperties().setContentEncoding(i->second.asString()); } + base = qpid::sys::now(); } void OutgoingMessage::setSubject(const std::string& s) @@ -123,4 +125,45 @@ std::string OutgoingMessage::getSubject() const return subject; } +void OutgoingMessage::send(qpid::client::AsyncSession& session, const std::string& destination, const std::string& routingKey) +{ + if (!expired) { + message.getDeliveryProperties().setRoutingKey(routingKey); + status = session.messageTransfer(arg::destination=destination, arg::content=message); + if (destination.empty()) { + QPID_LOG(debug, "Sending to queue " << routingKey << " " << message.getMessageProperties() << " " << message.getDeliveryProperties()); + } else { + QPID_LOG(debug, "Sending to exchange " << destination << " " << message.getMessageProperties() << " " << message.getDeliveryProperties()); + } + } +} +void OutgoingMessage::send(qpid::client::AsyncSession& session, const std::string& routingKey) +{ + send(session, std::string(), routingKey); +} + +bool OutgoingMessage::isComplete() +{ + return expired || (status.isValid() && status.isComplete()); +} +void OutgoingMessage::markRedelivered() +{ + message.setRedelivered(true); + if (message.getDeliveryProperties().hasTtl()) { + uint64_t delta = qpid::sys::Duration(base, qpid::sys::now())/qpid::sys::TIME_MSEC; + uint64_t ttl = message.getDeliveryProperties().getTtl(); + if (ttl <= delta) { + QPID_LOG(debug, "Expiring outgoing message (" << ttl << " < " << delta << ")"); + expired = true; + message.getDeliveryProperties().setTtl(1); + } else { + QPID_LOG(debug, "Adjusting ttl on outgoing message from " << ttl << " by " << delta); + ttl = ttl - delta; + message.getDeliveryProperties().setTtl(ttl); + } + } +} +OutgoingMessage::OutgoingMessage() : expired (false) {} + + }}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h index 2191f45546..a17ef03e10 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h @@ -21,8 +21,10 @@ * under the License. * */ +#include "qpid/client/AsyncSession.h" #include "qpid/client/Completion.h" #include "qpid/client/Message.h" +#include "qpid/sys/Time.h" namespace qpid { namespace messaging { @@ -31,15 +33,24 @@ class Message; namespace client { namespace amqp0_10 { -struct OutgoingMessage +class OutgoingMessage { + private: qpid::client::Message message; qpid::client::Completion status; std::string subject; + qpid::sys::AbsTime base; + bool expired; + public: + OutgoingMessage(); void convert(const qpid::messaging::Message&); void setSubject(const std::string& subject); std::string getSubject() const; + void send(qpid::client::AsyncSession& session, const std::string& destination, const std::string& routingKey); + void send(qpid::client::AsyncSession& session,const std::string& routingKey); + bool isComplete(); + void markRedelivered(); }; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index 602c819ca3..9d862e79a4 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -140,7 +140,7 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) void SenderImpl::replay(const sys::Mutex::ScopedLock&) { for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { - i->message.setRedelivered(true); + i->markRedelivered(); sink->send(session, name, *i); } } @@ -158,7 +158,7 @@ uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock& } else { flushed = false; } - while (!outgoing.empty() && outgoing.front().status.isValid() && outgoing.front().status.isComplete()) { + while (!outgoing.empty() && outgoing.front().isComplete()) { outgoing.pop_front(); } return outgoing.size(); |
