diff options
author | Keith Wall <kwall@apache.org> | 2015-02-13 17:01:59 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2015-02-13 17:01:59 +0000 |
commit | fbb2c460dfa60e63712f616a3e45c75c9735d5c7 (patch) | |
tree | 44e8662a0492e8faa11eb84c6da9e5337ae24fd3 | |
parent | 8d84886a1324a42db1992a4d567487821894d691 (diff) | |
download | qpid-python-fbb2c460dfa60e63712f616a3e45c75c9735d5c7.tar.gz |
QPID-6374: [Java Broker] 0-10 Failover: the thread performing the failover prep now syncs the dispatch queue (avoids possibility of app level dead lock)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659605 13f79535-47bb-0310-9956-ffa450edef68
9 files changed, 140 insertions, 46 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 4c596b88a0..8e7b5b90d8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1216,11 +1216,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _failoverMutex; } - public void failoverPrep() - { - _delegate.failoverPrep(); - } - public void resubscribeSessions() throws JMSException, AMQException, FailoverException { _delegate.resubscribeSessions(); @@ -1653,4 +1648,46 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _messageCompressionThresholdSize; } + + void doWithAllLocks(Runnable r) + { + doWithAllLocks(r, _sessions.values()); + + } + + private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions) + { + if (!sessions.isEmpty()) + { + AMQSession session = sessions.remove(0); + + final Object dispatcherLock = session.getDispatcherLock(); + if (dispatcherLock != null) + { + synchronized (dispatcherLock) + { + synchronized (session.getMessageDeliveryLock()) + { + doWithAllLocks(r, sessions); + } + } + } + else + { + synchronized (session.getMessageDeliveryLock()) + { + doWithAllLocks(r, sessions); + } + } + } + else + { + synchronized (getFailoverMutex()) + { + r.run(); + } + } + } + + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 74ca1ed74f..c359fbcc84 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -52,8 +52,6 @@ public interface AMQConnectionDelegate XASession createXASession(int ackMode) throws JMSException; - void failoverPrep(); - void resubscribeSessions() throws JMSException, AMQException, FailoverException; void closeConnection(long timeout) throws JMSException, AMQException; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index fdeab7ae70..e22a341205 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.ExceptionListener; import javax.jms.JMSException; @@ -249,7 +250,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); for (AMQSession s : sessions) { - s.failoverPrep(); + ((AMQSession_0_10)s).failoverPrep(); } } @@ -306,16 +307,21 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _qpidConnection.notifyFailoverRequired(); - synchronized (_conn.getFailoverMutex()) + final AtomicBoolean failoverDone = new AtomicBoolean(); + + _conn.doWithAllLocks(new Runnable() { + @Override + public void run() + { try { if (_conn.firePreFailover(false) && _conn.attemptReconnection()) { - _conn.failoverPrep(); + failoverPrep(); _conn.resubscribeSessions(); _conn.fireFailoverComplete(); - return; + failoverDone.set(true); } } catch (Exception e) @@ -327,9 +333,19 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn.getProtocolHandler().getFailoverLatch().countDown(); _conn.getProtocolHandler().setFailoverLatch(null); } + + } + }); + + + if (failoverDone.get()) + { + return; } + } + _conn.setClosed(); final ExceptionListener listener = _conn.getExceptionListenerNoCheck(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 35582d92b7..ae83b6ab48 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -350,11 +350,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } - public void failoverPrep() - { - // do nothing - } - /** * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8f5e9524b6..3966e75423 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -169,7 +169,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final Lock _subscriberDetails = new ReentrantLock(true); private final Lock _subscriberAccess = new ReentrantLock(true); - private final FlowControllingBlockingQueue _queue; + private final FlowControllingBlockingQueue<Dispatchable> _queue; private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); @@ -358,7 +358,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_acknowledgeMode == NO_ACKNOWLEDGE) { _queue = - new FlowControllingBlockingQueue(_prefetchHighMark, _prefetchLowMark, + new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, _prefetchLowMark, new FlowControllingBlockingQueue.ThresholdListener() { private final AtomicBoolean _suspendState = new AtomicBoolean(); @@ -423,7 +423,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null); + _queue = new FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, null); } // Add creation logging to tie in with the existing close logging @@ -1789,7 +1789,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic //in the pre-dispatch queue. _usingDispatcherForCleanup = true; - syncDispatchQueue(); + syncDispatchQueue(false); // Set to false before sending the recover as 0-8/9/9-1 will //send messages back before the recover completes, and we @@ -1881,7 +1881,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic setRollbackMark(); - syncDispatchQueue(); + syncDispatchQueue(false); _dispatcher.rollback(); @@ -2201,21 +2201,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } - void failoverPrep() - { - syncDispatchQueue(); - } - void syncDispatchQueue() + void syncDispatchQueue(final boolean holdDispatchLock) { - if (Thread.currentThread() == _dispatcherThread) + if (Thread.currentThread() == _dispatcherThread || holdDispatchLock) { while (!super.isClosed() && !_queue.isEmpty()) { Dispatchable disp; try { - disp = (Dispatchable) _queue.take(); + disp = _queue.take(); } catch (InterruptedException e) { @@ -2267,7 +2263,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic Dispatchable disp; try { - disp = (Dispatchable) _queue.take(); + disp = _queue.take(); } catch (InterruptedException e) { @@ -3086,7 +3082,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers) { - Iterator messages = _queue.iterator(); + Iterator<Dispatchable> messages = _queue.iterator(); if (_logger.isDebugEnabled()) { _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" @@ -3237,6 +3233,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract void setFlowControl(final boolean active); + Object getDispatcherLock() + { + Dispatcher dispatcher = _dispatcher; + return dispatcher == null ? null : dispatcher._lock; + } + public interface Dispatchable { void dispatch(AMQSession ssn); @@ -3389,10 +3391,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - Dispatchable disp; - while (((disp = (Dispatchable) _queue.take()) != null) && !_closed.get()) + + while (((_queue.blockingPeek()) != null) && !_closed.get()) { - disp.dispatch(AMQSession.this); + synchronized (_lock) + { + Dispatchable disp = _queue.nonBlockingTake(); + + if(disp != null) + { + disp.dispatch(AMQSession.this); + } + } } } catch (InterruptedException e) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 206ca15c82..08d7ea3f67 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -235,7 +235,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic void failoverPrep() { - super.failoverPrep(); + syncDispatchQueue(true); clearUnacked(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index e0d8ac3702..0cb103f0cb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -412,7 +412,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _capacity, Option.UNRELIABLE); } - _0_10session.syncDispatchQueue(); + _0_10session.syncDispatchQueue(false); o = super.getMessageFromQueue(-1); } if (_capacity == 0) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java index f46c61daa7..0ba5cfdacb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java @@ -22,13 +22,16 @@ package org.apache.qpid.client; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; public final class ChannelToSessionMap { - private final Map<Integer, AMQSession> _sessionMap = new ConcurrentHashMap<>(); + private final Map<Integer, AMQSession> _sessionMap = Collections.synchronizedMap(new LinkedHashMap<Integer, AMQSession>()); private AtomicInteger _idFactory = new AtomicInteger(0); private int _maxChannelID; private int _minChannelID; @@ -48,7 +51,7 @@ public final class ChannelToSessionMap _sessionMap.remove(channelId); } - public Collection<AMQSession> values() + public List<AMQSession> values() { return new ArrayList<>(_sessionMap.values()); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index b194ac88de..df54b7066b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.client.util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the @@ -37,12 +37,12 @@ import java.util.concurrent.ConcurrentLinkedQueue; * <p> * TODO Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted. */ -public class FlowControllingBlockingQueue +public class FlowControllingBlockingQueue<T> { private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class); /** This queue is bounded and is used to store messages before being dispatched to the consumer */ - private final Queue _queue = new ConcurrentLinkedQueue(); + private final Queue<T> _queue = new ConcurrentLinkedQueue<T>(); private final int _flowControlHighThreshold; private final int _flowControlLowThreshold; @@ -82,9 +82,44 @@ public class FlowControllingBlockingQueue } } - public Object take() throws InterruptedException + public T blockingPeek() throws InterruptedException + { + T o = _queue.peek(); + if (o == null) + { + synchronized (this) + { + while ((o = _queue.peek()) == null) + { + wait(); + } + } + } + return o; + } + + public T nonBlockingTake() throws InterruptedException + { + T o = _queue.poll(); + + if (o != null && !disableFlowControl && _listener != null) + { + synchronized (_listener) + { + if (_count-- == _flowControlLowThreshold) + { + _listener.underThreshold(_count); + } + } + + } + + return o; + } + + public T take() throws InterruptedException { - Object o = _queue.poll(); + T o = _queue.poll(); if(o == null) { synchronized(this) @@ -110,7 +145,7 @@ public class FlowControllingBlockingQueue return o; } - public void add(Object o) + public void add(T o) { synchronized(this) { @@ -130,7 +165,7 @@ public class FlowControllingBlockingQueue } } - public Iterator iterator() + public Iterator<T> iterator() { return _queue.iterator(); } |