summaryrefslogtreecommitdiff
path: root/java/broker/src/main
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-02-20 20:22:14 +0000
committerKim van der Riet <kpvdr@apache.org>2007-02-20 20:22:14 +0000
commitbae5b4dac83c2cc28badf10f2fde659066ec27fe (patch)
tree3b67473acaee2cd4c7348d9a7453320b64fff9cf /java/broker/src/main
parent06a8d7b5dbbacf9eaff5bb1a788aa48a06df8b8e (diff)
downloadqpid-python-bae5b4dac83c2cc28badf10f2fde659066ec27fe.tar.gz
Fixed the various Ref modes so that the new MessageRefTest passes all tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java39
1 files changed, 27 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 3ab20e74bf..3f0fb26a65 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -140,11 +140,11 @@ public class AMQChannel
private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
private Set<Long> _browsedAcks = new HashSet<Long>();
-
+
/**
* Used in creating unique references.
*/
- private byte _refCounter;
+ private static AtomicLong _refIdCounter = new AtomicLong();
// XXX: clean up arguments
public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener)
@@ -290,32 +290,44 @@ public class AMQChannel
public void addMessageOpen(MessageOpenBody open) throws AMQException
{
- try {
+ try
+ {
createReference(open.reference);
- } catch (IllegalArgumentException e) {
+ }
+ catch (IllegalArgumentException e)
+ {
throw open.getConnectionException(503, "Reference is already open");
}
}
public void addMessageAppend(MessageAppendBody append) throws AMQException
{
- try {
+ try
+ {
AMQReference ref = getReference(append.reference);
- ref.appendContent(ByteBuffer.wrap(append.bytes));
- } catch (IllegalArgumentException e) {
+ if (append.bytes != null) // sending an empty string results in a null
+ {
+ ref.appendContent(ByteBuffer.wrap(append.bytes));
+ }
+ }
+ catch (IllegalArgumentException e)
+ {
throw append.getConnectionException(503, "Reference is not open");
}
}
public void addMessageClose(MessageCloseBody close) throws AMQException
{
- try {
+ try
+ {
AMQReference ref = removeReference(close.reference);
for (AMQMessage msg : ref.getMessageList())
{
routeCurrentMessage(msg);
}
- } catch (IllegalArgumentException e) {
+ }
+ catch (IllegalArgumentException e)
+ {
throw close.getConnectionException(503, "Reference is not open");
}
}
@@ -392,8 +404,11 @@ public class AMQChannel
_session.writeRequest(_channelId, mtb, listener);
}
- private synchronized byte[] nextRefId() {
- return new byte[]{_refCounter++};
+ private synchronized byte[] nextRefId()
+ {
+ // clumsy
+ return String.valueOf(_refIdCounter.incrementAndGet()).getBytes();
+ //return new byte[]{_refIdCounter.getAndIncrement()};
}
public void deliverRef(AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
@@ -471,7 +486,7 @@ public class AMQChannel
{
throw new ConsumerTagNotUniqueException();
}
-
+ acks = acks;
queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive);
_consumerTag2QueueMap.put(tag, queue);
return tag;