summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/amqp_0_10/Connection.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-22 20:14:15 +0000
committerGordon Sim <gsim@apache.org>2008-04-22 20:14:15 +0000
commit1852df5d00eda8d25b7c11a01144fca629fc6427 (patch)
tree44162d68c73cc55f01b9818aa1333a0187916823 /cpp/src/qpid/amqp_0_10/Connection.cpp
parent7b977ed726d89a5a28f6113a0252c8ccc17ed64b (diff)
downloadqpid-python-1852df5d00eda8d25b7c11a01144fca629fc6427.tar.gz
Moved federation to final 0-10 codepath
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@650635 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/amqp_0_10/Connection.cpp')
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.cpp20
1 files changed, 15 insertions, 5 deletions
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp
index c5315ccf4c..03e553f180 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -27,12 +27,21 @@ namespace amqp_0_10 {
using sys::Mutex;
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id)
- : frameQueueClosed(false), output(o), connection(this, broker, id),
- identifier(id), initialized(false) {}
+Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
+ : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
+ identifier(id), initialized(false), isClient(_isClient) {}
size_t Connection::decode(const char* buffer, size_t size) {
framing::Buffer in(const_cast<char*>(buffer), size);
+ if (isClient && !initialized) {
+ //read in protocol header
+ framing::ProtocolInitiation pi;
+ if (pi.decode(in)) {
+ //TODO: check the version is correct
+ QPID_LOG(trace, "RECV " << identifier << " INIT(" << pi << ")");
+ }
+ initialized = true;
+ }
framing::AMQFrame frame;
while(frame.decode(in)) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
@@ -44,7 +53,7 @@ size_t Connection::decode(const char* buffer, size_t size) {
bool Connection::canEncode() {
if (!frameQueueClosed) connection.doOutput();
Mutex::ScopedLock l(frameQueueLock);
- return !initialized || !frameQueue.empty();
+ return (!isClient && !initialized) || !frameQueue.empty();
}
bool Connection::isClosed() const {
@@ -55,10 +64,11 @@ bool Connection::isClosed() const {
size_t Connection::encode(const char* buffer, size_t size) {
Mutex::ScopedLock l(frameQueueLock);
framing::Buffer out(const_cast<char*>(buffer), size);
- if (!initialized) {
+ if (!isClient && !initialized) {
framing::ProtocolInitiation pi(getVersion());
pi.encode(out);
initialized = true;
+ QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")");
}
while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
frameQueue.front().encode(out);