summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-12-11 20:55:45 +0000
committerAlan Conway <aconway@apache.org>2009-12-11 20:55:45 +0000
commitd490fba74749bcde972e5a0d95f84b165f8ea05e (patch)
treeffc58006adb15ec8fa29955911f5f3a0f02dfa69 /cpp/src/qpid/cluster
parente4aee82085958588458ba34d2bf7dd0db90a257d (diff)
downloadqpid-python-d490fba74749bcde972e5a0d95f84b165f8ea05e.tar.gz
QPID-2266: error sending update: Enqueue capacity threshold exceeded
Fix for the problem with a test to verify that messages going to the store have the same headers and content-size for an updatee or a broker that receives the publish directly. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@889813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp1
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp7
-rw-r--r--cpp/src/qpid/cluster/UpdateExchange.cpp47
-rw-r--r--cpp/src/qpid/cluster/UpdateExchange.h8
5 files changed, 57 insertions, 8 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f877720350..d049001eb0 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -619,6 +619,7 @@ void Cluster::initMapCompleted(Lock& l) {
if (initMap.isUpdateNeeded()) { // Joining established cluster.
broker.setRecovery(false); // Ditch my current store.
+ broker.setClusterUpdatee(true);
state = JOINER;
}
else { // I can go ready.
@@ -813,6 +814,7 @@ void Cluster::checkUpdateIn(Lock& l) {
memberUpdate(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
+ broker.setClusterUpdatee(false);
discarding = false; // ok to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
deliverEventQueue.start();
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 7f0a2752b0..e4aee6730b 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -139,7 +139,6 @@ struct ClusterPlugin : public Plugin {
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
- broker::Message::setUpdateDestination(UpdateClient::UPDATE);
ManagementAgent* mgmt = broker->getManagementAgent();
if (mgmt) {
std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator());
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index f263577fd3..279284da2c 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -217,10 +217,11 @@ class MessageUpdater {
// Disable client code that clears the delivery-properties.exchange
sb.get()->setDoClearDeliveryPropertiesExchange(false);
framing::MessageTransferBody transfer(
- framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE,
- message::ACQUIRE_MODE_PRE_ACQUIRED);
+ *message.payload->getFrames().as<framing::MessageTransferBody>());
+ transfer.setDestination(UpdateClient::UPDATE);
- sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased());
+ sb.get()->send(transfer, message.payload->getFrames(),
+ !message.payload->isContentReleased());
if (message.payload->isContentReleased()){
uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp
new file mode 100644
index 0000000000..11937f296f
--- /dev/null
+++ b/cpp/src/qpid/cluster/UpdateExchange.cpp
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/broker/Message.h"
+#include "UpdateExchange.h"
+
+namespace qpid {
+namespace cluster {
+
+using framing::MessageTransferBody;
+using framing::DeliveryProperties;
+
+UpdateExchange::UpdateExchange(management::Manageable* parent)
+ : broker::Exchange(UpdateClient::UPDATE, parent),
+ broker::FanOutExchange(UpdateClient::UPDATE, parent) {}
+
+
+void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) {
+ MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>();
+ assert(transfer);
+ const DeliveryProperties* props = msg->getProperties<DeliveryProperties>();
+ assert(props);
+ if (props->hasExchange())
+ transfer->setDestination(props->getExchange());
+ else
+ transfer->clearDestinationFlag();
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/UpdateExchange.h b/cpp/src/qpid/cluster/UpdateExchange.h
index 00a92c7f1e..9d7d9ee5fc 100644
--- a/cpp/src/qpid/cluster/UpdateExchange.h
+++ b/cpp/src/qpid/cluster/UpdateExchange.h
@@ -30,14 +30,14 @@ namespace qpid {
namespace cluster {
/**
- * A keyless exchange (like fanout exchange) that does not modify delivery-properties.exchange
- * on messages.
+ * A keyless exchange (like fanout exchange) that does not modify
+ * delivery-properties.exchange but copies it to the MessageTransfer.
*/
class UpdateExchange : public broker::FanOutExchange
{
public:
- UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), broker::FanOutExchange(UpdateClient::UPDATE, parent) {}
- void setProperties(const boost::intrusive_ptr<broker::Message>&) {}
+ UpdateExchange(management::Manageable* parent);
+ void setProperties(const boost::intrusive_ptr<broker::Message>&);
};
}} // namespace qpid::cluster