diff options
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 101 | ||||
-rw-r--r-- | qpid/cpp/src/Makefile.am | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessagingFixture.h | 117 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 47 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-ctrl | 5 | ||||
-rw-r--r-- | qpid/cpp/src/tests/sender.cpp | 2 |
9 files changed, 288 insertions, 18 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 4b5a1b1c2c..d0ca2d9c2b 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -313,10 +313,6 @@ if (NOT Boost_FILESYSTEM_LIBRARY) set(Boost_FILESYSTEM_LIBRARY boost_filesystem) endif (NOT Boost_FILESYSTEM_LIBRARY) -if (NOT Boost_SYSTEM_LIBRARY) - set(Boost_SYSTEM_LIBRARY boost_system) -endif (NOT Boost_SYSTEM_LIBRARY) - if (NOT Boost_UNIT_TEST_FRAMEWORK_LIBRARY) set(Boost_UNIT_TEST_FRAMEWORK_LIBRARY boost_unit_test_framework) endif (NOT Boost_UNIT_TEST_FRAMEWORK_LIBRARY) @@ -602,14 +598,14 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) set (CMAKE_CXX_FLAGS_RELWITHDEBINFO "/MD /O2 /Ob2 /D NDEBUG") set (CMAKE_SHARED_LINKER_FLAGS_RELWITHDEBINFO "/debug /INCREMENTAL:NO") - # Set the windows version for the .NET Binding cpp project - configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../bindings/qpid/dotnet/src/org.apache.qpid.messaging.template.rc - ${CMAKE_CURRENT_BINARY_DIR}/windows/resources/org.apache.qpid.messaging.rc) - - # Set the windows version for the .NET Binding sessionreceiver project - configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../bindings/qpid/dotnet/src/sessionreceiver/properties/sessionreceiver-AssemblyInfo-template.cs - ${CMAKE_CURRENT_BINARY_DIR}/windows/generated_src/sessionreceiver-AssemblyInfo.cs) - + if (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/../bindings/qpid/dotnet/src) + # Set the windows version for the .NET Binding cpp project + configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../bindings/qpid/dotnet/src/org.apache.qpid.messaging.template.rc + ${CMAKE_CURRENT_BINARY_DIR}/windows/resources/org.apache.qpid.messaging.rc) + # Set the windows version for the .NET Binding sessionreceiver project + configure_file(${CMAKE_CURRENT_SOURCE_DIR}/../bindings/qpid/dotnet/src/sessionreceiver/properties/sessionreceiver-AssemblyInfo-template.cs + ${CMAKE_CURRENT_BINARY_DIR}/windows/generated_src/sessionreceiver-AssemblyInfo.cs) + endif (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/../bindings/qpid/dotnet/src) endif (MSVC) set (qpidtypes_platform_SOURCES @@ -1068,13 +1064,15 @@ endif (CPACK_GENERATOR STREQUAL "NSIS") # REVISION => Version of underlying implementation. # Bump if implementation changes but API/ABI doesn't # AGE => Number of API/ABI versions this is backward compatible with -set (qmf_version 1.0.0) +set (qmf_version 2.0.0) +set (qmf2_version 1.0.0) set (qmfengine_version 1.0.0) set (qmf_SOURCES qpid/agent/ManagementAgentImpl.cpp qpid/agent/ManagementAgentImpl.h ) + add_msvc_version (qmf library dll) add_library (qmf SHARED ${qmf_SOURCES}) target_link_libraries (qmf qpidclient) @@ -1085,6 +1083,83 @@ install (TARGETS qmf OPTIONAL COMPONENT ${QPID_COMPONENT_QMF}) install_pdb (qmf ${QPID_COMPONENT_QMF}) +if(NOT WIN32) + set (qmf2_HEADERS + ../include/qmf/AgentEvent.h + ../include/qmf/Agent.h + ../include/qmf/AgentSession.h + ../include/qmf/ConsoleEvent.h + ../include/qmf/ConsoleSession.h + ../include/qmf/DataAddr.h + ../include/qmf/Data.h + ../include/qmf/exceptions.h + ../include/qmf/Handle.h + ../include/qmf/ImportExport.h + ../include/qmf/Query.h + ../include/qmf/Schema.h + ../include/qmf/SchemaId.h + ../include/qmf/SchemaMethod.h + ../include/qmf/SchemaProperty.h + ../include/qmf/SchemaTypes.h + ../include/qmf/Subscription.h + ) + + set (qmf2_SOURCES + ${qmf2_HEADERS} + qmf/agentCapability.h + qmf/Agent.cpp + qmf/AgentEvent.cpp + qmf/AgentEventImpl.h + qmf/AgentImpl.h + qmf/AgentSession.cpp + qmf/AgentSubscription.cpp + qmf/AgentSubscription.h + qmf/ConsoleEvent.cpp + qmf/ConsoleEventImpl.h + qmf/ConsoleSession.cpp + qmf/ConsoleSessionImpl.h + qmf/constants.cpp + qmf/constants.h + qmf/DataAddr.cpp + qmf/DataAddrImpl.h + qmf/Data.cpp + qmf/DataImpl.h + qmf/exceptions.cpp + qmf/Expression.cpp + qmf/Expression.h + qmf/Hash.cpp + qmf/Hash.h + qmf/PrivateImplRef.h + qmf/Query.cpp + qmf/QueryImpl.h + qmf/Schema.cpp + qmf/SchemaCache.cpp + qmf/SchemaCache.h + qmf/SchemaId.cpp + qmf/SchemaIdImpl.h + qmf/SchemaImpl.h + qmf/SchemaMethod.cpp + qmf/SchemaMethodImpl.h + qmf/SchemaProperty.cpp + qmf/SchemaPropertyImpl.h + qmf/Subscription.cpp + qmf/SubscriptionImpl.h + ) + + add_msvc_version (qmf2 library dll) + add_library (qmf2 SHARED ${qmf2_SOURCES}) + target_link_libraries (qmf2 qpidmessaging qpidtypes qpidclient qpidcommon) + set_target_properties (qmf2 PROPERTIES + VERSION ${qmf2_version}) + install (TARGETS qmf2 OPTIONAL + DESTINATION ${QPID_INSTALL_LIBDIR} + COMPONENT ${QPID_COMPONENT_QMF}) + install (FILES ${qmf2_HEADERS} + DESTINATION ${QPID_INSTALL_INCLUDEDIR}/qmf + COMPONENT ${QPID_COMPONENT_QMF}) + install_pdb (qmf2 ${QPID_COMPONENT_QMF}) +endif (NOT WIN32) + set (qmfengine_SOURCES qmf/engine/Agent.cpp qmf/engine/BrokerProxyImpl.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 6fafff7d54..dfb2547613 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -56,7 +56,10 @@ windows_dist = \ qpid/broker/windows/BrokerDefaults.cpp \ qpid/broker/windows/SaslAuthenticator.cpp \ qpid/broker/windows/SslProtocolFactory.cpp \ - qpid/messaging/HandleInstantiator.cpp + qpid/messaging/HandleInstantiator.cpp \ + windows/resources/template-resource.rc \ + windows/resources/version-resource.h \ + windows/resources/qpid-icon.ico EXTRA_DIST= $(platform_dist) $(rgen_srcs) $(windows_dist) diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 23c999a98a..8b4defaa73 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -31,6 +31,7 @@ #include <qpid/broker/Message.h> #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Thread.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/AclModule.h" #include "qpid/types/Variant.h" @@ -2237,6 +2238,7 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); + setManagementExecutionContext((const qpid::broker::ConnectionState*) msg.getPublisher()); const framing::FieldTable *headers = msg.getApplicationHeaders(); if (headers && msg.getAppId() == "qmf2") { @@ -3085,3 +3087,21 @@ bool ManagementAgent::moveDeletedObjectsLH() { } return !deleteList.empty(); } + +namespace qpid { +namespace management { + +namespace { +QPID_TSS const qpid::broker::ConnectionState* executionContext = 0; +} + +void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt) +{ + executionContext = ctxt; +} +const qpid::broker::ConnectionState* getManagementExecutionContext() +{ + return executionContext; +} + +}} diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 0db19594a7..fb15dc6ed1 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -41,6 +41,9 @@ #include <map> namespace qpid { +namespace broker { +class ConnectionState; +} namespace management { class ManagementAgent @@ -422,6 +425,8 @@ private: void debugSnapshot(const char* title); }; +void setManagementExecutionContext(const qpid::broker::ConnectionState*); +const qpid::broker::ConnectionState* getManagementExecutionContext(); }} - + #endif /*!_ManagementAgent_*/ diff --git a/qpid/cpp/src/tests/MessagingFixture.h b/qpid/cpp/src/tests/MessagingFixture.h index 715de09bad..2312a87e9d 100644 --- a/qpid/cpp/src/tests/MessagingFixture.h +++ b/qpid/cpp/src/tests/MessagingFixture.h @@ -27,15 +27,19 @@ #include "qpid/client/Connection.h" #include "qpid/client/Session.h" #include "qpid/framing/Uuid.h" +#include "qpid/messaging/Address.h" #include "qpid/messaging/Connection.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Message.h" +#include "qpid/types/Variant.h" namespace qpid { namespace tests { +using qpid::types::Variant; + struct BrokerAdmin { qpid::client::Connection connection; @@ -223,6 +227,119 @@ inline void receive(messaging::Receiver& receiver, uint count = 1, uint start = } } + +class MethodInvoker +{ + public: + MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"), + sender(session.createSender("qmf.default.direct/broker")), + receiver(session.createReceiver(replyTo)) {} + + void createExchange(const std::string& name, const std::string& type, bool durable=false) + { + Variant::Map params; + params["name"]=name; + params["type"]="exchange"; + params["properties"] = Variant::Map(); + params["properties"].asMap()["exchange-type"] = type; + params["properties"].asMap()["durable"] = durable; + methodRequest("create", params); + } + + void deleteExchange(const std::string& name) + { + Variant::Map params; + params["name"]=name; + params["type"]="exchange"; + methodRequest("delete", params); + } + + void createQueue(const std::string& name, bool durable=false, bool autodelete=false, + const Variant::Map& options=Variant::Map()) + { + Variant::Map params; + params["name"]=name; + params["type"]="queue"; + params["properties"] = options; + params["properties"].asMap()["durable"] = durable; + params["properties"].asMap()["auto-delete"] = autodelete; + methodRequest("create", params); + } + + void deleteQueue(const std::string& name) + { + Variant::Map params; + params["name"]=name; + params["type"]="queue"; + methodRequest("delete", params); + } + + void bind(const std::string& exchange, const std::string& queue, const std::string& key, + const Variant::Map& options=Variant::Map()) + { + Variant::Map params; + params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str(); + params["type"]="binding"; + params["properties"] = options; + methodRequest("create", params); + } + + void unbind(const std::string& exchange, const std::string& queue, const std::string& key) + { + Variant::Map params; + params["name"]=(boost::format("%1%/%2%/%3%") % (exchange) % (queue) % (key)).str(); + params["type"]="binding"; + methodRequest("delete", params); + } + + void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0) + { + Variant::Map content; + Variant::Map objectId; + objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker"; + content["_object_id"] = objectId; + content["_method_name"] = method; + content["_arguments"] = inParams; + + messaging::Message request; + request.setReplyTo(replyTo); + request.getProperties()["x-amqp-0-10.app-id"] = "qmf2"; + request.getProperties()["qmf.opcode"] = "_method_request"; + encode(content, request); + + sender.send(request); + + messaging::Message response; + if (receiver.fetch(response, messaging::Duration::SECOND*5)) { + if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") { + std::string opcode = response.getProperties()["qmf.opcode"]; + if (opcode == "_method_response") { + if (outParams) { + Variant::Map m; + decode(response, m); + *outParams = m["_arguments"].asMap(); + } + } else if (opcode == "_exception") { + Variant::Map m; + decode(response, m); + throw Exception(QPID_MSG("Error: " << m["_values"])); + } else { + throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode)); + } + } else { + throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id=" + << response.getProperties()["x-amqp-0-10.app-id"])); + } + } else { + throw Exception(QPID_MSG("No response received")); + } + } + private: + messaging::Address replyTo; + messaging::Sender sender; + messaging::Receiver receiver; +}; + }} // namespace qpid::tests #endif /*!TESTS_MESSAGINGFIXTURE_H*/ diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 991ec847bf..f9a8b0e4c1 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -890,6 +890,53 @@ QPID_AUTO_TEST_CASE(testAcknowledge) BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE)); } +QPID_AUTO_TEST_CASE(testQmfCreateAndDelete) +{ + MessagingFixture fix(Broker::Options(), true/*enable management*/); + MethodInvoker control(fix.session); + control.createQueue("my-queue"); + control.createExchange("my-exchange", "topic"); + control.bind("my-exchange", "my-queue", "subject1"); + + Sender sender = fix.session.createSender("my-exchange"); + Receiver receiver = fix.session.createReceiver("my-queue"); + Message out; + out.setSubject("subject1"); + out.setContent("one"); + sender.send(out); + Message in; + BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5)); + BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); + control.unbind("my-exchange", "my-queue", "subject1"); + control.bind("my-exchange", "my-queue", "subject2"); + + out.setContent("two"); + sender.send(out);//should be dropped + + out.setSubject("subject2"); + out.setContent("three"); + sender.send(out);//should not be dropped + + BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5)); + BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); + BOOST_CHECK(!receiver.fetch(in, Duration::IMMEDIATE)); + sender.close(); + receiver.close(); + + control.deleteExchange("my-exchange"); + messaging::Session other = fix.connection.createSession(); + { + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(other.createSender("my-exchange"), qpid::messaging::NotFound); + } + control.deleteQueue("my-queue"); + other = fix.connection.createSession(); + { + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(other.createReceiver("my-queue"), qpid::messaging::NotFound); + } +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 4d63d9bd97..e4e9897195 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -249,7 +249,7 @@ QPID_AUTO_TEST_CASE(testBound){ exchange2.reset(); //unbind the queue from all exchanges it knows it has been bound to: - queue->unbind(exchanges, queue); + queue->unbind(exchanges); //ensure the remaining exchanges don't still have the queue bound to them: FailOnDeliver deliverable; diff --git a/qpid/cpp/src/tests/qpid-ctrl b/qpid/cpp/src/tests/qpid-ctrl index 7b46c190fb..4246c57898 100755 --- a/qpid/cpp/src/tests/qpid-ctrl +++ b/qpid/cpp/src/tests/qpid-ctrl @@ -92,7 +92,10 @@ try: arguments = {} for a in args: name, val = nameval(a) - arguments[name] = val + if val[0] == '{' or val[0] == '[': + arguments[name] = eval(val) + else: + arguments[name] = val content = { "_object_id": {"_object_name": object_name}, "_method_name": method_name, diff --git a/qpid/cpp/src/tests/sender.cpp b/qpid/cpp/src/tests/sender.cpp index 9850e851da..063b5e87dc 100644 --- a/qpid/cpp/src/tests/sender.cpp +++ b/qpid/cpp/src/tests/sender.cpp @@ -120,7 +120,7 @@ void Sender::execute(AsyncSession& session, bool isRetry) string data; while (getline(std::cin, data)) { message.setData(data); - message.getHeaders().setInt("SN", ++sent); + //message.getHeaders().setInt("SN", ++sent); string matchKey; if (lvqMatchValues && getline(lvqMatchValues, matchKey)) { message.getHeaders().setString(QueueOptions::strLVQMatchProperty, matchKey); |