diff options
Diffstat (limited to 'java/client')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 23 |
1 files changed, 15 insertions, 8 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b80178baf9..3390dfef8b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1835,12 +1835,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throw ex; } - synchronized (destination) - { - _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); - _destinationConsumerCount.get(destination).incrementAndGet(); - } - return consumer; } }, _connection).execute(); @@ -1869,9 +1863,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic Destination dest = consumer.getDestination(); synchronized (dest) { - if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) + // Provide additional NPE check + // This would occur if the consumer was closed before it was + // fully opened. + if (_destinationConsumerCount.get(dest) != null) { - _destinationConsumerCount.remove(dest); + if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) + { + _destinationConsumerCount.remove(dest); + } } } @@ -2328,6 +2328,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // we must register the consumer in the map before we actually start listening _consumers.put(tagId, consumer); + synchronized (consumer.getDestination()) + { + _destinationConsumerCount.putIfAbsent(consumer.getDestination(), new AtomicInteger()); + _destinationConsumerCount.get(consumer.getDestination()).incrementAndGet(); + } + + try { sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId); |
