summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-14 22:55:54 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-14 22:55:54 +0000
commite745b78b3f111daa2c76ddb9cd1afd4ca10417e1 (patch)
treec3eb8853a37be5b35177f3a661ab503a1facf51c /qpid/java/broker
parentc4865a9355ff5362b525819090a486f075be042c (diff)
downloadqpid-python-e745b78b3f111daa2c76ddb9cd1afd4ca10417e1.tar.gz
QPID-4659 : [Java Broker] fix protocol version specific code in logging, subscriptions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503076 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java47
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java47
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java36
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java37
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java13
18 files changed, 134 insertions, 161 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
index da280af059..a5fe14641c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.logging.actors;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
@@ -45,7 +46,7 @@ public class AMQPChannelActor extends AbstractActor
* @param channel The Channel for this LogActor
* @param rootLogger The root Logger that this LogActor should use
*/
- public AMQPChannelActor(AMQChannel channel, RootMessageLogger rootLogger)
+ public AMQPChannelActor(AMQSessionModel channel, RootMessageLogger rootLogger)
{
super(rootLogger);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
index 1b4bc91bc1..a4bd002f01 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
@@ -14,14 +14,15 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.logging.actors;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -39,7 +40,7 @@ public class AMQPConnectionActor extends AbstractActor
{
private ConnectionLogSubject _logSubject;
- public AMQPConnectionActor(AMQProtocolSession session, RootMessageLogger rootLogger)
+ public AMQPConnectionActor(AMQConnectionModel session, RootMessageLogger rootLogger)
{
super(rootLogger);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
index 0ff26a64b6..5b0e34b73e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
@@ -20,20 +20,16 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_10.ServerConnection;
-import org.apache.qpid.server.protocol.v0_10.ServerSession;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
public class ChannelLogSubject extends AbstractLogSubject
{
- public ChannelLogSubject(AMQChannel channel)
+ public ChannelLogSubject(AMQSessionModel session)
{
- AMQProtocolSession session = channel.getProtocolSession();
-
/**
* LOG FORMAT used by the AMQPConnectorActor follows
* ChannelLogSubject.CHANNEL_FORMAT : con:{0}({1}@{2}/{3})/ch:{4}.
@@ -47,39 +43,14 @@ public class ChannelLogSubject extends AbstractLogSubject
* 3 - Virtualhost
* 4 - Channel ID
*/
+ AMQConnectionModel connection = session.getConnectionModel();
setLogStringWithFormat(CHANNEL_FORMAT,
- session.getSessionID(),
- session.getAuthorizedPrincipal().getName(),
- session.getRemoteAddress(),
- session.getVirtualHost().getName(),
- channel.getChannelId());
- }
+ connection == null ? -1L : connection.getConnectionId(),
+ (connection == null || connection.getPrincipalAsString() == null) ? "?" : connection.getPrincipalAsString(),
+ (connection == null || connection.getRemoteAddressString() == null) ? "?" : connection.getRemoteAddressString(),
+ (connection == null || connection.getVirtualHostName() == null) ? "?" : connection.getVirtualHostName(),
+ session.getChannelId());
- public ChannelLogSubject(ServerSession session)
- {
- /**
- * LOG FORMAT used by the AMQPConnectorActor follows
- * ChannelLogSubject.CHANNEL_FORMAT : con:{0}({1}@{2}/{3})/ch:{4}.
- *
- * Uses a MessageFormat call to insert the required values according to
- * these indices:
- *
- * 0 - Connection ID
- * 1 - User ID
- * 2 - IP
- * 3 - Virtualhost
- * 4 - Channel ID
- */
- if(session.getConnection() instanceof ServerConnection)
- {
- ServerConnection connection = (ServerConnection) session.getConnection();
- setLogStringWithFormat(CHANNEL_FORMAT,
- connection == null ? -1L : connection.getConnectionId(),
- session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(),
- (connection == null || connection.getRemoteAddressString() == null) ? "?" : connection.getRemoteAddressString(),
- session.getVirtualHost().getName(),
- session.getChannel());
- }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
index 3b08a172b6..87c2377e0f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
@@ -20,34 +20,33 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import java.text.MessageFormat;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
-import java.text.MessageFormat;
-
/** The Connection LogSubject */
public class ConnectionLogSubject extends AbstractLogSubject
{
- public ConnectionLogSubject(AMQProtocolSession session)
+ // The Session this Actor is representing
+ private AMQConnectionModel _session;
+
+ public ConnectionLogSubject(AMQConnectionModel session)
{
_session = session;
}
- // The Session this Actor is representing
- private AMQProtocolSession _session;
-
// Used to stop re-creating the _logString when we reach our final format
private boolean _upToDate = false;
/**
* Update the LogString as the Connection process proceeds.
- *
+ *
* When the Session has an authorized ID add that to the string.
- *
+ *
* When the Session then gains a Vhost add that to the string, at this point
* we can set upToDate = true as the _logString will not need to be updated
* from this point onwards.
@@ -56,44 +55,44 @@ public class ConnectionLogSubject extends AbstractLogSubject
{
if (!_upToDate)
{
- if (_session.getAuthorizedPrincipal() != null)
+ if (_session.getPrincipalAsString() != null)
{
- if (_session.getVirtualHost() != null)
+ if (_session.getVirtualHostName() != null)
{
/**
* LOG FORMAT used by the AMQPConnectorActor follows
* ConnectionLogSubject.CONNECTION_FORMAT :
* con:{0}({1}@{2}/{3})
- *
+ *
* Uses a MessageFormat call to insert the required values
* according to these indices:
- *
+ *
* 0 - Connection ID 1 - User ID 2 - IP 3 - Virtualhost
*/
setLogString("[" + MessageFormat.format(CONNECTION_FORMAT,
- _session.getSessionID(),
- _session.getAuthorizedPrincipal().getName(),
- _session.getRemoteAddress(),
- _session.getVirtualHost().getName())
+ _session.getConnectionId(),
+ _session.getPrincipalAsString(),
+ _session.getRemoteAddressString(),
+ _session.getVirtualHostName())
+ "] ");
_upToDate = true;
- }
+ }
else
{
setLogString("[" + MessageFormat.format(USER_FORMAT,
- _session.getSessionID(),
- _session.getAuthorizedPrincipal().getName(),
- _session.getRemoteAddress())
+ _session.getConnectionId(),
+ _session.getPrincipalAsString(),
+ _session.getRemoteAddressString())
+ "] ");
}
- }
+ }
else
{
setLogString("[" + MessageFormat.format(SOCKET_FORMAT,
- _session.getSessionID(),
- _session.getRemoteAddress())
+ _session.getConnectionId(),
+ _session.getRemoteAddressString())
+ "] ");
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index e757898b69..19c5d03e0c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -92,4 +92,6 @@ public interface AMQConnectionModel extends StatisticsGatherer
void stop();
boolean isStopped();
+
+ String getVirtualHostName();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
index ad54bc148a..ab50a33b9b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
@@ -76,7 +76,7 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator
final ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker,
fqdn, broker.getSubjectCreator(address));
- ServerConnection conn = new ServerConnection(id);
+ ServerConnection conn = new ServerConnection(id,broker);
conn.setConnectionDelegate(connDelegate);
conn.setRemoteAddress(network.getRemoteAddress());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 1b0f068646..0015988ab7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -32,9 +32,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -60,7 +62,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
{
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
- private LogActor _actor = GenericActor.getInstance(this);
+ private LogActor _actor;
private Subject _authorizedSubject = null;
private Principal _authorizedPrincipal = null;
@@ -76,9 +78,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
private Transport _transport;
private volatile boolean _stopped;
- public ServerConnection(final long connectionId)
+ public ServerConnection(final long connectionId, Broker broker)
{
_connectionId = connectionId;
+ _actor = new AMQPConnectionActor(this, broker.getRootMessageLogger());
}
public Object getReference()
@@ -154,6 +157,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
@Override
+ public String getVirtualHostName()
+ {
+ return _virtualHost == null ? null : _virtualHost.getName();
+ }
+
+ @Override
public Port getPort()
{
return _port;
@@ -503,7 +512,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
public String getPrincipalAsString()
{
- return getAuthorizedPrincipal().getName();
+ return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
}
public long getSessionCountLimit()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 08be1fdd2b..8e79813216 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -45,7 +45,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -250,7 +249,7 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
- Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
+ Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
destination,
method.getAcceptMode(),
method.getAcquireMode(),
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
index 68b956978a..87d482c44c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
@@ -132,9 +132,9 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
MessageAcquireMode acquireMode,
MessageFlowMode flowMode,
FlowCreditManager_0_10 creditManager,
- FilterManager filters,Map<String, Object> arguments, long subscriptionId)
+ FilterManager filters,Map<String, Object> arguments)
{
- _subscriptionID = subscriptionId;
+ _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
_session = session;
_postIdSettingAction = new AddMessageDispositionListenerAction(session);
_destination = destination;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 6de8486b56..3e1e8baddc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -1322,6 +1322,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _stopped;
}
+ @Override
+ public String getVirtualHostName()
+ {
+ return _virtualHost == null ? null : _virtualHost.getName();
+ }
+
public long getLastReceivedTime()
{
return _lastReceivedTime;
@@ -1359,7 +1365,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public String getAuthId()
{
- return getAuthorizedPrincipal().getName();
+ return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
}
public Integer getRemotePID()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
index c0d55dd438..f17b79d896 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
@@ -56,33 +56,23 @@ public interface SubscriptionFactory
Subscription createSubscription(AMQChannel channel,
- AMQProtocolSession protocolSession,
- AMQShortString consumerTag,
- boolean acks,
- FieldTable filters,
- boolean noLocal,
- FlowCreditManager creditManager,
- ClientDeliveryMethod clientMethod,
- RecordDeliveryMethod recordMethod
- )
- throws AMQException;
+ AMQProtocolSession protocolSession,
+ AMQShortString consumerTag,
+ boolean acks,
+ FieldTable filters,
+ boolean noLocal,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod clientMethod,
+ RecordDeliveryMethod recordMethod) throws AMQException;
- SubscriptionImpl.GetNoAckSubscription createBasicGetNoAckSubscription(AMQChannel channel,
- AMQProtocolSession session,
- AMQShortString consumerTag,
- FieldTable filters,
- boolean noLocal,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod) throws AMQException;
+ Subscription createBasicGetNoAckSubscription(AMQChannel channel,
+ AMQProtocolSession session,
+ AMQShortString consumerTag,
+ FieldTable filters,
+ boolean noLocal,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod) throws AMQException;
- Subscription_0_10 createSubscription(final ServerSession session,
- final String destination,
- final MessageAcceptMode acceptMode,
- final MessageAcquireMode acquireMode,
- final MessageFlowMode flowMode,
- final FlowCreditManager_0_10 creditManager,
- final FilterManager filterManager,
- final Map<String,Object> arguments);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
index 286e62c18a..d28a0785b1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
@@ -25,25 +25,14 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.protocol.v0_10.FlowCreditManager_0_10;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_10.Subscription_0_10;
-import org.apache.qpid.server.protocol.v0_10.ServerSession;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageFlowMode;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
public class SubscriptionFactoryImpl implements SubscriptionFactory
{
- private static final AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
AMQShortString consumerTag, boolean acks, FieldTable filters,
@@ -92,15 +81,15 @@ public class SubscriptionFactoryImpl implements SubscriptionFactory
if(isBrowser)
{
- return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
+ return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
}
else if(acks)
{
- return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
+ return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
}
else
{
- return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod, getNextSubscriptionId());
+ return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
}
}
@@ -113,26 +102,9 @@ public class SubscriptionFactoryImpl implements SubscriptionFactory
final ClientDeliveryMethod deliveryMethod,
final RecordDeliveryMethod recordMethod) throws AMQException
{
- return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod, getNextSubscriptionId());
- }
-
- public Subscription_0_10 createSubscription(final ServerSession session,
- final String destination,
- final MessageAcceptMode acceptMode,
- final MessageAcquireMode acquireMode,
- final MessageFlowMode flowMode,
- final FlowCreditManager_0_10 creditManager,
- final FilterManager filterManager,
- final Map<String,Object> arguments)
- {
- return new Subscription_0_10(session, destination, acceptMode, acquireMode,
- flowMode, creditManager, filterManager, arguments, getNextSubscriptionId());
+ return new SubscriptionImpl.GetNoAckSubscription(channel, session, null, null, false, creditManager, deliveryMethod, recordMethod);
}
public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
- private static long getNextSubscriptionId()
- {
- return SUB_ID_GENERATOR.getAndIncrement();
- }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
index 67e7452b15..23fc335dc7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
@@ -101,11 +101,10 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
@@ -152,12 +151,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
@@ -241,14 +239,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
{
public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
public boolean isTransient()
@@ -268,12 +265,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable filters,
boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod, subscriptionID);
+ super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
}
@@ -336,15 +332,14 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
- public SubscriptionImpl(AMQChannel channel , AMQProtocolSession protocolSession,
+ public SubscriptionImpl(AMQChannel channel, AMQProtocolSession protocolSession,
AMQShortString consumerTag, FieldTable arguments,
boolean noLocal, FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
- long subscriptionID)
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
- _subscriptionID = subscriptionID;
+ _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
_channel = channel;
_consumerTag = consumerTag;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index bf5f34e17a..320875cc97 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -242,6 +242,12 @@ public class Connection_1_0 implements ConnectionEventListener
}
@Override
+ public String getVirtualHostName()
+ {
+ return _vhost == null ? null : _vhost.getName();
+ }
+
+ @Override
public Port getPort()
{
return _port;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 7a924abb3d..36d49d8279 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.subscription;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.logging.LogActor;
@@ -29,6 +30,8 @@ import org.apache.qpid.server.queue.QueueEntry;
public interface Subscription
{
+ AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
+
LogActor getLogActor();
boolean isTransient();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index 7082705cb5..421adb33a8 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -18,16 +18,18 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.protocol.v0_10.ServerConnection;
-import org.apache.qpid.server.protocol.v0_10.ServerSession;
-import org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.Binary;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class ServerSessionTest extends QpidTestCase
{
@@ -61,13 +63,15 @@ public class ServerSessionTest extends QpidTestCase
public void testCompareTo() throws Exception
{
- ServerConnection connection = new ServerConnection(1);
+ final Broker broker = mock(Broker.class);
+ when(broker.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+ ServerConnection connection = new ServerConnection(1, broker);
connection.setVirtualHost(_virtualHost);
ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
new Binary(getName().getBytes()), 0);
// create a session with the same name but on a different connection
- ServerConnection connection2 = new ServerConnection(2);
+ ServerConnection connection2 = new ServerConnection(2, broker);
connection2.setVirtualHost(_virtualHost);
ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
new Binary(getName().getBytes()), 0);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index ed60d5374b..4fa35c2ceb 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -569,5 +569,11 @@ public class MockSubscription implements Subscription
{
return false;
}
+
+ @Override
+ public String getVirtualHostName()
+ {
+ return null;
+ }
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
index 64521b64e2..7bc02ad517 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
@@ -23,6 +23,9 @@ package org.apache.qpid.server.subscription;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.protocol.v0_10.Subscription_0_10;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_10.WindowCreditManager;
import org.apache.qpid.server.logging.UnitTestMessageLogger;
@@ -41,6 +44,9 @@ import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.TestNetworkConnection;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class SubscriptionFactoryImplTest extends QpidTestCase
{
private AMQChannel _channel;
@@ -104,14 +110,17 @@ public class SubscriptionFactoryImplTest extends QpidTestCase
previousId = getNoAckSub.getSubscriptionID();
//create a 0-10 subscription
- ServerConnection conn = new ServerConnection(1);
+ final Broker broker = mock(Broker.class);
+ when(broker.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+
+ ServerConnection conn = new ServerConnection(1, broker);
ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection(), null, null);
conn.setVirtualHost(_session.getVirtualHost());
ServerSessionDelegate sesDel = new ServerSessionDelegate();
Binary name = new Binary(new byte[]{new Byte("1")});
ServerSession session = new ServerSession(conn, sesDel, name, 0);
- Subscription sub_0_10 = SubscriptionFactoryImpl.INSTANCE.createSubscription(session, "1", MessageAcceptMode.EXPLICIT,
+ Subscription sub_0_10 = new Subscription_0_10(session, "1", MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED, MessageFlowMode.WINDOW, new WindowCreditManager(), null, null);
assertEquals("Unexpected Subscription ID allocated", previousId + 1, sub_0_10.getSubscriptionID());
}