diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-08 17:02:26 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-08 17:02:26 +0000 |
commit | d6b4e65f3fd1ff4a2763f8068cd6b3f7fe0b84e0 (patch) | |
tree | f0c608bcb9e4e5af6cd7ca5245401d2d1716b4f3 /java/cluster/src | |
parent | 61350c8523e2edca63d8a9ab2c970ad8607d4c0a (diff) | |
download | qpid-python-d6b4e65f3fd1ff4a2763f8068cd6b3f7fe0b84e0.tar.gz |
QPID-255 : Patch Supplied by Rob Godfrey - Change to use bespoke AMQShortString rather than converting to String
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@494121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
16 files changed, 80 insertions, 107 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java index c604709078..3c1b50fb99 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java @@ -29,13 +29,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.IllegalStateTransitionException; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionOpenOkBody; -import org.apache.qpid.framing.ConnectionSecureBody; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.*; import java.util.HashMap; import java.util.Map; @@ -127,9 +121,9 @@ public class ClientHandlerRegistry extends AMQStateManager class ConnectionTuneHandler extends ConnectionTuneMethodHandler { - protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist) + protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist) { - return super.createConnectionOpenFrame(channel, path, ClusterCapability.add(capabilities, _identity), insist); + return super.createConnectionOpenFrame(channel, path, new AMQShortString(ClusterCapability.add(capabilities, _identity)), insist); } } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java index 0411019334..57c48f0611 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.cluster; +import org.apache.qpid.framing.AMQShortString; + import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -28,22 +30,22 @@ public class ClusterCapability public static final String PATTERN = ".*\\bcluster_peer=(\\S*:\\d*)\b*.*"; public static final String PEER = "cluster_peer"; - public static String add(String original, MemberHandle identity) + public static AMQShortString add(AMQShortString original, MemberHandle identity) { - return original == null ? peer(identity) : original + " " + peer(identity); + return original == null ? peer(identity) : new AMQShortString(original + " " + peer(identity)); } - private static String peer(MemberHandle identity) + private static AMQShortString peer(MemberHandle identity) { - return PEER + "=" + identity.getDetails(); + return new AMQShortString(PEER + "=" + identity.getDetails()); } - public static boolean contains(String in) + public static boolean contains(AMQShortString in) { - return in != null && in.contains(in); + return in != null; // && in.contains(in); } - public static MemberHandle getPeer(String in) + public static MemberHandle getPeer(AMQShortString in) { Matcher matcher = Pattern.compile(PATTERN).matcher(in); if (matcher.matches()) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index a7936be8db..d8d2220a8e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -109,7 +109,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0); - ping.broker = _group.getLocal().getDetails(); + ping.broker = new AMQShortString(_group.getLocal().getDetails()); ping.responseRequired = true; ping.load = _loadTable.getLocalLoad(); BlockingHandler handler = new BlockingHandler(); @@ -157,7 +157,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0); - join.broker = _group.getLocal().getDetails(); + join.broker = new AMQShortString(_group.getLocal().getDetails()); send(leader, new SimpleBodySendable(join)); } @@ -178,7 +178,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0); - leave.broker = _group.getLocal().getDetails(); + leave.broker = new AMQShortString(_group.getLocal().getDetails()); send(getLeader(), new SimpleBodySendable(leave)); } @@ -201,7 +201,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0); - suspect.broker = broker.getDetails(); + suspect.broker = new AMQShortString(broker.getDetails()); send(getLeader(), new SimpleBodySendable(suspect)); } } @@ -225,7 +225,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0); - request.broker = member.getDetails(); + request.broker = new AMQShortString(member.getDetails()); Broker leader = getLeader(); send(leader, new SimpleBodySendable(request)); _logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader)); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java index b14fede5aa..b8099a12f7 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.cluster; +import org.apache.qpid.framing.AMQShortString; + public interface MemberHandle { public String getHost(); @@ -30,5 +32,5 @@ public interface MemberHandle public boolean matches(String host, int port); - public String getDetails(); + public AMQShortString getDetails(); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java index 275ed39b5f..8557fc17c7 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java @@ -180,7 +180,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler { //signal redirection to waiting thread ConnectionRedirectBody redirect = (ConnectionRedirectBody) method; - String[] parts = redirect.host.split(":"); + String[] parts = redirect.host.toString().split(":"); _connectionMonitor.redirect(parts[0], Integer.parseInt(parts[1])); } else diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java index b6d5e3d88d..1255094b1d 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.cluster; +import org.apache.qpid.framing.AMQShortString; + import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -37,6 +39,11 @@ public class SimpleMemberHandle implements MemberHandle _port = port; } + public SimpleMemberHandle(AMQShortString details) + { + this(details.toString()); + } + public SimpleMemberHandle(String details) { String[] parts = details.split(":"); @@ -84,14 +91,14 @@ public class SimpleMemberHandle implements MemberHandle return _host.equals(host) && _port == port; } - public String getDetails() + public AMQShortString getDetails() { - return _host + ":" + _port; + return new AMQShortString(_host + ":" + _port); } public String toString() { - return getDetails(); + return getDetails().toString(); } static List<MemberHandle> stringToMembers(String membership) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java index 022ee098ab..9cff310a8a 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java @@ -28,11 +28,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.*; import org.apache.log4j.Logger; import java.util.Map; @@ -46,7 +42,7 @@ import java.util.HashMap; class ChannelQueueManager { private static final Logger _logger = Logger.getLogger(ChannelQueueManager.class); - private final Map<Integer, String> _channelQueues = new HashMap<Integer, String>(); + private final Map<Integer, AMQShortString> _channelQueues = new HashMap<Integer, AMQShortString>(); ClusterMethodHandler<QueueDeclareBody> createQueueDeclareHandler() { @@ -68,15 +64,15 @@ class ChannelQueueManager return new BasicConsumeHandler(); } - private void set(int channel, String queue) + private void set(int channel, AMQShortString queue) { _channelQueues.put(channel, queue); _logger.info(new LogMessage("Set default queue for {0} to {1}", channel, queue)); } - private String get(int channel) + private AMQShortString get(int channel) { - String queue = _channelQueues.get(channel); + AMQShortString queue = _channelQueues.get(channel); _logger.info(new LogMessage("Default queue for {0} is {1}", channel, queue)); return queue; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java index 46ba3e5015..f90373cd98 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java @@ -21,34 +21,7 @@ package org.apache.qpid.server.cluster.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ClusterJoinBody; -import org.apache.qpid.framing.ClusterLeaveBody; -import org.apache.qpid.framing.ClusterMembershipBody; -import org.apache.qpid.framing.ClusterPingBody; -import org.apache.qpid.framing.ClusterSuspectBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionOpenBody; -import org.apache.qpid.framing.ConnectionSecureOkBody; -import org.apache.qpid.framing.ConnectionStartOkBody; -import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeleteBody; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.ClusterSynchBody; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.framing.TxCommitBody; -import org.apache.qpid.framing.TxRollbackBody; +import org.apache.qpid.framing.*; import org.apache.qpid.server.cluster.ClusterCapability; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.cluster.GroupManager; @@ -236,7 +209,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionOpenBody> evt) { - String capabilities = evt.getMethod().capabilities; + AMQShortString capabilities = evt.getMethod().capabilities; if (ClusterCapability.contains(capabilities)) { ClusteredProtocolSession.setSessionPeer(session, ClusterCapability.getPeer(capabilities)); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java index 1e6bc26444..6b876095a4 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.cluster.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.util.LogMessage; @@ -45,9 +46,9 @@ public class LocalQueueDeclareHandler extends QueueDeclareHandler _groupMgr = groupMgr; } - protected String createName() + protected AMQShortString createName() { - return super.createName() + "@" + _groupMgr.getLocal().getDetails(); + return new AMQShortString(super.createName().toString() + "@" + _groupMgr.getLocal().getDetails()); } protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) throws AMQException @@ -60,7 +61,7 @@ public class LocalQueueDeclareHandler extends QueueDeclareHandler //need to get peer from the session... MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session); _logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer)); - return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, peer.getDetails(), body.autoDelete, registry); + return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, registry); } else { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java index 832e4830ab..722ec1b256 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.cluster.replay; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.AMQShortString; import java.util.Map; import java.util.HashMap; @@ -29,19 +30,19 @@ import java.util.List; class ConsumerCounts { - private final Map<String, Integer> _counts = new HashMap<String, Integer>(); + private final Map<AMQShortString, Integer> _counts = new HashMap<AMQShortString, Integer>(); - synchronized void increment(String queue) + synchronized void increment(AMQShortString queue) { _counts.put(queue, get(queue) + 1); } - synchronized void decrement(String queue) + synchronized void decrement(AMQShortString queue) { _counts.put(queue, get(queue) - 1); } - private int get(String queue) + private int get(AMQShortString queue) { Integer count = _counts.get(queue); return count == null ? 0 : count; @@ -49,7 +50,7 @@ class ConsumerCounts synchronized void replay(List<AMQMethodBody> messages) { - for(String queue : _counts.keySet()) + for(AMQShortString queue : _counts.keySet()) { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java index 338817e892..3193c206c7 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -22,15 +22,7 @@ package org.apache.qpid.server.cluster.replay; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeleteBody; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.ClusterSynchBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.framing.*; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.cluster.util.Bindings; @@ -57,11 +49,11 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _globalRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>(); private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _localRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>(); - private final Map<String, QueueDeclareBody> _sharedQueues = new ConcurrentHashMap<String, QueueDeclareBody>(); - private final Map<String, QueueDeclareBody> _privateQueues = new ConcurrentHashMap<String, QueueDeclareBody>(); - private final Bindings<String, String, QueueBindBody> _sharedBindings = new Bindings<String, String, QueueBindBody>(); - private final Bindings<String, String, QueueBindBody> _privateBindings = new Bindings<String, String, QueueBindBody>(); - private final Map<String, ExchangeDeclareBody> _exchanges = new ConcurrentHashMap<String, ExchangeDeclareBody>(); + private final Map<AMQShortString, QueueDeclareBody> _sharedQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>(); + private final Map<AMQShortString, QueueDeclareBody> _privateQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>(); + private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _sharedBindings = new Bindings<AMQShortString, AMQShortString, QueueBindBody>(); + private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _privateBindings = new Bindings<AMQShortString, AMQShortString, QueueBindBody>(); + private final Map<AMQShortString, ExchangeDeclareBody> _exchanges = new ConcurrentHashMap<AMQShortString, ExchangeDeclareBody>(); private final ConsumerCounts _consumers = new ConsumerCounts(); public ReplayStore() @@ -204,15 +196,15 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener private static class QueueDeclareRecorder extends ChainedMethodRecorder<QueueDeclareBody> { private final boolean _exclusive; - private final Map<String, QueueDeclareBody> _queues; + private final Map<AMQShortString, QueueDeclareBody> _queues; - QueueDeclareRecorder(boolean exclusive, Map<String, QueueDeclareBody> queues) + QueueDeclareRecorder(boolean exclusive, Map<AMQShortString, QueueDeclareBody> queues) { _queues = queues; _exclusive = exclusive; } - QueueDeclareRecorder(boolean exclusive, Map<String, QueueDeclareBody> queues, QueueDeclareRecorder recorder) + QueueDeclareRecorder(boolean exclusive, Map<AMQShortString, QueueDeclareBody> queues, QueueDeclareRecorder recorder) { super(recorder); _queues = queues; @@ -236,15 +228,15 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener private class QueueDeleteRecorder extends ChainedMethodRecorder<QueueDeleteBody> { - private final Map<String, QueueDeclareBody> _queues; - private final Bindings<String, String, QueueBindBody> _bindings; + private final Map<AMQShortString, QueueDeclareBody> _queues; + private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _bindings; - QueueDeleteRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings) + QueueDeleteRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings) { this(queues, bindings, null); } - QueueDeleteRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings, QueueDeleteRecorder recorder) + QueueDeleteRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings, QueueDeleteRecorder recorder) { super(recorder); _queues = queues; @@ -267,16 +259,16 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener private class QueueBindRecorder extends ChainedMethodRecorder<QueueBindBody> { - private final Map<String, QueueDeclareBody> _queues; - private final Bindings<String, String, QueueBindBody> _bindings; + private final Map<AMQShortString, QueueDeclareBody> _queues; + private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _bindings; - QueueBindRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings) + QueueBindRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings) { _queues = queues; _bindings = bindings; } - QueueBindRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings, QueueBindRecorder recorder) + QueueBindRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings, QueueBindRecorder recorder) { super(recorder); _queues = queues; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index 50b2fa0b66..6898ffcec2 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicCancelBody; import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.cluster.*; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -45,7 +46,7 @@ public class ClusteredQueue extends AMQQueue private final GroupManager _groupMgr; private final NestedSubscriptionManager _subscriptions; - public ClusteredQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry) + public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry) throws AMQException { super(name, durable, owner, autoDelete, queueRegistry, new ClusteredSubscriptionManager()); @@ -53,7 +54,7 @@ public class ClusteredQueue extends AMQQueue _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers(); } - public ClusteredQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) + public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) throws AMQException { super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new ClusteredSubscriptionManager(), @@ -84,7 +85,7 @@ public class ClusteredQueue extends AMQQueue } } - public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException + public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException { //handle locally: super.unregisterProtocolSession(ps, channel, consumerTag); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java index 8315d46b5d..89ce0bc8b1 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java @@ -25,6 +25,7 @@ import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.SimpleBodySendable; import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.AMQShortString; import java.util.concurrent.Executor; @@ -36,7 +37,7 @@ public class PrivateQueue extends AMQQueue { private final GroupManager _groupMgr; - public PrivateQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry) + public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry) throws AMQException { super(name, durable, owner, autoDelete, queueRegistry); @@ -44,7 +45,7 @@ public class PrivateQueue extends AMQQueue } - public PrivateQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) + public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) throws AMQException { super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java index f8eba282e2..a6cce05a03 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.MemberHandle; @@ -42,7 +43,7 @@ public class RemoteQueueProxy extends AMQQueue private final MemberHandle _target; private final GroupManager _groupMgr; - public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry) + public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry) throws AMQException { super(name, durable, owner, autoDelete, queueRegistry); @@ -51,7 +52,7 @@ public class RemoteQueueProxy extends AMQQueue _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this)); } - public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) + public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) throws AMQException { super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery); diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java index 76b1da8754..830a00f4c2 100644 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java @@ -21,13 +21,14 @@ package org.apache.qpid.server.cluster; import junit.framework.TestCase; +import org.apache.qpid.framing.AMQShortString; public class ClusterCapabilityTest extends TestCase { public void testStartWithNull() { MemberHandle peer = new SimpleMemberHandle("myhost:9999"); - String c = ClusterCapability.add(null, peer); + AMQShortString c = ClusterCapability.add(null, peer); assertTrue(ClusterCapability.contains(c)); assertTrue(peer.matches(ClusterCapability.getPeer(c))); } @@ -35,7 +36,7 @@ public class ClusterCapabilityTest extends TestCase public void testStartWithText() { MemberHandle peer = new SimpleMemberHandle("myhost:9999"); - String c = ClusterCapability.add("existing text", peer); + AMQShortString c = ClusterCapability.add(new AMQShortString("existing text"), peer); assertTrue(ClusterCapability.contains(c)); assertTrue(peer.matches(ClusterCapability.getPeer(c))); } diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java index c427285f4a..70209cd2a3 100644 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.cluster; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -36,7 +37,7 @@ public class SimpleClusterTest extends TestCase AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test"); AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE); System.out.println("Session created"); - session.declareExchange("my_exchange", "direct"); + session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct")); System.out.println("Exchange declared"); con.close(); System.out.println("Connection closed"); |