diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-12 20:25:53 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-12 20:25:53 +0000 |
| commit | 762aa71a99d13cb3f7efd29cb95098eadafb5396 (patch) | |
| tree | 86cf705813a21b0dde2393b799571fe48c2d0d46 /qpid/java/broker-plugins | |
| parent | e243745d439671418016a2be1570209269b45070 (diff) | |
| download | qpid-python-762aa71a99d13cb3f7efd29cb95098eadafb5396.tar.gz | |
merged from trunk r1659391
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659392 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
3 files changed, 23 insertions, 15 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index f06f70b362..fa2e543f8d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -47,6 +47,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -278,7 +279,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { //TODO getEndpoint().setSource(null); - getEndpoint().detach(); + getEndpoint().close(); + + final LinkRegistry linkReg = getSession().getConnection() + .getVirtualHost() + .getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId()); + linkReg.unregisterSendingLink(getEndpoint().getName()); } public boolean allocateCredit(final ServerMessage msg) @@ -418,7 +424,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget modified.setDeliveryFailed(true); _link.getEndpoint().updateDisposition(_deliveryTag, modified, true); _link.getEndpoint().sendFlowConditional(); - _queueEntry.unlockAcquisition(); + _queueEntry.incrementDeliveryCount(); + _queueEntry.release(); } } }); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 48ff420965..6a202998f4 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -292,15 +292,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS actualFilters.put(entry.getKey(), entry.getValue()); } - catch (ParseException e) - { - Error error = new Error(); - error.setCondition(AmqpError.INVALID_FIELD); - error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue()); - error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter"))); - throw new AmqpErrorException(error); - } - catch (SelectorParsingException e) + catch (ParseException | SelectorParsingException e) { Error error = new Error(); error.setCondition(AmqpError.INVALID_FIELD); @@ -364,6 +356,12 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS options.add(ConsumerImpl.Option.NO_LOCAL); } + if(_durability == TerminusDurability.CONFIGURATION || + _durability == TerminusDurability.UNSETTLED_STATE ) + { + options.add(ConsumerImpl.Option.DURABLE); + } + try { final String name; @@ -408,7 +406,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { //TODO // if not durable or close - if(!TerminusDurability.UNSETTLED_STATE.equals(_durability)) + if(Boolean.TRUE.equals(detach.getClosed()) + || !(TerminusDurability.UNSETTLED_STATE.equals(_durability)|| TerminusDurability.CONFIGURATION.equals(_durability))) { while(!_consumer.trySendLock()) { @@ -464,7 +463,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _consumer.releaseSendLock(); } } - else if(detach == null || detach.getError() != null) + else if(detach.getError() != null + && !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError())) { _linkAttachment = null; _target.flowStateChanged(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 0b613d9a5a..0a29a70373 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -215,7 +215,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio registerConsumer(sendingLink); link = sendingLink; - if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) + if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) { linkRegistry.registerSendingLink(endpoint.getName(), sendingLink); } @@ -377,7 +377,8 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(receivingLink)); link = receivingLink; - if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())) + if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()) + || TerminusDurability.CONFIGURATION.equals(target.getDurable())) { linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink); } |
