summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/org/apache/qpid/client/AMQSession.java1
-rw-r--r--java/client/src/org/apache/qpid/client/BasicMessageConsumer.java6
-rw-r--r--java/client/src/org/apache/qpid/client/BasicMessageProducer.java25
-rw-r--r--java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java12
4 files changed, 23 insertions, 21 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java
index 2c59e5f809..4768399036 100644
--- a/java/client/src/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/org/apache/qpid/client/AMQSession.java
@@ -402,6 +402,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//need to send ack for messages delivered to consumers so far
for(Iterator i = _consumers.values().iterator(); i.hasNext();)
{
+ //Sends acknowledgement to server
((BasicMessageConsumer) i.next()).acknowledgeLastDelivered();
}
diff --git a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
index 5d13a1cd41..b46c5f111d 100644
--- a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
@@ -382,9 +382,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag,
- messageFrame.deliverBody.redelivered,
- messageFrame.contentHeader,
- messageFrame.bodies);
+ messageFrame.deliverBody.redelivered,
+ messageFrame.contentHeader,
+ messageFrame.bodies);
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
diff --git a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
index a6bc7a0781..694a4a7863 100644
--- a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
@@ -17,13 +17,13 @@
*/
package org.apache.qpid.client;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -122,10 +122,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
void resubscribe() throws AMQException
{
- if (_destination != null)
- {
- declareDestination(_destination);
- }
+ if (_destination != null)
+ {
+ declareDestination(_destination);
+ }
}
private void declareDestination(AMQDestination destination)
@@ -330,16 +330,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
if (!(destination instanceof AMQDestination))
{
throw new JMSException("Unsupported destination class: " +
- (destination != null?destination.getClass():null));
+ (destination != null ? destination.getClass() : null));
}
declareDestination((AMQDestination)destination);
}
protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate) throws JMSException
+ long timeToLive, boolean mandatory, boolean immediate) throws JMSException
{
sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
}
+
/**
* The caller of this method must hold the failover mutex.
* @param destination
@@ -352,7 +353,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
destination.getRoutingKey(), mandatory, immediate);
@@ -366,10 +367,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
//
// Very nasty temporary hack for GRM-206. Will be altered ASAP.
//
- if(message instanceof JMSBytesMessage)
+ if (message instanceof JMSBytesMessage)
{
JMSBytesMessage msg = (JMSBytesMessage) message;
- if(!msg.isReadable())
+ if (!msg.isReadable())
{
msg.reset();
}
@@ -442,7 +443,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
int dataLength = payload.remaining();
final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
- int frameCount = (int) (dataLength/framePayloadMax) + lastFrame;
+ int frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
final ContentBody[] bodies = new ContentBody[frameCount];
if (frameCount == 1)
diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 77685a0222..b181490fdd 100644
--- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -230,7 +230,7 @@ public class AMQProtocolSession implements ProtocolVersionList
}
if (msg.isAllBodyDataReceived())
{
- deliverMessageToAMQSession(channelId, msg);
+ deliverMessageToAMQSession(channelId, msg);
}
}
@@ -260,8 +260,8 @@ public class AMQProtocolSession implements ProtocolVersionList
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- WriteFuture f =_minaProtocolSession.write(frame);
- if(wait)
+ WriteFuture f = _minaProtocolSession.write(frame);
+ if (wait)
{
f.join();
}
@@ -269,7 +269,7 @@ public class AMQProtocolSession implements ProtocolVersionList
public void addSessionByChannel(int channelId, AMQSession session)
{
- if (channelId <=0)
+ if (channelId <= 0)
{
throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero");
}
@@ -283,7 +283,7 @@ public class AMQProtocolSession implements ProtocolVersionList
public void removeSessionByChannel(int channelId)
{
- if (channelId <=0)
+ if (channelId <= 0)
{
throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero");
}
@@ -299,7 +299,7 @@ public class AMQProtocolSession implements ProtocolVersionList
{
_logger.debug("closeSession called on protocol session for session " + session.getChannelId());
final int channelId = session.getChannelId();
- if (channelId <=0)
+ if (channelId <= 0)
{
throw new IllegalArgumentException("Attempt to close a channel with id < 0");
}