From 58fb658d72dd0a0d750fdd3f97e607cf3d2e9134 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 15 Jul 2013 23:32:24 +0000 Subject: QPID-4659 : [Java Broker] tidy up amqp 0-8 implementation, reduce unnecessary usage in tests git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503523 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/logging/actors/AMQPConnectionActor.java | 2 - .../qpid/server/protocol/AMQProtocolSession.java | 231 ---------------- .../qpid/server/protocol/v0_8/AMQChannel.java | 2 - .../server/protocol/v0_8/AMQProtocolEngine.java | 1 - .../server/protocol/v0_8/AMQProtocolSession.java | 231 ++++++++++++++++ .../qpid/server/protocol/v0_8/IncomingMessage.java | 289 +++++++++++++++++++++ .../server/protocol/v0_8/SubscriptionFactory.java | 1 - .../protocol/v0_8/SubscriptionFactoryImpl.java | 1 - .../server/protocol/v0_8/SubscriptionImpl.java | 1 - .../v0_8/handler/AccessRequestHandler.java | 2 +- .../v0_8/handler/BasicAckMethodHandler.java | 2 +- .../v0_8/handler/BasicCancelMethodHandler.java | 2 +- .../v0_8/handler/BasicConsumeMethodHandler.java | 2 +- .../v0_8/handler/BasicGetMethodHandler.java | 2 +- .../v0_8/handler/BasicPublishMethodHandler.java | 2 +- .../protocol/v0_8/handler/BasicQosHandler.java | 2 +- .../v0_8/handler/BasicRecoverMethodHandler.java | 2 +- .../handler/BasicRecoverSyncMethodHandler.java | 2 +- .../v0_8/handler/BasicRejectMethodHandler.java | 2 +- .../protocol/v0_8/handler/ChannelCloseHandler.java | 2 +- .../protocol/v0_8/handler/ChannelFlowHandler.java | 2 +- .../protocol/v0_8/handler/ChannelOpenHandler.java | 2 +- .../v0_8/handler/ConnectionCloseMethodHandler.java | 2 +- .../handler/ConnectionCloseOkMethodHandler.java | 2 +- .../v0_8/handler/ConnectionOpenMethodHandler.java | 2 +- .../handler/ConnectionSecureOkMethodHandler.java | 2 +- .../handler/ConnectionStartOkMethodHandler.java | 2 +- .../handler/ConnectionTuneOkMethodHandler.java | 2 +- .../v0_8/handler/ExchangeBoundHandler.java | 2 +- .../v0_8/handler/ExchangeDeclareHandler.java | 2 +- .../v0_8/handler/ExchangeDeleteHandler.java | 2 +- .../protocol/v0_8/handler/QueueBindHandler.java | 2 +- .../protocol/v0_8/handler/QueueDeclareHandler.java | 2 +- .../protocol/v0_8/handler/QueueDeleteHandler.java | 2 +- .../protocol/v0_8/handler/QueuePurgeHandler.java | 2 +- .../protocol/v0_8/handler/QueueUnbindHandler.java | 2 +- .../protocol/v0_8/handler/TxCommitHandler.java | 2 +- .../protocol/v0_8/handler/TxRollbackHandler.java | 2 +- .../protocol/v0_8/handler/TxSelectHandler.java | 2 +- .../v0_8/output/ProtocolOutputConverter.java | 2 +- .../v0_8/output/ProtocolOutputConverterImpl.java | 2 +- .../output/ProtocolOutputConverterRegistry.java | 2 +- .../protocol/v0_8/state/AMQStateManager.java | 2 +- .../apache/qpid/server/queue/IncomingMessage.java | 288 -------------------- .../apache/qpid/server/ack/AcknowledgeTest.java | 180 ------------- .../qpid/server/exchange/TopicExchangeTest.java | 221 +++++----------- .../logging/actors/AMQPChannelActorTest.java | 4 +- .../actors/BaseConnectionActorTestCase.java | 24 +- .../server/logging/actors/CurrentActorTest.java | 8 +- .../qpid/server/logging/actors/QueueActorTest.java | 2 +- .../logging/actors/SubscriptionActorTest.java | 2 +- .../logging/subjects/AbstractTestLogSubject.java | 4 +- .../logging/subjects/ChannelLogSubjectTest.java | 13 +- .../logging/subjects/ConnectionLogSubjectTest.java | 37 +-- .../subjects/SubscriptionLogSubjectTest.java | 117 --------- .../server/protocol/AMQProtocolEngineTest.java | 73 ------ .../qpid/server/protocol/v0_8/AMQChannelTest.java | 1 - .../protocol/v0_8/AMQProtocolEngineTest.java | 72 +++++ .../apache/qpid/server/protocol/v0_8/AckTest.java | 4 +- .../qpid/server/protocol/v0_8/AcknowledgeTest.java | 181 +++++++++++++ .../server/protocol/v0_8/BrokerTestHelper_0_8.java | 99 +++++++ .../qpid/server/protocol/v0_8/MaxChannelsTest.java | 4 +- .../protocol/v0_8/QueueBrowserUsesNoAckTest.java | 149 +++++++++++ .../protocol/v0_8/SubscriptionFactoryImplTest.java | 96 +++++++ .../protocol/v0_8/SubscriptionLogSubjectTest.java | 115 ++++++++ .../qpid/server/queue/SimpleAMQQueueTest.java | 1 + .../subscription/QueueBrowserUsesNoAckTest.java | 147 ----------- .../subscription/SubscriptionFactoryImplTest.java | 98 ------- .../apache/qpid/server/util/BrokerTestHelper.java | 70 ++--- .../apache/qpid/server/store/MessageStoreTest.java | 2 +- 70 files changed, 1401 insertions(+), 1438 deletions(-) delete mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java delete mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolEngineTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionLogSubjectTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java delete mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java index a4bd002f01..99e11bf9ae 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java @@ -23,8 +23,6 @@ package org.apache.qpid.server.logging.actors; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQProtocolSession; - /** diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java deleted file mode 100644 index 5ed7017e46..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import java.net.SocketAddress; -import java.security.Principal; -import java.util.List; -import java.util.concurrent.locks.Lock; - -import javax.security.auth.Subject; -import javax.security.sasl.SaslServer; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.MethodDispatcher; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; -import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.subscription.ClientDeliveryMethod; -import org.apache.qpid.server.virtualhost.VirtualHost; - - -public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel -{ - long getSessionID(); - - LogActor getLogActor(); - - void setMaxFrameSize(long frameMax); - - long getMaxFrameSize(); - - boolean isClosing(); - - void flushBatched(); - - void setDeferFlush(boolean defer); - - ClientDeliveryMethod createDeliveryMethod(int channelId); - - long getLastReceivedTime(); - - /** - * Return the local socket address for the connection - * - * @return the socket address - */ - SocketAddress getLocalAddress(); - - public static interface Task - { - public void doTask(AMQProtocolSession session) throws AMQException; - } - - /** - * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC - * 6). - * - * @return the context key - */ - AMQShortString getContextKey(); - - /** - * Set the context key associated with this session. Context key is described in the AMQ protocol specification (RFC - * 6). - * - * @param contextKey the context key - */ - void setContextKey(AMQShortString contextKey); - - /** - * Get the channel for this session associated with the specified id. A channel id is unique per connection (i.e. - * per session). - * - * @param channelId the channel id which must be valid - * - * @return null if no channel exists, the channel otherwise - */ - AMQChannel getChannel(int channelId); - - /** - * Associate a channel with this session. - * - * @param channel the channel to associate with this session. It is an error to associate the same channel with more - * than one session but this is not validated. - */ - void addChannel(AMQChannel channel) throws AMQException; - - /** - * Close a specific channel. This will remove any resources used by the channel, including: - * - * @param channelId id of the channel to close - * - * @throws org.apache.qpid.AMQException if an error occurs closing the channel - * @throws IllegalArgumentException if the channel id is not valid - */ - void closeChannel(int channelId) throws AMQException; - - void closeChannel(int channelId, AMQConstant cause, String message) throws AMQException; - - /** - * Markes the specific channel as closed. This will release the lock for that channel id so a new channel can be - * created on that id. - * - * @param channelId id of the channel to close - */ - void closeChannelOk(int channelId); - - /** - * Check to see if this chanel is closing - * - * @param channelId id to check - * @return boolean with state of channel awaiting closure - */ - boolean channelAwaitingClosure(int channelId); - - /** - * Remove a channel from the session but do not close it. - * - * @param channelId - */ - void removeChannel(int channelId); - - /** - * Initialise heartbeats on the session. - * - * @param delay delay in seconds (not ms) - */ - void initHeartbeats(int delay); - - /** This must be called when the session is _closed in order to free up any resources managed by the session. */ - void closeSession() throws AMQException; - - void closeProtocolSession(); - - /** @return a key that uniquely identifies this session */ - Object getKey(); - - /** - * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may - * be bound to multiple addresses this could vary depending on the acceptor this session was created from. - * - * @return a String FQDN - */ - String getLocalFQDN(); - - /** @return the sasl server that can perform authentication for this session. */ - SaslServer getSaslServer(); - - /** - * Set the sasl server that is to perform authentication for this session. - * - * @param saslServer - */ - void setSaslServer(SaslServer saslServer); - - void setClientProperties(FieldTable clientProperties); - - Object getReference(); - - VirtualHost getVirtualHost(); - - void setVirtualHost(VirtualHost virtualHost) throws AMQException; - - void addSessionCloseTask(Task task); - - void removeSessionCloseTask(Task task); - - public ProtocolOutputConverter getProtocolOutputConverter(); - - void setAuthorizedSubject(Subject authorizedSubject); - - public java.net.SocketAddress getRemoteAddress(); - - public MethodRegistry getMethodRegistry(); - - public MethodDispatcher getMethodDispatcher(); - - String getClientVersion(); - - long getLastIoTime(); - - long getWrittenBytes(); - - Long getMaximumNumberOfChannels(); - - void setMaximumNumberOfChannels(Long value); - - void commitTransactions(AMQChannel channel) throws AMQException; - - void rollbackTransactions(AMQChannel channel) throws AMQException; - - List getChannels(); - - void mgmtCloseChannel(int channelId); - - public Principal getPeerPrincipal(); - - Lock getReceivedLock(); - - /** - * Used for 0-8/0-9/0-9-1 connections to choose to close - * the connection when a transactional session receives a 'mandatory' message which - * can't be routed rather than returning the message. - */ - boolean isCloseWhenNoRoute(); -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 6dac83a13f..6867ee7bb5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -71,12 +71,10 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.InboundMessageAdapter; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 82b1f6a193..dcf8d1fd47 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -73,7 +73,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java new file mode 100644 index 0000000000..559ab3468e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -0,0 +1,231 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import java.net.SocketAddress; +import java.security.Principal; +import java.util.List; +import java.util.concurrent.locks.Lock; + +import javax.security.auth.Subject; +import javax.security.sasl.SaslServer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; +import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.subscription.ClientDeliveryMethod; +import org.apache.qpid.server.virtualhost.VirtualHost; + + +public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel +{ + long getSessionID(); + + LogActor getLogActor(); + + void setMaxFrameSize(long frameMax); + + long getMaxFrameSize(); + + boolean isClosing(); + + void flushBatched(); + + void setDeferFlush(boolean defer); + + ClientDeliveryMethod createDeliveryMethod(int channelId); + + long getLastReceivedTime(); + + /** + * Return the local socket address for the connection + * + * @return the socket address + */ + SocketAddress getLocalAddress(); + + public static interface Task + { + public void doTask(AMQProtocolSession session) throws AMQException; + } + + /** + * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC + * 6). + * + * @return the context key + */ + AMQShortString getContextKey(); + + /** + * Set the context key associated with this session. Context key is described in the AMQ protocol specification (RFC + * 6). + * + * @param contextKey the context key + */ + void setContextKey(AMQShortString contextKey); + + /** + * Get the channel for this session associated with the specified id. A channel id is unique per connection (i.e. + * per session). + * + * @param channelId the channel id which must be valid + * + * @return null if no channel exists, the channel otherwise + */ + AMQChannel getChannel(int channelId); + + /** + * Associate a channel with this session. + * + * @param channel the channel to associate with this session. It is an error to associate the same channel with more + * than one session but this is not validated. + */ + void addChannel(AMQChannel channel) throws AMQException; + + /** + * Close a specific channel. This will remove any resources used by the channel, including: + * + * @param channelId id of the channel to close + * + * @throws org.apache.qpid.AMQException if an error occurs closing the channel + * @throws IllegalArgumentException if the channel id is not valid + */ + void closeChannel(int channelId) throws AMQException; + + void closeChannel(int channelId, AMQConstant cause, String message) throws AMQException; + + /** + * Markes the specific channel as closed. This will release the lock for that channel id so a new channel can be + * created on that id. + * + * @param channelId id of the channel to close + */ + void closeChannelOk(int channelId); + + /** + * Check to see if this chanel is closing + * + * @param channelId id to check + * @return boolean with state of channel awaiting closure + */ + boolean channelAwaitingClosure(int channelId); + + /** + * Remove a channel from the session but do not close it. + * + * @param channelId + */ + void removeChannel(int channelId); + + /** + * Initialise heartbeats on the session. + * + * @param delay delay in seconds (not ms) + */ + void initHeartbeats(int delay); + + /** This must be called when the session is _closed in order to free up any resources managed by the session. */ + void closeSession() throws AMQException; + + void closeProtocolSession(); + + /** @return a key that uniquely identifies this session */ + Object getKey(); + + /** + * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may + * be bound to multiple addresses this could vary depending on the acceptor this session was created from. + * + * @return a String FQDN + */ + String getLocalFQDN(); + + /** @return the sasl server that can perform authentication for this session. */ + SaslServer getSaslServer(); + + /** + * Set the sasl server that is to perform authentication for this session. + * + * @param saslServer + */ + void setSaslServer(SaslServer saslServer); + + void setClientProperties(FieldTable clientProperties); + + Object getReference(); + + VirtualHost getVirtualHost(); + + void setVirtualHost(VirtualHost virtualHost) throws AMQException; + + void addSessionCloseTask(Task task); + + void removeSessionCloseTask(Task task); + + public ProtocolOutputConverter getProtocolOutputConverter(); + + void setAuthorizedSubject(Subject authorizedSubject); + + public java.net.SocketAddress getRemoteAddress(); + + public MethodRegistry getMethodRegistry(); + + public MethodDispatcher getMethodDispatcher(); + + String getClientVersion(); + + long getLastIoTime(); + + long getWrittenBytes(); + + Long getMaximumNumberOfChannels(); + + void setMaximumNumberOfChannels(Long value); + + void commitTransactions(AMQChannel channel) throws AMQException; + + void rollbackTransactions(AMQChannel channel) throws AMQException; + + List getChannels(); + + void mgmtCloseChannel(int channelId); + + public Principal getPeerPrincipal(); + + Lock getReceivedLock(); + + /** + * Used for 0-8/0-9/0-9-1 connections to choose to close + * the connection when a transactional session receives a 'mandatory' message which + * can't be routed rather than returning the message. + */ + boolean isCloseWhenNoRoute(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java new file mode 100644 index 0000000000..90c764daac --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java @@ -0,0 +1,289 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.Filterable; +import org.apache.qpid.server.store.StoredMessage; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource +{ + + /** Used for debugging purposes. */ + private static final Logger _logger = Logger.getLogger(IncomingMessage.class); + + private final MessagePublishInfo _messagePublishInfo; + private ContentHeaderBody _contentHeaderBody; + + + /** + * Keeps a track of how many bytes we have received in body frames + */ + private long _bodyLengthReceived = 0; + + /** + * This is stored during routing, to know the queues to which this message should immediately be + * delivered. It is cleared after delivery has been attempted. Any persistent record of destinations is done + * by the message handle. + */ + private List _destinationQueues; + + private long _expiration; + + private Exchange _exchange; + + private List _contentChunks = new ArrayList(); + + // we keep both the original meta data object and the store reference to it just in case the + // store would otherwise flow it to disk + + private MessageMetaData _messageMetaData; + + private StoredMessage _storedMessageHandle; + private Object _connectionReference; + + + public IncomingMessage( + final MessagePublishInfo info + ) + { + this(info, null); + } + + public IncomingMessage(MessagePublishInfo info, Object reference) + { + _messagePublishInfo = info; + _connectionReference = reference; + } + + public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException + { + _contentHeaderBody = contentHeaderBody; + } + + public void setExpiration() + { + _expiration = ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration(); + } + + public MessageMetaData headersReceived(long currentTime) + { + _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime); + return _messageMetaData; + } + + + public List getDestinationQueues() + { + return _destinationQueues; + } + + public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException + { + _bodyLengthReceived += contentChunk.getSize(); + _contentChunks.add(contentChunk); + } + + public boolean allContentReceived() + { + return (_bodyLengthReceived == getContentHeader().getBodySize()); + } + + public AMQShortString getExchange() + { + return _messagePublishInfo.getExchange(); + } + + public AMQShortString getRoutingKeyShortString() + { + return _messagePublishInfo.getRoutingKey(); + } + + public String getRoutingKey() + { + return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); + } + + public String getBinding() + { + return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); + } + + + public boolean isMandatory() + { + return _messagePublishInfo.isMandatory(); + } + + + public boolean isImmediate() + { + return _messagePublishInfo.isImmediate(); + } + + public ContentHeaderBody getContentHeader() + { + return _contentHeaderBody; + } + + + public AMQMessageHeader getMessageHeader() + { + return _messageMetaData.getMessageHeader(); + } + + public boolean isPersistent() + { + return getContentHeader().getProperties() instanceof BasicContentHeaderProperties && + ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() == + BasicContentHeaderProperties.PERSISTENT; + } + + public boolean isRedelivered() + { + return false; + } + + + public long getSize() + { + return getContentHeader().getBodySize(); + } + + public long getMessageNumber() + { + return _storedMessageHandle.getMessageNumber(); + } + + public void setExchange(final Exchange e) + { + _exchange = e; + } + + public void route() + { + enqueue(_exchange.route(this)); + + } + + public void enqueue(final List queues) + { + _destinationQueues = queues; + } + + public MessagePublishInfo getMessagePublishInfo() + { + return _messagePublishInfo; + } + + public long getExpiration() + { + return _expiration; + } + + public int getBodyCount() throws AMQException + { + return _contentChunks.size(); + } + + public ContentChunk getContentChunk(int index) + { + return _contentChunks.get(index); + } + + + public int getContent(ByteBuffer buf, int offset) + { + int pos = 0; + int written = 0; + for(ContentChunk cb : _contentChunks) + { + ByteBuffer data = ByteBuffer.wrap(cb.getData()); + if(offset+written >= pos && offset < pos + data.limit()) + { + ByteBuffer src = data.duplicate(); + src.position(offset+written - pos); + src = src.slice(); + + if(buf.remaining() < src.limit()) + { + src.limit(buf.remaining()); + } + int count = src.limit(); + buf.put(src); + written += count; + if(buf.remaining() == 0) + { + break; + } + } + pos+=data.limit(); + } + return written; + + } + + + public ByteBuffer getContent(int offset, int size) + { + ByteBuffer buf = ByteBuffer.allocate(size); + getContent(buf,offset); + buf.flip(); + return buf; + } + + public void setStoredMessage(StoredMessage storedMessageHandle) + { + _storedMessageHandle = storedMessageHandle; + } + + public StoredMessage getStoredMessage() + { + return _storedMessageHandle; + } + + public Object getConnectionReference() + { + return _connectionReference; + } + + public MessageMetaData getMessageMetaData() + { + return _messageMetaData; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java index e66f0889dd..6646dc0cc2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java @@ -24,7 +24,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java index d28a0785b1..93b51a0567 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java @@ -26,7 +26,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java index ae8e019bd0..5803135b16 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java index d805049662..ae07d60c4e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/AccessRequestHandler.java @@ -29,7 +29,7 @@ import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java index 0858598457..f623d27e87 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicAckMethodHandler.java @@ -25,7 +25,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicAckBody; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java index 9b2efa3d03..5a6a7bdc18 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicCancelMethodHandler.java @@ -27,7 +27,7 @@ import org.apache.qpid.framing.BasicCancelBody; import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index cfdde2c9fb..6577efe292 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -29,7 +29,7 @@ import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 9fac6d20fa..8883422989 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -32,7 +32,7 @@ import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.MessageOnlyCreditManager; import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java index 81a96a9699..96936dc429 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java @@ -30,7 +30,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java index 85f85b3677..e4a6636a74 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java @@ -25,7 +25,7 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java index 7ce8316c94..0a79466b35 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java @@ -28,7 +28,7 @@ import org.apache.qpid.framing.BasicRecoverBody; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java index bf8565b3f2..b54e1c7dcf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java @@ -30,7 +30,7 @@ import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java index 40619a5879..0cfdff3338 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java @@ -25,7 +25,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java index 6ff5d80048..e96d098618 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java @@ -28,7 +28,7 @@ import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java index e29c22f25f..cc1677c93e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java @@ -27,7 +27,7 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ChannelFlowBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java index 7d47efbc8b..442c912032 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java @@ -32,7 +32,7 @@ import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java index b530b8a188..60f9c1d495 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java @@ -26,7 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java index 6b80a2c17e..fe46b6c0cd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java index 286f58e1b3..62b13baac2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java @@ -28,7 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionOpenBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java index 628a6b40c3..d319f080d2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java @@ -32,7 +32,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.protocol.v0_8.state.AMQState; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java index 1ffbf046d2..9350327346 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java @@ -31,7 +31,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.protocol.v0_8.state.AMQState; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java index bc98bbad2d..5fddab6576 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java index ac5a64f48a..85f0a6fd3d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java @@ -27,7 +27,7 @@ import org.apache.qpid.framing.ExchangeBoundOkBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java index 7f8fbe1cdc..4949fcd62b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java @@ -32,7 +32,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.ExchangeExistsException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java index e0fe3d7493..75f749fe9a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java @@ -27,7 +27,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeInUseException; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java index 62706d8c5d..0eed82b9de 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java @@ -32,7 +32,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 3f582a6d73..9f887d881d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -31,7 +31,7 @@ import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java index d6cac50a5d..6f5e0ea992 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java @@ -26,7 +26,7 @@ import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java index ff4425b8f7..e925eb7455 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java @@ -27,7 +27,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.QueuePurgeBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java index f5d7a49915..aad5446cb5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java @@ -33,7 +33,7 @@ import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java index db158d05cf..b257030a59 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxCommitHandler.java @@ -27,7 +27,7 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java index 030b8fd94f..19d0da007b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java @@ -25,7 +25,7 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java index 7152368d8d..a43e1ebdab 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxSelectHandler.java @@ -25,7 +25,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java index bcedbd6e5f..48e42ce5a3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java @@ -32,7 +32,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.QueueEntry; public interface ProtocolOutputConverter diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java index adba8e665a..dd5e13e56a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java @@ -38,7 +38,7 @@ import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.queue.QueueEntry; import java.io.DataOutput; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterRegistry.java index f38652fb32..d4332b37ee 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterRegistry.java @@ -29,7 +29,7 @@ package org.apache.qpid.server.protocol.v0_8.output; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter.Factory; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import java.util.HashMap; import java.util.Map; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java index 773a114002..0555bba98b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java @@ -32,7 +32,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java deleted file mode 100644 index a7cd07a301..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ /dev/null @@ -1,288 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; -import org.apache.qpid.server.store.StoredMessage; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource -{ - - /** Used for debugging purposes. */ - private static final Logger _logger = Logger.getLogger(IncomingMessage.class); - - private final MessagePublishInfo _messagePublishInfo; - private ContentHeaderBody _contentHeaderBody; - - - /** - * Keeps a track of how many bytes we have received in body frames - */ - private long _bodyLengthReceived = 0; - - /** - * This is stored during routing, to know the queues to which this message should immediately be - * delivered. It is cleared after delivery has been attempted. Any persistent record of destinations is done - * by the message handle. - */ - private List _destinationQueues; - - private long _expiration; - - private Exchange _exchange; - - private List _contentChunks = new ArrayList(); - - // we keep both the original meta data object and the store reference to it just in case the - // store would otherwise flow it to disk - - private MessageMetaData _messageMetaData; - - private StoredMessage _storedMessageHandle; - private Object _connectionReference; - - - public IncomingMessage( - final MessagePublishInfo info - ) - { - this(info, null); - } - - public IncomingMessage(MessagePublishInfo info, Object reference) - { - _messagePublishInfo = info; - _connectionReference = reference; - } - - public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException - { - _contentHeaderBody = contentHeaderBody; - } - - public void setExpiration() - { - _expiration = ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration(); - } - - public MessageMetaData headersReceived(long currentTime) - { - _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime); - return _messageMetaData; - } - - - public List getDestinationQueues() - { - return _destinationQueues; - } - - public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException - { - _bodyLengthReceived += contentChunk.getSize(); - _contentChunks.add(contentChunk); - } - - public boolean allContentReceived() - { - return (_bodyLengthReceived == getContentHeader().getBodySize()); - } - - public AMQShortString getExchange() - { - return _messagePublishInfo.getExchange(); - } - - public AMQShortString getRoutingKeyShortString() - { - return _messagePublishInfo.getRoutingKey(); - } - - public String getRoutingKey() - { - return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); - } - - public String getBinding() - { - return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); - } - - - public boolean isMandatory() - { - return _messagePublishInfo.isMandatory(); - } - - - public boolean isImmediate() - { - return _messagePublishInfo.isImmediate(); - } - - public ContentHeaderBody getContentHeader() - { - return _contentHeaderBody; - } - - - public AMQMessageHeader getMessageHeader() - { - return _messageMetaData.getMessageHeader(); - } - - public boolean isPersistent() - { - return getContentHeader().getProperties() instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() == - BasicContentHeaderProperties.PERSISTENT; - } - - public boolean isRedelivered() - { - return false; - } - - - public long getSize() - { - return getContentHeader().getBodySize(); - } - - public long getMessageNumber() - { - return _storedMessageHandle.getMessageNumber(); - } - - public void setExchange(final Exchange e) - { - _exchange = e; - } - - public void route() - { - enqueue(_exchange.route(this)); - - } - - public void enqueue(final List queues) - { - _destinationQueues = queues; - } - - public MessagePublishInfo getMessagePublishInfo() - { - return _messagePublishInfo; - } - - public long getExpiration() - { - return _expiration; - } - - public int getBodyCount() throws AMQException - { - return _contentChunks.size(); - } - - public ContentChunk getContentChunk(int index) - { - return _contentChunks.get(index); - } - - - public int getContent(ByteBuffer buf, int offset) - { - int pos = 0; - int written = 0; - for(ContentChunk cb : _contentChunks) - { - ByteBuffer data = ByteBuffer.wrap(cb.getData()); - if(offset+written >= pos && offset < pos + data.limit()) - { - ByteBuffer src = data.duplicate(); - src.position(offset+written - pos); - src = src.slice(); - - if(buf.remaining() < src.limit()) - { - src.limit(buf.remaining()); - } - int count = src.limit(); - buf.put(src); - written += count; - if(buf.remaining() == 0) - { - break; - } - } - pos+=data.limit(); - } - return written; - - } - - - public ByteBuffer getContent(int offset, int size) - { - ByteBuffer buf = ByteBuffer.allocate(size); - getContent(buf,offset); - buf.flip(); - return buf; - } - - public void setStoredMessage(StoredMessage storedMessageHandle) - { - _storedMessageHandle = storedMessageHandle; - } - - public StoredMessage getStoredMessage() - { - return _storedMessageHandle; - } - - public Object getConnectionReference() - { - return _connectionReference; - } - - public MessageMetaData getMessageMetaData() - { - return _messageMetaData; - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java deleted file mode 100644 index 0e4dbe87a3..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.ack; - - -import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -import java.util.List; - -public class AcknowledgeTest extends QpidTestCase -{ - private AMQChannel _channel; - private SimpleAMQQueue _queue; - private MessageStore _messageStore; - private String _queueName; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _channel = BrokerTestHelper.createChannel(); - VirtualHost virtualHost = _channel.getVirtualHost(); - _queueName = getTestName(); - _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); - _messageStore = virtualHost.getMessageStore(); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_channel != null) - { - _channel.getVirtualHost().close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - private AMQChannel getChannel() - { - return _channel; - } - - private InternalTestProtocolSession getSession() - { - return (InternalTestProtocolSession)_channel.getProtocolSession(); - } - - private SimpleAMQQueue getQueue() - { - return _queue; - } - - public void testTransactionalSingleAck() throws AMQException - { - getChannel().setLocalTransactional(); - runMessageAck(1, 1, 1, false, 0); - } - - public void testTransactionalMultiAck() throws AMQException - { - getChannel().setLocalTransactional(); - runMessageAck(10, 1, 5, true, 5); - } - - public void testTransactionalAckAll() throws AMQException - { - getChannel().setLocalTransactional(); - runMessageAck(10, 1, 0, true, 0); - } - - public void testNonTransactionalSingleAck() throws AMQException - { - runMessageAck(1, 1, 1, false, 0); - } - - public void testNonTransactionalMultiAck() throws AMQException - { - runMessageAck(10, 1, 5, true, 5); - } - - public void testNonTransactionalAckAll() throws AMQException - { - runMessageAck(10, 1, 0, true, 0); - } - - protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, long acknowledgeDeliveryTag, boolean acknowldegeMultiple, int remainingUnackedMessages) throws AMQException - { - //Check store is empty - checkStoreContents(0); - - //Send required messsages to the queue - BrokerTestHelper.publishMessages(getChannel(), sendMessageCount, _queueName, ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); - - if (getChannel().isTransactional()) - { - getChannel().commit(); - } - - //Ensure they are stored - checkStoreContents(sendMessageCount); - - //Check that there are no unacked messages - assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size()); - - //Subscribe to the queue - AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true); - - getQueue().deliverAsync(); - - //Wait for the messages to be delivered - getSession().awaitDelivery(sendMessageCount); - - //Check that they are all waiting to be acknoledged - assertEquals("Channel should have unacked msgs", sendMessageCount, getChannel().getUnacknowledgedMessageMap().size()); - - List messages = getSession().getDelivers(getChannel().getChannelId(), subscriber, sendMessageCount); - - //Double check we received the right number of messages - assertEquals(sendMessageCount, messages.size()); - - //Check that the first message has the expected deliveryTag - assertEquals("First message does not have expected deliveryTag", firstDeliveryTag, messages.get(0).getDeliveryTag()); - - //Send required Acknowledgement - getChannel().acknowledgeMessage(acknowledgeDeliveryTag, acknowldegeMultiple); - - if (getChannel().isTransactional()) - { - getChannel().commit(); - } - - // Check Remaining Acknowledgements - assertEquals("Channel unacked msgs count incorrect", remainingUnackedMessages, getChannel().getUnacknowledgedMessageMap().size()); - - //Check store contents are also correct. - checkStoreContents(remainingUnackedMessages); - } - - private void checkStoreContents(int messageCount) - { - assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index b2d7237737..f2a64381df 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -20,33 +20,31 @@ */ package org.apache.qpid.server.exchange; +import java.util.List; import junit.framework.Assert; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TopicExchangeTest extends QpidTestCase { private TopicExchange _exchange; private VirtualHost _vhost; - private MessageStore _store; @Override @@ -56,7 +54,6 @@ public class TopicExchangeTest extends QpidTestCase BrokerTestHelper.setUp(); _exchange = new TopicExchange(); _vhost = BrokerTestHelper.createVirtualHost(getName()); - _store = new TestMemoryMessageStore(); } @Override @@ -82,8 +79,7 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b"); - routeMessage(message); + routeMessage("a.b", 0l); Assert.assertEquals(0, queue.getMessageCount()); } @@ -94,21 +90,16 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b"); - - routeMessage(message); + routeMessage("a.b",0l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - - message = createMessage("a.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.c",1l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -121,34 +112,26 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b"); - - routeMessage(message); + routeMessage("a.b",0l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.c"); - - int queueCount = routeMessage(message); + routeMessage("a.c",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - - message = createMessage("a"); - - - queueCount = routeMessage(message); + int queueCount = routeMessage("a",2l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -160,57 +143,45 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b.c"); - - int queueCount = routeMessage(message); + routeMessage("a.b.c",0l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - - message = createMessage("a.b"); - - queueCount = routeMessage(message); + routeMessage("a.b",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.c"); - - queueCount = routeMessage(message); + routeMessage("a.c",2l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 2l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a"); - - queueCount = routeMessage(message); + routeMessage("a",3l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 3l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("b"); - - - queueCount = routeMessage(message); + int queueCount = routeMessage("b", 4l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -222,25 +193,20 @@ public class TopicExchangeTest extends QpidTestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); - - IncomingMessage message = createMessage("a.c.d.b"); - - routeMessage(message); + routeMessage("a.c.d.b",0l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 0l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.c.b"); - - routeMessage(message); + routeMessage("a.c.b",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); @@ -253,39 +219,31 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null)); - IncomingMessage message = createMessage("a.c.b.b"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.c.b.b",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.a.b.c"); - - routeMessage(message); + routeMessage("a.a.b.c",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.b.c.b"); - - queueCount = routeMessage(message); + queueCount = routeMessage("a.b.c.b",2l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.b.c.b.c"); - - routeMessage(message); + routeMessage("a.b.c.b.c",3l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 3l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); @@ -298,22 +256,16 @@ public class TopicExchangeTest extends QpidTestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null)); - - IncomingMessage message = createMessage("a.c.b.b.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.c.b.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); - - message = createMessage("a.a.b.c.d"); - - routeMessage(message); + routeMessage("a.a.b.c.d",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); @@ -325,21 +277,16 @@ public class TopicExchangeTest extends QpidTestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null)); - - IncomingMessage message = createMessage("a.c.b.b.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.c.b.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); - message = createMessage("a.a.b.c.d"); - - routeMessage(message); + routeMessage("a.a.b.c.d",1l); Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); + Assert.assertEquals("Wrong message received", 1l, queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(); Assert.assertEquals(0, queue.getMessageCount()); @@ -351,29 +298,30 @@ public class TopicExchangeTest extends QpidTestCase AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); _exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null)); - - IncomingMessage message = createMessage("a.b.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } - private int routeMessage(final IncomingMessage message) - throws AMQException + private int routeMessage(String routingKey, long messageNumber) throws AMQException { - MessageMetaData mmd = message.headersReceived(System.currentTimeMillis()); - message.setStoredMessage(_store.addMessage(mmd)); - - message.enqueue(_exchange.route(message)); - AMQMessage msg = new AMQMessage(message.getStoredMessage()); - for(BaseQueue q : message.getDestinationQueues()) + InboundMessage inboundMessage = mock(InboundMessage.class); + when(inboundMessage.getRoutingKey()).thenReturn(routingKey); + when(inboundMessage.getRoutingKeyShortString()).thenReturn(new AMQShortString(routingKey)); + List queues = _exchange.route(inboundMessage); + ServerMessage message = mock(ServerMessage.class); + MessageReference ref = mock(MessageReference.class); + when(ref.getMessage()).thenReturn(message); + when(message.newReference()).thenReturn(ref); + when(message.getMessageNumber()).thenReturn(messageNumber); + for(BaseQueue q : queues) { - q.enqueue(msg); + q.enqueue(message); } - return message.getDestinationQueues().size(); + + return queues.size(); } public void testMoreRouting() throws AMQException @@ -382,9 +330,7 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); - IncomingMessage message = createMessage("a.b.c"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a.b.c",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); @@ -397,62 +343,11 @@ public class TopicExchangeTest extends QpidTestCase _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); - IncomingMessage message = createMessage("a"); - - int queueCount = routeMessage(message); + int queueCount = routeMessage("a",0l); Assert.assertEquals("Message should not route to any queues", 0, queueCount); Assert.assertEquals(0, queue.getMessageCount()); } - private IncomingMessage createMessage(String s) throws AMQException - { - MessagePublishInfo info = new PublishInfo(new AMQShortString(s)); - - IncomingMessage message = new IncomingMessage(info); - final ContentHeaderBody chb = new ContentHeaderBody(); - BasicContentHeaderProperties props = new BasicContentHeaderProperties(); - chb.setProperties(props); - message.setContentHeaderBody(chb); - - - return message; - } - - - class PublishInfo implements MessagePublishInfo - { - private AMQShortString _routingkey; - - PublishInfo(AMQShortString routingkey) - { - _routingkey = routingkey; - } - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return true; - } - - public AMQShortString getRoutingKey() - { - return _routingkey; - } - } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java index 055197c23e..41b42fac78 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.logging.actors; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.util.BrokerTestHelper; import java.util.List; @@ -45,7 +45,7 @@ public class AMQPChannelActorTest extends BaseConnectionActorTestCase private void setUpNow() throws Exception { super.setUp(); - AMQChannel channel = BrokerTestHelper.createChannel(1, getSession()); + AMQSessionModel channel = BrokerTestHelper.createSession(1, getConnection()); setAmqpActor(new AMQPChannelActor(channel, getRootLogger())); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java index 09dd48e4d3..1cb6474e41 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java @@ -20,31 +20,43 @@ */ package org.apache.qpid.server.logging.actors; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; public class BaseConnectionActorTestCase extends BaseActorTestCase { - private AMQProtocolSession _session; + private AMQConnectionModel _session; + private VirtualHost _virtualHost; @Override public void setUp() throws Exception { super.setUp(); BrokerTestHelper.setUp(); - _session = BrokerTestHelper.createSession(); - + _session = BrokerTestHelper.createConnection(); + _virtualHost = BrokerTestHelper.createVirtualHost("test"); setAmqpActor(new AMQPConnectionActor(_session, getRootLogger())); } + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + @Override public void tearDown() throws Exception { try { + if(_virtualHost != null) + { + _virtualHost.close(); + } if (_session != null) { - _session.getVirtualHost().close(); + _session.close(AMQConstant.CONNECTION_FORCED, ""); } } finally @@ -54,7 +66,7 @@ public class BaseConnectionActorTestCase extends BaseActorTestCase } } - public AMQProtocolSession getSession() + public AMQConnectionModel getConnection() { return _session; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java index 0c64ce837e..d413c4d4c9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java @@ -23,9 +23,11 @@ package org.apache.qpid.server.logging.actors; import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.AMQException; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.NullRootMessageLogger; +import org.apache.qpid.server.util.BrokerTestHelper; /** * Test : CurrentActorTest @@ -71,7 +73,7 @@ public class CurrentActorTest extends BaseConnectionActorTestCase public void testLIFO() throws AMQException, ConfigurationException { assertTrue("Unexpected actor: " + CurrentActor.get(), CurrentActor.get() instanceof TestLogActor); - AMQPConnectionActor connectionActor = new AMQPConnectionActor(getSession(), + AMQPConnectionActor connectionActor = new AMQPConnectionActor(getConnection(), new NullRootMessageLogger()); /* @@ -98,7 +100,7 @@ public class CurrentActorTest extends BaseConnectionActorTestCase * */ - AMQChannel channel = new AMQChannel(getSession(), 1, getSession().getVirtualHost().getMessageStore()); + AMQSessionModel channel = BrokerTestHelper.createSession(1, getConnection()); AMQPChannelActor channelActor = new AMQPChannelActor(channel, new NullRootMessageLogger()); @@ -214,7 +216,7 @@ public class CurrentActorTest extends BaseConnectionActorTestCase { LogActor defaultActor = CurrentActor.get(); - AMQPConnectionActor actor = new AMQPConnectionActor(getSession(), + AMQPConnectionActor actor = new AMQPConnectionActor(getConnection(), new NullRootMessageLogger()); CurrentActor.set(actor); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java index 2dc44c58ce..55153b7389 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/QueueActorTest.java @@ -31,7 +31,7 @@ public class QueueActorTest extends BaseConnectionActorTestCase public void setUp() throws Exception { super.setUp(); - setAmqpActor(new QueueActor(BrokerTestHelper.createQueue(getName(), getSession().getVirtualHost()), getRootLogger())); + setAmqpActor(new QueueActor(BrokerTestHelper.createQueue(getName(), getVirtualHost()), getRootLogger())); } /** diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java index 58fca488c4..92915e7092 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java @@ -44,7 +44,7 @@ public class SubscriptionActorTest extends BaseConnectionActorTestCase MockSubscription mockSubscription = new MockSubscription(); - mockSubscription.setQueue(BrokerTestHelper.createQueue(getName(), getSession().getVirtualHost()), false); + mockSubscription.setQueue(BrokerTestHelper.createQueue(getName(), getVirtualHost()), false); setAmqpActor(new SubscriptionActor(getRootLogger(), mockSubscription)); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java index 193e8a490d..f779295cd4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java @@ -113,7 +113,7 @@ public abstract class AbstractTestLogSubject extends QpidTestCase // This should return us MockProtocolSessionUser@null/test String connectionSlice = getSlice("con:" + connectionID, message); - assertNotNull("Unable to find connection 'con:" + connectionID + "'", + assertNotNull("Unable to find connection 'con:" + connectionID + "' in '"+message+"'", connectionSlice); // Exract the userName @@ -131,7 +131,7 @@ public abstract class AbstractTestLogSubject extends QpidTestCase // We will have three sections assertEquals("Unable to split IP from rest of Connection:" - + userNameParts[1], 3, ipParts.length); + + userNameParts[1] + " in '"+message+"'", 3, ipParts.length); // We need to skip the first '/' split will be empty so validate 1 as IP assertEquals("IP not as expected", ipString, ipParts[1]); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java index a6701301f9..a3d96c6d12 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; +import org.apache.qpid.server.protocol.AMQSessionModel; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Validate ChannelLogSubjects are logged as expected @@ -34,10 +37,10 @@ public class ChannelLogSubjectTest extends ConnectionLogSubjectTest { super.setUp(); - - AMQChannel channel = new AMQChannel(getSession(), _channelID, getSession().getVirtualHost().getMessageStore()); - - _subject = new ChannelLogSubject(channel); + AMQSessionModel session = mock(AMQSessionModel.class); + when(session.getConnectionModel()).thenReturn(getConnection()); + when(session.getChannelId()).thenReturn(_channelID); + _subject = new ChannelLogSubject(session); } /** diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java index 835bcb3e97..e9a9317102 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java @@ -20,8 +20,10 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; -import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.protocol.AMQConnectionModel; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Validate ConnectionLogSubjects are logged as expected @@ -29,25 +31,24 @@ import org.apache.qpid.server.util.BrokerTestHelper; public class ConnectionLogSubjectTest extends AbstractTestLogSubject { - private InternalTestProtocolSession _session; + private static final long CONNECTION_ID = 456l; + private static final String USER = "InternalTestProtocolSession"; + private static final String IP_STRING = "127.0.0.1:1"; + private static final String VHOST = "test"; + + private AMQConnectionModel _connection; @Override public void setUp() throws Exception { super.setUp(); - _session = BrokerTestHelper.createSession("test"); - _subject = new ConnectionLogSubject(_session); - } - - @Override - public void tearDown() throws Exception - { - if (_session != null) - { - _session.getVirtualHost().close(); - } - super.tearDown(); + _connection = mock(AMQConnectionModel.class); + when(_connection.getConnectionId()).thenReturn(CONNECTION_ID); + when(_connection.getPrincipalAsString()).thenReturn(USER); + when(_connection.getRemoteAddressString()).thenReturn("/"+IP_STRING); + when(_connection.getVirtualHostName()).thenReturn(VHOST); + _subject = new ConnectionLogSubject(_connection); } /** @@ -57,12 +58,12 @@ public class ConnectionLogSubjectTest extends AbstractTestLogSubject */ protected void validateLogStatement(String message) { - verifyConnection(_session.getSessionID(), "InternalTestProtocolSession", "127.0.0.1:1", "test", message); + verifyConnection(CONNECTION_ID, USER, IP_STRING, VHOST, message); } - public InternalTestProtocolSession getSession() + public AMQConnectionModel getConnection() { - return _session; + return _connection; } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java deleted file mode 100644 index e314361a89..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.logging.subjects; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.flow.LimitlessCreditManager; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockAMQQueue; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactory; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; - -/** - * Validate SubscriptionLogSubjects are logged as expected - */ -public class SubscriptionLogSubjectTest extends AbstractTestLogSubject -{ - - private AMQQueue _queue; - private VirtualHost _testVhost; - private int _channelID = 1; - private Subscription _subscription; - - @Override - public void setUp() throws Exception - { - super.setUp(); - - InternalTestProtocolSession session = BrokerTestHelper.createSession(); - _testVhost = session.getVirtualHost(); - - _queue = new MockAMQQueue("SubscriptionLogSubjectTest"); - ((MockAMQQueue) _queue).setVirtualHost(_testVhost); - - AMQChannel channel = new AMQChannel(session, _channelID, _testVhost.getMessageStore()); - - session.addChannel(channel); - - SubscriptionFactory factory = new SubscriptionFactoryImpl(); - - _subscription = factory.createSubscription(_channelID, session, new AMQShortString("cTag"), - false, null, false, - new LimitlessCreditManager()); - - _subscription.setQueue(_queue, false); - - _subject = new SubscriptionLogSubject(_subscription); - } - - @Override - public void tearDown() throws Exception - { - if (_testVhost != null) - { - _testVhost.close(); - } - super.tearDown(); - } - - /** - * Validate that the logged Subject message is as expected: - * MESSAGE [Blank][sub:0(vh(/test)/qu(SubscriptionLogSubjectTest))] - * - * @param message the message whos format needs validation - */ - @Override - protected void validateLogStatement(String message) - { - String subscriptionSlice = getSlice("sub:" - + _subscription.getSubscriptionID(), - message); - - assertNotNull("Unable to locate subscription 'sub:" + - _subscription.getSubscriptionID() + "'"); - - - - // Pull out the qu(..) section from the subscription message - // Split it into three parts - // MESSAGE [Blank][sub:0(vh(/ - // test)/ - // qu(SubscriptionLogSubjectTest))] - // Take the last bit and drop off the extra )] - String[] parts = message.split("/"); - assertEquals("Message part count wrong", 3, parts.length); - String subscription = parts[2].substring(0, parts[2].indexOf(")") + 1); - - // Adding the ')' is a bit ugly but SubscriptionLogSubject is the only - // Subject that nests () and so the simple parser of checking for the - // next ')' falls down. - verifyVirtualHost(subscriptionSlice+ ")", _queue.getVirtualHost()); - - verifyQueue(subscription, _queue); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolEngineTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolEngineTest.java deleted file mode 100644 index b523979387..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolEngineTest.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.apache.qpid.server.protocol; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.properties.ConnectionStartProperties; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.network.NetworkConnection; - -public class AMQProtocolEngineTest extends QpidTestCase -{ - private Broker _broker; - private Port _port; - private NetworkConnection _network; - private Transport _transport; - - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _broker = BrokerTestHelper.createBrokerMock(); - when(_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(true); - - _port = mock(Port.class); - _network = mock(NetworkConnection.class); - _transport = Transport.TCP; - } - - public void tearDown() throws Exception - { - try - { - super.tearDown(); - } - finally - { - BrokerTestHelper.tearDown(); - } - } - - public void testSetClientPropertiesForNoRouteProvidedAsString() - { - AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport); - assertTrue("Unexpected closeWhenNoRoute before client properties set", engine.isCloseWhenNoRoute()); - - Map clientProperties = new HashMap(); - clientProperties.put(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, Boolean.FALSE.toString()); - engine.setClientProperties(FieldTable.convertToFieldTable(clientProperties)); - - assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute()); - } - - public void testSetClientPropertiesForNoRouteProvidedAsBoolean() - { - AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport); - assertTrue("Unexpected closeWhenNoRoute before client properties set", engine.isCloseWhenNoRoute()); - - Map clientProperties = new HashMap(); - clientProperties.put(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, Boolean.FALSE); - engine.setClientProperties(FieldTable.convertToFieldTable(clientProperties)); - - assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute()); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java index a5e5529ed5..b358c7c5c5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java @@ -35,7 +35,6 @@ import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java new file mode 100644 index 0000000000..f5e58cfd02 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java @@ -0,0 +1,72 @@ +package org.apache.qpid.server.protocol.v0_8; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.properties.ConnectionStartProperties; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.network.NetworkConnection; + +public class AMQProtocolEngineTest extends QpidTestCase +{ + private Broker _broker; + private Port _port; + private NetworkConnection _network; + private Transport _transport; + + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + _broker = BrokerTestHelper.createBrokerMock(); + when(_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(true); + + _port = mock(Port.class); + _network = mock(NetworkConnection.class); + _transport = Transport.TCP; + } + + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + BrokerTestHelper.tearDown(); + } + } + + public void testSetClientPropertiesForNoRouteProvidedAsString() + { + AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport); + assertTrue("Unexpected closeWhenNoRoute before client properties set", engine.isCloseWhenNoRoute()); + + Map clientProperties = new HashMap(); + clientProperties.put(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, Boolean.FALSE.toString()); + engine.setClientProperties(FieldTable.convertToFieldTable(clientProperties)); + + assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute()); + } + + public void testSetClientPropertiesForNoRouteProvidedAsBoolean() + { + AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, _port, _transport); + assertTrue("Unexpected closeWhenNoRoute before client properties set", engine.isCloseWhenNoRoute()); + + Map clientProperties = new HashMap(); + clientProperties.put(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, Boolean.FALSE); + engine.setClientProperties(FieldTable.convertToFieldTable(clientProperties)); + + assertFalse("Unexpected closeWhenNoRoute after client properties set", engine.isCloseWhenNoRoute()); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 60b6b215c6..4ab64ca100 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -28,9 +28,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; @@ -67,7 +65,7 @@ public class AckTest extends QpidTestCase { super.setUp(); BrokerTestHelper.setUp(); - _channel = BrokerTestHelper.createChannel(5); + _channel = BrokerTestHelper_0_8.createChannel(5); _protocolSession = _channel.getProtocolSession(); _virtualHost = _protocolSession.getVirtualHost(); _queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java new file mode 100644 index 0000000000..a9eb0ebfe7 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -0,0 +1,181 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + + +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; + +import java.util.List; + +public class AcknowledgeTest extends QpidTestCase +{ + private AMQChannel _channel; + private SimpleAMQQueue _queue; + private MessageStore _messageStore; + private String _queueName; + + @Override + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + _channel = BrokerTestHelper_0_8.createChannel(); + VirtualHost virtualHost = _channel.getVirtualHost(); + _queueName = getTestName(); + _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); + _messageStore = virtualHost.getMessageStore(); + } + + @Override + public void tearDown() throws Exception + { + try + { + if (_channel != null) + { + _channel.getVirtualHost().close(); + } + } + finally + { + BrokerTestHelper.tearDown(); + super.tearDown(); + } + } + + private AMQChannel getChannel() + { + return _channel; + } + + private InternalTestProtocolSession getSession() + { + return (InternalTestProtocolSession)_channel.getProtocolSession(); + } + + private SimpleAMQQueue getQueue() + { + return _queue; + } + + public void testTransactionalSingleAck() throws AMQException + { + getChannel().setLocalTransactional(); + runMessageAck(1, 1, 1, false, 0); + } + + public void testTransactionalMultiAck() throws AMQException + { + getChannel().setLocalTransactional(); + runMessageAck(10, 1, 5, true, 5); + } + + public void testTransactionalAckAll() throws AMQException + { + getChannel().setLocalTransactional(); + runMessageAck(10, 1, 0, true, 0); + } + + public void testNonTransactionalSingleAck() throws AMQException + { + runMessageAck(1, 1, 1, false, 0); + } + + public void testNonTransactionalMultiAck() throws AMQException + { + runMessageAck(10, 1, 5, true, 5); + } + + public void testNonTransactionalAckAll() throws AMQException + { + runMessageAck(10, 1, 0, true, 0); + } + + protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, long acknowledgeDeliveryTag, boolean acknowldegeMultiple, int remainingUnackedMessages) throws AMQException + { + //Check store is empty + checkStoreContents(0); + + //Send required messsages to the queue + BrokerTestHelper_0_8.publishMessages(getChannel(), + sendMessageCount, + _queueName, + ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); + + if (getChannel().isTransactional()) + { + getChannel().commit(); + } + + //Ensure they are stored + checkStoreContents(sendMessageCount); + + //Check that there are no unacked messages + assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size()); + + //Subscribe to the queue + AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true); + + getQueue().deliverAsync(); + + //Wait for the messages to be delivered + getSession().awaitDelivery(sendMessageCount); + + //Check that they are all waiting to be acknoledged + assertEquals("Channel should have unacked msgs", sendMessageCount, getChannel().getUnacknowledgedMessageMap().size()); + + List messages = getSession().getDelivers(getChannel().getChannelId(), subscriber, sendMessageCount); + + //Double check we received the right number of messages + assertEquals(sendMessageCount, messages.size()); + + //Check that the first message has the expected deliveryTag + assertEquals("First message does not have expected deliveryTag", firstDeliveryTag, messages.get(0).getDeliveryTag()); + + //Send required Acknowledgement + getChannel().acknowledgeMessage(acknowledgeDeliveryTag, acknowldegeMultiple); + + if (getChannel().isTransactional()) + { + getChannel().commit(); + } + + // Check Remaining Acknowledgements + assertEquals("Channel unacked msgs count incorrect", remainingUnackedMessages, getChannel().getUnacknowledgedMessageMap().size()); + + //Check store contents are also correct. + checkStoreContents(remainingUnackedMessages); + } + + private void checkStoreContents(int messageCount) + { + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java new file mode 100644 index 0000000000..0919607bd7 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class BrokerTestHelper_0_8 extends BrokerTestHelper +{ + + public static AMQChannel createChannel(int channelId, AMQProtocolSession session) throws AMQException + { + AMQChannel channel = new AMQChannel(session, channelId, session.getVirtualHost().getMessageStore()); + session.addChannel(channel); + return channel; + } + + public static AMQChannel createChannel(int channelId) throws Exception + { + InternalTestProtocolSession session = createProtocolSession(); + return createChannel(channelId, session); + } + + public static AMQChannel createChannel() throws Exception + { + return createChannel(1); + } + + public static InternalTestProtocolSession createProtocolSession() throws Exception + { + return createProtocolSession("test"); + } + + public static InternalTestProtocolSession createProtocolSession(String hostName) throws Exception + { + VirtualHost virtualHost = createVirtualHost(hostName); + return new InternalTestProtocolSession(virtualHost, createBrokerMock()); + } + + public static void publishMessages(AMQChannel channel, int numberOfMessages, String queueName, String exchangeName) throws AMQException + { + AMQShortString rouningKey = new AMQShortString(queueName); + AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName); + MessagePublishInfo info = mock(MessagePublishInfo.class); + when(info.getExchange()).thenReturn(exchangeNameAsShortString); + when(info.getRoutingKey()).thenReturn(rouningKey); + + Exchange exchange = channel.getVirtualHost().getExchange(exchangeName); + for (int count = 0; count < numberOfMessages; count++) + { + channel.setPublishFrame(info, exchange); + + // Set the body size + ContentHeaderBody _headerBody = new ContentHeaderBody(); + _headerBody.setBodySize(0); + + // Set Minimum properties + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + + properties.setExpiration(0L); + properties.setTimestamp(System.currentTimeMillis()); + + // Make Message Persistent + properties.setDeliveryMode((byte) 2); + + _headerBody.setProperties(properties); + + channel.publishContentHeader(_headerBody); + } + channel.sync(); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java index 1cc872e177..a77475c05f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java @@ -22,8 +22,6 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; @@ -37,7 +35,7 @@ public class MaxChannelsTest extends QpidTestCase { super.setUp(); BrokerTestHelper.setUp(); - _session = BrokerTestHelper.createSession(); + _session = BrokerTestHelper_0_8.createProtocolSession(); } public void testChannels() throws Exception diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java new file mode 100644 index 0000000000..21142e7ab6 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -0,0 +1,149 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.AMQException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; + +import java.util.List; + +public class QueueBrowserUsesNoAckTest extends QpidTestCase +{ + private AMQChannel _channel; + private SimpleAMQQueue _queue; + private MessageStore _messageStore; + private String _queueName; + + @Override + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + _channel = BrokerTestHelper_0_8.createChannel(); + VirtualHost virtualHost = _channel.getVirtualHost(); + _queueName = getTestName(); + _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); + _messageStore = virtualHost.getMessageStore(); + } + + @Override + public void tearDown() throws Exception + { + try + { + if (_channel != null) + { + _channel.getVirtualHost().close(); + } + } + finally + { + BrokerTestHelper.tearDown(); + super.tearDown(); + } + } + + private AMQChannel getChannel() + { + return _channel; + } + + private InternalTestProtocolSession getSession() + { + return (InternalTestProtocolSession)_channel.getProtocolSession(); + } + + private SimpleAMQQueue getQueue() + { + return _queue; + } + + public void testQueueBrowserUsesNoAck() throws AMQException + { + int sendMessageCount = 2; + int prefetch = 1; + + //Check store is empty + checkStoreContents(0); + + //Send required messsages to the queue + BrokerTestHelper_0_8.publishMessages(getChannel(), + sendMessageCount, + _queueName, + ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); + + //Ensure they are stored + checkStoreContents(sendMessageCount); + + //Check that there are no unacked messages + assertEquals("Channel should have no unacked msgs ", 0, + getChannel().getUnacknowledgedMessageMap().size()); + + //Set the prefetch on the session to be less than the sent messages + getChannel().setCredit(0, prefetch); + + //browse the queue + AMQShortString browser = browse(getChannel(), getQueue()); + + getQueue().deliverAsync(); + + //Wait for messages to fill the prefetch + getSession().awaitDelivery(prefetch); + + //Get those messages + List messages = + getSession().getDelivers(getChannel().getChannelId(), browser, + prefetch); + + //Ensure we recevied the prefetched messages + assertEquals(prefetch, messages.size()); + + //Check the process didn't suspend the subscription as this would + // indicate we are using the prefetch credit. i.e. using acks not No-Ack + assertTrue("The subscription has been suspended", + !getChannel().getSubscription(browser).getState() + .equals(Subscription.State.SUSPENDED)); + } + + private void checkStoreContents(int messageCount) + { + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); + } + + private AMQShortString browse(AMQChannel channel, AMQQueue queue) throws AMQException + { + FieldTable filters = new FieldTable(); + filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); + + return channel.subscribeToQueue(null, queue, true, filters, false, true); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java new file mode 100644 index 0000000000..e98dd63450 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.logging.UnitTestMessageLogger; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.test.utils.QpidTestCase; + +public class SubscriptionFactoryImplTest extends QpidTestCase +{ + private AMQChannel _channel; + private AMQProtocolSession _session; + + @Override + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + _channel = BrokerTestHelper_0_8.createChannel(); + _session = _channel.getProtocolSession(); + GenericActor.setDefaultMessageLogger(new UnitTestMessageLogger(false)); + } + + @Override + public void tearDown() throws Exception + { + try + { + if (_channel != null) + { + _channel.getVirtualHost().close(); + } + } + finally + { + BrokerTestHelper.tearDown(); + super.tearDown(); + } + } + + /** + * Tests that while creating Subscriptions of various types, the + * ID numbers assigned are allocated from a common sequence + * (in increasing order). + */ + public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception + { + //create a No-Ack subscription, get the first Subscription ID + long previousId = 0; + Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), false, null, false, _channel.getCreditManager()); + previousId = noAckSub.getSubscriptionID(); + + //create an ack subscription, verify the next Subscription ID is used + Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID()); + previousId = ackSub.getSubscriptionID(); + + //create a browser subscription + FieldTable filters = new FieldTable(); + filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); + Subscription browerSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, browerSub.getSubscriptionID()); + previousId = browerSub.getSubscriptionID(); + + //create an BasicGet NoAck subscription + Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(_channel, _session, new AMQShortString("1"), null, false, + _channel.getCreditManager(),_channel.getClientDeliveryMethod(), _channel.getRecordDeliveryMethod()); + assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID()); + previousId = getNoAckSub.getSubscriptionID(); + + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionLogSubjectTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionLogSubjectTest.java new file mode 100644 index 0000000000..c44fdebc03 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionLogSubjectTest.java @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject; +import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; +import org.apache.qpid.server.flow.LimitlessCreditManager; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; + +/** + * Validate SubscriptionLogSubjects are logged as expected + */ +public class SubscriptionLogSubjectTest extends AbstractTestLogSubject +{ + + private AMQQueue _queue; + private VirtualHost _testVhost; + private int _channelID = 1; + private Subscription _subscription; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + InternalTestProtocolSession session = BrokerTestHelper_0_8.createProtocolSession(); + _testVhost = session.getVirtualHost(); + + _queue = new MockAMQQueue("SubscriptionLogSubjectTest"); + ((MockAMQQueue) _queue).setVirtualHost(_testVhost); + + AMQChannel channel = new AMQChannel(session, _channelID, _testVhost.getMessageStore()); + + session.addChannel(channel); + + SubscriptionFactory factory = new SubscriptionFactoryImpl(); + + _subscription = factory.createSubscription(_channelID, session, new AMQShortString("cTag"), + false, null, false, + new LimitlessCreditManager()); + + _subscription.setQueue(_queue, false); + + _subject = new SubscriptionLogSubject(_subscription); + } + + @Override + public void tearDown() throws Exception + { + if (_testVhost != null) + { + _testVhost.close(); + } + super.tearDown(); + } + + /** + * Validate that the logged Subject message is as expected: + * MESSAGE [Blank][sub:0(vh(/test)/qu(SubscriptionLogSubjectTest))] + * + * @param message the message whos format needs validation + */ + @Override + protected void validateLogStatement(String message) + { + String subscriptionSlice = getSlice("sub:" + + _subscription.getSubscriptionID(), + message); + + assertNotNull("Unable to locate subscription 'sub:" + + _subscription.getSubscriptionID() + "'"); + + + + // Pull out the qu(..) section from the subscription message + // Split it into three parts + // MESSAGE [Blank][sub:0(vh(/ + // test)/ + // qu(SubscriptionLogSubjectTest))] + // Take the last bit and drop off the extra )] + String[] parts = message.split("/"); + assertEquals("Message part count wrong", 3, parts.length); + String subscription = parts[2].substring(0, parts[2].indexOf(")") + 1); + + // Adding the ')' is a bit ugly but SubscriptionLogSubject is the only + // Subject that nests () and so the simple parser of checking for the + // next ')' falls down. + verifyVirtualHost(subscriptionSlice+ ")", _queue.getVirtualHost()); + + verifyQueue(subscription, _queue); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 87f0c06c2b..07e72d3535 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -39,6 +39,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.protocol.v0_8.AMQMessage; +import org.apache.qpid.server.protocol.v0_8.IncomingMessage; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java deleted file mode 100644 index d9c0b7055f..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/QueueBrowserUsesNoAckTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.subscription; - -import org.apache.qpid.AMQException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -import java.util.List; - -public class QueueBrowserUsesNoAckTest extends QpidTestCase -{ - private AMQChannel _channel; - private SimpleAMQQueue _queue; - private MessageStore _messageStore; - private String _queueName; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _channel = BrokerTestHelper.createChannel(); - VirtualHost virtualHost = _channel.getVirtualHost(); - _queueName = getTestName(); - _queue = BrokerTestHelper.createQueue(_queueName, virtualHost); - _messageStore = virtualHost.getMessageStore(); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_channel != null) - { - _channel.getVirtualHost().close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - private AMQChannel getChannel() - { - return _channel; - } - - private InternalTestProtocolSession getSession() - { - return (InternalTestProtocolSession)_channel.getProtocolSession(); - } - - private SimpleAMQQueue getQueue() - { - return _queue; - } - - public void testQueueBrowserUsesNoAck() throws AMQException - { - int sendMessageCount = 2; - int prefetch = 1; - - //Check store is empty - checkStoreContents(0); - - //Send required messsages to the queue - BrokerTestHelper.publishMessages(getChannel(), sendMessageCount, _queueName, ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString()); - - //Ensure they are stored - checkStoreContents(sendMessageCount); - - //Check that there are no unacked messages - assertEquals("Channel should have no unacked msgs ", 0, - getChannel().getUnacknowledgedMessageMap().size()); - - //Set the prefetch on the session to be less than the sent messages - getChannel().setCredit(0, prefetch); - - //browse the queue - AMQShortString browser = browse(getChannel(), getQueue()); - - getQueue().deliverAsync(); - - //Wait for messages to fill the prefetch - getSession().awaitDelivery(prefetch); - - //Get those messages - List messages = - getSession().getDelivers(getChannel().getChannelId(), browser, - prefetch); - - //Ensure we recevied the prefetched messages - assertEquals(prefetch, messages.size()); - - //Check the process didn't suspend the subscription as this would - // indicate we are using the prefetch credit. i.e. using acks not No-Ack - assertTrue("The subscription has been suspended", - !getChannel().getSubscription(browser).getState() - .equals(Subscription.State.SUSPENDED)); - } - - private void checkStoreContents(int messageCount) - { - assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount()); - } - - private AMQShortString browse(AMQChannel channel, AMQQueue queue) throws AMQException - { - FieldTable filters = new FieldTable(); - filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - - return channel.subscribeToQueue(null, queue, true, filters, false, true); - } -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java deleted file mode 100644 index 731b1aadc4..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.subscription; - -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.logging.UnitTestMessageLogger; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.test.utils.QpidTestCase; - -public class SubscriptionFactoryImplTest extends QpidTestCase -{ - private AMQChannel _channel; - private AMQProtocolSession _session; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _channel = BrokerTestHelper.createChannel(); - _session = _channel.getProtocolSession(); - GenericActor.setDefaultMessageLogger(new UnitTestMessageLogger(false)); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_channel != null) - { - _channel.getVirtualHost().close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - /** - * Tests that while creating Subscriptions of various types, the - * ID numbers assigned are allocated from a common sequence - * (in increasing order). - */ - public void testDifferingSubscriptionTypesShareCommonIdNumberingSequence() throws Exception - { - //create a No-Ack subscription, get the first Subscription ID - long previousId = 0; - Subscription noAckSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), false, null, false, _channel.getCreditManager()); - previousId = noAckSub.getSubscriptionID(); - - //create an ack subscription, verify the next Subscription ID is used - Subscription ackSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, ackSub.getSubscriptionID()); - previousId = ackSub.getSubscriptionID(); - - //create a browser subscription - FieldTable filters = new FieldTable(); - filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true); - Subscription browerSub = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, _session, new AMQShortString("1"), true, null, false, _channel.getCreditManager()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, browerSub.getSubscriptionID()); - previousId = browerSub.getSubscriptionID(); - - //create an BasicGet NoAck subscription - Subscription getNoAckSub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(_channel, _session, new AMQShortString("1"), null, false, - _channel.getCreditManager(),_channel.getClientDeliveryMethod(), _channel.getRecordDeliveryMethod()); - assertEquals("Unexpected Subscription ID allocated", previousId + 1, getNoAckSub.getSubscriptionID()); - previousId = getNoAckSub.getSubscriptionID(); - - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index bc4e67aefe..caf6acb4bb 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -31,11 +31,8 @@ import java.util.UUID; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.configuration.store.JsonConfigurationEntryStore; import org.apache.qpid.server.exchange.DefaultExchangeFactory; @@ -47,8 +44,6 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_8.InternalTestProtocolSession; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.security.SecurityManager; @@ -143,33 +138,35 @@ public class BrokerTestHelper return vhostConfig; } - public static AMQChannel createChannel(int channelId, AMQProtocolSession session) throws AMQException + public static AMQSessionModel createSession(int channelId, AMQConnectionModel connection) throws AMQException { - AMQChannel channel = new AMQChannel(session, channelId, session.getVirtualHost().getMessageStore()); - session.addChannel(channel); - return channel; + AMQSessionModel session = mock(AMQSessionModel.class); + when(session.getConnectionModel()).thenReturn(connection); + when(session.getChannelId()).thenReturn(channelId); + return session; } - public static AMQChannel createChannel(int channelId) throws Exception + public static AMQSessionModel createSession(int channelId) throws Exception { - InternalTestProtocolSession session = createSession(); - return createChannel(channelId, session); + AMQConnectionModel session = createConnection(); + return createSession(channelId, session); } - public static AMQChannel createChannel() throws Exception + public static AMQSessionModel createSession() throws Exception { - return createChannel(1); + return createSession(1); } - public static InternalTestProtocolSession createSession() throws Exception + public static AMQConnectionModel createConnection() throws Exception { - return createSession("test"); + return createConnection("test"); } - public static InternalTestProtocolSession createSession(String hostName) throws Exception + public static AMQConnectionModel createConnection(String hostName) throws Exception { VirtualHost virtualHost = createVirtualHost(hostName); - return new InternalTestProtocolSession(virtualHost, createBrokerMock()); + AMQConnectionModel connection = mock(AMQConnectionModel.class); + return connection; } public static Exchange createExchange(String hostName) throws Exception @@ -182,39 +179,6 @@ public class BrokerTestHelper return factory.createExchange("amp.direct", "direct", false, false); } - public static void publishMessages(AMQChannel channel, int numberOfMessages, String queueName, String exchangeName) throws AMQException - { - AMQShortString rouningKey = new AMQShortString(queueName); - AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName); - MessagePublishInfo info = mock(MessagePublishInfo.class); - when(info.getExchange()).thenReturn(exchangeNameAsShortString); - when(info.getRoutingKey()).thenReturn(rouningKey); - - Exchange exchange = channel.getVirtualHost().getExchange(exchangeName); - for (int count = 0; count < numberOfMessages; count++) - { - channel.setPublishFrame(info, exchange); - - // Set the body size - ContentHeaderBody _headerBody = new ContentHeaderBody(); - _headerBody.setBodySize(0); - - // Set Minimum properties - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - - properties.setExpiration(0L); - properties.setTimestamp(System.currentTimeMillis()); - - // Make Message Persistent - properties.setDeliveryMode((byte) 2); - - _headerBody.setProperties(properties); - - channel.publishContentHeader(_headerBody); - } - channel.sync(); - } - public static SimpleAMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException { SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, null, diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index 05761f88a1..a3551b8952 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -48,7 +48,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.ConflationQueue; -import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.protocol.v0_8.IncomingMessage; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; -- cgit v1.2.1