diff options
Diffstat (limited to 'cpp/src/qpid/broker/Message.h')
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 78 |
1 files changed, 47 insertions, 31 deletions
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index f7dd2734b6..dda45d73e6 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -29,17 +29,21 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include <boost/function.hpp> +#include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> +#include <memory> #include <string> #include <vector> namespace qpid { - + namespace framing { +class AMQBody; +class AMQHeaderBody; class FieldTable; class SequenceNumber; } - + namespace broker { class ConnectionToken; class Exchange; @@ -51,11 +55,10 @@ class ExpiryPolicy; class Message : public PersistableMessage { public: typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback; - + QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber()); - QPID_BROKER_EXTERN Message(const Message&); QPID_BROKER_EXTERN ~Message(); - + uint64_t getPersistenceId() const { return persistenceId; } void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } @@ -75,27 +78,31 @@ public: bool isImmediate() const; QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const; QPID_BROKER_EXTERN std::string getAppId() const; - framing::FieldTable& getOrInsertHeaders(); QPID_BROKER_EXTERN bool isPersistent() const; bool requiresAccept(); - QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e); + /** determine msg expiration time using the TTL value if present */ + QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e); void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); bool hasExpired(); sys::AbsTime getExpiration() const { return expiration; } + void setExpiration(sys::AbsTime exp) { expiration = exp; } void adjustTtl(); - - framing::FrameSet& getFrames() { return frames; } - const framing::FrameSet& getFrames() const { return frames; } - - template <class T> T* getProperties() { - qpid::framing::AMQHeaderBody* p = frames.getHeaders(); - return p->get<T>(true); - } + void setRedelivered(); + QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, int64_t value); + QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, const std::string& value); + QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key); + void setExchange(const std::string&); + void clearApplicationHeadersFlag(); + /** set the timestamp delivery property to the current time-of-day */ + QPID_BROKER_EXTERN void setTimestamp(); + + framing::FrameSet& getFrames() { return frames; } + const framing::FrameSet& getFrames() const { return frames; } template <class T> const T* getProperties() const { - qpid::framing::AMQHeaderBody* p = frames.getHeaders(); - return p->get<T>(true); + const qpid::framing::AMQHeaderBody* p = frames.getHeaders(); + return p->get<T>(); } template <class T> const T* hasProperties() const { @@ -103,6 +110,11 @@ public: return p->get<T>(); } + template <class T> void eraseProperties() { + qpid::framing::AMQHeaderBody* p = frames.getHeaders(); + p->erase<T>(); + } + template <class T> const T* getMethod() const { return frames.as<T>(); } @@ -135,7 +147,7 @@ public: QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer); QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer); - + void QPID_BROKER_EXTERN tryReleaseContent(); void releaseContent(); void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead @@ -149,24 +161,19 @@ public: bool isExcluded(const std::vector<std::string>& excludes) const; void addTraceId(const std::string& id); - - void forcePersistent(); - bool isForcedPersistent(); - - /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */ - void setEnqueueCompleteCallback(MessageCallback& cb); - void resetEnqueueCompleteCallback(); + void forcePersistent(); + bool isForcedPersistent(); /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ void setDequeueCompleteCallback(MessageCallback& cb); void resetDequeueCompleteCallback(); uint8_t getPriority() const; - + bool getIsManagementMessage() const; + void setIsManagementMessage(bool b); private: MessageAdapter& getAdapter() const; - void allEnqueuesComplete(); void allDequeuesComplete(); mutable sys::Mutex lock; @@ -176,7 +183,7 @@ public: bool redelivered; bool loaded; bool staged; - bool forcePersistentPolicy; // used to force message as durable, via a broker policy + bool forcePersistentPolicy; // used to force message as durable, via a broker policy ConnectionToken* publisher; mutable MessageAdapter* adapter; qpid::sys::AbsTime expiration; @@ -187,11 +194,20 @@ public: mutable boost::intrusive_ptr<Message> empty; sys::Monitor callbackLock; - MessageCallback* enqueueCallback; MessageCallback* dequeueCallback; bool inCallback; uint32_t requiredCredit; + bool isManagementMessage; + mutable bool copyHeaderOnWrite; + + /** + * Expects lock to be held + */ + template <class T> T* getModifiableProperties() { + return getHeaderBody()->get<T>(true); + } + qpid::framing::AMQHeaderBody* getHeaderBody(); }; }} |