summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-06-23 23:02:26 +0000
committerGordon Sim <gsim@apache.org>2014-06-23 23:02:26 +0000
commit48b2044314633271da97c1558dfcbb07641bc360 (patch)
treeeab749b112ac0b0f14906aac16736342910be6c3 /qpid/cpp/src
parent882fd35c3a84e4f2f6b84d1bccf324936de3f3fe (diff)
downloadqpid-python-48b2044314633271da97c1558dfcbb07641bc360.tar.gz
QPID-5828: Check and adjust ttl on resend attempt
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1604952 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp43
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h13
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp4
5 files changed, 60 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index e217a796f8..bbb310b0f4 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -642,9 +642,7 @@ void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::strin
void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
- m.message.getDeliveryProperties().setRoutingKey(m.getSubject());
- m.status = session.messageTransfer(arg::destination=name, arg::content=m.message);
- QPID_LOG(debug, "Sending to exchange " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
+ m.send(session, name, m.getSubject());
}
void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
@@ -663,9 +661,7 @@ void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&)
}
void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
- m.message.getDeliveryProperties().setRoutingKey(name);
- m.status = session.messageTransfer(arg::content=m.message);
- QPID_LOG(debug, "Sending to queue " << name << " " << m.message.getMessageProperties() << " " << m.message.getDeliveryProperties());
+ m.send(session, name);
}
void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h b/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
index 8d87a3c7bb..d66d2ecb3c 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h
@@ -33,7 +33,7 @@ class Message;
namespace client {
namespace amqp0_10 {
-struct OutgoingMessage;
+class OutgoingMessage;
/**
*
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
index 834ba1fe9f..250725da53 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -27,6 +27,7 @@
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/framing/enum.h"
+#include "qpid/log/Statement.h"
#include <sstream>
namespace qpid {
@@ -111,6 +112,7 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from)
if (i != from.getProperties().end()) {
message.getMessageProperties().setContentEncoding(i->second.asString());
}
+ base = qpid::sys::now();
}
void OutgoingMessage::setSubject(const std::string& s)
@@ -123,4 +125,45 @@ std::string OutgoingMessage::getSubject() const
return subject;
}
+void OutgoingMessage::send(qpid::client::AsyncSession& session, const std::string& destination, const std::string& routingKey)
+{
+ if (!expired) {
+ message.getDeliveryProperties().setRoutingKey(routingKey);
+ status = session.messageTransfer(arg::destination=destination, arg::content=message);
+ if (destination.empty()) {
+ QPID_LOG(debug, "Sending to queue " << routingKey << " " << message.getMessageProperties() << " " << message.getDeliveryProperties());
+ } else {
+ QPID_LOG(debug, "Sending to exchange " << destination << " " << message.getMessageProperties() << " " << message.getDeliveryProperties());
+ }
+ }
+}
+void OutgoingMessage::send(qpid::client::AsyncSession& session, const std::string& routingKey)
+{
+ send(session, std::string(), routingKey);
+}
+
+bool OutgoingMessage::isComplete()
+{
+ return expired || (status.isValid() && status.isComplete());
+}
+void OutgoingMessage::markRedelivered()
+{
+ message.setRedelivered(true);
+ if (message.getDeliveryProperties().hasTtl()) {
+ uint64_t delta = qpid::sys::Duration(base, qpid::sys::now())/qpid::sys::TIME_MSEC;
+ uint64_t ttl = message.getDeliveryProperties().getTtl();
+ if (ttl <= delta) {
+ QPID_LOG(debug, "Expiring outgoing message (" << ttl << " < " << delta << ")");
+ expired = true;
+ message.getDeliveryProperties().setTtl(1);
+ } else {
+ QPID_LOG(debug, "Adjusting ttl on outgoing message from " << ttl << " by " << delta);
+ ttl = ttl - delta;
+ message.getDeliveryProperties().setTtl(ttl);
+ }
+ }
+}
+OutgoingMessage::OutgoingMessage() : expired (false) {}
+
+
}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
index 2191f45546..a17ef03e10 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
@@ -21,8 +21,10 @@
* under the License.
*
*/
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/Completion.h"
#include "qpid/client/Message.h"
+#include "qpid/sys/Time.h"
namespace qpid {
namespace messaging {
@@ -31,15 +33,24 @@ class Message;
namespace client {
namespace amqp0_10 {
-struct OutgoingMessage
+class OutgoingMessage
{
+ private:
qpid::client::Message message;
qpid::client::Completion status;
std::string subject;
+ qpid::sys::AbsTime base;
+ bool expired;
+ public:
+ OutgoingMessage();
void convert(const qpid::messaging::Message&);
void setSubject(const std::string& subject);
std::string getSubject() const;
+ void send(qpid::client::AsyncSession& session, const std::string& destination, const std::string& routingKey);
+ void send(qpid::client::AsyncSession& session,const std::string& routingKey);
+ bool isComplete();
+ void markRedelivered();
};
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index 602c819ca3..9d862e79a4 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -140,7 +140,7 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
void SenderImpl::replay(const sys::Mutex::ScopedLock&)
{
for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
- i->message.setRedelivered(true);
+ i->markRedelivered();
sink->send(session, name, *i);
}
}
@@ -158,7 +158,7 @@ uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&
} else {
flushed = false;
}
- while (!outgoing.empty() && outgoing.front().status.isValid() && outgoing.front().status.isComplete()) {
+ while (!outgoing.empty() && outgoing.front().isComplete()) {
outgoing.pop_front();
}
return outgoing.size();