diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/client/SessionImpl.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SessionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 108 |
1 files changed, 34 insertions, 74 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 91e728d5ae..01e614e041 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -61,9 +61,7 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm ioHandler(*this), proxy(ioHandler), nextIn(0), - nextOut(0), - doClearDeliveryPropertiesExchange(true), - autoDetach(true) + nextOut(0) { channel.next = connection.get(); } @@ -72,12 +70,10 @@ SessionImpl::~SessionImpl() { { Lock l(state); if (state != DETACHED && state != DETACHING) { - if (autoDetach) { - QPID_LOG(warning, "Session was not closed cleanly: " << id); - // Inform broker but don't wait for detached as that deadlocks. - // The detached will be ignored as the channel will be invalid. - try { detach(); } catch (...) {} // ignore errors. - } + QPID_LOG(warning, "Session was not closed cleanly: " << id); + // Inform broker but don't wait for detached as that deadlocks. + // The detached will be ignored as the channel will be invalid. + try { detach(); } catch (...) {} // ignore errors. setState(DETACHED); handleClosed(); state.waitWaiters(); @@ -136,10 +132,10 @@ void SessionImpl::resume(boost::shared_ptr<ConnectionImpl>) // user thread void SessionImpl::suspend() //user thread { Lock l(state); - detach(); + detach(); } -void SessionImpl::detach() //call with lock held +void SessionImpl::detach() //call with lock held { if (state == ATTACHED) { setState(DETACHING); @@ -149,8 +145,8 @@ void SessionImpl::detach() //call with lock held uint16_t SessionImpl::getChannel() const // user thread -{ - return channel; +{ + return channel; } void SessionImpl::setChannel(uint16_t c) // user thread @@ -182,7 +178,7 @@ void SessionImpl::waitForCompletionImpl(const SequenceNumber& id) //call with lo bool SessionImpl::isComplete(const SequenceNumber& id) { - Lock l(state); + Lock l(state); return !incompleteOut.contains(id); } @@ -219,7 +215,7 @@ framing::SequenceNumber SessionImpl::getCompleteUpTo() return --firstIncomplete; } -struct MarkCompleted +struct MarkCompleted { const SequenceNumber& id; SequenceSet& completedIn; @@ -230,7 +226,7 @@ struct MarkCompleted { if (id >= end) { completedIn.add(start, end); - } else if (id >= start) { + } else if (id >= start) { completedIn.add(start, id); } } @@ -244,13 +240,13 @@ void SessionImpl::markCompleted(const SequenceSet& ids, bool notifyPeer) completedIn.add(ids); if (notifyPeer) { sendCompletion(); - } + } } void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer) { Lock l(state); - if (cumulative) { + if (cumulative) { //everything in incompleteIn less than or equal to id is now complete MarkCompleted f(id, completedIn); incompleteIn.for_each(f); @@ -260,11 +256,11 @@ void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool incompleteIn.remove(completedIn); } else if (incompleteIn.contains(id)) { incompleteIn.remove(id); - completedIn.add(id); + completedIn.add(id); } if (notifyPeer) { sendCompletion(); - } + } } void SessionImpl::setException(const sys::ExceptionHolder& ex) { @@ -310,42 +306,24 @@ namespace { struct SendContentFn { FrameHandler& handler; void operator()(const AMQFrame& f) { - if (!f.getMethod()) + if (!f.getMethod()) handler(const_cast<AMQFrame&>(f)); } SendContentFn(FrameHandler& h) : handler(h) {} }; -// Adaptor to make FrameSet look like MethodContent; used in cluster update client -struct MethodContentAdaptor : MethodContent -{ - AMQHeaderBody header; - const std::string content; - - MethodContentAdaptor(const FrameSet& f) : header(*f.getHeaders()), content(f.getContent()) {} - - const AMQHeaderBody& getHeader() const - { - return header; - } - const std::string& getData() const - { - return content; - } -}; - } - -Future SessionImpl::send(const AMQBody& command, const FrameSet& content, bool reframe) { + +Future SessionImpl::send(const AMQBody& command, const FrameSet& content) { Acquire a(sendLock); SequenceNumber id = nextOut++; { Lock l(state); - checkOpen(); + checkOpen(); incompleteOut.add(id); } Future f(id); - if (command.getMethod()->resultExpected()) { + if (command.getMethod()->resultExpected()) { Lock l(state); //result listener must be set before the command is sent f.setFutureResult(results.listenForResult(id)); @@ -353,14 +331,8 @@ Future SessionImpl::send(const AMQBody& command, const FrameSet& content, bool r AMQFrame frame(command); frame.setEof(false); handleOut(frame); - - if (reframe) { - MethodContentAdaptor c(content); - sendContent(c); - } else { - SendContentFn send(out); - content.map(send); - } + SendContentFn send(out); + content.map(send); return f; } @@ -375,11 +347,11 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con SequenceNumber id = nextOut++; { Lock l(state); - checkOpen(); + checkOpen(); incompleteOut.add(id); } Future f(id); - if (command.getMethod()->resultExpected()) { + if (command.getMethod()->resultExpected()) { Lock l(state); //result listener must be set before the command is sent f.setFutureResult(results.listenForResult(id)); @@ -399,23 +371,13 @@ void SessionImpl::sendContent(const MethodContent& content) { AMQFrame header(content.getHeader()); - // doClearDeliveryPropertiesExchange is set by cluster update client so - // it can send messages with delivery-properties.exchange set. - // - if (doClearDeliveryPropertiesExchange) { - // Normal client is not allowed to set the delivery-properties.exchange - // so clear it here. - AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody()); - if (headerp && headerp->get<DeliveryProperties>()) - headerp->get<DeliveryProperties>(true)->clearExchangeFlag(); - } header.setFirstSegment(false); uint64_t data_length = content.getData().length(); if(data_length > 0){ header.setLastSegment(false); - handleOut(header); + handleOut(header); /*Note: end of frame marker included in overhead but not in size*/ - const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); + const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); if(data_length < frag_size){ AMQFrame frame((AMQContentBody(content.getData()))); @@ -442,7 +404,7 @@ void SessionImpl::sendContent(const MethodContent& content) } } } else { - handleOut(header); + handleOut(header); } } @@ -462,7 +424,7 @@ bool isContentFrame(AMQFrame& frame) { AMQBody* body = frame.getBody(); uint8_t type = body->type(); - return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); + return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); } void SessionImpl::handleIn(AMQFrame& frame) // network thread @@ -585,7 +547,7 @@ void SessionImpl::timeout(uint32_t t) void SessionImpl::commandPoint(const framing::SequenceNumber& id, uint64_t offset) { if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); - + Lock l(state); nextIn = id; } @@ -677,10 +639,10 @@ void SessionImpl::exception(uint16_t errorCode, { Lock l(state); setExceptionLH(createSessionException(errorCode, description)); - QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what() + QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what() << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]"); - if (detachedLifetime) + if (detachedLifetime) setTimeout(0); } @@ -748,6 +710,4 @@ boost::shared_ptr<ConnectionImpl> SessionImpl::getConnection() return connection; } -void SessionImpl::disableAutoDetach() { autoDetach = false; } - }} |