diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-21 19:25:45 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-21 19:25:45 +0000 |
| commit | 876d0b94c37f252b08c81656386100fad18a8a46 (patch) | |
| tree | 4840b0d697d4629fd5c518507b58fceb7de1578a /cpp/lib/client | |
| parent | c36fb4454be5ce4311aa5f5d0e5683db713c5545 (diff) | |
| download | qpid-python-876d0b94c37f252b08c81656386100fad18a8a46.tar.gz | |
Thread safety fixes for race conditions on incoming messages.
* cpp/lib/client/MessageListener.h: const correctness.
* cpp/tests/*: MessageListener const change.
* cpp/lib/broker/Content.h: Removed out-of-date FIXME comments.
* cpp/lib/client/ClientChannel.h/ .cpp():
- added locking for consumers map and other member access.
- refactored implementations of Basic get, deliver, return:
most logic now encapsulted in IncomingMessage class.
- fix channel close problems.
* cpp/lib/client/ClientMessage.h/.cpp:
- const correctness & API convenience fixes.
- getMethod/setMethod/getHeader: for new IncomingMessage
* cpp/lib/client/Connection.h/.cpp:
- Fixes to channel closure.
* cpp/lib/client/IncomingMessage.h/.cpp:
- Encapsulate *all* incoming message handling for client.
- Moved handling of BasicGetOk to IncomingMessage to fix race.
- Thread safety fixes.
* cpp/lib/client/ResponseHandler.h/.cpp:
- added getResponse for ClientChannel.
* cpp/lib/common/Exception.h:
- added missing throwSelf implementations.
- added ShutdownException as general purpose shut-down indicator.
- added EmptyException as general purpose "empty" indicator.
* cpp/lib/common/sys/Condition|Monitor|Mutex.h|.cpp:
- Condition variable abstraction extracted from Monitor for situations
where a single lock is associated with multiple conditions.
* cpp/tests/ClientChannelTest.cpp:
- Test incoming message transfer, get, consume etc.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510161 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client')
| -rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 302 | ||||
| -rw-r--r-- | cpp/lib/client/ClientChannel.h | 21 | ||||
| -rw-r--r-- | cpp/lib/client/ClientMessage.cpp | 37 | ||||
| -rw-r--r-- | cpp/lib/client/ClientMessage.h | 152 | ||||
| -rw-r--r-- | cpp/lib/client/Connection.cpp | 15 | ||||
| -rw-r--r-- | cpp/lib/client/IncomingMessage.cpp | 152 | ||||
| -rw-r--r-- | cpp/lib/client/IncomingMessage.h | 117 | ||||
| -rw-r--r-- | cpp/lib/client/ResponseHandler.cpp | 14 | ||||
| -rw-r--r-- | cpp/lib/client/ResponseHandler.h | 2 |
9 files changed, 468 insertions, 344 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index 42e5cf3054..a8fa219c16 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include <iostream> #include <ClientChannel.h> #include <sys/Monitor.h> #include <ClientMessage.h> @@ -29,17 +30,14 @@ // handling of errors that should close the connection or the channel. // Make sure the user thread receives a connection in each case. // - -using namespace boost; //to use dynamic_pointer_cast +using namespace std; +using namespace boost; using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; -const std::string Channel::OK("OK"); - Channel::Channel(bool _transactional, u_int16_t _prefetch) : connection(0), - incoming(0), prefetch(_prefetch), transactional(_transactional) { } @@ -106,8 +104,8 @@ void Channel::protocolInit( ConnectionRedirectBody::shared_ptr redirect( shared_polymorphic_downcast<ConnectionRedirectBody>( responses.getResponse())); - std::cout << "Received redirection to " << redirect->getHost() - << std::endl; + cout << "Received redirection to " << redirect->getHost() + << endl; } else { THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); } @@ -183,11 +181,11 @@ void Channel::consume( Queue& queue, std::string& tag, MessageListener* listener, int ackMode, bool noLocal, bool synch, const FieldTable* fields) { - string q = queue.getName(); sendAndReceiveSync<BasicConsumeOkBody>( synch, new BasicConsumeBody( - version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch, + version, 0, queue.getName(), tag, noLocal, + ackMode == NO_ACK, false, !synch, fields ? *fields : FieldTable())); if (synch) { BasicConsumeOkBody::shared_ptr response = @@ -195,90 +193,78 @@ void Channel::consume( responses.getResponse()); tag = response->getConsumerTag(); } - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.lastDeliveryTag = 0; + // FIXME aconway 2007-02-20: Race condition! + // We could receive the first message for the consumer + // before we create the consumer below. + // Move consumer creation to handler for BasicConsumeOkBody + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + THROW_QPID_ERROR(CLIENT_ERROR, + "Consumer already exists with tag="+tag); + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.lastDeliveryTag = 0; + } } void Channel::cancel(const std::string& tag, bool synch) { - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) { - Consumer& c = i->second; - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - send(new BasicAckBody(version, c.lastDeliveryTag, true)); - sendAndReceiveSync<BasicCancelOkBody>( - synch, new BasicCancelBody(version, tag, !synch)); - consumers.erase(tag); + Consumer c; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i == consumers.end()) + return; + c = i->second; + consumers.erase(i); } + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + send(new BasicAckBody(version, c.lastDeliveryTag, true)); + sendAndReceiveSync<BasicCancelOkBody>( + synch, new BasicCancelBody(version, tag, !synch)); } void Channel::cancelAll(){ - while(!consumers.empty()) { - Consumer c = consumers.begin()->second; - consumers.erase(consumers.begin()); + ConsumerMap consumersCopy; + { + Mutex::ScopedLock l(lock); + consumersCopy = consumers; + consumers.clear(); + } + for (ConsumerMap::iterator i=consumersCopy.begin(); + i != consumersCopy.end(); ++i) + { + Consumer& c = i->second; if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) && c.lastDeliveryTag > 0) { - // Let exceptions propagate, if one fails no point - // trying the rest. NB no memory leaks if we do, - // ConsumerMap holds values, not pointers. - // send(new BasicAckBody(version, c.lastDeliveryTag, true)); } } } -void Channel::retrieve(Message& msg){ - Monitor::ScopedLock l(retrievalMonitor); - while(retrieved == 0){ - retrievalMonitor.wait(); - } - - msg.header = retrieved->getHeader(); - msg.deliveryTag = retrieved->getDeliveryTag(); - msg.data = retrieved->getData(); - delete retrieved; - retrieved = 0; -} - bool Channel::get(Message& msg, const Queue& queue, int ackMode) { - string name = queue.getName(); - responses.expect(); - send(new BasicGetBody(version, 0, name, ackMode)); - responses.waitForResponse(); - AMQMethodBody::shared_ptr response = responses.getResponse(); - if(response->isA<BasicGetOkBody>()) { - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - // FIXME aconway 2007-01-26: close the connection? the channel? - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response)); - } - retrieve(msg); - return true; - }if(response->isA<BasicGetEmptyBody>()){ - return false; - }else{ - // FIXME aconway 2007-01-26: must close the connection. - THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame"); - } + // Expect a message starting with a BasicGetOk + incoming.startGet(); + send(new BasicGetBody(version, 0, queue.getName(), ackMode)); + return incoming.waitGet(msg); } -void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){ - // FIXME aconway 2007-01-30: Rework for message class. - - string e = exchange.getName(); +void Channel::publish( + const Message& msg, const Exchange& exchange, + const std::string& routingKey, bool mandatory, bool immediate) +{ + // FIXME aconway 2007-01-30: Rework for 0-9 message class. + const string e = exchange.getName(); string key = routingKey; send(new BasicPublishBody(version, 0, e, key, mandatory, immediate)); //break msg up into header frame and content frame(s) and send these - string data = msg.getData(); - msg.header->setContentSize(data.length()); send(msg.header); - + string data = msg.getData(); u_int64_t data_length = data.length(); if(data_length > 0){ u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes @@ -312,30 +298,30 @@ void Channel::handleMethodInContext( { //channel.flow, channel.close, basic.deliver, basic.return or a //response to a synchronous request - if(responses.isWaiting()){ + if(responses.isWaiting()) { responses.signalResponse(body); - }else if(body->isA<BasicDeliverBody>()) { - if(incoming != 0){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body)); - } - }else if(body->isA<BasicReturnBody>()){ - if(incoming != 0){ - std::cout << "Existing message not complete" << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete"); - }else{ - incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body)); - } - }else if(body->isA<ChannelCloseBody>()){ + return; + } + + if(body->isA<BasicDeliverBody>() + || body->isA<BasicReturnBody>() + || body->isA<BasicGetOkBody>() + || body->isA<BasicGetEmptyBody>()) + + { + incoming.add(body); + return; + } + else if(body->isA<ChannelCloseBody>()) { peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body)); - }else if(body->isA<ChannelFlowBody>()){ - // TODO aconway 2007-01-24: - }else if(body->isA<ConnectionCloseBody>()){ + } + else if(body->isA<ChannelFlowBody>()){ + // TODO aconway 2007-01-24: not implemented yet. + } + else if(body->isA<ConnectionCloseBody>()){ connection->close(); - }else{ + } + else { connection->close( 504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); @@ -343,31 +329,13 @@ void Channel::handleMethodInContext( } void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got header before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver."); - }else{ - incoming->setHeader(body); - if(incoming->isComplete()){ - enqueue(); - } - } + incoming.add(body); } void Channel::handleContent(AMQContentBody::shared_ptr body){ - if(incoming == 0){ - //handle invalid frame sequence - std::cout << "Invalid message sequence: got content before return or deliver." << std::endl; - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver."); - }else{ - incoming->addContent(body); - if(incoming->isComplete()){ - enqueue(); - } - } + incoming.add(body); } - + void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); } @@ -376,35 +344,6 @@ void Channel::start(){ dispatcher = Thread(this); } -void Channel::run(){ - dispatch(); -} - -void Channel::enqueue(){ - Monitor::ScopedLock l(retrievalMonitor); - if(incoming->isResponse()){ - retrieved = incoming; - retrievalMonitor.notify(); - }else{ - messages.push(incoming); - dispatchMonitor.notify(); - } - incoming = 0; -} - -IncomingMessage* Channel::dequeue(){ - Monitor::ScopedLock l(dispatchMonitor); - while(messages.empty() && isOpen()){ - dispatchMonitor.wait(); - } - IncomingMessage* msg = 0; - if(!messages.empty()){ - msg = messages.front(); - messages.pop(); - } - return msg; -} - void Channel::deliver(Consumer& consumer, Message& msg){ //record delivery tag: consumer.lastDeliveryTag = msg.getDeliveryTag(); @@ -412,8 +351,6 @@ void Channel::deliver(Consumer& consumer, Message& msg){ //allow registered listener to handle the message consumer.listener->received(msg); - //if the handler calls close on the channel or connection while - //handling this message, then consumer will now have been deleted. if(isOpen()){ bool multiple(false); switch(consumer.ackMode){ @@ -432,35 +369,53 @@ void Channel::deliver(Consumer& consumer, Message& msg){ //a transaction until it commits. } -void Channel::dispatch(){ - while(isOpen()){ - IncomingMessage* incomingMsg = dequeue(); - if(incomingMsg){ - //Note: msg is currently only valid for duration of this call - Message msg(incomingMsg->getHeader()); - msg.data = incomingMsg->getData(); - if(incomingMsg->isReturn()){ - if(returnsHandler == 0){ - //print warning to log/console - std::cout << "Message returned: " << msg.getData() << std::endl; - }else{ - returnsHandler->returned(msg); +void Channel::run() { + while(isOpen()) { + try { + Message msg = incoming.waitDispatch(); + if(msg.getMethod()->isA<BasicReturnBody>()) { + ReturnedMessageHandler* handler=0; + { + Mutex::ScopedLock l(lock); + handler=returnsHandler; } - }else{ - msg.deliveryTag = incomingMsg->getDeliveryTag(); - std::string tag = incomingMsg->getConsumerTag(); - - if(consumers.find(tag) == consumers.end()) - std::cout << "Unknown consumer: " << tag << std::endl; - else - deliver(consumers[tag], msg); + if(handler == 0) { + // TODO aconway 2007-02-20: proper logging. + cout << "Message returned: " << msg.getData() << endl; + } + else + handler->returned(msg); + } + else { + BasicDeliverBody::shared_ptr deliverBody = + boost::shared_polymorphic_downcast<BasicDeliverBody>( + msg.getMethod()); + std::string tag = deliverBody->getConsumerTag(); + Consumer consumer; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if(i == consumers.end()) + THROW_QPID_ERROR(PROTOCOL_ERROR+504, + "Unknown consumer tag=" + tag); + consumer = i->second; + } + deliver(consumer, msg); } - delete incomingMsg; + } + catch (const ShutdownException&) { + /* Orderly shutdown */ + } + catch (const Exception& e) { + // FIXME aconway 2007-02-20: Report exception to user. + cout << "client::Channel::run() terminated by: " << e.toString() + << "(" << typeid(e).name() << ")" << endl; } } } void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ + Mutex::ScopedLock l(lock); returnsHandler = handler; } @@ -469,13 +424,17 @@ void Channel::close( u_int16_t code, const std::string& text, ClassId classId, MethodId methodId) { - if (getId() != 0 && isOpen()) { + if (isOpen()) { try { - sendAndReceive<ChannelCloseOkBody>( - new ChannelCloseBody(version, code, text, classId, methodId)); - cancelAll(); + if (getId() != 0) { + sendAndReceive<ChannelCloseOkBody>( + new ChannelCloseBody( + version, code, text, classId, methodId)); + } + static_cast<ConnectionForChannel*>(connection)->erase(getId()); closeInternal(); } catch (...) { + static_cast<ConnectionForChannel*>(connection)->erase(getId()); closeInternal(); throw; } @@ -491,14 +450,13 @@ void Channel::peerClose(ChannelCloseBody::shared_ptr) { } void Channel::closeInternal() { - assert(isOpen()); + if (isOpen()); { - Monitor::ScopedLock l(dispatchMonitor); - static_cast<ConnectionForChannel*>(connection)->erase(getId()); + cancelAll(); + incoming.shutdown(); connection = 0; // A 0 response means we are closed. responses.signalResponse(AMQMethodBody::shared_ptr()); - dispatchMonitor.notify(); } dispatcher.join(); } diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index ed67fd8f6b..9c422305b0 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -89,19 +89,12 @@ class Channel : public framing::ChannelAdapter, u_int64_t lastDeliveryTag; }; typedef std::map<std::string, Consumer> ConsumerMap; - typedef std::queue<boost::shared_ptr<framing::AMQMethodBody> > IncomingMethods; - static const std::string OK; - + sys::Mutex lock; Connection* connection; sys::Thread dispatcher; - IncomingMethods incomingMethods; - IncomingMessage* incoming; + IncomingMessage incoming; ResponseHandler responses; - std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume - IncomingMessage* retrieved;//holds response to basic.get - sys::Monitor dispatchMonitor; - sys::Monitor retrievalMonitor; ConsumerMap consumers; ReturnedMessageHandler* returnsHandler; @@ -109,10 +102,7 @@ class Channel : public framing::ChannelAdapter, const bool transactional; framing::ProtocolVersion version; - void enqueue(); void retrieve(Message& msg); - IncomingMessage* dequeue(); - void dispatch(); void deliver(Consumer& consumer, Message& msg); void handleHeader(framing::AMQHeaderBody::shared_ptr body); @@ -307,7 +297,8 @@ class Channel : public framing::ChannelAdapter, * receive this message on publication, the message will be * returned (see setReturnedMessageHandler()). */ - void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, + void publish(const Message& msg, const Exchange& exchange, + const std::string& routingKey, bool mandatory = false, bool immediate = false); /** @@ -352,8 +343,8 @@ class Channel : public framing::ChannelAdapter, * Closing a channel that is not open has no effect. */ void close( - framing::ReplyCode = 200, const std::string& =OK, - framing::ClassId = 0, framing::MethodId = 0); + framing::ReplyCode = 200, const std::string& ="OK", + framing::ClassId = 0, framing::MethodId = 0); /** * Set a handler for this channel that will process any diff --git a/cpp/lib/client/ClientMessage.cpp b/cpp/lib/client/ClientMessage.cpp index 8b08f7e535..bd4adb78f7 100644 --- a/cpp/lib/client/ClientMessage.cpp +++ b/cpp/lib/client/ClientMessage.cpp @@ -19,7 +19,6 @@ * */ #include <ClientMessage.h> - using namespace qpid::client; using namespace qpid::framing; @@ -40,63 +39,63 @@ Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ Message::~Message(){ } -BasicHeaderProperties* Message::getHeaderProperties(){ +BasicHeaderProperties* Message::getHeaderProperties() const { return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); } -const std::string& Message::getContentType(){ +const std::string& Message::getContentType() const { return getHeaderProperties()->getContentType(); } -const std::string& Message::getContentEncoding(){ +const std::string& Message::getContentEncoding() const { return getHeaderProperties()->getContentEncoding(); } -FieldTable& Message::getHeaders(){ +FieldTable& Message::getHeaders() const { return getHeaderProperties()->getHeaders(); } -u_int8_t Message::getDeliveryMode(){ +u_int8_t Message::getDeliveryMode() const { return getHeaderProperties()->getDeliveryMode(); } -u_int8_t Message::getPriority(){ +u_int8_t Message::getPriority() const { return getHeaderProperties()->getPriority(); } -const std::string& Message::getCorrelationId(){ +const std::string& Message::getCorrelationId() const { return getHeaderProperties()->getCorrelationId(); } -const std::string& Message::getReplyTo(){ +const std::string& Message::getReplyTo() const { return getHeaderProperties()->getReplyTo(); } -const std::string& Message::getExpiration(){ +const std::string& Message::getExpiration() const { return getHeaderProperties()->getExpiration(); } -const std::string& Message::getMessageId(){ +const std::string& Message::getMessageId() const { return getHeaderProperties()->getMessageId(); } -u_int64_t Message::getTimestamp(){ +u_int64_t Message::getTimestamp() const { return getHeaderProperties()->getTimestamp(); } -const std::string& Message::getType(){ +const std::string& Message::getType() const { return getHeaderProperties()->getType(); } -const std::string& Message::getUserId(){ +const std::string& Message::getUserId() const { return getHeaderProperties()->getUserId(); } -const std::string& Message::getAppId(){ +const std::string& Message::getAppId() const { return getHeaderProperties()->getAppId(); } -const std::string& Message::getClusterId(){ +const std::string& Message::getClusterId() const { return getHeaderProperties()->getClusterId(); } @@ -155,3 +154,9 @@ void Message::setAppId(const std::string& appId){ void Message::setClusterId(const std::string& clusterId){ getHeaderProperties()->setClusterId(clusterId); } + + +u_int64_t Message::getDeliveryTag() const { + BasicDeliverBody* deliver=dynamic_cast<BasicDeliverBody*>(method.get()); + return deliver ? deliver->getDeliveryTag() : 0; +} diff --git a/cpp/lib/client/ClientMessage.h b/cpp/lib/client/ClientMessage.h index 148f9240c8..8661f6b791 100644 --- a/cpp/lib/client/ClientMessage.h +++ b/cpp/lib/client/ClientMessage.h @@ -25,89 +25,99 @@ #include <framing/amqp_framing.h> namespace qpid { + namespace client { +class IncomingMessage; - /** - * A representation of messages for sent or recived through the - * client api. - * - * \ingroup clientapi - */ - class Message{ - qpid::framing::AMQHeaderBody::shared_ptr header; - std::string data; - bool redelivered; - u_int64_t deliveryTag; +/** + * A representation of messages for sent or recived through the + * client api. + * + * \ingroup clientapi + */ +class Message { + framing::AMQMethodBody::shared_ptr method; + framing::AMQHeaderBody::shared_ptr header; + std::string data; + bool redelivered; - qpid::framing::BasicHeaderProperties* getHeaderProperties(); - Message(qpid::framing::AMQHeaderBody::shared_ptr& header); + // FIXME aconway 2007-02-20: const incorrect, needs const return type. + framing::BasicHeaderProperties* getHeaderProperties() const; + Message(qpid::framing::AMQHeaderBody::shared_ptr& header); - public: - Message(const std::string& data=std::string()); - ~Message(); + public: + Message(const std::string& data=std::string()); + ~Message(); - /** - * Allows the application to access the content of messages - * received. - * - * @return a string representing the data of the message - */ - std::string getData() const { return data; } + /** + * Allows the application to access the content of messages + * received. + * + * @return a string representing the data of the message + */ + std::string getData() const { return data; } - /** - * Allows the application to set the content of messages to be - * sent. - * - * @param data a string representing the data of the message - */ - void setData(const std::string& _data); + /** + * Allows the application to set the content of messages to be + * sent. + * + * @param data a string representing the data of the message + */ + void setData(const std::string& _data); - /** - * @return true if this message was delivered previously (to - * any consumer) but was not acknowledged. - */ - inline bool isRedelivered(){ return redelivered; } - inline void setRedelivered(bool _redelivered){ redelivered = _redelivered; } + /** + * @return true if this message was delivered previously (to + * any consumer) but was not acknowledged. + */ + bool isRedelivered(){ return redelivered; } + void setRedelivered(bool _redelivered){ redelivered = _redelivered; } - inline u_int64_t getDeliveryTag(){ return deliveryTag; } + u_int64_t getDeliveryTag() const; - const std::string& getContentType(); - const std::string& getContentEncoding(); - qpid::framing::FieldTable& getHeaders(); - u_int8_t getDeliveryMode(); - u_int8_t getPriority(); - const std::string& getCorrelationId(); - const std::string& getReplyTo(); - const std::string& getExpiration(); - const std::string& getMessageId(); - u_int64_t getTimestamp(); - const std::string& getType(); - const std::string& getUserId(); - const std::string& getAppId(); - const std::string& getClusterId(); + const std::string& getContentType() const; + const std::string& getContentEncoding() const; + qpid::framing::FieldTable& getHeaders() const; + u_int8_t getDeliveryMode() const; + u_int8_t getPriority() const; + const std::string& getCorrelationId() const; + const std::string& getReplyTo() const; + const std::string& getExpiration() const; + const std::string& getMessageId() const; + u_int64_t getTimestamp() const; + const std::string& getType() const; + const std::string& getUserId() const; + const std::string& getAppId() const; + const std::string& getClusterId() const; - void setContentType(const std::string& type); - void setContentEncoding(const std::string& encoding); - void setHeaders(const qpid::framing::FieldTable& headers); - /** - * Sets the delivery mode. 1 = non-durable, 2 = durable. - */ - void setDeliveryMode(u_int8_t mode); - void setPriority(u_int8_t priority); - void setCorrelationId(const std::string& correlationId); - void setReplyTo(const std::string& replyTo); - void setExpiration(const std::string& expiration); - void setMessageId(const std::string& messageId); - void setTimestamp(u_int64_t timestamp); - void setType(const std::string& type); - void setUserId(const std::string& userId); - void setAppId(const std::string& appId); - void setClusterId(const std::string& clusterId); + void setContentType(const std::string& type); + void setContentEncoding(const std::string& encoding); + void setHeaders(const qpid::framing::FieldTable& headers); + /** + * Sets the delivery mode. 1 = non-durable, 2 = durable. + */ + void setDeliveryMode(u_int8_t mode); + void setPriority(u_int8_t priority); + void setCorrelationId(const std::string& correlationId); + void setReplyTo(const std::string& replyTo); + void setExpiration(const std::string& expiration); + void setMessageId(const std::string& messageId); + void setTimestamp(u_int64_t timestamp); + void setType(const std::string& type); + void setUserId(const std::string& userId); + void setAppId(const std::string& appId); + void setClusterId(const std::string& clusterId); + /** Get the method used to deliver this message */ + boost::shared_ptr<framing::AMQMethodBody> getMethod() const + { return method; } + + void setMethod(framing::AMQMethodBody::shared_ptr m) { method=m; } + boost::shared_ptr<framing::AMQHeaderBody> getHeader(); - // TODO aconway 2007-02-15: remove friendships. - friend class Channel; - }; + // TODO aconway 2007-02-15: remove friendships. + friend class IncomingMessage; + friend class Channel; +}; }} diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index 5b97ca8e5d..566c8fc573 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -18,7 +18,9 @@ * under the License. * */ +#include <algorithm> #include <boost/format.hpp> +#include <boost/bind.hpp> #include <Connection.h> #include <ClientChannel.h> @@ -27,7 +29,6 @@ #include <iostream> #include <sstream> #include <MethodBodyInstances.h> -#include <boost/bind.hpp> #include <functional> using namespace qpid::framing; @@ -83,15 +84,17 @@ void Connection::close( { if(isOpen) { // TODO aconway 2007-01-29: Exception handling - could end up - // partly closed. + // partly closed with threads left unjoined. isOpen = false; channel0.sendAndReceive<ConnectionCloseOkBody>( new ConnectionCloseBody( getVersion(), code, msg, classId, methodId)); - while(!channels.empty()) { - channels.begin()->second->close(); - channels.erase(channels.begin()); - } + + using boost::bind; + for_each(channels.begin(), channels.end(), + bind(&Channel::closeInternal, + bind(&ChannelMap::value_type::second, _1))); + channels.clear(); connector->close(); } } diff --git a/cpp/lib/client/IncomingMessage.cpp b/cpp/lib/client/IncomingMessage.cpp index c1f6ca880f..07f94ceb64 100644 --- a/cpp/lib/client/IncomingMessage.cpp +++ b/cpp/lib/client/IncomingMessage.cpp @@ -19,58 +19,154 @@ * */ #include <IncomingMessage.h> +#include "framing/AMQHeaderBody.h" +#include "framing/AMQContentBody.h" +#include "BasicGetOkBody.h" +#include "BasicReturnBody.h" +#include "BasicDeliverBody.h" #include <QpidError.h> #include <iostream> -using namespace qpid::client; -using namespace qpid::framing; +namespace qpid { +namespace client { -IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){} -IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){} -IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){} +using namespace sys; +using namespace framing; -IncomingMessage::~IncomingMessage(){ +struct IncomingMessage::Guard: public Mutex::ScopedLock { + Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) { + im->shutdownError.throwIf(); + } +}; + +IncomingMessage::IncomingMessage() { reset(); } + +void IncomingMessage::reset() { + state = &IncomingMessage::expectRequest; + endFn= &IncomingMessage::endRequest; + buildMessage = Message(); +} + +void IncomingMessage::startGet() { + Guard g(this); + if (state != &IncomingMessage::expectRequest) { + endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress.")); + } + else { + state = &IncomingMessage::expectGetOk; + endFn = &IncomingMessage::endGet; + getError.reset(); + getState = GETTING; + } } -void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){ - this->header = _header; +bool IncomingMessage::waitGet(Message& msg) { + Guard g(this); + while (getState == GETTING && !shutdownError && !getError) + getReady.wait(lock); + shutdownError.throwIf(); + getError.throwIf(); + msg = getMessage; + return getState==GOT; } -void IncomingMessage::addContent(AMQContentBody::shared_ptr content){ - data.append(content->getData()); +Message IncomingMessage::waitDispatch() { + Guard g(this); + while(dispatchQueue.empty() && !shutdownError) + dispatchReady.wait(lock); + shutdownError.throwIf(); + + Message msg(dispatchQueue.front()); + dispatchQueue.pop(); + return msg; } -bool IncomingMessage::isComplete(){ - return header != 0 && header->getContentSize() == data.size(); +void IncomingMessage::add(BodyPtr body) { + Guard g(this); + shutdownError.throwIf(); + // Call the current state function. + (this->*state)(body); } -bool IncomingMessage::isReturn(){ - return returned; +void IncomingMessage::shutdown() { + Mutex::ScopedLock l(lock); + shutdownError.reset(new ShutdownException()); + getReady.notify(); + dispatchReady.notify(); } -bool IncomingMessage::isDelivery(){ - return delivered; +bool IncomingMessage::isShutdown() const { + Mutex::ScopedLock l(lock); + return shutdownError; } -bool IncomingMessage::isResponse(){ - return response; +// Common check for all the expect functions. Called in network thread. +template<class T> +boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) { + boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body); + if (!ptr) + throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); + return ptr; } -const string& IncomingMessage::getConsumerTag(){ - if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery"); - return delivered->getConsumerTag(); +void IncomingMessage::expectGetOk(BodyPtr body) { + if (dynamic_cast<BasicGetOkBody*>(body.get())) + state = &IncomingMessage::expectHeader; + else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) { + getState = EMPTY; + endGet(); + } + else + throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); } -u_int64_t IncomingMessage::getDeliveryTag(){ - if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery"); - return delivered->getDeliveryTag(); +void IncomingMessage::expectHeader(BodyPtr body) { + AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body); + buildMessage.header = header; + state = &IncomingMessage::expectContent; + checkComplete(); } -AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){ - return header; +void IncomingMessage::expectContent(BodyPtr body) { + AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body); + buildMessage.setData(buildMessage.getData() + content->getData()); + checkComplete(); +} + +void IncomingMessage::checkComplete() { + size_t declaredSize = buildMessage.header->getContentSize(); + size_t currentSize = buildMessage.getData().size(); + if (declaredSize == currentSize) + (this->*endFn)(0); + else if (declaredSize < currentSize) + (this->*endFn)(new QPID_ERROR( + PROTOCOL_ERROR, "Message content exceeds declared size.")); +} + +void IncomingMessage::expectRequest(BodyPtr body) { + AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body); + buildMessage.setMethod(method); + state = &IncomingMessage::expectHeader; +} + +void IncomingMessage::endGet(Exception* ex) { + getError.reset(ex); + if (getState == GETTING) { + getMessage = buildMessage; + getState = GOT; + } + reset(); + getReady.notify(); } -std::string IncomingMessage::getData() const { - return data; +void IncomingMessage::endRequest(Exception* ex) { + ExceptionHolder eh(ex); + if (!eh) { + dispatchQueue.push(buildMessage); + reset(); + dispatchReady.notify(); + } + eh.throwIf(); } +}} // namespace qpid::client diff --git a/cpp/lib/client/IncomingMessage.h b/cpp/lib/client/IncomingMessage.h index a2aa4d8441..2d7c8723c5 100644 --- a/cpp/lib/client/IncomingMessage.h +++ b/cpp/lib/client/IncomingMessage.h @@ -1,3 +1,6 @@ +#ifndef _IncomingMessage_ +#define _IncomingMessage_ + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,43 +22,97 @@ * */ #include <string> -#include <vector> +#include <queue> #include <framing/amqp_framing.h> +#include "ExceptionHolder.h" +#include "ClientMessage.h" +#include "sys/Mutex.h" +#include "sys/Condition.h" -#ifndef _IncomingMessage_ -#define _IncomingMessage_ +namespace qpid { -#include <ClientMessage.h> +namespace framing { +class AMQBody; +} -namespace qpid { namespace client { +/** + * Accumulates incoming message frames into messages. + * Client-initiated messages (basic.get) are initiated and made + * available to the user thread one at a time. + * + * Broker initiated messages (basic.return, basic.deliver) are + * queued for handling by the user dispatch thread. + */ +class IncomingMessage { + public: + typedef boost::shared_ptr<framing::AMQBody> BodyPtr; + IncomingMessage(); + + /** Expect a new message starting with getOk. Called in user thread.*/ + void startGet(); - class IncomingMessage{ - //content will be preceded by one of these method frames - qpid::framing::BasicDeliverBody::shared_ptr delivered; - qpid::framing::BasicReturnBody::shared_ptr returned; - qpid::framing::BasicGetOkBody::shared_ptr response; - qpid::framing::AMQHeaderBody::shared_ptr header; - std::string data; - public: - IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro); - IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro); - IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro); - ~IncomingMessage(); - void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); - void addContent(qpid::framing::AMQContentBody::shared_ptr content); - bool isComplete(); - bool isReturn(); - bool isDelivery(); - bool isResponse(); - const std::string& getConsumerTag();//only relevant if isDelivery() - qpid::framing::AMQHeaderBody::shared_ptr& getHeader(); - u_int64_t getDeliveryTag(); - std::string getData() const; - }; + /** Wait for the message to complete, return the message. + * Called in user thread. + *@raises QpidError if there was an error. + */ + bool waitGet(Message&); -} -} + /** Wait for the next broker-initiated message. */ + Message waitDispatch(); + + /** Add a frame body to the message. Called in network thread. */ + void add(BodyPtr); + + /** Shut down: all further calls to any function throw ex. */ + void shutdown(); + + /** Check if shutdown */ + bool isShutdown() const; + + private: + + typedef void (IncomingMessage::* ExpectFn)(BodyPtr); + typedef void (IncomingMessage::* EndFn)(Exception*); + typedef std::queue<Message> MessageQueue; + struct Guard; + friend struct Guard; + + void reset(); + template <class T> boost::shared_ptr<T> expectCheck(BodyPtr); + + // State functions - a state machine where each state is + // a member function that processes a frame body. + void expectGetOk(BodyPtr); + void expectHeader(BodyPtr); + void expectContent(BodyPtr); + void expectRequest(BodyPtr); + + // End functions. + void endGet(Exception* ex = 0); + void endRequest(Exception* ex); + + // Check for complete message. + void checkComplete(); + + mutable sys::Mutex lock; + ExpectFn state; + EndFn endFn; + Message buildMessage; + ExceptionHolder shutdownError; + + // For basic.get messages. + sys::Condition getReady; + ExceptionHolder getError; + Message getMessage; + enum { GETTING, GOT, EMPTY } getState; + + // For broker-initiated messages + sys::Condition dispatchReady; + MessageQueue dispatchQueue; +}; + +}} #endif diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp index ea48fa2386..4498de41ae 100644 --- a/cpp/lib/client/ResponseHandler.cpp +++ b/cpp/lib/client/ResponseHandler.cpp @@ -59,11 +59,8 @@ void ResponseHandler::receive(ClassId c, MethodId m) { Monitor::ScopedLock l(monitor); while (waiting) monitor.wait(); - if (!response) { - THROW_QPID_ERROR( - PROTOCOL_ERROR, "Channel closed unexpectedly."); - } - if(!validate(response->amqpClassId(), response->amqpMethodId())) { + getResponse(); // Check for closed. + if(!validate(response->amqpClassId(), response->amqpMethodId())) { THROW_QPID_ERROR( PROTOCOL_ERROR, boost::format("Expected class:method %d:%d, got %d:%d") @@ -71,6 +68,13 @@ void ResponseHandler::receive(ClassId c, MethodId m) { } } +framing::AMQMethodBody::shared_ptr ResponseHandler::getResponse() { + if (!response) + THROW_QPID_ERROR( + PROTOCOL_ERROR, "Channel closed unexpectedly."); + return response; +} + RequestId ResponseHandler::getRequestId() { assert(response->getRequestId()); return response->getRequestId(); diff --git a/cpp/lib/client/ResponseHandler.h b/cpp/lib/client/ResponseHandler.h index af0c250eb1..d28048c3d3 100644 --- a/cpp/lib/client/ResponseHandler.h +++ b/cpp/lib/client/ResponseHandler.h @@ -42,7 +42,7 @@ class ResponseHandler{ ~ResponseHandler(); bool isWaiting(){ return waiting; } - framing::AMQMethodBody::shared_ptr getResponse(){ return response;} + framing::AMQMethodBody::shared_ptr getResponse(); void waitForResponse(); void signalResponse(framing::AMQMethodBody::shared_ptr response); |
