summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-03-03 14:56:40 +0000
committerKeith Wall <kwall@apache.org>2015-03-03 14:56:40 +0000
commit9dc57fe738f366d875c2319dafdfa2c50ce2f20b (patch)
treebe6634866a966f358fcb1ba6ba29dfb5c9c340c1 /qpid/java/common
parentfe37626d4fd8fb3ee5b3146a5159024a3d6d3357 (diff)
downloadqpid-python-9dc57fe738f366d875c2319dafdfa2c50ce2f20b.tar.gz
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1663717 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java21
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java21
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java25
8 files changed, 106 insertions, 13 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
index 483fbaea50..d033bf86c2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
@@ -30,7 +30,8 @@ public enum AMQPFilterTypes
JMS_SELECTOR("x-filter-jms-selector"),
NO_CONSUME("x-filter-no-consume"),
AUTO_CLOSE("x-filter-auto-close"),
- NO_LOCAL("x-qpid-no-local");
+ NO_LOCAL("x-qpid-no-local"),
+ REPLAY_PERIOD("x-qpid-replay-period");
/** The identifying string for the filter type. */
private final AMQShortString _value;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index c2a0911bc3..7c4e264ade 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -181,7 +181,15 @@ public class ClientDelegate extends ConnectionDelegate
@Override
public void connectionRedirect(Connection conn, ConnectionRedirect redir)
{
- throw new UnsupportedOperationException();
+ conn.setRedirecting(true);
+ conn.getSender().close();
+ for(ConnectionListener listener : conn.getListeners())
+ {
+ if(listener.redirect(redir.getHost(), redir.getKnownHosts()))
+ {
+ break;
+ }
+ }
}
@Override
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 7007948980..4ae7e8d47a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -80,6 +80,7 @@ public class Connection extends ConnectionInvoker
private NetworkConnection _networkConnection;
private FrameSizeObserver _frameSizeObserver;
private boolean _messageCompressionSupported;
+ private final AtomicBoolean _redirecting = new AtomicBoolean();
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -91,6 +92,12 @@ public class Connection extends ConnectionInvoker
log.error(exception, "connection exception");
}
public void closed(Connection conn) {}
+
+ @Override
+ public boolean redirect(final String host, final List<Object> knownHosts)
+ {
+ return false;
+ }
}
public static interface SessionFactory
@@ -150,6 +157,11 @@ public class Connection extends ConnectionInvoker
listeners.add(listener);
}
+ public List<ConnectionListener> getListeners()
+ {
+ return Collections.unmodifiableList(listeners);
+ }
+
public ProtocolEventSender getSender()
{
return sender;
@@ -224,6 +236,7 @@ public class Connection extends ConnectionInvoker
synchronized (lock)
{
conSettings = settings;
+ _redirecting.set(false);
state = OPENING;
userID = settings.getUsername();
connectionLost.set(false);
@@ -257,7 +270,7 @@ public class Connection extends ConnectionInvoker
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
- while (w.hasTime() && state == OPENING && error == null)
+ while (w.hasTime() && ((state == OPENING && error == null) || isRedirecting()))
{
w.await();
}
@@ -853,4 +866,15 @@ public class Connection extends ConnectionInvoker
{
return _messageCompressionSupported;
}
+
+ public boolean isRedirecting()
+ {
+ return _redirecting.get();
+ }
+
+ public void setRedirecting(final boolean redirecting)
+ {
+ _redirecting.set(redirecting);
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
index 616e76825a..b055b9f5e1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
@@ -21,6 +21,8 @@
package org.apache.qpid.transport;
+import java.util.List;
+
/**
* ConnectionListener
*
@@ -35,4 +37,5 @@ public interface ConnectionListener
void closed(Connection connection);
+ boolean redirect(String host, List<Object> knownHosts);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
index 2a70087c10..5ef94b7d13 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
@@ -21,6 +21,8 @@
package org.apache.qpid.transport.network.security.sasl;
+import java.util.List;
+
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@@ -56,7 +58,13 @@ public abstract class SASLEncryptor implements ConnectionListener
}
}
}
-
+
+ @Override
+ public boolean redirect(final String host, final List<Object> knownHosts)
+ {
+ return false;
+ }
+
public void exception(Connection conn, ConnectionException exception){}
public void closed(Connection conn) {}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
index 77902c3531..3ea99ce4ab 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
@@ -20,15 +20,16 @@
*/
package org.apache.qpid.url;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-
public class AMQBindingURL implements BindingURL
{
private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class);
@@ -135,6 +136,20 @@ public class AMQBindingURL implements BindingURL
return _options.get(key);
}
+ @Override
+ public Map<String,Object> getConsumerOptions()
+ {
+ Map<String,Object> options = new HashMap<>();
+ for(Map.Entry<String,String> option : _options.entrySet())
+ {
+ if(!NON_CONSUMER_OPTIONS.contains(option.getKey()))
+ {
+ options.put(option.getKey(), option.getValue());
+ }
+ }
+ return options;
+ }
+
public void setOption(String key, String value)
{
_options.put(key, value);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
index 80a1ae540b..91f80ff88c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.url;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.qpid.framing.AMQShortString;
/*
@@ -47,6 +53,18 @@ public interface BindingURL
*/
public static final String OPTION_REJECT_BEHAVIOUR = "rejectbehaviour";
+ public static final Set<String> NON_CONSUMER_OPTIONS =
+ Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(OPTION_EXCLUSIVE,
+ OPTION_AUTODELETE,
+ OPTION_DURABLE,
+ OPTION_BROWSE,
+ OPTION_ROUTING_KEY,
+ OPTION_BINDING_KEY,
+ OPTION_EXCHANGE_AUTODELETE,
+ OPTION_EXCHANGE_DURABLE,
+ OPTION_EXCHANGE_DURABLE,
+ OPTION_REJECT_BEHAVIOUR)));
+
String getURL();
@@ -60,6 +78,9 @@ public interface BindingURL
String getOption(String key);
+ Map<String,Object> getConsumerOptions();
+
+
boolean containsOption(String key);
AMQShortString getRoutingKey();
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index 46d1887496..afc2b968f4 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.transport;
-import org.apache.log4j.Logger;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.network.ConnectionBinding;
-import org.apache.qpid.transport.network.io.IoAcceptor;
-import org.apache.qpid.transport.util.Waiter;
-
import static org.apache.qpid.transport.Option.EXPECTED;
import static org.apache.qpid.transport.Option.NONE;
import static org.apache.qpid.transport.Option.SYNC;
@@ -37,6 +31,13 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
+import org.apache.qpid.transport.util.Waiter;
+
/**
* ConnectionTest
*/
@@ -171,6 +172,12 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
closed.countDown();
}
}
+
+ @Override
+ public boolean redirect(final String host, final List<Object> knownHosts)
+ {
+ return false;
+ }
});
conn.connect("localhost", port, null, "guest", "guest", false, null);
return conn;
@@ -437,6 +444,12 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
conn.connect("localhost", port, null, "guest", "guest", false, null);
conn.resume();
}
+
+ @Override
+ public boolean redirect(final String host, final List<Object> knownHosts)
+ {
+ return false;
+ }
}
class TestSessionListener implements SessionListener