diff options
author | Alan Conway <aconway@apache.org> | 2008-01-29 14:48:59 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-01-29 14:48:59 +0000 |
commit | 6b6dfc1709eace6db0c624676ad297e34fef4aa7 (patch) | |
tree | fb397c690916219439994edb19072ab5f6254a27 /cpp | |
parent | ff63c19a83d95fa8f0116424d609e61df9085500 (diff) | |
download | qpid-python-6b6dfc1709eace6db0c624676ad297e34fef4aa7.tar.gz |
Deleted unused classes, adjusted files that still mention them.
D src/qpid/framing/ChannelAdapter.cpp
D src/qpid/framing/ChannelAdapter.h
D src/qpid/framing/HandlerUpdater.h
D src/tests/BrokerChannelTest.cpp
D src/tests/MockChannel.h
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@616353 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 13 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionManager.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionManager.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.cpp | 71 | ||||
-rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.h | 93 | ||||
-rw-r--r-- | cpp/src/qpid/framing/HandlerUpdater.h | 46 | ||||
-rw-r--r-- | cpp/src/tests/BrokerChannelTest.cpp | 367 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/tests/MessageTest.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/MockChannel.h | 55 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/TxAckTest.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/TxPublishTest.cpp | 1 |
17 files changed, 8 insertions, 675 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index a79f263c82..48e081e412 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -111,7 +111,6 @@ libqpidcommon_la_SOURCES = \ qpid/framing/Array.cpp \ qpid/framing/BodyHolder.cpp \ qpid/framing/BodyHandler.cpp \ - qpid/framing/ChannelAdapter.cpp \ qpid/framing/Buffer.cpp \ qpid/framing/FieldTable.cpp \ qpid/framing/FieldValue.cpp \ @@ -358,7 +357,6 @@ nobase_include_HEADERS = \ qpid/framing/BodyHandler.h \ qpid/framing/Buffer.h \ qpid/framing/ChannelHandler.h \ - qpid/framing/ChannelAdapter.h \ qpid/framing/FieldTable.h \ qpid/framing/FieldValue.h \ qpid/framing/FrameDefaultVisitor.h \ @@ -367,7 +365,6 @@ nobase_include_HEADERS = \ qpid/framing/FrameSet.h \ qpid/framing/FramingContent.h \ qpid/framing/Handler.h \ - qpid/framing/HandlerUpdater.h \ qpid/framing/HeaderProperties.h \ qpid/framing/InitiationHandler.h \ qpid/framing/Invoker.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 8edbb25cd5..06a0d33a85 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -35,7 +35,6 @@ #include "qpid/log/Statement.h" #include "qpid/Url.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/HandlerUpdater.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Acceptor.h" #include "qpid/sys/ConnectionInputHandler.h" @@ -49,7 +48,6 @@ #include <memory> using qpid::sys::Acceptor; -using qpid::framing::HandlerUpdater; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; using qpid::management::ManagementAgent; @@ -238,17 +236,6 @@ Acceptor& Broker::getAcceptor() const { return *acceptor; } -void Broker::add(const shared_ptr<HandlerUpdater>& updater) { - QPID_LOG(debug, "Broker added HandlerUpdater"); - handlerUpdaters.push_back(updater); -} - -void Broker::update(ChannelId channel, FrameHandler::Chains& chains) { - for_each(handlerUpdaters.begin(), handlerUpdaters.end(), - boost::bind(&HandlerUpdater::update, _1, - channel, boost::ref(chains))); -} - ManagementObject::shared_ptr Broker::GetManagementObject(void) const { return dynamic_pointer_cast<ManagementObject> (mgmtObject); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 32a3ea55f7..1917da83cc 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -46,11 +46,6 @@ #include <vector> namespace qpid { - -namespace framing { -class HandlerUpdater; -} - namespace broker { /** @@ -99,12 +94,6 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M /** Shut down the broker */ virtual void shutdown(); - /** Register a handler updater. */ - void add(const shared_ptr<framing::HandlerUpdater>&); - - /** Apply all handler updaters to a handler chain pair. */ - void update(framing::ChannelId, framing::FrameHandler::Chains&); - void setStore (MessageStore*); MessageStore& getStore() { return *store; } QueueRegistry& getQueues() { return queues; } @@ -125,13 +114,11 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::M Options config; sys::Acceptor::shared_ptr acceptor; MessageStore* store; - typedef std::vector<shared_ptr<framing::HandlerUpdater> > HandlerUpdaters; QueueRegistry queues; ExchangeRegistry exchanges; ConnectionFactory factory; DtxManager dtxManager; - HandlerUpdaters handlerUpdaters; SessionManager sessionManager; management::ManagementAgent::shared_ptr managementAgent; management::Broker::shared_ptr mgmtObject; diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 7e8cb2d435..1c53bcb9a7 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -64,7 +64,8 @@ class Cluster : public framing::FrameHandler, virtual ~Cluster(); - framing::HandlerUpdater& getHandlerUpdater() { return sessions; } + // FIXME aconway 2008-01-29: + //framing::HandlerUpdater& getHandlerUpdater() { return sessions; } /** Get the current cluster membership. */ MemberList getMembers() const; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 380bd7f632..35a07fbc2d 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -55,7 +55,7 @@ struct ClusterPlugin : public Plugin { if (broker && !options.clusterName.empty()) { assert(!cluster); // A process can only belong to one cluster. cluster = boost::in_place(options.clusterName, broker->getUrl(), boost::ref(*broker)); - broker->add(make_shared_ptr(&cluster->getHandlerUpdater(), nullDeleter)); + // broker->add(make_shared_ptr(&cluster->getHandlerUpdater(), nullDeleter)); } } }; diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp index 71f52507b8..68e0223a40 100644 --- a/cpp/src/qpid/cluster/SessionManager.cpp +++ b/cpp/src/qpid/cluster/SessionManager.cpp @@ -25,7 +25,6 @@ #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/broker/BrokerAdapter.h" #include "qpid/broker/Connection.h" -#include "qpid/framing/ChannelAdapter.h" #include <boost/utility/in_place_factory.hpp> @@ -37,7 +36,7 @@ using namespace sys; using namespace broker; /** Handler to send frames direct to local broker (bypass correlation etc.) */ -struct SessionManager::BrokerHandler : public FrameHandler, private ChannelAdapter +struct SessionManager::BrokerHandler : public FrameHandler { Connection connection; SessionHandler sessionAdapter; @@ -56,7 +55,7 @@ struct SessionManager::BrokerHandler : public FrameHandler, private ChannelAdapt connection(0, broker), sessionAdapter(connection, 0), session(sessionAdapter, 1), - adapter(session, static_cast<ChannelAdapter&>(*this)) {} + adapter(session, 0) {} // FIXME aconway 2008-01-29: void handle(AMQFrame& frame) { AMQMethodBody* body=dynamic_cast<AMQMethodBody*>(frame.getBody()); diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h index 10fe5f82d1..c0e0cb5736 100644 --- a/cpp/src/qpid/cluster/SessionManager.h +++ b/cpp/src/qpid/cluster/SessionManager.h @@ -21,7 +21,7 @@ #include "ClassifierHandler.h" -#include "qpid/framing/HandlerUpdater.h" +//FIXME aconway 2008-01-29: #include "qpid/framing/HandlerUpdater.h" #include "qpid/framing/FrameHandler.h" #include "qpid/sys/Mutex.h" @@ -45,10 +45,9 @@ namespace cluster { /** * Manage the clusters session map. - * + * // FIXME aconway 2008-01-29: HandlerUpdater */ -class SessionManager : public framing::HandlerUpdater, public framing::FrameHandler, - private boost::noncopyable +class SessionManager : public framing::FrameHandler, private boost::noncopyable { public: SessionManager(broker::Broker& broker, framing::FrameHandler& cluster); diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp deleted file mode 100644 index a1e49a1904..0000000000 --- a/cpp/src/qpid/framing/ChannelAdapter.cpp +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "ChannelAdapter.h" -#include "OutputHandler.h" -#include "AMQFrame.h" -#include "FrameHandler.h" -#include "qpid/Exception.h" - -#include "AMQMethodBody.h" -#include "qpid/framing/ConnectionOpenBody.h" - -namespace qpid { -namespace framing { - -ChannelAdapter::Handler::Handler(ChannelAdapter& c) : parent(c) {} -void ChannelAdapter::Handler::handle(AMQFrame& f) { parent.handleBody(f.getBody()); } - -ChannelAdapter::ChannelAdapter() : handler(*this), id(0) {} - -void ChannelAdapter::init(ChannelId i, FrameHandler& out, ProtocolVersion v) -{ - assertChannelNotOpen(); - id = i; - version = v; - handlers.reset(&handler, &out); -} - -void ChannelAdapter::send(const AMQBody& body) -{ - assertChannelOpen(); - AMQFrame frame(body); - frame.setChannel(getId()); - handlers.out(frame); -} - -void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const { - if (getId() != 0 && method.amqpClassId() == ConnectionOpenBody::CLASS_ID) - throw ChannelErrorException( - QPID_MSG("Connection method on non-0 channel " << getId())); -} - -void ChannelAdapter::assertChannelOpen() const { - if (getId() != 0 && !isOpen()) - throw ChannelErrorException( - QPID_MSG("Channel " << getId() << " is not open.")); -} - -void ChannelAdapter::assertChannelNotOpen() const { - if (getId() != 0 && isOpen()) - throw ChannelErrorException( - QPID_MSG("Channel " << getId() << " is already open.")); -} - -void ChannelAdapter::handle(AMQFrame& f) { handleBody(f.getBody()); } - -}} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h deleted file mode 100644 index 55fd08da9d..0000000000 --- a/cpp/src/qpid/framing/ChannelAdapter.h +++ /dev/null @@ -1,93 +0,0 @@ - - - -#ifndef _ChannelAdapter_ -#define _ChannelAdapter_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "BodyHandler.h" -#include "ProtocolVersion.h" -#include "amqp_types.h" -#include "FrameHandler.h" - -namespace qpid { -namespace framing { - -/** - * Base class for client and broker channels. - * - * Provides in/out handler chains containing channel handlers. - * Chains may be modified by ChannelUpdaters registered with the broker. - * - * The handlers provided by the ChannelAdapter update request/response data. - * - * send() constructs a frame, updates request/resposne ID and forwards it - * to the out() chain. - * - * Thread safety: OBJECT UNSAFE. Instances must not be called - * concurrently. AMQP defines channels to be serialized. - */ -class ChannelAdapter : protected BodyHandler { - public: - /** - *@param output Processed frames are forwarded to this handler. - */ - ChannelAdapter(); - virtual ~ChannelAdapter() {} - - /** Initialize the channel adapter. */ - void init(ChannelId, FrameHandler&, ProtocolVersion); - - FrameHandler::Chains& getHandlers() { return handlers; } - - ChannelId getId() const { return id; } - ProtocolVersion getVersion() const { return version; } - - virtual void send(const AMQBody& body); - - virtual bool isOpen() const = 0; - - void handle(AMQFrame& f); - protected: - void assertMethodOk(AMQMethodBody& method) const; - void assertChannelOpen() const; - void assertChannelNotOpen() const; - - virtual void handleMethod(AMQMethodBody*) = 0; - - private: - struct Handler : public FrameHandler { - Handler(ChannelAdapter&); - void handle(AMQFrame&); - ChannelAdapter& parent; - }; - Handler handler; - ChannelId id; - ProtocolVersion version; - FrameHandler::Chains handlers; -}; - -}} - - -#endif diff --git a/cpp/src/qpid/framing/HandlerUpdater.h b/cpp/src/qpid/framing/HandlerUpdater.h deleted file mode 100644 index fb71c04fd6..0000000000 --- a/cpp/src/qpid/framing/HandlerUpdater.h +++ /dev/null @@ -1,46 +0,0 @@ -#ifndef QPID_FRAMING_HANDLERUPDATER_H -#define QPID_FRAMING_HANDLERUPDATER_H - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/Plugin.h" -#include "qpid/framing/amqp_types.h" -#include "qpid/framing/FrameHandler.h" - -namespace qpid { -namespace framing { - -/** Interface for objects that can update handler chains. */ -struct HandlerUpdater { - virtual ~HandlerUpdater() {} - - /** Update the handler chains. - *@param channel Id of associated channel. - *@param chains Handler chains to be updated. - */ - virtual void update(ChannelId channel, FrameHandler::Chains& chains) = 0; -}; - -}} // namespace qpid::framing - - - - - -#endif /*!QPID_FRAMING_HANDLERUPDATER_H*/ diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp deleted file mode 100644 index 74c8998d7c..0000000000 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ /dev/null @@ -1,367 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/BrokerChannel.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/FanOutExchange.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/MessageDelivery.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid_test_plugin.h" -#include <iostream> -#include <sstream> -#include <memory> -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/AMQFrame.h" -#include "MockChannel.h" -#include "qpid/broker/Connection.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/framing/ConnectionStartBody.h" -#include <vector> - -using namespace boost; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; -using std::string; -using std::queue; - -struct MockHandler : ConnectionOutputHandler{ - std::vector<AMQFrame> frames; - - void send(AMQFrame& frame){ frames.push_back(frame); } - - void close() {}; -}; - -struct DeliveryRecorder : DeliveryAdapter -{ - DeliveryId id; - typedef std::pair<intrusive_ptr<Message>, DeliveryToken::shared_ptr> Delivery; - std::vector<Delivery> delivered; - - DeliveryId deliver(intrusive_ptr<Message>& msg, DeliveryToken::shared_ptr token) - { - delivered.push_back(Delivery(msg, token)); - return ++id; - } - - void redeliver(intrusive_ptr<Message>& msg, DeliveryToken::shared_ptr token, DeliveryId /*tag*/) - { - delivered.push_back(Delivery(msg, token)); - } -}; - -class BrokerChannelTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(BrokerChannelTest); - CPPUNIT_TEST(testConsumerMgmt);; - CPPUNIT_TEST(testDeliveryNoAck); - CPPUNIT_TEST(testQueuePolicy); - CPPUNIT_TEST(testFlow); - CPPUNIT_TEST(testAsyncMesgToMoreThanOneQueue); - CPPUNIT_TEST_SUITE_END(); - - shared_ptr<Broker> broker; - Connection connection; - MockHandler handler; - - class MockMessageStore : public NullMessageStore - { - struct MethodCall - { - const string name; - PersistableMessage* msg; - const string data;//only needed for appendContent - - void check(const MethodCall& other) const - { - CPPUNIT_ASSERT_EQUAL(name, other.name); - CPPUNIT_ASSERT_EQUAL(msg, other.msg); - CPPUNIT_ASSERT_EQUAL(data, other.data); - } - }; - - queue<MethodCall> expected; - bool expectMode;//true when setting up expected calls - - void handle(const MethodCall& call) - { - if (expectMode) { - expected.push(call); - } else { - call.check(expected.front()); - expected.pop(); - } - } - - void handle(const string& name, PersistableMessage* msg, const string& data) - { - MethodCall call = {name, msg, data}; - handle(call); - } - - public: - - MockMessageStore() : expectMode(false) {} - - void stage(PersistableMessage& msg) - { - if(!expectMode) msg.setPersistenceId(1); - MethodCall call = {"stage", &msg, ""}; - handle(call); - } - - void appendContent(PersistableMessage& msg, const string& data) - { - MethodCall call = {"appendContent", &msg, data}; - handle(call); - } - - // Don't hide overloads. - using NullMessageStore::destroy; - - void destroy(PersistableMessage& msg) - { - MethodCall call = {"destroy", &msg, ""}; - handle(call); - } - - void expect() - { - expectMode = true; - } - - void test() - { - expectMode = false; - } - - void check() - { - if (!expected.empty()) { - std::stringstream error; - error << "Expected: "; - while (!expected.empty()) { - MethodCall& m = expected.front(); - error << m.name << "(" << m.msg << ", '" << m.data << "'); "; - expected.pop(); - } - CPPUNIT_FAIL(error.str()); - } - } - }; - - DeliveryRecorder recorder; - - public: - - BrokerChannelTest() : - broker(Broker::create()), - connection(&handler, *broker) - { - connection.initiated(ProtocolInitiation()); - } - - - void testConsumerMgmt(){ - Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(connection, recorder, 0); - channel.open(); - CPPUNIT_ASSERT(!channel.exists("my_consumer")); - - ConnectionToken* owner = 0; - string tag("my_consumer"); - DeliveryToken::shared_ptr unused; - channel.consume(unused, tag, queue, false, false, owner); - string tagA; - string tagB; - channel.consume(unused, tagA, queue, false, false, owner); - channel.consume(unused, tagB, queue, false, false, owner); - CPPUNIT_ASSERT_EQUAL((uint32_t) 3, queue->getConsumerCount()); - CPPUNIT_ASSERT(channel.exists("my_consumer")); - CPPUNIT_ASSERT(channel.exists(tagA)); - CPPUNIT_ASSERT(channel.exists(tagB)); - channel.cancel(tagA); - CPPUNIT_ASSERT_EQUAL((uint32_t) 2, queue->getConsumerCount()); - CPPUNIT_ASSERT(channel.exists("my_consumer")); - CPPUNIT_ASSERT(!channel.exists(tagA)); - CPPUNIT_ASSERT(channel.exists(tagB)); - channel.close(); - CPPUNIT_ASSERT_EQUAL((uint32_t) 0, queue->getConsumerCount()); - } - - void testDeliveryNoAck(){ - Channel channel(connection, recorder, 7); - intrusive_ptr<Message> msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - Queue::shared_ptr queue(new Queue("my_queue")); - string tag("test"); - DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token")); - channel.consume(token, tag, queue, false, false, 0); - queue->deliver(msg); - sleep(2); - - CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); - CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); - CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); - } - - - //NOTE: strictly speaking this should/could be part of QueueTest, - //but as it can usefully use the same utility classes as this - //class it is defined here for simpllicity - void testQueuePolicy() - { - MockMessageStore store; - {//must ensure that store is last thing deleted as it is needed by destructor of lazy loaded content - const string data1("abcd"); - const string data2("efghijk"); - const string data3("lmnopqrstuvwxyz"); - intrusive_ptr<Message> msg1(createMessage("e", "A", "MsgA", data1.size())); - intrusive_ptr<Message> msg2(createMessage("e", "B", "MsgB", data2.size())); - intrusive_ptr<Message> msg3(createMessage("e", "C", "MsgC", data3.size())); - addContent(msg1, data1); - addContent(msg2, data2); - addContent(msg3, data3); - - QueuePolicy policy(2, 0);//third message should be stored on disk and lazy loaded - FieldTable settings; - policy.update(settings); - - store.expect(); - store.stage(0, *msg3); - store.test(); - - Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0)); - queue->configure(settings);//set policy - queue->deliver(msg1); - queue->deliver(msg2); - queue->deliver(msg3); - sleep(2); - - intrusive_ptr<Message> next = queue->dequeue().payload; - CPPUNIT_ASSERT_EQUAL(msg1, next); - CPPUNIT_ASSERT_EQUAL((uint32_t) data1.size(), next->encodedContentSize()); - next = queue->dequeue().payload; - CPPUNIT_ASSERT_EQUAL(msg2, next); - CPPUNIT_ASSERT_EQUAL((uint32_t) data2.size(), next->encodedContentSize()); - next = queue->dequeue().payload; - CPPUNIT_ASSERT_EQUAL(msg3, next); - CPPUNIT_ASSERT_EQUAL((uint32_t) 0, next->encodedContentSize()); - - next.reset(); - msg1.reset(); - msg2.reset(); - msg3.reset();//must clear all references to messages to allow them to be destroyed - - } - store.check(); - } - - - //NOTE: message or queue test, - //but as it can usefully use the same utility classes as this - //class it is defined here for simpllicity - void testAsyncMesgToMoreThanOneQueue() - { - MockMessageStore store; - {//must ensure that store is last thing deleted - const string data1("abcd"); - intrusive_ptr<Message> msg1(createMessage("e", "A", "MsgA", data1.size())); - addContent(msg1, data1); - - Queue::shared_ptr queue1(new Queue("my_queue1", false, &store, 0)); - Queue::shared_ptr queue2(new Queue("my_queue2", false, &store, 0)); - Queue::shared_ptr queue3(new Queue("my_queue3", false, &store, 0)); - queue1->deliver(msg1); - queue2->deliver(msg1); - queue3->deliver(msg1); - sleep(2); - - intrusive_ptr<Message> next = queue1->dequeue().payload; - CPPUNIT_ASSERT_EQUAL(msg1, next); - next = queue2->dequeue().payload; - CPPUNIT_ASSERT_EQUAL(msg1, next); - next = queue3->dequeue().payload; - CPPUNIT_ASSERT_EQUAL(msg1, next); - - } - } - - - - void testFlow(){ - Channel channel(connection, recorder, 7); - channel.open(); - //there will always be a connection-start frame - CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel()); - CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(handler.frames[0].getBody())); - - Queue::shared_ptr queue(new Queue("my_queue")); - string tag("test"); - DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token")); - channel.consume(token, tag, queue, false, false, 0); - channel.flow(false); - - //'publish' a message - intrusive_ptr<Message> msg(createMessage("test", "my_routing_key", "my_message_id", 14)); - addContent(msg, "abcdefghijklmn"); - queue->deliver(msg); - - //ensure no messages have been delivered - CPPUNIT_ASSERT_EQUAL((size_t) 0, recorder.delivered.size()); - - channel.flow(true); - sleep(2); - //ensure no messages have been delivered - CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); - CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); - CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); - } - - intrusive_ptr<Message> createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) - { - intrusive_ptr<Message> msg(new Message()); - - AMQFrame method(in_place<MessageTransferBody>( - ProtocolVersion(), 0, exchange, 0, 0)); - AMQFrame header(in_place<AMQHeaderBody>()); - - msg->getFrames().append(method); - msg->getFrames().append(header); - MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setContentLength(contentSize); - props->setMessageId(messageId); - msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); - return msg; - } - - void addContent(intrusive_ptr<Message> msg, const string& data) - { - AMQFrame content(in_place<AMQContentBody>(data)); - msg->getFrames().append(content); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(BrokerChannelTest); diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 7ec1858a35..3d152258a3 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -123,7 +123,6 @@ EXTRA_DIST += \ .valgrind.supp \ .valgrindrc \ MessageUtils.h \ - MockChannel.h \ MockConnectionInputHandler.h \ TxMocks.h \ qpid_test_plugin.h diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp index f683eaad68..a19080e1ce 100644 --- a/cpp/src/tests/MessageTest.cpp +++ b/cpp/src/tests/MessageTest.cpp @@ -22,7 +22,6 @@ #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/FieldValue.h" -#include "MockChannel.h" #include "qpid_test_plugin.h" diff --git a/cpp/src/tests/MockChannel.h b/cpp/src/tests/MockChannel.h deleted file mode 100644 index 0c06daab2c..0000000000 --- a/cpp/src/tests/MockChannel.h +++ /dev/null @@ -1,55 +0,0 @@ -#ifndef _tests_MockChannel_h -#define _tests_MockChannel_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/ChannelAdapter.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/BasicGetBody.h" -#include <boost/shared_ptr.hpp> -#include <vector> - -/** Mock output handler to collect frames */ -struct MockOutputHandler : public qpid::framing::OutputHandler { - std::vector<qpid::framing::AMQFrame> frames; - void send(qpid::framing::AMQFrame& frame){ frames.push_back(frame); } -}; - -/** - * Combination mock OutputHandler and ChannelAdapter for tests. - */ -struct MockChannel : public qpid::framing::ChannelAdapter -{ - MockOutputHandler out; - - MockChannel(qpid::framing::ChannelId id) { - init(id, out, qpid::framing::ProtocolVersion()); - } - - bool isOpen() const { return true; } - - void handleHeader(qpid::framing::AMQHeaderBody* b) { send(*b); } - void handleContent(qpid::framing::AMQContentBody* b) { send(*b); } - void handleHeartbeat(qpid::framing::AMQHeartbeatBody* b) { send(*b); } - void handleMethod(qpid::framing::AMQMethodBody* b) { send(*b); }; - -}; - -#endif /*!_tests_MockChannel_h*/ diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 7e757cfad0..fae0d88b7c 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -25,7 +25,6 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid_test_plugin.h" #include <iostream> -#include "MockChannel.h" #include "boost/format.hpp" using namespace qpid; diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp index 1451fb65b6..6c0bc8b5ee 100644 --- a/cpp/src/tests/TxAckTest.cpp +++ b/cpp/src/tests/TxAckTest.cpp @@ -25,7 +25,6 @@ #include <iostream> #include <list> #include <vector> -#include "MockChannel.h" using std::list; using std::vector; diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp index c9da9762ec..2b363acfec 100644 --- a/cpp/src/tests/TxPublishTest.cpp +++ b/cpp/src/tests/TxPublishTest.cpp @@ -25,7 +25,6 @@ #include <iostream> #include <list> #include <vector> -#include "MockChannel.h" #include "MessageUtils.h" using std::list; |