diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-02-20 20:22:14 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-02-20 20:22:14 +0000 |
| commit | bae5b4dac83c2cc28badf10f2fde659066ec27fe (patch) | |
| tree | 3b67473acaee2cd4c7348d9a7453320b64fff9cf /java/broker/src/main | |
| parent | 06a8d7b5dbbacf9eaff5bb1a788aa48a06df8b8e (diff) | |
| download | qpid-python-bae5b4dac83c2cc28badf10f2fde659066ec27fe.tar.gz | |
Fixed the various Ref modes so that the new MessageRefTest passes all tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 39 |
1 files changed, 27 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3ab20e74bf..3f0fb26a65 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -140,11 +140,11 @@ public class AMQChannel private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory(); private Set<Long> _browsedAcks = new HashSet<Long>(); - + /** * Used in creating unique references. */ - private byte _refCounter; + private static AtomicLong _refIdCounter = new AtomicLong(); // XXX: clean up arguments public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener) @@ -290,32 +290,44 @@ public class AMQChannel public void addMessageOpen(MessageOpenBody open) throws AMQException { - try { + try + { createReference(open.reference); - } catch (IllegalArgumentException e) { + } + catch (IllegalArgumentException e) + { throw open.getConnectionException(503, "Reference is already open"); } } public void addMessageAppend(MessageAppendBody append) throws AMQException { - try { + try + { AMQReference ref = getReference(append.reference); - ref.appendContent(ByteBuffer.wrap(append.bytes)); - } catch (IllegalArgumentException e) { + if (append.bytes != null) // sending an empty string results in a null + { + ref.appendContent(ByteBuffer.wrap(append.bytes)); + } + } + catch (IllegalArgumentException e) + { throw append.getConnectionException(503, "Reference is not open"); } } public void addMessageClose(MessageCloseBody close) throws AMQException { - try { + try + { AMQReference ref = removeReference(close.reference); for (AMQMessage msg : ref.getMessageList()) { routeCurrentMessage(msg); } - } catch (IllegalArgumentException e) { + } + catch (IllegalArgumentException e) + { throw close.getConnectionException(503, "Reference is not open"); } } @@ -392,8 +404,11 @@ public class AMQChannel _session.writeRequest(_channelId, mtb, listener); } - private synchronized byte[] nextRefId() { - return new byte[]{_refCounter++}; + private synchronized byte[] nextRefId() + { + // clumsy + return String.valueOf(_refIdCounter.incrementAndGet()).getBytes(); + //return new byte[]{_refIdCounter.getAndIncrement()}; } public void deliverRef(AMQMessage msg, AMQShortString destination, AMQMethodListener listener) @@ -471,7 +486,7 @@ public class AMQChannel { throw new ConsumerTagNotUniqueException(); } - + acks = acks; queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); _consumerTag2QueueMap.put(tag, queue); return tag; |
