diff options
| author | Gordon Sim <gsim@apache.org> | 2009-09-07 09:15:40 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-09-07 09:15:40 +0000 |
| commit | 5748390087f011559549ec33f538e5a3babd43fe (patch) | |
| tree | 7bb9ba35d450e778865c0c26d72735180582a31a /cpp/src | |
| parent | 32bfd712449b0839d9f302f8168c4f588e3c6070 (diff) | |
| download | qpid-python-5748390087f011559549ec33f538e5a3babd43fe.tar.gz | |
QPID-664: Added automatic reconnection logic.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@812049 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 114 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 26 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 164 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.h | 105 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 39 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.h | 56 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 224 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 107 | ||||
| -rw-r--r-- | cpp/src/tests/Makefile.am | 5 | ||||
| -rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 20 | ||||
| -rw-r--r-- | cpp/src/tests/qpid_stream.cpp | 163 |
13 files changed, 875 insertions, 159 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 9f738731e2..3a735b5698 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -21,14 +21,16 @@ #include "ConnectionImpl.h" #include "SessionImpl.h" #include "qpid/messaging/Session.h" -#include "qpid/client/ConnectionSettings.h" +#include "qpid/client/PrivateImplRef.h" #include "qpid/log/Statement.h" +#include <boost/intrusive_ptr.hpp> namespace qpid { namespace client { namespace amqp0_10 { using qpid::messaging::Variant; +using namespace qpid::sys; template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value) { @@ -56,24 +58,124 @@ void convert(const Variant::Map& from, ConnectionSettings& to) setIfFound(from, "bounds", to.bounds); } -ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) +ConnectionImpl::ConnectionImpl(const std::string& u, const Variant::Map& options) : + url(u), reconnectionEnabled(true), timeout(-1), + minRetryInterval(1), maxRetryInterval(30) { QPID_LOG(debug, "Opening connection to " << url << " with " << options); - Url u(url); - ConnectionSettings settings; convert(options, settings); - connection.open(u, settings); + setIfFound(options, "reconnection-enabled", reconnectionEnabled); + setIfFound(options, "reconnection-timeout", timeout); + setIfFound(options, "min-retry-interval", minRetryInterval); + setIfFound(options, "max-retry-interval", maxRetryInterval); + connection.open(url, settings); } void ConnectionImpl::close() { + qpid::sys::Mutex::ScopedLock l(lock); connection.close(); } +boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session) +{ + return boost::dynamic_pointer_cast<SessionImpl>( + qpid::client::PrivateImplRef<qpid::messaging::Session>::get(session) + ); +} + +void ConnectionImpl::closed(SessionImpl& s) +{ + qpid::sys::Mutex::ScopedLock l(lock); + for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { + if (getImplPtr(*i).get() == &s) { + sessions.erase(i); + break; + } + } +} + qpid::messaging::Session ConnectionImpl::newSession() { - qpid::messaging::Session impl(new SessionImpl(connection.newSession())); + qpid::messaging::Session impl(new SessionImpl(*this)); + { + qpid::sys::Mutex::ScopedLock l(lock); + sessions.push_back(impl); + } + try { + getImplPtr(impl)->setSession(connection.newSession()); + } catch (const TransportFailure&) { + reconnect(); + } return impl; } +void ConnectionImpl::reconnect() +{ + AbsTime start = now(); + ScopedLock<Semaphore> l(semaphore); + if (!connection.isOpen()) connect(start); +} + +bool expired(const AbsTime& start, int timeout) +{ + if (timeout == 0) return true; + if (timeout < 0) return false; + Duration used(start, now()); + Duration allowed = timeout * TIME_SEC; + return allowed > used; +} + +void ConnectionImpl::connect(const AbsTime& started) +{ + for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) { + if (expired(started, timeout)) throw TransportFailure(); + else qpid::sys::sleep(i); + } +} + +bool ConnectionImpl::tryConnect() +{ + if (tryConnect(url) || tryConnect(connection.getKnownBrokers())) { + return resetSessions(); + } else { + return false; + } +} + +bool ConnectionImpl::tryConnect(const Url& u) +{ + try { + QPID_LOG(info, "Trying to connect to " << url << "..."); + connection.open(u, settings); + return true; + } catch (const Exception& e) { + //TODO: need to fix timeout on open so that it throws TransportFailure + QPID_LOG(info, "Failed to connect to " << u << ": " << e.what()); + } + return false; +} + +bool ConnectionImpl::tryConnect(const std::vector<Url>& urls) +{ + for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) { + if (tryConnect(*i)) return true; + } + return false; +} + +bool ConnectionImpl::resetSessions() +{ + try { + qpid::sys::Mutex::ScopedLock l(lock); + for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { + getImplPtr(*i)->setSession(connection.newSession()); + } + return true; + } catch (const TransportFailure&) { + QPID_LOG(debug, "Connection failed while re-inialising sessions"); + return false; + } +} + }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 120a8ab9d8..565f2ec7ec 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -23,20 +23,46 @@ */ #include "qpid/messaging/ConnectionImpl.h" #include "qpid/messaging/Variant.h" +#include "qpid/Url.h" #include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Semaphore.h" +#include <vector> namespace qpid { namespace client { namespace amqp0_10 { +class SessionImpl; + class ConnectionImpl : public qpid::messaging::ConnectionImpl { public: ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options); void close(); qpid::messaging::Session newSession(); + void closed(SessionImpl&); + void reconnect(); private: + typedef std::vector<qpid::messaging::Session> Sessions; + + qpid::sys::Mutex lock;//used to protect data structures + qpid::sys::Semaphore semaphore;//used to coordinate reconnection qpid::client::Connection connection; + qpid::Url url; + qpid::client::ConnectionSettings settings; + Sessions sessions; + bool reconnectionEnabled; + int timeout; + int minRetryInterval; + int maxRetryInterval; + + void connect(const qpid::sys::AbsTime& started); + bool tryConnect(); + bool tryConnect(const std::vector<Url>& urls); + bool tryConnect(const Url&); + bool resetSessions(); }; }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index f9bd355a78..c54a186365 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -82,9 +82,11 @@ struct MatchAndTrack }; } -IncomingMessages::IncomingMessages(qpid::client::AsyncSession s) : - session(s), - incoming(SessionBase_0_10Access(session).get()->getDemux().getDefault()) {} +void IncomingMessages::setSession(qpid::client::AsyncSession s) +{ + session = s; + incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault(); +} bool IncomingMessages::get(Handler& handler, Duration timeout) { @@ -208,6 +210,7 @@ void populateHeaders(qpid::messaging::Message& message, if (messageProperties->hasReplyTo()) { message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo())); } + message.getHeaders().clear(); translate(messageProperties->getApplicationHeaders(), message.getHeaders()); //TODO: convert other message properties } diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index c4346fd7d7..5e28877305 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -67,7 +67,7 @@ class IncomingMessages virtual bool accept(MessageTransfer& transfer) = 0; }; - IncomingMessages(qpid::client::AsyncSession session); + void setSession(qpid::client::AsyncSession session); bool get(Handler& handler, qpid::sys::Duration timeout); //bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout); //bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout); diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index e6ed4bfc4e..31efff38a6 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -19,6 +19,7 @@ * */ #include "ReceiverImpl.h" +#include "AddressResolution.h" #include "MessageSource.h" #include "SessionImpl.h" #include "qpid/messaging/MessageListener.h" @@ -38,11 +39,6 @@ void ReceiverImpl::received(qpid::messaging::Message&) window = capacity; } } - -bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout) -{ - return parent.get(*this, message, timeout); -} qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout) { @@ -50,24 +46,6 @@ qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout) if (!get(result, timeout)) throw Receiver::NoMessageAvailable(); return result; } - -bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout) -{ - if (capacity == 0 && !cancelled) { - session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1); - if (!started) session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit); - } - - if (get(message, timeout)) { - return true; - } else { - if (!cancelled) { - sync(session).messageFlush(destination); - start();//reallocate credit - } - return get(message, 0); - } -} qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout) { @@ -76,71 +54,137 @@ qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout) return result; } +bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + Get f(*this, message, timeout); + while (!parent.execute(f)) {} + return f.result; +} + +bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + Fetch f(*this, message, timeout); + while (!parent.execute(f)) {} + return f.result; +} + void ReceiverImpl::cancel() { - if (!cancelled) { - //TODO: should syncronicity be an optional argument to this call? - source->cancel(session, destination); - //need to be sure cancel is complete and all incoming - //framesets are processed before removing the receiver - parent.receiverCancelled(destination); - cancelled = true; - } + execute<Cancel>(); } void ReceiverImpl::start() { - if (!cancelled) { - started = true; - session.messageSetFlowMode(destination, capacity > 0); + execute<Start>(); +} + +void ReceiverImpl::stop() +{ + execute<Stop>(); +} + +void ReceiverImpl::setCapacity(uint32_t c) +{ + execute1<SetCapacity>(c); +} + +void ReceiverImpl::startFlow() +{ + if (capacity > 0) { + session.messageSetFlowMode(destination, FLOW_MODE_WINDOW); session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity); session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit); window = capacity; } } -void ReceiverImpl::stop() +void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) { - session.messageStop(destination); - started = false; + + session = s; + if (state == UNRESOLVED) { + source = resolver.resolveSource(session, address, filter, options); + state = STOPPED;//TODO: if session is started, go straight to started + } + if (state == CANCELLED) { + source->cancel(session, destination); + parent.receiverCancelled(destination); + } else { + source->subscribe(session, destination); + if (state == STARTED) start(); + } } -void ReceiverImpl::subscribe() +void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; } +qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; } + +const std::string& ReceiverImpl::getName() const { return destination; } + +ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, + const qpid::messaging::Address& a, + const qpid::messaging::Filter* f, + const qpid::messaging::Variant::Map& o) : + + parent(p), destination(name), address(a), filter(f), options(o), byteCredit(0xFFFFFFFF), + state(UNRESOLVED), capacity(0), listener(0), window(0) {} + +bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout) { - source->subscribe(session, destination); + return parent.get(*this, message, timeout); } -void ReceiverImpl::setSession(qpid::client::AsyncSession s) +bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + if (state == CANCELLED) return false;//TODO: or should this be an error? + + if (capacity == 0 || state != STARTED) { + session.messageSetFlowMode(destination, FLOW_MODE_CREDIT); + session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1); + session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF); + } + + if (getImpl(message, timeout)) { + return true; + } else { + sync(session).messageFlush(destination); + startFlow();//reallocate credit + return getImpl(message, 0); + } +} + +void ReceiverImpl::cancelImpl() { - session = s; - if (!cancelled) { - subscribe(); - //if we were in started state before the session was changed, - //start again on this new session - //TODO: locking if receiver is to be threadsafe... - if (started) start(); + if (state != CANCELLED) { + state = CANCELLED; + source->cancel(session, destination); + parent.receiverCancelled(destination); } } -void ReceiverImpl::setCapacity(uint32_t c) +void ReceiverImpl::startImpl() +{ + if (state == STOPPED) { + state = STARTED; + startFlow(); + } +} + +void ReceiverImpl::stopImpl() +{ + state = STOPPED; + session.messageStop(destination); +} + +void ReceiverImpl::setCapacityImpl(uint32_t c) { if (c != capacity) { capacity = c; - if (!cancelled && started) { - stop(); - start(); + if (state == STARTED) { + session.messageStop(destination); + startFlow(); } } } -void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; } -qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; } - -const std::string& ReceiverImpl::getName() const { return destination; } - -ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, std::auto_ptr<MessageSource> s) : - parent(p), source(s), destination(name), byteCredit(0xFFFFFFFF), - capacity(0), started(false), cancelled(false), listener(0), window(0) {} - }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index b549242d35..509c784513 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -21,9 +21,13 @@ * under the License. * */ +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Filter.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/ReceiverImpl.h" +#include "qpid/messaging/Variant.h" #include "qpid/client/AsyncSession.h" +#include "qpid/client/amqp0_10/SessionImpl.h" #include "qpid/sys/Time.h" #include <memory> @@ -31,8 +35,8 @@ namespace qpid { namespace client { namespace amqp0_10 { +class AddressResolution; class MessageSource; -class SessionImpl; /** * A receiver implementation based on an AMQP 0-10 subscription. @@ -41,8 +45,14 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl { public: - ReceiverImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSource> source); + enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED}; + ReceiverImpl(SessionImpl& parent, const std::string& name, + const qpid::messaging::Address& address, + const qpid::messaging::Filter* filter, + const qpid::messaging::Variant::Map& options); + + void init(qpid::client::AsyncSession session, AddressResolution& resolver); bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout); qpid::messaging::Message get(qpid::sys::Duration timeout); bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout); @@ -50,8 +60,6 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl void cancel(); void start(); void stop(); - void subscribe(); - void setSession(qpid::client::AsyncSession s); const std::string& getName() const; void setCapacity(uint32_t); void setListener(qpid::messaging::MessageListener* listener); @@ -59,16 +67,97 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl void received(qpid::messaging::Message& message); private: SessionImpl& parent; - const std::auto_ptr<MessageSource> source; const std::string destination; + const qpid::messaging::Address address; + const qpid::messaging::Filter* filter; + const qpid::messaging::Variant::Map options; const uint32_t byteCredit; - + State state; + + std::auto_ptr<MessageSource> source; uint32_t capacity; qpid::client::AsyncSession session; - bool started; - bool cancelled; qpid::messaging::MessageListener* listener; uint32_t window; + + void startFlow(); + //implementation of public facing methods + bool fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout); + bool getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout); + void startImpl(); + void stopImpl(); + void cancelImpl(); + void setCapacityImpl(uint32_t); + + //functors for public facing methods (allows locking and retry + //logic to be centralised) + struct Command + { + ReceiverImpl& impl; + + Command(ReceiverImpl& i) : impl(i) {} + }; + + struct Get : Command + { + qpid::messaging::Message& message; + qpid::sys::Duration timeout; + bool result; + + Get(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) : + Command(i), message(m), timeout(t), result(false) {} + void operator()() { result = impl.getImpl(message, timeout); } + }; + + struct Fetch : Command + { + qpid::messaging::Message& message; + qpid::sys::Duration timeout; + bool result; + + Fetch(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) : + Command(i), message(m), timeout(t), result(false) {} + void operator()() { result = impl.fetchImpl(message, timeout); } + }; + + struct Stop : Command + { + Stop(ReceiverImpl& i) : Command(i) {} + void operator()() { impl.stopImpl(); } + }; + + struct Start : Command + { + Start(ReceiverImpl& i) : Command(i) {} + void operator()() { impl.startImpl(); } + }; + + struct Cancel : Command + { + Cancel(ReceiverImpl& i) : Command(i) {} + void operator()() { impl.cancelImpl(); } + }; + + struct SetCapacity : Command + { + uint32_t capacity; + + SetCapacity(ReceiverImpl& i, uint32_t c) : Command(i), capacity(c) {} + void operator()() { impl.setCapacityImpl(capacity); } + }; + + //helper templates for some common patterns + template <class F> void execute() + { + F f(*this); + parent.execute(f); + } + + template <class F, class P> void execute1(P p) + { + F f(*this, p); + parent.execute(f); + } }; }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index ac36eb1537..e70ee8af6f 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -21,29 +21,54 @@ #include "SenderImpl.h" #include "MessageSink.h" #include "SessionImpl.h" +#include "AddressResolution.h" namespace qpid { namespace client { namespace amqp0_10 { -SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, std::auto_ptr<MessageSink> _sink) : - parent(_parent), name(_name), sink(_sink) {} +SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, + const qpid::messaging::Address& _address, + const qpid::messaging::Variant::Map& _options) : + parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED) {} void SenderImpl::send(qpid::messaging::Message& m) { - sink->send(session, name, m); + execute1<Send>(&m); } void SenderImpl::cancel() { - sink->cancel(session, name); - parent.senderCancelled(name); + execute<Cancel>(); } -void SenderImpl::setSession(qpid::client::AsyncSession s) +void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) { session = s; - sink->declare(session, name); + if (state == UNRESOLVED) { + sink = resolver.resolveSink(session, address, options); + state = ACTIVE; + } + if (state == CANCELLED) { + sink->cancel(session, name); + parent.senderCancelled(name); + } else { + sink->declare(session, name); + //TODO: replay + } +} + +void SenderImpl::sendImpl(qpid::messaging::Message& m) +{ + //TODO: record for replay if appropriate + sink->send(session, name, m); +} + +void SenderImpl::cancelImpl() +{ + state = CANCELLED; + sink->cancel(session, name); + parent.senderCancelled(name); } }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h index e737450ba1..e7d7b11c0e 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -21,17 +21,20 @@ * under the License. * */ +#include "qpid/messaging/Address.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/SenderImpl.h" +#include "qpid/messaging/Variant.h" #include "qpid/client/AsyncSession.h" +#include "qpid/client/amqp0_10/SessionImpl.h" #include <memory> namespace qpid { namespace client { namespace amqp0_10 { +class AddressResolution; class MessageSink; -class SessionImpl; /** * @@ -39,19 +42,66 @@ class SessionImpl; class SenderImpl : public qpid::messaging::SenderImpl { public: - SenderImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSink> sink); + enum State {UNRESOLVED, ACTIVE, CANCELLED}; + + SenderImpl(SessionImpl& parent, const std::string& name, + const qpid::messaging::Address& address, + const qpid::messaging::Variant::Map& options); void send(qpid::messaging::Message&); void cancel(); - void setSession(qpid::client::AsyncSession); + void init(qpid::client::AsyncSession, AddressResolution&); private: SessionImpl& parent; const std::string name; + const qpid::messaging::Address address; + const qpid::messaging::Variant::Map options; + State state; std::auto_ptr<MessageSink> sink; qpid::client::AsyncSession session; std::string destination; std::string routingKey; + + //logic for application visible methods: + void sendImpl(qpid::messaging::Message&); + void cancelImpl(); + + //functors for application visible methods (allowing locking and + //retry to be centralised): + struct Command + { + SenderImpl& impl; + + Command(SenderImpl& i) : impl(i) {} + }; + + struct Send : Command + { + qpid::messaging::Message* message; + + Send(SenderImpl& i, qpid::messaging::Message* m) : Command(i), message(m) {} + void operator()() { impl.sendImpl(*message); } + }; + + struct Cancel : Command + { + Cancel(SenderImpl& i) : Command(i) {} + void operator()() { impl.cancelImpl(); } + }; + + //helper templates for some common patterns + template <class F> void execute() + { + F f(*this); + parent.execute(f); + } + + template <class F, class P> void execute1(P p) + { + F f(*this, p); + parent.execute(f); + } }; }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index e069520e95..0e6c430d89 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/client/amqp0_10/SessionImpl.h" +#include "qpid/client/amqp0_10/ConnectionImpl.h" #include "qpid/client/amqp0_10/ReceiverImpl.h" #include "qpid/client/amqp0_10/SenderImpl.h" #include "qpid/client/amqp0_10/MessageSource.h" @@ -49,56 +50,55 @@ namespace qpid { namespace client { namespace amqp0_10 { -SessionImpl::SessionImpl(qpid::client::Session s) : session(s), incoming(session) {} +SessionImpl::SessionImpl(ConnectionImpl& c) : connection(c) {} +void SessionImpl::sync() +{ + retry<Sync>(); +} + +void SessionImpl::flush() +{ + retry<Flush>(); +} + void SessionImpl::commit() { - qpid::sys::Mutex::ScopedLock l(lock); - incoming.accept(); - session.txCommit(); + if (!execute<Commit>()) { + throw Exception();//TODO: what type? + } } void SessionImpl::rollback() { - qpid::sys::Mutex::ScopedLock l(lock); - for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.stop(); - //ensure that stop has been processed and all previously sent - //messages are available for release: - session.sync(); - incoming.releaseAll(); - session.txRollback(); - for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.start(); + //If the session fails during this operation, the transaction will + //be rolled back anyway. + execute<Rollback>(); } void SessionImpl::acknowledge() { - qpid::sys::Mutex::ScopedLock l(lock); - incoming.accept(); + //Should probably throw an exception on failure here, or indicate + //it through a return type at least. Failure means that the + //message may be redelivered; i.e. the application cannot delete + //any state necessary for preventing reprocessing of the message + execute<Acknowledge>(); } void SessionImpl::reject(qpid::messaging::Message& m) { - qpid::sys::Mutex::ScopedLock l(lock); - SequenceSet set; - set.add(MessageImplAccess::get(m).getInternalId()); - session.messageReject(set); + //Possibly want to somehow indicate failure here as well. Less + //clear need as compared to acknowledge however. + execute1<Reject>(m); } void SessionImpl::close() { + connection.closed(*this); session.close(); } -void translate(const VariantMap& options, SubscriptionSettings& settings) -{ - //TODO: fill this out - VariantMap::const_iterator i = options.find("auto_acknowledge"); - if (i != options.end()) { - settings.autoAck = i->second.asInt32(); - } -} - template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t) { return boost::dynamic_pointer_cast<S>(qpid::client::PrivateImplRef<T>::get(t)); @@ -114,38 +114,88 @@ template <class T> void getFreeKey(std::string& key, T& map) key = name; } -Sender SessionImpl::createSender(const qpid::messaging::Address& address, const VariantMap& options) -{ + +void SessionImpl::setSession(qpid::client::Session s) +{ qpid::sys::Mutex::ScopedLock l(lock); - std::auto_ptr<MessageSink> sink = resolver.resolveSink(session, address, options); - std::string name = address; - getFreeKey(name, senders); - Sender sender(new SenderImpl(*this, name, sink)); - getImplPtr<Sender, SenderImpl>(sender)->setSession(session); - senders[name] = sender; - return sender; + session = s; + incoming.setSession(session); + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { + getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver); + } + for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { + getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); + } } + +struct SessionImpl::CreateReceiver : Command +{ + qpid::messaging::Receiver result; + const qpid::messaging::Address& address; + const Filter* filter; + const qpid::messaging::Variant::Map& options; + + CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a, const Filter* f, + const qpid::messaging::Variant::Map& o) : + Command(i), address(a), filter(f), options(o) {} + void operator()() { result = impl.createReceiverImpl(address, filter, options); } +}; + Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const VariantMap& options) { - return addReceiver(address, 0, options); + CreateReceiver f(*this, address, 0, options); + while (!execute(f)) {} + return f.result; } -Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const Filter& filter, const VariantMap& options) + +Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, + const Filter& filter, const VariantMap& options) { - return addReceiver(address, &filter, options); + CreateReceiver f(*this, address, &filter, options); + while (!execute(f)) {} + return f.result; } -Receiver SessionImpl::addReceiver(const qpid::messaging::Address& address, const Filter* filter, const VariantMap& options) +Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address, + const Filter* filter, const VariantMap& options) { - qpid::sys::Mutex::ScopedLock l(lock); - std::auto_ptr<MessageSource> source = resolver.resolveSource(session, address, filter, options); std::string name = address; getFreeKey(name, receivers); - Receiver receiver(new ReceiverImpl(*this, name, source)); - getImplPtr<Receiver, ReceiverImpl>(receiver)->setSession(session); + Receiver receiver(new ReceiverImpl(*this, name, address, filter, options)); + getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver); receivers[name] = receiver; return receiver; } +struct SessionImpl::CreateSender : Command +{ + qpid::messaging::Sender result; + const qpid::messaging::Address& address; + const qpid::messaging::Variant::Map& options; + + CreateSender(SessionImpl& i, const qpid::messaging::Address& a, + const qpid::messaging::Variant::Map& o) : + Command(i), address(a), options(o) {} + void operator()() { result = impl.createSenderImpl(address, options); } +}; + +Sender SessionImpl::createSender(const qpid::messaging::Address& address, const VariantMap& options) +{ + CreateSender f(*this, address, options); + while (!execute(f)) {} + return f.result; +} + +Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address, const VariantMap& options) +{ + std::string name = address; + getFreeKey(name, senders); + Sender sender(new SenderImpl(*this, name, address, options)); + getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver); + senders[name] = sender; + return sender; +} + qpid::messaging::Address SessionImpl::createTempQueue(const std::string& baseName) { std::string name = baseName + std::string("_") + session.getId().getName(); @@ -212,27 +262,80 @@ bool SessionImpl::acceptAny(qpid::messaging::Message* message, bool isDispatch, bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout) { - qpid::sys::Mutex::ScopedLock l(lock); return incoming.get(handler, timeout); } -bool SessionImpl::dispatch(qpid::sys::Duration timeout) +bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout) { - qpid::messaging::Message message; - IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, true, _1)); + IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, false, _1)); return getIncoming(handler, timeout); } -bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout) +bool SessionImpl::dispatch(qpid::sys::Duration timeout) { - IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, false, _1)); - return getIncoming(handler, timeout); + qpid::sys::Mutex::ScopedLock l(lock); + while (true) { + try { + qpid::messaging::Message message; + IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, true, _1)); + return getIncoming(handler, timeout); + } catch (TransportFailure&) { + reconnect(); + } + } } bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout) { - IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, false, _1)); - return getIncoming(handler, timeout); + qpid::sys::Mutex::ScopedLock l(lock); + while (true) { + try { + IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, false, _1)); + return getIncoming(handler, timeout); + } catch (TransportFailure&) { + reconnect(); + } + } +} + +void SessionImpl::syncImpl() +{ + session.sync(); +} + +void SessionImpl::flushImpl() +{ + session.flush(); +} + + +void SessionImpl::commitImpl() +{ + incoming.accept(); + session.txCommit(); +} + +void SessionImpl::rollbackImpl() +{ + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.stop(); + //ensure that stop has been processed and all previously sent + //messages are available for release: + session.sync(); + incoming.releaseAll(); + session.txRollback(); + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.start(); +} + +void SessionImpl::acknowledgeImpl() +{ + incoming.accept(); +} + +void SessionImpl::rejectImpl(qpid::messaging::Message& m) +{ + SequenceSet set; + set.add(MessageImplAccess::get(m).getInternalId()); + session.messageReject(set); } qpid::messaging::Message SessionImpl::fetch(qpid::sys::Duration timeout) @@ -244,28 +347,19 @@ qpid::messaging::Message SessionImpl::fetch(qpid::sys::Duration timeout) void SessionImpl::receiverCancelled(const std::string& name) { - { - qpid::sys::Mutex::ScopedLock l(lock); - receivers.erase(name); - } + receivers.erase(name); session.sync(); incoming.releasePending(name); } void SessionImpl::senderCancelled(const std::string& name) { - qpid::sys::Mutex::ScopedLock l(lock); senders.erase(name); } -void SessionImpl::sync() +void SessionImpl::reconnect() { - session.sync(); -} - -void SessionImpl::flush() -{ - session.flush(); + connection.reconnect(); } void* SessionImpl::getLastConfirmedSent() diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 7032a8db4a..1c7db17bbb 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -43,6 +43,7 @@ class Session; namespace client { namespace amqp0_10 { +class ConnectionImpl; class ReceiverImpl; class SenderImpl; @@ -53,7 +54,7 @@ class SenderImpl; class SessionImpl : public qpid::messaging::SessionImpl { public: - SessionImpl(qpid::client::Session); + SessionImpl(ConnectionImpl&); void commit(); void rollback(); void acknowledge(); @@ -81,26 +82,120 @@ class SessionImpl : public qpid::messaging::SessionImpl void receiverCancelled(const std::string& name); void senderCancelled(const std::string& name); - + + void setSession(qpid::client::Session); + + template <class T> bool execute(T& f) + { + try { + qpid::sys::Mutex::ScopedLock l(lock); + f(); + return true; + } catch (TransportFailure&) { + reconnect(); + return false; + } + } + static SessionImpl& convert(qpid::messaging::Session&); - qpid::client::Session session; private: typedef std::map<std::string, qpid::messaging::Receiver> Receivers; typedef std::map<std::string, qpid::messaging::Sender> Senders; qpid::sys::Mutex lock; + ConnectionImpl& connection; + qpid::client::Session session; AddressResolution resolver; IncomingMessages incoming; Receivers receivers; Senders senders; - qpid::messaging::Receiver addReceiver(const qpid::messaging::Address& address, - const qpid::messaging::Filter* filter, - const qpid::messaging::VariantMap& options); bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout); + void reconnect(); + + void commitImpl(); + void rollbackImpl(); + void acknowledgeImpl(); + void rejectImpl(qpid::messaging::Message&); + void closeImpl(); + void syncImpl(); + void flushImpl(); + qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address, + const qpid::messaging::VariantMap& options); + qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address, + const qpid::messaging::Filter* filter, + const qpid::messaging::VariantMap& options); + + //functors for public facing methods (allows locking and retry + //logic to be centralised) + struct Command + { + SessionImpl& impl; + + Command(SessionImpl& i) : impl(i) {} + }; + + struct Commit : Command + { + Commit(SessionImpl& i) : Command(i) {} + void operator()() { impl.commitImpl(); } + }; + + struct Rollback : Command + { + Rollback(SessionImpl& i) : Command(i) {} + void operator()() { impl.rollbackImpl(); } + }; + + struct Acknowledge : Command + { + Acknowledge(SessionImpl& i) : Command(i) {} + void operator()() { impl.acknowledgeImpl(); } + }; + + struct Sync : Command + { + Sync(SessionImpl& i) : Command(i) {} + void operator()() { impl.syncImpl(); } + }; + + struct Flush : Command + { + Flush(SessionImpl& i) : Command(i) {} + void operator()() { impl.flushImpl(); } + }; + + struct Reject : Command + { + qpid::messaging::Message& message; + + Reject(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} + void operator()() { impl.rejectImpl(message); } + }; + + struct CreateSender; + struct CreateReceiver; + + //helper templates for some common patterns + template <class F> bool execute() + { + F f(*this); + return execute(f); + } + + template <class F> void retry() + { + while (!execute<F>()) {} + } + + template <class F, class P> bool execute1(P p) + { + F f(*this, p); + return execute(f); + } }; }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 2e04c85b93..a15ba3578c 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -270,6 +270,11 @@ check_PROGRAMS+=qrsh qrsh_SOURCES=qrsh.cpp qrsh_LDADD=$(lib_client) +check_PROGRAMS+=qpid_stream +qpid_stream_INCLUDES=$(PUBLIC_INCLUDES) +qpid_stream_SOURCES=qpid_stream.cpp +qpid_stream_LDADD=$(lib_client) + TESTS_ENVIRONMENT = \ VALGRIND=$(VALGRIND) \ diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index be0b6f42d0..2cbcc8d500 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -183,6 +183,26 @@ QPID_AUTO_TEST_CASE(testSimpleSendReceive) BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes()); } +QPID_AUTO_TEST_CASE(testSendReceiveHeaders) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + Message out("test-message"); + for (uint i = 0; i < 10; ++i) { + out.getHeaders()["a"] = i; + sender.send(out); + } + Receiver receiver = fix.session.createReceiver(fix.queue); + Message in; + for (uint i = 0; i < 10; ++i) { + //Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); + BOOST_CHECK(receiver.fetch(in, 5 * qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes()); + BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i); + fix.session.acknowledge(); + } +} + QPID_AUTO_TEST_CASE(testSenderError) { MessagingFixture fix; diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp new file mode 100644 index 0000000000..8e02baa8a0 --- /dev/null +++ b/cpp/src/tests/qpid_stream.cpp @@ -0,0 +1,163 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Message.h> +#include <qpid/messaging/Receiver.h> +#include <qpid/messaging/Sender.h> +#include <qpid/messaging/Session.h> +#include <qpid/sys/Runnable.h> +#include <qpid/sys/Thread.h> +#include <qpid/sys/Time.h> +#include <qpid/Options.h> +#include <iostream> +#include <string> + +using namespace qpid::messaging; +using namespace qpid::sys; + +struct Args : public qpid::Options +{ + std::string url; + std::string address; + uint rate; + bool durable; + + Args() : url("amqp:tcp:127.0.0.1:5672"), address("test-queue"), rate(1000), durable(false) + { + addOptions() + ("url", qpid::optValue(url, "URL"), "Url to connect to.") + ("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.") + ("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.") + ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable."); + } +}; + +Args opts; + +const std::string TIMESTAMP = "ts"; + +uint64_t timestamp(const AbsTime& time) +{ + Duration t(time); + return t; +} + +struct Client : Runnable +{ + virtual ~Client() {} + virtual void doWork(Session&) = 0; + + void run() + { + try { + Connection connection = Connection::open(opts.url); + Session session = connection.newSession(); + doWork(session); + session.close(); + connection.close(); + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + } + + Thread thread; + + void start() { thread = Thread(this); } + void join() { thread.join(); } +}; + +struct Publish : Client +{ + void doWork(Session& session) + { + Sender sender = session.createSender(opts.address); + Message msg; + uint64_t interval = TIME_SEC / opts.rate; + uint64_t sent = 0, missedRate = 0; + AbsTime start = now(); + while (true) { + AbsTime sentAt = now(); + msg.getHeaders()[TIMESTAMP] = timestamp(sentAt); + sender.send(msg); + ++sent; + AbsTime waitTill(start, sent*interval); + Duration delay(sentAt, waitTill); + if (delay < 0) { + ++missedRate; + } else { + qpid::sys::usleep(delay / TIME_USEC); + } + } + } +}; + +struct Consume : Client +{ + void doWork(Session& session) + { + Message msg; + uint64_t received = 0; + double minLatency = std::numeric_limits<double>::max(); + double maxLatency = 0; + double totalLatency = 0; + Receiver receiver = session.createReceiver(opts.address); + while (receiver.fetch(msg)) { + session.acknowledge();//TODO: add batching option + ++received; + //calculate latency + uint64_t receivedAt = timestamp(now()); + uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64(); + double latency = ((double) (receivedAt - sentAt)) / TIME_MSEC; + + //update avg, min & max + minLatency = std::min(minLatency, latency); + maxLatency = std::max(maxLatency, latency); + totalLatency += latency; + + if (received % opts.rate == 0) { + std::cout << "count=" << received + << ", avg=" << (totalLatency/received) + << ", min=" << minLatency + << ", max=" << maxLatency << std::endl; + } + } + } +}; + +int main(int argc, char** argv) +{ + try { + opts.parse(argc, argv); + Publish publish; + Consume consume; + publish.start(); + consume.start(); + consume.join(); + publish.join(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + |
