#ifndef TESTS_MESSAGINGFIXTURE_H #define TESTS_MESSAGINGFIXTURE_H /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "BrokerFixture.h" #include "unit_test.h" #include "test_tools.h" #include "qpid/client/Connection.h" #include "qpid/client/Session.h" #include "qpid/framing/Uuid.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/Connection.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Message.h" #include "qpid/types/Variant.h" namespace qpid { namespace tests { using qpid::types::Variant; struct BrokerAdmin { qpid::client::Connection connection; qpid::client::Session session; BrokerAdmin(uint16_t port) { connection.open("localhost", port); session = connection.newSession(); } void createQueue(const std::string& name) { session.queueDeclare(qpid::client::arg::queue=name); } void deleteQueue(const std::string& name) { session.queueDelete(qpid::client::arg::queue=name); } void createExchange(const std::string& name, const std::string& type) { session.exchangeDeclare(qpid::client::arg::exchange=name, qpid::client::arg::type=type); } void deleteExchange(const std::string& name) { session.exchangeDelete(qpid::client::arg::exchange=name); } bool checkQueueExists(const std::string& name) { return session.queueQuery(name).getQueue() == name; } bool checkExchangeExists(const std::string& name, std::string& type) { qpid::framing::ExchangeQueryResult result = session.exchangeQuery(name); type = result.getType(); return !result.getNotFound(); } void send(qpid::client::Message& message, const std::string& exchange=std::string()) { session.messageTransfer(qpid::client::arg::destination=exchange, qpid::client::arg::content=message); } ~BrokerAdmin() { session.close(); connection.close(); } }; struct MessagingFixture : public BrokerFixture { messaging::Connection connection; messaging::Session session; BrokerAdmin admin; MessagingFixture(Broker::Options opts = Broker::Options(), bool mgmtEnabled=false) : BrokerFixture(opts, mgmtEnabled), connection(open(broker->getPort(Broker::TCP_TRANSPORT))), session(connection.createSession()), admin(broker->getPort(Broker::TCP_TRANSPORT)) { } static messaging::Connection open(uint16_t port) { messaging::Connection connection( (boost::format("amqp:tcp:localhost:%1%") % (port)).str()); connection.open(); return connection; } /** Open a connection to the broker. */ qpid::messaging::Connection newConnection() { qpid::messaging::Connection connection( (boost::format("amqp:tcp:localhost:%1%") % (broker->getPort(qpid::broker::Broker::TCP_TRANSPORT))).str()); return connection; } void ping(const qpid::messaging::Address& address) { messaging::Receiver r = session.createReceiver(address); messaging::Sender s = session.createSender(address); messaging::Message out(framing::Uuid(true).str()); s.send(out); messaging::Message in; BOOST_CHECK(r.fetch(in, 5*messaging::Duration::SECOND)); BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); r.close(); s.close(); } ~MessagingFixture() { session.close(); connection.close(); } }; struct QueueFixture : MessagingFixture { std::string queue; QueueFixture(const std::string& name = "test-queue") : queue(name) { admin.createQueue(queue); } ~QueueFixture() { admin.deleteQueue(queue); } }; struct TopicFixture : MessagingFixture { std::string topic; TopicFixture(const std::string& name = "test-topic", const std::string& type="fanout") : topic(name) { admin.createExchange(topic, type); } ~TopicFixture() { admin.deleteExchange(topic); } }; struct MultiQueueFixture : MessagingFixture { typedef std::vector::const_iterator const_iterator; std::vector queues; MultiQueueFixture(const std::vector& names = boost::assign::list_of("q1")("q2")("q3")) : queues(names) { for (const_iterator i = queues.begin(); i != queues.end(); ++i) { admin.createQueue(*i); } } ~MultiQueueFixture() { connection.close(); for (const_iterator i = queues.begin(); i != queues.end(); ++i) { admin.deleteQueue(*i); } } }; inline std::vector fetch(messaging::Receiver& receiver, int count, messaging::Duration timeout=messaging::Duration::SECOND*5) { std::vector data; messaging::Message message; for (int i = 0; i < count && receiver.fetch(message, timeout); i++) { data.push_back(message.getContent()); } return data; } inline void send(messaging::Sender& sender, uint count = 1, uint start = 1, const std::string& base = "Message") { for (uint i = start; i < start + count; ++i) { sender.send(messaging::Message((boost::format("%1%_%2%") % base % i).str())); } } inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = 1, const std::string& base = "Message", messaging::Duration timeout=messaging::Duration::SECOND*5) { for (uint i = start; i < start + count; ++i) { BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str()); } } class MethodInvoker { public: MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"), sender(session.createSender("qmf.default.direct/broker")), receiver(session.createReceiver(replyTo)) {} void createExchange(const std::string& name, const std::string& type, bool durable=false) { Variant::Map params; params["name"]=name; params["type"]="exchange"; params["properties"] = Variant::Map(); params["properties"].asMap()["exchange-type"] = type; params["properties"].asMap()["durable"] = durable; methodRequest("create", params); } void deleteExchange(const std::string& name) { Variant::Map params; params["name"]=name; params["type"]="exchange"; methodRequest("delete", params); } void createQueue(const std::string& name, bool durable=false, bool autodelete=false, const Variant::Map& options=Variant::Map()) { Variant::Map params; params["name"]=name; params["type"]="queue"; params["properties"] = options; params["properties"].asMap()["durable"] = durable; params["properties"].asMap()["auto-delete"] = autodelete; methodRequest("create", params); } void deleteQueue(const std::string& name) { Variant::Map params; params["name"]=name; params["type"]="queue"; methodRequest("delete", params); } void bind(const std::string& exchange, const std::string& queue, const std::string& key, const Variant::Map& options=Variant::Map()) { Variant::Map params; params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str(); params["type"]="binding"; params["properties"] = options; methodRequest("create", params); } void unbind(const std::string& exchange, const std::string& queue, const std::string& key) { Variant::Map params; params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str(); params["type"]="binding"; methodRequest("delete", params); } void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0) { Variant::Map content; Variant::Map objectId; objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker"; content["_object_id"] = objectId; content["_method_name"] = method; content["_arguments"] = inParams; messaging::Message request; request.setReplyTo(replyTo); request.getProperties()["x-amqp-0-10.app-id"] = "qmf2"; request.getProperties()["qmf.opcode"] = "_method_request"; encode(content, request); sender.send(request); messaging::Message response; if (receiver.fetch(response, messaging::Duration::SECOND*5)) { if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") { std::string opcode = response.getProperties()["qmf.opcode"]; if (opcode == "_method_response") { if (outParams) { Variant::Map m; decode(response, m); *outParams = m["_arguments"].asMap(); } } else if (opcode == "_exception") { Variant::Map m; decode(response, m); throw Exception(QPID_MSG("Error: " << m["_values"])); } else { throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode)); } } else { throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id=" << response.getProperties()["x-amqp-0-10.app-id"])); } } else { throw Exception(QPID_MSG("No response received")); } } private: messaging::Address replyTo; messaging::Sender sender; messaging::Receiver receiver; }; }} // namespace qpid::tests #endif /*!TESTS_MESSAGINGFIXTURE_H*/