summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-10-02 09:54:59 +0000
committerGordon Sim <gsim@apache.org>2007-10-02 09:54:59 +0000
commit2ed6c3f489f3bf740d1641400037b644c132b75a (patch)
tree62cfc2f5ed5c21d2823a4f6a47b20ba1ea6dc721
parent5f0f7edc1f366cfd8981a29319130aa72b65efa3 (diff)
downloadqpid-python-2ed6c3f489f3bf740d1641400037b644c132b75a.tar.gz
Fixed recovery; unacked message records are now updated to hold the new command id when messages are resent.
Added unit and python test as previous bug was not being picked up by the existing tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@581175 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/DeliveryAdapter.h1
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp25
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h19
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp6
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h1
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp22
-rw-r--r--cpp/src/qpid/broker/SemanticState.h3
-rw-r--r--cpp/src/tests/DeliveryRecordTest.cpp68
-rw-r--r--cpp/src/tests/Makefile.am1
-rw-r--r--cpp/src/tests/TxAckTest.cpp2
-rw-r--r--python/tests_0-10/message.py46
11 files changed, 152 insertions, 42 deletions
diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h
index f645b37c23..28e2e3eb93 100644
--- a/cpp/src/qpid/broker/DeliveryAdapter.h
+++ b/cpp/src/qpid/broker/DeliveryAdapter.h
@@ -43,7 +43,6 @@ namespace broker {
{
public:
virtual DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) = 0;
- virtual void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) = 0;
virtual ~DeliveryAdapter(){}
};
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index df700cd15a..62c2002801 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -27,13 +27,15 @@
using namespace qpid::broker;
using std::string;
-DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
+DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
Queue::shared_ptr _queue,
- const string _consumerTag,
+ const std::string _tag,
+ DeliveryToken::shared_ptr _token,
const DeliveryId _id,
bool _acquired, bool _confirmed) : msg(_msg),
queue(_queue),
- consumerTag(_consumerTag),
+ tag(_tag),
+ token(_token),
id(_id),
acquired(_acquired),
confirmed(_confirmed),
@@ -41,11 +43,10 @@ DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
{
}
-DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
+DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
Queue::shared_ptr _queue,
const DeliveryId _id) : msg(_msg),
queue(_queue),
- consumerTag(""),
id(_id),
acquired(true),
confirmed(false),
@@ -74,13 +75,13 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const
return range->covers(id);
}
-void DeliveryRecord::redeliver(SemanticState* const session) const{
+void DeliveryRecord::redeliver(SemanticState* const session) {
if (!confirmed) {
if(pull){
//if message was originally sent as response to get, we must requeue it
requeue();
}else{
- session->deliver(msg.payload, consumerTag, id);
+ id = session->redeliver(msg.payload, token);
}
}
}
@@ -151,11 +152,17 @@ void DeliveryRecord::acquire(std::vector<DeliveryId>& results) {
namespace qpid {
namespace broker {
-std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) {
+std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r)
+{
out << "{" << "id=" << r.id.getValue();
- out << ", consumer=" << r.consumerTag;
+ out << ", tag=" << r.tag << "}";
out << ", queue=" << r.queue->getName() << "}";
return out;
}
+bool operator<(const DeliveryRecord& a, const DeliveryRecord& b)
+{
+ return a.id < b.id;
+}
+
}}
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index f2c343e27a..21944cd553 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -29,6 +29,7 @@
#include "Queue.h"
#include "Consumer.h"
#include "DeliveryId.h"
+#include "DeliveryToken.h"
#include "Message.h"
#include "Prefetch.h"
@@ -42,16 +43,17 @@ class SemanticState;
class DeliveryRecord{
mutable QueuedMessage msg;
mutable Queue::shared_ptr queue;
- const std::string consumerTag;
- const DeliveryId id;
+ const std::string tag;
+ DeliveryToken::shared_ptr token;
+ DeliveryId id;
bool acquired;
const bool confirmed;
const bool pull;
public:
- DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag,
+ DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token,
const DeliveryId id, bool acquired, bool confirmed = false);
- DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
+ DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
void dequeue(TransactionContext* ctxt = 0) const;
bool matches(DeliveryId tag) const;
@@ -61,17 +63,16 @@ class DeliveryRecord{
void requeue() const;
void release();
void reject();
- void redeliver(SemanticState* const) const;
+ void redeliver(SemanticState* const);
void updateByteCredit(uint32_t& credit) const;
void addTo(Prefetch&) const;
void subtractFrom(Prefetch&) const;
- const std::string& getConsumerTag() const { return consumerTag; }
+ const std::string& getTag() const { return tag; }
bool isPull() const { return pull; }
bool isAcquired() const { return acquired; }
- //void setAcquired(bool isAcquired) { acquired = isAcquired; }
void acquire(std::vector<DeliveryId>& results);
-
- friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
+ friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);
+ friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
typedef std::list<DeliveryRecord>::iterator ack_iterator;
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index ae5810684d..9cacb4ccf7 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -176,12 +176,6 @@ DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::sha
return outgoing.hwm;
}
-void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
-{
- MessageDelivery::deliver(msg, session.getHandler().out, tag, token,
- session.getConnection().getFrameMax());
-}
-
SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
{
//will be replaced by field in 0-10 frame header
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index d6dbf878c9..dc7f21ac34 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -71,7 +71,6 @@ class SemanticHandler : public DeliveryAdapter,
//delivery adapter methods:
DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
- void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag);
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
Connection& getConnection() { return session.getConnection(); }
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index a3858d0989..11bce282eb 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -250,7 +250,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
DeliveryId deliveryTag =
parent->deliveryAdapter.deliver(msg.payload, token);
if (windowing || ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected));
+ parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
}
}
return !blocked;
@@ -273,11 +273,6 @@ bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
}
}
-void SemanticState::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
- Mutex::ScopedLock locker(parent->deliveryLock);
- parent->deliveryAdapter.redeliver(msg, token, deliveryTag);
-}
-
SemanticState::ConsumerImpl::~ConsumerImpl() {
cancel();
}
@@ -384,7 +379,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
void SemanticState::acknowledged(const DeliveryRecord& delivery)
{
delivery.subtractFrom(outstanding);
- ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
+ ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
i->acknowledged(delivery);
}
@@ -411,6 +406,10 @@ void SemanticState::recover(bool requeue)
for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
+ //unconfirmed messages re redelivered and therefore have their
+ //id adjusted, confirmed messages are not and so the ordering
+ //w.r.t id is lost
+ unacked.sort();
}
}
@@ -429,13 +428,10 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue
}
}
-void SemanticState::deliver(Message::shared_ptr& msg, const string& consumerTag,
- DeliveryId deliveryTag)
+DeliveryId SemanticState::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
- ConsumerImplMap::iterator i = consumers.find(consumerTag);
- if (i != consumers.end()){
- i->redeliver(msg, deliveryTag);
- }
+ Mutex::ScopedLock locker(deliveryLock);
+ return deliveryAdapter.deliver(msg, token);
}
void SemanticState::flow(bool active)
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 6147380714..8372f345ad 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -77,7 +77,6 @@ class SemanticState : public framing::FrameHandler::Chains,
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
bool deliver(QueuedMessage& msg);
- void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
void cancel();
void requestDispatch();
@@ -169,7 +168,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
void recover(bool requeue);
void flow(bool active);
- void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);
+ DeliveryId redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired);
void release(DeliveryId first, DeliveryId last);
void reject(DeliveryId first, DeliveryId last);
diff --git a/cpp/src/tests/DeliveryRecordTest.cpp b/cpp/src/tests/DeliveryRecordTest.cpp
new file mode 100644
index 0000000000..011d8bf694
--- /dev/null
+++ b/cpp/src/tests/DeliveryRecordTest.cpp
@@ -0,0 +1,68 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid_test_plugin.h"
+#include <iostream>
+#include <memory>
+#include <boost/format.hpp>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using boost::dynamic_pointer_cast;
+using std::list;
+
+class DeliveryRecordTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(DeliveryRecordTest);
+ CPPUNIT_TEST(testSort);
+ CPPUNIT_TEST_SUITE_END();
+
+public:
+
+ void testSort()
+ {
+ list<SequenceNumber> ids;
+ ids.push_back(SequenceNumber(6));
+ ids.push_back(SequenceNumber(2));
+ ids.push_back(SequenceNumber(4));
+ ids.push_back(SequenceNumber(5));
+ ids.push_back(SequenceNumber(1));
+ ids.push_back(SequenceNumber(3));
+
+ list<DeliveryRecord> records;
+ for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
+ records.push_back(DeliveryRecord(QueuedMessage(), Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false));
+ }
+ records.sort();
+
+ SequenceNumber expected(0);
+ for (list<DeliveryRecord>::iterator i = records.begin(); i != records.end(); i++) {
+ CPPUNIT_ASSERT(i->matches(++expected));
+ }
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(DeliveryRecordTest);
+
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 9c4d5c0fd0..b8622fc925 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -86,6 +86,7 @@ perftest_LDADD=$(lib_client)
broker_unit_tests = \
AccumulatedAckTest \
DtxWorkRecordTest \
+ DeliveryRecordTest \
ExchangeTest \
HeadersExchangeTest \
MessageTest \
diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp
index 34d0bcd156..73628f25b5 100644
--- a/cpp/src/tests/TxAckTest.cpp
+++ b/cpp/src/tests/TxAckTest.cpp
@@ -78,7 +78,7 @@ public:
messages.push_back(msg);
QueuedMessage qm;
qm.payload = msg;
- deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1), true));
+ deliveries.push_back(DeliveryRecord(qm, queue, "xyz", DeliveryToken::shared_ptr(), (i+1), true));
}
//assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index b5b058340f..967c03bbea 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -187,6 +187,46 @@ class MessageTests(TestBase):
self.fail("Got unexpected message: " + extra.content.body)
except Empty: None
+
+ def test_recover(self):
+ """
+ Test recover behaviour
+ """
+ channel = self.channel
+ channel.queue_declare(queue="queue-a", exclusive=True)
+ channel.queue_bind(exchange="amq.fanout", queue="queue-a")
+ channel.queue_declare(queue="queue-b", exclusive=True)
+ channel.queue_bind(exchange="amq.fanout", queue="queue-b")
+
+ self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1)
+ self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0)
+ confirmed = self.client.queue("confirmed")
+ unconfirmed = self.client.queue("unconfirmed")
+
+ data = ["One", "Two", "Three", "Four", "Five"]
+ for d in data:
+ channel.message_transfer(destination="amq.fanout", content=Content(body=d))
+
+ for q in [confirmed, unconfirmed]:
+ for d in data:
+ self.assertEqual(d, q.get(timeout=1).content.body)
+ self.assertEmpty(q)
+
+ channel.message_recover(requeue=False)
+
+ self.assertEmpty(confirmed)
+
+ while len(data):
+ msg = None
+ for d in data:
+ msg = unconfirmed.get(timeout=1)
+ self.assertEqual(d, msg.content.body)
+ self.assertEmpty(unconfirmed)
+ data.remove(msg.content.body)
+ msg.complete(cumulative=False)
+ channel.message_recover(requeue=False)
+
+
def test_recover_requeue(self):
"""
Test requeing on recovery
@@ -551,3 +591,9 @@ class MessageTests(TestBase):
def assertDataEquals(self, channel, msg, expected):
self.assertEquals(expected, msg.content.body)
+
+ def assertEmpty(self, queue):
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Queue not empty, contains: " + extra.content.body)
+ except Empty: None