diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qpid/client/ConnectionHandler.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ConnectionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 356 |
1 files changed, 0 insertions, 356 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp deleted file mode 100644 index 4fbf55aa60..0000000000 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ /dev/null @@ -1,356 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/client/ConnectionHandler.h" - -#include "qpid/SaslFactory.h" -#include "qpid/StringUtils.h" -#include "qpid/client/Bounds.h" -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/all_method_bodies.h" -#include "qpid/framing/ClientInvoker.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/log/Helpers.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/SystemInfo.h" - -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::framing::connection; -using qpid::sys::SecurityLayer; -using qpid::sys::Duration; -using qpid::sys::TimerTask; -using qpid::sys::Timer; -using qpid::sys::AbsTime; -using qpid::sys::TIME_SEC; -using qpid::sys::ScopedLock; -using qpid::sys::Mutex; - -namespace { -const std::string OK("OK"); -const std::string PLAIN("PLAIN"); -const std::string en_US("en_US"); - -const std::string INVALID_STATE_START("start received in invalid state"); -const std::string INVALID_STATE_TUNE("tune received in invalid state"); -const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state"); -const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state"); - -const std::string SESSION_FLOW_CONTROL("qpid.session_flow"); -const std::string CLIENT_PROCESS_NAME("qpid.client_process"); -const std::string CLIENT_PID("qpid.client_pid"); -const std::string CLIENT_PPID("qpid.client_ppid"); -const int SESSION_FLOW_CONTROL_VER = 1; -} - -CloseCode ConnectionHandler::convert(uint16_t replyCode) -{ - switch (replyCode) { - case 200: return CLOSE_CODE_NORMAL; - case 320: return CLOSE_CODE_CONNECTION_FORCED; - case 402: return CLOSE_CODE_INVALID_PATH; - case 501: default: - return CLOSE_CODE_FRAMING_ERROR; - } -} - -ConnectionHandler::Adapter::Adapter(ConnectionHandler& h, Bounds& b) : handler(h), bounds(b) {} -void ConnectionHandler::Adapter::handle(qpid::framing::AMQFrame& f) -{ - bounds.expand(f.encodedSize(), false); - handler.out(f); -} - -ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v, Bounds& b) - : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this, b), proxy(outHandler), - errorCode(CLOSE_CODE_NORMAL), version(v) -{ - insist = true; - - ESTABLISHED.insert(FAILED); - ESTABLISHED.insert(CLOSED); - ESTABLISHED.insert(OPEN); - - FINISHED.insert(FAILED); - FINISHED.insert(CLOSED); - - properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER); - properties.setString(CLIENT_PROCESS_NAME, sys::SystemInfo::getProcessName()); - properties.setInt(CLIENT_PID, sys::SystemInfo::getProcessId()); - properties.setInt(CLIENT_PPID, sys::SystemInfo::getParentProcessId()); -} - -void ConnectionHandler::incoming(AMQFrame& frame) -{ - if (getState() == CLOSED) { - throw Exception("Received frame on closed connection"); - } - - if (rcvTimeoutTask) { - // Received frame on connection so delay timeout - rcvTimeoutTask->restart(); - } - - AMQBody* body = frame.getBody(); - try { - if (frame.getChannel() != 0 || !invoke(static_cast<ConnectionOperations&>(*this), *body)) { - switch(getState()) { - case OPEN: - in(frame); - break; - case CLOSING: - QPID_LOG(warning, "Ignoring frame while closing connection: " << frame); - break; - default: - throw Exception("Cannot receive frames on non-zero channel until connection is established."); - } - } - }catch(std::exception& e){ - QPID_LOG(warning, "Closing connection due to " << e.what()); - setState(CLOSING); - errorCode = CLOSE_CODE_FRAMING_ERROR; - errorText = e.what(); - proxy.close(501, e.what()); - } -} - -void ConnectionHandler::outgoing(AMQFrame& frame) -{ - if (getState() == OPEN) - out(frame); - else - throw TransportFailure(errorText.empty() ? "Connection is not open." : errorText); -} - -void ConnectionHandler::waitForOpen() -{ - waitFor(ESTABLISHED); - if (getState() == FAILED || getState() == CLOSED) { - throw ConnectionException(errorCode, errorText); - } -} - -void ConnectionHandler::close() -{ - switch (getState()) { - case NEGOTIATING: - case OPENING: - fail("Connection closed before it was established"); - break; - case OPEN: - if (setState(CLOSING, OPEN)) { - proxy.close(200, OK); - if (ConnectionSettings::heartbeat) { - //heartbeat timer is turned off at this stage, so don't wait indefinately - if (!waitFor(FINISHED, qpid::sys::Duration(ConnectionSettings::heartbeat * qpid::sys::TIME_SEC))) { - QPID_LOG(warning, "Connection close timed out"); - } - } else { - waitFor(FINISHED);//FINISHED = CLOSED or FAILED - } - } - //else, state was changed from open after we checked, can only - //change to failed or closed, so nothing to do - break; - - // Nothing to do if already CLOSING, CLOSED, FAILED or if NOT_STARTED - } -} - -void ConnectionHandler::heartbeat() -{ - // Do nothing - the purpose of heartbeats is just to make sure that there is some - // traffic on the connection within the heart beat interval, we check for the - // traffic and don't need to do anything in response to heartbeats - - // Although the above is still true we're now using a received heartbeat as a trigger - // to send out our own heartbeat - proxy.heartbeat(); -} - -void ConnectionHandler::checkState(STATES s, const std::string& msg) -{ - if (getState() != s) { - throw CommandInvalidException(msg); - } -} - -void ConnectionHandler::fail(const std::string& message) -{ - errorCode = CLOSE_CODE_FRAMING_ERROR; - errorText = message; - QPID_LOG(warning, message); - setState(FAILED); -} - -namespace { -std::string SPACE(" "); - -std::string join(const std::vector<std::string>& in) -{ - std::string result; - for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) { - if (result.size()) result += SPACE; - result += *i; - } - return result; -} - -void intersection(const std::vector<std::string>& a, const std::vector<std::string>& b, std::vector<std::string>& results) -{ - for (std::vector<std::string>::const_iterator i = a.begin(); i != a.end(); ++i) { - if (std::find(b.begin(), b.end(), *i) != b.end()) results.push_back(*i); - } -} - -} - -void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& mechanisms, const Array& /*locales*/) -{ - checkState(NOT_STARTED, INVALID_STATE_START); - setState(NEGOTIATING); - sasl = SaslFactory::getInstance().create( username, - password, - service, - host, - minSsf, - maxSsf - ); - - std::vector<std::string> mechlist; - if (mechanism.empty()) { - //mechlist is simply what the server offers - mechanisms.collect(mechlist); - } else { - //mechlist is the intersection of those indicated by user and - //those supported by server, in the order listed by user - std::vector<std::string> allowed = split(mechanism, " "); - std::vector<std::string> supported; - mechanisms.collect(supported); - intersection(allowed, supported, mechlist); - if (mechlist.empty()) { - throw Exception(QPID_MSG("Desired mechanism(s) not valid: " << mechanism << " (supported: " << join(supported) << ")")); - } - } - - if (sasl.get()) { - string response = sasl->start(join(mechlist), getSecuritySettings ? getSecuritySettings() : 0); - proxy.startOk(properties, sasl->getMechanism(), response, locale); - } else { - //TODO: verify that desired mechanism and locale are supported - string response = ((char)0) + username + ((char)0) + password; - proxy.startOk(properties, mechanism, response, locale); - } -} - -void ConnectionHandler::secure(const std::string& challenge) -{ - if (sasl.get()) { - string response = sasl->step(challenge); - proxy.secureOk(response); - } else { - throw NotImplementedException("Challenge-response cycle not yet implemented in client"); - } -} - -void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed, - uint16_t heartbeatMin, uint16_t heartbeatMax) -{ - checkState(NEGOTIATING, INVALID_STATE_TUNE); - maxChannels = std::min(maxChannels, maxChannelsProposed); - maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed); - // Clip the requested heartbeat to the maximum/minimum offered - uint16_t heartbeat = ConnectionSettings::heartbeat; - heartbeat = heartbeat < heartbeatMin ? heartbeatMin : - heartbeat > heartbeatMax ? heartbeatMax : - heartbeat; - ConnectionSettings::heartbeat = heartbeat; - proxy.tuneOk(maxChannels, maxFrameSize, heartbeat); - setState(OPENING); - proxy.open(virtualhost, capabilities, insist); -} - -void ConnectionHandler::openOk ( const Array& knownBrokers ) -{ - checkState(OPENING, INVALID_STATE_OPEN_OK); - knownBrokersUrls.clear(); - framing::Array::ValueVector::const_iterator i; - for ( i = knownBrokers.begin(); i != knownBrokers.end(); ++i ) - knownBrokersUrls.push_back(Url((*i)->get<std::string>())); - if (sasl.get()) { - securityLayer = sasl->getSecurityLayer(maxFrameSize); - operUserId = sasl->getUserId(); - } - setState(OPEN); - QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls)); -} - - -void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*knownHosts*/) -{ - throw NotImplementedException("Redirection received from broker; not yet implemented in client"); -} - -void ConnectionHandler::close(uint16_t replyCode, const std::string& replyText) -{ - proxy.closeOk(); - errorCode = convert(replyCode); - errorText = replyText; - setState(CLOSED); - QPID_LOG(warning, "Broker closed connection: " << replyCode << ", " << replyText); - if (onError) { - onError(replyCode, replyText); - } -} - -void ConnectionHandler::closeOk() -{ - checkState(CLOSING, INVALID_STATE_CLOSE_OK); - if (onError && errorCode != CLOSE_CODE_NORMAL) { - onError(errorCode, errorText); - } else if (onClose) { - onClose(); - } - setState(CLOSED); -} - -bool ConnectionHandler::isOpen() const -{ - return getState() == OPEN; -} - -bool ConnectionHandler::isClosed() const -{ - int s = getState(); - return s == CLOSED || s == FAILED; -} - -bool ConnectionHandler::isClosing() const { return getState() == CLOSING; } - -std::auto_ptr<qpid::sys::SecurityLayer> ConnectionHandler::getSecurityLayer() -{ - return securityLayer; -} - -void ConnectionHandler::setRcvTimeoutTask(boost::intrusive_ptr<qpid::sys::TimerTask> t) -{ - rcvTimeoutTask = t; -} |