summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp42
1 files changed, 20 insertions, 22 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 30b69e4654..e19afd0e67 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -80,8 +80,6 @@ MessageHandlerImpl::consume(const MethodContext& context,
bool exclusive,
const qpid::framing::FieldTable& filter )
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!destination.empty() && channel.exists(destination)){
throw ConnectionException(530, "Consumer tags must be unique");
@@ -139,7 +137,7 @@ MessageHandlerImpl::offset(const MethodContext&,
void
MessageHandlerImpl::ok( const MethodContext& )
{
- // TODO aconway 2007-02-05: For HA, we can drop acked messages here.
+ // TODO: Need to ack the transfers acknowledged so far for flow control purp oses
}
void
@@ -156,8 +154,6 @@ MessageHandlerImpl::qos(const MethodContext& context,
u_int16_t prefetchCount,
bool /*global*/ )
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
@@ -196,14 +192,14 @@ MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
- bool /* immediate */,
+ bool immediate,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
const string& exchangeName,
- const string& /* routingKey */,
+ const string& routingKey,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -215,22 +211,24 @@ MessageHandlerImpl::transfer(const MethodContext& context,
const string& /*securityToken*/,
const qpid::framing::FieldTable& /*applicationHeaders*/,
qpid::framing::Content body,
- bool /* mandatory */ )
+ bool mandatory)
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
- MessageTransferBody::shared_ptr transfer =
- boost::shared_polymorphic_downcast<MessageTransferBody>(
- context.methodBody);
- // Verify the exchange exists, will throw if not.
- broker.getExchanges().get(exchangeName);
- if (body.isInline()) {
- MessageMessage* msg = new MessageMessage(transfer);
- // FIXME aconway 2007-02-05: Remove exchange parameter.
- // use shared_ptr for message.
- channel.handlePublish(msg, Exchange::shared_ptr());
- sendOk(context);
- } else {
- references.get(body.getValue()).transfer(transfer);
+ Exchange::shared_ptr exchange = exchangeName.empty() ?
+ broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
+ boost::shared_ptr<MessageTransferBody> transfer(boost::dynamic_pointer_cast<MessageTransferBody>(context.methodBody));
+ if(exchange){
+ if (body.isInline()) {
+ Message::shared_ptr msg(new MessageMessage(transfer, exchangeName,
+ routingKey, mandatory, immediate));
+
+ channel.handleInlineTransfer(msg, exchange);
+
+ connection.client->getMessageHandler()->ok(context);
+ } else {
+ references.get(body.getValue()).transfer(transfer);
+ }
+ }else{
+ throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
}
}