diff options
| author | Gordon Sim <gsim@apache.org> | 2008-04-20 12:10:37 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-04-20 12:10:37 +0000 |
| commit | 0637677cf6653256b67c82dcb74f35133601220c (patch) | |
| tree | 8507bb8373e8b6dfd8c9b96fcb4b262fd4d61501 /cpp/src/qpid/client | |
| parent | 48dab065ef526f68a5a7d4c4ba22c5b8b2e2e026 (diff) | |
| download | qpid-python-0637677cf6653256b67c82dcb74f35133601220c.tar.gz | |
QPID-920: converted c++ client to use final 0-10 protocol
* connection handler converted to using invoker & proxy and updated to final method defs
* SessionCore & ExecutionHandler replace by SessionImpl
* simplified handling of completion & results, removed handling of responses
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@649915 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
39 files changed, 1256 insertions, 1727 deletions
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index 4af69c8552..ae9f78483d 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -26,7 +26,6 @@ #include "Message.h" #include "Connection.h" #include "Demux.h" -#include "FutureResponse.h" #include "MessageListener.h" #include "MessageQueue.h" #include <boost/format.hpp> @@ -47,9 +46,17 @@ const std::string empty; class ScopedSync { Session& session; + const bool change; + const bool value; public: - ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } - ~ScopedSync() { session.setSynchronous(false); } + ScopedSync(Session& s, bool desired = true) : session(s), change(s.isSynchronous() != desired), value(desired) + { + if (change) session.setSynchronous(value); + } + ~ScopedSync() + { + if (change) session.setSynchronous(!value); + } }; Channel::Channel(bool _transactional, u_int16_t _prefetch) : @@ -116,7 +123,7 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri string e = exchange.getName(); string q = queue.getName(); ScopedSync s(session, synch); - session.queueBind(0, q, e, key, args); + session.exchangeBind(q, e, key, args); } void Channel::commit(){ @@ -129,7 +136,7 @@ void Channel::rollback(){ void Channel::consume( Queue& _queue, const std::string& tag, MessageListener* listener, - AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { + AckMode ackMode, bool noLocal, bool synch, FieldTable* fields) { if (tag.empty()) { throw Exception("A tag must be specified for a consumer."); @@ -144,13 +151,18 @@ void Channel::consume( c.ackMode = ackMode; c.count = 0; } - uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; + uint8_t confirmMode = ackMode == NO_ACK ? 1 : 0; ScopedSync s(session, synch); - session.messageSubscribe(0, _queue.getName(), tag, noLocal, + FieldTable ft; + FieldTable* ftptr = fields ? fields : &ft; + if (noLocal) { + ftptr->setString("qpid.no-local","yes"); + } + session.messageSubscribe(_queue.getName(), tag, confirmMode, 0/*pre-acquire*/, - false, fields ? *fields : FieldTable()); + false, "", 0, *ftptr); if (!prefetch) { - session.messageFlowMode(tag, 0/*credit based*/); + session.messageSetFlowMode(tag, 0/*credit based*/); } //allocate some credit: @@ -177,17 +189,22 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { ScopedDivert handler(tag, session.getExecution().getDemux()); Demux::QueuePtr incoming = handler.getQueue(); - session.messageSubscribe(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); + session.messageSubscribe(destination=tag, queue=_queue.getName(), acceptMode=(ackMode == NO_ACK ? 1 : 0)); session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); session.messageFlow(tag, 0/*MESSAGES*/, 1); - Completion status = session.messageFlush(tag); - status.sync(); + { + ScopedSync s(session); + session.messageFlush(tag); + } session.messageCancel(tag); FrameSet::shared_ptr p; if (incoming->tryPop(p)) { msg.populate(*p); - if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true); + if (ackMode == AUTO_ACK) { + msg.setSession(session); + msg.acknowledge(false, true); + } return true; } else @@ -243,7 +260,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination) bool send = i->second.ackMode == AUTO_ACK || (prefetch && ++(i->second.count) > (prefetch / 2)); if (send) i->second.count = 0; - session.getExecution().completed(content.getId(), true, send); + session.getExecution().markCompleted(content.getId(), true, send); } } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h index 2cda97dc63..1c3c2c9ae8 100644 --- a/cpp/src/qpid/client/Channel.h +++ b/cpp/src/qpid/client/Channel.h @@ -256,7 +256,7 @@ class Channel : private sys::Runnable void consume( Queue& queue, const std::string& tag, MessageListener* listener, AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); + framing::FieldTable* fields = 0); /** * Cancels a subscription previously set up through a call to consume(). diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h index 4d324aaf28..19d5b31777 100644 --- a/cpp/src/qpid/client/Completion.h +++ b/cpp/src/qpid/client/Completion.h @@ -24,7 +24,7 @@ #include <boost/shared_ptr.hpp> #include "Future.h" -#include "SessionCore.h" +#include "SessionImpl.h" namespace qpid { namespace client { @@ -33,17 +33,12 @@ class Completion { protected: Future future; - shared_ptr<SessionCore> session; + shared_ptr<SessionImpl> session; public: Completion() {} - Completion(Future f, shared_ptr<SessionCore> s) : future(f), session(s) {} - - void sync() - { - future.sync(*session); - } + Completion(Future f, shared_ptr<SessionImpl> s) : future(f), session(s) {} void wait() { @@ -53,10 +48,6 @@ public: bool isComplete() { return future.isComplete(*session); } - - bool isCompleteUpTo() { - return future.isCompleteUpTo(*session); - } }; }} diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp deleted file mode 100644 index 76ea9dec51..0000000000 --- a/cpp/src/qpid/client/CompletionTracker.cpp +++ /dev/null @@ -1,121 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "CompletionTracker.h" -#include <algorithm> - -using qpid::client::CompletionTracker; -using namespace qpid::framing; -using namespace boost; - -namespace -{ -const std::string empty; -} - -CompletionTracker::CompletionTracker() : closed(false) {} -CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {} - -void CompletionTracker::close() -{ - sys::Mutex::ScopedLock l(lock); - closed=true; - while (!listeners.empty()) { - Record r(listeners.front()); - { - sys::Mutex::ScopedUnlock u(lock); - r.completed(); - } - listeners.pop_front(); - } -} - - -void CompletionTracker::completed(const SequenceNumber& _mark) -{ - sys::Mutex::ScopedLock l(lock); - mark = _mark; - while (!listeners.empty() && !(listeners.front().id > mark)) { - Record r(listeners.front()); - listeners.pop_front(); - { - sys::Mutex::ScopedUnlock u(lock); - r.completed(); - } - } -} - -void CompletionTracker::received(const SequenceNumber& id, const std::string& result) -{ - sys::Mutex::ScopedLock l(lock); - Listeners::iterator i = seek(id); - if (i != listeners.end() && i->id == id) { - i->received(result); - listeners.erase(i); - } -} - -void CompletionTracker::listenForCompletion(const SequenceNumber& point, CompletionListener listener) -{ - if (!add(Record(point, listener))) { - listener(); - } -} - -void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListener listener) -{ - if (!add(Record(point, listener))) { - listener(empty); - } -} - -bool CompletionTracker::add(const Record& record) -{ - sys::Mutex::ScopedLock l(lock); - if (record.id <= mark || closed) { - return false; - } else { - //insert at the correct position - Listeners::iterator i = seek(record.id); - if (i == listeners.end()) i = listeners.begin(); - listeners.insert(i, record); - return true; - } -} - -CompletionTracker::Listeners::iterator CompletionTracker::seek(const framing::SequenceNumber& point) -{ - Listeners::iterator i = listeners.begin(); - while (i != listeners.end() && i->id < point) i++; - return i; -} - - -void CompletionTracker::Record::completed() -{ - if (f) f(); - else if(g) g(empty);//won't get a result if command is now complete -} - -void CompletionTracker::Record::received(const std::string& result) -{ - if (g) g(result); -} diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h deleted file mode 100644 index 55f7ff7531..0000000000 --- a/cpp/src/qpid/client/CompletionTracker.h +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <list> -#include <boost/function.hpp> -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/SequenceNumber.h" -#include "qpid/sys/Mutex.h" - -#ifndef _CompletionTracker_ -#define _CompletionTracker_ - -namespace qpid { -namespace client { - -class CompletionTracker -{ -public: - typedef boost::function<void()> CompletionListener; - typedef boost::function<void(const std::string&)> ResultListener; - - CompletionTracker(); - CompletionTracker(const framing::SequenceNumber& mark); - void completed(const framing::SequenceNumber& mark); - void received(const framing::SequenceNumber& id, const std::string& result); - void listenForCompletion(const framing::SequenceNumber& point, CompletionListener l); - void listenForResult(const framing::SequenceNumber& point, ResultListener l); - void close(); - -private: - struct Record - { - framing::SequenceNumber id; - CompletionListener f; - ResultListener g; - - Record(const framing::SequenceNumber& _id, CompletionListener l) : id(_id), f(l) {} - Record(const framing::SequenceNumber& _id, ResultListener l) : id(_id), g(l) {} - void completed(); - void received(const std::string& result); - - }; - - typedef std::list<Record> Listeners; - bool closed; - - sys::Mutex lock; - framing::SequenceNumber mark; - Listeners listeners; - - bool add(const Record& r); - Listeners::iterator seek(const framing::SequenceNumber&); -}; - -} -} - - -#endif diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 872e04b3b5..25d1c510c8 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -25,7 +25,7 @@ #include "Connection.h" #include "Channel.h" #include "Message.h" -#include "SessionCore.h" +#include "SessionImpl.h" #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/log/Statement.h" @@ -76,8 +76,8 @@ void Connection::openChannel(Channel& channel) { Session Connection::newSession(SynchronousMode sync, uint32_t detachedLifetime) { - shared_ptr<SessionCore> core( - new SessionCore(impl, ++channelIdCounter, max_frame_size)); + shared_ptr<SessionImpl> core( + new SessionImpl(impl, ++channelIdCounter, max_frame_size)); core->setSync(sync); impl->addSession(core); core->open(detachedLifetime); diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 81d9b972b6..d24809b31e 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -137,7 +137,7 @@ class Connection Session newSession(SynchronousMode sync, uint32_t detachedLifetime=0); /** - * Resume a suspendded session. A session may be resumed + * Resume a suspended session. A session may be resumed * on a different connection to the one that created it. */ void resume(Session& session); diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index e1c50c14fc..13de271e3b 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -24,6 +24,7 @@ #include "qpid/framing/amqp_framing.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/all_method_bodies.h" +#include "qpid/framing/ClientInvoker.h" using namespace qpid::client; using namespace qpid::framing; @@ -31,14 +32,21 @@ using namespace boost; namespace { const std::string OK("OK"); +const std::string PLAIN("PLAIN"); +const std::string en_US("en_US"); + +const std::string INVALID_STATE_START("start received in invalid state"); +const std::string INVALID_STATE_TUNE("tune received in invalid state"); +const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state"); +const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state"); } ConnectionHandler::ConnectionHandler() - : StateManager(NOT_STARTED) + : StateManager(NOT_STARTED), outHandler(*this), proxy(outHandler) { - mechanism = "PLAIN"; - locale = "en_US"; + mechanism = PLAIN; + locale = en_US; heartbeat = 0; maxChannels = 32767; maxFrameSize = 65535; @@ -52,34 +60,29 @@ ConnectionHandler::ConnectionHandler() void ConnectionHandler::incoming(AMQFrame& frame) { if (getState() == CLOSED) { - throw Exception("Connection is closed."); + throw Exception("Received frame on closed connection"); } + AMQBody* body = frame.getBody(); - if (frame.getChannel() == 0) { - if (body->getMethod()) { - handle(body->getMethod()); - } else { - error(503, "Cannot send content on channel zero."); - } - } else { - switch(getState()) { - case OPEN: - try { + try { + if (frame.getChannel() != 0 || !invoke(static_cast<ConnectionOperations&>(*this), *body)) { + switch(getState()) { + case OPEN: in(frame); - }catch(ConnectionException& e){ - error(e.code, e.what(), body); - }catch(std::exception& e){ - error(541/*internal error*/, e.what(), body); + break; + case CLOSING: + QPID_LOG(warning, "Ignoring frame while closing connection: " << frame); + break; + default: + throw Exception("Cannot receive frames on non-zero channel until connection is established."); } - break; - case CLOSING: - QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored."); - break; - default: - //must be in connection initialisation: - fail("Cannot receive frames on non-zero channel until connection is established."); } + }catch(std::exception& e){ + QPID_LOG(warning, "Closing connection due to " << e.what()); + setState(CLOSING); + proxy.close(501, e.what()); + if (onError) onError(501, e.what()); } } @@ -109,101 +112,77 @@ void ConnectionHandler::close() break; case OPEN: setState(CLOSING); - send(ConnectionCloseBody(version, 200, OK, 0, 0)); + proxy.close(200, OK); waitFor(CLOSED); break; // Nothing to do for CLOSING, CLOSED, FAILED or NOT_STARTED } } -void ConnectionHandler::send(const framing::AMQBody& body) +void ConnectionHandler::checkState(STATES s, const std::string& msg) { - AMQFrame f(body); - out(f); + if (getState() != s) { + throw CommandInvalidException(msg); + } } -void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) +void ConnectionHandler::fail(const std::string& message) { - setState(CLOSING); - send(ConnectionCloseBody(version, code, message, classId, methodId)); + QPID_LOG(warning, message); + setState(FAILED); } -void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body) +void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& /*mechanisms*/, const Array& /*locales*/) { - if (onError) - onError(code, message); - AMQMethodBody* method = body->getMethod(); - if (method) - error(code, message, method->amqpClassId(), method->amqpMethodId()); - else - error(code, message); + checkState(NOT_STARTED, INVALID_STATE_START); + setState(NEGOTIATING); + //TODO: verify that desired mechanism and locale are supported + string response = ((char)0) + uid + ((char)0) + pwd; + proxy.startOk(properties, mechanism, response, locale); } +void ConnectionHandler::secure(const std::string& /*challenge*/) +{ + throw NotImplementedException("Challenge-response cycle not yet implemented in client"); +} -void ConnectionHandler::fail(const std::string& message) +void ConnectionHandler::tune(uint16_t channelMax, uint16_t /*frameMax*/, uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/) { - QPID_LOG(warning, message); - setState(FAILED); + checkState(NEGOTIATING, INVALID_STATE_TUNE); + //TODO: verify that desired heartbeat and max frame size are valid + maxChannels = channelMax; + proxy.tuneOk(maxChannels, maxFrameSize, heartbeat); + setState(OPENING); + proxy.open(vhost, capabilities, insist); } -void ConnectionHandler::handle(AMQMethodBody* method) +void ConnectionHandler::openOk(const framing::Array& /*knownHosts*/) { - switch (getState()) { - case NOT_STARTED: - if (method->isA<ConnectionStartBody>()) { - setState(NEGOTIATING); - string response = ((char)0) + uid + ((char)0) + pwd; - send(ConnectionStartOkBody(version, properties, mechanism, response, locale)); - } else { - fail("Bad method sequence, expected connection-start."); - } - break; - case NEGOTIATING: - if (method->isA<ConnectionTuneBody>()) { - ConnectionTuneBody* proposal=polymorphic_downcast<ConnectionTuneBody*>(method); - heartbeat = proposal->getHeartbeat(); - maxChannels = proposal->getChannelMax(); - send(ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)); - setState(OPENING); - send(ConnectionOpenBody(version, vhost, capabilities, insist)); - //TODO: support for further security challenges - //} else if (method->isA<ConnectionSecureBody>()) { - } else { - fail("Unexpected method sequence, expected connection-tune."); - } - break; - case OPENING: - if (method->isA<ConnectionOpenOkBody>()) { - setState(OPEN); - //TODO: support for redirection - //} else if (method->isA<ConnectionRedirectBody>()) { - } else { - fail("Unexpected method sequence, expected connection-open-ok."); - } - break; - case OPEN: - if (method->isA<ConnectionCloseBody>()) { - send(ConnectionCloseOkBody(version)); - setState(CLOSED); - ConnectionCloseBody* c=polymorphic_downcast<ConnectionCloseBody*>(method); - QPID_LOG(warning, "Broker closed connection: " << c->getReplyCode() - << ", " << c->getReplyText()); - if (onError) { - onError(c->getReplyCode(), c->getReplyText()); - } - } else { - error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId()); - } - break; - case CLOSING: - if (method->isA<ConnectionCloseOkBody>()) { - if (onClose) { - onClose(); - } - setState(CLOSED); - } else { - QPID_LOG(warning, "Received frame on channel zero while closing connection; frame ignored."); - } - break; + checkState(OPENING, INVALID_STATE_OPEN_OK); + //TODO: store knownHosts for reconnection etc + setState(OPEN); +} + +void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*knownHosts*/) +{ + throw NotImplementedException("Redirection received from broker; not yet implemented in client"); +} + +void ConnectionHandler::close(uint16_t replyCode, const std::string& replyText) +{ + proxy.closeOk(); + setState(CLOSED); + QPID_LOG(warning, "Broker closed connection: " << replyCode << ", " << replyText); + if (onError) { + onError(replyCode, replyText); + } +} + +void ConnectionHandler::closeOk() +{ + checkState(CLOSING, INVALID_STATE_CLOSE_OK); + if (onClose) { + onClose(); } + setState(CLOSED); } diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index bb50495c06..b298b02701 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -21,12 +21,16 @@ #ifndef _ConnectionHandler_ #define _ConnectionHandler_ +#include "ChainableFrameHandler.h" #include "Connector.h" #include "StateManager.h" -#include "ChainableFrameHandler.h" -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/FieldTable.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQP_ClientOperations.h" +#include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/framing/Array.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/InputHandler.h" namespace qpid { namespace client { @@ -39,7 +43,7 @@ struct ConnectionProperties framing::FieldTable properties; std::string mechanism; std::string locale; - std::string capabilities; + framing::Array capabilities; uint16_t heartbeat; uint16_t maxChannels; uint64_t maxFrameSize; @@ -48,17 +52,42 @@ struct ConnectionProperties }; class ConnectionHandler : private StateManager, - public ConnectionProperties, - public ChainableFrameHandler, - public framing::InputHandler + public ConnectionProperties, + public ChainableFrameHandler, + public framing::InputHandler, + private framing::AMQP_ClientOperations::Connection010Handler { + typedef framing::AMQP_ClientOperations::Connection010Handler ConnectionOperations; enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED}; std::set<int> ESTABLISHED; - void handle(framing::AMQMethodBody* method); - void send(const framing::AMQBody& body); - void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0); - void error(uint16_t code, const std::string& message, framing::AMQBody* body); + class Adapter : public framing::FrameHandler + { + ConnectionHandler& handler; + public: + Adapter(ConnectionHandler& h) : handler(h) {} + void handle(framing::AMQFrame& f) { handler.out(f); } + }; + + Adapter outHandler; + framing::AMQP_ServerProxy::Connection010 proxy; + + void checkState(STATES s, const std::string& msg); + + //methods corresponding to connection controls: + void start(const framing::FieldTable& serverProperties, + const framing::Array& mechanisms, + const framing::Array& locales); + void secure(const std::string& challenge); + void tune(uint16_t channelMax, + uint16_t frameMax, + uint16_t heartbeatMin, + uint16_t heartbeatMax); + void openOk(const framing::Array& knownHosts); + void redirect(const std::string& host, + const framing::Array& knownHosts); + void close(uint16_t replyCode, const std::string& replyText); + void closeOk(); public: using InputHandler::handle; diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index b248de8744..d1fd66ff26 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -23,7 +23,7 @@ #include "qpid/framing/reply_exceptions.h" #include "ConnectionImpl.h" -#include "SessionCore.h" +#include "SessionImpl.h" #include <boost/bind.hpp> #include <boost/format.hpp> @@ -32,6 +32,7 @@ using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; + ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c), isClosed(false), isClosing(false) { @@ -52,10 +53,10 @@ ConnectionImpl::~ConnectionImpl() { connector->close(); } -void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session) +void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session) { Mutex::ScopedLock l(lock); - boost::weak_ptr<SessionCore>& s = sessions[session->getChannel()]; + boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()]; if (s.lock()) throw ChannelBusyException(); s = session; } @@ -67,7 +68,7 @@ void ConnectionImpl::handle(framing::AMQFrame& frame) void ConnectionImpl::incoming(framing::AMQFrame& frame) { - boost::shared_ptr<SessionCore> s; + boost::shared_ptr<SessionImpl> s; { Mutex::ScopedLock l(lock); s = sessions[frame.getChannel()].lock(); @@ -122,7 +123,7 @@ ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedL connector->close(); SessionVector save; for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { - boost::shared_ptr<SessionCore> s = i->second.lock(); + boost::shared_ptr<SessionImpl> s = i->second.lock(); if (s) save.push_back(s); } sessions.clear(); @@ -135,7 +136,7 @@ void ConnectionImpl::closed(uint16_t code, const std::string& text) if (isClosed) return; SessionVector save(closeInternal(l)); Mutex::ScopedUnlock u(lock); - std::for_each(save.begin(), save.end(), boost::bind(&SessionCore::connectionClosed, _1, code, text)); + std::for_each(save.begin(), save.end(), boost::bind(&SessionImpl::connectionClosed, _1, code, text)); } static const std::string CONN_CLOSED("Connection closed by broker"); @@ -148,7 +149,7 @@ void ConnectionImpl::shutdown() handler.fail(CONN_CLOSED); Mutex::ScopedUnlock u(lock); std::for_each(save.begin(), save.end(), - boost::bind(&SessionCore::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED)); + boost::bind(&SessionImpl::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index bf8226a776..d0df9238f2 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -35,15 +35,15 @@ namespace qpid { namespace client { -class SessionCore; +class SessionImpl; class ConnectionImpl : public framing::FrameHandler, public sys::TimeoutHandler, public sys::ShutdownHandler { - typedef std::map<uint16_t, boost::weak_ptr<SessionCore> > SessionMap; - typedef std::vector<boost::shared_ptr<SessionCore> > SessionVector; + typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap; + typedef std::vector<boost::shared_ptr<SessionImpl> > SessionVector; SessionMap sessions; ConnectionHandler handler; @@ -69,7 +69,7 @@ class ConnectionImpl : public framing::FrameHandler, ConnectionImpl(boost::shared_ptr<Connector> c); ~ConnectionImpl(); - void addSession(const boost::shared_ptr<SessionCore>&); + void addSession(const boost::shared_ptr<SessionImpl>&); void open(const std::string& host, int port = 5672, const std::string& uid = "guest", diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 11aff6184b..7fb4997f5a 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -46,6 +46,7 @@ Connector::Connector( receive_buffer_size(buffer_size), send_buffer_size(buffer_size), version(ver), + initiated(false), closed(true), joined(true), timeout(0), @@ -240,6 +241,14 @@ void Connector::Writer::write(sys::AsynchIO&) { void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + if (!initiated) { + framing::ProtocolInitiation protocolInit; + if (protocolInit.decode(in)) { + //TODO: check the version is correct + QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")"); + } + initiated = true; + } AMQFrame frame; while(frame.decode(in)){ QPID_LOG(trace, "RECV " << identifier << ": " << frame); diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 78aad0b60a..366f82acbd 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -77,8 +77,9 @@ class Connector : public framing::OutputHandler, const int receive_buffer_size; const int send_buffer_size; framing::ProtocolVersion version; + bool initiated; - sys::Mutex closedLock; + sys::Mutex closedLock; bool closed; bool joined; diff --git a/cpp/src/qpid/client/Correlator.cpp b/cpp/src/qpid/client/Correlator.cpp deleted file mode 100644 index f30c92b992..0000000000 --- a/cpp/src/qpid/client/Correlator.cpp +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Correlator.h" - -using qpid::client::Correlator; -using namespace qpid::framing; -using namespace boost; - -bool Correlator::receive(const AMQMethodBody* response) -{ - if (listeners.empty()) { - return false; - } else { - Listener l = listeners.front(); - if (l) l(response); - listeners.pop(); - return true; - } -} - -void Correlator::listen(Listener l) -{ - listeners.push(l); -} - - diff --git a/cpp/src/qpid/client/Correlator.h b/cpp/src/qpid/client/Correlator.h deleted file mode 100644 index 45b22fb2fe..0000000000 --- a/cpp/src/qpid/client/Correlator.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <memory> -#include <queue> -#include <set> -#include <boost/function.hpp> -#include "qpid/framing/AMQMethodBody.h" -#include "qpid/sys/Monitor.h" - -#ifndef _Correlator_ -#define _Correlator_ - -namespace qpid { -namespace client { - - -class Correlator -{ -public: - typedef boost::function<void(const framing::AMQMethodBody*)> Listener; - - bool receive(const framing::AMQMethodBody*); - void listen(Listener l); - -private: - std::queue<Listener> listeners; -}; - -} -} - - -#endif diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 2484dabf1f..cc67701748 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -84,7 +84,7 @@ void Dispatcher::run() if (handler.get()) { handler->handle(*content); } else { - QPID_LOG(error, "No handler found for " << *(content->getMethod())); + QPID_LOG(warning, "No handler found for " << *(content->getMethod())); } } } diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h index 5f717de586..e4b2db23e1 100644 --- a/cpp/src/qpid/client/Execution.h +++ b/cpp/src/qpid/client/Execution.h @@ -28,21 +28,27 @@ namespace qpid { namespace client { /** - * Provides more detailed access to the amqp 'execution layer'. + * Provides access to more detailed aspects of the session + * implementation. */ class Execution { public: virtual ~Execution() {} - virtual void sendSyncRequest() = 0; - virtual void sendFlushRequest() = 0; - virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0; + /** + * Mark the incoming command with the specified id as completed + */ + virtual void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) = 0; + /** + * Provides access to the demultiplexing function within the + * session implementation + */ virtual Demux& getDemux() = 0; - virtual bool isComplete(const framing::SequenceNumber& id) = 0; - virtual bool isCompleteUpTo(const framing::SequenceNumber& id) = 0; - virtual void setCompletionListener(boost::function<void()>) = 0; - virtual void syncWait(const framing::SequenceNumber& id) = 0; - virtual framing::SequenceNumber lastSent() const = 0; + /** + * Wait until notification has been received of completion of the + * outgoing command with the specified id. + */ + void waitForCompletion(const framing::SequenceNumber& id); }; }} diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp deleted file mode 100644 index afdd13c9e9..0000000000 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ /dev/null @@ -1,267 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ExecutionHandler.h" -#include "qpid/Exception.h" -#include "qpid/framing/BasicDeliverBody.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/all_method_bodies.h" -#include "qpid/framing/ServerInvoker.h" -#include "qpid/client/FutureCompletion.h" -#include <boost/bind.hpp> - -using namespace qpid::client; -using namespace qpid::framing; -using namespace boost; -using qpid::sys::Mutex; - -bool isMessageMethod(AMQMethodBody* method) -{ - return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>(); -} - -bool isMessageMethod(AMQBody* body) -{ - AMQMethodBody* method=body->getMethod(); - return method && isMessageMethod(method); -} - -bool isContentFrame(AMQFrame& frame) -{ - AMQBody* body = frame.getBody(); - uint8_t type = body->type(); - return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); -} - -ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) : - version(framing::highestProtocolVersion), maxFrameSize(_maxFrameSize) {} - -//incoming: -void ExecutionHandler::handle(AMQFrame& frame) -{ - if (!invoke(*this, *frame.getBody())) { - if (!arriving) { - arriving = FrameSet::shared_ptr(new FrameSet(++incomingCounter)); - } - arriving->append(frame); - if (arriving->isComplete()) { - if (arriving->isContentBearing() || !correlation.receive(arriving->getMethod())) { - demux.handle(arriving); - } - arriving.reset(); - } - } -} - -void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) -{ - if (range.size() % 2) { //must be even number - throw NotAllowedException(QPID_MSG("Received odd number of elements in ranged mark")); - } else { - SequenceNumber mark(cumulative); - { - Mutex::ScopedLock l(lock); - outgoingCompletionStatus.update(mark, range); - } - if (completionListener) completionListener(); - completion.completed(outgoingCompletionStatus.mark); - //TODO: signal listeners of early notification? - } -} - -void ExecutionHandler::flush() -{ - sendCompletion(); -} - -void ExecutionHandler::noop() -{ - //do nothing -} - -void ExecutionHandler::result(uint32_t command, const std::string& data) -{ - completion.received(command, data); -} - -void ExecutionHandler::sync() -{ - //TODO: implement - need to note the mark requested and then - //remember to send a response when that point is reached -} - -void ExecutionHandler::flushTo(const framing::SequenceNumber& point) -{ - Mutex::ScopedLock l(lock); - if (point > outgoingCompletionStatus.mark) { - sendFlushRequest(); - } -} - -void ExecutionHandler::sendFlushRequest() -{ - Mutex::ScopedLock l(lock); - AMQFrame frame(in_place<ExecutionFlushBody>()); - out(frame); -} - -void ExecutionHandler::syncTo(const framing::SequenceNumber& point) -{ - Mutex::ScopedLock l(lock); - if (point > outgoingCompletionStatus.mark) { - sendSyncRequest(); - } -} - - -void ExecutionHandler::sendSyncRequest() -{ - Mutex::ScopedLock l(lock); - AMQFrame frame(in_place<ExecutionSyncBody>()); - out(frame); -} - -void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send) -{ - { - Mutex::ScopedLock l(lock); - if (id > incomingCompletionStatus.mark) { - if (cumulative) { - incomingCompletionStatus.update(incomingCompletionStatus.mark, id); - } else { - incomingCompletionStatus.update(id, id); - } - } - } - if (send) { - sendCompletion(); - } -} - - -void ExecutionHandler::sendCompletion() -{ - Mutex::ScopedLock l(lock); - SequenceNumberSet range; - incomingCompletionStatus.collectRanges(range); - AMQFrame frame( - in_place<ExecutionCompleteBody>( - version, incomingCompletionStatus.mark.getValue(), range)); - out(frame); -} - -SequenceNumber ExecutionHandler::lastSent() const { return outgoingCounter; } - -SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener listener) -{ - Mutex::ScopedLock l(lock); - return send(command, listener, false); -} - -SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent) -{ - SequenceNumber id = ++outgoingCounter; - if(l) { - completion.listenForResult(id, l); - } - AMQFrame frame(command); - if (hasContent) { - frame.setEof(false); - } - out(frame); - return id; -} - -SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, - CompletionTracker::ResultListener listener) -{ - Mutex::ScopedLock l(lock); - SequenceNumber id = send(command, listener, true); - sendContent(content); - return id; -} - -void ExecutionHandler::sendContent(const MethodContent& content) -{ - AMQFrame header(content.getHeader()); - header.setBof(false); - u_int64_t data_length = content.getData().length(); - if(data_length > 0){ - header.setEof(false); - out(header); - const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1 /*end of frame marker included in overhead but not in size*/); - if(data_length < frag_size){ - AMQFrame frame(in_place<AMQContentBody>(content.getData())); - frame.setBof(false); - out(frame); - }else{ - u_int32_t offset = 0; - u_int32_t remaining = data_length - offset; - while (remaining > 0) { - u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(content.getData().substr(offset, length)); - AMQFrame frame(in_place<AMQContentBody>(frag)); - frame.setBof(false); - if (offset > 0) { - frame.setBos(false); - } - offset += length; - remaining = data_length - offset; - if (remaining) { - frame.setEos(false); - frame.setEof(false); - } - out(frame); - } - } - } else { - out(header); - } -} - -bool ExecutionHandler::isComplete(const SequenceNumber& id) -{ - Mutex::ScopedLock l(lock); - return outgoingCompletionStatus.covers(id); -} - -bool ExecutionHandler::isCompleteUpTo(const SequenceNumber& id) -{ - Mutex::ScopedLock l(lock); - return outgoingCompletionStatus.mark >= id; -} - -void ExecutionHandler::setCompletionListener(boost::function<void()> l) -{ - completionListener = l; -} - - -void ExecutionHandler::syncWait(const SequenceNumber& id) { - syncTo(id); - FutureCompletion fc; - completion.listenForCompletion( - id, boost::bind(&FutureCompletion::completed, &fc) - ); - fc.waitForCompletion(); - assert(isCompleteUpTo(id)); -} diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h deleted file mode 100644 index d9113b683b..0000000000 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ExecutionHandler_ -#define _ExecutionHandler_ - -#include <queue> -#include "qpid/framing/AccumulatedAck.h" -#include "qpid/framing/AMQP_ServerOperations.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/framing/MethodContent.h" -#include "qpid/framing/SequenceNumber.h" -#include "qpid/sys/Mutex.h" -#include "ChainableFrameHandler.h" -#include "CompletionTracker.h" -#include "Correlator.h" -#include "Demux.h" -#include "Execution.h" - -namespace qpid { -namespace client { - -class ExecutionHandler : - public framing::AMQP_ServerOperations::ExecutionHandler, - public framing::FrameHandler, - public Execution -{ - framing::SequenceNumber incomingCounter; - framing::AccumulatedAck incomingCompletionStatus; - framing::SequenceNumber outgoingCounter; - framing::AccumulatedAck outgoingCompletionStatus; - framing::FrameSet::shared_ptr arriving; - Correlator correlation; - CompletionTracker completion; - Demux demux; - sys::Mutex lock; - framing::ProtocolVersion version; - uint64_t maxFrameSize; - boost::function<void()> completionListener; - - void complete(uint32_t mark, const framing::SequenceNumberSet& range); - void flush(); - void noop(); - void result(uint32_t command, const std::string& data); - void sync(); - - void sendCompletion(); - - framing::SequenceNumber send(const framing::AMQBody&, CompletionTracker::ResultListener, bool hasContent); - void sendContent(const framing::MethodContent&); - -public: - typedef CompletionTracker::ResultListener ResultListener; - - // Allow other classes to set the out handler. - framing::FrameHandler::Chain out; - - ExecutionHandler(uint64_t maxFrameSize = 65535); - - // Incoming handler. - void handle(framing::AMQFrame& frame); - - framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener()); - framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content, - ResultListener=ResultListener()); - framing::SequenceNumber lastSent() const; - void sendSyncRequest(); - void sendFlushRequest(); - void completed(const framing::SequenceNumber& id, bool cumulative, bool send); - void syncTo(const framing::SequenceNumber& point); - void flushTo(const framing::SequenceNumber& point); - void syncWait(const framing::SequenceNumber& id); - - bool isComplete(const framing::SequenceNumber& id); - bool isCompleteUpTo(const framing::SequenceNumber& id); - - void setMaxFrameSize(uint64_t size) { maxFrameSize = size; } - Correlator& getCorrelator() { return correlation; } - CompletionTracker& getCompletionTracker() { return completion; } - Demux& getDemux() { return demux; } - - void setCompletionListener(boost::function<void()>); -}; - -}} - -#endif diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/Future.cpp index 534ca01bb7..6a0c78ae4b 100644 --- a/cpp/src/qpid/client/FutureResponse.h +++ b/cpp/src/qpid/client/Future.cpp @@ -19,28 +19,27 @@ * */ -#ifndef _FutureResponse_ -#define _FutureResponse_ - -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/BodyHolder.h" -#include "FutureCompletion.h" +#include "Future.h" namespace qpid { namespace client { -class SessionCore; - -class FutureResponse : public FutureCompletion +void Future::wait(SessionImpl& session) { - framing::BodyHolder response; -public: - framing::AMQMethodBody* getResponse(SessionCore& session); - void received(const framing::AMQMethodBody* response); -}; - -}} + if (!complete) { + session.waitForCompletion(command); + } + complete = true; +} +bool Future::isComplete(SessionImpl& session) +{ + return complete || session.isComplete(command); +} +void Future::setFutureResult(boost::shared_ptr<FutureResult> r) +{ + result = r; +} -#endif +}} diff --git a/cpp/src/qpid/client/Future.h b/cpp/src/qpid/client/Future.h index d07f9f149c..faf68c9104 100644 --- a/cpp/src/qpid/client/Future.h +++ b/cpp/src/qpid/client/Future.h @@ -28,9 +28,8 @@ #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/StructHelper.h" #include "FutureCompletion.h" -#include "FutureResponse.h" #include "FutureResult.h" -#include "SessionCore.h" +#include "SessionImpl.h" namespace qpid { namespace client { @@ -38,7 +37,6 @@ namespace client { class Future : private framing::StructHelper { framing::SequenceNumber command; - boost::shared_ptr<FutureResponse> response; boost::shared_ptr<FutureResult> result; bool complete; @@ -46,42 +44,7 @@ public: Future() : complete(false) {} Future(const framing::SequenceNumber& id) : command(id), complete(false) {} - void sync(SessionCore& session) - { - if (!isComplete(session)) { - session.getExecution().syncTo(command); - wait(session); - } - } - - void wait(SessionCore& session) - { - if (!isComplete(session)) { - FutureCompletion callback; - session.getExecution().getCompletionTracker().listenForCompletion( - command, - boost::bind(&FutureCompletion::completed, &callback) - ); - callback.waitForCompletion(); - session.assertOpen(); - complete = true; - } - } - - framing::AMQMethodBody* getResponse(SessionCore& session) - { - if (response) { - session.getExecution().getCompletionTracker().listenForCompletion( - command, - boost::bind(&FutureResponse::completed, response) - ); - return response->getResponse(session); - } else { - throw Exception("Response not expected"); - } - } - - template <class T> void decodeResult(T& value, SessionCore& session) + template <class T> void decodeResult(T& value, SessionImpl& session) { if (result) { decode(value, result->getResult(session)); @@ -90,17 +53,9 @@ public: } } - bool isComplete(SessionCore& session) { - return complete || session.getExecution().isComplete(command); - } - - bool isCompleteUpTo(SessionCore& session) { - return complete || session.getExecution().isCompleteUpTo(command); - } - - void setCommandId(const framing::SequenceNumber& id) { command = id; } - void setFutureResponse(boost::shared_ptr<FutureResponse> r) { response = r; } - void setFutureResult(boost::shared_ptr<FutureResult> r) { result = r; } + void wait(SessionImpl& session); + bool isComplete(SessionImpl& session); + void setFutureResult(boost::shared_ptr<FutureResult> r); }; }} diff --git a/cpp/src/qpid/client/FutureFactory.cpp b/cpp/src/qpid/client/FutureFactory.cpp deleted file mode 100644 index 7f9d51e77f..0000000000 --- a/cpp/src/qpid/client/FutureFactory.cpp +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "FutureFactory.h" - -using namespace qpid::client; -using namespace boost; - -shared_ptr<FutureCompletion> FutureFactory::createCompletion() -{ - shared_ptr<FutureCompletion> f(new FutureCompletion()); - weak_ptr<FutureCompletion> w(f); - set.push_back(w); - return f; -} - -shared_ptr<FutureResponse> FutureFactory::createResponse() -{ - shared_ptr<FutureResponse> f(new FutureResponse()); - weak_ptr<FutureCompletion> w(static_pointer_cast<FutureCompletion>(f)); - set.push_back(w); - return f; -} - -void FutureFactory::close(uint16_t code, const std::string& text) -{ - for (WeakPtrSet::iterator i = set.begin(); i != set.end(); i++) { - shared_ptr<FutureCompletion> p = i->lock(); - if (p) { - p->close(code, text); - } - } -} diff --git a/cpp/src/qpid/client/FutureFactory.h b/cpp/src/qpid/client/FutureFactory.h deleted file mode 100644 index b126e296fd..0000000000 --- a/cpp/src/qpid/client/FutureFactory.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef _FutureFactory_ -#define _FutureFactory_ - -#include <vector> -#include <boost/shared_ptr.hpp> -#include <boost/weak_ptr.hpp> -#include "FutureCompletion.h" -#include "FutureResponse.h" - -namespace qpid { -namespace client { - -class FutureFactory -{ - typedef std::vector< boost::weak_ptr<FutureCompletion> > WeakPtrSet; - WeakPtrSet set; - -public: - boost::shared_ptr<FutureCompletion> createCompletion(); - boost::shared_ptr<FutureResponse> createResponse(); - void close(uint16_t code, const std::string& text); -}; - -}} - - -#endif diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp deleted file mode 100644 index 32d99531fa..0000000000 --- a/cpp/src/qpid/client/FutureResponse.cpp +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "FutureResponse.h" - -#include "SessionCore.h" - -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::sys; - - -AMQMethodBody* FutureResponse::getResponse(SessionCore& session) -{ - waitForCompletion(); - session.assertOpen(); - return response.getMethod(); -} - -void FutureResponse::received(const AMQMethodBody* r) -{ - Monitor::ScopedLock l(lock); - response.setBody(*r); - complete = true; - lock.notifyAll(); -} - diff --git a/cpp/src/qpid/client/FutureResult.cpp b/cpp/src/qpid/client/FutureResult.cpp index 681202edea..007f278051 100644 --- a/cpp/src/qpid/client/FutureResult.cpp +++ b/cpp/src/qpid/client/FutureResult.cpp @@ -21,13 +21,13 @@ #include "FutureResult.h" -#include "SessionCore.h" +#include "SessionImpl.h" using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; -const std::string& FutureResult::getResult(SessionCore& session) const +const std::string& FutureResult::getResult(SessionImpl& session) const { waitForCompletion(); session.assertOpen(); diff --git a/cpp/src/qpid/client/FutureResult.h b/cpp/src/qpid/client/FutureResult.h index 3117b63802..f889706493 100644 --- a/cpp/src/qpid/client/FutureResult.h +++ b/cpp/src/qpid/client/FutureResult.h @@ -29,13 +29,13 @@ namespace qpid { namespace client { -class SessionCore; +class SessionImpl; class FutureResult : public FutureCompletion { std::string result; public: - const std::string& getResult(SessionCore& session) const; + const std::string& getResult(SessionImpl& session) const; void received(const std::string& result); }; diff --git a/cpp/src/qpid/client/Message.cpp b/cpp/src/qpid/client/Message.cpp new file mode 100644 index 0000000000..3d4b9da9fa --- /dev/null +++ b/cpp/src/qpid/client/Message.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Message.h" + +namespace qpid { +namespace client { + + Message::Message(const std::string& data_, + const std::string& routingKey, + const std::string& exchange) : TransferContent(data_, routingKey, exchange) {} + + std::string Message::getDestination() const + { + return method.getDestination(); + } + + bool Message::isRedelivered() const + { + return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); + } + + void Message::setRedelivered(bool redelivered) + { + getDeliveryProperties().setRedelivered(redelivered); + } + + framing::FieldTable& Message::getHeaders() + { + return getMessageProperties().getApplicationHeaders(); + } + + void Message::acknowledge(bool cumulative, bool notifyPeer) const + { + const_cast<Session&>(session).getExecution().markCompleted(id, cumulative, notifyPeer); + } + + const framing::MessageTransferBody& Message::getMethod() const + { + return method; + } + + const framing::SequenceNumber& Message::getId() const + { + return id; + } + + /**@internal for incoming messages */ + Message::Message(const framing::FrameSet& frameset, Session s) : + method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s) + { + populate(frameset); + } + + /**@internal use for incoming messages. */ + void Message::setSession(Session s) { session=s; } + +}} diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h index daac30ba36..977cc89146 100644 --- a/cpp/src/qpid/client/Message.h +++ b/cpp/src/qpid/client/Message.h @@ -30,7 +30,7 @@ namespace qpid { namespace client { /** - * A representation of messages for sent or recived through the + * A representation of messages for sent or received through the * client api. * * \ingroup clientapi @@ -38,60 +38,21 @@ namespace client { class Message : public framing::TransferContent { public: - Message(const std::string& data_=std::string(), + Message(const std::string& data=std::string(), const std::string& routingKey=std::string(), - const std::string& exchange=std::string() - ) : TransferContent(data_, routingKey, exchange) {} - - std::string getDestination() const - { - return method.getDestination(); - } - - bool isRedelivered() const - { - return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); - } - - void setRedelivered(bool redelivered) - { - getDeliveryProperties().setRedelivered(redelivered); - } - - framing::FieldTable& getHeaders() - { - return getMessageProperties().getApplicationHeaders(); - } - - void acknowledge(Session& session, bool cumulative = true, bool send = true) const - { - session.getExecution().completed(id, cumulative, send); - } - - void acknowledge(bool cumulative = true, bool send = true) const - { - const_cast<Session&>(session).getExecution().completed(id, cumulative, send); - } + const std::string& exchange=std::string()); + std::string getDestination() const; + bool isRedelivered() const; + void setRedelivered(bool redelivered); + framing::FieldTable& getHeaders(); + void acknowledge(bool cumulative = true, bool notifyPeer = true) const; + const framing::MessageTransferBody& getMethod() const; + const framing::SequenceNumber& getId() const; /**@internal for incoming messages */ - Message(const framing::FrameSet& frameset, Session s) : - method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s) - { - populate(frameset); - } - - const framing::MessageTransferBody& getMethod() const - { - return method; - } - - const framing::SequenceNumber& getId() const - { - return id; - } - + Message(const framing::FrameSet& frameset, Session s); /**@internal use for incoming messages. */ - void setSession(Session s) { session=s; } + void setSession(Session s); private: //method and id are only set for received messages: framing::MessageTransferBody method; diff --git a/cpp/src/qpid/client/Results.cpp b/cpp/src/qpid/client/Results.cpp new file mode 100644 index 0000000000..1fb3a6fec9 --- /dev/null +++ b/cpp/src/qpid/client/Results.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Results.h" +#include "FutureResult.h" + +using namespace qpid::framing; +using namespace boost; + +namespace qpid { +namespace client { + +Results::Results() {} + +void Results::close() +{ + for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) { + i->second->completed(); + } + listeners.clear(); +} + +void Results::completed(const SequenceSet& set) +{ + //call complete on those listeners whose ids fall within the set + Listeners::iterator i = listeners.begin(); + while (i != listeners.end()) { + if (set.contains(i->first)) { + i->second->completed(); + listeners.erase(i++); + } else { + i++; + } + } +} + +void Results::received(const SequenceNumber& id, const std::string& result) +{ + Listeners::iterator i = listeners.find(id); + if (i != listeners.end()) { + i->second->received(result); + listeners.erase(i); + } +} + +Results::FutureResultPtr Results::listenForResult(const SequenceNumber& id) +{ + FutureResultPtr l(new FutureResult()); + listeners[id] = l; + return l; +} + +}} diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Results.h index 2b7d55ec1f..e17021b327 100644 --- a/cpp/src/qpid/client/Response.h +++ b/cpp/src/qpid/client/Results.h @@ -19,34 +19,37 @@ * */ -#ifndef _Response_ -#define _Response_ - +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/SequenceSet.h" +#include <map> #include <boost/shared_ptr.hpp> -#include "qpid/framing/amqp_framing.h" -#include "Completion.h" + +#ifndef _Results_ +#define _Results_ namespace qpid { namespace client { -class Response : public Completion +class FutureResult; + +class Results { public: - Response(Future f, shared_ptr<SessionCore> s) : Completion(f, s) {} - - template <class T> T& as() - { - framing::AMQMethodBody* response(future.getResponse(*session)); - return *boost::polymorphic_downcast<T*>(response); - } - - template <class T> bool isA() - { - framing::AMQMethodBody* response(future.getResponse(*session)); - return response && response->isA<T>(); - } + typedef boost::shared_ptr<FutureResult> FutureResultPtr; + + Results(); + void completed(const framing::SequenceSet& set); + void received(const framing::SequenceNumber& id, const std::string& result); + FutureResultPtr listenForResult(const framing::SequenceNumber& point); + void close(); + +private: + typedef std::map<framing::SequenceNumber, FutureResultPtr> Listeners; + Listeners listeners; }; -}} +} +} + #endif diff --git a/cpp/src/qpid/client/SessionBase.cpp b/cpp/src/qpid/client/SessionBase.cpp index 0e1fa67bda..d6a7571e9f 100644 --- a/cpp/src/qpid/client/SessionBase.cpp +++ b/cpp/src/qpid/client/SessionBase.cpp @@ -19,6 +19,7 @@ * */ #include "SessionBase.h" +#include "qpid/framing/all_method_bodies.h" namespace qpid { namespace client { @@ -26,7 +27,7 @@ using namespace framing; SessionBase::SessionBase() {} SessionBase::~SessionBase() {} -SessionBase::SessionBase(shared_ptr<SessionCore> core) : impl(core) {} +SessionBase::SessionBase(shared_ptr<SessionImpl> core) : impl(core) {} void SessionBase::suspend() { impl->suspend(); } void SessionBase::close() { impl->close(); } @@ -37,14 +38,30 @@ SynchronousMode SessionBase::getSynchronous() const { return SynchronousMode(impl->isSync()); } -Execution& SessionBase::getExecution() { return impl->getExecution(); } +Execution& SessionBase::getExecution() +{ + return *impl; +} + +void SessionBase::sync() +{ + ExecutionSyncBody b; + b.setSync(true); + impl->send(b).wait(*impl); +} + Uuid SessionBase::getId() const { return impl->getId(); } framing::FrameSet::shared_ptr SessionBase::get() { return impl->get(); } -void SessionBase::sync() { - Execution& ex = getExecution(); - ex.syncWait(ex.lastSent()); - impl->assertOpen(); +SessionBase::ScopedSync::ScopedSync(SessionBase& s) : session(s), change(!s.isSynchronous()) +{ + if (change) session.setSynchronous(true); } +SessionBase::ScopedSync::~ScopedSync() +{ + if (change) session.setSynchronous(false); +} + + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionBase.h b/cpp/src/qpid/client/SessionBase.h index 3565145bb9..54484113b1 100644 --- a/cpp/src/qpid/client/SessionBase.h +++ b/cpp/src/qpid/client/SessionBase.h @@ -29,8 +29,8 @@ #include "qpid/framing/TransferContent.h" #include "qpid/client/Completion.h" #include "qpid/client/ConnectionImpl.h" -#include "qpid/client/Response.h" -#include "qpid/client/SessionCore.h" +#include "qpid/client/Execution.h" +#include "qpid/client/SessionImpl.h" #include "qpid/client/TypedResult.h" #include "qpid/shared_ptr.h" #include <string> @@ -61,8 +61,11 @@ using framing::Uuid; * <em>later</em> function call. * * If you need to notify some extenal agent that some actions have - * been taken (e.g. binding queues to exchanages), you must call - * Session::sync() first, to ensure that all the commands are complete. + * been taken (e.g. binding queues to exchanges), you must ensure that + * the broker has completed the command. In synchronous mode this is + * when the session method for the command returns. In asynchronous + * mode you can call Session::sync(), to ensure that all the commands + * are complete. * * You can freely switch between modes by calling Session::setSynchronous() * @@ -77,6 +80,21 @@ enum SynchronousMode { SYNC=true, ASYNC=false }; class SessionBase { public: + /** + * Instances of this class turn synchronous mode on for the + * duration of their scope (and revert back to async if required + * afterwards). + */ + class ScopedSync + { + SessionBase& session; + const bool change; + public: + ScopedSync(SessionBase& s); + ~ScopedSync(); + }; + + SessionBase(); ~SessionBase(); @@ -110,23 +128,17 @@ class SessionBase /** Close the session */ void close(); - - /** - * Synchronize with the broker. Wait for all commands issued so far in - * the session to complete. - * @see SynchronousMode - */ - void sync(); Execution& getExecution(); + void sync(); typedef framing::TransferContent DefaultContent; protected: - shared_ptr<SessionCore> impl; + shared_ptr<SessionImpl> impl; framing::ProtocolVersion version; friend class Connection; - SessionBase(shared_ptr<SessionCore>); + SessionBase(shared_ptr<SessionImpl>); }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp deleted file mode 100644 index 5079c47b5e..0000000000 --- a/cpp/src/qpid/client/SessionCore.cpp +++ /dev/null @@ -1,440 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "SessionCore.h" -#include "Future.h" -#include "FutureResponse.h" -#include "FutureResult.h" -#include "ConnectionImpl.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/framing/constants.h" -#include "qpid/framing/ClientInvoker.h" -#include "qpid/log/Statement.h" - -#include <boost/bind.hpp> - -namespace qpid { -namespace client { - -using namespace qpid::framing; - -namespace { const std::string OK="ok"; } - -typedef sys::Monitor::ScopedLock Lock; -typedef sys::Monitor::ScopedUnlock UnLock; - -inline void SessionCore::invariant() const { - switch (state.get()) { - case OPENING: - assert(!session); - assert(code==REPLY_SUCCESS); - assert(connection); - assert(channel.get()); - assert(channel.next == connection.get()); - break; - case RESUMING: - assert(session); - assert(session->getState() == SessionState::RESUMING); - assert(code==REPLY_SUCCESS); - assert(connection); - assert(channel.get()); - assert(channel.next == connection.get()); - break; - case OPEN: - case CLOSING: - case SUSPENDING: - assert(session); - assert(connection); - assert(channel.get()); - assert(channel.next == connection.get()); - break; - case SUSPENDED: - assert(session); - assert(!connection); - break; - case CLOSED: - assert(!session); - assert(!connection); - break; - } -} - -inline void SessionCore::setState(State s) { - state = s; - invariant(); -} - -inline void SessionCore::waitFor(State s) { - invariant(); - // We can be CLOSED or SUSPENDED by error at any time. - state.waitFor(States(s, CLOSED, SUSPENDED)); - check(); - invariant(); -} - -SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, - uint16_t ch, uint64_t maxFrameSize) - : l3(maxFrameSize), - sync(false), - channel(ch), - proxy(channel), - state(OPENING), - detachedLifetime(0) -{ - l3.out = &out; - attaching(conn); -} - -void SessionCore::attaching(shared_ptr<ConnectionImpl> c) { - assert(c); - assert(channel.get()); - connection = c; - channel.next = connection.get(); - code = REPLY_SUCCESS; - text = OK; - state = session ? RESUMING : OPENING; - invariant(); -} - -SessionCore::~SessionCore() { - Lock l(state); - detach(COMMAND_INVALID, "Session deleted"); - state.waitWaiters(); -} - -void SessionCore::detach(int c, const std::string& t) { - connection.reset(); - channel.next = 0; - code=c; - text=t; - l3.getDemux().close(); -} - -void SessionCore::doClose(int code, const std::string& text) { - if (state != CLOSED) { - session.reset(); - detach(code, text); - setState(CLOSED); - l3.getCompletionTracker().close(); - } - invariant(); -} - -void SessionCore::doSuspend(int code, const std::string& text) { - if (state != CLOSED && state != SUSPENDED) { - detach(code, text); - session->suspend(); - setState(SUSPENDED); - } - invariant(); -} - -ExecutionHandler& SessionCore::getExecution() { // user thread - return l3; -} - -void SessionCore::setSync(bool s) { // user thread - sync = s; -} - -bool SessionCore::isSync() { // user thread - return sync; -} - -FrameSet::shared_ptr SessionCore::get() { // user thread - // No lock here: pop does a blocking wait. - return l3.getDemux().getDefault()->pop(); -} - -static const std::string CANNOT_REOPEN_SESSION="Cannot re-open a session."; - -void SessionCore::open(uint32_t timeout) { // user thread - Lock l(state); - check(state==OPENING && !session, - COMMAND_INVALID, CANNOT_REOPEN_SESSION); - detachedLifetime=timeout; - proxy.open(detachedLifetime); - waitFor(OPEN); -} - -void SessionCore::close() { // user thread - Lock l(state); - check(); - if (state==OPEN) { - setState(CLOSING); - proxy.close(); - waitFor(CLOSED); - } - else - doClose(REPLY_SUCCESS, OK); -} - -void SessionCore::suspend() { // user thread - Lock l(state); - checkOpen(); - setState(SUSPENDING); - proxy.suspend(); - waitFor(SUSPENDED); -} - -void SessionCore::setChannel(uint16_t ch) { channel=ch; } - -static const std::string CANNOT_RESUME_SESSION("Session cannot be resumed."); - -void SessionCore::resume(shared_ptr<ConnectionImpl> c) { - // user thread - { - Lock l(state); - if (state==SUSPENDED) { // Clear error that caused suspend - code=REPLY_SUCCESS; - text=OK; - } - check(state==SUSPENDED, COMMAND_INVALID, CANNOT_RESUME_SESSION); - SequenceNumber sendAck=session->resuming(); - attaching(c); - proxy.resume(getId()); - waitFor(OPEN); - proxy.ack(sendAck, SequenceNumberSet()); - // TODO aconway 2007-10-23: Replay inside the lock might be a prolem - // for large replay sets. - SessionState::Replay replay=session->replay(); - for (SessionState::Replay::iterator i = replay.begin(); - i != replay.end(); ++i) - { - invariant(); - channel.handle(*i); // Direct to channel. - check(); - } - l3.getDemux().open(); - } -} - -void SessionCore::assertOpen() const { - Lock l(state); - checkOpen(); -} - -static const std::string UNEXPECTED_SESSION_ATTACHED( - "Received unexpected session.attached"); - -static const std::string INVALID_SESSION_RESUME_ID( - "session.resumed has invalid ID."); - -// network thread -void SessionCore::attached(const Uuid& sessionId, - uint32_t /*detachedLifetime*/) -{ - Lock l(state); - invariant(); - check(state == OPENING || state == RESUMING, - COMMAND_INVALID, UNEXPECTED_SESSION_ATTACHED); - if (state==OPENING) { // New session - // TODO aconway 2007-10-17: 0 disables sesskon.ack for now. - // If AMQP WG decides to keep it, we need to add configuration - // for the ack rate. - session=in_place<SessionState>(0, detachedLifetime > 0, sessionId); - setState(OPEN); - } - else { // RESUMING - check(sessionId == session->getId(), - INVALID_ARGUMENT, INVALID_SESSION_RESUME_ID); - // Don't setState yet, wait for first incoming ack. - } -} - -static const std::string UNEXPECTED_SESSION_DETACHED( - "Received unexpected session.detached."); - -static const std::string UNEXPECTED_SESSION_ACK( - "Received unexpected session.ack"); - -void SessionCore::detached() { // network thread - Lock l(state); - check(state == SUSPENDING, - COMMAND_INVALID, UNEXPECTED_SESSION_DETACHED); - doSuspend(REPLY_SUCCESS, OK); -} - -void SessionCore::ack(uint32_t ack, const SequenceNumberSet&) { - Lock l(state); - invariant(); - check(state==OPEN || state==RESUMING, - COMMAND_INVALID, UNEXPECTED_SESSION_ACK); - session->receivedAck(ack); - if (state==RESUMING) { - setState(OPEN); - } - invariant(); -} - -void SessionCore::closed(uint16_t code, const std::string& text) -{ // network thread - Lock l(state); - invariant(); - doClose(code, text); -} - -// closed by connection -void SessionCore::connectionClosed(uint16_t code, const std::string& text) { - Lock l(state); - try { - doClose(code, text); - } catch(...) { assert (0); } -} - -void SessionCore::connectionBroke(uint16_t code, const std::string& text) { - Lock l(state); - try { - doSuspend(code, text); - } catch (...) { assert(0); } -} - -void SessionCore::check() const { // Called with lock held. - invariant(); - if (code != REPLY_SUCCESS) - throwReplyException(code, text); -} - -void SessionCore::check(bool cond, int newCode, const std::string& msg) const { - check(); - if (!cond) { - const_cast<SessionCore*>(this)->doClose(newCode, msg); - throwReplyException(code, text); - } -} - -static const std::string SESSION_NOT_OPEN("Session is not open"); - -void SessionCore::checkOpen() const { - if (state==SUSPENDED) { - std::string cause; - if (code != REPLY_SUCCESS) - cause=" by :"+text; - throw CommandInvalidException(QPID_MSG("Session is suspended" << cause)); - } - check(state==OPEN, COMMAND_INVALID, SESSION_NOT_OPEN); -} - -Future SessionCore::send(const AMQBody& command) -{ - Lock l(state); - checkOpen(); - command.getMethod()->setSync(sync); - Future f; - //any result/response listeners must be set before the command is sent - if (command.getMethod()->resultExpected()) { - boost::shared_ptr<FutureResult> r(new FutureResult()); - f.setFutureResult(r); - //result listener is tied to command id, and is set when that - //is allocated by the execution handler, so pass it to send - f.setCommandId(l3.send(command, boost::bind(&FutureResult::received, r, _1))); - } else { - if (command.getMethod()->responseExpected()) { - boost::shared_ptr<FutureResponse> r(new FutureResponse()); - f.setFutureResponse(r); - l3.getCorrelator().listen(boost::bind(&FutureResponse::received, r, _1)); - } - - f.setCommandId(l3.send(command)); - } - return f; -} - -Future SessionCore::send(const AMQBody& command, const MethodContent& content) -{ - Lock l(state); - checkOpen(); - //content bearing methods don't currently have responses or - //results, if that changes should follow procedure for the other - //send method impl: - return Future(l3.send(command, content)); -} - -namespace { -bool isCloseResponse(const AMQFrame& frame) { - return frame.getMethod() && - frame.getMethod()->amqpClassId() == SESSION_CLASS_ID && - frame.getMethod()->amqpMethodId() == SESSION_CLOSED_METHOD_ID; -} -} - -// Network thread. -void SessionCore::handleIn(AMQFrame& frame) { - ConnectionImpl::shared_ptr save; - { - Lock l(state); - save=connection; - // Ignore frames received while closing other than closed response. - if (state==CLOSING && !isCloseResponse(frame)) - return; - } - try { - // Cast to expose private SessionHandler functions. - if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { - // If we were detached by a session command, tell the connection. - if (!connection) save->erase(channel); - } - else { - session->received(frame); - l3.handle(frame); - } - } catch (const ChannelException& e) { - QPID_LOG(error, "Channel exception:" << e.what()); - doClose(e.code, e.what()); - } -} - -void SessionCore::handleOut(AMQFrame& frame) -{ - Lock l(state); - if (state==OPEN) { - if (detachedLifetime > 0 && session->sent(frame)) - proxy.solicitAck(); - channel.handle(frame); - } -} - -void SessionCore::solicitAck( ) { - Lock l(state); - checkOpen(); - proxy.ack(session->sendingAck(), SequenceNumberSet()); -} - -void SessionCore::flow(bool) { - assert(0); throw NotImplementedException("session.flow"); -} - -void SessionCore::flowOk(bool /*active*/) { - assert(0); throw NotImplementedException("session.flow"); -} - -void SessionCore::highWaterMark(uint32_t /*lastSentMark*/) { - // TODO aconway 2007-10-02: may be removed from spec. - assert(0); throw NotImplementedException("session.highWaterMark"); -} - -const Uuid SessionCore::getId() const { - if (session) - return session->getId(); - throw Exception(QPID_MSG("Closed session, no ID.")); -} - -}} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h deleted file mode 100644 index 2bb0f41fbf..0000000000 --- a/cpp/src/qpid/client/SessionCore.h +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef _SessionCore_ -#define _SessionCore_ - -#include "qpid/shared_ptr.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/ChannelHandler.h" -#include "qpid/framing/SessionState.h" -#include "qpid/framing/SequenceNumber.h" -#include "qpid/framing/AMQP_ClientOperations.h" -#include "qpid/framing/AMQP_ServerProxy.h" -#include "qpid/sys/StateMonitor.h" -#include "ExecutionHandler.h" - -#include <boost/optional.hpp> - -namespace qpid { -namespace framing { -class FrameSet; -class MethodContent; -class SequenceNumberSet; -} - -namespace client { - -class Future; -class ConnectionImpl; - -/** - * Session implementation, sets up handler chains. - * Attaches to a SessionHandler when active, detaches - * when closed. - */ -class SessionCore : public framing::FrameHandler::InOutHandler, - private framing::AMQP_ClientOperations::SessionHandler -{ - public: - SessionCore(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); - ~SessionCore(); - - framing::FrameSet::shared_ptr get(); - const framing::Uuid getId() const; - uint16_t getChannel() const { return channel; } - void assertOpen() const; - - // NOTE: Public functions called in user thread. - void open(uint32_t detachedLifetime); - void close(); - void resume(shared_ptr<ConnectionImpl>); - void suspend(); - void setChannel(uint16_t channel); - - void setSync(bool s); - bool isSync(); - ExecutionHandler& getExecution(); - - Future send(const framing::AMQBody& command); - - Future send(const framing::AMQBody& command, const framing::MethodContent& content); - - void connectionClosed(uint16_t code, const std::string& text); - void connectionBroke(uint16_t code, const std::string& text); - - private: - enum State { - OPENING, - RESUMING, - OPEN, - CLOSING, - SUSPENDING, - SUSPENDED, - CLOSED - }; - typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler; - typedef sys::StateMonitor<State, CLOSED> StateMonitor; - typedef StateMonitor::Set States; - - inline void invariant() const; - inline void setState(State s); - inline void waitFor(State); - void doClose(int code, const std::string& text); - void doSuspend(int code, const std::string& text); - - /** If there is an error, throw the exception */ - void check(bool condition, int code, const std::string& text) const; - /** Throw if *error */ - void check() const; - - void handleIn(framing::AMQFrame& frame); - void handleOut(framing::AMQFrame& frame); - - // Private functions are called by broker in network thread. - void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); - void flow(bool active); - void flowOk(bool active); - void detached(); - void ack(uint32_t cumulativeSeenMark, - const framing::SequenceNumberSet& seenFrameSet); - void highWaterMark(uint32_t lastSentMark); - void solicitAck(); - void closed(uint16_t code, const std::string& text); - - void attaching(shared_ptr<ConnectionImpl>); - void detach(int code, const std::string& text); - void checkOpen() const; - - int code; // Error code - std::string text; // Error text - boost::optional<framing::SessionState> session; - shared_ptr<ConnectionImpl> connection; - ExecutionHandler l3; - volatile bool sync; - framing::ChannelHandler channel; - framing::AMQP_ServerProxy::Session proxy; - mutable StateMonitor state; - uint32_t detachedLifetime; -}; - -}} // namespace qpid::client - -#endif diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp new file mode 100644 index 0000000000..57f12cf28e --- /dev/null +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -0,0 +1,605 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SessionImpl.h" + +#include "ConnectionImpl.h" +#include "Future.h" + +#include "qpid/framing/all_method_bodies.h" +#include "qpid/framing/ClientInvoker.h" +#include "qpid/framing/constants.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/MethodContent.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/log/Statement.h" + +#include <boost/bind.hpp> + +namespace { const std::string OK="ok"; } + +namespace qpid { +namespace client { + +using namespace qpid::framing; + +typedef sys::Monitor::ScopedLock Lock; +typedef sys::Monitor::ScopedUnlock UnLock; + + +SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, + uint16_t ch, uint64_t _maxFrameSize) + : code(REPLY_SUCCESS), + text(OK), + state(INACTIVE), + syncMode(false), + detachedLifetime(0), + maxFrameSize(_maxFrameSize), + id(true),//generate unique uuid for each session + name(id.str()), + //TODO: may want to allow application defined names instead + connection(conn), + channel(ch), + proxy(channel), + nextIn(0), + nextOut(0) +{ + channel.next = connection.get(); +} + +SessionImpl::~SessionImpl() { + Lock l(state); + if (state != DETACHED) { + QPID_LOG(warning, "Detaching deleted session"); + setState(DETACHED); + handleClosed(); + state.waitWaiters(); + } + connection->erase(channel); +} + +void SessionImpl::setSync(bool s) // user thread +{ + syncMode = s; //syncMode is volatile +} + +bool SessionImpl::isSync() // user thread +{ + return syncMode; //syncMode is volatile +} + +FrameSet::shared_ptr SessionImpl::get() // user thread +{ + // No lock here: pop does a blocking wait. + return demux.getDefault()->pop(); +} + +const Uuid SessionImpl::getId() const //user thread +{ + return id; //id is immutable +} + +void SessionImpl::open(uint32_t timeout) // user thread +{ + Lock l(state); + if (state == INACTIVE) { + setState(ATTACHING); + proxy.attach(name, false); + waitFor(ATTACHED); + //TODO: timeout will not be set locally until get response to + //confirm, should we wait for that? + proxy.requestTimeout(timeout); + proxy.commandPoint(nextOut, 0); + } else { + throw Exception("Open already called for this session"); + } +} + +void SessionImpl::close() //user thread +{ + Lock l(state); + if (detachedLifetime) { + proxy.requestTimeout(0); + //should we wait for the timeout response? + detachedLifetime = 0; + } + detach(); + waitFor(DETACHED); +} + +void SessionImpl::resume(shared_ptr<ConnectionImpl>) // user thread +{ + throw NotImplementedException("Resume not yet implemented by client!"); +} + +void SessionImpl::suspend() //user thread +{ + Lock l(state); + detach(); +} + +void SessionImpl::detach() //call with lock held +{ + if (state == ATTACHED) { + proxy.detach(name); + setState(DETACHING); + } +} + + +uint16_t SessionImpl::getChannel() const // user thread +{ + return channel; +} + +void SessionImpl::setChannel(uint16_t c) // user thread +{ + //channel will only ever be set when session is detached (and + //about to be resumed) + channel = c; +} + +Demux& SessionImpl::getDemux() +{ + return demux; +} + +void SessionImpl::waitForCompletion(const SequenceNumber& id) +{ + Lock l(state); + waitForCompletionImpl(id); +} + +void SessionImpl::waitForCompletionImpl(const SequenceNumber& id) //call with lock held +{ + while (incompleteOut.contains(id)) { + checkOpen(); + state.wait(); + } +} + +bool SessionImpl::isComplete(const SequenceNumber& id) +{ + Lock l(state); + return !incompleteOut.contains(id); +} + +struct IsCompleteUpTo +{ + const SequenceNumber& id; + bool result; + + IsCompleteUpTo(const SequenceNumber& _id) : id(_id), result(true) {} + void operator()(const SequenceNumber& start, const SequenceNumber&) + { + if (start <= id) result = false; + } + +}; + +bool SessionImpl::isCompleteUpTo(const SequenceNumber& id) +{ + Lock l(state); + //return false if incompleteOut contains anything less than id, + //true otherwise + IsCompleteUpTo f(id); + incompleteIn.for_each(f); + return f.result; +} + +struct MarkCompleted +{ + const SequenceNumber& id; + SequenceSet& completedIn; + + MarkCompleted(const SequenceNumber& _id, SequenceSet& set) : id(_id), completedIn(set) {} + + void operator()(const SequenceNumber& start, const SequenceNumber& end) + { + if (id >= end) { + completedIn.add(start, end); + } else if (id >= start) { + completedIn.add(start, id); + } + } + +}; + +void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer) +{ + Lock l(state); + if (cumulative) { + //everything in incompleteIn less than or equal to id is now complete + MarkCompleted f(id, completedIn); + incompleteIn.for_each(f); + //make sure id itself is in + completedIn.add(id); + //then remove anything thats completed from the incomplete set + incompleteIn.remove(completedIn); + } else if (incompleteIn.contains(id)) { + incompleteIn.remove(id); + completedIn.add(id); + } + if (notifyPeer) { + sendCompletion(); + } +} + +/** + * Called by ConnectionImpl to notify active sessions when connection + * is explictly closed + */ +void SessionImpl::connectionClosed(uint16_t _code, const std::string& _text) +{ + Lock l(state); + code = _code; + text = _text; + setState(DETACHED); + handleClosed(); +} + +/** + * Called by ConnectionImpl to notify active sessions when connection + * is disconnected + */ +void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) +{ + connectionClosed(_code, _text); +} + +Future SessionImpl::send(const AMQBody& command) +{ + return sendCommand(command); +} + +Future SessionImpl::send(const AMQBody& command, const MethodContent& content) +{ + return sendCommand(command, &content); +} + +Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content) +{ + Lock l(state); + checkOpen(); + SequenceNumber id = nextOut++; + incompleteOut.add(id); + + if (syncMode) command.getMethod()->setSync(syncMode); + Future f(id); + if (command.getMethod()->resultExpected()) { + //result listener must be set before the command is sent + f.setFutureResult(results.listenForResult(id)); + } + AMQFrame frame(command); + if (content) { + frame.setEof(false); + } + handleOut(frame); + if (content) { + sendContent(*content); + } + if (syncMode) { + waitForCompletionImpl(id); + } + return f; +} + +void SessionImpl::sendContent(const MethodContent& content) +{ + AMQFrame header(content.getHeader()); + header.setBof(false); + u_int64_t data_length = content.getData().length(); + if(data_length > 0){ + header.setEof(false); + handleOut(header); + /*Note: end of frame marker included in overhead but not in size*/ + const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1); + + if(data_length < frag_size){ + AMQFrame frame(in_place<AMQContentBody>(content.getData())); + frame.setBof(false); + handleOut(frame); + }else{ + u_int32_t offset = 0; + u_int32_t remaining = data_length - offset; + while (remaining > 0) { + u_int32_t length = remaining > frag_size ? frag_size : remaining; + string frag(content.getData().substr(offset, length)); + AMQFrame frame(in_place<AMQContentBody>(frag)); + frame.setBof(false); + if (offset > 0) { + frame.setBos(false); + } + offset += length; + remaining = data_length - offset; + if (remaining) { + frame.setEos(false); + frame.setEof(false); + } + handleOut(frame); + } + } + } else { + handleOut(header); + } +} + + +bool isMessageMethod(AMQMethodBody* method) +{ + return method->isA<MessageTransferBody>(); +} + +bool isMessageMethod(AMQBody* body) +{ + AMQMethodBody* method=body->getMethod(); + return method && isMessageMethod(method); +} + +bool isContentFrame(AMQFrame& frame) +{ + AMQBody* body = frame.getBody(); + uint8_t type = body->type(); + return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); +} + +void SessionImpl::handleIn(AMQFrame& frame) // network thread +{ + try { + if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + if (invoke(static_cast<ExecutionHandler&>(*this), *frame.getBody())) { + //make sure the command id sequence and completion + //tracking takes account of execution commands + Lock l(state); + completedIn.add(nextIn++); + } else { + //if not handled by this class, its for the application: + deliver(frame); + } + } + } catch (const SessionException& e) { + //TODO: proper 0-10 exception handling + QPID_LOG(error, "Session exception:" << e.what()); + Lock l(state); + code = e.code; + text = e.what(); + } +} + +void SessionImpl::handleOut(AMQFrame& frame) // user thread +{ + channel.handle(frame); +} + +void SessionImpl::deliver(AMQFrame& frame) // network thread +{ + if (!arriving) { + arriving = FrameSet::shared_ptr(new FrameSet(nextIn++)); + } + arriving->append(frame); + if (arriving->isComplete()) { + //message.transfers will be marked completed only when 'acked' + //as completion affects flow control; other commands will be + //considered completed as soon as processed here + if (arriving->isA<MessageTransferBody>()) { + Lock l(state); + incompleteIn.add(arriving->getId()); + } else { + Lock l(state); + completedIn.add(arriving->getId()); + } + demux.handle(arriving); + arriving.reset(); + } +} + +//control handler methods (called by network thread when controls are +//received from peer): + +void SessionImpl::attach(const std::string& /*name*/, bool /*force*/) +{ + throw NotImplementedException("Client does not support attach"); +} + +void SessionImpl::attached(const std::string& _name) +{ + Lock l(state); + if (name != _name) throw InternalErrorException("Incorrect session name"); + setState(ATTACHED); +} + +void SessionImpl::detach(const std::string& /*name*/) +{ + throw NotImplementedException("Client does not support detach"); +} + +void SessionImpl::detached(const std::string& _name, uint8_t _code) +{ + Lock l(state); + if (name != _name) throw InternalErrorException("Incorrect session name"); + setState(DETACHED); + if (_code) { + //TODO: make sure this works with execution.exception - don't + //want to overwrite the code from that + QPID_LOG(error, "Session detached by peer: " << name << " " << code); + code = _code; + text = "Session detached by peer"; + } + if (detachedLifetime == 0) { + handleClosed(); + } +} + +void SessionImpl::requestTimeout(uint32_t t) +{ + Lock l(state); + detachedLifetime = t; + proxy.timeout(t); +} + +void SessionImpl::timeout(uint32_t t) +{ + Lock l(state); + detachedLifetime = t; +} + +void SessionImpl::commandPoint(const framing::SequenceNumber& id, uint64_t offset) +{ + if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); + + Lock l(state); + nextIn = id; +} + +void SessionImpl::expected(const framing::SequenceSet& commands, const framing::Array& fragments) +{ + if (!commands.empty() || fragments.size()) { + throw NotImplementedException("Session resumption not yet supported"); + } +} + +void SessionImpl::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/) +{ + //don't really care too much about this yet +} + +void SessionImpl::completed(const framing::SequenceSet& commands, bool timelyReply) +{ + Lock l(state); + incompleteOut.remove(commands); + state.notify();//notify any waiters of completion + completedOut.add(commands); + //notify any waiting results of completion + results.completed(commands); + + if (timelyReply) { + proxy.knownCompleted(completedOut); + completedOut.clear(); + } +} + +void SessionImpl::knownCompleted(const framing::SequenceSet& commands) +{ + Lock l(state); + completedIn.remove(commands); +} + +void SessionImpl::flush(bool expected, bool confirmed, bool completed) +{ + Lock l(state); + if (expected) { + proxy.expected(SequenceSet(nextIn), Array()); + } + if (confirmed) { + proxy.confirmed(completedIn, Array()); + } + if (completed) { + proxy.completed(completedIn, true); + } +} + +void SessionImpl::sendCompletion() +{ + proxy.completed(completedIn, true); +} + +void SessionImpl::gap(const framing::SequenceSet& /*commands*/) +{ + throw NotImplementedException("gap not yet supported"); +} + + + +void SessionImpl::sync() {} + +void SessionImpl::result(uint32_t commandId, const std::string& value) +{ + Lock l(state); + results.received(commandId, value); +} + +void SessionImpl::exception(uint16_t errorCode, + uint32_t commandId, + uint8_t classCode, + uint8_t commandCode, + uint8_t /*fieldIndex*/, + const std::string& description, + const framing::FieldTable& /*errorInfo*/) +{ + QPID_LOG(warning, "Exception received from peer: " << errorCode << ":" << description + << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]"); + + Lock l(state); + code = errorCode; + text = description; + if (detachedLifetime) { + proxy.requestTimeout(0); + //should we wait for the timeout response? + detachedLifetime = 0; + } + detach(); +} + + +//private utility methods: + +inline void SessionImpl::setState(State s) //call with lock held +{ + state = s; +} + +inline void SessionImpl::waitFor(State s) //call with lock held +{ + // We can be DETACHED at any time + state.waitFor(States(s, DETACHED)); + check(); +} + +void SessionImpl::check() const //call with lock held. +{ + if (code != REPLY_SUCCESS) { + throwReplyException(code, text); + } +} + +void SessionImpl::checkOpen() const //call with lock held. +{ + check(); + if (state != ATTACHED) { + throwReplyException(0, "Session isn't attached"); + } +} + +void SessionImpl::assertOpen() const +{ + Lock l(state); + checkOpen(); +} + +void SessionImpl::handleClosed() +{ + QPID_LOG(info, "SessionImpl::handleClosed(): entering"); + demux.close(); + results.close(); + QPID_LOG(info, "SessionImpl::handleClosed(): returning"); +} + +}} diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h new file mode 100644 index 0000000000..1284670389 --- /dev/null +++ b/cpp/src/qpid/client/SessionImpl.h @@ -0,0 +1,188 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _SessionImpl_ +#define _SessionImpl_ + +#include "Demux.h" +#include "Execution.h" +#include "Results.h" + +#include "qpid/shared_ptr.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/ChannelHandler.h" +#include "qpid/framing/SessionState.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/AMQP_ClientOperations.h" +#include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/sys/StateMonitor.h" + +#include <boost/optional.hpp> + +namespace qpid { + +namespace framing { + +class FrameSet; +class MethodContent; +class SequenceSet; + +} + +namespace client { + +class Future; +class ConnectionImpl; + +class SessionImpl : public framing::FrameHandler::InOutHandler, + public Execution, + private framing::AMQP_ClientOperations::Session010Handler, + private framing::AMQP_ClientOperations::Execution010Handler +{ +public: + SessionImpl(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); + ~SessionImpl(); + + + //NOTE: Public functions called in user thread. + framing::FrameSet::shared_ptr get(); + + const framing::Uuid getId() const; + + uint16_t getChannel() const; + void setChannel(uint16_t channel); + + void open(uint32_t detachedLifetime); + void close(); + void resume(shared_ptr<ConnectionImpl>); + void suspend(); + + void setSync(bool s); + bool isSync(); + void assertOpen() const; + + Future send(const framing::AMQBody& command); + Future send(const framing::AMQBody& command, const framing::MethodContent& content); + + Demux& getDemux(); + void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); + bool isComplete(const framing::SequenceNumber& id); + bool isCompleteUpTo(const framing::SequenceNumber& id); + void waitForCompletion(const framing::SequenceNumber& id); + + //NOTE: these are called by the network thread when the connection is closed or dies + void connectionClosed(uint16_t code, const std::string& text); + void connectionBroke(uint16_t code, const std::string& text); + +private: + enum State { + INACTIVE, + ATTACHING, + ATTACHED, + DETACHING, + DETACHED + }; + typedef framing::AMQP_ClientOperations::Session010Handler SessionHandler; + typedef framing::AMQP_ClientOperations::Execution010Handler ExecutionHandler; + typedef sys::StateMonitor<State, DETACHED> StateMonitor; + typedef StateMonitor::Set States; + + inline void setState(State s); + inline void waitFor(State); + + void detach(); + + void check() const; + void checkOpen() const; + void handleClosed(); + + void handleIn(framing::AMQFrame& frame); + void handleOut(framing::AMQFrame& frame); + void deliver(framing::AMQFrame& frame); + + Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); + void sendContent(const framing::MethodContent&); + void waitForCompletionImpl(const framing::SequenceNumber& id); + + void sendCompletion(); + + // Note: Following methods are called by network thread in + // response to session controls from the broker + void attach(const std::string& name, bool force); + void attached(const std::string& name); + void detach(const std::string& name); + void detached(const std::string& name, uint8_t detachCode); + void requestTimeout(uint32_t timeout); + void timeout(uint32_t timeout); + void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset); + void expected(const framing::SequenceSet& commands, const framing::Array& fragments); + void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments); + void completed(const framing::SequenceSet& commands, bool timelyReply); + void knownCompleted(const framing::SequenceSet& commands); + void flush(bool expected, bool confirmed, bool completed); + void gap(const framing::SequenceSet& commands); + + // Note: Following methods are called by network thread in + // response to execution commands from the broker + void sync(); + void result(uint32_t commandId, const std::string& value); + void exception(uint16_t errorCode, + uint32_t commandId, + uint8_t classCode, + uint8_t commandCode, + uint8_t fieldIndex, + const std::string& description, + const framing::FieldTable& errorInfo); + + + //hack for old generator: + void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); } + + int code; // Error code + std::string text; // Error text + mutable StateMonitor state; + volatile bool syncMode; + uint32_t detachedLifetime; + const uint64_t maxFrameSize; + const framing::Uuid id; + const std::string name; + + + shared_ptr<ConnectionImpl> connection; + framing::ChannelHandler channel; + framing::AMQP_ServerProxy::Session010 proxy; + + Results results; + Demux demux; + framing::FrameSet::shared_ptr arriving; + + framing::SequenceSet incompleteIn;//incoming commands that are as yet incomplete + framing::SequenceSet completedIn;//incoming commands that are have completed + framing::SequenceSet incompleteOut;//outgoing commands not yet known to be complete + framing::SequenceSet completedOut;//outgoing commands that we know to be completed + framing::SequenceNumber nextIn; + framing::SequenceNumber nextOut; + +}; + +}} // namespace qpid::client + +#endif diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index f14344225c..b353be481b 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -35,7 +35,7 @@ namespace client { SubscriptionManager::SubscriptionManager(Session& s) : dispatcher(s), session(s), messages(UNLIMITED), bytes(UNLIMITED), window(true), - confirmMode(true), acquireMode(false), + acceptMode(0), acquireMode(0), autoStop(true) {} @@ -43,7 +43,7 @@ Completion SubscriptionManager::subscribeInternal( const std::string& q, const std::string& dest) { Completion c = session.messageSubscribe(arg::queue=q, arg::destination=dest, - arg::confirmMode=confirmMode, arg::acquireMode=acquireMode); + arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); setFlowControl(dest, messages, bytes, window); return c; } @@ -68,7 +68,7 @@ Completion SubscriptionManager::subscribe( void SubscriptionManager::setFlowControl( const std::string& dest, uint32_t messages, uint32_t bytes, bool window) { - session.messageFlowMode(dest, window); + session.messageSetFlowMode(dest, window); session.messageFlow(dest, 0, messages); session.messageFlow(dest, 1, bytes); } @@ -81,7 +81,7 @@ void SubscriptionManager::setFlowControl( window=window_; } -void SubscriptionManager::setConfirmMode(bool c) { confirmMode=c; } +void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; } void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; } diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 1741796f4f..48cb725fb8 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -53,7 +53,7 @@ class SubscriptionManager : public sys::Runnable uint32_t bytes; bool window; AckPolicy autoAck; - bool confirmMode; + bool acceptMode; bool acquireMode; bool autoStop; @@ -116,16 +116,16 @@ class SubscriptionManager : public sys::Runnable */ void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true); - /** Set the confirm-mode for new subscriptions. Defaults to true. - *@param confirm: if true messages must be confirmed by calling + /** Set the accept-mode for new subscriptions. Defaults to true. + *@param required: if true messages must be confirmed by calling *Message::acknowledge() or automatically, see setAckPolicy() */ - void setConfirmMode(bool confirm); + void setAcceptMode(bool required); /** Set the acquire-mode for new subscriptions. Defaults to false. *@param acquire: if false messages pre-acquired, if true * messages are dequed on acknowledgement or on transfer - * depending on confirmMode. + * depending on acceptMode. */ void setAcquireMode(bool acquire); diff --git a/cpp/src/qpid/client/TypedResult.h b/cpp/src/qpid/client/TypedResult.h index edcf728c54..0b36be9716 100644 --- a/cpp/src/qpid/client/TypedResult.h +++ b/cpp/src/qpid/client/TypedResult.h @@ -33,7 +33,7 @@ template <class T> class TypedResult : public Completion bool decoded; public: - TypedResult(Future f, shared_ptr<SessionCore> s) : Completion(f, s), decoded(false) {} + TypedResult(Future f, shared_ptr<SessionImpl> s) : Completion(f, s), decoded(false) {} T& get() { |
