From fbb2c460dfa60e63712f616a3e45c75c9735d5c7 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Fri, 13 Feb 2015 17:01:59 +0000 Subject: QPID-6374: [Java Broker] 0-10 Failover: the thread performing the failover prep now syncs the dispatch queue (avoids possibility of app level dead lock) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1659605 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 47 +++++++++++++++++-- .../apache/qpid/client/AMQConnectionDelegate.java | 2 - .../qpid/client/AMQConnectionDelegate_0_10.java | 24 ++++++++-- .../qpid/client/AMQConnectionDelegate_8_0.java | 5 -- .../java/org/apache/qpid/client/AMQSession.java | 44 +++++++++++------- .../org/apache/qpid/client/AMQSession_0_10.java | 2 +- .../qpid/client/BasicMessageConsumer_0_10.java | 2 +- .../apache/qpid/client/ChannelToSessionMap.java | 7 ++- .../client/util/FlowControllingBlockingQueue.java | 53 ++++++++++++++++++---- 9 files changed, 140 insertions(+), 46 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 4c596b88a0..8e7b5b90d8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1216,11 +1216,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _failoverMutex; } - public void failoverPrep() - { - _delegate.failoverPrep(); - } - public void resubscribeSessions() throws JMSException, AMQException, FailoverException { _delegate.resubscribeSessions(); @@ -1653,4 +1648,46 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _messageCompressionThresholdSize; } + + void doWithAllLocks(Runnable r) + { + doWithAllLocks(r, _sessions.values()); + + } + + private void doWithAllLocks(final Runnable r, final List sessions) + { + if (!sessions.isEmpty()) + { + AMQSession session = sessions.remove(0); + + final Object dispatcherLock = session.getDispatcherLock(); + if (dispatcherLock != null) + { + synchronized (dispatcherLock) + { + synchronized (session.getMessageDeliveryLock()) + { + doWithAllLocks(r, sessions); + } + } + } + else + { + synchronized (session.getMessageDeliveryLock()) + { + doWithAllLocks(r, sessions); + } + } + } + else + { + synchronized (getFailoverMutex()) + { + r.run(); + } + } + } + + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 74ca1ed74f..c359fbcc84 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -52,8 +52,6 @@ public interface AMQConnectionDelegate XASession createXASession(int ackMode) throws JMSException; - void failoverPrep(); - void resubscribeSessions() throws JMSException, AMQException, FailoverException; void closeConnection(long timeout) throws JMSException, AMQException; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index fdeab7ae70..e22a341205 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.ExceptionListener; import javax.jms.JMSException; @@ -249,7 +250,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec List sessions = new ArrayList(_conn.getSessions().values()); for (AMQSession s : sessions) { - s.failoverPrep(); + ((AMQSession_0_10)s).failoverPrep(); } } @@ -306,16 +307,21 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _qpidConnection.notifyFailoverRequired(); - synchronized (_conn.getFailoverMutex()) + final AtomicBoolean failoverDone = new AtomicBoolean(); + + _conn.doWithAllLocks(new Runnable() { + @Override + public void run() + { try { if (_conn.firePreFailover(false) && _conn.attemptReconnection()) { - _conn.failoverPrep(); + failoverPrep(); _conn.resubscribeSessions(); _conn.fireFailoverComplete(); - return; + failoverDone.set(true); } } catch (Exception e) @@ -327,9 +333,19 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _conn.getProtocolHandler().getFailoverLatch().countDown(); _conn.getProtocolHandler().setFailoverLatch(null); } + + } + }); + + + if (failoverDone.get()) + { + return; } + } + _conn.setClosed(); final ExceptionListener listener = _conn.getExceptionListenerNoCheck(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 35582d92b7..ae83b6ab48 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -350,11 +350,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } - public void failoverPrep() - { - // do nothing - } - /** * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8f5e9524b6..3966e75423 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -169,7 +169,7 @@ public abstract class AMQSession _queue; private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); @@ -358,7 +358,7 @@ public abstract class AMQSession(_prefetchHighMark, _prefetchLowMark, new FlowControllingBlockingQueue.ThresholdListener() { private final AtomicBoolean _suspendState = new AtomicBoolean(); @@ -423,7 +423,7 @@ public abstract class AMQSession(_prefetchHighMark, null); } // Add creation logging to tie in with the existing close logging @@ -1789,7 +1789,7 @@ public abstract class AMQSession messages = _queue.iterator(); if (_logger.isDebugEnabled()) { _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" @@ -3237,6 +3233,12 @@ public abstract class AMQSession _sessionMap = new ConcurrentHashMap<>(); + private final Map _sessionMap = Collections.synchronizedMap(new LinkedHashMap()); private AtomicInteger _idFactory = new AtomicInteger(0); private int _maxChannelID; private int _minChannelID; @@ -48,7 +51,7 @@ public final class ChannelToSessionMap _sessionMap.remove(channelId); } - public Collection values() + public List values() { return new ArrayList<>(_sessionMap.values()); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index b194ac88de..df54b7066b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.client.util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the @@ -37,12 +37,12 @@ import java.util.concurrent.ConcurrentLinkedQueue; *

* TODO Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted. */ -public class FlowControllingBlockingQueue +public class FlowControllingBlockingQueue { private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class); /** This queue is bounded and is used to store messages before being dispatched to the consumer */ - private final Queue _queue = new ConcurrentLinkedQueue(); + private final Queue _queue = new ConcurrentLinkedQueue(); private final int _flowControlHighThreshold; private final int _flowControlLowThreshold; @@ -82,9 +82,44 @@ public class FlowControllingBlockingQueue } } - public Object take() throws InterruptedException + public T blockingPeek() throws InterruptedException + { + T o = _queue.peek(); + if (o == null) + { + synchronized (this) + { + while ((o = _queue.peek()) == null) + { + wait(); + } + } + } + return o; + } + + public T nonBlockingTake() throws InterruptedException + { + T o = _queue.poll(); + + if (o != null && !disableFlowControl && _listener != null) + { + synchronized (_listener) + { + if (_count-- == _flowControlLowThreshold) + { + _listener.underThreshold(_count); + } + } + + } + + return o; + } + + public T take() throws InterruptedException { - Object o = _queue.poll(); + T o = _queue.poll(); if(o == null) { synchronized(this) @@ -110,7 +145,7 @@ public class FlowControllingBlockingQueue return o; } - public void add(Object o) + public void add(T o) { synchronized(this) { @@ -130,7 +165,7 @@ public class FlowControllingBlockingQueue } } - public Iterator iterator() + public Iterator iterator() { return _queue.iterator(); } -- cgit v1.2.1