summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-27 20:15:31 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-27 20:15:31 +0000
commit4dd9cbaf7fdc498a4eb5f2652d88afd20fe5d530 (patch)
treee49a2b2dab05502118d48385e0989faab41feb45 /java/client
parentab01c9c19e109b2f91cb505f53497592c52ca88d (diff)
downloadqpid-python-4dd9cbaf7fdc498a4eb5f2652d88afd20fe5d530.tar.gz
NO-JIRA: Encapsulate fields, use private members and accesors (keep checkstyle happy)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1236867 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java26
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java2
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java14
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java87
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java5
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java32
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java10
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java12
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java12
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java47
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java22
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java82
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java303
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java70
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java112
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopic.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java82
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java44
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java105
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java34
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java27
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java236
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java71
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java2
64 files changed, 1065 insertions, 665 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java b/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java
index 9548eab4c5..6cc6db1974 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/OptionParser.java
@@ -147,8 +147,8 @@ public class OptionParser
for (Option option: optDefs)
{
- if ((op.startsWith("-") && option.shortForm != null && option.shortForm.equals(key)) ||
- (op.startsWith("--") && option.longForm != null && option.longForm.equals(key)) )
+ if ((op.startsWith("-") && option.getShortForm() != null && option.getShortForm().equals(key)) ||
+ (op.startsWith("--") && option.getLongForm() != null && option.getLongForm().equals(key)) )
{
match = true;
break;
@@ -219,18 +219,18 @@ public class OptionParser
protected boolean containsOp(Option op)
{
- return optMap.containsKey(op.shortForm) || optMap.containsKey(op.longForm);
+ return optMap.containsKey(op.getShortForm()) || optMap.containsKey(op.getLongForm());
}
protected String getOp(Option op)
{
- if (optMap.containsKey(op.shortForm))
+ if (optMap.containsKey(op.getShortForm()))
{
- return (String)optMap.get(op.shortForm);
+ return (String)optMap.get(op.getShortForm());
}
- else if (optMap.containsKey(op.longForm))
+ else if (optMap.containsKey(op.getLongForm()))
{
- return (String)optMap.get(op.longForm);
+ return (String)optMap.get(op.getLongForm());
}
else
{
@@ -286,12 +286,12 @@ public class OptionParser
static class Option
{
- private String shortForm;
- private String longForm;
- private String desc;
- private String valueLabel;
- private String defaultValue;
- private Class type;
+ private final String shortForm;
+ private final String longForm;
+ private final String desc;
+ private final String valueLabel;
+ private final String defaultValue;
+ private final Class type;
public Option(String shortForm, String longForm, String desc,
String valueLabel, String defaultValue, Class type)
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
index 3d16e01af4..2b1e641689 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
@@ -103,7 +103,7 @@ public class MonitorMessageDispatcher
// (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis()));
getMonitorPublisher().sendMessage
- (getMonitorPublisher()._session,
+ (getMonitorPublisher().getSession(),
FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()),
DeliveryMode.PERSISTENT, false, true);
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
index 750f57d9dc..b2bb0893d8 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
@@ -36,7 +36,7 @@ public class MonitorPublisher extends Publisher
private static final Logger _log = LoggerFactory.getLogger(Publisher.class);
- BasicMessageProducer _producer;
+ private BasicMessageProducer _producer;
public MonitorPublisher()
{
@@ -51,14 +51,14 @@ public class MonitorPublisher extends Publisher
{
try
{
- _producer = (BasicMessageProducer) session.createProducer(_destination);
+ _producer = (BasicMessageProducer) session.createProducer(getDestination());
_producer.send(message, deliveryMode, immediate);
if (commit)
{
//commit the message send and close the transaction
- _session.commit();
+ getSession().commit();
}
}
@@ -70,7 +70,7 @@ public class MonitorPublisher extends Publisher
throw new UndeliveredMessageException("Cannot deliver immediate message", e);
}
- _log.info(_name + " finished sending message: " + message);
+ _log.info(getName() + " finished sending message: " + message);
return true;
}
@@ -81,14 +81,14 @@ public class MonitorPublisher extends Publisher
{
try
{
- _producer = (BasicMessageProducer) _session.createProducer(_destination);
+ _producer = (BasicMessageProducer) getSession().createProducer(getDestination());
//Send message via our producer which is not persistent and is immediate
//NB: not available via jms interface MessageProducer
_producer.send(message, DeliveryMode.NON_PERSISTENT, true);
//commit the message send and close the transaction
- _session.commit();
+ getSession().commit();
}
catch (JMSException e)
@@ -99,7 +99,7 @@ public class MonitorPublisher extends Publisher
throw new UndeliveredMessageException("Cannot deliver immediate message", e);
}
- _log.info(_name + " finished sending message: " + message);
+ _log.info(getName() + " finished sending message: " + message);
return true;
}
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
index b5f44557a4..76531523b9 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
@@ -34,19 +34,19 @@ public class Publisher
protected InitialContextHelper _contextHelper;
- protected Connection _connection;
+ private Connection _connection;
- protected Session _session;
+ private Session _session;
- protected MessageProducer _producer;
+ private MessageProducer _producer;
- protected String _destinationDir;
+ private String _destinationDir;
- protected String _name = "Publisher";
+ private String _name = "Publisher";
- protected Destination _destination;
+ private Destination _destination;
- protected static final String _defaultDestinationDir = "/tmp";
+ private static final String _defaultDestinationDir = "/tmp";
/**
* Creates a Publisher instance using properties from example.properties
@@ -62,9 +62,9 @@ public class Publisher
//then create a connection using the AMQConnectionFactory
AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local");
- _connection = cf.createConnection();
+ setConnection(cf.createConnection());
- _connection.setExceptionListener(new ExceptionListener()
+ getConnection().setExceptionListener(new ExceptionListener()
{
public void onException(JMSException jmse)
{
@@ -76,19 +76,19 @@ public class Publisher
});
//create a transactional session
- _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ setSession(getConnection().createSession(true, Session.AUTO_ACKNOWLEDGE));
//lookup the example queue and use it
//Queue is non-exclusive and not deleted when last consumer detaches
- _destination = (Queue) ctx.lookup("MyQueue");
+ setDestination((Queue) ctx.lookup("MyQueue"));
//create a message producer
- _producer = _session.createProducer(_destination);
+ setProducer(getSession().createProducer(getDestination()));
//set destination dir for files that have been processed
- _destinationDir = _defaultDestinationDir;
+ setDestinationDir(get_defaultDestinationDir());
- _connection.start();
+ getConnection().start();
}
catch (Exception e)
{
@@ -97,6 +97,11 @@ public class Publisher
}
}
+ public static String get_defaultDestinationDir()
+ {
+ return _defaultDestinationDir;
+ }
+
/**
* Creates and sends the number of messages specified in the param
*/
@@ -104,7 +109,7 @@ public class Publisher
{
try
{
- TextMessage txtMessage = _session.createTextMessage("msg");
+ TextMessage txtMessage = getSession().createTextMessage("msg");
for (int i=0;i<numMessages;i++)
{
sendMessage(txtMessage);
@@ -128,10 +133,10 @@ public class Publisher
try
{
//Send message via our producer which is not persistent
- _producer.send(message, DeliveryMode.PERSISTENT, _producer.getPriority(), _producer.getTimeToLive());
+ getProducer().send(message, DeliveryMode.PERSISTENT, getProducer().getPriority(), getProducer().getTimeToLive());
//commit the message send and close the transaction
- _session.commit();
+ getSession().commit();
}
catch (JMSException e)
@@ -139,7 +144,7 @@ public class Publisher
//Have to assume our commit failed and rollback here
try
{
- _session.rollback();
+ getSession().rollback();
_log.error("JMSException", e);
e.printStackTrace();
return false;
@@ -162,13 +167,13 @@ public class Publisher
{
try
{
- if (_connection != null)
+ if (getConnection() != null)
{
- _connection.stop();
- _connection.close();
+ getConnection().stop();
+ getConnection().close();
}
- _connection = null;
- _producer = null;
+ setConnection(null);
+ setProducer(null);
}
catch(Exception e)
{
@@ -204,5 +209,41 @@ public class Publisher
public void setName(String _name) {
this._name = _name;
}
+
+
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+
+ public void setConnection(Connection connection)
+ {
+ _connection = connection;
+ }
+
+ public void setSession(Session session)
+ {
+ _session = session;
+ }
+
+ public MessageProducer getProducer()
+ {
+ return _producer;
+ }
+
+ public void setProducer(MessageProducer producer)
+ {
+ _producer = producer;
+ }
+
+ public Destination getDestination()
+ {
+ return _destination;
+ }
+
+ public void setDestination(Destination destination)
+ {
+ _destination = destination;
+ }
}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java
index 8645e41101..953a875912 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/TopicPublisher.java
@@ -18,7 +18,6 @@
*/
package org.apache.qpid.example.publisher;
-import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.example.shared.InitialContextHelper;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
@@ -44,10 +43,10 @@ public class TopicPublisher extends Publisher
InitialContext ctx = _contextHelper.getInitialContext();
//lookup the example topic and use it
- _destination = (Topic) ctx.lookup("MyTopic");
+ setDestination((Topic) ctx.lookup("MyTopic"));
//create a message producer
- _producer = _session.createProducer(_destination);
+ setProducer(getSession().createProducer(getDestination()));
}
catch (Exception e)
{
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java
index e32ee0ba73..5b0f4757ca 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java
@@ -31,11 +31,11 @@ import javax.naming.NamingException;
*/
public abstract class Client
{
- protected ConnectionSetup _setup;
+ private ConnectionSetup _setup;
- protected Connection _connection;
- protected Destination _destination;
- protected Session _session;
+ private Connection _connection;
+ private Destination _destination;
+ private Session _session;
public Client(String destination)
{
@@ -69,4 +69,28 @@ public abstract class Client
public abstract void start();
+ public ConnectionSetup getSetup()
+ {
+ return _setup;
+ }
+
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+
+ public Destination getDestination()
+ {
+ return _destination;
+ }
+
+ public Session getSession()
+ {
+ return _session;
+ }
+
+ public void setSession(Session session)
+ {
+ _session = session;
+ }
} \ No newline at end of file
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
index ac3829d49e..f35d56c702 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java
@@ -36,7 +36,7 @@ import javax.jms.Session;
*/
public class Publisher extends Client
{
- int _msgCount;
+ private int _msgCount;
public Publisher(String destination, int msgCount)
{
@@ -48,18 +48,18 @@ public class Publisher extends Client
{
try
{
- _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ setSession(getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE));
- MessageProducer _producer = _session.createProducer(_destination);
+ MessageProducer _producer = getSession().createProducer(getDestination());
for (int msgCount = 0; msgCount < _msgCount; msgCount++)
{
- _producer.send(_session.createTextMessage("msg:" + msgCount));
+ _producer.send(getSession().createTextMessage("msg:" + msgCount));
System.out.println("Sent:" + msgCount);
}
System.out.println("Done.");
- _connection.close();
+ getConnection().close();
}
catch (JMSException e)
{
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java
index f2d736701f..1d7fc43b9c 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java
@@ -41,7 +41,7 @@ import java.util.concurrent.CountDownLatch;
public class Subscriber extends Client implements MessageListener
{
- CountDownLatch _count;
+ private CountDownLatch _count;
public Subscriber(String destination, int msgCount)
{
@@ -54,16 +54,16 @@ public class Subscriber extends Client implements MessageListener
{
try
{
- _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ setSession(getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE));
- _session.createDurableSubscriber((Topic) _setup.getDestination(ConnectionSetup.TOPIC_JNDI_NAME),
- "exampleClient").setMessageListener(this);
- _connection.start();
+ getSession().createDurableSubscriber((Topic) getSetup().getDestination(ConnectionSetup.TOPIC_JNDI_NAME),
+ "exampleClient").setMessageListener(this);
+ getConnection().start();
_count.await();
System.out.println("Done");
- _connection.close();
+ getConnection().close();
}
catch (JMSException e)
{
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java
index 8a0ff88448..ee52e8b9ea 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java
@@ -40,15 +40,15 @@ import java.util.concurrent.CountDownLatch;
public class Client implements MessageListener
{
- final String BROKER = "localhost";
+ private final String BROKER = "localhost";
- final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+ private final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
- final String CONNECTION_JNDI_NAME = "local";
- final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'";
+ private final String CONNECTION_JNDI_NAME = "local";
+ private final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'";
- final String QUEUE_JNDI_NAME = "queue";
- final String QUEUE_NAME = "example.RequestQueue";
+ private final String QUEUE_JNDI_NAME = "queue";
+ private final String QUEUE_NAME = "example.RequestQueue";
private InitialContext _ctx;
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java
index 9c284eee97..88e8ca1f45 100644
--- a/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java
+++ b/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java
@@ -45,15 +45,15 @@ import java.io.IOException;
public class Server implements MessageListener
{
- final String BROKER = "localhost";
+ private final String BROKER = "localhost";
- final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+ private final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
- final String CONNECTION_JNDI_NAME = "local";
- final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'";
+ private final String CONNECTION_JNDI_NAME = "local";
+ private final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'";
- final String QUEUE_JNDI_NAME = "queue";
- final String QUEUE_NAME = "example.RequestQueue";
+ private final String QUEUE_JNDI_NAME = "queue";
+ private final String QUEUE_NAME = "example.RequestQueue";
private InitialContext _ctx;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 746d5b8f34..6c684e593d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -106,7 +106,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
* handler.
*/
- protected AMQProtocolHandler _protocolHandler;
+ private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
@@ -122,7 +122,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/** The virtual path to connect to on the AMQ server */
private String _virtualHost;
- protected ExceptionListener _exceptionListener;
+ private ExceptionListener _exceptionListener;
private ConnectionListener _connectionListener;
@@ -132,15 +132,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
* Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
* publication.
*/
- protected volatile boolean _started;
+ private volatile boolean _started;
/** Policy dictating how to failover */
- protected FailoverPolicy _failoverPolicy;
+ private FailoverPolicy _failoverPolicy;
/*
* _Connected should be refactored with a suitable wait object.
*/
- protected boolean _connected;
+ private boolean _connected;
/*
* The connection meta data
@@ -156,7 +156,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
private static final long DEFAULT_TIMEOUT = 1000 * 30;
- protected AMQConnectionDelegate _delegate;
+ private AMQConnectionDelegate _delegate;
// this connection maximum number of prefetched messages
private int _maxPrefetch;
@@ -346,11 +346,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.info("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
// We are not currently connected
- _connected = false;
+ setConnected(false);
boolean retryAllowed = true;
Exception connectionException = null;
- while (!_connected && retryAllowed && brokerDetails != null)
+ while (!isConnected() && retryAllowed && brokerDetails != null)
{
ProtocolVersion pe = null;
try
@@ -374,7 +374,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// broker
initDelegate(pe);
}
- else if (!_connected)
+ else if (!isConnected())
{
retryAllowed = _failoverPolicy.failoverAllowed();
brokerDetails = _failoverPolicy.getNextBrokerDetails();
@@ -384,10 +384,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (_logger.isDebugEnabled())
{
- _logger.debug("Are we connected:" + _connected);
+ _logger.debug("Are we connected:" + isConnected());
}
- if (!_connected)
+ if (!isConnected())
{
if (_logger.isDebugEnabled())
{
@@ -590,7 +590,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public boolean failoverAllowed()
{
- if (!_connected)
+ if (!isConnected())
{
return false;
}
@@ -729,6 +729,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
+ protected final ExceptionListener getExceptionListenerNoCheck()
+ {
+ return _exceptionListener;
+ }
+
public ExceptionListener getExceptionListener() throws JMSException
{
checkNotClosed();
@@ -1048,16 +1053,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _virtualHost;
}
- public AMQProtocolHandler getProtocolHandler()
+ public final AMQProtocolHandler getProtocolHandler()
{
return _protocolHandler;
}
- public boolean started()
+ public final boolean started()
{
return _started;
}
+ protected final boolean isConnected()
+ {
+ return _connected;
+ }
+
+ protected final void setConnected(boolean connected)
+ {
+ _connected = connected;
+ }
+
public void bytesSent(long writtenBytes)
{
if (_connectionListener != null)
@@ -1489,4 +1504,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _lastFailoverTime;
}
+ protected AMQConnectionDelegate getDelegate()
+ {
+ return _delegate;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 74475c0bc1..a18a3fcbd4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -71,7 +71,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
/**
* The QpidConeection instance that is mapped with this JMS connection.
*/
- org.apache.qpid.transport.Connection _qpidConnection;
+ private org.apache.qpid.transport.Connection _qpidConnection;
private ConnectionException exception = null;
//--- constructor
@@ -109,7 +109,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
prefetchLow,name);
_conn.registerSession(channelId, session);
- if (_conn._started)
+ if (_conn.started())
{
session.start();
}
@@ -152,7 +152,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
session = new XASessionImpl(_qpidConnection, _conn, channelId, prefetchHigh, prefetchLow);
_conn.registerSession(channelId, session);
- if (_conn._started)
+ if (_conn.started())
{
session.start();
}
@@ -164,7 +164,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
return session;
}
- @Override
public XASession createXASession(int ackMode)
throws JMSException
{
@@ -182,7 +181,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
session = new XASessionImpl(_qpidConnection, _conn, channelId, ackMode, (int)_conn.getMaxPrefetch(), (int)_conn.getMaxPrefetch() / 2);
_conn.registerSession(channelId, session);
- if (_conn._started)
+ if (_conn.started())
{
session.start();
}
@@ -218,10 +217,10 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
_qpidConnection.setConnectionDelegate(new ClientConnectionDelegate(conSettings, _conn.getConnectionURL()));
_qpidConnection.connect(conSettings);
- _conn._connected = true;
+ _conn.setConnected(true);
_conn.setUsername(_qpidConnection.getUserID());
_conn.setMaximumChannelCount(_qpidConnection.getChannelMax());
- _conn._failoverPolicy.attainedConnection();
+ _conn.getFailoverPolicy().attainedConnection();
}
catch (ProtocolVersionException pe)
{
@@ -327,7 +326,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
}
- ExceptionListener listener = _conn._exceptionListener;
+ ExceptionListener listener = _conn.getExceptionListenerNoCheck();
if (listener == null)
{
_logger.error("connection exception: " + conn, exc);
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 287e4f3859..5068b1bc50 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -120,7 +120,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
- StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
+ StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
ConnectionSettings settings = brokerDetail.buildConnectionSettings();
settings.setProtocol(brokerDetail.getTransport());
@@ -148,9 +148,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings);
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
- NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn._protocolHandler), sslContext);
- _conn._protocolHandler.setNetworkConnection(network, securityLayer.sender(network.getSender()));
- _conn._protocolHandler.getProtocolSession().init();
+ NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), sslContext);
+ _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender()));
+ _conn.getProtocolHandler().getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
@@ -158,13 +158,13 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
if(state == AMQState.CONNECTION_OPEN)
{
- _conn._failoverPolicy.attainedConnection();
- _conn._connected = true;
+ _conn.getFailoverPolicy().attainedConnection();
+ _conn.setConnected(true);
return null;
}
else
{
- return _conn._protocolHandler.getSuggestedProtocolVersion();
+ return _conn.getProtocolHandler().getSuggestedProtocolVersion();
}
}
@@ -237,7 +237,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
}
- if (_conn._started)
+ if (_conn.started())
{
try
{
@@ -271,12 +271,12 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
// TODO: Be aware of possible changes to parameter order as versions change.
- _conn._protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
+ _conn.getProtocolHandler().syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
// todo send low water mark when protocol allows.
// todo Be aware of possible changes to parameter order as versions change.
BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
- _conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
+ _conn.getProtocolHandler().syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
if (transacted)
{
@@ -287,7 +287,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody();
// TODO: Be aware of possible changes to parameter order as versions change.
- _conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
+ _conn.getProtocolHandler().syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 76828afd6a..9e19cc8969 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -48,15 +48,15 @@ public abstract class AMQDestination implements Destination, Referenceable
{
private static final Logger _logger = LoggerFactory.getLogger(AMQDestination.class);
- protected AMQShortString _exchangeName;
+ private AMQShortString _exchangeName;
- protected AMQShortString _exchangeClass;
+ private AMQShortString _exchangeClass;
- protected boolean _isDurable;
+ private boolean _isDurable;
- protected boolean _isExclusive;
+ private boolean _isExclusive;
- protected boolean _isAutoDelete;
+ private boolean _isAutoDelete;
private boolean _browseOnly;
@@ -81,6 +81,41 @@ public abstract class AMQDestination implements Destination, Referenceable
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
+ protected void setExclusive(boolean exclusive)
+ {
+ _isExclusive = exclusive;
+ }
+
+ protected AddressHelper getAddrHelper()
+ {
+ return _addrHelper;
+ }
+
+ protected void setAddrHelper(AddressHelper addrHelper)
+ {
+ _addrHelper = addrHelper;
+ }
+
+ protected String getName()
+ {
+ return _name;
+ }
+
+ protected void setName(String name)
+ {
+ _name = name;
+ }
+
+ protected Link getTargetLink()
+ {
+ return _targetLink;
+ }
+
+ protected void setTargetLink(Link targetLink)
+ {
+ _targetLink = targetLink;
+ }
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -132,23 +167,23 @@ public abstract class AMQDestination implements Destination, Referenceable
}
}
- protected final static DestSyntax defaultDestSyntax;
+ private final static DestSyntax defaultDestSyntax;
- protected DestSyntax _destSyntax = DestSyntax.ADDR;
+ private DestSyntax _destSyntax = DestSyntax.ADDR;
- protected AddressHelper _addrHelper;
- protected Address _address;
- protected int _addressType = AMQDestination.UNKNOWN_TYPE;
- protected String _name;
- protected String _subject;
- protected AddressOption _create = AddressOption.NEVER;
- protected AddressOption _assert = AddressOption.NEVER;
- protected AddressOption _delete = AddressOption.NEVER;
+ private AddressHelper _addrHelper;
+ private Address _address;
+ private int _addressType = AMQDestination.UNKNOWN_TYPE;
+ private String _name;
+ private String _subject;
+ private AddressOption _create = AddressOption.NEVER;
+ private AddressOption _assert = AddressOption.NEVER;
+ private AddressOption _delete = AddressOption.NEVER;
- protected Node _targetNode;
- protected Node _sourceNode;
- protected Link _targetLink;
- protected Link _link;
+ private Node _targetNode;
+ private Node _sourceNode;
+ private Link _targetLink;
+ private Link _link;
// ----- / Fields required to support new address syntax -------
@@ -646,10 +681,10 @@ public abstract class AMQDestination implements Destination, Referenceable
public static class Binding
{
- String exchange;
- String bindingKey;
- String queue;
- Map<String,Object> args;
+ private String exchange;
+ private String bindingKey;
+ private String queue;
+ private Map<String,Object> args;
public Binding(String exchange,
String queue,
@@ -902,4 +937,5 @@ public abstract class AMQDestination implements Destination, Referenceable
return _rejectBehaviour;
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
index 3f9eadeef3..465d858091 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -108,7 +108,7 @@ public class AMQQueueBrowser implements QueueBrowser
private class QueueBrowserEnumeration implements Enumeration
{
- Message _nextMessage;
+ private Message _nextMessage;
private BasicMessageConsumer _consumer;
public QueueBrowserEnumeration(BasicMessageConsumer consumer) throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java
index c59eba60b8..d1c796c34a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java
@@ -31,7 +31,7 @@ import java.io.Serializable;
public class AMQQueueSessionAdaptor implements QueueSession, AMQSessionAdapter
{
//holds a session for delegation
- protected final AMQSession _session;
+ private final AMQSession _session;
/**
* Construct an adaptor with a session to wrap
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index cca76ebac5..92579c31f0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -96,6 +96,166 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
+ /**
+ * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only
+ * keeps a record of subscriptions which have been created in the current instance. It does not remember
+ * subscriptions between executions of the client.
+ */
+ protected ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> getSubscriptions()
+ {
+ return _subscriptions;
+ }
+
+ /**
+ * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
+ * up in the {@link #_subscriptions} map.
+ */
+ protected ConcurrentHashMap<C, String> getReverseSubscriptionMap()
+ {
+ return _reverseSubscriptionMap;
+ }
+
+ /**
+ * Locks to keep access to subscriber details atomic.
+ * <p>
+ * Added for QPID2418
+ */
+ protected Lock getSubscriberDetails()
+ {
+ return _subscriberDetails;
+ }
+
+ protected Lock getSubscriberAccess()
+ {
+ return _subscriberAccess;
+ }
+
+ /**
+ * Used to hold incoming messages.
+ *
+ * @todo Weaken the type once {@link org.apache.qpid.client.util.FlowControllingBlockingQueue} implements Queue.
+ */
+ protected FlowControllingBlockingQueue getQueue()
+ {
+ return _queue;
+ }
+
+ /** Holds the highest received delivery tag. */
+ protected AtomicLong getHighestDeliveryTag()
+ {
+ return _highestDeliveryTag;
+ }
+
+ /** Pre-fetched message tags */
+ protected ConcurrentLinkedQueue<Long> getPrefetchedMessageTags()
+ {
+ return _prefetchedMessageTags;
+ }
+
+ protected void setPrefetchedMessageTags(ConcurrentLinkedQueue<Long> prefetchedMessageTags)
+ {
+ _prefetchedMessageTags = prefetchedMessageTags;
+ }
+
+ /** All the not yet acknowledged message tags */
+ protected ConcurrentLinkedQueue<Long> getUnacknowledgedMessageTags()
+ {
+ return _unacknowledgedMessageTags;
+ }
+
+ protected void setUnacknowledgedMessageTags(ConcurrentLinkedQueue<Long> unacknowledgedMessageTags)
+ {
+ _unacknowledgedMessageTags = unacknowledgedMessageTags;
+ }
+
+ /** All the delivered message tags */
+ protected ConcurrentLinkedQueue<Long> getDeliveredMessageTags()
+ {
+ return _deliveredMessageTags;
+ }
+
+ protected void setDeliveredMessageTags(ConcurrentLinkedQueue<Long> deliveredMessageTags)
+ {
+ _deliveredMessageTags = deliveredMessageTags;
+ }
+
+ /** Holds the dispatcher thread for this session. */
+ protected Dispatcher getDispatcher()
+ {
+ return _dispatcher;
+ }
+
+ protected void setDispatcher(Dispatcher dispatcher)
+ {
+ _dispatcher = dispatcher;
+ }
+
+ protected Thread getDispatcherThread()
+ {
+ return _dispatcherThread;
+ }
+
+ protected void setDispatcherThread(Thread dispatcherThread)
+ {
+ _dispatcherThread = dispatcherThread;
+ }
+
+ /** Holds the message factory factory for this session. */
+ protected MessageFactoryRegistry getMessageFactoryRegistry()
+ {
+ return _messageFactoryRegistry;
+ }
+
+ protected void setMessageFactoryRegistry(MessageFactoryRegistry messageFactoryRegistry)
+ {
+ _messageFactoryRegistry = messageFactoryRegistry;
+ }
+
+ /**
+ * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
+ * consumer.
+ */
+ protected IdToConsumerMap<C> getConsumers()
+ {
+ return _consumers;
+ }
+
+ /**
+ * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
+ * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
+ */
+ protected boolean isUsingDispatcherForCleanup()
+ {
+ return _usingDispatcherForCleanup;
+ }
+
+ protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup)
+ {
+ _usingDispatcherForCleanup = usingDispatcherForCleanup;
+ }
+
+ /**
+ * Used to ensure that only the first call to start the dispatcher can unsuspend the channel.
+ *
+ * @todo This is accessed only within a synchronized method, so does not need to be atomic.
+ */
+ protected AtomicBoolean getFirstDispatcher()
+ {
+ return _firstDispatcher;
+ }
+
+ /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
+ protected boolean isImmediatePrefetch()
+ {
+ return _immediatePrefetch;
+ }
+
+ /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
+ protected boolean isStrictAMQPFATAL()
+ {
+ return _strictAMQPFATAL;
+ }
+
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -173,8 +333,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
- final AMQSession<C, P> _thisSession = this;
-
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -182,34 +340,34 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* The default value for immediate flag used by producers created by this session is false. That is, a consumer does
* not need to be attached to a queue.
*/
- protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+ private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
/**
* The default value for mandatory flag used by producers created by this session is true. That is, server will not
* silently drop messages where no queue is connected to the exchange for the message.
*/
- protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+ private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
/**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
*/
- protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+ private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
/**
* The period to wait while flow controlled before declaring a failure
*/
public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
- protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure",
+ private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure",
DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
- protected final boolean DECLARE_QUEUES =
+ private final boolean _delareQueues =
Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
- protected final boolean DECLARE_EXCHANGES =
+ private final boolean _declareExchanges =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
- protected final boolean USE_AMQP_ENCODED_MAP_MESSAGE;
+ private final boolean _useAMQPEncodedMapMessage;
/** System property to enable strict AMQP compliance. */
public static final String STRICT_AMQP = "STRICT_AMQP";
@@ -230,16 +388,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
/** The connection to which this session belongs. */
- protected AMQConnection _connection;
+ private AMQConnection _connection;
/** Used to indicate whether or not this is a transactional session. */
- protected final boolean _transacted;
+ private final boolean _transacted;
/** Holds the sessions acknowledgement mode. */
- protected final int _acknowledgeMode;
+ private final int _acknowledgeMode;
/** Holds this session unique identifier, used to distinguish it from other sessions. */
- protected int _channelId;
+ private int _channelId;
private int _ticket;
@@ -255,55 +413,30 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** Used to indicate that this session has been started at least once. */
private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
- /**
- * Used to reference durable subscribers so that requests for unsubscribe can be handled correctly. Note this only
- * keeps a record of subscriptions which have been created in the current instance. It does not remember
- * subscriptions between executions of the client.
- */
- protected final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions =
+ private final ConcurrentHashMap<String, TopicSubscriberAdaptor<C>> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor<C>>();
- /**
- * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
- * up in the {@link #_subscriptions} map.
- */
- protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>();
+ private final ConcurrentHashMap<C, String> _reverseSubscriptionMap = new ConcurrentHashMap<C, String>();
- /**
- * Locks to keep access to subscriber details atomic.
- * <p>
- * Added for QPID2418
- */
- protected final Lock _subscriberDetails = new ReentrantLock(true);
- protected final Lock _subscriberAccess = new ReentrantLock(true);
+ private final Lock _subscriberDetails = new ReentrantLock(true);
+ private final Lock _subscriberAccess = new ReentrantLock(true);
- /**
- * Used to hold incoming messages.
- *
- * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
- */
- protected final FlowControllingBlockingQueue _queue;
+ private final FlowControllingBlockingQueue _queue;
- /** Holds the highest received delivery tag. */
- protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
- /** Pre-fetched message tags */
- protected ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>();
+ private ConcurrentLinkedQueue<Long> _prefetchedMessageTags = new ConcurrentLinkedQueue<Long>();
- /** All the not yet acknowledged message tags */
- protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
+ private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
- /** All the delivered message tags */
- protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
+ private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
- /** Holds the dispatcher thread for this session. */
- protected Dispatcher _dispatcher;
+ private Dispatcher _dispatcher;
- protected Thread _dispatcherThread;
+ private Thread _dispatcherThread;
- /** Holds the message factory factory for this session. */
- protected MessageFactoryRegistry _messageFactoryRegistry;
+ private MessageFactoryRegistry _messageFactoryRegistry;
/** Holds all of the producers created by this session, keyed by their unique identifiers. */
private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
@@ -314,11 +447,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
private int _nextTag = 1;
- /**
- * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
- * consumer.
- */
- protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
+ private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
/**
* Contains a list of consumers which have been removed but which might still have
@@ -344,11 +473,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
private volatile boolean _sessionInRecovery;
- /**
- * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
- * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
- */
- protected volatile boolean _usingDispatcherForCleanup;
+ private volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
@@ -365,21 +490,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
private final Object _suspensionLock = new Object();
- /**
- * Used to ensure that only the first call to start the dispatcher can unsuspend the channel.
- *
- * @todo This is accessed only within a synchronized method, so does not need to be atomic.
- */
- protected final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+ private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
- /** Used to indicate that the session should start pre-fetching messages as soon as it is started. */
- protected final boolean _immediatePrefetch;
+ private final boolean _immediatePrefetch;
- /** Indicates that warnings should be generated on violations of the strict AMQP. */
- protected final boolean _strictAMQP;
+ private final boolean _strictAMQP;
- /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
- protected final boolean _strictAMQPFATAL;
+ private final boolean _strictAMQPFATAL;
private final Object _messageDeliveryLock = new Object();
/** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
@@ -420,7 +537,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
- USE_AMQP_ENCODED_MAP_MESSAGE = con == null ? true : !con.isUseLegacyMapMessageFormat();
+ _useAMQPEncodedMapMessage = con == null ? true : !con.isUseLegacyMapMessageFormat();
_strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
_strictAMQPFATAL =
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
@@ -456,7 +573,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
// If the session has been closed don't waste time creating a thread to do
// flow control
- if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
{
// Only execute change if previous state
// was False
@@ -484,7 +601,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
// If the session has been closed don't waste time creating a thread to do
// flow control
- if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
{
// Only execute change if previous state
// was true
@@ -1136,7 +1253,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public MapMessage createMapMessage() throws JMSException
{
checkNotClosed();
- if (USE_AMQP_ENCODED_MAP_MESSAGE)
+ if (_useAMQPEncodedMapMessage)
{
AMQPEncodedMapMessage msg = new AMQPEncodedMapMessage(getMessageDelegateFactory());
msg.setAMQSession(this);
@@ -1173,12 +1290,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public P createProducer(Destination destination) throws JMSException
{
- return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+ return createProducerImpl(destination, _defaultMandatoryValue, _defaultImmediateValue);
}
public P createProducer(Destination destination, boolean immediate) throws JMSException
{
- return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
+ return createProducerImpl(destination, _defaultMandatoryValue, immediate);
}
public P createProducer(Destination destination, boolean mandatory, boolean immediate)
@@ -1625,6 +1742,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return (counter != null) && (counter.get() != 0);
}
+ /** Indicates that warnings should be generated on violations of the strict AMQP. */
public boolean isStrictAMQP()
{
return _strictAMQP;
@@ -2915,12 +3033,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- if (DECLARE_EXCHANGES)
+ if (_declareExchanges)
{
declareExchange(amqd, protocolHandler, nowait);
}
- if (DECLARE_QUEUES || amqd.isNameRequired())
+ if (_delareQueues || amqd.isNameRequired())
{
declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
}
@@ -3141,17 +3259,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
synchronized (_flowControl)
{
while (!_flowControl.getFlowControl() &&
- (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE)
+ (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure)
: expiryTime) >= System.currentTimeMillis() )
{
- _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD);
- _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control");
+ _flowControl.wait(_flowControlWaitPeriod);
+ _logger.warn("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control");
}
if(!_flowControl.getFlowControl())
{
_logger.error("Message send failed due to timeout waiting on broker enforced flow control");
- throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control");
+ throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control");
}
}
@@ -3198,6 +3316,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
+ private AtomicBoolean getClosed()
+ {
+ return _closed;
+ }
+
public void rejectPending(C consumer)
{
synchronized (_lock)
@@ -3333,7 +3456,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + _thisSession);
+ _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this);
}
}
@@ -3454,7 +3577,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_logger.isDebugEnabled())
{
_logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
- + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
+ + " for closing consumer " + String.valueOf(consumer == null? null: consumer.getConsumerTag()));
}
rejectMessage(message, true);
}
@@ -3513,7 +3636,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
// If the session has closed by the time we get here
// then we should not attempt to write to the sesion/channel.
- if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
{
suspendChannel(_suspend.get());
}
@@ -3521,11 +3644,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (AMQException e)
{
- _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + _thisSession + " due to: " + e);
+ _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: " + e);
if (_logger.isDebugEnabled())
{
_logger.debug("Is the _queue empty?" + _queue.isEmpty());
- _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher._closed));
+ _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher.getClosed()));
}
}
}
@@ -3556,7 +3679,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public boolean isDeclareExchanges()
{
- return DECLARE_EXCHANGES;
+ return _declareExchanges;
}
JMSException toJMSException(String message, TransportException e)
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 784e82237d..cfd5776c0a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -78,6 +78,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
private static Timer timer = new Timer("ack-flusher", true);
+
private static class Flusher extends TimerTask
{
@@ -120,7 +121,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private AMQException _currentException;
// a ref on the qpid connection
- protected org.apache.qpid.transport.Connection _qpidConnection;
+ private org.apache.qpid.transport.Connection _qpidConnection;
private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
private TimerTask flushTask = null;
@@ -163,7 +164,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
_qpidSession = _qpidConnection.createSession(name,1);
}
_qpidSession.setSessionListener(this);
- if (_transacted)
+ if (isTransacted())
{
_qpidSession.txSelect();
_qpidSession.setTransacted(true);
@@ -214,6 +215,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
+ protected Connection getQpidConnection()
+ {
+ return _qpidConnection;
+ }
+
//------- overwritten methods of class AMQSession
void failoverPrep()
@@ -234,17 +240,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId);
+ _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + getChannelId());
}
// acknowledge this message
if (multiple)
{
- for (Long messageTag : _unacknowledgedMessageTags)
+ for (Long messageTag : getUnacknowledgedMessageTags())
{
if( messageTag <= deliveryTag )
{
addUnacked(messageTag.intValue());
- _unacknowledgedMessageTags.remove(messageTag);
+ getUnacknowledgedMessageTags().remove(messageTag);
}
}
//empty the list of unack messages
@@ -253,12 +259,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
else
{
addUnacked((int) deliveryTag);
- _unacknowledgedMessageTags.remove(deliveryTag);
+ getUnacknowledgedMessageTags().remove(deliveryTag);
}
long prefetch = getAMQConnection().getMaxPrefetch();
- if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || _acknowledgeMode == javax.jms.Session.AUTO_ACKNOWLEDGE)
+ if (unackedCount >= prefetch/2 || maxAckDelay <= 0 || getAcknowledgeMode() == javax.jms.Session.AUTO_ACKNOWLEDGE)
{
flushAcknowledgments();
}
@@ -276,7 +282,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (unackedCount > 0)
{
messageAcknowledge
- (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit);
+ (unacked, getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit);
clearUnacked();
}
}
@@ -444,8 +450,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
// release all unacked messages
RangeSet all = RangeSetFactory.createRangeSet();
- RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
- RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+ RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags());
+ RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags());
for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
{
Range range = deliveredIter.next();
@@ -526,9 +532,9 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
- return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry, this, protocolHandler, rawSelector, prefetchHigh,
- prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
+ return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
+ getMessageFactoryRegistry(), this, protocolHandler, rawSelector, prefetchHigh,
+ prefetchLow, exclusive, getAcknowledgeMode(), noConsume, autoClose);
}
/**
@@ -630,7 +636,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
Option.UNRELIABLE);
- if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
+ if(capacity > 0 && getDispatcher() != null && (isStarted() || isImmediatePrefetch()))
{
// set the flow
getQpidSession().messageFlow(consumerTag,
@@ -653,7 +659,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
try
{
- return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
+ return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this,
getProtocolHandler(), producerId, immediate, mandatory);
}
catch (AMQException e)
@@ -795,7 +801,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (suspend)
{
- for (BasicMessageConsumer consumer : _consumers.values())
+ for (BasicMessageConsumer consumer : getConsumers().values())
{
getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
Option.UNRELIABLE);
@@ -804,7 +810,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
else
{
- for (BasicMessageConsumer_0_10 consumer : _consumers.values())
+ for (BasicMessageConsumer_0_10 consumer : getConsumers().values())
{
String consumerTag = String.valueOf(consumer.getConsumerTag());
//only set if msg list is null
@@ -942,7 +948,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
}
- }, _connection).execute();
+ }, getAMQConnection()).execute();
}
protected Long requestQueueDepth(AMQDestination amqd)
@@ -969,8 +975,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
protected void sendTxCompletionsIfNecessary()
{
// this is a heuristic, we may want to have that configurable
- if (_txSize > 0 && (_connection.getMaxPrefetch() == 1 ||
- _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0))
+ if (_txSize > 0 && (getAMQConnection().getMaxPrefetch() == 1 ||
+ getAMQConnection().getMaxPrefetch() != 0 && _txSize % (getAMQConnection().getMaxPrefetch() / 2) == 0))
{
// send completed so consumer credits don't dry up
messageAcknowledge(_txRangeSet, false);
@@ -1040,7 +1046,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
_currentException = amqe;
}
- _connection.exceptionReceived(_currentException);
+ getAMQConnection().exceptionReceived(_currentException);
}
public AMQMessageDelegateFactory getMessageDelegateFactory()
@@ -1159,7 +1165,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
boolean isConsumer,
boolean noWait) throws AMQException
{
- if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+ if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
{
if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
{
@@ -1329,7 +1335,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
protected void acknowledgeImpl()
{
- RangeSet ranges = gatherRangeSet(_unacknowledgedMessageTags);
+ RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags());
if(ranges.size() > 0 )
{
@@ -1345,13 +1351,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// return the first <total number of msgs received on session>
// messages sent by the brokers following the first rollback
// after failover
- _highestDeliveryTag.set(-1);
+ getHighestDeliveryTag().set(-1);
// Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to
//messages that came from the old broker.
_txRangeSet.clear();
_txSize = 0;
- _unacknowledgedMessageTags.clear();
- _prefetchedMessageTags.clear();
+ getUnacknowledgedMessageTags().clear();
+ getPrefetchedMessageTags().clear();
super.resubscribe();
getQpidSession().sync();
}
@@ -1362,18 +1368,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
super.stop();
synchronized (getMessageDeliveryLock())
{
- for (BasicMessageConsumer consumer : _consumers.values())
+ for (BasicMessageConsumer consumer : getConsumers().values())
{
List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
- _prefetchedMessageTags.addAll(tags);
+ getPrefetchedMessageTags().addAll(tags);
}
}
- _usingDispatcherForCleanup = true;
+ setUsingDispatcherForCleanup(true);
drainDispatchQueue();
- _usingDispatcherForCleanup = false;
+ setUsingDispatcherForCleanup(false);
- RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
- RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+ RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags());
+ RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags());
RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
+ prefetched.size());
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 574775804b..96994e7963 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -103,7 +103,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
while (true)
{
- Long tag = _unacknowledgedMessageTags.poll();
+ Long tag = getUnacknowledgedMessageTags().poll();
if (tag == null)
{
break;
@@ -117,15 +117,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
- final AMQFrame ackFrame = body.generateFrame(_channelId);
+ final AMQFrame ackFrame = body.generateFrame(getChannelId());
if (_logger.isDebugEnabled())
{
- _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+ _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + getChannelId());
}
getProtocolHandler().writeFrame(ackFrame, !isTransacted());
- _unacknowledgedMessageTags.remove(deliveryTag);
+ getUnacknowledgedMessageTags().remove(deliveryTag);
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
@@ -134,7 +134,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
(getTicket(),queueName,exchangeName,routingKey,false,arguments).
- generateFrame(_channelId), QueueBindOkBody.class);
+ generateFrame(getChannelId()), QueueBindOkBody.class);
}
public void sendClose(long timeout) throws AMQException, FailoverException
@@ -151,7 +151,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
getProtocolHandler().closeSession(this);
getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
- new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId),
+ new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(getChannelId()),
ChannelCloseOkBody.class, timeout);
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully.
@@ -163,7 +163,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// Acknowledge all delivered messages
while (true)
{
- Long tag = _deliveredMessageTags.poll();
+ Long tag = getDeliveredMessageTags().poll();
if (tag == null)
{
break;
@@ -174,7 +174,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
final AMQProtocolHandler handler = getProtocolHandler();
- handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
+ handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(getChannelId()), TxCommitOkBody.class);
}
public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
@@ -190,22 +190,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
}
QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
- AMQFrame queueDeclare = body.generateFrame(_channelId);
+ AMQFrame queueDeclare = body.generateFrame(getChannelId());
getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
public void sendRecover() throws AMQException, FailoverException
{
enforceRejectBehaviourDuringRecover();
- _prefetchedMessageTags.clear();
- _unacknowledgedMessageTags.clear();
+ getPrefetchedMessageTags().clear();
+ getUnacknowledgedMessageTags().clear();
if (isStrictAMQP())
{
// We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
- _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
+ getAMQConnection().getProtocolHandler().writeFrame(body.generateFrame(getChannelId()));
_logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
}
else
@@ -215,17 +215,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
{
BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
- _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
+ getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverOkBody.class);
}
else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
{
BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
- _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+ getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
}
else if(getProtocolVersion().equals(ProtocolVersion.v0_91))
{
BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false);
- _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+ getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
}
else
{
@@ -238,9 +238,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + _unacknowledgedMessageTags);
+ _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + getUnacknowledgedMessageTags());
}
- ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(_consumers.values());
+ ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(getConsumers().values());
boolean messageListenerFound = false;
boolean serverRejectBehaviourFound = false;
for(BasicMessageConsumer_0_8 consumer : consumersToCheck)
@@ -259,7 +259,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
if (serverRejectBehaviourFound)
{
//reject(false) any messages we don't want returned again
- switch(_acknowledgeMode)
+ switch(getAcknowledgeMode())
{
case Session.DUPS_OK_ACKNOWLEDGE:
case Session.AUTO_ACKNOWLEDGE:
@@ -268,7 +268,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
break;
}
case Session.CLIENT_ACKNOWLEDGE:
- for(Long tag : _unacknowledgedMessageTags)
+ for(Long tag : getUnacknowledgedMessageTags())
{
rejectMessage(tag, false);
}
@@ -286,7 +286,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// consumer on the queue. Whilst this is within the JMS spec it is not
// user friendly and avoidable.
boolean normalRejectBehaviour = true;
- for (BasicMessageConsumer_0_8 consumer : _consumers.values())
+ for (BasicMessageConsumer_0_8 consumer : getConsumers().values())
{
if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour()))
{
@@ -298,7 +298,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
while (true)
{
- Long tag = _deliveredMessageTags.poll();
+ Long tag = getDeliveredMessageTags().poll();
if (tag == null)
{
break;
@@ -310,8 +310,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public void rejectMessage(long deliveryTag, boolean requeue)
{
- if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)||
- ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
+ if ((getAcknowledgeMode() == CLIENT_ACKNOWLEDGE) || (getAcknowledgeMode() == SESSION_TRANSACTED)||
+ ((getAcknowledgeMode() == AUTO_ACKNOWLEDGE || getAcknowledgeMode() == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
{
if (_logger.isDebugEnabled())
{
@@ -319,9 +319,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
- AMQFrame frame = body.generateFrame(_channelId);
+ AMQFrame frame = body.generateFrame(getChannelId());
- _connection.getProtocolHandler().writeFrame(frame);
+ getAMQConnection().getProtocolHandler().writeFrame(frame);
}
}
@@ -342,12 +342,12 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public AMQMethodEvent execute() throws AMQException, FailoverException
{
AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
- (exchangeName, routingKey, queueName).generateFrame(_channelId);
+ (exchangeName, routingKey, queueName).generateFrame(getChannelId());
return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
}
- }, _connection).execute();
+ }, getAMQConnection()).execute();
// Extract and return the response code from the query.
ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
@@ -378,7 +378,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
consumer.getArguments());
- AMQFrame jmsConsume = body.generateFrame(_channelId);
+ AMQFrame jmsConsume = body.generateFrame(getChannelId());
if (nowait)
{
@@ -396,7 +396,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
name.toString().startsWith("amq."),
false,false,false,false,null);
- AMQFrame exchangeDeclare = body.generateFrame(_channelId);
+ AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
@@ -406,7 +406,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
- AMQFrame queueDeclare = body.generateFrame(_channelId);
+ AMQFrame queueDeclare = body.generateFrame(getChannelId());
protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
@@ -418,7 +418,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
false,
false,
true);
- AMQFrame queueDeleteFrame = body.generateFrame(_channelId);
+ AMQFrame queueDeleteFrame = body.generateFrame(getChannelId());
getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
}
@@ -426,8 +426,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
{
ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend);
- AMQFrame channelFlowFrame = body.generateFrame(_channelId);
- _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+ AMQFrame channelFlowFrame = body.generateFrame(getChannelId());
+ getAMQConnection().getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
}
public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
@@ -436,9 +436,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
- return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow,
- exclusive, _acknowledgeMode, noConsume, autoClose);
+ return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
+ getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow,
+ exclusive, getAcknowledgeMode(), noConsume, autoClose);
}
@@ -447,7 +447,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
try
{
- return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
+ return new BasicMessageProducer_0_8(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(),
this, getProtocolHandler(), producerId, immediate, mandatory);
}
catch (AMQException e)
@@ -477,7 +477,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
private void returnBouncedMessage(final ReturnMessage msg)
{
- _connection.performConnectionTask(new Runnable()
+ getAMQConnection().performConnectionTask(new Runnable()
{
public void run()
{
@@ -485,8 +485,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage =
- _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(),_queueDestinationCache,_topicDestinationCache);
+ getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(),
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, _topicDestinationCache);
AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
AMQShortString reason = msg.getReplyText();
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
@@ -494,20 +494,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
if (errorCode == AMQConstant.NO_CONSUMERS)
{
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
+ getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
+ } else if (errorCode == AMQConstant.NO_ROUTE)
{
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
- }
- else
+ getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+ } else
{
- _connection.exceptionReceived(
+ getAMQConnection().exceptionReceived(
new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
}
- }
- catch (Exception e)
+ } catch (Exception e)
{
_logger.error(
"Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
@@ -543,7 +540,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
return null;
}
- }, _connection).execute();
+ }, getAMQConnection()).execute();
}
public DestinationCache<AMQQueue> getQueueDestinationCache()
@@ -579,6 +576,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
return matches;
}
+ public long getMessageCount()
+ {
+ return _messageCount;
+ }
+
+ public long getConsumerCount()
+ {
+ return _consumerCount;
+ }
}
protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
@@ -591,10 +597,10 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
amqd.isExclusive(),
amqd.isAutoDelete(),
false,
- null).generateFrame(_channelId);
+ null).generateFrame(getChannelId());
QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
- return okHandler._messageCount;
+ return okHandler.getMessageCount();
}
protected boolean tagLE(long tag1, long tag2)
@@ -655,7 +661,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
// if the Connection has closed then we should throw any exception that
// has occurred that we were not waiting for
- AMQStateManager manager = _connection.getProtocolHandler()
+ AMQStateManager manager = getAMQConnection().getProtocolHandler()
.getStateManager();
Exception e = manager.getLastException();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index 00df898ec4..f09ef5e01d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -174,7 +174,7 @@ public class AMQTopic extends AMQDestination implements Topic
}
else
{
- return _exchangeName;
+ return super.getExchangeName();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java
index 597cd6301b..6e454cdae9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
public class AMQTopicSessionAdaptor implements TopicSession, AMQSessionAdapter
{
- protected final AMQSession _session;
+ private final AMQSession _session;
public AMQTopicSessionAdaptor(Session session)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 568d47080b..f0f2c85c2f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -60,14 +60,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
- /** The connection being used by this consumer */
- protected final AMQConnection _connection;
+ private final AMQConnection _connection;
- protected final MessageFilter _messageSelectorFilter;
+ private final MessageFilter _messageSelectorFilter;
private final boolean _noLocal;
- protected AMQDestination _destination;
+ private AMQDestination _destination;
/**
* When true indicates that a blocking receive call is in progress
@@ -78,23 +77,17 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
*/
private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
- /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
- protected int _consumerTag;
+ private int _consumerTag;
- /** We need to know the channel id when constructing frames */
- protected final int _channelId;
+ private final int _channelId;
- /**
- * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
- * <p/> Argument true indicates we want strict FIFO semantics
- */
- protected final BlockingQueue _synchronousQueue;
+ private final BlockingQueue _synchronousQueue;
- protected final MessageFactoryRegistry _messageFactory;
+ private final MessageFactoryRegistry _messageFactory;
- protected final AMQSession _session;
+ private final AMQSession _session;
- protected final AMQProtocolHandler _protocolHandler;
+ private final AMQProtocolHandler _protocolHandler;
/**
* We need to store the "raw" field table so that we can resubscribe in the event of failover being required
@@ -113,17 +106,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
*/
private final int _prefetchLow;
- /**
- * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
- */
- protected boolean _exclusive;
+ private boolean _exclusive;
- /**
- * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
- * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
- * implementation.
- */
- protected final int _acknowledgeMode;
+ private final int _acknowledgeMode;
/**
* List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
@@ -238,6 +223,11 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return _messageListener.get();
}
+ /**
+ * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
+ * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
+ * implementation.
+ */
public int getAcknowledgeMode()
{
return _acknowledgeMode;
@@ -377,6 +367,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return _noLocal;
}
+ /**
+ * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
+ */
public boolean isExclusive()
{
return _exclusive;
@@ -865,6 +858,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
_session.deregisterConsumer(this);
}
+ /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
public int getConsumerTag()
{
return _consumerTag;
@@ -1014,4 +1008,40 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
public void failedOverPost() {}
+ /** The connection being used by this consumer */
+ protected AMQConnection getConnection()
+ {
+ return _connection;
+ }
+
+ protected void setDestination(AMQDestination destination)
+ {
+ _destination = destination;
+ }
+
+ /** We need to know the channel id when constructing frames */
+ protected int getChannelId()
+ {
+ return _channelId;
+ }
+
+ /**
+ * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
+ * <p/> Argument true indicates we want strict FIFO semantics
+ */
+ protected BlockingQueue getSynchronousQueue()
+ {
+ return _synchronousQueue;
+ }
+
+ protected MessageFactoryRegistry getMessageFactory()
+ {
+ return _messageFactory;
+ }
+
+ protected AMQProtocolHandler getProtocolHandler()
+ {
+ return _protocolHandler;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 5dadcd5ca3..ccde720673 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -57,7 +57,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
/**
* This class logger
*/
- protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final Logger _logger = LoggerFactory.getLogger(getClass());
/**
* The underlying QpidSession
@@ -78,7 +78,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private final long _capacity;
/** Flag indicating if the server supports message selectors */
- protected final boolean _serverJmsSelectorSupport;
+ private final boolean _serverJmsSelectorSupport;
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -103,8 +103,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (!namedQueue)
{
- _destination = destination.copyDestination();
- _destination.setQueueName(null);
+ setDestination(destination.copyDestination());
+ getDestination().setQueueName(null);
}
}
}
@@ -192,14 +192,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
super.preDeliver(jmsMsg);
- if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
+ if (getAcknowledgeMode() == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
{
//For 0-10 we need to ensure that all messages are indicated processed in some way to
//ensure their AMQP command-id is marked completed, and so we must send a completion
//even for no-ack messages even though there isnt actually an 'acknowledgement' occurring.
//Add message to the unacked message list to ensure we dont lose record of it before
//sending a completion of some sort.
- _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
+ getSession().addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
}
}
@@ -207,7 +207,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
{
AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
- return _messageFactory.createMessage(msg.getMessageTransfer());
+ return getMessageFactory().createMessage(msg.getMessageTransfer());
}
/**
@@ -222,9 +222,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
boolean messageOk = true;
try
{
- if (_messageSelectorFilter != null && !_serverJmsSelectorSupport)
+ if (getMessageSelectorFilter() != null && !_serverJmsSelectorSupport)
{
- messageOk = _messageSelectorFilter.matches(message);
+ messageOk = getMessageSelectorFilter().matches(message);
}
}
catch (Exception e)
@@ -285,7 +285,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
_0_10session.messageAcknowledge
(Range.newInstance((int) message.getDeliveryTag()),
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
final AMQException amqe = _0_10session.getCurrentException();
if (amqe != null)
@@ -349,20 +349,20 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
messageFlow();
}
- if (messageListener != null && !_synchronousQueue.isEmpty())
+ if (messageListener != null && !getSynchronousQueue().isEmpty())
{
- Iterator messages=_synchronousQueue.iterator();
+ Iterator messages= getSynchronousQueue().iterator();
while (messages.hasNext())
{
AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
messages.remove();
- _session.rejectMessage(message, true);
+ getSession().rejectMessage(message, true);
}
}
}
catch(TransportException e)
{
- throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e);
+ throw getSession().toJMSException("Exception while setting message listener:" + e.getMessage(), e);
}
}
@@ -389,7 +389,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
_syncReceive.set(true);
}
- if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty())
+ if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty())
{
messageFlow();
}
@@ -426,19 +426,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
super.postDeliver(msg);
- switch (_acknowledgeMode)
+ switch (getAcknowledgeMode())
{
case Session.SESSION_TRANSACTED:
_0_10session.sendTxCompletionsIfNecessary();
break;
case Session.NO_ACKNOWLEDGE:
- if (!_session.isInRecovery())
+ if (!getSession().isInRecovery())
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ getSession().acknowledgeMessage(msg.getDeliveryTag(), false);
}
break;
case Session.AUTO_ACKNOWLEDGE:
- if (!_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
+ if (!getSession().isInRecovery() && getSession().getAMQConnection().getSyncAck())
{
((AMQSession_0_10) getSession()).getQpidSession().sync();
}
@@ -454,10 +454,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
@Override public void rollbackPendingMessages()
{
- if (_synchronousQueue.size() > 0)
+ if (getSynchronousQueue().size() > 0)
{
RangeSet ranges = RangeSetFactory.createRangeSet();
- Iterator iterator = _synchronousQueue.iterator();
+ Iterator iterator = getSynchronousQueue().iterator();
while (iterator.hasNext())
{
@@ -497,7 +497,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
else
{
- return _exclusive;
+ return super.isExclusive();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 755c95fcfc..b00f9dd98a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -44,7 +44,7 @@ import javax.jms.Message;
public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
- protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final Logger _logger = LoggerFactory.getLogger(getClass());
private AMQSession_0_8.DestinationCache<AMQTopic> _topicDestinationCache;
private AMQSession_0_8.DestinationCache<AMQQueue> _queueDestinationCache;
@@ -95,11 +95,11 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
void sendCancel() throws AMQException, FailoverException
{
- BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(_consumerTag)), false);
+ BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false);
- final AMQFrame cancelFrame = body.generateFrame(_channelId);
+ final AMQFrame cancelFrame = body.generateFrame(getChannelId());
- _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+ getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
if (_logger.isDebugEnabled())
{
@@ -110,9 +110,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
{
- return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
- messageFrame.isRedelivered(), messageFrame.getExchange(),
- messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
+ return getMessageFactory().createMessage(messageFrame.getDeliveryTag(),
+ messageFrame.isRedelivered(), messageFrame.getExchange(),
+ messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
_queueDestinationCache, _topicDestinationCache);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 0275cd2fd5..84747d6f09 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -47,16 +47,76 @@ import java.util.UUID;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
+ /**
+ * If true, messages will not get a timestamp.
+ */
+ protected boolean isDisableTimestamps()
+ {
+ return _disableTimestamps;
+ }
+
+ protected void setDisableTimestamps(boolean disableTimestamps)
+ {
+ _disableTimestamps = disableTimestamps;
+ }
+
+ protected void setDestination(AMQDestination destination)
+ {
+ _destination = destination;
+ }
+
+ protected AMQProtocolHandler getProtocolHandler()
+ {
+ return _protocolHandler;
+ }
+
+ protected void setProtocolHandler(AMQProtocolHandler protocolHandler)
+ {
+ _protocolHandler = protocolHandler;
+ }
+
+ protected int getChannelId()
+ {
+ return _channelId;
+ }
+
+ protected void setChannelId(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+ protected void setSession(AMQSession session)
+ {
+ _session = session;
+ }
+
+ protected String getUserID()
+ {
+ return _userID;
+ }
+
+ protected void setUserID(String userID)
+ {
+ _userID = userID;
+ }
+
+ protected PublishMode getPublishMode()
+ {
+ return publishMode;
+ }
+
+ protected void setPublishMode(PublishMode publishMode)
+ {
+ this.publishMode = publishMode;
+ }
+
enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
- protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final Logger _logger = LoggerFactory.getLogger(getClass());
private AMQConnection _connection;
- /**
- * If true, messages will not get a timestamp.
- */
- protected boolean _disableTimestamps;
+ private boolean _disableTimestamps;
/**
* Priority of messages created by this producer.
@@ -73,10 +133,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
*/
private int _deliveryMode = DeliveryMode.PERSISTENT;
- /**
- * The Destination used for this consumer, if specified upon creation.
- */
- protected AMQDestination _destination;
+ private AMQDestination _destination;
/**
* Default encoding used for messages produced by this producer.
@@ -88,14 +145,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
*/
private String _mimeType;
- protected AMQProtocolHandler _protocolHandler;
+ private AMQProtocolHandler _protocolHandler;
/**
* True if this producer was created from a transacted session
*/
private boolean _transacted;
- protected int _channelId;
+ private int _channelId;
/**
* This is an id generated by the session and is used to tie individual producers to the session. This means we
@@ -105,10 +162,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
*/
private long _producerId;
- /**
- * The session used to create this producer
- */
- protected AMQSession _session;
+ private AMQSession _session;
private final boolean _immediate;
@@ -118,11 +172,11 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
- protected String _userID; // ref user id used in the connection.
+ private String _userID; // ref user id used in the connection.
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
- protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
+ private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
@@ -256,6 +310,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
return _timeToLive;
}
+ protected AMQDestination getAMQDestination()
+ {
+ return _destination;
+ }
+
+ /**
+ * The Destination used for this consumer, if specified upon creation.
+ */
public Destination getDestination() throws JMSException
{
checkNotClosed();
@@ -564,6 +626,9 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
+ /**
+ * The session used to create this producer
+ */
public AMQSession getSession()
{
return _session;
@@ -580,4 +645,10 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e);
}
}
+
+ Logger getLogger()
+ {
+ return _logger;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 452e76776b..91811ccf98 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -65,7 +65,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
- userIDBytes = Strings.toUTF8(_userID);
+ userIDBytes = Strings.toUTF8(getUserID());
}
void declareDestination(AMQDestination destination) throws AMQException
@@ -125,7 +125,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
}
long currentTime = 0;
- if (timeToLive > 0 || !_disableTimestamps)
+ if (timeToLive > 0 || !isDisableTimestamps())
{
currentTime = System.currentTimeMillis();
}
@@ -136,7 +136,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
message.setJMSExpiration(currentTime + timeToLive);
}
- if (!_disableTimestamps)
+ if (!isDisableTimestamps())
{
deliveryProp.setTimestamp(currentTime);
@@ -213,8 +213,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
// if true, we need to sync the delivery of this message
boolean sync = false;
- sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) ||
- (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT &&
+ sync = ( (getPublishMode() == PublishMode.SYNC_PUBLISH_ALL) ||
+ (getPublishMode() == PublishMode.SYNC_PUBLISH_PERSISTENT &&
deliveryMode == DeliveryMode.PERSISTENT)
);
@@ -248,14 +248,14 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
@Override
public boolean isBound(AMQDestination destination) throws JMSException
{
- return _session.isQueueBound(destination);
+ return getSession().isQueueBound(destination);
}
@Override
public void close() throws JMSException
{
super.close();
- AMQDestination dest = _destination;
+ AMQDestination dest = getAMQDestination();
if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
if (dest.getDelete() == AddressOption.ALWAYS ||
@@ -264,7 +264,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
try
{
((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- _destination.getQueueName());
+ getAMQDestination().getQueueName());
}
catch(TransportException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 94121db99f..3b5e361f97 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -54,7 +54,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
final MethodRegistry methodRegistry = getSession().getMethodRegistry();
ExchangeDeclareBody body =
- methodRegistry.createExchangeDeclareBody(_session.getTicket(),
+ methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
destination.getExchangeName(),
destination.getExchangeClass(),
destination.getExchangeName().toString().startsWith("amq."),
@@ -66,29 +66,29 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
- AMQFrame declare = body.generateFrame(_channelId);
+ AMQFrame declare = body.generateFrame(getChannelId());
- _protocolHandler.writeFrame(declare);
+ getProtocolHandler().writeFrame(declare);
}
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
boolean immediate) throws JMSException
{
- BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
+ BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
destination.getExchangeName(),
destination.getRoutingKey(),
mandatory,
immediate);
- AMQFrame publishFrame = body.generateFrame(_channelId);
+ AMQFrame publishFrame = body.generateFrame(getChannelId());
message.prepareForSending();
ByteBuffer payload = message.getData();
AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
- contentHeaderProperties.setUserId(_userID);
+ contentHeaderProperties.setUserId(getUserID());
//Set the JMS_QPID_DESTTYPE for 0-8/9 messages
int type;
@@ -108,7 +108,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
//Set JMS_QPID_DESTTYPE
delegate.getContentHeaderProperties().getHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
- if (!_disableTimestamps)
+ if (!isDisableTimestamps())
{
final long currentTime = System.currentTimeMillis();
contentHeaderProperties.setTimestamp(currentTime);
@@ -132,12 +132,12 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
if (payload != null)
{
- createContentBodies(payload, frames, 2, _channelId);
+ createContentBodies(payload, frames, 2, getChannelId());
}
- if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
+ if ((contentBodyFrameCount != 0) && getLogger().isDebugEnabled())
{
- _logger.debug("Sending content body frames to " + destination);
+ getLogger().debug("Sending content body frames to " + destination);
}
@@ -145,11 +145,11 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
+ ContentHeaderBody.createAMQFrame(getChannelId(),
classIfForBasic, 0, contentHeaderProperties, size);
- if (_logger.isDebugEnabled())
+ if (getLogger().isDebugEnabled())
{
- _logger.debug("Sending content header frame to " + destination);
+ getLogger().debug("Sending content header frame to " + destination);
}
frames[0] = publishFrame;
@@ -158,7 +158,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
try
{
- _session.checkFlowControl();
+ getSession().checkFlowControl();
}
catch (InterruptedException e)
{
@@ -168,7 +168,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
throw jmse;
}
- _protocolHandler.writeFrame(compositeFrame);
+ getProtocolHandler().writeFrame(compositeFrame);
}
/**
@@ -192,7 +192,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
else
{
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1;
long remaining = payload.remaining();
for (int i = offset; i < frames.length; i++)
{
@@ -222,7 +222,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
else
{
int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1;
int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java b/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java
index 81a55006ed..1cd8df6e4a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java
+++ b/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java
@@ -24,7 +24,7 @@ import java.util.Queue;
public abstract class DispatcherCallback
{
- BasicMessageConsumer _consumer;
+ private BasicMessageConsumer _consumer;
public DispatcherCallback(BasicMessageConsumer mc)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java b/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
index 585d6db3fd..134159afe1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
+++ b/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
@@ -22,8 +22,8 @@ package org.apache.qpid.client;
public class MessageConsumerPair
{
- BasicMessageConsumer _consumer;
- Object _item;
+ private BasicMessageConsumer _consumer;
+ private Object _item;
public MessageConsumerPair(BasicMessageConsumer consumer, Object item)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
index 295c6a4091..0b797df9dd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
@@ -202,7 +202,7 @@ public class QueueSenderAdapter implements QueueSender
{
if (_delegate.getSession().isStrictAMQP())
{
- _delegate._logger.warn("AMQP does not support destination validation before publish, ");
+ _delegate.getLogger().warn("AMQP does not support destination validation before publish, ");
destination.setCheckedForQueueBinding(true);
}
else
diff --git a/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
index a7494305c6..d9514338ce 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
@@ -53,7 +53,7 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ
public synchronized XASession createXASession() throws JMSException
{
checkNotClosed();
- return _delegate.createXASession();
+ return getDelegate().createXASession();
}
//-- Interface XAQueueConnection
@@ -86,6 +86,6 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ
public XASession createXASession(int ackMode) throws JMSException
{
checkNotClosed();
- return _delegate.createXASession(ackMode);
+ return getDelegate().createXASession(ackMode);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
index 1d991372df..85623df8c0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
@@ -86,7 +86,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
*/
public void createSession()
{
- _qpidDtxSession = _qpidConnection.createSession(0);
+ _qpidDtxSession = getQpidConnection().createSession(0);
_qpidDtxSession.setSessionListener(this);
_qpidDtxSession.dtxSelect();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
index 51cc94965a..a69e808880 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
@@ -38,10 +38,10 @@ import org.apache.qpid.client.AMQConnection;
public class FailoverNoopSupport<T, E extends Exception> implements FailoverSupport<T, E>
{
/** The protected operation that is to be retried in the event of fail-over. */
- FailoverProtectedOperation<T, E> operation;
+ private FailoverProtectedOperation<T, E> operation;
/** The connection on which the fail-over protected operation is to be performed. */
- AMQConnection connection;
+ private AMQConnection connection;
/**
* Creates an automatic retrying fail-over handler for the specified operation.
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
index 0146f1935d..d3d33d3c75 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
@@ -73,10 +73,10 @@ public class FailoverRetrySupport<T, E extends Exception> implements FailoverSup
private static final Logger _log = LoggerFactory.getLogger(FailoverRetrySupport.class);
/** The protected operation that is to be retried in the event of fail-over. */
- FailoverProtectedOperation<T, E> operation;
+ private FailoverProtectedOperation<T, E> operation;
/** The connection on which the fail-over protected operation is to be performed. */
- AMQConnection connection;
+ private AMQConnection connection;
/**
* Creates an automatic retrying fail-over handler for the specified operation.
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index 9af225aded..558d93538b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -104,7 +104,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
return factory.createMethodDispatcher(session);
}
- AMQProtocolSession _session;
+ private AMQProtocolSession _session;
public ClientMethodDispatcherImpl(AMQProtocolSession session)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
index 98ca8ed8cb..1395f39b99 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
@@ -121,18 +121,18 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
exchangeInfo = new ExchangeInfo(exchange.asString(),"",AMQDestination.UNKNOWN_TYPE);
}
- if ("topic".equals(exchangeInfo.exchangeType))
+ if ("topic".equals(exchangeInfo.getExchangeType()))
{
dest = new AMQTopic(exchange, routingKey, null);
}
- else if ("direct".equals(exchangeInfo.exchangeType))
+ else if ("direct".equals(exchangeInfo.getExchangeType()))
{
dest = new AMQQueue(exchange, routingKey, routingKey);
}
else
{
dest = new AMQAnyDestination(exchange,
- new AMQShortString(exchangeInfo.exchangeType),
+ new AMQShortString(exchangeInfo.getExchangeType()),
routingKey,
false,
false,
@@ -223,9 +223,9 @@ public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
class ExchangeInfo
{
- String exchangeName;
- String exchangeType;
- int destType = AMQDestination.QUEUE_TYPE;
+ private String exchangeName;
+ private String exchangeType;
+ private int destType = AMQDestination.QUEUE_TYPE;
public ExchangeInfo(String exchangeName, String exchangeType,
int destType)
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
index a4173be1d7..9c7bd0bdcf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
@@ -34,7 +34,7 @@ import java.nio.ByteBuffer;
*/
public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage
{
- protected boolean _readableMessage = false;
+ private boolean _readableMessage = false;
AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedMessage)
{
@@ -75,6 +75,11 @@ public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage
_readableMessage = false;
}
+ protected void setReadable(boolean readable)
+ {
+ _readableMessage = readable;
+ }
+
public String toBodyString() throws JMSException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 442fca6fe3..d1e43447cc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -36,7 +36,7 @@ public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
/** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
- protected AMQMessageDelegate _delegate;
+ private AMQMessageDelegate _delegate;
private boolean _redelivered;
private boolean _receivedFromServer;
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 6fbcea8aed..608567674a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -57,25 +57,25 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
{
if (debug)
{
- _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")");
+ _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")");
}
- data = ByteBuffer.wrap(((ContentBody) bodies.get(0))._payload);
+ data = ByteBuffer.wrap(((ContentBody) bodies.get(0)).getPayload());
}
else if (bodies != null)
{
if (debug)
{
_logger.debug("Fragmented message body (" + bodies
- .size() + " frames, bodySize=" + contentHeader.bodySize + ")");
+ .size() + " frames, bodySize=" + contentHeader.getBodySize() + ")");
}
- data = ByteBuffer.allocate((int) contentHeader.bodySize); // XXX: Is cast a problem?
+ data = ByteBuffer.allocate((int) contentHeader.getBodySize()); // XXX: Is cast a problem?
final Iterator it = bodies.iterator();
while (it.hasNext())
{
ContentBody cb = (ContentBody) it.next();
- final ByteBuffer payload = ByteBuffer.wrap(cb._payload);
+ final ByteBuffer payload = ByteBuffer.wrap(cb.getPayload());
if(payload.isDirect() || payload.isReadOnly())
{
data.put(payload);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
index 98cc323ad3..b0320d0f4e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
@@ -52,7 +52,7 @@ public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesM
public void reset()
{
- _readableMessage = true;
+ setReadable(true);
if(_typedBytesContentReader != null)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
index 7fca76268f..b958d89515 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
@@ -54,7 +54,7 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea
public void reset()
{
- _readableMessage = true;
+ setReadable(true);
if(_typedBytesContentReader != null)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
index 847975a5e5..c78b6ced93 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
@@ -87,13 +87,13 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage
public void receiveBody(ContentBody body)
{
- if (body._payload != null)
+ if (body.getPayload() != null)
{
- final long payloadSize = body._payload.length;
+ final long payloadSize = body.getPayload().length;
if (_bodies == null)
{
- if (payloadSize == getContentHeader().bodySize)
+ if (payloadSize == getContentHeader().getBodySize())
{
_bodies = Collections.singletonList(body);
}
@@ -124,7 +124,7 @@ public class UnprocessedMessage_0_8 extends UnprocessedMessage
public boolean isAllBodyDataReceived()
{
- return _bytesReceived == getContentHeader().bodySize;
+ return _bytesReceived == getContentHeader().getBodySize();
}
public BasicDeliverBody getDeliverBody()
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
index c73d800b14..41f6725c8f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
@@ -29,16 +29,16 @@ public class Link
public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }
- protected String name;
- protected String _filter;
- protected FilterType _filterType = FilterType.SUBJECT;
- protected boolean _isNoLocal;
- protected boolean _isDurable;
- protected int _consumerCapacity = 0;
- protected int _producerCapacity = 0;
- protected Node node;
- protected Subscription subscription;
- protected Reliability reliability = Reliability.AT_LEAST_ONCE;
+ private String name;
+ private String _filter;
+ private FilterType _filterType = FilterType.SUBJECT;
+ private boolean _isNoLocal;
+ private boolean _isDurable;
+ private int _consumerCapacity = 0;
+ private int _producerCapacity = 0;
+ private Node node;
+ private Subscription subscription;
+ private Reliability reliability = Reliability.AT_LEAST_ONCE;
public Reliability getReliability()
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
index fe469090d8..bb5bba9068 100644
--- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
+++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/Node.java
@@ -31,13 +31,18 @@ import java.util.Map;
public abstract class Node
{
- protected int _nodeType = AMQDestination.UNKNOWN_TYPE;
- protected boolean _isDurable;
- protected boolean _isAutoDelete;
- protected String _alternateExchange;
- protected List<Binding> _bindings = new ArrayList<Binding>();
- protected Map<String,Object> _declareArgs = Collections.emptyMap();
-
+ private int _nodeType = AMQDestination.UNKNOWN_TYPE;
+ private boolean _isDurable;
+ private boolean _isAutoDelete;
+ private String _alternateExchange;
+ private List<Binding> _bindings = new ArrayList<Binding>();
+ private Map<String,Object> _declareArgs = Collections.emptyMap();
+
+ protected Node(int nodeType)
+ {
+ _nodeType = nodeType;
+ }
+
public int getType()
{
return _nodeType;
@@ -104,7 +109,7 @@ public abstract class Node
public QueueNode()
{
- _nodeType = AMQDestination.QUEUE_TYPE;
+ super(AMQDestination.QUEUE_TYPE);
}
public boolean isExclusive()
@@ -125,7 +130,7 @@ public abstract class Node
public ExchangeNode()
{
- _nodeType = AMQDestination.TOPIC_TYPE;
+ super(AMQDestination.TOPIC_TYPE);
}
public String getExchangeType()
@@ -142,5 +147,9 @@ public abstract class Node
public static class UnknownNodeType extends Node
{
+ public UnknownNodeType()
+ {
+ super(AMQDestination.UNKNOWN_TYPE);
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index c23e2ba985..d4da0ede32 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -865,159 +865,159 @@ public class AMQProtocolHandler implements ProtocolEngine
}
private static class BytesDataOutput implements DataOutput
+ {
+ private int _pos = 0;
+ private byte[] _buf;
+
+ public BytesDataOutput(byte[] buf)
{
- int _pos = 0;
- byte[] _buf;
+ _buf = buf;
+ }
- public BytesDataOutput(byte[] buf)
- {
- _buf = buf;
- }
+ public void setBuffer(byte[] buf)
+ {
+ _buf = buf;
+ _pos = 0;
+ }
- public void setBuffer(byte[] buf)
- {
- _buf = buf;
- _pos = 0;
- }
+ public void reset()
+ {
+ _pos = 0;
+ }
- public void reset()
- {
- _pos = 0;
- }
+ public int length()
+ {
+ return _pos;
+ }
- public int length()
- {
- return _pos;
- }
+ public void write(int b)
+ {
+ _buf[_pos++] = (byte) b;
+ }
- public void write(int b)
- {
- _buf[_pos++] = (byte) b;
- }
+ public void write(byte[] b)
+ {
+ System.arraycopy(b, 0, _buf, _pos, b.length);
+ _pos+=b.length;
+ }
- public void write(byte[] b)
- {
- System.arraycopy(b, 0, _buf, _pos, b.length);
- _pos+=b.length;
- }
+ public void write(byte[] b, int off, int len)
+ {
+ System.arraycopy(b, off, _buf, _pos, len);
+ _pos+=len;
- public void write(byte[] b, int off, int len)
- {
- System.arraycopy(b, off, _buf, _pos, len);
- _pos+=len;
+ }
- }
+ public void writeBoolean(boolean v)
+ {
+ _buf[_pos++] = v ? (byte) 1 : (byte) 0;
+ }
- public void writeBoolean(boolean v)
- {
- _buf[_pos++] = v ? (byte) 1 : (byte) 0;
- }
+ public void writeByte(int v)
+ {
+ _buf[_pos++] = (byte) v;
+ }
- public void writeByte(int v)
- {
- _buf[_pos++] = (byte) v;
- }
+ public void writeShort(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
- public void writeShort(int v)
- {
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte) v;
- }
+ public void writeChar(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
- public void writeChar(int v)
+ public void writeInt(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 24);
+ _buf[_pos++] = (byte) (v >>> 16);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+
+ public void writeLong(long v)
+ {
+ _buf[_pos++] = (byte) (v >>> 56);
+ _buf[_pos++] = (byte) (v >>> 48);
+ _buf[_pos++] = (byte) (v >>> 40);
+ _buf[_pos++] = (byte) (v >>> 32);
+ _buf[_pos++] = (byte) (v >>> 24);
+ _buf[_pos++] = (byte) (v >>> 16);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte)v;
+ }
+
+ public void writeFloat(float v)
+ {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ public void writeDouble(double v)
+ {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ public void writeBytes(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
{
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte) v;
+ _buf[_pos++] = ((byte)s.charAt(i));
}
+ }
- public void writeInt(int v)
+ public void writeChars(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
{
- _buf[_pos++] = (byte) (v >>> 24);
- _buf[_pos++] = (byte) (v >>> 16);
+ int v = s.charAt(i);
_buf[_pos++] = (byte) (v >>> 8);
_buf[_pos++] = (byte) v;
}
+ }
- public void writeLong(long v)
- {
- _buf[_pos++] = (byte) (v >>> 56);
- _buf[_pos++] = (byte) (v >>> 48);
- _buf[_pos++] = (byte) (v >>> 40);
- _buf[_pos++] = (byte) (v >>> 32);
- _buf[_pos++] = (byte) (v >>> 24);
- _buf[_pos++] = (byte) (v >>> 16);
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte)v;
- }
+ public void writeUTF(String s)
+ {
+ int strlen = s.length();
- public void writeFloat(float v)
- {
- writeInt(Float.floatToIntBits(v));
- }
+ int pos = _pos;
+ _pos+=2;
- public void writeDouble(double v)
- {
- writeLong(Double.doubleToLongBits(v));
- }
- public void writeBytes(String s)
+ for (int i = 0; i < strlen; i++)
{
- int len = s.length();
- for (int i = 0 ; i < len ; i++)
+ int c = s.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F))
{
- _buf[_pos++] = ((byte)s.charAt(i));
- }
- }
+ c = s.charAt(i);
+ _buf[_pos++] = (byte) c;
- public void writeChars(String s)
- {
- int len = s.length();
- for (int i = 0 ; i < len ; i++)
+ }
+ else if (c > 0x07FF)
{
- int v = s.charAt(i);
- _buf[_pos++] = (byte) (v >>> 8);
- _buf[_pos++] = (byte) v;
+ _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
}
- }
-
- public void writeUTF(String s)
- {
- int strlen = s.length();
-
- int pos = _pos;
- _pos+=2;
-
-
- for (int i = 0; i < strlen; i++)
+ else
{
- int c = s.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F))
- {
- c = s.charAt(i);
- _buf[_pos++] = (byte) c;
-
- }
- else if (c > 0x07FF)
- {
- _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
- _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
- _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
- }
- else
- {
- _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
- _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
- }
+ _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
}
-
- int len = _pos - (pos + 2);
-
- _buf[pos++] = (byte) (len >>> 8);
- _buf[pos] = (byte) len;
}
+ int len = _pos - (pos + 2);
+
+ _buf[pos++] = (byte) (len >>> 8);
+ _buf[pos] = (byte) len;
}
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 93a51adb68..c9b2e9cdc4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -73,16 +73,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected static final String SASL_CLIENT = "SASLClient";
- /**
- * The handler from which this session was created and which is used to handle protocol events. We send failover
- * events to the handler.
- */
- protected final AMQProtocolHandler _protocolHandler;
+ private final AMQProtocolHandler _protocolHandler;
- /** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
+ private ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
- protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
+ private ConcurrentMap _closingChannels = new ConcurrentHashMap();
/**
* Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
@@ -91,9 +86,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>();
private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
- /** Counter to ensure unique queue names */
- protected int _queueId = 1;
- protected final Object _queueIdLock = new Object();
+ private int _queueId = 1;
+ private final Object _queueIdLock = new Object();
private ProtocolVersion _protocolVersion;
// private VersionSpecificRegistry _registry =
@@ -104,7 +98,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private MethodDispatcher _methodDispatcher;
- protected final AMQConnection _connection;
+ private final AMQConnection _connection;
private ConnectionTuneParameters _connectionTuneParameters;
@@ -223,7 +217,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
msg.setContentHeader(contentHeader);
- if (contentHeader.bodySize == 0)
+ if (contentHeader.getBodySize() == 0)
{
deliverMessageToAMQSession(channelId, msg);
}
@@ -470,4 +464,55 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
return "AMQProtocolSession[" + _connection + ']';
}
+
+ /**
+ * The handler from which this session was created and which is used to handle protocol events. We send failover
+ * events to the handler.
+ */
+ protected AMQProtocolHandler getProtocolHandler()
+ {
+ return _protocolHandler;
+ }
+
+ /** Maps from the channel id to the AMQSession that it represents. */
+ protected ConcurrentMap<Integer, AMQSession> getChannelId2SessionMap()
+ {
+ return _channelId2SessionMap;
+ }
+
+ protected void setChannelId2SessionMap(ConcurrentMap<Integer, AMQSession> channelId2SessionMap)
+ {
+ _channelId2SessionMap = channelId2SessionMap;
+ }
+
+ protected ConcurrentMap getClosingChannels()
+ {
+ return _closingChannels;
+ }
+
+ protected void setClosingChannels(ConcurrentMap closingChannels)
+ {
+ _closingChannels = closingChannels;
+ }
+
+ /** Counter to ensure unique queue names */
+ protected int getQueueId()
+ {
+ return _queueId;
+ }
+
+ protected void setQueueId(int queueId)
+ {
+ _queueId = queueId;
+ }
+
+ protected Object getQueueIdLock()
+ {
+ return _queueIdLock;
+ }
+
+ protected AMQConnection getConnection()
+ {
+ return _connection;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 4350b48a10..b865c51cb7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -63,7 +63,7 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMeth
{
/** Holds the channel id for the channel upon which this listener is waiting for a response. */
- protected int _channelId;
+ private int _channelId;
/**
* Creates a new method listener, that filters incoming method to just those that match the specified channel id.
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index b52c121485..616c02f3aa 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -63,7 +63,7 @@ public class AMQStateManager implements AMQMethodListener
private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
- protected final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>();
+ private final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>();
private Exception _lastException;
public AMQStateManager()
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
index 6e47e2ce28..9e21e1c4ab 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
@@ -36,7 +36,7 @@ import org.apache.qpid.jms.BrokerDetails;
*/
public class AMQNoTransportForProtocolException extends AMQTransportConnectionException
{
- BrokerDetails _details;
+ private BrokerDetails _details;
public AMQNoTransportForProtocolException(BrokerDetails details, String message, Throwable cause)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
index db7c16974a..3c9a6e1500 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
@@ -85,7 +85,7 @@ public class ClientConnectionDelegate extends ClientDelegate
protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException, SaslException
{
final String brokerMechanisms = Strings.join(" ", brokerMechs);
- final String restrictionList = _conSettings.getSaslMechs();
+ final String restrictionList = getConnectionSettings().getSaslMechs();
final String selectedMech = CallbackHandlerRegistry.getInstance().selectMechanism(brokerMechanisms, restrictionList);
if (selectedMech == null)
{
@@ -96,14 +96,14 @@ public class ClientConnectionDelegate extends ClientDelegate
}
Map<String,Object> saslProps = new HashMap<String,Object>();
- if (_conSettings.isUseSASLEncryption())
+ if (getConnectionSettings().isUseSASLEncryption())
{
saslProps.put(Sasl.QOP, "auth-conf");
}
final AMQCallbackHandler handler = CallbackHandlerRegistry.getInstance().createCallbackHandler(selectedMech);
handler.initialise(_connectionURL);
- final SaslClient sc = Sasl.createSaslClient(new String[] {selectedMech}, null, _conSettings.getSaslProtocol(), _conSettings.getSaslServerName(), saslProps, handler);
+ final SaslClient sc = Sasl.createSaslClient(new String[] {selectedMech}, null, getConnectionSettings().getSaslProtocol(), getConnectionSettings().getSaslServerName(), saslProps, handler);
return sc;
}
@@ -137,7 +137,7 @@ public class ClientConnectionDelegate extends ClientDelegate
private String getKerberosUser()
{
LOGGER.debug("Obtaining userID from kerberos");
- String service = _conSettings.getSaslProtocol() + "@" + _conSettings.getSaslServerName();
+ String service = getConnectionSettings().getSaslProtocol() + "@" + getConnectionSettings().getSaslServerName();
GSSManager manager = GSSManager.getInstance();
try
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
index 5d36b2f19e..c371341265 100644
--- a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -85,7 +85,7 @@ public abstract class BlockingWaiter<T>
private volatile Exception _error;
/** Holds the incomming Object. */
- protected Object _doneObject = null;
+ private Object _doneObject = null;
private AtomicBoolean _waiting = new AtomicBoolean(false);
private boolean _closed = false;
diff --git a/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java b/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java
index 7bc1322e02..84e4704867 100644
--- a/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java
+++ b/java/client/src/main/java/org/apache/qpid/collections/ReferenceMap.java
@@ -795,10 +795,10 @@ public class ReferenceMap extends AbstractMap
// the mapping is stale and should be removed.
private class Entry implements Map.Entry, KeyValue {
- Object key;
- Object value;
- int hash;
- Entry next;
+ private Object key;
+ private Object value;
+ private int hash;
+ private Entry next;
public Entry(Object key, int hash, Object value, Entry next) {
@@ -887,17 +887,17 @@ public class ReferenceMap extends AbstractMap
private class EntryIterator implements Iterator {
// These fields keep track of where we are in the table.
- int index;
- Entry entry;
- Entry previous;
+ private int index;
+ private Entry entry;
+ private Entry previous;
// These Object fields provide hard references to the
// current and next entry; this assures that if hasNext()
// returns true, next() will actually return a valid element.
- Object nextKey, nextValue;
- Object currentKey, currentValue;
+ private Object nextKey, nextValue;
+ private Object currentKey, currentValue;
- int expectedModCount;
+ private int expectedModCount;
public EntryIterator() {
diff --git a/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java b/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java
index a7ca67ad15..f1b6d11bee 100644
--- a/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java
+++ b/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractKeyValue.java
@@ -33,9 +33,9 @@ import org.apache.qpid.collections.KeyValue;
public abstract class AbstractKeyValue implements KeyValue {
/** The key */
- protected Object key;
+ private Object key;
/** The value */
- protected Object value;
+ private Object value;
/**
* Constructs a new pair with the specified key and given value.
@@ -68,6 +68,21 @@ public abstract class AbstractKeyValue implements KeyValue {
}
/**
+ * Sets the value stored in this <code>Map.Entry</code>.
+ * <p>
+ * This <code>Map.Entry</code> is not connected to a Map, so only the
+ * local data is changed.
+ *
+ * @param value the new value
+ * @return the previous value
+ */
+ public Object setValue(Object value) {
+ Object answer = this.value;
+ this.value = value;
+ return answer;
+ }
+
+ /**
* Gets a debugging String view of the pair.
*
* @return a String view of the entry
diff --git a/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java b/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java
index a5223d2361..7135c31fd7 100644
--- a/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java
+++ b/java/client/src/main/java/org/apache/qpid/collections/keyvalue/AbstractMapEntry.java
@@ -43,20 +43,7 @@ public abstract class AbstractMapEntry extends AbstractKeyValue implements Map.E
// Map.Entry interface
//-------------------------------------------------------------------------
- /**
- * Sets the value stored in this <code>Map.Entry</code>.
- * <p>
- * This <code>Map.Entry</code> is not connected to a Map, so only the
- * local data is changed.
- *
- * @param value the new value
- * @return the previous value
- */
- public Object setValue(Object value) {
- Object answer = this.value;
- this.value = value;
- return answer;
- }
+
/**
* Compares this <code>Map.Entry</code> with another <code>Map.Entry</code>.
diff --git a/java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java b/java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java
index a86613f10c..df5e2acd66 100644
--- a/java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java
+++ b/java/client/src/main/java/org/apache/qpid/filter/ArithmeticExpression.java
@@ -243,13 +243,13 @@ public abstract class ArithmeticExpression extends BinaryExpression
public Object evaluate(AbstractJMSMessage message) throws AMQInternalException
{
- Object lvalue = left.evaluate(message);
+ Object lvalue = getLeft().evaluate(message);
if (lvalue == null)
{
return null;
}
- Object rvalue = right.evaluate(message);
+ Object rvalue = getRight().evaluate(message);
if (rvalue == null)
{
return null;
diff --git a/java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java b/java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java
index f97f858fad..a08a6cc094 100644
--- a/java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java
+++ b/java/client/src/main/java/org/apache/qpid/filter/BinaryExpression.java
@@ -22,8 +22,8 @@ package org.apache.qpid.filter;
*/
public abstract class BinaryExpression implements Expression
{
- protected Expression left;
- protected Expression right;
+ private final Expression left;
+ private final Expression right;
public BinaryExpression(Expression left, Expression right)
{
@@ -84,20 +84,5 @@ public abstract class BinaryExpression implements Expression
*/
public abstract String getExpressionSymbol();
- /**
- * @param expression
- */
- public void setRight(Expression expression)
- {
- right = expression;
- }
-
- /**
- * @param expression
- */
- public void setLeft(Expression expression)
- {
- left = expression;
- }
}
diff --git a/java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java b/java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java
index eebfec0b2d..87d43ec343 100644
--- a/java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java
+++ b/java/client/src/main/java/org/apache/qpid/filter/ComparisonExpression.java
@@ -69,7 +69,7 @@ public abstract class ComparisonExpression extends BinaryExpression implements B
static class LikeExpression extends UnaryExpression implements BooleanExpression
{
- Pattern likePattern;
+ private Pattern likePattern;
/**
* @param right
@@ -236,8 +236,8 @@ public abstract class ComparisonExpression extends BinaryExpression implements B
public Object evaluate(AbstractJMSMessage message) throws AMQInternalException
{
- Object lv = left.evaluate(message);
- Object rv = right.evaluate(message);
+ Object lv = getLeft().evaluate(message);
+ Object rv = getRight().evaluate(message);
// Iff one of the values is null
if ((lv == null) ^ (rv == null))
@@ -419,13 +419,13 @@ public abstract class ComparisonExpression extends BinaryExpression implements B
public Object evaluate(AbstractJMSMessage message) throws AMQInternalException
{
- Comparable lv = (Comparable) left.evaluate(message);
+ Comparable lv = (Comparable) getLeft().evaluate(message);
if (lv == null)
{
return null;
}
- Comparable rv = (Comparable) right.evaluate(message);
+ Comparable rv = (Comparable) getRight().evaluate(message);
if (rv == null)
{
return null;
diff --git a/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java b/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java
index 7ef85cbacb..b08b93228f 100644
--- a/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java
+++ b/java/client/src/main/java/org/apache/qpid/filter/LogicExpression.java
@@ -35,14 +35,14 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea
public Object evaluate(AbstractJMSMessage message) throws AMQInternalException
{
- Boolean lv = (Boolean) left.evaluate(message);
+ Boolean lv = (Boolean) getLeft().evaluate(message);
// Can we do an OR shortcut??
if ((lv != null) && lv.booleanValue())
{
return Boolean.TRUE;
}
- Boolean rv = (Boolean) right.evaluate(message);
+ Boolean rv = (Boolean) getRight().evaluate(message);
return (rv == null) ? null : rv;
}
@@ -62,7 +62,7 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea
public Object evaluate(AbstractJMSMessage message) throws AMQInternalException
{
- Boolean lv = (Boolean) left.evaluate(message);
+ Boolean lv = (Boolean) getLeft().evaluate(message);
// Can we do an AND shortcut??
if (lv == null)
@@ -75,7 +75,7 @@ public abstract class LogicExpression extends BinaryExpression implements Boolea
return Boolean.FALSE;
}
- Boolean rv = (Boolean) right.evaluate(message);
+ Boolean rv = (Boolean) getRight().evaluate(message);
return (rv == null) ? null : rv;
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
index ad0a625c6a..9a2e9de3d9 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
@@ -38,8 +38,8 @@ import java.nio.ByteBuffer;
*/
public class MessagePartListenerAdapter implements MessagePartListener
{
- MessageListener _adaptee;
- ByteBufferMessage _currentMsg;
+ private MessageListener _adaptee;
+ private ByteBufferMessage _currentMsg;
public MessagePartListenerAdapter(MessageListener listener)
{
diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java b/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java
index d862acf28d..bc48ee8895 100644
--- a/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java
@@ -26,11 +26,11 @@ import org.apache.qpid.framing.AMQShortString;
public class AMQQueueTest extends TestCase
{
- AMQShortString exchange = new AMQShortString("test.exchange");
- AMQShortString routingkey = new AMQShortString("test-route");
- AMQShortString qname = new AMQShortString("test-queue");
- AMQShortString[] oneBinding = new AMQShortString[]{new AMQShortString("bindingA")};
- AMQShortString[] bindings = new AMQShortString[]{new AMQShortString("bindingB"),
+ private AMQShortString exchange = new AMQShortString("test.exchange");
+ private AMQShortString routingkey = new AMQShortString("test-route");
+ private AMQShortString qname = new AMQShortString("test-queue");
+ private AMQShortString[] oneBinding = new AMQShortString[]{new AMQShortString("bindingA")};
+ private AMQShortString[] bindings = new AMQShortString[]{new AMQShortString("bindingB"),
new AMQShortString("bindingC")};
public void testToURLNoBindings()
diff --git a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
index 919809edc3..009598d8a4 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
@@ -51,13 +51,13 @@ public class MockAMQConnection extends AMQConnection
@Override
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException
{
- _connected = true;
- _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_OPEN);
+ setConnected(true);
+ getProtocolHandler().getStateManager().changeState(AMQState.CONNECTION_OPEN);
return null;
}
public AMQConnectionDelegate getDelegate()
{
- return _delegate;
+ return super.getDelegate();
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
index 6b5fc81be6..9a5ca33174 100644
--- a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
@@ -59,15 +59,15 @@ public class AMQProtocolHandlerTest extends TestCase
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandlerTest.class);
// The handler to test
- AMQProtocolHandler _handler;
+ private AMQProtocolHandler _handler;
// A frame to block upon whilst waiting the exception
- AMQFrame _blockFrame;
+ private AMQFrame _blockFrame;
// Latch to know when the listener receives an exception
private CountDownLatch _handleCountDown;
// The listener that will receive an exception
- BlockToAccessFrameListener _listener;
+ private BlockToAccessFrameListener _listener;
@Override
public void setUp() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java b/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java
index 8cd320b06e..91460ab4e7 100644
--- a/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java
@@ -31,8 +31,8 @@ import java.util.List;
public class ClassLoadingAwareObjectInputStreamTest extends QpidTestCase
{
- InputStream _in;
- ClassLoadingAwareObjectInputStream _claOIS;
+ private InputStream _in;
+ private ClassLoadingAwareObjectInputStream _claOIS;
protected void setUp() throws Exception
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
index 5690a05254..576ab4fa05 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
@@ -34,7 +34,7 @@ import java.util.Properties;
public class JNDIPropertyFileTest extends TestCase
{
- Context ctx;
+ private Context ctx;
public JNDIPropertyFileTest() throws Exception
{