diff options
Diffstat (limited to 'java')
24 files changed, 549 insertions, 173 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 2e1653e69d..97c95dae5e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -197,7 +197,6 @@ public class AMQChannel _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext); - // TODO: used in clustering only I think (RG) _currentMessage.setPublisher(publisher); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index 146d0566ce..14aa919356 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -70,7 +70,7 @@ public class Main private static final String DEFAULT_CONFIG_FILE = "etc/config.xml"; private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; - + public static final String QPID_HOME = "QPID_HOME"; private static final int IPV4_ADDRESS_LENGTH = 4; private static final char IPV4_LITERAL_SEPARATOR = '.'; @@ -204,7 +204,7 @@ public class Main protected void startup() throws InitException, ConfigurationException, Exception { - final String QpidHome = System.getProperty("QPID_HOME"); + final String QpidHome = System.getProperty(QPID_HOME); final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE); final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath())); if (!configFile.exists()) @@ -213,7 +213,7 @@ public class Main if (QpidHome == null) { - error = error + "\nNote: Qpid_HOME is not set."; + error = error + "\nNote: "+QPID_HOME+" is not set."; } throw new InitException(error, null); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index b2046efee3..e19038504f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -80,18 +80,17 @@ public class AMQMessage */ private boolean _immediate; - private AtomicBoolean _taken = new AtomicBoolean(false); + // private Subscription _takenBySubcription; + // private AtomicBoolean _taken = new AtomicBoolean(false); private TransientMessageData _transientMessageData = new TransientMessageData(); - private Subscription _takenBySubcription; + private Set<Subscription> _rejectedBy = null; + + private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>(); private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(); - public boolean isTaken(AMQQueue queue) - { - return _taken.get(); - } private final int hashcode = System.identityHashCode(this); @@ -206,7 +205,7 @@ public class AMQMessage _immediate = info.isImmediate(); _transientMessageData.setMessagePublishInfo(info); - _taken = new AtomicBoolean(false); +// _taken = new AtomicBoolean(false); if (_log.isDebugEnabled()) { @@ -326,7 +325,6 @@ public class AMQMessage for (AMQQueue q : _transientMessageData.getDestinationQueues()) { - _takenMap.put(q, new AtomicBoolean(false)); _messageHandle.enqueue(storeContext, _messageId, q); } @@ -459,17 +457,53 @@ public class AMQMessage return _deliveredToConsumer; } - - public boolean taken(AMQQueue queue, Subscription sub) + public boolean isTaken(AMQQueue queue) { - if (_taken.getAndSet(true)) + //return _taken.get(); + + synchronized (this) { - return true; + AtomicBoolean taken = _takenMap.get(queue); + if (taken == null) + { + taken = new AtomicBoolean(false); + _takenMap.put(queue, taken); + } + + return taken.get(); } - else + } + + public boolean taken(AMQQueue queue, Subscription sub) + { +// if (_taken.getAndSet(true)) +// { +// return true; +// } +// else +// { +// _takenBySubcription = sub; +// return false; +// } + + synchronized (this) { - _takenBySubcription = sub; - return false; + AtomicBoolean taken = _takenMap.get(queue); + if (taken == null) + { + taken = new AtomicBoolean(false); + } + + if (taken.getAndSet(true)) + { + return true; + } + else + { + _takenMap.put(queue, taken); + _takenBySubcriptionMap.put(queue, sub); + return false; + } } } @@ -479,8 +513,26 @@ public class AMQMessage { _log.trace("Releasing Message:" + debugIdentity()); } - _taken.set(false); - _takenBySubcription = null; + +// _taken.set(false); +// _takenBySubcription = null; + + + synchronized (this) + { + AtomicBoolean taken = _takenMap.get(queue); + if (taken == null) + { + taken = new AtomicBoolean(false); + } + else + { + taken.set(false); + } + + _takenMap.put(queue, taken); + _takenBySubcriptionMap.put(queue, null); + } } public boolean checkToken(Object token) @@ -833,16 +885,20 @@ public class AMQMessage public String toString() { - return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + - _taken + " by :" + _takenBySubcription; +// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + +// _taken + " by :" + _takenBySubcription; -// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + -// _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); + return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); } public Subscription getDeliveredSubscription(AMQQueue queue) { - return _takenBySubcription; +// return _takenBySubcription; + synchronized (this) + { + return _takenBySubcriptionMap.get(queue); + } } public void reject(Subscription subscription) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a418bb8f8a..65d5906d05 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.queue; import java.text.MessageFormat; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -237,8 +236,10 @@ public class AMQQueue implements Managable, Comparable /** * Returns messages within the given range of message Ids + * * @param fromMessageId * @param toMessageId + * * @return List of messages */ public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId) @@ -253,6 +254,7 @@ public class AMQQueue implements Managable, Comparable /** * @param messageId + * * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. */ public AMQMessage getMessageOnTheQueue(long messageId) @@ -267,10 +269,10 @@ public class AMQQueue implements Managable, Comparable /** * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for - * moving messages (stop the async delivery) - get all the messages available in the given message - * id range - setup the other queue for moving messages (stop the async delivery) - send these - * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful, - * remove messages from this queue - remove locks from both queues and start async delivery + * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup + * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue + * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove + * locks from both queues and start async delivery * * @param fromMessageId * @param toMessageId @@ -442,7 +444,7 @@ public class AMQQueue implements Managable, Comparable Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this); - if (subscription.hasFilters()) + if (subscription.filtersMessages()) { if (_deliveryMgr.hasQueuedMessages()) { @@ -641,7 +643,7 @@ public class AMQQueue implements Managable, Comparable { _totalMessagesReceived.incrementAndGet(); } - + try { _managedObject.checkForNotification(msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 979f692361..1f92cee1df 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.Collections; import java.util.HashSet; import java.util.concurrent.Executor; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicInteger; @@ -372,7 +371,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { for (Subscription sub : _subscriptions.getSubscriptions()) { - if (!sub.isSuspended() && sub.hasFilters()) + if (!sub.isSuspended() && sub.filtersMessages()) { Queue<AMQMessage> preDeliveryQueue = sub.getPreDeliveryQueue(); for (AMQMessage msg : messageList) @@ -613,6 +612,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _processingThreadName = Thread.currentThread().getName(); } + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Running process Queue." + currentStatus()); + } + // Continue to process delivery while we haveSubscribers and messages boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); @@ -633,11 +637,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } } + + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Done process Queue." + currentStatus()); + } + } // private void sendNextMessage(Subscription sub) // { -// if (sub.hasFilters()) +// if (sub.filtersMessages()) // { // sendNextMessage(sub, sub.getPreDeliveryQueue()); // if (sub.isAutoClose()) @@ -817,6 +827,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //are we already running? if so, don't re-run if (_processing.compareAndSet(false, true)) { + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Executing Async process."); + } executor.execute(asyncDelivery); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index e9f209839a..e6d5d0c88d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -32,7 +32,7 @@ public interface Subscription void queueDeleted(AMQQueue queue) throws AMQException; - boolean hasFilters(); + boolean filtersMessages(); boolean hasInterest(AMQMessage msg); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index e3944954f3..3bce950ba9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -158,7 +158,7 @@ public class SubscriptionImpl implements Subscription } - if (_filters != null) + if (filtersMessages()) { _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>(); } @@ -346,9 +346,9 @@ public class SubscriptionImpl implements Subscription channel.queueDeleted(queue); } - public boolean hasFilters() + public boolean filtersMessages() { - return _filters != null; + return _filters != null || _noLocal; } public boolean hasInterest(AMQMessage msg) @@ -363,7 +363,10 @@ public class SubscriptionImpl implements Subscription // return false; } - if (_noLocal) + final AMQProtocolSession publisher = msg.getPublisher(); + + //todo - client id should be recoreded and this test removed but handled below + if (_noLocal && publisher != null) { // We don't want local messages so check to see if message is one we sent Object localInstance; @@ -372,8 +375,9 @@ public class SubscriptionImpl implements Subscription if ((protocolSession.getClientProperties() != null) && (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if ((msg.getPublisher().getClientProperties() != null) && - (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + + if ((publisher.getClientProperties() != null) && + (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { if (localInstance == msgInstance || localInstance.equals(msgInstance)) { @@ -388,8 +392,11 @@ public class SubscriptionImpl implements Subscription } else { + localInstance = protocolSession.getClientIdentifier(); - msgInstance = msg.getPublisher().getClientIdentifier(); + //todo - client id should be recoreded and this test removed but handled here + + msgInstance = publisher.getClientIdentifier(); if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) { if (_logger.isTraceEnabled()) @@ -399,7 +406,6 @@ public class SubscriptionImpl implements Subscription } return false; } - } @@ -623,7 +629,7 @@ public class SubscriptionImpl implements Subscription return _resendQueue; } - if (_filters != null) + if (filtersMessages()) { if (isAutoClose()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index 26b040aae0..b500247fa4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -157,7 +157,7 @@ class SubscriptionSet implements WeightedSubscriptionManager //FIXME the queue could be full of sent messages. // Either need to clean all PDQs after sending a message // OR have a clean up thread that runs the PDQs expunging the messages. - if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty()) + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) { return subscription; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java index a43474559d..20f123179f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.security.access; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanOperationParameter; import org.apache.qpid.server.management.MBeanOperation; import org.apache.qpid.server.management.MBeanInvocationHandlerImpl; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; @@ -107,8 +106,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana return UserManagement.TYPE; } - public boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "password", description = "Password")String password) + public boolean setPassword(String username, char[] password) { try { @@ -122,10 +120,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana } } - public boolean setRights(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, - @MBeanOperationParameter(name = "write", description = "Administration write")boolean write, - @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin) + public boolean setRights(String username, boolean read, boolean write, boolean admin) { if (_accessRights.get(username) == null) @@ -179,11 +174,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana return true; } - public boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "password", description = "Password")String password, - @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, - @MBeanOperationParameter(name = "write", description = "Administration write")boolean write, - @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin) + public boolean createUser(String username, char[] password, boolean read, boolean write, boolean admin) { if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password)) { @@ -195,7 +186,7 @@ public class AMQUserManagementMBean extends AMQManagedObject implements UserMana return false; } - public boolean deleteUser(@MBeanOperationParameter(name = "username", description = "Username")String username) + public boolean deleteUser(String username) { try diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java index 6381213398..ce5e9fa4a7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java @@ -45,7 +45,7 @@ public interface UserManagement */ @MBeanOperation(name = "setPassword", description = "Set password for user.") boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "password", description = "Password")String password); + @MBeanOperationParameter(name = "password", description = "Password")char[] password); /** * set rights for users with given details @@ -76,7 +76,7 @@ public interface UserManagement */ @MBeanOperation(name = "createUser", description = "Create new user from system.") boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "password", description = "Password")String password, + @MBeanOperationParameter(name = "password", description = "Password")char[] password, @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, @MBeanOperationParameter(name = "write", description = "Administration write")boolean write, @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin); diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java index 956db64d90..cd0a371b48 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java @@ -176,7 +176,7 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase } } - public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException + public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException { User user = _users.get(principal.getName()); @@ -187,13 +187,10 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase try { - - char[] passwd = convertPassword(password); - try { _userUpdate.lock(); - user.setPassword(passwd); + user.setPassword(password); try { @@ -215,7 +212,7 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase } } } - catch (UnsupportedEncodingException e) + catch (Exception e) { return false; } @@ -237,23 +234,14 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase return passwd; } - public boolean createPrincipal(Principal principal, String password) + public boolean createPrincipal(Principal principal, char[] password) { if (_users.get(principal.getName()) != null) { return false; } - User user; - try - { - user = new User(principal.getName(), convertPassword(password)); - } - catch (UnsupportedEncodingException e) - { - _logger.warn("Unable to encode password:" + e); - return false; - } + User user = new User(principal.getName(), password); try { @@ -598,8 +586,13 @@ public class Base64MD5PasswordFilePrincipalDatabase implements PrincipalDatabase private void encodePassword() throws EncoderException, UnsupportedEncodingException, NoSuchAlgorithmException { - Base64 b64 = new Base64(); - _encodedPassword = b64.encode(new String(_password).getBytes(DEFAULT_ENCODING)); + byte[] byteArray = new byte[_password.length]; + int index = 0; + for (char c : _password) + { + byteArray[index++] = (byte)c; + } + _encodedPassword = (new Base64()).encode(byteArray); } public boolean isModified() diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java index 3f6794aaaf..90d08c963e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java @@ -151,12 +151,12 @@ public class PlainPasswordFilePrincipalDatabase implements PrincipalDatabase return passwd; } - public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException + public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException { return false; // updates denied } - public boolean createPrincipal(Principal principal, String password) + public boolean createPrincipal(Principal principal, char[] password) { return false; // updates denied } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java index 8073fcc3c6..494d8e0bf4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabase.java @@ -65,7 +65,7 @@ public interface PrincipalDatabase * @return True if change was successful * @throws AccountNotFoundException If the given principal doesn't exist in the Database */ - boolean updatePassword(Principal principal, String password) + boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException; /** @@ -74,7 +74,7 @@ public interface PrincipalDatabase * @param password The password to set for the principal * @return True on a successful creation */ - boolean createPrincipal(Principal principal, String password); + boolean createPrincipal(Principal principal, char[] password); /** * Delete a principal diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java index b1ac0e1f00..74c330f606 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java @@ -93,12 +93,12 @@ public class PropertiesPrincipalDatabase implements PrincipalDatabase } } - public boolean updatePassword(Principal principal, String password) throws AccountNotFoundException + public boolean updatePassword(Principal principal, char[] password) throws AccountNotFoundException { return false; // updates denied } - public boolean createPrincipal(Principal principal, String password) + public boolean createPrincipal(Principal principal, char[] password) { return false; // updates denied } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8bb5b622f7..7b65f279be 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -199,11 +199,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private final Object _suspensionLock = new Object(); - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ + /** Boolean to control immediate prefetch . Records the first call to the dispatcher to prevent further flow(true) */ + private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true); + + /** System property to enable immediate message prefetching */ + public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH"; + /** Immediate message prefetch default */ + public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; + private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); - private AtomicBoolean _firstDispatcher = new AtomicBoolean(true); + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread { @@ -1932,20 +1939,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { - if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false"))) + // If IMMEDIATE_PREFETCH is not set then we need to start fetching + if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT))) { -// if (!connectionStopped) + // We do this now if this is the first call on a started connection + if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false)) { - if (isSuspended() && _firstDispatcher.getAndSet(false)) + try { - try - { - suspendChannel(false); - } - catch (AMQException e) - { - _logger.info("Suspending channel threw an exception:" + e); - } + suspendChannel(false); + } + catch (AMQException e) + { + _logger.info("Suspending channel threw an exception:" + e); } } } @@ -1998,11 +2004,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - // The dispatcher will be null if we have just created this session - // so suspend the channel before we register our consumer so that we don't - // start prefetching until a receive/mListener is set. - if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false"))) + // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch + if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT))) { + // The dispatcher will be null if we have just created this session + // so suspend the channel before we register our consumer so that we don't + // start prefetching until a receive/mListener is set. if (_dispatcher == null) { if (!isSuspended()) @@ -2010,6 +2017,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { suspendChannel(true); + _logger.info("Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); } catch (AMQException e) { @@ -2018,6 +2026,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } } + else + { + _logger.info("Immediately prefetching existing messages to new consumer."); + } try { diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java new file mode 100644 index 0000000000..9e48914431 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import java.util.Hashtable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; + +import junit.framework.TestCase; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery + * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread + * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at + * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple + * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting + * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining + * messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class MessageListenerMultiConsumerImmediatePrefetch extends MessageListenerMultiConsumerTest +{ + + + protected void setUp() throws Exception + { + + System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true"); + super.setUp(); + + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(MessageListenerMultiConsumerImmediatePrefetch.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index 794fd5c8c1..c9407d8ff6 100644 --- a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -62,7 +62,8 @@ public class MessageListenerMultiConsumerTest extends TestCase private Connection _clientConnection; private MessageConsumer _consumer1; private MessageConsumer _consumer2; - + private Session _clientSession1; + private Queue _queue; private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock @@ -76,25 +77,25 @@ public class MessageListenerMultiConsumerTest extends TestCase Hashtable<String, String> env = new Hashtable<String, String>(); env.put("connectionfactory.connection", "amqp://guest:guest@MLT_ID/test?brokerlist='vm://:1'"); - env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + env.put("queue.queue", "direct://amq.direct//"+this.getClass().getName()); _context = factory.getInitialContext(env); - Queue queue = (Queue) _context.lookup("queue"); + _queue = (Queue) _context.lookup("queue"); //Create Client 1 _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); _clientConnection.start(); - Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _consumer1 = clientSession1.createConsumer(queue); + _consumer1 = _clientSession1.createConsumer(_queue); //Create Client 2 Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _consumer2 = clientSession2.createConsumer(queue); + _consumer2 = clientSession2.createConsumer(_queue); //Create Producer Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); @@ -104,7 +105,7 @@ public class MessageListenerMultiConsumerTest extends TestCase Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); + MessageProducer producer = producerSession.createProducer(_queue); for (int msg = 0; msg < MSG_COUNT; msg++) { @@ -123,20 +124,6 @@ public class MessageListenerMultiConsumerTest extends TestCase TransportConnection.killAllVMBrokers(); } -// public void testRecieveC1thenC2() throws Exception -// { -// -// for (int msg = 0; msg < MSG_COUNT / 2; msg++) -// { -// -// assertTrue(_consumer1.receive() != null); -// } -// -// for (int msg = 0; msg < MSG_COUNT / 2; msg++) -// { -// assertTrue(_consumer2.receive() != null); -// } -// } public void testRecieveInterleaved() throws Exception { @@ -206,10 +193,12 @@ public class MessageListenerMultiConsumerTest extends TestCase assertEquals(MSG_COUNT, receivedCount1 + receivedCount2); } - public void testRecieveC2Only_OnlyRunWith_REGISTER_CONSUMERS_FLOWED() throws Exception + public void testRecieveC2Only() throws Exception { - if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false"))) + if (!Boolean.parseBoolean(System.getProperties(). + getProperty(AMQSession.IMMEDIATE_PREFETCH, AMQSession.IMMEDIATE_PREFETCH_DEFAULT))) { + _logger.info("Performing Receive only on C2"); for (int msg = 0; msg < MSG_COUNT; msg++) { assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg, @@ -218,6 +207,43 @@ public class MessageListenerMultiConsumerTest extends TestCase } } + public void testRecieveBoth() throws Exception + { + if (!Boolean.parseBoolean(System.getProperties(). + getProperty(AMQSession.IMMEDIATE_PREFETCH, AMQSession.IMMEDIATE_PREFETCH_DEFAULT))) + { + _logger.info("Performing Receive only with two consumers on one session "); + + MessageConsumer consumer2 = _clientSession1.createConsumer(_queue); + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + + assertTrue(_consumer1.receive() != null); + } + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + assertTrue(consumer2.receive() != null); + } + } + else + { + _logger.info("Performing Receive only on both C1 and C2"); + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + + assertTrue(_consumer1.receive() != null); + } + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + assertTrue(_consumer2.receive() != null); + } + } + } + public static junit.framework.Test suite() { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java new file mode 100644 index 0000000000..505af361bc --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/close/CloseTests.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.basic.close; + +import junit.framework.TestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.url.AMQBindingURL; +import org.apache.log4j.Logger; + +import javax.jms.Session; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; + +public class CloseTests extends TestCase +{ + + private static final Logger _logger = Logger.getLogger(CloseTests.class); + + + private static final String BROKER = "vm://:1"; + + + protected void setUp() throws Exception + { + super.setUp(); + + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + super.setUp(); + + TransportConnection.killVMBroker(1); + } + + + public void testCloseQueueReceiver() throws AMQException, URLSyntaxException, JMSException + { + AMQConnection connection = new AMQConnection(BROKER, "guest", "guest", this.getName(), "test"); + + Session session = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); + + AMQQueue queue = new AMQQueue(new AMQBindingURL("test-queue")); + MessageConsumer consumer = session.createConsumer(queue); + + MessageProducer producer_not_used_but_created_for_testing = session.createProducer(queue); + + connection.start(); + + _logger.info("About to close consumer"); + + consumer.close(); + + _logger.info("Closed Consumer"); + + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 190b3861f0..15cb9678e4 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -36,9 +36,11 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; public class DurableSubscriptionTest extends TestCase { + private static final Logger _logger = Logger.getLogger(DurableSubscriptionTest.class); protected void setUp() throws Exception { @@ -55,41 +57,59 @@ public class DurableSubscriptionTest extends TestCase public void testUnsubscribe() throws AMQException, JMSException, URLSyntaxException { AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); - AMQTopic topic = new AMQTopic(con,"MyTopic"); + AMQTopic topic = new AMQTopic(con, "MyTopic"); + _logger.info("Create Session 1"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + _logger.info("Create Consumer on Session 1"); MessageConsumer consumer1 = session1.createConsumer(topic); + _logger.info("Create Producer on Session 1"); MessageProducer producer = session1.createProducer(topic); + _logger.info("Create Session 2"); Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + _logger.info("Create Durable Subscriber on Session 2"); TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); + _logger.info("Starting connection"); con.start(); + _logger.info("Producer sending message A"); producer.send(session1.createTextMessage("A")); Message msg; + _logger.info("Receive message on consumer 1:expecting A"); msg = consumer1.receive(); assertEquals("A", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(1000); assertEquals(null, msg); + + _logger.info("Receive message on consumer 1:expecting A"); msg = consumer2.receive(); assertEquals("A", ((TextMessage) msg).getText()); msg = consumer2.receive(1000); + _logger.info("Receive message on consumer 1 :expecting null"); assertEquals(null, msg); + _logger.info("Unsubscribe session2/consumer2"); session2.unsubscribe("MySubscription"); + _logger.info("Producer sending message B"); producer.send(session1.createTextMessage("B")); + _logger.info("Receive message on consumer 1 :expecting B"); msg = consumer1.receive(); assertEquals("B", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(1000); assertEquals(null, msg); + _logger.info("Receive message on consumer 2 :expecting null"); msg = consumer2.receive(1000); assertEquals(null, msg); + _logger.info("Close connection"); con.close(); } @@ -97,7 +117,7 @@ public class DurableSubscriptionTest extends TestCase { AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); - AMQTopic topic = new AMQTopic(con,"MyTopic"); + AMQTopic topic = new AMQTopic(con, "MyTopic"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer1 = session1.createConsumer(topic); @@ -129,13 +149,17 @@ public class DurableSubscriptionTest extends TestCase producer.send(session1.createTextMessage("B")); + _logger.info("Receive message on consumer 1 :expecting B"); msg = consumer1.receive(100); assertEquals("B", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(100); assertEquals(null, msg); + _logger.info("Receive message on consumer 3 :expecting B"); msg = consumer3.receive(100); assertEquals("B", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 3 :expecting null"); msg = consumer3.receive(100); assertEquals(null, msg); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index fe7efb4e88..a19687b07c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -38,11 +38,11 @@ import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.transport.TransportConnection; -/** - * @author Apache Software Foundation - */ +/** @author Apache Software Foundation */ public class TopicSessionTest extends TestCase { + private static final String BROKER = "vm://:1"; + protected void setUp() throws Exception { super.setUp(); @@ -53,17 +53,16 @@ public class TopicSessionTest extends TestCase { super.tearDown(); TransportConnection.killAllVMBrokers(); - //Thread.sleep(2000); } public void testTopicSubscriptionUnsubscription() throws Exception { - AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); - AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(),"MyTopic"); + AMQConnection con = new AMQConnection(BROKER+"?retries='0'", "guest", "guest", "test", "test"); + AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); - TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0"); + TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); TopicPublisher publisher = session1.createPublisher(topic); con.start(); @@ -81,11 +80,11 @@ public class TopicSessionTest extends TestCase session1.unsubscribe("not a subscription"); fail("expected InvalidDestinationException when unsubscribing from unknown subscription"); } - catch(InvalidDestinationException e) + catch (InvalidDestinationException e) { ; // PASS } - catch(Exception e) + catch (Exception e) { fail("expected InvalidDestinationException when unsubscribing from unknown subscription, got: " + e); } @@ -106,8 +105,8 @@ public class TopicSessionTest extends TestCase private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception { AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); - AMQTopic topic = new AMQTopic(con,"MyTopic1" + String.valueOf(shutdown)); - AMQTopic topic2 = new AMQTopic(con,"MyOtherTopic1" + String.valueOf(shutdown)); + AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown)); + AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown)); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); @@ -145,7 +144,7 @@ public class TopicSessionTest extends TestCase public void testUnsubscriptionAfterConnectionClose() throws Exception { AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); - AMQTopic topic = new AMQTopic(con1,"MyTopic3"); + AMQTopic topic = new AMQTopic(con1, "MyTopic3"); TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); @@ -176,7 +175,7 @@ public class TopicSessionTest extends TestCase { AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); - AMQTopic topic = new AMQTopic(con,"MyTopic4"); + AMQTopic topic = new AMQTopic(con, "MyTopic4"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); MessageConsumer consumer1 = session1.createConsumer(topic); @@ -226,11 +225,11 @@ public class TopicSessionTest extends TestCase producer.send(sentMessage); TextMessage receivedMessage = (TextMessage) consumer.receive(2000); assertNotNull(receivedMessage); - assertEquals(sentMessage.getText(),receivedMessage.getText()); + assertEquals(sentMessage.getText(), receivedMessage.getText()); producer.send(sentMessage); receivedMessage = (TextMessage) consumer.receive(2000); assertNotNull(receivedMessage); - assertEquals(sentMessage.getText(),receivedMessage.getText()); + assertEquals(sentMessage.getText(), receivedMessage.getText()); conn.close(); @@ -248,14 +247,14 @@ public class TopicSessionTest extends TestCase producer.send(session.createTextMessage("hello")); TextMessage tm = (TextMessage) consumer.receive(2000); assertNotNull(tm); - assertEquals("hello",tm.getText()); + assertEquals("hello", tm.getText()); try { topic.delete(); fail("Expected JMSException : should not be able to delete while there are active consumers"); } - catch(JMSException je) + catch (JMSException je) { ; //pass } @@ -266,7 +265,7 @@ public class TopicSessionTest extends TestCase { topic.delete(); } - catch(JMSException je) + catch (JMSException je) { fail("Unexpected Exception: " + je.getMessage()); } @@ -283,11 +282,92 @@ public class TopicSessionTest extends TestCase } - conn.close(); } + public void testNoLocal() throws Exception + { + + AMQConnection con = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test"); + + AMQTopic topic = new AMQTopic(con, "testNoLocal"); + + TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSubscriber noLocal = session1.createDurableSubscriber(topic, "noLocal", "", true); + TopicSubscriber select = session1.createDurableSubscriber(topic, "select", "Selector = 'select'", false); + TopicSubscriber normal = session1.createDurableSubscriber(topic, "normal"); + + TopicPublisher publisher = session1.createPublisher(topic); + + con.start(); + TextMessage m; + TextMessage message; + + //send message to all consumers + publisher.publish(session1.createTextMessage("hello-new2")); + + //test normal subscriber gets message + m = (TextMessage) normal.receive(1000); + assertNotNull(m); + + //test selector subscriber doesn't message + m = (TextMessage) select.receive(1000); + assertNull(m); + + //test nolocal subscriber doesn't message + m = (TextMessage) noLocal.receive(1000); + if (m != null) + { + System.out.println("Message:" + m.getText()); + } + assertNull(m); + + //send message to all consumers + message = session1.createTextMessage("hello2"); + message.setStringProperty("Selector", "select"); + + publisher.publish(message); + + //test normal subscriber gets message + m = (TextMessage) normal.receive(1000); + assertNotNull(m); + + //test selector subscriber does get message + m = (TextMessage) select.receive(100); + assertNotNull(m); + + //test nolocal subscriber doesn't message + m = (TextMessage) noLocal.receive(100); + assertNull(m); + + AMQConnection con2 = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test2", "test"); + TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicPublisher publisher2 = session2.createPublisher(topic); + + + message = session2.createTextMessage("hello2"); + message.setStringProperty("Selector", "select"); + + publisher2.publish(message); + + //test normal subscriber gets message + m = (TextMessage) normal.receive(1000); + assertNotNull(m); + + //test selector subscriber does get message + m = (TextMessage) select.receive(100); + assertNotNull(m); + + //test nolocal subscriber does message + m = (TextMessage) noLocal.receive(100); + assertNotNull(m); + + + con.close(); + con2.close(); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(TopicSessionTest.class); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index a5ace41752..42412bebae 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -102,7 +102,7 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage } } - public boolean hasFilters() + public boolean filtersMessages() { return false; } diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java index 2ac037e4f0..60d8f7920d 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java @@ -31,6 +31,7 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularDataSupport; import static org.apache.qpid.management.ui.Constants.*; + import org.apache.qpid.management.ui.ApplicationRegistry; import org.apache.qpid.management.ui.ManagedBean; import org.apache.qpid.management.ui.jmx.MBeanUtility; @@ -337,10 +338,17 @@ public class OperationTabControl extends TabControl // display the parameter data type next to the text field if (valueInCombo) + { label = _toolkit.createLabel(_paramsComposite, ""); + } + else if (PASSWORD.equalsIgnoreCase(param.getName())) + { + label = _toolkit.createLabel(_paramsComposite, "(String)"); + } else { - String str = param.getType() ; + String str = param.getType(); + if (param.getType().lastIndexOf(".") != -1) str = param.getType().substring(1 + param.getType().lastIndexOf(".")); @@ -581,34 +589,32 @@ public class OperationTabControl extends TabControl } // End of custom code - - // customized for passwords - if (PASSWORD.equalsIgnoreCase(param.getName())) + ViewUtility.popupInfoMessage(_form.getText(), "Please select the " + ViewUtility.getDisplayText(param.getName())); + return; + } + + // customized for passwords + String securityMechanism = ApplicationRegistry.getSecurityMechanism(); + if ((MECH_CRAMMD5.equals(securityMechanism)) && PASSWORD.equalsIgnoreCase(param.getName())) + { + try { - try - { - param.setValueFromString(ViewUtility.getHashedString(param.getValue())); - } - catch (Exception ex) - { - MBeanUtility.handleException(_mbean, ex); - return; - } + param.setValue(ViewUtility.getMD5HashedCharArray(param.getValue())); + } + catch (Exception ex) + { + MBeanUtility.handleException(_mbean, ex); + return; } - // end of customization - ViewUtility.popupInfoMessage(_form.getText(), - "Please select the " + ViewUtility.getDisplayText(param.getName())); - - return; } + // end of customization } } if (_opData.getImpact() == OPERATION_IMPACT_ACTION) { String bean = _mbean.getName() == null ? _mbean.getType() : _mbean.getName(); - int response = ViewUtility.popupConfirmationMessage(bean, - "Do you want to " + _form.getText()+ " ?"); + int response = ViewUtility.popupConfirmationMessage(bean, "Do you want to " + _form.getText()+ " ?"); if (response == SWT.YES) { executeAndShowResults(); diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java index 9b5cddd342..89ab360937 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java @@ -560,10 +560,26 @@ public class ViewUtility } } - public static String getHashedString(Object text) throws NoSuchAlgorithmException, UnsupportedEncodingException + public static char[] getMD5HashedCharArray(Object text) throws NoSuchAlgorithmException, UnsupportedEncodingException { - char[] chars = getHash((String)text); - return new String(chars); + byte[] data = ((String)text).getBytes("utf-8"); + + MessageDigest md = MessageDigest.getInstance("MD5"); + + for (byte b : data) + { + md.update(b); + } + + byte[] digest = md.digest(); + + char[] byteArray = new char[digest.length]; + int index = 0; + for (byte b : digest) + { + byteArray[index++] = (char)b; + } + return byteArray; } public static char[] getHash(String text) throws NoSuchAlgorithmException, UnsupportedEncodingException diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 01eb2ba6a2..1a0a341bbf 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -87,7 +87,7 @@ public class SubscriptionTestHelper implements Subscription { } - public boolean hasFilters() + public boolean filtersMessages() { return false; } |
