diff options
-rw-r--r-- | cpp/src/qpid/broker/Consumer.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 139 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 25 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 75 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 22 |
8 files changed, 166 insertions, 125 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index 4274ce823e..702ffd4d62 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -47,7 +47,7 @@ namespace qpid { class Consumer { const bool acquires; public: - typedef shared_ptr<Consumer> ptr; + typedef shared_ptr<Consumer> shared_ptr; framing::SequenceNumber position; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 090c4b4bca..4b94cd32b0 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -90,8 +90,12 @@ Queue::~Queue() void Queue::notifyDurableIOComplete() { - Mutex::ScopedLock locker(messageLock); - notify(); + Listeners copy; + { + Mutex::ScopedLock locker(messageLock); + listeners.swap(copy); + } + for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); } bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) @@ -181,10 +185,14 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } void Queue::requeue(const QueuedMessage& msg){ - Mutex::ScopedLock locker(messageLock); - msg.payload->enqueueComplete(); // mark the message as enqueued - messages.push_front(msg); - notify(); + Listeners copy; + { + Mutex::ScopedLock locker(messageLock); + msg.payload->enqueueComplete(); // mark the message as enqueued + messages.push_front(msg); + listeners.swap(copy); + } + for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); } bool Queue::acquire(const QueuedMessage& msg) { @@ -203,16 +211,16 @@ bool Queue::acquire(const QueuedMessage& msg) { return false; } -bool Queue::getNextMessage(QueuedMessage& m, Consumer& c) +bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { - if (c.preAcquires()) { + if (c->preAcquires()) { return consumeNextMessage(m, c); } else { return browseNextMessage(m, c); } } -bool Queue::checkForMessages(Consumer& c) +bool Queue::checkForMessages(Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); if (messages.empty()) { @@ -233,12 +241,12 @@ bool Queue::checkForMessages(Consumer& c) //message (if it does not, no need to register it for //notification as the consumer itself will handle the //credit allocation required to change this condition). - return c.accept(msg.payload); + return c->accept(msg.payload); } } } -bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) +bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { Mutex::ScopedLock locker(messageLock); @@ -254,8 +262,8 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) return false; } - if (c.filter(msg.payload)) { - if (c.accept(msg.payload)) { + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { m = msg; messages.pop_front(); return true; @@ -274,14 +282,14 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c) } -bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c) +bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { QueuedMessage msg(this); while (seek(msg, c)) { - if (c.filter(msg.payload)) { - if (c.accept(msg.payload)) { + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { //consumer wants the message - c.position = msg.position; + c->position = msg.position; m = msg; return true; } else { @@ -291,59 +299,47 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer& c) } } else { //consumer will never want this message, continue seeking - c.position = msg.position; + c->position = msg.position; QPID_LOG(debug, "Browser skipping message from '" << name << "'"); } } return false; } -/** - * notify listeners that there may be messages to process - */ -void Queue::notify() -{ - if (listeners.empty()) return; - - Listeners copy(listeners); - listeners.clear(); - for_each(copy.begin(), copy.end(), mem_fun(&Consumer::notify)); -} - -void Queue::removeListener(Consumer& c) +void Queue::removeListener(Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); - Listeners::iterator i = std::find(listeners.begin(), listeners.end(), &c); + Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c); if (i != listeners.end()) listeners.erase(i); } -void Queue::addListener(Consumer& c) +void Queue::addListener(Consumer::shared_ptr c) { - Listeners::iterator i = std::find(listeners.begin(), listeners.end(), &c); - if (i == listeners.end()) listeners.push_back(&c); + Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c); + if (i == listeners.end()) listeners.push_back(c); } -bool Queue::dispatch(Consumer& c) +bool Queue::dispatch(Consumer::shared_ptr c) { QueuedMessage msg(this); if (getNextMessage(msg, c)) { - c.deliver(msg); + c->deliver(msg); return true; } else { return false; } } -bool Queue::seek(QueuedMessage& msg, Consumer& c) { +bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); - if (!messages.empty() && messages.back().position > c.position) { - if (c.position < messages.front().position) { + if (!messages.empty() && messages.back().position > c->position) { + if (c->position < messages.front().position) { msg = messages.front(); return true; } else { //TODO: can improve performance of this search, for now just searching linearly from end Messages::reverse_iterator pos; - for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c.position; i++) { + for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) { pos = i; } msg = *pos; @@ -354,7 +350,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) { return false; } -void Queue::consume(Consumer& c, bool requestExclusive){ +void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ Mutex::ScopedLock locker(consumerLock); if(exclusive) { throw ResourceLockedException( @@ -364,7 +360,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){ throw ResourceLockedException( QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); } else { - exclusive = c.getSession(); + exclusive = c->getSession(); } } consumerCount++; @@ -372,7 +368,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){ mgmtObject->inc_consumerCount (); } -void Queue::cancel(Consumer& c){ +void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); Mutex::ScopedLock locker(consumerLock); consumerCount--; @@ -415,35 +411,40 @@ uint32_t Queue::purge(const uint32_t purge_request){ } void Queue::push(boost::intrusive_ptr<Message>& msg){ - Mutex::ScopedLock locker(messageLock); - messages.push_back(QueuedMessage(this, msg, ++sequence)); - if (policy.get()) { - policy->enqueued(msg->contentSize()); - if (policy->limitExceeded()) { - if (!policyExceeded) { - policyExceeded = true; - QPID_LOG(info, "Queue size exceeded policy for " << name); - } - if (store) { - QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory"); - msg->releaseContent(store); + Listeners copy; + { + Mutex::ScopedLock locker(messageLock); + messages.push_back(QueuedMessage(this, msg, ++sequence)); + if (policy.get()) { + policy->enqueued(msg->contentSize()); + if (policy->limitExceeded()) { + if (!policyExceeded) { + policyExceeded = true; + QPID_LOG(info, "Queue size exceeded policy for " << name); + } + if (store) { + QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory"); + msg->releaseContent(store); + } else { + QPID_LOG(error, "Message " << msg << " on " << name + << " exceeds the policy for the queue but can't be released from memory as the queue is not durable"); + throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy)); + } } else { - QPID_LOG(error, "Message " << msg << " on " << name - << " exceeds the policy for the queue but can't be released from memory as the queue is not durable"); - throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy)); - } - } else { - if (policyExceeded) { - policyExceeded = false; - QPID_LOG(info, "Queue size within policy for " << name); + if (policyExceeded) { + policyExceeded = false; + QPID_LOG(info, "Queue size within policy for " << name); + } } } + listeners.swap(copy); } - notify(); + for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); } /** function only provided for unit tests, or code not in critical message path */ -uint32_t Queue::getMessageCount() const{ +uint32_t Queue::getMessageCount() const +{ Mutex::ScopedLock locker(messageLock); uint32_t count =0; @@ -454,12 +455,14 @@ uint32_t Queue::getMessageCount() const{ return count; } -uint32_t Queue::getConsumerCount() const{ +uint32_t Queue::getConsumerCount() const +{ Mutex::ScopedLock locker(consumerLock); return consumerCount; } -bool Queue::canAutoDelete() const{ +bool Queue::canAutoDelete() const +{ Mutex::ScopedLock locker(consumerLock); return autodelete && !consumerCount; } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 8b8ba8278f..65d01a6888 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -34,6 +34,7 @@ #include "qpid/management/Queue.h" #include "qpid/framing/amqp_types.h" +#include <list> #include <vector> #include <memory> #include <deque> @@ -60,7 +61,8 @@ namespace qpid { */ class Queue : public boost::enable_shared_from_this<Queue>, public PersistableQueue, public management::Manageable { - typedef qpid::InlineVector<Consumer*, 5> Listeners; + + typedef std::list<Consumer::shared_ptr> Listeners; typedef std::deque<QueuedMessage> Messages; const string name; @@ -88,14 +90,13 @@ namespace qpid { void push(boost::intrusive_ptr<Message>& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); - bool seek(QueuedMessage& msg, Consumer& position); - bool getNextMessage(QueuedMessage& msg, Consumer& c); - bool consumeNextMessage(QueuedMessage& msg, Consumer& c); - bool browseNextMessage(QueuedMessage& msg, Consumer& c); + bool seek(QueuedMessage& msg, Consumer::shared_ptr position); + bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + bool consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - void notify(); - void removeListener(Consumer&); - void addListener(Consumer&); + void removeListener(Consumer::shared_ptr); + void addListener(Consumer::shared_ptr); bool isExcluded(boost::intrusive_ptr<Message>& msg); @@ -115,14 +116,14 @@ namespace qpid { management::Manageable* parent = 0); ~Queue(); - bool dispatch(Consumer&); + bool dispatch(Consumer::shared_ptr); /** * Check whether there would be a message available for * dispatch to this consumer. If not, the consumer will be * notified of events that may have changed this * situation. */ - bool checkForMessages(Consumer&); + bool checkForMessages(Consumer::shared_ptr); void create(const qpid::framing::FieldTable& settings); void configure(const qpid::framing::FieldTable& settings); @@ -154,8 +155,8 @@ namespace qpid { */ void recover(boost::intrusive_ptr<Message>& msg); - void consume(Consumer& c, bool exclusive = false); - void cancel(Consumer& c); + void consume(Consumer::shared_ptr c, bool exclusive = false); + void cancel(Consumer::shared_ptr c); uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 4d5c4e7537..3185080f94 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -72,7 +72,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) SemanticState::~SemanticState() { //cancel all consumers for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - cancel(*ptr_map_ptr(i)); + cancel(i->second); } if (dtxBuffer.get()) { @@ -91,16 +91,16 @@ void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); - std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire)); - queue->consume(*c, exclusive);//may throw exception + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire)); + queue->consume(c, exclusive);//may throw exception outputTasks.addOutputTask(c.get()); - consumers.insert(tagInOut, c.release()); + consumers[tagInOut] = c; } void SemanticState::cancel(const string& tag){ ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { - cancel(*ptr_map_ptr(i)); + cancel(i->second); consumers.erase(i); //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery @@ -260,7 +260,8 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, blocked(true), windowing(true), msgCredit(0), - byteCredit(0){} + byteCredit(0), + notifyEnabled(true) {} OwnershipToken* SemanticState::ConsumerImpl::getSession() { @@ -324,10 +325,11 @@ bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) SemanticState::ConsumerImpl::~ConsumerImpl() {} -void SemanticState::cancel(ConsumerImpl& c) +void SemanticState::cancel(ConsumerImpl::shared_ptr c) { - outputTasks.removeOutputTask(&c); - Queue::shared_ptr queue = c.getQueue(); + c->disableNotify(); + outputTasks.removeOutputTask(c.get()); + Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { @@ -358,10 +360,10 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { cacheExchange = session.getBroker().getExchanges().get(exchangeName); } - if (acl && acl->doTransferAcl()) - { - if (!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName, msg->getRoutingKey() )) - throw NotAllowedException("ACL denied exhange publish request"); + if (acl && acl->doTransferAcl()) + { + if (!acl->authorise(getSession().getConnection().getUserId(),acl::PUBLISH,acl::EXCHANGE,exchangeName, msg->getRoutingKey() )) + throw NotAllowedException("ACL denied exhange publish request"); } cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); @@ -382,7 +384,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { void SemanticState::requestDispatch() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - requestDispatch(*ptr_map_ptr(i)); + requestDispatch(*(i->second)); } } @@ -402,7 +404,7 @@ void SemanticState::complete(DeliveryRecord& delivery) delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - ptr_map_ptr(i)->complete(delivery); + i->second->complete(delivery); } } @@ -460,7 +462,7 @@ SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) if (i == consumers.end()) { throw NotFoundException(QPID_MSG("Unknown destination " << destination)); } else { - return *ptr_map_ptr(i); + return *(i->second); } } @@ -526,7 +528,7 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) void SemanticState::ConsumerImpl::flush() { - while(queue->dispatch(*this)) + while(queue->dispatch(shared_from_this())) ; stop(); } @@ -591,19 +593,34 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) } bool SemanticState::ConsumerImpl::hasOutput() { - return queue->checkForMessages(*this); + return queue->checkForMessages(shared_from_this()); } bool SemanticState::ConsumerImpl::doOutput() { - //TODO: think through properly - return queue->dispatch(*this); + return queue->dispatch(shared_from_this()); +} + +void SemanticState::ConsumerImpl::enableNotify() +{ + Mutex::ScopedLock l(lock); + notifyEnabled = true; +} + +void SemanticState::ConsumerImpl::disableNotify() +{ + Mutex::ScopedLock l(lock); + notifyEnabled = true; } void SemanticState::ConsumerImpl::notify() { - //TODO: think through properly - parent->outputTasks.activateOutput(); + //TODO: alter this, don't want to hold locks across external + //calls; for now its is required to protect the notify() from + //having part of the object chain of the invocation being + //concurrently deleted + Mutex::ScopedLock l(lock); + if (notifyEnabled) parent->outputTasks.activateOutput(); } @@ -644,4 +661,18 @@ void SemanticState::completed(DeliveryId first, DeliveryId last) requestDispatch(); } +void SemanticState::attached() +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + i->second->enableNotify(); + } +} + +void SemanticState::detached() +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { + i->second->disableNotify(); + } +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index e03d5ec89b..1d32d8aa50 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -37,6 +37,7 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/AggregateOutput.h" +#include "qpid/sys/Mutex.h" #include "qpid/shared_ptr.h" #include "AclModule.h" @@ -58,8 +59,10 @@ class SessionContext; class SemanticState : public sys::OutputTask, private boost::noncopyable { - class ConsumerImpl : public Consumer, public sys::OutputTask + class ConsumerImpl : public Consumer, public sys::OutputTask, + public boost::enable_shared_from_this<ConsumerImpl> { + qpid::sys::Mutex lock; SemanticState* const parent; const DeliveryToken::shared_ptr token; const string name; @@ -71,11 +74,14 @@ class SemanticState : public sys::OutputTask, bool windowing; uint32_t msgCredit; uint32_t byteCredit; + bool notifyEnabled; bool checkCredit(boost::intrusive_ptr<Message>& msg); void allocateCredit(boost::intrusive_ptr<Message>& msg); public: + typedef boost::shared_ptr<ConsumerImpl> shared_ptr; + ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, const string& name, Queue::shared_ptr queue, bool ack, bool nolocal, bool acquire); @@ -84,6 +90,9 @@ class SemanticState : public sys::OutputTask, bool deliver(QueuedMessage& msg); bool filter(boost::intrusive_ptr<Message> msg); bool accept(boost::intrusive_ptr<Message> msg); + + void disableNotify(); + void enableNotify(); void notify(); void setWindowMode(); @@ -100,7 +109,7 @@ class SemanticState : public sys::OutputTask, bool doOutput(); }; - typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap; + typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; SessionContext& session; @@ -130,7 +139,7 @@ class SemanticState : public sys::OutputTask, AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); void requestDispatch(ConsumerImpl&); - void cancel(ConsumerImpl&); + void cancel(ConsumerImpl::shared_ptr); public: SemanticState(DeliveryAdapter&, SessionContext&); @@ -187,6 +196,9 @@ class SemanticState : public sys::OutputTask, //final 0-10 spec (completed and accepted are distinct): void completed(DeliveryId deliveryTag, DeliveryId endTag); void accepted(DeliveryId deliveryTag, DeliveryId endTag); + + void attached(); + void detached(); }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index aa6f6b7520..42f6b78521 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -92,9 +92,8 @@ bool SessionState::isLocal(const ConnectionToken* t) const } void SessionState::detach() { - // activateOutput can be called in a different thread, lock to protect attached status - Mutex::ScopedLock l(lock); QPID_LOG(debug, getId() << ": detached on broker."); + semanticState.detached();//prevents further activateOutput calls until reattached getConnection().outputTasks.removeOutputTask(&semanticState); handler = 0; if (mgmtObject != 0) @@ -102,8 +101,6 @@ void SessionState::detach() { } void SessionState::attach(SessionHandler& h) { - // activateOutput can be called in a different thread, lock to protect attached status - Mutex::ScopedLock l(lock); QPID_LOG(debug, getId() << ": attached on broker."); handler = &h; if (mgmtObject != 0) @@ -115,8 +112,6 @@ void SessionState::attach(SessionHandler& h) { } void SessionState::activateOutput() { - // activateOutput can be called in a different thread, lock to protect attached status - Mutex::ScopedLock l(lock); if (isAttached()) getConnection().outputTasks.activateOutput(); } @@ -273,6 +268,7 @@ void SessionState::senderCompleted(const SequenceSet& commands) { void SessionState::readyToSend() { QPID_LOG(debug, getId() << ": ready to send, activating output."); assert(handler); + semanticState.attached(); sys::AggregateOutput& tasks = handler->getConnection().outputTasks; tasks.addOutputTask(&semanticState); tasks.activateOutput(); diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 96f2e8f512..c71fe068ce 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -25,7 +25,6 @@ #include "qpid/SessionState.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceSet.h" -#include "qpid/sys/Mutex.h" #include "qpid/sys/Time.h" #include "qpid/management/Manageable.h" #include "qpid/management/Session.h" @@ -117,7 +116,6 @@ class SessionState : public qpid::SessionState, Broker& broker; SessionHandler* handler; sys::AbsTime expiry; // Used by SessionManager. - sys::Mutex lock; bool ignoring; std::string name; SemanticState semanticState; diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 20b3d90eb6..8795dbcd03 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -78,7 +78,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { Queue::shared_ptr queue(new Queue("my_test_queue", true)); intrusive_ptr<Message> received; - TestConsumer c1; + TestConsumer::shared_ptr c1(new TestConsumer()); queue->consume(c1); @@ -88,7 +88,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { queue->process(msg1); sleep(2); - BOOST_CHECK(!c1.received); + BOOST_CHECK(!c1->received); msg1->enqueueComplete(); received = queue->get().payload; @@ -114,8 +114,8 @@ QPID_AUTO_TEST_CASE(testConsumers){ Queue::shared_ptr queue(new Queue("my_queue", true)); //Test adding consumers: - TestConsumer c1; - TestConsumer c2; + TestConsumer::shared_ptr c1(new TestConsumer()); + TestConsumer::shared_ptr c2(new TestConsumer()); queue->consume(c1); queue->consume(c2); @@ -128,16 +128,16 @@ QPID_AUTO_TEST_CASE(testConsumers){ queue->deliver(msg1); BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg1.get(), c1.last.get()); + BOOST_CHECK_EQUAL(msg1.get(), c1->last.get()); queue->deliver(msg2); BOOST_CHECK(queue->dispatch(c2)); - BOOST_CHECK_EQUAL(msg2.get(), c2.last.get()); + BOOST_CHECK_EQUAL(msg2.get(), c2->last.get()); - c1.received = false; + c1->received = false; queue->deliver(msg3); BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg3.get(), c1.last.get()); + BOOST_CHECK_EQUAL(msg3.get(), c1->last.get()); //Test cancellation: queue->cancel(c1); @@ -187,13 +187,13 @@ QPID_AUTO_TEST_CASE(testDequeue){ BOOST_CHECK_EQUAL(msg2.get(), received.get()); BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount()); - TestConsumer consumer; + TestConsumer::shared_ptr consumer(new TestConsumer()); queue->consume(consumer); queue->dispatch(consumer); - if (!consumer.received) + if (!consumer->received) sleep(2); - BOOST_CHECK_EQUAL(msg3.get(), consumer.last.get()); + BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get()); BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); received = queue->get().payload; |