summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-12 20:25:53 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-12 20:25:53 +0000
commit762aa71a99d13cb3f7efd29cb95098eadafb5396 (patch)
tree86cf705813a21b0dde2393b799571fe48c2d0d46 /qpid/java/broker-plugins
parente243745d439671418016a2be1570209269b45070 (diff)
downloadqpid-python-762aa71a99d13cb3f7efd29cb95098eadafb5396.tar.gz
merged from trunk r1659391
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659392 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java11
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java22
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java5
3 files changed, 23 insertions, 15 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index f06f70b362..fa2e543f8d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -47,6 +47,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -278,7 +279,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
//TODO
getEndpoint().setSource(null);
- getEndpoint().detach();
+ getEndpoint().close();
+
+ final LinkRegistry linkReg = getSession().getConnection()
+ .getVirtualHost()
+ .getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId());
+ linkReg.unregisterSendingLink(getEndpoint().getName());
}
public boolean allocateCredit(final ServerMessage msg)
@@ -418,7 +424,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
modified.setDeliveryFailed(true);
_link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
_link.getEndpoint().sendFlowConditional();
- _queueEntry.unlockAcquisition();
+ _queueEntry.incrementDeliveryCount();
+ _queueEntry.release();
}
}
});
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 48ff420965..6a202998f4 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -292,15 +292,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
actualFilters.put(entry.getKey(), entry.getValue());
}
- catch (ParseException e)
- {
- Error error = new Error();
- error.setCondition(AmqpError.INVALID_FIELD);
- error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
- error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
- throw new AmqpErrorException(error);
- }
- catch (SelectorParsingException e)
+ catch (ParseException | SelectorParsingException e)
{
Error error = new Error();
error.setCondition(AmqpError.INVALID_FIELD);
@@ -364,6 +356,12 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
options.add(ConsumerImpl.Option.NO_LOCAL);
}
+ if(_durability == TerminusDurability.CONFIGURATION ||
+ _durability == TerminusDurability.UNSETTLED_STATE )
+ {
+ options.add(ConsumerImpl.Option.DURABLE);
+ }
+
try
{
final String name;
@@ -408,7 +406,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
//TODO
// if not durable or close
- if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
+ if(Boolean.TRUE.equals(detach.getClosed())
+ || !(TerminusDurability.UNSETTLED_STATE.equals(_durability)|| TerminusDurability.CONFIGURATION.equals(_durability)))
{
while(!_consumer.trySendLock())
{
@@ -464,7 +463,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
_consumer.releaseSendLock();
}
}
- else if(detach == null || detach.getError() != null)
+ else if(detach.getError() != null
+ && !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError()))
{
_linkAttachment = null;
_target.flowStateChanged();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 0b613d9a5a..0a29a70373 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -215,7 +215,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
registerConsumer(sendingLink);
link = sendingLink;
- if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
+ if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable()))
{
linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
}
@@ -377,7 +377,8 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(receivingLink));
link = receivingLink;
- if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()))
+ if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
+ || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
{
linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
}