summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-03-13 10:35:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-03-13 10:35:42 +0000
commitc4f018f7c10d2169ced4c59e776844ee5cf52d33 (patch)
tree421d85cf41bf382bab618587298d9d6bef825bdc /java/common/src
parent685ad5615e73f02f76f69841162fb9aa126892d2 (diff)
downloadqpid-python-c4f018f7c10d2169ced4c59e776844ee5cf52d33.tar.gz
QPID-346,QPID-386,QPID-403, QPID-410 Rollback, Basic-Reject, QueueBrowser NO_ACK.
QPID-346 Message loss after rollback\recover QPID-386 Updated Transactional Tests to cover underlying AMQP/Qpid state. QPID-403 Implement Basic.Reject QPID-410 Queue Browsers should use not acknowledge messages. ------------------------------------- Broker TxAck - Added comment and fixed white space UnacknowledgedMessage - Added comment for messageDecrement AMQChannel - Added extra debugging. + Created a NonTransactionalContext for requeuing messages as using txContext will tie the requeue to any runing transaction. + Updated message reference counting. So it is in terms of queues don't increment when giving to client. BasicCancelMethodHandler - Added Debug log. BasicConsumeMethodHandler - Reverted to directly writes frames to the session, throwing ChannelException caused problems. Added Trace and debug logging. BasicRejectMethodHandler, ChannelCloseHandler, ConnectionCloseMethodHandler - Added Debug logging AMQPFastProtocolHandler - moved error log to before session.write AMQMessage - Added additional debug via debugIdentity() and comments AMQQueue - Decoupled reference counting from dequeue operation. ConcurrentSelectorDeliveryManager - Added comments and increased info in debug logging SubscriptionImpl - Disabled use of acks for browsers. For now put setDeliveredToConsumer back in the finally block. commented that I'm not sure this is correct as even an error writing to client will cause msg to be marked delivered to consumer. + On Close ensured that it is only called once. + Had problem where closing browser was causing two CancelOk frames to be sent back to client. RequiredDeliveryException - Added comment to explain incrementReference LocalTransactionalContext - Commented out incrementReference as it shouldn't be required here. NonTransactionalContext - Removed incrementReference on deliver + - Fixed bug where browsers - acks would cause messages to be discarded. new JIRA this needs tidied up. TxnBuffer - Added debug logging. Client ------ AMQQueueBrowser - Added comments AMQSession - Added comments and debug + Updated to cause closed consumer to reject messages rather than receive them. + Prevented NoConsumer's from rollingback and rejecting.. they simply clear their SyncQueue - JIRA to ensure clean state with rollback BasicMessageConsumer - Added trace level debuging on close calls + Forced noConsume-rs to use NO_ACK + added more logging Closeable - Updated to use isClosed rather than directly calling _closed.get() to aid in future work on ensuring multi threaded close still allows pending acks to be processed first. ChannelCloseOkMethodHandler - updated comment AMQProtocolSession - Update comments,whitespace TransportConnection - removed static block FlowControllingBlockingQueue - Added isEmpty() Method PropertyValueTest - Added VM Broker setup + Updated test to run once and 50 times to pull out delivery tag problems that were occuring. + Adjusted logging level to be more helpful. moved some info down to trace and debug. MessageRequeueTest - Moved QpidClientConnection its own file. + Fixed it so it actually runs more than one consumer, concurrently.Now 3 was 1. ConcurrentLinkedMessageQueueAtomicSize - Implemented iterator(). Added QueueBrowserTest to system tests to test QueueBrowsering. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@517638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java31
1 files changed, 30 insertions, 1 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
index 883d5018cd..4636f44795 100644
--- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
+++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
@@ -181,8 +181,37 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ
@Override
public Iterator<E> iterator()
{
- throw new RuntimeException("Not Implemented");
+ final Iterator<E> mainMessageIterator = super.iterator();
+ return new Iterator<E>()
+ {
+ final Iterator<E> _headIterator = _messageHead.iterator();
+ final Iterator<E> _mainIterator = mainMessageIterator;
+
+ Iterator<E> last;
+
+ public boolean hasNext()
+ {
+ return _headIterator.hasNext() || _mainIterator.hasNext();
+ }
+ public E next()
+ {
+ if (_headIterator.hasNext())
+ {
+ last = _headIterator;
+ return _headIterator.next();
+ }
+ else
+ {
+ last = _mainIterator;
+ return _mainIterator.next();
+ }
+ }
+ public void remove()
+ {
+ last.remove();
+ }
+ };
}
@Override