diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2006-09-27 09:09:42 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2006-09-27 09:09:42 +0000 |
| commit | 1ba25d9231401f2f34ee41893d402e3cb2f299ed (patch) | |
| tree | f0effe6896595dd38c0bbc60350b522c5416f480 /java/client | |
| parent | 6ce702dfb4ea0e1835804efd328be2eee79e23b3 (diff) | |
| download | qpid-python-1ba25d9231401f2f34ee41893d402e3cb2f299ed.tar.gz | |
AMQProtocolSession.java - white space changes
BasicMessageProducer.java - white space changes
BasicMessageConsumer.java - white space changes
AMQSession.java - added a comment
MemoryMessageStore.java - white space changes
SubscriptionImpl.java AMQChannel.java - Removed race condition where two messages could get the same delivery tag and when using acks where messages can be added to the UnackMap out of sequence, Causing unknown message to ack exceptions.
DestNameExchange.java - white space/style changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450384 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
4 files changed, 23 insertions, 21 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java index 2c59e5f809..4768399036 100644 --- a/java/client/src/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/org/apache/qpid/client/AMQSession.java @@ -402,6 +402,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //need to send ack for messages delivered to consumers so far for(Iterator i = _consumers.values().iterator(); i.hasNext();) { + //Sends acknowledgement to server ((BasicMessageConsumer) i.next()).acknowledgeLastDelivered(); } diff --git a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java index 5d13a1cd41..b46c5f111d 100644 --- a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java @@ -382,9 +382,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag, - messageFrame.deliverBody.redelivered, - messageFrame.contentHeader, - messageFrame.bodies); + messageFrame.deliverBody.redelivered, + messageFrame.contentHeader, + messageFrame.bodies); _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); diff --git a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java index a6bc7a0781..694a4a7863 100644 --- a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java @@ -17,13 +17,13 @@ */ package org.apache.qpid.client; +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -122,10 +122,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j void resubscribe() throws AMQException { - if (_destination != null) - { - declareDestination(_destination); - } + if (_destination != null) + { + declareDestination(_destination); + } } private void declareDestination(AMQDestination destination) @@ -330,16 +330,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if (!(destination instanceof AMQDestination)) { throw new JMSException("Unsupported destination class: " + - (destination != null?destination.getClass():null)); + (destination != null ? destination.getClass() : null)); } declareDestination((AMQDestination)destination); } protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate) throws JMSException + long timeToLive, boolean mandatory, boolean immediate) throws JMSException { sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent); } + /** * The caller of this method must hold the failover mutex. * @param destination @@ -352,7 +353,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * @throws JMSException */ protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException + long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException { AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), destination.getRoutingKey(), mandatory, immediate); @@ -366,10 +367,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // // Very nasty temporary hack for GRM-206. Will be altered ASAP. // - if(message instanceof JMSBytesMessage) + if (message instanceof JMSBytesMessage) { JMSBytesMessage msg = (JMSBytesMessage) message; - if(!msg.isReadable()) + if (!msg.isReadable()) { msg.reset(); } @@ -442,7 +443,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int dataLength = payload.remaining(); final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0; - int frameCount = (int) (dataLength/framePayloadMax) + lastFrame; + int frameCount = (int) (dataLength / framePayloadMax) + lastFrame; final ContentBody[] bodies = new ContentBody[frameCount]; if (frameCount == 1) diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java index 77685a0222..b181490fdd 100644 --- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -230,7 +230,7 @@ public class AMQProtocolSession implements ProtocolVersionList } if (msg.isAllBodyDataReceived()) { - deliverMessageToAMQSession(channelId, msg); + deliverMessageToAMQSession(channelId, msg); } } @@ -260,8 +260,8 @@ public class AMQProtocolSession implements ProtocolVersionList public void writeFrame(AMQDataBlock frame, boolean wait) { - WriteFuture f =_minaProtocolSession.write(frame); - if(wait) + WriteFuture f = _minaProtocolSession.write(frame); + if (wait) { f.join(); } @@ -269,7 +269,7 @@ public class AMQProtocolSession implements ProtocolVersionList public void addSessionByChannel(int channelId, AMQSession session) { - if (channelId <=0) + if (channelId <= 0) { throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero"); } @@ -283,7 +283,7 @@ public class AMQProtocolSession implements ProtocolVersionList public void removeSessionByChannel(int channelId) { - if (channelId <=0) + if (channelId <= 0) { throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero"); } @@ -299,7 +299,7 @@ public class AMQProtocolSession implements ProtocolVersionList { _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); final int channelId = session.getChannelId(); - if (channelId <=0) + if (channelId <= 0) { throw new IllegalArgumentException("Attempt to close a channel with id < 0"); } |
