summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp7
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp4
-rw-r--r--cpp/src/qpid/broker/Message.cpp85
-rw-r--r--cpp/src/qpid/broker/Message.h35
-rw-r--r--cpp/src/qpid/broker/Queue.cpp7
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp16
-rw-r--r--cpp/src/qpid/cluster/UpdateExchange.cpp8
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp5
-rw-r--r--cpp/src/qpid/framing/AMQFrame.h5
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp2
-rw-r--r--cpp/src/qpid/replication/ReplicatingEventListener.cpp7
-rw-r--r--cpp/src/qpid/replication/ReplicationExchange.cpp9
13 files changed, 127 insertions, 65 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index f80e0f1e61..6eaf16b052 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -434,8 +434,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
_qmf::ArgsBrokerConnect& hp=
dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
- QPID_LOG (debug, "Broker::connect()");
string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport;
+ QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport <<
+ "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
if (!getProtocolFactory(transport)) {
QPID_LOG(error, "Transport '" << transport << "' not supported");
return Manageable::STATUS_NOT_IMPLEMENTED;
@@ -452,9 +453,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
_qmf::ArgsBrokerQueueMoveMessages& moveArgs=
dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
QPID_LOG (debug, "Broker::queueMoveMessages()");
- if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
+ if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
status = Manageable::STATUS_OK;
- else
+ else
return Manageable::STATUS_PARAMETER_INVALID;
break;
}
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 58dcc6d7c7..11970db394 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -75,7 +75,7 @@ void DeliveryRecord::deliver(framing::FrameHandler& h, DeliveryId deliveryId, ui
{
id = deliveryId;
if (msg.payload->getRedelivered()){
- msg.payload->getProperties<framing::DeliveryProperties>()->setRedelivered(true);
+ msg.payload->setRedelivered();
}
msg.payload->adjustTtl();
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 622cc81002..d68845062d 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -58,7 +58,7 @@ Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
if (parent->sequence){
parent->sequenceNo++;
- msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+ msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo);
}
if (parent->ive) {
parent->lastMsg = &( msg.getMessage());
@@ -390,7 +390,7 @@ bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b)
}
void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
- msg->getProperties<DeliveryProperties>()->setExchange(getName());
+ msg->setExchange(getName());
}
bool Exchange::routeWithAlternate(Deliverable& msg)
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index d694d1eafd..28886826ca 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -30,7 +30,9 @@
#include "qpid/framing/SendContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/TypeFilter.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
+#include <boost/pointer_cast.hpp>
#include <time.h>
@@ -51,18 +53,9 @@ Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
expiration(FAR_FUTURE), dequeueCallback(0),
- inCallback(false), requiredCredit(0), isManagementMessage(false)
+ inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false)
{}
-Message::Message(const Message& original) :
- PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(original.expiration), dequeueCallback(0),
- inCallback(false), requiredCredit(0)
-{
- setExpiryPolicy(original.expiryPolicy);
-}
-
Message::~Message() {}
void Message::forcePersistent()
@@ -288,6 +281,9 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/)
sys::Mutex::ScopedLock l(lock);
Relay f(out);
frames.map_if(f, TypeFilter<HEADER_BODY>());
+ //as frame (and pointer to body) has now been passed to handler,
+ //subsequent modifications should use a copy
+ copyHeaderOnWrite = true;
}
// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
@@ -342,11 +338,30 @@ bool Message::isExcluded(const std::vector<std::string>& excludes) const
return false;
}
+class CloneHeaderBody
+{
+public:
+ void operator()(AMQFrame& f)
+ {
+ f.cloneBody();
+ }
+};
+
+AMQHeaderBody* Message::getHeaderBody()
+{
+ if (copyHeaderOnWrite) {
+ CloneHeaderBody f;
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
+ copyHeaderOnWrite = false;
+ }
+ return frames.getHeaders();
+}
+
void Message::addTraceId(const std::string& id)
{
sys::Mutex::ScopedLock l(lock);
if (isA<MessageTransferBody>()) {
- FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders();
+ FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
std::string trace = headers.getAsString(X_QPID_TRACE);
if (trace.empty()) {
headers.setString(X_QPID_TRACE, id);
@@ -360,7 +375,8 @@ void Message::addTraceId(const std::string& id)
void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
- DeliveryProperties* props = getProperties<DeliveryProperties>();
+ sys::Mutex::ScopedLock l(lock);
+ DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
if (props->getTtl()) {
// AMQP requires setting the expiration property to be posix
// time_t in seconds. TTL is in milliseconds
@@ -382,9 +398,9 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
void Message::adjustTtl()
{
- DeliveryProperties* props = getProperties<DeliveryProperties>();
+ sys::Mutex::ScopedLock l(lock);
+ DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
if (props->getTtl()) {
- sys::Mutex::ScopedLock l(lock);
if (expiration < FAR_FUTURE) {
sys::AbsTime current(
expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
@@ -395,6 +411,42 @@ void Message::adjustTtl()
}
}
+void Message::setRedelivered()
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true);
+}
+
+void Message::insertCustomProperty(const std::string& key, int64_t value)
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value);
+}
+
+void Message::insertCustomProperty(const std::string& key, const std::string& value)
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value);
+}
+
+void Message::removeCustomProperty(const std::string& key)
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key);
+}
+
+void Message::setExchange(const std::string& exchange)
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<DeliveryProperties>()->setExchange(exchange);
+}
+
+void Message::clearApplicationHeadersFlag()
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag();
+}
+
void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
expiryPolicy = e;
}
@@ -442,11 +494,6 @@ uint8_t Message::getPriority() const {
return getAdapter().getPriority(frames);
}
-framing::FieldTable& Message::getOrInsertHeaders()
-{
- return getProperties<MessageProperties>()->getApplicationHeaders();
-}
-
bool Message::getIsManagementMessage() const { return isManagementMessage; }
void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index e1d6c60a80..a9cb246a6f 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -29,13 +29,17 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
+#include <memory>
#include <string>
#include <vector>
namespace qpid {
namespace framing {
+class AMQBody;
+class AMQHeaderBody;
class FieldTable;
class SequenceNumber;
}
@@ -53,7 +57,6 @@ public:
typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback;
QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber());
- QPID_BROKER_EXTERN Message(const Message&);
QPID_BROKER_EXTERN ~Message();
uint64_t getPersistenceId() const { return persistenceId; }
@@ -75,7 +78,6 @@ public:
bool isImmediate() const;
QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
QPID_BROKER_EXTERN std::string getAppId() const;
- framing::FieldTable& getOrInsertHeaders();
QPID_BROKER_EXTERN bool isPersistent() const;
bool requiresAccept();
@@ -85,18 +87,19 @@ public:
sys::AbsTime getExpiration() const { return expiration; }
void setExpiration(sys::AbsTime exp) { expiration = exp; }
void adjustTtl();
+ void setRedelivered();
+ void insertCustomProperty(const std::string& key, int64_t value);
+ void insertCustomProperty(const std::string& key, const std::string& value);
+ void removeCustomProperty(const std::string& key);
+ void setExchange(const std::string&);
+ void clearApplicationHeadersFlag();
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
- template <class T> T* getProperties() {
- qpid::framing::AMQHeaderBody* p = frames.getHeaders();
- return p->get<T>(true);
- }
-
template <class T> const T* getProperties() const {
const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
- return p->get<T>(true);
+ return p->get<T>();
}
template <class T> const T* hasProperties() const {
@@ -156,9 +159,8 @@ public:
bool isExcluded(const std::vector<std::string>& excludes) const;
void addTraceId(const std::string& id);
- void forcePersistent();
- bool isForcedPersistent();
-
+ void forcePersistent();
+ bool isForcedPersistent();
/** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
void setDequeueCompleteCallback(MessageCallback& cb);
@@ -178,7 +180,7 @@ public:
bool redelivered;
bool loaded;
bool staged;
- bool forcePersistentPolicy; // used to force message as durable, via a broker policy
+ bool forcePersistentPolicy; // used to force message as durable, via a broker policy
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
qpid::sys::AbsTime expiration;
@@ -194,6 +196,15 @@ public:
uint32_t requiredCredit;
bool isManagementMessage;
+ mutable bool copyHeaderOnWrite;
+
+ /**
+ * Expects lock to be held
+ */
+ template <class T> T* getModifiableProperties() {
+ return getHeaderBody()->get<T>(true);
+ }
+ qpid::framing::AMQHeaderBody* getHeaderBody();
};
}}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 42923567a2..dd3f982699 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -525,7 +525,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
- if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
+ if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
dequeueRequired = messages->push(qm, removed);
listeners.populate(copy);
@@ -627,11 +627,6 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
}
if (traceId.size()) {
- //copy on write: take deep copy of message before modifying it
- //as the frames may already be available for delivery on other
- //threads
- boost::intrusive_ptr<Message> copy(new Message(*msg));
- msg = copy;
msg->addTraceId(traceId);
}
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index afe5b8ac3a..f306517d37 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -318,22 +318,22 @@ class MessageUpdater {
lastPos = message.position;
// if the ttl > 0, we need to send the calculated expiration time to the updatee
- if (message.payload->getProperties<DeliveryProperties>()->getTtl() > 0) {
+ const DeliveryProperties* dprops =
+ message.payload->getProperties<DeliveryProperties>();
+ if (dprops && dprops->getTtl() > 0) {
bool hadMessageProps =
message.payload->hasProperties<framing::MessageProperties>();
- framing::MessageProperties* mprops =
+ const framing::MessageProperties* mprops =
message.payload->getProperties<framing::MessageProperties>();
bool hadApplicationHeaders = mprops->hasApplicationHeaders();
- FieldTable& applicationHeaders = mprops->getApplicationHeaders();
- applicationHeaders.setInt64(
- UpdateClient::X_QPID_EXPIRATION,
- sys::Duration(sys::EPOCH, message.payload->getExpiration()));
+ message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION,
+ sys::Duration(sys::EPOCH, message.payload->getExpiration()));
// If message properties or application headers didn't exist
// prior to us adding data, we want to remove them on the other side.
if (!hadMessageProps)
- applicationHeaders.setInt(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
+ message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
else if (!hadApplicationHeaders)
- applicationHeaders.setInt(UpdateClient::X_QPID_NO_HEADERS, 0);
+ message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0);
}
// We can't send a broker::Message via the normal client API,
diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp
index e830459aba..cb1376004e 100644
--- a/cpp/src/qpid/cluster/UpdateExchange.cpp
+++ b/cpp/src/qpid/cluster/UpdateExchange.cpp
@@ -49,18 +49,18 @@ void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>&
// Copy expiration from x-property if present.
if (msg->hasProperties<MessageProperties>()) {
- MessageProperties* mprops = msg->getProperties<MessageProperties>();
+ const MessageProperties* mprops = msg->getProperties<MessageProperties>();
if (mprops->hasApplicationHeaders()) {
- FieldTable& headers = mprops->getApplicationHeaders();
+ const FieldTable& headers = mprops->getApplicationHeaders();
if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) {
msg->setExpiration(
sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION)));
- headers.erase(UpdateClient::X_QPID_EXPIRATION);
+ msg->removeCustomProperty(UpdateClient::X_QPID_EXPIRATION);
// Erase props/headers that were added by the UpdateClient
if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS))
msg->eraseProperties<MessageProperties>();
else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS))
- mprops->clearApplicationHeadersFlag();
+ msg->clearApplicationHeadersFlag();
}
}
}
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index cd60cd971f..5b9673f0d0 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -139,6 +139,11 @@ bool AMQFrame::decode(Buffer& buffer)
return true;
}
+void AMQFrame::cloneBody()
+{
+ body = body->clone();
+}
+
std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
{
return
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h
index c669d12bc0..59c4a7501f 100644
--- a/cpp/src/qpid/framing/AMQFrame.h
+++ b/cpp/src/qpid/framing/AMQFrame.h
@@ -59,6 +59,11 @@ class QPID_COMMON_CLASS_EXTERN AMQFrame : public AMQDataBlock
return boost::polymorphic_downcast<const T*>(getBody());
}
+ /**
+ * Take a deep copy of the body currently referenced
+ */
+ void cloneBody();
+
QPID_COMMON_EXTERN void encode(Buffer& buffer) const;
QPID_COMMON_EXTERN bool decode(Buffer& buffer);
QPID_COMMON_EXTERN uint32_t encodedSize() const;
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 923005b9fc..50fdc82ee0 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -614,7 +614,7 @@ void ManagementAgent::sendBufferLH(const string& data,
props->setAppId("qmf2");
for (i = headers.begin(); i != headers.end(); ++i) {
- msg->getOrInsertHeaders().setString(i->first, i->second.asString());
+ msg->insertCustomProperty(i->first, i->second.asString());
}
DeliveryProperties* dp =
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp
index b7d52372f4..0ced4d9161 100644
--- a/cpp/src/qpid/replication/ReplicatingEventListener.cpp
+++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp
@@ -69,10 +69,9 @@ void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeu
void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued)
{
boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
- FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
- headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
- headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
- headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position);
+ msg->insertCustomProperty(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
+ msg->insertCustomProperty(REPLICATION_EVENT_TYPE, ENQUEUE);
+ msg->insertCustomProperty(QUEUE_MESSAGE_POSITION,enqueued.position);
route(msg);
}
diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp
index 4b6d25ac7d..89a2bf516d 100644
--- a/cpp/src/qpid/replication/ReplicationExchange.cpp
+++ b/cpp/src/qpid/replication/ReplicationExchange.cpp
@@ -97,11 +97,10 @@ void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable
} else {
queue->setPosition(seqno1);
- FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
- headers.erase(REPLICATION_TARGET_QUEUE);
- headers.erase(REPLICATION_EVENT_SEQNO);
- headers.erase(REPLICATION_EVENT_TYPE);
- headers.erase(QUEUE_MESSAGE_POSITION);
+ msg.getMessage().removeCustomProperty(REPLICATION_TARGET_QUEUE);
+ msg.getMessage().removeCustomProperty(REPLICATION_EVENT_SEQNO);
+ msg.getMessage().removeCustomProperty(REPLICATION_EVENT_TYPE);
+ msg.getMessage().removeCustomProperty(QUEUE_MESSAGE_POSITION);
msg.deliverTo(queue);
QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
if (mgmtExchange != 0) {