diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-14 22:55:54 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-14 22:55:54 +0000 |
| commit | e745b78b3f111daa2c76ddb9cd1afd4ca10417e1 (patch) | |
| tree | c3eb8853a37be5b35177f3a661ab503a1facf51c /qpid/java/broker | |
| parent | c4865a9355ff5362b525819090a486f075be042c (diff) | |
| download | qpid-python-e745b78b3f111daa2c76ddb9cd1afd4ca10417e1.tar.gz | |
QPID-4659 : [Java Broker] fix protocol version specific code in logging, subscriptions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503076 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
18 files changed, 134 insertions, 161 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java index da280af059..a5fe14641c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.logging.actors; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; @@ -45,7 +46,7 @@ public class AMQPChannelActor extends AbstractActor * @param channel The Channel for this LogActor * @param rootLogger The root Logger that this LogActor should use */ - public AMQPChannelActor(AMQChannel channel, RootMessageLogger rootLogger) + public AMQPChannelActor(AMQSessionModel channel, RootMessageLogger rootLogger) { super(rootLogger); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java index 1b4bc91bc1..a4bd002f01 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java @@ -14,14 +14,15 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.server.logging.actors; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -39,7 +40,7 @@ public class AMQPConnectionActor extends AbstractActor { private ConnectionLogSubject _logSubject; - public AMQPConnectionActor(AMQProtocolSession session, RootMessageLogger rootLogger) + public AMQPConnectionActor(AMQConnectionModel session, RootMessageLogger rootLogger) { super(rootLogger); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java index 0ff26a64b6..5b0e34b73e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java @@ -20,20 +20,16 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_10.ServerConnection; -import org.apache.qpid.server.protocol.v0_10.ServerSession; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; public class ChannelLogSubject extends AbstractLogSubject { - public ChannelLogSubject(AMQChannel channel) + public ChannelLogSubject(AMQSessionModel session) { - AMQProtocolSession session = channel.getProtocolSession(); - /** * LOG FORMAT used by the AMQPConnectorActor follows * ChannelLogSubject.CHANNEL_FORMAT : con:{0}({1}@{2}/{3})/ch:{4}. @@ -47,39 +43,14 @@ public class ChannelLogSubject extends AbstractLogSubject * 3 - Virtualhost * 4 - Channel ID */ + AMQConnectionModel connection = session.getConnectionModel(); setLogStringWithFormat(CHANNEL_FORMAT, - session.getSessionID(), - session.getAuthorizedPrincipal().getName(), - session.getRemoteAddress(), - session.getVirtualHost().getName(), - channel.getChannelId()); - } + connection == null ? -1L : connection.getConnectionId(), + (connection == null || connection.getPrincipalAsString() == null) ? "?" : connection.getPrincipalAsString(), + (connection == null || connection.getRemoteAddressString() == null) ? "?" : connection.getRemoteAddressString(), + (connection == null || connection.getVirtualHostName() == null) ? "?" : connection.getVirtualHostName(), + session.getChannelId()); - public ChannelLogSubject(ServerSession session) - { - /** - * LOG FORMAT used by the AMQPConnectorActor follows - * ChannelLogSubject.CHANNEL_FORMAT : con:{0}({1}@{2}/{3})/ch:{4}. - * - * Uses a MessageFormat call to insert the required values according to - * these indices: - * - * 0 - Connection ID - * 1 - User ID - * 2 - IP - * 3 - Virtualhost - * 4 - Channel ID - */ - if(session.getConnection() instanceof ServerConnection) - { - ServerConnection connection = (ServerConnection) session.getConnection(); - setLogStringWithFormat(CHANNEL_FORMAT, - connection == null ? -1L : connection.getConnectionId(), - session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(), - (connection == null || connection.getRemoteAddressString() == null) ? "?" : connection.getRemoteAddressString(), - session.getVirtualHost().getName(), - session.getChannel()); - } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java index 3b08a172b6..87c2377e0f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java @@ -20,34 +20,33 @@ */ package org.apache.qpid.server.logging.subjects; -import org.apache.qpid.server.protocol.AMQProtocolSession; +import java.text.MessageFormat; +import org.apache.qpid.server.protocol.AMQConnectionModel; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; -import java.text.MessageFormat; - /** The Connection LogSubject */ public class ConnectionLogSubject extends AbstractLogSubject { - public ConnectionLogSubject(AMQProtocolSession session) + // The Session this Actor is representing + private AMQConnectionModel _session; + + public ConnectionLogSubject(AMQConnectionModel session) { _session = session; } - // The Session this Actor is representing - private AMQProtocolSession _session; - // Used to stop re-creating the _logString when we reach our final format private boolean _upToDate = false; /** * Update the LogString as the Connection process proceeds. - * + * * When the Session has an authorized ID add that to the string. - * + * * When the Session then gains a Vhost add that to the string, at this point * we can set upToDate = true as the _logString will not need to be updated * from this point onwards. @@ -56,44 +55,44 @@ public class ConnectionLogSubject extends AbstractLogSubject { if (!_upToDate) { - if (_session.getAuthorizedPrincipal() != null) + if (_session.getPrincipalAsString() != null) { - if (_session.getVirtualHost() != null) + if (_session.getVirtualHostName() != null) { /** * LOG FORMAT used by the AMQPConnectorActor follows * ConnectionLogSubject.CONNECTION_FORMAT : * con:{0}({1}@{2}/{3}) - * + * * Uses a MessageFormat call to insert the required values * according to these indices: - * + * * 0 - Connection ID 1 - User ID 2 - IP 3 - Virtualhost */ setLogString("[" + MessageFormat.format(CONNECTION_FORMAT, - _session.getSessionID(), - _session.getAuthorizedPrincipal().getName(), - _session.getRemoteAddress(), - _session.getVirtualHost().getName()) + _session.getConnectionId(), + _session.getPrincipalAsString(), + _session.getRemoteAddressString(), + _session.getVirtualHostName()) + "] "); _upToDate = true; - } + } else { setLogString("[" + MessageFormat.format(USER_FORMAT, - _session.getSessionID(), - _session.getAuthorizedPrincipal().getName(), - _session.getRemoteAddress()) + _session.getConnectionId(), + _session.getPrincipalAsString(), + _session.getRemoteAddressString()) + "] "); } - } + } else { setLogString("[" + MessageFormat.format(SOCKET_FORMAT, - _session.getSessionID(), - _session.getRemoteAddress()) + _session.getConnectionId(), + _session.getRemoteAddressString()) + "] "); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index e757898b69..19c5d03e0c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -92,4 +92,6 @@ public interface AMQConnectionModel extends StatisticsGatherer void stop(); boolean isStopped(); + + String getVirtualHostName(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java index ad54bc148a..ab50a33b9b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java @@ -76,7 +76,7 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator final ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, fqdn, broker.getSubjectCreator(address)); - ServerConnection conn = new ServerConnection(id); + ServerConnection conn = new ServerConnection(id,broker); conn.setConnectionDelegate(connDelegate); conn.setRemoteAddress(network.getRemoteAddress()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 1b0f068646..0015988ab7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -32,9 +32,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.AMQPConnectionActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -60,7 +62,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, { private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); - private LogActor _actor = GenericActor.getInstance(this); + private LogActor _actor; private Subject _authorizedSubject = null; private Principal _authorizedPrincipal = null; @@ -76,9 +78,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel, private Transport _transport; private volatile boolean _stopped; - public ServerConnection(final long connectionId) + public ServerConnection(final long connectionId, Broker broker) { _connectionId = connectionId; + _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger()); } public Object getReference() @@ -154,6 +157,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } @Override + public String getVirtualHostName() + { + return _virtualHost == null ? null : _virtualHost.getName(); + } + + @Override public Port getPort() { return _port; @@ -503,7 +512,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public String getPrincipalAsString() { - return getAuthorizedPrincipal().getName(); + return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); } public long getSessionCountLimit() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 08be1fdd2b..8e79813216 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -45,7 +45,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; import org.apache.qpid.server.txn.IncorrectDtxStateException; @@ -250,7 +249,7 @@ public class ServerSessionDelegate extends SessionDelegate return; } - Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session, + Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, destination, method.getAcceptMode(), method.getAcquireMode(), diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java index 68b956978a..87d482c44c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java @@ -132,9 +132,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr MessageAcquireMode acquireMode, MessageFlowMode flowMode, FlowCreditManager_0_10 creditManager, - FilterManager filters,Map<String, Object> arguments, long subscriptionId) + FilterManager filters,Map<String, Object> arguments) { - _subscriptionID = subscriptionId; + _subscriptionID = SUB_ID_GENERATOR.getAndIncrement(); _session = session; _postIdSettingAction = new AddMessageDispositionListenerAction(session); _destination = destination; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 6de8486b56..3e1e8baddc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -1322,6 +1322,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _stopped; } + @Override + public String getVirtualHostName() + { + return _virtualHost == null ? null : _virtualHost.getName(); + } + public long getLastReceivedTime() { return _lastReceivedTime; @@ -1359,7 +1365,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public String getAuthId() { - return getAuthorizedPrincipal().getName(); + return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName(); } public Integer getRemotePID() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java index c0d55dd438..f17b79d896 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java @@ -56,33 +56,23 @@ public interface SubscriptionFactory Subscription createSubscription(AMQChannel channel, - AMQProtocolSession protocolSession, - AMQShortString consumerTag, - boolean acks, - FieldTable filters, - boolean noLocal, - FlowCreditManager creditManager, - ClientDeliveryMethod clientMethod, - RecordDeliveryMethod recordMethod - ) - throws AMQException; + AMQProtocolSession protocolSession, + AMQShortString consumerTag, + boolean acks, + FieldTable filters, + boolean noLocal, + FlowCreditManager creditManager, + ClientDeliveryMethod clientMethod, + RecordDeliveryMethod recordMethod) throws AMQException; - SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(AMQChannel channel, - AMQProtocolSession session, - AMQShortString consumerTag, - FieldTable filters, - boolean noLocal, - FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod) throws AMQException; + Subscription createBasicGetNoAckSubscription(AMQChannel channel, + AMQProtocolSession session, + AMQShortString consumerTag, + FieldTable filters, + boolean noLocal, + FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException; - Subscription_0_10 createSubscription(final ServerSession session, - final String destination, - final MessageAcceptMode acceptMode, - final MessageAcquireMode acquireMode, - final MessageFlowMode flowMode, - final FlowCreditManager_0_10 creditManager, - final FilterManager filterManager, - final Map<String,Object> arguments); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java index 286e62c18a..d28a0785b1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java @@ -25,25 +25,14 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.protocol.v0_10.FlowCreditManager_0_10; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.v0_10.Subscription_0_10; -import org.apache.qpid.server.protocol.v0_10.ServerSession; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageFlowMode; - -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; public class SubscriptionFactoryImpl implements SubscriptionFactory { - private static final AtomicLong SUB_ID_GENERATOR = new AtomicLong(0); public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks, FieldTable filters, @@ -92,15 +81,15 @@ public class SubscriptionFactoryImpl implements SubscriptionFactory if(isBrowser) { - return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId()); + return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod); } else if(acks) { - return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId()); + return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod); } else { - return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId()); + return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod); } } @@ -113,26 +102,9 @@ public class SubscriptionFactoryImpl implements SubscriptionFactory final ClientDeliveryMethod deliveryMethod, final RecordDeliveryMethod recordMethod) throws AMQException { - return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod, getNextSubscriptionId()); - } - - public Subscription_0_10 createSubscription(final ServerSession session, - final String destination, - final MessageAcceptMode acceptMode, - final MessageAcquireMode acquireMode, - final MessageFlowMode flowMode, - final FlowCreditManager_0_10 creditManager, - final FilterManager filterManager, - final Map<String,Object> arguments) - { - return new Subscription_0_10(session, destination, acceptMode, acquireMode, - flowMode, creditManager, filterManager, arguments, getNextSubscriptionId()); + return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod); } public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl(); - private static long getNextSubscriptionId() - { - return SUB_ID_GENERATOR.getAndIncrement(); - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java index 67e7452b15..23fc335dc7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java @@ -101,11 +101,10 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod, - long subscriptionID) + RecordDeliveryMethod recordMethod) throws AMQException { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID); + super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } @@ -152,12 +151,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod, - long subscriptionID) + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID); + super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } @@ -241,14 +239,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription { public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, - AMQShortString consumerTag, FieldTable filters, - boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod, - long subscriptionID) + AMQShortString consumerTag, FieldTable filters, + boolean noLocal, FlowCreditManager creditManager, + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID); + super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } public boolean isTransient() @@ -268,12 +265,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, - ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod, - long subscriptionID) + ClientDeliveryMethod deliveryMethod, + RecordDeliveryMethod recordMethod) throws AMQException { - super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID); + super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod); } @@ -336,15 +332,14 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage - public SubscriptionImpl(AMQChannel channel , AMQProtocolSession protocolSession, + public SubscriptionImpl(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable arguments, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, - RecordDeliveryMethod recordMethod, - long subscriptionID) + RecordDeliveryMethod recordMethod) throws AMQException { - _subscriptionID = subscriptionID; + _subscriptionID = SUB_ID_GENERATOR.getAndIncrement(); _channel = channel; _consumerTag = consumerTag; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index bf5f34e17a..320875cc97 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -242,6 +242,12 @@ public class Connection_1_0 implements ConnectionEventListener } @Override + public String getVirtualHostName() + { + return _vhost == null ? null : _vhost.getName(); + } + + @Override public Port getPort() { return _port; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 7a924abb3d..36d49d8279 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.subscription; +import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.logging.LogActor; @@ -29,6 +30,8 @@ import org.apache.qpid.server.queue.QueueEntry; public interface Subscription { + AtomicLong SUB_ID_GENERATOR = new AtomicLong(0); + LogActor getLogActor(); boolean isTransient(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java index 7082705cb5..421adb33a8 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java @@ -18,16 +18,18 @@ */ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.protocol.v0_10.ServerConnection; -import org.apache.qpid.server.protocol.v0_10.ServerSession; -import org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.Binary; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class ServerSessionTest extends QpidTestCase { @@ -61,13 +63,15 @@ public class ServerSessionTest extends QpidTestCase public void testCompareTo() throws Exception { - ServerConnection connection = new ServerConnection(1); + final Broker broker = mock(Broker.class); + when(broker.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class)); + ServerConnection connection = new ServerConnection(1, broker); connection.setVirtualHost(_virtualHost); ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(), new Binary(getName().getBytes()), 0); // create a session with the same name but on a different connection - ServerConnection connection2 = new ServerConnection(2); + ServerConnection connection2 = new ServerConnection(2, broker); connection2.setVirtualHost(_virtualHost); ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(), new Binary(getName().getBytes()), 0); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index ed60d5374b..4fa35c2ceb 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -569,5 +569,11 @@ public class MockSubscription implements Subscription { return false; } + + @Override + public String getVirtualHostName() + { + return null; + } } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java index 64521b64e2..7bc02ad517 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java @@ -23,6 +23,9 @@ package org.apache.qpid.server.subscription; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.protocol.v0_10.Subscription_0_10; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_10.WindowCreditManager; import org.apache.qpid.server.logging.UnitTestMessageLogger; @@ -41,6 +44,9 @@ import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageFlowMode; import org.apache.qpid.transport.TestNetworkConnection; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class SubscriptionFactoryImplTest extends QpidTestCase { private AMQChannel _channel; @@ -104,14 +110,17 @@ public class SubscriptionFactoryImplTest extends QpidTestCase previousId = getNoAckSub.getSubscriptionID(); //create a 0-10 subscription - ServerConnection conn = new ServerConnection(1); + final Broker broker = mock(Broker.class); + when(broker.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class)); + + ServerConnection conn = new ServerConnection(1, broker); ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection(), null, null); conn.setVirtualHost(_session.getVirtualHost()); ServerSessionDelegate sesDel = new ServerSessionDelegate(); Binary name = new Binary(new byte[]{new Byte("1")}); ServerSession session = new ServerSession(conn, sesDel, name, 0); - Subscription sub_0_10 = SubscriptionFactoryImpl.INSTANCE.createSubscription(session, "1", MessageAcceptMode.EXPLICIT, + Subscription sub_0_10 = new Subscription_0_10(session, "1", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, MessageFlowMode.WINDOW, new WindowCreditManager(), null, null); assertEquals("Unexpected Subscription ID allocated", previousId + 1, sub_0_10.getSubscriptionID()); } |
