summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/amqp_0_10/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/amqp_0_10/Connection.cpp')
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.cpp20
1 files changed, 15 insertions, 5 deletions
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp
index c5315ccf4c..03e553f180 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -27,12 +27,21 @@ namespace amqp_0_10 {
using sys::Mutex;
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id)
- : frameQueueClosed(false), output(o), connection(this, broker, id),
- identifier(id), initialized(false) {}
+Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
+ : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
+ identifier(id), initialized(false), isClient(_isClient) {}
size_t Connection::decode(const char* buffer, size_t size) {
framing::Buffer in(const_cast<char*>(buffer), size);
+ if (isClient && !initialized) {
+ //read in protocol header
+ framing::ProtocolInitiation pi;
+ if (pi.decode(in)) {
+ //TODO: check the version is correct
+ QPID_LOG(trace, "RECV " << identifier << " INIT(" << pi << ")");
+ }
+ initialized = true;
+ }
framing::AMQFrame frame;
while(frame.decode(in)) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
@@ -44,7 +53,7 @@ size_t Connection::decode(const char* buffer, size_t size) {
bool Connection::canEncode() {
if (!frameQueueClosed) connection.doOutput();
Mutex::ScopedLock l(frameQueueLock);
- return !initialized || !frameQueue.empty();
+ return (!isClient && !initialized) || !frameQueue.empty();
}
bool Connection::isClosed() const {
@@ -55,10 +64,11 @@ bool Connection::isClosed() const {
size_t Connection::encode(const char* buffer, size_t size) {
Mutex::ScopedLock l(frameQueueLock);
framing::Buffer out(const_cast<char*>(buffer), size);
- if (!initialized) {
+ if (!isClient && !initialized) {
framing::ProtocolInitiation pi(getVersion());
pi.encode(out);
initialized = true;
+ QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")");
}
while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
frameQueue.front().encode(out);