diff options
| author | Alan Conway <aconway@apache.org> | 2008-05-20 13:44:34 +0000 | 
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-05-20 13:44:34 +0000 | 
| commit | 0333573627c831142aa251bfb1cabdb1e2bf438e (patch) | |
| tree | 953bf8c624374c57953aa3f2888254d175609d9a /cpp/src | |
| parent | 96024622ccfcc8fdd24b3c9ace44f7c8849fac46 (diff) | |
| download | qpid-python-0333573627c831142aa251bfb1cabdb1e2bf438e.tar.gz | |
Support for AMQP 0-10 sessions in C++ broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658246 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
22 files changed, 478 insertions, 717 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 8905fb5f9d..2a6b8303b3 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -31,6 +31,7 @@ using framing::AMQFrame;  using amqp_0_10::NotImplementedException;  using amqp_0_10::InvalidArgumentException;  using amqp_0_10::IllegalStateException; +using amqp_0_10::ResourceLimitExceededException;  namespace {  bool isControl(const AMQFrame& f) { @@ -63,78 +64,83 @@ bool SessionPoint::operator==(const SessionPoint& x) const {      return command == x.command && offset == x.offset;  } -SessionState::SendState::SendState(SessionState& s) : session(&s), unflushedSize(0) {} +SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; } +SequenceSet  SessionState::senderGetIncomplete() const { return sender.incomplete; } +SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; } -const SessionPoint& SessionState::SendState::getCommandPoint() { -    return sendPoint; +SessionState::ReplayRange SessionState::senderExpected(const SessionPoint& expect) { +    if (expect < sender.replayPoint || sender.sendPoint < expect) +        throw InvalidArgumentException(QPID_MSG(getId() << ": expected command-point out of range.")); +    ReplayList::iterator i = sender.replayList.begin(); +    SessionPoint p = sender.replayPoint; +    while (i != sender.replayList.end() && p.command < expect.command) +        p.advance(*i++); +    assert(p.command == expect.command); +    return boost::make_iterator_range(i, sender.replayList.end());  } -bool SessionState::SendState::expected(const SessionPoint& point) { -    if (point < replayPoint || sendPoint < point) -        throw InvalidArgumentException(QPID_MSG(session->getId() << ": expected command-point out of range.")); -    // FIXME aconway 2008-05-06: this is not strictly correct, we should keep -    // an intermediate replay pointer into the replay list. -    confirmed(point);           // Drop commands prior to expected from replay. -    return (!replayList.empty()); -} - -void SessionState::SendState::record(const AMQFrame& f) { +void SessionState::senderRecord(const AMQFrame& f) {      if (isControl(f)) return;   // Ignore control frames. -    session->stateful = true; -    replayList.push_back(f); -    unflushedSize += f.size(); -    incomplete += sendPoint.command; -    sendPoint.advance(f); -} - -bool SessionState::SendState::needFlush() const { return unflushedSize >= session->config.replaySyncSize; } - -void SessionState::SendState::recordFlush() { -    assert(flushPoint <= sendPoint); -    flushPoint = sendPoint; -    unflushedSize = 0; -} - -void SessionState::SendState::confirmed(const SessionPoint& confirmed) { -    if (confirmed > sendPoint) -        throw InvalidArgumentException(QPID_MSG(session->getId() << "Confirmed commands not yet sent.")); -    ReplayList::iterator i = replayList.begin(); -    while (i != replayList.end() && replayPoint.command < confirmed.command) { -        replayPoint.advance(*i); -        assert(replayPoint <= sendPoint); -        if (replayPoint > flushPoint)  -            unflushedSize -= i->size(); +    stateful = true; +    sender.replayList.push_back(f); +    sender.unflushedSize += f.size(); +    sender.replaySize += f.size(); +    sender.incomplete += sender.sendPoint.command; +    sender.sendPoint.advance(f); +    if (config.replayHardLimit && config.replayHardLimit < sender.replaySize)  +        throw ResourceLimitExceededException("Replay bufffer exceeeded hard limit"); +} + +bool SessionState::senderNeedFlush() const { +    return config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit; +} + +void SessionState::senderRecordFlush() { +    assert(sender.flushPoint <= sender.sendPoint); +    sender.flushPoint = sender.sendPoint; +    sender.unflushedSize = 0; +} + +void SessionState::senderConfirmed(const SessionPoint& confirmed) { +    if (confirmed > sender.sendPoint) +        throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent.")); +    ReplayList::iterator i = sender.replayList.begin(); +    while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) { +        sender.replayPoint.advance(*i); +        assert(sender.replayPoint <= sender.sendPoint); +        sender.replaySize -= i->size(); +        if (sender.replayPoint > sender.flushPoint)  +            sender.unflushedSize -= i->size();          ++i;      } -    if (replayPoint > flushPoint) flushPoint = replayPoint; -    replayList.erase(replayList.begin(), i); -    assert(replayPoint.offset == 0); +    if (sender.replayPoint > sender.flushPoint) +        sender.flushPoint = sender.replayPoint; +    sender.replayList.erase(sender.replayList.begin(), i); +    assert(sender.replayPoint.offset == 0);  } -void SessionState::SendState::completed(const SequenceSet& commands) { +void SessionState::senderCompleted(const SequenceSet& commands) {      if (commands.empty()) return; -    incomplete -= commands; +    sender.incomplete -= commands;      // Completion implies confirmation but we don't handle out-of-order      // confirmation, so confirm only the first contiguous range of commands. -    confirmed(SessionPoint(commands.rangesBegin()->end())); +    senderConfirmed(SessionPoint(commands.rangesBegin()->end()));  } -SessionState::ReceiveState::ReceiveState(SessionState& s) : session(&s) {} - -void SessionState::ReceiveState::setCommandPoint(const SessionPoint& point) { -    if (session->hasState() && point > received) -        throw InvalidArgumentException(QPID_MSG(session->getId() << ": Command-point out of range.")); -    expected = point; -    if (expected > received) -        received = expected; +void SessionState::receiverSetCommandPoint(const SessionPoint& point) { +    if (hasState() && point > receiver.received) +        throw InvalidArgumentException(QPID_MSG(getId() << ": Command-point out of range.")); +    receiver.expected = point; +    if (receiver.expected > receiver.received) +        receiver.received = receiver.expected;  } -bool SessionState::ReceiveState::record(const AMQFrame& f) { +bool SessionState::receiverRecord(const AMQFrame& f) {      if (isControl(f)) return true; // Ignore control frames. -    session->stateful = true; -    expected.advance(f); -    if (expected > received) { -        received = expected; +    stateful = true; +    receiver.expected.advance(f); +    if (receiver.expected > receiver.received) { +        receiver.received = receiver.expected;          return true;      }      else { @@ -143,41 +149,43 @@ bool SessionState::ReceiveState::record(const AMQFrame& f) {  }  } -void SessionState::ReceiveState::completed(SequenceNumber command, bool cumulative) { -    assert(command <= received.command); // Internal error to complete an unreceived command. -    assert(firstIncomplete <= command); +void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) { +    assert(command <= receiver.received.command); // Internal error to complete an unreceived command. +    assert(receiver.firstIncomplete <= command);      if (cumulative) -        unknownCompleted.add(firstIncomplete, command); +        receiver.unknownCompleted.add(receiver.firstIncomplete, command);      else -        unknownCompleted += command; -    firstIncomplete = unknownCompleted.rangeContaining(firstIncomplete).end(); +        receiver.unknownCompleted += command; +    receiver.firstIncomplete = receiver.unknownCompleted.rangeContaining(receiver.firstIncomplete).end(); +    QPID_LOG(debug, "Completed " << command << " unknown=" << receiver.unknownCompleted);  } -void SessionState::ReceiveState::knownCompleted(const SequenceSet& commands) { -    if (!commands.empty() && commands.back() > received.command) -        throw InvalidArgumentException(QPID_MSG(session->getId() << ": Known-completed has invalid commands.")); -    unknownCompleted -= commands; +void SessionState::receiverKnownCompleted(const SequenceSet& commands) { +    if (!commands.empty() && commands.back() > receiver.received.command) +        throw InvalidArgumentException(QPID_MSG(getId() << ": Known-completed has invalid commands.")); +    receiver.unknownCompleted -= commands;  } -SequenceNumber SessionState::ReceiveState::getCurrent() const { -    SequenceNumber current = expected.command; // FIXME aconway 2008-05-08: SequenceNumber arithmetic. -    return --current; +const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; } +const SessionPoint& SessionState::receiverGetReceived() const { return receiver.received; } +const SequenceSet& SessionState::receiverGetUnknownComplete() const { return receiver.unknownCompleted; } + +SequenceNumber SessionState::receiverGetCurrent() const { +    SequenceNumber current = receiver.expected.command; +    if (receiver.expected.offset == 0) +        --current; +    return current;  } -// FIXME aconway 2008-05-02: implement sync & kill limits. -SessionState::Configuration::Configuration() -    : replaySyncSize(std::numeric_limits<size_t>::max()), -      replayKillSize(std::numeric_limits<size_t>::max()) {} +SessionState::Configuration::Configuration(size_t flush, size_t hard) : +    replayFlushLimit(flush), replayHardLimit(hard) {} -SessionState::SessionState(const SessionId& i, const Configuration& c) -    : sender(*this), receiver(*this), id(i), timeout(), config(c), stateful() +SessionState::SessionState(const SessionId& i, const Configuration& c) : id(i), timeout(), config(c), stateful()  {      QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);  } -bool SessionState::hasState() const { -    return stateful; -} +bool SessionState::hasState() const { return stateful; }  SessionState::~SessionState() {} diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h index 7957825dd3..47fabb04c8 100644 --- a/cpp/src/qpid/SessionState.h +++ b/cpp/src/qpid/SessionState.h @@ -28,6 +28,7 @@  #include <qpid/framing/AMQFrame.h>  #include <qpid/framing/FrameHandler.h>  #include <boost/operators.hpp> +#include <boost/range/iterator_range.hpp>  #include <vector>  #include <iosfwd> @@ -70,135 +71,118 @@ std::ostream& operator<<(std::ostream&, const SessionPoint&);   * source-incompatbile API changes.   */  class SessionState { -  public: +    typedef std::vector<framing::AMQFrame> ReplayList; -    /** State for commands sent. Records commands for replay, -     * tracks confirmation and completion of sent commands. -     */ -class SendState {    public: -    typedef std::vector<framing::AMQFrame> ReplayList; + +    typedef boost::iterator_range<ReplayList::iterator> ReplayRange; + +    struct Configuration { +        Configuration(size_t flush=0, size_t hard=0); +        size_t replayFlushLimit; // Flush when the replay list >= N bytes. 0 disables. +        size_t replayHardLimit; // Kill session if replay list > N bytes. 0 disables. +    }; + +    SessionState(const SessionId& =SessionId(), const Configuration& =Configuration()); +     +    virtual ~SessionState(); + +    bool hasState() const; + +    const SessionId& getId() const { return id; } + +    uint32_t getTimeout() const { return timeout; } +    void setTimeout(uint32_t seconds) { timeout = seconds; } + +    bool operator==(const SessionId& other) const { return id == other; } +    bool operator==(const SessionState& other) const { return id == other.id; } + +    // ==== Functions for sender state.      /** Record frame f for replay. Should not be called during replay. */ -        void record(const framing::AMQFrame& f); +    virtual void senderRecord(const framing::AMQFrame& f);      /** @return true if we should send flush for confirmed and completed commands. */ -    bool needFlush() const; +    virtual bool senderNeedFlush() const;      /** Called when flush for confirmed and completed commands is sent to peer. */ -        void recordFlush(); +    virtual void senderRecordFlush();          /** Called when the peer confirms up to comfirmed. */ -        void confirmed(const SessionPoint& confirmed); +    virtual void senderConfirmed(const SessionPoint& confirmed);      /** Called when the peer indicates commands completed */ -        void completed(const SequenceSet& commands); +    virtual void senderCompleted(const SequenceSet& commands); -        /** Point from which we can replay. All data < replayPoint is confirmed. */ -    const SessionPoint& getReplayPoint() const { return replayPoint; } - -        /** Get the replay list, starting from getReplayPoint() */ -        // TODO aconway 2008-04-30: should be const, but FrameHandler takes non-const AMQFrame&. -        ReplayList& getReplayList() { return replayList; } - -        /** Point from which the next data will be sent. */ -        const SessionPoint& getCommandPoint(); +    /** Point from which the next new (not replayed) data will be sent. */ +    virtual SessionPoint senderGetCommandPoint();          /** Set of outstanding incomplete commands */ -        const SequenceSet& getIncomplete() const { return incomplete; } +    virtual SequenceSet senderGetIncomplete() const; + +    /** Point from which we can replay. */ +    virtual SessionPoint senderGetReplayPoint() const;          /** Peer expecting commands from this point. -         *@return true if replay is required, sets replayPoint. +     virtual *@return Range of frames to be replayed.           */ -        bool expected(const SessionPoint& expected); +    virtual ReplayRange senderExpected(const SessionPoint& expected); -  private: -        SendState(SessionState& s); -        SessionState* session; -    // invariant: replayPoint <= flushPoint <= sendPoint -    SessionPoint replayPoint;   // Can replay from this point -    SessionPoint flushPoint;    // Point of last flush -        SessionPoint sendPoint;     // Send from this point -    ReplayList replayList; // Starts from replayPoint. -    size_t unflushedSize;       // Un-flushed bytes in replay list. -        SequenceSet incomplete;     // Commands sent and not yet completed. +    // ==== Functions for receiver state -      friend class SessionState; -}; - -    /** State for commands received. -     * Idempotence barrier for duplicate commands, tracks completion -     * and of received commands. -     */ -class ReceiveState { -  public:      /** Set the command point. */ -        void setCommandPoint(const SessionPoint& point); +    virtual void receiverSetCommandPoint(const SessionPoint& point);      /** Returns true if frame should be be processed, false if it is a duplicate. */ -        bool record(const framing::AMQFrame& f); +    virtual bool receiverRecord(const framing::AMQFrame& f);      /** Command completed locally */ -        void completed(SequenceNumber command, bool cumulative=false); +    virtual void receiverCompleted(SequenceNumber command, bool cumulative=false);      /** Peer has indicated commands are known completed */ -        void knownCompleted(const SequenceSet& commands); +    virtual void receiverKnownCompleted(const SequenceSet& commands);          /** Get the incoming command point */ -        const SessionPoint& getExpected() const { return expected; } +    virtual const SessionPoint& receiverGetExpected() const;          /** Get the received high-water-mark, may be > getExpected() during replay */ -    const SessionPoint& getReceived() const { return received; } +    virtual const SessionPoint& receiverGetReceived() const;          /** Completed commands that the peer may not know about */ -        const SequenceSet& getUnknownComplete() const { return unknownCompleted; } +    virtual const SequenceSet& receiverGetUnknownComplete() const;          /** ID of the command currently being handled. */ -        SequenceNumber getCurrent() const; +    virtual SequenceNumber receiverGetCurrent() const;    private: -        ReceiveState(SessionState&); +    struct SendState { +        SendState() : session(), unflushedSize(), replaySize() {} +        SessionState* session; +        // invariant: replayPoint <= flushPoint <= sendPoint +        SessionPoint replayPoint;   // Can replay from this point +        SessionPoint flushPoint;    // Point of last flush +        SessionPoint sendPoint;     // Send from this point +        ReplayList replayList;      // Starts from replayPoint. +        size_t unflushedSize;       // Un-flushed bytes in replay list. +        size_t replaySize;          // Total bytes in replay list. +        SequenceSet incomplete;     // Commands sent and not yet completed. +    } sender; + +    struct ReceiveState { +        ReceiveState() : session() {}          SessionState* session;          SessionPoint expected;  // Expected from here          SessionPoint received; // Received to here. Invariant: expected <= received.          SequenceSet unknownCompleted; // Received & completed, may not  not known-complete by peer.          SequenceNumber firstIncomplete;  // First incomplete command. +    } receiver; -      friend class SessionState; -    }; - -    struct Configuration { -        Configuration(); -        size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes -        size_t replayKillSize; // Kill session if replay list grows beyond N bytes. -    }; -     -    SessionState(const SessionId& =SessionId(), const Configuration& =Configuration()); - -    virtual ~SessionState(); - -    const SessionId& getId() const { return id; } -    uint32_t getTimeout() const { return timeout; } -    void setTimeout(uint32_t seconds) { timeout = seconds; } - -    bool operator==(const SessionId& other) const { return id == other; } -    bool operator==(const SessionState& other) const { return id == other.id; } - -    SendState sender;           ///< State for commands sent -    ReceiveState receiver;      ///< State for commands received - -    bool hasState() const; - -  private:      SessionId id;      uint32_t timeout;      Configuration config;      bool stateful; - -  friend class SendState; -  friend class ReceiveState;  };  inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; } diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 3fb2579e8c..6d43dd1789 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -33,10 +33,8 @@ namespace amqp_0_10 {  using namespace framing;  using namespace std; -SessionHandler::SessionHandler() : peer(channel), ignoring(), sendReady(), receiveReady() {} - -SessionHandler::SessionHandler(FrameHandler& out, ChannelId ch) -    : channel(ch, &out), peer(channel), ignoring(false) {} +SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch) +    : channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {}  SessionHandler::~SessionHandler() {} @@ -75,7 +73,7 @@ void SessionHandler::handleIn(AMQFrame& f) {              checkAttached();              if (!receiveReady)                  throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data")); -            if (!getState()->receiver.record(f)) +            if (!getState()->receiverRecord(f))                  return; // Ignore duplicates.              getInHandler()->handle(f);          } @@ -100,10 +98,10 @@ void SessionHandler::handleOut(AMQFrame& f) {      checkAttached();      if (!sendReady)          throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data")); -    getState()->sender.record(f);  -    if (getState()->sender.needFlush()) { +    getState()->senderRecord(f);  +    if (getState()->senderNeedFlush()) {          peer.flush(false, true, true);  -        getState()->sender.recordFlush(); +        getState()->senderRecordFlush();      }      channel.handle(f);  } @@ -128,7 +126,7 @@ void SessionHandler::attach(const std::string& name, bool force) {      if (getState()->hasState())          peer.flush(true, true, true);      else -        sendCommandPoint(); +        sendCommandPoint(getState()->senderGetCommandPoint());  }  void SessionHandler::attached(const std::string& name) { @@ -168,7 +166,7 @@ void SessionHandler::timeout(uint32_t t) {  void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) {      checkAttached(); -    getState()->receiver.setCommandPoint(SessionPoint(id, offset)); +    getState()->receiverSetCommandPoint(SessionPoint(id, offset));      if (!receiveReady) {          receiveReady = true;          readyToReceive(); @@ -177,32 +175,37 @@ void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) {  void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) {      checkAttached(); -    if (commands.empty() && getState()->hasState()) -        throw IllegalStateException( +    if (getState()->hasState()) { // Replay +        if (commands.empty()) throw IllegalStateException(              QPID_MSG(getState()->getId() << ": has state but client is attaching as new session."));         -    getState()->sender.expected(commands.empty() ? SequenceNumber(0) : commands.front()); -    if (!sendReady)  // send command point if not already sent -        sendCommandPoint(); +        // TODO aconway 2008-05-12: support replay of partial commands. +        // Here we always round down to the last command boundary. +        SessionPoint expectedPoint = commands.empty() ? SequenceNumber(0) : SessionPoint(commands.front(),0); +        SessionState::ReplayRange replay = getState()->senderExpected(expectedPoint); +        sendCommandPoint(expectedPoint); +        std::for_each(replay.begin(), replay.end(), out); // replay +    } +    else +        sendCommandPoint(getState()->senderGetCommandPoint());  }  void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) {      checkAttached();      // Ignore non-contiguous confirmations. -    if (!commands.empty() && commands.front() >= getState()->sender.getReplayPoint()) { -        getState()->sender.confirmed(commands.rangesBegin()->last()); -        } +    if (!commands.empty() && commands.front() >= getState()->senderGetReplayPoint())  +        getState()->senderConfirmed(commands.rangesBegin()->last());  }  void SessionHandler::completed(const SequenceSet& commands, bool /*timelyReply*/) {      checkAttached(); -    getState()->sender.completed(commands); +    getState()->senderCompleted(commands);      if (!commands.empty())          peer.knownCompleted(commands);    // Always send a timely reply  }  void SessionHandler::knownCompleted(const SequenceSet& commands) {      checkAttached(); -    getState()->receiver.knownCompleted(commands); +    getState()->receiverKnownCompleted(commands);  }  void SessionHandler::flush(bool expected, bool confirmed, bool completed) { @@ -210,18 +213,18 @@ void SessionHandler::flush(bool expected, bool confirmed, bool completed) {      if (expected)  {          SequenceSet expectSet;          if (getState()->hasState()) -            expectSet.add(getState()->receiver.getExpected().command); +            expectSet.add(getState()->receiverGetExpected().command);          peer.expected(expectSet, Array());      }      if (confirmed) {          SequenceSet confirmSet; -        if (!getState()->receiver.getUnknownComplete().empty())  -            confirmSet.add(getState()->receiver.getUnknownComplete().front(), -                           getState()->receiver.getReceived().command); +        if (!getState()->receiverGetUnknownComplete().empty())  +            confirmSet.add(getState()->receiverGetUnknownComplete().front(), +                           getState()->receiverGetReceived().command);          peer.confirmed(confirmSet, Array());      }      if (completed) -        peer.completed(getState()->receiver.getUnknownComplete(), true); +        peer.completed(getState()->receiverGetUnknownComplete(), true);  }  void SessionHandler::gap(const SequenceSet& /*commands*/) { @@ -237,20 +240,20 @@ void SessionHandler::sendDetach()  void SessionHandler::sendCompletion() {      checkAttached(); -    peer.completed(getState()->receiver.getUnknownComplete(), true); +    peer.completed(getState()->receiverGetUnknownComplete(), true);  }  void SessionHandler::sendAttach(bool force) {      checkAttached(); +    QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId());      peer.attach(getState()->getId().getName(), force);      if (getState()->hasState())          peer.flush(true, true, true);      else -        sendCommandPoint(); +        sendCommandPoint(getState()->senderGetCommandPoint());  } -void SessionHandler::sendCommandPoint()  { -    SessionPoint point(getState()->sender.getCommandPoint()); +void SessionHandler::sendCommandPoint(const SessionPoint& point)  {      peer.commandPoint(point.command, point.offset);      if (!sendReady) {          sendReady = true; diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.h b/cpp/src/qpid/amqp_0_10/SessionHandler.h index 85577ebafc..ccbe597bfc 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.h +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -25,10 +25,10 @@  #include "qpid/framing/ChannelHandler.h"  #include "qpid/framing/AMQP_AllProxy.h"  #include "qpid/framing/AMQP_AllOperations.h" +#include "qpid/SessionState.h"  namespace qpid { -class SessionState;  namespace amqp_0_10 { @@ -45,8 +45,7 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,    public:      typedef framing::AMQP_AllProxy::Session Peer; -    SessionHandler(); -    SessionHandler(framing::FrameHandler& out, uint16_t channel); +    SessionHandler(framing::FrameHandler* out=0, uint16_t channel=0);      ~SessionHandler();      void setChannel(uint16_t ch) { channel = ch; } @@ -63,7 +62,6 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,      void sendAttach(bool force);      void sendTimeout(uint32_t t);      void sendFlush(); -    void sendCommandPoint();      /** True if the handler is ready to send and receive */      bool ready() const; @@ -96,8 +94,8 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,      // Notification of events      virtual void readyToSend() {}      virtual void readyToReceive() {} -    virtual void handleDetach(); +    virtual void handleDetach();      virtual void handleIn(framing::AMQFrame&);      virtual void handleOut(framing::AMQFrame&); @@ -108,8 +106,9 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,      Peer peer;      bool ignoring;      bool sendReady, receiveReady; -    // FIXME aconway 2008-05-07: move handler-related functions from SessionState. +  private: +    void sendCommandPoint(const SessionPoint&);  };  }} // namespace qpid::amqp_0_10 diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 337992992f..f3e103dfaf 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -59,6 +59,7 @@ void Bridge::create(ConnectionState& c)      peer.reset(new framing::AMQP_ServerProxy(*channelHandler));      session->attach(name, false); +    session->commandPoint(0,0);      if (args.i_src_is_local) {          //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 0a45c46d30..4f7686aac4 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -53,6 +53,9 @@  #if HAVE_SASL  #include <sasl/sasl.h> +static const bool AUTH_DEFAULT=true; +#else +static const bool AUTH_DEFAULT=false;  #endif  using qpid::sys::ProtocolFactory; @@ -81,41 +84,25 @@ Broker::Options::Options(const std::string& name) :      stagingThreshold(5000000),      enableMgmt(1),      mgmtPubInterval(10), -#if HAVE_SASL -    auth(true), -#else -    auth(false), -#endif -	realm("QPID"), -    ack(0) +    auth(AUTH_DEFAULT), +    replayFlushLimit(64), +    replayHardLimit(0)  {      int c = sys::SystemInfo::concurrency();      workerThreads=c+1;      addOptions() -        ("data-dir", optValue(dataDir,"DIR"), -         "Directory to contain persistent data generated by the broker") -        ("no-data-dir", optValue(noDataDir), -         "Don't use a data directory.  No persistent configuration will be loaded or stored") -        ("port,p", optValue(port,"PORT"), -         "Tells the broker to listen on PORT") -        ("worker-threads", optValue(workerThreads, "N"), -         "Sets the broker thread pool size") -        ("max-connections", optValue(maxConnections, "N"), -         "Sets the maximum allowed connections") -        ("connection-backlog", optValue(connectionBacklog, "N"), -         "Sets the connection backlog limit for the server socket") -        ("staging-threshold", optValue(stagingThreshold, "N"), -         "Stages messages over N bytes to disk") -        ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), -         "Enable Management") -        ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), -         "Management Publish Interval") -        ("auth", optValue(auth, "yes|no"), -         "Enable authentication, if disabled all incoming connections will be trusted") -        ("realm", optValue(realm, "REALM"), -         "Use the given realm when performing authentication") -        ("ack", optValue(ack, "N"), -         "Send session.ack/solicit-ack at least every N frames. 0 disables voluntary ack/solitict-ack"); +        ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker") +        ("no-data-dir", optValue(noDataDir), "Don't use a data directory.  No persistent configuration will be loaded or stored") +        ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") +        ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size") +        ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") +        ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") +        ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk") +        ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") +        ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") +        ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted") +        ("replay-flush-limit", optValue(replayFlushLimit, "KB"), "Send flush request when the replay buffer reaches this limit. 0 means no limit.") +        ("replay-hard-limit", optValue(replayHardLimit, "KB"), "Kill a session if its replay buffer exceeds this limit. 0 means no limit.");  }  const std::string empty; @@ -132,7 +119,11 @@ Broker::Broker(const Broker::Options& conf) :      dataDir(conf.noDataDir ? std::string () : conf.dataDir),      links(this),      factory(*this), -    sessionManager(conf.ack) +    sessionManager( +        qpid::SessionState::Configuration( +            conf.replayFlushLimit*1024, // convert kb to bytes. +            conf.replayHardLimit*1024), +        *this)  {      if(conf.enableMgmt){          QPID_LOG(info, "Management enabled"); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index a1eaf4f62f..7092a86181 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -83,7 +83,8 @@ class Broker : public sys::Runnable, public Plugin::Target,          uint16_t mgmtPubInterval;          bool auth;          std::string realm; -        uint32_t ack; +        size_t replayFlushLimit; +        size_t replayHardLimit;      };      virtual ~Broker(); diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index d156b4a914..463193a346 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -124,8 +124,8 @@ void Connection::idleIn(){}  void Connection::closed(){ // Physically closed, suspend open sessions.      try { -	for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i) -	    ptr_map_ptr(i)->handleDetach(); +        while (!channels.empty())  +	    ptr_map_ptr(channels.begin())->handleDetach();          while (!exclusiveQueues.empty()) {              Queue::shared_ptr q(exclusiveQueues.front());              q->releaseExclusiveOwnership(); diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp index d48b258ba2..a542211147 100644 --- a/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -89,13 +89,20 @@ void NullAuthenticator::getMechanisms(Array& mechanisms)      mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("ANONYMOUS")));  } -void NullAuthenticator::start(const string& /*mechanism*/, const string& /*response*/) +void NullAuthenticator::start(const string& mechanism, const string& response)  {      QPID_LOG(warning, "SASL: No Authentication Performed"); -     -    // TODO: Figure out what should actually be set in this case -    connection.setUserId("anonymous"); -     +    if (mechanism == "PLAIN") { // Old behavior +        if (response.size() > 0 && response[0] == (char) 0) { +            string temp = response.substr(1); +            string::size_type i = temp.find((char)0); +            string uid = temp.substr(0, i); +            string pwd = temp.substr(i + 1); +            connection.setUserId(uid); +        } +    } else { +        connection.setUserId("anonymous"); +    }         client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, 0);  } diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index e284451d14..2d0edde27b 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -410,7 +410,7 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm  framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)  { -    //TODO: change this when SequenceNumberSet is deleted along with preview code +    // FIXME aconway 2008-05-12: create SequenceSet directly, no need for intermediate results vector.      SequenceNumberSet results;      RangedOperation f = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));      transfers.for_each(f); diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 2ec0988fc0..2f09c6b5ac 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -21,12 +21,6 @@  #include "SessionHandler.h"  #include "SessionState.h"  #include "Connection.h" -#include "ConnectionState.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/constants.h" -#include "qpid/framing/ClientInvoker.h" -#include "qpid/framing/ServerInvoker.h" -#include "qpid/framing/all_method_bodies.h"  #include "qpid/log/Statement.h"  #include <boost/bind.hpp> @@ -38,11 +32,9 @@ using namespace std;  using namespace qpid::sys;  SessionHandler::SessionHandler(Connection& c, ChannelId ch) -    : InOutHandler(0, &out), -      connection(c), channel(ch, &c.getOutput()), -      proxy(out),               // Via my own handleOut() for L2 data. -      peerSession(channel),     // Direct to channel for L2 commands. -      ignoring(false) +    : amqp_0_10::SessionHandler(&c.getOutput(), ch), +      connection(c),  +      proxy(out)  {}  SessionHandler::~SessionHandler() {} @@ -52,191 +44,52 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }  MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }  } // namespace -void SessionHandler::handleIn(AMQFrame& f) { -    // Note on channel states: a channel is attached if session != 0 -    AMQMethodBody* m = f.getBody()->getMethod(); -    try { -        if (ignoring && !(m && m->isA<SessionDetachedBody>())) { -            return; -        } -        if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { -            //frame was a valid session control and has been handled -            return; -        } else if (session.get()) { -            //we are attached and frame was not a session control so it is for upper layers -            session->handle(f); -        } else if (m && m->isA<SessionDetachedBody>()) { -            handleDetach(); -            connection.closeChannel(channel.get());  -        } else { -            throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached"));                 -        } -    }catch(const ChannelException& e){ -        QPID_LOG(error, "Session detached due to: " << e.what()); -        peerSession.detached(name, e.code); +void SessionHandler::channelException(uint16_t, const std::string&) {          handleDetach(); -        connection.closeChannel(channel.get());  -    }catch(const ConnectionException& e){ -        connection.close(e.code, e.what(), classId(m), methodId(m)); -    }catch(const std::exception& e){ -        connection.close(501, e.what(), classId(m), methodId(m)); -    }  } -bool SessionHandler::isValid(AMQMethodBody* m) { -    return session.get() || m->isA<SessionAttachBody>() || m->isA<SessionAttachedBody>(); -} - -void SessionHandler::handleOut(AMQFrame& f) { -    channel.handle(f);          // Send it. -    if (session->sent(f)) -        peerSession.flush(false, false, true); -} - -void SessionHandler::assertAttached(const char* method) const { -    if (!session.get()) { -        std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl; -        throw NotAttachedException( -            QPID_MSG(method << " failed: No session for channel " -                     << getChannel())); -    } -} - -void SessionHandler::assertClosed(const char* method) const { -    if (session.get()) -        throw SessionBusyException( -            QPID_MSG(method << " failed: channel " << channel.get() -                     << " is already open.")); +void SessionHandler::connectionException(uint16_t code, const std::string& msg) { +    connection.close(code, msg, 0, 0);  }  ConnectionState& SessionHandler::getConnection() { return connection; } -const ConnectionState& SessionHandler::getConnection() const { return connection; } - -//new methods: -void SessionHandler::attach(const std::string& _name, bool /*force*/) -{ -    name = _name;//TODO: this should be used in conjunction with -                 //userid for connection as sessions identity -    //TODO: need to revise session manager to support resume as well -    assertClosed("attach"); -    session.reset(new SessionState(0, this, 0, 0, name)); -    peerSession.attached(name); -    peerSession.commandPoint(session->nextOut, 0); -} - -void SessionHandler::attached(const std::string& _name) -{ -    name = _name;//TODO: this should be used in conjunction with -                 //userid for connection as sessions identity -    session.reset(new SessionState(0, this, 0, 0, name)); -    peerSession.commandPoint(session->nextOut, 0); -} +const ConnectionState& SessionHandler::getConnection() const { return connection; } -void SessionHandler::detach(const std::string& name) -{ -    assertAttached("detach"); -    peerSession.detached(name, session::NORMAL); -    handleDetach(); +void SessionHandler::handleDetach() { +    amqp_0_10::SessionHandler::handleDetach();      assert(&connection.getChannel(channel.get()) == this); +    if (session.get()) +        connection.getBroker().getSessionManager().detach(session); +    assert(!session.get());      connection.closeChannel(channel.get());   } -void SessionHandler::detached(const std::string& name, uint8_t code) -{ -    ignoring = false; -    handleDetach(); -    if (code) { -        //no error -    } else { -        //error occured -        QPID_LOG(warning, "Received session.closed: "<< name << " " << code); -    } -    connection.closeChannel(channel.get());  -} - -void SessionHandler::handleDetach() -{ -    if (session.get()) { -        session->detach(); -        session.reset(); -    } -} - -void SessionHandler::requestDetach() -{ -    //TODO: request timeout when python can handle it -    //peerSession.requestTimeout(0); -    ignoring = true; -    peerSession.detach(name); -} - -void SessionHandler::requestTimeout(uint32_t t) -{ -    session->setTimeout(t); -    peerSession.timeout(t); -} - -void SessionHandler::timeout(uint32_t t) -{ -    session->setTimeout(t); -} - -void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset) -{ -    if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); -     -    session->nextIn = id; -} - -void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments) -{ -    if (!commands.empty() || fragments.size()) { -        throw NotImplementedException("Session resumption not yet supported"); -    } +void SessionHandler::setState(const std::string& name, bool force) { +    assert(!session.get()); +    SessionId id(connection.getUserId(), name); +    session = connection.broker.getSessionManager().attach(*this, id, force);  } -void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/) -{ -    //don't really care too much about this yet -} +FrameHandler* SessionHandler::getInHandler() { return session.get(); } +qpid::SessionState* SessionHandler::getState() { return session.get(); } -void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply) -{ -    session->complete(commands); -    if (timelyReply) { -        peerSession.knownCompleted(session->knownCompleted); -        session->knownCompleted.clear(); -    } +void SessionHandler::readyToSend() { +    if (session.get()) session->readyToSend();  } -void SessionHandler::knownCompleted(const framing::SequenceSet& commands) -{ -    session->completed.remove(commands); -} - -void SessionHandler::flush(bool expected, bool confirmed, bool completed) -{ -    if (expected) { -        peerSession.expected(SequenceSet(session->nextIn), Array()); -    } -    if (confirmed) { -        peerSession.confirmed(session->completed, Array()); -    } -    if (completed) { -        peerSession.completed(session->completed, true); -    } -} - - -void SessionHandler::sendCompletion() -{ -    peerSession.completed(session->completed, true); +// TODO aconway 2008-05-12: hacky - handle attached for bridge clients. +// We need to integrate the client code so we can run a real client +// in the bridge. +//  +void SessionHandler::attached(const std::string& name) { +    if (session.get()) +        checkName(name); +    else { +        SessionId id(connection.getUserId(), name); +        SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); +        session.reset(new SessionState(connection.getBroker(), *this, id, config));  } - -void SessionHandler::gap(const framing::SequenceSet& /*commands*/) -{ -    throw NotImplementedException("gap not yet supported");  }  }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 47c534441a..1aa3137fdf 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -22,19 +22,12 @@   *   */ -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/AMQP_ClientOperations.h" -#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/amqp_0_10/SessionHandler.h"  #include "qpid/framing/AMQP_ClientProxy.h" -#include "qpid/framing/amqp_types.h" -#include "qpid/framing/Array.h" -#include "qpid/framing/ChannelHandler.h" -#include "qpid/framing/SequenceNumber.h" - - -#include <boost/noncopyable.hpp>  namespace qpid { +class SessionState; +  namespace broker {  class Connection; @@ -46,65 +39,38 @@ class SessionState;   * receives incoming frames, handles session controls and manages the   * association between the channel and a session.   */ -class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, -                       public framing::FrameHandler::InOutHandler, -                       private boost::noncopyable -{ +class SessionHandler : public amqp_0_10::SessionHandler {    public:      SessionHandler(Connection&, framing::ChannelId);      ~SessionHandler(); -    /** Returns 0 if not attached to a session */ +    /** Get broker::SessionState */      SessionState* getSession() { return session.get(); }      const SessionState* getSession() const { return session.get(); } -    framing::ChannelId getChannel() const { return channel.get(); } -          ConnectionState& getConnection();      const ConnectionState& getConnection() const;      framing::AMQP_ClientProxy& getProxy() { return proxy; }      const framing::AMQP_ClientProxy& getProxy() const { return proxy; } -    void requestDetach(); -    void handleDetach(); -    void sendCompletion(); -     -  protected: -    void handleIn(framing::AMQFrame&); -    void handleOut(framing::AMQFrame&); +    virtual void handleDetach(); -  private: -    //new methods: -    void attach(const std::string& name, bool force); +    // Overrides      void attached(const std::string& name); -    void detach(const std::string& name); -    void detached(const std::string& name, uint8_t code); - -    void requestTimeout(uint32_t t); -    void timeout(uint32_t t); - -    void commandPoint(const framing::SequenceNumber& id, uint64_t offset); -    void expected(const framing::SequenceSet& commands, const framing::Array& fragments); -    void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments); -    void completed(const framing::SequenceSet& commands, bool timelyReply); -    void knownCompleted(const framing::SequenceSet& commands); -    void flush(bool expected, bool confirmed, bool completed); -    void gap(const framing::SequenceSet& commands);     -    void assertAttached(const char* method) const; -    void assertActive(const char* method) const; -    void assertClosed(const char* method) const; - -    bool isValid(framing::AMQMethodBody*); +  protected: +    virtual void setState(const std::string& sessionName, bool force); +    virtual qpid::SessionState* getState(); +    virtual framing::FrameHandler* getInHandler(); +    virtual void channelException(uint16_t code, const std::string& msg); +    virtual void connectionException(uint16_t code, const std::string& msg); +    virtual void readyToSend(); +  private:      Connection& connection; -    framing::ChannelHandler channel;      framing::AMQP_ClientProxy proxy; -    framing::AMQP_ClientProxy::Session peerSession; -    bool ignoring;      std::auto_ptr<SessionState> session; -    std::string name;//TODO: this should be part of the session state and replace the id  };  }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index d7bae737fc..789e43b902 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -40,68 +40,58 @@ using boost::intrusive_ptr;  using namespace sys;  using namespace framing; -SessionManager::SessionManager(uint32_t a) : ack(a) {} +SessionManager::SessionManager(const SessionState::Configuration& c, Broker& b) +    : config(c), broker(b) {} -SessionManager::~SessionManager() {} +SessionManager::~SessionManager() { +    detached.clear();           // Must clear before destructor as session dtor will call forget() +} -// FIXME aconway 2008-02-01: pass handler*, allow open  unattached. -std::auto_ptr<SessionState>  SessionManager::open( -    SessionHandler& h, uint32_t timeout_, std::string _name) -{ +std::auto_ptr<SessionState>  SessionManager::attach(SessionHandler& h, const SessionId& id, bool/*force*/) {      Mutex::ScopedLock l(lock); -    std::auto_ptr<SessionState> session( -        new SessionState(this, &h, timeout_, ack, _name)); -    active.insert(session->getId()); +    eraseExpired();             // Clean up expired table +    std::pair<Attached::iterator, bool> insert = attached.insert(id); +    if (!insert.second) +        throw SessionBusyException(QPID_MSG("Session already attached: " << id)); +    Detached::iterator i = std::find(detached.begin(), detached.end(), id); +    std::auto_ptr<SessionState> state; +    if (i == detached.end())  { +        state.reset(new SessionState(broker, h, id, config));      for_each(observers.begin(), observers.end(), -             boost::bind(&Observer::opened, _1,boost::ref(*session))); -    return session; +                 boost::bind(&Observer::opened, _1,boost::ref(*state))); +    } +    else { +        state.reset(detached.release(i).release()); +        state->attach(h); +    } +    return state; +    // FIXME aconway 2008-04-29: implement force   } -void  SessionManager::suspend(std::auto_ptr<SessionState> session) { +void  SessionManager::detach(std::auto_ptr<SessionState> session) {      Mutex::ScopedLock l(lock); -    active.erase(session->getId()); -    session->suspend(); +    attached.erase(session->getId()); +    session->detach(); +    if (session->getTimeout() > 0) {      session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);      if (session->mgmtObject.get() != 0)          session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry)); -    suspended.push_back(session.release()); // In expiry order +        detached.push_back(session.release()); // In expiry order      eraseExpired();  } - -std::auto_ptr<SessionState>  SessionManager::resume(const Uuid& id) -{ -    Mutex::ScopedLock l(lock); -    eraseExpired(); -    if (active.find(id) != active.end())  -        throw SessionBusyException( -            QPID_MSG("Session already active: " << id)); -    Suspended::iterator i = std::find_if( -        suspended.begin(), suspended.end(), -        boost::bind(std::equal_to<Uuid>(), id, boost::bind(&SessionState::getId, _1)) -    ); -    if (i == suspended.end()) -        throw InvalidArgumentException( -            QPID_MSG("No suspended session with id=" << id)); -    active.insert(id); -    std::auto_ptr<SessionState> state(suspended.release(i).release()); -    return state;  } -void SessionManager::erase(const framing::Uuid& id) -{ -    Mutex::ScopedLock l(lock); -    active.erase(id); -} +void SessionManager::forget(const SessionId& id) { attached.erase(id); }  void SessionManager::eraseExpired() {      // Called with lock held. -    if (!suspended.empty()) { -        Suspended::iterator keep = std::lower_bound( -            suspended.begin(), suspended.end(), now(), +    if (!detached.empty()) { +        Detached::iterator keep = std::lower_bound( +            detached.begin(), detached.end(), now(),              boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2)); -        if (suspended.begin() != keep) { -            QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); -            suspended.erase(suspended.begin(), keep); +        if (detached.begin() != keep) { +            QPID_LOG(debug, "Expiring sessions: " << log::formatList(detached.begin(), keep)); +            detached.erase(detached.begin(), keep);          }      }  } diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h index ad064c69bb..9a4142f613 100644 --- a/cpp/src/qpid/broker/SessionManager.h +++ b/cpp/src/qpid/broker/SessionManager.h @@ -22,7 +22,7 @@   *   */ -#include <qpid/framing/Uuid.h> +#include <qpid/SessionState.h>  #include <qpid/sys/Time.h>  #include <qpid/sys/Mutex.h>  #include <qpid/RefCounted.h> @@ -37,7 +37,7 @@  namespace qpid {  namespace broker { - +class Broker;  class SessionState;  class SessionHandler; @@ -50,44 +50,43 @@ class SessionManager : private boost::noncopyable {       * Observer notified of SessionManager events.       */      struct Observer : public RefCounted { +        /** Called when a stateless session is attached. */          virtual void opened(SessionState&) {}      }; -    SessionManager(uint32_t ack); +    SessionManager(const qpid::SessionState::Configuration&, Broker&);      ~SessionManager();      /** Open a new active session, caller takes ownership */ -    std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_, std::string name); +    std::auto_ptr<SessionState> attach(SessionHandler& h, const SessionId& id, bool/*force*/); -    /** Suspend a session, start it's timeout counter. -     * The factory takes ownership. -     */ -    void suspend(std::auto_ptr<SessionState> session); +    /** Return a detached session to the manager, start the timeout counter. */ +    void detach(std::auto_ptr<SessionState>); -    /** Resume a suspended session. -     *@throw Exception if timed out or non-existant. -     */ -    std::auto_ptr<SessionState> resume(const framing::Uuid&); +    /** Forget about an attached session. Called by SessionState destructor. */ +    void forget(const SessionId&);      /** Add an Observer. */      void add(const boost::intrusive_ptr<Observer>&); +    Broker& getBroker() const { return broker; } + +    const qpid::SessionState::Configuration& getSessionConfig() const { return config; } +    private: -    typedef boost::ptr_vector<SessionState> Suspended; -    typedef std::set<framing::Uuid> Active; +    typedef boost::ptr_vector<SessionState> Detached; // Sorted in expiry order. +    typedef std::set<SessionId> Attached;      typedef std::vector<boost::intrusive_ptr<Observer> > Observers; -    void erase(const framing::Uuid&);                   void eraseExpired();                   sys::Mutex lock; -    Suspended suspended; -    Active active; -    uint32_t ack; +    Detached detached; +    Attached attached; +    qpid::SessionState::Configuration config;      Observers observers; -     -  friend class SessionState; // removes deleted sessions from active set. +    Broker& broker;  }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 2ef1ed2de4..c851162046 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -32,6 +32,7 @@  #include "qpid/log/Statement.h"  #include <boost/bind.hpp> +#include <boost/lexical_cast.hpp>  namespace qpid {  namespace broker { @@ -45,59 +46,46 @@ using qpid::management::Manageable;  using qpid::management::Args;  SessionState::SessionState( -      SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack, string& _name)  -    : framing::SessionState(ack, timeout_ > 0), nextOut(0), -      factory(f), handler(h), id(true), timeout(timeout_), -      broker(h->getConnection().broker), -      version(h->getConnection().getVersion()), -      ignoring(false), name(_name), +    Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)  +    : qpid::SessionState(id, config), +      broker(b), handler(&h), +      ignoring(false),        semanticState(*this, *this),        adapter(semanticState),        msgBuilder(&broker.getStore(), broker.getStagingThreshold()), -      ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)),        enqueuedOp(boost::bind(&SessionState::enqueued, this, _1))  { -    getConnection().outputTasks.addOutputTask(&semanticState); -      Manageable* parent = broker.GetVhostObject (); - -    if (parent != 0) -    { +    if (parent != 0) {          ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - -        if (agent.get () != 0) -        { +        if (agent.get () != 0) {              mgmtObject = management::Session::shared_ptr -                (new management::Session (this, parent, name)); -            mgmtObject->set_attached (1); -            mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId()); -            mgmtObject->set_channelId (h->getChannel()); -            mgmtObject->set_detachedLifespan (getTimeout()); +                (new management::Session (this, parent, getId().getName())); +            mgmtObject->set_attached (0);              agent->addObject (mgmtObject);          }      } +    attach(h);  }  SessionState::~SessionState() {      // Remove ID from active session list. -    if (factory) -        factory->erase(getId()); +    // FIXME aconway 2008-05-12: Need to distinguish outgoing sessions established by bridge, +    // they don't belong in the manager. For now rely on uniqueness of UUIDs. +    //  +    broker.getSessionManager().forget(getId());      if (mgmtObject.get () != 0)          mgmtObject->resourceDestroy ();  } -SessionHandler* SessionState::getHandler() { -    return handler; -} -  AMQP_ClientProxy& SessionState::getProxy() {      assert(isAttached()); -    return getHandler()->getProxy(); +    return handler->getProxy();  }  ConnectionState& SessionState::getConnection() {      assert(isAttached()); -    return getHandler()->getConnection(); +    return handler->getConnection();  }  bool SessionState::isLocal(const ConnectionToken* t) const @@ -106,18 +94,19 @@ bool SessionState::isLocal(const ConnectionToken* t) const  }  void SessionState::detach() { -    getConnection().outputTasks.removeOutputTask(&semanticState); +    // activateOutput can be called in a different thread, lock to protect attached status      Mutex::ScopedLock l(lock); +    QPID_LOG(debug, getId() << ": detached on broker."); +    getConnection().outputTasks.removeOutputTask(&semanticState);      handler = 0;      if (mgmtObject.get() != 0) -    {          mgmtObject->set_attached  (0);      } -}  void SessionState::attach(SessionHandler& h) { -    { +    // activateOutput can be called in a different thread, lock to protect attached status          Mutex::ScopedLock l(lock); +    QPID_LOG(debug, getId() << ": attached on broker.");          handler = &h;          if (mgmtObject.get() != 0)          { @@ -126,16 +115,13 @@ void SessionState::attach(SessionHandler& h) {              mgmtObject->set_channelId (h.getChannel());          }      } -    h.getConnection().outputTasks.addOutputTask(&semanticState); -} -void SessionState::activateOutput() -{ +void SessionState::activateOutput() { +    // activateOutput can be called in a different thread, lock to protect attached status      Mutex::ScopedLock l(lock); -    if (isAttached()) { +    if (isAttached())           getConnection().outputTasks.activateOutput();      } -}      //This class could be used as the callback for queue notifications      //if not attached, it can simply ignore the callback, else pass it      //on to the connection @@ -155,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,      case management::Session::METHOD_DETACH :          if (handler != 0)          { -            handler->requestDetach(); +            handler->sendDetach();          }          status = Manageable::STATUS_OK;          break; @@ -179,35 +165,25 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,      return status;  } -void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id) -{ -    id = nextIn++; +void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {      Invoker::Result invocation = invoke(adapter, *method); -    completed.add(id);                                     -     +    receiverCompleted(id);                                          if (!invocation.wasHandled()) {          throw NotImplementedException(QPID_MSG("Not implemented: " << *method));      } else if (invocation.hasResult()) { -        nextOut++;//execution result is now a command, so the counter must be incremented          getProxy().getExecution().result(id, invocation.getResult());      }      if (method->isSync()) {           incomplete.process(enqueuedOp, true);          sendCompletion();       } -    //TODO: if window gets too large send unsolicited completion  } -void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) +void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)  { -    intrusive_ptr<Message> msg(msgBuilder.getMessage()); -    if (frame.getBof() && frame.getBos()) {//start of frameset -        id = nextIn++; +    if (frame.getBof() && frame.getBos()) //start of frameset          msgBuilder.start(id); -        msg = msgBuilder.getMessage(); -    } else {         -        id = msg->getCommandId(); -    } +    intrusive_ptr<Message> msg(msgBuilder.getMessage());      msgBuilder.handle(frame);      if (frame.getEof() && frame.getEos()) {//end of frameset          if (frame.getBof()) { @@ -240,19 +216,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)  void SessionState::enqueued(boost::intrusive_ptr<Message> msg)  { -    completed.add(msg->getCommandId()); -    if (msg->requiresAccept()) {         -        nextOut++;//accept is a command, so the counter must be incremented +    receiverCompleted(msg->getCommandId()); +    if (msg->requiresAccept())                   getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));              } -}  void SessionState::handle(AMQFrame& frame)  { -    if (ignoring) return; -    received(frame); - -    SequenceNumber commandId; +    SequenceNumber commandId = receiverGetCurrent();      try {          //TODO: make command handling more uniform, regardless of whether          //commands carry content. @@ -277,29 +248,36 @@ void SessionState::handle(AMQFrame& frame)          } else {              getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());          } -        timeout = 0;          ignoring = true; -        handler->requestDetach(); +        handler->sendDetach();      }  }  DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)  {      uint32_t maxFrameSize = getConnection().getFrameMax(); -    MessageDelivery::deliver(msg, getProxy().getHandler(), nextOut, token, maxFrameSize); -    return nextOut++; +    assert(senderGetCommandPoint().offset == 0); +    SequenceNumber commandId = senderGetCommandPoint().command; +    MessageDelivery::deliver(msg, getProxy().getHandler(), commandId, token, maxFrameSize); +    assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint. +    return commandId;  } -void SessionState::sendCompletion() -{ -    handler->sendCompletion(); +void SessionState::sendCompletion() { handler->sendCompletion(); } + +void SessionState::senderCompleted(const SequenceSet& commands) { +    qpid::SessionState::senderCompleted(commands); +    commands.for_each(boost::bind(&SemanticState::completed, &semanticState, _1, _2));  } -void SessionState::complete(const SequenceSet& commands)  -{ -    knownCompleted.add(commands); -    commands.for_each(ackOp); +void SessionState::readyToSend() { +    QPID_LOG(debug, getId() << ": ready to send, activating output."); +    assert(handler); +    sys::AggregateOutput& tasks = handler->getConnection().outputTasks; +    tasks.addOutputTask(&semanticState); +    tasks.activateOutput();  } +Broker& SessionState::getBroker() { return broker; }  }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index ae860e84c9..7b70789161 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -22,11 +22,9 @@   *   */ -#include "qpid/framing/Uuid.h" +#include "qpid/SessionState.h"  #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/SessionState.h"  #include "qpid/framing/SequenceSet.h" -#include "qpid/framing/ProtocolVersion.h"  #include "qpid/sys/Mutex.h"  #include "qpid/sys/Time.h"  #include "qpid/management/Manageable.h" @@ -63,21 +61,20 @@ class SessionManager;   * Broker-side session state includes sessions handler chains, which may   * themselves have state.    */ -class SessionState : public framing::SessionState, +class SessionState : public qpid::SessionState,       public SessionContext,      public DeliveryAdapter, -    public management::Manageable +                     public management::Manageable, +                     public framing::FrameHandler  {    public: +    SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&);      ~SessionState();      bool isAttached() const { return handler; }      void detach();      void attach(SessionHandler& handler); -     -    SessionHandler* getHandler(); -      /** @pre isAttached() */      framing::AMQP_ClientProxy& getProxy(); @@ -85,18 +82,15 @@ class SessionState : public framing::SessionState,      ConnectionState& getConnection();      bool isLocal(const ConnectionToken* t) const; -    uint32_t getTimeout() const { return timeout; } -    void setTimeout(uint32_t t) { timeout = t; } - -    Broker& getBroker() { return broker; } -    framing::ProtocolVersion getVersion() const { return version; } +    Broker& getBroker();      /** OutputControl **/      void activateOutput();      void handle(framing::AMQFrame& frame); -    void complete(const framing::SequenceSet& ranges);     +    void senderCompleted(const framing::SequenceSet& ranges); +          void sendCompletion();      //delivery adapter methods: @@ -107,29 +101,13 @@ class SessionState : public framing::SessionState,      management::Manageable::status_t          ManagementMethod (uint32_t methodId, management::Args& args); -    // Normally SessionManager creates sessions. -    SessionState(SessionManager*, -                 SessionHandler* out, -                 uint32_t timeout, -                 uint32_t ackInterval, -                 std::string& name); -     - -    framing::SequenceSet completed; -    framing::SequenceSet knownCompleted; -    framing::SequenceNumber nextIn; -    framing::SequenceNumber nextOut; +    void readyToSend();    private: -    typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;     -    SessionManager* factory; +    Broker& broker;      SessionHandler* handler;     -    framing::Uuid id; -    uint32_t timeout;      sys::AbsTime expiry;        // Used by SessionManager. -    Broker& broker; -    framing::ProtocolVersion version;      sys::Mutex lock;      bool ignoring;      std::string name; @@ -139,12 +117,11 @@ class SessionState : public framing::SessionState,      MessageBuilder msgBuilder;      IncompleteMessageList incomplete; -    RangedOperation ackOp;      IncompleteMessageList::CompletionListener enqueuedOp;      management::Session::shared_ptr mgmtObject; -    void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id); -    void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id); +    void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); +    void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);      void enqueued(boost::intrusive_ptr<Message> msg);      friend class SessionManager; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index bca6c49c13..59353d7637 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -71,9 +71,9 @@ struct ClusterDeliverHandler : public FrameHandler {      void handle(AMQFrame& f) {          next->handle(f); -        Mutex::ScopedLock l(sender.lock); -        sender.busy=false; -        sender.lock.notify(); +        Mutex::ScopedLock l(senderLock); +        senderBusy=false; +        senderLock.notify();      }  }; diff --git a/cpp/src/qpid/framing/SequenceNumber.cpp b/cpp/src/qpid/framing/SequenceNumber.cpp index cba00c860a..9e682d89e4 100644 --- a/cpp/src/qpid/framing/SequenceNumber.cpp +++ b/cpp/src/qpid/framing/SequenceNumber.cpp @@ -21,6 +21,7 @@  #include "SequenceNumber.h"  #include "Buffer.h" +#include <ostream>  using qpid::framing::SequenceNumber;  using qpid::framing::Buffer; @@ -102,4 +103,8 @@ int32_t operator-(const SequenceNumber& a, const SequenceNumber& b)      return result;  } +std::ostream& operator<<(std::ostream& o, const SequenceNumber& n) { +    return o << n.getValue(); +} +  }} diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h index d659bec5c1..aacd77501b 100644 --- a/cpp/src/qpid/framing/SequenceNumber.h +++ b/cpp/src/qpid/framing/SequenceNumber.h @@ -22,6 +22,7 @@  #define _framing_SequenceNumber_h  #include "amqp_types.h" +#include <iosfwd>  namespace qpid {  namespace framing { @@ -66,6 +67,8 @@ struct Window      SequenceNumber lwm;  }; +std::ostream& operator<<(std::ostream& o, const SequenceNumber& n); +  }} // namespace qpid::framing diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index dfda9ecae1..aeff35dbf0 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -150,7 +150,7 @@ QPID_AUTO_TEST_CASE(testDispatcherThread)      ClientSessionFixture fix;      fix.session =fix.connection.newSession(ASYNC);      fix.declareSubscribe(); -    size_t count = 1000; +    size_t count = 10;      DummyListener listener(fix.session, "my-dest", count);      sys::Thread t(listener);      for (size_t i = 0; i < count; ++i) { @@ -205,7 +205,7 @@ QPID_AUTO_TEST_CASE(testSendToSelf) {      sys::Thread runner(fix.subs);//start dispatcher thread      string data("msg");      Message msg(data, "myq"); -    const uint count=1000; +    const uint count=10;      for (uint i = 0; i < count; ++i) {          fix.session.messageTransfer(content=msg);      } diff --git a/cpp/src/tests/SessionState.cpp b/cpp/src/tests/SessionState.cpp index 71b90ea9f1..4beef87cfe 100644 --- a/cpp/src/tests/SessionState.cpp +++ b/cpp/src/tests/SessionState.cpp @@ -58,7 +58,7 @@ string str(const AMQFrame& f) {      return "H";                 // Must be a header.  }  // Make a string from a range of frames. -string str(const vector<AMQFrame>& frames) { +string str(const boost::iterator_range<vector<AMQFrame>::const_iterator>& frames) {      string (*strFrame)(const AMQFrame&) = str;      return applyAccumulate(frames.begin(), frames.end(), string(), ptr_fun(strFrame));  } @@ -84,7 +84,7 @@ AMQFrame contentFrameChar(char content, bool isLast=true) {  }  // Send frame & return size of frame. -size_t send(qpid::SessionState& s, const AMQFrame& f) { s.sender.record(f); return f.size(); } +size_t send(qpid::SessionState& s, const AMQFrame& f) { s.senderRecord(f); return f.size(); }  // Send transfer command with no content.  size_t transfer0(qpid::SessionState& s) { return send(s, transferFrame(false)); }  // Send transfer frame with single content frame. @@ -126,136 +126,132 @@ using qpid::SessionPoint;  QPID_AUTO_TEST_CASE(testSendGetReplyList) {      qpid::SessionState s; -    s.sender.getCommandPoint(); +    s.senderGetCommandPoint();      transfer1(s, "abc");      transfers(s, "def");      transferN(s, "xyz"); -    BOOST_CHECK_EQUAL(str(s.sender.getReplayList()),"CabcCdCeCfCxyz"); +    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(0,0))),"CabcCdCeCfCxyz");      // Ignore controls. -    s.sender.record(AMQFrame(in_place<SessionFlushBody>())); -    BOOST_CHECK_EQUAL(str(s.sender.getReplayList()),"CabcCdCeCfCxyz");     +    s.senderRecord(AMQFrame(in_place<SessionFlushBody>())); +    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(2,0))),"CeCfCxyz");      }  QPID_AUTO_TEST_CASE(testNeedFlush) {      qpid::SessionState::Configuration c;      // sync after 2 1-byte transfers or equivalent bytes. -    c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize()); +    c.replayFlushLimit = 2*(transferFrameSize()+contentFrameSize());      qpid::SessionState s(SessionId(), c); -    s.sender.getCommandPoint(); +    s.senderGetCommandPoint();      transfers(s, "a"); -    BOOST_CHECK(!s.sender.needFlush()); +    BOOST_CHECK(!s.senderNeedFlush());      transfers(s, "b"); -    BOOST_CHECK(s.sender.needFlush()); -    s.sender.recordFlush(); -    BOOST_CHECK(!s.sender.needFlush()); +    BOOST_CHECK(s.senderNeedFlush()); +    s.senderRecordFlush(); +    BOOST_CHECK(!s.senderNeedFlush());      transfers(s, "c"); -    BOOST_CHECK(!s.sender.needFlush()); +    BOOST_CHECK(!s.senderNeedFlush());      transfers(s, "d"); -    BOOST_CHECK(s.sender.needFlush()); -    BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCcCd"); +    BOOST_CHECK(s.senderNeedFlush()); +    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint())), "CaCbCcCd");  }  QPID_AUTO_TEST_CASE(testPeerConfirmed) {      qpid::SessionState::Configuration c;      // sync after 2 1-byte transfers or equivalent bytes. -    c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize()); +    c.replayFlushLimit = 2*(transferFrameSize()+contentFrameSize());      qpid::SessionState s(SessionId(), c); -    s.sender.getCommandPoint(); +    s.senderGetCommandPoint();      transfers(s, "ab"); -    BOOST_CHECK(s.sender.needFlush()); +    BOOST_CHECK(s.senderNeedFlush());      transfers(s, "cd"); -    BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCcCd"); -    s.sender.confirmed(SessionPoint(3)); -    BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cd"); -    BOOST_CHECK(!s.sender.needFlush()); - -    // Never go backwards. -    s.sender.confirmed(SessionPoint(2)); -    s.sender.confirmed(SessionPoint(3)); +    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(0,0))), "CaCbCcCd"); +    s.senderConfirmed(SessionPoint(3)); +    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(3,0))), "Cd"); +    BOOST_CHECK(!s.senderNeedFlush());      // Multi-frame transfer.      transfer1(s, "efg");      transfers(s, "xy"); -    BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CdCefgCxCy"); -    BOOST_CHECK(s.sender.needFlush()); +    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(3,0))), "CdCefgCxCy"); +    BOOST_CHECK(s.senderNeedFlush()); -    s.sender.confirmed(SessionPoint(4)); -    BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CefgCxCy"); -    BOOST_CHECK(s.sender.needFlush()); +    s.senderConfirmed(SessionPoint(4)); +    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(4,0))), "CefgCxCy"); +    BOOST_CHECK(s.senderNeedFlush()); -    s.sender.confirmed(SessionPoint(5)); -    BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CxCy"); -    BOOST_CHECK(s.sender.needFlush()); +    s.senderConfirmed(SessionPoint(5)); +    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(5,0))), "CxCy"); +    BOOST_CHECK(s.senderNeedFlush()); -    s.sender.confirmed(SessionPoint(6)); -    BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cy"); -    BOOST_CHECK(!s.sender.needFlush()); +    s.senderConfirmed(SessionPoint(6)); +    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(6,0))), "Cy"); +    BOOST_CHECK(!s.senderNeedFlush());  }  QPID_AUTO_TEST_CASE(testPeerCompleted) {      qpid::SessionState s; -    s.sender.getCommandPoint(); +    s.senderGetCommandPoint();      // Completion implies confirmation       transfers(s, "abc"); -    BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCc"); +    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(0,0))), "CaCbCc");      SequenceSet set(SequenceSet() + 0 + 1); -    s.sender.completed(set); -    BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cc"); +    s.senderCompleted(set); +    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(2,0))), "Cc");      transfers(s, "def");      // We dont do out-of-order confirmation, so this will only confirm up to 3:      set = SequenceSet(SequenceSet() + 2 + 3 + 5); -    s.sender.completed(set);     -    BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CeCf"); +    s.senderCompleted(set);     +    BOOST_CHECK_EQUAL(str(s.senderExpected(SessionPoint(4,0))), "CeCf");  }  QPID_AUTO_TEST_CASE(testReceive) {      // Advance expected/received correctly      qpid::SessionState s; -    s.receiver.setCommandPoint(SessionPoint()); -    BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(0)); -    BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(0)); +    s.receiverSetCommandPoint(SessionPoint()); +    BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(0)); +    BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(0)); -    BOOST_CHECK(s.receiver.record(transferFrame(false))); -    BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(1)); -    BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(1)); +    BOOST_CHECK(s.receiverRecord(transferFrame(false))); +    BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(1)); +    BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(1)); -    BOOST_CHECK(s.receiver.record(transferFrame(true))); +    BOOST_CHECK(s.receiverRecord(transferFrame(true)));      SessionPoint point = SessionPoint(1, transferFrameSize()); -    BOOST_CHECK_EQUAL(s.receiver.getExpected(), point); -    BOOST_CHECK_EQUAL(s.receiver.getReceived(), point); -    BOOST_CHECK(s.receiver.record(contentFrame("", false))); +    BOOST_CHECK_EQUAL(s.receiverGetExpected(), point); +    BOOST_CHECK_EQUAL(s.receiverGetReceived(), point); +    BOOST_CHECK(s.receiverRecord(contentFrame("", false)));      point.offset += contentFrameSize(0); -    BOOST_CHECK_EQUAL(s.receiver.getExpected(), point); -    BOOST_CHECK_EQUAL(s.receiver.getReceived(), point); -    BOOST_CHECK(s.receiver.record(contentFrame("", true))); -    BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(2)); -    BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(2)); +    BOOST_CHECK_EQUAL(s.receiverGetExpected(), point); +    BOOST_CHECK_EQUAL(s.receiverGetReceived(), point); +    BOOST_CHECK(s.receiverRecord(contentFrame("", true))); +    BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(2)); +    BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(2));      // Idempotence barrier, rewind expected & receive some duplicates. -    s.receiver.setCommandPoint(SessionPoint(1)); -    BOOST_CHECK(!s.receiver.record(transferFrame(false))); -    BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(2)); -    BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(2)); -    BOOST_CHECK(s.receiver.record(transferFrame(false))); -    BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(3)); -    BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(3)); +    s.receiverSetCommandPoint(SessionPoint(1)); +    BOOST_CHECK(!s.receiverRecord(transferFrame(false))); +    BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(2)); +    BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(2)); +    BOOST_CHECK(s.receiverRecord(transferFrame(false))); +    BOOST_CHECK_EQUAL(s.receiverGetExpected(), SessionPoint(3)); +    BOOST_CHECK_EQUAL(s.receiverGetReceived(), SessionPoint(3));  }  QPID_AUTO_TEST_CASE(testCompleted) {      // completed & unknownCompleted      qpid::SessionState s; -    s.receiver.setCommandPoint(SessionPoint()); -    s.receiver.record(transferFrame(false)); -    s.receiver.record(transferFrame(false)); -    s.receiver.record(transferFrame(false)); -    s.receiver.completed(1); -    BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), SequenceSet(SequenceSet()+1)); -    s.receiver.completed(0); -    BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), +    s.receiverSetCommandPoint(SessionPoint()); +    s.receiverRecord(transferFrame(false)); +    s.receiverRecord(transferFrame(false)); +    s.receiverRecord(transferFrame(false)); +    s.receiverCompleted(1); +    BOOST_CHECK_EQUAL(s.receiverGetUnknownComplete(), SequenceSet(SequenceSet()+1)); +    s.receiverCompleted(0); +    BOOST_CHECK_EQUAL(s.receiverGetUnknownComplete(),                        SequenceSet(SequenceSet() + SequenceSet::Range(0,2))); -    s.receiver.knownCompleted(SequenceSet(SequenceSet()+1)); -    BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), SequenceSet(SequenceSet()+2)); +    s.receiverKnownCompleted(SequenceSet(SequenceSet()+1)); +    BOOST_CHECK_EQUAL(s.receiverGetUnknownComplete(), SequenceSet(SequenceSet()+2));      // TODO aconway 2008-04-30: missing tests for known-completed.  } diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 98e34be0e9..0f52165587 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -57,7 +57,7 @@ def remote_port():  class Helper:      def __init__(self, parent):          self.parent = parent -        self.session = parent.conn.session("2") +        self.session = parent.conn.session("Helper")          self.mc  = managementClient(self.session.spec)          self.mch = self.mc.addChannel(self.session)          self.mc.syncWaitForStable(self.mch) @@ -126,7 +126,7 @@ class FederationTests(TestBase010):          #send messages to remote broker and confirm it is routed to local broker          r_conn = self.connect(host=remote_host(), port=remote_port()) -        r_session = r_conn.session("1") +        r_session = r_conn.session("test_pull_from_exchange")          for i in range(1, 11):              dp = r_session.delivery_properties(routing_key="my-key") @@ -153,7 +153,7 @@ class FederationTests(TestBase010):          #setup queue on remote broker and add some messages          r_conn = self.connect(host=remote_host(), port=remote_port()) -        r_session = r_conn.session("1") +        r_session = r_conn.session("test_pull_from_queue")          r_session.queue_declare(queue="my-bridge-queue", exclusive=True, auto_delete=True)          for i in range(1, 6):              dp = r_session.delivery_properties(routing_key="my-bridge-queue") @@ -223,7 +223,7 @@ class FederationTests(TestBase010):          #send messages to remote broker and confirm it is routed to local broker          r_conn = self.connect(host=remote_host(), port=remote_port()) -        r_session = r_conn.session("1") +        r_session = r_conn.session("test_tracing")          trace = [None, "exclude-me", "a,exclude-me,b", "also-exclude-me,c", "dont-exclude-me"]          body = ["yes", "first-bad", "second-bad", "third-bad", "yes"]  | 
