summaryrefslogtreecommitdiff
path: root/cpp/lib/client/IncomingMessage.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-21 19:25:45 +0000
committerAlan Conway <aconway@apache.org>2007-02-21 19:25:45 +0000
commit876d0b94c37f252b08c81656386100fad18a8a46 (patch)
tree4840b0d697d4629fd5c518507b58fceb7de1578a /cpp/lib/client/IncomingMessage.cpp
parentc36fb4454be5ce4311aa5f5d0e5683db713c5545 (diff)
downloadqpid-python-876d0b94c37f252b08c81656386100fad18a8a46.tar.gz
Thread safety fixes for race conditions on incoming messages.
* cpp/lib/client/MessageListener.h: const correctness. * cpp/tests/*: MessageListener const change. * cpp/lib/broker/Content.h: Removed out-of-date FIXME comments. * cpp/lib/client/ClientChannel.h/ .cpp(): - added locking for consumers map and other member access. - refactored implementations of Basic get, deliver, return: most logic now encapsulted in IncomingMessage class. - fix channel close problems. * cpp/lib/client/ClientMessage.h/.cpp: - const correctness & API convenience fixes. - getMethod/setMethod/getHeader: for new IncomingMessage * cpp/lib/client/Connection.h/.cpp: - Fixes to channel closure. * cpp/lib/client/IncomingMessage.h/.cpp: - Encapsulate *all* incoming message handling for client. - Moved handling of BasicGetOk to IncomingMessage to fix race. - Thread safety fixes. * cpp/lib/client/ResponseHandler.h/.cpp: - added getResponse for ClientChannel. * cpp/lib/common/Exception.h: - added missing throwSelf implementations. - added ShutdownException as general purpose shut-down indicator. - added EmptyException as general purpose "empty" indicator. * cpp/lib/common/sys/Condition|Monitor|Mutex.h|.cpp: - Condition variable abstraction extracted from Monitor for situations where a single lock is associated with multiple conditions. * cpp/tests/ClientChannelTest.cpp: - Test incoming message transfer, get, consume etc. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510161 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/IncomingMessage.cpp')
-rw-r--r--cpp/lib/client/IncomingMessage.cpp152
1 files changed, 124 insertions, 28 deletions
diff --git a/cpp/lib/client/IncomingMessage.cpp b/cpp/lib/client/IncomingMessage.cpp
index c1f6ca880f..07f94ceb64 100644
--- a/cpp/lib/client/IncomingMessage.cpp
+++ b/cpp/lib/client/IncomingMessage.cpp
@@ -19,58 +19,154 @@
*
*/
#include <IncomingMessage.h>
+#include "framing/AMQHeaderBody.h"
+#include "framing/AMQContentBody.h"
+#include "BasicGetOkBody.h"
+#include "BasicReturnBody.h"
+#include "BasicDeliverBody.h"
#include <QpidError.h>
#include <iostream>
-using namespace qpid::client;
-using namespace qpid::framing;
+namespace qpid {
+namespace client {
-IncomingMessage::IncomingMessage(BasicDeliverBody::shared_ptr intro) : delivered(intro){}
-IncomingMessage::IncomingMessage(BasicReturnBody::shared_ptr intro): returned(intro){}
-IncomingMessage::IncomingMessage(BasicGetOkBody::shared_ptr intro): response(intro){}
+using namespace sys;
+using namespace framing;
-IncomingMessage::~IncomingMessage(){
+struct IncomingMessage::Guard: public Mutex::ScopedLock {
+ Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) {
+ im->shutdownError.throwIf();
+ }
+};
+
+IncomingMessage::IncomingMessage() { reset(); }
+
+void IncomingMessage::reset() {
+ state = &IncomingMessage::expectRequest;
+ endFn= &IncomingMessage::endRequest;
+ buildMessage = Message();
+}
+
+void IncomingMessage::startGet() {
+ Guard g(this);
+ if (state != &IncomingMessage::expectRequest) {
+ endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress."));
+ }
+ else {
+ state = &IncomingMessage::expectGetOk;
+ endFn = &IncomingMessage::endGet;
+ getError.reset();
+ getState = GETTING;
+ }
}
-void IncomingMessage::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
+bool IncomingMessage::waitGet(Message& msg) {
+ Guard g(this);
+ while (getState == GETTING && !shutdownError && !getError)
+ getReady.wait(lock);
+ shutdownError.throwIf();
+ getError.throwIf();
+ msg = getMessage;
+ return getState==GOT;
}
-void IncomingMessage::addContent(AMQContentBody::shared_ptr content){
- data.append(content->getData());
+Message IncomingMessage::waitDispatch() {
+ Guard g(this);
+ while(dispatchQueue.empty() && !shutdownError)
+ dispatchReady.wait(lock);
+ shutdownError.throwIf();
+
+ Message msg(dispatchQueue.front());
+ dispatchQueue.pop();
+ return msg;
}
-bool IncomingMessage::isComplete(){
- return header != 0 && header->getContentSize() == data.size();
+void IncomingMessage::add(BodyPtr body) {
+ Guard g(this);
+ shutdownError.throwIf();
+ // Call the current state function.
+ (this->*state)(body);
}
-bool IncomingMessage::isReturn(){
- return returned;
+void IncomingMessage::shutdown() {
+ Mutex::ScopedLock l(lock);
+ shutdownError.reset(new ShutdownException());
+ getReady.notify();
+ dispatchReady.notify();
}
-bool IncomingMessage::isDelivery(){
- return delivered;
+bool IncomingMessage::isShutdown() const {
+ Mutex::ScopedLock l(lock);
+ return shutdownError;
}
-bool IncomingMessage::isResponse(){
- return response;
+// Common check for all the expect functions. Called in network thread.
+template<class T>
+boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) {
+ boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body);
+ if (!ptr)
+ throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
+ return ptr;
}
-const string& IncomingMessage::getConsumerTag(){
- if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Consumer tag only valid for delivery");
- return delivered->getConsumerTag();
+void IncomingMessage::expectGetOk(BodyPtr body) {
+ if (dynamic_cast<BasicGetOkBody*>(body.get()))
+ state = &IncomingMessage::expectHeader;
+ else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) {
+ getState = EMPTY;
+ endGet();
+ }
+ else
+ throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
}
-u_int64_t IncomingMessage::getDeliveryTag(){
- if(!isDelivery()) THROW_QPID_ERROR(CLIENT_ERROR, "Delivery tag only valid for delivery");
- return delivered->getDeliveryTag();
+void IncomingMessage::expectHeader(BodyPtr body) {
+ AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body);
+ buildMessage.header = header;
+ state = &IncomingMessage::expectContent;
+ checkComplete();
}
-AMQHeaderBody::shared_ptr& IncomingMessage::getHeader(){
- return header;
+void IncomingMessage::expectContent(BodyPtr body) {
+ AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body);
+ buildMessage.setData(buildMessage.getData() + content->getData());
+ checkComplete();
+}
+
+void IncomingMessage::checkComplete() {
+ size_t declaredSize = buildMessage.header->getContentSize();
+ size_t currentSize = buildMessage.getData().size();
+ if (declaredSize == currentSize)
+ (this->*endFn)(0);
+ else if (declaredSize < currentSize)
+ (this->*endFn)(new QPID_ERROR(
+ PROTOCOL_ERROR, "Message content exceeds declared size."));
+}
+
+void IncomingMessage::expectRequest(BodyPtr body) {
+ AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body);
+ buildMessage.setMethod(method);
+ state = &IncomingMessage::expectHeader;
+}
+
+void IncomingMessage::endGet(Exception* ex) {
+ getError.reset(ex);
+ if (getState == GETTING) {
+ getMessage = buildMessage;
+ getState = GOT;
+ }
+ reset();
+ getReady.notify();
}
-std::string IncomingMessage::getData() const {
- return data;
+void IncomingMessage::endRequest(Exception* ex) {
+ ExceptionHolder eh(ex);
+ if (!eh) {
+ dispatchQueue.push(buildMessage);
+ reset();
+ dispatchReady.notify();
+ }
+ eh.throwIf();
}
+}} // namespace qpid::client