diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-09-04 13:02:58 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-09-04 13:02:58 +0000 |
| commit | aadc8bf21d375f33c8db7625c6e84607c8f55488 (patch) | |
| tree | 7285b58230507b88abf0455a68851b1ba0046fc6 /java/client | |
| parent | 331b0e84ae06da0c057a82c0f5b67f550bcd636b (diff) | |
| download | qpid-python-aadc8bf21d375f33c8db7625c6e84607c8f55488.tar.gz | |
added byteBuffer to Stream converter
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@572656 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
6 files changed, 386 insertions, 99 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 1ab0b717ab..a3c03ca6d0 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -40,7 +40,8 @@ import org.apache.qpidity.jms.message.QpidMessage; /** * Implementation of JMS message consumer */ -public class MessageConsumerImpl extends MessageActor implements MessageConsumer, org.apache.qpidity.client.util.MessageListener +public class MessageConsumerImpl extends MessageActor + implements MessageConsumer, org.apache.qpidity.client.util.MessageListener { // we can receive up to 100 messages for an asynchronous listener public static final int MAX_MESSAGE_TRANSFERRED = 100; @@ -78,28 +79,19 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer private MessageListener _messageListener; /** - * The synchronous message just delivered - */ - private QpidMessage _incomingMessage; - - /** * A lcok on the syncrhonous message */ private final Object _incomingMessageLock = new Object(); - /** - * Indicates that this consumer is receiving a synch message - */ - private boolean _isReceiving = false; /** * Number of mesages received asynchronously * Nether exceed MAX_MESSAGE_TRANSFERRED */ private int _messageAsyncrhonouslyReceived = 0; - + private LinkedBlockingQueue<QpidMessage> _queue = new LinkedBlockingQueue<QpidMessage>(); - + //----- Constructors /** * Create a new MessageProducerImpl. @@ -126,7 +118,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer _subscriptionName = subscriptionName; _isStopped = getSession().isStopped(); // let's create a message part assembler - + MessagePartListener messageAssembler = new MessagePartListenerAdapter(this); if (destination instanceof Queue) @@ -283,12 +275,10 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // Check if we can get a message immediately Message result; result = receiveNoWait(); - - if(result != null) + if (result != null) { return result; } - try { // Now issue a credit and wait for the broker to send a message @@ -296,7 +286,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // This will only overload the broker. After the initial try we can wait // for the broker to send a message when it gets one requestCredit(1); - return (Message)_queue.take(); + return (Message) _queue.take(); } catch (Exception e) { @@ -323,19 +313,19 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { throw new JMSException("Invalid timeout value: " + timeout); } - + Message result; try { // first check if we have any in the queue already - result = (Message)_queue.poll(); - if(result == null) + result = (Message) _queue.poll(); + if (result == null) { requestCredit(1); requestFlush(); // We shouldn't do a sync(). Bcos the timeout can happen // before the sync() returns - return (Message)_queue.poll(timeout,TimeUnit.MILLISECONDS); + return (Message) _queue.poll(timeout, TimeUnit.MILLISECONDS); } else { @@ -362,50 +352,71 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer try { // first check if we have any in the queue already - result = (Message)_queue.poll(); - if(result == null) + result = (Message) _queue.poll(); + if (result == null) { requestCredit(1); requestFlush(); requestSync(); - return (Message)_queue.poll(); + return (Message) _queue.poll(); } else { return result; - } + } } catch (Exception e) { throw ExceptionHelper.convertQpidExceptionToJMSException(e); } } - - // not public methods - private void requestCredit(int units) + + // not public methods + /** + * Upon receipt of this method, the broker adds "value" + * number of messages to the available credit balance for this consumer. + * + * @param value Number of credits, a value of 0 indicates an infinite amount of credit. + */ + private void requestCredit(int value) { getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, units); + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, value); } - + + /** + * Forces the broker to exhaust its credit supply. + * <p> The broker's credit will always be zero when + * this method completes. + */ private void requestFlush() { getSession().getQpidSession().messageFlush(getMessageActorID()); } - + + /** + * Sync method will block until all outstanding broker + * commands + * are executed. + */ private void requestSync() { getSession().getQpidSession().sync(); } - + + /** + * Check whether this consumer is closed. + * + * @throws JMSException If this consumer is closed. + */ private void checkClosed() throws JMSException { - if(_isStopped) + if (_isStopped) { throw new JMSException("Session is closed"); } } - + /** * Stop the delivery of messages to this consumer. * <p>For asynchronous receiver, this operation blocks until the message listener @@ -428,10 +439,14 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { synchronized (_incomingMessageLock) { - _isStopped = false; + _isStopped = false; } } + /** + * This method notifies this consumer that a message has been delivered + * @param message The received message. + */ public void onMessage(org.apache.qpidity.api.Message message) { try @@ -440,7 +455,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer if (checkPreConditions(jmsMessage)) { preApplicationProcessing(jmsMessage); - + if (_messageListener == null) { _queue.offer(jmsMessage); @@ -453,16 +468,16 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer notifyMessageListener(jmsMessage); } } - } + } catch (Exception e) { throw new RuntimeException(e.getMessage()); } } - - - public void notifyMessageListener(QpidMessage message)throws RuntimeException - { + + + public void notifyMessageListener(QpidMessage message) throws RuntimeException + { try { _messageAsyncrhonouslyReceived++; @@ -471,8 +486,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages resetAsynchMessageReceived(); } - - + // The JMS specs says: /* The result of a listener throwing a RuntimeException depends on the session?s * acknowledgment mode. @@ -484,9 +498,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * * The number of time we try redelivering the message is 0 **/ - try + try { - + _messageListener.onMessage((Message) message); } catch (RuntimeException re) @@ -494,14 +508,19 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // do nothing as this message will not be redelivered } - + } catch (Exception e) { throw new RuntimeException(e.getMessage()); } } - + + /** + * Check whether this consumer is asynchronous + * + * @throws javax.jms.IllegalStateException If this consumer is asynchronous. + */ private void checkIfListenerSet() throws javax.jms.IllegalStateException { @@ -510,8 +529,14 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer throw new javax.jms.IllegalStateException("A listener has already been set."); } } - - private void preApplicationProcessing(QpidMessage message)throws Exception + + /** + * pre process a received message. + * + * @param message The message to pre-process. + * @throws Exception If the message cannot be pre-processed due to some internal error. + */ + private void preApplicationProcessing(QpidMessage message) throws Exception { getSession().preProcessMessage(message); // If the session is transacted we need to ack the message first @@ -522,41 +547,54 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } message.afterMessageReceive(); } - - private boolean checkPreConditions(QpidMessage message)throws QpidException + + /** + * Check whether a message can be delivered to this consumer. + * + * @param message The message to be checked. + * @return true if the message matches the selector and can be acquired, false otherwise. + * @throws QpidException If the message preConditions cannot be checked due to some internal error. + */ + private boolean checkPreConditions(QpidMessage message) throws QpidException { boolean messageOk = true; if (_messageSelector != null) { - messageOk = _filter.matches((Message) message); - if (!messageOk) - { - System.out.println("Message not OK, releasing"); - releaseMessage(message); - return false; - } + messageOk = _filter.matches((Message) message); + } + if (_logger.isDebugEnabled()) + { + _logger.debug("messageOk " + messageOk); + _logger.debug("_preAcquire " + _preAcquire); } - - System.out.println("messageOk " + messageOk); - System.out.println("_preAcquire " + _preAcquire); - if (!messageOk && _preAcquire) { // this is the case for topics // We need to ack this message - System.out.println("filterMessage - trying to ack message"); + if (_logger.isDebugEnabled()) + { + _logger.debug("filterMessage - trying to ack message"); + } acknowledgeMessage(message); - System.out.println("filterMessage - acked message"); + } + else if (!messageOk) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Message not OK, releasing"); + } + releaseMessage(message); } // now we need to acquire this message if needed // this is the case of queue with a message selector set if (!_preAcquire && messageOk) { - System.out.println("filterMessage - trying to acquire message"); + if (_logger.isDebugEnabled()) + { + _logger.debug("filterMessage - trying to acquire message"); + } messageOk = acquireMessage(message); - System.out.println("filterMessage - acquired message"); } - return messageOk; } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java index b9be446db5..ef2590bcee 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java @@ -836,25 +836,10 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage { try { - /* - * messageData.array() throws an UnsupportedOperationException - System.out.println("messageData Array : " +messageData.array().length); - - _dataIn = new DataInputStream( - new ByteArrayInputStream(messageData.array(), messageData.arrayOffset() + messageData.position() - , messageData.remaining())); - */ - - // temp hack - byte[] b = new byte[messageData.limit()]; - messageData.get(b); - _dataIn = new DataInputStream( - new ByteArrayInputStream(b)); - + _dataIn = new DataInputStream(asInputStream()); } catch (Exception e) { - e.printStackTrace(); throw new QpidException("Cannot retrieve data from message ", null, e); } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java index fceb1a300a..9ec74f1dc6 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java @@ -594,10 +594,7 @@ public class MapMessageImpl extends MessageImpl implements MapMessage { try { - ByteArrayInputStream bais = new ByteArrayInputStream(messageData.array(), - messageData.arrayOffset() + messageData.position(), - messageData.remaining()); - ObjectInputStream ois = new ObjectInputStream(bais); + ObjectInputStream ois = new ObjectInputStream(asInputStream()); _map = (Map<String, Object>) ois.readObject(); } catch (IOException ioe) diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java index e1e57d9441..4b843f45f9 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java @@ -23,6 +23,12 @@ import org.apache.qpidity.QpidException; import javax.jms.*; import java.util.Enumeration; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.CoderResult; /** * Implementation of javax.jms.Message @@ -56,7 +62,7 @@ public class MessageImpl extends QpidMessage implements Message /** * Indicate whether the message properties are in writeable status. */ - protected boolean _proertiesReadOnly = false; + protected boolean _propertiesReadOnly = false; /** * The message consumer through which this message was received. @@ -83,7 +89,7 @@ public class MessageImpl extends QpidMessage implements Message { super(message); } - + //---- javax.jms.Message interface /** * Get the message ID. @@ -506,7 +512,7 @@ public class MessageImpl extends QpidMessage implements Message { // The properties can now be written // Properties are read only when the message is received. - _proertiesReadOnly = false; + _propertiesReadOnly = false; super.clearMessageProperties(); } @@ -827,7 +833,7 @@ public class MessageImpl extends QpidMessage implements Message */ public void setObjectProperty(String name, Object value) throws JMSException { - if (_proertiesReadOnly) + if (_propertiesReadOnly) { throw new MessageNotWriteableException("Error the message properties are read only"); } @@ -895,7 +901,7 @@ public class MessageImpl extends QpidMessage implements Message // recreate a destination object for the encoded ReplyTo destination (if it exists) // _replyTo = // todo - _proertiesReadOnly = true; + _propertiesReadOnly = true; _readOnly = true; } @@ -924,4 +930,87 @@ public class MessageImpl extends QpidMessage implements Message _messageConsumer = messageConsumer; } + /** + * Returns an {@link java.io.InputStream} that reads the data from this mesage buffer. + * {@link java.io.InputStream#read()} returns <tt>-1</tt> if the buffer position + * reaches to the limit. + * + * @return An {@link java.io.InputStream} that reads the data from this mesage buffer. + */ + public InputStream asInputStream() + { + return new InputStream() + { + @Override + public int available() + { + return getMessageData().remaining(); + } + + @Override + public synchronized void mark(int readlimit) + { + getMessageData().mark(); + } + + @Override + public boolean markSupported() + { + return true; + } + + @Override + public int read() + { + if (getMessageData().hasRemaining()) + { + return getMessageData().get() & 0xff; + } + else + { + return -1; + } + } + + @Override + public int read(byte[] b, int off, int len) + { + int remaining = getMessageData().remaining(); + if (remaining > 0) + { + int readBytes = Math.min(remaining, len); + getMessageData().get(b, off, readBytes); + return readBytes; + } + else + { + return -1; + } + } + + @Override + public synchronized void reset() + { + getMessageData().reset(); + } + + @Override + public long skip(long n) + { + int bytes; + if (n > Integer.MAX_VALUE) + { + bytes = getMessageData().remaining(); + } + else + { + bytes = Math.min(getMessageData().remaining(), (int) n); + } + getMessageData().position(getMessageData().position() + bytes); + return bytes; + } + }; + } + + } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java index 45b7ac8c96..8f31fd228a 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java @@ -158,10 +158,7 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage { try { - ByteArrayInputStream bais = new ByteArrayInputStream(messageData.array(), - messageData.arrayOffset() + messageData.position(), - messageData.remaining()); - ObjectInputStream ois = new ObjectInputStream(bais); + ObjectInputStream ois = new ObjectInputStream(asInputStream()); _object = (Serializable) ois.readObject(); } catch (IOException ioe) diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java index 647d1fe5b5..199012c680 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java @@ -22,6 +22,8 @@ import org.apache.qpidity.QpidException; import javax.jms.TextMessage; import javax.jms.JMSException; import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.*; import java.io.UnsupportedEncodingException; /** @@ -119,10 +121,9 @@ public class TextMessageImpl extends MessageImpl implements TextMessage { try { - _messageText = new String(messageData.array(), messageData.arrayOffset() + messageData.position(), - messageData.remaining(), CHARACTER_ENCODING); + _messageText = getString(); } - catch (UnsupportedEncodingException e) + catch (Exception e) { throw new QpidException("Problem when decoding text", null, e); } @@ -143,5 +144,185 @@ public class TextMessageImpl extends MessageImpl implements TextMessage super.clearBody(); _messageText = null; } + + /** + * This method is taken from Mina code + * + * Reads a <code>NUL</code>-terminated string from this buffer using the + * specified <code>decoder</code> and returns it. This method reads + * until the limit of this buffer if no <tt>NUL</tt> is found. + * + * @return + * @throws java.nio.charset.CharacterCodingException + * + */ + public String getString() throws CharacterCodingException + { + if (!getMessageData().hasRemaining()) + { + return ""; + } + Charset charset = Charset.forName(CHARACTER_ENCODING); + CharsetDecoder decoder = charset.newDecoder(); + + boolean utf16 = decoder.charset().name().startsWith("UTF-16"); + + int oldPos = getMessageData().position(); + int oldLimit = getMessageData().limit(); + int end = -1; + int newPos; + + if (!utf16) + { + end = indexOf((byte) 0x00); + if (end < 0) + { + newPos = end = oldLimit; + } + else + { + newPos = end + 1; + } + } + else + { + int i = oldPos; + for (; ;) + { + boolean wasZero = getMessageData().get(i) == 0; + i++; + + if (i >= oldLimit) + { + break; + } + + if (getMessageData().get(i) != 0) + { + i++; + if (i >= oldLimit) + { + break; + } + else + { + continue; + } + } + + if (wasZero) + { + end = i - 1; + break; + } + } + + if (end < 0) + { + newPos = end = oldPos + ((oldLimit - oldPos) & 0xFFFFFFFE); + } + else + { + if (end + 2 <= oldLimit) + { + newPos = end + 2; + } + else + { + newPos = end; + } + } + } + + if (oldPos == end) + { + getMessageData().position(newPos); + return ""; + } + + getMessageData().limit(end); + decoder.reset(); + + int expectedLength = (int) (getMessageData().remaining() * decoder.averageCharsPerByte()) + 1; + CharBuffer out = CharBuffer.allocate(expectedLength); + for (; ;) + { + CoderResult cr; + if (getMessageData().hasRemaining()) + { + cr = decoder.decode(getMessageData(), out, true); + } + else + { + cr = decoder.flush(out); + } + + if (cr.isUnderflow()) + { + break; + } + + if (cr.isOverflow()) + { + CharBuffer o = CharBuffer.allocate(out.capacity() + expectedLength); + out.flip(); + o.put(out); + out = o; + continue; + } + + if (cr.isError()) + { + // Revert the buffer back to the previous state. + getMessageData().limit(oldLimit); + getMessageData().position(oldPos); + cr.throwException(); + } + } + + getMessageData().limit(oldLimit); + getMessageData().position(newPos); + return out.flip().toString(); + } + + /** + * Returns the first occurence position of the specified byte from the current position to + * the current limit. + * + * @return <tt>-1</tt> if the specified byte is not found + * @param b + */ + public int indexOf(byte b) + { + if (getMessageData().hasArray()) + { + int arrayOffset = getMessageData().arrayOffset(); + int beginPos = arrayOffset + getMessageData().position(); + int limit = arrayOffset + getMessageData().limit(); + byte[] array = getMessageData().array(); + + for (int i = beginPos; i < limit; i++) + { + if (array[i] == b) + { + return i - arrayOffset; + } + } + } + else + { + int beginPos = getMessageData().position(); + int limit = getMessageData().limit(); + + for (int i = beginPos; i < limit; i++) + { + if (getMessageData().get(i) == b) + { + return i; + } + } + } + return -1; + } } |
