summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java33
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java45
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java6
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java31
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java12
5 files changed, 121 insertions, 6 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index f253f8d0ab..8823f0f148 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -107,6 +107,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
{
_conn = new org.apache.qpid.amqp_1_0.client.Connection(_host,
_port, _username, _password, container, _remoteHost, _ssl);
+ _conn.setConnectionErrorTask(new ConnectionErrorTask());
// TODO - retrieve negotiated AMQP version
_connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
}
@@ -234,8 +235,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
public void setClientID(final String value) throws JMSException
{
checkNotConnected("Cannot set client-id to \""
- + value
- + "\"; client-id must be set before the connection is used");
+ + value
+ + "\"; client-id must be set before the connection is used");
if( _clientId !=null )
{
throw new IllegalStateException("client-id has already been set");
@@ -534,4 +535,32 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
return _syncPublish;
}
+ private class ConnectionErrorTask implements Runnable
+ {
+
+ @Override
+ public void run()
+ {
+
+ try
+ {
+ final ExceptionListener exceptionListener = getExceptionListener();
+
+ if(exceptionListener != null)
+ {
+ final org.apache.qpid.amqp_1_0.type.transport.Error connectionError = _conn.getConnectionError();
+ if(connectionError != null)
+ {
+ exceptionListener.onException(new JMSException(connectionError.getDescription(),
+ connectionError.getCondition().toString()));
+ }
+ }
+ }
+ catch (JMSException ignored)
+ {
+ // ignored
+ }
+ }
+ }
+
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
index f66a33b978..2cbf0ee591 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
@@ -28,7 +28,10 @@ import java.nio.ByteBuffer;
import java.security.Principal;
import java.util.logging.Level;
import java.util.logging.Logger;
+
import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.qpid.amqp_1_0.framing.SocketExceptionHandler;
import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
@@ -38,8 +41,10 @@ import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
-public class Connection
+public class Connection implements SocketExceptionHandler
{
private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
private static final int MAX_FRAME_SIZE = 65536;
@@ -47,6 +52,8 @@ public class Connection
private String _address;
private ConnectionEndpoint _conn;
private int _sessionCount;
+ private Runnable _connectionErrorTask;
+ private Error _socketError;
public Connection(final String address,
@@ -223,7 +230,7 @@ public class Connection
}
- ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
+ ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn, this);
Thread outputThread = new Thread(outputHandler);
outputThread.setDaemon(true);
outputThread.start();
@@ -409,4 +416,38 @@ public class Connection
}
}
}
+
+ /**
+ * Set the connection error task that will be used as a callback for any socket read/write errors.
+ *
+ * @param connectionErrorTask connection error task
+ */
+ public void setConnectionErrorTask(Runnable connectionErrorTask)
+ {
+ _connectionErrorTask = connectionErrorTask;
+ }
+
+ /**
+ * Return the connection error for any socket read/write error that has occurred
+ *
+ * @return connection error
+ */
+ public Error getConnectionError()
+ {
+ return _socketError;
+ }
+
+ @Override
+ public void processSocketException(Exception exception)
+ {
+ Error socketError = new Error();
+ socketError.setDescription(exception.getClass() + ": " + exception.getMessage());
+ socketError.setCondition(ConnectionError.SOCKET_ERROR);
+ _socketError = socketError;
+ if(_connectionErrorTask != null)
+ {
+ Thread thread = new Thread(_connectionErrorTask);
+ thread.run();
+ }
+ }
}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
index f391cf3035..d4077e0f08 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
@@ -386,12 +386,14 @@ public class ConnectionHandler
private BytesSource _bytesSource;
private boolean _closed;
private ConnectionEndpoint _conn;
+ private SocketExceptionHandler _exceptionHandler;
- public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn)
+ public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, SocketExceptionHandler exceptionHandler)
{
_outputStream = outputStream;
_bytesSource = source;
_conn = conn;
+ _exceptionHandler = exceptionHandler;
}
public void run()
@@ -421,7 +423,7 @@ public class ConnectionHandler
catch (IOException e)
{
_closed = true;
- e.printStackTrace(); //TODO
+ _exceptionHandler.processSocketException(e);
}
}
}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java
new file mode 100644
index 0000000000..540aee0f8d
--- /dev/null
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.framing;
+
+/**
+ * Callback interface for processing socket exceptions.
+ */
+public interface SocketExceptionHandler
+{
+
+ public void processSocketException(Exception exception);
+
+}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java
index 07f0496e23..8a2120a252 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/ConnectionError.java
@@ -43,6 +43,8 @@ public class ConnectionError
public static final ConnectionError REDIRECT = new ConnectionError(Symbol.valueOf("amqp:connection:redirect"));
+ public static final ConnectionError SOCKET_ERROR = new ConnectionError(Symbol.valueOf("amqp:connection:socket-error"));
+
private ConnectionError(Symbol val)
@@ -73,6 +75,11 @@ public class ConnectionError
return "redirect";
}
+ if(this == SOCKET_ERROR)
+ {
+ return "socket-error";
+ }
+
else
{
return String.valueOf(_val);
@@ -97,6 +104,11 @@ public class ConnectionError
{
return REDIRECT;
}
+
+ if(SOCKET_ERROR._val.equals(val))
+ {
+ return SOCKET_ERROR;
+ }
// TODO ERROR
return null;