summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java11
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java42
2 files changed, 33 insertions, 20 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index 6da0da9f6f..c93b607740 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -334,4 +334,15 @@ public class AMQBrokerDetails implements BrokerDetails
}
+ public static String checkTransport(String broker)
+ {
+ if ((!broker.contains("://")))
+ {
+ return "tcp://" + broker;
+ }
+ else
+ {
+ return broker;
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 58ac49dd4e..034cf3a1eb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -150,8 +150,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
- (clientName==null?"":clientName) +
- virtualHost + "?brokerlist='" + broker + "'"));
+ (clientName == null ? "" : clientName) +
+ virtualHost + "?brokerlist='" + AMQBrokerDetails.checkTransport(broker) + "'"));
}
public AMQConnection(String host, int port, String username, String password,
@@ -166,12 +166,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
this(new AMQConnectionURL(useSSL ?
ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
- (clientName==null?"":clientName) +
+ (clientName == null ? "" : clientName) +
virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ "," + ConnectionURL.OPTIONS_SSL + "='true'" :
ConnectionURL.AMQ_PROTOCOL + "://" +
username + ":" + password + "@" +
- (clientName==null?"":clientName) +
+ (clientName == null ? "" : clientName) +
virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+ "," + ConnectionURL.OPTIONS_SSL + "='false'"
));
@@ -469,22 +469,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
- ChannelOpenBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- null), // outOfBand
- ChannelOpenOkBody.class);
+ ChannelOpenBody.createAMQFrame(channelId,
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ null), // outOfBand
+ ChannelOpenOkBody.class);
//todo send low water mark when protocol allows.
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
- BasicQosBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- false, // global
- prefetchHigh, // prefetchCount
- 0), // prefetchSize
- BasicQosOkBody.class);
+ BasicQosBody.createAMQFrame(channelId,
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ false, // global
+ prefetchHigh, // prefetchCount
+ 0), // prefetchSize
+ BasicQosOkBody.class);
if (transacted)
{
@@ -495,7 +495,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class);
+ _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte) 8, (byte) 0), TxSelectOkBody.class);
}
}
@@ -527,6 +527,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* Returns an AMQQueueSessionAdaptor which wraps an AMQSession and throws IllegalStateExceptions
* where specified in the JMS spec
+ *
* @param transacted
* @param acknowledgeMode
* @return QueueSession
@@ -540,6 +541,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* Returns an AMQTopicSessionAdapter which wraps an AMQSession and throws IllegalStateExceptions
* where specified in the JMS spec
+ *
* @param transacted
* @param acknowledgeMode
* @return TopicSession
@@ -574,7 +576,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
checkNotClosed();
return _connectionMetaData;
-
+
}
public ExceptionListener getExceptionListener() throws JMSException
@@ -625,7 +627,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void close() throws JMSException
{
- synchronized(getFailoverMutex())
+ synchronized (getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
@@ -903,7 +905,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (cause instanceof AMQException)
{
- je = new JMSException(Integer.toString(((AMQException)cause).getErrorCode()) ,"Exception thrown against " + toString() + ": " + cause);
+ je = new JMSException(Integer.toString(((AMQException) cause).getErrorCode()), "Exception thrown against " + toString() + ": " + cause);
}
else
{
@@ -956,8 +958,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
void deregisterSession(int channelId)
{
_sessions.remove(channelId);
- }
-
+ }
+
/**
* For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
* The caller must hold the failover mutex before calling this method.