summaryrefslogtreecommitdiff
path: root/cpp/src/tests/MessagingFixture.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/MessagingFixture.h')
-rw-r--r--cpp/src/tests/MessagingFixture.h345
1 files changed, 0 insertions, 345 deletions
diff --git a/cpp/src/tests/MessagingFixture.h b/cpp/src/tests/MessagingFixture.h
deleted file mode 100644
index 2312a87e9d..0000000000
--- a/cpp/src/tests/MessagingFixture.h
+++ /dev/null
@@ -1,345 +0,0 @@
-#ifndef TESTS_MESSAGINGFIXTURE_H
-#define TESTS_MESSAGINGFIXTURE_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "BrokerFixture.h"
-#include "unit_test.h"
-#include "test_tools.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Session.h"
-#include "qpid/framing/Uuid.h"
-#include "qpid/messaging/Address.h"
-#include "qpid/messaging/Connection.h"
-#include "qpid/messaging/Session.h"
-#include "qpid/messaging/Sender.h"
-#include "qpid/messaging/Receiver.h"
-#include "qpid/messaging/Message.h"
-#include "qpid/types/Variant.h"
-
-namespace qpid {
-namespace tests {
-
-using qpid::types::Variant;
-
-struct BrokerAdmin
-{
- qpid::client::Connection connection;
- qpid::client::Session session;
-
- BrokerAdmin(uint16_t port)
- {
- connection.open("localhost", port);
- session = connection.newSession();
- }
-
- void createQueue(const std::string& name)
- {
- session.queueDeclare(qpid::client::arg::queue=name);
- }
-
- void deleteQueue(const std::string& name)
- {
- session.queueDelete(qpid::client::arg::queue=name);
- }
-
- void createExchange(const std::string& name, const std::string& type)
- {
- session.exchangeDeclare(qpid::client::arg::exchange=name, qpid::client::arg::type=type);
- }
-
- void deleteExchange(const std::string& name)
- {
- session.exchangeDelete(qpid::client::arg::exchange=name);
- }
-
- bool checkQueueExists(const std::string& name)
- {
- return session.queueQuery(name).getQueue() == name;
- }
-
- bool checkExchangeExists(const std::string& name, std::string& type)
- {
- qpid::framing::ExchangeQueryResult result = session.exchangeQuery(name);
- type = result.getType();
- return !result.getNotFound();
- }
-
- void send(qpid::client::Message& message, const std::string& exchange=std::string())
- {
- session.messageTransfer(qpid::client::arg::destination=exchange, qpid::client::arg::content=message);
- }
-
- ~BrokerAdmin()
- {
- session.close();
- connection.close();
- }
-};
-
-struct MessagingFixture : public BrokerFixture
-{
- messaging::Connection connection;
- messaging::Session session;
- BrokerAdmin admin;
-
- MessagingFixture(Broker::Options opts = Broker::Options(), bool mgmtEnabled=false) :
- BrokerFixture(opts, mgmtEnabled),
- connection(open(broker->getPort(Broker::TCP_TRANSPORT))),
- session(connection.createSession()),
- admin(broker->getPort(Broker::TCP_TRANSPORT))
- {
- }
-
- static messaging::Connection open(uint16_t port)
- {
- messaging::Connection connection(
- (boost::format("amqp:tcp:localhost:%1%") % (port)).str());
- connection.open();
- return connection;
- }
-
- /** Open a connection to the broker. */
- qpid::messaging::Connection newConnection()
- {
- qpid::messaging::Connection connection(
- (boost::format("amqp:tcp:localhost:%1%") % (broker->getPort(qpid::broker::Broker::TCP_TRANSPORT))).str());
- return connection;
- }
-
- void ping(const qpid::messaging::Address& address)
- {
- messaging::Receiver r = session.createReceiver(address);
- messaging::Sender s = session.createSender(address);
- messaging::Message out(framing::Uuid(true).str());
- s.send(out);
- messaging::Message in;
- BOOST_CHECK(r.fetch(in, 5*messaging::Duration::SECOND));
- BOOST_CHECK_EQUAL(out.getContent(), in.getContent());
- r.close();
- s.close();
- }
-
- ~MessagingFixture()
- {
- session.close();
- connection.close();
- }
-};
-
-struct QueueFixture : MessagingFixture
-{
- std::string queue;
-
- QueueFixture(const std::string& name = "test-queue") : queue(name)
- {
- admin.createQueue(queue);
- }
-
- ~QueueFixture()
- {
- admin.deleteQueue(queue);
- }
-
-};
-
-struct TopicFixture : MessagingFixture
-{
- std::string topic;
-
- TopicFixture(const std::string& name = "test-topic", const std::string& type="fanout") : topic(name)
- {
- admin.createExchange(topic, type);
- }
-
- ~TopicFixture()
- {
- admin.deleteExchange(topic);
- }
-
-};
-
-struct MultiQueueFixture : MessagingFixture
-{
- typedef std::vector<std::string>::const_iterator const_iterator;
- std::vector<std::string> queues;
-
- MultiQueueFixture(const std::vector<std::string>& names = boost::assign::list_of<std::string>("q1")("q2")("q3")) : queues(names)
- {
- for (const_iterator i = queues.begin(); i != queues.end(); ++i) {
- admin.createQueue(*i);
- }
- }
-
- ~MultiQueueFixture()
- {
- connection.close();
- for (const_iterator i = queues.begin(); i != queues.end(); ++i) {
- admin.deleteQueue(*i);
- }
- }
-
-};
-
-inline std::vector<std::string> fetch(messaging::Receiver& receiver, int count, messaging::Duration timeout=messaging::Duration::SECOND*5)
-{
- std::vector<std::string> data;
- messaging::Message message;
- for (int i = 0; i < count && receiver.fetch(message, timeout); i++) {
- data.push_back(message.getContent());
- }
- return data;
-}
-
-
-inline void send(messaging::Sender& sender, uint count = 1, uint start = 1,
- const std::string& base = "Message")
-{
- for (uint i = start; i < start + count; ++i) {
- sender.send(messaging::Message((boost::format("%1%_%2%") % base % i).str()));
- }
-}
-
-inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = 1,
- const std::string& base = "Message",
- messaging::Duration timeout=messaging::Duration::SECOND*5)
-{
- for (uint i = start; i < start + count; ++i) {
- BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str());
- }
-}
-
-
-class MethodInvoker
-{
- public:
- MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
- sender(session.createSender("qmf.default.direct/broker")),
- receiver(session.createReceiver(replyTo)) {}
-
- void createExchange(const std::string& name, const std::string& type, bool durable=false)
- {
- Variant::Map params;
- params["name"]=name;
- params["type"]="exchange";
- params["properties"] = Variant::Map();
- params["properties"].asMap()["exchange-type"] = type;
- params["properties"].asMap()["durable"] = durable;
- methodRequest("create", params);
- }
-
- void deleteExchange(const std::string& name)
- {
- Variant::Map params;
- params["name"]=name;
- params["type"]="exchange";
- methodRequest("delete", params);
- }
-
- void createQueue(const std::string& name, bool durable=false, bool autodelete=false,
- const Variant::Map& options=Variant::Map())
- {
- Variant::Map params;
- params["name"]=name;
- params["type"]="queue";
- params["properties"] = options;
- params["properties"].asMap()["durable"] = durable;
- params["properties"].asMap()["auto-delete"] = autodelete;
- methodRequest("create", params);
- }
-
- void deleteQueue(const std::string& name)
- {
- Variant::Map params;
- params["name"]=name;
- params["type"]="queue";
- methodRequest("delete", params);
- }
-
- void bind(const std::string& exchange, const std::string& queue, const std::string& key,
- const Variant::Map& options=Variant::Map())
- {
- Variant::Map params;
- params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str();
- params["type"]="binding";
- params["properties"] = options;
- methodRequest("create", params);
- }
-
- void unbind(const std::string& exchange, const std::string& queue, const std::string& key)
- {
- Variant::Map params;
- params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str();
- params["type"]="binding";
- methodRequest("delete", params);
- }
-
- void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0)
- {
- Variant::Map content;
- Variant::Map objectId;
- objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
- content["_object_id"] = objectId;
- content["_method_name"] = method;
- content["_arguments"] = inParams;
-
- messaging::Message request;
- request.setReplyTo(replyTo);
- request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
- request.getProperties()["qmf.opcode"] = "_method_request";
- encode(content, request);
-
- sender.send(request);
-
- messaging::Message response;
- if (receiver.fetch(response, messaging::Duration::SECOND*5)) {
- if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
- std::string opcode = response.getProperties()["qmf.opcode"];
- if (opcode == "_method_response") {
- if (outParams) {
- Variant::Map m;
- decode(response, m);
- *outParams = m["_arguments"].asMap();
- }
- } else if (opcode == "_exception") {
- Variant::Map m;
- decode(response, m);
- throw Exception(QPID_MSG("Error: " << m["_values"]));
- } else {
- throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode));
- }
- } else {
- throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id="
- << response.getProperties()["x-amqp-0-10.app-id"]));
- }
- } else {
- throw Exception(QPID_MSG("No response received"));
- }
- }
- private:
- messaging::Address replyTo;
- messaging::Sender sender;
- messaging::Receiver receiver;
-};
-
-}} // namespace qpid::tests
-
-#endif /*!TESTS_MESSAGINGFIXTURE_H*/