diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-01-18 01:28:44 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-01-18 01:28:44 +0000 |
| commit | 762cba1e7db15cb3c9e987a9edcbf3106c7244cb (patch) | |
| tree | 3b1d2f4cd0f49f5148925b558f4c47a077a43652 /java | |
| parent | ffd69bc96e35142f380b86237fb85ee274e65057 (diff) | |
| download | qpid-python-762cba1e7db15cb3c9e987a9edcbf3106c7244cb.tar.gz | |
filled out consume and transfer handlers
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497278 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
8 files changed, 286 insertions, 79 deletions
diff --git a/java/broker/etc/log4j.xml b/java/broker/etc/log4j.xml index 98da18b8bb..0cfabf8643 100644 --- a/java/broker/etc/log4j.xml +++ b/java/broker/etc/log4j.xml @@ -41,8 +41,12 @@ <priority value="debug"/> </category> + <category name="org.apache.qpid.server.handler"> + <priority value="debug"/> + </category> + <root> - <priority value="info"/> + <priority value="debug"/> <appender-ref ref="STDOUT"/> <appender-ref ref="FileAppender"/> </root> diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 59ebc08428..6b9ceff053 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,15 +20,20 @@ */ package org.apache.qpid.server; +import org.apache.qpid.framing.Content; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.Content; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageTransferBody; import org.apache.qpid.framing.RequestManager; import org.apache.qpid.framing.ResponseManager; -import org.apache.qpid.protocol.AMQProtocolWriter; import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.protocol.AMQProtocolWriter; import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; @@ -66,7 +71,7 @@ public class AMQChannel private long _prefetch_HighWaterMark; private long _prefetch_LowWaterMark; - + private RequestManager _requestManager; private ResponseManager _responseManager; @@ -89,11 +94,16 @@ public class AMQChannel private int _consumerTag; /** - * The current message - which may be partial in the sense that not all frames have been received yet - - * which has been received by this channel. As the frames are received the message gets updated and once all + * The set of current messages - which may be partial in the sense that not all frames have been received yet - + * which has been received by this channel. As the frames are received the references get updated and once all * frames have been received the message can then be routed. */ - private AMQMessage _currentMessage; + private Map<String, List<AMQMessage>> _messages = new LinkedHashMap(); + + /** + * The set of open references on this channel. + */ + private Map<String, List<MessageAppendBody>> _references = new LinkedHashMap(); /** * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. @@ -177,55 +187,80 @@ public class AMQChannel _prefetch_HighWaterMark = prefetchCount; } -// public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException -// { -// _currentMessage = new AMQMessage(_messageStore, publishBody); -// _currentMessage.setPublisher(publisher); -// } - -// public void publishContentHeader(ContentHeaderBody contentHeaderBody) -// throws AMQException -// { -// if (_currentMessage == null) -// { -// throw new AMQException("Received content header without previously receiving a BasicDeliver frame"); -// } -// else -// { -// _currentMessage.setContentHeaderBody(contentHeaderBody); -// // check and route if header says body length is zero -// if (contentHeaderBody.bodySize == 0) -// { -// routeCurrentMessage(); -// } -// } -// } -// -// public void publishContentBody(ContentBody contentBody) -// throws AMQException -// { -// if (_currentMessage == null) -// { -// throw new AMQException("Received content body without previously receiving a JmsPublishBody"); -// } -// if (_currentMessage.getContentHeaderBody() == null) -// { -// throw new AMQException("Received content body without previously receiving a content header"); -// } -// -// _currentMessage.addContentBodyFrame(contentBody); -// if (_currentMessage.isAllContentReceived()) -// { -// routeCurrentMessage(); -// } -// } - - protected void routeCurrentMessage() throws AMQException + public void addMessageTransfer(MessageTransferBody transferBody, AMQProtocolSession publisher) throws AMQException + { + AMQMessage message = new AMQMessage(_messageStore, transferBody); + message.setPublisher(publisher); + Content body = transferBody.getBody(); + switch (body.getContentType()) { + case CONTENT_TYPE_INLINE: + route(message); + break; + case CONTENT_TYPE_REFERENCE: + getMessages(body.getContent()).add(message); + break; + } + } + + private List<AMQMessage> getMessages(byte[] reference) { + String key = new String(reference); + List<AMQMessage> result = _messages.get(key); + if (result == null) { + throw new IllegalArgumentException(key); + } + return result; + } + + private List<MessageAppendBody> getReference(byte[] reference) { + String key = new String(reference); + List<MessageAppendBody> result = _references.get(key); + if (result == null) { + throw new IllegalArgumentException(key); + } + return result; + } + + private void createReference(byte[] reference) { + String key = new String(reference); + if (_references.containsKey(key)) { + throw new IllegalArgumentException(key); + } else { + _references.put(key, new LinkedList()); + _messages.put(key, new LinkedList()); + } + } + + private void clearReference(byte[] reference) { + String key = new String(reference); + _references.remove(key); + _messages.remove(key); + } + + public void addMessageOpen(MessageOpenBody open) { + createReference(open.reference); + } + + public void addMessageAppend(MessageAppendBody append) { + getReference(append.reference).add(append); + } + + public void addMessageClose(MessageCloseBody close) throws AMQException { + List<AMQMessage> messages = getMessages(close.reference); + try { + for (AMQMessage msg : messages) { + route(msg); + } + } finally { + clearReference(close.reference); + } + } + + protected void route(AMQMessage msg) throws AMQException { if (_transactional) { //don't create a transaction unless needed - if (_currentMessage.isPersistent()) + if (msg.isPersistent()) { _txnBuffer.containsPersistentChanges(); } @@ -236,13 +271,13 @@ public class AMQChannel //be added for every queue onto which the message is //enqueued. Finally a cleanup op will be added to decrement //the reference associated with the routing. - Store storeOp = new Store(_currentMessage); + Store storeOp = new Store(msg); _txnBuffer.enlist(storeOp); - _currentMessage.setTxnBuffer(_txnBuffer); + msg.setTxnBuffer(_txnBuffer); try { - _exchanges.routeContent(_currentMessage); - _txnBuffer.enlist(new Cleanup(_currentMessage)); + _exchanges.routeContent(msg); + _txnBuffer.enlist(new Cleanup(msg)); } catch (RequiredDeliveryException e) { @@ -255,33 +290,28 @@ public class AMQChannel _txnBuffer.cancel(storeOp); throw e; } - finally - { - _currentMessage = null; - } } else { try { - _exchanges.routeContent(_currentMessage); + _exchanges.routeContent(msg); //following check implements the functionality //required by the 'immediate' flag: - _currentMessage.checkDeliveredToConsumer(); + msg.checkDeliveredToConsumer(); } finally { - _currentMessage.decrementReference(); - _currentMessage = null; + msg.decrementReference(); } } } - + public RequestManager getRequestManager() { return _requestManager; } - + public ResponseManager getResponseManager() { return _responseManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java index c019a3b043..a84fa4ff4a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java @@ -20,17 +20,29 @@ */ package org.apache.qpid.server.handler; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidSelectorException; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageConsumeHandler implements StateAwareMethodListener<MessageConsumeBody> { + private static final Logger _log = Logger.getLogger(MessageConsumeHandler.class); + private static MessageConsumeHandler _instance = new MessageConsumeHandler(); public static MessageConsumeHandler getInstance() @@ -39,16 +51,105 @@ public class MessageConsumeHandler implements StateAwareMethodListener<MessageCo } private MessageConsumeHandler() {} - - + + public void methodReceived (AMQStateManager stateManager, - QueueRegistry queueRegistry, + QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, - AMQProtocolSession protocolSession, + AMQProtocolSession session, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException { - // TODO + MessageConsumeBody body = evt.getMethod(); + final int channelId = evt.getChannelId(); + + AMQChannel channel = session.getChannel(channelId); + if (channel == null) + { + _log.error("Channel " + channelId + " not found"); + // TODO: either alert or error that the + } + else + { + AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue); + + if (queue == null) + { + _log.info("No queue for '" + body.queue + "'"); + if(body.queue!=null) + { + channelClose(session, channelId, stateManager, + "No such queue, '" + body.queue + "'", AMQConstant.NOT_FOUND); + } + else + { + connectionClose(session, channelId, stateManager, + "No queue name provided, no default queue defined.", + AMQConstant.NOT_ALLOWED); + } + } + else + { + try + { + /*AMQShort*/String destination = channel.subscribeToQueue + (body.destination, queue, session, !body.noAck, /*XXX*/null, body.noLocal); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeResponse(evt, MessageOkBody.createMethodBody((byte)0, (byte)9)); + + //now allow queue to start async processing of any backlog of messages + queue.deliverAsync(); + } + catch (AMQInvalidSelectorException ise) + { + _log.info("Closing connection due to invalid selector"); + channelClose(session, channelId, stateManager, ise.getMessage(), AMQConstant.INVALID_SELECTOR); + } + catch (ConsumerTagNotUniqueException e) + { + connectionClose(session, channelId, stateManager, + "Non-unique consumer tag, '" + body.destination + "'", + AMQConstant.NOT_ALLOWED); + } + } + } } + + private void channelClose(AMQProtocolSession session, int channelId, AMQMethodListener listener, + String message, AMQConstant code) + throws AMQException + { + /*AMQShort*/String msg = new /*AMQShort*/String(message); + // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeRequest(channelId, ChannelCloseBody.createMethodBody + ((byte)0, (byte)9, // AMQP version (major, minor) + MessageConsumeBody.getClazz((byte)0, (byte)9), // classId + MessageConsumeBody.getMethod((byte)0, (byte)9), // methodId + code.getCode(), // replyCode + msg), // replyText + listener); + } + + private void connectionClose(AMQProtocolSession session, int channelId, AMQMethodListener listener, + String message, AMQConstant code) + throws AMQException + { + /*AMQShort*/String msg = new /*AMQShort*/String(message); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeRequest(channelId, ConnectionCloseBody.createMethodBody + ((byte)0, (byte)9, // AMQP version (major, minor) + MessageConsumeBody.getClazz((byte)0, (byte)9), // classId + MessageConsumeBody.getMethod((byte)0, (byte)9), // methodId + code.getCode(), // replyCode + msg), // replyText + listener); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java index 4f6886e885..58160611d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java @@ -20,9 +20,15 @@ */ package org.apache.qpid.server.handler; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.MessageTransferBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; @@ -31,6 +37,8 @@ import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageTransferHandler implements StateAwareMethodListener<MessageTransferBody> { + private static final Logger _log = Logger.getLogger(MessageTransferHandler.class); + private static MessageTransferHandler _instance = new MessageTransferHandler(); public static MessageTransferHandler getInstance() @@ -38,9 +46,10 @@ public class MessageTransferHandler implements StateAwareMethodListener<MessageT return _instance; } + private static final String UNKNOWN_EXCHANGE_NAME = "Unknown exchange name"; + private MessageTransferHandler() {} - - + public void methodReceived (AMQStateManager stateManager, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, @@ -48,7 +57,39 @@ public class MessageTransferHandler implements StateAwareMethodListener<MessageT AMQMethodEvent<MessageTransferBody> evt) throws AMQException { - // TODO + final MessageTransferBody body = evt.getMethod(); + + if (_log.isDebugEnabled()) { + _log.debug("Publish received on channel " + evt.getChannelId()); + } + + // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? + if (body.exchange == null) { + body.exchange = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + } + Exchange e = exchangeRegistry.getExchange(body.exchange); + // if the exchange does not exist we raise a channel exception + if (e == null) { + protocolSession.closeChannel(evt.getChannelId()); + // TODO: modify code gen to make getClazz and getMethod public methods rather than protected + // then we can remove the hardcoded 0,0 + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + AMQMethodBody cf = ChannelCloseBody.createMethodBody + ((byte)8, (byte)0, // AMQP version (major, minor) + MessageTransferBody.getClazz((byte)0, (byte)9), // classId + MessageTransferBody.getMethod((byte)0, (byte)9), // methodId + 500, // replyCode + UNKNOWN_EXCHANGE_NAME); // replyText + protocolSession.writeRequest(evt.getChannelId(), cf, stateManager); + } else { + // The partially populated BasicDeliver frame plus the received route body + // is stored in the channel. Once the final body frame has been received + // it is routed to the exchange. + AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); + channel.addMessageTransfer(body, protocolSession); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 3f0cf63b70..e0457748f4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -132,7 +132,38 @@ public class AMQMessage } public long getSize() { - throw new Error("XXX"); + return getHeaderSize() + getBodySize(); + } + + public int getHeaderSize() { + int size = _transferBody.getBodySize(); + Content body = _transferBody.getBody(); + switch (body.getContentType()) { + case CONTENT_TYPE_INLINE: + size -= _transferBody.getBody().getEncodedSize(); + break; + } + return size; + } + + public long getBodySize() { + Content body = _transferBody.getBody(); + switch (body.getContentType()) { + case CONTENT_TYPE_INLINE: + return _transferBody.getBody().getContent().length; + case CONTENT_TYPE_REFERENCE: + return getReferenceSize(); + default: + throw new IllegalStateException("unrecognized type: " + body.getContentType()); + } + } + + public long getReferenceSize() { + long size = 0; + for (MessageAppendBody mab : _contentBodies) { + size += mab.getBytes().length; + } + return size; } public FieldTable getHeadersTable() { @@ -164,7 +195,7 @@ public class AMQMessage } public byte getDeliveryMode() { - throw new Error("XXX"); + return (byte) _transferBody.deliveryMode; } public void setReplyTo(String replyTo) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index aee6f74117..f8e2642a1f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -190,7 +190,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue return 0l; } - return msg.getSize(); + return msg.getBodySize(); } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 53ada898ab..74f69030e0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -96,7 +96,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private boolean addMessageToQueue(AMQMessage msg) { // Shrink the ContentBodies to their actual size to save memory. - if (true) throw new Error("XXX"); + // XXX /*if (compressBufferOnQueue) { Iterator it = msg.getContentBodies().iterator(); diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index 5ccc900b2c..b7f9fb4666 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -42,7 +42,7 @@ public abstract class AMQMethodBody extends AMQBody } /** unsigned short */ - protected abstract int getBodySize(); + public abstract int getBodySize(); /** * @return unsigned short |
