diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/examples/messaging/CMakeLists.txt | 2 | ||||
-rw-r--r-- | cpp/examples/messaging/Makefile.am | 8 | ||||
-rw-r--r-- | cpp/examples/messaging/queue_listener.cpp | 82 | ||||
-rw-r--r-- | cpp/examples/messaging/topic_listener.cpp | 79 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/MessageListener.h | 49 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/Receiver.h | 8 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/Session.h | 20 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 72 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Receiver.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/ReceiverImpl.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Session.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/SessionImpl.h | 3 | ||||
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 57 |
17 files changed, 20 insertions, 397 deletions
diff --git a/cpp/examples/messaging/CMakeLists.txt b/cpp/examples/messaging/CMakeLists.txt index e7885d0b50..31310d4ae2 100644 --- a/cpp/examples/messaging/CMakeLists.txt +++ b/cpp/examples/messaging/CMakeLists.txt @@ -17,11 +17,9 @@ # under the License. # -add_example(messaging queue_listener) add_example(messaging queue_receiver) add_example(messaging queue_sender) -add_example(messaging topic_listener) add_example(messaging topic_receiver) add_example(messaging topic_sender) diff --git a/cpp/examples/messaging/Makefile.am b/cpp/examples/messaging/Makefile.am index a16b0b3bb1..70a7fd59a6 100644 --- a/cpp/examples/messaging/Makefile.am +++ b/cpp/examples/messaging/Makefile.am @@ -21,23 +21,17 @@ examplesdir=$(pkgdatadir)/examples/messaging MAKELDFLAGS=$(CLIENTFLAGS) include $(top_srcdir)/examples/makedist.mk -noinst_PROGRAMS=queue_sender queue_listener queue_receiver topic_sender topic_listener topic_receiver client server map_sender map_receiver +noinst_PROGRAMS=queue_sender queue_receiver topic_sender topic_receiver client server map_sender map_receiver queue_sender_SOURCES=queue_sender.cpp queue_sender_LDADD=$(CLIENT_LIB) -queue_listener_SOURCES=queue_listener.cpp -queue_listener_LDADD=$(CLIENT_LIB) - queue_receiver_SOURCES=queue_receiver.cpp queue_receiver_LDADD=$(CLIENT_LIB) topic_sender_SOURCES=topic_sender.cpp topic_sender_LDADD=$(CLIENT_LIB) -topic_listener_SOURCES=topic_listener.cpp -topic_listener_LDADD=$(CLIENT_LIB) - topic_receiver_SOURCES=topic_receiver.cpp topic_receiver_LDADD=$(CLIENT_LIB) diff --git a/cpp/examples/messaging/queue_listener.cpp b/cpp/examples/messaging/queue_listener.cpp deleted file mode 100644 index 92a0eed5ed..0000000000 --- a/cpp/examples/messaging/queue_listener.cpp +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/messaging/Connection.h> -#include <qpid/messaging/Session.h> -#include <qpid/messaging/Message.h> -#include <qpid/messaging/MessageListener.h> -#include <qpid/messaging/Receiver.h> - -#include <cstdlib> -#include <iostream> - -using namespace qpid::messaging; - -class Listener : public MessageListener -{ - public: - Listener(const Receiver& receiver); - void received(Message& message); - bool isFinished(); - private: - Receiver receiver; - bool finished; -}; - -Listener::Listener(const Receiver& r) : receiver(r), finished(false) {} - -bool Listener::isFinished() { return finished; } - -void Listener::received(Message& message) -{ - std::cout << "Message: " << message.getContent() << std::endl; - if (message.getContent() == "That's all, folks!") { - std::cout << "Shutting down listener" << std::endl; - receiver.cancel(); - finished = true; - } -} - -int main(int argc, char** argv) { - const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; - - try { - Connection connection = Connection::open(url); - Session session = connection.newSession(); - - Receiver receiver = session.createReceiver("message_queue"); - Listener listener(receiver); - receiver.setListener(&listener); - receiver.setCapacity(1); - receiver.start(); - while (session.dispatch()) { - session.acknowledge(); - if (listener.isFinished()) break; - } - connection.close(); - return 0; - } catch(const std::exception& error) { - std::cout << error.what() << std::endl; - } - return 1; -} - - diff --git a/cpp/examples/messaging/topic_listener.cpp b/cpp/examples/messaging/topic_listener.cpp deleted file mode 100644 index 4c97caef7c..0000000000 --- a/cpp/examples/messaging/topic_listener.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/messaging/Connection.h> -#include <qpid/messaging/Message.h> -#include <qpid/messaging/MessageListener.h> -#include <qpid/messaging/Session.h> -#include <qpid/messaging/Receiver.h> -#include <qpid/messaging/Variant.h> - -#include <cstdlib> -#include <iostream> - -using namespace qpid::messaging; - -class Listener : public MessageListener -{ - public: - Listener(const Receiver& receiver); - void received(Message& message); - bool isFinished(); - private: - Receiver receiver; - bool finished; -}; - -Listener::Listener(const Receiver& r) : receiver(r), finished(false) {} - -bool Listener::isFinished() { return finished; } - -void Listener::received(Message& message) -{ - std::cout << "Message: " << message.getContent() << std::endl; - if (message.getContent() == "That's all, folks!") { - std::cout << "Shutting down listener" << std::endl; - receiver.cancel(); - finished = true; - } -} - -int main(int argc, char** argv) { - const std::string url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; - const std::string pattern = argc>2 ? argv[2] : "#.#"; - - try { - Connection connection = Connection::open(url); - Session session = connection.newSession(); - - Receiver receiver = session.createReceiver("news_service {filter:[control, " + pattern + "]}"); - Listener listener(receiver); - receiver.setListener(&listener); - receiver.setCapacity(1); - receiver.start(); - while (session.dispatch() && !listener.isFinished()) ; - connection.close(); - return 0; - } catch(const std::exception& error) { - std::cout << error.what() << std::endl; - } - return 1; -} diff --git a/cpp/include/qpid/messaging/MessageListener.h b/cpp/include/qpid/messaging/MessageListener.h deleted file mode 100644 index 72811e7b9c..0000000000 --- a/cpp/include/qpid/messaging/MessageListener.h +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef QPID_MESSAGING_MESSAGELISTENER_H -#define QPID_MESSAGING_MESSAGELISTENER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/client/ClientImportExport.h" - -namespace qpid { -namespace messaging { - -class Message; - -/** - * To use a push style interface for receiving messages, applications - * provide implementations of this interface and pass an implementing - * instance to MessageSource::subscribe(). - * - * Messages arriving for that subscription will then be passed to the - * implementation via the received() method. - */ -class MessageListener -{ - public: - QPID_CLIENT_EXTERN virtual ~MessageListener() {} - virtual void received(Message&) = 0; - private: -}; - -}} // namespace qpid::messaging - -#endif /*!QPID_MESSAGING_MESSAGELISTENER_H*/ diff --git a/cpp/include/qpid/messaging/Receiver.h b/cpp/include/qpid/messaging/Receiver.h index a4fdd7a34b..1d72e5fc49 100644 --- a/cpp/include/qpid/messaging/Receiver.h +++ b/cpp/include/qpid/messaging/Receiver.h @@ -36,7 +36,6 @@ template <class> class PrivateImplRef; namespace messaging { class Message; -class MessageListener; class ReceiverImpl; /** @@ -121,13 +120,6 @@ class Receiver : public qpid::client::Handle<ReceiverImpl> * Cancels this receiver. */ QPID_CLIENT_EXTERN void cancel(); - - /** - * Set a message listener for this receiver. - * - * @see Session::dispatch() - */ - QPID_CLIENT_EXTERN void setListener(MessageListener* listener); private: friend class qpid::client::PrivateImplRef<Receiver>; }; diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h index d77ddf3e43..4b23d355ec 100644 --- a/cpp/include/qpid/messaging/Session.h +++ b/cpp/include/qpid/messaging/Session.h @@ -45,7 +45,7 @@ class Subscription; /** * A session represents a distinct 'conversation' which can involve - * sending and receiving messages from different sources and sinks. + * sending and receiving messages to and from different addresses. */ class Session : public qpid::client::Handle<SessionImpl> { @@ -85,10 +85,22 @@ class Session : public qpid::client::Handle<SessionImpl> * has not yet been confirmed as processed by the server. */ QPID_CLIENT_EXTERN uint32_t pendingAck(); - QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); - QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); - QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + /** + * Retrieves the receiver for the next available message. If there + * are no available messages at present the call will block for up + * to the specified timeout waiting for one to arrive. Returns + * true if a message was available at the point of return, in + * which case the passed in receiver reference will be set to the + * receiver for that message or fals if no message was available. + */ QPID_CLIENT_EXTERN bool nextReceiver(Receiver&, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + /** + * Returns the receiver for the next available message. If there + * are no available messages at present the call will block for up + * to the specified timeout waiting for one to arrive. Will throw + * Receiver::NoMessageAvailable if no message became available in + * time. + */ QPID_CLIENT_EXTERN Receiver nextReceiver(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 4b859cda47..a15d722f5d 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -799,7 +799,6 @@ nobase_include_HEADERS += \ ../include/qpid/messaging/MapContent.h \ ../include/qpid/messaging/MapView.h \ ../include/qpid/messaging/Message.h \ - ../include/qpid/messaging/MessageListener.h \ ../include/qpid/messaging/Sender.h \ ../include/qpid/messaging/Receiver.h \ ../include/qpid/messaging/Session.h \ diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index f294d7e273..83b245aa02 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -22,7 +22,6 @@ #include "AddressResolution.h" #include "MessageSource.h" #include "SessionImpl.h" -#include "qpid/messaging/MessageListener.h" #include "qpid/messaging/Receiver.h" namespace qpid { @@ -115,8 +114,6 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve } } -void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; } -qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; } const std::string& ReceiverImpl::getName() const { return destination; } @@ -139,7 +136,7 @@ ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, const qpid::messaging::Address& a) : parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF), - state(UNRESOLVED), capacity(0), listener(0), window(0) {} + state(UNRESOLVED), capacity(0), window(0) {} bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout) { diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index d05fd3d045..3a18368116 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -62,8 +62,6 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl uint32_t getCapacity(); uint32_t available(); uint32_t pendingAck(); - void setListener(qpid::messaging::MessageListener* listener); - qpid::messaging::MessageListener* getListener(); void received(qpid::messaging::Message& message); private: SessionImpl& parent; diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 7f8e5f4e79..d0085dad75 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -30,7 +30,6 @@ #include "qpid/messaging/Address.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" -#include "qpid/messaging/MessageListener.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Session.h" @@ -177,13 +176,6 @@ Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address) return sender; } -qpid::messaging::Address SessionImpl::createTempQueue(const std::string& baseName) -{ - std::string name = baseName + std::string("_") + session.getId().getName(); - session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true); - return qpid::messaging::Address(name); -} - SessionImpl& SessionImpl::convert(qpid::messaging::Session& s) { boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s); @@ -225,16 +217,10 @@ bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageT bool SessionImpl::accept(ReceiverImpl* receiver, qpid::messaging::Message* message, - bool isDispatch, IncomingMessages::MessageTransfer& transfer) { if (receiver->getName() == transfer.getDestination()) { transfer.retrieve(message); - if (isDispatch) { - qpid::sys::Mutex::ScopedUnlock u(lock); - qpid::messaging::MessageListener* listener = receiver->getListener(); - if (listener) listener->received(*message); - } receiver->received(*message); return true; } else { @@ -242,18 +228,6 @@ bool SessionImpl::accept(ReceiverImpl* receiver, } } -bool SessionImpl::acceptAny(qpid::messaging::Message* message, bool isDispatch, IncomingMessages::MessageTransfer& transfer) -{ - Receivers::iterator i = receivers.find(transfer.getDestination()); - if (i == receivers.end()) { - QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination()); - return false; - } else { - boost::intrusive_ptr<ReceiverImpl> receiver = getImplPtr<Receiver, ReceiverImpl>(i->second); - return receiver && (!isDispatch || receiver->getListener()) && accept(receiver.get(), message, isDispatch, transfer); - } -} - bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout) { return incoming.get(handler, timeout); @@ -261,37 +235,10 @@ bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Dur bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout) { - IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, false, _1)); + IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1)); return getIncoming(handler, timeout); } -bool SessionImpl::dispatch(qpid::sys::Duration timeout) -{ - qpid::sys::Mutex::ScopedLock l(lock); - while (true) { - try { - qpid::messaging::Message message; - IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, true, _1)); - return getIncoming(handler, timeout); - } catch (TransportFailure&) { - reconnect(); - } - } -} - -bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout) -{ - qpid::sys::Mutex::ScopedLock l(lock); - while (true) { - try { - IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, false, _1)); - return getIncoming(handler, timeout); - } catch (TransportFailure&) { - reconnect(); - } - } -} - bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout) { qpid::sys::Mutex::ScopedLock l(lock); @@ -418,13 +365,6 @@ void SessionImpl::rejectImpl(qpid::messaging::Message& m) session.messageReject(set); } -qpid::messaging::Message SessionImpl::fetch(qpid::sys::Duration timeout) -{ - qpid::messaging::Message result; - if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable(); - return result; -} - void SessionImpl::receiverCancelled(const std::string& name) { receivers.erase(name); @@ -442,14 +382,4 @@ void SessionImpl::reconnect() connection.reconnect(); } -void* SessionImpl::getLastConfirmedSent() -{ - return 0; -} - -void* SessionImpl::getLastConfirmedAcknowledged() -{ - return 0; -} - }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index ec9a6162c1..f3018b9685 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -62,21 +62,12 @@ class SessionImpl : public qpid::messaging::SessionImpl void close(); void sync(); void flush(); - qpid::messaging::Address createTempQueue(const std::string& baseName); qpid::messaging::Sender createSender(const qpid::messaging::Address& address); qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address); - void* getLastConfirmedSent(); - void* getLastConfirmedAcknowledged(); - - bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout); - qpid::messaging::Message fetch(qpid::sys::Duration timeout); - bool dispatch(qpid::sys::Duration timeout); - bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout); qpid::messaging::Receiver nextReceiver(qpid::sys::Duration timeout); - bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout); void receiverCancelled(const std::string& name); @@ -116,8 +107,7 @@ class SessionImpl : public qpid::messaging::SessionImpl Receivers receivers; Senders senders; - bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); - bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); + bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&); bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout); bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); void reconnect(); diff --git a/cpp/src/qpid/messaging/Receiver.cpp b/cpp/src/qpid/messaging/Receiver.cpp index 3290ea98ac..76750cfc59 100644 --- a/cpp/src/qpid/messaging/Receiver.cpp +++ b/cpp/src/qpid/messaging/Receiver.cpp @@ -49,6 +49,5 @@ uint32_t Receiver::getCapacity() { return impl->getCapacity(); } uint32_t Receiver::available() { return impl->available(); } uint32_t Receiver::pendingAck() { return impl->pendingAck(); } void Receiver::cancel() { impl->cancel(); } -void Receiver::setListener(MessageListener* listener) { impl->setListener(listener); } }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/ReceiverImpl.h b/cpp/src/qpid/messaging/ReceiverImpl.h index 7db20acc29..e463559d99 100644 --- a/cpp/src/qpid/messaging/ReceiverImpl.h +++ b/cpp/src/qpid/messaging/ReceiverImpl.h @@ -48,7 +48,6 @@ class ReceiverImpl : public virtual qpid::RefCounted virtual uint32_t available() = 0; virtual uint32_t pendingAck() = 0; virtual void cancel() = 0; - virtual void setListener(MessageListener*) = 0; }; }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp index aa8e067168..53e85d53b1 100644 --- a/cpp/src/qpid/messaging/Session.cpp +++ b/cpp/src/qpid/messaging/Session.cpp @@ -75,21 +75,6 @@ void Session::flush() impl->flush(); } -bool Session::fetch(Message& message, qpid::sys::Duration timeout) -{ - return impl->fetch(message, timeout); -} - -Message Session::fetch(qpid::sys::Duration timeout) -{ - return impl->fetch(timeout); -} - -bool Session::dispatch(qpid::sys::Duration timeout) -{ - return impl->dispatch(timeout); -} - bool Session::nextReceiver(Receiver& receiver, qpid::sys::Duration timeout) { return impl->nextReceiver(receiver, timeout); diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h index cf95e22ae8..b68baf821c 100644 --- a/cpp/src/qpid/messaging/SessionImpl.h +++ b/cpp/src/qpid/messaging/SessionImpl.h @@ -48,9 +48,6 @@ class SessionImpl : public virtual qpid::RefCounted virtual void close() = 0; virtual void sync() = 0; virtual void flush() = 0; - virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0; - virtual Message fetch(qpid::sys::Duration timeout) = 0; - virtual bool dispatch(qpid::sys::Duration timeout) = 0; virtual Sender createSender(const Address& address) = 0; virtual Receiver createReceiver(const Address& address) = 0; virtual bool nextReceiver(Receiver& receiver, qpid::sys::Duration timeout) = 0; diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index dd7166df46..00a8481c93 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -28,7 +28,6 @@ #include "qpid/messaging/MapContent.h" #include "qpid/messaging/MapView.h" #include "qpid/messaging/Message.h" -#include "qpid/messaging/MessageListener.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/Session.h" @@ -187,16 +186,6 @@ struct MultiQueueFixture : MessagingFixture } }; - -struct MessageDataCollector : MessageListener -{ - std::vector<std::string> messageData; - - void received(Message& message) { - messageData.push_back(message.getContent()); - } -}; - std::vector<std::string> fetch(Receiver& receiver, int count, qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5) { std::vector<std::string> data; @@ -308,52 +297,6 @@ QPID_AUTO_TEST_CASE(testSimpleTopic) //TODO: check pending messages... } -QPID_AUTO_TEST_CASE(testSessionFetch) -{ - MultiQueueFixture fix; - - for (uint i = 0; i < fix.queues.size(); i++) { - Receiver r = fix.session.createReceiver(fix.queues[i]); - r.setCapacity(10u); - r.start();//TODO: add Session::start - } - - for (uint i = 0; i < fix.queues.size(); i++) { - Sender s = fix.session.createSender(fix.queues[i]); - Message msg((boost::format("Message_%1%") % (i+1)).str()); - s.send(msg); - } - - for (uint i = 0; i < fix.queues.size(); i++) { - Message msg; - BOOST_CHECK(fix.session.fetch(msg, qpid::sys::TIME_SEC)); - BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); - } -} - -QPID_AUTO_TEST_CASE(testSessionDispatch) -{ - MultiQueueFixture fix; - - MessageDataCollector collector; - for (uint i = 0; i < fix.queues.size(); i++) { - Receiver r = fix.session.createReceiver(fix.queues[i]); - r.setListener(&collector); - r.setCapacity(10u); - r.start();//TODO: add Session::start - } - - for (uint i = 0; i < fix.queues.size(); i++) { - Sender s = fix.session.createSender(fix.queues[i]); - Message msg((boost::format("Message_%1%") % (i+1)).str()); - s.send(msg); - } - - while (fix.session.dispatch(qpid::sys::TIME_SEC)) ; - - BOOST_CHECK_EQUAL(collector.messageData, boost::assign::list_of<std::string>("Message_1")("Message_2")("Message_3")); -} - QPID_AUTO_TEST_CASE(testNextReceiver) { MultiQueueFixture fix; |