summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-05-20 13:44:34 +0000
committerAlan Conway <aconway@apache.org>2008-05-20 13:44:34 +0000
commit0333573627c831142aa251bfb1cabdb1e2bf438e (patch)
tree953bf8c624374c57953aa3f2888254d175609d9a /cpp/src
parent96024622ccfcc8fdd24b3c9ace44f7c8849fac46 (diff)
downloadqpid-python-0333573627c831142aa251bfb1cabdb1e2bf438e.tar.gz
Support for AMQP 0-10 sessions in C++ broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658246 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/SessionState.cpp164
-rw-r--r--cpp/src/qpid/SessionState.h144
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.cpp61
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.h11
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp1
-rw-r--r--cpp/src/qpid/broker/Broker.cpp55
-rw-r--r--cpp/src/qpid/broker/Broker.h3
-rw-r--r--cpp/src/qpid/broker/Connection.cpp4
-rw-r--r--cpp/src/qpid/broker/SaslAuthenticator.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp209
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h64
-rw-r--r--cpp/src/qpid/broker/SessionManager.cpp78
-rw-r--r--cpp/src/qpid/broker/SessionManager.h39
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp124
-rw-r--r--cpp/src/qpid/broker/SessionState.h47
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp6
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.cpp5
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.h3
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp4
-rw-r--r--cpp/src/tests/SessionState.cpp146
-rwxr-xr-xcpp/src/tests/federation.py8
22 files changed, 478 insertions, 717 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 8905fb5f9d..2a6b8303b3 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -31,6 +31,7 @@ using framing::AMQFrame;
using amqp_0_10::NotImplementedException;
using amqp_0_10::InvalidArgumentException;
using amqp_0_10::IllegalStateException;
+using amqp_0_10::ResourceLimitExceededException;
namespace {
bool isControl(const AMQFrame& f) {
@@ -63,78 +64,83 @@ bool SessionPoint::operator==(const SessionPoint& x) const {
return command == x.command && offset == x.offset;
}
-SessionState::SendState::SendState(SessionState& s) : session(&s), unflushedSize(0) {}
+SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; }
+SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; }
+SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; }
-const SessionPoint& SessionState::SendState::getCommandPoint() {
- return sendPoint;
+SessionState::ReplayRange SessionState::senderExpected(const SessionPoint& expect) {
+ if (expect < sender.replayPoint || sender.sendPoint < expect)
+ throw InvalidArgumentException(QPID_MSG(getId() << ": expected command-point out of range."));
+ ReplayList::iterator i = sender.replayList.begin();
+ SessionPoint p = sender.replayPoint;
+ while (i != sender.replayList.end() && p.command < expect.command)
+ p.advance(*i++);
+ assert(p.command == expect.command);
+ return boost::make_iterator_range(i, sender.replayList.end());
}
-bool SessionState::SendState::expected(const SessionPoint& point) {
- if (point < replayPoint || sendPoint < point)
- throw InvalidArgumentException(QPID_MSG(session->getId() << ": expected command-point out of range."));
- // FIXME aconway 2008-05-06: this is not strictly correct, we should keep
- // an intermediate replay pointer into the replay list.
- confirmed(point); // Drop commands prior to expected from replay.
- return (!replayList.empty());
-}
-
-void SessionState::SendState::record(const AMQFrame& f) {
+void SessionState::senderRecord(const AMQFrame& f) {
if (isControl(f)) return; // Ignore control frames.
- session->stateful = true;
- replayList.push_back(f);
- unflushedSize += f.size();
- incomplete += sendPoint.command;
- sendPoint.advance(f);
-}
-
-bool SessionState::SendState::needFlush() const { return unflushedSize >= session->config.replaySyncSize; }
-
-void SessionState::SendState::recordFlush() {
- assert(flushPoint <= sendPoint);
- flushPoint = sendPoint;
- unflushedSize = 0;
-}
-
-void SessionState::SendState::confirmed(const SessionPoint& confirmed) {
- if (confirmed > sendPoint)
- throw InvalidArgumentException(QPID_MSG(session->getId() << "Confirmed commands not yet sent."));
- ReplayList::iterator i = replayList.begin();
- while (i != replayList.end() && replayPoint.command < confirmed.command) {
- replayPoint.advance(*i);
- assert(replayPoint <= sendPoint);
- if (replayPoint > flushPoint)
- unflushedSize -= i->size();
+ stateful = true;
+ sender.replayList.push_back(f);
+ sender.unflushedSize += f.size();
+ sender.replaySize += f.size();
+ sender.incomplete += sender.sendPoint.command;
+ sender.sendPoint.advance(f);
+ if (config.replayHardLimit && config.replayHardLimit < sender.replaySize)
+ throw ResourceLimitExceededException("Replay bufffer exceeeded hard limit");
+}
+
+bool SessionState::senderNeedFlush() const {
+ return config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit;
+}
+
+void SessionState::senderRecordFlush() {
+ assert(sender.flushPoint <= sender.sendPoint);
+ sender.flushPoint = sender.sendPoint;
+ sender.unflushedSize = 0;
+}
+
+void SessionState::senderConfirmed(const SessionPoint& confirmed) {
+ if (confirmed > sender.sendPoint)
+ throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent."));
+ ReplayList::iterator i = sender.replayList.begin();
+ while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) {
+ sender.replayPoint.advance(*i);
+ assert(sender.replayPoint <= sender.sendPoint);
+ sender.replaySize -= i->size();
+ if (sender.replayPoint > sender.flushPoint)
+ sender.unflushedSize -= i->size();
++i;
}
- if (replayPoint > flushPoint) flushPoint = replayPoint;
- replayList.erase(replayList.begin(), i);
- assert(replayPoint.offset == 0);
+ if (sender.replayPoint > sender.flushPoint)
+ sender.flushPoint = sender.replayPoint;
+ sender.replayList.erase(sender.replayList.begin(), i);
+ assert(sender.replayPoint.offset == 0);
}
-void SessionState::SendState::completed(const SequenceSet& commands) {
+void SessionState::senderCompleted(const SequenceSet& commands) {
if (commands.empty()) return;
- incomplete -= commands;
+ sender.incomplete -= commands;
// Completion implies confirmation but we don't handle out-of-order
// confirmation, so confirm only the first contiguous range of commands.
- confirmed(SessionPoint(commands.rangesBegin()->end()));
+ senderConfirmed(SessionPoint(commands.rangesBegin()->end()));
}
-SessionState::ReceiveState::ReceiveState(SessionState& s) : session(&s) {}
-
-void SessionState::ReceiveState::setCommandPoint(const SessionPoint& point) {
- if (session->hasState() && point > received)
- throw InvalidArgumentException(QPID_MSG(session->getId() << ": Command-point out of range."));
- expected = point;
- if (expected > received)
- received = expected;
+void SessionState::receiverSetCommandPoint(const SessionPoint& point) {
+ if (hasState() && point > receiver.received)
+ throw InvalidArgumentException(QPID_MSG(getId() << ": Command-point out of range."));
+ receiver.expected = point;
+ if (receiver.expected > receiver.received)
+ receiver.received = receiver.expected;
}
-bool SessionState::ReceiveState::record(const AMQFrame& f) {
+bool SessionState::receiverRecord(const AMQFrame& f) {
if (isControl(f)) return true; // Ignore control frames.
- session->stateful = true;
- expected.advance(f);
- if (expected > received) {
- received = expected;
+ stateful = true;
+ receiver.expected.advance(f);
+ if (receiver.expected > receiver.received) {
+ receiver.received = receiver.expected;
return true;
}
else {
@@ -143,41 +149,43 @@ bool SessionState::ReceiveState::record(const AMQFrame& f) {
}
}
-void SessionState::ReceiveState::completed(SequenceNumber command, bool cumulative) {
- assert(command <= received.command); // Internal error to complete an unreceived command.
- assert(firstIncomplete <= command);
+void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
+ assert(command <= receiver.received.command); // Internal error to complete an unreceived command.
+ assert(receiver.firstIncomplete <= command);
if (cumulative)
- unknownCompleted.add(firstIncomplete, command);
+ receiver.unknownCompleted.add(receiver.firstIncomplete, command);
else
- unknownCompleted += command;
- firstIncomplete = unknownCompleted.rangeContaining(firstIncomplete).end();
+ receiver.unknownCompleted += command;
+ receiver.firstIncomplete = receiver.unknownCompleted.rangeContaining(receiver.firstIncomplete).end();
+ QPID_LOG(debug, "Completed " << command << " unknown=" << receiver.unknownCompleted);
}
-void SessionState::ReceiveState::knownCompleted(const SequenceSet& commands) {
- if (!commands.empty() && commands.back() > received.command)
- throw InvalidArgumentException(QPID_MSG(session->getId() << ": Known-completed has invalid commands."));
- unknownCompleted -= commands;
+void SessionState::receiverKnownCompleted(const SequenceSet& commands) {
+ if (!commands.empty() && commands.back() > receiver.received.command)
+ throw InvalidArgumentException(QPID_MSG(getId() << ": Known-completed has invalid commands."));
+ receiver.unknownCompleted -= commands;
}
-SequenceNumber SessionState::ReceiveState::getCurrent() const {
- SequenceNumber current = expected.command; // FIXME aconway 2008-05-08: SequenceNumber arithmetic.
- return --current;
+const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; }
+const SessionPoint& SessionState::receiverGetReceived() const { return receiver.received; }
+const SequenceSet& SessionState::receiverGetUnknownComplete() const { return receiver.unknownCompleted; }
+
+SequenceNumber SessionState::receiverGetCurrent() const {
+ SequenceNumber current = receiver.expected.command;
+ if (receiver.expected.offset == 0)
+ --current;
+ return current;
}
-// FIXME aconway 2008-05-02: implement sync & kill limits.
-SessionState::Configuration::Configuration()
- : replaySyncSize(std::numeric_limits<size_t>::max()),
- replayKillSize(std::numeric_limits<size_t>::max()) {}
+SessionState::Configuration::Configuration(size_t flush, size_t hard) :
+ replayFlushLimit(flush), replayHardLimit(hard) {}
-SessionState::SessionState(const SessionId& i, const Configuration& c)
- : sender(*this), receiver(*this), id(i), timeout(), config(c), stateful()
+SessionState::SessionState(const SessionId& i, const Configuration& c) : id(i), timeout(), config(c), stateful()
{
QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
}
-bool SessionState::hasState() const {
- return stateful;
-}
+bool SessionState::hasState() const { return stateful; }
SessionState::~SessionState() {}
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h
index 7957825dd3..47fabb04c8 100644
--- a/cpp/src/qpid/SessionState.h
+++ b/cpp/src/qpid/SessionState.h
@@ -28,6 +28,7 @@
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/FrameHandler.h>
#include <boost/operators.hpp>
+#include <boost/range/iterator_range.hpp>
#include <vector>
#include <iosfwd>
@@ -70,135 +71,118 @@ std::ostream& operator<<(std::ostream&, const SessionPoint&);
* source-incompatbile API changes.
*/
class SessionState {
- public:
+ typedef std::vector<framing::AMQFrame> ReplayList;
- /** State for commands sent. Records commands for replay,
- * tracks confirmation and completion of sent commands.
- */
-class SendState {
public:
- typedef std::vector<framing::AMQFrame> ReplayList;
+
+ typedef boost::iterator_range<ReplayList::iterator> ReplayRange;
+
+ struct Configuration {
+ Configuration(size_t flush=0, size_t hard=0);
+ size_t replayFlushLimit; // Flush when the replay list >= N bytes. 0 disables.
+ size_t replayHardLimit; // Kill session if replay list > N bytes. 0 disables.
+ };
+
+ SessionState(const SessionId& =SessionId(), const Configuration& =Configuration());
+
+ virtual ~SessionState();
+
+ bool hasState() const;
+
+ const SessionId& getId() const { return id; }
+
+ uint32_t getTimeout() const { return timeout; }
+ void setTimeout(uint32_t seconds) { timeout = seconds; }
+
+ bool operator==(const SessionId& other) const { return id == other; }
+ bool operator==(const SessionState& other) const { return id == other.id; }
+
+ // ==== Functions for sender state.
/** Record frame f for replay. Should not be called during replay. */
- void record(const framing::AMQFrame& f);
+ virtual void senderRecord(const framing::AMQFrame& f);
/** @return true if we should send flush for confirmed and completed commands. */
- bool needFlush() const;
+ virtual bool senderNeedFlush() const;
/** Called when flush for confirmed and completed commands is sent to peer. */
- void recordFlush();
+ virtual void senderRecordFlush();
/** Called when the peer confirms up to comfirmed. */
- void confirmed(const SessionPoint& confirmed);
+ virtual void senderConfirmed(const SessionPoint& confirmed);
/** Called when the peer indicates commands completed */
- void completed(const SequenceSet& commands);
+ virtual void senderCompleted(const SequenceSet& commands);
- /** Point from which we can replay. All data < replayPoint is confirmed. */
- const SessionPoint& getReplayPoint() const { return replayPoint; }
-
- /** Get the replay list, starting from getReplayPoint() */
- // TODO aconway 2008-04-30: should be const, but FrameHandler takes non-const AMQFrame&.
- ReplayList& getReplayList() { return replayList; }
-
- /** Point from which the next data will be sent. */
- const SessionPoint& getCommandPoint();
+ /** Point from which the next new (not replayed) data will be sent. */
+ virtual SessionPoint senderGetCommandPoint();
/** Set of outstanding incomplete commands */
- const SequenceSet& getIncomplete() const { return incomplete; }
+ virtual SequenceSet senderGetIncomplete() const;
+
+ /** Point from which we can replay. */
+ virtual SessionPoint senderGetReplayPoint() const;
/** Peer expecting commands from this point.
- *@return true if replay is required, sets replayPoint.
+ virtual *@return Range of frames to be replayed.
*/
- bool expected(const SessionPoint& expected);
+ virtual ReplayRange senderExpected(const SessionPoint& expected);
- private:
- SendState(SessionState& s);
- SessionState* session;
- // invariant: replayPoint <= flushPoint <= sendPoint
- SessionPoint replayPoint; // Can replay from this point
- SessionPoint flushPoint; // Point of last flush
- SessionPoint sendPoint; // Send from this point
- ReplayList replayList; // Starts from replayPoint.
- size_t unflushedSize; // Un-flushed bytes in replay list.
- SequenceSet incomplete; // Commands sent and not yet completed.
+ // ==== Functions for receiver state
- friend class SessionState;
-};
-
- /** State for commands received.
- * Idempotence barrier for duplicate commands, tracks completion
- * and of received commands.
- */
-class ReceiveState {
- public:
/** Set the command point. */
- void setCommandPoint(const SessionPoint& point);
+ virtual void receiverSetCommandPoint(const SessionPoint& point);
/** Returns true if frame should be be processed, false if it is a duplicate. */
- bool record(const framing::AMQFrame& f);
+ virtual bool receiverRecord(const framing::AMQFrame& f);
/** Command completed locally */
- void completed(SequenceNumber command, bool cumulative=false);
+ virtual void receiverCompleted(SequenceNumber command, bool cumulative=false);
/** Peer has indicated commands are known completed */
- void knownCompleted(const SequenceSet& commands);
+ virtual void receiverKnownCompleted(const SequenceSet& commands);
/** Get the incoming command point */
- const SessionPoint& getExpected() const { return expected; }
+ virtual const SessionPoint& receiverGetExpected() const;
/** Get the received high-water-mark, may be > getExpected() during replay */
- const SessionPoint& getReceived() const { return received; }
+ virtual const SessionPoint& receiverGetReceived() const;
/** Completed commands that the peer may not know about */
- const SequenceSet& getUnknownComplete() const { return unknownCompleted; }
+ virtual const SequenceSet& receiverGetUnknownComplete() const;
/** ID of the command currently being handled. */
- SequenceNumber getCurrent() const;
+ virtual SequenceNumber receiverGetCurrent() const;
private:
- ReceiveState(SessionState&);
+ struct SendState {
+ SendState() : session(), unflushedSize(), replaySize() {}
+ SessionState* session;
+ // invariant: replayPoint <= flushPoint <= sendPoint
+ SessionPoint replayPoint; // Can replay from this point
+ SessionPoint flushPoint; // Point of last flush
+ SessionPoint sendPoint; // Send from this point
+ ReplayList replayList; // Starts from replayPoint.
+ size_t unflushedSize; // Un-flushed bytes in replay list.
+ size_t replaySize; // Total bytes in replay list.
+ SequenceSet incomplete; // Commands sent and not yet completed.
+ } sender;
+
+ struct ReceiveState {
+ ReceiveState() : session() {}
SessionState* session;
SessionPoint expected; // Expected from here
SessionPoint received; // Received to here. Invariant: expected <= received.
SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer.
SequenceNumber firstIncomplete; // First incomplete command.
+ } receiver;
- friend class SessionState;
- };
-
- struct Configuration {
- Configuration();
- size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes
- size_t replayKillSize; // Kill session if replay list grows beyond N bytes.
- };
-
- SessionState(const SessionId& =SessionId(), const Configuration& =Configuration());
-
- virtual ~SessionState();
-
- const SessionId& getId() const { return id; }
- uint32_t getTimeout() const { return timeout; }
- void setTimeout(uint32_t seconds) { timeout = seconds; }
-
- bool operator==(const SessionId& other) const { return id == other; }
- bool operator==(const SessionState& other) const { return id == other.id; }
-
- SendState sender; ///< State for commands sent
- ReceiveState receiver; ///< State for commands received
-
- bool hasState() const;
-
- private:
SessionId id;
uint32_t timeout;
Configuration config;
bool stateful;
-
- friend class SendState;
- friend class ReceiveState;
};
inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; }
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 3fb2579e8c..6d43dd1789 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -33,10 +33,8 @@ namespace amqp_0_10 {
using namespace framing;
using namespace std;
-SessionHandler::SessionHandler() : peer(channel), ignoring(), sendReady(), receiveReady() {}
-
-SessionHandler::SessionHandler(FrameHandler& out, ChannelId ch)
- : channel(ch, &out), peer(channel), ignoring(false) {}
+SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch)
+ : channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {}
SessionHandler::~SessionHandler() {}
@@ -75,7 +73,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
checkAttached();
if (!receiveReady)
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data"));
- if (!getState()->receiver.record(f))
+ if (!getState()->receiverRecord(f))
return; // Ignore duplicates.
getInHandler()->handle(f);
}
@@ -100,10 +98,10 @@ void SessionHandler::handleOut(AMQFrame& f) {
checkAttached();
if (!sendReady)
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data"));
- getState()->sender.record(f);
- if (getState()->sender.needFlush()) {
+ getState()->senderRecord(f);
+ if (getState()->senderNeedFlush()) {
peer.flush(false, true, true);
- getState()->sender.recordFlush();
+ getState()->senderRecordFlush();
}
channel.handle(f);
}
@@ -128,7 +126,7 @@ void SessionHandler::attach(const std::string& name, bool force) {
if (getState()->hasState())
peer.flush(true, true, true);
else
- sendCommandPoint();
+ sendCommandPoint(getState()->senderGetCommandPoint());
}
void SessionHandler::attached(const std::string& name) {
@@ -168,7 +166,7 @@ void SessionHandler::timeout(uint32_t t) {
void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) {
checkAttached();
- getState()->receiver.setCommandPoint(SessionPoint(id, offset));
+ getState()->receiverSetCommandPoint(SessionPoint(id, offset));
if (!receiveReady) {
receiveReady = true;
readyToReceive();
@@ -177,32 +175,37 @@ void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) {
void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) {
checkAttached();
- if (commands.empty() && getState()->hasState())
- throw IllegalStateException(
+ if (getState()->hasState()) { // Replay
+ if (commands.empty()) throw IllegalStateException(
QPID_MSG(getState()->getId() << ": has state but client is attaching as new session."));
- getState()->sender.expected(commands.empty() ? SequenceNumber(0) : commands.front());
- if (!sendReady) // send command point if not already sent
- sendCommandPoint();
+ // TODO aconway 2008-05-12: support replay of partial commands.
+ // Here we always round down to the last command boundary.
+ SessionPoint expectedPoint = commands.empty() ? SequenceNumber(0) : SessionPoint(commands.front(),0);
+ SessionState::ReplayRange replay = getState()->senderExpected(expectedPoint);
+ sendCommandPoint(expectedPoint);
+ std::for_each(replay.begin(), replay.end(), out); // replay
+ }
+ else
+ sendCommandPoint(getState()->senderGetCommandPoint());
}
void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) {
checkAttached();
// Ignore non-contiguous confirmations.
- if (!commands.empty() && commands.front() >= getState()->sender.getReplayPoint()) {
- getState()->sender.confirmed(commands.rangesBegin()->last());
- }
+ if (!commands.empty() && commands.front() >= getState()->senderGetReplayPoint())
+ getState()->senderConfirmed(commands.rangesBegin()->last());
}
void SessionHandler::completed(const SequenceSet& commands, bool /*timelyReply*/) {
checkAttached();
- getState()->sender.completed(commands);
+ getState()->senderCompleted(commands);
if (!commands.empty())
peer.knownCompleted(commands); // Always send a timely reply
}
void SessionHandler::knownCompleted(const SequenceSet& commands) {
checkAttached();
- getState()->receiver.knownCompleted(commands);
+ getState()->receiverKnownCompleted(commands);
}
void SessionHandler::flush(bool expected, bool confirmed, bool completed) {
@@ -210,18 +213,18 @@ void SessionHandler::flush(bool expected, bool confirmed, bool completed) {
if (expected) {
SequenceSet expectSet;
if (getState()->hasState())
- expectSet.add(getState()->receiver.getExpected().command);
+ expectSet.add(getState()->receiverGetExpected().command);
peer.expected(expectSet, Array());
}
if (confirmed) {
SequenceSet confirmSet;
- if (!getState()->receiver.getUnknownComplete().empty())
- confirmSet.add(getState()->receiver.getUnknownComplete().front(),
- getState()->receiver.getReceived().command);
+ if (!getState()->receiverGetUnknownComplete().empty())
+ confirmSet.add(getState()->receiverGetUnknownComplete().front(),
+ getState()->receiverGetReceived().command);
peer.confirmed(confirmSet, Array());
}
if (completed)
- peer.completed(getState()->receiver.getUnknownComplete(), true);
+ peer.completed(getState()->receiverGetUnknownComplete(), true);
}
void SessionHandler::gap(const SequenceSet& /*commands*/) {
@@ -237,20 +240,20 @@ void SessionHandler::sendDetach()
void SessionHandler::sendCompletion() {
checkAttached();
- peer.completed(getState()->receiver.getUnknownComplete(), true);
+ peer.completed(getState()->receiverGetUnknownComplete(), true);
}
void SessionHandler::sendAttach(bool force) {
checkAttached();
+ QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId());
peer.attach(getState()->getId().getName(), force);
if (getState()->hasState())
peer.flush(true, true, true);
else
- sendCommandPoint();
+ sendCommandPoint(getState()->senderGetCommandPoint());
}
-void SessionHandler::sendCommandPoint() {
- SessionPoint point(getState()->sender.getCommandPoint());
+void SessionHandler::sendCommandPoint(const SessionPoint& point) {
peer.commandPoint(point.command, point.offset);
if (!sendReady) {
sendReady = true;
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.h b/cpp/src/qpid/amqp_0_10/SessionHandler.h
index 85577ebafc..ccbe597bfc 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.h
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.h
@@ -25,10 +25,10 @@
#include "qpid/framing/ChannelHandler.h"
#include "qpid/framing/AMQP_AllProxy.h"
#include "qpid/framing/AMQP_AllOperations.h"
+#include "qpid/SessionState.h"
namespace qpid {
-class SessionState;
namespace amqp_0_10 {
@@ -45,8 +45,7 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
public:
typedef framing::AMQP_AllProxy::Session Peer;
- SessionHandler();
- SessionHandler(framing::FrameHandler& out, uint16_t channel);
+ SessionHandler(framing::FrameHandler* out=0, uint16_t channel=0);
~SessionHandler();
void setChannel(uint16_t ch) { channel = ch; }
@@ -63,7 +62,6 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
void sendAttach(bool force);
void sendTimeout(uint32_t t);
void sendFlush();
- void sendCommandPoint();
/** True if the handler is ready to send and receive */
bool ready() const;
@@ -96,8 +94,8 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
// Notification of events
virtual void readyToSend() {}
virtual void readyToReceive() {}
- virtual void handleDetach();
+ virtual void handleDetach();
virtual void handleIn(framing::AMQFrame&);
virtual void handleOut(framing::AMQFrame&);
@@ -108,8 +106,9 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
Peer peer;
bool ignoring;
bool sendReady, receiveReady;
- // FIXME aconway 2008-05-07: move handler-related functions from SessionState.
+ private:
+ void sendCommandPoint(const SessionPoint&);
};
}} // namespace qpid::amqp_0_10
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 337992992f..f3e103dfaf 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -59,6 +59,7 @@ void Bridge::create(ConnectionState& c)
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
session->attach(name, false);
+ session->commandPoint(0,0);
if (args.i_src_is_local) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 0a45c46d30..4f7686aac4 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -53,6 +53,9 @@
#if HAVE_SASL
#include <sasl/sasl.h>
+static const bool AUTH_DEFAULT=true;
+#else
+static const bool AUTH_DEFAULT=false;
#endif
using qpid::sys::ProtocolFactory;
@@ -81,41 +84,25 @@ Broker::Options::Options(const std::string& name) :
stagingThreshold(5000000),
enableMgmt(1),
mgmtPubInterval(10),
-#if HAVE_SASL
- auth(true),
-#else
- auth(false),
-#endif
- realm("QPID"),
- ack(0)
+ auth(AUTH_DEFAULT),
+ replayFlushLimit(64),
+ replayHardLimit(0)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
addOptions()
- ("data-dir", optValue(dataDir,"DIR"),
- "Directory to contain persistent data generated by the broker")
- ("no-data-dir", optValue(noDataDir),
- "Don't use a data directory. No persistent configuration will be loaded or stored")
- ("port,p", optValue(port,"PORT"),
- "Tells the broker to listen on PORT")
- ("worker-threads", optValue(workerThreads, "N"),
- "Sets the broker thread pool size")
- ("max-connections", optValue(maxConnections, "N"),
- "Sets the maximum allowed connections")
- ("connection-backlog", optValue(connectionBacklog, "N"),
- "Sets the connection backlog limit for the server socket")
- ("staging-threshold", optValue(stagingThreshold, "N"),
- "Stages messages over N bytes to disk")
- ("mgmt-enable,m", optValue(enableMgmt,"yes|no"),
- "Enable Management")
- ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
- "Management Publish Interval")
- ("auth", optValue(auth, "yes|no"),
- "Enable authentication, if disabled all incoming connections will be trusted")
- ("realm", optValue(realm, "REALM"),
- "Use the given realm when performing authentication")
- ("ack", optValue(ack, "N"),
- "Send session.ack/solicit-ack at least every N frames. 0 disables voluntary ack/solitict-ack");
+ ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker")
+ ("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored")
+ ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
+ ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
+ ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
+ ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
+ ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk")
+ ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
+ ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
+ ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted")
+ ("replay-flush-limit", optValue(replayFlushLimit, "KB"), "Send flush request when the replay buffer reaches this limit. 0 means no limit.")
+ ("replay-hard-limit", optValue(replayHardLimit, "KB"), "Kill a session if its replay buffer exceeds this limit. 0 means no limit.");
}
const std::string empty;
@@ -132,7 +119,11 @@ Broker::Broker(const Broker::Options& conf) :
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
links(this),
factory(*this),
- sessionManager(conf.ack)
+ sessionManager(
+ qpid::SessionState::Configuration(
+ conf.replayFlushLimit*1024, // convert kb to bytes.
+ conf.replayHardLimit*1024),
+ *this)
{
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index a1eaf4f62f..7092a86181 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -83,7 +83,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
uint16_t mgmtPubInterval;
bool auth;
std::string realm;
- uint32_t ack;
+ size_t replayFlushLimit;
+ size_t replayHardLimit;
};
virtual ~Broker();
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index d156b4a914..463193a346 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -124,8 +124,8 @@ void Connection::idleIn(){}
void Connection::closed(){ // Physically closed, suspend open sessions.
try {
- for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
- ptr_map_ptr(i)->handleDetach();
+ while (!channels.empty())
+ ptr_map_ptr(channels.begin())->handleDetach();
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp
index d48b258ba2..a542211147 100644
--- a/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -89,13 +89,20 @@ void NullAuthenticator::getMechanisms(Array& mechanisms)
mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("ANONYMOUS")));
}
-void NullAuthenticator::start(const string& /*mechanism*/, const string& /*response*/)
+void NullAuthenticator::start(const string& mechanism, const string& response)
{
QPID_LOG(warning, "SASL: No Authentication Performed");
-
- // TODO: Figure out what should actually be set in this case
- connection.setUserId("anonymous");
-
+ if (mechanism == "PLAIN") { // Old behavior
+ if (response.size() > 0 && response[0] == (char) 0) {
+ string temp = response.substr(1);
+ string::size_type i = temp.find((char)0);
+ string uid = temp.substr(0, i);
+ string pwd = temp.substr(i + 1);
+ connection.setUserId(uid);
+ }
+ } else {
+ connection.setUserId("anonymous");
+ }
client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, 0);
}
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index e284451d14..2d0edde27b 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -410,7 +410,7 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm
framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
{
- //TODO: change this when SequenceNumberSet is deleted along with preview code
+ // FIXME aconway 2008-05-12: create SequenceSet directly, no need for intermediate results vector.
SequenceNumberSet results;
RangedOperation f = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));
transfers.for_each(f);
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 2ec0988fc0..2f09c6b5ac 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -21,12 +21,6 @@
#include "SessionHandler.h"
#include "SessionState.h"
#include "Connection.h"
-#include "ConnectionState.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/framing/constants.h"
-#include "qpid/framing/ClientInvoker.h"
-#include "qpid/framing/ServerInvoker.h"
-#include "qpid/framing/all_method_bodies.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -38,11 +32,9 @@ using namespace std;
using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
- : InOutHandler(0, &out),
- connection(c), channel(ch, &c.getOutput()),
- proxy(out), // Via my own handleOut() for L2 data.
- peerSession(channel), // Direct to channel for L2 commands.
- ignoring(false)
+ : amqp_0_10::SessionHandler(&c.getOutput(), ch),
+ connection(c),
+ proxy(out)
{}
SessionHandler::~SessionHandler() {}
@@ -52,191 +44,52 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
} // namespace
-void SessionHandler::handleIn(AMQFrame& f) {
- // Note on channel states: a channel is attached if session != 0
- AMQMethodBody* m = f.getBody()->getMethod();
- try {
- if (ignoring && !(m && m->isA<SessionDetachedBody>())) {
- return;
- }
- if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
- //frame was a valid session control and has been handled
- return;
- } else if (session.get()) {
- //we are attached and frame was not a session control so it is for upper layers
- session->handle(f);
- } else if (m && m->isA<SessionDetachedBody>()) {
- handleDetach();
- connection.closeChannel(channel.get());
- } else {
- throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached"));
- }
- }catch(const ChannelException& e){
- QPID_LOG(error, "Session detached due to: " << e.what());
- peerSession.detached(name, e.code);
+void SessionHandler::channelException(uint16_t, const std::string&) {
handleDetach();
- connection.closeChannel(channel.get());
- }catch(const ConnectionException& e){
- connection.close(e.code, e.what(), classId(m), methodId(m));
- }catch(const std::exception& e){
- connection.close(501, e.what(), classId(m), methodId(m));
- }
}
-bool SessionHandler::isValid(AMQMethodBody* m) {
- return session.get() || m->isA<SessionAttachBody>() || m->isA<SessionAttachedBody>();
-}
-
-void SessionHandler::handleOut(AMQFrame& f) {
- channel.handle(f); // Send it.
- if (session->sent(f))
- peerSession.flush(false, false, true);
-}
-
-void SessionHandler::assertAttached(const char* method) const {
- if (!session.get()) {
- std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl;
- throw NotAttachedException(
- QPID_MSG(method << " failed: No session for channel "
- << getChannel()));
- }
-}
-
-void SessionHandler::assertClosed(const char* method) const {
- if (session.get())
- throw SessionBusyException(
- QPID_MSG(method << " failed: channel " << channel.get()
- << " is already open."));
+void SessionHandler::connectionException(uint16_t code, const std::string& msg) {
+ connection.close(code, msg, 0, 0);
}
ConnectionState& SessionHandler::getConnection() { return connection; }
-const ConnectionState& SessionHandler::getConnection() const { return connection; }
-
-//new methods:
-void SessionHandler::attach(const std::string& _name, bool /*force*/)
-{
- name = _name;//TODO: this should be used in conjunction with
- //userid for connection as sessions identity
- //TODO: need to revise session manager to support resume as well
- assertClosed("attach");
- session.reset(new SessionState(0, this, 0, 0, name));
- peerSession.attached(name);
- peerSession.commandPoint(session->nextOut, 0);
-}
-
-void SessionHandler::attached(const std::string& _name)
-{
- name = _name;//TODO: this should be used in conjunction with
- //userid for connection as sessions identity
- session.reset(new SessionState(0, this, 0, 0, name));
- peerSession.commandPoint(session->nextOut, 0);
-}
+const ConnectionState& SessionHandler::getConnection() const { return connection; }
-void SessionHandler::detach(const std::string& name)
-{
- assertAttached("detach");
- peerSession.detached(name, session::NORMAL);
- handleDetach();
+void SessionHandler::handleDetach() {
+ amqp_0_10::SessionHandler::handleDetach();
assert(&connection.getChannel(channel.get()) == this);
+ if (session.get())
+ connection.getBroker().getSessionManager().detach(session);
+ assert(!session.get());
connection.closeChannel(channel.get());
}
-void SessionHandler::detached(const std::string& name, uint8_t code)
-{
- ignoring = false;
- handleDetach();
- if (code) {
- //no error
- } else {
- //error occured
- QPID_LOG(warning, "Received session.closed: "<< name << " " << code);
- }
- connection.closeChannel(channel.get());
-}
-
-void SessionHandler::handleDetach()
-{
- if (session.get()) {
- session->detach();
- session.reset();
- }
-}
-
-void SessionHandler::requestDetach()
-{
- //TODO: request timeout when python can handle it
- //peerSession.requestTimeout(0);
- ignoring = true;
- peerSession.detach(name);
-}
-
-void SessionHandler::requestTimeout(uint32_t t)
-{
- session->setTimeout(t);
- peerSession.timeout(t);
-}
-
-void SessionHandler::timeout(uint32_t t)
-{
- session->setTimeout(t);
-}
-
-void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
-{
- if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point");
-
- session->nextIn = id;
-}
-
-void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments)
-{
- if (!commands.empty() || fragments.size()) {
- throw NotImplementedException("Session resumption not yet supported");
- }
+void SessionHandler::setState(const std::string& name, bool force) {
+ assert(!session.get());
+ SessionId id(connection.getUserId(), name);
+ session = connection.broker.getSessionManager().attach(*this, id, force);
}
-void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/)
-{
- //don't really care too much about this yet
-}
+FrameHandler* SessionHandler::getInHandler() { return session.get(); }
+qpid::SessionState* SessionHandler::getState() { return session.get(); }
-void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply)
-{
- session->complete(commands);
- if (timelyReply) {
- peerSession.knownCompleted(session->knownCompleted);
- session->knownCompleted.clear();
- }
+void SessionHandler::readyToSend() {
+ if (session.get()) session->readyToSend();
}
-void SessionHandler::knownCompleted(const framing::SequenceSet& commands)
-{
- session->completed.remove(commands);
-}
-
-void SessionHandler::flush(bool expected, bool confirmed, bool completed)
-{
- if (expected) {
- peerSession.expected(SequenceSet(session->nextIn), Array());
- }
- if (confirmed) {
- peerSession.confirmed(session->completed, Array());
- }
- if (completed) {
- peerSession.completed(session->completed, true);
- }
-}
-
-
-void SessionHandler::sendCompletion()
-{
- peerSession.completed(session->completed, true);
+// TODO aconway 2008-05-12: hacky - handle attached for bridge clients.
+// We need to integrate the client code so we can run a real client
+// in the bridge.
+//
+void SessionHandler::attached(const std::string& name) {
+ if (session.get())
+ checkName(name);
+ else {
+ SessionId id(connection.getUserId(), name);
+ SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
+ session.reset(new SessionState(connection.getBroker(), *this, id, config));
}
-
-void SessionHandler::gap(const framing::SequenceSet& /*commands*/)
-{
- throw NotImplementedException("gap not yet supported");
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 47c534441a..1aa3137fdf 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -22,19 +22,12 @@
*
*/
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/AMQP_ClientOperations.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/amqp_0_10/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/Array.h"
-#include "qpid/framing/ChannelHandler.h"
-#include "qpid/framing/SequenceNumber.h"
-
-
-#include <boost/noncopyable.hpp>
namespace qpid {
+class SessionState;
+
namespace broker {
class Connection;
@@ -46,65 +39,38 @@ class SessionState;
* receives incoming frames, handles session controls and manages the
* association between the channel and a session.
*/
-class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
- public framing::FrameHandler::InOutHandler,
- private boost::noncopyable
-{
+class SessionHandler : public amqp_0_10::SessionHandler {
public:
SessionHandler(Connection&, framing::ChannelId);
~SessionHandler();
- /** Returns 0 if not attached to a session */
+ /** Get broker::SessionState */
SessionState* getSession() { return session.get(); }
const SessionState* getSession() const { return session.get(); }
- framing::ChannelId getChannel() const { return channel.get(); }
-
ConnectionState& getConnection();
const ConnectionState& getConnection() const;
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
- void requestDetach();
- void handleDetach();
- void sendCompletion();
-
- protected:
- void handleIn(framing::AMQFrame&);
- void handleOut(framing::AMQFrame&);
+ virtual void handleDetach();
- private:
- //new methods:
- void attach(const std::string& name, bool force);
+ // Overrides
void attached(const std::string& name);
- void detach(const std::string& name);
- void detached(const std::string& name, uint8_t code);
-
- void requestTimeout(uint32_t t);
- void timeout(uint32_t t);
-
- void commandPoint(const framing::SequenceNumber& id, uint64_t offset);
- void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
- void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments);
- void completed(const framing::SequenceSet& commands, bool timelyReply);
- void knownCompleted(const framing::SequenceSet& commands);
- void flush(bool expected, bool confirmed, bool completed);
- void gap(const framing::SequenceSet& commands);
- void assertAttached(const char* method) const;
- void assertActive(const char* method) const;
- void assertClosed(const char* method) const;
-
- bool isValid(framing::AMQMethodBody*);
+ protected:
+ virtual void setState(const std::string& sessionName, bool force);
+ virtual qpid::SessionState* getState();
+ virtual framing::FrameHandler* getInHandler();
+ virtual void channelException(uint16_t code, const std::string& msg);
+ virtual void connectionException(uint16_t code, const std::string& msg);
+ virtual void readyToSend();
+ private:
Connection& connection;
- framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
- framing::AMQP_ClientProxy::Session peerSession;
- bool ignoring;
std::auto_ptr<SessionState> session;
- std::string name;//TODO: this should be part of the session state and replace the id
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp
index d7bae737fc..789e43b902 100644
--- a/cpp/src/qpid/broker/SessionManager.cpp
+++ b/cpp/src/qpid/broker/SessionManager.cpp
@@ -40,68 +40,58 @@ using boost::intrusive_ptr;
using namespace sys;
using namespace framing;
-SessionManager::SessionManager(uint32_t a) : ack(a) {}
+SessionManager::SessionManager(const SessionState::Configuration& c, Broker& b)
+ : config(c), broker(b) {}
-SessionManager::~SessionManager() {}
+SessionManager::~SessionManager() {
+ detached.clear(); // Must clear before destructor as session dtor will call forget()
+}
-// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
-std::auto_ptr<SessionState> SessionManager::open(
- SessionHandler& h, uint32_t timeout_, std::string _name)
-{
+std::auto_ptr<SessionState> SessionManager::attach(SessionHandler& h, const SessionId& id, bool/*force*/) {
Mutex::ScopedLock l(lock);
- std::auto_ptr<SessionState> session(
- new SessionState(this, &h, timeout_, ack, _name));
- active.insert(session->getId());
+ eraseExpired(); // Clean up expired table
+ std::pair<Attached::iterator, bool> insert = attached.insert(id);
+ if (!insert.second)
+ throw SessionBusyException(QPID_MSG("Session already attached: " << id));
+ Detached::iterator i = std::find(detached.begin(), detached.end(), id);
+ std::auto_ptr<SessionState> state;
+ if (i == detached.end()) {
+ state.reset(new SessionState(broker, h, id, config));
for_each(observers.begin(), observers.end(),
- boost::bind(&Observer::opened, _1,boost::ref(*session)));
- return session;
+ boost::bind(&Observer::opened, _1,boost::ref(*state)));
+ }
+ else {
+ state.reset(detached.release(i).release());
+ state->attach(h);
+ }
+ return state;
+ // FIXME aconway 2008-04-29: implement force
}
-void SessionManager::suspend(std::auto_ptr<SessionState> session) {
+void SessionManager::detach(std::auto_ptr<SessionState> session) {
Mutex::ScopedLock l(lock);
- active.erase(session->getId());
- session->suspend();
+ attached.erase(session->getId());
+ session->detach();
+ if (session->getTimeout() > 0) {
session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
if (session->mgmtObject.get() != 0)
session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry));
- suspended.push_back(session.release()); // In expiry order
+ detached.push_back(session.release()); // In expiry order
eraseExpired();
}
-
-std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id)
-{
- Mutex::ScopedLock l(lock);
- eraseExpired();
- if (active.find(id) != active.end())
- throw SessionBusyException(
- QPID_MSG("Session already active: " << id));
- Suspended::iterator i = std::find_if(
- suspended.begin(), suspended.end(),
- boost::bind(std::equal_to<Uuid>(), id, boost::bind(&SessionState::getId, _1))
- );
- if (i == suspended.end())
- throw InvalidArgumentException(
- QPID_MSG("No suspended session with id=" << id));
- active.insert(id);
- std::auto_ptr<SessionState> state(suspended.release(i).release());
- return state;
}
-void SessionManager::erase(const framing::Uuid& id)
-{
- Mutex::ScopedLock l(lock);
- active.erase(id);
-}
+void SessionManager::forget(const SessionId& id) { attached.erase(id); }
void SessionManager::eraseExpired() {
// Called with lock held.
- if (!suspended.empty()) {
- Suspended::iterator keep = std::lower_bound(
- suspended.begin(), suspended.end(), now(),
+ if (!detached.empty()) {
+ Detached::iterator keep = std::lower_bound(
+ detached.begin(), detached.end(), now(),
boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2));
- if (suspended.begin() != keep) {
- QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
- suspended.erase(suspended.begin(), keep);
+ if (detached.begin() != keep) {
+ QPID_LOG(debug, "Expiring sessions: " << log::formatList(detached.begin(), keep));
+ detached.erase(detached.begin(), keep);
}
}
}
diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h
index ad064c69bb..9a4142f613 100644
--- a/cpp/src/qpid/broker/SessionManager.h
+++ b/cpp/src/qpid/broker/SessionManager.h
@@ -22,7 +22,7 @@
*
*/
-#include <qpid/framing/Uuid.h>
+#include <qpid/SessionState.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/Mutex.h>
#include <qpid/RefCounted.h>
@@ -37,7 +37,7 @@
namespace qpid {
namespace broker {
-
+class Broker;
class SessionState;
class SessionHandler;
@@ -50,44 +50,43 @@ class SessionManager : private boost::noncopyable {
* Observer notified of SessionManager events.
*/
struct Observer : public RefCounted {
+ /** Called when a stateless session is attached. */
virtual void opened(SessionState&) {}
};
- SessionManager(uint32_t ack);
+ SessionManager(const qpid::SessionState::Configuration&, Broker&);
~SessionManager();
/** Open a new active session, caller takes ownership */
- std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_, std::string name);
+ std::auto_ptr<SessionState> attach(SessionHandler& h, const SessionId& id, bool/*force*/);
- /** Suspend a session, start it's timeout counter.
- * The factory takes ownership.
- */
- void suspend(std::auto_ptr<SessionState> session);
+ /** Return a detached session to the manager, start the timeout counter. */
+ void detach(std::auto_ptr<SessionState>);
- /** Resume a suspended session.
- *@throw Exception if timed out or non-existant.
- */
- std::auto_ptr<SessionState> resume(const framing::Uuid&);
+ /** Forget about an attached session. Called by SessionState destructor. */
+ void forget(const SessionId&);
/** Add an Observer. */
void add(const boost::intrusive_ptr<Observer>&);
+ Broker& getBroker() const { return broker; }
+
+ const qpid::SessionState::Configuration& getSessionConfig() const { return config; }
+
private:
- typedef boost::ptr_vector<SessionState> Suspended;
- typedef std::set<framing::Uuid> Active;
+ typedef boost::ptr_vector<SessionState> Detached; // Sorted in expiry order.
+ typedef std::set<SessionId> Attached;
typedef std::vector<boost::intrusive_ptr<Observer> > Observers;
- void erase(const framing::Uuid&);
void eraseExpired();
sys::Mutex lock;
- Suspended suspended;
- Active active;
- uint32_t ack;
+ Detached detached;
+ Attached attached;
+ qpid::SessionState::Configuration config;
Observers observers;
-
- friend class SessionState; // removes deleted sessions from active set.
+ Broker& broker;
};
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 2ef1ed2de4..c851162046 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -32,6 +32,7 @@
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
namespace qpid {
namespace broker {
@@ -45,59 +46,46 @@ using qpid::management::Manageable;
using qpid::management::Args;
SessionState::SessionState(
- SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack, string& _name)
- : framing::SessionState(ack, timeout_ > 0), nextOut(0),
- factory(f), handler(h), id(true), timeout(timeout_),
- broker(h->getConnection().broker),
- version(h->getConnection().getVersion()),
- ignoring(false), name(_name),
+ Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)
+ : qpid::SessionState(id, config),
+ broker(b), handler(&h),
+ ignoring(false),
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)),
enqueuedOp(boost::bind(&SessionState::enqueued, this, _1))
{
- getConnection().outputTasks.addOutputTask(&semanticState);
-
Manageable* parent = broker.GetVhostObject ();
-
- if (parent != 0)
- {
+ if (parent != 0) {
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
-
- if (agent.get () != 0)
- {
+ if (agent.get () != 0) {
mgmtObject = management::Session::shared_ptr
- (new management::Session (this, parent, name));
- mgmtObject->set_attached (1);
- mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
- mgmtObject->set_channelId (h->getChannel());
- mgmtObject->set_detachedLifespan (getTimeout());
+ (new management::Session (this, parent, getId().getName()));
+ mgmtObject->set_attached (0);
agent->addObject (mgmtObject);
}
}
+ attach(h);
}
SessionState::~SessionState() {
// Remove ID from active session list.
- if (factory)
- factory->erase(getId());
+ // FIXME aconway 2008-05-12: Need to distinguish outgoing sessions established by bridge,
+ // they don't belong in the manager. For now rely on uniqueness of UUIDs.
+ //
+ broker.getSessionManager().forget(getId());
if (mgmtObject.get () != 0)
mgmtObject->resourceDestroy ();
}
-SessionHandler* SessionState::getHandler() {
- return handler;
-}
-
AMQP_ClientProxy& SessionState::getProxy() {
assert(isAttached());
- return getHandler()->getProxy();
+ return handler->getProxy();
}
ConnectionState& SessionState::getConnection() {
assert(isAttached());
- return getHandler()->getConnection();
+ return handler->getConnection();
}
bool SessionState::isLocal(const ConnectionToken* t) const
@@ -106,18 +94,19 @@ bool SessionState::isLocal(const ConnectionToken* t) const
}
void SessionState::detach() {
- getConnection().outputTasks.removeOutputTask(&semanticState);
+ // activateOutput can be called in a different thread, lock to protect attached status
Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, getId() << ": detached on broker.");
+ getConnection().outputTasks.removeOutputTask(&semanticState);
handler = 0;
if (mgmtObject.get() != 0)
- {
mgmtObject->set_attached (0);
}
-}
void SessionState::attach(SessionHandler& h) {
- {
+ // activateOutput can be called in a different thread, lock to protect attached status
Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, getId() << ": attached on broker.");
handler = &h;
if (mgmtObject.get() != 0)
{
@@ -126,16 +115,13 @@ void SessionState::attach(SessionHandler& h) {
mgmtObject->set_channelId (h.getChannel());
}
}
- h.getConnection().outputTasks.addOutputTask(&semanticState);
-}
-void SessionState::activateOutput()
-{
+void SessionState::activateOutput() {
+ // activateOutput can be called in a different thread, lock to protect attached status
Mutex::ScopedLock l(lock);
- if (isAttached()) {
+ if (isAttached())
getConnection().outputTasks.activateOutput();
}
-}
//This class could be used as the callback for queue notifications
//if not attached, it can simply ignore the callback, else pass it
//on to the connection
@@ -155,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
case management::Session::METHOD_DETACH :
if (handler != 0)
{
- handler->requestDetach();
+ handler->sendDetach();
}
status = Manageable::STATUS_OK;
break;
@@ -179,35 +165,25 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
return status;
}
-void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id)
-{
- id = nextIn++;
+void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
Invoker::Result invocation = invoke(adapter, *method);
- completed.add(id);
-
+ receiverCompleted(id);
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
- nextOut++;//execution result is now a command, so the counter must be incremented
getProxy().getExecution().result(id, invocation.getResult());
}
if (method->isSync()) {
incomplete.process(enqueuedOp, true);
sendCompletion();
}
- //TODO: if window gets too large send unsolicited completion
}
-void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
+void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
{
- intrusive_ptr<Message> msg(msgBuilder.getMessage());
- if (frame.getBof() && frame.getBos()) {//start of frameset
- id = nextIn++;
+ if (frame.getBof() && frame.getBos()) //start of frameset
msgBuilder.start(id);
- msg = msgBuilder.getMessage();
- } else {
- id = msg->getCommandId();
- }
+ intrusive_ptr<Message> msg(msgBuilder.getMessage());
msgBuilder.handle(frame);
if (frame.getEof() && frame.getEos()) {//end of frameset
if (frame.getBof()) {
@@ -240,19 +216,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
{
- completed.add(msg->getCommandId());
- if (msg->requiresAccept()) {
- nextOut++;//accept is a command, so the counter must be incremented
+ receiverCompleted(msg->getCommandId());
+ if (msg->requiresAccept())
getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));
}
-}
void SessionState::handle(AMQFrame& frame)
{
- if (ignoring) return;
- received(frame);
-
- SequenceNumber commandId;
+ SequenceNumber commandId = receiverGetCurrent();
try {
//TODO: make command handling more uniform, regardless of whether
//commands carry content.
@@ -277,29 +248,36 @@ void SessionState::handle(AMQFrame& frame)
} else {
getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
}
- timeout = 0;
ignoring = true;
- handler->requestDetach();
+ handler->sendDetach();
}
}
DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
uint32_t maxFrameSize = getConnection().getFrameMax();
- MessageDelivery::deliver(msg, getProxy().getHandler(), nextOut, token, maxFrameSize);
- return nextOut++;
+ assert(senderGetCommandPoint().offset == 0);
+ SequenceNumber commandId = senderGetCommandPoint().command;
+ MessageDelivery::deliver(msg, getProxy().getHandler(), commandId, token, maxFrameSize);
+ assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
+ return commandId;
}
-void SessionState::sendCompletion()
-{
- handler->sendCompletion();
+void SessionState::sendCompletion() { handler->sendCompletion(); }
+
+void SessionState::senderCompleted(const SequenceSet& commands) {
+ qpid::SessionState::senderCompleted(commands);
+ commands.for_each(boost::bind(&SemanticState::completed, &semanticState, _1, _2));
}
-void SessionState::complete(const SequenceSet& commands)
-{
- knownCompleted.add(commands);
- commands.for_each(ackOp);
+void SessionState::readyToSend() {
+ QPID_LOG(debug, getId() << ": ready to send, activating output.");
+ assert(handler);
+ sys::AggregateOutput& tasks = handler->getConnection().outputTasks;
+ tasks.addOutputTask(&semanticState);
+ tasks.activateOutput();
}
+Broker& SessionState::getBroker() { return broker; }
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index ae860e84c9..7b70789161 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -22,11 +22,9 @@
*
*/
-#include "qpid/framing/Uuid.h"
+#include "qpid/SessionState.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/SessionState.h"
#include "qpid/framing/SequenceSet.h"
-#include "qpid/framing/ProtocolVersion.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
@@ -63,21 +61,20 @@ class SessionManager;
* Broker-side session state includes sessions handler chains, which may
* themselves have state.
*/
-class SessionState : public framing::SessionState,
+class SessionState : public qpid::SessionState,
public SessionContext,
public DeliveryAdapter,
- public management::Manageable
+ public management::Manageable,
+ public framing::FrameHandler
{
public:
+ SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&);
~SessionState();
bool isAttached() const { return handler; }
void detach();
void attach(SessionHandler& handler);
-
- SessionHandler* getHandler();
-
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
@@ -85,18 +82,15 @@ class SessionState : public framing::SessionState,
ConnectionState& getConnection();
bool isLocal(const ConnectionToken* t) const;
- uint32_t getTimeout() const { return timeout; }
- void setTimeout(uint32_t t) { timeout = t; }
-
- Broker& getBroker() { return broker; }
- framing::ProtocolVersion getVersion() const { return version; }
+ Broker& getBroker();
/** OutputControl **/
void activateOutput();
void handle(framing::AMQFrame& frame);
- void complete(const framing::SequenceSet& ranges);
+ void senderCompleted(const framing::SequenceSet& ranges);
+
void sendCompletion();
//delivery adapter methods:
@@ -107,29 +101,13 @@ class SessionState : public framing::SessionState,
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
- // Normally SessionManager creates sessions.
- SessionState(SessionManager*,
- SessionHandler* out,
- uint32_t timeout,
- uint32_t ackInterval,
- std::string& name);
-
-
- framing::SequenceSet completed;
- framing::SequenceSet knownCompleted;
- framing::SequenceNumber nextIn;
- framing::SequenceNumber nextOut;
+ void readyToSend();
private:
- typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
- SessionManager* factory;
+ Broker& broker;
SessionHandler* handler;
- framing::Uuid id;
- uint32_t timeout;
sys::AbsTime expiry; // Used by SessionManager.
- Broker& broker;
- framing::ProtocolVersion version;
sys::Mutex lock;
bool ignoring;
std::string name;
@@ -139,12 +117,11 @@ class SessionState : public framing::SessionState,
MessageBuilder msgBuilder;
IncompleteMessageList incomplete;
- RangedOperation ackOp;
IncompleteMessageList::CompletionListener enqueuedOp;
management::Session::shared_ptr mgmtObject;
- void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id);
- void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id);
+ void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
+ void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
void enqueued(boost::intrusive_ptr<Message> msg);
friend class SessionManager;
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index bca6c49c13..59353d7637 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -71,9 +71,9 @@ struct ClusterDeliverHandler : public FrameHandler {
void handle(AMQFrame& f) {
next->handle(f);
- Mutex::ScopedLock l(sender.lock);
- sender.busy=false;
- sender.lock.notify();
+ Mutex::ScopedLock l(senderLock);
+ senderBusy=false;
+ senderLock.notify();
}
};
diff --git a/cpp/src/qpid/framing/SequenceNumber.cpp b/cpp/src/qpid/framing/SequenceNumber.cpp
index cba00c860a..9e682d89e4 100644
--- a/cpp/src/qpid/framing/SequenceNumber.cpp
+++ b/cpp/src/qpid/framing/SequenceNumber.cpp
@@ -21,6 +21,7 @@
#include "SequenceNumber.h"
#include "Buffer.h"
+#include <ostream>
using qpid::framing::SequenceNumber;
using qpid::framing::Buffer;
@@ -102,4 +103,8 @@ int32_t operator-(const SequenceNumber& a, const SequenceNumber& b)
return result;
}
+std::ostream& operator<<(std::ostream& o, const SequenceNumber& n) {
+ return o << n.getValue();
+}
+
}}
diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h
index d659bec5c1..aacd77501b 100644
--- a/cpp/src/qpid/framing/SequenceNumber.h
+++ b/cpp/src/qpid/framing/SequenceNumber.h
@@ -22,6 +22,7 @@
#define _framing_SequenceNumber_h
#include "amqp_types.h"
+#include <iosfwd>
namespace qpid {
namespace framing {
@@ -66,6 +67,8 @@ struct Window
SequenceNumber lwm;
};
+std::ostream& operator<<(std::ostream& o, const SequenceNumber& n);
+
}} // namespace qpid::framing
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index dfda9ecae1..aeff35dbf0 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -150,7 +150,7 @@ QPID_AUTO_TEST_CASE(testDispatcherThread)
ClientSessionFixture fix;
fix.session =fix.connection.newSession(ASYNC);
fix.declareSubscribe();
- size_t count = 1000;
+ size_t count = 10;
DummyListener listener(fix.session, "my-dest", count);
sys::Thread t(listener);
for (size_t i = 0; i < count; ++i) {
@@ -205,7 +205,7 @@ QPID_AUTO_TEST_CASE(testSendToSelf) {
sys::Thread runner(fix.subs);//start dispatcher thread
string data("msg");
Message msg(data, "myq");
- const uint count=1000;
+ const uint count=10;
for (uint i = 0; i < count; ++i) {
fix.session.messageTransfer(content=msg);
}
diff --git a/cpp/src/tests/SessionState.cpp b/cpp/src/tests/SessionState.cpp
index 71b90ea9f1..4beef87cfe 100644
--- a/cpp/src/tests/SessionState.cpp
+++ b/cpp/src/tests/SessionState.cpp
@@ -58,7 +58,7 @@ string str(const AMQFrame& f) {
return "H"; // Must be a header.
}
// Make a string from a range of frames.
-string str(const vector<AMQFrame>& frames) {
+string str(const boost::iterator_range<vector<AMQFrame>::const_iterator>& frames) {
string (*strFrame)(const AMQFrame&) = str;
return applyAccumulate(frames.begin(), frames.end(), string(), ptr_fun(strFrame));
}
@@ -84,7 +84,7 @@ AMQFrame contentFrameChar(char content, bool isLast=true) {
}
// Send frame & return size of frame.
-size_t send(qpid::SessionState& s, const AMQFrame& f) { s.sender.record(f); return f.size(); }
+size_t send(qpid::SessionState& s, const AMQFrame& f) { s.senderRecord(f); return f.size(); }
// Send transfer command with no content.
size_t transfer0(qpid::SessionState& s) { return send(s, transferFrame(false)); }
// Send transfer frame with single content frame.
@@ -126,136 +126,132 @@ using qpid::SessionPoint;
QPID_AUTO_TEST_CASE(testSendGetReplyList) {
qpid::SessionState s;
- s.sender.getCommandPoint();
+ s.senderGetCommandPoint();
transfer1(s, "abc");
transfers(s, "def");
transferN(s, "xyz");
- BOOST_CHECK_EQUAL(str(s.sender.getReplayList()),"CabcCdCeCfCxyz");
+ BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(0,0))),"CabcCdCeCfCxyz");
// Ignore controls.
- s.sender.record(AMQFrame(in_place<SessionFlushBody>()));
- BOOST_CHECK_EQUAL(str(s.sender.getReplayList()),"CabcCdCeCfCxyz");
+ s.senderRecord(AMQFrame(in_place<SessionFlushBody>()));
+ BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(2,0))),"CeCfCxyz");
}
QPID_AUTO_TEST_CASE(testNeedFlush) {
qpid::SessionState::Configuration c;
// sync after 2 1-byte transfers or equivalent bytes.
- c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize());
+ c.replayFlushLimit = 2*(transferFrameSize()+contentFrameSize());
qpid::SessionState s(SessionId(), c);
- s.sender.getCommandPoint();
+ s.senderGetCommandPoint();
transfers(s, "a");
- BOOST_CHECK(!s.sender.needFlush());
+ BOOST_CHECK(!s.senderNeedFlush());
transfers(s, "b");
- BOOST_CHECK(s.sender.needFlush());
- s.sender.recordFlush();
- BOOST_CHECK(!s.sender.needFlush());
+ BOOST_CHECK(s.senderNeedFlush());
+ s.senderRecordFlush();
+ BOOST_CHECK(!s.senderNeedFlush());
transfers(s, "c");
- BOOST_CHECK(!s.sender.needFlush());
+ BOOST_CHECK(!s.senderNeedFlush());
transfers(s, "d");
- BOOST_CHECK(s.sender.needFlush());
- BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCcCd");
+ BOOST_CHECK(s.senderNeedFlush());
+ BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint())), "CaCbCcCd");
}
QPID_AUTO_TEST_CASE(testPeerConfirmed) {
qpid::SessionState::Configuration c;
// sync after 2 1-byte transfers or equivalent bytes.
- c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize());
+ c.replayFlushLimit = 2*(transferFrameSize()+contentFrameSize());
qpid::SessionState s(SessionId(), c);
- s.sender.getCommandPoint();
+ s.senderGetCommandPoint();
transfers(s, "ab");
- BOOST_CHECK(s.sender.needFlush());
+ BOOST_CHECK(s.senderNeedFlush());
transfers(s, "cd");
- BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCcCd");
- s.sender.confirmed(SessionPoint(3));
- BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cd");
- BOOST_CHECK(!s.sender.needFlush());
-
- // Never go backwards.
- s.sender.confirmed(SessionPoint(2));
- s.sender.confirmed(SessionPoint(3));
+ BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(0,0))), "CaCbCcCd");
+ s.senderConfirmed(SessionPoint(3));
+ BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(3,0))), "Cd");
+ BOOST_CHECK(!s.senderNeedFlush());
// Multi-frame transfer.
transfer1(s, "efg");
transfers(s, "xy");
- BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CdCefgCxCy");
- BOOST_CHECK(s.sender.needFlush());
+ BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(3,0))), "CdCefgCxCy");
+ BOOST_CHECK(s.senderNeedFlush());
- s.sender.confirmed(SessionPoint(4));
- BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CefgCxCy");
- BOOST_CHECK(s.sender.needFlush());
+ s.senderConfirmed(SessionPoint(4));
+ BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(4,0))), "CefgCxCy");
+ BOOST_CHECK(s.senderNeedFlush());
- s.sender.confirmed(SessionPoint(5));
- BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CxCy");
- BOOST_CHECK(s.sender.needFlush());
+ s.senderConfirmed(SessionPoint(5));
+ BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(5,0))), "CxCy");
+ BOOST_CHECK(s.senderNeedFlush());
- s.sender.confirmed(SessionPoint(6));
- BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cy");
- BOOST_CHECK(!s.sender.needFlush());
+ s.senderConfirmed(SessionPoint(6));
+ BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(6,0))), "Cy");
+ BOOST_CHECK(!s.senderNeedFlush());
}
QPID_AUTO_TEST_CASE(testPeerCompleted) {
qpid::SessionState s;
- s.sender.getCommandPoint();
+ s.senderGetCommandPoint();
// Completion implies confirmation
transfers(s, "abc");
- BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCc");
+ BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(0,0))), "CaCbCc");
SequenceSet set(SequenceSet() + 0 + 1);
- s.sender.completed(set);
- BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cc");
+ s.senderCompleted(set);
+ BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(2,0))), "Cc");
transfers(s, "def");
// We dont do out-of-order confirmation, so this will only confirm up to 3:
set = SequenceSet(SequenceSet() + 2 + 3 + 5);
- s.sender.completed(set);
- BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CeCf");
+ s.senderCompleted(set);
+ BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(4,0))), "CeCf");
}
QPID_AUTO_TEST_CASE(testReceive) {
// Advance expected/received correctly
qpid::SessionState s;
- s.receiver.setCommandPoint(SessionPoint());
- BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(0));
- BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(0));
+ s.receiverSetCommandPoint(SessionPoint());
+ BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(0));
+ BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(0));
- BOOST_CHECK(s.receiver.record(transferFrame(false)));
- BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(1));
- BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(1));
+ BOOST_CHECK(s.receiverRecord(transferFrame(false)));
+ BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(1));
+ BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(1));
- BOOST_CHECK(s.receiver.record(transferFrame(true)));
+ BOOST_CHECK(s.receiverRecord(transferFrame(true)));
SessionPoint point = SessionPoint(1, transferFrameSize());
- BOOST_CHECK_EQUAL(s.receiver.getExpected(), point);
- BOOST_CHECK_EQUAL(s.receiver.getReceived(), point);
- BOOST_CHECK(s.receiver.record(contentFrame("", false)));
+ BOOST_CHECK_EQUAL(s.receiverGetExpected(), point);
+ BOOST_CHECK_EQUAL(s.receiverGetReceived(), point);
+ BOOST_CHECK(s.receiverRecord(contentFrame("", false)));
point.offset += contentFrameSize(0);
- BOOST_CHECK_EQUAL(s.receiver.getExpected(), point);
- BOOST_CHECK_EQUAL(s.receiver.getReceived(), point);
- BOOST_CHECK(s.receiver.record(contentFrame("", true)));
- BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(2));
- BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(2));
+ BOOST_CHECK_EQUAL(s.receiverGetExpected(), point);
+ BOOST_CHECK_EQUAL(s.receiverGetReceived(), point);
+ BOOST_CHECK(s.receiverRecord(contentFrame("", true)));
+ BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(2));
+ BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(2));
// Idempotence barrier, rewind expected & receive some duplicates.
- s.receiver.setCommandPoint(SessionPoint(1));
- BOOST_CHECK(!s.receiver.record(transferFrame(false)));
- BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(2));
- BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(2));
- BOOST_CHECK(s.receiver.record(transferFrame(false)));
- BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(3));
- BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(3));
+ s.receiverSetCommandPoint(SessionPoint(1));
+ BOOST_CHECK(!s.receiverRecord(transferFrame(false)));
+ BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(2));
+ BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(2));
+ BOOST_CHECK(s.receiverRecord(transferFrame(false)));
+ BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(3));
+ BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(3));
}
QPID_AUTO_TEST_CASE(testCompleted) {
// completed & unknownCompleted
qpid::SessionState s;
- s.receiver.setCommandPoint(SessionPoint());
- s.receiver.record(transferFrame(false));
- s.receiver.record(transferFrame(false));
- s.receiver.record(transferFrame(false));
- s.receiver.completed(1);
- BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), SequenceSet(SequenceSet()+1));
- s.receiver.completed(0);
- BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(),
+ s.receiverSetCommandPoint(SessionPoint());
+ s.receiverRecord(transferFrame(false));
+ s.receiverRecord(transferFrame(false));
+ s.receiverRecord(transferFrame(false));
+ s.receiverCompleted(1);
+ BOOST_CHECK_EQUAL(s.receiverGetUnknownComplete(), SequenceSet(SequenceSet()+1));
+ s.receiverCompleted(0);
+ BOOST_CHECK_EQUAL(s.receiverGetUnknownComplete(),
SequenceSet(SequenceSet() + SequenceSet::Range(0,2)));
- s.receiver.knownCompleted(SequenceSet(SequenceSet()+1));
- BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), SequenceSet(SequenceSet()+2));
+ s.receiverKnownCompleted(SequenceSet(SequenceSet()+1));
+ BOOST_CHECK_EQUAL(s.receiverGetUnknownComplete(), SequenceSet(SequenceSet()+2));
// TODO aconway 2008-04-30: missing tests for known-completed.
}
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index 98e34be0e9..0f52165587 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -57,7 +57,7 @@ def remote_port():
class Helper:
def __init__(self, parent):
self.parent = parent
- self.session = parent.conn.session("2")
+ self.session = parent.conn.session("Helper")
self.mc = managementClient(self.session.spec)
self.mch = self.mc.addChannel(self.session)
self.mc.syncWaitForStable(self.mch)
@@ -126,7 +126,7 @@ class FederationTests(TestBase010):
#send messages to remote broker and confirm it is routed to local broker
r_conn = self.connect(host=remote_host(), port=remote_port())
- r_session = r_conn.session("1")
+ r_session = r_conn.session("test_pull_from_exchange")
for i in range(1, 11):
dp = r_session.delivery_properties(routing_key="my-key")
@@ -153,7 +153,7 @@ class FederationTests(TestBase010):
#setup queue on remote broker and add some messages
r_conn = self.connect(host=remote_host(), port=remote_port())
- r_session = r_conn.session("1")
+ r_session = r_conn.session("test_pull_from_queue")
r_session.queue_declare(queue="my-bridge-queue", exclusive=True, auto_delete=True)
for i in range(1, 6):
dp = r_session.delivery_properties(routing_key="my-bridge-queue")
@@ -223,7 +223,7 @@ class FederationTests(TestBase010):
#send messages to remote broker and confirm it is routed to local broker
r_conn = self.connect(host=remote_host(), port=remote_port())
- r_session = r_conn.session("1")
+ r_session = r_conn.session("test_tracing")
trace = [None, "exclude-me", "a,exclude-me,b", "also-exclude-me,c", "dont-exclude-me"]
body = ["yes", "first-bad", "second-bad", "third-bad", "yes"]