summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-11-01 10:33:30 +0000
committerGordon Sim <gsim@apache.org>2013-11-01 10:33:30 +0000
commit245ae6f79cc3c818603f9952290cc9f33700615a (patch)
treea7bc031784df4928d49acc34477b51e32195ace5 /qpid/cpp
parent81c95a528a716800efac7b90667c83dfe2149c6e (diff)
downloadqpid-python-245ae6f79cc3c818603f9952290cc9f33700615a.tar.gz
QPID-5284: ensure timestamp is added to the data that is persisted
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1537889 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Message.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Message.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp25
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h4
-rw-r--r--qpid/cpp/src/tests/legacystore/MessageUtils.h2
10 files changed, 37 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index deca238f22..db6149158a 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -121,14 +121,9 @@ void Message::clearTrace()
annotationsChanged();
}
-void Message::setTimestamp()
-{
- timestamp = ::time(0); // AMQP-0.10: posix time_t - secs since Epoch
-}
-
uint64_t Message::getTimestamp() const
{
- return timestamp;
+ return encoding ? encoding->getTimestamp() : 0;
}
uint64_t Message::getTtl() const
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index fdc919242e..38cfb2c1b8 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -77,6 +77,7 @@ public:
virtual std::string getContent() const = 0;
virtual void processProperties(qpid::amqp::MapHandler&) const = 0;
virtual std::string getUserId() const = 0;
+ virtual uint64_t getTimestamp() const = 0;
};
QPID_BROKER_EXTERN Message(boost::intrusive_ptr<Encoding>, boost::intrusive_ptr<PersistableMessage>);
@@ -106,8 +107,6 @@ public:
uint64_t getTtl() const;
QPID_BROKER_EXTERN bool getTtl(uint64_t&) const;
- /** set the timestamp delivery property to the current time-of-day */
- QPID_BROKER_EXTERN void setTimestamp();
QPID_BROKER_EXTERN uint64_t getTimestamp() const;
QPID_BROKER_EXTERN void addAnnotation(const std::string& key, const qpid::types::Variant& value);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index ce7071c5ae..4d5ea3b5b6 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -388,7 +388,7 @@ bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
- record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(),
+ record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(),
ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE,
acquire ? message::ACQUIRE_MODE_PRE_ACQUIRED : message::ACQUIRE_MODE_NOT_ACQUIRED,
msg.getAnnotations(),
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 555163089e..0c343e5d90 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -218,7 +218,7 @@ void SessionState::handleContent(AMQFrame& frame)
}
DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer());
if (broker.isTimestamping())
- deliverable.getMessage().setTimestamp();
+ msg->setTimestamp();
deliverable.getMessage().setPublisher(getConnection());
@@ -296,7 +296,7 @@ void SessionState::handleOut(AMQFrame& frame) {
}
DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
- const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+ const std::string& destination, bool isRedelivered, uint64_t ttl,
qpid::framing::message::AcceptMode acceptMode, qpid::framing::message::AcquireMode acquireMode,
const qpid::types::Variant::Map& annotations, bool sync)
{
@@ -307,7 +307,7 @@ DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer&
framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode)));
method.setEof(false);
getProxy().getHandler().handle(method);
- message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, timestamp, annotations);
+ message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, annotations);
message.sendContent(getProxy().getHandler(), maxFrameSize);
assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 9fe38636a3..23dc3b897d 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -101,7 +101,7 @@ class SessionState : public qpid::SessionState,
void sendCompletion();
DeliveryId deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
- const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+ const std::string& destination, bool isRedelivered, uint64_t ttl,
qpid::framing::message::AcceptMode, qpid::framing::message::AcquireMode,
const qpid::types::Variant::Map& annotations, bool sync);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.cpp b/qpid/cpp/src/qpid/broker/amqp/Message.cpp
index 7015db324b..b1e6c58b00 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Message.cpp
@@ -60,6 +60,13 @@ std::string Message::getUserId() const
return v;
}
+uint64_t Message::getTimestamp() const
+{
+ //AMQP 1.0 message doesn't have the equivalent of the 0-10 timestamp field
+ //TODO: define an annotation for that
+ return 0;
+}
+
bool Message::isPersistent() const
{
return durable && durable.get();
diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h
index 5bf49ef556..161b1dc6f1 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Message.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Message.h
@@ -51,6 +51,7 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess
std::string getContent() const;
void processProperties(qpid::amqp::MapHandler&) const;
std::string getUserId() const;
+ uint64_t getTimestamp() const;
qpid::amqp::MessageId getMessageId() const;
qpid::amqp::MessageId getCorrelationId() const;
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
index 8055b3fe1a..98dd70dceb 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
@@ -130,6 +130,19 @@ std::string MessageTransfer::getExchangeName() const
return getFrames().as<framing::MessageTransferBody>()->getDestination();
}
+void MessageTransfer::setTimestamp()
+{
+ DeliveryProperties* props = getFrames().getHeaders()->get<DeliveryProperties>(true);
+ time_t now = ::time(0);
+ props->setTimestamp(now);
+}
+
+uint64_t MessageTransfer::getTimestamp() const
+{
+ const DeliveryProperties* props = getProperties<DeliveryProperties>();
+ return props ? props->getTimestamp() : 0;
+}
+
bool MessageTransfer::requiresAccept() const
{
const framing::MessageTransferBody* b = getFrames().as<framing::MessageTransferBody>();
@@ -174,11 +187,11 @@ void MessageTransfer::sendContent(framing::FrameHandler& out, uint16_t maxFrameS
class SendHeader
{
public:
- SendHeader(FrameHandler& h, bool r, uint64_t t, uint64_t ts, const qpid::types::Variant::Map& a) : handler(h), redelivered(r), ttl(t), timestamp(ts), annotations(a) {}
+ SendHeader(FrameHandler& h, bool r, uint64_t t, const qpid::types::Variant::Map& a) : handler(h), redelivered(r), ttl(t), annotations(a) {}
void operator()(const AMQFrame& f)
{
AMQFrame copy = f;
- if (redelivered || ttl || timestamp || annotations.size()) {
+ if (redelivered || ttl || annotations.size()) {
copy.cloneBody();
if (annotations.size()) {
MessageProperties* props =
@@ -188,12 +201,11 @@ class SendHeader
props->getApplicationHeaders().set(i->first, qpid::amqp_0_10::translate(i->second));
}
}
- if (redelivered || ttl || timestamp) {
+ if (redelivered || ttl) {
DeliveryProperties* dp =
copy.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true);
if (ttl) dp->setTtl(ttl);
if (redelivered) dp->setRedelivered(redelivered);
- if (timestamp) dp->setTimestamp(timestamp);
}
}
handler.handle(copy);
@@ -202,15 +214,14 @@ class SendHeader
FrameHandler& handler;
bool redelivered;
uint64_t ttl;
- uint64_t timestamp;
const qpid::types::Variant::Map& annotations;
};
void MessageTransfer::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/,
- bool redelivered, uint64_t ttl, uint64_t timestamp,
+ bool redelivered, uint64_t ttl,
const qpid::types::Variant::Map& annotations) const
{
- SendHeader f(out, redelivered, ttl, timestamp, annotations);
+ SendHeader f(out, redelivered, ttl, annotations);
frames.map_if(f, TypeFilter<HEADER_BODY>());
}
bool MessageTransfer::isImmediateDeliveryRequired(const qpid::broker::Message& /*message*/)
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
index 422446db51..b55f8577a4 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
@@ -55,6 +55,8 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro
std::string getExchangeName() const;
void processProperties(qpid::amqp::MapHandler&) const;
std::string getUserId() const;
+ void setTimestamp();
+ uint64_t getTimestamp() const;
bool requiresAccept() const;
const qpid::framing::SequenceNumber& getCommandId() const;
@@ -92,7 +94,7 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro
void clearApplicationHeadersFlag();
void sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const;
- void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize, bool redelivered, uint64_t ttl, uint64_t timestamp, const qpid::types::Variant::Map& annotations) const;
+ void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize, bool redelivered, uint64_t ttl, const qpid::types::Variant::Map& annotations) const;
void decodeHeader(framing::Buffer& buffer);
void decodeContent(framing::Buffer& buffer);
diff --git a/qpid/cpp/src/tests/legacystore/MessageUtils.h b/qpid/cpp/src/tests/legacystore/MessageUtils.h
index 6552357c72..cd23244293 100644
--- a/qpid/cpp/src/tests/legacystore/MessageUtils.h
+++ b/qpid/cpp/src/tests/legacystore/MessageUtils.h
@@ -98,7 +98,7 @@ struct MessageUtils
static void deliver(Message& msg, FrameHandler& h, uint16_t framesize)
{
- qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendHeader(h, framesize, false, 0, 0, qpid::types::Variant::Map());
+ qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendHeader(h, framesize, false, 0, qpid::types::Variant::Map());
qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendContent(h, framesize);
}