summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h13
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp7
-rw-r--r--qpid/cpp/src/qpid/sys/Stoppable.h6
-rw-r--r--qpid/cpp/src/tests/Makefile.am30
-rw-r--r--qpid/cpp/src/tests/brokertest.py1
-rw-r--r--qpid/cpp/src/tests/cluster.mk6
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark15
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp9
-rw-r--r--qpid/cpp/src/tests/testagent.mk2
12 files changed, 51 insertions, 47 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 8f807cd0fe..32b037bb21 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1293,14 +1293,13 @@ void Queue::UsageBarrier::destroy()
// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()?
void Queue::stop() {
- QPID_LOG(critical, "FIXME Queue stopped " << getName());
+ QPID_LOG(trace, "Queue stopped: " << getName());
// FIXME aconway 2011-05-25: rename dispatching - acquiring?
dispatching.stop();
}
void Queue::start() {
- QPID_LOG(critical, "FIXME Queue started " << getName());
- assert(clusterContext); // FIXME aconway 2011-06-08: XXX
+ QPID_LOG(trace, "Queue started: " << getName());
dispatching.start();
notifyListener();
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 31d03e95b0..9435750b4e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -402,16 +402,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void start();
/** Context information used in a cluster. */
- boost::intrusive_ptr<RefCounted> getClusterContext() {
- // FIXME aconway 2011-06-08: XXX
- QPID_LOG(critical, "FIXME q get context " << name << clusterContext);
- return clusterContext;
- }
- void setClusterContext(boost::intrusive_ptr<RefCounted> context) {
- // FIXME aconway 2011-06-08: XXX
- clusterContext = context;
- QPID_LOG(critical, "FIXME q set context " << name << clusterContext);
- }
+ boost::intrusive_ptr<RefCounted> getClusterContext() { return clusterContext; }
+ void setClusterContext(boost::intrusive_ptr<RefCounted> context) { clusterContext = context; }
+
};
}} // qpid::broker
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index fa247ae8f5..f30a790547 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -136,7 +136,7 @@ void BrokerContext::create(broker::Queue& q) {
framing::Buffer buf(&data[0], data.size());
q.encode(buf);
core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data));
- QPID_LOG(critical, "FIXME BrokerContext create " << q.getName() << q.getClusterContext().get());
+ // FIXME aconway 2011-07-29: Need asynchronous completion.
}
void BrokerContext::destroy(broker::Queue& q) {
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index 86894b9dd9..211b7052e5 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -95,7 +95,7 @@ void MessageHandler::acquire(const std::string& q, uint32_t position) {
BrokerContext::ScopedSuppressReplication ssr;
bool ok = queue->acquireMessageAt(position, qm);
(void)ok; // Avoid unused variable warnings.
- assert(ok);
+ assert(ok); // FIXME aconway 2011-08-04: failing this assertion.
assert(qm.position.getValue() == position);
assert(qm.payload);
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index 122163ee7e..60b218da14 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -56,7 +56,7 @@ QueueContext::~QueueContext() {
// FIXME aconway 2011-07-27: revisit shutdown logic.
// timeout() could be called concurrently with destructor.
sys::Mutex::ScopedLock l(lock);
- timerTask->cancel();
+ if (timerTask) timerTask->cancel();
}
void QueueContext::replicaState(QueueOwnership state) {
@@ -84,6 +84,7 @@ void QueueContext::replicaState(QueueOwnership state) {
// FIXME aconway 2011-07-27: Dont spin token on an empty queue. Cancel timer.
+// Called in connection threads when a consumer is added
void QueueContext::consume(size_t n) {
sys::Mutex::ScopedLock l(lock);
consumers = n;
@@ -91,6 +92,7 @@ void QueueContext::consume(size_t n) {
framing::ClusterQueueSubscribeBody(framing::ProtocolVersion(), queue.getName()));
}
+// Called in connection threads when a consumer is cancelled
void QueueContext::cancel(size_t n) {
sys::Mutex::ScopedLock l(lock);
consumers = n;
@@ -100,6 +102,7 @@ void QueueContext::cancel(size_t n) {
void QueueContext::timeout() {
QPID_LOG(critical, "FIXME Ownership timeout on queue " << queue.getName());
queue.stop();
+ // When all threads have stopped, queue will call stopped()
}
@@ -109,7 +112,7 @@ void QueueContext::stopped() {
sys::Mutex::ScopedLock l(lock);
// FIXME aconway 2011-07-28: review thread safety of state.
// Deffered call to stopped doesn't sit well.
- // queueActive is invaled while stop is in progress?
+ // queueActive is invalid while stop is in progress?
if (consumers == 0)
mcast.mcast(framing::ClusterQueueUnsubscribeBody(
framing::ProtocolVersion(), queue.getName()));
diff --git a/qpid/cpp/src/qpid/sys/Stoppable.h b/qpid/cpp/src/qpid/sys/Stoppable.h
index 6bb02bc6af..ac0f03d3a1 100644
--- a/qpid/cpp/src/qpid/sys/Stoppable.h
+++ b/qpid/cpp/src/qpid/sys/Stoppable.h
@@ -64,10 +64,9 @@ class Stoppable {
/**
* Set state to "stopped", so no new threads can enter.
- * Call notify function when all busy threads have left.
+ * Notify function will be called when all busy threads have left.
+ * No-op if already stopping.
*/
- // FIXME aconway 2011-06-27: not guaranteed that stopped will be called,
- // deadlock?
void stop() {
sys::Monitor::ScopedLock l(lock);
stopped = true;
@@ -75,6 +74,7 @@ class Stoppable {
}
/** Set the state to "started", allow threads to enter.
+ * If already stopping this will prevent notify function from being called.
*/
void start() {
sys::Monitor::ScopedLock l(lock);
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index bd75432d57..8c8c6c6b15 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -51,9 +51,9 @@ CLEAN_LOCAL=
qpidexecdir = $(libexecdir)/qpid
qpidexec_PROGRAMS =
qpidexec_SCRIPTS =
-qpidtestdir = $(qpidexecdir)/tests
-qpidtest_PROGRAMS =
-qpidtest_SCRIPTS =
+qpidexectestdir = $(qpidexecdir)/tests
+qpidexectest_PROGRAMS =
+qpidexectest_SCRIPTS =
tmoduledir = $(libdir)/qpid/tests
tmodule_LTLIBRARIES=
@@ -150,17 +150,17 @@ endif
# Test programs that are installed and therefore built as part of make, not make check
-qpidtest_SCRIPTS += qpid-cpp-benchmark install_env.sh
+qpidexectest_SCRIPTS += qpid-cpp-benchmark install_env.sh
EXTRA_DIST += qpid-cpp-benchmark install_env.sh
-qpidtest_PROGRAMS += receiver
+qpidexectest_PROGRAMS += receiver
receiver_SOURCES = \
receiver.cpp \
TestOptions.h \
ConnectionOptions.h
receiver_LDADD = $(lib_client)
-qpidtest_PROGRAMS += sender
+qpidexectest_PROGRAMS += sender
sender_SOURCES = \
sender.cpp \
TestOptions.h \
@@ -168,7 +168,7 @@ sender_SOURCES = \
Statistics.cpp
sender_LDADD = $(lib_messaging)
-qpidtest_PROGRAMS += qpid-receive
+qpidexectest_PROGRAMS += qpid-receive
qpid_receive_SOURCES = \
qpid-receive.cpp \
TestOptions.h \
@@ -177,7 +177,7 @@ qpid_receive_SOURCES = \
Statistics.cpp
qpid_receive_LDADD = $(lib_messaging)
-qpidtest_PROGRAMS += qpid-send
+qpidexectest_PROGRAMS += qpid-send
qpid_send_SOURCES = \
qpid-send.cpp \
TestOptions.h \
@@ -186,37 +186,37 @@ qpid_send_SOURCES = \
Statistics.cpp
qpid_send_LDADD = $(lib_messaging)
-qpidtest_PROGRAMS+=qpid-perftest
+qpidexectest_PROGRAMS+=qpid-perftest
qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h
qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES)
qpid_perftest_LDADD=$(lib_client)
-qpidtest_PROGRAMS+=qpid-txtest
+qpidexectest_PROGRAMS+=qpid-txtest
qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES)
qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h
qpid_txtest_LDADD=$(lib_client)
-qpidtest_PROGRAMS+=qpid-latency-test
+qpidexectest_PROGRAMS+=qpid-latency-test
qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES)
qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h
qpid_latency_test_LDADD=$(lib_client)
-qpidtest_PROGRAMS+=qpid-client-test
+qpidexectest_PROGRAMS+=qpid-client-test
qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES)
qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h
qpid_client_test_LDADD=$(lib_client)
-qpidtest_PROGRAMS+=qpid-topic-listener
+qpidexectest_PROGRAMS+=qpid-topic-listener
qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES)
qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h
qpid_topic_listener_LDADD=$(lib_client)
-qpidtest_PROGRAMS+=qpid-topic-publisher
+qpidexectest_PROGRAMS+=qpid-topic-publisher
qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES)
qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h
qpid_topic_publisher_LDADD=$(lib_client)
-qpidtest_PROGRAMS+=qpid-ping
+qpidexectest_PROGRAMS+=qpid-ping
qpid_ping_INCLUDES=$(PUBLIC_INCLUDES)
qpid_ping_SOURCES=qpid-ping.cpp test_tools.h TestOptions.h ConnectionOptions.h
qpid_ping_LDADD=$(lib_client)
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 7b231259d5..2a21b26a2b 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -418,7 +418,6 @@ class Cluster:
self.args += [ cluster_name,
"%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
- self.args += [ "--log-enable=info+", "--log-enable=trace+:cluster"]
assert cluster_lib, "Cannot locate cluster plug-in"
self.args += [ "--load-module", cluster_lib ]
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index bf5064e74c..bbcc46a120 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -80,7 +80,7 @@ LONG_TESTS += \
cluster_python_tests \
stop_cluster
-qpidtest_PROGRAMS += cluster_test
+qpidexectest_PROGRAMS += cluster_test
cluster_test_SOURCES = \
cluster_test.cpp \
@@ -94,7 +94,7 @@ cluster_test_SOURCES = \
cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework
-qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py cluster_test_logs.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
-qpidtest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST)
+qpidexectest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py cluster_test_logs.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
+qpidexectest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST)
endif
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 74bf2df22d..fcc76f6cf3 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -74,7 +74,7 @@ def posix_quote(string):
return "'" + single_quote_re.sub("\\'", string) + "'";
def ssh_command(host, command):
- """Convert command into an ssh command on host with quoting"""
+ """ Convert command into an ssh command on host with quoting"""
return ["ssh", host] + [posix_quote(arg) for arg in command]
class Clients:
@@ -150,8 +150,8 @@ def first_line(p):
def queue_exists(queue,broker):
c = qpid.messaging.Connection(broker)
c.open()
- s = c.session()
try:
+ s = c.session()
try:
s.sender(queue)
return True
@@ -168,12 +168,12 @@ def recreate_queues(queues, brokers):
except qpid.messaging.exceptions.NotFound: pass
# FIXME aconway 2011-05-04: async wiring, wait for changes to propagate.
for b in brokers:
- while queue_exists(q,b): time.sleep(0.001);
+ while queue_exists(q,b): time.sleep(0.1);
for q in queues:
s.sender("%s;{create:always}"%q)
# FIXME aconway 2011-05-04: async wiring, wait for changes to propagate.
for b in brokers:
- while not queue_exists(q,b): time.sleep(0.001);
+ while not queue_exists(q,b): time.sleep(0.1);
c.close()
def print_header(timestamp):
@@ -251,9 +251,12 @@ class RoundRobin:
def main():
opts, args = op.parse_args()
- if not opts.broker: opts.broker = ["127.0.0.1"] # Deafult to local broker
- opts.broker = flatten(opts.broker)
opts.client_host = flatten(opts.client_host)
+ if not opts.broker:
+ if opts.client_host:
+ raise Exception("--broker must be specified if --client_host is.")
+ opts.broker = ["127.0.0.1"] # Deafult to local broker
+ opts.broker = flatten(opts.broker)
brokers = RoundRobin(opts.broker)
client_hosts = RoundRobin(opts.client_host)
send_out = ""
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index 9c713e872a..fc33685407 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -190,13 +190,20 @@ int main(int argc, char ** argv)
session.createSender(opts.readyAddress).send(msg);
// For receive rate calculation
- qpid::sys::AbsTime start = qpid::sys::now();
+ qpid::sys::AbsTime start; // Will be set on first itertion.
+ bool started=false;
int64_t interval = 0;
if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate;
std::map<std::string,Sender> replyTo;
while (!done && receiver.fetch(msg, timeout)) {
+ if (!started) {
+ // Start the time on receipt of the first message to avoid counting
+ // idle time at process startup.
+ start = qpid::sys::AbsTime::now();
+ started = true;
+ }
reporter.message(msg);
if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
if (msg.getContent() == EOS) {
diff --git a/qpid/cpp/src/tests/testagent.mk b/qpid/cpp/src/tests/testagent.mk
index 19d91ccab9..25cf43d71e 100644
--- a/qpid/cpp/src/tests/testagent.mk
+++ b/qpid/cpp/src/tests/testagent.mk
@@ -43,7 +43,7 @@ testagent_gen.timestamp: testagent.xml ${TESTAGENT_DEPS}
CLEANFILES+=$(TESTAGENT_GEN_SRC) testagent_gen.timestamp
testagent-testagent.$(OBJEXT): $(TESTAGENT_GEN_SRC)
-qpidtest_PROGRAMS+=testagent
+qpidexectest_PROGRAMS+=testagent
testagent_CXXFLAGS=$(CXXFLAGS) -Itestagent_gen
testagent_SOURCES=testagent.cpp $(TESTAGENT_GEN_SRC)
testagent_LDADD=$(top_builddir)/src/libqmf.la