diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/messaging/amqp/ConnectionContext.cpp')
-rw-r--r-- | cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 612 |
1 files changed, 612 insertions, 0 deletions
diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp new file mode 100644 index 0000000000..b2a9b979b6 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -0,0 +1,612 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionContext.h" +#include "DriverImpl.h" +#include "ReceiverContext.h" +#include "Sasl.h" +#include "SenderContext.h" +#include "SessionContext.h" +#include "Transport.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/Uuid.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Time.h" +#include <vector> +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { + + +ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Variant::Map& o) + : qpid::messaging::ConnectionOptions(o), + url(u), + engine(pn_transport()), + connection(pn_connection()), + //note: disabled read/write of header as now handled by engine + writeHeader(false), + readHeader(false), + haveOutput(false), + state(DISCONNECTED), + codecSwitch(*this) +{ + if (pn_transport_bind(engine, connection)) { + //error + } + pn_connection_set_container(connection, "qpid::messaging");//TODO: take this from a connection option + bool enableTrace(false); + QPID_LOG_TEST_CAT(trace, protocol, enableTrace); + if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM); +} + +ConnectionContext::~ConnectionContext() +{ + close(); + sessions.clear(); + pn_transport_free(engine); + pn_connection_free(connection); +} + +namespace { +const std::string COLON(":"); +} +void ConnectionContext::open() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); + if (!driver) driver = DriverImpl::getDefault(); + + for (Url::const_iterator i = url.begin(); state != CONNECTED && i != url.end(); ++i) { + transport = driver->getTransport(i->protocol, *this); + std::stringstream port; + port << i->port; + id = i->host + COLON + port.str(); + if (useSasl()) { + sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, i->host)); + } + state = CONNECTING; + try { + QPID_LOG(debug, id << " Connecting ..."); + transport->connect(i->host, port.str()); + } catch (const std::exception& e) { + QPID_LOG(info, id << " Error while connecting: " << e.what()); + } + while (state == CONNECTING) { + lock.wait(); + } + if (state == DISCONNECTED) { + QPID_LOG(debug, id << " Failed to connect"); + transport = boost::shared_ptr<Transport>(); + } else { + QPID_LOG(debug, id << " Connected"); + } + } + + if (state != CONNECTED) throw qpid::messaging::TransportFailure(QPID_MSG("Could not connect to " << url)); + + if (sasl.get()) { + wakeupDriver(); + while (!sasl->authenticated()) { + QPID_LOG(debug, id << " Waiting to be authenticated..."); + wait(); + } + QPID_LOG(debug, id << " Authenticated"); + } + + QPID_LOG(debug, id << " Opening..."); + pn_connection_open(connection); + wakeupDriver(); //want to write + while (pn_connection_state(connection) & PN_REMOTE_UNINIT) { + wait(); + } + if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { + throw qpid::messaging::ConnectionError("Failed to open connection"); + } + QPID_LOG(debug, id << " Opened"); +} + +bool ConnectionContext::isOpen() const +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); +} + +void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + pn_session_close(ssn->session); + //TODO: need to destroy session and remove context from map + wakeupDriver(); +} + +void ConnectionContext::close() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (state != CONNECTED) return; + if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) { + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + //wait for outstanding sends to settle + while (!i->second->settled()) { + QPID_LOG(debug, "Waiting for sends to settle before closing"); + wait();//wait until message has been confirmed + } + + + if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) { + pn_session_close(i->second->session); + } + } + pn_connection_close(connection); + wakeupDriver(); + //wait for close to be confirmed by peer? + while (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) { + wait(); + } + sessions.clear(); + } + transport->close(); + while (state != DISCONNECTED) { + lock.wait(); + } +} + +bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (!lnk->capacity) { + pn_link_flow(lnk->receiver, 1); + wakeupDriver(); + } + } + if (get(ssn, lnk, message, timeout)) { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (lnk->capacity) { + pn_link_flow(lnk->receiver, 1);//TODO: is this the right approach? + wakeupDriver(); + } + return true; + } else { + { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + pn_link_drain(lnk->receiver, 0); + wakeupDriver(); + while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) { + QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); + wait(); + } + if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) { + pn_link_flow(lnk->receiver, lnk->capacity); + } + } + if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (lnk->capacity) { + pn_link_flow(lnk->receiver, 1); + wakeupDriver(); + } + return true; + } else { + return false; + } + } +} + +qpid::sys::AbsTime convert(qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until; + uint64_t ms = timeout.getMilliseconds(); + if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) { + return qpid::sys::AbsTime(qpid::sys::now(), ms * qpid::sys::TIME_MSEC); + } else { + return qpid::sys::FAR_FUTURE; + } +} + +bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) +{ + qpid::sys::AbsTime until(convert(timeout)); + while (true) { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver); + QPID_LOG(debug, "In ConnectionContext::get(), current=" << current); + if (current) { + qpid::messaging::MessageImpl& impl = MessageImplAccess::get(message); + boost::shared_ptr<EncodedMessage> encoded(new EncodedMessage(pn_delivery_pending(current))); + ssize_t read = pn_link_recv(lnk->receiver, encoded->getData(), encoded->getSize()); + if (read < 0) throw qpid::messaging::MessagingException("Failed to read message"); + encoded->trim((size_t) read); + QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: "); + encoded->init(impl); + impl.setEncoded(encoded); + impl.setInternalId(ssn->record(current)); + pn_link_advance(lnk->receiver); + return true; + } else if (until > qpid::sys::now()) { + wait(); + } else { + return false; + } + } + return false; +} + +void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (message) { + ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative); + } else { + ssn->acknowledge(); + } + wakeupDriver(); +} + + +void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + lnk->configure(); + attach(ssn->session, (pn_link_t*) lnk->sender); + if (!pn_link_remote_target((pn_link_t*) lnk->sender)) { + std::string msg("No such target : "); + msg += lnk->getTarget(); + throw qpid::messaging::NotFound(msg); + } +} + +void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + lnk->configure(); + attach(ssn->session, lnk->receiver, lnk->capacity); + if (!pn_link_remote_source(lnk->receiver)) { + std::string msg("No such source : "); + msg += lnk->getSource(); + throw qpid::messaging::NotFound(msg); + } +} + +void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int credit) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + QPID_LOG(debug, "Attaching link " << link << ", state=" << pn_link_state(link)); + pn_link_open(link); + QPID_LOG(debug, "Link attached " << link << ", state=" << pn_link_state(link)); + if (credit) pn_link_flow(link, credit); + wakeupDriver(); + while (pn_link_state(link) & PN_REMOTE_UNINIT) { + QPID_LOG(debug, "waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link)); + wait(); + } +} + +void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + SenderContext::Delivery* delivery(0); + while (!(delivery = snd->send(message))) { + QPID_LOG(debug, "Waiting for capacity..."); + wait();//wait for capacity + } + wakeupDriver(); + if (sync) { + while (!delivery->accepted()) { + QPID_LOG(debug, "Waiting for confirmation..."); + wait();//wait until message has been confirmed + } + } +} + +void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sender->setCapacity(capacity); +} +uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return sender->getCapacity(); +} +uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return sender->getUnsettled(); +} + +void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + receiver->setCapacity(capacity); + pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity()); + wakeupDriver(); +} +uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return receiver->getCapacity(); +} +uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return receiver->getAvailable(); +} +uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return receiver->getUnsettled(); +} + +void ConnectionContext::activateOutput() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + wakeupDriver(); +} +/** + * Expects lock to be held by caller + */ +void ConnectionContext::wakeupDriver() +{ + switch (state) { + case CONNECTED: + haveOutput = true; + transport->activateOutput(); + QPID_LOG(debug, "wakeupDriver()"); + break; + case DISCONNECTED: + case CONNECTING: + QPID_LOG(error, "wakeupDriver() called while not connected"); + break; + } +} + +void ConnectionContext::wait() +{ + lock.wait(); + if (state == DISCONNECTED) { + throw qpid::messaging::TransportFailure("Disconnected"); + } + //check for any closed links, sessions or indeed the connection +} + +boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (transactional) throw qpid::messaging::MessagingException("Transactions not yet supported"); + std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n; + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + boost::shared_ptr<SessionContext> s(new SessionContext(connection)); + s->session = pn_session(connection); + pn_session_open(s->session); + sessions[name] = s; + wakeupDriver(); + while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { + wait(); + } + return s; + } else { + throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); + } + +} +boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const +{ + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + throw qpid::messaging::KeyError(std::string("No such session") + name); + } else { + return i->second; + } +} + +void ConnectionContext::setOption(const std::string& name, const qpid::types::Variant& value) +{ + set(name, value); +} + +std::string ConnectionContext::getAuthenticatedUsername() +{ + return sasl.get() ? sasl->getAuthenticatedUsername() : std::string(); +} + +std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + QPID_LOG(trace, id << " decode(" << size << ")"); + if (readHeader) { + size_t decoded = readProtocolHeader(buffer, size); + if (decoded < size) { + decoded += decode(buffer + decoded, size - decoded); + } + return decoded; + } + + //TODO: Fix pn_engine_input() to take const buffer + ssize_t n = pn_transport_input(engine, const_cast<char*>(buffer), size); + if (n > 0 || n == PN_EOS) { + //If engine returns EOS, have no way of knowing how many bytes + //it processed, but can assume none need to be reprocessed so + //consider them all read: + if (n == PN_EOS) n = size; + QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size) + pn_transport_tick(engine, 0); + lock.notifyAll(); + return n; + } else if (n == PN_ERR) { + throw qpid::Exception(QPID_MSG("Error on input: " << getError())); + } else { + return 0; + } + +} +std::size_t ConnectionContext::encode(char* buffer, std::size_t size) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + QPID_LOG(trace, id << " encode(" << size << ")"); + if (writeHeader) { + size_t encoded = writeProtocolHeader(buffer, size); + if (encoded < size) { + encoded += encode(buffer + encoded, size - encoded); + } + return encoded; + } + + ssize_t n = pn_transport_output(engine, buffer, size); + if (n > 0) { + QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) + haveOutput = true; + return n; + } else if (n == PN_ERR) { + throw qpid::Exception(QPID_MSG("Error on output: " << getError())); + } else if (n == PN_EOS) { + haveOutput = false; + return 0;//Is this right? + } else { + haveOutput = false; + return 0; + } +} +bool ConnectionContext::canEncode() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + return haveOutput && state == CONNECTED; +} +void ConnectionContext::closed() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + state = DISCONNECTED; + lock.notifyAll(); +} +void ConnectionContext::opened() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + state = CONNECTED; + lock.notifyAll(); +} +bool ConnectionContext::isClosed() const +{ + return !isOpen(); +} +namespace { +qpid::framing::ProtocolVersion AMQP_1_0_PLAIN(1,0,qpid::framing::ProtocolVersion::AMQP); +} + +std::string ConnectionContext::getError() +{ + std::stringstream text; + pn_error_t* cerror = pn_connection_error(connection); + if (cerror) text << "connection error " << pn_error_text(cerror); + pn_error_t* terror = pn_transport_error(engine); + if (terror) text << "transport error " << pn_error_text(terror); + return text.str(); +} + +framing::ProtocolVersion ConnectionContext::getVersion() const +{ + return AMQP_1_0_PLAIN; +} + +std::size_t ConnectionContext::readProtocolHeader(const char* buffer, std::size_t size) +{ + framing::ProtocolInitiation pi(getVersion()); + if (size >= pi.encodedSize()) { + readHeader = false; + qpid::framing::Buffer out(const_cast<char*>(buffer), size); + pi.decode(out); + QPID_LOG_CAT(debug, protocol, id << " read protocol header: " << pi); + return pi.encodedSize(); + } else { + return 0; + } +} +std::size_t ConnectionContext::writeProtocolHeader(char* buffer, std::size_t size) +{ + framing::ProtocolInitiation pi(getVersion()); + if (size >= pi.encodedSize()) { + QPID_LOG_CAT(debug, protocol, id << " writing protocol header: " << pi); + writeHeader = false; + qpid::framing::Buffer out(buffer, size); + pi.encode(out); + return pi.encodedSize(); + } else { + QPID_LOG_CAT(debug, protocol, id << " insufficient buffer for protocol header: " << size) + return 0; + } +} +bool ConnectionContext::useSasl() +{ + return !(mechanism == "none" || mechanism == "NONE" || mechanism == "None"); +} + +qpid::sys::Codec& ConnectionContext::getCodec() +{ + return codecSwitch; +} + +ConnectionContext::CodecSwitch::CodecSwitch(ConnectionContext& p) : parent(p) {} +std::size_t ConnectionContext::CodecSwitch::decode(const char* buffer, std::size_t size) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock); + size_t decoded = 0; + if (parent.sasl.get() && !parent.sasl->authenticated()) { + decoded = parent.sasl->decode(buffer, size); + if (!parent.sasl->authenticated()) return decoded; + } + if (decoded < size) { + if (parent.sasl.get() && parent.sasl->getSecurityLayer()) decoded += parent.sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded); + else decoded += parent.decode(buffer+decoded, size-decoded); + } + return decoded; +} +std::size_t ConnectionContext::CodecSwitch::encode(char* buffer, std::size_t size) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock); + size_t encoded = 0; + if (parent.sasl.get() && parent.sasl->canEncode()) { + encoded += parent.sasl->encode(buffer, size); + if (!parent.sasl->authenticated()) return encoded; + } + if (encoded < size) { + if (parent.sasl.get() && parent.sasl->getSecurityLayer()) encoded += parent.sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded); + else encoded += parent.encode(buffer+encoded, size-encoded); + } + return encoded; +} +bool ConnectionContext::CodecSwitch::canEncode() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock); + if (parent.sasl.get()) { + if (parent.sasl->canEncode()) return true; + else if (!parent.sasl->authenticated()) return false; + else if (parent.sasl->getSecurityLayer()) return parent.sasl->getSecurityLayer()->canEncode(); + } + return parent.canEncode(); +} + + +}}} // namespace qpid::messaging::amqp |