summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-02-21 10:09:03 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-02-21 10:09:03 +0000
commit3047c0ec2d581f4b51c77fec84fbf0bec8599573 (patch)
tree7ba966b95105a3576cf2fc9150b6b9dd322f4b14 /java/client/src
parent3aed99f65d795c234faa9b584182cf3ea8c67b4a (diff)
downloadqpid-python-3047c0ec2d581f4b51c77fec84fbf0bec8599573.tar.gz
QPID-790 : Performance Improvements
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629731 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java111
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java206
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java63
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java54
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java53
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java93
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java123
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java74
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java28
14 files changed, 600 insertions, 252 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 79d92f7705..b60a8dfaad 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -73,6 +73,105 @@ import java.util.concurrent.atomic.AtomicInteger;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
+ private static final class ChannelToSessionMap
+ {
+ private final AMQSession[] _fastAccessSessions = new AMQSession[16];
+ private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
+ private int _size = 0;
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+
+ public AMQSession get(int channelId)
+ {
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ return _fastAccessSessions[channelId];
+ }
+ else
+ {
+ return _slowAccessSessions.get(channelId);
+ }
+ }
+
+ public AMQSession put(int channelId, AMQSession session)
+ {
+ AMQSession oldVal;
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ oldVal = _fastAccessSessions[channelId];
+ _fastAccessSessions[channelId] = session;
+ }
+ else
+ {
+ oldVal = _slowAccessSessions.put(channelId, session);
+ }
+ if((oldVal != null) && (session == null))
+ {
+ _size--;
+ }
+ else if((oldVal == null) && (session != null))
+ {
+ _size++;
+ }
+
+ return session;
+
+ }
+
+
+ public AMQSession remove(int channelId)
+ {
+ AMQSession session;
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ session = _fastAccessSessions[channelId];
+ _fastAccessSessions[channelId] = null;
+ }
+ else
+ {
+ session = _slowAccessSessions.remove(channelId);
+ }
+
+ if(session != null)
+ {
+ _size--;
+ }
+ return session;
+
+ }
+
+ public Collection<AMQSession> values()
+ {
+ ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
+
+ for(int i = 0; i < 16; i++)
+ {
+ if(_fastAccessSessions[i] != null)
+ {
+ values.add(_fastAccessSessions[i]);
+ }
+ }
+ values.addAll(_slowAccessSessions.values());
+
+ return values;
+ }
+
+ public int size()
+ {
+ return _size;
+ }
+
+ public void clear()
+ {
+ _size = 0;
+ _slowAccessSessions.clear();
+ for(int i = 0; i<16; i++)
+ {
+ _fastAccessSessions[i] = null;
+ }
+ }
+ }
+
+
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
private AtomicInteger _idFactory = new AtomicInteger(0);
@@ -102,7 +201,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
+ private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
private String _clientName;
@@ -757,10 +856,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
checkNotClosed();
if (!_started)
{
- final Iterator it = _sessions.entrySet().iterator();
+ final Iterator it = _sessions.values().iterator();
while (it.hasNext())
{
- final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue();
+ final AMQSession s = (AMQSession) (it.next());
try
{
s.start();
@@ -1014,11 +1113,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _maximumFrameSize;
}
- public Map getSessions()
- {
- return _sessions;
- }
-
public String getUsername()
{
return _username;
@@ -1239,6 +1333,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// _protocolHandler.addSessionByChannel(s.getChannelId(), s);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
s.resubscribe();
+ s.setFlowControl(true);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 0c94216597..87c813982e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -78,10 +78,7 @@ import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException;
import java.io.Serializable;
import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
@@ -107,6 +104,89 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
+ private static final class IdToConsumerMap
+ {
+ private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
+ private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
+
+
+ public BasicMessageConsumer get(int id)
+ {
+ if((id & 0xFFFFFFF0) == 0)
+ {
+ return _fastAccessConsumers[id];
+ }
+ else
+ {
+ return _slowAccessConsumers.get(id);
+ }
+ }
+
+ public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
+ {
+ BasicMessageConsumer oldVal;
+ if((id & 0xFFFFFFF0) == 0)
+ {
+ oldVal = _fastAccessConsumers[id];
+ _fastAccessConsumers[id] = consumer;
+ }
+ else
+ {
+ oldVal = _slowAccessConsumers.put(id, consumer);
+ }
+
+ return consumer;
+
+ }
+
+
+ public BasicMessageConsumer remove(int id)
+ {
+ BasicMessageConsumer consumer;
+ if((id & 0xFFFFFFF0) == 0)
+ {
+ consumer = _fastAccessConsumers[id];
+ _fastAccessConsumers[id] = null;
+ }
+ else
+ {
+ consumer = _slowAccessConsumers.remove(id);
+ }
+
+ return consumer;
+
+ }
+
+ public Collection<BasicMessageConsumer> values()
+ {
+ ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
+
+ for(int i = 0; i < 16; i++)
+ {
+ if(_fastAccessConsumers[i] != null)
+ {
+ values.add(_fastAccessConsumers[i]);
+ }
+ }
+ values.addAll(_slowAccessConsumers.values());
+
+ return values;
+ }
+
+
+ public void clear()
+ {
+ _slowAccessConsumers.clear();
+ for(int i = 0; i<16; i++)
+ {
+ _fastAccessConsumers[i] = null;
+ }
+ }
+ }
+
+
+
+
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -156,7 +236,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _transacted;
/** Holds the sessions acknowledgement mode. */
- private int _acknowledgeMode;
+ private final int _acknowledgeMode;
/** Holds this session unique identifier, used to distinguish it from other sessions. */
private int _channelId;
@@ -217,8 +297,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
* consumer.
*/
- private Map<AMQShortString, BasicMessageConsumer> _consumers =
- new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+ private final IdToConsumerMap _consumers = new IdToConsumerMap();
+
+ //Map<AMQShortString, BasicMessageConsumer> _consumers =
+ //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
/**
* Contains a list of consumers which have been removed but which might still have
@@ -281,6 +363,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/** Has failover occured on this session */
private boolean _failedOver;
+
+
+ private static final class FlowControlIndicator
+ {
+ private volatile boolean _flowControl = true;
+
+ public synchronized void setFlowControl(boolean flowControl)
+ {
+ _flowControl= flowControl;
+ notify();
+ }
+
+ public boolean getFlowControl()
+ {
+ return _flowControl;
+ }
+ }
+
+ /** Flow control */
+ private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
/**
* Creates a new session on a connection.
*
@@ -327,24 +430,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
public void aboveThreshold(int currentValue)
{
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
_logger.debug(
"Above threshold(" + _defaultPrefetchHighMark
+ ") so suspending channel. Current value is " + currentValue);
new Thread(new SuspenderRunner(true)).start();
- }
+
}
public void underThreshold(int currentValue)
{
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
_logger.debug(
"Below threshold(" + _defaultPrefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
new Thread(new SuspenderRunner(false)).start();
- }
+
}
});
}
@@ -662,7 +761,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// Remove the consumer from the map
- BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+ BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
if (consumer != null)
{
// fixme this isn't right.. needs to check if _queue contains data for this consumer
@@ -744,6 +843,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false, false);
}
+
+ public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+ false, false);
+ }
+
+
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
@@ -761,6 +870,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
messageSelector, null, false, false);
}
+
+ public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
+ throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
+ messageSelector, null, false, false);
+ }
+
+
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
String selector) throws JMSException
{
@@ -925,7 +1045,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
- return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
+ return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic);
}
public Queue createQueue(String queueName) throws JMSException
@@ -1089,9 +1209,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+ return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
}
+
/**
* Creates a non-durable subscriber with a message selector
*
@@ -1109,7 +1230,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
+ return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal));
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1276,15 +1397,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+ "] received in session with channel id " + _channelId);
}
- if (message.getDeliverBody() == null)
+ if (message.isDeliverMessage())
{
- // Return of the bounced message.
- returnBouncedMessage(message);
+ _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag());
+ _queue.add(message);
}
else
{
- _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag());
- _queue.add(message);
+ // Return of the bounced message.
+ returnBouncedMessage(message);
}
}
@@ -1666,7 +1787,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
void deregisterConsumer(BasicMessageConsumer consumer)
{
- if (_consumers.remove(consumer.getConsumerTag()) != null)
+ if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null)
{
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if (subscriptionName != null)
@@ -2063,8 +2184,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
{
+ int tagId = _nextTag++;
// need to generate a consumer tag on the client so we can exploit the nowait flag
- AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
+ AMQShortString tag = new AMQShortString(Integer.toString(tagId));
FieldTable arguments = FieldTableFactory.newFieldTable();
if ((messageSelector != null) && !messageSelector.equals(""))
@@ -2084,7 +2206,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
- _consumers.put(tag, consumer);
+ _consumers.put(tagId, consumer);
try
{
@@ -2112,7 +2234,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
catch (AMQException e)
{
// clean-up the map in the event of an error
- _consumers.remove(tag);
+ _consumers.remove(tagId);
throw e;
}
}
@@ -2659,6 +2781,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_ticket = ticket;
}
+ public void setFlowControl(final boolean active)
+ {
+ _flowControl.setFlowControl(active);
+ }
+
+
+ public void checkFlowControl() throws InterruptedException
+ {
+ synchronized(_flowControl)
+ {
+ while(!_flowControl.getFlowControl())
+ {
+ _flowControl.wait();
+ }
+ }
+
+ }
+
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
@@ -2850,10 +2991,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void dispatchMessage(UnprocessedMessage message)
{
- if (message.getDeliverBody() != null)
+ final BasicDeliverBody deliverBody = message.getDeliverBody();
+ if (deliverBody != null)
{
final BasicMessageConsumer consumer =
- (BasicMessageConsumer) _consumers.get(message.getDeliverBody().getConsumerTag());
+ _consumers.get(deliverBody.getConsumerTag().toIntValue());
if ((consumer == null) || consumer.isClosed())
{
@@ -2862,13 +3004,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (consumer == null)
{
_dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliverBody().getDeliveryTag() + "] from queue "
- + message.getDeliverBody().getConsumerTag() + " )without a handler - rejecting(requeue)...");
+ + deliverBody.getDeliveryTag() + "] from queue "
+ + deliverBody.getConsumerTag() + " )without a handler - rejecting(requeue)...");
}
else
{
_dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliverBody().getDeliveryTag() + "] from queue " + " consumer("
+ + deliverBody.getDeliveryTag() + "] from queue " + " consumer("
+ consumer.debugIdentity() + ") is closed rejecting(requeue)...");
}
}
@@ -2880,7 +3022,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- consumer.notifyMessage(message, _channelId);
+ consumer.notifyMessage(message);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 488d22c4bd..bf11572163 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -26,11 +26,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
import org.slf4j.Logger;
@@ -53,13 +49,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
/** The connection being used by this consumer */
- private AMQConnection _connection;
+ private final AMQConnection _connection;
- private String _messageSelector;
+ private final String _messageSelector;
- private boolean _noLocal;
+ private final boolean _noLocal;
- private AMQDestination _destination;
+ private final AMQDestination _destination;
/** When true indicates that a blocking receive call is in progress */
private final AtomicBoolean _receiving = new AtomicBoolean(false);
@@ -70,7 +66,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private AMQShortString _consumerTag;
/** We need to know the channel id when constructing frames */
- private int _channelId;
+ private final int _channelId;
/**
* Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
@@ -78,36 +74,36 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private final ArrayBlockingQueue _synchronousQueue;
- private MessageFactoryRegistry _messageFactory;
+ private final MessageFactoryRegistry _messageFactory;
private final AMQSession _session;
- private AMQProtocolHandler _protocolHandler;
+ private final AMQProtocolHandler _protocolHandler;
/** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */
- private FieldTable _rawSelectorFieldTable;
+ private final FieldTable _rawSelectorFieldTable;
/**
* We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
* failover
*/
- private int _prefetchHigh;
+ private final int _prefetchHigh;
/**
* We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of
* failover
*/
- private int _prefetchLow;
+ private final int _prefetchLow;
/** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */
- private boolean _exclusive;
+ private final boolean _exclusive;
/**
* The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
* consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
* implementation.
*/
- private int _acknowledgeMode;
+ private final int _acknowledgeMode;
/** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */
private int _outstanding;
@@ -133,10 +129,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
* on the queue. This is used for queue browsing.
*/
- private boolean _autoClose;
+ private final boolean _autoClose;
private boolean _closeWhenNoMessages;
- private boolean _noConsume;
+ private final boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
@@ -156,7 +152,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
- _acknowledgeMode = acknowledgeMode;
+
_synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
_autoClose = autoClose;
_noConsume = noConsume;
@@ -166,6 +162,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_acknowledgeMode = Session.NO_ACKNOWLEDGE;
}
+ else
+ {
+ _acknowledgeMode = acknowledgeMode;
+ }
}
public AMQDestination getDestination()
@@ -254,7 +254,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
case Session.DUPS_OK_ACKNOWLEDGE:
- _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag());
_receivedDeliveryTags.add(msg.getDeliveryTag());
break;
@@ -269,7 +268,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
else
{
- _logger.info("Recording tag for commit:" + msg.getDeliveryTag());
_receivedDeliveryTags.add(msg.getDeliveryTag());
}
@@ -645,9 +643,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* message listener or a synchronous receive() caller.
*
* @param messageFrame the raw unprocessed mesage
- * @param channelId channel on which this message was sent
*/
- void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+ void notifyMessage(UnprocessedMessage messageFrame)
{
final boolean debug = _logger.isDebugEnabled();
@@ -658,10 +655,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
+ final BasicDeliverBody deliverBody = messageFrame.getDeliverBody();
+
AbstractJMSMessage jmsMessage =
- _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(),
- messageFrame.getDeliverBody().getRedelivered(), messageFrame.getDeliverBody().getExchange(),
- messageFrame.getDeliverBody().getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ _messageFactory.createMessage(deliverBody.getDeliveryTag(),
+ deliverBody.getRedelivered(),
+ deliverBody.getExchange(),
+ deliverBody.getRoutingKey(),
+ messageFrame.getContentHeader(),
+ messageFrame.getBodies());
if (debug)
{
@@ -673,11 +675,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
// if (!_closed.get())
{
- jmsMessage.setConsumer(this);
-
preDeliver(jmsMessage);
- notifyMessage(jmsMessage, channelId);
+ notifyMessage(jmsMessage);
}
// else
// {
@@ -702,9 +702,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/**
* @param jmsMessage this message has already been processed so can't redo preDeliver
- * @param channelId
*/
- public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
+ public void notifyMessage(AbstractJMSMessage jmsMessage)
{
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 7e96fb537c..ae71846870 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -538,6 +538,18 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+
+ try
+ {
+ _session.checkFlowControl();
+ }
+ catch (InterruptedException e)
+ {
+ JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed");
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+
_protocolHandler.writeFrame(compositeFrame, wait);
if (message != origMessage)
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
index 49c8a83833..d05e99d210 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
@@ -26,7 +26,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +45,8 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic
throws AMQException
{
final AMQProtocolSession session = stateManager.getProtocolSession();
- final UnprocessedMessage msg = new UnprocessedMessage(channelId, body);
+ final UnprocessedMessage msg = new UnprocessedMessage.UnprocessedDeliverMessage(body);
_logger.debug("New JmsDeliver method received");
- session.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(channelId, msg);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
index 428d366f07..2ebc9288c3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
@@ -26,7 +26,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,9 +46,9 @@ public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, i
{
_logger.debug("New JmsBounce method received");
final AMQProtocolSession session = stateManager.getProtocolSession();
- final UnprocessedMessage msg = new UnprocessedMessage(channelId, body);
+ final UnprocessedMessage msg = new UnprocessedMessage.UnprocessedBouncedMessage(body);
- session.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(channelId, msg);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
new file mode 100644
index 0000000000..b47fe751d6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
@@ -0,0 +1,54 @@
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+public class ChannelFlowMethodHandler implements StateAwareMethodListener<ChannelFlowBody>
+{
+ private static final Logger _logger = LoggerFactory.getLogger(ChannelFlowMethodHandler.class);
+ private static final ChannelFlowMethodHandler _instance = new ChannelFlowMethodHandler();
+
+ public static ChannelFlowMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ChannelFlowMethodHandler()
+ { }
+
+ public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId)
+ throws AMQException
+ {
+
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ session.setFlowControl(channelId, body.getActive());
+ }
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index b029770946..1947a18653 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -55,8 +55,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
protected boolean _changedData;
private Destination _destination;
private JMSHeaderAdapter _headerAdapter;
- private BasicMessageConsumer _consumer;
- private boolean _strictAMQP;
+
+ private static final boolean STRICT_AMQP_COMPLIANCE =
+ Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
protected AbstractJMSMessage(ByteBuffer data)
{
@@ -72,8 +73,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
_changedData = (data == null);
_headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
- _strictAMQP =
- Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
@@ -121,7 +120,10 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
if (getContentHeaderProperties().getMessageIdAsString() == null)
{
- getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID());
+ StringBuilder b = new StringBuilder(39);
+ b.append("ID");
+ b.append(UUID.randomUUID());
+ getContentHeaderProperties().setMessageId(b.toString());
}
return getContentHeaderProperties().getMessageIdAsString();
@@ -301,7 +303,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -311,7 +313,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public boolean getBooleanProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -321,7 +323,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public byte getByteProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -331,7 +333,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -341,7 +343,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public short getShortProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -351,7 +353,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public int getIntProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -361,7 +363,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public long getLongProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -371,7 +373,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public float getFloatProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -381,7 +383,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public double getDoubleProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -404,7 +406,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
else
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -425,7 +427,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -436,7 +438,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setBooleanProperty(String propertyName, boolean b) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -447,7 +449,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setByteProperty(String propertyName, byte b) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -458,7 +460,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -469,7 +471,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setShortProperty(String propertyName, short i) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -487,7 +489,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setLongProperty(String propertyName, long l) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -498,7 +500,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setFloatProperty(String propertyName, float f) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -509,7 +511,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setDoubleProperty(String propertyName, double v) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -691,9 +693,4 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
}
- public void setConsumer(BasicMessageConsumer basicMessageConsumer)
- {
- _consumer = basicMessageConsumer;
- }
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index a70acbabbe..d8fe964b85 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -55,7 +55,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
JMSMapMessage(ByteBuffer data) throws JMSException
{
super(data); // this instantiates a content header
- populateMapFromData();
+ if(data != null)
+ {
+ populateMapFromData();
+ }
+
}
JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
@@ -76,7 +80,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
public String toBodyString() throws JMSException
{
- return _map.toString();
+ return _map == null ? "" : _map.toString();
}
public AMQShortString getMimeTypeAsShortString()
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index 5b199f2478..bc1ba155cb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -36,33 +36,15 @@ import org.apache.qpid.framing.ContentHeaderBody;
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public class UnprocessedMessage
+public abstract class UnprocessedMessage
{
- private long _bytesReceived = 0;
+ private long _bytesReceived = 0L;
- private final BasicDeliverBody _deliverBody;
- private final BasicReturnBody _bounceBody; // TODO: check change (gustavo)
- private final int _channelId;
private ContentHeaderBody _contentHeader;
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ContentBody> _bodies;
- public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
- {
- _deliverBody = deliverBody;
- _channelId = channelId;
- _bounceBody = null;
- }
-
-
- public UnprocessedMessage(int channelId, BasicReturnBody bounceBody)
- {
- _deliverBody = null;
- _channelId = channelId;
- _bounceBody = bounceBody;
- }
-
public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException
{
@@ -96,22 +78,11 @@ public class UnprocessedMessage
return _bytesReceived == getContentHeader().bodySize;
}
- public BasicDeliverBody getDeliverBody()
- {
- return _deliverBody;
- }
- public BasicReturnBody getBounceBody()
- {
- return _bounceBody;
- }
+ abstract public BasicDeliverBody getDeliverBody();
- public int getChannelId()
- {
- return _channelId;
- }
-
+ abstract public BasicReturnBody getBounceBody();
public ContentHeaderBody getContentHeader()
{
@@ -128,4 +99,60 @@ public class UnprocessedMessage
return _bodies;
}
+ abstract public boolean isDeliverMessage();
+
+ public static final class UnprocessedDeliverMessage extends UnprocessedMessage
+ {
+ private final BasicDeliverBody _body;
+
+ public UnprocessedDeliverMessage(final BasicDeliverBody body)
+ {
+ _body = body;
+ }
+
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return _body;
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return null;
+ }
+
+ public boolean isDeliverMessage()
+ {
+ return true;
+ }
+ }
+
+ public static final class UnprocessedBouncedMessage extends UnprocessedMessage
+ {
+ private final BasicReturnBody _body;
+
+ public UnprocessedBouncedMessage(final BasicReturnBody body)
+ {
+ _body = body;
+ }
+
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return null;
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return _body;
+ }
+
+ public boolean isDeliverMessage()
+ {
+ return false;
+ }
+ }
+
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index f70c1faa84..3dee0b0142 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -407,7 +407,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void propagateExceptionToWaiters(Exception e)
{
- getStateManager().error(e);
+
if (!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
@@ -439,78 +439,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- switch (bodyFrame.getFrameType())
- {
- case AMQMethodBody.TYPE:
-
- if (debug)
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
- }
-
- final AMQMethodEvent<AMQMethodBody> evt =
- new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
-
- try
- {
-
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
- }
- }
-
- if (!wasAnyoneInterested)
- {
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
- + _frameListeners);
- }
- }
- catch (AMQException e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
- }
- }
-
- exceptionCaught(session, e);
- }
-
- break;
-
- case ContentHeaderBody.TYPE:
-
- _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
- break;
+ bodyFrame.handle(frame.getChannel(),_protocolSession);
- case ContentBody.TYPE:
-
- _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
- break;
-
- case HeartbeatBody.TYPE:
-
- if (debug)
- {
- _logger.debug("Received heartbeat");
- }
-
- break;
-
- default:
-
- }
_connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
@@ -528,6 +458,55 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
}
+ public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
+ throws AMQException
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame);
+ }
+
+ final AMQMethodEvent<AMQMethodBody> evt =
+ new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame);
+
+ try
+ {
+
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ if (!_frameListeners.isEmpty())
+ {
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
+ }
+
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
+ + _frameListeners);
+ }
+ }
+ catch (AMQException e)
+ {
+ if (!_frameListeners.isEmpty())
+ {
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
+ }
+ }
+
+ exceptionCaught(session, e);
+ }
+
+ }
+
private static int _messagesOut;
public void messageSent(IoSession session, Object message) throws Exception
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index b48adbdb08..6a5cc62bfc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -74,8 +74,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
protected final AMQProtocolHandler _protocolHandler;
- /** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
@@ -83,7 +81,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
* Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
* first) with the subsequent content header and content bodies.
*/
- protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+ private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>();
+ private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
/** Counter to ensure unique queue names */
protected int _queueId = 1;
@@ -101,7 +100,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private MethodDispatcher _methodDispatcher;
- private final AMQConnection _connection;
+ private final AMQConnection _connection;
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{
@@ -230,14 +230,24 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*
* @throws AMQException if this was not expected
*/
- public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
+ public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
{
- _channelId2UnprocessedMsgMap.put(message.getChannelId(), message);
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = message;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.put(channelId, message);
+ }
}
- public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
+ public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
{
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+ final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
+ : _channelId2UnprocessedMsgMap.get(channelId);
+
+
if (msg == null)
{
throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
@@ -256,9 +266,19 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
}
- public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
+ public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException
{
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+ UnprocessedMessage msg;
+ final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
+ if(fastAccess)
+ {
+ msg = _channelId2UnprocessedMsgArray[channelId];
+ }
+ else
+ {
+ msg = _channelId2UnprocessedMsgMap.get(channelId);
+ }
+
if (msg == null)
{
throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
@@ -266,7 +286,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
if (msg.getContentHeader() == null)
{
- _channelId2UnprocessedMsgMap.remove(channelId);
+ if(fastAccess)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = null;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.remove(channelId);
+ }
throw new AMQException("Error: received content body without having received a ContentHeader frame first");
}
@@ -286,6 +313,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
}
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
+ {
+
+ }
+
/**
* Deliver a message to the appropriate session, removing the unprocessed message from our map
*
@@ -296,7 +328,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
AMQSession session = getSession(channelId);
session.messageReceived(msg);
- _channelId2UnprocessedMsgMap.remove(channelId);
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = null;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.remove(channelId);
+ }
}
protected AMQSession getSession(int channelId)
@@ -486,4 +525,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_methodDispatcher = methodDispatcher;
}
+
+ public void setFlowControl(final int channelId, final boolean active)
+ {
+ final AMQSession session = getSession(channelId);
+ session.setFlowControl(active);
+ }
+
+ public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
+ {
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index e62fce0f60..2e6a4beb83 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -37,7 +37,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
* The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
* there is a separate state manager.
*/
-public class AMQStateManager implements AMQMethodListener
+public class AMQStateManager
{
private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class);
@@ -52,7 +52,7 @@ public class AMQStateManager implements AMQMethodListener
* AMQFrame.
*/
- private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+
private final Object _stateLock = new Object();
private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
@@ -91,19 +91,6 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public void error(Exception e)
- {
- _logger.debug("State manager receive error notification: " + e);
- synchronized (_stateListeners)
- {
- final Iterator it = _stateListeners.iterator();
- while (it.hasNext())
- {
- final StateListener l = (StateListener) it.next();
- l.error(e);
- }
- }
- }
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 0ea04e5bc3..adbec6e35f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -85,7 +85,7 @@ public class TransportConnection
throw new AMQNoTransportForProtocolException(details);
}
- if (transport == _currentInstance)
+ /* if (transport == _currentInstance)
{
if (transport == VM)
{
@@ -100,21 +100,23 @@ public class TransportConnection
}
}
- _currentInstance = transport;
+ _currentInstance = transport;*/
+ ITransportConnection instance;
switch (transport)
{
case SOCKET:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
- {
- public IoConnector newSocketConnector()
- {
- return new ExistingSocketConnector();
- }
- });
+ instance =
+ new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
+ {
+ return new ExistingSocketConnector();
+ }
+ });
break;
case TCP:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
{
public IoConnector newSocketConnector()
{
@@ -142,12 +144,14 @@ public class TransportConnection
break;
case VM:
{
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+ instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
break;
}
+ default:
+ throw new AMQNoTransportForProtocolException(details);
}
- return _instance;
+ return instance;
}
private static int getTransport(String transport)