diff options
Diffstat (limited to 'java/client')
4 files changed, 23 insertions, 21 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java index 2c59e5f809..4768399036 100644 --- a/java/client/src/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/org/apache/qpid/client/AMQSession.java @@ -402,6 +402,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //need to send ack for messages delivered to consumers so far for(Iterator i = _consumers.values().iterator(); i.hasNext();) { + //Sends acknowledgement to server ((BasicMessageConsumer) i.next()).acknowledgeLastDelivered(); } diff --git a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java index 5d13a1cd41..b46c5f111d 100644 --- a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java @@ -382,9 +382,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag, - messageFrame.deliverBody.redelivered, - messageFrame.contentHeader, - messageFrame.bodies); + messageFrame.deliverBody.redelivered, + messageFrame.contentHeader, + messageFrame.bodies); _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); diff --git a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java index a6bc7a0781..694a4a7863 100644 --- a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java @@ -17,13 +17,13 @@ */ package org.apache.qpid.client; +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSBytesMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -122,10 +122,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j void resubscribe() throws AMQException { - if (_destination != null) - { - declareDestination(_destination); - } + if (_destination != null) + { + declareDestination(_destination); + } } private void declareDestination(AMQDestination destination) @@ -330,16 +330,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j if (!(destination instanceof AMQDestination)) { throw new JMSException("Unsupported destination class: " + - (destination != null?destination.getClass():null)); + (destination != null ? destination.getClass() : null)); } declareDestination((AMQDestination)destination); } protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate) throws JMSException + long timeToLive, boolean mandatory, boolean immediate) throws JMSException { sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent); } + /** * The caller of this method must hold the failover mutex. * @param destination @@ -352,7 +353,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * @throws JMSException */ protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException + long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException { AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(), destination.getRoutingKey(), mandatory, immediate); @@ -366,10 +367,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j // // Very nasty temporary hack for GRM-206. Will be altered ASAP. // - if(message instanceof JMSBytesMessage) + if (message instanceof JMSBytesMessage) { JMSBytesMessage msg = (JMSBytesMessage) message; - if(!msg.isReadable()) + if (!msg.isReadable()) { msg.reset(); } @@ -442,7 +443,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j int dataLength = payload.remaining(); final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0; - int frameCount = (int) (dataLength/framePayloadMax) + lastFrame; + int frameCount = (int) (dataLength / framePayloadMax) + lastFrame; final ContentBody[] bodies = new ContentBody[frameCount]; if (frameCount == 1) diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java index 77685a0222..b181490fdd 100644 --- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -230,7 +230,7 @@ public class AMQProtocolSession implements ProtocolVersionList } if (msg.isAllBodyDataReceived()) { - deliverMessageToAMQSession(channelId, msg); + deliverMessageToAMQSession(channelId, msg); } } @@ -260,8 +260,8 @@ public class AMQProtocolSession implements ProtocolVersionList public void writeFrame(AMQDataBlock frame, boolean wait) { - WriteFuture f =_minaProtocolSession.write(frame); - if(wait) + WriteFuture f = _minaProtocolSession.write(frame); + if (wait) { f.join(); } @@ -269,7 +269,7 @@ public class AMQProtocolSession implements ProtocolVersionList public void addSessionByChannel(int channelId, AMQSession session) { - if (channelId <=0) + if (channelId <= 0) { throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero"); } @@ -283,7 +283,7 @@ public class AMQProtocolSession implements ProtocolVersionList public void removeSessionByChannel(int channelId) { - if (channelId <=0) + if (channelId <= 0) { throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero"); } @@ -299,7 +299,7 @@ public class AMQProtocolSession implements ProtocolVersionList { _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); final int channelId = session.getChannelId(); - if (channelId <=0) + if (channelId <= 0) { throw new IllegalArgumentException("Attempt to close a channel with id < 0"); } |
