summaryrefslogtreecommitdiff
path: root/qpid/java/cluster/src/org
diff options
context:
space:
mode:
authorStephen Vinoski <vinoski@apache.org>2006-11-18 02:12:32 +0000
committerStephen Vinoski <vinoski@apache.org>2006-11-18 02:12:32 +0000
commitdf3dd68224a4703cd03a73cf1bcc3cfcfb3096e2 (patch)
treea81743f333690819f8e0b667c0036db8b0d268e8 /qpid/java/cluster/src/org
parentd6d523c5325aec2411f582b1249993f171748bf7 (diff)
downloadqpid-python-df3dd68224a4703cd03a73cf1bcc3cfcfb3096e2.tar.gz
directory moves required for maven merge
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@476414 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/cluster/src/org')
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java91
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java26
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/Broker.java247
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java26
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java368
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java73
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java135
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java63
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java58
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java193
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java135
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java80
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java369
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java72
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java107
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java31
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java90
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java26
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java107
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/Main.java122
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/Member.java31
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java26
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java34
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java28
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java29
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java28
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java44
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java271
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java36
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java30
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java28
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java94
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java159
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java56
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java72
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java139
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java49
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java281
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java55
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java25
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java77
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java38
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java60
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java63
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java53
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java58
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java84
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java130
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java56
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java85
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java31
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java31
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java31
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java29
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java31
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java48
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java69
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java32
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java73
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java37
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java313
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java83
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java72
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java53
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java61
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java164
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java95
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java103
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java63
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java60
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java109
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java96
-rw-r--r--qpid/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java56
73 files changed, 0 insertions, 6378 deletions
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java
deleted file mode 100644
index 39508df566..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/BlockingHandler.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-public class BlockingHandler implements ResponseHandler
-{
- private final Class _expected;
- private boolean _completed;
- private AMQMethodBody _response;
-
-
- public BlockingHandler()
- {
- this(AMQMethodBody.class);
- }
-
- public BlockingHandler(Class<? extends AMQMethodBody> expected)
- {
- _expected = expected;
- }
-
- public void responded(AMQMethodBody response)
- {
- if (_expected.isInstance(response))
- {
- _response = response;
- completed();
- }
- }
-
- public void removed()
- {
- completed();
- }
-
- private synchronized void completed()
- {
- _completed = true;
- notifyAll();
- }
-
- synchronized void waitForCompletion()
- {
- while (!_completed)
- {
- try
- {
- wait();
- }
- catch (InterruptedException ignore)
- {
-
- }
- }
- }
-
- AMQMethodBody getResponse()
- {
- return _response;
- }
-
- boolean failed()
- {
- return _response == null;
- }
-
- boolean isCompleted()
- {
- return _completed;
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java
deleted file mode 100644
index 145aa58574..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/BroadcastPolicy.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-public interface BroadcastPolicy
-{
- public boolean isComplete(int responded, int members);
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/Broker.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/Broker.java
deleted file mode 100644
index 7e2cf6da83..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/Broker.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * An implementation of the Member interface (through which data is sent to other
- * peers in the cluster). This class provides a base from which subclasses can
- * inherit some common behaviour for broadcasting GroupRequests and sending methods
- * that may expect a response. It also extends the Member abstraction to support
- * a richer set of operations that are useful within the package but should not be
- * exposed outside of it.
- *
- */
-abstract class Broker extends SimpleMemberHandle implements Member
-{
- private static final Logger _logger = Logger.getLogger(Broker.class);
- private static final int DEFAULT_CHANNEL = 1;
- private static final int START_CHANNEL = 2;
- private static final int END_CHANNEL = 10000;
-
-
- private MemberFailureListener _listener;
- //a wrap-around counter to allocate _requests a unique channel:
- private int _nextChannel = START_CHANNEL;
- //outstanding _requests:
- private final Map<Integer, ResponseHandler> _requests = new HashMap<Integer, ResponseHandler>();
-
- Broker(String host, int port)
- {
- super(host, port);
- }
-
- /**
- * Allows a listener to be registered that will receive callbacks when communication
- * to the peer this broker instance represents fails.
- * @param listener the callback to be notified of failures
- */
- public void addFailureListener(MemberFailureListener listener)
- {
- _listener = listener;
- }
-
- /**
- * Allows subclasses to signal comunication failures
- */
- protected void failed()
- {
- if (_listener != null)
- {
- _listener.failed(this);
- }
- }
-
- /**
- * Subclasses should call this on receiving message responses from the remote
- * peer. They are matched to any outstanding request they might be response
- * to, with the completion and callback of that request being managed if
- * required.
- *
- * @param channel the channel on which the method was received
- * @param response the response received
- * @return true if the response matched an outstanding request
- */
- protected synchronized boolean handleResponse(int channel, AMQMethodBody response)
- {
- ResponseHandler request = _requests.get(channel);
- if (request == null)
- {
- if(!_requests.isEmpty())
- {
- _logger.warn(new LogMessage("[next channel={3, integer}]: Response {0} on channel {1, integer} failed to match outstanding requests: {2}", response, channel, _requests, _nextChannel));
- }
- return false;
- }
- else
- {
- request.responded(response);
- return true;
- }
- }
-
- /**
- * Called when this broker is excluded from the group. Any requests made on
- * it are informed this member has left the group.
- */
- synchronized void remove()
- {
- for (ResponseHandler r : _requests.values())
- {
- r.removed();
- }
- }
-
- /**
- * Engages this broker in the specified group request
- *
- * @param request the request being made to a group of brokers
- * @throws AMQException if there is any failure
- */
- synchronized void invoke(GroupRequest request) throws AMQException
- {
- int channel = nextChannel();
- _requests.put(channel, new GroupRequestAdapter(request, channel));
- request.send(channel, this);
- }
-
- /**
- * Sends a message to the remote peer and undertakes to notify the specified
- * handler of the response.
- *
- * @param msg the message to send
- * @param handler the callback to notify of responses (or the removal of this broker
- * from the group)
- * @throws AMQException
- */
- synchronized void send(Sendable msg, ResponseHandler handler) throws AMQException
- {
- int channel;
- if (handler != null)
- {
- channel = nextChannel();
- _requests.put(channel, new RemovingWrapper(handler, channel));
- }
- else
- {
- channel = DEFAULT_CHANNEL;
- }
-
- msg.send(channel, this);
- }
-
- private int nextChannel()
- {
- int channel = _nextChannel++;
- if(_nextChannel >= END_CHANNEL)
- {
- _nextChannel = START_CHANNEL;
- }
- return channel;
- }
-
- /**
- * extablish connection without handling redirect
- */
- abstract boolean connect() throws IOException, InterruptedException;
-
- /**
- * Start connection process, including replay
- */
- abstract void connectAsynch(Iterable<AMQMethodBody> msgs);
-
- /**
- * Replay messages to the remote peer this instance represents. These messages
- * must be sent before any others whose transmission is requested through send() etc.
- *
- * @param msgs
- */
- abstract void replay(Iterable<AMQMethodBody> msgs);
-
- /**
- * establish connection, handling redirect if required...
- */
- abstract Broker connectToCluster() throws IOException, InterruptedException;
-
- private class GroupRequestAdapter implements ResponseHandler
- {
- private final GroupRequest request;
- private final int channel;
-
- GroupRequestAdapter(GroupRequest request, int channel)
- {
- this.request = request;
- this.channel = channel;
- }
-
- public void responded(AMQMethodBody response)
- {
- request.responseReceived(Broker.this, response);
- _requests.remove(channel);
- }
-
- public void removed()
- {
- request.removed(Broker.this);
- }
-
- public String toString()
- {
- return "GroupRequestAdapter{" + channel + ", " + request + "}";
- }
- }
-
- private class RemovingWrapper implements ResponseHandler
- {
- private final ResponseHandler handler;
- private final int channel;
-
- RemovingWrapper(ResponseHandler handler, int channel)
- {
- this.handler = handler;
- this.channel = channel;
- }
-
- public void responded(AMQMethodBody response)
- {
- handler.responded(response);
- _requests.remove(channel);
- }
-
- public void removed()
- {
- handler.removed();
- }
-
- public String toString()
- {
- return "RemovingWrapper{" + channel + ", " + handler + "}";
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java
deleted file mode 100644
index 92c3c4e7bf..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/BrokerFactory.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-interface BrokerFactory
-{
- public Broker create(MemberHandle handle);
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java
deleted file mode 100644
index 755a341607..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/BrokerGroup.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.cluster.replay.ReplayManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.util.InvokeMultiple;
-import org.apache.qpid.framing.AMQMethodBody;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Manages the membership list of a group and the set of brokers representing the
- * remote peers. The group should be initialised through a call to establish()
- * or connectToLeader().
- *
- */
-class BrokerGroup
-{
- private static final Logger _logger = Logger.getLogger(BrokerGroup.class);
-
- private final InvokeMultiple<MembershipChangeListener> _changeListeners = new InvokeMultiple<MembershipChangeListener>(MembershipChangeListener.class);
- private final ReplayManager _replayMgr;
- private final MemberHandle _local;
- private final BrokerFactory _factory;
- private final Object _lock = new Object();
- private final Set<MemberHandle> _synch = new HashSet<MemberHandle>();
- private List<MemberHandle> _members;
- private List<Broker> _peers = new ArrayList<Broker>();
- private JoinState _state = JoinState.UNINITIALISED;
-
- /**
- * Creates an unitialised group.
- *
- * @param local a handle that represents the local broker
- * @param replayMgr the replay manager to use when creating new brokers
- * @param factory the factory through which broker instances are created
- */
- BrokerGroup(MemberHandle local, ReplayManager replayMgr, BrokerFactory factory)
- {
- _replayMgr = replayMgr;
- _local = local;
- _factory = factory;
- }
-
- /**
- * Called to establish the local broker as the leader of a new group
- */
- void establish()
- {
- synchronized (_lock)
- {
- setState(JoinState.JOINED);
- _members = new ArrayList<MemberHandle>();
- _members.add(_local);
- }
- fireChange();
- }
-
- /**
- * Called by prospect to connect to group
- */
- Broker connectToLeader(MemberHandle handle) throws Exception
- {
- Broker leader = _factory.create(handle);
- leader = leader.connectToCluster();
- synchronized (_lock)
- {
- setState(JoinState.JOINING);
- _members = new ArrayList<MemberHandle>();
- _members.add(leader);
- _peers.add(leader);
- }
- fireChange();
- return leader;
- }
-
- /**
- * Called by leader when handling a join request
- */
- Broker connectToProspect(MemberHandle handle) throws IOException, InterruptedException
- {
- Broker prospect = _factory.create(handle);
- prospect.connect();
- synchronized (_lock)
- {
- _members.add(prospect);
- _peers.add(prospect);
- }
- fireChange();
- return prospect;
- }
-
- /**
- * Called in reponse to membership announcements.
- *
- * @param members the list of members now part of the group
- */
- void setMembers(List<MemberHandle> members)
- {
- if (isJoined())
- {
- List<Broker> old = _peers;
-
- synchronized (_lock)
- {
- _peers = getBrokers(members);
- _members = new ArrayList<MemberHandle>(members);
- }
-
- //remove those that are still members
- old.removeAll(_peers);
-
- //handle failure of any brokers that haven't survived
- for (Broker peer : old)
- {
- peer.remove();
- }
- }
- else
- {
- synchronized (_lock)
- {
- setState(JoinState.INITIATION);
- _members = new ArrayList<MemberHandle>(members);
- _synch.addAll(_members);
- _synch.remove(_local);
- }
- }
- fireChange();
- }
-
- List<MemberHandle> getMembers()
- {
- synchronized (_lock)
- {
- return Collections.unmodifiableList(_members);
- }
- }
-
- List<Broker> getPeers()
- {
- synchronized (_lock)
- {
- return _peers;
- }
- }
-
- /**
- * Removes the member presented from the group
- * @param peer the broker that should be removed
- */
- void remove(Broker peer)
- {
- synchronized (_lock)
- {
- _peers.remove(peer);
- _members.remove(peer);
- }
- fireChange();
- }
-
- MemberHandle getLocal()
- {
- return _local;
- }
-
- Broker getLeader()
- {
- synchronized (_lock)
- {
- return _peers.size() > 0 ? _peers.get(0) : null;
- }
- }
-
- /**
- * Allows a Broker instance to be retrieved for a given handle
- *
- * @param handle the handle for which a broker is sought
- * @param create flag to indicate whther a broker should be created for the handle if
- * one is not found within the list of known peers
- * @return the broker corresponding to handle or null if a match cannot be found and
- * create is false
- */
- Broker findBroker(MemberHandle handle, boolean create)
- {
- if (handle instanceof Broker)
- {
- return (Broker) handle;
- }
- else
- {
- for (Broker b : getPeers())
- {
- if (b.matches(handle))
- {
- return b;
- }
- }
- }
- if (create)
- {
- Broker b = _factory.create(handle);
- List<AMQMethodBody> msgs = _replayMgr.replay(isLeader(_local));
- _logger.info(new LogMessage("Replaying {0} from {1} to {2}", msgs, _local, b));
- b.connectAsynch(msgs);
-
- return b;
- }
- else
- {
- return null;
- }
- }
-
- /**
- * @param member the member to test for leadership
- * @return true if the passed in member is the group leader, false otherwise
- */
- boolean isLeader(MemberHandle member)
- {
- synchronized (_lock)
- {
- return member.matches(_members.get(0));
- }
- }
-
- /**
- * @return true if the local broker is the group leader, false otherwise
- */
- boolean isLeader()
- {
- return isLeader(_local);
- }
-
- /**
- * Used when the leader fails and the next broker in the list needs to
- * assume leadership
- * @return true if the action succeeds
- */
- boolean assumeLeadership()
- {
- boolean valid;
- synchronized (_lock)
- {
- valid = _members.size() > 1 && _local.matches(_members.get(1));
- if (valid)
- {
- _members.remove(0);
- _peers.remove(0);
- }
- }
- fireChange();
- return valid;
- }
-
- /**
- * Called in response to a Cluster.Synch message being received during the join
- * process. This indicates that the member mentioned has replayed all necessary
- * messages to the local broker.
- *
- * @param member the member from whom the synch messages was received
- */
- void synched(MemberHandle member)
- {
- _logger.info(new LogMessage("Synchronised with {0}", member));
- synchronized (_lock)
- {
- if (isLeader(member))
- {
- setState(JoinState.INDUCTION);
- }
- _synch.remove(member);
- if (_synch.isEmpty())
- {
- _peers = getBrokers(_members);
- setState(JoinState.JOINED);
- }
- }
- }
-
-
- /**
- * @return the state of the group
- */
- JoinState getState()
- {
- synchronized (_lock)
- {
- return _state;
- }
- }
-
- void addMemberhipChangeListener(MembershipChangeListener l)
- {
- _changeListeners.addListener(l);
- }
-
- void removeMemberhipChangeListener(MembershipChangeListener l)
- {
- _changeListeners.removeListener(l);
- }
-
-
-
- private void setState(JoinState state)
- {
- _logger.info(new LogMessage("Changed state from {0} to {1}", _state, state));
- _state = state;
- }
-
- private boolean isJoined()
- {
- return inState(JoinState.JOINED);
- }
-
- private boolean inState(JoinState state)
- {
- return _state.equals(state);
- }
-
- private List<Broker> getBrokers(List<MemberHandle> handles)
- {
- List<Broker> brokers = new ArrayList<Broker>();
- for (MemberHandle handle : handles)
- {
- if (!_local.matches(handle))
- {
- brokers.add(findBroker(handle, true));
- }
- }
- return brokers;
- }
-
- private void fireChange()
- {
- List<MemberHandle> members;
- synchronized(this)
- {
- members = new ArrayList(_members);
- }
- _changeListeners.getProxy().changed(Collections.unmodifiableList(members));
- }
-} \ No newline at end of file
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java
deleted file mode 100644
index 7a01995abb..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClientAdapter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.mina.common.IoSession;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.framing.AMQMethodBody;
-
-/**
- * Hack to assist with reuse of the client handlers for connection setup in
- * the inter-broker communication within the cluster.
- *
- */
-class ClientAdapter implements MethodHandler
-{
- private final AMQProtocolSession _session;
- private final AMQStateManager _stateMgr;
-
- ClientAdapter(IoSession session, AMQStateManager stateMgr)
- {
- this(session, stateMgr, "guest", "guest", session.toString(), "/cluster");
- }
-
- ClientAdapter(IoSession session, AMQStateManager stateMgr, String user, String password, String name, String path)
- {
- _session = new SessionAdapter(session, new ConnectionAdapter(user, password, name, path));
- _stateMgr = stateMgr;
- }
-
- public void handle(int channel, AMQMethodBody method) throws AMQException
- {
- AMQMethodEvent evt = new AMQMethodEvent(channel, method, _session);
- _stateMgr.methodReceived(evt);
- }
-
- private class SessionAdapter extends AMQProtocolSession
- {
- public SessionAdapter(IoSession session, AMQConnection connection)
- {
- super(null, session, connection);
- }
- }
-
- private static class ConnectionAdapter extends AMQConnection
- {
- ConnectionAdapter(String username, String password, String clientName, String virtualPath)
- {
- super(username, password, clientName, virtualPath);
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
deleted file mode 100644
index c604709078..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
-import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
-import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
-import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.IllegalStateTransitionException;
-import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * An extension of client.AMQStateManager that allows different handlers to be registered.
- *
- */
-public class ClientHandlerRegistry extends AMQStateManager
-{
- private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>();
- private final MemberHandle _identity;
-
- protected ClientHandlerRegistry(MemberHandle local)
- {
- super(AMQState.CONNECTION_NOT_STARTED, false);
-
- _identity = local;
-
- addHandler(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance(),
- AMQState.CONNECTION_NOT_STARTED);
-
- addHandler(ConnectionTuneBody.class, new ConnectionTuneHandler(),
- AMQState.CONNECTION_NOT_TUNED);
- addHandler(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance(),
- AMQState.CONNECTION_NOT_TUNED);
- addHandler(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance(),
- AMQState.CONNECTION_NOT_OPENED);
-
- addHandlers(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance(),
- AMQState.CONNECTION_NOT_STARTED,
- AMQState.CONNECTION_NOT_TUNED,
- AMQState.CONNECTION_NOT_OPENED);
-
- }
-
- private ClientRegistry state(AMQState state)
- {
- ClientRegistry registry = _handlers.get(state);
- if (registry == null)
- {
- registry = new ClientRegistry();
- _handlers.put(state, registry);
- }
- return registry;
- }
-
- protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBody frame) throws IllegalStateTransitionException
- {
- ClientRegistry registry = _handlers.get(state);
- return registry == null ? null : registry.getHandler(frame);
- }
-
-
- <A extends Class<AMQMethodBody>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states)
- {
- for (AMQState state : states)
- {
- addHandler(type, handler, state);
- }
- }
-
- <A extends Class<AMQMethodBody>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state)
- {
- ClientRegistry registry = _handlers.get(state);
- if (registry == null)
- {
- registry = new ClientRegistry();
- _handlers.put(state, registry);
- }
- registry.add(type, handler);
- }
-
- static class ClientRegistry
- {
- private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener> registry
- = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener>();
-
- <A extends Class<AMQMethodBody>> void add(A type, StateAwareMethodListener handler)
- {
- registry.put(type, handler);
- }
-
- StateAwareMethodListener getHandler(AMQMethodBody frame)
- {
- return registry.get(frame.getClass());
- }
- }
-
- class ConnectionTuneHandler extends ConnectionTuneMethodHandler
- {
- protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
- {
- return super.createConnectionOpenFrame(channel, path, ClusterCapability.add(capabilities, _identity), insist);
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java
deleted file mode 100644
index 89f402c1b9..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusterBuilder.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.server.cluster.handler.ClusterMethodHandlerFactory;
-import org.apache.qpid.server.cluster.replay.RecordingMethodHandlerFactory;
-import org.apache.qpid.server.cluster.replay.ReplayStore;
-
-import java.net.InetSocketAddress;
-
-class ClusterBuilder
-{
- private final LoadTable loadTable = new LoadTable();
- private final ReplayStore replayStore = new ReplayStore();
- private final MemberHandle handle;
- private final GroupManager groupMgr;
-
- ClusterBuilder(InetSocketAddress address)
- {
- handle = new SimpleMemberHandle(address.getHostName(), address.getPort()).resolve();
- groupMgr = new DefaultGroupManager(handle, getBrokerFactory(), replayStore, loadTable);
- }
-
- GroupManager getGroupManager()
- {
- return groupMgr;
- }
-
- ServerHandlerRegistry getHandlerRegistry()
- {
- return new ServerHandlerRegistry(getHandlerFactory());
- }
-
- private MethodHandlerFactory getHandlerFactory()
- {
- MethodHandlerFactory factory = new ClusterMethodHandlerFactory(groupMgr, loadTable);
- //need to wrap relevant handlers with recording handler for easy replay:
- return new RecordingMethodHandlerFactory(factory, replayStore);
- }
-
- private BrokerFactory getBrokerFactory()
- {
- return new MinaBrokerProxyFactory(handle);
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java
deleted file mode 100644
index 0411019334..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusterCapability.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class ClusterCapability
-{
- public static final String PATTERN = ".*\\bcluster_peer=(\\S*:\\d*)\b*.*";
- public static final String PEER = "cluster_peer";
-
- public static String add(String original, MemberHandle identity)
- {
- return original == null ? peer(identity) : original + " " + peer(identity);
- }
-
- private static String peer(MemberHandle identity)
- {
- return PEER + "=" + identity.getDetails();
- }
-
- public static boolean contains(String in)
- {
- return in != null && in.contains(in);
- }
-
- public static MemberHandle getPeer(String in)
- {
- Matcher matcher = Pattern.compile(PATTERN).matcher(in);
- if (matcher.matches())
- {
- return new SimpleMemberHandle(matcher.group(1));
- }
- else
- {
- throw new RuntimeException("Could not find peer in '" + in + "'");
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
deleted file mode 100644
index 6e7efb3659..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.IoSession;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.ClusterMembershipBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.cluster.util.LogMessage;
-
-import java.net.InetSocketAddress;
-
-public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements InductionBuffer.MessageHandler
-{
- private static final Logger _logger = Logger.getLogger(ClusteredProtocolHandler.class);
- private final InductionBuffer _peerBuffer = new InductionBuffer(this);
- private final InductionBuffer _clientBuffer = new InductionBuffer(this);
- private final GroupManager _groupMgr;
- private final ServerHandlerRegistry _handlers;
-
- public ClusteredProtocolHandler(InetSocketAddress address)
- {
- this(ApplicationRegistry.getInstance(), address);
- }
-
- public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address)
- {
- this(registry.getQueueRegistry(), registry.getExchangeRegistry(), address);
- }
-
- public ClusteredProtocolHandler(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, InetSocketAddress address)
- {
- super(queueRegistry, exchangeRegistry);
- ClusterBuilder builder = new ClusterBuilder(address);
- _groupMgr = builder.getGroupManager();
- _handlers = builder.getHandlerRegistry();
- }
-
- public ClusteredProtocolHandler(ClusteredProtocolHandler handler)
- {
- super(handler);
- _groupMgr = handler._groupMgr;
- _handlers = handler._handlers;
- }
-
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
- {
- new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers));
- }
-
- void connect(String join) throws Exception
- {
- if (join == null)
- {
- _groupMgr.establish();
- }
- else
- {
- _groupMgr.join(new SimpleMemberHandle(join));
- }
- }
-
- private boolean inState(JoinState state)
- {
- return _groupMgr.getState().equals(state);
- }
-
- public void messageReceived(IoSession session, Object msg) throws Exception
- {
- JoinState state = _groupMgr.getState();
- switch (state)
- {
- case JOINED:
- _logger.debug(new LogMessage("Received {0}", msg));
- super.messageReceived(session, msg);
- break;
- case JOINING:
- case INITIATION:
- case INDUCTION:
- buffer(session, msg);
- break;
- default:
- throw new AMQException("Received message while in state: " + state);
- }
- JoinState latest = _groupMgr.getState();
- if (!latest.equals(state))
- {
- switch (latest)
- {
- case INDUCTION:
- _logger.info("Reached induction, delivering buffered message from peers");
- _peerBuffer.deliver();
- break;
- case JOINED:
- _logger.info("Reached joined, delivering buffered message from clients");
- _clientBuffer.deliver();
- break;
- }
- }
- }
-
- private void buffer(IoSession session, Object msg) throws Exception
- {
- if (isBufferable(msg))
- {
- MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session);
- if (peer == null)
- {
- _logger.debug(new LogMessage("Buffering {0} for client", msg));
- _clientBuffer.receive(session, msg);
- }
- else if (inState(JoinState.JOINING) && isMembershipAnnouncement(msg))
- {
- _logger.debug(new LogMessage("Initial membership [{0}] received from {1}", msg, peer));
- super.messageReceived(session, msg);
- }
- else if (inState(JoinState.INITIATION) && _groupMgr.isLeader(peer))
- {
- _logger.debug(new LogMessage("Replaying {0} from leader ", msg));
- super.messageReceived(session, msg);
- }
- else if (inState(JoinState.INDUCTION))
- {
- _logger.debug(new LogMessage("Replaying {0} from peer {1}", msg, peer));
- super.messageReceived(session, msg);
- }
- else
- {
- _logger.debug(new LogMessage("Buffering {0} for peer {1}", msg, peer));
- _peerBuffer.receive(session, msg);
- }
- }
- else
- {
- _logger.debug(new LogMessage("Received {0}", msg));
- super.messageReceived(session, msg);
- }
- }
-
- public void deliver(IoSession session, Object msg) throws Exception
- {
- _logger.debug(new LogMessage("Delivering {0}", msg));
- super.messageReceived(session, msg);
- }
-
- private boolean isMembershipAnnouncement(Object msg)
- {
- return msg instanceof AMQFrame && (((AMQFrame) msg).bodyFrame instanceof ClusterMembershipBody);
- }
-
- private boolean isBufferable(Object msg)
- {
- return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).bodyFrame);
- }
-
- private boolean isBuffereable(AMQBody body)
- {
- return !(body instanceof ConnectionStartOkBody ||
- body instanceof ConnectionTuneOkBody ||
- body instanceof ConnectionSecureOkBody ||
- body instanceof ConnectionOpenBody);
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
deleted file mode 100644
index 1763bcd03f..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.mina.common.IoSession;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-
-public class ClusteredProtocolSession extends AMQMinaProtocolSession
-{
- private MemberHandle _peer;
-
- public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager)
- throws AMQException
- {
- super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager);
- }
-
- public boolean isPeerSession()
- {
- return _peer != null;
- }
-
- public void setSessionPeer(MemberHandle peer)
- {
- _peer = peer;
- }
-
- public MemberHandle getSessionPeer()
- {
- return _peer;
- }
-
- public AMQChannel getChannel(int channelId)
- throws AMQException
- {
- AMQChannel channel = super.getChannel(channelId);
- if (isPeerSession() && channel == null)
- {
- channel = new OneUseChannel(channelId);
- addChannel(channel);
- }
- return channel;
- }
-
- public static boolean isPeerSession(IoSession session)
- {
- return isPeerSession(getAMQProtocolSession(session));
- }
-
- public static boolean isPeerSession(AMQProtocolSession session)
- {
- return session instanceof ClusteredProtocolSession && ((ClusteredProtocolSession) session).isPeerSession();
- }
-
- public static void setSessionPeer(AMQProtocolSession session, MemberHandle peer)
- {
- ((ClusteredProtocolSession) session).setSessionPeer(peer);
- }
-
- public static MemberHandle getSessionPeer(AMQProtocolSession session)
- {
- return ((ClusteredProtocolSession) session).getSessionPeer();
- }
-
- public static MemberHandle getSessionPeer(IoSession session)
- {
- return getSessionPeer(getAMQProtocolSession(session));
- }
-
- /**
- * Cleans itself up after delivery of a message (publish frame, header and optional body frame(s))
- */
- private class OneUseChannel extends AMQChannel
- {
- public OneUseChannel(int channelId)
- throws AMQException
- {
- this(channelId, ApplicationRegistry.getInstance());
- }
-
- public OneUseChannel(int channelId, IApplicationRegistry registry)
- throws AMQException
- {
- super(channelId,
- registry.getMessageStore(),
- registry.getExchangeRegistry());
- }
-
- protected void routeCurrentMessage() throws AMQException
- {
- super.routeCurrentMessage();
- removeChannel(getChannelId());
- }
- }
-
- public static boolean isPayloadFromPeer(AMQMessage payload)
- {
- return isPeerSession(payload.getPublisher());
- }
-
- public static boolean canRelay(AMQMessage payload, MemberHandle target)
- {
- //can only relay client messages that have not already been relayed to the given target
- return !isPayloadFromPeer(payload) && !payload.checkToken(target);
- }
-
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java
deleted file mode 100644
index a1f01eff46..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import java.io.IOException;
-
-class ConnectionStatusMonitor
-{
- private boolean _complete;
- private boolean _redirected;
- private String _host;
- private int _port;
- private RuntimeException _error;
-
- synchronized void opened()
- {
- _complete = true;
- notifyAll();
- }
-
- synchronized void redirect(String host, int port)
- {
- _complete = true;
- _redirected = true;
- this._host = host;
- this._port = port;
- }
-
- synchronized void failed(RuntimeException e)
- {
- _error = e;
- _complete = true;
- }
-
- synchronized boolean waitUntilOpen() throws InterruptedException
- {
- while (!_complete)
- {
- wait();
- }
- if (_error != null)
- {
- throw _error;
- }
- return !_redirected;
- }
-
- synchronized boolean isOpened()
- {
- return _complete;
- }
-
- String getHost()
- {
- return _host;
- }
-
- int getPort()
- {
- return _port;
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java
deleted file mode 100644
index 07d572d27f..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/DefaultGroupManager.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ClusterJoinBody;
-import org.apache.qpid.framing.ClusterLeaveBody;
-import org.apache.qpid.framing.ClusterMembershipBody;
-import org.apache.qpid.framing.ClusterPingBody;
-import org.apache.qpid.framing.ClusterSuspectBody;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.cluster.policy.StandardPolicies;
-import org.apache.qpid.server.cluster.replay.ReplayManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.util.InvokeMultiple;
-
-import java.util.List;
-
-public class DefaultGroupManager implements GroupManager, MemberFailureListener, BrokerFactory, StandardPolicies
-{
- private static final Logger _logger = Logger.getLogger(DefaultGroupManager.class);
- private final LoadTable _loadTable;
- private final BrokerFactory _factory;
- private final ReplayManager _replayMgr;
- private final BrokerGroup _group;
-
- DefaultGroupManager(MemberHandle handle, BrokerFactory factory, ReplayManager replayMgr)
- {
- this(handle, factory, replayMgr, new LoadTable());
- }
-
- DefaultGroupManager(MemberHandle handle, BrokerFactory factory, ReplayManager replayMgr, LoadTable loadTable)
- {
- handle = SimpleMemberHandle.resolve(handle);
- _logger.info(handle);
- _loadTable = loadTable;
- _factory = factory;
- _replayMgr = replayMgr;
- _group = new BrokerGroup(handle, _replayMgr, this);
- }
-
- public JoinState getState()
- {
- return _group.getState();
- }
-
- public void addMemberhipChangeListener(MembershipChangeListener l)
- {
- _group.addMemberhipChangeListener(l);
- }
-
- public void removeMemberhipChangeListener(MembershipChangeListener l)
- {
- _group.removeMemberhipChangeListener(l);
- }
-
- public void broadcast(Sendable message) throws AMQException
- {
- for (Broker b : _group.getPeers())
- {
- b.send(message, null);
- }
- }
-
- public void broadcast(Sendable message, BroadcastPolicy policy, GroupResponseHandler callback) throws AMQException
- {
- GroupRequest request = new GroupRequest(message, policy, callback);
- for (Broker b : _group.getPeers())
- {
- b.invoke(request);
- }
- request.finishedSend();
- }
-
- public void send(MemberHandle broker, Sendable message) throws AMQException
- {
- Broker destination = findBroker(broker);
- if(destination == null)
- {
- _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker));
- }
- else
- {
- destination.send(message, null);
- _logger.debug(new LogMessage("Sent {0} to {1}", message, broker));
- }
- }
-
- private void send(Broker broker, Sendable message, ResponseHandler handler) throws AMQException
- {
- broker.send(message, handler);
- }
-
- private void ping(Broker b) throws AMQException
- {
- ClusterPingBody ping = new ClusterPingBody();
- ping.broker = _group.getLocal().getDetails();
- ping.responseRequired = true;
- ping.load = _loadTable.getLocalLoad();
- BlockingHandler handler = new BlockingHandler();
- send(getLeader(), new SimpleSendable(ping), handler);
- handler.waitForCompletion();
- if (handler.failed())
- {
- if (isLeader())
- {
- handleFailure(b);
- }
- else
- {
- suspect(b);
- }
- }
- else
- {
- _loadTable.setLoad(b, ((ClusterPingBody) handler.getResponse()).load);
- }
- }
-
- public void handlePing(MemberHandle member, long load)
- {
- _loadTable.setLoad(findBroker(member), load);
- }
-
- public Member redirect()
- {
- return _loadTable.redirect();
- }
-
- public void establish()
- {
- _group.establish();
- _logger.info("Established cluster");
- }
-
- public void join(MemberHandle member) throws AMQException
- {
- member = SimpleMemberHandle.resolve(member);
-
- Broker leader = connectToLeader(member);
- _logger.info(new LogMessage("Connected to {0}. joining", leader));
- ClusterJoinBody join = new ClusterJoinBody();
- join.broker = _group.getLocal().getDetails();
- send(leader, new SimpleSendable(join));
- }
-
- private Broker connectToLeader(MemberHandle member) throws AMQException
- {
- try
- {
- return _group.connectToLeader(member);
- }
- catch (Exception e)
- {
- throw new AMQException("Could not connect to leader: " + e, e);
- }
- }
-
- public void leave() throws AMQException
- {
- ClusterLeaveBody leave = new ClusterLeaveBody();
- leave.broker = _group.getLocal().getDetails();
- send(getLeader(), new SimpleSendable(leave));
- }
-
- private void suspect(MemberHandle broker) throws AMQException
- {
- if (_group.isLeader(broker))
- {
- //need new leader, if this broker is next in line it can assume leadership
- if (_group.assumeLeadership())
- {
- announceMembership();
- }
- else
- {
- _logger.warn(new LogMessage("Leader failed. Expecting {0} to succeed.", _group.getMembers().get(1)));
- }
- }
- else
- {
- ClusterSuspectBody suspect = new ClusterSuspectBody();
- suspect.broker = broker.getDetails();
- send(getLeader(), new SimpleSendable(suspect));
- }
- }
-
-
- public void handleJoin(MemberHandle member) throws AMQException
- {
- _logger.info(new LogMessage("Handling join request for {0}", member));
- if(isLeader())
- {
- //connect to the host and port specified:
- Broker prospect = connectToProspect(member);
- announceMembership();
- List<AMQMethodBody> msgs = _replayMgr.replay(true);
- _logger.info(new LogMessage("Replaying {0} from leader to {1}", msgs, prospect));
- prospect.replay(msgs);
- }
- else
- {
- //pass request on to leader:
- ClusterJoinBody request = new ClusterJoinBody();
- request.broker = member.getDetails();
- Broker leader = getLeader();
- send(leader, new SimpleSendable(request));
- _logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader));
- }
- }
-
- private Broker connectToProspect(MemberHandle member) throws AMQException
- {
- try
- {
- return _group.connectToProspect(member);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- throw new AMQException("Could not connect to prospect: " + e, e);
- }
- }
-
- public void handleLeave(MemberHandle member) throws AMQException
- {
- handleFailure(findBroker(member));
- announceMembership();
- }
-
- public void handleSuspect(MemberHandle member) throws AMQException
- {
- Broker b = findBroker(member);
- if(b != null)
- {
- //ping it to check it has failed, ping will handle failure if it has
- ping(b);
- announceMembership();
- }
- }
-
- public void handleSynch(MemberHandle member)
- {
- _group.synched(member);
- }
-
- private ClusterMembershipBody createAnnouncement(String membership)
- {
- ClusterMembershipBody announce = new ClusterMembershipBody();
- //TODO: revise this way of converting String to bytes...
- announce.members = membership.getBytes();
- return announce;
- }
-
- private void announceMembership() throws AMQException
- {
- String membership = SimpleMemberHandle.membersToString(_group.getMembers());
- ClusterMembershipBody announce = createAnnouncement(membership);
- broadcast(new SimpleSendable(announce));
- _logger.info(new LogMessage("Membership announcement sent: {0}", membership));
- }
-
- private void handleFailure(Broker peer)
- {
- peer.remove();
- _group.remove(peer);
- }
-
- public void handleMembershipAnnouncement(String membership) throws AMQException
- {
- _group.setMembers(SimpleMemberHandle.stringToMembers(membership));
- _logger.info(new LogMessage("Membership announcement received: {0}", membership));
- }
-
- public boolean isLeader()
- {
- return _group.isLeader();
- }
-
- public boolean isLeader(MemberHandle handle)
- {
- return _group.isLeader(handle);
- }
-
- public Broker getLeader()
- {
- return _group.getLeader();
- }
-
- private Broker findBroker(MemberHandle handle)
- {
- return _group.findBroker(handle, false);
- }
-
- public Member getMember(MemberHandle handle)
- {
- return findBroker(handle);
- }
-
- public boolean isMember(MemberHandle member)
- {
- for (MemberHandle handle : _group.getMembers())
- {
- if (handle.matches(member))
- {
- return true;
- }
- }
- return false;
- }
-
- public MemberHandle getLocal()
- {
- return _group.getLocal();
- }
-
- public void failed(MemberHandle member)
- {
- if (isLeader())
- {
- handleFailure(findBroker(member));
- try
- {
- announceMembership();
- }
- catch (AMQException e)
- {
- _logger.error("Error announcing failure: " + e, e);
- }
- }
- else
- {
- try
- {
- suspect(member);
- }
- catch (AMQException e)
- {
- _logger.error("Error sending suspect: " + e, e);
- }
- }
- }
-
- public Broker create(MemberHandle handle)
- {
- Broker broker = _factory.create(handle);
- broker.addFailureListener(this);
- return broker;
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java
deleted file mode 100644
index 5599ae4b1f..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/GroupManager.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-
-public interface GroupManager
-{
- /**
- * Establish a new cluster with the local member as the leader.
- */
- public void establish();
-
- /**
- * Join the cluster to which member belongs
- */
- public void join(MemberHandle member) throws AMQException;
-
- public void broadcast(Sendable message) throws AMQException;
-
- public void broadcast(Sendable message, BroadcastPolicy policy, GroupResponseHandler callback) throws AMQException;
-
- public void send(MemberHandle broker, Sendable message) throws AMQException;
-
- public void leave() throws AMQException;
-
- public void handleJoin(MemberHandle member) throws AMQException;
-
- public void handleLeave(MemberHandle member) throws AMQException;
-
- public void handleSuspect(MemberHandle member) throws AMQException;
-
- public void handlePing(MemberHandle member, long load);
-
- public void handleMembershipAnnouncement(String membership) throws AMQException;
-
- public void handleSynch(MemberHandle member);
-
- public boolean isLeader();
-
- public boolean isLeader(MemberHandle handle);
-
- public boolean isMember(MemberHandle member);
-
- public MemberHandle redirect();
-
- public MemberHandle getLocal();
-
- public JoinState getState();
-
- public void addMemberhipChangeListener(MembershipChangeListener l);
-
- public void removeMemberhipChangeListener(MembershipChangeListener l);
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java
deleted file mode 100644
index 8ab7856e87..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/GroupRequest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Represents a method sent to a group of Member instances. Manages the responses,
- * completion and callback.
- *
- */
-class GroupRequest
-{
- private final Map<Member, AMQMethodBody> _responses = new HashMap<Member, AMQMethodBody>();
- private final List<Member> _brokers = new ArrayList<Member>();
- private boolean _sent;
-
- private final Sendable _request;
- private final BroadcastPolicy _policy;
- private final GroupResponseHandler _callback;
-
- GroupRequest(Sendable request, BroadcastPolicy policy, GroupResponseHandler callback)
- {
- _request = request;
- _policy = policy;
- _callback = callback;
- }
-
- void send(int channel, Member session) throws AMQException
- {
- _brokers.add(session);
- _request.send(channel, session);
- }
-
- boolean finishedSend()
- {
- _sent = true;
- return checkCompletion();
- }
-
- public boolean responseReceived(Member broker, AMQMethodBody response)
- {
- _responses.put(broker, response);
- return checkCompletion();
- }
-
- public boolean removed(Member broker)
- {
- _brokers.remove(broker);
- return checkCompletion();
- }
-
- private synchronized boolean checkCompletion()
- {
- return isComplete() && callback();
- }
-
- boolean isComplete()
- {
- return _sent && _policy != null && _policy.isComplete(_responses.size(), _brokers.size());
- }
-
- boolean callback()
- {
- _callback.response(getResults(), _brokers);
- return true;
- }
-
- List<AMQMethodBody> getResults()
- {
- List<AMQMethodBody> results = new ArrayList<AMQMethodBody>(_brokers.size());
- for (Member b : _brokers)
- {
- results.add(_responses.get(b));
- }
- return results;
- }
-
- public String toString()
- {
- return "GroupRequest{request=" + _request +", brokers=" + _brokers + ", responses=" + _responses + "}";
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java
deleted file mode 100644
index d2e9de2f39..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/GroupResponseHandler.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-import java.util.List;
-
-public interface GroupResponseHandler
-{
- //Note: this implies that the response to a group request will always be a method body...
- public void response(List<AMQMethodBody> responses, List<Member> members);
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java
deleted file mode 100644
index 586d7d4ae8..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/InductionBuffer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.mina.common.IoSession;
-
-import java.util.LinkedList;
-import java.util.Queue;
-
-/**
- * Buffers any received messages until join completes.
- *
- */
-class InductionBuffer
-{
- private final Queue<Message> _buffer = new LinkedList<Message>();
- private final MessageHandler _handler;
- private boolean _buffering = true;
-
- InductionBuffer(MessageHandler handler)
- {
- _handler = handler;
- }
-
- private void process() throws Exception
- {
- for (Message o = _buffer.poll(); o != null; o = _buffer.poll())
- {
- o.deliver(_handler);
- }
- _buffering = false;
- }
-
- synchronized void deliver() throws Exception
- {
- process();
- }
-
- synchronized void receive(IoSession session, Object msg) throws Exception
- {
- if (_buffering)
- {
- _buffer.offer(new Message(session, msg));
- }
- else
- {
- _handler.deliver(session, msg);
- }
- }
-
- private static class Message
- {
- private final IoSession _session;
- private final Object _msg;
-
- Message(IoSession session, Object msg)
- {
- _session = session;
- _msg = msg;
- }
-
- void deliver(MessageHandler handler) throws Exception
- {
- handler.deliver(_session, _msg);
- }
- }
-
- static interface MessageHandler
- {
- public void deliver(IoSession session, Object msg) throws Exception;
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java
deleted file mode 100644
index 5f92aa2971..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/JoinState.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-public enum JoinState
-{
- UNINITIALISED, JOINING, INITIATION, INDUCTION, JOINED
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java
deleted file mode 100644
index 13465a8615..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/LoadTable.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.PriorityQueue;
-
-/**
- * Maintains loading information about the local member and its cluster peers.
- *
- */
-public class LoadTable
-{
- private final Map<MemberHandle, Loading> _peers = new HashMap<MemberHandle, Loading>();
- private final PriorityQueue<Loading> _loads = new PriorityQueue<Loading>();
- private final Loading _local = new Loading(null);
-
- public LoadTable()
- {
- _loads.add(_local);
- }
-
- public void setLoad(Member member, long load)
- {
- synchronized (_peers)
- {
- Loading loading = _peers.get(member);
- if (loading == null)
- {
- loading = new Loading(member);
- synchronized (_loads)
- {
- _loads.add(loading);
- }
- _peers.put(member, loading);
- }
- loading.load = load;
- }
- }
-
- public void incrementLocalLoad()
- {
- synchronized (_local)
- {
- _local.load++;
- }
- }
-
- public void decrementLocalLoad()
- {
- synchronized (_local)
- {
- _local.load--;
- }
- }
-
- public long getLocalLoad()
- {
- synchronized (_local)
- {
- return _local.load;
- }
- }
-
- public Member redirect()
- {
- synchronized (_loads)
- {
- return _loads.peek().member;
- }
- }
-
- private static class Loading implements Comparable
- {
- private final Member member;
- private long load;
-
- Loading(Member member)
- {
- this.member = member;
- }
-
- public int compareTo(Object o)
- {
- return (int) (load - ((Loading) o).load);
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/Main.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/Main.java
deleted file mode 100644
index 57779a0550..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/Main.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptor;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.transport.ConnectorConfiguration;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-/**
- * TODO: This is a cut-and-paste from the original broker Main class. Would be preferrable
- * to make that class more reuseable to avoid all this duplication.
- *
- */
-public class Main extends org.apache.qpid.server.Main
-{
- private static final Logger _logger = Logger.getLogger(Main.class);
-
- protected Main(String[] args)
- {
- super(args);
- }
-
- protected void setOptions(Options otions)
- {
- super.setOptions(options);
-
- //extensions:
- Option join = OptionBuilder.withArgName("join").hasArg().withDescription("Join the specified cluster member. Overrides any value in the config file").
- withLongOpt("join").create("j");
- options.addOption(join);
- }
-
- protected void bind(int port, ConnectorConfiguration connectorConfig)
- {
- try
- {
- IoAcceptor acceptor = new SocketAcceptor();
- SocketSessionConfig sc = (SocketSessionConfig) acceptor.getSessionConfig();
-
- sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize);
- sc.setSendBufferSize(connectorConfig.socketWriteBuferSize);
- sc.setTcpNoDelay(true);
-
- // if we do not use the executor pool threading model we get the default leader follower
- // implementation provided by MINA
- if (connectorConfig.enableExecutorPool)
- {
- acceptor.setThreadModel(new ReadWriteThreadModel());
- }
-
- String host = InetAddress.getLocalHost().getHostName();
- ClusteredProtocolHandler handler = new ClusteredProtocolHandler(new InetSocketAddress(host, port));
- if (connectorConfig.enableNonSSL)
- {
- acceptor.setLocalAddress(new InetSocketAddress(port));
- acceptor.setHandler(handler);
- acceptor.bind();
- _logger.info("Qpid.AMQP listening on non-SSL port " + port);
- handler.connect(commandLine.getOptionValue("j"));
- }
-
- if (connectorConfig.enableSSL)
- {
- ClusteredProtocolHandler sslHandler = new ClusteredProtocolHandler(handler);
- sslHandler.setUseSSL(true);
- acceptor.setLocalAddress(new InetSocketAddress(connectorConfig.sslPort));
- acceptor.setHandler(handler);
- acceptor.bind();
- _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
- }
- }
- catch (IOException e)
- {
- _logger.error("Unable to bind service to registry: " + e, e);
- }
- catch (Exception e)
- {
- _logger.error("Unable to connect to cluster: " + e, e);
- }
- }
-
- public static void main(String[] args)
- {
- new Main(args);
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/Member.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/Member.java
deleted file mode 100644
index 3fbdfdde70..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/Member.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-
-public interface Member extends MemberHandle
-{
- public void send(AMQDataBlock data) throws AMQException;
-
- public void addFailureListener(MemberFailureListener listener);
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java
deleted file mode 100644
index 7ce45dffaa..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MemberFailureListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-interface MemberFailureListener
-{
- public void failed(MemberHandle member);
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java
deleted file mode 100644
index b14fede5aa..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MemberHandle.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-public interface MemberHandle
-{
- public String getHost();
-
- public int getPort();
-
- public boolean matches(MemberHandle m);
-
- public boolean matches(String host, int port);
-
- public String getDetails();
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java
deleted file mode 100644
index 591e652e32..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MembershipChangeListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import java.util.List;
-
-public interface MembershipChangeListener
-{
- public void changed(List<MemberHandle> members);
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java
deleted file mode 100644
index a83f034021..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MethodHandler.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-
-interface MethodHandler
-{
- public void handle(int channel, AMQMethodBody method) throws AMQException;
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java
deleted file mode 100644
index 9bf04f5458..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.server.state.AMQState;
-
-public interface MethodHandlerFactory
-{
- public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry);
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
deleted file mode 100644
index 748a660bb8..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MethodHandlerRegistry.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class MethodHandlerRegistry
-{
- private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> registry =
- new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-
- public <A extends AMQMethodBody, B extends Class<A>> MethodHandlerRegistry addHandler(B type, StateAwareMethodListener<A> handler)
- {
- registry.put(type, handler);
- return this;
- }
-
- public <B extends AMQMethodBody> StateAwareMethodListener<B> getHandler(B frame)
- {
- return (StateAwareMethodListener<B>) registry.get(frame.getClass());
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java
deleted file mode 100644
index ee3b3ceb8a..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxy.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.RuntimeIOException;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.transport.socket.nio.SocketConnector;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionRedirectBody;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-/**
- * A 'client stub' for a remote cluster peer, using MINA for IO Layer
- *
- */
-public class MinaBrokerProxy extends Broker implements MethodHandler
-{
- private static final Logger _logger = Logger.getLogger(MinaBrokerProxy.class);
- private final ConnectionStatusMonitor _connectionMonitor = new ConnectionStatusMonitor();
- private final ClientHandlerRegistry _legacyHandler;
- private final MinaBinding _binding = new MinaBinding();
- private final MemberHandle _local;
- private IoSession _session;
- private MethodHandler _handler;
- private Iterable<AMQMethodBody> _replay;
-
- MinaBrokerProxy(String host, int port, MemberHandle local)
- {
- super(host, port);
- _local = local;
- _legacyHandler = new ClientHandlerRegistry(local);
- }
-
- private void init(IoSession session)
- {
- _session = session;
- _handler = new ClientAdapter(session, _legacyHandler);
- }
-
- private ConnectFuture connectImpl()
- {
- _logger.info("Connecting to cluster peer: " + getDetails());
- SocketConnector ioConnector = new SocketConnector();
-
- SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getSessionConfig();
- scfg.setTcpNoDelay(true);
- scfg.setSendBufferSize(32768);
- scfg.setReceiveBufferSize(32768);
- InetSocketAddress address = new InetSocketAddress(getHost(), getPort());
- ioConnector.setHandler(_binding);
- return ioConnector.connect(address);
- }
-
- //extablish connection without handling redirect
- boolean connect() throws IOException, InterruptedException
- {
- ConnectFuture future = connectImpl();
- // wait for connection to complete
- future.join();
- // we call getSession which throws an IOException if there has been an error connecting
- try
- {
- future.getSession();
- }
- catch (RuntimeIOException e)
- {
- _connectionMonitor.failed(e);
- _logger.error(new LogMessage("Could not connect to {0}: {1}", this, e), e);
- throw e;
- }
- return _connectionMonitor.waitUntilOpen();
- }
-
- void connectAsynch(Iterable<AMQMethodBody> msgs)
- {
- _replay = msgs;
- connectImpl();
- }
-
- void replay(Iterable<AMQMethodBody> msgs)
- {
- _replay = msgs;
- if(_connectionMonitor.isOpened())
- {
- replay();
- }
- }
-
- //establish connection, handling redirect if required...
- Broker connectToCluster() throws IOException, InterruptedException
- {
- connect();
- //wait until the connection is open or get a redirection
- if (_connectionMonitor.waitUntilOpen())
- {
- return this;
- }
- else
- {
- Broker broker = new MinaBrokerProxy(_connectionMonitor.getHost(), _connectionMonitor.getPort(), _local);
- broker.connect();
- return broker;
- }
- }
-
- public void send(AMQDataBlock data) throws AMQException
- {
- if (_session == null)
- {
- try
- {
- _connectionMonitor.waitUntilOpen();
- }
- catch (Exception e)
- {
- throw new AMQException("Failed to send " + data + ": " + e, e);
- }
- }
- _session.write(data);
- }
-
- private void replay()
- {
- if(_replay != null)
- {
- for(AMQMethodBody b : _replay)
- {
- _session.write(new AMQFrame(0, b));
- }
- }
- }
-
- public void handle(int channel, AMQMethodBody method) throws AMQException
- {
- _logger.info(new LogMessage("Handling method: {0} for channel {1}", method, channel));
- if (!handleResponse(channel, method))
- {
- _logger.warn(new LogMessage("Unhandled method: {0} for channel {1}", method, channel));
- }
- }
-
- private void handleMethod(int channel, AMQMethodBody method) throws AMQException
- {
- if (method instanceof ConnectionRedirectBody)
- {
- //signal redirection to waiting thread
- ConnectionRedirectBody redirect = (ConnectionRedirectBody) method;
- String[] parts = redirect.host.split(":");
- _connectionMonitor.redirect(parts[0], Integer.parseInt(parts[1]));
- }
- else
- {
- _handler.handle(channel, method);
- if (AMQState.CONNECTION_OPEN.equals(_legacyHandler.getCurrentState()) && _handler != this)
- {
- _handler = this;
- _logger.info(new LogMessage("Connection opened, handler switched"));
- //replay any messages:
- replay();
- //signal waiting thread:
- _connectionMonitor.opened();
- }
- }
- }
-
- private void handleFrame(AMQFrame frame) throws AMQException
- {
- AMQBody body = frame.bodyFrame;
- if (body instanceof AMQMethodBody)
- {
- handleMethod(frame.channel, (AMQMethodBody) body);
- }
- else
- {
- throw new AMQException("Client only expects method body, got: " + body);
- }
- }
-
- public String toString()
- {
- return "MinaBrokerProxy[" + (_session == null ? super.toString() : _session.getRemoteAddress()) + "]";
- }
-
- private class MinaBinding extends IoHandlerAdapter implements ProtocolVersionList
- {
- public void sessionCreated(IoSession session) throws Exception
- {
- init(session);
- _logger.info(new LogMessage("{0}: created", MinaBrokerProxy.this));
- ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
- session.getFilterChain().addLast("protocolFilter", pcf);
-
- /* Find last protocol version in protocol version list. Make sure last protocol version
- listed in the build file (build-module.xml) is the latest version which will be used
- here. */
- int i = pv.length - 1;
- session.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
- }
-
- public void sessionOpened(IoSession session) throws Exception
- {
- _logger.info(new LogMessage("{0}: opened", MinaBrokerProxy.this));
- }
-
- public void sessionClosed(IoSession session) throws Exception
- {
- _logger.info(new LogMessage("{0}: closed", MinaBrokerProxy.this));
- }
-
- public void exceptionCaught(IoSession session, Throwable throwable) throws Exception
- {
- _logger.error(new LogMessage("{0}: received {1}", MinaBrokerProxy.this, throwable), throwable);
- if (! (throwable instanceof IOException))
- {
- _session.close();
- }
- failed();
- }
-
- public void messageReceived(IoSession session, Object object) throws Exception
- {
- if (object instanceof AMQFrame)
- {
- handleFrame((AMQFrame) object);
- }
- else
- {
- throw new AMQException("Received message of unrecognised type: " + object);
- }
- }
-
- public void messageSent(IoSession session, Object object) throws Exception
- {
- _logger.debug(new LogMessage("{0}: sent {1}", MinaBrokerProxy.this, object));
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java
deleted file mode 100644
index 5e70de7665..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-public class MinaBrokerProxyFactory implements BrokerFactory
-{
- private final MemberHandle _local;
-
- MinaBrokerProxyFactory(MemberHandle local)
- {
- _local = local;
- }
-
- public Broker create(MemberHandle handle)
- {
- return new MinaBrokerProxy(handle.getHost(), handle.getPort(), _local);
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java
deleted file mode 100644
index fe76ca6505..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ResponseHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-public interface ResponseHandler
-{
- public void responded(AMQMethodBody response);
-
- public void removed();
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java
deleted file mode 100644
index 159612331c..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/Sendable.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-
-public interface Sendable
-{
- public void send(int channel, Member member) throws AMQException;
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
deleted file mode 100644
index 71c53146a8..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.state.AMQState;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.IllegalStateTransitionException;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.cluster.util.LogMessage;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * An extension of server.AMQStateManager that allows different handlers to be registered.
- *
- */
-class ServerHandlerRegistry extends AMQStateManager
-{
- private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
- private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
-
- ServerHandlerRegistry()
- {
- super(AMQState.CONNECTION_NOT_STARTED, false);
- }
-
- ServerHandlerRegistry(ServerHandlerRegistry s)
- {
- this();
- _handlers.putAll(s._handlers);
- }
-
- ServerHandlerRegistry(MethodHandlerFactory factory)
- {
- this();
- init(factory);
- }
-
- void setHandlers(AMQState state, MethodHandlerRegistry handlers)
- {
- _handlers.put(state, handlers);
- }
-
- void init(MethodHandlerFactory factory)
- {
- for (AMQState s : AMQState.values())
- {
- setHandlers(s, factory.register(s, new MethodHandlerRegistry()));
- }
- }
-
- protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState state, B frame) throws IllegalStateTransitionException
- {
- MethodHandlerRegistry registry = _handlers.get(state);
- StateAwareMethodListener<B> handler = (registry == null) ? null : registry.getHandler(frame);
- if (handler == null)
- {
- _logger.warn(new LogMessage("No handler for {0}, {1}", state, frame));
- }
- return handler;
- }
-
- <A extends AMQMethodBody, B extends Class<A>> void addHandler(AMQState state, B type, StateAwareMethodListener<A> handler)
- {
- MethodHandlerRegistry registry = _handlers.get(state);
- if (registry == null)
- {
- registry = new MethodHandlerRegistry();
- _handlers.put(state, registry);
- }
- registry.addHandler(type, handler);
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java
deleted file mode 100644
index b6d5e3d88d..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/SimpleMemberHandle.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-
-public class SimpleMemberHandle implements MemberHandle
-{
- private final String _host;
- private final int _port;
-
- public SimpleMemberHandle(String host, int port)
- {
- _host = host;
- _port = port;
- }
-
- public SimpleMemberHandle(String details)
- {
- String[] parts = details.split(":");
- _host = parts[0];
- _port = Integer.parseInt(parts[1]);
- }
-
- public SimpleMemberHandle(InetSocketAddress address) throws UnknownHostException
- {
- this(address.getAddress(), address.getPort());
- }
-
- public SimpleMemberHandle(InetAddress address, int port) throws UnknownHostException
- {
- this(canonical(address).getHostAddress(), port);
- }
-
- public String getHost()
- {
- return _host;
- }
-
- public int getPort()
- {
- return _port;
- }
-
- public int hashCode()
- {
- return getPort();
- }
-
- public boolean equals(Object o)
- {
- return o instanceof MemberHandle && matches((MemberHandle) o);
- }
-
- public boolean matches(MemberHandle m)
- {
- return matches(m.getHost(), m.getPort());
- }
-
- public boolean matches(String host, int port)
- {
- return _host.equals(host) && _port == port;
- }
-
- public String getDetails()
- {
- return _host + ":" + _port;
- }
-
- public String toString()
- {
- return getDetails();
- }
-
- static List<MemberHandle> stringToMembers(String membership)
- {
- String[] names = membership.split("\\s");
- List<MemberHandle> members = new ArrayList<MemberHandle>();
- for (String name : names)
- {
- members.add(new SimpleMemberHandle(name));
- }
- return members;
- }
-
- static String membersToString(List<MemberHandle> members)
- {
- StringBuffer buffer = new StringBuffer();
- boolean first = true;
- for (MemberHandle m : members)
- {
- if (first)
- {
- first = false;
- }
- else
- {
- buffer.append(" ");
- }
- buffer.append(m.getDetails());
- }
-
- return buffer.toString();
- }
-
- private static InetAddress canonical(InetAddress address) throws UnknownHostException
- {
- if (address.isLoopbackAddress())
- {
- return InetAddress.getLocalHost();
- }
- else
- {
- return address;
- }
- }
-
- public MemberHandle resolve()
- {
- return resolve(this);
- }
-
- public static MemberHandle resolve(MemberHandle handle)
- {
- try
- {
- return new SimpleMemberHandle(new InetSocketAddress(handle.getHost(), handle.getPort()));
- }
- catch (UnknownHostException e)
- {
- e.printStackTrace();
- return handle;
- }
- }
-
-
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java
deleted file mode 100644
index 34b5cd829d..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/SimpleSendable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQFrame;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class SimpleSendable implements Sendable
-{
- private final List<AMQBody> _bodies;
-
- public SimpleSendable(AMQBody body)
- {
- this(Arrays.asList(body));
- }
-
- public SimpleSendable(List<AMQBody> bodies)
- {
- _bodies = bodies;
- }
-
- public void send(int channel, Member member) throws AMQException
- {
- for (AMQBody body : _bodies)
- {
- member.send(new AMQFrame(channel, body));
- }
- }
-
- public String toString()
- {
- return _bodies.toString();
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
deleted file mode 100644
index 8e7fb1ff49..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-
-import java.util.List;
-import java.util.ArrayList;
-
-public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends ClusterMethodHandler<A>
-{
- private final List<ClusterMethodHandler<A>> _handlers;
-
- private ChainedClusterMethodHandler()
- {
- this(new ArrayList<ClusterMethodHandler<A>>());
- }
-
- public ChainedClusterMethodHandler(List<ClusterMethodHandler<A>> handlers)
- {
- _handlers = handlers;
- }
-
- public ChainedClusterMethodHandler(ClusterMethodHandler<A>... handlers)
- {
- this();
- for(ClusterMethodHandler<A>handler: handlers)
- {
- _handlers.add(handler);
- }
- }
-
- protected final void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
- {
- for(ClusterMethodHandler<A> handler : _handlers)
- {
- handler.peer(stateMgr, queues, exchanges, session, evt);
- }
- }
-
- protected final void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
- {
- for(ClusterMethodHandler<A> handler : _handlers)
- {
- handler.client(stateMgr, queues, exchanges, session, evt);
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
deleted file mode 100644
index 022ee098ab..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.log4j.Logger;
-
-import java.util.Map;
-import java.util.HashMap;
-
-/**
- * Maintains the default queue names for a channel, and alters subsequent frames where necessary
- * to use this (i.e. when no queue is explictly specified).
- *
- */
-class ChannelQueueManager
-{
- private static final Logger _logger = Logger.getLogger(ChannelQueueManager.class);
- private final Map<Integer, String> _channelQueues = new HashMap<Integer, String>();
-
- ClusterMethodHandler<QueueDeclareBody> createQueueDeclareHandler()
- {
- return new QueueDeclareHandler();
- }
-
- ClusterMethodHandler<QueueDeleteBody> createQueueDeleteHandler()
- {
- return new QueueDeleteHandler();
- }
-
- ClusterMethodHandler<QueueBindBody> createQueueBindHandler()
- {
- return new QueueBindHandler();
- }
-
- ClusterMethodHandler<BasicConsumeBody> createBasicConsumeHandler()
- {
- return new BasicConsumeHandler();
- }
-
- private void set(int channel, String queue)
- {
- _channelQueues.put(channel, queue);
- _logger.info(new LogMessage("Set default queue for {0} to {1}", channel, queue));
- }
-
- private String get(int channel)
- {
- String queue = _channelQueues.get(channel);
- _logger.info(new LogMessage("Default queue for {0} is {1}", channel, queue));
- return queue;
- }
-
- private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody>
- {
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
- {
- }
-
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
- {
- set(evt.getChannelId(), evt.getMethod().queue);
- }
- }
- private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody>
- {
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
- {
- }
-
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
- {
- if(evt.getMethod().queue == null)
- {
- evt.getMethod().queue = get(evt.getChannelId());
- }
- }
- }
- private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody>
- {
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
- {
- }
-
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
- {
- if(evt.getMethod().queue == null)
- {
- evt.getMethod().queue = get(evt.getChannelId());
- }
- }
- }
-
- private class BasicConsumeHandler extends ClusterMethodHandler<BasicConsumeBody>
- {
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
- {
- }
-
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
- {
- if(evt.getMethod().queue == null)
- {
- evt.getMethod().queue = get(evt.getChannelId());
- }
- }
- }
-
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
deleted file mode 100644
index 5944d99a14..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.AMQException;
-
-public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
-{
- public final void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
- {
- if (ClusteredProtocolSession.isPeerSession(session))
- {
- peer(stateMgr, queues, exchanges, session, evt);
- }
- else
- {
- client(stateMgr, queues, exchanges, session, evt);
- }
- }
-
- protected abstract void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
- protected abstract void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
deleted file mode 100644
index 46ba3e5015..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ClusterJoinBody;
-import org.apache.qpid.framing.ClusterLeaveBody;
-import org.apache.qpid.framing.ClusterMembershipBody;
-import org.apache.qpid.framing.ClusterPingBody;
-import org.apache.qpid.framing.ClusterSuspectBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.ClusterSynchBody;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.server.cluster.ClusterCapability;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.LoadTable;
-import org.apache.qpid.server.cluster.MemberHandle;
-import org.apache.qpid.server.cluster.MethodHandlerFactory;
-import org.apache.qpid.server.cluster.MethodHandlerRegistry;
-import org.apache.qpid.server.cluster.SimpleMemberHandle;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.handler.ChannelCloseHandler;
-import org.apache.qpid.server.handler.ChannelFlowHandler;
-import org.apache.qpid.server.handler.ChannelOpenHandler;
-import org.apache.qpid.server.handler.ConnectionCloseMethodHandler;
-import org.apache.qpid.server.handler.ConnectionOpenMethodHandler;
-import org.apache.qpid.server.handler.ConnectionSecureOkMethodHandler;
-import org.apache.qpid.server.handler.ConnectionStartOkMethodHandler;
-import org.apache.qpid.server.handler.ConnectionTuneOkMethodHandler;
-import org.apache.qpid.server.handler.ExchangeDeclareHandler;
-import org.apache.qpid.server.handler.ExchangeDeleteHandler;
-import org.apache.qpid.server.handler.BasicCancelMethodHandler;
-import org.apache.qpid.server.handler.BasicPublishMethodHandler;
-import org.apache.qpid.server.handler.QueueBindHandler;
-import org.apache.qpid.server.handler.QueueDeleteHandler;
-import org.apache.qpid.server.handler.BasicQosHandler;
-import org.apache.qpid.server.handler.TxSelectHandler;
-import org.apache.qpid.server.handler.TxCommitHandler;
-import org.apache.qpid.server.handler.TxRollbackHandler;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQState;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-public class ClusterMethodHandlerFactory implements MethodHandlerFactory
-{
- private final GroupManager _groupMgr;
- private final LoadTable _loadTable;
-
- public ClusterMethodHandlerFactory(GroupManager groupMgr, LoadTable loadTable)
- {
- _groupMgr = groupMgr;
- _loadTable = loadTable;
- }
-
- public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry)
- {
- switch (state)
- {
- case CONNECTION_NOT_STARTED:
- return registry.addHandler(ConnectionStartOkBody.class, ConnectionStartOkMethodHandler.getInstance());
- case CONNECTION_NOT_AUTH:
- return registry.addHandler(ConnectionSecureOkBody.class, ConnectionSecureOkMethodHandler.getInstance());
- case CONNECTION_NOT_TUNED:
- return registry.addHandler(ConnectionTuneOkBody.class, ConnectionTuneOkMethodHandler.getInstance());
- case CONNECTION_NOT_OPENED:
- //connection.open override:
- return registry.addHandler(ConnectionOpenBody.class, new ConnectionOpenHandler());
- case CONNECTION_OPEN:
- return registerConnectionOpened(registry);
- }
- return registry;
- }
-
- private MethodHandlerRegistry registerConnectionOpened(MethodHandlerRegistry registry)
- {
- //new cluster method handlers:
- registry.addHandler(ClusterJoinBody.class, new JoinHandler());
- registry.addHandler(ClusterLeaveBody.class, new LeaveHandler());
- registry.addHandler(ClusterSuspectBody.class, new SuspectHandler());
- registry.addHandler(ClusterMembershipBody.class, new MembershipHandler());
- registry.addHandler(ClusterPingBody.class, new PingHandler());
- registry.addHandler(ClusterSynchBody.class, new SynchHandler());
-
- //connection.close override:
- registry.addHandler(ConnectionCloseBody.class, new ConnectionCloseHandler());
-
- //replicated handlers:
- registry.addHandler(ExchangeDeclareBody.class, replicated(ExchangeDeclareHandler.getInstance()));
- registry.addHandler(ExchangeDeleteBody.class, replicated(ExchangeDeleteHandler.getInstance()));
-
- ChannelQueueManager channelQueueMgr = new ChannelQueueManager();
-
-
- LocalQueueDeclareHandler handler = new LocalQueueDeclareHandler(_groupMgr);
- registry.addHandler(QueueDeclareBody.class,
- chain(new QueueNameGenerator(handler),
- channelQueueMgr.createQueueDeclareHandler(),
- new ReplicatingHandler<QueueDeclareBody>(_groupMgr, handler)));
-
- registry.addHandler(QueueBindBody.class, chain(channelQueueMgr.createQueueBindHandler(), replicated(QueueBindHandler.getInstance())));
- registry.addHandler(QueueDeleteBody.class, chain(channelQueueMgr.createQueueDeleteHandler(), replicated(alternate(new QueueDeleteHandler(false), new QueueDeleteHandler(true)))));
- registry.addHandler(BasicConsumeBody.class, chain(channelQueueMgr.createBasicConsumeHandler(), new ReplicatingConsumeHandler(_groupMgr)));
-
- //other modified handlers:
- registry.addHandler(BasicCancelBody.class, alternate(new RemoteCancelHandler(), BasicCancelMethodHandler.getInstance()));
-
- //other unaffected handlers:
- registry.addHandler(BasicPublishBody.class, BasicPublishMethodHandler.getInstance());
- registry.addHandler(BasicQosBody.class, BasicQosHandler.getInstance());
- registry.addHandler(ChannelOpenBody.class, ChannelOpenHandler.getInstance());
- registry.addHandler(ChannelCloseBody.class, ChannelCloseHandler.getInstance());
- registry.addHandler(ChannelFlowBody.class, ChannelFlowHandler.getInstance());
- registry.addHandler(TxSelectBody.class, TxSelectHandler.getInstance());
- registry.addHandler(TxCommitBody.class, TxCommitHandler.getInstance());
- registry.addHandler(TxRollbackBody.class, TxRollbackHandler.getInstance());
-
-
- return registry;
- }
-
- private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody>
- {
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<ClusterSynchBody> evt) throws AMQException
- {
- _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(session));
- }
- }
-
- private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody>
- {
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<ClusterJoinBody> evt) throws AMQException
- {
- _groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker));
- }
- }
-
- private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody>
- {
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException
- {
- _groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker));
- }
- }
-
- private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody>
- {
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException
- {
- _groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker));
- }
- }
-
- private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody>
- {
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException
- {
- ClusterMembershipBody body = evt.getMethod();
- _groupMgr.handleMembershipAnnouncement(new String(body.members));
- }
- }
-
- private class PingHandler implements StateAwareMethodListener<ClusterPingBody>
- {
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<ClusterPingBody> evt) throws AMQException
- {
- MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker);
- _groupMgr.handlePing(peer, evt.getMethod().load);
- if (evt.getMethod().responseRequired)
- {
- evt.getMethod().load = _loadTable.getLocalLoad();
- session.writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod()));
- }
- }
- }
-
- private class ConnectionOpenHandler extends ExtendedHandler<ConnectionOpenBody>
- {
- ConnectionOpenHandler()
- {
- super(ConnectionOpenMethodHandler.getInstance());
- }
-
- void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionOpenBody> evt)
- {
- String capabilities = evt.getMethod().capabilities;
- if (ClusterCapability.contains(capabilities))
- {
- ClusteredProtocolSession.setSessionPeer(session, ClusterCapability.getPeer(capabilities));
- }
- else
- {
- _loadTable.incrementLocalLoad();
- }
- }
- }
-
- private class ConnectionCloseHandler extends ExtendedHandler<ConnectionCloseBody>
- {
- ConnectionCloseHandler()
- {
- super(ConnectionCloseMethodHandler.getInstance());
- }
-
- void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionCloseBody> evt)
- {
- if (!ClusteredProtocolSession.isPeerSession(session))
- {
- _loadTable.decrementLocalLoad();
- }
- }
- }
-
- private <B extends AMQMethodBody> ReplicatingHandler<B> replicated(StateAwareMethodListener<B> handler)
- {
- return new ReplicatingHandler<B>(_groupMgr, handler);
- }
-
- private <B extends AMQMethodBody> StateAwareMethodListener<B> alternate(StateAwareMethodListener<B> peer, StateAwareMethodListener<B> client)
- {
- return new PeerHandler<B>(peer, client);
- }
-
- private <B extends AMQMethodBody> StateAwareMethodListener<B> chain(ClusterMethodHandler<B>... h)
- {
- return new ChainedClusterMethodHandler<B>(h);
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
deleted file mode 100644
index 5b2c6f4a9a..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
-{
- private final StateAwareMethodListener<A> _base;
-
- ExtendedHandler(StateAwareMethodListener<A> base)
- {
- _base = base;
- }
-
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
- {
- preHandle(stateMgr, session, evt);
- _base.methodReceived(stateMgr, queues, exchanges, session, evt);
- postHandle(stateMgr, session, evt);
- }
-
- void preHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
- {
- }
-
- void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
- {
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java
deleted file mode 100644
index 0dc7fe00d2..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/HandlerUtils.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-public abstract class HandlerUtils
-{
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
deleted file mode 100644
index 1e6bc26444..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.MemberHandle;
-import org.apache.qpid.server.handler.QueueDeclareHandler;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.ClusteredQueue;
-import org.apache.qpid.server.queue.PrivateQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.RemoteQueueProxy;
-
-public class LocalQueueDeclareHandler extends QueueDeclareHandler
-{
- private static final Logger _logger = Logger.getLogger(LocalQueueDeclareHandler.class);
- private final GroupManager _groupMgr;
-
- LocalQueueDeclareHandler(GroupManager groupMgr)
- {
- _groupMgr = groupMgr;
- }
-
- protected String createName()
- {
- return super.createName() + "@" + _groupMgr.getLocal().getDetails();
- }
-
- protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) throws AMQException
- {
- //is it private or shared:
- if (body.exclusive)
- {
- if (ClusteredProtocolSession.isPeerSession(session))
- {
- //need to get peer from the session...
- MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session);
- _logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer));
- return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, peer.getDetails(), body.autoDelete, registry);
- }
- else
- {
- _logger.debug(new LogMessage("Creating local private queue {0}", body.queue));
- return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, registry);
- }
- }
- else
- {
- _logger.debug(new LogMessage("Creating local shared queue {0}", body.queue));
- return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, registry);
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java
deleted file mode 100644
index 2cd0989f10..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/NullListener.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
-{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException
- {
- }
-}
-
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java
deleted file mode 100644
index 00f37951f2..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/PeerHandler.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-/**
- * Base for implementing handlers that carry out different actions based on whether the method they
- * are handling was sent by a peer (i.e. another broker in the cluster) or a client (i.e. an end-user
- * application).
- *
- */
-public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A>
-{
- private final StateAwareMethodListener<A> _peer;
- private final StateAwareMethodListener<A> _client;
-
- PeerHandler(StateAwareMethodListener<A> peer, StateAwareMethodListener<A> client)
- {
- _peer = peer;
- _client = client;
- }
-
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
- {
- _peer.methodReceived(stateMgr, queues, exchanges, session, evt);
- }
-
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
- {
- _client.methodReceived(stateMgr, queues, exchanges, session, evt);
- }
-
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
deleted file mode 100644
index 3e8528f533..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-
-/**
- * Generates queue names for queues declared with no name.
- *
- */
-class QueueNameGenerator extends ClusterMethodHandler<QueueDeclareBody>
-{
- private final LocalQueueDeclareHandler _handler;
-
- QueueNameGenerator(LocalQueueDeclareHandler handler)
- {
- _handler = handler;
- }
-
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
- {
- }
-
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges,
- AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt)
- throws AMQException
- {
- setName(evt.getMethod());//need to set the name before propagating this method
- }
-
- protected void setName(QueueDeclareBody body)
- {
- if (body.queue == null)
- {
- body.queue = _handler.createName();
- }
- }
-}
-
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
deleted file mode 100644
index 729a38c970..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.ClusteredQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-public class RemoteCancelHandler implements StateAwareMethodListener<BasicCancelBody>
-{
- private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class);
-
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicCancelBody> evt) throws AMQException
- {
- //By convention, consumers setup between brokers use the queue name as the consumer tag:
- AMQQueue queue = queues.getQueue(evt.getMethod().consumerTag);
- if (queue instanceof ClusteredQueue)
- {
- ((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session));
- }
- else
- {
- _logger.warn("Got remote cancel request for non-clustered queue: " + queue);
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
deleted file mode 100644
index 0836e9d5fa..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.ClusteredQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-/**
- * Handles consume requests from other cluster members.
- *
- */
-public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsumeBody>
-{
- private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class);
-
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
- {
- AMQQueue queue = queues.getQueue(evt.getMethod().queue);
- if (queue instanceof ClusteredQueue)
- {
- ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session));
- session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), evt.getMethod().queue));
- }
- else
- {
- _logger.warn("Got remote consume request for non-clustered queue: " + queue);
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
deleted file mode 100644
index 03c644889e..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.handler.BasicConsumeMethodHandler;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBody>
-{
- ReplicatingConsumeHandler(GroupManager groupMgr)
- {
- this(groupMgr, null);
- }
-
- ReplicatingConsumeHandler(GroupManager groupMgr, BroadcastPolicy policy)
- {
- super(groupMgr, base(), policy);
- }
-
- protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
- {
- //only replicate if the queue in question is a shared queue
- if (isShared(queues.getQueue(evt.getMethod().queue)))
- {
- super.replicate(stateMgr, queues, exchanges, session, evt);
- }
- else
- {
- _logger.info(new LogMessage("Handling consume for private queue ({0}) locally", evt.getMethod()));
- local(stateMgr, queues, exchanges, session, evt);
- _logger.info(new LogMessage("Handled consume for private queue ({0}) locally", evt.getMethod()));
-
- }
- }
-
- protected boolean isShared(AMQQueue queue)
- {
- return queue != null && queue.isShared();
- }
-
- static StateAwareMethodListener<BasicConsumeBody> base()
- {
- return new PeerHandler<BasicConsumeBody>(peer(), client());
- }
-
- static StateAwareMethodListener<BasicConsumeBody> peer()
- {
- return new RemoteConsumeHandler();
- }
-
- static StateAwareMethodListener<BasicConsumeBody> client()
- {
- return BasicConsumeMethodHandler.getInstance();
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
deleted file mode 100644
index db340c6a61..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.GroupResponseHandler;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.Member;
-import org.apache.qpid.server.cluster.SimpleSendable;
-import org.apache.qpid.server.cluster.policy.StandardPolicies;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-import java.util.List;
-
-/**
- * Basic template for handling methods that should be broadcast to the group and
- * processed locally after 'completion' of this broadcast.
- *
- */
-class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A> implements StandardPolicies
-{
- protected static final Logger _logger = Logger.getLogger(ReplicatingHandler.class);
-
- private final StateAwareMethodListener<A> _base;
- private final GroupManager _groupMgr;
- private final BroadcastPolicy _policy;
-
- ReplicatingHandler(GroupManager groupMgr, StateAwareMethodListener<A> base)
- {
- this(groupMgr, base, null);
- }
-
- ReplicatingHandler(GroupManager groupMgr, StateAwareMethodListener<A> base, BroadcastPolicy policy)
- {
- _groupMgr = groupMgr;
- _base = base;
- _policy = policy;
- }
-
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
- {
- local(stateMgr, queues, exchanges, session, evt);
- _logger.debug(new LogMessage("Handled {0} locally", evt.getMethod()));
- }
-
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
- {
- replicate(stateMgr, queues, exchanges, session, evt);
- }
-
- protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
- {
- if (_policy == null)
- {
- //asynch delivery
- _groupMgr.broadcast(new SimpleSendable(evt.getMethod()));
- local(stateMgr, queues, exchanges, session, evt);
- }
- else
- {
- Callback callback = new Callback(stateMgr, queues, exchanges, session, evt);
- _groupMgr.broadcast(new SimpleSendable(evt.getMethod()), _policy, callback);
- }
- _logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod()));
- }
-
- protected void local(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
- {
- _base.methodReceived(stateMgr, queues, exchanges, session, evt);
- }
-
- private class Callback implements GroupResponseHandler
- {
- private final AMQStateManager _stateMgr;
- private final QueueRegistry _queues;
- private final ExchangeRegistry _exchanges;
- private final AMQProtocolSession _session;
- private final AMQMethodEvent<A> _evt;
-
- Callback(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt)
- {
- _stateMgr = stateMgr;
- _queues = queues;
- _exchanges = exchanges;
- _session = session;
- _evt = evt;
- }
-
- public void response(List<AMQMethodBody> responses, List<Member> members)
- {
- try
- {
- local(_stateMgr, _queues, _exchanges, _session, _evt);
- _logger.debug(new LogMessage("Handled {0} locally, in response to completion of replication", _evt.getMethod()));
- }
- catch (AMQException e)
- {
- _logger.error(new LogMessage("Error handling {0}:{1}", _evt.getMethod(), e), e);
- }
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java
deleted file mode 100644
index d46913d042..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappedListener.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-public class WrappedListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
-{
- private final StateAwareMethodListener<T> _primary;
- private final StateAwareMethodListener _post;
- private final StateAwareMethodListener _pre;
-
- WrappedListener(StateAwareMethodListener<T> primary, StateAwareMethodListener pre, StateAwareMethodListener post)
- {
- _pre = check(pre);
- _post = check(post);
- _primary = check(primary);
- }
-
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<T> evt) throws AMQException
- {
- _pre.methodReceived(stateMgr, queues, exchanges, session, evt);
- _primary.methodReceived(stateMgr, queues, exchanges, session, evt);
- _post.methodReceived(stateMgr, queues, exchanges, session, evt);
- }
-
- private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in)
- {
- return in == null ? new NullListener<T>() : in;
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java
deleted file mode 100644
index 5ec3c9660a..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.handler;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.cluster.MethodHandlerFactory;
-import org.apache.qpid.server.cluster.MethodHandlerRegistry;
-import org.apache.qpid.server.state.AMQState;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-public abstract class WrappingMethodHandlerFactory implements MethodHandlerFactory
-{
- private final MethodHandlerFactory _delegate;
- private final StateAwareMethodListener _pre;
- private final StateAwareMethodListener _post;
-
- protected WrappingMethodHandlerFactory(MethodHandlerFactory delegate,
- StateAwareMethodListener pre,
- StateAwareMethodListener post)
- {
- _delegate = delegate;
- _pre = pre;
- _post = post;
- }
-
- public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry)
- {
- if (isWrappableState(state))
- {
- return wrap(_delegate.register(state, registry), state);
- }
- else
- {
- return _delegate.register(state, registry);
- }
- }
-
- protected abstract boolean isWrappableState(AMQState state);
-
- protected abstract Iterable<FrameDescriptor> getWrappableFrameTypes(AMQState state);
-
- private MethodHandlerRegistry wrap(MethodHandlerRegistry registry, AMQState state)
- {
- for (FrameDescriptor fd : getWrappableFrameTypes(state))
- {
- wrap(registry, fd.type, fd.instance);
- }
- return registry;
- }
-
- private <A extends AMQMethodBody, B extends Class<A>> void wrap(MethodHandlerRegistry r, B type, A frame)
- {
- r.addHandler(type, new WrappedListener<A>(r.getHandler(frame), _pre, _post));
- }
-
- protected static class FrameDescriptor<A extends AMQMethodBody, B extends Class<A>>
- {
- protected final A instance;
- protected final B type;
-
- public FrameDescriptor(B type, A instance)
- {
- this.instance = instance;
- this.type = type;
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java
deleted file mode 100644
index 79cb558ede..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.policy;
-
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-
-public class AsynchBroadcastPolicy implements BroadcastPolicy
-{
- public boolean isComplete(int responded, int members)
- {
- return true;
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java
deleted file mode 100644
index 42382c6e7a..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.policy;
-
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-
-public class MajorityResponseBroadcastPolicy implements BroadcastPolicy
-{
- public boolean isComplete(int responded, int members)
- {
- return responded > members / 2;
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java
deleted file mode 100644
index e3072a6a40..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.policy;
-
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-
-public class OneResponseBroadcastPolicy implements BroadcastPolicy
-{
- public boolean isComplete(int responded, int members)
- {
- return responded > 0;
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java
deleted file mode 100644
index dbaf690d3a..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/StandardPolicies.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.policy;
-
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-
-public interface StandardPolicies
-{
- public static final BroadcastPolicy ASYNCH_POLICY = new AsynchBroadcastPolicy();
- public static final BroadcastPolicy SYNCH_POLICY = new SynchBroadcastPolicy();
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java
deleted file mode 100644
index 605b8dd51e..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.policy;
-
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-
-public class SynchBroadcastPolicy implements BroadcastPolicy
-{
- public boolean isComplete(int responded, int members)
- {
- return responded == members;
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java
deleted file mode 100644
index 3664be58bc..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.replay;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-abstract class ChainedMethodRecorder <T extends AMQMethodBody> implements MethodRecorder<T>
-{
- private final MethodRecorder<T> _recorder;
-
- ChainedMethodRecorder()
- {
- this(null);
- }
-
- ChainedMethodRecorder(MethodRecorder<T> recorder)
- {
- _recorder = recorder;
- }
-
- public final void record(T method)
- {
- if(!doRecord(method) && _recorder != null)
- {
- _recorder.record(method);
- }
- }
-
- protected abstract boolean doRecord(T method);
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
deleted file mode 100644
index 3bd9f5d387..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.replay;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-
-class ConsumerCounts
-{
- private final Map<String, Integer> _counts = new HashMap<String, Integer>();
-
- synchronized void increment(String queue)
- {
- _counts.put(queue, get(queue) + 1);
- }
-
- synchronized void decrement(String queue)
- {
- _counts.put(queue, get(queue) - 1);
- }
-
- private int get(String queue)
- {
- Integer count = _counts.get(queue);
- return count == null ? 0 : count;
- }
-
- synchronized void replay(List<AMQMethodBody> messages)
- {
- for(String queue : _counts.keySet())
- {
- BasicConsumeBody m = new BasicConsumeBody();
- m.queue = queue;
- m.consumerTag = queue;
- replay(m, messages);
- }
- }
-
- private void replay(BasicConsumeBody msg, List<AMQMethodBody> messages)
- {
- int count = _counts.get(msg.queue);
- for(int i = 0; i < count; i++)
- {
- messages.add(msg);
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java
deleted file mode 100644
index e45810438e..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/MethodRecorder.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.replay;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-/**
- * Abstraction through which a method can be recorded for replay
- *
- */
-interface MethodRecorder<T extends AMQMethodBody>
-{
- public void record(T method);
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
deleted file mode 100644
index 4a00b5cbc3..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.replay;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.server.cluster.MethodHandlerFactory;
-import org.apache.qpid.server.cluster.MethodHandlerRegistry;
-import org.apache.qpid.server.cluster.handler.WrappingMethodHandlerFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQState;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-import java.util.Arrays;
-
-public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory
-{
- private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[]
- {
- new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody()),
- new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody()),
- new FrameDescriptor(QueueBindBody.class, new QueueBindBody()),
- new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody()),
- new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody()),
- new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody()),
- new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody())
- });
-
-
- public RecordingMethodHandlerFactory(MethodHandlerFactory factory, ReplayStore store)
- {
- super(factory, null, store);
- }
-
- protected boolean isWrappableState(AMQState state)
- {
- return AMQState.CONNECTION_OPEN.equals(state);
- }
-
- protected Iterable<FrameDescriptor> getWrappableFrameTypes(AMQState state)
- {
- return _frames;
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java
deleted file mode 100644
index 898cb80cb3..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayManager.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.replay;
-
-import org.apache.qpid.server.cluster.Sendable;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQMethodBody;
-
-import java.util.List;
-
-/**
- * Abstraction of a replay strategy for use in getting joining members up to
- * date with respect to cluster state.
- *
- */
-public interface ReplayManager
-{
- public List<AMQMethodBody> replay(boolean isLeader);
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java
deleted file mode 100644
index fa737cd1b6..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.replay;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.ClusterSynchBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.util.Bindings;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Stores method invocations for replay to new members.
- *
- */
-public class ReplayStore implements ReplayManager, StateAwareMethodListener
-{
- private static final Logger _logger = Logger.getLogger(ReplayStore.class);
-
- private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _globalRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>();
- private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _localRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>();
- private final Map<String, QueueDeclareBody> _sharedQueues = new ConcurrentHashMap<String, QueueDeclareBody>();
- private final Map<String, QueueDeclareBody> _privateQueues = new ConcurrentHashMap<String, QueueDeclareBody>();
- private final Bindings<String, String, QueueBindBody> _sharedBindings = new Bindings<String, String, QueueBindBody>();
- private final Bindings<String, String, QueueBindBody> _privateBindings = new Bindings<String, String, QueueBindBody>();
- private final Map<String, ExchangeDeclareBody> _exchanges = new ConcurrentHashMap<String, ExchangeDeclareBody>();
- private final ConsumerCounts _consumers = new ConsumerCounts();
-
- public ReplayStore()
- {
- _globalRecorders.put(QueueDeclareBody.class, new SharedQueueDeclareRecorder());
- _globalRecorders.put(QueueDeleteBody.class, new SharedQueueDeleteRecorder());
- _globalRecorders.put(QueueBindBody.class, new SharedQueueBindRecorder());
- _globalRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder());
- _globalRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder());
-
- _localRecorders.put(QueueDeclareBody.class, new PrivateQueueDeclareRecorder());
- _localRecorders.put(QueueDeleteBody.class, new PrivateQueueDeleteRecorder());
- _localRecorders.put(QueueBindBody.class, new PrivateQueueBindRecorder());
- _localRecorders.put(BasicConsumeBody.class, new BasicConsumeRecorder());
- _localRecorders.put(BasicCancelBody.class, new BasicCancelRecorder());
- _localRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder());
- _localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder());
- }
-
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent evt) throws AMQException
- {
- _logger.debug(new LogMessage("Replay store received {0}", evt.getMethod()));
- AMQMethodBody request = evt.getMethod();
-
- //allow any (relevant) recorder registered for this type of request to record it:
- MethodRecorder recorder = getRecorders(session).get(request.getClass());
- if (recorder != null)
- {
- recorder.record(request);
- }
- }
-
- private Map<Class<? extends AMQMethodBody>, MethodRecorder> getRecorders(AMQProtocolSession session)
- {
- if (ClusteredProtocolSession.isPeerSession(session))
- {
- return _globalRecorders;
- }
- else
- {
- return _localRecorders;
- }
- }
-
- public List<AMQMethodBody> replay(boolean isLeader)
- {
- List<AMQMethodBody> methods = new ArrayList<AMQMethodBody>();
- methods.addAll(_exchanges.values());
- methods.addAll(_privateQueues.values());
- synchronized(_privateBindings)
- {
- methods.addAll(_privateBindings.values());
- }
- if (isLeader)
- {
- methods.addAll(_sharedQueues.values());
- synchronized(_sharedBindings)
- {
- methods.addAll(_sharedBindings.values());
- }
- }
- _consumers.replay(methods);
- methods.add(new ClusterSynchBody());
- return methods;
- }
-
- private class BasicConsumeRecorder implements MethodRecorder<BasicConsumeBody>
- {
- public void record(BasicConsumeBody method)
- {
- if(_sharedQueues.containsKey(method.queue))
- {
- _consumers.increment(method.queue);
- }
- }
- }
-
- private class BasicCancelRecorder implements MethodRecorder<BasicCancelBody>
- {
- public void record(BasicCancelBody method)
- {
- if(_sharedQueues.containsKey(method.consumerTag))
- {
- _consumers.decrement(method.consumerTag);
- }
- }
- }
-
- private class SharedQueueDeclareRecorder extends QueueDeclareRecorder
- {
- SharedQueueDeclareRecorder()
- {
- super(false, _sharedQueues);
- }
- }
-
- private class PrivateQueueDeclareRecorder extends QueueDeclareRecorder
- {
- PrivateQueueDeclareRecorder()
- {
- super(true, _privateQueues, new SharedQueueDeclareRecorder());
- }
- }
-
- private class SharedQueueDeleteRecorder extends QueueDeleteRecorder
- {
- SharedQueueDeleteRecorder()
- {
- super(_sharedQueues, _sharedBindings);
- }
- }
-
- private class PrivateQueueDeleteRecorder extends QueueDeleteRecorder
- {
- PrivateQueueDeleteRecorder()
- {
- super(_privateQueues, _privateBindings, new SharedQueueDeleteRecorder());
- }
- }
-
- private class SharedQueueBindRecorder extends QueueBindRecorder
- {
- SharedQueueBindRecorder()
- {
- super(_sharedQueues, _sharedBindings);
- }
- }
-
- private class PrivateQueueBindRecorder extends QueueBindRecorder
- {
- PrivateQueueBindRecorder()
- {
- super(_privateQueues, _privateBindings, new SharedQueueBindRecorder());
- }
- }
-
-
- private static class QueueDeclareRecorder extends ChainedMethodRecorder<QueueDeclareBody>
- {
- private final boolean _exclusive;
- private final Map<String, QueueDeclareBody> _queues;
-
- QueueDeclareRecorder(boolean exclusive, Map<String, QueueDeclareBody> queues)
- {
- _queues = queues;
- _exclusive = exclusive;
- }
-
- QueueDeclareRecorder(boolean exclusive, Map<String, QueueDeclareBody> queues, QueueDeclareRecorder recorder)
- {
- super(recorder);
- _queues = queues;
- _exclusive = exclusive;
- }
-
-
- protected boolean doRecord(QueueDeclareBody method)
- {
- if (_exclusive == method.exclusive)
- {
- _queues.put(method.queue, method);
- return true;
- }
- else
- {
- return false;
- }
- }
- }
-
- private class QueueDeleteRecorder extends ChainedMethodRecorder<QueueDeleteBody>
- {
- private final Map<String, QueueDeclareBody> _queues;
- private final Bindings<String, String, QueueBindBody> _bindings;
-
- QueueDeleteRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings)
- {
- this(queues, bindings, null);
- }
-
- QueueDeleteRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings, QueueDeleteRecorder recorder)
- {
- super(recorder);
- _queues = queues;
- _bindings = bindings;
- }
-
- protected boolean doRecord(QueueDeleteBody method)
- {
- if (_queues.remove(method.queue) != null)
- {
- _bindings.unbind1(method.queue);
- return true;
- }
- else
- {
- return false;
- }
- }
- }
-
- private class QueueBindRecorder extends ChainedMethodRecorder<QueueBindBody>
- {
- private final Map<String, QueueDeclareBody> _queues;
- private final Bindings<String, String, QueueBindBody> _bindings;
-
- QueueBindRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings)
- {
- _queues = queues;
- _bindings = bindings;
- }
-
- QueueBindRecorder(Map<String, QueueDeclareBody> queues, Bindings<String, String, QueueBindBody> bindings, QueueBindRecorder recorder)
- {
- super(recorder);
- _queues = queues;
- _bindings = bindings;
- }
-
- protected boolean doRecord(QueueBindBody method)
- {
- if (_queues.containsKey(method.queue))
- {
- _bindings.bind(method.queue, method.exchange, method);
- return true;
- }
- else
- {
- return false;
- }
- }
- }
-
- private class ExchangeDeclareRecorder implements MethodRecorder<ExchangeDeclareBody>
- {
- public void record(ExchangeDeclareBody method)
- {
- _exchanges.put(method.exchange, method);
- }
- }
-
- private class ExchangeDeleteRecorder implements MethodRecorder<ExchangeDeleteBody>
- {
- public void record(ExchangeDeleteBody method)
- {
- _exchanges.remove(method.exchange);
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java
deleted file mode 100644
index 49de0a7cbf..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/util/Bindings.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.util;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-
-/**
- * Maps two separate keys to a list of values.
- *
- */
-public class Bindings<K1, K2, V>
-{
- private final MultiValuedMap<K1, Binding<K2>> _a = new MultiValuedMap<K1, Binding<K2>>();
- private final MultiValuedMap<K2, Binding<K1>> _b = new MultiValuedMap<K2, Binding<K1>>();
- private final Collection<V> _values = new HashSet<V>();
-
- public void bind(K1 key1, K2 key2, V value)
- {
- _a.add(key1, new Binding<K2>(key2, value));
- _b.add(key2, new Binding<K1>(key1, value));
- _values.add(value);
- }
-
- public void unbind1(K1 key1)
- {
- Collection<Binding<K2>> values = _a.remove(key1);
- for (Binding<K2> v : values)
- {
- _b.remove(v.key);
- _values.remove(v.value);
- }
- }
-
- public void unbind2(K2 key2)
- {
- Collection<Binding<K1>> values = _b.remove(key2);
- for (Binding<K1> v : values)
- {
- _a.remove(v.key);
- _values.remove(v.value);
- }
- }
-
- public Collection<V> values()
- {
- return Collections.unmodifiableCollection(_values);
- }
-
- /**
- * Value needs to hold key to the other map
- */
- private class Binding<T>
- {
- private final T key;
- private final V value;
-
- Binding(T key, V value)
- {
- this.key = key;
- this.value = value;
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java
deleted file mode 100644
index 406fe45701..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/util/InvokeMultiple.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.util;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Set;
-import java.util.HashSet;
-
-/**
- * Allows a method to be invoked on a list of listeners with one call
- *
- */
-public class InvokeMultiple <T> implements InvocationHandler
-{
- private final Set<T> _targets = new HashSet<T>();
- private final T _proxy;
-
- public InvokeMultiple(Class<? extends T> type)
- {
- _proxy = (T) Proxy.newProxyInstance(type.getClassLoader(), new Class[]{type}, this);
- }
-
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
- {
- Set<T> targets;
- synchronized(this)
- {
- targets = new HashSet<T>(_targets);
- }
-
- for(T target : targets)
- {
- method.invoke(target, args);
- }
- return null;
- }
-
- public synchronized void addListener(T t)
- {
- _targets.add(t);
- }
-
- public synchronized void removeListener(T t)
- {
- _targets.remove(t);
- }
-
- public T getProxy()
- {
- return _proxy;
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java
deleted file mode 100644
index 9be90298ea..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/util/LogMessage.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.util;
-
-import java.text.MessageFormat;
-
-/**
- * Convenience class to allow log messages to be specified in terms
- * of MessageFormat patterns with a variable set of parameters. The
- * production of the string is only done if toSTring is called so it
- * works well with debug level messages, allowing complex messages
- * to be specified that are only evaluated if actually printed.
- *
- */
-public class LogMessage
-{
- private final String _message;
- private final Object[] _args;
-
- public LogMessage(String message)
- {
- this(message, new Object[0]);
- }
-
- public LogMessage(String message, Object... args)
- {
- _message = message;
- _args = args;
- }
-
- public String toString()
- {
- return MessageFormat.format(_message, _args);
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java b/qpid/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java
deleted file mode 100644
index ebe1fe47dd..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/cluster/util/MultiValuedMap.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.cluster.util;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Maps a key to a collection of values
- *
- */
-public class MultiValuedMap<K, V>
-{
- private Map<K, Collection<V>> _map = new HashMap<K, Collection<V>>();
-
- public boolean add(K key, V value)
- {
- Collection<V> values = get(key);
- if (values == null)
- {
- values = createList();
- _map.put(key, values);
- }
- return values.add(value);
- }
-
- public Collection<V> get(K key)
- {
- return _map.get(key);
- }
-
- public Collection<V> remove(K key)
- {
- return _map.remove(key);
- }
-
- protected Collection<V> createList()
- {
- return new ArrayList<V>();
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java b/qpid/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java
deleted file mode 100644
index ee16f6062f..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/queue/ClusteredQueue.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.server.cluster.*;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
-/**
- * Represents a shared queue in a cluster. The key difference is that as well as any
- * local consumers, there may be consumers for this queue on other members of the
- * cluster.
- *
- */
-public class ClusteredQueue extends AMQQueue
-{
- private static final Logger _logger = Logger.getLogger(ClusteredQueue.class);
- private final ConcurrentHashMap<SimpleMemberHandle, RemoteSubscriptionImpl> _peers = new ConcurrentHashMap<SimpleMemberHandle, RemoteSubscriptionImpl>();
- private final GroupManager _groupMgr;
- private final NestedSubscriptionManager _subscriptions;
-
- public ClusteredQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, queueRegistry, new ClusteredSubscriptionManager());
- _groupMgr = groupMgr;
- _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers();
- }
-
- public ClusteredQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new ClusteredSubscriptionManager(),
- new SubscriptionImpl.Factory());
- _groupMgr = groupMgr;
- _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers();
- }
-
- public void deliver(AMQMessage message) throws AMQException
- {
- _logger.info(new LogMessage("{0} delivered to clustered queue {1}", message, this));
- super.deliver(message);
- }
-
- protected void autodelete() throws AMQException
- {
- if(!_subscriptions.hasActiveSubscribers())
- {
- //delete locally:
- delete();
-
- //send deletion request to all other members:
- QueueDeleteBody request = new QueueDeleteBody();
- request.queue = getName();
- _groupMgr.broadcast(new SimpleSendable(request));
- }
- }
-
- public void unregisterProtocolSession(AMQProtocolSession ps, int channel, String consumerTag) throws AMQException
- {
- //handle locally:
- super.unregisterProtocolSession(ps, channel, consumerTag);
-
- //signal other members:
- BasicCancelBody request = new BasicCancelBody();
- request.consumerTag = getName();
- _groupMgr.broadcast(new SimpleSendable(request));
- }
-
- public void addRemoteSubcriber(MemberHandle peer)
- {
- _logger.info(new LogMessage("Added remote subscriber for {0} to clustered queue {1}", peer, this));
- //find (or create) a matching subscriber for the peer then increment the count
- getSubscriber(key(peer), true).increment();
- }
-
- public void removeRemoteSubscriber(MemberHandle peer)
- {
- //find a matching subscriber for the peer then decrement the count
- //if count is now zero, remove the subscriber
- SimpleMemberHandle key = key(peer);
- RemoteSubscriptionImpl s = getSubscriber(key, true);
- if (s == null)
- {
- throw new RuntimeException("No subscriber for " + peer);
- }
- if (s.decrement())
- {
- _peers.remove(key);
- _subscriptions.removeSubscription(s);
- }
- }
-
- public void removeAllRemoteSubscriber(MemberHandle peer)
- {
- SimpleMemberHandle key = key(peer);
- RemoteSubscriptionImpl s = getSubscriber(key, true);
- _peers.remove(key);
- _subscriptions.removeSubscription(s);
- }
-
- private RemoteSubscriptionImpl getSubscriber(SimpleMemberHandle key, boolean create)
- {
- RemoteSubscriptionImpl s = _peers.get(key);
- if (s == null && create)
- {
- return addSubscriber(key, new RemoteSubscriptionImpl(_groupMgr, key));
- }
- else
- {
- return s;
- }
- }
-
- private RemoteSubscriptionImpl addSubscriber(SimpleMemberHandle key, RemoteSubscriptionImpl s)
- {
- RemoteSubscriptionImpl other = _peers.putIfAbsent(key, s);
- if (other == null)
- {
- _subscriptions.addSubscription(s);
- new SubscriberCleanup(key, this, _groupMgr);
- return s;
- }
- else
- {
- return other;
- }
- }
-
- private SimpleMemberHandle key(MemberHandle peer)
- {
- return peer instanceof SimpleMemberHandle ? (SimpleMemberHandle) peer : (SimpleMemberHandle) SimpleMemberHandle.resolve(peer);
- }
-
- static boolean isFromBroker(AMQMessage msg)
- {
- return ClusteredProtocolSession.isPayloadFromPeer(msg);
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/qpid/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
deleted file mode 100644
index fa20e9ab76..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.cluster.util.LogMessage;
-
-class ClusteredSubscriptionManager extends SubscriptionSet
-{
- private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
- private final NestedSubscriptionManager _all;
-
- ClusteredSubscriptionManager()
- {
- this(new NestedSubscriptionManager());
- }
-
- private ClusteredSubscriptionManager(NestedSubscriptionManager all)
- {
- _all = all;
- _all.addSubscription(new Parent());
- }
-
- NestedSubscriptionManager getAllSubscribers()
- {
- return _all;
- }
-
- public boolean hasActiveSubscribers()
- {
- return _all.hasActiveSubscribers();
- }
-
- public Subscription nextSubscriber(AMQMessage msg)
- {
- if(ClusteredQueue.isFromBroker(msg))
- {
- //if message is from another broker, it should only be delivered
- //to another client to meet ordering constraints
- Subscription s = super.nextSubscriber(msg);
- _logger.info(new LogMessage("Returning next *client* subscriber {0}", s));
- if(s == null)
- {
- //TODO: deliver to another broker, but set the redelivered flag on the msg
- //(this should be policy based)
-
- //for now just don't deliver it
- return null;
- }
- else
- {
- return s;
- }
- }
- Subscription s = _all.nextSubscriber(msg);
- _logger.info(new LogMessage("Returning next subscriber {0}", s));
- return s;
- }
-
- private class Parent implements WeightedSubscriptionManager
- {
- public int getWeight()
- {
- return ClusteredSubscriptionManager.this.getWeight();
- }
-
- public boolean hasActiveSubscribers()
- {
- return ClusteredSubscriptionManager.super.hasActiveSubscribers();
- }
-
- public Subscription nextSubscriber(AMQMessage msg)
- {
- return ClusteredSubscriptionManager.super.nextSubscriber(msg);
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/qpid/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java
deleted file mode 100644
index d01ebb5ba2..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/queue/NestedSubscriptionManager.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-/**
- * Distributes messages among a list of subsscription managers, using their
- * weighting.
- *
- */
-class NestedSubscriptionManager implements SubscriptionManager
-{
- private final List<WeightedSubscriptionManager> _subscribers = new CopyOnWriteArrayList<WeightedSubscriptionManager>();
- private int _iterations;
- private int _index;
-
- void addSubscription(WeightedSubscriptionManager s)
- {
- _subscribers.add(s);
- }
-
- void removeSubscription(WeightedSubscriptionManager s)
- {
- _subscribers.remove(s);
- }
-
- public boolean hasActiveSubscribers()
- {
- for(WeightedSubscriptionManager s : _subscribers)
- {
- if(s.hasActiveSubscribers())
- {
- return true;
- }
- }
- return false;
- }
-
- public Subscription nextSubscriber(AMQMessage msg)
- {
- WeightedSubscriptionManager start = current();
- for(WeightedSubscriptionManager s = start; s != null; s = next(start))
- {
- if(hasMore(s))
- {
- return nextSubscriber(s);
- }
- }
- return null;
- }
-
- private Subscription nextSubscriber(WeightedSubscriptionManager s)
- {
- _iterations++;
- return s.nextSubscriber(null);
- }
-
- private WeightedSubscriptionManager current()
- {
- return _subscribers.isEmpty() ? null : _subscribers.get(_index);
- }
-
- private boolean hasMore(WeightedSubscriptionManager s)
- {
- return _iterations < s.getWeight();
- }
-
- private WeightedSubscriptionManager next(WeightedSubscriptionManager start)
- {
- WeightedSubscriptionManager s = next();
- return s == start && !hasMore(s) ? null : s;
- }
-
- private WeightedSubscriptionManager next()
- {
- _iterations = 0;
- if(++_index >= _subscribers.size())
- {
- _index = 0;
- }
- return _subscribers.get(_index);
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java b/qpid/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java
deleted file mode 100644
index a3af0fedc7..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/queue/PrivateQueue.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.cluster.SimpleSendable;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.framing.QueueDeleteBody;
-
-import java.util.concurrent.Executor;
-
-/**
- * Used to represent a private queue held locally.
- *
- */
-public class PrivateQueue extends AMQQueue
-{
- private final GroupManager _groupMgr;
-
- public PrivateQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, queueRegistry);
- _groupMgr = groupMgr;
-
- }
-
- public PrivateQueue(GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery);
- _groupMgr = groupMgr;
- }
-
- protected void autodelete() throws AMQException
- {
- //delete locally:
- super.autodelete();
-
- //send delete request to peers:
- QueueDeleteBody request = new QueueDeleteBody();
- request.queue = getName();
- _groupMgr.broadcast(new SimpleSendable(request));
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java b/qpid/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java
deleted file mode 100644
index efc0540c18..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/queue/ProxiedQueueCleanup.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.cluster.MembershipChangeListener;
-import org.apache.qpid.server.cluster.MemberHandle;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
-
-import java.util.List;
-
-class ProxiedQueueCleanup implements MembershipChangeListener
-{
- private static final Logger _logger = Logger.getLogger(ProxiedQueueCleanup.class);
-
- private final MemberHandle _subject;
- private final RemoteQueueProxy _queue;
-
- ProxiedQueueCleanup(MemberHandle subject, RemoteQueueProxy queue)
- {
- _subject = subject;
- _queue = queue;
- }
-
- public void changed(List<MemberHandle> members)
- {
- if(!members.contains(_subject))
- {
- try
- {
- _queue.delete();
- _logger.info(new LogMessage("Deleted {0} in response to exclusion of {1}", _queue, _subject));
- }
- catch (AMQException e)
- {
- _logger.info(new LogMessage("Failed to delete {0} in response to exclusion of {1}: {2}", _queue, _subject, e), e);
- }
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java b/qpid/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java
deleted file mode 100644
index 752cf05a82..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/queue/RemoteQueueProxy.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.MemberHandle;
-import org.apache.qpid.server.cluster.SimpleSendable;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executor;
-
-/**
- * TODO: separate out an abstract base class from AMQQueue from which this inherits. It does
- * not require all the functionality currently in AMQQueue.
- *
- */
-public class RemoteQueueProxy extends AMQQueue
-{
- private static final Logger _logger = Logger.getLogger(RemoteQueueProxy.class);
- private final MemberHandle _target;
- private final GroupManager _groupMgr;
-
- public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, queueRegistry);
- _target = target;
- _groupMgr = groupMgr;
- _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this));
- }
-
- public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, String name, boolean durable, String owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery);
- _target = target;
- _groupMgr = groupMgr;
- _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this));
- }
-
- public void deliver(AMQMessage msg) throws NoConsumersException
- {
- if (ClusteredProtocolSession.canRelay(msg, _target))
- {
- try
- {
- _logger.debug(new LogMessage("Relaying {0} to {1}", msg, _target));
- relay(msg);
- }
- catch (NoConsumersException e)
- {
- throw e;
- }
- catch (AMQException e)
- {
- //TODO: sort out exception handling...
- e.printStackTrace();
- }
- }
- else
- {
- _logger.debug(new LogMessage("Cannot relay {0} to {1}", msg, _target));
- }
- }
-
- void relay(AMQMessage msg) throws AMQException
- {
- BasicPublishBody publish = msg.getPublishBody();
- ContentHeaderBody header = msg.getContentHeaderBody();
- List<ContentBody> bodies = msg.getContentBodies();
-
- //(i) construct a new publishing block:
- publish.immediate = false;//can't as yet handle the immediate flag in a cluster
- List<AMQBody> parts = new ArrayList<AMQBody>(2 + bodies.size());
- parts.add(publish);
- parts.add(header);
- parts.addAll(bodies);
-
- //(ii) send this on to the broker for which it is acting as proxy:
- _groupMgr.send(_target, new SimpleSendable(parts));
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/qpid/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
deleted file mode 100644
index 0268ff2171..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.cluster.MemberHandle;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.SimpleSendable;
-import org.apache.qpid.AMQException;
-
-class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
-{
- private final GroupManager _groupMgr;
- private final MemberHandle _peer;
- private boolean _suspended;
- private int _count;
-
- RemoteSubscriptionImpl(GroupManager groupMgr, MemberHandle peer)
- {
- _groupMgr = groupMgr;
- _peer = peer;
- }
-
- synchronized void increment()
- {
- _count++;
- }
-
- synchronized boolean decrement()
- {
- return --_count <= 0;
- }
-
- public void send(AMQMessage msg, AMQQueue queue)
- {
- try
- {
- _groupMgr.send(_peer, new SimpleSendable(msg.getPayload()));
- }
- catch (AMQException e)
- {
- //TODO: handle exceptions properly...
- e.printStackTrace();
- }
- }
-
- public synchronized void setSuspended(boolean suspended)
- {
- _suspended = suspended;
- }
-
- public synchronized boolean isSuspended()
- {
- return _suspended;
- }
-
- public synchronized int getWeight()
- {
- return _count;
- }
-
- public boolean hasActiveSubscribers()
- {
- return getWeight() == 0;
- }
-
- public Subscription nextSubscriber(AMQMessage msg)
- {
- return this;
- }
-
- public void queueDeleted(AMQQueue queue)
- {
- if(queue instanceof ClusteredQueue)
- {
- ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
- }
- }
-}
diff --git a/qpid/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java b/qpid/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java
deleted file mode 100644
index cc951a4709..0000000000
--- a/qpid/java/cluster/src/org/apache/qpid/server/queue/SubscriberCleanup.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.cluster.MembershipChangeListener;
-import org.apache.qpid.server.cluster.MemberHandle;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.log4j.Logger;
-
-import java.util.List;
-
-class SubscriberCleanup implements MembershipChangeListener
-{
- private static final Logger _logger = Logger.getLogger(SubscriberCleanup.class);
-
- private final MemberHandle _subject;
- private final ClusteredQueue _queue;
- private final GroupManager _manager;
-
- SubscriberCleanup(MemberHandle subject, ClusteredQueue queue, GroupManager manager)
- {
- _subject = subject;
- _queue = queue;
- _manager = manager;
- _manager.addMemberhipChangeListener(this);
- }
-
- public void changed(List<MemberHandle> members)
- {
- if(!members.contains(_subject))
- {
- _queue.removeAllRemoteSubscriber(_subject);
- _manager.removeMemberhipChangeListener(this);
- _logger.info(new LogMessage("Removed {0} from {1}", _subject, _queue));
- }
- }
-}