summaryrefslogtreecommitdiff
path: root/java/cluster/src
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-07 23:11:53 +0000
committerRobert Greig <rgreig@apache.org>2007-01-07 23:11:53 +0000
commit26d0286ef84e00fd27206b5e23aff5c54309a975 (patch)
treeb3ffd1ef57cbbc31faaf95466f91418421d44032 /java/cluster/src
parent189816d88cc72f1053a7e7685b18883669c53d57 (diff)
downloadqpid-python-26d0286ef84e00fd27206b5e23aff5c54309a975.tar.gz
QPID-32: new model for holding and processing message in memory to support new persistent stores
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@493872 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java22
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java45
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java29
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java11
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java11
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java3
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java21
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java2
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java10
9 files changed, 85 insertions, 69 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
index 5209df59cd..a7936be8db 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
@@ -22,16 +22,10 @@ package org.apache.qpid.server.cluster;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ClusterJoinBody;
-import org.apache.qpid.framing.ClusterLeaveBody;
-import org.apache.qpid.framing.ClusterMembershipBody;
-import org.apache.qpid.framing.ClusterPingBody;
-import org.apache.qpid.framing.ClusterSuspectBody;
-import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.cluster.policy.StandardPolicies;
import org.apache.qpid.server.cluster.replay.ReplayManager;
import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.util.InvokeMultiple;
import java.util.List;
@@ -96,7 +90,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
Broker destination = findBroker(broker);
if(destination == null)
{
- _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker));
+ _logger.warn(new LogMessage("Invalid destination sending {0}. {1} not known", message, broker));
}
else
{
@@ -119,7 +113,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
ping.responseRequired = true;
ping.load = _loadTable.getLocalLoad();
BlockingHandler handler = new BlockingHandler();
- send(getLeader(), new SimpleSendable(ping), handler);
+ send(getLeader(), new SimpleBodySendable(ping), handler);
handler.waitForCompletion();
if (handler.failed())
{
@@ -164,7 +158,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterJoinBody join = new ClusterJoinBody((byte)8, (byte)0);
join.broker = _group.getLocal().getDetails();
- send(leader, new SimpleSendable(join));
+ send(leader, new SimpleBodySendable(join));
}
private Broker connectToLeader(MemberHandle member) throws AMQException
@@ -185,7 +179,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterLeaveBody leave = new ClusterLeaveBody((byte)8, (byte)0);
leave.broker = _group.getLocal().getDetails();
- send(getLeader(), new SimpleSendable(leave));
+ send(getLeader(), new SimpleBodySendable(leave));
}
private void suspect(MemberHandle broker) throws AMQException
@@ -208,7 +202,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
ClusterSuspectBody suspect = new ClusterSuspectBody((byte)8, (byte)0);
suspect.broker = broker.getDetails();
- send(getLeader(), new SimpleSendable(suspect));
+ send(getLeader(), new SimpleBodySendable(suspect));
}
}
@@ -233,7 +227,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
ClusterJoinBody request = new ClusterJoinBody((byte)8, (byte)0);
request.broker = member.getDetails();
Broker leader = getLeader();
- send(leader, new SimpleSendable(request));
+ send(leader, new SimpleBodySendable(request));
_logger.info(new LogMessage("Passed join request for {0} to {1}", member, leader));
}
}
@@ -287,7 +281,7 @@ public class DefaultGroupManager implements GroupManager, MemberFailureListener,
{
String membership = SimpleMemberHandle.membersToString(_group.getMembers());
ClusterMembershipBody announce = createAnnouncement(membership);
- broadcast(new SimpleSendable(announce));
+ broadcast(new SimpleBodySendable(announce));
_logger.info(new LogMessage("Membership announcement sent: {0}", membership));
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java
new file mode 100644
index 0000000000..f7c40c60b3
--- /dev/null
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleBodySendable.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.cluster;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQFrame;
+
+/**
+ */
+public class SimpleBodySendable implements Sendable
+{
+ private final AMQBody _body;
+
+ public SimpleBodySendable(AMQBody body)
+ {
+ _body = body;
+ }
+
+ public void send(int channel, Member member) throws AMQException
+ {
+ member.send(new AMQFrame(channel, _body));
+ }
+
+ public String toString()
+ {
+ return _body.toString();
+ }
+
+}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
index 34b5cd829d..51c57efdae 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
@@ -21,36 +21,29 @@
package org.apache.qpid.server.cluster;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.queue.AMQMessage;
-import java.util.Arrays;
-import java.util.List;
+import java.util.Iterator;
public class SimpleSendable implements Sendable
{
- private final List<AMQBody> _bodies;
+ private final AMQMessage _message;
- public SimpleSendable(AMQBody body)
+ public SimpleSendable(AMQMessage message)
{
- this(Arrays.asList(body));
- }
-
- public SimpleSendable(List<AMQBody> bodies)
- {
- _bodies = bodies;
+ _message = message;
}
public void send(int channel, Member member) throws AMQException
{
- for (AMQBody body : _bodies)
+ member.send(new AMQFrame(channel, _message.getPublishBody()));
+ member.send(new AMQFrame(channel, _message.getContentHeaderBody()));
+ Iterator<ContentBody> it = _message.getContentBodyIterator();
+ while (it.hasNext())
{
- member.send(new AMQFrame(channel, body));
+ member.send(new AMQFrame(channel, it.next()));
}
}
-
- public String toString()
- {
- return _bodies.toString();
- }
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
index db340c6a61..448202cd5e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
@@ -23,13 +23,8 @@ package org.apache.qpid.server.cluster.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.server.cluster.BroadcastPolicy;
-import org.apache.qpid.server.cluster.ClusteredProtocolSession;
-import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.GroupResponseHandler;
import org.apache.qpid.server.cluster.util.LogMessage;
-import org.apache.qpid.server.cluster.Member;
-import org.apache.qpid.server.cluster.SimpleSendable;
+import org.apache.qpid.server.cluster.*;
import org.apache.qpid.server.cluster.policy.StandardPolicies;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQMethodEvent;
@@ -81,13 +76,13 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A
if (_policy == null)
{
//asynch delivery
- _groupMgr.broadcast(new SimpleSendable(evt.getMethod()));
+ _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()));
local(stateMgr, queues, exchanges, session, evt);
}
else
{
Callback callback = new Callback(stateMgr, queues, exchanges, session, evt);
- _groupMgr.broadcast(new SimpleSendable(evt.getMethod()), _policy, callback);
+ _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()), _policy, callback);
}
_logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod()));
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index 8765aebf77..50b2fa0b66 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.server.cluster.*;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.StoreContext;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -61,10 +62,10 @@ public class ClusteredQueue extends AMQQueue
_subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers();
}
- public void deliver(AMQMessage message) throws AMQException
+ public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
{
- _logger.info(new LogMessage("{0} delivered to clustered queue {1}", message, this));
- super.deliver(message);
+ _logger.info(new LogMessage("{0} delivered to clustered queue {1}", msg, this));
+ super.process(storeContext, msg);
}
protected void autodelete() throws AMQException
@@ -79,7 +80,7 @@ public class ClusteredQueue extends AMQQueue
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0);
request.queue = getName();
- _groupMgr.broadcast(new SimpleSendable(request));
+ _groupMgr.broadcast(new SimpleBodySendable(request));
}
}
@@ -93,7 +94,7 @@ public class ClusteredQueue extends AMQQueue
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
BasicCancelBody request = new BasicCancelBody((byte)8, (byte)0);
request.consumerTag = getName();
- _groupMgr.broadcast(new SimpleSendable(request));
+ _groupMgr.broadcast(new SimpleBodySendable(request));
}
public void addRemoteSubcriber(MemberHandle peer)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
index 94f17cb9d3..8315d46b5d 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.server.cluster.GroupManager;
+import org.apache.qpid.server.cluster.SimpleBodySendable;
import org.apache.qpid.framing.QueueDeleteBody;
import java.util.concurrent.Executor;
@@ -60,6 +61,6 @@ public class PrivateQueue extends AMQQueue
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
QueueDeleteBody request = new QueueDeleteBody((byte)8, (byte)0);
request.queue = getName();
- _groupMgr.broadcast(new SimpleSendable(request));
+ _groupMgr.broadcast(new SimpleBodySendable(request));
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
index 752cf05a82..f8eba282e2 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
@@ -22,18 +22,13 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.cluster.ClusteredProtocolSession;
import org.apache.qpid.server.cluster.GroupManager;
-import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.cluster.MemberHandle;
import org.apache.qpid.server.cluster.SimpleSendable;
+import org.apache.qpid.server.cluster.util.LogMessage;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.Executor;
/**
@@ -93,17 +88,9 @@ public class RemoteQueueProxy extends AMQQueue
void relay(AMQMessage msg) throws AMQException
{
BasicPublishBody publish = msg.getPublishBody();
- ContentHeaderBody header = msg.getContentHeaderBody();
- List<ContentBody> bodies = msg.getContentBodies();
-
- //(i) construct a new publishing block:
- publish.immediate = false;//can't as yet handle the immediate flag in a cluster
- List<AMQBody> parts = new ArrayList<AMQBody>(2 + bodies.size());
- parts.add(publish);
- parts.add(header);
- parts.addAll(bodies);
+ publish.immediate = false; //can't as yet handle the immediate flag in a cluster
- //(ii) send this on to the broker for which it is acting as proxy:
- _groupMgr.send(_target, new SimpleSendable(parts));
+ // send this on to the broker for which it is acting as proxy:
+ _groupMgr.send(_target, new SimpleSendable(msg));
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index c751e4a011..ef5239fc87 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -55,7 +55,7 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
{
try
{
- _groupMgr.send(_peer, new SimpleSendable(msg.getPayload()));
+ _groupMgr.send(_peer, new SimpleSendable(msg));
}
catch (AMQException e)
{
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
index ed18710c64..20e092bde7 100644
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
@@ -45,7 +45,7 @@ public class BrokerTest extends TestCase
new RecordingBroker("C", 3)
};
GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(brokers)));
- GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
+ GroupRequest grpRequest = new GroupRequest(new SimpleBodySendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
for (Broker b : brokers)
{
b.invoke(grpRequest);
@@ -70,7 +70,7 @@ public class BrokerTest extends TestCase
RecordingBroker[] succeeded = new RecordingBroker[]{a, c};
GroupResponseValidator handler = new GroupResponseValidator(new TestMethod("response"), new ArrayList<Member>(Arrays.asList(succeeded)));
- GroupRequest grpRequest = new GroupRequest(new SimpleSendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
+ GroupRequest grpRequest = new GroupRequest(new SimpleBodySendable(new TestMethod("request")), StandardPolicies.SYNCH_POLICY, handler);
for (Broker broker : all)
{
@@ -99,7 +99,7 @@ public class BrokerTest extends TestCase
RecordingBroker broker = new RecordingBroker("myhost", 1);
for (AMQBody msg : msgs)
{
- broker.send(new SimpleSendable(msg), null);
+ broker.send(new SimpleBodySendable(msg), null);
}
List<AMQDataBlock> sent = broker.getMessages();
assertEquals(msgs.length, sent.size());
@@ -115,7 +115,7 @@ public class BrokerTest extends TestCase
{
RecordingBroker broker = new RecordingBroker("myhost", 1);
BlockingHandler handler = new BlockingHandler();
- broker.send(new SimpleSendable(new TestMethod("A")), handler);
+ broker.send(new SimpleBodySendable(new TestMethod("A")), handler);
List<AMQDataBlock> sent = broker.getMessages();
assertEquals(1, sent.size());
assertTrue(sent.get(0) instanceof AMQFrame);
@@ -131,7 +131,7 @@ public class BrokerTest extends TestCase
{
RecordingBroker broker = new RecordingBroker("myhost", 1);
BlockingHandler handler = new BlockingHandler();
- broker.send(new SimpleSendable(new TestMethod("A")), handler);
+ broker.send(new SimpleBodySendable(new TestMethod("A")), handler);
List<AMQDataBlock> sent = broker.getMessages();
assertEquals(1, sent.size());
assertTrue(sent.get(0) instanceof AMQFrame);