summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp240
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp30
-rw-r--r--cpp/lib/broker/BrokerChannel.h8
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.cpp51
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.h33
-rw-r--r--cpp/lib/broker/CompletionHandler.h39
-rw-r--r--cpp/lib/broker/ExchangeRegistry.cpp5
-rw-r--r--cpp/lib/broker/Makefile.am2
-rw-r--r--cpp/lib/broker/MessageBuilder.cpp5
-rw-r--r--cpp/lib/broker/MessageBuilder.h11
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp88
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.h23
-rw-r--r--cpp/lib/broker/Reference.cpp56
-rw-r--r--cpp/lib/broker/Reference.h111
-rw-r--r--cpp/lib/common/framing/MethodContext.h5
15 files changed, 363 insertions, 344 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index fa25221bbd..6f55f32d47 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -355,245 +355,5 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume(
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
-
-//
-// Message class method handlers
-//
-void
-BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*bytes*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-
-void
-BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t channel,
- const string& destination )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- connection.getChannel(channel).cancel(destination);
-
- connection.client->getMessageHandler()->ok(channel);
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/,
- const string& /*reference*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::consume( u_int16_t channelId,
- u_int16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noLocal,
- bool noAck,
- bool exclusive,
- const qpid::framing::FieldTable& filter )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
- Channel& channel = connection.getChannel(channelId);
- if(!destination.empty() && channel.exists(destination)){
- throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- try{
- string newTag = destination;
- channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
-
- connection.client->getMessageHandler()->ok(channelId);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
- }catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
- }
-
- connection.getChannel(channel).cancel(destination);
-
- connection.client->getMessageHandler()->ok(channel);
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::get( u_int16_t channelId,
- u_int16_t /*ticket*/,
- const string& queueName,
- const string& /*destination*/,
- bool noAck )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
-
- // FIXME: get is probably Basic specific
- if(!connection.getChannel(channelId).get(queue, !noAck)){
-
- connection.client->getMessageHandler()->empty(channelId);
- }
-
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/,
- u_int64_t /*value*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
- Channel& channel = connection.getChannel(channelId);
- if(!destination.empty() && channel.exists(destination)){
- throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- try{
- string newTag = destination;
- channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
-
- connection.client->getMessageHandler()->ok(channelId);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
- }catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
- }
-
- connection.getChannel(channel).cancel(destination);
-
- connection.client->getMessageHandler()->ok(channel);
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/,
- const string& /*reference*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::qos( u_int16_t channel,
- u_int32_t prefetchSize,
- u_int16_t prefetchCount,
- bool /*global*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- //TODO: handle global
- connection.getChannel(channel).setPrefetchSize(prefetchSize);
- connection.getChannel(channel).setPrefetchCount(prefetchCount);
-
- connection.client->getMessageHandler()->ok(channel);
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
- Channel& channel = connection.getChannel(channelId);
- if(!destination.empty() && channel.exists(destination)){
- throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- try{
- string newTag = destination;
- channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
-
- connection.client->getMessageHandler()->ok(channelId);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
- }catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
- }
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::recover( u_int16_t channel,
- bool requeue )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- connection.getChannel(channel).recover(requeue);
-
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/,
- u_int16_t /*code*/,
- const string& /*text*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/,
- const string& /*reference*/,
- const string& /*identifier*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-}
-
-void
-BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t channel,
- u_int16_t /*ticket*/,
- const string& /*destination*/,
- bool /*redelivered*/,
- bool immediate,
- u_int64_t /*ttl*/,
- u_int8_t /*priority*/,
- u_int64_t /*timestamp*/,
- u_int8_t /*deliveryMode*/,
- u_int64_t /*expiration*/,
- const string& exchangeName,
- const string& routingKey,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const qpid::framing::FieldTable& /*applicationHeaders*/,
- qpid::framing::Content /*body*/ )
-{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- Exchange::shared_ptr exchange = exchangeName.empty() ?
- connection.broker.getExchanges().getDefault() : connection.broker.getExchanges().get(exchangeName);
- if(exchange){
- Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate);
- connection.getChannel(channel).handlePublish(msg, exchange);
- }else{
- throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
- }
-}
-
}} // namespace qpid::broker
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 96215a60ed..c0250815e8 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -78,7 +78,7 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection,
const FieldTable*)
{
- if(tag.empty()) tag = tagGenerator.generate();
+ if(tag.empty()) tag = tagGenerator.generate();
ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
try{
queue->consume(c, exclusive);//may throw exception
@@ -187,6 +187,8 @@ void Channel::ConsumerImpl::requestDispatch(){
if(blocked) queue->dispatch();
}
+// FIXME aconway 2007-02-05: Drop exchange member, calculate from
+// message in ::complete().
void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
Message::shared_ptr message(_message);
exchange = _exchange;
@@ -207,19 +209,19 @@ void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
// TODO aconway 2007-01-17: Implement heartbeating.
}
-void Channel::complete(Message::shared_ptr& msg){
- if(exchange){
- if(transactional){
- TxPublish* deliverable = new TxPublish(msg);
- exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
- txBuffer.enlist(new DeletingTxOp(deliverable));
- }else{
- DeliverableMessage deliverable(msg);
- exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
- }
- exchange.reset();
- }else{
- std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl;
+void Channel::complete(Message::shared_ptr msg) {
+ Exchange::shared_ptr exchange =
+ connection.broker.getExchanges().get(msg->getExchange());
+ assert(exchange.get());
+ if(transactional) {
+ std::auto_ptr<TxPublish> deliverable(new TxPublish(msg));
+ exchange->route(*deliverable, msg->getRoutingKey(),
+ &(msg->getHeaderProperties()->getHeaders()));
+ txBuffer.enlist(new DeletingTxOp(deliverable.release()));
+ } else {
+ DeliverableMessage deliverable(msg);
+ exchange->route(deliverable, msg->getRoutingKey(),
+ &(msg->getHeaderProperties()->getHeaders()));
}
}
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index dd95e944bb..484a4d64e3 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -36,6 +36,7 @@
#include <Prefetch.h>
#include <TxBuffer.h>
#include "framing/ChannelAdapter.h"
+#include "CompletionHandler.h"
namespace qpid {
namespace broker {
@@ -51,9 +52,8 @@ using framing::string;
* Maintains state for an AMQP channel. Handles incoming and
* outgoing messages for that channel.
*/
-class Channel :
- public framing::ChannelAdapter,
- private MessageBuilder::CompletionHandler
+class Channel : public framing::ChannelAdapter,
+ public CompletionHandler
{
class ConsumerImpl : public virtual Consumer
{
@@ -96,7 +96,7 @@ class Channel :
boost::scoped_ptr<BrokerAdapter> adapter;
- virtual void complete(Message::shared_ptr& msg);
+ virtual void complete(Message::shared_ptr msg);
void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
void cancel(consumer_iterator consumer);
bool checkPrefetch(Message::shared_ptr& msg);
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp
index 4168ff639c..e2c4b94811 100644
--- a/cpp/lib/broker/BrokerMessageMessage.cpp
+++ b/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -18,25 +18,39 @@
* under the License.
*
*/
+#include <iostream>
#include "BrokerMessageMessage.h"
+#include "MessageTransferBody.h"
+#include "MessageAppendBody.h"
+#include "Reference.h"
+using namespace std;
using namespace qpid::broker;
-MessageMessage::MessageMessage(
- const qpid::framing::AMQMethodBody::shared_ptr _methodBody,
- const std::string& _exchange, const std::string& _routingKey,
- bool _mandatory, bool _immediate) :
- Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody),
- methodBody(_methodBody)
-{
-}
+MessageMessage::MessageMessage(TransferPtr transfer_)
+ : Message(transfer_->getExchange(), transfer_->getRoutingKey(),
+ transfer_->getMandatory(), transfer_->getImmediate(),
+ transfer_),
+ transfer(transfer_)
+{}
+
+MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref)
+ : Message(transfer_->getExchange(), transfer_->getRoutingKey(),
+ transfer_->getMandatory(), transfer_->getImmediate(),
+ transfer_),
+ transfer(transfer_),
+ appends(ref.getAppends())
+{}
void MessageMessage::deliver(
- framing::ChannelAdapter& /*out*/,
+ framing::ChannelAdapter& /*channel*/,
const std::string& /*consumerTag*/,
u_int64_t /*deliveryTag*/,
u_int32_t /*framesize*/)
{
+ // FIXME aconway 2007-02-05:
+ cout << "MessageMessage::deliver" << *transfer << " + " << appends.size()
+ << " appends." << endl;
}
void MessageMessage::sendGetOk(
@@ -45,49 +59,50 @@ void MessageMessage::sendGetOk(
u_int64_t /*deliveryTag*/,
u_int32_t /*framesize*/)
{
+ // FIXME aconway 2007-02-05:
}
bool MessageMessage::isComplete()
{
- return true;
+ return true; // FIXME aconway 2007-02-05:
}
u_int64_t MessageMessage::contentSize() const
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
bool MessageMessage::isPersistent()
{
- return false;
+ return false; // FIXME aconway 2007-02-05:
}
const ConnectionToken* const MessageMessage::getPublisher()
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
u_int32_t MessageMessage::encodedSize()
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
u_int32_t MessageMessage::encodedHeaderSize()
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
u_int32_t MessageMessage::encodedContentSize()
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
u_int64_t MessageMessage::expectedContentSize()
{
- return 0;
+ return 0; // FIXME aconway 2007-02-05:
}
diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h
index cad5cf15b0..aa136863a1 100644
--- a/cpp/lib/broker/BrokerMessageMessage.h
+++ b/cpp/lib/broker/BrokerMessageMessage.h
@@ -21,23 +21,28 @@
* under the License.
*
*/
-
+#include <vector>
#include "BrokerMessageBase.h"
+#include "Reference.h"
namespace qpid {
+
namespace framing {
-class AMQMethodBody;
+class MessageTransferBody;
+class MessageApppendBody;
}
namespace broker {
-class MessageMessage: public Message{
- const qpid::framing::AMQMethodBody::shared_ptr methodBody;
+class Reference;
+class MessageMessage: public Message{
public:
- MessageMessage(
- const framing::AMQMethodBody::shared_ptr methodBody,
- const std::string& exchange, const std::string& routingKey,
- bool mandatory, bool immediate);
+ typedef Reference::TransferPtr TransferPtr;
+ typedef Reference::AppendPtr AppendPtr;
+ typedef Reference::Appends Appends;
+
+ MessageMessage(TransferPtr transfer);
+ MessageMessage(TransferPtr transfer, const Reference&);
// Default destructor okay
@@ -52,7 +57,7 @@ class MessageMessage: public Message{
u_int32_t framesize);
bool isComplete();
-
+
u_int64_t contentSize() const;
qpid::framing::BasicHeaderProperties* getHeaderProperties();
bool isPersistent();
@@ -62,10 +67,16 @@ class MessageMessage: public Message{
u_int32_t encodedHeaderSize();
u_int32_t encodedContentSize();
u_int64_t expectedContentSize();
+
+ TransferPtr getTransfer() { return transfer; }
+ const Appends& getAppends() { return appends; }
+ private:
+
+ const TransferPtr transfer;
+ const Appends appends;
};
-}
-}
+}}
#endif /*!_broker_BrokerMessage_h*/
diff --git a/cpp/lib/broker/CompletionHandler.h b/cpp/lib/broker/CompletionHandler.h
new file mode 100644
index 0000000000..9d51656282
--- /dev/null
+++ b/cpp/lib/broker/CompletionHandler.h
@@ -0,0 +1,39 @@
+#ifndef _broker_CompletionHandler_h
+#define _broker_CompletionHandler_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Callback interface to handle completion of a message.
+ */
+class CompletionHandler
+{
+ public:
+ virtual ~CompletionHandler(){}
+ virtual void complete(Message::shared_ptr) = 0;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_CompletionHandler_h*/
diff --git a/cpp/lib/broker/ExchangeRegistry.cpp b/cpp/lib/broker/ExchangeRegistry.cpp
index 7bf96c4544..3e5ed89b54 100644
--- a/cpp/lib/broker/ExchangeRegistry.cpp
+++ b/cpp/lib/broker/ExchangeRegistry.cpp
@@ -59,7 +59,10 @@ void ExchangeRegistry::destroy(const string& name){
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
Mutex::ScopedLock locker(lock);
- return exchanges[name];
+ Exchange::shared_ptr exchange =exchanges[name];
+ if (!exchange)
+ throw ChannelException(404, "Exchange not found:" + name);
+ return exchange;
}
namespace
diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am
index 064b592124..760c6d61e2 100644
--- a/cpp/lib/broker/Makefile.am
+++ b/cpp/lib/broker/Makefile.am
@@ -69,6 +69,8 @@ libqpidbroker_la_SOURCES = \
QueueRegistry.h \
RecoveryManager.cpp \
RecoveryManager.h \
+ Reference.cpp \
+ Reference.h \
ConnectionFactory.cpp \
ConnectionFactory.h \
Connection.cpp \
diff --git a/cpp/lib/broker/MessageBuilder.cpp b/cpp/lib/broker/MessageBuilder.cpp
index 41bf812d2d..69e771c793 100644
--- a/cpp/lib/broker/MessageBuilder.cpp
+++ b/cpp/lib/broker/MessageBuilder.cpp
@@ -27,7 +27,10 @@ using namespace qpid::broker;
using namespace qpid::framing;
using std::auto_ptr;
-MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) :
+MessageBuilder::MessageBuilder(CompletionHandler* _handler,
+ MessageStore* const _store,
+ u_int64_t _stagingThreshold
+) :
handler(_handler),
store(_store),
stagingThreshold(_stagingThreshold)
diff --git a/cpp/lib/broker/MessageBuilder.h b/cpp/lib/broker/MessageBuilder.h
index 5b8516be42..f0b90a86cd 100644
--- a/cpp/lib/broker/MessageBuilder.h
+++ b/cpp/lib/broker/MessageBuilder.h
@@ -29,22 +29,19 @@
#include <AMQContentBody.h>
#include <AMQHeaderBody.h>
#include <BasicPublishBody.h>
+#include "CompletionHandler.h"
namespace qpid {
namespace broker {
class MessageBuilder{
public:
- class CompletionHandler{
- public:
- virtual void complete(Message::shared_ptr&) = 0;
- virtual ~CompletionHandler(){}
- };
MessageBuilder(CompletionHandler* _handler,
MessageStore* const store = 0,
u_int64_t stagingThreshold = 0);
void initialise(Message::shared_ptr& msg);
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr& content);
+ void setHeader(framing::AMQHeaderBody::shared_ptr& header);
+ void addContent(framing::AMQContentBody::shared_ptr& content);
+ Message::shared_ptr getMessage() { return message; }
private:
Message::shared_ptr message;
CompletionHandler* handler;
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 71100996e7..30b69e4654 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -23,6 +23,8 @@
#include "Connection.h"
#include "Broker.h"
#include "BrokerMessageMessage.h"
+#include "MessageAppendBody.h"
+#include "MessageTransferBody.h"
namespace qpid {
namespace broker {
@@ -33,23 +35,23 @@ using namespace framing;
// Message class method handlers
//
void
-MessageHandlerImpl::append(const MethodContext&,
- const string& /*reference*/,
+MessageHandlerImpl::append(const MethodContext& context,
+ const string& reference,
const string& /*bytes*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ references.get(reference).append(
+ boost::shared_polymorphic_downcast<MessageAppendBody>(
+ context.methodBody));
+ sendOk(context);
}
void
-MessageHandlerImpl::cancel( const MethodContext& context,
- const string& destination )
+MessageHandlerImpl::cancel(const MethodContext& context,
+ const string& destination )
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
channel.cancel(destination);
-
- connection.client->getMessageHandler()->ok(context);
+ sendOk(context);
}
void
@@ -61,10 +63,11 @@ MessageHandlerImpl::checkpoint(const MethodContext&,
}
void
-MessageHandlerImpl::close(const MethodContext&,
- const string& /*reference*/ )
+MessageHandlerImpl::close(const MethodContext& context,
+ const string& reference)
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ references.get(reference).close();
+ sendOk(context);
}
void
@@ -88,13 +91,16 @@ MessageHandlerImpl::consume(const MethodContext& context,
string newTag = destination;
channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
- connection.client->getMessageHandler()->ok(context);
+ sendOk(context);
//allow messages to be dispatched if required as there is now a consumer:
queue->dispatch();
}catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ if(exclusive)
+ throw ChannelException(403, "Exclusive access cannot be granted");
+ else
+ throw ChannelException(
+ 403, "Access would violate previously granted exclusivity");
}
}
@@ -133,14 +139,15 @@ MessageHandlerImpl::offset(const MethodContext&,
void
MessageHandlerImpl::ok( const MethodContext& )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // TODO aconway 2007-02-05: For HA, we can drop acked messages here.
}
void
-MessageHandlerImpl::open(const MethodContext&,
- const string& /*reference*/ )
+MessageHandlerImpl::open(const MethodContext& context,
+ const string& reference)
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ references.open(reference);
+ sendOk(context);
}
void
@@ -155,7 +162,7 @@ MessageHandlerImpl::qos(const MethodContext& context,
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- connection.client->getMessageHandler()->ok(context);
+ sendOk(context);
}
void
@@ -189,14 +196,14 @@ MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
- bool immediate,
+ bool /* immediate */,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
const string& exchangeName,
- const string& routingKey,
+ const string& /* routingKey */,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -208,27 +215,28 @@ MessageHandlerImpl::transfer(const MethodContext& context,
const string& /*securityToken*/,
const qpid::framing::FieldTable& /*applicationHeaders*/,
qpid::framing::Content body,
- bool mandatory )
+ bool /* mandatory */ )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
- Exchange::shared_ptr exchange = exchangeName.empty() ?
- broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
- if(exchange){
- if (body.isInline()) {
- MessageMessage* msg =
- new MessageMessage(context.methodBody, exchangeName,
- routingKey, mandatory, immediate);
- channel.handlePublish(msg, exchange);
-
- connection.client->getMessageHandler()->ok(context);
- } else {
- // Don't handle reference content yet
- assert(body.isInline());
- }
- }else{
- throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ MessageTransferBody::shared_ptr transfer =
+ boost::shared_polymorphic_downcast<MessageTransferBody>(
+ context.methodBody);
+ // Verify the exchange exists, will throw if not.
+ broker.getExchanges().get(exchangeName);
+ if (body.isInline()) {
+ MessageMessage* msg = new MessageMessage(transfer);
+ // FIXME aconway 2007-02-05: Remove exchange parameter.
+ // use shared_ptr for message.
+ channel.handlePublish(msg, Exchange::shared_ptr());
+ sendOk(context);
+ } else {
+ references.get(body.getValue()).transfer(transfer);
}
}
+
+void MessageHandlerImpl::sendOk(const MethodContext& context) {
+ connection.client->getMessageHandler()->ok(context);
+}
+
}} // namespace qpid::broker
diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h
index 985efe3847..886ca5fb54 100644
--- a/cpp/lib/broker/MessageHandlerImpl.h
+++ b/cpp/lib/broker/MessageHandlerImpl.h
@@ -19,23 +19,25 @@
*
*/
+#include <memory>
+
#include "AMQP_ServerOperations.h"
+#include "Reference.h"
+#include "BrokerChannel.h"
namespace qpid {
namespace broker {
-class Channel;
class Connection;
class Broker;
+class MessageMessage;
-class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageHandler {
- Channel& channel;
- Connection& connection;
- Broker& broker;
-
+class MessageHandlerImpl :
+ public framing::AMQP_ServerOperations::MessageHandler
+{
public:
MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b) {}
+ : channel(ch), connection(c), broker(b), references(ch) {}
void append(const framing::MethodContext&,
const std::string& reference,
@@ -116,6 +118,13 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH
const framing::FieldTable& applicationHeaders,
framing::Content body,
bool mandatory );
+ private:
+ void sendOk(const framing::MethodContext&);
+
+ Channel& channel;
+ Connection& connection;
+ Broker& broker;
+ ReferenceRegistry references;
};
}} // namespace qpid::broker
diff --git a/cpp/lib/broker/Reference.cpp b/cpp/lib/broker/Reference.cpp
new file mode 100644
index 0000000000..a5e734d77a
--- /dev/null
+++ b/cpp/lib/broker/Reference.cpp
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/bind.hpp>
+#include "Reference.h"
+#include "BrokerMessageMessage.h"
+#include "QpidError.h"
+#include "CompletionHandler.h"
+
+namespace qpid {
+namespace broker {
+
+Reference& ReferenceRegistry::open(const Reference::Id& id) {
+ ReferenceMap::iterator i = references.find(id);
+ // TODO aconway 2007-02-05: should we throw Channel or Connection
+ // exceptions here?
+ if (i != references.end())
+ THROW_QPID_ERROR(CLIENT_ERROR, "Attempt to re-open reference " +id);
+ return references[id] = Reference(id, this);
+}
+
+Reference& ReferenceRegistry::get(const Reference::Id& id) {
+ ReferenceMap::iterator i = references.find(id);
+ if (i == references.end())
+ THROW_QPID_ERROR(
+ CLIENT_ERROR, "Attempt to use non-existent reference "+id);
+ return i->second;
+}
+
+void Reference::close() {
+ for_each(transfers.begin(), transfers.end(),
+ boost::bind(&Reference::complete, this, _1));
+ registry->references.erase(getId());
+}
+
+void Reference::complete(TransferPtr transfer) {
+ MessageMessage::shared_ptr msg(new MessageMessage(transfer, *this));
+ registry->handler.complete(msg);
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/lib/broker/Reference.h b/cpp/lib/broker/Reference.h
new file mode 100644
index 0000000000..ecaca3de41
--- /dev/null
+++ b/cpp/lib/broker/Reference.h
@@ -0,0 +1,111 @@
+#ifndef _broker_Reference_h
+#define _broker_Reference_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <string>
+#include <vector>
+#include <map>
+#include <boost/shared_ptr.hpp>
+#include <boost/range.hpp>
+
+namespace qpid {
+
+namespace framing {
+class MessageTransferBody;
+class MessageAppendBody;
+}
+
+namespace broker {
+
+class CompletionHandler;
+class ReferenceRegistry;
+
+/**
+ * A reference is an accumulation point for data in a multi-frame
+ * message. A reference can be used by multiple transfer commands, so
+ * the reference tracks which commands are using it. When the reference
+ * is closed, all the associated transfers are completed.
+ *
+ * THREAD UNSAFE: per-channel resource, access to channels is
+ * serialized.
+ */
+class Reference
+{
+ public:
+ typedef std::string Id;
+ typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
+ typedef std::vector<TransferPtr> Transfers;
+ typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
+ typedef std::vector<AppendPtr> Appends;
+
+ Reference(const Id& id_=Id(), ReferenceRegistry* reg=0)
+ : id(id_), registry(reg) {}
+
+ const std::string& getId() const { return id; }
+
+ /** Add a transfer to be completed with this reference */
+ void transfer(TransferPtr transfer) { transfers.push_back(transfer); }
+
+ /** Append more data to the reference */
+ void append(AppendPtr ptr) { appends.push_back(ptr); }
+
+ /** Close the reference, complete each associated transfer */
+ void close();
+
+ const Appends& getAppends() const { return appends; }
+ const Transfers& getTransfers() const { return transfers; }
+
+ private:
+ void complete(TransferPtr transfer);
+
+ Id id;
+ ReferenceRegistry* registry;
+ Transfers transfers;
+ Appends appends;
+};
+
+
+/**
+ * A registry/factory for references.
+ *
+ * THREAD UNSAFE: per-channel resource, access to channels is
+ * serialized.
+ */
+class ReferenceRegistry {
+ public:
+ ReferenceRegistry(CompletionHandler& handler_) : handler(handler_) {};
+ Reference& open(const Reference::Id& id);
+ Reference& get(const Reference::Id& id);
+
+ private:
+ typedef std::map<Reference::Id, Reference> ReferenceMap;
+ CompletionHandler& handler;
+ ReferenceMap references;
+
+ // Reference calls references.erase() and uses handler.
+ friend class Reference;
+};
+
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_Reference_h*/
diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h
index d9717d90a0..afb499023d 100644
--- a/cpp/lib/common/framing/MethodContext.h
+++ b/cpp/lib/common/framing/MethodContext.h
@@ -64,7 +64,10 @@ struct MethodContext
};
// FIXME aconway 2007-02-01: Method context only required on Handler
-// functions, not on Proxy functions.
+// functions, not on Proxy functions. If we add set/getChannel(ChannelAdapter*)
+// on AMQBody and set it during decodeing then we could get rid of the context.
+
+
}} // namespace qpid::framing