diff options
4 files changed, 21 insertions, 11 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index fc8f0a1a6f..f1318f0f48 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -393,7 +393,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void setVirtualHost(String virtualHost) { - if (virtualHost.startsWith("/")) + if (virtualHost != null && virtualHost.startsWith("/")) { virtualHost = virtualHost.substring(1); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index 42e8284b10..ae9a5ff802 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -268,7 +268,8 @@ public class AMQConnectionURL implements ConnectionURL public String toString() { - StringBuffer sb = new StringBuffer(); + return _url; + /*StringBuffer sb = new StringBuffer(); sb.append(AMQ_PROTOCOL); sb.append("://"); @@ -299,7 +300,7 @@ public class AMQConnectionURL implements ConnectionURL sb.append(optionsToString()); - return sb.toString(); + return sb.toString();*/ } private String optionsToString() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index fd507de4b6..d411730c83 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -345,6 +345,7 @@ public class AMQSession_0_10 extends AMQSession consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); + getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); @@ -438,7 +439,7 @@ public class AMQSession_0_10 extends AMQSession { getQpidSession().messageStop(consumer.getConsumerTag().toString()); getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); - + getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); } } else diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index b0cd749a2a..fa823c2c0f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -174,7 +174,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { ((AMQSession_0_10) getSession()).getQpidSession().messageStop(getConsumerTag().toString()); ((AMQSession_0_10) getSession()).getQpidSession().sync(); - // confirm cancel + // confirm cancel getSession().confirmConsumerCancelled(getConsumerTag()); try { @@ -303,7 +303,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { // do nothing as the rollback operation will do the job. } - + /** * Acquire a message * @@ -338,8 +338,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By super.setMessageListener(messageListener); if (messageListener == null) { - _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); _0_10session.getQpidSession().messageStop(getConsumerTag().toString()); + _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, + 0xFFFFFFFF); _0_10session.getQpidSession().sync(); } else @@ -367,14 +370,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By if (l > 0) { o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); - _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); - _0_10session.getQpidSession().sync(); - o = _synchronousQueue.poll(); + if (o == null) + { + _logger.debug("Message Didn't arrive in time, checking if one is inflight"); + // checking if one is inflight + _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); + _0_10session.getQpidSession().sync(); + o = _synchronousQueue.poll(); + } } else { o = _synchronousQueue.take(); } - return null; + return o; } }
\ No newline at end of file |
