summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp65
-rw-r--r--cpp/src/qpid/client/ClientChannel.h7
-rw-r--r--cpp/src/qpid/client/ClientConnection.cpp12
-rw-r--r--cpp/src/qpid/client/Completion.h53
-rw-r--r--cpp/src/qpid/client/CompletionTracker.cpp74
-rw-r--r--cpp/src/qpid/client/CompletionTracker.h31
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp9
-rw-r--r--cpp/src/qpid/client/Execution.h40
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp87
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h37
-rw-r--r--cpp/src/qpid/client/Future.h97
-rw-r--r--cpp/src/qpid/client/FutureCompletion.cpp21
-rw-r--r--cpp/src/qpid/client/FutureCompletion.h10
-rw-r--r--cpp/src/qpid/client/FutureResponse.cpp5
-rw-r--r--cpp/src/qpid/client/FutureResponse.h4
-rw-r--r--cpp/src/qpid/client/FutureResult.cpp43
-rw-r--r--cpp/src/qpid/client/FutureResult.h46
-rw-r--r--cpp/src/qpid/client/Response.h19
-rw-r--r--cpp/src/qpid/client/ScopedAssociation.h53
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp94
-rw-r--r--cpp/src/qpid/client/SessionCore.h22
-rw-r--r--cpp/src/qpid/client/TypedResult.h51
22 files changed, 694 insertions, 186 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index cc2b7aedc8..1a0fd25bc3 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -56,7 +56,7 @@ class ScopedSync
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
prefetch(_prefetch), transactional(_transactional), running(false),
- uniqueId(true)/*could eventually be the session id*/, nameCounter(0)
+ uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false)
{
}
@@ -65,26 +65,25 @@ Channel::~Channel()
join();
}
-void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s)
+void Channel::open(const Session& s)
{
+ Mutex::ScopedLock l(lock);
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
-
- connection = c;
- sessionCore = s;
- session = auto_ptr<Session>(new Session(c, s));
+ active = true;
+ session = s;
}
bool Channel::isOpen() const {
Mutex::ScopedLock l(lock);
- return connection;
+ return active;
}
void Channel::setQos() {
- session->basicQos(0, getPrefetch(), false);
+ session.basicQos(0, getPrefetch(), false);
if(isTransactional()) {
//I think this is wrong! should only send TxSelect once...
- session->txSelect();
+ session.txSelect();
}
}
@@ -95,13 +94,13 @@ void Channel::setPrefetch(uint16_t _prefetch){
void Channel::declareExchange(Exchange& exchange, bool synch){
FieldTable args;
- ScopedSync s(*session, synch);
- session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args);
+ ScopedSync s(session, synch);
+ session.exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args);
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
- ScopedSync s(*session, synch);
- session->exchangeDelete(0, exchange.getName(), false);
+ ScopedSync s(session, synch);
+ session.exchangeDelete(0, exchange.getName(), false);
}
void Channel::declareQueue(Queue& queue, bool synch){
@@ -112,30 +111,30 @@ void Channel::declareQueue(Queue& queue, bool synch){
}
FieldTable args;
- ScopedSync s(*session, synch);
- session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
+ ScopedSync s(session, synch);
+ session.queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
queue.isExclusive(), queue.isAutoDelete(), args);
}
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
- ScopedSync s(*session, synch);
- session->queueDelete(0, queue.getName(), ifunused, ifempty);
+ ScopedSync s(session, synch);
+ session.queueDelete(0, queue.getName(), ifunused, ifempty);
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- ScopedSync s(*session, synch);
- session->queueBind(0, q, e, key, args);
+ ScopedSync s(session, synch);
+ session.queueBind(0, q, e, key, args);
}
void Channel::commit(){
- session->txCommit();
+ session.txCommit();
}
void Channel::rollback(){
- session->txRollback();
+ session.txRollback();
}
void Channel::consume(
@@ -155,8 +154,8 @@ void Channel::consume(
c.ackMode = ackMode;
c.lastDeliveryTag = 0;
}
- ScopedSync s(*session, synch);
- session->basicConsume(0, queue.getName(), tag, noLocal,
+ ScopedSync s(session, synch);
+ session.basicConsume(0, queue.getName(), tag, noLocal,
ackMode == NO_ACK, false, !synch,
fields ? *fields : FieldTable());
}
@@ -171,13 +170,13 @@ void Channel::cancel(const std::string& tag, bool synch) {
c = i->second;
consumers.erase(i);
}
- ScopedSync s(*session, synch);
- session->basicCancel(tag);
+ ScopedSync s(session, synch);
+ session.basicCancel(tag);
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- Response response = session->basicGet(0, queue.getName(), ackMode == NO_ACK);
- sessionCore->flush();//TODO: need to expose the ability to request completion info through session
+ Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK);
+ session.execution().sendFlushRequest();
if (response.isA<BasicGetEmptyBody>()) {
return false;
} else {
@@ -194,19 +193,15 @@ void Channel::publish(const Message& msg, const Exchange& exchange,
const string e = exchange.getName();
string key = routingKey;
- session->basicPublish(0, e, key, mandatory, immediate, msg);
+ session.basicPublish(0, e, key, mandatory, immediate, msg);
}
void Channel::close()
{
- session->close();
+ session.close();
{
Mutex::ScopedLock l(lock);
- if (connection);
- {
- sessionCore.reset();
- connection.reset();
- }
+ active = false;
}
stop();
}
@@ -232,7 +227,7 @@ void Channel::join() {
void Channel::run() {
try {
while (true) {
- FrameSet::shared_ptr content = session->get();
+ FrameSet::shared_ptr content = session.get();
//need to dispatch this to the relevant listener:
if (content->isA<BasicDeliverBody>()) {
ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index c355fe007a..7ba4b0a246 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -79,18 +79,17 @@ class Channel : private sys::Runnable
bool running;
ConsumerMap consumers;
- ConnectionImpl::shared_ptr connection;
- std::auto_ptr<Session> session;
- SessionCore::shared_ptr sessionCore;
+ Session session;
framing::ChannelId channelId;
BlockingQueue<framing::FrameSet::shared_ptr> gets;
framing::Uuid uniqueId;
uint32_t nameCounter;
+ bool active;
void stop();
void setQos();
- void open(ConnectionImpl::shared_ptr, SessionCore::shared_ptr);
+ void open(const Session& session);
void closeInternal();
void join();
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp
index e1581503f9..8c5f83f9f5 100644
--- a/cpp/src/qpid/client/ClientConnection.cpp
+++ b/cpp/src/qpid/client/ClientConnection.cpp
@@ -25,6 +25,7 @@
#include "Connection.h"
#include "ClientChannel.h"
#include "ClientMessage.h"
+#include "ScopedAssociation.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
@@ -66,18 +67,15 @@ void Connection::open(
}
void Connection::openChannel(Channel& channel) {
- ChannelId id = ++channelIdCounter;
- SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
- impl->allocated(session);
- channel.open(impl, session);
- session->open();
+ channel.open(newSession());
}
Session Connection::newSession() {
ChannelId id = ++channelIdCounter;
SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
- impl->allocated(session);
- return Session(impl, session);
+ ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl));
+ session->open();
+ return Session(assoc);
}
void Connection::close()
diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h
new file mode 100644
index 0000000000..000bba2138
--- /dev/null
+++ b/cpp/src/qpid/client/Completion.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _Completion_
+#define _Completion_
+
+#include <boost/shared_ptr.hpp>
+#include "Future.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+class Completion
+{
+protected:
+ Future future;
+ SessionCore::shared_ptr session;
+
+public:
+ Completion(Future f, SessionCore::shared_ptr s) : future(f), session(s) {}
+
+ void sync()
+ {
+ future.sync(*session);
+ }
+
+ bool isComplete() {
+ return future.isComplete();
+ }
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp
index 996971dbd2..46a7384ac2 100644
--- a/cpp/src/qpid/client/CompletionTracker.cpp
+++ b/cpp/src/qpid/client/CompletionTracker.cpp
@@ -20,45 +20,101 @@
*/
#include "CompletionTracker.h"
+#include <algorithm>
using qpid::client::CompletionTracker;
using namespace qpid::framing;
using namespace boost;
+namespace
+{
+const std::string empty;
+}
+
CompletionTracker::CompletionTracker() {}
CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {}
+void CompletionTracker::close()
+{
+ sys::Mutex::ScopedLock l(lock);
+ while (!listeners.empty()) {
+ Record r(listeners.front());
+ {
+ sys::Mutex::ScopedUnlock u(lock);
+ r.completed();
+ }
+ listeners.pop_front();
+ }
+}
void CompletionTracker::completed(const SequenceNumber& _mark)
{
sys::Mutex::ScopedLock l(lock);
mark = _mark;
- while (!listeners.empty() && !(listeners.front().first > mark)) {
- Listener f(listeners.front().second);
+ while (!listeners.empty() && !(listeners.front().id > mark)) {
+ Record r(listeners.front());
{
sys::Mutex::ScopedUnlock u(lock);
- f();
+ r.completed();
}
- listeners.pop();
+ listeners.pop_front();
+ }
+}
+
+void CompletionTracker::received(const SequenceNumber& id, const std::string& result)
+{
+ sys::Mutex::ScopedLock l(lock);
+ Listeners::iterator i = seek(id);
+ if (i != listeners.end() && i->id == id) {
+ i->received(result);
+ listeners.erase(i);
}
}
-void CompletionTracker::listen(const SequenceNumber& point, Listener listener)
+void CompletionTracker::listenForCompletion(const SequenceNumber& point, CompletionListener listener)
{
- if (!add(point, listener)) {
+ if (!add(Record(point, listener))) {
listener();
}
}
-bool CompletionTracker::add(const SequenceNumber& point, Listener listener)
+void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListener listener)
+{
+ if (!add(Record(point, listener))) {
+ listener(empty);
+ }
+}
+
+bool CompletionTracker::add(const Record& record)
{
sys::Mutex::ScopedLock l(lock);
- if (point < mark) {
+ if (record.id < mark) {
return false;
} else {
- listeners.push(make_pair(point, listener));
+ //insert at the correct position
+ Listeners::iterator i = seek(record.id);
+ if (i == listeners.end()) i = listeners.begin();
+ listeners.insert(i, record);
+
return true;
}
}
+CompletionTracker::Listeners::iterator CompletionTracker::seek(const framing::SequenceNumber& point)
+{
+ Listeners::iterator i = listeners.begin();
+ while (i != listeners.end() && i->id < point) i++;
+ return i;
+}
+
+void CompletionTracker::Record::completed()
+{
+ if (f) f();
+ else if(g) g(empty);//won't get a result if command is now complete
+}
+
+void CompletionTracker::Record::received(const std::string& result)
+{
+ if (g) g(result);
+}
diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h
index 30999b4184..05cdc45c9f 100644
--- a/cpp/src/qpid/client/CompletionTracker.h
+++ b/cpp/src/qpid/client/CompletionTracker.h
@@ -19,7 +19,7 @@
*
*/
-#include <queue>
+#include <list>
#include <boost/function.hpp>
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/SequenceNumber.h"
@@ -34,19 +34,40 @@ namespace client {
class CompletionTracker
{
public:
- typedef boost::function<void()> Listener;
+ //typedef boost::function<void()> CompletionListener;
+ typedef boost::function0<void> CompletionListener;
+ typedef boost::function<void(const std::string&)> ResultListener;
CompletionTracker();
CompletionTracker(const framing::SequenceNumber& mark);
void completed(const framing::SequenceNumber& mark);
- void listen(const framing::SequenceNumber& point, Listener l);
+ void received(const framing::SequenceNumber& id, const std::string& result);
+ void listenForCompletion(const framing::SequenceNumber& point, CompletionListener l);
+ void listenForResult(const framing::SequenceNumber& point, ResultListener l);
+ void close();
private:
+ struct Record
+ {
+ framing::SequenceNumber id;
+ CompletionListener f;
+ ResultListener g;
+
+ Record(const framing::SequenceNumber& _id, CompletionListener l) : id(_id), f(l) {}
+ Record(const framing::SequenceNumber& _id, ResultListener l) : id(_id), g(l) {}
+ void completed();
+ void received(const std::string& result);
+
+ };
+
+ typedef std::list<Record> Listeners;
+
sys::Mutex lock;
framing::SequenceNumber mark;
- std::queue< std::pair<framing::SequenceNumber, Listener> > listeners;
+ Listeners listeners;
- bool add(const framing::SequenceNumber& point, Listener l);
+ bool add(const Record& r);
+ Listeners::iterator seek(const framing::SequenceNumber&);
};
}
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index b4d2156c31..5ff34cde4e 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -60,11 +60,11 @@ void ConnectionImpl::handle(framing::AMQFrame& frame)
void ConnectionImpl::incoming(framing::AMQFrame& frame)
{
uint16_t id = frame.getChannel();
- SessionCore::shared_ptr session = sessions[id];
- if (!session) {
+ SessionMap::iterator i = sessions.find(id);
+ if (i == sessions.end()) {
throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
}
- session->handle(frame);
+ i->second->handle(frame);
}
void ConnectionImpl::open(const std::string& host, int port,
@@ -111,7 +111,8 @@ void ConnectionImpl::idleOut()
connector->send(frame);
}
-void ConnectionImpl::shutdown() {
+void ConnectionImpl::shutdown()
+{
//this indicates that the socket to the server has closed
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
i->second->closed(0, "Unexpected socket closure.");
diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h
new file mode 100644
index 0000000000..1e8c48734d
--- /dev/null
+++ b/cpp/src/qpid/client/Execution.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Execution_
+#define _Execution_
+
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace client {
+
+class Execution
+{
+public:
+ virtual ~Execution() {}
+ virtual void sendSyncRequest() = 0;
+ virtual void sendFlushRequest() = 0;
+ virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0;
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index d10b3d3fe8..1520ba2272 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -97,8 +97,7 @@ void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& ra
void ExecutionHandler::flush()
{
- //send completion
- incoming.lwm = incoming.hwm;
+ sendCompletion();
}
void ExecutionHandler::noop()
@@ -106,48 +105,88 @@ void ExecutionHandler::noop()
//do nothing
}
-void ExecutionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+void ExecutionHandler::result(uint32_t command, const std::string& data)
{
- //TODO: need to signal the result to the appropriate listener
+ completion.received(command, data);
}
void ExecutionHandler::sync()
{
- //TODO: implement (the application is in charge of completion of
- //some commands, so need to track completion for them).
+ //TODO: implement - need to note the mark requested and then
+ //remember to send a response when that point is reached
+}
- //This shouldn't ever need to be called by the server (in my
- //opinion) as the server never needs to synchronise with the
- //clients execution
+void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
+{
+ if (point > outgoing.lwm) {
+ sendFlushRequest();
+ }
}
-void ExecutionHandler::sendFlush()
+void ExecutionHandler::sendFlushRequest()
{
AMQFrame frame(0, ExecutionFlushBody());
- out(frame);
+ out(frame);
}
-void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g)
+void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
{
- //allocate id:
- ++outgoing.hwm;
- //register listeners if necessary:
- if (f) {
- completion.listen(outgoing.hwm, f);
- }
- if (g) {
- correlation.listen(g);
+ if (point > outgoing.lwm) {
+ sendSyncRequest();
+ }
+}
+
+
+void ExecutionHandler::sendSyncRequest()
+{
+ AMQFrame frame(0, ExecutionSyncBody());
+ out(frame);
+}
+
+void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)
+{
+ if (id > completionStatus.mark) {
+ if (cumulative) {
+ completionStatus.update(completionStatus.mark, id);
+ } else {
+ completionStatus.update(id, id);
+ }
}
+ if (send) {
+ sendCompletion();
+ }
+}
- AMQFrame frame(0/*id will be filled in be channel handler*/, command);
+
+void ExecutionHandler::sendCompletion()
+{
+ SequenceNumberSet range;
+ completionStatus.collectRanges(range);
+ AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range));
+ out(frame);
+}
+
+SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l)
+{
+ SequenceNumber id = ++outgoing.hwm;
+ if(l) {
+ completion.listenForResult(id, l);
+ }
+ AMQFrame frame(0/*channel will be filled in be channel handler*/, command);
out(frame);
+ return id;
}
-void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data,
- CompletionTracker::Listener f, Correlator::Listener g)
+SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content,
+ CompletionTracker::ResultListener l)
{
- send(command, f, g);
+ SequenceNumber id = send(command, l);
+ sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData());
+ return id;
+}
+void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data)
+{
AMQHeaderBody header;
BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
header.get<BasicHeaderProperties>(true)->setContentLength(data.size());
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index f740e14185..a42697e26a 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -22,28 +22,34 @@
#define _ExecutionHandler_
#include <queue>
+#include "qpid/framing/AccumulatedAck.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MethodContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "BlockingQueue.h"
#include "ChainableFrameHandler.h"
#include "CompletionTracker.h"
#include "Correlator.h"
+#include "Execution.h"
namespace qpid {
namespace client {
class ExecutionHandler :
private framing::AMQP_ServerOperations::ExecutionHandler,
- public ChainableFrameHandler
+ public ChainableFrameHandler,
+ public Execution
{
framing::Window incoming;
framing::Window outgoing;
framing::FrameSet::shared_ptr arriving;
Correlator correlation;
CompletionTracker completion;
+ BlockingQueue<framing::FrameSet::shared_ptr> received;
framing::ProtocolVersion version;
uint64_t maxFrameSize;
+ framing::AccumulatedAck completionStatus;
void complete(uint32_t mark, const framing::SequenceNumberSet& range);
void flush();
@@ -51,22 +57,29 @@ class ExecutionHandler :
void result(uint32_t command, const std::string& data);
void sync();
+ void sendCompletion();
+
+ void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data);
+
public:
- BlockingQueue<framing::FrameSet::shared_ptr> received;
+ typedef CompletionTracker::ResultListener ResultListener;
ExecutionHandler(uint64_t maxFrameSize = 65536);
- void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
-
void handle(framing::AMQFrame& frame);
- void send(const framing::AMQBody& command,
- CompletionTracker::Listener f = CompletionTracker::Listener(),
- Correlator::Listener g = Correlator::Listener());
- void sendContent(const framing::AMQBody& command,
- const framing::BasicHeaderProperties& headers, const std::string& data,
- CompletionTracker::Listener f = CompletionTracker::Listener(),
- Correlator::Listener g = Correlator::Listener());
- void sendFlush();
+ framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
+ framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content,
+ ResultListener=ResultListener());
+ void sendSyncRequest();
+ void sendFlushRequest();
+ void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
+ void syncTo(const framing::SequenceNumber& point);
+ void flushTo(const framing::SequenceNumber& point);
+
+ void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
+ Correlator& getCorrelator() { return correlation; }
+ CompletionTracker& getCompletionTracker() { return completion; }
+ BlockingQueue<framing::FrameSet::shared_ptr>& getReceived() { return received; }
};
}}
diff --git a/cpp/src/qpid/client/Future.h b/cpp/src/qpid/client/Future.h
new file mode 100644
index 0000000000..c2f3b426da
--- /dev/null
+++ b/cpp/src/qpid/client/Future.h
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _Future_
+#define _Future_
+
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include "qpid/Exception.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/StructHelper.h"
+#include "FutureCompletion.h"
+#include "FutureResponse.h"
+#include "FutureResult.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+class Future : private framing::StructHelper
+{
+ framing::SequenceNumber command;
+ boost::shared_ptr<FutureResponse> response;
+ boost::shared_ptr<FutureResult> result;
+ bool complete;
+
+public:
+ Future() : complete(false) {}
+ Future(const framing::SequenceNumber& id) : command(id), complete(false) {}
+
+ void sync(SessionCore& session)
+ {
+ if (!complete) {
+ FutureCompletion callback;
+ session.getExecution().flushTo(command);
+ session.getExecution().getCompletionTracker().listenForCompletion(
+ command,
+ boost::bind(&FutureCompletion::completed, &callback)
+ );
+ callback.waitForCompletion();
+ session.checkClosed();
+ complete = true;
+ }
+ }
+
+ framing::AMQMethodBody* getResponse(SessionCore& session)
+ {
+ if (response) {
+ session.getExecution().getCompletionTracker().listenForCompletion(
+ command,
+ boost::bind(&FutureResponse::completed, response)
+ );
+ return response->getResponse(session);
+ } else {
+ throw Exception("Response not expected");
+ }
+ }
+
+ template <class T> void decodeResult(T& value, SessionCore& session)
+ {
+ if (result) {
+ decode(value, result->getResult(session));
+ } else {
+ throw Exception("Result not expected");
+ }
+ }
+
+ bool isComplete() {
+ return complete;
+ }
+
+ void setCommandId(const framing::SequenceNumber& id) { command = id; }
+ void setFutureResponse(boost::shared_ptr<FutureResponse> r) { response = r; }
+ void setFutureResult(boost::shared_ptr<FutureResult> r) { result = r; }
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/FutureCompletion.cpp b/cpp/src/qpid/client/FutureCompletion.cpp
index 6fc3d5f088..130c65d6aa 100644
--- a/cpp/src/qpid/client/FutureCompletion.cpp
+++ b/cpp/src/qpid/client/FutureCompletion.cpp
@@ -24,9 +24,9 @@
using namespace qpid::client;
using namespace qpid::sys;
-FutureCompletion::FutureCompletion() : complete(false), closed(false), code(0) {}
+FutureCompletion::FutureCompletion() : complete(false) {}
-bool FutureCompletion::isComplete()
+bool FutureCompletion::isComplete() const
{
Monitor::ScopedLock l(lock);
return complete;
@@ -39,23 +39,10 @@ void FutureCompletion::completed()
lock.notifyAll();
}
-void FutureCompletion::waitForCompletion()
+void FutureCompletion::waitForCompletion() const
{
Monitor::ScopedLock l(lock);
- while (!complete && !closed) {
+ while (!complete) {
lock.wait();
}
- if (closed) {
- throw ChannelException(code, text);
- }
-}
-
-void FutureCompletion::close(uint16_t _code, const std::string& _text)
-{
- Monitor::ScopedLock l(lock);
- complete = true;
- closed = true;
- code = _code;
- text = _text;
- lock.notifyAll();
}
diff --git a/cpp/src/qpid/client/FutureCompletion.h b/cpp/src/qpid/client/FutureCompletion.h
index 3487a0910a..1897230230 100644
--- a/cpp/src/qpid/client/FutureCompletion.h
+++ b/cpp/src/qpid/client/FutureCompletion.h
@@ -31,19 +31,15 @@ namespace client {
class FutureCompletion
{
protected:
- sys::Monitor lock;
+ mutable sys::Monitor lock;
bool complete;
- bool closed;
- uint16_t code;
- std::string text;
public:
FutureCompletion();
virtual ~FutureCompletion(){}
- bool isComplete();
- void waitForCompletion();
+ bool isComplete() const;
+ void waitForCompletion() const;
void completed();
- void close(uint16_t code, const std::string& text);
};
}}
diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp
index e63dc9c192..afdd35c5eb 100644
--- a/cpp/src/qpid/client/FutureResponse.cpp
+++ b/cpp/src/qpid/client/FutureResponse.cpp
@@ -21,14 +21,17 @@
#include "FutureResponse.h"
+#include "SessionCore.h"
+
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
-AMQMethodBody* FutureResponse::getResponse()
+AMQMethodBody* FutureResponse::getResponse(SessionCore& session)
{
waitForCompletion();
+ session.checkClosed();
return response.get();
}
diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h
index 75b1f72c04..1e8a7eb456 100644
--- a/cpp/src/qpid/client/FutureResponse.h
+++ b/cpp/src/qpid/client/FutureResponse.h
@@ -29,11 +29,13 @@
namespace qpid {
namespace client {
+class SessionCore;
+
class FutureResponse : public FutureCompletion
{
framing::MethodHolder response;
public:
- framing::AMQMethodBody* getResponse();
+ framing::AMQMethodBody* getResponse(SessionCore& session);
void received(framing::AMQMethodBody* response);
};
diff --git a/cpp/src/qpid/client/FutureResult.cpp b/cpp/src/qpid/client/FutureResult.cpp
new file mode 100644
index 0000000000..a523129206
--- /dev/null
+++ b/cpp/src/qpid/client/FutureResult.cpp
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureResult.h"
+
+#include "SessionCore.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+const std::string& FutureResult::getResult(SessionCore& session) const
+{
+ waitForCompletion();
+ session.checkClosed();
+ return result;
+}
+
+void FutureResult::received(const std::string& r)
+{
+ Monitor::ScopedLock l(lock);
+ result = r;
+ complete = true;
+ lock.notifyAll();
+}
diff --git a/cpp/src/qpid/client/FutureResult.h b/cpp/src/qpid/client/FutureResult.h
new file mode 100644
index 0000000000..3117b63802
--- /dev/null
+++ b/cpp/src/qpid/client/FutureResult.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureResult_
+#define _FutureResult_
+
+#include <string>
+#include "qpid/framing/amqp_framing.h"
+#include "FutureCompletion.h"
+
+namespace qpid {
+namespace client {
+
+class SessionCore;
+
+class FutureResult : public FutureCompletion
+{
+ std::string result;
+public:
+ const std::string& getResult(SessionCore& session) const;
+ void received(const std::string& result);
+};
+
+}}
+
+
+
+#endif
diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h
index 7866df916c..f9a9f97b75 100644
--- a/cpp/src/qpid/client/Response.h
+++ b/cpp/src/qpid/client/Response.h
@@ -24,34 +24,27 @@
#include <boost/shared_ptr.hpp>
#include "qpid/framing/amqp_framing.h"
-#include "FutureResponse.h"
+#include "Completion.h"
namespace qpid {
namespace client {
-class Response
+class Response : public Completion
{
- boost::shared_ptr<FutureResponse> future;
-
public:
- Response(boost::shared_ptr<FutureResponse> f) : future(f) {}
+ Response(Future f, SessionCore::shared_ptr s) : Completion(f, s) {}
template <class T> T& as()
{
- framing::AMQMethodBody* response(future->getResponse());
- assert(response);
+ framing::AMQMethodBody* response(future.getResponse(*session));
return *boost::polymorphic_downcast<T*>(response);
}
+
template <class T> bool isA()
{
- framing::AMQMethodBody* response(future->getResponse());
+ framing::AMQMethodBody* response(future.getResponse(*session));
return response && response->isA<T>();
}
-
- void sync()
- {
- return future->waitForCompletion();
- }
};
}}
diff --git a/cpp/src/qpid/client/ScopedAssociation.h b/cpp/src/qpid/client/ScopedAssociation.h
new file mode 100644
index 0000000000..861a28c0f8
--- /dev/null
+++ b/cpp/src/qpid/client/ScopedAssociation.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ScopedAssociation_
+#define _ScopedAssociation_
+
+#include "ConnectionImpl.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+struct ScopedAssociation
+{
+ typedef boost::shared_ptr<ScopedAssociation> shared_ptr;
+
+ SessionCore::shared_ptr session;
+ ConnectionImpl::shared_ptr connection;
+
+ ScopedAssociation() {}
+
+ ScopedAssociation(SessionCore::shared_ptr s, ConnectionImpl::shared_ptr c) : session(s), connection(c)
+ {
+ connection->allocated(session);
+ }
+
+ ~ScopedAssociation()
+ {
+ if (connection && session) connection->released(session);
+ }
+};
+
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 1b04e74af4..8dfe42989b 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -21,12 +21,15 @@
#include "SessionCore.h"
#include <boost/bind.hpp>
+#include "Future.h"
+#include "FutureResponse.h"
+#include "FutureResult.h"
using namespace qpid::client;
using namespace qpid::framing;
SessionCore::SessionCore(uint16_t _id, boost::shared_ptr<framing::FrameHandler> out,
- uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false)
+ uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false), isClosed(false)
{
l2.out = boost::bind(&FrameHandler::handle, out, _1);
l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1);
@@ -39,47 +42,15 @@ void SessionCore::open()
l2.open(id);
}
-void SessionCore::flush()
+ExecutionHandler& SessionCore::getExecution()
{
- l3.sendFlush();
-}
-
-Response SessionCore::send(const AMQMethodBody& method, bool expectResponse)
-{
- boost::shared_ptr<FutureResponse> f(futures.createResponse());
- if (expectResponse) {
- l3.send(method, boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1));
- } else {
- l3.send(method, boost::bind(&FutureResponse::completed, f));
- }
- if (sync) {
- flush();
- f->waitForCompletion();
- }
- return Response(f);
-}
-
-Response SessionCore::send(const AMQMethodBody& method, const MethodContent& content, bool expectResponse)
-{
- //TODO: lots of duplication between these two send methods; refactor
- boost::shared_ptr<FutureResponse> f(futures.createResponse());
- if (expectResponse) {
- l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(),
- boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1));
- } else {
- l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(),
- boost::bind(&FutureResponse::completed, f));
- }
- if (sync) {
- flush();
- f->waitForCompletion();
- }
- return Response(f);
+ checkClosed();
+ return l3;
}
FrameSet::shared_ptr SessionCore::get()
{
- return l3.received.pop();
+ return l3.getReceived().pop();
}
void SessionCore::setSync(bool s)
@@ -95,12 +66,13 @@ bool SessionCore::isSync()
void SessionCore::close()
{
l2.close();
- l3.received.close();
+ stop();
}
void SessionCore::stop()
{
- l3.received.close();
+ l3.getReceived().close();
+ l3.getCompletionTracker().close();
}
void SessionCore::handle(AMQFrame& frame)
@@ -110,6 +82,46 @@ void SessionCore::handle(AMQFrame& frame)
void SessionCore::closed(uint16_t code, const std::string& text)
{
- l3.received.close();
- futures.close(code, text);
+ stop();
+
+ isClosed = true;
+ reason.code = code;
+ reason.text = text;
+}
+
+void SessionCore::checkClosed()
+{
+ if (isClosed) {
+ throw ChannelException(reason.code, reason.text);
+ }
+}
+
+Future SessionCore::send(const AMQBody& command)
+{
+ Future f;
+ //any result/response listeners must be set before the command is sent
+ if (command.getMethod()->resultExpected()) {
+ boost::shared_ptr<FutureResult> r(new FutureResult());
+ f.setFutureResult(r);
+ //result listener is tied to command id, and is set when that
+ //is allocated by the execution handler, so pass it to send
+ f.setCommandId(l3.send(command, boost::bind(&FutureResult::received, r, _1)));
+ } else {
+ if (command.getMethod()->responseExpected()) {
+ boost::shared_ptr<FutureResponse> r(new FutureResponse());
+ f.setFutureResponse(r);
+ l3.getCorrelator().listen(boost::bind(&FutureResponse::received, r, _1));
+ }
+
+ f.setCommandId(l3.send(command));
+ }
+ return f;
+}
+
+Future SessionCore::send(const AMQBody& command, const MethodContent& content)
+{
+ //content bearing methods don't currently have responses or
+ //results, if that changes should follow procedure for the other
+ //send method impl:
+ return Future(l3.send(command, content));
}
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index 0febb956b9..80fe13715f 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -22,6 +22,7 @@
#ifndef _SessionCore_
#define _SessionCore_
+#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/FrameHandler.h"
@@ -29,35 +30,44 @@
#include "qpid/framing/MethodContent.h"
#include "ChannelHandler.h"
#include "ExecutionHandler.h"
-#include "FutureFactory.h"
-#include "Response.h"
namespace qpid {
namespace client {
+class Future;
+
class SessionCore : public framing::FrameHandler
{
+ struct Reason
+ {
+ uint16_t code;
+ std::string text;
+ };
+
ExecutionHandler l3;
ChannelHandler l2;
- FutureFactory futures;
const uint16_t id;
bool sync;
+ bool isClosed;
+ Reason reason;
public:
typedef boost::shared_ptr<SessionCore> shared_ptr;
SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
- Response send(const framing::AMQMethodBody& method, bool expectResponse = false);
- Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false);
framing::FrameSet::shared_ptr get();
uint16_t getId() const { return id; }
void setSync(bool);
bool isSync();
- void flush();
void open();
void close();
void stop();
void closed(uint16_t code, const std::string& text);
+ void checkClosed();
+ ExecutionHandler& getExecution();
+
+ Future send(const framing::AMQBody& command);
+ Future send(const framing::AMQBody& command, const framing::MethodContent& content);
//for incoming frames:
void handle(framing::AMQFrame& frame);
diff --git a/cpp/src/qpid/client/TypedResult.h b/cpp/src/qpid/client/TypedResult.h
new file mode 100644
index 0000000000..38892c42bd
--- /dev/null
+++ b/cpp/src/qpid/client/TypedResult.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _TypedResult_
+#define _TypedResult_
+
+#include "Completion.h"
+
+namespace qpid {
+namespace client {
+
+template <class T> class TypedResult : public Completion
+{
+ T result;
+ bool decoded;
+
+public:
+ TypedResult(Future f, SessionCore::shared_ptr s) : Completion(f, s), decoded(false) {}
+
+ T& get()
+ {
+ if (!decoded) {
+ future.decodeResult(result, *session);
+ decoded = true;
+ }
+
+ return result;
+ }
+};
+
+}}
+
+#endif