diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-06 15:01:45 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-06 15:01:45 +0000 |
| commit | fbd97f554b04a109c95c01fe6ad538c5f50161af (patch) | |
| tree | 0324d02ee4f8d6ca2387d1d3ff85bcd61a123a34 /cpp | |
| parent | 80b1b0b5f443bfb3c9d62a80e1419c224d0229d8 (diff) | |
| download | qpid-python-fbd97f554b04a109c95c01fe6ad538c5f50161af.tar.gz | |
* broker/Reference, tests/ReferenceTest: class representing a reference.
* broker/BrokerChannel.cpp (complete): get destination exchange from Message,
don't assume only one message in progress (could have multiple
references open.)
* broker/BrokerMessageMessage.cpp,.h: Contains transfer body and
vector of append bodies. Construct from Reference.
* broker/CompletionHandler.h: Extracted from BrokerMessage, used for
MessageMessage also.
* broker/ExchangeRegistry.cpp: Moved throw for missing exchanges to
registry.
* cpp/tests/start_broker: Increased wait time to 5 secs.
* cpp/tests/*: renamed DummyChannel as MockChannel.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504172 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
27 files changed, 530 insertions, 392 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index fa25221bbd..6f55f32d47 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -355,245 +355,5 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume( assert(0); // FIXME aconway 2007-01-04: 0-9 feature } - -// -// Message class method handlers -// -void -BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*bytes*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - - -void -BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t channel, - const string& destination ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - connection.getChannel(channel).cancel(destination); - - connection.client->getMessageHandler()->ok(channel); -} - -void -BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/, - const string& /*reference*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::consume( u_int16_t channelId, - u_int16_t /*ticket*/, - const string& queueName, - const string& destination, - bool noLocal, - bool noAck, - bool exclusive, - const qpid::framing::FieldTable& filter ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - Queue::shared_ptr queue = connection.getQueue(queueName, channelId); - Channel& channel = connection.getChannel(channelId); - if(!destination.empty() && channel.exists(destination)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - string newTag = destination; - channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - - connection.client->getMessageHandler()->ok(channelId); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); - } - - connection.getChannel(channel).cancel(destination); - - connection.client->getMessageHandler()->ok(channel); -} - -void -BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::get( u_int16_t channelId, - u_int16_t /*ticket*/, - const string& queueName, - const string& /*destination*/, - bool noAck ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - Queue::shared_ptr queue = connection.getQueue(queueName, channelId); - - // FIXME: get is probably Basic specific - if(!connection.getChannel(channelId).get(queue, !noAck)){ - - connection.client->getMessageHandler()->empty(channelId); - } - -} - -void -BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/, - u_int64_t /*value*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - Queue::shared_ptr queue = connection.getQueue(queueName, channelId); - Channel& channel = connection.getChannel(channelId); - if(!destination.empty() && channel.exists(destination)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - string newTag = destination; - channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - - connection.client->getMessageHandler()->ok(channelId); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); - } - - connection.getChannel(channel).cancel(destination); - - connection.client->getMessageHandler()->ok(channel); -} - -void -BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/, - const string& /*reference*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::qos( u_int16_t channel, - u_int32_t prefetchSize, - u_int16_t prefetchCount, - bool /*global*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - //TODO: handle global - connection.getChannel(channel).setPrefetchSize(prefetchSize); - connection.getChannel(channel).setPrefetchCount(prefetchCount); - - connection.client->getMessageHandler()->ok(channel); - - Queue::shared_ptr queue = connection.getQueue(queueName, channelId); - Channel& channel = connection.getChannel(channelId); - if(!destination.empty() && channel.exists(destination)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - string newTag = destination; - channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - - connection.client->getMessageHandler()->ok(channelId); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); - } -} - -void -BrokerAdapter::MessageHandlerImpl::recover( u_int16_t channel, - bool requeue ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - connection.getChannel(channel).recover(requeue); - -} - -void -BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/, - u_int16_t /*code*/, - const string& /*text*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t channel, - u_int16_t /*ticket*/, - const string& /*destination*/, - bool /*redelivered*/, - bool immediate, - u_int64_t /*ttl*/, - u_int8_t /*priority*/, - u_int64_t /*timestamp*/, - u_int8_t /*deliveryMode*/, - u_int64_t /*expiration*/, - const string& exchangeName, - const string& routingKey, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - Exchange::shared_ptr exchange = exchangeName.empty() ? - connection.broker.getExchanges().getDefault() : connection.broker.getExchanges().get(exchangeName); - if(exchange){ - Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate); - connection.getChannel(channel).handlePublish(msg, exchange); - }else{ - throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); - } -} - }} // namespace qpid::broker diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 96215a60ed..c0250815e8 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -78,7 +78,7 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { - if(tag.empty()) tag = tagGenerator.generate(); + if(tag.empty()) tag = tagGenerator.generate(); ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); try{ queue->consume(c, exclusive);//may throw exception @@ -187,6 +187,8 @@ void Channel::ConsumerImpl::requestDispatch(){ if(blocked) queue->dispatch(); } +// FIXME aconway 2007-02-05: Drop exchange member, calculate from +// message in ::complete(). void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){ Message::shared_ptr message(_message); exchange = _exchange; @@ -207,19 +209,19 @@ void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) { // TODO aconway 2007-01-17: Implement heartbeating. } -void Channel::complete(Message::shared_ptr& msg){ - if(exchange){ - if(transactional){ - TxPublish* deliverable = new TxPublish(msg); - exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); - txBuffer.enlist(new DeletingTxOp(deliverable)); - }else{ - DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); - } - exchange.reset(); - }else{ - std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl; +void Channel::complete(Message::shared_ptr msg) { + Exchange::shared_ptr exchange = + connection.broker.getExchanges().get(msg->getExchange()); + assert(exchange.get()); + if(transactional) { + std::auto_ptr<TxPublish> deliverable(new TxPublish(msg)); + exchange->route(*deliverable, msg->getRoutingKey(), + &(msg->getHeaderProperties()->getHeaders())); + txBuffer.enlist(new DeletingTxOp(deliverable.release())); + } else { + DeliverableMessage deliverable(msg); + exchange->route(deliverable, msg->getRoutingKey(), + &(msg->getHeaderProperties()->getHeaders())); } } diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index dd95e944bb..484a4d64e3 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -36,6 +36,7 @@ #include <Prefetch.h> #include <TxBuffer.h> #include "framing/ChannelAdapter.h" +#include "CompletionHandler.h" namespace qpid { namespace broker { @@ -51,9 +52,8 @@ using framing::string; * Maintains state for an AMQP channel. Handles incoming and * outgoing messages for that channel. */ -class Channel : - public framing::ChannelAdapter, - private MessageBuilder::CompletionHandler +class Channel : public framing::ChannelAdapter, + public CompletionHandler { class ConsumerImpl : public virtual Consumer { @@ -96,7 +96,7 @@ class Channel : boost::scoped_ptr<BrokerAdapter> adapter; - virtual void complete(Message::shared_ptr& msg); + virtual void complete(Message::shared_ptr msg); void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); void cancel(consumer_iterator consumer); bool checkPrefetch(Message::shared_ptr& msg); diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index 4168ff639c..e2c4b94811 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -18,25 +18,39 @@ * under the License. * */ +#include <iostream> #include "BrokerMessageMessage.h" +#include "MessageTransferBody.h" +#include "MessageAppendBody.h" +#include "Reference.h" +using namespace std; using namespace qpid::broker; -MessageMessage::MessageMessage( - const qpid::framing::AMQMethodBody::shared_ptr _methodBody, - const std::string& _exchange, const std::string& _routingKey, - bool _mandatory, bool _immediate) : - Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody), - methodBody(_methodBody) -{ -} +MessageMessage::MessageMessage(TransferPtr transfer_) + : Message(transfer_->getExchange(), transfer_->getRoutingKey(), + transfer_->getMandatory(), transfer_->getImmediate(), + transfer_), + transfer(transfer_) +{} + +MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref) + : Message(transfer_->getExchange(), transfer_->getRoutingKey(), + transfer_->getMandatory(), transfer_->getImmediate(), + transfer_), + transfer(transfer_), + appends(ref.getAppends()) +{} void MessageMessage::deliver( - framing::ChannelAdapter& /*out*/, + framing::ChannelAdapter& /*channel*/, const std::string& /*consumerTag*/, u_int64_t /*deliveryTag*/, u_int32_t /*framesize*/) { + // FIXME aconway 2007-02-05: + cout << "MessageMessage::deliver" << *transfer << " + " << appends.size() + << " appends." << endl; } void MessageMessage::sendGetOk( @@ -45,49 +59,50 @@ void MessageMessage::sendGetOk( u_int64_t /*deliveryTag*/, u_int32_t /*framesize*/) { + // FIXME aconway 2007-02-05: } bool MessageMessage::isComplete() { - return true; + return true; // FIXME aconway 2007-02-05: } u_int64_t MessageMessage::contentSize() const { - return 0; + return 0; // FIXME aconway 2007-02-05: } qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() { - return 0; + return 0; // FIXME aconway 2007-02-05: } bool MessageMessage::isPersistent() { - return false; + return false; // FIXME aconway 2007-02-05: } const ConnectionToken* const MessageMessage::getPublisher() { - return 0; + return 0; // FIXME aconway 2007-02-05: } u_int32_t MessageMessage::encodedSize() { - return 0; + return 0; // FIXME aconway 2007-02-05: } u_int32_t MessageMessage::encodedHeaderSize() { - return 0; + return 0; // FIXME aconway 2007-02-05: } u_int32_t MessageMessage::encodedContentSize() { - return 0; + return 0; // FIXME aconway 2007-02-05: } u_int64_t MessageMessage::expectedContentSize() { - return 0; + return 0; // FIXME aconway 2007-02-05: } diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index cad5cf15b0..aa136863a1 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -21,23 +21,28 @@ * under the License. * */ - +#include <vector> #include "BrokerMessageBase.h" +#include "Reference.h" namespace qpid { + namespace framing { -class AMQMethodBody; +class MessageTransferBody; +class MessageApppendBody; } namespace broker { -class MessageMessage: public Message{ - const qpid::framing::AMQMethodBody::shared_ptr methodBody; +class Reference; +class MessageMessage: public Message{ public: - MessageMessage( - const framing::AMQMethodBody::shared_ptr methodBody, - const std::string& exchange, const std::string& routingKey, - bool mandatory, bool immediate); + typedef Reference::TransferPtr TransferPtr; + typedef Reference::AppendPtr AppendPtr; + typedef Reference::Appends Appends; + + MessageMessage(TransferPtr transfer); + MessageMessage(TransferPtr transfer, const Reference&); // Default destructor okay @@ -52,7 +57,7 @@ class MessageMessage: public Message{ u_int32_t framesize); bool isComplete(); - + u_int64_t contentSize() const; qpid::framing::BasicHeaderProperties* getHeaderProperties(); bool isPersistent(); @@ -62,10 +67,16 @@ class MessageMessage: public Message{ u_int32_t encodedHeaderSize(); u_int32_t encodedContentSize(); u_int64_t expectedContentSize(); + + TransferPtr getTransfer() { return transfer; } + const Appends& getAppends() { return appends; } + private: + + const TransferPtr transfer; + const Appends appends; }; -} -} +}} #endif /*!_broker_BrokerMessage_h*/ diff --git a/cpp/lib/broker/CompletionHandler.h b/cpp/lib/broker/CompletionHandler.h new file mode 100644 index 0000000000..9d51656282 --- /dev/null +++ b/cpp/lib/broker/CompletionHandler.h @@ -0,0 +1,39 @@ +#ifndef _broker_CompletionHandler_h +#define _broker_CompletionHandler_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +namespace qpid { +namespace broker { + +/** + * Callback interface to handle completion of a message. + */ +class CompletionHandler +{ + public: + virtual ~CompletionHandler(){} + virtual void complete(Message::shared_ptr) = 0; +}; + +}} // namespace qpid::broker + + + +#endif /*!_broker_CompletionHandler_h*/ diff --git a/cpp/lib/broker/ExchangeRegistry.cpp b/cpp/lib/broker/ExchangeRegistry.cpp index 7bf96c4544..3e5ed89b54 100644 --- a/cpp/lib/broker/ExchangeRegistry.cpp +++ b/cpp/lib/broker/ExchangeRegistry.cpp @@ -59,7 +59,10 @@ void ExchangeRegistry::destroy(const string& name){ Exchange::shared_ptr ExchangeRegistry::get(const string& name){ Mutex::ScopedLock locker(lock); - return exchanges[name]; + Exchange::shared_ptr exchange =exchanges[name]; + if (!exchange) + throw ChannelException(404, "Exchange not found:" + name); + return exchange; } namespace diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am index 064b592124..760c6d61e2 100644 --- a/cpp/lib/broker/Makefile.am +++ b/cpp/lib/broker/Makefile.am @@ -69,6 +69,8 @@ libqpidbroker_la_SOURCES = \ QueueRegistry.h \ RecoveryManager.cpp \ RecoveryManager.h \ + Reference.cpp \ + Reference.h \ ConnectionFactory.cpp \ ConnectionFactory.h \ Connection.cpp \ diff --git a/cpp/lib/broker/MessageBuilder.cpp b/cpp/lib/broker/MessageBuilder.cpp index 41bf812d2d..69e771c793 100644 --- a/cpp/lib/broker/MessageBuilder.cpp +++ b/cpp/lib/broker/MessageBuilder.cpp @@ -27,7 +27,10 @@ using namespace qpid::broker; using namespace qpid::framing; using std::auto_ptr; -MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) : +MessageBuilder::MessageBuilder(CompletionHandler* _handler, + MessageStore* const _store, + u_int64_t _stagingThreshold +) : handler(_handler), store(_store), stagingThreshold(_stagingThreshold) diff --git a/cpp/lib/broker/MessageBuilder.h b/cpp/lib/broker/MessageBuilder.h index 5b8516be42..f0b90a86cd 100644 --- a/cpp/lib/broker/MessageBuilder.h +++ b/cpp/lib/broker/MessageBuilder.h @@ -29,22 +29,19 @@ #include <AMQContentBody.h> #include <AMQHeaderBody.h> #include <BasicPublishBody.h> +#include "CompletionHandler.h" namespace qpid { namespace broker { class MessageBuilder{ public: - class CompletionHandler{ - public: - virtual void complete(Message::shared_ptr&) = 0; - virtual ~CompletionHandler(){} - }; MessageBuilder(CompletionHandler* _handler, MessageStore* const store = 0, u_int64_t stagingThreshold = 0); void initialise(Message::shared_ptr& msg); - void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header); - void addContent(qpid::framing::AMQContentBody::shared_ptr& content); + void setHeader(framing::AMQHeaderBody::shared_ptr& header); + void addContent(framing::AMQContentBody::shared_ptr& content); + Message::shared_ptr getMessage() { return message; } private: Message::shared_ptr message; CompletionHandler* handler; diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 71100996e7..30b69e4654 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -23,6 +23,8 @@ #include "Connection.h" #include "Broker.h" #include "BrokerMessageMessage.h" +#include "MessageAppendBody.h" +#include "MessageTransferBody.h" namespace qpid { namespace broker { @@ -33,23 +35,23 @@ using namespace framing; // Message class method handlers // void -MessageHandlerImpl::append(const MethodContext&, - const string& /*reference*/, +MessageHandlerImpl::append(const MethodContext& context, + const string& reference, const string& /*bytes*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + references.get(reference).append( + boost::shared_polymorphic_downcast<MessageAppendBody>( + context.methodBody)); + sendOk(context); } void -MessageHandlerImpl::cancel( const MethodContext& context, - const string& destination ) +MessageHandlerImpl::cancel(const MethodContext& context, + const string& destination ) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - channel.cancel(destination); - - connection.client->getMessageHandler()->ok(context); + sendOk(context); } void @@ -61,10 +63,11 @@ MessageHandlerImpl::checkpoint(const MethodContext&, } void -MessageHandlerImpl::close(const MethodContext&, - const string& /*reference*/ ) +MessageHandlerImpl::close(const MethodContext& context, + const string& reference) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + references.get(reference).close(); + sendOk(context); } void @@ -88,13 +91,16 @@ MessageHandlerImpl::consume(const MethodContext& context, string newTag = destination; channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - connection.client->getMessageHandler()->ok(context); + sendOk(context); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); + if(exclusive) + throw ChannelException(403, "Exclusive access cannot be granted"); + else + throw ChannelException( + 403, "Access would violate previously granted exclusivity"); } } @@ -133,14 +139,15 @@ MessageHandlerImpl::offset(const MethodContext&, void MessageHandlerImpl::ok( const MethodContext& ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // TODO aconway 2007-02-05: For HA, we can drop acked messages here. } void -MessageHandlerImpl::open(const MethodContext&, - const string& /*reference*/ ) +MessageHandlerImpl::open(const MethodContext& context, + const string& reference) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + references.open(reference); + sendOk(context); } void @@ -155,7 +162,7 @@ MessageHandlerImpl::qos(const MethodContext& context, channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - connection.client->getMessageHandler()->ok(context); + sendOk(context); } void @@ -189,14 +196,14 @@ MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, - bool immediate, + bool /* immediate */, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, const string& exchangeName, - const string& routingKey, + const string& /* routingKey */, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -208,27 +215,28 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, qpid::framing::Content body, - bool mandatory ) + bool /* mandatory */ ) { //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - Exchange::shared_ptr exchange = exchangeName.empty() ? - broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); - if(exchange){ - if (body.isInline()) { - MessageMessage* msg = - new MessageMessage(context.methodBody, exchangeName, - routingKey, mandatory, immediate); - channel.handlePublish(msg, exchange); - - connection.client->getMessageHandler()->ok(context); - } else { - // Don't handle reference content yet - assert(body.isInline()); - } - }else{ - throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + MessageTransferBody::shared_ptr transfer = + boost::shared_polymorphic_downcast<MessageTransferBody>( + context.methodBody); + // Verify the exchange exists, will throw if not. + broker.getExchanges().get(exchangeName); + if (body.isInline()) { + MessageMessage* msg = new MessageMessage(transfer); + // FIXME aconway 2007-02-05: Remove exchange parameter. + // use shared_ptr for message. + channel.handlePublish(msg, Exchange::shared_ptr()); + sendOk(context); + } else { + references.get(body.getValue()).transfer(transfer); } } + +void MessageHandlerImpl::sendOk(const MethodContext& context) { + connection.client->getMessageHandler()->ok(context); +} + }} // namespace qpid::broker diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h index 985efe3847..886ca5fb54 100644 --- a/cpp/lib/broker/MessageHandlerImpl.h +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -19,23 +19,25 @@ * */ +#include <memory> + #include "AMQP_ServerOperations.h" +#include "Reference.h" +#include "BrokerChannel.h" namespace qpid { namespace broker { -class Channel; class Connection; class Broker; +class MessageMessage; -class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageHandler { - Channel& channel; - Connection& connection; - Broker& broker; - +class MessageHandlerImpl : + public framing::AMQP_ServerOperations::MessageHandler +{ public: MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b) {} + : channel(ch), connection(c), broker(b), references(ch) {} void append(const framing::MethodContext&, const std::string& reference, @@ -116,6 +118,13 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH const framing::FieldTable& applicationHeaders, framing::Content body, bool mandatory ); + private: + void sendOk(const framing::MethodContext&); + + Channel& channel; + Connection& connection; + Broker& broker; + ReferenceRegistry references; }; }} // namespace qpid::broker diff --git a/cpp/lib/broker/Reference.cpp b/cpp/lib/broker/Reference.cpp new file mode 100644 index 0000000000..a5e734d77a --- /dev/null +++ b/cpp/lib/broker/Reference.cpp @@ -0,0 +1,56 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/bind.hpp> +#include "Reference.h" +#include "BrokerMessageMessage.h" +#include "QpidError.h" +#include "CompletionHandler.h" + +namespace qpid { +namespace broker { + +Reference& ReferenceRegistry::open(const Reference::Id& id) { + ReferenceMap::iterator i = references.find(id); + // TODO aconway 2007-02-05: should we throw Channel or Connection + // exceptions here? + if (i != references.end()) + THROW_QPID_ERROR(CLIENT_ERROR, "Attempt to re-open reference " +id); + return references[id] = Reference(id, this); +} + +Reference& ReferenceRegistry::get(const Reference::Id& id) { + ReferenceMap::iterator i = references.find(id); + if (i == references.end()) + THROW_QPID_ERROR( + CLIENT_ERROR, "Attempt to use non-existent reference "+id); + return i->second; +} + +void Reference::close() { + for_each(transfers.begin(), transfers.end(), + boost::bind(&Reference::complete, this, _1)); + registry->references.erase(getId()); +} + +void Reference::complete(TransferPtr transfer) { + MessageMessage::shared_ptr msg(new MessageMessage(transfer, *this)); + registry->handler.complete(msg); +} + +}} // namespace qpid::broker diff --git a/cpp/lib/broker/Reference.h b/cpp/lib/broker/Reference.h new file mode 100644 index 0000000000..ecaca3de41 --- /dev/null +++ b/cpp/lib/broker/Reference.h @@ -0,0 +1,111 @@ +#ifndef _broker_Reference_h +#define _broker_Reference_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <string> +#include <vector> +#include <map> +#include <boost/shared_ptr.hpp> +#include <boost/range.hpp> + +namespace qpid { + +namespace framing { +class MessageTransferBody; +class MessageAppendBody; +} + +namespace broker { + +class CompletionHandler; +class ReferenceRegistry; + +/** + * A reference is an accumulation point for data in a multi-frame + * message. A reference can be used by multiple transfer commands, so + * the reference tracks which commands are using it. When the reference + * is closed, all the associated transfers are completed. + * + * THREAD UNSAFE: per-channel resource, access to channels is + * serialized. + */ +class Reference +{ + public: + typedef std::string Id; + typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr; + typedef std::vector<TransferPtr> Transfers; + typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr; + typedef std::vector<AppendPtr> Appends; + + Reference(const Id& id_=Id(), ReferenceRegistry* reg=0) + : id(id_), registry(reg) {} + + const std::string& getId() const { return id; } + + /** Add a transfer to be completed with this reference */ + void transfer(TransferPtr transfer) { transfers.push_back(transfer); } + + /** Append more data to the reference */ + void append(AppendPtr ptr) { appends.push_back(ptr); } + + /** Close the reference, complete each associated transfer */ + void close(); + + const Appends& getAppends() const { return appends; } + const Transfers& getTransfers() const { return transfers; } + + private: + void complete(TransferPtr transfer); + + Id id; + ReferenceRegistry* registry; + Transfers transfers; + Appends appends; +}; + + +/** + * A registry/factory for references. + * + * THREAD UNSAFE: per-channel resource, access to channels is + * serialized. + */ +class ReferenceRegistry { + public: + ReferenceRegistry(CompletionHandler& handler_) : handler(handler_) {}; + Reference& open(const Reference::Id& id); + Reference& get(const Reference::Id& id); + + private: + typedef std::map<Reference::Id, Reference> ReferenceMap; + CompletionHandler& handler; + ReferenceMap references; + + // Reference calls references.erase() and uses handler. + friend class Reference; +}; + + +}} // namespace qpid::broker + + + +#endif /*!_broker_Reference_h*/ diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index d9717d90a0..afb499023d 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -64,7 +64,10 @@ struct MethodContext }; // FIXME aconway 2007-02-01: Method context only required on Handler -// functions, not on Proxy functions. +// functions, not on Proxy functions. If we add set/getChannel(ChannelAdapter*) +// on AMQBody and set it during decodeing then we could get rid of the context. + + }} // namespace qpid::framing diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index a3dabe6408..dcfd2fdb90 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -28,7 +28,7 @@ #include <memory> #include <AMQP_HighestVersion.h> #include "AMQFrame.h" -#include "DummyChannel.h" +#include "MockChannel.h" #include "broker/Connection.h" #include "ProtocolInitiation.h" @@ -39,7 +39,7 @@ using namespace qpid::sys; using std::string; using std::queue; -struct DummyHandler : ConnectionOutputHandler{ +struct MockHandler : ConnectionOutputHandler{ std::vector<AMQFrame*> frames; void send(AMQFrame* frame){ frames.push_back(frame); } @@ -60,7 +60,7 @@ class ChannelTest : public CppUnit::TestCase Broker::shared_ptr broker; Connection connection; - DummyHandler handler; + MockHandler handler; class MockMessageStore : public NullMessageStore { @@ -240,10 +240,10 @@ class ChannelTest : public CppUnit::TestCase Channel channel( connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/); const string data[] = {"abcde", "fghij", "klmno"}; - + Message* msg = new BasicMessage( 0, "my_exchange", "my_routing_key", false, false, - DummyChannel::basicGetBody()); + MockChannel::basicGetBody()); store.expect(); store.stage(msg); @@ -253,7 +253,8 @@ class ChannelTest : public CppUnit::TestCase store.destroy(msg); store.test(); - Exchange::shared_ptr exchange(new FanOutExchange("my_exchange")); + Exchange::shared_ptr exchange = + broker->getExchanges().declare("my_exchange", "fanout").first; Queue::shared_ptr queue(new Queue("my_queue")); exchange->bind(queue, "", 0); @@ -333,7 +334,7 @@ class ChannelTest : public CppUnit::TestCase { BasicMessage* msg = new BasicMessage( 0, exchange, routingKey, false, false, - DummyChannel::basicGetBody()); + MockChannel::basicGetBody()); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(contentSize); msg->setHeader(header); diff --git a/cpp/tests/InMemoryContentTest.cpp b/cpp/tests/InMemoryContentTest.cpp index db54ea44a0..c8433432e8 100644 --- a/cpp/tests/InMemoryContentTest.cpp +++ b/cpp/tests/InMemoryContentTest.cpp @@ -24,7 +24,7 @@ #include <iostream> #include <list> #include "AMQFrame.h" -#include "DummyChannel.h" +#include "MockChannel.h" using std::list; using std::string; @@ -58,7 +58,7 @@ public: void refragment(size_t inCount, string* in, size_t outCount, string* out, u_int32_t framesize = 5) { InMemoryContent content; - DummyChannel channel(3); + MockChannel channel(3); addframes(content, inCount, in); content.send(channel, framesize); diff --git a/cpp/tests/LazyLoadedContentTest.cpp b/cpp/tests/LazyLoadedContentTest.cpp index 49e4ecc4ae..365e4f6a11 100644 --- a/cpp/tests/LazyLoadedContentTest.cpp +++ b/cpp/tests/LazyLoadedContentTest.cpp @@ -26,7 +26,7 @@ #include <list> #include <sstream> #include "AMQFrame.h" -#include "DummyChannel.h" +#include "MockChannel.h" using std::list; using std::string; using boost::dynamic_pointer_cast; @@ -92,7 +92,7 @@ public: { TestMessageStore store(in); LazyLoadedContent content(&store, 0, in.size()); - DummyChannel channel(3); + MockChannel channel(3); content.send(channel, framesize); CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am index a2bbc07213..7a8480cded 100644 --- a/cpp/tests/Makefile.am +++ b/cpp/tests/Makefile.am @@ -9,12 +9,6 @@ INCLUDES = \ -I$(top_srcdir)/lib/common/framing \ $(APR_CXXFLAGS) -EXTRA_DIST = \ - topictest \ - qpid_test_plugin.h \ - MockConnectionInputHandler.h - - client_exe_tests = \ client_test \ echo_service \ @@ -32,6 +26,7 @@ broker_tests = \ MessageBuilderTest \ MessageHandlerTest \ MessageTest \ + ReferenceTest \ QueueRegistryTest \ QueueTest \ QueuePolicyTest \ @@ -69,7 +64,14 @@ noinst_PROGRAMS = $(client_exe_tests) CLIENT_TESTS = client_test quick_topictest TESTS = run-unit-tests start_broker $(CLIENT_TESTS) python_tests kill_broker -EXTRA_DIST += $(TESTS) topictest + +EXTRA_DIST = \ + $(TESTS) \ + topictest \ + qpid_test_plugin.h \ + MockConnectionInputHandler.h \ + MockChannel.h \ + InProcessBroker.h include gen.mk diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp index dc660751b7..d1b9c6ee62 100644 --- a/cpp/tests/MessageBuilderTest.cpp +++ b/cpp/tests/MessageBuilderTest.cpp @@ -26,7 +26,7 @@ #include <qpid_test_plugin.h> #include <iostream> #include <memory> -#include "DummyChannel.h" +#include "MockChannel.h" using namespace boost; using namespace qpid::broker; @@ -35,10 +35,10 @@ using namespace qpid::sys; class MessageBuilderTest : public CppUnit::TestCase { - struct DummyHandler : MessageBuilder::CompletionHandler{ + struct MockHandler : CompletionHandler { Message::shared_ptr msg; - virtual void complete(Message::shared_ptr& _msg){ + virtual void complete(Message::shared_ptr _msg){ msg = _msg; } }; @@ -114,13 +114,13 @@ class MessageBuilderTest : public CppUnit::TestCase public: void testHeaderOnly(){ - DummyHandler handler; + MockHandler handler; MessageBuilder builder(&handler); Message::shared_ptr message( new BasicMessage( 0, "test", "my_routing_key", false, false, - DummyChannel::basicGetBody())); + MockChannel::basicGetBody())); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(0); @@ -132,14 +132,14 @@ class MessageBuilderTest : public CppUnit::TestCase } void test1ContentFrame(){ - DummyHandler handler; + MockHandler handler; MessageBuilder builder(&handler); string data1("abcdefg"); Message::shared_ptr message( new BasicMessage(0, "test", "my_routing_key", false, false, - DummyChannel::basicGetBody())); + MockChannel::basicGetBody())); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(7); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -154,7 +154,7 @@ class MessageBuilderTest : public CppUnit::TestCase } void test2ContentFrames(){ - DummyHandler handler; + MockHandler handler; MessageBuilder builder(&handler); string data1("abcdefg"); @@ -162,7 +162,7 @@ class MessageBuilderTest : public CppUnit::TestCase Message::shared_ptr message( new BasicMessage(0, "test", "my_routing_key", false, false, - DummyChannel::basicGetBody())); + MockChannel::basicGetBody())); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -185,7 +185,7 @@ class MessageBuilderTest : public CppUnit::TestCase //loaded content is in use) TestMessageStore store(14); { - DummyHandler handler; + MockHandler handler; MessageBuilder builder(&handler, &store, 5); string data1("abcdefg"); @@ -193,7 +193,7 @@ class MessageBuilderTest : public CppUnit::TestCase Message::shared_ptr message( new BasicMessage(0, "test", "my_routing_key", false, false, - DummyChannel::basicGetBody())); + MockChannel::basicGetBody())); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp index 103a23f0df..2f49a28b83 100644 --- a/cpp/tests/MessageTest.cpp +++ b/cpp/tests/MessageTest.cpp @@ -23,7 +23,7 @@ #include <iostream> #include <AMQP_HighestVersion.h> #include "AMQFrame.h" -#include "DummyChannel.h" +#include "MockChannel.h" using namespace boost; using namespace qpid::broker; @@ -47,7 +47,7 @@ class MessageTest : public CppUnit::TestCase BasicMessage::shared_ptr msg( new BasicMessage(0, exchange, routingKey, false, false, - DummyChannel::basicGetBody())); + MockChannel::basicGetBody())); AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); header->setContentSize(14); AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -73,7 +73,7 @@ class MessageTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc")); CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize()); - DummyChannel channel(1); + MockChannel channel(1); // FIXME aconway 2007-02-02: deliver should take const ProtocolVersion& msg->deliver(channel, "ignore", 0, 100); CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size()); diff --git a/cpp/tests/DummyChannel.h b/cpp/tests/MockChannel.h index 2b6bd9b2b9..10fcb56969 100644 --- a/cpp/tests/DummyChannel.h +++ b/cpp/tests/MockChannel.h @@ -1,5 +1,5 @@ -#ifndef _tests_DummyChannel_h -#define _tests_DummyChannel_h +#ifndef _tests_MockChannel_h +#define _tests_MockChannel_h /* * @@ -26,16 +26,16 @@ #include "framing/AMQFrame.h" #include "BasicGetBody.h" -/** Dummy output handler to collect frames */ -struct DummyOutputHandler : public qpid::framing::OutputHandler { +/** Mock output handler to collect frames */ +struct MockOutputHandler : public qpid::framing::OutputHandler { std::vector<qpid::framing::AMQFrame*> frames; void send(qpid::framing::AMQFrame* frame){ frames.push_back(frame); } }; /** - * Combination dummy OutputHandler and ChannelAdapter for tests. + * Combination mock OutputHandler and ChannelAdapter for tests. */ -struct DummyChannel : public qpid::framing::ChannelAdapter +struct MockChannel : public qpid::framing::ChannelAdapter { typedef qpid::framing::BasicGetBody Body; static Body::shared_ptr basicGetBody() { @@ -43,9 +43,9 @@ struct DummyChannel : public qpid::framing::ChannelAdapter new Body(qpid::framing::ProtocolVersion())); } - DummyOutputHandler out; + MockOutputHandler out; - DummyChannel(qpid::framing::ChannelId id) { + MockChannel(qpid::framing::ChannelId id) { init(id, out, qpid::framing::ProtocolVersion()); } @@ -66,4 +66,4 @@ struct DummyChannel : public qpid::framing::ChannelAdapter }; -#endif /*!_tests_DummyChannel_h*/ +#endif /*!_tests_MockChannel_h*/ diff --git a/cpp/tests/QueueTest.cpp b/cpp/tests/QueueTest.cpp index 7105509de6..59ca7728ca 100644 --- a/cpp/tests/QueueTest.cpp +++ b/cpp/tests/QueueTest.cpp @@ -22,7 +22,7 @@ #include <QueueRegistry.h> #include <qpid_test_plugin.h> #include <iostream> -#include "DummyChannel.h" +#include "MockChannel.h" using namespace qpid::broker; using namespace qpid::sys; @@ -58,7 +58,7 @@ class QueueTest : public CppUnit::TestCase Message::shared_ptr message(std::string exchange, std::string routingKey) { return Message::shared_ptr( new BasicMessage(0, exchange, routingKey, true, true, - DummyChannel::basicGetBody())); + MockChannel::basicGetBody())); } void testConsumers(){ diff --git a/cpp/tests/ReferenceTest.cpp b/cpp/tests/ReferenceTest.cpp new file mode 100644 index 0000000000..b50511f724 --- /dev/null +++ b/cpp/tests/ReferenceTest.cpp @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <memory> +#include "qpid_test_plugin.h" +#include "Reference.h" +#include "BrokerMessageMessage.h" +#include "MessageTransferBody.h" +#include "MessageAppendBody.h" +#include "CompletionHandler.h" + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace std; + +class ReferenceTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ReferenceTest); + CPPUNIT_TEST(testRegistry); + CPPUNIT_TEST(testReference); + CPPUNIT_TEST_SUITE_END(); + + + struct MockCompletionHandler : public CompletionHandler { + std::vector<Message::shared_ptr> messages; + void complete(Message::shared_ptr msg) { messages.push_back(msg); } + }; + + MockCompletionHandler handler; + ProtocolVersion v; + ReferenceRegistry registry; + MessageTransferBody::shared_ptr t1, t2; + MessageAppendBody::shared_ptr a1, a2; + public: + + ReferenceTest() : + registry(handler), + t1(new MessageTransferBody(v)), + t2(new MessageTransferBody(v)), + a1(new MessageAppendBody(v)), + a2(new MessageAppendBody(v)) + {} + + void testRegistry() { + Reference& ref = registry.open("foo"); + CPPUNIT_ASSERT_EQUAL(string("foo"), ref.getId()); + CPPUNIT_ASSERT(&ref == ®istry.get("foo")); + try { + registry.get("none"); + CPPUNIT_FAIL("Expected exception"); + } catch (...) {} + try { + registry.open("foo"); + CPPUNIT_FAIL("Expected exception"); + } catch(...) {} + } + + MessageMessage& handlerMessage(size_t i) { + CPPUNIT_ASSERT(handler.messages.size() > i); + MessageMessage* msg = dynamic_cast<MessageMessage*>( + handler.messages[i].get()); + CPPUNIT_ASSERT(msg); + return *msg; + } + + void testReference() { + Reference& ref = registry.open("foo"); + ref.transfer(t1); + ref.transfer(t2); + CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getTransfers().size()); + ref.append(a1); + ref.append(a2); + CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getAppends().size()); + ref.close(); + try { + registry.open("foo"); + CPPUNIT_FAIL("Expected exception"); + } catch(...) {} + + vector<Message::shared_ptr>& messages = handler.messages; + CPPUNIT_ASSERT_EQUAL(size_t(2), messages.size()); + + CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getTransfer(), t1); + CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getAppends()[0], a1); + CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getAppends()[1], a2); + + CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getTransfer(), t2); + CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getAppends()[0], a1); + CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getAppends()[1], a2); + } + + +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ReferenceTest); diff --git a/cpp/tests/TxAckTest.cpp b/cpp/tests/TxAckTest.cpp index 07381d187d..e464ff78f4 100644 --- a/cpp/tests/TxAckTest.cpp +++ b/cpp/tests/TxAckTest.cpp @@ -25,7 +25,7 @@ #include <iostream> #include <list> #include <vector> -#include "DummyChannel.h" +#include "MockChannel.h" using std::list; using std::vector; @@ -72,7 +72,7 @@ public: for(int i = 0; i < 10; i++){ Message::shared_ptr msg( new BasicMessage(0, "exchange", "routing_key", false, false, - DummyChannel::basicGetBody())); + MockChannel::basicGetBody())); msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); messages.push_back(msg); diff --git a/cpp/tests/TxPublishTest.cpp b/cpp/tests/TxPublishTest.cpp index 08e5339048..39d27a754d 100644 --- a/cpp/tests/TxPublishTest.cpp +++ b/cpp/tests/TxPublishTest.cpp @@ -25,7 +25,7 @@ #include <iostream> #include <list> #include <vector> -#include "DummyChannel.h" +#include "MockChannel.h" using std::list; using std::pair; @@ -78,7 +78,7 @@ public: queue1(new Queue("queue1", false, &store, 0)), queue2(new Queue("queue2", false, &store, 0)), msg(new BasicMessage(0, "exchange", "routing_key", false, false, - DummyChannel::basicGetBody())), + MockChannel::basicGetBody())), op(msg, &xid) { msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); diff --git a/cpp/tests/start_broker b/cpp/tests/start_broker index 05510b17ac..fe30458463 100755 --- a/cpp/tests/start_broker +++ b/cpp/tests/start_broker @@ -11,4 +11,4 @@ rm -rf $LOG $PID # FIXME aconway 2007-01-18: qpidd should not return till it is accepting # connections, remove arbitrary sleep. -sleep 2 +sleep 5 |
