From 825327492ededcf62f7307a96eb29f5e7df88351 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Mon, 27 Oct 2008 06:19:08 +0000 Subject: QPID-1339: - Modified QpidTestCase to start/stop multiple brokers for failover testing. - Modified QpidTestCase to substitute port variables into broker start/stop commands. - Modified test profiles to use the new port variables. - Modified QpidTestCase to permit multiple exclude files. - Modified test profiles to make use of a common exclude list: ExcludeList - Added ConnectionTest.testResumeEmptyReplayBuffer. - Made default exception handling for Connection and Session log the exception. - Added SenderExcetion to specifically signal problems with transmitting connection data. - Modified Session to catch and deal with connection send failures for sessions with positive expiry. - Modified FailoverBaseCase to work for non VM brokers. - Made FailoverTest fail if failover times out. - Modified JMS implementation to make use of the recently added low level session resume. - Unexcluded failover tests from 0-10 test profiles. - Excluded MultipleJCAProviderRegistrationTest due to its testing strategy resulting in spurious failure when running as part of the larger test suite. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@708093 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 6 ++ .../apache/qpid/client/AMQConnectionDelegate.java | 16 +++-- .../qpid/client/AMQConnectionDelegate_0_10.java | 80 +++++++++++++++++++--- .../qpid/client/AMQConnectionDelegate_8_0.java | 37 ++++++++++ .../java/org/apache/qpid/client/AMQSession.java | 5 +- .../org/apache/qpid/client/AMQSession_0_10.java | 5 +- .../qpid/client/failover/FailoverRetrySupport.java | 33 +-------- 7 files changed, 127 insertions(+), 55 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0999a09ca5..4e8fdc2370 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; @@ -628,6 +629,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _delegate.makeBrokerConnection(brokerDetail); } + public T executeRetrySupport(FailoverProtectedOperation operation) throws E + { + return _delegate.executeRetrySupport(operation); + } + /** * Get the details of the currently active broker * diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 60b827a426..b64147fe8f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -27,20 +27,24 @@ import javax.jms.XASession; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; public interface AMQConnectionDelegate { - public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; + ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; - public Session createSession(final boolean transacted, final int acknowledgeMode, - final int prefetchHigh, final int prefetchLow) throws JMSException; + Session createSession(final boolean transacted, final int acknowledgeMode, + final int prefetchHigh, final int prefetchLow) throws JMSException; - public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; + XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; - public void resubscribeSessions() throws JMSException, AMQException, FailoverException; + void resubscribeSessions() throws JMSException, AMQException, FailoverException; + + void closeConnection(long timeout) throws JMSException, AMQException; + + T executeRetrySupport(FailoverProtectedOperation operation) throws E; - public void closeConnection(long timeout) throws JMSException, AMQException; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 6480a0da76..8a9abcc398 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -23,6 +23,9 @@ package org.apache.qpid.client; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.XASession; @@ -31,6 +34,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; @@ -61,11 +65,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec * The QpidConeection instance that is mapped with thie JMS connection. */ org.apache.qpid.transport.Connection _qpidConnection; + private ConnectionException exception = null; //--- constructor public AMQConnectionDelegate_0_10(AMQConnection conn) { _conn = conn; + _qpidConnection = new Connection(); + _qpidConnection.setConnectionListener(this); } /** @@ -129,16 +136,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec */ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { - _qpidConnection = new Connection(); try { if (_logger.isDebugEnabled()) { - _logger.debug("creating connection with broker " + " host: " + brokerDetail - .getHost() + " port: " + brokerDetail.getPort() + " virtualhost: " + _conn - .getVirtualHost() + "user name: " + _conn.getUsername() + "password: " + _conn.getPassword()); + _logger.debug("connecting to host: " + brokerDetail.getHost() + + " port: " + brokerDetail.getPort() + + " vhost: " + _conn.getVirtualHost() + + " username: " + _conn.getUsername() + + " password: " + _conn.getPassword()); } - _qpidConnection.setConnectionListener(this); _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL()); _conn._connected = true; @@ -160,8 +167,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec */ public void resubscribeSessions() throws JMSException, AMQException, FailoverException { - //NOT implemented as railover is handled at a lower level - throw new FailoverException("failing to reconnect during failover, operation not supported."); + List sessions = new ArrayList(_conn.getSessions().values()); + _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size())); + for (AMQSession s : sessions) + { + ((AMQSession_0_10) s)._qpidConnection = _qpidConnection; + s.resubscribe(); + } } @@ -181,6 +193,43 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public void exception(Connection conn, ConnectionException exc) { + if (exception != null) + { + _logger.error("previous exception", exception); + } + + exception = exc; + } + + public void closed(Connection conn) + { + ConnectionException exc = exception; + exception = null; + + ConnectionClose close = exc.getClose(); + if (close == null) + { + try + { + if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + { + _qpidConnection.resume(); + + if (_conn.firePreResubscribe()) + { + _conn.resubscribeSessions(); + } + + _conn.fireFailoverComplete(); + return; + } + } + catch (Exception e) + { + _logger.error("error during failover", e); + } + } + ExceptionListener listener = _conn._exceptionListener; if (listener == null) { @@ -188,19 +237,28 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } else { - ConnectionClose close = exc.getClose(); String code = null; if (close != null) { code = close.getReplyCode().toString(); } + JMSException ex = new JMSException(exc.getMessage(), code); ex.initCause(exc); - - _conn._exceptionListener.onException(ex); + listener.onException(ex); } } - public void closed(Connection conn) {} + public T executeRetrySupport(FailoverProtectedOperation operation) throws E + { + try + { + return operation.execute(); + } + catch (FailoverException e) + { + throw new RuntimeException(e); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 8d42a2f201..035e3830ca 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -247,4 +247,41 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e); } } + + public T executeRetrySupport(FailoverProtectedOperation operation) throws E + { + while (true) + { + try + { + _conn.blockUntilNotFailingOver(); + } + catch (InterruptedException e) + { + _logger.debug("Interrupted: " + e, e); + + return null; + } + + synchronized (_conn.getFailoverMutex()) + { + try + { + return operation.execute(); + } + catch (FailoverException e) + { + _logger.debug("Failover exception caught during operation: " + e, e); + } + catch (IllegalStateException e) + { + if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support"))) + { + throw e; + } + } + } + } + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index cbdefd0548..b5d12d9520 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -261,6 +261,7 @@ public abstract class AMQSession _unacknowledgedMessageTags = new ConcurrentLinkedQueue(); @@ -1809,6 +1810,8 @@ public abstract class AMQSession implements FailoverSup */ public T execute() throws E { - while (true) - { - try - { - connection.blockUntilNotFailingOver(); - } - catch (InterruptedException e) - { - _log.debug("Interrupted: " + e, e); - - return null; - } - - synchronized (connection.getFailoverMutex()) - { - try - { - return operation.execute(); - } - catch (FailoverException e) - { - _log.debug("Failover exception caught during operation: " + e, e); - } - catch (IllegalStateException e) - { - if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support"))) - { - throw e; - } - } - } - } + return connection.executeRetrySupport(operation); } } -- cgit v1.2.1