diff options
Diffstat (limited to 'cpp')
27 files changed, 530 insertions, 392 deletions
| diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index fa25221bbd..6f55f32d47 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -355,245 +355,5 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume(      assert(0);                // FIXME aconway 2007-01-04: 0-9 feature  } - -// -// Message class method handlers -// -void -BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/, -                                                const string& /*reference*/, -                                                const string& /*bytes*/ ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature -} - - -void -BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t channel, -                                                const string& destination ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature - -    connection.getChannel(channel).cancel(destination); - -    connection.client->getMessageHandler()->ok(channel); -} - -void -BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, -                                                    const string& /*reference*/, -                                                    const string& /*identifier*/ ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/, -                                               const string& /*reference*/ ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::consume( u_int16_t channelId, -                    u_int16_t /*ticket*/, -                    const string& queueName, -                    const string& destination, -                    bool noLocal, -                    bool noAck, -                    bool exclusive, -                    const qpid::framing::FieldTable& filter ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature -	 -    Queue::shared_ptr queue = connection.getQueue(queueName, channelId);     -    Channel& channel = connection.getChannel(channelId); -    if(!destination.empty() && channel.exists(destination)){ -        throw ConnectionException(530, "Consumer tags must be unique"); -    } - -    try{ -        string newTag = destination; -        channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - -	    connection.client->getMessageHandler()->ok(channelId); - -        //allow messages to be dispatched if required as there is now a consumer: -        queue->dispatch(); -    }catch(ExclusiveAccessException& e){ -        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); -        else throw ChannelException(403, "Access would violate previously granted exclusivity"); -    } - -    connection.getChannel(channel).cancel(destination); - -    connection.client->getMessageHandler()->ok(channel); -} - -void -BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::get( u_int16_t channelId, -                                             u_int16_t /*ticket*/, -                                             const string& queueName, -                                             const string& /*destination*/, -                                             bool noAck ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature - -    Queue::shared_ptr queue = connection.getQueue(queueName, channelId); -     -    // FIXME: get is probably Basic specific -    if(!connection.getChannel(channelId).get(queue, !noAck)){ - -        connection.client->getMessageHandler()->empty(channelId); -    } -     -} - -void -BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/, -                                                u_int64_t /*value*/ ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature -	 -    Queue::shared_ptr queue = connection.getQueue(queueName, channelId);     -    Channel& channel = connection.getChannel(channelId); -    if(!destination.empty() && channel.exists(destination)){ -        throw ConnectionException(530, "Consumer tags must be unique"); -    } - -    try{ -        string newTag = destination; -        channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - -	    connection.client->getMessageHandler()->ok(channelId); - -        //allow messages to be dispatched if required as there is now a consumer: -        queue->dispatch(); -    }catch(ExclusiveAccessException& e){ -        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); -        else throw ChannelException(403, "Access would violate previously granted exclusivity"); -    } - -    connection.getChannel(channel).cancel(destination); - -    connection.client->getMessageHandler()->ok(channel); -} - -void -BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/, -                                              const string& /*reference*/ ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::qos( u_int16_t channel, -                    u_int32_t prefetchSize, -                    u_int16_t prefetchCount, -                    bool /*global*/ ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature -     -    //TODO: handle global -    connection.getChannel(channel).setPrefetchSize(prefetchSize); -    connection.getChannel(channel).setPrefetchCount(prefetchCount); -     -    connection.client->getMessageHandler()->ok(channel); -	 -    Queue::shared_ptr queue = connection.getQueue(queueName, channelId);     -    Channel& channel = connection.getChannel(channelId); -    if(!destination.empty() && channel.exists(destination)){ -        throw ConnectionException(530, "Consumer tags must be unique"); -    } - -    try{ -        string newTag = destination; -        channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - -	    connection.client->getMessageHandler()->ok(channelId); - -        //allow messages to be dispatched if required as there is now a consumer: -        queue->dispatch(); -    }catch(ExclusiveAccessException& e){ -        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); -        else throw ChannelException(403, "Access would violate previously granted exclusivity"); -    } -} - -void -BrokerAdapter::MessageHandlerImpl::recover( u_int16_t channel, -                                                 bool requeue ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature - -    connection.getChannel(channel).recover(requeue); -     -} - -void -BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/, -                                                u_int16_t /*code*/, -                                                const string& /*text*/ ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/, -                                                const string& /*reference*/, -                                                const string& /*identifier*/ ) -{ -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t channel, -                    u_int16_t /*ticket*/, -                    const string& /*destination*/, -                    bool /*redelivered*/, -                    bool immediate, -                    u_int64_t /*ttl*/, -                    u_int8_t /*priority*/, -                    u_int64_t /*timestamp*/, -                    u_int8_t /*deliveryMode*/, -                    u_int64_t /*expiration*/, -                    const string& exchangeName, -                    const string& routingKey, -                    const string& /*messageId*/, -                    const string& /*correlationId*/, -                    const string& /*replyTo*/, -                    const string& /*contentType*/, -                    const string& /*contentEncoding*/, -                    const string& /*userId*/, -                    const string& /*appId*/, -                    const string& /*transactionId*/, -                    const string& /*securityToken*/, -                    const qpid::framing::FieldTable& /*applicationHeaders*/, -                    qpid::framing::Content /*body*/ ) -{ -	assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature - -	Exchange::shared_ptr exchange = exchangeName.empty() ? -		connection.broker.getExchanges().getDefault() : connection.broker.getExchanges().get(exchangeName); -	if(exchange){ -	    Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate); -	    connection.getChannel(channel).handlePublish(msg, exchange); -	}else{ -	    throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); -	} -} -  }} // namespace qpid::broker diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 96215a60ed..c0250815e8 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -78,7 +78,7 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks,                        bool exclusive, ConnectionToken* const connection,                        const FieldTable*)  { -	if(tag.empty()) tag = tagGenerator.generate(); +    if(tag.empty()) tag = tagGenerator.generate();      ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));      try{          queue->consume(c, exclusive);//may throw exception @@ -187,6 +187,8 @@ void Channel::ConsumerImpl::requestDispatch(){      if(blocked) queue->dispatch();  } +// FIXME aconway 2007-02-05: Drop exchange member, calculate from +// message in ::complete().  void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){      Message::shared_ptr message(_message);      exchange = _exchange; @@ -207,19 +209,19 @@ void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {      // TODO aconway 2007-01-17: Implement heartbeating.  } -void Channel::complete(Message::shared_ptr& msg){ -    if(exchange){ -        if(transactional){ -            TxPublish* deliverable = new TxPublish(msg); -            exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); -            txBuffer.enlist(new DeletingTxOp(deliverable)); -        }else{ -            DeliverableMessage deliverable(msg); -            exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); -        } -        exchange.reset(); -    }else{ -        std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl; +void Channel::complete(Message::shared_ptr msg) { +    Exchange::shared_ptr exchange = +        connection.broker.getExchanges().get(msg->getExchange()); +    assert(exchange.get()); +    if(transactional) { +        std::auto_ptr<TxPublish> deliverable(new TxPublish(msg)); +        exchange->route(*deliverable, msg->getRoutingKey(), +                        &(msg->getHeaderProperties()->getHeaders())); +        txBuffer.enlist(new DeletingTxOp(deliverable.release())); +    } else { +        DeliverableMessage deliverable(msg); +        exchange->route(deliverable, msg->getRoutingKey(), +                        &(msg->getHeaderProperties()->getHeaders()));      }  } diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index dd95e944bb..484a4d64e3 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -36,6 +36,7 @@  #include <Prefetch.h>  #include <TxBuffer.h>  #include "framing/ChannelAdapter.h" +#include "CompletionHandler.h"  namespace qpid {  namespace broker { @@ -51,9 +52,8 @@ using framing::string;   * Maintains state for an AMQP channel. Handles incoming and   * outgoing messages for that channel.   */ -class Channel : -        public framing::ChannelAdapter, -        private MessageBuilder::CompletionHandler +class Channel : public framing::ChannelAdapter, +                public CompletionHandler  {      class ConsumerImpl : public virtual Consumer      { @@ -96,7 +96,7 @@ class Channel :      boost::scoped_ptr<BrokerAdapter> adapter; -    virtual void complete(Message::shared_ptr& msg); +    virtual void complete(Message::shared_ptr msg);      void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);                  void cancel(consumer_iterator consumer);      bool checkPrefetch(Message::shared_ptr& msg); diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index 4168ff639c..e2c4b94811 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -18,25 +18,39 @@   * under the License.   *   */ +#include <iostream>  #include "BrokerMessageMessage.h" +#include "MessageTransferBody.h" +#include "MessageAppendBody.h" +#include "Reference.h" +using namespace std;  using namespace qpid::broker; -MessageMessage::MessageMessage( -    const qpid::framing::AMQMethodBody::shared_ptr _methodBody,  -    const std::string& _exchange, const std::string& _routingKey,  -    bool _mandatory, bool _immediate) : -    Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody), -    methodBody(_methodBody) -{ -} +MessageMessage::MessageMessage(TransferPtr transfer_) +    : Message(transfer_->getExchange(), transfer_->getRoutingKey(), +              transfer_->getMandatory(), transfer_->getImmediate(), +              transfer_), +      transfer(transfer_) +{} + +MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref) +    : Message(transfer_->getExchange(), transfer_->getRoutingKey(), +              transfer_->getMandatory(), transfer_->getImmediate(), +              transfer_), +      transfer(transfer_), +      appends(ref.getAppends()) +{}  void MessageMessage::deliver( -    framing::ChannelAdapter& /*out*/,  +    framing::ChannelAdapter& /*channel*/,      const std::string& /*consumerTag*/,       u_int64_t /*deliveryTag*/,       u_int32_t /*framesize*/)  { +    // FIXME aconway 2007-02-05: +    cout << "MessageMessage::deliver" << *transfer << " + " << appends.size() +         << " appends." << endl;  }  void MessageMessage::sendGetOk( @@ -45,49 +59,50 @@ void MessageMessage::sendGetOk(      u_int64_t /*deliveryTag*/,       u_int32_t /*framesize*/)  { +    // FIXME aconway 2007-02-05:   }  bool MessageMessage::isComplete()  { -	return true; +    return true;               // FIXME aconway 2007-02-05:   }  u_int64_t MessageMessage::contentSize() const  { -	return 0; +    return 0;               // FIXME aconway 2007-02-05:   }  qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()  { -	return 0; +    return 0;               // FIXME aconway 2007-02-05:   }  bool MessageMessage::isPersistent()  { -	return false; +    return false;               // FIXME aconway 2007-02-05:   }  const ConnectionToken* const MessageMessage::getPublisher()  { -	return 0; +    return 0;               // FIXME aconway 2007-02-05:   }  u_int32_t MessageMessage::encodedSize()  { -	return 0; +    return 0;               // FIXME aconway 2007-02-05:   }  u_int32_t MessageMessage::encodedHeaderSize()  { -	return 0; +    return 0;               // FIXME aconway 2007-02-05:   }  u_int32_t MessageMessage::encodedContentSize()  { -	return 0; +    return 0;               // FIXME aconway 2007-02-05:   }  u_int64_t MessageMessage::expectedContentSize()  { -	return 0; +    return 0;               // FIXME aconway 2007-02-05:   } diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index cad5cf15b0..aa136863a1 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -21,23 +21,28 @@   * under the License.   *   */ - +#include <vector>  #include "BrokerMessageBase.h" +#include "Reference.h"           namespace qpid { +  namespace framing { -class AMQMethodBody; +class MessageTransferBody; +class MessageApppendBody;  }  namespace broker { -class MessageMessage: public Message{ -    const qpid::framing::AMQMethodBody::shared_ptr methodBody; +class Reference; +class MessageMessage: public Message{    public: -    MessageMessage( -        const framing::AMQMethodBody::shared_ptr methodBody,  -        const std::string& exchange, const std::string& routingKey,  -        bool mandatory, bool immediate); +    typedef Reference::TransferPtr TransferPtr; +    typedef Reference::AppendPtr AppendPtr; +    typedef  Reference::Appends Appends; + +    MessageMessage(TransferPtr transfer); +    MessageMessage(TransferPtr transfer, const Reference&);      // Default destructor okay @@ -52,7 +57,7 @@ class MessageMessage: public Message{                     u_int32_t framesize);      bool isComplete(); -             +      u_int64_t contentSize() const;      qpid::framing::BasicHeaderProperties* getHeaderProperties();      bool isPersistent(); @@ -62,10 +67,16 @@ class MessageMessage: public Message{      u_int32_t encodedHeaderSize();      u_int32_t encodedContentSize();      u_int64_t expectedContentSize(); + +    TransferPtr getTransfer() { return transfer; } +    const Appends& getAppends() { return appends; } +  private: + +    const TransferPtr transfer; +    const Appends appends;  }; -} -} +}}  #endif  /*!_broker_BrokerMessage_h*/ diff --git a/cpp/lib/broker/CompletionHandler.h b/cpp/lib/broker/CompletionHandler.h new file mode 100644 index 0000000000..9d51656282 --- /dev/null +++ b/cpp/lib/broker/CompletionHandler.h @@ -0,0 +1,39 @@ +#ifndef _broker_CompletionHandler_h +#define _broker_CompletionHandler_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +namespace qpid { +namespace broker { + +/** + * Callback interface to handle completion of a message. + */ +class CompletionHandler +{ +  public: +    virtual ~CompletionHandler(){} +    virtual void complete(Message::shared_ptr) = 0; +}; + +}} // namespace qpid::broker + + + +#endif  /*!_broker_CompletionHandler_h*/ diff --git a/cpp/lib/broker/ExchangeRegistry.cpp b/cpp/lib/broker/ExchangeRegistry.cpp index 7bf96c4544..3e5ed89b54 100644 --- a/cpp/lib/broker/ExchangeRegistry.cpp +++ b/cpp/lib/broker/ExchangeRegistry.cpp @@ -59,7 +59,10 @@ void ExchangeRegistry::destroy(const string& name){  Exchange::shared_ptr ExchangeRegistry::get(const string& name){      Mutex::ScopedLock locker(lock); -    return exchanges[name]; +    Exchange::shared_ptr exchange =exchanges[name]; +    if (!exchange)  +        throw ChannelException(404, "Exchange not found:" + name); +    return exchange;  }  namespace  diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am index 064b592124..760c6d61e2 100644 --- a/cpp/lib/broker/Makefile.am +++ b/cpp/lib/broker/Makefile.am @@ -69,6 +69,8 @@ libqpidbroker_la_SOURCES =				\    QueueRegistry.h				\    RecoveryManager.cpp				\    RecoveryManager.h				\ +  Reference.cpp					\ +  Reference.h					\    ConnectionFactory.cpp				\    ConnectionFactory.h				\    Connection.cpp				\ diff --git a/cpp/lib/broker/MessageBuilder.cpp b/cpp/lib/broker/MessageBuilder.cpp index 41bf812d2d..69e771c793 100644 --- a/cpp/lib/broker/MessageBuilder.cpp +++ b/cpp/lib/broker/MessageBuilder.cpp @@ -27,7 +27,10 @@ using namespace qpid::broker;  using namespace qpid::framing;  using std::auto_ptr; -MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) :  +MessageBuilder::MessageBuilder(CompletionHandler* _handler, +                               MessageStore* const _store, +                               u_int64_t _stagingThreshold +) :       handler(_handler),      store(_store),      stagingThreshold(_stagingThreshold) diff --git a/cpp/lib/broker/MessageBuilder.h b/cpp/lib/broker/MessageBuilder.h index 5b8516be42..f0b90a86cd 100644 --- a/cpp/lib/broker/MessageBuilder.h +++ b/cpp/lib/broker/MessageBuilder.h @@ -29,22 +29,19 @@  #include <AMQContentBody.h>  #include <AMQHeaderBody.h>  #include <BasicPublishBody.h> +#include "CompletionHandler.h"  namespace qpid {      namespace broker {          class MessageBuilder{          public: -            class CompletionHandler{ -            public: -                virtual void complete(Message::shared_ptr&) = 0; -                virtual ~CompletionHandler(){} -            };              MessageBuilder(CompletionHandler* _handler,                             MessageStore* const store = 0,                             u_int64_t stagingThreshold = 0);              void initialise(Message::shared_ptr& msg); -            void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header); -            void addContent(qpid::framing::AMQContentBody::shared_ptr& content); +            void setHeader(framing::AMQHeaderBody::shared_ptr& header); +            void addContent(framing::AMQContentBody::shared_ptr& content); +            Message::shared_ptr getMessage() { return message; }          private:              Message::shared_ptr message;              CompletionHandler* handler; diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 71100996e7..30b69e4654 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -23,6 +23,8 @@  #include "Connection.h"  #include "Broker.h"  #include "BrokerMessageMessage.h" +#include "MessageAppendBody.h" +#include "MessageTransferBody.h"  namespace qpid {  namespace broker { @@ -33,23 +35,23 @@ using namespace framing;  // Message class method handlers  //  void -MessageHandlerImpl::append(const MethodContext&, -                           const string& /*reference*/, +MessageHandlerImpl::append(const MethodContext& context, +                           const string& reference,                             const string& /*bytes*/ )  { -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature +    references.get(reference).append( +        boost::shared_polymorphic_downcast<MessageAppendBody>( +            context.methodBody)); +    sendOk(context);  }  void -MessageHandlerImpl::cancel( const MethodContext& context, -                            const string& destination ) +MessageHandlerImpl::cancel(const MethodContext& context, +                           const string& destination )  { -    //assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature -      channel.cancel(destination); - -    connection.client->getMessageHandler()->ok(context); +    sendOk(context);  }  void @@ -61,10 +63,11 @@ MessageHandlerImpl::checkpoint(const MethodContext&,  }  void -MessageHandlerImpl::close(const MethodContext&, -                          const string& /*reference*/ ) +MessageHandlerImpl::close(const MethodContext& context, +                          const string& reference)  { -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature +    references.get(reference).close(); +    sendOk(context);  }  void @@ -88,13 +91,16 @@ MessageHandlerImpl::consume(const MethodContext& context,          string newTag = destination;          channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); -        connection.client->getMessageHandler()->ok(context); +        sendOk(context);          //allow messages to be dispatched if required as there is now a consumer:          queue->dispatch();      }catch(ExclusiveAccessException& e){ -        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); -        else throw ChannelException(403, "Access would violate previously granted exclusivity"); +        if(exclusive) +            throw ChannelException(403, "Exclusive access cannot be granted"); +        else +            throw ChannelException( +                403, "Access would violate previously granted exclusivity");      }  } @@ -133,14 +139,15 @@ MessageHandlerImpl::offset(const MethodContext&,  void  MessageHandlerImpl::ok( const MethodContext& )  { -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature +    // TODO aconway 2007-02-05: For HA, we can drop acked messages here.  }  void -MessageHandlerImpl::open(const MethodContext&, -                         const string& /*reference*/ ) +MessageHandlerImpl::open(const MethodContext& context, +                         const string& reference)  { -    assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature +    references.open(reference); +    sendOk(context);  }  void @@ -155,7 +162,7 @@ MessageHandlerImpl::qos(const MethodContext& context,      channel.setPrefetchSize(prefetchSize);      channel.setPrefetchCount(prefetchCount); -    connection.client->getMessageHandler()->ok(context); +    sendOk(context);  }  void @@ -189,14 +196,14 @@ MessageHandlerImpl::transfer(const MethodContext& context,                               u_int16_t /*ticket*/,                               const string& /*destination*/,                               bool /*redelivered*/, -                             bool immediate, +                             bool /* immediate */,                               u_int64_t /*ttl*/,                               u_int8_t /*priority*/,                               u_int64_t /*timestamp*/,                               u_int8_t /*deliveryMode*/,                               u_int64_t /*expiration*/,                               const string& exchangeName, -                             const string& routingKey, +                             const string& /* routingKey */,                               const string& /*messageId*/,                               const string& /*correlationId*/,                               const string& /*replyTo*/, @@ -208,27 +215,28 @@ MessageHandlerImpl::transfer(const MethodContext& context,                               const string& /*securityToken*/,                               const qpid::framing::FieldTable& /*applicationHeaders*/,                               qpid::framing::Content body, -                             bool mandatory ) +                             bool /* mandatory */ )  {      //assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature - -    Exchange::shared_ptr exchange = exchangeName.empty() ? -        broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); -    if(exchange){ -    	if (body.isInline()) { -            MessageMessage* msg = -                new MessageMessage(context.methodBody, exchangeName, -                                   routingKey, mandatory, immediate); -            channel.handlePublish(msg, exchange); -         -            connection.client->getMessageHandler()->ok(context); -    	} else { -            // Don't handle reference content yet -            assert(body.isInline()); -    	} -    }else{ -        throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); +    MessageTransferBody::shared_ptr transfer = +        boost::shared_polymorphic_downcast<MessageTransferBody>( +            context.methodBody); +    // Verify the exchange exists, will throw if not. +    broker.getExchanges().get(exchangeName); +    if (body.isInline()) { +        MessageMessage* msg = new MessageMessage(transfer); +        // FIXME aconway 2007-02-05: Remove exchange parameter. +        // use shared_ptr for message. +        channel.handlePublish(msg, Exchange::shared_ptr()); +        sendOk(context); +    } else { +        references.get(body.getValue()).transfer(transfer);      }  } + +void MessageHandlerImpl::sendOk(const MethodContext& context) { +    connection.client->getMessageHandler()->ok(context); +} +  }} // namespace qpid::broker diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h index 985efe3847..886ca5fb54 100644 --- a/cpp/lib/broker/MessageHandlerImpl.h +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -19,23 +19,25 @@   *   */ +#include <memory> +  #include "AMQP_ServerOperations.h" +#include "Reference.h" +#include "BrokerChannel.h"  namespace qpid {  namespace broker { -class Channel;  class Connection;  class Broker; +class MessageMessage; -class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageHandler { -    Channel& channel; -    Connection& connection; -    Broker& broker; - +class MessageHandlerImpl : +        public framing::AMQP_ServerOperations::MessageHandler +{    public:      MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) -        : channel(ch), connection(c), broker(b) {} +        : channel(ch), connection(c), broker(b), references(ch) {}      void append(const framing::MethodContext&,                   const std::string& reference, @@ -116,6 +118,13 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH                     const framing::FieldTable& applicationHeaders,                     framing::Content body,                     bool mandatory ); +  private: +    void sendOk(const framing::MethodContext&); +     +    Channel& channel; +    Connection& connection; +    Broker& broker; +    ReferenceRegistry references;  };  }} // namespace qpid::broker diff --git a/cpp/lib/broker/Reference.cpp b/cpp/lib/broker/Reference.cpp new file mode 100644 index 0000000000..a5e734d77a --- /dev/null +++ b/cpp/lib/broker/Reference.cpp @@ -0,0 +1,56 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/bind.hpp> +#include "Reference.h" +#include "BrokerMessageMessage.h" +#include "QpidError.h" +#include "CompletionHandler.h" + +namespace qpid { +namespace broker { + +Reference&  ReferenceRegistry::open(const Reference::Id& id) { +    ReferenceMap::iterator i = references.find(id); +    // TODO aconway 2007-02-05: should we throw Channel or Connection +    // exceptions here? +    if (i != references.end()) +        THROW_QPID_ERROR(CLIENT_ERROR, "Attempt to re-open reference " +id); +    return references[id] = Reference(id, this); +} + +Reference&  ReferenceRegistry::get(const Reference::Id& id) { +    ReferenceMap::iterator i = references.find(id); +    if (i == references.end())  +        THROW_QPID_ERROR( +            CLIENT_ERROR, "Attempt to use non-existent reference "+id); +    return i->second; +} + +void  Reference::close() { +    for_each(transfers.begin(), transfers.end(), +             boost::bind(&Reference::complete, this, _1)); +    registry->references.erase(getId()); +} + +void Reference::complete(TransferPtr transfer) { +    MessageMessage::shared_ptr msg(new MessageMessage(transfer, *this)); +    registry->handler.complete(msg); +} + +}} // namespace qpid::broker diff --git a/cpp/lib/broker/Reference.h b/cpp/lib/broker/Reference.h new file mode 100644 index 0000000000..ecaca3de41 --- /dev/null +++ b/cpp/lib/broker/Reference.h @@ -0,0 +1,111 @@ +#ifndef _broker_Reference_h +#define _broker_Reference_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <string> +#include <vector> +#include <map> +#include <boost/shared_ptr.hpp> +#include <boost/range.hpp> + +namespace qpid { + +namespace framing { +class MessageTransferBody; +class MessageAppendBody; +} + +namespace broker { + +class CompletionHandler; +class ReferenceRegistry; + +/** + * A reference is an accumulation point for data in a multi-frame + * message. A reference can be used by multiple transfer commands, so + * the reference tracks which commands are using it. When the reference + * is closed, all the associated transfers are completed. + * + * THREAD UNSAFE: per-channel resource, access to channels is + * serialized. + */ +class Reference +{ +  public: +    typedef std::string Id; +    typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr; +    typedef std::vector<TransferPtr> Transfers; +    typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr; +    typedef std::vector<AppendPtr> Appends; + +    Reference(const Id& id_=Id(), ReferenceRegistry* reg=0) +        : id(id_), registry(reg) {} +     +    const std::string& getId() const { return id; } + +    /** Add a transfer to be completed with this reference */ +    void transfer(TransferPtr transfer) { transfers.push_back(transfer); } + +    /** Append more data to the reference */ +    void append(AppendPtr ptr) { appends.push_back(ptr); } + +    /** Close the reference, complete each associated transfer */ +    void close(); + +    const Appends& getAppends() const { return appends; } +    const Transfers& getTransfers() const { return transfers; } +     +  private: +    void complete(TransferPtr transfer); +     +    Id id; +    ReferenceRegistry* registry; +    Transfers transfers; +    Appends appends; +}; + + +/** + * A registry/factory for references. + *  + * THREAD UNSAFE: per-channel resource, access to channels is + * serialized. + */ +class ReferenceRegistry { +  public: +    ReferenceRegistry(CompletionHandler& handler_) : handler(handler_) {}; +    Reference& open(const Reference::Id& id); +    Reference& get(const Reference::Id& id); + +  private: +    typedef std::map<Reference::Id, Reference> ReferenceMap; +    CompletionHandler& handler; +    ReferenceMap references; + +    // Reference calls references.erase() and uses handler. +  friend class Reference; +}; + + +}} // namespace qpid::broker + + + +#endif  /*!_broker_Reference_h*/ diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index d9717d90a0..afb499023d 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -64,7 +64,10 @@ struct MethodContext  };  // FIXME aconway 2007-02-01: Method context only required on Handler -// functions, not on Proxy functions. +// functions, not on Proxy functions. If we add set/getChannel(ChannelAdapter*) +// on AMQBody and set it during decodeing then we could get rid of the context. + +  }} // namespace qpid::framing diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp index a3dabe6408..dcfd2fdb90 100644 --- a/cpp/tests/ChannelTest.cpp +++ b/cpp/tests/ChannelTest.cpp @@ -28,7 +28,7 @@  #include <memory>  #include <AMQP_HighestVersion.h>  #include "AMQFrame.h" -#include "DummyChannel.h" +#include "MockChannel.h"  #include "broker/Connection.h"  #include "ProtocolInitiation.h" @@ -39,7 +39,7 @@ using namespace qpid::sys;  using std::string;  using std::queue; -struct DummyHandler : ConnectionOutputHandler{ +struct MockHandler : ConnectionOutputHandler{      std::vector<AMQFrame*> frames;       void send(AMQFrame* frame){ frames.push_back(frame); } @@ -60,7 +60,7 @@ class ChannelTest : public CppUnit::TestCase      Broker::shared_ptr broker;      Connection connection; -    DummyHandler handler; +    MockHandler handler;      class MockMessageStore : public NullMessageStore      { @@ -240,10 +240,10 @@ class ChannelTest : public CppUnit::TestCase          Channel channel(              connection, 1, 1000/*framesize*/, &store, 10/*staging threshold*/);          const string data[] = {"abcde", "fghij", "klmno"}; - +                  Message* msg = new BasicMessage(              0, "my_exchange", "my_routing_key", false, false, -            DummyChannel::basicGetBody()); +            MockChannel::basicGetBody());          store.expect();          store.stage(msg); @@ -253,7 +253,8 @@ class ChannelTest : public CppUnit::TestCase          store.destroy(msg);          store.test(); -        Exchange::shared_ptr exchange(new FanOutExchange("my_exchange")); +        Exchange::shared_ptr exchange  = +            broker->getExchanges().declare("my_exchange", "fanout").first;          Queue::shared_ptr queue(new Queue("my_queue"));          exchange->bind(queue, "", 0); @@ -333,7 +334,7 @@ class ChannelTest : public CppUnit::TestCase      {          BasicMessage* msg = new BasicMessage(              0, exchange, routingKey, false, false, -            DummyChannel::basicGetBody()); +            MockChannel::basicGetBody());          AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));          header->setContentSize(contentSize);                  msg->setHeader(header); diff --git a/cpp/tests/InMemoryContentTest.cpp b/cpp/tests/InMemoryContentTest.cpp index db54ea44a0..c8433432e8 100644 --- a/cpp/tests/InMemoryContentTest.cpp +++ b/cpp/tests/InMemoryContentTest.cpp @@ -24,7 +24,7 @@  #include <iostream>  #include <list>  #include "AMQFrame.h" -#include "DummyChannel.h" +#include "MockChannel.h"  using std::list;  using std::string; @@ -58,7 +58,7 @@ public:      void refragment(size_t inCount, string* in, size_t outCount, string* out, u_int32_t framesize = 5)      {          InMemoryContent content; -        DummyChannel channel(3); +        MockChannel channel(3);          addframes(content, inCount, in);          content.send(channel, framesize);          diff --git a/cpp/tests/LazyLoadedContentTest.cpp b/cpp/tests/LazyLoadedContentTest.cpp index 49e4ecc4ae..365e4f6a11 100644 --- a/cpp/tests/LazyLoadedContentTest.cpp +++ b/cpp/tests/LazyLoadedContentTest.cpp @@ -26,7 +26,7 @@  #include <list>  #include <sstream>  #include "AMQFrame.h" -#include "DummyChannel.h" +#include "MockChannel.h"  using std::list;  using std::string;  using boost::dynamic_pointer_cast; @@ -92,7 +92,7 @@ public:      {          TestMessageStore store(in);          LazyLoadedContent content(&store, 0, in.size()); -        DummyChannel channel(3); +        MockChannel channel(3);          content.send(channel, framesize);                   CPPUNIT_ASSERT_EQUAL(outCount, channel.out.frames.size()); diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am index a2bbc07213..7a8480cded 100644 --- a/cpp/tests/Makefile.am +++ b/cpp/tests/Makefile.am @@ -9,12 +9,6 @@ INCLUDES =				\    -I$(top_srcdir)/lib/common/framing	\    $(APR_CXXFLAGS) -EXTRA_DIST =		\ -  topictest		\ -  qpid_test_plugin.h	\ -  MockConnectionInputHandler.h - -  client_exe_tests =	\    client_test		\    echo_service		\ @@ -32,6 +26,7 @@ broker_tests =		\    MessageBuilderTest	\    MessageHandlerTest	\    MessageTest		\ +  ReferenceTest         \    QueueRegistryTest	\    QueueTest		\    QueuePolicyTest	\ @@ -69,7 +64,14 @@ noinst_PROGRAMS = $(client_exe_tests)  CLIENT_TESTS = client_test quick_topictest  TESTS = run-unit-tests start_broker $(CLIENT_TESTS) python_tests kill_broker -EXTRA_DIST += $(TESTS) topictest + +EXTRA_DIST =		\ +  $(TESTS) 		\ +  topictest		\ +  qpid_test_plugin.h	\ +  MockConnectionInputHandler.h \ +  MockChannel.h		\ +  InProcessBroker.h  include gen.mk diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp index dc660751b7..d1b9c6ee62 100644 --- a/cpp/tests/MessageBuilderTest.cpp +++ b/cpp/tests/MessageBuilderTest.cpp @@ -26,7 +26,7 @@  #include <qpid_test_plugin.h>  #include <iostream>  #include <memory> -#include "DummyChannel.h" +#include "MockChannel.h"  using namespace boost;  using namespace qpid::broker; @@ -35,10 +35,10 @@ using namespace qpid::sys;  class MessageBuilderTest : public CppUnit::TestCase    { -    struct DummyHandler : MessageBuilder::CompletionHandler{ +    struct MockHandler : CompletionHandler {          Message::shared_ptr msg; -        virtual void complete(Message::shared_ptr& _msg){ +        virtual void complete(Message::shared_ptr _msg){              msg = _msg;          }      }; @@ -114,13 +114,13 @@ class MessageBuilderTest : public CppUnit::TestCase    public:      void testHeaderOnly(){ -        DummyHandler handler; +        MockHandler handler;          MessageBuilder builder(&handler);          Message::shared_ptr message(              new BasicMessage(                  0, "test", "my_routing_key", false, false, -                DummyChannel::basicGetBody())); +                MockChannel::basicGetBody()));          AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));          header->setContentSize(0); @@ -132,14 +132,14 @@ class MessageBuilderTest : public CppUnit::TestCase      }      void test1ContentFrame(){ -        DummyHandler handler; +        MockHandler handler;          MessageBuilder builder(&handler);          string data1("abcdefg");          Message::shared_ptr message(              new BasicMessage(0, "test", "my_routing_key", false, false, -                             DummyChannel::basicGetBody())); +                             MockChannel::basicGetBody()));          AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));          header->setContentSize(7);          AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -154,7 +154,7 @@ class MessageBuilderTest : public CppUnit::TestCase      }      void test2ContentFrames(){ -        DummyHandler handler; +        MockHandler handler;          MessageBuilder builder(&handler);          string data1("abcdefg"); @@ -162,7 +162,7 @@ class MessageBuilderTest : public CppUnit::TestCase          Message::shared_ptr message(              new BasicMessage(0, "test", "my_routing_key", false, false, -                             DummyChannel::basicGetBody())); +                             MockChannel::basicGetBody()));          AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));          header->setContentSize(14);          AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -185,7 +185,7 @@ class MessageBuilderTest : public CppUnit::TestCase          //loaded content is in use)          TestMessageStore store(14);          { -            DummyHandler handler; +            MockHandler handler;              MessageBuilder builder(&handler, &store, 5);              string data1("abcdefg"); @@ -193,7 +193,7 @@ class MessageBuilderTest : public CppUnit::TestCase              Message::shared_ptr message(                  new BasicMessage(0, "test", "my_routing_key", false, false, -                                 DummyChannel::basicGetBody())); +                                 MockChannel::basicGetBody()));              AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));              header->setContentSize(14);              BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp index 103a23f0df..2f49a28b83 100644 --- a/cpp/tests/MessageTest.cpp +++ b/cpp/tests/MessageTest.cpp @@ -23,7 +23,7 @@  #include <iostream>  #include <AMQP_HighestVersion.h>  #include "AMQFrame.h" -#include "DummyChannel.h" +#include "MockChannel.h"  using namespace boost;  using namespace qpid::broker; @@ -47,7 +47,7 @@ class MessageTest : public CppUnit::TestCase          BasicMessage::shared_ptr msg(              new BasicMessage(0, exchange, routingKey, false, false, -                             DummyChannel::basicGetBody())); +                             MockChannel::basicGetBody()));          AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));          header->setContentSize(14);                  AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); @@ -73,7 +73,7 @@ class MessageTest : public CppUnit::TestCase          CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc"));          CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize()); -        DummyChannel channel(1); +        MockChannel channel(1);          // FIXME aconway 2007-02-02: deliver should take const ProtocolVersion&          msg->deliver(channel, "ignore", 0, 100);           CPPUNIT_ASSERT_EQUAL((size_t) 3, channel.out.frames.size()); diff --git a/cpp/tests/DummyChannel.h b/cpp/tests/MockChannel.h index 2b6bd9b2b9..10fcb56969 100644 --- a/cpp/tests/DummyChannel.h +++ b/cpp/tests/MockChannel.h @@ -1,5 +1,5 @@ -#ifndef _tests_DummyChannel_h -#define _tests_DummyChannel_h +#ifndef _tests_MockChannel_h +#define _tests_MockChannel_h  /*   * @@ -26,16 +26,16 @@  #include "framing/AMQFrame.h"  #include "BasicGetBody.h" -/** Dummy output handler to collect frames */ -struct DummyOutputHandler : public qpid::framing::OutputHandler { +/** Mock output handler to collect frames */ +struct MockOutputHandler : public qpid::framing::OutputHandler {      std::vector<qpid::framing::AMQFrame*> frames;      void send(qpid::framing::AMQFrame* frame){ frames.push_back(frame); }  };  /** - * Combination dummy OutputHandler and ChannelAdapter for tests. + * Combination mock OutputHandler and ChannelAdapter for tests.   */ -struct DummyChannel : public qpid::framing::ChannelAdapter +struct MockChannel : public qpid::framing::ChannelAdapter  {      typedef qpid::framing::BasicGetBody Body;      static Body::shared_ptr basicGetBody() { @@ -43,9 +43,9 @@ struct DummyChannel : public qpid::framing::ChannelAdapter              new Body(qpid::framing::ProtocolVersion()));      } -    DummyOutputHandler out; +    MockOutputHandler out; -    DummyChannel(qpid::framing::ChannelId id) { +    MockChannel(qpid::framing::ChannelId id) {          init(id, out, qpid::framing::ProtocolVersion());      } @@ -66,4 +66,4 @@ struct DummyChannel : public qpid::framing::ChannelAdapter  }; -#endif  /*!_tests_DummyChannel_h*/ +#endif  /*!_tests_MockChannel_h*/ diff --git a/cpp/tests/QueueTest.cpp b/cpp/tests/QueueTest.cpp index 7105509de6..59ca7728ca 100644 --- a/cpp/tests/QueueTest.cpp +++ b/cpp/tests/QueueTest.cpp @@ -22,7 +22,7 @@  #include <QueueRegistry.h>  #include <qpid_test_plugin.h>  #include <iostream> -#include "DummyChannel.h" +#include "MockChannel.h"  using namespace qpid::broker;  using namespace qpid::sys; @@ -58,7 +58,7 @@ class QueueTest : public CppUnit::TestCase      Message::shared_ptr message(std::string exchange, std::string routingKey) {          return Message::shared_ptr(              new BasicMessage(0, exchange, routingKey, true, true, -                             DummyChannel::basicGetBody())); +                             MockChannel::basicGetBody()));      }      void testConsumers(){ diff --git a/cpp/tests/ReferenceTest.cpp b/cpp/tests/ReferenceTest.cpp new file mode 100644 index 0000000000..b50511f724 --- /dev/null +++ b/cpp/tests/ReferenceTest.cpp @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <memory> +#include "qpid_test_plugin.h" +#include "Reference.h" +#include "BrokerMessageMessage.h" +#include "MessageTransferBody.h" +#include "MessageAppendBody.h" +#include "CompletionHandler.h" + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace std; + +class ReferenceTest : public CppUnit::TestCase   +{ +    CPPUNIT_TEST_SUITE(ReferenceTest); +    CPPUNIT_TEST(testRegistry); +    CPPUNIT_TEST(testReference); +    CPPUNIT_TEST_SUITE_END(); + + +    struct MockCompletionHandler : public CompletionHandler { +        std::vector<Message::shared_ptr> messages; +        void complete(Message::shared_ptr msg) { messages.push_back(msg); } +    }; + +    MockCompletionHandler handler; +    ProtocolVersion v; +    ReferenceRegistry registry; +    MessageTransferBody::shared_ptr t1, t2; +    MessageAppendBody::shared_ptr a1, a2; +  public: + +    ReferenceTest() : +        registry(handler), +        t1(new MessageTransferBody(v)), +        t2(new MessageTransferBody(v)), +        a1(new MessageAppendBody(v)), +        a2(new MessageAppendBody(v)) +    {} + +    void testRegistry() { +        Reference& ref = registry.open("foo"); +        CPPUNIT_ASSERT_EQUAL(string("foo"), ref.getId()); +        CPPUNIT_ASSERT(&ref == ®istry.get("foo")); +        try { +            registry.get("none"); +            CPPUNIT_FAIL("Expected exception"); +        } catch (...) {} +        try { +            registry.open("foo"); +            CPPUNIT_FAIL("Expected exception"); +        } catch(...) {} +    } + +    MessageMessage& handlerMessage(size_t i) { +        CPPUNIT_ASSERT(handler.messages.size() > i); +        MessageMessage* msg = dynamic_cast<MessageMessage*>( +            handler.messages[i].get()); +        CPPUNIT_ASSERT(msg); +        return *msg; +    } +     +    void testReference() { +        Reference& ref = registry.open("foo"); +        ref.transfer(t1); +        ref.transfer(t2); +        CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getTransfers().size()); +        ref.append(a1); +        ref.append(a2); +        CPPUNIT_ASSERT_EQUAL(size_t(2), ref.getAppends().size()); +        ref.close(); +        try { +            registry.open("foo"); +            CPPUNIT_FAIL("Expected exception"); +        } catch(...) {} + +        vector<Message::shared_ptr>& messages = handler.messages; +        CPPUNIT_ASSERT_EQUAL(size_t(2), messages.size()); + +        CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getTransfer(), t1); +        CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getAppends()[0], a1); +        CPPUNIT_ASSERT_EQUAL(handlerMessage(0).getAppends()[1], a2); + +        CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getTransfer(), t2); +        CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getAppends()[0], a1); +        CPPUNIT_ASSERT_EQUAL(handlerMessage(1).getAppends()[1], a2); +    } +                              +     +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ReferenceTest); diff --git a/cpp/tests/TxAckTest.cpp b/cpp/tests/TxAckTest.cpp index 07381d187d..e464ff78f4 100644 --- a/cpp/tests/TxAckTest.cpp +++ b/cpp/tests/TxAckTest.cpp @@ -25,7 +25,7 @@  #include <iostream>  #include <list>  #include <vector> -#include "DummyChannel.h" +#include "MockChannel.h"  using std::list;  using std::vector; @@ -72,7 +72,7 @@ public:          for(int i = 0; i < 10; i++){              Message::shared_ptr msg(                  new BasicMessage(0, "exchange", "routing_key", false, false, -                                 DummyChannel::basicGetBody())); +                                 MockChannel::basicGetBody()));              msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));              msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);              messages.push_back(msg); diff --git a/cpp/tests/TxPublishTest.cpp b/cpp/tests/TxPublishTest.cpp index 08e5339048..39d27a754d 100644 --- a/cpp/tests/TxPublishTest.cpp +++ b/cpp/tests/TxPublishTest.cpp @@ -25,7 +25,7 @@  #include <iostream>  #include <list>  #include <vector> -#include "DummyChannel.h" +#include "MockChannel.h"  using std::list;  using std::pair; @@ -78,7 +78,7 @@ public:          queue1(new Queue("queue1", false, &store, 0)),           queue2(new Queue("queue2", false, &store, 0)),           msg(new BasicMessage(0, "exchange", "routing_key", false, false, -                             DummyChannel::basicGetBody())), +                             MockChannel::basicGetBody())),          op(msg, &xid)      {          msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); diff --git a/cpp/tests/start_broker b/cpp/tests/start_broker index 05510b17ac..fe30458463 100755 --- a/cpp/tests/start_broker +++ b/cpp/tests/start_broker @@ -11,4 +11,4 @@ rm -rf $LOG $PID  # FIXME aconway 2007-01-18: qpidd should not return till it is accepting  # connections, remove arbitrary sleep. -sleep 2 +sleep 5 | 
