diff options
| author | Gordon Sim <gsim@apache.org> | 2013-11-01 10:33:30 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-11-01 10:33:30 +0000 |
| commit | 245ae6f79cc3c818603f9952290cc9f33700615a (patch) | |
| tree | a7bc031784df4928d49acc34477b51e32195ace5 /qpid/cpp | |
| parent | 81c95a528a716800efac7b90667c83dfe2149c6e (diff) | |
| download | qpid-python-245ae6f79cc3c818603f9952290cc9f33700615a.tar.gz | |
QPID-5284: ensure timestamp is added to the data that is persisted
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1537889 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Message.cpp | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Message.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp | 25 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/legacystore/MessageUtils.h | 2 |
10 files changed, 37 insertions, 22 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index deca238f22..db6149158a 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -121,14 +121,9 @@ void Message::clearTrace() annotationsChanged(); } -void Message::setTimestamp() -{ - timestamp = ::time(0); // AMQP-0.10: posix time_t - secs since Epoch -} - uint64_t Message::getTimestamp() const { - return timestamp; + return encoding ? encoding->getTimestamp() : 0; } uint64_t Message::getTtl() const diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index fdc919242e..38cfb2c1b8 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -77,6 +77,7 @@ public: virtual std::string getContent() const = 0; virtual void processProperties(qpid::amqp::MapHandler&) const = 0; virtual std::string getUserId() const = 0; + virtual uint64_t getTimestamp() const = 0; }; QPID_BROKER_EXTERN Message(boost::intrusive_ptr<Encoding>, boost::intrusive_ptr<PersistableMessage>); @@ -106,8 +107,6 @@ public: uint64_t getTtl() const; QPID_BROKER_EXTERN bool getTtl(uint64_t&) const; - /** set the timestamp delivery property to the current time-of-day */ - QPID_BROKER_EXTERN void setTimestamp(); QPID_BROKER_EXTERN uint64_t getTimestamp() const; QPID_BROKER_EXTERN void addAnnotation(const std::string& key, const qpid::types::Variant& value); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index ce7071c5ae..4d5ea3b5b6 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -388,7 +388,7 @@ bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset - record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(), + record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE, acquire ? message::ACQUIRE_MODE_PRE_ACQUIRED : message::ACQUIRE_MODE_NOT_ACQUIRED, msg.getAnnotations(), diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 555163089e..0c343e5d90 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -218,7 +218,7 @@ void SessionState::handleContent(AMQFrame& frame) } DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer()); if (broker.isTimestamping()) - deliverable.getMessage().setTimestamp(); + msg->setTimestamp(); deliverable.getMessage().setPublisher(getConnection()); @@ -296,7 +296,7 @@ void SessionState::handleOut(AMQFrame& frame) { } DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message, - const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp, + const std::string& destination, bool isRedelivered, uint64_t ttl, qpid::framing::message::AcceptMode acceptMode, qpid::framing::message::AcquireMode acquireMode, const qpid::types::Variant::Map& annotations, bool sync) { @@ -307,7 +307,7 @@ DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode))); method.setEof(false); getProxy().getHandler().handle(method); - message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, timestamp, annotations); + message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, annotations); message.sendContent(getProxy().getHandler(), maxFrameSize); assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint. diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 9fe38636a3..23dc3b897d 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -101,7 +101,7 @@ class SessionState : public qpid::SessionState, void sendCompletion(); DeliveryId deliver(const qpid::broker::amqp_0_10::MessageTransfer& message, - const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp, + const std::string& destination, bool isRedelivered, uint64_t ttl, qpid::framing::message::AcceptMode, qpid::framing::message::AcquireMode, const qpid::types::Variant::Map& annotations, bool sync); diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.cpp b/qpid/cpp/src/qpid/broker/amqp/Message.cpp index 7015db324b..b1e6c58b00 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Message.cpp @@ -60,6 +60,13 @@ std::string Message::getUserId() const return v; } +uint64_t Message::getTimestamp() const +{ + //AMQP 1.0 message doesn't have the equivalent of the 0-10 timestamp field + //TODO: define an annotation for that + return 0; +} + bool Message::isPersistent() const { return durable && durable.get(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h index 5bf49ef556..161b1dc6f1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.h +++ b/qpid/cpp/src/qpid/broker/amqp/Message.h @@ -51,6 +51,7 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess std::string getContent() const; void processProperties(qpid::amqp::MapHandler&) const; std::string getUserId() const; + uint64_t getTimestamp() const; qpid::amqp::MessageId getMessageId() const; qpid::amqp::MessageId getCorrelationId() const; diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp index 8055b3fe1a..98dd70dceb 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp @@ -130,6 +130,19 @@ std::string MessageTransfer::getExchangeName() const return getFrames().as<framing::MessageTransferBody>()->getDestination(); } +void MessageTransfer::setTimestamp() +{ + DeliveryProperties* props = getFrames().getHeaders()->get<DeliveryProperties>(true); + time_t now = ::time(0); + props->setTimestamp(now); +} + +uint64_t MessageTransfer::getTimestamp() const +{ + const DeliveryProperties* props = getProperties<DeliveryProperties>(); + return props ? props->getTimestamp() : 0; +} + bool MessageTransfer::requiresAccept() const { const framing::MessageTransferBody* b = getFrames().as<framing::MessageTransferBody>(); @@ -174,11 +187,11 @@ void MessageTransfer::sendContent(framing::FrameHandler& out, uint16_t maxFrameS class SendHeader { public: - SendHeader(FrameHandler& h, bool r, uint64_t t, uint64_t ts, const qpid::types::Variant::Map& a) : handler(h), redelivered(r), ttl(t), timestamp(ts), annotations(a) {} + SendHeader(FrameHandler& h, bool r, uint64_t t, const qpid::types::Variant::Map& a) : handler(h), redelivered(r), ttl(t), annotations(a) {} void operator()(const AMQFrame& f) { AMQFrame copy = f; - if (redelivered || ttl || timestamp || annotations.size()) { + if (redelivered || ttl || annotations.size()) { copy.cloneBody(); if (annotations.size()) { MessageProperties* props = @@ -188,12 +201,11 @@ class SendHeader props->getApplicationHeaders().set(i->first, qpid::amqp_0_10::translate(i->second)); } } - if (redelivered || ttl || timestamp) { + if (redelivered || ttl) { DeliveryProperties* dp = copy.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true); if (ttl) dp->setTtl(ttl); if (redelivered) dp->setRedelivered(redelivered); - if (timestamp) dp->setTimestamp(timestamp); } } handler.handle(copy); @@ -202,15 +214,14 @@ class SendHeader FrameHandler& handler; bool redelivered; uint64_t ttl; - uint64_t timestamp; const qpid::types::Variant::Map& annotations; }; void MessageTransfer::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/, - bool redelivered, uint64_t ttl, uint64_t timestamp, + bool redelivered, uint64_t ttl, const qpid::types::Variant::Map& annotations) const { - SendHeader f(out, redelivered, ttl, timestamp, annotations); + SendHeader f(out, redelivered, ttl, annotations); frames.map_if(f, TypeFilter<HEADER_BODY>()); } bool MessageTransfer::isImmediateDeliveryRequired(const qpid::broker::Message& /*message*/) diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h index 422446db51..b55f8577a4 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h @@ -55,6 +55,8 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro std::string getExchangeName() const; void processProperties(qpid::amqp::MapHandler&) const; std::string getUserId() const; + void setTimestamp(); + uint64_t getTimestamp() const; bool requiresAccept() const; const qpid::framing::SequenceNumber& getCommandId() const; @@ -92,7 +94,7 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro void clearApplicationHeadersFlag(); void sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const; - void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize, bool redelivered, uint64_t ttl, uint64_t timestamp, const qpid::types::Variant::Map& annotations) const; + void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize, bool redelivered, uint64_t ttl, const qpid::types::Variant::Map& annotations) const; void decodeHeader(framing::Buffer& buffer); void decodeContent(framing::Buffer& buffer); diff --git a/qpid/cpp/src/tests/legacystore/MessageUtils.h b/qpid/cpp/src/tests/legacystore/MessageUtils.h index 6552357c72..cd23244293 100644 --- a/qpid/cpp/src/tests/legacystore/MessageUtils.h +++ b/qpid/cpp/src/tests/legacystore/MessageUtils.h @@ -98,7 +98,7 @@ struct MessageUtils static void deliver(Message& msg, FrameHandler& h, uint16_t framesize) { - qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendHeader(h, framesize, false, 0, 0, qpid::types::Variant::Map()); + qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendHeader(h, framesize, false, 0, qpid::types::Variant::Map()); qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendContent(h, framesize); } |
