summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp28
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp34
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h17
3 files changed, 44 insertions, 35 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 6acd0a3ced..42eceaf9f6 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -43,15 +43,15 @@ void ReceiverImpl::received(qpid::messaging::Message&)
window = capacity;
}
}
-
-qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout)
+
+qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout)
{
qpid::messaging::Message result;
if (!get(result, timeout)) throw NoMessageAvailable();
return result;
}
-
-qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
+
+qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
{
qpid::messaging::Message result;
if (!fetch(result, timeout)) throw NoMessageAvailable();
@@ -72,8 +72,8 @@ bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Dur
return f.result;
}
-void ReceiverImpl::close()
-{
+void ReceiverImpl::close()
+{
execute<Close>();
}
@@ -143,10 +143,10 @@ uint32_t ReceiverImpl::getUnsettled()
return parent->getUnsettledAcks(destination);
}
-ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
- const qpid::messaging::Address& a) :
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
+ const qpid::messaging::Address& a) :
- parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
+ parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
state(UNRESOLVED), capacity(0), window(0) {}
bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
@@ -188,11 +188,13 @@ bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging:
}
}
-void ReceiverImpl::closeImpl()
-{
+void ReceiverImpl::closeImpl()
+{
sys::Mutex::ScopedLock l(lock);
if (state != CANCELLED) {
state = CANCELLED;
+ session.messageStop(destination);
+ parent->releasePending(destination);
source->cancel(session, destination);
parent->receiverCancelled(destination);
}
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 6d98527627..75a71997fd 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -186,7 +186,7 @@ struct SessionImpl::CreateReceiver : Command
{
qpid::messaging::Receiver result;
const qpid::messaging::Address& address;
-
+
CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) :
Command(i), address(a) {}
void operator()() { result = impl.createReceiverImpl(address); }
@@ -212,7 +212,7 @@ struct SessionImpl::CreateSender : Command
{
qpid::messaging::Sender result;
const qpid::messaging::Address& address;
-
+
CreateSender(SessionImpl& i, const qpid::messaging::Address& a) :
Command(i), address(a) {}
void operator()() { result = impl.createSenderImpl(address); }
@@ -242,7 +242,7 @@ Sender SessionImpl::getSender(const std::string& name) const
throw KeyError(name);
} else {
return i->second;
- }
+ }
}
Receiver SessionImpl::getReceiver(const std::string& name) const
@@ -296,8 +296,8 @@ bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageT
}
}
-bool SessionImpl::accept(ReceiverImpl* receiver,
- qpid::messaging::Message* message,
+bool SessionImpl::accept(ReceiverImpl* receiver,
+ qpid::messaging::Message* message,
IncomingMessages::MessageTransfer& transfer)
{
if (receiver->getName() == transfer.getDestination()) {
@@ -359,7 +359,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag
} catch (const qpid::ConnectionException& e) {
throw qpid::messaging::ConnectionError(e.what());
} catch (const qpid::ChannelException& e) {
- throw qpid::messaging::MessagingException(e.what());
+ throw qpid::messaging::MessagingException(e.what());
}
}
}
@@ -385,7 +385,7 @@ struct SessionImpl::Receivable : Command
{
const std::string* destination;
uint32_t result;
-
+
Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
void operator()() { result = impl.getReceivableImpl(destination); }
};
@@ -414,7 +414,7 @@ struct SessionImpl::UnsettledAcks : Command
{
const std::string* destination;
uint32_t result;
-
+
UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
void operator()() { result = impl.getUnsettledAcksImpl(destination); }
};
@@ -451,10 +451,10 @@ void SessionImpl::rollbackImpl()
getImplPtr<Receiver, ReceiverImpl>(i->second)->stop();
}
//ensure that stop has been processed and all previously sent
- //messages are available for release:
+ //messages are available for release:
session.sync();
incoming.releaseAll();
- session.txRollback();
+ session.txRollback();
for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
getImplPtr<Receiver, ReceiverImpl>(i->second)->start();
@@ -495,6 +495,12 @@ void SessionImpl::receiverCancelled(const std::string& name)
incoming.releasePending(name);
}
+void SessionImpl::releasePending(const std::string& name)
+{
+ ScopedLock l(lock);
+ incoming.releasePending(name);
+}
+
void SessionImpl::senderCancelled(const std::string& name)
{
ScopedLock l(lock);
@@ -503,12 +509,12 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection->open();
+ connection->open();
}
bool SessionImpl::backoff()
{
- return connection->backoff();
+ return connection->backoff();
}
qpid::messaging::Connection SessionImpl::getConnection() const
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 3dd5cd0189..2a2aa47df6 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -79,8 +79,9 @@ class SessionImpl : public qpid::messaging::SessionImpl
void checkError();
bool hasError();
- bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ void releasePending(const std::string& destination);
void receiverCancelled(const std::string& name);
void senderCancelled(const std::string& name);
@@ -110,7 +111,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
} catch (const qpid::ConnectionException& e) {
throw qpid::messaging::ConnectionError(e.what());
} catch (const qpid::ChannelException& e) {
- throw qpid::messaging::MessagingException(e.what());
+ throw qpid::messaging::MessagingException(e.what());
}
}
@@ -206,11 +207,11 @@ class SessionImpl : public qpid::messaging::SessionImpl
struct Acknowledge1 : Command
{
qpid::messaging::Message& message;
-
+
Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
void operator()() { impl.acknowledgeImpl(message); }
};
-
+
struct CreateSender;
struct CreateReceiver;
struct UnsettledAcks;
@@ -222,12 +223,12 @@ class SessionImpl : public qpid::messaging::SessionImpl
F f(*this);
return execute(f);
}
-
+
template <class F> void retry()
{
while (!execute<F>()) {}
}
-
+
template <class F, class P> bool execute1(P p)
{
F f(*this, p);