diff options
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
| -rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 42 |
1 files changed, 20 insertions, 22 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 30b69e4654..e19afd0e67 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -80,8 +80,6 @@ MessageHandlerImpl::consume(const MethodContext& context, bool exclusive, const qpid::framing::FieldTable& filter ) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!destination.empty() && channel.exists(destination)){ throw ConnectionException(530, "Consumer tags must be unique"); @@ -139,7 +137,7 @@ MessageHandlerImpl::offset(const MethodContext&, void MessageHandlerImpl::ok( const MethodContext& ) { - // TODO aconway 2007-02-05: For HA, we can drop acked messages here. + // TODO: Need to ack the transfers acknowledged so far for flow control purp oses } void @@ -156,8 +154,6 @@ MessageHandlerImpl::qos(const MethodContext& context, u_int16_t prefetchCount, bool /*global*/ ) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); @@ -196,14 +192,14 @@ MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, - bool /* immediate */, + bool immediate, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, const string& exchangeName, - const string& /* routingKey */, + const string& routingKey, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -215,22 +211,24 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, qpid::framing::Content body, - bool /* mandatory */ ) + bool mandatory) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - MessageTransferBody::shared_ptr transfer = - boost::shared_polymorphic_downcast<MessageTransferBody>( - context.methodBody); - // Verify the exchange exists, will throw if not. - broker.getExchanges().get(exchangeName); - if (body.isInline()) { - MessageMessage* msg = new MessageMessage(transfer); - // FIXME aconway 2007-02-05: Remove exchange parameter. - // use shared_ptr for message. - channel.handlePublish(msg, Exchange::shared_ptr()); - sendOk(context); - } else { - references.get(body.getValue()).transfer(transfer); + Exchange::shared_ptr exchange = exchangeName.empty() ? + broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); + boost::shared_ptr<MessageTransferBody> transfer(boost::dynamic_pointer_cast<MessageTransferBody>(context.methodBody)); + if(exchange){ + if (body.isInline()) { + Message::shared_ptr msg(new MessageMessage(transfer, exchangeName, + routingKey, mandatory, immediate)); + + channel.handleInlineTransfer(msg, exchange); + + connection.client->getMessageHandler()->ok(context); + } else { + references.get(body.getValue()).transfer(transfer); + } + }else{ + throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); } } |
