From 07c8c499649c725a226eeda3e0bfe58fa8ba984c Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 5 Jul 2007 09:47:07 +0000 Subject: Fix for QPID-534. Get now detects closure correctly. Also fixed broker to allow channel.close-ok (and fixed client to send it). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@553441 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/BasicMessageChannel.cpp | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) (limited to 'cpp/src/qpid/client/BasicMessageChannel.cpp') diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp index 91849c735e..60368268c0 100644 --- a/cpp/src/qpid/client/BasicMessageChannel.cpp +++ b/cpp/src/qpid/client/BasicMessageChannel.cpp @@ -100,34 +100,32 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) { c = i->second; consumers.erase(i); } - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) { channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); + } channel.sendAndReceiveSync( synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch))); } void BasicMessageChannel::close(){ - ConsumerMap consumersCopy; - { - Mutex::ScopedLock l(lock); - consumersCopy = consumers; - consumers.clear(); - } destGet.shutdown(); destDispatch.shutdown(); - for (ConsumerMap::iterator i=consumersCopy.begin(); - i != consumersCopy.end(); ++i) +} + +void BasicMessageChannel::cancelAll(){ + Mutex::ScopedLock l(lock); + for (ConsumerMap::iterator i = consumers.begin(); i != consumers.end(); i++) { Consumer& c = i->second; - if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) - && c.lastDeliveryTag > 0) + if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) { channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); } + channel.send(new BasicCancelBody(channel.version, i->first, true)); } + consumers.clear(); } - bool BasicMessageChannel::get( Message& msg, const Queue& queue, AckMode ackMode) { @@ -324,6 +322,7 @@ void BasicMessageChannel::run() { // Orderly shutdown. } catch (const Exception& e) { + std::cout << "Error caught by dispatch thread: " << e.what() << std::endl; // FIXME aconway 2007-02-20: Report exception to user. QPID_LOG(error, e.what()); } -- cgit v1.2.1