summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp24
1 files changed, 20 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index bbdbccad7d..fb46cb522d 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -23,6 +23,7 @@
#include "Connection.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
+#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
@@ -57,17 +58,19 @@ void SessionHandler::handleIn(AMQFrame& f) {
//
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (m && invoke(*this, *m))
+ if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
return;
- else if (session.get()) {
+ } else if (session.get()) {
boost::optional<SequenceNumber> ack=session->received(f);
session->in.handle(f);
if (ack)
peerSession.ack(*ack, SequenceNumberSet());
- }
- else if (!ignoring)
+ } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+ return;
+ } else if (!ignoring) {
throw ChannelErrorException(
QPID_MSG("Channel " << channel.get() << " is not open"));
+ }
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
session->detach();
@@ -188,4 +191,17 @@ void SessionHandler::solicitAck() {
peerSession.ack(session->sendingAck(), SequenceNumberSet());
}
+void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+{
+ std::auto_ptr<SessionState> state(
+ connection.broker.getSessionManager().open(*this, detachedLifetime));
+ session.reset(state.release());
+}
+
+void SessionHandler::detached()
+{
+ connection.broker.getSessionManager().suspend(session);
+ session.reset();
+}
+
}} // namespace qpid::broker