diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-29 10:59:33 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-29 10:59:33 +0000 |
commit | 92bfc7746a8101a749fc1b4699c1e60cd9f79c4a (patch) | |
tree | 7eb63ea163487ea25f4cf19fe09a1ac2c0b577be /java | |
parent | 2bcc371558ce0659f53b86046cdf3d5de3b20910 (diff) | |
download | qpid-python-92bfc7746a8101a749fc1b4699c1e60cd9f79c4a.tar.gz |
QPID-320 : Patch supplied by Rob Godfrey - Improve performance by remembering protocol version
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@501003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
32 files changed, 504 insertions, 259 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java index 325e5226ad..5c176c21cb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java @@ -50,6 +50,6 @@ public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody> // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)8, (byte)0))); + session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index d87821aa46..5f9fcbdc85 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -61,6 +61,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); + // to save boxing the channelId and looking up in a map... cache in an array the low numbered + // channels. This value must be of the form 2^x - 1. + private static final int CHANNEL_CACHE_SIZE = 0xff; private final IoSession _minaProtocolSession; @@ -70,6 +73,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); + private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE+1]; + private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); private final AMQStateManager _stateManager; @@ -89,10 +94,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private long _maxNoOfChannels = 1000; /* AMQP Version for this session */ - private byte _major; - private byte _minor; + private byte _major = pv[pv.length-1][PROTOCOL_MAJOR]; + private byte _minor = pv[pv.length-1][PROTOCOL_MINOR]; private FieldTable _clientProperties; private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]); + public ManagedObject getManagedObject() @@ -165,11 +172,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, try { pi.checkVersion(this); // Fails if not correct + // This sets the protocol version (and hence framing classes) for this session. - _major = pi.protocolMajor; - _minor = pi.protocolMinor; + setProtocolVersion(pi.protocolMajor,pi.protocolMinor); + String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); + String locales = "en_US"; + // Interfacing with generated code - be aware of possible changes to parameter order as versions change. AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, _major, _minor, // AMQP version (major, minor) @@ -200,7 +210,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { AMQFrame frame = (AMQFrame) message; - if (frame.bodyFrame instanceof AMQMethodBody) + if (frame.getBodyFrame() instanceof AMQMethodBody) { methodFrameReceived(frame); } @@ -217,8 +227,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Method frame received: " + frame); } - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, - (AMQMethodBody) frame.bodyFrame); + final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), + (AMQMethodBody) frame.getBodyFrame()); try { try @@ -241,14 +251,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, catch (AMQChannelException e) { _logger.error("Closing channel due to: " + e.getMessage()); - writeFrame(e.getCloseFrame(frame.channel)); - closeChannel(frame.channel); + writeFrame(e.getCloseFrame(frame.getChannel())); + closeChannel(frame.getChannel()); } catch (AMQConnectionException e) { _logger.error("Closing connection due to: " + e.getMessage()); closeSession(); - writeFrame(e.getCloseFrame(frame.channel)); + writeFrame(e.getCloseFrame(frame.getChannel())); } } catch (Exception e) @@ -264,15 +274,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private void contentFrameReceived(AMQFrame frame) throws AMQException { - if (frame.bodyFrame instanceof ContentHeaderBody) + if (frame.getBodyFrame() instanceof ContentHeaderBody) { contentHeaderReceived(frame); } - else if (frame.bodyFrame instanceof ContentBody) + else if (frame.getBodyFrame() instanceof ContentBody) { contentBodyReceived(frame); } - else if (frame.bodyFrame instanceof HeartbeatBody) + else if (frame.getBodyFrame() instanceof HeartbeatBody) { _logger.debug("Received heartbeat from client"); } @@ -288,7 +298,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content header frame received: " + frame); } - getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame); + getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); } private void contentBodyReceived(AMQFrame frame) throws AMQException @@ -297,7 +307,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } - getChannel(frame.channel).publishContentBody((ContentBody)frame.bodyFrame, this); + getChannel(frame.getChannel()).publishContentBody((ContentBody)frame.getBodyFrame(), this); } /** @@ -329,7 +339,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public AMQChannel getChannel(int channelId) throws AMQException { - return _channelMap.get(channelId); + return ((channelId & CHANNEL_CACHE_SIZE) == channelId) + ? _cachedChannels[channelId] + : _channelMap.get(channelId); } public void addChannel(AMQChannel channel) throws AMQException @@ -339,7 +351,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, throw new AMQException("Session is closed"); } - _channelMap.put(channel.getChannelId(), channel); + final int channelId = channel.getChannelId(); + _channelMap.put(channelId, channel); + + if(((channelId & CHANNEL_CACHE_SIZE) == channelId)) + { + _cachedChannels[channelId] = channel; + } checkForNotification(); } @@ -389,7 +407,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, */ public void closeChannel(int channelId) throws AMQException { - final AMQChannel channel = _channelMap.get(channelId); + final AMQChannel channel = getChannel(channelId); if (channel == null) { throw new IllegalArgumentException("Unknown channel id"); @@ -402,7 +420,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } finally { - _channelMap.remove(channelId); + removeChannel(channelId); + } } } @@ -415,6 +434,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void removeChannel(int channelId) { _channelMap.remove(channelId); + if((channelId & CHANNEL_CACHE_SIZE) == channelId) + { + _cachedChannels[channelId] = null; + } } /** @@ -444,6 +467,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, channel.close(this); } _channelMap.clear(); + for(int i = 0; i <= CHANNEL_CACHE_SIZE; i++) + { + _cachedChannels[i]=null; + } } /** @@ -534,10 +561,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - /** - * Convenience methods for managing AMQP version. - * NOTE: Both major and minor will be set to 0 prior to protocol initiation. - */ + private void setProtocolVersion(byte major, byte minor) + { + _major = major; + _minor = minor; + _registry = MainRegistry.getVersionSpecificRegistry(major,minor); + } public byte getProtocolMajorVersion() { @@ -554,6 +583,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _major == major && _minor == minor; } + public VersionSpecificRegistry getRegistry() + { + return _registry; + } + + public Object getClientIdentifier() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index dd56fe87ec..d7e6af0c29 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -120,7 +120,10 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco _logger.info("Protocol Session closed"); final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession); //fixme -- this can be null - amqProtocolSession.closeSession(); + if(amqProtocolSession != null) + { + amqProtocolSession.closeSession(); + } } public void sessionIdle(IoSession session, IdleStatus status) throws Exception diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index ed998b33c6..6b3b4f8021 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -24,6 +24,7 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.protocol.AMQProtocolWriter; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.AMQException; @@ -31,7 +32,7 @@ import org.apache.qpid.AMQException; import javax.security.sasl.SaslServer; -public interface AMQProtocolSession extends AMQProtocolWriter +public interface AMQProtocolSession extends AMQVersionAwareProtocolSession { @@ -144,8 +145,4 @@ public interface AMQProtocolSession extends AMQProtocolWriter void removeSessionCloseTask(Task task); - byte getProtocolMajorVersion(); - - byte getProtocolMinorVersion(); - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 23a5da0a30..7a16901796 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -53,7 +53,7 @@ public class AMQMessage */ private AMQProtocolSession _publisher; - private final long _messageId; + private final Long _messageId; private final AtomicInteger _referenceCount = new AtomicInteger(1); @@ -68,6 +68,13 @@ public class AMQMessage * messages published with the 'immediate' flag. */ private boolean _deliveredToConsumer; + /** + * We need to keep track of whether the message was 'immediate' + * as in extreme circumstances, when the checkDelieveredToConsumer + * is called, the message may already have been received and acknowledged, + * and the body removed from the store. + */ + private boolean _immediate; private AtomicBoolean _taken = new AtomicBoolean(false); @@ -160,11 +167,12 @@ public class AMQMessage } } - public AMQMessage(long messageId, BasicPublishBody publishBody, + public AMQMessage(Long messageId, BasicPublishBody publishBody, TransactionalContext txnContext) { _messageId = messageId; _txnContext = txnContext; + _immediate = publishBody.immediate; _transientMessageData.setPublishBody(publishBody); _taken = new AtomicBoolean(false); @@ -183,7 +191,7 @@ public class AMQMessage * @param factory * @throws AMQException */ - public AMQMessage(long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException + public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException { _messageId = messageId; _messageHandle = factory.createMessageHandle(messageId, store, true); @@ -198,7 +206,7 @@ public class AMQMessage * @param txnContext * @param contentHeader */ - public AMQMessage(long messageId, BasicPublishBody publishBody, + public AMQMessage(Long messageId, BasicPublishBody publishBody, TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException { this(messageId, publishBody, txnContext); @@ -216,7 +224,7 @@ public class AMQMessage * @param contentBodies * @throws AMQException */ - public AMQMessage(long messageId, BasicPublishBody publishBody, + public AMQMessage(Long messageId, BasicPublishBody publishBody, TransactionalContext txnContext, ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentBody> contentBodies, MessageStore messageStore, StoreContext storeContext, @@ -293,8 +301,9 @@ public class AMQMessage public boolean addContentBodyFrame(StoreContext storeContext, ContentBody contentBody) throws AMQException { _transientMessageData.addBodyLength(contentBody.getSize()); - _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody); - if (isAllContentReceived()) + final boolean allContentReceived = isAllContentReceived(); + _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody, allContentReceived); + if (allContentReceived) { deliver(storeContext); return true; @@ -348,6 +357,7 @@ public class AMQMessage { _log.debug("Ref count on message " + _messageId + " is zero; removing message"); } + // must check if the handle is null since there may be cases where we decide to throw away a message // and the handle has not yet been constructed if (_messageHandle != null) @@ -372,6 +382,10 @@ public class AMQMessage Thread.dumpStack(); } } + if(_referenceCount.get()<0) + { + throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0."); + } } } @@ -464,8 +478,8 @@ public class AMQMessage */ public void checkDeliveredToConsumer() throws NoConsumersException, AMQException { - BasicPublishBody pb = getPublishBody(); - if (pb.immediate && !_deliveredToConsumer) + + if (_immediate && !_deliveredToConsumer) { throw new NoConsumersException(this); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java index ef5d460b9b..d788d1b9e2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -35,17 +35,17 @@ import org.apache.qpid.framing.ContentHeaderBody; */ public interface AMQMessageHandle { - ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException; + ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException; /** * @return the number of body frames associated with this message */ - int getBodyCount(long messageId) throws AMQException; + int getBodyCount(Long messageId) throws AMQException; /** * @return the size of the body */ - long getBodySize(long messageId) throws AMQException; + long getBodySize(Long messageId) throws AMQException; /** * Get a particular content body @@ -53,25 +53,25 @@ public interface AMQMessageHandle * @return a content body * @throws IllegalArgumentException if the index is invalid */ - ContentBody getContentBody(long messageId, int index) throws IllegalArgumentException, AMQException; + ContentBody getContentBody(Long messageId, int index) throws IllegalArgumentException, AMQException; - void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException; + void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException; - BasicPublishBody getPublishBody(long messageId) throws AMQException; + BasicPublishBody getPublishBody(Long messageId) throws AMQException; boolean isRedelivered(); void setRedelivered(boolean redelivered); - boolean isPersistent(long messageId) throws AMQException; + boolean isPersistent(Long messageId) throws AMQException; - void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody, + void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody) throws AMQException; - void removeMessage(StoreContext storeContext, long messageId) throws AMQException; + void removeMessage(StoreContext storeContext, Long messageId) throws AMQException; - void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException; + void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException; - void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException; + void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException; }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 709dd28ad5..ab7bbefe92 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -252,6 +252,12 @@ public class AMQQueue implements Managable, Comparable return _deliveryMgr.getMessages(); } + public long getQueueDepth() + { + return _deliveryMgr.getTotalMessageSize(); + } + + /** * @param messageId * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. @@ -315,13 +321,13 @@ public class AMQQueue implements Managable, Comparable _maxMessageCount = value; } - public Long getMaximumQueueDepth() + public long getMaximumQueueDepth() { return _maxQueueDepth; } // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(Long value) + public void setMaximumQueueDepth(long value) { _maxQueueDepth = value; } @@ -625,4 +631,6 @@ public class AMQQueue implements Managable, Comparable _deleteTaskList.add(task); } + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index ab67012b19..e15dc648f7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -191,25 +191,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue */ public Long getQueueDepth() throws JMException { - List<AMQMessage> list = _queue.getMessagesOnTheQueue(); - if (list.size() == 0) - { - return 0l; - } + return getQueueDepthKb(); + } - long queueDepth = 0; - try - { - for (AMQMessage message : list) - { - queueDepth = queueDepth + getMessageSize(message); - } - } - catch (AMQException e) - { - throw new JMException("Unable to get message size: " + e); - } - return (long) Math.round(queueDepth / 1000); + public long getQueueDepthKb() + { + long queueBytesSize = _queue.getQueueDepth(); + return queueBytesSize >> 10 ; } /** @@ -245,7 +233,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue } // Check for threshold queue depth in bytes - long queueDepth = getQueueDepth(); + long queueDepth = getQueueDepthKb(); if (queueDepth >= _queue.getMaximumQueueDepth()) { notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index ba4d0bf4ba..c0c0970c48 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -38,6 +38,7 @@ import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** @@ -83,6 +84,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue. */ private ReentrantLock _lock = new ReentrantLock(); + private AtomicLong _totalMessageSize = new AtomicLong(); ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) @@ -116,6 +118,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _messages.offer(msg); + _totalMessageSize.addAndGet(msg.getSize()); + return true; } @@ -150,6 +154,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } + + public long getTotalMessageSize() + { + return _totalMessageSize.get(); + } + + public synchronized List<AMQMessage> getMessages() { return new ArrayList<AMQMessage>(_messages); @@ -213,6 +224,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount()); + _totalMessageSize.addAndGet(-msg.getSize()); } } finally @@ -224,6 +236,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException { AMQMessage msg = poll(); @@ -231,6 +244,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { msg.dequeue(storeContext, _queue); } + _totalMessageSize.getAndAdd(-msg.getSize()); } public synchronized long clearAllMessages(StoreContext storeContext) throws AMQException @@ -241,8 +255,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { msg.dequeue(storeContext, _queue); count++; + _totalMessageSize.set(0L); msg = poll(); + } + return count; } @@ -292,6 +309,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //remove sent message from our queue. messageQueue.poll(); + _totalMessageSize.addAndGet(-message.getSize()); } catch (AMQException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 6954be8473..7d7ede0732 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -81,4 +81,6 @@ interface DeliveryManager void populatePreDeliveryQueue(Subscription subscription); boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException; + + long getTotalMessageSize(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java index e2758ce6cb..186a5c8753 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -47,22 +47,22 @@ public class InMemoryMessageHandle implements AMQMessageHandle { } - public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException + public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException { return _contentHeaderBody; } - public int getBodyCount(long messageId) + public int getBodyCount(Long messageId) { return _contentBodies.size(); } - public long getBodySize(long messageId) throws AMQException + public long getBodySize(Long messageId) throws AMQException { return getContentHeaderBody(messageId).bodySize; } - public ContentBody getContentBody(long messageId, int index) throws AMQException, IllegalArgumentException + public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException { if (index > _contentBodies.size() - 1) { @@ -72,13 +72,13 @@ public class InMemoryMessageHandle implements AMQMessageHandle return _contentBodies.get(index); } - public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) + public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException { _contentBodies.add(contentBody); } - public BasicPublishBody getPublishBody(long messageId) throws AMQException + public BasicPublishBody getPublishBody(Long messageId) throws AMQException { return _publishBody; } @@ -94,7 +94,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle _redelivered = redelivered; } - public boolean isPersistent(long messageId) throws AMQException + public boolean isPersistent(Long messageId) throws AMQException { //todo remove literal values to a constant file such as AMQConstants in common ContentHeaderBody chb = getContentHeaderBody(messageId); @@ -108,7 +108,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle * @param contentHeaderBody * @throws AMQException */ - public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody, + public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody) throws AMQException { @@ -116,17 +116,17 @@ public class InMemoryMessageHandle implements AMQMessageHandle _contentHeaderBody = contentHeaderBody; } - public void removeMessage(StoreContext storeContext, long messageId) throws AMQException + public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException { // NO OP } - public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException + public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException { // NO OP } - public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException + public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException { // NO OP } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java index bfe0a0ecf1..1e7e6f03d2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java @@ -32,4 +32,8 @@ public class MessageCleanupException extends AMQException { super("Failed to cleanup message with id " + messageId, e); } + public MessageCleanupException(String message) + { + super(message); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java index a7ada2c1f8..94ab935115 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java @@ -31,7 +31,7 @@ import org.apache.qpid.server.store.MessageStore; public class MessageHandleFactory { - public AMQMessageHandle createMessageHandle(long messageId, MessageStore store, boolean persistent) + public AMQMessageHandle createMessageHandle(Long messageId, MessageStore store, boolean persistent) { // just hardcoded for now if (persistent) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index 2fb2bdd2e3..50051fdc34 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -32,6 +32,7 @@ import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; import java.util.LinkedList; +import java.util.Collections; /** * @author Robert Greig (robert.j.greig@jpmorgan.com) @@ -48,12 +49,13 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle private final MessageStore _messageStore; + public WeakReferenceMessageHandle(MessageStore messageStore) { _messageStore = messageStore; } - public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException + public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException { ContentHeaderBody chb = (_contentHeaderBody != null?_contentHeaderBody.get():null); if (chb == null) @@ -66,7 +68,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle return chb; } - public int getBodyCount(long messageId) throws AMQException + public int getBodyCount(Long messageId) throws AMQException { if (_contentBodies == null) { @@ -81,12 +83,12 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle return _contentBodies.size(); } - public long getBodySize(long messageId) throws AMQException + public long getBodySize(Long messageId) throws AMQException { return getContentHeaderBody(messageId).bodySize; } - public ContentBody getContentBody(long messageId, int index) throws AMQException, IllegalArgumentException + public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException { if (index > _contentBodies.size() - 1) { @@ -108,19 +110,30 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle * @param storeContext * @param messageId * @param contentBody + * @param isLastContentBody * @throws AMQException */ - public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException + public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException { - if (_contentBodies == null) + if(_contentBodies == null && isLastContentBody) { - _contentBodies = new LinkedList<WeakReference<ContentBody>>(); + _contentBodies = Collections.singletonList(new WeakReference<ContentBody>(contentBody)); + + } + else + { + if (_contentBodies == null) + { + _contentBodies = new LinkedList<WeakReference<ContentBody>>(); + } + + + _contentBodies.add(new WeakReference<ContentBody>(contentBody)); } - _contentBodies.add(new WeakReference<ContentBody>(contentBody)); - _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody); + _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody); } - public BasicPublishBody getPublishBody(long messageId) throws AMQException + public BasicPublishBody getPublishBody(Long messageId) throws AMQException { BasicPublishBody bpb = (_publishBody != null?_publishBody.get():null); if (bpb == null) @@ -143,7 +156,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle _redelivered = redelivered; } - public boolean isPersistent(long messageId) throws AMQException + public boolean isPersistent(Long messageId) throws AMQException { //todo remove literal values to a constant file such as AMQConstants in common ContentHeaderBody chb = getContentHeaderBody(messageId); @@ -157,7 +170,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle * @param contentHeaderBody * @throws AMQException */ - public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody, + public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody) throws AMQException { @@ -173,17 +186,17 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody); } - public void removeMessage(StoreContext storeContext, long messageId) throws AMQException + public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException { _messageStore.removeMessage(storeContext, messageId); } - public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException + public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException { _messageStore.enqueueMessage(storeContext, queue.getName(), messageId); } - public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException + public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException { _messageStore.dequeueMessage(storeContext, queue.getName(), messageId); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 7b8ba1d9cc..69b12ec4e5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -36,6 +36,7 @@ import org.apache.log4j.Logger; import java.util.HashMap; import java.util.Map; +import java.util.EnumMap; import java.util.concurrent.CopyOnWriteArraySet; /** @@ -59,8 +60,9 @@ public class AMQStateManager implements AMQMethodListener * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. * The class must be a subclass of AMQFrame. */ - private final Map<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap = - new HashMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(); + private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap = + new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(AMQState.class); + private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>(); @@ -83,14 +85,7 @@ public class AMQStateManager implements AMQMethodListener protected void registerListeners() { - Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> frame2handlerMap = - new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>(); - - // we need to register a map for the null (i.e. all state) handlers otherwise you get - // a stack overflow in the handler searching code when you present it with a frame for which - // no handlers are registered - // - _state2HandlersMap.put(null, frame2handlerMap); + Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> frame2handlerMap; frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>(); frame2handlerMap.put(ConnectionStartOkBody.class, ConnectionStartOkMethodHandler.getInstance()); @@ -205,26 +200,14 @@ public class AMQStateManager implements AMQMethodListener final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> classToHandlerMap = _state2HandlersMap.get(currentState); - if (classToHandlerMap == null) - { - // if no specialised per state handler is registered look for a - // handler registered for "all" states - return findStateTransitionHandler(null, frame); - } - final StateAwareMethodListener<B> handler = (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass()); + final StateAwareMethodListener<B> handler = classToHandlerMap == null + ? null + : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass()); + if (handler == null) { - if (currentState == null) - { - _logger.debug("No state transition handler defined for receiving frame " + frame); - return null; - } - else - { - // if no specialised per state handler is registered look for a - // handler registered for "all" states - return findStateTransitionHandler(null, frame); - } + _logger.debug("No state transition handler defined for receiving frame " + frame); + return null; } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index eaaffa2dce..1f4cc7530d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -27,11 +27,11 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.List; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; @@ -87,7 +87,7 @@ public class MemoryMessageStore implements MessageStore } } - public void removeMessage(StoreContext context, long messageId) + public void removeMessage(StoreContext context, Long messageId) { if (_log.isDebugEnabled()) { @@ -107,12 +107,12 @@ public class MemoryMessageStore implements MessageStore // Not required to do anything } - public void enqueueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException + public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException { // Not required to do anything } - public void dequeueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException + public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException { // Not required to do anything } @@ -142,36 +142,44 @@ public class MemoryMessageStore implements MessageStore return null; } - public long getNewMessageId() + public Long getNewMessageId() { return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody) + public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException { List<ContentBody> bodyList = _contentBodyMap.get(messageId); - if (bodyList == null) + + if(bodyList == null && lastContentBody) { - bodyList = new ArrayList<ContentBody>(); - _contentBodyMap.put(messageId, bodyList); + _contentBodyMap.put(messageId, Collections.singletonList(contentBody)); } + else + { + if (bodyList == null) + { + bodyList = new ArrayList<ContentBody>(); + _contentBodyMap.put(messageId, bodyList); + } - bodyList.add(index, contentBody); + bodyList.add(index, contentBody); + } } - public void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData) + public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException { _metaDataMap.put(messageId, messageMetaData); } - public MessageMetaData getMessageMetaData(long messageId) throws AMQException + public MessageMetaData getMessageMetaData(Long messageId) throws AMQException { return _metaDataMap.get(messageId); } - public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException + public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException { List<ContentBody> bodyList = _contentBodyMap.get(messageId); return bodyList.get(index); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index 8daad0e5e5..b4ccd2cc51 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -50,15 +50,15 @@ public interface MessageStore */ void close() throws Exception; - void removeMessage(StoreContext storeContext, long messageId) throws AMQException; + void removeMessage(StoreContext storeContext, Long messageId) throws AMQException; void createQueue(AMQQueue queue) throws AMQException; void removeQueue(AMQShortString name) throws AMQException; - void enqueueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException; + void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException; - void dequeueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException; + void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException; void beginTran(StoreContext context) throws AMQException; @@ -79,14 +79,14 @@ public interface MessageStore * Return a valid, currently unused message id. * @return a message id */ - long getNewMessageId(); + Long getNewMessageId(); - void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody) throws AMQException; + void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException; - void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData) throws AMQException; + void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException; - MessageMetaData getMessageMetaData(long messageId) throws AMQException; + MessageMetaData getMessageMetaData(Long messageId) throws AMQException; - ContentBody getContentBodyChunk(long messageId, int index) throws AMQException; + ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index 36287d2923..ebeea8d2b4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer; public abstract class AMQBody { - protected abstract byte getFrameType(); + public abstract byte getFrameType(); /** * Get the size of the body diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 552c8e599e..e426651588 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -24,25 +24,29 @@ import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import java.util.HashMap; import java.util.Map; public class AMQDataBlockDecoder { - Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class); + private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY"; - private final Map _supportedBodies = new HashMap(); + private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; - private final static BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; static { - _bodiesSupported[AMQMethodBody.TYPE] = AMQMethodBodyFactory.getInstance(); _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance(); _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance(); _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); } + + Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class); + + + public AMQDataBlockDecoder() { } @@ -55,52 +59,57 @@ public class AMQDataBlockDecoder { return false; } - - final byte type = in.get(); - final int channel = in.getUnsignedShort(); + in.skip(1 + 2); final long bodySize = in.getUnsignedInt(); - // bodySize can be zero - if (type <= 0 || channel < 0 || bodySize < 0) - { - throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel + - " bodySize = " + bodySize); - } + return (remainingAfterAttributes >= bodySize); } - private boolean isSupportedFrameType(byte frameType) + + protected Object createAndPopulateFrame(IoSession session, ByteBuffer in) + throws AMQFrameDecodingException, AMQProtocolVersionException { - final boolean result = _bodiesSupported[frameType] != null; + final byte type = in.get(); + + BodyFactory bodyFactory; + if(type == AMQMethodBody.TYPE) + { + bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); + if(bodyFactory == null) + { + AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); + bodyFactory = new AMQMethodBodyFactory(protocolSession); + session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); + + } - if (!result) + } + else { - _logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType); + bodyFactory = _bodiesSupported[type]; } - return result; - } - protected Object createAndPopulateFrame(ByteBuffer in) - throws AMQFrameDecodingException, AMQProtocolVersionException - { - final byte type = in.get(); - BodyFactory bodyFactory = _bodiesSupported[type]; - if (!isSupportedFrameType(type)) + + + if(bodyFactory == null) { throw new AMQFrameDecodingException("Unsupported frame type: " + type); } + final int channel = in.getUnsignedShort(); final long bodySize = in.getUnsignedInt(); - /* - if (bodyFactory == null) + // bodySize can be zero + if (channel < 0 || bodySize < 0) { - throw new AMQFrameDecodingException("Unsupported body type: " + type); + throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); } - */ + AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); @@ -115,6 +124,6 @@ public class AMQDataBlockDecoder public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - out.write(createAndPopulateFrame(in)); + out.write(createAndPopulateFrame(session, in)); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java index 3446563d35..478cdeb406 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -28,17 +28,16 @@ import org.apache.mina.filter.codec.demux.MessageEncoder; import java.util.HashSet; import java.util.Set; +import java.util.Collections; -public class AMQDataBlockEncoder implements MessageEncoder +public final class AMQDataBlockEncoder implements MessageEncoder { - Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class); + private static final Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class); - private Set _messageTypes; + private final Set _messageTypes = Collections.singleton(EncodableAMQDataBlock.class); public AMQDataBlockEncoder() { - _messageTypes = new HashSet(); - _messageTypes.add(EncodableAMQDataBlock.class); } public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 9e98d9792b..11f505fd4b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -24,59 +24,52 @@ import org.apache.mina.common.ByteBuffer; public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock { - public int channel; + private final int _channel; - public AMQBody bodyFrame; + private final AMQBody _bodyFrame; - public AMQFrame() - { - } - public AMQFrame(int channel, AMQBody bodyFrame) + + public AMQFrame(final int channel, final AMQBody bodyFrame) { - this.channel = channel; - this.bodyFrame = bodyFrame; + _channel = channel; + _bodyFrame = bodyFrame; } - public AMQFrame(ByteBuffer in, int channel, long bodySize, BodyFactory bodyFactory) throws AMQFrameDecodingException + public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException { - this.channel = channel; - this.bodyFrame = bodyFactory.createBody(in,bodySize); + this._channel = channel; + this._bodyFrame = bodyFactory.createBody(in,bodySize); } public long getSize() { - return 1 + 2 + 4 + bodyFrame.getSize() + 1; + return 1 + 2 + 4 + _bodyFrame.getSize() + 1; } public void writePayload(ByteBuffer buffer) { - buffer.put(bodyFrame.getFrameType()); - // TODO: how does channel get populated - EncodingUtils.writeUnsignedShort(buffer, channel); - EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize()); - bodyFrame.writePayload(buffer); + buffer.put(_bodyFrame.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, _channel); + EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize()); + _bodyFrame.writePayload(buffer); buffer.put((byte) 0xCE); } - /** - * - * @param buffer - * @param channel unsigned short - * @param bodySize unsigned integer - * @param bodyFactory - * @throws AMQFrameDecodingException - */ - public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory) - throws AMQFrameDecodingException, AMQProtocolVersionException + public final int getChannel() + { + return _channel; + } + + public final AMQBody getBodyFrame() { - this.channel = channel; - bodyFrame = bodyFactory.createBody(buffer, bodySize); - + return _bodyFrame; } + + public String toString() { - return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame); + return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java index cd178a6197..3fa5b150ab 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java @@ -57,7 +57,7 @@ public abstract class AMQMethodBody extends AMQBody protected abstract void writeMethodPayload(ByteBuffer buffer); - protected byte getFrameType() + public byte getFrameType() { return TYPE; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java index 95b461b6dc..5293c00379 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java @@ -22,30 +22,21 @@ package org.apache.qpid.framing; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; public class AMQMethodBodyFactory implements BodyFactory { private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class); + + private final AMQVersionAwareProtocolSession _protocolSession; - private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory(); - - public static AMQMethodBodyFactory getInstance() - { - return _instance; - } - - private AMQMethodBodyFactory() + public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession) { - _log.debug("Creating method body factory"); + _protocolSession = protocolSession; } public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException { - // AMQP version change: MethodBodyDecoderRegistry is obsolete, since all the XML - // segments generated together are now handled by MainRegistry. The Cluster class, - // if generated together with amqp.xml is a part of MainRegistry. - // TODO: Connect with version acquired from ProtocolInitiation class. - return MainRegistry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), - (byte)8, (byte)0, in, bodySize); + return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java index c0a12a9aad..cfbc9d1828 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java @@ -6,4 +6,5 @@ import org.apache.mina.common.ByteBuffer; public abstract interface AMQMethodBodyInstanceFactory
{
public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+ public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index f536d73469..47d349a675 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -260,24 +260,14 @@ public final class AMQShortString implements CharSequence final AMQShortString otherString = (AMQShortString) o;
- if(otherString.length() != length())
- {
- return false;
- }
if((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
{
return false;
}
- final int size = length();
- for(int i = 0; i < size; i++)
- {
- if(_data.get(i) != otherString._data.get(i))
- {
- return false;
- }
- }
- return true;
+ return _data.equals(otherString._data);
+
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index baeecaa17a..c35fc0a6c4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -49,7 +49,7 @@ public class ContentBody extends AMQBody this.payload = payload; } - protected byte getFrameType() + public byte getFrameType() { return TYPE; } @@ -98,9 +98,7 @@ public class ContentBody extends AMQBody public static AMQFrame createAMQFrame(int channelId, ContentBody body) { - final AMQFrame frame = new AMQFrame(); - frame.channel = channelId; - frame.bodyFrame = body; + final AMQFrame frame = new AMQFrame(channelId, body); return frame; } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 45280bdae3..02631a5f88 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -65,7 +65,7 @@ public class ContentHeaderBody extends AMQBody this.bodySize = bodySize; } - protected byte getFrameType() + public byte getFrameType() { return TYPE; } @@ -113,17 +113,11 @@ public class ContentHeaderBody extends AMQBody public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, long bodySize) { - final AMQFrame frame = new AMQFrame(); - frame.channel = channelId; - frame.bodyFrame = new ContentHeaderBody(classId, weight, properties, bodySize); - return frame; + return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize)); } public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body) { - final AMQFrame frame = new AMQFrame(); - frame.channel = channelId; - frame.bodyFrame = body; - return frame; + return new AMQFrame(channelId, body); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index ca03f29047..7246c4a1cf 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -41,7 +41,7 @@ public class HeartbeatBody extends AMQBody } } - protected byte getFrameType() + public byte getFrameType() { return TYPE; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java new file mode 100644 index 0000000000..9bc8232d61 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java @@ -0,0 +1,141 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
+
+public class VersionSpecificRegistry
+{
+ private static final Logger _log = Logger.getLogger(VersionSpecificRegistry.class);
+
+
+ private final byte _protocolMajorVersion;
+ private final byte _protocolMinorVersion;
+
+ private static final int DEFAULT_MAX_CLASS_ID = 200;
+ private static final int DEFAULT_MAX_METHOD_ID = 50;
+
+ private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][];
+
+ public VersionSpecificRegistry(byte major, byte minor)
+ {
+ _protocolMajorVersion = major;
+ _protocolMinorVersion = minor;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+
+ public AMQMethodBodyInstanceFactory getMethodBody(final short classID, final short methodID)
+ {
+ try
+ {
+ return _registry[classID][methodID];
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ return null;
+ }
+ catch (NullPointerException e)
+ {
+ return null;
+ }
+ }
+
+ public void registerMethod(final short classID, final short methodID, final AMQMethodBodyInstanceFactory instanceFactory)
+ {
+ if(_registry.length <= classID)
+ {
+ AMQMethodBodyInstanceFactory[][] oldRegistry = _registry;
+ _registry = new AMQMethodBodyInstanceFactory[classID+1][];
+ System.arraycopy(oldRegistry, 0, _registry, 0, oldRegistry.length);
+ }
+
+ if(_registry[classID] == null)
+ {
+ _registry[classID] = new AMQMethodBodyInstanceFactory[methodID > DEFAULT_MAX_METHOD_ID ? methodID + 1 : DEFAULT_MAX_METHOD_ID + 1];
+ }
+ else if(_registry[classID].length <= methodID)
+ {
+ AMQMethodBodyInstanceFactory[] oldMethods = _registry[classID];
+ _registry[classID] = new AMQMethodBodyInstanceFactory[methodID+1];
+ System.arraycopy(oldMethods,0,_registry[classID],0,oldMethods.length);
+ }
+
+ _registry[classID][methodID] = instanceFactory;
+
+ }
+
+
+ public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size)
+ throws AMQFrameDecodingException
+ {
+ AMQMethodBodyInstanceFactory bodyFactory;
+ try
+ {
+ bodyFactory = _registry[classID][methodID];
+ }
+ catch(NullPointerException e)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+ }
+ catch(IndexOutOfBoundsException e)
+ {
+ if(classID >= _registry.length)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+
+ }
+ else
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+
+ }
+ }
+
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+ }
+
+
+ return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size);
+
+
+ }
+}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java new file mode 100644 index 0000000000..a2d3de2f9e --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -0,0 +1,29 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.VersionSpecificRegistry;
+
+public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, ProtocolVersionAware
+{
+ public VersionSpecificRegistry getRegistry();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java new file mode 100644 index 0000000000..64db953bc2 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java @@ -0,0 +1,28 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+public interface ProtocolVersionAware
+{
+ public byte getProtocolMinorVersion();
+
+ public byte getProtocolMajorVersion();
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 385b5b598a..bf422742b5 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -26,7 +26,6 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.List; @@ -88,7 +87,7 @@ public class SkeletonMessageStore implements MessageStore return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody) throws AMQException + public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException { } |