diff options
| author | Keith Wall <kwall@apache.org> | 2015-03-03 14:56:40 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-03-03 14:56:40 +0000 |
| commit | 9dc57fe738f366d875c2319dafdfa2c50ce2f20b (patch) | |
| tree | be6634866a966f358fcb1ba6ba29dfb5c9c340c1 /qpid/java/common | |
| parent | fe37626d4fd8fb3ee5b3146a5159024a3d6d3357 (diff) | |
| download | qpid-python-9dc57fe738f366d875c2319dafdfa2c50ce2f20b.tar.gz | |
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1663717 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
8 files changed, 106 insertions, 13 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java index 483fbaea50..d033bf86c2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java @@ -30,7 +30,8 @@ public enum AMQPFilterTypes JMS_SELECTOR("x-filter-jms-selector"), NO_CONSUME("x-filter-no-consume"), AUTO_CLOSE("x-filter-auto-close"), - NO_LOCAL("x-qpid-no-local"); + NO_LOCAL("x-qpid-no-local"), + REPLAY_PERIOD("x-qpid-replay-period"); /** The identifying string for the filter type. */ private final AMQShortString _value; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index c2a0911bc3..7c4e264ade 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -181,7 +181,15 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionRedirect(Connection conn, ConnectionRedirect redir) { - throw new UnsupportedOperationException(); + conn.setRedirecting(true); + conn.getSender().close(); + for(ConnectionListener listener : conn.getListeners()) + { + if(listener.redirect(redir.getHost(), redir.getKnownHosts())) + { + break; + } + } } @Override diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 7007948980..4ae7e8d47a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -80,6 +80,7 @@ public class Connection extends ConnectionInvoker private NetworkConnection _networkConnection; private FrameSizeObserver _frameSizeObserver; private boolean _messageCompressionSupported; + private final AtomicBoolean _redirecting = new AtomicBoolean(); public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } @@ -91,6 +92,12 @@ public class Connection extends ConnectionInvoker log.error(exception, "connection exception"); } public void closed(Connection conn) {} + + @Override + public boolean redirect(final String host, final List<Object> knownHosts) + { + return false; + } } public static interface SessionFactory @@ -150,6 +157,11 @@ public class Connection extends ConnectionInvoker listeners.add(listener); } + public List<ConnectionListener> getListeners() + { + return Collections.unmodifiableList(listeners); + } + public ProtocolEventSender getSender() { return sender; @@ -224,6 +236,7 @@ public class Connection extends ConnectionInvoker synchronized (lock) { conSettings = settings; + _redirecting.set(false); state = OPENING; userID = settings.getUsername(); connectionLost.set(false); @@ -257,7 +270,7 @@ public class Connection extends ConnectionInvoker send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); - while (w.hasTime() && state == OPENING && error == null) + while (w.hasTime() && ((state == OPENING && error == null) || isRedirecting())) { w.await(); } @@ -853,4 +866,15 @@ public class Connection extends ConnectionInvoker { return _messageCompressionSupported; } + + public boolean isRedirecting() + { + return _redirecting.get(); + } + + public void setRedirecting(final boolean redirecting) + { + _redirecting.set(redirecting); + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java index 616e76825a..b055b9f5e1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java @@ -21,6 +21,8 @@ package org.apache.qpid.transport; +import java.util.List; + /** * ConnectionListener * @@ -35,4 +37,5 @@ public interface ConnectionListener void closed(Connection connection); + boolean redirect(String host, List<Object> knownHosts); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java index 2a70087c10..5ef94b7d13 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java @@ -21,6 +21,8 @@ package org.apache.qpid.transport.network.security.sasl; +import java.util.List; + import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; @@ -56,7 +58,13 @@ public abstract class SASLEncryptor implements ConnectionListener } } } - + + @Override + public boolean redirect(final String host, final List<Object> knownHosts) + { + return false; + } + public void exception(Connection conn, ConnectionException exception){} public void closed(Connection conn) {} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java index 77902c3531..3ea99ce4ab 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -20,15 +20,16 @@ */ package org.apache.qpid.url; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import java.net.URISyntaxException; -import java.util.HashMap; - public class AMQBindingURL implements BindingURL { private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class); @@ -135,6 +136,20 @@ public class AMQBindingURL implements BindingURL return _options.get(key); } + @Override + public Map<String,Object> getConsumerOptions() + { + Map<String,Object> options = new HashMap<>(); + for(Map.Entry<String,String> option : _options.entrySet()) + { + if(!NON_CONSUMER_OPTIONS.contains(option.getKey())) + { + options.put(option.getKey(), option.getValue()); + } + } + return options; + } + public void setOption(String key, String value) { _options.put(key, value); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java index 80a1ae540b..91f80ff88c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.url; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.qpid.framing.AMQShortString; /* @@ -47,6 +53,18 @@ public interface BindingURL */ public static final String OPTION_REJECT_BEHAVIOUR = "rejectbehaviour"; + public static final Set<String> NON_CONSUMER_OPTIONS = + Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(OPTION_EXCLUSIVE, + OPTION_AUTODELETE, + OPTION_DURABLE, + OPTION_BROWSE, + OPTION_ROUTING_KEY, + OPTION_BINDING_KEY, + OPTION_EXCHANGE_AUTODELETE, + OPTION_EXCHANGE_DURABLE, + OPTION_EXCHANGE_DURABLE, + OPTION_REJECT_BEHAVIOUR))); + String getURL(); @@ -60,6 +78,9 @@ public interface BindingURL String getOption(String key); + Map<String,Object> getConsumerOptions(); + + boolean containsOption(String key); AMQShortString getRoutingKey(); diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 46d1887496..afc2b968f4 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -20,12 +20,6 @@ */ package org.apache.qpid.transport; -import org.apache.log4j.Logger; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.network.ConnectionBinding; -import org.apache.qpid.transport.network.io.IoAcceptor; -import org.apache.qpid.transport.util.Waiter; - import static org.apache.qpid.transport.Option.EXPECTED; import static org.apache.qpid.transport.Option.NONE; import static org.apache.qpid.transport.Option.SYNC; @@ -37,6 +31,13 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.log4j.Logger; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.io.IoAcceptor; +import org.apache.qpid.transport.util.Waiter; + /** * ConnectionTest */ @@ -171,6 +172,12 @@ public class ConnectionTest extends QpidTestCase implements SessionListener closed.countDown(); } } + + @Override + public boolean redirect(final String host, final List<Object> knownHosts) + { + return false; + } }); conn.connect("localhost", port, null, "guest", "guest", false, null); return conn; @@ -437,6 +444,12 @@ public class ConnectionTest extends QpidTestCase implements SessionListener conn.connect("localhost", port, null, "guest", "guest", false, null); conn.resume(); } + + @Override + public boolean redirect(final String host, final List<Object> knownHosts) + { + return false; + } } class TestSessionListener implements SessionListener |
