summaryrefslogtreecommitdiff
path: root/qpid/cpp-0-9/lib/client/IncomingMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp-0-9/lib/client/IncomingMessage.cpp')
-rw-r--r--qpid/cpp-0-9/lib/client/IncomingMessage.cpp172
1 files changed, 0 insertions, 172 deletions
diff --git a/qpid/cpp-0-9/lib/client/IncomingMessage.cpp b/qpid/cpp-0-9/lib/client/IncomingMessage.cpp
deleted file mode 100644
index 07f94ceb64..0000000000
--- a/qpid/cpp-0-9/lib/client/IncomingMessage.cpp
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <IncomingMessage.h>
-#include "framing/AMQHeaderBody.h"
-#include "framing/AMQContentBody.h"
-#include "BasicGetOkBody.h"
-#include "BasicReturnBody.h"
-#include "BasicDeliverBody.h"
-#include <QpidError.h>
-#include <iostream>
-
-namespace qpid {
-namespace client {
-
-using namespace sys;
-using namespace framing;
-
-struct IncomingMessage::Guard: public Mutex::ScopedLock {
- Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) {
- im->shutdownError.throwIf();
- }
-};
-
-IncomingMessage::IncomingMessage() { reset(); }
-
-void IncomingMessage::reset() {
- state = &IncomingMessage::expectRequest;
- endFn= &IncomingMessage::endRequest;
- buildMessage = Message();
-}
-
-void IncomingMessage::startGet() {
- Guard g(this);
- if (state != &IncomingMessage::expectRequest) {
- endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress."));
- }
- else {
- state = &IncomingMessage::expectGetOk;
- endFn = &IncomingMessage::endGet;
- getError.reset();
- getState = GETTING;
- }
-}
-
-bool IncomingMessage::waitGet(Message& msg) {
- Guard g(this);
- while (getState == GETTING && !shutdownError && !getError)
- getReady.wait(lock);
- shutdownError.throwIf();
- getError.throwIf();
- msg = getMessage;
- return getState==GOT;
-}
-
-Message IncomingMessage::waitDispatch() {
- Guard g(this);
- while(dispatchQueue.empty() && !shutdownError)
- dispatchReady.wait(lock);
- shutdownError.throwIf();
-
- Message msg(dispatchQueue.front());
- dispatchQueue.pop();
- return msg;
-}
-
-void IncomingMessage::add(BodyPtr body) {
- Guard g(this);
- shutdownError.throwIf();
- // Call the current state function.
- (this->*state)(body);
-}
-
-void IncomingMessage::shutdown() {
- Mutex::ScopedLock l(lock);
- shutdownError.reset(new ShutdownException());
- getReady.notify();
- dispatchReady.notify();
-}
-
-bool IncomingMessage::isShutdown() const {
- Mutex::ScopedLock l(lock);
- return shutdownError;
-}
-
-// Common check for all the expect functions. Called in network thread.
-template<class T>
-boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) {
- boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body);
- if (!ptr)
- throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
- return ptr;
-}
-
-void IncomingMessage::expectGetOk(BodyPtr body) {
- if (dynamic_cast<BasicGetOkBody*>(body.get()))
- state = &IncomingMessage::expectHeader;
- else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) {
- getState = EMPTY;
- endGet();
- }
- else
- throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type");
-}
-
-void IncomingMessage::expectHeader(BodyPtr body) {
- AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body);
- buildMessage.header = header;
- state = &IncomingMessage::expectContent;
- checkComplete();
-}
-
-void IncomingMessage::expectContent(BodyPtr body) {
- AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body);
- buildMessage.setData(buildMessage.getData() + content->getData());
- checkComplete();
-}
-
-void IncomingMessage::checkComplete() {
- size_t declaredSize = buildMessage.header->getContentSize();
- size_t currentSize = buildMessage.getData().size();
- if (declaredSize == currentSize)
- (this->*endFn)(0);
- else if (declaredSize < currentSize)
- (this->*endFn)(new QPID_ERROR(
- PROTOCOL_ERROR, "Message content exceeds declared size."));
-}
-
-void IncomingMessage::expectRequest(BodyPtr body) {
- AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body);
- buildMessage.setMethod(method);
- state = &IncomingMessage::expectHeader;
-}
-
-void IncomingMessage::endGet(Exception* ex) {
- getError.reset(ex);
- if (getState == GETTING) {
- getMessage = buildMessage;
- getState = GOT;
- }
- reset();
- getReady.notify();
-}
-
-void IncomingMessage::endRequest(Exception* ex) {
- ExceptionHolder eh(ex);
- if (!eh) {
- dispatchQueue.push(buildMessage);
- reset();
- dispatchReady.notify();
- }
- eh.throwIf();
-}
-
-}} // namespace qpid::client