summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientChannel.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-05 09:47:07 +0000
committerGordon Sim <gsim@apache.org>2007-07-05 09:47:07 +0000
commit07c8c499649c725a226eeda3e0bfe58fa8ba984c (patch)
tree0f71fc80b9e6e9929184334f4dc7d8fc03f7ccc0 /cpp/src/qpid/client/ClientChannel.cpp
parentd4be469092c558ca9031d82b963b8b845fa1e1bd (diff)
downloadqpid-python-07c8c499649c725a226eeda3e0bfe58fa8ba984c.tar.gz
Fix for QPID-534. Get now detects closure correctly. Also fixed broker to allow channel.close-ok (and fixed client to send it).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@553441 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp46
1 files changed, 35 insertions, 11 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 0cb0931155..ab6b9a41c3 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -40,7 +40,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
- connection(0), prefetch(_prefetch), transactional(_transactional)
+ connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
{
switch (mode) {
case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
@@ -50,7 +50,8 @@ Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
}
Channel::~Channel(){
- close();
+ closeInternal();
+ stop();
}
void Channel::open(ChannelId id, Connection& con)
@@ -119,7 +120,10 @@ void Channel::protocolInit(
}
}
-bool Channel::isOpen() const { return connection; }
+bool Channel::isOpen() const {
+ Mutex::ScopedLock l(lock);
+ return connection;
+}
void Channel::setQos() {
messaging->setQos();
@@ -187,7 +191,7 @@ void Channel::rollback(){
}
void Channel::handleMethodInContext(
- AMQMethodBody::shared_ptr method, const MethodContext&)
+AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
{
// Special case for consume OK as it is both an expected response
// and needs handling in this thread.
@@ -204,7 +208,7 @@ void Channel::handleMethodInContext(
switch (method->amqpClassId()) {
case MessageOkBody::CLASS_ID:
case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
- case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
+ case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
default: throw UnknownMethod();
}
@@ -216,9 +220,10 @@ void Channel::handleMethodInContext(
}
}
-void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
+void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) {
switch (method->amqpMethodId()) {
case ChannelCloseBody::METHOD_ID:
+ send(new ChannelCloseOkBody(version, ctxt.getRequestId()));
peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
return;
case ChannelFlowBody::METHOD_ID:
@@ -249,6 +254,7 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
}
void Channel::start(){
+ running = true;
dispatcher = Thread(*messaging);
}
@@ -260,6 +266,8 @@ void Channel::close(
if (isOpen()) {
try {
if (getId() != 0) {
+ if (code == 200) messaging->cancelAll();
+
sendAndReceive<ChannelCloseOkBody>(
make_shared_ptr(new ChannelCloseBody(
version, code, text, classId, methodId)));
@@ -272,23 +280,35 @@ void Channel::close(
throw;
}
}
+ stop();
}
// Channel closed by peer.
-void Channel::peerClose(ChannelCloseBody::shared_ptr) {
+void Channel::peerClose(ChannelCloseBody::shared_ptr reason) {
assert(isOpen());
+ //record reason:
+ errorCode = reason->getReplyCode();
+ errorText = reason->getReplyText();
closeInternal();
}
void Channel::closeInternal() {
- if (isOpen());
+ Mutex::ScopedLock l(lock);
+ if (connection);
{
- messaging->close();
connection = 0;
+ messaging->close();
// A 0 response means we are closed.
responses.signalResponse(AMQMethodBody::shared_ptr());
}
- dispatcher.join();
+}
+
+void Channel::stop() {
+ Mutex::ScopedLock l(stopLock);
+ if(running) {
+ dispatcher.join();
+ running = false;
+ }
}
AMQMethodBody::shared_ptr Channel::sendAndReceive(
@@ -321,7 +341,11 @@ void Channel::cancel(const std::string& tag, bool synch) {
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- return messaging->get(msg, queue, ackMode);
+ bool result = messaging->get(msg, queue, ackMode);
+ if (!isOpen()) {
+ throw ChannelException(errorCode, errorText);
+ }
+ return result;
}
void Channel::publish(const Message& msg, const Exchange& exchange,