diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-10-27 06:19:08 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-10-27 06:19:08 +0000 |
| commit | 825327492ededcf62f7307a96eb29f5e7df88351 (patch) | |
| tree | 05ce4cad575ce2c0379c5440f5ec197629c97c16 /java/client/src | |
| parent | 55dae2c49ba8c283583f3688784f2b763e772020 (diff) | |
| download | qpid-python-825327492ededcf62f7307a96eb29f5e7df88351.tar.gz | |
QPID-1339:
- Modified QpidTestCase to start/stop multiple brokers for failover
testing.
- Modified QpidTestCase to substitute port variables into broker
start/stop commands.
- Modified test profiles to use the new port variables.
- Modified QpidTestCase to permit multiple exclude files.
- Modified test profiles to make use of a common exclude list:
ExcludeList
- Added ConnectionTest.testResumeEmptyReplayBuffer.
- Made default exception handling for Connection and Session log the
exception.
- Added SenderExcetion to specifically signal problems with
transmitting connection data.
- Modified Session to catch and deal with connection send failures
for sessions with positive expiry.
- Modified FailoverBaseCase to work for non VM brokers.
- Made FailoverTest fail if failover times out.
- Modified JMS implementation to make use of the recently added low
level session resume.
- Unexcluded failover tests from 0-10 test profiles.
- Excluded MultipleJCAProviderRegistrationTest due to its testing
strategy resulting in spurious failure when running as part of the
larger test suite.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@708093 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
7 files changed, 127 insertions, 55 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0999a09ca5..4e8fdc2370 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; @@ -628,6 +629,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _delegate.makeBrokerConnection(brokerDetail); } + public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E + { + return _delegate.executeRetrySupport(operation); + } + /** * Get the details of the currently active broker * diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 60b827a426..b64147fe8f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -27,20 +27,24 @@ import javax.jms.XASession; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; public interface AMQConnectionDelegate { - public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; + ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException; - public Session createSession(final boolean transacted, final int acknowledgeMode, - final int prefetchHigh, final int prefetchLow) throws JMSException; + Session createSession(final boolean transacted, final int acknowledgeMode, + final int prefetchHigh, final int prefetchLow) throws JMSException; - public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; + XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; - public void resubscribeSessions() throws JMSException, AMQException, FailoverException; + void resubscribeSessions() throws JMSException, AMQException, FailoverException; + + void closeConnection(long timeout) throws JMSException, AMQException; + + <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E; - public void closeConnection(long timeout) throws JMSException, AMQException; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 6480a0da76..8a9abcc398 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -23,6 +23,9 @@ package org.apache.qpid.client; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.XASession; @@ -31,6 +34,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; @@ -61,11 +65,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec * The QpidConeection instance that is mapped with thie JMS connection. */ org.apache.qpid.transport.Connection _qpidConnection; + private ConnectionException exception = null; //--- constructor public AMQConnectionDelegate_0_10(AMQConnection conn) { _conn = conn; + _qpidConnection = new Connection(); + _qpidConnection.setConnectionListener(this); } /** @@ -129,16 +136,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec */ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { - _qpidConnection = new Connection(); try { if (_logger.isDebugEnabled()) { - _logger.debug("creating connection with broker " + " host: " + brokerDetail - .getHost() + " port: " + brokerDetail.getPort() + " virtualhost: " + _conn - .getVirtualHost() + "user name: " + _conn.getUsername() + "password: " + _conn.getPassword()); + _logger.debug("connecting to host: " + brokerDetail.getHost() + + " port: " + brokerDetail.getPort() + + " vhost: " + _conn.getVirtualHost() + + " username: " + _conn.getUsername() + + " password: " + _conn.getPassword()); } - _qpidConnection.setConnectionListener(this); _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL()); _conn._connected = true; @@ -160,8 +167,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec */ public void resubscribeSessions() throws JMSException, AMQException, FailoverException { - //NOT implemented as railover is handled at a lower level - throw new FailoverException("failing to reconnect during failover, operation not supported."); + List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); + _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size())); + for (AMQSession s : sessions) + { + ((AMQSession_0_10) s)._qpidConnection = _qpidConnection; + s.resubscribe(); + } } @@ -181,6 +193,43 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public void exception(Connection conn, ConnectionException exc) { + if (exception != null) + { + _logger.error("previous exception", exception); + } + + exception = exc; + } + + public void closed(Connection conn) + { + ConnectionException exc = exception; + exception = null; + + ConnectionClose close = exc.getClose(); + if (close == null) + { + try + { + if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + { + _qpidConnection.resume(); + + if (_conn.firePreResubscribe()) + { + _conn.resubscribeSessions(); + } + + _conn.fireFailoverComplete(); + return; + } + } + catch (Exception e) + { + _logger.error("error during failover", e); + } + } + ExceptionListener listener = _conn._exceptionListener; if (listener == null) { @@ -188,19 +237,28 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } else { - ConnectionClose close = exc.getClose(); String code = null; if (close != null) { code = close.getReplyCode().toString(); } + JMSException ex = new JMSException(exc.getMessage(), code); ex.initCause(exc); - - _conn._exceptionListener.onException(ex); + listener.onException(ex); } } - public void closed(Connection conn) {} + public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E + { + try + { + return operation.execute(); + } + catch (FailoverException e) + { + throw new RuntimeException(e); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 8d42a2f201..035e3830ca 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -247,4 +247,41 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e); } } + + public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E + { + while (true) + { + try + { + _conn.blockUntilNotFailingOver(); + } + catch (InterruptedException e) + { + _logger.debug("Interrupted: " + e, e); + + return null; + } + + synchronized (_conn.getFailoverMutex()) + { + try + { + return operation.execute(); + } + catch (FailoverException e) + { + _logger.debug("Failover exception caught during operation: " + e, e); + } + catch (IllegalStateException e) + { + if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support"))) + { + throw e; + } + } + } + } + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index cbdefd0548..b5d12d9520 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -261,6 +261,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Holds the highest received delivery tag. */ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); + private final AtomicLong _rollbackMark = new AtomicLong(-1); /** All the not yet acknowledged message tags */ protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); @@ -1809,6 +1810,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { _failedOverDirty = true; } + + _rollbackMark.set(-1); resubscribeProducers(); resubscribeConsumers(); } @@ -2601,7 +2604,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Used for debugging in the dispatcher. */ private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher"); - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ class Dispatcher extends Thread @@ -2611,7 +2613,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final AtomicBoolean _closed = new AtomicBoolean(false); private final Object _lock = new Object(); - private final AtomicLong _rollbackMark = new AtomicLong(-1); private String dispatcherID = "" + System.identityHashCode(this); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 7829966315..ab983aa842 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -113,11 +113,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, defaultPrefetchLowMark); _qpidConnection = qpidConnection; - // create the qpid session with an expiry <= 0 so that the session does not expire - _qpidSession = qpidConnection.createSession(0); - // set the exception listnere for this session + _qpidSession = _qpidConnection.createSession(1); _qpidSession.setSessionListener(this); - // set transacted if required if (_transacted) { _qpidSession.txSelect(); diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java index cf7e978c03..e9e52cc97c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -99,37 +99,6 @@ public class FailoverRetrySupport<T, E extends Exception> implements FailoverSup */ public T execute() throws E { - while (true) - { - try - { - connection.blockUntilNotFailingOver(); - } - catch (InterruptedException e) - { - _log.debug("Interrupted: " + e, e); - - return null; - } - - synchronized (connection.getFailoverMutex()) - { - try - { - return operation.execute(); - } - catch (FailoverException e) - { - _log.debug("Failover exception caught during operation: " + e, e); - } - catch (IllegalStateException e) - { - if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support"))) - { - throw e; - } - } - } - } + return connection.executeRetrySupport(operation); } } |
