summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp40
1 files changed, 10 insertions, 30 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index ce9e4865db..cdbcee1c69 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -41,51 +41,34 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
framemax(65536),
heartbeat(0),
client(0),
- stagingThreshold(broker.getStagingThreshold())
+ stagingThreshold(broker.getStagingThreshold()),
+ adapter(*this)
{}
-Queue::shared_ptr Connection::getQueue(const string& name, uint16_t channel){
- Queue::shared_ptr queue;
- if (name.empty()) {
- queue = getChannel(channel).getDefaultQueue();
- if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
- } else {
- queue = broker.getQueues().find(name);
- if (queue == 0) {
- throw ChannelException( 404, "Queue not found: " + name);
- }
- }
- return queue;
-}
-
-
Exchange::shared_ptr Connection::findExchange(const string& name){
return broker.getExchanges().get(name);
}
void Connection::received(framing::AMQFrame& frame){
- getChannel((frame.getChannel())).getHandlers().in->handle(frame);
+ if (frame.getChannel() == 0) {
+ adapter.handle(frame);
+ } else {
+ getChannel((frame.getChannel())).getHandlers().in->handle(frame);
+ }
}
void Connection::close(
ReplyCode code, const string& text, ClassId classId, MethodId methodId)
{
- client->close(code, text, classId, methodId);
+ adapter.close(code, text, classId, methodId);
getOutput().close();
}
void Connection::initiated(const framing::ProtocolInitiation& header) {
version = ProtocolVersion(header.getMajor(), header.getMinor());
- FieldTable properties;
- string mechanisms("PLAIN");
- string locales("en_US");
- getChannel(0).init(0, *out, getVersion());
- client = &getChannel(0).getAdapter().getProxy().getConnection();
- client->start(
- header.getMajor(), header.getMinor(),
- properties, mechanisms, locales);
+ adapter.init(header);
}
void Connection::idleOut(){}
@@ -117,10 +100,7 @@ void Connection::closeChannel(uint16_t id) {
Channel& Connection::getChannel(ChannelId id) {
ChannelMap::iterator i = channels.find(id);
if (i == channels.end()) {
- i = channels.insert(
- id, new Channel(
- *this, id, framemax, broker.getQueues().getStore(),
- broker.getStagingThreshold())).first;
+ i = channels.insert(id, new Channel(*this, id, &broker.getStore())).first;
}
return *i;
}