summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-10-27 06:19:08 +0000
committerRafael H. Schloming <rhs@apache.org>2008-10-27 06:19:08 +0000
commit825327492ededcf62f7307a96eb29f5e7df88351 (patch)
tree05ce4cad575ce2c0379c5440f5ec197629c97c16 /java/client/src
parent55dae2c49ba8c283583f3688784f2b763e772020 (diff)
downloadqpid-python-825327492ededcf62f7307a96eb29f5e7df88351.tar.gz
QPID-1339:
- Modified QpidTestCase to start/stop multiple brokers for failover testing. - Modified QpidTestCase to substitute port variables into broker start/stop commands. - Modified test profiles to use the new port variables. - Modified QpidTestCase to permit multiple exclude files. - Modified test profiles to make use of a common exclude list: ExcludeList - Added ConnectionTest.testResumeEmptyReplayBuffer. - Made default exception handling for Connection and Session log the exception. - Added SenderExcetion to specifically signal problems with transmitting connection data. - Modified Session to catch and deal with connection send failures for sessions with positive expiry. - Modified FailoverBaseCase to work for non VM brokers. - Made FailoverTest fail if failover times out. - Modified JMS implementation to make use of the recently added low level session resume. - Unexcluded failover tests from 0-10 test profiles. - Excluded MultipleJCAProviderRegistrationTest due to its testing strategy resulting in spurious failure when running as part of the larger test suite. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@708093 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java80
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java37
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java33
7 files changed, 127 insertions, 55 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 0999a09ca5..4e8fdc2370 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -628,6 +629,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _delegate.makeBrokerConnection(brokerDetail);
}
+ public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
+ {
+ return _delegate.executeRetrySupport(operation);
+ }
+
/**
* Get the details of the currently active broker
*
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 60b827a426..b64147fe8f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -27,20 +27,24 @@ import javax.jms.XASession;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
public interface AMQConnectionDelegate
{
- public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
+ ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
- public Session createSession(final boolean transacted, final int acknowledgeMode,
- final int prefetchHigh, final int prefetchLow) throws JMSException;
+ Session createSession(final boolean transacted, final int acknowledgeMode,
+ final int prefetchHigh, final int prefetchLow) throws JMSException;
- public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
+ XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
- public void resubscribeSessions() throws JMSException, AMQException, FailoverException;
+ void resubscribeSessions() throws JMSException, AMQException, FailoverException;
+
+ void closeConnection(long timeout) throws JMSException, AMQException;
+
+ <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
- public void closeConnection(long timeout) throws JMSException, AMQException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 6480a0da76..8a9abcc398 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -23,6 +23,9 @@ package org.apache.qpid.client;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.XASession;
@@ -31,6 +34,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
@@ -61,11 +65,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
* The QpidConeection instance that is mapped with thie JMS connection.
*/
org.apache.qpid.transport.Connection _qpidConnection;
+ private ConnectionException exception = null;
//--- constructor
public AMQConnectionDelegate_0_10(AMQConnection conn)
{
_conn = conn;
+ _qpidConnection = new Connection();
+ _qpidConnection.setConnectionListener(this);
}
/**
@@ -129,16 +136,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
*/
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
- _qpidConnection = new Connection();
try
{
if (_logger.isDebugEnabled())
{
- _logger.debug("creating connection with broker " + " host: " + brokerDetail
- .getHost() + " port: " + brokerDetail.getPort() + " virtualhost: " + _conn
- .getVirtualHost() + "user name: " + _conn.getUsername() + "password: " + _conn.getPassword());
+ _logger.debug("connecting to host: " + brokerDetail.getHost() +
+ " port: " + brokerDetail.getPort() +
+ " vhost: " + _conn.getVirtualHost() +
+ " username: " + _conn.getUsername() +
+ " password: " + _conn.getPassword());
}
- _qpidConnection.setConnectionListener(this);
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
_conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL());
_conn._connected = true;
@@ -160,8 +167,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
*/
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
- //NOT implemented as railover is handled at a lower level
- throw new FailoverException("failing to reconnect during failover, operation not supported.");
+ List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
+ _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size()));
+ for (AMQSession s : sessions)
+ {
+ ((AMQSession_0_10) s)._qpidConnection = _qpidConnection;
+ s.resubscribe();
+ }
}
@@ -181,6 +193,43 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public void exception(Connection conn, ConnectionException exc)
{
+ if (exception != null)
+ {
+ _logger.error("previous exception", exception);
+ }
+
+ exception = exc;
+ }
+
+ public void closed(Connection conn)
+ {
+ ConnectionException exc = exception;
+ exception = null;
+
+ ConnectionClose close = exc.getClose();
+ if (close == null)
+ {
+ try
+ {
+ if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ {
+ _qpidConnection.resume();
+
+ if (_conn.firePreResubscribe())
+ {
+ _conn.resubscribeSessions();
+ }
+
+ _conn.fireFailoverComplete();
+ return;
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
+ }
+
ExceptionListener listener = _conn._exceptionListener;
if (listener == null)
{
@@ -188,19 +237,28 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
else
{
- ConnectionClose close = exc.getClose();
String code = null;
if (close != null)
{
code = close.getReplyCode().toString();
}
+
JMSException ex = new JMSException(exc.getMessage(), code);
ex.initCause(exc);
-
- _conn._exceptionListener.onException(ex);
+ listener.onException(ex);
}
}
- public void closed(Connection conn) {}
+ public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
+ {
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 8d42a2f201..035e3830ca 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -247,4 +247,41 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
}
}
+
+ public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
+ {
+ while (true)
+ {
+ try
+ {
+ _conn.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ _logger.debug("Interrupted: " + e, e);
+
+ return null;
+ }
+
+ synchronized (_conn.getFailoverMutex())
+ {
+ try
+ {
+ return operation.execute();
+ }
+ catch (FailoverException e)
+ {
+ _logger.debug("Failover exception caught during operation: " + e, e);
+ }
+ catch (IllegalStateException e)
+ {
+ if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support")))
+ {
+ throw e;
+ }
+ }
+ }
+ }
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index cbdefd0548..b5d12d9520 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -261,6 +261,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Holds the highest received delivery tag. */
private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+ private final AtomicLong _rollbackMark = new AtomicLong(-1);
/** All the not yet acknowledged message tags */
protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
@@ -1809,6 +1810,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
_failedOverDirty = true;
}
+
+ _rollbackMark.set(-1);
resubscribeProducers();
resubscribeConsumers();
}
@@ -2601,7 +2604,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Used for debugging in the dispatcher. */
private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
-
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
class Dispatcher extends Thread
@@ -2611,7 +2613,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
- private final AtomicLong _rollbackMark = new AtomicLong(-1);
private String dispatcherID = "" + System.identityHashCode(this);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 7829966315..ab983aa842 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -113,11 +113,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
defaultPrefetchLowMark);
_qpidConnection = qpidConnection;
- // create the qpid session with an expiry <= 0 so that the session does not expire
- _qpidSession = qpidConnection.createSession(0);
- // set the exception listnere for this session
+ _qpidSession = _qpidConnection.createSession(1);
_qpidSession.setSessionListener(this);
- // set transacted if required
if (_transacted)
{
_qpidSession.txSelect();
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
index cf7e978c03..e9e52cc97c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
@@ -99,37 +99,6 @@ public class FailoverRetrySupport<T, E extends Exception> implements FailoverSup
*/
public T execute() throws E
{
- while (true)
- {
- try
- {
- connection.blockUntilNotFailingOver();
- }
- catch (InterruptedException e)
- {
- _log.debug("Interrupted: " + e, e);
-
- return null;
- }
-
- synchronized (connection.getFailoverMutex())
- {
- try
- {
- return operation.execute();
- }
- catch (FailoverException e)
- {
- _log.debug("Failover exception caught during operation: " + e, e);
- }
- catch (IllegalStateException e)
- {
- if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support")))
- {
- throw e;
- }
- }
- }
- }
+ return connection.executeRetrySupport(operation);
}
}