summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-06 15:01:45 +0000
committerAlan Conway <aconway@apache.org>2007-02-06 15:01:45 +0000
commitfbd97f554b04a109c95c01fe6ad538c5f50161af (patch)
tree0324d02ee4f8d6ca2387d1d3ff85bcd61a123a34 /cpp/lib/broker/MessageHandlerImpl.cpp
parent80b1b0b5f443bfb3c9d62a80e1419c224d0229d8 (diff)
downloadqpid-python-fbd97f554b04a109c95c01fe6ad538c5f50161af.tar.gz
* broker/Reference, tests/ReferenceTest: class representing a reference.
* broker/BrokerChannel.cpp (complete): get destination exchange from Message, don't assume only one message in progress (could have multiple references open.) * broker/BrokerMessageMessage.cpp,.h: Contains transfer body and vector of append bodies. Construct from Reference. * broker/CompletionHandler.h: Extracted from BrokerMessage, used for MessageMessage also. * broker/ExchangeRegistry.cpp: Moved throw for missing exchanges to registry. * cpp/tests/start_broker: Increased wait time to 5 secs. * cpp/tests/*: renamed DummyChannel as MockChannel. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504172 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp88
1 files changed, 48 insertions, 40 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 71100996e7..30b69e4654 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -23,6 +23,8 @@
#include "Connection.h"
#include "Broker.h"
#include "BrokerMessageMessage.h"
+#include "MessageAppendBody.h"
+#include "MessageTransferBody.h"
namespace qpid {
namespace broker {
@@ -33,23 +35,23 @@ using namespace framing;
// Message class method handlers
//
void
-MessageHandlerImpl::append(const MethodContext&,
- const string& /*reference*/,
+MessageHandlerImpl::append(const MethodContext& context,
+ const string& reference,
const string& /*bytes*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ references.get(reference).append(
+ boost::shared_polymorphic_downcast<MessageAppendBody>(
+ context.methodBody));
+ sendOk(context);
}
void
-MessageHandlerImpl::cancel( const MethodContext& context,
- const string& destination )
+MessageHandlerImpl::cancel(const MethodContext& context,
+ const string& destination )
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
channel.cancel(destination);
-
- connection.client->getMessageHandler()->ok(context);
+ sendOk(context);
}
void
@@ -61,10 +63,11 @@ MessageHandlerImpl::checkpoint(const MethodContext&,
}
void
-MessageHandlerImpl::close(const MethodContext&,
- const string& /*reference*/ )
+MessageHandlerImpl::close(const MethodContext& context,
+ const string& reference)
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ references.get(reference).close();
+ sendOk(context);
}
void
@@ -88,13 +91,16 @@ MessageHandlerImpl::consume(const MethodContext& context,
string newTag = destination;
channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
- connection.client->getMessageHandler()->ok(context);
+ sendOk(context);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
}catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ if(exclusive)
+ throw ChannelException(403, "Exclusive access cannot be granted");
+ else
+ throw ChannelException(
+ 403, "Access would violate previously granted exclusivity");
}
}
@@ -133,14 +139,15 @@ MessageHandlerImpl::offset(const MethodContext&,
void
MessageHandlerImpl::ok( const MethodContext& )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // TODO aconway 2007-02-05: For HA, we can drop acked messages here.
}
void
-MessageHandlerImpl::open(const MethodContext&,
- const string& /*reference*/ )
+MessageHandlerImpl::open(const MethodContext& context,
+ const string& reference)
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ references.open(reference);
+ sendOk(context);
}
void
@@ -155,7 +162,7 @@ MessageHandlerImpl::qos(const MethodContext& context,
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getMessageHandler()->ok(context);
+ sendOk(context);
}
void
@@ -189,14 +196,14 @@ MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
- bool immediate,
+ bool /* immediate */,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
const string& exchangeName,
- const string& routingKey,
+ const string& /* routingKey */,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -208,27 +215,28 @@ MessageHandlerImpl::transfer(const MethodContext& context,
const string& /*securityToken*/,
const qpid::framing::FieldTable& /*applicationHeaders*/,
qpid::framing::Content body,
- bool mandatory )
+ bool /* mandatory */ )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- Exchange::shared_ptr exchange = exchangeName.empty() ?
- broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
- if(exchange){
- if (body.isInline()) {
- MessageMessage* msg =
- new MessageMessage(context.methodBody, exchangeName,
- routingKey, mandatory, immediate);
- channel.handlePublish(msg, exchange);
-
- connection.client->getMessageHandler()->ok(context);
- } else {
- // Don't handle reference content yet
- assert(body.isInline());
- }
- }else{
- throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ MessageTransferBody::shared_ptr transfer =
+ boost::shared_polymorphic_downcast<MessageTransferBody>(
+ context.methodBody);
+ // Verify the exchange exists, will throw if not.
+ broker.getExchanges().get(exchangeName);
+ if (body.isInline()) {
+ MessageMessage* msg = new MessageMessage(transfer);
+ // FIXME aconway 2007-02-05: Remove exchange parameter.
+ // use shared_ptr for message.
+ channel.handlePublish(msg, Exchange::shared_ptr());
+ sendOk(context);
+ } else {
+ references.get(body.getValue()).transfer(transfer);
}
}
+
+void MessageHandlerImpl::sendOk(const MethodContext& context) {
+ connection.client->getMessageHandler()->ok(context);
+}
+
}} // namespace qpid::broker