summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/HeadersExchange.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/HeadersExchange.cpp')
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp185
1 files changed, 153 insertions, 32 deletions
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 38cc0e4050..e4a76a0bcd 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -37,10 +37,21 @@ namespace _qmf = qmf::org::apache::qpid::broker;
using namespace qpid::broker;
namespace {
+ const std::string x_match("x-match");
+ // possible values for x-match
const std::string all("all");
const std::string any("any");
- const std::string x_match("x-match");
const std::string empty;
+
+ // federation related args and values
+ const std::string qpidFedOp("qpid.fed.op");
+ const std::string qpidFedTags("qpid.fed.tags");
+ const std::string qpidFedOrigin("qpid.fed.origin");
+
+ const std::string fedOpBind("B");
+ const std::string fedOpUnbind("U");
+ const std::string fedOpReorigin("R");
+ const std::string fedOpHello("H");
}
HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent, Broker* b) :
@@ -68,37 +79,106 @@ std::string HeadersExchange::getMatch(const FieldTable* args)
return empty;
}
if (!what->convertsTo<std::string>()) {
- throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
+ throw InternalErrorException(QPID_MSG("Invalid x-match binding format to headers exchange. Must be a string [\"all\" or \"any\"]"));
}
return what->get<std::string>();
}
-bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){
- std::string what = getMatch(args);
- if (what != all && what != any)
- throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
+bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
+{
+ string fedOp(fedOpBind);
+ string fedTags;
+ string fedOrigin;
+ if (args) {
+ fedOp = args->getAsString(qpidFedOp);
+ fedTags = args->getAsString(qpidFedTags);
+ fedOrigin = args->getAsString(qpidFedOrigin);
+ }
+ bool propagate = false;
- Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
- if (bindings.add_unless(binding, MatchArgs(queue, args))) {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_bindingCount();
+ // The federation args get propagated directly, so we need to identify
+ // the non feteration args in case a federated propagate is needed
+ FieldTable extra_args;
+ getNonFedArgs(args, extra_args);
+
+ if (fedOp.empty() || fedOp == fedOpBind) {
+ // x-match arg MUST be present for a bind call
+ std::string x_match_value = getMatch(args);
+
+ if (x_match_value != all && x_match_value != any) {
+ throw InternalErrorException(QPID_MSG("Invalid or missing x-match value binding to headers exchange. Must be a string [\"all\" or \"any\"]"));
}
- routeIVE();
- return true;
- } else {
- return false;
+
+ {
+ Mutex::ScopedLock l(lock);
+ Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
+ BoundKey bk(binding);
+ if (bindings.add_unless(bk, MatchArgs(queue, args))) {
+ propagate = bk.fedBinding.addOrigin(fedOrigin);
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ }
+ } else {
+ return false;
+ }
+ } // lock dropped
+
+ } else if (fedOp == fedOpUnbind) {
+ Mutex::ScopedLock l(lock);
+
+ FedUnbindModifier modifier(fedOrigin);
+ bindings.modify_if(MatchKey(queue, bindingKey), modifier);
+ propagate = modifier.shouldPropagate;
+ if (modifier.shouldUnbind) {
+ unbind(queue, bindingKey, args);
+ }
+
+ } else if (fedOp == fedOpReorigin) {
+ Bindings::ConstPtr p = bindings.snapshot();
+ if (p.get())
+ {
+ Mutex::ScopedLock l(lock);
+ for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i)
+ {
+ if ((*i).fedBinding.hasLocal()) {
+ propagateFedOp( (*i).binding->key, string(), fedOpBind, string());
+ }
+ }
+ }
+ }
+ routeIVE();
+ if (propagate) {
+ FieldTable * prop_args = (extra_args.count() != 0 ? &extra_args : 0);
+ propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, prop_args);
}
+
+ return true;
}
bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable*){
- if (bindings.remove_if(MatchKey(queue, bindingKey))) {
- if (mgmtExchange != 0) {
- mgmtExchange->dec_bindingCount();
+ bool propagate = false;
+ {
+ Mutex::ScopedLock l(lock);
+
+ FedUnbindModifier modifier;
+ MatchKey match_key(queue, bindingKey);
+ bindings.modify_if(match_key, modifier);
+ propagate = modifier.shouldPropagate;
+ if (modifier.shouldUnbind) {
+ if (bindings.remove_if(match_key)) {
+ if (mgmtExchange != 0) {
+ mgmtExchange->dec_bindingCount();
+ }
+ } else {
+ return false;
+ }
}
- return true;
- } else {
- return false;
}
+
+ if (propagate) {
+ propagateFedOp(bindingKey, string(), fedOpUnbind, string());
+ }
+ return true;
}
@@ -117,13 +197,12 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons
PreRoute pr(msg, this);
- ConstBindingList p = bindings.snapshot();
BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
- if (p.get())
- {
- for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
- if (match((*i)->args, *args)) {
- b->push_back(*i);
+ Bindings::ConstPtr p = bindings.snapshot();
+ if (p.get()) {
+ for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) {
+ if (match((*i).binding->args, *args)) {
+ b->push_back((*i).binding);
}
}
}
@@ -135,8 +214,8 @@ bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, cons
{
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()){
- for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
- if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) {
+ for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) {
+ if ( (!args || equal((*i).binding->args, *args)) && (!queue || (*i).binding->queue == queue)) {
return true;
}
}
@@ -144,6 +223,26 @@ bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, cons
return false;
}
+void HeadersExchange::getNonFedArgs(const FieldTable* args, FieldTable& nonFedArgs)
+{
+ if (!args)
+ {
+ return;
+ }
+
+ for (qpid::framing::FieldTable::ValueMap::const_iterator i=args->begin(); i != args->end(); ++i)
+ {
+ const string & name(i->first);
+ if (name == qpidFedOp ||
+ name == qpidFedTags ||
+ name == qpidFedOrigin)
+ {
+ continue;
+ }
+ nonFedArgs.insert((*i));
+ }
+}
+
HeadersExchange::~HeadersExchange() {}
const std::string HeadersExchange::typeName("headers");
@@ -206,15 +305,37 @@ bool HeadersExchange::equal(const FieldTable& a, const FieldTable& b) {
return true;
}
+//---------
HeadersExchange::MatchArgs::MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a) : queue(q), args(a) {}
-bool HeadersExchange::MatchArgs::operator()(Exchange::Binding::shared_ptr b)
+
+bool HeadersExchange::MatchArgs::operator()(BoundKey & bk)
{
- return b->queue == queue && b->args == *args;
+ return bk.binding->queue == queue && bk.binding->args == *args;
}
+//---------
HeadersExchange::MatchKey::MatchKey(Queue::shared_ptr q, const std::string& k) : queue(q), key(k) {}
-bool HeadersExchange::MatchKey::operator()(Exchange::Binding::shared_ptr b)
+bool HeadersExchange::MatchKey::operator()(BoundKey & bk)
{
- return b->queue == queue && b->key == key;
+ return bk.binding->queue == queue && bk.binding->key == key;
}
+
+//----------
+HeadersExchange::FedUnbindModifier::FedUnbindModifier(string & origin) : fedOrigin(origin), shouldUnbind(false), shouldPropagate(false) {}
+HeadersExchange::FedUnbindModifier::FedUnbindModifier() : shouldUnbind(false), shouldPropagate(false) {}
+
+bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk)
+{
+ if ("" == fedOrigin) {
+ shouldPropagate = bk.fedBinding.delOrigin();
+ } else {
+ shouldPropagate = bk.fedBinding.delOrigin(fedOrigin);
+ }
+ if (bk.fedBinding.count() == 0)
+ {
+ shouldUnbind = true;
+ }
+ return true;
+}
+