diff options
Diffstat (limited to 'cpp/src/tests/MessagingFixture.h')
-rw-r--r-- | cpp/src/tests/MessagingFixture.h | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/cpp/src/tests/MessagingFixture.h b/cpp/src/tests/MessagingFixture.h index 715de09bad..2312a87e9d 100644 --- a/cpp/src/tests/MessagingFixture.h +++ b/cpp/src/tests/MessagingFixture.h @@ -27,15 +27,19 @@ #include "qpid/client/Connection.h" #include "qpid/client/Session.h" #include "qpid/framing/Uuid.h" +#include "qpid/messaging/Address.h" #include "qpid/messaging/Connection.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Message.h" +#include "qpid/types/Variant.h" namespace qpid { namespace tests { +using qpid::types::Variant; + struct BrokerAdmin { qpid::client::Connection connection; @@ -223,6 +227,119 @@ inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = } } + +class MethodInvoker +{ + public: + MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"), + sender(session.createSender("qmf.default.direct/broker")), + receiver(session.createReceiver(replyTo)) {} + + void createExchange(const std::string& name, const std::string& type, bool durable=false) + { + Variant::Map params; + params["name"]=name; + params["type"]="exchange"; + params["properties"] = Variant::Map(); + params["properties"].asMap()["exchange-type"] = type; + params["properties"].asMap()["durable"] = durable; + methodRequest("create", params); + } + + void deleteExchange(const std::string& name) + { + Variant::Map params; + params["name"]=name; + params["type"]="exchange"; + methodRequest("delete", params); + } + + void createQueue(const std::string& name, bool durable=false, bool autodelete=false, + const Variant::Map& options=Variant::Map()) + { + Variant::Map params; + params["name"]=name; + params["type"]="queue"; + params["properties"] = options; + params["properties"].asMap()["durable"] = durable; + params["properties"].asMap()["auto-delete"] = autodelete; + methodRequest("create", params); + } + + void deleteQueue(const std::string& name) + { + Variant::Map params; + params["name"]=name; + params["type"]="queue"; + methodRequest("delete", params); + } + + void bind(const std::string& exchange, const std::string& queue, const std::string& key, + const Variant::Map& options=Variant::Map()) + { + Variant::Map params; + params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str(); + params["type"]="binding"; + params["properties"] = options; + methodRequest("create", params); + } + + void unbind(const std::string& exchange, const std::string& queue, const std::string& key) + { + Variant::Map params; + params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str(); + params["type"]="binding"; + methodRequest("delete", params); + } + + void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0) + { + Variant::Map content; + Variant::Map objectId; + objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker"; + content["_object_id"] = objectId; + content["_method_name"] = method; + content["_arguments"] = inParams; + + messaging::Message request; + request.setReplyTo(replyTo); + request.getProperties()["x-amqp-0-10.app-id"] = "qmf2"; + request.getProperties()["qmf.opcode"] = "_method_request"; + encode(content, request); + + sender.send(request); + + messaging::Message response; + if (receiver.fetch(response, messaging::Duration::SECOND*5)) { + if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") { + std::string opcode = response.getProperties()["qmf.opcode"]; + if (opcode == "_method_response") { + if (outParams) { + Variant::Map m; + decode(response, m); + *outParams = m["_arguments"].asMap(); + } + } else if (opcode == "_exception") { + Variant::Map m; + decode(response, m); + throw Exception(QPID_MSG("Error: " << m["_values"])); + } else { + throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode)); + } + } else { + throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id=" + << response.getProperties()["x-amqp-0-10.app-id"])); + } + } else { + throw Exception(QPID_MSG("No response received")); + } + } + private: + messaging::Address replyTo; + messaging::Sender sender; + messaging::Receiver receiver; +}; + }} // namespace qpid::tests #endif /*!TESTS_MESSAGINGFIXTURE_H*/ |