summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp32
1 files changed, 24 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 64d62934b9..3c6bed4344 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -25,6 +25,9 @@
#include "SemanticHandler.h"
#include "SessionManager.h"
#include "SessionHandler.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
@@ -182,7 +185,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber&
completed.add(id);
if (!invocation.wasHandled()) {
- throw NotImplementedException("Not implemented");
+ throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
nextOut++;//execution result is now a command, so the counter must be incremented
getProxy().getExecution010().result(id, invocation.getResult());
@@ -206,6 +209,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
}
msgBuilder.handle(frame);
if (frame.getEof() && frame.getEos()) {//end of frameset
+ if (frame.getBof()) {
+ //i.e this is a just a command frame, add a dummy header
+ AMQFrame header;
+ header.setBody(AMQHeaderBody());
+ header.setBof(false);
+ header.setEof(false);
+ msg->getFrames().append(header);
+ }
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
@@ -242,13 +253,14 @@ void SessionState::handle(AMQFrame& frame)
SequenceNumber commandId;
try {
//TODO: make command handling more uniform, regardless of whether
- //commands carry content. (For now, assume all single frame
- //assemblies are non-content bearing and all content-bearing
- //assemblies will have more than one frame):
- if (frame.getBof() && frame.getEof()) {
- handleCommand(frame.getMethod(), commandId);
- } else {
+ //commands carry content.
+ AMQMethodBody* m = frame.getMethod();
+ if (m == 0 || m->isContentBearing()) {
handleContent(frame, commandId);
+ } else if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod(), commandId);
+ } else {
+ throw InternalErrorException("Cannot handle multi-frame command segments yet");
}
} catch(const SessionException& e) {
//TODO: better implementation of new exception handling mechanism
@@ -263,7 +275,11 @@ void SessionState::handle(AMQFrame& frame)
} else {
getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
}
- handler->destroy();
+ timeout = 0;
+ //The python client doesn't currently detach on receiving an exception
+ //so the session state isn't destroyed. This is a temporary workaround
+ //until that is addressed
+ adapter.destroyExclusiveQueues();
}
}