summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-11 01:04:08 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-11 01:04:08 +0000
commit35f8db0065335d4da24de4459cf228b077218138 (patch)
treeaa2221d5165f355940a14089404b959740b24df3 /qpid/java
parent93024d74d2711c3c3cdab6e98f7158ca730abbe1 (diff)
downloadqpid-python-35f8db0065335d4da24de4459cf228b077218138.tar.gz
QPID-6384 : fix various issues with durable links
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658849 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java21
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java16
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java8
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java3
4 files changed, 38 insertions, 10 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
index 434f939a21..246d43d3de 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
@@ -21,15 +21,28 @@
package org.apache.qpid.amqp_1_0.transport;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Source;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.Target;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.transport.Attach;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.amqp_1_0.type.transport.Flow;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Role;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
public abstract class LinkEndpoint<T extends LinkEventListener>
{
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index 0f37518773..5a28ddcb60 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -98,6 +98,12 @@ public class SessionEndpoint
private int _availableOutgoingCredit;
private UnsignedInteger _lastSentIncomingLimit;
+ private final Error _sessionEndedLinkError =
+ new Error(LinkError.DETACH_FORCED,
+ "Force detach the link because the session is remotely ended.");
+
+
+
public SessionEndpoint(final ConnectionEndpoint connectionEndpoint)
{
this(connectionEndpoint, UnsignedInteger.valueOf(0));
@@ -240,19 +246,21 @@ public class SessionEndpoint
private void detachLinks()
{
Collection<UnsignedInteger> handles = new ArrayList<UnsignedInteger>(_remoteLinkEndpoints.keySet());
- Error error = new Error();
- error.setCondition(LinkError.DETACH_FORCED);
- error.setDescription("Force detach the link because the session is remotely ended.");
for(UnsignedInteger handle : handles)
{
Detach detach = new Detach();
detach.setClosed(false);
detach.setHandle(handle);
- detach.setError(error);
+ detach.setError(_sessionEndedLinkError);
detach(handle, detach);
}
}
+ public boolean isSyntheticError(Error error)
+ {
+ return error == _sessionEndedLinkError;
+ }
+
public short getSendingChannel()
{
return _sendingChannel;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 598fce03b9..f19ce6b1be 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -283,7 +284,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
//TODO
getEndpoint().setSource(null);
- getEndpoint().detach();
+ getEndpoint().close();
+
+ final LinkRegistry linkReg = getSession().getConnection()
+ .getVirtualHost()
+ .getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId());
+ linkReg.unregisterSendingLink(getEndpoint().getName());
}
public boolean allocateCredit(final ServerMessage msg)
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index cdaf5f0ed6..e3994005d6 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -464,7 +464,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
_consumer.releaseSendLock();
}
}
- else if(detach == null || detach.getError() != null)
+ else if(detach.getError() != null
+ && !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError()))
{
_linkAttachment = null;
_target.flowStateChanged();