diff options
Diffstat (limited to 'qpid/cpp-0-9/lib/client/IncomingMessage.cpp')
| -rw-r--r-- | qpid/cpp-0-9/lib/client/IncomingMessage.cpp | 172 |
1 files changed, 0 insertions, 172 deletions
diff --git a/qpid/cpp-0-9/lib/client/IncomingMessage.cpp b/qpid/cpp-0-9/lib/client/IncomingMessage.cpp deleted file mode 100644 index 07f94ceb64..0000000000 --- a/qpid/cpp-0-9/lib/client/IncomingMessage.cpp +++ /dev/null @@ -1,172 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <IncomingMessage.h> -#include "framing/AMQHeaderBody.h" -#include "framing/AMQContentBody.h" -#include "BasicGetOkBody.h" -#include "BasicReturnBody.h" -#include "BasicDeliverBody.h" -#include <QpidError.h> -#include <iostream> - -namespace qpid { -namespace client { - -using namespace sys; -using namespace framing; - -struct IncomingMessage::Guard: public Mutex::ScopedLock { - Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) { - im->shutdownError.throwIf(); - } -}; - -IncomingMessage::IncomingMessage() { reset(); } - -void IncomingMessage::reset() { - state = &IncomingMessage::expectRequest; - endFn= &IncomingMessage::endRequest; - buildMessage = Message(); -} - -void IncomingMessage::startGet() { - Guard g(this); - if (state != &IncomingMessage::expectRequest) { - endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress.")); - } - else { - state = &IncomingMessage::expectGetOk; - endFn = &IncomingMessage::endGet; - getError.reset(); - getState = GETTING; - } -} - -bool IncomingMessage::waitGet(Message& msg) { - Guard g(this); - while (getState == GETTING && !shutdownError && !getError) - getReady.wait(lock); - shutdownError.throwIf(); - getError.throwIf(); - msg = getMessage; - return getState==GOT; -} - -Message IncomingMessage::waitDispatch() { - Guard g(this); - while(dispatchQueue.empty() && !shutdownError) - dispatchReady.wait(lock); - shutdownError.throwIf(); - - Message msg(dispatchQueue.front()); - dispatchQueue.pop(); - return msg; -} - -void IncomingMessage::add(BodyPtr body) { - Guard g(this); - shutdownError.throwIf(); - // Call the current state function. - (this->*state)(body); -} - -void IncomingMessage::shutdown() { - Mutex::ScopedLock l(lock); - shutdownError.reset(new ShutdownException()); - getReady.notify(); - dispatchReady.notify(); -} - -bool IncomingMessage::isShutdown() const { - Mutex::ScopedLock l(lock); - return shutdownError; -} - -// Common check for all the expect functions. Called in network thread. -template<class T> -boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) { - boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body); - if (!ptr) - throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); - return ptr; -} - -void IncomingMessage::expectGetOk(BodyPtr body) { - if (dynamic_cast<BasicGetOkBody*>(body.get())) - state = &IncomingMessage::expectHeader; - else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) { - getState = EMPTY; - endGet(); - } - else - throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); -} - -void IncomingMessage::expectHeader(BodyPtr body) { - AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body); - buildMessage.header = header; - state = &IncomingMessage::expectContent; - checkComplete(); -} - -void IncomingMessage::expectContent(BodyPtr body) { - AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body); - buildMessage.setData(buildMessage.getData() + content->getData()); - checkComplete(); -} - -void IncomingMessage::checkComplete() { - size_t declaredSize = buildMessage.header->getContentSize(); - size_t currentSize = buildMessage.getData().size(); - if (declaredSize == currentSize) - (this->*endFn)(0); - else if (declaredSize < currentSize) - (this->*endFn)(new QPID_ERROR( - PROTOCOL_ERROR, "Message content exceeds declared size.")); -} - -void IncomingMessage::expectRequest(BodyPtr body) { - AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body); - buildMessage.setMethod(method); - state = &IncomingMessage::expectHeader; -} - -void IncomingMessage::endGet(Exception* ex) { - getError.reset(ex); - if (getState == GETTING) { - getMessage = buildMessage; - getState = GOT; - } - reset(); - getReady.notify(); -} - -void IncomingMessage::endRequest(Exception* ex) { - ExceptionHolder eh(ex); - if (!eh) { - dispatchQueue.push(buildMessage); - reset(); - dispatchReady.notify(); - } - eh.throwIf(); -} - -}} // namespace qpid::client |
