summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-12 20:25:53 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-12 20:25:53 +0000
commit762aa71a99d13cb3f7efd29cb95098eadafb5396 (patch)
tree86cf705813a21b0dde2393b799571fe48c2d0d46 /qpid/java
parente243745d439671418016a2be1570209269b45070 (diff)
downloadqpid-python-762aa71a99d13cb3f7efd29cb95098eadafb5396.tar.gz
merged from trunk r1659391
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659392 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java9
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java8
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java4
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java1
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java21
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java11
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java22
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java119
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java93
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java13
18 files changed, 128 insertions, 228 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
index b89025a27b..4b53cfa795 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
@@ -131,6 +131,13 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub
protected void closeUnderlyingReceiver(Receiver receiver)
{
- receiver.close();
+ if(isDurable())
+ {
+ receiver.detach();
+ }
+ else
+ {
+ receiver.close();
+ }
}
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
index 69b4939070..a4f9ac5a3a 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
@@ -179,7 +179,7 @@ public class Connection implements ExceptionHandler
boolean ssl,
int channelMax) throws ConnectionException
{
- this(ssl?"amqp":"amqps",address,port,username,password,maxFrameSize,container,
+ this(ssl?"amqps":"amqp",address,port,username,password,maxFrameSize,container,
remoteHostname,
getSslContext(ssl),
null,
@@ -291,6 +291,12 @@ public class Connection implements ExceptionHandler
private TransportProvider getTransportProvider(final String protocol) throws ConnectionException
{
+ TCPTransportProviderFactory tcpTransportProviderFactory = new TCPTransportProviderFactory();
+ if(tcpTransportProviderFactory.getSupportedTransports().contains(protocol))
+ {
+ return tcpTransportProviderFactory.getProvider(protocol);
+ }
+
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
ServiceLoader<TransportProviderFactory> providerFactories = ServiceLoader.load(TransportProviderFactory.class, classLoader);
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
index 3e9dca683e..38f28667d6 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
@@ -225,8 +225,8 @@ public class FrameHandler implements ProtocolHandler
// PARSE HERE
try
{
- Object val = _typeHandler.parse(in);
-
+ Object val = in.hasRemaining() ? _typeHandler.parse(in) : null;
+
if(in.hasRemaining())
{
if(val instanceof Transfer)
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
index 766c9705a1..17f334153d 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
@@ -457,6 +457,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
case AWAITING_OPEN:
case CLOSE_SENT:
_state = ConnectionState.CLOSED;
+ closeSender();
break;
case OPEN:
_state = ConnectionState.CLOSE_RECEIVED;
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
index 434f939a21..246d43d3de 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
@@ -21,15 +21,28 @@
package org.apache.qpid.amqp_1_0.transport;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Source;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.Target;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.transport.Attach;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.amqp_1_0.type.transport.Flow;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Role;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
public abstract class LinkEndpoint<T extends LinkEventListener>
{
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index 0f37518773..5a28ddcb60 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -98,6 +98,12 @@ public class SessionEndpoint
private int _availableOutgoingCredit;
private UnsignedInteger _lastSentIncomingLimit;
+ private final Error _sessionEndedLinkError =
+ new Error(LinkError.DETACH_FORCED,
+ "Force detach the link because the session is remotely ended.");
+
+
+
public SessionEndpoint(final ConnectionEndpoint connectionEndpoint)
{
this(connectionEndpoint, UnsignedInteger.valueOf(0));
@@ -240,19 +246,21 @@ public class SessionEndpoint
private void detachLinks()
{
Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet());
- Error error = new Error();
- error.setCondition(LinkError.DETACH_FORCED);
- error.setDescription("Force detach the link because the session is remotely ended.");
for(UnsignedInteger handle : handles)
{
Detach detach = new Detach();
detach.setClosed(false);
detach.setHandle(handle);
- detach.setError(error);
+ detach.setError(_sessionEndedLinkError);
detach(handle, detach);
}
}
+ public boolean isSyntheticError(Error error)
+ {
+ return error == _sessionEndedLinkError;
+ }
+
public short getSendingChannel()
{
return _sendingChannel;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
index 3b196df902..fd6f3385c6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
@@ -39,7 +39,8 @@ public interface ConsumerImpl
SEES_REQUEUES,
TRANSIENT,
EXCLUSIVE,
- NO_LOCAL
+ NO_LOCAL,
+ DURABLE
}
long getBytesOut();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index d3ce911406..65e8a1358d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -246,6 +246,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
private final QueueRunner _queueRunner = new QueueRunner(this);
+ private boolean _closing;
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
{
@@ -754,6 +755,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
+ @Override
+ protected void beforeClose()
+ {
+ _closing = true;
+ super.beforeClose();
+ }
+
+
+
synchronized void unregisterConsumer(final QueueConsumerImpl consumer)
{
if (consumer == null)
@@ -794,7 +804,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if(!consumer.isTransient()
&& ( getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
|| getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS )
- && getConsumerCount() == 0)
+ && getConsumerCount() == 0
+ && !(consumer.isDurable() && _closing))
{
if (_logger.isInfoEnabled())
@@ -1794,6 +1805,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
ReferenceCountingExecutorService.getInstance().releaseExecutorService();
}
+ _closing = false;
}
public void checkCapacity(AMQSessionModel channel)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 450d4d98d5..4ffb868537 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -153,7 +153,7 @@ class QueueConsumerImpl
attributes.put(EXCLUSIVE, optionSet.contains(Option.EXCLUSIVE));
attributes.put(NO_LOCAL, optionSet.contains(Option.NO_LOCAL));
attributes.put(DISTRIBUTION_MODE, optionSet.contains(Option.ACQUIRES) ? "MOVE" : "COPY");
- attributes.put(DURABLE,false);
+ attributes.put(DURABLE,optionSet.contains(Option.DURABLE));
attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
if(filters != null)
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index f06f70b362..fa2e543f8d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -47,6 +47,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -278,7 +279,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
//TODO
getEndpoint().setSource(null);
- getEndpoint().detach();
+ getEndpoint().close();
+
+ final LinkRegistry linkReg = getSession().getConnection()
+ .getVirtualHost()
+ .getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId());
+ linkReg.unregisterSendingLink(getEndpoint().getName());
}
public boolean allocateCredit(final ServerMessage msg)
@@ -418,7 +424,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
modified.setDeliveryFailed(true);
_link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
_link.getEndpoint().sendFlowConditional();
- _queueEntry.unlockAcquisition();
+ _queueEntry.incrementDeliveryCount();
+ _queueEntry.release();
}
}
});
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 48ff420965..6a202998f4 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -292,15 +292,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
actualFilters.put(entry.getKey(), entry.getValue());
}
- catch (ParseException e)
- {
- Error error = new Error();
- error.setCondition(AmqpError.INVALID_FIELD);
- error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
- error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
- throw new AmqpErrorException(error);
- }
- catch (SelectorParsingException e)
+ catch (ParseException | SelectorParsingException e)
{
Error error = new Error();
error.setCondition(AmqpError.INVALID_FIELD);
@@ -364,6 +356,12 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
options.add(ConsumerImpl.Option.NO_LOCAL);
}
+ if(_durability == TerminusDurability.CONFIGURATION ||
+ _durability == TerminusDurability.UNSETTLED_STATE )
+ {
+ options.add(ConsumerImpl.Option.DURABLE);
+ }
+
try
{
final String name;
@@ -408,7 +406,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
//TODO
// if not durable or close
- if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
+ if(Boolean.TRUE.equals(detach.getClosed())
+ || !(TerminusDurability.UNSETTLED_STATE.equals(_durability)|| TerminusDurability.CONFIGURATION.equals(_durability)))
{
while(!_consumer.trySendLock())
{
@@ -464,7 +463,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
_consumer.releaseSendLock();
}
}
- else if(detach == null || detach.getError() != null)
+ else if(detach.getError() != null
+ && !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError()))
{
_linkAttachment = null;
_target.flowStateChanged();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 0b613d9a5a..0a29a70373 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -215,7 +215,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
registerConsumer(sendingLink);
link = sendingLink;
- if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
+ if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
{
linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
}
@@ -377,7 +377,8 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(receivingLink));
link = receivingLink;
- if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()))
+ if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
+ || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
{
linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 12e9285af8..8f5e9524b6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -133,7 +133,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/**
* Flag indicating to start dispatcher as a daemon thread
*/
- protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
+ protected final boolean DAEMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
/** The connection to which this session belongs. */
private AMQConnection _connection;
@@ -187,7 +187,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private MessageFactoryRegistry _messageFactoryRegistry;
/** Holds all of the producers created by this session, keyed by their unique identifiers. */
- private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
+ private final Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
/**
* Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume
@@ -195,7 +195,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
private int _nextTag = 1;
- private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
+ private final Map<Integer,C> _consumers = new ConcurrentHashMap<>();
/**
* Contains a list of consumers which have been removed but which might still have
@@ -224,7 +224,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
- private boolean _connectionStopped;
+ private final AtomicBoolean _connectionStopped = new AtomicBoolean();
/** Used to indicate that this session has a message listener attached to it. */
private boolean _hasMessageListeners;
@@ -294,12 +294,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
/**
- * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
- * consumer.
+ * Consumers associated with this session
*/
- protected IdToConsumerMap<C> getConsumers()
+ protected Collection<C> getConsumers()
{
- return _consumers;
+ return new ArrayList(_consumers.values());
}
protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup)
@@ -317,83 +316,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
abstract void handleLinkDelete(final AMQDestination dest) throws AMQException;
- public static final class IdToConsumerMap<C extends BasicMessageConsumer>
- {
- private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
- private final ConcurrentMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
-
- public C get(int id)
- {
- if ((id & 0xFFFFFFF0) == 0)
- {
- return (C) _fastAccessConsumers[id];
- }
- else
- {
- return _slowAccessConsumers.get(id);
- }
- }
-
- public C put(int id, C consumer)
- {
- C oldVal;
- if ((id & 0xFFFFFFF0) == 0)
- {
- oldVal = (C) _fastAccessConsumers[id];
- _fastAccessConsumers[id] = consumer;
- }
- else
- {
- oldVal = _slowAccessConsumers.put(id, consumer);
- }
-
- return oldVal;
-
- }
-
- public C remove(int id)
- {
- C consumer;
- if ((id & 0xFFFFFFF0) == 0)
- {
- consumer = (C) _fastAccessConsumers[id];
- _fastAccessConsumers[id] = null;
- }
- else
- {
- consumer = _slowAccessConsumers.remove(id);
- }
-
- return consumer;
-
- }
-
- public Collection<C> values()
- {
- ArrayList<C> values = new ArrayList<C>();
-
- for (int i = 0; i < 16; i++)
- {
- if (_fastAccessConsumers[i] != null)
- {
- values.add((C) _fastAccessConsumers[i]);
- }
- }
- values.addAll(_slowAccessConsumers.values());
-
- return values;
- }
-
- public void clear()
- {
- _slowAccessConsumers.clear();
- for (int i = 0; i < 16; i++)
- {
- _fastAccessConsumers[i] = null;
- }
- }
- }
-
/**
* Creates a new session on a connection.
*
@@ -2490,7 +2412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
String dispatcherThreadName = "Dispatcher-" + _channelId + "-Conn-" + _connection.getConnectionNumber();
_dispatcherThread.setName(dispatcherThreadName);
- _dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD);
+ _dispatcherThread.setDaemon(DAEMON_DISPATCHER_THREAD);
_dispatcher.setConnectionStopped(initiallyStopped);
_dispatcherThread.start();
if (_dispatcherLogger.isDebugEnabled())
@@ -3488,25 +3410,28 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// only call while holding lock
final boolean connectionStopped()
{
- return _connectionStopped;
+ return _connectionStopped.get();
}
boolean setConnectionStopped(boolean connectionStopped)
{
- boolean currently;
- synchronized (_lock)
+ boolean currently = _connectionStopped.get();
+ if(connectionStopped != currently)
{
- currently = _connectionStopped;
- _connectionStopped = connectionStopped;
- _lock.notify();
-
- if (_dispatcherLogger.isDebugEnabled())
+ synchronized (_lock)
{
- _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started")
- + ": Currently " + (currently ? "Stopped" : "Started"));
+ _connectionStopped.set(connectionStopped);
+ _lock.notify();
+
+ if (_dispatcherLogger.isDebugEnabled())
+ {
+ _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped
+ ? "Stopped"
+ : "Started")
+ + ": Currently " + (currently ? "Stopped" : "Started"));
+ }
}
}
-
return currently;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index dc1f9a719e..206ca15c82 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -833,7 +833,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
if (suspend)
{
- for (BasicMessageConsumer consumer : getConsumers().values())
+ for (BasicMessageConsumer consumer : getConsumers())
{
getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
Option.UNRELIABLE);
@@ -842,7 +842,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
else
{
- for (BasicMessageConsumer_0_10 consumer : getConsumers().values())
+ for (BasicMessageConsumer_0_10 consumer : getConsumers())
{
String consumerTag = String.valueOf(consumer.getConsumerTag());
//only set if msg list is null
@@ -1320,7 +1320,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
drainDispatchQueue();
setUsingDispatcherForCleanup(false);
- for (BasicMessageConsumer consumer : getConsumers().values())
+ for (BasicMessageConsumer consumer : getConsumers())
{
List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
getPrefetchedMessageTags().addAll(tags);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 143de271a1..5fb9329af7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -27,6 +27,7 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W
import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -330,10 +331,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
{
_logger.debug("Prefetched message: _unacknowledgedMessageTags :" + getUnacknowledgedMessageTags());
}
- ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(getConsumers().values());
boolean messageListenerFound = false;
boolean serverRejectBehaviourFound = false;
- for(BasicMessageConsumer_0_8 consumer : consumersToCheck)
+ for(BasicMessageConsumer_0_8 consumer : getConsumers())
{
if (consumer.isMessageListenerSet())
{
@@ -344,7 +344,6 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
serverRejectBehaviourFound = true;
}
}
- _logger.debug("about to pre-reject messages for " + consumersToCheck.size() + " consumer(s)");
if (serverRejectBehaviourFound)
{
@@ -376,7 +375,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
// consumer on the queue. Whilst this is within the JMS spec it is not
// user friendly and avoidable.
boolean normalRejectBehaviour = true;
- for (BasicMessageConsumer_0_8 consumer : getConsumers().values())
+ for (BasicMessageConsumer_0_8 consumer : getConsumers())
{
if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour()))
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
index 2fdb35de49..f46c61daa7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
@@ -22,106 +22,45 @@ package org.apache.qpid.client;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public final class ChannelToSessionMap
{
- private final AMQSession[] _fastAccessSessions = new AMQSession[16];
- private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
- private int _size = 0;
- private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+ private final Map<Integer, AMQSession> _sessionMap = new ConcurrentHashMap<>();
private AtomicInteger _idFactory = new AtomicInteger(0);
private int _maxChannelID;
private int _minChannelID;
public AMQSession get(int channelId)
{
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- return _fastAccessSessions[channelId];
- }
- else
- {
- return _slowAccessSessions.get(channelId);
- }
+ return _sessionMap.get(channelId);
}
- public AMQSession put(int channelId, AMQSession session)
+ public void put(int channelId, AMQSession session)
{
- AMQSession oldVal;
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- oldVal = _fastAccessSessions[channelId];
- _fastAccessSessions[channelId] = session;
- }
- else
- {
- oldVal = _slowAccessSessions.put(channelId, session);
- }
- if ((oldVal != null) && (session == null))
- {
- _size--;
- }
- else if ((oldVal == null) && (session != null))
- {
- _size++;
- }
-
- return session;
-
+ _sessionMap.put(channelId, session);
}
- public AMQSession remove(int channelId)
+ public void remove(int channelId)
{
- AMQSession session;
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- session = _fastAccessSessions[channelId];
- _fastAccessSessions[channelId] = null;
- }
- else
- {
- session = _slowAccessSessions.remove(channelId);
- }
-
- if (session != null)
- {
- _size--;
- }
- return session;
-
+ _sessionMap.remove(channelId);
}
public Collection<AMQSession> values()
{
- ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
-
- for (int i = 0; i < 16; i++)
- {
- if (_fastAccessSessions[i] != null)
- {
- values.add(_fastAccessSessions[i]);
- }
- }
- values.addAll(_slowAccessSessions.values());
-
- return values;
+ return new ArrayList<>(_sessionMap.values());
}
public int size()
{
- return _size;
+ return _sessionMap.size();
}
public void clear()
{
- _size = 0;
- _slowAccessSessions.clear();
- for (int i = 0; i < 16; i++)
- {
- _fastAccessSessions[i] = null;
- }
+ _sessionMap.clear();
}
/*
@@ -141,14 +80,8 @@ public final class ChannelToSessionMap
//go back to the start
_idFactory.set(_minChannelID);
}
- if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- done = (_fastAccessSessions[id] == null);
- }
- else
- {
- done = (!_slowAccessSessions.keySet().contains(id));
- }
+
+ done = (!_sessionMap.keySet().contains(id));
}
return id;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
index d9514338ce..d625a9ae69 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
@@ -29,7 +29,7 @@ import javax.jms.XATopicConnection;
import javax.jms.XATopicSession;
/**
- * This class implements the javax.njms.XAConnection interface
+ * This class implements the javax.jms.XAConnection interface
*/
public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQueueConnection, XATopicConnection
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java
index 5b6c027f4a..24295a0832 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java
@@ -30,7 +30,6 @@ public class QpidQueueOptions extends HashMap<String,Object>
public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
public static final String QPID_LAST_VALUE_QUEUE_NO_BROWSE = "qpid.last_value_queue_no_browse";
- public static final String QPID_QUEUE_EVENT_GENERATION = "qpid.queue_event_generation";
public void validatePolicyType(String type)
{
@@ -83,16 +82,4 @@ public class QpidQueueOptions extends HashMap<String,Object>
this.put(QPID_LVQ_KEY, key);
}
- public void setQueueEvents(String value)
- {
- if (value != null && (value.equals("1") || value.equals("2")))
- {
- this.put(QPID_QUEUE_EVENT_GENERATION, value);
- }
- else
- {
- throw new IllegalArgumentException("Invalid value for " +
- QPID_QUEUE_EVENT_GENERATION + " should be one of {1|2}");
- }
- }
}