From edfebbfec207f6b4c93933330c4792a8e8798eb7 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 20 Feb 2007 15:52:04 +0000 Subject: Some fixes to get more python tests passing. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509616 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 179 ++++++++++----------- .../handler/ConnectionOpenMethodHandler.java | 2 +- .../qpid/server/handler/MessageQosHandler.java | 5 +- .../server/handler/MessageTransferHandler.java | 8 +- .../server/protocol/AMQMinaProtocolSession.java | 2 - .../server/protocol/AMQPFastProtocolHandler.java | 1 + .../org/apache/qpid/server/queue/AMQMessage.java | 40 +++-- .../org/apache/qpid/server/queue/AMQQueue.java | 19 --- .../queue/ConcurrentSelectorDeliveryManager.java | 1 - 9 files changed, 119 insertions(+), 138 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index b2d4215bd0..3ab20e74bf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -141,6 +141,11 @@ public class AMQChannel private Set _browsedAcks = new HashSet(); + /** + * Used in creating unique references. + */ + private byte _refCounter; + // XXX: clean up arguments public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener) { @@ -218,7 +223,7 @@ public class AMQChannel _prefetch_HighWaterMark = prefetchCount; } - public void addMessageTransfer(MessageTransferBody transferBody, AMQProtocolSession publisher) throws AMQException + public void addMessageTransfer(MessageTransferBody transferBody, long requestId, AMQProtocolSession publisher) throws AMQException { Content body = transferBody.getBody(); AMQMessage message; @@ -226,14 +231,20 @@ public class AMQChannel case INLINE_T: message = new AMQMessage(_messageStore, transferBody, Collections.singletonList(body.getContent()), _txnContext); message.setPublisher(publisher); + message.setRequestId(requestId); routeCurrentMessage(message); - message.routingComplete(_messageStore, _storeContext, _messageHandleFactory); break; case REF_T: - AMQReference ref = getReference(body.getContentAsByteArray()); - message = new AMQMessage(_messageStore, transferBody, ref.getContentList(), _txnContext); - message.setPublisher(publisher); - ref.addRefTransferBody(message); + try { + AMQReference ref = getReference(body.getContentAsByteArray()); + message = new AMQMessage(_messageStore, transferBody, ref.getContentList(), _txnContext); + message.setPublisher(publisher); + message.setRequestId(requestId); + ref.addRefTransferBody(message); + } catch (IllegalArgumentException e) { + throw transferBody.getConnectionException(503, "Reference is not open"); + } + break; } } @@ -277,24 +288,35 @@ public class AMQChannel return ref; } - public void addMessageOpen(MessageOpenBody open) + public void addMessageOpen(MessageOpenBody open) throws AMQException { - createReference(open.reference); + try { + createReference(open.reference); + } catch (IllegalArgumentException e) { + throw open.getConnectionException(503, "Reference is already open"); + } } - public void addMessageAppend(MessageAppendBody append) + public void addMessageAppend(MessageAppendBody append) throws AMQException { - AMQReference ref = getReference(append.reference); - ref.appendContent(ByteBuffer.wrap(append.bytes)); + try { + AMQReference ref = getReference(append.reference); + ref.appendContent(ByteBuffer.wrap(append.bytes)); + } catch (IllegalArgumentException e) { + throw append.getConnectionException(503, "Reference is not open"); + } } public void addMessageClose(MessageCloseBody close) throws AMQException { - AMQReference ref = removeReference(close.reference); - for (AMQMessage msg : ref.getMessageList()) - { - routeCurrentMessage(msg); - msg.routingComplete(_messageStore, _storeContext, _messageHandleFactory); + try { + AMQReference ref = removeReference(close.reference); + for (AMQMessage msg : ref.getMessageList()) + { + routeCurrentMessage(msg); + } + } catch (IllegalArgumentException e) { + throw close.getConnectionException(503, "Reference is not open"); } } @@ -308,38 +330,18 @@ public class AMQChannel { _returnMessages.add(e); } + msg.routingComplete(_messageStore, _storeContext, _messageHandleFactory); + + MessageOkBody ok = MessageOkBody.createMethodBody( + _session.getProtocolMajorVersion(), + _session.getProtocolMinorVersion() + ); + _session.writeResponse(_channelId, msg.getRequestId(), ok); } public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag) { - // Do we need to refactor the content for a different frame size? - long maxFrameSize = _session.getFrameMax(); - Iterable contentItr = msg.getContents(); - if (msg.getSize() > maxFrameSize) - { - Iterator cItr = contentItr.iterator(); - if (cItr.next().limit() > maxFrameSize) // First chunk should equal incoming frame size - { - // TODO - Refactor the chunks for smaller outbound frame size - throw new Error("XXX TODO - need to refactor content chunks here"); - // deliverRef(msg, destination, deliveryTag); - } - else - { - // Use ref content as is - no need to refactor - deliverRef(msg, destination, deliveryTag); - } - } - else - { - // Concatenate - all incoming chunks will fit into single outbound frame - deliverInline(msg, destination, deliveryTag); - } - } - - public void deliverInline(AMQMessage msg, AMQShortString destination, final long deliveryTag) - { - deliverInline(msg, destination, new AMQMethodListener() + AMQMethodListener listener = new AMQMethodListener() { public boolean methodReceived(AMQMethodEvent evt) throws AMQException { @@ -361,9 +363,20 @@ public class AMQChannel } } public void error(Exception e) {} - }); + }; + long maxFrameSize = _session.getFrameMax(); + if (msg.getFullSize() > maxFrameSize) + { + //need to send as reference + deliverRef(msg, destination, listener); + } + else + { + //message will fit inline + deliverInline(msg, destination, listener); + } } - + public void deliverInline(AMQMessage msg, AMQShortString destination, AMQMethodListener listener) { MessageTransferBody mtb = msg.getTransferBody().copy(); @@ -378,64 +391,37 @@ public class AMQChannel mtb.body = new Content(Content.TypeEnum.INLINE_T, buf); _session.writeRequest(_channelId, mtb, listener); } - - public void deliverRef(final AMQMessage msg, final AMQShortString destination, final long deliveryTag) - { - final byte[] refId = String.valueOf(System.currentTimeMillis()).getBytes(); - deliverRef(refId, msg, destination, new AMQMethodListener() - { - public boolean methodReceived(AMQMethodEvent evt) throws AMQException - { - AMQMethodBody method = evt.getMethod(); - if (_log.isDebugEnabled()) - { - _log.debug(method + " received on channel " + _channelId); - } - // XXX: multiple? - if (method instanceof MessageOkBody) - { - acknowledgeMessage(deliveryTag, false); - return true; - } - else - { - // TODO: implement reject - return false; - } - } - public void error(Exception e) {} - }); + + private synchronized byte[] nextRefId() { + return new byte[]{_refCounter++}; } - public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString destination, AMQMethodListener listener) + public void deliverRef(AMQMessage msg, AMQShortString destination, AMQMethodListener listener) { - AMQMethodBody openBody = MessageOpenBody.createMethodBody( - _session.getProtocolMajorVersion(), // AMQP major version - _session.getProtocolMinorVersion(), // AMQP minor version - refId); - _session.writeRequest(_channelId, openBody, listener); + AMQMethodListener dummy = new AMQMethodListener() + { + public boolean methodReceived(AMQMethodEvent evt){ return true; } + public void error(Exception e) {} + }; + byte major = _session.getProtocolMajorVersion(); + byte minor = _session.getProtocolMinorVersion(); + byte[] refId = nextRefId(); + _session.writeRequest(_channelId, MessageOpenBody.createMethodBody(major, minor, refId), dummy); MessageTransferBody mtb = msg.getTransferBody().copy(); mtb.destination = destination; - mtb.redelivered = msg.isRedelivered(); mtb.body = new Content(Content.TypeEnum.REF_T, refId); _session.writeRequest(_channelId, mtb, listener); - for (ByteBuffer bb : msg.getContents()) - { - ByteBuffer dup = bb.duplicate(); - byte[] ba = new byte[dup.limit()]; - dup.get(ba); - AMQMethodBody appendBody = MessageAppendBody.createMethodBody( - _session.getProtocolMajorVersion(), // AMQP major version - _session.getProtocolMinorVersion(), // AMQP minor version - ba, - refId); - _session.writeRequest(_channelId, appendBody, listener); + for (ByteBuffer buffer : msg.getContents()) + { + //TODO: try and avoid all this copying! + while (buffer.remaining() > 0) + { + byte[] data = new byte[Math.min((int) _session.getFrameMax(), buffer.remaining())]; + buffer.get(data); + _session.writeRequest(_channelId, MessageAppendBody.createMethodBody(major, minor, data, refId), dummy); + } } - AMQMethodBody closeBody = MessageCloseBody.createMethodBody( - _session.getProtocolMajorVersion(), // AMQP major version - _session.getProtocolMinorVersion(), // AMQP minor version - refId); - _session.writeRequest(_channelId, closeBody, listener); + _session.writeRequest(_channelId, MessageCloseBody.createMethodBody(major, minor, refId), dummy); } public RequestManager getRequestManager() @@ -554,6 +540,7 @@ public class AMQChannel for (UnacknowledgedMessage unacked : messagesToBeDelivered) { + unacked.message.setRedelivered(true); if (unacked.queue != null) { _txnContext.deliver(unacked.message, unacked.queue); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index a29552de03..2cf69bf27d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -66,7 +66,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); + AMQChannel channel = session.getChannel(evt.getChannelId()); + channel.setPrefetchCount(evt.getMethod().prefetchCount); + channel.setPrefetchSize(evt.getMethod().prefetchSize); // Be aware of possible changes to parameter order as versions change. session.writeResponse(evt.getChannelId(), evt.getRequestId(), MessageOkBody.createMethodBody( session.getProtocolMajorVersion(), // AMQP major version diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java index 782909570d..e87b53234e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java @@ -78,10 +78,10 @@ public class MessageTransferHandler implements StateAwareMethodListener 1) - { - if(isExclusive()) - { - decrementSubscriberCount(); - throw EXISTING_EXCLUSIVE; - } - else if(exclusive) - { - decrementSubscriberCount(); - throw EXISTING_SUBSCRIPTION; - } - - } - else if(exclusive) - { - setExclusive(true); - } - debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 685015226c..fcbcc2f09a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -255,7 +255,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager while (msg != null) { msg.dequeue(storeContext, _queue); - count++; _totalMessageSize.set(0L); count++; msg = poll(); -- cgit v1.2.1