diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-11 01:04:08 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-11 01:04:08 +0000 |
| commit | 35f8db0065335d4da24de4459cf228b077218138 (patch) | |
| tree | aa2221d5165f355940a14089404b959740b24df3 /qpid/java | |
| parent | 93024d74d2711c3c3cdab6e98f7158ca730abbe1 (diff) | |
| download | qpid-python-35f8db0065335d4da24de4459cf228b077218138.tar.gz | |
QPID-6384 : fix various issues with durable links
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658849 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
4 files changed, 38 insertions, 10 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java index 434f939a21..246d43d3de 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java @@ -21,15 +21,28 @@ package org.apache.qpid.amqp_1_0.transport; -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.transport.*; -import org.apache.qpid.amqp_1_0.type.transport.Error; - import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.Source; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.Target; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.transport.Attach; +import org.apache.qpid.amqp_1_0.type.transport.Detach; +import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.amqp_1_0.type.transport.Flow; +import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.Role; +import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.Transfer; + public abstract class LinkEndpoint<T extends LinkEventListener> { diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index 0f37518773..5a28ddcb60 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -98,6 +98,12 @@ public class SessionEndpoint private int _availableOutgoingCredit; private UnsignedInteger _lastSentIncomingLimit; + private final Error _sessionEndedLinkError = + new Error(LinkError.DETACH_FORCED, + "Force detach the link because the session is remotely ended."); + + + public SessionEndpoint(final ConnectionEndpoint connectionEndpoint) { this(connectionEndpoint, UnsignedInteger.valueOf(0)); @@ -240,19 +246,21 @@ public class SessionEndpoint private void detachLinks() { Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet()); - Error error = new Error(); - error.setCondition(LinkError.DETACH_FORCED); - error.setDescription("Force detach the link because the session is remotely ended."); for(UnsignedInteger handle : handles) { Detach detach = new Detach(); detach.setClosed(false); detach.setHandle(handle); - detach.setError(error); + detach.setError(_sessionEndedLinkError); detach(handle, detach); } } + public boolean isSyntheticError(Error error) + { + return error == _sessionEndedLinkError; + } + public short getSendingChannel() { return _sendingChannel; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index 598fce03b9..f19ce6b1be 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -283,7 +284,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { //TODO getEndpoint().setSource(null); - getEndpoint().detach(); + getEndpoint().close(); + + final LinkRegistry linkReg = getSession().getConnection() + .getVirtualHost() + .getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId()); + linkReg.unregisterSendingLink(getEndpoint().getName()); } public boolean allocateCredit(final ServerMessage msg) diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index cdaf5f0ed6..e3994005d6 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -464,7 +464,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _consumer.releaseSendLock(); } } - else if(detach == null || detach.getError() != null) + else if(detach.getError() != null + && !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError())) { _linkAttachment = null; _target.flowStateChanged(); |
