diff options
Diffstat (limited to 'qpid/java/client/src/main')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java | 11 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 42 |
2 files changed, 33 insertions, 20 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 6da0da9f6f..c93b607740 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -334,4 +334,15 @@ public class AMQBrokerDetails implements BrokerDetails } + public static String checkTransport(String broker) + { + if ((!broker.contains("://"))) + { + return "tcp://" + broker; + } + else + { + return broker; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 58ac49dd4e..034cf3a1eb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -150,8 +150,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + - (clientName==null?"":clientName) + - virtualHost + "?brokerlist='" + broker + "'")); + (clientName == null ? "" : clientName) + + virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'")); } public AMQConnection(String host, int port, String username, String password, @@ -166,12 +166,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect this(new AMQConnectionURL(useSSL ? ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + - (clientName==null?"":clientName) + + (clientName == null ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'" : ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@" + - (clientName==null?"":clientName) + + (clientName == null ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'" )); @@ -469,22 +469,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _protocolHandler.syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - null), // outOfBand - ChannelOpenOkBody.class); + ChannelOpenBody.createAMQFrame(channelId, + (byte) 8, (byte) 0, // AMQP version (major, minor) + null), // outOfBand + ChannelOpenOkBody.class); //todo send low water mark when protocol allows. // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. _protocolHandler.syncWrite( - BasicQosBody.createAMQFrame(channelId, - (byte)8, (byte)0, // AMQP version (major, minor) - false, // global - prefetchHigh, // prefetchCount - 0), // prefetchSize - BasicQosOkBody.class); + BasicQosBody.createAMQFrame(channelId, + (byte) 8, (byte) 0, // AMQP version (major, minor) + false, // global + prefetchHigh, // prefetchCount + 0), // prefetchSize + BasicQosOkBody.class); if (transacted) { @@ -495,7 +495,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class); + _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte) 8, (byte) 0), TxSelectOkBody.class); } } @@ -527,6 +527,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions * where specified in the JMS spec + * * @param transacted * @param acknowledgeMode * @return QueueSession @@ -540,6 +541,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions * where specified in the JMS spec + * * @param transacted * @param acknowledgeMode * @return TopicSession @@ -574,7 +576,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { checkNotClosed(); return _connectionMetaData; - + } public ExceptionListener getExceptionListener() throws JMSException @@ -625,7 +627,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close() throws JMSException { - synchronized(getFailoverMutex()) + synchronized (getFailoverMutex()) { if (!_closed.getAndSet(true)) { @@ -903,7 +905,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (cause instanceof AMQException) { - je = new JMSException(Integer.toString(((AMQException)cause).getErrorCode()) ,"Exception thrown against " + toString() + ": " + cause); + je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode()), "Exception thrown against " + toString() + ": " + cause); } else { @@ -956,8 +958,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect void deregisterSession(int channelId) { _sessions.remove(channelId); - } - + } + /** * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. |
