diff options
| author | Alan Conway <aconway@apache.org> | 2009-12-11 20:55:45 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-12-11 20:55:45 +0000 |
| commit | d490fba74749bcde972e5a0d95f84b165f8ea05e (patch) | |
| tree | ffc58006adb15ec8fa29955911f5f3a0f02dfa69 /cpp/src/qpid | |
| parent | e4aee82085958588458ba34d2bf7dd0db90a257d (diff) | |
| download | qpid-python-d490fba74749bcde972e5a0d95f84b165f8ea05e.tar.gz | |
QPID-2266: error sending update: Enqueue capacity threshold exceeded
Fix for the problem with a test to verify that messages going to the store
have the same headers and content-size for an updatee or a broker that
receives the publish directly.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@889813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.h | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateExchange.cpp | 47 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateExchange.h | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/FrameSet.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/FrameSet.h | 6 |
12 files changed, 79 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 3c67c429a0..849bf6d1f5 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -154,6 +154,7 @@ Broker::Broker(const Broker::Options& conf) : queueCleaner(queues, timer), queueEvents(poller,!conf.asyncQueueEvents), recovery(true), + clusterUpdatee(false), expiryPolicy(new ExpiryPolicy), connectionCounter(conf.maxConnections), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 73d5860cb3..b85aa7d96c 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -168,8 +168,10 @@ public: std::vector<Url> getKnownBrokersImpl(); std::string federationTag; bool recovery; + bool clusterUpdatee; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; ConnectionCounter connectionCounter; + public: virtual ~Broker(); @@ -259,6 +261,9 @@ public: void setRecovery(bool set) { recovery = set; } bool getRecovery() const { return recovery; } + void setClusterUpdatee(bool set) { clusterUpdatee = set; } + bool isClusterUpdatee() const { return clusterUpdatee; } + management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } ConnectionCounter& getConnectionCounter() {return connectionCounter;} diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index e2799b0bff..47ca7a7ae8 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -425,19 +425,4 @@ framing::FieldTable& Message::getOrInsertHeaders() return getProperties<MessageProperties>()->getApplicationHeaders(); } - -void Message::setUpdateDestination(const std::string& d) -{ - updateDestination = d; -} - - -bool Message::isUpdateMessage() -{ - return updateDestination.size() && isA<MessageTransferBody>() - && getMethod<MessageTransferBody>()->getDestination() == updateDestination; -} - -std::string Message::updateDestination; - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 3894960c95..375fa9ce26 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -104,6 +104,10 @@ public: return frames.as<T>(); } + template <class T> T* getMethod() { + return frames.as<T>(); + } + template <class T> bool isA() const { return frames.isA<T>(); } @@ -157,9 +161,6 @@ public: void setDequeueCompleteCallback(MessageCallback& cb); void resetDequeueCompleteCallback(); - bool isUpdateMessage(); - static void setUpdateDestination(const std::string&); - private: typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement; @@ -190,7 +191,6 @@ public: MessageCallback* dequeueCallback; uint32_t requiredCredit; - static std::string updateDestination; }; }} diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 780c254a56..ef1adaf7ec 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -598,7 +598,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ string key = ft->getAsString(qpidVQMatchProperty); i = lvq.find(key); - if (i == lvq.end() || msg->isUpdateMessage()){ + if (i == lvq.end() || (broker && broker->isClusterUpdatee())) { messages.push_back(qm); listeners.populate(copy); lvq[key] = msg; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f877720350..d049001eb0 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -619,6 +619,7 @@ void Cluster::initMapCompleted(Lock& l) { if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. + broker.setClusterUpdatee(true); state = JOINER; } else { // I can go ready. @@ -813,6 +814,7 @@ void Cluster::checkUpdateIn(Lock& l) { memberUpdate(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; + broker.setClusterUpdatee(false); discarding = false; // ok to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); deliverEventQueue.start(); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 7f0a2752b0..e4aee6730b 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -139,7 +139,6 @@ struct ClusterPlugin : public Plugin { broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); - broker::Message::setUpdateDestination(UpdateClient::UPDATE); ManagementAgent* mgmt = broker->getManagementAgent(); if (mgmt) { std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator()); diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index f263577fd3..279284da2c 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -217,10 +217,11 @@ class MessageUpdater { // Disable client code that clears the delivery-properties.exchange sb.get()->setDoClearDeliveryPropertiesExchange(false); framing::MessageTransferBody transfer( - framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, - message::ACQUIRE_MODE_PRE_ACQUIRED); + *message.payload->getFrames().as<framing::MessageTransferBody>()); + transfer.setDestination(UpdateClient::UPDATE); - sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased()); + sb.get()->send(transfer, message.payload->getFrames(), + !message.payload->isContentReleased()); if (message.payload->isContentReleased()){ uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp new file mode 100644 index 0000000000..11937f296f --- /dev/null +++ b/cpp/src/qpid/cluster/UpdateExchange.cpp @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/broker/Message.h" +#include "UpdateExchange.h" + +namespace qpid { +namespace cluster { + +using framing::MessageTransferBody; +using framing::DeliveryProperties; + +UpdateExchange::UpdateExchange(management::Manageable* parent) + : broker::Exchange(UpdateClient::UPDATE, parent), + broker::FanOutExchange(UpdateClient::UPDATE, parent) {} + + +void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) { + MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>(); + assert(transfer); + const DeliveryProperties* props = msg->getProperties<DeliveryProperties>(); + assert(props); + if (props->hasExchange()) + transfer->setDestination(props->getExchange()); + else + transfer->clearDestinationFlag(); +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateExchange.h b/cpp/src/qpid/cluster/UpdateExchange.h index 00a92c7f1e..9d7d9ee5fc 100644 --- a/cpp/src/qpid/cluster/UpdateExchange.h +++ b/cpp/src/qpid/cluster/UpdateExchange.h @@ -30,14 +30,14 @@ namespace qpid { namespace cluster { /** - * A keyless exchange (like fanout exchange) that does not modify delivery-properties.exchange - * on messages. + * A keyless exchange (like fanout exchange) that does not modify + * delivery-properties.exchange but copies it to the MessageTransfer. */ class UpdateExchange : public broker::FanOutExchange { public: - UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), broker::FanOutExchange(UpdateClient::UPDATE, parent) {} - void setProperties(const boost::intrusive_ptr<broker::Message>&) {} + UpdateExchange(management::Manageable* parent); + void setProperties(const boost::intrusive_ptr<broker::Message>&); }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp index f50035e49a..c03dd39458 100644 --- a/cpp/src/qpid/framing/FrameSet.cpp +++ b/cpp/src/qpid/framing/FrameSet.cpp @@ -52,6 +52,11 @@ const AMQMethodBody* FrameSet::getMethod() const return parts.empty() ? 0 : parts[0].getMethod(); } +AMQMethodBody* FrameSet::getMethod() +{ + return parts.empty() ? 0 : parts[0].getMethod(); +} + const AMQHeaderBody* FrameSet::getHeaders() const { return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>(); diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h index e3e8727600..398a709353 100644 --- a/cpp/src/qpid/framing/FrameSet.h +++ b/cpp/src/qpid/framing/FrameSet.h @@ -57,6 +57,7 @@ public: bool isContentBearing() const; QPID_COMMON_EXTERN const AMQMethodBody* getMethod() const; + QPID_COMMON_EXTERN AMQMethodBody* getMethod(); QPID_COMMON_EXTERN const AMQHeaderBody* getHeaders() const; QPID_COMMON_EXTERN AMQHeaderBody* getHeaders(); @@ -70,6 +71,11 @@ public: return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0; } + template <class T> T* as() { + AMQMethodBody* method = getMethod(); + return (method && method->isA<T>()) ? dynamic_cast<T*>(method) : 0; + } + template <class T> const T* getHeaderProperties() const { const AMQHeaderBody* header = getHeaders(); return header ? header->get<T>() : 0; |
