summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/amqp_0_10/SessionHandler.cpp')
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.cpp61
1 files changed, 28 insertions, 33 deletions
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 2f6b59e901..f58d59cbd7 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -34,6 +34,8 @@ namespace amqp_0_10 {
using namespace framing;
using namespace std;
+#define CHECK_ATTACHED(MSG) if (!getState()) throw NotAttachedException(QPID_MSG(MSG << ": channel " << channel.get() << " is not attached"))
+
SessionHandler::SessionHandler(FrameHandler* out, ChannelId ch)
: channel(ch, out), peer(channel), ignoring(false), sendReady(), receiveReady() {}
@@ -61,14 +63,6 @@ session::DetachCode convert(uint8_t code) {
} // namespace
-void SessionHandler::checkAttached() {
- if (!getState())
- throw NotAttachedException(
- QPID_MSG("Channel " << channel.get() << " is not attached"));
- assert(getInHandler());
- assert(channel.next);
-}
-
void SessionHandler::invoke(const AMQMethodBody& m) {
framing::invoke(*this, m);
}
@@ -82,7 +76,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
else if (isSessionControl(m))
invoke(*m);
else {
- checkAttached();
+ CHECK_ATTACHED("receiving " << f);
if (!receiveReady)
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data"));
if (!getState()->receiverRecord(f))
@@ -126,7 +120,7 @@ bool isCommand(const AMQFrame& f) {
} // namespace
void SessionHandler::handleOut(AMQFrame& f) {
- checkAttached();
+ CHECK_ATTACHED("sending " << f);
if (!sendReady)
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data"));
getState()->senderRecord(f);
@@ -137,14 +131,6 @@ void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f);
}
-void SessionHandler::checkName(const std::string& name) {
- checkAttached();
- if (name != getState()->getId().getName())
- throw InvalidArgumentException(
- QPID_MSG("Incorrect session name: " << name
- << ", expecting: " << getState()->getId().getName()));
-}
-
void SessionHandler::attach(const std::string& name_, bool force) {
// Save the name for possible session-busy exception. Session-busy
// can be thrown before we have attached the handler to a valid
@@ -164,18 +150,27 @@ void SessionHandler::attach(const std::string& name_, bool force) {
sendCommandPoint(getState()->senderGetCommandPoint());
}
+#define CHECK_NAME(NAME, MSG) do { \
+ CHECK_ATTACHED(MSG); \
+ if (NAME != getState()->getId().getName()) \
+ throw InvalidArgumentException( \
+ QPID_MSG(MSG << ": incorrect session name: " << NAME \
+ << ", expecting: " << getState()->getId().getName())); \
+ } while(0)
+
+
void SessionHandler::attached(const std::string& name) {
- checkName(name);
+ CHECK_NAME(name, "session.attached");
}
void SessionHandler::detach(const std::string& name) {
- checkName(name);
+ CHECK_NAME(name, "session.detach");
peer.detached(name, session::DETACH_CODE_NORMAL);
handleDetach();
}
void SessionHandler::detached(const std::string& name, uint8_t code) {
- checkName(name);
+ CHECK_NAME(name, "session.detached");
ignoring = false;
if (code != session::DETACH_CODE_NORMAL)
channelException(convert(code), "session.detached from peer.");
@@ -189,18 +184,18 @@ void SessionHandler::handleDetach() {
}
void SessionHandler::requestTimeout(uint32_t t) {
- checkAttached();
+ CHECK_ATTACHED("session.request-timeout");
getState()->setTimeout(t);
peer.timeout(t);
}
void SessionHandler::timeout(uint32_t t) {
- checkAttached();
+ CHECK_ATTACHED("session.request-timeout");
getState()->setTimeout(t);
}
void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) {
- checkAttached();
+ CHECK_ATTACHED("session.command-point");
getState()->receiverSetCommandPoint(SessionPoint(id, offset));
if (!receiveReady) {
receiveReady = true;
@@ -209,7 +204,7 @@ void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) {
}
void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) {
- checkAttached();
+ CHECK_ATTACHED("session.expected");
if (getState()->hasState()) { // Replay
if (commands.empty()) throw IllegalStateException(
QPID_MSG(getState()->getId() << ": has state but client is attaching as new session."));
@@ -225,14 +220,14 @@ void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragme
}
void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) {
- checkAttached();
+ CHECK_ATTACHED("session.confirmed");
// Ignore non-contiguous confirmations.
if (!commands.empty() && commands.front() >= getState()->senderGetReplayPoint())
getState()->senderConfirmed(commands.rangesBegin()->last());
}
void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) {
- checkAttached();
+ CHECK_ATTACHED("session.completed");
getState()->senderCompleted(commands);
if (getState()->senderNeedKnownCompleted() || timelyReply) {
peer.knownCompleted(commands);
@@ -241,12 +236,12 @@ void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) {
}
void SessionHandler::knownCompleted(const SequenceSet& commands) {
- checkAttached();
+ CHECK_ATTACHED("session.known-completed");
getState()->receiverKnownCompleted(commands);
}
void SessionHandler::flush(bool expected, bool confirmed, bool completed) {
- checkAttached();
+ CHECK_ATTACHED("session.flush");
if (expected) {
SequenceSet expectSet;
if (getState()->hasState())
@@ -270,19 +265,19 @@ void SessionHandler::gap(const SequenceSet& /*commands*/) {
void SessionHandler::sendDetach()
{
- checkAttached();
+ CHECK_ATTACHED("session.sendDetach");
ignoring = true;
peer.detach(getState()->getId().getName());
}
void SessionHandler::sendCompletion() {
- checkAttached();
+ CHECK_ATTACHED("session.send-completion");
const SequenceSet& c = getState()->receiverGetUnknownComplete();
peer.completed(c, getState()->receiverNeedKnownCompleted());
}
void SessionHandler::sendAttach(bool force) {
- checkAttached();
+ CHECK_ATTACHED("session.send-attach");
QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId());
peer.attach(getState()->getId().getName(), force);
if (getState()->hasState())
@@ -306,7 +301,7 @@ void SessionHandler::markReadyToSend() {
}
void SessionHandler::sendTimeout(uint32_t t) {
- checkAttached();
+ CHECK_ATTACHED("session.send-timeout");
peer.requestTimeout(t);
}