diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-11-26 17:05:05 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-11-26 17:05:05 +0000 |
commit | e7bfaa4a7fe3dae4079e84010a1be5e80a63fc6a (patch) | |
tree | 629dbbf2de566d941fc67283b74133789e5c1c79 /Final/java/cluster/src | |
parent | 44ff34c5a5d2a4ee8976dcead0e7876c347e54c3 (diff) | |
download | qpid-python-e7bfaa4a7fe3dae4079e84010a1be5e80a63fc6a.tar.gz |
tag for M2 final release
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/tags/M2@598344 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'Final/java/cluster/src')
89 files changed, 7928 insertions, 0 deletions
diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java new file mode 100644 index 0000000000..2baaa344ef --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java @@ -0,0 +1,42 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * AMQConnectionWaitException represents a failure to connect to a cluster peer in a timely manner.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents failure to connect to a cluster peer in a timely manner.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public class AMQConnectionWaitException extends AMQException
+{
+ public AMQConnectionWaitException(String s, Throwable e)
+ {
+ super(s, e);
+
+ }
+}
diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java new file mode 100644 index 0000000000..951bd22df0 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java @@ -0,0 +1,46 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+
+/**
+ * AMQUnexpectedBodyTypeException represents a failure where a message body does not match its expected type. For example,
+ * and AMQP method should have a method body.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents a failure where a message body does not match its expected type.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would
+ * be better just to leave that as a ClassCastException. Check that the framing layer will pick up the error first.
+ */
+public class AMQUnexpectedBodyTypeException extends AMQException
+{
+ public AMQUnexpectedBodyTypeException(Class<? extends AMQBody> expectedClass, AMQBody body)
+ {
+ super("Unexpected body type. Expected: " + expectedClass.getName() + "; got: " + body.getClass().getName());
+ }
+}
diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java new file mode 100644 index 0000000000..4dd318f90d --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java @@ -0,0 +1,45 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * AMQUnexpectedFrameTypeException represents a failure when Mina passes an unexpected frame type.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents failure to cast a frame to its expected type.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ *
+ * @todo Seems like this exception was created to handle an unsafe type cast that will never happen in practice. Would
+ * be better just to leave that as a ClassCastException. However, check the framing layer catches this error
+ * first.
+ */
+public class AMQUnexpectedFrameTypeException extends AMQException
+{
+ public AMQUnexpectedFrameTypeException(String s)
+ {
+ super(s);
+ }
+}
diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java new file mode 100644 index 0000000000..39508df566 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/BlockingHandler.java @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQMethodBody; + +public class BlockingHandler implements ResponseHandler +{ + private final Class _expected; + private boolean _completed; + private AMQMethodBody _response; + + + public BlockingHandler() + { + this(AMQMethodBody.class); + } + + public BlockingHandler(Class<? extends AMQMethodBody> expected) + { + _expected = expected; + } + + public void responded(AMQMethodBody response) + { + if (_expected.isInstance(response)) + { + _response = response; + completed(); + } + } + + public void removed() + { + completed(); + } + + private synchronized void completed() + { + _completed = true; + notifyAll(); + } + + synchronized void waitForCompletion() + { + while (!_completed) + { + try + { + wait(); + } + catch (InterruptedException ignore) + { + + } + } + } + + AMQMethodBody getResponse() + { + return _response; + } + + boolean failed() + { + return _response == null; + } + + boolean isCompleted() + { + return _completed; + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java new file mode 100644 index 0000000000..145aa58574 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/BroadcastPolicy.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +public interface BroadcastPolicy +{ + public boolean isComplete(int responded, int members); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java new file mode 100644 index 0000000000..7e2cf6da83 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/Broker.java @@ -0,0 +1,247 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * An implementation of the Member interface (through which data is sent to other + * peers in the cluster). This class provides a base from which subclasses can + * inherit some common behaviour for broadcasting GroupRequests and sending methods + * that may expect a response. It also extends the Member abstraction to support + * a richer set of operations that are useful within the package but should not be + * exposed outside of it. + * + */ +abstract class Broker extends SimpleMemberHandle implements Member +{ + private static final Logger _logger = Logger.getLogger(Broker.class); + private static final int DEFAULT_CHANNEL = 1; + private static final int START_CHANNEL = 2; + private static final int END_CHANNEL = 10000; + + + private MemberFailureListener _listener; + //a wrap-around counter to allocate _requests a unique channel: + private int _nextChannel = START_CHANNEL; + //outstanding _requests: + private final Map<Integer, ResponseHandler> _requests = new HashMap<Integer, ResponseHandler>(); + + Broker(String host, int port) + { + super(host, port); + } + + /** + * Allows a listener to be registered that will receive callbacks when communication + * to the peer this broker instance represents fails. + * @param listener the callback to be notified of failures + */ + public void addFailureListener(MemberFailureListener listener) + { + _listener = listener; + } + + /** + * Allows subclasses to signal comunication failures + */ + protected void failed() + { + if (_listener != null) + { + _listener.failed(this); + } + } + + /** + * Subclasses should call this on receiving message responses from the remote + * peer. They are matched to any outstanding request they might be response + * to, with the completion and callback of that request being managed if + * required. + * + * @param channel the channel on which the method was received + * @param response the response received + * @return true if the response matched an outstanding request + */ + protected synchronized boolean handleResponse(int channel, AMQMethodBody response) + { + ResponseHandler request = _requests.get(channel); + if (request == null) + { + if(!_requests.isEmpty()) + { + _logger.warn(new LogMessage("[next channel={3, integer}]: Response {0} on channel {1, integer} failed to match outstanding requests: {2}", response, channel, _requests, _nextChannel)); + } + return false; + } + else + { + request.responded(response); + return true; + } + } + + /** + * Called when this broker is excluded from the group. Any requests made on + * it are informed this member has left the group. + */ + synchronized void remove() + { + for (ResponseHandler r : _requests.values()) + { + r.removed(); + } + } + + /** + * Engages this broker in the specified group request + * + * @param request the request being made to a group of brokers + * @throws AMQException if there is any failure + */ + synchronized void invoke(GroupRequest request) throws AMQException + { + int channel = nextChannel(); + _requests.put(channel, new GroupRequestAdapter(request, channel)); + request.send(channel, this); + } + + /** + * Sends a message to the remote peer and undertakes to notify the specified + * handler of the response. + * + * @param msg the message to send + * @param handler the callback to notify of responses (or the removal of this broker + * from the group) + * @throws AMQException + */ + synchronized void send(Sendable msg, ResponseHandler handler) throws AMQException + { + int channel; + if (handler != null) + { + channel = nextChannel(); + _requests.put(channel, new RemovingWrapper(handler, channel)); + } + else + { + channel = DEFAULT_CHANNEL; + } + + msg.send(channel, this); + } + + private int nextChannel() + { + int channel = _nextChannel++; + if(_nextChannel >= END_CHANNEL) + { + _nextChannel = START_CHANNEL; + } + return channel; + } + + /** + * extablish connection without handling redirect + */ + abstract boolean connect() throws IOException, InterruptedException; + + /** + * Start connection process, including replay + */ + abstract void connectAsynch(Iterable<AMQMethodBody> msgs); + + /** + * Replay messages to the remote peer this instance represents. These messages + * must be sent before any others whose transmission is requested through send() etc. + * + * @param msgs + */ + abstract void replay(Iterable<AMQMethodBody> msgs); + + /** + * establish connection, handling redirect if required... + */ + abstract Broker connectToCluster() throws IOException, InterruptedException; + + private class GroupRequestAdapter implements ResponseHandler + { + private final GroupRequest request; + private final int channel; + + GroupRequestAdapter(GroupRequest request, int channel) + { + this.request = request; + this.channel = channel; + } + + public void responded(AMQMethodBody response) + { + request.responseReceived(Broker.this, response); + _requests.remove(channel); + } + + public void removed() + { + request.removed(Broker.this); + } + + public String toString() + { + return "GroupRequestAdapter{" + channel + ", " + request + "}"; + } + } + + private class RemovingWrapper implements ResponseHandler + { + private final ResponseHandler handler; + private final int channel; + + RemovingWrapper(ResponseHandler handler, int channel) + { + this.handler = handler; + this.channel = channel; + } + + public void responded(AMQMethodBody response) + { + handler.responded(response); + _requests.remove(channel); + } + + public void removed() + { + handler.removed(); + } + + public String toString() + { + return "RemovingWrapper{" + channel + ", " + handler + "}"; + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java new file mode 100644 index 0000000000..92c3c4e7bf --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerFactory.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +interface BrokerFactory +{ + public Broker create(MemberHandle handle); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java new file mode 100644 index 0000000000..755a341607 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/BrokerGroup.java @@ -0,0 +1,368 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.cluster.replay.ReplayManager; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.cluster.util.InvokeMultiple; +import org.apache.qpid.framing.AMQMethodBody; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Manages the membership list of a group and the set of brokers representing the + * remote peers. The group should be initialised through a call to establish() + * or connectToLeader(). + * + */ +class BrokerGroup +{ + private static final Logger _logger = Logger.getLogger(BrokerGroup.class); + + private final InvokeMultiple<MembershipChangeListener> _changeListeners = new InvokeMultiple<MembershipChangeListener>(MembershipChangeListener.class); + private final ReplayManager _replayMgr; + private final MemberHandle _local; + private final BrokerFactory _factory; + private final Object _lock = new Object(); + private final Set<MemberHandle> _synch = new HashSet<MemberHandle>(); + private List<MemberHandle> _members; + private List<Broker> _peers = new ArrayList<Broker>(); + private JoinState _state = JoinState.UNINITIALISED; + + /** + * Creates an unitialised group. + * + * @param local a handle that represents the local broker + * @param replayMgr the replay manager to use when creating new brokers + * @param factory the factory through which broker instances are created + */ + BrokerGroup(MemberHandle local, ReplayManager replayMgr, BrokerFactory factory) + { + _replayMgr = replayMgr; + _local = local; + _factory = factory; + } + + /** + * Called to establish the local broker as the leader of a new group + */ + void establish() + { + synchronized (_lock) + { + setState(JoinState.JOINED); + _members = new ArrayList<MemberHandle>(); + _members.add(_local); + } + fireChange(); + } + + /** + * Called by prospect to connect to group + */ + Broker connectToLeader(MemberHandle handle) throws Exception + { + Broker leader = _factory.create(handle); + leader = leader.connectToCluster(); + synchronized (_lock) + { + setState(JoinState.JOINING); + _members = new ArrayList<MemberHandle>(); + _members.add(leader); + _peers.add(leader); + } + fireChange(); + return leader; + } + + /** + * Called by leader when handling a join request + */ + Broker connectToProspect(MemberHandle handle) throws IOException, InterruptedException + { + Broker prospect = _factory.create(handle); + prospect.connect(); + synchronized (_lock) + { + _members.add(prospect); + _peers.add(prospect); + } + fireChange(); + return prospect; + } + + /** + * Called in reponse to membership announcements. + * + * @param members the list of members now part of the group + */ + void setMembers(List<MemberHandle> members) + { + if (isJoined()) + { + List<Broker> old = _peers; + + synchronized (_lock) + { + _peers = getBrokers(members); + _members = new ArrayList<MemberHandle>(members); + } + + //remove those that are still members + old.removeAll(_peers); + + //handle failure of any brokers that haven't survived + for (Broker peer : old) + { + peer.remove(); + } + } + else + { + synchronized (_lock) + { + setState(JoinState.INITIATION); + _members = new ArrayList<MemberHandle>(members); + _synch.addAll(_members); + _synch.remove(_local); + } + } + fireChange(); + } + + List<MemberHandle> getMembers() + { + synchronized (_lock) + { + return Collections.unmodifiableList(_members); + } + } + + List<Broker> getPeers() + { + synchronized (_lock) + { + return _peers; + } + } + + /** + * Removes the member presented from the group + * @param peer the broker that should be removed + */ + void remove(Broker peer) + { + synchronized (_lock) + { + _peers.remove(peer); + _members.remove(peer); + } + fireChange(); + } + + MemberHandle getLocal() + { + return _local; + } + + Broker getLeader() + { + synchronized (_lock) + { + return _peers.size() > 0 ? _peers.get(0) : null; + } + } + + /** + * Allows a Broker instance to be retrieved for a given handle + * + * @param handle the handle for which a broker is sought + * @param create flag to indicate whther a broker should be created for the handle if + * one is not found within the list of known peers + * @return the broker corresponding to handle or null if a match cannot be found and + * create is false + */ + Broker findBroker(MemberHandle handle, boolean create) + { + if (handle instanceof Broker) + { + return (Broker) handle; + } + else + { + for (Broker b : getPeers()) + { + if (b.matches(handle)) + { + return b; + } + } + } + if (create) + { + Broker b = _factory.create(handle); + List<AMQMethodBody> msgs = _replayMgr.replay(isLeader(_local)); + _logger.info(new LogMessage("Replaying {0} from {1} to {2}", msgs, _local, b)); + b.connectAsynch(msgs); + + return b; + } + else + { + return null; + } + } + + /** + * @param member the member to test for leadership + * @return true if the passed in member is the group leader, false otherwise + */ + boolean isLeader(MemberHandle member) + { + synchronized (_lock) + { + return member.matches(_members.get(0)); + } + } + + /** + * @return true if the local broker is the group leader, false otherwise + */ + boolean isLeader() + { + return isLeader(_local); + } + + /** + * Used when the leader fails and the next broker in the list needs to + * assume leadership + * @return true if the action succeeds + */ + boolean assumeLeadership() + { + boolean valid; + synchronized (_lock) + { + valid = _members.size() > 1 && _local.matches(_members.get(1)); + if (valid) + { + _members.remove(0); + _peers.remove(0); + } + } + fireChange(); + return valid; + } + + /** + * Called in response to a Cluster.Synch message being received during the join + * process. This indicates that the member mentioned has replayed all necessary + * messages to the local broker. + * + * @param member the member from whom the synch messages was received + */ + void synched(MemberHandle member) + { + _logger.info(new LogMessage("Synchronised with {0}", member)); + synchronized (_lock) + { + if (isLeader(member)) + { + setState(JoinState.INDUCTION); + } + _synch.remove(member); + if (_synch.isEmpty()) + { + _peers = getBrokers(_members); + setState(JoinState.JOINED); + } + } + } + + + /** + * @return the state of the group + */ + JoinState getState() + { + synchronized (_lock) + { + return _state; + } + } + + void addMemberhipChangeListener(MembershipChangeListener l) + { + _changeListeners.addListener(l); + } + + void removeMemberhipChangeListener(MembershipChangeListener l) + { + _changeListeners.removeListener(l); + } + + + + private void setState(JoinState state) + { + _logger.info(new LogMessage("Changed state from {0} to {1}", _state, state)); + _state = state; + } + + private boolean isJoined() + { + return inState(JoinState.JOINED); + } + + private boolean inState(JoinState state) + { + return _state.equals(state); + } + + private List<Broker> getBrokers(List<MemberHandle> handles) + { + List<Broker> brokers = new ArrayList<Broker>(); + for (MemberHandle handle : handles) + { + if (!_local.matches(handle)) + { + brokers.add(findBroker(handle, true)); + } + } + return brokers; + } + + private void fireChange() + { + List<MemberHandle> members; + synchronized(this) + { + members = new ArrayList(_members); + } + _changeListeners.getProxy().changed(Collections.unmodifiableList(members)); + } +}
\ No newline at end of file diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java new file mode 100644 index 0000000000..1b4a3e8327 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.IoSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.framing.AMQMethodBody; + +/** + * Hack to assist with reuse of the client handlers for connection setup in + * the inter-broker communication within the cluster. + * + */ +class ClientAdapter implements MethodHandler +{ + private final AMQProtocolSession _session; + private final AMQStateManager _stateMgr; + + ClientAdapter(IoSession session, AMQStateManager stateMgr) + { + this(session, stateMgr, "guest", "guest", session.toString(), "/cluster"); + } + + ClientAdapter(IoSession session, AMQStateManager stateMgr, String user, String password, String name, String path) + { + _session = new SessionAdapter(session, new ConnectionAdapter(user, password, name, path)); + _stateMgr = stateMgr; + } + + public void handle(int channel, AMQMethodBody method) throws AMQException + { + AMQMethodEvent evt = new AMQMethodEvent(channel, method); + _stateMgr.methodReceived(evt); + } + + private class SessionAdapter extends AMQProtocolSession + { + public SessionAdapter(IoSession session, AMQConnection connection) + { + super(null, session, connection); + } + } + + private static class ConnectionAdapter extends AMQConnection + { + ConnectionAdapter(String username, String password, String clientName, String virtualPath) + { + super(username, password, clientName, virtualPath); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java new file mode 100644 index 0000000000..c1caf8bbff --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.client.handler.ConnectionCloseMethodHandler; +import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler; +import org.apache.qpid.client.handler.ConnectionSecureMethodHandler; +import org.apache.qpid.client.handler.ConnectionStartMethodHandler; +import org.apache.qpid.client.handler.ConnectionTuneMethodHandler; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +// import org.apache.qpid.client.state.IllegalStateTransitionException; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.framing.*; + +import java.util.HashMap; +import java.util.Map; + +/** + * An extension of client.AMQStateManager that allows different handlers to be registered. + * + */ +public class ClientHandlerRegistry extends AMQStateManager +{ + private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>(); + private final MemberHandle _identity; + + protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession) + { + super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession); + + _identity = local; + + addHandler(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance(), + AMQState.CONNECTION_NOT_STARTED); + + addHandler(ConnectionTuneBody.class, new ConnectionTuneHandler(), + AMQState.CONNECTION_NOT_TUNED); + addHandler(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance(), + AMQState.CONNECTION_NOT_TUNED); + addHandler(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance(), + AMQState.CONNECTION_NOT_OPENED); + + addHandlers(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance(), + AMQState.CONNECTION_NOT_STARTED, + AMQState.CONNECTION_NOT_TUNED, + AMQState.CONNECTION_NOT_OPENED); + + } + + private ClientRegistry state(AMQState state) + { + ClientRegistry registry = _handlers.get(state); + if (registry == null) + { + registry = new ClientRegistry(); + _handlers.put(state, registry); + } + return registry; + } + + protected StateAwareMethodListener findStateTransitionHandler(AMQState state, AMQMethodBody frame) //throws IllegalStateTransitionException + { + ClientRegistry registry = _handlers.get(state); + return registry == null ? null : registry.getHandler(frame); + } + + + <A extends Class<AMQMethodBody>> void addHandlers(Class type, StateAwareMethodListener handler, AMQState... states) + { + for (AMQState state : states) + { + addHandler(type, handler, state); + } + } + + <A extends Class<AMQMethodBody>> void addHandler(Class type, StateAwareMethodListener handler, AMQState state) + { + ClientRegistry registry = _handlers.get(state); + if (registry == null) + { + registry = new ClientRegistry(); + _handlers.put(state, registry); + } + registry.add(type, handler); + } + + static class ClientRegistry + { + private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener> registry + = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener>(); + + <A extends Class<AMQMethodBody>> void add(A type, StateAwareMethodListener handler) + { + registry.put(type, handler); + } + + StateAwareMethodListener getHandler(AMQMethodBody frame) + { + return registry.get(frame.getClass()); + } + } + + class ConnectionTuneHandler extends ConnectionTuneMethodHandler + { + protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor) + { + return super.createConnectionOpenFrame(channel, path, new AMQShortString(ClusterCapability.add(capabilities, _identity)), insist, major, minor); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java new file mode 100644 index 0000000000..80f9ef62b1 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.server.cluster.handler.ClusterMethodHandlerFactory; +import org.apache.qpid.server.cluster.replay.RecordingMethodHandlerFactory; +import org.apache.qpid.server.cluster.replay.ReplayStore; + +import java.net.InetSocketAddress; + +class ClusterBuilder +{ + private final LoadTable loadTable = new LoadTable(); + private final ReplayStore replayStore = new ReplayStore(); + private final MemberHandle handle; + private final GroupManager groupMgr; + + ClusterBuilder(InetSocketAddress address) + { + handle = new SimpleMemberHandle(address.getHostName(), address.getPort()).resolve(); + groupMgr = new DefaultGroupManager(handle, getBrokerFactory(), replayStore, loadTable); + } + + GroupManager getGroupManager() + { + return groupMgr; + } + + ServerHandlerRegistry getHandlerRegistry() + { + return new ServerHandlerRegistry(getHandlerFactory(), null, null); + } + + private MethodHandlerFactory getHandlerFactory() + { + MethodHandlerFactory factory = new ClusterMethodHandlerFactory(groupMgr, loadTable); + //need to wrap relevant handlers with recording handler for easy replay: + return new RecordingMethodHandlerFactory(factory, replayStore); + } + + private BrokerFactory getBrokerFactory() + { + return new MinaBrokerProxyFactory(handle); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java new file mode 100644 index 0000000000..57c48f0611 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterCapability.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQShortString; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ClusterCapability +{ + public static final String PATTERN = ".*\\bcluster_peer=(\\S*:\\d*)\b*.*"; + public static final String PEER = "cluster_peer"; + + public static AMQShortString add(AMQShortString original, MemberHandle identity) + { + return original == null ? peer(identity) : new AMQShortString(original + " " + peer(identity)); + } + + private static AMQShortString peer(MemberHandle identity) + { + return new AMQShortString(PEER + "=" + identity.getDetails()); + } + + public static boolean contains(AMQShortString in) + { + return in != null; // && in.contains(in); + } + + public static MemberHandle getPeer(AMQShortString in) + { + Matcher matcher = Pattern.compile(PATTERN).matcher(in); + if (matcher.matches()) + { + return new SimpleMemberHandle(matcher.group(1)); + } + else + { + throw new RuntimeException("Could not find peer in '" + in + "'"); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java new file mode 100644 index 0000000000..ee5aa48db9 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java @@ -0,0 +1,190 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IoSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.ClusterMembershipBody; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +import java.net.InetSocketAddress; + +public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements InductionBuffer.MessageHandler +{ + private static final Logger _logger = Logger.getLogger(ClusteredProtocolHandler.class); + private final InductionBuffer _peerBuffer = new InductionBuffer(this); + private final InductionBuffer _clientBuffer = new InductionBuffer(this); + private final GroupManager _groupMgr; + private final ServerHandlerRegistry _handlers; + + public ClusteredProtocolHandler(InetSocketAddress address) + { + this(ApplicationRegistry.getInstance(), address); + } + + public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address) + { + super(registry); + ClusterBuilder builder = new ClusterBuilder(address); + _groupMgr = builder.getGroupManager(); + _handlers = builder.getHandlerRegistry(); + } + + public ClusteredProtocolHandler(ClusteredProtocolHandler handler) + { + super(handler); + _groupMgr = handler._groupMgr; + _handlers = handler._handlers; + } + + protected void createSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException + { + new ClusteredProtocolSession(session, virtualHostRegistry, codec, new ServerHandlerRegistry(_handlers, virtualHostRegistry, protocolSession)); + } + + void connect(String join) throws Exception + { + if (join == null) + { + _groupMgr.establish(); + } + else + { + _groupMgr.join(new SimpleMemberHandle(join)); + } + } + + private boolean inState(JoinState state) + { + return _groupMgr.getState().equals(state); + } + + public void messageReceived(IoSession session, Object msg) throws Exception + { + JoinState state = _groupMgr.getState(); + switch (state) + { + case JOINED: + _logger.debug(new LogMessage("Received {0}", msg)); + super.messageReceived(session, msg); + break; + case JOINING: + case INITIATION: + case INDUCTION: + buffer(session, msg); + break; + default: + throw new AMQException("Received message while in state: " + state); + } + JoinState latest = _groupMgr.getState(); + if (!latest.equals(state)) + { + switch (latest) + { + case INDUCTION: + _logger.info("Reached induction, delivering buffered message from peers"); + _peerBuffer.deliver(); + break; + case JOINED: + _logger.info("Reached joined, delivering buffered message from clients"); + _clientBuffer.deliver(); + break; + } + } + } + + private void buffer(IoSession session, Object msg) throws Exception + { + if (isBufferable(msg)) + { + MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session); + if (peer == null) + { + _logger.debug(new LogMessage("Buffering {0} for client", msg)); + _clientBuffer.receive(session, msg); + } + else if (inState(JoinState.JOINING) && isMembershipAnnouncement(msg)) + { + _logger.debug(new LogMessage("Initial membership [{0}] received from {1}", msg, peer)); + super.messageReceived(session, msg); + } + else if (inState(JoinState.INITIATION) && _groupMgr.isLeader(peer)) + { + _logger.debug(new LogMessage("Replaying {0} from leader ", msg)); + super.messageReceived(session, msg); + } + else if (inState(JoinState.INDUCTION)) + { + _logger.debug(new LogMessage("Replaying {0} from peer {1}", msg, peer)); + super.messageReceived(session, msg); + } + else + { + _logger.debug(new LogMessage("Buffering {0} for peer {1}", msg, peer)); + _peerBuffer.receive(session, msg); + } + } + else + { + _logger.debug(new LogMessage("Received {0}", msg)); + super.messageReceived(session, msg); + } + } + + public void deliver(IoSession session, Object msg) throws Exception + { + _logger.debug(new LogMessage("Delivering {0}", msg)); + super.messageReceived(session, msg); + } + + private boolean isMembershipAnnouncement(Object msg) + { + return msg instanceof AMQFrame && (((AMQFrame) msg).getBodyFrame() instanceof ClusterMembershipBody); + } + + private boolean isBufferable(Object msg) + { + return msg instanceof AMQFrame && isBuffereable(((AMQFrame) msg).getBodyFrame()); + } + + private boolean isBuffereable(AMQBody body) + { + return !(body instanceof ConnectionStartOkBody || + body instanceof ConnectionTuneOkBody || + body instanceof ConnectionSecureOkBody || + body instanceof ConnectionOpenBody); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java new file mode 100644 index 0000000000..eea660c4f0 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java @@ -0,0 +1,133 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.IoSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.state.AMQStateManager; + +public class ClusteredProtocolSession extends AMQMinaProtocolSession +{ + private MemberHandle _peer; + + public ClusteredProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException +// public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, +// ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException + { + super(session, virtualHostRegistry, codecFactory, stateManager); +// super(session, queueRegistry, exchangeRegistry, codecFactory); + } + + public boolean isPeerSession() + { + return _peer != null; + } + + public void setSessionPeer(MemberHandle peer) + { + _peer = peer; + } + + public MemberHandle getSessionPeer() + { + return _peer; + } + + public AMQChannel getChannel(int channelId) + throws AMQException + { + AMQChannel channel = super.getChannel(channelId); + if (isPeerSession() && channel == null) + { + channel = new OneUseChannel(channelId, getVirtualHost()); + addChannel(channel); + } + return channel; + } + + public static boolean isPeerSession(IoSession session) + { + return isPeerSession(getAMQProtocolSession(session)); + } + + public static boolean isPeerSession(AMQProtocolSession session) + { + return session instanceof ClusteredProtocolSession && ((ClusteredProtocolSession) session).isPeerSession(); + } + + public static void setSessionPeer(AMQProtocolSession session, MemberHandle peer) + { + ((ClusteredProtocolSession) session).setSessionPeer(peer); + } + + public static MemberHandle getSessionPeer(AMQProtocolSession session) + { + return ((ClusteredProtocolSession) session).getSessionPeer(); + } + + public static MemberHandle getSessionPeer(IoSession session) + { + return getSessionPeer(getAMQProtocolSession(session)); + } + + /** + * Cleans itself up after delivery of a message (publish frame, header and optional body frame(s)) + */ + private class OneUseChannel extends AMQChannel + { + public OneUseChannel(int channelId, VirtualHost virtualHost) + throws AMQException + { + super(ClusteredProtocolSession.this,channelId, + virtualHost.getMessageStore(), + virtualHost.getExchangeRegistry()); + } + + protected void routeCurrentMessage() throws AMQException + { + super.routeCurrentMessage(); + removeChannel(getChannelId()); + } + } + + public static boolean isPayloadFromPeer(AMQMessage payload) + { + return isPeerSession(payload.getPublisher()); + } + + public static boolean canRelay(AMQMessage payload, MemberHandle target) + { + //can only relay client messages that have not already been relayed to the given target + return !isPayloadFromPeer(payload) && !payload.checkToken(target); + } + +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java new file mode 100644 index 0000000000..a1f01eff46 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ConnectionStatusMonitor.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import java.io.IOException; + +class ConnectionStatusMonitor +{ + private boolean _complete; + private boolean _redirected; + private String _host; + private int _port; + private RuntimeException _error; + + synchronized void opened() + { + _complete = true; + notifyAll(); + } + + synchronized void redirect(String host, int port) + { + _complete = true; + _redirected = true; + this._host = host; + this._port = port; + } + + synchronized void failed(RuntimeException e) + { + _error = e; + _complete = true; + } + + synchronized boolean waitUntilOpen() throws InterruptedException + { + while (!_complete) + { + wait(); + } + if (_error != null) + { + throw _error; + } + return !_redirected; + } + + synchronized boolean isOpened() + { + return _complete; + } + + String getHost() + { + return _host; + } + + int getPort() + { + return _port; + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java new file mode 100644 index 0000000000..2f473b63fb --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -0,0 +1,396 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.*; +import org.apache.qpid.server.cluster.policy.StandardPolicies; +import org.apache.qpid.server.cluster.replay.ReplayManager; +import org.apache.qpid.server.cluster.util.LogMessage; + +import java.util.List; + +public class DefaultGroupManager implements GroupManager, MemberFailureListener, BrokerFactory, StandardPolicies +{ + private static final Logger _logger = Logger.getLogger(DefaultGroupManager.class); + private final LoadTable _loadTable; + private final BrokerFactory _factory; + private final ReplayManager _replayMgr; + private final BrokerGroup _group; + + DefaultGroupManager(MemberHandle handle, BrokerFactory factory, ReplayManager replayMgr) + { + this(handle, factory, replayMgr, new LoadTable()); + } + + DefaultGroupManager(MemberHandle handle, BrokerFactory factory, ReplayManager replayMgr, LoadTable loadTable) + { + handle = SimpleMemberHandle.resolve(handle); + _logger.info(handle); + _loadTable = loadTable; + _factory = factory; + _replayMgr = replayMgr; + _group = new BrokerGroup(handle, _replayMgr, this); + } + + public JoinState getState() + { + return _group.getState(); + } + + public void addMemberhipChangeListener(MembershipChangeListener l) + { + _group.addMemberhipChangeListener(l); + } + + public void removeMemberhipChangeListener(MembershipChangeListener l) + { + _group.removeMemberhipChangeListener(l); + } + + public void broadcast(Sendable message) throws AMQException + { + for (Broker b : _group.getPeers()) + { + b.send(message, null); + } + } + + public void broadcast(Sendable message, BroadcastPolicy policy, GroupResponseHandler callback) throws AMQException + { + GroupRequest request = new GroupRequest(message, policy, callback); + for (Broker b : _group.getPeers()) + { + b.invoke(request); + } + request.finishedSend(); + } + + public void send(MemberHandle broker, Sendable message) throws AMQException + { + Broker destination = findBroker(broker); + if(destination == null) + { + _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker)); + } + else + { + destination.send(message, null); + _logger.debug(new LogMessage("Sent {0} to {1}", message, broker)); + } + } + + private void send(Broker broker, Sendable message, ResponseHandler handler) throws AMQException + { + broker.send(message, handler); + } + + private void ping(Broker b) throws AMQException + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterPingBody ping = new ClusterPingBody((byte)8, + (byte)0, + ClusterPingBody.getClazz((byte)8, (byte)0), + ClusterPingBody.getMethod((byte)8, (byte)0), + _group.getLocal().getDetails(), + _loadTable.getLocalLoad(), + true); + BlockingHandler handler = new BlockingHandler(); + send(getLeader(), new SimpleBodySendable(ping), handler); + handler.waitForCompletion(); + if (handler.failed()) + { + if (isLeader()) + { + handleFailure(b); + } + else + { + suspect(b); + } + } + else + { + _loadTable.setLoad(b, ((ClusterPingBody) handler.getResponse()).load); + } + } + + public void handlePing(MemberHandle member, long load) + { + _loadTable.setLoad(findBroker(member), load); + } + + public Member redirect() + { + return _loadTable.redirect(); + } + + public void establish() + { + _group.establish(); + _logger.info("Established cluster"); + } + + public void join(MemberHandle member) throws AMQException + { + member = SimpleMemberHandle.resolve(member); + + Broker leader = connectToLeader(member); + _logger.info(new LogMessage("Connected to {0}. joining", leader)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterJoinBody join = new ClusterJoinBody((byte)8, + (byte)0, + ClusterJoinBody.getClazz((byte)8, (byte)0), + ClusterJoinBody.getMethod((byte)8, (byte)0), + _group.getLocal().getDetails()); + + send(leader, new SimpleBodySendable(join)); + } + + private Broker connectToLeader(MemberHandle member) throws AMQException + { + try + { + return _group.connectToLeader(member); + } + catch (Exception e) + { + throw new AMQException("Could not connect to leader: " + e, e); + } + } + + public void leave() throws AMQException + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, + (byte)0, + ClusterLeaveBody.getClazz((byte)8, (byte)0), + ClusterLeaveBody.getMethod((byte)8, (byte)0), + _group.getLocal().getDetails()); + + send(getLeader(), new SimpleBodySendable(leave)); + } + + private void suspect(MemberHandle broker) throws AMQException + { + if (_group.isLeader(broker)) + { + //need new leader, if this broker is next in line it can assume leadership + if (_group.assumeLeadership()) + { + announceMembership(); + } + else + { + _logger.warn(new LogMessage("Leader failed. Expecting {0} to succeed.", _group.getMembers().get(1))); + } + } + else + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, + (byte)0, + ClusterSuspectBody.getClazz((byte)8, (byte)0), + ClusterSuspectBody.getMethod((byte)8, (byte)0), + broker.getDetails()); + + send(getLeader(), new SimpleBodySendable(suspect)); + } + } + + + public void handleJoin(MemberHandle member) throws AMQException + { + _logger.info(new LogMessage("Handling join request for {0}", member)); + if(isLeader()) + { + //connect to the host and port specified: + Broker prospect = connectToProspect(member); + announceMembership(); + List<AMQMethodBody> msgs = _replayMgr.replay(true); + _logger.info(new LogMessage("Replaying {0} from leader to {1}", msgs, prospect)); + prospect.replay(msgs); + } + else + { + //pass request on to leader: + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0, + ClusterJoinBody.getClazz((byte)8, (byte)0), + ClusterJoinBody.getMethod((byte)8, (byte)0), + member.getDetails()); + + Broker leader = getLeader(); + send(leader, new SimpleBodySendable(request)); + _logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader)); + } + } + + private Broker connectToProspect(MemberHandle member) throws AMQException + { + try + { + return _group.connectToProspect(member); + } + catch (Exception e) + { + e.printStackTrace(); + throw new AMQException("Could not connect to prospect: " + e, e); + } + } + + public void handleLeave(MemberHandle member) throws AMQException + { + handleFailure(findBroker(member)); + announceMembership(); + } + + public void handleSuspect(MemberHandle member) throws AMQException + { + Broker b = findBroker(member); + if(b != null) + { + //ping it to check it has failed, ping will handle failure if it has + ping(b); + announceMembership(); + } + } + + public void handleSynch(MemberHandle member) + { + _group.synched(member); + } + + private ClusterMembershipBody createAnnouncement(String membership) + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + ClusterMembershipBody announce = new ClusterMembershipBody((byte)8, (byte)0, + ClusterMembershipBody.getClazz((byte)8, (byte)0), + ClusterMembershipBody.getMethod((byte)8, (byte)0), + membership.getBytes()); + + + return announce; + } + + private void announceMembership() throws AMQException + { + String membership = SimpleMemberHandle.membersToString(_group.getMembers()); + ClusterMembershipBody announce = createAnnouncement(membership); + broadcast(new SimpleBodySendable(announce)); + _logger.info(new LogMessage("Membership announcement sent: {0}", membership)); + } + + private void handleFailure(Broker peer) + { + peer.remove(); + _group.remove(peer); + } + + public void handleMembershipAnnouncement(String membership) throws AMQException + { + _group.setMembers(SimpleMemberHandle.stringToMembers(membership)); + _logger.info(new LogMessage("Membership announcement received: {0}", membership)); + } + + public boolean isLeader() + { + return _group.isLeader(); + } + + public boolean isLeader(MemberHandle handle) + { + return _group.isLeader(handle); + } + + public Broker getLeader() + { + return _group.getLeader(); + } + + private Broker findBroker(MemberHandle handle) + { + return _group.findBroker(handle, false); + } + + public Member getMember(MemberHandle handle) + { + return findBroker(handle); + } + + public boolean isMember(MemberHandle member) + { + for (MemberHandle handle : _group.getMembers()) + { + if (handle.matches(member)) + { + return true; + } + } + return false; + } + + public MemberHandle getLocal() + { + return _group.getLocal(); + } + + public void failed(MemberHandle member) + { + if (isLeader()) + { + handleFailure(findBroker(member)); + try + { + announceMembership(); + } + catch (AMQException e) + { + _logger.error("Error announcing failure: " + e, e); + } + } + else + { + try + { + suspect(member); + } + catch (AMQException e) + { + _logger.error("Error sending suspect: " + e, e); + } + } + } + + public Broker create(MemberHandle handle) + { + Broker broker = _factory.create(handle); + broker.addFailureListener(this); + return broker; + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java new file mode 100644 index 0000000000..5599ae4b1f --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupManager.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; + +public interface GroupManager +{ + /** + * Establish a new cluster with the local member as the leader. + */ + public void establish(); + + /** + * Join the cluster to which member belongs + */ + public void join(MemberHandle member) throws AMQException; + + public void broadcast(Sendable message) throws AMQException; + + public void broadcast(Sendable message, BroadcastPolicy policy, GroupResponseHandler callback) throws AMQException; + + public void send(MemberHandle broker, Sendable message) throws AMQException; + + public void leave() throws AMQException; + + public void handleJoin(MemberHandle member) throws AMQException; + + public void handleLeave(MemberHandle member) throws AMQException; + + public void handleSuspect(MemberHandle member) throws AMQException; + + public void handlePing(MemberHandle member, long load); + + public void handleMembershipAnnouncement(String membership) throws AMQException; + + public void handleSynch(MemberHandle member); + + public boolean isLeader(); + + public boolean isLeader(MemberHandle handle); + + public boolean isMember(MemberHandle member); + + public MemberHandle redirect(); + + public MemberHandle getLocal(); + + public JoinState getState(); + + public void addMemberhipChangeListener(MembershipChangeListener l); + + public void removeMemberhipChangeListener(MembershipChangeListener l); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java new file mode 100644 index 0000000000..8ab7856e87 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupRequest.java @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Represents a method sent to a group of Member instances. Manages the responses, + * completion and callback. + * + */ +class GroupRequest +{ + private final Map<Member, AMQMethodBody> _responses = new HashMap<Member, AMQMethodBody>(); + private final List<Member> _brokers = new ArrayList<Member>(); + private boolean _sent; + + private final Sendable _request; + private final BroadcastPolicy _policy; + private final GroupResponseHandler _callback; + + GroupRequest(Sendable request, BroadcastPolicy policy, GroupResponseHandler callback) + { + _request = request; + _policy = policy; + _callback = callback; + } + + void send(int channel, Member session) throws AMQException + { + _brokers.add(session); + _request.send(channel, session); + } + + boolean finishedSend() + { + _sent = true; + return checkCompletion(); + } + + public boolean responseReceived(Member broker, AMQMethodBody response) + { + _responses.put(broker, response); + return checkCompletion(); + } + + public boolean removed(Member broker) + { + _brokers.remove(broker); + return checkCompletion(); + } + + private synchronized boolean checkCompletion() + { + return isComplete() && callback(); + } + + boolean isComplete() + { + return _sent && _policy != null && _policy.isComplete(_responses.size(), _brokers.size()); + } + + boolean callback() + { + _callback.response(getResults(), _brokers); + return true; + } + + List<AMQMethodBody> getResults() + { + List<AMQMethodBody> results = new ArrayList<AMQMethodBody>(_brokers.size()); + for (Member b : _brokers) + { + results.add(_responses.get(b)); + } + return results; + } + + public String toString() + { + return "GroupRequest{request=" + _request +", brokers=" + _brokers + ", responses=" + _responses + "}"; + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java new file mode 100644 index 0000000000..d2e9de2f39 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/GroupResponseHandler.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQMethodBody; + +import java.util.List; + +public interface GroupResponseHandler +{ + //Note: this implies that the response to a group request will always be a method body... + public void response(List<AMQMethodBody> responses, List<Member> members); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java new file mode 100644 index 0000000000..586d7d4ae8 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/InductionBuffer.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.IoSession; + +import java.util.LinkedList; +import java.util.Queue; + +/** + * Buffers any received messages until join completes. + * + */ +class InductionBuffer +{ + private final Queue<Message> _buffer = new LinkedList<Message>(); + private final MessageHandler _handler; + private boolean _buffering = true; + + InductionBuffer(MessageHandler handler) + { + _handler = handler; + } + + private void process() throws Exception + { + for (Message o = _buffer.poll(); o != null; o = _buffer.poll()) + { + o.deliver(_handler); + } + _buffering = false; + } + + synchronized void deliver() throws Exception + { + process(); + } + + synchronized void receive(IoSession session, Object msg) throws Exception + { + if (_buffering) + { + _buffer.offer(new Message(session, msg)); + } + else + { + _handler.deliver(session, msg); + } + } + + private static class Message + { + private final IoSession _session; + private final Object _msg; + + Message(IoSession session, Object msg) + { + _session = session; + _msg = msg; + } + + void deliver(MessageHandler handler) throws Exception + { + handler.deliver(_session, _msg); + } + } + + static interface MessageHandler + { + public void deliver(IoSession session, Object msg) throws Exception; + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java new file mode 100644 index 0000000000..5f92aa2971 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/JoinState.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +public enum JoinState +{ + UNINITIALISED, JOINING, INITIATION, INDUCTION, JOINED +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java new file mode 100644 index 0000000000..13465a8615 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/LoadTable.java @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; + +/** + * Maintains loading information about the local member and its cluster peers. + * + */ +public class LoadTable +{ + private final Map<MemberHandle, Loading> _peers = new HashMap<MemberHandle, Loading>(); + private final PriorityQueue<Loading> _loads = new PriorityQueue<Loading>(); + private final Loading _local = new Loading(null); + + public LoadTable() + { + _loads.add(_local); + } + + public void setLoad(Member member, long load) + { + synchronized (_peers) + { + Loading loading = _peers.get(member); + if (loading == null) + { + loading = new Loading(member); + synchronized (_loads) + { + _loads.add(loading); + } + _peers.put(member, loading); + } + loading.load = load; + } + } + + public void incrementLocalLoad() + { + synchronized (_local) + { + _local.load++; + } + } + + public void decrementLocalLoad() + { + synchronized (_local) + { + _local.load--; + } + } + + public long getLocalLoad() + { + synchronized (_local) + { + return _local.load; + } + } + + public Member redirect() + { + synchronized (_loads) + { + return _loads.peek().member; + } + } + + private static class Loading implements Comparable + { + private final Member member; + private long load; + + Loading(Member member) + { + this.member = member; + } + + public int compareTo(Object o) + { + return (int) (load - ((Loading) o).load); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java new file mode 100644 index 0000000000..15752353d1 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.log4j.Logger; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.transport.socket.nio.SocketAcceptor; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.server.transport.ConnectorConfiguration; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +/** + * TODO: This is a cut-and-paste from the original broker Main class. Would be preferrable to make that class more + * reuseable to avoid all this duplication. + */ +public class Main extends org.apache.qpid.server.Main +{ + private static final Logger _logger = Logger.getLogger(Main.class); + + protected Main(String[] args) + { + super(args); + } + + protected void setOptions(Options otions) + { + super.setOptions(options); + + //extensions: + Option join = OptionBuilder.withArgName("join").hasArg().withDescription("Join the specified cluster member. Overrides any value in the config file"). + withLongOpt("join").create("j"); + options.addOption(join); + } + + protected void bind(int port, ConnectorConfiguration connectorConfig) + { + try + { + IoAcceptor acceptor = new SocketAcceptor(); + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); + SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); + + sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize); + sc.setSendBufferSize(connectorConfig.socketWriteBuferSize); + sc.setTcpNoDelay(true); + + // if we do not use the executor pool threading model we get the default leader follower + // implementation provided by MINA + if (connectorConfig.enableExecutorPool) + { + sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); + } + + String host = InetAddress.getLocalHost().getHostName(); + ClusteredProtocolHandler handler = new ClusteredProtocolHandler(new InetSocketAddress(host, port)); + if (!connectorConfig.enableSSL) + { + acceptor.bind(new InetSocketAddress(port), handler, sconfig); + _logger.info("Qpid.AMQP listening on non-SSL port " + port); + handler.connect(commandLine.getOptionValue("j")); + } + else + { + ClusteredProtocolHandler sslHandler = new ClusteredProtocolHandler(handler); + acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), sslHandler, sconfig); + _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort); + } + } + catch (IOException e) + { + _logger.error("Unable to bind service to registry: " + e, e); + } + catch (Exception e) + { + _logger.error("Unable to connect to cluster: " + e, e); + } + } + + public static void main(String[] args) + { + new Main(args); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java new file mode 100644 index 0000000000..3fbdfdde70 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/Member.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; + +public interface Member extends MemberHandle +{ + public void send(AMQDataBlock data) throws AMQException; + + public void addFailureListener(MemberFailureListener listener); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java new file mode 100644 index 0000000000..7ce45dffaa --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberFailureListener.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +interface MemberFailureListener +{ + public void failed(MemberHandle member); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java new file mode 100644 index 0000000000..b8099a12f7 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MemberHandle.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQShortString; + +public interface MemberHandle +{ + public String getHost(); + + public int getPort(); + + public boolean matches(MemberHandle m); + + public boolean matches(String host, int port); + + public AMQShortString getDetails(); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java new file mode 100644 index 0000000000..591e652e32 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MembershipChangeListener.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import java.util.List; + +public interface MembershipChangeListener +{ + public void changed(List<MemberHandle> members); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java new file mode 100644 index 0000000000..a83f034021 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandler.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; + +interface MethodHandler +{ + public void handle(int channel, AMQMethodBody method) throws AMQException; +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java new file mode 100644 index 0000000000..9bf04f5458 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerFactory.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.server.state.AMQState; + +public interface MethodHandlerFactory +{ + public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java new file mode 100644 index 0000000000..748a660bb8 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MethodHandlerRegistry.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.state.StateAwareMethodListener; + +import java.util.HashMap; +import java.util.Map; + +public class MethodHandlerRegistry +{ + private final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> registry = + new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>(); + + public <A extends AMQMethodBody, B extends Class<A>> MethodHandlerRegistry addHandler(B type, StateAwareMethodListener<A> handler) + { + registry.put(type, handler); + return this; + } + + public <B extends AMQMethodBody> StateAwareMethodListener<B> getHandler(B frame) + { + return (StateAwareMethodListener<B>) registry.get(frame.getClass()); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java new file mode 100644 index 0000000000..b01ec491ec --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java @@ -0,0 +1,272 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.RuntimeIOException; +import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ConnectionRedirectBody; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * A 'client stub' for a remote cluster peer, using MINA for IO Layer + * + */ +public class MinaBrokerProxy extends Broker implements MethodHandler +{ + private static final Logger _logger = Logger.getLogger(MinaBrokerProxy.class); + private final ConnectionStatusMonitor _connectionMonitor = new ConnectionStatusMonitor(); + private final ClientHandlerRegistry _legacyHandler; + private final MinaBinding _binding = new MinaBinding(); + private final MemberHandle _local; + private IoSession _session; + private MethodHandler _handler; + private Iterable<AMQMethodBody> _replay; + + MinaBrokerProxy(String host, int port, MemberHandle local) + { + super(host, port); + _local = local; + _legacyHandler = new ClientHandlerRegistry(local, null); + } + + private void init(IoSession session) + { + _session = session; + _handler = new ClientAdapter(session, _legacyHandler); + } + + private ConnectFuture connectImpl() + { + _logger.info("Connecting to cluster peer: " + getDetails()); + SocketConnector ioConnector = new SocketConnector(); + SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + scfg.setTcpNoDelay(true); + scfg.setSendBufferSize(32768); + scfg.setReceiveBufferSize(32768); + InetSocketAddress address = new InetSocketAddress(getHost(), getPort()); + return ioConnector.connect(address, _binding); + } + + //extablish connection without handling redirect + boolean connect() throws IOException, InterruptedException + { + ConnectFuture future = connectImpl(); + // wait for connection to complete + future.join(); + // we call getSession which throws an IOException if there has been an error connecting + try + { + future.getSession(); + } + catch (RuntimeIOException e) + { + _connectionMonitor.failed(e); + _logger.error(new LogMessage("Could not connect to {0}: {1}", this, e), e); + throw e; + } + return _connectionMonitor.waitUntilOpen(); + } + + void connectAsynch(Iterable<AMQMethodBody> msgs) + { + _replay = msgs; + connectImpl(); + } + + void replay(Iterable<AMQMethodBody> msgs) + { + _replay = msgs; + if(_connectionMonitor.isOpened()) + { + replay(); + } + } + + //establish connection, handling redirect if required... + Broker connectToCluster() throws IOException, InterruptedException + { + connect(); + //wait until the connection is open or get a redirection + if (_connectionMonitor.waitUntilOpen()) + { + return this; + } + else + { + Broker broker = new MinaBrokerProxy(_connectionMonitor.getHost(), _connectionMonitor.getPort(), _local); + broker.connect(); + return broker; + } + } + + public void send(AMQDataBlock data) throws AMQConnectionWaitException + { + if (_session == null) + { + try + { + _connectionMonitor.waitUntilOpen(); + } + catch (InterruptedException e) + { + throw new AMQConnectionWaitException("Failed to send " + data + ": " + e, e); + } + } + _session.write(data); + } + + private void replay() + { + if(_replay != null) + { + for(AMQMethodBody b : _replay) + { + _session.write(new AMQFrame(0, b)); + } + } + } + + public void handle(int channel, AMQMethodBody method) throws AMQException + { + _logger.info(new LogMessage("Handling method: {0} for channel {1}", method, channel)); + if (!handleResponse(channel, method)) + { + _logger.warn(new LogMessage("Unhandled method: {0} for channel {1}", method, channel)); + } + } + + private void handleMethod(int channel, AMQMethodBody method) throws AMQException + { + if (method instanceof ConnectionRedirectBody) + { + //signal redirection to waiting thread + ConnectionRedirectBody redirect = (ConnectionRedirectBody) method; + String[] parts = redirect.host.toString().split(":"); + _connectionMonitor.redirect(parts[0], Integer.parseInt(parts[1])); + } + else + { + _handler.handle(channel, method); + if (AMQState.CONNECTION_OPEN.equals(_legacyHandler.getCurrentState()) && _handler != this) + { + _handler = this; + _logger.info(new LogMessage("Connection opened, handler switched")); + //replay any messages: + replay(); + //signal waiting thread: + _connectionMonitor.opened(); + } + } + } + + private void handleFrame(AMQFrame frame) throws AMQException + { + AMQBody body = frame.getBodyFrame(); + if (body instanceof AMQMethodBody) + { + handleMethod(frame.getChannel(), (AMQMethodBody) body); + } + else + { + throw new AMQUnexpectedBodyTypeException(AMQMethodBody.class, body); + } + } + + public String toString() + { + return "MinaBrokerProxy[" + (_session == null ? super.toString() : _session.getRemoteAddress()) + "]"; + } + + private class MinaBinding extends IoHandlerAdapter + { + public void sessionCreated(IoSession session) throws Exception + { + init(session); + _logger.info(new LogMessage("{0}: created", MinaBrokerProxy.this)); + ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false)); + session.getFilterChain().addLast("protocolFilter", pcf); + + /* Find last protocol version in protocol version list. Make sure last protocol version + listed in the build file (build-module.xml) is the latest version which will be used + here. */ + + session.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + } + + public void sessionOpened(IoSession session) throws Exception + { + _logger.info(new LogMessage("{0}: opened", MinaBrokerProxy.this)); + } + + public void sessionClosed(IoSession session) throws Exception + { + _logger.info(new LogMessage("{0}: closed", MinaBrokerProxy.this)); + } + + public void exceptionCaught(IoSession session, Throwable throwable) throws Exception + { + _logger.error(new LogMessage("{0}: received {1}", MinaBrokerProxy.this, throwable), throwable); + if (! (throwable instanceof IOException)) + { + _session.close(); + } + failed(); + } + + public void messageReceived(IoSession session, Object object) throws Exception + { + if (object instanceof AMQFrame) + { + handleFrame((AMQFrame) object); + } + else + { + throw new AMQUnexpectedFrameTypeException("Received message of unrecognised type: " + object); + } + } + + public void messageSent(IoSession session, Object object) throws Exception + { + _logger.debug(new LogMessage("{0}: sent {1}", MinaBrokerProxy.this, object)); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java new file mode 100644 index 0000000000..5e70de7665 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxyFactory.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +public class MinaBrokerProxyFactory implements BrokerFactory +{ + private final MemberHandle _local; + + MinaBrokerProxyFactory(MemberHandle local) + { + _local = local; + } + + public Broker create(MemberHandle handle) + { + return new MinaBrokerProxy(handle.getHost(), handle.getPort(), _local); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java new file mode 100644 index 0000000000..fe76ca6505 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ResponseHandler.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQMethodBody; + +public interface ResponseHandler +{ + public void responded(AMQMethodBody response); + + public void removed(); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java new file mode 100644 index 0000000000..159612331c --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/Sendable.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; + +public interface Sendable +{ + public void send(int channel, Member member) throws AMQException; +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java new file mode 100644 index 0000000000..aadcfa4b4c --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.AMQStateManager; +//import org.apache.qpid.server.state.IllegalStateTransitionException; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +import java.util.HashMap; +import java.util.Map; + +/** + * An extension of server.AMQStateManager that allows different handlers to be registered. + * + */ +class ServerHandlerRegistry extends AMQStateManager +{ + private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class); + private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>(); + + ServerHandlerRegistry(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) + { + super(AMQState.CONNECTION_NOT_STARTED, false, virtualHostRegistry, protocolSession); + } + + ServerHandlerRegistry(ServerHandlerRegistry s, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) + { + this(virtualHostRegistry, protocolSession); + _handlers.putAll(s._handlers); + } + + ServerHandlerRegistry(MethodHandlerFactory factory, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) + { + this(virtualHostRegistry, protocolSession); + init(factory); + } + + void setHandlers(AMQState state, MethodHandlerRegistry handlers) + { + _handlers.put(state, handlers); + } + + void init(MethodHandlerFactory factory) + { + for (AMQState s : AMQState.values()) + { + setHandlers(s, factory.register(s, new MethodHandlerRegistry())); + } + } + + protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState state, B frame) //throws IllegalStateTransitionException + { + MethodHandlerRegistry registry = _handlers.get(state); + StateAwareMethodListener<B> handler = (registry == null) ? null : registry.getHandler(frame); + if (handler == null) + { + _logger.warn(new LogMessage("No handler for {0}, {1}", state, frame)); + } + return handler; + } + + <A extends AMQMethodBody, B extends Class<A>> void addHandler(AMQState state, B type, StateAwareMethodListener<A> handler) + { + MethodHandlerRegistry registry = _handlers.get(state); + if (registry == null) + { + registry = new MethodHandlerRegistry(); + _handlers.put(state, registry); + } + registry.addHandler(type, handler); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java new file mode 100644 index 0000000000..bd3757bf97 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQFrame; + +/** + */ +public class SimpleBodySendable implements Sendable +{ + private final AMQBody _body; + + public SimpleBodySendable(AMQBody body) + { + _body = body; + } + + public void send(int channel, Member member) throws AMQException + { + member.send(new AMQFrame(channel, _body)); + } + + public String toString() + { + return _body.toString(); + } + +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java new file mode 100644 index 0000000000..1255094b1d --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleMemberHandle.java @@ -0,0 +1,166 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQShortString; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +public class SimpleMemberHandle implements MemberHandle +{ + private final String _host; + private final int _port; + + public SimpleMemberHandle(String host, int port) + { + _host = host; + _port = port; + } + + public SimpleMemberHandle(AMQShortString details) + { + this(details.toString()); + } + + public SimpleMemberHandle(String details) + { + String[] parts = details.split(":"); + _host = parts[0]; + _port = Integer.parseInt(parts[1]); + } + + public SimpleMemberHandle(InetSocketAddress address) throws UnknownHostException + { + this(address.getAddress(), address.getPort()); + } + + public SimpleMemberHandle(InetAddress address, int port) throws UnknownHostException + { + this(canonical(address).getHostAddress(), port); + } + + public String getHost() + { + return _host; + } + + public int getPort() + { + return _port; + } + + public int hashCode() + { + return getPort(); + } + + public boolean equals(Object o) + { + return o instanceof MemberHandle && matches((MemberHandle) o); + } + + public boolean matches(MemberHandle m) + { + return matches(m.getHost(), m.getPort()); + } + + public boolean matches(String host, int port) + { + return _host.equals(host) && _port == port; + } + + public AMQShortString getDetails() + { + return new AMQShortString(_host + ":" + _port); + } + + public String toString() + { + return getDetails().toString(); + } + + static List<MemberHandle> stringToMembers(String membership) + { + String[] names = membership.split("\\s"); + List<MemberHandle> members = new ArrayList<MemberHandle>(); + for (String name : names) + { + members.add(new SimpleMemberHandle(name)); + } + return members; + } + + static String membersToString(List<MemberHandle> members) + { + StringBuffer buffer = new StringBuffer(); + boolean first = true; + for (MemberHandle m : members) + { + if (first) + { + first = false; + } + else + { + buffer.append(" "); + } + buffer.append(m.getDetails()); + } + + return buffer.toString(); + } + + private static InetAddress canonical(InetAddress address) throws UnknownHostException + { + if (address.isLoopbackAddress()) + { + return InetAddress.getLocalHost(); + } + else + { + return address; + } + } + + public MemberHandle resolve() + { + return resolve(this); + } + + public static MemberHandle resolve(MemberHandle handle) + { + try + { + return new SimpleMemberHandle(new InetSocketAddress(handle.getHost(), handle.getPort())); + } + catch (UnknownHostException e) + { + e.printStackTrace(); + return handle; + } + } + + +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java new file mode 100644 index 0000000000..7e5563460f --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.MethodConverter_8_0; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; +import org.apache.qpid.server.queue.AMQMessage; + +import java.util.Iterator; + +public class SimpleSendable implements Sendable +{ + + //todo fixme - remove 0-8 hard coding + ProtocolVersionMethodConverter _methodConverter = new MethodConverter_8_0(); + + private final AMQMessage _message; + + public SimpleSendable(AMQMessage message) + { + _message = message; + } + + public void send(int channel, Member member) throws AMQException + { + member.send(new AMQFrame(channel, _methodConverter.convertToBody(_message.getMessagePublishInfo()))); + member.send(new AMQFrame(channel, _message.getContentHeaderBody())); + Iterator<ContentChunk> it = _message.getContentBodyIterator(); + while (it.hasNext()) + { + member.send(new AMQFrame(channel, _methodConverter.convertToBody(it.next()))); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java new file mode 100644 index 0000000000..86710e8a31 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; + +import java.util.List; +import java.util.ArrayList; + +public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends ClusterMethodHandler<A> +{ + private final List<ClusterMethodHandler<A>> _handlers; + + private ChainedClusterMethodHandler() + { + this(new ArrayList<ClusterMethodHandler<A>>()); + } + + public ChainedClusterMethodHandler(List<ClusterMethodHandler<A>> handlers) + { + _handlers = handlers; + } + + public ChainedClusterMethodHandler(ClusterMethodHandler<A>... handlers) + { + this(); + for(ClusterMethodHandler<A>handler: handlers) + { + _handlers.add(handler); + } + } + + protected final void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException + { + for(ClusterMethodHandler<A> handler : _handlers) + { + handler.peer(stateMgr, evt); + } + } + + protected final void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException + { + for(ClusterMethodHandler<A> handler : _handlers) + { + handler.client(stateMgr, evt); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java new file mode 100644 index 0000000000..c9f6dbfb37 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java @@ -0,0 +1,136 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.*; +import org.apache.log4j.Logger; + +import java.util.Map; +import java.util.HashMap; + +/** + * Maintains the default queue names for a channel, and alters subsequent frames where necessary + * to use this (i.e. when no queue is explictly specified). + * + */ +class ChannelQueueManager +{ + private static final Logger _logger = Logger.getLogger(ChannelQueueManager.class); + private final Map<Integer, AMQShortString> _channelQueues = new HashMap<Integer, AMQShortString>(); + + ClusterMethodHandler<QueueDeclareBody> createQueueDeclareHandler() + { + return new QueueDeclareHandler(); + } + + ClusterMethodHandler<QueueDeleteBody> createQueueDeleteHandler() + { + return new QueueDeleteHandler(); + } + + ClusterMethodHandler<QueueBindBody> createQueueBindHandler() + { + return new QueueBindHandler(); + } + + ClusterMethodHandler<BasicConsumeBody> createBasicConsumeHandler() + { + return new BasicConsumeHandler(); + } + + private void set(int channel, AMQShortString queue) + { + _channelQueues.put(channel, queue); + _logger.info(new LogMessage("Set default queue for {0} to {1}", channel, queue)); + } + + private AMQShortString get(int channel) + { + AMQShortString queue = _channelQueues.get(channel); + _logger.info(new LogMessage("Default queue for {0} is {1}", channel, queue)); + return queue; + } + + private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody> + { + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + { + } + + protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + { + set(evt.getChannelId(), evt.getMethod().queue); + } + } + private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody> + { + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException + { + } + + protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException + { + if(evt.getMethod().queue == null) + { + evt.getMethod().queue = get(evt.getChannelId()); + } + } + } + private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody> + { + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException + { + } + + protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException + { + if(evt.getMethod().queue == null) + { + evt.getMethod().queue = get(evt.getChannelId()); + } + } + } + + private class BasicConsumeHandler extends ClusterMethodHandler<BasicConsumeBody> + { + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + { + } + + protected void client(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + { + if(evt.getMethod().queue == null) + { + evt.getMethod().queue = get(evt.getChannelId()); + } + } + } + +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java new file mode 100644 index 0000000000..faab99b0f6 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.AMQException; + +public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A> +{ + public final void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException + { + AMQProtocolSession session = stateMgr.getProtocolSession(); + + if (ClusteredProtocolSession.isPeerSession(session)) + { + peer(stateMgr, evt); + } + else + { + client(stateMgr, evt); + } + } + + protected abstract void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException; + protected abstract void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException; +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java new file mode 100644 index 0000000000..e7509da32a --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java @@ -0,0 +1,239 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.*; +import org.apache.qpid.server.cluster.ClusterCapability; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.LoadTable; +import org.apache.qpid.server.cluster.MemberHandle; +import org.apache.qpid.server.cluster.MethodHandlerFactory; +import org.apache.qpid.server.cluster.MethodHandlerRegistry; +import org.apache.qpid.server.cluster.SimpleMemberHandle; +import org.apache.qpid.server.handler.ChannelCloseHandler; +import org.apache.qpid.server.handler.ChannelFlowHandler; +import org.apache.qpid.server.handler.ChannelOpenHandler; +import org.apache.qpid.server.handler.ConnectionCloseMethodHandler; +import org.apache.qpid.server.handler.ConnectionOpenMethodHandler; +import org.apache.qpid.server.handler.ConnectionSecureOkMethodHandler; +import org.apache.qpid.server.handler.ConnectionStartOkMethodHandler; +import org.apache.qpid.server.handler.ConnectionTuneOkMethodHandler; +import org.apache.qpid.server.handler.ExchangeDeclareHandler; +import org.apache.qpid.server.handler.ExchangeDeleteHandler; +import org.apache.qpid.server.handler.BasicCancelMethodHandler; +import org.apache.qpid.server.handler.BasicPublishMethodHandler; +import org.apache.qpid.server.handler.QueueBindHandler; +import org.apache.qpid.server.handler.QueueDeleteHandler; +import org.apache.qpid.server.handler.BasicQosHandler; +import org.apache.qpid.server.handler.TxSelectHandler; +import org.apache.qpid.server.handler.TxCommitHandler; +import org.apache.qpid.server.handler.TxRollbackHandler; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class ClusterMethodHandlerFactory implements MethodHandlerFactory +{ + private final GroupManager _groupMgr; + private final LoadTable _loadTable; + + public ClusterMethodHandlerFactory(GroupManager groupMgr, LoadTable loadTable) + { + _groupMgr = groupMgr; + _loadTable = loadTable; + } + + public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry) + { + switch (state) + { + case CONNECTION_NOT_STARTED: + return registry.addHandler(ConnectionStartOkBody.class, ConnectionStartOkMethodHandler.getInstance()); + case CONNECTION_NOT_AUTH: + return registry.addHandler(ConnectionSecureOkBody.class, ConnectionSecureOkMethodHandler.getInstance()); + case CONNECTION_NOT_TUNED: + return registry.addHandler(ConnectionTuneOkBody.class, ConnectionTuneOkMethodHandler.getInstance()); + case CONNECTION_NOT_OPENED: + //connection.open override: + return registry.addHandler(ConnectionOpenBody.class, new ConnectionOpenHandler()); + case CONNECTION_OPEN: + return registerConnectionOpened(registry); + } + return registry; + } + + private MethodHandlerRegistry registerConnectionOpened(MethodHandlerRegistry registry) + { + //new cluster method handlers: + registry.addHandler(ClusterJoinBody.class, new JoinHandler()); + registry.addHandler(ClusterLeaveBody.class, new LeaveHandler()); + registry.addHandler(ClusterSuspectBody.class, new SuspectHandler()); + registry.addHandler(ClusterMembershipBody.class, new MembershipHandler()); + registry.addHandler(ClusterPingBody.class, new PingHandler()); + registry.addHandler(ClusterSynchBody.class, new SynchHandler()); + + //connection.close override: + registry.addHandler(ConnectionCloseBody.class, new ConnectionCloseHandler()); + + //replicated handlers: + registry.addHandler(ExchangeDeclareBody.class, replicated(ExchangeDeclareHandler.getInstance())); + registry.addHandler(ExchangeDeleteBody.class, replicated(ExchangeDeleteHandler.getInstance())); + + ChannelQueueManager channelQueueMgr = new ChannelQueueManager(); + + + LocalQueueDeclareHandler handler = new LocalQueueDeclareHandler(_groupMgr); + registry.addHandler(QueueDeclareBody.class, + chain(new QueueNameGenerator(handler), + channelQueueMgr.createQueueDeclareHandler(), + new ReplicatingHandler<QueueDeclareBody>(_groupMgr, handler))); + + registry.addHandler(QueueBindBody.class, chain(channelQueueMgr.createQueueBindHandler(), replicated(QueueBindHandler.getInstance()))); + registry.addHandler(QueueDeleteBody.class, chain(channelQueueMgr.createQueueDeleteHandler(), replicated(alternate(new QueueDeleteHandler(false), new QueueDeleteHandler(true))))); + registry.addHandler(BasicConsumeBody.class, chain(channelQueueMgr.createBasicConsumeHandler(), new ReplicatingConsumeHandler(_groupMgr))); + + //other modified handlers: + registry.addHandler(BasicCancelBody.class, alternate(new RemoteCancelHandler(), BasicCancelMethodHandler.getInstance())); + + //other unaffected handlers: + registry.addHandler(BasicPublishBody.class, BasicPublishMethodHandler.getInstance()); + registry.addHandler(BasicQosBody.class, BasicQosHandler.getInstance()); + registry.addHandler(ChannelOpenBody.class, ChannelOpenHandler.getInstance()); + registry.addHandler(ChannelCloseBody.class, ChannelCloseHandler.getInstance()); + registry.addHandler(ChannelFlowBody.class, ChannelFlowHandler.getInstance()); + registry.addHandler(TxSelectBody.class, TxSelectHandler.getInstance()); + registry.addHandler(TxCommitBody.class, TxCommitHandler.getInstance()); + registry.addHandler(TxRollbackBody.class, TxRollbackHandler.getInstance()); + + + return registry; + } + + private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody> + { + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSynchBody> evt) throws AMQException + { + _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(stateManager.getProtocolSession())); + } + } + + private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody> + { + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterJoinBody> evt) throws AMQException + { + _groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker)); + } + } + + private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody> + { + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException + { + _groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker)); + } + } + + private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody> + { + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException + { + _groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker)); + } + } + + private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody> + { + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException + { + ClusterMembershipBody body = evt.getMethod(); + _groupMgr.handleMembershipAnnouncement(new String(body.members)); + } + } + + private class PingHandler implements StateAwareMethodListener<ClusterPingBody> + { + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterPingBody> evt) throws AMQException + { + MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker); + _groupMgr.handlePing(peer, evt.getMethod().load); + if (evt.getMethod().responseRequired) + { + evt.getMethod().load = _loadTable.getLocalLoad(); + stateManager.getProtocolSession().writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod())); + } + } + } + + private class ConnectionOpenHandler extends ExtendedHandler<ConnectionOpenBody> + { + ConnectionOpenHandler() + { + super(ConnectionOpenMethodHandler.getInstance()); + } + + void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionOpenBody> evt) + { + AMQShortString capabilities = evt.getMethod().capabilities; + if (ClusterCapability.contains(capabilities)) + { + ClusteredProtocolSession.setSessionPeer(stateMgr.getProtocolSession(), ClusterCapability.getPeer(capabilities)); + } + else + { + _loadTable.incrementLocalLoad(); + } + } + } + + private class ConnectionCloseHandler extends ExtendedHandler<ConnectionCloseBody> + { + ConnectionCloseHandler() + { + super(ConnectionCloseMethodHandler.getInstance()); + } + + void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionCloseBody> evt) + { + if (!ClusteredProtocolSession.isPeerSession(stateMgr.getProtocolSession())) + { + _loadTable.decrementLocalLoad(); + } + } + } + + private <B extends AMQMethodBody> ReplicatingHandler<B> replicated(StateAwareMethodListener<B> handler) + { + return new ReplicatingHandler<B>(_groupMgr, handler); + } + + private <B extends AMQMethodBody> StateAwareMethodListener<B> alternate(StateAwareMethodListener<B> peer, StateAwareMethodListener<B> client) + { + return new PeerHandler<B>(peer, client); + } + + private <B extends AMQMethodBody> StateAwareMethodListener<B> chain(ClusterMethodHandler<B>... h) + { + return new ChainedClusterMethodHandler<B>(h); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java new file mode 100644 index 0000000000..a2f62f714b --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A> +{ + private final StateAwareMethodListener<A> _base; + + ExtendedHandler(StateAwareMethodListener<A> base) + { + _base = base; + } + + public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException + { + preHandle(stateMgr, evt); + _base.methodReceived(stateMgr, evt); + postHandle(stateMgr, evt); + } + + void preHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException + { + } + + void postHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException + { + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java new file mode 100644 index 0000000000..0dc7fe00d2 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/HandlerUtils.java @@ -0,0 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +public abstract class HandlerUtils +{ +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java new file mode 100644 index 0000000000..f01a8349f2 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.cluster.MemberHandle; +import org.apache.qpid.server.handler.QueueDeclareHandler; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.ClusteredQueue; +import org.apache.qpid.server.queue.PrivateQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.RemoteQueueProxy; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class LocalQueueDeclareHandler extends QueueDeclareHandler +{ + private static final Logger _logger = Logger.getLogger(LocalQueueDeclareHandler.class); + private final GroupManager _groupMgr; + + LocalQueueDeclareHandler(GroupManager groupMgr) + { + _groupMgr = groupMgr; + } + + protected AMQShortString createName() + { + return new AMQShortString(super.createName().toString() + "@" + _groupMgr.getLocal().getDetails()); + } + + protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, AMQProtocolSession session) throws AMQException + { + //is it private or shared: + if (body.exclusive) + { + if (ClusteredProtocolSession.isPeerSession(session)) + { + //need to get peer from the session... + MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session); + _logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer)); + return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, virtualHost); + } + else + { + _logger.debug(new LogMessage("Creating local private queue {0}", body.queue)); + return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, virtualHost); + } + } + else + { + _logger.debug(new LogMessage("Creating local shared queue {0}", body.queue)); + return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, virtualHost); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java new file mode 100644 index 0000000000..8b0bb4b127 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T> +{ + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<T> evt) throws AMQException + { + } +} + diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java new file mode 100644 index 0000000000..447e51ccd9 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +/** + * Base for implementing handlers that carry out different actions based on whether the method they + * are handling was sent by a peer (i.e. another broker in the cluster) or a client (i.e. an end-user + * application). + * + */ +public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A> +{ + private final StateAwareMethodListener<A> _peer; + private final StateAwareMethodListener<A> _client; + + PeerHandler(StateAwareMethodListener<A> peer, StateAwareMethodListener<A> client) + { + _peer = peer; + _client = client; + } + + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException + { + _peer.methodReceived(stateMgr, evt); + } + + protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException + { + _client.methodReceived(stateMgr, evt); + } + +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java new file mode 100644 index 0000000000..a669171d3c --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; + +/** + * Generates queue names for queues declared with no name. + * + */ +class QueueNameGenerator extends ClusterMethodHandler<QueueDeclareBody> +{ + private final LocalQueueDeclareHandler _handler; + + QueueNameGenerator(LocalQueueDeclareHandler handler) + { + _handler = handler; + } + + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + { + } + + protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) + throws AMQException + { + setName(evt.getMethod());//need to set the name before propagating this method + } + + protected void setName(QueueDeclareBody body) + { + if (body.queue == null) + { + body.queue = _handler.createName(); + } + } +} + diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java new file mode 100644 index 0000000000..f09763e1ad --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.ClusteredQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class RemoteCancelHandler implements StateAwareMethodListener<BasicCancelBody> +{ + private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class); + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicCancelBody> evt) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + + //By convention, consumers setup between brokers use the queue name as the consumer tag: + AMQQueue queue = queueRegistry.getQueue(evt.getMethod().consumerTag); + if (queue instanceof ClusteredQueue) + { + ((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session)); + } + else + { + _logger.warn("Got remote cancel request for non-clustered queue: " + queue); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java new file mode 100644 index 0000000000..073b13688c --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.BasicConsumeOkBody; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.ClusteredQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; + +/** + * Handles consume requests from other cluster members. + * + */ +public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsumeBody> +{ + private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class); + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + AMQQueue queue = queueRegistry.getQueue(evt.getMethod().queue); + if (queue instanceof ClusteredQueue) + { + ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session)); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(BasicConsumeOkBody.createAMQFrame(evt.getChannelId(), + (byte)8, (byte)0, // AMQP version (major, minor) + evt.getMethod().queue // consumerTag + )); + } + else + { + _logger.warn("Got remote consume request for non-clustered queue: " + queue); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java new file mode 100644 index 0000000000..897f8e4fb7 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.server.cluster.BroadcastPolicy; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.handler.BasicConsumeMethodHandler; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBody> +{ + ReplicatingConsumeHandler(GroupManager groupMgr) + { + this(groupMgr, null); + } + + ReplicatingConsumeHandler(GroupManager groupMgr, BroadcastPolicy policy) + { + super(groupMgr, base(), policy); + } + + protected void replicate(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + //only replicate if the queue in question is a shared queue + if (isShared(queueRegistry.getQueue(evt.getMethod().queue))) + { + super.replicate(stateManager, evt); + } + else + { + _logger.info(new LogMessage("Handling consume for private queue ({0}) locally", evt.getMethod())); + local(stateManager, evt); + _logger.info(new LogMessage("Handled consume for private queue ({0}) locally", evt.getMethod())); + + } + } + + protected boolean isShared(AMQQueue queue) + { + return queue != null && queue.isShared(); + } + + static StateAwareMethodListener<BasicConsumeBody> base() + { + return new PeerHandler<BasicConsumeBody>(peer(), client()); + } + + static StateAwareMethodListener<BasicConsumeBody> peer() + { + return new RemoteConsumeHandler(); + } + + static StateAwareMethodListener<BasicConsumeBody> client() + { + return BasicConsumeMethodHandler.getInstance(); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java new file mode 100644 index 0000000000..888fa4e426 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.cluster.*; +import org.apache.qpid.server.cluster.policy.StandardPolicies; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.List; + +/** + * Basic template for handling methods that should be broadcast to the group and + * processed locally after 'completion' of this broadcast. + * + */ +class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A> implements StandardPolicies +{ + protected static final Logger _logger = Logger.getLogger(ReplicatingHandler.class); + + private final StateAwareMethodListener<A> _base; + private final GroupManager _groupMgr; + private final BroadcastPolicy _policy; + + ReplicatingHandler(GroupManager groupMgr, StateAwareMethodListener<A> base) + { + this(groupMgr, base, null); + } + + ReplicatingHandler(GroupManager groupMgr, StateAwareMethodListener<A> base, BroadcastPolicy policy) + { + _groupMgr = groupMgr; + _base = base; + _policy = policy; + } + + protected void peer(AMQStateManager stateManager, AMQMethodEvent<A> evt) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + local(stateManager, evt); + _logger.debug(new LogMessage("Handled {0} locally", evt.getMethod())); + } + + protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException + { + replicate(stateMgr, evt); + } + + protected void replicate(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException + { + if (_policy == null) + { + //asynch delivery + _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod())); + local(stateMgr, evt); + } + else + { + Callback callback = new Callback(stateMgr, evt); + _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()), _policy, callback); + } + _logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod())); + } + + protected void local(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException + { + _base.methodReceived(stateMgr, evt); + } + + private class Callback implements GroupResponseHandler + { + private final AMQStateManager _stateMgr; + private final AMQMethodEvent<A> _evt; + + Callback(AMQStateManager stateMgr, AMQMethodEvent<A> evt) + { + _stateMgr = stateMgr; + _evt = evt; + } + + public void response(List<AMQMethodBody> responses, List<Member> members) + { + try + { + local(_stateMgr, _evt); + _logger.debug(new LogMessage("Handled {0} locally, in response to completion of replication", _evt.getMethod())); + } + catch (AMQException e) + { + _logger.error(new LogMessage("Error handling {0}:{1}", _evt.getMethod(), e), e); + } + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java new file mode 100644 index 0000000000..8b0c638d63 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public class WrappedListener<T extends AMQMethodBody> implements StateAwareMethodListener<T> +{ + private final StateAwareMethodListener<T> _primary; + private final StateAwareMethodListener _post; + private final StateAwareMethodListener _pre; + + WrappedListener(StateAwareMethodListener<T> primary, StateAwareMethodListener pre, StateAwareMethodListener post) + { + _pre = check(pre); + _post = check(post); + _primary = check(primary); + } + + public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<T> evt) throws AMQException + { + _pre.methodReceived(stateMgr, evt); + _primary.methodReceived(stateMgr, evt); + _post.methodReceived(stateMgr, evt); + } + + private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in) + { + return in == null ? new NullListener<T>() : in; + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java new file mode 100644 index 0000000000..5ec3c9660a --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappingMethodHandlerFactory.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.handler; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.MethodHandlerFactory; +import org.apache.qpid.server.cluster.MethodHandlerRegistry; +import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.StateAwareMethodListener; + +public abstract class WrappingMethodHandlerFactory implements MethodHandlerFactory +{ + private final MethodHandlerFactory _delegate; + private final StateAwareMethodListener _pre; + private final StateAwareMethodListener _post; + + protected WrappingMethodHandlerFactory(MethodHandlerFactory delegate, + StateAwareMethodListener pre, + StateAwareMethodListener post) + { + _delegate = delegate; + _pre = pre; + _post = post; + } + + public MethodHandlerRegistry register(AMQState state, MethodHandlerRegistry registry) + { + if (isWrappableState(state)) + { + return wrap(_delegate.register(state, registry), state); + } + else + { + return _delegate.register(state, registry); + } + } + + protected abstract boolean isWrappableState(AMQState state); + + protected abstract Iterable<FrameDescriptor> getWrappableFrameTypes(AMQState state); + + private MethodHandlerRegistry wrap(MethodHandlerRegistry registry, AMQState state) + { + for (FrameDescriptor fd : getWrappableFrameTypes(state)) + { + wrap(registry, fd.type, fd.instance); + } + return registry; + } + + private <A extends AMQMethodBody, B extends Class<A>> void wrap(MethodHandlerRegistry r, B type, A frame) + { + r.addHandler(type, new WrappedListener<A>(r.getHandler(frame), _pre, _post)); + } + + protected static class FrameDescriptor<A extends AMQMethodBody, B extends Class<A>> + { + protected final A instance; + protected final B type; + + public FrameDescriptor(B type, A instance) + { + this.instance = instance; + this.type = type; + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java new file mode 100644 index 0000000000..79cb558ede --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/AsynchBroadcastPolicy.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.policy; + +import org.apache.qpid.server.cluster.BroadcastPolicy; + +public class AsynchBroadcastPolicy implements BroadcastPolicy +{ + public boolean isComplete(int responded, int members) + { + return true; + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java new file mode 100644 index 0000000000..42382c6e7a --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/MajorityResponseBroadcastPolicy.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.policy; + +import org.apache.qpid.server.cluster.BroadcastPolicy; + +public class MajorityResponseBroadcastPolicy implements BroadcastPolicy +{ + public boolean isComplete(int responded, int members) + { + return responded > members / 2; + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java new file mode 100644 index 0000000000..e3072a6a40 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/OneResponseBroadcastPolicy.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.policy; + +import org.apache.qpid.server.cluster.BroadcastPolicy; + +public class OneResponseBroadcastPolicy implements BroadcastPolicy +{ + public boolean isComplete(int responded, int members) + { + return responded > 0; + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java new file mode 100644 index 0000000000..dbaf690d3a --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/StandardPolicies.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.policy; + +import org.apache.qpid.server.cluster.BroadcastPolicy; + +public interface StandardPolicies +{ + public static final BroadcastPolicy ASYNCH_POLICY = new AsynchBroadcastPolicy(); + public static final BroadcastPolicy SYNCH_POLICY = new SynchBroadcastPolicy(); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java new file mode 100644 index 0000000000..605b8dd51e --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/policy/SynchBroadcastPolicy.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.policy; + +import org.apache.qpid.server.cluster.BroadcastPolicy; + +public class SynchBroadcastPolicy implements BroadcastPolicy +{ + public boolean isComplete(int responded, int members) + { + return responded == members; + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java new file mode 100644 index 0000000000..3664be58bc --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ChainedMethodRecorder.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.replay; + +import org.apache.qpid.framing.AMQMethodBody; + +abstract class ChainedMethodRecorder <T extends AMQMethodBody> implements MethodRecorder<T> +{ + private final MethodRecorder<T> _recorder; + + ChainedMethodRecorder() + { + this(null); + } + + ChainedMethodRecorder(MethodRecorder<T> recorder) + { + _recorder = recorder; + } + + public final void record(T method) + { + if(!doRecord(method) && _recorder != null) + { + _recorder.record(method); + } + } + + protected abstract boolean doRecord(T method); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java new file mode 100644 index 0000000000..5a433b869b --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.replay; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.AMQShortString; + +import java.util.Map; +import java.util.HashMap; +import java.util.List; + +class ConsumerCounts +{ + private final Map<AMQShortString, Integer> _counts = new HashMap<AMQShortString, Integer>(); + + synchronized void increment(AMQShortString queue) + { + _counts.put(queue, get(queue) + 1); + } + + synchronized void decrement(AMQShortString queue) + { + _counts.put(queue, get(queue) - 1); + } + + private int get(AMQShortString queue) + { + Integer count = _counts.get(queue); + return count == null ? 0 : count; + } + + synchronized void replay(List<AMQMethodBody> messages) + { + for(AMQShortString queue : _counts.keySet()) + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + BasicConsumeBody m = new BasicConsumeBody((byte)8, + (byte)0, + BasicConsumeBody.getClazz((byte)8, (byte)0), + BasicConsumeBody.getMethod((byte)8, (byte)0), + null, + queue, + false, + false, + false, + false, + queue, + 0); + m.queue = queue; + m.consumerTag = queue; + replay(m, messages); + } + } + + private void replay(BasicConsumeBody msg, List<AMQMethodBody> messages) + { + int count = _counts.get(msg.queue); + for(int i = 0; i < count; i++) + { + messages.add(msg); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java new file mode 100644 index 0000000000..e45810438e --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/MethodRecorder.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.replay; + +import org.apache.qpid.framing.AMQMethodBody; + +/** + * Abstraction through which a method can be recorded for replay + * + */ +interface MethodRecorder<T extends AMQMethodBody> +{ + public void record(T method); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java new file mode 100644 index 0000000000..4d3fe1dbed --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.replay; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeleteBody; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.server.cluster.MethodHandlerFactory; +import org.apache.qpid.server.cluster.MethodHandlerRegistry; +import org.apache.qpid.server.cluster.handler.WrappingMethodHandlerFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +import java.util.Arrays; + +public class RecordingMethodHandlerFactory extends WrappingMethodHandlerFactory +{ + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + private final byte major = (byte)8; + private final byte minor = (byte)0; + private final Iterable<FrameDescriptor> _frames = Arrays.asList(new FrameDescriptor[] + { + new FrameDescriptor(QueueDeclareBody.class, new QueueDeclareBody(major, minor, QueueDeclareBody.getClazz(major, minor), QueueDeclareBody.getMethod(major, minor),null,false,false,false,false,false,null,0)), + new FrameDescriptor(QueueDeleteBody.class, new QueueDeleteBody(major, minor, QueueDeleteBody.getClazz(major, minor), QueueDeleteBody.getMethod(major, minor),false,false,false,null,0)), + new FrameDescriptor(QueueBindBody.class, new QueueBindBody(major, minor, QueueBindBody.getClazz(major, minor), QueueBindBody.getMethod(major, minor),null,null,false,null,null,0)), + new FrameDescriptor(ExchangeDeclareBody.class, new ExchangeDeclareBody(major, minor, ExchangeDeclareBody.getClazz(major, minor), ExchangeDeclareBody.getMethod(major, minor),null,false,false,null,false,false,false,0,null)), + new FrameDescriptor(ExchangeDeleteBody.class, new ExchangeDeleteBody(major, minor, ExchangeDeleteBody.getClazz(major, minor), ExchangeDeleteBody.getMethod(major, minor),null,false,false,0)), + new FrameDescriptor(BasicConsumeBody.class, new BasicConsumeBody(major, minor, BasicConsumeBody.getClazz(major, minor), BasicConsumeBody.getMethod(major, minor),null,null,false,false,false,false,null,0)), + new FrameDescriptor(BasicCancelBody.class, new BasicCancelBody(major, minor, BasicCancelBody.getClazz(major, minor), BasicCancelBody.getMethod(major, minor),null,false)) + }); + + + public RecordingMethodHandlerFactory(MethodHandlerFactory factory, ReplayStore store) + { + super(factory, null, store); + } + + protected boolean isWrappableState(AMQState state) + { + return AMQState.CONNECTION_OPEN.equals(state); + } + + protected Iterable<FrameDescriptor> getWrappableFrameTypes(AMQState state) + { + return _frames; + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java new file mode 100644 index 0000000000..898cb80cb3 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayManager.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.replay; + +import org.apache.qpid.server.cluster.Sendable; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQMethodBody; + +import java.util.List; + +/** + * Abstraction of a replay strategy for use in getting joining members up to + * date with respect to cluster state. + * + */ +public interface ReplayManager +{ + public List<AMQMethodBody> replay(boolean isLeader); +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java new file mode 100644 index 0000000000..d7bbb1c36b --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -0,0 +1,311 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.replay; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.*; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.cluster.util.Bindings; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Stores method invocations for replay to new members. + * + */ +public class ReplayStore implements ReplayManager, StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ReplayStore.class); + + private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _globalRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>(); + private final Map<Class<? extends AMQMethodBody>, MethodRecorder> _localRecorders = new HashMap<Class<? extends AMQMethodBody>, MethodRecorder>(); + private final Map<AMQShortString, QueueDeclareBody> _sharedQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>(); + private final Map<AMQShortString, QueueDeclareBody> _privateQueues = new ConcurrentHashMap<AMQShortString, QueueDeclareBody>(); + private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _sharedBindings = new Bindings<AMQShortString, AMQShortString, QueueBindBody>(); + private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _privateBindings = new Bindings<AMQShortString, AMQShortString, QueueBindBody>(); + private final Map<AMQShortString, ExchangeDeclareBody> _exchanges = new ConcurrentHashMap<AMQShortString, ExchangeDeclareBody>(); + private final ConsumerCounts _consumers = new ConsumerCounts(); + + public ReplayStore() + { + _globalRecorders.put(QueueDeclareBody.class, new SharedQueueDeclareRecorder()); + _globalRecorders.put(QueueDeleteBody.class, new SharedQueueDeleteRecorder()); + _globalRecorders.put(QueueBindBody.class, new SharedQueueBindRecorder()); + _globalRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder()); + _globalRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder()); + + _localRecorders.put(QueueDeclareBody.class, new PrivateQueueDeclareRecorder()); + _localRecorders.put(QueueDeleteBody.class, new PrivateQueueDeleteRecorder()); + _localRecorders.put(QueueBindBody.class, new PrivateQueueBindRecorder()); + _localRecorders.put(BasicConsumeBody.class, new BasicConsumeRecorder()); + _localRecorders.put(BasicCancelBody.class, new BasicCancelRecorder()); + _localRecorders.put(ExchangeDeclareBody.class, new ExchangeDeclareRecorder()); + _localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder()); + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + + _logger.debug(new LogMessage("Replay store received {0}", evt.getMethod())); + AMQMethodBody request = evt.getMethod(); + + //allow any (relevant) recorder registered for this type of request to record it: + MethodRecorder recorder = getRecorders(session).get(request.getClass()); + if (recorder != null) + { + recorder.record(request); + } + } + + private Map<Class<? extends AMQMethodBody>, MethodRecorder> getRecorders(AMQProtocolSession session) + { + if (ClusteredProtocolSession.isPeerSession(session)) + { + return _globalRecorders; + } + else + { + return _localRecorders; + } + } + + public List<AMQMethodBody> replay(boolean isLeader) + { + List<AMQMethodBody> methods = new ArrayList<AMQMethodBody>(); + methods.addAll(_exchanges.values()); + methods.addAll(_privateQueues.values()); + synchronized(_privateBindings) + { + methods.addAll(_privateBindings.values()); + } + if (isLeader) + { + methods.addAll(_sharedQueues.values()); + synchronized(_sharedBindings) + { + methods.addAll(_sharedBindings.values()); + } + } + _consumers.replay(methods); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + methods.add(new ClusterSynchBody((byte)8, (byte)0, ClusterSynchBody.getClazz((byte)8, (byte)0), ClusterSynchBody.getMethod((byte)8, (byte)0))); + return methods; + } + + private class BasicConsumeRecorder implements MethodRecorder<BasicConsumeBody> + { + public void record(BasicConsumeBody method) + { + if(_sharedQueues.containsKey(method.queue)) + { + _consumers.increment(method.queue); + } + } + } + + private class BasicCancelRecorder implements MethodRecorder<BasicCancelBody> + { + public void record(BasicCancelBody method) + { + if(_sharedQueues.containsKey(method.consumerTag)) + { + _consumers.decrement(method.consumerTag); + } + } + } + + private class SharedQueueDeclareRecorder extends QueueDeclareRecorder + { + SharedQueueDeclareRecorder() + { + super(false, _sharedQueues); + } + } + + private class PrivateQueueDeclareRecorder extends QueueDeclareRecorder + { + PrivateQueueDeclareRecorder() + { + super(true, _privateQueues, new SharedQueueDeclareRecorder()); + } + } + + private class SharedQueueDeleteRecorder extends QueueDeleteRecorder + { + SharedQueueDeleteRecorder() + { + super(_sharedQueues, _sharedBindings); + } + } + + private class PrivateQueueDeleteRecorder extends QueueDeleteRecorder + { + PrivateQueueDeleteRecorder() + { + super(_privateQueues, _privateBindings, new SharedQueueDeleteRecorder()); + } + } + + private class SharedQueueBindRecorder extends QueueBindRecorder + { + SharedQueueBindRecorder() + { + super(_sharedQueues, _sharedBindings); + } + } + + private class PrivateQueueBindRecorder extends QueueBindRecorder + { + PrivateQueueBindRecorder() + { + super(_privateQueues, _privateBindings, new SharedQueueBindRecorder()); + } + } + + + private static class QueueDeclareRecorder extends ChainedMethodRecorder<QueueDeclareBody> + { + private final boolean _exclusive; + private final Map<AMQShortString, QueueDeclareBody> _queues; + + QueueDeclareRecorder(boolean exclusive, Map<AMQShortString, QueueDeclareBody> queues) + { + _queues = queues; + _exclusive = exclusive; + } + + QueueDeclareRecorder(boolean exclusive, Map<AMQShortString, QueueDeclareBody> queues, QueueDeclareRecorder recorder) + { + super(recorder); + _queues = queues; + _exclusive = exclusive; + } + + + protected boolean doRecord(QueueDeclareBody method) + { + if (_exclusive == method.exclusive) + { + _queues.put(method.queue, method); + return true; + } + else + { + return false; + } + } + } + + private class QueueDeleteRecorder extends ChainedMethodRecorder<QueueDeleteBody> + { + private final Map<AMQShortString, QueueDeclareBody> _queues; + private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _bindings; + + QueueDeleteRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings) + { + this(queues, bindings, null); + } + + QueueDeleteRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings, QueueDeleteRecorder recorder) + { + super(recorder); + _queues = queues; + _bindings = bindings; + } + + protected boolean doRecord(QueueDeleteBody method) + { + if (_queues.remove(method.queue) != null) + { + _bindings.unbind1(method.queue); + return true; + } + else + { + return false; + } + } + } + + private class QueueBindRecorder extends ChainedMethodRecorder<QueueBindBody> + { + private final Map<AMQShortString, QueueDeclareBody> _queues; + private final Bindings<AMQShortString, AMQShortString, QueueBindBody> _bindings; + + QueueBindRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings) + { + _queues = queues; + _bindings = bindings; + } + + QueueBindRecorder(Map<AMQShortString, QueueDeclareBody> queues, Bindings<AMQShortString, AMQShortString, QueueBindBody> bindings, QueueBindRecorder recorder) + { + super(recorder); + _queues = queues; + _bindings = bindings; + } + + protected boolean doRecord(QueueBindBody method) + { + if (_queues.containsKey(method.queue)) + { + _bindings.bind(method.queue, method.exchange, method); + return true; + } + else + { + return false; + } + } + } + + private class ExchangeDeclareRecorder implements MethodRecorder<ExchangeDeclareBody> + { + public void record(ExchangeDeclareBody method) + { + _exchanges.put(method.exchange, method); + } + } + + private class ExchangeDeleteRecorder implements MethodRecorder<ExchangeDeleteBody> + { + public void record(ExchangeDeleteBody method) + { + _exchanges.remove(method.exchange); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java new file mode 100644 index 0000000000..49de0a7cbf --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/Bindings.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.util; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; + +/** + * Maps two separate keys to a list of values. + * + */ +public class Bindings<K1, K2, V> +{ + private final MultiValuedMap<K1, Binding<K2>> _a = new MultiValuedMap<K1, Binding<K2>>(); + private final MultiValuedMap<K2, Binding<K1>> _b = new MultiValuedMap<K2, Binding<K1>>(); + private final Collection<V> _values = new HashSet<V>(); + + public void bind(K1 key1, K2 key2, V value) + { + _a.add(key1, new Binding<K2>(key2, value)); + _b.add(key2, new Binding<K1>(key1, value)); + _values.add(value); + } + + public void unbind1(K1 key1) + { + Collection<Binding<K2>> values = _a.remove(key1); + for (Binding<K2> v : values) + { + _b.remove(v.key); + _values.remove(v.value); + } + } + + public void unbind2(K2 key2) + { + Collection<Binding<K1>> values = _b.remove(key2); + for (Binding<K1> v : values) + { + _a.remove(v.key); + _values.remove(v.value); + } + } + + public Collection<V> values() + { + return Collections.unmodifiableCollection(_values); + } + + /** + * Value needs to hold key to the other map + */ + private class Binding<T> + { + private final T key; + private final V value; + + Binding(T key, V value) + { + this.key = key; + this.value = value; + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java new file mode 100644 index 0000000000..406fe45701 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/InvokeMultiple.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.util; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Set; +import java.util.HashSet; + +/** + * Allows a method to be invoked on a list of listeners with one call + * + */ +public class InvokeMultiple <T> implements InvocationHandler +{ + private final Set<T> _targets = new HashSet<T>(); + private final T _proxy; + + public InvokeMultiple(Class<? extends T> type) + { + _proxy = (T) Proxy.newProxyInstance(type.getClassLoader(), new Class[]{type}, this); + } + + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable + { + Set<T> targets; + synchronized(this) + { + targets = new HashSet<T>(_targets); + } + + for(T target : targets) + { + method.invoke(target, args); + } + return null; + } + + public synchronized void addListener(T t) + { + _targets.add(t); + } + + public synchronized void removeListener(T t) + { + _targets.remove(t); + } + + public T getProxy() + { + return _proxy; + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java new file mode 100644 index 0000000000..9be90298ea --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/LogMessage.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.util; + +import java.text.MessageFormat; + +/** + * Convenience class to allow log messages to be specified in terms + * of MessageFormat patterns with a variable set of parameters. The + * production of the string is only done if toSTring is called so it + * works well with debug level messages, allowing complex messages + * to be specified that are only evaluated if actually printed. + * + */ +public class LogMessage +{ + private final String _message; + private final Object[] _args; + + public LogMessage(String message) + { + this(message, new Object[0]); + } + + public LogMessage(String message, Object... args) + { + _message = message; + _args = args; + } + + public String toString() + { + return MessageFormat.format(_message, _args); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java new file mode 100644 index 0000000000..ebe1fe47dd --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/cluster/util/MultiValuedMap.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Maps a key to a collection of values + * + */ +public class MultiValuedMap<K, V> +{ + private Map<K, Collection<V>> _map = new HashMap<K, Collection<V>>(); + + public boolean add(K key, V value) + { + Collection<V> values = get(key); + if (values == null) + { + values = createList(); + _map.put(key, values); + } + return values.add(value); + } + + public Collection<V> get(K key) + { + return _map.get(key); + } + + public Collection<V> remove(K key) + { + return _map.remove(key); + } + + protected Collection<V> createList() + { + return new ArrayList<V>(); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java new file mode 100644 index 0000000000..9fa96ece1e --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -0,0 +1,175 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicCancelBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.cluster.*; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * Represents a shared queue in a cluster. The key difference is that as well as any + * local consumers, there may be consumers for this queue on other members of the + * cluster. + * + */ +public class ClusteredQueue extends AMQQueue +{ + private static final Logger _logger = Logger.getLogger(ClusteredQueue.class); + private final ConcurrentHashMap<SimpleMemberHandle, RemoteSubscriptionImpl> _peers = new ConcurrentHashMap<SimpleMemberHandle, RemoteSubscriptionImpl>(); + private final GroupManager _groupMgr; + private final NestedSubscriptionManager _subscriptions; + + public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) + throws AMQException + { + super(name, durable, owner, autoDelete, virtualHost, new ClusteredSubscriptionManager()); + _groupMgr = groupMgr; + _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers(); + } + + + public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException + { + _logger.info(new LogMessage("{0} delivered to clustered queue {1}", msg, this)); + super.process(storeContext, msg, deliverFirst); + } + + protected void autodelete() throws AMQException + { + if(!_subscriptions.hasActiveSubscribers()) + { + //delete locally: + delete(); + + //send deletion request to all other members: + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + QueueDeleteBody request = new QueueDeleteBody((byte)8, + (byte)0, + QueueDeleteBody.getClazz((byte)8,(byte)0), + QueueDeleteBody.getMethod((byte)8,(byte)0), + false, + false, + false, + getName(), + 0); + + _groupMgr.broadcast(new SimpleBodySendable(request)); + } + } + + public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException + { + //handle locally: + super.unregisterProtocolSession(ps, channel, consumerTag); + + //signal other members: + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + BasicCancelBody request = new BasicCancelBody((byte)8, + (byte)0, + BasicCancelBody.getClazz((byte)8, (byte)0), + BasicCancelBody.getMethod((byte)8, (byte)0), + getName(), + false); + + _groupMgr.broadcast(new SimpleBodySendable(request)); + } + + public void addRemoteSubcriber(MemberHandle peer) + { + _logger.info(new LogMessage("Added remote subscriber for {0} to clustered queue {1}", peer, this)); + //find (or create) a matching subscriber for the peer then increment the count + getSubscriber(key(peer), true).increment(); + } + + public void removeRemoteSubscriber(MemberHandle peer) + { + //find a matching subscriber for the peer then decrement the count + //if count is now zero, remove the subscriber + SimpleMemberHandle key = key(peer); + RemoteSubscriptionImpl s = getSubscriber(key, true); + if (s == null) + { + throw new RuntimeException("No subscriber for " + peer); + } + if (s.decrement()) + { + _peers.remove(key); + _subscriptions.removeSubscription(s); + } + } + + public void removeAllRemoteSubscriber(MemberHandle peer) + { + SimpleMemberHandle key = key(peer); + RemoteSubscriptionImpl s = getSubscriber(key, true); + _peers.remove(key); + _subscriptions.removeSubscription(s); + } + + private RemoteSubscriptionImpl getSubscriber(SimpleMemberHandle key, boolean create) + { + RemoteSubscriptionImpl s = _peers.get(key); + if (s == null && create) + { + return addSubscriber(key, new RemoteSubscriptionImpl(_groupMgr, key)); + } + else + { + return s; + } + } + + private RemoteSubscriptionImpl addSubscriber(SimpleMemberHandle key, RemoteSubscriptionImpl s) + { + RemoteSubscriptionImpl other = _peers.putIfAbsent(key, s); + if (other == null) + { + _subscriptions.addSubscription(s); + new SubscriberCleanup(key, this, _groupMgr); + return s; + } + else + { + return other; + } + } + + private SimpleMemberHandle key(MemberHandle peer) + { + return peer instanceof SimpleMemberHandle ? (SimpleMemberHandle) peer : (SimpleMemberHandle) SimpleMemberHandle.resolve(peer); + } + + static boolean isFromBroker(AMQMessage msg) + { + return ClusteredProtocolSession.isPayloadFromPeer(msg); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java new file mode 100644 index 0000000000..39ae7e3c3e --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.cluster.util.LogMessage; + +import java.util.List; + +class ClusteredSubscriptionManager extends SubscriptionSet +{ + private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class); + private final NestedSubscriptionManager _all; + + ClusteredSubscriptionManager() + { + this(new NestedSubscriptionManager()); + } + + private ClusteredSubscriptionManager(NestedSubscriptionManager all) + { + _all = all; + _all.addSubscription(new Parent()); + } + + NestedSubscriptionManager getAllSubscribers() + { + return _all; + } + + public boolean hasActiveSubscribers() + { + return _all.hasActiveSubscribers(); + } + + public Subscription nextSubscriber(AMQMessage msg) + { + if(ClusteredQueue.isFromBroker(msg)) + { + //if message is from another broker, it should only be delivered + //to another client to meet ordering constraints + Subscription s = super.nextSubscriber(msg); + _logger.info(new LogMessage("Returning next *client* subscriber {0}", s)); + if(s == null) + { + //TODO: deliver to another broker, but set the redelivered flag on the msg + //(this should be policy based) + + //for now just don't deliver it + return null; + } + else + { + return s; + } + } + Subscription s = _all.nextSubscriber(msg); + _logger.info(new LogMessage("Returning next subscriber {0}", s)); + return s; + } + + private class Parent implements WeightedSubscriptionManager + { + public int getWeight() + { + return ClusteredSubscriptionManager.this.getWeight(); + } + + public List<Subscription> getSubscriptions() + { + return ClusteredSubscriptionManager.super.getSubscriptions(); + } + + public boolean hasActiveSubscribers() + { + return ClusteredSubscriptionManager.super.hasActiveSubscribers(); + } + + public Subscription nextSubscriber(AMQMessage msg) + { + return ClusteredSubscriptionManager.super.nextSubscriber(msg); + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java new file mode 100644 index 0000000000..0566c5203b --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.List; +import java.util.LinkedList; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * Distributes messages among a list of subsscription managers, using their + * weighting. + */ +class NestedSubscriptionManager implements SubscriptionManager +{ + private final List<WeightedSubscriptionManager> _subscribers = new CopyOnWriteArrayList<WeightedSubscriptionManager>(); + private int _iterations; + private int _index; + + void addSubscription(WeightedSubscriptionManager s) + { + _subscribers.add(s); + } + + void removeSubscription(WeightedSubscriptionManager s) + { + _subscribers.remove(s); + } + + + public List<Subscription> getSubscriptions() + { + List<Subscription> allSubs = new LinkedList<Subscription>(); + + for (WeightedSubscriptionManager subMans : _subscribers) + { + allSubs.addAll(subMans.getSubscriptions()); + } + + return allSubs; + } + + public boolean hasActiveSubscribers() + { + for (WeightedSubscriptionManager s : _subscribers) + { + if (s.hasActiveSubscribers()) + { + return true; + } + } + return false; + } + + public Subscription nextSubscriber(AMQMessage msg) + { + WeightedSubscriptionManager start = current(); + for (WeightedSubscriptionManager s = start; s != null; s = next(start)) + { + if (hasMore(s)) + { + return nextSubscriber(s); + } + } + return null; + } + + private Subscription nextSubscriber(WeightedSubscriptionManager s) + { + _iterations++; + return s.nextSubscriber(null); + } + + private WeightedSubscriptionManager current() + { + return _subscribers.isEmpty() ? null : _subscribers.get(_index); + } + + private boolean hasMore(WeightedSubscriptionManager s) + { + return _iterations < s.getWeight(); + } + + private WeightedSubscriptionManager next(WeightedSubscriptionManager start) + { + WeightedSubscriptionManager s = next(); + return s == start && !hasMore(s) ? null : s; + } + + private WeightedSubscriptionManager next() + { + _iterations = 0; + if (++_index >= _subscribers.size()) + { + _index = 0; + } + return _subscribers.get(_index); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java new file mode 100644 index 0000000000..f8e4311a77 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.cluster.SimpleSendable; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.SimpleBodySendable; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.AMQShortString; + +import java.util.concurrent.Executor; + +/** + * Used to represent a private queue held locally. + * + */ +public class PrivateQueue extends AMQQueue +{ + private final GroupManager _groupMgr; + + public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) + throws AMQException + { + super(name, durable, owner, autoDelete, virtualHost); + _groupMgr = groupMgr; + + } + + protected void autodelete() throws AMQException + { + //delete locally: + super.autodelete(); + + //send delete request to peers: + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0, + QueueDeleteBody.getClazz((byte)8, (byte)0), + QueueDeleteBody.getMethod((byte)8, (byte)0), + false,false,false,null,0); + request.queue = getName(); + _groupMgr.broadcast(new SimpleBodySendable(request)); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java new file mode 100644 index 0000000000..efc0540c18 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/ProxiedQueueCleanup.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.cluster.MembershipChangeListener; +import org.apache.qpid.server.cluster.MemberHandle; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; + +import java.util.List; + +class ProxiedQueueCleanup implements MembershipChangeListener +{ + private static final Logger _logger = Logger.getLogger(ProxiedQueueCleanup.class); + + private final MemberHandle _subject; + private final RemoteQueueProxy _queue; + + ProxiedQueueCleanup(MemberHandle subject, RemoteQueueProxy queue) + { + _subject = subject; + _queue = queue; + } + + public void changed(List<MemberHandle> members) + { + if(!members.contains(_subject)) + { + try + { + _queue.delete(); + _logger.info(new LogMessage("Deleted {0} in response to exclusion of {1}", _queue, _subject)); + } + catch (AMQException e) + { + _logger.info(new LogMessage("Failed to delete {0} in response to exclusion of {1}: {2}", _queue, _subject, e), e); + } + } + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java new file mode 100644 index 0000000000..2a83d65ae5 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.cluster.ClusteredProtocolSession; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.MemberHandle; +import org.apache.qpid.server.cluster.SimpleSendable; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; + +/** + * TODO: separate out an abstract base class from AMQQueue from which this inherits. It does + * not require all the functionality currently in AMQQueue. + * + */ +public class RemoteQueueProxy extends AMQQueue +{ + private static final Logger _logger = Logger.getLogger(RemoteQueueProxy.class); + private final MemberHandle _target; + private final GroupManager _groupMgr; + + public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) + throws AMQException + { + super(name, durable, owner, autoDelete, virtualHost); + _target = target; + _groupMgr = groupMgr; + _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this)); + } + + + public void deliver(AMQMessage msg) throws NoConsumersException + { + if (ClusteredProtocolSession.canRelay(msg, _target)) + { + try + { + _logger.debug(new LogMessage("Relaying {0} to {1}", msg, _target)); + relay(msg); + } + catch (NoConsumersException e) + { + throw e; + } + catch (AMQException e) + { + //TODO: sort out exception handling... + e.printStackTrace(); + } + } + else + { + _logger.debug(new LogMessage("Cannot relay {0} to {1}", msg, _target)); + } + } + + void relay(AMQMessage msg) throws AMQException + { + // TODO FIXME - can no longer update the publish body as it is an opaque wrapper object + // if cluster can handle immediate then it should wrap the wrapper... + +// BasicPublishBody publish = msg.getMessagePublishInfo(); +// publish.immediate = false; //can't as yet handle the immediate flag in a cluster + + // send this on to the broker for which it is acting as proxy: + _groupMgr.send(_target, new SimpleSendable(msg)); + } +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java new file mode 100644 index 0000000000..e396432cea --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -0,0 +1,176 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.cluster.MemberHandle; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.SimpleSendable; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.AMQException; + +import java.util.Queue; +import java.util.List; + +class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager +{ + private final GroupManager _groupMgr; + private final MemberHandle _peer; + private boolean _suspended; + private int _count; + + RemoteSubscriptionImpl(GroupManager groupMgr, MemberHandle peer) + { + _groupMgr = groupMgr; + _peer = peer; + } + + synchronized void increment() + { + _count++; + } + + synchronized boolean decrement() + { + return --_count <= 0; + } + + public void send(AMQMessage msg, AMQQueue queue) + { + try + { + _groupMgr.send(_peer, new SimpleSendable(msg)); + } + catch (AMQException e) + { + //TODO: handle exceptions properly... + e.printStackTrace(); + } + } + + public synchronized void setSuspended(boolean suspended) + { + _suspended = suspended; + } + + public synchronized boolean isSuspended() + { + return _suspended; + } + + public synchronized int getWeight() + { + return _count; + } + + public List<Subscription> getSubscriptions() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean hasActiveSubscribers() + { + return getWeight() == 0; + } + + public Subscription nextSubscriber(AMQMessage msg) + { + return this; + } + + public void queueDeleted(AMQQueue queue) + { + if (queue instanceof ClusteredQueue) + { + ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer); + } + } + + public boolean filtersMessages() + { + return false; + } + + public boolean hasInterest(AMQMessage msg) + { + return true; + } + + public Queue<AMQMessage> getPreDeliveryQueue() + { + return null; + } + + public Queue<AMQMessage> getResendQueue() + { + return null; + } + + public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + { + return messages; + } + + public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst) + { + //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl + } + + public boolean isAutoClose() + { + return false; + } + + public void close() + { + //no-op + } + + public boolean isClosed() + { + return false; + } + + public boolean isBrowser() + { + return false; + } + + public boolean wouldSuspend(AMQMessage msg) + { + return _suspended; + } + + public void addToResendQueue(AMQMessage msg) + { + //no-op + } + + public Object getSendLock() + { + return new Object(); + } + + public AMQChannel getChannel() + { + return null; + } + +} diff --git a/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java new file mode 100644 index 0000000000..cc951a4709 --- /dev/null +++ b/Final/java/cluster/src/main/java/org/apache/qpid/server/queue/SubscriberCleanup.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.cluster.MembershipChangeListener; +import org.apache.qpid.server.cluster.MemberHandle; +import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.log4j.Logger; + +import java.util.List; + +class SubscriberCleanup implements MembershipChangeListener +{ + private static final Logger _logger = Logger.getLogger(SubscriberCleanup.class); + + private final MemberHandle _subject; + private final ClusteredQueue _queue; + private final GroupManager _manager; + + SubscriberCleanup(MemberHandle subject, ClusteredQueue queue, GroupManager manager) + { + _subject = subject; + _queue = queue; + _manager = manager; + _manager.addMemberhipChangeListener(this); + } + + public void changed(List<MemberHandle> members) + { + if(!members.contains(_subject)) + { + _queue.removeAllRemoteSubscriber(_subject); + _manager.removeMemberhipChangeListener(this); + _logger.info(new LogMessage("Removed {0} from {1}", _subject, _queue)); + } + } +} diff --git a/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java new file mode 100644 index 0000000000..b91d7140e0 --- /dev/null +++ b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerGroupTest.java @@ -0,0 +1,270 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import junit.framework.TestCase; + +import java.io.IOException; +import java.util.Arrays; + +public class BrokerGroupTest extends TestCase +{ + private final MemberHandle a = new SimpleMemberHandle("A", 1); + private final MemberHandle b = new SimpleMemberHandle("B", 1); + private final MemberHandle c = new SimpleMemberHandle("C", 1); + private final MemberHandle d = new SimpleMemberHandle("D", 1); + + //join (new members perspective) + // (i) connectToLeader() + // ==> check state + // (ii) setMembers() + // ==> check state + // ==> check members + // (iii) synched(leader) + // ==> check state + // ==> check peers + // (iv) synched(other) + // ==> check state + // ==> check peers + // repeat for all others + public void testJoin_newMember() throws Exception + { + MemberHandle[] pre = new MemberHandle[]{a, b, c}; + MemberHandle[] post = new MemberHandle[]{a, b, c}; + + BrokerGroup group = new BrokerGroup(d, new TestReplayManager(), new TestBrokerFactory()); + assertEquals(JoinState.UNINITIALISED, group.getState()); + //(i) + group.connectToLeader(a); + assertEquals(JoinState.JOINING, group.getState()); + assertEquals("Wrong number of peers", 1, group.getPeers().size()); + //(ii) + group.setMembers(Arrays.asList(post)); + assertEquals(JoinState.INITIATION, group.getState()); + assertEquals(Arrays.asList(post), group.getMembers()); + //(iii) & (iv) + for (MemberHandle member : pre) + { + group.synched(member); + if (member == c) + { + assertEquals(JoinState.JOINED, group.getState()); + assertEquals("Wrong number of peers", pre.length, group.getPeers().size()); + } + else + { + assertEquals(JoinState.INDUCTION, group.getState()); + assertEquals("Wrong number of peers", 1, group.getPeers().size()); + } + } + } + + //join (leaders perspective) + // (i) extablish() + // ==> check state + // ==> check members + // ==> check peers + // (ii) connectToProspect() + // ==> check members + // ==> check peers + // repeat (ii) + public void testJoin_Leader() throws IOException, InterruptedException + { + MemberHandle[] prospects = new MemberHandle[]{b, c, d}; + + BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory()); + assertEquals(JoinState.UNINITIALISED, group.getState()); + //(i) + group.establish(); + assertEquals(JoinState.JOINED, group.getState()); + assertEquals("Wrong number of peers", 0, group.getPeers().size()); + assertEquals("Wrong number of members", 1, group.getMembers().size()); + assertEquals(a, group.getMembers().get(0)); + //(ii) + for (int i = 0; i < prospects.length; i++) + { + group.connectToProspect(prospects[i]); + assertEquals("Wrong number of peers", i + 1, group.getPeers().size()); + for (int j = 0; j <= i; j++) + { + assertTrue(prospects[i].matches(group.getPeers().get(i))); + } + assertEquals("Wrong number of members", i + 2, group.getMembers().size()); + assertEquals(a, group.getMembers().get(0)); + for (int j = 0; j <= i; j++) + { + assertEquals(prospects[i], group.getMembers().get(i + 1)); + } + } + } + + //join (general perspective) + // (i) set up group + // (ii) setMembers() + // ==> check members + // ==> check peers + public void testJoin_general() throws Exception + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c}; + MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] peers = new MemberHandle[]{a, b, d}; + + BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); + //(i) + group.connectToLeader(a); + group.setMembers(Arrays.asList(view1)); + for (MemberHandle h : view1) + { + group.synched(h); + } + //(ii) + group.setMembers(Arrays.asList(view2)); + assertEquals(Arrays.asList(view2), group.getMembers()); + assertEquals(peers.length, group.getPeers().size()); + for (int i = 0; i < peers.length; i++) + { + assertTrue(peers[i].matches(group.getPeers().get(i))); + } + } + + //leadership transfer (valid) + // (i) set up group + // (ii) assumeLeadership() + // ==> check return value + // ==> check members + // ==> check peers + // ==> check isLeader() + // ==> check isLeader(old_leader) + // ==> check isMember(old_leader) + public void testTransferLeadership_valid() throws Exception + { + MemberHandle[] view1 = new MemberHandle[]{a, b}; + MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] view3 = new MemberHandle[]{b, c, d}; + + BrokerGroup group = new BrokerGroup(b, new TestReplayManager(), new TestBrokerFactory()); + //(i) + group.connectToLeader(a); + group.setMembers(Arrays.asList(view1)); + for (MemberHandle h : view1) + { + group.synched(h); + } + group.setMembers(Arrays.asList(view2)); + //(ii) + boolean result = group.assumeLeadership(); + assertTrue(result); + assertTrue(group.isLeader()); + assertFalse(group.isLeader(a)); + assertEquals(Arrays.asList(view3), group.getMembers()); + assertEquals(2, group.getPeers().size()); + assertTrue(c.matches(group.getPeers().get(0))); + assertTrue(d.matches(group.getPeers().get(1))); + } + + //leadership transfer (invalid) + // (i) set up group + // (ii) assumeLeadership() + // ==> check return value + // ==> check members + // ==> check peers + // ==> check isLeader() + // ==> check isLeader(old_leader) + // ==> check isMember(old_leader) + public void testTransferLeadership_invalid() throws Exception + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c}; + MemberHandle[] view2 = new MemberHandle[]{a, b, c, d}; + + BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); + //(i) + group.connectToLeader(a); + group.setMembers(Arrays.asList(view1)); + for (MemberHandle h : view1) + { + group.synched(h); + } + group.setMembers(Arrays.asList(view2)); + //(ii) + boolean result = group.assumeLeadership(); + assertFalse(result); + assertFalse(group.isLeader()); + assertTrue(group.isLeader(a)); + assertEquals(Arrays.asList(view2), group.getMembers()); + assertEquals(3, group.getPeers().size()); + assertTrue(a.matches(group.getPeers().get(0))); + assertTrue(b.matches(group.getPeers().get(1))); + assertTrue(d.matches(group.getPeers().get(2))); + + } + + //leave (leaders perspective) + // (i) set up group + // (ii) remove a member + // ==> check members + // ==> check peers + // ==> check isMember(removed_member) + // repeat (ii) + public void testLeave_leader() + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] view2 = new MemberHandle[]{a, b, d}; + MemberHandle[] view3 = new MemberHandle[]{a, d}; + MemberHandle[] view4 = new MemberHandle[]{a}; + //(i) + BrokerGroup group = new BrokerGroup(a, new TestReplayManager(), new TestBrokerFactory()); + group.establish(); + group.setMembers(Arrays.asList(view1)); + //(ii) + group.remove(group.findBroker(c, false)); + assertEquals(Arrays.asList(view2), group.getMembers()); + + group.remove(group.findBroker(b, false)); + assertEquals(Arrays.asList(view3), group.getMembers()); + + group.remove(group.findBroker(d, false)); + assertEquals(Arrays.asList(view4), group.getMembers()); + } + + + //leave (general perspective) + // (i) set up group + // (ii) setMember + // ==> check members + // ==> check peers + // ==> check isMember(removed_member) + // repeat (ii) + public void testLeave_general() + { + MemberHandle[] view1 = new MemberHandle[]{a, b, c, d}; + MemberHandle[] view2 = new MemberHandle[]{a, c, d}; + //(i) + BrokerGroup group = new BrokerGroup(c, new TestReplayManager(), new TestBrokerFactory()); + group.establish(); //not strictly the correct way to build up the group, but ok for here + group.setMembers(Arrays.asList(view1)); + //(ii) + group.setMembers(Arrays.asList(view2)); + assertEquals(Arrays.asList(view2), group.getMembers()); + assertEquals(2, group.getPeers().size()); + assertTrue(a.matches(group.getPeers().get(0))); + assertTrue(d.matches(group.getPeers().get(1))); + } +} diff --git a/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java new file mode 100644 index 0000000000..f1da312eea --- /dev/null +++ b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java @@ -0,0 +1,237 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import junit.framework.TestCase; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.policy.StandardPolicies; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class BrokerTest extends TestCase +{ + //group request (no failure) + public void testGroupRequest_noFailure() throws AMQException + { + RecordingBroker[] brokers = new RecordingBroker[]{ + new RecordingBroker("A", 1), + new RecordingBroker("B", 2), + new RecordingBroker("C", 3) + }; + GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(brokers))); + GroupRequest grpRequest = new GroupRequest(new SimpleBodySendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); + for (Broker b : brokers) + { + b.invoke(grpRequest); + } + grpRequest.finishedSend(); + + for (RecordingBroker b : brokers) + { + b.handleResponse(((AMQFrame) b.getMessages().get(0)).getChannel(), new TestMethod("response")); + } + + assertTrue("Handler did not receive response", handler.isCompleted()); + } + + //group request (failure) + public void testGroupRequest_failure() throws AMQException + { + RecordingBroker a = new RecordingBroker("A", 1); + RecordingBroker b = new RecordingBroker("B", 2); + RecordingBroker c = new RecordingBroker("C", 3); + RecordingBroker[] all = new RecordingBroker[]{a, b, c}; + RecordingBroker[] succeeded = new RecordingBroker[]{a, c}; + + GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(succeeded))); + GroupRequest grpRequest = new GroupRequest(new SimpleBodySendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); + + for (Broker broker : all) + { + broker.invoke(grpRequest); + } + grpRequest.finishedSend(); + + for (RecordingBroker broker : succeeded) + { + broker.handleResponse(((AMQFrame) broker.getMessages().get(0)).getChannel(), new TestMethod("response")); + } + b.remove(); + + assertTrue("Handler did not receive response", handler.isCompleted()); + } + + + //simple send (no response) + public void testSend_noResponse() throws AMQException + { + AMQBody[] msgs = new AMQBody[]{ + new TestMethod("A"), + new TestMethod("B"), + new TestMethod("C") + }; + RecordingBroker broker = new RecordingBroker("myhost", 1); + for (AMQBody msg : msgs) + { + broker.send(new SimpleBodySendable(msg), null); + } + List<AMQDataBlock> sent = broker.getMessages(); + assertEquals(msgs.length, sent.size()); + for (int i = 0; i < msgs.length; i++) + { + assertTrue(sent.get(i) instanceof AMQFrame); + assertEquals(msgs[i], ((AMQFrame) sent.get(i)).getBodyFrame()); + } + } + + //simple send (no failure) + public void testSend_noFailure() throws AMQException + { + RecordingBroker broker = new RecordingBroker("myhost", 1); + BlockingHandler handler = new BlockingHandler(); + broker.send(new SimpleBodySendable(new TestMethod("A")), handler); + List<AMQDataBlock> sent = broker.getMessages(); + assertEquals(1, sent.size()); + assertTrue(sent.get(0) instanceof AMQFrame); + assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame()); + + broker.handleResponse(((AMQFrame) sent.get(0)).getChannel(), new TestMethod("B")); + + assertEquals(new TestMethod("B"), handler.getResponse()); + } + + //simple send (failure) + public void testSend_failure() throws AMQException + { + RecordingBroker broker = new RecordingBroker("myhost", 1); + BlockingHandler handler = new BlockingHandler(); + broker.send(new SimpleBodySendable(new TestMethod("A")), handler); + List<AMQDataBlock> sent = broker.getMessages(); + assertEquals(1, sent.size()); + assertTrue(sent.get(0) instanceof AMQFrame); + assertEquals(new TestMethod("A"), ((AMQFrame) sent.get(0)).getBodyFrame()); + broker.remove(); + assertEquals(null, handler.getResponse()); + assertTrue(handler.isCompleted()); + assertTrue(handler.failed()); + } + + private static class TestMethod extends AMQMethodBody + { + private final Object id; + + TestMethod(Object id) + { + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + super((byte)8, (byte)0); + this.id = id; + } + + protected int getBodySize() + { + return 0; + } + + protected int getClazz() + { + return 1002; + } + + protected int getMethod() + { + return 1003; + } + + protected void writeMethodPayload(ByteBuffer buffer) + { + } + + protected byte getType() + { + return 0; + } + + protected int getSize() + { + return 0; + } + + protected void writePayload(ByteBuffer buffer) + { + } + + protected void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException + { + } + + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException + { + } + + public boolean equals(Object o) + { + return o instanceof TestMethod && id.equals(((TestMethod) o).id); + } + + public int hashCode() + { + return id.hashCode(); + } + + } + + private static class GroupResponseValidator implements GroupResponseHandler + { + private final AMQMethodBody _response; + private final List<Member> _members; + private boolean _completed = false; + + GroupResponseValidator(AMQMethodBody response, List<Member> members) + { + _response = response; + _members = members; + } + + public void response(List<AMQMethodBody> responses, List<Member> members) + { + for (AMQMethodBody r : responses) + { + assertEquals(_response, r); + } + assertEquals(_members, members); + _completed = true; + } + + boolean isCompleted() + { + return _completed; + } + } +} diff --git a/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java new file mode 100644 index 0000000000..830a00f4c2 --- /dev/null +++ b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/ClusterCapabilityTest.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import junit.framework.TestCase; +import org.apache.qpid.framing.AMQShortString; + +public class ClusterCapabilityTest extends TestCase +{ + public void testStartWithNull() + { + MemberHandle peer = new SimpleMemberHandle("myhost:9999"); + AMQShortString c = ClusterCapability.add(null, peer); + assertTrue(ClusterCapability.contains(c)); + assertTrue(peer.matches(ClusterCapability.getPeer(c))); + } + + public void testStartWithText() + { + MemberHandle peer = new SimpleMemberHandle("myhost:9999"); + AMQShortString c = ClusterCapability.add(new AMQShortString("existing text"), peer); + assertTrue(ClusterCapability.contains(c)); + assertTrue(peer.matches(ClusterCapability.getPeer(c))); + } +} diff --git a/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java new file mode 100644 index 0000000000..7e58add91e --- /dev/null +++ b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/InductionBufferTest.java @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.IoSession; + +import java.util.List; +import java.util.ArrayList; + +import junit.framework.TestCase; + +public class InductionBufferTest extends TestCase +{ + public void test() throws Exception + { + IoSession session1 = new TestSession(); + IoSession session2 = new TestSession(); + IoSession session3 = new TestSession(); + + TestMessageHandler handler = new TestMessageHandler(); + InductionBuffer buffer = new InductionBuffer(handler); + + buffer.receive(session1, "one"); + buffer.receive(session2, "two"); + buffer.receive(session3, "three"); + + buffer.receive(session1, "four"); + buffer.receive(session1, "five"); + buffer.receive(session1, "six"); + + buffer.receive(session3, "seven"); + buffer.receive(session3, "eight"); + + handler.checkEmpty(); + buffer.deliver(); + + handler.check(session1, "one"); + handler.check(session2, "two"); + handler.check(session3, "three"); + + handler.check(session1, "four"); + handler.check(session1, "five"); + handler.check(session1, "six"); + + handler.check(session3, "seven"); + handler.check(session3, "eight"); + handler.checkEmpty(); + + buffer.receive(session1, "nine"); + buffer.receive(session2, "ten"); + buffer.receive(session3, "eleven"); + + handler.check(session1, "nine"); + handler.check(session2, "ten"); + handler.check(session3, "eleven"); + + handler.checkEmpty(); + } + + private static class TestMessageHandler implements InductionBuffer.MessageHandler + { + private final List<IoSession> _sessions = new ArrayList<IoSession>(); + private final List<Object> _msgs = new ArrayList<Object>(); + + public synchronized void deliver(IoSession session, Object msg) throws Exception + { + _sessions.add(session); + _msgs.add(msg); + } + + void check(IoSession actualSession, Object actualMsg) + { + assertFalse(_sessions.isEmpty()); + assertFalse(_msgs.isEmpty()); + IoSession expectedSession = _sessions.remove(0); + Object expectedMsg = _msgs.remove(0); + assertEquals(expectedSession, actualSession); + assertEquals(expectedMsg, actualMsg); + } + + void checkEmpty() + { + assertTrue(_sessions.isEmpty()); + assertTrue(_msgs.isEmpty()); + } + } +} + diff --git a/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java new file mode 100644 index 0000000000..1ec5154a98 --- /dev/null +++ b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBroker.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; + +import java.util.ArrayList; +import java.util.List; + +class RecordingBroker extends TestBroker +{ + private final List<AMQDataBlock> _messages = new ArrayList<AMQDataBlock>(); + + RecordingBroker(String host, int port) + { + super(host, port); + } + + public void send(AMQDataBlock data) throws AMQException + { + _messages.add(data); + } + + List<AMQDataBlock> getMessages() + { + return _messages; + } + + void clear() + { + _messages.clear(); + } + +} diff --git a/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java new file mode 100644 index 0000000000..d3e972e273 --- /dev/null +++ b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/RecordingBrokerFactory.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +class RecordingBrokerFactory implements BrokerFactory +{ + public Broker create(MemberHandle handle) + { + return new RecordingBroker(handle.getHost(), handle.getPort()); + } +} diff --git a/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java new file mode 100644 index 0000000000..86cde3cee7 --- /dev/null +++ b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; + +import javax.jms.JMSException; + +import junit.framework.TestCase; + +public class SimpleClusterTest extends TestCase +{ + public void testDeclareExchange() throws AMQException, JMSException, URLSyntaxException + { + AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test"); + AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + System.out.println("Session created"); + session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct"), true); + System.out.println("Exchange declared"); + con.close(); + System.out.println("Connection closed"); + } +} diff --git a/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java new file mode 100644 index 0000000000..8ff8357377 --- /dev/null +++ b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleMemberHandleTest.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import junit.framework.TestCase; + +public class SimpleMemberHandleTest extends TestCase +{ + public void testMatches() + { + assertMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost", 8888)); + assertNoMatch(new SimpleMemberHandle("localhost", 8889), new SimpleMemberHandle("localhost", 8888)); + assertNoMatch(new SimpleMemberHandle("localhost", 8888), new SimpleMemberHandle("localhost2", 8888)); + } + + public void testResolve() + { + assertEquivalent(new SimpleMemberHandle("WGLAIBD8XGR0J:9000"), new SimpleMemberHandle("localhost:9000")); + } + + private void assertEquivalent(MemberHandle a, MemberHandle b) + { + String msg = a + " is not equivalent to " + b; + a = SimpleMemberHandle.resolve(a); + b = SimpleMemberHandle.resolve(b); + msg += "(" + a + " does not match " + b + ")"; + assertTrue(msg, a.matches(b)); + } + + private void assertMatch(MemberHandle a, MemberHandle b) + { + assertTrue(a + " does not match " + b, a.matches(b)); + } + + private void assertNoMatch(MemberHandle a, MemberHandle b) + { + assertFalse(a + " matches " + b, a.matches(b)); + } +} diff --git a/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java new file mode 100644 index 0000000000..d3ccbf0ac6 --- /dev/null +++ b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBroker.java @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; + +import java.io.IOException; + +class TestBroker extends Broker +{ + TestBroker(String host, int port) + { + super(host, port); + } + + boolean connect() throws IOException, InterruptedException + { + return true; + } + + void connectAsynch(Iterable<AMQMethodBody> msgs) + { + replay(msgs); + } + + void replay(Iterable<AMQMethodBody> msgs) + { + try + { + for (AMQMethodBody b : msgs) + { + send(new AMQFrame(0, b)); + } + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + + Broker connectToCluster() throws IOException, InterruptedException + { + return this; + } + + public void send(AMQDataBlock data) throws AMQException + { + } +} diff --git a/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java new file mode 100644 index 0000000000..92eaec876a --- /dev/null +++ b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestBrokerFactory.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +class TestBrokerFactory implements BrokerFactory +{ + public Broker create(MemberHandle handle) + { + return new TestBroker(handle.getHost(), handle.getPort()); + } +} diff --git a/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java new file mode 100644 index 0000000000..c529c83cc0 --- /dev/null +++ b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestReplayManager.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.cluster.replay.ReplayManager; + +import java.util.ArrayList; +import java.util.List; + +class TestReplayManager implements ReplayManager +{ + private final List<AMQMethodBody> _msgs; + + TestReplayManager() + { + this(new ArrayList<AMQMethodBody>()); + } + + TestReplayManager(List<AMQMethodBody> msgs) + { + _msgs = msgs; + } + + public List<AMQMethodBody> replay(boolean isLeader) + { + return _msgs; + } +} diff --git a/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java new file mode 100644 index 0000000000..86ec808924 --- /dev/null +++ b/Final/java/cluster/src/test/java/org/apache/qpid/server/cluster/TestSession.java @@ -0,0 +1,269 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.mina.common.*; + +import java.net.SocketAddress; +import java.util.Set; + +class TestSession implements IoSession +{ + public IoService getService() + { + return null; //TODO + } + + public IoServiceConfig getServiceConfig() + { + return null; //TODO + } + + public IoHandler getHandler() + { + return null; //TODO + } + + public IoSessionConfig getConfig() + { + return null; //TODO + } + + public IoFilterChain getFilterChain() + { + return null; //TODO + } + + public WriteFuture write(Object message) + { + return null; //TODO + } + + public CloseFuture close() + { + return null; //TODO + } + + public Object getAttachment() + { + return null; //TODO + } + + public Object setAttachment(Object attachment) + { + return null; //TODO + } + + public Object getAttribute(String key) + { + return null; //TODO + } + + public Object setAttribute(String key, Object value) + { + return null; //TODO + } + + public Object setAttribute(String key) + { + return null; //TODO + } + + public Object removeAttribute(String key) + { + return null; //TODO + } + + public boolean containsAttribute(String key) + { + return false; //TODO + } + + public Set getAttributeKeys() + { + return null; //TODO + } + + public TransportType getTransportType() + { + return null; //TODO + } + + public boolean isConnected() + { + return false; //TODO + } + + public boolean isClosing() + { + return false; //TODO + } + + public CloseFuture getCloseFuture() + { + return null; //TODO + } + + public SocketAddress getRemoteAddress() + { + return null; //TODO + } + + public SocketAddress getLocalAddress() + { + return null; //TODO + } + + public SocketAddress getServiceAddress() + { + return null; //TODO + } + + public int getIdleTime(IdleStatus status) + { + return 0; //TODO + } + + public long getIdleTimeInMillis(IdleStatus status) + { + return 0; //TODO + } + + public void setIdleTime(IdleStatus status, int idleTime) + { + //TODO + } + + public int getWriteTimeout() + { + return 0; //TODO + } + + public long getWriteTimeoutInMillis() + { + return 0; //TODO + } + + public void setWriteTimeout(int writeTimeout) + { + //TODO + } + + public TrafficMask getTrafficMask() + { + return null; //TODO + } + + public void setTrafficMask(TrafficMask trafficMask) + { + //TODO + } + + public void suspendRead() + { + //TODO + } + + public void suspendWrite() + { + //TODO + } + + public void resumeRead() + { + //TODO + } + + public void resumeWrite() + { + //TODO + } + + public long getReadBytes() + { + return 0; //TODO + } + + public long getWrittenBytes() + { + return 0; //TODO + } + + public long getReadMessages() + { + return 0; + } + + public long getWrittenMessages() + { + return 0; + } + + public long getWrittenWriteRequests() + { + return 0; //TODO + } + + public int getScheduledWriteRequests() + { + return 0; //TODO + } + + public int getScheduledWriteBytes() + { + return 0; //TODO + } + + public long getCreationTime() + { + return 0; //TODO + } + + public long getLastIoTime() + { + return 0; //TODO + } + + public long getLastReadTime() + { + return 0; //TODO + } + + public long getLastWriteTime() + { + return 0; //TODO + } + + public boolean isIdle(IdleStatus status) + { + return false; //TODO + } + + public int getIdleCount(IdleStatus status) + { + return 0; //TODO + } + + public long getLastIdleTime(IdleStatus status) + { + return 0; //TODO + } +} |