diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/sys/ssl | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/ssl')
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslHandler.cpp | 217 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslHandler.h | 85 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslIo.cpp | 470 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslIo.h | 193 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslSocket.cpp | 182 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslSocket.h | 32 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ssl/util.cpp | 19 |
7 files changed, 95 insertions, 1103 deletions
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp deleted file mode 100644 index eeb8c26a76..0000000000 --- a/cpp/src/qpid/sys/ssl/SslHandler.cpp +++ /dev/null @@ -1,217 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/sys/ssl/SslHandler.h" -#include "qpid/sys/ssl/SslIo.h" -#include "qpid/sys/ssl/SslSocket.h" -#include "qpid/sys/Timer.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/log/Statement.h" - -#include <boost/bind.hpp> - -namespace qpid { -namespace sys { -namespace ssl { - - -struct ProtocolTimeoutTask : public sys::TimerTask { - SslHandler& handler; - std::string id; - - ProtocolTimeoutTask(const std::string& i, const Duration& timeout, SslHandler& h) : - TimerTask(timeout, "ProtocolTimeout"), - handler(h), - id(i) - {} - - void fire() { - // If this fires it means that we didn't negotiate the connection in the timeout period - // Schedule closing the connection for the io thread - QPID_LOG(error, "Connection " << id << " No protocol received closing"); - handler.abort(); - } -}; - -SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict) : - identifier(id), - aio(0), - factory(f), - codec(0), - readError(false), - isClient(false), - nodict(_nodict) -{} - -SslHandler::~SslHandler() { - if (codec) - codec->closed(); - if (timeoutTimerTask) - timeoutTimerTask->cancel(); - delete codec; -} - -void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime) { - aio = a; - - // Start timer for this connection - timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this); - timer.add(timeoutTimerTask); - - // Give connection some buffers to use - aio->createBuffers(); -} - -void SslHandler::write(const framing::ProtocolInitiation& data) -{ - QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")"); - SslIO::BufferBase* buff = aio->getQueuedBuffer(); - assert(buff); - framing::Buffer out(buff->bytes, buff->byteCount); - data.encode(out); - buff->dataCount = data.encodedSize(); - aio->queueWrite(buff); -} - -void SslHandler::abort() { - // Don't disconnect if we're already disconnecting - if (!readError) { - aio->requestCallback(boost::bind(&SslHandler::eof, this, _1)); - } -} -void SslHandler::activateOutput() { - aio->notifyPendingWrite(); -} - -void SslHandler::giveReadCredit(int32_t) { - // FIXME aconway 2008-12-05: not yet implemented. -} - -// Input side -void SslHandler::readbuff(SslIO& , SslIO::BufferBase* buff) { - if (readError) { - return; - } - size_t decoded = 0; - if (codec) { // Already initiated - try { - decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); - }catch(const std::exception& e){ - QPID_LOG(error, e.what()); - readError = true; - aio->queueWriteClose(); - } - }else{ - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); - framing::ProtocolInitiation protocolInit; - if (protocolInit.decode(in)) { - // We've just got the protocol negotiation so we can cancel the timeout for that - timeoutTimerTask->cancel(); - - decoded = in.getPosition(); - QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); - try { - codec = factory->create(protocolInit.getVersion(), *this, identifier, getSecuritySettings(aio)); - if (!codec) { - //TODO: may still want to revise this... - //send valid version header & close connection. - write(framing::ProtocolInitiation(framing::highestProtocolVersion)); - readError = true; - aio->queueWriteClose(); - } - } catch (const std::exception& e) { - QPID_LOG(error, e.what()); - readError = true; - aio->queueWriteClose(); - } - } - } - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (decoded != size_t(buff->dataCount)) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += decoded; - buff->dataCount -= decoded; - aio->unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio->queueReadBuffer(buff); - } -} - -void SslHandler::eof(SslIO&) { - QPID_LOG(debug, "DISCONNECTED [" << identifier << "]"); - if (codec) codec->closed(); - aio->queueWriteClose(); -} - -void SslHandler::closedSocket(SslIO&, const SslSocket& s) { - // If we closed with data still to send log a warning - if (!aio->writeQueueEmpty()) { - QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)"); - } - delete &s; - aio->queueForDeletion(); - delete this; -} - -void SslHandler::disconnect(SslIO& a) { - // treat the same as eof - eof(a); -} - -// Notifications -void SslHandler::nobuffs(SslIO&) { -} - -void SslHandler::idle(SslIO&){ - if (isClient && codec == 0) { - codec = factory->create(*this, identifier, getSecuritySettings(aio)); - write(framing::ProtocolInitiation(codec->getVersion())); - // We've just sent the protocol negotiation so we can cancel the timeout for that - // This is not ideal, because we've not received anything yet, but heartbeats will - // be active soon - timeoutTimerTask->cancel(); - return; - } - if (codec == 0) return; - if (!codec->canEncode()) { - return; - } - SslIO::BufferBase* buff = aio->getQueuedBuffer(); - if (buff) { - size_t encoded=codec->encode(buff->bytes, buff->byteCount); - buff->dataCount = encoded; - aio->queueWrite(buff); - } - if (codec->isClosed()) - aio->queueWriteClose(); -} - -SecuritySettings SslHandler::getSecuritySettings(SslIO* aio) -{ - SecuritySettings settings = aio->getSecuritySettings(); - settings.nodict = nodict; - return settings; -} - - -}}} // namespace qpid::sys::ssl diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h deleted file mode 100644 index 14814b0281..0000000000 --- a/cpp/src/qpid/sys/ssl/SslHandler.h +++ /dev/null @@ -1,85 +0,0 @@ -#ifndef QPID_SYS_SSL_SSLHANDLER_H -#define QPID_SYS_SSL_SSLHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/ConnectionCodec.h" -#include "qpid/sys/OutputControl.h" - -#include <boost/intrusive_ptr.hpp> - -namespace qpid { - -namespace framing { - class ProtocolInitiation; -} - -namespace sys { - -class Timer; -class TimerTask; - -namespace ssl { - -class SslIO; -struct SslIOBufferBase; -class SslSocket; - -class SslHandler : public OutputControl { - std::string identifier; - SslIO* aio; - ConnectionCodec::Factory* factory; - ConnectionCodec* codec; - bool readError; - bool isClient; - bool nodict; - boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask; - - void write(const framing::ProtocolInitiation&); - qpid::sys::SecuritySettings getSecuritySettings(SslIO* aio); - - public: - SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict); - ~SslHandler(); - void init(SslIO* a, Timer& timer, uint32_t maxTime); - - void setClient() { isClient = true; } - - // Output side - void abort(); - void activateOutput(); - void giveReadCredit(int32_t); - - // Input side - void readbuff(SslIO& aio, SslIOBufferBase* buff); - void eof(SslIO& aio); - void disconnect(SslIO& aio); - - // Notifications - void nobuffs(SslIO& aio); - void idle(SslIO& aio); - void closedSocket(SslIO& aio, const SslSocket& s); -}; - -}}} // namespace qpid::sys::ssl - -#endif /*!QPID_SYS_SSL_SSLHANDLER_H*/ diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp deleted file mode 100644 index bbfb703170..0000000000 --- a/cpp/src/qpid/sys/ssl/SslIo.cpp +++ /dev/null @@ -1,470 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/ssl/SslIo.h" -#include "qpid/sys/ssl/SslSocket.h" -#include "qpid/sys/ssl/check.h" - -#include "qpid/sys/Time.h" -#include "qpid/sys/posix/check.h" -#include "qpid/log/Statement.h" - -// TODO The basic algorithm here is not really POSIX specific and with a bit more abstraction -// could (should) be promoted to be platform portable -#include <unistd.h> -#include <sys/socket.h> -#include <signal.h> -#include <errno.h> -#include <string.h> - -#include <boost/bind.hpp> - -namespace qpid { -namespace sys { -namespace ssl { - -namespace { - -/* - * Make *process* not generate SIGPIPE when writing to closed - * pipe/socket (necessary as default action is to terminate process) - */ -void ignoreSigpipe() { - ::signal(SIGPIPE, SIG_IGN); -} - -/* - * We keep per thread state to avoid locking overhead. The assumption is that - * on average all the connections are serviced by all the threads so the state - * recorded in each thread is about the same. If this turns out not to be the - * case we could rebalance the info occasionally. - */ -__thread int threadReadTotal = 0; -__thread int threadReadCount = 0; -__thread int threadWriteTotal = 0; -__thread int threadWriteCount = 0; -__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms -} - -/* - * Asynch Acceptor - */ - -template <class T> -SslAcceptorTmpl<T>::SslAcceptorTmpl(const T& s, Callback callback) : - acceptedCallback(callback), - handle(s, boost::bind(&SslAcceptorTmpl<T>::readable, this, _1), 0, 0), - socket(s) { - - s.setNonblocking(); - ignoreSigpipe(); -} - -template <class T> -SslAcceptorTmpl<T>::~SslAcceptorTmpl() -{ - handle.stopWatch(); -} - -template <class T> -void SslAcceptorTmpl<T>::start(Poller::shared_ptr poller) { - handle.startWatch(poller); -} - -/* - * We keep on accepting as long as there is something to accept - */ -template <class T> -void SslAcceptorTmpl<T>::readable(DispatchHandle& h) { - Socket* s; - do { - errno = 0; - // TODO: Currently we ignore the peers address, perhaps we should - // log it or use it for connection acceptance. - try { - s = socket.accept(); - if (s) { - acceptedCallback(*s); - } else { - break; - } - } catch (const std::exception& e) { - QPID_LOG(error, "Could not accept socket: " << e.what()); - } - } while (true); - - h.rewatch(); -} - -// Explicitly instantiate the templates we need -template class SslAcceptorTmpl<SslSocket>; -template class SslAcceptorTmpl<SslMuxSocket>; - -/* - * Asynch Connector - */ - -SslConnector::SslConnector(const SslSocket& s, - Poller::shared_ptr poller, - std::string hostname, - std::string port, - ConnectedCallback connCb, - FailedCallback failCb) : - DispatchHandle(s, - 0, - boost::bind(&SslConnector::connComplete, this, _1), - boost::bind(&SslConnector::connComplete, this, _1)), - connCallback(connCb), - failCallback(failCb), - socket(s) -{ - //TODO: would be better for connect to be performed on a - //non-blocking socket, but that doesn't work at present so connect - //blocks until complete - try { - socket.connect(hostname, port); - socket.setNonblocking(); - startWatch(poller); - } catch(std::exception& e) { - failure(-1, std::string(e.what())); - } -} - -void SslConnector::connComplete(DispatchHandle& h) -{ - int errCode = socket.getError(); - - h.stopWatch(); - if (errCode == 0) { - connCallback(socket); - DispatchHandle::doDelete(); - } else { - // TODO: This need to be fixed as strerror isn't thread safe - failure(errCode, std::string(::strerror(errCode))); - } -} - -void SslConnector::failure(int errCode, std::string message) -{ - if (failCallback) - failCallback(errCode, message); - - socket.close(); - delete &socket; - - DispatchHandle::doDelete(); -} - -/* - * Asynch reader/writer - */ -SslIO::SslIO(const SslSocket& s, - ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : - - DispatchHandle(s, - boost::bind(&SslIO::readable, this, _1), - boost::bind(&SslIO::writeable, this, _1), - boost::bind(&SslIO::disconnected, this, _1)), - readCallback(rCb), - eofCallback(eofCb), - disCallback(disCb), - closedCallback(cCb), - emptyCallback(eCb), - idleCallback(iCb), - socket(s), - queuedClose(false), - writePending(false) { - - s.setNonblocking(); -} - -SslIO::~SslIO() { -} - -void SslIO::queueForDeletion() { - DispatchHandle::doDelete(); -} - -void SslIO::start(Poller::shared_ptr poller) { - DispatchHandle::startWatch(poller); -} - -void SslIO::createBuffers(uint32_t size) { - // Allocate all the buffer memory at once - bufferMemory.reset(new char[size*BufferCount]); - - // Create the Buffer structs in a vector - // And push into the buffer queue - buffers.reserve(BufferCount); - for (uint32_t i = 0; i < BufferCount; i++) { - buffers.push_back(BufferBase(&bufferMemory[i*size], size)); - queueReadBuffer(&buffers[i]); - } -} - -void SslIO::queueReadBuffer(BufferBase* buff) { - assert(buff); - buff->dataStart = 0; - buff->dataCount = 0; - bufferQueue.push_back(buff); - DispatchHandle::rewatchRead(); -} - -void SslIO::unread(BufferBase* buff) { - assert(buff); - if (buff->dataStart != 0) { - memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); - buff->dataStart = 0; - } - bufferQueue.push_front(buff); - DispatchHandle::rewatchRead(); -} - -void SslIO::queueWrite(BufferBase* buff) { - assert(buff); - // If we've already closed the socket then throw the write away - if (queuedClose) { - bufferQueue.push_front(buff); - return; - } else { - writeQueue.push_front(buff); - } - writePending = false; - DispatchHandle::rewatchWrite(); -} - -void SslIO::notifyPendingWrite() { - writePending = true; - DispatchHandle::rewatchWrite(); -} - -void SslIO::queueWriteClose() { - queuedClose = true; - DispatchHandle::rewatchWrite(); -} - -void SslIO::requestCallback(RequestCallback callback) { - // TODO creating a function object every time isn't all that - // efficient - if this becomes heavily used do something better (what?) - assert(callback); - DispatchHandle::call(boost::bind(&SslIO::requestedCall, this, callback)); -} - -void SslIO::requestedCall(RequestCallback callback) { - assert(callback); - callback(*this); -} - -/** Return a queued buffer if there are enough - * to spare - */ -SslIO::BufferBase* SslIO::getQueuedBuffer() { - // Always keep at least one buffer (it might have data that was "unread" in it) - if (bufferQueue.size()<=1) - return 0; - BufferBase* buff = bufferQueue.back(); - assert(buff); - buff->dataStart = 0; - buff->dataCount = 0; - bufferQueue.pop_back(); - return buff; -} - -/* - * We keep on reading as long as we have something to read and a buffer to put - * it in - */ -void SslIO::readable(DispatchHandle& h) { - AbsTime readStartTime = AbsTime::now(); - do { - // (Try to) get a buffer - if (!bufferQueue.empty()) { - // Read into buffer - BufferBase* buff = bufferQueue.front(); - assert(buff); - bufferQueue.pop_front(); - errno = 0; - int readCount = buff->byteCount-buff->dataCount; - int rc = socket.read(buff->bytes + buff->dataCount, readCount); - if (rc > 0) { - buff->dataCount += rc; - threadReadTotal += rc; - - readCallback(*this, buff); - if (rc != readCount) { - // If we didn't fill the read buffer then time to stop reading - break; - } - - // Stop reading if we've overrun our timeslot - if (Duration(readStartTime, AbsTime::now()) > threadMaxIoTimeNs) { - break; - } - - } else { - // Put buffer back (at front so it doesn't interfere with unread buffers) - bufferQueue.push_front(buff); - assert(buff); - - // Eof or other side has gone away - if (rc == 0 || errno == ECONNRESET) { - eofCallback(*this); - h.unwatchRead(); - break; - } else if (errno == EAGAIN) { - // We have just put a buffer back so we know - // we can carry on watching for reads - break; - } else { - // Report error then just treat as a socket disconnect - QPID_LOG(error, "Error reading socket: " << getErrorString(PR_GetError())); - eofCallback(*this); - h.unwatchRead(); - break; - } - } - } else { - // Something to read but no buffer - if (emptyCallback) { - emptyCallback(*this); - } - // If we still have no buffers we can't do anything more - if (bufferQueue.empty()) { - h.unwatchRead(); - break; - } - - } - } while (true); - - ++threadReadCount; - return; -} - -/* - * We carry on writing whilst we have data to write and we can write - */ -void SslIO::writeable(DispatchHandle& h) { - AbsTime writeStartTime = AbsTime::now(); - do { - // See if we've got something to write - if (!writeQueue.empty()) { - // Write buffer - BufferBase* buff = writeQueue.back(); - writeQueue.pop_back(); - errno = 0; - assert(buff->dataStart+buff->dataCount <= buff->byteCount); - int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); - if (rc >= 0) { - threadWriteTotal += rc; - - // If we didn't write full buffer put rest back - if (rc != buff->dataCount) { - buff->dataStart += rc; - buff->dataCount -= rc; - writeQueue.push_back(buff); - break; - } - - // Recycle the buffer - queueReadBuffer(buff); - - // Stop writing if we've overrun our timeslot - if (Duration(writeStartTime, AbsTime::now()) > threadMaxIoTimeNs) { - break; - } - } else { - // Put buffer back - writeQueue.push_back(buff); - if (errno == ECONNRESET || errno == EPIPE) { - // Just stop watching for write here - we'll get a - // disconnect callback soon enough - h.unwatchWrite(); - break; - } else if (errno == EAGAIN) { - // We have just put a buffer back so we know - // we can carry on watching for writes - break; - } else { - QPID_LOG(error, "Error writing to socket: " << getErrorString(PR_GetError())); - h.unwatchWrite(); - break; - } - } - } else { - // If we're waiting to close the socket then can do it now as there is nothing to write - if (queuedClose) { - close(h); - break; - } - // Fd is writable, but nothing to write - if (idleCallback) { - writePending = false; - idleCallback(*this); - } - // If we still have no buffers to write we can't do anything more - if (writeQueue.empty() && !writePending && !queuedClose) { - h.unwatchWrite(); - // The following handles the case where writePending is - // set to true after the test above; in this case its - // possible that the unwatchWrite overwrites the - // desired rewatchWrite so we correct that here - if (writePending) - h.rewatchWrite(); - break; - } - } - } while (true); - - ++threadWriteCount; - return; -} - -void SslIO::disconnected(DispatchHandle& h) { - // If we've already queued close do it instead of disconnected callback - if (queuedClose) { - close(h); - } else if (disCallback) { - disCallback(*this); - h.unwatch(); - } -} - -/* - * Close the socket and callback to say we've done it - */ -void SslIO::close(DispatchHandle& h) { - h.stopWatch(); - socket.close(); - if (closedCallback) { - closedCallback(*this, socket); - } -} - -SecuritySettings SslIO::getSecuritySettings() { - SecuritySettings settings; - settings.ssf = socket.getKeyLen(); - settings.authid = socket.getClientAuthId(); - return settings; -} - -}}} diff --git a/cpp/src/qpid/sys/ssl/SslIo.h b/cpp/src/qpid/sys/ssl/SslIo.h deleted file mode 100644 index f3112bfa65..0000000000 --- a/cpp/src/qpid/sys/ssl/SslIo.h +++ /dev/null @@ -1,193 +0,0 @@ -#ifndef _sys_ssl_SslIO -#define _sys_ssl_SslIO -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/DispatchHandle.h" -#include "qpid/sys/SecuritySettings.h" - -#include <boost/function.hpp> -#include <boost/shared_array.hpp> -#include <deque> - -namespace qpid { -namespace sys { - -class Socket; - -namespace ssl { - -class SslSocket; - -/* - * Asynchronous ssl acceptor: accepts connections then does a callback - * with the accepted fd - */ -template <class T> -class SslAcceptorTmpl { -public: - typedef boost::function1<void, const Socket&> Callback; - -private: - Callback acceptedCallback; - qpid::sys::DispatchHandle handle; - const T& socket; - -public: - SslAcceptorTmpl(const T& s, Callback callback); - ~SslAcceptorTmpl(); - void start(qpid::sys::Poller::shared_ptr poller); - -private: - void readable(qpid::sys::DispatchHandle& handle); -}; - -/* - * Asynchronous ssl connector: starts the process of initiating a - * connection and invokes a callback when completed or failed. - */ -class SslConnector : private qpid::sys::DispatchHandle { -public: - typedef boost::function1<void, const SslSocket&> ConnectedCallback; - typedef boost::function2<void, int, std::string> FailedCallback; - -private: - ConnectedCallback connCallback; - FailedCallback failCallback; - const SslSocket& socket; - -public: - SslConnector(const SslSocket& socket, - Poller::shared_ptr poller, - std::string hostname, - std::string port, - ConnectedCallback connCb, - FailedCallback failCb = 0); - -private: - void connComplete(DispatchHandle& handle); - void failure(int, std::string); -}; - -struct SslIOBufferBase { - char* bytes; - int32_t byteCount; - int32_t dataStart; - int32_t dataCount; - - SslIOBufferBase(char* const b, const int32_t s) : - bytes(b), - byteCount(s), - dataStart(0), - dataCount(0) - {} - - virtual ~SslIOBufferBase() - {} -}; - -/* - * Asychronous reader/writer: - * Reader accepts buffers to read into; reads into the provided buffers - * and then does a callback with the buffer and amount read. Optionally it can callback - * when there is something to read but no buffer to read it into. - * - * Writer accepts a buffer and queues it for writing; can also be given - * a callback for when writing is "idle" (ie fd is writable, but nothing to write) - * - * The class is implemented in terms of DispatchHandle to allow it to be deleted by deleting - * the contained DispatchHandle - */ -class SslIO : private qpid::sys::DispatchHandle { -public: - typedef SslIOBufferBase BufferBase; - - typedef boost::function2<void, SslIO&, BufferBase*> ReadCallback; - typedef boost::function1<void, SslIO&> EofCallback; - typedef boost::function1<void, SslIO&> DisconnectCallback; - typedef boost::function2<void, SslIO&, const SslSocket&> ClosedCallback; - typedef boost::function1<void, SslIO&> BuffersEmptyCallback; - typedef boost::function1<void, SslIO&> IdleCallback; - typedef boost::function1<void, SslIO&> RequestCallback; - - SslIO(const SslSocket& s, - ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); -private: - ReadCallback readCallback; - EofCallback eofCallback; - DisconnectCallback disCallback; - ClosedCallback closedCallback; - BuffersEmptyCallback emptyCallback; - IdleCallback idleCallback; - const SslSocket& socket; - std::deque<BufferBase*> bufferQueue; - std::deque<BufferBase*> writeQueue; - std::vector<BufferBase> buffers; - boost::shared_array<char> bufferMemory; - bool queuedClose; - /** - * This flag is used to detect and handle concurrency between - * calls to notifyPendingWrite() (which can be made from any thread) and - * the execution of the writeable() method (which is always on the - * thread processing this handle. - */ - volatile bool writePending; - -public: - /* - * Size of IO buffers - this is the maximum possible frame size + 1 - */ - const static uint32_t MaxBufferSize = 65536; - - /* - * Number of IO buffers allocated - I think the code can only use 2 - - * 1 for reading and 1 for writing, allocate 4 for safety - */ - const static uint32_t BufferCount = 4; - - void queueForDeletion(); - - void start(qpid::sys::Poller::shared_ptr poller); - void createBuffers(uint32_t size = MaxBufferSize); - void queueReadBuffer(BufferBase* buff); - void unread(BufferBase* buff); - void queueWrite(BufferBase* buff); - void notifyPendingWrite(); - void queueWriteClose(); - bool writeQueueEmpty() { return writeQueue.empty(); } - void requestCallback(RequestCallback); - BufferBase* getQueuedBuffer(); - - qpid::sys::SecuritySettings getSecuritySettings(); - -private: - ~SslIO(); - void readable(qpid::sys::DispatchHandle& handle); - void writeable(qpid::sys::DispatchHandle& handle); - void disconnected(qpid::sys::DispatchHandle& handle); - void requestedCall(RequestCallback); - void close(qpid::sys::DispatchHandle& handle); -}; - -}}} - -#endif // _sys_ssl_SslIO diff --git a/cpp/src/qpid/sys/ssl/SslSocket.cpp b/cpp/src/qpid/sys/ssl/SslSocket.cpp index 30234bb686..a328e49c13 100644 --- a/cpp/src/qpid/sys/ssl/SslSocket.cpp +++ b/cpp/src/qpid/sys/ssl/SslSocket.cpp @@ -20,6 +20,7 @@ */ #include "qpid/sys/ssl/SslSocket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/ssl/check.h" #include "qpid/sys/ssl/util.h" #include "qpid/Exception.h" @@ -52,28 +53,6 @@ namespace sys { namespace ssl { namespace { -std::string getService(int fd, bool local) -{ - ::sockaddr_storage name; // big enough for any socket address - ::socklen_t namelen = sizeof(name); - - int result = -1; - if (local) { - result = ::getsockname(fd, (::sockaddr*)&name, &namelen); - } else { - result = ::getpeername(fd, (::sockaddr*)&name, &namelen); - } - - QPID_POSIX_CHECK(result); - - char servName[NI_MAXSERV]; - if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0, - servName, sizeof(servName), - NI_NUMERICHOST | NI_NUMERICSERV) != 0) - throw QPID_POSIX_ERROR(rc); - return servName; -} - const std::string DOMAIN_SEPARATOR("@"); const std::string DC_SEPARATOR("."); const std::string DC("DC"); @@ -101,14 +80,18 @@ std::string getDomainFromSubject(std::string subject) } return domain; } - } -SslSocket::SslSocket() : socket(0), prototype(0) +SslSocket::SslSocket(const std::string& certName, bool clientAuth) : + nssSocket(0), certname(certName), prototype(0) { - impl->fd = ::socket (PF_INET, SOCK_STREAM, 0); - if (impl->fd < 0) throw QPID_POSIX_ERROR(errno); - socket = SSL_ImportFD(0, PR_ImportTCPSocket(impl->fd)); + //configure prototype socket: + prototype = SSL_ImportFD(0, PR_NewTCPSocket()); + + if (clientAuth) { + NSS_CHECK(SSL_OptionSet(prototype, SSL_REQUEST_CERTIFICATE, PR_TRUE)); + NSS_CHECK(SSL_OptionSet(prototype, SSL_REQUIRE_CERTIFICATE, PR_TRUE)); + } } /** @@ -116,25 +99,44 @@ SslSocket::SslSocket() : socket(0), prototype(0) * returned from accept. Because we use posix accept rather than * PR_Accept, we have to reset the handshake. */ -SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : Socket(ioph), socket(0), prototype(0) +SslSocket::SslSocket(int fd, PRFileDesc* model) : BSDSocket(fd), nssSocket(0), prototype(0) { - socket = SSL_ImportFD(model, PR_ImportTCPSocket(impl->fd)); - NSS_CHECK(SSL_ResetHandshake(socket, true)); + nssSocket = SSL_ImportFD(model, PR_ImportTCPSocket(fd)); + NSS_CHECK(SSL_ResetHandshake(nssSocket, PR_TRUE)); } void SslSocket::setNonblocking() const { + if (!nssSocket) { + BSDSocket::setNonblocking(); + return; + } PRSocketOptionData option; option.option = PR_SockOpt_Nonblocking; option.value.non_blocking = true; - PR_SetSocketOption(socket, &option); + PR_SetSocketOption(nssSocket, &option); } -void SslSocket::connect(const std::string& host, const std::string& port) const +void SslSocket::setTcpNoDelay() const { - std::stringstream namestream; - namestream << host << ":" << port; - connectname = namestream.str(); + if (!nssSocket) { + BSDSocket::setTcpNoDelay(); + return; + } + PRSocketOptionData option; + option.option = PR_SockOpt_NoDelay; + option.value.no_delay = true; + PR_SetSocketOption(nssSocket, &option); +} + +void SslSocket::connect(const SocketAddress& addr) const +{ + BSDSocket::connect(addr); +} + +void SslSocket::finishConnect(const SocketAddress& addr) const +{ + nssSocket = SSL_ImportFD(0, PR_ImportTCPSocket(fd)); void* arg; // Use the connection's cert-name if it has one; else use global cert-name @@ -145,75 +147,48 @@ void SslSocket::connect(const std::string& host, const std::string& port) const } else { arg = const_cast<char*>(SslOptions::global.certName.c_str()); } - NSS_CHECK(SSL_GetClientAuthDataHook(socket, NSS_GetClientAuthData, arg)); - NSS_CHECK(SSL_SetURL(socket, host.data())); - - char hostBuffer[PR_NETDB_BUF_SIZE]; - PRHostEnt hostEntry; - PR_CHECK(PR_GetHostByName(host.data(), hostBuffer, PR_NETDB_BUF_SIZE, &hostEntry)); - PRNetAddr address; - int value = PR_EnumerateHostEnt(0, &hostEntry, boost::lexical_cast<PRUint16>(port), &address); - if (value < 0) { - throw Exception(QPID_MSG("Error getting address for host: " << ErrorString())); - } else if (value == 0) { - throw Exception(QPID_MSG("Could not resolve address for host.")); - } - PR_CHECK(PR_Connect(socket, &address, PR_INTERVAL_NO_TIMEOUT)); - NSS_CHECK(SSL_ForceHandshake(socket)); + NSS_CHECK(SSL_GetClientAuthDataHook(nssSocket, NSS_GetClientAuthData, arg)); + + url = addr.getHost(); + NSS_CHECK(SSL_SetURL(nssSocket, url.data())); + + NSS_CHECK(SSL_ResetHandshake(nssSocket, PR_FALSE)); + NSS_CHECK(SSL_ForceHandshake(nssSocket)); } void SslSocket::close() const { - if (impl->fd > 0) { - PR_Close(socket); - impl->fd = -1; + if (!nssSocket) { + BSDSocket::close(); + return; + } + if (fd > 0) { + PR_Close(nssSocket); + fd = -1; } } -int SslSocket::listen(uint16_t port, int backlog, const std::string& certName, bool clientAuth) const +int SslSocket::listen(const SocketAddress& sa, int backlog) const { - //configure prototype socket: - prototype = SSL_ImportFD(0, PR_NewTCPSocket()); - if (clientAuth) { - NSS_CHECK(SSL_OptionSet(prototype, SSL_REQUEST_CERTIFICATE, PR_TRUE)); - NSS_CHECK(SSL_OptionSet(prototype, SSL_REQUIRE_CERTIFICATE, PR_TRUE)); - } - //get certificate and key (is this the correct way?) - CERTCertificate *cert = PK11_FindCertFromNickname(const_cast<char*>(certName.c_str()), 0); - if (!cert) throw Exception(QPID_MSG("Failed to load certificate '" << certName << "'")); + std::string cName( (certname == "") ? "localhost.localdomain" : certname); + CERTCertificate *cert = PK11_FindCertFromNickname(const_cast<char*>(cName.c_str()), 0); + if (!cert) throw Exception(QPID_MSG("Failed to load certificate '" << cName << "'")); SECKEYPrivateKey *key = PK11_FindKeyByAnyCert(cert, 0); if (!key) throw Exception(QPID_MSG("Failed to retrieve private key from certificate")); NSS_CHECK(SSL_ConfigSecureServer(prototype, cert, key, NSS_FindCertKEAType(cert))); SECKEY_DestroyPrivateKey(key); CERT_DestroyCertificate(cert); - //bind and listen - const int& socket = impl->fd; - int yes=1; - QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); - struct sockaddr_in name; - name.sin_family = AF_INET; - name.sin_port = htons(port); - name.sin_addr.s_addr = 0; - if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0) - throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno))); - if (::listen(socket, backlog) < 0) - throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno))); - - socklen_t namelen = sizeof(name); - if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) - throw QPID_POSIX_ERROR(errno); - - return ntohs(name.sin_port); + return BSDSocket::listen(sa, backlog); } -SslSocket* SslSocket::accept() const +Socket* SslSocket::accept() const { QPID_LOG(trace, "Accepting SSL connection."); - int afd = ::accept(impl->fd, 0, 0); + int afd = ::accept(fd, 0, 0); if ( afd >= 0) { - return new SslSocket(new IOHandlePrivate(afd), prototype); + return new SslSocket(afd, prototype); } else if (errno == EAGAIN) { return 0; } else { @@ -297,17 +272,22 @@ static bool isSslStream(int afd) { return isSSL2Handshake || isSSL3Handshake; } +SslMuxSocket::SslMuxSocket(const std::string& certName, bool clientAuth) : + SslSocket(certName, clientAuth) +{ +} + Socket* SslMuxSocket::accept() const { - int afd = ::accept(impl->fd, 0, 0); + int afd = ::accept(fd, 0, 0); if (afd >= 0) { QPID_LOG(trace, "Accepting connection with optional SSL wrapper."); if (isSslStream(afd)) { QPID_LOG(trace, "Accepted SSL connection."); - return new SslSocket(new IOHandlePrivate(afd), prototype); + return new SslSocket(afd, prototype); } else { QPID_LOG(trace, "Accepted Plaintext connection."); - return new Socket(new IOHandlePrivate(afd)); + return new BSDSocket(afd); } } else if (errno == EAGAIN) { return 0; @@ -318,32 +298,12 @@ Socket* SslMuxSocket::accept() const int SslSocket::read(void *buf, size_t count) const { - return PR_Read(socket, buf, count); + return PR_Read(nssSocket, buf, count); } int SslSocket::write(const void *buf, size_t count) const { - return PR_Write(socket, buf, count); -} - -uint16_t SslSocket::getLocalPort() const -{ - return std::atoi(getService(impl->fd, true).c_str()); -} - -uint16_t SslSocket::getRemotePort() const -{ - return atoi(getService(impl->fd, true).c_str()); -} - -void SslSocket::setTcpNoDelay(bool nodelay) const -{ - if (nodelay) { - PRSocketOptionData option; - option.option = PR_SockOpt_NoDelay; - option.value.no_delay = true; - PR_SetSocketOption(socket, &option); - } + return PR_Write(nssSocket, buf, count); } void SslSocket::setCertName(const std::string& name) @@ -359,7 +319,7 @@ int SslSocket::getKeyLen() const int keySize = 0; SECStatus rc; - rc = SSL_SecurityStatus( socket, + rc = SSL_SecurityStatus( nssSocket, &enabled, NULL, NULL, @@ -374,7 +334,7 @@ int SslSocket::getKeyLen() const std::string SslSocket::getClientAuthId() const { std::string authId; - CERTCertificate* cert = SSL_PeerCertificate(socket); + CERTCertificate* cert = SSL_PeerCertificate(nssSocket); if (cert) { authId = CERT_GetCommonName(&(cert->subject)); /* diff --git a/cpp/src/qpid/sys/ssl/SslSocket.h b/cpp/src/qpid/sys/ssl/SslSocket.h index eabadcbe23..fc97059cfd 100644 --- a/cpp/src/qpid/sys/ssl/SslSocket.h +++ b/cpp/src/qpid/sys/ssl/SslSocket.h @@ -23,7 +23,7 @@ */ #include "qpid/sys/IOHandle.h" -#include "qpid/sys/Socket.h" +#include "qpid/sys/posix/BSDSocket.h" #include <nspr.h> #include <string> @@ -37,55 +37,54 @@ class Duration; namespace ssl { -class SslSocket : public qpid::sys::Socket +class SslSocket : public qpid::sys::BSDSocket { public: - /** Create a socket wrapper for descriptor. */ - SslSocket(); + /** Create a socket wrapper for descriptor. + *@param certName name of certificate to use to identify the socket + */ + SslSocket(const std::string& certName = "", bool clientAuth = false); /** Set socket non blocking */ void setNonblocking() const; /** Set tcp-nodelay */ - void setTcpNoDelay(bool nodelay) const; + void setTcpNoDelay() const; /** Set SSL cert-name. Allows the cert-name to be set per * connection, overriding global cert-name settings from * NSSInit().*/ void setCertName(const std::string& certName); - void connect(const std::string& host, const std::string& port) const; + void connect(const SocketAddress&) const; + void finishConnect(const SocketAddress&) const; void close() const; /** Bind to a port and start listening. *@param port 0 means choose an available port. *@param backlog maximum number of pending connections. - *@param certName name of certificate to use to identify the server *@return The bound port. */ - int listen(uint16_t port = 0, int backlog = 10, const std::string& certName = "localhost.localdomain", bool clientAuth = false) const; + int listen(const SocketAddress&, int backlog = 10) const; /** * Accept a connection from a socket that is already listening * and has an incoming connection */ - SslSocket* accept() const; + virtual Socket* accept() const; // TODO The following are raw operations, maybe they need better wrapping? int read(void *buf, size_t count) const; int write(const void *buf, size_t count) const; - uint16_t getLocalPort() const; - uint16_t getRemotePort() const; - int getKeyLen() const; std::string getClientAuthId() const; protected: - mutable std::string connectname; - mutable PRFileDesc* socket; + mutable PRFileDesc* nssSocket; std::string certname; + mutable std::string url; /** * 'model' socket, with configuration to use when importing @@ -94,13 +93,14 @@ protected: */ mutable PRFileDesc* prototype; - SslSocket(IOHandlePrivate* ioph, PRFileDesc* model); - friend class SslMuxSocket; + SslSocket(int fd, PRFileDesc* model); + friend class SslMuxSocket; // Needed for this constructor }; class SslMuxSocket : public SslSocket { public: + SslMuxSocket(const std::string& certName = "", bool clientAuth = false); Socket* accept() const; }; diff --git a/cpp/src/qpid/sys/ssl/util.cpp b/cpp/src/qpid/sys/ssl/util.cpp index 3078e894df..de5d638b09 100644 --- a/cpp/src/qpid/sys/ssl/util.cpp +++ b/cpp/src/qpid/sys/ssl/util.cpp @@ -31,8 +31,6 @@ #include <iostream> #include <fstream> -#include <boost/filesystem/operations.hpp> -#include <boost/filesystem/path.hpp> namespace qpid { namespace sys { @@ -82,15 +80,14 @@ SslOptions SslOptions::global; char* readPasswordFromFile(PK11SlotInfo*, PRBool retry, void*) { const std::string& passwordFile = SslOptions::global.certPasswordFile; - if (retry || passwordFile.empty() || !boost::filesystem::exists(passwordFile)) { - return 0; - } else { - std::ifstream file(passwordFile.c_str()); - std::string password; - file >> password; - return PL_strdup(password.c_str()); - } -} + if (retry || passwordFile.empty()) return 0; + std::ifstream file(passwordFile.c_str()); + if (!file) return 0; + + std::string password; + file >> password; + return PL_strdup(password.c_str()); +} void initNSS(const SslOptions& options, bool server) { |