diff options
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 31 | ||||
| -rw-r--r-- | cpp/src/qpid/amqp_0_10/SessionHandler.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 23 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 1 | 
6 files changed, 52 insertions, 29 deletions
| diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index c33573b178..014e057968 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -35,14 +35,14 @@ using namespace framing;  using namespace std;  void SessionHandler::checkAttached() { -    if (!getState()) { -        ignoring = true; +    if (!getState())           throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached")); -    }  }  SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch) -    : channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {} +    : channel(ch, out), peer(channel), +      awaitingDetached(false), +      sendReady(), receiveReady() {}  SessionHandler::~SessionHandler() {} @@ -50,7 +50,7 @@ namespace {  bool isSessionControl(AMQMethodBody* m) {      return m &&          m->amqpClassId() == SESSION_CLASS_ID; -} +    }  bool isSessionDetachedControl(AMQMethodBody* m) {      return isSessionControl(m) &&          m->amqpMethodId() == SESSION_DETACHED_METHOD_ID; @@ -76,12 +76,13 @@ void SessionHandler::handleIn(AMQFrame& f) {      // Note on channel states: a channel is attached if session != 0      AMQMethodBody* m = f.getBody()->getMethod();      try { -        if (ignoring && !isSessionDetachedControl(m)) -            return; -        else if (isSessionControl(m)) +        if (isSessionControl(m)) {              invoke(*m); +        }          else { -            checkAttached(); +            // Drop frames if we are awaiting a detached control or +            // if we are currently detached. +            if (awaitingDetached || !getState()) return;                if (!receiveReady)                  throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data"));              if (!getState()->receiverRecord(f)) @@ -142,11 +143,11 @@ void SessionHandler::attach(const std::string& name_, bool force) {      // Save the name for possible session-busy exception. Session-busy      // can be thrown before we have attached the handler to a valid      // SessionState, and in that case we need the name to send peer.detached -    name = name_;                +    name = name_;      if (getState() && name == getState()->getId().getName())          return;                 // Idempotent      if (getState()) -        throw TransportBusyException( +            throw TransportBusyException(              QPID_MSG("Channel " << channel.get() << " already attached to " << getState()->getId()));      setState(name, force);      QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId()); @@ -157,8 +158,8 @@ void SessionHandler::attach(const std::string& name_, bool force) {          sendCommandPoint(getState()->senderGetCommandPoint());  } -#define CHECK_NAME(NAME, MSG) do {                                       \ -    checkAttached();                                                \ +#define CHECK_NAME(NAME, MSG) do {                                      \ +    checkAttached();                                                    \      if (NAME != getState()->getId().getName())                          \          throw InvalidArgumentException(                                 \              QPID_MSG(MSG << ": incorrect session name: " << NAME        \ @@ -178,7 +179,7 @@ void SessionHandler::detach(const std::string& name) {  void SessionHandler::detached(const std::string& name, uint8_t code) {      CHECK_NAME(name, "session.detached"); -    ignoring = false; +    awaitingDetached = false;      if (code != session::DETACH_CODE_NORMAL)          channelException(convert(code), "session.detached from peer.");      else { @@ -273,7 +274,7 @@ void SessionHandler::gap(const SequenceSet& /*commands*/) {  void SessionHandler::sendDetach()  {      checkAttached(); -    ignoring = true; +    awaitingDetached = true;      peer.detach(getState()->getId().getName());  } diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.h b/cpp/src/qpid/amqp_0_10/SessionHandler.h index 3ecc5e6141..fa6e6f4af6 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.h +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -101,14 +101,15 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,      QPID_COMMON_EXTERN virtual void handleOut(framing::AMQFrame&);      framing::ChannelHandler channel; -    framing::AMQP_AllProxy::Session  peer; -    bool ignoring; -    bool sendReady, receiveReady; -    std::string name;    private:      void checkAttached();      void sendCommandPoint(const SessionPoint&); + +    framing::AMQP_AllProxy::Session  peer; +    std::string name; +    bool awaitingDetached; +    bool sendReady, receiveReady;  };  }} // namespace qpid::amqp_0_10 diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index c56d6a6807..6a46cb6249 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -95,11 +95,21 @@ ConnectionImpl::~ConnectionImpl() {  void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel)  {      Mutex::ScopedLock l(lock); -    session->setChannel(channel == NEXT_CHANNEL ? nextChannel++ : channel); -    boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()]; -    boost::shared_ptr<SessionImpl> ss = s.lock(); -    if (ss) throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attached to " << ss->getId())); -    s = session; +    for (uint16_t i = 0; i < NEXT_CHANNEL; i++) { //will at most search through channels once +        uint16_t c = channel == NEXT_CHANNEL ? nextChannel++ : channel; +        boost::weak_ptr<SessionImpl>& s = sessions[c]; +        boost::shared_ptr<SessionImpl> ss = s.lock(); +        if (!ss) { +            //channel is free, we can assign it to this session +            session->setChannel(c); +            s = session; +            return; +        } else if (channel != NEXT_CHANNEL) { +            //channel is taken and was requested explicitly so don't look for another +            throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attached to " << ss->getId())); +        } //else channel is busy, but we can keep looking for a free one +    } +  }  void ConnectionImpl::handle(framing::AMQFrame& frame) @@ -165,7 +175,6 @@ void ConnectionImpl::open()      } else {          QPID_LOG(debug, "No security layer in place");      } -      failover.reset(new FailoverListener(shared_from_this(), handler.knownBrokersUrls));  } @@ -246,7 +255,7 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings()  {      return handler;  } -     +  std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {      return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;  } diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 32541dceac..d443a1170b 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -65,7 +65,8 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm        nextIn(0),        nextOut(0),        sendMsgCredit(0), -      doClearDeliveryPropertiesExchange(true) +      doClearDeliveryPropertiesExchange(true), +      autoDetach(true)  {      channel.next = connectionShared.get();  } @@ -73,8 +74,11 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm  SessionImpl::~SessionImpl() {      {          Lock l(state); -        if (state != DETACHED) { -            QPID_LOG(warning, "Session was not closed cleanly"); +        if (state != DETACHED && state != DETACHING) { +            QPID_LOG(warning, "Session was not closed cleanly: " << id); +            // Inform broker but don't wait for detached as that deadlocks. +            // The detached will be ignored as the channel will be invalid. +            if (autoDetach) detach();              setState(DETACHED);              handleClosed();              state.waitWaiters(); @@ -816,4 +820,6 @@ boost::shared_ptr<ConnectionImpl> SessionImpl::getConnection()      return connectionWeak.lock();  } +void SessionImpl::disableAutoDetach() { autoDetach = false; } +  }} diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 0624bb8b3c..cbd0742045 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -132,6 +132,9 @@ public:      void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; } +    /** Suppress sending detach in destructor. Used by cluster to build session state */ +    void disableAutoDetach(); +  private:      enum State {          INACTIVE, @@ -247,6 +250,8 @@ private:      bool doClearDeliveryPropertiesExchange; +    bool autoDetach; +        friend class client::SessionHandler;  }; diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index d6df8bd5ac..a9761962e8 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -313,6 +313,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {      // Create a client session to update session state.       boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);      boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); +    simpl->disableAutoDetach();      client::SessionBase_0_10Access(shadowSession).set(simpl);      AMQP_AllProxy::ClusterConnection proxy(simpl->out); | 
