summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java43
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java37
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java56
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java52
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java79
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java115
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java45
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java57
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java55
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java64
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java62
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java48
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java46
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java45
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java49
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java14
32 files changed, 1202 insertions, 20 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index e0d325a5b0..fc16b75e1a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -94,6 +94,7 @@ public class ServerConfiguration implements SignalHandler
envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer");
envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay");
envVarMap.put("QPID_ENABLEPOOLEDALLOCATOR", "advanced.enablePooledAllocator");
+ envVarMap.put("QPID_STATUS-UPDATES", "status-updates");
}
public ServerConfiguration(File configurationURL) throws ConfigurationException
@@ -186,7 +187,12 @@ public class ServerConfiguration implements SignalHandler
}
return conf;
}
-
+
+ public boolean getStatusEnabled()
+ {
+ return getConfig().getBoolean("status-updates", true);
+ }
+
// Our configuration class needs to make the interpolate method
// public so it can be called below from the config method.
private static class MyConfiguration extends CompositeConfiguration
@@ -541,4 +547,13 @@ public class ServerConfiguration implements SignalHandler
getConfig().getLong("housekeeping.expiredMessageCheckPeriod",
DEFAULT_HOUSEKEEPING_PERIOD));
}
+
+ public boolean getStatusUpdates()
+ {
+ // Retrieve the setting from configuration but default to on.
+ String value = getConfig().getString("status-updates", "on");
+
+ return value.equalsIgnoreCase("on");
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index 054674aed4..5d7adc6371 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -54,8 +54,11 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore()
- );
+ final AMQChannel channel = new AMQChannel(session,channelId,
+ virtualHost.getMessageStore());
+
+
+
session.addChannel(channel);
ChannelOpenOkBody response;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java
new file mode 100644
index 0000000000..e9cc7449cd
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+public class BrokerMessages
+{
+
+ public static LogMessage BRK_1001(String version, String build)
+ {
+ return new LogMessage()
+ {
+
+ };
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
new file mode 100644
index 0000000000..203a5d160d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * LogActor the entity that is stored as in a ThreadLocal and used to perform logging.
+ *
+ * The actor is responsible for formatting its display name for the log entry.
+ *
+ * The actor performs the requested logging.
+ */
+public interface LogActor
+{
+ /**
+ * Logs the specified LogMessage about the LogSubject
+ *
+ * Currently logging has a global setting however this will later be revised and
+ * as such the LogActor will need to take into consideration any new configuration
+ * as a means of enabling the logging of LogActors and LogSubjects.
+ *
+ * @param subject The subject that is being logged
+ * @param message The message to log
+ */
+ public void message(LogSubject subject, LogMessage message);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java
new file mode 100644
index 0000000000..5c112ff100
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+public interface LogMessage
+{
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java
new file mode 100644
index 0000000000..e53ef364bf
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * Each LogSubject that wishes to be logged will implement this to provide their
+ * own display representation.
+ *
+ * The display representation is retrieved through the toString() method.
+ */
+public interface LogSubject
+{
+ /**
+ * Logs the message as provided by String.valueOf(message).
+ *
+ * @returns String the display representation of this LogSubject
+ */
+ public String toString();
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java
new file mode 100644
index 0000000000..7d515f3263
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * A RawMessage Logger takes the given String and any Throwable and writes the
+ * data to its resource.
+ */
+public interface RawMessageLogger
+{
+
+ /**
+ * Log the given message.
+ *
+ * @param message String to log.
+ */
+ public void rawMessage(String message);
+
+ /**
+ * Log the message and formatted stack trace for any Throwable.
+ *
+ * @param message String to log.
+ * @param throwable Throwable for which to provide stack trace.
+ */
+ public void rawMessage(String message, Throwable throwable);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
new file mode 100644
index 0000000000..cd7992faa7
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+/**
+ * The RootMessageLogger is used by the LogActors to query if
+ * logging is enabled for the requested message and to provide the actual
+ * message that should be logged.
+ */
+public interface RootMessageLogger
+{
+ /**
+ * Determine if the LogSubject and the LogActor should be
+ * generating log messages.
+ *
+ * @param subject The subject of this log request
+ * @param actor The actor requesting the logging
+ * @return boolean true if the message should be logged.
+ */
+ boolean isMessageEnabled(LogActor actor, LogSubject subject);
+
+
+ /**
+ * Log the raw message to the configured logger.
+ *
+ * @param message The message to log
+ */
+ public void rawMessage(String message);
+
+ /**
+ * Log the raw message to the configured logger.
+ * Along with a formated stack trace from the Throwable.
+ *
+ * @param message The message to log
+ * @param throwable Optional Throwable that should provide stact trace
+ */
+ void rawMessage(String message, Throwable throwable);
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
new file mode 100644
index 0000000000..9270c316b6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging;
+
+import org.apache.qpid.server.configuration.ServerConfiguration;
+
+public class RootMessageLoggerImpl implements RootMessageLogger
+{
+ private boolean _enabled;
+
+ RawMessageLogger _rawLogger;
+ private static final String MESSAGE = "MESSAGE ";
+
+ public RootMessageLoggerImpl(ServerConfiguration configuration, RawMessageLogger rawLogger)
+ {
+ _enabled = configuration.getStatusUpdates();
+ _rawLogger = rawLogger;
+ }
+
+ public boolean isMessageEnabled(LogActor actor, LogSubject subject)
+ {
+ return _enabled;
+ }
+
+ public void rawMessage(String message)
+ {
+ _rawLogger.rawMessage(MESSAGE + message);
+ }
+
+ public void rawMessage(String message, Throwable throwable)
+ {
+ _rawLogger.rawMessage(MESSAGE + message, throwable);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
new file mode 100644
index 0000000000..3170040a77
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.text.MessageFormat;
+
+/**
+ * An AMQPChannelActor represtents a connection through the AMQP port with an
+ * associated Channel.
+ *
+ * <p/>
+ * This is responsible for correctly formatting the LogActor String in the log
+ * <p/>
+ * [con:1(user@127.0.0.1/)/ch:1]
+ * <p/>
+ * To do this it requires access to the IO Layers as well as a Channel
+ */
+public class AMQPChannelActor extends AbstractActor
+{
+
+ /**
+ * Create a new ChannelActor
+ *
+ * @param channel The Channel for this LogActor
+ * @param rootLogger The root Logger that this LogActor should use
+ */
+ public AMQPChannelActor(AMQChannel channel, RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+
+ AMQProtocolSession session = channel.getProtocolSession();
+
+ /**
+ * LOG FORMAT used by the AMQPConnectorActor follows
+ * ChannelLogSubject.CHANNEL_FORMAT :
+ * con:{0}({1}@{2}/{3})/ch:{4}
+ *
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ */
+ _logString = "[" + MessageFormat.format(ChannelLogSubject.CHANNEL_FORMAT,
+ session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName(),
+ channel.getChannelId())
+ + "] ";
+ }
+}
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
new file mode 100644
index 0000000000..432b1d8203
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+import java.text.MessageFormat;
+
+/**
+ * An AMQPConnectionActor represtents a connectionthrough the AMQP port.
+ * <p/>
+ * This is responsible for correctly formatting the LogActor String in the log
+ * <p/>
+ * [ con:1(user@127.0.0.1/) ]
+ * <p/>
+ * To do this it requires access to the IO Layers.
+ */
+public class AMQPConnectionActor extends AbstractActor
+{
+ /**
+ * 0 - Connection ID
+ * 1 - Remote Address
+ */
+ public static String SOCKET_FORMAT = "con:{0}({1})";
+
+ /**
+ * LOG FORMAT for the ConnectionLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ */
+ public static final String USER_FORMAT = "con:{0}({1}@{2})";
+
+ public AMQPConnectionActor(AMQProtocolSession session, RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+
+ _logString = "[" + MessageFormat.format(SOCKET_FORMAT,
+ session.getSessionID(),
+ session.getRemoteAddress())
+
+ + "] ";
+ }
+
+ /**
+ * Call when the connection has been authorized so that the logString
+ * can be updated with the new user identity.
+ *
+ * @param session the authorized session
+ */
+ public void connectionAuthorized(AMQProtocolSession session)
+ {
+ _logString = "[" + MessageFormat.format(USER_FORMAT,
+ session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress())
+ + "] ";
+
+ }
+
+ /**
+ * Called once the user has been authenticated and they are now selecting
+ * the virtual host they wish to use.
+ *
+ * @param session the session that now has a virtualhost associated with it.
+ */
+ public void virtualHostSelected(AMQProtocolSession session)
+ {
+
+ /**
+ * LOG FORMAT used by the AMQPConnectorActor follows
+ * ConnectionLogSubject.CONNECTION_FORMAT :
+ * con:{0}({1}@{2}/{3})
+ *
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ */
+ _logString = "[" + MessageFormat.format(ConnectionLogSubject.CONNECTION_FORMAT,
+ session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName())
+ + "] ";
+
+ }
+}
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
new file mode 100644
index 0000000000..95f2dc9ff6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.RootMessageLogger;
+
+public abstract class AbstractActor implements LogActor
+{
+ protected String _logString;
+ protected RootMessageLogger _rootLogger;
+
+ public AbstractActor(RootMessageLogger rootLogger)
+ {
+ _rootLogger = rootLogger;
+ }
+
+ public void message(LogSubject subject, LogMessage message)
+ {
+ if (_rootLogger.isMessageEnabled(this, subject))
+ {
+ _rootLogger.rawMessage(_logString + String.valueOf(subject) + message);
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
new file mode 100644
index 0000000000..221e57eebb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.LogActor;
+
+import java.util.LinkedList;
+import java.util.Deque;
+
+public class CurrentActor
+{
+ private static final ThreadLocal<Deque<LogActor>> _currentActor = new ThreadLocal<Deque<LogActor>>()
+ {
+ protected Deque<LogActor> initialValue()
+ {
+ return new LinkedList<LogActor>();
+ }
+ };
+
+ public static void set(LogActor actor)
+ {
+ Deque<LogActor> stack = _currentActor.get();
+ stack.addFirst(actor);
+ }
+
+ public static void remove()
+ {
+ Deque<LogActor> stack = _currentActor.get();
+ stack.remove();
+ }
+
+ public static LogActor get()
+ {
+ return _currentActor.get().peek();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
new file mode 100644
index 0000000000..58d55a13bb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.actors;
+
+import org.apache.qpid.server.logging.RootMessageLogger;
+
+import java.text.MessageFormat;
+import java.security.Principal;
+
+public class ManagementActor extends AbstractActor
+{
+
+ /**
+ * LOG FORMAT for the ManagementActor,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ */
+ public static final String MANAGEMENT_FORMAT = "mng:{0}({1}@{2})";
+
+ /**
+ * //todo Correct interface to provide connection details
+ * @param user
+ * @param rootLogger The RootLogger to use for this Actor
+ */
+ public ManagementActor(Principal user, RootMessageLogger rootLogger)
+ {
+ super(rootLogger);
+
+ _logString = "["+ MessageFormat.format(MANAGEMENT_FORMAT,
+ "<MNG:ConnectionID>",
+ user.getName(),
+ "<MNG:RemoteAddress>")
+ + "] ";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
new file mode 100644
index 0000000000..3774155626
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.logging.rawloggers;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.logging.RawMessageLogger;
+
+public class Log4jMessageLogger implements RawMessageLogger
+{
+ public static final String DEFAULT_LEVEL = "INFO";
+ public static final String DEFAULT_LOGGER = "qpid.message";
+ private Level _level;
+ private Logger _rawMessageLogger;
+
+ public Log4jMessageLogger()
+ {
+ this(DEFAULT_LEVEL, DEFAULT_LOGGER);
+ }
+
+ public Log4jMessageLogger(String level, String logger)
+ {
+ _level = Level.toLevel(level);
+
+ _rawMessageLogger = Logger.getLogger(logger);
+ }
+
+ public void rawMessage(String message)
+ {
+ rawMessage(message, null);
+ }
+
+ public void rawMessage(String message, Throwable throwable)
+ {
+ _rawMessageLogger.log(_level, message, throwable);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java
new file mode 100644
index 0000000000..4fb5bdcc93
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.logging.LogSubject;
+
+import java.text.MessageFormat;
+
+/**
+ * The LogSubjects all have a similar requriement to format their output and
+ * provide the String value.
+ *
+ * This Abstract LogSubject provides this basic functionality, allowing the
+ * actual LogSubjects to provide their formating and data.
+ */
+public abstract class AbstractLogSubject implements LogSubject
+{
+ /**
+ * The logString that will be returned via toString
+ */
+ protected String logString;
+
+ /**
+ * Set the toString logging of this LogSubject. Based on a format provided
+ * by format and the var args.
+ * @param format The Message to format
+ * @param args The values to put in to the message.
+ */
+ protected void setLogStringWithFormat(String format, Object... args)
+ {
+ logString = "[" + MessageFormat.format(format, args) + "] ";
+ }
+
+ /**
+ * ToString is how the Logging infrastructure will get the text for this
+ * LogSubject
+ *
+ * @return String representing this LogSubject
+ */
+ @Override
+ public String toString()
+ {
+ return logString;
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
new file mode 100644
index 0000000000..fd171fea5a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class BindingLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ChannelLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost Name
+ * 1 - Exchange Type
+ * 2 - Exchange Name
+ * 3 - Queue Name
+ * 4 - Binding RoutingKey
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})/qu({3})/rk({4})";
+
+ /**
+ * Create a BindingLogSubject that Logs in the following format.
+ *
+ * [ vh(/)/ex(amq.direct)/qu(testQueue)/bd(testQueue) ]
+ *
+ * @param routingKey
+ * @param exchange
+ * @param queue
+ */
+ public BindingLogSubject(AMQShortString routingKey, Exchange exchange,
+ AMQQueue queue)
+ {
+ setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(),
+ exchange.getType(),
+ exchange.getName(),
+ queue.getName(),
+ routingKey);
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
new file mode 100644
index 0000000000..1b22de6d01
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+public class ChannelLogSubject extends AbstractLogSubject
+{
+ /**
+ * LOG FORMAT for the ChannelLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ * 4 - Channel ID
+ */
+ public static String CHANNEL_FORMAT = ConnectionLogSubject.CONNECTION_FORMAT
+ + "/ch:{4}";
+
+ public ChannelLogSubject(AMQChannel channel)
+ {
+ AMQProtocolSession session = channel.getProtocolSession();
+
+ // Provide the value for the 4th replacement.
+ setLogStringWithFormat(CHANNEL_FORMAT,
+ session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName(),
+ channel.getChannelId());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
new file mode 100644
index 0000000000..e07dbcda23
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+/** The Connection LogSubject */
+public class ConnectionLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ConnectionLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ */
+ public static final String CONNECTION_FORMAT = "con:{0}({1}@{2}/{3})";
+
+ public ConnectionLogSubject(AMQProtocolSession session)
+ {
+ setLogStringWithFormat(CONNECTION_FORMAT, session.getSessionID(),
+ session.getAuthorizedID().getName(),
+ session.getRemoteAddress(),
+ session.getVirtualHost().getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java
new file mode 100644
index 0000000000..21e5f5e4ce
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ExchangeLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ExchangeLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost Name
+ * 1 - Exchange Type
+ * 2 - Exchange Name
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})";
+
+ /** Create an ExchangeLogSubject that Logs in the following format. */
+ public ExchangeLogSubject(Exchange exchange, VirtualHost vhost)
+ {
+ setLogStringWithFormat(BINDING_FORMAT, vhost.getName(),
+ exchange.getType(), exchange.getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
new file mode 100644
index 0000000000..89f31ef477
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class QueueLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the ExchangeLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Virtualhost name
+ * 1 - queue name
+ */
+ protected static String BINDING_FORMAT = "vh(/{0})/qu({1})";
+
+ /** Create an QueueLogSubject that Logs in the following format. */
+ public QueueLogSubject(AMQQueue queue)
+ {
+ setLogStringWithFormat(BINDING_FORMAT,
+ queue.getVirtualHost().getName(),
+ queue.getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java
new file mode 100644
index 0000000000..b68ef2e9a9
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.logging.subjects;
+
+import org.apache.qpid.server.subscription.Subscription;
+
+public class SubscriptionLogSubject extends AbstractLogSubject
+{
+
+ /**
+ * LOG FORMAT for the SubscriptionLogSubject,
+ * Uses a MessageFormat call to insert the requried values according to
+ * these indicies:
+ *
+ * 0 - Subscription ID
+ * 1 - queue name
+ */
+ protected static String BINDING_FORMAT = "sub:{0}(qu({1}))";
+
+ /**
+ * Create an QueueLogSubject that Logs in the following format.
+ *
+ * @param subscription
+ */
+ public SubscriptionLogSubject(Subscription subscription)
+ {
+
+ setLogStringWithFormat(BINDING_FORMAT, subscription.getSubscriptionID(),
+ subscription.getQueue().getName());
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 205ca73f13..e46a52f3bf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -21,26 +21,39 @@
package org.apache.qpid.server.protocol;
import org.apache.log4j.Logger;
-
+import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.CloseFuture;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
-
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -54,7 +67,6 @@ import org.apache.qpid.transport.Sender;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
-
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.Principal;
@@ -64,6 +76,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
@@ -71,6 +84,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+
// to save boxing the channelId and looking up in a map... cache in an array the low numbered
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
@@ -120,6 +135,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
private org.apache.mina.common.WriteFuture _lastWriteFuture;
+ // Create a simple ID that increments for ever new Session
+ private final long _sessionID = idGenerator.getAndIncrement();
+
+ private AMQPConnectionActor _actor;
+
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -134,6 +154,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_codecFactory = codecFactory;
+ _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
+
try
{
IoServiceConfig config = session.getServiceConfig();
@@ -158,6 +180,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_codecFactory = codecFactory;
+ _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -183,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return (AMQProtocolSession) minaProtocolSession.getAttachment();
}
+ public long getSessionID()
+ {
+ return _sessionID;
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -235,6 +263,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
+ CurrentActor.set(_actor);
try
{
body.handle(channelId, this);
@@ -244,7 +273,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
closeChannel(channelId);
throw e;
}
-
+ finally
+ {
+ CurrentActor.remove();
+ }
}
private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -796,6 +828,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
_virtualHost = virtualHost;
+ _actor.virtualHostSelected(this);
+
_virtualHost.getConnectionRegistry().registerConnection(this);
_managedObject = createMBean();
@@ -820,6 +854,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void setAuthorizedID(Principal authorizedID)
{
_authorizedID = authorizedID;
+
+ // Let the actor know that this connection is now Authorized
+ _actor.connectionAuthorized(this);
}
public Principal getAuthorizedID()
@@ -827,6 +864,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _authorizedID;
}
+ public SocketAddress getRemoteAddress()
+ {
+ return _minaProtocolSession.getRemoteAddress();
+ }
+
public MethodRegistry getMethodRegistry()
{
return MethodRegistry.getMethodRegistry(getProtocolVersion());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 1bac601225..f721730d9c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -36,6 +36,7 @@ import java.security.Principal;
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
{
+ long getSessionID();
public static final class ProtocolSessionIdentifier
{
@@ -198,6 +199,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
/** @return a Principal that was used to authorized this session */
Principal getAuthorizedID();
+ public java.net.SocketAddress getRemoteAddress();
+
public MethodRegistry getMethodRegistry();
public MethodDispatcher getMethodDispatcher();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index e994967dc5..8c66508307 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -202,6 +202,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
{
+
exchange.registerQueue(routingKey, this, arguments);
if (isDurable() && exchange.isDurable())
{
@@ -209,6 +210,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
_bindings.addBinding(routingKey, arguments, exchange);
+// ExchangeBinding binding = new ExchangeBinding(routingKey, exchange, arguments);
+
+ //fixme MR logging in progress
+// _bindings.addBinding(binding);
+//
+// if (_logger.isMessageEnabled(binding))
+// {
+// _logger.message(binding, "QM-1001 : Created Binding");
+// }
}
public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 22b4623ae1..b58b849133 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLogger;
/**
* An abstract application registry that provides access to configuration information and handles the
@@ -70,6 +71,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
protected PluginManager _pluginManager;
+ protected RootMessageLogger _rootMessageLogger;
+
static
{
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -287,4 +290,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
return _pluginManager;
}
+ public RootMessageLogger getRootMessageLogger()
+ {
+ return _rootMessageLogger;
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index 39164883f9..31a85b878a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -33,6 +33,8 @@ import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalD
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
@@ -44,9 +46,12 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
+ _rootMessageLogger = new RootMessageLoggerImpl(_configuration,
+ new Log4jMessageLogger());
+
initialiseManagedObjectRegistry();
- _virtualHostRegistry = new VirtualHostRegistry();
+ _virtualHostRegistry = new VirtualHostRegistry(this);
_pluginManager = new PluginManager(_configuration.getPluginDirectory());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index bbfda3addc..7d17639f22 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.mina.common.IoAcceptor;
public interface IApplicationRegistry
@@ -69,6 +70,8 @@ public interface IApplicationRegistry
PluginManager getPluginManager();
+ RootMessageLogger getRootMessageLogger();
+
/**
* Register any acceptors for this registry
* @param bindAddress The address that the acceptor has been bound with
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 9419572399..19eabce9ff 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -52,6 +52,8 @@ public interface Subscription
AMQShortString getConsumerTag();
+ long getSubscriptionID();
+
boolean isSuspended();
boolean hasInterest(QueueEntry msg);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 7aa9d1e3af..51da884d1e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -66,6 +67,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final Lock _stateChangeLock;
+ private static final AtomicLong idGenerator = new AtomicLong(0);
+ // Create a simple ID that increments for ever new Subscription
+ private final long _subscriptionID = idGenerator.getAndIncrement();
+
+
static final class BrowserSubscription extends SubscriptionImpl
{
public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
@@ -526,6 +532,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return _consumerTag;
}
+ public long getSubscriptionID()
+ {
+ return _subscriptionID;
+ }
+
public AMQProtocolSession getProtocolSession()
{
return _channel.getProtocolSession();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index eda2d3a94e..9ef1e029d3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -20,17 +20,12 @@
*/
package org.apache.qpid.server.util;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Properties;
-
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.MapConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.RootMessageLoggerImpl;
+import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -41,6 +36,10 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
public class NullApplicationRegistry extends ApplicationRegistry
{
public NullApplicationRegistry() throws ConfigurationException
@@ -51,9 +50,11 @@ public class NullApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
_logger.info("Initialising NullApplicationRegistry");
-
+
+ _rootMessageLogger = new RootMessageLoggerImpl(_configuration, new Log4jMessageLogger());
+
_configuration.setHousekeepingExpiredMessageCheckPeriod(200);
-
+
Properties users = new Properties();
users.put("guest", "guest");
@@ -65,7 +66,7 @@ public class NullApplicationRegistry extends ApplicationRegistry
_authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
_managedObjectRegistry = new NoopManagedObjectRegistry();
- _virtualHostRegistry = new VirtualHostRegistry();
+ _virtualHostRegistry = new VirtualHostRegistry(this);
PropertiesConfiguration vhostProps = new PropertiesConfiguration();
VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps);
VirtualHost dummyHost = new VirtualHost(hostConfig);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
index 27917fac8a..5543adbeb5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -32,6 +35,12 @@ public class VirtualHostRegistry
private String _defaultVirtualHostName;
+ private ApplicationRegistry _applicationRegistry;
+
+ public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
+ {
+ _applicationRegistry = applicationRegistry;
+ }
public synchronized void registerVirtualHost(VirtualHost host) throws Exception
{
@@ -67,4 +76,9 @@ public class VirtualHostRegistry
{
return new ArrayList<VirtualHost>(_registry.values());
}
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ return _applicationRegistry;
+ }
}