diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/client | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverManager.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/LoadPlugins.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/LoadPlugins.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/QueueOptions.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 108 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 19 | ||||
-rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 106 | ||||
-rw-r--r-- | cpp/src/qpid/client/TCPConnector.cpp | 25 | ||||
-rw-r--r-- | cpp/src/qpid/client/TCPConnector.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/windows/ClientDllMain.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/client/windows/SslConnector.cpp | 20 |
15 files changed, 191 insertions, 166 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 91838d8e8b..4f88cb97ee 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -82,9 +82,11 @@ void ConnectionHandler::Adapter::handle(qpid::framing::AMQFrame& f) handler.out(f); } -ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v, Bounds& b) - : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this, b), proxy(outHandler), - errorCode(CLOSE_CODE_NORMAL), version(v) +ConnectionHandler::ConnectionHandler( + const ConnectionSettings& s, ProtocolVersion& v, Bounds& b) + : StateManager(NOT_STARTED), ConnectionSettings(s), + outHandler(*this, b), proxy(outHandler), errorCode(CLOSE_CODE_NORMAL), version(v), + properties(s.clientProperties) { insist = true; diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 85b0e8303e..0abfbe09ec 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -128,15 +128,17 @@ public: // and we can't do that before we're unloaded as we can't // restart the Poller after shutting it down ~IOThread() { - std::vector<Thread> threads; - { - ScopedLock<Mutex> l(threadLock); - if (poller_) - poller_->shutdown(); - t.swap(threads); - } - for (std::vector<Thread>::iterator i = threads.begin(); i != threads.end(); ++i) { - i->join(); + if (SystemInfo::threadSafeShutdown()) { + std::vector<Thread> threads; + { + ScopedLock<Mutex> l(threadLock); + if (poller_) + poller_->shutdown(); + t.swap(threads); + } + for (std::vector<Thread>::iterator i = threads.begin(); i != threads.end(); ++i) { + i->join(); + } } } }; diff --git a/cpp/src/qpid/client/FailoverManager.cpp b/cpp/src/qpid/client/FailoverManager.cpp index 9405765b47..f27aeb5b52 100644 --- a/cpp/src/qpid/client/FailoverManager.cpp +++ b/cpp/src/qpid/client/FailoverManager.cpp @@ -34,6 +34,9 @@ using qpid::sys::Duration; FailoverManager::FailoverManager(const ConnectionSettings& s, ReconnectionStrategy* rs) : settings(s), strategy(rs), state(IDLE) {} +FailoverManager::~FailoverManager() +{} + void FailoverManager::execute(Command& c) { bool retry = false; diff --git a/cpp/src/qpid/client/LoadPlugins.cpp b/cpp/src/qpid/client/LoadPlugins.cpp index d76e1d458e..c5d8924014 100644 --- a/cpp/src/qpid/client/LoadPlugins.cpp +++ b/cpp/src/qpid/client/LoadPlugins.cpp @@ -48,7 +48,7 @@ struct LoadtimeInitialise { for (vector<string>::iterator iter = moduleOptions.load.begin(); iter != moduleOptions.load.end(); iter++) - qpid::tryShlib (iter->data(), false); + qpid::tryShlib (*iter); if (!moduleOptions.noLoad) { bool isDefault = defaultPath == moduleOptions.loadDir; diff --git a/cpp/src/qpid/client/LoadPlugins.h b/cpp/src/qpid/client/LoadPlugins.h index 0be4ae9f0c..0b398f6831 100644 --- a/cpp/src/qpid/client/LoadPlugins.h +++ b/cpp/src/qpid/client/LoadPlugins.h @@ -22,10 +22,12 @@ #ifndef _LoadPlugins_ #define _LoadPlugins_ +#include "qpid/client/ClientImportExport.h" + namespace qpid { namespace client { -void theModuleLoader(); +QPID_CLIENT_EXTERN void theModuleLoader(); }} diff --git a/cpp/src/qpid/client/QueueOptions.cpp b/cpp/src/qpid/client/QueueOptions.cpp index f4c1483859..460f3f5490 100644 --- a/cpp/src/qpid/client/QueueOptions.cpp +++ b/cpp/src/qpid/client/QueueOptions.cpp @@ -49,8 +49,8 @@ QueueOptions::~QueueOptions() void QueueOptions::setSizePolicy(QueueSizePolicy sp, uint64_t maxSize, uint32_t maxCount) { - if (maxCount) setInt(strMaxCountKey, maxCount); - if (maxSize) setInt(strMaxSizeKey, maxSize); + if (maxCount) setUInt64(strMaxCountKey, maxCount); + if (maxSize) setUInt64(strMaxSizeKey, maxSize); if (maxSize || maxCount){ switch (sp) { diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index 21143a1a75..9b6dcd645d 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -109,7 +109,7 @@ class RdmaConnector : public Connector, public sys::Codec const qpid::sys::SecuritySettings* getSecuritySettings() { return 0; } size_t decode(const char* buffer, size_t size); - size_t encode(const char* buffer, size_t size); + size_t encode(char* buffer, size_t size); bool canEncode(); public: @@ -371,9 +371,9 @@ bool RdmaConnector::canEncode() return aio->writable() && (lastEof || currentSize >= maxFrameSize); } -size_t RdmaConnector::encode(const char* buffer, size_t size) +size_t RdmaConnector::encode(char* buffer, size_t size) { - framing::Buffer out(const_cast<char*>(buffer), size); + framing::Buffer out(buffer, size); size_t bytesWritten(0); { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 91e728d5ae..01e614e041 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -61,9 +61,7 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm ioHandler(*this), proxy(ioHandler), nextIn(0), - nextOut(0), - doClearDeliveryPropertiesExchange(true), - autoDetach(true) + nextOut(0) { channel.next = connection.get(); } @@ -72,12 +70,10 @@ SessionImpl::~SessionImpl() { { Lock l(state); if (state != DETACHED && state != DETACHING) { - if (autoDetach) { - QPID_LOG(warning, "Session was not closed cleanly: " << id); - // Inform broker but don't wait for detached as that deadlocks. - // The detached will be ignored as the channel will be invalid. - try { detach(); } catch (...) {} // ignore errors. - } + QPID_LOG(warning, "Session was not closed cleanly: " << id); + // Inform broker but don't wait for detached as that deadlocks. + // The detached will be ignored as the channel will be invalid. + try { detach(); } catch (...) {} // ignore errors. setState(DETACHED); handleClosed(); state.waitWaiters(); @@ -136,10 +132,10 @@ void SessionImpl::resume(boost::shared_ptr<ConnectionImpl>) // user thread void SessionImpl::suspend() //user thread { Lock l(state); - detach(); + detach(); } -void SessionImpl::detach() //call with lock held +void SessionImpl::detach() //call with lock held { if (state == ATTACHED) { setState(DETACHING); @@ -149,8 +145,8 @@ void SessionImpl::detach() //call with lock held uint16_t SessionImpl::getChannel() const // user thread -{ - return channel; +{ + return channel; } void SessionImpl::setChannel(uint16_t c) // user thread @@ -182,7 +178,7 @@ void SessionImpl::waitForCompletionImpl(const SequenceNumber& id) //call with lo bool SessionImpl::isComplete(const SequenceNumber& id) { - Lock l(state); + Lock l(state); return !incompleteOut.contains(id); } @@ -219,7 +215,7 @@ framing::SequenceNumber SessionImpl::getCompleteUpTo() return --firstIncomplete; } -struct MarkCompleted +struct MarkCompleted { const SequenceNumber& id; SequenceSet& completedIn; @@ -230,7 +226,7 @@ struct MarkCompleted { if (id >= end) { completedIn.add(start, end); - } else if (id >= start) { + } else if (id >= start) { completedIn.add(start, id); } } @@ -244,13 +240,13 @@ void SessionImpl::markCompleted(const SequenceSet& ids, bool notifyPeer) completedIn.add(ids); if (notifyPeer) { sendCompletion(); - } + } } void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer) { Lock l(state); - if (cumulative) { + if (cumulative) { //everything in incompleteIn less than or equal to id is now complete MarkCompleted f(id, completedIn); incompleteIn.for_each(f); @@ -260,11 +256,11 @@ void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool incompleteIn.remove(completedIn); } else if (incompleteIn.contains(id)) { incompleteIn.remove(id); - completedIn.add(id); + completedIn.add(id); } if (notifyPeer) { sendCompletion(); - } + } } void SessionImpl::setException(const sys::ExceptionHolder& ex) { @@ -310,42 +306,24 @@ namespace { struct SendContentFn { FrameHandler& handler; void operator()(const AMQFrame& f) { - if (!f.getMethod()) + if (!f.getMethod()) handler(const_cast<AMQFrame&>(f)); } SendContentFn(FrameHandler& h) : handler(h) {} }; -// Adaptor to make FrameSet look like MethodContent; used in cluster update client -struct MethodContentAdaptor : MethodContent -{ - AMQHeaderBody header; - const std::string content; - - MethodContentAdaptor(const FrameSet& f) : header(*f.getHeaders()), content(f.getContent()) {} - - const AMQHeaderBody& getHeader() const - { - return header; - } - const std::string& getData() const - { - return content; - } -}; - } - -Future SessionImpl::send(const AMQBody& command, const FrameSet& content, bool reframe) { + +Future SessionImpl::send(const AMQBody& command, const FrameSet& content) { Acquire a(sendLock); SequenceNumber id = nextOut++; { Lock l(state); - checkOpen(); + checkOpen(); incompleteOut.add(id); } Future f(id); - if (command.getMethod()->resultExpected()) { + if (command.getMethod()->resultExpected()) { Lock l(state); //result listener must be set before the command is sent f.setFutureResult(results.listenForResult(id)); @@ -353,14 +331,8 @@ Future SessionImpl::send(const AMQBody& command, const FrameSet& content, bool r AMQFrame frame(command); frame.setEof(false); handleOut(frame); - - if (reframe) { - MethodContentAdaptor c(content); - sendContent(c); - } else { - SendContentFn send(out); - content.map(send); - } + SendContentFn send(out); + content.map(send); return f; } @@ -375,11 +347,11 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con SequenceNumber id = nextOut++; { Lock l(state); - checkOpen(); + checkOpen(); incompleteOut.add(id); } Future f(id); - if (command.getMethod()->resultExpected()) { + if (command.getMethod()->resultExpected()) { Lock l(state); //result listener must be set before the command is sent f.setFutureResult(results.listenForResult(id)); @@ -399,23 +371,13 @@ void SessionImpl::sendContent(const MethodContent& content) { AMQFrame header(content.getHeader()); - // doClearDeliveryPropertiesExchange is set by cluster update client so - // it can send messages with delivery-properties.exchange set. - // - if (doClearDeliveryPropertiesExchange) { - // Normal client is not allowed to set the delivery-properties.exchange - // so clear it here. - AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody()); - if (headerp && headerp->get<DeliveryProperties>()) - headerp->get<DeliveryProperties>(true)->clearExchangeFlag(); - } header.setFirstSegment(false); uint64_t data_length = content.getData().length(); if(data_length > 0){ header.setLastSegment(false); - handleOut(header); + handleOut(header); /*Note: end of frame marker included in overhead but not in size*/ - const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); + const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); if(data_length < frag_size){ AMQFrame frame((AMQContentBody(content.getData()))); @@ -442,7 +404,7 @@ void SessionImpl::sendContent(const MethodContent& content) } } } else { - handleOut(header); + handleOut(header); } } @@ -462,7 +424,7 @@ bool isContentFrame(AMQFrame& frame) { AMQBody* body = frame.getBody(); uint8_t type = body->type(); - return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); + return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); } void SessionImpl::handleIn(AMQFrame& frame) // network thread @@ -585,7 +547,7 @@ void SessionImpl::timeout(uint32_t t) void SessionImpl::commandPoint(const framing::SequenceNumber& id, uint64_t offset) { if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); - + Lock l(state); nextIn = id; } @@ -677,10 +639,10 @@ void SessionImpl::exception(uint16_t errorCode, { Lock l(state); setExceptionLH(createSessionException(errorCode, description)); - QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what() + QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what() << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]"); - if (detachedLifetime) + if (detachedLifetime) setTimeout(0); } @@ -748,6 +710,4 @@ boost::shared_ptr<ConnectionImpl> SessionImpl::getConnection() return connection; } -void SessionImpl::disableAutoDetach() { autoDetach = false; } - }} diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 4f9213a00a..e6ea8e6b90 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -87,15 +87,7 @@ public: Future send(const framing::AMQBody& command); Future send(const framing::AMQBody& command, const framing::MethodContent& content); - /** - * This method takes the content as a FrameSet; if reframe=false, - * the caller is resposnible for ensuring that the header and - * content frames in that set are correct for this connection - * (right flags, right fragmentation etc). If reframe=true, then - * the header and content from the frameset will be copied and - * reframed correctly for the connection. - */ - QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content, bool reframe=false); + QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content); void sendRawFrame(framing::AMQFrame& frame); Demux& getDemux(); @@ -125,11 +117,6 @@ public: */ boost::shared_ptr<ConnectionImpl> getConnection(); - void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; } - - /** Suppress sending detach in destructor. Used by cluster to build session state */ - void disableAutoDetach(); - private: enum State { INACTIVE, @@ -225,10 +212,6 @@ private: SessionState sessionState; - bool doClearDeliveryPropertiesExchange; - - bool autoDetach; - friend class client::SessionHandler; }; diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index c2081a88f2..11707eb3f7 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -30,8 +30,9 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/InitiationHandler.h" #include "qpid/sys/ssl/util.h" -#include "qpid/sys/ssl/SslIo.h" +#include "qpid/sys/AsynchIO.h" #include "qpid/sys/ssl/SslSocket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" #include "qpid/sys/SecuritySettings.h" @@ -72,23 +73,28 @@ class SslConnector : public Connector sys::ssl::SslSocket socket; - sys::ssl::SslIO* aio; + sys::AsynchConnector* connector; + sys::AsynchIO* aio; std::string identifier; Poller::shared_ptr poller; SecuritySettings securitySettings; ~SslConnector(); - void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*); - void writebuff(qpid::sys::ssl::SslIO&); + void readbuff(AsynchIO&, AsynchIOBufferBase*); + void writebuff(AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); - void eof(qpid::sys::ssl::SslIO&); - void disconnected(qpid::sys::ssl::SslIO&); + void eof(AsynchIO&); + void disconnected(AsynchIO&); void connect(const std::string& host, const std::string& port); + void connected(const sys::Socket&); + void connectFailed(const std::string& msg); + void close(); void send(framing::AMQFrame& frame); - void abort() {} // TODO: Need to fix for heartbeat timeouts to work + void abort(); + void connectAborted(); void setInputHandler(framing::InputHandler* handler); void setShutdownHandler(sys::ShutdownHandler* handler); @@ -96,10 +102,10 @@ class SslConnector : public Connector framing::OutputHandler* getOutputHandler(); const std::string& getIdentifier() const; const SecuritySettings* getSecuritySettings(); - void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&); + void socketClosed(AsynchIO&, const Socket&); size_t decode(const char* buffer, size_t size); - size_t encode(const char* buffer, size_t size); + size_t encode(char* buffer, size_t size); bool canEncode(); public: @@ -164,32 +170,46 @@ SslConnector::~SslConnector() { close(); } -void SslConnector::connect(const std::string& host, const std::string& port){ +void SslConnector::connect(const std::string& host, const std::string& port) { Mutex::ScopedLock l(lock); assert(closed); - try { - socket.connect(host, port); - } catch (const std::exception& e) { - socket.close(); - throw TransportFailure(e.what()); - } - + connector = AsynchConnector::create( + socket, + host, port, + boost::bind(&SslConnector::connected, this, _1), + boost::bind(&SslConnector::connectFailed, this, _3)); closed = false; - aio = new SslIO(socket, - boost::bind(&SslConnector::readbuff, this, _1, _2), - boost::bind(&SslConnector::eof, this, _1), - boost::bind(&SslConnector::disconnected, this, _1), - boost::bind(&SslConnector::socketClosed, this, _1, _2), - 0, // nobuffs - boost::bind(&SslConnector::writebuff, this, _1)); + + connector->start(poller); +} + +void SslConnector::connected(const Socket&) { + connector = 0; + aio = AsynchIO::create(socket, + boost::bind(&SslConnector::readbuff, this, _1, _2), + boost::bind(&SslConnector::eof, this, _1), + boost::bind(&SslConnector::disconnected, this, _1), + boost::bind(&SslConnector::socketClosed, this, _1, _2), + 0, // nobuffs + boost::bind(&SslConnector::writebuff, this, _1)); aio->createBuffers(maxFrameSize); - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); + identifier = str(format("[%1%]") % socket.getFullAddress()); ProtocolInitiation init(version); writeDataBlock(init); aio->start(poller); } +void SslConnector::connectFailed(const std::string& msg) { + connector = 0; + QPID_LOG(warning, "Connect failed: " << msg); + socket.close(); + if (!closed) + closed = true; + if (shutdownHandler) + shutdownHandler->shutdown(); +} + void SslConnector::close() { Mutex::ScopedLock l(lock); if (!closed) { @@ -199,13 +219,31 @@ void SslConnector::close() { } } -void SslConnector::socketClosed(SslIO&, const SslSocket&) { +void SslConnector::socketClosed(AsynchIO&, const Socket&) { if (aio) aio->queueForDeletion(); if (shutdownHandler) shutdownHandler->shutdown(); } +void SslConnector::connectAborted() { + connector->stop(); + connectFailed("Connection timedout"); +} + +void SslConnector::abort() { + // Can't abort a closed connection + if (!closed) { + if (aio) { + // Established connection + aio->requestCallback(boost::bind(&SslConnector::eof, this, _1)); + } else if (connector) { + // We're still connecting + connector->requestCallback(boost::bind(&SslConnector::connectAborted, this)); + } + } +} + void SslConnector::setInputHandler(InputHandler* handler){ input = handler; } @@ -255,7 +293,7 @@ void SslConnector::send(AMQFrame& frame) { } } -void SslConnector::writebuff(SslIO& /*aio*/) +void SslConnector::writebuff(AsynchIO& /*aio*/) { // It's possible to be disconnected and be writable if (closed) @@ -265,7 +303,7 @@ void SslConnector::writebuff(SslIO& /*aio*/) return; } - SslIO::BufferBase* buffer = aio->getQueuedBuffer(); + AsynchIOBufferBase* buffer = aio->getQueuedBuffer(); if (buffer) { size_t encoded = encode(buffer->bytes, buffer->byteCount); @@ -285,9 +323,9 @@ bool SslConnector::canEncode() } // Called in IO thread. -size_t SslConnector::encode(const char* buffer, size_t size) +size_t SslConnector::encode(char* buffer, size_t size) { - framing::Buffer out(const_cast<char*>(buffer), size); + framing::Buffer out(buffer, size); size_t bytesWritten(0); { Mutex::ScopedLock l(lock); @@ -304,7 +342,7 @@ size_t SslConnector::encode(const char* buffer, size_t size) return bytesWritten; } -void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) +void SslConnector::readbuff(AsynchIO& aio, AsynchIOBufferBase* buff) { int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount); // TODO: unreading needs to go away, and when we can cope @@ -343,7 +381,7 @@ size_t SslConnector::decode(const char* buffer, size_t size) } void SslConnector::writeDataBlock(const AMQDataBlock& data) { - SslIO::BufferBase* buff = aio->getQueuedBuffer(); + AsynchIOBufferBase* buff = aio->getQueuedBuffer(); assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); @@ -351,11 +389,11 @@ void SslConnector::writeDataBlock(const AMQDataBlock& data) { aio->queueWrite(buff); } -void SslConnector::eof(SslIO&) { +void SslConnector::eof(AsynchIO&) { close(); } -void SslConnector::disconnected(SslIO&) { +void SslConnector::disconnected(AsynchIO&) { close(); socketClosed(*aio, socket); } diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp index a5c6465bad..783742764b 100644 --- a/cpp/src/qpid/client/TCPConnector.cpp +++ b/cpp/src/qpid/client/TCPConnector.cpp @@ -72,12 +72,13 @@ TCPConnector::TCPConnector(Poller::shared_ptr p, closed(true), shutdownHandler(0), input(0), + socket(createSocket()), connector(0), aio(0), poller(p) { QPID_LOG(debug, "TCPConnector created for " << version); - settings.configureSocket(socket); + settings.configureSocket(*socket); } TCPConnector::~TCPConnector() { @@ -88,7 +89,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) { Mutex::ScopedLock l(lock); assert(closed); connector = AsynchConnector::create( - socket, + *socket, host, port, boost::bind(&TCPConnector::connected, this, _1), boost::bind(&TCPConnector::connectFailed, this, _3)); @@ -99,7 +100,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) { void TCPConnector::connected(const Socket&) { connector = 0; - aio = AsynchIO::create(socket, + aio = AsynchIO::create(*socket, boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), boost::bind(&TCPConnector::disconnected, this, _1), @@ -116,7 +117,7 @@ void TCPConnector::start(sys::AsynchIO* aio_) { aio->createBuffers(maxFrameSize); - identifier = str(format("[%1%]") % socket.getFullAddress()); + identifier = str(format("[%1%]") % socket->getFullAddress()); } void TCPConnector::initAmqp() { @@ -127,7 +128,7 @@ void TCPConnector::initAmqp() { void TCPConnector::connectFailed(const std::string& msg) { connector = 0; QPID_LOG(warning, "Connect failed: " << msg); - socket.close(); + socket->close(); if (!closed) closed = true; if (shutdownHandler) @@ -150,6 +151,11 @@ void TCPConnector::socketClosed(AsynchIO&, const Socket&) { shutdownHandler->shutdown(); } +void TCPConnector::connectAborted() { + connector->stop(); + connectFailed("Connection timedout"); +} + void TCPConnector::abort() { // Can't abort a closed connection if (!closed) { @@ -158,8 +164,7 @@ void TCPConnector::abort() { aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); } else if (connector) { // We're still connecting - connector->stop(); - connectFailed("Connection timedout"); + connector->requestCallback(boost::bind(&TCPConnector::connectAborted, this)); } } } @@ -245,9 +250,9 @@ bool TCPConnector::canEncode() } // Called in IO thread. -size_t TCPConnector::encode(const char* buffer, size_t size) +size_t TCPConnector::encode(char* buffer, size_t size) { - framing::Buffer out(const_cast<char*>(buffer), size); + framing::Buffer out(buffer, size); size_t bytesWritten(0); { Mutex::ScopedLock l(lock); @@ -318,7 +323,7 @@ void TCPConnector::eof(AsynchIO&) { void TCPConnector::disconnected(AsynchIO&) { close(); - socketClosed(*aio, socket); + socketClosed(*aio, *socket); } void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h index c0bc26028d..63af3b878a 100644 --- a/cpp/src/qpid/client/TCPConnector.h +++ b/cpp/src/qpid/client/TCPConnector.h @@ -35,7 +35,7 @@ #include "qpid/sys/Thread.h" #include <boost/shared_ptr.hpp> -#include <boost/weak_ptr.hpp> +#include <boost/scoped_ptr.hpp> #include <deque> #include <string> @@ -66,7 +66,7 @@ class TCPConnector : public Connector, public sys::Codec sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; - sys::Socket socket; + boost::scoped_ptr<sys::Socket> socket; sys::AsynchConnector* connector; sys::AsynchIO* aio; @@ -80,6 +80,7 @@ class TCPConnector : public Connector, public sys::Codec void close(); void send(framing::AMQFrame& frame); void abort(); + void connectAborted(); void setInputHandler(framing::InputHandler* handler); void setShutdownHandler(sys::ShutdownHandler* handler); @@ -90,7 +91,7 @@ class TCPConnector : public Connector, public sys::Codec const qpid::sys::SecuritySettings* getSecuritySettings() { return 0; } size_t decode(const char* buffer, size_t size); - size_t encode(const char* buffer, size_t size); + size_t encode(char* buffer, size_t size); bool canEncode(); protected: diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index aaebec0720..f43119ea4c 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -26,6 +26,7 @@ #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" +#include "qpid/amqp_0_10/Codecs.h" #include <boost/intrusive_ptr.hpp> #include <vector> #include <sstream> @@ -156,6 +157,8 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value) settings.sslCertName = value.asString(); } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") { reconnectOnLimitExceeded = value; + } else if (name == "client-properties") { + amqp_0_10::translate(value.asMap(), settings.clientProperties); } else { throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); } diff --git a/cpp/src/qpid/client/windows/ClientDllMain.cpp b/cpp/src/qpid/client/windows/ClientDllMain.cpp new file mode 100644 index 0000000000..d636489908 --- /dev/null +++ b/cpp/src/qpid/client/windows/ClientDllMain.cpp @@ -0,0 +1,22 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/windows/QpidDllMain.h" diff --git a/cpp/src/qpid/client/windows/SslConnector.cpp b/cpp/src/qpid/client/windows/SslConnector.cpp index 2aa31e8202..e1f34e7aea 100644 --- a/cpp/src/qpid/client/windows/SslConnector.cpp +++ b/cpp/src/qpid/client/windows/SslConnector.cpp @@ -71,6 +71,8 @@ class SslConnector : public qpid::client::TCPConnector void redirectReadbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void redirectWritebuff(qpid::sys::AsynchIO&); void redirectEof(qpid::sys::AsynchIO&); + void redirectDisconnect(qpid::sys::AsynchIO&); + void redirectSocketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&); public: SslConnector(boost::shared_ptr<qpid::sys::Poller>, @@ -79,7 +81,6 @@ public: ConnectionImpl*); virtual void connect(const std::string& host, const std::string& port); virtual void connected(const Socket&); - unsigned int getSSF(); }; // Static constructor which registers connector here @@ -124,6 +125,14 @@ void SslConnector::redirectEof(qpid::sys::AsynchIO& a) { eof(a); } +void SslConnector::redirectDisconnect(qpid::sys::AsynchIO& a) { + disconnected(a); +} + +void SslConnector::redirectSocketClosed(qpid::sys::AsynchIO& a, const qpid::sys::Socket& s) { + socketClosed(a, s); +} + SslConnector::SslConnector(boost::shared_ptr<qpid::sys::Poller> p, framing::ProtocolVersion ver, const ConnectionSettings& settings, @@ -164,8 +173,8 @@ void SslConnector::connected(const Socket& s) { credHandle, boost::bind(&SslConnector::redirectReadbuff, this, _1, _2), boost::bind(&SslConnector::redirectEof, this, _1), - boost::bind(&SslConnector::redirectEof, this, _1), - 0, // closed + boost::bind(&SslConnector::redirectDisconnect, this, _1), + boost::bind(&SslConnector::redirectSocketClosed, this, _1, _2), 0, // nobuffs boost::bind(&SslConnector::redirectWritebuff, this, _1), boost::bind(&SslConnector::negotiationDone, this, _1)); @@ -173,9 +182,4 @@ void SslConnector::connected(const Socket& s) { shim->start(poller); } -unsigned int SslConnector::getSSF() -{ - return shim->getSslKeySize(); -} - }}} // namespace qpid::client::windows |