summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java22
4 files changed, 21 insertions, 11 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index fc8f0a1a6f..f1318f0f48 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -393,7 +393,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private void setVirtualHost(String virtualHost)
{
- if (virtualHost.startsWith("/"))
+ if (virtualHost != null && virtualHost.startsWith("/"))
{
virtualHost = virtualHost.substring(1);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
index 42e8284b10..ae9a5ff802 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
@@ -268,7 +268,8 @@ public class AMQConnectionURL implements ConnectionURL
public String toString()
{
- StringBuffer sb = new StringBuffer();
+ return _url;
+ /*StringBuffer sb = new StringBuffer();
sb.append(AMQ_PROTOCOL);
sb.append("://");
@@ -299,7 +300,7 @@ public class AMQConnectionURL implements ConnectionURL
sb.append(optionsToString());
- return sb.toString();
+ return sb.toString();*/
}
private String optionsToString()
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index fd507de4b6..d411730c83 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -345,6 +345,7 @@ public class AMQSession_0_10 extends AMQSession
consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+ getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
getQpidSession().sync();
getCurrentException();
@@ -438,7 +439,7 @@ public class AMQSession_0_10 extends AMQSession
{
getQpidSession().messageStop(consumer.getConsumerTag().toString());
getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
-
+ getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
}
}
else
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index b0cd749a2a..fa823c2c0f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -174,7 +174,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
((AMQSession_0_10) getSession()).getQpidSession().messageStop(getConsumerTag().toString());
((AMQSession_0_10) getSession()).getQpidSession().sync();
- // confirm cancel
+ // confirm cancel
getSession().confirmConsumerCancelled(getConsumerTag());
try
{
@@ -303,7 +303,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
// do nothing as the rollback operation will do the job.
}
-
+
/**
* Acquire a message
*
@@ -338,8 +338,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
super.setMessageListener(messageListener);
if (messageListener == null)
{
- _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
_0_10session.getQpidSession().messageStop(getConsumerTag().toString());
+ _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
+ 0xFFFFFFFF);
_0_10session.getQpidSession().sync();
}
else
@@ -367,14 +370,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
if (l > 0)
{
o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
- _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
- _0_10session.getQpidSession().sync();
- o = _synchronousQueue.poll();
+ if (o == null)
+ {
+ _logger.debug("Message Didn't arrive in time, checking if one is inflight");
+ // checking if one is inflight
+ _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
+ _0_10session.getQpidSession().sync();
+ o = _synchronousQueue.poll();
+ }
}
else
{
o = _synchronousQueue.take();
}
- return null;
+ return o;
}
} \ No newline at end of file