summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-04-10 23:46:19 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-04-10 23:46:19 +0000
commitdf547bcb4497395ed5536ffd72b713d87187e97f (patch)
tree09db278d02b60620b1e7f4fcc5f5944ec2ab5dd4 /java/client/src
parente861aa8025d2138682083ce91f0646d5ac2fb5fc (diff)
downloadqpid-python-df547bcb4497395ed5536ffd72b713d87187e97f.tar.gz
QPID-1779 : Application of patches attached to JIRA. Should address connection close issues experienced on 0-8/9 branch
Excluded test from TCP runs as it is hardwired to InVM. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@764109 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java22
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/Closeable.java18
4 files changed, 43 insertions, 12 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 5c48d73e43..c09b05bda8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -58,6 +58,7 @@ import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -924,7 +925,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (!_closed.getAndSet(true))
{
- doClose(sessions, timeout);
+ _closing.set(true);
+ try{
+ doClose(sessions, timeout);
+ }finally{
+ _closing.set(false);
+ }
}
}
@@ -1318,7 +1324,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// in the case of an IOException, MINA has closed the protocol session so we set _closed to true
// so that any generic client code that tries to close the connection will not mess up this error
// handling sequence
- if (cause instanceof IOException)
+ if (cause instanceof IOException || cause instanceof AMQDisconnectedException)
{
closer = !_closed.getAndSet(true);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b632c56708..2782505191 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -282,7 +282,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Holds the dispatcher thread for this session. */
protected Dispatcher _dispatcher;
-
+
protected Thread _dispatcherThread;
/** Holds the message factory factory for this session. */
@@ -644,7 +644,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- sendClose(timeout);
+ // IF we are closing then send the close.
+ if (_connection.isClosing())
+ {
+ sendClose(timeout);
+ }
}
catch (AMQException e)
{
@@ -1219,9 +1223,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// this is done so that we can produce to a temporary queue before we create a consumer
result.setQueueName(result.getRoutingKey());
- createQueue(result.getAMQQueueName(), result.isAutoDelete(),
+ createQueue(result.getAMQQueueName(), result.isAutoDelete(),
result.isDurable(), result.isExclusive());
- bindQueue(result.getAMQQueueName(), result.getRoutingKey(),
+ bindQueue(result.getAMQQueueName(), result.getRoutingKey(),
new FieldTable(), result.getExchangeName(), result);
return result;
}
@@ -1683,11 +1687,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// if (rawSelector != null)
// ft.put("headers", rawSelector.getDataAsBytes());
// rawSelector is used by HeadersExchange and is not a JMS Selector
- if (rawSelector != null)
+ if (rawSelector != null)
{
ft.addAll(rawSelector);
}
-
+
if (messageSelector != null)
{
ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
@@ -1937,13 +1941,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_dispatcher = new Dispatcher();
try
{
- _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
-
+ _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
+
}
catch(Exception e)
{
throw new Error("Error creating Dispatcher thread",e);
- }
+ }
_dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
_dispatcherThread.setDaemon(true);
_dispatcher.setConnectionStopped(initiallyStopped);
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 2bb443a090..a5f5e5f5fa 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -563,7 +563,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
try
{
- sendCancel();
+ if (!_connection.isClosing())
+ {
+ sendCancel();
+ }
}
catch (AMQException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/java/client/src/main/java/org/apache/qpid/client/Closeable.java
index 7e119343a1..e6771e122c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/Closeable.java
+++ b/java/client/src/main/java/org/apache/qpid/client/Closeable.java
@@ -52,6 +52,13 @@ public abstract class Closeable
protected final AtomicBoolean _closed = new AtomicBoolean(false);
/**
+ * Are we in the process of closing. We have this distinction so we can
+ * still signal we are in the process of closing so other objects can tell
+ * the difference and tidy up.
+ */
+ protected final AtomicBoolean _closing = new AtomicBoolean(false);
+
+ /**
* Checks if this is closed, and raises a JMSException if it is.
*
* @throws JMSException If this is closed.
@@ -75,6 +82,17 @@ public abstract class Closeable
}
/**
+ * Checks if this is closis.
+ *
+ * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise.
+ */
+ public boolean isClosing()
+ {
+ return _closing.get();
+ }
+
+
+ /**
* Closes this object.
*
* @throws JMSException If this cannot be closed for any reason.