From ba01534206bc194dab376f25fcc3fa3687d0dc2c Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 22 Jul 2009 09:52:02 +0000 Subject: QPID-1992 : Addition of new Broker Logging Framework Provided static CurrentActor for accessing ThreadLocal. Included Test to validate setting of ThreadLocals. Added Test for AMQPActor Added getRootMessageLogger() to IApplicationRegistry Adjusted *ProtocolSessions to start counting at 0. Allowed Setting of Vhost on the MockProtocolSession Created a fixed Principle in MockProtocolSession Changes to MockProtocolSession, prevent NPEs when the AMQPActor creates its log string. Converted CurrentActor to use a Stack allowing a variety of actors to take their turn on a thread. Improved package structure Added testing for Actors Moved FileMonitorTools functionality to FileUtils and provided a Test Converted Log4jMessageLoggerTest to a proper UnitTest Moved Test cases to test package Updated other broker tests to set the authenticated user before setting the virtualhost, Whilst the logging could output null as the username it would be better if the tests correctly set the authorizedID. Update to include tests for disabled logging Fully tested LogSubjects Updated MockAMQQueue to be able to take a Virtualhost as per a normal Queue. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@796650 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/configuration/ServerConfiguration.java | 17 ++- .../qpid/server/handler/ChannelOpenHandler.java | 7 +- .../apache/qpid/server/logging/BrokerMessages.java | 34 ++++++ .../org/apache/qpid/server/logging/LogActor.java | 43 ++++++++ .../org/apache/qpid/server/logging/LogMessage.java | 26 +++++ .../org/apache/qpid/server/logging/LogSubject.java | 37 +++++++ .../qpid/server/logging/RawMessageLogger.java | 44 ++++++++ .../qpid/server/logging/RootMessageLogger.java | 56 ++++++++++ .../qpid/server/logging/RootMessageLoggerImpl.java | 52 ++++++++++ .../server/logging/actors/AMQPChannelActor.java | 79 ++++++++++++++ .../server/logging/actors/AMQPConnectionActor.java | 115 +++++++++++++++++++++ .../qpid/server/logging/actors/AbstractActor.java | 45 ++++++++ .../qpid/server/logging/actors/CurrentActor.java | 54 ++++++++++ .../server/logging/actors/ManagementActor.java | 57 ++++++++++ .../logging/rawloggers/Log4jMessageLogger.java | 55 ++++++++++ .../logging/subjects/AbstractLogSubject.java | 64 ++++++++++++ .../server/logging/subjects/BindingLogSubject.java | 62 +++++++++++ .../server/logging/subjects/ChannelLogSubject.java | 54 ++++++++++ .../logging/subjects/ConnectionLogSubject.java | 48 +++++++++ .../logging/subjects/ExchangeLogSubject.java | 46 +++++++++ .../server/logging/subjects/QueueLogSubject.java | 45 ++++++++ .../logging/subjects/SubscriptionLogSubject.java | 49 +++++++++ .../server/protocol/AMQMinaProtocolSession.java | 54 ++++++++-- .../qpid/server/protocol/AMQProtocolSession.java | 3 + .../apache/qpid/server/queue/SimpleAMQQueue.java | 10 ++ .../qpid/server/registry/ApplicationRegistry.java | 8 ++ .../ConfigurationFileApplicationRegistry.java | 7 +- .../qpid/server/registry/IApplicationRegistry.java | 3 + .../qpid/server/subscription/Subscription.java | 2 + .../qpid/server/subscription/SubscriptionImpl.java | 11 ++ .../qpid/server/util/NullApplicationRegistry.java | 21 ++-- .../server/virtualhost/VirtualHostRegistry.java | 14 +++ 32 files changed, 1202 insertions(+), 20 deletions(-) create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java (limited to 'qpid/java/broker/src/main') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index e0d325a5b0..fc16b75e1a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -94,6 +94,7 @@ public class ServerConfiguration implements SignalHandler envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer"); envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay"); envVarMap.put("QPID_ENABLEPOOLEDALLOCATOR", "advanced.enablePooledAllocator"); + envVarMap.put("QPID_STATUS-UPDATES", "status-updates"); } public ServerConfiguration(File configurationURL) throws ConfigurationException @@ -186,7 +187,12 @@ public class ServerConfiguration implements SignalHandler } return conf; } - + + public boolean getStatusEnabled() + { + return getConfig().getBoolean("status-updates", true); + } + // Our configuration class needs to make the interpolate method // public so it can be called below from the config method. private static class MyConfiguration extends CompositeConfiguration @@ -541,4 +547,13 @@ public class ServerConfiguration implements SignalHandler getConfig().getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD)); } + + public boolean getStatusUpdates() + { + // Retrieve the setting from configuration but default to on. + String value = getConfig().getString("status-updates", "on"); + + return value.equalsIgnoreCase("on"); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 054674aed4..5d7adc6371 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -54,8 +54,11 @@ public class ChannelOpenHandler implements StateAwareMethodListener + * This is responsible for correctly formatting the LogActor String in the log + *

