diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-12 20:25:53 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-12 20:25:53 +0000 |
| commit | 762aa71a99d13cb3f7efd29cb95098eadafb5396 (patch) | |
| tree | 86cf705813a21b0dde2393b799571fe48c2d0d46 /qpid/java | |
| parent | e243745d439671418016a2be1570209269b45070 (diff) | |
| download | qpid-python-762aa71a99d13cb3f7efd29cb95098eadafb5396.tar.gz | |
merged from trunk r1659391
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659392 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
18 files changed, 128 insertions, 228 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java index b89025a27b..4b53cfa795 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java @@ -131,6 +131,13 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub protected void closeUnderlyingReceiver(Receiver receiver) { - receiver.close(); + if(isDurable()) + { + receiver.detach(); + } + else + { + receiver.close(); + } } } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index 69b4939070..a4f9ac5a3a 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -179,7 +179,7 @@ public class Connection implements ExceptionHandler boolean ssl, int channelMax) throws ConnectionException { - this(ssl?"amqp":"amqps",address,port,username,password,maxFrameSize,container, + this(ssl?"amqps":"amqp",address,port,username,password,maxFrameSize,container, remoteHostname, getSslContext(ssl), null, @@ -291,6 +291,12 @@ public class Connection implements ExceptionHandler private TransportProvider getTransportProvider(final String protocol) throws ConnectionException { + TCPTransportProviderFactory tcpTransportProviderFactory = new TCPTransportProviderFactory(); + if(tcpTransportProviderFactory.getSupportedTransports().contains(protocol)) + { + return tcpTransportProviderFactory.getProvider(protocol); + } + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); ServiceLoader<TransportProviderFactory> providerFactories = ServiceLoader.load(TransportProviderFactory.class, classLoader); diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java index 3e9dca683e..38f28667d6 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java @@ -225,8 +225,8 @@ public class FrameHandler implements ProtocolHandler // PARSE HERE try { - Object val = _typeHandler.parse(in); - + Object val = in.hasRemaining() ? _typeHandler.parse(in) : null; + if(in.hasRemaining()) { if(val instanceof Transfer) diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index 766c9705a1..17f334153d 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -457,6 +457,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour case AWAITING_OPEN: case CLOSE_SENT: _state = ConnectionState.CLOSED; + closeSender(); break; case OPEN: _state = ConnectionState.CLOSE_RECEIVED; diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java index 434f939a21..246d43d3de 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java @@ -21,15 +21,28 @@ package org.apache.qpid.amqp_1_0.transport; -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.transport.*; -import org.apache.qpid.amqp_1_0.type.transport.Error; - import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.Source; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.Target; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.transport.Attach; +import org.apache.qpid.amqp_1_0.type.transport.Detach; +import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.amqp_1_0.type.transport.Flow; +import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.Role; +import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.Transfer; + public abstract class LinkEndpoint<T extends LinkEventListener> { diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index 0f37518773..5a28ddcb60 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -98,6 +98,12 @@ public class SessionEndpoint private int _availableOutgoingCredit; private UnsignedInteger _lastSentIncomingLimit; + private final Error _sessionEndedLinkError = + new Error(LinkError.DETACH_FORCED, + "Force detach the link because the session is remotely ended."); + + + public SessionEndpoint(final ConnectionEndpoint connectionEndpoint) { this(connectionEndpoint, UnsignedInteger.valueOf(0)); @@ -240,19 +246,21 @@ public class SessionEndpoint private void detachLinks() { Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet()); - Error error = new Error(); - error.setCondition(LinkError.DETACH_FORCED); - error.setDescription("Force detach the link because the session is remotely ended."); for(UnsignedInteger handle : handles) { Detach detach = new Detach(); detach.setClosed(false); detach.setHandle(handle); - detach.setError(error); + detach.setError(_sessionEndedLinkError); detach(handle, detach); } } + public boolean isSyntheticError(Error error) + { + return error == _sessionEndedLinkError; + } + public short getSendingChannel() { return _sendingChannel; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java index 3b196df902..fd6f3385c6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -39,7 +39,8 @@ public interface ConsumerImpl SEES_REQUEUES, TRANSIENT, EXCLUSIVE, - NO_LOCAL + NO_LOCAL, + DURABLE } long getBytesOut(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index d3ce911406..65e8a1358d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -246,6 +246,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>(); private final QueueRunner _queueRunner = new QueueRunner(this); + private boolean _closing; protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost) { @@ -754,6 +755,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } + @Override + protected void beforeClose() + { + _closing = true; + super.beforeClose(); + } + + + synchronized void unregisterConsumer(final QueueConsumerImpl consumer) { if (consumer == null) @@ -794,7 +804,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if(!consumer.isTransient() && ( getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS ) - && getConsumerCount() == 0) + && getConsumerCount() == 0 + && !(consumer.isDurable() && _closing)) { if (_logger.isInfoEnabled()) @@ -1794,6 +1805,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { ReferenceCountingExecutorService.getInstance().releaseExecutorService(); } + _closing = false; } public void checkCapacity(AMQSessionModel channel) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 450d4d98d5..4ffb868537 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -153,7 +153,7 @@ class QueueConsumerImpl attributes.put(EXCLUSIVE, optionSet.contains(Option.EXCLUSIVE)); attributes.put(NO_LOCAL, optionSet.contains(Option.NO_LOCAL)); attributes.put(DISTRIBUTION_MODE, optionSet.contains(Option.ACQUIRES) ? "MOVE" : "COPY"); - attributes.put(DURABLE,false); + attributes.put(DURABLE,optionSet.contains(Option.DURABLE)); attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END); if(filters != null) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index f06f70b362..fa2e543f8d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -47,6 +47,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -278,7 +279,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { //TODO getEndpoint().setSource(null); - getEndpoint().detach(); + getEndpoint().close(); + + final LinkRegistry linkReg = getSession().getConnection() + .getVirtualHost() + .getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId()); + linkReg.unregisterSendingLink(getEndpoint().getName()); } public boolean allocateCredit(final ServerMessage msg) @@ -418,7 +424,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget modified.setDeliveryFailed(true); _link.getEndpoint().updateDisposition(_deliveryTag, modified, true); _link.getEndpoint().sendFlowConditional(); - _queueEntry.unlockAcquisition(); + _queueEntry.incrementDeliveryCount(); + _queueEntry.release(); } } }); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 48ff420965..6a202998f4 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -292,15 +292,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS actualFilters.put(entry.getKey(), entry.getValue()); } - catch (ParseException e) - { - Error error = new Error(); - error.setCondition(AmqpError.INVALID_FIELD); - error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue()); - error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter"))); - throw new AmqpErrorException(error); - } - catch (SelectorParsingException e) + catch (ParseException | SelectorParsingException e) { Error error = new Error(); error.setCondition(AmqpError.INVALID_FIELD); @@ -364,6 +356,12 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS options.add(ConsumerImpl.Option.NO_LOCAL); } + if(_durability == TerminusDurability.CONFIGURATION || + _durability == TerminusDurability.UNSETTLED_STATE ) + { + options.add(ConsumerImpl.Option.DURABLE); + } + try { final String name; @@ -408,7 +406,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { //TODO // if not durable or close - if(!TerminusDurability.UNSETTLED_STATE.equals(_durability)) + if(Boolean.TRUE.equals(detach.getClosed()) + || !(TerminusDurability.UNSETTLED_STATE.equals(_durability)|| TerminusDurability.CONFIGURATION.equals(_durability))) { while(!_consumer.trySendLock()) { @@ -464,7 +463,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _consumer.releaseSendLock(); } } - else if(detach == null || detach.getError() != null) + else if(detach.getError() != null + && !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError())) { _linkAttachment = null; _target.flowStateChanged(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 0b613d9a5a..0a29a70373 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -215,7 +215,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio registerConsumer(sendingLink); link = sendingLink; - if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) + if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) { linkRegistry.registerSendingLink(endpoint.getName(), sendingLink); } @@ -377,7 +377,8 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(receivingLink)); link = receivingLink; - if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())) + if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()) + || TerminusDurability.CONFIGURATION.equals(target.getDurable())) { linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 12e9285af8..8f5e9524b6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -133,7 +133,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** * Flag indicating to start dispatcher as a daemon thread */ - protected final boolean DEAMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER); + protected final boolean DAEMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER); /** The connection to which this session belongs. */ private AMQConnection _connection; @@ -187,7 +187,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private MessageFactoryRegistry _messageFactoryRegistry; /** Holds all of the producers created by this session, keyed by their unique identifiers. */ - private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); + private final Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>(); /** * Used as a source of unique identifiers so that the consumers can be tagged to match them to BasicConsume @@ -195,7 +195,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic */ private int _nextTag = 1; - private final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>(); + private final Map<Integer,C> _consumers = new ConcurrentHashMap<>(); /** * Contains a list of consumers which have been removed but which might still have @@ -224,7 +224,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. */ - private boolean _connectionStopped; + private final AtomicBoolean _connectionStopped = new AtomicBoolean(); /** Used to indicate that this session has a message listener attached to it. */ private boolean _hasMessageListeners; @@ -294,12 +294,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } /** - * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right - * consumer. + * Consumers associated with this session */ - protected IdToConsumerMap<C> getConsumers() + protected Collection<C> getConsumers() { - return _consumers; + return new ArrayList(_consumers.values()); } protected void setUsingDispatcherForCleanup(boolean usingDispatcherForCleanup) @@ -317,83 +316,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic abstract void handleLinkDelete(final AMQDestination dest) throws AMQException; - public static final class IdToConsumerMap<C extends BasicMessageConsumer> - { - private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; - private final ConcurrentMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>(); - - public C get(int id) - { - if ((id & 0xFFFFFFF0) == 0) - { - return (C) _fastAccessConsumers[id]; - } - else - { - return _slowAccessConsumers.get(id); - } - } - - public C put(int id, C consumer) - { - C oldVal; - if ((id & 0xFFFFFFF0) == 0) - { - oldVal = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = consumer; - } - else - { - oldVal = _slowAccessConsumers.put(id, consumer); - } - - return oldVal; - - } - - public C remove(int id) - { - C consumer; - if ((id & 0xFFFFFFF0) == 0) - { - consumer = (C) _fastAccessConsumers[id]; - _fastAccessConsumers[id] = null; - } - else - { - consumer = _slowAccessConsumers.remove(id); - } - - return consumer; - - } - - public Collection<C> values() - { - ArrayList<C> values = new ArrayList<C>(); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessConsumers[i] != null) - { - values.add((C) _fastAccessConsumers[i]); - } - } - values.addAll(_slowAccessConsumers.values()); - - return values; - } - - public void clear() - { - _slowAccessConsumers.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessConsumers[i] = null; - } - } - } - /** * Creates a new session on a connection. * @@ -2490,7 +2412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic String dispatcherThreadName = "Dispatcher-" + _channelId + "-Conn-" + _connection.getConnectionNumber(); _dispatcherThread.setName(dispatcherThreadName); - _dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD); + _dispatcherThread.setDaemon(DAEMON_DISPATCHER_THREAD); _dispatcher.setConnectionStopped(initiallyStopped); _dispatcherThread.start(); if (_dispatcherLogger.isDebugEnabled()) @@ -3488,25 +3410,28 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // only call while holding lock final boolean connectionStopped() { - return _connectionStopped; + return _connectionStopped.get(); } boolean setConnectionStopped(boolean connectionStopped) { - boolean currently; - synchronized (_lock) + boolean currently = _connectionStopped.get(); + if(connectionStopped != currently) { - currently = _connectionStopped; - _connectionStopped = connectionStopped; - _lock.notify(); - - if (_dispatcherLogger.isDebugEnabled()) + synchronized (_lock) { - _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") - + ": Currently " + (currently ? "Stopped" : "Started")); + _connectionStopped.set(connectionStopped); + _lock.notify(); + + if (_dispatcherLogger.isDebugEnabled()) + { + _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped + ? "Stopped" + : "Started") + + ": Currently " + (currently ? "Stopped" : "Started")); + } } } - return currently; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index dc1f9a719e..206ca15c82 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -833,7 +833,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (suspend) { - for (BasicMessageConsumer consumer : getConsumers().values()) + for (BasicMessageConsumer consumer : getConsumers()) { getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), Option.UNRELIABLE); @@ -842,7 +842,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - for (BasicMessageConsumer_0_10 consumer : getConsumers().values()) + for (BasicMessageConsumer_0_10 consumer : getConsumers()) { String consumerTag = String.valueOf(consumer.getConsumerTag()); //only set if msg list is null @@ -1320,7 +1320,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic drainDispatchQueue(); setUsingDispatcherForCleanup(false); - for (BasicMessageConsumer consumer : getConsumers().values()) + for (BasicMessageConsumer consumer : getConsumers()) { List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); getPrefetchedMessageTags().addAll(tags); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 143de271a1..5fb9329af7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -27,6 +27,7 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -330,10 +331,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + getUnacknowledgedMessageTags()); } - ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(getConsumers().values()); boolean messageListenerFound = false; boolean serverRejectBehaviourFound = false; - for(BasicMessageConsumer_0_8 consumer : consumersToCheck) + for(BasicMessageConsumer_0_8 consumer : getConsumers()) { if (consumer.isMessageListenerSet()) { @@ -344,7 +344,6 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe serverRejectBehaviourFound = true; } } - _logger.debug("about to pre-reject messages for " + consumersToCheck.size() + " consumer(s)"); if (serverRejectBehaviourFound) { @@ -376,7 +375,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // consumer on the queue. Whilst this is within the JMS spec it is not // user friendly and avoidable. boolean normalRejectBehaviour = true; - for (BasicMessageConsumer_0_8 consumer : getConsumers().values()) + for (BasicMessageConsumer_0_8 consumer : getConsumers()) { if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour())) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java index 2fdb35de49..f46c61daa7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java @@ -22,106 +22,45 @@ package org.apache.qpid.client; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; public final class ChannelToSessionMap { - private final AMQSession[] _fastAccessSessions = new AMQSession[16]; - private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); - private int _size = 0; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private final Map<Integer, AMQSession> _sessionMap = new ConcurrentHashMap<>(); private AtomicInteger _idFactory = new AtomicInteger(0); private int _maxChannelID; private int _minChannelID; public AMQSession get(int channelId) { - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - return _fastAccessSessions[channelId]; - } - else - { - return _slowAccessSessions.get(channelId); - } + return _sessionMap.get(channelId); } - public AMQSession put(int channelId, AMQSession session) + public void put(int channelId, AMQSession session) { - AMQSession oldVal; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - oldVal = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = session; - } - else - { - oldVal = _slowAccessSessions.put(channelId, session); - } - if ((oldVal != null) && (session == null)) - { - _size--; - } - else if ((oldVal == null) && (session != null)) - { - _size++; - } - - return session; - + _sessionMap.put(channelId, session); } - public AMQSession remove(int channelId) + public void remove(int channelId) { - AMQSession session; - if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) - { - session = _fastAccessSessions[channelId]; - _fastAccessSessions[channelId] = null; - } - else - { - session = _slowAccessSessions.remove(channelId); - } - - if (session != null) - { - _size--; - } - return session; - + _sessionMap.remove(channelId); } public Collection<AMQSession> values() { - ArrayList<AMQSession> values = new ArrayList<AMQSession>(size()); - - for (int i = 0; i < 16; i++) - { - if (_fastAccessSessions[i] != null) - { - values.add(_fastAccessSessions[i]); - } - } - values.addAll(_slowAccessSessions.values()); - - return values; + return new ArrayList<>(_sessionMap.values()); } public int size() { - return _size; + return _sessionMap.size(); } public void clear() { - _size = 0; - _slowAccessSessions.clear(); - for (int i = 0; i < 16; i++) - { - _fastAccessSessions[i] = null; - } + _sessionMap.clear(); } /* @@ -141,14 +80,8 @@ public final class ChannelToSessionMap //go back to the start _idFactory.set(_minChannelID); } - if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) - { - done = (_fastAccessSessions[id] == null); - } - else - { - done = (!_slowAccessSessions.keySet().contains(id)); - } + + done = (!_sessionMap.keySet().contains(id)); } return id; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index d9514338ce..d625a9ae69 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -29,7 +29,7 @@ import javax.jms.XATopicConnection; import javax.jms.XATopicSession; /** - * This class implements the javax.njms.XAConnection interface + * This class implements the javax.jms.XAConnection interface */ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQueueConnection, XATopicConnection { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java index 5b6c027f4a..24295a0832 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java @@ -30,7 +30,6 @@ public class QpidQueueOptions extends HashMap<String,Object> public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; public static final String QPID_LAST_VALUE_QUEUE_NO_BROWSE = "qpid.last_value_queue_no_browse"; - public static final String QPID_QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"; public void validatePolicyType(String type) { @@ -83,16 +82,4 @@ public class QpidQueueOptions extends HashMap<String,Object> this.put(QPID_LVQ_KEY, key); } - public void setQueueEvents(String value) - { - if (value != null && (value.equals("1") || value.equals("2"))) - { - this.put(QPID_QUEUE_EVENT_GENERATION, value); - } - else - { - throw new IllegalArgumentException("Invalid value for " + - QPID_QUEUE_EVENT_GENERATION + " should be one of {1|2}"); - } - } } |
