diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-08-30 12:19:31 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-08-30 12:19:31 +0000 |
| commit | 61a61c3716e42bf175004049976391407f28704d (patch) | |
| tree | 8ab343c1941a7565532189763dc79473a10beb3c /java/client/src/main | |
| parent | e183227707d150b1f42e750df0e90cd7dac8744e (diff) | |
| download | qpid-python-61a61c3716e42bf175004049976391407f28704d.tar.gz | |
Remerge of M2. All tests pass locally
Testing done in Intelij and mvn command line via windows/cygwin.
Python tests removed from auto build pending Jython-siztion. Tested running broker in intelij and python run-tests from cygwin.
All tests pass. (CombinedTest still exhibts a race condition. but that has always been so.)
Additional Race condition identified (around MsgReject/AutoDeleteQueues) during testing patch to follow.
systests are inconsistent Some use TestableMemoryMessageStore some use MemoryMessgaeStore.
Lets not roll back this change if issues are discovered. Lets work together to go forward and address any issues. I have spent a lot of time ensuring the tests work for me so I hope that they work for you.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@571129 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
9 files changed, 164 insertions, 128 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index d59412fdba..1ac43f4388 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -46,11 +46,9 @@ import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; import javax.jms.Destination; @@ -68,7 +66,6 @@ import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; - import java.io.IOException; import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; @@ -1106,6 +1103,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { + //Should never get here as all AMQEs are required to have an ErrorCode! je = new JMSException("Exception thrown against " + toString() + ": " + cause); } @@ -1148,7 +1146,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - _logger.info("Not a hard-error connection not closing."); + _logger.info("Not a hard-error connection not closing: " + cause.getMessage()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index b3fbd1f510..eff6360d91 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -20,6 +20,14 @@ */ package org.apache.qpid.client; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; @@ -35,6 +43,8 @@ import org.apache.qpid.url.URLSyntaxException; public class AMQConnectionURL implements ConnectionURL { + private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionURL.class); + private String _url; private String _failoverMethod; private HashMap<String, String> _failoverOptions; @@ -182,7 +192,7 @@ public class AMQConnectionURL implements ConnectionURL if (colonIndex == -1) { throw URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), - "Null password in user information not allowed.", _url); + "Null password in user information not allowed.", _url); } else { @@ -387,7 +397,14 @@ public class AMQConnectionURL implements ConnectionURL if (_password != null) { sb.append(':'); - sb.append(_password); + if (_logger.isDebugEnabled()) + { + sb.append(_password); + } + else + { + sb.append("********"); + } } sb.append('@'); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 879578bd6c..af469ee291 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -73,7 +73,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,7 +100,6 @@ import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; - import java.io.Serializable; import java.text.MessageFormat; import java.util.ArrayList; @@ -293,6 +291,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */ private final boolean _strictAMQPFATAL; + private final Object _messageDeliveryLock = new Object(); /** * Creates a new session on a connection. @@ -505,49 +504,53 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); } - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session. - synchronized (_connection.getFailoverMutex()) + synchronized (_messageDeliveryLock) { - // Ensure we only try and close an open session. - if (!_closed.getAndSet(true)) - { - // we pass null since this is not an error case - closeProducersAndConsumers(null); - try + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session. + synchronized (_connection.getFailoverMutex()) + { + // Ensure we only try and close an open session. + if (!_closed.getAndSet(true)) { + // we pass null since this is not an error case + closeProducersAndConsumers(null); - getProtocolHandler().closeSession(this); + try + { - final AMQFrame frame = - ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText + getProtocolHandler().closeSession(this); - getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); + final AMQFrame frame = + ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(), + 0, // classId + 0, // methodId + AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + new AMQShortString("JMS client closing channel")); // replyText - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully. - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error closing session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - // This is ignored because the channel is already marked as closed so the fail-over process will - // not re-open it. - catch (FailoverException e) - { - _logger.debug( - "Got FailoverException during channel close, ignored as channel already marked as closed."); - } - finally - { - _connection.deregisterSession(_channelId); + getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); + + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully. + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error closing session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + // This is ignored because the channel is already marked as closed so the fail-over process will + // not re-open it. + catch (FailoverException e) + { + _logger.debug( + "Got FailoverException during channel close, ignored as channel already marked as closed."); + } + finally + { + _connection.deregisterSession(_channelId); + } } } } @@ -560,23 +563,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) throws JMSException { - synchronized (_connection.getFailoverMutex()) + synchronized (_messageDeliveryLock) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - _closed.set(true); - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else + synchronized (_connection.getFailoverMutex()) { - amqe = new AMQException(null, "Closing session forcibly", e); - } + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + _closed.set(true); + AMQException amqe; + if (e instanceof AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException(null, "Closing session forcibly", e); + } - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); + } } } @@ -1279,7 +1285,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - public void declareAndBind(AMQDestination amqd) + public void declareAndBind(AMQDestination amqd) throws AMQException { @@ -2664,7 +2670,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _lock.wait(); } - dispatchMessage(message); + synchronized (_messageDeliveryLock) + { + dispatchMessage(message); + } while (connectionStopped()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java index 8f9a84a3a6..862a9be8d4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java @@ -1,18 +1,21 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.client.handler; diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java index 81228b4cdc..65060d44d2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java @@ -1,18 +1,21 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.client.handler; diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index a00078b010..2c435aba6c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -25,7 +25,6 @@ import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter; - import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -55,7 +54,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.ssl.SSLContextFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -273,10 +271,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (_failoverState != FailoverState.IN_PROGRESS) { - _logger.info("sessionClose() not allowed to failover"); - _connection.exceptionReceived( - new AMQDisconnectedException("Server closed connection and reconnection " + - "not permitted.", null)); + _logger.debug("sessionClose() not allowed to failover"); + _connection.exceptionReceived(new AMQDisconnectedException( + "Server closed connection and reconnection " + "not permitted.", null)); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index bef3180041..9f430d76a7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -51,7 +51,6 @@ import org.apache.qpid.framing.ExchangeBoundOkBody; import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -254,8 +253,10 @@ public class AMQStateManager implements AMQMethodListener if (_currentState != s) { - _logger.warn("State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s); - throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState + ", desired state: " + s, null); + _logger.warn("State not achieved within permitted time. Current state " + _currentState + + ", desired state: " + s); + throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState + + ", desired state: " + s, null); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 459579d920..4cda53a6a1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -26,11 +26,9 @@ import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; - import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,40 +88,40 @@ public class TransportConnection switch (transport) { - case TCP: - _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + case TCP: + _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + { + public IoConnector newSocketConnector() { - public IoConnector newSocketConnector() + SocketConnector result; + // FIXME - this needs to be sorted to use the new Mina MultiThread SA. + if (Boolean.getBoolean("qpidnio")) { - SocketConnector result; - // FIXME - this needs to be sorted to use the new Mina MultiThread SA. - if (Boolean.getBoolean("qpidnio")) - { - _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set."); - // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector - } - // else - - { - _logger.info("Using Mina NIO"); - result = new SocketConnector(); // non-blocking connector - } - - // Don't have the connector's worker thread wait around for other connections (we only use - // one SocketConnector per connection at the moment anyway). This allows short-running - // clients (like unit tests) to complete quickly. - result.setWorkerTimeout(0); - - return result; + _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set."); + // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector } - }); - break; + // else - case VM: - { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); - break; - } + { + _logger.info("Using Mina NIO"); + result = new SocketConnector(); // non-blocking connector + } + + // Don't have the connector's worker thread wait around for other connections (we only use + // one SocketConnector per connection at the moment anyway). This allows short-running + // clients (like unit tests) to complete quickly. + result.setWorkerTimeout(0); + + return result; + } + }); + break; + + case VM: + { + _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); + break; + } } return _instance; @@ -145,7 +143,7 @@ public class TransportConnection } private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) - throws AMQVMBrokerCreationException + throws AMQVMBrokerCreationException { int port = details.getPort(); @@ -158,7 +156,7 @@ public class TransportConnection else { throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port - + " does not exist. Auto create disabled.", null); + + " does not exist. Auto create disabled.", null); } } @@ -253,8 +251,8 @@ public class TransportConnection IoHandlerAdapter provider; try { - Class[] cnstr = { Integer.class }; - Object[] params = { port }; + Class[] cnstr = {Integer.class}; + Object[] params = {port}; provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); // Give the broker a second to create _logger.info("Created VMBroker Instance:" + port); @@ -273,7 +271,7 @@ public class TransportConnection } AMQVMBrokerCreationException amqbce = - new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); amqbce.initCause(e); throw amqbce; } diff --git a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java index 1818132be0..dc0d9b8c78 100644 --- a/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java +++ b/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java @@ -37,6 +37,16 @@ public class AMQVMBrokerCreationException extends AMQTransportConnectionExceptio { private int _port; + /** + * @param port + * + * @deprecated + */ + public AMQVMBrokerCreationException(int port) + { + this(null, port, "Unable to create vm broker", null); + } + public AMQVMBrokerCreationException(AMQConstant errorCode, int port, String message, Throwable cause) { super(errorCode, message, cause); |
