diff options
| author | Robert Greig <rgreig@apache.org> | 2007-01-08 17:02:26 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2007-01-08 17:02:26 +0000 |
| commit | d6b4e65f3fd1ff4a2763f8068cd6b3f7fe0b84e0 (patch) | |
| tree | f0c608bcb9e4e5af6cd7ca5245401d2d1716b4f3 /java/broker/src/main | |
| parent | 61350c8523e2edca63d8a9ab2c970ad8607d4c0a (diff) | |
| download | qpid-python-d6b4e65f3fd1ff4a2763f8068cd6b3f7fe0b84e0.tar.gz | |
QPID-255 : Patch Supplied by Rob Godfrey - Change to use bespoke AMQShortString rather than converting to String
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@494121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
48 files changed, 246 insertions, 233 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 509f57be7f..d7326b4c64 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import javax.management.JMException; import javax.management.MBeanException; @@ -81,10 +82,10 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr { synchronized (_exchangeRegistry) { - Exchange exchange = _exchangeRegistry.getExchange(exchangeName); + Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName)); if (exchange == null) { - exchange = _exchangeFactory.createExchange(exchangeName, type, durable, autoDelete, 0); + exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0); _exchangeRegistry.registerExchange(exchange); } else @@ -114,7 +115,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr // when there are no bindings. try { - _exchangeRegistry.unregisterExchange(exchangeName, false); + _exchangeRegistry.unregisterExchange(new AMQShortString(exchangeName), false); } catch (AMQException ex) { @@ -135,7 +136,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr public void createNewQueue(String queueName, boolean durable, String owner, boolean autoDelete) throws JMException { - AMQQueue queue = _queueRegistry.getQueue(queueName); + AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName)); if (queue != null) { throw new JMException("The queue \"" + queueName + "\" already exists."); @@ -143,7 +144,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr try { - queue = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry); + queue = new AMQQueue(new AMQShortString(queueName), durable, new AMQShortString(owner), autoDelete, _queueRegistry); if (queue.isDurable() && !queue.isAutoDelete()) { _messageStore.createQueue(queue); @@ -164,7 +165,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr */ public void deleteQueue(String queueName) throws JMException { - AMQQueue queue = _queueRegistry.getQueue(queueName); + AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName)); if (queue == null) { throw new JMException("The Queue " + queueName + " is not a registerd queue."); diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 999eb9f651..799b085fb2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -22,10 +22,7 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.*; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; @@ -88,7 +85,7 @@ public class AMQChannel /** * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */ - private final Map<String, AMQQueue> _consumerTag2QueueMap = new TreeMap<String, AMQQueue>(); + private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>(); private final MessageStore _messageStore; @@ -270,12 +267,12 @@ public class AMQChannel * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ - public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, + public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { - tag = "sgen_" + getNextConsumerTag(); + tag = new AMQShortString("sgen_" + getNextConsumerTag()); } if (_consumerTag2QueueMap.containsKey(tag)) { @@ -288,7 +285,7 @@ public class AMQChannel } - public void unsubscribeConsumer(AMQProtocolSession session, String consumerTag) throws AMQException + public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException { AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); if (q != null) @@ -312,7 +309,7 @@ public class AMQChannel private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException { _log.info("Unsubscribing all consumers on channel " + toString()); - for (Map.Entry<String, AMQQueue> me : _consumerTag2QueueMap.entrySet()) + for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet()) { me.getValue().unregisterProtocolSession(session, _channelId, me.getKey()); } @@ -327,7 +324,7 @@ public class AMQChannel * the delivery tag) * @param queue the queue from which the message was delivered */ - public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue) + public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue) { _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); checkSuspension(); @@ -362,7 +359,7 @@ public class AMQChannel public boolean callback(UnacknowledgedMessage message) throws AMQException { long deliveryTag = message.deliveryTag; - String consumerTag = message.consumerTag; + AMQShortString consumerTag = message.consumerTag; AMQMessage msg = message.message; msg.setRedelivered(true); msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); @@ -437,7 +434,7 @@ public class AMQChannel return _unacknowledgedMessageMap; } - public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, String consumerTag, AMQQueue queue) + public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, AMQShortString consumerTag, AMQQueue queue) { _browsedAcks.add(deliveryTag); addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); @@ -524,7 +521,7 @@ public class AMQChannel for (RequiredDeliveryException bouncedMessage : _returnMessages) { AMQMessage message = bouncedMessage.getAMQMessage(); - message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), bouncedMessage.getMessage()); + message.writeReturn(session, _channelId, bouncedMessage.getReplyCode(), new AMQShortString(bouncedMessage.getMessage())); } _returnMessages.clear(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index 26f41e19af..ac390718c6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.ack; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoreContext; @@ -28,11 +29,11 @@ import org.apache.qpid.server.store.StoreContext; public class UnacknowledgedMessage { public final AMQMessage message; - public final String consumerTag; + public final AMQShortString consumerTag; public final long deliveryTag; public AMQQueue queue; - public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag, long deliveryTag) + public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, AMQShortString consumerTag, long deliveryTag) { this.queue = queue; this.message = message; diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 1f4333549a..0677494134 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -24,6 +24,7 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import java.util.*; @@ -180,7 +181,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) { long deliveryTag = entry.getKey(); - String consumerTag = entry.getValue().consumerTag; + AMQShortString consumerTag = entry.getValue().consumerTag; AMQMessage msg = entry.getValue().message; msg.writeDeliver(protocolSession, channelId, deliveryTag, consumerTag); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 9ecbf3d31a..7e807304c8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.log4j.Logger; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; @@ -157,7 +158,7 @@ public class VirtualHostConfiguration private void bind(AMQBindingURL binding) throws AMQException, ConfigurationException { - String queueName = binding.getQueueName(); + AMQShortString queueName = binding.getQueueName(); // This will occur if the URL is a Topic if (queueName == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index d5ca567308..94c792c358 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -31,7 +32,7 @@ import javax.management.ObjectName; public abstract class AbstractExchange implements Exchange, Managable { - private String _name; + private AMQShortString _name; protected boolean _durable; protected String _exchangeType; @@ -58,12 +59,12 @@ public abstract class AbstractExchange implements Exchange, Managable public String getObjectInstanceName() { - return _name; + return _name.toString(); } public String getName() { - return _name; + return _name.toString(); } public String getExchangeType() @@ -95,7 +96,7 @@ public abstract class AbstractExchange implements Exchange, Managable } // End of MBean class - public String getName() + public AMQShortString getName() { return _name; } @@ -107,7 +108,7 @@ public abstract class AbstractExchange implements Exchange, Managable */ protected abstract ExchangeMBean createMBean() throws AMQException; - public void initialise(String name, boolean durable, int ticket, boolean autoDelete) throws AMQException + public void initialise(AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { _name = name; _durable = durable; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 0c73e0f9f0..222cd2aef2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; import java.util.HashMap; import java.util.Map; @@ -30,16 +32,16 @@ public class DefaultExchangeFactory implements ExchangeFactory { private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class); - private Map<String, Class<? extends Exchange>> _exchangeClassMap = new HashMap<String, Class<? extends Exchange>>(); + private Map<AMQShortString, Class<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, Class<? extends Exchange>>(); public DefaultExchangeFactory() { - _exchangeClassMap.put("direct", org.apache.qpid.server.exchange.DestNameExchange.class); - _exchangeClassMap.put("topic", org.apache.qpid.server.exchange.DestWildExchange.class); - _exchangeClassMap.put("headers", org.apache.qpid.server.exchange.HeadersExchange.class); + _exchangeClassMap.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestNameExchange.class); + _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class); + _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class); } - public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete, + public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, int ticket) throws AMQException { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 99c08ad200..cadcd22001 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.protocol.ExchangeInitialiser; import org.apache.qpid.server.queue.AMQMessage; @@ -35,7 +36,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry /** * Maps from exchange name to exchange instance */ - private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>(); + private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>(); public DefaultExchangeRegistry(ExchangeFactory exchangeFactory) { @@ -55,7 +56,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry _exchangeMap.put(exchange.getName(), exchange); } - public void unregisterExchange(String name, boolean inUse) throws AMQException + public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException { // TODO: check inUse argument Exchange e = _exchangeMap.remove(name); @@ -69,7 +70,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry } } - public Exchange getExchange(String name) + public Exchange getExchange(AMQShortString name) { return _exchangeMap.get(name); } @@ -81,7 +82,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry */ public void routeContent(AMQMessage payload) throws AMQException { - final String exchange = payload.getPublishBody().exchange; + final AMQShortString exchange = payload.getPublishBody().exchange; final Exchange exch = _exchangeMap.get(exchange); // there is a small window of opportunity for the exchange to be deleted in between // the BasicPublish being received (where the exchange is validated) and the final diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index 7b28161263..dc65297615 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; @@ -83,21 +84,21 @@ public class DestNameExchange extends AbstractExchange public TabularData bindings() throws OpenDataException { - Map<String, List<AMQQueue>> bindings = _index.getBindingsMap(); + Map<AMQShortString, List<AMQQueue>> bindings = _index.getBindingsMap(); _bindingList = new TabularDataSupport(_bindinglistDataType); - for (Map.Entry<String, List<AMQQueue>> entry : bindings.entrySet()) + for (Map.Entry<AMQShortString, List<AMQQueue>> entry : bindings.entrySet()) { - String key = entry.getKey(); + AMQShortString key = entry.getKey(); List<String> queueList = new ArrayList<String>(); List<AMQQueue> queues = entry.getValue(); for (AMQQueue q : queues) { - queueList.add(q.getName()); + queueList.add(q.getName().toString()); } - Object[] bindingItemValues = {key, queueList.toArray(new String[0])}; + Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])}; CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); _bindingList.put(bindingData); } @@ -107,7 +108,7 @@ public class DestNameExchange extends AbstractExchange public void createNewBinding(String queueName, String binding) throws JMException { - AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName); + AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName)); if (queue == null) { throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); @@ -115,8 +116,8 @@ public class DestNameExchange extends AbstractExchange try { - registerQueue(binding, queue, null); - queue.bind(binding, DestNameExchange.this); + registerQueue(new AMQShortString(binding), queue, null); + queue.bind(new AMQShortString(binding), DestNameExchange.this); } catch (AMQException ex) { @@ -140,7 +141,7 @@ public class DestNameExchange extends AbstractExchange } } - public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { assert queue != null; assert routingKey != null; @@ -154,7 +155,7 @@ public class DestNameExchange extends AbstractExchange } } - public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException + public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException { assert queue != null; assert routingKey != null; @@ -169,7 +170,7 @@ public class DestNameExchange extends AbstractExchange public void route(AMQMessage payload) throws AMQException { final BasicPublishBody publishBody = payload.getPublishBody(); - final String routingKey = publishBody.routingKey; + final AMQShortString routingKey = publishBody.routingKey; final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey); if (queues == null || queues.isEmpty()) { @@ -197,13 +198,13 @@ public class DestNameExchange extends AbstractExchange } } - public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException { final List<AMQQueue> queues = _index.get(routingKey); return queues != null && queues.contains(queue); } - public boolean isBound(String routingKey) throws AMQException + public boolean isBound(AMQShortString routingKey) throws AMQException { final List<AMQQueue> queues = _index.get(routingKey); return queues != null && !queues.isEmpty(); @@ -211,7 +212,7 @@ public class DestNameExchange extends AbstractExchange public boolean isBound(AMQQueue queue) throws AMQException { - Map<String, List<AMQQueue>> bindings = _index.getBindingsMap(); + Map<AMQShortString, List<AMQQueue>> bindings = _index.getBindingsMap(); for (List<AMQQueue> queues : bindings.values()) { if (queues.contains(queue)) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index c341f30ab6..179dc0e9ef 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; @@ -43,7 +44,7 @@ public class DestWildExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(DestWildExchange.class); - private ConcurrentHashMap<String, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<String, List<AMQQueue>>(); + private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); /** * DestWildExchangeMBean class implements the management interface for the @@ -87,18 +88,18 @@ public class DestWildExchange extends AbstractExchange public TabularData bindings() throws OpenDataException { _bindingList = new TabularDataSupport(_bindinglistDataType); - for (Map.Entry<String, List<AMQQueue>> entry : _routingKey2queues.entrySet()) + for (Map.Entry<AMQShortString, List<AMQQueue>> entry : _routingKey2queues.entrySet()) { - String key = entry.getKey(); + AMQShortString key = entry.getKey(); List<String> queueList = new ArrayList<String>(); List<AMQQueue> queues = entry.getValue(); for (AMQQueue q : queues) { - queueList.add(q.getName()); + queueList.add(q.getName().toString()); } - Object[] bindingItemValues = {key, queueList.toArray(new String[0])}; + Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])}; CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); _bindingList.put(bindingData); } @@ -108,14 +109,14 @@ public class DestWildExchange extends AbstractExchange public void createNewBinding(String queueName, String binding) throws JMException { - AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName); + AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName)); if (queue == null) throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); try { - registerQueue(binding, queue, null); - queue.bind(binding, DestWildExchange.this); + registerQueue(new AMQShortString(binding), queue, null); + queue.bind(new AMQShortString(binding), DestWildExchange.this); } catch (AMQException ex) { @@ -126,7 +127,7 @@ public class DestWildExchange extends AbstractExchange } // End of MBean class - public synchronized void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException + public synchronized void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { assert queue != null; assert routingKey != null; @@ -154,7 +155,7 @@ public class DestWildExchange extends AbstractExchange { BasicPublishBody publishBody = payload.getPublishBody(); - final String routingKey = publishBody.routingKey; + final AMQShortString routingKey = publishBody.routingKey; List<AMQQueue> queues = _routingKey2queues.get(routingKey); // if we have no registered queues we have nothing to do // TODO: add support for the immediate flag @@ -175,14 +176,14 @@ public class DestWildExchange extends AbstractExchange } } - public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException { List<AMQQueue> queues = _routingKey2queues.get(routingKey); return queues != null && queues.contains(queue); } - public boolean isBound(String routingKey) throws AMQException + public boolean isBound(AMQShortString routingKey) throws AMQException { List<AMQQueue> queues = _routingKey2queues.get(routingKey); return queues != null && !queues.isEmpty(); @@ -205,7 +206,7 @@ public class DestWildExchange extends AbstractExchange return !_routingKey2queues.isEmpty(); } - public synchronized void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException + public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException { assert queue != null; assert routingKey != null; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 8ef5f0ab29..7ba9ddd5a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -22,14 +22,15 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQMessage; public interface Exchange { - String getName(); + AMQShortString getName(); - void initialise(String name, boolean durable, int ticket, boolean autoDelete) throws AMQException; + void initialise(AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException; boolean isDurable(); @@ -42,9 +43,9 @@ public interface Exchange void close() throws AMQException; - void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException; + void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; - void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException; + void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException; void route(AMQMessage message) throws AMQException; @@ -55,7 +56,7 @@ public interface Exchange * @return * @throws AMQException */ - boolean isBound(String routingKey, AMQQueue queue) throws AMQException; + boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException; /** * Determines whether a message is routing to any queue using a specific routing key @@ -63,7 +64,7 @@ public interface Exchange * @return * @throws AMQException */ - boolean isBound(String routingKey) throws AMQException; + boolean isBound(AMQShortString routingKey) throws AMQException; boolean isBound(AMQQueue queue) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java index 37ba883bc3..e07fd0b8fc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java @@ -21,11 +21,12 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; public interface ExchangeFactory { - Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete, + Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, int ticket) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index 4a0a6a0ee1..efcb963f8b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; public interface ExchangeRegistry extends MessageRouter @@ -34,7 +35,7 @@ public interface ExchangeRegistry extends MessageRouter * @throws ExchangeInUseException when the exchange cannot be deleted because it is in use * @throws AMQException */ - void unregisterExchange(String name, boolean inUse) throws ExchangeInUseException, AMQException; + void unregisterExchange(AMQShortString name, boolean inUse) throws ExchangeInUseException, AMQException; - Exchange getExchange(String name); + Exchange getExchange(AMQShortString name); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index 1c63a5571e..cf10f219aa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -34,7 +34,7 @@ class HeadersBinding { private static final Logger _logger = Logger.getLogger(HeadersBinding.class); - private final FieldTable _mappings = new FieldTable(); + private final FieldTable _mappings; private final Set<String> required = new HashSet<String>(); private final Map<String,Object> matches = new HashMap<String,Object>(); private boolean matchAny; @@ -91,12 +91,7 @@ class HeadersBinding HeadersBinding(FieldTable mappings) { - Enumeration propertyNames = mappings.getPropertyNames(); - while(propertyNames.hasMoreElements()) - { - String propName = (String) propertyNames.nextElement(); - _mappings.put(propName, mappings.getObject(propName)); - } + _mappings = mappings; initMappings(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index dcb64e2d30..e681cb4c47 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -110,7 +110,7 @@ public class HeadersExchange extends AbstractExchange for (Iterator<Registration> itr = _bindings.iterator(); itr.hasNext();) { Registration registration = itr.next(); - String queueName = registration.queue.getName(); + String queueName = registration.queue.getName().toString(); HeadersBinding headers = registration.binding; FieldTable headerMappings = headers.getMappings(); @@ -149,7 +149,7 @@ public class HeadersExchange extends AbstractExchange */ public void createNewBinding(String queueName, String binding) throws JMException { - AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName); + AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(new AMQShortString(queueName)); if (queue == null) { @@ -173,13 +173,13 @@ public class HeadersExchange extends AbstractExchange } // End of MBean class - public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException + public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { _logger.debug("Exchange " + getName() + ": Binding " + queue.getName() + " with " + args); _bindings.add(new Registration(new HeadersBinding(args), queue)); } - public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException + public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException { _logger.debug("Exchange " + getName() + ": Unbinding " + queue.getName()); _bindings.remove(new Registration(null, queue)); @@ -223,12 +223,12 @@ public class HeadersExchange extends AbstractExchange } } - public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException { return isBound(queue); } - public boolean isBound(String routingKey) throws AMQException + public boolean isBound(AMQShortString routingKey) throws AMQException { return hasBindings(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java index 485c4739bd..8527a68862 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.framing.AMQShortString; import java.util.List; import java.util.Map; @@ -35,10 +36,10 @@ import java.util.concurrent.CopyOnWriteArrayList; */ class Index { - private ConcurrentMap<String, List<AMQQueue>> _index - = new ConcurrentHashMap<String, List<AMQQueue>>(); + private ConcurrentMap<AMQShortString, List<AMQQueue>> _index + = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); - synchronized boolean add(String key, AMQQueue queue) + synchronized boolean add(AMQShortString key, AMQQueue queue) { List<AMQQueue> queues = _index.get(key); if(queues == null) @@ -62,7 +63,7 @@ class Index } } - synchronized boolean remove(String key, AMQQueue queue) + synchronized boolean remove(AMQShortString key, AMQQueue queue) { List<AMQQueue> queues = _index.get(key); if (queues != null) @@ -77,13 +78,13 @@ class Index return false; } - List<AMQQueue> get(String key) + List<AMQQueue> get(AMQShortString key) { return _index.get(key); } - Map<String, List<AMQQueue>> getBindingsMap() + Map<AMQShortString, List<AMQQueue>> getBindingsMap() { - return new HashMap<String, List<AMQQueue>>(_index); + return new HashMap<AMQShortString, List<AMQQueue>>(_index); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java index 49f99132ef..5c784983cb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java @@ -45,29 +45,24 @@ public class FilterManagerFactory manager = new SimpleFilterManager(); - Iterator it = filters.keySet().iterator(); - _logger.info("Processing filters:"); - while (it.hasNext()) + if(filters.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())) { - String key = (String) it.next(); - _logger.info("filter:" + key); - if (key.equals(AMQPFilterTypes.JMS_SELECTOR.getValue())) - { - String selector = (String) filters.get(key); - - if (selector != null && !selector.equals("")) - { - manager.add(new JMSSelectorFilter(selector)); - } - } + String selector = filters.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); - if (key.equals(AMQPFilterTypes.NO_CONSUME.getValue())) + if (selector != null && !selector.equals("")) { - manager.add(new NoConsumerFilter()); + manager.add(new JMSSelectorFilter(selector)); } } + if (filters.containsKey(AMQPFilterTypes.NO_CONSUME.getValue())) + { + manager.add(new NoConsumerFilter()); + } + + + //If we added no filters don't bear the overhead of having an filter manager if (!manager.hasFilters()) { @@ -76,7 +71,7 @@ public class FilterManagerFactory } else { - _logger.info("No Filters found."); + _logger.debug("No Filters found."); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java index 7d6a98df84..934bca991d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -248,7 +248,7 @@ public class PropertyExpression implements Expression _logger.info("Looking up property:" + name); _logger.info("Properties are:" + _properties.getHeaders().keySet()); - return _properties.getHeaders().get(name); + return _properties.getHeaders().getObject(name); } // catch (IOException ioe) // { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index d3aece9818..0cb1d8bee8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -23,10 +23,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidSelectorException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicConsumeOkBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.*; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -77,7 +74,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } try { - String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, + AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.arguments, body.noLocal); if (!body.nowait) { @@ -103,11 +100,11 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic BasicConsumeBody.getClazz((byte)8, (byte)0), // classId BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId AMQConstant.INVALID_SELECTOR.getCode(), // replyCode - ise.getMessage())); // replyText + new AMQShortString(ise.getMessage()))); // replyText } catch (ConsumerTagNotUniqueException e) { - String msg = "Non-unique consumer tag, '" + body.consumerTag + "'"; + AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'"); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index 423ea5f276..181409c255 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -21,9 +21,11 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -36,6 +38,8 @@ import org.apache.qpid.server.state.StateAwareMethodListener; public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody> { private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler(); + + private static final AMQShortString UNKNOWN_EXCHANGE_NAME = new AMQShortString("Unknown exchange name"); public static BasicPublishMethodHandler getInstance() { @@ -55,7 +59,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? if (body.exchange == null) { - body.exchange = "amq.direct"; + body.exchange = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + } Exchange e = exchangeRegistry.getExchange(body.exchange); // if the exchange does not exist we raise a channel exception @@ -72,7 +77,7 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener<Basi ChannelCloseBody.getClazz((byte)8, (byte)0), // classId ChannelCloseBody.getMethod((byte)8, (byte)0), // methodId 500, // replyCode - "Unknown exchange name"); // replyText + UNKNOWN_EXCHANGE_NAME); // replyText protocolSession.writeFrame(cf); } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index 9f9b029ada..c00fe858fa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ConnectionOpenBody; import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -45,9 +46,9 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con { } - private static String generateClientID() + private static AMQShortString generateClientID() { - return Long.toString(System.currentTimeMillis()); + return new AMQShortString(Long.toString(System.currentTimeMillis())); } public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, @@ -55,7 +56,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException { ConnectionOpenBody body = evt.getMethod(); - String contextKey = body.virtualHost; + AMQShortString contextKey = body.virtualHost; //todo //FIXME The virtual host must be validated by the server for the connection to open-ok // See Spec (0.8.2). Section 3.1.2 Virtual Hosts diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index 9f24100df1..7cf1236d2f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -73,7 +73,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< SaslServer ss = null; try { - ss = authMgr.createSaslServer(body.mechanism, protocolSession.getLocalFQDN()); + ss = authMgr.createSaslServer(String.valueOf(body.mechanism), protocolSession.getLocalFQDN()); protocolSession.setSaslServer(ss); AuthenticationResult authResult = authMgr.authenticate(ss, body.response); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 30e8990b54..0b216c4da1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -21,6 +21,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeBoundBody; import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQMethodEvent; @@ -71,9 +72,9 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo ExchangeBoundBody body = evt.getMethod(); - String exchangeName = body.exchange; - String queueName = body.queue; - String routingKey = body.routingKey; + AMQShortString exchangeName = body.exchange; + AMQShortString queueName = body.queue; + AMQShortString routingKey = body.routingKey; if (exchangeName == null) { throw new AMQException("Exchange exchange must not be null"); @@ -86,7 +87,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), major, minor, // AMQP version (major, minor) EXCHANGE_NOT_FOUND, // replyCode - "Exchange " + exchangeName + " not found"); // replyText + new AMQShortString("Exchange " + exchangeName + " not found")); // replyText } else if (routingKey == null) { @@ -118,7 +119,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), major, minor, // AMQP version (major, minor) QUEUE_NOT_FOUND, // replyCode - "Queue " + queueName + " not found"); // replyText + new AMQShortString("Queue " + queueName + " not found")); // replyText } else { @@ -136,7 +137,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), major, minor, // AMQP version (major, minor) QUEUE_NOT_BOUND, // replyCode - "Queue " + queueName + " not bound to exchange " + exchangeName); // replyText + new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName)); // replyText } } } @@ -150,7 +151,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), major, minor, // AMQP version (major, minor) QUEUE_NOT_FOUND, // replyCode - "Queue " + queueName + " not found"); // replyText + new AMQShortString("Queue " + queueName + " not found")); // replyText } else { @@ -168,8 +169,8 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), major, minor, // AMQP version (major, minor) SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode - "Queue " + queueName + " not bound with routing key " + - body.routingKey + " to exchange " + exchangeName); // replyText + new AMQShortString("Queue " + queueName + " not bound with routing key " + + body.routingKey + " to exchange " + exchangeName)); // replyText } } } @@ -189,8 +190,8 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), major, minor, // AMQP version (major, minor) NO_QUEUE_BOUND_WITH_RK, // replyCode - "No queue bound with routing key " + body.routingKey + - " to exchange " + exchangeName); // replyText + new AMQShortString("No queue bound with routing key " + body.routingKey + + " to exchange " + exchangeName)); // replyText } } protocolSession.writeFrame(response); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 83f98de2d9..6ff7700a13 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -22,10 +22,12 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQMethodEvent; @@ -91,7 +93,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar queueRegistry.registerQueue(queue); if (autoRegister) { - Exchange defaultExchange = exchangeRegistry.getExchange("amq.direct"); + Exchange defaultExchange = exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); defaultExchange.registerQueue(body.queue, queue, null); queue.bind(body.queue, defaultExchange); _log.info("Queue " + body.queue + " bound to default exchange"); @@ -115,9 +117,9 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } } - protected String createName() + protected AMQShortString createName() { - return "tmp_" + pad(_counter.incrementAndGet()); + return new AMQShortString("tmp_" + pad(_counter.incrementAndGet())); } protected static String pad(int value) @@ -128,7 +130,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) throws AMQException { - String owner = body.exclusive ? session.getContextKey() : null; + AMQShortString owner = body.exclusive ? session.getContextKey() : null; return new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 688968b8a0..5437561095 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -80,7 +80,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDelete else { int purged = queue.delete(body.ifUnused, body.ifEmpty); - _store.removeQueue(queue.getName()); + _store.removeQueue(queue.getName().toString()); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java index 376f88cbf1..ab201c476e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/message/jms/JMSMessage.java @@ -178,116 +178,116 @@ public class JMSMessage implements MessageDecorator public void clearProperties() throws MessageNotWriteableException { checkWriteable(); - _properties.getJMSHeaders().clear(); + _properties.clear(); } public boolean propertyExists(String string) { - return _properties.getJMSHeaders().propertyExists(string); + return _properties.propertyExists(string); } public boolean getBooleanProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getBoolean(string); + return _properties.getBoolean(string); } public byte getByteProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getByte(string); + return _properties.getByte(string); } public short getShortProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getShort(string); + return _properties.getShort(string); } public int getIntProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getInteger(string); + return _properties.getInteger(string); } public long getLongProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getLong(string); + return _properties.getLong(string); } public float getFloatProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getFloat(string); + return _properties.getFloat(string); } public double getDoubleProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getDouble(string); + return _properties.getDouble(string); } public String getStringProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getString(string); + return _properties.getString(string); } public Object getObjectProperty(String string) throws JMSException { - return _properties.getJMSHeaders().getObject(string); + return _properties.getObject(string); } public Enumeration getPropertyNames() { - return _properties.getJMSHeaders().getPropertyNames(); + return _properties.getPropertyNames(); } public void setBooleanProperty(String string, boolean b) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setBoolean(string, b); + _properties.setBoolean(string, b); } public void setByteProperty(String string, byte b) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setByte(string, b); + _properties.setByte(string, b); } public void setShortProperty(String string, short i) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setShort(string, i); + _properties.setShort(string, i); } public void setIntProperty(String string, int i) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setInteger(string, i); + _properties.setInteger(string, i); } public void setLongProperty(String string, long l) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setLong(string, l); + _properties.setLong(string, l); } public void setFloatProperty(String string, float v) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setFloat(string, v); + _properties.setFloat(string, v); } public void setDoubleProperty(String string, double v) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setDouble(string, v); + _properties.setDouble(string, v); } public void setStringProperty(String string, String string1) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setString(string, string1); + _properties.setString(string, string1); } public void setObjectProperty(String string, Object object) throws JMSException { checkWriteable(); - _properties.getJMSHeaders().setObject(string, object); + _properties.setObject(string, object); } public void acknowledge() throws MessageNotWriteableException diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 08668f0f6a..f86d8afe02 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -26,16 +26,7 @@ import org.apache.mina.common.IoSession; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.ProtocolVersionList; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.*; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; @@ -65,7 +56,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private final IoSession _minaProtocolSession; - private String _contextKey; + private AMQShortString _contextKey; private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); @@ -291,12 +282,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _minaProtocolSession.write(frame); } - public String getContextKey() + public AMQShortString getContextKey() { return _contextKey; } - public void setContextKey(String contextKey) + public void setContextKey(AMQShortString contextKey) { _contextKey = contextKey; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 2e9590277b..f4f443b162 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -180,7 +180,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco 0, // classId 0, // methodId 200, // replyCode - throwable.getMessage() // replyText + new AMQShortString(throwable.getMessage()) // replyText )); _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); protocolSession.close(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index a75627d240..ee01dd9f5b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.AMQException; @@ -48,14 +49,14 @@ public interface AMQProtocolSession * in the AMQ protocol specification (RFC 6). * @return the context key */ - String getContextKey(); + AMQShortString getContextKey(); /** * Set the context key associated with this session. Context key is described * in the AMQ protocol specification (RFC 6). * @param contextKey the context key */ - void setContextKey(String contextKey); + void setContextKey(AMQShortString contextKey); /** * Get the channel for this session associated with the specified id. A channel diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 0ceadcb30b..08045e1c41 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -21,6 +21,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; @@ -58,6 +59,8 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER}; private CompositeType _channelType = null; // represents the data type for channel data private TabularType _channelsType = null; // Data type for list of channels type + private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSING_THE_CONNECTION = + new AMQShortString("Broker Management Console has closing the connection."); @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws JMException @@ -201,7 +204,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed 0, // classId 0, // methodId AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - "Broker Management Console has closing the connection." // replyText + BROKER_MANAGEMENT_CONSOLE_HAS_CLOSING_THE_CONNECTION // replyText ); _session.writeFrame(response); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java index d3ec70456f..d4881aefaf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -34,7 +35,7 @@ public class ExchangeInitialiser } private void define(ExchangeRegistry r, ExchangeFactory f, - String name, String type) throws AMQException + AMQShortString name, AMQShortString type) throws AMQException { r.registerExchange(f.createExchange(name, type, true, false, 0)); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index f30667690f..05b4f5ec2b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -562,7 +562,7 @@ public class AMQMessage } } - public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, String consumerTag) + public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { ByteBuffer deliver = createEncodedDeliverFrame(channelId, deliveryTag, consumerTag); @@ -598,7 +598,7 @@ public class AMQMessage } - private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, String consumerTag) + private ByteBuffer createEncodedDeliverFrame(int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { BasicPublishBody pb = getPublishBody(); @@ -611,7 +611,7 @@ public class AMQMessage return buf; } - private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText) throws AMQException + private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, AMQShortString replyText) throws AMQException { AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange, replyCode, replyText, @@ -622,7 +622,7 @@ public class AMQMessage return buf; } - public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, String replyText) + public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException { ByteBuffer returnFrame = createEncodedReturnFrame(channelId, replyCode, replyText); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 6f1018e753..ea09654988 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -42,12 +43,12 @@ public class AMQQueue implements Managable, Comparable { private static final Logger _logger = Logger.getLogger(AMQQueue.class); - private final String _name; + private final AMQShortString _name; /** * null means shared */ - private final String _owner; + private final AMQShortString _owner; private final boolean _durable; @@ -111,7 +112,7 @@ public class AMQQueue implements Managable, Comparable return _name.compareTo(((AMQQueue) o).getName()); } - public AMQQueue(String name, boolean durable, String owner, + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry) throws AMQException { @@ -119,7 +120,7 @@ public class AMQQueue implements Managable, Comparable AsyncDeliveryConfig.getAsyncDeliveryExecutor(), new SubscriptionImpl.Factory()); } - public AMQQueue(String name, boolean durable, String owner, + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, SubscriptionFactory subscriptionFactory) throws AMQException { @@ -127,7 +128,7 @@ public class AMQQueue implements Managable, Comparable AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscriptionFactory); } - public AMQQueue(String name, boolean durable, String owner, + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery, SubscriptionFactory subscriptionFactory) throws AMQException @@ -136,7 +137,7 @@ public class AMQQueue implements Managable, Comparable this(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new SubscriptionSet(), subscriptionFactory); } - public AMQQueue(String name, boolean durable, String owner, + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) throws AMQException { @@ -145,7 +146,7 @@ public class AMQQueue implements Managable, Comparable new SubscriptionImpl.Factory()); } - protected AMQQueue(String name, boolean durable, String owner, + protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory) throws AMQException @@ -154,7 +155,7 @@ public class AMQQueue implements Managable, Comparable AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, subscriptionFactory); } - protected AMQQueue(String name, boolean durable, String owner, + protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, SubscriptionSet subscribers) throws AMQException @@ -163,7 +164,7 @@ public class AMQQueue implements Managable, Comparable AsyncDeliveryConfig.getAsyncDeliveryExecutor(), subscribers, new SubscriptionImpl.Factory()); } - protected AMQQueue(String name, boolean durable, String owner, + protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery, SubscriptionSet subscribers, SubscriptionFactory subscriptionFactory) throws AMQException @@ -225,7 +226,7 @@ public class AMQQueue implements Managable, Comparable } } - public String getName() + public AMQShortString getName() { return _name; } @@ -240,7 +241,7 @@ public class AMQQueue implements Managable, Comparable return _durable; } - public String getOwner() + public AMQShortString getOwner() { return _owner; } @@ -356,17 +357,17 @@ public class AMQQueue implements Managable, Comparable _deliveryMgr.clearAllMessages(storeContext); } - public void bind(String routingKey, Exchange exchange) + public void bind(AMQShortString routingKey, Exchange exchange) { _bindings.addBinding(routingKey, exchange); } - public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException + public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters) throws AMQException { registerProtocolSession(ps, channel, consumerTag, acks, filters, false); } - public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) + public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); @@ -384,7 +385,7 @@ public class AMQQueue implements Managable, Comparable _subscribers.addSubscriber(subscription); } - public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException + public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException { debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, this); @@ -475,7 +476,7 @@ public class AMQQueue implements Managable, Comparable } catch (AMQException e) { - throw new FailedDequeueException(_name, e); + throw new FailedDequeueException(_name.toString(), e); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 77bbdf7b4b..fb4a8e06bf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -112,7 +112,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue public String getOwner() { - return _queue.getOwner(); + return String.valueOf(_queue.getOwner()); } public boolean isAutoDelete() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java index a2898ccdce..1a44e86f1a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.configuration.Configured; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.store.StoreContext; @@ -294,7 +295,7 @@ public class ConcurrentDeliveryManager implements DeliveryManager } } - public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException + public void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException { // first check whether we are queueing, and enqueue if we are if (!enqueue(msg)) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 8f0c3a5ec7..91c49a4cf9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -25,6 +25,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.store.StoreContext; @@ -280,7 +281,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return _messages.poll(); } - public void deliver(StoreContext context, String name, AMQMessage msg) throws AMQException + public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException { if (_log.isDebugEnabled()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java index 3b73072e30..8ab26def74 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java @@ -21,13 +21,14 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentHashMap; public class DefaultQueueRegistry implements QueueRegistry { - private ConcurrentMap<String, AMQQueue> _queueMap = new ConcurrentHashMap<String, AMQQueue>(); + private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>(); public DefaultQueueRegistry() { @@ -38,12 +39,12 @@ public class DefaultQueueRegistry implements QueueRegistry _queueMap.put(queue.getName(), queue); } - public void unregisterQueue(String name) throws AMQException + public void unregisterQueue(AMQShortString name) throws AMQException { _queueMap.remove(name); } - public AMQQueue getQueue(String name) + public AMQQueue getQueue(AMQShortString name) { return _queueMap.get(name); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 82d8f9538f..d3d235f07f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.store.StoreContext; import java.util.concurrent.Executor; @@ -67,7 +68,7 @@ interface DeliveryManager * @param msg the message to deliver * @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued */ - void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException; + void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException; void removeAMessageFromTop(StoreContext storeContext) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java index 684e312fa3..2f742952c9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import java.util.List; import java.util.HashSet; @@ -37,9 +38,9 @@ class ExchangeBindings static class ExchangeBinding { private final Exchange exchange; - private final String routingKey; + private final AMQShortString routingKey; - ExchangeBinding(String routingKey, Exchange exchange) + ExchangeBinding(AMQShortString routingKey, Exchange exchange) { this.routingKey = routingKey; this.exchange = exchange; @@ -55,7 +56,7 @@ class ExchangeBindings return exchange; } - public String getRoutingKey() + public AMQShortString getRoutingKey() { return routingKey; } @@ -87,7 +88,7 @@ class ExchangeBindings * are being tracked by the instance has been bound to the exchange * @param exchange the exchange bound to */ - void addBinding(String routingKey, Exchange exchange) + void addBinding(AMQShortString routingKey, Exchange exchange) { _bindings.add(new ExchangeBinding(routingKey, exchange)); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java index c83f17b98c..bfbaf27c84 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java @@ -21,13 +21,14 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; public interface QueueRegistry { void registerQueue(AMQQueue queue) throws AMQException; - void unregisterQueue(String name) throws AMQException; + void unregisterQueue(AMQShortString name) throws AMQException; - AMQQueue getQueue(String name); + AMQQueue getQueue(AMQShortString name); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java index 2bb77dc649..6cc55f2818 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortString; /** * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This @@ -33,10 +34,10 @@ import org.apache.qpid.framing.FieldTable; */ public interface SubscriptionFactory { - Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException; - Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 0dc1f3b0c1..0afe17c6ca 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -25,10 +25,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.*; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -53,7 +50,7 @@ public class SubscriptionImpl implements Subscription public final AMQProtocolSession protocolSession; - public final String consumerTag; + public final AMQShortString consumerTag; private final Object sessionKey; @@ -72,12 +69,12 @@ public class SubscriptionImpl implements Subscription public static class Factory implements SubscriptionFactory { - public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException + public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal); } - public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) + public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag) throws AMQException { return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false); @@ -85,14 +82,14 @@ public class SubscriptionImpl implements Subscription } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, - String consumerTag, boolean acks) + AMQShortString consumerTag, boolean acks) throws AMQException { this(channelId, protocolSession, consumerTag, acks, null, false); } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, - String consumerTag, boolean acks, FieldTable filters, boolean noLocal) + AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); @@ -162,7 +159,7 @@ public class SubscriptionImpl implements Subscription public SubscriptionImpl(int channel, AMQProtocolSession protocolSession, - String consumerTag) + AMQShortString consumerTag) throws AMQException { this(channel, protocolSession, consumerTag, false); @@ -304,8 +301,8 @@ public class SubscriptionImpl implements Subscription if (_noLocal) { // We don't want local messages so check to see if message is one we sent - if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals( - msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString()))) + if (protocolSession.getClientProperties().getObject(ClientProperties.instance.toString()).equals( + msg.getPublisher().getClientProperties().getObject(ClientProperties.instance.toString()))) { if (_logger.isTraceEnabled()) { @@ -395,7 +392,7 @@ public class SubscriptionImpl implements Subscription } - private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) + private ByteBuffer createEncodedDeliverFrame(long deliveryTag, AMQShortString routingKey, AMQShortString exchange) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java index f290452058..02fe86a083 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.store.StoreContext; import org.apache.log4j.Logger; @@ -234,7 +235,7 @@ class SynchronizedDeliveryManager implements DeliveryManager * @throws NoConsumersException if there are no active subscribers to deliver * the message to */ - public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException + public void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException { // first check whether we are queueing, and enqueue if we are if (!enqueue(msg)) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index 2fb2bdd2e3..446cf5ec2c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -180,11 +180,11 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException { - _messageStore.enqueueMessage(storeContext, queue.getName(), messageId); + _messageStore.enqueueMessage(storeContext, queue.getName().toString(), messageId); } public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException { - _messageStore.dequeueMessage(storeContext, queue.getName(), messageId); + _messageStore.dequeueMessage(storeContext, queue.getName().toString(), messageId); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java index 9f4addd7ee..6cee2ee452 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.security.auth; +import org.apache.qpid.framing.AMQShortString; + import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java index 14cce86715..5c21dd4de4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.security.auth; import org.apache.qpid.server.security.auth.AuthenticationManager; import org.apache.qpid.server.security.auth.AuthenticationResult; +import org.apache.qpid.framing.AMQShortString; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java index 21eb80c69d..e96bd68cad 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/SASLAuthenticationManager.java @@ -24,6 +24,7 @@ import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.configuration.PropertyUtils; +import org.apache.qpid.framing.AMQShortString; import javax.security.auth.callback.CallbackHandler; import javax.security.sasl.Sasl; diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java index c364ca1d8d..a943003bd3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java @@ -56,13 +56,13 @@ public class AmqPlainSaslServer implements SaslServer try { final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length); - String username = (String) ft.get("LOGIN"); + String username = (String) ft.getString("LOGIN"); // we do not care about the prompt but it throws if null NameCallback nameCb = new NameCallback("prompt", username); // we do not care about the prompt but it throws if null PasswordCallback passwordCb = new PasswordCallback("prompt", false); // TODO: should not get pwd as a String but as a char array... - String pwd = (String) ft.get("PASSWORD"); + String pwd = (String) ft.getString("PASSWORD"); passwordCb.setPassword(pwd.toCharArray()); AuthorizeCallback authzCb = new AuthorizeCallback(username, username); Callback[] callbacks = new Callback[]{nameCb, passwordCb, authzCb}; |
