diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 32 |
1 files changed, 24 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 64d62934b9..3c6bed4344 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -25,6 +25,9 @@ #include "SemanticHandler.h" #include "SessionManager.h" #include "SessionHandler.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/ServerInvoker.h" @@ -182,7 +185,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& completed.add(id); if (!invocation.wasHandled()) { - throw NotImplementedException("Not implemented"); + throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { nextOut++;//execution result is now a command, so the counter must be incremented getProxy().getExecution010().result(id, invocation.getResult()); @@ -206,6 +209,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) } msgBuilder.handle(frame); if (frame.getEof() && frame.getEos()) {//end of frameset + if (frame.getBof()) { + //i.e this is a just a command frame, add a dummy header + AMQFrame header; + header.setBody(AMQHeaderBody()); + header.setBof(false); + header.setEof(false); + msg->getFrames().append(header); + } msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); @@ -242,13 +253,14 @@ void SessionState::handle(AMQFrame& frame) SequenceNumber commandId; try { //TODO: make command handling more uniform, regardless of whether - //commands carry content. (For now, assume all single frame - //assemblies are non-content bearing and all content-bearing - //assemblies will have more than one frame): - if (frame.getBof() && frame.getEof()) { - handleCommand(frame.getMethod(), commandId); - } else { + //commands carry content. + AMQMethodBody* m = frame.getMethod(); + if (m == 0 || m->isContentBearing()) { handleContent(frame, commandId); + } else if (frame.getBof() && frame.getEof()) { + handleCommand(frame.getMethod(), commandId); + } else { + throw InternalErrorException("Cannot handle multi-frame command segments yet"); } } catch(const SessionException& e) { //TODO: better implementation of new exception handling mechanism @@ -263,7 +275,11 @@ void SessionState::handle(AMQFrame& frame) } else { getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); } - handler->destroy(); + timeout = 0; + //The python client doesn't currently detach on receiving an exception + //so the session state isn't destroyed. This is a temporary workaround + //until that is addressed + adapter.destroyExclusiveQueues(); } } |