From 888581cb9781259073d190edede25e6253ec7dd9 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Mon, 21 Oct 2013 22:04:51 +0000 Subject: QPID-4984: WIP - Merge from trunk r.1534385. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1534394 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/client/build.xml | 2 +- qpid/java/client/pom.xml | 114 ++++++++++++++ .../client/src/main/assembly/qpid-client-bin.xml | 27 ++++ .../apache/qpid/client/AMQConnectionDelegate.java | 2 + .../qpid/client/AMQConnectionDelegate_0_10.java | 7 + .../qpid/client/AMQConnectionDelegate_8_0.java | 36 ++++- .../org/apache/qpid/client/AMQSession_0_8.java | 9 ++ .../qpid/client/BasicMessageProducer_0_8.java | 17 ++- .../handler/ConnectionStartMethodHandler.java | 2 + .../client/message/AMQMessageDelegate_0_10.java | 4 +- .../qpid/client/message/QpidMessageProperties.java | 1 + .../client/messaging/address/QpidQueueOptions.java | 7 +- .../qpid/client/protocol/AMQProtocolSession.java | 13 +- .../org/apache/qpid/client/AMQSession_0_8Test.java | 2 +- .../message/AMQMessageDelegate_0_10Test.java | 2 +- .../client/protocol/AMQProtocolHandlerTest.java | 2 +- .../apache/qpid/client/transport/MockSender.java | 49 +++++++ .../client/transport/TestNetworkConnection.java | 163 +++++++++++++++++++++ 18 files changed, 439 insertions(+), 20 deletions(-) create mode 100644 qpid/java/client/pom.xml create mode 100644 qpid/java/client/src/main/assembly/qpid-client-bin.xml create mode 100644 qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java create mode 100644 qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java (limited to 'qpid/java/client') diff --git a/qpid/java/client/build.xml b/qpid/java/client/build.xml index 707bfda024..0375b83c65 100644 --- a/qpid/java/client/build.xml +++ b/qpid/java/client/build.xml @@ -21,7 +21,7 @@ - + diff --git a/qpid/java/client/pom.xml b/qpid/java/client/pom.xml new file mode 100644 index 0000000000..e97b5b7a0d --- /dev/null +++ b/qpid/java/client/pom.xml @@ -0,0 +1,114 @@ + + + + + org.apache.qpid + qpid-project + 0.26-SNAPSHOT + + 4.0.0 + + qpid-client + + + + org.apache.qpid + qpid-common + 0.26-SNAPSHOT + compile + + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + 1.0 + provided + + + + org.slf4j + slf4j-api + 1.6.4 + compile + + + + + org.apache.qpid + qpid-test-utils + 0.26-SNAPSHOT + test + + + + log4j + log4j + 1.2.16 + test + + + + + + + src/main/resources + + + src/main/java + + **/*.java/ + + + + + + + ${basedir}/src/test/java + + **/*.java/ + + + + ${basedir}/src/test/resources + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + + src/main/assembly/qpid-client-bin.xml + + + + + make-assembly + package + + single + + + + + + + + diff --git a/qpid/java/client/src/main/assembly/qpid-client-bin.xml b/qpid/java/client/src/main/assembly/qpid-client-bin.xml new file mode 100644 index 0000000000..cc48890fa0 --- /dev/null +++ b/qpid/java/client/src/main/assembly/qpid-client-bin.xml @@ -0,0 +1,27 @@ + + bin + + tar.gz + + qpid-client/${project.version} + + + ${project.basedir} + / + + README* + LICENSE* + NOTICE* + + + + + + /lib + true + + + + diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index a8fdaeb65c..0329deee03 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -80,4 +80,6 @@ public interface AMQConnectionDelegate boolean isSupportedServerFeature(final String featureName); void setHeartbeatListener(HeartbeatListener listener); + + boolean supportsIsBound(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 69e79d42a0..66590aa0d7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -530,4 +530,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } return true; } + + @Override + public boolean supportsIsBound() + { + //0-10 supports the isBound method + return true; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 67d7c2a78c..340aca70eb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -22,7 +22,6 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -35,13 +34,14 @@ import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.OutgoingNetworkTransport; @@ -51,11 +51,10 @@ import org.apache.qpid.transport.network.security.SecurityLayerFactory; import javax.jms.JMSException; import javax.jms.XASession; -import javax.net.ssl.SSLContext; + import java.io.IOException; import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; -import java.security.GeneralSecurityException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.EnumSet; @@ -384,4 +383,33 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { _conn.getProtocolHandler().setHeartbeatListener(listener); } + + @Override + public boolean supportsIsBound() + { + //Rough check whether the 'isBound' AMQP extension method is supported, by trying to determine if we are connected to Qpid. + //As older versions of the Qpid broker did not send properties, the value will be assumed true if no server properties + //are found, or the 'product' entry isn't present, and will only be false if it is present but doesn't match expectation. + boolean connectedToQpid = true; + + FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties(); + if(serverProperties != null) + { + if(serverProperties.containsKey(ConnectionStartProperties.PRODUCT)) + { + //String.valueof to ensure it is non-null, then lowercase it + String product = String.valueOf(serverProperties.getString(ConnectionStartProperties.PRODUCT)).toLowerCase(); + + //value is "unknown" when the naming properties file hasn't been found, e.g in IDE. + connectedToQpid = product.contains("qpid") || product.equals("unknown"); + } + } + + if(_logger.isDebugEnabled()) + { + _logger.debug("supportsIsBound: " + connectedToQpid); + } + + return connectedToQpid; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 9a9da62f2a..3ff7416d8f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -397,10 +397,19 @@ public class AMQSession_0_8 extends AMQSession( new FailoverProtectedOperation() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index bb270b0878..3cb723e5a8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AMQMessageDelegate_0_8; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; @@ -32,7 +33,6 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.MethodRegistry; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,12 +40,14 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; import javax.jms.Topic; + import java.nio.ByteBuffer; import java.util.UUID; public class BasicMessageProducer_0_8 extends BasicMessageProducer { - private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_8.class); + private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_8.class); + private static final boolean SET_EXPIRATION_AS_TTL = Boolean.getBoolean(ClientProperties.SET_EXPIRATION_AS_TTL); BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException @@ -118,7 +120,16 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer if (timeToLive > 0) { - contentHeaderProperties.setExpiration(currentTime + timeToLive); + if(!SET_EXPIRATION_AS_TTL) + { + //default behaviour used by Qpid + contentHeaderProperties.setExpiration(currentTime + timeToLive); + } + else + { + //alternative behaviour for brokers interpreting the expiration header directly as a TTL. + contentHeaderProperties.setExpiration(timeToLive); + } } else { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 366b5f115e..b0c30f82fa 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -168,6 +168,8 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener public static final String QPID_MAX_COUNT = "qpid.max_count"; public static final String QPID_MAX_SIZE = "qpid.max_size"; public static final String QPID_POLICY_TYPE = "qpid.policy_type"; - public static final String QPID_PERSIST_LAST_NODE = "qpid.persist_last_node"; public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; public static final String QPID_LAST_VALUE_QUEUE_NO_BROWSE = "qpid.last_value_queue_no_browse"; @@ -60,11 +59,7 @@ public class QpidQueueOptions extends HashMap this.put(QPID_MAX_SIZE, i); } - public void setPersistLastNode() - { - this.put(QPID_PERSIST_LAST_NODE, 1); - } - + public void setOrderingPolicy(String s) { if (QpidQueueOptions.QPID_LAST_VALUE_QUEUE.equals(s)) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index aed10cf15f..67bd8de846 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -22,7 +22,6 @@ package org.apache.qpid.client.protocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -36,6 +35,7 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; @@ -101,6 +101,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private final AMQConnection _connection; private ConnectionTuneParameters _connectionTuneParameters; + private FieldTable _connectionStartServerProperties; private SaslClient _saslClient; @@ -529,4 +530,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { return _connection; } + + public void setConnectionStartServerProperties(FieldTable serverProperties) + { + _connectionStartServerProperties = serverProperties; + } + + public FieldTable getConnectionStartServerProperties() + { + return _connectionStartServerProperties; + } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java index d9caa68ef8..c56cf9a72b 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java @@ -21,10 +21,10 @@ package org.apache.qpid.client; import org.apache.qpid.AMQException; +import org.apache.qpid.client.transport.TestNetworkConnection; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.amqp_0_91.QueueDeclareOkBodyImpl; import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.TestNetworkConnection; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.url.AMQBindingURL; diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java index 3afeb79ac3..68f678c1b8 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10Test.java @@ -124,7 +124,7 @@ public class AMQMessageDelegate_0_10Test extends QpidTestCase for (Enumeration props = delegate.getPropertyNames(); props.hasMoreElements();) { String key = (String)props.nextElement(); - if (key.equals("JMS_" + QpidMessageProperties.QPID_SUBJECT)) + if (key.equals("JMS_" + QpidMessageProperties.QPID_SUBJECT_JMS_PROPER)) { propFound = true; } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index 9a5ca33174..4c9448cb39 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -28,12 +28,12 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.MockAMQConnection; import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.transport.TestNetworkConnection; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.TestNetworkConnection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java new file mode 100644 index 0000000000..7c3988c19a --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.transport; + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Sender; + +public class MockSender implements Sender +{ + + public void setIdleTimeout(int i) + { + + } + + public void send(ByteBuffer msg) + { + + } + + public void flush() + { + + } + + public void close() + { + + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java new file mode 100644 index 0000000000..1ec217e468 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java @@ -0,0 +1,163 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.transport; + +import java.security.Principal; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; + +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +/** + * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, + * so if this class is being used and some methods are to be used, then please update those. + */ +public class TestNetworkConnection implements NetworkConnection +{ + private String _remoteHost = "127.0.0.1"; + private String _localHost = "127.0.0.1"; + private int _port = 1; + private SocketAddress _localAddress = null; + private SocketAddress _remoteAddress = null; + private final MockSender _sender; + + public TestNetworkConnection() + { + _sender = new MockSender(); + } + + + + public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, + NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws BindException + { + + } + + public SocketAddress getLocalAddress() + { + return (_localAddress != null) ? _localAddress : new InetSocketAddress(_localHost, _port); + } + + public SocketAddress getRemoteAddress() + { + return (_remoteAddress != null) ? _remoteAddress : new InetSocketAddress(_remoteHost, _port); + } + + public void setMaxReadIdle(int idleTime) + { + + } + + @Override + public void setPeerPrincipal(Principal principal) + { + } + + @Override + public Principal getPeerPrincipal() + { + return null; + } + + @Override + public int getMaxReadIdle() + { + return 0; + } + + @Override + public int getMaxWriteIdle() + { + return 0; + } + + public void setMaxWriteIdle(int idleTime) + { + + } + + public void close() + { + + } + + public void flush() + { + + } + + public void send(ByteBuffer msg) + { + + } + + public void setIdleTimeout(int i) + { + + } + + public void setPort(int port) + { + _port = port; + } + + public int getPort() + { + return _port; + } + + public void setLocalHost(String host) + { + _localHost = host; + } + + public void setRemoteHost(String host) + { + _remoteHost = host; + } + + public void setLocalAddress(SocketAddress address) + { + _localAddress = address; + } + + public void setRemoteAddress(SocketAddress address) + { + _remoteAddress = address; + } + + public Sender getSender() + { + return _sender; + } + + public void start() + { + } +} -- cgit v1.2.1