summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-09-27 09:09:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-09-27 09:09:42 +0000
commit1ba25d9231401f2f34ee41893d402e3cb2f299ed (patch)
treef0effe6896595dd38c0bbc60350b522c5416f480 /java/broker
parent6ce702dfb4ea0e1835804efd328be2eee79e23b3 (diff)
downloadqpid-python-1ba25d9231401f2f34ee41893d402e3cb2f299ed.tar.gz
AMQProtocolSession.java - white space changes
BasicMessageProducer.java - white space changes BasicMessageConsumer.java - white space changes AMQSession.java - added a comment MemoryMessageStore.java - white space changes SubscriptionImpl.java AMQChannel.java - Removed race condition where two messages could get the same delivery tag and when using acks where messages can be added to the UnackMap out of sequence, Causing unknown message to ack exceptions. DestNameExchange.java - white space/style changes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450384 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/org/apache/qpid/server/AMQChannel.java77
-rw-r--r--java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java24
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java26
-rw-r--r--java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java15
4 files changed, 90 insertions, 52 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java
index 8dc4626c46..d4226c42aa 100644
--- a/java/broker/src/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java
@@ -23,26 +23,28 @@ import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.management.DefaultManagedObject;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.txn.TxnOp;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.management.DefaultManagedObject;
-import javax.management.ObjectName;
-import javax.management.MalformedObjectNameException;
import javax.management.JMException;
import javax.management.MBeanException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
public class AMQChannel implements Managable
{
@@ -62,7 +64,7 @@ public class AMQChannel implements Managable
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
* value of this represents the <b>last</b> tag sent out
*/
- private long _deliveryTag;
+ private AtomicLong _deliveryTag = new AtomicLong(0);
/**
* A channel has a default queue (the last declared) that is used when no queue name is
@@ -74,7 +76,7 @@ public class AMQChannel implements Managable
* This tag is unique per subscription to a queue. The server returns this in response to a
* basic.consume request.
*/
- private int _consumerTag = 0;
+ private int _consumerTag;
/**
* The current message - which may be partial in the sense that not all frames have been received yet -
@@ -150,7 +152,7 @@ public class AMQChannel implements Managable
_txnBuffer.commit();
}
}
- catch(AMQException ex)
+ catch (AMQException ex)
{
throw new MBeanException(ex, ex.toString());
}
@@ -160,13 +162,13 @@ public class AMQChannel implements Managable
{
if (_transactional)
{
- synchronized (_txnBuffer)
+ synchronized(_txnBuffer)
{
try
{
_txnBuffer.rollback();
}
- catch(AMQException ex)
+ catch (AMQException ex)
{
throw new MBeanException(ex, ex.toString());
}
@@ -201,7 +203,7 @@ public class AMQChannel implements Managable
}
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
- throws AMQException
+ throws AMQException
{
_channelId = channelId;
_channelName = _channelId + "-" + this.hashCode();
@@ -300,7 +302,7 @@ public class AMQChannel implements Managable
public long getNextDeliveryTag()
{
- return ++_deliveryTag;
+ return _deliveryTag.incrementAndGet();
}
public int getNextConsumerTag()
@@ -348,7 +350,7 @@ public class AMQChannel implements Managable
else
{
throw new AMQException(_log, "Consumer tag " + consumerTag + " not known to channel " +
- _channelId);
+ _channelId);
}
}
@@ -361,7 +363,7 @@ public class AMQChannel implements Managable
{
if (_transactional)
{
- synchronized (_txnBuffer)
+ synchronized(_txnBuffer)
{
_txnBuffer.rollback();//releases messages
}
@@ -390,7 +392,7 @@ public class AMQChannel implements Managable
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
{
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
_unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag));
checkSuspension();
@@ -405,7 +407,7 @@ public class AMQChannel implements Managable
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Map<Long, UnacknowledgedMessage> currentList;
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
currentList = _unacknowledgedMessageMap;
_unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
@@ -426,7 +428,7 @@ public class AMQChannel implements Managable
public void resend(AMQProtocolSession session)
{
//messages go to this channel
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet())
{
@@ -449,7 +451,7 @@ public class AMQChannel implements Managable
*/
public void queueDeleted(AMQQueue queue)
{
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet())
{
@@ -465,13 +467,25 @@ public class AMQChannel implements Managable
catch (AMQException e)
{
_log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " +
- e, e);
+ e, e);
}
}
}
}
}
+ public synchronized long prepareNewMessageForDelivery(boolean acks, AMQMessage msg, String consumerTag, AMQQueue queue)
+ {
+ long deliveryTag = getNextDeliveryTag();
+
+ if (acks)
+ {
+ addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
+ return deliveryTag;
+ }
+
/**
* Acknowledge one or more messages.
*
@@ -498,7 +512,7 @@ public class AMQChannel implements Managable
if (multiple)
{
LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
if (deliveryTag == 0)
{
@@ -514,10 +528,20 @@ public class AMQChannel implements Managable
throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
}
Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator();
+
while (i.hasNext())
{
+
Map.Entry<Long, UnacknowledgedMessage> unacked = i.next();
+
+ if (unacked.getKey() > deliveryTag)
+ {
+ //This should not occur now.
+ throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString());
+ }
+
i.remove();
+
acked.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
{
@@ -525,11 +549,12 @@ public class AMQChannel implements Managable
}
}
}
- }
+ }// synchronized
+
if (_log.isDebugEnabled())
{
_log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
- acked.size() + " items.");
+ acked.size() + " items.");
}
for (UnacknowledgedMessage msg : acked)
@@ -541,12 +566,14 @@ public class AMQChannel implements Managable
else
{
UnacknowledgedMessage msg;
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
msg = _unacknowledgedMessageMap.remove(deliveryTag);
}
+
if (msg == null)
{
+ _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
}
msg.discard();
@@ -573,7 +600,7 @@ public class AMQChannel implements Managable
{
boolean suspend;
//noinspection SynchronizeOnNonFinalField
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
suspend = _unacknowledgedMessageMap.size() >= _prefetchCount;
}
@@ -614,7 +641,7 @@ public class AMQChannel implements Managable
public void rollback() throws AMQException
{
//need to protect rollback and close from each other...
- synchronized (_txnBuffer)
+ synchronized(_txnBuffer)
{
_txnBuffer.rollback();
}
diff --git a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
index 7f1c7df224..a703595cc4 100644
--- a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -17,19 +17,21 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.log4j.Logger;
-import javax.management.openmbean.*;
-import javax.management.MBeanException;
import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.openmbean.*;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.ArrayList;
public class DestNameExchange extends AbstractExchange
{
@@ -117,12 +119,14 @@ public class DestNameExchange extends AbstractExchange
}
public void createBinding(String queueName, String binding)
- throws JMException
+ throws JMException
{
AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName);
if (queue == null)
+ {
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
+ }
try
{
@@ -147,7 +151,7 @@ public class DestNameExchange extends AbstractExchange
{
assert queue != null;
assert routingKey != null;
- if(!_index.add(routingKey, queue))
+ if (!_index.add(routingKey, queue))
{
_logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
}
@@ -195,7 +199,7 @@ public class DestNameExchange extends AbstractExchange
_logger.debug("Publishing message to queue " + queues);
}
- for(AMQQueue q :queues)
+ for (AMQQueue q : queues)
{
q.deliver(payload);
}
diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
index a3c2fab4f4..ef18f61070 100644
--- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -19,12 +19,12 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.AMQException;
/**
* Encapsulation of a supscription to a queue.
@@ -70,7 +70,8 @@ public class SubscriptionImpl implements Subscription
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
- if (channel == null) {
+ if (channel == null)
+ {
throw new NullPointerException("channel not found in protocol session");
}
@@ -99,8 +100,8 @@ public class SubscriptionImpl implements Subscription
private boolean equals(SubscriptionImpl psc)
{
return sessionKey.equals(psc.sessionKey)
- && psc.channel == channel
- && psc.consumerTag.equals(consumerTag);
+ && psc.channel == channel
+ && psc.consumerTag.equals(consumerTag);
}
public int hashCode()
@@ -113,18 +114,25 @@ public class SubscriptionImpl implements Subscription
return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]";
}
+ /**
+ * This method can be called by each of the publisher threads.
+ * As a result all changes to the channel object must be thread safe.
+ *
+ * @param msg
+ * @param queue
+ * @throws AMQException
+ */
public void send(AMQMessage msg, AMQQueue queue) throws AMQException
{
if (msg != null)
{
- final long deliveryTag = channel.getNextDeliveryTag();
+ long deliveryTag = channel.prepareNewMessageForDelivery(_acks,msg,consumerTag,queue);
+
ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
- if (_acks)
- {
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- }
+
protocolSession.writeFrame(frame);
+
// if we do not need to wait for client acknowledgements we can decrement
// the reference count immediately
if (!_acks)
diff --git a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
index baa414ff19..8dd268e673 100644
--- a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -17,21 +17,20 @@
*/
package org.apache.qpid.server.store;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
-import org.apache.commons.configuration.Configuration;
-import java.util.concurrent.ConcurrentMap;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.List;
/**
* A simple message store that stores the messages in a threadsafe structure in memory.
- *
*/
public class MemoryMessageStore implements MessageStore
{
@@ -48,7 +47,7 @@ public class MemoryMessageStore implements MessageStore
public void configure()
{
_log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table");
- _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY);
+ _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY);
}
public void configure(String base, Configuration config)
@@ -65,7 +64,7 @@ public class MemoryMessageStore implements MessageStore
public void close() throws Exception
{
- if(_messageMap != null)
+ if (_messageMap != null)
{
_messageMap.clear();
_messageMap = null;