diff options
| author | Alan Conway <aconway@apache.org> | 2013-01-14 19:08:53 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-01-14 19:08:53 +0000 |
| commit | 418025c1dcbc9d450bd06c9fd697977ad5583542 (patch) | |
| tree | a5fc93459d14604f74e9b3c88214d62f66f8348f /qpid/cpp/src | |
| parent | 5c64719d6d4a78e037ad20f8b0f55ad92a9a42c5 (diff) | |
| download | qpid-python-418025c1dcbc9d450bd06c9fd697977ad5583542.tar.gz | |
QPID-4514: Clean up cluster code: SessionImpl
Clean up obsolete code in SessionImpl class used only by defunct cluster code:
- Remove SessionImpl::send reframe parameter.
- Remove doClearDeliveryPropertiesExchange flag.
- Remove disableAutoDetach and autoDetach flag.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1433055 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.cpp | 108 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.h | 19 |
2 files changed, 35 insertions, 92 deletions
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 91e728d5ae..01e614e041 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -61,9 +61,7 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm ioHandler(*this), proxy(ioHandler), nextIn(0), - nextOut(0), - doClearDeliveryPropertiesExchange(true), - autoDetach(true) + nextOut(0) { channel.next = connection.get(); } @@ -72,12 +70,10 @@ SessionImpl::~SessionImpl() { { Lock l(state); if (state != DETACHED && state != DETACHING) { - if (autoDetach) { - QPID_LOG(warning, "Session was not closed cleanly: " << id); - // Inform broker but don't wait for detached as that deadlocks. - // The detached will be ignored as the channel will be invalid. - try { detach(); } catch (...) {} // ignore errors. - } + QPID_LOG(warning, "Session was not closed cleanly: " << id); + // Inform broker but don't wait for detached as that deadlocks. + // The detached will be ignored as the channel will be invalid. + try { detach(); } catch (...) {} // ignore errors. setState(DETACHED); handleClosed(); state.waitWaiters(); @@ -136,10 +132,10 @@ void SessionImpl::resume(boost::shared_ptr<ConnectionImpl>) // user thread void SessionImpl::suspend() //user thread { Lock l(state); - detach(); + detach(); } -void SessionImpl::detach() //call with lock held +void SessionImpl::detach() //call with lock held { if (state == ATTACHED) { setState(DETACHING); @@ -149,8 +145,8 @@ void SessionImpl::detach() //call with lock held uint16_t SessionImpl::getChannel() const // user thread -{ - return channel; +{ + return channel; } void SessionImpl::setChannel(uint16_t c) // user thread @@ -182,7 +178,7 @@ void SessionImpl::waitForCompletionImpl(const SequenceNumber& id) //call with lo bool SessionImpl::isComplete(const SequenceNumber& id) { - Lock l(state); + Lock l(state); return !incompleteOut.contains(id); } @@ -219,7 +215,7 @@ framing::SequenceNumber SessionImpl::getCompleteUpTo() return --firstIncomplete; } -struct MarkCompleted +struct MarkCompleted { const SequenceNumber& id; SequenceSet& completedIn; @@ -230,7 +226,7 @@ struct MarkCompleted { if (id >= end) { completedIn.add(start, end); - } else if (id >= start) { + } else if (id >= start) { completedIn.add(start, id); } } @@ -244,13 +240,13 @@ void SessionImpl::markCompleted(const SequenceSet& ids, bool notifyPeer) completedIn.add(ids); if (notifyPeer) { sendCompletion(); - } + } } void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer) { Lock l(state); - if (cumulative) { + if (cumulative) { //everything in incompleteIn less than or equal to id is now complete MarkCompleted f(id, completedIn); incompleteIn.for_each(f); @@ -260,11 +256,11 @@ void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool incompleteIn.remove(completedIn); } else if (incompleteIn.contains(id)) { incompleteIn.remove(id); - completedIn.add(id); + completedIn.add(id); } if (notifyPeer) { sendCompletion(); - } + } } void SessionImpl::setException(const sys::ExceptionHolder& ex) { @@ -310,42 +306,24 @@ namespace { struct SendContentFn { FrameHandler& handler; void operator()(const AMQFrame& f) { - if (!f.getMethod()) + if (!f.getMethod()) handler(const_cast<AMQFrame&>(f)); } SendContentFn(FrameHandler& h) : handler(h) {} }; -// Adaptor to make FrameSet look like MethodContent; used in cluster update client -struct MethodContentAdaptor : MethodContent -{ - AMQHeaderBody header; - const std::string content; - - MethodContentAdaptor(const FrameSet& f) : header(*f.getHeaders()), content(f.getContent()) {} - - const AMQHeaderBody& getHeader() const - { - return header; - } - const std::string& getData() const - { - return content; - } -}; - } - -Future SessionImpl::send(const AMQBody& command, const FrameSet& content, bool reframe) { + +Future SessionImpl::send(const AMQBody& command, const FrameSet& content) { Acquire a(sendLock); SequenceNumber id = nextOut++; { Lock l(state); - checkOpen(); + checkOpen(); incompleteOut.add(id); } Future f(id); - if (command.getMethod()->resultExpected()) { + if (command.getMethod()->resultExpected()) { Lock l(state); //result listener must be set before the command is sent f.setFutureResult(results.listenForResult(id)); @@ -353,14 +331,8 @@ Future SessionImpl::send(const AMQBody& command, const FrameSet& content, bool r AMQFrame frame(command); frame.setEof(false); handleOut(frame); - - if (reframe) { - MethodContentAdaptor c(content); - sendContent(c); - } else { - SendContentFn send(out); - content.map(send); - } + SendContentFn send(out); + content.map(send); return f; } @@ -375,11 +347,11 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con SequenceNumber id = nextOut++; { Lock l(state); - checkOpen(); + checkOpen(); incompleteOut.add(id); } Future f(id); - if (command.getMethod()->resultExpected()) { + if (command.getMethod()->resultExpected()) { Lock l(state); //result listener must be set before the command is sent f.setFutureResult(results.listenForResult(id)); @@ -399,23 +371,13 @@ void SessionImpl::sendContent(const MethodContent& content) { AMQFrame header(content.getHeader()); - // doClearDeliveryPropertiesExchange is set by cluster update client so - // it can send messages with delivery-properties.exchange set. - // - if (doClearDeliveryPropertiesExchange) { - // Normal client is not allowed to set the delivery-properties.exchange - // so clear it here. - AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody()); - if (headerp && headerp->get<DeliveryProperties>()) - headerp->get<DeliveryProperties>(true)->clearExchangeFlag(); - } header.setFirstSegment(false); uint64_t data_length = content.getData().length(); if(data_length > 0){ header.setLastSegment(false); - handleOut(header); + handleOut(header); /*Note: end of frame marker included in overhead but not in size*/ - const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); + const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); if(data_length < frag_size){ AMQFrame frame((AMQContentBody(content.getData()))); @@ -442,7 +404,7 @@ void SessionImpl::sendContent(const MethodContent& content) } } } else { - handleOut(header); + handleOut(header); } } @@ -462,7 +424,7 @@ bool isContentFrame(AMQFrame& frame) { AMQBody* body = frame.getBody(); uint8_t type = body->type(); - return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); + return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); } void SessionImpl::handleIn(AMQFrame& frame) // network thread @@ -585,7 +547,7 @@ void SessionImpl::timeout(uint32_t t) void SessionImpl::commandPoint(const framing::SequenceNumber& id, uint64_t offset) { if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); - + Lock l(state); nextIn = id; } @@ -677,10 +639,10 @@ void SessionImpl::exception(uint16_t errorCode, { Lock l(state); setExceptionLH(createSessionException(errorCode, description)); - QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what() + QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what() << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]"); - if (detachedLifetime) + if (detachedLifetime) setTimeout(0); } @@ -748,6 +710,4 @@ boost::shared_ptr<ConnectionImpl> SessionImpl::getConnection() return connection; } -void SessionImpl::disableAutoDetach() { autoDetach = false; } - }} diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h index 4f9213a00a..e6ea8e6b90 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/SessionImpl.h @@ -87,15 +87,7 @@ public: Future send(const framing::AMQBody& command); Future send(const framing::AMQBody& command, const framing::MethodContent& content); - /** - * This method takes the content as a FrameSet; if reframe=false, - * the caller is resposnible for ensuring that the header and - * content frames in that set are correct for this connection - * (right flags, right fragmentation etc). If reframe=true, then - * the header and content from the frameset will be copied and - * reframed correctly for the connection. - */ - QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content, bool reframe=false); + QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content); void sendRawFrame(framing::AMQFrame& frame); Demux& getDemux(); @@ -125,11 +117,6 @@ public: */ boost::shared_ptr<ConnectionImpl> getConnection(); - void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; } - - /** Suppress sending detach in destructor. Used by cluster to build session state */ - void disableAutoDetach(); - private: enum State { INACTIVE, @@ -225,10 +212,6 @@ private: SessionState sessionState; - bool doClearDeliveryPropertiesExchange; - - bool autoDetach; - friend class client::SessionHandler; }; |
