diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 85 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.h | 35 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateExchange.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AMQFrame.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AMQFrame.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/ReplicatingEventListener.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/replication/ReplicationExchange.cpp | 9 |
13 files changed, 127 insertions, 65 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index f80e0f1e61..6eaf16b052 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -434,8 +434,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, _qmf::ArgsBrokerConnect& hp= dynamic_cast<_qmf::ArgsBrokerConnect&>(args); - QPID_LOG (debug, "Broker::connect()"); string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport; + QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport << + "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\""); if (!getProtocolFactory(transport)) { QPID_LOG(error, "Transport '" << transport << "' not supported"); return Manageable::STATUS_NOT_IMPLEMENTED; @@ -452,9 +453,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, _qmf::ArgsBrokerQueueMoveMessages& moveArgs= dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args); QPID_LOG (debug, "Broker::queueMoveMessages()"); - if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) + if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) status = Manageable::STATUS_OK; - else + else return Manageable::STATUS_PARAMETER_INVALID; break; } diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 58dcc6d7c7..11970db394 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -75,7 +75,7 @@ void DeliveryRecord::deliver(framing::FrameHandler& h, DeliveryId deliveryId, ui { id = deliveryId; if (msg.payload->getRedelivered()){ - msg.payload->getProperties<framing::DeliveryProperties>()->setRedelivered(true); + msg.payload->setRedelivered(); } msg.payload->adjustTtl(); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 622cc81002..d68845062d 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -58,7 +58,7 @@ Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) { if (parent->sequence){ parent->sequenceNo++; - msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo); } if (parent->ive) { parent->lastMsg = &( msg.getMessage()); @@ -390,7 +390,7 @@ bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b) } void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) { - msg->getProperties<DeliveryProperties>()->setExchange(getName()); + msg->setExchange(getName()); } bool Exchange::routeWithAlternate(Deliverable& msg) diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index d694d1eafd..28886826ca 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -30,7 +30,9 @@ #include "qpid/framing/SendContent.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/TypeFilter.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" +#include <boost/pointer_cast.hpp> #include <time.h> @@ -51,18 +53,9 @@ Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), expiration(FAR_FUTURE), dequeueCallback(0), - inCallback(false), requiredCredit(0), isManagementMessage(false) + inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false) {} -Message::Message(const Message& original) : - PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(original.expiration), dequeueCallback(0), - inCallback(false), requiredCredit(0) -{ - setExpiryPolicy(original.expiryPolicy); -} - Message::~Message() {} void Message::forcePersistent() @@ -288,6 +281,9 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) sys::Mutex::ScopedLock l(lock); Relay f(out); frames.map_if(f, TypeFilter<HEADER_BODY>()); + //as frame (and pointer to body) has now been passed to handler, + //subsequent modifications should use a copy + copyHeaderOnWrite = true; } // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over @@ -342,11 +338,30 @@ bool Message::isExcluded(const std::vector<std::string>& excludes) const return false; } +class CloneHeaderBody +{ +public: + void operator()(AMQFrame& f) + { + f.cloneBody(); + } +}; + +AMQHeaderBody* Message::getHeaderBody() +{ + if (copyHeaderOnWrite) { + CloneHeaderBody f; + frames.map_if(f, TypeFilter<HEADER_BODY>()); + copyHeaderOnWrite = false; + } + return frames.getHeaders(); +} + void Message::addTraceId(const std::string& id) { sys::Mutex::ScopedLock l(lock); if (isA<MessageTransferBody>()) { - FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders(); + FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders(); std::string trace = headers.getAsString(X_QPID_TRACE); if (trace.empty()) { headers.setString(X_QPID_TRACE, id); @@ -360,7 +375,8 @@ void Message::addTraceId(const std::string& id) void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); if (props->getTtl()) { // AMQP requires setting the expiration property to be posix // time_t in seconds. TTL is in milliseconds @@ -382,9 +398,9 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) void Message::adjustTtl() { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); if (props->getTtl()) { - sys::Mutex::ScopedLock l(lock); if (expiration < FAR_FUTURE) { sys::AbsTime current( expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); @@ -395,6 +411,42 @@ void Message::adjustTtl() } } +void Message::setRedelivered() +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true); +} + +void Message::insertCustomProperty(const std::string& key, int64_t value) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value); +} + +void Message::insertCustomProperty(const std::string& key, const std::string& value) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value); +} + +void Message::removeCustomProperty(const std::string& key) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key); +} + +void Message::setExchange(const std::string& exchange) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<DeliveryProperties>()->setExchange(exchange); +} + +void Message::clearApplicationHeadersFlag() +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag(); +} + void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } @@ -442,11 +494,6 @@ uint8_t Message::getPriority() const { return getAdapter().getPriority(frames); } -framing::FieldTable& Message::getOrInsertHeaders() -{ - return getProperties<MessageProperties>()->getApplicationHeaders(); -} - bool Message::getIsManagementMessage() const { return isManagementMessage; } void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index e1d6c60a80..a9cb246a6f 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -29,13 +29,17 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include <boost/function.hpp> +#include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> +#include <memory> #include <string> #include <vector> namespace qpid { namespace framing { +class AMQBody; +class AMQHeaderBody; class FieldTable; class SequenceNumber; } @@ -53,7 +57,6 @@ public: typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback; QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber()); - QPID_BROKER_EXTERN Message(const Message&); QPID_BROKER_EXTERN ~Message(); uint64_t getPersistenceId() const { return persistenceId; } @@ -75,7 +78,6 @@ public: bool isImmediate() const; QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const; QPID_BROKER_EXTERN std::string getAppId() const; - framing::FieldTable& getOrInsertHeaders(); QPID_BROKER_EXTERN bool isPersistent() const; bool requiresAccept(); @@ -85,18 +87,19 @@ public: sys::AbsTime getExpiration() const { return expiration; } void setExpiration(sys::AbsTime exp) { expiration = exp; } void adjustTtl(); + void setRedelivered(); + void insertCustomProperty(const std::string& key, int64_t value); + void insertCustomProperty(const std::string& key, const std::string& value); + void removeCustomProperty(const std::string& key); + void setExchange(const std::string&); + void clearApplicationHeadersFlag(); framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } - template <class T> T* getProperties() { - qpid::framing::AMQHeaderBody* p = frames.getHeaders(); - return p->get<T>(true); - } - template <class T> const T* getProperties() const { const qpid::framing::AMQHeaderBody* p = frames.getHeaders(); - return p->get<T>(true); + return p->get<T>(); } template <class T> const T* hasProperties() const { @@ -156,9 +159,8 @@ public: bool isExcluded(const std::vector<std::string>& excludes) const; void addTraceId(const std::string& id); - void forcePersistent(); - bool isForcedPersistent(); - + void forcePersistent(); + bool isForcedPersistent(); /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ void setDequeueCompleteCallback(MessageCallback& cb); @@ -178,7 +180,7 @@ public: bool redelivered; bool loaded; bool staged; - bool forcePersistentPolicy; // used to force message as durable, via a broker policy + bool forcePersistentPolicy; // used to force message as durable, via a broker policy ConnectionToken* publisher; mutable MessageAdapter* adapter; qpid::sys::AbsTime expiration; @@ -194,6 +196,15 @@ public: uint32_t requiredCredit; bool isManagementMessage; + mutable bool copyHeaderOnWrite; + + /** + * Expects lock to be held + */ + template <class T> T* getModifiableProperties() { + return getHeaderBody()->get<T>(true); + } + qpid::framing::AMQHeaderBody* getHeaderBody(); }; }} diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 42923567a2..dd3f982699 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -525,7 +525,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); - if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); + if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); dequeueRequired = messages->push(qm, removed); listeners.populate(copy); @@ -627,11 +627,6 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg } if (traceId.size()) { - //copy on write: take deep copy of message before modifying it - //as the frames may already be available for delivery on other - //threads - boost::intrusive_ptr<Message> copy(new Message(*msg)); - msg = copy; msg->addTraceId(traceId); } diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index afe5b8ac3a..f306517d37 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -318,22 +318,22 @@ class MessageUpdater { lastPos = message.position; // if the ttl > 0, we need to send the calculated expiration time to the updatee - if (message.payload->getProperties<DeliveryProperties>()->getTtl() > 0) { + const DeliveryProperties* dprops = + message.payload->getProperties<DeliveryProperties>(); + if (dprops && dprops->getTtl() > 0) { bool hadMessageProps = message.payload->hasProperties<framing::MessageProperties>(); - framing::MessageProperties* mprops = + const framing::MessageProperties* mprops = message.payload->getProperties<framing::MessageProperties>(); bool hadApplicationHeaders = mprops->hasApplicationHeaders(); - FieldTable& applicationHeaders = mprops->getApplicationHeaders(); - applicationHeaders.setInt64( - UpdateClient::X_QPID_EXPIRATION, - sys::Duration(sys::EPOCH, message.payload->getExpiration())); + message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION, + sys::Duration(sys::EPOCH, message.payload->getExpiration())); // If message properties or application headers didn't exist // prior to us adding data, we want to remove them on the other side. if (!hadMessageProps) - applicationHeaders.setInt(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0); + message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0); else if (!hadApplicationHeaders) - applicationHeaders.setInt(UpdateClient::X_QPID_NO_HEADERS, 0); + message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0); } // We can't send a broker::Message via the normal client API, diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp index e830459aba..cb1376004e 100644 --- a/cpp/src/qpid/cluster/UpdateExchange.cpp +++ b/cpp/src/qpid/cluster/UpdateExchange.cpp @@ -49,18 +49,18 @@ void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& // Copy expiration from x-property if present. if (msg->hasProperties<MessageProperties>()) { - MessageProperties* mprops = msg->getProperties<MessageProperties>(); + const MessageProperties* mprops = msg->getProperties<MessageProperties>(); if (mprops->hasApplicationHeaders()) { - FieldTable& headers = mprops->getApplicationHeaders(); + const FieldTable& headers = mprops->getApplicationHeaders(); if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) { msg->setExpiration( sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION))); - headers.erase(UpdateClient::X_QPID_EXPIRATION); + msg->removeCustomProperty(UpdateClient::X_QPID_EXPIRATION); // Erase props/headers that were added by the UpdateClient if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS)) msg->eraseProperties<MessageProperties>(); else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS)) - mprops->clearApplicationHeadersFlag(); + msg->clearApplicationHeadersFlag(); } } } diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index cd60cd971f..5b9673f0d0 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -139,6 +139,11 @@ bool AMQFrame::decode(Buffer& buffer) return true; } +void AMQFrame::cloneBody() +{ + body = body->clone(); +} + std::ostream& operator<<(std::ostream& out, const AMQFrame& f) { return diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h index c669d12bc0..59c4a7501f 100644 --- a/cpp/src/qpid/framing/AMQFrame.h +++ b/cpp/src/qpid/framing/AMQFrame.h @@ -59,6 +59,11 @@ class QPID_COMMON_CLASS_EXTERN AMQFrame : public AMQDataBlock return boost::polymorphic_downcast<const T*>(getBody()); } + /** + * Take a deep copy of the body currently referenced + */ + void cloneBody(); + QPID_COMMON_EXTERN void encode(Buffer& buffer) const; QPID_COMMON_EXTERN bool decode(Buffer& buffer); QPID_COMMON_EXTERN uint32_t encodedSize() const; diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 923005b9fc..50fdc82ee0 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -614,7 +614,7 @@ void ManagementAgent::sendBufferLH(const string& data, props->setAppId("qmf2"); for (i = headers.begin(); i != headers.end(); ++i) { - msg->getOrInsertHeaders().setString(i->first, i->second.asString()); + msg->insertCustomProperty(i->first, i->second.asString()); } DeliveryProperties* dp = diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp index b7d52372f4..0ced4d9161 100644 --- a/cpp/src/qpid/replication/ReplicatingEventListener.cpp +++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp @@ -69,10 +69,9 @@ void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeu void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued) { boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload)); - FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders(); - headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); - headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE); - headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position); + msg->insertCustomProperty(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); + msg->insertCustomProperty(REPLICATION_EVENT_TYPE, ENQUEUE); + msg->insertCustomProperty(QUEUE_MESSAGE_POSITION,enqueued.position); route(msg); } diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp index 4b6d25ac7d..89a2bf516d 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -97,11 +97,10 @@ void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable } else { queue->setPosition(seqno1); - FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders(); - headers.erase(REPLICATION_TARGET_QUEUE); - headers.erase(REPLICATION_EVENT_SEQNO); - headers.erase(REPLICATION_EVENT_TYPE); - headers.erase(QUEUE_MESSAGE_POSITION); + msg.getMessage().removeCustomProperty(REPLICATION_TARGET_QUEUE); + msg.getMessage().removeCustomProperty(REPLICATION_EVENT_SEQNO); + msg.getMessage().removeCustomProperty(REPLICATION_EVENT_TYPE); + msg.getMessage().removeCustomProperty(QUEUE_MESSAGE_POSITION); msg.deliverTo(queue); QPID_LOG(debug, "Enqueued replicated message onto " << queueName); if (mgmtExchange != 0) { |
