diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-27 20:15:31 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-27 20:15:31 +0000 |
| commit | 4dd9cbaf7fdc498a4eb5f2652d88afd20fe5d530 (patch) | |
| tree | e49a2b2dab05502118d48385e0989faab41feb45 /java/client | |
| parent | ab01c9c19e109b2f91cb505f53497592c52ca88d (diff) | |
| download | qpid-python-4dd9cbaf7fdc498a4eb5f2652d88afd20fe5d530.tar.gz | |
NO-JIRA: Encapsulate fields, use private members and accesors (keep checkstyle happy)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1236867 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
64 files changed, 1065 insertions, 665 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java b/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java index 9548eab4c5..6cc6db1974 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java @@ -147,8 +147,8 @@ public class OptionParser for (Option option: optDefs) { - if ((op.startsWith("-") && option.shortForm != null && option.shortForm.equals(key)) || - (op.startsWith("--") && option.longForm != null && option.longForm.equals(key)) ) + if ((op.startsWith("-") && option.getShortForm() != null && option.getShortForm().equals(key)) || + (op.startsWith("--") && option.getLongForm() != null && option.getLongForm().equals(key)) ) { match = true; break; @@ -219,18 +219,18 @@ public class OptionParser protected boolean containsOp(Option op) { - return optMap.containsKey(op.shortForm) || optMap.containsKey(op.longForm); + return optMap.containsKey(op.getShortForm()) || optMap.containsKey(op.getLongForm()); } protected String getOp(Option op) { - if (optMap.containsKey(op.shortForm)) + if (optMap.containsKey(op.getShortForm())) { - return (String)optMap.get(op.shortForm); + return (String)optMap.get(op.getShortForm()); } - else if (optMap.containsKey(op.longForm)) + else if (optMap.containsKey(op.getLongForm())) { - return (String)optMap.get(op.longForm); + return (String)optMap.get(op.getLongForm()); } else { @@ -286,12 +286,12 @@ public class OptionParser static class Option { - private String shortForm; - private String longForm; - private String desc; - private String valueLabel; - private String defaultValue; - private Class type; + private final String shortForm; + private final String longForm; + private final String desc; + private final String valueLabel; + private final String defaultValue; + private final Class type; public Option(String shortForm, String longForm, String desc, String valueLabel, String defaultValue, Class type) diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java index 3d16e01af4..2b1e641689 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -103,7 +103,7 @@ public class MonitorMessageDispatcher // (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); getMonitorPublisher().sendMessage - (getMonitorPublisher()._session, + (getMonitorPublisher().getSession(), FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()), DeliveryMode.PERSISTENT, false, true); diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java index 750f57d9dc..b2bb0893d8 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java @@ -36,7 +36,7 @@ public class MonitorPublisher extends Publisher private static final Logger _log = LoggerFactory.getLogger(Publisher.class); - BasicMessageProducer _producer; + private BasicMessageProducer _producer; public MonitorPublisher() { @@ -51,14 +51,14 @@ public class MonitorPublisher extends Publisher { try { - _producer = (BasicMessageProducer) session.createProducer(_destination); + _producer = (BasicMessageProducer) session.createProducer(getDestination()); _producer.send(message, deliveryMode, immediate); if (commit) { //commit the message send and close the transaction - _session.commit(); + getSession().commit(); } } @@ -70,7 +70,7 @@ public class MonitorPublisher extends Publisher throw new UndeliveredMessageException("Cannot deliver immediate message", e); } - _log.info(_name + " finished sending message: " + message); + _log.info(getName() + " finished sending message: " + message); return true; } @@ -81,14 +81,14 @@ public class MonitorPublisher extends Publisher { try { - _producer = (BasicMessageProducer) _session.createProducer(_destination); + _producer = (BasicMessageProducer) getSession().createProducer(getDestination()); //Send message via our producer which is not persistent and is immediate //NB: not available via jms interface MessageProducer _producer.send(message, DeliveryMode.NON_PERSISTENT, true); //commit the message send and close the transaction - _session.commit(); + getSession().commit(); } catch (JMSException e) @@ -99,7 +99,7 @@ public class MonitorPublisher extends Publisher throw new UndeliveredMessageException("Cannot deliver immediate message", e); } - _log.info(_name + " finished sending message: " + message); + _log.info(getName() + " finished sending message: " + message); return true; } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java index b5f44557a4..76531523b9 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java @@ -34,19 +34,19 @@ public class Publisher protected InitialContextHelper _contextHelper; - protected Connection _connection; + private Connection _connection; - protected Session _session; + private Session _session; - protected MessageProducer _producer; + private MessageProducer _producer; - protected String _destinationDir; + private String _destinationDir; - protected String _name = "Publisher"; + private String _name = "Publisher"; - protected Destination _destination; + private Destination _destination; - protected static final String _defaultDestinationDir = "/tmp"; + private static final String _defaultDestinationDir = "/tmp"; /** * Creates a Publisher instance using properties from example.properties @@ -62,9 +62,9 @@ public class Publisher //then create a connection using the AMQConnectionFactory AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local"); - _connection = cf.createConnection(); + setConnection(cf.createConnection()); - _connection.setExceptionListener(new ExceptionListener() + getConnection().setExceptionListener(new ExceptionListener() { public void onException(JMSException jmse) { @@ -76,19 +76,19 @@ public class Publisher }); //create a transactional session - _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + setSession(getConnection().createSession(true, Session.AUTO_ACKNOWLEDGE)); //lookup the example queue and use it //Queue is non-exclusive and not deleted when last consumer detaches - _destination = (Queue) ctx.lookup("MyQueue"); + setDestination((Queue) ctx.lookup("MyQueue")); //create a message producer - _producer = _session.createProducer(_destination); + setProducer(getSession().createProducer(getDestination())); //set destination dir for files that have been processed - _destinationDir = _defaultDestinationDir; + setDestinationDir(get_defaultDestinationDir()); - _connection.start(); + getConnection().start(); } catch (Exception e) { @@ -97,6 +97,11 @@ public class Publisher } } + public static String get_defaultDestinationDir() + { + return _defaultDestinationDir; + } + /** * Creates and sends the number of messages specified in the param */ @@ -104,7 +109,7 @@ public class Publisher { try { - TextMessage txtMessage = _session.createTextMessage("msg"); + TextMessage txtMessage = getSession().createTextMessage("msg"); for (int i=0;i<numMessages;i++) { sendMessage(txtMessage); @@ -128,10 +133,10 @@ public class Publisher try { //Send message via our producer which is not persistent - _producer.send(message, DeliveryMode.PERSISTENT, _producer.getPriority(), _producer.getTimeToLive()); + getProducer().send(message, DeliveryMode.PERSISTENT, getProducer().getPriority(), getProducer().getTimeToLive()); //commit the message send and close the transaction - _session.commit(); + getSession().commit(); } catch (JMSException e) @@ -139,7 +144,7 @@ public class Publisher //Have to assume our commit failed and rollback here try { - _session.rollback(); + getSession().rollback(); _log.error("JMSException", e); e.printStackTrace(); return false; @@ -162,13 +167,13 @@ public class Publisher { try { - if (_connection != null) + if (getConnection() != null) { - _connection.stop(); - _connection.close(); + getConnection().stop(); + getConnection().close(); } - _connection = null; - _producer = null; + setConnection(null); + setProducer(null); } catch(Exception e) { @@ -204,5 +209,41 @@ public class Publisher public void setName(String _name) { this._name = _name; } + + + public Connection getConnection() + { + return _connection; + } + + public void setConnection(Connection connection) + { + _connection = connection; + } + + public void setSession(Session session) + { + _session = session; + } + + public MessageProducer getProducer() + { + return _producer; + } + + public void setProducer(MessageProducer producer) + { + _producer = producer; + } + + public Destination getDestination() + { + return _destination; + } + + public void setDestination(Destination destination) + { + _destination = destination; + } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java index 8645e41101..953a875912 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java @@ -18,7 +18,6 @@ */ package org.apache.qpid.example.publisher; -import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.example.shared.InitialContextHelper; import org.slf4j.LoggerFactory; import org.slf4j.Logger; @@ -44,10 +43,10 @@ public class TopicPublisher extends Publisher InitialContext ctx = _contextHelper.getInitialContext(); //lookup the example topic and use it - _destination = (Topic) ctx.lookup("MyTopic"); + setDestination((Topic) ctx.lookup("MyTopic")); //create a message producer - _producer = _session.createProducer(_destination); + setProducer(getSession().createProducer(getDestination())); } catch (Exception e) { diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java index e32ee0ba73..5b0f4757ca 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java @@ -31,11 +31,11 @@ import javax.naming.NamingException; */ public abstract class Client { - protected ConnectionSetup _setup; + private ConnectionSetup _setup; - protected Connection _connection; - protected Destination _destination; - protected Session _session; + private Connection _connection; + private Destination _destination; + private Session _session; public Client(String destination) { @@ -69,4 +69,28 @@ public abstract class Client public abstract void start(); + public ConnectionSetup getSetup() + { + return _setup; + } + + public Connection getConnection() + { + return _connection; + } + + public Destination getDestination() + { + return _destination; + } + + public Session getSession() + { + return _session; + } + + public void setSession(Session session) + { + _session = session; + } }
\ No newline at end of file diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java index ac3829d49e..f35d56c702 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java @@ -36,7 +36,7 @@ import javax.jms.Session; */ public class Publisher extends Client { - int _msgCount; + private int _msgCount; public Publisher(String destination, int msgCount) { @@ -48,18 +48,18 @@ public class Publisher extends Client { try { - _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + setSession(getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)); - MessageProducer _producer = _session.createProducer(_destination); + MessageProducer _producer = getSession().createProducer(getDestination()); for (int msgCount = 0; msgCount < _msgCount; msgCount++) { - _producer.send(_session.createTextMessage("msg:" + msgCount)); + _producer.send(getSession().createTextMessage("msg:" + msgCount)); System.out.println("Sent:" + msgCount); } System.out.println("Done."); - _connection.close(); + getConnection().close(); } catch (JMSException e) { diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java index f2d736701f..1d7fc43b9c 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java @@ -41,7 +41,7 @@ import java.util.concurrent.CountDownLatch; public class Subscriber extends Client implements MessageListener { - CountDownLatch _count; + private CountDownLatch _count; public Subscriber(String destination, int msgCount) { @@ -54,16 +54,16 @@ public class Subscriber extends Client implements MessageListener { try { - _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + setSession(getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)); - _session.createDurableSubscriber((Topic) _setup.getDestination(ConnectionSetup.TOPIC_JNDI_NAME), - "exampleClient").setMessageListener(this); - _connection.start(); + getSession().createDurableSubscriber((Topic) getSetup().getDestination(ConnectionSetup.TOPIC_JNDI_NAME), + "exampleClient").setMessageListener(this); + getConnection().start(); _count.await(); System.out.println("Done"); - _connection.close(); + getConnection().close(); } catch (JMSException e) { diff --git a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java index 8a0ff88448..ee52e8b9ea 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java @@ -40,15 +40,15 @@ import java.util.concurrent.CountDownLatch; public class Client implements MessageListener { - final String BROKER = "localhost"; + private final String BROKER = "localhost"; - final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + private final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; - final String CONNECTION_JNDI_NAME = "local"; - final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'"; + private final String CONNECTION_JNDI_NAME = "local"; + private final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'"; - final String QUEUE_JNDI_NAME = "queue"; - final String QUEUE_NAME = "example.RequestQueue"; + private final String QUEUE_JNDI_NAME = "queue"; + private final String QUEUE_NAME = "example.RequestQueue"; private InitialContext _ctx; diff --git a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java index 9c284eee97..88e8ca1f45 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java @@ -45,15 +45,15 @@ import java.io.IOException; public class Server implements MessageListener { - final String BROKER = "localhost"; + private final String BROKER = "localhost"; - final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + private final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; - final String CONNECTION_JNDI_NAME = "local"; - final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'"; + private final String CONNECTION_JNDI_NAME = "local"; + private final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'"; - final String QUEUE_JNDI_NAME = "queue"; - final String QUEUE_NAME = "example.RequestQueue"; + private final String QUEUE_JNDI_NAME = "queue"; + private final String QUEUE_NAME = "example.RequestQueue"; private InitialContext _ctx; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 746d5b8f34..6c684e593d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -106,7 +106,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate * handler. */ - protected AMQProtocolHandler _protocolHandler; + private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ private final ChannelToSessionMap _sessions = new ChannelToSessionMap(); @@ -122,7 +122,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** The virtual path to connect to on the AMQ server */ private String _virtualHost; - protected ExceptionListener _exceptionListener; + private ExceptionListener _exceptionListener; private ConnectionListener _connectionListener; @@ -132,15 +132,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message * publication. */ - protected volatile boolean _started; + private volatile boolean _started; /** Policy dictating how to failover */ - protected FailoverPolicy _failoverPolicy; + private FailoverPolicy _failoverPolicy; /* * _Connected should be refactored with a suitable wait object. */ - protected boolean _connected; + private boolean _connected; /* * The connection meta data @@ -156,7 +156,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private final ExecutorService _taskPool = Executors.newCachedThreadPool(); private static final long DEFAULT_TIMEOUT = 1000 * 30; - protected AMQConnectionDelegate _delegate; + private AMQConnectionDelegate _delegate; // this connection maximum number of prefetched messages private int _maxPrefetch; @@ -346,11 +346,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.info("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); // We are not currently connected - _connected = false; + setConnected(false); boolean retryAllowed = true; Exception connectionException = null; - while (!_connected && retryAllowed && brokerDetails != null) + while (!isConnected() && retryAllowed && brokerDetails != null) { ProtocolVersion pe = null; try @@ -374,7 +374,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // broker initDelegate(pe); } - else if (!_connected) + else if (!isConnected()) { retryAllowed = _failoverPolicy.failoverAllowed(); brokerDetails = _failoverPolicy.getNextBrokerDetails(); @@ -384,10 +384,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (_logger.isDebugEnabled()) { - _logger.debug("Are we connected:" + _connected); + _logger.debug("Are we connected:" + isConnected()); } - if (!_connected) + if (!isConnected()) { if (_logger.isDebugEnabled()) { @@ -590,7 +590,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public boolean failoverAllowed() { - if (!_connected) + if (!isConnected()) { return false; } @@ -729,6 +729,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } + protected final ExceptionListener getExceptionListenerNoCheck() + { + return _exceptionListener; + } + public ExceptionListener getExceptionListener() throws JMSException { checkNotClosed(); @@ -1048,16 +1053,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _virtualHost; } - public AMQProtocolHandler getProtocolHandler() + public final AMQProtocolHandler getProtocolHandler() { return _protocolHandler; } - public boolean started() + public final boolean started() { return _started; } + protected final boolean isConnected() + { + return _connected; + } + + protected final void setConnected(boolean connected) + { + _connected = connected; + } + public void bytesSent(long writtenBytes) { if (_connectionListener != null) @@ -1489,4 +1504,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _lastFailoverTime; } + protected AMQConnectionDelegate getDelegate() + { + return _delegate; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 74475c0bc1..a18a3fcbd4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -71,7 +71,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec /** * The QpidConeection instance that is mapped with this JMS connection. */ - org.apache.qpid.transport.Connection _qpidConnection; + private org.apache.qpid.transport.Connection _qpidConnection; private ConnectionException exception = null; //--- constructor @@ -109,7 +109,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh, prefetchLow,name); _conn.registerSession(channelId, session); - if (_conn._started) + if (_conn.started()) { session.start(); } @@ -152,7 +152,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { session = new XASessionImpl(_qpidConnection, _conn, channelId, prefetchHigh, prefetchLow); _conn.registerSession(channelId, session); - if (_conn._started) + if (_conn.started()) { session.start(); } @@ -164,7 +164,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return session; } - @Override public XASession createXASession(int ackMode) throws JMSException { @@ -182,7 +181,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { session = new XASessionImpl(_qpidConnection, _conn, channelId, ackMode, (int)_conn.getMaxPrefetch(), (int)_conn.getMaxPrefetch() / 2); _conn.registerSession(channelId, session); - if (_conn._started) + if (_conn.started()) { session.start(); } @@ -218,10 +217,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec _qpidConnection.setConnectionDelegate(new ClientConnectionDelegate(conSettings, _conn.getConnectionURL())); _qpidConnection.connect(conSettings); - _conn._connected = true; + _conn.setConnected(true); _conn.setUsername(_qpidConnection.getUserID()); _conn.setMaximumChannelCount(_qpidConnection.getChannelMax()); - _conn._failoverPolicy.attainedConnection(); + _conn.getFailoverPolicy().attainedConnection(); } catch (ProtocolVersionException pe) { @@ -327,7 +326,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } - ExceptionListener listener = _conn._exceptionListener; + ExceptionListener listener = _conn.getExceptionListenerNoCheck(); if (listener == null) { _logger.error("connection exception: " + conn, exc); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 287e4f3859..5068b1bc50 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -120,7 +120,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); - StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); + StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); ConnectionSettings settings = brokerDetail.buildConnectionSettings(); settings.setProtocol(brokerDetail.getTransport()); @@ -148,9 +148,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings); OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); - NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn._protocolHandler), sslContext); - _conn._protocolHandler.setNetworkConnection(network, securityLayer.sender(network.getSender())); - _conn._protocolHandler.getProtocolSession().init(); + NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), sslContext); + _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender())); + _conn.getProtocolHandler().getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up @@ -158,13 +158,13 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate if(state == AMQState.CONNECTION_OPEN) { - _conn._failoverPolicy.attainedConnection(); - _conn._connected = true; + _conn.getFailoverPolicy().attainedConnection(); + _conn.setConnected(true); return null; } else { - return _conn._protocolHandler.getSuggestedProtocolVersion(); + return _conn.getProtocolHandler().getSuggestedProtocolVersion(); } } @@ -237,7 +237,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } - if (_conn._started) + if (_conn.started()) { try { @@ -271,12 +271,12 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null); // TODO: Be aware of possible changes to parameter order as versions change. - _conn._protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); + _conn.getProtocolHandler().syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); // todo send low water mark when protocol allows. // todo Be aware of possible changes to parameter order as versions change. BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false); - _conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class); + _conn.getProtocolHandler().syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class); if (transacted) { @@ -287,7 +287,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody(); // TODO: Be aware of possible changes to parameter order as versions change. - _conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class); + _conn.getProtocolHandler().syncWrite(body.generateFrame(channelId), TxSelectOkBody.class); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 76828afd6a..9e19cc8969 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -48,15 +48,15 @@ public abstract class AMQDestination implements Destination, Referenceable { private static final Logger _logger = LoggerFactory.getLogger(AMQDestination.class); - protected AMQShortString _exchangeName; + private AMQShortString _exchangeName; - protected AMQShortString _exchangeClass; + private AMQShortString _exchangeClass; - protected boolean _isDurable; + private boolean _isDurable; - protected boolean _isExclusive; + private boolean _isExclusive; - protected boolean _isAutoDelete; + private boolean _isAutoDelete; private boolean _browseOnly; @@ -81,6 +81,41 @@ public abstract class AMQDestination implements Destination, Referenceable public static final int TOPIC_TYPE = 2; public static final int UNKNOWN_TYPE = 3; + protected void setExclusive(boolean exclusive) + { + _isExclusive = exclusive; + } + + protected AddressHelper getAddrHelper() + { + return _addrHelper; + } + + protected void setAddrHelper(AddressHelper addrHelper) + { + _addrHelper = addrHelper; + } + + protected String getName() + { + return _name; + } + + protected void setName(String name) + { + _name = name; + } + + protected Link getTargetLink() + { + return _targetLink; + } + + protected void setTargetLink(Link targetLink) + { + _targetLink = targetLink; + } + // ----- Fields required to support new address syntax ------- public enum DestSyntax { @@ -132,23 +167,23 @@ public abstract class AMQDestination implements Destination, Referenceable } } - protected final static DestSyntax defaultDestSyntax; + private final static DestSyntax defaultDestSyntax; - protected DestSyntax _destSyntax = DestSyntax.ADDR; + private DestSyntax _destSyntax = DestSyntax.ADDR; - protected AddressHelper _addrHelper; - protected Address _address; - protected int _addressType = AMQDestination.UNKNOWN_TYPE; - protected String _name; - protected String _subject; - protected AddressOption _create = AddressOption.NEVER; - protected AddressOption _assert = AddressOption.NEVER; - protected AddressOption _delete = AddressOption.NEVER; + private AddressHelper _addrHelper; + private Address _address; + private int _addressType = AMQDestination.UNKNOWN_TYPE; + private String _name; + private String _subject; + private AddressOption _create = AddressOption.NEVER; + private AddressOption _assert = AddressOption.NEVER; + private AddressOption _delete = AddressOption.NEVER; - protected Node _targetNode; - protected Node _sourceNode; - protected Link _targetLink; - protected Link _link; + private Node _targetNode; + private Node _sourceNode; + private Link _targetLink; + private Link _link; // ----- / Fields required to support new address syntax ------- @@ -646,10 +681,10 @@ public abstract class AMQDestination implements Destination, Referenceable public static class Binding { - String exchange; - String bindingKey; - String queue; - Map<String,Object> args; + private String exchange; + private String bindingKey; + private String queue; + private Map<String,Object> args; public Binding(String exchange, String queue, @@ -902,4 +937,5 @@ public abstract class AMQDestination implements Destination, Referenceable return _rejectBehaviour; } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 3f9eadeef3..465d858091 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -108,7 +108,7 @@ public class AMQQueueBrowser implements QueueBrowser private class QueueBrowserEnumeration implements Enumeration { - Message _nextMessage; + private Message _nextMessage; private BasicMessageConsumer _consumer; public QueueBrowserEnumeration(BasicMessageConsumer consumer) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java index c59eba60b8..d1c796c34a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java @@ -31,7 +31,7 @@ import java.io.Serializable; public class AMQQueueSessionAdaptor implements QueueSession, AMQSessionAdapter { //holds a session for delegation - protected final AMQSession _session; + private final AMQSession _session; /** * Construct an adaptor with a session to wrap diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index cca76ebac5..92579c31f0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -96,6 +96,166 @@ import java.util.concurrent.locks.ReentrantLock; */ public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession { + /** + * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only + * keeps a record of subscriptions which have been created in the current instance. It does not remember + * subscriptions between executions of the client. + */ + protected ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> getSubscriptions() + { + return _subscriptions; + } + + /** + * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked + * up in the {@link #_subscriptions} map. + */ + protected ConcurrentHashMap<C, String> getReverseSubscriptionMap() + { + return _reverseSubscriptionMap; + } + + /** + * Locks to keep access to subscriber details atomic. + * <p> + * Added for QPID2418 + */ + protected Lock getSubscriberDetails() + { + return _subscriberDetails; + } + + protected Lock getSubscriberAccess() + { + return _subscriberAccess; + } + + /** + * Used to hold incoming messages. + * + * @todo Weaken the type once {@link org.apache.qpid.client.util.FlowControllingBlockingQueue} implements Queue. + */ + protected FlowControllingBlockingQueue getQueue() + { + return _queue; + } + + /** Holds the highest received delivery tag. */ + protected AtomicLong getHighestDeliveryTag() + { + return _highestDeliveryTag; + } + + /** Pre-fetched message tags */ + protected ConcurrentLinkedQueue<Long> getPrefetchedMessageTags() + { + return _prefetchedMessageTags; + } + + protected void setPrefetchedMessageTags(ConcurrentLinkedQueue<Long> prefetchedMessageTags) + { + _prefetchedMessageTags = prefetchedMessageTags; + } + + /** All the not yet acknowledged message tags */ + protected ConcurrentLinkedQueue<Long> getUnacknowledgedMessageTags() + { + return _unacknowledgedMessageTags; + } + + protected void setUnacknowledgedMessageTags(ConcurrentLinkedQueue<Long> unacknowledgedMessageTags) + { + _unacknowledgedMessageTags = unacknowledgedMessageTags; + } + + /** All the delivered message tags */ + protected ConcurrentLinkedQueue<Long> getDeliveredMessageTags() + { + return _deliveredMessageTags; + } + + protected void setDeliveredMessageTags(ConcurrentLinkedQueue<Long> deliveredMessageTags) + { + _deliveredMessageTags = deliveredMessageTags; + } + + /** Holds the dispatcher thread for this session. */ + protected Dispatcher getDispatcher() + { + return _dispatcher; + } + + protected void setDispatcher(Dispatcher dispatcher) + { + _dispatcher = dispatcher; + } + + protected Thread getDispatcherThread() + { + return _dispatcherThread; + } + + protected void setDispatcherThread(Thread dispatcherThread) + { + _dispatcherThread = dispatcherThread; + } + + /** Holds the message factory factory for this session. */ + protected MessageFactoryRegistry getMessageFactoryRegistry() + { + return _messageFactoryRegistry; + } + + protected void setMessageFactoryRegistry(MessageFactoryRegistry messageFactoryRegistry) + { + _messageFactoryRegistry = messageFactoryRegistry; + } + + /** + * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right + * consumer. + */ + protected IdToConsumerMap<C> getConsumers() + { + return _consumers; + } + + /** + * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of + * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). + */ + protected boolean isUsingDispatcherForCleanup() + { + return _usingDispatcherForCleanup; + } + + protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup) + { + _usingDispatcherForCleanup = usingDispatcherForCleanup; + } + + /** + * Used to ensure that only the first call to start the dispatcher can unsuspend the channel. + * + * @todo This is accessed only within a synchronized method, so does not need to be atomic. + */ + protected AtomicBoolean getFirstDispatcher() + { + return _firstDispatcher; + } + + /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */ + protected boolean isImmediatePrefetch() + { + return _immediatePrefetch; + } + + /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */ + protected boolean isStrictAMQPFATAL() + { + return _strictAMQPFATAL; + } + public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -173,8 +333,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - final AMQSession<C, P> _thisSession = this; - /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -182,34 +340,34 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * The default value for immediate flag used by producers created by this session is false. That is, a consumer does * not need to be attached to a queue. */ - protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); + private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); /** * The default value for mandatory flag used by producers created by this session is true. That is, server will not * silently drop messages where no queue is connected to the exchange for the message. */ - protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); /** * The period to wait while flow controlled before sending a log message confirming that the session is still * waiting on flow control being revoked */ - protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L); /** * The period to wait while flow controlled before declaring a failure */ public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; - protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure", + private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure", DEFAULT_FLOW_CONTROL_WAIT_FAILURE); - protected final boolean DECLARE_QUEUES = + private final boolean _delareQueues = Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); - protected final boolean DECLARE_EXCHANGES = + private final boolean _declareExchanges = Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); - protected final boolean USE_AMQP_ENCODED_MAP_MESSAGE; + private final boolean _useAMQPEncodedMapMessage; /** System property to enable strict AMQP compliance. */ public static final String STRICT_AMQP = "STRICT_AMQP"; @@ -230,16 +388,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; /** The connection to which this session belongs. */ - protected AMQConnection _connection; + private AMQConnection _connection; /** Used to indicate whether or not this is a transactional session. */ - protected final boolean _transacted; + private final boolean _transacted; /** Holds the sessions acknowledgement mode. */ - protected final int _acknowledgeMode; + private final int _acknowledgeMode; /** Holds this session unique identifier, used to distinguish it from other sessions. */ - protected int _channelId; + private int _channelId; private int _ticket; @@ -255,55 +413,30 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Used to indicate that this session has been started at least once. */ private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); - /** - * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only - * keeps a record of subscriptions which have been created in the current instance. It does not remember - * subscriptions between executions of the client. - */ - protected final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions = + private final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions = new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>(); - /** - * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked - * up in the {@link #_subscriptions} map. - */ - protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>(); + private final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>(); - /** - * Locks to keep access to subscriber details atomic. - * <p> - * Added for QPID2418 - */ - protected final Lock _subscriberDetails = new ReentrantLock(true); - protected final Lock _subscriberAccess = new ReentrantLock(true); + private final Lock _subscriberDetails = new ReentrantLock(true); + private final Lock _subscriberAccess = new ReentrantLock(true); - /** - * Used to hold incoming messages. - * - * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue. - */ - protected final FlowControllingBlockingQueue _queue; + private final FlowControllingBlockingQueue _queue; - /** Holds the highest received delivery tag. */ - protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1); + private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); - /** Pre-fetched message tags */ - protected ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>(); + private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>(); - /** All the not yet acknowledged message tags */ - protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); + private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); - /** All the delivered message tags */ - protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); + private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); - /** Holds the dispatcher thread for this session. */ - protected Dispatcher _dispatcher; + private Dispatcher _dispatcher; - protected Thread _dispatcherThread; + private Thread _dispatcherThread; - /** Holds the message factory factory for this session. */ - protected MessageFactoryRegistry _messageFactoryRegistry; + private MessageFactoryRegistry _messageFactoryRegistry; /** Holds all of the producers created by this session, keyed by their unique identifiers. */ private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); @@ -314,11 +447,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ private int _nextTag = 1; - /** - * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right - * consumer. - */ - protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); + private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); /** * Contains a list of consumers which have been removed but which might still have @@ -344,11 +473,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ private volatile boolean _sessionInRecovery; - /** - * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of - * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). - */ - protected volatile boolean _usingDispatcherForCleanup; + private volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; @@ -365,21 +490,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ private final Object _suspensionLock = new Object(); - /** - * Used to ensure that only the first call to start the dispatcher can unsuspend the channel. - * - * @todo This is accessed only within a synchronized method, so does not need to be atomic. - */ - protected final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); + private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); - /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */ - protected final boolean _immediatePrefetch; + private final boolean _immediatePrefetch; - /** Indicates that warnings should be generated on violations of the strict AMQP. */ - protected final boolean _strictAMQP; + private final boolean _strictAMQP; - /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */ - protected final boolean _strictAMQPFATAL; + private final boolean _strictAMQPFATAL; private final Object _messageDeliveryLock = new Object(); /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */ @@ -420,7 +537,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { - USE_AMQP_ENCODED_MAP_MESSAGE = con == null ? true : !con.isUseLegacyMapMessageFormat(); + _useAMQPEncodedMapMessage = con == null ? true : !con.isUseLegacyMapMessageFormat(); _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); @@ -456,7 +573,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { // If the session has been closed don't waste time creating a thread to do // flow control - if (!(_thisSession.isClosed() || _thisSession.isClosing())) + if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing())) { // Only execute change if previous state // was False @@ -484,7 +601,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { // If the session has been closed don't waste time creating a thread to do // flow control - if (!(_thisSession.isClosed() || _thisSession.isClosing())) + if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing())) { // Only execute change if previous state // was true @@ -1136,7 +1253,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public MapMessage createMapMessage() throws JMSException { checkNotClosed(); - if (USE_AMQP_ENCODED_MAP_MESSAGE) + if (_useAMQPEncodedMapMessage) { AMQPEncodedMapMessage msg = new AMQPEncodedMapMessage(getMessageDelegateFactory()); msg.setAMQSession(this); @@ -1173,12 +1290,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public P createProducer(Destination destination) throws JMSException { - return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); + return createProducerImpl(destination, _defaultMandatoryValue, _defaultImmediateValue); } public P createProducer(Destination destination, boolean immediate) throws JMSException { - return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); + return createProducerImpl(destination, _defaultMandatoryValue, immediate); } public P createProducer(Destination destination, boolean mandatory, boolean immediate) @@ -1625,6 +1742,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return (counter != null) && (counter.get() != 0); } + /** Indicates that warnings should be generated on violations of the strict AMQP. */ public boolean isStrictAMQP() { return _strictAMQP; @@ -2915,12 +3033,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - if (DECLARE_EXCHANGES) + if (_declareExchanges) { declareExchange(amqd, protocolHandler, nowait); } - if (DECLARE_QUEUES || amqd.isNameRequired()) + if (_delareQueues || amqd.isNameRequired()) { declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait); } @@ -3141,17 +3259,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic synchronized (_flowControl) { while (!_flowControl.getFlowControl() && - (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE) + (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure) : expiryTime) >= System.currentTimeMillis() ) { - _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD); - _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control"); + _flowControl.wait(_flowControlWaitPeriod); + _logger.warn("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); } if(!_flowControl.getFlowControl()) { _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); - throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control"); + throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control"); } } @@ -3198,6 +3316,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } + private AtomicBoolean getClosed() + { + return _closed; + } + public void rejectPending(C consumer) { synchronized (_lock) @@ -3333,7 +3456,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_dispatcherLogger.isInfoEnabled()) { - _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + _thisSession); + _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this); } } @@ -3454,7 +3577,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_logger.isDebugEnabled()) { _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag() - + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag)); + + " for closing consumer " + String.valueOf(consumer == null? null: consumer.getConsumerTag())); } rejectMessage(message, true); } @@ -3513,7 +3636,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { // If the session has closed by the time we get here // then we should not attempt to write to the sesion/channel. - if (!(_thisSession.isClosed() || _thisSession.isClosing())) + if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing())) { suspendChannel(_suspend.get()); } @@ -3521,11 +3644,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (AMQException e) { - _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + _thisSession + " due to: " + e); + _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: " + e); if (_logger.isDebugEnabled()) { _logger.debug("Is the _queue empty?" + _queue.isEmpty()); - _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher._closed)); + _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher.getClosed())); } } } @@ -3556,7 +3679,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public boolean isDeclareExchanges() { - return DECLARE_EXCHANGES; + return _declareExchanges; } JMSException toJMSException(String message, TransportException e) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 784e82237d..cfd5776c0a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -78,6 +78,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class); private static Timer timer = new Timer("ack-flusher", true); + private static class Flusher extends TimerTask { @@ -120,7 +121,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private AMQException _currentException; // a ref on the qpid connection - protected org.apache.qpid.transport.Connection _qpidConnection; + private org.apache.qpid.transport.Connection _qpidConnection; private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000); private TimerTask flushTask = null; @@ -163,7 +164,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _qpidSession = _qpidConnection.createSession(name,1); } _qpidSession.setSessionListener(this); - if (_transacted) + if (isTransacted()) { _qpidSession.txSelect(); _qpidSession.setTransacted(true); @@ -214,6 +215,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } + protected Connection getQpidConnection() + { + return _qpidConnection; + } + //------- overwritten methods of class AMQSession void failoverPrep() @@ -234,17 +240,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (_logger.isDebugEnabled()) { - _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId); + _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + getChannelId()); } // acknowledge this message if (multiple) { - for (Long messageTag : _unacknowledgedMessageTags) + for (Long messageTag : getUnacknowledgedMessageTags()) { if( messageTag <= deliveryTag ) { addUnacked(messageTag.intValue()); - _unacknowledgedMessageTags.remove(messageTag); + getUnacknowledgedMessageTags().remove(messageTag); } } //empty the list of unack messages @@ -253,12 +259,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic else { addUnacked((int) deliveryTag); - _unacknowledgedMessageTags.remove(deliveryTag); + getUnacknowledgedMessageTags().remove(deliveryTag); } long prefetch = getAMQConnection().getMaxPrefetch(); - if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE) + if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || getAcknowledgeMode() == javax.jms.Session.AUTO_ACKNOWLEDGE) { flushAcknowledgments(); } @@ -276,7 +282,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (unackedCount > 0) { messageAcknowledge - (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); + (unacked, getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); clearUnacked(); } } @@ -444,8 +450,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { // release all unacked messages RangeSet all = RangeSetFactory.createRangeSet(); - RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); - RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); + RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags()); + RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags()); for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) { Range range = deliveredIter.next(); @@ -526,9 +532,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal, - _messageFactoryRegistry, this, protocolHandler, rawSelector, prefetchHigh, - prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose); + return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, + getMessageFactoryRegistry(), this, protocolHandler, rawSelector, prefetchHigh, + prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose); } /** @@ -630,7 +636,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, Option.UNRELIABLE); - if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch)) + if(capacity > 0 && getDispatcher() != null && (isStarted() || isImmediatePrefetch())) { // set the flow getQpidSession().messageFlow(consumerTag, @@ -653,7 +659,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { try { - return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this, + return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this, getProtocolHandler(), producerId, immediate, mandatory); } catch (AMQException e) @@ -795,7 +801,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (suspend) { - for (BasicMessageConsumer consumer : _consumers.values()) + for (BasicMessageConsumer consumer : getConsumers().values()) { getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), Option.UNRELIABLE); @@ -804,7 +810,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - for (BasicMessageConsumer_0_10 consumer : _consumers.values()) + for (BasicMessageConsumer_0_10 consumer : getConsumers().values()) { String consumerTag = String.valueOf(consumer.getConsumerTag()); //only set if msg list is null @@ -942,7 +948,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait); } - }, _connection).execute(); + }, getAMQConnection()).execute(); } protected Long requestQueueDepth(AMQDestination amqd) @@ -969,8 +975,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic protected void sendTxCompletionsIfNecessary() { // this is a heuristic, we may want to have that configurable - if (_txSize > 0 && (_connection.getMaxPrefetch() == 1 || - _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)) + if (_txSize > 0 && (getAMQConnection().getMaxPrefetch() == 1 || + getAMQConnection().getMaxPrefetch() != 0 && _txSize % (getAMQConnection().getMaxPrefetch() / 2) == 0)) { // send completed so consumer credits don't dry up messageAcknowledge(_txRangeSet, false); @@ -1040,7 +1046,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause()); _currentException = amqe; } - _connection.exceptionReceived(_currentException); + getAMQConnection().exceptionReceived(_currentException); } public AMQMessageDelegateFactory getMessageDelegateFactory() @@ -1159,7 +1165,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic boolean isConsumer, boolean noWait) throws AMQException { - if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime())) + if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) { if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) { @@ -1329,7 +1335,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic protected void acknowledgeImpl() { - RangeSet ranges = gatherRangeSet(_unacknowledgedMessageTags); + RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags()); if(ranges.size() > 0 ) { @@ -1345,13 +1351,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // return the first <total number of msgs received on session> // messages sent by the brokers following the first rollback // after failover - _highestDeliveryTag.set(-1); + getHighestDeliveryTag().set(-1); // Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to //messages that came from the old broker. _txRangeSet.clear(); _txSize = 0; - _unacknowledgedMessageTags.clear(); - _prefetchedMessageTags.clear(); + getUnacknowledgedMessageTags().clear(); + getPrefetchedMessageTags().clear(); super.resubscribe(); getQpidSession().sync(); } @@ -1362,18 +1368,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic super.stop(); synchronized (getMessageDeliveryLock()) { - for (BasicMessageConsumer consumer : _consumers.values()) + for (BasicMessageConsumer consumer : getConsumers().values()) { List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); - _prefetchedMessageTags.addAll(tags); + getPrefetchedMessageTags().addAll(tags); } } - _usingDispatcherForCleanup = true; + setUsingDispatcherForCleanup(true); drainDispatchQueue(); - _usingDispatcherForCleanup = false; + setUsingDispatcherForCleanup(false); - RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); - RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); + RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags()); + RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags()); RangeSet all = RangeSetFactory.createRangeSet(delivered.size() + prefetched.size()); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 574775804b..96994e7963 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -103,7 +103,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { while (true) { - Long tag = _unacknowledgedMessageTags.poll(); + Long tag = getUnacknowledgedMessageTags().poll(); if (tag == null) { break; @@ -117,15 +117,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple); - final AMQFrame ackFrame = body.generateFrame(_channelId); + final AMQFrame ackFrame = body.generateFrame(getChannelId()); if (_logger.isDebugEnabled()) { - _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); + _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + getChannelId()); } getProtocolHandler().writeFrame(ackFrame, !isTransacted()); - _unacknowledgedMessageTags.remove(deliveryTag); + getUnacknowledgedMessageTags().remove(deliveryTag); } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, @@ -134,7 +134,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody (getTicket(),queueName,exchangeName,routingKey,false,arguments). - generateFrame(_channelId), QueueBindOkBody.class); + generateFrame(getChannelId()), QueueBindOkBody.class); } public void sendClose(long timeout) throws AMQException, FailoverException @@ -151,7 +151,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe getProtocolHandler().closeSession(this); getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), - new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId), + new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(getChannelId()), ChannelCloseOkBody.class, timeout); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully. @@ -163,7 +163,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // Acknowledge all delivered messages while (true) { - Long tag = _deliveredMessageTags.poll(); + Long tag = getDeliveredMessageTags().poll(); if (tag == null) { break; @@ -174,7 +174,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe final AMQProtocolHandler handler = getProtocolHandler(); - handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class); + handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(getChannelId()), TxCommitOkBody.class); } public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException, @@ -190,22 +190,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table); - AMQFrame queueDeclare = body.generateFrame(_channelId); + AMQFrame queueDeclare = body.generateFrame(getChannelId()); getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); } public void sendRecover() throws AMQException, FailoverException { enforceRejectBehaviourDuringRecover(); - _prefetchedMessageTags.clear(); - _unacknowledgedMessageTags.clear(); + getPrefetchedMessageTags().clear(); + getUnacknowledgedMessageTags().clear(); if (isStrictAMQP()) { // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); - _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId)); + getAMQConnection().getProtocolHandler().writeFrame(body.generateFrame(getChannelId())); _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); } else @@ -215,17 +215,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0)) { BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); - _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class); + getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverOkBody.class); } else if(getProtocolVersion().equals(ProtocolVersion.v0_9)) { BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false); - _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class); + getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); } else if(getProtocolVersion().equals(ProtocolVersion.v0_91)) { BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false); - _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class); + getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); } else { @@ -238,9 +238,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { if (_logger.isDebugEnabled()) { - _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + _unacknowledgedMessageTags); + _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + getUnacknowledgedMessageTags()); } - ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(_consumers.values()); + ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(getConsumers().values()); boolean messageListenerFound = false; boolean serverRejectBehaviourFound = false; for(BasicMessageConsumer_0_8 consumer : consumersToCheck) @@ -259,7 +259,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if (serverRejectBehaviourFound) { //reject(false) any messages we don't want returned again - switch(_acknowledgeMode) + switch(getAcknowledgeMode()) { case Session.DUPS_OK_ACKNOWLEDGE: case Session.AUTO_ACKNOWLEDGE: @@ -268,7 +268,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe break; } case Session.CLIENT_ACKNOWLEDGE: - for(Long tag : _unacknowledgedMessageTags) + for(Long tag : getUnacknowledgedMessageTags()) { rejectMessage(tag, false); } @@ -286,7 +286,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // consumer on the queue. Whilst this is within the JMS spec it is not // user friendly and avoidable. boolean normalRejectBehaviour = true; - for (BasicMessageConsumer_0_8 consumer : _consumers.values()) + for (BasicMessageConsumer_0_8 consumer : getConsumers().values()) { if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour())) { @@ -298,7 +298,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe while (true) { - Long tag = _deliveredMessageTags.poll(); + Long tag = getDeliveredMessageTags().poll(); if (tag == null) { break; @@ -310,8 +310,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe public void rejectMessage(long deliveryTag, boolean requeue) { - if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)|| - ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners())) + if ((getAcknowledgeMode() == CLIENT_ACKNOWLEDGE) || (getAcknowledgeMode() == SESSION_TRANSACTED)|| + ((getAcknowledgeMode() == AUTO_ACKNOWLEDGE || getAcknowledgeMode() == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners())) { if (_logger.isDebugEnabled()) { @@ -319,9 +319,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue); - AMQFrame frame = body.generateFrame(_channelId); + AMQFrame frame = body.generateFrame(getChannelId()); - _connection.getProtocolHandler().writeFrame(frame); + getAMQConnection().getProtocolHandler().writeFrame(frame); } } @@ -342,12 +342,12 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe public AMQMethodEvent execute() throws AMQException, FailoverException { AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody - (exchangeName, routingKey, queueName).generateFrame(_channelId); + (exchangeName, routingKey, queueName).generateFrame(getChannelId()); return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); } - }, _connection).execute(); + }, getAMQConnection()).execute(); // Extract and return the response code from the query. ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); @@ -378,7 +378,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe consumer.getArguments()); - AMQFrame jmsConsume = body.generateFrame(_channelId); + AMQFrame jmsConsume = body.generateFrame(getChannelId()); if (nowait) { @@ -396,7 +396,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type, name.toString().startsWith("amq."), false,false,false,false,null); - AMQFrame exchangeDeclare = body.generateFrame(_channelId); + AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } @@ -406,7 +406,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null); - AMQFrame queueDeclare = body.generateFrame(_channelId); + AMQFrame queueDeclare = body.generateFrame(getChannelId()); protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); } @@ -418,7 +418,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe false, false, true); - AMQFrame queueDeleteFrame = body.generateFrame(_channelId); + AMQFrame queueDeleteFrame = body.generateFrame(getChannelId()); getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); } @@ -426,8 +426,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException { ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend); - AMQFrame channelFlowFrame = body.generateFrame(_channelId); - _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); + AMQFrame channelFlowFrame = body.generateFrame(getChannelId()); + getAMQConnection().getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); } public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, @@ -436,9 +436,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal, - _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow, - exclusive, _acknowledgeMode, noConsume, autoClose); + return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, + getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow, + exclusive, getAcknowledgeMode(), noConsume, autoClose); } @@ -447,7 +447,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { try { - return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId, + return new BasicMessageProducer_0_8(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this, getProtocolHandler(), producerId, immediate, mandatory); } catch (AMQException e) @@ -477,7 +477,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe private void returnBouncedMessage(final ReturnMessage msg) { - _connection.performConnectionTask(new Runnable() + getAMQConnection().performConnectionTask(new Runnable() { public void run() { @@ -485,8 +485,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = - _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(),_queueDestinationCache,_topicDestinationCache); + getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(), + msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, _topicDestinationCache); AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); AMQShortString reason = msg.getReplyText(); _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); @@ -494,20 +494,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. if (errorCode == AMQConstant.NO_CONSUMERS) { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); - } - else if (errorCode == AMQConstant.NO_ROUTE) + getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); + } else if (errorCode == AMQConstant.NO_ROUTE) { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); - } - else + getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); + } else { - _connection.exceptionReceived( + getAMQConnection().exceptionReceived( new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); } - } - catch (Exception e) + } catch (Exception e) { _logger.error( "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", @@ -543,7 +540,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return null; } - }, _connection).execute(); + }, getAMQConnection()).execute(); } public DestinationCache<AMQQueue> getQueueDestinationCache() @@ -579,6 +576,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return matches; } + public long getMessageCount() + { + return _messageCount; + } + + public long getConsumerCount() + { + return _consumerCount; + } } protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException @@ -591,10 +597,10 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe amqd.isExclusive(), amqd.isAutoDelete(), false, - null).generateFrame(_channelId); + null).generateFrame(getChannelId()); QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); - return okHandler._messageCount; + return okHandler.getMessageCount(); } protected boolean tagLE(long tag1, long tag2) @@ -655,7 +661,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { // if the Connection has closed then we should throw any exception that // has occurred that we were not waiting for - AMQStateManager manager = _connection.getProtocolHandler() + AMQStateManager manager = getAMQConnection().getProtocolHandler() .getStateManager(); Exception e = manager.getLastException(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 00df898ec4..f09ef5e01d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -174,7 +174,7 @@ public class AMQTopic extends AMQDestination implements Topic } else { - return _exchangeName; + return super.getExchangeName(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java index 597cd6301b..6e454cdae9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java @@ -26,7 +26,7 @@ import java.io.Serializable; public class AMQTopicSessionAdaptor implements TopicSession, AMQSessionAdapter { - protected final AMQSession _session; + private final AMQSession _session; public AMQTopicSessionAdaptor(Session session) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 568d47080b..f0f2c85c2f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -60,14 +60,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); - /** The connection being used by this consumer */ - protected final AMQConnection _connection; + private final AMQConnection _connection; - protected final MessageFilter _messageSelectorFilter; + private final MessageFilter _messageSelectorFilter; private final boolean _noLocal; - protected AMQDestination _destination; + private AMQDestination _destination; /** * When true indicates that a blocking receive call is in progress @@ -78,23 +77,17 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa */ private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>(); - /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */ - protected int _consumerTag; + private int _consumerTag; - /** We need to know the channel id when constructing frames */ - protected final int _channelId; + private final int _channelId; - /** - * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors - * <p/> Argument true indicates we want strict FIFO semantics - */ - protected final BlockingQueue _synchronousQueue; + private final BlockingQueue _synchronousQueue; - protected final MessageFactoryRegistry _messageFactory; + private final MessageFactoryRegistry _messageFactory; - protected final AMQSession _session; + private final AMQSession _session; - protected final AMQProtocolHandler _protocolHandler; + private final AMQProtocolHandler _protocolHandler; /** * We need to store the "raw" field table so that we can resubscribe in the event of failover being required @@ -113,17 +106,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa */ private final int _prefetchLow; - /** - * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover - */ - protected boolean _exclusive; + private boolean _exclusive; - /** - * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per - * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our - * implementation. - */ - protected final int _acknowledgeMode; + private final int _acknowledgeMode; /** * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. @@ -238,6 +223,11 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa return _messageListener.get(); } + /** + * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per + * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our + * implementation. + */ public int getAcknowledgeMode() { return _acknowledgeMode; @@ -377,6 +367,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa return _noLocal; } + /** + * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover + */ public boolean isExclusive() { return _exclusive; @@ -865,6 +858,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _session.deregisterConsumer(this); } + /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */ public int getConsumerTag() { return _consumerTag; @@ -1014,4 +1008,40 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa public void failedOverPost() {} + /** The connection being used by this consumer */ + protected AMQConnection getConnection() + { + return _connection; + } + + protected void setDestination(AMQDestination destination) + { + _destination = destination; + } + + /** We need to know the channel id when constructing frames */ + protected int getChannelId() + { + return _channelId; + } + + /** + * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors + * <p/> Argument true indicates we want strict FIFO semantics + */ + protected BlockingQueue getSynchronousQueue() + { + return _synchronousQueue; + } + + protected MessageFactoryRegistry getMessageFactory() + { + return _messageFactory; + } + + protected AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 5dadcd5ca3..ccde720673 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -57,7 +57,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM /** * This class logger */ - protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Logger _logger = LoggerFactory.getLogger(getClass()); /** * The underlying QpidSession @@ -78,7 +78,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM private final long _capacity; /** Flag indicating if the server supports message selectors */ - protected final boolean _serverJmsSelectorSupport; + private final boolean _serverJmsSelectorSupport; protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, @@ -103,8 +103,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (!namedQueue) { - _destination = destination.copyDestination(); - _destination.setQueueName(null); + setDestination(destination.copyDestination()); + getDestination().setQueueName(null); } } } @@ -192,14 +192,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { super.preDeliver(jmsMsg); - if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE) + if (getAcknowledgeMode() == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE) { //For 0-10 we need to ensure that all messages are indicated processed in some way to //ensure their AMQP command-id is marked completed, and so we must send a completion //even for no-ack messages even though there isnt actually an 'acknowledgement' occurring. //Add message to the unacked message list to ensure we dont lose record of it before //sending a completion of some sort. - _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); + getSession().addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); } } @@ -207,7 +207,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception { AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession()); - return _messageFactory.createMessage(msg.getMessageTransfer()); + return getMessageFactory().createMessage(msg.getMessageTransfer()); } /** @@ -222,9 +222,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM boolean messageOk = true; try { - if (_messageSelectorFilter != null && !_serverJmsSelectorSupport) + if (getMessageSelectorFilter() != null && !_serverJmsSelectorSupport) { - messageOk = _messageSelectorFilter.matches(message); + messageOk = getMessageSelectorFilter().matches(message); } } catch (Exception e) @@ -285,7 +285,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { _0_10session.messageAcknowledge (Range.newInstance((int) message.getDeliveryTag()), - _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); final AMQException amqe = _0_10session.getCurrentException(); if (amqe != null) @@ -349,20 +349,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { messageFlow(); } - if (messageListener != null && !_synchronousQueue.isEmpty()) + if (messageListener != null && !getSynchronousQueue().isEmpty()) { - Iterator messages=_synchronousQueue.iterator(); + Iterator messages= getSynchronousQueue().iterator(); while (messages.hasNext()) { AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); messages.remove(); - _session.rejectMessage(message, true); + getSession().rejectMessage(message, true); } } } catch(TransportException e) { - throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e); + throw getSession().toJMSException("Exception while setting message listener:" + e.getMessage(), e); } } @@ -389,7 +389,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { _syncReceive.set(true); } - if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty()) + if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty()) { messageFlow(); } @@ -426,19 +426,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { super.postDeliver(msg); - switch (_acknowledgeMode) + switch (getAcknowledgeMode()) { case Session.SESSION_TRANSACTED: _0_10session.sendTxCompletionsIfNecessary(); break; case Session.NO_ACKNOWLEDGE: - if (!_session.isInRecovery()) + if (!getSession().isInRecovery()) { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); + getSession().acknowledgeMessage(msg.getDeliveryTag(), false); } break; case Session.AUTO_ACKNOWLEDGE: - if (!_session.isInRecovery() && _session.getAMQConnection().getSyncAck()) + if (!getSession().isInRecovery() && getSession().getAMQConnection().getSyncAck()) { ((AMQSession_0_10) getSession()).getQpidSession().sync(); } @@ -454,10 +454,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM @Override public void rollbackPendingMessages() { - if (_synchronousQueue.size() > 0) + if (getSynchronousQueue().size() > 0) { RangeSet ranges = RangeSetFactory.createRangeSet(); - Iterator iterator = _synchronousQueue.iterator(); + Iterator iterator = getSynchronousQueue().iterator(); while (iterator.hasNext()) { @@ -497,7 +497,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } else { - return _exclusive; + return super.isExclusive(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 755c95fcfc..b00f9dd98a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -44,7 +44,7 @@ import javax.jms.Message; public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8> { - protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Logger _logger = LoggerFactory.getLogger(getClass()); private AMQSession_0_8.DestinationCache<AMQTopic> _topicDestinationCache; private AMQSession_0_8.DestinationCache<AMQQueue> _queueDestinationCache; @@ -95,11 +95,11 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe void sendCancel() throws AMQException, FailoverException { - BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(_consumerTag)), false); + BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false); - final AMQFrame cancelFrame = body.generateFrame(_channelId); + final AMQFrame cancelFrame = body.generateFrame(getChannelId()); - _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class); if (_logger.isDebugEnabled()) { @@ -110,9 +110,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception { - return _messageFactory.createMessage(messageFrame.getDeliveryTag(), - messageFrame.isRedelivered(), messageFrame.getExchange(), - messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(), + return getMessageFactory().createMessage(messageFrame.getDeliveryTag(), + messageFrame.isRedelivered(), messageFrame.getExchange(), + messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(), _queueDestinationCache, _topicDestinationCache); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 0275cd2fd5..84747d6f09 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -47,16 +47,76 @@ import java.util.UUID; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { + /** + * If true, messages will not get a timestamp. + */ + protected boolean isDisableTimestamps() + { + return _disableTimestamps; + } + + protected void setDisableTimestamps(boolean disableTimestamps) + { + _disableTimestamps = disableTimestamps; + } + + protected void setDestination(AMQDestination destination) + { + _destination = destination; + } + + protected AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + + protected void setProtocolHandler(AMQProtocolHandler protocolHandler) + { + _protocolHandler = protocolHandler; + } + + protected int getChannelId() + { + return _channelId; + } + + protected void setChannelId(int channelId) + { + _channelId = channelId; + } + + protected void setSession(AMQSession session) + { + _session = session; + } + + protected String getUserID() + { + return _userID; + } + + protected void setUserID(String userID) + { + _userID = userID; + } + + protected PublishMode getPublishMode() + { + return publishMode; + } + + protected void setPublishMode(PublishMode publishMode) + { + this.publishMode = publishMode; + } + enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; - protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Logger _logger = LoggerFactory.getLogger(getClass()); private AMQConnection _connection; - /** - * If true, messages will not get a timestamp. - */ - protected boolean _disableTimestamps; + private boolean _disableTimestamps; /** * Priority of messages created by this producer. @@ -73,10 +133,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac */ private int _deliveryMode = DeliveryMode.PERSISTENT; - /** - * The Destination used for this consumer, if specified upon creation. - */ - protected AMQDestination _destination; + private AMQDestination _destination; /** * Default encoding used for messages produced by this producer. @@ -88,14 +145,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac */ private String _mimeType; - protected AMQProtocolHandler _protocolHandler; + private AMQProtocolHandler _protocolHandler; /** * True if this producer was created from a transacted session */ private boolean _transacted; - protected int _channelId; + private int _channelId; /** * This is an id generated by the session and is used to tie individual producers to the session. This means we @@ -105,10 +162,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac */ private long _producerId; - /** - * The session used to create this producer - */ - protected AMQSession _session; + private AMQSession _session; private final boolean _immediate; @@ -118,11 +172,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private UUIDGen _messageIdGenerator = UUIDs.newGenerator(); - protected String _userID; // ref user id used in the connection. + private String _userID; // ref user id used in the connection. private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; - protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; + private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException @@ -256,6 +310,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac return _timeToLive; } + protected AMQDestination getAMQDestination() + { + return _destination; + } + + /** + * The Destination used for this consumer, if specified upon creation. + */ public Destination getDestination() throws JMSException { checkNotClosed(); @@ -564,6 +626,9 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } + /** + * The session used to create this producer + */ public AMQSession getSession() { return _session; @@ -580,4 +645,10 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e); } } + + Logger getLogger() + { + return _logger; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 452e76776b..91811ccf98 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -65,7 +65,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory); - userIDBytes = Strings.toUTF8(_userID); + userIDBytes = Strings.toUTF8(getUserID()); } void declareDestination(AMQDestination destination) throws AMQException @@ -125,7 +125,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer } long currentTime = 0; - if (timeToLive > 0 || !_disableTimestamps) + if (timeToLive > 0 || !isDisableTimestamps()) { currentTime = System.currentTimeMillis(); } @@ -136,7 +136,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer message.setJMSExpiration(currentTime + timeToLive); } - if (!_disableTimestamps) + if (!isDisableTimestamps()) { deliveryProp.setTimestamp(currentTime); @@ -213,8 +213,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer // if true, we need to sync the delivery of this message boolean sync = false; - sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) || - (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT && + sync = ( (getPublishMode() == PublishMode.SYNC_PUBLISH_ALL) || + (getPublishMode() == PublishMode.SYNC_PUBLISH_PERSISTENT && deliveryMode == DeliveryMode.PERSISTENT) ); @@ -248,14 +248,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer @Override public boolean isBound(AMQDestination destination) throws JMSException { - return _session.isQueueBound(destination); + return getSession().isQueueBound(destination); } @Override public void close() throws JMSException { super.close(); - AMQDestination dest = _destination; + AMQDestination dest = getAMQDestination(); if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { if (dest.getDelete() == AddressOption.ALWAYS || @@ -264,7 +264,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer try { ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - _destination.getQueueName()); + getAMQDestination().getQueueName()); } catch(TransportException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 94121db99f..3b5e361f97 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -54,7 +54,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer final MethodRegistry methodRegistry = getSession().getMethodRegistry(); ExchangeDeclareBody body = - methodRegistry.createExchangeDeclareBody(_session.getTicket(), + methodRegistry.createExchangeDeclareBody(getSession().getTicket(), destination.getExchangeName(), destination.getExchangeClass(), destination.getExchangeName().toString().startsWith("amq."), @@ -66,29 +66,29 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer // Declare the exchange // Note that the durable and internal arguments are ignored since passive is set to false - AMQFrame declare = body.generateFrame(_channelId); + AMQFrame declare = body.generateFrame(getChannelId()); - _protocolHandler.writeFrame(declare); + getProtocolHandler().writeFrame(declare); } void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { - BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(), + BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(), destination.getExchangeName(), destination.getRoutingKey(), mandatory, immediate); - AMQFrame publishFrame = body.generateFrame(_channelId); + AMQFrame publishFrame = body.generateFrame(getChannelId()); message.prepareForSending(); ByteBuffer payload = message.getData(); AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); - contentHeaderProperties.setUserId(_userID); + contentHeaderProperties.setUserId(getUserID()); //Set the JMS_QPID_DESTTYPE for 0-8/9 messages int type; @@ -108,7 +108,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer //Set JMS_QPID_DESTTYPE delegate.getContentHeaderProperties().getHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); - if (!_disableTimestamps) + if (!isDisableTimestamps()) { final long currentTime = System.currentTimeMillis(); contentHeaderProperties.setTimestamp(currentTime); @@ -132,12 +132,12 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer if (payload != null) { - createContentBodies(payload, frames, 2, _channelId); + createContentBodies(payload, frames, 2, getChannelId()); } - if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled()) + if ((contentBodyFrameCount != 0) && getLogger().isDebugEnabled()) { - _logger.debug("Sending content body frames to " + destination); + getLogger().debug("Sending content body frames to " + destination); } @@ -145,11 +145,11 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz(); AMQFrame contentHeaderFrame = - ContentHeaderBody.createAMQFrame(_channelId, + ContentHeaderBody.createAMQFrame(getChannelId(), classIfForBasic, 0, contentHeaderProperties, size); - if (_logger.isDebugEnabled()) + if (getLogger().isDebugEnabled()) { - _logger.debug("Sending content header frame to " + destination); + getLogger().debug("Sending content header frame to " + destination); } frames[0] = publishFrame; @@ -158,7 +158,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer try { - _session.checkFlowControl(); + getSession().checkFlowControl(); } catch (InterruptedException e) { @@ -168,7 +168,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer throw jmse; } - _protocolHandler.writeFrame(compositeFrame); + getProtocolHandler().writeFrame(compositeFrame); } /** @@ -192,7 +192,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer else { - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1; long remaining = payload.remaining(); for (int i = offset; i < frames.length; i++) { @@ -222,7 +222,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer else { int dataLength = payload.remaining(); - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1; int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; frameCount = (int) (dataLength / framePayloadMax) + lastFrame; } diff --git a/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java b/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java index 81a55006ed..1cd8df6e4a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java +++ b/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java @@ -24,7 +24,7 @@ import java.util.Queue; public abstract class DispatcherCallback { - BasicMessageConsumer _consumer; + private BasicMessageConsumer _consumer; public DispatcherCallback(BasicMessageConsumer mc) { diff --git a/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java b/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java index 585d6db3fd..134159afe1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java +++ b/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java @@ -22,8 +22,8 @@ package org.apache.qpid.client; public class MessageConsumerPair { - BasicMessageConsumer _consumer; - Object _item; + private BasicMessageConsumer _consumer; + private Object _item; public MessageConsumerPair(BasicMessageConsumer consumer, Object item) { diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index 295c6a4091..0b797df9dd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -202,7 +202,7 @@ public class QueueSenderAdapter implements QueueSender { if (_delegate.getSession().isStrictAMQP()) { - _delegate._logger.warn("AMQP does not support destination validation before publish, "); + _delegate.getLogger().warn("AMQP does not support destination validation before publish, "); destination.setCheckedForQueueBinding(true); } else diff --git a/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index a7494305c6..d9514338ce 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -53,7 +53,7 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ public synchronized XASession createXASession() throws JMSException { checkNotClosed(); - return _delegate.createXASession(); + return getDelegate().createXASession(); } //-- Interface XAQueueConnection @@ -86,6 +86,6 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ public XASession createXASession(int ackMode) throws JMSException { checkNotClosed(); - return _delegate.createXASession(ackMode); + return getDelegate().createXASession(ackMode); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index 1d991372df..85623df8c0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -86,7 +86,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic */ public void createSession() { - _qpidDtxSession = _qpidConnection.createSession(0); + _qpidDtxSession = getQpidConnection().createSession(0); _qpidDtxSession.setSessionListener(this); _qpidDtxSession.dtxSelect(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java index 51cc94965a..a69e808880 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java @@ -38,10 +38,10 @@ import org.apache.qpid.client.AMQConnection; public class FailoverNoopSupport<T, E extends Exception> implements FailoverSupport<T, E> { /** The protected operation that is to be retried in the event of fail-over. */ - FailoverProtectedOperation<T, E> operation; + private FailoverProtectedOperation<T, E> operation; /** The connection on which the fail-over protected operation is to be performed. */ - AMQConnection connection; + private AMQConnection connection; /** * Creates an automatic retrying fail-over handler for the specified operation. diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java index 0146f1935d..d3d33d3c75 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -73,10 +73,10 @@ public class FailoverRetrySupport<T, E extends Exception> implements FailoverSup private static final Logger _log = LoggerFactory.getLogger(FailoverRetrySupport.class); /** The protected operation that is to be retried in the event of fail-over. */ - FailoverProtectedOperation<T, E> operation; + private FailoverProtectedOperation<T, E> operation; /** The connection on which the fail-over protected operation is to be performed. */ - AMQConnection connection; + private AMQConnection connection; /** * Creates an automatic retrying fail-over handler for the specified operation. diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 9af225aded..558d93538b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -104,7 +104,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher return factory.createMethodDispatcher(session); } - AMQProtocolSession _session; + private AMQProtocolSession _session; public ClientMethodDispatcherImpl(AMQProtocolSession session) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java index 98ca8ed8cb..1395f39b99 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java @@ -121,18 +121,18 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate exchangeInfo = new ExchangeInfo(exchange.asString(),"",AMQDestination.UNKNOWN_TYPE); } - if ("topic".equals(exchangeInfo.exchangeType)) + if ("topic".equals(exchangeInfo.getExchangeType())) { dest = new AMQTopic(exchange, routingKey, null); } - else if ("direct".equals(exchangeInfo.exchangeType)) + else if ("direct".equals(exchangeInfo.getExchangeType())) { dest = new AMQQueue(exchange, routingKey, routingKey); } else { dest = new AMQAnyDestination(exchange, - new AMQShortString(exchangeInfo.exchangeType), + new AMQShortString(exchangeInfo.getExchangeType()), routingKey, false, false, @@ -223,9 +223,9 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate class ExchangeInfo { - String exchangeName; - String exchangeType; - int destType = AMQDestination.QUEUE_TYPE; + private String exchangeName; + private String exchangeType; + private int destType = AMQDestination.QUEUE_TYPE; public ExchangeInfo(String exchangeName, String exchangeType, int destType) diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index a4173be1d7..9c7bd0bdcf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -34,7 +34,7 @@ import java.nio.ByteBuffer; */ public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage { - protected boolean _readableMessage = false; + private boolean _readableMessage = false; AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedMessage) { @@ -75,6 +75,11 @@ public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage _readableMessage = false; } + protected void setReadable(boolean readable) + { + _readableMessage = readable; + } + public String toBodyString() throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 442fca6fe3..d1e43447cc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -36,7 +36,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */ - protected AMQMessageDelegate _delegate; + private AMQMessageDelegate _delegate; private boolean _redelivered; private boolean _receivedFromServer; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 6fbcea8aed..608567674a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -57,25 +57,25 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory { if (debug) { - _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")"); + _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")"); } - data = ByteBuffer.wrap(((ContentBody) bodies.get(0))._payload); + data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload()); } else if (bodies != null) { if (debug) { _logger.debug("Fragmented message body (" + bodies - .size() + " frames, bodySize=" + contentHeader.bodySize + ")"); + .size() + " frames, bodySize=" + contentHeader.getBodySize() + ")"); } - data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem? + data = ByteBuffer.allocate((int) contentHeader.getBodySize()); // XXX: Is cast a problem? final Iterator it = bodies.iterator(); while (it.hasNext()) { ContentBody cb = (ContentBody) it.next(); - final ByteBuffer payload = ByteBuffer.wrap(cb._payload); + final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload()); if(payload.isDirect() || payload.isReadOnly()) { data.put(payload); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java index 98cc323ad3..b0320d0f4e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java @@ -52,7 +52,7 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM public void reset() { - _readableMessage = true; + setReadable(true); if(_typedBytesContentReader != null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index 7fca76268f..b958d89515 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -54,7 +54,7 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea public void reset() { - _readableMessage = true; + setReadable(true); if(_typedBytesContentReader != null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java index 847975a5e5..c78b6ced93 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java @@ -87,13 +87,13 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage public void receiveBody(ContentBody body) { - if (body._payload != null) + if (body.getPayload() != null) { - final long payloadSize = body._payload.length; + final long payloadSize = body.getPayload().length; if (_bodies == null) { - if (payloadSize == getContentHeader().bodySize) + if (payloadSize == getContentHeader().getBodySize()) { _bodies = Collections.singletonList(body); } @@ -124,7 +124,7 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage public boolean isAllBodyDataReceived() { - return _bytesReceived == getContentHeader().bodySize; + return _bytesReceived == getContentHeader().getBodySize(); } public BasicDeliverBody getDeliverBody() diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index c73d800b14..41f6725c8f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -29,16 +29,16 @@ public class Link public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE } - protected String name; - protected String _filter; - protected FilterType _filterType = FilterType.SUBJECT; - protected boolean _isNoLocal; - protected boolean _isDurable; - protected int _consumerCapacity = 0; - protected int _producerCapacity = 0; - protected Node node; - protected Subscription subscription; - protected Reliability reliability = Reliability.AT_LEAST_ONCE; + private String name; + private String _filter; + private FilterType _filterType = FilterType.SUBJECT; + private boolean _isNoLocal; + private boolean _isDurable; + private int _consumerCapacity = 0; + private int _producerCapacity = 0; + private Node node; + private Subscription subscription; + private Reliability reliability = Reliability.AT_LEAST_ONCE; public Reliability getReliability() { diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java index fe469090d8..bb5bba9068 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java @@ -31,13 +31,18 @@ import java.util.Map; public abstract class Node { - protected int _nodeType = AMQDestination.UNKNOWN_TYPE; - protected boolean _isDurable; - protected boolean _isAutoDelete; - protected String _alternateExchange; - protected List<Binding> _bindings = new ArrayList<Binding>(); - protected Map<String,Object> _declareArgs = Collections.emptyMap(); - + private int _nodeType = AMQDestination.UNKNOWN_TYPE; + private boolean _isDurable; + private boolean _isAutoDelete; + private String _alternateExchange; + private List<Binding> _bindings = new ArrayList<Binding>(); + private Map<String,Object> _declareArgs = Collections.emptyMap(); + + protected Node(int nodeType) + { + _nodeType = nodeType; + } + public int getType() { return _nodeType; @@ -104,7 +109,7 @@ public abstract class Node public QueueNode() { - _nodeType = AMQDestination.QUEUE_TYPE; + super(AMQDestination.QUEUE_TYPE); } public boolean isExclusive() @@ -125,7 +130,7 @@ public abstract class Node public ExchangeNode() { - _nodeType = AMQDestination.TOPIC_TYPE; + super(AMQDestination.TOPIC_TYPE); } public String getExchangeType() @@ -142,5 +147,9 @@ public abstract class Node public static class UnknownNodeType extends Node { + public UnknownNodeType() + { + super(AMQDestination.UNKNOWN_TYPE); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c23e2ba985..d4da0ede32 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -865,159 +865,159 @@ public class AMQProtocolHandler implements ProtocolEngine } private static class BytesDataOutput implements DataOutput + { + private int _pos = 0; + private byte[] _buf; + + public BytesDataOutput(byte[] buf) { - int _pos = 0; - byte[] _buf; + _buf = buf; + } - public BytesDataOutput(byte[] buf) - { - _buf = buf; - } + public void setBuffer(byte[] buf) + { + _buf = buf; + _pos = 0; + } - public void setBuffer(byte[] buf) - { - _buf = buf; - _pos = 0; - } + public void reset() + { + _pos = 0; + } - public void reset() - { - _pos = 0; - } + public int length() + { + return _pos; + } - public int length() - { - return _pos; - } + public void write(int b) + { + _buf[_pos++] = (byte) b; + } - public void write(int b) - { - _buf[_pos++] = (byte) b; - } + public void write(byte[] b) + { + System.arraycopy(b, 0, _buf, _pos, b.length); + _pos+=b.length; + } - public void write(byte[] b) - { - System.arraycopy(b, 0, _buf, _pos, b.length); - _pos+=b.length; - } + public void write(byte[] b, int off, int len) + { + System.arraycopy(b, off, _buf, _pos, len); + _pos+=len; - public void write(byte[] b, int off, int len) - { - System.arraycopy(b, off, _buf, _pos, len); - _pos+=len; + } - } + public void writeBoolean(boolean v) + { + _buf[_pos++] = v ? (byte) 1 : (byte) 0; + } - public void writeBoolean(boolean v) - { - _buf[_pos++] = v ? (byte) 1 : (byte) 0; - } + public void writeByte(int v) + { + _buf[_pos++] = (byte) v; + } - public void writeByte(int v) - { - _buf[_pos++] = (byte) v; - } + public void writeShort(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } - public void writeShort(int v) - { - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; - } + public void writeChar(int v) + { + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } - public void writeChar(int v) + public void writeInt(int v) + { + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte) v; + } + + public void writeLong(long v) + { + _buf[_pos++] = (byte) (v >>> 56); + _buf[_pos++] = (byte) (v >>> 48); + _buf[_pos++] = (byte) (v >>> 40); + _buf[_pos++] = (byte) (v >>> 32); + _buf[_pos++] = (byte) (v >>> 24); + _buf[_pos++] = (byte) (v >>> 16); + _buf[_pos++] = (byte) (v >>> 8); + _buf[_pos++] = (byte)v; + } + + public void writeFloat(float v) + { + writeInt(Float.floatToIntBits(v)); + } + + public void writeDouble(double v) + { + writeLong(Double.doubleToLongBits(v)); + } + + public void writeBytes(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) { - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; + _buf[_pos++] = ((byte)s.charAt(i)); } + } - public void writeInt(int v) + public void writeChars(String s) + { + int len = s.length(); + for (int i = 0 ; i < len ; i++) { - _buf[_pos++] = (byte) (v >>> 24); - _buf[_pos++] = (byte) (v >>> 16); + int v = s.charAt(i); _buf[_pos++] = (byte) (v >>> 8); _buf[_pos++] = (byte) v; } + } - public void writeLong(long v) - { - _buf[_pos++] = (byte) (v >>> 56); - _buf[_pos++] = (byte) (v >>> 48); - _buf[_pos++] = (byte) (v >>> 40); - _buf[_pos++] = (byte) (v >>> 32); - _buf[_pos++] = (byte) (v >>> 24); - _buf[_pos++] = (byte) (v >>> 16); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte)v; - } + public void writeUTF(String s) + { + int strlen = s.length(); - public void writeFloat(float v) - { - writeInt(Float.floatToIntBits(v)); - } + int pos = _pos; + _pos+=2; - public void writeDouble(double v) - { - writeLong(Double.doubleToLongBits(v)); - } - public void writeBytes(String s) + for (int i = 0; i < strlen; i++) { - int len = s.length(); - for (int i = 0 ; i < len ; i++) + int c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { - _buf[_pos++] = ((byte)s.charAt(i)); - } - } + c = s.charAt(i); + _buf[_pos++] = (byte) c; - public void writeChars(String s) - { - int len = s.length(); - for (int i = 0 ; i < len ; i++) + } + else if (c > 0x07FF) { - int v = s.charAt(i); - _buf[_pos++] = (byte) (v >>> 8); - _buf[_pos++] = (byte) v; + _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); } - } - - public void writeUTF(String s) - { - int strlen = s.length(); - - int pos = _pos; - _pos+=2; - - - for (int i = 0; i < strlen; i++) + else { - int c = s.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) - { - c = s.charAt(i); - _buf[_pos++] = (byte) c; - - } - else if (c > 0x07FF) - { - _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); - } - else - { - _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); - } + _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + _buf[_pos++] = (byte) (0x80 | (c & 0x3F)); } - - int len = _pos - (pos + 2); - - _buf[pos++] = (byte) (len >>> 8); - _buf[pos] = (byte) len; } + int len = _pos - (pos + 2); + + _buf[pos++] = (byte) (len >>> 8); + _buf[pos] = (byte) len; } + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 93a51adb68..c9b2e9cdc4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -73,16 +73,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected static final String SASL_CLIENT = "SASLClient"; - /** - * The handler from which this session was created and which is used to handle protocol events. We send failover - * events to the handler. - */ - protected final AMQProtocolHandler _protocolHandler; + private final AMQProtocolHandler _protocolHandler; - /** Maps from the channel id to the AMQSession that it represents. */ - protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>(); + private ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>(); - protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); + private ConcurrentMap _closingChannels = new ConcurrentHashMap(); /** * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives @@ -91,9 +86,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>(); private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16]; - /** Counter to ensure unique queue names */ - protected int _queueId = 1; - protected final Object _queueIdLock = new Object(); + private int _queueId = 1; + private final Object _queueIdLock = new Object(); private ProtocolVersion _protocolVersion; // private VersionSpecificRegistry _registry = @@ -104,7 +98,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private MethodDispatcher _methodDispatcher; - protected final AMQConnection _connection; + private final AMQConnection _connection; private ConnectionTuneParameters _connectionTuneParameters; @@ -223,7 +217,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } msg.setContentHeader(contentHeader); - if (contentHeader.bodySize == 0) + if (contentHeader.getBodySize() == 0) { deliverMessageToAMQSession(channelId, msg); } @@ -470,4 +464,55 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { return "AMQProtocolSession[" + _connection + ']'; } + + /** + * The handler from which this session was created and which is used to handle protocol events. We send failover + * events to the handler. + */ + protected AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + + /** Maps from the channel id to the AMQSession that it represents. */ + protected ConcurrentMap<Integer, AMQSession> getChannelId2SessionMap() + { + return _channelId2SessionMap; + } + + protected void setChannelId2SessionMap(ConcurrentMap<Integer, AMQSession> channelId2SessionMap) + { + _channelId2SessionMap = channelId2SessionMap; + } + + protected ConcurrentMap getClosingChannels() + { + return _closingChannels; + } + + protected void setClosingChannels(ConcurrentMap closingChannels) + { + _closingChannels = closingChannels; + } + + /** Counter to ensure unique queue names */ + protected int getQueueId() + { + return _queueId; + } + + protected void setQueueId(int queueId) + { + _queueId = queueId; + } + + protected Object getQueueIdLock() + { + return _queueIdLock; + } + + protected AMQConnection getConnection() + { + return _connection; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 4350b48a10..b865c51cb7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -63,7 +63,7 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMeth { /** Holds the channel id for the channel upon which this listener is waiting for a response. */ - protected int _channelId; + private int _channelId; /** * Creates a new method listener, that filters incoming method to just those that match the specified channel id. diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index b52c121485..616c02f3aa 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -63,7 +63,7 @@ public class AMQStateManager implements AMQMethodListener private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000")); - protected final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>(); + private final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>(); private Exception _lastException; public AMQStateManager() diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java index 6e47e2ce28..9e21e1c4ab 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java @@ -36,7 +36,7 @@ import org.apache.qpid.jms.BrokerDetails; */ public class AMQNoTransportForProtocolException extends AMQTransportConnectionException { - BrokerDetails _details; + private BrokerDetails _details; public AMQNoTransportForProtocolException(BrokerDetails details, String message, Throwable cause) { diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java index db7c16974a..3c9a6e1500 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java @@ -85,7 +85,7 @@ public class ClientConnectionDelegate extends ClientDelegate protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException, SaslException { final String brokerMechanisms = Strings.join(" ", brokerMechs); - final String restrictionList = _conSettings.getSaslMechs(); + final String restrictionList = getConnectionSettings().getSaslMechs(); final String selectedMech = CallbackHandlerRegistry.getInstance().selectMechanism(brokerMechanisms, restrictionList); if (selectedMech == null) { @@ -96,14 +96,14 @@ public class ClientConnectionDelegate extends ClientDelegate } Map<String,Object> saslProps = new HashMap<String,Object>(); - if (_conSettings.isUseSASLEncryption()) + if (getConnectionSettings().isUseSASLEncryption()) { saslProps.put(Sasl.QOP, "auth-conf"); } final AMQCallbackHandler handler = CallbackHandlerRegistry.getInstance().createCallbackHandler(selectedMech); handler.initialise(_connectionURL); - final SaslClient sc = Sasl.createSaslClient(new String[] {selectedMech}, null, _conSettings.getSaslProtocol(), _conSettings.getSaslServerName(), saslProps, handler); + final SaslClient sc = Sasl.createSaslClient(new String[] {selectedMech}, null, getConnectionSettings().getSaslProtocol(), getConnectionSettings().getSaslServerName(), saslProps, handler); return sc; } @@ -137,7 +137,7 @@ public class ClientConnectionDelegate extends ClientDelegate private String getKerberosUser() { LOGGER.debug("Obtaining userID from kerberos"); - String service = _conSettings.getSaslProtocol() + "@" + _conSettings.getSaslServerName(); + String service = getConnectionSettings().getSaslProtocol() + "@" + getConnectionSettings().getSaslServerName(); GSSManager manager = GSSManager.getInstance(); try diff --git a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 5d36b2f19e..c371341265 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -85,7 +85,7 @@ public abstract class BlockingWaiter<T> private volatile Exception _error; /** Holds the incomming Object. */ - protected Object _doneObject = null; + private Object _doneObject = null; private AtomicBoolean _waiting = new AtomicBoolean(false); private boolean _closed = false; diff --git a/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java b/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java index 7bc1322e02..84e4704867 100644 --- a/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java +++ b/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java @@ -795,10 +795,10 @@ public class ReferenceMap extends AbstractMap // the mapping is stale and should be removed. private class Entry implements Map.Entry, KeyValue { - Object key; - Object value; - int hash; - Entry next; + private Object key; + private Object value; + private int hash; + private Entry next; public Entry(Object key, int hash, Object value, Entry next) { @@ -887,17 +887,17 @@ public class ReferenceMap extends AbstractMap private class EntryIterator implements Iterator { // These fields keep track of where we are in the table. - int index; - Entry entry; - Entry previous; + private int index; + private Entry entry; + private Entry previous; // These Object fields provide hard references to the // current and next entry; this assures that if hasNext() // returns true, next() will actually return a valid element. - Object nextKey, nextValue; - Object currentKey, currentValue; + private Object nextKey, nextValue; + private Object currentKey, currentValue; - int expectedModCount; + private int expectedModCount; public EntryIterator() { diff --git a/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java b/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java index a7ca67ad15..f1b6d11bee 100644 --- a/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java +++ b/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java @@ -33,9 +33,9 @@ import org.apache.qpid.collections.KeyValue; public abstract class AbstractKeyValue implements KeyValue { /** The key */ - protected Object key; + private Object key; /** The value */ - protected Object value; + private Object value; /** * Constructs a new pair with the specified key and given value. @@ -68,6 +68,21 @@ public abstract class AbstractKeyValue implements KeyValue { } /** + * Sets the value stored in this <code>Map.Entry</code>. + * <p> + * This <code>Map.Entry</code> is not connected to a Map, so only the + * local data is changed. + * + * @param value the new value + * @return the previous value + */ + public Object setValue(Object value) { + Object answer = this.value; + this.value = value; + return answer; + } + + /** * Gets a debugging String view of the pair. * * @return a String view of the entry diff --git a/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java b/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java index a5223d2361..7135c31fd7 100644 --- a/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java +++ b/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java @@ -43,20 +43,7 @@ public abstract class AbstractMapEntry extends AbstractKeyValue implements Map.E // Map.Entry interface //------------------------------------------------------------------------- - /** - * Sets the value stored in this <code>Map.Entry</code>. - * <p> - * This <code>Map.Entry</code> is not connected to a Map, so only the - * local data is changed. - * - * @param value the new value - * @return the previous value - */ - public Object setValue(Object value) { - Object answer = this.value; - this.value = value; - return answer; - } + /** * Compares this <code>Map.Entry</code> with another <code>Map.Entry</code>. diff --git a/java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java b/java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java index a86613f10c..df5e2acd66 100644 --- a/java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java +++ b/java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java @@ -243,13 +243,13 @@ public abstract class ArithmeticExpression extends BinaryExpression public Object evaluate(AbstractJMSMessage message) throws AMQInternalException { - Object lvalue = left.evaluate(message); + Object lvalue = getLeft().evaluate(message); if (lvalue == null) { return null; } - Object rvalue = right.evaluate(message); + Object rvalue = getRight().evaluate(message); if (rvalue == null) { return null; diff --git a/java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java b/java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java index f97f858fad..a08a6cc094 100644 --- a/java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java +++ b/java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java @@ -22,8 +22,8 @@ package org.apache.qpid.filter; */ public abstract class BinaryExpression implements Expression { - protected Expression left; - protected Expression right; + private final Expression left; + private final Expression right; public BinaryExpression(Expression left, Expression right) { @@ -84,20 +84,5 @@ public abstract class BinaryExpression implements Expression */ public abstract String getExpressionSymbol(); - /** - * @param expression - */ - public void setRight(Expression expression) - { - right = expression; - } - - /** - * @param expression - */ - public void setLeft(Expression expression) - { - left = expression; - } } diff --git a/java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java b/java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java index eebfec0b2d..87d43ec343 100644 --- a/java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java +++ b/java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java @@ -69,7 +69,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B static class LikeExpression extends UnaryExpression implements BooleanExpression { - Pattern likePattern; + private Pattern likePattern; /** * @param right @@ -236,8 +236,8 @@ public abstract class ComparisonExpression extends BinaryExpression implements B public Object evaluate(AbstractJMSMessage message) throws AMQInternalException { - Object lv = left.evaluate(message); - Object rv = right.evaluate(message); + Object lv = getLeft().evaluate(message); + Object rv = getRight().evaluate(message); // Iff one of the values is null if ((lv == null) ^ (rv == null)) @@ -419,13 +419,13 @@ public abstract class ComparisonExpression extends BinaryExpression implements B public Object evaluate(AbstractJMSMessage message) throws AMQInternalException { - Comparable lv = (Comparable) left.evaluate(message); + Comparable lv = (Comparable) getLeft().evaluate(message); if (lv == null) { return null; } - Comparable rv = (Comparable) right.evaluate(message); + Comparable rv = (Comparable) getRight().evaluate(message); if (rv == null) { return null; diff --git a/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java b/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java index 7ef85cbacb..b08b93228f 100644 --- a/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java +++ b/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java @@ -35,14 +35,14 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea public Object evaluate(AbstractJMSMessage message) throws AMQInternalException { - Boolean lv = (Boolean) left.evaluate(message); + Boolean lv = (Boolean) getLeft().evaluate(message); // Can we do an OR shortcut?? if ((lv != null) && lv.booleanValue()) { return Boolean.TRUE; } - Boolean rv = (Boolean) right.evaluate(message); + Boolean rv = (Boolean) getRight().evaluate(message); return (rv == null) ? null : rv; } @@ -62,7 +62,7 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea public Object evaluate(AbstractJMSMessage message) throws AMQInternalException { - Boolean lv = (Boolean) left.evaluate(message); + Boolean lv = (Boolean) getLeft().evaluate(message); // Can we do an AND shortcut?? if (lv == null) @@ -75,7 +75,7 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea return Boolean.FALSE; } - Boolean rv = (Boolean) right.evaluate(message); + Boolean rv = (Boolean) getRight().evaluate(message); return (rv == null) ? null : rv; } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java index ad0a625c6a..9a2e9de3d9 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java @@ -38,8 +38,8 @@ import java.nio.ByteBuffer; */ public class MessagePartListenerAdapter implements MessagePartListener { - MessageListener _adaptee; - ByteBufferMessage _currentMsg; + private MessageListener _adaptee; + private ByteBufferMessage _currentMsg; public MessagePartListenerAdapter(MessageListener listener) { diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java index d862acf28d..bc48ee8895 100644 --- a/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java @@ -26,11 +26,11 @@ import org.apache.qpid.framing.AMQShortString; public class AMQQueueTest extends TestCase { - AMQShortString exchange = new AMQShortString("test.exchange"); - AMQShortString routingkey = new AMQShortString("test-route"); - AMQShortString qname = new AMQShortString("test-queue"); - AMQShortString[] oneBinding = new AMQShortString[]{new AMQShortString("bindingA")}; - AMQShortString[] bindings = new AMQShortString[]{new AMQShortString("bindingB"), + private AMQShortString exchange = new AMQShortString("test.exchange"); + private AMQShortString routingkey = new AMQShortString("test-route"); + private AMQShortString qname = new AMQShortString("test-queue"); + private AMQShortString[] oneBinding = new AMQShortString[]{new AMQShortString("bindingA")}; + private AMQShortString[] bindings = new AMQShortString[]{new AMQShortString("bindingB"), new AMQShortString("bindingC")}; public void testToURLNoBindings() diff --git a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java index 919809edc3..009598d8a4 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java +++ b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java @@ -51,13 +51,13 @@ public class MockAMQConnection extends AMQConnection @Override public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException { - _connected = true; - _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN); + setConnected(true); + getProtocolHandler().getStateManager().changeState(AMQState.CONNECTION_OPEN); return null; } public AMQConnectionDelegate getDelegate() { - return _delegate; + return super.getDelegate(); } } diff --git a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index 6b5fc81be6..9a5ca33174 100644 --- a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -59,15 +59,15 @@ public class AMQProtocolHandlerTest extends TestCase private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandlerTest.class); // The handler to test - AMQProtocolHandler _handler; + private AMQProtocolHandler _handler; // A frame to block upon whilst waiting the exception - AMQFrame _blockFrame; + private AMQFrame _blockFrame; // Latch to know when the listener receives an exception private CountDownLatch _handleCountDown; // The listener that will receive an exception - BlockToAccessFrameListener _listener; + private BlockToAccessFrameListener _listener; @Override public void setUp() throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java b/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java index 8cd320b06e..91460ab4e7 100644 --- a/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java @@ -31,8 +31,8 @@ import java.util.List; public class ClassLoadingAwareObjectInputStreamTest extends QpidTestCase { - InputStream _in; - ClassLoadingAwareObjectInputStream _claOIS; + private InputStream _in; + private ClassLoadingAwareObjectInputStream _claOIS; protected void setUp() throws Exception { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java index 5690a05254..576ab4fa05 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java @@ -34,7 +34,7 @@ import java.util.Properties; public class JNDIPropertyFileTest extends TestCase { - Context ctx; + private Context ctx; public JNDIPropertyFileTest() throws Exception { |
