summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-12-11 20:55:45 +0000
committerAlan Conway <aconway@apache.org>2009-12-11 20:55:45 +0000
commitd490fba74749bcde972e5a0d95f84b165f8ea05e (patch)
treeffc58006adb15ec8fa29955911f5f3a0f02dfa69 /cpp/src/qpid/broker
parente4aee82085958588458ba34d2bf7dd0db90a257d (diff)
downloadqpid-python-d490fba74749bcde972e5a0d95f84b165f8ea05e.tar.gz
QPID-2266: error sending update: Enqueue capacity threshold exceeded
Fix for the problem with a test to verify that messages going to the store have the same headers and content-size for an updatee or a broker that receives the publish directly. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@889813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--cpp/src/qpid/broker/Broker.h5
-rw-r--r--cpp/src/qpid/broker/Message.cpp15
-rw-r--r--cpp/src/qpid/broker/Message.h8
-rw-r--r--cpp/src/qpid/broker/Queue.cpp2
5 files changed, 11 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 3c67c429a0..849bf6d1f5 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -154,6 +154,7 @@ Broker::Broker(const Broker::Options& conf) :
queueCleaner(queues, timer),
queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
+ clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
connectionCounter(conf.maxConnections),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 73d5860cb3..b85aa7d96c 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -168,8 +168,10 @@ public:
std::vector<Url> getKnownBrokersImpl();
std::string federationTag;
bool recovery;
+ bool clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConnectionCounter connectionCounter;
+
public:
virtual ~Broker();
@@ -259,6 +261,9 @@ public:
void setRecovery(bool set) { recovery = set; }
bool getRecovery() const { return recovery; }
+ void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+ bool isClusterUpdatee() const { return clusterUpdatee; }
+
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index e2799b0bff..47ca7a7ae8 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -425,19 +425,4 @@ framing::FieldTable& Message::getOrInsertHeaders()
return getProperties<MessageProperties>()->getApplicationHeaders();
}
-
-void Message::setUpdateDestination(const std::string& d)
-{
- updateDestination = d;
-}
-
-
-bool Message::isUpdateMessage()
-{
- return updateDestination.size() && isA<MessageTransferBody>()
- && getMethod<MessageTransferBody>()->getDestination() == updateDestination;
-}
-
-std::string Message::updateDestination;
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 3894960c95..375fa9ce26 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -104,6 +104,10 @@ public:
return frames.as<T>();
}
+ template <class T> T* getMethod() {
+ return frames.as<T>();
+ }
+
template <class T> bool isA() const {
return frames.isA<T>();
}
@@ -157,9 +161,6 @@ public:
void setDequeueCompleteCallback(MessageCallback& cb);
void resetDequeueCompleteCallback();
- bool isUpdateMessage();
- static void setUpdateDestination(const std::string&);
-
private:
typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
@@ -190,7 +191,6 @@ public:
MessageCallback* dequeueCallback;
uint32_t requiredCredit;
- static std::string updateDestination;
};
}}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 780c254a56..ef1adaf7ec 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -598,7 +598,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
string key = ft->getAsString(qpidVQMatchProperty);
i = lvq.find(key);
- if (i == lvq.end() || msg->isUpdateMessage()){
+ if (i == lvq.end() || (broker && broker->isClusterUpdatee())) {
messages.push_back(qm);
listeners.populate(copy);
lvq[key] = msg;