diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-06 15:01:45 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-06 15:01:45 +0000 |
| commit | fbd97f554b04a109c95c01fe6ad538c5f50161af (patch) | |
| tree | 0324d02ee4f8d6ca2387d1d3ff85bcd61a123a34 /cpp/lib/broker/MessageHandlerImpl.cpp | |
| parent | 80b1b0b5f443bfb3c9d62a80e1419c224d0229d8 (diff) | |
| download | qpid-python-fbd97f554b04a109c95c01fe6ad538c5f50161af.tar.gz | |
* broker/Reference, tests/ReferenceTest: class representing a reference.
* broker/BrokerChannel.cpp (complete): get destination exchange from Message,
don't assume only one message in progress (could have multiple
references open.)
* broker/BrokerMessageMessage.cpp,.h: Contains transfer body and
vector of append bodies. Construct from Reference.
* broker/CompletionHandler.h: Extracted from BrokerMessage, used for
MessageMessage also.
* broker/ExchangeRegistry.cpp: Moved throw for missing exchanges to
registry.
* cpp/tests/start_broker: Increased wait time to 5 secs.
* cpp/tests/*: renamed DummyChannel as MockChannel.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504172 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
| -rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 88 |
1 files changed, 48 insertions, 40 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 71100996e7..30b69e4654 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -23,6 +23,8 @@ #include "Connection.h" #include "Broker.h" #include "BrokerMessageMessage.h" +#include "MessageAppendBody.h" +#include "MessageTransferBody.h" namespace qpid { namespace broker { @@ -33,23 +35,23 @@ using namespace framing; // Message class method handlers // void -MessageHandlerImpl::append(const MethodContext&, - const string& /*reference*/, +MessageHandlerImpl::append(const MethodContext& context, + const string& reference, const string& /*bytes*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + references.get(reference).append( + boost::shared_polymorphic_downcast<MessageAppendBody>( + context.methodBody)); + sendOk(context); } void -MessageHandlerImpl::cancel( const MethodContext& context, - const string& destination ) +MessageHandlerImpl::cancel(const MethodContext& context, + const string& destination ) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - channel.cancel(destination); - - connection.client->getMessageHandler()->ok(context); + sendOk(context); } void @@ -61,10 +63,11 @@ MessageHandlerImpl::checkpoint(const MethodContext&, } void -MessageHandlerImpl::close(const MethodContext&, - const string& /*reference*/ ) +MessageHandlerImpl::close(const MethodContext& context, + const string& reference) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + references.get(reference).close(); + sendOk(context); } void @@ -88,13 +91,16 @@ MessageHandlerImpl::consume(const MethodContext& context, string newTag = destination; channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - connection.client->getMessageHandler()->ok(context); + sendOk(context); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); + if(exclusive) + throw ChannelException(403, "Exclusive access cannot be granted"); + else + throw ChannelException( + 403, "Access would violate previously granted exclusivity"); } } @@ -133,14 +139,15 @@ MessageHandlerImpl::offset(const MethodContext&, void MessageHandlerImpl::ok( const MethodContext& ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // TODO aconway 2007-02-05: For HA, we can drop acked messages here. } void -MessageHandlerImpl::open(const MethodContext&, - const string& /*reference*/ ) +MessageHandlerImpl::open(const MethodContext& context, + const string& reference) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + references.open(reference); + sendOk(context); } void @@ -155,7 +162,7 @@ MessageHandlerImpl::qos(const MethodContext& context, channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - connection.client->getMessageHandler()->ok(context); + sendOk(context); } void @@ -189,14 +196,14 @@ MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, - bool immediate, + bool /* immediate */, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, const string& exchangeName, - const string& routingKey, + const string& /* routingKey */, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -208,27 +215,28 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, qpid::framing::Content body, - bool mandatory ) + bool /* mandatory */ ) { //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - Exchange::shared_ptr exchange = exchangeName.empty() ? - broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); - if(exchange){ - if (body.isInline()) { - MessageMessage* msg = - new MessageMessage(context.methodBody, exchangeName, - routingKey, mandatory, immediate); - channel.handlePublish(msg, exchange); - - connection.client->getMessageHandler()->ok(context); - } else { - // Don't handle reference content yet - assert(body.isInline()); - } - }else{ - throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + MessageTransferBody::shared_ptr transfer = + boost::shared_polymorphic_downcast<MessageTransferBody>( + context.methodBody); + // Verify the exchange exists, will throw if not. + broker.getExchanges().get(exchangeName); + if (body.isInline()) { + MessageMessage* msg = new MessageMessage(transfer); + // FIXME aconway 2007-02-05: Remove exchange parameter. + // use shared_ptr for message. + channel.handlePublish(msg, Exchange::shared_ptr()); + sendOk(context); + } else { + references.get(body.getValue()).transfer(transfer); } } + +void MessageHandlerImpl::sendOk(const MethodContext& context) { + connection.client->getMessageHandler()->ok(context); +} + }} // namespace qpid::broker |
