summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-03-06 12:27:49 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-03-06 12:27:49 +0000
commitfff143ba920066a1f127c4e69ed5f3c145f73443 (patch)
tree173a90c473b92389a9416bcfbec65ff696df1ed9 /qpid/java
parentc8f5fbf155a5f85ec09b78fd108660f0d0fef573 (diff)
downloadqpid-python-fff143ba920066a1f127c4e69ed5f3c145f73443.tar.gz
Style Changes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@750871 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java119
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java11
2 files changed, 61 insertions, 69 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 32abe3e1cf..c510ec3374 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -20,26 +20,23 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.log4j.Logger;
-import java.util.Set;
import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
-
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class QueueEntryImpl implements QueueEntry
{
- /**
- * Used for debugging purposes.
- */
+ /** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
private final SimpleQueueEntryList _queueEntryList;
@@ -53,27 +50,24 @@ public class QueueEntryImpl implements QueueEntry
private volatile EntryState _state = AVAILABLE_STATE;
private static final
- AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
+ AtomicReferenceFieldUpdater<QueueEntryImpl, EntryState>
_stateUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (QueueEntryImpl.class, EntryState.class, "_state");
-
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueEntryImpl.class, EntryState.class, "_state");
private volatile Set<StateChangeListener> _stateChangeListeners;
private static final
- AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
- _listenersUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
-
+ AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
+ _listenersUpdater =
+ AtomicReferenceFieldUpdater.newUpdater
+ (QueueEntryImpl.class, Set.class, "_stateChangeListeners");
private static final
- AtomicLongFieldUpdater<QueueEntryImpl>
+ AtomicLongFieldUpdater<QueueEntryImpl>
_entryIdUpdater =
- AtomicLongFieldUpdater.newUpdater
- (QueueEntryImpl.class, "_entryId");
-
+ AtomicLongFieldUpdater.newUpdater
+ (QueueEntryImpl.class, "_entryId");
private volatile long _entryId;
@@ -90,17 +84,15 @@ public class QueueEntryImpl implements QueueEntry
private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
-
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
{
- this(queueEntryList,null,Long.MIN_VALUE);
+ this(queueEntryList, null, Long.MIN_VALUE);
_state = DELETED_STATE;
}
-
public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
{
- this(queueEntryList,message);
+ this(queueEntryList, message);
_entryIdUpdater.set(this, entryId);
}
@@ -113,8 +105,8 @@ public class QueueEntryImpl implements QueueEntry
{
_messageId = message.getMessageId();
_messageSize = message.getSize();
-
- if(message.isImmediate())
+
+ if (message.isImmediate())
{
_flags |= IMMEDIATE;
}
@@ -193,8 +185,8 @@ public class QueueEntryImpl implements QueueEntry
private boolean acquire(final EntryState state)
{
- boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
- if(acquired && _stateChangeListeners != null)
+ boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state);
+ if (acquired && _stateChangeListeners != null)
{
notifyStateChange(State.AVAILABLE, State.ACQUIRED);
}
@@ -220,24 +212,23 @@ public class QueueEntryImpl implements QueueEntry
public void release()
{
- _stateUpdater.set(this,AVAILABLE_STATE);
+ _stateUpdater.set(this, AVAILABLE_STATE);
}
public String debugIdentity()
{
- String entry="[State:"+_state.getState().name()+"]";
+ String entry = "[State:" + _state.getState().name() + "]";
if (_message == null)
{
- return entry+"(Message Unloaded ID:" + _messageId +")";
+ return entry + "(Message Unloaded ID:" + _messageId + ")";
}
else
{
- return entry+_message.debugIdentity();
+ return entry + _message.debugIdentity();
}
}
-
- public boolean immediateAndNotDelivered()
+ public boolean immediateAndNotDelivered()
{
return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
}
@@ -266,15 +257,15 @@ public class QueueEntryImpl implements QueueEntry
public Subscription getDeliveredSubscription()
{
- EntryState state = _state;
- if (state instanceof SubscriptionAcquiredState)
- {
- return ((SubscriptionAcquiredState) state).getSubscription();
- }
- else
- {
- return null;
- }
+ EntryState state = _state;
+ if (state instanceof SubscriptionAcquiredState)
+ {
+ return ((SubscriptionAcquiredState) state).getSubscription();
+ }
+ else
+ {
+ return null;
+ }
}
@@ -301,7 +292,7 @@ public class QueueEntryImpl implements QueueEntry
}
public boolean isRejectedBy(Subscription subscription)
- {
+ {
if (_rejectedBy != null) // We have subscriptions that rejected this message
{
@@ -313,11 +304,10 @@ public class QueueEntryImpl implements QueueEntry
}
}
-
public void requeue(final StoreContext storeContext) throws AMQException
{
getQueue().requeue(storeContext, this);
- if(_stateChangeListeners != null)
+ if (_stateChangeListeners != null)
{
notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
}
@@ -327,7 +317,7 @@ public class QueueEntryImpl implements QueueEntry
{
EntryState state = _state;
- if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
+ if ((state.getState() == State.ACQUIRED) && _stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
if (state instanceof SubscriptionAcquiredState)
{
@@ -348,7 +338,7 @@ public class QueueEntryImpl implements QueueEntry
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener l : _stateChangeListeners)
+ for (StateChangeListener l : _stateChangeListeners)
{
l.stateChanged(this, oldState, newState);
}
@@ -373,7 +363,7 @@ public class QueueEntryImpl implements QueueEntry
public void addStateChangeListener(StateChangeListener listener)
{
Set<StateChangeListener> listeners = _stateChangeListeners;
- if(listeners == null)
+ if (listeners == null)
{
_listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
listeners = _stateChangeListeners;
@@ -385,7 +375,7 @@ public class QueueEntryImpl implements QueueEntry
public boolean removeStateChangeListener(StateChangeListener listener)
{
Set<StateChangeListener> listeners = _stateChangeListeners;
- if(listeners != null)
+ if (listeners != null)
{
return listeners.remove(listener);
}
@@ -402,21 +392,23 @@ public class QueueEntryImpl implements QueueEntry
{
_backingStore.unload(_message);
- if(_log.isDebugEnabled())
+ if (_log.isDebugEnabled())
{
_log.debug("Unloaded:" + debugIdentity());
}
-
_message = null;
- //Update the memoryState if this load call resulted in the message being purged from memory
+
+ //Update the memoryState if this load call resulted in the message being purged from memory
if (!_flowed.getAndSet(true))
{
_queueEntryList.entryUnloadedUpdateMemory(this);
}
- } catch (UnableToFlowMessageException utfme) {
+ }
+ catch (UnableToFlowMessageException utfme)
+ {
// There is no recovery needed as the memory states remain unchanged.
- if(_log.isDebugEnabled())
+ if (_log.isDebugEnabled())
{
_log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage());
}
@@ -430,7 +422,7 @@ public class QueueEntryImpl implements QueueEntry
{
_message = _backingStore.load(_messageId);
- if(_log.isDebugEnabled())
+ if (_log.isDebugEnabled())
{
_log.debug("Loaded:" + debugIdentity());
}
@@ -448,10 +440,9 @@ public class QueueEntryImpl implements QueueEntry
return _flowed.get();
}
-
public int compareTo(final QueueEntry o)
{
- QueueEntryImpl other = (QueueEntryImpl)o;
+ QueueEntryImpl other = (QueueEntryImpl) o;
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
@@ -459,13 +450,13 @@ public class QueueEntryImpl implements QueueEntry
{
QueueEntryImpl next = nextNode();
- while(next != null && next.isDeleted())
+ while (next != null && next.isDeleted())
{
final QueueEntryImpl newNext = next.nextNode();
- if(newNext != null)
+ if (newNext != null)
{
- SimpleQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
+ SimpleQueueEntryList._nextUpdater.compareAndSet(this, next, newNext);
next = nextNode();
}
else
@@ -491,7 +482,7 @@ public class QueueEntryImpl implements QueueEntry
{
EntryState state = _state;
- if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
+ if (state != DELETED_STATE && _stateUpdater.compareAndSet(this, state, DELETED_STATE))
{
_queueEntryList.advanceHead();
if (_backingStore != null)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 86d1948f20..f39dfe765e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -415,7 +415,7 @@ public class SimpleAMQQueueTest extends TestCase
//Check the queue is still within it's limits.
long current = _queue.getMemoryUsageCurrent();
- assertTrue("Queue has gone over quota:" + current+"/"+_queue.getMemoryUsageMaximum() ,
+ assertTrue("Queue has gone over quota:" + current + "/" + _queue.getMemoryUsageMaximum(),
current <= _queue.getMemoryUsageMaximum());
assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
@@ -428,14 +428,14 @@ public class SimpleAMQQueueTest extends TestCase
}
}
- public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
+ public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
{
// Create IncomingMessage and nondurable queue
NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
MESSAGE_SIZE = 1;
/** Set to larger than the purge batch size. Default 100.
- * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */
+ * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */
long MEMORY_MAX = 500;
int MESSAGE_COUNT = (int) MEMORY_MAX;
//Set the Memory Usage to be very low
@@ -454,7 +454,7 @@ public class SimpleAMQQueueTest extends TestCase
// Send anothe and ensure we are flowed
sendMessage(txnContext);
assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
- assertEquals(MESSAGE_COUNT , _queue.getMemoryUsageCurrent());
+ assertEquals(MESSAGE_COUNT, _queue.getMemoryUsageCurrent());
assertTrue("Queue is not flowed.", _queue.isFlowed());
_queue.setMemoryUsageMaximum(0L);
@@ -469,7 +469,7 @@ public class SimpleAMQQueueTest extends TestCase
}
assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
- assertEquals(0L , _queue.getMemoryUsageCurrent());
+ assertEquals(0L, _queue.getMemoryUsageCurrent());
assertTrue("Queue is not flowed.", _queue.isFlowed());
}
@@ -511,6 +511,7 @@ public class SimpleAMQQueueTest extends TestCase
assertNotNull(data);
}
+
// FIXME: move this to somewhere useful
private static AMQMessage createMessage(final MessagePublishInfo publishBody)
{