diff options
| author | Gordon Sim <gsim@apache.org> | 2007-02-20 15:52:04 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-02-20 15:52:04 +0000 |
| commit | edfebbfec207f6b4c93933330c4792a8e8798eb7 (patch) | |
| tree | a722b808201bc5d6b598ee54e78c1b1c7a072f6d /java/broker/src | |
| parent | 08b43d86e9fd619cc3c07fbfd5c9d6befe978e8e (diff) | |
| download | qpid-python-edfebbfec207f6b4c93933330c4792a8e8798eb7.tar.gz | |
Some fixes to get more python tests passing.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509616 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
9 files changed, 119 insertions, 138 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index b2d4215bd0..3ab20e74bf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -141,6 +141,11 @@ public class AMQChannel private Set<Long> _browsedAcks = new HashSet<Long>(); + /** + * Used in creating unique references. + */ + private byte _refCounter; + // XXX: clean up arguments public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener) { @@ -218,7 +223,7 @@ public class AMQChannel _prefetch_HighWaterMark = prefetchCount; } - public void addMessageTransfer(MessageTransferBody transferBody, AMQProtocolSession publisher) throws AMQException + public void addMessageTransfer(MessageTransferBody transferBody, long requestId, AMQProtocolSession publisher) throws AMQException { Content body = transferBody.getBody(); AMQMessage message; @@ -226,14 +231,20 @@ public class AMQChannel case INLINE_T: message = new AMQMessage(_messageStore, transferBody, Collections.singletonList(body.getContent()), _txnContext); message.setPublisher(publisher); + message.setRequestId(requestId); routeCurrentMessage(message); - message.routingComplete(_messageStore, _storeContext, _messageHandleFactory); break; case REF_T: - AMQReference ref = getReference(body.getContentAsByteArray()); - message = new AMQMessage(_messageStore, transferBody, ref.getContentList(), _txnContext); - message.setPublisher(publisher); - ref.addRefTransferBody(message); + try { + AMQReference ref = getReference(body.getContentAsByteArray()); + message = new AMQMessage(_messageStore, transferBody, ref.getContentList(), _txnContext); + message.setPublisher(publisher); + message.setRequestId(requestId); + ref.addRefTransferBody(message); + } catch (IllegalArgumentException e) { + throw transferBody.getConnectionException(503, "Reference is not open"); + } + break; } } @@ -277,24 +288,35 @@ public class AMQChannel return ref; } - public void addMessageOpen(MessageOpenBody open) + public void addMessageOpen(MessageOpenBody open) throws AMQException { - createReference(open.reference); + try { + createReference(open.reference); + } catch (IllegalArgumentException e) { + throw open.getConnectionException(503, "Reference is already open"); + } } - public void addMessageAppend(MessageAppendBody append) + public void addMessageAppend(MessageAppendBody append) throws AMQException { - AMQReference ref = getReference(append.reference); - ref.appendContent(ByteBuffer.wrap(append.bytes)); + try { + AMQReference ref = getReference(append.reference); + ref.appendContent(ByteBuffer.wrap(append.bytes)); + } catch (IllegalArgumentException e) { + throw append.getConnectionException(503, "Reference is not open"); + } } public void addMessageClose(MessageCloseBody close) throws AMQException { - AMQReference ref = removeReference(close.reference); - for (AMQMessage msg : ref.getMessageList()) - { - routeCurrentMessage(msg); - msg.routingComplete(_messageStore, _storeContext, _messageHandleFactory); + try { + AMQReference ref = removeReference(close.reference); + for (AMQMessage msg : ref.getMessageList()) + { + routeCurrentMessage(msg); + } + } catch (IllegalArgumentException e) { + throw close.getConnectionException(503, "Reference is not open"); } } @@ -308,38 +330,18 @@ public class AMQChannel { _returnMessages.add(e); } + msg.routingComplete(_messageStore, _storeContext, _messageHandleFactory); + + MessageOkBody ok = MessageOkBody.createMethodBody( + _session.getProtocolMajorVersion(), + _session.getProtocolMinorVersion() + ); + _session.writeResponse(_channelId, msg.getRequestId(), ok); } public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag) { - // Do we need to refactor the content for a different frame size? - long maxFrameSize = _session.getFrameMax(); - Iterable<ByteBuffer> contentItr = msg.getContents(); - if (msg.getSize() > maxFrameSize) - { - Iterator<ByteBuffer> cItr = contentItr.iterator(); - if (cItr.next().limit() > maxFrameSize) // First chunk should equal incoming frame size - { - // TODO - Refactor the chunks for smaller outbound frame size - throw new Error("XXX TODO - need to refactor content chunks here"); - // deliverRef(msg, destination, deliveryTag); - } - else - { - // Use ref content as is - no need to refactor - deliverRef(msg, destination, deliveryTag); - } - } - else - { - // Concatenate - all incoming chunks will fit into single outbound frame - deliverInline(msg, destination, deliveryTag); - } - } - - public void deliverInline(AMQMessage msg, AMQShortString destination, final long deliveryTag) - { - deliverInline(msg, destination, new AMQMethodListener() + AMQMethodListener listener = new AMQMethodListener() { public boolean methodReceived(AMQMethodEvent evt) throws AMQException { @@ -361,9 +363,20 @@ public class AMQChannel } } public void error(Exception e) {} - }); + }; + long maxFrameSize = _session.getFrameMax(); + if (msg.getFullSize() > maxFrameSize) + { + //need to send as reference + deliverRef(msg, destination, listener); + } + else + { + //message will fit inline + deliverInline(msg, destination, listener); + } } - + public void deliverInline(AMQMessage msg, AMQShortString destination, AMQMethodListener listener) { MessageTransferBody mtb = msg.getTransferBody().copy(); @@ -378,64 +391,37 @@ public class AMQChannel mtb.body = new Content(Content.TypeEnum.INLINE_T, buf); _session.writeRequest(_channelId, mtb, listener); } - - public void deliverRef(final AMQMessage msg, final AMQShortString destination, final long deliveryTag) - { - final byte[] refId = String.valueOf(System.currentTimeMillis()).getBytes(); - deliverRef(refId, msg, destination, new AMQMethodListener() - { - public boolean methodReceived(AMQMethodEvent evt) throws AMQException - { - AMQMethodBody method = evt.getMethod(); - if (_log.isDebugEnabled()) - { - _log.debug(method + " received on channel " + _channelId); - } - // XXX: multiple? - if (method instanceof MessageOkBody) - { - acknowledgeMessage(deliveryTag, false); - return true; - } - else - { - // TODO: implement reject - return false; - } - } - public void error(Exception e) {} - }); + + private synchronized byte[] nextRefId() { + return new byte[]{_refCounter++}; } - public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString destination, AMQMethodListener listener) + public void deliverRef(AMQMessage msg, AMQShortString destination, AMQMethodListener listener) { - AMQMethodBody openBody = MessageOpenBody.createMethodBody( - _session.getProtocolMajorVersion(), // AMQP major version - _session.getProtocolMinorVersion(), // AMQP minor version - refId); - _session.writeRequest(_channelId, openBody, listener); + AMQMethodListener dummy = new AMQMethodListener() + { + public boolean methodReceived(AMQMethodEvent evt){ return true; } + public void error(Exception e) {} + }; + byte major = _session.getProtocolMajorVersion(); + byte minor = _session.getProtocolMinorVersion(); + byte[] refId = nextRefId(); + _session.writeRequest(_channelId, MessageOpenBody.createMethodBody(major, minor, refId), dummy); MessageTransferBody mtb = msg.getTransferBody().copy(); mtb.destination = destination; - mtb.redelivered = msg.isRedelivered(); mtb.body = new Content(Content.TypeEnum.REF_T, refId); _session.writeRequest(_channelId, mtb, listener); - for (ByteBuffer bb : msg.getContents()) - { - ByteBuffer dup = bb.duplicate(); - byte[] ba = new byte[dup.limit()]; - dup.get(ba); - AMQMethodBody appendBody = MessageAppendBody.createMethodBody( - _session.getProtocolMajorVersion(), // AMQP major version - _session.getProtocolMinorVersion(), // AMQP minor version - ba, - refId); - _session.writeRequest(_channelId, appendBody, listener); + for (ByteBuffer buffer : msg.getContents()) + { + //TODO: try and avoid all this copying! + while (buffer.remaining() > 0) + { + byte[] data = new byte[Math.min((int) _session.getFrameMax(), buffer.remaining())]; + buffer.get(data); + _session.writeRequest(_channelId, MessageAppendBody.createMethodBody(major, minor, data, refId), dummy); + } } - AMQMethodBody closeBody = MessageCloseBody.createMethodBody( - _session.getProtocolMajorVersion(), // AMQP major version - _session.getProtocolMinorVersion(), // AMQP minor version - refId); - _session.writeRequest(_channelId, closeBody, listener); + _session.writeRequest(_channelId, MessageCloseBody.createMethodBody(major, minor, refId), dummy); } public RequestManager getRequestManager() @@ -554,6 +540,7 @@ public class AMQChannel for (UnacknowledgedMessage unacked : messagesToBeDelivered) { + unacked.message.setRedelivered(true); if (unacked.queue != null) { _txnContext.deliver(unacked.message, unacked.queue); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index a29552de03..2cf69bf27d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -66,7 +66,7 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener<Con } else { - virtualHostName = String.valueOf(body.virtualHost); + virtualHostName = body.virtualHost == null ? null : String.valueOf(body.virtualHost); } VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java index cab8a7d0ad..987420f88f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageQosBody; import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; @@ -46,7 +47,9 @@ public class MessageQosHandler implements StateAwareMethodListener<MessageQosBod public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageQosBody> evt) throws AMQException { AMQProtocolSession session = stateManager.getProtocolSession(); - session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount); + AMQChannel channel = session.getChannel(evt.getChannelId()); + channel.setPrefetchCount(evt.getMethod().prefetchCount); + channel.setPrefetchSize(evt.getMethod().prefetchSize); // Be aware of possible changes to parameter order as versions change. session.writeResponse(evt.getChannelId(), evt.getRequestId(), MessageOkBody.createMethodBody( session.getProtocolMajorVersion(), // AMQP major version diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java index 782909570d..e87b53234e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java @@ -78,10 +78,10 @@ public class MessageTransferHandler implements StateAwareMethodListener<MessageT // is stored in the channel. Once the final body frame has been received // it is routed to the exchange. AMQChannel channel = session.getChannel(evt.getChannelId()); - channel.addMessageTransfer(body, session); - session.writeResponse(evt, MessageOkBody.createMethodBody( - session.getProtocolMajorVersion(), // AMQP major version - session.getProtocolMinorVersion())); // AMQP minor version + channel.addMessageTransfer(body, evt.getRequestId(), session); + //session.writeResponse(evt, MessageOkBody.createMethodBody( + // session.getProtocolMajorVersion(), // AMQP major version + // session.getProtocolMinorVersion())); // AMQP minor version } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 58461e8f1b..55edea4bbb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -610,8 +610,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, task.doTask(this); } } -// gsim-python -// _minaProtocolSession.close(); } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 4571da502e..7b367ad50c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -168,6 +168,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco // gsim-python //session.closeSessionRequest(200, new AMQShortString(throwable.getMessage())); session.closeSession(); + protocolSession.close(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index d433dd6bea..35b779da25 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -80,8 +80,6 @@ public class AMQMessage } }; - private boolean _redelivered; - private final Long _messageId; private final AtomicInteger _referenceCount = new AtomicInteger(1); @@ -119,6 +117,7 @@ public class AMQMessage private boolean _deliveredToConsumer; private AtomicBoolean _taken = new AtomicBoolean(false); + private long _requestId;//the request id of the transfer that this message represents public AMQMessage(MessageStore messageStore, MessageTransferBody transferBody, TransactionalContext txnContext) { @@ -160,6 +159,16 @@ public class AMQMessage public long getSize() { + //based on existing usage, this should return the size of the + //data and inline data will already be included in the count + //by getBodySize() + return getBodySize(); + } + + public long getFullSize() + { + //this is used in determining whether a message can be inlined + //or not and therefore must include the header size also return getHeaderSize() + getBodySize(); } @@ -300,11 +309,11 @@ public class AMQMessage _transferBody.priority = priority; } - // TODO - how does this relate to the _redelivered flag in this class? See other isRedelivered() method below. -// public boolean isRedelivered() -// { -// return _transferBody.getRedelivered(); -// } + + public boolean isRedelivered() + { + return _transferBody.getRedelivered(); + } public AMQShortString getReplyTo() { @@ -406,12 +415,6 @@ public class AMQMessage //return _bodyLengthReceived == _contentHeaderBody.bodySize; } - - public boolean isRedelivered() - { - return _redelivered; - } - NoConsumersException getNoConsumersException(String queue) { return new NoConsumersException(queue, this); @@ -420,7 +423,6 @@ public class AMQMessage public void setRedelivered(boolean redelivered) { _transferBody.redelivered = redelivered; - _redelivered = redelivered; } public long getMessageId() @@ -638,4 +640,14 @@ public class AMQMessage throw new Error("XXX"); } + public void setRequestId(long requestId) + { + _requestId = requestId; + } + + public long getRequestId() + { + return _requestId; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a7c4b5ca19..2f128a3e3e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -371,25 +371,6 @@ public class AMQQueue implements Managable, Comparable setExclusive(true); } - if(incrementSubscriberCount() > 1) - { - if(isExclusive()) - { - decrementSubscriberCount(); - throw EXISTING_EXCLUSIVE; - } - else if(exclusive) - { - decrementSubscriberCount(); - throw EXISTING_SUBSCRIPTION; - } - - } - else if(exclusive) - { - setExclusive(true); - } - debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 685015226c..fcbcc2f09a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -255,7 +255,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager while (msg != null) { msg.dequeue(storeContext, _queue); - count++; _totalMessageSize.set(0L); count++; msg = poll(); |
