summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp18
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h3
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py5
5 files changed, 26 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index e01eff98be..4a0621243c 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -91,6 +91,9 @@ class Consumer : public QueueCursor {
const std::string& getTag() const { return tag; }
+ /** Called when there are no more messages immediately available for this consumer on the queue */
+ virtual void stopped() {}
+
protected:
//framing::SequenceNumber position;
const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 2f189da012..8a20bcc69b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -472,6 +472,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
}
} else {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ c->stopped();
listeners.addListener(c);
break;
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 635d5047bd..a0cfa393aa 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -107,7 +107,8 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
- position(0), ready(false), cancelled(false),
+
+ position(0), wasStopped(false), ready(false), cancelled(false),
haBroker(hb),
primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
{}
@@ -186,10 +187,23 @@ void ReplicatingSubscription::initialize() {
ReplicatingSubscription::~ReplicatingSubscription() {}
+void ReplicatingSubscription::stopped() {
+ Mutex::ScopedLock l(lock);
+ // We have reached the last available message on the queue.
+ //
+ // Note that if messages have been removed out-of-order this may not be the
+ // head of the queue. We may not even have reached the guard
+ // position. However there are no more messages to protect and we will not
+ // be advanced any further, so we should consider ourselves guarded for
+ // purposes of readiness.
+ wasStopped = true;
+ checkReady(l);
+}
// True if the next position for the ReplicatingSubscription is a guarded position.
bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
- return position+1 >= guard->getFirst();
+ // See comment in stopped()
+ return wasStopped || (position+1 >= guard->getFirst());
}
// Message is delivered in the subscription's connection thread.
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 28ab98f73b..868442da7e 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -113,6 +113,8 @@ class ReplicatingSubscription :
void cancel();
void acknowledged(const broker::DeliveryRecord&);
bool browseAcquired() const { return true; }
+ void stopped();
+
// Hide the "queue deleted" error for a ReplicatingSubscription when a
// queue is deleted, this is normal and not an error.
bool hideDeletedError() { return true; }
@@ -147,6 +149,7 @@ class ReplicatingSubscription :
ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
+ bool wasStopped;
bool ready;
bool cancelled;
BrokerInfo info;
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index dfb65318a9..f71560dffb 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -20,7 +20,6 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
from qpid.datatypes import uuid4, UUID
-from qpid.harness import Skipped
from brokertest import *
from ha_test import *
from threading import Thread, Lock, Condition
@@ -363,7 +362,9 @@ class ReplicationTests(HaBrokerTest):
cluster[0].wait_status("ready")
cluster.bounce(1)
# FIXME aconway 2014-02-20: pr does not fail over with 1.0/swig
- if qm == qpid_messaging: raise Skipped("FIXME SWIG client failover bug")
+ if qm == qpid_messaging:
+ print "WARNING: Skipping SWIG client failover bug"
+ return
self.assertEqual("a", pr.fetch().content)
pr.session.acknowledge()
backup.assert_browse_backup("q", ["b"])