diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-25 22:58:57 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-25 22:58:57 +0000 |
| commit | afcf8099695253651c73910a243fb29aa520b008 (patch) | |
| tree | e514bc51797181c567500a8ddbfc20ea9b89b908 /qpid/java/client | |
| parent | f315ac548e346ded9ed1d081db4118e703c362b4 (diff) | |
| download | qpid-python-afcf8099695253651c73910a243fb29aa520b008.tar.gz | |
Merged from java-broker-0-10 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@829675 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
9 files changed, 80 insertions, 67 deletions
diff --git a/qpid/java/client/build.xml b/qpid/java/client/build.xml index 73252ac124..3c6132dc5b 100644 --- a/qpid/java/client/build.xml +++ b/qpid/java/client/build.xml @@ -20,7 +20,8 @@ --> <project name="AMQ Client" default="build"> - <property name="module.depends" value="common common/test"/> + <property name="module.depends" value="common"/> + <property name="module.test.depends" value="common/test" /> <property name="module.genpom" value="true"/> <property name="module.genpom.args" value="-Sgeronimo-jms_1.1_spec=provided"/> diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index ed122a772e..b57c834598 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -214,14 +214,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) { done = (_fastAccessSessions[id] == null); - } + } else { done = (!_slowAccessSessions.keySet().contains(id)); } } } - + return id; } @@ -320,11 +320,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect //Indicates whether we need to sync on every message ack private boolean _syncAck; - + //Indicates the sync publish options (persistent|all) //By default it's async publish - private String _syncPublish = ""; - + private String _syncPublish = ""; + /** * @param broker brokerdetails * @param username username @@ -418,7 +418,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null) { - _syncPersistence = + _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE)); _logger.warn("sync_persistence is a deprecated property, " + "please use sync_publish={persistent|all} instead"); @@ -453,10 +453,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // use the default value set for all connections _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish); } - + + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); + _failoverPolicy = new FailoverPolicy(connectionURL, this); BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails(); - if (brokerDetails.getTransport().equals(BrokerDetails.VM)) + if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion) || "0-9".equals(amqpVersion)) { _delegate = new AMQConnectionDelegate_8_0(this); } @@ -538,7 +540,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else if (!_connected) { - retryAllowed = _failoverPolicy.failoverAllowed(); + retryAllowed = _failoverPolicy.failoverAllowed(); brokerDetails = _failoverPolicy.getNextBrokerDetails(); } } @@ -591,7 +593,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new AMQConnectionFailureException(message, connectionException); } - + _connectionMetaData = new QpidConnectionMetaData(this); } @@ -1573,7 +1575,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _syncPersistence; } - + /** * Indicates whether we need to sync on every message ack */ @@ -1581,12 +1583,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _syncAck; } - + public String getSyncPublish() { return _syncPublish; } - + public void setIdleTimeout(long l) { _delegate.setIdleTimeout(l); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 97d0d0516e..e1d9ae735c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -89,11 +89,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); // TODO: use system property thingy for this - if (System.getProperty("UseTransportIo", "false").equals("false")) + if (System.getProperty("UseTransportIo", "false").equals("false")) { TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); - } - else + } + else { _conn.getProtocolHandler().createIoTransportSession(brokerDetail); } @@ -197,7 +197,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate * Low = MaxPrefetch / 2 * @return XASession * @throws JMSException thrown if there is a problem creating the session. - */ + */ public XASession createXASession() throws JMSException { return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); @@ -214,7 +214,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate // todo Be aware of possible changes to parameter order as versions change. BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false); _conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class); - + if (transacted) { if (_logger.isDebugEnabled()) @@ -222,7 +222,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate _logger.debug("Issuing TxSelect for " + channelId); } TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody(); - + // TODO: Be aware of possible changes to parameter order as versions change. _conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class); } @@ -299,7 +299,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } } - + public void setIdleTimeout(long l){} public int getMaxChannelID() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 1587d6a6bf..2324d441cc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -468,7 +468,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys) throws JMSException { - String rk = ""; + String rk = null; boolean res; if (bindingKeys != null && bindingKeys.length>0) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 44ce59975a..df59be25d0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -46,14 +46,14 @@ import org.slf4j.LoggerFactory; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { - enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; - + enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; + protected final Logger _logger = LoggerFactory.getLogger(getClass()); private AMQConnection _connection; /** - * If true, messages will not get a timestamp. + * If true, messages will not get a timestamp. */ protected boolean _disableTimestamps; @@ -105,7 +105,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private long _producerId; /** - * The session used to create this producer + * The session used to create this producer */ protected AMQSession _session; @@ -118,11 +118,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private boolean _disableMessageId; private UUIDGen _messageIdGenerator = UUIDs.newGenerator(); - + protected String _userID; // ref user id used in the connection. private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; - + protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, @@ -145,14 +145,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _mandatory = mandatory; _waitUntilSent = waitUntilSent; _userID = connection.getUsername(); - setPublishMode(); + setPublishMode(); } - + void setPublishMode() { // Publish mode could be configured at destination level as well. // Will add support for this when we provide a more robust binding URL - + String syncPub = _connection.getSyncPublish(); // Support for deprecated option sync_persistence if (syncPub.equals("persistent") || _connection.getSyncPersistence()) @@ -163,7 +163,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac { publishMode = PublishMode.SYNC_PUBLISH_ALL; } - + _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode); } @@ -277,6 +277,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac checkPreConditions(); checkInitialDestination(); + synchronized (_connection.getFailoverMutex()) { sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate); @@ -548,6 +549,10 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac { throw new javax.jms.IllegalStateException("Invalid Session"); } + if(_session.getAMQConnection().isClosed()) + { + throw new javax.jms.IllegalStateException("Connection closed"); + } } private void checkInitialDestination() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java index be0d283470..e5050b4fbd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java @@ -33,7 +33,7 @@ public class ClientProperties public static final String IGNORE_SET_CLIENTID_PROP_NAME = "ignore_setclientID"; /** - * This property is currently used within the 0.10 code path only + * This property is currently used within the 0.10 code path only * The maximum number of pre-fetched messages per destination * This property is used for all the connection unless it is overwritten by the connectionURL * type: long @@ -46,13 +46,13 @@ public class ClientProperties * type: boolean */ public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence"; - + /** * When true a sync command is sent after sending a message ack. * type: boolean */ public static final String SYNC_ACK_PROP_NAME = "sync_ack"; - + /** * sync_publish property - {persistent|all} * If set to 'persistent',then persistent messages will be publish synchronously @@ -60,17 +60,17 @@ public class ClientProperties * published synchronously. */ public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish"; - + /** * This value will be used in the following settings * To calculate the SO_TIMEOUT option of the socket (2*idle_timeout) * If this values is between the max and min values specified for heartbeat * by the broker in TuneOK it will be used as the heartbeat interval. - * If not a warning will be printed and the max value specified for + * If not a warning will be printed and the max value specified for * heartbeat in TuneOK will be used */ public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout"; - + /** * ========================================================== @@ -100,4 +100,6 @@ public class ClientProperties */ public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit"; public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144"; + + public static final String AMQP_VERSION = "qpid.amqp.version"; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 190776891e..f74dbba939 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -21,7 +21,6 @@ package org.apache.qpid.client.failover; import org.apache.qpid.AMQDisconnectedException; -import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.AMQState; @@ -94,7 +93,6 @@ public class FailoverHandler implements Runnable * Creates a failover handler on a protocol session, for a particular MINA session (network connection). * * @param amqProtocolHandler The protocol handler that spans the failover. - * @param session The MINA session, for the failing connection. */ public FailoverHandler(AMQProtocolHandler amqProtocolHandler) { @@ -135,10 +133,12 @@ public class FailoverHandler implements Runnable // have a state waiter waiting until the connection is closed for some reason. Or in future we may have // a slightly more complex state model therefore I felt it was worthwhile doing this. AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager(); - + + // Use a fresh new StateManager for the reconnection attempts _amqProtocolHandler.setStateManager(new AMQStateManager()); + if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null)) { _logger.info("Failover process veto-ed by client"); @@ -190,7 +190,7 @@ public class FailoverHandler implements Runnable } else { - // Set the new Protocol Session in the StateManager. + // Set the new Protocol Session in the StateManager. existingStateManager.setProtocolSession(_amqProtocolHandler.getProtocolSession()); // Now that the ProtocolHandler has been reconnected clean up @@ -198,7 +198,7 @@ public class FailoverHandler implements Runnable // it any old exception that had occured prior to failover may // prohibit reconnection. // e.g. During testing when the broker is shutdown gracefully. - // The broker + // The broker // Clear any exceptions we gathered if (existingStateManager.getCurrentState() != AMQState.CONNECTION_OPEN) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 35bc521c80..6500a82818 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -110,10 +110,6 @@ import org.slf4j.LoggerFactory; * <tr><td> * </table> * - * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the - * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing - * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec - * filter before it mean not doing the read/write asynchronously but in the main filter thread? * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could @@ -172,10 +168,10 @@ public class AMQProtocolHandler implements ProtocolEngine private Job _writeJob; private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); private NetworkDriver _networkDriver; - + private long _writtenBytes; private long _readBytes; - + /** * Creates a new protocol handler, associated with the specified client connection instance. * @@ -215,10 +211,6 @@ public class AMQProtocolHandler implements ProtocolEngine * process will be started, provided that it is the clients policy to allow failover, and provided that a failover * has not already been started or failed. * - * <p/>It is important to note that when the connection dies this method may be called or {@link #exceptionCaught} - * may be called first followed by this method. This depends on whether the client was trying to send data at the - * time of the failure. - * * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and * not otherwise? The above comment doesn't make that clear. */ @@ -261,7 +253,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.debug("sessionClose() not allowed to failover"); _connection.exceptionReceived(new AMQDisconnectedException( - "Server closed connection and reconnection " + "not permitted.", + "Server closed connection and reconnection " + "not permitted.", _stateManager.getLastException())); } else @@ -277,12 +269,15 @@ public class AMQProtocolHandler implements ProtocolEngine /** See {@link FailoverHandler} to see rationale for separate thread. */ private void startFailoverThread() { - Thread failoverThread = new Thread(_failoverHandler); - failoverThread.setName("Failover"); - // Do not inherit daemon-ness from current thread as this can be a daemon - // thread such as a AnonymousIoService thread. - failoverThread.setDaemon(false); - failoverThread.start(); + if(!_connection.isClosed()) + { + Thread failoverThread = new Thread(_failoverHandler); + failoverThread.setName("Failover"); + // Do not inherit daemon-ness from current thread as this can be a daemon + // thread such as a AnonymousIoService thread. + failoverThread.setDaemon(false); + failoverThread.start(); + } } public void readerIdle() @@ -293,7 +288,7 @@ public class AMQProtocolHandler implements ProtocolEngine _logger.warn("Timed out while waiting for heartbeat from peer."); _networkDriver.close(); } - + public void writerIdle() { _logger.debug("Protocol Session [" + this + "] idle: reader"); @@ -365,6 +360,7 @@ public class AMQProtocolHandler implements ProtocolEngine public void propagateExceptionToAllWaiters(Exception e) { getStateManager().error(e); + propagateExceptionToFrameListeners(e); } @@ -582,7 +578,7 @@ public class AMQProtocolHandler implements ProtocolEngine } _connection.bytesSent(_writtenBytes); - + if (wait) { _networkDriver.flush(); @@ -642,7 +638,7 @@ public class AMQProtocolHandler implements ProtocolEngine _frameListeners.add(listener); //FIXME: At this point here we should check or before add we should check _stateManager is in an open - // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 + // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255 } writeFrame(frame); @@ -828,7 +824,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _networkDriver = driver; } - + /** @param delay delay in seconds (not ms) */ void initHeartbeats(int delay) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index cd049c24a1..8910920017 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -102,7 +102,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { - _protocolHandler = protocolHandler; + _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); @@ -156,7 +156,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public SaslClient getSaslClient() { - return _saslClient; + return _saslClient; } /** @@ -192,7 +192,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * @throws AMQException if this was not expected */ public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException - { + { if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { _channelId2UnprocessedMsgArray[channelId] = message; @@ -468,4 +468,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { // No-op, interface munging } + + + @Override + public String toString() + { + return "AMQProtocolSession[" + _connection + ']'; + } } |
