summaryrefslogtreecommitdiff
path: root/qpid/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-25 22:58:57 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-25 22:58:57 +0000
commitafcf8099695253651c73910a243fb29aa520b008 (patch)
treee514bc51797181c567500a8ddbfc20ea9b89b908 /qpid/java/client/src
parentf315ac548e346ded9ed1d081db4118e703c362b4 (diff)
downloadqpid-python-afcf8099695253651c73910a243fb29aa520b008.tar.gz
Merged from java-broker-0-10 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@829675 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java28
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java25
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java38
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java13
8 files changed, 78 insertions, 66 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index ed122a772e..b57c834598 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -214,14 +214,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
{
done = (_fastAccessSessions[id] == null);
- }
+ }
else
{
done = (!_slowAccessSessions.keySet().contains(id));
}
}
}
-
+
return id;
}
@@ -320,11 +320,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
//Indicates whether we need to sync on every message ack
private boolean _syncAck;
-
+
//Indicates the sync publish options (persistent|all)
//By default it's async publish
- private String _syncPublish = "";
-
+ private String _syncPublish = "";
+
/**
* @param broker brokerdetails
* @param username username
@@ -418,7 +418,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
{
- _syncPersistence =
+ _syncPersistence =
Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
_logger.warn("sync_persistence is a deprecated property, " +
"please use sync_publish={persistent|all} instead");
@@ -453,10 +453,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// use the default value set for all connections
_syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
}
-
+
+ String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
+
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
- if (brokerDetails.getTransport().equals(BrokerDetails.VM))
+ if (brokerDetails.getTransport().equals(BrokerDetails.VM) || "0-8".equals(amqpVersion) || "0-9".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
}
@@ -538,7 +540,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else if (!_connected)
{
- retryAllowed = _failoverPolicy.failoverAllowed();
+ retryAllowed = _failoverPolicy.failoverAllowed();
brokerDetails = _failoverPolicy.getNextBrokerDetails();
}
}
@@ -591,7 +593,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new AMQConnectionFailureException(message, connectionException);
}
-
+
_connectionMetaData = new QpidConnectionMetaData(this);
}
@@ -1573,7 +1575,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _syncPersistence;
}
-
+
/**
* Indicates whether we need to sync on every message ack
*/
@@ -1581,12 +1583,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _syncAck;
}
-
+
public String getSyncPublish()
{
return _syncPublish;
}
-
+
public void setIdleTimeout(long l)
{
_delegate.setIdleTimeout(l);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 97d0d0516e..e1d9ae735c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -89,11 +89,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
// TODO: use system property thingy for this
- if (System.getProperty("UseTransportIo", "false").equals("false"))
+ if (System.getProperty("UseTransportIo", "false").equals("false"))
{
TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
- }
- else
+ }
+ else
{
_conn.getProtocolHandler().createIoTransportSession(brokerDetail);
}
@@ -197,7 +197,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
* Low = MaxPrefetch / 2
* @return XASession
* @throws JMSException thrown if there is a problem creating the session.
- */
+ */
public XASession createXASession() throws JMSException
{
return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2);
@@ -214,7 +214,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
// todo Be aware of possible changes to parameter order as versions change.
BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
_conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
-
+
if (transacted)
{
if (_logger.isDebugEnabled())
@@ -222,7 +222,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
_logger.debug("Issuing TxSelect for " + channelId);
}
TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody();
-
+
// TODO: Be aware of possible changes to parameter order as versions change.
_conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
}
@@ -299,7 +299,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
}
}
-
+
public void setIdleTimeout(long l){}
public int getMaxChannelID()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 1587d6a6bf..2324d441cc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -468,7 +468,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
throws JMSException
{
- String rk = "";
+ String rk = null;
boolean res;
if (bindingKeys != null && bindingKeys.length>0)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 44ce59975a..df59be25d0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -46,14 +46,14 @@ import org.slf4j.LoggerFactory;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
- enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
-
+ enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
+
protected final Logger _logger = LoggerFactory.getLogger(getClass());
private AMQConnection _connection;
/**
- * If true, messages will not get a timestamp.
+ * If true, messages will not get a timestamp.
*/
protected boolean _disableTimestamps;
@@ -105,7 +105,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private long _producerId;
/**
- * The session used to create this producer
+ * The session used to create this producer
*/
protected AMQSession _session;
@@ -118,11 +118,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private boolean _disableMessageId;
private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
-
+
protected String _userID; // ref user id used in the connection.
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
-
+
protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
@@ -145,14 +145,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
_mandatory = mandatory;
_waitUntilSent = waitUntilSent;
_userID = connection.getUsername();
- setPublishMode();
+ setPublishMode();
}
-
+
void setPublishMode()
{
// Publish mode could be configured at destination level as well.
// Will add support for this when we provide a more robust binding URL
-
+
String syncPub = _connection.getSyncPublish();
// Support for deprecated option sync_persistence
if (syncPub.equals("persistent") || _connection.getSyncPersistence())
@@ -163,7 +163,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
{
publishMode = PublishMode.SYNC_PUBLISH_ALL;
}
-
+
_logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode);
}
@@ -277,6 +277,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
checkPreConditions();
checkInitialDestination();
+
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
@@ -548,6 +549,10 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
{
throw new javax.jms.IllegalStateException("Invalid Session");
}
+ if(_session.getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection closed");
+ }
}
private void checkInitialDestination()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
index be0d283470..e5050b4fbd 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
@@ -33,7 +33,7 @@ public class ClientProperties
public static final String IGNORE_SET_CLIENTID_PROP_NAME = "ignore_setclientID";
/**
- * This property is currently used within the 0.10 code path only
+ * This property is currently used within the 0.10 code path only
* The maximum number of pre-fetched messages per destination
* This property is used for all the connection unless it is overwritten by the connectionURL
* type: long
@@ -46,13 +46,13 @@ public class ClientProperties
* type: boolean
*/
public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence";
-
+
/**
* When true a sync command is sent after sending a message ack.
* type: boolean
*/
public static final String SYNC_ACK_PROP_NAME = "sync_ack";
-
+
/**
* sync_publish property - {persistent|all}
* If set to 'persistent',then persistent messages will be publish synchronously
@@ -60,17 +60,17 @@ public class ClientProperties
* published synchronously.
*/
public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish";
-
+
/**
* This value will be used in the following settings
* To calculate the SO_TIMEOUT option of the socket (2*idle_timeout)
* If this values is between the max and min values specified for heartbeat
* by the broker in TuneOK it will be used as the heartbeat interval.
- * If not a warning will be printed and the max value specified for
+ * If not a warning will be printed and the max value specified for
* heartbeat in TuneOK will be used
*/
public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
-
+
/**
* ==========================================================
@@ -100,4 +100,6 @@ public class ClientProperties
*/
public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144";
+
+ public static final String AMQP_VERSION = "qpid.amqp.version";
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 190776891e..f74dbba939 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -21,7 +21,6 @@
package org.apache.qpid.client.failover;
import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.AMQState;
@@ -94,7 +93,6 @@ public class FailoverHandler implements Runnable
* Creates a failover handler on a protocol session, for a particular MINA session (network connection).
*
* @param amqProtocolHandler The protocol handler that spans the failover.
- * @param session The MINA session, for the failing connection.
*/
public FailoverHandler(AMQProtocolHandler amqProtocolHandler)
{
@@ -135,10 +133,12 @@ public class FailoverHandler implements Runnable
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
-
+
+
// Use a fresh new StateManager for the reconnection attempts
_amqProtocolHandler.setStateManager(new AMQStateManager());
+
if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
{
_logger.info("Failover process veto-ed by client");
@@ -190,7 +190,7 @@ public class FailoverHandler implements Runnable
}
else
{
- // Set the new Protocol Session in the StateManager.
+ // Set the new Protocol Session in the StateManager.
existingStateManager.setProtocolSession(_amqProtocolHandler.getProtocolSession());
// Now that the ProtocolHandler has been reconnected clean up
@@ -198,7 +198,7 @@ public class FailoverHandler implements Runnable
// it any old exception that had occured prior to failover may
// prohibit reconnection.
// e.g. During testing when the broker is shutdown gracefully.
- // The broker
+ // The broker
// Clear any exceptions we gathered
if (existingStateManager.getCurrentState() != AMQState.CONNECTION_OPEN)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 35bc521c80..6500a82818 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -110,10 +110,6 @@ import org.slf4j.LoggerFactory;
* <tr><td>
* </table>
*
- * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
- * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
- * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
- * filter before it mean not doing the read/write asynchronously but in the main filter thread?
* @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
* failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
* AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
@@ -172,10 +168,10 @@ public class AMQProtocolHandler implements ProtocolEngine
private Job _writeJob;
private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
private NetworkDriver _networkDriver;
-
+
private long _writtenBytes;
private long _readBytes;
-
+
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -215,10 +211,6 @@ public class AMQProtocolHandler implements ProtocolEngine
* process will be started, provided that it is the clients policy to allow failover, and provided that a failover
* has not already been started or failed.
*
- * <p/>It is important to note that when the connection dies this method may be called or {@link #exceptionCaught}
- * may be called first followed by this method. This depends on whether the client was trying to send data at the
- * time of the failure.
- *
* @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
* not otherwise? The above comment doesn't make that clear.
*/
@@ -261,7 +253,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_logger.debug("sessionClose() not allowed to failover");
_connection.exceptionReceived(new AMQDisconnectedException(
- "Server closed connection and reconnection " + "not permitted.",
+ "Server closed connection and reconnection " + "not permitted.",
_stateManager.getLastException()));
}
else
@@ -277,12 +269,15 @@ public class AMQProtocolHandler implements ProtocolEngine
/** See {@link FailoverHandler} to see rationale for separate thread. */
private void startFailoverThread()
{
- Thread failoverThread = new Thread(_failoverHandler);
- failoverThread.setName("Failover");
- // Do not inherit daemon-ness from current thread as this can be a daemon
- // thread such as a AnonymousIoService thread.
- failoverThread.setDaemon(false);
- failoverThread.start();
+ if(!_connection.isClosed())
+ {
+ Thread failoverThread = new Thread(_failoverHandler);
+ failoverThread.setName("Failover");
+ // Do not inherit daemon-ness from current thread as this can be a daemon
+ // thread such as a AnonymousIoService thread.
+ failoverThread.setDaemon(false);
+ failoverThread.start();
+ }
}
public void readerIdle()
@@ -293,7 +288,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_logger.warn("Timed out while waiting for heartbeat from peer.");
_networkDriver.close();
}
-
+
public void writerIdle()
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
@@ -365,6 +360,7 @@ public class AMQProtocolHandler implements ProtocolEngine
public void propagateExceptionToAllWaiters(Exception e)
{
getStateManager().error(e);
+
propagateExceptionToFrameListeners(e);
}
@@ -582,7 +578,7 @@ public class AMQProtocolHandler implements ProtocolEngine
}
_connection.bytesSent(_writtenBytes);
-
+
if (wait)
{
_networkDriver.flush();
@@ -642,7 +638,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_frameListeners.add(listener);
//FIXME: At this point here we should check or before add we should check _stateManager is in an open
- // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255
+ // state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255
}
writeFrame(frame);
@@ -828,7 +824,7 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_networkDriver = driver;
}
-
+
/** @param delay delay in seconds (not ms) */
void initHeartbeats(int delay)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index cd049c24a1..8910920017 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -102,7 +102,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
- _protocolHandler = protocolHandler;
+ _protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
this);
@@ -156,7 +156,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public SaslClient getSaslClient()
{
- return _saslClient;
+ return _saslClient;
}
/**
@@ -192,7 +192,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
* @throws AMQException if this was not expected
*/
public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
- {
+ {
if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
_channelId2UnprocessedMsgArray[channelId] = message;
@@ -468,4 +468,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
// No-op, interface munging
}
+
+
+ @Override
+ public String toString()
+ {
+ return "AMQProtocolSession[" + _connection + ']';
+ }
}