diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-02-23 10:20:44 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-23 10:20:44 +0000 |
| commit | de248153d311b1e0211dfe3230afcb306f3c0192 (patch) | |
| tree | 30412df8d5fd1d3ef076fba0903301b25f8a7518 /java/common/src | |
| parent | f74e4dc27d1655760d0213fd60cc75c272c26f00 (diff) | |
| download | qpid-python-de248153d311b1e0211dfe3230afcb306f3c0192.tar.gz | |
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages
QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription
BROKER
AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver.
BasicRejectMethodHandler - initial place holder.
TxRollbackHandler - Added comment
AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue.
AMQQueue - added the queue reference to the Subscription creation
ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355
DeliveryManager - adjusted deliver call to allow delivery to the head of the queue.
Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed.
SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription.
SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure.
SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue.
AMQStateManager - Added BasicRejectMethodHandler
TransactionalContext - Added option to deliver the messages to the front of the queue.
LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue.
NonTransactionalContext - Added option to deliver the messages to the front of the queue.
DeliverMessageOperation.java DELELTED AS NOT USED.
CLIENT
AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover.
BasicMessageConsumer - updated the rollback so that it sends reject messages to server.
AbstractJMSMessage - whitespace + added extra message properties to the toString()
AMQProtocolHandler - whitespace + extra debug output
TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on.
CLUSTER
ClusteredQueue - AMQQueue changes for message deliveryFirst.
RemoteSubscriptionImpl - Implementation of Subscription
SYSTESTS
AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst.
AMQQueueMBeanTest - changes for message deliveryFirst.
ConcurrencyTest - changes for message deliveryFirst.
DeliveryManagerTest - changes for message deliveryFirst.
SubscriptionTestHelper - Implementation of Subscription
WhiteSpace only
UnacknowledgedMessageMapImpl.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
| -rw-r--r-- | java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java | 197 | ||||
| -rw-r--r-- | java/common/src/main/java/org/apache/qpid/util/MessageQueue.java | 30 |
2 files changed, 227 insertions, 0 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java new file mode 100644 index 0000000000..cdf686b4cb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.util; + +import org.apache.log4j.Logger; + +import java.util.Queue; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicInteger; + +public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQueueAtomicSize<E> implements MessageQueue<E> +{ + private static final Logger _logger = Logger.getLogger(ConcurrentLinkedMessageQueueAtomicSize.class); + + protected Queue<E> _messageHead = new ConcurrentLinkedQueueAtomicSize<E>(); + + protected AtomicInteger _messageHeadSize = new AtomicInteger(0); + + @Override + public int size() + { + return super.size() + _messageHeadSize.get(); + } + + @Override + public E poll() + { + if (_messageHead.isEmpty()) + { + return super.poll(); + } + else + { + _logger.debug("Providing item from message head"); + + E e = _messageHead.poll(); + + if (e != null) + { + _messageHeadSize.decrementAndGet(); + } + + return e; + } + } + + @Override + public boolean remove(Object o) + { + + if (_messageHead.isEmpty()) + { + return super.remove(o); + } + else + { + if (_messageHead.remove(o)) + { + _messageHeadSize.decrementAndGet(); + return true; + } + + return super.remove(o); + } + } + + @Override + public boolean removeAll(Collection<?> c) + { + if (_messageHead.isEmpty()) + { + return super.removeAll(c); + } + else + { + //fixme this is super.removeAll but iterator here doesn't work + // we need to be able to correctly decrement _messageHeadSize +// boolean modified = false; +// Iterator<?> e = iterator(); +// while (e.hasNext()) +// { +// if (c.contains(e.next())) +// { +// e.remove(); +// modified = true; +// _size.decrementAndGet(); +// } +// } +// return modified; + + throw new RuntimeException("Not implemented"); + } + } + + + @Override + public boolean isEmpty() + { + return (_messageHead.isEmpty() && super.isEmpty()); + } + + @Override + public void clear() + { + super.clear(); + _messageHead.clear(); + } + + @Override + public boolean contains(Object o) + { + return _messageHead.contains(o) || super.contains(o); + } + + @Override + public boolean containsAll(Collection<?> o) + { + return _messageHead.containsAll(o) || super.containsAll(o); + } + + @Override + public E element() + { + if (_messageHead.isEmpty()) + { + return super.element(); + } + else + { + return _messageHead.element(); + } + } + + @Override + public E peek() + { + if (_messageHead.isEmpty()) + { + return super.peek(); + } + else + { + _logger.debug("Providing item from message head"); + return _messageHead.peek(); + } + + } + + @Override + public Iterator<E> iterator() + { + throw new RuntimeException("Not Implemented"); + + } + + @Override + public boolean retainAll(Collection<?> c) + { + throw new RuntimeException("Not Implemented"); + } + + @Override + public Object[] toArray() + { + throw new RuntimeException("Not Implemented"); + } + + public boolean pushHead(E o) + { + _logger.debug("Adding item to head of queue"); + if (_messageHead.offer(o)) + { + _messageHeadSize.incrementAndGet(); + return true; + } + return false; + } +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java b/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java new file mode 100644 index 0000000000..9cf3319374 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/util/MessageQueue.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.util; + +import java.util.Queue; + +public interface MessageQueue<E> extends Queue<E> +{ + + boolean pushHead(E o); + +} |
