summaryrefslogtreecommitdiff
path: root/java/cluster/src
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-08 17:02:26 +0000
committerRobert Greig <rgreig@apache.org>2007-01-08 17:02:26 +0000
commitd6b4e65f3fd1ff4a2763f8068cd6b3f7fe0b84e0 (patch)
treef0c608bcb9e4e5af6cd7ca5245401d2d1716b4f3 /java/cluster/src
parent61350c8523e2edca63d8a9ab2c970ad8607d4c0a (diff)
downloadqpid-python-d6b4e65f3fd1ff4a2763f8068cd6b3f7fe0b84e0.tar.gz
QPID-255 : Patch Supplied by Rob Godfrey - Change to use bespoke AMQShortString rather than converting to String
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@494121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java12
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java16
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java10
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java13
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java14
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java31
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java7
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java11
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java42
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java7
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java5
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java5
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java3
16 files changed, 80 insertions, 107 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
index c604709078..3c1b50fb99 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
@@ -29,13 +29,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.IllegalStateTransitionException;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.*;
import java.util.HashMap;
import java.util.Map;
@@ -127,9 +121,9 @@ public class ClientHandlerRegistry extends AMQStateManager
class ConnectionTuneHandler extends ConnectionTuneMethodHandler
{
- protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
+ protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist)
{
- return super.createConnectionOpenFrame(channel, path, ClusterCapability.add(capabilities, _identity), insist);
+ return super.createConnectionOpenFrame(channel, path, new AMQShortString(ClusterCapability.add(capabilities, _identity)), insist);
}
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java
index 0411019334..57c48f0611 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.cluster;
+import org.apache.qpid.framing.AMQShortString;
+
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -28,22 +30,22 @@ public class ClusterCapability
public static final String PATTERN = ".*\\bcluster_peer=(\\S*:\\d*)\b*.*";
public static final String PEER = "cluster_peer";
- public static String add(String original, MemberHandle identity)
+ public static AMQShortString add(AMQShortString original, MemberHandle identity)
{
- return original == null ? peer(identity) : original + " " + peer(identity);
+ return original == null ? peer(identity) : new AMQShortString(original + " " + peer(identity));
}
- private static String peer(MemberHandle identity)
+ private static AMQShortString peer(MemberHandle identity)
{
- return PEER + "=" + identity.getDetails();
+ return new AMQShortString(PEER + "=" + identity.getDetails());
}
- public static boolean contains(String in)
+ public static boolean contains(AMQShortString in)
{
- return in != null && in.contains(in);
+ return in != null; // && in.contains(in);
}
- public static MemberHandle getPeer(String in)
+ public static MemberHandle getPeer(AMQShortString in)
{
Matcher matcher = Pattern.compile(PATTERN).matcher(in);
if (matcher.matches())
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
index a7936be8db..d8d2220a8e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
@@ -109,7 +109,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterPingBody ping = new ClusterPingBody((byte)8, (byte)0);
- ping.broker = _group.getLocal().getDetails();
+ ping.broker = new AMQShortString(_group.getLocal().getDetails());
ping.responseRequired = true;
ping.load = _loadTable.getLocalLoad();
BlockingHandler handler = new BlockingHandler();
@@ -157,7 +157,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0);
- join.broker = _group.getLocal().getDetails();
+ join.broker = new AMQShortString(_group.getLocal().getDetails());
send(leader, new SimpleBodySendable(join));
}
@@ -178,7 +178,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0);
- leave.broker = _group.getLocal().getDetails();
+ leave.broker = new AMQShortString(_group.getLocal().getDetails());
send(getLeader(), new SimpleBodySendable(leave));
}
@@ -201,7 +201,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0);
- suspect.broker = broker.getDetails();
+ suspect.broker = new AMQShortString(broker.getDetails());
send(getLeader(), new SimpleBodySendable(suspect));
}
}
@@ -225,7 +225,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0);
- request.broker = member.getDetails();
+ request.broker = new AMQShortString(member.getDetails());
Broker leader = getLeader();
send(leader, new SimpleBodySendable(request));
_logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader));
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java
index b14fede5aa..b8099a12f7 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.cluster;
+import org.apache.qpid.framing.AMQShortString;
+
public interface MemberHandle
{
public String getHost();
@@ -30,5 +32,5 @@ public interface MemberHandle
public boolean matches(String host, int port);
- public String getDetails();
+ public AMQShortString getDetails();
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
index 275ed39b5f..8557fc17c7 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
@@ -180,7 +180,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
{
//signal redirection to waiting thread
ConnectionRedirectBody redirect = (ConnectionRedirectBody) method;
- String[] parts = redirect.host.split(":");
+ String[] parts = redirect.host.toString().split(":");
_connectionMonitor.redirect(parts[0], Integer.parseInt(parts[1]));
}
else
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java
index b6d5e3d88d..1255094b1d 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.cluster;
+import org.apache.qpid.framing.AMQShortString;
+
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -37,6 +39,11 @@ public class SimpleMemberHandle implements MemberHandle
_port = port;
}
+ public SimpleMemberHandle(AMQShortString details)
+ {
+ this(details.toString());
+ }
+
public SimpleMemberHandle(String details)
{
String[] parts = details.split(":");
@@ -84,14 +91,14 @@ public class SimpleMemberHandle implements MemberHandle
return _host.equals(host) && _port == port;
}
- public String getDetails()
+ public AMQShortString getDetails()
{
- return _host + ":" + _port;
+ return new AMQShortString(_host + ":" + _port);
}
public String toString()
{
- return getDetails();
+ return getDetails().toString();
}
static List<MemberHandle> stringToMembers(String membership)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
index 022ee098ab..9cff310a8a 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
@@ -28,11 +28,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.*;
import org.apache.log4j.Logger;
import java.util.Map;
@@ -46,7 +42,7 @@ import java.util.HashMap;
class ChannelQueueManager
{
private static final Logger _logger = Logger.getLogger(ChannelQueueManager.class);
- private final Map<Integer, String> _channelQueues = new HashMap<Integer, String>();
+ private final Map<Integer, AMQShortString> _channelQueues = new HashMap<Integer, AMQShortString>();
ClusterMethodHandler<QueueDeclareBody> createQueueDeclareHandler()
{
@@ -68,15 +64,15 @@ class ChannelQueueManager
return new BasicConsumeHandler();
}
- private void set(int channel, String queue)
+ private void set(int channel, AMQShortString queue)
{
_channelQueues.put(channel, queue);
_logger.info(new LogMessage("Set default queue for {0} to {1}", channel, queue));
}
- private String get(int channel)
+ private AMQShortString get(int channel)
{
- String queue = _channelQueues.get(channel);
+ AMQShortString queue = _channelQueues.get(channel);
_logger.info(new LogMessage("Default queue for {0} is {1}", channel, queue));
return queue;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
index 46ba3e5015..f90373cd98 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
@@ -21,34 +21,7 @@
package org.apache.qpid.server.cluster.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ClusterJoinBody;
-import org.apache.qpid.framing.ClusterLeaveBody;
-import org.apache.qpid.framing.ClusterMembershipBody;
-import org.apache.qpid.framing.ClusterPingBody;
-import org.apache.qpid.framing.ClusterSuspectBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.ClusterSynchBody;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxRollbackBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.cluster.ClusterCapability;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.cluster.GroupManager;
@@ -236,7 +209,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionOpenBody> evt)
{
- String capabilities = evt.getMethod().capabilities;
+ AMQShortString capabilities = evt.getMethod().capabilities;
if (ClusterCapability.contains(capabilities))
{
ClusteredProtocolSession.setSessionPeer(session, ClusterCapability.getPeer(capabilities));
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
index 1e6bc26444..6b876095a4 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.util.LogMessage;
@@ -45,9 +46,9 @@ public class LocalQueueDeclareHandler extends QueueDeclareHandler
_groupMgr = groupMgr;
}
- protected String createName()
+ protected AMQShortString createName()
{
- return super.createName() + "@" + _groupMgr.getLocal().getDetails();
+ return new AMQShortString(super.createName().toString() + "@" + _groupMgr.getLocal().getDetails());
}
protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) throws AMQException
@@ -60,7 +61,7 @@ public class LocalQueueDeclareHandler extends QueueDeclareHandler
//need to get peer from the session...
MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session);
_logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer));
- return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, peer.getDetails(), body.autoDelete, registry);
+ return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, registry);
}
else
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
index 832e4830ab..722ec1b256 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.cluster.replay;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.AMQShortString;
import java.util.Map;
import java.util.HashMap;
@@ -29,19 +30,19 @@ import java.util.List;
class ConsumerCounts
{
- private final Map<String, Integer> _counts = new HashMap<String, Integer>();
+ private final Map<AMQShortString, Integer> _counts = new HashMap<AMQShortString, Integer>();
- synchronized void increment(String queue)
+ synchronized void increment(AMQShortString queue)
{
_counts.put(queue, get(queue) + 1);
}
- synchronized void decrement(String queue)
+ synchronized void decrement(AMQShortString queue)
{
_counts.put(queue, get(queue) - 1);
}
- private int get(String queue)
+ private int get(AMQShortString queue)
{
Integer count = _counts.get(queue);
return count == null ? 0 : count;
@@ -49,7 +50,7 @@ class ConsumerCounts
synchronized void replay(List<AMQMethodBody> messages)
{
- for(String queue : _counts.keySet())
+ for(AMQShortString queue : _counts.keySet())
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
index 338817e892..3193c206c7 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
@@ -22,15 +22,7 @@ package org.apache.qpid.server.cluster.replay;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.ClusterSynchBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.cluster.util.Bindings;
@@ -57,11 +49,11 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _globalRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>();
private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _localRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>();
- private final Map<String, QueueDeclareBody> _sharedQueues = new ConcurrentHashMap<String, QueueDeclareBody>();
- private final Map<String, QueueDeclareBody> _privateQueues = new ConcurrentHashMap<String, QueueDeclareBody>();
- private final Bindings<String, String, QueueBindBody> _sharedBindings = new Bindings<String, String, QueueBindBody>();
- private final Bindings<String, String, QueueBindBody> _privateBindings = new Bindings<String, String, QueueBindBody>();
- private final Map<String, ExchangeDeclareBody> _exchanges = new ConcurrentHashMap<String, ExchangeDeclareBody>();
+ private final Map<AMQShortString, QueueDeclareBody> _sharedQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>();
+ private final Map<AMQShortString, QueueDeclareBody> _privateQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>();
+ private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _sharedBindings = new Bindings<AMQShortString, AMQShortString, QueueBindBody>();
+ private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _privateBindings = new Bindings<AMQShortString, AMQShortString, QueueBindBody>();
+ private final Map<AMQShortString, ExchangeDeclareBody> _exchanges = new ConcurrentHashMap<AMQShortString, ExchangeDeclareBody>();
private final ConsumerCounts _consumers = new ConsumerCounts();
public ReplayStore()
@@ -204,15 +196,15 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
private static class QueueDeclareRecorder extends ChainedMethodRecorder<QueueDeclareBody>
{
private final boolean _exclusive;
- private final Map<String, QueueDeclareBody> _queues;
+ private final Map<AMQShortString, QueueDeclareBody> _queues;
- QueueDeclareRecorder(boolean exclusive, Map<String, QueueDeclareBody> queues)
+ QueueDeclareRecorder(boolean exclusive, Map<AMQShortString, QueueDeclareBody> queues)
{
_queues = queues;
_exclusive = exclusive;
}
- QueueDeclareRecorder(boolean exclusive, Map<String, QueueDeclareBody> queues, QueueDeclareRecorder recorder)
+ QueueDeclareRecorder(boolean exclusive, Map<AMQShortString, QueueDeclareBody> queues, QueueDeclareRecorder recorder)
{
super(recorder);
_queues = queues;
@@ -236,15 +228,15 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
private class QueueDeleteRecorder extends ChainedMethodRecorder<QueueDeleteBody>
{
- private final Map<String, QueueDeclareBody> _queues;
- private final Bindings<String, String, QueueBindBody> _bindings;
+ private final Map<AMQShortString, QueueDeclareBody> _queues;
+ private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _bindings;
- QueueDeleteRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings)
+ QueueDeleteRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings)
{
this(queues, bindings, null);
}
- QueueDeleteRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings, QueueDeleteRecorder recorder)
+ QueueDeleteRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings, QueueDeleteRecorder recorder)
{
super(recorder);
_queues = queues;
@@ -267,16 +259,16 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
private class QueueBindRecorder extends ChainedMethodRecorder<QueueBindBody>
{
- private final Map<String, QueueDeclareBody> _queues;
- private final Bindings<String, String, QueueBindBody> _bindings;
+ private final Map<AMQShortString, QueueDeclareBody> _queues;
+ private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _bindings;
- QueueBindRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings)
+ QueueBindRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings)
{
_queues = queues;
_bindings = bindings;
}
- QueueBindRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings, QueueBindRecorder recorder)
+ QueueBindRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings, QueueBindRecorder recorder)
{
super(recorder);
_queues = queues;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index 50b2fa0b66..6898ffcec2 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicCancelBody;
import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.cluster.*;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -45,7 +46,7 @@ public class ClusteredQueue extends AMQQueue
private final GroupManager _groupMgr;
private final NestedSubscriptionManager _subscriptions;
- public ClusteredQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry)
+ public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry)
throws AMQException
{
super(name, durable, owner, autoDelete, queueRegistry, new ClusteredSubscriptionManager());
@@ -53,7 +54,7 @@ public class ClusteredQueue extends AMQQueue
_subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers();
}
- public ClusteredQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
+ public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
throws AMQException
{
super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new ClusteredSubscriptionManager(),
@@ -84,7 +85,7 @@ public class ClusteredQueue extends AMQQueue
}
}
- public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException
+ public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
{
//handle locally:
super.unregisterProtocolSession(ps, channel, consumerTag);
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
index 8315d46b5d..89ce0bc8b1 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.SimpleBodySendable;
import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.AMQShortString;
import java.util.concurrent.Executor;
@@ -36,7 +37,7 @@ public class PrivateQueue extends AMQQueue
{
private final GroupManager _groupMgr;
- public PrivateQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry)
+ public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry)
throws AMQException
{
super(name, durable, owner, autoDelete, queueRegistry);
@@ -44,7 +45,7 @@ public class PrivateQueue extends AMQQueue
}
- public PrivateQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
+ public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
throws AMQException
{
super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery);
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
index f8eba282e2..a6cce05a03 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.MemberHandle;
@@ -42,7 +43,7 @@ public class RemoteQueueProxy extends AMQQueue
private final MemberHandle _target;
private final GroupManager _groupMgr;
- public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry)
+ public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry)
throws AMQException
{
super(name, durable, owner, autoDelete, queueRegistry);
@@ -51,7 +52,7 @@ public class RemoteQueueProxy extends AMQQueue
_groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this));
}
- public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
+ public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
throws AMQException
{
super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery);
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java
index 76b1da8754..830a00f4c2 100644
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java
@@ -21,13 +21,14 @@
package org.apache.qpid.server.cluster;
import junit.framework.TestCase;
+import org.apache.qpid.framing.AMQShortString;
public class ClusterCapabilityTest extends TestCase
{
public void testStartWithNull()
{
MemberHandle peer = new SimpleMemberHandle("myhost:9999");
- String c = ClusterCapability.add(null, peer);
+ AMQShortString c = ClusterCapability.add(null, peer);
assertTrue(ClusterCapability.contains(c));
assertTrue(peer.matches(ClusterCapability.getPeer(c)));
}
@@ -35,7 +36,7 @@ public class ClusterCapabilityTest extends TestCase
public void testStartWithText()
{
MemberHandle peer = new SimpleMemberHandle("myhost:9999");
- String c = ClusterCapability.add("existing text", peer);
+ AMQShortString c = ClusterCapability.add(new AMQShortString("existing text"), peer);
assertTrue(ClusterCapability.contains(c));
assertTrue(peer.matches(ClusterCapability.getPeer(c)));
}
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
index c427285f4a..70209cd2a3 100644
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.cluster;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -36,7 +37,7 @@ public class SimpleClusterTest extends TestCase
AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test");
AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
System.out.println("Session created");
- session.declareExchange("my_exchange", "direct");
+ session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct"));
System.out.println("Exchange declared");
con.close();
System.out.println("Connection closed");