diff options
Diffstat (limited to 'cpp/src/qpid/amqp_0_10/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/amqp_0_10/Connection.cpp | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp index c5315ccf4c..03e553f180 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -27,12 +27,21 @@ namespace amqp_0_10 { using sys::Mutex; -Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id) - : frameQueueClosed(false), output(o), connection(this, broker, id), - identifier(id), initialized(false) {} +Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) + : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient), + identifier(id), initialized(false), isClient(_isClient) {} size_t Connection::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast<char*>(buffer), size); + if (isClient && !initialized) { + //read in protocol header + framing::ProtocolInitiation pi; + if (pi.decode(in)) { + //TODO: check the version is correct + QPID_LOG(trace, "RECV " << identifier << " INIT(" << pi << ")"); + } + initialized = true; + } framing::AMQFrame frame; while(frame.decode(in)) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); @@ -44,7 +53,7 @@ size_t Connection::decode(const char* buffer, size_t size) { bool Connection::canEncode() { if (!frameQueueClosed) connection.doOutput(); Mutex::ScopedLock l(frameQueueLock); - return !initialized || !frameQueue.empty(); + return (!isClient && !initialized) || !frameQueue.empty(); } bool Connection::isClosed() const { @@ -55,10 +64,11 @@ bool Connection::isClosed() const { size_t Connection::encode(const char* buffer, size_t size) { Mutex::ScopedLock l(frameQueueLock); framing::Buffer out(const_cast<char*>(buffer), size); - if (!initialized) { + if (!isClient && !initialized) { framing::ProtocolInitiation pi(getVersion()); pi.encode(out); initialized = true; + QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")"); } while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) { frameQueue.front().encode(out); |
