summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-09-07 09:15:40 +0000
committerGordon Sim <gsim@apache.org>2009-09-07 09:15:40 +0000
commit5748390087f011559549ec33f538e5a3babd43fe (patch)
tree7bb9ba35d450e778865c0c26d72735180582a31a /cpp/src
parent32bfd712449b0839d9f302f8168c4f588e3c6070 (diff)
downloadqpid-python-5748390087f011559549ec33f538e5a3babd43fe.tar.gz
QPID-664: Added automatic reconnection logic.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@812049 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp114
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.h26
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp9
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.h2
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp164
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.h105
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.cpp39
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.h56
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp224
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h107
-rw-r--r--cpp/src/tests/Makefile.am5
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp20
-rw-r--r--cpp/src/tests/qpid_stream.cpp163
13 files changed, 875 insertions, 159 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 9f738731e2..3a735b5698 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -21,14 +21,16 @@
#include "ConnectionImpl.h"
#include "SessionImpl.h"
#include "qpid/messaging/Session.h"
-#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/PrivateImplRef.h"
#include "qpid/log/Statement.h"
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
namespace client {
namespace amqp0_10 {
using qpid::messaging::Variant;
+using namespace qpid::sys;
template <class T> void setIfFound(const Variant::Map& map, const std::string& key, T& value)
{
@@ -56,24 +58,124 @@ void convert(const Variant::Map& from, ConnectionSettings& to)
setIfFound(from, "bounds", to.bounds);
}
-ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options)
+ConnectionImpl::ConnectionImpl(const std::string& u, const Variant::Map& options) :
+ url(u), reconnectionEnabled(true), timeout(-1),
+ minRetryInterval(1), maxRetryInterval(30)
{
QPID_LOG(debug, "Opening connection to " << url << " with " << options);
- Url u(url);
- ConnectionSettings settings;
convert(options, settings);
- connection.open(u, settings);
+ setIfFound(options, "reconnection-enabled", reconnectionEnabled);
+ setIfFound(options, "reconnection-timeout", timeout);
+ setIfFound(options, "min-retry-interval", minRetryInterval);
+ setIfFound(options, "max-retry-interval", maxRetryInterval);
+ connection.open(url, settings);
}
void ConnectionImpl::close()
{
+ qpid::sys::Mutex::ScopedLock l(lock);
connection.close();
}
+boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session)
+{
+ return boost::dynamic_pointer_cast<SessionImpl>(
+ qpid::client::PrivateImplRef<qpid::messaging::Session>::get(session)
+ );
+}
+
+void ConnectionImpl::closed(SessionImpl& s)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ if (getImplPtr(*i).get() == &s) {
+ sessions.erase(i);
+ break;
+ }
+ }
+}
+
qpid::messaging::Session ConnectionImpl::newSession()
{
- qpid::messaging::Session impl(new SessionImpl(connection.newSession()));
+ qpid::messaging::Session impl(new SessionImpl(*this));
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ sessions.push_back(impl);
+ }
+ try {
+ getImplPtr(impl)->setSession(connection.newSession());
+ } catch (const TransportFailure&) {
+ reconnect();
+ }
return impl;
}
+void ConnectionImpl::reconnect()
+{
+ AbsTime start = now();
+ ScopedLock<Semaphore> l(semaphore);
+ if (!connection.isOpen()) connect(start);
+}
+
+bool expired(const AbsTime& start, int timeout)
+{
+ if (timeout == 0) return true;
+ if (timeout < 0) return false;
+ Duration used(start, now());
+ Duration allowed = timeout * TIME_SEC;
+ return allowed > used;
+}
+
+void ConnectionImpl::connect(const AbsTime& started)
+{
+ for (int i = minRetryInterval; !tryConnect(); i = std::min(i * 2, maxRetryInterval)) {
+ if (expired(started, timeout)) throw TransportFailure();
+ else qpid::sys::sleep(i);
+ }
+}
+
+bool ConnectionImpl::tryConnect()
+{
+ if (tryConnect(url) || tryConnect(connection.getKnownBrokers())) {
+ return resetSessions();
+ } else {
+ return false;
+ }
+}
+
+bool ConnectionImpl::tryConnect(const Url& u)
+{
+ try {
+ QPID_LOG(info, "Trying to connect to " << url << "...");
+ connection.open(u, settings);
+ return true;
+ } catch (const Exception& e) {
+ //TODO: need to fix timeout on open so that it throws TransportFailure
+ QPID_LOG(info, "Failed to connect to " << u << ": " << e.what());
+ }
+ return false;
+}
+
+bool ConnectionImpl::tryConnect(const std::vector<Url>& urls)
+{
+ for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
+ if (tryConnect(*i)) return true;
+ }
+ return false;
+}
+
+bool ConnectionImpl::resetSessions()
+{
+ try {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ getImplPtr(*i)->setSession(connection.newSession());
+ }
+ return true;
+ } catch (const TransportFailure&) {
+ QPID_LOG(debug, "Connection failed while re-inialising sessions");
+ return false;
+ }
+}
+
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index 120a8ab9d8..565f2ec7ec 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -23,20 +23,46 @@
*/
#include "qpid/messaging/ConnectionImpl.h"
#include "qpid/messaging/Variant.h"
+#include "qpid/Url.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Semaphore.h"
+#include <vector>
namespace qpid {
namespace client {
namespace amqp0_10 {
+class SessionImpl;
+
class ConnectionImpl : public qpid::messaging::ConnectionImpl
{
public:
ConnectionImpl(const std::string& url, const qpid::messaging::Variant::Map& options);
void close();
qpid::messaging::Session newSession();
+ void closed(SessionImpl&);
+ void reconnect();
private:
+ typedef std::vector<qpid::messaging::Session> Sessions;
+
+ qpid::sys::Mutex lock;//used to protect data structures
+ qpid::sys::Semaphore semaphore;//used to coordinate reconnection
qpid::client::Connection connection;
+ qpid::Url url;
+ qpid::client::ConnectionSettings settings;
+ Sessions sessions;
+ bool reconnectionEnabled;
+ int timeout;
+ int minRetryInterval;
+ int maxRetryInterval;
+
+ void connect(const qpid::sys::AbsTime& started);
+ bool tryConnect();
+ bool tryConnect(const std::vector<Url>& urls);
+ bool tryConnect(const Url&);
+ bool resetSessions();
};
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index f9bd355a78..c54a186365 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -82,9 +82,11 @@ struct MatchAndTrack
};
}
-IncomingMessages::IncomingMessages(qpid::client::AsyncSession s) :
- session(s),
- incoming(SessionBase_0_10Access(session).get()->getDemux().getDefault()) {}
+void IncomingMessages::setSession(qpid::client::AsyncSession s)
+{
+ session = s;
+ incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
+}
bool IncomingMessages::get(Handler& handler, Duration timeout)
{
@@ -208,6 +210,7 @@ void populateHeaders(qpid::messaging::Message& message,
if (messageProperties->hasReplyTo()) {
message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo()));
}
+ message.getHeaders().clear();
translate(messageProperties->getApplicationHeaders(), message.getHeaders());
//TODO: convert other message properties
}
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index c4346fd7d7..5e28877305 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -67,7 +67,7 @@ class IncomingMessages
virtual bool accept(MessageTransfer& transfer) = 0;
};
- IncomingMessages(qpid::client::AsyncSession session);
+ void setSession(qpid::client::AsyncSession session);
bool get(Handler& handler, qpid::sys::Duration timeout);
//bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
//bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout);
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index e6ed4bfc4e..31efff38a6 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -19,6 +19,7 @@
*
*/
#include "ReceiverImpl.h"
+#include "AddressResolution.h"
#include "MessageSource.h"
#include "SessionImpl.h"
#include "qpid/messaging/MessageListener.h"
@@ -38,11 +39,6 @@ void ReceiverImpl::received(qpid::messaging::Message&)
window = capacity;
}
}
-
-bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
-{
- return parent.get(*this, message, timeout);
-}
qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout)
{
@@ -50,24 +46,6 @@ qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout)
if (!get(result, timeout)) throw Receiver::NoMessageAvailable();
return result;
}
-
-bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
-{
- if (capacity == 0 && !cancelled) {
- session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
- if (!started) session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
- }
-
- if (get(message, timeout)) {
- return true;
- } else {
- if (!cancelled) {
- sync(session).messageFlush(destination);
- start();//reallocate credit
- }
- return get(message, 0);
- }
-}
qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout)
{
@@ -76,71 +54,137 @@ qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout)
return result;
}
+bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ Get f(*this, message, timeout);
+ while (!parent.execute(f)) {}
+ return f.result;
+}
+
+bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ Fetch f(*this, message, timeout);
+ while (!parent.execute(f)) {}
+ return f.result;
+}
+
void ReceiverImpl::cancel()
{
- if (!cancelled) {
- //TODO: should syncronicity be an optional argument to this call?
- source->cancel(session, destination);
- //need to be sure cancel is complete and all incoming
- //framesets are processed before removing the receiver
- parent.receiverCancelled(destination);
- cancelled = true;
- }
+ execute<Cancel>();
}
void ReceiverImpl::start()
{
- if (!cancelled) {
- started = true;
- session.messageSetFlowMode(destination, capacity > 0);
+ execute<Start>();
+}
+
+void ReceiverImpl::stop()
+{
+ execute<Stop>();
+}
+
+void ReceiverImpl::setCapacity(uint32_t c)
+{
+ execute1<SetCapacity>(c);
+}
+
+void ReceiverImpl::startFlow()
+{
+ if (capacity > 0) {
+ session.messageSetFlowMode(destination, FLOW_MODE_WINDOW);
session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity);
session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
window = capacity;
}
}
-void ReceiverImpl::stop()
+void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
- session.messageStop(destination);
- started = false;
+
+ session = s;
+ if (state == UNRESOLVED) {
+ source = resolver.resolveSource(session, address, filter, options);
+ state = STOPPED;//TODO: if session is started, go straight to started
+ }
+ if (state == CANCELLED) {
+ source->cancel(session, destination);
+ parent.receiverCancelled(destination);
+ } else {
+ source->subscribe(session, destination);
+ if (state == STARTED) start();
+ }
}
-void ReceiverImpl::subscribe()
+void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
+qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
+
+const std::string& ReceiverImpl::getName() const { return destination; }
+
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
+ const qpid::messaging::Address& a,
+ const qpid::messaging::Filter* f,
+ const qpid::messaging::Variant::Map& o) :
+
+ parent(p), destination(name), address(a), filter(f), options(o), byteCredit(0xFFFFFFFF),
+ state(UNRESOLVED), capacity(0), listener(0), window(0) {}
+
+bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
{
- source->subscribe(session, destination);
+ return parent.get(*this, message, timeout);
}
-void ReceiverImpl::setSession(qpid::client::AsyncSession s)
+bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ if (state == CANCELLED) return false;//TODO: or should this be an error?
+
+ if (capacity == 0 || state != STARTED) {
+ session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
+ session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+ session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+ }
+
+ if (getImpl(message, timeout)) {
+ return true;
+ } else {
+ sync(session).messageFlush(destination);
+ startFlow();//reallocate credit
+ return getImpl(message, 0);
+ }
+}
+
+void ReceiverImpl::cancelImpl()
{
- session = s;
- if (!cancelled) {
- subscribe();
- //if we were in started state before the session was changed,
- //start again on this new session
- //TODO: locking if receiver is to be threadsafe...
- if (started) start();
+ if (state != CANCELLED) {
+ state = CANCELLED;
+ source->cancel(session, destination);
+ parent.receiverCancelled(destination);
}
}
-void ReceiverImpl::setCapacity(uint32_t c)
+void ReceiverImpl::startImpl()
+{
+ if (state == STOPPED) {
+ state = STARTED;
+ startFlow();
+ }
+}
+
+void ReceiverImpl::stopImpl()
+{
+ state = STOPPED;
+ session.messageStop(destination);
+}
+
+void ReceiverImpl::setCapacityImpl(uint32_t c)
{
if (c != capacity) {
capacity = c;
- if (!cancelled && started) {
- stop();
- start();
+ if (state == STARTED) {
+ session.messageStop(destination);
+ startFlow();
}
}
}
-void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
-qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
-
-const std::string& ReceiverImpl::getName() const { return destination; }
-
-ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, std::auto_ptr<MessageSource> s) :
- parent(p), source(s), destination(name), byteCredit(0xFFFFFFFF),
- capacity(0), started(false), cancelled(false), listener(0), window(0) {}
-
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index b549242d35..509c784513 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -21,9 +21,13 @@
* under the License.
*
*/
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Filter.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/ReceiverImpl.h"
+#include "qpid/messaging/Variant.h"
#include "qpid/client/AsyncSession.h"
+#include "qpid/client/amqp0_10/SessionImpl.h"
#include "qpid/sys/Time.h"
#include <memory>
@@ -31,8 +35,8 @@ namespace qpid {
namespace client {
namespace amqp0_10 {
+class AddressResolution;
class MessageSource;
-class SessionImpl;
/**
* A receiver implementation based on an AMQP 0-10 subscription.
@@ -41,8 +45,14 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
{
public:
- ReceiverImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSource> source);
+ enum State {UNRESOLVED, STOPPED, STARTED, CANCELLED};
+ ReceiverImpl(SessionImpl& parent, const std::string& name,
+ const qpid::messaging::Address& address,
+ const qpid::messaging::Filter* filter,
+ const qpid::messaging::Variant::Map& options);
+
+ void init(qpid::client::AsyncSession session, AddressResolution& resolver);
bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
qpid::messaging::Message get(qpid::sys::Duration timeout);
bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout);
@@ -50,8 +60,6 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
void cancel();
void start();
void stop();
- void subscribe();
- void setSession(qpid::client::AsyncSession s);
const std::string& getName() const;
void setCapacity(uint32_t);
void setListener(qpid::messaging::MessageListener* listener);
@@ -59,16 +67,97 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
void received(qpid::messaging::Message& message);
private:
SessionImpl& parent;
- const std::auto_ptr<MessageSource> source;
const std::string destination;
+ const qpid::messaging::Address address;
+ const qpid::messaging::Filter* filter;
+ const qpid::messaging::Variant::Map options;
const uint32_t byteCredit;
-
+ State state;
+
+ std::auto_ptr<MessageSource> source;
uint32_t capacity;
qpid::client::AsyncSession session;
- bool started;
- bool cancelled;
qpid::messaging::MessageListener* listener;
uint32_t window;
+
+ void startFlow();
+ //implementation of public facing methods
+ bool fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ bool getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ void startImpl();
+ void stopImpl();
+ void cancelImpl();
+ void setCapacityImpl(uint32_t);
+
+ //functors for public facing methods (allows locking and retry
+ //logic to be centralised)
+ struct Command
+ {
+ ReceiverImpl& impl;
+
+ Command(ReceiverImpl& i) : impl(i) {}
+ };
+
+ struct Get : Command
+ {
+ qpid::messaging::Message& message;
+ qpid::sys::Duration timeout;
+ bool result;
+
+ Get(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) :
+ Command(i), message(m), timeout(t), result(false) {}
+ void operator()() { result = impl.getImpl(message, timeout); }
+ };
+
+ struct Fetch : Command
+ {
+ qpid::messaging::Message& message;
+ qpid::sys::Duration timeout;
+ bool result;
+
+ Fetch(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) :
+ Command(i), message(m), timeout(t), result(false) {}
+ void operator()() { result = impl.fetchImpl(message, timeout); }
+ };
+
+ struct Stop : Command
+ {
+ Stop(ReceiverImpl& i) : Command(i) {}
+ void operator()() { impl.stopImpl(); }
+ };
+
+ struct Start : Command
+ {
+ Start(ReceiverImpl& i) : Command(i) {}
+ void operator()() { impl.startImpl(); }
+ };
+
+ struct Cancel : Command
+ {
+ Cancel(ReceiverImpl& i) : Command(i) {}
+ void operator()() { impl.cancelImpl(); }
+ };
+
+ struct SetCapacity : Command
+ {
+ uint32_t capacity;
+
+ SetCapacity(ReceiverImpl& i, uint32_t c) : Command(i), capacity(c) {}
+ void operator()() { impl.setCapacityImpl(capacity); }
+ };
+
+ //helper templates for some common patterns
+ template <class F> void execute()
+ {
+ F f(*this);
+ parent.execute(f);
+ }
+
+ template <class F, class P> void execute1(P p)
+ {
+ F f(*this, p);
+ parent.execute(f);
+ }
};
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index ac36eb1537..e70ee8af6f 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -21,29 +21,54 @@
#include "SenderImpl.h"
#include "MessageSink.h"
#include "SessionImpl.h"
+#include "AddressResolution.h"
namespace qpid {
namespace client {
namespace amqp0_10 {
-SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, std::auto_ptr<MessageSink> _sink) :
- parent(_parent), name(_name), sink(_sink) {}
+SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
+ const qpid::messaging::Address& _address,
+ const qpid::messaging::Variant::Map& _options) :
+ parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED) {}
void SenderImpl::send(qpid::messaging::Message& m)
{
- sink->send(session, name, m);
+ execute1<Send>(&m);
}
void SenderImpl::cancel()
{
- sink->cancel(session, name);
- parent.senderCancelled(name);
+ execute<Cancel>();
}
-void SenderImpl::setSession(qpid::client::AsyncSession s)
+void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
session = s;
- sink->declare(session, name);
+ if (state == UNRESOLVED) {
+ sink = resolver.resolveSink(session, address, options);
+ state = ACTIVE;
+ }
+ if (state == CANCELLED) {
+ sink->cancel(session, name);
+ parent.senderCancelled(name);
+ } else {
+ sink->declare(session, name);
+ //TODO: replay
+ }
+}
+
+void SenderImpl::sendImpl(qpid::messaging::Message& m)
+{
+ //TODO: record for replay if appropriate
+ sink->send(session, name, m);
+}
+
+void SenderImpl::cancelImpl()
+{
+ state = CANCELLED;
+ sink->cancel(session, name);
+ parent.senderCancelled(name);
}
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index e737450ba1..e7d7b11c0e 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -21,17 +21,20 @@
* under the License.
*
*/
+#include "qpid/messaging/Address.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/SenderImpl.h"
+#include "qpid/messaging/Variant.h"
#include "qpid/client/AsyncSession.h"
+#include "qpid/client/amqp0_10/SessionImpl.h"
#include <memory>
namespace qpid {
namespace client {
namespace amqp0_10 {
+class AddressResolution;
class MessageSink;
-class SessionImpl;
/**
*
@@ -39,19 +42,66 @@ class SessionImpl;
class SenderImpl : public qpid::messaging::SenderImpl
{
public:
- SenderImpl(SessionImpl& parent, const std::string& name, std::auto_ptr<MessageSink> sink);
+ enum State {UNRESOLVED, ACTIVE, CANCELLED};
+
+ SenderImpl(SessionImpl& parent, const std::string& name,
+ const qpid::messaging::Address& address,
+ const qpid::messaging::Variant::Map& options);
void send(qpid::messaging::Message&);
void cancel();
- void setSession(qpid::client::AsyncSession);
+ void init(qpid::client::AsyncSession, AddressResolution&);
private:
SessionImpl& parent;
const std::string name;
+ const qpid::messaging::Address address;
+ const qpid::messaging::Variant::Map options;
+ State state;
std::auto_ptr<MessageSink> sink;
qpid::client::AsyncSession session;
std::string destination;
std::string routingKey;
+
+ //logic for application visible methods:
+ void sendImpl(qpid::messaging::Message&);
+ void cancelImpl();
+
+ //functors for application visible methods (allowing locking and
+ //retry to be centralised):
+ struct Command
+ {
+ SenderImpl& impl;
+
+ Command(SenderImpl& i) : impl(i) {}
+ };
+
+ struct Send : Command
+ {
+ qpid::messaging::Message* message;
+
+ Send(SenderImpl& i, qpid::messaging::Message* m) : Command(i), message(m) {}
+ void operator()() { impl.sendImpl(*message); }
+ };
+
+ struct Cancel : Command
+ {
+ Cancel(SenderImpl& i) : Command(i) {}
+ void operator()() { impl.cancelImpl(); }
+ };
+
+ //helper templates for some common patterns
+ template <class F> void execute()
+ {
+ F f(*this);
+ parent.execute(f);
+ }
+
+ template <class F, class P> void execute1(P p)
+ {
+ F f(*this, p);
+ parent.execute(f);
+ }
};
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index e069520e95..0e6c430d89 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/client/amqp0_10/SessionImpl.h"
+#include "qpid/client/amqp0_10/ConnectionImpl.h"
#include "qpid/client/amqp0_10/ReceiverImpl.h"
#include "qpid/client/amqp0_10/SenderImpl.h"
#include "qpid/client/amqp0_10/MessageSource.h"
@@ -49,56 +50,55 @@ namespace qpid {
namespace client {
namespace amqp0_10 {
-SessionImpl::SessionImpl(qpid::client::Session s) : session(s), incoming(session) {}
+SessionImpl::SessionImpl(ConnectionImpl& c) : connection(c) {}
+void SessionImpl::sync()
+{
+ retry<Sync>();
+}
+
+void SessionImpl::flush()
+{
+ retry<Flush>();
+}
+
void SessionImpl::commit()
{
- qpid::sys::Mutex::ScopedLock l(lock);
- incoming.accept();
- session.txCommit();
+ if (!execute<Commit>()) {
+ throw Exception();//TODO: what type?
+ }
}
void SessionImpl::rollback()
{
- qpid::sys::Mutex::ScopedLock l(lock);
- for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.stop();
- //ensure that stop has been processed and all previously sent
- //messages are available for release:
- session.sync();
- incoming.releaseAll();
- session.txRollback();
- for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.start();
+ //If the session fails during this operation, the transaction will
+ //be rolled back anyway.
+ execute<Rollback>();
}
void SessionImpl::acknowledge()
{
- qpid::sys::Mutex::ScopedLock l(lock);
- incoming.accept();
+ //Should probably throw an exception on failure here, or indicate
+ //it through a return type at least. Failure means that the
+ //message may be redelivered; i.e. the application cannot delete
+ //any state necessary for preventing reprocessing of the message
+ execute<Acknowledge>();
}
void SessionImpl::reject(qpid::messaging::Message& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
- SequenceSet set;
- set.add(MessageImplAccess::get(m).getInternalId());
- session.messageReject(set);
+ //Possibly want to somehow indicate failure here as well. Less
+ //clear need as compared to acknowledge however.
+ execute1<Reject>(m);
}
void SessionImpl::close()
{
+ connection.closed(*this);
session.close();
}
-void translate(const VariantMap& options, SubscriptionSettings& settings)
-{
- //TODO: fill this out
- VariantMap::const_iterator i = options.find("auto_acknowledge");
- if (i != options.end()) {
- settings.autoAck = i->second.asInt32();
- }
-}
-
template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t)
{
return boost::dynamic_pointer_cast<S>(qpid::client::PrivateImplRef<T>::get(t));
@@ -114,38 +114,88 @@ template <class T> void getFreeKey(std::string& key, T& map)
key = name;
}
-Sender SessionImpl::createSender(const qpid::messaging::Address& address, const VariantMap& options)
-{
+
+void SessionImpl::setSession(qpid::client::Session s)
+{
qpid::sys::Mutex::ScopedLock l(lock);
- std::auto_ptr<MessageSink> sink = resolver.resolveSink(session, address, options);
- std::string name = address;
- getFreeKey(name, senders);
- Sender sender(new SenderImpl(*this, name, sink));
- getImplPtr<Sender, SenderImpl>(sender)->setSession(session);
- senders[name] = sender;
- return sender;
+ session = s;
+ incoming.setSession(session);
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver);
+ }
+ for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
+ getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
+ }
}
+
+struct SessionImpl::CreateReceiver : Command
+{
+ qpid::messaging::Receiver result;
+ const qpid::messaging::Address& address;
+ const Filter* filter;
+ const qpid::messaging::Variant::Map& options;
+
+ CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a, const Filter* f,
+ const qpid::messaging::Variant::Map& o) :
+ Command(i), address(a), filter(f), options(o) {}
+ void operator()() { result = impl.createReceiverImpl(address, filter, options); }
+};
+
Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const VariantMap& options)
{
- return addReceiver(address, 0, options);
+ CreateReceiver f(*this, address, 0, options);
+ while (!execute(f)) {}
+ return f.result;
}
-Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address, const Filter& filter, const VariantMap& options)
+
+Receiver SessionImpl::createReceiver(const qpid::messaging::Address& address,
+ const Filter& filter, const VariantMap& options)
{
- return addReceiver(address, &filter, options);
+ CreateReceiver f(*this, address, &filter, options);
+ while (!execute(f)) {}
+ return f.result;
}
-Receiver SessionImpl::addReceiver(const qpid::messaging::Address& address, const Filter* filter, const VariantMap& options)
+Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address,
+ const Filter* filter, const VariantMap& options)
{
- qpid::sys::Mutex::ScopedLock l(lock);
- std::auto_ptr<MessageSource> source = resolver.resolveSource(session, address, filter, options);
std::string name = address;
getFreeKey(name, receivers);
- Receiver receiver(new ReceiverImpl(*this, name, source));
- getImplPtr<Receiver, ReceiverImpl>(receiver)->setSession(session);
+ Receiver receiver(new ReceiverImpl(*this, name, address, filter, options));
+ getImplPtr<Receiver, ReceiverImpl>(receiver)->init(session, resolver);
receivers[name] = receiver;
return receiver;
}
+struct SessionImpl::CreateSender : Command
+{
+ qpid::messaging::Sender result;
+ const qpid::messaging::Address& address;
+ const qpid::messaging::Variant::Map& options;
+
+ CreateSender(SessionImpl& i, const qpid::messaging::Address& a,
+ const qpid::messaging::Variant::Map& o) :
+ Command(i), address(a), options(o) {}
+ void operator()() { result = impl.createSenderImpl(address, options); }
+};
+
+Sender SessionImpl::createSender(const qpid::messaging::Address& address, const VariantMap& options)
+{
+ CreateSender f(*this, address, options);
+ while (!execute(f)) {}
+ return f.result;
+}
+
+Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address, const VariantMap& options)
+{
+ std::string name = address;
+ getFreeKey(name, senders);
+ Sender sender(new SenderImpl(*this, name, address, options));
+ getImplPtr<Sender, SenderImpl>(sender)->init(session, resolver);
+ senders[name] = sender;
+ return sender;
+}
+
qpid::messaging::Address SessionImpl::createTempQueue(const std::string& baseName)
{
std::string name = baseName + std::string("_") + session.getId().getName();
@@ -212,27 +262,80 @@ bool SessionImpl::acceptAny(qpid::messaging::Message* message, bool isDispatch,
bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout)
{
- qpid::sys::Mutex::ScopedLock l(lock);
return incoming.get(handler, timeout);
}
-bool SessionImpl::dispatch(qpid::sys::Duration timeout)
+bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout)
{
- qpid::messaging::Message message;
- IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, true, _1));
+ IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, false, _1));
return getIncoming(handler, timeout);
}
-bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout)
+bool SessionImpl::dispatch(qpid::sys::Duration timeout)
{
- IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, false, _1));
- return getIncoming(handler, timeout);
+ qpid::sys::Mutex::ScopedLock l(lock);
+ while (true) {
+ try {
+ qpid::messaging::Message message;
+ IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, true, _1));
+ return getIncoming(handler, timeout);
+ } catch (TransportFailure&) {
+ reconnect();
+ }
+ }
}
bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
{
- IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, false, _1));
- return getIncoming(handler, timeout);
+ qpid::sys::Mutex::ScopedLock l(lock);
+ while (true) {
+ try {
+ IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, false, _1));
+ return getIncoming(handler, timeout);
+ } catch (TransportFailure&) {
+ reconnect();
+ }
+ }
+}
+
+void SessionImpl::syncImpl()
+{
+ session.sync();
+}
+
+void SessionImpl::flushImpl()
+{
+ session.flush();
+}
+
+
+void SessionImpl::commitImpl()
+{
+ incoming.accept();
+ session.txCommit();
+}
+
+void SessionImpl::rollbackImpl()
+{
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.stop();
+ //ensure that stop has been processed and all previously sent
+ //messages are available for release:
+ session.sync();
+ incoming.releaseAll();
+ session.txRollback();
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) i->second.start();
+}
+
+void SessionImpl::acknowledgeImpl()
+{
+ incoming.accept();
+}
+
+void SessionImpl::rejectImpl(qpid::messaging::Message& m)
+{
+ SequenceSet set;
+ set.add(MessageImplAccess::get(m).getInternalId());
+ session.messageReject(set);
}
qpid::messaging::Message SessionImpl::fetch(qpid::sys::Duration timeout)
@@ -244,28 +347,19 @@ qpid::messaging::Message SessionImpl::fetch(qpid::sys::Duration timeout)
void SessionImpl::receiverCancelled(const std::string& name)
{
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- receivers.erase(name);
- }
+ receivers.erase(name);
session.sync();
incoming.releasePending(name);
}
void SessionImpl::senderCancelled(const std::string& name)
{
- qpid::sys::Mutex::ScopedLock l(lock);
senders.erase(name);
}
-void SessionImpl::sync()
+void SessionImpl::reconnect()
{
- session.sync();
-}
-
-void SessionImpl::flush()
-{
- session.flush();
+ connection.reconnect();
}
void* SessionImpl::getLastConfirmedSent()
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 7032a8db4a..1c7db17bbb 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -43,6 +43,7 @@ class Session;
namespace client {
namespace amqp0_10 {
+class ConnectionImpl;
class ReceiverImpl;
class SenderImpl;
@@ -53,7 +54,7 @@ class SenderImpl;
class SessionImpl : public qpid::messaging::SessionImpl
{
public:
- SessionImpl(qpid::client::Session);
+ SessionImpl(ConnectionImpl&);
void commit();
void rollback();
void acknowledge();
@@ -81,26 +82,120 @@ class SessionImpl : public qpid::messaging::SessionImpl
void receiverCancelled(const std::string& name);
void senderCancelled(const std::string& name);
-
+
+ void setSession(qpid::client::Session);
+
+ template <class T> bool execute(T& f)
+ {
+ try {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ f();
+ return true;
+ } catch (TransportFailure&) {
+ reconnect();
+ return false;
+ }
+ }
+
static SessionImpl& convert(qpid::messaging::Session&);
- qpid::client::Session session;
private:
typedef std::map<std::string, qpid::messaging::Receiver> Receivers;
typedef std::map<std::string, qpid::messaging::Sender> Senders;
qpid::sys::Mutex lock;
+ ConnectionImpl& connection;
+ qpid::client::Session session;
AddressResolution resolver;
IncomingMessages incoming;
Receivers receivers;
Senders senders;
- qpid::messaging::Receiver addReceiver(const qpid::messaging::Address& address,
- const qpid::messaging::Filter* filter,
- const qpid::messaging::VariantMap& options);
bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&);
bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&);
bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout);
+ void reconnect();
+
+ void commitImpl();
+ void rollbackImpl();
+ void acknowledgeImpl();
+ void rejectImpl(qpid::messaging::Message&);
+ void closeImpl();
+ void syncImpl();
+ void flushImpl();
+ qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address,
+ const qpid::messaging::VariantMap& options);
+ qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address,
+ const qpid::messaging::Filter* filter,
+ const qpid::messaging::VariantMap& options);
+
+ //functors for public facing methods (allows locking and retry
+ //logic to be centralised)
+ struct Command
+ {
+ SessionImpl& impl;
+
+ Command(SessionImpl& i) : impl(i) {}
+ };
+
+ struct Commit : Command
+ {
+ Commit(SessionImpl& i) : Command(i) {}
+ void operator()() { impl.commitImpl(); }
+ };
+
+ struct Rollback : Command
+ {
+ Rollback(SessionImpl& i) : Command(i) {}
+ void operator()() { impl.rollbackImpl(); }
+ };
+
+ struct Acknowledge : Command
+ {
+ Acknowledge(SessionImpl& i) : Command(i) {}
+ void operator()() { impl.acknowledgeImpl(); }
+ };
+
+ struct Sync : Command
+ {
+ Sync(SessionImpl& i) : Command(i) {}
+ void operator()() { impl.syncImpl(); }
+ };
+
+ struct Flush : Command
+ {
+ Flush(SessionImpl& i) : Command(i) {}
+ void operator()() { impl.flushImpl(); }
+ };
+
+ struct Reject : Command
+ {
+ qpid::messaging::Message& message;
+
+ Reject(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
+ void operator()() { impl.rejectImpl(message); }
+ };
+
+ struct CreateSender;
+ struct CreateReceiver;
+
+ //helper templates for some common patterns
+ template <class F> bool execute()
+ {
+ F f(*this);
+ return execute(f);
+ }
+
+ template <class F> void retry()
+ {
+ while (!execute<F>()) {}
+ }
+
+ template <class F, class P> bool execute1(P p)
+ {
+ F f(*this, p);
+ return execute(f);
+ }
};
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 2e04c85b93..a15ba3578c 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -270,6 +270,11 @@ check_PROGRAMS+=qrsh
qrsh_SOURCES=qrsh.cpp
qrsh_LDADD=$(lib_client)
+check_PROGRAMS+=qpid_stream
+qpid_stream_INCLUDES=$(PUBLIC_INCLUDES)
+qpid_stream_SOURCES=qpid_stream.cpp
+qpid_stream_LDADD=$(lib_client)
+
TESTS_ENVIRONMENT = \
VALGRIND=$(VALGRIND) \
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index be0b6f42d0..2cbcc8d500 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -183,6 +183,26 @@ QPID_AUTO_TEST_CASE(testSimpleSendReceive)
BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes());
}
+QPID_AUTO_TEST_CASE(testSendReceiveHeaders)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Message out("test-message");
+ for (uint i = 0; i < 10; ++i) {
+ out.getHeaders()["a"] = i;
+ sender.send(out);
+ }
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in;
+ for (uint i = 0; i < 10; ++i) {
+ //Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ BOOST_CHECK(receiver.fetch(in, 5 * qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes());
+ BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i);
+ fix.session.acknowledge();
+ }
+}
+
QPID_AUTO_TEST_CASE(testSenderError)
{
MessagingFixture fix;
diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp
new file mode 100644
index 0000000000..8e02baa8a0
--- /dev/null
+++ b/cpp/src/tests/qpid_stream.cpp
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/sys/Runnable.h>
+#include <qpid/sys/Thread.h>
+#include <qpid/sys/Time.h>
+#include <qpid/Options.h>
+#include <iostream>
+#include <string>
+
+using namespace qpid::messaging;
+using namespace qpid::sys;
+
+struct Args : public qpid::Options
+{
+ std::string url;
+ std::string address;
+ uint rate;
+ bool durable;
+
+ Args() : url("amqp:tcp:127.0.0.1:5672"), address("test-queue"), rate(1000), durable(false)
+ {
+ addOptions()
+ ("url", qpid::optValue(url, "URL"), "Url to connect to.")
+ ("address", qpid::optValue(address, "ADDRESS"), "Address to stream messages through.")
+ ("rate", qpid::optValue(rate, "msgs/sec"), "Rate at which to stream messages.")
+ ("durable", qpid::optValue(durable, "true|false"), "Mark messages as durable.");
+ }
+};
+
+Args opts;
+
+const std::string TIMESTAMP = "ts";
+
+uint64_t timestamp(const AbsTime& time)
+{
+ Duration t(time);
+ return t;
+}
+
+struct Client : Runnable
+{
+ virtual ~Client() {}
+ virtual void doWork(Session&) = 0;
+
+ void run()
+ {
+ try {
+ Connection connection = Connection::open(opts.url);
+ Session session = connection.newSession();
+ doWork(session);
+ session.close();
+ connection.close();
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ }
+
+ Thread thread;
+
+ void start() { thread = Thread(this); }
+ void join() { thread.join(); }
+};
+
+struct Publish : Client
+{
+ void doWork(Session& session)
+ {
+ Sender sender = session.createSender(opts.address);
+ Message msg;
+ uint64_t interval = TIME_SEC / opts.rate;
+ uint64_t sent = 0, missedRate = 0;
+ AbsTime start = now();
+ while (true) {
+ AbsTime sentAt = now();
+ msg.getHeaders()[TIMESTAMP] = timestamp(sentAt);
+ sender.send(msg);
+ ++sent;
+ AbsTime waitTill(start, sent*interval);
+ Duration delay(sentAt, waitTill);
+ if (delay < 0) {
+ ++missedRate;
+ } else {
+ qpid::sys::usleep(delay / TIME_USEC);
+ }
+ }
+ }
+};
+
+struct Consume : Client
+{
+ void doWork(Session& session)
+ {
+ Message msg;
+ uint64_t received = 0;
+ double minLatency = std::numeric_limits<double>::max();
+ double maxLatency = 0;
+ double totalLatency = 0;
+ Receiver receiver = session.createReceiver(opts.address);
+ while (receiver.fetch(msg)) {
+ session.acknowledge();//TODO: add batching option
+ ++received;
+ //calculate latency
+ uint64_t receivedAt = timestamp(now());
+ uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64();
+ double latency = ((double) (receivedAt - sentAt)) / TIME_MSEC;
+
+ //update avg, min & max
+ minLatency = std::min(minLatency, latency);
+ maxLatency = std::max(maxLatency, latency);
+ totalLatency += latency;
+
+ if (received % opts.rate == 0) {
+ std::cout << "count=" << received
+ << ", avg=" << (totalLatency/received)
+ << ", min=" << minLatency
+ << ", max=" << maxLatency << std::endl;
+ }
+ }
+ }
+};
+
+int main(int argc, char** argv)
+{
+ try {
+ opts.parse(argc, argv);
+ Publish publish;
+ Consume consume;
+ publish.start();
+ consume.start();
+ consume.join();
+ publish.join();
+ return 0;
+ } catch(const std::exception& error) {
+ std::cout << error.what() << std::endl;
+ }
+ return 1;
+}
+
+