summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.h')
-rw-r--r--cpp/src/qpid/broker/Message.h78
1 files changed, 47 insertions, 31 deletions
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index f7dd2734b6..dda45d73e6 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,17 +29,21 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
+#include <memory>
#include <string>
#include <vector>
namespace qpid {
-
+
namespace framing {
+class AMQBody;
+class AMQHeaderBody;
class FieldTable;
class SequenceNumber;
}
-
+
namespace broker {
class ConnectionToken;
class Exchange;
@@ -51,11 +55,10 @@ class ExpiryPolicy;
class Message : public PersistableMessage {
public:
typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback;
-
+
QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber());
- QPID_BROKER_EXTERN Message(const Message&);
QPID_BROKER_EXTERN ~Message();
-
+
uint64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
@@ -75,27 +78,31 @@ public:
bool isImmediate() const;
QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
QPID_BROKER_EXTERN std::string getAppId() const;
- framing::FieldTable& getOrInsertHeaders();
QPID_BROKER_EXTERN bool isPersistent() const;
bool requiresAccept();
- QPID_BROKER_EXTERN void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ /** determine msg expiration time using the TTL value if present */
+ QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
bool hasExpired();
sys::AbsTime getExpiration() const { return expiration; }
+ void setExpiration(sys::AbsTime exp) { expiration = exp; }
void adjustTtl();
-
- framing::FrameSet& getFrames() { return frames; }
- const framing::FrameSet& getFrames() const { return frames; }
-
- template <class T> T* getProperties() {
- qpid::framing::AMQHeaderBody* p = frames.getHeaders();
- return p->get<T>(true);
- }
+ void setRedelivered();
+ QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, int64_t value);
+ QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, const std::string& value);
+ QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key);
+ void setExchange(const std::string&);
+ void clearApplicationHeadersFlag();
+ /** set the timestamp delivery property to the current time-of-day */
+ QPID_BROKER_EXTERN void setTimestamp();
+
+ framing::FrameSet& getFrames() { return frames; }
+ const framing::FrameSet& getFrames() const { return frames; }
template <class T> const T* getProperties() const {
- qpid::framing::AMQHeaderBody* p = frames.getHeaders();
- return p->get<T>(true);
+ const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+ return p->get<T>();
}
template <class T> const T* hasProperties() const {
@@ -103,6 +110,11 @@ public:
return p->get<T>();
}
+ template <class T> void eraseProperties() {
+ qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+ p->erase<T>();
+ }
+
template <class T> const T* getMethod() const {
return frames.as<T>();
}
@@ -135,7 +147,7 @@ public:
QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer);
QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer);
-
+
void QPID_BROKER_EXTERN tryReleaseContent();
void releaseContent();
void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead
@@ -149,24 +161,19 @@ public:
bool isExcluded(const std::vector<std::string>& excludes) const;
void addTraceId(const std::string& id);
-
- void forcePersistent();
- bool isForcedPersistent();
-
- /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */
- void setEnqueueCompleteCallback(MessageCallback& cb);
- void resetEnqueueCompleteCallback();
+ void forcePersistent();
+ bool isForcedPersistent();
/** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
void setDequeueCompleteCallback(MessageCallback& cb);
void resetDequeueCompleteCallback();
uint8_t getPriority() const;
-
+ bool getIsManagementMessage() const;
+ void setIsManagementMessage(bool b);
private:
MessageAdapter& getAdapter() const;
- void allEnqueuesComplete();
void allDequeuesComplete();
mutable sys::Mutex lock;
@@ -176,7 +183,7 @@ public:
bool redelivered;
bool loaded;
bool staged;
- bool forcePersistentPolicy; // used to force message as durable, via a broker policy
+ bool forcePersistentPolicy; // used to force message as durable, via a broker policy
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
qpid::sys::AbsTime expiration;
@@ -187,11 +194,20 @@ public:
mutable boost::intrusive_ptr<Message> empty;
sys::Monitor callbackLock;
- MessageCallback* enqueueCallback;
MessageCallback* dequeueCallback;
bool inCallback;
uint32_t requiredCredit;
+ bool isManagementMessage;
+ mutable bool copyHeaderOnWrite;
+
+ /**
+ * Expects lock to be held
+ */
+ template <class T> T* getModifiableProperties() {
+ return getHeaderBody()->get<T>(true);
+ }
+ qpid::framing::AMQHeaderBody* getHeaderBody();
};
}}