diff options
Diffstat (limited to 'qpid/java')
5 files changed, 121 insertions, 6 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index f253f8d0ab..8823f0f148 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -107,6 +107,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host, _port, _username, _password, container, _remoteHost, _ssl); + _conn.setConnectionErrorTask(new ConnectionErrorTask()); // TODO - retrieve negotiated AMQP version _connectionMetaData = new ConnectionMetaDataImpl(1,0,0); } @@ -234,8 +235,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public void setClientID(final String value) throws JMSException { checkNotConnected("Cannot set client-id to \"" - + value - + "\"; client-id must be set before the connection is used"); + + value + + "\"; client-id must be set before the connection is used"); if( _clientId !=null ) { throw new IllegalStateException("client-id has already been set"); @@ -534,4 +535,32 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect return _syncPublish; } + private class ConnectionErrorTask implements Runnable + { + + @Override + public void run() + { + + try + { + final ExceptionListener exceptionListener = getExceptionListener(); + + if(exceptionListener != null) + { + final org.apache.qpid.amqp_1_0.type.transport.Error connectionError = _conn.getConnectionError(); + if(connectionError != null) + { + exceptionListener.onException(new JMSException(connectionError.getDescription(), + connectionError.getCondition().toString())); + } + } + } + catch (JMSException ignored) + { + // ignored + } + } + } + } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index f66a33b978..2cbf0ee591 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -28,7 +28,10 @@ import java.nio.ByteBuffer; import java.security.Principal; import java.util.logging.Level; import java.util.logging.Logger; + import javax.net.ssl.SSLSocketFactory; + +import org.apache.qpid.amqp_1_0.framing.SocketExceptionHandler; import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; import org.apache.qpid.amqp_1_0.transport.AMQPTransport; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; @@ -38,8 +41,10 @@ import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.SaslFrameBody; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.transport.ConnectionError; +import org.apache.qpid.amqp_1_0.type.transport.Error; -public class Connection +public class Connection implements SocketExceptionHandler { private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); private static final int MAX_FRAME_SIZE = 65536; @@ -47,6 +52,8 @@ public class Connection private String _address; private ConnectionEndpoint _conn; private int _sessionCount; + private Runnable _connectionErrorTask; + private Error _socketError; public Connection(final String address, @@ -223,7 +230,7 @@ public class Connection } - ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn); + ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn, this); Thread outputThread = new Thread(outputHandler); outputThread.setDaemon(true); outputThread.start(); @@ -409,4 +416,38 @@ public class Connection } } } + + /** + * Set the connection error task that will be used as a callback for any socket read/write errors. + * + * @param connectionErrorTask connection error task + */ + public void setConnectionErrorTask(Runnable connectionErrorTask) + { + _connectionErrorTask = connectionErrorTask; + } + + /** + * Return the connection error for any socket read/write error that has occurred + * + * @return connection error + */ + public Error getConnectionError() + { + return _socketError; + } + + @Override + public void processSocketException(Exception exception) + { + Error socketError = new Error(); + socketError.setDescription(exception.getClass() + ": " + exception.getMessage()); + socketError.setCondition(ConnectionError.SOCKET_ERROR); + _socketError = socketError; + if(_connectionErrorTask != null) + { + Thread thread = new Thread(_connectionErrorTask); + thread.run(); + } + } } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java index f391cf3035..d4077e0f08 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java @@ -386,12 +386,14 @@ public class ConnectionHandler private BytesSource _bytesSource; private boolean _closed; private ConnectionEndpoint _conn; + private SocketExceptionHandler _exceptionHandler; - public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn) + public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, SocketExceptionHandler exceptionHandler) { _outputStream = outputStream; _bytesSource = source; _conn = conn; + _exceptionHandler = exceptionHandler; } public void run() @@ -421,7 +423,7 @@ public class ConnectionHandler catch (IOException e) { _closed = true; - e.printStackTrace(); //TODO + _exceptionHandler.processSocketException(e); } } } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java new file mode 100644 index 0000000000..540aee0f8d --- /dev/null +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.framing; + +/** + * Callback interface for processing socket exceptions. + */ +public interface SocketExceptionHandler +{ + + public void processSocketException(Exception exception); + +} diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java index 07f0496e23..8a2120a252 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java @@ -43,6 +43,8 @@ public class ConnectionError public static final ConnectionError REDIRECT = new ConnectionError(Symbol.valueOf("amqp:connection:redirect")); + public static final ConnectionError SOCKET_ERROR = new ConnectionError(Symbol.valueOf("amqp:connection:socket-error")); + private ConnectionError(Symbol val) @@ -73,6 +75,11 @@ public class ConnectionError return "redirect"; } + if(this == SOCKET_ERROR) + { + return "socket-error"; + } + else { return String.valueOf(_val); @@ -97,6 +104,11 @@ public class ConnectionError { return REDIRECT; } + + if(SOCKET_ERROR._val.equals(val)) + { + return SOCKET_ERROR; + } // TODO ERROR return null; |
