diff options
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 40 |
1 files changed, 10 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ce9e4865db..cdbcee1c69 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -41,51 +41,34 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : framemax(65536), heartbeat(0), client(0), - stagingThreshold(broker.getStagingThreshold()) + stagingThreshold(broker.getStagingThreshold()), + adapter(*this) {} -Queue::shared_ptr Connection::getQueue(const string& name, uint16_t channel){ - Queue::shared_ptr queue; - if (name.empty()) { - queue = getChannel(channel).getDefaultQueue(); - if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); - } else { - queue = broker.getQueues().find(name); - if (queue == 0) { - throw ChannelException( 404, "Queue not found: " + name); - } - } - return queue; -} - - Exchange::shared_ptr Connection::findExchange(const string& name){ return broker.getExchanges().get(name); } void Connection::received(framing::AMQFrame& frame){ - getChannel((frame.getChannel())).getHandlers().in->handle(frame); + if (frame.getChannel() == 0) { + adapter.handle(frame); + } else { + getChannel((frame.getChannel())).getHandlers().in->handle(frame); + } } void Connection::close( ReplyCode code, const string& text, ClassId classId, MethodId methodId) { - client->close(code, text, classId, methodId); + adapter.close(code, text, classId, methodId); getOutput().close(); } void Connection::initiated(const framing::ProtocolInitiation& header) { version = ProtocolVersion(header.getMajor(), header.getMinor()); - FieldTable properties; - string mechanisms("PLAIN"); - string locales("en_US"); - getChannel(0).init(0, *out, getVersion()); - client = &getChannel(0).getAdapter().getProxy().getConnection(); - client->start( - header.getMajor(), header.getMinor(), - properties, mechanisms, locales); + adapter.init(header); } void Connection::idleOut(){} @@ -117,10 +100,7 @@ void Connection::closeChannel(uint16_t id) { Channel& Connection::getChannel(ChannelId id) { ChannelMap::iterator i = channels.find(id); if (i == channels.end()) { - i = channels.insert( - id, new Channel( - *this, id, framemax, broker.getQueues().getStore(), - broker.getStagingThreshold())).first; + i = channels.insert(id, new Channel(*this, id, &broker.getStore())).first; } return *i; } |
