diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-03-06 12:27:49 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-03-06 12:27:49 +0000 |
| commit | fff143ba920066a1f127c4e69ed5f3c145f73443 (patch) | |
| tree | 173a90c473b92389a9416bcfbec65ff696df1ed9 /qpid/java | |
| parent | c8f5fbf155a5f85ec09b78fd108660f0d0fef573 (diff) | |
| download | qpid-python-fff143ba920066a1f127c4e69ed5f3c145f73443.tar.gz | |
Style Changes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@750871 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java | 119 | ||||
| -rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java | 11 |
2 files changed, 61 insertions, 69 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 32abe3e1cf..c510ec3374 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -20,26 +20,23 @@ */ package org.apache.qpid.server.queue; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; -import org.apache.log4j.Logger; -import java.util.Set; import java.util.HashSet; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; - +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; public class QueueEntryImpl implements QueueEntry { - /** - * Used for debugging purposes. - */ + /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(QueueEntryImpl.class); private final SimpleQueueEntryList _queueEntryList; @@ -53,27 +50,24 @@ public class QueueEntryImpl implements QueueEntry private volatile EntryState _state = AVAILABLE_STATE; private static final - AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState> + AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState> _stateUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueEntryImpl.class, EntryState.class, "_state"); - + AtomicReferenceFieldUpdater.newUpdater + (QueueEntryImpl.class, EntryState.class, "_state"); private volatile Set<StateChangeListener> _stateChangeListeners; private static final - AtomicReferenceFieldUpdater<QueueEntryImpl, Set> - _listenersUpdater = - AtomicReferenceFieldUpdater.newUpdater - (QueueEntryImpl.class, Set.class, "_stateChangeListeners"); - + AtomicReferenceFieldUpdater<QueueEntryImpl, Set> + _listenersUpdater = + AtomicReferenceFieldUpdater.newUpdater + (QueueEntryImpl.class, Set.class, "_stateChangeListeners"); private static final - AtomicLongFieldUpdater<QueueEntryImpl> + AtomicLongFieldUpdater<QueueEntryImpl> _entryIdUpdater = - AtomicLongFieldUpdater.newUpdater - (QueueEntryImpl.class, "_entryId"); - + AtomicLongFieldUpdater.newUpdater + (QueueEntryImpl.class, "_entryId"); private volatile long _entryId; @@ -90,17 +84,15 @@ public class QueueEntryImpl implements QueueEntry private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); - QueueEntryImpl(SimpleQueueEntryList queueEntryList) { - this(queueEntryList,null,Long.MIN_VALUE); + this(queueEntryList, null, Long.MIN_VALUE); _state = DELETED_STATE; } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId) { - this(queueEntryList,message); + this(queueEntryList, message); _entryIdUpdater.set(this, entryId); } @@ -113,8 +105,8 @@ public class QueueEntryImpl implements QueueEntry { _messageId = message.getMessageId(); _messageSize = message.getSize(); - - if(message.isImmediate()) + + if (message.isImmediate()) { _flags |= IMMEDIATE; } @@ -193,8 +185,8 @@ public class QueueEntryImpl implements QueueEntry private boolean acquire(final EntryState state) { - boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state); - if(acquired && _stateChangeListeners != null) + boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state); + if (acquired && _stateChangeListeners != null) { notifyStateChange(State.AVAILABLE, State.ACQUIRED); } @@ -220,24 +212,23 @@ public class QueueEntryImpl implements QueueEntry public void release() { - _stateUpdater.set(this,AVAILABLE_STATE); + _stateUpdater.set(this, AVAILABLE_STATE); } public String debugIdentity() { - String entry="[State:"+_state.getState().name()+"]"; + String entry = "[State:" + _state.getState().name() + "]"; if (_message == null) { - return entry+"(Message Unloaded ID:" + _messageId +")"; + return entry + "(Message Unloaded ID:" + _messageId + ")"; } else { - return entry+_message.debugIdentity(); + return entry + _message.debugIdentity(); } } - - public boolean immediateAndNotDelivered() + public boolean immediateAndNotDelivered() { return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; } @@ -266,15 +257,15 @@ public class QueueEntryImpl implements QueueEntry public Subscription getDeliveredSubscription() { - EntryState state = _state; - if (state instanceof SubscriptionAcquiredState) - { - return ((SubscriptionAcquiredState) state).getSubscription(); - } - else - { - return null; - } + EntryState state = _state; + if (state instanceof SubscriptionAcquiredState) + { + return ((SubscriptionAcquiredState) state).getSubscription(); + } + else + { + return null; + } } @@ -301,7 +292,7 @@ public class QueueEntryImpl implements QueueEntry } public boolean isRejectedBy(Subscription subscription) - { + { if (_rejectedBy != null) // We have subscriptions that rejected this message { @@ -313,11 +304,10 @@ public class QueueEntryImpl implements QueueEntry } } - public void requeue(final StoreContext storeContext) throws AMQException { getQueue().requeue(storeContext, this); - if(_stateChangeListeners != null) + if (_stateChangeListeners != null) { notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE); } @@ -327,7 +317,7 @@ public class QueueEntryImpl implements QueueEntry { EntryState state = _state; - if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) + if ((state.getState() == State.ACQUIRED) && _stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { if (state instanceof SubscriptionAcquiredState) { @@ -348,7 +338,7 @@ public class QueueEntryImpl implements QueueEntry private void notifyStateChange(final State oldState, final State newState) { - for(StateChangeListener l : _stateChangeListeners) + for (StateChangeListener l : _stateChangeListeners) { l.stateChanged(this, oldState, newState); } @@ -373,7 +363,7 @@ public class QueueEntryImpl implements QueueEntry public void addStateChangeListener(StateChangeListener listener) { Set<StateChangeListener> listeners = _stateChangeListeners; - if(listeners == null) + if (listeners == null) { _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>()); listeners = _stateChangeListeners; @@ -385,7 +375,7 @@ public class QueueEntryImpl implements QueueEntry public boolean removeStateChangeListener(StateChangeListener listener) { Set<StateChangeListener> listeners = _stateChangeListeners; - if(listeners != null) + if (listeners != null) { return listeners.remove(listener); } @@ -402,21 +392,23 @@ public class QueueEntryImpl implements QueueEntry { _backingStore.unload(_message); - if(_log.isDebugEnabled()) + if (_log.isDebugEnabled()) { _log.debug("Unloaded:" + debugIdentity()); } - _message = null; - //Update the memoryState if this load call resulted in the message being purged from memory + + //Update the memoryState if this load call resulted in the message being purged from memory if (!_flowed.getAndSet(true)) { _queueEntryList.entryUnloadedUpdateMemory(this); } - } catch (UnableToFlowMessageException utfme) { + } + catch (UnableToFlowMessageException utfme) + { // There is no recovery needed as the memory states remain unchanged. - if(_log.isDebugEnabled()) + if (_log.isDebugEnabled()) { _log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage()); } @@ -430,7 +422,7 @@ public class QueueEntryImpl implements QueueEntry { _message = _backingStore.load(_messageId); - if(_log.isDebugEnabled()) + if (_log.isDebugEnabled()) { _log.debug("Loaded:" + debugIdentity()); } @@ -448,10 +440,9 @@ public class QueueEntryImpl implements QueueEntry return _flowed.get(); } - public int compareTo(final QueueEntry o) { - QueueEntryImpl other = (QueueEntryImpl)o; + QueueEntryImpl other = (QueueEntryImpl) o; return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0; } @@ -459,13 +450,13 @@ public class QueueEntryImpl implements QueueEntry { QueueEntryImpl next = nextNode(); - while(next != null && next.isDeleted()) + while (next != null && next.isDeleted()) { final QueueEntryImpl newNext = next.nextNode(); - if(newNext != null) + if (newNext != null) { - SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext); + SimpleQueueEntryList._nextUpdater.compareAndSet(this, next, newNext); next = nextNode(); } else @@ -491,7 +482,7 @@ public class QueueEntryImpl implements QueueEntry { EntryState state = _state; - if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) + if (state != DELETED_STATE && _stateUpdater.compareAndSet(this, state, DELETED_STATE)) { _queueEntryList.advanceHead(); if (_backingStore != null) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 86d1948f20..f39dfe765e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -415,7 +415,7 @@ public class SimpleAMQQueueTest extends TestCase //Check the queue is still within it's limits. long current = _queue.getMemoryUsageCurrent(); - assertTrue("Queue has gone over quota:" + current+"/"+_queue.getMemoryUsageMaximum() , + assertTrue("Queue has gone over quota:" + current + "/" + _queue.getMemoryUsageMaximum(), current <= _queue.getMemoryUsageMaximum()); assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0); @@ -428,14 +428,14 @@ public class SimpleAMQQueueTest extends TestCase } } - public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException + public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException { // Create IncomingMessage and nondurable queue NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null); MESSAGE_SIZE = 1; /** Set to larger than the purge batch size. Default 100. - * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */ + * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */ long MEMORY_MAX = 500; int MESSAGE_COUNT = (int) MEMORY_MAX; //Set the Memory Usage to be very low @@ -454,7 +454,7 @@ public class SimpleAMQQueueTest extends TestCase // Send anothe and ensure we are flowed sendMessage(txnContext); assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); - assertEquals(MESSAGE_COUNT , _queue.getMemoryUsageCurrent()); + assertEquals(MESSAGE_COUNT, _queue.getMemoryUsageCurrent()); assertTrue("Queue is not flowed.", _queue.isFlowed()); _queue.setMemoryUsageMaximum(0L); @@ -469,7 +469,7 @@ public class SimpleAMQQueueTest extends TestCase } assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount()); - assertEquals(0L , _queue.getMemoryUsageCurrent()); + assertEquals(0L, _queue.getMemoryUsageCurrent()); assertTrue("Queue is not flowed.", _queue.isFlowed()); } @@ -511,6 +511,7 @@ public class SimpleAMQQueueTest extends TestCase assertNotNull(data); } + // FIXME: move this to somewhere useful private static AMQMessage createMessage(final MessagePublishInfo publishBody) { |
