diff options
author | Alan Conway <aconway@apache.org> | 2008-05-20 13:44:34 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-05-20 13:44:34 +0000 |
commit | 0333573627c831142aa251bfb1cabdb1e2bf438e (patch) | |
tree | 953bf8c624374c57953aa3f2888254d175609d9a /cpp/src | |
parent | 96024622ccfcc8fdd24b3c9ace44f7c8849fac46 (diff) | |
download | qpid-python-0333573627c831142aa251bfb1cabdb1e2bf438e.tar.gz |
Support for AMQP 0-10 sessions in C++ broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658246 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
22 files changed, 478 insertions, 717 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 8905fb5f9d..2a6b8303b3 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -31,6 +31,7 @@ using framing::AMQFrame; using amqp_0_10::NotImplementedException; using amqp_0_10::InvalidArgumentException; using amqp_0_10::IllegalStateException; +using amqp_0_10::ResourceLimitExceededException; namespace { bool isControl(const AMQFrame& f) { @@ -63,78 +64,83 @@ bool SessionPoint::operator==(const SessionPoint& x) const { return command == x.command && offset == x.offset; } -SessionState::SendState::SendState(SessionState& s) : session(&s), unflushedSize(0) {} +SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; } +SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; } +SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; } -const SessionPoint& SessionState::SendState::getCommandPoint() { - return sendPoint; +SessionState::ReplayRange SessionState::senderExpected(const SessionPoint& expect) { + if (expect < sender.replayPoint || sender.sendPoint < expect) + throw InvalidArgumentException(QPID_MSG(getId() << ": expected command-point out of range.")); + ReplayList::iterator i = sender.replayList.begin(); + SessionPoint p = sender.replayPoint; + while (i != sender.replayList.end() && p.command < expect.command) + p.advance(*i++); + assert(p.command == expect.command); + return boost::make_iterator_range(i, sender.replayList.end()); } -bool SessionState::SendState::expected(const SessionPoint& point) { - if (point < replayPoint || sendPoint < point) - throw InvalidArgumentException(QPID_MSG(session->getId() << ": expected command-point out of range.")); - // FIXME aconway 2008-05-06: this is not strictly correct, we should keep - // an intermediate replay pointer into the replay list. - confirmed(point); // Drop commands prior to expected from replay. - return (!replayList.empty()); -} - -void SessionState::SendState::record(const AMQFrame& f) { +void SessionState::senderRecord(const AMQFrame& f) { if (isControl(f)) return; // Ignore control frames. - session->stateful = true; - replayList.push_back(f); - unflushedSize += f.size(); - incomplete += sendPoint.command; - sendPoint.advance(f); -} - -bool SessionState::SendState::needFlush() const { return unflushedSize >= session->config.replaySyncSize; } - -void SessionState::SendState::recordFlush() { - assert(flushPoint <= sendPoint); - flushPoint = sendPoint; - unflushedSize = 0; -} - -void SessionState::SendState::confirmed(const SessionPoint& confirmed) { - if (confirmed > sendPoint) - throw InvalidArgumentException(QPID_MSG(session->getId() << "Confirmed commands not yet sent.")); - ReplayList::iterator i = replayList.begin(); - while (i != replayList.end() && replayPoint.command < confirmed.command) { - replayPoint.advance(*i); - assert(replayPoint <= sendPoint); - if (replayPoint > flushPoint) - unflushedSize -= i->size(); + stateful = true; + sender.replayList.push_back(f); + sender.unflushedSize += f.size(); + sender.replaySize += f.size(); + sender.incomplete += sender.sendPoint.command; + sender.sendPoint.advance(f); + if (config.replayHardLimit && config.replayHardLimit < sender.replaySize) + throw ResourceLimitExceededException("Replay bufffer exceeeded hard limit"); +} + +bool SessionState::senderNeedFlush() const { + return config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit; +} + +void SessionState::senderRecordFlush() { + assert(sender.flushPoint <= sender.sendPoint); + sender.flushPoint = sender.sendPoint; + sender.unflushedSize = 0; +} + +void SessionState::senderConfirmed(const SessionPoint& confirmed) { + if (confirmed > sender.sendPoint) + throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent.")); + ReplayList::iterator i = sender.replayList.begin(); + while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) { + sender.replayPoint.advance(*i); + assert(sender.replayPoint <= sender.sendPoint); + sender.replaySize -= i->size(); + if (sender.replayPoint > sender.flushPoint) + sender.unflushedSize -= i->size(); ++i; } - if (replayPoint > flushPoint) flushPoint = replayPoint; - replayList.erase(replayList.begin(), i); - assert(replayPoint.offset == 0); + if (sender.replayPoint > sender.flushPoint) + sender.flushPoint = sender.replayPoint; + sender.replayList.erase(sender.replayList.begin(), i); + assert(sender.replayPoint.offset == 0); } -void SessionState::SendState::completed(const SequenceSet& commands) { +void SessionState::senderCompleted(const SequenceSet& commands) { if (commands.empty()) return; - incomplete -= commands; + sender.incomplete -= commands; // Completion implies confirmation but we don't handle out-of-order // confirmation, so confirm only the first contiguous range of commands. - confirmed(SessionPoint(commands.rangesBegin()->end())); + senderConfirmed(SessionPoint(commands.rangesBegin()->end())); } -SessionState::ReceiveState::ReceiveState(SessionState& s) : session(&s) {} - -void SessionState::ReceiveState::setCommandPoint(const SessionPoint& point) { - if (session->hasState() && point > received) - throw InvalidArgumentException(QPID_MSG(session->getId() << ": Command-point out of range.")); - expected = point; - if (expected > received) - received = expected; +void SessionState::receiverSetCommandPoint(const SessionPoint& point) { + if (hasState() && point > receiver.received) + throw InvalidArgumentException(QPID_MSG(getId() << ": Command-point out of range.")); + receiver.expected = point; + if (receiver.expected > receiver.received) + receiver.received = receiver.expected; } -bool SessionState::ReceiveState::record(const AMQFrame& f) { +bool SessionState::receiverRecord(const AMQFrame& f) { if (isControl(f)) return true; // Ignore control frames. - session->stateful = true; - expected.advance(f); - if (expected > received) { - received = expected; + stateful = true; + receiver.expected.advance(f); + if (receiver.expected > receiver.received) { + receiver.received = receiver.expected; return true; } else { @@ -143,41 +149,43 @@ bool SessionState::ReceiveState::record(const AMQFrame& f) { } } -void SessionState::ReceiveState::completed(SequenceNumber command, bool cumulative) { - assert(command <= received.command); // Internal error to complete an unreceived command. - assert(firstIncomplete <= command); +void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) { + assert(command <= receiver.received.command); // Internal error to complete an unreceived command. + assert(receiver.firstIncomplete <= command); if (cumulative) - unknownCompleted.add(firstIncomplete, command); + receiver.unknownCompleted.add(receiver.firstIncomplete, command); else - unknownCompleted += command; - firstIncomplete = unknownCompleted.rangeContaining(firstIncomplete).end(); + receiver.unknownCompleted += command; + receiver.firstIncomplete = receiver.unknownCompleted.rangeContaining(receiver.firstIncomplete).end(); + QPID_LOG(debug, "Completed " << command << " unknown=" << receiver.unknownCompleted); } -void SessionState::ReceiveState::knownCompleted(const SequenceSet& commands) { - if (!commands.empty() && commands.back() > received.command) - throw InvalidArgumentException(QPID_MSG(session->getId() << ": Known-completed has invalid commands.")); - unknownCompleted -= commands; +void SessionState::receiverKnownCompleted(const SequenceSet& commands) { + if (!commands.empty() && commands.back() > receiver.received.command) + throw InvalidArgumentException(QPID_MSG(getId() << ": Known-completed has invalid commands.")); + receiver.unknownCompleted -= commands; } -SequenceNumber SessionState::ReceiveState::getCurrent() const { - SequenceNumber current = expected.command; // FIXME aconway 2008-05-08: SequenceNumber arithmetic. - return --current; +const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; } +const SessionPoint& SessionState::receiverGetReceived() const { return receiver.received; } +const SequenceSet& SessionState::receiverGetUnknownComplete() const { return receiver.unknownCompleted; } + +SequenceNumber SessionState::receiverGetCurrent() const { + SequenceNumber current = receiver.expected.command; + if (receiver.expected.offset == 0) + --current; + return current; } -// FIXME aconway 2008-05-02: implement sync & kill limits. -SessionState::Configuration::Configuration() - : replaySyncSize(std::numeric_limits<size_t>::max()), - replayKillSize(std::numeric_limits<size_t>::max()) {} +SessionState::Configuration::Configuration(size_t flush, size_t hard) : + replayFlushLimit(flush), replayHardLimit(hard) {} -SessionState::SessionState(const SessionId& i, const Configuration& c) - : sender(*this), receiver(*this), id(i), timeout(), config(c), stateful() +SessionState::SessionState(const SessionId& i, const Configuration& c) : id(i), timeout(), config(c), stateful() { QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this); } -bool SessionState::hasState() const { - return stateful; -} +bool SessionState::hasState() const { return stateful; } SessionState::~SessionState() {} diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h index 7957825dd3..47fabb04c8 100644 --- a/cpp/src/qpid/SessionState.h +++ b/cpp/src/qpid/SessionState.h @@ -28,6 +28,7 @@ #include <qpid/framing/AMQFrame.h> #include <qpid/framing/FrameHandler.h> #include <boost/operators.hpp> +#include <boost/range/iterator_range.hpp> #include <vector> #include <iosfwd> @@ -70,135 +71,118 @@ std::ostream& operator<<(std::ostream&, const SessionPoint&); * source-incompatbile API changes. */ class SessionState { - public: + typedef std::vector<framing::AMQFrame> ReplayList; - /** State for commands sent. Records commands for replay, - * tracks confirmation and completion of sent commands. - */ -class SendState { public: - typedef std::vector<framing::AMQFrame> ReplayList; + + typedef boost::iterator_range<ReplayList::iterator> ReplayRange; + + struct Configuration { + Configuration(size_t flush=0, size_t hard=0); + size_t replayFlushLimit; // Flush when the replay list >= N bytes. 0 disables. + size_t replayHardLimit; // Kill session if replay list > N bytes. 0 disables. + }; + + SessionState(const SessionId& =SessionId(), const Configuration& =Configuration()); + + virtual ~SessionState(); + + bool hasState() const; + + const SessionId& getId() const { return id; } + + uint32_t getTimeout() const { return timeout; } + void setTimeout(uint32_t seconds) { timeout = seconds; } + + bool operator==(const SessionId& other) const { return id == other; } + bool operator==(const SessionState& other) const { return id == other.id; } + + // ==== Functions for sender state. /** Record frame f for replay. Should not be called during replay. */ - void record(const framing::AMQFrame& f); + virtual void senderRecord(const framing::AMQFrame& f); /** @return true if we should send flush for confirmed and completed commands. */ - bool needFlush() const; + virtual bool senderNeedFlush() const; /** Called when flush for confirmed and completed commands is sent to peer. */ - void recordFlush(); + virtual void senderRecordFlush(); /** Called when the peer confirms up to comfirmed. */ - void confirmed(const SessionPoint& confirmed); + virtual void senderConfirmed(const SessionPoint& confirmed); /** Called when the peer indicates commands completed */ - void completed(const SequenceSet& commands); + virtual void senderCompleted(const SequenceSet& commands); - /** Point from which we can replay. All data < replayPoint is confirmed. */ - const SessionPoint& getReplayPoint() const { return replayPoint; } - - /** Get the replay list, starting from getReplayPoint() */ - // TODO aconway 2008-04-30: should be const, but FrameHandler takes non-const AMQFrame&. - ReplayList& getReplayList() { return replayList; } - - /** Point from which the next data will be sent. */ - const SessionPoint& getCommandPoint(); + /** Point from which the next new (not replayed) data will be sent. */ + virtual SessionPoint senderGetCommandPoint(); /** Set of outstanding incomplete commands */ - const SequenceSet& getIncomplete() const { return incomplete; } + virtual SequenceSet senderGetIncomplete() const; + + /** Point from which we can replay. */ + virtual SessionPoint senderGetReplayPoint() const; /** Peer expecting commands from this point. - *@return true if replay is required, sets replayPoint. + virtual *@return Range of frames to be replayed. */ - bool expected(const SessionPoint& expected); + virtual ReplayRange senderExpected(const SessionPoint& expected); - private: - SendState(SessionState& s); - SessionState* session; - // invariant: replayPoint <= flushPoint <= sendPoint - SessionPoint replayPoint; // Can replay from this point - SessionPoint flushPoint; // Point of last flush - SessionPoint sendPoint; // Send from this point - ReplayList replayList; // Starts from replayPoint. - size_t unflushedSize; // Un-flushed bytes in replay list. - SequenceSet incomplete; // Commands sent and not yet completed. + // ==== Functions for receiver state - friend class SessionState; -}; - - /** State for commands received. - * Idempotence barrier for duplicate commands, tracks completion - * and of received commands. - */ -class ReceiveState { - public: /** Set the command point. */ - void setCommandPoint(const SessionPoint& point); + virtual void receiverSetCommandPoint(const SessionPoint& point); /** Returns true if frame should be be processed, false if it is a duplicate. */ - bool record(const framing::AMQFrame& f); + virtual bool receiverRecord(const framing::AMQFrame& f); /** Command completed locally */ - void completed(SequenceNumber command, bool cumulative=false); + virtual void receiverCompleted(SequenceNumber command, bool cumulative=false); /** Peer has indicated commands are known completed */ - void knownCompleted(const SequenceSet& commands); + virtual void receiverKnownCompleted(const SequenceSet& commands); /** Get the incoming command point */ - const SessionPoint& getExpected() const { return expected; } + virtual const SessionPoint& receiverGetExpected() const; /** Get the received high-water-mark, may be > getExpected() during replay */ - const SessionPoint& getReceived() const { return received; } + virtual const SessionPoint& receiverGetReceived() const; /** Completed commands that the peer may not know about */ - const SequenceSet& getUnknownComplete() const { return unknownCompleted; } + virtual const SequenceSet& receiverGetUnknownComplete() const; /** ID of the command currently being handled. */ - SequenceNumber getCurrent() const; + virtual SequenceNumber receiverGetCurrent() const; private: - ReceiveState(SessionState&); + struct SendState { + SendState() : session(), unflushedSize(), replaySize() {} + SessionState* session; + // invariant: replayPoint <= flushPoint <= sendPoint + SessionPoint replayPoint; // Can replay from this point + SessionPoint flushPoint; // Point of last flush + SessionPoint sendPoint; // Send from this point + ReplayList replayList; // Starts from replayPoint. + size_t unflushedSize; // Un-flushed bytes in replay list. + size_t replaySize; // Total bytes in replay list. + SequenceSet incomplete; // Commands sent and not yet completed. + } sender; + + struct ReceiveState { + ReceiveState() : session() {} SessionState* session; SessionPoint expected; // Expected from here SessionPoint received; // Received to here. Invariant: expected <= received. SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer. SequenceNumber firstIncomplete; // First incomplete command. + } receiver; - friend class SessionState; - }; - - struct Configuration { - Configuration(); - size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes - size_t replayKillSize; // Kill session if replay list grows beyond N bytes. - }; - - SessionState(const SessionId& =SessionId(), const Configuration& =Configuration()); - - virtual ~SessionState(); - - const SessionId& getId() const { return id; } - uint32_t getTimeout() const { return timeout; } - void setTimeout(uint32_t seconds) { timeout = seconds; } - - bool operator==(const SessionId& other) const { return id == other; } - bool operator==(const SessionState& other) const { return id == other.id; } - - SendState sender; ///< State for commands sent - ReceiveState receiver; ///< State for commands received - - bool hasState() const; - - private: SessionId id; uint32_t timeout; Configuration config; bool stateful; - - friend class SendState; - friend class ReceiveState; }; inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; } diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 3fb2579e8c..6d43dd1789 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -33,10 +33,8 @@ namespace amqp_0_10 { using namespace framing; using namespace std; -SessionHandler::SessionHandler() : peer(channel), ignoring(), sendReady(), receiveReady() {} - -SessionHandler::SessionHandler(FrameHandler& out, ChannelId ch) - : channel(ch, &out), peer(channel), ignoring(false) {} +SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch) + : channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {} SessionHandler::~SessionHandler() {} @@ -75,7 +73,7 @@ void SessionHandler::handleIn(AMQFrame& f) { checkAttached(); if (!receiveReady) throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data")); - if (!getState()->receiver.record(f)) + if (!getState()->receiverRecord(f)) return; // Ignore duplicates. getInHandler()->handle(f); } @@ -100,10 +98,10 @@ void SessionHandler::handleOut(AMQFrame& f) { checkAttached(); if (!sendReady) throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data")); - getState()->sender.record(f); - if (getState()->sender.needFlush()) { + getState()->senderRecord(f); + if (getState()->senderNeedFlush()) { peer.flush(false, true, true); - getState()->sender.recordFlush(); + getState()->senderRecordFlush(); } channel.handle(f); } @@ -128,7 +126,7 @@ void SessionHandler::attach(const std::string& name, bool force) { if (getState()->hasState()) peer.flush(true, true, true); else - sendCommandPoint(); + sendCommandPoint(getState()->senderGetCommandPoint()); } void SessionHandler::attached(const std::string& name) { @@ -168,7 +166,7 @@ void SessionHandler::timeout(uint32_t t) { void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) { checkAttached(); - getState()->receiver.setCommandPoint(SessionPoint(id, offset)); + getState()->receiverSetCommandPoint(SessionPoint(id, offset)); if (!receiveReady) { receiveReady = true; readyToReceive(); @@ -177,32 +175,37 @@ void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) { void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) { checkAttached(); - if (commands.empty() && getState()->hasState()) - throw IllegalStateException( + if (getState()->hasState()) { // Replay + if (commands.empty()) throw IllegalStateException( QPID_MSG(getState()->getId() << ": has state but client is attaching as new session.")); - getState()->sender.expected(commands.empty() ? SequenceNumber(0) : commands.front()); - if (!sendReady) // send command point if not already sent - sendCommandPoint(); + // TODO aconway 2008-05-12: support replay of partial commands. + // Here we always round down to the last command boundary. + SessionPoint expectedPoint = commands.empty() ? SequenceNumber(0) : SessionPoint(commands.front(),0); + SessionState::ReplayRange replay = getState()->senderExpected(expectedPoint); + sendCommandPoint(expectedPoint); + std::for_each(replay.begin(), replay.end(), out); // replay + } + else + sendCommandPoint(getState()->senderGetCommandPoint()); } void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) { checkAttached(); // Ignore non-contiguous confirmations. - if (!commands.empty() && commands.front() >= getState()->sender.getReplayPoint()) { - getState()->sender.confirmed(commands.rangesBegin()->last()); - } + if (!commands.empty() && commands.front() >= getState()->senderGetReplayPoint()) + getState()->senderConfirmed(commands.rangesBegin()->last()); } void SessionHandler::completed(const SequenceSet& commands, bool /*timelyReply*/) { checkAttached(); - getState()->sender.completed(commands); + getState()->senderCompleted(commands); if (!commands.empty()) peer.knownCompleted(commands); // Always send a timely reply } void SessionHandler::knownCompleted(const SequenceSet& commands) { checkAttached(); - getState()->receiver.knownCompleted(commands); + getState()->receiverKnownCompleted(commands); } void SessionHandler::flush(bool expected, bool confirmed, bool completed) { @@ -210,18 +213,18 @@ void SessionHandler::flush(bool expected, bool confirmed, bool completed) { if (expected) { SequenceSet expectSet; if (getState()->hasState()) - expectSet.add(getState()->receiver.getExpected().command); + expectSet.add(getState()->receiverGetExpected().command); peer.expected(expectSet, Array()); } if (confirmed) { SequenceSet confirmSet; - if (!getState()->receiver.getUnknownComplete().empty()) - confirmSet.add(getState()->receiver.getUnknownComplete().front(), - getState()->receiver.getReceived().command); + if (!getState()->receiverGetUnknownComplete().empty()) + confirmSet.add(getState()->receiverGetUnknownComplete().front(), + getState()->receiverGetReceived().command); peer.confirmed(confirmSet, Array()); } if (completed) - peer.completed(getState()->receiver.getUnknownComplete(), true); + peer.completed(getState()->receiverGetUnknownComplete(), true); } void SessionHandler::gap(const SequenceSet& /*commands*/) { @@ -237,20 +240,20 @@ void SessionHandler::sendDetach() void SessionHandler::sendCompletion() { checkAttached(); - peer.completed(getState()->receiver.getUnknownComplete(), true); + peer.completed(getState()->receiverGetUnknownComplete(), true); } void SessionHandler::sendAttach(bool force) { checkAttached(); + QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId()); peer.attach(getState()->getId().getName(), force); if (getState()->hasState()) peer.flush(true, true, true); else - sendCommandPoint(); + sendCommandPoint(getState()->senderGetCommandPoint()); } -void SessionHandler::sendCommandPoint() { - SessionPoint point(getState()->sender.getCommandPoint()); +void SessionHandler::sendCommandPoint(const SessionPoint& point) { peer.commandPoint(point.command, point.offset); if (!sendReady) { sendReady = true; diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.h b/cpp/src/qpid/amqp_0_10/SessionHandler.h index 85577ebafc..ccbe597bfc 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.h +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -25,10 +25,10 @@ #include "qpid/framing/ChannelHandler.h" #include "qpid/framing/AMQP_AllProxy.h" #include "qpid/framing/AMQP_AllOperations.h" +#include "qpid/SessionState.h" namespace qpid { -class SessionState; namespace amqp_0_10 { @@ -45,8 +45,7 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, public: typedef framing::AMQP_AllProxy::Session Peer; - SessionHandler(); - SessionHandler(framing::FrameHandler& out, uint16_t channel); + SessionHandler(framing::FrameHandler* out=0, uint16_t channel=0); ~SessionHandler(); void setChannel(uint16_t ch) { channel = ch; } @@ -63,7 +62,6 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, void sendAttach(bool force); void sendTimeout(uint32_t t); void sendFlush(); - void sendCommandPoint(); /** True if the handler is ready to send and receive */ bool ready() const; @@ -96,8 +94,8 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, // Notification of events virtual void readyToSend() {} virtual void readyToReceive() {} - virtual void handleDetach(); + virtual void handleDetach(); virtual void handleIn(framing::AMQFrame&); virtual void handleOut(framing::AMQFrame&); @@ -108,8 +106,9 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, Peer peer; bool ignoring; bool sendReady, receiveReady; - // FIXME aconway 2008-05-07: move handler-related functions from SessionState. + private: + void sendCommandPoint(const SessionPoint&); }; }} // namespace qpid::amqp_0_10 diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 337992992f..f3e103dfaf 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -59,6 +59,7 @@ void Bridge::create(ConnectionState& c) peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); session->attach(name, false); + session->commandPoint(0,0); if (args.i_src_is_local) { //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 0a45c46d30..4f7686aac4 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -53,6 +53,9 @@ #if HAVE_SASL #include <sasl/sasl.h> +static const bool AUTH_DEFAULT=true; +#else +static const bool AUTH_DEFAULT=false; #endif using qpid::sys::ProtocolFactory; @@ -81,41 +84,25 @@ Broker::Options::Options(const std::string& name) : stagingThreshold(5000000), enableMgmt(1), mgmtPubInterval(10), -#if HAVE_SASL - auth(true), -#else - auth(false), -#endif - realm("QPID"), - ack(0) + auth(AUTH_DEFAULT), + replayFlushLimit(64), + replayHardLimit(0) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; addOptions() - ("data-dir", optValue(dataDir,"DIR"), - "Directory to contain persistent data generated by the broker") - ("no-data-dir", optValue(noDataDir), - "Don't use a data directory. No persistent configuration will be loaded or stored") - ("port,p", optValue(port,"PORT"), - "Tells the broker to listen on PORT") - ("worker-threads", optValue(workerThreads, "N"), - "Sets the broker thread pool size") - ("max-connections", optValue(maxConnections, "N"), - "Sets the maximum allowed connections") - ("connection-backlog", optValue(connectionBacklog, "N"), - "Sets the connection backlog limit for the server socket") - ("staging-threshold", optValue(stagingThreshold, "N"), - "Stages messages over N bytes to disk") - ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), - "Enable Management") - ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), - "Management Publish Interval") - ("auth", optValue(auth, "yes|no"), - "Enable authentication, if disabled all incoming connections will be trusted") - ("realm", optValue(realm, "REALM"), - "Use the given realm when performing authentication") - ("ack", optValue(ack, "N"), - "Send session.ack/solicit-ack at least every N frames. 0 disables voluntary ack/solitict-ack"); + ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker") + ("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored") + ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") + ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size") + ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") + ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") + ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk") + ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") + ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") + ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted") + ("replay-flush-limit", optValue(replayFlushLimit, "KB"), "Send flush request when the replay buffer reaches this limit. 0 means no limit.") + ("replay-hard-limit", optValue(replayHardLimit, "KB"), "Kill a session if its replay buffer exceeds this limit. 0 means no limit."); } const std::string empty; @@ -132,7 +119,11 @@ Broker::Broker(const Broker::Options& conf) : dataDir(conf.noDataDir ? std::string () : conf.dataDir), links(this), factory(*this), - sessionManager(conf.ack) + sessionManager( + qpid::SessionState::Configuration( + conf.replayFlushLimit*1024, // convert kb to bytes. + conf.replayHardLimit*1024), + *this) { if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index a1eaf4f62f..7092a86181 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -83,7 +83,8 @@ class Broker : public sys::Runnable, public Plugin::Target, uint16_t mgmtPubInterval; bool auth; std::string realm; - uint32_t ack; + size_t replayFlushLimit; + size_t replayHardLimit; }; virtual ~Broker(); diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index d156b4a914..463193a346 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -124,8 +124,8 @@ void Connection::idleIn(){} void Connection::closed(){ // Physically closed, suspend open sessions. try { - for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i) - ptr_map_ptr(i)->handleDetach(); + while (!channels.empty()) + ptr_map_ptr(channels.begin())->handleDetach(); while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp index d48b258ba2..a542211147 100644 --- a/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -89,13 +89,20 @@ void NullAuthenticator::getMechanisms(Array& mechanisms) mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("ANONYMOUS"))); } -void NullAuthenticator::start(const string& /*mechanism*/, const string& /*response*/) +void NullAuthenticator::start(const string& mechanism, const string& response) { QPID_LOG(warning, "SASL: No Authentication Performed"); - - // TODO: Figure out what should actually be set in this case - connection.setUserId("anonymous"); - + if (mechanism == "PLAIN") { // Old behavior + if (response.size() > 0 && response[0] == (char) 0) { + string temp = response.substr(1); + string::size_type i = temp.find((char)0); + string uid = temp.substr(0, i); + string pwd = temp.substr(i + 1); + connection.setUserId(uid); + } + } else { + connection.setUserId("anonymous"); + } client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, 0); } diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index e284451d14..2d0edde27b 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -410,7 +410,7 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers) { - //TODO: change this when SequenceNumberSet is deleted along with preview code + // FIXME aconway 2008-05-12: create SequenceSet directly, no need for intermediate results vector. SequenceNumberSet results; RangedOperation f = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results)); transfers.for_each(f); diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 2ec0988fc0..2f09c6b5ac 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -21,12 +21,6 @@ #include "SessionHandler.h" #include "SessionState.h" #include "Connection.h" -#include "ConnectionState.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/constants.h" -#include "qpid/framing/ClientInvoker.h" -#include "qpid/framing/ServerInvoker.h" -#include "qpid/framing/all_method_bodies.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> @@ -38,11 +32,9 @@ using namespace std; using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) - : InOutHandler(0, &out), - connection(c), channel(ch, &c.getOutput()), - proxy(out), // Via my own handleOut() for L2 data. - peerSession(channel), // Direct to channel for L2 commands. - ignoring(false) + : amqp_0_10::SessionHandler(&c.getOutput(), ch), + connection(c), + proxy(out) {} SessionHandler::~SessionHandler() {} @@ -52,191 +44,52 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } } // namespace -void SessionHandler::handleIn(AMQFrame& f) { - // Note on channel states: a channel is attached if session != 0 - AMQMethodBody* m = f.getBody()->getMethod(); - try { - if (ignoring && !(m && m->isA<SessionDetachedBody>())) { - return; - } - if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { - //frame was a valid session control and has been handled - return; - } else if (session.get()) { - //we are attached and frame was not a session control so it is for upper layers - session->handle(f); - } else if (m && m->isA<SessionDetachedBody>()) { - handleDetach(); - connection.closeChannel(channel.get()); - } else { - throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached")); - } - }catch(const ChannelException& e){ - QPID_LOG(error, "Session detached due to: " << e.what()); - peerSession.detached(name, e.code); +void SessionHandler::channelException(uint16_t, const std::string&) { handleDetach(); - connection.closeChannel(channel.get()); - }catch(const ConnectionException& e){ - connection.close(e.code, e.what(), classId(m), methodId(m)); - }catch(const std::exception& e){ - connection.close(501, e.what(), classId(m), methodId(m)); - } } -bool SessionHandler::isValid(AMQMethodBody* m) { - return session.get() || m->isA<SessionAttachBody>() || m->isA<SessionAttachedBody>(); -} - -void SessionHandler::handleOut(AMQFrame& f) { - channel.handle(f); // Send it. - if (session->sent(f)) - peerSession.flush(false, false, true); -} - -void SessionHandler::assertAttached(const char* method) const { - if (!session.get()) { - std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl; - throw NotAttachedException( - QPID_MSG(method << " failed: No session for channel " - << getChannel())); - } -} - -void SessionHandler::assertClosed(const char* method) const { - if (session.get()) - throw SessionBusyException( - QPID_MSG(method << " failed: channel " << channel.get() - << " is already open.")); +void SessionHandler::connectionException(uint16_t code, const std::string& msg) { + connection.close(code, msg, 0, 0); } ConnectionState& SessionHandler::getConnection() { return connection; } -const ConnectionState& SessionHandler::getConnection() const { return connection; } - -//new methods: -void SessionHandler::attach(const std::string& _name, bool /*force*/) -{ - name = _name;//TODO: this should be used in conjunction with - //userid for connection as sessions identity - //TODO: need to revise session manager to support resume as well - assertClosed("attach"); - session.reset(new SessionState(0, this, 0, 0, name)); - peerSession.attached(name); - peerSession.commandPoint(session->nextOut, 0); -} - -void SessionHandler::attached(const std::string& _name) -{ - name = _name;//TODO: this should be used in conjunction with - //userid for connection as sessions identity - session.reset(new SessionState(0, this, 0, 0, name)); - peerSession.commandPoint(session->nextOut, 0); -} +const ConnectionState& SessionHandler::getConnection() const { return connection; } -void SessionHandler::detach(const std::string& name) -{ - assertAttached("detach"); - peerSession.detached(name, session::NORMAL); - handleDetach(); +void SessionHandler::handleDetach() { + amqp_0_10::SessionHandler::handleDetach(); assert(&connection.getChannel(channel.get()) == this); + if (session.get()) + connection.getBroker().getSessionManager().detach(session); + assert(!session.get()); connection.closeChannel(channel.get()); } -void SessionHandler::detached(const std::string& name, uint8_t code) -{ - ignoring = false; - handleDetach(); - if (code) { - //no error - } else { - //error occured - QPID_LOG(warning, "Received session.closed: "<< name << " " << code); - } - connection.closeChannel(channel.get()); -} - -void SessionHandler::handleDetach() -{ - if (session.get()) { - session->detach(); - session.reset(); - } -} - -void SessionHandler::requestDetach() -{ - //TODO: request timeout when python can handle it - //peerSession.requestTimeout(0); - ignoring = true; - peerSession.detach(name); -} - -void SessionHandler::requestTimeout(uint32_t t) -{ - session->setTimeout(t); - peerSession.timeout(t); -} - -void SessionHandler::timeout(uint32_t t) -{ - session->setTimeout(t); -} - -void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset) -{ - if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); - - session->nextIn = id; -} - -void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments) -{ - if (!commands.empty() || fragments.size()) { - throw NotImplementedException("Session resumption not yet supported"); - } +void SessionHandler::setState(const std::string& name, bool force) { + assert(!session.get()); + SessionId id(connection.getUserId(), name); + session = connection.broker.getSessionManager().attach(*this, id, force); } -void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/) -{ - //don't really care too much about this yet -} +FrameHandler* SessionHandler::getInHandler() { return session.get(); } +qpid::SessionState* SessionHandler::getState() { return session.get(); } -void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply) -{ - session->complete(commands); - if (timelyReply) { - peerSession.knownCompleted(session->knownCompleted); - session->knownCompleted.clear(); - } +void SessionHandler::readyToSend() { + if (session.get()) session->readyToSend(); } -void SessionHandler::knownCompleted(const framing::SequenceSet& commands) -{ - session->completed.remove(commands); -} - -void SessionHandler::flush(bool expected, bool confirmed, bool completed) -{ - if (expected) { - peerSession.expected(SequenceSet(session->nextIn), Array()); - } - if (confirmed) { - peerSession.confirmed(session->completed, Array()); - } - if (completed) { - peerSession.completed(session->completed, true); - } -} - - -void SessionHandler::sendCompletion() -{ - peerSession.completed(session->completed, true); +// TODO aconway 2008-05-12: hacky - handle attached for bridge clients. +// We need to integrate the client code so we can run a real client +// in the bridge. +// +void SessionHandler::attached(const std::string& name) { + if (session.get()) + checkName(name); + else { + SessionId id(connection.getUserId(), name); + SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); + session.reset(new SessionState(connection.getBroker(), *this, id, config)); } - -void SessionHandler::gap(const framing::SequenceSet& /*commands*/) -{ - throw NotImplementedException("gap not yet supported"); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 47c534441a..1aa3137fdf 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -22,19 +22,12 @@ * */ -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/AMQP_ClientOperations.h" -#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/amqp_0_10/SessionHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" -#include "qpid/framing/amqp_types.h" -#include "qpid/framing/Array.h" -#include "qpid/framing/ChannelHandler.h" -#include "qpid/framing/SequenceNumber.h" - - -#include <boost/noncopyable.hpp> namespace qpid { +class SessionState; + namespace broker { class Connection; @@ -46,65 +39,38 @@ class SessionState; * receives incoming frames, handles session controls and manages the * association between the channel and a session. */ -class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, - public framing::FrameHandler::InOutHandler, - private boost::noncopyable -{ +class SessionHandler : public amqp_0_10::SessionHandler { public: SessionHandler(Connection&, framing::ChannelId); ~SessionHandler(); - /** Returns 0 if not attached to a session */ + /** Get broker::SessionState */ SessionState* getSession() { return session.get(); } const SessionState* getSession() const { return session.get(); } - framing::ChannelId getChannel() const { return channel.get(); } - ConnectionState& getConnection(); const ConnectionState& getConnection() const; framing::AMQP_ClientProxy& getProxy() { return proxy; } const framing::AMQP_ClientProxy& getProxy() const { return proxy; } - void requestDetach(); - void handleDetach(); - void sendCompletion(); - - protected: - void handleIn(framing::AMQFrame&); - void handleOut(framing::AMQFrame&); + virtual void handleDetach(); - private: - //new methods: - void attach(const std::string& name, bool force); + // Overrides void attached(const std::string& name); - void detach(const std::string& name); - void detached(const std::string& name, uint8_t code); - - void requestTimeout(uint32_t t); - void timeout(uint32_t t); - - void commandPoint(const framing::SequenceNumber& id, uint64_t offset); - void expected(const framing::SequenceSet& commands, const framing::Array& fragments); - void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments); - void completed(const framing::SequenceSet& commands, bool timelyReply); - void knownCompleted(const framing::SequenceSet& commands); - void flush(bool expected, bool confirmed, bool completed); - void gap(const framing::SequenceSet& commands); - void assertAttached(const char* method) const; - void assertActive(const char* method) const; - void assertClosed(const char* method) const; - - bool isValid(framing::AMQMethodBody*); + protected: + virtual void setState(const std::string& sessionName, bool force); + virtual qpid::SessionState* getState(); + virtual framing::FrameHandler* getInHandler(); + virtual void channelException(uint16_t code, const std::string& msg); + virtual void connectionException(uint16_t code, const std::string& msg); + virtual void readyToSend(); + private: Connection& connection; - framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; - framing::AMQP_ClientProxy::Session peerSession; - bool ignoring; std::auto_ptr<SessionState> session; - std::string name;//TODO: this should be part of the session state and replace the id }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index d7bae737fc..789e43b902 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -40,68 +40,58 @@ using boost::intrusive_ptr; using namespace sys; using namespace framing; -SessionManager::SessionManager(uint32_t a) : ack(a) {} +SessionManager::SessionManager(const SessionState::Configuration& c, Broker& b) + : config(c), broker(b) {} -SessionManager::~SessionManager() {} +SessionManager::~SessionManager() { + detached.clear(); // Must clear before destructor as session dtor will call forget() +} -// FIXME aconway 2008-02-01: pass handler*, allow open unattached. -std::auto_ptr<SessionState> SessionManager::open( - SessionHandler& h, uint32_t timeout_, std::string _name) -{ +std::auto_ptr<SessionState> SessionManager::attach(SessionHandler& h, const SessionId& id, bool/*force*/) { Mutex::ScopedLock l(lock); - std::auto_ptr<SessionState> session( - new SessionState(this, &h, timeout_, ack, _name)); - active.insert(session->getId()); + eraseExpired(); // Clean up expired table + std::pair<Attached::iterator, bool> insert = attached.insert(id); + if (!insert.second) + throw SessionBusyException(QPID_MSG("Session already attached: " << id)); + Detached::iterator i = std::find(detached.begin(), detached.end(), id); + std::auto_ptr<SessionState> state; + if (i == detached.end()) { + state.reset(new SessionState(broker, h, id, config)); for_each(observers.begin(), observers.end(), - boost::bind(&Observer::opened, _1,boost::ref(*session))); - return session; + boost::bind(&Observer::opened, _1,boost::ref(*state))); + } + else { + state.reset(detached.release(i).release()); + state->attach(h); + } + return state; + // FIXME aconway 2008-04-29: implement force } -void SessionManager::suspend(std::auto_ptr<SessionState> session) { +void SessionManager::detach(std::auto_ptr<SessionState> session) { Mutex::ScopedLock l(lock); - active.erase(session->getId()); - session->suspend(); + attached.erase(session->getId()); + session->detach(); + if (session->getTimeout() > 0) { session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); if (session->mgmtObject.get() != 0) session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry)); - suspended.push_back(session.release()); // In expiry order + detached.push_back(session.release()); // In expiry order eraseExpired(); } - -std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id) -{ - Mutex::ScopedLock l(lock); - eraseExpired(); - if (active.find(id) != active.end()) - throw SessionBusyException( - QPID_MSG("Session already active: " << id)); - Suspended::iterator i = std::find_if( - suspended.begin(), suspended.end(), - boost::bind(std::equal_to<Uuid>(), id, boost::bind(&SessionState::getId, _1)) - ); - if (i == suspended.end()) - throw InvalidArgumentException( - QPID_MSG("No suspended session with id=" << id)); - active.insert(id); - std::auto_ptr<SessionState> state(suspended.release(i).release()); - return state; } -void SessionManager::erase(const framing::Uuid& id) -{ - Mutex::ScopedLock l(lock); - active.erase(id); -} +void SessionManager::forget(const SessionId& id) { attached.erase(id); } void SessionManager::eraseExpired() { // Called with lock held. - if (!suspended.empty()) { - Suspended::iterator keep = std::lower_bound( - suspended.begin(), suspended.end(), now(), + if (!detached.empty()) { + Detached::iterator keep = std::lower_bound( + detached.begin(), detached.end(), now(), boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2)); - if (suspended.begin() != keep) { - QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); - suspended.erase(suspended.begin(), keep); + if (detached.begin() != keep) { + QPID_LOG(debug, "Expiring sessions: " << log::formatList(detached.begin(), keep)); + detached.erase(detached.begin(), keep); } } } diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h index ad064c69bb..9a4142f613 100644 --- a/cpp/src/qpid/broker/SessionManager.h +++ b/cpp/src/qpid/broker/SessionManager.h @@ -22,7 +22,7 @@ * */ -#include <qpid/framing/Uuid.h> +#include <qpid/SessionState.h> #include <qpid/sys/Time.h> #include <qpid/sys/Mutex.h> #include <qpid/RefCounted.h> @@ -37,7 +37,7 @@ namespace qpid { namespace broker { - +class Broker; class SessionState; class SessionHandler; @@ -50,44 +50,43 @@ class SessionManager : private boost::noncopyable { * Observer notified of SessionManager events. */ struct Observer : public RefCounted { + /** Called when a stateless session is attached. */ virtual void opened(SessionState&) {} }; - SessionManager(uint32_t ack); + SessionManager(const qpid::SessionState::Configuration&, Broker&); ~SessionManager(); /** Open a new active session, caller takes ownership */ - std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_, std::string name); + std::auto_ptr<SessionState> attach(SessionHandler& h, const SessionId& id, bool/*force*/); - /** Suspend a session, start it's timeout counter. - * The factory takes ownership. - */ - void suspend(std::auto_ptr<SessionState> session); + /** Return a detached session to the manager, start the timeout counter. */ + void detach(std::auto_ptr<SessionState>); - /** Resume a suspended session. - *@throw Exception if timed out or non-existant. - */ - std::auto_ptr<SessionState> resume(const framing::Uuid&); + /** Forget about an attached session. Called by SessionState destructor. */ + void forget(const SessionId&); /** Add an Observer. */ void add(const boost::intrusive_ptr<Observer>&); + Broker& getBroker() const { return broker; } + + const qpid::SessionState::Configuration& getSessionConfig() const { return config; } + private: - typedef boost::ptr_vector<SessionState> Suspended; - typedef std::set<framing::Uuid> Active; + typedef boost::ptr_vector<SessionState> Detached; // Sorted in expiry order. + typedef std::set<SessionId> Attached; typedef std::vector<boost::intrusive_ptr<Observer> > Observers; - void erase(const framing::Uuid&); void eraseExpired(); sys::Mutex lock; - Suspended suspended; - Active active; - uint32_t ack; + Detached detached; + Attached attached; + qpid::SessionState::Configuration config; Observers observers; - - friend class SessionState; // removes deleted sessions from active set. + Broker& broker; }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 2ef1ed2de4..c851162046 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -32,6 +32,7 @@ #include "qpid/log/Statement.h" #include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> namespace qpid { namespace broker { @@ -45,59 +46,46 @@ using qpid::management::Manageable; using qpid::management::Args; SessionState::SessionState( - SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack, string& _name) - : framing::SessionState(ack, timeout_ > 0), nextOut(0), - factory(f), handler(h), id(true), timeout(timeout_), - broker(h->getConnection().broker), - version(h->getConnection().getVersion()), - ignoring(false), name(_name), + Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config) + : qpid::SessionState(id, config), + broker(b), handler(&h), + ignoring(false), semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)), enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)) { - getConnection().outputTasks.addOutputTask(&semanticState); - Manageable* parent = broker.GetVhostObject (); - - if (parent != 0) - { + if (parent != 0) { ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - - if (agent.get () != 0) - { + if (agent.get () != 0) { mgmtObject = management::Session::shared_ptr - (new management::Session (this, parent, name)); - mgmtObject->set_attached (1); - mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId()); - mgmtObject->set_channelId (h->getChannel()); - mgmtObject->set_detachedLifespan (getTimeout()); + (new management::Session (this, parent, getId().getName())); + mgmtObject->set_attached (0); agent->addObject (mgmtObject); } } + attach(h); } SessionState::~SessionState() { // Remove ID from active session list. - if (factory) - factory->erase(getId()); + // FIXME aconway 2008-05-12: Need to distinguish outgoing sessions established by bridge, + // they don't belong in the manager. For now rely on uniqueness of UUIDs. + // + broker.getSessionManager().forget(getId()); if (mgmtObject.get () != 0) mgmtObject->resourceDestroy (); } -SessionHandler* SessionState::getHandler() { - return handler; -} - AMQP_ClientProxy& SessionState::getProxy() { assert(isAttached()); - return getHandler()->getProxy(); + return handler->getProxy(); } ConnectionState& SessionState::getConnection() { assert(isAttached()); - return getHandler()->getConnection(); + return handler->getConnection(); } bool SessionState::isLocal(const ConnectionToken* t) const @@ -106,18 +94,19 @@ bool SessionState::isLocal(const ConnectionToken* t) const } void SessionState::detach() { - getConnection().outputTasks.removeOutputTask(&semanticState); + // activateOutput can be called in a different thread, lock to protect attached status Mutex::ScopedLock l(lock); + QPID_LOG(debug, getId() << ": detached on broker."); + getConnection().outputTasks.removeOutputTask(&semanticState); handler = 0; if (mgmtObject.get() != 0) - { mgmtObject->set_attached (0); } -} void SessionState::attach(SessionHandler& h) { - { + // activateOutput can be called in a different thread, lock to protect attached status Mutex::ScopedLock l(lock); + QPID_LOG(debug, getId() << ": attached on broker."); handler = &h; if (mgmtObject.get() != 0) { @@ -126,16 +115,13 @@ void SessionState::attach(SessionHandler& h) { mgmtObject->set_channelId (h.getChannel()); } } - h.getConnection().outputTasks.addOutputTask(&semanticState); -} -void SessionState::activateOutput() -{ +void SessionState::activateOutput() { + // activateOutput can be called in a different thread, lock to protect attached status Mutex::ScopedLock l(lock); - if (isAttached()) { + if (isAttached()) getConnection().outputTasks.activateOutput(); } -} //This class could be used as the callback for queue notifications //if not attached, it can simply ignore the callback, else pass it //on to the connection @@ -155,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, case management::Session::METHOD_DETACH : if (handler != 0) { - handler->requestDetach(); + handler->sendDetach(); } status = Manageable::STATUS_OK; break; @@ -179,35 +165,25 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } -void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id) -{ - id = nextIn++; +void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { Invoker::Result invocation = invoke(adapter, *method); - completed.add(id); - + receiverCompleted(id); if (!invocation.wasHandled()) { throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { - nextOut++;//execution result is now a command, so the counter must be incremented getProxy().getExecution().result(id, invocation.getResult()); } if (method->isSync()) { incomplete.process(enqueuedOp, true); sendCompletion(); } - //TODO: if window gets too large send unsolicited completion } -void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) +void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) { - intrusive_ptr<Message> msg(msgBuilder.getMessage()); - if (frame.getBof() && frame.getBos()) {//start of frameset - id = nextIn++; + if (frame.getBof() && frame.getBos()) //start of frameset msgBuilder.start(id); - msg = msgBuilder.getMessage(); - } else { - id = msg->getCommandId(); - } + intrusive_ptr<Message> msg(msgBuilder.getMessage()); msgBuilder.handle(frame); if (frame.getEof() && frame.getEos()) {//end of frameset if (frame.getBof()) { @@ -240,19 +216,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) void SessionState::enqueued(boost::intrusive_ptr<Message> msg) { - completed.add(msg->getCommandId()); - if (msg->requiresAccept()) { - nextOut++;//accept is a command, so the counter must be incremented + receiverCompleted(msg->getCommandId()); + if (msg->requiresAccept()) getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); } -} void SessionState::handle(AMQFrame& frame) { - if (ignoring) return; - received(frame); - - SequenceNumber commandId; + SequenceNumber commandId = receiverGetCurrent(); try { //TODO: make command handling more uniform, regardless of whether //commands carry content. @@ -277,29 +248,36 @@ void SessionState::handle(AMQFrame& frame) } else { getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); } - timeout = 0; ignoring = true; - handler->requestDetach(); + handler->sendDetach(); } } DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { uint32_t maxFrameSize = getConnection().getFrameMax(); - MessageDelivery::deliver(msg, getProxy().getHandler(), nextOut, token, maxFrameSize); - return nextOut++; + assert(senderGetCommandPoint().offset == 0); + SequenceNumber commandId = senderGetCommandPoint().command; + MessageDelivery::deliver(msg, getProxy().getHandler(), commandId, token, maxFrameSize); + assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint. + return commandId; } -void SessionState::sendCompletion() -{ - handler->sendCompletion(); +void SessionState::sendCompletion() { handler->sendCompletion(); } + +void SessionState::senderCompleted(const SequenceSet& commands) { + qpid::SessionState::senderCompleted(commands); + commands.for_each(boost::bind(&SemanticState::completed, &semanticState, _1, _2)); } -void SessionState::complete(const SequenceSet& commands) -{ - knownCompleted.add(commands); - commands.for_each(ackOp); +void SessionState::readyToSend() { + QPID_LOG(debug, getId() << ": ready to send, activating output."); + assert(handler); + sys::AggregateOutput& tasks = handler->getConnection().outputTasks; + tasks.addOutputTask(&semanticState); + tasks.activateOutput(); } +Broker& SessionState::getBroker() { return broker; } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index ae860e84c9..7b70789161 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -22,11 +22,9 @@ * */ -#include "qpid/framing/Uuid.h" +#include "qpid/SessionState.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/SessionState.h" #include "qpid/framing/SequenceSet.h" -#include "qpid/framing/ProtocolVersion.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Time.h" #include "qpid/management/Manageable.h" @@ -63,21 +61,20 @@ class SessionManager; * Broker-side session state includes sessions handler chains, which may * themselves have state. */ -class SessionState : public framing::SessionState, +class SessionState : public qpid::SessionState, public SessionContext, public DeliveryAdapter, - public management::Manageable + public management::Manageable, + public framing::FrameHandler { public: + SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&); ~SessionState(); bool isAttached() const { return handler; } void detach(); void attach(SessionHandler& handler); - - SessionHandler* getHandler(); - /** @pre isAttached() */ framing::AMQP_ClientProxy& getProxy(); @@ -85,18 +82,15 @@ class SessionState : public framing::SessionState, ConnectionState& getConnection(); bool isLocal(const ConnectionToken* t) const; - uint32_t getTimeout() const { return timeout; } - void setTimeout(uint32_t t) { timeout = t; } - - Broker& getBroker() { return broker; } - framing::ProtocolVersion getVersion() const { return version; } + Broker& getBroker(); /** OutputControl **/ void activateOutput(); void handle(framing::AMQFrame& frame); - void complete(const framing::SequenceSet& ranges); + void senderCompleted(const framing::SequenceSet& ranges); + void sendCompletion(); //delivery adapter methods: @@ -107,29 +101,13 @@ class SessionState : public framing::SessionState, management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); - // Normally SessionManager creates sessions. - SessionState(SessionManager*, - SessionHandler* out, - uint32_t timeout, - uint32_t ackInterval, - std::string& name); - - - framing::SequenceSet completed; - framing::SequenceSet knownCompleted; - framing::SequenceNumber nextIn; - framing::SequenceNumber nextOut; + void readyToSend(); private: - typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; - SessionManager* factory; + Broker& broker; SessionHandler* handler; - framing::Uuid id; - uint32_t timeout; sys::AbsTime expiry; // Used by SessionManager. - Broker& broker; - framing::ProtocolVersion version; sys::Mutex lock; bool ignoring; std::string name; @@ -139,12 +117,11 @@ class SessionState : public framing::SessionState, MessageBuilder msgBuilder; IncompleteMessageList incomplete; - RangedOperation ackOp; IncompleteMessageList::CompletionListener enqueuedOp; management::Session::shared_ptr mgmtObject; - void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id); - void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id); + void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); + void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); void enqueued(boost::intrusive_ptr<Message> msg); friend class SessionManager; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index bca6c49c13..59353d7637 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -71,9 +71,9 @@ struct ClusterDeliverHandler : public FrameHandler { void handle(AMQFrame& f) { next->handle(f); - Mutex::ScopedLock l(sender.lock); - sender.busy=false; - sender.lock.notify(); + Mutex::ScopedLock l(senderLock); + senderBusy=false; + senderLock.notify(); } }; diff --git a/cpp/src/qpid/framing/SequenceNumber.cpp b/cpp/src/qpid/framing/SequenceNumber.cpp index cba00c860a..9e682d89e4 100644 --- a/cpp/src/qpid/framing/SequenceNumber.cpp +++ b/cpp/src/qpid/framing/SequenceNumber.cpp @@ -21,6 +21,7 @@ #include "SequenceNumber.h" #include "Buffer.h" +#include <ostream> using qpid::framing::SequenceNumber; using qpid::framing::Buffer; @@ -102,4 +103,8 @@ int32_t operator-(const SequenceNumber& a, const SequenceNumber& b) return result; } +std::ostream& operator<<(std::ostream& o, const SequenceNumber& n) { + return o << n.getValue(); +} + }} diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h index d659bec5c1..aacd77501b 100644 --- a/cpp/src/qpid/framing/SequenceNumber.h +++ b/cpp/src/qpid/framing/SequenceNumber.h @@ -22,6 +22,7 @@ #define _framing_SequenceNumber_h #include "amqp_types.h" +#include <iosfwd> namespace qpid { namespace framing { @@ -66,6 +67,8 @@ struct Window SequenceNumber lwm; }; +std::ostream& operator<<(std::ostream& o, const SequenceNumber& n); + }} // namespace qpid::framing diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index dfda9ecae1..aeff35dbf0 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -150,7 +150,7 @@ QPID_AUTO_TEST_CASE(testDispatcherThread) ClientSessionFixture fix; fix.session =fix.connection.newSession(ASYNC); fix.declareSubscribe(); - size_t count = 1000; + size_t count = 10; DummyListener listener(fix.session, "my-dest", count); sys::Thread t(listener); for (size_t i = 0; i < count; ++i) { @@ -205,7 +205,7 @@ QPID_AUTO_TEST_CASE(testSendToSelf) { sys::Thread runner(fix.subs);//start dispatcher thread string data("msg"); Message msg(data, "myq"); - const uint count=1000; + const uint count=10; for (uint i = 0; i < count; ++i) { fix.session.messageTransfer(content=msg); } diff --git a/cpp/src/tests/SessionState.cpp b/cpp/src/tests/SessionState.cpp index 71b90ea9f1..4beef87cfe 100644 --- a/cpp/src/tests/SessionState.cpp +++ b/cpp/src/tests/SessionState.cpp @@ -58,7 +58,7 @@ string str(const AMQFrame& f) { return "H"; // Must be a header. } // Make a string from a range of frames. -string str(const vector<AMQFrame>& frames) { +string str(const boost::iterator_range<vector<AMQFrame>::const_iterator>& frames) { string (*strFrame)(const AMQFrame&) = str; return applyAccumulate(frames.begin(), frames.end(), string(), ptr_fun(strFrame)); } @@ -84,7 +84,7 @@ AMQFrame contentFrameChar(char content, bool isLast=true) { } // Send frame & return size of frame. -size_t send(qpid::SessionState& s, const AMQFrame& f) { s.sender.record(f); return f.size(); } +size_t send(qpid::SessionState& s, const AMQFrame& f) { s.senderRecord(f); return f.size(); } // Send transfer command with no content. size_t transfer0(qpid::SessionState& s) { return send(s, transferFrame(false)); } // Send transfer frame with single content frame. @@ -126,136 +126,132 @@ using qpid::SessionPoint; QPID_AUTO_TEST_CASE(testSendGetReplyList) { qpid::SessionState s; - s.sender.getCommandPoint(); + s.senderGetCommandPoint(); transfer1(s, "abc"); transfers(s, "def"); transferN(s, "xyz"); - BOOST_CHECK_EQUAL(str(s.sender.getReplayList()),"CabcCdCeCfCxyz"); + BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(0,0))),"CabcCdCeCfCxyz"); // Ignore controls. - s.sender.record(AMQFrame(in_place<SessionFlushBody>())); - BOOST_CHECK_EQUAL(str(s.sender.getReplayList()),"CabcCdCeCfCxyz"); + s.senderRecord(AMQFrame(in_place<SessionFlushBody>())); + BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(2,0))),"CeCfCxyz"); } QPID_AUTO_TEST_CASE(testNeedFlush) { qpid::SessionState::Configuration c; // sync after 2 1-byte transfers or equivalent bytes. - c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize()); + c.replayFlushLimit = 2*(transferFrameSize()+contentFrameSize()); qpid::SessionState s(SessionId(), c); - s.sender.getCommandPoint(); + s.senderGetCommandPoint(); transfers(s, "a"); - BOOST_CHECK(!s.sender.needFlush()); + BOOST_CHECK(!s.senderNeedFlush()); transfers(s, "b"); - BOOST_CHECK(s.sender.needFlush()); - s.sender.recordFlush(); - BOOST_CHECK(!s.sender.needFlush()); + BOOST_CHECK(s.senderNeedFlush()); + s.senderRecordFlush(); + BOOST_CHECK(!s.senderNeedFlush()); transfers(s, "c"); - BOOST_CHECK(!s.sender.needFlush()); + BOOST_CHECK(!s.senderNeedFlush()); transfers(s, "d"); - BOOST_CHECK(s.sender.needFlush()); - BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCcCd"); + BOOST_CHECK(s.senderNeedFlush()); + BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint())), "CaCbCcCd"); } QPID_AUTO_TEST_CASE(testPeerConfirmed) { qpid::SessionState::Configuration c; // sync after 2 1-byte transfers or equivalent bytes. - c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize()); + c.replayFlushLimit = 2*(transferFrameSize()+contentFrameSize()); qpid::SessionState s(SessionId(), c); - s.sender.getCommandPoint(); + s.senderGetCommandPoint(); transfers(s, "ab"); - BOOST_CHECK(s.sender.needFlush()); + BOOST_CHECK(s.senderNeedFlush()); transfers(s, "cd"); - BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCcCd"); - s.sender.confirmed(SessionPoint(3)); - BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cd"); - BOOST_CHECK(!s.sender.needFlush()); - - // Never go backwards. - s.sender.confirmed(SessionPoint(2)); - s.sender.confirmed(SessionPoint(3)); + BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(0,0))), "CaCbCcCd"); + s.senderConfirmed(SessionPoint(3)); + BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(3,0))), "Cd"); + BOOST_CHECK(!s.senderNeedFlush()); // Multi-frame transfer. transfer1(s, "efg"); transfers(s, "xy"); - BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CdCefgCxCy"); - BOOST_CHECK(s.sender.needFlush()); + BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(3,0))), "CdCefgCxCy"); + BOOST_CHECK(s.senderNeedFlush()); - s.sender.confirmed(SessionPoint(4)); - BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CefgCxCy"); - BOOST_CHECK(s.sender.needFlush()); + s.senderConfirmed(SessionPoint(4)); + BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(4,0))), "CefgCxCy"); + BOOST_CHECK(s.senderNeedFlush()); - s.sender.confirmed(SessionPoint(5)); - BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CxCy"); - BOOST_CHECK(s.sender.needFlush()); + s.senderConfirmed(SessionPoint(5)); + BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(5,0))), "CxCy"); + BOOST_CHECK(s.senderNeedFlush()); - s.sender.confirmed(SessionPoint(6)); - BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cy"); - BOOST_CHECK(!s.sender.needFlush()); + s.senderConfirmed(SessionPoint(6)); + BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(6,0))), "Cy"); + BOOST_CHECK(!s.senderNeedFlush()); } QPID_AUTO_TEST_CASE(testPeerCompleted) { qpid::SessionState s; - s.sender.getCommandPoint(); + s.senderGetCommandPoint(); // Completion implies confirmation transfers(s, "abc"); - BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCc"); + BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(0,0))), "CaCbCc"); SequenceSet set(SequenceSet() + 0 + 1); - s.sender.completed(set); - BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cc"); + s.senderCompleted(set); + BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(2,0))), "Cc"); transfers(s, "def"); // We dont do out-of-order confirmation, so this will only confirm up to 3: set = SequenceSet(SequenceSet() + 2 + 3 + 5); - s.sender.completed(set); - BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CeCf"); + s.senderCompleted(set); + BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(4,0))), "CeCf"); } QPID_AUTO_TEST_CASE(testReceive) { // Advance expected/received correctly qpid::SessionState s; - s.receiver.setCommandPoint(SessionPoint()); - BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(0)); - BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(0)); + s.receiverSetCommandPoint(SessionPoint()); + BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(0)); + BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(0)); - BOOST_CHECK(s.receiver.record(transferFrame(false))); - BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(1)); - BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(1)); + BOOST_CHECK(s.receiverRecord(transferFrame(false))); + BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(1)); + BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(1)); - BOOST_CHECK(s.receiver.record(transferFrame(true))); + BOOST_CHECK(s.receiverRecord(transferFrame(true))); SessionPoint point = SessionPoint(1, transferFrameSize()); - BOOST_CHECK_EQUAL(s.receiver.getExpected(), point); - BOOST_CHECK_EQUAL(s.receiver.getReceived(), point); - BOOST_CHECK(s.receiver.record(contentFrame("", false))); + BOOST_CHECK_EQUAL(s.receiverGetExpected(), point); + BOOST_CHECK_EQUAL(s.receiverGetReceived(), point); + BOOST_CHECK(s.receiverRecord(contentFrame("", false))); point.offset += contentFrameSize(0); - BOOST_CHECK_EQUAL(s.receiver.getExpected(), point); - BOOST_CHECK_EQUAL(s.receiver.getReceived(), point); - BOOST_CHECK(s.receiver.record(contentFrame("", true))); - BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(2)); - BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(2)); + BOOST_CHECK_EQUAL(s.receiverGetExpected(), point); + BOOST_CHECK_EQUAL(s.receiverGetReceived(), point); + BOOST_CHECK(s.receiverRecord(contentFrame("", true))); + BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(2)); + BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(2)); // Idempotence barrier, rewind expected & receive some duplicates. - s.receiver.setCommandPoint(SessionPoint(1)); - BOOST_CHECK(!s.receiver.record(transferFrame(false))); - BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(2)); - BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(2)); - BOOST_CHECK(s.receiver.record(transferFrame(false))); - BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(3)); - BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(3)); + s.receiverSetCommandPoint(SessionPoint(1)); + BOOST_CHECK(!s.receiverRecord(transferFrame(false))); + BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(2)); + BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(2)); + BOOST_CHECK(s.receiverRecord(transferFrame(false))); + BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(3)); + BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(3)); } QPID_AUTO_TEST_CASE(testCompleted) { // completed & unknownCompleted qpid::SessionState s; - s.receiver.setCommandPoint(SessionPoint()); - s.receiver.record(transferFrame(false)); - s.receiver.record(transferFrame(false)); - s.receiver.record(transferFrame(false)); - s.receiver.completed(1); - BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), SequenceSet(SequenceSet()+1)); - s.receiver.completed(0); - BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), + s.receiverSetCommandPoint(SessionPoint()); + s.receiverRecord(transferFrame(false)); + s.receiverRecord(transferFrame(false)); + s.receiverRecord(transferFrame(false)); + s.receiverCompleted(1); + BOOST_CHECK_EQUAL(s.receiverGetUnknownComplete(), SequenceSet(SequenceSet()+1)); + s.receiverCompleted(0); + BOOST_CHECK_EQUAL(s.receiverGetUnknownComplete(), SequenceSet(SequenceSet() + SequenceSet::Range(0,2))); - s.receiver.knownCompleted(SequenceSet(SequenceSet()+1)); - BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), SequenceSet(SequenceSet()+2)); + s.receiverKnownCompleted(SequenceSet(SequenceSet()+1)); + BOOST_CHECK_EQUAL(s.receiverGetUnknownComplete(), SequenceSet(SequenceSet()+2)); // TODO aconway 2008-04-30: missing tests for known-completed. } diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 98e34be0e9..0f52165587 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -57,7 +57,7 @@ def remote_port(): class Helper: def __init__(self, parent): self.parent = parent - self.session = parent.conn.session("2") + self.session = parent.conn.session("Helper") self.mc = managementClient(self.session.spec) self.mch = self.mc.addChannel(self.session) self.mc.syncWaitForStable(self.mch) @@ -126,7 +126,7 @@ class FederationTests(TestBase010): #send messages to remote broker and confirm it is routed to local broker r_conn = self.connect(host=remote_host(), port=remote_port()) - r_session = r_conn.session("1") + r_session = r_conn.session("test_pull_from_exchange") for i in range(1, 11): dp = r_session.delivery_properties(routing_key="my-key") @@ -153,7 +153,7 @@ class FederationTests(TestBase010): #setup queue on remote broker and add some messages r_conn = self.connect(host=remote_host(), port=remote_port()) - r_session = r_conn.session("1") + r_session = r_conn.session("test_pull_from_queue") r_session.queue_declare(queue="my-bridge-queue", exclusive=True, auto_delete=True) for i in range(1, 6): dp = r_session.delivery_properties(routing_key="my-bridge-queue") @@ -223,7 +223,7 @@ class FederationTests(TestBase010): #send messages to remote broker and confirm it is routed to local broker r_conn = self.connect(host=remote_host(), port=remote_port()) - r_session = r_conn.session("1") + r_session = r_conn.session("test_tracing") trace = [None, "exclude-me", "a,exclude-me,b", "also-exclude-me,c", "dont-exclude-me"] body = ["yes", "first-bad", "second-bad", "third-bad", "yes"] |