diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2006-09-27 09:09:42 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2006-09-27 09:09:42 +0000 |
| commit | 1ba25d9231401f2f34ee41893d402e3cb2f299ed (patch) | |
| tree | f0effe6896595dd38c0bbc60350b522c5416f480 /java/broker | |
| parent | 6ce702dfb4ea0e1835804efd328be2eee79e23b3 (diff) | |
| download | qpid-python-1ba25d9231401f2f34ee41893d402e3cb2f299ed.tar.gz | |
AMQProtocolSession.java - white space changes
BasicMessageProducer.java - white space changes
BasicMessageConsumer.java - white space changes
AMQSession.java - added a comment
MemoryMessageStore.java - white space changes
SubscriptionImpl.java AMQChannel.java - Removed race condition where two messages could get the same delivery tag and when using acks where messages can be added to the UnackMap out of sequence, Causing unknown message to ack exceptions.
DestNameExchange.java - white space/style changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450384 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
4 files changed, 90 insertions, 52 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java index 8dc4626c46..d4226c42aa 100644 --- a/java/broker/src/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java @@ -23,26 +23,28 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.exchange.MessageRouter; +import org.apache.qpid.server.management.DefaultManagedObject; +import org.apache.qpid.server.management.Managable; +import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.txn.TxnOp; -import org.apache.qpid.server.management.Managable; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.management.DefaultManagedObject; -import javax.management.ObjectName; -import javax.management.MalformedObjectNameException; import javax.management.JMException; import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + public class AMQChannel implements Managable { @@ -62,7 +64,7 @@ public class AMQChannel implements Managable * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that * value of this represents the <b>last</b> tag sent out */ - private long _deliveryTag; + private AtomicLong _deliveryTag = new AtomicLong(0); /** * A channel has a default queue (the last declared) that is used when no queue name is @@ -74,7 +76,7 @@ public class AMQChannel implements Managable * This tag is unique per subscription to a queue. The server returns this in response to a * basic.consume request. */ - private int _consumerTag = 0; + private int _consumerTag; /** * The current message - which may be partial in the sense that not all frames have been received yet - @@ -150,7 +152,7 @@ public class AMQChannel implements Managable _txnBuffer.commit(); } } - catch(AMQException ex) + catch (AMQException ex) { throw new MBeanException(ex, ex.toString()); } @@ -160,13 +162,13 @@ public class AMQChannel implements Managable { if (_transactional) { - synchronized (_txnBuffer) + synchronized(_txnBuffer) { try { _txnBuffer.rollback(); } - catch(AMQException ex) + catch (AMQException ex) { throw new MBeanException(ex, ex.toString()); } @@ -201,7 +203,7 @@ public class AMQChannel implements Managable } public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) - throws AMQException + throws AMQException { _channelId = channelId; _channelName = _channelId + "-" + this.hashCode(); @@ -300,7 +302,7 @@ public class AMQChannel implements Managable public long getNextDeliveryTag() { - return ++_deliveryTag; + return _deliveryTag.incrementAndGet(); } public int getNextConsumerTag() @@ -348,7 +350,7 @@ public class AMQChannel implements Managable else { throw new AMQException(_log, "Consumer tag " + consumerTag + " not known to channel " + - _channelId); + _channelId); } } @@ -361,7 +363,7 @@ public class AMQChannel implements Managable { if (_transactional) { - synchronized (_txnBuffer) + synchronized(_txnBuffer) { _txnBuffer.rollback();//releases messages } @@ -390,7 +392,7 @@ public class AMQChannel implements Managable */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue) { - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag)); checkSuspension(); @@ -405,7 +407,7 @@ public class AMQChannel implements Managable { // we must create a new map since all the messages will get a new delivery tag when they are redelivered Map<Long, UnacknowledgedMessage> currentList; - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { currentList = _unacknowledgedMessageMap; _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); @@ -426,7 +428,7 @@ public class AMQChannel implements Managable public void resend(AMQProtocolSession session) { //messages go to this channel - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet()) { @@ -449,7 +451,7 @@ public class AMQChannel implements Managable */ public void queueDeleted(AMQQueue queue) { - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet()) { @@ -465,13 +467,25 @@ public class AMQChannel implements Managable catch (AMQException e) { _log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " + - e, e); + e, e); } } } } } + public synchronized long prepareNewMessageForDelivery(boolean acks, AMQMessage msg, String consumerTag, AMQQueue queue) + { + long deliveryTag = getNextDeliveryTag(); + + if (acks) + { + addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } + + return deliveryTag; + } + /** * Acknowledge one or more messages. * @@ -498,7 +512,7 @@ public class AMQChannel implements Managable if (multiple) { LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { if (deliveryTag == 0) { @@ -514,10 +528,20 @@ public class AMQChannel implements Managable throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); } Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<Long, UnacknowledgedMessage> unacked = i.next(); + + if (unacked.getKey() > deliveryTag) + { + //This should not occur now. + throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString()); + } + i.remove(); + acked.add(unacked.getValue()); if (unacked.getKey() == deliveryTag) { @@ -525,11 +549,12 @@ public class AMQChannel implements Managable } } } - } + }// synchronized + if (_log.isDebugEnabled()) { _log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " + - acked.size() + " items."); + acked.size() + " items."); } for (UnacknowledgedMessage msg : acked) @@ -541,12 +566,14 @@ public class AMQChannel implements Managable else { UnacknowledgedMessage msg; - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { msg = _unacknowledgedMessageMap.remove(deliveryTag); } + if (msg == null) { + _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); } msg.discard(); @@ -573,7 +600,7 @@ public class AMQChannel implements Managable { boolean suspend; //noinspection SynchronizeOnNonFinalField - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { suspend = _unacknowledgedMessageMap.size() >= _prefetchCount; } @@ -614,7 +641,7 @@ public class AMQChannel implements Managable public void rollback() throws AMQException { //need to protect rollback and close from each other... - synchronized (_txnBuffer) + synchronized(_txnBuffer) { _txnBuffer.rollback(); } diff --git a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java index 7f1c7df224..a703595cc4 100644 --- a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java @@ -17,19 +17,21 @@ */ package org.apache.qpid.server.exchange; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.log4j.Logger; -import javax.management.openmbean.*; -import javax.management.MBeanException; import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.openmbean.*; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.ArrayList; public class DestNameExchange extends AbstractExchange { @@ -117,12 +119,14 @@ public class DestNameExchange extends AbstractExchange } public void createBinding(String queueName, String binding) - throws JMException + throws JMException { AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName); if (queue == null) + { throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); + } try { @@ -147,7 +151,7 @@ public class DestNameExchange extends AbstractExchange { assert queue != null; assert routingKey != null; - if(!_index.add(routingKey, queue)) + if (!_index.add(routingKey, queue)) { _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey); } @@ -195,7 +199,7 @@ public class DestNameExchange extends AbstractExchange _logger.debug("Publishing message to queue " + queues); } - for(AMQQueue q :queues) + for (AMQQueue q : queues) { q.deliver(payload); } diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java index a3c2fab4f4..ef18f61070 100644 --- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -19,12 +19,12 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.AMQException; /** * Encapsulation of a supscription to a queue. @@ -70,7 +70,8 @@ public class SubscriptionImpl implements Subscription throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); - if (channel == null) { + if (channel == null) + { throw new NullPointerException("channel not found in protocol session"); } @@ -99,8 +100,8 @@ public class SubscriptionImpl implements Subscription private boolean equals(SubscriptionImpl psc) { return sessionKey.equals(psc.sessionKey) - && psc.channel == channel - && psc.consumerTag.equals(consumerTag); + && psc.channel == channel + && psc.consumerTag.equals(consumerTag); } public int hashCode() @@ -113,18 +114,25 @@ public class SubscriptionImpl implements Subscription return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]"; } + /** + * This method can be called by each of the publisher threads. + * As a result all changes to the channel object must be thread safe. + * + * @param msg + * @param queue + * @throws AMQException + */ public void send(AMQMessage msg, AMQQueue queue) throws AMQException { if (msg != null) { - final long deliveryTag = channel.getNextDeliveryTag(); + long deliveryTag = channel.prepareNewMessageForDelivery(_acks,msg,consumerTag,queue); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); - if (_acks) - { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - } + protocolSession.writeFrame(frame); + // if we do not need to wait for client acknowledgements we can decrement // the reference count immediately if (!_acks) diff --git a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java index baa414ff19..8dd268e673 100644 --- a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java @@ -17,21 +17,20 @@ */ package org.apache.qpid.server.store; +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; -import org.apache.commons.configuration.Configuration; -import java.util.concurrent.ConcurrentMap; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.List; /** * A simple message store that stores the messages in a threadsafe structure in memory. - * */ public class MemoryMessageStore implements MessageStore { @@ -48,7 +47,7 @@ public class MemoryMessageStore implements MessageStore public void configure() { _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table"); - _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY); + _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY); } public void configure(String base, Configuration config) @@ -65,7 +64,7 @@ public class MemoryMessageStore implements MessageStore public void close() throws Exception { - if(_messageMap != null) + if (_messageMap != null) { _messageMap.clear(); _messageMap = null; |
