diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-02-27 13:35:38 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-02-27 13:35:38 +0000 |
| commit | 9f75e5deb5ef0ca80e77a4f11983bb6a44e10e50 (patch) | |
| tree | fd44664a749220744620db3d50fd6b40a13a10b2 /java/broker/src/main | |
| parent | 6561836eb77f51b8882be7d6f87c71fa536260d0 (diff) | |
| download | qpid-python-9f75e5deb5ef0ca80e77a4f11983bb6a44e10e50.tar.gz | |
QPID-1635,QPID-1636 : Moved additional properties from AMQMessage up to QueueEntry to allow processing whilst messasge has been flowed. Moved : _flags (for Immediate and delivered status), expiry, messageID. Created base class to maintain counts of data and objects in queue. Removed this responsibility from the AMQQueues and on to the QueueEntryLists. This will more easily allow the QEL structure to be flowed to disk at a later stage. Updated tests as a result of moves.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@748516 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
13 files changed, 447 insertions, 214 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 5bde27dba5..8dac12fe24 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -60,30 +60,6 @@ public interface AMQMessage //Check the status of this message - /** - * Called selectors to determin if the message has already been sent - * - * @return _deliveredToConsumer - */ - boolean getDeliveredToConsumer(); - - /** - * Called to enforce the 'immediate' flag. - * - * @returns true if the message is marked for immediate delivery but has not been marked as delivered - * to a consumer - */ - boolean immediateAndNotDelivered(); - - /** - * Checks to see if the message has expired. If it has the message is dequeued. - * - * @return true if the message has expire - * - * @throws org.apache.qpid.AMQException - */ - boolean expired() throws AMQException; - /** Is this a persistent message * * @return true if the message is persistent @@ -91,13 +67,8 @@ public interface AMQMessage boolean isPersistent(); - /** - * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). - * And for selector efficiency. - */ - void setDeliveredToConsumer(); + boolean isImmediate(); - void setExpiration(long expiration); void setClientIdentifier(AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier); @@ -121,4 +92,8 @@ public interface AMQMessage String toString(); String debugIdentity(); + + void setExpiration(long expiration); + + long getExpiration(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index 34a70c6969..ade780d0bb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -36,12 +36,12 @@ public class AMQPriorityQueue extends SimpleAMQQueue int priorities) throws AMQException { - super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities)); + super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueEntryList.Factory(priorities)); } public int getPriorities() { - return ((PriorityQueueList) _entries).getPriorities(); + return ((PriorityQueueEntryList) _entries).getPriorities(); } @Override diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java new file mode 100644 index 0000000000..72ea5f2667 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java @@ -0,0 +1,185 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.log4j.Logger; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This is an abstract base class to handle + */ +public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryList +{ + private static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class); + + private final AtomicInteger _atomicQueueCount = new AtomicInteger(0); + private final AtomicLong _atomicQueueSize = new AtomicLong(0L); + private final AtomicLong _atomicQueueInMemory = new AtomicLong(0L); + /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */ + + private long _memoryUsageMaximum = 0; + + /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */ + private long _memoryUsageMinimum = 0; + private AtomicBoolean _flowed; + private QueueBackingStore _backingStore; + protected AMQQueue _queue; + + FlowableBaseQueueEntryList(AMQQueue queue) + { + _queue = queue; + _flowed = new AtomicBoolean(false); + VirtualHost vhost = queue.getVirtualHost(); + if (vhost != null) + { + _backingStore = vhost.getQueueBackingStore(); + } + } + + public void setFlowed(boolean flowed) + { + if (_flowed.get() != flowed) + { + _log.info("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")"); + _flowed.set(flowed); + } + } + + public boolean isFlowed() + { + return _flowed.get(); + } + + public int size() + { + return _atomicQueueCount.get(); + } + + public long dataSize() + { + return _atomicQueueSize.get(); + } + + public long memoryUsed() + { + return _atomicQueueInMemory.get(); + } + + public void setMemoryUsageMaximum(long maximumMemoryUsage) + { + _memoryUsageMaximum = maximumMemoryUsage; + + // Don't attempt to start the inhaler/purger unless we have a minimum value specified. + if (_memoryUsageMaximum > 0) + { + // If we've increased the max memory above what we have in memory then we can inhale more + if (_memoryUsageMaximum > _atomicQueueInMemory.get()) + { + //TODO start inhaler + } + else // if we have now have to much memory in use we need to purge. + { + //TODO start purger + } + } + } + + public long getMemoryUsageMaximum() + { + return _memoryUsageMaximum; + } + + public void setMemoryUsageMinimum(long minimumMemoryUsage) + { + _memoryUsageMinimum = minimumMemoryUsage; + + // Don't attempt to start the inhaler unless we have a minimum value specified. + if (_memoryUsageMinimum > 0) + { + // If we've increased the minimum memory above what we have in memory then we need to inhale more + if (_memoryUsageMinimum >= _atomicQueueInMemory.get()) + { + //TODO start inhaler + } + } + } + + public long getMemoryUsageMinimum() + { + return _memoryUsageMinimum; + } + + protected boolean willCauseFlowToDisk(QueueEntryImpl queueEntry) + { + return _memoryUsageMaximum != 0 && memoryUsed() + queueEntry.getSize() > _memoryUsageMaximum; + } + + protected void incrementCounters(final QueueEntryImpl queueEntry) + { + _atomicQueueCount.incrementAndGet(); + _atomicQueueSize.addAndGet(queueEntry.getSize()); + if (!willCauseFlowToDisk(queueEntry)) + { + _atomicQueueInMemory.addAndGet(queueEntry.getSize()); + } + else + { + setFlowed(true); + flowingToDisk(queueEntry); + } + } + + /** + * Called when we are now flowing to disk + * @param queueEntry the entry that is being flowed to disk + */ + protected void flowingToDisk(QueueEntryImpl queueEntry) + { + try + { + queueEntry.flow(); + } + catch (UnableToFlowMessageException e) + { + _atomicQueueInMemory.addAndGet(queueEntry.getSize()); + } + } + + protected void dequeued(QueueEntryImpl queueEntry) + { + _atomicQueueCount.decrementAndGet(); + _atomicQueueSize.addAndGet(-queueEntry.getSize()); + if (!queueEntry.isFlowed()) + { + _atomicQueueInMemory.addAndGet(-queueEntry.getSize()); + } + } + + public QueueBackingStore getBackingStore() + { + return _backingStore; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java new file mode 100644 index 0000000000..4e95978bf8 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +public interface FlowableQueueEntryList +{ + void setFlowed(boolean flowed); + + boolean isFlowed(); + + int size(); + + long dataSize(); + + long memoryUsed(); + + void setMemoryUsageMaximum(long maximumMemoryUsage); + + long getMemoryUsageMaximum(); + + void setMemoryUsageMinimum(long minimumMemoryUsage); + + long getMemoryUsageMinimum(); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java index 7be2827e0f..d812b8ceca 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java @@ -21,17 +21,17 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.CommonContentHeaderProperties; -import org.apache.qpid.AMQException; -public class PriorityQueueList implements QueueEntryList +public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList { private final AMQQueue _queue; private final QueueEntryList[] _priorityLists; private final int _priorities; private final int _priorityOffset; - public PriorityQueueList(AMQQueue queue, int priorities) + public PriorityQueueEntryList(AMQQueue queue, int priorities) { + super(queue); _queue = queue; _priorityLists = new QueueEntryList[priorities]; _priorities = priorities; @@ -53,7 +53,7 @@ public class PriorityQueueList implements QueueEntryList } public QueueEntry add(AMQMessage message) - { + { int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; if(index >= _priorities) { @@ -154,7 +154,29 @@ public class PriorityQueueList implements QueueEntryList public QueueEntryList createQueueEntryList(AMQQueue queue) { - return new PriorityQueueList(queue, _priorities); + return new PriorityQueueEntryList(queue, _priorities); + } + } + + @Override + public int size() + { + int size=0; + for (QueueEntryList queueEntryList : _priorityLists) + { + size += queueEntryList.size(); } + + return size; + } + + + @Override + protected void flowingToDisk(QueueEntryImpl queueEntry) + { + //TODO this disables FTD for priority queues + // As the incomming message isn't always the one to purge. + // More logic is required up in the add() method here to determine if the + // incomming message is at the 'front' or not. } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 09600b9d28..25d41c8203 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -119,16 +119,42 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept final static EntryState EXPIRED_STATE = new ExpiredState(); final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState(); + /** Flag to indicate that this message requires 'immediate' delivery. */ + + final static byte IMMEDIATE = 0x01; + + /** + * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality + * for messages published with the 'immediate' flag. + */ + + final static byte DELIVERED_TO_CONSUMER = 0x02; + + AMQQueue getQueue(); AMQMessage getMessage(); long getSize(); + /** + * Called selectors to determin if the message has already been sent + * + * @return _deliveredToConsumer + */ boolean getDeliveredToConsumer(); + /** + * Checks to see if the message has expired. If it has the message is dequeued. + * + * @return true if the message has expire + * + * @throws org.apache.qpid.AMQException + */ boolean expired() throws AMQException; + public void setExpiration(final long expiration); + boolean isAcquired(); boolean acquire(); @@ -143,10 +169,22 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept void setDeliveredToSubscription(); + /** + * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). + * And for selector efficiency. + */ + public void setDeliveredToConsumer(); + void release(); String debugIdentity(); + /** + * Called to enforce the 'immediate' flag. + * + * @returns true if the message is marked for immediate delivery but has not been marked as delivered + * to a consumer + */ boolean immediateAndNotDelivered(); void setRedelivered(boolean b); @@ -180,4 +218,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept void addStateChangeListener(StateChangeListener listener); boolean removeStateChangeListener(StateChangeListener listener); -} + + void flow() throws UnableToFlowMessageException; + + void recover(); + + boolean isFlowed(); + +}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 911ed8321b..3d464d01d3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.HashSet; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.CopyOnWriteArraySet; @@ -78,6 +79,17 @@ public class QueueEntryImpl implements QueueEntry volatile QueueEntryImpl _next; + private long _messageSize; + private QueueBackingStore _backingStore; + private AtomicBoolean _flowed; + private Long _messageId; + + private byte _flags = 0; + + private long _expiration; + + private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); + QueueEntryImpl(SimpleQueueEntryList queueEntryList) { @@ -88,8 +100,7 @@ public class QueueEntryImpl implements QueueEntry public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId) { - _queueEntryList = queueEntryList; - _message = message; + this(queueEntryList,message); _entryIdUpdater.set(this, entryId); } @@ -98,6 +109,19 @@ public class QueueEntryImpl implements QueueEntry { _queueEntryList = queueEntryList; _message = message; + if (message != null) + { + _messageId = message.getMessageId(); + _messageSize = message.getSize(); + + if(message.isImmediate()) + { + _flags |= IMMEDIATE; + } + _expiration = message.getExpiration(); + } + _backingStore = queueEntryList.getBackingStore(); + _flowed = new AtomicBoolean(false); } protected void setEntryId(long entryId) @@ -122,17 +146,34 @@ public class QueueEntryImpl implements QueueEntry public long getSize() { - return getMessage().getSize(); + return _messageSize; } public boolean getDeliveredToConsumer() { - return getMessage().getDeliveredToConsumer(); + return (_flags & DELIVERED_TO_CONSUMER) != 0; + } + + public void setDeliveredToConsumer() + { + _flags |= DELIVERED_TO_CONSUMER; } public boolean expired() throws AMQException { - return getMessage().expired(); + if (_expiration != 0L) + { + long now = System.currentTimeMillis(); + + return (now > _expiration); + } + + return false; + } + + public void setExpiration(final long expiration) + { + _expiration = expiration; } public boolean isAcquired() @@ -169,7 +210,7 @@ public class QueueEntryImpl implements QueueEntry public void setDeliveredToSubscription() { - getMessage().setDeliveredToConsumer(); + _flags |= DELIVERED_TO_CONSUMER; } public void release() @@ -185,7 +226,7 @@ public class QueueEntryImpl implements QueueEntry public boolean immediateAndNotDelivered() { - return _message.immediateAndNotDelivered(); + return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; } public ContentHeaderBody getContentHeaderBody() throws AMQException @@ -206,8 +247,8 @@ public class QueueEntryImpl implements QueueEntry public void setRedelivered(boolean redelivered) { _redelivered = redelivered; - // todo - here we could mark this message as redelivered so we don't have to mark - // all messages on recover as redelivered. + // todo - here we could record this message as redelivered on this queue in the transactionLog + // so we don't have to mark all messages on recover as redelivered. } public Subscription getDeliveredSubscription() @@ -281,6 +322,8 @@ public class QueueEntryImpl implements QueueEntry s.restoreCredit(this); } + _queueEntryList.dequeued(this); + getQueue().dequeue(storeContext, this); if (_stateChangeListeners != null) @@ -337,6 +380,34 @@ public class QueueEntryImpl implements QueueEntry return false; } + public void flow() throws UnableToFlowMessageException + { + if (_message != null && _backingStore != null) + { + if(_log.isDebugEnabled()) + { + _log.debug("Flowing message:" + _message.debugIdentity()); + } + _backingStore.flow(_message); + _message = null; + _flowed.getAndSet(true); + } + } + + public void recover() + { + if (_messageId != null && _backingStore != null) + { + _message = _backingStore.recover(_messageId); + _flowed.getAndSet(false); + } + } + + public boolean isFlowed() + { + return _flowed.get(); + } + public int compareTo(final QueueEntry o) { @@ -382,7 +453,11 @@ public class QueueEntryImpl implements QueueEntry if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) { - _queueEntryList.advanceHead(); + _queueEntryList.advanceHead(); + if (_backingStore != null) + { + _backingStore.delete(_messageId); + } return true; } else @@ -395,4 +470,6 @@ public class QueueEntryImpl implements QueueEntry { return _queueEntryList; } + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 313e076f61..72783e3f78 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -public interface QueueEntryList +public interface QueueEntryList extends FlowableQueueEntryList { AMQQueue getQueue(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a4945bc11a..7f46a6063a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -72,12 +72,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); - private final AtomicInteger _atomicQueueCount = new AtomicInteger(0); - - private final AtomicLong _atomicQueueSize = new AtomicLong(0L); - - private final AtomicLong _atomicQueueInMemory = new AtomicLong(0L); - private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); protected final SubscriptionList _subscriptionList = new SubscriptionList(this); @@ -106,11 +100,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener /** the minimum interval between sending out consecutive alerts of the same type */ public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap(); - /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */ - private long _memoryUsageMaximum = 0; - - /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */ - private long _memoryUsageMinimum = 0; private static final int MAX_ASYNC_DELIVERIES = 10; @@ -120,8 +109,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private AtomicReference _asynchronousRunner = new AtomicReference(null); private AtomicInteger _deliveredMessages = new AtomicInteger(); private AtomicBoolean _stopped = new AtomicBoolean(false); - /** Control to determin if this queue is flowed or not. */ - protected AtomicBoolean _flowed = new AtomicBoolean(false); protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -168,13 +155,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } resetNotifications(); - resetFlowToDisk(); - } - - public void resetFlowToDisk() - { - setMemoryUsageMaximum(_memoryUsageMaximum); - setMemoryUsageMinimum(_memoryUsageMinimum); } public void resetNotifications() @@ -205,7 +185,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean isFlowed() { - return _flowed.get(); + return _entries.isFlowed(); } public AMQShortString getOwner() @@ -341,10 +321,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException { - - incrementQueueCount(); - incrementQueueSize(message); - _totalMessagesReceived.incrementAndGet(); QueueEntry entry; @@ -485,17 +461,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Simple Queues don't :-) } - private void incrementQueueSize(final AMQMessage message) - { - getAtomicQueueSize().addAndGet(message.getSize()); - getAtomicQueueInMemory().addAndGet(message.getSize()); - } - - private void incrementQueueCount() - { - getAtomicQueueCount().incrementAndGet(); - } - private void deliverMessage(final Subscription sub, final QueueEntry entry) throws AMQException { @@ -594,8 +559,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener */ public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException { - decrementQueueCount(); - decrementQueueSize(entry); if (entry.acquiredBySubscription()) { _deliveredMessages.decrementAndGet(); @@ -625,16 +588,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - private void decrementQueueSize(final QueueEntry entry) - { - getAtomicQueueSize().addAndGet(-entry.getMessage().getSize()); - getAtomicQueueInMemory().addAndGet(-entry.getMessage().getSize()); - } - - void decrementQueueCount() - { - getAtomicQueueCount().decrementAndGet(); - } public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException { @@ -682,17 +635,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public long getMemoryUsageCurrent() { - return getAtomicQueueInMemory().get(); + return getQueueInMemory(); } public int getMessageCount() { - return getAtomicQueueCount().get(); + return getQueueCount(); } public long getQueueDepth() { - return getAtomicQueueSize().get(); + return getQueueSize(); } public int getUndeliveredMessageCount() @@ -768,21 +721,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return _name.compareTo(o.getName()); } - public AtomicInteger getAtomicQueueCount() + public int getQueueCount() { - return _atomicQueueCount; + return _entries.size(); } - public AtomicLong getAtomicQueueSize() + public long getQueueSize() { - return _atomicQueueSize; + return _entries.dataSize(); } - public AtomicLong getAtomicQueueInMemory() + public long getQueueInMemory() { - return _atomicQueueInMemory; - } - + return _entries.memoryUsed(); + } private boolean isExclusiveSubscriber() { @@ -1493,46 +1445,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public long getMemoryUsageMaximum() { - return _memoryUsageMaximum; + return _entries.getMemoryUsageMaximum(); } public void setMemoryUsageMaximum(long maximumMemoryUsage) { - _memoryUsageMaximum = maximumMemoryUsage; - - // Don't attempt to start the inhaler/purger unless we have a minimum value specified. - if (_memoryUsageMaximum > 0) - { - // If we've increased the max memory above what we have in memory then we can inhale more - if (_memoryUsageMaximum > _atomicQueueInMemory.get()) - { - //TODO start inhaler - } - else // if we have now have to much memory in use we need to purge. - { - //TODO start purger - } - } + _entries.setMemoryUsageMaximum(maximumMemoryUsage); } public long getMemoryUsageMinimum() { - return _memoryUsageMinimum; + return _entries.getMemoryUsageMinimum(); } public void setMemoryUsageMinimum(long minimumMemoryUsage) { - _memoryUsageMinimum = minimumMemoryUsage; - - // Don't attempt to start the inhaler unless we have a minimum value specified. - if (_memoryUsageMinimum > 0) - { - // If we've increased the minimum memory above what we have in memory then we need to inhale more - if (_memoryUsageMinimum >= _atomicQueueInMemory.get()) - { - //TODO start inhaler - } - } + _entries.setMemoryUsageMinimum(minimumMemoryUsage); } public long getMinimumAlertRepeatGap() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index a46c5ae2e8..10abdd8318 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -1,6 +1,9 @@ package org.apache.qpid.server.queue; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /* * @@ -22,8 +25,9 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; * under the License. * */ -public class SimpleQueueEntryList implements QueueEntryList +public class SimpleQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList { + private final QueueEntryImpl _head; private volatile QueueEntryImpl _tail; @@ -41,12 +45,9 @@ public class SimpleQueueEntryList implements QueueEntryList AtomicReferenceFieldUpdater.newUpdater (QueueEntryImpl.class, QueueEntryImpl.class, "_next"); - - - - public SimpleQueueEntryList(AMQQueue queue) { + super(queue); _queue = queue; _head = new QueueEntryImpl(this); _tail = _head; @@ -77,6 +78,9 @@ public class SimpleQueueEntryList implements QueueEntryList public QueueEntry add(AMQMessage message) { QueueEntryImpl node = new QueueEntryImpl(this, message); + + incrementCounters(node); + for (;;) { QueueEntryImpl tail = _tail; @@ -101,12 +105,12 @@ public class SimpleQueueEntryList implements QueueEntryList } } + public QueueEntry next(QueueEntry node) { return ((QueueEntryImpl)node).getNext(); } - public class QueueEntryIteratorImpl implements QueueEntryIterator { @@ -172,7 +176,9 @@ public class SimpleQueueEntryList implements QueueEntryList { return new SimpleQueueEntryList(queue); } + } - + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java index 0334a54fab..4c9fe81439 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java @@ -54,23 +54,11 @@ public class TransientAMQMessage implements AMQMessage protected final Long _messageId; - /** Flag to indicate that this message requires 'immediate' delivery. */ - - private static final byte IMMEDIATE = 0x01; - - /** - * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality - * for messages published with the 'immediate' flag. - */ - - private static final byte DELIVERED_TO_CONSUMER = 0x02; private byte _flags = 0; - private long _expiration; - private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier; - private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); + private long _expiration; /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory @@ -165,11 +153,16 @@ public class TransientAMQMessage implements AMQMessage return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() +")"; } - public void setExpiration(final long expiration) + public void setExpiration(long expiration) { _expiration = expiration; } + public long getExpiration() + { + return _expiration; + } + public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) { return new BodyFrameIterator(protocolSession, channel); @@ -190,57 +183,6 @@ public class TransientAMQMessage implements AMQMessage return _messageId; } - /** - * Called selectors to determin if the message has already been sent - * - * @return _deliveredToConsumer - */ - public boolean getDeliveredToConsumer() - { - return (_flags & DELIVERED_TO_CONSUMER) != 0; - } - - /** - * Called to enforce the 'immediate' flag. - * - * @returns true if the message is marked for immediate delivery but has not been marked as delivered - * to a consumer - */ - public boolean immediateAndNotDelivered() - { - - return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; - - } - - /** - * Checks to see if the message has expired. If it has the message is dequeued. - * - * @return true if the message has expire - * - * @throws AMQException - */ - public boolean expired() throws AMQException - { - - if (_expiration != 0L) - { - long now = System.currentTimeMillis(); - - return (now > _expiration); - } - - return false; - } - - /** - * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). - * And for selector efficiency. - */ - public void setDeliveredToConsumer() - { - _flags |= DELIVERED_TO_CONSUMER; - } public long getSize() { @@ -315,6 +257,11 @@ public class TransientAMQMessage implements AMQMessage return false; } + public boolean isImmediate() + { + return _messagePublishInfo.isImmediate(); + } + /** * This is called when all the content has been received. * @@ -366,11 +313,6 @@ public class TransientAMQMessage implements AMQMessage { _contentBodies = Collections.EMPTY_LIST; } - - if (_messagePublishInfo.isImmediate()) - { - _flags |= IMMEDIATE; - } } public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index b4b392c91d..c5b6eeca3e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -48,6 +48,8 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.QueueBackingStore; +import org.apache.qpid.server.queue.FileQueueBackingStore; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.routing.RoutingTable; import org.apache.qpid.server.security.access.ACLManager; @@ -86,6 +88,7 @@ public class VirtualHost implements Accessable private final Timer _houseKeepingTimer; private VirtualHostConfiguration _configuration; + private QueueBackingStore _queueBackingStore; public void setAccessableName(String name) { @@ -113,6 +116,11 @@ public class VirtualHost implements Accessable return _configuration ; } + public QueueBackingStore getQueueBackingStore() + { + return _queueBackingStore; + } + /** * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any * implementaion of an Exchange MBean should extend this class. @@ -186,6 +194,9 @@ public class VirtualHost implements Accessable initialiseRoutingTable(hostConfig); } + _queueBackingStore = new FileQueueBackingStore(); + _queueBackingStore.configure(this,hostConfig); + _exchangeFactory.initialise(hostConfig); _exchangeRegistry.initialise(); diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java index d46ba85069..49afcb1340 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -352,7 +352,7 @@ public class Show extends AbstractCommand isredelivered.add(entry.isRedelivered() ? "true" : "false"); - isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false"); + isdelivered.add(entry.getDeliveredToConsumer() ? "true" : "false"); BasicContentHeaderProperties headers = null; |
