diff options
| author | Alan Conway <aconway@apache.org> | 2006-12-01 05:11:45 +0000 | 
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2006-12-01 05:11:45 +0000 | 
| commit | fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66 (patch) | |
| tree | a2ebf932750bf13bf3db271f92df390335b0e844 /cpp/tests | |
| parent | 33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (diff) | |
| download | qpid-python-fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66.tar.gz | |
2006-12-01  Jim Meyering  <meyering@redhat.com>
This delta imposes two major changes on the C++ hierarchy:
  - adds autoconf, automake, libtool support
  - makes the hierarchy flatter and renames a few files (e.g., Queue.h,
  Queue.cpp) that appeared twice, once under client/ and again under broker/.
In the process, I've changed many #include directives, mostly
to remove a qpid/ or qpid/framing/ prefix from the file name argument.
Although most changes were to .cpp and .h files under qpid/cpp/, there
were also several to template files under qpid/gentools, and even one
to CppGenerator.java.
Nearly all files are moved to a new position in the hierarchy.
The new hierarchy looks like this:
  src               # this is the new home of qpidd.cpp
  tests             # all tests are here.  See Makefile.am.
  gen               # As before, all generated files go here.
  lib               # This is just a container for the 3 lib dirs:
  lib/client
  lib/broker
  lib/common
  lib/common/framing
  lib/common/sys
  lib/common/sys/posix
  lib/common/sys/apr
  build-aux
  m4
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/tests')
35 files changed, 4181 insertions, 0 deletions
| diff --git a/cpp/tests/APRBaseTest.cpp b/cpp/tests/APRBaseTest.cpp new file mode 100644 index 0000000000..7d95c3bf52 --- /dev/null +++ b/cpp/tests/APRBaseTest.cpp @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <apr/APRBase.h> +#include <qpid_test_plugin.h> +#include <iostream> + +using namespace qpid::sys; + +class APRBaseTest : public CppUnit::TestCase   +{ +    CPPUNIT_TEST_SUITE(APRBaseTest); +    CPPUNIT_TEST(testMe); +    CPPUNIT_TEST_SUITE_END(); + +  public: + +    void testMe()  +    { +        APRBase::increment(); +        APRBase::increment(); +        APRBase::decrement(); +        APRBase::decrement(); +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(APRBaseTest); + diff --git a/cpp/tests/AccumulatedAckTest.cpp b/cpp/tests/AccumulatedAckTest.cpp new file mode 100644 index 0000000000..bfd9358422 --- /dev/null +++ b/cpp/tests/AccumulatedAckTest.cpp @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <AccumulatedAck.h> +#include <qpid_test_plugin.h> +#include <iostream> +#include <list> + +using std::list; +using namespace qpid::broker; + +class AccumulatedAckTest : public CppUnit::TestCase   +{ +        CPPUNIT_TEST_SUITE(AccumulatedAckTest); +        CPPUNIT_TEST(testCovers); +        CPPUNIT_TEST(testUpdateAndConsolidate); +        CPPUNIT_TEST_SUITE_END(); + +    public: +        void testCovers() +        { +            AccumulatedAck ack; +            ack.range = 5; +            ack.individual.push_back(7); +            ack.individual.push_back(9); +             +            CPPUNIT_ASSERT(ack.covers(1)); +            CPPUNIT_ASSERT(ack.covers(2)); +            CPPUNIT_ASSERT(ack.covers(3)); +            CPPUNIT_ASSERT(ack.covers(4)); +            CPPUNIT_ASSERT(ack.covers(5)); +            CPPUNIT_ASSERT(ack.covers(7)); +            CPPUNIT_ASSERT(ack.covers(9)); + +            CPPUNIT_ASSERT(!ack.covers(6)); +            CPPUNIT_ASSERT(!ack.covers(8)); +            CPPUNIT_ASSERT(!ack.covers(10)); +        } + +        void testUpdateAndConsolidate() +        { +            AccumulatedAck ack; +            ack.clear(); +            ack.update(1, false); +            ack.update(3, false); +            ack.update(10, false); +            ack.update(8, false); +            ack.update(6, false); +            ack.update(3, true); +            ack.update(2, true); +            ack.update(5, true); +            ack.consolidate(); +            CPPUNIT_ASSERT_EQUAL((u_int64_t) 5, ack.range); +            CPPUNIT_ASSERT_EQUAL((size_t) 3, ack.individual.size()); +            list<u_int64_t>::iterator i = ack.individual.begin(); +            CPPUNIT_ASSERT_EQUAL((u_int64_t) 6, *i); +            i++; +            CPPUNIT_ASSERT_EQUAL((u_int64_t) 8, *i); +            i++; +            CPPUNIT_ASSERT_EQUAL((u_int64_t) 10, *i); + +        } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(AccumulatedAckTest); + diff --git a/cpp/tests/BodyHandlerTest.cpp b/cpp/tests/BodyHandlerTest.cpp new file mode 100644 index 0000000000..5c3cba0f25 --- /dev/null +++ b/cpp/tests/BodyHandlerTest.cpp @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <amqp_framing.h> +#include <qpid_test_plugin.h> +using namespace qpid::framing; + +class BodyHandlerTest : public CppUnit::TestCase   +{ +    CPPUNIT_TEST_SUITE(BodyHandlerTest); +    CPPUNIT_TEST(testMethod); +    CPPUNIT_TEST(testHeader); +    CPPUNIT_TEST(testContent); +    CPPUNIT_TEST(testHeartbeat); +    CPPUNIT_TEST_SUITE_END(); +private: + +    class TestBodyHandler : public BodyHandler{ +        AMQMethodBody* const method; +        AMQHeaderBody* const header; +        AMQContentBody* const content; +        AMQHeartbeatBody* const heartbeat;         + +    public: + +        TestBodyHandler(AMQMethodBody* _method) : method(_method), header(0), content(0), heartbeat(0){} +        TestBodyHandler(AMQHeaderBody* _header) : method(0), header(_header), content(0), heartbeat(0){} +        TestBodyHandler(AMQContentBody* _content) : method(0), header(0), content(_content), heartbeat(0){} +        TestBodyHandler(AMQHeartbeatBody* _heartbeat) : method(0), header(0), content(0), heartbeat(_heartbeat){} + +	virtual void handleMethod(AMQMethodBody::shared_ptr body){ +            CPPUNIT_ASSERT(method); +            CPPUNIT_ASSERT_EQUAL(method, body.get()); +        } +	virtual void handleHeader(AMQHeaderBody::shared_ptr body){ +            CPPUNIT_ASSERT(header); +            CPPUNIT_ASSERT_EQUAL(header, body.get()); +        } +	virtual void handleContent(AMQContentBody::shared_ptr body){ +            CPPUNIT_ASSERT(content); +            CPPUNIT_ASSERT_EQUAL(content, body.get()); +        } +	virtual void handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ +            CPPUNIT_ASSERT(heartbeat); +            CPPUNIT_ASSERT_EQUAL(heartbeat, body.get()); +        } +    }; +  	ProtocolVersion v; + +public: +         +    BodyHandlerTest() : v(8, 0) {} + +    void testMethod()  +    { +        AMQMethodBody* method = new QueueDeclareBody(v); +        AMQFrame frame(0, method); +        TestBodyHandler handler(method); +        handler.handleBody(frame.getBody()); +    } + +    void testHeader()  +    { +        AMQHeaderBody* header = new AMQHeaderBody(); +        AMQFrame frame(0, header); +        TestBodyHandler handler(header); +        handler.handleBody(frame.getBody()); +    } + +    void testContent()  +    { +        AMQContentBody* content = new AMQContentBody(); +        AMQFrame frame(0, content); +        TestBodyHandler handler(content); +        handler.handleBody(frame.getBody()); +    } + +    void testHeartbeat()  +    { +        AMQHeartbeatBody* heartbeat = new AMQHeartbeatBody(); +        AMQFrame frame(0, heartbeat); +        TestBodyHandler handler(heartbeat); +        handler.handleBody(frame.getBody()); +    } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(BodyHandlerTest); + diff --git a/cpp/tests/ChannelTest.cpp b/cpp/tests/ChannelTest.cpp new file mode 100644 index 0000000000..6351b1555a --- /dev/null +++ b/cpp/tests/ChannelTest.cpp @@ -0,0 +1,138 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <BrokerChannel.h> +#include <BrokerMessage.h> +#include <qpid_test_plugin.h> +#include <iostream> +#include <memory> + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +struct DummyHandler : OutputHandler{ +    std::vector<AMQFrame*> frames;  + +    virtual void send(AMQFrame* frame){ +        frames.push_back(frame); +    } +}; + + +class ChannelTest : public CppUnit::TestCase   +{ +    CPPUNIT_TEST_SUITE(ChannelTest); +    CPPUNIT_TEST(testConsumerMgmt); +    CPPUNIT_TEST(testDeliveryNoAck); +    CPPUNIT_TEST_SUITE_END(); + +  public: + +    void testConsumerMgmt(){ +        Queue::shared_ptr queue(new Queue("my_queue")); +        Channel channel(0, 0, 0); +        CPPUNIT_ASSERT(!channel.exists("my_consumer")); + +        ConnectionToken* owner = 0; +        string tag("my_consumer"); +        channel.consume(tag, queue, false, false, owner); +        string tagA; +        string tagB; +        channel.consume(tagA, queue, false, false, owner); +        channel.consume(tagB, queue, false, false, owner); +        CPPUNIT_ASSERT_EQUAL((u_int32_t) 3, queue->getConsumerCount()); +        CPPUNIT_ASSERT(channel.exists("my_consumer")); +        CPPUNIT_ASSERT(channel.exists(tagA)); +        CPPUNIT_ASSERT(channel.exists(tagB)); +        channel.cancel(tagA); +        CPPUNIT_ASSERT_EQUAL((u_int32_t) 2, queue->getConsumerCount()); +        CPPUNIT_ASSERT(channel.exists("my_consumer")); +        CPPUNIT_ASSERT(!channel.exists(tagA)); +        CPPUNIT_ASSERT(channel.exists(tagB)); +        channel.close(); +        CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, queue->getConsumerCount());         +    } + +    void testDeliveryNoAck(){ +        DummyHandler handler; +        Channel channel(&handler, 7, 10000); + +        Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false)); +        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); +        header->setContentSize(14); +        msg->setHeader(header); +        AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn")); +        msg->addContent(body); + +        Queue::shared_ptr queue(new Queue("my_queue")); +        ConnectionToken* owner(0); +        string tag("no_ack"); +        channel.consume(tag, queue, false, false, owner); + +        queue->deliver(msg); +        CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); +        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());         +        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());         +        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); +        BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); +        AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); +        AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); +        CPPUNIT_ASSERT(deliver); +        CPPUNIT_ASSERT(contentHeader); +        CPPUNIT_ASSERT(contentBody); +        CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData()); +    } + +    void testDeliveryAndRecovery(){ +        DummyHandler handler; +        Channel channel(&handler, 7, 10000); + +        Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false)); +        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); +        header->setContentSize(14); +        msg->setHeader(header); +        AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn")); +        msg->addContent(body); + +        Queue::shared_ptr queue(new Queue("my_queue")); +        ConnectionToken* owner; +        string tag("ack"); +        channel.consume(tag, queue, true, false, owner); + +        queue->deliver(msg); +        CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); +        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());         +        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());         +        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); +        BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); +        AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); +        AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); +        CPPUNIT_ASSERT(deliver); +        CPPUNIT_ASSERT(contentHeader); +        CPPUNIT_ASSERT(contentBody); +        CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData()); +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ChannelTest); diff --git a/cpp/tests/ConfigurationTest.cpp b/cpp/tests/ConfigurationTest.cpp new file mode 100644 index 0000000000..2b308f45cf --- /dev/null +++ b/cpp/tests/ConfigurationTest.cpp @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <Configuration.h> +#include <qpid_test_plugin.h> +#include <iostream> + +using namespace std; +using namespace qpid::broker; + +class ConfigurationTest : public CppUnit::TestCase   +{ +    CPPUNIT_TEST_SUITE(ConfigurationTest); +    CPPUNIT_TEST(testIsHelp); +    CPPUNIT_TEST(testPortLongForm); +    CPPUNIT_TEST(testPortShortForm); +    CPPUNIT_TEST(testStore); +    CPPUNIT_TEST(testVarious); +    CPPUNIT_TEST_SUITE_END(); + +  public: + +    void testIsHelp()  +    { +        Configuration conf; +        char* argv[] = {"ignore", "--help"}; +        conf.parse(2, argv); +        CPPUNIT_ASSERT(conf.isHelp()); +    } + +    void testPortLongForm()  +    { +        Configuration conf; +        char* argv[] = {"ignore", "--port", "6789"}; +        conf.parse(3, argv); +        CPPUNIT_ASSERT_EQUAL(6789, conf.getPort()); +    } + +    void testPortShortForm()  +    { +        Configuration conf; +        char* argv[] = {"ignore", "-p", "6789"}; +        conf.parse(3, argv); +        CPPUNIT_ASSERT_EQUAL(6789, conf.getPort()); +    } + +    void testStore()  +    { +        Configuration conf; +        char* argv[] = {"ignore", "--store", "my-store-module.so"}; +        conf.parse(3, argv); +        std::string expected("my-store-module.so"); +        CPPUNIT_ASSERT_EQUAL(expected, conf.getStore()); +    } + +    void testVarious()  +    {         +        Configuration conf; +        char* argv[] = {"ignore", "-t", "--worker-threads", "10"}; +        conf.parse(4, argv); +        CPPUNIT_ASSERT_EQUAL(5672, conf.getPort());//default +        CPPUNIT_ASSERT_EQUAL(10, conf.getWorkerThreads()); +        CPPUNIT_ASSERT(conf.isTrace()); +        CPPUNIT_ASSERT(!conf.isHelp()); +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ConfigurationTest); + diff --git a/cpp/tests/EventChannelTest.cpp b/cpp/tests/EventChannelTest.cpp new file mode 100644 index 0000000000..8e5c724a15 --- /dev/null +++ b/cpp/tests/EventChannelTest.cpp @@ -0,0 +1,187 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <posix/EventChannel.h> +#include <posix/check.h> +#include <sys/Runnable.h> +#include <sys/Socket.h> +#include <sys/Thread.h> +#include <qpid_test_plugin.h> + +#include <sys/socket.h> +#include <signal.h> +#include <netinet/in.h> +#include <netdb.h> +#include <iostream> + +using namespace qpid::sys; + + +const char hello[] = "hello"; +const size_t size = sizeof(hello); + +struct RunMe : public Runnable  +{ +    bool ran; +    RunMe() : ran(false) {} +    void run() { ran = true; } +}; + +class EventChannelTest : public CppUnit::TestCase   +{ +    CPPUNIT_TEST_SUITE(EventChannelTest); +    CPPUNIT_TEST(testEvent); +    CPPUNIT_TEST(testRead); +    CPPUNIT_TEST(testFailedRead); +    CPPUNIT_TEST(testWrite); +    CPPUNIT_TEST(testFailedWrite); +    CPPUNIT_TEST(testReadWrite); +    CPPUNIT_TEST(testAccept); +    CPPUNIT_TEST_SUITE_END(); + +  private: +    EventChannel::shared_ptr ec; +    int pipe[2]; +    char readBuf[size]; + +  public: + +    void setUp() +    { +        memset(readBuf, size, 0); +        ec = EventChannel::create(); +        if (::pipe(pipe) != 0) throw QPID_POSIX_ERROR(errno); +        // Ignore SIGPIPE, otherwise we will crash writing to broken pipe. +        signal(SIGPIPE, SIG_IGN); +    } + +    // Verify that calling getEvent returns event. +    template <class T> bool isNextEvent(T& event) +    { +        return &event == dynamic_cast<T*>(ec->getEvent()); +    } + +    template <class T> bool isNextEventOk(T& event) +    { +        Event* next = ec->getEvent(); +        if (next) next->throwIfError(); +        return &event == next; +    } +         +    void testEvent() +    { +        RunMe runMe; +        CPPUNIT_ASSERT(!runMe.ran); +        // Instances of Event just pass thru the channel immediately. +        Event e(runMe.functor()); +        ec->postEvent(e); +        CPPUNIT_ASSERT(isNextEventOk(e)); +        e.dispatch(); +        CPPUNIT_ASSERT(runMe.ran); +    } + +    void testRead() { +        ReadEvent re(pipe[0], readBuf, size); +        ec->postEvent(re); +        CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::write(pipe[1], hello, size)); +        CPPUNIT_ASSERT(isNextEventOk(re)); +        CPPUNIT_ASSERT_EQUAL(size, re.getSize()); +        CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); +    } + +    void testFailedRead()  +    { +        ReadEvent re(pipe[0], readBuf, size); +        ec->postEvent(re); + +        // EOF before all data read. +        ::close(pipe[1]); +        CPPUNIT_ASSERT(isNextEvent(re)); +        CPPUNIT_ASSERT(re.hasError()); +        try { +            re.throwIfError(); +            CPPUNIT_FAIL("Expected QpidError."); +        } +        catch (const qpid::QpidError&) { } + +        //  Bad file descriptor. Note in this case we fail +        //  in postEvent and throw immediately. +        try { +            ReadEvent bad; +            ec->postEvent(bad); +            CPPUNIT_FAIL("Expected QpidError."); +        } +        catch (const qpid::QpidError&) { } +    } + +    void testWrite() { +        WriteEvent wr(pipe[1], hello, size); +        ec->postEvent(wr); +        CPPUNIT_ASSERT(isNextEventOk(wr)); +        CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::read(pipe[0], readBuf, size));; +        CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); +    } + +    void testFailedWrite() { +        WriteEvent wr(pipe[1], hello, size); +        ::close(pipe[0]); +        ec->postEvent(wr); +        CPPUNIT_ASSERT(isNextEvent(wr)); +        CPPUNIT_ASSERT(wr.hasError()); +    } + +    void testReadWrite() +    { +        ReadEvent re(pipe[0], readBuf, size); +        WriteEvent wr(pipe[1], hello, size); +        ec->postEvent(re); +        ec->postEvent(wr); +        ec->getEvent(); +        ec->getEvent(); +        CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); +    } + +    void testAccept() { +        Socket s = Socket::createTcp(); +        int port = s.listen(0, 10); +        CPPUNIT_ASSERT(port != 0); + +        AcceptEvent ae(s.fd()); +        ec->postEvent(ae); +        Socket client = Socket::createTcp(); +        client.connect("localhost", port); +        CPPUNIT_ASSERT(isNextEvent(ae)); +        ae.dispatch(); + +        // Verify client writes are read by the accepted descriptor. +        char readBuf[size]; +        ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size); +        ec->postEvent(re); +        CPPUNIT_ASSERT_EQUAL(ssize_t(size), client.send(hello, sizeof(hello))); +        CPPUNIT_ASSERT(isNextEvent(re)); +        re.dispatch(); +        CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelTest); + diff --git a/cpp/tests/EventChannelThreadsTest.cpp b/cpp/tests/EventChannelThreadsTest.cpp new file mode 100644 index 0000000000..285ed29518 --- /dev/null +++ b/cpp/tests/EventChannelThreadsTest.cpp @@ -0,0 +1,247 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <boost/bind.hpp> + +#include <sys/Socket.h> +#include <posix/EventChannelThreads.h> +#include <qpid_test_plugin.h> + + +using namespace std; + +using namespace qpid::sys; + +const int nConnections = 5; +const int nMessages = 10; // Messages read/written per connection. + + +// Accepts + reads + writes. +const int totalEvents = nConnections+2*nConnections*nMessages; + +/** + * Messages are numbered 0..nMessages. + * We count the total number of events, and the + * number of reads and writes for each message number. + */ +class TestResults : public Monitor { +  public: +    TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {} + +    void countEvent() { +        if (--nEventsRemaining == 0) +            shutdown(); +    } + +    void countRead(int messageNo) { +        ++reads[messageNo]; +        countEvent(); +    } + +    void countWrite(int messageNo) { +        ++writes[messageNo]; +        countEvent(); +    } + +    void shutdown(const std::string& exceptionMsg = std::string()) { +        ScopedLock lock(*this); +        exception = exceptionMsg; +        isShutdown = true; +        notifyAll(); +    } +     +    void wait() { +        ScopedLock lock(*this); +        Time deadline = now() + 10*TIME_SEC;  +        while (!isShutdown) { +            CPPUNIT_ASSERT(Monitor::wait(deadline)); +        } +    } + +    bool isShutdown; +    std::string exception; +    AtomicCount reads[nMessages]; +    AtomicCount writes[nMessages]; +    AtomicCount nEventsRemaining; +}; + +TestResults results; + +EventChannelThreads::shared_ptr threads; + +// Functor to wrap callbacks in try/catch. +class SafeCallback { +  public: +    SafeCallback(Runnable& r) : callback(r.functor()) {} +    SafeCallback(Event::Callback cb) : callback(cb) {} +     +    void operator()() { +        std::string exception; +        try { +            callback(); +            return; +        } +        catch (const std::exception& e) { +            exception = e.what(); +        } +        catch (...) { +            exception = "Unknown exception."; +        } +        results.shutdown(exception); +    } + +  private: +    Event::Callback callback; +}; + +/** Repost an event N times. */ +class Repost { +  public: +    Repost(int n) : count (n) {} +    virtual ~Repost() {} +     +    void repost(Event* event) { +        if (--count==0) { +            delete event; +        } else { +            threads->postEvent(event); +        } +    } +  private: +    int count; +}; +     +             + +/** Repeating read event. */ +class TestReadEvent : public ReadEvent, public Runnable, private Repost { +  public: +    explicit TestReadEvent(int fd=-1) : +        ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)), +        Repost(nMessages) +    {} +     +    void run() { +        CPPUNIT_ASSERT_EQUAL(sizeof(value), getSize()); +        CPPUNIT_ASSERT(0 <= value); +        CPPUNIT_ASSERT(value < nMessages); +        results.countRead(value); +        repost(this); +    } +     +  private: +    int value; +    ReadEvent original; +}; + + +/** Fire and forget write event */ +class TestWriteEvent : public WriteEvent, public Runnable, private Repost { +  public: +    TestWriteEvent(int fd=-1) : +        WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)), +        Repost(nMessages), +        value(0) +    {} +     +    void run() { +        CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize()); +        results.countWrite(value++); +        repost(this); +    } + +  private: +    int value; +}; + +/** Fire-and-forget Accept event, posts reads on the accepted connection. */ +class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost { +  public: +    TestAcceptEvent(int fd=-1) : +        AcceptEvent(fd, SafeCallback(*this)), +        Repost(nConnections) +    {} +     +    void run() { +        threads->postEvent(new TestReadEvent(getAcceptedDesscriptor())); +        results.countEvent(); +        repost(this); +    } +}; + +class EventChannelThreadsTest : public CppUnit::TestCase +{ +    CPPUNIT_TEST_SUITE(EventChannelThreadsTest); +    CPPUNIT_TEST(testThreads); +    CPPUNIT_TEST_SUITE_END(); + +  public: + +    void setUp() { +        threads = EventChannelThreads::create(EventChannel::create()); +    } + +    void tearDown() { +        threads.reset(); +    } + +    void testThreads() +    { +        Socket listener = Socket::createTcp(); +        int port = listener.listen(); + +        // Post looping accept events, will repost nConnections times. +        // The accept event will automatically post read events. +        threads->postEvent(new TestAcceptEvent(listener.fd())); + +        // Make connections. +        Socket connections[nConnections]; +        for (int i = 0; i < nConnections; ++i) { +            connections[i] = Socket::createTcp(); +            connections[i].connect("localhost", port); +        } + +        // Post looping write events. +        for (int i = 0; i < nConnections; ++i) { +            threads->postEvent(new TestWriteEvent(connections[i].fd())); +        } + +        // Wait for all events to be dispatched. +        results.wait(); + +        if (!results.exception.empty()) CPPUNIT_FAIL(results.exception); +        CPPUNIT_ASSERT_EQUAL(0, int(results.nEventsRemaining)); + +        // Expect a read and write for each messageNo from each connection. +        for (int messageNo = 0; messageNo < nMessages; ++messageNo) { +            CPPUNIT_ASSERT_EQUAL(nConnections, int(results.reads[messageNo])); +            CPPUNIT_ASSERT_EQUAL(nConnections, int(results.writes[messageNo])); +        } + +        threads->shutdown(); +        threads->join(); +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelThreadsTest); + diff --git a/cpp/tests/ExceptionTest.cpp b/cpp/tests/ExceptionTest.cpp new file mode 100644 index 0000000000..6cea863168 --- /dev/null +++ b/cpp/tests/ExceptionTest.cpp @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include <Exception.h> +#include <qpid_test_plugin.h> + +using namespace qpid; + +struct CountDestroyedException : public Exception { +    int& count; +    static int staticCount; +    CountDestroyedException() : count(staticCount) { } +    CountDestroyedException(int& n) : count(n) {} +    ~CountDestroyedException() throw() { count++; } +    void throwSelf() const { throw *this; } +}; + +int CountDestroyedException::staticCount = 0; +          +         +class ExceptionTest : public CppUnit::TestCase  +{ +    CPPUNIT_TEST_SUITE(ExceptionTest); +    CPPUNIT_TEST(testHeapException); +    CPPUNIT_TEST_SUITE_END(); +  public: +    // Verify proper memory management for heap-allocated exceptions. +    void testHeapException() { +        int count = 0; +        try { +            std::auto_ptr<Exception> p( +                new CountDestroyedException(count)); +            p.release()->throwSelf(); +            CPPUNIT_FAIL("Expected CountDestroyedException."); +        } catch (const CountDestroyedException& e) { +            CPPUNIT_ASSERT(&e.count == &count); +        } +        CPPUNIT_ASSERT_EQUAL(1, count); +    } +}; + +     +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ExceptionTest); + diff --git a/cpp/tests/ExchangeTest.cpp b/cpp/tests/ExchangeTest.cpp new file mode 100644 index 0000000000..8fef4ccaac --- /dev/null +++ b/cpp/tests/ExchangeTest.cpp @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <DeliverableMessage.h> +#include <DirectExchange.h> +#include <BrokerExchange.h> +#include <BrokerQueue.h> +#include <TopicExchange.h> +#include <qpid_test_plugin.h> +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::sys; + +class ExchangeTest : public CppUnit::TestCase +{ +    CPPUNIT_TEST_SUITE(ExchangeTest); +    CPPUNIT_TEST(testMe); +    CPPUNIT_TEST_SUITE_END(); + +  public: + +    void testMe()  +    { +        Queue::shared_ptr queue(new Queue("queue", true)); +        Queue::shared_ptr queue2(new Queue("queue2", true)); + +        TopicExchange topic("topic"); +        topic.bind(queue, "abc", 0); +        topic.bind(queue2, "abc", 0); + +        DirectExchange direct("direct"); +        direct.bind(queue, "abc", 0); +        direct.bind(queue2, "abc", 0); + +        queue.reset(); +        queue2.reset(); + +        Message::shared_ptr msgPtr(new Message(0, "e", "A", true, true)); +        DeliverableMessage msg(msgPtr); +        topic.route(msg, "abc", 0); +        direct.route(msg, "abc", 0); + +    } +}; +     +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ExchangeTest); diff --git a/cpp/tests/FieldTableTest.cpp b/cpp/tests/FieldTableTest.cpp new file mode 100644 index 0000000000..8d9285bf4b --- /dev/null +++ b/cpp/tests/FieldTableTest.cpp @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <amqp_framing.h> +#include <qpid_test_plugin.h> + +using namespace qpid::framing; + +class FieldTableTest : public CppUnit::TestCase   +{ +    CPPUNIT_TEST_SUITE(FieldTableTest); +    CPPUNIT_TEST(testMe); +    CPPUNIT_TEST_SUITE_END(); + +  public: + +    void testMe()  +    { +        FieldTable ft; +        ft.setString("A", "BCDE"); +        CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft.getString("A")); + +        Buffer buffer(100); +        buffer.putFieldTable(ft); +        buffer.flip();      +        FieldTable ft2; +        buffer.getFieldTable(ft2); +        CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft2.getString("A")); + +    } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(FieldTableTest); + diff --git a/cpp/tests/FramingTest.cpp b/cpp/tests/FramingTest.cpp new file mode 100644 index 0000000000..edc62b6187 --- /dev/null +++ b/cpp/tests/FramingTest.cpp @@ -0,0 +1,149 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <ConnectionRedirectBody.h> +#include <ProtocolVersion.h> +#include <amqp_framing.h> +#include <iostream> +#include <qpid_test_plugin.h> +#include <sstream> +#include <typeinfo> + +using namespace qpid::framing; + +template <class T> +std::string tostring(const T& x)  +{ +    std::ostringstream out; +    out << x; +    return out.str(); +} + +class FramingTest : public CppUnit::TestCase   +{ +    CPPUNIT_TEST_SUITE(FramingTest); +    CPPUNIT_TEST(testBasicQosBody);  +    CPPUNIT_TEST(testConnectionSecureBody);  +    CPPUNIT_TEST(testConnectionRedirectBody); +    CPPUNIT_TEST(testAccessRequestBody); +    CPPUNIT_TEST(testBasicConsumeBody); +    CPPUNIT_TEST(testConnectionRedirectBodyFrame); +    CPPUNIT_TEST(testBasicConsumeOkBodyFrame); +    CPPUNIT_TEST_SUITE_END(); + +  private: +    Buffer buffer; +  	ProtocolVersion v; + +  public: + +// AMQP version management change - kpvdr 2006-11-17 +// TODO: Make this class version-aware and link these hard-wired numbers to that version +    FramingTest() : buffer(100), v(8, 0) {} + +    void testBasicQosBody()  +    { +        BasicQosBody in(v, 0xCAFEBABE, 0xABBA, true); +        in.encodeContent(buffer); +        buffer.flip();  +        BasicQosBody out(v); +        out.decodeContent(buffer); +        CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); +    } +     +    void testConnectionSecureBody()  +    { +        std::string s = "security credential"; +        ConnectionSecureBody in(v, s); +        in.encodeContent(buffer); +        buffer.flip();  +        ConnectionSecureBody out(v); +        out.decodeContent(buffer); +        CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); +    } + +    void testConnectionRedirectBody() +    { +        std::string a = "hostA"; +        std::string b = "hostB"; +        ConnectionRedirectBody in(v, a, b); +        in.encodeContent(buffer); +        buffer.flip();  +        ConnectionRedirectBody out(v); +        out.decodeContent(buffer); +        CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); +    } + +    void testAccessRequestBody() +    { +        std::string s = "text"; +        AccessRequestBody in(v, s, true, false, true, false, true); +        in.encodeContent(buffer); +        buffer.flip();  +        AccessRequestBody out(v); +        out.decodeContent(buffer); +        CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); +    } + +    void testBasicConsumeBody() +    { +        std::string q = "queue"; +        std::string t = "tag"; +        BasicConsumeBody in(v, 0, q, t, false, true, false, false); +        in.encodeContent(buffer); +        buffer.flip();  +        BasicConsumeBody out(v); +        out.decodeContent(buffer); +        CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); +    } +     + +    void testConnectionRedirectBodyFrame() +    { +        std::string a = "hostA"; +        std::string b = "hostB"; +        AMQFrame in(999, new ConnectionRedirectBody(v, a, b)); +        in.encode(buffer); +        buffer.flip();  +        AMQFrame out; +        out.decode(buffer); +        CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); +    } + +    void testBasicConsumeOkBodyFrame() +    { +        std::string s = "hostA"; +        AMQFrame in(999, new BasicConsumeOkBody(v, s)); +        in.encode(buffer); +        buffer.flip();  +        AMQFrame out; +        for(int i = 0; i < 5; i++){ +            out.decode(buffer); +            CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); +        } +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(FramingTest); + + + diff --git a/cpp/tests/HeaderTest.cpp b/cpp/tests/HeaderTest.cpp new file mode 100644 index 0000000000..01927c7190 --- /dev/null +++ b/cpp/tests/HeaderTest.cpp @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <amqp_framing.h> +#include <qpid_test_plugin.h> + +using namespace qpid::framing; + +class HeaderTest : public CppUnit::TestCase   +{ +    CPPUNIT_TEST_SUITE(HeaderTest); +    CPPUNIT_TEST(testGenericProperties); +    CPPUNIT_TEST(testAllSpecificProperties); +    CPPUNIT_TEST(testSomeSpecificProperties); +    CPPUNIT_TEST_SUITE_END(); + +public: + +    void testGenericProperties()  +    { +        AMQHeaderBody body(BASIC); +        dynamic_cast<BasicHeaderProperties*>(body.getProperties())->getHeaders().setString("A", "BCDE"); +        Buffer buffer(100); + +        body.encode(buffer); +        buffer.flip();      +        AMQHeaderBody body2; +        body2.decode(buffer, body.size()); +        BasicHeaderProperties* props = +            dynamic_cast<BasicHeaderProperties*>(body2.getProperties()); +        CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), +                             props->getHeaders().getString("A")); +    } + +    void testAllSpecificProperties(){ +	string contentType("text/html"); +	string contentEncoding("UTF8"); +	u_int8_t deliveryMode(2); +	u_int8_t priority(3); +	string correlationId("abc"); +	string replyTo("no-address"); +	string expiration("why is this a string?"); +	string messageId("xyz"); +	u_int64_t timestamp(0xabcd); +	string type("eh?"); +	string userId("guest"); +	string appId("just testing"); +	string clusterId("no clustering required"); + +        AMQHeaderBody body(BASIC); +        BasicHeaderProperties* properties =  +            dynamic_cast<BasicHeaderProperties*>(body.getProperties()); +        properties->setContentType(contentType); +        properties->getHeaders().setString("A", "BCDE"); +        properties->setDeliveryMode(deliveryMode); +        properties->setPriority(priority); +        properties->setCorrelationId(correlationId); +        properties->setReplyTo(replyTo); +        properties->setExpiration(expiration); +        properties->setMessageId(messageId); +        properties->setTimestamp(timestamp); +        properties->setType(type); +        properties->setUserId(userId); +        properties->setAppId(appId); +        properties->setClusterId(clusterId); + +        Buffer buffer(10000); +        body.encode(buffer); +        buffer.flip();      +        AMQHeaderBody temp; +        temp.decode(buffer, body.size()); +        properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); + +        CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); +        CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), properties->getHeaders().getString("A")); +        CPPUNIT_ASSERT_EQUAL(deliveryMode, properties->getDeliveryMode()); +        CPPUNIT_ASSERT_EQUAL(priority, properties->getPriority()); +        CPPUNIT_ASSERT_EQUAL(correlationId, properties->getCorrelationId()); +        CPPUNIT_ASSERT_EQUAL(replyTo, properties->getReplyTo()); +        CPPUNIT_ASSERT_EQUAL(expiration, properties->getExpiration()); +        CPPUNIT_ASSERT_EQUAL(messageId, properties->getMessageId()); +        CPPUNIT_ASSERT_EQUAL(timestamp, properties->getTimestamp()); +        CPPUNIT_ASSERT_EQUAL(type, properties->getType()); +        CPPUNIT_ASSERT_EQUAL(userId, properties->getUserId()); +        CPPUNIT_ASSERT_EQUAL(appId, properties->getAppId()); +        CPPUNIT_ASSERT_EQUAL(clusterId, properties->getClusterId()); +    } + +    void testSomeSpecificProperties(){ +        string contentType("application/octet-stream"); +        u_int8_t deliveryMode(5); +        u_int8_t priority(6); +        string expiration("Z"); +        u_int64_t timestamp(0xabe4a34a); + +        AMQHeaderBody body(BASIC); +        BasicHeaderProperties* properties =  +            dynamic_cast<BasicHeaderProperties*>(body.getProperties()); +        properties->setContentType(contentType); +        properties->setDeliveryMode(deliveryMode); +        properties->setPriority(priority); +        properties->setExpiration(expiration); +        properties->setTimestamp(timestamp); + +        Buffer buffer(100); +        body.encode(buffer); +        buffer.flip();      +        AMQHeaderBody temp; +        temp.decode(buffer, body.size()); +        properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); + +        CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); +        CPPUNIT_ASSERT_EQUAL((int) deliveryMode, (int) properties->getDeliveryMode()); +        CPPUNIT_ASSERT_EQUAL((int) priority, (int) properties->getPriority()); +        CPPUNIT_ASSERT_EQUAL(expiration, properties->getExpiration()); +        CPPUNIT_ASSERT_EQUAL(timestamp, properties->getTimestamp()); +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(HeaderTest); + diff --git a/cpp/tests/HeadersExchangeTest.cpp b/cpp/tests/HeadersExchangeTest.cpp new file mode 100644 index 0000000000..6cd51c55a9 --- /dev/null +++ b/cpp/tests/HeadersExchangeTest.cpp @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <HeadersExchange.h> +#include <FieldTable.h> +#include <Value.h> +#include <qpid_test_plugin.h> + +using namespace qpid::broker; +using namespace qpid::framing; + +class HeadersExchangeTest : public CppUnit::TestCase +{ +    CPPUNIT_TEST_SUITE(HeadersExchangeTest); +    CPPUNIT_TEST(testMatchAll); +    CPPUNIT_TEST(testMatchAny); +    CPPUNIT_TEST(testMatchEmptyValue); +    CPPUNIT_TEST(testMatchEmptyArgs); +    CPPUNIT_TEST(testMatchNoXMatch); +    CPPUNIT_TEST_SUITE_END(); + +  public: + +    void testMatchAll()  +    { +        FieldTable b, m; +        b.setString("x-match", "all"); +        b.setString("foo", "FOO"); +        b.setInt("n", 42); +        m.setString("foo", "FOO"); +        m.setInt("n", 42); +        CPPUNIT_ASSERT(HeadersExchange::match(b, m)); + +        // Ignore extras. +        m.setString("extra", "x"); +        CPPUNIT_ASSERT(HeadersExchange::match(b, m)); +         +        // Fail mismatch, wrong value. +        m.setString("foo", "NotFoo"); +        CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); + +        // Fail mismatch, missing value +        m.erase("foo"); +        CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); +    } + +    void testMatchAny()  +    { +        FieldTable b, m; +        b.setString("x-match", "any"); +        b.setString("foo", "FOO"); +        b.setInt("n", 42); +        m.setString("foo", "FOO"); +        CPPUNIT_ASSERT(HeadersExchange::match(b, m)); +        m.erase("foo"); +        CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); +        m.setInt("n", 42); +        CPPUNIT_ASSERT(HeadersExchange::match(b, m)); +    } + +    void testMatchEmptyValue()  +    { +        FieldTable b, m; +        b.setString("x-match", "all"); +        b.getMap()["foo"] = FieldTable::ValuePtr(new EmptyValue()); +        b.getMap()["n"] = FieldTable::ValuePtr(new EmptyValue()); +        CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); +        m.setString("foo", "blah"); +        m.setInt("n", 123); +    } + +    void testMatchEmptyArgs() +    { +        FieldTable b, m; +        m.setString("foo", "FOO"); +         +        b.setString("x-match", "all"); +        CPPUNIT_ASSERT(HeadersExchange::match(b, m)); +        b.setString("x-match", "any"); +        CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); +    } +     + +    void testMatchNoXMatch()  +    { +        FieldTable b, m; +        b.setString("foo", "FOO"); +        m.setString("foo", "FOO"); +        CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); +    } +     +     +}; +     +// make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(HeadersExchangeTest); diff --git a/cpp/tests/InMemoryContentTest.cpp b/cpp/tests/InMemoryContentTest.cpp new file mode 100644 index 0000000000..64789ed836 --- /dev/null +++ b/cpp/tests/InMemoryContentTest.cpp @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <InMemoryContent.h> +#include <qpid_test_plugin.h> +#include <iostream> +#include <list> + +using std::list; +using std::string; +using boost::dynamic_pointer_cast; +using namespace qpid::broker; +using namespace qpid::framing; + +struct DummyHandler : OutputHandler{ +    std::vector<AMQFrame*> frames;  + +    virtual void send(AMQFrame* frame){ +        frames.push_back(frame); +    } +}; + +class InMemoryContentTest : public CppUnit::TestCase   +{ +        CPPUNIT_TEST_SUITE(InMemoryContentTest); +        CPPUNIT_TEST(testRefragmentation); +        CPPUNIT_TEST_SUITE_END(); + +public: +    void testRefragmentation() +    { +        {//no remainder +            string out[] = {"abcde", "fghij", "klmno", "pqrst"}; +            string in[] = {out[0] + out[1], out[2] + out[3]};         +            refragment(2, in, 4, out); +        } +        {//remainder for last frame +            string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvw"}; +            string in[] = {out[0] + out[1], out[2] + out[3] + out[4]};         +            refragment(2, in, 5, out); +        } +    } + + +    void refragment(size_t inCount, string* in, size_t outCount, string* out, u_int32_t framesize = 5) +    { +        InMemoryContent content; +        DummyHandler handler; +        u_int16_t channel = 3; + +        addframes(content, inCount, in); +        content.send(&handler, channel, framesize);          +        check(handler, channel, outCount, out); +    } + +    void addframes(InMemoryContent& content, size_t frameCount, string* frameData) +    { +        for (unsigned int i = 0; i < frameCount; i++) { +            AMQContentBody::shared_ptr frame(new AMQContentBody(frameData[i])); +            content.add(frame); +        } +    } + +    void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks) +    { +        CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size()); + +        for (unsigned int i = 0; i < expectedChunkCount; i++) { +            AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody())); +            CPPUNIT_ASSERT(chunk); +            CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData()); +            CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel()); +        } +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(InMemoryContentTest); + diff --git a/cpp/tests/LazyLoadedContentTest.cpp b/cpp/tests/LazyLoadedContentTest.cpp new file mode 100644 index 0000000000..a8e7d61e0d --- /dev/null +++ b/cpp/tests/LazyLoadedContentTest.cpp @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <LazyLoadedContent.h> +#include <NullMessageStore.h> +#include <qpid_test_plugin.h> +#include <iostream> +#include <list> +#include <sstream> + +using std::list; +using std::string; +using boost::dynamic_pointer_cast; +using namespace qpid::broker; +using namespace qpid::framing; + +struct DummyHandler : OutputHandler{ +    std::vector<AMQFrame*> frames;  + +    virtual void send(AMQFrame* frame){ +        frames.push_back(frame); +    } +}; + + +class LazyLoadedContentTest : public CppUnit::TestCase   +{ +        CPPUNIT_TEST_SUITE(LazyLoadedContentTest); +        CPPUNIT_TEST(testFragmented); +        CPPUNIT_TEST(testWhole); +        CPPUNIT_TEST(testHalved); +        CPPUNIT_TEST_SUITE_END(); + +    class TestMessageStore : public NullMessageStore +    { +        const string content; +         +    public: +        TestMessageStore(const string& _content) : content(_content) {} + +        void loadContent(Message* const, string& data, u_int64_t offset, u_int32_t length) +        { +            if (offset + length <= content.size()) { +                data = content.substr(offset, length); +            } else{ +                std::stringstream error; +                error << "Invalid segment: offset=" << offset << ", length=" << length << ", content_length=" << content.size(); +                throw qpid::Exception(error.str()); +            } +        } +    }; + + +public: +    void testFragmented() +    { +        string data = "abcdefghijklmnopqrstuvwxyz"; +        u_int32_t framesize = 5; +        string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvwxy", "z"}; +        load(data, 6, out, framesize); +    } + +    void testWhole() +    { +        string data = "abcdefghijklmnopqrstuvwxyz"; +        u_int32_t framesize = 50; +        string out[] = {data}; +        load(data, 1, out, framesize); +    } + +    void testHalved() +    { +        string data = "abcdefghijklmnopqrstuvwxyz"; +        u_int32_t framesize = 13; +        string out[] = {"abcdefghijklm", "nopqrstuvwxyz"}; +        load(data, 2, out, framesize); +    } + +    void load(string& in, size_t outCount, string* out, u_int32_t framesize) +    { +        TestMessageStore store(in); +        LazyLoadedContent content(&store, 0, in.size()); +        DummyHandler handler; +        u_int16_t channel = 3; +        content.send(&handler, channel, framesize);          +        check(handler, channel, outCount, out); +    } + +    void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks) +    { +        CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size()); + +        for (unsigned int i = 0; i < expectedChunkCount; i++) { +            AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody())); +            CPPUNIT_ASSERT(chunk); +            CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData()); +            CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel()); +        } +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(LazyLoadedContentTest); + diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am new file mode 100644 index 0000000000..8c7acc067d --- /dev/null +++ b/cpp/tests/Makefile.am @@ -0,0 +1,100 @@ +# TODO aconway 2006-11-30: nasty hack, should be done by automake? +abs_builddir = @abs_builddir@ + +AM_CXXFLAGS = $(WARNING_CFLAGS) +INCLUDES = \ +  -I$(top_srcdir)/gen \ +  -I$(top_srcdir)/lib \ +  -I$(top_srcdir)/lib/client \ +  -I$(top_srcdir)/lib/broker \ +  -I$(top_srcdir)/lib/common \ +  -I$(top_srcdir)/lib/common/sys \ +  -I$(top_srcdir)/lib/common/framing + +# FIXME: -lcppunit must come from autoconf + +# FIXME: have e.g., topicall, run as part of "make check"? +EXTRA_DIST =		\ +  env			\ +  broker		\ +  topicall		\ +  topictest		\ +  qpid_test_plugin.h	\ +  APRBaseTest.cpp + +client_tests =		\ +  client_test		\ +  echo_service		\ +  topic_listener	\ +  topic_publisher + +broker_tests =		\ +  AccumulatedAckTest	\ +  ChannelTest		\ +  ConfigurationTest	\ +  ExchangeTest		\ +  HeadersExchangeTest	\ +  InMemoryContentTest	\ +  LazyLoadedContentTest	\ +  MessageBuilderTest	\ +  MessageTest		\ +  QueueRegistryTest	\ +  QueueTest		\ +  TopicExchangeTest	\ +  TxAckTest		\ +  TxBufferTest		\ +  TxPublishTest		\ +  ValueTest + +framing_tests =		\ +  BodyHandlerTest	\ +  FieldTableTest	\ +  FramingTest		\ +  HeaderTest + +misc_tests =		\ +  ExceptionTest + +posix_tests =		\ +  EventChannelTest	\ +  EventChannelThreadsTest + +unit_tests =		\ +  $(broker_tests)	\ +  $(framing_tests)	\ +  $(misc_tests)		\ +  $(posix_tests) + +noinst_PROGRAMS = $(client_tests) + +check: run-unit-tests + +.PHONY: run-unit-tests +run-unit-tests: $(check_LTLIBRARIES) +	DllPlugInTester -c -b .libs/*.so + +include gen.mk + +extra_libs = -lcppunit +lib_client = ../lib/client/libclient.la +lib_common = ../lib/common/libcommon.la +lib_broker = ../lib/broker/libbroker.la + +gen.mk: Makefile.am +	(					\ +	  for i in $(client_tests); do		\ +	    echo $${i}_SOURCES = $$i.cpp;	\ +	    echo $${i}_LDADD = '$$(lib_client) $$(lib_common) $$(extra_libs)'; \ +	  done;					\ +	  libs=;				\ +	  for i in $(unit_tests); do		\ +	    libs="$$libs $${i}.la";		\ +	    echo $${i}_la_SOURCES = $$i.cpp;	\ +	    echo $${i}_la_LIBADD = '$$(lib_common)';			\ +	    echo $${i}_la_LIBADD += '$$(lib_broker) $$(extra_libs)';	\ +	    echo $${i}_la_LDFLAGS = '-module -rpath $$(abs_builddir)';	\ +	  done;					\ +	  echo "check_LTLIBRARIES =$$libs";	\ +	)					\ +	> $@-t +	mv $@-t $@ diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp new file mode 100644 index 0000000000..d609fb9b75 --- /dev/null +++ b/cpp/tests/MessageBuilderTest.cpp @@ -0,0 +1,203 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <Exception.h> +#include <BrokerMessage.h> +#include <MessageBuilder.h> +#include <NullMessageStore.h> +#include <Buffer.h> +#include <qpid_test_plugin.h> +#include <iostream> +#include <memory> + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +class MessageBuilderTest : public CppUnit::TestCase   +{ +    struct DummyHandler : MessageBuilder::CompletionHandler{ +        Message::shared_ptr msg; +         +        virtual void complete(Message::shared_ptr& _msg){ +            msg = _msg; +        } +    }; + +    class TestMessageStore : public NullMessageStore +    { +        Buffer* header; +        Buffer* content; +        const u_int32_t contentBufferSize; +         +    public: + +        void stage(Message::shared_ptr& msg) +        { +            if (msg->getPersistenceId() == 0) { +                header = new Buffer(msg->encodedHeaderSize()); +                msg->encodeHeader(*header);                 +                content = new Buffer(contentBufferSize); +                msg->setPersistenceId(1); +            } else { +                throw qpid::Exception("Message already staged!"); +            } +        } + +        void appendContent(u_int64_t msgId, const string& data) +        { +            if (msgId == 1) { +                content->putRawData(data); +            } else { +                throw qpid::Exception("Invalid message id!"); +            } +        } + +        Message::shared_ptr getRestoredMessage() +        { +            Message::shared_ptr msg(new Message()); +            if (header) { +                header->flip(); +                msg->decodeHeader(*header); +                delete header; +                header = 0;  +                if (content) { +                    content->flip(); +                    msg->decodeContent(*content); +                    delete content; +                    content = 0; +                } +            } +            return msg; +        } +         +        //dont care about any of the other methods: +        TestMessageStore(u_int32_t _contentBufferSize) : NullMessageStore(false), header(0), content(0),  +                                                         contentBufferSize(_contentBufferSize) {} +        ~TestMessageStore(){} +    }; + +    CPPUNIT_TEST_SUITE(MessageBuilderTest); +    CPPUNIT_TEST(testHeaderOnly); +    CPPUNIT_TEST(test1ContentFrame); +    CPPUNIT_TEST(test2ContentFrames); +    CPPUNIT_TEST(testStaging); +    CPPUNIT_TEST_SUITE_END(); + +  public: + +    void testHeaderOnly(){ +        DummyHandler handler; +        MessageBuilder builder(&handler); + +        Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); +        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); +        header->setContentSize(0); +         +        builder.initialise(message); +        CPPUNIT_ASSERT(!handler.msg); +        builder.setHeader(header); +        CPPUNIT_ASSERT(handler.msg); +        CPPUNIT_ASSERT_EQUAL(message, handler.msg); +    } + +    void test1ContentFrame(){ +        DummyHandler handler; +        MessageBuilder builder(&handler); + +        string data1("abcdefg"); + +        Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); +        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); +        header->setContentSize(7); +        AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); +         +        builder.initialise(message); +        CPPUNIT_ASSERT(!handler.msg); +        builder.setHeader(header); +        CPPUNIT_ASSERT(!handler.msg); +        builder.addContent(part1); +        CPPUNIT_ASSERT(handler.msg); +        CPPUNIT_ASSERT_EQUAL(message, handler.msg); +    } + +    void test2ContentFrames(){ +        DummyHandler handler; +        MessageBuilder builder(&handler); + +        string data1("abcdefg"); +        string data2("hijklmn"); + +        Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); +        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); +        header->setContentSize(14); +        AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); +        AMQContentBody::shared_ptr part2(new AMQContentBody(data2));         +         +        builder.initialise(message); +        CPPUNIT_ASSERT(!handler.msg); +        builder.setHeader(header); +        CPPUNIT_ASSERT(!handler.msg); +        builder.addContent(part1); +        CPPUNIT_ASSERT(!handler.msg); +        builder.addContent(part2); +        CPPUNIT_ASSERT(handler.msg); +        CPPUNIT_ASSERT_EQUAL(message, handler.msg); +    } + +    void testStaging(){ +        DummyHandler handler; +        TestMessageStore store(14); +        MessageBuilder builder(&handler, &store, 5); + +        string data1("abcdefg"); +        string data2("hijklmn"); + +        Message::shared_ptr message(new Message(0, "test", "my_routing_key", false, false)); +        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); +        header->setContentSize(14); +        BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(header->getProperties()); +        properties->setMessageId("MyMessage"); +        properties->getHeaders().setString("abc", "xyz"); + +        AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); +        AMQContentBody::shared_ptr part2(new AMQContentBody(data2));         +         +        builder.initialise(message); +        builder.setHeader(header); +        builder.addContent(part1); +        builder.addContent(part2); +        CPPUNIT_ASSERT(handler.msg); +        CPPUNIT_ASSERT_EQUAL(message, handler.msg); + +        Message::shared_ptr restored = store.getRestoredMessage(); +        CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange()); +        CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), restored->getRoutingKey()); +        CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), restored->getHeaderProperties()->getMessageId()); +        CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"),  +                             restored->getHeaderProperties()->getHeaders().getString("abc")); +        CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, restored->contentSize()); +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(MessageBuilderTest); diff --git a/cpp/tests/MessageTest.cpp b/cpp/tests/MessageTest.cpp new file mode 100644 index 0000000000..3b3265476c --- /dev/null +++ b/cpp/tests/MessageTest.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <BrokerMessage.h> +#include <qpid_test_plugin.h> +#include <iostream> + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; + +struct DummyHandler : OutputHandler{ +    std::vector<AMQFrame*> frames;  + +    virtual void send(AMQFrame* frame){ +        frames.push_back(frame); +    } +}; + +class MessageTest : public CppUnit::TestCase   +{ +    CPPUNIT_TEST_SUITE(MessageTest); +    CPPUNIT_TEST(testEncodeDecode); +    CPPUNIT_TEST_SUITE_END(); + +  public: + +    void testEncodeDecode() +    { +        string exchange = "MyExchange"; +        string routingKey = "MyRoutingKey"; +        string messageId = "MyMessage"; +        string data1("abcdefg"); +        string data2("hijklmn"); + +        Message::shared_ptr msg = Message::shared_ptr(new Message(0, exchange, routingKey, false, false)); +        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); +        header->setContentSize(14);         +        AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); +        AMQContentBody::shared_ptr part2(new AMQContentBody(data2));         +        msg->setHeader(header); +        msg->addContent(part1); +        msg->addContent(part2); + +        msg->getHeaderProperties()->setMessageId(messageId); +        msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); +        msg->getHeaderProperties()->getHeaders().setString("abc", "xyz"); + +        Buffer buffer(msg->encodedSize()); +        msg->encode(buffer); +        buffer.flip(); +         +        msg = Message::shared_ptr(new Message(buffer)); +        CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange()); +        CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey()); +        CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId()); +        CPPUNIT_ASSERT_EQUAL((u_int8_t) PERSISTENT, msg->getHeaderProperties()->getDeliveryMode()); +        CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc")); +        CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize()); + +        DummyHandler handler; +        msg->deliver(&handler, 0, "ignore", 0, 100);  +        CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); +        AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); +        CPPUNIT_ASSERT(contentBody); +        CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData()); +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(MessageTest); + diff --git a/cpp/tests/QueueRegistryTest.cpp b/cpp/tests/QueueRegistryTest.cpp new file mode 100644 index 0000000000..3926d56292 --- /dev/null +++ b/cpp/tests/QueueRegistryTest.cpp @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <QueueRegistry.h> +#include <qpid_test_plugin.h> +#include <string> + +using namespace qpid::broker; + +class QueueRegistryTest : public CppUnit::TestCase  +{ +    CPPUNIT_TEST_SUITE(QueueRegistryTest); +    CPPUNIT_TEST(testDeclare); +    CPPUNIT_TEST(testDeclareTmp); +    CPPUNIT_TEST(testFind); +    CPPUNIT_TEST(testDestroy); +    CPPUNIT_TEST_SUITE_END(); + +  private: +    std::string foo, bar; +    QueueRegistry reg; +    std::pair<Queue::shared_ptr,  bool> qc; +     +  public: +    void setUp() { +        foo = "foo"; +        bar = "bar"; +    } +     +    void testDeclare() { +        qc = reg.declare(foo, false, 0, 0); +        Queue::shared_ptr q = qc.first; +        CPPUNIT_ASSERT(q); +        CPPUNIT_ASSERT(qc.second); // New queue +        CPPUNIT_ASSERT_EQUAL(foo, q->getName()); + +        qc = reg.declare(foo, false, 0, 0); +        CPPUNIT_ASSERT_EQUAL(q, qc.first); +        CPPUNIT_ASSERT(!qc.second); + +        qc = reg.declare(bar, false, 0, 0); +        q = qc.first; +        CPPUNIT_ASSERT(q); +        CPPUNIT_ASSERT_EQUAL(true, qc.second); +        CPPUNIT_ASSERT_EQUAL(bar, q->getName()); +    } + +    void testDeclareTmp()  +    { +        qc = reg.declare(std::string(), false, 0, 0); +        CPPUNIT_ASSERT(qc.second); +        CPPUNIT_ASSERT_EQUAL(std::string("tmp_1"), qc.first->getName()); +    } +     +    void testFind() { +        CPPUNIT_ASSERT(reg.find(foo) == 0); + +        reg.declare(foo, false, 0, 0); +        reg.declare(bar, false, 0, 0); +        Queue::shared_ptr q = reg.find(bar); +        CPPUNIT_ASSERT(q); +        CPPUNIT_ASSERT_EQUAL(bar, q->getName()); +    } + +    void testDestroy() { +        qc = reg.declare(foo, false, 0, 0); +        reg.destroy(foo); +        // Queue is gone from the registry. +        CPPUNIT_ASSERT(reg.find(foo) == 0); +        // Queue is not actually destroyed till we drop our reference. +        CPPUNIT_ASSERT_EQUAL(foo, qc.first->getName()); +        // We shoud be the only reference. +        CPPUNIT_ASSERT_EQUAL(1L, qc.first.use_count()); +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(QueueRegistryTest); diff --git a/cpp/tests/QueueTest.cpp b/cpp/tests/QueueTest.cpp new file mode 100644 index 0000000000..9d655781c1 --- /dev/null +++ b/cpp/tests/QueueTest.cpp @@ -0,0 +1,179 @@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <BrokerQueue.h> +#include <QueueRegistry.h> +#include <qpid_test_plugin.h> +#include <iostream> + +using namespace qpid::broker; +using namespace qpid::sys; + + +class TestBinding : public virtual Binding{ +    bool cancelled; + +public: +    TestBinding(); +    virtual void cancel(); +    bool isCancelled(); +}; + +class TestConsumer : public virtual Consumer{ +public: +    Message::shared_ptr last; + +    virtual bool deliver(Message::shared_ptr& msg); +}; + + +class QueueTest : public CppUnit::TestCase   +{ +    CPPUNIT_TEST_SUITE(QueueTest); +    CPPUNIT_TEST(testConsumers); +    CPPUNIT_TEST(testBinding); +    CPPUNIT_TEST(testRegistry); +    CPPUNIT_TEST(testDequeue); +    CPPUNIT_TEST_SUITE_END(); + +  public: +    void testConsumers(){ +        Queue::shared_ptr queue(new Queue("my_queue", true)); +     +        //Test adding consumers: +        TestConsumer c1;  +        TestConsumer c2;  +        queue->consume(&c1); +        queue->consume(&c2); + +        CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount()); +         +        //Test basic delivery: +        Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); +        Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); +        Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); + +        queue->deliver(msg1); +        CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); + +        queue->deliver(msg2); +        CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get()); +         +        queue->deliver(msg3); +        CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());         +     +        //Test cancellation: +        queue->cancel(&c1); +        CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount()); +        queue->cancel(&c2); +        CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount()); +    } + +    void testBinding(){ +        Queue::shared_ptr queue(new Queue("my_queue", true)); +        //Test bindings: +        TestBinding a; +        TestBinding b; +        queue->bound(&a); +        queue->bound(&b);     +     +        queue.reset(); + +        CPPUNIT_ASSERT(a.isCancelled()); +        CPPUNIT_ASSERT(b.isCancelled()); +    } + +    void testRegistry(){ +        //Test use of queues in registry: +        QueueRegistry registry; +        registry.declare("queue1", true, true); +        registry.declare("queue2", true, true); +        registry.declare("queue3", true, true); + +        CPPUNIT_ASSERT(registry.find("queue1")); +        CPPUNIT_ASSERT(registry.find("queue2")); +        CPPUNIT_ASSERT(registry.find("queue3")); +         +        registry.destroy("queue1"); +        registry.destroy("queue2"); +        registry.destroy("queue3"); + +        CPPUNIT_ASSERT(!registry.find("queue1")); +        CPPUNIT_ASSERT(!registry.find("queue2")); +        CPPUNIT_ASSERT(!registry.find("queue3")); +    } + +    void testDequeue(){ +        Queue::shared_ptr queue(new Queue("my_queue", true)); + +        Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); +        Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); +        Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); +        Message::shared_ptr received; + +        queue->deliver(msg1); +        queue->deliver(msg2); +        queue->deliver(msg3); + +        CPPUNIT_ASSERT_EQUAL(u_int32_t(3), queue->getMessageCount()); +         +        received = queue->dequeue(); +        CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get()); +        CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getMessageCount()); + +        received = queue->dequeue(); +        CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get()); +        CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getMessageCount()); + +        TestConsumer consumer;  +        queue->consume(&consumer); +        queue->dispatch(); +        CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get()); +        CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount()); + +        received = queue->dequeue(); +        CPPUNIT_ASSERT(!received); +        CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount()); +         +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest); + +//TestBinding +TestBinding::TestBinding() : cancelled(false) {} + +void TestBinding::cancel(){ +    CPPUNIT_ASSERT(!cancelled); +    cancelled = true; +} + +bool TestBinding::isCancelled(){ +    return cancelled; +} + +//TestConsumer +bool TestConsumer::deliver(Message::shared_ptr& msg){ +    last = msg; +    return true; +} + diff --git a/cpp/tests/TopicExchangeTest.cpp b/cpp/tests/TopicExchangeTest.cpp new file mode 100644 index 0000000000..4ba9cdd6e5 --- /dev/null +++ b/cpp/tests/TopicExchangeTest.cpp @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include <TopicExchange.h> +#include <qpid_test_plugin.h> + +using namespace qpid::broker; + +Tokens makeTokens(char** begin, char** end) +{ +    Tokens t; +    t.insert(t.end(), begin, end); +    return t; +} + +// Calculate size of an array.  +#define LEN(a) (sizeof(a)/sizeof(a[0])) + +// Convert array to token vector +#define TOKENS(a) makeTokens(a, a + LEN(a)) + +// Allow CPPUNIT_EQUALS to print a Tokens. +CppUnit::OStringStream& operator <<(CppUnit::OStringStream& out, const Tokens& v) +{ +    out << "[ "; +    for (Tokens::const_iterator i = v.begin(); +         i != v.end(); ++i) +    { +        out << '"' << *i << '"' << (i+1 == v.end() ? "]" : ", "); +    } +    return out; +} + + +class TokensTest : public CppUnit::TestCase +{ +    CPPUNIT_TEST_SUITE(TokensTest); +    CPPUNIT_TEST(testTokens); +    CPPUNIT_TEST_SUITE_END(); + +  public: +    void testTokens()  +    { +        Tokens tokens("hello.world"); +        char* expect[] = {"hello", "world"}; +        CPPUNIT_ASSERT_EQUAL(TOKENS(expect), tokens); +         +        tokens = "a.b.c"; +        char* expect2[] = { "a", "b", "c" }; +        CPPUNIT_ASSERT_EQUAL(TOKENS(expect2), tokens); + +        tokens = ""; +        CPPUNIT_ASSERT(tokens.empty()); + +        tokens = "x"; +        char* expect3[] = { "x" }; +        CPPUNIT_ASSERT_EQUAL(TOKENS(expect3), tokens); + +        tokens = (".x"); +        char* expect4[] = { "", "x" }; +        CPPUNIT_ASSERT_EQUAL(TOKENS(expect4), tokens); + +        tokens = ("x."); +        char* expect5[] = { "x", "" }; +        CPPUNIT_ASSERT_EQUAL(TOKENS(expect5), tokens); + +        tokens = ("."); +        char* expect6[] = { "", "" }; +        CPPUNIT_ASSERT_EQUAL(TOKENS(expect6), tokens);         + +        tokens = (".."); +        char* expect7[] = { "", "", "" }; +        CPPUNIT_ASSERT_EQUAL(TOKENS(expect7), tokens);         +    } +     +}; + +#define ASSERT_NORMALIZED(expect, pattern) \ +    CPPUNIT_ASSERT_EQUAL(Tokens(expect), static_cast<Tokens>(TopicPattern(pattern))) +class TopicPatternTest : public CppUnit::TestCase  +{ +    CPPUNIT_TEST_SUITE(TopicPatternTest); +    CPPUNIT_TEST(testNormalize); +    CPPUNIT_TEST(testPlain); +    CPPUNIT_TEST(testStar); +    CPPUNIT_TEST(testHash); +    CPPUNIT_TEST(testMixed); +    CPPUNIT_TEST(testCombo); +    CPPUNIT_TEST_SUITE_END(); + +  public: + +    void testNormalize()  +    { +        CPPUNIT_ASSERT(TopicPattern("").empty()); +        ASSERT_NORMALIZED("a.b.c", "a.b.c"); +        ASSERT_NORMALIZED("a.*.c", "a.*.c"); +        ASSERT_NORMALIZED("#", "#"); +        ASSERT_NORMALIZED("#", "#.#.#.#"); +        ASSERT_NORMALIZED("*.*.*.#", "#.*.#.*.#.#.*"); +        ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*.#"); +        ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*"); +    } +     +    void testPlain() { +        TopicPattern p("ab.cd.e"); +        CPPUNIT_ASSERT(p.match("ab.cd.e")); +        CPPUNIT_ASSERT(!p.match("abx.cd.e")); +        CPPUNIT_ASSERT(!p.match("ab.cd")); +        CPPUNIT_ASSERT(!p.match("ab.cd..e.")); +        CPPUNIT_ASSERT(!p.match("ab.cd.e.")); +        CPPUNIT_ASSERT(!p.match(".ab.cd.e")); + +        p = ""; +        CPPUNIT_ASSERT(p.match("")); + +        p = "."; +        CPPUNIT_ASSERT(p.match(".")); +    } + + +    void testStar()  +    { +        TopicPattern p("a.*.b"); +        CPPUNIT_ASSERT(p.match("a.xx.b")); +        CPPUNIT_ASSERT(!p.match("a.b")); + +        p = "*.x"; +        CPPUNIT_ASSERT(p.match("y.x")); +        CPPUNIT_ASSERT(p.match(".x")); +        CPPUNIT_ASSERT(!p.match("x")); + +        p = "x.x.*"; +        CPPUNIT_ASSERT(p.match("x.x.y")); +        CPPUNIT_ASSERT(p.match("x.x.")); +        CPPUNIT_ASSERT(!p.match("x.x")); +        CPPUNIT_ASSERT(!p.match("q.x.y")); +    } + +    void testHash()  +    { +        TopicPattern p("a.#.b"); +        CPPUNIT_ASSERT(p.match("a.b")); +        CPPUNIT_ASSERT(p.match("a.x.b")); +        CPPUNIT_ASSERT(p.match("a..x.y.zz.b")); +        CPPUNIT_ASSERT(!p.match("a.b.")); +        CPPUNIT_ASSERT(!p.match("q.x.b")); + +        p = "a.#"; +        CPPUNIT_ASSERT(p.match("a")); +        CPPUNIT_ASSERT(p.match("a.b")); +        CPPUNIT_ASSERT(p.match("a.b.c")); + +        p = "#.a"; +        CPPUNIT_ASSERT(p.match("a")); +        CPPUNIT_ASSERT(p.match("x.y.a")); +    } + +    void testMixed()  +    { +        TopicPattern p("*.x.#.y"); +        CPPUNIT_ASSERT(p.match("a.x.y")); +        CPPUNIT_ASSERT(p.match("a.x.p.qq.y")); +        CPPUNIT_ASSERT(!p.match("a.a.x.y")); +        CPPUNIT_ASSERT(!p.match("aa.x.b.c")); + +        p = "a.#.b.*"; +        CPPUNIT_ASSERT(p.match("a.b.x")); +        CPPUNIT_ASSERT(p.match("a.x.x.x.b.x")); +    } + +    void testCombo() { +        TopicPattern p("*.#.#.*.*.#"); +        CPPUNIT_ASSERT(p.match("x.y.z")); +        CPPUNIT_ASSERT(p.match("x.y.z.a.b.c")); +        CPPUNIT_ASSERT(!p.match("x.y")); +        CPPUNIT_ASSERT(!p.match("x")); +    } +}; + +     +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TopicPatternTest); +CPPUNIT_TEST_SUITE_REGISTRATION(TokensTest); diff --git a/cpp/tests/TxAckTest.cpp b/cpp/tests/TxAckTest.cpp new file mode 100644 index 0000000000..709d45c1ad --- /dev/null +++ b/cpp/tests/TxAckTest.cpp @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <NullMessageStore.h> +#include <RecoveryManager.h> +#include <TxAck.h> +#include <qpid_test_plugin.h> +#include <iostream> +#include <list> +#include <vector> + +using std::list; +using std::vector; +using namespace qpid::broker; +using namespace qpid::framing; + +class TxAckTest : public CppUnit::TestCase   +{ + +    class TestMessageStore : public NullMessageStore +    { +    public: +        vector<Message::shared_ptr> dequeued; + +        void dequeue(TransactionContext*, Message::shared_ptr& msg, const Queue& /*queue*/, const string * const /*xid*/) +        { +            dequeued.push_back(msg); +        } + +        TestMessageStore() : NullMessageStore(false) {} +        ~TestMessageStore(){} +    }; + +    CPPUNIT_TEST_SUITE(TxAckTest); +    CPPUNIT_TEST(testPrepare); +    CPPUNIT_TEST(testCommit); +    CPPUNIT_TEST_SUITE_END(); + + +    AccumulatedAck acked; +    TestMessageStore store; +    Queue::shared_ptr queue; +    vector<Message::shared_ptr> messages; +    list<DeliveryRecord> deliveries; +    TxAck op; + + +public: + +    TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) +    { +        for(int i = 0; i < 10; i++){ +            Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false)); +            msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); +            msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); +            messages.push_back(msg); +            deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1))); +        } + +        //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not) +        acked.range = 5; +        acked.individual.push_back(7); +        acked.individual.push_back(9); +    }       + +    void testPrepare() +    { +        //ensure acked messages are discarded, i.e. dequeued from store +        op.prepare(0); +        CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size()); +        CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size()); +        CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1 +        CPPUNIT_ASSERT_EQUAL(messages[1], store.dequeued[1]);//msg 2 +        CPPUNIT_ASSERT_EQUAL(messages[2], store.dequeued[2]);//msg 3 +        CPPUNIT_ASSERT_EQUAL(messages[3], store.dequeued[3]);//msg 4 +        CPPUNIT_ASSERT_EQUAL(messages[4], store.dequeued[4]);//msg 5 +        CPPUNIT_ASSERT_EQUAL(messages[6], store.dequeued[5]);//msg 7 +        CPPUNIT_ASSERT_EQUAL(messages[8], store.dequeued[6]);//msg 9 +    } + +    void testCommit() +    { +        //emsure acked messages are removed from list +        op.commit(); +        CPPUNIT_ASSERT_EQUAL((size_t) 3, deliveries.size()); +        list<DeliveryRecord>::iterator i = deliveries.begin(); +        CPPUNIT_ASSERT(i->matches(6));//msg 6 +        CPPUNIT_ASSERT((++i)->matches(8));//msg 8 +        CPPUNIT_ASSERT((++i)->matches(10));//msg 10 +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TxAckTest); + diff --git a/cpp/tests/TxBufferTest.cpp b/cpp/tests/TxBufferTest.cpp new file mode 100644 index 0000000000..0573ad15b0 --- /dev/null +++ b/cpp/tests/TxBufferTest.cpp @@ -0,0 +1,266 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <TxBuffer.h> +#include <qpid_test_plugin.h> +#include <iostream> +#include <vector> + +using namespace qpid::broker; + +template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){ +    unsigned int i = 0; +    while(i < expected.size() && i < actual.size()){ +        CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]); +        i++; +    } +    CPPUNIT_ASSERT(i == expected.size()); +    CPPUNIT_ASSERT(i == actual.size()); +} + +class TxBufferTest : public CppUnit::TestCase   +{ +    class MockTxOp : public TxOp{ +        enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8}; +        std::vector<int> expected; +        std::vector<int> actual; +        bool failOnPrepare; +    public: +        MockTxOp() : failOnPrepare(false) {} +        MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {} + +        bool prepare(TransactionContext*) throw(){ +            actual.push_back(PREPARE); +            return !failOnPrepare; +        } +        void commit()  throw(){ +            actual.push_back(COMMIT); +        } +        void rollback()  throw(){ +            actual.push_back(ROLLBACK); +        } +        MockTxOp& expectPrepare(){ +            expected.push_back(PREPARE); +            return *this; +        } +        MockTxOp& expectCommit(){ +            expected.push_back(COMMIT); +            return *this; +        } +        MockTxOp& expectRollback(){ +            expected.push_back(ROLLBACK); +            return *this; +        } +        void check(){ +            assertEqualVector(expected, actual); +        } +        ~MockTxOp(){}         +    }; + +    class MockTransactionalStore : public TransactionalStore{ +        enum op_codes {BEGIN=2, COMMIT=4, ABORT=8}; +        std::vector<int> expected; +        std::vector<int> actual; + +        enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3}; +        int state; + +        class TestTransactionContext : public TransactionContext{ +            MockTransactionalStore* store; +        public: +            TestTransactionContext(MockTransactionalStore* _store) : store(_store) {} +            void commit(){ +                if(store->state != OPEN) throw "txn already completed"; +                store->state = COMMITTED; +            } + +            void abort(){ +                if(store->state != OPEN) throw "txn already completed"; +                store->state = ABORTED; +            } +            ~TestTransactionContext(){} +        }; + + +    public: +        MockTransactionalStore() : state(OPEN){} + +        std::auto_ptr<TransactionContext> begin(){  +            actual.push_back(BEGIN); +            std::auto_ptr<TransactionContext> txn(new TestTransactionContext(this)); +            return txn; +        } +        void commit(TransactionContext* ctxt){ +            actual.push_back(COMMIT); +            TestTransactionContext* txn(dynamic_cast<TestTransactionContext*>(ctxt)); +            CPPUNIT_ASSERT(txn); +            txn->commit(); +        } +        void abort(TransactionContext* ctxt){ +            actual.push_back(ABORT); +            TestTransactionContext* txn(dynamic_cast<TestTransactionContext*>(ctxt)); +            CPPUNIT_ASSERT(txn); +            txn->abort(); +        }         +        MockTransactionalStore& expectBegin(){ +            expected.push_back(BEGIN); +            return *this; +        } +        MockTransactionalStore& expectCommit(){ +            expected.push_back(COMMIT); +            return *this; +        } +        MockTransactionalStore& expectAbort(){ +            expected.push_back(ABORT); +            return *this; +        } +        void check(){ +            assertEqualVector(expected, actual); +        } + +        bool isCommitted(){ +            return state == COMMITTED; +        } +         +        bool isAborted(){ +            return state == ABORTED; +        } +         +        bool isOpen(){ +            return state == OPEN; +        } +        ~MockTransactionalStore(){} +    }; + +    CPPUNIT_TEST_SUITE(TxBufferTest); +    CPPUNIT_TEST(testPrepareAndCommit); +    CPPUNIT_TEST(testFailOnPrepare); +    CPPUNIT_TEST(testRollback); +    CPPUNIT_TEST(testBufferIsClearedAfterRollback); +    CPPUNIT_TEST(testBufferIsClearedAfterCommit); +    CPPUNIT_TEST_SUITE_END(); + +  public: + +    void testPrepareAndCommit(){ +        MockTransactionalStore store; +        store.expectBegin().expectCommit(); + +        MockTxOp opA; +        opA.expectPrepare().expectCommit(); +        MockTxOp opB; +        opB.expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test reative order +        MockTxOp opC; +        opC.expectPrepare().expectCommit(); + +        TxBuffer buffer; +        buffer.enlist(&opA); +        buffer.enlist(&opB); +        buffer.enlist(&opB);//opB enlisted twice +        buffer.enlist(&opC); + +        CPPUNIT_ASSERT(buffer.prepare(&store)); +        buffer.commit(); +        store.check(); +        CPPUNIT_ASSERT(store.isCommitted()); +        opA.check(); +        opB.check(); +        opC.check(); +    } + +    void testFailOnPrepare(){ +        MockTransactionalStore store; +        store.expectBegin().expectAbort(); + +        MockTxOp opA; +        opA.expectPrepare(); +        MockTxOp opB(true); +        opB.expectPrepare(); +        MockTxOp opC;//will never get prepare as b will fail + +        TxBuffer buffer; +        buffer.enlist(&opA); +        buffer.enlist(&opB); +        buffer.enlist(&opC); + +        CPPUNIT_ASSERT(!buffer.prepare(&store)); +        store.check(); +        CPPUNIT_ASSERT(store.isAborted()); +        opA.check(); +        opB.check(); +        opC.check(); +    } + +    void testRollback(){ +        MockTxOp opA; +        opA.expectRollback(); +        MockTxOp opB(true); +        opB.expectRollback(); +        MockTxOp opC; +        opC.expectRollback(); + +        TxBuffer buffer; +        buffer.enlist(&opA); +        buffer.enlist(&opB); +        buffer.enlist(&opC); + +        buffer.rollback(); +        opA.check(); +        opB.check(); +        opC.check(); +    } + +    void testBufferIsClearedAfterRollback(){ +        MockTxOp opA; +        opA.expectRollback(); +        MockTxOp opB; +        opB.expectRollback(); + +        TxBuffer buffer; +        buffer.enlist(&opA); +        buffer.enlist(&opB); + +        buffer.rollback(); +        buffer.commit();//second call should not reach ops +        opA.check(); +        opB.check(); +    } + +    void testBufferIsClearedAfterCommit(){ +        MockTxOp opA; +        opA.expectCommit(); +        MockTxOp opB; +        opB.expectCommit(); + +        TxBuffer buffer; +        buffer.enlist(&opA); +        buffer.enlist(&opB); + +        buffer.commit(); +        buffer.rollback();//second call should not reach ops +        opA.check(); +        opB.check(); +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TxBufferTest); + diff --git a/cpp/tests/TxPublishTest.cpp b/cpp/tests/TxPublishTest.cpp new file mode 100644 index 0000000000..6324e5fb01 --- /dev/null +++ b/cpp/tests/TxPublishTest.cpp @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <NullMessageStore.h> +#include <RecoveryManager.h> +#include <TxPublish.h> +#include <qpid_test_plugin.h> +#include <iostream> +#include <list> +#include <vector> + +using std::list; +using std::pair; +using std::vector; +using namespace qpid::broker; +using namespace qpid::framing; + +class TxPublishTest : public CppUnit::TestCase   +{ + +    class TestMessageStore : public NullMessageStore +    { +    public: +        vector< pair<string, Message::shared_ptr> > enqueued; +         +        void enqueue(TransactionContext*, Message::shared_ptr& msg, const Queue& queue, const string * const /*xid*/) +        { +            enqueued.push_back(pair<string, Message::shared_ptr>(queue.getName(),msg)); +        } +         +        //dont care about any of the other methods: +        TestMessageStore() : NullMessageStore(false) {} +        ~TestMessageStore(){} +    }; +     +    CPPUNIT_TEST_SUITE(TxPublishTest); +    CPPUNIT_TEST(testPrepare); +    CPPUNIT_TEST(testCommit); +    CPPUNIT_TEST_SUITE_END(); +     +     +    TestMessageStore store; +    Queue::shared_ptr queue1; +    Queue::shared_ptr queue2; +    Message::shared_ptr msg; +    TxPublish op; +     +     +public: +     +    TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)),  +                      queue2(new Queue("queue2", false, &store, 0)),  +                      msg(new Message(0, "exchange", "routing_key", false, false)), +                      op(msg) +    { +        msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); +        msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); +        op.deliverTo(queue1); +        op.deliverTo(queue2); +    }       + +    void testPrepare() +    { +        //ensure messages are enqueued in store +        op.prepare(0); +        CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size()); +        CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first); +        CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second); +        CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first); +        CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second); +    } + +    void testCommit() +    { +        //ensure messages are delivered to queue +        op.commit(); +        CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue1->getMessageCount()); +        CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue()); + +        CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue2->getMessageCount()); +        CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());             +    } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TxPublishTest); + diff --git a/cpp/tests/ValueTest.cpp b/cpp/tests/ValueTest.cpp new file mode 100644 index 0000000000..a3f9ec2541 --- /dev/null +++ b/cpp/tests/ValueTest.cpp @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include <Value.h> +#include <qpid_test_plugin.h> + +using namespace qpid::framing; + + +class ValueTest : public CppUnit::TestCase  +{ +    CPPUNIT_TEST_SUITE(ValueTest); +    CPPUNIT_TEST(testStringValueEquals); +    CPPUNIT_TEST(testIntegerValueEquals); +    CPPUNIT_TEST(testDecimalValueEquals); +    CPPUNIT_TEST(testFieldTableValueEquals); +    CPPUNIT_TEST_SUITE_END(); + +    StringValue s; +    IntegerValue i; +    DecimalValue d; +    FieldTableValue ft; +    EmptyValue e; + +  public: +    ValueTest() : +        s("abc"), +        i(42), +        d(1234,2) +         +    { +        ft.getValue().setString("foo", "FOO"); +        ft.getValue().setInt("magic", 7); +    } +     +    void testStringValueEquals()  +    { +         +        CPPUNIT_ASSERT(StringValue("abc") == s); +        CPPUNIT_ASSERT(s != StringValue("foo")); +        CPPUNIT_ASSERT(s != e); +        CPPUNIT_ASSERT(e != d); +        CPPUNIT_ASSERT(e != ft); +    } + +    void testIntegerValueEquals() +    { +        CPPUNIT_ASSERT(IntegerValue(42) == i); +        CPPUNIT_ASSERT(IntegerValue(5) != i); +        CPPUNIT_ASSERT(i != e); +        CPPUNIT_ASSERT(i != d); +    } + +    void testDecimalValueEquals()  +    { +        CPPUNIT_ASSERT(DecimalValue(1234, 2) == d); +        CPPUNIT_ASSERT(DecimalValue(12345, 2) != d); +        CPPUNIT_ASSERT(DecimalValue(1234, 3) != d); +        CPPUNIT_ASSERT(d != s); +    } + + +    void testFieldTableValueEquals() +    { +        CPPUNIT_ASSERT_EQUAL(std::string("FOO"), +                             ft.getValue().getString("foo")); +        CPPUNIT_ASSERT_EQUAL(7, ft.getValue().getInt("magic")); +         +        FieldTableValue f2; +        CPPUNIT_ASSERT(ft != f2); +        f2.getValue().setString("foo", "FOO"); +        CPPUNIT_ASSERT(ft != f2); +        f2.getValue().setInt("magic", 7); +        CPPUNIT_ASSERT_EQUAL(ft,f2); +        CPPUNIT_ASSERT(ft == f2); +        f2.getValue().setString("foo", "BAR"); +        CPPUNIT_ASSERT(ft != f2); +        CPPUNIT_ASSERT(ft != i); +    } +     +}; + +     +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ValueTest); + diff --git a/cpp/tests/broker b/cpp/tests/broker new file mode 100755 index 0000000000..c49e967466 --- /dev/null +++ b/cpp/tests/broker @@ -0,0 +1,45 @@ +#!/bin/sh +. `dirname $0`/env + +brokerpid() { +    netstat -tpl 2> /dev/null | awk '/amqp/ {print gensub("/.*$","","g",$7) }' +} + +killbroker () { +    PID=`brokerpid` +    if [ -n "$PID" ] ; then kill $PID ; fi +    for ((i=5;i--;)) { +	if [ -z "`brokerpid`" ] ; then exit 0 ; fi +	sleep 1 +    } +    echo "Broker `brokerpid` refuses to die." +} + +waitbroker () { +    while [ -z `brokerpid` ] ; do sleep 1 ; done +} + +startbroker() { +    case $1 in +	j)    +	    export AMQJ_LOGGING_LEVEL=fatal +	    export JDPA_OPTS= +	    export QPID_OPTS=-Xmx1024M +	    export debug=1 +	    CMD="qpid-server" +	    qpid-run -run:print-command # Show the command line. +	    ;; +	c)  CMD=qpidd ;; +    esac +    nohup $CMD  > /dev/null 2>&1 & +    waitbroker +    echo Broker started: $CMD +} + + +case $1 in +    j|c) startbroker $1 ;; +    stop|kill) killbroker ;; +    wait) waitbroker ;; +    pid) brokerpid ;; +esac diff --git a/cpp/tests/client_test.cpp b/cpp/tests/client_test.cpp new file mode 100644 index 0000000000..5ee53e3fa8 --- /dev/null +++ b/cpp/tests/client_test.cpp @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> + +#include <QpidError.h> +#include <ClientChannel.h> +#include <Connection.h> +#include <ClientMessage.h> +#include <MessageListener.h> +#include <sys/Monitor.h> +#include <FieldTable.h> + +using namespace qpid::client; +using namespace qpid::sys; +using std::string; + +class SimpleListener : public virtual MessageListener{ +    Monitor* monitor; + +public: +    inline SimpleListener(Monitor* _monitor) : monitor(_monitor){} + +    inline virtual void received(Message& /*msg*/){ +	std::cout << "Received message " /**<< msg **/<< std::endl; +	monitor->notify(); +    } +}; + +int main(int argc, char**) +{ +    try{                +	Connection con(argc > 1); +	Channel channel; +	Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE); +	Queue queue("MyQueue", true); +	 +	string host("localhost"); +	 +	con.open(host); +	std::cout << "Opened connection." << std::endl; +	con.openChannel(&channel); +	std::cout << "Opened channel." << std::endl;	 +	channel.declareExchange(exchange); +	std::cout << "Declared exchange." << std::endl; +	channel.declareQueue(queue); +	std::cout << "Declared queue." << std::endl; +	qpid::framing::FieldTable args; +	channel.bind(exchange, queue, "MyTopic", args); +	std::cout << "Bound queue to exchange." << std::endl; + +	//set up a message listener +	Monitor monitor; +	SimpleListener listener(&monitor); +	string tag("MyTag"); +	channel.consume(queue, tag, &listener); +	channel.start(); +	std::cout << "Registered consumer." << std::endl; + +	Message msg; +	string data("MyMessage"); +	msg.setData(data); +	channel.publish(msg, exchange, "MyTopic"); +	std::cout << "Published message." << std::endl; + +	{ +            Monitor::ScopedLock l(monitor); +            monitor.wait(); +        } +         +	con.closeChannel(&channel); +	std::cout << "Closed channel." << std::endl; +	con.close();	 +	std::cout << "Closed connection." << std::endl; +    }catch(qpid::QpidError error){ +	std::cout << "Error [" << error.code << "] " << error.msg << " (" +                  << error.location.file << ":" << error.location.line +                  << ")" << std::endl; +	return 1; +    } +    return 0; +} diff --git a/cpp/tests/echo_service.cpp b/cpp/tests/echo_service.cpp new file mode 100644 index 0000000000..6159aafca0 --- /dev/null +++ b/cpp/tests/echo_service.cpp @@ -0,0 +1,198 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <QpidError.h> +#include <ClientChannel.h> +#include <Connection.h> +#include <ClientExchange.h> +#include <MessageListener.h> +#include <ClientQueue.h> +#include <sys/Time.h> +#include <iostream> +#include <sstream> + +using namespace qpid::client; +using namespace qpid::sys; +using std::string; + +class EchoServer : public MessageListener{     +    Channel* const channel; +public: +    EchoServer(Channel* channel); +    virtual void received(Message& msg); +}; + +class LoggingListener : public MessageListener{     +public: +    virtual void received(Message& msg); +}; + +class Args{ +    string host; +    int port; +    bool trace; +    bool help; +    bool client; +public: +    inline Args() : host("localhost"), port(5672), trace(false), help(false), client(false){} +    void parse(int argc, char** argv); +    void usage(); + +    inline const string& getHost() const { return host;} +    inline int getPort() const { return port; } +    inline bool getTrace() const { return trace; } +    inline bool getHelp() const { return help; } +    inline bool getClient() const { return client; } +}; + +int main(int argc, char** argv){ +    const std::string echo_service("echo_service"); +    Args args; +    args.parse(argc, argv); +    if (args.getHelp()) { +        args.usage(); +    } else if (args.getClient()) { +        try { +            //Create connection & open a channel +            Connection connection(args.getTrace()); +            connection.open(args.getHost(), args.getPort()); +            Channel channel; +            connection.openChannel(&channel); +         +            //Setup: declare the private 'response' queue and bind it +            //to the direct exchange by its name which will be +            //generated by the server +            Queue response; +            channel.declareQueue(response); +            qpid::framing::FieldTable emptyArgs; +            channel.bind(Exchange::DEFAULT_DIRECT_EXCHANGE, response, response.getName(), emptyArgs); + +            //Consume from the response queue, logging all echoed message to console: +            LoggingListener listener; +            std::string tag; +            channel.consume(response, tag, &listener); + +            //Process incoming requests on a new thread +            channel.start(); + +            //get messages from console and send them: +            std::string text; +            std::cout << "Enter text to send:" << std::endl; +            while (std::getline(std::cin, text)) { +                std::cout << "Sending " << text << " to echo server." << std::endl; +                Message msg; +                msg.getHeaders().setString("RESPONSE_QUEUE", response.getName()); +                msg.setData(text); +                channel.publish(msg, Exchange::DEFAULT_DIRECT_EXCHANGE, echo_service); +                 +                std::cout << "Enter text to send:" << std::endl; +            } +             +            connection.close(); +        } catch(qpid::QpidError error) { +            std::cout << error.what() << std::endl; +        }         +    } else { +        try { +            //Create connection & open a channel +            Connection connection(args.getTrace()); +            connection.open(args.getHost(), args.getPort()); +            Channel channel; +            connection.openChannel(&channel); +         +            //Setup: declare the 'request' queue and bind it to the direct exchange with a 'well known' name +            Queue request("request"); +            channel.declareQueue(request); +            qpid::framing::FieldTable emptyArgs; +            channel.bind(Exchange::DEFAULT_DIRECT_EXCHANGE, request, echo_service, emptyArgs); + +            //Consume from the request queue, echoing back all messages received to the client that sent them +            EchoServer server(&channel); +            std::string tag = "server_tag"; +            channel.consume(request, tag, &server); + +            //Process incoming requests on the main thread +            channel.run(); +             +            connection.close(); +        } catch(qpid::QpidError error) { +            std::cout << error.what() << std::endl; +        } +    } +} + +EchoServer::EchoServer(Channel* _channel) : channel(_channel){} + +void EchoServer::received(Message& message) +{ +    //get name of response queues binding to the default direct exchange: +    const std::string name = message.getHeaders().getString("RESPONSE_QUEUE"); + +    if (name.empty()) { +        std::cout << "Cannot echo " << message.getData() << ", no response queue specified." << std::endl; +    } else { +        //print message to console: +        std::cout << "Echoing " << message.getData() << " back to " << name << std::endl; +         +        //'echo' the message back: +        channel->publish(message, Exchange::DEFAULT_DIRECT_EXCHANGE, name); +    } +} + +void LoggingListener::received(Message& message) +{ +    //print message to console: +    std::cout << "Received echo: " << message.getData() << std::endl; +} + + +void Args::parse(int argc, char** argv){ +    for(int i = 1; i < argc; i++){ +        string name(argv[i]); +        if("-help" == name){ +            help = true; +            break; +        }else if("-host" == name){ +            host = argv[++i]; +        }else if("-port" == name){ +            port = atoi(argv[++i]); +        }else if("-trace" == name){ +            trace = true; +        }else if("-client" == name){ +            client = true; +        }else{ +            std::cout << "Warning: unrecognised option " << name << std::endl; +        } +    } +} + +void Args::usage(){ +    std::cout << "Options:" << std::endl; +    std::cout << "    -help" << std::endl; +    std::cout << "            Prints this usage message" << std::endl; +    std::cout << "    -host <host>" << std::endl; +    std::cout << "            Specifies host to connect to (default is localhost)" << std::endl; +    std::cout << "    -port <port>" << std::endl; +    std::cout << "            Specifies port to conect to (default is 5762)" << std::endl; +    std::cout << "    -trace" << std::endl; +    std::cout << "            Indicates that the frames sent and received should be logged" << std::endl; +    std::cout << "    -client" << std::endl; +    std::cout << "            Run as a client (else will run as a server)" << std::endl; +} diff --git a/cpp/tests/env b/cpp/tests/env new file mode 100755 index 0000000000..76797b1ef7 --- /dev/null +++ b/cpp/tests/env @@ -0,0 +1,23 @@ +#!/bin/bash +# Set environment variables for test scripts. + +pathmunge () { +    if ! echo $PATH | /bin/egrep -q "(^|:)$1($|:)" ; then +	if [ "$2" = "after" ] ; then +	    PATH=$PATH:$1 +	else +	    PATH=$1:$PATH +	fi +    fi +} + +if [ -z QPID_ROOT ] ; then echo "You must set QPID_ROOT" ; fi +  +pathmunge $QPID_ROOT/cpp/test/bin +pathmunge $QPID_ROOT/cpp/build/*/bin +pathmunge $QPID_ROOT/cpp/build/*/test + +export QPID_HOME=${QPID_HOME:-$QPID_ROOT/java/build} +pathmunge $QPID_HOME/bin + + diff --git a/cpp/tests/gen.mk b/cpp/tests/gen.mk new file mode 100644 index 0000000000..bcb09f4822 --- /dev/null +++ b/cpp/tests/gen.mk @@ -0,0 +1,101 @@ +client_test_SOURCES = client_test.cpp +client_test_LDADD = $(lib_client) $(lib_common) $(extra_libs) +echo_service_SOURCES = echo_service.cpp +echo_service_LDADD = $(lib_client) $(lib_common) $(extra_libs) +topic_listener_SOURCES = topic_listener.cpp +topic_listener_LDADD = $(lib_client) $(lib_common) $(extra_libs) +topic_publisher_SOURCES = topic_publisher.cpp +topic_publisher_LDADD = $(lib_client) $(lib_common) $(extra_libs) +AccumulatedAckTest_la_SOURCES = AccumulatedAckTest.cpp +AccumulatedAckTest_la_LIBADD = $(lib_common) +AccumulatedAckTest_la_LIBADD += $(lib_broker) $(extra_libs) +AccumulatedAckTest_la_LDFLAGS = -module -rpath $(abs_builddir) +ChannelTest_la_SOURCES = ChannelTest.cpp +ChannelTest_la_LIBADD = $(lib_common) +ChannelTest_la_LIBADD += $(lib_broker) $(extra_libs) +ChannelTest_la_LDFLAGS = -module -rpath $(abs_builddir) +ConfigurationTest_la_SOURCES = ConfigurationTest.cpp +ConfigurationTest_la_LIBADD = $(lib_common) +ConfigurationTest_la_LIBADD += $(lib_broker) $(extra_libs) +ConfigurationTest_la_LDFLAGS = -module -rpath $(abs_builddir) +ExchangeTest_la_SOURCES = ExchangeTest.cpp +ExchangeTest_la_LIBADD = $(lib_common) +ExchangeTest_la_LIBADD += $(lib_broker) $(extra_libs) +ExchangeTest_la_LDFLAGS = -module -rpath $(abs_builddir) +HeadersExchangeTest_la_SOURCES = HeadersExchangeTest.cpp +HeadersExchangeTest_la_LIBADD = $(lib_common) +HeadersExchangeTest_la_LIBADD += $(lib_broker) $(extra_libs) +HeadersExchangeTest_la_LDFLAGS = -module -rpath $(abs_builddir) +InMemoryContentTest_la_SOURCES = InMemoryContentTest.cpp +InMemoryContentTest_la_LIBADD = $(lib_common) +InMemoryContentTest_la_LIBADD += $(lib_broker) $(extra_libs) +InMemoryContentTest_la_LDFLAGS = -module -rpath $(abs_builddir) +LazyLoadedContentTest_la_SOURCES = LazyLoadedContentTest.cpp +LazyLoadedContentTest_la_LIBADD = $(lib_common) +LazyLoadedContentTest_la_LIBADD += $(lib_broker) $(extra_libs) +LazyLoadedContentTest_la_LDFLAGS = -module -rpath $(abs_builddir) +MessageBuilderTest_la_SOURCES = MessageBuilderTest.cpp +MessageBuilderTest_la_LIBADD = $(lib_common) +MessageBuilderTest_la_LIBADD += $(lib_broker) $(extra_libs) +MessageBuilderTest_la_LDFLAGS = -module -rpath $(abs_builddir) +MessageTest_la_SOURCES = MessageTest.cpp +MessageTest_la_LIBADD = $(lib_common) +MessageTest_la_LIBADD += $(lib_broker) $(extra_libs) +MessageTest_la_LDFLAGS = -module -rpath $(abs_builddir) +QueueRegistryTest_la_SOURCES = QueueRegistryTest.cpp +QueueRegistryTest_la_LIBADD = $(lib_common) +QueueRegistryTest_la_LIBADD += $(lib_broker) $(extra_libs) +QueueRegistryTest_la_LDFLAGS = -module -rpath $(abs_builddir) +QueueTest_la_SOURCES = QueueTest.cpp +QueueTest_la_LIBADD = $(lib_common) +QueueTest_la_LIBADD += $(lib_broker) $(extra_libs) +QueueTest_la_LDFLAGS = -module -rpath $(abs_builddir) +TopicExchangeTest_la_SOURCES = TopicExchangeTest.cpp +TopicExchangeTest_la_LIBADD = $(lib_common) +TopicExchangeTest_la_LIBADD += $(lib_broker) $(extra_libs) +TopicExchangeTest_la_LDFLAGS = -module -rpath $(abs_builddir) +TxAckTest_la_SOURCES = TxAckTest.cpp +TxAckTest_la_LIBADD = $(lib_common) +TxAckTest_la_LIBADD += $(lib_broker) $(extra_libs) +TxAckTest_la_LDFLAGS = -module -rpath $(abs_builddir) +TxBufferTest_la_SOURCES = TxBufferTest.cpp +TxBufferTest_la_LIBADD = $(lib_common) +TxBufferTest_la_LIBADD += $(lib_broker) $(extra_libs) +TxBufferTest_la_LDFLAGS = -module -rpath $(abs_builddir) +TxPublishTest_la_SOURCES = TxPublishTest.cpp +TxPublishTest_la_LIBADD = $(lib_common) +TxPublishTest_la_LIBADD += $(lib_broker) $(extra_libs) +TxPublishTest_la_LDFLAGS = -module -rpath $(abs_builddir) +ValueTest_la_SOURCES = ValueTest.cpp +ValueTest_la_LIBADD = $(lib_common) +ValueTest_la_LIBADD += $(lib_broker) $(extra_libs) +ValueTest_la_LDFLAGS = -module -rpath $(abs_builddir) +BodyHandlerTest_la_SOURCES = BodyHandlerTest.cpp +BodyHandlerTest_la_LIBADD = $(lib_common) +BodyHandlerTest_la_LIBADD += $(lib_broker) $(extra_libs) +BodyHandlerTest_la_LDFLAGS = -module -rpath $(abs_builddir) +FieldTableTest_la_SOURCES = FieldTableTest.cpp +FieldTableTest_la_LIBADD = $(lib_common) +FieldTableTest_la_LIBADD += $(lib_broker) $(extra_libs) +FieldTableTest_la_LDFLAGS = -module -rpath $(abs_builddir) +FramingTest_la_SOURCES = FramingTest.cpp +FramingTest_la_LIBADD = $(lib_common) +FramingTest_la_LIBADD += $(lib_broker) $(extra_libs) +FramingTest_la_LDFLAGS = -module -rpath $(abs_builddir) +HeaderTest_la_SOURCES = HeaderTest.cpp +HeaderTest_la_LIBADD = $(lib_common) +HeaderTest_la_LIBADD += $(lib_broker) $(extra_libs) +HeaderTest_la_LDFLAGS = -module -rpath $(abs_builddir) +ExceptionTest_la_SOURCES = ExceptionTest.cpp +ExceptionTest_la_LIBADD = $(lib_common) +ExceptionTest_la_LIBADD += $(lib_broker) $(extra_libs) +ExceptionTest_la_LDFLAGS = -module -rpath $(abs_builddir) +EventChannelTest_la_SOURCES = EventChannelTest.cpp +EventChannelTest_la_LIBADD = $(lib_common) +EventChannelTest_la_LIBADD += $(lib_broker) $(extra_libs) +EventChannelTest_la_LDFLAGS = -module -rpath $(abs_builddir) +EventChannelThreadsTest_la_SOURCES = EventChannelThreadsTest.cpp +EventChannelThreadsTest_la_LIBADD = $(lib_common) +EventChannelThreadsTest_la_LIBADD += $(lib_broker) $(extra_libs) +EventChannelThreadsTest_la_LDFLAGS = -module -rpath $(abs_builddir) +check_LTLIBRARIES = AccumulatedAckTest.la ChannelTest.la ConfigurationTest.la ExchangeTest.la HeadersExchangeTest.la InMemoryContentTest.la LazyLoadedContentTest.la MessageBuilderTest.la MessageTest.la QueueRegistryTest.la QueueTest.la TopicExchangeTest.la TxAckTest.la TxBufferTest.la TxPublishTest.la ValueTest.la BodyHandlerTest.la FieldTableTest.la FramingTest.la HeaderTest.la ExceptionTest.la EventChannelTest.la EventChannelThreadsTest.la diff --git a/cpp/tests/qpid_test_plugin.h b/cpp/tests/qpid_test_plugin.h new file mode 100644 index 0000000000..b2f4a8ffed --- /dev/null +++ b/cpp/tests/qpid_test_plugin.h @@ -0,0 +1,43 @@ +#ifndef _qpid_test_plugin_ +#define _qpid_test_plugin_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * Convenience to include cppunit headers needed by qpid test plugins and + * workaround for warning from superfluous main() declaration + * in cppunit/TestPlugIn.h + */ + +#include <cppunit/TestCase.h> +#include <cppunit/TextTestRunner.h> +#include <cppunit/extensions/HelperMacros.h> +#include <cppunit/plugin/TestPlugIn.h> + +// Redefine CPPUNIT_PLUGIN_IMPLEMENT_MAIN to a dummy typedef to avoid warnings. +//  +#if defined(CPPUNIT_HAVE_UNIX_DLL_LOADER) || defined(CPPUNIT_HAVE_UNIX_SHL_LOADER) +#undef CPPUNIT_PLUGIN_IMPLEMENT_MAIN  +#define CPPUNIT_PLUGIN_IMPLEMENT_MAIN() typedef char __CppUnitPlugInImplementMainDummyTypeDef +#endif + +#endif  /*!_qpid_test_plugin_*/ diff --git a/cpp/tests/topic_listener.cpp b/cpp/tests/topic_listener.cpp new file mode 100644 index 0000000000..9e9b480df3 --- /dev/null +++ b/cpp/tests/topic_listener.cpp @@ -0,0 +1,186 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <QpidError.h> +#include <ClientChannel.h> +#include <Connection.h> +#include <ClientExchange.h> +#include <MessageListener.h> +#include <ClientQueue.h> +#include <sys/Time.h> +#include <iostream> +#include <sstream> + +using namespace qpid::client; +using namespace qpid::sys; +using std::string; + +class Listener : public MessageListener{     +    Channel* const channel; +    const std::string responseQueue; +    const bool transactional; +    bool init; +    int count; +    Time start; +     +    void shutdown(); +    void report(); +public: +    Listener(Channel* channel, const std::string& reponseQueue, bool tx); +    virtual void received(Message& msg); +}; + +class Args{ +    string host; +    int port; +    int ackMode; +    bool transactional; +    int prefetch; +    bool trace; +    bool help; +public: +    inline Args() : host("localhost"), port(5672), ackMode(NO_ACK), transactional(false), prefetch(1000), trace(false), help(false){} +    void parse(int argc, char** argv); +    void usage(); + +    inline const string& getHost() const { return host;} +    inline int getPort() const { return port; } +    inline int getAckMode(){ return ackMode; } +    inline bool getTransactional() const { return transactional; } +    inline int getPrefetch(){ return prefetch; } +    inline bool getTrace() const { return trace; } +    inline bool getHelp() const { return help; } +}; + +int main(int argc, char** argv){ +    Args args; +    args.parse(argc, argv); +    if(args.getHelp()){ +        args.usage(); +    }else{ +        try{ +            Connection connection(args.getTrace()); +            connection.open(args.getHost(), args.getPort()); +            Channel channel(args.getTransactional(), args.getPrefetch()); +            connection.openChannel(&channel); +         +            //declare exchange, queue and bind them: +            Queue response("response"); +            channel.declareQueue(response); +         +            Queue control; +            channel.declareQueue(control); +            qpid::framing::FieldTable bindArgs; +            channel.bind(Exchange::DEFAULT_TOPIC_EXCHANGE, control, "topic_control", bindArgs); +            //set up listener +            Listener listener(&channel, response.getName(), args.getTransactional()); +            std::string tag; +            channel.consume(control, tag, &listener, args.getAckMode()); +            channel.run(); +            connection.close(); +        }catch(qpid::QpidError error){ +            std::cout << error.what() << std::endl; +        } +    } +} + +Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx) :  +    channel(_channel), responseQueue(_responseq), transactional(tx), init(false), count(0){} + +void Listener::received(Message& message){ +    if(!init){         +        start = now(); +        count = 0; +        init = true; +    } +    std::string type(message.getHeaders().getString("TYPE")); + +    if(type == "TERMINATION_REQUEST"){ +        shutdown(); +    }else if(type == "REPORT_REQUEST"){         +        //send a report: +        report(); +        init = false; +    }else if (++count % 100 == 0){         +        std::cout <<"Received " << count << " messages." << std::endl; +    } +} + +void Listener::shutdown(){ +    channel->close(); +} + +void Listener::report(){ +    Time finish = now(); +    Time time = finish - start; +    std::stringstream reportstr; +    reportstr << "Received " << count << " messages in " +              << time/TIME_MSEC << " ms."; +    Message msg; +    msg.setData(reportstr.str()); +    channel->publish(msg, string(), responseQueue); +    if(transactional){ +        channel->commit(); +    } +} + + +void Args::parse(int argc, char** argv){ +    for(int i = 1; i < argc; i++){ +        string name(argv[i]); +        if("-help" == name){ +            help = true; +            break; +        }else if("-host" == name){ +            host = argv[++i]; +        }else if("-port" == name){ +            port = atoi(argv[++i]); +        }else if("-ack_mode" == name){ +            ackMode = atoi(argv[++i]); +        }else if("-transactional" == name){ +            transactional = true; +        }else if("-prefetch" == name){ +            prefetch = atoi(argv[++i]); +        }else if("-trace" == name){ +            trace = true; +        }else{ +            std::cout << "Warning: unrecognised option " << name << std::endl; +        } +    } +} + +void Args::usage(){ +    std::cout << "Options:" << std::endl; +    std::cout << "    -help" << std::endl; +    std::cout << "            Prints this usage message" << std::endl; +    std::cout << "    -host <host>" << std::endl; +    std::cout << "            Specifies host to connect to (default is localhost)" << std::endl; +    std::cout << "    -port <port>" << std::endl; +    std::cout << "            Specifies port to conect to (default is 5762)" << std::endl; +    std::cout << "    -ack_mode <mode>" << std::endl; +    std::cout << "            Sets the acknowledgement mode" << std::endl; +    std::cout << "            0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl; +    std::cout << "    -transactional" << std::endl; +    std::cout << "            Indicates the client should use transactions" << std::endl; +    std::cout << "    -prefetch <count>" << std::endl; +    std::cout << "            Specifies the prefetch count (default is 1000)" << std::endl; +    std::cout << "    -trace" << std::endl; +    std::cout << "            Indicates that the frames sent and received should be logged" << std::endl; +} diff --git a/cpp/tests/topic_publisher.cpp b/cpp/tests/topic_publisher.cpp new file mode 100644 index 0000000000..cde3f6ee79 --- /dev/null +++ b/cpp/tests/topic_publisher.cpp @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <QpidError.h> +#include <ClientChannel.h> +#include <Connection.h> +#include <ClientExchange.h> +#include <MessageListener.h> +#include <ClientQueue.h> +#include <sys/Monitor.h> +#include "unistd.h" +#include <sys/Time.h> +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; +using namespace qpid::sys; +using std::string; + +class Publisher : public MessageListener{     +    Channel* const channel; +    const std::string controlTopic; +    const bool transactional; +    Monitor monitor; +    int count; +     +    void waitForCompletion(int msgs); +    string generateData(int size); + +public: +    Publisher(Channel* channel, const std::string& controlTopic, bool tx); +    virtual void received(Message& msg); +    int64_t publish(int msgs, int listeners, int size); +    void terminate(); +}; + +class Args{ +    string host; +    int port; +    int messages; +    int subscribers; +    int ackMode; +    bool transactional; +    int prefetch; +    int batches; +    int delay; +    int size; +    bool trace; +    bool help; +public: +    inline Args() : host("localhost"), port(5672), messages(1000), subscribers(1),  +                    ackMode(NO_ACK), transactional(false), prefetch(1000), batches(1),  +                    delay(0), size(256), trace(false), help(false){} + +    void parse(int argc, char** argv); +    void usage(); + +    inline const string& getHost() const { return host;} +    inline int getPort() const { return port; } +    inline int getMessages() const { return messages; } +    inline int getSubscribers() const { return subscribers; } +    inline int getAckMode(){ return ackMode; } +    inline bool getTransactional() const { return transactional; } +    inline int getPrefetch(){ return prefetch; } +    inline int getBatches(){ return batches; } +    inline int getDelay(){ return delay; } +    inline int getSize(){ return size; } +    inline bool getTrace() const { return trace; } +    inline bool getHelp() const { return help; } +}; + +int main(int argc, char** argv){ +    Args args; +    args.parse(argc, argv); +    if(args.getHelp()){ +        args.usage(); +    }else{ +        try{ +            Connection connection(args.getTrace()); +            connection.open(args.getHost(), args.getPort()); +            Channel channel(args.getTransactional(), args.getPrefetch()); +            connection.openChannel(&channel); + +            //declare queue (relying on default binding): +            Queue response("response"); +            channel.declareQueue(response); + +            //set up listener +            Publisher publisher(&channel, "topic_control", args.getTransactional()); +            std::string tag("mytag"); +            channel.consume(response, tag, &publisher, args.getAckMode()); +            channel.start(); + +            int batchSize(args.getBatches()); +            int64_t max(0); +            int64_t min(0); +            int64_t sum(0); +            for(int i = 0; i < batchSize; i++){ +                if(i > 0 && args.getDelay()) sleep(args.getDelay()); +                Time time = publisher.publish( +                    args.getMessages(), args.getSubscribers(), args.getSize()); +                if(!max || time > max) max = time; +                if(!min || time < min) min = time; +                sum += time; +                std::cout << "Completed " << (i+1) << " of " << batchSize +                          << " in " << time/TIME_MSEC << "ms" << std::endl; +            } +            publisher.terminate(); +            int64_t avg = sum / batchSize; +            if(batchSize > 1){ +                std::cout << batchSize << " batches completed. avg=" << avg <<  +                    ", max=" << max << ", min=" << min << std::endl; +            } +            channel.close(); +            connection.close(); +        }catch(qpid::QpidError error){ +            std::cout << error.what() << std::endl; +        } +    } +} + +Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) :  +    channel(_channel), controlTopic(_controlTopic), transactional(tx){} + +void Publisher::received(Message& ){ +    //count responses and when all are received end the current batch +    Monitor::ScopedLock l(monitor); +    if(--count == 0){ +        monitor.notify(); +    } +} + +void Publisher::waitForCompletion(int msgs){ +    count = msgs; +    monitor.wait(); +} + +int64_t Publisher::publish(int msgs, int listeners, int size){ +    Message msg; +    msg.setData(generateData(size)); +    Time start = now(); +    { +        Monitor::ScopedLock l(monitor); +        for(int i = 0; i < msgs; i++){ +            channel->publish(msg, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); +        } +        //send report request +        Message reportRequest; +        reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); +        channel->publish(reportRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); +        if(transactional){ +            channel->commit(); +        } + +        waitForCompletion(listeners); +    } + +    Time finish = now(); +    return finish - start;  +} + +string Publisher::generateData(int size){ +    string data; +    for(int i = 0; i < size; i++){ +        data += ('A' + (i / 26)); +    } +    return data; +} + +void Publisher::terminate(){ +    //send termination request +    Message terminationRequest; +    terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); +    channel->publish(terminationRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); +    if(transactional){ +        channel->commit(); +    } +} + +void Args::parse(int argc, char** argv){ +    for(int i = 1; i < argc; i++){ +        string name(argv[i]); +        if("-help" == name){ +            help = true; +            break; +        }else if("-host" == name){ +            host = argv[++i]; +        }else if("-port" == name){ +            port = atoi(argv[++i]); +        }else if("-messages" == name){ +            messages = atoi(argv[++i]); +        }else if("-subscribers" == name){ +            subscribers = atoi(argv[++i]); +        }else if("-ack_mode" == name){ +            ackMode = atoi(argv[++i]); +        }else if("-transactional" == name){ +            transactional = true; +        }else if("-prefetch" == name){ +            prefetch = atoi(argv[++i]); +        }else if("-batches" == name){ +            batches = atoi(argv[++i]); +        }else if("-delay" == name){ +            delay = atoi(argv[++i]); +        }else if("-size" == name){ +            size = atoi(argv[++i]); +        }else if("-trace" == name){ +            trace = true; +        }else{ +            std::cout << "Warning: unrecognised option " << name << std::endl; +        } +    } +} + +void Args::usage(){ +    std::cout << "Options:" << std::endl; +    std::cout << "    -help" << std::endl; +    std::cout << "            Prints this usage message" << std::endl; +    std::cout << "    -host <host>" << std::endl; +    std::cout << "            Specifies host to connect to (default is localhost)" << std::endl; +    std::cout << "    -port <port>" << std::endl; +    std::cout << "            Specifies port to conect to (default is 5762)" << std::endl; +    std::cout << "    -messages <count>" << std::endl; +    std::cout << "            Specifies how many messages to send" << std::endl; +    std::cout << "    -subscribers <count>" << std::endl; +    std::cout << "            Specifies how many subscribers to expect reports from" << std::endl; +    std::cout << "    -ack_mode <mode>" << std::endl; +    std::cout << "            Sets the acknowledgement mode" << std::endl; +    std::cout << "            0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl; +    std::cout << "    -transactional" << std::endl; +    std::cout << "            Indicates the client should use transactions" << std::endl; +    std::cout << "    -prefetch <count>" << std::endl; +    std::cout << "            Specifies the prefetch count (default is 1000)" << std::endl; +    std::cout << "    -batches <count>" << std::endl; +    std::cout << "            Specifies how many batches to run" << std::endl; +    std::cout << "    -delay <seconds>" << std::endl; +    std::cout << "            Causes a delay between each batch" << std::endl; +    std::cout << "    -size <bytes>" << std::endl; +    std::cout << "            Sets the size of the published messages (default is 256 bytes)" << std::endl; +    std::cout << "    -trace" << std::endl; +    std::cout << "            Indicates that the frames sent and received should be logged" << std::endl; +} diff --git a/cpp/tests/topicall b/cpp/tests/topicall new file mode 100755 index 0000000000..bde04a5b30 --- /dev/null +++ b/cpp/tests/topicall @@ -0,0 +1,25 @@ +#!/bin/sh +# Do 3 runs of topictests for C++ and Java brokers with reduced output. + +. `dirname $0`/env + +# Run a short topictest to warm up the broker and iron out startup effects. +flush() { +    topic_listener >/dev/null 2>&1 & +    topic_publisher >/dev/null 2>&1  +} + +echo Java broker +broker j ; flush +topictest c | tail -n1 +topictest c | tail -n1 +topictest c | tail -n1 + +echo C++ broker +broker c ; flush +topictest c | tail -n1 +topictest c | tail -n1 +topictest c | tail -n1 + +# Don't bother with java clients we know they're slower. + diff --git a/cpp/tests/topictest b/cpp/tests/topictest new file mode 100755 index 0000000000..792f063bea --- /dev/null +++ b/cpp/tests/topictest @@ -0,0 +1,42 @@ +#!/bin/bash +# Run the c++ or java topic test + +. `dirname $0`/env + +# Edit parameters here: + +# Big test: +# LISTENERS=10 +# MESSAGES=10000 +# BATCHES=20 + +LISTENERS=10 +MESSAGES=2000 +BATCHES=10 + +cppcmds() { +    LISTEN_CMD=topic_listener +    PUBLISH_CMD="topic_publisher -messages $MESSAGES -batches $BATCHES -subscribers $LISTENERS" +} + +javacmds() { +    DEF=-Damqj.logging.level="error" +    LISTEN_CMD="qpid-run $DEF org.apache.qpid.topic.Listener" +    PUBLISH_CMD="qpid-run $DEF org.apache.qpid.topic.Publisher -messages $MESSAGES -batch $BATCHES -clients $LISTENERS" +} + +case $1 in +    c) cppcmds ;; +    j) javacmds ;; +    *) cppcmds ;; +esac + +for ((i=$LISTENERS ; i--; )); do +    $LISTEN_CMD  > /dev/null 2>&1 & +done +sleep 1 +echo $PUBLISH_CMD $OPTIONS + +STATS=~/bin/topictest.times +echo "---- topictest `date`" >> $STATS +$PUBLISH_CMD $OPTIONS | tee -a $STATS | 
