diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-07 23:11:53 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-07 23:11:53 +0000 |
commit | 26d0286ef84e00fd27206b5e23aff5c54309a975 (patch) | |
tree | b3ffd1ef57cbbc31faaf95466f91418421d44032 /java/cluster/src | |
parent | 189816d88cc72f1053a7e7685b18883669c53d57 (diff) | |
download | qpid-python-26d0286ef84e00fd27206b5e23aff5c54309a975.tar.gz |
QPID-32: new model for holding and processing message in memory to support new persistent stores
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@493872 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
9 files changed, 85 insertions, 69 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java index 5209df59cd..a7936be8db 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java @@ -22,16 +22,10 @@ package org.apache.qpid.server.cluster; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.ClusterJoinBody; -import org.apache.qpid.framing.ClusterLeaveBody; -import org.apache.qpid.framing.ClusterMembershipBody; -import org.apache.qpid.framing.ClusterPingBody; -import org.apache.qpid.framing.ClusterSuspectBody; -import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.*; import org.apache.qpid.server.cluster.policy.StandardPolicies; import org.apache.qpid.server.cluster.replay.ReplayManager; import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.cluster.util.InvokeMultiple; import java.util.List; @@ -96,7 +90,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, Broker destination = findBroker(broker); if(destination == null) { - _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker)); + _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker)); } else { @@ -119,7 +113,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, ping.responseRequired = true; ping.load = _loadTable.getLocalLoad(); BlockingHandler handler = new BlockingHandler(); - send(getLeader(), new SimpleSendable(ping), handler); + send(getLeader(), new SimpleBodySendable(ping), handler); handler.waitForCompletion(); if (handler.failed()) { @@ -164,7 +158,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0); join.broker = _group.getLocal().getDetails(); - send(leader, new SimpleSendable(join)); + send(leader, new SimpleBodySendable(join)); } private Broker connectToLeader(MemberHandle member) throws AMQException @@ -185,7 +179,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0); leave.broker = _group.getLocal().getDetails(); - send(getLeader(), new SimpleSendable(leave)); + send(getLeader(), new SimpleBodySendable(leave)); } private void suspect(MemberHandle broker) throws AMQException @@ -208,7 +202,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0); suspect.broker = broker.getDetails(); - send(getLeader(), new SimpleSendable(suspect)); + send(getLeader(), new SimpleBodySendable(suspect)); } } @@ -233,7 +227,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0); request.broker = member.getDetails(); Broker leader = getLeader(); - send(leader, new SimpleSendable(request)); + send(leader, new SimpleBodySendable(request)); _logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader)); } } @@ -287,7 +281,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener, { String membership = SimpleMemberHandle.membersToString(_group.getMembers()); ClusterMembershipBody announce = createAnnouncement(membership); - broadcast(new SimpleSendable(announce)); + broadcast(new SimpleBodySendable(announce)); _logger.info(new LogMessage("Membership announcement sent: {0}", membership)); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java new file mode 100644 index 0000000000..f7c40c60b3 --- /dev/null +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.cluster; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQFrame; + +/** + */ +public class SimpleBodySendable implements Sendable +{ + private final AMQBody _body; + + public SimpleBodySendable(AMQBody body) + { + _body = body; + } + + public void send(int channel, Member member) throws AMQException + { + member.send(new AMQFrame(channel, _body)); + } + + public String toString() + { + return _body.toString(); + } + +} diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java index 34b5cd829d..51c57efdae 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java @@ -21,36 +21,29 @@ package org.apache.qpid.server.cluster; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.server.queue.AMQMessage; -import java.util.Arrays; -import java.util.List; +import java.util.Iterator; public class SimpleSendable implements Sendable { - private final List<AMQBody> _bodies; + private final AMQMessage _message; - public SimpleSendable(AMQBody body) + public SimpleSendable(AMQMessage message) { - this(Arrays.asList(body)); - } - - public SimpleSendable(List<AMQBody> bodies) - { - _bodies = bodies; + _message = message; } public void send(int channel, Member member) throws AMQException { - for (AMQBody body : _bodies) + member.send(new AMQFrame(channel, _message.getPublishBody())); + member.send(new AMQFrame(channel, _message.getContentHeaderBody())); + Iterator<ContentBody> it = _message.getContentBodyIterator(); + while (it.hasNext()) { - member.send(new AMQFrame(channel, body)); + member.send(new AMQFrame(channel, it.next())); } } - - public String toString() - { - return _bodies.toString(); - } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java index db340c6a61..448202cd5e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java @@ -23,13 +23,8 @@ package org.apache.qpid.server.cluster.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.server.cluster.BroadcastPolicy; -import org.apache.qpid.server.cluster.ClusteredProtocolSession; -import org.apache.qpid.server.cluster.GroupManager; -import org.apache.qpid.server.cluster.GroupResponseHandler; import org.apache.qpid.server.cluster.util.LogMessage; -import org.apache.qpid.server.cluster.Member; -import org.apache.qpid.server.cluster.SimpleSendable; +import org.apache.qpid.server.cluster.*; import org.apache.qpid.server.cluster.policy.StandardPolicies; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQMethodEvent; @@ -81,13 +76,13 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A if (_policy == null) { //asynch delivery - _groupMgr.broadcast(new SimpleSendable(evt.getMethod())); + _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod())); local(stateMgr, queues, exchanges, session, evt); } else { Callback callback = new Callback(stateMgr, queues, exchanges, session, evt); - _groupMgr.broadcast(new SimpleSendable(evt.getMethod()), _policy, callback); + _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()), _policy, callback); } _logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod())); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index 8765aebf77..50b2fa0b66 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.server.cluster.*; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.StoreContext; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -61,10 +62,10 @@ public class ClusteredQueue extends AMQQueue _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers(); } - public void deliver(AMQMessage message) throws AMQException + public void process(StoreContext storeContext, AMQMessage msg) throws AMQException { - _logger.info(new LogMessage("{0} delivered to clustered queue {1}", message, this)); - super.deliver(message); + _logger.info(new LogMessage("{0} delivered to clustered queue {1}", msg, this)); + super.process(storeContext, msg); } protected void autodelete() throws AMQException @@ -79,7 +80,7 @@ public class ClusteredQueue extends AMQQueue // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); request.queue = getName(); - _groupMgr.broadcast(new SimpleSendable(request)); + _groupMgr.broadcast(new SimpleBodySendable(request)); } } @@ -93,7 +94,7 @@ public class ClusteredQueue extends AMQQueue // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. BasicCancelBody request = new BasicCancelBody((byte)8, (byte)0); request.consumerTag = getName(); - _groupMgr.broadcast(new SimpleSendable(request)); + _groupMgr.broadcast(new SimpleBodySendable(request)); } public void addRemoteSubcriber(MemberHandle peer) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java index 94f17cb9d3..8315d46b5d 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.server.cluster.GroupManager; +import org.apache.qpid.server.cluster.SimpleBodySendable; import org.apache.qpid.framing.QueueDeleteBody; import java.util.concurrent.Executor; @@ -60,6 +61,6 @@ public class PrivateQueue extends AMQQueue // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0); request.queue = getName(); - _groupMgr.broadcast(new SimpleSendable(request)); + _groupMgr.broadcast(new SimpleBodySendable(request)); } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java index 752cf05a82..f8eba282e2 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java @@ -22,18 +22,13 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.cluster.ClusteredProtocolSession; import org.apache.qpid.server.cluster.GroupManager; -import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.cluster.MemberHandle; import org.apache.qpid.server.cluster.SimpleSendable; +import org.apache.qpid.server.cluster.util.LogMessage; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.Executor; /** @@ -93,17 +88,9 @@ public class RemoteQueueProxy extends AMQQueue void relay(AMQMessage msg) throws AMQException { BasicPublishBody publish = msg.getPublishBody(); - ContentHeaderBody header = msg.getContentHeaderBody(); - List<ContentBody> bodies = msg.getContentBodies(); - - //(i) construct a new publishing block: - publish.immediate = false;//can't as yet handle the immediate flag in a cluster - List<AMQBody> parts = new ArrayList<AMQBody>(2 + bodies.size()); - parts.add(publish); - parts.add(header); - parts.addAll(bodies); + publish.immediate = false; //can't as yet handle the immediate flag in a cluster - //(ii) send this on to the broker for which it is acting as proxy: - _groupMgr.send(_target, new SimpleSendable(parts)); + // send this on to the broker for which it is acting as proxy: + _groupMgr.send(_target, new SimpleSendable(msg)); } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index c751e4a011..ef5239fc87 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -55,7 +55,7 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage { try { - _groupMgr.send(_peer, new SimpleSendable(msg.getPayload())); + _groupMgr.send(_peer, new SimpleSendable(msg)); } catch (AMQException e) { diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java index ed18710c64..20e092bde7 100644 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java @@ -45,7 +45,7 @@ public class BrokerTest extends TestCase new RecordingBroker("C", 3) }; GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(brokers))); - GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); + GroupRequest grpRequest = new GroupRequest(new SimpleBodySendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); for (Broker b : brokers) { b.invoke(grpRequest); @@ -70,7 +70,7 @@ public class BrokerTest extends TestCase RecordingBroker[] succeeded = new RecordingBroker[]{a, c}; GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(succeeded))); - GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); + GroupRequest grpRequest = new GroupRequest(new SimpleBodySendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler); for (Broker broker : all) { @@ -99,7 +99,7 @@ public class BrokerTest extends TestCase RecordingBroker broker = new RecordingBroker("myhost", 1); for (AMQBody msg : msgs) { - broker.send(new SimpleSendable(msg), null); + broker.send(new SimpleBodySendable(msg), null); } List<AMQDataBlock> sent = broker.getMessages(); assertEquals(msgs.length, sent.size()); @@ -115,7 +115,7 @@ public class BrokerTest extends TestCase { RecordingBroker broker = new RecordingBroker("myhost", 1); BlockingHandler handler = new BlockingHandler(); - broker.send(new SimpleSendable(new TestMethod("A")), handler); + broker.send(new SimpleBodySendable(new TestMethod("A")), handler); List<AMQDataBlock> sent = broker.getMessages(); assertEquals(1, sent.size()); assertTrue(sent.get(0) instanceof AMQFrame); @@ -131,7 +131,7 @@ public class BrokerTest extends TestCase { RecordingBroker broker = new RecordingBroker("myhost", 1); BlockingHandler handler = new BlockingHandler(); - broker.send(new SimpleSendable(new TestMethod("A")), handler); + broker.send(new SimpleBodySendable(new TestMethod("A")), handler); List<AMQDataBlock> sent = broker.getMessages(); assertEquals(1, sent.size()); assertTrue(sent.get(0) instanceof AMQFrame); |