summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-13 17:01:59 +0000
committerKeith Wall <kwall@apache.org>2015-02-13 17:01:59 +0000
commitfbb2c460dfa60e63712f616a3e45c75c9735d5c7 (patch)
tree44e8662a0492e8faa11eb84c6da9e5337ae24fd3
parent8d84886a1324a42db1992a4d567487821894d691 (diff)
downloadqpid-python-fbb2c460dfa60e63712f616a3e45c75c9735d5c7.tar.gz
QPID-6374: [Java Broker] 0-10 Failover: the thread performing the failover prep now syncs the dispatch queue (avoids possibility of app level dead lock)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659605 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java47
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java24
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java44
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java53
9 files changed, 140 insertions, 46 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 4c596b88a0..8e7b5b90d8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1216,11 +1216,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _failoverMutex;
}
- public void failoverPrep()
- {
- _delegate.failoverPrep();
- }
-
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
_delegate.resubscribeSessions();
@@ -1653,4 +1648,46 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _messageCompressionThresholdSize;
}
+
+ void doWithAllLocks(Runnable r)
+ {
+ doWithAllLocks(r, _sessions.values());
+
+ }
+
+ private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions)
+ {
+ if (!sessions.isEmpty())
+ {
+ AMQSession session = sessions.remove(0);
+
+ final Object dispatcherLock = session.getDispatcherLock();
+ if (dispatcherLock != null)
+ {
+ synchronized (dispatcherLock)
+ {
+ synchronized (session.getMessageDeliveryLock())
+ {
+ doWithAllLocks(r, sessions);
+ }
+ }
+ }
+ else
+ {
+ synchronized (session.getMessageDeliveryLock())
+ {
+ doWithAllLocks(r, sessions);
+ }
+ }
+ }
+ else
+ {
+ synchronized (getFailoverMutex())
+ {
+ r.run();
+ }
+ }
+ }
+
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 74ca1ed74f..c359fbcc84 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -52,8 +52,6 @@ public interface AMQConnectionDelegate
XASession createXASession(int ackMode) throws JMSException;
- void failoverPrep();
-
void resubscribeSessions() throws JMSException, AMQException, FailoverException;
void closeConnection(long timeout) throws JMSException, AMQException;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index fdeab7ae70..e22a341205 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
@@ -249,7 +250,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
for (AMQSession s : sessions)
{
- s.failoverPrep();
+ ((AMQSession_0_10)s).failoverPrep();
}
}
@@ -306,16 +307,21 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
_qpidConnection.notifyFailoverRequired();
- synchronized (_conn.getFailoverMutex())
+ final AtomicBoolean failoverDone = new AtomicBoolean();
+
+ _conn.doWithAllLocks(new Runnable()
{
+ @Override
+ public void run()
+ {
try
{
if (_conn.firePreFailover(false) && _conn.attemptReconnection())
{
- _conn.failoverPrep();
+ failoverPrep();
_conn.resubscribeSessions();
_conn.fireFailoverComplete();
- return;
+ failoverDone.set(true);
}
}
catch (Exception e)
@@ -327,9 +333,19 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
_conn.getProtocolHandler().getFailoverLatch().countDown();
_conn.getProtocolHandler().setFailoverLatch(null);
}
+
+ }
+ });
+
+
+ if (failoverDone.get())
+ {
+ return;
}
+
}
+
_conn.setClosed();
final ExceptionListener listener = _conn.getExceptionListenerNoCheck();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 35582d92b7..ae83b6ab48 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -350,11 +350,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
}
- public void failoverPrep()
- {
- // do nothing
- }
-
/**
* For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
* The caller must hold the failover mutex before calling this method.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8f5e9524b6..3966e75423 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -169,7 +169,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private final Lock _subscriberDetails = new ReentrantLock(true);
private final Lock _subscriberAccess = new ReentrantLock(true);
- private final FlowControllingBlockingQueue _queue;
+ private final FlowControllingBlockingQueue<Dispatchable> _queue;
private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
@@ -358,7 +358,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_queue =
- new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark,
+ new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, _prefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
private final AtomicBoolean _suspendState = new AtomicBoolean();
@@ -423,7 +423,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
+ _queue = new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, null);
}
// Add creation logging to tie in with the existing close logging
@@ -1789,7 +1789,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
//in the pre-dispatch queue.
_usingDispatcherForCleanup = true;
- syncDispatchQueue();
+ syncDispatchQueue(false);
// Set to false before sending the recover as 0-8/9/9-1 will
//send messages back before the recover completes, and we
@@ -1881,7 +1881,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
setRollbackMark();
- syncDispatchQueue();
+ syncDispatchQueue(false);
_dispatcher.rollback();
@@ -2201,21 +2201,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
- void failoverPrep()
- {
- syncDispatchQueue();
- }
- void syncDispatchQueue()
+ void syncDispatchQueue(final boolean holdDispatchLock)
{
- if (Thread.currentThread() == _dispatcherThread)
+ if (Thread.currentThread() == _dispatcherThread || holdDispatchLock)
{
while (!super.isClosed() && !_queue.isEmpty())
{
Dispatchable disp;
try
{
- disp = (Dispatchable) _queue.take();
+ disp = _queue.take();
}
catch (InterruptedException e)
{
@@ -2267,7 +2263,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
Dispatchable disp;
try
{
- disp = (Dispatchable) _queue.take();
+ disp = _queue.take();
}
catch (InterruptedException e)
{
@@ -3086,7 +3082,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
{
- Iterator messages = _queue.iterator();
+ Iterator<Dispatchable> messages = _queue.iterator();
if (_logger.isDebugEnabled())
{
_logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
@@ -3237,6 +3233,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract void setFlowControl(final boolean active);
+ Object getDispatcherLock()
+ {
+ Dispatcher dispatcher = _dispatcher;
+ return dispatcher == null ? null : dispatcher._lock;
+ }
+
public interface Dispatchable
{
void dispatch(AMQSession ssn);
@@ -3389,10 +3391,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- Dispatchable disp;
- while (((disp = (Dispatchable) _queue.take()) != null) && !_closed.get())
+
+ while (((_queue.blockingPeek()) != null) && !_closed.get())
{
- disp.dispatch(AMQSession.this);
+ synchronized (_lock)
+ {
+ Dispatchable disp = _queue.nonBlockingTake();
+
+ if(disp != null)
+ {
+ disp.dispatch(AMQSession.this);
+ }
+ }
}
}
catch (InterruptedException e)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 206ca15c82..08d7ea3f67 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -235,7 +235,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
void failoverPrep()
{
- super.failoverPrep();
+ syncDispatchQueue(true);
clearUnacked();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index e0d8ac3702..0cb103f0cb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -412,7 +412,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_capacity,
Option.UNRELIABLE);
}
- _0_10session.syncDispatchQueue();
+ _0_10session.syncDispatchQueue(false);
o = super.getMessageFromQueue(-1);
}
if (_capacity == 0)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
index f46c61daa7..0ba5cfdacb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
@@ -22,13 +22,16 @@ package org.apache.qpid.client;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public final class ChannelToSessionMap
{
- private final Map<Integer, AMQSession> _sessionMap = new ConcurrentHashMap<>();
+ private final Map<Integer, AMQSession> _sessionMap = Collections.synchronizedMap(new LinkedHashMap<Integer, AMQSession>());
private AtomicInteger _idFactory = new AtomicInteger(0);
private int _maxChannelID;
private int _minChannelID;
@@ -48,7 +51,7 @@ public final class ChannelToSessionMap
_sessionMap.remove(channelId);
}
- public Collection<AMQSession> values()
+ public List<AMQSession> values()
{
return new ArrayList<>(_sessionMap.values());
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index b194ac88de..df54b7066b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.client.util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
* control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
@@ -37,12 +37,12 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* <p>
* TODO Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted.
*/
-public class FlowControllingBlockingQueue
+public class FlowControllingBlockingQueue<T>
{
private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class);
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
- private final Queue _queue = new ConcurrentLinkedQueue();
+ private final Queue<T> _queue = new ConcurrentLinkedQueue<T>();
private final int _flowControlHighThreshold;
private final int _flowControlLowThreshold;
@@ -82,9 +82,44 @@ public class FlowControllingBlockingQueue
}
}
- public Object take() throws InterruptedException
+ public T blockingPeek() throws InterruptedException
+ {
+ T o = _queue.peek();
+ if (o == null)
+ {
+ synchronized (this)
+ {
+ while ((o = _queue.peek()) == null)
+ {
+ wait();
+ }
+ }
+ }
+ return o;
+ }
+
+ public T nonBlockingTake() throws InterruptedException
+ {
+ T o = _queue.poll();
+
+ if (o != null && !disableFlowControl && _listener != null)
+ {
+ synchronized (_listener)
+ {
+ if (_count-- == _flowControlLowThreshold)
+ {
+ _listener.underThreshold(_count);
+ }
+ }
+
+ }
+
+ return o;
+ }
+
+ public T take() throws InterruptedException
{
- Object o = _queue.poll();
+ T o = _queue.poll();
if(o == null)
{
synchronized(this)
@@ -110,7 +145,7 @@ public class FlowControllingBlockingQueue
return o;
}
- public void add(Object o)
+ public void add(T o)
{
synchronized(this)
{
@@ -130,7 +165,7 @@ public class FlowControllingBlockingQueue
}
}
- public Iterator iterator()
+ public Iterator<T> iterator()
{
return _queue.iterator();
}