+ * [con:1(user@127.0.0.1/)/ch:1] + *

+ * To do this it requires access to the IO Layers as well as a Channel + */ +public class AMQPChannelActor extends AbstractActor +{ + + /** + * Create a new ChannelActor + * + * @param channel The Channel for this LogActor + * @param rootLogger The root Logger that this LogActor should use + */ + public AMQPChannelActor(AMQChannel channel, RootMessageLogger rootLogger) + { + super(rootLogger); + + AMQProtocolSession session = channel.getProtocolSession(); + + /** + * LOG FORMAT used by the AMQPConnectorActor follows + * ChannelLogSubject.CHANNEL_FORMAT : + * con:{0}({1}@{2}/{3})/ch:{4} + * + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + */ + _logString = "[" + MessageFormat.format(ChannelLogSubject.CHANNEL_FORMAT, + session.getSessionID(), + session.getAuthorizedID().getName(), + session.getRemoteAddress(), + session.getVirtualHost().getName(), + channel.getChannelId()) + + "] "; + } +} + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java new file mode 100644 index 0000000000..432b1d8203 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.protocol.AMQProtocolSession; + +import java.text.MessageFormat; + +/** + * An AMQPConnectionActor represtents a connectionthrough the AMQP port. + *

+ * This is responsible for correctly formatting the LogActor String in the log + *

+ * [ con:1(user@127.0.0.1/) ] + *

+ * To do this it requires access to the IO Layers. + */ +public class AMQPConnectionActor extends AbstractActor +{ + /** + * 0 - Connection ID + * 1 - Remote Address + */ + public static String SOCKET_FORMAT = "con:{0}({1})"; + + /** + * LOG FORMAT for the ConnectionLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + */ + public static final String USER_FORMAT = "con:{0}({1}@{2})"; + + public AMQPConnectionActor(AMQProtocolSession session, RootMessageLogger rootLogger) + { + super(rootLogger); + + _logString = "[" + MessageFormat.format(SOCKET_FORMAT, + session.getSessionID(), + session.getRemoteAddress()) + + + "] "; + } + + /** + * Call when the connection has been authorized so that the logString + * can be updated with the new user identity. + * + * @param session the authorized session + */ + public void connectionAuthorized(AMQProtocolSession session) + { + _logString = "[" + MessageFormat.format(USER_FORMAT, + session.getSessionID(), + session.getAuthorizedID().getName(), + session.getRemoteAddress()) + + "] "; + + } + + /** + * Called once the user has been authenticated and they are now selecting + * the virtual host they wish to use. + * + * @param session the session that now has a virtualhost associated with it. + */ + public void virtualHostSelected(AMQProtocolSession session) + { + + /** + * LOG FORMAT used by the AMQPConnectorActor follows + * ConnectionLogSubject.CONNECTION_FORMAT : + * con:{0}({1}@{2}/{3}) + * + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + */ + _logString = "[" + MessageFormat.format(ConnectionLogSubject.CONNECTION_FORMAT, + session.getSessionID(), + session.getAuthorizedID().getName(), + session.getRemoteAddress(), + session.getVirtualHost().getName()) + + "] "; + + } +} + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java new file mode 100644 index 0000000000..95f2dc9ff6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.RootMessageLogger; + +public abstract class AbstractActor implements LogActor +{ + protected String _logString; + protected RootMessageLogger _rootLogger; + + public AbstractActor(RootMessageLogger rootLogger) + { + _rootLogger = rootLogger; + } + + public void message(LogSubject subject, LogMessage message) + { + if (_rootLogger.isMessageEnabled(this, subject)) + { + _rootLogger.rawMessage(_logString + String.valueOf(subject) + message); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java new file mode 100644 index 0000000000..221e57eebb --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.LogActor; + +import java.util.LinkedList; +import java.util.Deque; + +public class CurrentActor +{ + private static final ThreadLocal> _currentActor = new ThreadLocal>() + { + protected Deque initialValue() + { + return new LinkedList(); + } + }; + + public static void set(LogActor actor) + { + Deque stack = _currentActor.get(); + stack.addFirst(actor); + } + + public static void remove() + { + Deque stack = _currentActor.get(); + stack.remove(); + } + + public static LogActor get() + { + return _currentActor.get().peek(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java new file mode 100644 index 0000000000..58d55a13bb --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.RootMessageLogger; + +import java.text.MessageFormat; +import java.security.Principal; + +public class ManagementActor extends AbstractActor +{ + + /** + * LOG FORMAT for the ManagementActor, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + */ + public static final String MANAGEMENT_FORMAT = "mng:{0}({1}@{2})"; + + /** + * //todo Correct interface to provide connection details + * @param user + * @param rootLogger The RootLogger to use for this Actor + */ + public ManagementActor(Principal user, RootMessageLogger rootLogger) + { + super(rootLogger); + + _logString = "["+ MessageFormat.format(MANAGEMENT_FORMAT, + "", + user.getName(), + "") + + "] "; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java new file mode 100644 index 0000000000..3774155626 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging.rawloggers; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.qpid.server.logging.RawMessageLogger; + +public class Log4jMessageLogger implements RawMessageLogger +{ + public static final String DEFAULT_LEVEL = "INFO"; + public static final String DEFAULT_LOGGER = "qpid.message"; + private Level _level; + private Logger _rawMessageLogger; + + public Log4jMessageLogger() + { + this(DEFAULT_LEVEL, DEFAULT_LOGGER); + } + + public Log4jMessageLogger(String level, String logger) + { + _level = Level.toLevel(level); + + _rawMessageLogger = Logger.getLogger(logger); + } + + public void rawMessage(String message) + { + rawMessage(message, null); + } + + public void rawMessage(String message, Throwable throwable) + { + _rawMessageLogger.log(_level, message, throwable); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java new file mode 100644 index 0000000000..4fb5bdcc93 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.logging.LogSubject; + +import java.text.MessageFormat; + +/** + * The LogSubjects all have a similar requriement to format their output and + * provide the String value. + * + * This Abstract LogSubject provides this basic functionality, allowing the + * actual LogSubjects to provide their formating and data. + */ +public abstract class AbstractLogSubject implements LogSubject +{ + /** + * The logString that will be returned via toString + */ + protected String logString; + + /** + * Set the toString logging of this LogSubject. Based on a format provided + * by format and the var args. + * @param format The Message to format + * @param args The values to put in to the message. + */ + protected void setLogStringWithFormat(String format, Object... args) + { + logString = "[" + MessageFormat.format(format, args) + "] "; + } + + /** + * ToString is how the Logging infrastructure will get the text for this + * LogSubject + * + * @return String representing this LogSubject + */ + @Override + public String toString() + { + return logString; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java new file mode 100644 index 0000000000..fd171fea5a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; + +public class BindingLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the ChannelLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Virtualhost Name + * 1 - Exchange Type + * 2 - Exchange Name + * 3 - Queue Name + * 4 - Binding RoutingKey + */ + protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})/qu({3})/rk({4})"; + + /** + * Create a BindingLogSubject that Logs in the following format. + * + * [ vh(/)/ex(amq.direct)/qu(testQueue)/bd(testQueue) ] + * + * @param routingKey + * @param exchange + * @param queue + */ + public BindingLogSubject(AMQShortString routingKey, Exchange exchange, + AMQQueue queue) + { + setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(), + exchange.getType(), + exchange.getName(), + queue.getName(), + routingKey); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java new file mode 100644 index 0000000000..1b22de6d01 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQProtocolSession; + +public class ChannelLogSubject extends AbstractLogSubject +{ + /** + * LOG FORMAT for the ChannelLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + * 4 - Channel ID + */ + public static String CHANNEL_FORMAT = ConnectionLogSubject.CONNECTION_FORMAT + + "/ch:{4}"; + + public ChannelLogSubject(AMQChannel channel) + { + AMQProtocolSession session = channel.getProtocolSession(); + + // Provide the value for the 4th replacement. + setLogStringWithFormat(CHANNEL_FORMAT, + session.getSessionID(), + session.getAuthorizedID().getName(), + session.getRemoteAddress(), + session.getVirtualHost().getName(), + channel.getChannelId()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java new file mode 100644 index 0000000000..e07dbcda23 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.protocol.AMQProtocolSession; + +/** The Connection LogSubject */ +public class ConnectionLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the ConnectionLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + */ + public static final String CONNECTION_FORMAT = "con:{0}({1}@{2}/{3})"; + + public ConnectionLogSubject(AMQProtocolSession session) + { + setLogStringWithFormat(CONNECTION_FORMAT, session.getSessionID(), + session.getAuthorizedID().getName(), + session.getRemoteAddress(), + session.getVirtualHost().getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java new file mode 100644 index 0000000000..21e5f5e4ce --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class ExchangeLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the ExchangeLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Virtualhost Name + * 1 - Exchange Type + * 2 - Exchange Name + */ + protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})"; + + /** Create an ExchangeLogSubject that Logs in the following format. */ + public ExchangeLogSubject(Exchange exchange, VirtualHost vhost) + { + setLogStringWithFormat(BINDING_FORMAT, vhost.getName(), + exchange.getType(), exchange.getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java new file mode 100644 index 0000000000..89f31ef477 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.queue.AMQQueue; + +public class QueueLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the ExchangeLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Virtualhost name + * 1 - queue name + */ + protected static String BINDING_FORMAT = "vh(/{0})/qu({1})"; + + /** Create an QueueLogSubject that Logs in the following format. */ + public QueueLogSubject(AMQQueue queue) + { + setLogStringWithFormat(BINDING_FORMAT, + queue.getVirtualHost().getName(), + queue.getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java new file mode 100644 index 0000000000..b68ef2e9a9 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.subscription.Subscription; + +public class SubscriptionLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the SubscriptionLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Subscription ID + * 1 - queue name + */ + protected static String BINDING_FORMAT = "sub:{0}(qu({1}))"; + + /** + * Create an QueueLogSubject that Logs in the following format. + * + * @param subscription + */ + public SubscriptionLogSubject(Subscription subscription) + { + + setLogStringWithFormat(BINDING_FORMAT, subscription.getSubscriptionID(), + subscription.getQueue().getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 205ca73f13..e46a52f3bf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -21,26 +21,39 @@ package org.apache.qpid.server.protocol; import org.apache.log4j.Logger; - +import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; -import org.apache.mina.common.CloseFuture; import org.apache.mina.transport.vmpipe.VmPipeAddress; - import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.handler.ServerMethodDispatcherImpl; +import org.apache.qpid.server.logging.actors.AMQPConnectionActor; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -54,7 +67,6 @@ import org.apache.qpid.transport.Sender; import javax.management.JMException; import javax.security.sasl.SaslServer; - import java.net.InetSocketAddress; import java.net.SocketAddress; import java.security.Principal; @@ -64,6 +76,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicLong; public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { @@ -71,6 +84,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); + private static final AtomicLong idGenerator = new AtomicLong(0); + // to save boxing the channelId and looking up in a map... cache in an array the low numbered // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; @@ -120,6 +135,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L; private org.apache.mina.common.WriteFuture _lastWriteFuture; + // Create a simple ID that increments for ever new Session + private final long _sessionID = idGenerator.getAndIncrement(); + + private AMQPConnectionActor _actor; + public ManagedObject getManagedObject() { return _managedObject; @@ -134,6 +154,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _codecFactory = codecFactory; + _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); + try { IoServiceConfig config = session.getServiceConfig(); @@ -158,6 +180,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _codecFactory = codecFactory; + _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); } private AMQProtocolSessionMBean createMBean() throws AMQException @@ -183,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return (AMQProtocolSession) minaProtocolSession.getAttachment(); } + public long getSessionID() + { + return _sessionID; + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; @@ -235,6 +263,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } + CurrentActor.set(_actor); try { body.handle(channelId, this); @@ -244,7 +273,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable closeChannel(channelId); throw e; } - + finally + { + CurrentActor.remove(); + } } private void protocolInitiationReceived(ProtocolInitiation pi) @@ -796,6 +828,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { _virtualHost = virtualHost; + _actor.virtualHostSelected(this); + _virtualHost.getConnectionRegistry().registerConnection(this); _managedObject = createMBean(); @@ -820,6 +854,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public void setAuthorizedID(Principal authorizedID) { _authorizedID = authorizedID; + + // Let the actor know that this connection is now Authorized + _actor.connectionAuthorized(this); } public Principal getAuthorizedID() @@ -827,6 +864,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return _authorizedID; } + public SocketAddress getRemoteAddress() + { + return _minaProtocolSession.getRemoteAddress(); + } + public MethodRegistry getMethodRegistry() { return MethodRegistry.getMethodRegistry(getProtocolVersion()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 1bac601225..f721730d9c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -36,6 +36,7 @@ import java.security.Principal; public interface AMQProtocolSession extends AMQVersionAwareProtocolSession { + long getSessionID(); public static final class ProtocolSessionIdentifier { @@ -198,6 +199,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession /** @return a Principal that was used to authorized this session */ Principal getAuthorizedID(); + public java.net.SocketAddress getRemoteAddress(); + public MethodRegistry getMethodRegistry(); public MethodDispatcher getMethodDispatcher(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index e994967dc5..8c66508307 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -202,6 +202,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException { + exchange.registerQueue(routingKey, this, arguments); if (isDurable() && exchange.isDurable()) { @@ -209,6 +210,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } _bindings.addBinding(routingKey, arguments, exchange); +// ExchangeBinding binding = new ExchangeBinding(routingKey, exchange, arguments); + + //fixme MR logging in progress +// _bindings.addBinding(binding); +// +// if (_logger.isMessageEnabled(binding)) +// { +// _logger.message(binding, "QM-1001 : Created Binding"); +// } } public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 22b4623ae1..b58b849133 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.logging.RootMessageLogger; /** * An abstract application registry that provides access to configuration information and handles the @@ -70,6 +71,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry protected PluginManager _pluginManager; + protected RootMessageLogger _rootMessageLogger; + static { Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService())); @@ -287,4 +290,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return _pluginManager; } + public RootMessageLogger getRootMessageLogger() + { + return _rootMessageLogger; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index 39164883f9..31a85b878a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -33,6 +33,8 @@ import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalD import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { @@ -44,9 +46,12 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { + _rootMessageLogger = new RootMessageLoggerImpl(_configuration, + new Log4jMessageLogger()); + initialiseManagedObjectRegistry(); - _virtualHostRegistry = new VirtualHostRegistry(); + _virtualHostRegistry = new VirtualHostRegistry(this); _pluginManager = new PluginManager(_configuration.getPluginDirectory()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index bbfda3addc..7d17639f22 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.mina.common.IoAcceptor; public interface IApplicationRegistry @@ -69,6 +70,8 @@ public interface IApplicationRegistry PluginManager getPluginManager(); + RootMessageLogger getRootMessageLogger(); + /** * Register any acceptors for this registry * @param bindAddress The address that the acceptor has been bound with diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 9419572399..19eabce9ff 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -52,6 +52,8 @@ public interface Subscription AMQShortString getConsumerTag(); + long getSubscriptionID(); + boolean isSuspended(); boolean hasInterest(QueueEntry msg); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 7aa9d1e3af..51da884d1e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -66,6 +67,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); private final Lock _stateChangeLock; + private static final AtomicLong idGenerator = new AtomicLong(0); + // Create a simple ID that increments for ever new Subscription + private final long _subscriptionID = idGenerator.getAndIncrement(); + + static final class BrowserSubscription extends SubscriptionImpl { public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession, @@ -526,6 +532,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return _consumerTag; } + public long getSubscriptionID() + { + return _subscriptionID; + } + public AMQProtocolSession getProtocolSession() { return _channel.getProtocolSession(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index eda2d3a94e..9ef1e029d3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -20,17 +20,12 @@ */ package org.apache.qpid.server.util; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Properties; - -import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.MapConfiguration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger; import org.apache.qpid.server.management.NoopManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -41,6 +36,10 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import java.util.Arrays; +import java.util.Collection; +import java.util.Properties; + public class NullApplicationRegistry extends ApplicationRegistry { public NullApplicationRegistry() throws ConfigurationException @@ -51,9 +50,11 @@ public class NullApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { _logger.info("Initialising NullApplicationRegistry"); - + + _rootMessageLogger = new RootMessageLoggerImpl(_configuration, new Log4jMessageLogger()); + _configuration.setHousekeepingExpiredMessageCheckPeriod(200); - + Properties users = new Properties(); users.put("guest", "guest"); @@ -65,7 +66,7 @@ public class NullApplicationRegistry extends ApplicationRegistry _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); _managedObjectRegistry = new NoopManagedObjectRegistry(); - _virtualHostRegistry = new VirtualHostRegistry(); + _virtualHostRegistry = new VirtualHostRegistry(this); PropertiesConfiguration vhostProps = new PropertiesConfiguration(); VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps); VirtualHost dummyHost = new VirtualHost(hostConfig); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java index 27917fac8a..5543adbeb5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.virtualhost; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; + import java.util.ArrayList; import java.util.Collection; import java.util.Map; @@ -32,6 +35,12 @@ public class VirtualHostRegistry private String _defaultVirtualHostName; + private ApplicationRegistry _applicationRegistry; + + public VirtualHostRegistry(ApplicationRegistry applicationRegistry) + { + _applicationRegistry = applicationRegistry; + } public synchronized void registerVirtualHost(VirtualHost host) throws Exception { @@ -67,4 +76,9 @@ public class VirtualHostRegistry { return new ArrayList(_registry.values()); } + + public ApplicationRegistry getApplicationRegistry() + { + return _applicationRegistry; + } } -- cgit v1.2.1