diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-21 19:25:45 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-21 19:25:45 +0000 |
| commit | 876d0b94c37f252b08c81656386100fad18a8a46 (patch) | |
| tree | 4840b0d697d4629fd5c518507b58fceb7de1578a /cpp/lib/client/ClientChannel.cpp | |
| parent | c36fb4454be5ce4311aa5f5d0e5683db713c5545 (diff) | |
| download | qpid-python-876d0b94c37f252b08c81656386100fad18a8a46.tar.gz | |
Thread safety fixes for race conditions on incoming messages.
* cpp/lib/client/MessageListener.h: const correctness.
* cpp/tests/*: MessageListener const change.
* cpp/lib/broker/Content.h: Removed out-of-date FIXME comments.
* cpp/lib/client/ClientChannel.h/ .cpp():
- added locking for consumers map and other member access.
- refactored implementations of Basic get, deliver, return:
most logic now encapsulted in IncomingMessage class.
- fix channel close problems.
* cpp/lib/client/ClientMessage.h/.cpp:
- const correctness & API convenience fixes.
- getMethod/setMethod/getHeader: for new IncomingMessage
* cpp/lib/client/Connection.h/.cpp:
- Fixes to channel closure.
* cpp/lib/client/IncomingMessage.h/.cpp:
- Encapsulate *all* incoming message handling for client.
- Moved handling of BasicGetOk to IncomingMessage to fix race.
- Thread safety fixes.
* cpp/lib/client/ResponseHandler.h/.cpp:
- added getResponse for ClientChannel.
* cpp/lib/common/Exception.h:
- added missing throwSelf implementations.
- added ShutdownException as general purpose shut-down indicator.
- added EmptyException as general purpose "empty" indicator.
* cpp/lib/common/sys/Condition|Monitor|Mutex.h|.cpp:
- Condition variable abstraction extracted from Monitor for situations
where a single lock is associated with multiple conditions.
* cpp/tests/ClientChannelTest.cpp:
- Test incoming message transfer, get, consume etc.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510161 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/ClientChannel.cpp')
| -rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 302 |
1 files changed, 130 insertions, 172 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index 42e5cf3054..a8fa219c16 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include <iostream> #include <ClientChannel.h> #include <sys/Monitor.h> #include <ClientMessage.h> @@ -29,17 +30,14 @@ // handling of errors that should close the connection or the channel. // Make sure the user thread receives a connection in each case. // - -using namespace boost; //to use dynamic_pointer_cast +using namespace std; +using namespace boost; using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; -const std::string Channel::OK("OK"); - Channel::Channel(bool _transactional, u_int16_t _prefetch) : connection(0), - incoming(0), prefetch(_prefetch), transactional(_transactional) { } @@ -106,8 +104,8 @@ void Channel::protocolInit( ConnectionRedirectBody::shared_ptr redirect( shared_polymorphic_downcast<ConnectionRedirectBody>( responses.getResponse())); - std::cout << "Received redirection to " << redirect->getHost() - << std::endl; + cout << "Received redirection to " << redirect->getHost() + << endl; } else { THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); } @@ -183,11 +181,11 @@ void Channel::consume( Queue& queue, std::string& tag, MessageListener* listener, int ackMode, bool noLocal, bool synch, const FieldTable* fields) { - string q = queue.getName(); sendAndReceiveSync<BasicConsumeOkBody>( synch, new BasicConsumeBody( - version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch, + version, 0, queue.getName(), tag, noLocal, + ackMode == NO_ACK, false, !synch, fields ? *fields : FieldTable())); if (synch) { BasicConsumeOkBody::shared_ptr response = @@ -195,90 +193,78 @@ void Channel::consume( responses.getResponse()); tag = response->getConsumerTag(); } - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.lastDeliveryTag = 0; + // FIXME aconway 2007-02-20: Race condition! + // We could receive the first message for the consumer + // before we create the consumer below. + // Move consumer creation to handler for BasicConsumeOkBody + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + THROW_QPID_ERROR(CLIENT_ERROR, + "Consumer already exists with tag="+tag); + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.lastDeliveryTag = 0; + } } void Channel::cancel(const std::string& tag, bool synch) { - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) { - Consumer& c = i->second; - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - send(new BasicAckBody(version, c.lastDeliveryTag, true)); - sendAndReceiveSync<BasicCancelOkBody>( - synch, new BasicCancelBody(version, tag, !synch)); - consumers.erase(tag); + Consumer c; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i == consumers.end()) + return; + c = i->second; + consumers.erase(i); } + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + send(new BasicAckBody(version, c.lastDeliveryTag, true)); + sendAndReceiveSync<BasicCancelOkBody>( + synch, new BasicCancelBody(version, tag, !synch)); } void Channel::cancelAll(){ - while(!consumers.empty()) { - Consumer c = consumers.begin()->second; - consumers.erase(consumers.begin()); + ConsumerMap consumersCopy; + { + Mutex::ScopedLock l(lock); + consumersCopy = consumers; + consumers.clear(); + } + for (ConsumerMap::iterator i=consumersCopy.begin(); + i != consumersCopy.end(); ++i) + { + Consumer& c = i->second; if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) && c.lastDeliveryTag > 0) { - // Let exceptions propagate, if one fails no point - // trying the rest. NB no memory leaks if we do, - // ConsumerMap holds values, not pointers. - // send(new BasicAckBody(version, c.lastDeliveryTag, true)); } } } -void Channel::retrieve(Message& msg){ - Monitor::ScopedLock l(retrievalMonitor); - while(retrieved == 0){ - retrievalMonitor.wait(); - } - - msg.header = retrieved->getHeader(); - msg.deliveryTag = retrieved->getDeliveryTag(); - msg.data = retrieved->getData(); - delete retrieved; - retrieved = 0; -} - bool Channel::get(Message& msg, const Queue& queue, int ackMode) { - string name = queue.getName(); - responses.expect(); - send(new BasicGetBody(version, 0, name, ackMode)); - responses.waitForResponse(); - AMQMethodBody::shared_ptr response = responses.getResponse(); - if(response->isA<BasicGetOkBody>()) { - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - // FIXME aconway 2007-01-26: close the connection? the channel? - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response)); - } - retrieve(msg); - return true; - }if(response->isA<BasicGetEmptyBody>()){ - return false; - }else{ - // FIXME aconway 2007-01-26: must close the connection. - THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame"); - } + // Expect a message starting with a BasicGetOk + incoming.startGet(); + send(new BasicGetBody(version, 0, queue.getName(), ackMode)); + return incoming.waitGet(msg); } -void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ - // FIXME aconway 2007-01-30: Rework for message class. - - string e = exchange.getName(); +void Channel::publish( + const Message& msg, const Exchange& exchange, + const std::string& routingKey, bool mandatory, bool immediate) +{ + // FIXME aconway 2007-01-30: Rework for 0-9 message class. + const string e = exchange.getName(); string key = routingKey; send(new BasicPublishBody(version, 0, e, key, mandatory, immediate)); //break msg up into header frame and content frame(s) and send these - string data = msg.getData(); - msg.header->setContentSize(data.length()); send(msg.header); - + string data = msg.getData(); u_int64_t data_length = data.length(); if(data_length > 0){ u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes @@ -312,30 +298,30 @@ void Channel::handleMethodInContext( { //channel.flow, channel.close, basic.deliver, basic.return or a //response to a synchronous request - if(responses.isWaiting()){ + if(responses.isWaiting()) { responses.signalResponse(body); - }else if(body->isA<BasicDeliverBody>()) { - if(incoming != 0){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); - } - }else if(body->isA<BasicReturnBody>()){ - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); - } - }else if(body->isA<ChannelCloseBody>()){ + return; + } + + if(body->isA<BasicDeliverBody>() + || body->isA<BasicReturnBody>() + || body->isA<BasicGetOkBody>() + || body->isA<BasicGetEmptyBody>()) + + { + incoming.add(body); + return; + } + else if(body->isA<ChannelCloseBody>()) { peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body)); - }else if(body->isA<ChannelFlowBody>()){ - // TODO aconway 2007-01-24: - }else if(body->isA<ConnectionCloseBody>()){ + } + else if(body->isA<ChannelFlowBody>()){ + // TODO aconway 2007-01-24: not implemented yet. + } + else if(body->isA<ConnectionCloseBody>()){ connection->close(); - }else{ + } + else { connection->close( 504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); @@ -343,31 +329,13 @@ void Channel::handleMethodInContext( } void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got header before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver."); - }else{ - incoming->setHeader(body); - if(incoming->isComplete()){ - enqueue(); - } - } + incoming.add(body); } void Channel::handleContent(AMQContentBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got content before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver."); - }else{ - incoming->addContent(body); - if(incoming->isComplete()){ - enqueue(); - } - } + incoming.add(body); } - + void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); } @@ -376,35 +344,6 @@ void Channel::start(){ dispatcher = Thread(this); } -void Channel::run(){ - dispatch(); -} - -void Channel::enqueue(){ - Monitor::ScopedLock l(retrievalMonitor); - if(incoming->isResponse()){ - retrieved = incoming; - retrievalMonitor.notify(); - }else{ - messages.push(incoming); - dispatchMonitor.notify(); - } - incoming = 0; -} - -IncomingMessage* Channel::dequeue(){ - Monitor::ScopedLock l(dispatchMonitor); - while(messages.empty() && isOpen()){ - dispatchMonitor.wait(); - } - IncomingMessage* msg = 0; - if(!messages.empty()){ - msg = messages.front(); - messages.pop(); - } - return msg; -} - void Channel::deliver(Consumer& consumer, Message& msg){ //record delivery tag: consumer.lastDeliveryTag = msg.getDeliveryTag(); @@ -412,8 +351,6 @@ void Channel::deliver(Consumer& consumer, Message& msg){ //allow registered listener to handle the message consumer.listener->received(msg); - //if the handler calls close on the channel or connection while - //handling this message, then consumer will now have been deleted. if(isOpen()){ bool multiple(false); switch(consumer.ackMode){ @@ -432,35 +369,53 @@ void Channel::deliver(Consumer& consumer, Message& msg){ //a transaction until it commits. } -void Channel::dispatch(){ - while(isOpen()){ - IncomingMessage* incomingMsg = dequeue(); - if(incomingMsg){ - //Note: msg is currently only valid for duration of this call - Message msg(incomingMsg->getHeader()); - msg.data = incomingMsg->getData(); - if(incomingMsg->isReturn()){ - if(returnsHandler == 0){ - //print warning to log/console - std::cout << "Message returned: " << msg.getData() << std::endl; - }else{ - returnsHandler->returned(msg); +void Channel::run() { + while(isOpen()) { + try { + Message msg = incoming.waitDispatch(); + if(msg.getMethod()->isA<BasicReturnBody>()) { + ReturnedMessageHandler* handler=0; + { + Mutex::ScopedLock l(lock); + handler=returnsHandler; } - }else{ - msg.deliveryTag = incomingMsg->getDeliveryTag(); - std::string tag = incomingMsg->getConsumerTag(); - - if(consumers.find(tag) == consumers.end()) - std::cout << "Unknown consumer: " << tag << std::endl; - else - deliver(consumers[tag], msg); + if(handler == 0) { + // TODO aconway 2007-02-20: proper logging. + cout << "Message returned: " << msg.getData() << endl; + } + else + handler->returned(msg); + } + else { + BasicDeliverBody::shared_ptr deliverBody = + boost::shared_polymorphic_downcast<BasicDeliverBody>( + msg.getMethod()); + std::string tag = deliverBody->getConsumerTag(); + Consumer consumer; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if(i == consumers.end()) + THROW_QPID_ERROR(PROTOCOL_ERROR+504, + "Unknown consumer tag=" + tag); + consumer = i->second; + } + deliver(consumer, msg); } - delete incomingMsg; + } + catch (const ShutdownException&) { + /* Orderly shutdown */ + } + catch (const Exception& e) { + // FIXME aconway 2007-02-20: Report exception to user. + cout << "client::Channel::run() terminated by: " << e.toString() + << "(" << typeid(e).name() << ")" << endl; } } } void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ + Mutex::ScopedLock l(lock); returnsHandler = handler; } @@ -469,13 +424,17 @@ void Channel::close( u_int16_t code, const std::string& text, ClassId classId, MethodId methodId) { - if (getId() != 0 && isOpen()) { + if (isOpen()) { try { - sendAndReceive<ChannelCloseOkBody>( - new ChannelCloseBody(version, code, text, classId, methodId)); - cancelAll(); + if (getId() != 0) { + sendAndReceive<ChannelCloseOkBody>( + new ChannelCloseBody( + version, code, text, classId, methodId)); + } + static_cast<ConnectionForChannel*>(connection)->erase(getId()); closeInternal(); } catch (...) { + static_cast<ConnectionForChannel*>(connection)->erase(getId()); closeInternal(); throw; } @@ -491,14 +450,13 @@ void Channel::peerClose(ChannelCloseBody::shared_ptr) { } void Channel::closeInternal() { - assert(isOpen()); + if (isOpen()); { - Monitor::ScopedLock l(dispatchMonitor); - static_cast<ConnectionForChannel*>(connection)->erase(getId()); + cancelAll(); + incoming.shutdown(); connection = 0; // A 0 response means we are closed. responses.signalResponse(AMQMethodBody::shared_ptr()); - dispatchMonitor.notify(); } dispatcher.join(); } |
