summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-04-17 20:09:37 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-04-17 20:09:37 +0000
commit7bdfcea8ed796a70b9d1e1ca39ac4dd1485ad41b (patch)
treedd037ab5ecc308a591339402dcf80c7e5aa5bf4e /java/broker
parent330c5ce8fe684975b3a081d734779303f0e4268c (diff)
downloadqpid-python-7bdfcea8ed796a70b9d1e1ca39ac4dd1485ad41b.tar.gz
QPID-3953 : [Java AMQP 1-0] Fix durable subscribers
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1327268 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java28
4 files changed, 41 insertions, 40 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
index e6c79a4077..f31ad5052b 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
@@ -20,31 +20,32 @@
*/
package org.apache.qpid.server.protocol;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.security.auth.callback.CallbackHandler;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
import org.apache.qpid.amqp_1_0.framing.AMQFrame;
import org.apache.qpid.amqp_1_0.framing.FrameHandler;
import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
-import org.apache.qpid.amqp_1_0.transport.*;
+import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
-
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConnectionConfigType;
import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import javax.security.auth.callback.CallbackHandler;
-import java.io.PrintWriter;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.UUID;
-import java.util.logging.*;
-import java.util.concurrent.atomic.AtomicLong;
-
public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHandler
{
static final AtomicLong _connectionIdSource = new AtomicLong(0L);
@@ -94,6 +95,8 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
}
private State _state = State.A;
+
+
public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry, long id)
{
@@ -138,7 +141,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
_network = network;
_sender = sender;
- Container container = new Container();
+ Container container = new Container(_appRegistry.getBrokerId().toString());
_conn = new ConnectionEndpoint(container,asCallbackHandlerSource(_appRegistry.getAuthenticationManager()));
_conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
index e4487e00f9..ffd5e750b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
@@ -20,10 +20,16 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.PrintWriter;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.security.auth.callback.CallbackHandler;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
import org.apache.qpid.amqp_1_0.framing.AMQFrame;
-import org.apache.qpid.amqp_1_0.framing.FrameHandler;
import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource;
@@ -32,7 +38,6 @@ import org.apache.qpid.amqp_1_0.transport.Container;
import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
-
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConnectionConfigType;
@@ -43,15 +48,6 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-import javax.security.auth.callback.CallbackHandler;
-import java.io.PrintWriter;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.UUID;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler
{
private long _readBytes;
@@ -165,7 +161,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
_network = network;
_sender = sender;
- Container container = new Container();
+ Container container = new Container(_appRegistry.getBrokerId().toString());
_conn = new ConnectionEndpoint(container, asCallbackHandlerSource(ApplicationRegistry.getInstance()
.getAuthenticationManager()));
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 8d227d9677..b3e9a74d04 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -347,10 +347,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
//TODO
// if not durable or close
- if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
- (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+ if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
{
-
AMQQueue queue = _subscription.getQueue();
try
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index ef298b4731..48a551e42a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -24,11 +24,10 @@ import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.LifetimePolicy;
-import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
import org.apache.qpid.amqp_1_0.type.transaction.Coordinator;
import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
import org.apache.qpid.amqp_1_0.type.transport.*;
@@ -143,13 +142,6 @@ public class Session_1_0 implements SessionEventListener
if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
{
linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
- sendingLink.setCloseAction(new Runnable() {
-
- public void run()
- {
- linkRegistry.unregisterSendingLink(endpoint.getName());
- }
- });
}
}
catch(AmqpErrorException e)
@@ -163,7 +155,19 @@ public class Session_1_0 implements SessionEventListener
}
else
{
- endpoint.setSource(previousLink.getEndpoint().getSource());
+ Source newSource = (Source) endpoint.getSource();
+
+ Source oldSource = (Source) previousLink.getEndpoint().getSource();
+ final TerminusDurability newSourceDurable = newSource == null ? null : newSource.getDurable();
+ if(newSourceDurable != null)
+ {
+ oldSource.setDurable(newSourceDurable);
+ if(newSourceDurable.equals(TerminusDurability.NONE))
+ {
+ linkRegistry.unregisterSendingLink(endpoint.getName());
+ }
+ }
+ endpoint.setSource(oldSource);
SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
sendingLinkEndpoint.setLinkEventListener(previousLink);