diff options
Diffstat (limited to 'java/client')
4 files changed, 205 insertions, 356 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java index 20a0542bf6..ac4f4dbd06 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java +++ b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java @@ -22,8 +22,8 @@ public class JMSTestCase msg.writeInt(123); prod.send(msg); - javax.jms.Message m = cons.receive(); - System.out.println(m); + javax.jms.BytesMessage m = (javax.jms.BytesMessage)cons.receive(); + System.out.println("Data : " + m.readInt()); } catch(Exception e) diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index b8718d687c..0f7342f1ab 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -17,24 +17,30 @@ */ package org.apache.qpidity.jms; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; -import org.apache.qpidity.jms.message.QpidMessage; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.QpidException; import org.apache.qpidity.Option; -import org.apache.qpidity.filter.MessageFilter; -import org.apache.qpidity.filter.JMSSelectorFilter; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.RangeSet; import org.apache.qpidity.client.MessagePartListener; import org.apache.qpidity.client.util.MessagePartListenerAdapter; import org.apache.qpidity.exchange.ExchangeDefaults; - -import javax.jms.*; +import org.apache.qpidity.filter.JMSSelectorFilter; +import org.apache.qpidity.filter.MessageFilter; +import org.apache.qpidity.jms.message.MessageFactory; +import org.apache.qpidity.jms.message.QpidMessage; /** * Implementation of JMS message consumer */ -public class MessageConsumerImpl extends MessageActor implements MessageConsumer +public class MessageConsumerImpl extends MessageActor implements MessageConsumer, org.apache.qpidity.client.util.MessageListener { // we can receive up to 100 messages for an asynchronous listener public static final int MAX_MESSAGE_TRANSFERRED = 100; @@ -91,9 +97,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * Nether exceed MAX_MESSAGE_TRANSFERRED */ private int _messageAsyncrhonouslyReceived = 0; - - private AtomicBoolean _messageReceived = new AtomicBoolean(); - + + private LinkedBlockingQueue<QpidMessage> _queue = new LinkedBlockingQueue<QpidMessage>(); + //----- Constructors /** * Create a new MessageProducerImpl. @@ -120,11 +126,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer _subscriptionName = subscriptionName; _isStopped = getSession().isStopped(); // let's create a message part assembler - /** - * A Qpid message listener that pushes messages to this consumer session when this consumer is - * asynchronous or directly to this consumer when it is synchronously accessed. - */ - MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this)); + + MessagePartListener messageAssembler = new MessagePartListenerAdapter(this); if (destination instanceof Queue) { @@ -183,10 +186,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // this will prevent the broker from sending more than one message // When a messageListener is set the flow will be adjusted. // until then we assume it's for synchronous message consumption - getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); - - getSession().getQpidSession().sync(); + requestCredit(1); + requestSync(); // check for an exception if (getSession().getCurrentException() != null) { @@ -266,9 +267,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer getSession().getQpidSession().messageStop(getMessageActorID()); } _messageAsyncrhonouslyReceived = 0; - getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, - MAX_MESSAGE_TRANSFERRED); + requestCredit(MAX_MESSAGE_TRANSFERRED); } /** @@ -281,7 +280,28 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer */ public Message receive() throws JMSException { - return receive(0); + // Check if we can get a message immediately + Message result; + result = receiveNoWait(); + + if(result != null) + { + return result; + } + + try + { + // Now issue a credit and wait for the broker to send a message + // IMO no point doing a credit() flush() and sync() in a loop. + // This will only overload the broker. After the initial try we can wait + // for the broker to send a message when it gets one + requestCredit(1); + return (Message)_queue.take(); + } + catch (Exception e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } } /** @@ -297,20 +317,35 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer */ public Message receive(long timeout) throws JMSException { + checkClosed(); + checkIfListenerSet(); if (timeout < 0) { throw new JMSException("Invalid timeout value: " + timeout); } + Message result; try { - result = internalReceive(timeout); + // first check if we have any in the queue already + result = (Message)_queue.poll(); + if(result == null) + { + requestCredit(1); + requestFlush(); + // We shouldn't do a sync(). Bcos the timeout can happen + // before the sync() returns + return (Message)_queue.poll(timeout,TimeUnit.MILLISECONDS); + } + else + { + return result; + } } catch (Exception e) { throw ExceptionHelper.convertQpidExceptionToJMSException(e); } - return result; } /** @@ -321,138 +356,56 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer */ public Message receiveNoWait() throws JMSException { + checkClosed(); + checkIfListenerSet(); Message result; try { - result = internalReceive(-1); + // first check if we have any in the queue already + result = (Message)_queue.poll(); + if(result == null) + { + requestCredit(1); + requestFlush(); + requestSync(); + return (Message)_queue.poll(); + } + else + { + return result; + } } catch (Exception e) { throw ExceptionHelper.convertQpidExceptionToJMSException(e); } - return result; } - - // not public methods - - /** - * Receive a synchronous message - * <p> This call blocks until a message arrives, the timeout expires, or this message consumer - * is closed. - * <p> A timeout of zero never expires, and the call blocks indefinitely (unless this message consumer - * is closed) - * <p> A timeout less than 0 returns the next message or null if one is not available. - * - * @param timeout The timeout value (in milliseconds) - * @return the next message or null if one is not available. - * @throws Exception If receiving the next message fails due to some internal error. - */ - private Message internalReceive(long timeout) throws Exception + + // not public methods + private void requestCredit(int units) { - checkNotClosed(); - Message result = null; - - if (_messageListener != null) - { - throw new javax.jms.IllegalStateException("A listener has already been set."); - } - - synchronized (_incomingMessageLock) - { - // This indicate to the delivery thread to deliver the message to this consumer - // as it can happens that a message is delivered after a receive operation as returned. - _isReceiving = true; - boolean blockingReceived = timeout == 0; - if (!_isStopped) - { - // if this consumer is stopped then this will be call when starting - requestOneMessage(); - //When sync() returns we know whether we have received a message or not. - System.out.println("Internal receive -- Called sync()"); - getSession().getQpidSession().sync(); - System.out.println("Internal receive -- Returned from sync()"); - } - if (_messageReceived.get() && timeout < 0) - { - // this is a nowait and we havent received a message then we must immediatly return - result = null; - } - else - { - boolean messageReceived = false; - while (!messageReceived) - { - long timeBeforeWait = 0; - while (_incomingMessage == null && !_isClosed) - { - if (!blockingReceived) - { - timeBeforeWait = System.currentTimeMillis(); - } - try - { - _incomingMessageLock.wait(timeout); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - if (_incomingMessage != null) - { - result = (Message) _incomingMessage; - // tell the session that a message is inprocess - getSession().preProcessMessage(_incomingMessage); - // tell the session to acknowledge this message (if required) - getSession().acknowledgeMessage(_incomingMessage); - _incomingMessage.afterMessageReceive(); - messageReceived = true; - } - else - { - //now setup the new timeout - if (!blockingReceived) - { - timeout = timeout - (System.currentTimeMillis() - timeBeforeWait); - } - if (!_isClosed) - { - // we need to request a new message - requestOneMessage(); - getSession().getQpidSession().sync(); - if (_messageReceived.get() && timeout < 0) - { - // we are waiting for too long and we haven't received a proper message - result = null; - messageReceived = true; - } - } - } - } - _incomingMessage = null; - } - // We now release any message received for this consumer - _isReceiving = false; - getSession().testQpidException(); - } - return result; + getSession().getQpidSession() + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, units); } - - /** - * Request a single message - */ - private void requestOneMessage() + + private void requestFlush() + { + getSession().getQpidSession().messageFlush(getMessageActorID()); + } + + private void requestSync() + { + getSession().getQpidSession().sync(); + } + + private void checkClosed() throws JMSException { - if (_logger.isDebugEnabled()) + if(_isStopped) { - _logger.debug("Requesting a single message"); + throw new JMSException("Session is closed"); } - getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); - getSession().getQpidSession().messageFlush(getMessageActorID()); - _messageReceived.set(false); } - + /** * Stop the delivery of messages to this consumer. * <p>For asynchronous receiver, this operation blocks until the message listener @@ -475,93 +428,42 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { synchronized (_incomingMessageLock) { - _isStopped = false; - if (_isReceiving) - { - // there is a synch call waiting for a message to be delivered - // so tell the broker to deliver a message - getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, - 1); - getSession().getQpidSession().messageFlush(getMessageActorID()); - } + _isStopped = false; } } - /** - * Deliver a message to this consumer. - * - * @param message The message delivered to this consumer. - */ - protected synchronized void onMessage(QpidMessage message) - { + public void onMessage(org.apache.qpidity.api.Message message) + { try { - // if there is a message selector then we need to evaluate it. - boolean messageOk = true; - if (_messageSelector != null) - { - messageOk = _filter.matches((Message) message); - } - - System.out.println("Received a message- onMessage in message consumer Impl"); - if (!messageOk && _preAcquire) + QpidMessage jmsMessage = MessageFactory.getQpidMessage(message); + if (_messageListener == null) { - // this is the case for topics - // We need to ack this message - System.out.println("onMessage - trying to ack message"); - acknowledgeMessage(message); - System.out.println("onMessage - acked message"); + _queue.offer(jmsMessage); } - // now we need to acquire this message if needed - // this is the case of queue with a message selector set - if (!_preAcquire && messageOk) + else { - System.out.println("onMessage - trying to acquire message"); - messageOk = acquireMessage(message); - System.out.println("onMessage - acquired message"); + // I still think we don't need that additional thread in SessionImpl + // if the Application blocks on a message thats fine + // getSession().dispatchMessage(getMessageActorID(), jmsMessage); + notifyMessageListener(jmsMessage); } + } + catch (Exception e) + { + throw new RuntimeException(e.getMessage()); + } + } + + + public void notifyMessageListener(QpidMessage message)throws RuntimeException + { + try + { + boolean messageOk = checkPreConditions(message); - // if this consumer is synchronous then set the current message and - // notify the waiting thread - if (_messageListener == null) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Received a message- onMessage in message consumer Impl"); - } - synchronized (_incomingMessageLock) - { - System.out.println("got incomming message lock"); - if (messageOk) - { - // we have received a proper message that we can deliver - if (_isReceiving) - { - System.out.println("Is receiving true, setting message and notifying"); - _incomingMessage = message; - _incomingMessageLock.notify(); - } - else - { - // this message has been received after a received as returned - // we need to release it - releaseMessage(message); - } - } - else - { - // oups the message did not match the selector or we did not manage to acquire it - // If the receiver is still waiting for a message - // then we need to request a new one from the server - if (_isReceiving) - { - _incomingMessageLock.notify(); - } - } - } - } - else + // only deliver the message if it is valid + if (messageOk) { _messageAsyncrhonouslyReceived++; if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED) @@ -569,56 +471,93 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages resetAsynchMessageReceived(); } - // only deliver the message if it is valid - if (messageOk) + + preApplicationProcessing(message); + // The JMS specs says: + /* The result of a listener throwing a RuntimeException depends on the session?s + * acknowledgment mode. + ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message + * will be immediately redelivered. The number of times a JMS provider will + * redeliver the same message before giving up is provider-dependent. + ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered. + * --- Transacted Session - the next message for the listener is delivered. + * + * The number of time we try redelivering the message is 0 + **/ + try + { + + _messageListener.onMessage((Message) message); + } + catch (RuntimeException re) { - // This is an asynchronous message - // tell the session that a message is in process - getSession().preProcessMessage(message); - // If the session is transacted we need to ack the message first - // This is because a message is associated with its tx only when acked - if (getSession().getTransacted()) - { - getSession().acknowledgeMessage(message); - } - // The JMS specs says: - /* The result of a listener throwing a RuntimeException depends on the session?s - * acknowledgment mode. - ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message - * will be immediately redelivered. The number of times a JMS provider will - * redeliver the same message before giving up is provider-dependent. - ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered. - * --- Transacted Session - the next message for the listener is delivered. - * - * The number of time we try redelivering the message is 0 - **/ - try - { - message.afterMessageReceive(); - _messageListener.onMessage((Message) message); - } - catch (RuntimeException re) - { - // do nothing as this message will not be redelivered - } - // If the session has been recovered we then need to redelivered this message - if (getSession().isInRecovery()) - { - releaseMessage(message); - } - else if (!getSession().getTransacted()) - { - // Tell the jms Session to ack this message if required - getSession().acknowledgeMessage(message); - } + // do nothing as this message will not be redelivered } } + } catch (Exception e) { throw new RuntimeException(e.getMessage()); } } + + private void checkIfListenerSet() throws javax.jms.IllegalStateException + { + + if (_messageListener != null) + { + throw new javax.jms.IllegalStateException("A listener has already been set."); + } + } + + private void preApplicationProcessing(QpidMessage message)throws Exception + { + getSession().preProcessMessage(message); + // If the session is transacted we need to ack the message first + // This is because a message is associated with its tx only when acked + if (getSession().getTransacted()) + { + getSession().acknowledgeMessage(message); + } + message.afterMessageReceive(); + } + + private boolean checkPreConditions(QpidMessage message)throws QpidException + { + boolean messageOk = true; + if (_messageSelector != null) + { + messageOk = _filter.matches((Message) message); + if (!messageOk) + { + System.out.println("Message not OK, releasing"); + releaseMessage(message); + } + } + + System.out.println("messageOk " + messageOk); + System.out.println("_preAcquire " + _preAcquire); + + if (!messageOk && _preAcquire) + { + // this is the case for topics + // We need to ack this message + System.out.println("filterMessage - trying to ack message"); + acknowledgeMessage(message); + System.out.println("filterMessage - acked message"); + } + // now we need to acquire this message if needed + // this is the case of queue with a message selector set + if (!_preAcquire && messageOk) + { + System.out.println("filterMessage - trying to acquire message"); + messageOk = acquireMessage(message); + System.out.println("filterMessage - acquired message"); + } + + return messageOk; + } /** * Release a message @@ -681,9 +620,4 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer getSession().testQpidException(); } } - - public void notifyMessageReceived() - { - _messageReceived.set(true); - } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java deleted file mode 100644 index 4dbf86a388..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java +++ /dev/null @@ -1,86 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpidity.jms; - -import org.apache.qpidity.jms.message.QpidMessage; -import org.apache.qpidity.jms.message.MessageFactory; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.client.util.MessageListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * <p> When asynchronous, upon receive of a message this listener delegate the dispatching to its session. - * This is for guarantying that asynch messages are sequentially processed within their session. - * <p> when used synchonously, messages are dispatched to the receiver itself. - */ -public class QpidMessageListener implements MessageListener -{ - /** - * Used for debugging. - */ - private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class); - - /** - * This message listener consumer - */ - MessageConsumerImpl _consumer = null; - - //---- constructor - /** - * Create a message listener wrapper for a given consumer - * - * @param consumer The consumer of this listener - */ - public QpidMessageListener(MessageConsumerImpl consumer) - { - _consumer = consumer; - } - - //---- org.apache.qpidity.MessagePartListener API - /** - * Deliver a message to the listener. - * - * @param message The message delivered to the listner. - */ - public void onMessage(Message message) - { - try - { - // to be used with flush - _consumer.notifyMessageReceived(); - - //convert this message into a JMS one - QpidMessage jmsMessage = MessageFactory.getQpidMessage(message); - // if consumer is asynchronous then send this message to its session. - if( _consumer.getMessageListener() != null ) - { - _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage); - } - else - { - // deliver this message to the consumer itself - _consumer.onMessage(jmsMessage); - } - } - catch (Exception e) - { - throw new RuntimeException(e.getMessage()); - } - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index f08fdba373..010a43ab41 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -1301,7 +1301,8 @@ public class SessionImpl implements Session { try { - mc.onMessage(message.getMessage()); + // mc.onMessage(message.getMessage()); + mc.notifyMessageListener(message.getMessage()); } catch (RuntimeException t) { |
