summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-07-29 02:07:20 +0000
committerRafael H. Schloming <rhs@apache.org>2008-07-29 02:07:20 +0000
commit188748bd3e3fba0e16d3d3d4bc7b1de72e285d09 (patch)
tree73e7eccc4529e9453bd4038410d2f1bff2611658 /java/client/src
parent3d5456aa0817248733d721050d6f3bdb9f8782da (diff)
downloadqpid-python-188748bd3e3fba0e16d3d3d4bc7b1de72e285d09.tar.gz
QPID-1201: fixed up version of aidan's patch, there are still failures when running against an external java broker, however we seem to get past basic connection negotiation now
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@680602 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java21
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/Client.java31
6 files changed, 67 insertions, 72 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index b3b3cc1ffd..bb28e70d76 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -36,7 +36,6 @@ import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpidity.transport.TransportConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +57,7 @@ import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.ConnectException;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
@@ -364,10 +364,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else
{
- // We always assume that the broker supports the lates AMQ protocol verions
- // thie is currently 0.10
- // TODO: use this code once we have switch to 0.10
- // getDelegate();
_delegate = new AMQConnectionDelegate_0_10(this);
}
@@ -420,21 +416,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Exception connectionException = null;
while (!_connected && retryAllowed)
{
+ ProtocolVersion pe = null;
try
{
- makeBrokerConnection(brokerDetails);
- }
- catch (AMQProtocolException pe)
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info(pe.getMessage());
- _logger.info("Trying broker supported protocol version: " +
- TransportConstants.getVersionMajor() + "." +
- TransportConstants.getVersionMinor());
- }
- // we need to check whether we have a delegate for the supported protocol
- getDelegate();
+ pe = makeBrokerConnection(brokerDetails);
}
catch (Exception e)
{
@@ -447,6 +432,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
connectionException = e;
}
+ if (pe != null)
+ {
+ // reset the delegate to the version returned by the
+ // broker
+ initDelegate(pe);
+ }
+
if (!_connected)
{
retryAllowed = _failoverPolicy.failoverAllowed();
@@ -518,23 +510,41 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
}
- private void getDelegate() throws AMQProtocolException
+ private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
{
try
{
- Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" +
- TransportConstants.getVersionMajor() + "_" +
- TransportConstants.getVersionMinor());
+ Class c = Class.forName(String.format
+ ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s",
+ pe.getMajorVersion(), pe.getMinorVersion()));
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
}
- catch (Exception e)
+ catch (ClassNotFoundException e)
+ {
+ throw new AMQProtocolException
+ (AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
+ String.format("Protocol: %s.%s is rquired by the broker but is not " +
+ "currently supported by this client library implementation",
+ pe.getMajorVersion(), pe.getMinorVersion()),
+ e);
+ }
+ catch (NoSuchMethodException e)
+ {
+ throw new RuntimeException("unable to locate constructor for delegate", e);
+ }
+ catch (InstantiationException e)
+ {
+ throw new RuntimeException("error instantiating delegate", e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException("error accessing delegate", e);
+ }
+ catch (InvocationTargetException e)
{
- throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
- "Protocol: " + TransportConstants.getVersionMajor() + "."
- + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " +
- "currently supported by this client library implementation", e);
+ throw new RuntimeException("error invoking delegate", e);
}
}
@@ -615,9 +625,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return false;
}
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
- _delegate.makeBrokerConnection(brokerDetail);
+ return _delegate.makeBrokerConnection(brokerDetail);
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 07bd7ea0ae..7f36ec6e99 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -27,12 +27,13 @@ import javax.jms.XASession;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
public interface AMQConnectionDelegate
{
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
public Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 825a52c5cb..ce10553210 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -9,13 +9,14 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.ProtocolException;
+import org.apache.qpidity.transport.ProtocolVersionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,7 +102,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
* @throws IOException
* @throws AMQException
*/
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
_qpidConnection = Client.createConnection();
try
@@ -117,14 +118,16 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Closed
_qpidConnection.setClosedListener(this);
_conn._connected = true;
}
- catch(ProtocolException pe)
+ catch(ProtocolVersionException pe)
{
- throw new AMQProtocolException(null, pe.getMessage(), pe);
+ return new ProtocolVersion(pe.getMajor(), pe.getMinor());
}
catch (QpidException e)
{
throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
}
+
+ return null;
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index aab094ca7d..1e65c50304 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -43,6 +43,7 @@ import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
@@ -79,7 +80,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
}
- public void makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
+ public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
{
final Set<AMQState> openOrClosedStates =
EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
@@ -98,6 +99,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
_conn._failoverPolicy.attainedConnection();
_conn._connected = true;
}
+
+ return null;
}
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
index fd8063e99b..090620a560 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
@@ -38,7 +38,6 @@ import javax.naming.spi.ObjectFactory;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpidity.transport.TransportConstants;
public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
@@ -434,23 +433,15 @@ public class AMQConnectionFactory implements ConnectionFactory, QueueConnectionF
*/
public XAConnection createXAConnection() throws JMSException
{
- if (TransportConstants.getVersionMajor() == 0 &&
- TransportConstants.getVersionMinor() == 8)
+ try
{
- throw new UnsupportedOperationException("This protocol version does not support XA operations");
+ return new XAConnectionImpl(_connectionDetails, _sslConfig);
}
- else
+ catch (Exception e)
{
- try
- {
- return new XAConnectionImpl(_connectionDetails, _sslConfig);
- }
- catch (Exception e)
- {
- JMSException jmse = new JMSException("Error creating connection: " + e.getMessage());
- jmse.setLinkedException(e);
- throw jmse;
- }
+ JMSException jmse = new JMSException("Error creating connection: " + e.getMessage());
+ jmse.setLinkedException(e);
+ throw jmse;
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
index f8d5bbcb1c..eb0e370560 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
@@ -31,7 +31,6 @@ import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.url.QpidURL;
import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.ProtocolException;
import org.apache.qpidity.nclient.impl.ClientSession;
import org.apache.qpidity.nclient.impl.ClientSessionDelegate;
import org.apache.qpidity.transport.Channel;
@@ -40,8 +39,8 @@ import org.apache.qpidity.transport.Connection;
import org.apache.qpidity.transport.ConnectionClose;
import org.apache.qpidity.transport.ConnectionCloseCode;
import org.apache.qpidity.transport.ConnectionCloseOk;
-import org.apache.qpidity.transport.TransportConstants;
import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.ProtocolVersionException;
import org.apache.qpidity.transport.SessionDelegate;
import org.apache.qpidity.transport.network.io.IoTransport;
import org.apache.qpidity.transport.network.mina.MinaHandler;
@@ -60,6 +59,8 @@ public class Client implements org.apache.qpidity.nclient.Connection
private boolean closed = false;
private long timeout = 60000;
+ private ProtocolHeader header = null;
+
/**
*
* @return returns a new connection to the broker.
@@ -79,7 +80,6 @@ public class Client implements org.apache.qpidity.nclient.Connection
ClientDelegate connectionDelegate = new ClientDelegate()
{
private boolean receivedClose = false;
- private String _unsupportedProtocol;
public SessionDelegate getSessionDelegate()
{
return new ClientSessionDelegate();
@@ -138,28 +138,18 @@ public class Client implements org.apache.qpidity.nclient.Connection
this.receivedClose = true;
}
-
@Override public void init(Channel ch, ProtocolHeader hdr)
{
// TODO: once the merge is done we'll need to update this code
- // for handling 0.8 protocol version type i.e. major=8 and minor=0 :(
- if (hdr.getMajor() != TransportConstants.getVersionMajor()
- || hdr.getMinor() != TransportConstants.getVersionMinor())
+ // for handling 0.8 protocol version type i.e. major=8 and mino
+ if (hdr.getMajor() != 0 || hdr.getMinor() != 10)
{
- _unsupportedProtocol = TransportConstants.getVersionMajor() + "." +
- TransportConstants.getVersionMinor();
- TransportConstants.setVersionMajor( hdr.getMajor() );
- TransportConstants.setVersionMinor( hdr.getMinor() );
+ Client.this.header = hdr;
_lock.lock();
negotiationComplete.signalAll();
_lock.unlock();
}
}
-
- @Override public String getUnsupportedProtocol()
- {
- return _unsupportedProtocol;
- }
};
connectionDelegate.setCondition(_lock,negotiationComplete);
@@ -186,18 +176,15 @@ public class Client implements org.apache.qpidity.nclient.Connection
}
// XXX: hardcoded version numbers
- _conn.send(new ProtocolHeader(1, TransportConstants.getVersionMajor(),
- TransportConstants.getVersionMinor()));
+ _conn.send(new ProtocolHeader(1, 0, 10));
try
{
negotiationComplete.await(timeout, TimeUnit.MILLISECONDS);
- if( connectionDelegate.getUnsupportedProtocol() != null )
+ if (header != null)
{
_conn.close();
- throw new ProtocolException("Unsupported protocol version: " + connectionDelegate.getUnsupportedProtocol()
- , ErrorCode.UNSUPPORTED_PROTOCOL, null);
-
+ throw new ProtocolVersionException(header.getMajor(), header.getMinor());
}
}
catch (InterruptedException e)