From 876d0b94c37f252b08c81656386100fad18a8a46 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 21 Feb 2007 19:25:45 +0000 Subject: Thread safety fixes for race conditions on incoming messages. * cpp/lib/client/MessageListener.h: const correctness. * cpp/tests/*: MessageListener const change. * cpp/lib/broker/Content.h: Removed out-of-date FIXME comments. * cpp/lib/client/ClientChannel.h/ .cpp(): - added locking for consumers map and other member access. - refactored implementations of Basic get, deliver, return: most logic now encapsulted in IncomingMessage class. - fix channel close problems. * cpp/lib/client/ClientMessage.h/.cpp: - const correctness & API convenience fixes. - getMethod/setMethod/getHeader: for new IncomingMessage * cpp/lib/client/Connection.h/.cpp: - Fixes to channel closure. * cpp/lib/client/IncomingMessage.h/.cpp: - Encapsulate *all* incoming message handling for client. - Moved handling of BasicGetOk to IncomingMessage to fix race. - Thread safety fixes. * cpp/lib/client/ResponseHandler.h/.cpp: - added getResponse for ClientChannel. * cpp/lib/common/Exception.h: - added missing throwSelf implementations. - added ShutdownException as general purpose shut-down indicator. - added EmptyException as general purpose "empty" indicator. * cpp/lib/common/sys/Condition|Monitor|Mutex.h|.cpp: - Condition variable abstraction extracted from Monitor for situations where a single lock is associated with multiple conditions. * cpp/tests/ClientChannelTest.cpp: - Test incoming message transfer, get, consume etc. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510161 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/lib/client/ClientChannel.cpp | 302 +++++++++++++++++---------------------- 1 file changed, 130 insertions(+), 172 deletions(-) (limited to 'cpp/lib/client/ClientChannel.cpp') diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index 42e5cf3054..a8fa219c16 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include #include #include #include @@ -29,17 +30,14 @@ // handling of errors that should close the connection or the channel. // Make sure the user thread receives a connection in each case. // - -using namespace boost; //to use dynamic_pointer_cast +using namespace std; +using namespace boost; using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; -const std::string Channel::OK("OK"); - Channel::Channel(bool _transactional, u_int16_t _prefetch) : connection(0), - incoming(0), prefetch(_prefetch), transactional(_transactional) { } @@ -106,8 +104,8 @@ void Channel::protocolInit( ConnectionRedirectBody::shared_ptr redirect( shared_polymorphic_downcast( responses.getResponse())); - std::cout << "Received redirection to " << redirect->getHost() - << std::endl; + cout << "Received redirection to " << redirect->getHost() + << endl; } else { THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); } @@ -183,11 +181,11 @@ void Channel::consume( Queue& queue, std::string& tag, MessageListener* listener, int ackMode, bool noLocal, bool synch, const FieldTable* fields) { - string q = queue.getName(); sendAndReceiveSync( synch, new BasicConsumeBody( - version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch, + version, 0, queue.getName(), tag, noLocal, + ackMode == NO_ACK, false, !synch, fields ? *fields : FieldTable())); if (synch) { BasicConsumeOkBody::shared_ptr response = @@ -195,90 +193,78 @@ void Channel::consume( responses.getResponse()); tag = response->getConsumerTag(); } - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.lastDeliveryTag = 0; + // FIXME aconway 2007-02-20: Race condition! + // We could receive the first message for the consumer + // before we create the consumer below. + // Move consumer creation to handler for BasicConsumeOkBody + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + THROW_QPID_ERROR(CLIENT_ERROR, + "Consumer already exists with tag="+tag); + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.lastDeliveryTag = 0; + } } void Channel::cancel(const std::string& tag, bool synch) { - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) { - Consumer& c = i->second; - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - send(new BasicAckBody(version, c.lastDeliveryTag, true)); - sendAndReceiveSync( - synch, new BasicCancelBody(version, tag, !synch)); - consumers.erase(tag); + Consumer c; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i == consumers.end()) + return; + c = i->second; + consumers.erase(i); } + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + send(new BasicAckBody(version, c.lastDeliveryTag, true)); + sendAndReceiveSync( + synch, new BasicCancelBody(version, tag, !synch)); } void Channel::cancelAll(){ - while(!consumers.empty()) { - Consumer c = consumers.begin()->second; - consumers.erase(consumers.begin()); + ConsumerMap consumersCopy; + { + Mutex::ScopedLock l(lock); + consumersCopy = consumers; + consumers.clear(); + } + for (ConsumerMap::iterator i=consumersCopy.begin(); + i != consumersCopy.end(); ++i) + { + Consumer& c = i->second; if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) && c.lastDeliveryTag > 0) { - // Let exceptions propagate, if one fails no point - // trying the rest. NB no memory leaks if we do, - // ConsumerMap holds values, not pointers. - // send(new BasicAckBody(version, c.lastDeliveryTag, true)); } } } -void Channel::retrieve(Message& msg){ - Monitor::ScopedLock l(retrievalMonitor); - while(retrieved == 0){ - retrievalMonitor.wait(); - } - - msg.header = retrieved->getHeader(); - msg.deliveryTag = retrieved->getDeliveryTag(); - msg.data = retrieved->getData(); - delete retrieved; - retrieved = 0; -} - bool Channel::get(Message& msg, const Queue& queue, int ackMode) { - string name = queue.getName(); - responses.expect(); - send(new BasicGetBody(version, 0, name, ackMode)); - responses.waitForResponse(); - AMQMethodBody::shared_ptr response = responses.getResponse(); - if(response->isA()) { - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - // FIXME aconway 2007-01-26: close the connection? the channel? - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast(response)); - } - retrieve(msg); - return true; - }if(response->isA()){ - return false; - }else{ - // FIXME aconway 2007-01-26: must close the connection. - THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame"); - } + // Expect a message starting with a BasicGetOk + incoming.startGet(); + send(new BasicGetBody(version, 0, queue.getName(), ackMode)); + return incoming.waitGet(msg); } -void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ - // FIXME aconway 2007-01-30: Rework for message class. - - string e = exchange.getName(); +void Channel::publish( + const Message& msg, const Exchange& exchange, + const std::string& routingKey, bool mandatory, bool immediate) +{ + // FIXME aconway 2007-01-30: Rework for 0-9 message class. + const string e = exchange.getName(); string key = routingKey; send(new BasicPublishBody(version, 0, e, key, mandatory, immediate)); //break msg up into header frame and content frame(s) and send these - string data = msg.getData(); - msg.header->setContentSize(data.length()); send(msg.header); - + string data = msg.getData(); u_int64_t data_length = data.length(); if(data_length > 0){ u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes @@ -312,30 +298,30 @@ void Channel::handleMethodInContext( { //channel.flow, channel.close, basic.deliver, basic.return or a //response to a synchronous request - if(responses.isWaiting()){ + if(responses.isWaiting()) { responses.signalResponse(body); - }else if(body->isA()) { - if(incoming != 0){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast(body)); - } - }else if(body->isA()){ - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast(body)); - } - }else if(body->isA()){ + return; + } + + if(body->isA() + || body->isA() + || body->isA() + || body->isA()) + + { + incoming.add(body); + return; + } + else if(body->isA()) { peerClose(shared_polymorphic_downcast(body)); - }else if(body->isA()){ - // TODO aconway 2007-01-24: - }else if(body->isA()){ + } + else if(body->isA()){ + // TODO aconway 2007-01-24: not implemented yet. + } + else if(body->isA()){ connection->close(); - }else{ + } + else { connection->close( 504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); @@ -343,31 +329,13 @@ void Channel::handleMethodInContext( } void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got header before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver."); - }else{ - incoming->setHeader(body); - if(incoming->isComplete()){ - enqueue(); - } - } + incoming.add(body); } void Channel::handleContent(AMQContentBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got content before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver."); - }else{ - incoming->addContent(body); - if(incoming->isComplete()){ - enqueue(); - } - } + incoming.add(body); } - + void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); } @@ -376,35 +344,6 @@ void Channel::start(){ dispatcher = Thread(this); } -void Channel::run(){ - dispatch(); -} - -void Channel::enqueue(){ - Monitor::ScopedLock l(retrievalMonitor); - if(incoming->isResponse()){ - retrieved = incoming; - retrievalMonitor.notify(); - }else{ - messages.push(incoming); - dispatchMonitor.notify(); - } - incoming = 0; -} - -IncomingMessage* Channel::dequeue(){ - Monitor::ScopedLock l(dispatchMonitor); - while(messages.empty() && isOpen()){ - dispatchMonitor.wait(); - } - IncomingMessage* msg = 0; - if(!messages.empty()){ - msg = messages.front(); - messages.pop(); - } - return msg; -} - void Channel::deliver(Consumer& consumer, Message& msg){ //record delivery tag: consumer.lastDeliveryTag = msg.getDeliveryTag(); @@ -412,8 +351,6 @@ void Channel::deliver(Consumer& consumer, Message& msg){ //allow registered listener to handle the message consumer.listener->received(msg); - //if the handler calls close on the channel or connection while - //handling this message, then consumer will now have been deleted. if(isOpen()){ bool multiple(false); switch(consumer.ackMode){ @@ -432,35 +369,53 @@ void Channel::deliver(Consumer& consumer, Message& msg){ //a transaction until it commits. } -void Channel::dispatch(){ - while(isOpen()){ - IncomingMessage* incomingMsg = dequeue(); - if(incomingMsg){ - //Note: msg is currently only valid for duration of this call - Message msg(incomingMsg->getHeader()); - msg.data = incomingMsg->getData(); - if(incomingMsg->isReturn()){ - if(returnsHandler == 0){ - //print warning to log/console - std::cout << "Message returned: " << msg.getData() << std::endl; - }else{ - returnsHandler->returned(msg); +void Channel::run() { + while(isOpen()) { + try { + Message msg = incoming.waitDispatch(); + if(msg.getMethod()->isA()) { + ReturnedMessageHandler* handler=0; + { + Mutex::ScopedLock l(lock); + handler=returnsHandler; } - }else{ - msg.deliveryTag = incomingMsg->getDeliveryTag(); - std::string tag = incomingMsg->getConsumerTag(); - - if(consumers.find(tag) == consumers.end()) - std::cout << "Unknown consumer: " << tag << std::endl; - else - deliver(consumers[tag], msg); + if(handler == 0) { + // TODO aconway 2007-02-20: proper logging. + cout << "Message returned: " << msg.getData() << endl; + } + else + handler->returned(msg); + } + else { + BasicDeliverBody::shared_ptr deliverBody = + boost::shared_polymorphic_downcast( + msg.getMethod()); + std::string tag = deliverBody->getConsumerTag(); + Consumer consumer; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if(i == consumers.end()) + THROW_QPID_ERROR(PROTOCOL_ERROR+504, + "Unknown consumer tag=" + tag); + consumer = i->second; + } + deliver(consumer, msg); } - delete incomingMsg; + } + catch (const ShutdownException&) { + /* Orderly shutdown */ + } + catch (const Exception& e) { + // FIXME aconway 2007-02-20: Report exception to user. + cout << "client::Channel::run() terminated by: " << e.toString() + << "(" << typeid(e).name() << ")" << endl; } } } void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ + Mutex::ScopedLock l(lock); returnsHandler = handler; } @@ -469,13 +424,17 @@ void Channel::close( u_int16_t code, const std::string& text, ClassId classId, MethodId methodId) { - if (getId() != 0 && isOpen()) { + if (isOpen()) { try { - sendAndReceive( - new ChannelCloseBody(version, code, text, classId, methodId)); - cancelAll(); + if (getId() != 0) { + sendAndReceive( + new ChannelCloseBody( + version, code, text, classId, methodId)); + } + static_cast(connection)->erase(getId()); closeInternal(); } catch (...) { + static_cast(connection)->erase(getId()); closeInternal(); throw; } @@ -491,14 +450,13 @@ void Channel::peerClose(ChannelCloseBody::shared_ptr) { } void Channel::closeInternal() { - assert(isOpen()); + if (isOpen()); { - Monitor::ScopedLock l(dispatchMonitor); - static_cast(connection)->erase(getId()); + cancelAll(); + incoming.shutdown(); connection = 0; // A 0 response means we are closed. responses.signalResponse(AMQMethodBody::shared_ptr()); - dispatchMonitor.notify(); } dispatcher.join(); } -- cgit v1.2.1