diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-04-06 18:18:44 +0000 | 
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-04-06 18:18:44 +0000 | 
| commit | 7e080f3ef470dcec94079f3d7e59edbf4c791844 (patch) | |
| tree | da103251d43bc432098bf977224caf437b661cab /cpp/src | |
| parent | e70e9f746a726130b848f45563b28a015b4e3fa2 (diff) | |
| download | qpid-python-7e080f3ef470dcec94079f3d7e59edbf4c791844.tar.gz | |
QPID-2482: prevent duplication of messages that match multiple binding keys on a topic exchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@931257 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 11 | ||||
| -rwxr-xr-x | cpp/src/tests/federation.py | 51 | 
2 files changed, 60 insertions, 2 deletions
| diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 6e53ef5fd2..66ace42cfa 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -19,6 +19,7 @@   *   */  #include "qpid/broker/TopicExchange.h" +#include "qpid/log/Statement.h"  #include <algorithm> @@ -214,6 +215,8 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons              if (mgmtExchange != 0) {                  mgmtExchange->inc_bindingCount();              } +            QPID_LOG(debug, "Bound [" << routingPattern << "] to queue " << queue->getName() +                     << " (origin=" << fedOrigin << ")");          }      } else if (fedOp == fedOpUnbind) {          { @@ -274,6 +277,7 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKe      if (mgmtExchange != 0) {          mgmtExchange->dec_bindingCount();      } +    QPID_LOG(debug, "Unbound [" << routingKey << "] from queue " << queue->getName());      if (propagate)          propagateFedOp(routingKey, string(), fedOpUnbind, string()); @@ -294,16 +298,19 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern)  void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)  { -    Binding::vector mb;      BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);      PreRoute pr(msg, this); +    std::set<std::string> qSet;      {          RWlock::ScopedRlock l(lock);          for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {              if (match(i->first, routingKey)) {                  Binding::vector& qv(i->second.bindingVector);                  for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){ -                    b->push_back(*j); +                    // do not duplicate queues on the binding list +                    if (qSet.insert(j->get()->queue->getName()).second) { +                        b->push_back(*j); +                    }                  }              }          } diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 15fa858c68..5f269c8363 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -738,6 +738,57 @@ class FederationTests(TestBase010):          self.verify_cleanup() +    def test_dynamic_topic_nodup(self): +        """Verify that a message whose routing key matches more than one +        binding does not get duplicated to the same queue. +        """ +        session = self.session +        r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) +        r_session = r_conn.session("test_dynamic_topic_nodup") + +        session.exchange_declare(exchange="fed.topic", type="topic") +        r_session.exchange_declare(exchange="fed.topic", type="topic") + +        self.startQmf() +        qmf = self.qmf +        broker = qmf.getObjects(_class="broker")[0] +        result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") +        self.assertEqual(result.status, 0) + +        link = qmf.getObjects(_class="link")[0] +        result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0) +        self.assertEqual(result.status, 0) +        bridge = qmf.getObjects(_class="bridge")[0] +        sleep(5) + +        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) +        session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="red.*") +        session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="*.herring") + +        self.subscribe(queue="fed1", destination="f1") +        queue = session.incoming("f1") + +        for i in range(1, 11): +            dp = r_session.delivery_properties(routing_key="red.herring") +            r_session.message_transfer(destination="fed.topic", message=Message(dp, "Message %d" % i)) + +        for i in range(1, 11): +            msg = queue.get(timeout=5) +            self.assertEqual("Message %d" % i, msg.body) +        try: +            extra = queue.get(timeout=1) +            self.fail("Got unexpected message in queue: " + extra.body) +        except Empty: None + +        result = bridge.close() +        self.assertEqual(result.status, 0) +        result = link.close() +        self.assertEqual(result.status, 0) + +        self.verify_cleanup() + + +      def getProperty(self, msg, name):          for h in msg.headers:              if hasattr(h, name): return getattr(h, name) | 
