diff options
| author | Alan Conway <aconway@apache.org> | 2009-12-11 20:55:45 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-12-11 20:55:45 +0000 |
| commit | d490fba74749bcde972e5a0d95f84b165f8ea05e (patch) | |
| tree | ffc58006adb15ec8fa29955911f5f3a0f02dfa69 /cpp/src/qpid/cluster | |
| parent | e4aee82085958588458ba34d2bf7dd0db90a257d (diff) | |
| download | qpid-python-d490fba74749bcde972e5a0d95f84b165f8ea05e.tar.gz | |
QPID-2266: error sending update: Enqueue capacity threshold exceeded
Fix for the problem with a test to verify that messages going to the store
have the same headers and content-size for an updatee or a broker that
receives the publish directly.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@889813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateExchange.cpp | 47 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateExchange.h | 8 |
5 files changed, 57 insertions, 8 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f877720350..d049001eb0 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -619,6 +619,7 @@ void Cluster::initMapCompleted(Lock& l) { if (initMap.isUpdateNeeded()) { // Joining established cluster. broker.setRecovery(false); // Ditch my current store. + broker.setClusterUpdatee(true); state = JOINER; } else { // I can go ready. @@ -813,6 +814,7 @@ void Cluster::checkUpdateIn(Lock& l) { memberUpdate(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; + broker.setClusterUpdatee(false); discarding = false; // ok to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); deliverEventQueue.start(); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 7f0a2752b0..e4aee6730b 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -139,7 +139,6 @@ struct ClusterPlugin : public Plugin { broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); - broker::Message::setUpdateDestination(UpdateClient::UPDATE); ManagementAgent* mgmt = broker->getManagementAgent(); if (mgmt) { std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator()); diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index f263577fd3..279284da2c 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -217,10 +217,11 @@ class MessageUpdater { // Disable client code that clears the delivery-properties.exchange sb.get()->setDoClearDeliveryPropertiesExchange(false); framing::MessageTransferBody transfer( - framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, - message::ACQUIRE_MODE_PRE_ACQUIRED); + *message.payload->getFrames().as<framing::MessageTransferBody>()); + transfer.setDestination(UpdateClient::UPDATE); - sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased()); + sb.get()->send(transfer, message.payload->getFrames(), + !message.payload->isContentReleased()); if (message.payload->isContentReleased()){ uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp new file mode 100644 index 0000000000..11937f296f --- /dev/null +++ b/cpp/src/qpid/cluster/UpdateExchange.cpp @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/broker/Message.h" +#include "UpdateExchange.h" + +namespace qpid { +namespace cluster { + +using framing::MessageTransferBody; +using framing::DeliveryProperties; + +UpdateExchange::UpdateExchange(management::Manageable* parent) + : broker::Exchange(UpdateClient::UPDATE, parent), + broker::FanOutExchange(UpdateClient::UPDATE, parent) {} + + +void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) { + MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>(); + assert(transfer); + const DeliveryProperties* props = msg->getProperties<DeliveryProperties>(); + assert(props); + if (props->hasExchange()) + transfer->setDestination(props->getExchange()); + else + transfer->clearDestinationFlag(); +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateExchange.h b/cpp/src/qpid/cluster/UpdateExchange.h index 00a92c7f1e..9d7d9ee5fc 100644 --- a/cpp/src/qpid/cluster/UpdateExchange.h +++ b/cpp/src/qpid/cluster/UpdateExchange.h @@ -30,14 +30,14 @@ namespace qpid { namespace cluster { /** - * A keyless exchange (like fanout exchange) that does not modify delivery-properties.exchange - * on messages. + * A keyless exchange (like fanout exchange) that does not modify + * delivery-properties.exchange but copies it to the MessageTransfer. */ class UpdateExchange : public broker::FanOutExchange { public: - UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), broker::FanOutExchange(UpdateClient::UPDATE, parent) {} - void setProperties(const boost::intrusive_ptr<broker::Message>&) {} + UpdateExchange(management::Manageable* parent); + void setProperties(const boost::intrusive_ptr<broker::Message>&); }; }} // namespace qpid::cluster |
