diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 28 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 34 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 17 |
3 files changed, 44 insertions, 35 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 6acd0a3ced..42eceaf9f6 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -43,15 +43,15 @@ void ReceiverImpl::received(qpid::messaging::Message&) window = capacity; } } - -qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) + +qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) { qpid::messaging::Message result; if (!get(result, timeout)) throw NoMessageAvailable(); return result; } - -qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) + +qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) { qpid::messaging::Message result; if (!fetch(result, timeout)) throw NoMessageAvailable(); @@ -72,8 +72,8 @@ bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Dur return f.result; } -void ReceiverImpl::close() -{ +void ReceiverImpl::close() +{ execute<Close>(); } @@ -143,10 +143,10 @@ uint32_t ReceiverImpl::getUnsettled() return parent->getUnsettledAcks(destination); } -ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, - const qpid::messaging::Address& a) : +ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, + const qpid::messaging::Address& a) : - parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), + parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), state(UNRESOLVED), capacity(0), window(0) {} bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) @@ -188,11 +188,13 @@ bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging: } } -void ReceiverImpl::closeImpl() -{ +void ReceiverImpl::closeImpl() +{ sys::Mutex::ScopedLock l(lock); if (state != CANCELLED) { state = CANCELLED; + session.messageStop(destination); + parent->releasePending(destination); source->cancel(session, destination); parent->receiverCancelled(destination); } diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 6d98527627..75a71997fd 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -186,7 +186,7 @@ struct SessionImpl::CreateReceiver : Command { qpid::messaging::Receiver result; const qpid::messaging::Address& address; - + CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) : Command(i), address(a) {} void operator()() { result = impl.createReceiverImpl(address); } @@ -212,7 +212,7 @@ struct SessionImpl::CreateSender : Command { qpid::messaging::Sender result; const qpid::messaging::Address& address; - + CreateSender(SessionImpl& i, const qpid::messaging::Address& a) : Command(i), address(a) {} void operator()() { result = impl.createSenderImpl(address); } @@ -242,7 +242,7 @@ Sender SessionImpl::getSender(const std::string& name) const throw KeyError(name); } else { return i->second; - } + } } Receiver SessionImpl::getReceiver(const std::string& name) const @@ -296,8 +296,8 @@ bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageT } } -bool SessionImpl::accept(ReceiverImpl* receiver, - qpid::messaging::Message* message, +bool SessionImpl::accept(ReceiverImpl* receiver, + qpid::messaging::Message* message, IncomingMessages::MessageTransfer& transfer) { if (receiver->getName() == transfer.getDestination()) { @@ -359,7 +359,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag } catch (const qpid::ConnectionException& e) { throw qpid::messaging::ConnectionError(e.what()); } catch (const qpid::ChannelException& e) { - throw qpid::messaging::MessagingException(e.what()); + throw qpid::messaging::MessagingException(e.what()); } } } @@ -385,7 +385,7 @@ struct SessionImpl::Receivable : Command { const std::string* destination; uint32_t result; - + Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} void operator()() { result = impl.getReceivableImpl(destination); } }; @@ -414,7 +414,7 @@ struct SessionImpl::UnsettledAcks : Command { const std::string* destination; uint32_t result; - + UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} void operator()() { result = impl.getUnsettledAcksImpl(destination); } }; @@ -451,10 +451,10 @@ void SessionImpl::rollbackImpl() getImplPtr<Receiver, ReceiverImpl>(i->second)->stop(); } //ensure that stop has been processed and all previously sent - //messages are available for release: + //messages are available for release: session.sync(); incoming.releaseAll(); - session.txRollback(); + session.txRollback(); for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { getImplPtr<Receiver, ReceiverImpl>(i->second)->start(); @@ -495,6 +495,12 @@ void SessionImpl::receiverCancelled(const std::string& name) incoming.releasePending(name); } +void SessionImpl::releasePending(const std::string& name) +{ + ScopedLock l(lock); + incoming.releasePending(name); +} + void SessionImpl::senderCancelled(const std::string& name) { ScopedLock l(lock); @@ -503,12 +509,12 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection->open(); + connection->open(); } bool SessionImpl::backoff() { - return connection->backoff(); + return connection->backoff(); } qpid::messaging::Connection SessionImpl::getConnection() const diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 3dd5cd0189..2a2aa47df6 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -79,8 +79,9 @@ class SessionImpl : public qpid::messaging::SessionImpl void checkError(); bool hasError(); - bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + void releasePending(const std::string& destination); void receiverCancelled(const std::string& name); void senderCancelled(const std::string& name); @@ -110,7 +111,7 @@ class SessionImpl : public qpid::messaging::SessionImpl } catch (const qpid::ConnectionException& e) { throw qpid::messaging::ConnectionError(e.what()); } catch (const qpid::ChannelException& e) { - throw qpid::messaging::MessagingException(e.what()); + throw qpid::messaging::MessagingException(e.what()); } } @@ -206,11 +207,11 @@ class SessionImpl : public qpid::messaging::SessionImpl struct Acknowledge1 : Command { qpid::messaging::Message& message; - + Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} void operator()() { impl.acknowledgeImpl(message); } }; - + struct CreateSender; struct CreateReceiver; struct UnsettledAcks; @@ -222,12 +223,12 @@ class SessionImpl : public qpid::messaging::SessionImpl F f(*this); return execute(f); } - + template <class F> void retry() { while (!execute<F>()) {} } - + template <class F, class P> bool execute1(P p) { F f(*this, p); |
