diff options
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/Completion.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 23 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionCore.h | 1 |
4 files changed, 25 insertions, 7 deletions
diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h index a126bc9766..4d324aaf28 100644 --- a/cpp/src/qpid/client/Completion.h +++ b/cpp/src/qpid/client/Completion.h @@ -36,6 +36,8 @@ protected: shared_ptr<SessionCore> session; public: + Completion() {} + Completion(Future f, shared_ptr<SessionCore> s) : future(f), session(s) {} void sync() diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 80d97b10aa..497288bc3f 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -108,7 +108,7 @@ void Connector::send(AMQFrame& frame){ writeFrameQueue.push(frame); aio->queueWrite(); - QPID_LOG(trace, "SENT: " << frame); + QPID_LOG(trace, "SENT [" << this << "]: " << frame); } void Connector::handleClosed() { @@ -180,8 +180,8 @@ void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { AMQFrame frame; while(frame.decode(in)){ - QPID_LOG(trace, "RECV: " << frame); - input->received(frame); + QPID_LOG(trace, "RECV [" << this << "]: " << frame); + input->received(frame); } // TODO: unreading needs to go away, and when we can cope // with multiple sub-buffers in the general buffer scheme, it will diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 8eab54fa62..3a26734892 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -87,7 +87,6 @@ inline void SessionCore::waitFor(State s) { // We can be CLOSED or SUSPENDED by error at any time. state.waitFor(States(s, CLOSED, SUSPENDED)); check(); - assert(state==s); invariant(); } @@ -97,7 +96,8 @@ SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, sync(false), channel(ch), proxy(channel), - state(OPENING) + state(OPENING), + detachedLifetime(0) { l3.out = &out; attaching(conn); @@ -166,10 +166,11 @@ FrameSet::shared_ptr SessionCore::get() { // user thread static const std::string CANNOT_REOPEN_SESSION="Cannot re-open a session."; -void SessionCore::open(uint32_t detachedLifetime) { // user thread +void SessionCore::open(uint32_t timeout) { // user thread Lock l(state); check(state==OPENING && !session, COMMAND_INVALID, CANNOT_REOPEN_SESSION); + detachedLifetime=timeout; proxy.open(detachedLifetime); waitFor(OPEN); } @@ -364,8 +365,22 @@ Future SessionCore::send(const AMQBody& command, const MethodContent& content) return Future(l3.send(command, content)); } +namespace { +bool isCloseResponse(const AMQFrame& frame) { + return frame.getMethod() && + frame.getMethod()->amqpClassId() == SESSION_CLASS_ID && + frame.getMethod()->amqpMethodId() == SESSION_CLOSED_METHOD_ID; +} +} + // Network thread. void SessionCore::handleIn(AMQFrame& frame) { + { + Lock l(state); + // Ignore frames received while closing other than closed response. + if (state==CLOSING && !isCloseResponse(frame)) + return; + } try { // Cast to expose private SessionHandler functions. if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { @@ -382,7 +397,7 @@ void SessionCore::handleOut(AMQFrame& frame) { Lock l(state); if (state==OPEN) { - if (session->sent(frame)) + if (detachedLifetime > 0 && session->sent(frame)) proxy.solicitAck(); channel.handle(frame); } diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index 38c72359a3..2bb0f41fbf 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -133,6 +133,7 @@ class SessionCore : public framing::FrameHandler::InOutHandler, framing::ChannelHandler channel; framing::AMQP_ServerProxy::Session proxy; mutable StateMonitor state; + uint32_t detachedLifetime; }; }} // namespace qpid::client |
