summaryrefslogtreecommitdiff
path: root/java/cluster
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-23 10:20:44 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-23 10:20:44 +0000
commitde248153d311b1e0211dfe3230afcb306f3c0192 (patch)
tree30412df8d5fd1d3ef076fba0903301b25f8a7518 /java/cluster
parentf74e4dc27d1655760d0213fd60cc75c272c26f00 (diff)
downloadqpid-python-de248153d311b1e0211dfe3230afcb306f3c0192.tar.gz
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription BROKER AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver. BasicRejectMethodHandler - initial place holder. TxRollbackHandler - Added comment AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue. AMQQueue - added the queue reference to the Subscription creation ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355 DeliveryManager - adjusted deliver call to allow delivery to the head of the queue. Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed. SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription. SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure. SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue. AMQStateManager - Added BasicRejectMethodHandler TransactionalContext - Added option to deliver the messages to the front of the queue. LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue. NonTransactionalContext - Added option to deliver the messages to the front of the queue. DeliverMessageOperation.java DELELTED AS NOT USED. CLIENT AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover. BasicMessageConsumer - updated the rollback so that it sends reject messages to server. AbstractJMSMessage - whitespace + added extra message properties to the toString() AMQProtocolHandler - whitespace + extra debug output TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on. CLUSTER ClusteredQueue - AMQQueue changes for message deliveryFirst. RemoteSubscriptionImpl - Implementation of Subscription SYSTESTS AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst. AMQQueueMBeanTest - changes for message deliveryFirst. ConcurrencyTest - changes for message deliveryFirst. DeliveryManagerTest - changes for message deliveryFirst. SubscriptionTestHelper - Implementation of Subscription WhiteSpace only UnacknowledgedMessageMapImpl.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java27
2 files changed, 28 insertions, 4 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index ecabd9320a..9fa96ece1e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -32,7 +32,6 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
/**
* Represents a shared queue in a cluster. The key difference is that as well as any
@@ -56,10 +55,10 @@ public class ClusteredQueue extends AMQQueue
}
- public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
+ public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
_logger.info(new LogMessage("{0} delivered to clustered queue {1}", msg, this));
- super.process(storeContext, msg);
+ super.process(storeContext, msg, deliverFirst);
}
protected void autodelete() throws AMQException
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index 364aea81c0..a5ace41752 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -117,7 +117,17 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
return null;
}
- public void enqueueForPreDelivery(AMQMessage msg)
+ public Queue<AMQMessage> getResendQueue()
+ {
+ return null;
+ }
+
+ public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ {
+ return messages;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
{
//no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
}
@@ -132,6 +142,11 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
//no-op
}
+ public boolean isClosed()
+ {
+ return false;
+ }
+
public boolean isBrowser()
{
return false;
@@ -142,4 +157,14 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
return _suspended;
}
+ public void addToResendQueue(AMQMessage msg)
+ {
+ //no-op
+ }
+
+ public Object getSendLock()
+ {
+ return new Object();
+ }
+
}