summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-01-18 01:28:44 +0000
committerRafael H. Schloming <rhs@apache.org>2007-01-18 01:28:44 +0000
commit762cba1e7db15cb3c9e987a9edcbf3106c7244cb (patch)
tree3b1d2f4cd0f49f5148925b558f4c47a077a43652 /java
parentffd69bc96e35142f380b86237fb85ee274e65057 (diff)
downloadqpid-python-762cba1e7db15cb3c9e987a9edcbf3106c7244cb.tar.gz
filled out consume and transfer handlers
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497278 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/etc/log4j.xml6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java160
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java111
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java47
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java35
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java2
8 files changed, 286 insertions, 79 deletions
diff --git a/java/broker/etc/log4j.xml b/java/broker/etc/log4j.xml
index 98da18b8bb..0cfabf8643 100644
--- a/java/broker/etc/log4j.xml
+++ b/java/broker/etc/log4j.xml
@@ -41,8 +41,12 @@
<priority value="debug"/>
</category>
+ <category name="org.apache.qpid.server.handler">
+ <priority value="debug"/>
+ </category>
+
<root>
- <priority value="info"/>
+ <priority value="debug"/>
<appender-ref ref="STDOUT"/>
<appender-ref ref="FileAppender"/>
</root>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 59ebc08428..6b9ceff053 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -20,15 +20,20 @@
*/
package org.apache.qpid.server;
+import org.apache.qpid.framing.Content;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageTransferBody;
import org.apache.qpid.framing.RequestManager;
import org.apache.qpid.framing.ResponseManager;
-import org.apache.qpid.protocol.AMQProtocolWriter;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQProtocolWriter;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -66,7 +71,7 @@ public class AMQChannel
private long _prefetch_HighWaterMark;
private long _prefetch_LowWaterMark;
-
+
private RequestManager _requestManager;
private ResponseManager _responseManager;
@@ -89,11 +94,16 @@ public class AMQChannel
private int _consumerTag;
/**
- * The current message - which may be partial in the sense that not all frames have been received yet -
- * which has been received by this channel. As the frames are received the message gets updated and once all
+ * The set of current messages - which may be partial in the sense that not all frames have been received yet -
+ * which has been received by this channel. As the frames are received the references get updated and once all
* frames have been received the message can then be routed.
*/
- private AMQMessage _currentMessage;
+ private Map<String, List<AMQMessage>> _messages = new LinkedHashMap();
+
+ /**
+ * The set of open references on this channel.
+ */
+ private Map<String, List<MessageAppendBody>> _references = new LinkedHashMap();
/**
* Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
@@ -177,55 +187,80 @@ public class AMQChannel
_prefetch_HighWaterMark = prefetchCount;
}
-// public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
-// {
-// _currentMessage = new AMQMessage(_messageStore, publishBody);
-// _currentMessage.setPublisher(publisher);
-// }
-
-// public void publishContentHeader(ContentHeaderBody contentHeaderBody)
-// throws AMQException
-// {
-// if (_currentMessage == null)
-// {
-// throw new AMQException("Received content header without previously receiving a BasicDeliver frame");
-// }
-// else
-// {
-// _currentMessage.setContentHeaderBody(contentHeaderBody);
-// // check and route if header says body length is zero
-// if (contentHeaderBody.bodySize == 0)
-// {
-// routeCurrentMessage();
-// }
-// }
-// }
-//
-// public void publishContentBody(ContentBody contentBody)
-// throws AMQException
-// {
-// if (_currentMessage == null)
-// {
-// throw new AMQException("Received content body without previously receiving a JmsPublishBody");
-// }
-// if (_currentMessage.getContentHeaderBody() == null)
-// {
-// throw new AMQException("Received content body without previously receiving a content header");
-// }
-//
-// _currentMessage.addContentBodyFrame(contentBody);
-// if (_currentMessage.isAllContentReceived())
-// {
-// routeCurrentMessage();
-// }
-// }
-
- protected void routeCurrentMessage() throws AMQException
+ public void addMessageTransfer(MessageTransferBody transferBody, AMQProtocolSession publisher) throws AMQException
+ {
+ AMQMessage message = new AMQMessage(_messageStore, transferBody);
+ message.setPublisher(publisher);
+ Content body = transferBody.getBody();
+ switch (body.getContentType()) {
+ case CONTENT_TYPE_INLINE:
+ route(message);
+ break;
+ case CONTENT_TYPE_REFERENCE:
+ getMessages(body.getContent()).add(message);
+ break;
+ }
+ }
+
+ private List<AMQMessage> getMessages(byte[] reference) {
+ String key = new String(reference);
+ List<AMQMessage> result = _messages.get(key);
+ if (result == null) {
+ throw new IllegalArgumentException(key);
+ }
+ return result;
+ }
+
+ private List<MessageAppendBody> getReference(byte[] reference) {
+ String key = new String(reference);
+ List<MessageAppendBody> result = _references.get(key);
+ if (result == null) {
+ throw new IllegalArgumentException(key);
+ }
+ return result;
+ }
+
+ private void createReference(byte[] reference) {
+ String key = new String(reference);
+ if (_references.containsKey(key)) {
+ throw new IllegalArgumentException(key);
+ } else {
+ _references.put(key, new LinkedList());
+ _messages.put(key, new LinkedList());
+ }
+ }
+
+ private void clearReference(byte[] reference) {
+ String key = new String(reference);
+ _references.remove(key);
+ _messages.remove(key);
+ }
+
+ public void addMessageOpen(MessageOpenBody open) {
+ createReference(open.reference);
+ }
+
+ public void addMessageAppend(MessageAppendBody append) {
+ getReference(append.reference).add(append);
+ }
+
+ public void addMessageClose(MessageCloseBody close) throws AMQException {
+ List<AMQMessage> messages = getMessages(close.reference);
+ try {
+ for (AMQMessage msg : messages) {
+ route(msg);
+ }
+ } finally {
+ clearReference(close.reference);
+ }
+ }
+
+ protected void route(AMQMessage msg) throws AMQException
{
if (_transactional)
{
//don't create a transaction unless needed
- if (_currentMessage.isPersistent())
+ if (msg.isPersistent())
{
_txnBuffer.containsPersistentChanges();
}
@@ -236,13 +271,13 @@ public class AMQChannel
//be added for every queue onto which the message is
//enqueued. Finally a cleanup op will be added to decrement
//the reference associated with the routing.
- Store storeOp = new Store(_currentMessage);
+ Store storeOp = new Store(msg);
_txnBuffer.enlist(storeOp);
- _currentMessage.setTxnBuffer(_txnBuffer);
+ msg.setTxnBuffer(_txnBuffer);
try
{
- _exchanges.routeContent(_currentMessage);
- _txnBuffer.enlist(new Cleanup(_currentMessage));
+ _exchanges.routeContent(msg);
+ _txnBuffer.enlist(new Cleanup(msg));
}
catch (RequiredDeliveryException e)
{
@@ -255,33 +290,28 @@ public class AMQChannel
_txnBuffer.cancel(storeOp);
throw e;
}
- finally
- {
- _currentMessage = null;
- }
}
else
{
try
{
- _exchanges.routeContent(_currentMessage);
+ _exchanges.routeContent(msg);
//following check implements the functionality
//required by the 'immediate' flag:
- _currentMessage.checkDeliveredToConsumer();
+ msg.checkDeliveredToConsumer();
}
finally
{
- _currentMessage.decrementReference();
- _currentMessage = null;
+ msg.decrementReference();
}
}
}
-
+
public RequestManager getRequestManager()
{
return _requestManager;
}
-
+
public ResponseManager getResponseManager()
{
return _responseManager;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
index c019a3b043..a84fa4ff4a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
@@ -20,17 +20,29 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageConsumeHandler implements StateAwareMethodListener<MessageConsumeBody>
{
+ private static final Logger _log = Logger.getLogger(MessageConsumeHandler.class);
+
private static MessageConsumeHandler _instance = new MessageConsumeHandler();
public static MessageConsumeHandler getInstance()
@@ -39,16 +51,105 @@ public class MessageConsumeHandler implements StateAwareMethodListener<MessageCo
}
private MessageConsumeHandler() {}
-
-
+
+
public void methodReceived (AMQStateManager stateManager,
- QueueRegistry queueRegistry,
+ QueueRegistry queueRegistry,
ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession,
+ AMQProtocolSession session,
AMQMethodEvent<MessageConsumeBody> evt)
throws AMQException
{
- // TODO
+ MessageConsumeBody body = evt.getMethod();
+ final int channelId = evt.getChannelId();
+
+ AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ _log.error("Channel " + channelId + " not found");
+ // TODO: either alert or error that the
+ }
+ else
+ {
+ AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+
+ if (queue == null)
+ {
+ _log.info("No queue for '" + body.queue + "'");
+ if(body.queue!=null)
+ {
+ channelClose(session, channelId, stateManager,
+ "No such queue, '" + body.queue + "'", AMQConstant.NOT_FOUND);
+ }
+ else
+ {
+ connectionClose(session, channelId, stateManager,
+ "No queue name provided, no default queue defined.",
+ AMQConstant.NOT_ALLOWED);
+ }
+ }
+ else
+ {
+ try
+ {
+ /*AMQShort*/String destination = channel.subscribeToQueue
+ (body.destination, queue, session, !body.noAck, /*XXX*/null, body.noLocal);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeResponse(evt, MessageOkBody.createMethodBody((byte)0, (byte)9));
+
+ //now allow queue to start async processing of any backlog of messages
+ queue.deliverAsync();
+ }
+ catch (AMQInvalidSelectorException ise)
+ {
+ _log.info("Closing connection due to invalid selector");
+ channelClose(session, channelId, stateManager, ise.getMessage(), AMQConstant.INVALID_SELECTOR);
+ }
+ catch (ConsumerTagNotUniqueException e)
+ {
+ connectionClose(session, channelId, stateManager,
+ "Non-unique consumer tag, '" + body.destination + "'",
+ AMQConstant.NOT_ALLOWED);
+ }
+ }
+ }
}
+
+ private void channelClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
+ String message, AMQConstant code)
+ throws AMQException
+ {
+ /*AMQShort*/String msg = new /*AMQShort*/String(message);
+ // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeRequest(channelId, ChannelCloseBody.createMethodBody
+ ((byte)0, (byte)9, // AMQP version (major, minor)
+ MessageConsumeBody.getClazz((byte)0, (byte)9), // classId
+ MessageConsumeBody.getMethod((byte)0, (byte)9), // methodId
+ code.getCode(), // replyCode
+ msg), // replyText
+ listener);
+ }
+
+ private void connectionClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
+ String message, AMQConstant code)
+ throws AMQException
+ {
+ /*AMQShort*/String msg = new /*AMQShort*/String(message);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeRequest(channelId, ConnectionCloseBody.createMethodBody
+ ((byte)0, (byte)9, // AMQP version (major, minor)
+ MessageConsumeBody.getClazz((byte)0, (byte)9), // classId
+ MessageConsumeBody.getMethod((byte)0, (byte)9), // methodId
+ code.getCode(), // replyCode
+ msg), // replyText
+ listener);
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
index 4f6886e885..58160611d1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
@@ -20,9 +20,15 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.MessageTransferBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -31,6 +37,8 @@ import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageTransferHandler implements StateAwareMethodListener<MessageTransferBody>
{
+ private static final Logger _log = Logger.getLogger(MessageTransferHandler.class);
+
private static MessageTransferHandler _instance = new MessageTransferHandler();
public static MessageTransferHandler getInstance()
@@ -38,9 +46,10 @@ public class MessageTransferHandler implements StateAwareMethodListener<MessageT
return _instance;
}
+ private static final String UNKNOWN_EXCHANGE_NAME = "Unknown exchange name";
+
private MessageTransferHandler() {}
-
-
+
public void methodReceived (AMQStateManager stateManager,
QueueRegistry queueRegistry,
ExchangeRegistry exchangeRegistry,
@@ -48,7 +57,39 @@ public class MessageTransferHandler implements StateAwareMethodListener<MessageT
AMQMethodEvent<MessageTransferBody> evt)
throws AMQException
{
- // TODO
+ final MessageTransferBody body = evt.getMethod();
+
+ if (_log.isDebugEnabled()) {
+ _log.debug("Publish received on channel " + evt.getChannelId());
+ }
+
+ // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
+ if (body.exchange == null) {
+ body.exchange = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ }
+ Exchange e = exchangeRegistry.getExchange(body.exchange);
+ // if the exchange does not exist we raise a channel exception
+ if (e == null) {
+ protocolSession.closeChannel(evt.getChannelId());
+ // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
+ // then we can remove the hardcoded 0,0
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQMethodBody cf = ChannelCloseBody.createMethodBody
+ ((byte)8, (byte)0, // AMQP version (major, minor)
+ MessageTransferBody.getClazz((byte)0, (byte)9), // classId
+ MessageTransferBody.getMethod((byte)0, (byte)9), // methodId
+ 500, // replyCode
+ UNKNOWN_EXCHANGE_NAME); // replyText
+ protocolSession.writeRequest(evt.getChannelId(), cf, stateManager);
+ } else {
+ // The partially populated BasicDeliver frame plus the received route body
+ // is stored in the channel. Once the final body frame has been received
+ // it is routed to the exchange.
+ AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ channel.addMessageTransfer(body, protocolSession);
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 3f0cf63b70..e0457748f4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -132,7 +132,38 @@ public class AMQMessage
}
public long getSize() {
- throw new Error("XXX");
+ return getHeaderSize() + getBodySize();
+ }
+
+ public int getHeaderSize() {
+ int size = _transferBody.getBodySize();
+ Content body = _transferBody.getBody();
+ switch (body.getContentType()) {
+ case CONTENT_TYPE_INLINE:
+ size -= _transferBody.getBody().getEncodedSize();
+ break;
+ }
+ return size;
+ }
+
+ public long getBodySize() {
+ Content body = _transferBody.getBody();
+ switch (body.getContentType()) {
+ case CONTENT_TYPE_INLINE:
+ return _transferBody.getBody().getContent().length;
+ case CONTENT_TYPE_REFERENCE:
+ return getReferenceSize();
+ default:
+ throw new IllegalStateException("unrecognized type: " + body.getContentType());
+ }
+ }
+
+ public long getReferenceSize() {
+ long size = 0;
+ for (MessageAppendBody mab : _contentBodies) {
+ size += mab.getBytes().length;
+ }
+ return size;
}
public FieldTable getHeadersTable() {
@@ -164,7 +195,7 @@ public class AMQMessage
}
public byte getDeliveryMode() {
- throw new Error("XXX");
+ return (byte) _transferBody.deliveryMode;
}
public void setReplyTo(String replyTo) {
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index aee6f74117..f8e2642a1f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -190,7 +190,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
return 0l;
}
- return msg.getSize();
+ return msg.getBodySize();
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 53ada898ab..74f69030e0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -96,7 +96,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private boolean addMessageToQueue(AMQMessage msg)
{
// Shrink the ContentBodies to their actual size to save memory.
- if (true) throw new Error("XXX");
+ // XXX
/*if (compressBufferOnQueue)
{
Iterator it = msg.getContentBodies().iterator();
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
index 5ccc900b2c..b7f9fb4666 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
@@ -42,7 +42,7 @@ public abstract class AMQMethodBody extends AMQBody
}
/** unsigned short */
- protected abstract int getBodySize();
+ public abstract int getBodySize();
/**
* @return unsigned short