summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-09-27 09:09:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-09-27 09:09:42 +0000
commit1ba25d9231401f2f34ee41893d402e3cb2f299ed (patch)
treef0effe6896595dd38c0bbc60350b522c5416f480 /java/client
parent6ce702dfb4ea0e1835804efd328be2eee79e23b3 (diff)
downloadqpid-python-1ba25d9231401f2f34ee41893d402e3cb2f299ed.tar.gz
AMQProtocolSession.java - white space changes
BasicMessageProducer.java - white space changes BasicMessageConsumer.java - white space changes AMQSession.java - added a comment MemoryMessageStore.java - white space changes SubscriptionImpl.java AMQChannel.java - Removed race condition where two messages could get the same delivery tag and when using acks where messages can be added to the UnackMap out of sequence, Causing unknown message to ack exceptions. DestNameExchange.java - white space/style changes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450384 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/org/apache/qpid/client/AMQSession.java1
-rw-r--r--java/client/src/org/apache/qpid/client/BasicMessageConsumer.java6
-rw-r--r--java/client/src/org/apache/qpid/client/BasicMessageProducer.java25
-rw-r--r--java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java12
4 files changed, 23 insertions, 21 deletions
diff --git a/java/client/src/org/apache/qpid/client/AMQSession.java b/java/client/src/org/apache/qpid/client/AMQSession.java
index 2c59e5f809..4768399036 100644
--- a/java/client/src/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/org/apache/qpid/client/AMQSession.java
@@ -402,6 +402,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//need to send ack for messages delivered to consumers so far
for(Iterator i = _consumers.values().iterator(); i.hasNext();)
{
+ //Sends acknowledgement to server
((BasicMessageConsumer) i.next()).acknowledgeLastDelivered();
}
diff --git a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
index 5d13a1cd41..b46c5f111d 100644
--- a/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/org/apache/qpid/client/BasicMessageConsumer.java
@@ -382,9 +382,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag,
- messageFrame.deliverBody.redelivered,
- messageFrame.contentHeader,
- messageFrame.bodies);
+ messageFrame.deliverBody.redelivered,
+ messageFrame.contentHeader,
+ messageFrame.bodies);
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
diff --git a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
index a6bc7a0781..694a4a7863 100644
--- a/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/org/apache/qpid/client/BasicMessageProducer.java
@@ -17,13 +17,13 @@
*/
package org.apache.qpid.client;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -122,10 +122,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
void resubscribe() throws AMQException
{
- if (_destination != null)
- {
- declareDestination(_destination);
- }
+ if (_destination != null)
+ {
+ declareDestination(_destination);
+ }
}
private void declareDestination(AMQDestination destination)
@@ -330,16 +330,17 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
if (!(destination instanceof AMQDestination))
{
throw new JMSException("Unsupported destination class: " +
- (destination != null?destination.getClass():null));
+ (destination != null ? destination.getClass() : null));
}
declareDestination((AMQDestination)destination);
}
protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate) throws JMSException
+ long timeToLive, boolean mandatory, boolean immediate) throws JMSException
{
sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
}
+
/**
* The caller of this method must hold the failover mutex.
* @param destination
@@ -352,7 +353,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
destination.getRoutingKey(), mandatory, immediate);
@@ -366,10 +367,10 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
//
// Very nasty temporary hack for GRM-206. Will be altered ASAP.
//
- if(message instanceof JMSBytesMessage)
+ if (message instanceof JMSBytesMessage)
{
JMSBytesMessage msg = (JMSBytesMessage) message;
- if(!msg.isReadable())
+ if (!msg.isReadable())
{
msg.reset();
}
@@ -442,7 +443,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
int dataLength = payload.remaining();
final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
- int frameCount = (int) (dataLength/framePayloadMax) + lastFrame;
+ int frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
final ContentBody[] bodies = new ContentBody[frameCount];
if (frameCount == 1)
diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 77685a0222..b181490fdd 100644
--- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -230,7 +230,7 @@ public class AMQProtocolSession implements ProtocolVersionList
}
if (msg.isAllBodyDataReceived())
{
- deliverMessageToAMQSession(channelId, msg);
+ deliverMessageToAMQSession(channelId, msg);
}
}
@@ -260,8 +260,8 @@ public class AMQProtocolSession implements ProtocolVersionList
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- WriteFuture f =_minaProtocolSession.write(frame);
- if(wait)
+ WriteFuture f = _minaProtocolSession.write(frame);
+ if (wait)
{
f.join();
}
@@ -269,7 +269,7 @@ public class AMQProtocolSession implements ProtocolVersionList
public void addSessionByChannel(int channelId, AMQSession session)
{
- if (channelId <=0)
+ if (channelId <= 0)
{
throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero");
}
@@ -283,7 +283,7 @@ public class AMQProtocolSession implements ProtocolVersionList
public void removeSessionByChannel(int channelId)
{
- if (channelId <=0)
+ if (channelId <= 0)
{
throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero");
}
@@ -299,7 +299,7 @@ public class AMQProtocolSession implements ProtocolVersionList
{
_logger.debug("closeSession called on protocol session for session " + session.getChannelId());
final int channelId = session.getChannelId();
- if (channelId <=0)
+ if (channelId <= 0)
{
throw new IllegalArgumentException("Attempt to close a channel with id < 0");
}