summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-07-22 09:52:02 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-07-22 09:52:02 +0000
commitba01534206bc194dab376f25fcc3fa3687d0dc2c (patch)
tree6f7f204aa120473340f4fbc2c10889e463d475b9 /qpid/java/broker/src/main
parent33ee5b9247bd4d1e6b7eb88869286ed77c2baf17 (diff)
downloadqpid-python-ba01534206bc194dab376f25fcc3fa3687d0dc2c.tar.gz
QPID-1992 : Addition of new Broker Logging Framework
Provided static CurrentActor for accessing ThreadLocal. Included Test to validate setting of ThreadLocals. Added Test for AMQPActor Added getRootMessageLogger() to IApplicationRegistry Adjusted *ProtocolSessions to start counting at 0. Allowed Setting of Vhost on the MockProtocolSession Created a fixed Principle in MockProtocolSession Changes to MockProtocolSession, prevent NPEs when the AMQPActor creates its log string. Converted CurrentActor to use a Stack allowing a variety of actors to take their turn on a thread. Improved package structure Added testing for Actors Moved FileMonitorTools functionality to FileUtils and provided a Test Converted Log4jMessageLoggerTest to a proper UnitTest Moved Test cases to test package Updated other broker tests to set the authenticated user before setting the virtualhost, Whilst the logging could output null as the username it would be better if the tests correctly set the authorizedID. Update to include tests for disabled logging Fully tested LogSubjects Updated MockAMQQueue to be able to take a Virtualhost as per a normal Queue. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@796650 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java43
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java37
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java56
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java52
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java79
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java115
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java45
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java57
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java55
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java64
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java62
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java48
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java46
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java45
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java49
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java14
32 files changed, 1202 insertions, 20 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index e0d325a5b0..fc16b75e1a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -94,6 +94,7 @@ public class ServerConfiguration implements SignalHandler
envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer");
envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay");
envVarMap.put("QPID_ENABLEPOOLEDALLOCATOR", "advanced.enablePooledAllocator");
+ envVarMap.put("QPID_STATUS-UPDATES", "status-updates");
}
public ServerConfiguration(File configurationURL) throws ConfigurationException
@@ -186,7 +187,12 @@ public class ServerConfiguration implements SignalHandler
}
return conf;
}
-
+
+ public boolean getStatusEnabled()
+ {
+ return getConfig().getBoolean("status-updates", true);
+ }
+
// Our configuration class needs to make the interpolate method
// public so it can be called below from the config method.
private static class MyConfiguration extends CompositeConfiguration
@@ -541,4 +547,13 @@ public class ServerConfiguration implements SignalHandler
getConfig().getLong("housekeeping.expiredMessageCheckPeriod",
DEFAULT_HOUSEKEEPING_PERIOD));
}
+
+ public boolean getStatusUpdates()
+ {
+ // Retrieve the setting from configuration but default to on.
+ String value = getConfig().getString("status-updates", "on");
+
+ return value.equalsIgnoreCase("on");
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 054674aed4..5d7adc6371 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -54,8 +54,11 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()
- );
+ final AMQChannel channel = new AMQChannel(session,channelId,
+ virtualHost.getMessageStore());
+
+
+
session.addChannel(channel);
ChannelOpenOkBody response;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java
new file mode 100644
index 0000000000..e9cc7449cd
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+public class BrokerMessages
+{
+
+ public static LogMessage BRK_1001(String version, String build)
+ {
+ return new LogMessage()
+ {
+
+ };
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
new file mode 100644
index 0000000000..203a5d160d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * LogActor the entity that is stored as in a ThreadLocal and used to perform logging.
+ *
+ * The actor is responsible for formatting its display name for the log entry.
+ *
+ * The actor performs the requested logging.
+ */
+public interface LogActor
+{
+ /**
+ * Logs the specified LogMessage about the LogSubject
+ *
+ * Currently logging has a global setting however this will later be revised and
+ * as such the LogActor will need to take into consideration any new configuration
+ * as a means of enabling the logging of LogActors and LogSubjects.
+ *
+ * @param subject The subject that is being logged
+ * @param message The message to log
+ */
+ public void message(LogSubject subject, LogMessage message);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java
new file mode 100644
index 0000000000..5c112ff100
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+public interface LogMessage
+{
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java
new file mode 100644
index 0000000000..e53ef364bf
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * Each LogSubject that wishes to be logged will implement this to provide their
+ * own display representation.
+ *
+ * The display representation is retrieved through the toString() method.
+ */
+public interface LogSubject
+{
+ /**
+ * Logs the message as provided by String.valueOf(message).
+ *
+ * @returns String the display representation of this LogSubject
+ */
+ public String toString();
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java
new file mode 100644
index 0000000000..7d515f3263
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * A RawMessage Logger takes the given String and any Throwable and writes the
+ * data to its resource.
+ */
+public interface RawMessageLogger
+{
+
+ /**
+ * Log the given message.
+ *
+ * @param message String to log.
+ */
+ public void rawMessage(String message);
+
+ /**
+ * Log the message and formatted stack trace for any Throwable.
+ *
+ * @param message String to log.
+ * @param throwable Throwable for which to provide stack trace.
+ */
+ public void rawMessage(String message, Throwable throwable);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
new file mode 100644
index 0000000000..cd7992faa7
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * The RootMessageLogger is used by the LogActors to query if
+ * logging is enabled for the requested message and to provide the actual
+ * message that should be logged.
+ */
+public interface RootMessageLogger
+{
+ /**
+ * Determine if the LogSubject and the LogActor should be
+ * generating log messages.
+ *
+ * @param subject The subject of this log request
+ * @param actor The actor requesting the logging
+ * @return boolean true if the message should be logged.
+ */
+ boolean isMessageEnabled(LogActor actor, LogSubject subject);
+
+
+ /**
+ * Log the raw message to the configured logger.
+ *
+ * @param message The message to log
+ */
+ public void rawMessage(String message);
+
+ /**
+ * Log the raw message to the configured logger.
+ * Along with a formated stack trace from the Throwable.
+ *
+ * @param message The message to log
+ * @param throwable Optional Throwable that should provide stact trace
+ */
+ void rawMessage(String message, Throwable throwable);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
new file mode 100644
index 0000000000..9270c316b6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+import org.apache.qpid.server.configuration.ServerConfiguration;
+
+public class RootMessageLoggerImpl implements RootMessageLogger
+{
+ private boolean _enabled;
+
+ RawMessageLogger _rawLogger;
+ private static final String MESSAGE = "MESSAGE ";
+
+ public RootMessageLoggerImpl(ServerConfiguration configuration, RawMessageLogger rawLogger)
+ {
+ _enabled = configuration.getStatusUpdates();
+ _rawLogger = rawLogger;
+ }
+
+ public boolean isMessageEnabled(LogActor actor, LogSubject subject)
+ {
+ return _enabled;
+ }
+
+ public void rawMessage(String message)
+ {
+ _rawLogger.rawMessage(MESSAGE + message);
+ }
+
+ public void rawMessage(String message, Throwable throwable)
+ {
+ _rawLogger.rawMessage(MESSAGE + message, throwable);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
new file mode 100644
index 0000000000..3170040a77
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.text.MessageFormat;
+
+/**
+ * An AMQPChannelActor represtents a connection through the AMQP port with an
+ * associated Channel.
+ *
+ * <p/>
+ * This is responsible for correctly formatting the LogActor String in the log
+ * <p/>
+ * [con:1(user@127.0.0.1/)/ch:1]
+ * <p/>
+ * To do this it requires access to the IO Layers as well as a Channel
+ */
+public class AMQPChannelActor extends AbstractActor
+{
+
+ /**
+ * Create a new ChannelActor
+ *
+ * @param channel The Channel for this LogActor
+ * @param rootLogger The root Logger that this LogActor should use
+ */
+ public AMQPChannelActor(AMQChannel channel, RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+
+ AMQProtocolSession session = channel.getProtocolSession();
+
+ /**
+ * LOG FORMAT used by the AMQPConnectorActor follows
+ * ChannelLogSubject.CHANNEL_FORMAT :
+ * con:{0}({1}@{2}/{3})/ch:{4}
+ *
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ */
+ _logString = "[" + MessageFormat.format(ChannelLogSubject.CHANNEL_FORMAT,
+ session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName(),
+ channel.getChannelId())
+ + "] ";
+ }
+}
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
new file mode 100644
index 0000000000..432b1d8203
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+import java.text.MessageFormat;
+
+/**
+ * An AMQPConnectionActor represtents a connectionthrough the AMQP port.
+ * <p/>
+ * This is responsible for correctly formatting the LogActor String in the log
+ * <p/>
+ * [ con:1(user@127.0.0.1/) ]
+ * <p/>
+ * To do this it requires access to the IO Layers.
+ */
+public class AMQPConnectionActor extends AbstractActor
+{
+ /**
+ * 0 - Connection ID
+ * 1 - Remote Address
+ */
+ public static String SOCKET_FORMAT = "con:{0}({1})";
+
+ /**
+ * LOG FORMAT for the ConnectionLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ */
+ public static final String USER_FORMAT = "con:{0}({1}@{2})";
+
+ public AMQPConnectionActor(AMQProtocolSession session, RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+
+ _logString = "[" + MessageFormat.format(SOCKET_FORMAT,
+ session.getSessionID(),
+ session.getRemoteAddress())
+
+ + "] ";
+ }
+
+ /**
+ * Call when the connection has been authorized so that the logString
+ * can be updated with the new user identity.
+ *
+ * @param session the authorized session
+ */
+ public void connectionAuthorized(AMQProtocolSession session)
+ {
+ _logString = "[" + MessageFormat.format(USER_FORMAT,
+ session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress())
+ + "] ";
+
+ }
+
+ /**
+ * Called once the user has been authenticated and they are now selecting
+ * the virtual host they wish to use.
+ *
+ * @param session the session that now has a virtualhost associated with it.
+ */
+ public void virtualHostSelected(AMQProtocolSession session)
+ {
+
+ /**
+ * LOG FORMAT used by the AMQPConnectorActor follows
+ * ConnectionLogSubject.CONNECTION_FORMAT :
+ * con:{0}({1}@{2}/{3})
+ *
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ */
+ _logString = "[" + MessageFormat.format(ConnectionLogSubject.CONNECTION_FORMAT,
+ session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName())
+ + "] ";
+
+ }
+}
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
new file mode 100644
index 0000000000..95f2dc9ff6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+
+public abstract class AbstractActor implements LogActor
+{
+ protected String _logString;
+ protected RootMessageLogger _rootLogger;
+
+ public AbstractActor(RootMessageLogger rootLogger)
+ {
+ _rootLogger = rootLogger;
+ }
+
+ public void message(LogSubject subject, LogMessage message)
+ {
+ if (_rootLogger.isMessageEnabled(this, subject))
+ {
+ _rootLogger.rawMessage(_logString + String.valueOf(subject) + message);
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
new file mode 100644
index 0000000000..221e57eebb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.LogActor;
+
+import java.util.LinkedList;
+import java.util.Deque;
+
+public class CurrentActor
+{
+ private static final ThreadLocal<Deque<LogActor>> _currentActor = new ThreadLocal<Deque<LogActor>>()
+ {
+ protected Deque<LogActor> initialValue()
+ {
+ return new LinkedList<LogActor>();
+ }
+ };
+
+ public static void set(LogActor actor)
+ {
+ Deque<LogActor> stack = _currentActor.get();
+ stack.addFirst(actor);
+ }
+
+ public static void remove()
+ {
+ Deque<LogActor> stack = _currentActor.get();
+ stack.remove();
+ }
+
+ public static LogActor get()
+ {
+ return _currentActor.get().peek();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
new file mode 100644
index 0000000000..58d55a13bb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.RootMessageLogger;
+
+import java.text.MessageFormat;
+import java.security.Principal;
+
+public class ManagementActor extends AbstractActor
+{
+
+ /**
+ * LOG FORMAT for the ManagementActor,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ */
+ public static final String MANAGEMENT_FORMAT = "mng:{0}({1}@{2})";
+
+ /**
+ * //todo Correct interface to provide connection details
+ * @param user
+ * @param rootLogger The RootLogger to use for this Actor
+ */
+ public ManagementActor(Principal user, RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+
+ _logString = "["+ MessageFormat.format(MANAGEMENT_FORMAT,
+ "<MNG:ConnectionID>",
+ user.getName(),
+ "<MNG:RemoteAddress>")
+ + "] ";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
new file mode 100644
index 0000000000..3774155626
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.rawloggers;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.logging.RawMessageLogger;
+
+public class Log4jMessageLogger implements RawMessageLogger
+{
+ public static final String DEFAULT_LEVEL = "INFO";
+ public static final String DEFAULT_LOGGER = "qpid.message";
+ private Level _level;
+ private Logger _rawMessageLogger;
+
+ public Log4jMessageLogger()
+ {
+ this(DEFAULT_LEVEL, DEFAULT_LOGGER);
+ }
+
+ public Log4jMessageLogger(String level, String logger)
+ {
+ _level = Level.toLevel(level);
+
+ _rawMessageLogger = Logger.getLogger(logger);
+ }
+
+ public void rawMessage(String message)
+ {
+ rawMessage(message, null);
+ }
+
+ public void rawMessage(String message, Throwable throwable)
+ {
+ _rawMessageLogger.log(_level, message, throwable);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java
new file mode 100644
index 0000000000..4fb5bdcc93
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.logging.LogSubject;
+
+import java.text.MessageFormat;
+
+/**
+ * The LogSubjects all have a similar requriement to format their output and
+ * provide the String value.
+ *
+ * This Abstract LogSubject provides this basic functionality, allowing the
+ * actual LogSubjects to provide their formating and data.
+ */
+public abstract class AbstractLogSubject implements LogSubject
+{
+ /**
+ * The logString that will be returned via toString
+ */
+ protected String logString;
+
+ /**
+ * Set the toString logging of this LogSubject. Based on a format provided
+ * by format and the var args.
+ * @param format The Message to format
+ * @param args The values to put in to the message.
+ */
+ protected void setLogStringWithFormat(String format, Object... args)
+ {
+ logString = "[" + MessageFormat.format(format, args) + "] ";
+ }
+
+ /**
+ * ToString is how the Logging infrastructure will get the text for this
+ * LogSubject
+ *
+ * @return String representing this LogSubject
+ */
+ @Override
+ public String toString()
+ {
+ return logString;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
new file mode 100644
index 0000000000..fd171fea5a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class BindingLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ChannelLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost Name
+ * 1 - Exchange Type
+ * 2 - Exchange Name
+ * 3 - Queue Name
+ * 4 - Binding RoutingKey
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})/qu({3})/rk({4})";
+
+ /**
+ * Create a BindingLogSubject that Logs in the following format.
+ *
+ * [ vh(/)/ex(amq.direct)/qu(testQueue)/bd(testQueue) ]
+ *
+ * @param routingKey
+ * @param exchange
+ * @param queue
+ */
+ public BindingLogSubject(AMQShortString routingKey, Exchange exchange,
+ AMQQueue queue)
+ {
+ setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(),
+ exchange.getType(),
+ exchange.getName(),
+ queue.getName(),
+ routingKey);
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
new file mode 100644
index 0000000000..1b22de6d01
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+public class ChannelLogSubject extends AbstractLogSubject
+{
+ /**
+ * LOG FORMAT for the ChannelLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ * 4 - Channel ID
+ */
+ public static String CHANNEL_FORMAT = ConnectionLogSubject.CONNECTION_FORMAT
+ + "/ch:{4}";
+
+ public ChannelLogSubject(AMQChannel channel)
+ {
+ AMQProtocolSession session = channel.getProtocolSession();
+
+ // Provide the value for the 4th replacement.
+ setLogStringWithFormat(CHANNEL_FORMAT,
+ session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName(),
+ channel.getChannelId());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
new file mode 100644
index 0000000000..e07dbcda23
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+/** The Connection LogSubject */
+public class ConnectionLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ConnectionLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ */
+ public static final String CONNECTION_FORMAT = "con:{0}({1}@{2}/{3})";
+
+ public ConnectionLogSubject(AMQProtocolSession session)
+ {
+ setLogStringWithFormat(CONNECTION_FORMAT, session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java
new file mode 100644
index 0000000000..21e5f5e4ce
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ExchangeLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ExchangeLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost Name
+ * 1 - Exchange Type
+ * 2 - Exchange Name
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})";
+
+ /** Create an ExchangeLogSubject that Logs in the following format. */
+ public ExchangeLogSubject(Exchange exchange, VirtualHost vhost)
+ {
+ setLogStringWithFormat(BINDING_FORMAT, vhost.getName(),
+ exchange.getType(), exchange.getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
new file mode 100644
index 0000000000..89f31ef477
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class QueueLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ExchangeLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost name
+ * 1 - queue name
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/qu({1})";
+
+ /** Create an QueueLogSubject that Logs in the following format. */
+ public QueueLogSubject(AMQQueue queue)
+ {
+ setLogStringWithFormat(BINDING_FORMAT,
+ queue.getVirtualHost().getName(),
+ queue.getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java
new file mode 100644
index 0000000000..b68ef2e9a9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.subscription.Subscription;
+
+public class SubscriptionLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the SubscriptionLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Subscription ID
+ * 1 - queue name
+ */
+ protected static String BINDING_FORMAT = "sub:{0}(qu({1}))";
+
+ /**
+ * Create an QueueLogSubject that Logs in the following format.
+ *
+ * @param subscription
+ */
+ public SubscriptionLogSubject(Subscription subscription)
+ {
+
+ setLogStringWithFormat(BINDING_FORMAT, subscription.getSubscriptionID(),
+ subscription.getQueue().getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 205ca73f13..e46a52f3bf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -21,26 +21,39 @@
package org.apache.qpid.server.protocol;
import org.apache.log4j.Logger;
-
+import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.CloseFuture;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
-
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -54,7 +67,6 @@ import org.apache.qpid.transport.Sender;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
-
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.Principal;
@@ -64,6 +76,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
@@ -71,6 +84,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
@@ -120,6 +135,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
private org.apache.mina.common.WriteFuture _lastWriteFuture;
+ // Create a simple ID that increments for ever new Session
+ private final long _sessionID = idGenerator.getAndIncrement();
+
+ private AMQPConnectionActor _actor;
+
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -134,6 +154,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_codecFactory = codecFactory;
+ _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
+
try
{
IoServiceConfig config = session.getServiceConfig();
@@ -158,6 +180,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_codecFactory = codecFactory;
+ _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -183,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return (AMQProtocolSession) minaProtocolSession.getAttachment();
}
+ public long getSessionID()
+ {
+ return _sessionID;
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -235,6 +263,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
+ CurrentActor.set(_actor);
try
{
body.handle(channelId, this);
@@ -244,7 +273,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
closeChannel(channelId);
throw e;
}
-
+ finally
+ {
+ CurrentActor.remove();
+ }
}
private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -796,6 +828,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
_virtualHost = virtualHost;
+ _actor.virtualHostSelected(this);
+
_virtualHost.getConnectionRegistry().registerConnection(this);
_managedObject = createMBean();
@@ -820,6 +854,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void setAuthorizedID(Principal authorizedID)
{
_authorizedID = authorizedID;
+
+ // Let the actor know that this connection is now Authorized
+ _actor.connectionAuthorized(this);
}
public Principal getAuthorizedID()
@@ -827,6 +864,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _authorizedID;
}
+ public SocketAddress getRemoteAddress()
+ {
+ return _minaProtocolSession.getRemoteAddress();
+ }
+
public MethodRegistry getMethodRegistry()
{
return MethodRegistry.getMethodRegistry(getProtocolVersion());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 1bac601225..f721730d9c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -36,6 +36,7 @@ import java.security.Principal;
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
{
+ long getSessionID();
public static final class ProtocolSessionIdentifier
{
@@ -198,6 +199,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
/** @return a Principal that was used to authorized this session */
Principal getAuthorizedID();
+ public java.net.SocketAddress getRemoteAddress();
+
public MethodRegistry getMethodRegistry();
public MethodDispatcher getMethodDispatcher();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index e994967dc5..8c66508307 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -202,6 +202,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
{
+
exchange.registerQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
{
@@ -209,6 +210,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
_bindings.addBinding(routingKey, arguments, exchange);
+// ExchangeBinding binding = new ExchangeBinding(routingKey, exchange, arguments);
+
+ //fixme MR logging in progress
+// _bindings.addBinding(binding);
+//
+// if (_logger.isMessageEnabled(binding))
+// {
+// _logger.message(binding, "QM-1001 : Created Binding");
+// }
}
public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 22b4623ae1..b58b849133 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLogger;
/**
* An abstract application registry that provides access to configuration information and handles the
@@ -70,6 +71,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
protected PluginManager _pluginManager;
+ protected RootMessageLogger _rootMessageLogger;
+
static
{
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -287,4 +290,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
return _pluginManager;
}
+ public RootMessageLogger getRootMessageLogger()
+ {
+ return _rootMessageLogger;
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index 39164883f9..31a85b878a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -33,6 +33,8 @@ import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalD
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
@@ -44,9 +46,12 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
+ _rootMessageLogger = new RootMessageLoggerImpl(_configuration,
+ new Log4jMessageLogger());
+
initialiseManagedObjectRegistry();
- _virtualHostRegistry = new VirtualHostRegistry();
+ _virtualHostRegistry = new VirtualHostRegistry(this);
_pluginManager = new PluginManager(_configuration.getPluginDirectory());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index bbfda3addc..7d17639f22 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.mina.common.IoAcceptor;
public interface IApplicationRegistry
@@ -69,6 +70,8 @@ public interface IApplicationRegistry
PluginManager getPluginManager();
+ RootMessageLogger getRootMessageLogger();
+
/**
* Register any acceptors for this registry
* @param bindAddress The address that the acceptor has been bound with
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 9419572399..19eabce9ff 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -52,6 +52,8 @@ public interface Subscription
AMQShortString getConsumerTag();
+ long getSubscriptionID();
+
boolean isSuspended();
boolean hasInterest(QueueEntry msg);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 7aa9d1e3af..51da884d1e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -66,6 +67,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final Lock _stateChangeLock;
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ // Create a simple ID that increments for ever new Subscription
+ private final long _subscriptionID = idGenerator.getAndIncrement();
+
+
static final class BrowserSubscription extends SubscriptionImpl
{
public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
@@ -526,6 +532,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return _consumerTag;
}
+ public long getSubscriptionID()
+ {
+ return _subscriptionID;
+ }
+
public AMQProtocolSession getProtocolSession()
{
return _channel.getProtocolSession();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index eda2d3a94e..9ef1e029d3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -20,17 +20,12 @@
*/
package org.apache.qpid.server.util;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Properties;
-
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.MapConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -41,6 +36,10 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
public class NullApplicationRegistry extends ApplicationRegistry
{
public NullApplicationRegistry() throws ConfigurationException
@@ -51,9 +50,11 @@ public class NullApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
_logger.info("Initialising NullApplicationRegistry");
-
+
+ _rootMessageLogger = new RootMessageLoggerImpl(_configuration, new Log4jMessageLogger());
+
_configuration.setHousekeepingExpiredMessageCheckPeriod(200);
-
+
Properties users = new Properties();
users.put("guest", "guest");
@@ -65,7 +66,7 @@ public class NullApplicationRegistry extends ApplicationRegistry
_authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
_managedObjectRegistry = new NoopManagedObjectRegistry();
- _virtualHostRegistry = new VirtualHostRegistry();
+ _virtualHostRegistry = new VirtualHostRegistry(this);
PropertiesConfiguration vhostProps = new PropertiesConfiguration();
VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps);
VirtualHost dummyHost = new VirtualHost(hostConfig);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
index 27917fac8a..5543adbeb5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -32,6 +35,12 @@ public class VirtualHostRegistry
private String _defaultVirtualHostName;
+ private ApplicationRegistry _applicationRegistry;
+
+ public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
+ {
+ _applicationRegistry = applicationRegistry;
+ }
public synchronized void registerVirtualHost(VirtualHost host) throws Exception
{
@@ -67,4 +76,9 @@ public class VirtualHostRegistry
{
return new ArrayList<VirtualHost>(_registry.values());
}
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ return _applicationRegistry;
+ }
}