summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-11-09 10:43:47 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-11-09 10:43:47 +0000
commit8cf55327406939cb023e2d81cf53d951ac099152 (patch)
treea9c4ce10dd8785942abb2ece77c3c8e24a20f337 /qpid/java/client
parent5de69bedce014b1ffbc81abd3766bc333ae02912 (diff)
downloadqpid-python-8cf55327406939cb023e2d81cf53d951ac099152.tar.gz
removed remote exchange class querying when standard exchanges are used
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@593486 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java133
1 files changed, 81 insertions, 52 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index caa7d1cacf..9354caea6f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -27,6 +27,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Session;
@@ -57,7 +58,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
/**
* Number of received message so far
*/
- private final AtomicLong _messagesReceived = new AtomicLong(0);
+ private final AtomicLong _messagesReceived = new AtomicLong(0);
/**
* This class logger
@@ -95,7 +96,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
_0_10session = (AMQSession_0_10) session;
- if (messageSelector != null && ! messageSelector.equals("") )
+ if (messageSelector != null && !messageSelector.equals(""))
{
try
{
@@ -110,7 +111,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_preAcquire = false;
}
}
- _isStarted = connection.started();
+ _isStarted = connection.started();
}
// ----- Interface org.apache.qpidity.client.util.MessageListener
@@ -136,7 +137,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
catch (Exception e1)
{
// the receiver may be waiting for a message
- if( _messageCounter.get() >= 0)
+ if (_messageCounter.get() >= 0)
{
_messageCounter.decrementAndGet();
_synchronousQueue.add(new NullTocken());
@@ -153,13 +154,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
public void onMessage(Message message)
{
- if( isMessageListenerSet())
+ if (isMessageListenerSet())
{
_messagesReceived.incrementAndGet();
- if( _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH )
+ if (_messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH)
{
// require more credit
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
AMQSession_0_10.MAX_PREFETCH);
_messagesReceived.set(0);
@@ -182,11 +183,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
getSession().getAMQConnection().exceptionReceived(e);
}
Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
- // if there is a replyto destination then we need to request the exchange info
+ // if there is a replyto destination then we need to request the exchange info
ReplyTo replyTo = message.getMessageProperties().getReplyTo();
- if (replyTo != null &&
- replyTo.getExchangeName() != null &&
- !replyTo.getExchangeName().equals(""))
+ if (replyTo != null && replyTo.getExchangeName() != null && !replyTo.getExchangeName().equals(""))
{
// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
// the exchnage class will be set later from within the sesion thread
@@ -234,20 +233,41 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
super.postDeliver(msg);
}
- void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+ void notifyMessage(UnprocessedMessage messageFrame, int channelId)
{
- // if there is a replyto destination then we need to request the exchange info
- String replyToURL = messageFrame.getReplyToURL() ;
- if (replyToURL != null && ! replyToURL.equals(""))
+ // if there is a replyto destination then we need to request the exchange info
+ String replyToURL = messageFrame.getReplyToURL();
+ if (replyToURL != null && !replyToURL.equals(""))
{
- String exchangeName = replyToURL.substring(0, replyToURL.indexOf('/'));
- Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(exchangeName);
- ExchangeQueryResult res = future.get();
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- String replyToUrl = res.getType() + "://" + replyToURL;
+ AMQShortString shortExchangeName = new AMQShortString( replyToURL.substring(0, replyToURL.indexOf('/')));
+ String replyToUrl = "://" + replyToURL;
+ if (shortExchangeName.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))
+ {
+ replyToUrl = ExchangeDefaults.TOPIC_EXCHANGE_CLASS + replyToUrl;
+ }
+ else if (shortExchangeName.equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME))
+ {
+ replyToUrl = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + replyToUrl;
+ }
+ else if (shortExchangeName.equals(ExchangeDefaults.HEADERS_EXCHANGE_NAME))
+ {
+ replyToUrl = ExchangeDefaults.HEADERS_EXCHANGE_CLASS + replyToUrl;
+ }
+ else if (shortExchangeName.equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME))
+ {
+ replyToUrl = ExchangeDefaults.FANOUT_EXCHANGE_CLASS + replyToUrl;
+ }
+ else
+ {
+ Future<ExchangeQueryResult> future =
+ ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(shortExchangeName.toString());
+ ExchangeQueryResult res = future.get();
+ // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ replyToUrl = res.getType() + replyToUrl;
+ }
((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
}
- super.notifyMessage(messageFrame, channelId);
+ super.notifyMessage(messageFrame, channelId);
}
public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
@@ -273,7 +293,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
// TODO Use a tag for fiding out if message filtering is done here or by the broker.
try
{
- if (getMessageSelector() != null && ! getMessageSelector().equals(""))
+ if (getMessageSelector() != null && !getMessageSelector().equals(""))
{
messageOk = _filter.matches(message);
}
@@ -283,7 +303,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e);
}
- if (_logger.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
_logger.debug("messageOk " + messageOk);
_logger.debug("_preAcquire " + _preAcquire);
@@ -308,7 +328,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
- if (!_preAcquire && messageOk && ! isNoConsume())
+ if (!_preAcquire && messageOk && !isNoConsume())
{
if (_logger.isDebugEnabled())
{
@@ -316,7 +336,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
messageOk = acquireMessage(message);
}
- if( ! messageOk )
+ if (!messageOk)
{
requestCreditIfCreditMode();
}
@@ -330,14 +350,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
// the current message received is not good, so we need to get a message.
if (getMessageListener() == null)
{
- int oldval = _messageCounter.intValue();
+ int oldval = _messageCounter.intValue();
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ 1);
+ _0_10session.getQpidSession()
+ .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
_0_10session.getQpidSession().sync();
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- if( _messageCounter.intValue() <= oldval )
+ _0_10session.getQpidSession()
+ .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+ if (_messageCounter.intValue() <= oldval)
{
// we haven't received a message so tell the receiver to return null
_synchronousQueue.add(new NullTocken());
@@ -350,9 +373,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
// we now need to check if we have received a message
}
- catch(Exception e)
+ catch (Exception e)
{
- _logger.error("Error getting message listener, couldn't request credit after releasing a message that failed the selector test",e);
+ _logger.error(
+ "Error getting message listener, couldn't request credit after releasing a message that failed the selector test",
+ e);
}
}
@@ -390,10 +415,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
}
- protected void rollbackReceivedMessages()
- {
- // do nothing as the rollback operation will do the job.
- }
+ protected void rollbackReceivedMessages()
+ {
+ // do nothing as the rollback operation will do the job.
+ }
/**
* Acquire a message
@@ -430,17 +455,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
if (messageListener == null)
{
_0_10session.getQpidSession().messageStop(getConsumerTag().toString());
- _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+ _0_10session.getQpidSession()
+ .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
- 0xFFFFFFFF);
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
+ 0xFFFFFFFF);
_0_10session.getQpidSession().sync();
}
else
{
if (_connection.started())
{
- _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
+ _0_10session.getQpidSession()
+ .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
AMQSession_0_10.MAX_PREFETCH);
@@ -448,20 +475,21 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
- _messagesReceived.set(0);;
+ _messagesReceived.set(0);
+ ;
}
}
}
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if( !_isStarted )
+ if (!_isStarted)
{
return null;
}
Object o;
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
if (l == 0)
{
@@ -480,24 +508,25 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
if (o == null)
{
_logger.debug("Message Didn't arrive in time, checking if one is inflight");
- // checking if one is inflight
+ // checking if one is inflight
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
_0_10session.getQpidSession().sync();
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- if( _messageCounter.get() > 0 )
+ _0_10session.getQpidSession()
+ .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+ if (_messageCounter.get() > 0)
{
- o = _synchronousQueue.take();
+ o = _synchronousQueue.take();
}
}
}
- if( o instanceof NullTocken )
+ if (o instanceof NullTocken)
{
o = null;
}
return o;
}
- protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
{
_messageCounter.decrementAndGet();
super.preApplicationProcessing(jmsMsg);
@@ -507,14 +536,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
}
-
+
public void start()
{
- _isStarted = true;
+ _isStarted = true;
}
public void stop()
{
- _isStarted = false;
+ _isStarted = false;
}
} \ No newline at end of file