summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java100
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java24
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java46
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java70
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java72
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java81
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java28
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java118
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java2
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java46
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java22
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java2
24 files changed, 549 insertions, 173 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 2e1653e69d..97c95dae5e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -197,7 +197,6 @@ public class AMQChannel
_currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
_txnContext);
- // TODO: used in clustering only I think (RG)
_currentMessage.setPublisher(publisher);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 146d0566ce..14aa919356 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -70,7 +70,7 @@ public class Main
private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
-
+ public static final String QPID_HOME = "QPID_HOME";
private static final int IPV4_ADDRESS_LENGTH = 4;
private static final char IPV4_LITERAL_SEPARATOR = '.';
@@ -204,7 +204,7 @@ public class Main
protected void startup() throws InitException, ConfigurationException, Exception
{
- final String QpidHome = System.getProperty("QPID_HOME");
+ final String QpidHome = System.getProperty(QPID_HOME);
final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
if (!configFile.exists())
@@ -213,7 +213,7 @@ public class Main
if (QpidHome == null)
{
- error = error + "\nNote: Qpid_HOME is not set.";
+ error = error + "\nNote: "+QPID_HOME+" is not set.";
}
throw new InitException(error, null);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index b2046efee3..e19038504f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -80,18 +80,17 @@ public class AMQMessage
*/
private boolean _immediate;
- private AtomicBoolean _taken = new AtomicBoolean(false);
+ // private Subscription _takenBySubcription;
+ // private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
- private Subscription _takenBySubcription;
+
private Set<Subscription> _rejectedBy = null;
+
+
private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
- public boolean isTaken(AMQQueue queue)
- {
- return _taken.get();
- }
private final int hashcode = System.identityHashCode(this);
@@ -206,7 +205,7 @@ public class AMQMessage
_immediate = info.isImmediate();
_transientMessageData.setMessagePublishInfo(info);
- _taken = new AtomicBoolean(false);
+// _taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
@@ -326,7 +325,6 @@ public class AMQMessage
for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
- _takenMap.put(q, new AtomicBoolean(false));
_messageHandle.enqueue(storeContext, _messageId, q);
}
@@ -459,17 +457,53 @@ public class AMQMessage
return _deliveredToConsumer;
}
-
- public boolean taken(AMQQueue queue, Subscription sub)
+ public boolean isTaken(AMQQueue queue)
{
- if (_taken.getAndSet(true))
+ //return _taken.get();
+
+ synchronized (this)
{
- return true;
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ _takenMap.put(queue, taken);
+ }
+
+ return taken.get();
}
- else
+ }
+
+ public boolean taken(AMQQueue queue, Subscription sub)
+ {
+// if (_taken.getAndSet(true))
+// {
+// return true;
+// }
+// else
+// {
+// _takenBySubcription = sub;
+// return false;
+// }
+
+ synchronized (this)
{
- _takenBySubcription = sub;
- return false;
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ }
+
+ if (taken.getAndSet(true))
+ {
+ return true;
+ }
+ else
+ {
+ _takenMap.put(queue, taken);
+ _takenBySubcriptionMap.put(queue, sub);
+ return false;
+ }
}
}
@@ -479,8 +513,26 @@ public class AMQMessage
{
_log.trace("Releasing Message:" + debugIdentity());
}
- _taken.set(false);
- _takenBySubcription = null;
+
+// _taken.set(false);
+// _takenBySubcription = null;
+
+
+ synchronized (this)
+ {
+ AtomicBoolean taken = _takenMap.get(queue);
+ if (taken == null)
+ {
+ taken = new AtomicBoolean(false);
+ }
+ else
+ {
+ taken.set(false);
+ }
+
+ _takenMap.put(queue, taken);
+ _takenBySubcriptionMap.put(queue, null);
+ }
}
public boolean checkToken(Object token)
@@ -833,16 +885,20 @@ public class AMQMessage
public String toString()
{
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
- _taken + " by :" + _takenBySubcription;
+// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+// _taken + " by :" + _takenBySubcription;
-// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
-// _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+ _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
{
- return _takenBySubcription;
+// return _takenBySubcription;
+ synchronized (this)
+ {
+ return _takenBySubcriptionMap.get(queue);
+ }
}
public void reject(Subscription subscription)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a418bb8f8a..65d5906d05 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.queue;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -237,8 +236,10 @@ public class AMQQueue implements Managable, Comparable
/**
* Returns messages within the given range of message Ids
+ *
* @param fromMessageId
* @param toMessageId
+ *
* @return List of messages
*/
public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
@@ -253,6 +254,7 @@ public class AMQQueue implements Managable, Comparable
/**
* @param messageId
+ *
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
*/
public AMQMessage getMessageOnTheQueue(long messageId)
@@ -267,10 +269,10 @@ public class AMQQueue implements Managable, Comparable
/**
* moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
- * moving messages (stop the async delivery) - get all the messages available in the given message
- * id range - setup the other queue for moving messages (stop the async delivery) - send these
- * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful,
- * remove messages from this queue - remove locks from both queues and start async delivery
+ * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup
+ * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue
+ * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove
+ * locks from both queues and start async delivery
*
* @param fromMessageId
* @param toMessageId
@@ -442,7 +444,7 @@ public class AMQQueue implements Managable, Comparable
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
filters, noLocal, this);
- if (subscription.hasFilters())
+ if (subscription.filtersMessages())
{
if (_deliveryMgr.hasQueuedMessages())
{
@@ -641,7 +643,7 @@ public class AMQQueue implements Managable, Comparable
{
_totalMessagesReceived.incrementAndGet();
}
-
+
try
{
_managedObject.checkForNotification(msg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 979f692361..1f92cee1df 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -28,7 +28,6 @@ import java.util.Set;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.Executor;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
@@ -372,7 +371,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
for (Subscription sub : _subscriptions.getSubscriptions())
{
- if (!sub.isSuspended() && sub.hasFilters())
+ if (!sub.isSuspended() && sub.filtersMessages())
{
Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue();
for (AMQMessage msg : messageList)
@@ -613,6 +612,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_processingThreadName = Thread.currentThread().getName();
}
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Running process Queue." + currentStatus());
+ }
+
// Continue to process delivery while we haveSubscribers and messages
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
@@ -633,11 +637,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
}
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Done process Queue." + currentStatus());
+ }
+
}
// private void sendNextMessage(Subscription sub)
// {
-// if (sub.hasFilters())
+// if (sub.filtersMessages())
// {
// sendNextMessage(sub, sub.getPreDeliveryQueue());
// if (sub.isAutoClose())
@@ -817,6 +827,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//are we already running? if so, don't re-run
if (_processing.compareAndSet(false, true))
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Executing Async process.");
+ }
executor.execute(asyncDelivery);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index e9f209839a..e6d5d0c88d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -32,7 +32,7 @@ public interface Subscription
void queueDeleted(AMQQueue queue) throws AMQException;
- boolean hasFilters();
+ boolean filtersMessages();
boolean hasInterest(AMQMessage msg);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index e3944954f3..3bce950ba9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -158,7 +158,7 @@ public class SubscriptionImpl implements Subscription
}
- if (_filters != null)
+ if (filtersMessages())
{
_messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
}
@@ -346,9 +346,9 @@ public class SubscriptionImpl implements Subscription
channel.queueDeleted(queue);
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
- return _filters != null;
+ return _filters != null || _noLocal;
}
public boolean hasInterest(AMQMessage msg)
@@ -363,7 +363,10 @@ public class SubscriptionImpl implements Subscription
// return false;
}
- if (_noLocal)
+ final AMQProtocolSession publisher = msg.getPublisher();
+
+ //todo - client id should be recoreded and this test removed but handled below
+ if (_noLocal && publisher != null)
{
// We don't want local messages so check to see if message is one we sent
Object localInstance;
@@ -372,8 +375,9 @@ public class SubscriptionImpl implements Subscription
if ((protocolSession.getClientProperties() != null) &&
(localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if ((msg.getPublisher().getClientProperties() != null) &&
- (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+
+ if ((publisher.getClientProperties() != null) &&
+ (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
if (localInstance == msgInstance || localInstance.equals(msgInstance))
{
@@ -388,8 +392,11 @@ public class SubscriptionImpl implements Subscription
}
else
{
+
localInstance = protocolSession.getClientIdentifier();
- msgInstance = msg.getPublisher().getClientIdentifier();
+ //todo - client id should be recoreded and this test removed but handled here
+
+ msgInstance = publisher.getClientIdentifier();
if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
{
if (_logger.isTraceEnabled())
@@ -399,7 +406,6 @@ public class SubscriptionImpl implements Subscription
}
return false;
}
-
}
@@ -623,7 +629,7 @@ public class SubscriptionImpl implements Subscription
return _resendQueue;
}
- if (_filters != null)
+ if (filtersMessages())
{
if (isAutoClose())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 26b040aae0..b500247fa4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -157,7 +157,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
//FIXME the queue could be full of sent messages.
// Either need to clean all PDQs after sending a message
// OR have a clean up thread that runs the PDQs expunging the messages.
- if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty())
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
{
return subscription;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java
index a43474559d..20f123179f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.security.access;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanOperationParameter;
import org.apache.qpid.server.management.MBeanOperation;
import org.apache.qpid.server.management.MBeanInvocationHandlerImpl;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
@@ -107,8 +106,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana
return UserManagement.TYPE;
}
- public boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "password", description = "Password")String password)
+ public boolean setPassword(String username, char[] password)
{
try
{
@@ -122,10 +120,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana
}
}
- public boolean setRights(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
- @MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
- @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin)
+ public boolean setRights(String username, boolean read, boolean write, boolean admin)
{
if (_accessRights.get(username) == null)
@@ -179,11 +174,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana
return true;
}
- public boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "password", description = "Password")String password,
- @MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
- @MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
- @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin)
+ public boolean createUser(String username, char[] password, boolean read, boolean write, boolean admin)
{
if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password))
{
@@ -195,7 +186,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana
return false;
}
- public boolean deleteUser(@MBeanOperationParameter(name = "username", description = "Username")String username)
+ public boolean deleteUser(String username)
{
try
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java
index 6381213398..ce5e9fa4a7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java
@@ -45,7 +45,7 @@ public interface UserManagement
*/
@MBeanOperation(name = "setPassword", description = "Set password for user.")
boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "password", description = "Password")String password);
+ @MBeanOperationParameter(name = "password", description = "Password")char[] password);
/**
* set rights for users with given details
@@ -76,7 +76,7 @@ public interface UserManagement
*/
@MBeanOperation(name = "createUser", description = "Create new user from system.")
boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username,
- @MBeanOperationParameter(name = "password", description = "Password")String password,
+ @MBeanOperationParameter(name = "password", description = "Password")char[] password,
@MBeanOperationParameter(name = "read", description = "Administration read")boolean read,
@MBeanOperationParameter(name = "write", description = "Administration write")boolean write,
@MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
index 956db64d90..cd0a371b48 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java
@@ -176,7 +176,7 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
}
}
- public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException
+ public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException
{
User user = _users.get(principal.getName());
@@ -187,13 +187,10 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
try
{
-
- char[] passwd = convertPassword(password);
-
try
{
_userUpdate.lock();
- user.setPassword(passwd);
+ user.setPassword(password);
try
{
@@ -215,7 +212,7 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
}
}
}
- catch (UnsupportedEncodingException e)
+ catch (Exception e)
{
return false;
}
@@ -237,23 +234,14 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
return passwd;
}
- public boolean createPrincipal(Principal principal, String password)
+ public boolean createPrincipal(Principal principal, char[] password)
{
if (_users.get(principal.getName()) != null)
{
return false;
}
- User user;
- try
- {
- user = new User(principal.getName(), convertPassword(password));
- }
- catch (UnsupportedEncodingException e)
- {
- _logger.warn("Unable to encode password:" + e);
- return false;
- }
+ User user = new User(principal.getName(), password);
try
{
@@ -598,8 +586,13 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase
private void encodePassword() throws EncoderException, UnsupportedEncodingException, NoSuchAlgorithmException
{
- Base64 b64 = new Base64();
- _encodedPassword = b64.encode(new String(_password).getBytes(DEFAULT_ENCODING));
+ byte[] byteArray = new byte[_password.length];
+ int index = 0;
+ for (char c : _password)
+ {
+ byteArray[index++] = (byte)c;
+ }
+ _encodedPassword = (new Base64()).encode(byteArray);
}
public boolean isModified()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
index 3f6794aaaf..90d08c963e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
@@ -151,12 +151,12 @@ public class PlainPasswordFilePrincipalDatabase implements PrincipalDatabase
return passwd;
}
- public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException
+ public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException
{
return false; // updates denied
}
- public boolean createPrincipal(Principal principal, String password)
+ public boolean createPrincipal(Principal principal, char[] password)
{
return false; // updates denied
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java
index 8073fcc3c6..494d8e0bf4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java
@@ -65,7 +65,7 @@ public interface PrincipalDatabase
* @return True if change was successful
* @throws AccountNotFoundException If the given principal doesn't exist in the Database
*/
- boolean updatePassword(Principal principal, String password)
+ boolean updatePassword(Principal principal, char[] password)
throws AccountNotFoundException;
/**
@@ -74,7 +74,7 @@ public interface PrincipalDatabase
* @param password The password to set for the principal
* @return True on a successful creation
*/
- boolean createPrincipal(Principal principal, String password);
+ boolean createPrincipal(Principal principal, char[] password);
/**
* Delete a principal
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
index b1ac0e1f00..74c330f606 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java
@@ -93,12 +93,12 @@ public class PropertiesPrincipalDatabase implements PrincipalDatabase
}
}
- public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException
+ public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException
{
return false; // updates denied
}
- public boolean createPrincipal(Principal principal, String password)
+ public boolean createPrincipal(Principal principal, char[] password)
{
return false; // updates denied
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8bb5b622f7..7b65f279be 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -199,11 +199,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private final Object _suspensionLock = new Object();
- /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
+ /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */
+ private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+
+ /** System property to enable immediate message prefetching */
+ public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
+ /** Immediate message prefetch default */
+ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+
private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
- private AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+ /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
@@ -1932,20 +1939,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized void startDistpatcherIfNecessary()
{
- if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ // If IMMEDIATE_PREFETCH is not set then we need to start fetching
+ if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)))
{
-// if (!connectionStopped)
+ // We do this now if this is the first call on a started connection
+ if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false))
{
- if (isSuspended() && _firstDispatcher.getAndSet(false))
+ try
{
- try
- {
- suspendChannel(false);
- }
- catch (AMQException e)
- {
- _logger.info("Suspending channel threw an exception:" + e);
- }
+ suspendChannel(false);
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Suspending channel threw an exception:" + e);
}
}
}
@@ -1998,11 +2004,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- // The dispatcher will be null if we have just created this session
- // so suspend the channel before we register our consumer so that we don't
- // start prefetching until a receive/mListener is set.
- if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
+ if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)))
{
+ // The dispatcher will be null if we have just created this session
+ // so suspend the channel before we register our consumer so that we don't
+ // start prefetching until a receive/mListener is set.
if (_dispatcher == null)
{
if (!isSuspended())
@@ -2010,6 +2017,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
try
{
suspendChannel(true);
+ _logger.info("Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
}
catch (AMQException e)
{
@@ -2018,6 +2026,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
}
+ else
+ {
+ _logger.info("Immediately prefetching existing messages to new consumer.");
+ }
try
{
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
new file mode 100644
index 0000000000..9e48914431
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerMultiConsumerImmediatePrefetch extends MessageListenerMultiConsumerTest
+{
+
+
+ protected void setUp() throws Exception
+ {
+
+ System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true");
+ super.setUp();
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerMultiConsumerImmediatePrefetch.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 794fd5c8c1..c9407d8ff6 100644
--- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -62,7 +62,8 @@ public class MessageListenerMultiConsumerTest extends TestCase
private Connection _clientConnection;
private MessageConsumer _consumer1;
private MessageConsumer _consumer2;
-
+ private Session _clientSession1;
+ private Queue _queue;
private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
@@ -76,25 +77,25 @@ public class MessageListenerMultiConsumerTest extends TestCase
Hashtable<String, String> env = new Hashtable<String, String>();
env.put("connectionfactory.connection", "amqp://guest:guest@MLT_ID/test?brokerlist='vm://:1'");
- env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+ env.put("queue.queue", "direct://amq.direct//"+this.getClass().getName());
_context = factory.getInitialContext(env);
- Queue queue = (Queue) _context.lookup("queue");
+ _queue = (Queue) _context.lookup("queue");
//Create Client 1
_clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
_clientConnection.start();
- Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _consumer1 = clientSession1.createConsumer(queue);
+ _consumer1 = _clientSession1.createConsumer(_queue);
//Create Client 2
Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _consumer2 = clientSession2.createConsumer(queue);
+ _consumer2 = clientSession2.createConsumer(_queue);
//Create Producer
Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
@@ -104,7 +105,7 @@ public class MessageListenerMultiConsumerTest extends TestCase
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ MessageProducer producer = producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
@@ -123,20 +124,6 @@ public class MessageListenerMultiConsumerTest extends TestCase
TransportConnection.killAllVMBrokers();
}
-// public void testRecieveC1thenC2() throws Exception
-// {
-//
-// for (int msg = 0; msg < MSG_COUNT / 2; msg++)
-// {
-//
-// assertTrue(_consumer1.receive() != null);
-// }
-//
-// for (int msg = 0; msg < MSG_COUNT / 2; msg++)
-// {
-// assertTrue(_consumer2.receive() != null);
-// }
-// }
public void testRecieveInterleaved() throws Exception
{
@@ -206,10 +193,12 @@ public class MessageListenerMultiConsumerTest extends TestCase
assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
}
- public void testRecieveC2Only_OnlyRunWith_REGISTER_CONSUMERS_FLOWED() throws Exception
+ public void testRecieveC2Only() throws Exception
{
- if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ if (!Boolean.parseBoolean(System.getProperties().
+ getProperty(AMQSession.IMMEDIATE_PREFETCH, AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
{
+ _logger.info("Performing Receive only on C2");
for (int msg = 0; msg < MSG_COUNT; msg++)
{
assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg,
@@ -218,6 +207,43 @@ public class MessageListenerMultiConsumerTest extends TestCase
}
}
+ public void testRecieveBoth() throws Exception
+ {
+ if (!Boolean.parseBoolean(System.getProperties().
+ getProperty(AMQSession.IMMEDIATE_PREFETCH, AMQSession.IMMEDIATE_PREFETCH_DEFAULT)))
+ {
+ _logger.info("Performing Receive only with two consumers on one session ");
+
+ MessageConsumer consumer2 = _clientSession1.createConsumer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+
+ assertTrue(_consumer1.receive() != null);
+ }
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(consumer2.receive() != null);
+ }
+ }
+ else
+ {
+ _logger.info("Performing Receive only on both C1 and C2");
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+
+ assertTrue(_consumer1.receive() != null);
+ }
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(_consumer2.receive() != null);
+ }
+ }
+ }
+
public static junit.framework.Test suite()
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java
new file mode 100644
index 0000000000..505af361bc
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.basic.close;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.log4j.Logger;
+
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+
+public class CloseTests extends TestCase
+{
+
+ private static final Logger _logger = Logger.getLogger(CloseTests.class);
+
+
+ private static final String BROKER = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.setUp();
+
+ TransportConnection.killVMBroker(1);
+ }
+
+
+ public void testCloseQueueReceiver() throws AMQException, URLSyntaxException, JMSException
+ {
+ AMQConnection connection = new AMQConnection(BROKER, "guest", "guest", this.getName(), "test");
+
+ Session session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
+
+ AMQQueue queue = new AMQQueue(new AMQBindingURL("test-queue"));
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ MessageProducer producer_not_used_but_created_for_testing = session.createProducer(queue);
+
+ connection.start();
+
+ _logger.info("About to close consumer");
+
+ consumer.close();
+
+ _logger.info("Closed Consumer");
+
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 190b3861f0..15cb9678e4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -36,9 +36,11 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
public class DurableSubscriptionTest extends TestCase
{
+ private static final Logger _logger = Logger.getLogger(DurableSubscriptionTest.class);
protected void setUp() throws Exception
{
@@ -55,41 +57,59 @@ public class DurableSubscriptionTest extends TestCase
public void testUnsubscribe() throws AMQException, JMSException, URLSyntaxException
{
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con,"MyTopic");
+ AMQTopic topic = new AMQTopic(con, "MyTopic");
+ _logger.info("Create Session 1");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _logger.info("Create Consumer on Session 1");
MessageConsumer consumer1 = session1.createConsumer(topic);
+ _logger.info("Create Producer on Session 1");
MessageProducer producer = session1.createProducer(topic);
+ _logger.info("Create Session 2");
Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _logger.info("Create Durable Subscriber on Session 2");
TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
+ _logger.info("Starting connection");
con.start();
+ _logger.info("Producer sending message A");
producer.send(session1.createTextMessage("A"));
Message msg;
+ _logger.info("Receive message on consumer 1:expecting A");
msg = consumer1.receive();
assertEquals("A", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(1000);
assertEquals(null, msg);
+
+ _logger.info("Receive message on consumer 1:expecting A");
msg = consumer2.receive();
assertEquals("A", ((TextMessage) msg).getText());
msg = consumer2.receive(1000);
+ _logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
+ _logger.info("Unsubscribe session2/consumer2");
session2.unsubscribe("MySubscription");
+ _logger.info("Producer sending message B");
producer.send(session1.createTextMessage("B"));
+ _logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive();
assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(1000);
assertEquals(null, msg);
+ _logger.info("Receive message on consumer 2 :expecting null");
msg = consumer2.receive(1000);
assertEquals(null, msg);
+ _logger.info("Close connection");
con.close();
}
@@ -97,7 +117,7 @@ public class DurableSubscriptionTest extends TestCase
{
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con,"MyTopic");
+ AMQTopic topic = new AMQTopic(con, "MyTopic");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
@@ -129,13 +149,17 @@ public class DurableSubscriptionTest extends TestCase
producer.send(session1.createTextMessage("B"));
+ _logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive(100);
assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(100);
assertEquals(null, msg);
+ _logger.info("Receive message on consumer 3 :expecting B");
msg = consumer3.receive(100);
assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 3 :expecting null");
msg = consumer3.receive(100);
assertEquals(null, msg);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index fe7efb4e88..a19687b07c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -38,11 +38,11 @@ import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.transport.TransportConnection;
-/**
- * @author Apache Software Foundation
- */
+/** @author Apache Software Foundation */
public class TopicSessionTest extends TestCase
{
+ private static final String BROKER = "vm://:1";
+
protected void setUp() throws Exception
{
super.setUp();
@@ -53,17 +53,16 @@ public class TopicSessionTest extends TestCase
{
super.tearDown();
TransportConnection.killAllVMBrokers();
- //Thread.sleep(2000);
}
public void testTopicSubscriptionUnsubscription() throws Exception
{
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(),"MyTopic");
+ AMQConnection con = new AMQConnection(BROKER+"?retries='0'", "guest", "guest", "test", "test");
+ AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
- TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0");
+ TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(topic);
con.start();
@@ -81,11 +80,11 @@ public class TopicSessionTest extends TestCase
session1.unsubscribe("not a subscription");
fail("expected InvalidDestinationException when unsubscribing from unknown subscription");
}
- catch(InvalidDestinationException e)
+ catch (InvalidDestinationException e)
{
; // PASS
}
- catch(Exception e)
+ catch (Exception e)
{
fail("expected InvalidDestinationException when unsubscribing from unknown subscription, got: " + e);
}
@@ -106,8 +105,8 @@ public class TopicSessionTest extends TestCase
private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception
{
AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con,"MyTopic1" + String.valueOf(shutdown));
- AMQTopic topic2 = new AMQTopic(con,"MyOtherTopic1" + String.valueOf(shutdown));
+ AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown));
+ AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown));
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
@@ -145,7 +144,7 @@ public class TopicSessionTest extends TestCase
public void testUnsubscriptionAfterConnectionClose() throws Exception
{
AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con1,"MyTopic3");
+ AMQTopic topic = new AMQTopic(con1, "MyTopic3");
TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
@@ -176,7 +175,7 @@ public class TopicSessionTest extends TestCase
{
AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
- AMQTopic topic = new AMQTopic(con,"MyTopic4");
+ AMQTopic topic = new AMQTopic(con, "MyTopic4");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
MessageConsumer consumer1 = session1.createConsumer(topic);
@@ -226,11 +225,11 @@ public class TopicSessionTest extends TestCase
producer.send(sentMessage);
TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
assertNotNull(receivedMessage);
- assertEquals(sentMessage.getText(),receivedMessage.getText());
+ assertEquals(sentMessage.getText(), receivedMessage.getText());
producer.send(sentMessage);
receivedMessage = (TextMessage) consumer.receive(2000);
assertNotNull(receivedMessage);
- assertEquals(sentMessage.getText(),receivedMessage.getText());
+ assertEquals(sentMessage.getText(), receivedMessage.getText());
conn.close();
@@ -248,14 +247,14 @@ public class TopicSessionTest extends TestCase
producer.send(session.createTextMessage("hello"));
TextMessage tm = (TextMessage) consumer.receive(2000);
assertNotNull(tm);
- assertEquals("hello",tm.getText());
+ assertEquals("hello", tm.getText());
try
{
topic.delete();
fail("Expected JMSException : should not be able to delete while there are active consumers");
}
- catch(JMSException je)
+ catch (JMSException je)
{
; //pass
}
@@ -266,7 +265,7 @@ public class TopicSessionTest extends TestCase
{
topic.delete();
}
- catch(JMSException je)
+ catch (JMSException je)
{
fail("Unexpected Exception: " + je.getMessage());
}
@@ -283,11 +282,92 @@ public class TopicSessionTest extends TestCase
}
-
conn.close();
}
+ public void testNoLocal() throws Exception
+ {
+
+ AMQConnection con = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test");
+
+ AMQTopic topic = new AMQTopic(con, "testNoLocal");
+
+ TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSubscriber noLocal = session1.createDurableSubscriber(topic, "noLocal", "", true);
+ TopicSubscriber select = session1.createDurableSubscriber(topic, "select", "Selector = 'select'", false);
+ TopicSubscriber normal = session1.createDurableSubscriber(topic, "normal");
+
+ TopicPublisher publisher = session1.createPublisher(topic);
+
+ con.start();
+ TextMessage m;
+ TextMessage message;
+
+ //send message to all consumers
+ publisher.publish(session1.createTextMessage("hello-new2"));
+
+ //test normal subscriber gets message
+ m = (TextMessage) normal.receive(1000);
+ assertNotNull(m);
+
+ //test selector subscriber doesn't message
+ m = (TextMessage) select.receive(1000);
+ assertNull(m);
+
+ //test nolocal subscriber doesn't message
+ m = (TextMessage) noLocal.receive(1000);
+ if (m != null)
+ {
+ System.out.println("Message:" + m.getText());
+ }
+ assertNull(m);
+
+ //send message to all consumers
+ message = session1.createTextMessage("hello2");
+ message.setStringProperty("Selector", "select");
+
+ publisher.publish(message);
+
+ //test normal subscriber gets message
+ m = (TextMessage) normal.receive(1000);
+ assertNotNull(m);
+
+ //test selector subscriber does get message
+ m = (TextMessage) select.receive(100);
+ assertNotNull(m);
+
+ //test nolocal subscriber doesn't message
+ m = (TextMessage) noLocal.receive(100);
+ assertNull(m);
+
+ AMQConnection con2 = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test2", "test");
+ TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicPublisher publisher2 = session2.createPublisher(topic);
+
+
+ message = session2.createTextMessage("hello2");
+ message.setStringProperty("Selector", "select");
+
+ publisher2.publish(message);
+
+ //test normal subscriber gets message
+ m = (TextMessage) normal.receive(1000);
+ assertNotNull(m);
+
+ //test selector subscriber does get message
+ m = (TextMessage) select.receive(100);
+ assertNotNull(m);
+
+ //test nolocal subscriber does message
+ m = (TextMessage) noLocal.receive(100);
+ assertNotNull(m);
+
+
+ con.close();
+ con2.close();
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(TopicSessionTest.class);
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index a5ace41752..42412bebae 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -102,7 +102,7 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
}
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
return false;
}
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
index 2ac037e4f0..60d8f7920d 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
@@ -31,6 +31,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularDataSupport;
import static org.apache.qpid.management.ui.Constants.*;
+
import org.apache.qpid.management.ui.ApplicationRegistry;
import org.apache.qpid.management.ui.ManagedBean;
import org.apache.qpid.management.ui.jmx.MBeanUtility;
@@ -337,10 +338,17 @@ public class OperationTabControl extends TabControl
// display the parameter data type next to the text field
if (valueInCombo)
+ {
label = _toolkit.createLabel(_paramsComposite, "");
+ }
+ else if (PASSWORD.equalsIgnoreCase(param.getName()))
+ {
+ label = _toolkit.createLabel(_paramsComposite, "(String)");
+ }
else
{
- String str = param.getType() ;
+ String str = param.getType();
+
if (param.getType().lastIndexOf(".") != -1)
str = param.getType().substring(1 + param.getType().lastIndexOf("."));
@@ -581,34 +589,32 @@ public class OperationTabControl extends TabControl
}
// End of custom code
-
- // customized for passwords
- if (PASSWORD.equalsIgnoreCase(param.getName()))
+ ViewUtility.popupInfoMessage(_form.getText(), "Please select the " + ViewUtility.getDisplayText(param.getName()));
+ return;
+ }
+
+ // customized for passwords
+ String securityMechanism = ApplicationRegistry.getSecurityMechanism();
+ if ((MECH_CRAMMD5.equals(securityMechanism)) && PASSWORD.equalsIgnoreCase(param.getName()))
+ {
+ try
{
- try
- {
- param.setValueFromString(ViewUtility.getHashedString(param.getValue()));
- }
- catch (Exception ex)
- {
- MBeanUtility.handleException(_mbean, ex);
- return;
- }
+ param.setValue(ViewUtility.getMD5HashedCharArray(param.getValue()));
+ }
+ catch (Exception ex)
+ {
+ MBeanUtility.handleException(_mbean, ex);
+ return;
}
- // end of customization
- ViewUtility.popupInfoMessage(_form.getText(),
- "Please select the " + ViewUtility.getDisplayText(param.getName()));
-
- return;
}
+ // end of customization
}
}
if (_opData.getImpact() == OPERATION_IMPACT_ACTION)
{
String bean = _mbean.getName() == null ? _mbean.getType() : _mbean.getName();
- int response = ViewUtility.popupConfirmationMessage(bean,
- "Do you want to " + _form.getText()+ " ?");
+ int response = ViewUtility.popupConfirmationMessage(bean, "Do you want to " + _form.getText()+ " ?");
if (response == SWT.YES)
{
executeAndShowResults();
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java
index 9b5cddd342..89ab360937 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java
@@ -560,10 +560,26 @@ public class ViewUtility
}
}
- public static String getHashedString(Object text) throws NoSuchAlgorithmException, UnsupportedEncodingException
+ public static char[] getMD5HashedCharArray(Object text) throws NoSuchAlgorithmException, UnsupportedEncodingException
{
- char[] chars = getHash((String)text);
- return new String(chars);
+ byte[] data = ((String)text).getBytes("utf-8");
+
+ MessageDigest md = MessageDigest.getInstance("MD5");
+
+ for (byte b : data)
+ {
+ md.update(b);
+ }
+
+ byte[] digest = md.digest();
+
+ char[] byteArray = new char[digest.length];
+ int index = 0;
+ for (byte b : digest)
+ {
+ byteArray[index++] = (char)b;
+ }
+ return byteArray;
}
public static char[] getHash(String text) throws NoSuchAlgorithmException, UnsupportedEncodingException
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 01eb2ba6a2..1a0a341bbf 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -87,7 +87,7 @@ public class SubscriptionTestHelper implements Subscription
{
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
return false;
}