diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2007-02-06 14:19:50 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2007-02-06 14:19:50 +0000 |
| commit | 923ad36710bb575ca01e3c2bdb81eea005c9383d (patch) | |
| tree | c5b4fbdebe071a18c781815051e133186fa001d8 /cpp | |
| parent | e9c5cd3d7ff42489422ef47177cc052baf13f426 (diff) | |
| download | qpid-python-923ad36710bb575ca01e3c2bdb81eea005c9383d.tar.gz | |
r800@fuschia: andrew | 2007-01-17 17:34:13 +0000
Updated to latest upstream changes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504148 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 193 | ||||
| -rw-r--r-- | cpp/tests/MessageHandlerTest.cpp | 2 |
2 files changed, 194 insertions, 1 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 10e386ff41..577c053413 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -356,5 +356,198 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume( } +// +// Message class method handlers +// +void +BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*bytes*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + + +void +BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t channel, + const string& destination ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + + connection.getChannel(channel).cancel(destination); + + connection.client->getMessageHandler()->ok(channel); +} + +void +BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*identifier*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/, + const string& /*reference*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::consume( u_int16_t channelId, + u_int16_t /*ticket*/, + const string& queueName, + const string& destination, + bool noLocal, + bool noAck, + bool exclusive, + const qpid::framing::FieldTable& filter ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + + Queue::shared_ptr queue = connection.getQueue(queueName, channelId); + Channel& channel = connection.getChannel(channelId); + if(!destination.empty() && channel.exists(destination)){ + throw ConnectionException(530, "Consumer tags must be unique"); + } + + try{ + string newTag = destination; + channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); + + connection.client->getMessageHandler()->ok(channelId); + + //allow messages to be dispatched if required as there is now a consumer: + queue->dispatch(); + }catch(ExclusiveAccessException& e){ + if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); + else throw ChannelException(403, "Access would violate previously granted exclusivity"); + } +} + +void +BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::get( u_int16_t channelId, + u_int16_t /*ticket*/, + const string& queueName, + const string& /*destination*/, + bool noAck ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + + Queue::shared_ptr queue = connection.getQueue(queueName, channelId); + + // FIXME: get is probably Basic specific + if(!connection.getChannel(channelId).get(queue, !noAck)){ + + connection.client->getMessageHandler()->empty(channelId); + } + +} + +void +BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/, + u_int64_t /*value*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/, + const string& /*reference*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::qos( u_int16_t channel, + u_int32_t prefetchSize, + u_int16_t prefetchCount, + bool /*global*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + + //TODO: handle global + connection.getChannel(channel).setPrefetchSize(prefetchSize); + connection.getChannel(channel).setPrefetchCount(prefetchCount); + + connection.client->getMessageHandler()->ok(channel); +} + +void +BrokerAdapter::MessageHandlerImpl::recover( u_int16_t channel, + bool requeue ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + + connection.getChannel(channel).recover(requeue); + +} + +void +BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/, + u_int16_t /*code*/, + const string& /*text*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/, + const string& /*reference*/, + const string& /*identifier*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature +} + +void +BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t channel, + u_int16_t /*ticket*/, + const string& /*destination*/, + bool /*redelivered*/, + bool immediate, + u_int64_t /*ttl*/, + u_int8_t /*priority*/, + u_int64_t /*timestamp*/, + u_int8_t /*deliveryMode*/, + u_int64_t /*expiration*/, + const string& exchangeName, + const string& routingKey, + const string& /*messageId*/, + const string& /*correlationId*/, + const string& /*replyTo*/, + const string& /*contentType*/, + const string& /*contentEncoding*/, + const string& /*userId*/, + const string& /*appId*/, + const string& /*transactionId*/, + const string& /*securityToken*/, + const qpid::framing::FieldTable& /*applicationHeaders*/, + qpid::framing::Content /*body*/ ) +{ + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + + Exchange::shared_ptr exchange = exchangeName.empty() ? + connection.broker.getExchanges().getDefault() : connection.broker.getExchanges().get(exchangeName); + if(exchange){ + Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate); + connection.getChannel(channel).handlePublish(msg, exchange); + }else{ + throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + } +} + }} // namespace qpid::broker diff --git a/cpp/tests/MessageHandlerTest.cpp b/cpp/tests/MessageHandlerTest.cpp index 4dd2c13408..55971355f6 100644 --- a/cpp/tests/MessageHandlerTest.cpp +++ b/cpp/tests/MessageHandlerTest.cpp @@ -23,7 +23,7 @@ #include <amqp_framing.h> #include <qpid_test_plugin.h> -#include <SessionHandlerFactoryImpl.h> +#include <BrokerAdapter.h> using namespace qpid::framing; using namespace qpid::broker; |
