summaryrefslogtreecommitdiff
path: root/cpp/lib/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-21 19:25:45 +0000
committerAlan Conway <aconway@apache.org>2007-02-21 19:25:45 +0000
commit876d0b94c37f252b08c81656386100fad18a8a46 (patch)
tree4840b0d697d4629fd5c518507b58fceb7de1578a /cpp/lib/client
parentc36fb4454be5ce4311aa5f5d0e5683db713c5545 (diff)
downloadqpid-python-876d0b94c37f252b08c81656386100fad18a8a46.tar.gz
Thread safety fixes for race conditions on incoming messages.
* cpp/lib/client/MessageListener.h: const correctness. * cpp/tests/*: MessageListener const change. * cpp/lib/broker/Content.h: Removed out-of-date FIXME comments. * cpp/lib/client/ClientChannel.h/ .cpp(): - added locking for consumers map and other member access. - refactored implementations of Basic get, deliver, return: most logic now encapsulted in IncomingMessage class. - fix channel close problems. * cpp/lib/client/ClientMessage.h/.cpp: - const correctness & API convenience fixes. - getMethod/setMethod/getHeader: for new IncomingMessage * cpp/lib/client/Connection.h/.cpp: - Fixes to channel closure. * cpp/lib/client/IncomingMessage.h/.cpp: - Encapsulate *all* incoming message handling for client. - Moved handling of BasicGetOk to IncomingMessage to fix race. - Thread safety fixes. * cpp/lib/client/ResponseHandler.h/.cpp: - added getResponse for ClientChannel. * cpp/lib/common/Exception.h: - added missing throwSelf implementations. - added ShutdownException as general purpose shut-down indicator. - added EmptyException as general purpose "empty" indicator. * cpp/lib/common/sys/Condition|Monitor|Mutex.h|.cpp: - Condition variable abstraction extracted from Monitor for situations where a single lock is associated with multiple conditions. * cpp/tests/ClientChannelTest.cpp: - Test incoming message transfer, get, consume etc. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510161 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client')
-rw-r--r--cpp/lib/client/ClientChannel.cpp302
-rw-r--r--cpp/lib/client/ClientChannel.h21
-rw-r--r--cpp/lib/client/ClientMessage.cpp37
-rw-r--r--cpp/lib/client/ClientMessage.h152
-rw-r--r--cpp/lib/client/Connection.cpp15
-rw-r--r--cpp/lib/client/IncomingMessage.cpp152
-rw-r--r--cpp/lib/client/IncomingMessage.h117
-rw-r--r--cpp/lib/client/ResponseHandler.cpp14
-rw-r--r--cpp/lib/client/ResponseHandler.h2
9 files changed, 468 insertions, 344 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index 42e5cf3054..a8fa219c16 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include <iostream>
#include <ClientChannel.h>
#include <sys/Monitor.h>
#include <ClientMessage.h>
@@ -29,17 +30,14 @@
// handling of errors that should close the connection or the channel.
// Make sure the user thread receives a connection in each case.
//
-
-using namespace boost; //to use dynamic_pointer_cast
+using namespace std;
+using namespace boost;
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
-const std::string Channel::OK("OK");
-
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
connection(0),
- incoming(0),
prefetch(_prefetch),
transactional(_transactional)
{ }
@@ -106,8 +104,8 @@ void Channel::protocolInit(
ConnectionRedirectBody::shared_ptr redirect(
shared_polymorphic_downcast<ConnectionRedirectBody>(
responses.getResponse()));
- std::cout << "Received redirection to " << redirect->getHost()
- << std::endl;
+ cout << "Received redirection to " << redirect->getHost()
+ << endl;
} else {
THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
}
@@ -183,11 +181,11 @@ void Channel::consume(
Queue& queue, std::string& tag, MessageListener* listener,
int ackMode, bool noLocal, bool synch, const FieldTable* fields)
{
- string q = queue.getName();
sendAndReceiveSync<BasicConsumeOkBody>(
synch,
new BasicConsumeBody(
- version, 0, q, tag, noLocal, ackMode == NO_ACK, false, !synch,
+ version, 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
fields ? *fields : FieldTable()));
if (synch) {
BasicConsumeOkBody::shared_ptr response =
@@ -195,90 +193,78 @@ void Channel::consume(
responses.getResponse());
tag = response->getConsumerTag();
}
- Consumer& c = consumers[tag];
- c.listener = listener;
- c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
+ // FIXME aconway 2007-02-20: Race condition!
+ // We could receive the first message for the consumer
+ // before we create the consumer below.
+ // Move consumer creation to handler for BasicConsumeOkBody
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ THROW_QPID_ERROR(CLIENT_ERROR,
+ "Consumer already exists with tag="+tag);
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
+ }
}
void Channel::cancel(const std::string& tag, bool synch) {
- ConsumerMap::iterator i = consumers.find(tag);
- if (i != consumers.end()) {
- Consumer& c = i->second;
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
- send(new BasicAckBody(version, c.lastDeliveryTag, true));
- sendAndReceiveSync<BasicCancelOkBody>(
- synch, new BasicCancelBody(version, tag, !synch));
- consumers.erase(tag);
+ Consumer c;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i == consumers.end())
+ return;
+ c = i->second;
+ consumers.erase(i);
}
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ send(new BasicAckBody(version, c.lastDeliveryTag, true));
+ sendAndReceiveSync<BasicCancelOkBody>(
+ synch, new BasicCancelBody(version, tag, !synch));
}
void Channel::cancelAll(){
- while(!consumers.empty()) {
- Consumer c = consumers.begin()->second;
- consumers.erase(consumers.begin());
+ ConsumerMap consumersCopy;
+ {
+ Mutex::ScopedLock l(lock);
+ consumersCopy = consumers;
+ consumers.clear();
+ }
+ for (ConsumerMap::iterator i=consumersCopy.begin();
+ i != consumersCopy.end(); ++i)
+ {
+ Consumer& c = i->second;
if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
&& c.lastDeliveryTag > 0)
{
- // Let exceptions propagate, if one fails no point
- // trying the rest. NB no memory leaks if we do,
- // ConsumerMap holds values, not pointers.
- //
send(new BasicAckBody(version, c.lastDeliveryTag, true));
}
}
}
-void Channel::retrieve(Message& msg){
- Monitor::ScopedLock l(retrievalMonitor);
- while(retrieved == 0){
- retrievalMonitor.wait();
- }
-
- msg.header = retrieved->getHeader();
- msg.deliveryTag = retrieved->getDeliveryTag();
- msg.data = retrieved->getData();
- delete retrieved;
- retrieved = 0;
-}
-
bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
- string name = queue.getName();
- responses.expect();
- send(new BasicGetBody(version, 0, name, ackMode));
- responses.waitForResponse();
- AMQMethodBody::shared_ptr response = responses.getResponse();
- if(response->isA<BasicGetOkBody>()) {
- if(incoming != 0){
- std::cout << "Existing message not complete" << std::endl;
- // FIXME aconway 2007-01-26: close the connection? the channel?
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
- }
- retrieve(msg);
- return true;
- }if(response->isA<BasicGetEmptyBody>()){
- return false;
- }else{
- // FIXME aconway 2007-01-26: must close the connection.
- THROW_QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame");
- }
+ // Expect a message starting with a BasicGetOk
+ incoming.startGet();
+ send(new BasicGetBody(version, 0, queue.getName(), ackMode));
+ return incoming.waitGet(msg);
}
-void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
- // FIXME aconway 2007-01-30: Rework for message class.
-
- string e = exchange.getName();
+void Channel::publish(
+ const Message& msg, const Exchange& exchange,
+ const std::string& routingKey, bool mandatory, bool immediate)
+{
+ // FIXME aconway 2007-01-30: Rework for 0-9 message class.
+ const string e = exchange.getName();
string key = routingKey;
send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
//break msg up into header frame and content frame(s) and send these
- string data = msg.getData();
- msg.header->setContentSize(data.length());
send(msg.header);
-
+ string data = msg.getData();
u_int64_t data_length = data.length();
if(data_length > 0){
u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes
@@ -312,30 +298,30 @@ void Channel::handleMethodInContext(
{
//channel.flow, channel.close, basic.deliver, basic.return or a
//response to a synchronous request
- if(responses.isWaiting()){
+ if(responses.isWaiting()) {
responses.signalResponse(body);
- }else if(body->isA<BasicDeliverBody>()) {
- if(incoming != 0){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
- }
- }else if(body->isA<BasicReturnBody>()){
- if(incoming != 0){
- std::cout << "Existing message not complete" << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
- }else{
- incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
- }
- }else if(body->isA<ChannelCloseBody>()){
+ return;
+ }
+
+ if(body->isA<BasicDeliverBody>()
+ || body->isA<BasicReturnBody>()
+ || body->isA<BasicGetOkBody>()
+ || body->isA<BasicGetEmptyBody>())
+
+ {
+ incoming.add(body);
+ return;
+ }
+ else if(body->isA<ChannelCloseBody>()) {
peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body));
- }else if(body->isA<ChannelFlowBody>()){
- // TODO aconway 2007-01-24:
- }else if(body->isA<ConnectionCloseBody>()){
+ }
+ else if(body->isA<ChannelFlowBody>()){
+ // TODO aconway 2007-01-24: not implemented yet.
+ }
+ else if(body->isA<ConnectionCloseBody>()){
connection->close();
- }else{
+ }
+ else {
connection->close(
504, "Unrecognised method",
body->amqpClassId(), body->amqpMethodId());
@@ -343,31 +329,13 @@ void Channel::handleMethodInContext(
}
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- if(incoming == 0){
- //handle invalid frame sequence
- std::cout << "Invalid message sequence: got header before return or deliver." << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver.");
- }else{
- incoming->setHeader(body);
- if(incoming->isComplete()){
- enqueue();
- }
- }
+ incoming.add(body);
}
void Channel::handleContent(AMQContentBody::shared_ptr body){
- if(incoming == 0){
- //handle invalid frame sequence
- std::cout << "Invalid message sequence: got content before return or deliver." << std::endl;
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver.");
- }else{
- incoming->addContent(body);
- if(incoming->isComplete()){
- enqueue();
- }
- }
+ incoming.add(body);
}
-
+
void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
}
@@ -376,35 +344,6 @@ void Channel::start(){
dispatcher = Thread(this);
}
-void Channel::run(){
- dispatch();
-}
-
-void Channel::enqueue(){
- Monitor::ScopedLock l(retrievalMonitor);
- if(incoming->isResponse()){
- retrieved = incoming;
- retrievalMonitor.notify();
- }else{
- messages.push(incoming);
- dispatchMonitor.notify();
- }
- incoming = 0;
-}
-
-IncomingMessage* Channel::dequeue(){
- Monitor::ScopedLock l(dispatchMonitor);
- while(messages.empty() && isOpen()){
- dispatchMonitor.wait();
- }
- IncomingMessage* msg = 0;
- if(!messages.empty()){
- msg = messages.front();
- messages.pop();
- }
- return msg;
-}
-
void Channel::deliver(Consumer& consumer, Message& msg){
//record delivery tag:
consumer.lastDeliveryTag = msg.getDeliveryTag();
@@ -412,8 +351,6 @@ void Channel::deliver(Consumer& consumer, Message& msg){
//allow registered listener to handle the message
consumer.listener->received(msg);
- //if the handler calls close on the channel or connection while
- //handling this message, then consumer will now have been deleted.
if(isOpen()){
bool multiple(false);
switch(consumer.ackMode){
@@ -432,35 +369,53 @@ void Channel::deliver(Consumer& consumer, Message& msg){
//a transaction until it commits.
}
-void Channel::dispatch(){
- while(isOpen()){
- IncomingMessage* incomingMsg = dequeue();
- if(incomingMsg){
- //Note: msg is currently only valid for duration of this call
- Message msg(incomingMsg->getHeader());
- msg.data = incomingMsg->getData();
- if(incomingMsg->isReturn()){
- if(returnsHandler == 0){
- //print warning to log/console
- std::cout << "Message returned: " << msg.getData() << std::endl;
- }else{
- returnsHandler->returned(msg);
+void Channel::run() {
+ while(isOpen()) {
+ try {
+ Message msg = incoming.waitDispatch();
+ if(msg.getMethod()->isA<BasicReturnBody>()) {
+ ReturnedMessageHandler* handler=0;
+ {
+ Mutex::ScopedLock l(lock);
+ handler=returnsHandler;
}
- }else{
- msg.deliveryTag = incomingMsg->getDeliveryTag();
- std::string tag = incomingMsg->getConsumerTag();
-
- if(consumers.find(tag) == consumers.end())
- std::cout << "Unknown consumer: " << tag << std::endl;
- else
- deliver(consumers[tag], msg);
+ if(handler == 0) {
+ // TODO aconway 2007-02-20: proper logging.
+ cout << "Message returned: " << msg.getData() << endl;
+ }
+ else
+ handler->returned(msg);
+ }
+ else {
+ BasicDeliverBody::shared_ptr deliverBody =
+ boost::shared_polymorphic_downcast<BasicDeliverBody>(
+ msg.getMethod());
+ std::string tag = deliverBody->getConsumerTag();
+ Consumer consumer;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if(i == consumers.end())
+ THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+ "Unknown consumer tag=" + tag);
+ consumer = i->second;
+ }
+ deliver(consumer, msg);
}
- delete incomingMsg;
+ }
+ catch (const ShutdownException&) {
+ /* Orderly shutdown */
+ }
+ catch (const Exception& e) {
+ // FIXME aconway 2007-02-20: Report exception to user.
+ cout << "client::Channel::run() terminated by: " << e.toString()
+ << "(" << typeid(e).name() << ")" << endl;
}
}
}
void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+ Mutex::ScopedLock l(lock);
returnsHandler = handler;
}
@@ -469,13 +424,17 @@ void Channel::close(
u_int16_t code, const std::string& text,
ClassId classId, MethodId methodId)
{
- if (getId() != 0 && isOpen()) {
+ if (isOpen()) {
try {
- sendAndReceive<ChannelCloseOkBody>(
- new ChannelCloseBody(version, code, text, classId, methodId));
- cancelAll();
+ if (getId() != 0) {
+ sendAndReceive<ChannelCloseOkBody>(
+ new ChannelCloseBody(
+ version, code, text, classId, methodId));
+ }
+ static_cast<ConnectionForChannel*>(connection)->erase(getId());
closeInternal();
} catch (...) {
+ static_cast<ConnectionForChannel*>(connection)->erase(getId());
closeInternal();
throw;
}
@@ -491,14 +450,13 @@ void Channel::peerClose(ChannelCloseBody::shared_ptr) {
}
void Channel::closeInternal() {
- assert(isOpen());
+ if (isOpen());
{
- Monitor::ScopedLock l(dispatchMonitor);
- static_cast<ConnectionForChannel*>(connection)->erase(getId());
+ cancelAll();
+ incoming.shutdown();
connection = 0;
// A 0 response means we are closed.
responses.signalResponse(AMQMethodBody::shared_ptr());
- dispatchMonitor.notify();
}
dispatcher.join();
}
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h
index ed67fd8f6b..9c422305b0 100644
--- a/cpp/lib/client/ClientChannel.h
+++ b/cpp/lib/client/ClientChannel.h
@@ -89,19 +89,12 @@ class Channel : public framing::ChannelAdapter,
u_int64_t lastDeliveryTag;
};
typedef std::map<std::string, Consumer> ConsumerMap;
- typedef std::queue<boost::shared_ptr<framing::AMQMethodBody> > IncomingMethods;
- static const std::string OK;
-
+ sys::Mutex lock;
Connection* connection;
sys::Thread dispatcher;
- IncomingMethods incomingMethods;
- IncomingMessage* incoming;
+ IncomingMessage incoming;
ResponseHandler responses;
- std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
- IncomingMessage* retrieved;//holds response to basic.get
- sys::Monitor dispatchMonitor;
- sys::Monitor retrievalMonitor;
ConsumerMap consumers;
ReturnedMessageHandler* returnsHandler;
@@ -109,10 +102,7 @@ class Channel : public framing::ChannelAdapter,
const bool transactional;
framing::ProtocolVersion version;
- void enqueue();
void retrieve(Message& msg);
- IncomingMessage* dequeue();
- void dispatch();
void deliver(Consumer& consumer, Message& msg);
void handleHeader(framing::AMQHeaderBody::shared_ptr body);
@@ -307,7 +297,8 @@ class Channel : public framing::ChannelAdapter,
* receive this message on publication, the message will be
* returned (see setReturnedMessageHandler()).
*/
- void publish(Message& msg, const Exchange& exchange, const std::string& routingKey,
+ void publish(const Message& msg, const Exchange& exchange,
+ const std::string& routingKey,
bool mandatory = false, bool immediate = false);
/**
@@ -352,8 +343,8 @@ class Channel : public framing::ChannelAdapter,
* Closing a channel that is not open has no effect.
*/
void close(
- framing::ReplyCode = 200, const std::string& =OK,
- framing::ClassId = 0, framing::MethodId = 0);
+ framing::ReplyCode = 200, const std::string& ="OK",
+ framing::ClassId = 0, framing::MethodId = 0);
/**
* Set a handler for this channel that will process any
diff --git a/cpp/lib/client/ClientMessage.cpp b/cpp/lib/client/ClientMessage.cpp
index 8b08f7e535..bd4adb78f7 100644
--- a/cpp/lib/client/ClientMessage.cpp
+++ b/cpp/lib/client/ClientMessage.cpp
@@ -19,7 +19,6 @@
*
*/
#include <ClientMessage.h>
-
using namespace qpid::client;
using namespace qpid::framing;
@@ -40,63 +39,63 @@ Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){
Message::~Message(){
}
-BasicHeaderProperties* Message::getHeaderProperties(){
+BasicHeaderProperties* Message::getHeaderProperties() const {
return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
}
-const std::string& Message::getContentType(){
+const std::string& Message::getContentType() const {
return getHeaderProperties()->getContentType();
}
-const std::string& Message::getContentEncoding(){
+const std::string& Message::getContentEncoding() const {
return getHeaderProperties()->getContentEncoding();
}
-FieldTable& Message::getHeaders(){
+FieldTable& Message::getHeaders() const {
return getHeaderProperties()->getHeaders();
}
-u_int8_t Message::getDeliveryMode(){
+u_int8_t Message::getDeliveryMode() const {
return getHeaderProperties()->getDeliveryMode();
}
-u_int8_t Message::getPriority(){
+u_int8_t Message::getPriority() const {
return getHeaderProperties()->getPriority();
}
-const std::string& Message::getCorrelationId(){
+const std::string& Message::getCorrelationId() const {
return getHeaderProperties()->getCorrelationId();
}
-const std::string& Message::getReplyTo(){
+const std::string& Message::getReplyTo() const {
return getHeaderProperties()->getReplyTo();
}
-const std::string& Message::getExpiration(){
+const std::string& Message::getExpiration() const {
return getHeaderProperties()->getExpiration();
}
-const std::string& Message::getMessageId(){
+const std::string& Message::getMessageId() const {
return getHeaderProperties()->getMessageId();
}
-u_int64_t Message::getTimestamp(){
+u_int64_t Message::getTimestamp() const {
return getHeaderProperties()->getTimestamp();
}
-const std::string& Message::getType(){
+const std::string& Message::getType() const {
return getHeaderProperties()->getType();
}
-const std::string& Message::getUserId(){
+const std::string& Message::getUserId() const {
return getHeaderProperties()->getUserId();
}
-const std::string& Message::getAppId(){
+const std::string& Message::getAppId() const {
return getHeaderProperties()->getAppId();
}
-const std::string& Message::getClusterId(){
+const std::string& Message::getClusterId() const {
return getHeaderProperties()->getClusterId();
}
@@ -155,3 +154,9 @@ void Message::setAppId(const std::string& appId){
void Message::setClusterId(const std::string& clusterId){
getHeaderProperties()->setClusterId(clusterId);
}
+
+
+u_int64_t Message::getDeliveryTag() const {
+ BasicDeliverBody* deliver=dynamic_cast<BasicDeliverBody*>(method.get());
+ return deliver ? deliver->getDeliveryTag() : 0;
+}
diff --git a/cpp/lib/client/ClientMessage.h b/cpp/lib/client/ClientMessage.h
index 148f9240c8..8661f6b791 100644
--- a/cpp/lib/client/ClientMessage.h
+++ b/cpp/lib/client/ClientMessage.h
@@ -25,89 +25,99 @@
#include <framing/amqp_framing.h>
namespace qpid {
+
namespace client {
+class IncomingMessage;
- /**
- * A representation of messages for sent or recived through the
- * client api.
- *
- * \ingroup clientapi
- */
- class Message{
- qpid::framing::AMQHeaderBody::shared_ptr header;
- std::string data;
- bool redelivered;
- u_int64_t deliveryTag;
+/**
+ * A representation of messages for sent or recived through the
+ * client api.
+ *
+ * \ingroup clientapi
+ */
+class Message {
+ framing::AMQMethodBody::shared_ptr method;
+ framing::AMQHeaderBody::shared_ptr header;
+ std::string data;
+ bool redelivered;
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
- Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
+ // FIXME aconway 2007-02-20: const incorrect, needs const return type.
+ framing::BasicHeaderProperties* getHeaderProperties() const;
+ Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
- public:
- Message(const std::string& data=std::string());
- ~Message();
+ public:
+ Message(const std::string& data=std::string());
+ ~Message();
- /**
- * Allows the application to access the content of messages
- * received.
- *
- * @return a string representing the data of the message
- */
- std::string getData() const { return data; }
+ /**
+ * Allows the application to access the content of messages
+ * received.
+ *
+ * @return a string representing the data of the message
+ */
+ std::string getData() const { return data; }
- /**
- * Allows the application to set the content of messages to be
- * sent.
- *
- * @param data a string representing the data of the message
- */
- void setData(const std::string& _data);
+ /**
+ * Allows the application to set the content of messages to be
+ * sent.
+ *
+ * @param data a string representing the data of the message
+ */
+ void setData(const std::string& _data);
- /**
- * @return true if this message was delivered previously (to
- * any consumer) but was not acknowledged.
- */
- inline bool isRedelivered(){ return redelivered; }
- inline void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
+ /**
+ * @return true if this message was delivered previously (to
+ * any consumer) but was not acknowledged.
+ */
+ bool isRedelivered(){ return redelivered; }
+ void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
- inline u_int64_t getDeliveryTag(){ return deliveryTag; }
+ u_int64_t getDeliveryTag() const;
- const std::string& getContentType();
- const std::string& getContentEncoding();
- qpid::framing::FieldTable& getHeaders();
- u_int8_t getDeliveryMode();
- u_int8_t getPriority();
- const std::string& getCorrelationId();
- const std::string& getReplyTo();
- const std::string& getExpiration();
- const std::string& getMessageId();
- u_int64_t getTimestamp();
- const std::string& getType();
- const std::string& getUserId();
- const std::string& getAppId();
- const std::string& getClusterId();
+ const std::string& getContentType() const;
+ const std::string& getContentEncoding() const;
+ qpid::framing::FieldTable& getHeaders() const;
+ u_int8_t getDeliveryMode() const;
+ u_int8_t getPriority() const;
+ const std::string& getCorrelationId() const;
+ const std::string& getReplyTo() const;
+ const std::string& getExpiration() const;
+ const std::string& getMessageId() const;
+ u_int64_t getTimestamp() const;
+ const std::string& getType() const;
+ const std::string& getUserId() const;
+ const std::string& getAppId() const;
+ const std::string& getClusterId() const;
- void setContentType(const std::string& type);
- void setContentEncoding(const std::string& encoding);
- void setHeaders(const qpid::framing::FieldTable& headers);
- /**
- * Sets the delivery mode. 1 = non-durable, 2 = durable.
- */
- void setDeliveryMode(u_int8_t mode);
- void setPriority(u_int8_t priority);
- void setCorrelationId(const std::string& correlationId);
- void setReplyTo(const std::string& replyTo);
- void setExpiration(const std::string& expiration);
- void setMessageId(const std::string& messageId);
- void setTimestamp(u_int64_t timestamp);
- void setType(const std::string& type);
- void setUserId(const std::string& userId);
- void setAppId(const std::string& appId);
- void setClusterId(const std::string& clusterId);
+ void setContentType(const std::string& type);
+ void setContentEncoding(const std::string& encoding);
+ void setHeaders(const qpid::framing::FieldTable& headers);
+ /**
+ * Sets the delivery mode. 1 = non-durable, 2 = durable.
+ */
+ void setDeliveryMode(u_int8_t mode);
+ void setPriority(u_int8_t priority);
+ void setCorrelationId(const std::string& correlationId);
+ void setReplyTo(const std::string& replyTo);
+ void setExpiration(const std::string& expiration);
+ void setMessageId(const std::string& messageId);
+ void setTimestamp(u_int64_t timestamp);
+ void setType(const std::string& type);
+ void setUserId(const std::string& userId);
+ void setAppId(const std::string& appId);
+ void setClusterId(const std::string& clusterId);
+ /** Get the method used to deliver this message */
+ boost::shared_ptr<framing::AMQMethodBody> getMethod() const
+ { return method; }
+
+ void setMethod(framing::AMQMethodBody::shared_ptr m) { method=m; }
+ boost::shared_ptr<framing::AMQHeaderBody> getHeader();
- // TODO aconway 2007-02-15: remove friendships.
- friend class Channel;
- };
+ // TODO aconway 2007-02-15: remove friendships.
+ friend class IncomingMessage;
+ friend class Channel;
+};
}}
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index 5b97ca8e5d..566c8fc573 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -18,7 +18,9 @@
* under the License.
*
*/
+#include <algorithm>
#include <boost/format.hpp>
+#include <boost/bind.hpp>
#include <Connection.h>
#include <ClientChannel.h>
@@ -27,7 +29,6 @@
#include <iostream>
#include <sstream>
#include <MethodBodyInstances.h>
-#include <boost/bind.hpp>
#include <functional>
using namespace qpid::framing;
@@ -83,15 +84,17 @@ void Connection::close(
{
if(isOpen) {
// TODO aconway 2007-01-29: Exception handling - could end up
- // partly closed.
+ // partly closed with threads left unjoined.
isOpen = false;
channel0.sendAndReceive<ConnectionCloseOkBody>(
new ConnectionCloseBody(
getVersion(), code, msg, classId, methodId));
- while(!channels.empty()) {
- channels.begin()->second->close();
- channels.erase(channels.begin());
- }
+
+ using boost::bind;
+ for_each(channels.begin(), channels.end(),
+ bind(&Channel::closeInternal,
+ bind(&ChannelMap::value_type::second, _1)));
+ channels.clear();
connector->close();
}
}
diff --git a/cpp/lib/client/IncomingMessage.cpp b/cpp/lib/client/IncomingMessage.cpp
index c1f6ca880f..07f94ceb64 100644
--- a/cpp/lib/client/IncomingMessage.cpp
+++ b/cpp/lib/client/IncomingMessage.cpp
@@ -19,58 +19,154 @@
*
*/
#include <IncomingMessage.h>
+#include "framing/AMQHeaderBody.h"
+#include "framing/AMQContentBody.h"
+#include "BasicGetOkBody.h"
+#include "BasicReturnBody.h"
+#include "BasicDeliverBody.h"
#include <QpidError.h>
#include <iostream>
-using namespace qpid::client;
-using namespace qpid::framing;
+namespace qpid {
+namespace client {
-IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){}
-IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){}
-IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){}
+using namespace sys;
+using namespace framing;
-IncomingMessage::~IncomingMessage(){
+struct IncomingMessage::Guard: public Mutex::ScopedLock {
+ Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) {
+ im->shutdownError.throwIf();
+ }
+};
+
+IncomingMessage::IncomingMessage() { reset(); }
+
+void IncomingMessage::reset() {
+ state = &IncomingMessage::expectRequest;
+ endFn= &IncomingMessage::endRequest;
+ buildMessage = Message();
+}
+
+void IncomingMessage::startGet() {
+ Guard g(this);
+ if (state != &IncomingMessage::expectRequest) {
+ endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress."));
+ }
+ else {
+ state = &IncomingMessage::expectGetOk;
+ endFn = &IncomingMessage::endGet;
+ getError.reset();
+ getState = GETTING;
+ }
}
-void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
+bool IncomingMessage::waitGet(Message& msg) {
+ Guard g(this);
+ while (getState == GETTING && !shutdownError && !getError)
+ getReady.wait(lock);
+ shutdownError.throwIf();
+ getError.throwIf();
+ msg = getMessage;
+ return getState==GOT;
}
-void IncomingMessage::addContent(AMQContentBody::shared_ptr content){
- data.append(content->getData());
+Message IncomingMessage::waitDispatch() {
+ Guard g(this);
+ while(dispatchQueue.empty() && !shutdownError)
+ dispatchReady.wait(lock);
+ shutdownError.throwIf();
+
+ Message msg(dispatchQueue.front());
+ dispatchQueue.pop();
+ return msg;
}
-bool IncomingMessage::isComplete(){
- return header != 0 && header->getContentSize() == data.size();
+void IncomingMessage::add(BodyPtr body) {
+ Guard g(this);
+ shutdownError.throwIf();
+ // Call the current state function.
+ (this->*state)(body);
}
-bool IncomingMessage::isReturn(){
- return returned;
+void IncomingMessage::shutdown() {
+ Mutex::ScopedLock l(lock);
+ shutdownError.reset(new ShutdownException());
+ getReady.notify();
+ dispatchReady.notify();
}
-bool IncomingMessage::isDelivery(){
- return delivered;
+bool IncomingMessage::isShutdown() const {
+ Mutex::ScopedLock l(lock);
+ return shutdownError;
}
-bool IncomingMessage::isResponse(){
- return response;
+// Common check for all the expect functions. Called in network thread.
+template<class T>
+boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) {
+ boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body);
+ if (!ptr)
+ throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
+ return ptr;
}
-const string& IncomingMessage::getConsumerTag(){
- if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery");
- return delivered->getConsumerTag();
+void IncomingMessage::expectGetOk(BodyPtr body) {
+ if (dynamic_cast<BasicGetOkBody*>(body.get()))
+ state = &IncomingMessage::expectHeader;
+ else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) {
+ getState = EMPTY;
+ endGet();
+ }
+ else
+ throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
}
-u_int64_t IncomingMessage::getDeliveryTag(){
- if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery");
- return delivered->getDeliveryTag();
+void IncomingMessage::expectHeader(BodyPtr body) {
+ AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body);
+ buildMessage.header = header;
+ state = &IncomingMessage::expectContent;
+ checkComplete();
}
-AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){
- return header;
+void IncomingMessage::expectContent(BodyPtr body) {
+ AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body);
+ buildMessage.setData(buildMessage.getData() + content->getData());
+ checkComplete();
+}
+
+void IncomingMessage::checkComplete() {
+ size_t declaredSize = buildMessage.header->getContentSize();
+ size_t currentSize = buildMessage.getData().size();
+ if (declaredSize == currentSize)
+ (this->*endFn)(0);
+ else if (declaredSize < currentSize)
+ (this->*endFn)(new QPID_ERROR(
+ PROTOCOL_ERROR, "Message content exceeds declared size."));
+}
+
+void IncomingMessage::expectRequest(BodyPtr body) {
+ AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body);
+ buildMessage.setMethod(method);
+ state = &IncomingMessage::expectHeader;
+}
+
+void IncomingMessage::endGet(Exception* ex) {
+ getError.reset(ex);
+ if (getState == GETTING) {
+ getMessage = buildMessage;
+ getState = GOT;
+ }
+ reset();
+ getReady.notify();
}
-std::string IncomingMessage::getData() const {
- return data;
+void IncomingMessage::endRequest(Exception* ex) {
+ ExceptionHolder eh(ex);
+ if (!eh) {
+ dispatchQueue.push(buildMessage);
+ reset();
+ dispatchReady.notify();
+ }
+ eh.throwIf();
}
+}} // namespace qpid::client
diff --git a/cpp/lib/client/IncomingMessage.h b/cpp/lib/client/IncomingMessage.h
index a2aa4d8441..2d7c8723c5 100644
--- a/cpp/lib/client/IncomingMessage.h
+++ b/cpp/lib/client/IncomingMessage.h
@@ -1,3 +1,6 @@
+#ifndef _IncomingMessage_
+#define _IncomingMessage_
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,43 +22,97 @@
*
*/
#include <string>
-#include <vector>
+#include <queue>
#include <framing/amqp_framing.h>
+#include "ExceptionHolder.h"
+#include "ClientMessage.h"
+#include "sys/Mutex.h"
+#include "sys/Condition.h"
-#ifndef _IncomingMessage_
-#define _IncomingMessage_
+namespace qpid {
-#include <ClientMessage.h>
+namespace framing {
+class AMQBody;
+}
-namespace qpid {
namespace client {
+/**
+ * Accumulates incoming message frames into messages.
+ * Client-initiated messages (basic.get) are initiated and made
+ * available to the user thread one at a time.
+ *
+ * Broker initiated messages (basic.return, basic.deliver) are
+ * queued for handling by the user dispatch thread.
+ */
+class IncomingMessage {
+ public:
+ typedef boost::shared_ptr<framing::AMQBody> BodyPtr;
+ IncomingMessage();
+
+ /** Expect a new message starting with getOk. Called in user thread.*/
+ void startGet();
- class IncomingMessage{
- //content will be preceded by one of these method frames
- qpid::framing::BasicDeliverBody::shared_ptr delivered;
- qpid::framing::BasicReturnBody::shared_ptr returned;
- qpid::framing::BasicGetOkBody::shared_ptr response;
- qpid::framing::AMQHeaderBody::shared_ptr header;
- std::string data;
- public:
- IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro);
- IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro);
- IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro);
- ~IncomingMessage();
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr content);
- bool isComplete();
- bool isReturn();
- bool isDelivery();
- bool isResponse();
- const std::string& getConsumerTag();//only relevant if isDelivery()
- qpid::framing::AMQHeaderBody::shared_ptr& getHeader();
- u_int64_t getDeliveryTag();
- std::string getData() const;
- };
+ /** Wait for the message to complete, return the message.
+ * Called in user thread.
+ *@raises QpidError if there was an error.
+ */
+ bool waitGet(Message&);
-}
-}
+ /** Wait for the next broker-initiated message. */
+ Message waitDispatch();
+
+ /** Add a frame body to the message. Called in network thread. */
+ void add(BodyPtr);
+
+ /** Shut down: all further calls to any function throw ex. */
+ void shutdown();
+
+ /** Check if shutdown */
+ bool isShutdown() const;
+
+ private:
+
+ typedef void (IncomingMessage::* ExpectFn)(BodyPtr);
+ typedef void (IncomingMessage::* EndFn)(Exception*);
+ typedef std::queue<Message> MessageQueue;
+ struct Guard;
+ friend struct Guard;
+
+ void reset();
+ template <class T> boost::shared_ptr<T> expectCheck(BodyPtr);
+
+ // State functions - a state machine where each state is
+ // a member function that processes a frame body.
+ void expectGetOk(BodyPtr);
+ void expectHeader(BodyPtr);
+ void expectContent(BodyPtr);
+ void expectRequest(BodyPtr);
+
+ // End functions.
+ void endGet(Exception* ex = 0);
+ void endRequest(Exception* ex);
+
+ // Check for complete message.
+ void checkComplete();
+
+ mutable sys::Mutex lock;
+ ExpectFn state;
+ EndFn endFn;
+ Message buildMessage;
+ ExceptionHolder shutdownError;
+
+ // For basic.get messages.
+ sys::Condition getReady;
+ ExceptionHolder getError;
+ Message getMessage;
+ enum { GETTING, GOT, EMPTY } getState;
+
+ // For broker-initiated messages
+ sys::Condition dispatchReady;
+ MessageQueue dispatchQueue;
+};
+
+}}
#endif
diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp
index ea48fa2386..4498de41ae 100644
--- a/cpp/lib/client/ResponseHandler.cpp
+++ b/cpp/lib/client/ResponseHandler.cpp
@@ -59,11 +59,8 @@ void ResponseHandler::receive(ClassId c, MethodId m) {
Monitor::ScopedLock l(monitor);
while (waiting)
monitor.wait();
- if (!response) {
- THROW_QPID_ERROR(
- PROTOCOL_ERROR, "Channel closed unexpectedly.");
- }
- if(!validate(response->amqpClassId(), response->amqpMethodId())) {
+ getResponse(); // Check for closed.
+ if(!validate(response->amqpClassId(), response->amqpMethodId())) {
THROW_QPID_ERROR(
PROTOCOL_ERROR,
boost::format("Expected class:method %d:%d, got %d:%d")
@@ -71,6 +68,13 @@ void ResponseHandler::receive(ClassId c, MethodId m) {
}
}
+framing::AMQMethodBody::shared_ptr ResponseHandler::getResponse() {
+ if (!response)
+ THROW_QPID_ERROR(
+ PROTOCOL_ERROR, "Channel closed unexpectedly.");
+ return response;
+}
+
RequestId ResponseHandler::getRequestId() {
assert(response->getRequestId());
return response->getRequestId();
diff --git a/cpp/lib/client/ResponseHandler.h b/cpp/lib/client/ResponseHandler.h
index af0c250eb1..d28048c3d3 100644
--- a/cpp/lib/client/ResponseHandler.h
+++ b/cpp/lib/client/ResponseHandler.h
@@ -42,7 +42,7 @@ class ResponseHandler{
~ResponseHandler();
bool isWaiting(){ return waiting; }
- framing::AMQMethodBody::shared_ptr getResponse(){ return response;}
+ framing::AMQMethodBody::shared_ptr getResponse();
void waitForResponse();
void signalResponse(framing::AMQMethodBody::shared_ptr response);