diff options
Diffstat (limited to 'cpp/src/qpid/amqp_0_10/SessionHandler.cpp')
| -rw-r--r-- | cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 61 |
1 files changed, 28 insertions, 33 deletions
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 2f6b59e901..f58d59cbd7 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -34,6 +34,8 @@ namespace amqp_0_10 { using namespace framing; using namespace std; +#define CHECK_ATTACHED(MSG) if (!getState()) throw NotAttachedException(QPID_MSG(MSG << ": channel " << channel.get() << " is not attached")) + SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch) : channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {} @@ -61,14 +63,6 @@ session::DetachCode convert(uint8_t code) { } // namespace -void SessionHandler::checkAttached() { - if (!getState()) - throw NotAttachedException( - QPID_MSG("Channel " << channel.get() << " is not attached")); - assert(getInHandler()); - assert(channel.next); -} - void SessionHandler::invoke(const AMQMethodBody& m) { framing::invoke(*this, m); } @@ -82,7 +76,7 @@ void SessionHandler::handleIn(AMQFrame& f) { else if (isSessionControl(m)) invoke(*m); else { - checkAttached(); + CHECK_ATTACHED("receiving " << f); if (!receiveReady) throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data")); if (!getState()->receiverRecord(f)) @@ -126,7 +120,7 @@ bool isCommand(const AMQFrame& f) { } // namespace void SessionHandler::handleOut(AMQFrame& f) { - checkAttached(); + CHECK_ATTACHED("sending " << f); if (!sendReady) throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data")); getState()->senderRecord(f); @@ -137,14 +131,6 @@ void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); } -void SessionHandler::checkName(const std::string& name) { - checkAttached(); - if (name != getState()->getId().getName()) - throw InvalidArgumentException( - QPID_MSG("Incorrect session name: " << name - << ", expecting: " << getState()->getId().getName())); -} - void SessionHandler::attach(const std::string& name_, bool force) { // Save the name for possible session-busy exception. Session-busy // can be thrown before we have attached the handler to a valid @@ -164,18 +150,27 @@ void SessionHandler::attach(const std::string& name_, bool force) { sendCommandPoint(getState()->senderGetCommandPoint()); } +#define CHECK_NAME(NAME, MSG) do { \ + CHECK_ATTACHED(MSG); \ + if (NAME != getState()->getId().getName()) \ + throw InvalidArgumentException( \ + QPID_MSG(MSG << ": incorrect session name: " << NAME \ + << ", expecting: " << getState()->getId().getName())); \ + } while(0) + + void SessionHandler::attached(const std::string& name) { - checkName(name); + CHECK_NAME(name, "session.attached"); } void SessionHandler::detach(const std::string& name) { - checkName(name); + CHECK_NAME(name, "session.detach"); peer.detached(name, session::DETACH_CODE_NORMAL); handleDetach(); } void SessionHandler::detached(const std::string& name, uint8_t code) { - checkName(name); + CHECK_NAME(name, "session.detached"); ignoring = false; if (code != session::DETACH_CODE_NORMAL) channelException(convert(code), "session.detached from peer."); @@ -189,18 +184,18 @@ void SessionHandler::handleDetach() { } void SessionHandler::requestTimeout(uint32_t t) { - checkAttached(); + CHECK_ATTACHED("session.request-timeout"); getState()->setTimeout(t); peer.timeout(t); } void SessionHandler::timeout(uint32_t t) { - checkAttached(); + CHECK_ATTACHED("session.request-timeout"); getState()->setTimeout(t); } void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) { - checkAttached(); + CHECK_ATTACHED("session.command-point"); getState()->receiverSetCommandPoint(SessionPoint(id, offset)); if (!receiveReady) { receiveReady = true; @@ -209,7 +204,7 @@ void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) { } void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) { - checkAttached(); + CHECK_ATTACHED("session.expected"); if (getState()->hasState()) { // Replay if (commands.empty()) throw IllegalStateException( QPID_MSG(getState()->getId() << ": has state but client is attaching as new session.")); @@ -225,14 +220,14 @@ void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragme } void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) { - checkAttached(); + CHECK_ATTACHED("session.confirmed"); // Ignore non-contiguous confirmations. if (!commands.empty() && commands.front() >= getState()->senderGetReplayPoint()) getState()->senderConfirmed(commands.rangesBegin()->last()); } void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) { - checkAttached(); + CHECK_ATTACHED("session.completed"); getState()->senderCompleted(commands); if (getState()->senderNeedKnownCompleted() || timelyReply) { peer.knownCompleted(commands); @@ -241,12 +236,12 @@ void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) { } void SessionHandler::knownCompleted(const SequenceSet& commands) { - checkAttached(); + CHECK_ATTACHED("session.known-completed"); getState()->receiverKnownCompleted(commands); } void SessionHandler::flush(bool expected, bool confirmed, bool completed) { - checkAttached(); + CHECK_ATTACHED("session.flush"); if (expected) { SequenceSet expectSet; if (getState()->hasState()) @@ -270,19 +265,19 @@ void SessionHandler::gap(const SequenceSet& /*commands*/) { void SessionHandler::sendDetach() { - checkAttached(); + CHECK_ATTACHED("session.sendDetach"); ignoring = true; peer.detach(getState()->getId().getName()); } void SessionHandler::sendCompletion() { - checkAttached(); + CHECK_ATTACHED("session.send-completion"); const SequenceSet& c = getState()->receiverGetUnknownComplete(); peer.completed(c, getState()->receiverNeedKnownCompleted()); } void SessionHandler::sendAttach(bool force) { - checkAttached(); + CHECK_ATTACHED("session.send-attach"); QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId()); peer.attach(getState()->getId().getName(), force); if (getState()->hasState()) @@ -306,7 +301,7 @@ void SessionHandler::markReadyToSend() { } void SessionHandler::sendTimeout(uint32_t t) { - checkAttached(); + CHECK_ATTACHED("session.send-timeout"); peer.requestTimeout(t); } |
