summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/DeliveryAdapter.h1
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp25
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h19
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp6
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h1
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp22
-rw-r--r--cpp/src/qpid/broker/SemanticState.h3
-rw-r--r--cpp/src/tests/DeliveryRecordTest.cpp68
-rw-r--r--cpp/src/tests/Makefile.am1
-rw-r--r--cpp/src/tests/TxAckTest.cpp2
-rw-r--r--python/tests_0-10/message.py46
11 files changed, 152 insertions, 42 deletions
diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h
index f645b37c23..28e2e3eb93 100644
--- a/cpp/src/qpid/broker/DeliveryAdapter.h
+++ b/cpp/src/qpid/broker/DeliveryAdapter.h
@@ -43,7 +43,6 @@ namespace broker {
{
public:
virtual DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) = 0;
- virtual void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) = 0;
virtual ~DeliveryAdapter(){}
};
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index df700cd15a..62c2002801 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -27,13 +27,15 @@
using namespace qpid::broker;
using std::string;
-DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
+DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
Queue::shared_ptr _queue,
- const string _consumerTag,
+ const std::string _tag,
+ DeliveryToken::shared_ptr _token,
const DeliveryId _id,
bool _acquired, bool _confirmed) : msg(_msg),
queue(_queue),
- consumerTag(_consumerTag),
+ tag(_tag),
+ token(_token),
id(_id),
acquired(_acquired),
confirmed(_confirmed),
@@ -41,11 +43,10 @@ DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
{
}
-DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
+DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
Queue::shared_ptr _queue,
const DeliveryId _id) : msg(_msg),
queue(_queue),
- consumerTag(""),
id(_id),
acquired(true),
confirmed(false),
@@ -74,13 +75,13 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const
return range->covers(id);
}
-void DeliveryRecord::redeliver(SemanticState* const session) const{
+void DeliveryRecord::redeliver(SemanticState* const session) {
if (!confirmed) {
if(pull){
//if message was originally sent as response to get, we must requeue it
requeue();
}else{
- session->deliver(msg.payload, consumerTag, id);
+ id = session->redeliver(msg.payload, token);
}
}
}
@@ -151,11 +152,17 @@ void DeliveryRecord::acquire(std::vector<DeliveryId>& results) {
namespace qpid {
namespace broker {
-std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) {
+std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r)
+{
out << "{" << "id=" << r.id.getValue();
- out << ", consumer=" << r.consumerTag;
+ out << ", tag=" << r.tag << "}";
out << ", queue=" << r.queue->getName() << "}";
return out;
}
+bool operator<(const DeliveryRecord& a, const DeliveryRecord& b)
+{
+ return a.id < b.id;
+}
+
}}
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index f2c343e27a..21944cd553 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -29,6 +29,7 @@
#include "Queue.h"
#include "Consumer.h"
#include "DeliveryId.h"
+#include "DeliveryToken.h"
#include "Message.h"
#include "Prefetch.h"
@@ -42,16 +43,17 @@ class SemanticState;
class DeliveryRecord{
mutable QueuedMessage msg;
mutable Queue::shared_ptr queue;
- const std::string consumerTag;
- const DeliveryId id;
+ const std::string tag;
+ DeliveryToken::shared_ptr token;
+ DeliveryId id;
bool acquired;
const bool confirmed;
const bool pull;
public:
- DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag,
+ DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token,
const DeliveryId id, bool acquired, bool confirmed = false);
- DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
+ DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
void dequeue(TransactionContext* ctxt = 0) const;
bool matches(DeliveryId tag) const;
@@ -61,17 +63,16 @@ class DeliveryRecord{
void requeue() const;
void release();
void reject();
- void redeliver(SemanticState* const) const;
+ void redeliver(SemanticState* const);
void updateByteCredit(uint32_t& credit) const;
void addTo(Prefetch&) const;
void subtractFrom(Prefetch&) const;
- const std::string& getConsumerTag() const { return consumerTag; }
+ const std::string& getTag() const { return tag; }
bool isPull() const { return pull; }
bool isAcquired() const { return acquired; }
- //void setAcquired(bool isAcquired) { acquired = isAcquired; }
void acquire(std::vector<DeliveryId>& results);
-
- friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
+ friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);
+ friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
typedef std::list<DeliveryRecord>::iterator ack_iterator;
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index ae5810684d..9cacb4ccf7 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -176,12 +176,6 @@ DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::sha
return outgoing.hwm;
}
-void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
-{
- MessageDelivery::deliver(msg, session.getHandler().out, tag, token,
- session.getConnection().getFrameMax());
-}
-
SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
{
//will be replaced by field in 0-10 frame header
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index d6dbf878c9..dc7f21ac34 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -71,7 +71,6 @@ class SemanticHandler : public DeliveryAdapter,
//delivery adapter methods:
DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
- void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag);
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
Connection& getConnection() { return session.getConnection(); }
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index a3858d0989..11bce282eb 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -250,7 +250,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
DeliveryId deliveryTag =
parent->deliveryAdapter.deliver(msg.payload, token);
if (windowing || ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected));
+ parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
}
}
return !blocked;
@@ -273,11 +273,6 @@ bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
}
}
-void SemanticState::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
- Mutex::ScopedLock locker(parent->deliveryLock);
- parent->deliveryAdapter.redeliver(msg, token, deliveryTag);
-}
-
SemanticState::ConsumerImpl::~ConsumerImpl() {
cancel();
}
@@ -384,7 +379,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
void SemanticState::acknowledged(const DeliveryRecord& delivery)
{
delivery.subtractFrom(outstanding);
- ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
+ ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
i->acknowledged(delivery);
}
@@ -411,6 +406,10 @@ void SemanticState::recover(bool requeue)
for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
+ //unconfirmed messages re redelivered and therefore have their
+ //id adjusted, confirmed messages are not and so the ordering
+ //w.r.t id is lost
+ unacked.sort();
}
}
@@ -429,13 +428,10 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue
}
}
-void SemanticState::deliver(Message::shared_ptr& msg, const string& consumerTag,
- DeliveryId deliveryTag)
+DeliveryId SemanticState::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
- ConsumerImplMap::iterator i = consumers.find(consumerTag);
- if (i != consumers.end()){
- i->redeliver(msg, deliveryTag);
- }
+ Mutex::ScopedLock locker(deliveryLock);
+ return deliveryAdapter.deliver(msg, token);
}
void SemanticState::flow(bool active)
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 6147380714..8372f345ad 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -77,7 +77,6 @@ class SemanticState : public framing::FrameHandler::Chains,
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
bool deliver(QueuedMessage& msg);
- void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
void cancel();
void requestDispatch();
@@ -169,7 +168,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
void recover(bool requeue);
void flow(bool active);
- void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);
+ DeliveryId redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired);
void release(DeliveryId first, DeliveryId last);
void reject(DeliveryId first, DeliveryId last);
diff --git a/cpp/src/tests/DeliveryRecordTest.cpp b/cpp/src/tests/DeliveryRecordTest.cpp
new file mode 100644
index 0000000000..011d8bf694
--- /dev/null
+++ b/cpp/src/tests/DeliveryRecordTest.cpp
@@ -0,0 +1,68 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid_test_plugin.h"
+#include <iostream>
+#include <memory>
+#include <boost/format.hpp>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using boost::dynamic_pointer_cast;
+using std::list;
+
+class DeliveryRecordTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(DeliveryRecordTest);
+ CPPUNIT_TEST(testSort);
+ CPPUNIT_TEST_SUITE_END();
+
+public:
+
+ void testSort()
+ {
+ list<SequenceNumber> ids;
+ ids.push_back(SequenceNumber(6));
+ ids.push_back(SequenceNumber(2));
+ ids.push_back(SequenceNumber(4));
+ ids.push_back(SequenceNumber(5));
+ ids.push_back(SequenceNumber(1));
+ ids.push_back(SequenceNumber(3));
+
+ list<DeliveryRecord> records;
+ for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
+ records.push_back(DeliveryRecord(QueuedMessage(), Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false));
+ }
+ records.sort();
+
+ SequenceNumber expected(0);
+ for (list<DeliveryRecord>::iterator i = records.begin(); i != records.end(); i++) {
+ CPPUNIT_ASSERT(i->matches(++expected));
+ }
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(DeliveryRecordTest);
+
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 9c4d5c0fd0..b8622fc925 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -86,6 +86,7 @@ perftest_LDADD=$(lib_client)
broker_unit_tests = \
AccumulatedAckTest \
DtxWorkRecordTest \
+ DeliveryRecordTest \
ExchangeTest \
HeadersExchangeTest \
MessageTest \
diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp
index 34d0bcd156..73628f25b5 100644
--- a/cpp/src/tests/TxAckTest.cpp
+++ b/cpp/src/tests/TxAckTest.cpp
@@ -78,7 +78,7 @@ public:
messages.push_back(msg);
QueuedMessage qm;
qm.payload = msg;
- deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1), true));
+ deliveries.push_back(DeliveryRecord(qm, queue, "xyz", DeliveryToken::shared_ptr(), (i+1), true));
}
//assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index b5b058340f..967c03bbea 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -187,6 +187,46 @@ class MessageTests(TestBase):
self.fail("Got unexpected message: " + extra.content.body)
except Empty: None
+
+ def test_recover(self):
+ """
+ Test recover behaviour
+ """
+ channel = self.channel
+ channel.queue_declare(queue="queue-a", exclusive=True)
+ channel.queue_bind(exchange="amq.fanout", queue="queue-a")
+ channel.queue_declare(queue="queue-b", exclusive=True)
+ channel.queue_bind(exchange="amq.fanout", queue="queue-b")
+
+ self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1)
+ self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0)
+ confirmed = self.client.queue("confirmed")
+ unconfirmed = self.client.queue("unconfirmed")
+
+ data = ["One", "Two", "Three", "Four", "Five"]
+ for d in data:
+ channel.message_transfer(destination="amq.fanout", content=Content(body=d))
+
+ for q in [confirmed, unconfirmed]:
+ for d in data:
+ self.assertEqual(d, q.get(timeout=1).content.body)
+ self.assertEmpty(q)
+
+ channel.message_recover(requeue=False)
+
+ self.assertEmpty(confirmed)
+
+ while len(data):
+ msg = None
+ for d in data:
+ msg = unconfirmed.get(timeout=1)
+ self.assertEqual(d, msg.content.body)
+ self.assertEmpty(unconfirmed)
+ data.remove(msg.content.body)
+ msg.complete(cumulative=False)
+ channel.message_recover(requeue=False)
+
+
def test_recover_requeue(self):
"""
Test requeing on recovery
@@ -551,3 +591,9 @@ class MessageTests(TestBase):
def assertDataEquals(self, channel, msg, expected):
self.assertEquals(expected, msg.content.body)
+
+ def assertEmpty(self, queue):
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Queue not empty, contains: " + extra.content.body)
+ except Empty: None