diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-11-09 10:43:47 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-11-09 10:43:47 +0000 |
| commit | 8cf55327406939cb023e2d81cf53d951ac099152 (patch) | |
| tree | a9c4ce10dd8785942abb2ece77c3c8e24a20f337 /qpid/java/client | |
| parent | 5de69bedce014b1ffbc81abd3766bc333ae02912 (diff) | |
| download | qpid-python-8cf55327406939cb023e2d81cf53d951ac099152.tar.gz | |
removed remote exchange class querying when standard exchanges are used
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@593486 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 133 |
1 files changed, 81 insertions, 52 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index caa7d1cacf..9354caea6f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpidity.api.Message; import org.apache.qpidity.nclient.Session; @@ -57,7 +58,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By /** * Number of received message so far */ - private final AtomicLong _messagesReceived = new AtomicLong(0); + private final AtomicLong _messagesReceived = new AtomicLong(0); /** * This class logger @@ -95,7 +96,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); _0_10session = (AMQSession_0_10) session; - if (messageSelector != null && ! messageSelector.equals("") ) + if (messageSelector != null && !messageSelector.equals("")) { try { @@ -110,7 +111,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _preAcquire = false; } } - _isStarted = connection.started(); + _isStarted = connection.started(); } // ----- Interface org.apache.qpidity.client.util.MessageListener @@ -136,7 +137,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By catch (Exception e1) { // the receiver may be waiting for a message - if( _messageCounter.get() >= 0) + if (_messageCounter.get() >= 0) { _messageCounter.decrementAndGet(); _synchronousQueue.add(new NullTocken()); @@ -153,13 +154,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By public void onMessage(Message message) { - if( isMessageListenerSet()) + if (isMessageListenerSet()) { _messagesReceived.incrementAndGet(); - if( _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH ) + if (_messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH) { // require more credit - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), + _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, AMQSession_0_10.MAX_PREFETCH); _messagesReceived.set(0); @@ -182,11 +183,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By getSession().getAMQConnection().exceptionReceived(e); } Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()}; - // if there is a replyto destination then we need to request the exchange info + // if there is a replyto destination then we need to request the exchange info ReplyTo replyTo = message.getMessageProperties().getReplyTo(); - if (replyTo != null && - replyTo.getExchangeName() != null && - !replyTo.getExchangeName().equals("")) + if (replyTo != null && replyTo.getExchangeName() != null && !replyTo.getExchangeName().equals("")) { // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* // the exchnage class will be set later from within the sesion thread @@ -234,20 +233,41 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By super.postDeliver(msg); } - void notifyMessage(UnprocessedMessage messageFrame, int channelId) + void notifyMessage(UnprocessedMessage messageFrame, int channelId) { - // if there is a replyto destination then we need to request the exchange info - String replyToURL = messageFrame.getReplyToURL() ; - if (replyToURL != null && ! replyToURL.equals("")) + // if there is a replyto destination then we need to request the exchange info + String replyToURL = messageFrame.getReplyToURL(); + if (replyToURL != null && !replyToURL.equals("")) { - String exchangeName = replyToURL.substring(0, replyToURL.indexOf('/')); - Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(exchangeName); - ExchangeQueryResult res = future.get(); - // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* - String replyToUrl = res.getType() + "://" + replyToURL; + AMQShortString shortExchangeName = new AMQShortString( replyToURL.substring(0, replyToURL.indexOf('/'))); + String replyToUrl = "://" + replyToURL; + if (shortExchangeName.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME)) + { + replyToUrl = ExchangeDefaults.TOPIC_EXCHANGE_CLASS + replyToUrl; + } + else if (shortExchangeName.equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME)) + { + replyToUrl = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + replyToUrl; + } + else if (shortExchangeName.equals(ExchangeDefaults.HEADERS_EXCHANGE_NAME)) + { + replyToUrl = ExchangeDefaults.HEADERS_EXCHANGE_CLASS + replyToUrl; + } + else if (shortExchangeName.equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME)) + { + replyToUrl = ExchangeDefaults.FANOUT_EXCHANGE_CLASS + replyToUrl; + } + else + { + Future<ExchangeQueryResult> future = + ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(shortExchangeName.toString()); + ExchangeQueryResult res = future.get(); + // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + replyToUrl = res.getType() + replyToUrl; + } ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl); } - super.notifyMessage(messageFrame, channelId); + super.notifyMessage(messageFrame, channelId); } public AbstractJMSMessage createJMSMessageFromUnprocessedMessage( @@ -273,7 +293,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // TODO Use a tag for fiding out if message filtering is done here or by the broker. try { - if (getMessageSelector() != null && ! getMessageSelector().equals("")) + if (getMessageSelector() != null && !getMessageSelector().equals("")) { messageOk = _filter.matches(message); } @@ -283,7 +303,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e); } - if (_logger.isDebugEnabled()) + if (_logger.isDebugEnabled()) { _logger.debug("messageOk " + messageOk); _logger.debug("_preAcquire " + _preAcquire); @@ -308,7 +328,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } // now we need to acquire this message if needed // this is the case of queue with a message selector set - if (!_preAcquire && messageOk && ! isNoConsume()) + if (!_preAcquire && messageOk && !isNoConsume()) { if (_logger.isDebugEnabled()) { @@ -316,7 +336,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } messageOk = acquireMessage(message); } - if( ! messageOk ) + if (!messageOk) { requestCreditIfCreditMode(); } @@ -330,14 +350,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // the current message received is not good, so we need to get a message. if (getMessageListener() == null) { - int oldval = _messageCounter.intValue(); + int oldval = _messageCounter.intValue(); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, + 1); + _0_10session.getQpidSession() + .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); _0_10session.getQpidSession().sync(); - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); - if( _messageCounter.intValue() <= oldval ) + _0_10session.getQpidSession() + .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + if (_messageCounter.intValue() <= oldval) { // we haven't received a message so tell the receiver to return null _synchronousQueue.add(new NullTocken()); @@ -350,9 +373,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // we now need to check if we have received a message } - catch(Exception e) + catch (Exception e) { - _logger.error("Error getting message listener, couldn't request credit after releasing a message that failed the selector test",e); + _logger.error( + "Error getting message listener, couldn't request credit after releasing a message that failed the selector test", + e); } } @@ -390,10 +415,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } } - protected void rollbackReceivedMessages() - { - // do nothing as the rollback operation will do the job. - } + protected void rollbackReceivedMessages() + { + // do nothing as the rollback operation will do the job. + } /** * Acquire a message @@ -430,17 +455,19 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By if (messageListener == null) { _0_10session.getQpidSession().messageStop(getConsumerTag().toString()); - _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); + _0_10session.getQpidSession() + .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, - 0xFFFFFFFF); + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, + 0xFFFFFFFF); _0_10session.getQpidSession().sync(); } else { if (_connection.started()) { - _0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); + _0_10session.getQpidSession() + .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, AMQSession_0_10.MAX_PREFETCH); @@ -448,20 +475,21 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); _0_10session.getQpidSession().sync(); - _messagesReceived.set(0);; + _messagesReceived.set(0); + ; } } } public Object getMessageFromQueue(long l) throws InterruptedException { - if( !_isStarted ) + if (!_isStarted) { return null; } Object o; _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,1); + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); if (l == 0) { @@ -480,24 +508,25 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By if (o == null) { _logger.debug("Message Didn't arrive in time, checking if one is inflight"); - // checking if one is inflight + // checking if one is inflight _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); _0_10session.getQpidSession().sync(); - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); - if( _messageCounter.get() > 0 ) + _0_10session.getQpidSession() + .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + if (_messageCounter.get() > 0) { - o = _synchronousQueue.take(); + o = _synchronousQueue.take(); } } } - if( o instanceof NullTocken ) + if (o instanceof NullTocken) { o = null; } return o; } - protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { _messageCounter.decrementAndGet(); super.preApplicationProcessing(jmsMsg); @@ -507,14 +536,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { } - + public void start() { - _isStarted = true; + _isStarted = true; } public void stop() { - _isStarted = false; + _isStarted = false; } }
\ No newline at end of file |
